Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add telemetry client implementation to data pipeline. #802

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions data-pipeline-ffi/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl From<TraceExporterError> for ExporterError {
},
TraceExporterError::Builder(e) => match e {
BuilderErrorKind::InvalidUri => ExporterErrorCode::InvalidUrl,
BuilderErrorKind::InvalidTelemetryConfig => ExporterErrorCode::InvalidArgument,
},
TraceExporterError::Deserialization(_) => ExporterErrorCode::Serde,
TraceExporterError::Io(e) => match e.kind() {
Expand Down
2 changes: 2 additions & 0 deletions data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ either = "1.13.0"
tokio = { version = "1.23", features = ["rt", "test-util", "time"], default-features = false }

ddcommon = { path = "../ddcommon" }
ddtelemetry = { path = "../ddtelemetry" }
datadog-trace-protobuf = { path = "../trace-protobuf" }
datadog-trace-utils = { path = "../trace-utils" }
datadog-trace-normalization = { path = "../trace-normalization" }
Expand All @@ -46,4 +47,5 @@ criterion = "0.5.1"
datadog-trace-utils = { path = "../trace-utils", features = ["test-utils"] }
httpmock = "0.7.0"
rand = "0.8.5"
regex = "1.5"
tokio = {version = "1.23", features = ["rt", "time", "test-util"], default-features = false}
1 change: 1 addition & 0 deletions data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ mod health_metrics;
pub mod span_concentrator;
#[allow(missing_docs)]
pub mod stats_exporter;
pub mod telemetry;
#[allow(missing_docs)]
pub mod trace_exporter;
121 changes: 121 additions & 0 deletions data-pipeline/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Provides an abstraction layer to hold metrics that comes from 'SendDataResult'.
use ddcommon::tag;
use ddtelemetry::data::metrics::{MetricNamespace, MetricType};
use ddtelemetry::metrics::ContextKey;
use ddtelemetry::worker::TelemetryWorkerHandle;
use std::collections::HashMap;

/// trace_api.requests metric
pub const API_REQUEST_STR: &str = "trace_api.requests";
/// trace_api.errors metric
pub const API_ERRORS_STR: &str = "trace_api.errors";
/// trace_api.bytes metric
pub const API_BYTES_STR: &str = "trace_api.bytes";
/// trace_api.responses metric
pub const API_RESPONSES_STR: &str = "trace_api.responses";
/// trace_chunk_sent metric
pub const CHUNKS_SENT_STR: &str = "trace_chunk_sent";
/// trace_chunk_dropped metric
pub const CHUNKS_DROPPED_STR: &str = "trace_chunk_dropped";

struct Metric {
name: &'static str,
metric_type: MetricType,
namespace: MetricNamespace,
}

const METRICS: &[Metric] = &[
Metric {
name: API_REQUEST_STR,
metric_type: MetricType::Count,
namespace: MetricNamespace::Tracers,
},
Metric {
name: API_ERRORS_STR,
metric_type: MetricType::Count,
namespace: MetricNamespace::Tracers,
},
Metric {
name: API_BYTES_STR,
metric_type: MetricType::Distribution,
namespace: MetricNamespace::Tracers,
},
Metric {
name: API_RESPONSES_STR,
metric_type: MetricType::Count,
namespace: MetricNamespace::Tracers,
},
Metric {
name: CHUNKS_SENT_STR,
metric_type: MetricType::Count,
namespace: MetricNamespace::Tracers,
},
Metric {
name: CHUNKS_DROPPED_STR,
metric_type: MetricType::Count,
namespace: MetricNamespace::Tracers,
},
];

/// Structure to accumulate partial results coming from sending traces to the agent.
#[derive(Debug)]
pub struct Metrics(HashMap<String, ContextKey>);
Comment on lines +63 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of using hashmaps to lookup harcoded keys, for performance and correctness, but I don't really have a good counter proposition that'd be shorter to write...
Maybe associate an index to each metric name and hold Contexts in a Vec/array?


impl Metrics {
/// Creates a new Metrics instance
pub fn new(worker: &TelemetryWorkerHandle) -> Self {
let mut map = HashMap::new();
for metric in METRICS {
let key = worker.register_metric_context(
metric.name.to_string(),
vec![tag!("src_library", "libdatadog")],
metric.metric_type,
true,
metric.namespace,
);
map.insert(metric.name.to_string(), key);
}

Self(map)
}

/// Gets the context key associated with the metric.
pub fn get(&self, metric_name: &str) -> Option<&ContextKey> {
self.0.get(metric_name)
}
}

#[cfg(test)]
mod tests {
use super::*;
use ddtelemetry::config::Config;
use ddtelemetry::worker::TelemetryWorkerBuilder;

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn new_test() {
let (worker, _) = TelemetryWorkerBuilder::new_fetch_host(
"service".to_string(),
"language".to_string(),
"0.1".to_string(),
"1.0".to_string(),
)
.spawn_with_config(Config::default())
.await
.unwrap();

let metrics = Metrics::new(&worker);

assert!(!metrics.0.is_empty());

assert!(metrics.get(API_REQUEST_STR).is_some());
assert!(metrics.get(API_RESPONSES_STR).is_some());
assert!(metrics.get(API_BYTES_STR).is_some());
assert!(metrics.get(API_ERRORS_STR).is_some());
assert!(metrics.get(CHUNKS_SENT_STR).is_some());
assert!(metrics.get(CHUNKS_DROPPED_STR).is_some());
}
}
Loading
Loading