Skip to content

Commit

Permalink
Merge pull request #4595 from pwojcikdev/networking-fixes/connecting-…
Browse files Browse the repository at this point in the history
…4 "Remove temporary channels"

Remove temporary channels
  • Loading branch information
pwojcikdev authored May 4, 2024
2 parents fd3a5ab + 28af014 commit 958c6ea
Show file tree
Hide file tree
Showing 29 changed files with 799 additions and 610 deletions.
2 changes: 1 addition & 1 deletion nano/core_test/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ TEST (bootstrap_processor, process_none)
auto node1 = system.make_disconnected_node ();

bool done = false;
node0->observers.socket_accepted.add ([&] (nano::transport::socket & socket) {
node0->observers.socket_connected.add ([&] (nano::transport::socket & socket) {
done = true;
});

Expand Down
134 changes: 48 additions & 86 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,9 +853,9 @@ TEST (network, tcp_no_accept_excluded_peers)
{
node0->network.excluded_peers.add (endpoint1_tcp);
}
ASSERT_EQ (0, node0->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::excluded));
ASSERT_EQ (0, node0->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded));
node1->network.merge_peer (node0->network.endpoint ());
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::excluded) >= 1);
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded) >= 1);
ASSERT_EQ (nullptr, node0->network.find_node_id (node1->get_node_id ()));

// Should not actively reachout to excluded peers
Expand All @@ -878,12 +878,12 @@ TEST (network, tcp_no_accept_excluded_peers)
ASSERT_TIMELY_EQ (5s, node0->network.size (), 1);
}

/*
namespace nano
{
TEST (network, tcp_message_manager)
{
nano::tcp_message_manager manager (1);
nano::tcp_message_item item;
nano::transport::tcp_message_manager manager (1);
item.node_id = nano::account (100);
ASSERT_EQ (0, manager.entries.size ());
manager.put_message (item);
Expand Down Expand Up @@ -938,6 +938,7 @@ TEST (network, tcp_message_manager)
}
}
}
*/

TEST (network, cleanup_purge)
{
Expand Down Expand Up @@ -1026,9 +1027,14 @@ TEST (network, filter_invalid_version_using)
TEST (network, fill_keepalive_self)
{
nano::test::system system{ 2 };
std::array<nano::endpoint, 8> target;
system.nodes[0]->network.fill_keepalive_self (target);
ASSERT_EQ (target[2].port (), system.nodes[1]->network.port);

auto get_keepalive = [&system] (nano::node & node) {
std::array<nano::endpoint, 8> target;
node.network.fill_keepalive_self (target);
return target;
};

ASSERT_TIMELY_EQ (5s, get_keepalive (system.node (0))[2].port (), system.nodes[1]->network.port);
}

TEST (network, reconnect_cached)
Expand Down Expand Up @@ -1085,9 +1091,9 @@ TEST (network, reconnect_cached)
/*
* Tests that channel and channel container removes channels with dead local sockets
*/
TEST (network, purge_dead_channel_outgoing)
TEST (network, purge_dead_channel)
{
nano::test::system system{};
nano::test::system system;

nano::node_flags flags;
// Disable non realtime sockets
Expand All @@ -1100,66 +1106,44 @@ TEST (network, purge_dead_channel_outgoing)

auto & node1 = *system.add_node (flags);

// We expect one incoming and one outgoing connection
std::shared_ptr<nano::transport::socket> outgoing;
std::shared_ptr<nano::transport::socket> incoming;

std::atomic<int> connected_count{ 0 };
node1.observers.socket_connected.add ([&] (nano::transport::socket & socket) {
connected_count++;
outgoing = socket.shared_from_this ();

std::cout << "connected: " << socket.remote_endpoint () << std::endl;
});

std::atomic<int> accepted_count{ 0 };
node1.observers.socket_accepted.add ([&] (nano::transport::socket & socket) {
accepted_count++;
incoming = socket.shared_from_this ();

std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
node1.observers.socket_connected.add ([&] (nano::transport::socket & sock) {
system.logger.debug (nano::log::type::test, "Connected: {}", sock);
});

auto & node2 = *system.add_node (flags);

ASSERT_TIMELY_EQ (5s, connected_count, 1);
ASSERT_ALWAYS_EQ (1s, connected_count, 1);

ASSERT_TIMELY_EQ (5s, accepted_count, 1);
ASSERT_ALWAYS_EQ (1s, accepted_count, 1);

ASSERT_EQ (node1.network.size (), 1);
ASSERT_ALWAYS_EQ (1s, node1.network.size (), 1);
ASSERT_ALWAYS_EQ (500ms, node1.network.size (), 1);

// Store reference to the only channel
auto channels = node1.network.list ();
ASSERT_EQ (channels.size (), 1);
auto channel = channels.front ();
ASSERT_TRUE (channel);

auto sockets = node1.tcp_listener.sockets ();
ASSERT_EQ (sockets.size (), 1);
auto socket = sockets.front ();
ASSERT_TRUE (socket);

// When socket is dead ensure channel knows about that
ASSERT_TRUE (channel->alive ());
outgoing->close ();
ASSERT_TIMELY (5s, !channel->alive ());

// Shortly after that a new channel should be established
ASSERT_TIMELY_EQ (5s, connected_count, 2);
ASSERT_ALWAYS_EQ (1s, connected_count, 2);
socket->close ();
ASSERT_TIMELY (10s, !channel->alive ());

// Check that a new channel is healthy
auto channels2 = node1.network.list ();
ASSERT_EQ (channels2.size (), 1);
auto channel2 = channels2.front ();
ASSERT_TRUE (channel2);
ASSERT_TRUE (channel2->alive ());
auto channel_exists = [] (auto & node, auto & channel) {
auto channels = node.network.list ();
return std::find (channels.begin (), channels.end (), channel) != channels.end ();
};
ASSERT_TIMELY (5s, !channel_exists (node1, channel));
}

/*
* Tests that channel and channel container removes channels with dead remote sockets
*/
TEST (network, purge_dead_channel_incoming)
TEST (network, purge_dead_channel_remote)
{
nano::test::system system{};
nano::test::system system;

nano::node_flags flags;
// Disable non realtime sockets
Expand All @@ -1171,57 +1155,35 @@ TEST (network, purge_dead_channel_incoming)
flags.disable_wallet_bootstrap = true;

auto & node1 = *system.add_node (flags);

// We expect one incoming and one outgoing connection
std::shared_ptr<nano::transport::socket> outgoing;
std::shared_ptr<nano::transport::socket> incoming;

std::atomic<int> connected_count{ 0 };
node1.observers.socket_connected.add ([&] (nano::transport::socket & socket) {
connected_count++;
outgoing = socket.shared_from_this ();

std::cout << "connected: " << socket.remote_endpoint () << std::endl;
});

std::atomic<int> accepted_count{ 0 };
node1.observers.socket_accepted.add ([&] (nano::transport::socket & socket) {
accepted_count++;
incoming = socket.shared_from_this ();

std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
});

auto & node2 = *system.add_node (flags);

ASSERT_TIMELY_EQ (5s, connected_count, 1);
ASSERT_ALWAYS_EQ (1s, connected_count, 1);

ASSERT_TIMELY_EQ (5s, accepted_count, 1);
ASSERT_ALWAYS_EQ (1s, accepted_count, 1);
node2.observers.socket_connected.add ([&] (nano::transport::socket & sock) {
system.logger.debug (nano::log::type::test, "Connected: {}", sock);
});

ASSERT_EQ (node1.network.size (), 1);
ASSERT_EQ (node2.network.size (), 1);
ASSERT_ALWAYS_EQ (1s, node2.network.size (), 1);
ASSERT_ALWAYS_EQ (500ms, std::min (node1.network.size (), node2.network.size ()), 1);

// Store reference to the only channel
auto channels = node2.network.list ();
ASSERT_EQ (channels.size (), 1);
auto channel = channels.front ();
ASSERT_TRUE (channel);

auto sockets = node1.tcp_listener.sockets ();
ASSERT_EQ (sockets.size (), 1);
auto socket = sockets.front ();
ASSERT_TRUE (socket);

// When remote socket is dead ensure channel knows about that
ASSERT_TRUE (channel->alive ());
incoming->close ();
socket->close ();
ASSERT_TIMELY (5s, !channel->alive ());

// Shortly after that a new channel should be established
ASSERT_TIMELY_EQ (5s, accepted_count, 2);
ASSERT_ALWAYS_EQ (1s, accepted_count, 2);

// Check that a new channel is healthy
auto channels2 = node2.network.list ();
ASSERT_EQ (channels2.size (), 1);
auto channel2 = channels2.front ();
ASSERT_TRUE (channel2);
ASSERT_TRUE (channel2->alive ());
auto channel_exists = [] (auto & node, auto & channel) {
auto channels = node.network.list ();
return std::find (channels.begin (), channels.end (), channel) != channels.end ();
};
ASSERT_TIMELY (5s, !channel_exists (node2, channel));
}
8 changes: 6 additions & 2 deletions nano/core_test/peer_history.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ TEST (peer_history, erase_old)
{
nano::test::system system;

auto & node1 = *system.add_node ();
auto & node2 = *system.add_node ();
auto node_config = system.default_config ();
node_config.peer_history.erase_cutoff = 1s;

auto & node1 = *system.add_node (node_config);
auto & node2 = *system.add_node (node_config);

ASSERT_TIMELY (5s, node1.peer_history.exists (node2.network.endpoint ()));
ASSERT_TIMELY (5s, node2.peer_history.exists (node1.network.endpoint ()));
Expand All @@ -41,6 +44,7 @@ TEST (peer_history, erase_old)
ASSERT_EQ (cached1.size (), 1);
ASSERT_EQ (cached1[0], node2_endpoint);

ASSERT_TIMELY_EQ (5s, node1.network.size (), 0);
ASSERT_TIMELY (5s, !node1.peer_history.exists (node2_endpoint));

auto cached2 = node1.peer_history.peers ();
Expand Down
11 changes: 6 additions & 5 deletions nano/core_test/rep_crawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ TEST (rep_crawler, rep_weight)
ASSERT_TRUE (nano::test::process (node2, { block1, block2, block3, block4 }));
ASSERT_TRUE (nano::test::process (node3, { block1, block2, block3, block4 }));
ASSERT_TRUE (node.rep_crawler.representatives (1).empty ());
std::shared_ptr<nano::transport::channel> channel1 = nano::test::establish_tcp (system, node, node1.network.endpoint ());

ASSERT_TIMELY (5s, node.network.size () == 3);
auto channel1 = node.network.find_node_id (node1.node_id.pub);
auto channel2 = node.network.find_node_id (node2.node_id.pub);
auto channel3 = node.network.find_node_id (node3.node_id.pub);
ASSERT_NE (nullptr, channel1);
std::shared_ptr<nano::transport::channel> channel2 = nano::test::establish_tcp (system, node, node2.network.endpoint ());
ASSERT_NE (nullptr, channel2);
std::shared_ptr<nano::transport::channel> channel3 = nano::test::establish_tcp (system, node, node3.network.endpoint ());
ASSERT_NE (nullptr, channel3);

auto vote0 = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
auto vote1 = std::make_shared<nano::vote> (keypair1.pub, keypair1.prv, 0, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
auto vote2 = std::make_shared<nano::vote> (keypair2.pub, keypair2.prv, 0, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
Expand Down Expand Up @@ -227,8 +230,6 @@ TEST (rep_crawler, rep_remove)
reps = searching_node.rep_crawler.representatives (1);
ASSERT_EQ (nano::dev::genesis_key.pub, reps[0].account);
ASSERT_TIMELY_EQ (5s, searching_node.network.size (), 1);
auto list (searching_node.network.list (1));
ASSERT_EQ (node_genesis_rep->network.endpoint (), list[0]->get_endpoint ());
}

TEST (rep_crawler, rep_connection_close)
Expand Down
Loading

0 comments on commit 958c6ea

Please sign in to comment.