Skip to content

Commit

Permalink
Bring back metrics (#24)
Browse files Browse the repository at this point in the history
* Config sanity check

* Add max_queued_txs

* Update metrics

* Cleanup Cargo.toml

* Add log to start service

* Revert "Add max_queued_txs"

This reverts commit 83c517d.

* Add shutdown listening (non-graceful)

* clippy + fmt
  • Loading branch information
Dzejkop authored Feb 29, 2024
1 parent 9be5bfb commit 683c35c
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 89 deletions.
36 changes: 7 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 10 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ default-run = "tx-sitter"
members = ["crates/*"]

[dependencies]
async-trait = "0.1.74"

## AWS
aws-config = { version = "1.0.1" }
aws-credential-types = { version = "1.0.1", features = [
Expand All @@ -19,6 +17,15 @@ aws-sdk-kms = "1.3.0"
aws-smithy-runtime-api = "1.0.2"
aws-smithy-types = "1.0.2"
aws-types = "1.0.1"

# Internal
postgres-docker-utils = { path = "crates/postgres-docker-utils" }

# Company
telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", rev = "ec8ba6d4da45fdb98f900d8d4c8e1a09186894b4" }

## External
async-trait = "0.1.74"
axum = { version = "0.6.20", features = ["headers"] }
base64 = "0.21.5"
bigdecimal = "0.4.2"
Expand All @@ -36,18 +43,12 @@ humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14.27"
itertools = "0.12.0"
metrics = "0.21.1"
metrics = "0.22.1"
num-bigint = "0.4.4"
# telemetry-batteries = { path = "../telemetry-batteries" }

# Internal
postgres-docker-utils = { path = "crates/postgres-docker-utils" }
rand = "0.8.5"
reqwest = { version = "0.11.13", default-features = false, features = [
"rustls-tls",
] }

## Other
serde = "1.0.136"
serde_json = "1.0.91"
sha3 = "0.10.8"
Expand All @@ -62,9 +63,6 @@ sqlx = { version = "0.7.2", features = [
"bigdecimal",
] }
strum = { version = "0.25.0", features = ["derive"] }

# Company
telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", rev = "ec8ba6d4da45fdb98f900d8d4c8e1a09186894b4" }
thiserror = "1.0.50"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
toml = "0.8.8"
Expand Down
57 changes: 57 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;

use config::FileFormat;
use serde::{Deserialize, Serialize};

use crate::api_key::ApiKey;

pub fn load_config<'a>(
config_files: impl Iterator<Item = &'a Path>,
) -> eyre::Result<Config> {
let mut settings = config::Config::builder();

for config_file in config_files {
settings = settings.add_source(
config::File::from(config_file).format(FileFormat::Toml),
);
}

let settings = settings
.add_source(
config::Environment::with_prefix("TX_SITTER").separator("__"),
)
.add_source(
config::Environment::with_prefix("TX_SITTER_EXT")
.separator("__")
.try_parsing(true)
.list_separator(","),
)
.build()?;

let config = settings.try_deserialize::<Config>()?;

Ok(config)
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct Config {
Expand Down Expand Up @@ -262,4 +292,31 @@ mod tests {

assert_eq!(toml, WITH_DB_PARTS);
}

#[test]
fn env_config_test() {
std::env::set_var("TX_SITTER__DATABASE__KIND", "parts");
std::env::set_var("TX_SITTER__DATABASE__HOST", "dbHost");
std::env::set_var("TX_SITTER__DATABASE__PORT", "dbPort");
std::env::set_var("TX_SITTER__DATABASE__DATABASE", "dbName");
std::env::set_var("TX_SITTER__DATABASE__USERNAME", "dbUsername");
std::env::set_var("TX_SITTER__DATABASE__PASSWORD", "dbPassword");
std::env::set_var("TX_SITTER__SERVICE__ESCALATION_INTERVAL", "1m");
std::env::set_var("TX_SITTER__SERVICE__DATADOG_ENABLED", "true");
std::env::set_var("TX_SITTER__SERVICE__STATSD_ENABLED", "true");
std::env::set_var("TX_SITTER__SERVER__HOST", "0.0.0.0:8080");
std::env::set_var("TX_SITTER__SERVER__USERNAME", "authUsername");
std::env::set_var("TX_SITTER__SERVER__PASSWORD", "authPassword");
std::env::set_var("TX_SITTER__KEYS__KIND", "kms");

let config = load_config(std::iter::empty()).unwrap();

assert!(config.service.statsd_enabled);
assert!(config.service.datadog_enabled);
assert_eq!(config.service.escalation_interval, Duration::from_secs(60));
assert_eq!(
config.database.to_connection_string(),
"postgres://dbUsername:dbPassword@dbHost:dbPort/dbName"
);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod keys;
pub mod serde_utils;
pub mod server;
pub mod service;
pub mod shutdown;
pub mod task_runner;
pub mod tasks;
pub mod types;
29 changes: 6 additions & 23 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::path::PathBuf;

use clap::Parser;
use config::FileFormat;
use telemetry_batteries::metrics::statsd::StatsdBattery;
use telemetry_batteries::tracing::datadog::DatadogBattery;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use tx_sitter::config::Config;
use tx_sitter::config::load_config;
use tx_sitter::service::Service;
use tx_sitter::shutdown::spawn_await_shutdown_task;

#[derive(Parser)]
#[command(author, version, about)]
Expand All @@ -35,27 +35,7 @@ async fn main() -> eyre::Result<()> {
dotenv::from_path(path)?;
}

let mut settings = config::Config::builder();

for arg in &args.config {
settings = settings.add_source(
config::File::from(arg.as_ref()).format(FileFormat::Toml),
);
}

let settings = settings
.add_source(
config::Environment::with_prefix("TX_SITTER").separator("__"),
)
.add_source(
config::Environment::with_prefix("TX_SITTER_EXT")
.separator("__")
.try_parsing(true)
.list_separator(","),
)
.build()?;

let config = settings.try_deserialize::<Config>()?;
let config = load_config(args.config.iter().map(PathBuf::as_ref))?;

if config.service.datadog_enabled {
DatadogBattery::init(None, "tx-sitter-monolith", None, true);
Expand All @@ -76,6 +56,9 @@ async fn main() -> eyre::Result<()> {
)?;
}

spawn_await_shutdown_task();

tracing::info!(?config, "Starting service");
let service = Service::new(config).await?;
service.wait().await?;

Expand Down
28 changes: 28 additions & 0 deletions src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use core::panic;

use tokio::signal::unix::{signal, SignalKind};

pub fn spawn_await_shutdown_task() {
tokio::spawn(async {
let result = await_shutdown_signal().await;
if let Err(err) = result {
tracing::error!("Error while waiting for shutdown signal: {}", err);
panic!("Error while waiting for shutdown signal: {}", err);
}

tracing::info!("Shutdown complete");
std::process::exit(0);
});
}

pub async fn await_shutdown_signal() -> eyre::Result<()> {
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;

tokio::select! {
_ = sigint.recv() => { tracing::info!("SIGINT received, shutting down"); }
_ = sigterm.recv() => { tracing::info!("SIGTERM received, shutting down"); }
};

Ok(())
}
2 changes: 1 addition & 1 deletion src/tasks/escalate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn escalate_txs(app: &App) -> eyre::Result<()> {
let mut futures = FuturesUnordered::new();

for (relayer_id, txs) in txs_for_escalation {
futures.push(escalate_relayer_txs(&app, relayer_id, txs));
futures.push(escalate_relayer_txs(app, relayer_id, txs));
}

while let Some(result) = futures.next().await {
Expand Down
17 changes: 6 additions & 11 deletions src/tasks/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub async fn index_block(
"Tx mined"
);

metrics::increment_counter!("tx_mined", &metric_labels);
metrics::counter!("tx_mined", &metric_labels).increment(1);
}

let relayers = app.db.get_relayers_by_chain_id(chain_id).await?;
Expand Down Expand Up @@ -171,29 +171,24 @@ pub async fn estimate_gas(app: Arc<App>, chain_id: u64) -> eyre::Result<()> {
.await?;

let labels = [("chain_id", chain_id.to_string())];
metrics::gauge!(
"gas_price",
gas_price.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR,
&labels
);
metrics::gauge!(
"base_fee_per_gas",
metrics::gauge!("gas_price", &labels)
.set(gas_price.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR);
metrics::gauge!("base_fee_per_gas", &labels).set(
fee_estimates.base_fee_per_gas.as_u64() as f64
* GAS_PRICE_FOR_METRICS_FACTOR,
&labels
);

for (i, percentile) in FEE_PERCENTILES.iter().enumerate() {
let percentile_fee = fee_estimates.percentile_fees[i];

metrics::gauge!(
"percentile_fee",
percentile_fee.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR,
&[
("chain_id", chain_id.to_string()),
("percentile", percentile.to_string()),
]
);
)
.set(percentile_fee.as_u64() as f64 * GAS_PRICE_FOR_METRICS_FACTOR);
}

tokio::time::sleep(Duration::from_secs(
Expand Down
21 changes: 8 additions & 13 deletions src/tasks/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,14 @@ pub async fn emit_metrics(app: Arc<App>) -> eyre::Result<()> {
// TODO: Add labels for env, etc.
let labels = [("chain_id", chain_id.to_string())];

metrics::gauge!("pending_txs", stats.pending_txs as f64, &labels);
metrics::gauge!("mined_txs", stats.mined_txs as f64, &labels);
metrics::gauge!(
"finalized_txs",
stats.finalized_txs as f64,
&labels
);
metrics::gauge!(
"total_indexed_blocks",
stats.total_indexed_blocks as f64,
&labels
);
metrics::gauge!("block_txs", stats.block_txs as f64, &labels);
metrics::gauge!("pending_txs", &labels)
.set(stats.pending_txs as f64);
metrics::gauge!("mined_txs", &labels).set(stats.mined_txs as f64);
metrics::gauge!("finalized_txs", &labels)
.set(stats.finalized_txs as f64);
metrics::gauge!("total_indexed_blocks", &labels)
.set(stats.total_indexed_blocks as f64);
metrics::gauge!("block_txs", &labels).set(stats.block_txs as f64);
}

tokio::time::sleep(EMIT_METRICS_INTERVAL).await;
Expand Down

0 comments on commit 683c35c

Please sign in to comment.