diff --git a/rs/query_stats/src/lib.rs b/rs/query_stats/src/lib.rs index 007eec6968e..99d95855529 100644 --- a/rs/query_stats/src/lib.rs +++ b/rs/query_stats/src/lib.rs @@ -134,6 +134,7 @@ impl QueryStatsCollector { .or_default() .saturating_accumulate(stats); + self.metrics.query_stats_collector.add(stats); self.metrics .query_stats_collector_num_canister_ids .set(state.len() as i64); diff --git a/rs/query_stats/src/metrics.rs b/rs/query_stats/src/metrics.rs index 34d2db573b7..c6c67cf3e74 100644 --- a/rs/query_stats/src/metrics.rs +++ b/rs/query_stats/src/metrics.rs @@ -1,22 +1,71 @@ use ic_metrics::{buckets::decimal_buckets, MetricsRegistry}; +use ic_types::batch::QueryStats; use prometheus::{HistogramVec, IntCounter, IntGauge}; pub(crate) const CRITICAL_ERROR_AGGREGATION_FAILURE: &str = "query_stats_aggregator_failure"; +/// A set of the statistics reported by this feature +/// +/// Occasionally, we want to export the metrics which contain the statistics reported +/// by various components. 
This struct is a helper to make this reporting more concise +#[derive(Clone, Debug)] +pub(crate) struct QueryStatsMetricsSet { + num_calls: IntGauge, + num_instructions: IntGauge, + request_bytes: IntGauge, + response_bytes: IntGauge, +} + +impl QueryStatsMetricsSet { + pub fn new(metrics_registry: &MetricsRegistry, name: &str) -> Self { + Self { + num_calls: metrics_registry.int_gauge( + format!("query_stats_{}_num_calls", name), + "Sum of calls".to_string(), + ), + num_instructions: metrics_registry.int_gauge( + format!("query_stats_{}_num_instructions", name), + "Sum of instructions".to_string(), + ), + request_bytes: metrics_registry.int_gauge( + format!("query_stats_{}_request_bytes", name), + "Sum of request bytes".to_string(), + ), + response_bytes: metrics_registry.int_gauge( + format!("query_stats_{}_response_bytes", name), + "Sum of response bytes".to_string(), + ), + } + } + + pub fn add(&self, query_stats: &QueryStats) { + self.num_calls.add(query_stats.num_calls as i64); + self.num_instructions + .add(query_stats.num_instructions as i64); + self.request_bytes + .add(query_stats.ingress_payload_size as i64); + self.response_bytes + .add(query_stats.egress_payload_size as i64); + } +} + /// Metrics for query stats collector /// /// The collector is responsible for locally collecting statistics for /// each query executed. It is not part of the replicated state machine. pub(crate) struct CollectorMetrics { + /// The statistics as currently reported by the collector + pub query_stats_collector: QueryStatsMetricsSet, /// The number of canister IDs registered in the collector for the current epoch. - pub(crate) query_stats_collector_num_canister_ids: IntGauge, + pub query_stats_collector_num_canister_ids: IntGauge, /// The epoch for which query calls are locally collected at the moment. 
- pub(crate) query_stats_collector_current_epoch: IntGauge, + pub query_stats_collector_current_epoch: IntGauge, } impl CollectorMetrics { pub(crate) fn new(metrics_registry: &MetricsRegistry) -> Self { Self { + query_stats_collector: QueryStatsMetricsSet::new(metrics_registry, "collector"), query_stats_collector_num_canister_ids: metrics_registry.int_gauge( "query_stats_collector_num_canister_ids", "Current number of canister ids in the query stats collector", @@ -37,6 +86,8 @@ impl CollectorMetrics { pub(crate) struct QueryStatsPayloadBuilderMetrics { /// Records the time it took to perform an operation pub(crate) query_stats_payload_builder_duration: HistogramVec, + /// Records the statistics received from the collector + pub(crate) query_stats_payload_builder_current: QueryStatsMetricsSet, /// The current epoch as seen by the payload builder. /// /// Should be slightly behind the current epoch of [`CollectorMetrics`] @@ -57,6 +108,10 @@ impl QueryStatsPayloadBuilderMetrics { decimal_buckets(-4, 0), &["operation"], ), + query_stats_payload_builder_current: QueryStatsMetricsSet::new( + metrics_registry, + "payload_builder_current", + ), query_stats_payload_builder_current_epoch: metrics_registry.int_gauge( "query_stats_payload_builder_current_epoch", "The current epoch as seen by the payload builder", @@ -73,29 +128,29 @@ impl QueryStatsPayloadBuilderMetrics { /// /// The query stats aggregator runs as part of the replicated state machine. /// It deterministically aggregates query stats received from consensus blocks. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct QueryStatsAggregatorMetrics { + /// Sum of stats received from the payload builder + pub(crate) query_stats_received: QueryStatsMetricsSet, /// The epoch for which we currently aggregate query stats. /// This is lower than the epoch for which we collect stats, as there is /// a delay for propagating local query stats via consensus blocks. 
- pub query_stats_aggregator_current_epoch: IntGauge, + pub(crate) query_stats_aggregator_current_epoch: IntGauge, /// The number of records stored in the unaggregateed state - pub query_stats_aggregator_num_records: IntGauge, - /// Sum of delivered call statistics - pub query_stats_delivered_num_calls: IntGauge, - /// Sum of delivered instruction statistics - pub query_stats_delivered_num_instructions: IntGauge, - /// Sum of delivered request bytes - pub query_stats_delivered_request_bytes: IntGauge, - /// Sum of delivered response bytes - pub query_stats_delivered_response_bytes: IntGauge, + pub(crate) query_stats_aggregator_num_records: IntGauge, + /// Sum of statistics delivered to the canisters + pub(crate) query_stats_delivered: QueryStatsMetricsSet, /// Critical error occuring in aggregator - pub query_stats_critical_error_aggregator_failure: IntCounter, + pub(crate) query_stats_critical_error_aggregator_failure: IntCounter, } impl QueryStatsAggregatorMetrics { pub fn new(metrics_registry: &MetricsRegistry) -> Self { Self { + query_stats_received: QueryStatsMetricsSet::new( + metrics_registry, + "aggregator_received", + ), query_stats_aggregator_current_epoch: metrics_registry.int_gauge( "query_stats_aggregator_current_epoch", "Current epoch of the query stats aggregator", @@ -104,22 +159,7 @@ impl QueryStatsAggregatorMetrics { "query_stats_aggregator_num_records", "The number of records stored in the unaggregateed state", ), - query_stats_delivered_num_calls: metrics_registry.int_gauge( - "query_stats_delivered_num_calls", - "Sum of delivered call statistics", - ), - query_stats_delivered_num_instructions: metrics_registry.int_gauge( - "query_stats_delivered_num_instructions", - "Sum of delivered instruction statistics", - ), - query_stats_delivered_request_bytes: metrics_registry.int_gauge( - "query_stats_delivered_request_bytes", - "Sum of delivered request bytes", - ), - query_stats_delivered_response_bytes: metrics_registry.int_gauge( - 
"query_stats_delivered_response_bytes", - "Sum of delivered response bytes", - ), + query_stats_delivered: QueryStatsMetricsSet::new(metrics_registry, "delivered"), query_stats_critical_error_aggregator_failure: metrics_registry .error_counter(CRITICAL_ERROR_AGGREGATION_FAILURE), } diff --git a/rs/query_stats/src/payload_builder.rs b/rs/query_stats/src/payload_builder.rs index 47e0b2dc602..09ec06db7e7 100644 --- a/rs/query_stats/src/payload_builder.rs +++ b/rs/query_stats/src/payload_builder.rs @@ -13,7 +13,7 @@ use ic_logger::{error, warn, ReplicaLogger}; use ic_metrics::MetricsRegistry; use ic_replicated_state::ReplicatedState; use ic_types::{ - batch::{LocalQueryStats, QueryStatsPayload, ValidationContext}, + batch::{LocalQueryStats, QueryStats, QueryStatsPayload, ValidationContext}, epoch_from_height, CanisterId, Height, NodeId, NumBytes, QueryStatsEpoch, }; use std::{ @@ -80,11 +80,24 @@ impl BatchPayloadBuilder for QueryStatsPayloadBuilderImpl { .start_timer(); match self.receiver.try_recv() { - Ok(new_epoch) => { - let Ok(mut epoch) = self.current_stats.write() else { + Ok(new_stats) => { + let Ok(mut current_stats) = self.current_stats.write() else { return vec![]; }; - *epoch = Some(new_epoch); + *current_stats = Some(new_stats); + + // Update the metrics about the received metrics + if let Some(current_stats) = current_stats.as_ref() { + let mut report = QueryStats::default(); + current_stats + .stats + .iter() + .for_each(|next_stats| report.saturating_accumulate(&next_stats.stats)); + + self.metrics + .query_stats_payload_builder_current + .add(&report); + }; } Err(TryRecvError::Empty) => (), Err(TryRecvError::Disconnected) => { diff --git a/rs/query_stats/src/state_machine.rs b/rs/query_stats/src/state_machine.rs index 03dbf203111..4d669cd1789 100644 --- a/rs/query_stats/src/state_machine.rs +++ b/rs/query_stats/src/state_machine.rs @@ -170,6 +170,7 @@ fn process_payload( query_stats: &QueryStatsPayload, state: &mut ReplicatedState, logger: 
&ReplicaLogger, + metrics: &QueryStatsAggregatorMetrics, ) -> bool { let state = &mut state.epoch_query_stats; @@ -199,6 +200,7 @@ fn process_payload( None => node.entry(query_stats.epoch).or_default(), }; + let mut received_stats = QueryStats::default(); for message in &query_stats.stats { let previous_value = stats.insert( message.canister_id, @@ -209,6 +211,8 @@ fn process_payload( egress_payload_size: message.stats.egress_payload_size, }, ); + received_stats.saturating_accumulate(&message.stats); + if previous_value.is_some() { error!( logger, @@ -219,6 +223,7 @@ fn process_payload( ); } } + metrics.query_stats_received.add(&received_stats); true } @@ -331,16 +336,10 @@ fn try_aggregate_one_epoch( query_stats_to_be_applied.push((canister_id, aggregated_stats)); } - let mut delivered_num_calls = 0; - let mut delivered_num_instructions = 0; - let mut delivered_request_bytes = 0; - let mut delivered_response_bytes = 0; + let mut delivered_query_stats = QueryStats::default(); for (canister_id, aggregated_stats) in query_stats_to_be_applied { - delivered_num_calls += aggregated_stats.num_calls; - delivered_num_instructions += aggregated_stats.num_instructions; - delivered_request_bytes += aggregated_stats.ingress_payload_size; - delivered_response_bytes += aggregated_stats.egress_payload_size; + delivered_query_stats.saturating_accumulate(&aggregated_stats); apply_query_stats_to_canister( &aggregated_stats, @@ -351,18 +350,7 @@ fn try_aggregate_one_epoch( ); } - metrics - .query_stats_delivered_num_calls - .add(delivered_num_calls as i64); - metrics - .query_stats_delivered_num_instructions - .add(delivered_num_instructions as i64); - metrics - .query_stats_delivered_request_bytes - .add(delivered_request_bytes as i64); - metrics - .query_stats_delivered_response_bytes - .add(delivered_response_bytes as i64); + metrics.query_stats_delivered.add(&delivered_query_stats); true } @@ -400,7 +388,7 @@ fn update_metrics(state: &ReplicatedState, metrics: 
&QueryStatsAggregatorMetrics let num_records: usize = state .stats .values() - .map(|records| records.values().len()) + .map(|epochs| epochs.values().map(|record| record.len()).sum::<usize>()) .sum(); metrics .query_stats_aggregator_num_records @@ -416,7 +404,7 @@ pub fn deliver_query_stats( logger: &ReplicaLogger, metrics: &QueryStatsAggregatorMetrics, ) { - if process_payload(query_stats, state, logger) { + if process_payload(query_stats, state, logger, metrics) { // While in theory is is guaranteed that `try_aggregate_one_epoch` will eventually return // `false`, the code is relatively complex and we don't want to rely on correct implementation // only.