diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index a9c5c1b8d2..d84ac7da05 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -878,68 +878,6 @@ TEST (network, tcp_no_accept_excluded_peers) ASSERT_TIMELY_EQ (5s, node0->network.size (), 1); } -/* -namespace nano -{ -TEST (network, tcp_message_manager) -{ - nano::transport::tcp_message_manager manager (1); - item.node_id = nano::account (100); - ASSERT_EQ (0, manager.entries.size ()); - manager.put_message (item); - ASSERT_EQ (1, manager.entries.size ()); - ASSERT_EQ (manager.get_message ().node_id, item.node_id); - ASSERT_EQ (0, manager.entries.size ()); - - // Fill the queue - manager.entries = decltype (manager.entries) (manager.max_entries, item); - ASSERT_EQ (manager.entries.size (), manager.max_entries); - - // This task will wait until a message is consumed - auto future = std::async (std::launch::async, [&] { - manager.put_message (item); - }); - - // This should give sufficient time to execute put_message - // and prove that it waits on condition variable - std::this_thread::sleep_for (200ms); - - ASSERT_EQ (manager.entries.size (), manager.max_entries); - ASSERT_EQ (manager.get_message ().node_id, item.node_id); - ASSERT_NE (std::future_status::timeout, future.wait_for (1s)); - ASSERT_EQ (manager.entries.size (), manager.max_entries); - - nano::tcp_message_manager manager2 (2); - size_t message_count = 10'000; - std::vector consumers; - for (auto i = 0; i < 4; ++i) - { - consumers.emplace_back ([&] { - for (auto i = 0; i < message_count; ++i) - { - ASSERT_EQ (manager.get_message ().node_id, item.node_id); - } - }); - } - std::vector producers; - for (auto i = 0; i < 4; ++i) - { - producers.emplace_back ([&] { - for (auto i = 0; i < message_count; ++i) - { - manager.put_message (item); - } - }); - } - - for (auto & t : boost::range::join (producers, consumers)) - { - t.join (); - } -} -} -*/ - TEST (network, cleanup_purge) { auto test_start = std::chrono::steady_clock::now (); diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 8cc7727d56..35e53d9572 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -273,6 +273,9 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.request_aggregator.max_queue, defaults.node.request_aggregator.max_queue); ASSERT_EQ (conf.node.request_aggregator.threads, defaults.node.request_aggregator.threads); ASSERT_EQ (conf.node.request_aggregator.batch_size, defaults.node.request_aggregator.batch_size); + + ASSERT_EQ (conf.node.message_processor.threads, defaults.node.message_processor.threads); + ASSERT_EQ (conf.node.message_processor.max_queue, defaults.node.message_processor.max_queue); } TEST (toml, optional_child) @@ -584,6 +587,10 @@ TEST (toml, daemon_config_deserialize_no_defaults) threads = 999 batch_size = 999 + [node.message_processor] + threads = 999 + max_queue = 999 + [opencl] device = 999 enable = true @@ -741,6 +748,9 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.request_aggregator.max_queue, defaults.node.request_aggregator.max_queue); ASSERT_NE (conf.node.request_aggregator.threads, defaults.node.request_aggregator.threads); ASSERT_NE (conf.node.request_aggregator.batch_size, defaults.node.request_aggregator.batch_size); + + ASSERT_NE (conf.node.message_processor.threads, defaults.node.message_processor.threads); + ASSERT_NE (conf.node.message_processor.max_queue, defaults.node.message_processor.max_queue); } /** There should be no required values **/ diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index 662981d0a6..f78371b799 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -49,7 +49,7 @@ enum class type election, blockprocessor, network, - network_processed, + message, channel, channel_sent, socket, @@ -78,6 +78,7 @@ enum class type thread_runner, signal_manager, peer_history, + message_processor, // bootstrap bulk_pull_client, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 616a8a573e..18f34e7dfc 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -72,6 +72,9 @@ enum class type syn_cookies, peer_history, port_mapping, + message_processor, + message_processor_overfill, + message_processor_type, bootstrap_ascending, bootstrap_ascending_accounts, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 8b1b7618e0..810514a14b 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -22,8 +22,8 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::work: thread_role_name_string = "Work pool"; break; - case nano::thread_role::name::packet_processing: - thread_role_name_string = "Pkt processing"; + case nano::thread_role::name::message_processing: + thread_role_name_string = "Msg processing"; break; case nano::thread_role::name::vote_processing: thread_role_name_string = "Vote processing"; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index ceaeac6652..1b8d50b639 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -12,7 +12,7 @@ enum class name unknown, io, work, - packet_processing, + message_processing, vote_processing, block_processing, request_loop, diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index a069e9715e..24635fe0fe 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -99,6 +99,8 @@ add_library( local_vote_history.hpp make_store.hpp make_store.cpp + message_processor.hpp + message_processor.cpp network.hpp network.cpp nodeconfig.hpp diff --git a/nano/node/message_processor.cpp b/nano/node/message_processor.cpp new file mode 100644 index 0000000000..16606f594c --- /dev/null +++ b/nano/node/message_processor.cpp @@ -0,0 +1,316 @@ +#include +#include +#include + +nano::message_processor::message_processor (message_processor_config const & config_a, nano::node & node_a) : + config{ config_a }, + node{ node_a }, + stats{ node.stats }, + logger{ node.logger } +{ + queue.max_size_query = [this] (auto const & origin) { + return config.max_queue; + }; + + queue.priority_query = [this] (auto const & origin) { + return 1; + }; +} + +nano::message_processor::~message_processor () +{ + debug_assert (threads.empty ()); +} + +void nano::message_processor::start () +{ + debug_assert (threads.empty ()); + + for (int n = 0; n < config.threads; ++n) + { + threads.emplace_back ([this] () { + nano::thread_role::set (nano::thread_role::name::message_processing); + try + { + run (); + } + catch (boost::system::error_code & ec) + { + node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); + release_assert (false); + } + catch (std::error_code & ec) + { + node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); + release_assert (false); + } + catch (std::runtime_error & err) + { + node.logger.critical (nano::log::type::network, "Error: {}", err.what ()); + release_assert (false); + } + catch (...) + { + node.logger.critical (nano::log::type::network, "Unknown error"); + release_assert (false); + } + }); + } +} + +void nano::message_processor::stop () +{ + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + condition.notify_all (); + + for (auto & thread : threads) + { + if (thread.joinable ()) + { + thread.join (); + } + } + threads.clear (); +} + +bool nano::message_processor::put (std::unique_ptr message, std::shared_ptr const & channel) +{ + release_assert (message != nullptr); + release_assert (channel != nullptr); + + auto const type = message->type (); + + bool added = false; + { + nano::lock_guard guard{ mutex }; + added = queue.push ({ std::move (message), channel }, { nano::no_value{}, channel }); + } + if (added) + { + stats.inc (nano::stat::type::message_processor, nano::stat::detail::process); + stats.inc (nano::stat::type::message_processor_type, to_stat_detail (type)); + + condition.notify_all (); + } + else + { + stats.inc (nano::stat::type::message_processor, nano::stat::detail::overfill); + stats.inc (nano::stat::type::message_processor_overfill, to_stat_detail (type)); + } + return added; +} + +void nano::message_processor::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + stats.inc (nano::stat::type::message_processor, nano::stat::detail::loop); + + if (!queue.empty ()) + { + run_batch (lock); + debug_assert (!lock.owns_lock ()); + lock.lock (); + } + else + { + condition.wait (lock, [&] { + return stopped || !queue.empty (); + }); + } + } +} + +void nano::message_processor::run_batch (nano::unique_lock & lock) +{ + debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); + debug_assert (!queue.empty ()); + + nano::timer timer; + timer.start (); + + size_t const max_batch_size = 1024 * 4; + auto batch = queue.next_batch (max_batch_size); + + lock.unlock (); + + for (auto const & [entry, origin] : batch) + { + auto const & [message, channel] = entry; + release_assert (message != nullptr); + process (*message, channel); + } + + if (timer.since_start () > std::chrono::milliseconds (100)) + { + logger.debug (nano::log::type::message_processor, "Processed {} messages in {} milliseconds (rate of {} messages per second)", + batch.size (), + timer.since_start ().count (), + ((batch.size () * 1000ULL) / timer.value ().count ())); + } +} + +namespace +{ +// TODO: This was moved, so compare with latest develop before merging to avoid merge bugs +class process_visitor : public nano::message_visitor +{ +public: + process_visitor (nano::node & node_a, std::shared_ptr const & channel_a) : + node{ node_a }, + channel{ channel_a } + { + } + + void keepalive (nano::keepalive const & message) override + { + // Check for special node port data + auto peer0 (message.peers[0]); + if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) + { + // TODO: Remove this as we do not need to establish a second connection to the same peer + nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ()); + node.network.merge_peer (new_endpoint); + + // Remember this for future forwarding to other peers + channel->set_peering_endpoint (new_endpoint); + } + } + + void publish (nano::publish const & message) override + { + bool added = node.block_processor.add (message.block, nano::block_source::live, channel); + if (!added) + { + node.network.publish_filter.clear (message.digest); + node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in); + } + } + + void confirm_req (nano::confirm_req const & message) override + { + // Don't load nodes with disabled voting + // TODO: This check should be cached somewhere + if (node.config.enable_voting && node.wallets.reps ().voting > 0) + { + if (!message.roots_hashes.empty ()) + { + node.aggregator.request (message.roots_hashes, channel); + } + } + } + + void confirm_ack (nano::confirm_ack const & message) override + { + if (!message.vote->account.is_zero ()) + { + node.vote_processor.vote (message.vote, channel); + } + } + + void bulk_pull (nano::bulk_pull const &) override + { + debug_assert (false); + } + + void bulk_pull_account (nano::bulk_pull_account const &) override + { + debug_assert (false); + } + + void bulk_push (nano::bulk_push const &) override + { + debug_assert (false); + } + + void frontier_req (nano::frontier_req const &) override + { + debug_assert (false); + } + + void node_id_handshake (nano::node_id_handshake const & message) override + { + node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); + } + + void telemetry_req (nano::telemetry_req const & message) override + { + // Send an empty telemetry_ack if we do not want, just to acknowledge that we have received the message to + // remove any timeouts on the server side waiting for a message. + nano::telemetry_ack telemetry_ack{ node.network_params.network }; + if (!node.flags.disable_providing_telemetry_metrics) + { + auto telemetry_data = node.local_telemetry (); + telemetry_ack = nano::telemetry_ack{ node.network_params.network, telemetry_data }; + } + channel->send (telemetry_ack, nullptr, nano::transport::buffer_drop_policy::no_socket_drop); + } + + void telemetry_ack (nano::telemetry_ack const & message) override + { + node.telemetry.process (message, channel); + } + + void asc_pull_req (nano::asc_pull_req const & message) override + { + node.bootstrap_server.request (message, channel); + } + + void asc_pull_ack (nano::asc_pull_ack const & message) override + { + node.ascendboot.process (message, channel); + } + +private: + nano::node & node; + std::shared_ptr channel; +}; +} + +void nano::message_processor::process (nano::message const & message, std::shared_ptr const & channel) +{ + release_assert (channel != nullptr); + + debug_assert (message.header.network == node.network_params.network.current_network); + debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min); + + stats.inc (nano::stat::type::message, to_stat_detail (message.type ()), nano::stat::dir::in); + logger.trace (nano::log::type::message, to_log_detail (message.type ()), nano::log::arg{ "message", message }); + + process_visitor visitor{ node, channel }; + message.visit (visitor); +} + +std::unique_ptr nano::message_processor::collect_container_info (std::string const & name) +{ + nano::lock_guard guard{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (queue.collect_container_info ("queue")); + return composite; +} + +/* + * message_processor_config + */ + +nano::error nano::message_processor_config::serialize (nano::tomlconfig & toml) const +{ + toml.put ("threads", threads, "Number of threads to use for message processing. \ntype:uint64"); + toml.put ("max_queue", max_queue, "Maximum number of messages per peer to queue for processing. \ntype:uint64"); + + return toml.get_error (); +} + +nano::error nano::message_processor_config::deserialize (nano::tomlconfig & toml) +{ + toml.get ("threads", threads); + toml.get ("max_queue", max_queue); + + return toml.get_error (); +} \ No newline at end of file diff --git a/nano/node/message_processor.hpp b/nano/node/message_processor.hpp new file mode 100644 index 0000000000..e160b3396a --- /dev/null +++ b/nano/node/message_processor.hpp @@ -0,0 +1,60 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace nano +{ +class message_processor_config final +{ +public: + nano::error deserialize (nano::tomlconfig & toml); + nano::error serialize (nano::tomlconfig & toml) const; + +public: + size_t threads{ std::min (nano::hardware_concurrency () / 4, 2u) }; + size_t max_queue{ 64 }; +}; + +/* + * If mutex locking is ever a performance bottleneck, using a lock-free queue in front of the priority queue should be considered. + */ +class message_processor final +{ +public: + explicit message_processor (message_processor_config const &, nano::node &); + ~message_processor (); + + void start (); + void stop (); + + bool put (std::unique_ptr, std::shared_ptr const &); + void process (nano::message const &, std::shared_ptr const &); + + std::unique_ptr collect_container_info (std::string const & name); + +private: + void run (); + void run_batch (nano::unique_lock &); + +private: // Dependencies + message_processor_config const & config; + nano::node & node; + nano::stats & stats; + nano::logger & logger; + +private: + using entry_t = std::pair, std::shared_ptr>; + nano::fair_queue queue; + + std::atomic stopped{ false }; + nano::mutex mutex; + nano::condition_variable condition; + std::vector threads; +}; +} \ No newline at end of file diff --git a/nano/node/network.cpp b/nano/node/network.cpp index befb6e9926..6fc16a7c4b 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -1,3 +1,5 @@ +#include "message_processor.hpp" + #include #include #include @@ -21,9 +23,7 @@ nano::network::network (nano::node & node, uint16_t port) : syn_cookies{ node.network_params.network.max_peers_per_ip, node.logger }, resolver{ node.io_ctx }, publish_filter{ 256 * 1024 }, - tcp_channels{ node, [this] (nano::message const & message, std::shared_ptr const & channel) { - inbound (message, channel); - } }, + tcp_channels{ node }, port{ port } { } @@ -31,7 +31,6 @@ nano::network::network (nano::node & node, uint16_t port) : nano::network::~network () { // All threads must be stopped before this destructor - debug_assert (processing_threads.empty ()); debug_assert (!cleanup_thread.joinable ()); debug_assert (!keepalive_thread.joinable ()); debug_assert (!reachout_thread.joinable ()); @@ -63,14 +62,6 @@ void nano::network::start () if (!node.flags.disable_tcp_realtime) { tcp_channels.start (); - - for (std::size_t i = 0; i < node.config.network_threads; ++i) - { - processing_threads.emplace_back (nano::thread_attributes::get_default (), [this] () { - nano::thread_role::set (nano::thread_role::name::packet_processing); - run_processing (); - }); - } } } @@ -85,12 +76,6 @@ void nano::network::stop () tcp_channels.stop (); resolver.cancel (); - for (auto & thread : processing_threads) - { - thread.join (); - } - processing_threads.clear (); - join_or_pass (keepalive_thread); join_or_pass (cleanup_thread); join_or_pass (reachout_thread); @@ -99,35 +84,6 @@ void nano::network::stop () port = 0; } -void nano::network::run_processing () -{ - try - { - // TODO: Move responsibility of packet queuing and processing to the message_processor class - tcp_channels.process_messages (); - } - catch (boost::system::error_code & ec) - { - node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); - release_assert (false); - } - catch (std::error_code & ec) - { - node.logger.critical (nano::log::type::network, "Error: {}", ec.message ()); - release_assert (false); - } - catch (std::runtime_error & err) - { - node.logger.critical (nano::log::type::network, "Error: {}", err.what ()); - release_assert (false); - } - catch (...) - { - node.logger.critical (nano::log::type::network, "Unknown error"); - release_assert (false); - } -} - void nano::network::run_cleanup () { nano::unique_lock lock{ mutex }; @@ -346,136 +302,12 @@ void nano::network::flood_block_many (std::deque> b } } -namespace -{ -class network_message_visitor : public nano::message_visitor -{ -public: - network_message_visitor (nano::node & node_a, std::shared_ptr const & channel_a) : - node{ node_a }, - channel{ channel_a } - { - } - - void keepalive (nano::keepalive const & message_a) override - { - // Check for special node port data - auto peer0 (message_a.peers[0]); - if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) - { - // TODO: Remove this as we do not need to establish a second connection to the same peer - nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ()); - node.network.merge_peer (new_endpoint); - - // Remember this for future forwarding to other peers - channel->set_peering_endpoint (new_endpoint); - } - } - - void publish (nano::publish const & message) override - { - bool added = node.block_processor.add (message.block, nano::block_source::live, channel); - if (!added) - { - node.network.publish_filter.clear (message.digest); - node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in); - } - } - - void confirm_req (nano::confirm_req const & message) override - { - // Don't load nodes with disabled voting - // TODO: This check should be cached somewhere - if (node.config.enable_voting && node.wallets.reps ().voting > 0) - { - if (!message.roots_hashes.empty ()) - { - node.aggregator.request (message.roots_hashes, channel); - } - } - } - - void confirm_ack (nano::confirm_ack const & message_a) override - { - if (!message_a.vote->account.is_zero ()) - { - node.vote_processor.vote (message_a.vote, channel); - } - } - - void bulk_pull (nano::bulk_pull const &) override - { - debug_assert (false); - } - - void bulk_pull_account (nano::bulk_pull_account const &) override - { - debug_assert (false); - } - - void bulk_push (nano::bulk_push const &) override - { - debug_assert (false); - } - - void frontier_req (nano::frontier_req const &) override - { - debug_assert (false); - } - - void node_id_handshake (nano::node_id_handshake const & message_a) override - { - node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - } - - void telemetry_req (nano::telemetry_req const & message_a) override - { - // Send an empty telemetry_ack if we do not want, just to acknowledge that we have received the message to - // remove any timeouts on the server side waiting for a message. - nano::telemetry_ack telemetry_ack{ node.network_params.network }; - if (!node.flags.disable_providing_telemetry_metrics) - { - auto telemetry_data = node.local_telemetry (); - telemetry_ack = nano::telemetry_ack{ node.network_params.network, telemetry_data }; - } - channel->send (telemetry_ack, nullptr, nano::transport::buffer_drop_policy::no_socket_drop); - } - - void telemetry_ack (nano::telemetry_ack const & message_a) override - { - node.telemetry.process (message_a, channel); - } - - void asc_pull_req (nano::asc_pull_req const & message) override - { - node.bootstrap_server.request (message, channel); - } - - void asc_pull_ack (nano::asc_pull_ack const & message) override - { - node.ascendboot.process (message, channel); - } - -private: - nano::node & node; - std::shared_ptr channel; -}; -} - -void nano::network::process_message (nano::message const & message, std::shared_ptr const & channel) -{ - node.stats.inc (nano::stat::type::message, to_stat_detail (message.type ()), nano::stat::dir::in); - node.logger.trace (nano::log::type::network_processed, to_log_detail (message.type ()), nano::log::arg{ "message", message }); - - network_message_visitor visitor{ node, channel }; - message.visit (visitor); -} - void nano::network::inbound (const nano::message & message, const std::shared_ptr & channel) { debug_assert (message.header.network == node.network_params.network.current_network); debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min); - process_message (message, channel); + + node.message_processor.process (message, channel); } // Send keepalives to all the peers we've been notified of @@ -731,57 +563,6 @@ nano::node_id_handshake::response_payload nano::network::prepare_handshake_respo return response; } -/* - * tcp_message_manager - */ - -nano::transport::tcp_message_manager::tcp_message_manager (unsigned incoming_connections_max_a) : - max_entries (incoming_connections_max_a * max_entries_per_connection + 1) -{ - debug_assert (max_entries > 0); -} - -void nano::transport::tcp_message_manager::put (std::unique_ptr message, std::shared_ptr channel) -{ - { - nano::unique_lock lock{ mutex }; - while (entries.size () >= max_entries && !stopped) - { - producer_condition.wait (lock); - } - entries.emplace_back (std::move (message), channel); - } - consumer_condition.notify_one (); -} - -auto nano::transport::tcp_message_manager::next () -> entry_t -{ - entry_t result{ nullptr, nullptr }; - nano::unique_lock lock{ mutex }; - while (entries.empty () && !stopped) - { - consumer_condition.wait (lock); - } - if (!entries.empty ()) - { - result = std::move (entries.front ()); - entries.pop_front (); - } - lock.unlock (); - producer_condition.notify_one (); - return result; -} - -void nano::transport::tcp_message_manager::stop () -{ - { - nano::lock_guard lock{ mutex }; - stopped = true; - } - consumer_condition.notify_all (); - producer_condition.notify_all (); -} - /* * syn_cookies */ diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 84c131e73d..ea39c00d19 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -6,8 +6,6 @@ #include #include -#include - #include #include #include @@ -108,12 +106,10 @@ class network final nano::node_id_handshake::response_payload prepare_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) const; private: - void run_processing (); void run_cleanup (); void run_keepalive (); void run_reachout (); void run_reachout_cached (); - void process_message (nano::message const &, std::shared_ptr const &); private: // Dependencies nano::node & node; @@ -136,7 +132,6 @@ class network final std::atomic stopped{ false }; mutable nano::mutex mutex; nano::condition_variable condition; - std::vector processing_threads; // Using boost::thread to enable increased stack size std::thread cleanup_thread; std::thread keepalive_thread; std::thread reachout_thread; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 2b275dd0d0..57b2e01a36 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -159,6 +160,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy ledger_impl{ std::make_unique (store, stats, network_params.ledger, flags_a.generate_cache, config_a.representative_vote_weight_minimum.number ()) }, ledger{ *ledger_impl }, outbound_limiter{ outbound_bandwidth_limiter_config (config) }, + message_processor_impl{ std::make_unique (config.message_processor, *this) }, + message_processor{ *message_processor_impl }, // empty `config.peering_port` means the user made no port choice at all; // otherwise, any value is considered, with `0` having the special meaning of 'let the OS pick a port instead' // @@ -600,6 +603,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (node.unchecked.collect_container_info ("unchecked")); composite->add_component (node.local_block_broadcaster.collect_container_info ("local_block_broadcaster")); composite->add_component (node.rep_tiers.collect_container_info ("rep_tiers")); + composite->add_component (node.message_processor.collect_container_info ("message_processor")); return composite; } @@ -636,6 +640,7 @@ void nano::node::start () long_inactivity_cleanup (); network.start (); + message_processor.start (); if (!flags.disable_legacy_bootstrap && !flags.disable_ongoing_bootstrap) { @@ -763,6 +768,7 @@ void nano::node::stop () epoch_upgrader.stop (); workers.stop (); local_block_broadcaster.stop (); + message_processor.stop (); network.stop (); // Stop network last to avoid killing in-use sockets // work pool is not stopped on purpose due to testing setup diff --git a/nano/node/node.hpp b/nano/node/node.hpp index a52659298d..8e8397b1b8 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -45,6 +45,7 @@ namespace nano { class active_elections; class confirming_set; +class message_processor; class node; class vote_processor; class vote_router; @@ -161,6 +162,8 @@ class node final : public std::enable_shared_from_this std::unique_ptr ledger_impl; nano::ledger & ledger; nano::outbound_bandwidth_limiter outbound_limiter; + std::unique_ptr message_processor_impl; + nano::message_processor & message_processor; nano::network network; nano::telemetry telemetry; nano::bootstrap_initiator bootstrap_initiator; diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index abd98e1fc2..f434f5f8fa 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -85,6 +85,11 @@ nano::node_config::node_config (const std::optional & peering_port_a, } } +nano::node_config::~node_config () +{ + // Keep the node_config destructor definition here to avoid incomplete type issues +} + nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const { if (peering_port.has_value ()) @@ -237,6 +242,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const request_aggregator.serialize (request_aggregator_l); toml.put_child ("request_aggregator", request_aggregator_l); + nano::tomlconfig message_processor_l; + message_processor.serialize (message_processor_l); + toml.put_child ("message_processor", message_processor_l); + return toml.get_error (); } @@ -348,6 +357,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) request_aggregator.deserialize (config_l); } + if (toml.has_key ("message_processor")) + { + auto config_l = toml.get_required_child ("message_processor"); + message_processor.deserialize (config_l); + } + if (toml.has_key ("work_peers")) { work_peers.clear (); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index a17cecacd5..bdd16bfd56 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,8 @@ enum class frontiers_confirmation_mode : uint8_t invalid }; +class message_processor_config; + /** * Node configuration */ @@ -50,6 +53,7 @@ class node_config // TODO: Users of this class rely on the default copy consturctor. This prevents using unique_ptrs with forward declared types. node_config (nano::network_params & network_params = nano::dev::network_params); node_config (const std::optional &, nano::network_params & network_params = nano::dev::network_params); + ~node_config (); nano::error serialize_toml (nano::tomlconfig &) const; nano::error deserialize_toml (nano::tomlconfig &); @@ -143,6 +147,7 @@ class node_config nano::peer_history_config peer_history; nano::transport::tcp_config tcp; nano::request_aggregator_config request_aggregator; + nano::message_processor_config message_processor; public: std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index e60fe4428d..5c60ed4b32 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -107,10 +107,8 @@ void nano::transport::channel_tcp::operator() (nano::object_stream & obs) const * tcp_channels */ -nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function const &)> sink) : - node{ node }, - message_manager{ node.config.tcp_incoming_connections_max }, - sink{ std::move (sink) } +nano::transport::tcp_channels::tcp_channels (nano::node & node) : + node{ node } { } @@ -131,8 +129,6 @@ void nano::transport::tcp_channels::stop () } condition.notify_all (); - message_manager.stop (); - close (); } @@ -342,27 +338,6 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer () return result; } -void nano::transport::tcp_channels::queue_message (std::unique_ptr message, std::shared_ptr channel) -{ - if (!stopped) - { - message_manager.put (std::move (message), std::move (channel)); - } -} - -void nano::transport::tcp_channels::process_messages () -{ - while (!stopped) - { - auto [message, channel] = message_manager.next (); - if (message != nullptr) - { - release_assert (channel != nullptr); - sink (*message, channel); - } - } -} - bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a) { if (node.flags.disable_max_peers_per_ip) diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index c9a18a9a2b..f3965d6746 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -26,30 +26,6 @@ namespace transport class tcp_channels; class channel_tcp; - // TODO: Replace with message_processor component with fair queueing - class tcp_message_manager final - { - public: - using entry_t = std::pair, std::shared_ptr>; - - explicit tcp_message_manager (unsigned incoming_connections_max); - void stop (); - - void put (std::unique_ptr, std::shared_ptr); - entry_t next (); - - private: - nano::mutex mutex; - nano::condition_variable producer_condition; - nano::condition_variable consumer_condition; - std::deque entries; - unsigned max_entries; - static unsigned const max_entries_per_connection = 16; - bool stopped{ false }; - - friend class network_tcp_message_manager_Test; - }; - class channel_tcp : public nano::transport::channel, public std::enable_shared_from_this { friend class nano::transport::tcp_channels; @@ -132,7 +108,7 @@ namespace transport friend class network_peer_max_tcp_attempts_subnetwork_Test; public: - explicit tcp_channels (nano::node &, std::function const &)> sink = nullptr); + explicit tcp_channels (nano::node &); ~tcp_channels (); void start (); @@ -147,8 +123,6 @@ namespace transport std::shared_ptr find_node_id (nano::account const &); // Get the next peer for attempting a tcp connection nano::tcp_endpoint bootstrap_peer (); - void queue_message (std::unique_ptr, std::shared_ptr); - void process_messages (); bool max_ip_connections (nano::tcp_endpoint const & endpoint_a); bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); @@ -167,9 +141,6 @@ namespace transport private: // Dependencies nano::node & node; - public: - tcp_message_manager message_manager; - private: void close (); bool check (nano::tcp_endpoint const &, nano::account const & node_id) const; @@ -273,8 +244,6 @@ namespace transport // clang-format on private: - std::function const &)> sink; - std::atomic stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex; diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index ab9ce23693..2287e53a43 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -253,7 +253,9 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr release_assert (channel != nullptr); channel->set_last_packet_received (std::chrono::steady_clock::now ()); - node->network.tcp_channels.queue_message (std::move (message), channel); + + bool added = node->message_processor.put (std::move (message), channel); + // TODO: Throttle if not added } auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status