Skip to content

Commit

Permalink
Merge pull request #4823 from pwojcikdev/request-aggregator-tests
Browse files Browse the repository at this point in the history
Request aggregator tests
  • Loading branch information
pwojcikdev authored Jan 15, 2025
2 parents 0891762 + 4ba325b commit 7115a27
Show file tree
Hide file tree
Showing 17 changed files with 410 additions and 15 deletions.
283 changes: 283 additions & 0 deletions nano/core_test/request_aggregator.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/function.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/active_elections.hpp>
#include <nano/node/confirming_set.hpp>
Expand All @@ -9,12 +10,17 @@
#include <nano/node/transport/inproc.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_confirmed.hpp>
#include <nano/secure/vote.hpp>
#include <nano/test_common/network.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>

#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <future>

using namespace std::chrono_literals;

TEST (request_aggregator, one)
Expand Down Expand Up @@ -450,3 +456,280 @@ TEST (request_aggregator, cannot_vote)
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_TIMELY (3s, 1 <= node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}

namespace
{
std::future<nano::confirm_ack> observe_confirm_ack (std::shared_ptr<nano::transport::test_channel> const & channel)
{
std::promise<nano::confirm_ack> promise;
auto future = promise.get_future ();

struct confirm_ack_visitor : public nano::message_visitor
{
std::optional<nano::confirm_ack> result;

void confirm_ack (nano::confirm_ack const & msg) override
{
result = msg;
}
};

channel->observers.clear ();
channel->observers.add (nano::wrap_move_only ([&, promise = std::move (promise)] (nano::message const & message, nano::transport::traffic_type const & type) mutable {
confirm_ack_visitor visitor{};
message.visit (visitor);
if (visitor.result)
{
promise.set_value (visitor.result.value ());
}
}));

return future;
}
}

/*
* Request for a forked open block should return vote for the correct fork alternative
*/
TEST (request_aggregator, forked_open)
{
nano::test::system system;
auto & node = *system.add_node ();

// Voting needs a rep key set up on the node
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);

// Setup two forks of the open block
nano::keypair key;
nano::block_builder builder;
auto send0 = builder.send ()
.previous (nano::dev::genesis->hash ())
.destination (key.pub)
.balance (nano::dev::constants.genesis_amount - 500)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
auto open0 = builder.open ()
.source (send0->hash ())
.representative (1)
.account (key.pub)
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build ();
auto open1 = builder.open ()
.source (send0->hash ())
.representative (2)
.account (key.pub)
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build ();

nano::test::process (node, { send0, open0 });
nano::test::confirm (node, { open0 });

auto channel = nano::test::test_channel (node);
auto future = observe_confirm_ack (channel);

// Request vote for the wrong fork
std::vector<std::pair<nano::block_hash, nano::root>> request{ { open1->hash (), open1->root () } };
ASSERT_TRUE (node.aggregator.request (request, channel));

ASSERT_EQ (future.wait_for (5s), std::future_status::ready);

auto ack = future.get ();
ASSERT_EQ (ack.vote->hashes.size (), 1);
ASSERT_EQ (ack.vote->hashes[0], open0->hash ()); // Vote for the correct fork alternative
ASSERT_EQ (ack.vote->account, nano::dev::genesis_key.pub);
}

/*
* Request for a conflicting epoch block should return vote for the correct alternative
*/
TEST (request_aggregator, epoch_conflict)
{
nano::test::system system;
auto & node = *system.add_node ();

// Voting needs a rep key set up on the node
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);

// Setup the initial chain and the conflicting blocks
nano::keypair key;
nano::keypair epoch_signer (nano::dev::genesis_key);
nano::state_block_builder builder;

// Create initial chain: send -> open -> change
auto send = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 1)
.link (key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();

auto open = builder.make_block ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (1)
.link (send->hash ())
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build ();

// Change block root is the open block hash, qualified root: {open, open}
auto change = builder.make_block ()
.account (key.pub)
.previous (open->hash ())
.representative (key.pub)
.balance (1)
.link (0)
.sign (key.prv, key.pub)
.work (*system.work.generate (open->hash ()))
.build ();

// Pending entry is needed first to process the epoch open block
auto pending = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (send->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 2)
.link (change->root ().as_account ())
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (send->hash ()))
.build ();

// Create conflicting epoch block with the same root as the change block, qualified root: {open, 0}
// This block is intentionally not processed immediately so the node doesn't know about it
auto epoch_open = builder.make_block ()
.account (change->root ().as_account ())
.previous (0)
.representative (0)
.balance (0)
.link (node.ledger.epoch_link (nano::epoch::epoch_1))
.sign (epoch_signer.prv, epoch_signer.pub)
.work (*system.work.generate (open->hash ()))
.build ();

// Process and confirm the initial chain with the change block
nano::test::process (node, { send, open, change });
nano::test::confirm (node, { change });
ASSERT_TIMELY (5s, node.block_confirmed (change->hash ()));

auto channel = nano::test::test_channel (node);

// Request vote for the conflicting epoch block
std::vector<std::pair<nano::block_hash, nano::root>> request{ { epoch_open->hash (), epoch_open->root () } };
auto future1 = observe_confirm_ack (channel);
ASSERT_TRUE (node.aggregator.request (request, channel));

ASSERT_EQ (future1.wait_for (5s), std::future_status::ready);

auto ack1 = future1.get ();
ASSERT_EQ (ack1.vote->hashes.size (), 1);
ASSERT_EQ (ack1.vote->hashes[0], change->hash ()); // Vote for the correct alternative (change block)
ASSERT_EQ (ack1.vote->account, nano::dev::genesis_key.pub);

// Process the conflicting epoch block
nano::test::process (node, { pending, epoch_open });
nano::test::confirm (node, { pending, epoch_open });

// Workaround for vote spacing dropping requests with the same root
// FIXME: Vote spacing should use full qualified root
WAIT (1s);

// Request vote for the conflicting epoch block again
auto future2 = observe_confirm_ack (channel);
ASSERT_TRUE (node.aggregator.request (request, channel));

ASSERT_EQ (future2.wait_for (5s), std::future_status::ready);

auto ack2 = future2.get ();
ASSERT_EQ (ack2.vote->hashes.size (), 1);
ASSERT_EQ (ack2.vote->hashes[0], epoch_open->hash ()); // Vote for the epoch block
ASSERT_EQ (ack2.vote->account, nano::dev::genesis_key.pub);
}

/*
* Request for multiple cemented blocks in a chain should generate votes regardless of vote spacing
*/
TEST (request_aggregator, cemented_no_spacing)
{
nano::test::system system;
auto & node = *system.add_node ();

// Voting needs a rep key set up on the node
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);

// Create a chain of 3 blocks: send1 -> send2 -> send3
nano::state_block_builder builder;
auto send1 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 1)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();

auto send2 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (send1->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 2)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (send1->hash ()))
.build ();

auto send3 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (send2->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 3)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (send2->hash ()))
.build ();

// Process and confirm all blocks in the chain
nano::test::process (node, { send1, send2, send3 });
nano::test::confirm (node, { send1, send2, send3 });
ASSERT_TRUE (node.block_confirmed (send3->hash ()));

auto channel = nano::test::test_channel (node);

// Request votes for blocks at different positions in the chain
std::vector<std::pair<nano::block_hash, nano::root>> request{
{ send1->hash (), send1->root () },
{ send2->hash (), send2->root () },
{ send3->hash (), send3->root () }
};

// Request votes for all blocks
auto future = observe_confirm_ack (channel);
ASSERT_TRUE (node.aggregator.request (request, channel));

// Wait for the votes
ASSERT_EQ (future.wait_for (5s), std::future_status::ready);
auto ack = future.get ();

// Verify we got votes for all blocks in the chain
ASSERT_EQ (ack.vote->hashes.size (), 3);
ASSERT_EQ (ack.vote->account, nano::dev::genesis_key.pub);

// Verify individual vote properties
std::set<nano::block_hash> voted_hashes;
for (auto const & hash : ack.vote->hashes)
{
voted_hashes.insert (hash);
}

// Verify we got votes for all three blocks
ASSERT_TRUE (voted_hashes.find (send1->hash ()) != voted_hashes.end ());
ASSERT_TRUE (voted_hashes.find (send2->hash ()) != voted_hashes.end ());
ASSERT_TRUE (voted_hashes.find (send3->hash ()) != voted_hashes.end ());
}
19 changes: 19 additions & 0 deletions nano/lib/function.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

#include <functional>
#include <memory>
#include <utility>

namespace nano
{
// TODO: Replace with std::move_only_function in C++23
template <typename F>
auto wrap_move_only (F && f)
{
using fn_type = decltype (std::function{ std::declval<F> () });
auto ptr = std::make_shared<std::decay_t<F>> (std::forward<F> (f));
return fn_type ([ptr] (auto &&... args) {
return (*ptr) (std::forward<decltype (args)> (args)...);
});
}
}
6 changes: 6 additions & 0 deletions nano/lib/observer_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ class observer_set final
return observers.size ();
}

void clear ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
observers.clear ();
}

nano::container_info container_info () const
{
nano::unique_lock<nano::mutex> lock{ mutex };
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ add_library(
transport/tcp_server.cpp
transport/tcp_socket.hpp
transport/tcp_socket.cpp
transport/test_channel.hpp
transport/test_channel.cpp
transport/traffic_type.hpp
transport/traffic_type.cpp
transport/transport.hpp
Expand Down
3 changes: 1 addition & 2 deletions nano/node/transport/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ nano::transport::channel::channel (nano::node & node_a) :

bool nano::transport::channel::send (nano::message const & message, nano::transport::traffic_type traffic_type, callback_t callback)
{
auto buffer = message.to_shared_const_buffer ();
bool sent = send_buffer (buffer, traffic_type, std::move (callback));
bool sent = send_impl (message, traffic_type, std::move (callback));
node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true);
return sent;
}
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class channel
std::shared_ptr<nano::node> owner () const;

protected:
virtual bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, callback_t) = 0;
virtual bool send_impl (nano::message const &, nano::transport::traffic_type, callback_t) = 0;

protected:
nano::node & node;
Expand Down
3 changes: 2 additions & 1 deletion nano/node/transport/fake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ nano::transport::fake::channel::channel (nano::node & node) :
/**
* The send function behaves like a null device, it throws the data away and returns success.
*/
bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
bool nano::transport::fake::channel::send_impl (nano::message const & message, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
{
auto buffer = message.to_shared_const_buffer ();
auto size = buffer.size ();
if (callback)
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/fake.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace transport
}

protected:
bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
bool send_impl (nano::message const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;

private:
nano::endpoint endpoint;
Expand Down
4 changes: 3 additions & 1 deletion nano/node/transport/inproc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ nano::transport::inproc::channel::channel (nano::node & node, nano::node & desti
* Send the buffer to the peer and call the callback function when done. The call never fails.
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
*/
bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
bool nano::transport::inproc::channel::send_impl (nano::message const & message, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
{
auto buffer = message.to_shared_const_buffer ();

std::size_t offset{ 0 };
auto const buffer_read_fn = [&offset, buffer_v = buffer.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
debug_assert (buffer_v.size () >= (offset + size_a));
Expand Down
Loading

0 comments on commit 7115a27

Please sign in to comment.