Skip to content

Commit

Permalink
add optimistic block APIs to sequencer
Browse files Browse the repository at this point in the history
Co-authored-by: Bharath <[email protected]>
  • Loading branch information
SuperFluffy and bharath-123 committed Jan 8, 2025
1 parent 5594ba3 commit b213ddb
Show file tree
Hide file tree
Showing 15 changed files with 887 additions and 86 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion charts/sequencer/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ version: 1.0.1
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.0.0"
appVersion: "1.0.1"

dependencies:
- name: sequencer-relayer
Expand Down
1 change: 1 addition & 0 deletions charts/sequencer/templates/configmaps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,6 @@ data:
OTEL_SERVICE_NAME: "{{ tpl .Values.sequencer.otel.serviceName . }}"
{{- if not .Values.global.dev }}
{{- else }}
ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS: "{{ not .Values.sequencer.optimisticBlockApis.enabled }}"
{{- end }}
---
2 changes: 2 additions & 0 deletions charts/sequencer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ sequencer:
tracesTimeout: 10
otlpHeaders:
traceHeaders:
optimisticBlockApis:
enabled: false

cometbft:
config:
Expand Down
1 change: 1 addition & 0 deletions crates/astria-core/src/sequencerblock/v1/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod block;
pub mod celestia;
pub mod optimistic;

pub use block::{
RollupTransactions,
Expand Down
83 changes: 83 additions & 0 deletions crates/astria-core/src/sequencerblock/v1/optimistic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use bytes::Bytes;

use crate::{
generated::astria::sequencerblock::optimistic::v1alpha1 as raw,
Protobuf,
};

#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct SequencerBlockCommitError(SequencerBlockCommitErrorKind);

impl SequencerBlockCommitError {
fn invalid_block_hash(len: usize) -> Self {
Self(SequencerBlockCommitErrorKind::InvalidBlockHash(len))
}
}

#[derive(Debug, thiserror::Error)]
enum SequencerBlockCommitErrorKind {
#[error("invalid block hash length: {0}")]
InvalidBlockHash(usize),
}

#[derive(Clone, Debug)]
pub struct SequencerBlockCommit {
height: u64,
block_hash: [u8; 32],
}

impl SequencerBlockCommit {
#[must_use]
pub fn new(height: u64, block_hash: [u8; 32]) -> Self {
Self {
height,
block_hash,
}
}

#[must_use]
pub fn height(&self) -> u64 {
self.height
}

#[must_use]
pub fn block_hash(&self) -> &[u8; 32] {
&self.block_hash
}
}

impl From<SequencerBlockCommit> for raw::SequencerBlockCommit {
fn from(value: SequencerBlockCommit) -> Self {
value.to_raw()
}
}

impl Protobuf for SequencerBlockCommit {
type Error = SequencerBlockCommitError;
type Raw = raw::SequencerBlockCommit;

fn try_from_raw_ref(raw: &Self::Raw) -> Result<Self, Self::Error> {
let Self::Raw {
height,
block_hash,
} = raw;

let block_hash = block_hash
.as_ref()
.try_into()
.map_err(|_| SequencerBlockCommitError::invalid_block_hash(block_hash.len()))?;

Ok(SequencerBlockCommit {
height: *height,
block_hash,
})
}

fn to_raw(&self) -> Self::Raw {
raw::SequencerBlockCommit {
height: self.height(),
block_hash: Bytes::copy_from_slice(self.block_hash()),
}
}
}
1 change: 1 addition & 0 deletions crates/astria-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ tendermint-proto = { workspace = true }
tendermint = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "tracing"] }
tokio-util = { workspace = true, features = ["rt"] }
tonic = { workspace = true }
tracing = { workspace = true }

Expand Down
3 changes: 3 additions & 0 deletions crates/astria-sequencer/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ ASTRIA_SEQUENCER_METRICS_HTTP_LISTENER_ADDR="127.0.0.1:9000"
# `ASTRIA_SEQUENCER_FORCE_STDOUT` is set to `true`.
ASTRIA_SEQUENCER_PRETTY_PRINT=false

# Disables streaming optimistic blocks to clients.
ASTRIA_SEQUENCER_NO_OPTIMISTIC_BLOCKS=false

# If set to any non-empty value removes ANSI escape characters from the pretty
# printed output. Note that this does nothing unless `ASTRIA_SEQUENCER_PRETTY_PRINT`
# is set to `true`.
Expand Down
158 changes: 158 additions & 0 deletions crates/astria-sequencer/src/app/event_bus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use std::sync::Arc;

use astria_core::sequencerblock::v1::SequencerBlock;
use astria_eyre::eyre::WrapErr as _;
use tendermint::abci::request::FinalizeBlock;
use tokio::sync::watch::{
Receiver,
Sender,
};
use tokio_util::sync::CancellationToken;

/// `EventReceiver` contains the receiver side of the events sent by the Sequencer App.
/// The listeners of the events can receive the latest value of the event by calling the
/// `receive` method.
#[derive(Clone)]
pub(crate) struct EventReceiver<T> {
// The receiver side of the watch which is read for the latest value of the event.
// We receive an Option over T because the sender side of the watch is designed to send
// Option values. This allows the sender value to send objects which do not have a `Default`
// implementation.
inner: Receiver<Option<T>>,
// To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
// set.
is_init: CancellationToken,
}

impl<T> EventReceiver<T>
where
T: Clone,
{
// Marks the current message in the receiver end of the watch as seen.
// This is useful in situations where we want to ignore the current value of the watch
// and wait for the next value.
pub(crate) fn mark_latest_event_as_seen(&mut self) {
self.inner.mark_unchanged();
}

// Returns the latest value of the event, waiting for the value to change if it hasn't already.
pub(crate) async fn receive(&mut self) -> astria_eyre::Result<T> {
// This will get resolved when the sender side of the watch is initialized by sending
// the first value. We wait till the sender side of the watch has initialized the watch.
// Once the sender side has been initialized, it will get resolved immediately.
self.is_init.cancelled().await;
// We want to only receive the latest value through the receiver, so we wait for the
// current value in the watch to change before we return it.
self.inner
.changed()
.await
.wrap_err("error waiting for latest event")?;
Ok(self.inner.borrow_and_update().clone().expect(
"events must be set after is_init is triggered; this means an invariant was violated",
))
}
}

/// `EventSender` contains the sender side of the events sent by the Sequencer App.
/// At any given time, it sends the latest value of the event.
struct EventSender<T> {
// A watch channel that is always starts unset. Once set, the `is_init` token is cancelled
// and value in the channel will never be unset.
inner: Sender<Option<T>>,
// To signal subscribers that the event bus is initialized, i.e. that the value in `sender` was
// set.
is_init: CancellationToken,
}

impl<T> EventSender<T> {
fn new() -> Self {
let (sender, _) = tokio::sync::watch::channel(None);
Self {
inner: sender,
is_init: CancellationToken::new(),
}
}

// Returns a `EventReceiver` object that contains the receiver side of the watch which can be
// used to receive the latest value of the event.
fn subscribe(&self) -> EventReceiver<T> {
EventReceiver {
inner: self.inner.subscribe(),
is_init: self.is_init.clone(),
}
}

// Sends the event to all the subscribers.
fn send(&self, event: T) {
self.inner.send_replace(Some(event));
// after sending the first value, we resolve the is_init token to signal that the sender
// side of the watch is initialized. The receiver side can now start receiving valid
// values.
self.is_init.cancel();
}
}

/// `EventBusSubscription` contains [`EventReceiver`] of various events that can be subscribed.
/// It can be cloned by various components in the sequencer app to receive events.
#[derive(Clone)]
pub(crate) struct EventBusSubscription {
process_proposal_blocks: EventReceiver<Arc<SequencerBlock>>,
finalized_blocks: EventReceiver<Arc<FinalizeBlock>>,
}

impl EventBusSubscription {
pub(crate) fn process_proposal_blocks(&mut self) -> EventReceiver<Arc<SequencerBlock>> {
self.process_proposal_blocks.clone()
}

pub(crate) fn finalized_blocks(&mut self) -> EventReceiver<Arc<FinalizeBlock>> {
self.finalized_blocks.clone()
}
}

/// The Sequencer `EventBus` is used to send and receive events between different components of the
/// sequencer. Components of Sequencer can subscribe to the `EventBus` via the `subscribe` method
/// which returns a [`EventBusSubscription`] objects that contains receivers of various events which
/// are of type [`EventReceiver`].
///
/// The `EventBus` is implemented using [`tokio::sync::watch`] which allows for multiple receivers
/// to receive the event at any given time.
pub(super) struct EventBus {
// Sends a process proposal block event to the subscribers. The event is sent in the form of a
// sequencer block which is created during the process proposal block phase.
process_proposal_block_sender: EventSender<Arc<SequencerBlock>>,
// Sends a finalized block event to the subscribers. The event is sent in the form of the
// finalize block abci request.
finalized_block_sender: EventSender<Arc<FinalizeBlock>>,
}

impl EventBus {
pub(super) fn new() -> Self {
let process_proposal_block_sender = EventSender::new();
let finalized_block_sender = EventSender::new();

Self {
process_proposal_block_sender,
finalized_block_sender,
}
}

// Returns a `EventBusSubscription` object that contains receivers of various events that can
// be subscribed to.
pub(crate) fn subscribe(&self) -> EventBusSubscription {
EventBusSubscription {
process_proposal_blocks: self.process_proposal_block_sender.subscribe(),
finalized_blocks: self.finalized_block_sender.subscribe(),
}
}

// Sends a process proposal block event to the subscribers.
pub(super) fn send_process_proposal_block(&self, sequencer_block: Arc<SequencerBlock>) {
self.process_proposal_block_sender.send(sequencer_block);
}

// Sends a finalized block event to the subscribers.
pub(super) fn send_finalized_block(&self, sequencer_block_commit: Arc<FinalizeBlock>) {
self.finalized_block_sender.send(sequencer_block_commit);
}
}
Loading

0 comments on commit b213ddb

Please sign in to comment.