Skip to content

Commit

Permalink
Allow disabling relayers (#14)
Browse files Browse the repository at this point in the history
* Update DB

* Integrate with db code

* Escalate per relayer & don't if relayer disabled

* clippy
  • Loading branch information
Dzejkop authored Jan 9, 2024
1 parent 8a5f5ee commit 0829d6b
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 104 deletions.
5 changes: 5 additions & 0 deletions db/migrations/002_relayers_table_update.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
ALTER TABLE relayers
RENAME COLUMN gas_limits TO gas_price_limits;

ALTER TABLE relayers
ADD COLUMN enabled BOOL NOT NULL DEFAULT TRUE;
2 changes: 1 addition & 1 deletion manual_test_kms.nu
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ echo "Creating relayer"
let relayer = http post -t application/json $"($txSitter)/1/admin/relayer" { "name": "My Relayer", "chainId": 11155111 }

http post -t application/json $"($txSitter)/1/admin/relayer/($relayer.relayerId)" {
gasLimits: [
gasPriceLimits: [
{ chainId: 11155111, value: "0x123" }
]
}
Expand Down
15 changes: 12 additions & 3 deletions src/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use eyre::ContextCompat;

use self::gas_estimation::FeesEstimate;
use crate::app::App;
use crate::types::RelayerInfo;

pub mod gas_estimation;

Expand All @@ -21,11 +22,19 @@ pub fn calculate_gas_fees_from_estimates(

pub async fn should_send_transaction(
app: &App,
relayer_id: &str,
relayer: &RelayerInfo,
) -> eyre::Result<bool> {
let relayer = app.db.get_relayer(relayer_id).await?;
if !relayer.enabled {
tracing::warn!(
relayer_id = relayer.id,
chain_id = relayer.chain_id,
"Relayer is disabled, skipping transactions broadcast"
);

return Ok(false);
}

for gas_limit in &relayer.gas_limits.0 {
for gas_limit in &relayer.gas_price_limits.0 {
let chain_fees = app
.db
.get_latest_block_fees_by_chain_id(relayer.chain_id)
Expand Down
23 changes: 13 additions & 10 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ impl Database {
.await?;
}

if let Some(gas_limits) = &update.gas_limits {
if let Some(gas_price_limits) = &update.gas_price_limits {
sqlx::query(
r#"
UPDATE relayers
SET gas_limits = $2
SET gas_price_limits = $2
WHERE id = $1
"#,
)
.bind(id)
.bind(Json(gas_limits))
.bind(Json(gas_price_limits))
.execute(tx.as_mut())
.await?;
}
Expand All @@ -132,7 +132,8 @@ impl Database {
nonce,
current_nonce,
max_inflight_txs,
gas_limits
gas_price_limits,
enabled
FROM relayers
"#,
)
Expand All @@ -152,7 +153,8 @@ impl Database {
nonce,
current_nonce,
max_inflight_txs,
gas_limits
gas_price_limits,
enabled
FROM relayers
WHERE id = $1
"#,
Expand Down Expand Up @@ -1090,7 +1092,7 @@ mod tests {

use super::*;
use crate::db::data::U256Wrapper;
use crate::types::RelayerGasLimit;
use crate::types::RelayerGasPriceLimit;

async fn setup_db() -> eyre::Result<(Database, DockerContainerGuard)> {
let db_container = postgres_docker_utils::setup().await?;
Expand Down Expand Up @@ -1230,17 +1232,18 @@ mod tests {
assert_eq!(relayer.nonce, 0);
assert_eq!(relayer.current_nonce, 0);
assert_eq!(relayer.max_inflight_txs, 5);
assert_eq!(relayer.gas_limits.0, vec![]);
assert_eq!(relayer.gas_price_limits.0, vec![]);

db.update_relayer(
relayer_id,
&RelayerUpdate {
relayer_name: None,
max_inflight_txs: Some(10),
gas_limits: Some(vec![RelayerGasLimit {
gas_price_limits: Some(vec![RelayerGasPriceLimit {
chain_id: 1,
value: U256Wrapper(U256::from(10_123u64)),
}]),
enabled: None,
},
)
.await?;
Expand All @@ -1256,8 +1259,8 @@ mod tests {
assert_eq!(relayer.current_nonce, 0);
assert_eq!(relayer.max_inflight_txs, 10);
assert_eq!(
relayer.gas_limits.0,
vec![RelayerGasLimit {
relayer.gas_price_limits.0,
vec![RelayerGasPriceLimit {
chain_id: 1,
value: U256Wrapper(U256::from(10_123u64)),
}]
Expand Down
6 changes: 4 additions & 2 deletions src/tasks/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ async fn broadcast_relayer_txs(
app: &App,
relayer_id: String,
txs: Vec<UnsentTx>,
) -> Result<(), eyre::Error> {
) -> eyre::Result<()> {
if txs.is_empty() {
return Ok(());
}

tracing::info!(relayer_id, num_txs = txs.len(), "Broadcasting relayer txs");

if !should_send_transaction(app, &relayer_id).await? {
let relayer = app.db.get_relayer(&relayer_id).await?;

if !should_send_transaction(app, &relayer).await? {
tracing::warn!(
relayer_id = relayer_id,
"Skipping transaction broadcasts"
Expand Down
210 changes: 130 additions & 80 deletions src/tasks/escalate.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::collections::HashMap;
use std::sync::Arc;

use ethers::providers::Middleware;
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::types::transaction::eip2930::AccessList;
use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, U256};
use eyre::ContextCompat;
use futures::stream::FuturesUnordered;
use futures::StreamExt;

use crate::app::App;
use crate::broadcast_utils::should_send_transaction;
use crate::db::TxForEscalation;

pub async fn escalate_txs(app: Arc<App>) -> eyre::Result<()> {
loop {
Expand All @@ -16,90 +20,136 @@ pub async fn escalate_txs(app: Arc<App>) -> eyre::Result<()> {
.get_txs_for_escalation(app.config.service.escalation_interval)
.await?;

for tx in txs_for_escalation {
tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx");
let txs_for_escalation = split_txs_per_relayer(txs_for_escalation);

if !should_send_transaction(&app, &tx.relayer_id).await? {
tracing::warn!(id = tx.id, "Skipping transaction broadcast");
continue;
}
let mut futures = FuturesUnordered::new();

let escalation = tx.escalation_count + 1;

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block")?;

// Min increase of 20% on the priority fee required for a replacement tx
let factor = U256::from(100);
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0
* increased_gas_price_percentage
/ factor;

let max_fee_per_gas =
tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

tracing::warn!(
"Initial tx fees are max = {}, priority = {}",
tx.initial_max_fee_per_gas.0,
tx.initial_max_priority_fee_per_gas.0
);
tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}");

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
};

let pending_tx = middleware
.send_transaction(TypedTransaction::Eip1559(eip1559_tx), None)
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
pending_tx
}
Err(err) => {
tracing::error!(error = ?err, "Failed to send tx");
continue;
}
};

let tx_hash = pending_tx.tx_hash();

app.db
.escalate_tx(
&tx.id,
tx_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
)
.await?;

tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated");
for (relayer_id, txs) in txs_for_escalation {
futures.push(escalate_relayer_txs(&app, relayer_id, txs));
}

while let Some(result) = futures.next().await {
if let Err(err) = result {
tracing::error!(error = ?err, "Failed escalating txs");
}
}

tokio::time::sleep(app.config.service.escalation_interval).await;
}
}

async fn escalate_relayer_txs(
app: &App,
relayer_id: String,
txs: Vec<TxForEscalation>,
) -> eyre::Result<()> {
let relayer = app.db.get_relayer(&relayer_id).await?;

for tx in txs {
tracing::info!(id = tx.id, tx.escalation_count, "Escalating tx");

if !should_send_transaction(app, &relayer).await? {
tracing::warn!(id = tx.id, "Skipping transaction broadcast");

return Ok(());
}

let escalation = tx.escalation_count + 1;

let middleware = app
.signer_middleware(tx.chain_id, tx.key_id.clone())
.await?;

let fees = app
.db
.get_latest_block_fees_by_chain_id(tx.chain_id)
.await?
.context("Missing block")?;

// Min increase of 20% on the priority fee required for a replacement tx
let factor = U256::from(100);
let increased_gas_price_percentage =
factor + U256::from(20 * (1 + escalation));

let max_fee_per_gas_increase = tx.initial_max_fee_per_gas.0
* increased_gas_price_percentage
/ factor;

let max_fee_per_gas =
tx.initial_max_fee_per_gas.0 + max_fee_per_gas_increase;

let max_priority_fee_per_gas =
max_fee_per_gas - fees.fee_estimates.base_fee_per_gas;

tracing::warn!(
"Initial tx fees are max = {}, priority = {}",
tx.initial_max_fee_per_gas.0,
tx.initial_max_priority_fee_per_gas.0
);
tracing::warn!("Escalating with max fee = {max_fee_per_gas} and max priority = {max_priority_fee_per_gas}");

let eip1559_tx = Eip1559TransactionRequest {
from: None,
to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))),
gas: Some(tx.gas_limit.0),
value: Some(tx.value.0),
data: Some(tx.data.into()),
nonce: Some(tx.nonce.into()),
access_list: AccessList::default(),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
max_fee_per_gas: Some(max_fee_per_gas),
chain_id: Some(tx.chain_id.into()),
};

let pending_tx = middleware
.send_transaction(TypedTransaction::Eip1559(eip1559_tx), None)
.await;

let pending_tx = match pending_tx {
Ok(pending_tx) => {
tracing::info!(?pending_tx, "Tx sent successfully");
pending_tx
}
Err(err) => {
tracing::error!(error = ?err, "Failed to send tx");
continue;
}
};

let tx_hash = pending_tx.tx_hash();

app.db
.escalate_tx(
&tx.id,
tx_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
)
.await?;

tracing::info!(id = ?tx.id, hash = ?tx_hash, "Tx escalated");
}

Ok(())
}

fn split_txs_per_relayer(
txs: Vec<TxForEscalation>,
) -> HashMap<String, Vec<TxForEscalation>> {
let mut txs_per_relayer = HashMap::new();

for tx in txs {
let relayer_id = tx.relayer_id.clone();

let txs_for_relayer =
txs_per_relayer.entry(relayer_id).or_insert_with(Vec::new);

txs_for_relayer.push(tx);
}

for (_, txs) in txs_per_relayer.iter_mut() {
txs.sort_by_key(|tx| tx.escalation_count);
}

txs_per_relayer
}
Loading

0 comments on commit 0829d6b

Please sign in to comment.