Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid creating temporary inproc channels #4625

Merged
merged 3 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nano/core_test/fair_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ TEST (fair_queue, cleanup)
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);

// Either closing or resetting the channel should make it eligible for cleanup
// Only closing the channel should make it eligible for cleanup
channel1->close ();
channel2.reset ();

Expand All @@ -275,5 +275,5 @@ TEST (fair_queue, cleanup)
ASSERT_TRUE (queue.periodic_update (0s));

ASSERT_TRUE (queue.empty ());
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.queues_size (), 2);
}
99 changes: 17 additions & 82 deletions nano/node/fair_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,56 +18,29 @@ template <typename Request, typename Source>
class fair_queue final
{
public:
struct origin
{
Source source;
std::shared_ptr<nano::transport::channel> channel;

origin (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source },
channel{ channel }
{
}
};

private:
/**
* Holds user supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request.
*/
struct origin_entry
struct origin
{
Source source;

// Optional is needed to distinguish between a source with no associated channel and a source with an expired channel
// TODO: Store channel as shared_ptr after networking fixes are done
std::optional<std::weak_ptr<nano::transport::channel>> maybe_channel;
// This can be null for some sources (eg. local RPC) to indicate that the source is not associated with a channel.
std::shared_ptr<nano::transport::channel> channel;

origin_entry (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source }
origin (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source },
channel{ std::move (channel) }
{
if (channel)
{
maybe_channel = std::weak_ptr{ channel };
}
}

origin_entry (origin const & origin) :
origin_entry (origin.source, origin.channel)
{
}
origin (origin const & origin) = default;

bool alive () const
{
if (maybe_channel)
if (channel)
{
if (auto channel_l = maybe_channel->lock ())
{
return channel_l->alive ();
}
else
{
return false;
}
return channel->alive ();
}
else
{
Expand All @@ -76,49 +49,10 @@ class fair_queue final
}
}

// TODO: Store channel as shared_ptr to avoid this mess
auto operator<=> (origin_entry const & other) const
{
// First compare source
if (auto cmp = source <=> other.source; cmp != 0)
{
return cmp;
}

if (maybe_channel && other.maybe_channel)
{
// Then compare channels by ownership, not by the channel's value or state
std::owner_less<std::weak_ptr<nano::transport::channel>> less;
if (less (*maybe_channel, *other.maybe_channel))
{
return std::strong_ordering::less;
}
if (less (*other.maybe_channel, *maybe_channel))
{
return std::strong_ordering::greater;
}
}
else
{
if (maybe_channel && !other.maybe_channel)
{
return std::strong_ordering::greater;
}
if (!maybe_channel && other.maybe_channel)
{
return std::strong_ordering::less;
}
}

return std::strong_ordering::equivalent;
}

operator origin () const
{
return { source, maybe_channel ? maybe_channel->lock () : nullptr };
}
auto operator<=> (origin const & other) const = default;
};

private:
struct entry
{
using queue_t = std::deque<Request>;
Expand Down Expand Up @@ -264,6 +198,7 @@ class fair_queue final
value_type next ()
{
release_assert (!empty ()); // Should be checked before calling next
debug_assert ((std::chrono::steady_clock::now () - last_update) < 60s); // The queue should be cleaned up periodically

if (should_seek ())
{
Expand All @@ -283,6 +218,8 @@ class fair_queue final

std::deque<value_type> next_batch (size_t max_count)
{
periodic_update ();

auto const count = std::min (size (), max_count);

std::deque<value_type> result;
Expand Down Expand Up @@ -358,13 +295,11 @@ class fair_queue final
}

private:
std::map<origin_entry, entry> queues;
typename std::map<origin_entry, entry>::iterator iterator{ queues.end () };
std::map<origin, entry> queues;
typename std::map<origin, entry>::iterator iterator{ queues.end () };
size_t counter{ 0 };

size_t total_size{ 0 };

std::chrono::steady_clock::time_point last_update{};
std::chrono::steady_clock::time_point last_update{ std::chrono::steady_clock::now () };

public:
std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const
Expand Down
5 changes: 3 additions & 2 deletions nano/node/vote_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::
stats (stats_a),
logger (logger_a),
is_final (is_final_a),
vote_generation_queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* single threaded */ 1, /* max queue size */ 1024 * 32, /* max batch size */ 1024 * 4 }
vote_generation_queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* single threaded */ 1, /* max queue size */ 1024 * 32, /* max batch size */ 1024 * 4 },
inproc_channel{ std::make_shared<nano::transport::inproc::channel> (node, node) }
{
vote_generation_queue.process_batch = [this] (auto & batch) {
process_batch (batch);
Expand Down Expand Up @@ -250,7 +251,7 @@ void nano::vote_generator::broadcast_action (std::shared_ptr<nano::vote> const &
{
network.flood_vote_pr (vote_a);
network.flood_vote (vote_a, 2.0f);
vote_processor.vote (vote_a, std::make_shared<nano::transport::inproc::channel> (node, node)); // TODO: Avoid creating a temporary channel each time
vote_processor.vote (vote_a, inproc_channel);
}

void nano::vote_generator::run ()
Expand Down
1 change: 1 addition & 0 deletions nano/node/vote_generator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,6 @@ class vote_generator final
std::deque<candidate_t> candidates;
std::atomic<bool> stopped{ false };
std::thread thread;
std::shared_ptr<nano::transport::channel> inproc_channel;
};
}
Loading