diff --git a/nano/core_test/request_aggregator.cpp b/nano/core_test/request_aggregator.cpp index 0e28586de8..f5db2ddd46 100644 --- a/nano/core_test/request_aggregator.cpp +++ b/nano/core_test/request_aggregator.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -9,12 +10,17 @@ #include #include #include +#include #include #include #include #include +#include +#include +#include + using namespace std::chrono_literals; TEST (request_aggregator, one) @@ -450,3 +456,280 @@ TEST (request_aggregator, cannot_vote) ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); ASSERT_TIMELY (3s, 1 <= node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } + +namespace +{ +std::future observe_confirm_ack (std::shared_ptr const & channel) +{ + std::promise promise; + auto future = promise.get_future (); + + struct confirm_ack_visitor : public nano::message_visitor + { + std::optional result; + + void confirm_ack (nano::confirm_ack const & msg) override + { + result = msg; + } + }; + + channel->observers.clear (); + channel->observers.add (nano::wrap_move_only ([&, promise = std::move (promise)] (nano::message const & message, nano::transport::traffic_type const & type) mutable { + confirm_ack_visitor visitor{}; + message.visit (visitor); + if (visitor.result) + { + promise.set_value (visitor.result.value ()); + } + })); + + return future; +} +} + +/* + * Request for a forked open block should return vote for the correct fork alternative + */ +TEST (request_aggregator, forked_open) +{ + nano::test::system system; + auto & node = *system.add_node (); + + // Voting needs a rep key set up on the node + system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); + + // Setup two forks of the open block + nano::keypair key; + nano::block_builder builder; + auto send0 = builder.send () + .previous (nano::dev::genesis->hash ()) + .destination (key.pub) + .balance (nano::dev::constants.genesis_amount - 500) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build (); + auto open0 = builder.open () + .source (send0->hash ()) + .representative (1) + .account (key.pub) + .sign (key.prv, key.pub) + .work (*system.work.generate (key.pub)) + .build (); + auto open1 = builder.open () + .source (send0->hash ()) + .representative (2) + .account (key.pub) + .sign (key.prv, key.pub) + .work (*system.work.generate (key.pub)) + .build (); + + nano::test::process (node, { send0, open0 }); + nano::test::confirm (node, { open0 }); + + auto channel = nano::test::test_channel (node); + auto future = observe_confirm_ack (channel); + + // Request vote for the wrong fork + std::vector> request{ { open1->hash (), open1->root () } }; + ASSERT_TRUE (node.aggregator.request (request, channel)); + + ASSERT_EQ (future.wait_for (5s), std::future_status::ready); + + auto ack = future.get (); + ASSERT_EQ (ack.vote->hashes.size (), 1); + ASSERT_EQ (ack.vote->hashes[0], open0->hash ()); // Vote for the correct fork alternative + ASSERT_EQ (ack.vote->account, nano::dev::genesis_key.pub); +} + +/* + * Request for a conflicting epoch block should return vote for the correct alternative + */ +TEST (request_aggregator, epoch_conflict) +{ + nano::test::system system; + auto & node = *system.add_node (); + + // Voting needs a rep key set up on the node + system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); + + // Setup the initial chain and the conflicting blocks + nano::keypair key; + nano::keypair epoch_signer (nano::dev::genesis_key); + nano::state_block_builder builder; + + // Create initial chain: send -> open -> change + auto send = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - 1) + .link (key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build (); + + auto open = builder.make_block () + .account (key.pub) + .previous (0) + .representative (key.pub) + .balance (1) + .link (send->hash ()) + .sign (key.prv, key.pub) + .work (*system.work.generate (key.pub)) + .build (); + + // Change block root is the open block hash, qualified root: {open, open} + auto change = builder.make_block () + .account (key.pub) + .previous (open->hash ()) + .representative (key.pub) + .balance (1) + .link (0) + .sign (key.prv, key.pub) + .work (*system.work.generate (open->hash ())) + .build (); + + // Pending entry is needed first to process the epoch open block + auto pending = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (send->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - 2) + .link (change->root ().as_account ()) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (send->hash ())) + .build (); + + // Create conflicting epoch block with the same root as the change block, qualified root: {open, 0} + // This block is intentionally not processed immediately so the node doesn't know about it + auto epoch_open = builder.make_block () + .account (change->root ().as_account ()) + .previous (0) + .representative (0) + .balance (0) + .link (node.ledger.epoch_link (nano::epoch::epoch_1)) + .sign (epoch_signer.prv, epoch_signer.pub) + .work (*system.work.generate (open->hash ())) + .build (); + + // Process and confirm the initial chain with the change block + nano::test::process (node, { send, open, change }); + nano::test::confirm (node, { change }); + ASSERT_TIMELY (5s, node.block_confirmed (change->hash ())); + + auto channel = nano::test::test_channel (node); + + // Request vote for the conflicting epoch block + std::vector> request{ { epoch_open->hash (), epoch_open->root () } }; + auto future1 = observe_confirm_ack (channel); + ASSERT_TRUE (node.aggregator.request (request, channel)); + + ASSERT_EQ (future1.wait_for (5s), std::future_status::ready); + + auto ack1 = future1.get (); + ASSERT_EQ (ack1.vote->hashes.size (), 1); + ASSERT_EQ (ack1.vote->hashes[0], change->hash ()); // Vote for the correct alternative (change block) + ASSERT_EQ (ack1.vote->account, nano::dev::genesis_key.pub); + + // Process the conflicting epoch block + nano::test::process (node, { pending, epoch_open }); + nano::test::confirm (node, { pending, epoch_open }); + + // Workaround for vote spacing dropping requests with the same root + // FIXME: Vote spacing should use full qualified root + WAIT (1s); + + // Request vote for the conflicting epoch block again + auto future2 = observe_confirm_ack (channel); + ASSERT_TRUE (node.aggregator.request (request, channel)); + + ASSERT_EQ (future2.wait_for (5s), std::future_status::ready); + + auto ack2 = future2.get (); + ASSERT_EQ (ack2.vote->hashes.size (), 1); + ASSERT_EQ (ack2.vote->hashes[0], epoch_open->hash ()); // Vote for the epoch block + ASSERT_EQ (ack2.vote->account, nano::dev::genesis_key.pub); +} + +/* + * Request for multiple cemented blocks in a chain should generate votes regardless of vote spacing + */ +TEST (request_aggregator, cemented_no_spacing) +{ + nano::test::system system; + auto & node = *system.add_node (); + + // Voting needs a rep key set up on the node + system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); + + // Create a chain of 3 blocks: send1 -> send2 -> send3 + nano::state_block_builder builder; + auto send1 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - 1) + .link (nano::dev::genesis_key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (nano::dev::genesis->hash ())) + .build (); + + auto send2 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (send1->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - 2) + .link (nano::dev::genesis_key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (send1->hash ())) + .build (); + + auto send3 = builder.make_block () + .account (nano::dev::genesis_key.pub) + .previous (send2->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - 3) + .link (nano::dev::genesis_key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (send2->hash ())) + .build (); + + // Process and confirm all blocks in the chain + nano::test::process (node, { send1, send2, send3 }); + nano::test::confirm (node, { send1, send2, send3 }); + ASSERT_TRUE (node.block_confirmed (send3->hash ())); + + auto channel = nano::test::test_channel (node); + + // Request votes for blocks at different positions in the chain + std::vector> request{ + { send1->hash (), send1->root () }, + { send2->hash (), send2->root () }, + { send3->hash (), send3->root () } + }; + + // Request votes for all blocks + auto future = observe_confirm_ack (channel); + ASSERT_TRUE (node.aggregator.request (request, channel)); + + // Wait for the votes + ASSERT_EQ (future.wait_for (5s), std::future_status::ready); + auto ack = future.get (); + + // Verify we got votes for all blocks in the chain + ASSERT_EQ (ack.vote->hashes.size (), 3); + ASSERT_EQ (ack.vote->account, nano::dev::genesis_key.pub); + + // Verify individual vote properties + std::set voted_hashes; + for (auto const & hash : ack.vote->hashes) + { + voted_hashes.insert (hash); + } + + // Verify we got votes for all three blocks + ASSERT_TRUE (voted_hashes.find (send1->hash ()) != voted_hashes.end ()); + ASSERT_TRUE (voted_hashes.find (send2->hash ()) != voted_hashes.end ()); + ASSERT_TRUE (voted_hashes.find (send3->hash ()) != voted_hashes.end ()); +} \ No newline at end of file diff --git a/nano/lib/function.hpp b/nano/lib/function.hpp new file mode 100644 index 0000000000..a678298185 --- /dev/null +++ b/nano/lib/function.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include +#include +#include + +namespace nano +{ +// TODO: Replace with std::move_only_function in C++23 +template +auto wrap_move_only (F && f) +{ + using fn_type = decltype (std::function{ std::declval () }); + auto ptr = std::make_shared> (std::forward (f)); + return fn_type ([ptr] (auto &&... args) { + return (*ptr) (std::forward (args)...); + }); +} +} \ No newline at end of file diff --git a/nano/lib/observer_set.hpp b/nano/lib/observer_set.hpp index 5630515fa6..b0048bdb2b 100644 --- a/nano/lib/observer_set.hpp +++ b/nano/lib/observer_set.hpp @@ -46,6 +46,12 @@ class observer_set final return observers.size (); } + void clear () + { + nano::lock_guard lock{ mutex }; + observers.clear (); + } + nano::container_info container_info () const { nano::unique_lock lock{ mutex }; diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 77eb01e82a..396fe33cd1 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -174,6 +174,8 @@ add_library( transport/tcp_server.cpp transport/tcp_socket.hpp transport/tcp_socket.cpp + transport/test_channel.hpp + transport/test_channel.cpp transport/traffic_type.hpp transport/traffic_type.cpp transport/transport.hpp diff --git a/nano/node/transport/channel.cpp b/nano/node/transport/channel.cpp index ea470f2ec1..ff80fdb72e 100644 --- a/nano/node/transport/channel.cpp +++ b/nano/node/transport/channel.cpp @@ -16,8 +16,7 @@ nano::transport::channel::channel (nano::node & node_a) : bool nano::transport::channel::send (nano::message const & message, nano::transport::traffic_type traffic_type, callback_t callback) { - auto buffer = message.to_shared_const_buffer (); - bool sent = send_buffer (buffer, traffic_type, std::move (callback)); + bool sent = send_impl (message, traffic_type, std::move (callback)); node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true); return sent; } diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index 5911f1249c..cafb830bc2 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -120,7 +120,7 @@ class channel std::shared_ptr owner () const; protected: - virtual bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, callback_t) = 0; + virtual bool send_impl (nano::message const &, nano::transport::traffic_type, callback_t) = 0; protected: nano::node & node; diff --git a/nano/node/transport/fake.cpp b/nano/node/transport/fake.cpp index 81d3fec7aa..2da2b6e294 100644 --- a/nano/node/transport/fake.cpp +++ b/nano/node/transport/fake.cpp @@ -14,8 +14,9 @@ nano::transport::fake::channel::channel (nano::node & node) : /** * The send function behaves like a null device, it throws the data away and returns success. */ -bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback) +bool nano::transport::fake::channel::send_impl (nano::message const & message, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback) { + auto buffer = message.to_shared_const_buffer (); auto size = buffer.size (); if (callback) { diff --git a/nano/node/transport/fake.hpp b/nano/node/transport/fake.hpp index f503b2bd58..2ddec4a907 100644 --- a/nano/node/transport/fake.hpp +++ b/nano/node/transport/fake.hpp @@ -50,7 +50,7 @@ namespace transport } protected: - bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; + bool send_impl (nano::message const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; private: nano::endpoint endpoint; diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index 78a4be9904..5858dc16e4 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -18,8 +18,10 @@ nano::transport::inproc::channel::channel (nano::node & node, nano::node & desti * Send the buffer to the peer and call the callback function when done. The call never fails. * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background. */ -bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback) +bool nano::transport::inproc::channel::send_impl (nano::message const & message, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback) { + auto buffer = message.to_shared_const_buffer (); + std::size_t offset{ 0 }; auto const buffer_read_fn = [&offset, buffer_v = buffer.to_bytes ()] (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a) { debug_assert (buffer_v.size () >= (offset + size_a)); diff --git a/nano/node/transport/inproc.hpp b/nano/node/transport/inproc.hpp index dfe932e777..37b3c44959 100644 --- a/nano/node/transport/inproc.hpp +++ b/nano/node/transport/inproc.hpp @@ -40,7 +40,7 @@ namespace transport } protected: - bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; + bool send_impl (nano::message const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; private: nano::node & destination; diff --git a/nano/node/transport/tcp_channel.cpp b/nano/node/transport/tcp_channel.cpp index 47ed452fe6..ca69f8d804 100644 --- a/nano/node/transport/tcp_channel.cpp +++ b/nano/node/transport/tcp_channel.cpp @@ -76,20 +76,27 @@ bool nano::transport::tcp_channel::max (nano::transport::traffic_type traffic_ty return queue.max (traffic_type); } -bool nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type type, nano::transport::channel::callback_t callback) +bool nano::transport::tcp_channel::send_impl (nano::message const & message, nano::transport::traffic_type type, nano::transport::channel::callback_t callback) { + auto buffer = message.to_shared_const_buffer (); + nano::unique_lock lock{ mutex }; if (!queue.full (type)) { queue.push (type, { buffer, callback }); lock.unlock (); + node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::queued, nano::stat::dir::out); node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (type), nano::stat::dir::out); - sending_task.notify (); + + sending_task.notify (); // Wake up the sending task + return true; } else { + lock.unlock (); + node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::drop, nano::stat::dir::out); node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (type), nano::stat::dir::out); } diff --git a/nano/node/transport/tcp_channel.hpp b/nano/node/transport/tcp_channel.hpp index fc64ed9021..c48d079b6f 100644 --- a/nano/node/transport/tcp_channel.hpp +++ b/nano/node/transport/tcp_channel.hpp @@ -66,7 +66,7 @@ class tcp_channel final : public nano::transport::channel, public std::enable_sh std::string to_string () const override; protected: - bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; + bool send_impl (nano::message const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override; private: void start (); diff --git a/nano/node/transport/test_channel.cpp b/nano/node/transport/test_channel.cpp new file mode 100644 index 0000000000..743c0e2493 --- /dev/null +++ b/nano/node/transport/test_channel.cpp @@ -0,0 +1,19 @@ +#include +#include + +nano::transport::test_channel::test_channel (nano::node & node_a) : + channel (node_a) +{ +} + +bool nano::transport::test_channel::send_impl (nano::message const & message, nano::transport::traffic_type traffic_type, callback_t callback) +{ + observers.notify (message, traffic_type); + + if (callback) + { + callback (boost::system::errc::make_error_code (boost::system::errc::success), message.to_shared_const_buffer ().size ()); + } + + return true; +} \ No newline at end of file diff --git a/nano/node/transport/test_channel.hpp b/nano/node/transport/test_channel.hpp new file mode 100644 index 0000000000..f1e3b50a4e --- /dev/null +++ b/nano/node/transport/test_channel.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include +#include + +namespace nano::transport +{ +class test_channel final : public nano::transport::channel +{ +public: + nano::observer_set observers; // Called for each queued message + +public: + explicit test_channel (nano::node &); + + nano::endpoint get_remote_endpoint () const override + { + return {}; + } + + nano::endpoint get_local_endpoint () const override + { + return {}; + } + + nano::transport::transport_type get_type () const override + { + return nano::transport::transport_type::loopback; + } + + void close () override + { + // Can't be closed + } + + std::string to_string () const override + { + return "test_channel"; + } + +protected: + bool send_impl (nano::message const &, nano::transport::traffic_type, callback_t) override; +}; +} \ No newline at end of file diff --git a/nano/secure/vote.hpp b/nano/secure/vote.hpp index c4e24f149d..7da9b32780 100644 --- a/nano/secure/vote.hpp +++ b/nano/secure/vote.hpp @@ -26,7 +26,7 @@ class vote final * @returns true if there was an error */ bool deserialize (nano::stream &); - static std::size_t size (uint8_t count); + static std::size_t size (uint8_t count); // TODO: This name is confusing, vote size is number of hashes present, not the message size nano::block_hash hash () const; nano::block_hash full_hash () const; diff --git a/nano/test_common/testutil.cpp b/nano/test_common/testutil.cpp index a3400f9b9f..0224b0e588 100644 --- a/nano/test_common/testutil.cpp +++ b/nano/test_common/testutil.cpp @@ -73,10 +73,7 @@ bool nano::test::process (nano::node & node, std::vector nano::test::fake_channel (nano:: return channel; } +std::shared_ptr nano::test::test_channel (nano::node & node, nano::account node_id) +{ + auto channel = std::make_shared (node); + if (!node_id.is_zero ()) + { + channel->set_node_id (node_id); + } + return channel; +} + std::shared_ptr nano::test::start_election (nano::test::system & system_a, nano::node & node_a, const nano::block_hash & hash_a) { system_a.deadline_set (5s); diff --git a/nano/test_common/testutil.hpp b/nano/test_common/testutil.hpp index 1964af057b..3f5e0ffdbe 100644 --- a/nano/test_common/testutil.hpp +++ b/nano/test_common/testutil.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -385,6 +386,10 @@ namespace test * Creates a new fake channel associated with `node` */ std::shared_ptr fake_channel (nano::node & node, nano::account node_id = { 0 }); + /* + * Creates a new test channel associated with `node` + */ + std::shared_ptr test_channel (nano::node & node, nano::account node_id = { 0 }); /* * Start an election on system system_a, node node_a and hash hash_a by reading the block * out of the ledger and adding it to the manual election scheduler queue.