From a91846913b7a563349eb3febbff59bdbde69a4e2 Mon Sep 17 00:00:00 2001 From: Oleksandr <115580134+oleks-rip@users.noreply.github.com> Date: Tue, 28 May 2024 22:03:51 -0400 Subject: [PATCH] RXI-874 DB operations in thread --- CMakeLists.txt | 2 +- src/xbwd/federator/Federator.cpp | 343 ++++++++++++++++++++----- src/xbwd/federator/Federator.h | 36 ++- src/xbwd/federator/FederatorEvents.cpp | 22 ++ src/xbwd/federator/FederatorEvents.h | 25 ++ 5 files changed, 355 insertions(+), 73 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index fda0513..55e4354 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -90,7 +90,7 @@ set(HEADERS src/xbwd/core/DatabaseCon.h src/xbwd/core/SociDB.h src/xbwd/federator/Federator.h - src/xbwd/federator/FederatorEvents.cpp + src/xbwd/federator/FederatorEvents.h src/xbwd/federator/TxnSupport.h src/xbwd/rpc/fromJSON.h src/xbwd/rpc/RPCCall.h diff --git a/src/xbwd/federator/Federator.cpp b/src/xbwd/federator/Federator.cpp index 6c5c9a5..25d9411 100644 --- a/src/xbwd/federator/Federator.cpp +++ b/src/xbwd/federator/Federator.cpp @@ -189,8 +189,10 @@ Federator::init( (:ct, :txnId, :lgrSeq); )sql", fmt::arg("table_name", db_init::xChainSyncTable)); + + std::uint32_t ledgerSeq = initSync_[ct].dbLedgerSqn_; *session << sql, soci::use(static_cast(ct)), - soci::use(txnIdHex), soci::use(initSync_[ct].dbLedgerSqn_); + soci::use(txnIdHex), soci::use(ledgerSeq); } JLOG(j_.info()) << "created DB table for initial sync, " << db_init::xChainSyncTable; @@ -210,8 +212,7 @@ Federator::init( for (auto const ct : {ChainType::locking, ChainType::issuing}) { - if (autoSubmit_[ct]) - readDBAttests(ct); + readDBAttests(ct); JLOGV( j_.info(), "Prepare init sync", @@ -345,6 +346,11 @@ Federator::readDBAttests(ChainType ct) continue; } + txnsInProcessing_[ct].insert( + fmt::format("create: {:x}", createCount)); + if (!autoSubmit_[ct]) + continue; + // The attestation will be created by the other chain auto p = SubmissionPtr(new SubmissionCreateAccount( 0, // will be updated when new ledger arrive @@ -452,6 +458,10 @@ Federator::readDBAttests(ChainType ct) continue; } + txnsInProcessing_[ct].insert(fmt::format("claim: {:x}", claimID)); + if (!autoSubmit_[ct]) + continue; + // The attestation will be created by the other chain auto p = SubmissionPtr(new SubmissionClaim( 0, // will be updated when new ledger arrive @@ -515,6 +525,11 @@ Federator::start() beast::setCurrentThreadName("FederatorTxns"); this->txnSubmitLoop(); }); + + threads_[lt_db] = std::thread([this]() { + beast::setCurrentThreadName("FederatorDB"); + this->dbLoop(); + }); } void @@ -560,6 +575,23 @@ Federator::push(FederatorEvent&& e) } } +void +Federator::pushDB(FederatorDBEvent&& e) +{ + auto const lt = lt_db; + bool notify = false; + { + std::lock_guard l{eventsMutex_}; + notify = dbEvents_.empty(); + dbEvents_.push_back(std::move(e)); + } + if (notify) + { + std::lock_guard l(cvMutexes_[lt]); + cvs_[lt].notify_one(); + } +} + // Called from 2 events that require attestations void Federator::initSync( @@ -700,7 +732,7 @@ Federator::tryFinishInitSync(ChainType const ct) jv("chainType", to_string(cht)), jv("account", ripple::toBase58(bridge_.door(cht))), jv("events to replay", rel_size), - jv("attested events", initSync_[cht].attestedTx_.size()), + jv("attested events", initSync_[ocht].attestedTx_.size()), jv("events to delete", del_cnt)); for (auto const& event : repl) @@ -708,7 +740,7 @@ Federator::tryFinishInitSync(ChainType const ct) std::visit([this](auto const& e) { this->onEvent(e); }, event); } repl.clear(); - initSync_[cht].attestedTx_.clear(); + initSync_[ocht].attestedTx_.clear(); } syncFinished_ = true; } @@ -748,39 +780,88 @@ Federator::onEvent(event::XChainCommitDetected const& e) return; } - auto const& tblName = db_init::xChainTableName(ct); auto const txnIdHex = ripple::strHex(e.txnHash_.begin(), e.txnHash_.end()); + auto const res = + txnsInProcessing_[ct].insert(fmt::format("claim: {:x}", e.claimID_)); + if (!res.second) { - auto session = app_.getXChainTxnDB().checkoutDb(); - auto const sql = fmt::format( - R"sql(SELECT count(*) FROM {table_name} WHERE TransID = "{tx_hex}";)sql", - fmt::arg("table_name", tblName), - fmt::arg("tx_hex", txnIdHex)); + // Already have this transaction + // TODO: Sanity check the claim id and deliveredAmt match + // TODO: Stop historical transaction collection + JLOGV( + j_.error(), + "XChainCommit already present", + jv("event", e.toJson())); + return; // Don't store it again + } - int count = 0; - *session << sql, soci::into(count); - if (session->got_data() && count > 0) + // The attestation will be send from the other chain, so the other chain + // will get the reward + auto const& rewardAccount = chains_[oct].rewardAccount_; + auto const& optDst = e.otherChainDst_; + bool const success = ripple::isTesSuccess(e.status_); + + // non-const so it may be moved from + auto claimOpt = + [&]() -> std::optional { + if (!success) + return std::nullopt; + if (!e.deliveredAmt_) { - // Already have this transaction - // TODO: Sanity check the claim id and deliveredAmt match - // TODO: Stop historical transaction collection JLOGV( - j_.fatal(), - "XChainCommitDetected already present", + j_.error(), + "missing delivered amount in successful xchain transfer", jv("event", e.toJson())); - return; // Don't store it again + return std::nullopt; } + + return ripple::Attestations::AttestationClaim{ + e.bridge_, + signingAccount_ ? *signingAccount_ + : ripple::calcAccountID(signingPK_), + signingPK_, + signingSK_, + e.src_, + *e.deliveredAmt_, + rewardAccount, + ct == ChainType::locking, + e.claimID_, + optDst}; + }(); + + assert(!claimOpt || claimOpt->verify(e.bridge_)); + + pushDB(event::XChainCommitDetected(e)); + + // The attestation will be created by the other chain + if (autoSubmit_[oct] && claimOpt) + { + bool processNow = e.ledgerBoundary_ || !e.rpcOrder_; + pushAtt(e.bridge_, std::move(*claimOpt), oct, processNow); } +} + +void +Federator::onDBEvent(const event::XChainCommitDetected& e) +{ + JLOGV(j_.debug(), "onDBEvent", jv("event", e.toJson())); + + auto const ct = e.chainType_; + + auto const oct = otherChain(ct); + auto const& tblName = db_init::xChainTableName(ct); + + auto const txnIdHex = ripple::strHex(e.txnHash_.begin(), e.txnHash_.end()); // soci complains about a bool int const success = ripple::isTesSuccess(e.status_) ? 1 : 0; + // The attestation will be send from the other chain, so the other chain // will get the reward auto const& rewardAccount = chains_[oct].rewardAccount_; auto const& optDst = e.otherChainDst_; - // non-const so it may be moved from - auto claimOpt = + auto const claimOpt = [&]() -> std::optional { if (!success) return std::nullopt; @@ -807,8 +888,6 @@ Federator::onEvent(event::XChainCommitDetected const& e) optDst}; }(); - assert(!claimOpt || claimOpt->verify(e.bridge_)); - { auto session = app_.getXChainTxnDB().checkoutDb(); @@ -877,13 +956,6 @@ Federator::onEvent(event::XChainCommitDetected const& e) auto const chainType = static_cast(ct); *session << sql, soci::use(txnIdHex), soci::use(chainType); } - - // The attestation will be created by the other chain - if (autoSubmit_[oct] && claimOpt) - { - bool processNow = e.ledgerBoundary_ || !e.rpcOrder_; - pushAtt(e.bridge_, std::move(*claimOpt), oct, processNow); - } } void @@ -917,24 +989,19 @@ Federator::onEvent(event::XChainAccountCreateCommitDetected const& e) return; } - auto const& tblName = db_init::xChainCreateAccountTableName(ct); auto const txnIdHex = ripple::strHex(e.txnHash_.begin(), e.txnHash_.end()); + auto const res = txnsInProcessing_[ct].insert( + fmt::format("create: {:x}", e.createCount_)); + if (!res.second) { - auto session = app_.getXChainTxnDB().checkoutDb(); - auto const sql = fmt::format( - R"sql(SELECT count(*) FROM {table_name} WHERE TransID = "{tx_hex}";)sql", - fmt::arg("table_name", tblName), - fmt::arg("tx_hex", txnIdHex)); - - int count = 0; - *session << sql, soci::into(count); - if (session->got_data() && count > 0) - { - // Already have this transaction - // TODO: Sanity check the claim id and deliveredAmt match - // TODO: Stop historical transaction collection - return; // Don't store it again - } + // Already have this transaction + // TODO: Sanity check the claim id and deliveredAmt match + // TODO: Stop historical transaction collection + JLOGV( + j_.error(), + "XChainAccountCreateCommit already present", + jv("event", e.toJson())); + return; // Don't store it again } // soci complains about a bool @@ -975,6 +1042,60 @@ Federator::onEvent(event::XChainAccountCreateCommitDetected const& e) assert(!createOpt || createOpt->verify(e.bridge_)); + pushDB(event::XChainAccountCreateCommitDetected(e)); + + // The attestation will be created by the other chain + if (autoSubmit_[oct] && createOpt) + { + bool processNow = e.ledgerBoundary_ || !e.rpcOrder_; + pushAtt(e.bridge_, std::move(*createOpt), oct, processNow); + } +} + +void +Federator::onDBEvent(event::XChainAccountCreateCommitDetected const& e) +{ + JLOGV(j_.debug(), "onDBEvent", jv("event", e.toJson())); + + auto const ct = e.chainType_; + auto const oct = otherChain(ct); + auto const& tblName = db_init::xChainCreateAccountTableName(ct); + + auto const txnIdHex = ripple::strHex(e.txnHash_.begin(), e.txnHash_.end()); + + // soci complains about a bool + int const success = ripple::isTesSuccess(e.status_) ? 1 : 0; + auto const& rewardAccount = chains_[oct].rewardAccount_; + auto const& dst = e.otherChainDst_; + + auto const createOpt = + [&]() -> std::optional { + if (!success) + return std::nullopt; + if (!e.deliveredAmt_) + { + JLOGV( + j_.error(), + "missing delivered amount in successful xchain create transfer", + jv("event", e.toJson())); + return std::nullopt; + } + + return ripple::Attestations::AttestationCreateAccount{ + e.bridge_, + signingAccount_ ? *signingAccount_ + : ripple::calcAccountID(signingPK_), + signingPK_, + signingSK_, + e.src_, + *e.deliveredAmt_, + e.rewardAmt_, + rewardAccount, + ct == ChainType::locking, + e.createCount_, + dst}; + }(); + { auto session = app_.getXChainTxnDB().checkoutDb(); @@ -1045,13 +1166,6 @@ Federator::onEvent(event::XChainAccountCreateCommitDetected const& e) auto const chainType = static_cast(ct); *session << sql, soci::use(txnIdHex), soci::use(chainType); } - - // The attestation will be created by the other chain - if (autoSubmit_[oct] && createOpt) - { - bool processNow = e.ledgerBoundary_ || !e.rpcOrder_; - pushAtt(e.bridge_, std::move(*createOpt), oct, processNow); - } } void @@ -1079,6 +1193,13 @@ Federator::onEvent(event::EndOfHistory const& e) } } +void +Federator::onDBEvent(event::DBDelete const& e) +{ + JLOGV(j_.debug(), "onDBEvent", jv("event", e.toJson())); + deleteFromDB(e.chainType_, e.id_, e.isCreateAccount); +} + #ifdef USE_BATCH_ATTESTATION std::pair @@ -1205,10 +1326,38 @@ Federator::onEvent(event::XChainAttestsResult const& e) for (auto& sub : subToDelete) { - // Deleting events from the opposite side of the attestations + std::uint32_t cnt = 0; + // Erasing events from the opposite side of the attestations + if ((e.type_ == xbwd::XChainTxnType::xChainAddClaimAttestation) && + e.claimID_) + { + cnt = txnsInProcessing_[oct].erase( + fmt::format("claim: {:x}", *e.claimID_)); + } + else if ( + (e.type_ == + xbwd::XChainTxnType::xChainAddAccountCreateAttestation) && + e.createCount_) + { + cnt = txnsInProcessing_[oct].erase( + fmt::format("create: {:x}", *e.createCount_)); + } + else + { + JLOGV( + j_.error(), + "XChainAttestsResult flow error, unknown attestation", + jv("event", e.toJson())); + } + + // Deleting records from the opposite side of the attestations auto const attestedIDs = sub->forAttestIDs( - [&](std::uint64_t id) { deleteFromDB(oct, id, false); }, - [&](std::uint64_t id) { deleteFromDB(oct, id, true); }); + [&](std::uint64_t id) { + pushDB(event::DBDelete{oct, false, id}); + }, + [&](std::uint64_t id) { + pushDB(event::DBDelete{oct, true, id}); + }); JLOGV( j_.trace(), "XChainAttestsResult processed", @@ -1216,7 +1365,8 @@ Federator::onEvent(event::XChainAttestsResult const& e) jv("accountSqn", e.accountSqn_), jv("result", e.ter_), jv("commitAttests", attestedIDs.first), - jv("createAttests", attestedIDs.second)); + jv("createAttests", attestedIDs.second), + jv("cacheRemoved", cnt)); } if (e.isHistory_) @@ -1283,7 +1433,7 @@ Federator::onEvent(event::NewLedger const& e) auto const x = std::min(std::min(submitLedgerIndex, doorLedgerIndex), e.ledgerIndex_); auto const minLedger = x ? x - 1 : 0; - saveProcessedLedger(ct, minLedger); + pushDB(event::DBUpdateLedger{ct, minLedger}); checkExpired(ct, minLedger); } @@ -1390,8 +1540,13 @@ Federator::checkProcessedLedger(ChainType ct) } void -Federator::saveProcessedLedger(ChainType ct, std::uint32_t ledger) +Federator::onDBEvent(event::DBUpdateLedger const& e) { + JLOGV(j_.debug(), "onDBEvent", jv("event", e.toJson())); + + auto const ledger = e.ledger_; + auto const ct = e.chainType_; + if (ledger > initSync_[ct].dbLedgerSqn_) { auto session = app_.getXChainTxnDB().checkoutDb(); @@ -1819,12 +1974,23 @@ Federator::txnSubmitLoop() if (ad.isMember(ripple::jss::Sequence) && ad[ripple::jss::Sequence].isIntegral()) { - std::lock_guard aiLock{accountInfoMutex}; - assert(waitingAccountInfo[ct] && accountInfoSqns[ct] == 0); - accountInfoSqns[ct] = ad[ripple::jss::Sequence].asUInt(); - waitingAccountInfo[ct] = false; - JLOG(j_.trace()) - << "got account sqn " << accountInfoSqns[ct]; + { + std::lock_guard aiLock{accountInfoMutex}; + assert( + waitingAccountInfo[ct] && accountInfoSqns[ct] == 0); + accountInfoSqns[ct] = + ad[ripple::jss::Sequence].asUInt(); + waitingAccountInfo[ct] = false; + + JLOG(j_.trace()) + << "got account sqn " << accountInfoSqns[ct]; + } + + { + // advance the loop + std::lock_guard l(cvMutexes_[lt]); + cvs_[lt].notify_one(); + } } } }; @@ -1944,6 +2110,51 @@ Federator::txnSubmitLoop() } } +void +Federator::dbLoop() +{ + auto const lt = lt_db; + { + std::unique_lock l{loopMutexes_[lt]}; + loopCvs_[lt].wait(l, [this, lt] { return !loopLocked_[lt]; }); + } + + std::vector localEvents; + localEvents.reserve(16); + while (!requestStop_) + { + { + std::lock_guard l{eventsMutex_}; + assert(localEvents.empty()); + localEvents.swap(dbEvents_); + } + if (localEvents.empty()) + { + using namespace std::chrono_literals; + std::unique_lock l{cvMutexes_[lt]}; + cvs_[lt].wait_for(l, 100ms); + continue; + } + + auto const start = + duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count(); + for (auto const& event : localEvents) + std::visit([this](auto&& e) { this->onDBEvent(e); }, event); + auto const finish = + duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count(); + JLOGV( + j_.debug(), + "DB events processed", + jv("size", localEvents.size()), + jv("time(ms)", finish - start)); + localEvents.clear(); + } +} + Json::Value Federator::getInfo() const { diff --git a/src/xbwd/federator/Federator.h b/src/xbwd/federator/Federator.h index 89f0a3f..58e8066 100644 --- a/src/xbwd/federator/Federator.h +++ b/src/xbwd/federator/Federator.h @@ -295,6 +295,7 @@ class Federator lt_event_locking, lt_event_issuing, lt_txnSubmit, + lt_db, lt_last }; std::array threads_; @@ -325,11 +326,19 @@ class Federator mutable std::mutex eventsMutex_; ChainArray> GUARDED_BY(eventsMutex_) events_; + // Not divided by sidechains because attestation events need to be + // processed after commit events(to delete them in the DB). + std::vector GUARDED_BY(eventsMutex_) dbEvents_; + mutable std::mutex txnsMutex_; ChainArray> GUARDED_BY(txnsMutex_) txns_; ChainArray> GUARDED_BY(txnsMutex_) submitted_; ChainArray> GUARDED_BY(txnsMutex_) errored_; + // Cache of the events added to processing. It is added so as not to read + // the DB. No need for mutex as event processing is synchronized. + ChainArray> txnsInProcessing_; + // "Window" size for sending attestations // 0 - no "window" std::uint32_t const maxAttToSend_ = 0; @@ -377,7 +386,7 @@ class Federator ripple::uint256 dbTxnHash_; // The latest ledger that was fully processed in the previous session. - std::uint32_t dbLedgerSqn_{0u}; + std::atomic_uint32_t dbLedgerSqn_{0u}; // Request to stop processing history bool historyDone_{false}; @@ -475,17 +484,26 @@ class Federator ripple::Logs& l); void - mainLoop(ChainType ct) EXCLUDES(mainLoopMutex_); + mainLoop(ChainType ct); + + void + txnSubmitLoop(); void - txnSubmitLoop() EXCLUDES(txnSubmitLoopMutex_); + dbLoop(); void onEvent(event::XChainCommitDetected const& e); + void + onDBEvent(event::XChainCommitDetected const& e); + void onEvent(event::XChainAccountCreateCommitDetected const& e); + void + onDBEvent(event::XChainAccountCreateCommitDetected const& e); + void onEvent(event::XChainTransferResult const& e); @@ -513,6 +531,12 @@ class Federator void onEvent(event::EndOfHistory const& e); + void + onDBEvent(event::DBDelete const& e); + + void + onDBEvent(event::DBUpdateLedger const& e); + void initSync( ChainType const ct, @@ -550,6 +574,9 @@ class Federator void submitTxn(SubmissionPtr&& submission, ChainType dstChain); + void + pushDB(FederatorDBEvent&& e) EXCLUDES(eventsMutex_); + void deleteFromDB( ChainType ct, @@ -575,9 +602,6 @@ class Federator void checkProcessedLedger(ChainType ct); - - void - saveProcessedLedger(ChainType ct, std::uint32_t ledger); }; std::unique_ptr diff --git a/src/xbwd/federator/FederatorEvents.cpp b/src/xbwd/federator/FederatorEvents.cpp index 7e08f37..d14ffb6 100644 --- a/src/xbwd/federator/FederatorEvents.cpp +++ b/src/xbwd/federator/FederatorEvents.cpp @@ -198,6 +198,28 @@ XChainAccountSet::toJson() const return result; } +Json::Value +DBDelete::toJson() const +{ + Json::Value result{Json::objectValue}; + result["chainType"] = to_string(chainType_); + result["eventType"] = "DBDelete"; + result[isCreateAccount ? "create" : "claim"] = fmt::format("{:x}", id_); + + return result; +} + +Json::Value +DBUpdateLedger::toJson() const +{ + Json::Value result{Json::objectValue}; + result["chainType"] = to_string(chainType_); + result["eventType"] = "DBUpdateLedger"; + result["ledger"] = ledger_; + + return result; +} + } // namespace event Json::Value diff --git a/src/xbwd/federator/FederatorEvents.h b/src/xbwd/federator/FederatorEvents.h index f6f7e8a..8d36a78 100644 --- a/src/xbwd/federator/FederatorEvents.h +++ b/src/xbwd/federator/FederatorEvents.h @@ -169,6 +169,25 @@ struct XChainAccountSet toJson() const; }; +struct DBDelete +{ + ChainType chainType_ = ChainType::locking; + bool isCreateAccount = false; + std::uint64_t id_ = 0; + + Json::Value + toJson() const; +}; + +struct DBUpdateLedger +{ + ChainType chainType_ = ChainType::locking; + std::uint32_t ledger_ = 0; + + Json::Value + toJson() const; +}; + } // namespace event using FederatorEvent = std::variant< @@ -186,4 +205,10 @@ using FederatorEvent = std::variant< Json::Value toJson(FederatorEvent const& event); +using FederatorDBEvent = std::variant< + event::XChainCommitDetected, + event::XChainAccountCreateCommitDetected, + event::DBDelete, + event::DBUpdateLedger>; + } // namespace xbwd