diff --git a/src/broadcast_utils.rs b/src/broadcast_utils.rs index 3de31b5..f1d791d 100644 --- a/src/broadcast_utils.rs +++ b/src/broadcast_utils.rs @@ -20,7 +20,7 @@ pub fn calculate_gas_fees_from_estimates( (max_fee_per_gas, max_priority_fee_per_gas) } -pub async fn should_send_transaction( +pub async fn should_send_relayer_transactions( app: &App, relayer: &RelayerInfo, ) -> eyre::Result { @@ -43,6 +43,7 @@ pub async fn should_send_transaction( if chain_fees.gas_price > gas_limit.value.0 { tracing::warn!( + relayer_id = relayer.id, chain_id = relayer.chain_id, gas_price = ?chain_fees.gas_price, gas_limit = ?gas_limit.value.0, diff --git a/src/db.rs b/src/db.rs index d935abc..43f9b9e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -14,9 +14,7 @@ use crate::types::{RelayerInfo, RelayerUpdate, TransactionPriority}; pub mod data; -use self::data::{ - AddressWrapper, BlockFees, H256Wrapper, NetworkStats, ReadTxData, RpcKind, -}; +use self::data::{BlockFees, H256Wrapper, NetworkStats, ReadTxData, RpcKind}; pub use self::data::{TxForEscalation, TxStatus, UnsentTx}; // Statically link in migration files @@ -141,6 +139,32 @@ impl Database { .await?) } + pub async fn get_relayers_by_chain_id( + &self, + chain_id: u64, + ) -> eyre::Result> { + Ok(sqlx::query_as( + r#" + SELECT + id, + name, + chain_id, + key_id, + address, + nonce, + current_nonce, + max_inflight_txs, + gas_price_limits, + enabled + FROM relayers + WHERE chain_id = $1 + "#, + ) + .bind(chain_id as i64) + .fetch_all(&self.pool) + .await?) + } + pub async fn get_relayer(&self, id: &str) -> eyre::Result { Ok(sqlx::query_as( r#" @@ -781,24 +805,6 @@ impl Database { .await?) } - pub async fn get_relayer_addresses( - &self, - chain_id: u64, - ) -> eyre::Result> { - let items: Vec<(AddressWrapper,)> = sqlx::query_as( - r#" - SELECT address - FROM relayers - WHERE chain_id = $1 - "#, - ) - .bind(chain_id as i64) - .fetch_all(&self.pool) - .await?; - - Ok(items.into_iter().map(|(wrapper,)| wrapper.0).collect()) - } - pub async fn update_relayer_nonce( &self, chain_id: u64, diff --git a/src/server/routes/transaction.rs b/src/server/routes/transaction.rs index 33f7d9e..75dceec 100644 --- a/src/server/routes/transaction.rs +++ b/src/server/routes/transaction.rs @@ -102,7 +102,7 @@ pub async fn send_tx( ) .await?; - tracing::info!(id = tx_id, "Tx created"); + tracing::info!(tx_id, "Transaction created"); Ok(Json(SendTxResponse { tx_id })) } diff --git a/src/task_runner.rs b/src/task_runner.rs index 2e4e096..f5b4367 100644 --- a/src/task_runner.rs +++ b/src/task_runner.rs @@ -33,12 +33,12 @@ where let mut failures = vec![]; loop { - tracing::info!(label, "Running task"); + tracing::info!(task_label = label, "Running task"); let result = task(app.clone()).await; if let Err(err) = result { - tracing::error!(label, error = ?err, "Task failed"); + tracing::error!(task_label = label, error = ?err, "Task failed"); failures.push(Instant::now()); let backoff = determine_backoff(&failures); @@ -47,7 +47,7 @@ where prune_failures(&mut failures); } else { - tracing::info!(label, "Task finished"); + tracing::info!(task_label = label, "Task finished"); break; } } diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index ce1f44a..673c02f 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -13,14 +13,17 @@ use itertools::Itertools; use crate::app::App; use crate::broadcast_utils::{ - calculate_gas_fees_from_estimates, should_send_transaction, + calculate_gas_fees_from_estimates, should_send_relayer_transactions, }; use crate::db::UnsentTx; +const NO_TXS_SLEEP_DURATION: Duration = Duration::from_secs(2); + pub async fn broadcast_txs(app: Arc) -> eyre::Result<()> { loop { - // Get all unsent txs and broadcast let txs = app.db.get_unsent_txs().await?; + let num_txs = txs.len(); + let txs_by_relayer = sort_txs_by_relayer(txs); let mut futures = FuturesUnordered::new(); @@ -31,11 +34,13 @@ pub async fn broadcast_txs(app: Arc) -> eyre::Result<()> { while let Some(result) = futures.next().await { if let Err(err) = result { - tracing::error!(error = ?err, "Failed broadcasting txs"); + tracing::error!(error = ?err, "Failed broadcasting transactions"); } } - tokio::time::sleep(Duration::from_secs(1)).await; + if num_txs == 0 { + tokio::time::sleep(NO_TXS_SLEEP_DURATION).await; + } } } @@ -49,21 +54,22 @@ async fn broadcast_relayer_txs( return Ok(()); } - tracing::info!(relayer_id, num_txs = txs.len(), "Broadcasting relayer txs"); - let relayer = app.db.get_relayer(&relayer_id).await?; - if !should_send_transaction(app, &relayer).await? { - tracing::warn!( - relayer_id = relayer_id, - "Skipping transaction broadcasts" - ); + if !should_send_relayer_transactions(app, &relayer).await? { + tracing::warn!(relayer_id = relayer_id, "Skipping relayer broadcasts"); return Ok(()); } + tracing::info!( + relayer_id, + num_txs = txs.len(), + "Broadcasting relayer transactions" + ); + for tx in txs { - tracing::info!(id = tx.id, "Sending tx"); + tracing::info!(tx_id = tx.id, nonce = tx.nonce, "Sending transaction"); let middleware = app .signer_middleware(tx.chain_id, tx.key_id.clone()) @@ -103,16 +109,22 @@ async fn broadcast_relayer_txs( .fill_transaction(&mut typed_transaction, None) .await?; - tracing::debug!(?tx.id, "Simulating tx"); + tracing::debug!(tx_id = tx.id, "Simulating transaction"); // Simulate the transaction match middleware.call(&typed_transaction, None).await { Ok(_) => { - tracing::info!(?tx.id, "Tx simulated successfully"); + tracing::info!( + tx_id = tx.id, + "Transaction simulated successfully" + ); } Err(err) => { - tracing::error!(?tx.id, error = ?err, "Failed to simulate tx"); - continue; + tracing::error!(tx_id = tx.id, error = ?err, "Failed to simulate transaction"); + + // If we fail while broadcasting a tx with nonce `n`, + // it doesn't make sense to broadcast tx with nonce `n + 1` + return Ok(()); } }; @@ -133,24 +145,25 @@ async fn broadcast_relayer_txs( ) .await?; - tracing::debug!(?tx.id, "Sending tx"); + tracing::debug!(tx_id = tx.id, "Sending transaction"); - // TODO: Be smarter about error handling - a tx can fail to be sent - // e.g. because the relayer is out of funds - // but we don't want to retry it forever let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await; - match pending_tx { - Ok(pending_tx) => { - tracing::info!(?pending_tx, "Tx sent successfully"); - } + let pending_tx = match pending_tx { + Ok(pending_tx) => pending_tx, Err(err) => { - tracing::error!(?tx.id, error = ?err, "Failed to send tx"); + tracing::error!(tx_id = tx.id, error = ?err, "Failed to send transaction"); continue; } }; - tracing::info!(id = tx.id, hash = ?tx_hash, "Tx broadcast"); + tracing::info!( + tx_id = tx.id, + tx_nonce = tx.nonce, + tx_hash = ?tx_hash, + ?pending_tx, + "Transaction broadcast" + ); } Ok(()) diff --git a/src/tasks/escalate.rs b/src/tasks/escalate.rs index 7d15ac1..76cf4da 100644 --- a/src/tasks/escalate.rs +++ b/src/tasks/escalate.rs @@ -10,7 +10,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use crate::app::App; -use crate::broadcast_utils::should_send_transaction; +use crate::broadcast_utils::should_send_relayer_transactions; use crate::db::TxForEscalation; pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { @@ -46,14 +46,21 @@ async fn escalate_relayer_txs( let relayer = app.db.get_relayer(&relayer_id).await?; for tx in txs { - tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx"); - - if !should_send_transaction(app, &relayer).await? { - tracing::warn!(id = tx.id, "Skipping transaction broadcast"); + if !should_send_relayer_transactions(app, &relayer).await? { + tracing::warn!( + relayer_id = relayer.id, + "Skipping relayer escalations" + ); return Ok(()); } + tracing::info!( + tx_id = tx.id, + escalation_count = tx.escalation_count, + "Escalating transaction" + ); + let escalation = tx.escalation_count + 1; let middleware = app @@ -71,23 +78,17 @@ async fn escalate_relayer_txs( let increased_gas_price_percentage = factor + U256::from(20 * (1 + escalation)); - let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0 - * increased_gas_price_percentage - / factor; + let initial_max_fee_per_gas = tx.initial_max_fee_per_gas.0; + + let max_fee_per_gas_increase = + initial_max_fee_per_gas * increased_gas_price_percentage / factor; let max_fee_per_gas = - tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase; + initial_max_fee_per_gas + max_fee_per_gas_increase; let max_priority_fee_per_gas = max_fee_per_gas - fees.fee_estimates.base_fee_per_gas; - tracing::warn!( - "Initial tx fees are max = {}, priority = {}", - tx.initial_max_fee_per_gas.0, - tx.initial_max_priority_fee_per_gas.0 - ); - tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}"); - let eip1559_tx = Eip1559TransactionRequest { from: None, to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), @@ -106,18 +107,26 @@ async fn escalate_relayer_txs( .await; let pending_tx = match pending_tx { - Ok(pending_tx) => { - tracing::info!(?pending_tx, "Tx sent successfully"); - pending_tx - } + Ok(pending_tx) => pending_tx, Err(err) => { - tracing::error!(error = ?err, "Failed to send tx"); + tracing::error!(tx_id = tx.id, error = ?err, "Failed to escalate transaction"); continue; } }; let tx_hash = pending_tx.tx_hash(); + tracing::info!( + tx_id = tx.id, + ?tx_hash, + ?initial_max_fee_per_gas, + ?max_fee_per_gas_increase, + ?max_fee_per_gas, + ?max_priority_fee_per_gas, + ?pending_tx, + "Escalated transaction" + ); + app.db .escalate_tx( &tx.id, @@ -127,7 +136,7 @@ async fn escalate_relayer_txs( ) .await?; - tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated"); + tracing::info!(tx_id = tx.id, "Escalated transaction saved"); } Ok(()) diff --git a/src/tasks/handle_reorgs.rs b/src/tasks/handle_reorgs.rs index 7b9a12d..a18aa15 100644 --- a/src/tasks/handle_reorgs.rs +++ b/src/tasks/handle_reorgs.rs @@ -9,7 +9,7 @@ pub async fn handle_hard_reorgs(app: Arc) -> eyre::Result<()> { let reorged_txs = app.db.handle_hard_reorgs().await?; for tx in reorged_txs { - tracing::info!(id = tx, "Tx hard reorged"); + tracing::info!(tx_id = tx, "Transaction hard reorged"); } tokio::time::sleep(app.config.service.hard_reorg_interval).await; @@ -23,7 +23,7 @@ pub async fn handle_soft_reorgs(app: Arc) -> eyre::Result<()> { let txs = app.db.handle_soft_reorgs().await?; for tx in txs { - tracing::info!(id = tx, "Tx soft reorged"); + tracing::info!(tx_id = tx, "Transaction soft reorged"); } tokio::time::sleep(app.config.service.soft_reorg_interval).await; diff --git a/src/tasks/index.rs b/src/tasks/index.rs index eca129f..dcfe76c 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -12,6 +12,7 @@ use crate::app::App; use crate::broadcast_utils::gas_estimation::{ estimate_percentile_fees, FeesEstimate, }; +use crate::types::RelayerInfo; const BLOCK_FEE_HISTORY_SIZE: usize = 10; const FEE_PERCENTILES: [f64; 5] = [5.0, 25.0, 50.0, 75.0, 95.0]; @@ -48,7 +49,7 @@ pub async fn index_block( ) -> eyre::Result<()> { let block_number = block.number.context("Missing block number")?.as_u64(); - tracing::info!(block_number, "Indexing block"); + tracing::info!(chain_id, block_number, "Indexing block"); let block_timestamp_seconds = block.timestamp.as_u64(); let block_timestamp = @@ -75,17 +76,18 @@ pub async fn index_block( [("chain_id", chain_id.to_string())]; for tx in mined_txs { tracing::info!( - id = tx.0, - hash = ?tx.1, + tx_id = tx.0, + tx_hash = ?tx.1, "Tx mined" ); metrics::increment_counter!("tx_mined", &metric_labels); } - let relayer_addresses = app.db.get_relayer_addresses(chain_id).await?; + let relayers = app.db.get_relayers_by_chain_id(chain_id).await?; + + update_relayer_nonces(&relayers, &app, rpc, chain_id).await?; - update_relayer_nonces(relayer_addresses, &app, rpc, chain_id).await?; Ok(()) } @@ -138,14 +140,18 @@ pub async fn estimate_gas(app: Arc, chain_id: u64) -> eyre::Result<()> { .await?; let Some(latest_block_number) = latest_block_number else { - tracing::info!("No blocks to estimate fees for"); + tracing::info!(chain_id, "No blocks to estimate fees for"); tokio::time::sleep(Duration::from_secs(2)).await; continue; }; - tracing::info!(block_number = latest_block_number, "Estimating fees"); + tracing::info!( + chain_id, + block_number = latest_block_number, + "Estimating fees" + ); let fee_estimates = get_block_fee_estimates(&rpc, latest_block_number) .await @@ -196,30 +202,31 @@ pub async fn estimate_gas(app: Arc, chain_id: u64) -> eyre::Result<()> { } async fn update_relayer_nonces( - relayer_addresses: Vec, + relayers: &[RelayerInfo], app: &Arc, rpc: &Provider, chain_id: u64, ) -> Result<(), eyre::Error> { let mut futures = FuturesUnordered::new(); - for relayer_address in relayer_addresses { + for relayer in relayers { let app = app.clone(); futures.push(async move { let tx_count = - rpc.get_transaction_count(relayer_address, None).await?; + rpc.get_transaction_count(relayer.address.0, None).await?; tracing::info!( + relayer_id = relayer.id, nonce = ?tx_count, - ?relayer_address, + relayer_address = ?relayer.address.0, "Updating relayer nonce" ); app.db .update_relayer_nonce( chain_id, - relayer_address, + relayer.address.0, tx_count.as_u64(), ) .await?;