diff --git a/Cargo.lock b/Cargo.lock
index c430a6779a..ec49b99958 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -892,6 +892,7 @@ dependencies = [
  "tendermint-proto",
  "thiserror",
  "tokio",
+ "tokio-util 0.7.11",
  "tonic 0.10.2",
  "tower 0.4.13",
  "tower-abci",
diff --git a/charts/sequencer/Chart.yaml b/charts/sequencer/Chart.yaml
index 832ce7df76..15186e1a9a 100644
--- a/charts/sequencer/Chart.yaml
+++ b/charts/sequencer/Chart.yaml
@@ -20,7 +20,7 @@ version: 1.0.1
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
 # It is recommended to use it with quotes.
-appVersion: "1.0.0"
+appVersion: "1.0.1"
 
 dependencies:
   - name: sequencer-relayer
diff --git a/charts/sequencer/templates/configmaps.yaml b/charts/sequencer/templates/configmaps.yaml
index 89f9deedea..d9ef853c48 100644
--- a/charts/sequencer/templates/configmaps.yaml
+++ b/charts/sequencer/templates/configmaps.yaml
@@ -75,5 +75,6 @@ data:
   OTEL_SERVICE_NAME: "{{ tpl .Values.sequencer.otel.serviceName . }}"
   {{- if not .Values.global.dev }}
   {{- else }}
+  ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS: "{{ not .Values.sequencer.optimisticBlockApis.enabled }}"
   {{- end }}
 ---
diff --git a/charts/sequencer/values.yaml b/charts/sequencer/values.yaml
index 80bd795258..2be3b3228b 100644
--- a/charts/sequencer/values.yaml
+++ b/charts/sequencer/values.yaml
@@ -120,6 +120,8 @@ sequencer:
     tracesTimeout: 10
     otlpHeaders:
     traceHeaders:
+  optimisticBlockApis:
+    enabled: false
 
   cometbft:
     config:
diff --git a/crates/astria-core/src/sequencerblock/v1/mod.rs b/crates/astria-core/src/sequencerblock/v1/mod.rs
index 9685f452f6..fe9cfea039 100644
--- a/crates/astria-core/src/sequencerblock/v1/mod.rs
+++ b/crates/astria-core/src/sequencerblock/v1/mod.rs
@@ -1,5 +1,6 @@
 pub mod block;
 pub mod celestia;
+pub mod optimistic;
 
 pub use block::{
     RollupTransactions,
diff --git a/crates/astria-core/src/sequencerblock/v1/optimistic.rs b/crates/astria-core/src/sequencerblock/v1/optimistic.rs
new file mode 100644
index 0000000000..54b241cedc
--- /dev/null
+++ b/crates/astria-core/src/sequencerblock/v1/optimistic.rs
@@ -0,0 +1,83 @@
+use bytes::Bytes;
+
+use crate::{
+    generated::astria::sequencerblock::optimistic::v1alpha1 as raw,
+    Protobuf,
+};
+
+#[derive(Debug, thiserror::Error)]
+#[error(transparent)]
+pub struct SequencerBlockCommitError(SequencerBlockCommitErrorKind);
+
+impl SequencerBlockCommitError {
+    fn invalid_block_hash(len: usize) -> Self {
+        Self(SequencerBlockCommitErrorKind::InvalidBlockHash(len))
+    }
+}
+
+#[derive(Debug, thiserror::Error)]
+enum SequencerBlockCommitErrorKind {
+    #[error("invalid block hash length: {0}")]
+    InvalidBlockHash(usize),
+}
+
+#[derive(Clone, Debug)]
+pub struct SequencerBlockCommit {
+    height: u64,
+    block_hash: [u8; 32],
+}
+
+impl SequencerBlockCommit {
+    #[must_use]
+    pub fn new(height: u64, block_hash: [u8; 32]) -> Self {
+        Self {
+            height,
+            block_hash,
+        }
+    }
+
+    #[must_use]
+    pub fn height(&self) -> u64 {
+        self.height
+    }
+
+    #[must_use]
+    pub fn block_hash(&self) -> &[u8; 32] {
+        &self.block_hash
+    }
+}
+
+impl From<SequencerBlockCommit> for raw::SequencerBlockCommit {
+    fn from(value: SequencerBlockCommit) -> Self {
+        value.to_raw()
+    }
+}
+
+impl Protobuf for SequencerBlockCommit {
+    type Error = SequencerBlockCommitError;
+    type Raw = raw::SequencerBlockCommit;
+
+    fn try_from_raw_ref(raw: &Self::Raw) -> Result<Self, Self::Error> {
+        let Self::Raw {
+            height,
+            block_hash,
+        } = raw;
+
+        let block_hash = block_hash
+            .as_ref()
+            .try_into()
+            .map_err(|_| SequencerBlockCommitError::invalid_block_hash(block_hash.len()))?;
+
+        Ok(SequencerBlockCommit {
+            height: *height,
+            block_hash,
+        })
+    }
+
+    fn to_raw(&self) -> Self::Raw {
+        raw::SequencerBlockCommit {
+            height: self.height(),
+            block_hash: Bytes::copy_from_slice(self.block_hash()),
+        }
+    }
+}
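[Reviewer note - not part of the diff] A minimal sketch of how the new `SequencerBlockCommit` is expected to round-trip through its protobuf form via the `Protobuf` trait implemented above; the height and hash values are illustrative only.

    use astria_core::{
        sequencerblock::v1::optimistic::SequencerBlockCommit,
        Protobuf as _,
    };

    fn commit_roundtrip_sketch() {
        // Build a commit for an arbitrary height with a placeholder 32-byte block hash.
        let commit = SequencerBlockCommit::new(42, [0u8; 32]);

        // `to_raw` produces the generated protobuf message that the gRPC stream sends out.
        let raw = commit.to_raw();

        // `try_from_raw_ref` validates that the hash is exactly 32 bytes long.
        let parsed = SequencerBlockCommit::try_from_raw_ref(&raw)
            .expect("a 32-byte block hash must round-trip");
        assert_eq!(parsed.height(), 42);
    }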
diff --git a/crates/astria-sequencer/Cargo.toml b/crates/astria-sequencer/Cargo.toml
index 56014df011..63de5bfd5d 100644
--- a/crates/astria-sequencer/Cargo.toml
+++ b/crates/astria-sequencer/Cargo.toml
@@ -64,6 +64,7 @@
 tendermint-proto = { workspace = true }
 tendermint = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["rt", "tracing"] }
+tokio-util = { workspace = true, features = ["rt"] }
 tonic = { workspace = true }
 tracing = { workspace = true }
diff --git a/crates/astria-sequencer/local.env.example b/crates/astria-sequencer/local.env.example
index 7a794e616a..4881d5c310 100644
--- a/crates/astria-sequencer/local.env.example
+++ b/crates/astria-sequencer/local.env.example
@@ -33,6 +33,9 @@ ASTRIA_SEQUENCER_METRICS_HTTP_LISTENER_ADDR="127.0.0.1:9000"
 # `ASTRIA_SEQUENCER_FORCE_STDOUT` is set to `true`.
 ASTRIA_SEQUENCER_PRETTY_PRINT=false
 
+# Disables streaming optimistic blocks to clients.
+ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS=false
+
 # If set to any non-empty value removes ANSI escape characters from the pretty
 # printed output. Note that this does nothing unless `ASTRIA_SEQUENCER_PRETTY_PRINT`
 # is set to `true`.
diff --git a/crates/astria-sequencer/src/app/event_bus.rs b/crates/astria-sequencer/src/app/event_bus.rs
new file mode 100644
index 0000000000..e821274b71
--- /dev/null
+++ b/crates/astria-sequencer/src/app/event_bus.rs
@@ -0,0 +1,158 @@
+use std::sync::Arc;
+
+use astria_core::sequencerblock::v1::SequencerBlock;
+use astria_eyre::eyre::WrapErr as _;
+use tendermint::abci::request::FinalizeBlock;
+use tokio::sync::watch::{
+    Receiver,
+    Sender,
+};
+use tokio_util::sync::CancellationToken;
+
+/// `EventReceiver` contains the receiver side of the events sent by the Sequencer App.
+/// The listeners of the events can receive the latest value of the event by calling the
+/// `receive` method.
+#[derive(Clone)]
+pub(crate) struct EventReceiver<T> {
+    // The receiver side of the watch, which is read for the latest value of the event.
+    // We receive an `Option<T>` because the watch channel starts out unset (`None`). This
+    // lets the sender side send event types that do not have a `Default` implementation.
+    inner: Receiver<Option<T>>,
+    // To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
+    // set.
+    is_init: CancellationToken,
+}
+
+impl<T> EventReceiver<T>
+where
+    T: Clone,
+{
+    // Marks the current message in the receiver end of the watch as seen.
+    // This is useful in situations where we want to ignore the current value of the watch
+    // and wait for the next value.
+    pub(crate) fn mark_latest_event_as_seen(&mut self) {
+        self.inner.mark_unchanged();
+    }
+
+    // Returns the latest value of the event, waiting for the value to change if it hasn't already.
+    pub(crate) async fn receive(&mut self) -> astria_eyre::Result<T> {
+        // Wait until the sender side of the watch has been initialized by sending the first
+        // value; this is signalled via the `is_init` token. If the sender has already been
+        // initialized, the call below resolves immediately.
+        self.is_init.cancelled().await;
+        // We only want to return the latest value, so wait for the current value in the
+        // watch to change before reading it.
+        self.inner
+            .changed()
+            .await
+            .wrap_err("error waiting for latest event")?;
+        Ok(self.inner.borrow_and_update().clone().expect(
+            "events must be set after is_init is triggered; this means an invariant was violated",
+        ))
+    }
+}
+
+/// `EventSender` contains the sender side of the events sent by the Sequencer App.
+/// At any given time, it sends the latest value of the event.
+struct EventSender<T> {
+    // A watch channel that always starts unset. Once set, the `is_init` token is cancelled
+    // and the value in the channel will never be unset again.
+    inner: Sender<Option<T>>,
+    // To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
+    // set.
+    is_init: CancellationToken,
+}
+
+impl<T> EventSender<T> {
+    fn new() -> Self {
+        let (sender, _) = tokio::sync::watch::channel(None);
+        Self {
+            inner: sender,
+            is_init: CancellationToken::new(),
+        }
+    }
+
+    // Returns an `EventReceiver` holding the receiver side of the watch, which can be used
+    // to receive the latest value of the event.
+    fn subscribe(&self) -> EventReceiver<T> {
+        EventReceiver {
+            inner: self.inner.subscribe(),
+            is_init: self.is_init.clone(),
+        }
+    }
+
+    // Sends the event to all the subscribers.
+    fn send(&self, event: T) {
+        self.inner.send_replace(Some(event));
+        // After sending the first value, we cancel the `is_init` token to signal that the sender
+        // side of the watch is initialized. The receiver side can now start receiving valid
+        // values.
+        self.is_init.cancel();
+    }
+}
+
+/// `EventBusSubscription` contains the [`EventReceiver`]s of the various events that can be
+/// subscribed to. It can be cloned by various components in the sequencer app to receive events.
+#[derive(Clone)]
+pub(crate) struct EventBusSubscription {
+    process_proposal_blocks: EventReceiver<Arc<SequencerBlock>>,
+    finalized_blocks: EventReceiver<Arc<FinalizeBlock>>,
+}
+
+impl EventBusSubscription {
+    pub(crate) fn process_proposal_blocks(&mut self) -> EventReceiver<Arc<SequencerBlock>> {
+        self.process_proposal_blocks.clone()
+    }
+
+    pub(crate) fn finalized_blocks(&mut self) -> EventReceiver<Arc<FinalizeBlock>> {
+        self.finalized_blocks.clone()
+    }
+}
+
+/// The Sequencer `EventBus` is used to send and receive events between different components of the
+/// sequencer. Components of the sequencer can subscribe to the `EventBus` via the `subscribe`
+/// method, which returns an [`EventBusSubscription`] object containing the receivers of the
+/// various events, each of type [`EventReceiver`].
+///
+/// The `EventBus` is implemented using [`tokio::sync::watch`], which allows multiple receivers to
+/// observe the latest value of an event at any given time.
+pub(super) struct EventBus {
+    // Sends a process proposal block event to the subscribers. The event is sent in the form of a
+    // sequencer block which is created during the process proposal block phase.
+    process_proposal_block_sender: EventSender<Arc<SequencerBlock>>,
+    // Sends a finalized block event to the subscribers. The event is sent in the form of the
+    // finalize block abci request.
+    finalized_block_sender: EventSender<Arc<FinalizeBlock>>,
+}
+
+impl EventBus {
+    pub(super) fn new() -> Self {
+        let process_proposal_block_sender = EventSender::new();
+        let finalized_block_sender = EventSender::new();
+
+        Self {
+            process_proposal_block_sender,
+            finalized_block_sender,
+        }
+    }
+
+    // Returns an `EventBusSubscription` object that contains the receivers of the various events
+    // that can be subscribed to.
+    pub(crate) fn subscribe(&self) -> EventBusSubscription {
+        EventBusSubscription {
+            process_proposal_blocks: self.process_proposal_block_sender.subscribe(),
+            finalized_blocks: self.finalized_block_sender.subscribe(),
+        }
+    }
+
+    // Sends a process proposal block event to the subscribers.
+    pub(super) fn send_process_proposal_block(&self, sequencer_block: Arc<SequencerBlock>) {
+        self.process_proposal_block_sender.send(sequencer_block);
+    }
+
+    // Sends a finalized block event to the subscribers.
+    pub(super) fn send_finalized_block(&self, finalize_block: Arc<FinalizeBlock>) {
+        self.finalized_block_sender.send(finalize_block);
+    }
+}
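[Reviewer note - not part of the diff] A minimal sketch of how a component is expected to consume the event bus introduced above, assuming an `App` value named `app`; the spawned task body is illustrative only.

    // Take a subscription before the app starts processing blocks.
    let mut subscription = app.subscribe_to_events();
    let mut proposed_blocks = subscription.process_proposal_blocks();

    tokio::spawn(async move {
        // Ignore whatever value may already sit in the watch channel; only stream fresh events.
        proposed_blocks.mark_latest_event_as_seen();
        loop {
            match proposed_blocks.receive().await {
                // Each event is an Arc<SequencerBlock> cloned out of the watch channel.
                Ok(_block) => tracing::info!("observed block from process_proposal"),
                // receive() only fails if the sender side of the event bus went away.
                Err(_) => break,
            }
        }
    });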
diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs
index 9a5064ff60..79f49ac10a 100644
--- a/crates/astria-sequencer/src/app/mod.rs
+++ b/crates/astria-sequencer/src/app/mod.rs
@@ -2,6 +2,7 @@
 pub(crate) mod benchmark_and_test_utils;
 #[cfg(feature = "benchmark")]
 mod benchmarks;
+pub(crate) mod event_bus;
 mod state_ext;
 pub(crate) mod storage;
 #[cfg(test)]
@@ -97,6 +98,10 @@ use crate::{
         ActionHandler as _,
     },
     address::StateWriteExt as _,
+    app::event_bus::{
+        EventBus,
+        EventBusSubscription,
+    },
     assets::StateWriteExt as _,
     authority::{
         component::{
@@ -233,6 +238,9 @@ pub(crate) struct App {
     )]
     app_hash: AppHash,
 
+    // the sequencer event bus, used to send and receive events between components within the app
+    event_bus: EventBus,
+
     metrics: &'static Metrics,
 }
 
@@ -259,6 +267,8 @@ impl App {
         // there should be no unexpected copies elsewhere.
         let state = Arc::new(StateDelta::new(snapshot));
 
+        let event_bus = EventBus::new();
+
         Ok(Self {
             state,
             mempool,
@@ -267,10 +277,15 @@ impl App {
             recost_mempool: false,
             write_batch: None,
             app_hash,
+            event_bus,
             metrics,
         })
     }
 
+    pub(crate) fn subscribe_to_events(&self) -> EventBusSubscription {
+        self.event_bus.subscribe()
+    }
+
     #[instrument(name = "App:init_chain", skip_all, err)]
     pub(crate) async fn init_chain(
         &mut self,
@@ -435,16 +450,21 @@ impl App {
             bail!("execution results must be present after executing transactions")
         };
 
-        self.post_execute_transactions(
-            process_proposal.hash,
-            process_proposal.height,
-            process_proposal.time,
-            process_proposal.proposer_address,
-            process_proposal.txs,
-            tx_results,
-        )
-        .await
-        .wrap_err("failed to run post execute transactions handler")?;
+        // FIXME - avoid duplicate calls to post_execute_transactions. refer to: https://github.com/astriaorg/astria/issues/1835
+        let sequencer_block = self
+            .post_execute_transactions(
+                process_proposal.hash,
+                process_proposal.height,
+                process_proposal.time,
+                process_proposal.proposer_address,
+                process_proposal.txs,
+                tx_results,
+            )
+            .await
+            .wrap_err("failed to run post execute transactions handler")?;
+
+        self.event_bus
+            .send_process_proposal_block(Arc::new(sequencer_block));
 
         return Ok(());
     }
@@ -535,16 +555,22 @@ impl App {
         );
         self.executed_proposal_hash = process_proposal.hash;
 
-        self.post_execute_transactions(
-            process_proposal.hash,
-            process_proposal.height,
-            process_proposal.time,
-            process_proposal.proposer_address,
-            process_proposal.txs,
-            tx_results,
-        )
-        .await
-        .wrap_err("failed to run post execute transactions handler")?;
+
+        // FIXME - avoid duplicate calls to post_execute_transactions. refer to: https://github.com/astriaorg/astria/issues/1835
+        let sequencer_block = self
+            .post_execute_transactions(
+                process_proposal.hash,
+                process_proposal.height,
+                process_proposal.time,
+                process_proposal.proposer_address,
+                process_proposal.txs,
+                tx_results,
+            )
+            .await
+            .wrap_err("failed to run post execute transactions handler")?;
+
+        self.event_bus
+            .send_process_proposal_block(Arc::new(sequencer_block));
 
         Ok(())
     }
@@ -752,6 +778,7 @@ impl App {
     /// `SequencerBlock`.
     ///
     /// this must be called after a block's transactions are executed.
+    /// FIXME: don't return sequencer block but grab the block from state delta https://github.com/astriaorg/astria/issues/1436
     #[instrument(name = "App::post_execute_transactions", skip_all, err(level = Level::WARN))]
     async fn post_execute_transactions(
         &mut self,
@@ -761,7 +788,7 @@ impl App {
         proposer_address: account::Id,
         txs: Vec<Bytes>,
         tx_results: Vec<ExecTxResult>,
-    ) -> Result<()> {
+    ) -> Result<SequencerBlock> {
         let Hash::Sha256(block_hash) = block_hash else {
             bail!("block hash is empty; this should not occur")
         };
@@ -800,6 +827,7 @@ impl App {
         finalize_block_tx_results.extend(std::iter::repeat(ExecTxResult::default()).take(2));
         finalize_block_tx_results.extend(tx_results);
 
+        // FIXME - avoid duplicate calls to post_execute_transactions. refer to: https://github.com/astriaorg/astria/issues/1835
         let sequencer_block = SequencerBlock::try_from_block_info_and_data(
             block_hash,
             chain_id,
@@ -811,7 +839,7 @@ impl App {
         )
         .wrap_err("failed to convert block info and data to SequencerBlock")?;
         state_tx
-            .put_sequencer_block(sequencer_block)
+            .put_sequencer_block(sequencer_block.clone())
             .wrap_err("failed to write sequencer block to state")?;
 
         let result = PostTransactionExecutionResult {
@@ -827,7 +855,7 @@ impl App {
         // there should be none anyways.
         let _ = self.apply(state_tx);
 
-        Ok(())
+        Ok(sequencer_block)
     }
 
     /// Executes the given block, but does not write it to disk.
@@ -854,18 +882,20 @@ impl App {
             rollup IDs commitment"
         );
 
+        // FIXME: refactor to avoid cloning the finalize block
+        let finalize_block_arc = Arc::new(finalize_block.clone());
+
         // When the hash is not empty, we have already executed and cached the results
         if self.executed_proposal_hash.is_empty() {
            // convert tendermint id to astria address; this assumes they are
            // the same address, as they are both ed25519 keys
            let proposer_address = finalize_block.proposer_address;
-            let height = finalize_block.height;
            let time = finalize_block.time;
 
            // we haven't executed anything yet, so set up the state for execution.
            let block_data = BlockData {
                misbehavior: finalize_block.misbehavior,
-                height,
+                height: finalize_block.height,
                time,
                next_validators_hash: finalize_block.next_validators_hash,
                proposer_address,
@@ -909,7 +939,7 @@ impl App {
 
            self.post_execute_transactions(
                finalize_block.hash,
-                height,
+                finalize_block.height,
                time,
                proposer_address,
                finalize_block.txs,
@@ -939,7 +969,7 @@ impl App {
            .prepare_commit(storage)
            .await
            .wrap_err("failed to prepare commit")?;
-        let finalize_block = abci::response::FinalizeBlock {
+        let finalize_block_response = abci::response::FinalizeBlock {
            events: post_transaction_execution_result.events,
            validator_updates: post_transaction_execution_result.validator_updates,
            consensus_param_updates: post_transaction_execution_result.consensus_param_updates,
@@ -947,7 +977,9 @@ impl App {
            tx_results: post_transaction_execution_result.tx_results,
        };
 
-        Ok(finalize_block)
+        self.event_bus.send_finalized_block(finalize_block_arc);
+
+        Ok(finalize_block_response)
    }
 
    #[instrument(skip_all, err(level = Level::WARN))]
diff --git a/crates/astria-sequencer/src/config.rs b/crates/astria-sequencer/src/config.rs
index 00db5f637b..27c349230c 100644
--- a/crates/astria-sequencer/src/config.rs
+++ b/crates/astria-sequencer/src/config.rs
@@ -32,6 +32,8 @@ pub struct Config {
     pub pretty_print: bool,
     /// The maximum number of transactions that can be parked in the mempool.
     pub mempool_parked_max_tx_count: usize,
+    /// Disables streaming optimistic blocks over grpc.
+    pub no_optimistic_blocks: bool,
 }
 
 impl config::Config for Config {
diff --git a/crates/astria-sequencer/src/grpc/mod.rs b/crates/astria-sequencer/src/grpc/mod.rs
index 2de987dd92..a81d9dec6c 100644
--- a/crates/astria-sequencer/src/grpc/mod.rs
+++ b/crates/astria-sequencer/src/grpc/mod.rs
@@ -1,8 +1,144 @@
+pub(crate) mod optimistic;
 pub(crate) mod sequencer;
 mod state_ext;
 pub(crate) mod storage;
 
+use std::time::Duration;
+
+use astria_core::generated::astria::sequencerblock::v1::sequencer_service_server::SequencerServiceServer;
+use astria_eyre::eyre::WrapErr as _;
 pub(crate) use state_ext::{
     StateReadExt,
     StateWriteExt,
 };
+use tokio::{
+    sync::oneshot,
+    task::JoinHandle,
+};
+use tokio_util::sync::CancellationToken;
+use tracing::{
+    error,
+    info,
+    info_span,
+    warn,
+};
+
+use crate::{
+    app::event_bus::EventBusSubscription,
+    grpc::sequencer::SequencerServer,
+    ibc::host_interface::AstriaHost,
+    mempool::Mempool,
+};
+
+// we provide a shutdown timeout mainly so that the optimistic block service tasks can shut
+// down gracefully
+const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(1500);
+const SHUTDOWN_SPAN: &str = "grpc_server_shutdown";
+
+pub(crate) fn start_server(
+    storage: &cnidarium::Storage,
+    mempool: Mempool,
+    grpc_addr: std::net::SocketAddr,
+    no_optimistic_blocks: bool,
+    event_bus_subscription: EventBusSubscription,
+    shutdown_rx: oneshot::Receiver<()>,
+) -> JoinHandle<Result<(), tonic::transport::Error>> {
+    use ibc_proto::ibc::core::{
+        channel::v1::query_server::QueryServer as ChannelQueryServer,
+        client::v1::query_server::QueryServer as ClientQueryServer,
+        connection::v1::query_server::QueryServer as ConnectionQueryServer,
+    };
+    use penumbra_tower_trace::remote_addr;
+    use tower_http::cors::CorsLayer;
+
+    let ibc = penumbra_ibc::component::rpc::IbcQuery::<AstriaHost>::new(storage.clone());
+    let sequencer_api = SequencerServer::new(storage.clone(), mempool);
+    let cors_layer: CorsLayer = CorsLayer::permissive();
+
+    let optimistic_streams_cancellation_token = CancellationToken::new();
+
+    let (optimistic_block_service, mut optimistic_block_task) = if no_optimistic_blocks {
+        (None, None)
+    } else {
+        let (optimistic_block_service, optimistic_block_task) = optimistic::new_service(
+            event_bus_subscription,
+            optimistic_streams_cancellation_token.child_token(),
+        );
+
+        (Some(optimistic_block_service), Some(optimistic_block_task))
+    };
+
+    // TODO: setup HTTPS?
+    let grpc_server = tonic::transport::Server::builder()
+        .trace_fn(|req| {
+            if let Some(remote_addr) = remote_addr(req) {
+                let addr = remote_addr.to_string();
+                tracing::error_span!("grpc", addr)
+            } else {
+                tracing::error_span!("grpc")
+            }
+        })
+        // (from Penumbra) Allow HTTP/1, which will be used by grpc-web connections.
+        // This is particularly important when running locally, as gRPC
+        // typically uses HTTP/2, which requires HTTPS. Accepting HTTP/2
+        // allows local applications such as web browsers to talk to pd.
+        .accept_http1(true)
+        // (from Penumbra) Add permissive CORS headers, so pd's gRPC services are accessible
+        // from arbitrary web contexts, including from localhost.
+        .layer(cors_layer)
+        .add_service(ClientQueryServer::new(ibc.clone()))
+        .add_service(ChannelQueryServer::new(ibc.clone()))
+        .add_service(ConnectionQueryServer::new(ibc.clone()))
+        .add_service(SequencerServiceServer::new(sequencer_api))
+        .add_optional_service(optimistic_block_service);
+
+    info!(grpc_addr = grpc_addr.to_string(), "starting grpc server");
+
+    tokio::task::spawn(grpc_server.serve_with_shutdown(grpc_addr, async move {
+        let reason = tokio::select! {
+            biased;
+            _ = shutdown_rx => {
+                Ok("grpc server shutting down")
+            },
+            res = async { optimistic_block_task.as_mut().unwrap().await }, if optimistic_block_task.is_some() => {
+                match res {
+                    Ok(()) => {
+                        Ok("optimistic block inner handle task exited successfully")
+                    },
+                    Err(e) => {
+                        Err(e).wrap_err("optimistic block inner handle task exited with error")
+                    }
+                }
+            }
+        };
+        optimistic_streams_cancellation_token.cancel();
+
+        if optimistic_block_task.is_some() {
+            // give time for the optimistic block service to shut down all the streaming tasks.
+            let handle = optimistic_block_task.as_mut().unwrap();
+            match tokio::time::timeout(SHUTDOWN_TIMEOUT, &mut *handle).await {
+                Ok(Ok(())) => {
+                    info!("optimistic block service task shut down gracefully");
+                },
+                Ok(Err(e)) => {
+                    warn!(%e, "optimistic block service has panicked");
+                }
+                Err(e) => {
+                    error!(%e, "optimistic block service task didn't shut down in time");
+                    handle.abort();
+                }
+            }
+        }
+
+        info_span!(SHUTDOWN_SPAN).in_scope(|| {
+            match reason {
+                Ok(reason) => {
+                    info!(reason);
+                }
+                Err(reason) => {
+                    warn!("{}", reason.to_string());
+                }
+            };
+        });
+    }))
+}
diff --git a/crates/astria-sequencer/src/grpc/optimistic.rs b/crates/astria-sequencer/src/grpc/optimistic.rs
new file mode 100644
index 0000000000..d4ce3f2782
--- /dev/null
+++ b/crates/astria-sequencer/src/grpc/optimistic.rs
@@ -0,0 +1,419 @@
+use std::{
+    pin::Pin,
+    sync::Arc,
+    time::Duration,
+};
+
+use astria_core::{
+    generated::astria::sequencerblock::optimistic::v1alpha1::{
+        optimistic_block_service_server::{
+            OptimisticBlockService,
+            OptimisticBlockServiceServer,
+        },
+        GetBlockCommitmentStreamRequest,
+        GetBlockCommitmentStreamResponse,
+        GetOptimisticBlockStreamRequest,
+        GetOptimisticBlockStreamResponse,
+    },
+    primitive::v1::RollupId,
+    sequencerblock::v1::{
+        optimistic::SequencerBlockCommit,
+        SequencerBlock,
+    },
+    Protobuf,
+};
+use astria_eyre::{
+    eyre,
+    eyre::WrapErr as _,
+};
+use tendermint::{
+    abci::request::FinalizeBlock,
+    Hash,
+};
+use tokio::{
+    sync::mpsc,
+    task::{
+        JoinHandle,
+        JoinSet,
+    },
+};
+use tokio_util::sync::CancellationToken;
+use tonic::{
+    codegen::tokio_stream::{
+        wrappers::ReceiverStream,
+        Stream,
+    },
+    Request,
+    Response,
+    Status,
+};
+use tracing::{
+    error,
+    info,
+    info_span,
+    instrument,
+    trace,
+    warn,
+};
+
+use crate::app::event_bus::{
+    EventBusSubscription,
+    EventReceiver,
+};
+
+const STREAM_TASKS_SHUTDOWN_DURATION: Duration = Duration::from_secs(1);
+const OPTIMISTIC_STREAM_SPAN: &str = "optimistic_stream";
+const BLOCK_COMMITMENT_STREAM_SPAN: &str = "block_commitment_stream";
+
+type GrpcStream<T> = Pin<Box<dyn Stream<Item = tonic::Result<T>> + Send>>;
+
+pub(super) fn new_service(
+    event_bus_subscription: EventBusSubscription,
+    cancellation_token: CancellationToken,
+) -> (
+    OptimisticBlockServiceServer<OptimisticBlockFacade>,
+    JoinHandle<()>,
+) {
+    let (tx, rx) = mpsc::channel(128);
+
+    let facade = OptimisticBlockFacade::new(tx);
+    let inner = OptimisticBlockStreamRunner::new(event_bus_subscription, rx, cancellation_token);
+
+    let inner_task = tokio::spawn(inner.run());
+    let server = OptimisticBlockServiceServer::new(facade);
+
+    (server, inner_task)
+}
+
+struct StartOptimisticBlockStreamRequest {
+    rollup_id: RollupId,
+    tx: mpsc::Sender<tonic::Result<GetOptimisticBlockStreamResponse>>,
+}
+
+struct StartBlockCommitmentStreamRequest {
+    tx: mpsc::Sender<tonic::Result<GetBlockCommitmentStreamResponse>>,
+}
+
+enum NewStreamRequest {
+    OptimisticBlockStream(StartOptimisticBlockStreamRequest),
+    BlockCommitmentStream(StartBlockCommitmentStreamRequest),
+}
+
+pub(super) struct OptimisticBlockStreamRunner {
+    event_bus_subscription: EventBusSubscription,
+    stream_request_receiver: mpsc::Receiver<NewStreamRequest>,
+    stream_tasks: JoinSet<Result<(), eyre::Report>>,
+    cancellation_token: CancellationToken,
+}
+
+impl OptimisticBlockStreamRunner {
+    fn new(
+        event_bus_subscription: EventBusSubscription,
+        stream_request_receiver: mpsc::Receiver<NewStreamRequest>,
+        cancellation_token: CancellationToken,
+    ) -> Self {
+        Self {
+            event_bus_subscription,
+            stream_request_receiver,
+            stream_tasks: JoinSet::new(),
+            cancellation_token,
+        }
+    }
+
+    fn handle_optimistic_block_stream_request(
+        &mut self,
+        request: StartOptimisticBlockStreamRequest,
+    ) {
+        let StartOptimisticBlockStreamRequest {
+            rollup_id,
+            tx,
+        } = request;
+
+        self.stream_tasks.spawn(optimistic_stream(
+            self.event_bus_subscription.process_proposal_blocks(),
+            rollup_id,
+            tx,
+            self.cancellation_token.child_token(),
+        ));
+    }
+
+    fn handle_block_commitment_stream_request(
+        &mut self,
+        request: StartBlockCommitmentStreamRequest,
+    ) {
+        let StartBlockCommitmentStreamRequest {
+            tx,
+        } = request;
+
+        self.stream_tasks.spawn(block_commitment_stream(
+            self.event_bus_subscription.finalized_blocks(),
+            tx,
+            self.cancellation_token.child_token(),
+        ));
+    }
+
+    pub(super) async fn run(mut self) {
+        loop {
+            tokio::select! {
+                biased;
+                () = self.cancellation_token.cancelled() => {
+                    break;
+                },
+                Some(inner_stream_request) = self.stream_request_receiver.recv() => {
+                    match inner_stream_request {
+                        NewStreamRequest::OptimisticBlockStream(request) => {
+                            self.handle_optimistic_block_stream_request(request);
+                        }
+                        NewStreamRequest::BlockCommitmentStream(request) => {
+                            self.handle_block_commitment_stream_request(request);
+                        }
+                    }
+                },
+                Some(joined_task) = self.stream_tasks.join_next() => {
+                    match joined_task {
+                        Ok(Ok(())) => {
+                            trace!("stream task has been joined successfully");
+                        },
+                        Ok(Err(error)) => {
+                            warn!(%error, "stream task has been joined with an error");
+                        },
+                        Err(error) => {
+                            warn!(%error, "stream task has panicked");
+                        }
+                    }
+                }
+            }
+        }
+
+        self.shutdown().await;
+    }
+
+    #[instrument(skip_all)]
+    async fn shutdown(&mut self) {
+        match tokio::time::timeout(STREAM_TASKS_SHUTDOWN_DURATION, async {
+            while let Some(joined_task) = self.stream_tasks.join_next().await {
+                match joined_task {
+                    Ok(Ok(())) => {
+                        trace!("stream task has been joined successfully");
+                    }
+                    Ok(Err(error)) => {
+                        warn!(%error, "stream task has been joined with an error");
+                    }
+                    Err(error) => {
+                        warn!(%error, "stream task has panicked");
+                    }
+                }
+            }
+        })
+        .await
+        {
+            Ok(()) => {
+                info!("all stream tasks have been joined successfully");
+            }
+            Err(error) => {
+                error!(%error, "stream tasks failed to shut down in time");
+                self.stream_tasks.abort_all();
+            }
+        }
+    }
+}
+
+pub(super) struct OptimisticBlockFacade {
+    stream_request_sender: mpsc::Sender<NewStreamRequest>,
+}
+
+impl OptimisticBlockFacade {
+    fn new(stream_request_sender: mpsc::Sender<NewStreamRequest>) -> Self {
+        Self {
+            stream_request_sender,
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn spawn_optimistic_block_stream(
+        &self,
+        get_optimistic_block_stream_request: GetOptimisticBlockStreamRequest,
+    ) -> tonic::Result<Response<GrpcStream<GetOptimisticBlockStreamResponse>>> {
+        let rollup_id = {
+            let rollup_id = get_optimistic_block_stream_request
+                .rollup_id
+                .ok_or_else(|| Status::invalid_argument("rollup id is required"))?;
+
+            RollupId::try_from_raw(rollup_id)
+                .map_err(|e| Status::invalid_argument(e.to_string()))?
+        };
+
+        let (tx, rx) =
+            tokio::sync::mpsc::channel::<tonic::Result<GetOptimisticBlockStreamResponse>>(128);
+
+        let request = NewStreamRequest::OptimisticBlockStream(StartOptimisticBlockStreamRequest {
+            rollup_id,
+            tx,
+        });
+
+        self.stream_request_sender
+            .send(request)
+            .await
+            .map_err(|e| {
+                Status::internal(format!("failed to create optimistic block stream: {e}"))
+            })?;
+
+        Ok(Response::new(
+            Box::pin(ReceiverStream::new(rx)) as GrpcStream<GetOptimisticBlockStreamResponse>
+        ))
+    }
+
+    #[instrument(skip_all)]
+    async fn spawn_block_commitment_stream_request(
+        &self,
+    ) -> tonic::Result<Response<GrpcStream<GetBlockCommitmentStreamResponse>>> {
+        let (tx, rx) =
+            tokio::sync::mpsc::channel::<tonic::Result<GetBlockCommitmentStreamResponse>>(128);
+
+        let request = NewStreamRequest::BlockCommitmentStream(StartBlockCommitmentStreamRequest {
+            tx,
+        });
+
+        self.stream_request_sender
+            .send(request)
+            .await
+            .map_err(|e| {
+                Status::internal(format!("failed to create block commitment stream: {e}"))
+            })?;
+
+        Ok(Response::new(
+            Box::pin(ReceiverStream::new(rx)) as GrpcStream<GetBlockCommitmentStreamResponse>
+        ))
+    }
+}
+
+#[async_trait::async_trait]
+impl OptimisticBlockService for OptimisticBlockFacade {
+    type GetBlockCommitmentStreamStream = GrpcStream<GetBlockCommitmentStreamResponse>;
+    type GetOptimisticBlockStreamStream = GrpcStream<GetOptimisticBlockStreamResponse>;
+
+    #[instrument(skip_all)]
+    async fn get_optimistic_block_stream(
+        self: Arc<Self>,
+        request: Request<GetOptimisticBlockStreamRequest>,
+    ) -> tonic::Result<Response<Self::GetOptimisticBlockStreamStream>> {
+        let get_optimistic_block_stream_request = request.into_inner();
+
+        self.spawn_optimistic_block_stream(get_optimistic_block_stream_request)
+            .await
+    }
+
+    #[instrument(skip_all)]
+    async fn get_block_commitment_stream(
+        self: Arc<Self>,
+        _request: Request<GetBlockCommitmentStreamRequest>,
+    ) -> tonic::Result<Response<Self::GetBlockCommitmentStreamStream>> {
+        self.spawn_block_commitment_stream_request().await
+    }
+}
+
+// The below streams are free-standing functions: implementing them as methods on
+// `OptimisticBlockStreamRunner` would cause lifetime issues with the `self` reference,
+// because `JoinSet` requires the spawned future to have a 'static lifetime.
+async fn block_commitment_stream(
+    mut finalized_blocks_receiver: EventReceiver<Arc<FinalizeBlock>>,
+    tx: mpsc::Sender<tonic::Result<GetBlockCommitmentStreamResponse>>,
+    cancellation_token: CancellationToken,
+) -> Result<(), eyre::Report> {
+    // mark the current value in the event receiver as seen so that we can start streaming
+    // the next new block commitment to the subscriber
+    finalized_blocks_receiver.mark_latest_event_as_seen();
+
+    loop {
+        tokio::select! {
+            biased;
+            () = cancellation_token.cancelled() => {
+                break Ok(());
+            }
+            finalized_block_res = finalized_blocks_receiver.receive() => {
+                match finalized_block_res {
+                    Ok(finalized_block) => {
+                        let res = info_span!(BLOCK_COMMITMENT_STREAM_SPAN).in_scope(|| {
+                            let Hash::Sha256(block_hash) = finalized_block.hash else {
+                                warn!("block hash is empty; this should not occur");
+                                return Ok(());
+                            };
+
+                            let sequencer_block_commit = SequencerBlockCommit::new(finalized_block.height.value(), block_hash);
+
+                            let get_block_commitment_stream_response = GetBlockCommitmentStreamResponse {
+                                commitment: Some(sequencer_block_commit.to_raw()),
+                            };
+
+                            if let Err(error) = tx.try_send(Ok(get_block_commitment_stream_response)) {
+                                error!(%error, "forwarding block commitment stream to client failed");
+                                return Err(error).wrap_err("forwarding block commitment stream to client failed");
+                            };
+                            trace!("forwarded block commitment stream to client");
+
+                            Ok(())
+                        });
+
+                        if let Err(e) = res {
+                            break Err(e);
+                        }
+                    },
+                    Err(e) => {
+                        break Err(e).wrap_err("finalized block sender has been dropped with error")
+                    }
+                }
+            },
+        }
+    }
+}
+
+async fn optimistic_stream(
+    mut process_proposal_blocks_receiver: EventReceiver<Arc<SequencerBlock>>,
+    rollup_id: RollupId,
+    tx: mpsc::Sender<tonic::Result<GetOptimisticBlockStreamResponse>>,
+    cancellation_token: CancellationToken,
+) -> Result<(), eyre::Report> {
+    // mark the current value in the event receiver as seen so that we can start streaming
+    // the next new optimistic block to the subscriber
+    process_proposal_blocks_receiver.mark_latest_event_as_seen();
+
+    loop {
+        tokio::select! {
+            biased;
+            () = cancellation_token.cancelled() => {
+                break Ok(());
+            }
+            process_proposal_block_res = process_proposal_blocks_receiver.receive() => {
+                match process_proposal_block_res {
+                    Ok(process_proposal_block) => {
+                        let res = info_span!(OPTIMISTIC_STREAM_SPAN).in_scope(|| {
+                            let filtered_optimistic_block = process_proposal_block
+                                .to_filtered_block(vec![rollup_id]);
+                            let raw_filtered_optimistic_block = filtered_optimistic_block.into_raw();
+
+                            let get_optimistic_block_stream_response = GetOptimisticBlockStreamResponse {
+                                block: Some(raw_filtered_optimistic_block),
+                            };
+
+                            if let Err(error) = tx.try_send(Ok(get_optimistic_block_stream_response)) {
+                                error!(%error, "forwarding optimistic block stream to client failed");
+                                return Err(error).wrap_err("forwarding optimistic block stream to client failed")
+                            }
+                            trace!("forwarded optimistic block stream to client");
+
+                            Ok(())
+                        });
+
+                        if let Err(e) = res {
+                            break Err(e);
+                        }
+                    },
+                    Err(e) => {
+                        break Err(e).wrap_err("process proposal block sender has been dropped with error")
+                    }
+                }
+            },
+        }
+    }
+}
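[Reviewer note - not part of the diff] A sketch of how a client could consume the new optimistic block stream, assuming the tonic-generated client (`optimistic_block_service_client::OptimisticBlockServiceClient`) is compiled into `astria-core` alongside the server stubs used above, with module paths following the crate's generated-code layout; the endpoint address is illustrative only.

    use astria_core::generated::astria::{
        primitive::v1::RollupId as RawRollupId,
        sequencerblock::optimistic::v1alpha1::{
            optimistic_block_service_client::OptimisticBlockServiceClient,
            GetOptimisticBlockStreamRequest,
        },
    };

    async fn watch_optimistic_blocks(
        rollup_id: RawRollupId,
    ) -> Result<(), Box<dyn std::error::Error>> {
        // Connect to the sequencer's gRPC endpoint.
        let mut client = OptimisticBlockServiceClient::connect("http://127.0.0.1:8080").await?;

        // Open the stream for a single rollup id.
        let mut stream = client
            .get_optimistic_block_stream(GetOptimisticBlockStreamRequest {
                rollup_id: Some(rollup_id),
            })
            .await?
            .into_inner();

        // Each message carries a filtered sequencer block for the requested rollup.
        while let Some(response) = stream.message().await? {
            println!("received optimistic block: {response:?}");
        }
        Ok(())
    }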
diff --git a/crates/astria-sequencer/src/sequencer.rs b/crates/astria-sequencer/src/sequencer.rs
index 92df49d470..39787b58e1 100644
--- a/crates/astria-sequencer/src/sequencer.rs
+++ b/crates/astria-sequencer/src/sequencer.rs
@@ -1,4 +1,3 @@
-use astria_core::generated::astria::sequencerblock::v1::sequencer_service_server::SequencerServiceServer;
 use astria_eyre::{
     anyhow_to_eyre,
     eyre::{
@@ -20,11 +19,7 @@ use tokio::{
         signal,
         SignalKind,
     },
-    sync::{
-        oneshot,
-        watch,
-    },
-    task::JoinHandle,
+    sync::watch,
 };
 use tower_abci::v038::Server;
 use tracing::{
@@ -37,8 +32,6 @@
 use crate::{
     app::App,
     config::Config,
-    grpc::sequencer::SequencerServer,
-    ibc::host_interface::AstriaHost,
     mempool::Mempool,
     metrics::Metrics,
     service,
@@ -91,10 +84,13 @@ impl Sequencer {
         let snapshot = storage.latest_snapshot();
 
         let mempool = Mempool::new(metrics, config.mempool_parked_max_tx_count);
+
         let app = App::new(snapshot, mempool.clone(), metrics)
             .await
             .wrap_err("failed to initialize app")?;
+        let event_bus_subscription = app.subscribe_to_events();
+
         let consensus_service = tower::ServiceBuilder::new()
             .layer(request_span::layer(|req: &ConsensusRequest| {
                 req.create_span()
@@ -123,7 +119,14 @@ impl Sequencer {
             .grpc_addr
             .parse()
             .wrap_err("failed to parse grpc_addr address")?;
-        let grpc_server_handle = start_grpc_server(&storage, mempool, grpc_addr, shutdown_rx);
+        let grpc_server_handle = crate::grpc::start_server(
+            &storage,
+            mempool,
+            grpc_addr,
+            config.no_optimistic_blocks,
+            event_bus_subscription,
+            shutdown_rx,
+        );
 
         span.in_scope(|| info!(config.listen_addr, "starting sequencer"));
         let server_handle = tokio::spawn(async move {
@@ -162,54 +165,6 @@ impl Sequencer {
     }
 }
 
-fn start_grpc_server(
-    storage: &cnidarium::Storage,
-    mempool: Mempool,
-    grpc_addr: std::net::SocketAddr,
-    shutdown_rx: oneshot::Receiver<()>,
-) -> JoinHandle<Result<(), tonic::transport::Error>> {
-    use futures::TryFutureExt as _;
-    use ibc_proto::ibc::core::{
-        channel::v1::query_server::QueryServer as ChannelQueryServer,
-        client::v1::query_server::QueryServer as ClientQueryServer,
-        connection::v1::query_server::QueryServer as ConnectionQueryServer,
-    };
-    use penumbra_tower_trace::remote_addr;
-    use tower_http::cors::CorsLayer;
-
-    let ibc = penumbra_ibc::component::rpc::IbcQuery::<AstriaHost>::new(storage.clone());
-    let sequencer_api = SequencerServer::new(storage.clone(), mempool);
-    let cors_layer: CorsLayer = CorsLayer::permissive();
-
-    // TODO: setup HTTPS?
-    let grpc_server = tonic::transport::Server::builder()
-        .trace_fn(|req| {
-            if let Some(remote_addr) = remote_addr(req) {
-                let addr = remote_addr.to_string();
-                error_span!("grpc", addr)
-            } else {
-                error_span!("grpc")
-            }
-        })
-        // (from Penumbra) Allow HTTP/1, which will be used by grpc-web connections.
-        // This is particularly important when running locally, as gRPC
-        // typically uses HTTP/2, which requires HTTPS. Accepting HTTP/2
-        // allows local applications such as web browsers to talk to pd.
-        .accept_http1(true)
-        // (from Penumbra) Add permissive CORS headers, so pd's gRPC services are accessible
-        // from arbitrary web contexts, including from localhost.
-        .layer(cors_layer)
-        .add_service(ClientQueryServer::new(ibc.clone()))
-        .add_service(ChannelQueryServer::new(ibc.clone()))
-        .add_service(ConnectionQueryServer::new(ibc.clone()))
-        .add_service(SequencerServiceServer::new(sequencer_api));
-
-    info!(grpc_addr = grpc_addr.to_string(), "starting grpc server");
-    tokio::task::spawn(
-        grpc_server.serve_with_shutdown(grpc_addr, shutdown_rx.unwrap_or_else(|_| ())),
-    )
-}
-
 struct SignalReceiver {
     stop_rx: watch::Receiver<()>,
 }
diff --git a/crates/astria-sequencer/src/service/mempool/mod.rs b/crates/astria-sequencer/src/service/mempool/mod.rs
index f6d47b2f6b..56bf80bcf4 100644
--- a/crates/astria-sequencer/src/service/mempool/mod.rs
+++ b/crates/astria-sequencer/src/service/mempool/mod.rs
@@ -44,6 +44,7 @@ use tendermint::{
 use tower::Service;
 use tower_abci::BoxError;
 use tracing::{
+    info,
     instrument,
     Instrument as _,
 };
@@ -228,28 +229,33 @@ async fn handle_check_tx(
 ) -> response::CheckTx {
     use sha2::Digest as _;
 
+    info!("handling check_tx request");
     let request::CheckTx {
         tx, ..
     } = req;
     let tx_hash = sha2::Sha256::digest(&tx).into();
 
+    info!("checking whether the transaction was removed from the app-side mempool");
     // check if the transaction has been removed from the appside mempool
     if let Err(rsp) = check_removed_comet_bft(tx_hash, mempool, metrics).await {
         return rsp;
     }
 
+    info!("checking whether the transaction is already in the mempool");
     // check if the transaction is already in the mempool
     if is_tracked(tx_hash, mempool, metrics).await {
         return response::CheckTx::default();
     }
 
+    info!("performing stateless checks");
     // perform stateless checks
     let signed_tx = match stateless_checks(tx, &state, metrics).await {
         Ok(signed_tx) => signed_tx,
         Err(rsp) => return rsp,
     };
 
+    info!("attempting to insert the transaction into the mempool");
     // attempt to insert the transaction into the mempool
     if let Err(rsp) = insert_into_mempool(mempool, &state, signed_tx, metrics).await {
         return rsp;
@@ -258,6 +264,7 @@ async fn handle_check_tx(
     // insertion successful
     metrics.set_transactions_in_mempool_total(mempool.len().await);
 
+    info!("transaction successfully inserted into the mempool");
     response::CheckTx::default()
 }
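[Reviewer note - not part of the diff] The matching sketch for the block commitment stream, under the same assumption that the tonic-generated client is available; the request is built via `default()` since the server ignores its contents.

    use astria_core::generated::astria::sequencerblock::optimistic::v1alpha1::{
        optimistic_block_service_client::OptimisticBlockServiceClient,
        GetBlockCommitmentStreamRequest,
    };

    async fn watch_block_commitments() -> Result<(), Box<dyn std::error::Error>> {
        let mut client = OptimisticBlockServiceClient::connect("http://127.0.0.1:8080").await?;

        // Every finalized block is forwarded; there is nothing to filter on.
        let mut stream = client
            .get_block_commitment_stream(GetBlockCommitmentStreamRequest::default())
            .await?
            .into_inner();

        while let Some(response) = stream.message().await? {
            // `commitment` carries the committed height and the 32-byte block hash.
            println!("sequencer committed block: {:?}", response.commitment);
        }
        Ok(())
    }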