Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve backlog metrics #3722

Merged
merged 13 commits into from
Dec 14, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Update the values of `backlog` metrics when clearing packets.
([\#3723](https://github.com/informalsystems/hermes/issues/3723))
18 changes: 18 additions & 0 deletions crates/relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,15 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
unreceived_packets(self.dst_chain(), self.src_chain(), &self.path_id)
.map_err(LinkError::supervisor)?;

// Use queried unreceived packets to update the backlog metric
ibc_telemetry::global().update_backlog(
sequences.iter().map(|&sequence| sequence.into()).collect(),
&self.src_chain().id(),
self.src_channel_id(),
self.src_port_id(),
&self.dst_chain().id(),
);

let query_height = opt_query_height.unwrap_or(src_response_height);

// Skip: no relevant events found.
Expand Down Expand Up @@ -1170,6 +1179,15 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
return Ok(());
};

// Use queried unreceived acks to update the backlog metric
ibc_telemetry::global().update_backlog(
sequences.iter().map(|&sequence| sequence.into()).collect(),
&self.dst_chain().id(),
self.dst_channel_id(),
self.dst_port_id(),
&self.src_chain().id(),
);

let query_height = opt_query_height.unwrap_or(src_response_height);

// Skip: no relevant events found.
Expand Down
145 changes: 145 additions & 0 deletions crates/telemetry/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,38 @@ impl TelemetryState {
self.backlog_size.observe(&cx, total, labels);
}

/// Reconciles the backlog of a path with the set of sequences that are still pending.
///
/// `sequences` is treated as the authoritative list of pending sequence numbers
/// for the `chain_id`/`channel_id`/`port_id` path (for example the result of a
/// query for unreceived packets or acknowledgments). Every sequence number that
/// is currently tracked in the backlog of that path but is absent from
/// `sequences` is evicted through `backlog_remove`.
///
/// This function only evicts: sequence numbers present in `sequences` but not
/// yet tracked are not inserted, and nothing happens when no backlog exists
/// for the path.
pub fn update_backlog(
    &self,
    sequences: Vec<u64>,
    chain_id: &ChainId,
    channel_id: &ChannelId,
    port_id: &PortId,
    counterparty_chain_id: &ChainId,
) {
    // Unique identifier for a chain/channel/port.
    let path_uid: PathIdentifier = PathIdentifier::new(
        chain_id.to_string(),
        channel_id.to_string(),
        port_id.to_string(),
    );

    // Hash the pending sequences once so that every membership check below is
    // constant time instead of a linear scan of the whole vector.
    let pending: std::collections::HashSet<u64> = sequences.into_iter().collect();

    // Remove any sequence number from the backlog which isn't in the list of queried pending packets
    // as they might have been relayed without the Hermes instance observing it
    if let Some(path_backlog) = self.backlogs.get(&path_uid) {
        // Collect the stale keys first, so the iteration over the path backlog
        // is finished before any entry gets removed.
        let keys_to_remove: Vec<u64> = path_backlog
            .value()
            .iter()
            .map(|entry| *entry.key())
            .filter(|key| !pending.contains(key))
            .collect();

        for key in keys_to_remove {
            self.backlog_remove(key, chain_id, channel_id, port_id, counterparty_chain_id);
        }
    }
}

/// Evicts from the backlog the event for the given sequence number.
/// Removing events happens when the relayer observed either an acknowledgment
/// or a timeout for a packet sequence number, which means that the corresponding
Expand Down Expand Up @@ -1168,3 +1200,116 @@ impl AggregatorSelector for CustomAggregatorSelector {
}
}
}

// Unit tests for the backlog bookkeeping of `TelemetryState`.
//
// The module is gated on `cfg(test)` so that it is compiled for test builds only
// and never ends up in a regular build of the crate.
#[cfg(test)]
mod tests {
    use prometheus::proto::Metric;

    use super::*;

    /// Builds the `TelemetryState` used by the tests below, with the same
    /// constructor arguments for every test.
    fn new_state() -> TelemetryState {
        TelemetryState::new(
            Range {
                start: 0,
                end: 5000,
            },
            5,
            Range {
                start: 0,
                end: 5000,
            },
            5,
        )
    }

    /// Returns the `(chain, counterparty chain, channel, port)` identifiers of
    /// the single path exercised by the tests.
    fn test_path() -> (ChainId, ChainId, ChannelId, PortId) {
        (
            ChainId::from_string("chain-test"),
            ChainId::from_string("counterpartychain-test"),
            ChannelId::new(0),
            PortId::transfer(),
        )
    }

    /// Returns `true` if at least one of the given metrics has a gauge whose
    /// value, truncated to `u64`, equals `expected`.
    fn assert_metric_value(metric: &[Metric], expected: u64) -> bool {
        metric
            .iter()
            .any(|m| m.get_gauge().get_value() as u64 == expected)
    }

    #[test]
    fn insert_remove_backlog() {
        let state = new_state();
        let (chain_id, counterparty_chain_id, channel_id, port_id) = test_path();

        // Track sequences 1 to 5, then evict 3 and 1: 2, 4 and 5 remain.
        for sequence in 1..=5 {
            state.backlog_insert(
                sequence,
                &chain_id,
                &channel_id,
                &port_id,
                &counterparty_chain_id,
            );
        }
        for sequence in [3, 1] {
            state.backlog_remove(
                sequence,
                &chain_id,
                &channel_id,
                &port_id,
                &counterparty_chain_id,
            );
        }

        let metrics = state.exporter.registry().gather();

        let backlog_size = metrics
            .iter()
            .find(|metric| metric.get_name() == "backlog_size")
            .expect("backlog_size metric should be registered");
        assert!(
            assert_metric_value(backlog_size.get_metric(), 3),
            "expected backlog_size to be 3"
        );

        let backlog_oldest_sequence = metrics
            .iter()
            .find(|metric| metric.get_name() == "backlog_oldest_sequence")
            .expect("backlog_oldest_sequence metric should be registered");
        assert!(
            assert_metric_value(backlog_oldest_sequence.get_metric(), 2),
            "expected backlog_oldest_sequence to be 2"
        );
    }

    #[test]
    fn update_backlog() {
        let state = new_state();
        let (chain_id, counterparty_chain_id, channel_id, port_id) = test_path();

        // Track sequences 1 to 5, then report that only 5 is still pending.
        for sequence in 1..=5 {
            state.backlog_insert(
                sequence,
                &chain_id,
                &channel_id,
                &port_id,
                &counterparty_chain_id,
            );
        }

        state.update_backlog(
            vec![5],
            &chain_id,
            &channel_id,
            &port_id,
            &counterparty_chain_id,
        );

        let metrics = state.exporter.registry().gather();

        let backlog_size = metrics
            .iter()
            .find(|metric| metric.get_name() == "backlog_size")
            .expect("backlog_size metric should be registered");
        assert!(
            assert_metric_value(backlog_size.get_metric(), 1),
            "expected backlog_size to be 1"
        );

        let backlog_oldest_sequence = metrics
            .iter()
            .find(|metric| metric.get_name() == "backlog_oldest_sequence")
            .expect("backlog_oldest_sequence metric should be registered");
        assert!(
            assert_metric_value(backlog_oldest_sequence.get_metric(), 5),
            "expected backlog_oldest_sequence to be 5"
        );
    }
}
1 change: 1 addition & 0 deletions guide/src/documentation/telemetry/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ If this metric is increasing, it signals that the packet queue is increasing and
- If the `backlog_oldest_sequence` remains unchanged for more than a few minutes, that means that the packet with the respective sequence number is likely blocked
and cannot be relayed. To show how long the packet has been blocked, Hermes populates `backlog_oldest_timestamp` with the local time when it first observed
the `backlog_oldest_sequence` that is blocked.
- __NOTE__: The Hermes instance might miss the acknowledgment of an IBC packet it previously observed, which causes the `backlog_*` metrics to contain invalid values. To minimise this issue, whenever the Hermes instance clears packets, the `backlog_*` metrics are updated using the queried pending packets.

## How efficient and how secure is the IBC status on each network?

Expand Down
Loading