Lazy loading improvements (#3066)
* Increase authoring deadline to 60 seconds when using lazy loading

* Change lazy loading counter to be thread-safe

* remove extras

* chore: fix block finalization when using lazy loading

* Disable frontier block synchronization when in lazy loading

---------

Co-authored-by: crystalin <[email protected]>
RomarQ and crystalin authored Jan 9, 2025
1 parent 1275f83 commit 3aea751
Showing 5 changed files with 249 additions and 62 deletions.
49 changes: 25 additions & 24 deletions node/service/src/lazy_loading/backend.rs
@@ -26,12 +26,14 @@ use sp_state_machine::{
};
use std::future::Future;
use std::marker::PhantomData;
use std::ops::AddAssign;
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
ptr,
sync::Arc,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

use sc_client_api::{
@@ -195,8 +197,10 @@ impl<Block: BlockT + DeserializeOwned> Blockchain<Block> {
self.apply_head(&header)?;
}

{
let mut storage = self.storage.write();
let mut storage = self.storage.write();
if number.is_zero() {
storage.genesis_hash = hash;
} else {
storage.leaves.import(hash, number, *header.parent_hash());
storage
.blocks
@@ -206,10 +210,6 @@ impl<Block: BlockT + DeserializeOwned> Blockchain<Block> {
storage.finalized_hash = hash;
storage.finalized_number = number;
}

if number == Zero::zero() {
storage.genesis_hash = hash;
}
}

Ok(())
@@ -499,7 +499,9 @@ impl<Block: BlockT + DeserializeOwned> blockchain::Backend<Block> for Blockchain
}

fn leaves(&self) -> sp_blockchain::Result<Vec<Block::Hash>> {
Ok(self.storage.read().leaves.hashes())
let leaves = self.storage.read().leaves.hashes();

Ok(leaves)
}

fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result<Vec<Block::Hash>> {
@@ -949,7 +951,7 @@ impl<Block: BlockT> ForkedLazyBackend<Block> {
let mut entries: HashMap<Option<ChildInfo>, StorageCollection> = Default::default();
entries.insert(None, vec![(key.to_vec(), Some(val.clone()))]);

self.db.write().insert(entries, StateVersion::V0);
self.db.write().insert(entries, StateVersion::V1);
}
}
}
@@ -1255,10 +1257,8 @@ impl<Block: BlockT + DeserializeOwned> backend::Backend<Block> for Backend<Block
}

fn commit_operation(&self, operation: Self::BlockImportOperation) -> sp_blockchain::Result<()> {
if !operation.finalized_blocks.is_empty() {
for (block, justification) in operation.finalized_blocks {
self.blockchain.finalize_header(block, justification)?;
}
for (block, justification) in operation.finalized_blocks {
self.blockchain.finalize_header(block, justification)?;
}

if let Some(pending_block) = operation.pending_block {
@@ -1278,7 +1278,7 @@ impl<Block: BlockT + DeserializeOwned> backend::Backend<Block> for Backend<Block
let new_db = old_state.db.clone();
new_db.write().insert(
vec![(None::<ChildInfo>, operation.storage_updates)],
StateVersion::V0,
StateVersion::V1,
);
let new_state = ForkedLazyBackend {
rpc_client: self.rpc_client.clone(),
@@ -1457,7 +1457,7 @@ pub struct RPC {
http_client: HttpClient,
delay_between_requests_ms: u32,
max_retries_per_request: u32,
counter: Arc<ReadWriteLock<u64>>,
counter: Arc<AtomicU64>,
}

impl RPC {
@@ -1652,17 +1652,19 @@ impl RPC {
{
use tokio::runtime::Handle;

let id = self.counter.fetch_add(1, Ordering::SeqCst);
let start = std::time::Instant::now();

tokio::task::block_in_place(move || {
Handle::current().block_on(async move {
let delay_between_requests =
Duration::from_millis(self.delay_between_requests_ms.into());

let start = std::time::Instant::now();
self.counter.write().add_assign(1);
let start_req = std::time::Instant::now();
log::debug!(
target: super::LAZY_LOADING_LOG_TARGET,
"Sending request: {}",
self.counter.read()
id
);

// Explicit request delay, to avoid getting 429 errors
@@ -1676,10 +1678,11 @@ impl RPC {

log::debug!(
target: super::LAZY_LOADING_LOG_TARGET,
"Completed request (id: {}, successful: {}, elapsed_time: {:?})",
self.counter.read(),
"Completed request (id: {}, successful: {}, elapsed_time: {:?}, query_time: {:?})",
id,
result.is_ok(),
start.elapsed()
start.elapsed(),
start_req.elapsed()
);

result
@@ -1764,8 +1767,6 @@
})
.collect();

let _ = helpers::produce_genesis_block(backend.clone());

// Produce first block after the fork
let _ = helpers::produce_first_block(backend.clone(), checkpoint, state_overrides)?;

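
For context on the counter change above (the "Change lazy loading counter to be thread-safe" item): the per-request counter moves from a lock-guarded u64 to an Arc<AtomicU64>, so each RPC call takes a unique id with a single fetch_add instead of acquiring a write lock. A minimal standalone sketch of that pattern follows; the RequestCounter type and the main() driver are illustrative stand-ins, not the actual RPC struct from this file.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::thread;

// Illustrative stand-in for the lazy-loading RPC client's request counter.
#[derive(Clone)]
struct RequestCounter {
    counter: Arc<AtomicU64>,
}

impl RequestCounter {
    fn new() -> Self {
        Self { counter: Arc::new(AtomicU64::new(0)) }
    }

    // Each caller gets a unique, monotonically increasing id, even when
    // called concurrently from multiple threads.
    fn next_id(&self) -> u64 {
        self.counter.fetch_add(1, Ordering::SeqCst)
    }
}

fn main() {
    let counter = RequestCounter::new();
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let c = counter.clone();
            thread::spawn(move || c.next_id())
        })
        .collect();
    let mut ids: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    ids.sort();
    // No two threads ever observe the same id.
    assert_eq!(ids, vec![0, 1, 2, 3]);
}
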
32 changes: 1 addition & 31 deletions node/service/src/lazy_loading/helpers.rs
@@ -25,36 +25,6 @@ use sp_runtime::Saturating;
use sp_storage::{StateVersion, Storage, StorageKey};
use std::sync::Arc;

pub fn produce_genesis_block<TBl: BlockT + sp_runtime::DeserializeOwned>(
backend: Arc<lazy_loading::backend::Backend<TBl>>,
) -> sp_blockchain::Result<()> {
let mut op = backend.begin_operation()?;
op.before_fork = true;

let genesis_block_hash: TBl::Hash = backend
.rpc_client
.block_hash::<TBl>(Some(Default::default()))
.unwrap()
.expect("Not able to obtain genesis block hash");

let genesis_block = backend
.rpc_client
.block::<TBl, _>(Some(genesis_block_hash))
.unwrap()
.unwrap()
.block;

let _ = op.set_block_data(
genesis_block.header().clone(),
Some(genesis_block.extrinsics().to_vec()),
None,
None,
NewBlockState::Final,
);

backend.commit_operation(op)
}

pub fn produce_first_block<Block: BlockT + sp_runtime::DeserializeOwned>(
backend: Arc<lazy_loading::backend::Backend<Block>>,
fork_checkpoint: Block,
@@ -89,7 +59,7 @@ pub fn produce_first_block<Block: BlockT + sp_runtime::DeserializeOwned>(
top: state_overrides.into_iter().collect(),
children_default: Default::default(),
},
StateVersion::V0,
StateVersion::V1,
)?;

// Create empty first block
213 changes: 213 additions & 0 deletions node/service/src/lazy_loading/manual_sealing.rs
@@ -0,0 +1,213 @@
// Copyright 2024 Moonbeam foundation
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam. If not, see <http://www.gnu.org/licenses/>.

use cumulus_primitives_core::BlockT;
use frame_benchmarking::__private::codec;
use futures::{Stream, StreamExt, TryFutureExt};
use sc_client_api::backend::Backend as ClientBackend;
use sc_client_api::Finalizer;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction};
use sc_consensus_manual_seal::{
finalize_block, rpc, CreatedBlock, EngineCommand, Error, FinalizeBlockParams, ManualSealParams,
SealBlockParams, MANUAL_SEAL_ENGINE_ID,
};
use sc_transaction_pool_api::TransactionPool;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus::{BlockOrigin, Environment, Proposer, SelectChain};
use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
use sp_runtime::traits::Header;
use std::marker::PhantomData;
use std::time::Duration;

pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP, P>(
ManualSealParams {
mut block_import,
mut env,
client,
pool,
mut commands_stream,
select_chain,
consensus_data_provider,
create_inherent_data_providers,
}: ManualSealParams<B, BI, E, C, TP, SC, CS, CIDP, P>,
) where
B: BlockT + 'static,
BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + 'static,
CB: ClientBackend<B> + 'static,
E: Environment<B> + 'static,
E::Proposer: Proposer<B, Proof = P>,
CS: Stream<Item = EngineCommand<<B as BlockT>::Hash>> + Unpin + 'static,
SC: SelectChain<B> + 'static,
TP: TransactionPool<Block = B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: codec::Encode + Send + Sync + 'static,
{
while let Some(command) = commands_stream.next().await {
match command {
EngineCommand::SealNewBlock {
create_empty,
finalize,
parent_hash,
sender,
} => {
seal_block(SealBlockParams {
sender,
parent_hash,
finalize,
create_empty,
env: &mut env,
select_chain: &select_chain,
block_import: &mut block_import,
consensus_data_provider: consensus_data_provider.as_deref(),
pool: pool.clone(),
client: client.clone(),
create_inherent_data_providers: &create_inherent_data_providers,
})
.await;
}
EngineCommand::FinalizeBlock {
hash,
sender,
justification,
} => {
let justification = justification.map(|j| (MANUAL_SEAL_ENGINE_ID, j));
finalize_block(FinalizeBlockParams {
hash,
sender,
justification,
finalizer: client.clone(),
_phantom: PhantomData,
})
.await
}
}
}
}

/// max duration for creating a proposal in secs
pub const MAX_PROPOSAL_DURATION: u64 = 60;

/// seals a new block with the given params
pub async fn seal_block<B, BI, SC, C, E, TP, CIDP, P>(
SealBlockParams {
create_empty,
finalize,
pool,
parent_hash,
client,
select_chain,
block_import,
env,
create_inherent_data_providers,
consensus_data_provider: digest_provider,
mut sender,
}: SealBlockParams<'_, B, BI, SC, C, E, TP, CIDP, P>,
) where
B: BlockT,
BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
E: Environment<B>,
E::Proposer: Proposer<B, Proof = P>,
TP: TransactionPool<Block = B>,
SC: SelectChain<B>,
CIDP: CreateInherentDataProviders<B, ()>,
P: codec::Encode + Send + Sync + 'static,
{
let future = async {
if pool.status().ready == 0 && !create_empty {
return Err(Error::EmptyTransactionPool);
}

// get the header to build this new block on.
// use the parent_hash supplied via `EngineCommand`
// or fetch the best_block.
let parent = match parent_hash {
Some(hash) => client
.header(hash)?
.ok_or_else(|| Error::BlockNotFound(format!("{}", hash)))?,
None => select_chain.best_chain().await?,
};

let inherent_data_providers = create_inherent_data_providers
.create_inherent_data_providers(parent.hash(), ())
.await
.map_err(|e| Error::Other(e))?;

let inherent_data = inherent_data_providers.create_inherent_data().await?;

let proposer = env
.init(&parent)
.map_err(|err| Error::StringError(err.to_string()))
.await?;
let inherents_len = inherent_data.len();

let digest = if let Some(digest_provider) = digest_provider {
digest_provider.create_digest(&parent, &inherent_data)?
} else {
Default::default()
};

let proposal = proposer
.propose(
inherent_data.clone(),
digest,
Duration::from_secs(MAX_PROPOSAL_DURATION),
None,
)
.map_err(|err| Error::StringError(err.to_string()))
.await?;

if proposal.block.extrinsics().len() == inherents_len && !create_empty {
return Err(Error::EmptyTransactionPool);
}

let (header, body) = proposal.block.deconstruct();
let proof = proposal.proof;
let proof_size = proof.encoded_size();
let mut params = BlockImportParams::new(BlockOrigin::Own, header.clone());
params.body = Some(body);
params.finalized = finalize;
params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(
proposal.storage_changes,
));

if let Some(digest_provider) = digest_provider {
digest_provider.append_block_import(&parent, &mut params, &inherent_data, proof)?;
}

// Make sure we return the same post-hash that will be calculated when importing the block
// This is important in case the digest_provider added any signature, seal, etc.
let mut post_header = header.clone();
post_header
.digest_mut()
.logs
.extend(params.post_digests.iter().cloned());

match block_import.import_block(params).await? {
ImportResult::Imported(aux) => Ok(CreatedBlock {
hash: <B as BlockT>::Header::hash(&post_header),
aux,
proof_size,
}),
other => Err(other.into()),
}
};

rpc::send_result(&mut sender, future.await)
}
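
The new manual_sealing.rs above closely mirrors the sc-consensus-manual-seal sealing loop, vendored so the proposal deadline can be raised to MAX_PROPOSAL_DURATION = 60 seconds (the "Increase authoring deadline" item). As a rough sketch of how a service task might drive this loop, the snippet below pushes periodic SealNewBlock commands into a futures mpsc channel whose receiving end would be passed as commands_stream in ManualSealParams; the function name, channel capacity, and 6-second interval are assumptions for illustration, not code from this PR.

use futures::channel::mpsc;
use sc_consensus_manual_seal::EngineCommand;
use sp_runtime::traits::Block as BlockT;
use std::time::Duration;

// Returns the command stream that run_manual_seal would consume, and spawns a
// task that asks the sealer for one (possibly empty) block per interval.
fn spawn_periodic_seal_commands<B: BlockT>() -> mpsc::Receiver<EngineCommand<B::Hash>> {
    let (mut sink, stream) = mpsc::channel::<EngineCommand<B::Hash>>(1024);
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_secs(6)).await;
            if sink.is_closed() {
                // run_manual_seal dropped the receiving end; stop producing commands.
                break;
            }
            // Build on the current best block; no RPC caller waits on the result.
            let _ = sink.try_send(EngineCommand::SealNewBlock {
                create_empty: true,
                finalize: false,
                parent_hash: None,
                sender: None,
            });
        }
    });
    stream
}
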