Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(bridge-withdrawer): Add instrumentation #1324

Merged
merged 6 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/astria-bridge-withdrawer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use http::status::StatusCode;
use hyper::server::conn::AddrIncoming;
use serde::Serialize;
use tokio::sync::watch;
use tracing::instrument;

use crate::bridge_withdrawer::StateSnapshot;

Expand Down Expand Up @@ -51,6 +52,7 @@ pub(crate) fn start(socket_addr: SocketAddr, withdrawer_state: WithdrawerState)
}

#[allow(clippy::unused_async)] // Permit because axum handlers must be async
#[instrument(skip_all)]
async fn get_healthz(State(withdrawer_state): State<WithdrawerState>) -> Healthz {
if withdrawer_state.borrow().is_healthy() {
Healthz::Ok
Expand All @@ -66,6 +68,7 @@ async fn get_healthz(State(withdrawer_state): State<WithdrawerState>) -> Healthz
/// + there is a current sequencer height (implying a block from sequencer was received)
/// + there is a current data availability height (implying a height was received from the DA)
#[allow(clippy::unused_async)] // Permit because axum handlers must be async
#[instrument(skip_all)]
async fn get_readyz(State(withdrawer_state): State<WithdrawerState>) -> Readyz {
let is_withdrawer_online = withdrawer_state.borrow().is_ready();
if is_withdrawer_online {
Expand All @@ -76,6 +79,7 @@ async fn get_readyz(State(withdrawer_state): State<WithdrawerState>) -> Readyz {
}

#[allow(clippy::unused_async)] // Permit because axum handlers must be async
#[instrument(skip_all)]
async fn get_status(State(withdrawer_state): State<WithdrawerState>) -> Json<StateSnapshot> {
Json(withdrawer_state.borrow().clone())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{
debug,
info,
info_span,
instrument,
warn,
};
Expand Down Expand Up @@ -293,11 +294,13 @@ async fn watch_for_blocks(
bail!("current rollup block missing block number")
};

info!(
block.height = current_rollup_block_height.as_u64(),
block.hash = current_rollup_block.hash.map(tracing::field::display),
"got current block"
);
info_span!("watch_for_blocks").in_scope(|| {
info!(
block.height = current_rollup_block_height.as_u64(),
block.hash = current_rollup_block.hash.map(tracing::field::display),
"got current block"
);
});

// sync any blocks missing between `next_rollup_block_height` and the current latest
// (inclusive).
Expand All @@ -314,7 +317,7 @@ async fn watch_for_blocks(
loop {
select! {
() = shutdown_token.cancelled() => {
info!("block watcher shutting down");
info_span!("watch_for_blocks").in_scope(|| info!("block watcher shutting down"));
return Ok(());
}
block = block_rx.next() => {
Expand Down
91 changes: 71 additions & 20 deletions crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,20 @@ use astria_eyre::eyre::{
self,
WrapErr as _,
};
use axum::{
routing::IntoMakeService,
Router,
Server,
};
use ethereum::watcher::Watcher;
use hyper::server::conn::AddrIncoming;
use startup::Startup;
use tokio::{
select,
sync::oneshot,
sync::oneshot::{
self,
Receiver,
},
task::{
JoinError,
JoinHandle,
Expand All @@ -24,6 +35,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{
error,
info,
instrument,
};

pub(crate) use self::state::StateSnapshot;
Expand Down Expand Up @@ -167,34 +179,29 @@ impl BridgeWithdrawer {
// Separate the API shutdown signal from the cancellation token because we want it to live
// until the very end.
let (api_shutdown_signal, api_shutdown_signal_rx) = oneshot::channel::<()>();
let mut api_task = tokio::spawn(async move {
api_server
.with_graceful_shutdown(async move {
let _ = api_shutdown_signal_rx.await;
})
.await
.wrap_err("api server ended unexpectedly")
});
info!("spawned API server");

let mut startup_task = Some(tokio::spawn(startup.run()));
info!("spawned startup task");

let mut submitter_task = tokio::spawn(submitter.run());
info!("spawned submitter task");
let mut ethereum_watcher_task = tokio::spawn(ethereum_watcher.run());
info!("spawned ethereum watcher task");
let TaskHandles {
mut api_task,
mut startup_task,
mut submitter_task,
mut ethereum_watcher_task,
} = spawn_tasks(
api_server,
api_shutdown_signal_rx,
startup,
submitter,
ethereum_watcher,
);

let shutdown = loop {
select!(
o = async { startup_task.as_mut().unwrap().await }, if startup_task.is_none() => {
match o {
Ok(_) => {
info!(task = "startup", "task has exited");
report_exit("startup", Ok(Ok(())));
startup_task = None;
},
Err(error) => {
error!(task = "startup", %error, "task returned with error");
report_exit("startup", Err(error));
break Shutdown {
api_task: Some(api_task),
submitter_task: Some(submitter_task),
Expand Down Expand Up @@ -245,6 +252,47 @@ impl BridgeWithdrawer {
}
}

pub struct TaskHandles {
ethanoroshiba marked this conversation as resolved.
Show resolved Hide resolved
api_task: JoinHandle<eyre::Result<()>>,
startup_task: Option<JoinHandle<eyre::Result<()>>>,
submitter_task: JoinHandle<eyre::Result<()>>,
ethereum_watcher_task: JoinHandle<eyre::Result<()>>,
}

#[instrument(skip_all)]
fn spawn_tasks(
api_server: Server<AddrIncoming, IntoMakeService<Router>>,
api_shutdown_signal_rx: Receiver<()>,
startup: Startup,
submitter: Submitter,
ethereum_watcher: Watcher,
) -> TaskHandles {
let api_task = tokio::spawn(async move {
api_server
.with_graceful_shutdown(async move {
let _ = api_shutdown_signal_rx.await;
})
.await
.wrap_err("api server ended unexpectedly")
});
info!("spawned API server");

let startup_task = Some(tokio::spawn(startup.run()));
info!("spawned startup task");

let submitter_task = tokio::spawn(submitter.run());
info!("spawned submitter task");
let ethereum_watcher_task = tokio::spawn(ethereum_watcher.run());
info!("spawned ethereum watcher task");

TaskHandles {
api_task,
startup_task,
submitter_task,
ethereum_watcher_task,
}
}

/// A handle for instructing the [`Service`] to shut down.
///
/// It is returned along with its related `Service` from [`Service::new`]. The
Expand Down Expand Up @@ -275,6 +323,7 @@ impl ShutdownHandle {
}

impl Drop for ShutdownHandle {
#[instrument(skip_all)]
fn drop(&mut self) {
if !self.token.is_cancelled() {
info!("shutdown handle dropped, issuing shutdown to all services");
Expand All @@ -283,6 +332,7 @@ impl Drop for ShutdownHandle {
}
}

#[instrument(skip_all)]
fn report_exit(task_name: &str, outcome: Result<eyre::Result<()>, JoinError>) {
match outcome {
Ok(Ok(())) => info!(task = task_name, "task has exited"),
Expand Down Expand Up @@ -314,6 +364,7 @@ impl Shutdown {
const STARTUP_SHUTDOWN_TIMEOUT_SECONDS: u64 = 1;
const SUBMITTER_SHUTDOWN_TIMEOUT_SECONDS: u64 = 19;

#[instrument(skip_all)]
async fn run(self) {
let Self {
api_task,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use tracing::{
instrument,
warn,
Instrument as _,
Level,
Span,
};
use tryhard::backoff_strategies::ExponentialBackoff;
Expand Down Expand Up @@ -120,6 +121,7 @@ impl InfoHandle {
}
}

#[instrument(skip_all, err)]
pub(super) async fn get_info(&mut self) -> eyre::Result<Info> {
let state = self
.rx
Expand Down Expand Up @@ -202,6 +204,7 @@ impl Startup {
/// - `self.chain_id` does not match the value returned from the sequencer node
/// - `self.fee_asset` is not a valid fee asset on the sequencer node
/// - `self.sequencer_bridge_address` does not have a sufficient balance of `self.fee_asset`.
#[instrument(skip_all, err)]
async fn confirm_sequencer_config(&self) -> eyre::Result<()> {
// confirm the sequencer chain id
let actual_chain_id =
Expand Down Expand Up @@ -250,6 +253,7 @@ impl Startup {
/// in the sequencer logic).
/// 5. Failing to convert the transaction data from bytes to proto.
/// 6. Failing to convert the transaction data from proto to `SignedTransaction`.
#[instrument(skip_all, err)]
async fn get_last_transaction(&self) -> eyre::Result<Option<SignedTransaction>> {
// get last transaction hash by the bridge account, if it exists
let last_transaction_hash_resp = get_bridge_account_last_transaction_hash(
Expand Down Expand Up @@ -323,6 +327,7 @@ impl Startup {
/// the sequencer logic)
/// 3. The last transaction by the bridge account did not contain a withdrawal action
/// 4. The memo of the last transaction by the bridge account could not be parsed
#[instrument(skip_all, err)]
async fn get_starting_rollup_height(&mut self) -> eyre::Result<u64> {
let signed_transaction = self
.get_last_transaction()
Expand All @@ -347,6 +352,7 @@ impl Startup {
}
}

#[instrument(skip_all, err(level = Level::WARN))]
async fn ensure_mempool_empty(
cometbft_client: sequencer_client::HttpClient,
sequencer_client: sequencer_service_client::SequencerServiceClient<Channel>,
Expand Down Expand Up @@ -391,6 +397,7 @@ async fn ensure_mempool_empty(
/// 2. Failing to get the latest nonce from cometBFT's mempool.
/// 3. The pending nonce from the Sequencer's app-side mempool does not match the latest nonce from
/// cometBFT's mempool after the exponential backoff times out.
#[instrument(skip_all, err)]
async fn wait_for_empty_mempool(
cometbft_client: sequencer_client::HttpClient,
sequencer_grpc_endpoint: String,
Expand Down Expand Up @@ -485,7 +492,7 @@ fn rollup_height_from_signed_transaction(
Ok(last_batch_rollup_height)
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_bridge_account_last_transaction_hash(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand All @@ -507,7 +514,7 @@ async fn get_bridge_account_last_transaction_hash(
res
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_sequencer_transaction_at_hash(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand All @@ -525,7 +532,7 @@ async fn get_sequencer_transaction_at_hash(
res
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_sequencer_chain_id(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand All @@ -542,7 +549,7 @@ async fn get_sequencer_chain_id(
Ok(genesis.chain_id)
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_allowed_fee_assets(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand All @@ -559,7 +566,7 @@ async fn get_allowed_fee_assets(
res
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_latest_nonce(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use astria_eyre::eyre::{
};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::{
info,
instrument,
};

use super::state::State;
use crate::{
Expand All @@ -30,6 +33,7 @@ impl Handle {
}
}

#[instrument(skip_all, err)]
pub(crate) async fn send_batch(&self, batch: Batch) -> eyre::Result<()> {
self.batches_tx
.send(batch)
Expand Down
Loading
Loading