diff --git a/rs/messaging/src/message_routing.rs b/rs/messaging/src/message_routing.rs index 9f94d6b73a8..c2906386be8 100644 --- a/rs/messaging/src/message_routing.rs +++ b/rs/messaging/src/message_routing.rs @@ -80,6 +80,7 @@ const STATUS_SUCCESS: &str = "success"; const PHASE_LOAD_STATE: &str = "load_state"; const PHASE_COMMIT: &str = "commit"; +const METRIC_RECEIVE_BATCH_LATENCY: &str = "mr_receive_batch_latency_seconds"; const METRIC_PROCESS_BATCH_DURATION: &str = "mr_process_batch_duration_seconds"; const METRIC_PROCESS_BATCH_PHASE_DURATION: &str = "mr_process_batch_phase_duration_seconds"; const METRIC_TIMED_OUT_MESSAGES_TOTAL: &str = "mr_timed_out_messages_total"; @@ -266,6 +267,8 @@ pub(crate) struct MessageRoutingMetrics { expected_batch_height: IntGauge, /// Registry version referenced in the most recently executed batch. registry_version: IntGauge, + /// How long Message Routing had to wait to receive the next batch. + receive_batch_latency: Histogram, /// Batch processing durations. process_batch_duration: Histogram, /// Most recently seen certified height, per remote subnet @@ -354,6 +357,12 @@ impl MessageRoutingMetrics { METRIC_REGISTRY_VERSION, "Registry version referenced in the most recently executed batch.", ), + receive_batch_latency: metrics_registry.histogram( + METRIC_RECEIVE_BATCH_LATENCY, + "How long Message Routing had to wait to receive the next batch.", + // 0.1ms - 5s + decimal_buckets(-4, 0), + ), process_batch_phase_duration: metrics_registry.histogram_vec( METRIC_PROCESS_BATCH_PHASE_DURATION, "Batch processing phase durations, by phase.", @@ -1327,12 +1336,18 @@ impl MessageRoutingImpl { ) -> Self { let (batch_sender, batch_receiver) = sync_channel(BATCH_QUEUE_BUFFER_SIZE); + let receive_batch_latency = metrics.receive_batch_latency.clone(); let _batch_processor_handle = JoinOnDrop::new( std::thread::Builder::new() .name("MR Batch Processor".to_string()) .spawn(move || { + let mut since = Instant::now(); while let Ok(batch) = batch_receiver.recv() { + receive_batch_latency.observe(since.elapsed().as_secs_f64()); + batch_processor.process_batch(batch); + + since = Instant::now(); } }) .expect("Can spawn a batch processing thread in MR"),