Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Zipkin Brave 6 #87

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
</scm>

<properties>
<junit.jupiter.version>5.9.1</junit.jupiter.version>
<testcontainers.version>1.17.6</testcontainers.version>
</properties>

Expand Down Expand Up @@ -144,4 +145,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
1 change: 0 additions & 1 deletion vertx-opentelemetry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
<properties>
<opentelemetry.version>1.42.1</opentelemetry.version>
<assertj.version>3.22.0</assertj.version>
<junit.jupiter.version>5.8.2</junit.jupiter.version>
<mokito.junit.jupiter.version>4.5.1</mokito.junit.jupiter.version>
<awaitility.version>4.2.0</awaitility.version>
</properties>
Expand Down
20 changes: 16 additions & 4 deletions vertx-zipkin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
<name>Vert.x Zipkin</name>

<properties>
<zipkin.version>5.6.0</zipkin.version>
<zipkin.version>6.0.3</zipkin.version>
<zipkin-reporter.version>3.4.3</zipkin-reporter.version>
</properties>

<dependencies>
Expand All @@ -42,11 +43,16 @@
<artifactId>brave-instrumentation-http</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-brave</artifactId>
<version>${zipkin-reporter.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -57,13 +63,19 @@
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin-junit</artifactId>
<version>2.12.1</version>
<version>2.24.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-okhttp3</artifactId>
<version>2.7.13</version>
<version>${zipkin-reporter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
141 changes: 35 additions & 106 deletions vertx-zipkin/src/main/java/io/vertx/tracing/zipkin/VertxSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,16 @@
*/
package io.vertx.tracing.zipkin;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.http.*;
import io.vertx.core.spi.VertxTracerFactory;
import io.vertx.core.tracing.TracingOptions;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.codec.Encoding;
import zipkin2.reporter.Sender;
import zipkin2.reporter.BaseHttpSender;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.HttpEndpointSuppliers;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -43,7 +28,7 @@
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class VertxSender extends Sender {
public class VertxSender extends BaseHttpSender<RequestOptions, Buffer> {

private static final CharSequence APPLICATION_JSON = HttpHeaders.createOptimized("application/json");

Expand All @@ -54,6 +39,7 @@ public class VertxSender extends Sender {
private final String endpoint;

public VertxSender(HttpSenderOptions options) {
super(Encoding.JSON, HttpEndpointSuppliers.constantFactory(), options.getSenderEndpoint());
this.options = new HttpSenderOptions(options);
this.endpoint = options.getSenderEndpoint();
this.vertx = Vertx.builder().withTracer(VertxTracerFactory.NOOP).build();
Expand All @@ -78,7 +64,7 @@ public int messageMaxBytes() {
public int messageSizeInBytes(List<byte[]> encodedSpans) {
int val = 2;
int length = encodedSpans.size();
for(int i = 0;i < length;i++) {
for (int i = 0; i < length; i++) {
if (i > 0) {
++val;
}
Expand All @@ -88,105 +74,48 @@ public int messageSizeInBytes(List<byte[]> encodedSpans) {
}

@Override
public Call<Void> sendSpans(List<byte[]> encodedSpans) {
protected RequestOptions newEndpoint(String endpoint) {
RequestOptions options = new RequestOptions()
.setMethod(HttpMethod.POST)
.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
if (endpoint.startsWith("http://") || endpoint.startsWith("https://")) {
options.setAbsoluteURI(endpoint);
} else {
options.setURI(endpoint);
}
return options;
}

@Override
protected Buffer newBody(List<byte[]> encodedSpans) {
int capacity = messageSizeInBytes(encodedSpans);
Buffer body = Buffer.buffer(capacity);
body.appendByte((byte) '[');
for (int i = 0;i < encodedSpans.size();i++) {
for (int i = 0; i < encodedSpans.size(); i++) {
if (i > 0) {
body.appendByte((byte) ',');
}
body.appendBytes(encodedSpans.get(i));
}
body.appendByte((byte) ']');
return new PostCall(body);
}

private class PostCall extends Call<Void> implements Handler<AsyncResult<Callback<Void>>> {

private final Promise<Callback<Void>> promise = Promise.promise();
private final Future<Callback<Void>> fut = promise.future().onComplete(this);
private final Buffer body;

PostCall(Buffer body) {
this.body = body;
}

@Override
public void handle(AsyncResult<Callback<Void>> ar) {
if (ar.succeeded()) {
Callback<Void> callback = ar.result();
RequestOptions options = new RequestOptions()
.setMethod(HttpMethod.POST)
.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
if (endpoint.startsWith("http://") || endpoint.startsWith("https://")) {
options.setAbsoluteURI(endpoint);
} else {
options.setURI(endpoint);
}
client.request(options)
.compose(req -> req
.send(body)
.compose(HttpClientResponse::body))
.onComplete(res -> {
if (res.succeeded()) {
callback.onSuccess(null);
} else {
callback.onError(res.cause());
}
});
}
}

@Override
public Void execute() throws IOException {
CompletableFuture<Void> fut = new CompletableFuture<>();
enqueue(new Callback<Void>() {
@Override
public void onSuccess(Void value) {
fut.complete(null);
}
@Override
public void onError(Throwable t) {
fut.completeExceptionally(t);
}
});
try {
return fut.get(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} catch (TimeoutException e) {
throw new IOException(e);
}
}

@Override
public void enqueue(Callback<Void> callback) {
if (!promise.tryComplete(callback)) {
throw new IllegalStateException();
}
}

@Override
public void cancel() {
}

@Override
public boolean isCanceled() {
return false;
}
return body;
}

@Override
public Call<Void> clone() {
return new PostCall(body);
@Override
protected void postSpans(RequestOptions requestOptions, Buffer body) throws IOException {
try {
client.request(requestOptions)
.compose(req -> req
.send(body)
.compose(HttpClientResponse::body))
.await(20, TimeUnit.SECONDS);
} catch (TimeoutException e) {
throw new IOException(e);
}
}

@Override
public void close() throws IOException {
public void doClose() {
client.close();
vertx.close();
}
Expand Down
Loading
Loading