Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backfill): add #5

Merged
merged 6 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[workspace]
members = [
"backfill",
"discv5",
"in-memory-state",
"minimal",
Expand Down
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ for new developers.

## Overview

| Example | Description | Run |
| ------------------------------------ | -------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------- |
| [Discv5](./discv5) | Runs discv5 discovery stack | `cargo run --bin discv5` |
| [In Memory State](./in-memory-state) | Tracks the plain state in memory | `cargo run --bin in-memory-state -- node` |
| [Minimal](./minimal) | Logs every chain commit, reorg and revert notification | `cargo run --bin minimal -- node` |
| [OP Bridge](./op-bridge) | Decodes Optimism deposit and withdrawal receipts from L1 | `cargo run --bin op-bridge -- node` |
| [Remote](./remote) | Emits notifications using a gRPC server, and a consumer that receives them | `cargo run --bin remote-exex -- node` to start Reth node with the ExEx and a gRPC server<br>`cargo run --bin remote-consumer` to start a gRPC client |
| [Rollup](./rollup) | Rollup that derives the state from L1 | `cargo run --bin rollup -- node` |
| Example | Description | Run |
| ------------------------------------ | -------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| [Backfill](./backfill) | Exposes an RPC to backfill (execute) a range of blocks | `cargo run --bin backfill -- node --http` to start Reth node with an RPC endpoint opened on HTTP port 8545<br><br>`ETH_RPC_URL=http://localhost:8545 TIP=$(cast bn); cast rpc backfill_start $(($TIP - 10)) $TIP` to start the backfill of the last 10 blocks |
| [Discv5](./discv5) | Runs discv5 discovery stack | `cargo run --bin discv5` |
| [In Memory State](./in-memory-state) | Tracks the plain state in memory | `cargo run --bin in-memory-state -- node` |
| [Minimal](./minimal) | Logs every chain commit, reorg and revert notification | `cargo run --bin minimal -- node` |
| [OP Bridge](./op-bridge) | Decodes Optimism deposit and withdrawal receipts from L1 | `cargo run --bin op-bridge -- node` |
| [Remote](./remote) | Emits notifications using a gRPC server, and a consumer that receives them | `cargo run --bin remote-exex -- node` to start Reth node with the ExEx and a gRPC server<br><br>`cargo run --bin remote-consumer` to start a gRPC client |
| [Rollup](./rollup) | Rollup that derives the state from L1 | `cargo run --bin rollup -- node` |

#### License

Expand Down
24 changes: 24 additions & 0 deletions backfill/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "backfill"
version = "0.0.0"
publish = false
edition.workspace = true
license.workspace = true

[dependencies]
# reth
reth-execution-types.workspace = true
reth-exex.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-tracing.workspace = true
reth.workspace = true

# async & error handling
eyre.workspace = true
futures.workspace = true
tokio.workspace = true

# rpc
async-trait = "0.1"
jsonrpsee = { version = "0.23", features = ["server", "macros"] }
161 changes: 161 additions & 0 deletions backfill/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use std::ops::RangeInclusive;

use async_trait::async_trait;
use futures::TryStreamExt;
use jsonrpsee::{
core::RpcResult,
proc_macros::rpc,
types::{error::INTERNAL_ERROR_CODE, ErrorObject},
};
use reth::primitives::{BlockNumber, Requests};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex::{BackfillJobFactory, ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::{error, info};
use tokio::sync::mpsc;

/// The ExEx that consumes new [`ExExNotification`]s and processes new backfill requests by
/// [`BackfillRpcExt`].
struct BackfillExEx<Node: FullNodeComponents> {
    /// The context of the ExEx, providing access to notifications, events and node components.
    ctx: ExExContext<Node>,
    /// Receiver for backfill requests; the sending half lives in [`BackfillRpcExt`].
    backfill_rx: mpsc::UnboundedReceiver<RangeInclusive<BlockNumber>>,
    /// Factory for backfill jobs, built from the node's block executor and provider.
    backfill_job_factory: BackfillJobFactory<Node::Executor, Node::Provider>,
}

impl<Node: FullNodeComponents> BackfillExEx<Node> {
    /// Creates a new instance of the ExEx.
    fn new(
        ctx: ExExContext<Node>,
        backfill_rx: mpsc::UnboundedReceiver<RangeInclusive<BlockNumber>>,
    ) -> Self {
        // Reuse the node's block executor and provider so backfilled blocks are
        // executed exactly like live-synced ones.
        let backfill_job_factory =
            BackfillJobFactory::new(ctx.block_executor().clone(), ctx.provider().clone());
        Self { ctx, backfill_rx, backfill_job_factory }
    }

    /// Starts listening for notifications and backfill requests.
    ///
    /// Loops forever, handling whichever of the two channels is ready first;
    /// returns only if notification processing fails.
    async fn start(mut self) -> eyre::Result<()> {
        loop {
            tokio::select! {
                Some(notification) = self.ctx.notifications.recv() => {
                    self.process_notification(notification).await?;
                }
                Some(range) = self.backfill_rx.recv() => {
                    // NOTE(review): awaiting the backfill inline blocks this select loop,
                    // so a long backfill starves notification processing. Consider
                    // spawning the job (bounded by a semaphore) instead — TODO confirm.
                    // Backfill errors are logged and swallowed so the loop keeps running.
                    let _ = self.backfill(range).await.inspect_err(|err| error!(%err, "Backfill error occurred"));
                }
            }
        }
    }

    /// Processes the given notification and calls [`Self::process_committed_chain`] for every
    /// committed chain.
    async fn process_notification(&self, notification: ExExNotification) -> eyre::Result<()> {
        match &notification {
            ExExNotification::ChainCommitted { new } => {
                info!(committed_chain = ?new.range(), "Received commit");
            }
            ExExNotification::ChainReorged { old, new } => {
                info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
            }
            ExExNotification::ChainReverted { old } => {
                info!(reverted_chain = ?old.range(), "Received revert");
            }
        };

        if let Some(committed_chain) = notification.committed_chain() {
            self.process_committed_chain(&committed_chain).await?;

            // Signal how far this ExEx has processed, so the node can safely
            // prune ExEx-related data below this height.
            self.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
        }

        Ok(())
    }

    /// Processes the committed chain and logs the number of blocks and transactions.
    pub async fn process_committed_chain(&self, chain: &Chain) -> eyre::Result<()> {
        // Calculate the number of blocks and transactions in the committed chain
        let blocks = chain.blocks().len();
        let transactions = chain.blocks().values().map(|block| block.body.len()).sum::<usize>();

        info!(first_block = %chain.execution_outcome().first_block, %blocks, %transactions, "Processed committed blocks");
        Ok(())
    }

    /// Backfills the given range of blocks in parallel, calling the
    /// [`Self::process_committed_chain`] method for each block.
    async fn backfill(&self, range: RangeInclusive<BlockNumber>) -> eyre::Result<()> {
        self.backfill_job_factory
            // Create a backfill job for the given range
            .backfill(range)
            // Convert the backfill job into a parallel stream
            .into_stream()
            // Convert the block execution error into `eyre`
            .map_err(Into::into)
            // Process each block, returning early if an error occurs
            .try_for_each(|(block, output)| async {
                let sealed_block = block.seal_slow();
                let execution_outcome = ExecutionOutcome::new(
                    output.state,
                    output.receipts.into(),
                    sealed_block.number,
                    vec![Requests(output.requests)],
                );
                // Wrap the single backfilled block in a `Chain` so it flows through
                // the same code path as live commits.
                let chain = Chain::new([sealed_block], execution_outcome, None);

                // Process the committed blocks
                self.process_committed_chain(&chain).await
            })
            .await
    }
}

/// RPC API exposed under the `backfill` namespace (methods are prefixed,
/// e.g. `backfill_start`).
#[rpc(server, namespace = "backfill")]
trait BackfillRpcExtApi {
    /// Starts backfilling the given range of blocks asynchronously.
    ///
    /// The range is inclusive on both ends; the call returns as soon as the
    /// request is queued, not when the backfill finishes.
    #[method(name = "start")]
    async fn start(&self, from_block: BlockNumber, to_block: BlockNumber) -> RpcResult<()>;
}

/// The RPC module that exposes the backfill RPC methods and sends backfill requests to
/// [`BackfillExEx`].
struct BackfillRpcExt {
    /// Sender for backfill requests; the receiving half is owned by [`BackfillExEx`].
    backfill_tx: mpsc::UnboundedSender<RangeInclusive<BlockNumber>>,
}

#[async_trait]
impl BackfillRpcExtApiServer for BackfillRpcExt {
    /// Queues a backfill of the inclusive block range `[from_block, to_block]`.
    async fn start(&self, from_block: BlockNumber, to_block: BlockNumber) -> RpcResult<()> {
        // Forward the range to the ExEx. A send error means the ExEx (and its
        // receiver) has shut down, which is surfaced as an internal RPC error.
        let range = from_block..=to_block;
        match self.backfill_tx.send(range) {
            Ok(()) => Ok(()),
            Err(err) => Err(ErrorObject::owned(
                INTERNAL_ERROR_CODE,
                "internal error",
                Some(err.to_string()),
            )),
        }
    }
}

/// Entry point: parses the reth CLI, wires the backfill RPC module to the
/// backfill ExEx via an unbounded channel, and runs the node to completion.
fn main() -> eyre::Result<()> {
    reth::cli::Cli::parse_args().run(|builder, _| async move {
        // Create a channel for backfill requests. Sender will go to the RPC server, receiver will
        // be used by the ExEx.
        let (backfill_tx, backfill_rx) = mpsc::unbounded_channel();

        let handle = builder
            .node(EthereumNode::default())
            // Extend the RPC server with the backfill RPC module.
            .extend_rpc_modules(move |ctx| {
                ctx.modules.merge_configured(BackfillRpcExt { backfill_tx }.into_rpc())?;
                Ok(())
            })
            // Install the backfill ExEx. The inner future resolves to the ExEx main
            // loop future, which the node then drives.
            .install_exex("Backfill", |ctx| async move {
                Ok(BackfillExEx::new(ctx, backfill_rx).start())
            })
            .launch()
            .await?;

        // Block until the node shuts down.
        handle.wait_for_node_exit().await
    })
}
Loading