diff --git a/Cargo.lock b/Cargo.lock index ef0d24a..e1b98f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2628,20 +2628,9 @@ dependencies = [ [[package]] name = "metrics" -version = "0.21.1" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" -dependencies = [ - "ahash 0.8.6", - "metrics-macros", - "portable-atomic", -] - -[[package]] -name = "metrics" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77b9e10a211c839210fd7f99954bda26e5f8e26ec686ad68da6a32df7c80e782" +checksum = "cd71d9db2e4287c3407fa04378b8c2ee570aebe0854431562cdd89ca091854f4" dependencies = [ "ahash 0.8.6", "portable-atomic", @@ -2658,7 +2647,7 @@ dependencies = [ "hyper-tls", "indexmap 1.9.3", "ipnet", - "metrics 0.22.0", + "metrics", "metrics-util", "quanta", "thiserror", @@ -2673,21 +2662,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82bd7bb16e431f15d56a61b18ee34881cd9d427da7b4450d1a588c911c1d9ac3" dependencies = [ "cadence", - "metrics 0.22.0", + "metrics", "thiserror", ] -[[package]] -name = "metrics-macros" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.39", -] - [[package]] name = "metrics-util" version = "0.16.0" @@ -2697,7 +2675,7 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.13.1", - "metrics 0.22.0", + "metrics", "num_cpus", "quanta", "sketches-ddsketch", @@ -4639,7 +4617,7 @@ dependencies = [ "chrono", "dirs", "http", - "metrics 0.22.0", + "metrics", "metrics-exporter-prometheus", "metrics-exporter-statsd", "opentelemetry", @@ -5173,7 +5151,7 @@ dependencies = [ "hyper", "indoc", "itertools 0.12.0", - "metrics 0.21.1", + "metrics", "num-bigint", "postgres-docker-utils", "rand", diff --git a/Cargo.toml b/Cargo.toml index 0c04a4a..c0433f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,6 @@ default-run = "tx-sitter" members = ["crates/*"] [dependencies] -async-trait = "0.1.74" - ## AWS aws-config = { version = "1.0.1" } aws-credential-types = { version = "1.0.1", features = [ @@ -19,6 +17,15 @@ aws-sdk-kms = "1.3.0" aws-smithy-runtime-api = "1.0.2" aws-smithy-types = "1.0.2" aws-types = "1.0.1" + +# Internal +postgres-docker-utils = { path = "crates/postgres-docker-utils" } + +# Company +telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", rev = "ec8ba6d4da45fdb98f900d8d4c8e1a09186894b4" } + +## External +async-trait = "0.1.74" axum = { version = "0.6.20", features = ["headers"] } base64 = "0.21.5" bigdecimal = "0.4.2" @@ -36,18 +43,12 @@ humantime = "2.1.0" humantime-serde = "1.1.1" hyper = "0.14.27" itertools = "0.12.0" -metrics = "0.21.1" +metrics = "0.22.1" num-bigint = "0.4.4" -# telemetry-batteries = { path = "../telemetry-batteries" } - -# Internal -postgres-docker-utils = { path = "crates/postgres-docker-utils" } rand = "0.8.5" reqwest = { version = "0.11.13", default-features = false, features = [ "rustls-tls", ] } - -## Other serde = "1.0.136" serde_json = "1.0.91" sha3 = "0.10.8" @@ -62,9 +63,6 @@ sqlx = { version = "0.7.2", features = [ "bigdecimal", ] } strum = { version = "0.25.0", features = ["derive"] } - -# Company -telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", rev = "ec8ba6d4da45fdb98f900d8d4c8e1a09186894b4" } thiserror = "1.0.50" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } toml = "0.8.8" diff --git a/src/config.rs b/src/config.rs index 6d2ed1c..35037bb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,10 +1,40 @@ use std::net::SocketAddr; +use std::path::Path; use std::time::Duration; +use config::FileFormat; use serde::{Deserialize, Serialize}; use crate::api_key::ApiKey; +pub fn load_config<'a>( + config_files: impl Iterator, +) -> eyre::Result { + let mut settings = config::Config::builder(); + + for config_file in config_files { + settings = settings.add_source( + config::File::from(config_file).format(FileFormat::Toml), + ); + } + + let settings = settings + .add_source( + config::Environment::with_prefix("TX_SITTER").separator("__"), + ) + .add_source( + config::Environment::with_prefix("TX_SITTER_EXT") + .separator("__") + .try_parsing(true) + .list_separator(","), + ) + .build()?; + + let config = settings.try_deserialize::()?; + + Ok(config) +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub struct Config { @@ -262,4 +292,31 @@ mod tests { assert_eq!(toml, WITH_DB_PARTS); } + + #[test] + fn env_config_test() { + std::env::set_var("TX_SITTER__DATABASE__KIND", "parts"); + std::env::set_var("TX_SITTER__DATABASE__HOST", "dbHost"); + std::env::set_var("TX_SITTER__DATABASE__PORT", "dbPort"); + std::env::set_var("TX_SITTER__DATABASE__DATABASE", "dbName"); + std::env::set_var("TX_SITTER__DATABASE__USERNAME", "dbUsername"); + std::env::set_var("TX_SITTER__DATABASE__PASSWORD", "dbPassword"); + std::env::set_var("TX_SITTER__SERVICE__ESCALATION_INTERVAL", "1m"); + std::env::set_var("TX_SITTER__SERVICE__DATADOG_ENABLED", "true"); + std::env::set_var("TX_SITTER__SERVICE__STATSD_ENABLED", "true"); + std::env::set_var("TX_SITTER__SERVER__HOST", "0.0.0.0:8080"); + std::env::set_var("TX_SITTER__SERVER__USERNAME", "authUsername"); + std::env::set_var("TX_SITTER__SERVER__PASSWORD", "authPassword"); + std::env::set_var("TX_SITTER__KEYS__KIND", "kms"); + + let config = load_config(std::iter::empty()).unwrap(); + + assert!(config.service.statsd_enabled); + assert!(config.service.datadog_enabled); + assert_eq!(config.service.escalation_interval, Duration::from_secs(60)); + assert_eq!( + config.database.to_connection_string(), + "postgres://dbUsername:dbPassword@dbHost:dbPort/dbName" + ); + } } diff --git a/src/lib.rs b/src/lib.rs index 70b3af8..abb94b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub mod keys; pub mod serde_utils; pub mod server; pub mod service; +pub mod shutdown; pub mod task_runner; pub mod tasks; pub mod types; diff --git a/src/main.rs b/src/main.rs index 9361396..42031e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,14 @@ use std::path::PathBuf; use clap::Parser; -use config::FileFormat; use telemetry_batteries::metrics::statsd::StatsdBattery; use telemetry_batteries::tracing::datadog::DatadogBattery; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; -use tx_sitter::config::Config; +use tx_sitter::config::load_config; use tx_sitter::service::Service; +use tx_sitter::shutdown::spawn_await_shutdown_task; #[derive(Parser)] #[command(author, version, about)] @@ -35,27 +35,7 @@ async fn main() -> eyre::Result<()> { dotenv::from_path(path)?; } - let mut settings = config::Config::builder(); - - for arg in &args.config { - settings = settings.add_source( - config::File::from(arg.as_ref()).format(FileFormat::Toml), - ); - } - - let settings = settings - .add_source( - config::Environment::with_prefix("TX_SITTER").separator("__"), - ) - .add_source( - config::Environment::with_prefix("TX_SITTER_EXT") - .separator("__") - .try_parsing(true) - .list_separator(","), - ) - .build()?; - - let config = settings.try_deserialize::()?; + let config = load_config(args.config.iter().map(PathBuf::as_ref))?; if config.service.datadog_enabled { DatadogBattery::init(None, "tx-sitter-monolith", None, true); @@ -76,6 +56,9 @@ async fn main() -> eyre::Result<()> { )?; } + spawn_await_shutdown_task(); + + tracing::info!(?config, "Starting service"); let service = Service::new(config).await?; service.wait().await?; diff --git a/src/shutdown.rs b/src/shutdown.rs new file mode 100644 index 0000000..fa35a31 --- /dev/null +++ b/src/shutdown.rs @@ -0,0 +1,28 @@ +use core::panic; + +use tokio::signal::unix::{signal, SignalKind}; + +pub fn spawn_await_shutdown_task() { + tokio::spawn(async { + let result = await_shutdown_signal().await; + if let Err(err) = result { + tracing::error!("Error while waiting for shutdown signal: {}", err); + panic!("Error while waiting for shutdown signal: {}", err); + } + + tracing::info!("Shutdown complete"); + std::process::exit(0); + }); +} + +pub async fn await_shutdown_signal() -> eyre::Result<()> { + let mut sigint = signal(SignalKind::interrupt())?; + let mut sigterm = signal(SignalKind::terminate())?; + + tokio::select! { + _ = sigint.recv() => { tracing::info!("SIGINT received, shutting down"); } + _ = sigterm.recv() => { tracing::info!("SIGTERM received, shutting down"); } + }; + + Ok(()) +} diff --git a/src/tasks/escalate.rs b/src/tasks/escalate.rs index 1c5d9a8..55367f6 100644 --- a/src/tasks/escalate.rs +++ b/src/tasks/escalate.rs @@ -38,7 +38,7 @@ async fn escalate_txs(app: &App) -> eyre::Result<()> { let mut futures = FuturesUnordered::new(); for (relayer_id, txs) in txs_for_escalation { - futures.push(escalate_relayer_txs(&app, relayer_id, txs)); + futures.push(escalate_relayer_txs(app, relayer_id, txs)); } while let Some(result) = futures.next().await { diff --git a/src/tasks/index.rs b/src/tasks/index.rs index 4edb23a..49eaa8b 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -82,7 +82,7 @@ pub async fn index_block( "Tx mined" ); - metrics::increment_counter!("tx_mined", &metric_labels); + metrics::counter!("tx_mined", &metric_labels).increment(1); } let relayers = app.db.get_relayers_by_chain_id(chain_id).await?; @@ -171,16 +171,11 @@ pub async fn estimate_gas(app: Arc, chain_id: u64) -> eyre::Result<()> { .await?; let labels = [("chain_id", chain_id.to_string())]; - metrics::gauge!( - "gas_price", - gas_price.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR, - &labels - ); - metrics::gauge!( - "base_fee_per_gas", + metrics::gauge!("gas_price", &labels) + .set(gas_price.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR); + metrics::gauge!("base_fee_per_gas", &labels).set( fee_estimates.base_fee_per_gas.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR, - &labels ); for (i, percentile) in FEE_PERCENTILES.iter().enumerate() { @@ -188,12 +183,12 @@ pub async fn estimate_gas(app: Arc, chain_id: u64) -> eyre::Result<()> { metrics::gauge!( "percentile_fee", - percentile_fee.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR, &[ ("chain_id", chain_id.to_string()), ("percentile", percentile.to_string()), ] - ); + ) + .set(percentile_fee.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR); } tokio::time::sleep(Duration::from_secs( diff --git a/src/tasks/metrics.rs b/src/tasks/metrics.rs index fb945b1..f8e989b 100644 --- a/src/tasks/metrics.rs +++ b/src/tasks/metrics.rs @@ -15,19 +15,14 @@ pub async fn emit_metrics(app: Arc) -> eyre::Result<()> { // TODO: Add labels for env, etc. let labels = [("chain_id", chain_id.to_string())]; - metrics::gauge!("pending_txs", stats.pending_txs as f64, &labels); - metrics::gauge!("mined_txs", stats.mined_txs as f64, &labels); - metrics::gauge!( - "finalized_txs", - stats.finalized_txs as f64, - &labels - ); - metrics::gauge!( - "total_indexed_blocks", - stats.total_indexed_blocks as f64, - &labels - ); - metrics::gauge!("block_txs", stats.block_txs as f64, &labels); + metrics::gauge!("pending_txs", &labels) + .set(stats.pending_txs as f64); + metrics::gauge!("mined_txs", &labels).set(stats.mined_txs as f64); + metrics::gauge!("finalized_txs", &labels) + .set(stats.finalized_txs as f64); + metrics::gauge!("total_indexed_blocks", &labels) + .set(stats.total_indexed_blocks as f64); + metrics::gauge!("block_txs", &labels).set(stats.block_txs as f64); } tokio::time::sleep(EMIT_METRICS_INTERVAL).await;