Skip to content

Commit

Permalink
Bundle ledger transaction and write guard into a single object (nanoc…
Browse files Browse the repository at this point in the history
…urrency#4653)

* Bundle transaction and write guard in a single object

* Add tests
  • Loading branch information
pwojcikdev authored and gr0vity committed Jun 26, 2024
1 parent aff6e7d commit f04781f
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 107 deletions.
91 changes: 89 additions & 2 deletions nano/core_test/ledger.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "nano/lib/numbers.hpp"

#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/active_elections.hpp>
Expand Down Expand Up @@ -5644,3 +5643,91 @@ TEST (ledger_receivable, any_one)
ASSERT_TRUE (ctx.ledger ().any.receivable_exists (ctx.ledger ().tx_begin_read (), nano::dev::genesis_key.pub));
ASSERT_FALSE (ctx.ledger ().any.receivable_exists (ctx.ledger ().tx_begin_read (), key.pub));
}

TEST (ledger_transaction, write_refresh)
{
auto ctx = nano::test::context::ledger_empty ();
nano::block_builder builder;
nano::keypair key;
auto send1 = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*ctx.pool ().generate (nano::dev::genesis->hash ()))
.build ();
auto send2 = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (send1->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio)
.link (key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*ctx.pool ().generate (send1->hash ()))
.build ();

auto transaction = ctx.ledger ().tx_begin_write ();
ASSERT_EQ (nano::block_status::progress, ctx.ledger ().process (transaction, send1));
// Force refresh
ASSERT_TRUE (transaction.refresh_if_needed (0ms));
ASSERT_FALSE (transaction.refresh_if_needed ()); // Should not refresh again too soon
// Refreshed transaction should work just fine
ASSERT_EQ (nano::block_status::progress, ctx.ledger ().process (transaction, send2));
}

TEST (ledger_transaction, write_wait_order)
{
nano::test::system system;

auto ctx = nano::test::context::ledger_empty ();

std::atomic<bool> acquired1{ false };
std::atomic<bool> acquired2{ false };
std::atomic<bool> acquired3{ false };

std::latch latch1{ 1 };
std::latch latch2{ 1 };
std::latch latch3{ 1 };

auto fut1 = std::async (std::launch::async, [&] {
auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::generic);
acquired1 = true;
latch1.wait (); // Wait for the signal to drop tx
});
WAIT (250ms); // Allow thread to start

auto fut2 = std::async (std::launch::async, [&ctx, &acquired2, &latch2] {
auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::blockprocessor);
acquired2 = true;
latch2.wait (); // Wait for the signal to drop tx
});
WAIT (250ms); // Allow thread to start

auto fut3 = std::async (std::launch::async, [&ctx, &acquired3, &latch3] {
auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::confirmation_height);
acquired3 = true;
latch3.wait (); // Wait for the signal to drop tx
});
WAIT (250ms); // Allow thread to start

// First transaction should be ready immediately, others should be waiting
ASSERT_TIMELY (5s, acquired1.load ());
ASSERT_NEVER (250ms, acquired2.load ());
ASSERT_NEVER (250ms, acquired3.load ());

// Signal to continue and drop the first transaction
latch1.count_down ();
ASSERT_TIMELY (5s, acquired2.load ());
ASSERT_NEVER (250ms, acquired3.load ());

// Signal to continue and drop the second transaction
latch2.count_down ();
ASSERT_TIMELY (5s, acquired3.load ());

// Signal to continue and drop the third transaction
latch3.count_down ();
}
3 changes: 1 addition & 2 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
{
processed_batch_t processed;

auto scoped_write_guard = node.store.write_queue.wait (nano::store::writer::process_batch);
auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights });
auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor);
nano::timer<std::chrono::milliseconds> timer_l;

lock_a.lock ();
Expand Down
8 changes: 4 additions & 4 deletions nano/node/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,13 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
lock.unlock ();

{
// TODO: Properly limiting batch times requires this <guard, transaction> combo to be wrapped in a single object that provides refresh functionality
auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height);
auto tx = ledger.tx_begin_write ({ nano::tables::confirmation_height });
auto transaction = ledger.tx_begin_write ({ nano::tables::confirmation_height }, nano::store::writer::confirmation_height);

for (auto const & hash : batch)
{
auto added = ledger.confirm (tx, hash);
transaction.refresh_if_needed ();

auto added = ledger.confirm (transaction, hash);
if (!added.empty ())
{
// Confirming this block may implicitly confirm more
Expand Down
5 changes: 2 additions & 3 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ void nano::node::process_active (std::shared_ptr<nano::block> const & incoming)

nano::block_status nano::node::process (std::shared_ptr<nano::block> block)
{
auto const transaction = ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights });
auto const transaction = ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::node);
return process (transaction, block);
}

Expand Down Expand Up @@ -1033,8 +1033,7 @@ void nano::node::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_wei
transaction_write_count = 0;
if (!pruning_targets.empty () && !stopped)
{
auto scoped_write_guard = store.write_queue.wait (nano::store::writer::pruning);
auto write_transaction = ledger.tx_begin_write ({ tables::blocks, tables::pruned });
auto write_transaction = ledger.tx_begin_write ({ tables::blocks, tables::pruned }, nano::store::writer::pruning);
while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped)
{
auto const & pruning_hash (pruning_targets.front ());
Expand Down
11 changes: 8 additions & 3 deletions nano/node/vote_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,15 @@ void nano::vote_generator::process_batch (std::deque<queue_entry_t> & batch)
{
std::deque<candidate_t> verified;

auto verify_batch = [this, &verified] (auto && transaction_variant, auto && batch) {
auto refresh_if_needed = [] (auto && transaction_variant) {
std::visit ([&] (auto && transaction) { transaction.refresh_if_needed (); }, transaction_variant);
};

auto verify_batch = [this, &verified, &refresh_if_needed] (auto && transaction_variant, auto && batch) {
for (auto & [root, hash] : batch)
{
refresh_if_needed (transaction_variant);

if (should_vote (transaction_variant, root, hash))
{
verified.emplace_back (root, hash);
Expand All @@ -117,8 +123,7 @@ void nano::vote_generator::process_batch (std::deque<queue_entry_t> & batch)

if (is_final)
{
auto guard = ledger.store.write_queue.wait (nano::store::writer::voting_final);
transaction_variant_t transaction_variant{ ledger.tx_begin_write ({ tables::final_votes }) };
transaction_variant_t transaction_variant{ ledger.tx_begin_write ({ tables::final_votes }, nano::store::writer::voting_final) };

verify_batch (transaction_variant, batch);

Expand Down
6 changes: 4 additions & 2 deletions nano/secure/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -732,9 +732,11 @@ nano::ledger::~ledger ()
{
}

auto nano::ledger::tx_begin_write (std::vector<nano::tables> const & tables_to_lock, std::vector<nano::tables> const & tables_no_lock) const -> secure::write_transaction
auto nano::ledger::tx_begin_write (std::vector<nano::tables> const & tables_to_lock, nano::store::writer guard_type) const -> secure::write_transaction
{
return secure::write_transaction{ store.tx_begin_write (tables_to_lock, tables_no_lock) };
auto guard = store.write_queue.wait (guard_type);
auto txn = store.tx_begin_write (tables_to_lock);
return secure::write_transaction{ std::move (txn), std::move (guard) };
}

auto nano::ledger::tx_begin_read () const -> secure::read_transaction
Expand Down
2 changes: 1 addition & 1 deletion nano/secure/ledger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ledger final
~ledger ();

/** Start read-write transaction */
secure::write_transaction tx_begin_write (std::vector<nano::tables> const & tables_to_lock = {}, std::vector<nano::tables> const & tables_no_lock = {}) const;
secure::write_transaction tx_begin_write (std::vector<nano::tables> const & tables_to_lock = {}, nano::store::writer guard_type = nano::store::writer::generic) const;
/** Start read-only transaction */
secure::read_transaction tx_begin_read () const;

Expand Down
32 changes: 26 additions & 6 deletions nano/secure/transaction.hpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#pragma once

#include <nano/store/transaction.hpp> // Correct include for nano::store transaction classes
#include <nano/store/transaction.hpp>
#include <nano/store/write_queue.hpp>

#include <utility> // For std::move
#include <utility>

namespace nano::secure
{
Expand Down Expand Up @@ -31,11 +32,15 @@ class transaction
class write_transaction : public transaction
{
nano::store::write_transaction txn;
nano::store::write_guard guard;
std::chrono::steady_clock::time_point start;

public:
explicit write_transaction (nano::store::write_transaction && t) noexcept :
txn (std::move (t))
explicit write_transaction (nano::store::write_transaction && txn, nano::store::write_guard && guard) noexcept :
txn{ std::move (txn) },
guard{ std::move (guard) }
{
start = std::chrono::steady_clock::now ();
}

// Override to return a reference to the encapsulated write_transaction
Expand All @@ -47,16 +52,31 @@ class write_transaction : public transaction
void commit ()
{
txn.commit ();
guard.release ();
}

void renew ()
{
guard.renew ();
txn.renew ();
start = std::chrono::steady_clock::now ();
}

void refresh ()
{
txn.refresh ();
commit ();
renew ();
}

bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 })
{
auto now = std::chrono::steady_clock::now ();
if (now - start > max_age)
{
refresh ();
return true;
}
return false;
}

// Conversion operator to const nano::store::transaction&
Expand All @@ -78,7 +98,7 @@ class read_transaction : public transaction

public:
explicit read_transaction (nano::store::read_transaction && t) noexcept :
txn (std::move (t))
txn{ std::move (t) }
{
}

Expand Down
2 changes: 2 additions & 0 deletions nano/store/component.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ namespace store
store::final_vote & final_vote;
store::version & version;

public: // TODO: Shouldn't be public
store::write_queue write_queue;

public:
virtual unsigned max_block_write_batch_num () const = 0;

virtual bool copy_db (std::filesystem::path const & destination) = 0;
Expand Down
Loading

0 comments on commit f04781f

Please sign in to comment.