From 03473ca3da738208b6d3bc2247aec253c92b8c83 Mon Sep 17 00:00:00 2001 From: Julio Gonzalez Date: Tue, 31 Dec 2024 15:38:33 +0100 Subject: [PATCH] Refactor. * Move metrics to another compilation unit. * Add tests to metrics. * Refactor builder to use `url` and `heartbeat` properties instead of ddtelemetry::config::Config. * Add basic documentation. --- data-pipeline/src/telemetry/metrics.rs | 100 ++++++++++++++++ data-pipeline/src/telemetry/mod.rs | 160 +++++++++++-------------- 2 files changed, 169 insertions(+), 91 deletions(-) create mode 100644 data-pipeline/src/telemetry/metrics.rs diff --git a/data-pipeline/src/telemetry/metrics.rs b/data-pipeline/src/telemetry/metrics.rs new file mode 100644 index 0000000000..dd5780322e --- /dev/null +++ b/data-pipeline/src/telemetry/metrics.rs @@ -0,0 +1,100 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use datadog_trace_utils::trace_utils::SendDataResult; +use std::collections::HashMap; + +/// Structure to accumulate partial results coming from sending traces to the agent. +#[derive(Default)] +pub struct Metrics { + pub api_requests: u64, + pub api_responses_count_per_code: HashMap, + pub api_errors_timeout: u64, + pub api_errors_network: u64, + pub api_errors_status_code: u64, + pub bytes_sent: u64, + pub chunks_sent: u64, + pub chunks_dropped: u64, +} + +impl Metrics { + /// Updates the metric internal properties based on `result` contents. + pub fn update(&mut self, result: &SendDataResult) { + self.api_requests += result.requests_count; + self.api_errors_timeout += result.errors_timeout; + self.api_errors_network += result.errors_network; + self.api_errors_status_code += result.errors_status_code; + self.bytes_sent += result.bytes_sent; + self.chunks_sent += result.chunks_sent; + self.chunks_dropped += result.chunks_dropped; + + for (status_code, count) in &result.responses_count_per_code { + *self + .api_responses_count_per_code + .entry(*status_code) + .or_default() += count; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn update_test() { + let mut result = SendDataResult::default(); + let mut metrics = Metrics::default(); + + assert_eq!(metrics.api_requests, 0); + assert_eq!(metrics.api_errors_timeout, 0); + assert_eq!(metrics.api_errors_network, 0); + assert_eq!(metrics.api_errors_status_code, 0); + assert_eq!(metrics.bytes_sent, 0); + assert_eq!(metrics.chunks_dropped, 0); + assert!(metrics.api_responses_count_per_code.is_empty()); + + metrics.update(&result); + assert_eq!(metrics.api_requests, 0); + assert_eq!(metrics.api_errors_timeout, 0); + assert_eq!(metrics.api_errors_network, 0); + assert_eq!(metrics.api_errors_status_code, 0); + assert_eq!(metrics.bytes_sent, 0); + assert_eq!(metrics.chunks_dropped, 0); + assert!(metrics.api_responses_count_per_code.is_empty()); + + result.requests_count = 1; + result.chunks_dropped = 1; + result.bytes_sent = 1; + result.errors_timeout = 1; + result.errors_network = 1; + result.errors_status_code = 1; + result.responses_count_per_code.insert(200, 1); + + metrics.update(&result); + assert_eq!(metrics.api_requests, 1); + assert_eq!(metrics.api_errors_timeout, 1); + assert_eq!(metrics.api_errors_network, 1); + assert_eq!(metrics.api_errors_status_code, 1); + assert_eq!(metrics.bytes_sent, 1); + assert_eq!(metrics.chunks_dropped, 1); + assert_eq!(metrics.api_responses_count_per_code.len(), 1); + assert_eq!( + *metrics.api_responses_count_per_code.get(&200_u16).unwrap(), + 1 + ); + + metrics.update(&result); + assert_eq!(metrics.api_requests, 2); + assert_eq!(metrics.api_errors_timeout, 2); + assert_eq!(metrics.api_errors_network, 2); + assert_eq!(metrics.api_errors_status_code, 2); + assert_eq!(metrics.bytes_sent, 2); + assert_eq!(metrics.chunks_dropped, 2); + assert_eq!(metrics.api_responses_count_per_code.len(), 1); + assert_eq!( + *metrics.api_responses_count_per_code.get(&200_u16).unwrap(), + 2 + ); + } +} diff --git a/data-pipeline/src/telemetry/mod.rs b/data-pipeline/src/telemetry/mod.rs index c3ed3179fa..baa64a5797 100644 --- a/data-pipeline/src/telemetry/mod.rs +++ b/data-pipeline/src/telemetry/mod.rs @@ -1,8 +1,9 @@ // Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +pub mod metrics; +use crate::telemetry::metrics::Metrics; use anyhow::Result; -use datadog_trace_utils::trace_utils::SendDataResult; use ddcommon::tag; use ddcommon::tag::Tag; use ddtelemetry::data::metrics::{MetricNamespace, MetricType}; @@ -15,80 +16,58 @@ use std::time::Duration; use tokio::select; use tokio::task::JoinHandle; -#[derive(Default)] -pub struct Metrics { - pub active_sessions: usize, - pub api_requests: u64, - pub api_responses_count_per_code: HashMap, - pub api_errors_timeout: u64, - pub api_errors_network: u64, - pub api_errors_status_code: u64, - pub bytes_sent: u64, - pub chunks_sent: u64, - pub chunks_dropped: u64, - pub submitted_payloads: u64, - pub memory_usage: usize, -} - -impl Metrics { - pub fn update(&mut self, result: &SendDataResult) { - self.api_requests += result.requests_count; - self.api_errors_timeout += result.errors_timeout; - self.api_errors_network += result.errors_network; - self.api_errors_status_code += result.errors_status_code; - self.bytes_sent += result.bytes_sent; - self.chunks_sent += result.chunks_sent; - self.chunks_dropped += result.chunks_dropped; - - for (status_code, count) in &result.responses_count_per_code { - *self - .api_responses_count_per_code - .entry(*status_code) - .or_default() += count; - } - } -} - #[derive(Debug, PartialEq)] struct SelfMetric(MetricType, MetricNamespace); +/// Structure to build a Telemetry client. +/// +/// It will hold partial data until `build` method is called which result in a new +/// `TelemetryClient`. #[derive(Default)] pub struct TelemetryClientBuilder { service_name: Option, language: Option, language_version: Option, tracer_version: Option, - config: ddtelemetry::config::Config, metrics: HashMap, + heartbeat: u64, interval: u64, url: Option, } impl TelemetryClientBuilder { + /// Creates a new empty builder. pub fn new() -> Self { - Self::default() + Self { + ..Default::default() + } } + /// Sets the service name for the telemetry client pub fn set_service_name(mut self, name: &str) -> Self { self.service_name = Some(name.to_string()); self } + /// Sets the language name for the telemetry client pub fn set_language(mut self, lang: &str) -> Self { self.language = Some(lang.to_string()); self } + /// Sets the language version for the telemetry client pub fn set_language_version(mut self, version: &str) -> Self { self.language_version = Some(version.to_string()); self } + /// Sets the tracer version for the telemetry client pub fn set_tracer_version(mut self, version: &str) -> Self { self.tracer_version = Some(version.to_string()); self } + /// Register a new metric that will be used by the telemetry client. pub fn add_metric( mut self, name: &str, @@ -100,29 +79,47 @@ impl TelemetryClientBuilder { self } + /// Sets the interval time for sending metrics to the agent. pub fn set_interval(mut self, msecs: u64) -> Self { self.interval = msecs; self } - pub fn set_config(mut self, config: ddtelemetry::config::Config) -> Self { - self.config = config; + /// Sets the url where the metrics will be sent. + pub fn set_url(mut self, url: &str) -> Self { + self.url = Some(url.to_string()); self } - pub fn set_url(mut self, url: &str) -> Self { - self.url = Some(url.to_string()); + /// Sets the heartbeat notification interval in seconds. + pub fn set_hearbeat(mut self, interval: u64) -> Self { + self.heartbeat = interval; self } - pub async fn spawn(self) -> Result { + /// Builds the telemetry client. + pub async fn build(self) -> Result { + let mut config = ddtelemetry::config::Config { + ..Default::default() + }; + + if self.heartbeat > 0 { + config.telemetry_hearbeat_interval = Duration::from_secs(self.heartbeat); + } + + if let Some(url) = self.url { + let _ = config.set_endpoint(ddcommon::Endpoint::from_url( + url.parse::().unwrap(), + )); + } + let (worker, handle) = TelemetryWorkerBuilder::new_fetch_host( self.service_name.unwrap(), self.language.unwrap(), self.language_version.unwrap(), self.tracer_version.unwrap(), ) - .spawn_with_config(self.config.clone()) + .spawn_with_config(config) .await?; let metrics = { @@ -145,6 +142,7 @@ impl TelemetryClientBuilder { } } +/// Telemetry handle used to send metrics to the agent pub struct TelemetryClient { interval: tokio::time::Interval, worker: TelemetryWorkerHandle, @@ -155,7 +153,7 @@ pub struct TelemetryClient { } impl TelemetryClient { - async fn enqueue_point(&self, value: f64, key: ContextKey, tags: Vec) -> Result<()> { + async fn add_point(&self, value: f64, key: ContextKey, tags: Vec) -> Result<()> { self.worker .send_msg(TelemetryActions::AddPoint((value, key, tags))) .await @@ -163,21 +161,9 @@ impl TelemetryClient { async fn send(&mut self, metrics: Metrics) { let mut futures = Vec::new(); - if metrics.active_sessions > 0 { - let key = self.metrics.get("server.active_sessions").unwrap(); - futures.push(self.enqueue_point(metrics.active_sessions as f64, *key, vec![])); - } - if metrics.memory_usage > 0 { - let key = self.metrics.get("server.memory_usage").unwrap(); - futures.push(self.enqueue_point(metrics.memory_usage as f64, *key, vec![])); - } - if metrics.submitted_payloads > 0 { - let key = self.metrics.get("server.submitted_payloads").unwrap(); - futures.push(self.enqueue_point(metrics.submitted_payloads as f64, *key, vec![])); - } if metrics.api_requests > 0 { let key = self.metrics.get("trace_api.requests").unwrap(); - futures.push(self.enqueue_point( + futures.push(self.add_point( metrics.api_requests as f64, *key, vec![Tag::new("src_library", "libdatadog").unwrap()], @@ -185,7 +171,7 @@ impl TelemetryClient { } if metrics.api_errors_network > 0 { let key = self.metrics.get("trace_api.errors").unwrap(); - futures.push(self.enqueue_point( + futures.push(self.add_point( metrics.api_errors_network as f64, *key, vec![tag!("type", "network"), tag!("src_library", "libdatadog")], @@ -193,7 +179,7 @@ impl TelemetryClient { } if metrics.api_errors_timeout > 0 { let key = self.metrics.get("trace_api.errors").unwrap(); - futures.push(self.enqueue_point( + futures.push(self.add_point( metrics.api_errors_timeout as f64, *key, vec![tag!("type", "timeout"), tag!("src_library", "libdatadog")], @@ -201,7 +187,7 @@ impl TelemetryClient { } if metrics.api_errors_status_code > 0 { let key = self.metrics.get("trace_api.errors").unwrap(); - futures.push(self.enqueue_point( + futures.push(self.add_point( metrics.api_errors_status_code as f64, *key, vec![ @@ -212,7 +198,7 @@ impl TelemetryClient { } if metrics.bytes_sent > 0 { let key = self.metrics.get("trace_api.bytes").unwrap(); - futures.push(self.enqueue_point( + futures.push(self.add_point( metrics.bytes_sent as f64, *key, vec![tag!("src_library", "libdatadog")], @@ -220,7 +206,7 @@ impl TelemetryClient { } if metrics.chunks_sent > 0 { let key = self.metrics.get("trace_chunks_sent").unwrap(); - futures.push(self.enqueue_point( + futures.push(self.add_point( metrics.chunks_sent as f64, *key, vec![tag!("src_library", "libdatadog")], @@ -228,7 +214,7 @@ impl TelemetryClient { } if metrics.chunks_dropped > 0 { let key = self.metrics.get("trace_chunks_dropped").unwrap(); - futures.push(self.enqueue_point( + futures.push(self.add_point( metrics.chunks_dropped as f64, *key, vec![tag!("src_library", "libdatadog")], @@ -237,7 +223,7 @@ impl TelemetryClient { if !metrics.api_responses_count_per_code.is_empty() { let key = self.metrics.get("trace_api.responses").unwrap(); for (status_code, count) in &metrics.api_responses_count_per_code { - futures.push(self.enqueue_point( + futures.push(self.add_point( *count as f64, *key, vec![ @@ -251,6 +237,17 @@ impl TelemetryClient { futures::future::join_all(futures).await; } + /// Spins the Telemetry client. + /// + /// The client will call the update closure peridically at the interval set in the + /// `TelemetryClientBuilder` to retrieve the metrics and send them to the agent. The client + /// will loop indefinitely until the cancellation future resolves. + /// + /// # Arguments + /// + /// * update: closure to retrieve `Metrics`. + /// * cancellation: future which resolution will serve as cancellation point to stop the + /// client. pub async fn run(&mut self, mut update: U, cancellation: C) where U: FnMut() -> Option, @@ -288,8 +285,6 @@ impl TelemetryClient { #[cfg(test)] mod tests { - use ddcommon::Endpoint; - use ddtelemetry::config::Config; use httpmock::Method::POST; use httpmock::MockServer; @@ -322,15 +317,14 @@ mod tests { #[test] fn builder_test() { - let config = Config::default(); - let builder = TelemetryClientBuilder::new() .set_service_name("test_service") .set_language("test_language") .set_language_version("test_language_version") .set_tracer_version("test_tracer_version") .set_interval(100) - .set_config(config) + .set_url("http://localhost") + .set_hearbeat(30) .add_metric("test.foo", MetricType::Count, MetricNamespace::Telemetry) .add_metric( "test.bar", @@ -342,15 +336,9 @@ mod tests { assert_eq!(&builder.language.unwrap(), "test_language"); assert_eq!(&builder.language_version.unwrap(), "test_language_version"); assert_eq!(&builder.tracer_version.unwrap(), "test_tracer_version"); + assert_eq!(&builder.url.unwrap(), "http://localhost"); assert_eq!(builder.interval, 100_u64); - assert_eq!(builder.config.endpoint, None); - assert!(!builder.config.restartable); - assert!(!builder.config.direct_submission_enabled); - assert_eq!( - builder.config.telemetry_hearbeat_interval, - Duration::new(60, 0) - ); - assert!(!builder.config.telemetry_debug_logging_enabled); + assert_eq!(builder.heartbeat, 30_u64); assert_eq!( *builder.metrics.get("test.foo").unwrap(), SelfMetric(MetricType::Count, MetricNamespace::Telemetry) @@ -364,22 +352,19 @@ mod tests { #[cfg_attr(miri, ignore)] #[tokio::test] async fn spawn_test() { - let config = Config::default(); - let client = TelemetryClientBuilder::new() .set_service_name("test_service") .set_language("test_language") .set_language_version("test_language_version") .set_tracer_version("test_tracer_version") .set_interval(100) - .set_config(config) .add_metric("test.foo", MetricType::Count, MetricNamespace::Telemetry) .add_metric( "test.bar", MetricType::Distribution, MetricNamespace::General, ) - .spawn() + .build() .await; assert!(client.is_ok()); @@ -398,27 +383,20 @@ mod tests { }) .await; - let mut config = Config { - telemetry_hearbeat_interval: Duration::from_secs(1), - ..Default::default() - }; - let _ = config.set_endpoint(Endpoint::from_url( - server.url("/").parse::().unwrap(), - )); - let client = TelemetryClientBuilder::new() .set_service_name("test_service") .set_language("test_language") .set_language_version("test_language_version") .set_tracer_version("test_tracer_version") .set_interval(100) - .set_config(config) + .set_url(&server.url("/")) + .set_hearbeat(1) .add_metric( "trace_api.bytes", MetricType::Distribution, MetricNamespace::Tracers, ) - .spawn() + .build() .await; assert!(client.is_ok());