diff --git a/nano/core_test/bootstrap.cpp b/nano/core_test/bootstrap.cpp index 23600e035d..84995fa540 100644 --- a/nano/core_test/bootstrap.cpp +++ b/nano/core_test/bootstrap.cpp @@ -291,7 +291,7 @@ TEST (bootstrap_processor, process_none) auto node1 = system.make_disconnected_node (); bool done = false; - node0->observers.socket_accepted.add ([&] (nano::transport::socket & socket) { + node0->observers.socket_connected.add ([&] (nano::transport::socket & socket) { done = true; }); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 3a1c5660f0..a9c5c1b8d2 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -853,9 +853,9 @@ TEST (network, tcp_no_accept_excluded_peers) { node0->network.excluded_peers.add (endpoint1_tcp); } - ASSERT_EQ (0, node0->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::excluded)); + ASSERT_EQ (0, node0->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded)); node1->network.merge_peer (node0->network.endpoint ()); - ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::excluded) >= 1); + ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded) >= 1); ASSERT_EQ (nullptr, node0->network.find_node_id (node1->get_node_id ())); // Should not actively reachout to excluded peers @@ -878,12 +878,12 @@ TEST (network, tcp_no_accept_excluded_peers) ASSERT_TIMELY_EQ (5s, node0->network.size (), 1); } +/* namespace nano { TEST (network, tcp_message_manager) { - nano::tcp_message_manager manager (1); - nano::tcp_message_item item; + nano::transport::tcp_message_manager manager (1); item.node_id = nano::account (100); ASSERT_EQ (0, manager.entries.size ()); manager.put_message (item); @@ -938,6 +938,7 @@ TEST (network, tcp_message_manager) } } } +*/ TEST (network, cleanup_purge) { @@ -1026,9 +1027,14 @@ TEST (network, filter_invalid_version_using) TEST (network, fill_keepalive_self) { nano::test::system system{ 2 }; - std::array target; - system.nodes[0]->network.fill_keepalive_self (target); - ASSERT_EQ (target[2].port (), system.nodes[1]->network.port); + + auto get_keepalive = [&system] (nano::node & node) { + std::array target; + node.network.fill_keepalive_self (target); + return target; + }; + + ASSERT_TIMELY_EQ (5s, get_keepalive (system.node (0))[2].port (), system.nodes[1]->network.port); } TEST (network, reconnect_cached) @@ -1085,9 +1091,9 @@ TEST (network, reconnect_cached) /* * Tests that channel and channel container removes channels with dead local sockets */ -TEST (network, purge_dead_channel_outgoing) +TEST (network, purge_dead_channel) { - nano::test::system system{}; + nano::test::system system; nano::node_flags flags; // Disable non realtime sockets @@ -1100,36 +1106,14 @@ TEST (network, purge_dead_channel_outgoing) auto & node1 = *system.add_node (flags); - // We expect one incoming and one outgoing connection - std::shared_ptr outgoing; - std::shared_ptr incoming; - - std::atomic connected_count{ 0 }; - node1.observers.socket_connected.add ([&] (nano::transport::socket & socket) { - connected_count++; - outgoing = socket.shared_from_this (); - - std::cout << "connected: " << socket.remote_endpoint () << std::endl; - }); - - std::atomic accepted_count{ 0 }; - node1.observers.socket_accepted.add ([&] (nano::transport::socket & socket) { - accepted_count++; - incoming = socket.shared_from_this (); - - std::cout << "accepted: " << socket.remote_endpoint () << std::endl; + node1.observers.socket_connected.add ([&] (nano::transport::socket & sock) { + system.logger.debug (nano::log::type::test, "Connected: {}", sock); }); auto & node2 = *system.add_node (flags); - ASSERT_TIMELY_EQ (5s, connected_count, 1); - ASSERT_ALWAYS_EQ (1s, connected_count, 1); - - ASSERT_TIMELY_EQ (5s, accepted_count, 1); - ASSERT_ALWAYS_EQ (1s, accepted_count, 1); - ASSERT_EQ (node1.network.size (), 1); - ASSERT_ALWAYS_EQ (1s, node1.network.size (), 1); + ASSERT_ALWAYS_EQ (500ms, node1.network.size (), 1); // Store reference to the only channel auto channels = node1.network.list (); @@ -1137,29 +1121,29 @@ TEST (network, purge_dead_channel_outgoing) auto channel = channels.front (); ASSERT_TRUE (channel); + auto sockets = node1.tcp_listener.sockets (); + ASSERT_EQ (sockets.size (), 1); + auto socket = sockets.front (); + ASSERT_TRUE (socket); + // When socket is dead ensure channel knows about that ASSERT_TRUE (channel->alive ()); - outgoing->close (); - ASSERT_TIMELY (5s, !channel->alive ()); - - // Shortly after that a new channel should be established - ASSERT_TIMELY_EQ (5s, connected_count, 2); - ASSERT_ALWAYS_EQ (1s, connected_count, 2); + socket->close (); + ASSERT_TIMELY (10s, !channel->alive ()); - // Check that a new channel is healthy - auto channels2 = node1.network.list (); - ASSERT_EQ (channels2.size (), 1); - auto channel2 = channels2.front (); - ASSERT_TRUE (channel2); - ASSERT_TRUE (channel2->alive ()); + auto channel_exists = [] (auto & node, auto & channel) { + auto channels = node.network.list (); + return std::find (channels.begin (), channels.end (), channel) != channels.end (); + }; + ASSERT_TIMELY (5s, !channel_exists (node1, channel)); } /* * Tests that channel and channel container removes channels with dead remote sockets */ -TEST (network, purge_dead_channel_incoming) +TEST (network, purge_dead_channel_remote) { - nano::test::system system{}; + nano::test::system system; nano::node_flags flags; // Disable non realtime sockets @@ -1171,37 +1155,15 @@ TEST (network, purge_dead_channel_incoming) flags.disable_wallet_bootstrap = true; auto & node1 = *system.add_node (flags); - - // We expect one incoming and one outgoing connection - std::shared_ptr outgoing; - std::shared_ptr incoming; - - std::atomic connected_count{ 0 }; - node1.observers.socket_connected.add ([&] (nano::transport::socket & socket) { - connected_count++; - outgoing = socket.shared_from_this (); - - std::cout << "connected: " << socket.remote_endpoint () << std::endl; - }); - - std::atomic accepted_count{ 0 }; - node1.observers.socket_accepted.add ([&] (nano::transport::socket & socket) { - accepted_count++; - incoming = socket.shared_from_this (); - - std::cout << "accepted: " << socket.remote_endpoint () << std::endl; - }); - auto & node2 = *system.add_node (flags); - ASSERT_TIMELY_EQ (5s, connected_count, 1); - ASSERT_ALWAYS_EQ (1s, connected_count, 1); - - ASSERT_TIMELY_EQ (5s, accepted_count, 1); - ASSERT_ALWAYS_EQ (1s, accepted_count, 1); + node2.observers.socket_connected.add ([&] (nano::transport::socket & sock) { + system.logger.debug (nano::log::type::test, "Connected: {}", sock); + }); + ASSERT_EQ (node1.network.size (), 1); ASSERT_EQ (node2.network.size (), 1); - ASSERT_ALWAYS_EQ (1s, node2.network.size (), 1); + ASSERT_ALWAYS_EQ (500ms, std::min (node1.network.size (), node2.network.size ()), 1); // Store reference to the only channel auto channels = node2.network.list (); @@ -1209,19 +1171,19 @@ TEST (network, purge_dead_channel_incoming) auto channel = channels.front (); ASSERT_TRUE (channel); + auto sockets = node1.tcp_listener.sockets (); + ASSERT_EQ (sockets.size (), 1); + auto socket = sockets.front (); + ASSERT_TRUE (socket); + // When remote socket is dead ensure channel knows about that ASSERT_TRUE (channel->alive ()); - incoming->close (); + socket->close (); ASSERT_TIMELY (5s, !channel->alive ()); - // Shortly after that a new channel should be established - ASSERT_TIMELY_EQ (5s, accepted_count, 2); - ASSERT_ALWAYS_EQ (1s, accepted_count, 2); - - // Check that a new channel is healthy - auto channels2 = node2.network.list (); - ASSERT_EQ (channels2.size (), 1); - auto channel2 = channels2.front (); - ASSERT_TRUE (channel2); - ASSERT_TRUE (channel2->alive ()); + auto channel_exists = [] (auto & node, auto & channel) { + auto channels = node.network.list (); + return std::find (channels.begin (), channels.end (), channel) != channels.end (); + }; + ASSERT_TIMELY (5s, !channel_exists (node2, channel)); } diff --git a/nano/core_test/peer_history.cpp b/nano/core_test/peer_history.cpp index e09dc69f61..7dd9397914 100644 --- a/nano/core_test/peer_history.cpp +++ b/nano/core_test/peer_history.cpp @@ -26,8 +26,11 @@ TEST (peer_history, erase_old) { nano::test::system system; - auto & node1 = *system.add_node (); - auto & node2 = *system.add_node (); + auto node_config = system.default_config (); + node_config.peer_history.erase_cutoff = 1s; + + auto & node1 = *system.add_node (node_config); + auto & node2 = *system.add_node (node_config); ASSERT_TIMELY (5s, node1.peer_history.exists (node2.network.endpoint ())); ASSERT_TIMELY (5s, node2.peer_history.exists (node1.network.endpoint ())); @@ -41,6 +44,7 @@ TEST (peer_history, erase_old) ASSERT_EQ (cached1.size (), 1); ASSERT_EQ (cached1[0], node2_endpoint); + ASSERT_TIMELY_EQ (5s, node1.network.size (), 0); ASSERT_TIMELY (5s, !node1.peer_history.exists (node2_endpoint)); auto cached2 = node1.peer_history.peers (); diff --git a/nano/core_test/rep_crawler.cpp b/nano/core_test/rep_crawler.cpp index 70fc542fff..13088d7d76 100644 --- a/nano/core_test/rep_crawler.cpp +++ b/nano/core_test/rep_crawler.cpp @@ -89,12 +89,15 @@ TEST (rep_crawler, rep_weight) ASSERT_TRUE (nano::test::process (node2, { block1, block2, block3, block4 })); ASSERT_TRUE (nano::test::process (node3, { block1, block2, block3, block4 })); ASSERT_TRUE (node.rep_crawler.representatives (1).empty ()); - std::shared_ptr channel1 = nano::test::establish_tcp (system, node, node1.network.endpoint ()); + + ASSERT_TIMELY (5s, node.network.size () == 3); + auto channel1 = node.network.find_node_id (node1.node_id.pub); + auto channel2 = node.network.find_node_id (node2.node_id.pub); + auto channel3 = node.network.find_node_id (node3.node_id.pub); ASSERT_NE (nullptr, channel1); - std::shared_ptr channel2 = nano::test::establish_tcp (system, node, node2.network.endpoint ()); ASSERT_NE (nullptr, channel2); - std::shared_ptr channel3 = nano::test::establish_tcp (system, node, node3.network.endpoint ()); ASSERT_NE (nullptr, channel3); + auto vote0 = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector{ nano::dev::genesis->hash () }); auto vote1 = std::make_shared (keypair1.pub, keypair1.prv, 0, 0, std::vector{ nano::dev::genesis->hash () }); auto vote2 = std::make_shared (keypair2.pub, keypair2.prv, 0, 0, std::vector{ nano::dev::genesis->hash () }); @@ -227,8 +230,6 @@ TEST (rep_crawler, rep_remove) reps = searching_node.rep_crawler.representatives (1); ASSERT_EQ (nano::dev::genesis_key.pub, reps[0].account); ASSERT_TIMELY_EQ (5s, searching_node.network.size (), 1); - auto list (searching_node.network.list (1)); - ASSERT_EQ (node_genesis_rep->network.endpoint (), list[0]->get_endpoint ()); } TEST (rep_crawler, rep_connection_close) diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 5a7a627735..971887e530 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -36,7 +36,10 @@ TEST (socket, max_connections) std::mutex server_sockets_mutex; // start a server socket that allows max 2 live connections - nano::transport::tcp_listener listener{ server_port, *node, 2 }; + nano::transport::tcp_config tcp_config{ nano::dev::network_params.network }; + tcp_config.max_inbound_connections = 2; + + nano::transport::tcp_listener listener{ server_port, tcp_config, *node }; listener.connection_accepted.add ([&] (auto const & socket, auto const & server) { std::lock_guard guard{ server_sockets_mutex }; server_sockets.push_back (socket); @@ -63,12 +66,12 @@ TEST (socket, max_connections) auto client3 = std::make_shared (*node); client3->async_connect (dst_endpoint, connect_handler); - auto get_tcp_accept_failures = [&node] () { - return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in); + auto get_tcp_accept_failures = [] (auto & node) { + return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure); }; - auto get_tcp_accept_successes = [&node] () { - return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in); + auto get_tcp_accept_successes = [] (auto & node) { + return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success); }; auto server_sockets_size = [&] () { @@ -76,8 +79,8 @@ TEST (socket, max_connections) return server_sockets.size (); }; - ASSERT_TIMELY_EQ (10s, get_tcp_accept_successes (), 2); - ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (), 2); + ASSERT_TIMELY_EQ (10s, get_tcp_accept_successes (node), 2); + ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (node), 2); ASSERT_TIMELY_EQ (5s, connection_attempts, 3); ASSERT_TIMELY_EQ (5s, server_sockets_size (), 2); @@ -94,8 +97,8 @@ TEST (socket, max_connections) auto client5 = std::make_shared (*node); client5->async_connect (dst_endpoint, connect_handler); - ASSERT_TIMELY_EQ (10s, get_tcp_accept_successes (), 3); - ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (), 3); + ASSERT_TIMELY_EQ (10s, get_tcp_accept_successes (node), 3); + ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (node), 3); ASSERT_TIMELY_EQ (5s, connection_attempts, 5); ASSERT_TIMELY_EQ (5s, server_sockets.size (), 3); @@ -116,8 +119,8 @@ TEST (socket, max_connections) auto client8 = std::make_shared (*node); client8->async_connect (dst_endpoint, connect_handler); - ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 5); - ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (), 5); + ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (node), 5); + ASSERT_ALWAYS_EQ (1s, get_tcp_accept_successes (node), 5); ASSERT_TIMELY_EQ (5s, connection_attempts, 8); // connections initiated by the client ASSERT_TIMELY_EQ (5s, server_sockets_size (), 5); // connections accepted by the server } @@ -144,7 +147,10 @@ TEST (socket, max_connections_per_ip) // successful incoming connections are stored in server_sockets to keep them alive (server side) std::vector> server_sockets; - nano::transport::tcp_listener listener{ server_port, *node, max_global_connections }; + nano::transport::tcp_config tcp_config{ nano::dev::network_params.network }; + tcp_config.max_inbound_connections = max_global_connections; + + nano::transport::tcp_listener listener{ server_port, tcp_config, *node }; listener.connection_accepted.add ([&server_sockets] (auto const & socket, auto const & server) { server_sockets.push_back (socket); }); @@ -170,16 +176,8 @@ TEST (socket, max_connections_per_ip) client_list.push_back (client); } - auto get_tcp_max_per_ip = [&node] () { - return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, nano::stat::dir::in); - }; - - auto get_tcp_accept_successes = [&node] () { - return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in); - }; - - ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), max_ip_connections); - ASSERT_TIMELY_EQ (5s, get_tcp_max_per_ip (), 1); + ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), max_ip_connections); + ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_ip), 1); ASSERT_TIMELY_EQ (5s, connection_attempts, max_ip_connections + 1); } @@ -267,7 +265,10 @@ TEST (socket, max_connections_per_subnetwork) // successful incoming connections are stored in server_sockets to keep them alive (server side) std::vector> server_sockets; - nano::transport::tcp_listener listener{ server_port, *node, max_global_connections }; + nano::transport::tcp_config tcp_config{ nano::dev::network_params.network }; + tcp_config.max_inbound_connections = max_global_connections; + + nano::transport::tcp_listener listener{ server_port, tcp_config, *node }; listener.connection_accepted.add ([&server_sockets] (auto const & socket, auto const & server) { server_sockets.push_back (socket); }); @@ -293,16 +294,8 @@ TEST (socket, max_connections_per_subnetwork) client_list.push_back (client); } - auto get_tcp_max_per_subnetwork = [&node] () { - return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, nano::stat::dir::in); - }; - - auto get_tcp_accept_successes = [&node] () { - return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in); - }; - - ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), max_subnetwork_connections); - ASSERT_TIMELY_EQ (5s, get_tcp_max_per_subnetwork (), 1); + ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), max_subnetwork_connections); + ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_subnetwork), 1); ASSERT_TIMELY_EQ (5s, connection_attempts, max_subnetwork_connections + 1); } @@ -330,7 +323,10 @@ TEST (socket, disabled_max_peers_per_ip) // successful incoming connections are stored in server_sockets to keep them alive (server side) std::vector> server_sockets; - nano::transport::tcp_listener listener = { server_port, *node, max_global_connections }; + nano::transport::tcp_config tcp_config{ nano::dev::network_params.network }; + tcp_config.max_inbound_connections = max_global_connections; + + nano::transport::tcp_listener listener = { server_port, tcp_config, *node }; listener.connection_accepted.add ([&server_sockets] (auto const & socket, auto const & server) { server_sockets.push_back (socket); }); @@ -356,16 +352,8 @@ TEST (socket, disabled_max_peers_per_ip) client_list.push_back (client); } - auto get_tcp_max_per_ip = [&node] () { - return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, nano::stat::dir::in); - }; - - auto get_tcp_accept_successes = [&node] () { - return node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in); - }; - - ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), max_ip_connections + 1); - ASSERT_TIMELY_EQ (5s, get_tcp_max_per_ip (), 0); + ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), max_ip_connections + 1); + ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_ip), 0); ASSERT_TIMELY_EQ (5s, connection_attempts, max_ip_connections + 1); } diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 8d90ef3eb3..beabcad4a8 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -1023,8 +1023,11 @@ TEST (websocket, telemetry) ASSERT_TRUE (nano::test::compare_telemetry (telemetry_data, *node2)); - ASSERT_EQ (contents.get ("address"), node2->network.endpoint ().address ().to_string ()); - ASSERT_EQ (contents.get ("port"), node2->network.endpoint ().port ()); + auto channel2 = node2->network.find_node_id (node1->get_node_id ()); + ASSERT_NE (channel2, nullptr); + + ASSERT_EQ (contents.get ("address"), channel2->get_local_endpoint ().address ().to_string ()); + ASSERT_EQ (contents.get ("port"), channel2->get_local_endpoint ().port ()); // Other node should have no subscribers EXPECT_EQ (0, node2->websocket.server->subscriber_count (nano::websocket::topic::telemetry)); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 7ae4ca7715..be36061e30 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -31,7 +31,9 @@ enum class type ipc, tcp, tcp_channels, + tcp_channels_rejected, tcp_listener, + tcp_listener_rejected, channel, socket, confirmation_height, @@ -259,12 +261,26 @@ enum class detail accept_success, accept_error, accept_failure, - accept_limits_exceeded, + accept_rejected, close_error, max_per_ip, max_per_subnetwork, + max_attempts, + max_attempts_per_ip, excluded, erase_dead, + connect_initiate, + connect_failure, + connect_error, + connect_rejected, + connect_success, + attempt_timeout, + not_a_peer, + + // tcp_channels + channel_accepted, + channel_rejected, + channel_duplicate, // tcp_server handshake, diff --git a/nano/lib/thread_runner.cpp b/nano/lib/thread_runner.cpp index a57479639d..9b372eea94 100644 --- a/nano/lib/thread_runner.cpp +++ b/nano/lib/thread_runner.cpp @@ -17,15 +17,15 @@ nano::thread_runner::thread_runner (std::shared_ptr io_ { debug_assert (io_ctx != nullptr); + logger.debug (nano::log::type::thread_runner, "Starting threads: {} ({})", num_threads, to_string (role)); + for (auto i (0u); i < num_threads; ++i) { threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () { nano::thread_role::set (role); try { - logger.debug (nano::log::type::thread_runner, "Started thread #{} ({})", i, to_string (role)); run (); - logger.debug (nano::log::type::thread_runner, "Stopped thread #{} ({})", i, to_string (role)); } catch (std::exception const & ex) { @@ -88,6 +88,9 @@ void nano::thread_runner::join () i.join (); } } + + logger.debug (nano::log::type::thread_runner, "Stopped threads ({})", to_string (role)); + io_ctx.reset (); } diff --git a/nano/node/bootstrap/bootstrap_connections.cpp b/nano/node/bootstrap/bootstrap_connections.cpp index cbee474aa8..36344df9e1 100644 --- a/nano/node/bootstrap/bootstrap_connections.cpp +++ b/nano/node/bootstrap/bootstrap_connections.cpp @@ -25,7 +25,7 @@ nano::bootstrap_client::bootstrap_client (std::shared_ptr const & no { ++node_a->bootstrap_initiator.connections->connections_count; receive_buffer->resize (256); - channel->update_endpoint (); + channel->update_endpoints (); } nano::bootstrap_client::~bootstrap_client () diff --git a/nano/node/confirmation_solicitor.cpp b/nano/node/confirmation_solicitor.cpp index feb782453a..5695f5f8b9 100644 --- a/nano/node/confirmation_solicitor.cpp +++ b/nano/node/confirmation_solicitor.cpp @@ -17,6 +17,8 @@ nano::confirmation_solicitor::confirmation_solicitor (nano::network & network_a, void nano::confirmation_solicitor::prepare (std::vector const & representatives_a) { debug_assert (!prepared); + debug_assert (std::none_of (representatives_a.begin (), representatives_a.end (), [] (auto const & rep) { return rep.channel == nullptr; })); + requests.clear (); rebroadcasted = 0; /** Two copies are required as representatives can be erased from \p representatives_requests */ diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 2a268ad25f..f220764c86 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3060,6 +3060,10 @@ void nano::json_handler::peers () } debug_assert (channel->get_type () == nano::transport::transport_type::tcp); pending_tree.put ("type", "tcp"); + + auto peering_endpoint = channel->get_peering_endpoint (); + pending_tree.put ("peering", boost::lexical_cast (peering_endpoint)); + peers_l.push_back (boost::property_tree::ptree::value_type (text.str (), pending_tree)); } else diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 3b01286150..9242298c73 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -363,6 +363,7 @@ class network_message_visitor : public nano::message_visitor auto peer0 (message_a.peers[0]); if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) { + // TODO: Remove this as we do not need to establish a second connection to the same peer nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ()); node.network.merge_peer (new_endpoint); @@ -733,13 +734,13 @@ nano::node_id_handshake::response_payload nano::network::prepare_handshake_respo * tcp_message_manager */ -nano::tcp_message_manager::tcp_message_manager (unsigned incoming_connections_max_a) : - max_entries (incoming_connections_max_a * nano::tcp_message_manager::max_entries_per_connection + 1) +nano::transport::tcp_message_manager::tcp_message_manager (unsigned incoming_connections_max_a) : + max_entries (incoming_connections_max_a * max_entries_per_connection + 1) { debug_assert (max_entries > 0); } -void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item_a) +void nano::transport::tcp_message_manager::put (std::unique_ptr message, std::shared_ptr channel) { { nano::unique_lock lock{ mutex }; @@ -747,14 +748,14 @@ void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item { producer_condition.wait (lock); } - entries.push_back (item_a); + entries.emplace_back (std::move (message), channel); } consumer_condition.notify_one (); } -nano::tcp_message_item nano::tcp_message_manager::get_message () +auto nano::transport::tcp_message_manager::next () -> entry_t { - nano::tcp_message_item result; + entry_t result{ nullptr, nullptr }; nano::unique_lock lock{ mutex }; while (entries.empty () && !stopped) { @@ -765,16 +766,12 @@ nano::tcp_message_item nano::tcp_message_manager::get_message () result = std::move (entries.front ()); entries.pop_front (); } - else - { - result = nano::tcp_message_item{ nullptr, nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr }; - } lock.unlock (); producer_condition.notify_one (); return result; } -void nano::tcp_message_manager::stop () +void nano::transport::tcp_message_manager::stop () { { nano::lock_guard lock{ mutex }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index a90b4767d9..0e06033b02 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -171,7 +171,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy // Thus, be very careful if you change the order: if `bootstrap` gets constructed before `network`, // the latter would inherit the port from the former (if TCP is active, otherwise `network` picks first) // - tcp_listener_impl{ std::make_unique (network.port, *this, config.tcp_incoming_connections_max) }, + tcp_listener_impl{ std::make_unique (network.port, config.tcp, *this) }, tcp_listener{ *tcp_listener_impl }, application_path (application_path_a), port_mapping (*this), @@ -660,11 +660,11 @@ void nano::node::start () network.port = tcp_listener.endpoint ().port (); } - logger.info (nano::log::type::node, "Node peering port: {}", network.port.load ()); + logger.info (nano::log::type::node, "Peering port: {}", network.port.load ()); } else { - logger.warn (nano::log::type::node, "Node peering is disabled"); + logger.warn (nano::log::type::node, "Peering is disabled"); } if (!flags.disable_backup) diff --git a/nano/node/node_observers.hpp b/nano/node/node_observers.hpp index 9a97987c14..6675a7f2bb 100644 --- a/nano/node/node_observers.hpp +++ b/nano/node/node_observers.hpp @@ -31,9 +31,7 @@ class node_observers final nano::observer_set<> disconnect; nano::observer_set work_cancel; nano::observer_set const &> telemetry; - nano::observer_set socket_connected; - nano::observer_set socket_accepted; }; std::unique_ptr collect_container_info (node_observers & node_observers, std::string const & name); diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index b5c0f9a9e7..ca9bda3679 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -35,7 +35,8 @@ nano::node_config::node_config (const std::optional & peering_port_a, external_address{ boost::asio::ip::address_v6{}.to_string () }, rep_crawler{ network_params.network }, block_processor{ network_params.network }, - peer_history{ network_params.network } + peer_history{ network_params.network }, + tcp{ network_params.network } { if (peering_port == 0) { diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 32786d70f5..8f2defda79 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -143,6 +144,7 @@ class node_config nano::block_processor_config block_processor; nano::vote_processor_config vote_processor; nano::peer_history_config peer_history; + nano::transport::tcp_config tcp; public: std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const; diff --git a/nano/node/peer_history.cpp b/nano/node/peer_history.cpp index 80bf703de2..bf1fa099bb 100644 --- a/nano/node/peer_history.cpp +++ b/nano/node/peer_history.cpp @@ -86,7 +86,7 @@ void nano::peer_history::run_one () // Add or update live peers for (auto const & peer : live_peers) { - auto const endpoint = peer->get_endpoint (); + auto const endpoint = peer->get_peering_endpoint (); bool const exists = store.peer.exists (transaction, endpoint); store.peer.put (transaction, endpoint, nano::milliseconds_since_epoch ()); if (!exists) @@ -141,7 +141,7 @@ nano::peer_history_config::peer_history_config (nano::network_constants const & if (network.is_dev_network ()) { check_interval = 1s; - erase_cutoff = 3s; + erase_cutoff = 10s; } } diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index 3192290359..996884a031 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -43,6 +43,7 @@ class channel virtual std::string to_string () const = 0; virtual nano::endpoint get_endpoint () const = 0; virtual nano::tcp_endpoint get_tcp_endpoint () const = 0; + virtual nano::endpoint get_local_endpoint () const = 0; virtual nano::transport::transport_type get_type () const = 0; virtual bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic) diff --git a/nano/node/transport/common.hpp b/nano/node/transport/common.hpp new file mode 100644 index 0000000000..58f15ce43e --- /dev/null +++ b/nano/node/transport/common.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include + +namespace nano::transport +{ +/** Policy to affect at which stage a buffer can be dropped */ +enum class buffer_drop_policy +{ + /** Can be dropped by bandwidth limiter (default) */ + limiter, + /** Should not be dropped by bandwidth limiter */ + no_limiter_drop, + /** Should not be dropped by bandwidth limiter or socket write queue limiter */ + no_socket_drop +}; + +enum class socket_type +{ + undefined, + bootstrap, + realtime, + realtime_response_server // special type for tcp channel response server +}; + +std::string_view to_string (socket_type); + +enum class socket_endpoint +{ + server, // Socket was created by accepting an incoming connection + client, // Socket was created by initiating an outgoing connection +}; + +std::string_view to_string (socket_endpoint); +} \ No newline at end of file diff --git a/nano/node/transport/fake.hpp b/nano/node/transport/fake.hpp index 34d0a23280..a1c95b55f5 100644 --- a/nano/node/transport/fake.hpp +++ b/nano/node/transport/fake.hpp @@ -40,6 +40,11 @@ namespace transport return nano::transport::map_endpoint_to_tcp (endpoint); } + nano::endpoint get_local_endpoint () const override + { + return endpoint; + } + nano::transport::transport_type get_type () const override { return nano::transport::transport_type::fake; diff --git a/nano/node/transport/inproc.hpp b/nano/node/transport/inproc.hpp index 092ab92393..9ae5670293 100644 --- a/nano/node/transport/inproc.hpp +++ b/nano/node/transport/inproc.hpp @@ -32,6 +32,11 @@ namespace transport return nano::transport::map_endpoint_to_tcp (endpoint); } + nano::endpoint get_local_endpoint () const override + { + return endpoint; + } + nano::transport::transport_type get_type () const override { return nano::transport::transport_type::loopback; diff --git a/nano/node/transport/socket.hpp b/nano/node/transport/socket.hpp index c008614751..233513dea9 100644 --- a/nano/node/transport/socket.hpp +++ b/nano/node/transport/socket.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -28,35 +29,6 @@ class node; namespace nano::transport { -/** Policy to affect at which stage a buffer can be dropped */ -enum class buffer_drop_policy -{ - /** Can be dropped by bandwidth limiter (default) */ - limiter, - /** Should not be dropped by bandwidth limiter */ - no_limiter_drop, - /** Should not be dropped by bandwidth limiter or socket write queue limiter */ - no_socket_drop -}; - -enum class socket_type -{ - undefined, - bootstrap, - realtime, - realtime_response_server // special type for tcp channel response server -}; - -std::string_view to_string (socket_type); - -enum class socket_endpoint -{ - server, // Socket was created by accepting an incoming connection - client, // Socket was created by initiating an outgoing connection -}; - -std::string_view to_string (socket_endpoint); - /** Socket class for tcp clients and newly accepted connections */ class socket final : public std::enable_shared_from_this { @@ -71,7 +43,7 @@ class socket final : public std::enable_shared_from_this explicit socket (nano::node &, nano::transport::socket_endpoint = socket_endpoint::client, std::size_t max_queue_size = default_max_queue_size); // TODO: Accepting remote/local endpoints as a parameter is unnecessary, but is needed for now to keep compatibility with the legacy code - explicit socket ( + socket ( nano::node &, boost::asio::ip::tcp::socket, boost::asio::ip::tcp::endpoint remote_endpoint, @@ -125,7 +97,7 @@ class socket final : public std::enable_shared_from_this } bool is_realtime_connection () const { - return type () == socket_type::realtime || type () == socket_type::realtime_response_server; + return type () == socket_type::realtime; } bool is_bootstrap_connection () const { diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index fa1fa2eb48..e60fe4428d 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -23,10 +23,21 @@ nano::transport::channel_tcp::~channel_tcp () // Close socket. Exception: socket is used by tcp_server if (auto socket_l = socket.lock ()) { - if (!temporary) - { - socket_l->close (); - } + socket_l->close (); + } +} + +void nano::transport::channel_tcp::update_endpoints () +{ + nano::lock_guard lk (channel_mutex); + + debug_assert (endpoint == nano::endpoint{}); // Not initialized endpoint value + debug_assert (local_endpoint == nano::endpoint{}); // Not initialized endpoint value + + if (auto socket_l = socket.lock ()) + { + endpoint = socket_l->remote_endpoint (); + local_endpoint = socket_l->local_endpoint (); } } @@ -145,31 +156,87 @@ void nano::transport::tcp_channels::close () channels.clear (); } -bool nano::transport::tcp_channels::insert (std::shared_ptr const & channel_a, std::shared_ptr const & socket_a, std::shared_ptr const & server_a) +bool nano::transport::tcp_channels::check (const nano::tcp_endpoint & endpoint, const nano::account & node_id) const { - auto endpoint (channel_a->get_tcp_endpoint ()); - debug_assert (endpoint.address ().is_v6 ()); - auto udp_endpoint (nano::transport::map_tcp_to_endpoint (endpoint)); - bool error (true); - if (!node.network.not_a_peer (udp_endpoint, node.config.allow_local_peers) && !stopped) + debug_assert (!mutex.try_lock ()); + + if (stopped) { - nano::unique_lock lock{ mutex }; - auto existing (channels.get ().find (endpoint)); - if (existing == channels.get ().end ()) + return false; // Reject + } + + if (node.network.not_a_peer (nano::transport::map_tcp_to_endpoint (endpoint), node.config.allow_local_peers)) + { + node.stats.inc (nano::stat::type::tcp_channels_rejected, nano::stat::detail::not_a_peer); + node.logger.debug (nano::log::type::tcp_channels, "Rejected invalid endpoint channel from: {}", fmt::streamed (endpoint)); + + return false; // Reject + } + + bool has_duplicate = std::any_of (channels.begin (), channels.end (), [&endpoint, &node_id] (auto const & channel) { + if (nano::transport::is_same_ip (channel.endpoint ().address (), endpoint.address ())) { - auto node_id (channel_a->get_node_id ()); - if (!channel_a->temporary) + // Only counsider channels with the same node id as duplicates if they come from the same IP + if (channel.node_id () == node_id) { - channels.get ().erase (node_id); + return true; } - channels.get ().emplace (channel_a, socket_a, server_a); - attempts.get ().erase (endpoint); - error = false; - lock.unlock (); - node.network.channel_observer (channel_a); } + return false; + }); + + if (has_duplicate) + { + node.stats.inc (nano::stat::type::tcp_channels_rejected, nano::stat::detail::channel_duplicate); + node.logger.debug (nano::log::type::tcp_channels, "Duplicate channel rejected from: {} ({})", fmt::streamed (endpoint), node_id.to_node_id ()); + + return false; // Reject } - return error; + + return true; // OK +} + +// This should be the only place in node where channels are created +std::shared_ptr nano::transport::tcp_channels::create (const std::shared_ptr & socket, const std::shared_ptr & server, const nano::account & node_id) +{ + auto const endpoint = socket->remote_endpoint (); + debug_assert (endpoint.address ().is_v6 ()); + + nano::unique_lock lock{ mutex }; + + if (stopped) + { + return nullptr; + } + + if (!check (endpoint, node_id)) + { + node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::channel_rejected); + node.logger.debug (nano::log::type::tcp_channels, "Rejected new channel from: {} ({})", fmt::streamed (endpoint), node_id.to_node_id ()); + // Rejection reason should be logged earlier + + return nullptr; + } + + node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::channel_accepted); + node.logger.debug (nano::log::type::tcp_channels, "Accepted new channel from: {} ({})", + fmt::streamed (socket->remote_endpoint ()), + node_id.to_node_id ()); + + auto channel = std::make_shared (node, socket); + channel->update_endpoints (); + channel->set_node_id (node_id); + + attempts.get ().erase (endpoint); + + auto [_, inserted] = channels.get ().emplace (channel, socket, server); + debug_assert (inserted); + + lock.unlock (); + + node.network.channel_observer (channel); + + return channel; } void nano::transport::tcp_channels::erase (nano::tcp_endpoint const & endpoint_a) @@ -216,7 +283,7 @@ std::unordered_set> nano::transport::t continue; } - if (channel->get_network_version () >= min_version && (include_temporary_channels_a || !channel->temporary)) + if (channel->get_network_version () >= min_version) { result.insert (channel); } @@ -275,65 +342,23 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer () return result; } -void nano::transport::tcp_channels::process_messages () +void nano::transport::tcp_channels::queue_message (std::unique_ptr message, std::shared_ptr channel) { - while (!stopped) + if (!stopped) { - auto item = message_manager.get_message (); - if (item.message != nullptr) - { - process_message (*item.message, item.endpoint, item.node_id, item.socket); - } + message_manager.put (std::move (message), std::move (channel)); } } -void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a, std::shared_ptr const & socket_a) +void nano::transport::tcp_channels::process_messages () { - auto type_a = socket_a->type (); - if (!stopped && message_a.header.version_using >= node.network_params.network.protocol_version_min) + while (!stopped) { - auto channel (node.network.find_channel (nano::transport::map_tcp_to_endpoint (endpoint_a))); - if (channel) - { - sink (message_a, channel); - } - else + auto [message, channel] = message_manager.next (); + if (message != nullptr) { - channel = node.network.find_node_id (node_id_a); - if (channel) - { - sink (message_a, channel); - } - else if (!node.network.excluded_peers.check (endpoint_a)) - { - if (!node_id_a.is_zero ()) - { - // Add temporary channel - auto temporary_channel (std::make_shared (node, socket_a)); - temporary_channel->update_endpoint (); - debug_assert (endpoint_a == temporary_channel->get_tcp_endpoint ()); - temporary_channel->set_node_id (node_id_a); - temporary_channel->set_network_version (message_a.header.version_using); - temporary_channel->temporary = true; - debug_assert (type_a == nano::transport::socket_type::realtime || type_a == nano::transport::socket_type::realtime_response_server); - // Don't insert temporary channels for response_server - if (type_a == nano::transport::socket_type::realtime) - { - insert (temporary_channel, socket_a, nullptr); - } - sink (message_a, temporary_channel); - } - else - { - // Initial node_id_handshake request without node ID - debug_assert (message_a.header.type == nano::message_type::node_id_handshake); - node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - } - } - } - if (channel) - { - channel->set_last_packet_received (std::chrono::steady_clock::now ()); + release_assert (channel != nullptr); + sink (*message, channel); } } } @@ -529,7 +554,7 @@ void nano::transport::tcp_channels::list (std::deque lock{ mutex }; // clang-format off nano::transform_if (channels.get ().begin (), channels.get ().end (), std::back_inserter (deque_a), - [include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a && (include_temporary_channels_a || !channel_a.channel->temporary); }, + [include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a; }, [](auto const & channel) { return channel.channel; }); // clang-format on } @@ -546,199 +571,7 @@ void nano::transport::tcp_channels::modify (std::shared_ptr (node); - std::weak_ptr socket_w (socket); - auto channel (std::make_shared (node, socket_w)); - std::weak_ptr node_w (node.shared ()); - socket->async_connect (nano::transport::map_endpoint_to_tcp (endpoint_a), - [node_w, channel, socket, endpoint_a] (boost::system::error_code const & ec) { - if (auto node_l = node_w.lock ()) - { - if (!ec && channel) - { - // TCP node ID handshake - auto query = node_l->network.prepare_handshake_query (endpoint_a); - nano::node_id_handshake message{ node_l->network_params.network, query }; - - node_l->logger.debug (nano::log::type::tcp, "Handshake sent to: {} (query: {})", - nano::util::to_str (endpoint_a), - (query ? query->cookie.to_string () : "")); - - channel->update_endpoint (); - std::shared_ptr> receive_buffer (std::make_shared> ()); - receive_buffer->resize (256); - channel->send (message, [node_w, channel, endpoint_a, receive_buffer] (boost::system::error_code const & ec, std::size_t size_a) { - if (auto node_l = node_w.lock ()) - { - if (!ec) - { - node_l->network.tcp_channels.start_tcp_receive_node_id (channel, endpoint_a, receive_buffer); - } - else - { - node_l->logger.debug (nano::log::type::tcp, "Error sending handshake to: {} ({})", nano::util::to_str (endpoint_a), ec.message ()); - - if (auto socket_l = channel->socket.lock ()) - { - socket_l->close (); - } - } - } - }); - } - else - { - if (ec) - { - node_l->logger.debug (nano::log::type::tcp, "Error connecting to: {} ({})", nano::util::to_str (endpoint_a), ec.message ()); - } - else - { - node_l->logger.debug (nano::log::type::tcp, "Error connecting to: {}", nano::util::to_str (endpoint_a)); - } - } - } - }); -} - -void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr const & channel_a, nano::endpoint const & endpoint_a, std::shared_ptr> const & receive_buffer_a) -{ - std::weak_ptr node_w (node.shared ()); - auto socket_l = channel_a->socket.lock (); - if (!socket_l) - { - return; - } - auto cleanup_node_id_handshake_socket = [socket_w = channel_a->socket, node_w] (nano::endpoint const & endpoint_a) { - if (auto node_l = node_w.lock ()) - { - if (auto socket_l = socket_w.lock ()) - { - socket_l->close (); - } - } - }; - - auto message_deserializer = std::make_shared (node.network_params.network, node.network.publish_filter, node.block_uniquer, node.vote_uniquer, - [socket_l] (std::shared_ptr> const & data_a, size_t size_a, std::function callback_a) { - debug_assert (socket_l != nullptr); - socket_l->read_impl (data_a, size_a, callback_a); - }); - message_deserializer->read ([node_w, socket_l, channel_a, endpoint_a, cleanup_node_id_handshake_socket] (boost::system::error_code ec, std::unique_ptr message) { - auto node_l = node_w.lock (); - if (!node_l) - { - return; - } - if (ec || !channel_a) - { - node_l->logger.debug (nano::log::type::tcp, "Error reading handshake from: {} ({})", nano::util::to_str (endpoint_a), ec.message ()); - - cleanup_node_id_handshake_socket (endpoint_a); - return; - } - node_l->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - auto error (false); - - // the header type should in principle be checked after checking the network bytes and the version numbers, I will not change it here since the benefits do not outweight the difficulties - if (error || message->type () != nano::message_type::node_id_handshake) - { - node_l->logger.debug (nano::log::type::tcp, "Error reading handshake header from: {} ({})", nano::util::to_str (endpoint_a), ec.message ()); - - cleanup_node_id_handshake_socket (endpoint_a); - return; - } - auto & handshake = static_cast (*message); - - if (message->header.network != node_l->network_params.network.current_network || message->header.version_using < node_l->network_params.network.protocol_version_min) - { - // error handling, either the networks bytes or the version is wrong - if (message->header.network == node_l->network_params.network.current_network) - { - node_l->stats.inc (nano::stat::type::message, nano::stat::detail::invalid_network); - } - else - { - node_l->stats.inc (nano::stat::type::message, nano::stat::detail::outdated_version); - } - - cleanup_node_id_handshake_socket (endpoint_a); - // Cleanup attempt - { - nano::lock_guard lock{ node_l->network.tcp_channels.mutex }; - node_l->network.tcp_channels.attempts.get ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a)); - } - return; - } - - if (error || !handshake.response || !handshake.query) - { - node_l->logger.debug (nano::log::type::tcp, "Error reading handshake payload from: {} ({})", nano::util::to_str (endpoint_a), ec.message ()); - - cleanup_node_id_handshake_socket (endpoint_a); - return; - } - channel_a->set_network_version (handshake.header.version_using); - - debug_assert (handshake.query); - debug_assert (handshake.response); - - auto const node_id = handshake.response->node_id; - - if (!node_l->network.verify_handshake_response (*handshake.response, endpoint_a)) - { - cleanup_node_id_handshake_socket (endpoint_a); - return; - } - - /* If node ID is known, don't establish new connection - Exception: temporary channels from tcp_server */ - auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id)); - if (existing_channel && !existing_channel->temporary) - { - cleanup_node_id_handshake_socket (endpoint_a); - return; - } - - channel_a->set_node_id (node_id); - channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); - - debug_assert (handshake.query); - auto response = node_l->network.prepare_handshake_response (*handshake.query, handshake.is_v2 ()); - nano::node_id_handshake handshake_response (node_l->network_params.network, std::nullopt, response); - - node_l->logger.debug (nano::log::type::tcp, "Handshake response sent to {} (query: {})", - nano::util::to_str (endpoint_a), - handshake.query->cookie.to_string ()); - - channel_a->send (handshake_response, [node_w, channel_a, endpoint_a, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, std::size_t size_a) { - auto node_l = node_w.lock (); - if (!node_l) - { - return; - } - if (ec || !channel_a) - { - node_l->logger.debug (nano::log::type::tcp, "Error sending handshake response to: {} ({})", nano::util::to_str (endpoint_a), ec.message ()); - - cleanup_node_id_handshake_socket (endpoint_a); - return; - } - // Insert new node ID connection - auto socket_l = channel_a->socket.lock (); - if (!socket_l) - { - return; - } - channel_a->set_last_packet_sent (std::chrono::steady_clock::now ()); - auto response_server = std::make_shared (socket_l, node_l); - node_l->network.tcp_channels.insert (channel_a, socket_l, response_server); - // Listen for possible responses - response_server->socket->type_set (nano::transport::socket_type::realtime_response_server); - response_server->remote_node_id = channel_a->get_node_id (); - response_server->start (); - }); - }); + node.tcp_listener.connect (endpoint.address (), endpoint.port ()); } diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 81c038e66f..c9a18a9a2b 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -20,40 +20,35 @@ namespace mi = boost::multi_index; namespace nano { -class tcp_message_item final -{ -public: - std::shared_ptr message; - nano::tcp_endpoint endpoint; - nano::account node_id; - std::shared_ptr socket; -}; - -class tcp_message_manager final -{ -public: - tcp_message_manager (unsigned incoming_connections_max_a); - void put_message (nano::tcp_message_item const & item_a); - nano::tcp_message_item get_message (); - // Stop container and notify waiting threads - void stop (); - -private: - nano::mutex mutex; - nano::condition_variable producer_condition; - nano::condition_variable consumer_condition; - std::deque entries; - unsigned max_entries; - static unsigned const max_entries_per_connection = 16; - bool stopped{ false }; - - friend class network_tcp_message_manager_Test; -}; - namespace transport { class tcp_server; class tcp_channels; + class channel_tcp; + + // TODO: Replace with message_processor component with fair queueing + class tcp_message_manager final + { + public: + using entry_t = std::pair, std::shared_ptr>; + + explicit tcp_message_manager (unsigned incoming_connections_max); + void stop (); + + void put (std::unique_ptr, std::shared_ptr); + entry_t next (); + + private: + nano::mutex mutex; + nano::condition_variable producer_condition; + nano::condition_variable consumer_condition; + std::deque entries; + unsigned max_entries; + static unsigned const max_entries_per_connection = 16; + bool stopped{ false }; + + friend class network_tcp_message_manager_Test; + }; class channel_tcp : public nano::transport::channel, public std::enable_shared_from_this { @@ -63,21 +58,13 @@ namespace transport channel_tcp (nano::node &, std::weak_ptr); ~channel_tcp () override; + void update_endpoints (); + // TODO: investigate clang-tidy warning about default parameters on virtual/override functions// void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; std::string to_string () const override; - void update_endpoint () - { - nano::lock_guard lk (channel_mutex); - debug_assert (endpoint == nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0)); // Not initialized endpoint value - if (auto socket_l = socket.lock ()) - { - endpoint = socket_l->remote_endpoint (); - } - } - nano::endpoint get_endpoint () const override { return nano::transport::map_tcp_to_endpoint (get_tcp_endpoint ()); @@ -89,6 +76,12 @@ namespace transport return endpoint; } + nano::endpoint get_local_endpoint () const override + { + nano::lock_guard lk (channel_mutex); + return local_endpoint; + } + nano::transport::transport_type get_type () const override { return nano::transport::transport_type::tcp; @@ -124,12 +117,9 @@ namespace transport public: std::weak_ptr socket; - /* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server. - If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */ - std::atomic temporary{ false }; - private: - nano::tcp_endpoint endpoint{ boost::asio::ip::address_v6::any (), 0 }; + nano::endpoint endpoint; + nano::endpoint local_endpoint; public: // Logging void operator() (nano::object_stream &) const override; @@ -137,7 +127,7 @@ namespace transport class tcp_channels final { - friend class nano::transport::channel_tcp; + friend class channel_tcp; friend class telemetry_simultaneous_requests_Test; friend class network_peer_max_tcp_attempts_subnetwork_Test; @@ -148,7 +138,7 @@ namespace transport void start (); void stop (); - bool insert (std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); + std::shared_ptr create (std::shared_ptr const &, std::shared_ptr const &, nano::account const & node_id); void erase (nano::tcp_endpoint const &); std::size_t size () const; std::shared_ptr find_channel (nano::tcp_endpoint const &) const; @@ -157,8 +147,8 @@ namespace transport std::shared_ptr find_node_id (nano::account const &); // Get the next peer for attempting a tcp connection nano::tcp_endpoint bootstrap_peer (); + void queue_message (std::unique_ptr, std::shared_ptr); void process_messages (); - void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr const &); bool max_ip_connections (nano::tcp_endpoint const & endpoint_a); bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); @@ -173,16 +163,16 @@ namespace transport // Connection start void start_tcp (nano::endpoint const &); - void start_tcp_receive_node_id (std::shared_ptr const &, nano::endpoint const &, std::shared_ptr> const &); private: // Dependencies nano::node & node; public: - nano::tcp_message_manager message_manager; + tcp_message_manager message_manager; private: void close (); + bool check (nano::tcp_endpoint const &, nano::account const & node_id) const; private: class channel_entry final @@ -215,8 +205,7 @@ namespace transport } nano::account node_id () const { - auto node_id (channel->get_node_id ()); - return node_id; + return channel->get_node_id (); } uint8_t network_version () const { diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index a06d07fb48..7cf6a0b6ee 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -8,6 +8,9 @@ #include #include +#include + +#include using namespace std::chrono_literals; @@ -15,26 +18,27 @@ using namespace std::chrono_literals; * tcp_listener */ -nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) : +nano::transport::tcp_listener::tcp_listener (uint16_t port_a, tcp_config const & config_a, nano::node & node_a) : + config{ config_a }, node{ node_a }, stats{ node_a.stats }, logger{ node_a.logger }, port{ port_a }, - max_inbound_connections{ max_inbound_connections }, strand{ node_a.io_ctx.get_executor () }, acceptor{ strand }, task{ strand } { connection_accepted.add ([this] (auto const & socket, auto const & server) { - node.observers.socket_accepted.notify (*socket); + node.observers.socket_connected.notify (*socket); }); } nano::transport::tcp_listener::~tcp_listener () { - // Thread should be stopped before destruction debug_assert (!cleanup_thread.joinable ()); debug_assert (!task.joinable ()); + debug_assert (connection_count () == 0); + debug_assert (attempt_count () == 0); } void nano::transport::tcp_listener::start () @@ -91,7 +95,8 @@ void nano::transport::tcp_listener::start () { logger.critical (nano::log::type::tcp_listener, "Unknown error"); release_assert (false); // Unexpected error - } }); + } + }); cleanup_thread = std::thread ([this] { nano::thread_role::set (nano::thread_role::name::tcp_listener); @@ -130,9 +135,18 @@ void nano::transport::tcp_listener::stop () } decltype (connections) connections_l; + decltype (attempts) attempts_l; { nano::lock_guard lock{ mutex }; connections_l.swap (connections); + attempts_l.swap (attempts); + } + + for (auto & attempt : attempts_l) + { + debug_assert (attempt.task.joinable ()); + attempt.task.cancel (); + attempt.task.join (); } for (auto & connection : connections_l) @@ -154,7 +168,10 @@ void nano::transport::tcp_listener::run_cleanup () while (!stopped) { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup); + cleanup (); + timeout (); + condition.wait_for (lock, 1s, [this] () { return stopped.load (); }); } } @@ -163,6 +180,7 @@ void nano::transport::tcp_listener::cleanup () { debug_assert (!mutex.try_lock ()); + // Erase dead connections erase_if (connections, [this] (auto const & connection) { if (connection.socket.expired () && connection.server.expired ()) { @@ -175,6 +193,109 @@ void nano::transport::tcp_listener::cleanup () return false; } }); + + // Erase completed attempts + erase_if (attempts, [this] (auto const & attempt) { + return attempt.task.ready (); + }); +} + +void nano::transport::tcp_listener::timeout () +{ + debug_assert (!mutex.try_lock ()); + + auto const cutoff = std::chrono::steady_clock::now () - config.connect_timeout; + + // Cancel timed out attempts + for (auto & attempt : attempts) + { + if (!attempt.task.ready () && attempt.start < cutoff) + { + attempt.task.cancel (); + + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout); + logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)", + fmt::streamed (attempt.endpoint), nano::log::seconds_delta (attempt.start)); + } + } +} + +bool nano::transport::tcp_listener::connect (asio::ip::address ip, uint16_t port) +{ + nano::unique_lock lock{ mutex }; + + if (port == 0) + { + port = node.network_params.network.default_node_port; + } + + if (auto count = attempts.size (); count > config.max_attempts) + { + stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts, nano::stat::dir::out); + logger.debug (nano::log::type::tcp_listener, "Max connection attempts reached ({}), rejected connection attempt: {}", + count, ip.to_string ()); + + return false; // Rejected + } + + if (auto count = count_attempts (ip); count >= config.max_attempts_per_ip) + { + stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts_per_ip, nano::stat::dir::out); + logger.debug (nano::log::type::tcp_listener, "Connection attempt already in progress ({}), rejected connection attempt: {}", + count, ip.to_string ()); + + return false; // Rejected + } + + if (auto result = check_limits (ip, connection_type::outbound); result != accept_result::accepted) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_rejected, nano::stat::dir::out); + // Refusal reason should be logged earlier + + return false; // Rejected + } + + nano::tcp_endpoint const endpoint{ ip, port }; + + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_initiate, nano::stat::dir::out); + logger.debug (nano::log::type::tcp_listener, "Initiating outgoing connection to: {}", fmt::streamed (endpoint)); + + auto task = nano::async::task (strand, connect_impl (endpoint)); + + attempts.emplace_back (attempt{ endpoint, std::move (task) }); + + return true; // Attempt started +} + +auto nano::transport::tcp_listener::connect_impl (asio::ip::tcp::endpoint endpoint) -> asio::awaitable +{ + debug_assert (strand.running_in_this_thread ()); + + try + { + auto raw_socket = co_await connect_socket (endpoint); + debug_assert (strand.running_in_this_thread ()); + + auto result = accept_one (std::move (raw_socket), connection_type::outbound); + if (result.result == accept_result::accepted) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_success, nano::stat::dir::out); + logger.debug (nano::log::type::tcp_listener, "Successfully connected to: {}", fmt::streamed (endpoint)); + + release_assert (result.server); + result.server->initiate_handshake (); + } + else + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_failure, nano::stat::dir::out); + // Refusal reason should be logged earlier + } + } + catch (boost::system::system_error const & ex) + { + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_error, nano::stat::dir::out); + logger.log (nano::log::level::debug, nano::log::type::tcp_listener, "Error connecting to: {} ({})", fmt::streamed (endpoint), ex.what ()); + } } asio::awaitable nano::transport::tcp_listener::run () @@ -188,8 +309,10 @@ asio::awaitable nano::transport::tcp_listener::run () try { auto socket = co_await accept_socket (); - auto result = accept_one (std::move (socket)); - if (result != accept_result::accepted) + debug_assert (strand.running_in_this_thread ()); + + auto result = accept_one (std::move (socket), connection_type::inbound); + if (result.result != accept_result::accepted) { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_failure, nano::stat::dir::in); // Refusal reason should be logged earlier @@ -218,15 +341,43 @@ asio::awaitable nano::transport::tcp_listener::accept_soc co_return co_await acceptor.async_accept (asio::use_awaitable); } -auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket) -> accept_result +asio::awaitable nano::transport::tcp_listener::connect_socket (asio::ip::tcp::endpoint endpoint) +{ + debug_assert (strand.running_in_this_thread ()); + + asio::ip::tcp::socket raw_socket{ strand }; + co_await raw_socket.async_connect (endpoint, asio::use_awaitable); + + co_return raw_socket; +} + +asio::awaitable nano::transport::tcp_listener::wait_available_slots () const +{ + nano::interval log_interval; + while (connection_count () >= config.max_inbound_connections && !stopped) + { + if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s)) + { + logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})", + connection_count (), config.max_inbound_connections); + } + + co_await nano::async::sleep_for (100ms); + } +} + +auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket, connection_type type) -> accept_return { auto const remote_endpoint = raw_socket.remote_endpoint (); auto const local_endpoint = raw_socket.local_endpoint (); - if (auto result = check_limits (remote_endpoint.address ()); result != accept_result::accepted) + nano::unique_lock lock{ mutex }; + + if (auto result = check_limits (remote_endpoint.address (), type); result != accept_result::accepted) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_limits_exceeded, nano::stat::dir::in); - // Refusal reason should be logged earlier + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_rejected, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Rejected connection from: {} ({})", fmt::streamed (remote_endpoint), to_string (type)); + // Rejection reason should be logged earlier try { @@ -236,23 +387,22 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket } catch (boost::system::system_error const & ex) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {}", ex.what ()); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::close_error, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Error while closing socket after refusing connection: {} ({})", ex.what (), to_string (type)); } - return result; + return { result }; } - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Accepted incoming connection from: {}", fmt::streamed (remote_endpoint)); + stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Accepted connection: {} ({})", fmt::streamed (remote_endpoint), to_string (type)); - auto socket = std::make_shared (node, std::move (raw_socket), remote_endpoint, local_endpoint, socket_endpoint::server); + auto socket = std::make_shared (node, std::move (raw_socket), remote_endpoint, local_endpoint, to_socket_endpoint (type)); auto server = std::make_shared (socket, node.shared (), true); - { - nano::lock_guard lock{ mutex }; - connections.emplace (entry{ remote_endpoint, socket, server }); - } + connections.emplace_back (connection{ remote_endpoint, socket, server }); + + lock.unlock (); socket->set_timeout (node.network_params.network.idle_timeout); socket->start (); @@ -260,49 +410,37 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket connection_accepted.notify (socket, server); - return accept_result::accepted; + return { accept_result::accepted, socket, server }; } -asio::awaitable nano::transport::tcp_listener::wait_available_slots () const +auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) -> accept_result { - nano::interval log_interval; - while (connection_count () >= max_inbound_connections && !stopped) - { - if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s)) - { - logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})", - connection_count (), max_inbound_connections); - } + debug_assert (!mutex.try_lock ()); - co_await nano::async::sleep_for (100ms); + if (stopped) + { + return accept_result::rejected; } -} - -auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip) -> accept_result -{ - nano::lock_guard lock{ mutex }; cleanup (); - debug_assert (connections.size () <= max_inbound_connections); // Should be checked earlier (wait_available_slots) - if (node.network.excluded_peers.check (ip)) // true => error { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::excluded, nano::stat::dir::in); + stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded, to_stat_dir (type)); logger.debug (nano::log::type::tcp_listener, "Rejected connection from excluded peer: {}", ip.to_string ()); - return accept_result::excluded; + return accept_result::rejected; } if (!node.flags.disable_max_peers_per_ip) { if (auto count = count_per_ip (ip); count >= node.network_params.network.max_peers_per_ip) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_ip, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached (ip: {}, count: {}), unable to open new connection", - ip.to_string (), count); + stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_ip, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection: {}", + count, ip.to_string ()); - return accept_result::too_many_per_ip; + return accept_result::rejected; } } @@ -311,11 +449,36 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip) { if (auto count = count_per_subnetwork (ip); count >= node.network_params.network.max_peers_per_subnetwork) { - stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::max_per_subnetwork, nano::stat::dir::in); - logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (ip: {}, count: {}), unable to open new connection", - ip.to_string (), count); + stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_per_subnetwork, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached ({}), unable to open new connection: {}", + count, ip.to_string ()); + + return accept_result::rejected; + } + } + + if (type == connection_type::inbound) + { + debug_assert (connections.size () <= config.max_inbound_connections); // Should be checked earlier (wait_available_slots) + + if (auto count = count_per_type (connection_type::inbound); count >= config.max_inbound_connections) + { + stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max inbound connections reached ({}), unable to accept new connection: {}", + count, ip.to_string ()); + + return accept_result::rejected; + } + } + if (type == connection_type::outbound) + { + if (auto count = count_per_type (connection_type::outbound); count >= config.max_outbound_connections) + { + stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::max_attempts, to_stat_dir (type)); + logger.debug (nano::log::type::tcp_listener, "Max outbound connections reached ({}), unable to initiate new connection: {}", + count, ip.to_string ()); - return accept_result::too_many_per_subnetwork; + return accept_result::rejected; } } @@ -328,6 +491,12 @@ size_t nano::transport::tcp_listener::connection_count () const return connections.size (); } +size_t nano::transport::tcp_listener::attempt_count () const +{ + nano::lock_guard lock{ mutex }; + return attempts.size (); +} + size_t nano::transport::tcp_listener::realtime_count () const { nano::lock_guard lock{ mutex }; @@ -354,6 +523,19 @@ size_t nano::transport::tcp_listener::bootstrap_count () const }); } +size_t nano::transport::tcp_listener::count_per_type (connection_type type) const +{ + debug_assert (!mutex.try_lock ()); + + return std::count_if (connections.begin (), connections.end (), [type] (auto const & connection) { + if (auto socket = connection.socket.lock ()) + { + return socket->endpoint_type () == to_socket_endpoint (type); + } + return false; + }); +} + size_t nano::transport::tcp_listener::count_per_ip (asio::ip::address const & ip) const { debug_assert (!mutex.try_lock ()); @@ -372,15 +554,74 @@ size_t nano::transport::tcp_listener::count_per_subnetwork (asio::ip::address co }); } +size_t nano::transport::tcp_listener::count_attempts (asio::ip::address const & ip) const +{ + debug_assert (!mutex.try_lock ()); + + return std::count_if (attempts.begin (), attempts.end (), [&ip] (auto const & attempt) { + return nano::transport::is_same_ip (attempt.address (), ip); + }); +} + asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const { nano::lock_guard lock{ mutex }; return { asio::ip::address_v6::loopback (), local.port () }; } +auto nano::transport::tcp_listener::sockets () const -> std::vector> +{ + nano::lock_guard lock{ mutex }; + auto r = connections + | std::views::transform ([] (auto const & connection) { return connection.socket.lock (); }) + | std::views::filter ([] (auto const & socket) { return socket != nullptr; }); + return { r.begin (), r.end () }; +} + +auto nano::transport::tcp_listener::servers () const -> std::vector> +{ + nano::lock_guard lock{ mutex }; + auto r = connections + | std::views::transform ([] (auto const & connection) { return connection.server.lock (); }) + | std::views::filter ([] (auto const & server) { return server != nullptr; }); + return { r.begin (), r.end () }; +} + std::unique_ptr nano::transport::tcp_listener::collect_container_info (std::string const & name) { auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) })); + composite->add_component (std::make_unique (container_info{ "attempts", attempt_count (), sizeof (decltype (attempts)::value_type) })); return composite; -} \ No newline at end of file +} + +nano::stat::dir nano::transport::tcp_listener::to_stat_dir (connection_type type) +{ + switch (type) + { + case connection_type::inbound: + return nano::stat::dir::in; + case connection_type::outbound: + return nano::stat::dir::out; + } + debug_assert (false); + return {}; +} + +std::string_view nano::transport::tcp_listener::to_string (connection_type type) +{ + return magic_enum::enum_name (type); +} + +nano::transport::socket_endpoint nano::transport::tcp_listener::to_socket_endpoint (connection_type type) +{ + switch (type) + { + case connection_type::inbound: + return socket_endpoint::server; + case connection_type::outbound: + return socket_endpoint::client; + } + debug_assert (false); + return {}; +} diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp index 80a825aa10..371399137a 100644 --- a/nano/node/transport/tcp_listener.hpp +++ b/nano/node/transport/tcp_listener.hpp @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include @@ -9,42 +11,71 @@ #include #include +#include #include +#include #include #include namespace mi = boost::multi_index; namespace asio = boost::asio; -namespace nano -{ -class node; -class stats; -class logger; -} - namespace nano::transport { class socket; class tcp_server; +class tcp_config +{ +public: + explicit tcp_config (nano::network_constants const & network) + { + if (network.is_dev_network ()) + { + max_inbound_connections = 128; + max_outbound_connections = 128; + max_attempts = 128; + max_attempts_per_ip = 128; + connect_timeout = std::chrono::seconds{ 5 }; + } + } + +public: + size_t max_inbound_connections{ 2048 }; + size_t max_outbound_connections{ 2048 }; + size_t max_attempts{ 60 }; + size_t max_attempts_per_ip{ 1 }; + std::chrono::seconds connect_timeout{ 60 }; +}; + /** * Server side portion of tcp sessions. Listens for new socket connections and spawns tcp_server objects when connected. */ class tcp_listener final { public: - tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections); + tcp_listener (uint16_t port, tcp_config const &, nano::node &); ~tcp_listener (); void start (); void stop (); + /** + * @param port is optional, if 0 then default peering port is used + * @return true if connection attempt was initiated + */ + bool connect (asio::ip::address ip, uint16_t port = 0); + nano::tcp_endpoint endpoint () const; + size_t connection_count () const; + size_t attempt_count () const; size_t realtime_count () const; size_t bootstrap_count () const; + std::vector> sockets () const; + std::vector> servers () const; + std::unique_ptr collect_container_info (std::string const & name); public: // Events @@ -52,6 +83,7 @@ class tcp_listener final connection_accepted_event_t connection_accepted; private: // Dependencies + tcp_config const & config; nano::node & node; nano::stats & stats; nano::logger & logger; @@ -62,25 +94,43 @@ class tcp_listener final void run_cleanup (); void cleanup (); + void timeout (); enum class accept_result { invalid, accepted, - too_many_per_ip, - too_many_per_subnetwork, - excluded, + rejected, + error, + }; + + enum class connection_type + { + inbound, + outbound, }; - accept_result accept_one (asio::ip::tcp::socket); - accept_result check_limits (asio::ip::address const & ip); + asio::awaitable connect_impl (asio::ip::tcp::endpoint); + asio::awaitable connect_socket (asio::ip::tcp::endpoint); + + struct accept_return + { + accept_result result; + std::shared_ptr socket; + std::shared_ptr server; + }; + + accept_return accept_one (asio::ip::tcp::socket, connection_type); + accept_result check_limits (asio::ip::address const & ip, connection_type); asio::awaitable accept_socket (); + size_t count_per_type (connection_type) const; size_t count_per_ip (asio::ip::address const & ip) const; size_t count_per_subnetwork (asio::ip::address const & ip) const; + size_t count_attempts (asio::ip::address const & ip) const; private: - struct entry + struct connection { asio::ip::tcp::endpoint endpoint; std::weak_ptr socket; @@ -92,20 +142,24 @@ class tcp_listener final } }; + struct attempt + { + asio::ip::tcp::endpoint endpoint; + nano::async::task task; + + std::chrono::steady_clock::time_point const start{ std::chrono::steady_clock::now () }; + + asio::ip::address address () const + { + return endpoint.address (); + } + }; + private: uint16_t const port; - std::size_t const max_inbound_connections; - - // clang-format off - class tag_address {}; - using ordered_connections = boost::multi_index_container, - mi::const_mem_fun> - >>; - // clang-format on - ordered_connections connections; + std::list connections; + std::list attempts; nano::async::strand strand; @@ -117,5 +171,10 @@ class tcp_listener final mutable nano::mutex mutex; nano::async::task task; std::thread cleanup_thread; + +private: + static nano::stat::dir to_stat_dir (connection_type); + static std::string_view to_string (connection_type); + static nano::transport::socket_endpoint to_socket_endpoint (connection_type); }; } \ No newline at end of file diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index a0dc394de4..ab9ce23693 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -38,17 +38,6 @@ nano::transport::tcp_server::~tcp_server () node->logger.debug (nano::log::type::tcp_server, "Exiting server: {}", fmt::streamed (remote_endpoint)); - if (socket->type () == nano::transport::socket_type::realtime) - { - // Clear temporary channel - auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint)); - if (exisiting_response_channel != nullptr) - { - exisiting_response_channel->temporary = false; - node->network.tcp_channels.erase (remote_endpoint); - } - } - stop (); } @@ -260,7 +249,11 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr { return; } - node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket }); + + release_assert (channel != nullptr); + + channel->set_last_packet_received (std::chrono::steady_clock::now ()); + node->network.tcp_channels.queue_message (std::move (message), channel); } auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status @@ -296,7 +289,9 @@ auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake con handshake_received = true; node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - node->logger.debug (nano::log::type::tcp_server, "Handshake message received ({})", fmt::streamed (remote_endpoint)); + node->logger.debug (nano::log::type::tcp_server, "Handshake message received: {} ({})", + message.query ? (message.response ? "query + response" : "query") : (message.response ? "response" : "none"), + fmt::streamed (remote_endpoint)); if (message.query) { @@ -333,6 +328,42 @@ auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake con return handshake_status::handshake; // Handshake is in progress } +void nano::transport::tcp_server::initiate_handshake () +{ + auto node = this->node.lock (); + if (!node) + { + return; + } + + auto query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); + nano::node_id_handshake message{ node->network_params.network, query }; + + node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", fmt::streamed (remote_endpoint)); + + auto shared_const_buffer = message.to_shared_const_buffer (); + socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { + auto node = this_l->node.lock (); + if (!node) + { + return; + } + if (ec) + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error); + node->logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint)); + + // Stop invalid handshake + this_l->stop (); + } + else + { + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out); + node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_initiate, nano::stat::dir::out); + } + }); +} + void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) { auto node = this->node.lock (); @@ -634,7 +665,13 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const & return false; } - remote_node_id = node_id; + auto channel_l = node->network.tcp_channels.create (socket, shared_from_this (), node_id); + if (!channel_l) + { + return false; + } + channel = channel_l; + socket->type_set (nano::transport::socket_type::realtime); node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", fmt::streamed (remote_endpoint)); diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index fdd28f1879..0ce95cdefe 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -25,6 +25,7 @@ class tcp_server final : public std::enable_shared_from_this void start (); void stop (); + void initiate_handshake (); void timeout (); void set_last_keepalive (nano::keepalive const & message); std::optional pop_last_keepalive (); @@ -36,7 +37,6 @@ class tcp_server final : public std::enable_shared_from_this std::atomic handshake_received{ false }; // Remote enpoint used to remove response channel even after socket closing nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; - nano::account remote_node_id{}; std::chrono::steady_clock::time_point last_telemetry_req{}; private: @@ -74,6 +74,9 @@ class tcp_server final : public std::enable_shared_from_this std::shared_ptr message_deserializer; std::optional last_keepalive; + // Every realtime connection must have an associated channel + std::shared_ptr channel; + private: // Visitors class handshake_message_visitor : public nano::message_visitor { diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index d8a8cebc2b..3032daf5de 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -1710,23 +1710,28 @@ TEST (rpc, keepalive) TEST (rpc, peers) { nano::test::system system; - auto node = add_ipc_enabled_node (system); + // Add node2 first to avoid peers with ephemeral ports auto const node2 = system.add_node (); + auto node = add_ipc_enabled_node (system); auto const rpc_ctx = add_rpc (system, node); boost::property_tree::ptree request; request.put ("action", "peers"); auto response (wait_response (system, rpc_ctx, request)); auto & peers_node (response.get_child ("peers")); ASSERT_EQ (1, peers_node.size ()); - ASSERT_EQ (std::to_string (node->network_params.network.protocol_version), peers_node.get ((boost::format ("[::1]:%1%") % node2->network.endpoint ().port ()).str ())); + + auto peer = peers_node.begin (); + ASSERT_EQ (peer->first, boost::lexical_cast (node2->network.endpoint ())); + ASSERT_EQ (std::to_string (node->network_params.network.protocol_version), peers_node.get (peer->first)); // The previous version of this test had an UDP connection to an arbitrary IP address, so it could check for two peers. This doesn't work with TCP. } TEST (rpc, peers_node_id) { nano::test::system system; - auto node = add_ipc_enabled_node (system); + // Add node2 first to avoid peers with ephemeral ports auto const node2 = system.add_node (); + auto node = add_ipc_enabled_node (system); auto const rpc_ctx = add_rpc (system, node); boost::property_tree::ptree request; request.put ("action", "peers"); @@ -1734,12 +1739,35 @@ TEST (rpc, peers_node_id) auto response (wait_response (system, rpc_ctx, request)); auto & peers_node (response.get_child ("peers")); ASSERT_EQ (1, peers_node.size ()); - auto tree1 (peers_node.get_child ((boost::format ("[::1]:%1%") % node2->network.endpoint ().port ()).str ())); + + auto peer = peers_node.begin (); + ASSERT_EQ (peer->first, boost::lexical_cast (node2->network.endpoint ())); + + auto tree1 = peer->second; ASSERT_EQ (std::to_string (node->network_params.network.protocol_version), tree1.get ("protocol_version")); - ASSERT_EQ (system.nodes[1]->node_id.pub.to_node_id (), tree1.get ("node_id")); + ASSERT_EQ (node2->node_id.pub.to_node_id (), tree1.get ("node_id")); // The previous version of this test had an UDP connection to an arbitrary IP address, so it could check for two peers. This doesn't work with TCP. } +TEST (rpc, peers_peering_endpoint) +{ + nano::test::system system; + // Add node first, so that node2 will connect to node from ephemeral port + auto node = add_ipc_enabled_node (system); + auto const node2 = system.add_node (); + auto const rpc_ctx = add_rpc (system, node); + boost::property_tree::ptree request; + request.put ("action", "peers"); + request.put ("peer_details", true); + auto response (wait_response (system, rpc_ctx, request)); + auto & peers_node (response.get_child ("peers")); + ASSERT_EQ (1, peers_node.size ()); + + auto peer = peers_node.begin (); + ASSERT_NE (peer->first, boost::lexical_cast (node2->network.endpoint ())); + ASSERT_EQ (peer->second.get ("peering"), boost::lexical_cast (node2->network.endpoint ())); +} + TEST (rpc, version) { nano::test::system system;