Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OpenTelemetry support #70

Merged
merged 8 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions .buildkite/generatesteps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
import yaml


def benchmark_to_steps(python, connection_class):
def benchmark_to_steps(python, connection_class, nox_session):
return [
{
"group": f":elasticsearch: :python: ES Serverless ({python}/{connection_class})",
"group": f":elasticsearch: :python: {nox_session} {python} {connection_class}",
"steps": [
{
"label": "Run tests",
"agents": {"provider": "gcp"},
"env": {
"PYTHON_VERSION": f"{python}",
"PYTHON_CONNECTION_CLASS": f"{connection_class}",
"NOX_SESSION": f"{nox_session}",
# For development versions
# https://github.com/aio-libs/aiohttp/issues/6600
"AIOHTTP_NO_EXTENSIONS": 1,
Expand All @@ -24,12 +25,12 @@ def benchmark_to_steps(python, connection_class):
"EC_REGISTER_BACKEND": "appex-qa-team-cluster",
"EC_ENV": "qa",
"EC_REGION": "aws-eu-west-1",
"EC_PROJECT_PREFIX": f"esv-client-python-test-{python}-{connection_class}",
"EC_PROJECT_PREFIX": f"esv-client-python-{nox_session}-{python}-{connection_class}",
},
"command": "./.buildkite/run-tests",
"artifact_paths": "junit/*-junit.xml",
"retry": {"manual": False},
"key": f"run_{python.replace('.', '_')}_{connection_class}",
"key": f"run_{python.replace('.', '_')}_{connection_class}_{nox_session}",
},
{
"label": "Teardown",
Expand All @@ -40,7 +41,7 @@ def benchmark_to_steps(python, connection_class):
"EC_REGISTER_BACKEND": "appex-qa-team-cluster",
"EC_ENV": "qa",
"EC_REGION": "aws-eu-west-1",
"EC_PROJECT_PREFIX": f"esv-client-python-test-{python}-{connection_class}",
"EC_PROJECT_PREFIX": f"esv-client-python-{nox_session}-{python}-{connection_class}",
},
"command": ".buildkite/teardown-tests",
"depends_on": f"run_{python.replace('.', '_')}_{connection_class}",
Expand All @@ -55,5 +56,7 @@ def benchmark_to_steps(python, connection_class):
steps = []
for python in ["3.9", "3.10", "3.11", "3.12"]:
for connection_class in ["urllib3", "requests"]:
steps.extend(benchmark_to_steps(python, connection_class))
steps.extend(benchmark_to_steps(python, connection_class, "test"))
steps.extend(benchmark_to_steps("3.9", "urllib3", "test_otel"))
steps.extend(benchmark_to_steps("3.12", "urllib3", "test_otel"))
print(yaml.dump({"steps": steps}, Dumper=yaml.Dumper, sort_keys=False))
3 changes: 2 additions & 1 deletion .buildkite/run-tests
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ echo -e "--- :computer: Environment variables"
echo -e "ELASTICSEARCH_URL $ELASTICSEARCH_URL"
echo -e "PYTHON_VERSION $PYTHON_VERSION"
echo -e "PYTHON_CONNECTION_CLASS $PYTHON_CONNECTION_CLASS"
echo -e "NOX_SESSION $NOX_SESSION"

echo -e "--- :docker: Build elasticsearch-serverless-python container"

Expand All @@ -74,4 +75,4 @@ docker run \
--volume "$(pwd)/junit:/code/elasticsearch-serverless-python/junit" \
--rm \
elasticsearch-serverless-python \
nox -s "test-$PYTHON_VERSION"
nox -s ${NOX_SESSION}-${PYTHON_VERSION}
Binary file added docs/guide/images/otel-waterfall-retry.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/guide/images/otel-waterfall-with-http.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/guide/images/otel-waterfall-without-http.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 10 additions & 0 deletions docs/guide/integrations.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
You can find integration options and information on this page.


[discrete]
[[opentelemetry-intro]]
=== OpenTelemetry instrumentation

The Python Elasticsearch client supports native OpenTelemetry instrumentation following the https://opentelemetry.io/docs/specs/semconv/database/elasticsearch/[OpenTelemetry Semantic Conventions for Elasticsearch].
Refer to the <<opentelemetry>> page for details.

[discrete]
[[transport]]
=== Transport
Expand Down Expand Up @@ -53,3 +60,6 @@ es.options(
------------------------------------

Type hints also allow tools like your IDE to check types and provide better auto-complete functionality.


include::open-telemetry.asciidoc[]
75 changes: 75 additions & 0 deletions docs/guide/open-telemetry.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
[[opentelemetry]]
=== Using OpenTelemetry

You can use https://opentelemetry.io/[OpenTelemetry] to monitor the performance and behavior of your {es} requests through the Elasticsearch Python client.
The Python client comes with built-in OpenTelemetry instrumentation that emits https://www.elastic.co/guide/en/apm/guide/current/apm-distributed-tracing.html[distributed tracing spans] by default.
With that, applications using https://www.elastic.co/blog/manual-instrumentation-of-python-applications-opentelemetry[manual OpenTelemetry instrumentation] or https://www.elastic.co/blog/auto-instrumentation-of-python-applications-opentelemetry[automatic OpenTelemetry instrumentation] are enriched with additional spans that contain insightful information about the execution of the {es} requests.

The native instrumentation in the Python client follows the https://opentelemetry.io/docs/specs/semconv/database/elasticsearch/[OpenTelemetry Semantic Conventions for {es}]. In particular, the instrumentation in the client covers the logical layer of {es} requests. A single span per request is created that is processed by the service through the Python client. The following image shows a trace that records the handling of two different {es} requests: an `info` request and a `search` request.

[role="screenshot"]
image::images/otel-waterfall-without-http.png[alt="Distributed trace with Elasticsearch spans",align="center"]

Usually, OpenTelemetry auto-instrumentation modules come with instrumentation support for HTTP-level communication. In this case, in addition to the logical {es} client requests, spans will be captured for the physical HTTP requests emitted by the client. The following image shows a trace with both, {es} spans (in blue) and the corresponding HTTP-level spans (in red) after having installed the ``opentelemetry-instrumentation-urllib3`` package:

[role="screenshot"]
image::images/otel-waterfall-with-http.png[alt="Distributed trace with Elasticsearch spans",align="center"]

Advanced Python client behavior such as nodes round-robin and request retries are revealed through the combination of logical {es} spans and the physical HTTP spans. The following example shows a `search` request in a scenario with two nodes:

[role="screenshot"]
image::images/otel-waterfall-retry.png[alt="Distributed trace with Elasticsearch spans",align="center"]

The first node is unavailable and results in an HTTP error, while the retry to the second node succeeds. Both HTTP requests are subsumed by the logical {es} request span (in blue).

[discrete]
==== Setup the OpenTelemetry instrumentation

When using the https://opentelemetry.io/docs/languages/python/instrumentation/[manual Python OpenTelemetry instrumentation] or the https://opentelemetry.io/docs/languages/python/automatic/[OpenTelemetry Python agent], the Python client's OpenTelemetry instrumentation is enabled by default and uses the global OpenTelemetry SDK with the global tracer provider.
If you're getting started with OpenTelemetry instrumentation, the following blog posts have step-by-step instructions to ingest and explore tracing data with the Elastic stack:

* https://www.elastic.co/blog/manual-instrumentation-of-python-applications-opentelemetry[Manual instrumentation with OpenTelemetry for Python applications]
* https://www.elastic.co/blog/auto-instrumentation-of-python-applications-opentelemetry[Automatic instrumentation with OpenTelemetry for Python applications]

[discrete]
=== Comparison with community instrumentation

The https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/elasticsearch/elasticsearch.html[commmunity OpenTelemetry Elasticsearch instrumentation] also instruments the client and sends OpenTelemetry traces, but was developed before the OpenTelemetry Semantic Conventions for {es}, so the traces attributes are inconsistent with other OpenTelemetry Elasticsearch client instrumentations. To avoid tracing the same requests twice, make sure to use only one instrumentation, either by uninstalling the opentelemetry-instrumentation-elasticsearch Python package or by <<opentelemetry-config-enable,disabling the native instrumentation>>.

[discrete]
==== Configuring the OpenTelemetry instrumentation

You can configure this OpenTelemetry instrumentation through environment variables.
The following configuration options are available.

[discrete]
[[opentelemetry-config-enable]]
===== Enable / Disable the OpenTelemetry instrumentation

With this configuration option you can enable (default) or disable the built-in OpenTelemetry instrumentation.

**Default:** `true`

|============
| Environment Variable | `OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED`
|============

[discrete]
===== Capture search request bodies

Per default, the built-in OpenTelemetry instrumentation does not capture request bodies due to data privacy considerations. You can use this option to enable capturing of search queries from the request bodies of {es} search requests in case you wish to gather this information regardless. The options are to capture the raw search query or not capture it at all.

**Default:** `omit`

**Valid Options:** `omit`, `raw`

|============
| Environment Variable | `OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY`
|============

[discrete]
==== Overhead

The OpenTelemetry instrumentation (as any other monitoring approach) may come with a slight overhead on CPU, memory, and/or latency. The overhead may only occur when the instrumentation is enabled (default) and an OpenTelemetry SDK is active in the target application. When the instrumentation is disabled or no OpenTelemetry SDK is active within the target application, monitoring overhead is not expected when using the client.

Even in cases where the instrumentation is enabled and is actively used (by an OpenTelemetry SDK), the overhead is minimal and negligible in the vast majority of cases. In edge cases where there is a noticeable overhead, the <<opentelemetry-config-enable,instrumentation can be explicitly disabled>> to eliminate any potential impact on performance.
32 changes: 32 additions & 0 deletions elasticsearch_serverless/_async/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
HttpHeaders,
ListApiResponse,
ObjectApiResponse,
OpenTelemetrySpan,
TextApiResponse,
)
from elastic_transport.client_utils import DEFAULT, DefaultType

from ..._otel import OpenTelemetry
from ...compat import warn_stacklevel
from ...exceptions import (
HTTP_EXCEPTIONS,
Expand Down Expand Up @@ -125,6 +127,7 @@ def __init__(self, _transport: AsyncTransport) -> None:
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
self._verified_elasticsearch = False
self._otel = OpenTelemetry()

@property
def transport(self) -> AsyncTransport:
Expand All @@ -140,6 +143,34 @@ async def perform_request(
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
with self._otel.span(
method,
endpoint_id=endpoint_id,
path_parts=path_parts or {},
) as otel_span:
response = await self._perform_request(
method,
path,
params=params,
headers=headers,
body=body,
otel_span=otel_span,
)
otel_span.set_elastic_cloud_metadata(response.meta.headers)
return response

async def _perform_request(
self,
method: str,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
otel_span: OpenTelemetrySpan,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
if headers:
request_headers = self._headers.copy()
Expand All @@ -162,6 +193,7 @@ async def perform_request(
retry_on_status=self._retry_on_status,
retry_on_timeout=self._retry_on_timeout,
client_meta=self._client_meta,
otel_span=otel_span,
)

# HEAD with a 404 is returned as a normal response
Expand Down
92 changes: 92 additions & 0 deletions elasticsearch_serverless/_otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import contextlib
import os
from typing import TYPE_CHECKING, Generator, Mapping

if TYPE_CHECKING:
from typing import Literal

try:
from opentelemetry import trace

_tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api")
except ModuleNotFoundError:
_tracer = None

from elastic_transport import OpenTelemetrySpan

# Valid values for the enabled config are 'true' and 'false'. Default is 'true'.
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"
# Describes how to handle search queries in the request body when assigned to
# a span attribute.
# Valid values are 'omit' and 'raw'.
# Default is 'omit' as 'raw' has security implications.
BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY"
DEFAULT_BODY_STRATEGY = "omit"


class OpenTelemetry:
def __init__(
self,
enabled: bool | None = None,
tracer: trace.Tracer | None = None,
# TODO import Literal at the top-level when dropping Python 3.7
body_strategy: 'Literal["omit", "raw"]' | None = None,
):
if enabled is None:
enabled = os.environ.get(ENABLED_ENV_VAR, "true") == "true"
self.tracer = tracer or _tracer
self.enabled = enabled and self.tracer is not None

if body_strategy is not None:
self.body_strategy = body_strategy
else:
self.body_strategy = os.environ.get(
BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY
) # type: ignore[assignment]
assert self.body_strategy in ("omit", "raw")

@contextlib.contextmanager
def span(
self,
method: str,
*,
endpoint_id: str | None,
path_parts: Mapping[str, str],
) -> Generator[OpenTelemetrySpan, None, None]:
if not self.enabled or self.tracer is None:
yield OpenTelemetrySpan(None)
return

span_name = endpoint_id or method
with self.tracer.start_as_current_span(span_name) as otel_span:
otel_span.set_attribute("http.request.method", method)
otel_span.set_attribute("db.system", "elasticsearch")
if endpoint_id is not None:
otel_span.set_attribute("db.operation", endpoint_id)
for key, value in path_parts.items():
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)

yield OpenTelemetrySpan(
otel_span,
endpoint_id=endpoint_id,
body_strategy=self.body_strategy,
)
32 changes: 32 additions & 0 deletions elasticsearch_serverless/_sync/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
HttpHeaders,
ListApiResponse,
ObjectApiResponse,
OpenTelemetrySpan,
TextApiResponse,
Transport,
)
from elastic_transport.client_utils import DEFAULT, DefaultType

from ..._otel import OpenTelemetry
from ...compat import warn_stacklevel
from ...exceptions import (
HTTP_EXCEPTIONS,
Expand Down Expand Up @@ -125,6 +127,7 @@ def __init__(self, _transport: Transport) -> None:
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
self._verified_elasticsearch = False
self._otel = OpenTelemetry()

@property
def transport(self) -> Transport:
Expand All @@ -140,6 +143,34 @@ def perform_request(
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
with self._otel.span(
method,
endpoint_id=endpoint_id,
path_parts=path_parts or {},
) as otel_span:
response = self._perform_request(
method,
path,
params=params,
headers=headers,
body=body,
otel_span=otel_span,
)
otel_span.set_elastic_cloud_metadata(response.meta.headers)
return response

def _perform_request(
self,
method: str,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
otel_span: OpenTelemetrySpan,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
if headers:
request_headers = self._headers.copy()
Expand All @@ -162,6 +193,7 @@ def perform_request(
retry_on_status=self._retry_on_status,
retry_on_timeout=self._retry_on_timeout,
client_meta=self._client_meta,
otel_span=otel_span,
)

# HEAD with a 404 is returned as a normal response
Expand Down
Loading
Loading