From 4213e82a4f6d5054a441041c0b9dc2540b5ea917 Mon Sep 17 00:00:00 2001 From: Arya Date: Tue, 9 Jul 2024 10:15:47 -0400 Subject: [PATCH] change(rpc): Adds a TrustedChainSync struct for keeping up with Zebra's non-finalized best chain from a separate process (#8596) * Adds an init_read_only() fn in zebra-state * moves elasticsearch initialization to `FinalizedState::new_with_debug()` * Updates callers of `FinalizedState::{new, new_with_debug}` to pass a bool to try enabling elasticsearch * Adds a non-finalized read state syncer to zebra-rpc * moves, removes, updates, or addresses TODOs * reduces disk IO while waiting for the a new chain tip & updates the chain tip sender when the finalized tip has changed. * Returns boxed errors from RpcRequestClient methods instead of color_eyre type * Avoids resetting the non-finalized state when there's an error getting a block unless it has the missing block error code. * Adds stub for acceptance test(s) and removes outdated TODO * adds TODOs for testing * Tests that `ChainTipChange` is updated when the non-finalized best chain grows * adds a last_chain_tip_hash and uses a FuturesOrdered for getblock requests * Fixes a pre-flush sync issue by using a secondary db instead of a read-only db * Moves disk IO to blocking tasks * Updates acceptance test to how forks are handled * Checks synced read state for all of the expected blocks * checks that there isn't a tip change until the best chain changes * checks for chain tip changes in test * run test without feature * fixes lint * Fixes compilation/test issues * Adds docs / comments, moves HexData out from behind the getblocktemplate-rpcs feature flag, moves test behind the mining feature flag. * Fixes lints * removes syncer and rpc-syncer features * Fixes test on Windows, applies suggestions from code review * Updates `POLL_DELAY` documentation * Updates method docs * Fixes a test bug * use rpc-client feature in zebrad production code * use rpc-client feature in zebra-node-services for building zebra-rpc crate --------- Co-authored-by: Pili Guerra Co-authored-by: Alfredo Garcia --- zebra-node-services/src/rpc_client.rs | 10 +- zebra-rpc/Cargo.toml | 3 +- zebra-rpc/src/lib.rs | 1 + zebra-rpc/src/methods.rs | 36 +- .../src/methods/get_block_template_rpcs.rs | 5 +- .../methods/get_block_template_rpcs/types.rs | 1 - .../types/get_block_template/parameters.rs | 2 +- .../types/submit_block.rs | 4 +- .../types => }/hex_data.rs | 0 .../tests/snapshot/get_block_template_rpcs.rs | 2 +- zebra-rpc/src/methods/tests/vectors.rs | 5 +- zebra-rpc/src/sync.rs | 392 ++++++++++++++++++ zebra-state/src/lib.rs | 10 +- .../src/service/finalized_state/disk_db.rs | 23 +- .../src/service/finalized_state/zebra_db.rs | 5 + zebrad/Cargo.toml | 4 +- zebrad/src/components/miner.rs | 10 +- zebrad/tests/acceptance.rs | 279 +++++++++++++ .../get_block_template_rpcs/get_peer_info.rs | 5 +- zebrad/tests/common/regtest.rs | 128 ++++-- 20 files changed, 852 insertions(+), 73 deletions(-) rename zebra-rpc/src/methods/{get_block_template_rpcs/types => }/hex_data.rs (100%) create mode 100644 zebra-rpc/src/sync.rs diff --git a/zebra-node-services/src/rpc_client.rs b/zebra-node-services/src/rpc_client.rs index 350b373aa72..7f5ffbf192e 100644 --- a/zebra-node-services/src/rpc_client.rs +++ b/zebra-node-services/src/rpc_client.rs @@ -1,12 +1,12 @@ //! A client for calling Zebra's JSON-RPC methods. //! -//! Only used in tests and tools. +//! Used in the rpc sync scanning functionality and in various tests and tools. use std::net::SocketAddr; use reqwest::Client; -use color_eyre::{eyre::eyre, Result}; +use crate::BoxError; /// An HTTP client for making JSON-RPC requests. #[derive(Clone, Debug)] @@ -99,7 +99,7 @@ impl RpcRequestClient { &self, method: impl AsRef, params: impl AsRef, - ) -> Result { + ) -> std::result::Result { Self::json_result_from_response_text(&self.text_from_call(method, params).await?) } @@ -107,13 +107,13 @@ impl RpcRequestClient { /// Returns `Ok` with a deserialized `result` value in the expected type, or an error report. fn json_result_from_response_text( response_text: &str, - ) -> Result { + ) -> std::result::Result { use jsonrpc_core::Output; let output: Output = serde_json::from_str(response_text)?; match output { Output::Success(success) => Ok(serde_json::from_value(success.result)?), - Output::Failure(failure) => Err(eyre!("RPC call failed with: {failure:?}")), + Output::Failure(failure) => Err(failure.error.into()), } } } diff --git a/zebra-rpc/Cargo.toml b/zebra-rpc/Cargo.toml index edaa1550cc6..a97f879ef13 100644 --- a/zebra-rpc/Cargo.toml +++ b/zebra-rpc/Cargo.toml @@ -15,7 +15,6 @@ keywords = ["zebra", "zcash"] categories = ["asynchronous", "cryptography::cryptocurrencies", "encoding", "network-programming"] [features] -default = [] # Production features that activate extra dependencies, or extra features in dependencies @@ -77,7 +76,7 @@ proptest = { version = "1.4.0", optional = true } zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["json-conversion"] } zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38" } zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38" } -zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38" } +zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] } zebra-script = { path = "../zebra-script", version = "1.0.0-beta.38" } zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38" } diff --git a/zebra-rpc/src/lib.rs b/zebra-rpc/src/lib.rs index 81a30d78e0c..d5b687913fd 100644 --- a/zebra-rpc/src/lib.rs +++ b/zebra-rpc/src/lib.rs @@ -9,6 +9,7 @@ pub mod constants; pub mod methods; pub mod queue; pub mod server; +pub mod sync; #[cfg(test)] mod tests; diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index bf4abe2a08e..77041b8a021 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -38,6 +38,7 @@ use crate::{ }; mod errors; +pub mod hex_data; use errors::{MapServerError, OkOrServerError}; @@ -171,6 +172,14 @@ pub trait Rpc { #[rpc(name = "getbestblockhash")] fn get_best_block_hash(&self) -> Result; + /// Returns the height and hash of the current best blockchain tip block, as a [`GetBlockHeightAndHash`] JSON struct. + /// + /// zcashd reference: none + /// method: post + /// tags: blockchain + #[rpc(name = "getbestblockheightandhash")] + fn get_best_block_height_and_hash(&self) -> Result; + /// Returns all transaction ids in the memory pool, as a JSON array. /// /// zcashd reference: [`getrawmempool`](https://zcash.github.io/rpc/getrawmempool.html) @@ -867,7 +876,6 @@ where .boxed() } - // TODO: use a generic error constructor (#5548) fn get_best_block_hash(&self) -> Result { self.latest_chain_tip .best_tip_hash() @@ -875,7 +883,13 @@ where .ok_or_server_error("No blocks in state") } - // TODO: use a generic error constructor (#5548) + fn get_best_block_height_and_hash(&self) -> Result { + self.latest_chain_tip + .best_tip_height_and_hash() + .map(|(height, hash)| GetBlockHeightAndHash { height, hash }) + .ok_or_server_error("No blocks in state") + } + fn get_raw_mempool(&self) -> BoxFuture>> { #[cfg(feature = "getblocktemplate-rpcs")] use zebra_chain::block::MAX_BLOCK_BYTES; @@ -1547,6 +1561,24 @@ impl Default for GetBlock { #[serde(transparent)] pub struct GetBlockHash(#[serde(with = "hex")] pub block::Hash); +/// Response to a `getbestblockheightandhash` RPC request. +#[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] +pub struct GetBlockHeightAndHash { + /// The best chain tip block height + pub height: block::Height, + /// The best chain tip block hash + pub hash: block::Hash, +} + +impl Default for GetBlockHeightAndHash { + fn default() -> Self { + Self { + height: block::Height::MIN, + hash: block::Hash([0; 32]), + } + } +} + impl Default for GetBlockHash { fn default() -> Self { GetBlockHash(block::Hash([0; 32])) diff --git a/zebra-rpc/src/methods/get_block_template_rpcs.rs b/zebra-rpc/src/methods/get_block_template_rpcs.rs index 77267d006f0..a1c368ac4e0 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs.rs @@ -46,7 +46,6 @@ use crate::methods::{ types::{ get_block_template::GetBlockTemplate, get_mining_info, - hex_data::HexData, long_poll::LongPollInput, peer_info::PeerInfo, submit_block, @@ -54,7 +53,9 @@ use crate::methods::{ unified_address, validate_address, z_validate_address, }, }, - height_from_signed_int, GetBlockHash, MISSING_BLOCK_ERROR_CODE, + height_from_signed_int, + hex_data::HexData, + GetBlockHash, MISSING_BLOCK_ERROR_CODE, }; pub mod constants; diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/types.rs b/zebra-rpc/src/methods/get_block_template_rpcs/types.rs index fc3b94cee81..a2f5ccc265e 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/types.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/types.rs @@ -3,7 +3,6 @@ pub mod default_roots; pub mod get_block_template; pub mod get_mining_info; -pub mod hex_data; pub mod long_poll; pub mod peer_info; pub mod submit_block; diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template/parameters.rs b/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template/parameters.rs index 9329bb65c6f..73e1ed820ba 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template/parameters.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template/parameters.rs @@ -1,6 +1,6 @@ //! Parameter types for the `getblocktemplate` RPC. -use crate::methods::get_block_template_rpcs::types::{hex_data::HexData, long_poll::LongPollId}; +use crate::methods::{get_block_template_rpcs::types::long_poll::LongPollId, hex_data::HexData}; /// Defines whether the RPC method should generate a block template or attempt to validate a block proposal. #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/types/submit_block.rs b/zebra-rpc/src/methods/get_block_template_rpcs/types/submit_block.rs index fb12b54cefc..7674f3d5657 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/types/submit_block.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/types/submit_block.rs @@ -29,7 +29,7 @@ pub struct JsonParameters { /// Response to a `submitblock` RPC request. /// /// Zebra never returns "duplicate-invalid", because it does not store invalid blocks. -#[derive(Debug, PartialEq, Eq, serde::Serialize)] +#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "kebab-case")] pub enum ErrorResponse { /// Block was already committed to the non-finalized or finalized state @@ -45,7 +45,7 @@ pub enum ErrorResponse { /// Response to a `submitblock` RPC request. /// /// Zebra never returns "duplicate-invalid", because it does not store invalid blocks. -#[derive(Debug, PartialEq, Eq, serde::Serialize)] +#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(untagged)] pub enum Response { /// Block was not successfully submitted, return error diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/types/hex_data.rs b/zebra-rpc/src/methods/hex_data.rs similarity index 100% rename from zebra-rpc/src/methods/get_block_template_rpcs/types/hex_data.rs rename to zebra-rpc/src/methods/hex_data.rs diff --git a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs index 04b3139913f..93343d6059f 100644 --- a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs @@ -39,13 +39,13 @@ use crate::methods::{ get_block_template_rpcs::types::{ get_block_template::{self, GetBlockTemplateRequestMode}, get_mining_info, - hex_data::HexData, long_poll::{LongPollId, LONG_POLL_ID_LENGTH}, peer_info::PeerInfo, submit_block, subsidy::BlockSubsidy, unified_address, validate_address, z_validate_address, }, + hex_data::HexData, tests::{snapshot::EXCESSIVE_BLOCK_HEIGHT, utils::fake_history_tree}, GetBlockHash, GetBlockTemplateRpc, GetBlockTemplateRpcImpl, }; diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 4d830c51e93..5b5a21e23d0 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -1259,8 +1259,9 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) { GET_BLOCK_TEMPLATE_NONCE_RANGE_FIELD, }, get_block_template::{self, GetBlockTemplateRequestMode}, - types::{hex_data::HexData, long_poll::LONG_POLL_ID_LENGTH}, + types::long_poll::LONG_POLL_ID_LENGTH, }, + hex_data::HexData, tests::utils::fake_history_tree, }; @@ -1547,7 +1548,7 @@ async fn rpc_submitblock_errors() { use zebra_chain::chain_sync_status::MockSyncStatus; use zebra_network::address_book_peers::MockAddressBookPeers; - use crate::methods::get_block_template_rpcs::types::{hex_data::HexData, submit_block}; + use crate::methods::{get_block_template_rpcs::types::submit_block, hex_data::HexData}; let _init_guard = zebra_test::init(); diff --git a/zebra-rpc/src/sync.rs b/zebra-rpc/src/sync.rs new file mode 100644 index 00000000000..53eed4072f0 --- /dev/null +++ b/zebra-rpc/src/sync.rs @@ -0,0 +1,392 @@ +//! Syncer task for maintaining a non-finalized state in Zebra's ReadStateService and updating `ChainTipSender` via RPCs + +use std::{net::SocketAddr, ops::RangeInclusive, sync::Arc, time::Duration}; + +use futures::{stream::FuturesOrdered, StreamExt}; +use tokio::task::JoinHandle; +use tower::BoxError; +use tracing::info; +use zebra_chain::{ + block::{self, Block, Height}, + parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH}, + serialization::ZcashDeserializeInto, +}; +use zebra_node_services::rpc_client::RpcRequestClient; +use zebra_state::{ + spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, CheckpointVerifiedBlock, + LatestChainTip, NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock, ZebraDb, + MAX_BLOCK_REORG_HEIGHT, +}; + +use zebra_chain::diagnostic::task::WaitForPanics; + +use crate::{ + constants::MISSING_BLOCK_ERROR_CODE, + methods::{hex_data::HexData, GetBlockHeightAndHash}, +}; + +/// How long to wait between calls to `getbestblockheightandhash` when it: +/// - Returns an error, or +/// - Returns the block hash of a block that the read state already contains, +/// (so that there's nothing for the syncer to do except wait for the next chain tip change). +/// See the [`TrustedChainSync::wait_for_chain_tip_change()`] method documentation for more information. +const POLL_DELAY: Duration = Duration::from_millis(200); + +/// Syncs non-finalized blocks in the best chain from a trusted Zebra node's RPC methods. +#[derive(Debug)] +struct TrustedChainSync { + /// RPC client for calling Zebra's RPC methods. + rpc_client: RpcRequestClient, + /// The read state service. + db: ZebraDb, + /// The non-finalized state - currently only contains the best chain. + non_finalized_state: NonFinalizedState, + /// The chain tip sender for updating [`LatestChainTip`] and [`ChainTipChange`]. + chain_tip_sender: ChainTipSender, + /// The non-finalized state sender, for updating the [`ReadStateService`] when the non-finalized best chain changes. + non_finalized_state_sender: tokio::sync::watch::Sender, +} + +impl TrustedChainSync { + /// Creates a new [`TrustedChainSync`] with a [`ChainTipSender`], then spawns a task to sync blocks + /// from the node's non-finalized best chain. + /// + /// Returns the [`LatestChainTip`], [`ChainTipChange`], and a [`JoinHandle`] for the sync task. + pub async fn spawn( + rpc_address: SocketAddr, + db: ZebraDb, + non_finalized_state_sender: tokio::sync::watch::Sender, + ) -> (LatestChainTip, ChainTipChange, JoinHandle<()>) { + let rpc_client = RpcRequestClient::new(rpc_address); + let non_finalized_state = NonFinalizedState::new(&db.network()); + let (chain_tip_sender, latest_chain_tip, chain_tip_change) = + ChainTipSender::new(None, &db.network()); + + let mut syncer = Self { + rpc_client, + db, + non_finalized_state, + chain_tip_sender, + non_finalized_state_sender, + }; + + let sync_task = tokio::spawn(async move { + syncer.sync().await; + }); + + (latest_chain_tip, chain_tip_change, sync_task) + } + + /// Starts syncing blocks from the node's non-finalized best chain and checking for chain tip changes in the finalized state. + /// + /// When the best chain tip in Zebra is not available in the finalized state or the local non-finalized state, + /// gets any unavailable blocks in Zebra's best chain from the RPC server, adds them to the local non-finalized state, then + /// sends the updated chain tip block and non-finalized state to the [`ChainTipSender`] and non-finalized state sender. + async fn sync(&mut self) { + self.try_catch_up_with_primary().await; + let mut last_chain_tip_hash = + if let Some(finalized_tip_block) = self.finalized_chain_tip_block().await { + let last_chain_tip_hash = finalized_tip_block.hash; + self.chain_tip_sender.set_finalized_tip(finalized_tip_block); + last_chain_tip_hash + } else { + GENESIS_PREVIOUS_BLOCK_HASH + }; + + loop { + let (target_tip_height, target_tip_hash) = + self.wait_for_chain_tip_change(last_chain_tip_hash).await; + + info!( + ?target_tip_height, + ?target_tip_hash, + "got a chain tip change" + ); + + if self.is_finalized_tip_change(target_tip_hash).await { + let block = self.finalized_chain_tip_block().await.expect( + "should have genesis block after successful bestblockheightandhash response", + ); + + last_chain_tip_hash = block.hash; + self.chain_tip_sender.set_finalized_tip(block); + continue; + } + + // If the new best chain tip is unavailable in the finalized state, start syncing non-finalized blocks from + // the non-finalized best chain tip height or finalized tip height. + let (next_block_height, mut current_tip_hash) = + self.next_block_height_and_prev_hash().await; + + last_chain_tip_hash = current_tip_hash; + + let rpc_client = self.rpc_client.clone(); + let mut block_futs = + rpc_client.block_range_ordered(next_block_height..=target_tip_height); + + let should_reset_non_finalized_state = loop { + let block = match block_futs.next().await { + Some(Ok(Some(block))) + if block.header.previous_block_hash == current_tip_hash => + { + SemanticallyVerifiedBlock::from(block) + } + // Clear the non-finalized state and re-fetch every block past the finalized tip if: + // - the next block's previous block hash doesn't match the expected hash, + // - the next block is missing + // - the target tip hash is missing from the blocks in `block_futs` + // because there was likely a chain re-org/fork. + Some(Ok(_)) | None => break true, + // If calling the `getblock` RPC method fails with an unexpected error, wait for the next chain tip change + // without resetting the non-finalized state. + Some(Err(err)) => { + tracing::warn!( + ?err, + "encountered an unexpected error while calling getblock method" + ); + + break false; + } + }; + + let block_hash = block.hash; + let commit_result = if self.non_finalized_state.chain_count() == 0 { + self.non_finalized_state + .commit_new_chain(block.clone(), &self.db) + } else { + self.non_finalized_state + .commit_block(block.clone(), &self.db) + }; + + // The previous block hash is checked above, if committing the block fails for some reason, try again. + if let Err(error) = commit_result { + tracing::warn!( + ?error, + ?block_hash, + "failed to commit block to non-finalized state" + ); + + break false; + } + + // TODO: Check the finalized tip height and finalize blocks from the non-finalized state until + // all non-finalized state chain root previous block hashes match the finalized tip hash. + while self + .non_finalized_state + .best_chain_len() + .expect("just successfully inserted a non-finalized block above") + > MAX_BLOCK_REORG_HEIGHT + { + tracing::trace!("finalizing block past the reorg limit"); + self.non_finalized_state.finalize(); + } + + self.update_channels(block); + current_tip_hash = block_hash; + last_chain_tip_hash = current_tip_hash; + + // If the block hash matches the output from the `getbestblockhash` RPC method, we can wait until + // the best block hash changes to get the next block. + if block_hash == target_tip_hash { + break false; + } + }; + + if should_reset_non_finalized_state { + self.try_catch_up_with_primary().await; + let block = self.finalized_chain_tip_block().await.expect( + "should have genesis block after successful bestblockheightandhash response", + ); + + last_chain_tip_hash = block.hash; + self.non_finalized_state = + NonFinalizedState::new(&self.non_finalized_state.network); + self.update_channels(block); + } + } + } + + /// Tries to catch up to the primary db instance for an up-to-date view of finalized blocks. + async fn try_catch_up_with_primary(&self) { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + if let Err(catch_up_error) = db.try_catch_up_with_primary() { + tracing::warn!(?catch_up_error, "failed to catch up to primary"); + } + }) + .wait_for_panics() + .await + } + + /// If the non-finalized state is empty, tries to catch up to the primary db instance for + /// an up-to-date view of finalized blocks. + /// + /// Returns true if the non-finalized state is empty and the target hash is in the finalized state. + async fn is_finalized_tip_change(&self, target_tip_hash: block::Hash) -> bool { + self.non_finalized_state.chain_count() == 0 && { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + if let Err(catch_up_error) = db.try_catch_up_with_primary() { + tracing::warn!(?catch_up_error, "failed to catch up to primary"); + } + db.contains_hash(target_tip_hash) + }) + .wait_for_panics() + .await + } + } + + /// Returns the current tip hash and the next height immediately after the current tip height. + async fn next_block_height_and_prev_hash(&self) -> (block::Height, block::Hash) { + if let Some(tip) = self.non_finalized_state.best_tip() { + Some(tip) + } else { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || db.tip()) + .wait_for_panics() + .await + } + .map(|(current_tip_height, current_tip_hash)| { + ( + current_tip_height.next().expect("should be valid height"), + current_tip_hash, + ) + }) + .unwrap_or((Height::MIN, GENESIS_PREVIOUS_BLOCK_HASH)) + } + + /// Reads the finalized tip block from the secondary db instance and converts it to a [`ChainTipBlock`]. + async fn finalized_chain_tip_block(&self) -> Option { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + let (height, hash) = db.tip()?; + db.block(height.into()) + .map(|block| CheckpointVerifiedBlock::with_hash(block, hash)) + .map(ChainTipBlock::from) + }) + .wait_for_panics() + .await + } + + /// Accepts a block hash. + /// + /// Polls `getbestblockheightandhash` RPC method until it successfully returns a block hash that is different from the last chain tip hash. + /// + /// Returns the node's best block hash. + async fn wait_for_chain_tip_change( + &self, + last_chain_tip_hash: block::Hash, + ) -> (block::Height, block::Hash) { + loop { + let Some(target_height_and_hash) = self + .rpc_client + .get_best_block_height_and_hash() + .await + .filter(|&(_height, hash)| hash != last_chain_tip_hash) + else { + // If `get_best_block_height_and_hash()` returns an error, or returns + // the current chain tip hash, wait [`POLL_DELAY`], then try again. + tokio::time::sleep(POLL_DELAY).await; + continue; + }; + + break target_height_and_hash; + } + } + + /// Sends the new chain tip and non-finalized state to the latest chain channels. + fn update_channels(&mut self, best_tip: impl Into) { + // If the final receiver was just dropped, ignore the error. + let _ = self + .non_finalized_state_sender + .send(self.non_finalized_state.clone()); + self.chain_tip_sender + .set_best_non_finalized_tip(Some(best_tip.into())); + } +} + +/// Accepts a [zebra-state configuration](zebra_state::Config), a [`Network`], and +/// the [`SocketAddr`] of a Zebra node's RPC server. +/// +/// Initializes a [`ReadStateService`] and a [`TrustedChainSync`] to update the +/// non-finalized best chain and the latest chain tip. +/// +/// Returns a [`ReadStateService`], [`LatestChainTip`], [`ChainTipChange`], and +/// a [`JoinHandle`] for the sync task. +pub fn init_read_state_with_syncer( + config: zebra_state::Config, + network: &Network, + rpc_address: SocketAddr, +) -> tokio::task::JoinHandle< + Result< + ( + ReadStateService, + LatestChainTip, + ChainTipChange, + tokio::task::JoinHandle<()>, + ), + BoxError, + >, +> { + let network = network.clone(); + tokio::spawn(async move { + if config.ephemeral { + return Err("standalone read state service cannot be used with ephemeral state".into()); + } + + let (read_state, db, non_finalized_state_sender) = + spawn_init_read_only(config, &network).await?; + let (latest_chain_tip, chain_tip_change, sync_task) = + TrustedChainSync::spawn(rpc_address, db, non_finalized_state_sender).await; + Ok((read_state, latest_chain_tip, chain_tip_change, sync_task)) + }) +} + +trait SyncerRpcMethods { + async fn get_best_block_height_and_hash(&self) -> Option<(block::Height, block::Hash)>; + async fn get_block(&self, height: u32) -> Result>, BoxError>; + fn block_range_ordered( + &self, + height_range: RangeInclusive, + ) -> FuturesOrdered>, BoxError>>> + { + let &Height(start_height) = height_range.start(); + let &Height(end_height) = height_range.end(); + let mut futs = FuturesOrdered::new(); + + for height in start_height..=end_height { + futs.push_back(self.get_block(height)); + } + + futs + } +} + +impl SyncerRpcMethods for RpcRequestClient { + async fn get_best_block_height_and_hash(&self) -> Option<(block::Height, block::Hash)> { + self.json_result_from_call("getbestblockheightandhash", "[]") + .await + .map(|GetBlockHeightAndHash { height, hash }| (height, hash)) + .ok() + } + + async fn get_block(&self, height: u32) -> Result>, BoxError> { + match self + .json_result_from_call("getblock", format!(r#"["{}", 0]"#, height)) + .await + { + Ok(HexData(raw_block)) => { + let block = raw_block.zcash_deserialize_into::()?; + Ok(Some(Arc::new(block))) + } + Err(err) + if err + .downcast_ref::() + .is_some_and(|err| err.code == MISSING_BLOCK_ERROR_CODE) => + { + Ok(None) + } + Err(err) => Err(err), + } + } +} diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index 58a83c9e15d..e93a3b8f905 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -63,14 +63,12 @@ pub use service::finalized_state::{ // Allow use in the scanner and external tests #[cfg(any(test, feature = "proptest-impl", feature = "shielded-scan"))] -pub use service::{ - finalized_state::{ - DiskWriteBatch, FromDisk, IntoDisk, ReadDisk, TypedColumnFamily, WriteDisk, - WriteTypedBatch, ZebraDb, - }, - ReadStateService, +pub use service::finalized_state::{ + DiskWriteBatch, FromDisk, IntoDisk, ReadDisk, TypedColumnFamily, WriteDisk, WriteTypedBatch, }; +pub use service::{finalized_state::ZebraDb, ReadStateService}; + #[cfg(feature = "getblocktemplate-rpcs")] pub use response::GetBlockTemplateChainInfo; diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs index ac0da2795c1..0c25f6a273c 100644 --- a/zebra-state/src/service/finalized_state/disk_db.rs +++ b/zebra-state/src/service/finalized_state/disk_db.rs @@ -567,6 +567,11 @@ impl DiskDb { ); } + /// When called with a secondary DB instance, tries to catch up with the primary DB instance + pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> { + self.db.try_catch_up_with_primary() + } + /// Returns a forward iterator over the items in `cf` in `range`. /// /// Holding this iterator open might delay block commit transactions. @@ -834,7 +839,23 @@ impl DiskDb { .map(|cf_name| rocksdb::ColumnFamilyDescriptor::new(cf_name, db_options.clone())); let db_result = if read_only { - DB::open_cf_descriptors_read_only(&db_options, &path, column_families, false) + // Use a tempfile for the secondary instance cache directory + let secondary_config = Config { + ephemeral: true, + ..config.clone() + }; + let secondary_path = + secondary_config.db_path("secondary_state", format_version_in_code.major, network); + let create_dir_result = std::fs::create_dir_all(&secondary_path); + + info!(?create_dir_result, "creating secondary db directory"); + + DB::open_cf_descriptors_as_secondary( + &db_options, + &path, + &secondary_path, + column_families, + ) } else { DB::open_cf_descriptors(&db_options, &path, column_families) }; diff --git a/zebra-state/src/service/finalized_state/zebra_db.rs b/zebra-state/src/service/finalized_state/zebra_db.rs index 17891e38f39..56145b1d4e2 100644 --- a/zebra-state/src/service/finalized_state/zebra_db.rs +++ b/zebra-state/src/service/finalized_state/zebra_db.rs @@ -233,6 +233,11 @@ impl ZebraDb { } } + /// When called with a secondary DB instance, tries to catch up with the primary DB instance + pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> { + self.db.try_catch_up_with_primary() + } + /// Shut down the database, cleaning up background tasks and ephemeral data. /// /// If `force` is true, clean up regardless of any shared references. diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index df2a481f228..b9b19846a36 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -161,7 +161,7 @@ test_sync_past_mandatory_checkpoint_testnet = [] zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38" } zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38" } zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38" } -zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38" } +zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] } zebra-rpc = { path = "../zebra-rpc", version = "1.0.0-beta.38" } zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38" } @@ -286,8 +286,6 @@ zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features zebra-scan = { path = "../zebra-scan", version = "0.1.0-alpha.7", features = ["proptest-impl"] } zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = ["proptest-impl"] } -zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] } - zebra-test = { path = "../zebra-test", version = "1.0.0-beta.38" } zebra-grpc = { path = "../zebra-grpc", version = "0.1.0-alpha.5" } diff --git a/zebrad/src/components/miner.rs b/zebrad/src/components/miner.rs index 193f30e8c5b..cb32cc91981 100644 --- a/zebrad/src/components/miner.rs +++ b/zebrad/src/components/miner.rs @@ -30,13 +30,11 @@ use zebra_node_services::mempool; use zebra_rpc::{ config::mining::Config, methods::{ - get_block_template_rpcs::{ - get_block_template::{ - self, proposal::TimeSource, proposal_block_from_template, - GetBlockTemplateCapability::*, GetBlockTemplateRequestMode::*, - }, - types::hex_data::HexData, + get_block_template_rpcs::get_block_template::{ + self, proposal::TimeSource, proposal_block_from_template, + GetBlockTemplateCapability::*, GetBlockTemplateRequestMode::*, }, + hex_data::HexData, GetBlockTemplateRpc, GetBlockTemplateRpcImpl, }, }; diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 8fd96a2d668..eee5be6e016 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -3161,3 +3161,282 @@ async fn regtest_submit_blocks() -> Result<()> { common::regtest::submit_blocks_test().await?; Ok(()) } + +#[tokio::test(flavor = "multi_thread")] +#[cfg(feature = "getblocktemplate-rpcs")] +async fn trusted_chain_sync_handles_forks_correctly() -> Result<()> { + use std::sync::Arc; + + use common::regtest::MiningRpcMethods; + use eyre::Error; + use tokio::time::timeout; + use zebra_chain::{ + chain_tip::ChainTip, parameters::NetworkUpgrade, + primitives::byte_array::increment_big_endian, + }; + use zebra_rpc::methods::GetBlockHash; + use zebra_state::{ReadResponse, Response}; + + let _init_guard = zebra_test::init(); + let mut config = random_known_rpc_port_config(false, &Network::new_regtest(None))?; + config.state.ephemeral = false; + let network = config.network.network.clone(); + let rpc_address = config.rpc.listen_addr.unwrap(); + + let test_dir = testdir()?.with_config(&mut config)?; + + let mut child = test_dir.spawn_child(args!["start"])?; + + tracing::info!("waiting for Zebra state cache to be opened"); + + #[cfg(not(target_os = "windows"))] + child.expect_stdout_line_matches("marked database format as newly created")?; + + #[cfg(target_os = "windows")] + tokio::time::sleep(LAUNCH_DELAY).await; + + tracing::info!("starting read state with syncer"); + // Spawn a read state with the RPC syncer to check that it has the same best chain as Zebra + let (read_state, _latest_chain_tip, mut chain_tip_change, _sync_task) = + zebra_rpc::sync::init_read_state_with_syncer( + config.state, + &config.network.network, + rpc_address, + ) + .await? + .map_err(|err| eyre!(err))?; + + tracing::info!("waiting for first chain tip change"); + + // Wait for Zebrad to start up + let tip_action = timeout(LAUNCH_DELAY, chain_tip_change.wait_for_tip_change()).await??; + assert!( + tip_action.is_reset(), + "first tip action should be a reset for the genesis block" + ); + + tracing::info!("got genesis chain tip change, submitting more blocks .."); + + let rpc_client = RpcRequestClient::new(rpc_address); + let mut blocks = Vec::new(); + for _ in 0..10 { + let (block, height) = rpc_client.submit_block_from_template().await?; + blocks.push(block); + let tip_action = timeout( + Duration::from_secs(1), + chain_tip_change.wait_for_tip_change(), + ) + .await??; + + assert_eq!( + tip_action.best_tip_height(), + height, + "tip action height should match block submission" + ); + } + + tracing::info!("checking that read state has the new non-finalized best chain blocks"); + for expected_block in blocks.clone() { + let height = expected_block.coinbase_height().unwrap(); + let zebra_block = rpc_client + .get_block(height.0 as i32) + .await + .map_err(|err| eyre!(err))? + .expect("Zebra test child should have the expected block"); + + assert_eq!( + zebra_block, + Arc::new(expected_block), + "Zebra should have the same block" + ); + + let ReadResponse::Block(read_state_block) = read_state + .clone() + .oneshot(zebra_state::ReadRequest::Block(height.into())) + .await + .map_err(|err| eyre!(err))? + else { + unreachable!("unexpected read response to a block request") + }; + + assert_eq!( + zebra_block, + read_state_block.expect("read state should have the block"), + "read state should have the same block" + ); + } + + tracing::info!("getting next block template"); + let (block_11, _) = rpc_client.block_from_template(Height(100)).await?; + let next_blocks: Vec<_> = blocks + .split_off(5) + .into_iter() + .chain(std::iter::once(block_11)) + .collect(); + + tracing::info!("creating populated state"); + let genesis_block = regtest_genesis_block(); + let (state2, read_state2, latest_chain_tip2, _chain_tip_change2) = + zebra_state::populated_state( + std::iter::once(genesis_block).chain(blocks.iter().cloned().map(Arc::new)), + &network, + ) + .await; + + tracing::info!("attempting to trigger a best chain change"); + for mut block in next_blocks { + let is_chain_history_activation_height = NetworkUpgrade::Heartwood + .activation_height(&network) + == Some(block.coinbase_height().unwrap()); + let header = Arc::make_mut(&mut block.header); + increment_big_endian(header.nonce.as_mut()); + let ReadResponse::ChainInfo(chain_info) = read_state2 + .clone() + .oneshot(zebra_state::ReadRequest::ChainInfo) + .await + .map_err(|err| eyre!(err))? + else { + unreachable!("wrong response variant"); + }; + + header.previous_block_hash = chain_info.tip_hash; + header.commitment_bytes = chain_info + .history_tree + .hash() + .or(is_chain_history_activation_height.then_some([0; 32].into())) + .expect("history tree can't be empty") + .bytes_in_serialized_order() + .into(); + + let Response::Committed(block_hash) = state2 + .clone() + .oneshot(zebra_state::Request::CommitSemanticallyVerifiedBlock( + Arc::new(block.clone()).into(), + )) + .await + .map_err(|err| eyre!(err))? + else { + unreachable!("wrong response variant"); + }; + + assert!( + chain_tip_change.last_tip_change().is_none(), + "there should be no tip change until the last block is submitted" + ); + + rpc_client.submit_block(block.clone()).await?; + blocks.push(block); + let GetBlockHash(best_block_hash) = rpc_client + .json_result_from_call("getbestblockhash", "[]") + .await + .map_err(|err| eyre!(err))?; + + if block_hash == best_block_hash { + break; + } + } + + tracing::info!("newly submitted blocks are in the best chain, checking for reset"); + tokio::time::sleep(Duration::from_secs(3)).await; + let tip_action = timeout( + Duration::from_secs(1), + chain_tip_change.wait_for_tip_change(), + ) + .await??; + let (expected_height, expected_hash) = latest_chain_tip2 + .best_tip_height_and_hash() + .expect("should have a chain tip"); + assert!(tip_action.is_reset(), "tip action should be reset"); + assert_eq!( + tip_action.best_tip_hash_and_height(), + (expected_hash, expected_height), + "tip action hashes and heights should match" + ); + + tracing::info!("checking that read state has the new non-finalized best chain blocks"); + for expected_block in blocks { + let height = expected_block.coinbase_height().unwrap(); + let zebra_block = rpc_client + .get_block(height.0 as i32) + .await + .map_err(|err| eyre!(err))? + .expect("Zebra test child should have the expected block"); + + assert_eq!( + zebra_block, + Arc::new(expected_block), + "Zebra should have the same block" + ); + + let ReadResponse::Block(read_state_block) = read_state + .clone() + .oneshot(zebra_state::ReadRequest::Block(height.into())) + .await + .map_err(|err| eyre!(err))? + else { + unreachable!("unexpected read response to a block request") + }; + + assert_eq!( + zebra_block, + read_state_block.expect("read state should have the block"), + "read state should have the same block" + ); + } + + tracing::info!("restarting Zebra on Mainnet"); + + child.kill(false)?; + let output = child.wait_with_output()?; + + // Make sure the command was killed + output.assert_was_killed()?; + + output.assert_failure()?; + + let mut config = random_known_rpc_port_config(false, &Network::Mainnet)?; + config.state.ephemeral = false; + let rpc_address = config.rpc.listen_addr.unwrap(); + + let test_dir = testdir()?.with_config(&mut config)?; + + let mut child = test_dir.spawn_child(args!["start"])?; + + tracing::info!("waiting for Zebra state cache to be opened"); + + #[cfg(not(target_os = "windows"))] + child.expect_stdout_line_matches("marked database format as newly created")?; + + #[cfg(target_os = "windows")] + tokio::time::sleep(LAUNCH_DELAY).await; + + tracing::info!("starting read state with syncer"); + // Spawn a read state with the RPC syncer to check that it has the same best chain as Zebra + let (_read_state, _latest_chain_tip, mut chain_tip_change, _sync_task) = + zebra_rpc::sync::init_read_state_with_syncer( + config.state, + &config.network.network, + rpc_address, + ) + .await? + .map_err(|err| eyre!(err))?; + + tracing::info!("waiting for finalized chain tip changes"); + + timeout( + Duration::from_secs(100), + tokio::spawn(async move { + for _ in 0..2 { + chain_tip_change + .wait_for_tip_change() + .await + .map_err(|err| eyre!(err))?; + } + + Ok::<(), Error>(()) + }), + ) + .await???; + + Ok(()) +} diff --git a/zebrad/tests/common/get_block_template_rpcs/get_peer_info.rs b/zebrad/tests/common/get_block_template_rpcs/get_peer_info.rs index 5dd0fd81604..4ca0bc797ad 100644 --- a/zebrad/tests/common/get_block_template_rpcs/get_peer_info.rs +++ b/zebrad/tests/common/get_block_template_rpcs/get_peer_info.rs @@ -1,6 +1,6 @@ //! Tests that `getpeerinfo` RPC method responds with info about at least 1 peer. -use color_eyre::eyre::{Context, Result}; +use color_eyre::eyre::{eyre, Context, Result}; use zebra_chain::parameters::Network; use zebra_node_services::rpc_client::RpcRequestClient; @@ -41,7 +41,8 @@ pub(crate) async fn run() -> Result<()> { // call `getpeerinfo` RPC method let peer_info_result: Vec = RpcRequestClient::new(rpc_address) .json_result_from_call("getpeerinfo", "[]".to_string()) - .await?; + .await + .map_err(|err| eyre!(err))?; assert!( !peer_info_result.is_empty(), diff --git a/zebrad/tests/common/regtest.rs b/zebrad/tests/common/regtest.rs index 86a20546f40..669434a6c73 100644 --- a/zebrad/tests/common/regtest.rs +++ b/zebrad/tests/common/regtest.rs @@ -5,18 +5,27 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; -use color_eyre::eyre::{Context, Result}; +use color_eyre::eyre::{eyre, Context, Result}; +use tower::BoxError; use tracing::*; use zebra_chain::{ + block::{Block, Height}, parameters::{testnet::REGTEST_NU5_ACTIVATION_HEIGHT, Network, NetworkUpgrade}, primitives::byte_array::increment_big_endian, - serialization::ZcashSerialize, + serialization::{ZcashDeserializeInto, ZcashSerialize}, }; use zebra_node_services::rpc_client::RpcRequestClient; use zebra_rpc::{ - methods::get_block_template_rpcs::get_block_template::{ - proposal::TimeSource, proposal_block_from_template, GetBlockTemplate, + constants::MISSING_BLOCK_ERROR_CODE, + methods::{ + get_block_template_rpcs::{ + get_block_template::{ + proposal::TimeSource, proposal_block_from_template, GetBlockTemplate, + }, + types::submit_block, + }, + hex_data::HexData, }, server::OPENED_RPC_ENDPOINT_MSG, }; @@ -66,54 +75,99 @@ pub(crate) async fn submit_blocks_test() -> Result<()> { async fn submit_blocks(network: Network, rpc_address: SocketAddr) -> Result<()> { let client = RpcRequestClient::new(rpc_address); - for height in 1..=NUM_BLOCKS_TO_SUBMIT { - let block_template: GetBlockTemplate = client + for _ in 1..=NUM_BLOCKS_TO_SUBMIT { + let (mut block, height) = client + .block_from_template(Height(REGTEST_NU5_ACTIVATION_HEIGHT)) + .await?; + + while !network.disable_pow() + && zebra_consensus::difficulty_is_valid(&block.header, &network, &height, &block.hash()) + .is_err() + { + increment_big_endian(Arc::make_mut(&mut block.header).nonce.as_mut()); + } + + if height.0 % 40 == 0 { + info!(?block, ?height, "submitting block"); + } + + client.submit_block(block).await?; + } + + Ok(()) +} + +pub trait MiningRpcMethods { + async fn block_from_template(&self, nu5_activation_height: Height) -> Result<(Block, Height)>; + async fn submit_block(&self, block: Block) -> Result<()>; + async fn submit_block_from_template(&self) -> Result<(Block, Height)>; + async fn get_block(&self, height: i32) -> Result>, BoxError>; +} + +impl MiningRpcMethods for RpcRequestClient { + async fn block_from_template(&self, nu5_activation_height: Height) -> Result<(Block, Height)> { + let block_template: GetBlockTemplate = self .json_result_from_call("getblocktemplate", "[]".to_string()) .await .expect("response should be success output with a serialized `GetBlockTemplate`"); - let network_upgrade = if height < REGTEST_NU5_ACTIVATION_HEIGHT.try_into().unwrap() { + let height = Height(block_template.height); + + let network_upgrade = if height < nu5_activation_height { NetworkUpgrade::Canopy } else { NetworkUpgrade::Nu5 }; - let mut block = - proposal_block_from_template(&block_template, TimeSource::default(), network_upgrade)?; - let height = block - .coinbase_height() - .expect("should have a coinbase height"); - - while !network.disable_pow() - && zebra_consensus::difficulty_is_valid(&block.header, &network, &height, &block.hash()) - .is_err() - { - increment_big_endian(Arc::make_mut(&mut block.header).nonce.as_mut()); - } + Ok(( + proposal_block_from_template(&block_template, TimeSource::default(), network_upgrade)?, + height, + )) + } + async fn submit_block(&self, block: Block) -> Result<()> { let block_data = hex::encode(block.zcash_serialize_to_vec()?); - let submit_block_response = client - .text_from_call("submitblock", format!(r#"["{block_data}"]"#)) - .await?; - - let was_submission_successful = submit_block_response.contains(r#""result":null"#); + let submit_block_response: submit_block::Response = self + .json_result_from_call("submitblock", format!(r#"["{block_data}"]"#)) + .await + .map_err(|err| eyre!(err))?; - if height.0 % 40 == 0 { - info!( - was_submission_successful, - ?block_template, - ?network_upgrade, - "submitted block" - ); + match submit_block_response { + submit_block::Response::Accepted => Ok(()), + submit_block::Response::ErrorResponse(err) => { + Err(eyre!("block submission failed: {err:?}")) + } } + } + + async fn submit_block_from_template(&self) -> Result<(Block, Height)> { + let (block, height) = self + .block_from_template(Height(REGTEST_NU5_ACTIVATION_HEIGHT)) + .await?; + + self.submit_block(block.clone()).await?; - // Check that the block was validated and committed. - assert!( - submit_block_response.contains(r#""result":null"#), - "unexpected response from submitblock RPC, should be null, was: {submit_block_response}" - ); + Ok((block, height)) } - Ok(()) + async fn get_block(&self, height: i32) -> Result>, BoxError> { + match self + .json_result_from_call("getblock", format!(r#"["{}", 0]"#, height)) + .await + { + Ok(HexData(raw_block)) => { + let block = raw_block.zcash_deserialize_into::()?; + Ok(Some(Arc::new(block))) + } + Err(err) + if err + .downcast_ref::() + .is_some_and(|err| err.code == MISSING_BLOCK_ERROR_CODE) => + { + Ok(None) + } + Err(err) => Err(err), + } + } }