Skip to content

Commit

Permalink
OpenTelemetry: fallback to default storage when not on a Vert.x thread (
Browse files Browse the repository at this point in the history
#72)

The VertxContextStorageProvider is chosen by OpenTelemetry via the service loader mechanism.

While it is mandatory to use context for storage of spans created on Vert.x threads, there may be parts of the application implemented without Vert.x.

In this case, the storage provider fails with NPE. Instead, it is better to fall back to the default storage mechanism, which uses thread local variables.

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont authored Dec 12, 2023
1 parent eb4bf5b commit 1bdeb88
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 138 deletions.
13 changes: 13 additions & 0 deletions vertx-opentelemetry/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ which gives dummy values (all zeroes) for trace and span ids. The OpenTelemetry
{@link examples.OpenTelemetryExamples#ex7}
----

[NOTE]
====
This project provides an OpenTelemetry `ContextStorageProvider` that uses the Vert.x {@link io.vertx.core.Context} when invoked on a Vert.x thread.
Otherwise, it fallbacks to the default storage.
If several `ContextStorageProvider` implementations are present on the classpath, you can force OpenTelemetry to select the Vert.x one:
[source]
----
-Dio.opentelemetry.context.contextStorageProvider=io.vertx.tracing.opentelemetry.VertxContextStorageProvider
----
====

== Tracing policy

The tracing policy defines the behavior of a component when tracing is enabled:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
Expand All @@ -12,8 +12,6 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.tracing.VertxTracer;
Expand All @@ -38,7 +36,7 @@ public OpenTelemetryOptions(JsonObject json) {
this.setFactory(OpenTelemetryTracingFactory.INSTANCE);
}

VertxTracer<Span, Span> buildTracer() {
VertxTracer<Operation, Operation> buildTracer() {
if (openTelemetry != null) {
return new OpenTelemetryTracer(openTelemetry);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
Expand All @@ -16,6 +16,7 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapSetter;
Expand All @@ -24,13 +25,12 @@
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.tracing.opentelemetry.VertxContextStorageProvider.VertxContextStorage;

import java.util.Map.Entry;
import java.util.function.BiConsumer;

import static io.vertx.tracing.opentelemetry.VertxContextStorageProvider.ACTIVE_CONTEXT;

class OpenTelemetryTracer implements VertxTracer<Span, Span> {
class OpenTelemetryTracer implements VertxTracer<Operation, Operation> {

private static final TextMapGetter<Iterable<Entry<String, String>>> getter = new HeadersPropagatorGetter();
private static final TextMapSetter<BiConsumer<String, String>> setter = new HeadersPropagatorSetter();
Expand All @@ -44,7 +44,7 @@ class OpenTelemetryTracer implements VertxTracer<Span, Span> {
}

@Override
public <R> Span receiveRequest(
public <R> Operation receiveRequest(
final Context context,
final SpanKind kind,
final TracingPolicy policy,
Expand All @@ -57,49 +57,61 @@ public <R> Span receiveRequest(
return null;
}

io.opentelemetry.context.Context tracingContext = propagators.getTextMapPropagator().extract(io.opentelemetry.context.Context.current(), headers, getter);
io.opentelemetry.context.Context otelCtx;
if ((otelCtx = VertxContextStorage.INSTANCE.current()) == null) {
otelCtx = io.opentelemetry.context.Context.root();
}

otelCtx = propagators.getTextMapPropagator().extract(otelCtx, headers, getter);

// If no span, and policy is PROPAGATE, then don't create the span
if (Span.fromContextOrNull(tracingContext) == null && TracingPolicy.PROPAGATE.equals(policy)) {
if (Span.fromContextOrNull(otelCtx) == null && TracingPolicy.PROPAGATE.equals(policy)) {
return null;
}

final Span span = reportTagsAndStart(tracer
io.opentelemetry.api.trace.SpanKind spanKind = SpanKind.RPC.equals(kind) ? io.opentelemetry.api.trace.SpanKind.SERVER : io.opentelemetry.api.trace.SpanKind.CONSUMER;

SpanBuilder spanBuilder = tracer
.spanBuilder(operation)
.setParent(tracingContext)
.setSpanKind(SpanKind.RPC.equals(kind) ? io.opentelemetry.api.trace.SpanKind.SERVER : io.opentelemetry.api.trace.SpanKind.CONSUMER), request, tagExtractor, false);
.setParent(otelCtx)
.setSpanKind(spanKind);

VertxContextStorageProvider.VertxContextStorage.INSTANCE.attach(context, tracingContext.with(span));
Span span = reportTagsAndStart(spanBuilder, request, tagExtractor, false);
Scope scope = VertxContextStorage.INSTANCE.attach(context, span.storeInContext(otelCtx));

return span;
return new Operation(span, scope);
}

@Override
public <R> void sendResponse(
final Context context,
final R response,
final Span span,
final Operation operation,
final Throwable failure,
final TagExtractor<R> tagExtractor) {
if (span != null) {
context.remove(ACTIVE_CONTEXT);
end(span, response, tagExtractor, failure, false);
if (operation != null) {
end(operation, response, tagExtractor, failure, false);
}
}

private static <R> void end(Span span, R response, TagExtractor<R> tagExtractor, Throwable failure, boolean client) {
if (failure != null) {
span.recordException(failure);
}
if (response != null) {
Attributes attributes = processTags(response, tagExtractor, client);
span.setAllAttributes(attributes);
private static <R> void end(Operation operation, R response, TagExtractor<R> tagExtractor, Throwable failure, boolean client) {
Span span = operation.span();
try {
if (failure != null) {
span.recordException(failure);
}
if (response != null) {
Attributes attributes = processTags(response, tagExtractor, client);
span.setAllAttributes(attributes);
}
span.end();
} finally {
operation.scope().close();
}
span.end();
}

@Override
public <R> Span sendRequest(
public <R> Operation sendRequest(
final Context context,
final SpanKind kind,
final TracingPolicy policy,
Expand All @@ -112,36 +124,38 @@ public <R> Span sendRequest(
return null;
}

io.opentelemetry.context.Context tracingContext = context.getLocal(ACTIVE_CONTEXT);
io.opentelemetry.context.Context otelCtx = VertxContextStorage.INSTANCE.current();

if (tracingContext == null && !TracingPolicy.ALWAYS.equals(policy)) {
return null;
if (otelCtx == null) {
if (!TracingPolicy.ALWAYS.equals(policy)) {
return null;
}
otelCtx = io.opentelemetry.context.Context.root();
}

if (tracingContext == null) {
tracingContext = io.opentelemetry.context.Context.root();
}
io.opentelemetry.api.trace.SpanKind spanKind = SpanKind.RPC.equals(kind) ? io.opentelemetry.api.trace.SpanKind.CLIENT : io.opentelemetry.api.trace.SpanKind.PRODUCER;

SpanBuilder spanBuilder = tracer.spanBuilder(operation)
.setParent(otelCtx)
.setSpanKind(spanKind);

final Span span = reportTagsAndStart(tracer.spanBuilder(operation)
.setParent(tracingContext)
.setSpanKind(SpanKind.RPC.equals(kind) ? io.opentelemetry.api.trace.SpanKind.CLIENT : io.opentelemetry.api.trace.SpanKind.PRODUCER)
, request, tagExtractor, true);
Span span = reportTagsAndStart(spanBuilder, request, tagExtractor, true);

tracingContext = tracingContext.with(span);
propagators.getTextMapPropagator().inject(tracingContext, headers, setter);
otelCtx = otelCtx.with(span);
propagators.getTextMapPropagator().inject(otelCtx, headers, setter);

return span;
return new Operation(span, Scope.noop());
}

@Override
public <R> void receiveResponse(
final Context context,
final R response,
final Span span,
final Operation operation,
final Throwable failure,
final TagExtractor<R> tagExtractor) {
if (span != null) {
end(span, response, tagExtractor, failure, true);
if (operation != null) {
end(operation, response, tagExtractor, failure, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2021 Contributors to the Eclipse Foundation
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
Expand All @@ -10,8 +10,6 @@
*/
package io.vertx.tracing.opentelemetry;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.VertxTracerFactory;
import io.vertx.core.spi.tracing.VertxTracer;
Expand All @@ -22,7 +20,7 @@ public class OpenTelemetryTracingFactory implements VertxTracerFactory {
static final OpenTelemetryTracingFactory INSTANCE = new OpenTelemetryTracingFactory();

@Override
public VertxTracer<Span, Span> tracer(final TracingOptions options) {
public VertxTracer<?, ?> tracer(final TracingOptions options) {
OpenTelemetryOptions openTelemetryOptions;
if (options instanceof OpenTelemetryOptions) {
openTelemetryOptions = (OpenTelemetryOptions) options;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.tracing.opentelemetry;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;

import java.util.Objects;

final class Operation {

private final Span span;
private final Scope scope;

Operation(Span span, Scope scope) {
this.span = Objects.requireNonNull(span);
this.scope = Objects.requireNonNull(scope);
}

public Span span() {
return span;
}

public Scope scope() {
return scope;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/

package io.vertx.tracing.opentelemetry;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextStorage;
import io.opentelemetry.context.ContextStorageProvider;
import io.opentelemetry.context.Scope;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

public class VertxContextStorageProvider implements ContextStorageProvider {

static String ACTIVE_CONTEXT = "tracing.context";
private static final Object ACTIVE_CONTEXT = new Object();

@Override
public ContextStorage get() {
Expand All @@ -20,7 +31,11 @@ enum VertxContextStorage implements ContextStorage {

@Override
public Scope attach(Context toAttach) {
return attach(Vertx.currentContext(), toAttach);
ContextInternal current = ContextInternal.current();
if (current == null) {
return ContextStorage.defaultStorage().attach(toAttach);
}
return attach(current, toAttach);
}

public Scope attach(io.vertx.core.Context vertxCtx, Context toAttach) {
Expand All @@ -40,9 +55,9 @@ public Scope attach(io.vertx.core.Context vertxCtx, Context toAttach) {

@Override
public Context current() {
io.vertx.core.Context vertxCtx = Vertx.currentContext();
ContextInternal vertxCtx = ContextInternal.current();
if (vertxCtx == null) {
return null;
return ContextStorage.defaultStorage().current();
}
return vertxCtx.getLocal(ACTIVE_CONTEXT);
}
Expand Down
Loading

0 comments on commit 1bdeb88

Please sign in to comment.