diff --git a/.changelog/unreleased/bug-fixes/ibc-telemetry/3723-fix-backlog-metrics.md b/.changelog/unreleased/bug-fixes/ibc-telemetry/3723-fix-backlog-metrics.md new file mode 100644 index 0000000000..fa95766987 --- /dev/null +++ b/.changelog/unreleased/bug-fixes/ibc-telemetry/3723-fix-backlog-metrics.md @@ -0,0 +1,4 @@ +- Update the values of `backlog` metrics when clearing packets. + Change the `backlog_oldest_timestamp` to `backlog_latest_update_timestamp` + which shows the last time the `backlog` metrics have been updated. + ([\#3723](https://github.com/informalsystems/hermes/issues/3723)) \ No newline at end of file diff --git a/crates/relayer/src/chain/counterparty.rs b/crates/relayer/src/chain/counterparty.rs index be9425aae2..b99699d3b0 100644 --- a/crates/relayer/src/chain/counterparty.rs +++ b/crates/relayer/src/chain/counterparty.rs @@ -32,6 +32,7 @@ use crate::channel::ChannelError; use crate::client_state::IdentifiedAnyClientState; use crate::path::PathIdentifiers; use crate::supervisor::Error; +use crate::telemetry; pub fn counterparty_chain_from_connection( src_chain: &impl ChainHandle, @@ -502,6 +503,19 @@ pub fn unreceived_packets( &path.counterparty_channel_id, )?; + telemetry!( + update_backlog, + commit_sequences + .iter() + .map(|s| u64::from(*s)) + .collect::<Vec<u64>>() + .clone(), + &counterparty_chain.id(), + &path.counterparty_channel_id, + &path.counterparty_port_id, + &chain.id() + ); + let packet_seq_nrs = unreceived_packets_sequences(chain, &path.port_id, &path.channel_id, commit_sequences)?; diff --git a/crates/telemetry/src/state.rs b/crates/telemetry/src/state.rs index ab27df1fb8..b0aff925df 100644 --- a/crates/telemetry/src/state.rs +++ b/crates/telemetry/src/state.rs @@ -170,9 +170,9 @@ pub struct TelemetryState { /// SendPacket events were relayed. backlog_oldest_sequence: ObservableGauge, - /// Record the timestamp related to `backlog_oldest_sequence`.
+ /// Record the timestamp of the last time the `backlog_*` metrics have been updated. /// The timestamp is the time passed since since the unix epoch in seconds. - backlog_oldest_timestamp: ObservableGauge, + backlog_latest_update_timestamp: ObservableGauge, /// Records the length of the backlog, i.e., how many packets are pending. backlog_size: ObservableGauge, @@ -350,10 +350,10 @@ impl TelemetryState { .with_description("Sequence number of the oldest SendPacket event in the backlog") .init(), - backlog_oldest_timestamp: meter - .u64_observable_gauge("backlog_oldest_timestamp") + backlog_latest_update_timestamp: meter + .u64_observable_gauge("backlog_latest_update_timestamp") .with_unit(Unit::new("seconds")) - .with_description("Local timestamp for the oldest SendPacket event in the backlog") + .with_description("Local timestamp for the last time the backlog metrics have been updated") .init(), backlog_size: meter @@ -457,7 +457,7 @@ impl TelemetryState { } self.backlog_oldest_sequence.observe(&cx, 0, labels); - self.backlog_oldest_timestamp.observe(&cx, 0, labels); + self.backlog_latest_update_timestamp.observe(&cx, 0, labels); self.backlog_size.observe(&cx, 0, labels); } @@ -922,8 +922,7 @@ impl TelemetryState { }; // Update the backlog with the incoming data and retrieve the oldest values - let (oldest_sn, oldest_ts, total) = if let Some(path_backlog) = self.backlogs.get(&path_uid) - { + let (oldest_sn, total) = if let Some(path_backlog) = self.backlogs.get(&path_uid) { // Avoid having the inner backlog map growing more than a given threshold, by removing // the oldest sequence number entry. 
if path_backlog.len() > BACKLOG_RESET_THRESHOLD { @@ -935,20 +934,11 @@ impl TelemetryState { // Return the oldest event information to be recorded in telemetry if let Some(min) = path_backlog.iter().map(|v| *v.key()).min() { - if let Some(oldest) = path_backlog.get(&min) { - (min, *oldest.value(), path_backlog.len() as u64) - } else { - // Timestamp was not found, this should not happen, record a 0 ts. - (min, 0, path_backlog.len() as u64) - } + (min, path_backlog.len() as u64) } else { // We just inserted a new key/value, so this else branch is unlikely to activate, // but it can happen in case of concurrent updates to the backlog. - ( - EMPTY_BACKLOG_SYMBOL, - EMPTY_BACKLOG_SYMBOL, - EMPTY_BACKLOG_SYMBOL, - ) + (EMPTY_BACKLOG_SYMBOL, EMPTY_BACKLOG_SYMBOL) } } else { // If there is no inner backlog for this path, create a new map to store it. @@ -958,16 +948,57 @@ impl TelemetryState { self.backlogs.insert(path_uid, new_path_backlog); // Return the current event information to be recorded in telemetry - (seq_nr, timestamp, 1) + (seq_nr, 1) }; // Update metrics to reflect the new state of the backlog self.backlog_oldest_sequence.observe(&cx, oldest_sn, labels); - self.backlog_oldest_timestamp - .observe(&cx, oldest_ts, labels); + self.backlog_latest_update_timestamp + .observe(&cx, timestamp, labels); self.backlog_size.observe(&cx, total, labels); } + /// Replaces the backlog of the given path with the given pending sequence numbers. + /// This happens when the relayer clears packets and has queried the pending sequences. + pub fn update_backlog( + &self, + sequences: Vec<u64>, + chain_id: &ChainId, + channel_id: &ChannelId, + port_id: &PortId, + counterparty_chain_id: &ChainId, + ) { + // Unique identifier for a chain/channel/port. + let path_uid: PathIdentifier = PathIdentifier::new( + chain_id.to_string(), + channel_id.to_string(), + port_id.to_string(), + ); + + // This condition is done in order to avoid having an incorrect `backlog_latest_update_timestamp`.
+ // If `sequences` is an empty vector, the entries are removed one by one using `backlog_remove`, so that `backlog_latest_update_timestamp` + // is only updated if the current backlog is not empty. + // If `sequences` is not empty, it is possible to simply remove the backlog for that path and insert the sequences. + if sequences.is_empty() { + if let Some(path_backlog) = self.backlogs.get(&path_uid) { + let current_keys: Vec<u64> = path_backlog + .value() + .iter() + .map(|entry| *entry.key()) + .collect(); + + for key in current_keys.iter() { + self.backlog_remove(*key, chain_id, channel_id, port_id, counterparty_chain_id) + } + } + } else { + self.backlogs.remove(&path_uid); + for key in sequences.iter() { + self.backlog_insert(*key, chain_id, channel_id, port_id, counterparty_chain_id) + } + } + } + /// Evicts from the backlog the event for the given sequence number. /// Removing events happens when the relayer observed either an acknowledgment /// or a timeout for a packet sequence number, which means that the corresponding @@ -996,16 +1027,20 @@ impl TelemetryState { KeyValue::new("port", port_id.to_string()), ]; + // Retrieve the current local timestamp, used to record when the backlog was last updated. + let now = Time::now(); + let timestamp = match now.duration_since(Time::unix_epoch()) { + Ok(ts) => ts.as_secs(), + Err(_) => 0, + }; + if let Some(path_backlog) = self.backlogs.get(&path_uid) { if path_backlog.remove(&seq_nr).is_some() { + // If the entry was removed, update the latest update timestamp. + self.backlog_latest_update_timestamp + .observe(&cx, timestamp, labels); // The oldest pending sequence number is the minimum key in the inner (path) backlog.
if let Some(min_key) = path_backlog.iter().map(|v| *v.key()).min() { - if let Some(oldest) = path_backlog.get(&min_key) { - self.backlog_oldest_timestamp - .observe(&cx, *oldest.value(), labels); - } else { - self.backlog_oldest_timestamp.observe(&cx, 0, labels); - } self.backlog_oldest_sequence.observe(&cx, min_key, labels); self.backlog_size .observe(&cx, path_backlog.len() as u64, labels); @@ -1013,8 +1048,6 @@ impl TelemetryState { // No mimimum found, update the metrics to reflect an empty backlog self.backlog_oldest_sequence .observe(&cx, EMPTY_BACKLOG_SYMBOL, labels); - self.backlog_oldest_timestamp - .observe(&cx, EMPTY_BACKLOG_SYMBOL, labels); self.backlog_size.observe(&cx, EMPTY_BACKLOG_SYMBOL, labels); } } @@ -1156,7 +1189,7 @@ impl AggregatorSelector for CustomAggregatorSelector { match descriptor.name() { "wallet_balance" => Some(Arc::new(last_value())), "backlog_oldest_sequence" => Some(Arc::new(last_value())), - "backlog_oldest_timestamp" => Some(Arc::new(last_value())), + "backlog_latest_update_timestamp" => Some(Arc::new(last_value())), "backlog_size" => Some(Arc::new(last_value())), // Prometheus' supports only collector for histogram, sum, and last value aggregators. 
// https://docs.rs/opentelemetry-prometheus/0.10.0/src/opentelemetry_prometheus/lib.rs.html#411-418 @@ -1168,3 +1201,169 @@ impl AggregatorSelector for CustomAggregatorSelector { } } } + +#[cfg(test)] +mod tests { + use prometheus::proto::Metric; + + use super::*; + + #[test] + fn insert_remove_backlog() { + let state = TelemetryState::new( + Range { + start: 0, + end: 5000, + }, + 5, + Range { + start: 0, + end: 5000, + }, + 5, + ); + + let chain_id = ChainId::from_string("chain-test"); + let counterparty_chain_id = ChainId::from_string("counterpartychain-test"); + let channel_id = ChannelId::new(0); + let port_id = PortId::transfer(); + + state.backlog_insert(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(2, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(4, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(5, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_remove(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_remove(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + + let metrics = state.exporter.registry().gather().clone(); + let backlog_size = metrics + .iter() + .find(|metric| metric.get_name() == "backlog_size") + .unwrap(); + assert!( + assert_metric_value(backlog_size.get_metric(), 3), + "expected backlog_size to be 3" + ); + let backlog_oldest_sequence = metrics + .iter() + .find(|&metric| metric.get_name() == "backlog_oldest_sequence") + .unwrap(); + assert!( + assert_metric_value(backlog_oldest_sequence.get_metric(), 2), + "expected backlog_oldest_sequence to be 2" + ); + } + + #[test] + fn update_backlog() { + let state = TelemetryState::new( + Range { + start: 0, + end: 5000, + }, + 5, + Range { + start: 0, + end: 5000, + }, + 5, + ); + + let chain_id = ChainId::from_string("chain-test"); + let 
counterparty_chain_id = ChainId::from_string("counterpartychain-test"); + let channel_id = ChannelId::new(0); + let port_id = PortId::transfer(); + + state.backlog_insert(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(2, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(4, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(5, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + + state.update_backlog( + vec![5], + &chain_id, + &channel_id, + &port_id, + &counterparty_chain_id, + ); + + let metrics = state.exporter.registry().gather().clone(); + let backlog_size = metrics + .iter() + .find(|&metric| metric.get_name() == "backlog_size") + .unwrap(); + assert!( + assert_metric_value(backlog_size.get_metric(), 1), + "expected backlog_size to be 1" + ); + let backlog_oldest_sequence = metrics + .iter() + .find(|&metric| metric.get_name() == "backlog_oldest_sequence") + .unwrap(); + assert!( + assert_metric_value(backlog_oldest_sequence.get_metric(), 5), + "expected backlog_oldest_sequence to be 5" + ); + } + + #[test] + fn update_backlog_empty() { + let state = TelemetryState::new( + Range { + start: 0, + end: 5000, + }, + 5, + Range { + start: 0, + end: 5000, + }, + 5, + ); + + let chain_id = ChainId::from_string("chain-test"); + let counterparty_chain_id = ChainId::from_string("counterpartychain-test"); + let channel_id = ChannelId::new(0); + let port_id = PortId::transfer(); + + state.backlog_insert(1, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(2, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(3, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(4, &chain_id, &channel_id, &port_id, &counterparty_chain_id); + state.backlog_insert(5, &chain_id, &channel_id, &port_id, 
&counterparty_chain_id); + + state.update_backlog( + vec![], + &chain_id, + &channel_id, + &port_id, + &counterparty_chain_id, + ); + + let metrics = state.exporter.registry().gather().clone(); + let backlog_size = metrics + .iter() + .find(|&metric| metric.get_name() == "backlog_size") + .unwrap(); + assert!( + assert_metric_value(backlog_size.get_metric(), 0), + "expected backlog_size to be 0" + ); + let backlog_oldest_sequence = metrics + .iter() + .find(|&metric| metric.get_name() == "backlog_oldest_sequence") + .unwrap(); + assert!( + assert_metric_value(backlog_oldest_sequence.get_metric(), 0), + "expected backlog_oldest_sequence to be 0" + ); + } + + fn assert_metric_value(metric: &[Metric], expected: u64) -> bool { + metric + .iter() + .any(|m| m.get_gauge().get_value() as u64 == expected) + } +} diff --git a/guide/src/assets/grafana_template.json b/guide/src/assets/grafana_template.json index 2d7e91454a..80846ea8cf 100644 --- a/guide/src/assets/grafana_template.json +++ b/guide/src/assets/grafana_template.json @@ -958,7 +958,7 @@ }, "editorMode": "builder", "exemplar": false, - "expr": "backlog_oldest_timestamp{job=\"hermes\"}", + "expr": "backlog_latest_update_timestamp{job=\"hermes\"}", "format": "table", "hide": false, "instant": true, diff --git a/guide/src/documentation/telemetry/integration.md b/guide/src/documentation/telemetry/integration.md index 451f95b9a7..a33af5f656 100644 --- a/guide/src/documentation/telemetry/integration.md +++ b/guide/src/documentation/telemetry/integration.md @@ -22,10 +22,10 @@ acknowledgment_packets_confirmed_total{dst_chain="ibc-1",dst_channel="channel-0" # TYPE backlog_oldest_sequence gauge backlog_oldest_sequence{chain="ibc-0",channel="channel-0",counterparty="ibc-1",port="transfer",service_name="unknown_service",otel_scope_name="hermes",otel_scope_version=""} 0 
backlog_oldest_sequence{chain="ibc-1",channel="channel-0",counterparty="ibc-0",port="transfer",service_name="unknown_service",otel_scope_name="hermes",otel_scope_version=""} 0 -# HELP backlog_oldest_timestamp Local timestamp for the oldest SendPacket event in the backlog -# TYPE backlog_oldest_timestamp gauge -backlog_oldest_timestamp{chain="ibc-0",channel="channel-0",counterparty="ibc-1",port="transfer",service_name="unknown_service",otel_scope_name="hermes",otel_scope_version=""} 0 -backlog_oldest_timestamp{chain="ibc-1",channel="channel-0",counterparty="ibc-0",port="transfer",service_name="unknown_service",otel_scope_name="hermes",otel_scope_version=""} 0 +# HELP backlog_latest_update_timestamp Local timestamp for the last time the backlog metrics have been updated +# TYPE backlog_latest_update_timestamp gauge +backlog_latest_update_timestamp{chain="ibc-0",channel="channel-0",counterparty="ibc-1",port="transfer",service_name="unknown_service",otel_scope_name="hermes",otel_scope_version=""} 0 +backlog_latest_update_timestamp{chain="ibc-1",channel="channel-0",counterparty="ibc-0",port="transfer",service_name="unknown_service",otel_scope_name="hermes",otel_scope_version=""} 0 # HELP backlog_size Total number of SendPacket events in the backlog # TYPE backlog_size gauge backlog_size{chain="ibc-0",channel="channel-0",counterparty="ibc-1",port="transfer",service_name="unknown_service",otel_scope_name="hermes",otel_scope_version=""} 0 diff --git a/guide/src/documentation/telemetry/operators.md b/guide/src/documentation/telemetry/operators.md index c550232beb..96204843c4 100644 --- a/guide/src/documentation/telemetry/operators.md +++ b/guide/src/documentation/telemetry/operators.md @@ -121,7 +121,7 @@ Since Hermes v1, we also introduced 3 metrics that sketch the backlog status of | Name | Description | OpenTelemetry type | Configuration Dependencies | | -------------------------- | -------------------------------------------------------------- | ------------------- | 
-------------------------- | | `backlog_oldest_sequence` | Sequence number of the oldest SendPacket event in the backlog | `u64` ValueRecorder | Packet workers enabled | -| `backlog_oldest_timestamp` | Local timestamp for the oldest SendPacket event in the backlog | `u64` ValueRecorder | Packet workers enabled | +| `backlog_latest_update_timestamp` | Local timestamp for the last time the backlog metrics have been updated | `u64` ValueRecorder | Packet workers enabled | | `backlog_size` | Total number of SendPacket events in the backlog | `u64` ValueRecorder | Packet workers enabled | @@ -129,9 +129,8 @@ Notes: - The `backlog_size` defines how many IBC packets users sent and were not yet relayed (i.e., received on the destination network, or timed-out). If this metric is increasing, it signals that the packet queue is increasing and there may be some errors in the Hermes logs that need your attention. -- If the `backlog_oldest_sequence` remains unchanged for more than a few minutes, that means that the packet with the respective sequence number is likely blocked -and cannot be relayed. To understand for how long the packet is block, Hermes will populate `backlog_oldest_timestamp` with the local time when it first observed -the `backlog_oldest_sequence` that is blocked. +- The `backlog_latest_update_timestamp` is used to get information on the reliability of the `backlog_*` metrics. If the timestamp doesn't change, it means there might be an issue with the metrics. +- __NOTE__: The Hermes instance might miss the acknowledgment of an observed IBC packet that was relayed; this will cause the `backlog_*` metrics to contain an invalid value. In order to minimise this issue, whenever the Hermes instance clears packets, the `backlog_*` metrics will be updated using the queried pending packets. ## How efficient and how secure is the IBC status on each network?