Skip to content

Commit

Permalink
Support Elastic Cloud OpenTelemetry attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
pquentin committed Mar 12, 2024
1 parent 865be8c commit 4e8941b
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 14 deletions.
6 changes: 4 additions & 2 deletions elastic_transport/_async_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ async def perform_request( # type: ignore[override]
method,
endpoint_id=resolve_default(endpoint_id, None),
path_parts=resolve_default(path_parts, {}),
):
return await self._perform_request(
) as span:
response = await self._perform_request(
method,
target,
body=body,
Expand All @@ -232,6 +232,8 @@ async def perform_request( # type: ignore[override]
request_timeout=request_timeout,
client_meta=client_meta,
)
span.set_elastic_cloud_metadata(response.meta.headers)
return response

async def _perform_request( # type: ignore[override,return]
self,
Expand Down
34 changes: 26 additions & 8 deletions elastic_transport/_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

try:
from opentelemetry import trace
from opentelemetry.trace import Span

_tracer: trace.Tracer | None = trace.get_tracer("elastic-transport")
except ModuleNotFoundError:
Expand All @@ -32,6 +33,23 @@
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"


class OpenTelemetrySpan:
def __init__(self, otel_span: Optional[Span]):
self.otel_span = otel_span

def set_attribute(self, key: str, value: str) -> None:
if self.otel_span is not None:
self.otel_span.set_attribute(key, value)

def set_elastic_cloud_metadata(self, headers: Mapping[str, str]) -> None:
cluster_name = headers.get("X-Found-Handling-Cluster")
if cluster_name is not None:
self.set_attribute("db.elasticsearch.cluster.name", cluster_name)
node_name = headers.get("X-Found-Handling-Instance")
if node_name is not None:
self.set_attribute("db.elasticsearch.node.name", node_name)


class OpenTelemetry:
def __init__(self, enabled: bool | None = None, tracer: trace.Tracer | None = None):
if enabled is None:
Expand All @@ -46,17 +64,17 @@ def span(
*,
endpoint_id: Optional[str],
path_parts: Mapping[str, str],
) -> Generator[None, None, None]:
) -> Generator[OpenTelemetrySpan, None, None]:
if not self.enabled or self.tracer is None:
yield
yield OpenTelemetrySpan(None)
return

span_name = endpoint_id or method
with self.tracer.start_as_current_span(span_name) as span:
span.set_attribute("http.request.method", method)
span.set_attribute("db.system", "elasticsearch")
with self.tracer.start_as_current_span(span_name) as otel_span:
otel_span.set_attribute("http.request.method", method)
otel_span.set_attribute("db.system", "elasticsearch")
if endpoint_id is not None:
span.set_attribute("db.operation", endpoint_id)
otel_span.set_attribute("db.operation", endpoint_id)
for key, value in path_parts.items():
span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)
yield
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)
yield OpenTelemetrySpan(otel_span)
6 changes: 4 additions & 2 deletions elastic_transport/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ def perform_request(
method,
endpoint_id=resolve_default(endpoint_id, None),
path_parts=resolve_default(path_parts, {}),
):
return self._perform_request(
) as span:
api_response = self._perform_request(
method,
target,
body=body,
Expand All @@ -315,6 +315,8 @@ def perform_request(
request_timeout=request_timeout,
client_meta=client_meta,
)
span.set_elastic_cloud_metadata(api_response.meta.headers)
return api_response

def _perform_request( # type: ignore[return]
self,
Expand Down
11 changes: 9 additions & 2 deletions tests/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ def test_detailed_span():
otel = OpenTelemetry(enabled=True, tracer=tracer)
with otel.span(
"GET", endpoint_id="ml.close_job", path_parts={"job_id": "my-job", "foo": "bar"}
):
pass
) as span:
span.set_elastic_cloud_metadata(
{
"X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f",
"X-Found-Handling-Instance": "instance-0000000001",
}
)

spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
Expand All @@ -65,4 +70,6 @@ def test_detailed_span():
"db.operation": "ml.close_job",
"db.elasticsearch.path_parts.job_id": "my-job",
"db.elasticsearch.path_parts.foo": "bar",
"db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f",
"db.elasticsearch.node.name": "instance-0000000001",
}

0 comments on commit 4e8941b

Please sign in to comment.