Skip to content

Commit

Permalink
OpenTelemetryTracer should obtain the active otel context from the pa…
Browse files Browse the repository at this point in the history
…ssed vertx context instead of relying on the thread local

Motivation:

OpenTelemetryTracer obtains the active otel context from the thread local assocation, which returns null on a worker thread.

Changes:

OpenTelemetryTracer gets the active otel from the passed vertx context.
  • Loading branch information
vietj committed Jan 15, 2025
1 parent a1f3c71 commit ce6a95d
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.opentelemetry.context.propagation.TextMapSetter;
import io.vertx.core.Context;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
Expand All @@ -31,6 +32,8 @@
import java.util.Map.Entry;
import java.util.function.BiConsumer;

import static io.vertx.tracing.opentelemetry.VertxContextStorageProvider.ACTIVE_CONTEXT;

class OpenTelemetryTracer implements VertxTracer<Operation, Operation> {

private static final TextMapGetter<Iterable<Entry<String, String>>> getter = new HeadersPropagatorGetter();
Expand Down Expand Up @@ -58,8 +61,8 @@ public <R> Operation receiveRequest(
return null;
}

io.opentelemetry.context.Context otelCtx;
if ((otelCtx = VertxContextStorage.INSTANCE.current()) == null) {
io.opentelemetry.context.Context otelCtx = ((ContextInternal)context).getLocal(ACTIVE_CONTEXT);
if (otelCtx == null) {
otelCtx = io.opentelemetry.context.Context.root();
}

Expand Down Expand Up @@ -125,7 +128,7 @@ public <R> Operation sendRequest(
return null;
}

io.opentelemetry.context.Context otelCtx = VertxContextStorage.INSTANCE.current();
io.opentelemetry.context.Context otelCtx = ((ContextInternal)context).getLocal(ACTIVE_CONTEXT);

if (otelCtx == null) {
if (!TracingPolicy.ALWAYS.equals(policy)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class VertxContextStorageProvider implements ContextStorageProvider {

private static final Object ACTIVE_CONTEXT = new Object();
public static final Object ACTIVE_CONTEXT = new Object();

@Override
public ContextStorage get() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.tracing.TracingPolicy;
Expand Down Expand Up @@ -75,34 +73,31 @@ public void tearDown(VertxTestContext context) throws Exception {

private static Stream<Arguments> testTracingPolicyArgs() {
return Stream.of(
Arguments.of(TracingPolicy.PROPAGATE, true),
Arguments.of(TracingPolicy.PROPAGATE, false),
Arguments.of(TracingPolicy.ALWAYS, true)
Arguments.of(TracingPolicy.PROPAGATE, true, ThreadingModel.EVENT_LOOP),
Arguments.of(TracingPolicy.PROPAGATE, false, ThreadingModel.EVENT_LOOP),
Arguments.of(TracingPolicy.ALWAYS, true, ThreadingModel.EVENT_LOOP),
Arguments.of(TracingPolicy.PROPAGATE, true, ThreadingModel.WORKER)
);
}

@ParameterizedTest
@MethodSource("testTracingPolicyArgs")
public void testHttpServerRequestWithPolicy(TracingPolicy policy, boolean createTrace, VertxTestContext ctx) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
public void testHttpServerRequestWithPolicy(TracingPolicy policy, boolean createTrace, ThreadingModel threadingModel, VertxTestContext ctx) throws Exception {
final boolean expectTrace = (policy == TracingPolicy.PROPAGATE && createTrace) || policy == TracingPolicy.ALWAYS;

ctx.assertComplete(
vertx.createHttpServer(new HttpServerOptions().setTracingPolicy(policy)).requestHandler(req -> {
ctx.verify(() -> {
if (expectTrace) {
assertThat(Span.current())
.isNotEqualTo(Span.getInvalid());
} else {
assertThat(Span.current())
.isEqualTo(Span.getInvalid());
}
});
req.response().end();
}).listen(8080).onSuccess(v -> latch.countDown())
);

Assertions.assertTrue(latch.await(20, TimeUnit.SECONDS));
vertx.deployVerticle($ -> vertx
.createHttpServer(new HttpServerOptions().setTracingPolicy(policy)).requestHandler(req -> {
ctx.verify(() -> {
if (expectTrace) {
assertThat(Span.current())
.isNotEqualTo(Span.getInvalid());
} else {
assertThat(Span.current())
.isEqualTo(Span.getInvalid());
}
});
req.response().end();
}).listen(8080), new DeploymentOptions().setThreadingModel(threadingModel)).await(20, TimeUnit.SECONDS);

if (createTrace) {
sendRequestWithTrace();
Expand Down Expand Up @@ -135,29 +130,27 @@ public void testHttpServerRequestWithPolicy(TracingPolicy policy, boolean create

@ParameterizedTest
@MethodSource("testTracingPolicyArgs")
public void testHttpClientRequestWithPolicy(TracingPolicy policy, boolean createTrace, VertxTestContext ctx) throws Exception {
public void testHttpClientRequestWithPolicy(TracingPolicy policy, boolean createTrace, ThreadingModel threadingModel, VertxTestContext ctx) throws Exception {
int expectedTrace = (createTrace ? 1 : 0) +
(policy == TracingPolicy.PROPAGATE && createTrace ? 2 : 0) +
(policy == TracingPolicy.ALWAYS ? 2 : 0);

CountDownLatch latch = new CountDownLatch(2);
HttpClient c = vertx.createHttpClient(new HttpClientOptions().setTracingPolicy(policy));

// Proxy server
vertx.createHttpServer(new HttpServerOptions().setTracingPolicy(TracingPolicy.PROPAGATE)).requestHandler(req ->
c.request(HttpMethod.GET, 8081, "localhost", "/").onComplete(ctx.succeeding(clientReq ->
clientReq.send().onComplete(ctx.succeeding(clientResp ->
req.response().end()
vertx.deployVerticle($ -> {
HttpClient c = vertx.createHttpClient(new HttpClientOptions().setTracingPolicy(policy));
return vertx.createHttpServer(new HttpServerOptions().setTracingPolicy(TracingPolicy.PROPAGATE)).requestHandler(req ->
c.request(HttpMethod.GET, 8081, "localhost", "/").onComplete(ctx.succeeding(clientReq ->
clientReq.send().onComplete(ctx.succeeding(clientResp ->
req.response().end()
))
))
))
).listen(8080).onComplete(ctx.succeeding(v -> latch.countDown()));
).listen(8080);
}, new DeploymentOptions().setThreadingModel(threadingModel)).await(20, TimeUnit.SECONDS);

// End server
vertx.createHttpServer(new HttpServerOptions().setTracingPolicy(TracingPolicy.PROPAGATE))
.requestHandler(req -> req.response().end())
.listen(8081).onComplete(ctx.succeeding(v -> latch.countDown()));

Assertions.assertTrue(latch.await(20, TimeUnit.SECONDS));
.listen(8081)
.await(20, TimeUnit.SECONDS);

if (createTrace) {
sendRequestWithTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import io.opentelemetry.context.propagation.ContextPropagators;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.junit5.VertxExtension;
import io.vertx.tracing.opentelemetry.OpenTelemetryOptions;
import io.vertx.tracing.opentelemetry.Operation;
import io.vertx.tracing.opentelemetry.VertxContextStorageProvider;
import io.vertx.tracing.opentelemetry.VertxContextStorageProvider.VertxContextStorage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -242,8 +244,8 @@ public void sendRequestShouldNotReturnSpanIfPolicyIsPropagateAndPreviousContextI
public void sendRequestShouldReturnSpanIfPolicyIsPropagateAndPreviousContextIsPresent(final Vertx vertx) {
VertxTracer<Operation, Operation> tracer = new OpenTelemetryOptions(OpenTelemetry.noop()).buildTracer();

final Context ctx = vertx.getOrCreateContext();
VertxContextStorage.INSTANCE.attach(io.opentelemetry.context.Context.current());
final ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
ctx.putLocal(VertxContextStorageProvider.ACTIVE_CONTEXT, io.opentelemetry.context.Context.current());

final Operation operation = tracer.sendRequest(
ctx,
Expand Down

0 comments on commit ce6a95d

Please sign in to comment.