Skip to content

Commit

Permalink
Implement grpc-timeout and deadlines.
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 14, 2023
1 parent 727efa1 commit bace434
Show file tree
Hide file tree
Showing 18 changed files with 585 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@
*/
package io.vertx.grpc.client;

import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.MethodDescriptor;
import io.grpc.*;
import io.vertx.core.net.SocketAddress;

import java.util.concurrent.Executor;
Expand All @@ -34,20 +30,20 @@ public GrpcClientChannel(GrpcClient client, SocketAddress server) {

@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {

String encoding = callOptions.getCompressor();

Compressor compressor;
if (encoding != null) {
compressor = CompressorRegistry.getDefaultInstance().lookupCompressor(encoding);
} else {
compressor = null;
}


Executor exec = callOptions.getExecutor();

return new VertxClientCall<>(client, server, exec, methodDescriptor, encoding, compressor);
Deadline deadline = callOptions.getDeadline();
if (deadline == null) {
Context ctx = Context.current();
deadline = ctx.getDeadline();
}
return new VertxClientCall<>(client, server, exec, methodDescriptor, encoding, compressor, deadline);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.vertx.grpc.common.GrpcWriteStream;
import io.vertx.grpc.common.ServiceName;

import java.util.concurrent.TimeUnit;

/**
* A request to a gRPC server.
*
Expand Down Expand Up @@ -95,6 +97,9 @@ public interface GrpcClientRequest<Req, Resp> extends GrpcWriteStream<Req> {
@Override
GrpcClientRequest<Req, Resp> drainHandler(@Nullable Handler<Void> handler);

@Fluent
GrpcClientRequest<Req, Resp> timeout(long timeout, TimeUnit unit);

/**
* Sets the amount of time after which, if the request does not return any data within the timeout period,
* the request/response is cancelled and the related futures.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
package io.vertx.grpc.client;

import io.grpc.ClientCall;
import io.grpc.Compressor;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.*;
import io.vertx.core.Future;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.net.SocketAddress;
Expand All @@ -19,6 +13,7 @@

import javax.annotation.Nullable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

class VertxClientCall<RequestT, ResponseT> extends ClientCall<RequestT, ResponseT> {

Expand All @@ -28,20 +23,28 @@ class VertxClientCall<RequestT, ResponseT> extends ClientCall<RequestT, Response
private final MethodDescriptor<RequestT, ResponseT> methodDescriptor;
private final String encoding;
private final Compressor compressor;
private final Deadline deadline;
private Future<GrpcClientRequest<RequestT, ResponseT>> fut;
private Listener<ResponseT> listener;
private WriteStreamAdapter<RequestT> writeAdapter;
private ReadStreamAdapter<ResponseT> readAdapter;
private GrpcClientRequest<RequestT, ResponseT> request;
private GrpcClientResponse<RequestT, ResponseT> grpcResponse;

VertxClientCall(GrpcClient client, SocketAddress server, Executor exec, MethodDescriptor<RequestT, ResponseT> methodDescriptor, String encoding, Compressor compressor) {
VertxClientCall(GrpcClient client,
SocketAddress server,
Executor exec,
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
String encoding,
Compressor compressor,
Deadline deadline) {
this.client = client;
this.server = server;
this.exec = exec;
this.methodDescriptor = methodDescriptor;
this.encoding = encoding;
this.compressor = compressor;
this.deadline = deadline;
writeAdapter = new WriteStreamAdapter<RequestT>() {
@Override
protected void handleReady() {
Expand Down Expand Up @@ -73,6 +76,10 @@ public void start(Listener<ResponseT> responseListener, Metadata headers) {
if (ar1.succeeded()) {
request = ar1.result();
Utils.writeMetadata(headers, request.headers());
if (deadline != null) {
long timeout = deadline.timeRemaining(TimeUnit.MILLISECONDS);
request.timeout(timeout, TimeUnit.MILLISECONDS);
}
if (encoding != null) {
request.encoding(encoding);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package io.vertx.grpc.client.impl;

import io.vertx.core.AsyncResult;
import io.grpc.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
Expand All @@ -19,10 +19,11 @@

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.vertx.core.http.HttpConnection;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.FutureInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.grpc.client.GrpcClientRequest;
import io.vertx.grpc.client.GrpcClientResponse;
import io.vertx.grpc.common.CodecException;
Expand All @@ -32,12 +33,14 @@
import io.vertx.grpc.common.GrpcMessageEncoder;
import io.vertx.grpc.common.ServiceName;
import io.vertx.grpc.common.impl.GrpcMessageImpl;
import io.vertx.grpc.common.impl.VertxScheduledExecutorService;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class GrpcClientRequestImpl<Req, Resp> implements GrpcClientRequest<Req, Resp> {

private final ContextInternal context;
private final HttpClientRequest httpRequest;
private final GrpcMessageEncoder<Req> messageEncoder;
private ServiceName serviceName;
Expand All @@ -48,16 +51,21 @@ public class GrpcClientRequestImpl<Req, Resp> implements GrpcClientRequest<Req,
boolean trailersSent;
private Future<GrpcClientResponse<Req, Resp>> response;
private MultiMap headers;
private long timeout;
private TimeUnit timeoutUnit;
private io.grpc.Context grpcContext;

public GrpcClientRequestImpl(HttpClientRequest httpRequest, GrpcMessageEncoder<Req> messageEncoder, GrpcMessageDecoder<Resp> messageDecoder) {

this.context = ((PromiseInternal<?>)httpRequest.response()).context();
this.httpRequest = httpRequest;
this.messageEncoder = messageEncoder;
this.response = httpRequest.response().map(httpResponse -> {
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(this, httpResponse, messageDecoder);
grpcResponse.init();
return grpcResponse;
});
this.response = httpRequest
.response()
.map(httpResponse -> {
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, grpcContext, this, httpResponse, messageDecoder);
grpcResponse.init();
return grpcResponse;
});
}

@Override
Expand Down Expand Up @@ -126,6 +134,19 @@ public GrpcClientRequest<Req, Resp> drainHandler(Handler<Void> handler) {
return this;
}

@Override
public GrpcClientRequest<Req, Resp> timeout(long timeout, TimeUnit unit) {
if (timeout < 0L) {
throw new IllegalArgumentException("Timeout must be positive");
}
if (headersSent) {
throw new IllegalStateException("Timeout must be set before sending request headers");
}
this.timeoutUnit = Objects.requireNonNull(unit);
this.timeout = timeout;
return this;
}

@Override
public GrpcClientRequest<Req, Resp> idleTimeout(long timeout) {
httpRequest.idleTimeout(timeout);
Expand Down Expand Up @@ -198,6 +219,9 @@ private Future<Void> writeMessage(GrpcMessage message, boolean end) {
requestHeaders.add(header.getKey(), header.getValue());
}
}
if (timeout > 0L) {
httpRequest.putHeader("grpc-timeout", timeoutUnit.toMicros(timeout) + "u");
}
String uri = serviceName.pathOf(methodName);
httpRequest.putHeader("content-type", "application/grpc");
if (encoding != null) {
Expand All @@ -209,7 +233,15 @@ private Future<Void> writeMessage(GrpcMessage message, boolean end) {
httpRequest.setURI(uri);
headersSent = true;
}

if (timeout > 0L) {
Context.CancellableContext deadlineContext = Context.current().withDeadlineAfter(timeout, timeoutUnit, new VertxScheduledExecutorService(context));
deadlineContext.addListener(ctx_ -> {
cancel();
}, Runnable::run);
grpcContext = deadlineContext;
} else {
grpcContext = Context.current();
}
if (end) {
trailersSent = true;
return httpRequest.end(GrpcMessageImpl.encode(message));
Expand Down Expand Up @@ -238,9 +270,7 @@ public void cancel() {
return;
}
cancelled = true;
// That's a bit convoluted, the reset API should be improved instead
ContextInternal ctx = ((FutureInternal) (response)).context();
ctx.execute(() -> {
context.execute(() -> {
boolean responseEnded;
if (response.failed()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
package io.vertx.grpc.client.impl;

import io.grpc.Context;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand All @@ -18,6 +19,7 @@
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClientResponse;

import io.vertx.core.impl.ContextInternal;
import io.vertx.grpc.client.GrpcClientResponse;
import io.vertx.grpc.common.CodecException;
import io.vertx.grpc.common.GrpcMessageDecoder;
Expand All @@ -37,8 +39,11 @@ public class GrpcClientResponseImpl<Req, Resp> extends GrpcReadStreamBase<GrpcCl
private String statusMessage;
private String encoding;

public GrpcClientResponseImpl(GrpcClientRequestImpl<Req, Resp> request, HttpClientResponse httpResponse, GrpcMessageDecoder<Resp> messageDecoder) {
super(Vertx.currentContext(), httpResponse, httpResponse.headers().get("grpc-encoding"), messageDecoder); // A bit ugly
public GrpcClientResponseImpl(ContextInternal context,
io.grpc.Context grpcContext,
GrpcClientRequestImpl<Req, Resp> request,
HttpClientResponse httpResponse, GrpcMessageDecoder<Resp> messageDecoder) {
super(context, grpcContext, httpResponse, httpResponse.headers().get("grpc-encoding"), messageDecoder);
this.request = request;
this.encoding = httpResponse.headers().get("grpc-encoding");
this.httpResponse = httpResponse;
Expand Down Expand Up @@ -71,6 +76,9 @@ public MultiMap trailers() {
}

protected void handleEnd() {
if (grpcContext instanceof Context.CancellableContext) {
((Context.CancellableContext)grpcContext).close();
}
String responseStatus = httpResponse.getTrailer("grpc-status");
if (responseStatus != null) {
status = GrpcStatus.valueOf(Integer.parseInt(responseStatus));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,7 @@
*/
package io.vertx.grpc.client;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.*;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
Expand All @@ -45,9 +35,12 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -546,4 +539,67 @@ private void testGrpcResponseHttpError(TestContext should, Handler<HttpServerReq
should.assertEquals(expectedStatus, e.getStatus().getCode());
}
}

@Test
public void testTimeoutOnClient(TestContext should) throws Exception {
testTimeoutOnClient(should, stub -> {
HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();
stub
.withDeadlineAfter(2, TimeUnit.SECONDS)
.sayHello(request);
});
}

@Test
public void testTimeoutOnClientPropagation(TestContext should) throws Exception {
testTimeoutOnClient(should, stub -> {
Context current = Context.current();
Context.CancellableContext ctx = current.withDeadlineAfter(2, TimeUnit.SECONDS, Executors.newSingleThreadScheduledExecutor());
try {
ctx.call(() -> {
HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();
stub
.withDeadlineAfter(2, TimeUnit.SECONDS)
.sayHello(request);
return null;
});
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException)e;
} else {
should.fail();
}
}
});
}

public void testTimeoutOnClient(TestContext should, Consumer<GreeterGrpc.GreeterBlockingStub> c) throws Exception {
super.testTimeoutOnClient(should);
client = GrpcClient.client(vertx);
GrpcClientChannel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(port, "localhost"));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
try {
c.accept(stub);
} catch (StatusRuntimeException e) {
should.assertEquals(Status.Code.CANCELLED, e.getStatus().getCode());
}
}

@Test
public void testTimeoutPropagationToServer(TestContext should) throws Exception {
CompletableFuture<Long> cf = new CompletableFuture<>();
super.testTimeoutPropagationToServer(cf);
client = GrpcClient.client(vertx);
GrpcClientChannel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(port, "localhost"));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
try {
HelloRequest request = HelloRequest.newBuilder().setName("Julien").build();
stub
.withDeadlineAfter(2, TimeUnit.SECONDS)
.sayHello(request);
} catch (StatusRuntimeException e) {
e.printStackTrace();
should.assertEquals(Status.Code.CANCELLED, e.getStatus().getCode());
}
}
}
Loading

0 comments on commit bace434

Please sign in to comment.