From 4dd4b518e20a49b418d28bec14d85324889bca82 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 13:37:56 -0500 Subject: [PATCH 01/15] added .gitignore, increased allowable db setup time --- .gitignore | 1 + src/db.rs | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f97022 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 9676582..ac0da7f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1069,9 +1069,17 @@ mod tests { let url = format!("postgres://postgres:postgres@{db_socket_addr}/database"); - let db = Database::new(&DatabaseConfig::connection_string(url)).await?; + for _ in 0..5 { + match Database::new(&DatabaseConfig::connection_string(&url)).await + { + Ok(db) => return Ok((db, db_container)), + Err(_) => { + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } - Ok((db, db_container)) + Err(eyre::eyre!("Failed to connect to the database")) } async fn full_update( From 2f2766e15863c5628f57266c6203e77d2d93dd98 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Tue, 12 Dec 2023 13:39:21 -0500 Subject: [PATCH 02/15] fix: typo --- src/tasks/finalize.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tasks/finalize.rs b/src/tasks/finalize.rs index 4695153..9ee3d87 100644 --- a/src/tasks/finalize.rs +++ b/src/tasks/finalize.rs @@ -7,15 +7,15 @@ const TIME_BETWEEN_FINALIZATIONS_SECONDS: i64 = 60; pub async fn finalize_txs(app: Arc) -> eyre::Result<()> { loop { - let finalization_timestmap = + let finalization_timestamp = chrono::Utc::now() - chrono::Duration::seconds(60 * 60); tracing::info!( "Finalizing txs mined before {}", - finalization_timestmap + finalization_timestamp ); - app.db.finalize_txs(finalization_timestmap).await?; + app.db.finalize_txs(finalization_timestamp).await?; tokio::time::sleep(Duration::from_secs( TIME_BETWEEN_FINALIZATIONS_SECONDS as u64, From 524199b73b139e06a079dbf6926ea2fb62bf441a Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 13 Dec 2023 17:48:58 -0500 Subject: [PATCH 03/15] insert into tx_hashes and sent_transactions seperatley, added raw_signed_tx for UniversalSigner --- src/db.rs | 38 ++++++++++++++---- src/keys/universal_signer.rs | 15 ++++++++ src/tasks/broadcast.rs | 74 +++++++++++++++++++++++++----------- 3 files changed, 97 insertions(+), 30 deletions(-) diff --git a/src/db.rs b/src/db.rs index ac0da7f..269e9b6 100644 --- a/src/db.rs +++ b/src/db.rs @@ -231,7 +231,7 @@ impl Database { .await?) } - pub async fn insert_tx_broadcast( + pub async fn insert_into_tx_hashes( &self, tx_id: &str, tx_hash: H256, @@ -246,8 +246,6 @@ impl Database { initial_max_priority_fee_per_gas .to_big_endian(&mut initial_max_priority_fee_per_gas_bytes); - let mut tx = self.pool.begin().await?; - sqlx::query( r#" INSERT INTO tx_hashes (tx_id, tx_hash, max_fee_per_gas, max_priority_fee_per_gas) @@ -258,9 +256,27 @@ impl Database { .bind(tx_hash.as_bytes()) .bind(initial_max_fee_per_gas_bytes) .bind(initial_max_priority_fee_per_gas_bytes) - .execute(tx.as_mut()) + .execute(&self.pool) .await?; + Ok(()) + } + + pub async fn insert_into_sent_transactions( + &self, + tx_id: &str, + tx_hash: H256, + initial_max_fee_per_gas: U256, + initial_max_priority_fee_per_gas: U256, + ) -> eyre::Result<()> { + let mut initial_max_fee_per_gas_bytes = [0u8; 32]; + initial_max_fee_per_gas + .to_big_endian(&mut initial_max_fee_per_gas_bytes); + + let mut initial_max_priority_fee_per_gas_bytes = [0u8; 32]; + initial_max_priority_fee_per_gas + .to_big_endian(&mut initial_max_priority_fee_per_gas_bytes); + sqlx::query( r#" INSERT INTO sent_transactions (tx_id, initial_max_fee_per_gas, initial_max_priority_fee_per_gas, valid_tx_hash) @@ -271,9 +287,7 @@ impl Database { .bind(initial_max_fee_per_gas_bytes) .bind(initial_max_priority_fee_per_gas_bytes) .bind(tx_hash.as_bytes()) - .execute(tx.as_mut()).await?; - - tx.commit().await?; + .execute(&self.pool).await?; Ok(()) } @@ -1295,7 +1309,15 @@ mod tests { let initial_max_fee_per_gas = U256::from(1); let initial_max_priority_fee_per_gas = U256::from(1); - db.insert_tx_broadcast( + db.insert_into_tx_hashes( + tx_id, + tx_hash_1, + initial_max_fee_per_gas, + initial_max_priority_fee_per_gas, + ) + .await?; + + db.insert_into_sent_transactions( tx_id, tx_hash_1, initial_max_fee_per_gas, diff --git a/src/keys/universal_signer.rs b/src/keys/universal_signer.rs index 6bfd718..c54635c 100644 --- a/src/keys/universal_signer.rs +++ b/src/keys/universal_signer.rs @@ -3,6 +3,7 @@ use ethers::core::types::transaction::eip2718::TypedTransaction; use ethers::core::types::transaction::eip712::Eip712; use ethers::core::types::{Address, Signature as EthSig}; use ethers::signers::{Signer, Wallet, WalletError}; +use ethers::types::{Bytes, H256}; use thiserror::Error; use crate::aws::ethers_signer::AwsSigner; @@ -13,6 +14,20 @@ pub enum UniversalSigner { Local(Wallet), } +impl UniversalSigner { + pub async fn raw_signed_tx( + &self, + tx: &TypedTransaction, + ) -> eyre::Result { + let signature = match self { + Self::Aws(signer) => signer.sign_transaction(tx).await?, + Self::Local(signer) => signer.sign_transaction(tx).await?, + }; + + Ok(tx.rlp_signed(&signature)) + } +} + #[derive(Debug, Error)] pub enum UniversalError { #[error("AWS Signer Error: {0}")] diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index d541928..154276b 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -5,7 +5,7 @@ use std::time::Duration; use ethers::providers::Middleware; use ethers::types::transaction::eip2718::TypedTransaction; use ethers::types::transaction::eip2930::AccessList; -use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress}; +use ethers::types::{Address, Eip1559TransactionRequest, NameOrAddress, H256}; use eyre::ContextCompat; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -96,44 +96,74 @@ async fn broadcast_relayer_txs( max_base_fee_per_gas, ); - let eip1559_tx = Eip1559TransactionRequest { - from: None, - to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), - gas: Some(tx.gas_limit.0), - value: Some(tx.value.0), - data: Some(tx.data.into()), - nonce: Some(tx.nonce.into()), - access_list: AccessList::default(), - max_priority_fee_per_gas: Some(max_priority_fee_per_gas), - max_fee_per_gas: Some(max_fee_per_gas), - chain_id: Some(tx.chain_id.into()), + let mut typed_transaction = + TypedTransaction::Eip1559(Eip1559TransactionRequest { + from: None, + to: Some(NameOrAddress::from(Address::from(tx.tx_to.0))), + gas: Some(tx.gas_limit.0), + value: Some(tx.value.0), + data: Some(tx.data.into()), + nonce: Some(tx.nonce.into()), + access_list: AccessList::default(), + max_priority_fee_per_gas: Some(max_priority_fee_per_gas), + max_fee_per_gas: Some(max_fee_per_gas), + chain_id: Some(tx.chain_id.into()), + }); + + // Fill and simulate the transaction + middleware + .fill_transaction(&mut typed_transaction, None) + .await?; + + // Simulate the transaction + match middleware.call(&typed_transaction, None).await { + Ok(_) => { + tracing::info!(?tx.id, "Tx simulated successfully"); + } + Err(err) => { + tracing::error!(?tx.id, error = ?err, "Failed to simulate tx"); + continue; + } }; - tracing::debug!(?eip1559_tx, "Sending tx"); + // Get the raw signed tx and derive the tx hash + let raw_signed_tx = middleware + .signer() + .raw_signed_tx(&typed_transaction) + .await?; + + let tx_hash = H256::from(ethers::utils::keccak256(&raw_signed_tx)); + + app.db + .insert_into_tx_hashes( + &tx.id, + tx_hash, + max_fee_per_gas, + max_priority_fee_per_gas, + ) + .await?; + + tracing::debug!(?tx.id, "Sending tx"); // TODO: Is it possible that we send a tx but don't store it in the DB? // TODO: Be smarter about error handling - a tx can fail to be sent // e.g. because the relayer is out of funds // but we don't want to retry it forever - let pending_tx = middleware - .send_transaction(TypedTransaction::Eip1559(eip1559_tx), None) - .await; + let pending_tx = middleware.send_raw_transaction(raw_signed_tx).await; - let pending_tx = match pending_tx { + match pending_tx { Ok(pending_tx) => { tracing::info!(?pending_tx, "Tx sent successfully"); - pending_tx } Err(err) => { - tracing::error!(error = ?err, "Failed to send tx"); + tracing::error!(?tx.id, error = ?err, "Failed to send tx"); continue; } }; - let tx_hash = pending_tx.tx_hash(); - + // Insert the tx into app.db - .insert_tx_broadcast( + .insert_into_sent_transactions( &tx.id, tx_hash, max_fee_per_gas, From 479e335a75080c224b3d97a3f7ee58f7c13cffbb Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 13 Dec 2023 18:28:02 -0500 Subject: [PATCH 04/15] added logic to recover simulated txs --- src/db.rs | 17 +++++++++++ src/tasks/broadcast.rs | 67 ++++++++++++++++++++++++++---------------- 2 files changed, 59 insertions(+), 25 deletions(-) diff --git a/src/db.rs b/src/db.rs index 269e9b6..fa413f1 100644 --- a/src/db.rs +++ b/src/db.rs @@ -292,6 +292,23 @@ impl Database { Ok(()) } + // Gets all transactions that were simulated but not sent + pub async fn recover_simulated_txs(&self) -> eyre::Result> { + Ok(sqlx::query_as( + r#" + SELECT r.id as relayer_id, t.id, t.tx_to, t.data, t.value, t.gas_limit, t.priority, t.nonce, r.key_id, r.chain_id + FROM transactions t + INNER JOIN tx_hashes h ON (h.tx_id = t.id) + INNER JOIN relayers r ON (t.relayer_id = r.id + LEFT JOIN sent_transactions s ON (t.id = s.tx_id) + WHERE s.tx_id IS NULL + ORDER BY r.id, t.nonce ASC; + "#, + ) + .fetch_all(&self.pool) + .await?) + } + pub async fn get_latest_block_number_without_fee_estimates( &self, chain_id: u64, diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index 154276b..784f52c 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -18,39 +18,38 @@ use crate::broadcast_utils::{ use crate::db::UnsentTx; pub async fn broadcast_txs(app: Arc) -> eyre::Result<()> { - loop { - let mut txs = app.db.get_unsent_txs().await?; - - txs.sort_unstable_by_key(|tx| tx.relayer_id.clone()); + // Recovery any unsent transactions that were simulated but never sent + let recovered_txs = app.db.recover_simulated_txs().await?; + broadcast_unsent_txs(&app, recovered_txs).await?; - let txs_by_relayer = - txs.into_iter().group_by(|tx| tx.relayer_id.clone()); + loop { + // Get all unsent txs and broadcast + let txs = app.db.get_unsent_txs().await?; + broadcast_unsent_txs(&app, txs).await?; - let txs_by_relayer: HashMap<_, _> = txs_by_relayer - .into_iter() - .map(|(relayer_id, txs)| { - let mut txs = txs.collect_vec(); + tokio::time::sleep(Duration::from_secs(1)).await; + } +} - txs.sort_unstable_by_key(|tx| tx.nonce); +async fn broadcast_unsent_txs( + app: &App, + txs: Vec, +) -> eyre::Result<()> { + let txs_by_relayer = sort_txs_by_relayer(txs); - (relayer_id, txs) - }) - .collect(); + let mut futures = FuturesUnordered::new(); - let mut futures = FuturesUnordered::new(); + for (relayer_id, txs) in txs_by_relayer { + futures.push(broadcast_relayer_txs(&app, relayer_id, txs)); + } - for (relayer_id, txs) in txs_by_relayer { - futures.push(broadcast_relayer_txs(&app, relayer_id, txs)); + while let Some(result) = futures.next().await { + if let Err(err) = result { + tracing::error!(error = ?err, "Failed broadcasting txs"); } - - while let Some(result) = futures.next().await { - if let Err(err) = result { - tracing::error!(error = ?err, "Failed broadcasting txs"); - } - } - - tokio::time::sleep(Duration::from_secs(1)).await; } + + Ok(()) } #[tracing::instrument(skip(app, txs))] @@ -176,3 +175,21 @@ async fn broadcast_relayer_txs( Ok(()) } + +fn sort_txs_by_relayer( + mut txs: Vec, +) -> HashMap> { + txs.sort_unstable_by_key(|tx| tx.relayer_id.clone()); + let txs_by_relayer = txs.into_iter().group_by(|tx| tx.relayer_id.clone()); + + txs_by_relayer + .into_iter() + .map(|(relayer_id, txs)| { + let mut txs = txs.collect_vec(); + + txs.sort_unstable_by_key(|tx| tx.nonce); + + (relayer_id, txs) + }) + .collect() +} From 9343e424dc29aa97c69331ef579d2e82ad8c1eae Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 13 Dec 2023 18:28:40 -0500 Subject: [PATCH 05/15] cargo clippy, cargo sort --- Cargo.toml | 90 ++++++++++++++++++------------------ src/keys/universal_signer.rs | 2 +- src/tasks/broadcast.rs | 2 +- 3 files changed, 47 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f1f02e5..191421c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,54 +8,50 @@ default-run = "tx-sitter" members = ["crates/*"] [dependencies] +async-trait = "0.1.74" # Third Party ## AWS aws-config = { version = "1.0.1" } -aws-sdk-kms = "1.3.0" -aws-smithy-types = "1.0.2" -aws-smithy-runtime-api = "1.0.2" -aws-types = "1.0.1" aws-credential-types = { version = "1.0.1", features = [ "hardcoded-credentials", ] } - -## Other -serde = "1.0.136" +aws-sdk-kms = "1.3.0" +aws-smithy-runtime-api = "1.0.2" +aws-smithy-types = "1.0.2" +aws-types = "1.0.1" axum = { version = "0.6.20", features = ["headers"] } -thiserror = "1.0.50" -headers = "0.3.9" -humantime = "2.1.0" -humantime-serde = "1.1.1" -hyper = "0.14.27" -dotenv = "0.15.0" +base64 = "0.21.5" +bigdecimal = "0.4.2" +chrono = "0.4" clap = { version = "4.3.0", features = ["env", "derive"] } +config = "0.13.3" +dotenv = "0.15.0" ethers = { version = "2.0.11", features = ["ws"] } eyre = "0.6.5" +futures = "0.3" +headers = "0.3.9" hex = "0.4.3" hex-literal = "0.4.1" +humantime = "2.1.0" +humantime-serde = "1.1.1" +hyper = "0.14.27" +itertools = "0.12.0" +metrics = "0.21.1" +num-bigint = "0.4.4" +# telemetry-batteries = { path = "../telemetry-batteries" } + +# Internal +postgres-docker-utils = { path = "crates/postgres-docker-utils" } +rand = "0.8.5" reqwest = { version = "0.11.13", default-features = false, features = [ "rustls-tls", ] } + +## Other +serde = "1.0.136" serde_json = "1.0.91" -strum = { version = "0.25.0", features = ["derive"] } -tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -tracing = { version = "0.1", features = ["log"] } -tracing-subscriber = { version = "0.3", default-features = false, features = [ - "env-filter", - "std", - "fmt", - "json", - "ansi", -] } -tower-http = { version = "0.4.4", features = [ "trace", "auth" ] } -uuid = { version = "0.8", features = ["v4"] } -futures = "0.3" -chrono = "0.4" -rand = "0.8.5" sha3 = "0.10.8" -config = "0.13.3" -toml = "0.8.8" -url = "2.4.1" +spki = "0.7.2" sqlx = { version = "0.7.2", features = [ "time", "chrono", @@ -65,26 +61,30 @@ sqlx = { version = "0.7.2", features = [ "migrate", "bigdecimal", ] } -metrics = "0.21.1" -num-bigint = "0.4.4" -bigdecimal = "0.4.2" -spki = "0.7.2" -async-trait = "0.1.74" -itertools = "0.12.0" -base64 = "0.21.5" +strum = { version = "0.25.0", features = ["derive"] } # Company telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", branch = "dzejkop/unnest-fields" } -# telemetry-batteries = { path = "../telemetry-batteries" } - -# Internal -postgres-docker-utils = { path = "crates/postgres-docker-utils" } +thiserror = "1.0.50" +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +toml = "0.8.8" +tower-http = { version = "0.4.4", features = [ "trace", "auth" ] } +tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.3", default-features = false, features = [ + "env-filter", + "std", + "fmt", + "json", + "ansi", +] } +url = "2.4.1" +uuid = { version = "0.8", features = ["v4"] } [dev-dependencies] -test-case = "3.1.0" -indoc = "2.0.3" fake-rpc = { path = "crates/fake-rpc" } +indoc = "2.0.3" +test-case = "3.1.0" [features] -default = [ "default-config" ] +default = ["default-config"] default-config = [] diff --git a/src/keys/universal_signer.rs b/src/keys/universal_signer.rs index c54635c..bd96e80 100644 --- a/src/keys/universal_signer.rs +++ b/src/keys/universal_signer.rs @@ -3,7 +3,7 @@ use ethers::core::types::transaction::eip2718::TypedTransaction; use ethers::core::types::transaction::eip712::Eip712; use ethers::core::types::{Address, Signature as EthSig}; use ethers::signers::{Signer, Wallet, WalletError}; -use ethers::types::{Bytes, H256}; +use ethers::types::{Bytes}; use thiserror::Error; use crate::aws::ethers_signer::AwsSigner; diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index 784f52c..c4b47c6 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -40,7 +40,7 @@ async fn broadcast_unsent_txs( let mut futures = FuturesUnordered::new(); for (relayer_id, txs) in txs_by_relayer { - futures.push(broadcast_relayer_txs(&app, relayer_id, txs)); + futures.push(broadcast_relayer_txs(app, relayer_id, txs)); } while let Some(result) = futures.next().await { From 89a93b159ed6d19133245f3d59cafbe45dd9ffad Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 13 Dec 2023 18:29:04 -0500 Subject: [PATCH 06/15] formatting --- src/keys/universal_signer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/keys/universal_signer.rs b/src/keys/universal_signer.rs index bd96e80..2a3db9d 100644 --- a/src/keys/universal_signer.rs +++ b/src/keys/universal_signer.rs @@ -3,7 +3,7 @@ use ethers::core::types::transaction::eip2718::TypedTransaction; use ethers::core::types::transaction::eip712::Eip712; use ethers::core::types::{Address, Signature as EthSig}; use ethers::signers::{Signer, Wallet, WalletError}; -use ethers::types::{Bytes}; +use ethers::types::Bytes; use thiserror::Error; use crate::aws::ethers_signer::AwsSigner; From 280abf4415d060bb93082521e6351671e5df9265 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 13 Dec 2023 18:30:58 -0500 Subject: [PATCH 07/15] removed recover simulated tx --- src/db.rs | 17 ----------------- src/tasks/broadcast.rs | 5 ----- 2 files changed, 22 deletions(-) diff --git a/src/db.rs b/src/db.rs index fa413f1..269e9b6 100644 --- a/src/db.rs +++ b/src/db.rs @@ -292,23 +292,6 @@ impl Database { Ok(()) } - // Gets all transactions that were simulated but not sent - pub async fn recover_simulated_txs(&self) -> eyre::Result> { - Ok(sqlx::query_as( - r#" - SELECT r.id as relayer_id, t.id, t.tx_to, t.data, t.value, t.gas_limit, t.priority, t.nonce, r.key_id, r.chain_id - FROM transactions t - INNER JOIN tx_hashes h ON (h.tx_id = t.id) - INNER JOIN relayers r ON (t.relayer_id = r.id - LEFT JOIN sent_transactions s ON (t.id = s.tx_id) - WHERE s.tx_id IS NULL - ORDER BY r.id, t.nonce ASC; - "#, - ) - .fetch_all(&self.pool) - .await?) - } - pub async fn get_latest_block_number_without_fee_estimates( &self, chain_id: u64, diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index c4b47c6..eb3ab9a 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -18,15 +18,10 @@ use crate::broadcast_utils::{ use crate::db::UnsentTx; pub async fn broadcast_txs(app: Arc) -> eyre::Result<()> { - // Recovery any unsent transactions that were simulated but never sent - let recovered_txs = app.db.recover_simulated_txs().await?; - broadcast_unsent_txs(&app, recovered_txs).await?; - loop { // Get all unsent txs and broadcast let txs = app.db.get_unsent_txs().await?; broadcast_unsent_txs(&app, txs).await?; - tokio::time::sleep(Duration::from_secs(1)).await; } } From 7d9ebd2592d815b8b90376bf58bb50b6dd532abe Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 13 Dec 2023 18:44:35 -0500 Subject: [PATCH 08/15] updated insert_into_tx_hashes to do nothing on conflict --- src/db.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/db.rs b/src/db.rs index 269e9b6..fe96728 100644 --- a/src/db.rs +++ b/src/db.rs @@ -250,6 +250,7 @@ impl Database { r#" INSERT INTO tx_hashes (tx_id, tx_hash, max_fee_per_gas, max_priority_fee_per_gas) VALUES ($1, $2, $3, $4) + ON CONFLICT (tx_hash) DO NOTHING "#, ) .bind(tx_id) From 8488a9e3e2c61d6aebff5c9a2d26d626e52074a7 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 13 Dec 2023 18:51:24 -0500 Subject: [PATCH 09/15] on conflict, update tx_hashes --- src/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.rs b/src/db.rs index fe96728..fd04681 100644 --- a/src/db.rs +++ b/src/db.rs @@ -250,7 +250,7 @@ impl Database { r#" INSERT INTO tx_hashes (tx_id, tx_hash, max_fee_per_gas, max_priority_fee_per_gas) VALUES ($1, $2, $3, $4) - ON CONFLICT (tx_hash) DO NOTHING + ON CONFLICT (tx_hash) DO UPDATE "#, ) .bind(tx_id) From 9412b0888e265479ce69a7c8e5016cfa9a2d0ca3 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 13 Dec 2023 19:02:27 -0500 Subject: [PATCH 10/15] updated tx_hashes to add constraint on tx_id, do nothing on conflict when inserting into table --- db/migrations/001_init.sql | 3 +++ src/db.rs | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/db/migrations/001_init.sql b/db/migrations/001_init.sql index 62a7209..4490b20 100644 --- a/db/migrations/001_init.sql +++ b/db/migrations/001_init.sql @@ -60,6 +60,9 @@ CREATE TABLE tx_hashes ( escalated BOOL NOT NULL DEFAULT FALSE ); +ALTER TABLE tx_hashes +ADD UNIQUE (tx_id); + -- Dynamic tx data & data used for escalations CREATE TABLE sent_transactions ( tx_id VARCHAR(255) PRIMARY KEY REFERENCES transactions(id) ON DELETE CASCADE, diff --git a/src/db.rs b/src/db.rs index fd04681..914661d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -250,7 +250,7 @@ impl Database { r#" INSERT INTO tx_hashes (tx_id, tx_hash, max_fee_per_gas, max_priority_fee_per_gas) VALUES ($1, $2, $3, $4) - ON CONFLICT (tx_hash) DO UPDATE + ON CONFLICT (tx_id) DO NOTHING "#, ) .bind(tx_id) From 00ff1c68723b3bfecc1bc9f519d03d4e317aef98 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Thu, 14 Dec 2023 08:59:19 -0500 Subject: [PATCH 11/15] added insert_tx_broadcast --- db/migrations/001_init.sql | 3 --- src/db.rs | 29 +++++++---------------------- 2 files changed, 7 insertions(+), 25 deletions(-) diff --git a/db/migrations/001_init.sql b/db/migrations/001_init.sql index 4490b20..62a7209 100644 --- a/db/migrations/001_init.sql +++ b/db/migrations/001_init.sql @@ -60,9 +60,6 @@ CREATE TABLE tx_hashes ( escalated BOOL NOT NULL DEFAULT FALSE ); -ALTER TABLE tx_hashes -ADD UNIQUE (tx_id); - -- Dynamic tx data & data used for escalations CREATE TABLE sent_transactions ( tx_id VARCHAR(255) PRIMARY KEY REFERENCES transactions(id) ON DELETE CASCADE, diff --git a/src/db.rs b/src/db.rs index 914661d..ace95df 100644 --- a/src/db.rs +++ b/src/db.rs @@ -231,7 +231,7 @@ impl Database { .await?) } - pub async fn insert_into_tx_hashes( + pub async fn insert_tx_broadcast( &self, tx_id: &str, tx_hash: H256, @@ -246,38 +246,21 @@ impl Database { initial_max_priority_fee_per_gas .to_big_endian(&mut initial_max_priority_fee_per_gas_bytes); + let mut tx = self.pool.begin().await?; + sqlx::query( r#" INSERT INTO tx_hashes (tx_id, tx_hash, max_fee_per_gas, max_priority_fee_per_gas) VALUES ($1, $2, $3, $4) - ON CONFLICT (tx_id) DO NOTHING "#, ) .bind(tx_id) .bind(tx_hash.as_bytes()) .bind(initial_max_fee_per_gas_bytes) .bind(initial_max_priority_fee_per_gas_bytes) - .execute(&self.pool) + .execute(tx.as_mut()) .await?; - Ok(()) - } - - pub async fn insert_into_sent_transactions( - &self, - tx_id: &str, - tx_hash: H256, - initial_max_fee_per_gas: U256, - initial_max_priority_fee_per_gas: U256, - ) -> eyre::Result<()> { - let mut initial_max_fee_per_gas_bytes = [0u8; 32]; - initial_max_fee_per_gas - .to_big_endian(&mut initial_max_fee_per_gas_bytes); - - let mut initial_max_priority_fee_per_gas_bytes = [0u8; 32]; - initial_max_priority_fee_per_gas - .to_big_endian(&mut initial_max_priority_fee_per_gas_bytes); - sqlx::query( r#" INSERT INTO sent_transactions (tx_id, initial_max_fee_per_gas, initial_max_priority_fee_per_gas, valid_tx_hash) @@ -288,7 +271,9 @@ impl Database { .bind(initial_max_fee_per_gas_bytes) .bind(initial_max_priority_fee_per_gas_bytes) .bind(tx_hash.as_bytes()) - .execute(&self.pool).await?; + .execute(tx.as_mut()).await?; + + tx.commit().await?; Ok(()) } From eb6b1fc7068027d7075b2377cc912f0551bd542c Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Thu, 14 Dec 2023 09:02:23 -0500 Subject: [PATCH 12/15] write to database after successful simulation --- src/db.rs | 10 +--------- src/tasks/broadcast.rs | 14 +++----------- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/src/db.rs b/src/db.rs index ace95df..ac0da7f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1295,15 +1295,7 @@ mod tests { let initial_max_fee_per_gas = U256::from(1); let initial_max_priority_fee_per_gas = U256::from(1); - db.insert_into_tx_hashes( - tx_id, - tx_hash_1, - initial_max_fee_per_gas, - initial_max_priority_fee_per_gas, - ) - .await?; - - db.insert_into_sent_transactions( + db.insert_tx_broadcast( tx_id, tx_hash_1, initial_max_fee_per_gas, diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index eb3ab9a..ee6159a 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -109,6 +109,8 @@ async fn broadcast_relayer_txs( .fill_transaction(&mut typed_transaction, None) .await?; + tracing::debug!(?tx.id, "Simulating tx"); + // Simulate the transaction match middleware.call(&typed_transaction, None).await { Ok(_) => { @@ -129,7 +131,7 @@ async fn broadcast_relayer_txs( let tx_hash = H256::from(ethers::utils::keccak256(&raw_signed_tx)); app.db - .insert_into_tx_hashes( + .insert_tx_broadcast( &tx.id, tx_hash, max_fee_per_gas, @@ -155,16 +157,6 @@ async fn broadcast_relayer_txs( } }; - // Insert the tx into - app.db - .insert_into_sent_transactions( - &tx.id, - tx_hash, - max_fee_per_gas, - max_priority_fee_per_gas, - ) - .await?; - tracing::info!(id = tx.id, hash = ?tx_hash, "Tx broadcast"); } From d74d027505969418d965f6840f8f54467058d15c Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Thu, 14 Dec 2023 09:13:34 -0500 Subject: [PATCH 13/15] removed unneeded function --- src/tasks/broadcast.rs | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index ee6159a..9822cc4 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -21,30 +21,22 @@ pub async fn broadcast_txs(app: Arc) -> eyre::Result<()> { loop { // Get all unsent txs and broadcast let txs = app.db.get_unsent_txs().await?; - broadcast_unsent_txs(&app, txs).await?; - tokio::time::sleep(Duration::from_secs(1)).await; - } -} + let txs_by_relayer = sort_txs_by_relayer(txs); -async fn broadcast_unsent_txs( - app: &App, - txs: Vec, -) -> eyre::Result<()> { - let txs_by_relayer = sort_txs_by_relayer(txs); - - let mut futures = FuturesUnordered::new(); + let mut futures = FuturesUnordered::new(); - for (relayer_id, txs) in txs_by_relayer { - futures.push(broadcast_relayer_txs(app, relayer_id, txs)); - } + for (relayer_id, txs) in txs_by_relayer { + futures.push(broadcast_relayer_txs(&app, relayer_id, txs)); + } - while let Some(result) = futures.next().await { - if let Err(err) = result { - tracing::error!(error = ?err, "Failed broadcasting txs"); + while let Some(result) = futures.next().await { + if let Err(err) = result { + tracing::error!(error = ?err, "Failed broadcasting txs"); + } } - } - Ok(()) + tokio::time::sleep(Duration::from_secs(1)).await; + } } #[tracing::instrument(skip(app, txs))] From 2f73406ae6d76d68628d8f9f2ae000188d70e69f Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Thu, 14 Dec 2023 09:25:54 -0500 Subject: [PATCH 14/15] updated cargo toml to match dev branch --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 191421c..6051d7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries" thiserror = "1.0.50" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } toml = "0.8.8" -tower-http = { version = "0.4.4", features = [ "trace", "auth" ] } +tower-http = { version = "0.4.4", features = ["trace", "auth"] } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3", default-features = false, features = [ "env-filter", From 9cb1e3e0a5303e7f738bd0ec45acaf6a8b73ec01 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Thu, 14 Dec 2023 10:25:30 -0500 Subject: [PATCH 15/15] removed comments and todo --- Cargo.toml | 2 +- src/tasks/broadcast.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6051d7e..192f45c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = ["crates/*"] [dependencies] async-trait = "0.1.74" -# Third Party + ## AWS aws-config = { version = "1.0.1" } aws-credential-types = { version = "1.0.1", features = [ diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index 9822cc4..7f33f21 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -133,7 +133,6 @@ async fn broadcast_relayer_txs( tracing::debug!(?tx.id, "Sending tx"); - // TODO: Is it possible that we send a tx but don't store it in the DB? // TODO: Be smarter about error handling - a tx can fail to be sent // e.g. because the relayer is out of funds // but we don't want to retry it forever