From 3f7140e888a7e0c7a320e233de16c89ea07065e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20W=C3=B3jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 23 Jun 2024 16:14:34 +0200 Subject: [PATCH] Bundle ledger transaction and write guard into a single object (#4653) * Bundle transaction and write guard in a single object * Add tests --- nano/core_test/ledger.cpp | 91 +++++++++++++++++++++++- nano/node/blockprocessor.cpp | 3 +- nano/node/confirming_set.cpp | 8 +-- nano/node/node.cpp | 5 +- nano/node/vote_generator.cpp | 11 ++- nano/secure/ledger.cpp | 6 +- nano/secure/ledger.hpp | 2 +- nano/secure/transaction.hpp | 32 +++++++-- nano/store/component.hpp | 2 + nano/store/write_queue.cpp | 132 +++++++++++++++++------------------ nano/store/write_queue.hpp | 46 ++++++++---- 11 files changed, 231 insertions(+), 107 deletions(-) diff --git a/nano/core_test/ledger.cpp b/nano/core_test/ledger.cpp index b42e31cb7d..850ddab934 100644 --- a/nano/core_test/ledger.cpp +++ b/nano/core_test/ledger.cpp @@ -1,7 +1,6 @@ -#include "nano/lib/numbers.hpp" - #include #include +#include #include #include #include @@ -5644,3 +5643,91 @@ TEST (ledger_receivable, any_one) ASSERT_TRUE (ctx.ledger ().any.receivable_exists (ctx.ledger ().tx_begin_read (), nano::dev::genesis_key.pub)); ASSERT_FALSE (ctx.ledger ().any.receivable_exists (ctx.ledger ().tx_begin_read (), key.pub)); } + +TEST (ledger_transaction, write_refresh) +{ + auto ctx = nano::test::context::ledger_empty (); + nano::block_builder builder; + nano::keypair key; + auto send1 = builder + .state () + .account (nano::dev::genesis_key.pub) + .previous (nano::dev::genesis->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio) + .link (nano::dev::genesis_key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*ctx.pool ().generate (nano::dev::genesis->hash ())) + .build (); + auto send2 = builder + .state () + .account (nano::dev::genesis_key.pub) + .previous (send1->hash ()) + .representative (nano::dev::genesis_key.pub) + .balance (nano::dev::constants.genesis_amount - 2 * nano::Gxrb_ratio) + .link (key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*ctx.pool ().generate (send1->hash ())) + .build (); + + auto transaction = ctx.ledger ().tx_begin_write (); + ASSERT_EQ (nano::block_status::progress, ctx.ledger ().process (transaction, send1)); + // Force refresh + ASSERT_TRUE (transaction.refresh_if_needed (0ms)); + ASSERT_FALSE (transaction.refresh_if_needed ()); // Should not refresh again too soon + // Refreshed transaction should work just fine + ASSERT_EQ (nano::block_status::progress, ctx.ledger ().process (transaction, send2)); +} + +TEST (ledger_transaction, write_wait_order) +{ + nano::test::system system; + + auto ctx = nano::test::context::ledger_empty (); + + std::atomic acquired1{ false }; + std::atomic acquired2{ false }; + std::atomic acquired3{ false }; + + std::latch latch1{ 1 }; + std::latch latch2{ 1 }; + std::latch latch3{ 1 }; + + auto fut1 = std::async (std::launch::async, [&] { + auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::generic); + acquired1 = true; + latch1.wait (); // Wait for the signal to drop tx + }); + WAIT (250ms); // Allow thread to start + + auto fut2 = std::async (std::launch::async, [&ctx, &acquired2, &latch2] { + auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::blockprocessor); + acquired2 = true; + latch2.wait (); // Wait for the signal to drop tx + }); + WAIT (250ms); // Allow thread to start + + auto fut3 = std::async (std::launch::async, [&ctx, &acquired3, &latch3] { + auto tx = ctx.ledger ().tx_begin_write ({}, nano::store::writer::confirmation_height); + acquired3 = true; + latch3.wait (); // Wait for the signal to drop tx + }); + WAIT (250ms); // Allow thread to start + + // First transaction should be ready immediately, others should be waiting + ASSERT_TIMELY (5s, acquired1.load ()); + ASSERT_NEVER (250ms, acquired2.load ()); + ASSERT_NEVER (250ms, acquired3.load ()); + + // Signal to continue and drop the first transaction + latch1.count_down (); + ASSERT_TIMELY (5s, acquired2.load ()); + ASSERT_NEVER (250ms, acquired3.load ()); + + // Signal to continue and drop the second transaction + latch2.count_down (); + ASSERT_TIMELY (5s, acquired3.load ()); + + // Signal to continue and drop the third transaction + latch3.count_down (); +} \ No newline at end of file diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 80ecefeb62..aa8e6bc49a 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -295,8 +295,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock { processed_batch_t processed; - auto scoped_write_guard = node.store.write_queue.wait (nano::store::writer::process_batch); - auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }); + auto transaction = node.ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::blockprocessor); nano::timer timer_l; lock_a.lock (); diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index b577e3d5e4..ee72858b52 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -133,13 +133,13 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) lock.unlock (); { - // TODO: Properly limiting batch times requires this combo to be wrapped in a single object that provides refresh functionality - auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height); - auto tx = ledger.tx_begin_write ({ nano::tables::confirmation_height }); + auto transaction = ledger.tx_begin_write ({ nano::tables::confirmation_height }, nano::store::writer::confirmation_height); for (auto const & hash : batch) { - auto added = ledger.confirm (tx, hash); + transaction.refresh_if_needed (); + + auto added = ledger.confirm (transaction, hash); if (!added.empty ()) { // Confirming this block may implicitly confirm more diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 33f07fe417..a2521e49cf 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -617,7 +617,7 @@ void nano::node::process_active (std::shared_ptr const & incoming) nano::block_status nano::node::process (std::shared_ptr block) { - auto const transaction = ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }); + auto const transaction = ledger.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }, nano::store::writer::node); return process (transaction, block); } @@ -1033,8 +1033,7 @@ void nano::node::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_wei transaction_write_count = 0; if (!pruning_targets.empty () && !stopped) { - auto scoped_write_guard = store.write_queue.wait (nano::store::writer::pruning); - auto write_transaction = ledger.tx_begin_write ({ tables::blocks, tables::pruned }); + auto write_transaction = ledger.tx_begin_write ({ tables::blocks, tables::pruned }, nano::store::writer::pruning); while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped) { auto const & pruning_hash (pruning_targets.front ()); diff --git a/nano/node/vote_generator.cpp b/nano/node/vote_generator.cpp index a0619d7ceb..169f537766 100644 --- a/nano/node/vote_generator.cpp +++ b/nano/node/vote_generator.cpp @@ -105,9 +105,15 @@ void nano::vote_generator::process_batch (std::deque & batch) { std::deque verified; - auto verify_batch = [this, &verified] (auto && transaction_variant, auto && batch) { + auto refresh_if_needed = [] (auto && transaction_variant) { + std::visit ([&] (auto && transaction) { transaction.refresh_if_needed (); }, transaction_variant); + }; + + auto verify_batch = [this, &verified, &refresh_if_needed] (auto && transaction_variant, auto && batch) { for (auto & [root, hash] : batch) { + refresh_if_needed (transaction_variant); + if (should_vote (transaction_variant, root, hash)) { verified.emplace_back (root, hash); @@ -117,8 +123,7 @@ void nano::vote_generator::process_batch (std::deque & batch) if (is_final) { - auto guard = ledger.store.write_queue.wait (nano::store::writer::voting_final); - transaction_variant_t transaction_variant{ ledger.tx_begin_write ({ tables::final_votes }) }; + transaction_variant_t transaction_variant{ ledger.tx_begin_write ({ tables::final_votes }, nano::store::writer::voting_final) }; verify_batch (transaction_variant, batch); diff --git a/nano/secure/ledger.cpp b/nano/secure/ledger.cpp index 481be0181e..f42f02aac2 100644 --- a/nano/secure/ledger.cpp +++ b/nano/secure/ledger.cpp @@ -732,9 +732,11 @@ nano::ledger::~ledger () { } -auto nano::ledger::tx_begin_write (std::vector const & tables_to_lock, std::vector const & tables_no_lock) const -> secure::write_transaction +auto nano::ledger::tx_begin_write (std::vector const & tables_to_lock, nano::store::writer guard_type) const -> secure::write_transaction { - return secure::write_transaction{ store.tx_begin_write (tables_to_lock, tables_no_lock) }; + auto guard = store.write_queue.wait (guard_type); + auto txn = store.tx_begin_write (tables_to_lock); + return secure::write_transaction{ std::move (txn), std::move (guard) }; } auto nano::ledger::tx_begin_read () const -> secure::read_transaction diff --git a/nano/secure/ledger.hpp b/nano/secure/ledger.hpp index 3089fa3c22..3e13256d29 100644 --- a/nano/secure/ledger.hpp +++ b/nano/secure/ledger.hpp @@ -39,7 +39,7 @@ class ledger final ~ledger (); /** Start read-write transaction */ - secure::write_transaction tx_begin_write (std::vector const & tables_to_lock = {}, std::vector const & tables_no_lock = {}) const; + secure::write_transaction tx_begin_write (std::vector const & tables_to_lock = {}, nano::store::writer guard_type = nano::store::writer::generic) const; /** Start read-only transaction */ secure::read_transaction tx_begin_read () const; diff --git a/nano/secure/transaction.hpp b/nano/secure/transaction.hpp index 619fdb37b4..5dd5e36d83 100644 --- a/nano/secure/transaction.hpp +++ b/nano/secure/transaction.hpp @@ -1,8 +1,9 @@ #pragma once -#include // Correct include for nano::store transaction classes +#include +#include -#include // For std::move +#include namespace nano::secure { @@ -31,11 +32,15 @@ class transaction class write_transaction : public transaction { nano::store::write_transaction txn; + nano::store::write_guard guard; + std::chrono::steady_clock::time_point start; public: - explicit write_transaction (nano::store::write_transaction && t) noexcept : - txn (std::move (t)) + explicit write_transaction (nano::store::write_transaction && txn, nano::store::write_guard && guard) noexcept : + txn{ std::move (txn) }, + guard{ std::move (guard) } { + start = std::chrono::steady_clock::now (); } // Override to return a reference to the encapsulated write_transaction @@ -47,16 +52,31 @@ class write_transaction : public transaction void commit () { txn.commit (); + guard.release (); } void renew () { + guard.renew (); txn.renew (); + start = std::chrono::steady_clock::now (); } void refresh () { - txn.refresh (); + commit (); + renew (); + } + + bool refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) + { + auto now = std::chrono::steady_clock::now (); + if (now - start > max_age) + { + refresh (); + return true; + } + return false; } // Conversion operator to const nano::store::transaction& @@ -78,7 +98,7 @@ class read_transaction : public transaction public: explicit read_transaction (nano::store::read_transaction && t) noexcept : - txn (std::move (t)) + txn{ std::move (t) } { } diff --git a/nano/store/component.hpp b/nano/store/component.hpp index 7cdd7d1fff..a64ae1a60f 100644 --- a/nano/store/component.hpp +++ b/nano/store/component.hpp @@ -81,8 +81,10 @@ namespace store store::final_vote & final_vote; store::version & version; + public: // TODO: Shouldn't be public store::write_queue write_queue; + public: virtual unsigned max_block_write_batch_num () const = 0; virtual bool copy_db (std::filesystem::path const & destination) = 0; diff --git a/nano/store/write_queue.cpp b/nano/store/write_queue.cpp index b50809a32c..154a62cee6 100644 --- a/nano/store/write_queue.cpp +++ b/nano/store/write_queue.cpp @@ -4,34 +4,30 @@ #include -nano::store::write_guard::write_guard (std::function guard_finish_callback_a) : - guard_finish_callback (guard_finish_callback_a) -{ -} +/* + * write_guard + */ -nano::store::write_guard::write_guard (write_guard && write_guard_a) noexcept : - guard_finish_callback (std::move (write_guard_a.guard_finish_callback)), - owns (write_guard_a.owns) +nano::store::write_guard::write_guard (write_queue & queue, writer type) : + queue{ queue }, + type{ type } { - write_guard_a.owns = false; - write_guard_a.guard_finish_callback = nullptr; + renew (); } -nano::store::write_guard & nano::store::write_guard::operator= (write_guard && write_guard_a) noexcept +nano::store::write_guard::write_guard (write_guard && other) noexcept : + queue{ other.queue }, + type{ other.type }, + owns{ other.owns } { - owns = write_guard_a.owns; - guard_finish_callback = std::move (write_guard_a.guard_finish_callback); - - write_guard_a.owns = false; - write_guard_a.guard_finish_callback = nullptr; - return *this; + other.owns = false; } nano::store::write_guard::~write_guard () { if (owns) { - guard_finish_callback (); + release (); } } @@ -42,84 +38,82 @@ bool nano::store::write_guard::is_owned () const void nano::store::write_guard::release () { - debug_assert (owns); - if (owns) - { - guard_finish_callback (); - } + release_assert (owns); + queue.release (type); owns = false; } +void nano::store::write_guard::renew () +{ + release_assert (!owns); + queue.acquire (type); + owns = true; +} + +/* + * write_queue + */ + nano::store::write_queue::write_queue (bool use_noops_a) : - guard_finish_callback ([use_noops_a, &queue = queue, &mutex = mutex, &cv = cv] () { - if (!use_noops_a) - { - { - nano::lock_guard guard (mutex); - queue.pop_front (); - } - cv.notify_all (); - } - }), - use_noops (use_noops_a) + use_noops{ use_noops_a } { } nano::store::write_guard nano::store::write_queue::wait (writer writer) { - if (use_noops) - { - return write_guard ([] {}); - } - - nano::unique_lock lk (mutex); - debug_assert (std::none_of (queue.cbegin (), queue.cend (), [writer] (auto const & item) { return item == writer; })); - // Add writer to the end of the queue if it's not already waiting - auto exists = std::find (queue.cbegin (), queue.cend (), writer) != queue.cend (); - if (!exists) - { - queue.push_back (writer); - } - cv.wait (lk, [&] () { return queue.front () == writer; }); - return write_guard (guard_finish_callback); + return write_guard{ *this, writer }; } -bool nano::store::write_queue::contains (writer writer) +bool nano::store::write_queue::contains (writer writer) const { debug_assert (!use_noops); - nano::lock_guard guard (mutex); + nano::lock_guard guard{ mutex }; return std::find (queue.cbegin (), queue.cend (), writer) != queue.cend (); } -bool nano::store::write_queue::process (writer writer) +void nano::store::write_queue::pop () { - if (use_noops) + nano::lock_guard guard{ mutex }; + if (!queue.empty ()) { - return true; + queue.pop_front (); } + condition.notify_all (); +} - auto result = false; +void nano::store::write_queue::acquire (writer writer) +{ + if (use_noops) { - nano::lock_guard guard (mutex); - // Add writer to the end of the queue if it's not already waiting - auto exists = std::find (queue.cbegin (), queue.cend (), writer) != queue.cend (); - if (!exists) - { - queue.push_back (writer); - } - - result = (queue.front () == writer); + return; // Pass immediately } - if (!result) + nano::unique_lock lock{ mutex }; + + // There should be no duplicates in the queue + debug_assert (std::none_of (queue.cbegin (), queue.cend (), [writer] (auto const & item) { return item == writer; })); + + // Add writer to the end of the queue if it's not already waiting + auto exists = std::find (queue.cbegin (), queue.cend (), writer) != queue.cend (); + if (!exists) { - cv.notify_all (); + queue.push_back (writer); } - return result; + condition.wait (lock, [&] () { return queue.front () == writer; }); } -nano::store::write_guard nano::store::write_queue::pop () +void nano::store::write_queue::release (writer writer) { - return write_guard (guard_finish_callback); -} + if (use_noops) + { + return; // Pass immediately + } + { + nano::lock_guard guard{ mutex }; + release_assert (!queue.empty ()); + release_assert (queue.front () == writer); + queue.pop_front (); + } + condition.notify_all (); +} \ No newline at end of file diff --git a/nano/store/write_queue.hpp b/nano/store/write_queue.hpp index 8875b2db35..e4f1fdc69a 100644 --- a/nano/store/write_queue.hpp +++ b/nano/store/write_queue.hpp @@ -11,28 +11,38 @@ namespace nano::store /** Distinct areas write locking is done, order is irrelevant */ enum class writer { + generic, + node, + blockprocessor, confirmation_height, - process_batch, pruning, voting_final, testing // Used in tests to emulate a write lock }; +class write_queue; + class write_guard final { public: - explicit write_guard (std::function guard_finish_callback_a); - void release (); + explicit write_guard (write_queue & queue, writer type); ~write_guard (); + write_guard (write_guard const &) = delete; write_guard & operator= (write_guard const &) = delete; write_guard (write_guard &&) noexcept; - write_guard & operator= (write_guard &&) noexcept; + write_guard & operator= (write_guard &&) noexcept = delete; + + void release (); + void renew (); + bool is_owned () const; + writer const type; + private: - std::function guard_finish_callback; - bool owns{ true }; + write_queue & queue; + bool owns{ false }; }; /** @@ -41,25 +51,31 @@ class write_guard final */ class write_queue final { + friend class write_guard; + public: - explicit write_queue (bool use_noops_a); + explicit write_queue (bool use_noops); + /** Blocks until we are at the head of the queue and blocks other waiters until write_guard goes out of scope */ [[nodiscard ("write_guard blocks other waiters")]] write_guard wait (writer writer); - /** Returns true if this writer is now at the front of the queue */ - bool process (writer writer); - /** Returns true if this writer is anywhere in the queue. Currently only used in tests */ - bool contains (writer writer); + bool contains (writer writer) const; /** Doesn't actually pop anything until the returned write_guard is out of scope */ - write_guard pop (); + void pop (); private: + void acquire (writer writer); + void release (writer writer); + +private: + bool const use_noops; + std::deque queue; - nano::mutex mutex; - nano::condition_variable cv; + mutable nano::mutex mutex; + nano::condition_variable condition; + std::function guard_finish_callback; - bool use_noops; }; } // namespace nano::store