diff --git a/db/migrations/002_relayers_table_update.sql b/db/migrations/002_relayers_table_update.sql new file mode 100644 index 0000000..24e3053 --- /dev/null +++ b/db/migrations/002_relayers_table_update.sql @@ -0,0 +1,5 @@ +ALTER TABLE relayers +RENAME COLUMN gas_limits TO gas_price_limits; + +ALTER TABLE relayers +ADD COLUMN enabled BOOL NOT NULL DEFAULT TRUE; diff --git a/manual_test_kms.nu b/manual_test_kms.nu index e426991..7ad746a 100644 --- a/manual_test_kms.nu +++ b/manual_test_kms.nu @@ -21,7 +21,7 @@ echo "Creating relayer" let relayer = http post -t application/json $"($txSitter)/1/admin/relayer" { "name": "My Relayer", "chainId": 11155111 } http post -t application/json $"($txSitter)/1/admin/relayer/($relayer.relayerId)" { - gasLimits: [ + gasPriceLimits: [ { chainId: 11155111, value: "0x123" } ] } diff --git a/src/broadcast_utils.rs b/src/broadcast_utils.rs index 35c9318..3de31b5 100644 --- a/src/broadcast_utils.rs +++ b/src/broadcast_utils.rs @@ -3,6 +3,7 @@ use eyre::ContextCompat; use self::gas_estimation::FeesEstimate; use crate::app::App; +use crate::types::RelayerInfo; pub mod gas_estimation; @@ -21,11 +22,19 @@ pub fn calculate_gas_fees_from_estimates( pub async fn should_send_transaction( app: &App, - relayer_id: &str, + relayer: &RelayerInfo, ) -> eyre::Result { - let relayer = app.db.get_relayer(relayer_id).await?; + if !relayer.enabled { + tracing::warn!( + relayer_id = relayer.id, + chain_id = relayer.chain_id, + "Relayer is disabled, skipping transactions broadcast" + ); + + return Ok(false); + } - for gas_limit in &relayer.gas_limits.0 { + for gas_limit in &relayer.gas_price_limits.0 { let chain_fees = app .db .get_latest_block_fees_by_chain_id(relayer.chain_id) diff --git a/src/db.rs b/src/db.rs index 0504c67..d935abc 100644 --- a/src/db.rs +++ b/src/db.rs @@ -101,16 +101,16 @@ impl Database { .await?; } - if let Some(gas_limits) = &update.gas_limits { + if let Some(gas_price_limits) = &update.gas_price_limits { sqlx::query( r#" UPDATE relayers - SET gas_limits = $2 + SET gas_price_limits = $2 WHERE id = $1 "#, ) .bind(id) - .bind(Json(gas_limits)) + .bind(Json(gas_price_limits)) .execute(tx.as_mut()) .await?; } @@ -132,7 +132,8 @@ impl Database { nonce, current_nonce, max_inflight_txs, - gas_limits + gas_price_limits, + enabled FROM relayers "#, ) @@ -152,7 +153,8 @@ impl Database { nonce, current_nonce, max_inflight_txs, - gas_limits + gas_price_limits, + enabled FROM relayers WHERE id = $1 "#, @@ -1090,7 +1092,7 @@ mod tests { use super::*; use crate::db::data::U256Wrapper; - use crate::types::RelayerGasLimit; + use crate::types::RelayerGasPriceLimit; async fn setup_db() -> eyre::Result<(Database, DockerContainerGuard)> { let db_container = postgres_docker_utils::setup().await?; @@ -1230,17 +1232,18 @@ mod tests { assert_eq!(relayer.nonce, 0); assert_eq!(relayer.current_nonce, 0); assert_eq!(relayer.max_inflight_txs, 5); - assert_eq!(relayer.gas_limits.0, vec![]); + assert_eq!(relayer.gas_price_limits.0, vec![]); db.update_relayer( relayer_id, &RelayerUpdate { relayer_name: None, max_inflight_txs: Some(10), - gas_limits: Some(vec![RelayerGasLimit { + gas_price_limits: Some(vec![RelayerGasPriceLimit { chain_id: 1, value: U256Wrapper(U256::from(10_123u64)), }]), + enabled: None, }, ) .await?; @@ -1256,8 +1259,8 @@ mod tests { assert_eq!(relayer.current_nonce, 0); assert_eq!(relayer.max_inflight_txs, 10); assert_eq!( - relayer.gas_limits.0, - vec![RelayerGasLimit { + relayer.gas_price_limits.0, + vec![RelayerGasPriceLimit { chain_id: 1, value: U256Wrapper(U256::from(10_123u64)), }] diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index 7f33f21..ce1f44a 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -44,14 +44,16 @@ async fn broadcast_relayer_txs( app: &App, relayer_id: String, txs: Vec, -) -> Result<(), eyre::Error> { +) -> eyre::Result<()> { if txs.is_empty() { return Ok(()); } tracing::info!(relayer_id, num_txs = txs.len(), "Broadcasting relayer txs"); - if !should_send_transaction(app, &relayer_id).await? { + let relayer = app.db.get_relayer(&relayer_id).await?; + + if !should_send_transaction(app, &relayer).await? { tracing::warn!( relayer_id = relayer_id, "Skipping transaction broadcasts" diff --git a/src/tasks/escalate.rs b/src/tasks/escalate.rs index 3a8c51f..7d15ac1 100644 --- a/src/tasks/escalate.rs +++ b/src/tasks/escalate.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use ethers::providers::Middleware; @@ -5,9 +6,12 @@ use ethers::types::transaction::eip2718::TypedTransaction; use ethers::types::transaction::eip2930::AccessList; use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, U256}; use eyre::ContextCompat; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use crate::app::App; use crate::broadcast_utils::should_send_transaction; +use crate::db::TxForEscalation; pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { loop { @@ -16,90 +20,136 @@ pub async fn escalate_txs(app: Arc) -> eyre::Result<()> { .get_txs_for_escalation(app.config.service.escalation_interval) .await?; - for tx in txs_for_escalation { - tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx"); + let txs_for_escalation = split_txs_per_relayer(txs_for_escalation); - if !should_send_transaction(&app, &tx.relayer_id).await? { - tracing::warn!(id = tx.id, "Skipping transaction broadcast"); - continue; - } + let mut futures = FuturesUnordered::new(); - let escalation = tx.escalation_count + 1; - - let middleware = app - .signer_middleware(tx.chain_id, tx.key_id.clone()) - .await?; - - let fees = app - .db - .get_latest_block_fees_by_chain_id(tx.chain_id) - .await? - .context("Missing block")?; - - // Min increase of 20% on the priority fee required for a replacement tx - let factor = U256::from(100); - let increased_gas_price_percentage = - factor + U256::from(20 * (1 + escalation)); - - let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0 - * increased_gas_price_percentage - / factor; - - let max_fee_per_gas = - tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase; - - let max_priority_fee_per_gas = - max_fee_per_gas - fees.fee_estimates.base_fee_per_gas; - - tracing::warn!( - "Initial tx fees are max = {}, priority = {}", - tx.initial_max_fee_per_gas.0, - tx.initial_max_priority_fee_per_gas.0 - ); - tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}"); - - let eip1559_tx = Eip1559TransactionRequest { - from: None, - to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), - gas: Some(tx.gas_limit.0), - value: Some(tx.value.0), - data: Some(tx.data.into()), - nonce: Some(tx.nonce.into()), - access_list: AccessList::default(), - max_priority_fee_per_gas: Some(max_priority_fee_per_gas), - max_fee_per_gas: Some(max_fee_per_gas), - chain_id: Some(tx.chain_id.into()), - }; - - let pending_tx = middleware - .send_transaction(TypedTransaction::Eip1559(eip1559_tx), None) - .await; - - let pending_tx = match pending_tx { - Ok(pending_tx) => { - tracing::info!(?pending_tx, "Tx sent successfully"); - pending_tx - } - Err(err) => { - tracing::error!(error = ?err, "Failed to send tx"); - continue; - } - }; - - let tx_hash = pending_tx.tx_hash(); - - app.db - .escalate_tx( - &tx.id, - tx_hash, - max_fee_per_gas, - max_priority_fee_per_gas, - ) - .await?; - - tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated"); + for (relayer_id, txs) in txs_for_escalation { + futures.push(escalate_relayer_txs(&app, relayer_id, txs)); + } + + while let Some(result) = futures.next().await { + if let Err(err) = result { + tracing::error!(error = ?err, "Failed escalating txs"); + } } tokio::time::sleep(app.config.service.escalation_interval).await; } } + +async fn escalate_relayer_txs( + app: &App, + relayer_id: String, + txs: Vec, +) -> eyre::Result<()> { + let relayer = app.db.get_relayer(&relayer_id).await?; + + for tx in txs { + tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx"); + + if !should_send_transaction(app, &relayer).await? { + tracing::warn!(id = tx.id, "Skipping transaction broadcast"); + + return Ok(()); + } + + let escalation = tx.escalation_count + 1; + + let middleware = app + .signer_middleware(tx.chain_id, tx.key_id.clone()) + .await?; + + let fees = app + .db + .get_latest_block_fees_by_chain_id(tx.chain_id) + .await? + .context("Missing block")?; + + // Min increase of 20% on the priority fee required for a replacement tx + let factor = U256::from(100); + let increased_gas_price_percentage = + factor + U256::from(20 * (1 + escalation)); + + let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0 + * increased_gas_price_percentage + / factor; + + let max_fee_per_gas = + tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase; + + let max_priority_fee_per_gas = + max_fee_per_gas - fees.fee_estimates.base_fee_per_gas; + + tracing::warn!( + "Initial tx fees are max = {}, priority = {}", + tx.initial_max_fee_per_gas.0, + tx.initial_max_priority_fee_per_gas.0 + ); + tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}"); + + let eip1559_tx = Eip1559TransactionRequest { + from: None, + to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), + gas: Some(tx.gas_limit.0), + value: Some(tx.value.0), + data: Some(tx.data.into()), + nonce: Some(tx.nonce.into()), + access_list: AccessList::default(), + max_priority_fee_per_gas: Some(max_priority_fee_per_gas), + max_fee_per_gas: Some(max_fee_per_gas), + chain_id: Some(tx.chain_id.into()), + }; + + let pending_tx = middleware + .send_transaction(TypedTransaction::Eip1559(eip1559_tx), None) + .await; + + let pending_tx = match pending_tx { + Ok(pending_tx) => { + tracing::info!(?pending_tx, "Tx sent successfully"); + pending_tx + } + Err(err) => { + tracing::error!(error = ?err, "Failed to send tx"); + continue; + } + }; + + let tx_hash = pending_tx.tx_hash(); + + app.db + .escalate_tx( + &tx.id, + tx_hash, + max_fee_per_gas, + max_priority_fee_per_gas, + ) + .await?; + + tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated"); + } + + Ok(()) +} + +fn split_txs_per_relayer( + txs: Vec, +) -> HashMap> { + let mut txs_per_relayer = HashMap::new(); + + for tx in txs { + let relayer_id = tx.relayer_id.clone(); + + let txs_for_relayer = + txs_per_relayer.entry(relayer_id).or_insert_with(Vec::new); + + txs_for_relayer.push(tx); + } + + for (_, txs) in txs_per_relayer.iter_mut() { + txs.sort_by_key(|tx| tx.escalation_count); + } + + txs_per_relayer +} diff --git a/src/types.rs b/src/types.rs index 47fd120..b349d08 100644 --- a/src/types.rs +++ b/src/types.rs @@ -42,7 +42,8 @@ pub struct RelayerInfo { pub current_nonce: u64, #[sqlx(try_from = "i64")] pub max_inflight_txs: u64, - pub gas_limits: Json>, + pub gas_price_limits: Json>, + pub enabled: bool, } #[derive(Deserialize, Serialize, Debug, Clone, Default)] @@ -50,17 +51,17 @@ pub struct RelayerInfo { pub struct RelayerUpdate { #[serde(default)] pub relayer_name: Option, - #[serde(default)] pub max_inflight_txs: Option, - #[serde(default)] - pub gas_limits: Option>, + pub gas_price_limits: Option>, + #[serde(default)] + pub enabled: Option, } #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] -pub struct RelayerGasLimit { +pub struct RelayerGasPriceLimit { pub value: U256Wrapper, pub chain_id: i64, } @@ -82,10 +83,11 @@ mod tests { nonce: 0, current_nonce: 0, max_inflight_txs: 0, - gas_limits: Json(vec![RelayerGasLimit { + gas_price_limits: Json(vec![RelayerGasPriceLimit { value: U256Wrapper(U256::zero()), chain_id: 1, }]), + enabled: true, }; let json = serde_json::to_string_pretty(&info).unwrap(); @@ -100,12 +102,13 @@ mod tests { "nonce": 0, "currentNonce": 0, "maxInflightTxs": 0, - "gasLimits": [ + "gasPriceLimits": [ { "value": "0x0", "chainId": 1 } - ] + ], + "enabled": true } "#};