diff --git a/vertx-grpc-client/src/main/asciidoc/client.adoc b/vertx-grpc-client/src/main/asciidoc/client.adoc index b4699bff..fd1b2606 100644 --- a/vertx-grpc-client/src/main/asciidoc/client.adoc +++ b/vertx-grpc-client/src/main/asciidoc/client.adoc @@ -1,8 +1,8 @@ == Vert.x gRPC Client -Vert.x gRPC Client is a new gRPC client powered by Vert.x HTTP client superseding the integrated Netty based gRPC client. +Vert.x gRPC Client is a gRPC client powered by Vert.x HTTP client. -This client provides a gRPC request/response oriented API as well as a the generated stub approach with a gRPC Channel +This client provides a gRPC request/response oriented API as well as a generated stub approach with a gRPC Channel === Using Vert.x gRPC Client @@ -41,7 +41,7 @@ You can easily create the gRPC client ==== Request/response -Any interaction with a gRPC server involves creating a request to the remote gRPC service +Interacting with a gRPC server involves creating a request to the remote gRPC service [source,java] ---- @@ -65,7 +65,7 @@ Future composition can combine all the previous steps together in a compact fash ==== Streaming request -A streaming request involves calling `{@link io.vertx.grpc.client.GrpcClientRequest#write}` for each element of the stream +Streaming requests involve calling `{@link io.vertx.grpc.client.GrpcClientRequest#write}` for each element of the stream and using `{@link io.vertx.grpc.client.GrpcClientRequest#end()}` to end the stream [source,java] @@ -75,7 +75,7 @@ and using `{@link io.vertx.grpc.client.GrpcClientRequest#end()}` to end the stre ==== Streaming response -You can set handlers to process response events +You can set handlers to process response events of a streaming response [source,java] ---- @@ -104,6 +104,26 @@ You can pause/resume/fetch a response {@link examples.GrpcClientExamples#responseFlowControl} ---- +=== Timeout and deadlines + +The gRPC client handles timeout and deadlines, setting a timeout on a gRPC request instructs the client to send the timeout +information to make the server aware that the client desires a response within a defined time. + +In addition, the client shall be configured to schedule a deadline: when a timeout is set on a request, the client schedules +locally a timer to cancel the request when the response has not been received in time. + +[source,java] +---- +{@link examples.GrpcClientExamples#requestWithDeadline} +---- + +The timeout can also be set on a per-request basis. + +[source,java] +---- +{@link examples.GrpcClientExamples#requestWithDeadline2} +---- + === Cancellation You can call `{@link io.vertx.grpc.client.GrpcClientRequest#cancel}` to cancel a request @@ -126,7 +146,7 @@ You can compress request messages by setting the request encoding *prior* before === Decompression -Decompression is done transparently by the client when the server send encoded responses. +Decompression is achieved transparently by the client when the server sends encoded responses. === Stub API @@ -134,9 +154,20 @@ The Vert.x gRPC Client provides a gRPC channel to use with a generated client st [source,java] ---- -{@link examples.GrpcClientExamples#stubExample} +{@link examples.GrpcClientExamples#stub} ---- +Timeout and deadlines are supported through the usual gRPC API. + +[source,java] +---- +{@link examples.GrpcClientExamples#stubWithDeadline} +---- + +Deadline are cascaded, e.g. when the current `io.grpc.Context` carries a deadline and the stub has no explicit deadline +set, the client automatically inherits the implicit deadline. Such deadline can be set when using a stub within a gRPC server +call. + === Message level API The client provides a message level API to interact directly with protobuf encoded gRPC messages. diff --git a/vertx-grpc-client/src/main/java/examples/GrpcClientExamples.java b/vertx-grpc-client/src/main/java/examples/GrpcClientExamples.java index 1027806a..d9b2b8e7 100644 --- a/vertx-grpc-client/src/main/java/examples/GrpcClientExamples.java +++ b/vertx-grpc-client/src/main/java/examples/GrpcClientExamples.java @@ -7,13 +7,12 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.net.SocketAddress; import io.vertx.docgen.Source; -import io.vertx.grpc.client.GrpcClient; -import io.vertx.grpc.client.GrpcClientChannel; -import io.vertx.grpc.client.GrpcClientRequest; -import io.vertx.grpc.client.GrpcClientResponse; +import io.vertx.grpc.client.*; import io.vertx.grpc.common.GrpcMessage; import io.vertx.grpc.common.ServiceName; +import java.util.concurrent.TimeUnit; + @Source public class GrpcClientExamples { @@ -122,26 +121,61 @@ public void requestCompression(GrpcClientRequest request) { request.write(Item.newBuilder().setValue("item-3").build()); } - public void stubExample(GrpcClient client) { + public void stub(GrpcClient client) { GrpcClientChannel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com")); GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel); - greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), new StreamObserver() { + StreamObserver observer = new StreamObserver() { @Override public void onNext(HelloReply value) { // Process response } + @Override public void onCompleted() { // Done } + @Override public void onError(Throwable t) { // Something went bad } + }; + + greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), observer); + } + + public void stubWithDeadline(GrpcClientChannel channel, StreamObserver observer) { + + GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel).withDeadlineAfter(10, TimeUnit.SECONDS); + + greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), observer); + } + + public void requestWithDeadline(Vertx vertx) { + + // Set a 10 seconds timeout that will be sent to the gRPC service + // Let the client schedule a deadline + GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions() + .setTimeout(10) + .setTimeoutUnit(TimeUnit.SECONDS) + .setScheduleDeadlineAutomatically(true)); + } + + public void requestWithDeadline2(GrpcClient client, SocketAddress server, MethodDescriptor sayHelloMethod) { + + Future> fut = client.request(server, sayHelloMethod); + fut.onSuccess(request -> { + + request + // Given this request, set a 10 seconds timeout that will be sent to the gRPC service + .timeout(10, TimeUnit.SECONDS); + + request.end(HelloRequest.newBuilder().setName("Bob").build()); }); + } public void protobufLevelAPI(GrpcClient client, Buffer protoHello, SocketAddress server) { diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClient.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClient.java index 3ba08c94..fd4d04c6 100644 --- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClient.java +++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClient.java @@ -52,6 +52,28 @@ static GrpcClient client(Vertx vertx) { return new GrpcClientImpl(vertx); } + /** + * Create a client. + * + * @param vertx the vertx instance + * @return the created client + */ + static GrpcClient client(Vertx vertx, GrpcClientOptions options) { + return new GrpcClientImpl(vertx, options, new HttpClientOptions().setHttp2ClearTextUpgrade(false)); + } + + /** + * Create a client with the specified {@code options}. + * + * @param vertx the vertx instance + * @param grpcOptions the http client options + * @param httpOptions the http client options + * @return the created client + */ + static GrpcClient client(Vertx vertx, GrpcClientOptions grpcOptions, HttpClientOptions httpOptions) { + return new GrpcClientImpl(vertx, grpcOptions, httpOptions); + } + /** * Create a client with the specified {@code options}. * @@ -60,7 +82,7 @@ static GrpcClient client(Vertx vertx) { * @return the created client */ static GrpcClient client(Vertx vertx, HttpClientOptions options) { - return new GrpcClientImpl(vertx, options); + return new GrpcClientImpl(vertx, new GrpcClientOptions(), options); } /** diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientOptions.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientOptions.java new file mode 100644 index 00000000..70e03dde --- /dev/null +++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientOptions.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.grpc.client; + +import io.vertx.codegen.annotations.DataObject; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * Options configuring a gRPC client. + */ +@DataObject +public class GrpcClientOptions { + + /** + * The default value for automatic deadline schedule = {@code false}. + */ + public static final boolean DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY = false; + + /** + * The default value of the timeout = {@code 0} (no timeout). + */ + public static final int DEFAULT_TIMEOUT = 0; + + /** + * The default value of the timeout unit = {@link TimeUnit#SECONDS}. + */ + public static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.SECONDS; + + private boolean scheduleDeadlineAutomatically; + private int timeout; + private TimeUnit timeoutUnit; + + /** + * Default constructor. + */ + public GrpcClientOptions() { + scheduleDeadlineAutomatically = DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY; + timeout = DEFAULT_TIMEOUT; + timeoutUnit = DEFAULT_TIMEOUT_UNIT; + } + + /** + * Copy constructor. + * + * @param other the options to copy + */ + public GrpcClientOptions(GrpcClientOptions other) { + scheduleDeadlineAutomatically = other.scheduleDeadlineAutomatically; + timeout = other.timeout; + timeoutUnit = other.timeoutUnit; + } + + /** + * @return whether the client will automatically schedule a deadline when a request carrying a timeout is sent. + */ + public boolean getScheduleDeadlineAutomatically() { + return scheduleDeadlineAutomatically; + } + + /** + *

Set whether a deadline is automatically scheduled when a request carrying a timeout (either set explicitly or through this + * options instance) is sent.

+ * + *
    + *
  • When the automatic deadline is set and a request carrying a timeout is sent, a deadline (timer) is created to cancel the request + * when the response has not been timely received. The deadline can be obtained with {@link GrpcClientRequest#deadline()}.
  • + *
  • When the deadline is not set and a request carrying a timeout is sent, the timeout is sent to the server and it remains the + * responsibility of the caller to eventually cancel the request. Note: the server might cancel the request as well when its local deadline is met.
  • + *
+ * + * @param handleDeadlineAutomatically whether to automatically set + * @return a reference to this, so the API can be used fluently + */ + public GrpcClientOptions setScheduleDeadlineAutomatically(boolean handleDeadlineAutomatically) { + this.scheduleDeadlineAutomatically = handleDeadlineAutomatically; + return this; + } + + /** + * Return the default timeout set when sending gRPC requests, the initial value is {@code 0} which does not + * send a timeout. + * + * @return the default timeout. + */ + public int getTimeout() { + return timeout; + } + + /** + * Set the default timeout set when sending gRPC requests, this value should be set along with {@link #setTimeoutUnit(TimeUnit)}. + * + * @param timeout the timeout value + * @return a reference to this, so the API can be used fluently + */ + public GrpcClientOptions setTimeout(int timeout) { + if (timeout < 0L) { + throw new IllegalArgumentException("Timeout value must be >= 0"); + } + this.timeout = timeout; + return this; + } + + /** + * @return the unit of time of the default timeout. + */ + public TimeUnit getTimeoutUnit() { + return timeoutUnit; + } + + /** + * Set the unit of time of the default timeout value. + * + * @param timeoutUnit the unit of time + * @return a reference to this, so the API can be used fluently + */ + public GrpcClientOptions setTimeoutUnit(TimeUnit timeoutUnit) { + this.timeoutUnit = Objects.requireNonNull(timeoutUnit); + return this; + } +} diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientRequest.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientRequest.java index a1e76beb..c5c0e523 100644 --- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientRequest.java +++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/GrpcClientRequest.java @@ -94,13 +94,24 @@ public interface GrpcClientRequest extends GrpcWriteStream { @Override GrpcClientRequest drainHandler(@Nullable Handler handler); + /** + *

Set a {@code grpc-timeout} header to be sent to the server to indicate the client expects a response with + * a timeout.

+ * + *

When the request handle deadline a timer will be set when sending the request to cancel the request when the response + * has not been received in time.

+ * + * @param timeout + * @param unit + * @return + */ @Fluent GrpcClientRequest timeout(long timeout, TimeUnit unit); /** - * Schedule a deadline when sending this request + * @return the request deadline or {@code null} when no deadline has been scheduled */ - Timer scheduleDeadline(); + Timer deadline(); /** * Sets the amount of time after which, if the request does not return any data within the timeout period, diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/VertxClientCall.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/VertxClientCall.java index a106e1ec..de9bc7af 100644 --- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/VertxClientCall.java +++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/VertxClientCall.java @@ -79,12 +79,7 @@ public void start(Listener responseListener, Metadata headers) { if (deadline != null) { long timeout = deadline.timeRemaining(TimeUnit.MILLISECONDS); request.timeout(timeout, TimeUnit.MILLISECONDS); - sf = deadline.runOnExpiration(new Runnable() { - @Override - public void run() { - request.cancel(); - } - }, new VertxScheduledExecutorService(((GrpcClientRequestImpl)request).context())); + sf = deadline.runOnExpiration(() -> request.cancel(), new VertxScheduledExecutorService(((GrpcClientRequestImpl)request).context())); } else { sf = null; } diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientImpl.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientImpl.java index 33dc4380..91ae2062 100644 --- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientImpl.java +++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientImpl.java @@ -19,13 +19,17 @@ import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpVersion; import io.vertx.core.http.RequestOptions; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; import io.vertx.core.net.Address; -import io.vertx.core.net.SocketAddress; import io.vertx.grpc.client.GrpcClient; +import io.vertx.grpc.client.GrpcClientOptions; import io.vertx.grpc.client.GrpcClientRequest; import io.vertx.grpc.common.GrpcMessageDecoder; import io.vertx.grpc.common.GrpcMessageEncoder; +import io.vertx.grpc.common.impl.GrpcRequestLocal; + +import java.util.concurrent.TimeUnit; /** * @author Julien Viet @@ -35,29 +39,38 @@ public class GrpcClientImpl implements GrpcClient { private final Vertx vertx; private HttpClient client; private boolean closeClient; + private final boolean scheduleDeadlineAutomatically; + private final int timeout; + private final TimeUnit timeoutUnit; - public GrpcClientImpl(Vertx vertx, HttpClientOptions options) { - this(vertx, vertx.createHttpClient(new HttpClientOptions(options) - .setProtocolVersion(HttpVersion.HTTP_2)), true); + public GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClientOptions httpOptions) { + this(vertx, grpcOptions, vertx.createHttpClient(new HttpClientOptions(httpOptions).setProtocolVersion(HttpVersion.HTTP_2)), true); } public GrpcClientImpl(Vertx vertx) { - this(vertx, new HttpClientOptions().setHttp2ClearTextUpgrade(false)); + this(vertx, new GrpcClientOptions(), new HttpClientOptions().setHttp2ClearTextUpgrade(false)); } public GrpcClientImpl(Vertx vertx, HttpClient client) { - this(vertx, client, false); + this(vertx, new GrpcClientOptions(), client, false); } - private GrpcClientImpl(Vertx vertx, HttpClient client, boolean close) { + private GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClient client, boolean close) { this.vertx = vertx; this.client = client; + this.scheduleDeadlineAutomatically = grpcOptions.getScheduleDeadlineAutomatically(); + this.timeout = grpcOptions.getTimeout(); + this.timeoutUnit = grpcOptions.getTimeoutUnit(); this.closeClient = close; } private Future> request(RequestOptions options) { return client.request(options) - .map(request -> new GrpcClientRequestImpl<>(request, GrpcMessageEncoder.IDENTITY, GrpcMessageDecoder.IDENTITY)); + .map(httpRequest -> { + GrpcClientRequestImpl grpcRequest = new GrpcClientRequestImpl<>(httpRequest, scheduleDeadlineAutomatically, GrpcMessageEncoder.IDENTITY, GrpcMessageDecoder.IDENTITY); + configureTimeout(grpcRequest); + return grpcRequest; + }); } @Override @@ -70,13 +83,29 @@ public Future> request(Address server) { return request(new RequestOptions().setMethod(HttpMethod.POST).setServer(server)); } + private void configureTimeout(GrpcClientRequest request) { + ContextInternal current = (ContextInternal) vertx.getOrCreateContext(); + GrpcRequestLocal local = current.getLocal(GrpcRequestLocal.CONTEXT_LOCAL_KEY); + long timeout = this.timeout; + TimeUnit timeoutUnit = this.timeoutUnit; + if (local != null) { + timeout = local.deadline - System.currentTimeMillis(); + timeoutUnit = TimeUnit.MILLISECONDS; + if (timeout < 0L) { + throw new UnsupportedOperationException("Handle this case"); + } + } + request.timeout(timeout, timeoutUnit); + } + private Future> request(RequestOptions options, MethodDescriptor service) { GrpcMessageDecoder messageDecoder = GrpcMessageDecoder.unmarshaller(service.getResponseMarshaller()); GrpcMessageEncoder messageEncoder = GrpcMessageEncoder.marshaller(service.getRequestMarshaller()); return client.request(options) .map(request -> { - GrpcClientRequestImpl call = new GrpcClientRequestImpl<>(request, messageEncoder, messageDecoder); + GrpcClientRequestImpl call = new GrpcClientRequestImpl<>(request, scheduleDeadlineAutomatically, messageEncoder, messageDecoder); call.fullMethodName(service.getFullMethodName()); + configureTimeout(call); return call; }); } diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java index f92b0f8d..91442fc7 100644 --- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java +++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientRequestImpl.java @@ -17,6 +17,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClientRequest; +import java.util.EnumMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -42,6 +43,7 @@ public class GrpcClientRequestImpl implements GrpcClientRequest messageEncoder; + private final boolean scheduleDeadline; private ServiceName serviceName; private String methodName; private String encoding = null; @@ -52,12 +54,19 @@ public class GrpcClientRequestImpl implements GrpcClientRequest messageEncoder, GrpcMessageDecoder messageDecoder) { + public GrpcClientRequestImpl(HttpClientRequest httpRequest, + boolean scheduleDeadline, + GrpcMessageEncoder messageEncoder, GrpcMessageDecoder messageDecoder) { this.context = ((PromiseInternal)httpRequest.response()).context(); this.httpRequest = httpRequest; this.messageEncoder = messageEncoder; + this.scheduleDeadline = scheduleDeadline; + this.timeout = 0L; + this.timeoutUnit = null; + this.timeoutHeader = null; this.response = httpRequest .response() .map(httpResponse -> { @@ -145,22 +154,19 @@ public GrpcClientRequest timeout(long timeout, TimeUnit unit) { if (headersSent) { throw new IllegalStateException("Timeout must be set before sending request headers"); } - this.timeoutUnit = Objects.requireNonNull(unit); + String headerValue = toTimeoutHeader(timeout, unit); + if (headerValue == null) { + throw new IllegalArgumentException("Not a valid gRPC timeout value (" + timeout + ',' + unit + ')'); + } this.timeout = timeout; + this.timeoutUnit = unit; + this.timeoutHeader = headerValue; return this; } @Override - public Timer scheduleDeadline() { - if (timeout > 0L && timeoutTimer ==null) { - Timer timer = context.timer(timeout, TimeUnit.MILLISECONDS); - timeoutTimer = timer; - timer.onSuccess(v -> { - cancel(); - }); - return timer; - } - throw new IllegalStateException(); + public Timer deadline() { + return deadline; } @Override @@ -236,7 +242,7 @@ private Future writeMessage(GrpcMessage message, boolean end) { } } if (timeout > 0L) { - httpRequest.putHeader("grpc-timeout", timeoutUnit.toMicros(timeout) + "u"); + httpRequest.putHeader("grpc-timeout", timeoutHeader); } String uri = serviceName.pathOf(methodName); httpRequest.putHeader("content-type", "application/grpc"); @@ -247,6 +253,13 @@ private Future writeMessage(GrpcMessage message, boolean end) { httpRequest.putHeader("te", "trailers"); httpRequest.setChunked(true); httpRequest.setURI(uri); + if (scheduleDeadline && timeout > 0L) { + Timer timer = context.timer(timeout, timeoutUnit); + deadline = timer; + timer.onSuccess(v -> { + cancel(); + }); + } headersSent = true; } if (end) { @@ -258,9 +271,9 @@ private Future writeMessage(GrpcMessage message, boolean end) { } void cancelTimeout() { - Timer timer = timeoutTimer; + Timer timer = deadline; if (timer != null && timer.cancel()) { - timeoutTimer = null; + deadline = null; } } @@ -308,4 +321,49 @@ public void cancel() { public HttpConnection connection() { return httpRequest.connection(); } + + private static final EnumMap GRPC_TIMEOUT_UNIT_SUFFIXES = new EnumMap<>(TimeUnit.class); + + static { + GRPC_TIMEOUT_UNIT_SUFFIXES.put(TimeUnit.NANOSECONDS, 'n'); + GRPC_TIMEOUT_UNIT_SUFFIXES.put(TimeUnit.MICROSECONDS, 'u'); + GRPC_TIMEOUT_UNIT_SUFFIXES.put(TimeUnit.MILLISECONDS, 'm'); + GRPC_TIMEOUT_UNIT_SUFFIXES.put(TimeUnit.SECONDS, 'S'); + GRPC_TIMEOUT_UNIT_SUFFIXES.put(TimeUnit.MINUTES, 'M'); + GRPC_TIMEOUT_UNIT_SUFFIXES.put(TimeUnit.HOURS, 'H'); + } + + private static final TimeUnit[] GRPC_TIMEOUT_UNITS = { + TimeUnit.NANOSECONDS, + TimeUnit.MICROSECONDS, + TimeUnit.MILLISECONDS, + TimeUnit.SECONDS, + TimeUnit.MINUTES, + TimeUnit.HOURS, + }; + + /** + * Compute timeout header, returns {@code null} when the timeout value is not valid. + * + * @param timeout the timeout + * @param timeoutUnit the timeout unit + * @return the grpc-timeout header value, e.g. 1M (1 minute) + */ + static String toTimeoutHeader(long timeout, TimeUnit timeoutUnit) { + for (TimeUnit grpcTimeoutUnit : GRPC_TIMEOUT_UNITS) { + String res = toTimeoutHeader(timeout, timeoutUnit, grpcTimeoutUnit); + if (res != null) { + return res; + } + } + return null; + } + + private static String toTimeoutHeader(long timeout, TimeUnit srcUnit, TimeUnit grpcTimeoutUnit) { + long v = grpcTimeoutUnit.convert(timeout, srcUnit); + if (v < 1_000_000_00) { + return Long.toString(v) + GRPC_TIMEOUT_UNIT_SUFFIXES.get(grpcTimeoutUnit); + } + return null; + } } diff --git a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java index 3096df76..a4e43b74 100644 --- a/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java +++ b/vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientResponseImpl.java @@ -75,9 +75,6 @@ public MultiMap trailers() { } protected void handleEnd() { -// if (grpcContext instanceof Context.CancellableContext) { -// ((Context.CancellableContext)grpcContext).close(); -// } request.cancelTimeout(); String responseStatus = httpResponse.getTrailer("grpc-status"); if (responseStatus != null) { diff --git a/vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientRequestTest.java b/vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientRequestTest.java index f498cbb1..ea293568 100644 --- a/vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientRequestTest.java +++ b/vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientRequestTest.java @@ -21,12 +21,15 @@ import io.grpc.stub.StreamObserver; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.StreamResetException; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.net.SelfSignedCertificate; import io.vertx.core.net.SocketAddress; +import io.vertx.core.spi.context.storage.AccessMode; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.grpc.common.GrpcReadStream; import io.vertx.grpc.common.GrpcStatus; +import io.vertx.grpc.common.impl.GrpcRequestLocal; import org.junit.Test; import java.io.File; @@ -533,12 +536,11 @@ public void sayHello(HelloRequest request, StreamObserver plainRespo @Test public void testTimeoutOnClient(TestContext should) throws Exception { super.testTimeoutOnClient(should); - client = GrpcClient.client(vertx); + client = GrpcClient.client(vertx, new GrpcClientOptions().setScheduleDeadlineAutomatically(true)); client.request(SocketAddress.inetSocketAddress(port, "localhost"), StreamingGrpc.getSinkMethod()) .onComplete(should.asyncAssertSuccess(callRequest -> { callRequest - .timeout(1, TimeUnit.SECONDS) - .scheduleDeadline(); + .timeout(1, TimeUnit.SECONDS); callRequest.write(Item.getDefaultInstance()); callRequest.response().onComplete(should.asyncAssertFailure(err -> { should.assertTrue(err instanceof StreamResetException); @@ -548,17 +550,35 @@ public void testTimeoutOnClient(TestContext should) throws Exception { })); } + @Test + public void testTimeoutOnClientPropagation(TestContext should) throws Exception { + super.testTimeoutOnClient(should); + client = GrpcClient.client(vertx, new GrpcClientOptions().setScheduleDeadlineAutomatically(true)); + ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); + context.runOnContext(v -> { + context.putLocal(GrpcRequestLocal.CONTEXT_LOCAL_KEY, AccessMode.CONCURRENT, new GrpcRequestLocal(System.currentTimeMillis() + 1000)); + client.request(SocketAddress.inetSocketAddress(port, "localhost"), StreamingGrpc.getSinkMethod()) + .onComplete(should.asyncAssertSuccess(callRequest -> { + callRequest.write(Item.getDefaultInstance()); + callRequest.response().onComplete(should.asyncAssertFailure(err -> { + should.assertTrue(err instanceof StreamResetException); + StreamResetException sre = (StreamResetException) err; + should.assertEquals(8L, sre.getCode()); + })); + })); + }); + } + @Test public void testTimeoutPropagationToServer(TestContext should) throws Exception { CompletableFuture cf = new CompletableFuture<>(); super.testTimeoutPropagationToServer(cf); Async done = should.async(); - client = GrpcClient.client(vertx); + client = GrpcClient.client(vertx, new GrpcClientOptions().setScheduleDeadlineAutomatically(true)); client.request(SocketAddress.inetSocketAddress(port, "localhost"), GreeterGrpc.getSayHelloMethod()) .onComplete(should.asyncAssertSuccess(callRequest -> { callRequest - .timeout(10, TimeUnit.SECONDS) - .scheduleDeadline(); + .timeout(10, TimeUnit.SECONDS); callRequest.end(HelloRequest.newBuilder().setName("Julien").build()); callRequest.response().onComplete(should.asyncAssertSuccess(e -> { long timeRemaining = cf.getNow(-1L); diff --git a/vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientTest.java b/vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientTest.java index 6752dbcb..942f32ac 100644 --- a/vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientTest.java +++ b/vertx-grpc-client/src/test/java/io/vertx/grpc/client/ClientTest.java @@ -19,6 +19,7 @@ import io.grpc.examples.streaming.StreamingGrpc; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; import io.vertx.core.http.StreamResetException; import io.vertx.core.net.SocketAddress; diff --git a/vertx-grpc-client/src/test/java/io/vertx/grpc/client/impl/TimeoutValueTest.java b/vertx-grpc-client/src/test/java/io/vertx/grpc/client/impl/TimeoutValueTest.java new file mode 100644 index 00000000..3cd98199 --- /dev/null +++ b/vertx-grpc-client/src/test/java/io/vertx/grpc/client/impl/TimeoutValueTest.java @@ -0,0 +1,28 @@ +package io.vertx.grpc.client.impl; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class TimeoutValueTest { + + private static final long MAX = 99_999_999; + + @Test + public void testValue() { + assertEquals(MAX + "n", GrpcClientRequestImpl.toTimeoutHeader(MAX, TimeUnit.NANOSECONDS)); + assertEquals((MAX + 1) / 1000 + "u", GrpcClientRequestImpl.toTimeoutHeader(MAX + 1, TimeUnit.NANOSECONDS)); + assertEquals(MAX + "u", GrpcClientRequestImpl.toTimeoutHeader(MAX, TimeUnit.MICROSECONDS)); + assertEquals((MAX + 1) / 1000 + "m", GrpcClientRequestImpl.toTimeoutHeader(MAX + 1, TimeUnit.MICROSECONDS)); + assertEquals(MAX + "m", GrpcClientRequestImpl.toTimeoutHeader(MAX, TimeUnit.MILLISECONDS)); + assertEquals((MAX + 1) / 1000 + "S", GrpcClientRequestImpl.toTimeoutHeader(MAX + 1, TimeUnit.MILLISECONDS)); + assertEquals(MAX + "S", GrpcClientRequestImpl.toTimeoutHeader(MAX, TimeUnit.SECONDS)); + assertEquals((MAX + 1) / 60 + "M", GrpcClientRequestImpl.toTimeoutHeader(MAX + 1, TimeUnit.SECONDS)); + assertEquals(MAX + "M", GrpcClientRequestImpl.toTimeoutHeader(MAX, TimeUnit.MINUTES)); + assertEquals((MAX + 1) / 60 + "H", GrpcClientRequestImpl.toTimeoutHeader(MAX + 1, TimeUnit.MINUTES)); + assertEquals(MAX + "H", GrpcClientRequestImpl.toTimeoutHeader(MAX, TimeUnit.HOURS)); + assertEquals(null, GrpcClientRequestImpl.toTimeoutHeader(MAX + 1, TimeUnit.HOURS)); + } +} diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcReadStreamBase.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcReadStreamBase.java index f4a92378..28bb9fc8 100644 --- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcReadStreamBase.java +++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcReadStreamBase.java @@ -213,7 +213,6 @@ public Future last() { @Override public Future end() { - // SHOULD BE MAPPED ON GRPC CONTEXT TOO ???? return end.future(); } } diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcRequestLocal.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcRequestLocal.java new file mode 100644 index 00000000..e786d896 --- /dev/null +++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcRequestLocal.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.grpc.common.impl; + +import io.vertx.core.spi.context.storage.ContextLocal; + +/** + * Request local for deadline propagation. + */ +public class GrpcRequestLocal { + + /** + * Context local key. + */ + public static final ContextLocal CONTEXT_LOCAL_KEY = GrpcRequestLocalRegistration.CONTEXT_LOCAL; + + public final long deadline; + + public GrpcRequestLocal(long deadline) { + this.deadline = deadline; + } +} diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcRequestLocalRegistration.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcRequestLocalRegistration.java new file mode 100644 index 00000000..4628eecc --- /dev/null +++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/GrpcRequestLocalRegistration.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.grpc.common.impl; + +import io.vertx.core.impl.VertxBuilder; +import io.vertx.core.spi.VertxServiceProvider; +import io.vertx.core.spi.context.storage.ContextLocal; + +/** + * Registration of context local for {@link GrpcRequestLocal}. + */ +public class GrpcRequestLocalRegistration implements VertxServiceProvider { + + static final ContextLocal CONTEXT_LOCAL = ContextLocal.registerLocal(GrpcRequestLocal.class); + + @Override + public void init(VertxBuilder builder) { + } +} diff --git a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/VertxScheduledExecutorService.java b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/VertxScheduledExecutorService.java index b558c86e..5a2e7197 100644 --- a/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/VertxScheduledExecutorService.java +++ b/vertx-grpc-common/src/main/java/io/vertx/grpc/common/impl/VertxScheduledExecutorService.java @@ -1,3 +1,13 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ package io.vertx.grpc.common.impl; import io.netty.channel.EventLoop; @@ -6,6 +16,9 @@ import java.util.List; import java.util.concurrent.*; +/** + * Minimalistic scheduler for gRPC deadlines. + */ public class VertxScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService { private final io.vertx.core.impl.ContextInternal vertxContext; @@ -16,7 +29,6 @@ public VertxScheduledExecutorService(io.vertx.core.Context vertxContext) { @Override public void shutdown() { - } @Override diff --git a/vertx-grpc-common/src/main/resources/META-INF/services/io.vertx.core.spi.VertxServiceProvider b/vertx-grpc-common/src/main/resources/META-INF/services/io.vertx.core.spi.VertxServiceProvider new file mode 100644 index 00000000..705b8b68 --- /dev/null +++ b/vertx-grpc-common/src/main/resources/META-INF/services/io.vertx.core.spi.VertxServiceProvider @@ -0,0 +1 @@ +io.vertx.grpc.common.impl.GrpcRequestLocalRegistration diff --git a/vertx-grpc-context-storage/pom.xml b/vertx-grpc-context-storage/pom.xml index 0898b9be..2999419b 100644 --- a/vertx-grpc-context-storage/pom.xml +++ b/vertx-grpc-context-storage/pom.xml @@ -42,7 +42,7 @@ io.vertx vertx-grpc-server - + test io.grpc diff --git a/vertx-grpc-server/src/main/asciidoc/server.adoc b/vertx-grpc-server/src/main/asciidoc/server.adoc index da076be2..03c316fc 100644 --- a/vertx-grpc-server/src/main/asciidoc/server.adoc +++ b/vertx-grpc-server/src/main/asciidoc/server.adoc @@ -127,6 +127,34 @@ You can check the writability of a response and set a drain handler {@link examples.GrpcServerExamples#responseFlowControl} ---- +=== Timeout and deadlines + +The gRPC server handles timeout and deadlines. + +Whenever the service receives a request indicating a timeout, the timeout can be retrieved. + +[source,java] +---- +{@link examples.GrpcServerExamples#checkTimeout} +---- + +By default, the server + +- does not schedule automatically a deadline for a given request +- does not automatically propagate the deadline to a vertx client + +The server can schedule deadlines: when a request carries a timeout, the server schedules +locally a timer to cancel the request when the response has not been sent in time. + +The server can propagate deadlines: when a request carries a timeout, the server calculate the deadline +and associate the current server request with this deadline. Vert.x gRPC client can use this deadline to compute +a timeout to be sent and cascade the timeout to another gRPC server. + +[source,java] +---- +{@link examples.GrpcServerExamples#deadlineConfiguration} +---- + === Compression You can compress response messages by setting the response encoding *prior* before sending any message @@ -153,6 +181,8 @@ The Vert.x gRPC Server can bridge a gRPC service to use with a generated server {@link examples.GrpcServerExamples#stubExample} ---- +The bridge supports deadline automatic cancellation: when a gRPC request carrying a timeout is received, a deadline is associated with the `io.grpc.Context` an can be obtained from the current context. This deadline automatically cancels the request in progress when its associated timeout fires. + === Message level API The server provides a message level API to interact directly with protobuf encoded gRPC messages. diff --git a/vertx-grpc-server/src/main/generated/io/vertx/grpc/server/GrpcServerOptionsConverter.java b/vertx-grpc-server/src/main/generated/io/vertx/grpc/server/GrpcServerOptionsConverter.java index 8db0c974..3b67a8b0 100644 --- a/vertx-grpc-server/src/main/generated/io/vertx/grpc/server/GrpcServerOptionsConverter.java +++ b/vertx-grpc-server/src/main/generated/io/vertx/grpc/server/GrpcServerOptionsConverter.java @@ -25,6 +25,16 @@ static void fromJson(Iterable> json, GrpcSer obj.setGrpcWebEnabled((Boolean)member.getValue()); } break; + case "scheduleDeadlineAutomatically": + if (member.getValue() instanceof Boolean) { + obj.setScheduleDeadlineAutomatically((Boolean)member.getValue()); + } + break; + case "deadlinePropagation": + if (member.getValue() instanceof Boolean) { + obj.setDeadlinePropagation((Boolean)member.getValue()); + } + break; } } } @@ -35,5 +45,7 @@ static void toJson(GrpcServerOptions obj, JsonObject json) { static void toJson(GrpcServerOptions obj, java.util.Map json) { json.put("grpcWebEnabled", obj.isGrpcWebEnabled()); + json.put("scheduleDeadlineAutomatically", obj.getScheduleDeadlineAutomatically()); + json.put("deadlinePropagation", obj.getDeadlinePropagation()); } } diff --git a/vertx-grpc-server/src/main/java/examples/GrpcServerExamples.java b/vertx-grpc-server/src/main/java/examples/GrpcServerExamples.java index 821bffed..1d491e0c 100644 --- a/vertx-grpc-server/src/main/java/examples/GrpcServerExamples.java +++ b/vertx-grpc-server/src/main/java/examples/GrpcServerExamples.java @@ -11,10 +11,7 @@ import io.vertx.grpc.common.GrpcMessage; import io.vertx.grpc.common.GrpcStatus; import io.vertx.grpc.common.ServiceName; -import io.vertx.grpc.server.GrpcServer; -import io.vertx.grpc.server.GrpcServerRequest; -import io.vertx.grpc.server.GrpcServerResponse; -import io.vertx.grpc.server.GrpcServiceBridge; +import io.vertx.grpc.server.*; @Source public class GrpcServerExamples { @@ -116,6 +113,22 @@ public void responseFlowControl(GrpcServerResponse response, Item i } } + public void checkTimeout(GrpcServerRequest request) { + + long timeout = request.timeout(); + + if (timeout > 0L) { + // A timeout has been received + } + } + + public void deadlineConfiguration(Vertx vertx) { + GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions() + .setScheduleDeadlineAutomatically(true) + .setDeadlinePropagation(true) + ); + } + public void responseCompression(GrpcServerResponse response) { response.encoding("gzip"); diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java index 2a45776a..2830c365 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerOptions.java @@ -24,17 +24,31 @@ public class GrpcServerOptions { /** - * Whether the gRPC-Web protocol should be enabled, by default = true. + * Whether the gRPC-Web protocol should be enabled, by default = {@code true}. */ public static final boolean DEFAULT_GRPC_WEB_ENABLED = true; + /** + * Whether the server schedule deadline automatically when a request carrying a timeout is received, by default = {@code false} + */ + public static final boolean DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY = false; + + /** + * Whether the server propagates a deadline, by default = {@code false} + */ + public static final boolean DEFAULT_PROPAGATE_DEADLINE = false; + private boolean grpcWebEnabled; + private boolean scheduleDeadlineAutomatically; + private boolean deadlinePropagation; /** * Default options. */ public GrpcServerOptions() { grpcWebEnabled = DEFAULT_GRPC_WEB_ENABLED; + scheduleDeadlineAutomatically = DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY; + deadlinePropagation = DEFAULT_PROPAGATE_DEADLINE; } /** @@ -42,6 +56,8 @@ public GrpcServerOptions() { */ public GrpcServerOptions(GrpcServerOptions other) { grpcWebEnabled = other.grpcWebEnabled; + scheduleDeadlineAutomatically = other.scheduleDeadlineAutomatically; + deadlinePropagation = other.deadlinePropagation; } /** @@ -70,6 +86,50 @@ public GrpcServerOptions setGrpcWebEnabled(boolean grpcWebEnabled) { return this; } + /** + * @return whether the server will automatically schedule a deadline when a request carrying a timeout is received. + */ + public boolean getScheduleDeadlineAutomatically() { + return scheduleDeadlineAutomatically; + } + + /** + *

Set whether a deadline is automatically scheduled when a request carrying a timeout is received.

+ *
    + *
  • When a deadline is automatically scheduled and a request carrying a timeout is received, a deadline (timer) + * will be created to cancel the request when the response has not been timely sent. The deadline can be obtained + * with {@link GrpcServerRequest#deadline()}.
  • + *
  • When the deadline is not set and a request carrying a timeout is received, the timeout is available with {@link GrpcServerRequest#timeout()} + * and it is the responsibility of the service to eventually cancel the request. Note: the client might cancel the request as well when its local + * deadline is met.
  • + *
+ * + * @param scheduleDeadlineAutomatically whether to schedule a deadline automatically + * @return a reference to this, so the API can be used fluently + */ + public GrpcServerOptions setScheduleDeadlineAutomatically(boolean scheduleDeadlineAutomatically) { + this.scheduleDeadlineAutomatically = scheduleDeadlineAutomatically; + return this; + } + + /** + * @return whether the server propagate deadlines to {@code io.vertx.grpc.client.GrpcClientRequest}. + */ + public boolean getDeadlinePropagation() { + return deadlinePropagation; + } + + /** + * Set whether the server propagate deadlines to {@code io.vertx.grpc.client.GrpcClientRequest}. + * + * @param deadlinePropagation the propagation setting + * @return a reference to this, so the API can be used fluently + */ + public GrpcServerOptions setDeadlinePropagation(boolean deadlinePropagation) { + this.deadlinePropagation = deadlinePropagation; + return this; + } + /** * @return a JSON representation of options */ @@ -81,8 +141,6 @@ public JsonObject toJson() { @Override public String toString() { - return "GrpcServerOptions{" + - "grpcWebEnabled=" + grpcWebEnabled + - '}'; + return toJson().encode(); } } diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerRequest.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerRequest.java index 5c9fcb04..a93307f1 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerRequest.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/GrpcServerRequest.java @@ -22,6 +22,8 @@ import io.vertx.grpc.common.GrpcReadStream; import io.vertx.grpc.common.ServiceName; +import java.time.Instant; + @VertxGen public interface GrpcServerRequest extends GrpcReadStream { @@ -86,8 +88,8 @@ public interface GrpcServerRequest extends GrpcReadStream { long timeout(); /** - * Schedule a deadline based on the current request timeout. + * @return the request deadline or {@code null} when no deadline has been scheduled */ - Timer scheduleDeadline(); + Timer deadline(); } diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java index 2f78f9f6..b7d4da21 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerImpl.java @@ -10,7 +10,6 @@ */ package io.vertx.grpc.server.impl; -import io.grpc.Context; import io.grpc.MethodDescriptor; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -21,20 +20,18 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.core.spi.context.storage.AccessMode; import io.vertx.grpc.common.GrpcMediaType; import io.vertx.grpc.common.GrpcMessageDecoder; import io.vertx.grpc.common.GrpcMessageEncoder; +import io.vertx.grpc.common.impl.GrpcRequestLocal; import io.vertx.grpc.common.impl.GrpcMethodCall; -import io.vertx.grpc.common.impl.VertxScheduledExecutorService; import io.vertx.grpc.server.GrpcServer; import io.vertx.grpc.server.GrpcServerOptions; import io.vertx.grpc.server.GrpcServerRequest; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.Objects; import static io.vertx.core.http.HttpHeaders.CONTENT_TYPE; @@ -44,21 +41,6 @@ */ public class GrpcServerImpl implements GrpcServer { - private static final Pattern TIMEOUT_PATTERN = Pattern.compile("([0-9]{1,8})([HMSmun])"); - - private static final Map TIMEOUT_MAPPING; - - static { - Map timeoutMapping = new HashMap<>(); - timeoutMapping.put("H", TimeUnit.HOURS); - timeoutMapping.put("M", TimeUnit.MINUTES); - timeoutMapping.put("S", TimeUnit.SECONDS); - timeoutMapping.put("m", TimeUnit.MILLISECONDS); - timeoutMapping.put("u", TimeUnit.MICROSECONDS); - timeoutMapping.put("n", TimeUnit.NANOSECONDS); - TIMEOUT_MAPPING = timeoutMapping; - } - private static final Logger log = LoggerFactory.getLogger(GrpcServer.class); private final GrpcServerOptions options; @@ -115,22 +97,16 @@ private void handle(HttpServerRequest httpRequest, GrpcMessageEncoder messageEncoder, Handler> handler) { io.vertx.core.impl.ContextInternal context = (ContextInternal) ((HttpServerRequestInternal) httpRequest).context(); - GrpcServerRequestImpl grpcRequest = new GrpcServerRequestImpl<>(context, httpRequest, messageDecoder, messageEncoder, methodCall); + GrpcServerRequestImpl grpcRequest = new GrpcServerRequestImpl<>(context, options.getScheduleDeadlineAutomatically(), + httpRequest, messageDecoder, messageEncoder, methodCall); + if (options.getDeadlinePropagation() && grpcRequest.timeout() > 0L) { + long deadline = System.currentTimeMillis() + grpcRequest.timeout; + context.putLocal(GrpcRequestLocal.CONTEXT_LOCAL_KEY, AccessMode.CONCURRENT, new GrpcRequestLocal(deadline)); + } grpcRequest.init(); context.dispatch(grpcRequest, handler); } - private static long parseTimeout(String timeout) { - Matcher matcher = TIMEOUT_PATTERN.matcher(timeout); - if (matcher.matches()) { - long value = Long.parseLong(matcher.group(1)); - TimeUnit unit = TIMEOUT_MAPPING.get(matcher.group(2)); - return unit.toMillis(value); - } else { - return 0L; - } - } - public GrpcServer callHandler(Handler> handler) { this.requestHandler = handler; return this; diff --git a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java index 6fafde5e..51a3572f 100644 --- a/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java +++ b/vertx-grpc-server/src/main/java/io/vertx/grpc/server/impl/GrpcServerRequestImpl.java @@ -10,11 +10,9 @@ */ package io.vertx.grpc.server.impl; -import io.grpc.Context; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.base64.Base64; -import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.Timer; @@ -23,14 +21,13 @@ import io.vertx.core.http.HttpConnection; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpVersion; -import io.vertx.core.http.impl.HttpServerRequestInternal; import io.vertx.grpc.common.*; import io.vertx.grpc.common.impl.GrpcReadStreamBase; import io.vertx.grpc.common.impl.GrpcMethodCall; -import io.vertx.grpc.common.impl.GrpcReadStreamBase; import io.vertx.grpc.server.GrpcServerRequest; import io.vertx.grpc.server.GrpcServerResponse; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -77,12 +74,15 @@ private static long parseTimeout(String timeout) { final HttpServerRequest httpRequest; final GrpcServerResponseImpl response; final long timeout; + final boolean scheduleDeadline; private final GrpcMethodCall methodCall; private BufferInternal grpcWebTextBuffer; - private Timer timeoutTimer; - private boolean useTimeout; + private Timer deadline; - public GrpcServerRequestImpl(io.vertx.core.impl.ContextInternal context, HttpServerRequest httpRequest, GrpcMessageDecoder messageDecoder, GrpcMessageEncoder messageEncoder, GrpcMethodCall methodCall) { + public GrpcServerRequestImpl(io.vertx.core.impl.ContextInternal context, + boolean scheduleDeadline, + HttpServerRequest httpRequest, GrpcMessageDecoder messageDecoder, + GrpcMessageEncoder messageEncoder, GrpcMethodCall methodCall) { super(context, httpRequest, httpRequest.headers().get("grpc-encoding"), messageDecoder); String timeoutHeader = httpRequest.getHeader("grpc-timeout"); long timeout = timeoutHeader != null ? parseTimeout(timeoutHeader) : 0L; @@ -104,6 +104,7 @@ public GrpcServerRequestImpl(io.vertx.core.impl.ContextInternal context, HttpSer this.httpRequest = httpRequest; this.response = response; this.methodCall = methodCall; + this.scheduleDeadline = scheduleDeadline; if (httpRequest.version() != HttpVersion.HTTP_2 && GrpcMediaType.isGrpcWebText(httpRequest.getHeader(CONTENT_TYPE))) { grpcWebTextBuffer = EMPTY_BUFFER; } else { @@ -111,10 +112,24 @@ public GrpcServerRequestImpl(io.vertx.core.impl.ContextInternal context, HttpSer } } + @Override + public void init() { + super.init(); + if (timeout > 0L) { + if (scheduleDeadline) { + Timer timer = context.timer(timeout, TimeUnit.MILLISECONDS); + deadline = timer; + timer.onSuccess(v -> { + response.handleTimeout(); + }); + } + } + } + void cancelTimeout() { - Timer timer = timeoutTimer; + Timer timer = deadline; if (timer != null) { - timeoutTimer = null; + deadline = null; timer.cancel(); } } @@ -176,16 +191,8 @@ public long timeout() { } @Override - public Timer scheduleDeadline() { - if (timeout > 0L && timeoutTimer ==null) { - Timer timer = context.timer(timeout, TimeUnit.MILLISECONDS); - timeoutTimer = timer; - timer.onSuccess(v -> { - response.handleTimeout(); - }); - return timer; - } - throw new IllegalStateException(); + public Timer deadline() { + return deadline; } @Override diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerBridgeTest.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerBridgeTest.java index fe520e1f..118e7ce2 100644 --- a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerBridgeTest.java +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerBridgeTest.java @@ -21,17 +21,10 @@ import io.grpc.protobuf.StatusProto; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; -import io.vertx.core.http.*; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; -import io.vertx.grpc.common.GrpcMessage; -import io.vertx.grpc.common.GrpcMessageEncoder; -import io.vertx.grpc.common.GrpcStatus; -import io.vertx.grpc.common.impl.GrpcMessageImpl; -import org.junit.Ignore; import org.junit.Test; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -433,47 +426,6 @@ public void sayHello(HelloRequest request, StreamObserver responseOb super.testTimeoutOnServerBeforeSendingResponse(should); } - @Ignore - @Test - public void testDeadline(TestContext should) { - - GreeterGrpc.GreeterImplBase impl = new GreeterGrpc.GreeterImplBase() { - @Override - public void sayHello(HelloRequest request, StreamObserver responseObserver) { - System.out.println("HELLO"); - } - }; - - GrpcServer server = GrpcServer.server(vertx); - GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(impl); - serverStub.bind(server); - startServer(server); - - HttpClient client = vertx.createHttpClient(new HttpClientOptions() - .setHttp2ClearTextUpgrade(false) - .setProtocolVersion(HttpVersion.HTTP_2)); - Async async = should.async(); - client.request(HttpMethod.POST, port, "localhost", "/helloworld.Greeter/SayHello") - .onComplete(should.asyncAssertSuccess(req -> { - req.putHeader(HttpHeaders.CONTENT_TYPE, "application/grpc"); - req.putHeader("grpc-timeout", TimeUnit.SECONDS.toMillis(1) + "m"); - GrpcMessageEncoder encoder = GrpcMessageEncoder.marshaller(GreeterGrpc.getSayHelloMethod().getRequestMarshaller()); - GrpcMessage msg = encoder.encode(HelloRequest.newBuilder().setName("test").build()); - req.end(GrpcMessageImpl.encode(msg)); -// req.response().onComplete(should.asyncAssertSuccess(resp -> { -// resp.endHandler(v -> { -// String status = resp.getTrailer("grpc-status"); -// should.assertEquals(String.valueOf(GrpcStatus.OK.code), status); -// req.exceptionHandler(err -> { -// async.complete(); -// }); -// }); -// })); - })); - - async.awaitSuccess(); - } - @Test public void testCallAttributes(TestContext should) { diff --git a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerRequestTest.java b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerRequestTest.java index 7591e43f..43b9e923 100644 --- a/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerRequestTest.java +++ b/vertx-grpc-server/src/test/java/io/vertx/grpc/server/ServerRequestTest.java @@ -10,7 +10,6 @@ */ package io.vertx.grpc.server; -import com.google.protobuf.Descriptors; import io.grpc.*; import io.grpc.examples.helloworld.GreeterGrpc; import io.grpc.examples.helloworld.HelloReply; @@ -21,14 +20,15 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.StreamObserver; import io.vertx.core.MultiMap; -import io.vertx.core.buffer.Buffer; +import io.vertx.core.Timer; import io.vertx.core.http.*; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.net.SelfSignedCertificate; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.grpc.common.GrpcError; -import io.vertx.grpc.common.GrpcMessageEncoder; import io.vertx.grpc.common.GrpcStatus; +import io.vertx.grpc.common.impl.GrpcRequestLocal; import org.junit.Test; import java.io.File; @@ -359,9 +359,11 @@ public void testTimeoutPropagation(TestContext should) { @Test public void testTimeoutOnServerBeforeSendingResponse(TestContext should) throws Exception { Async async = should.async(); - startServer(GrpcServer.server(vertx).callHandler(GreeterGrpc.getSayHelloMethod(), call -> { + startServer(GrpcServer.server(vertx, new GrpcServerOptions().setScheduleDeadlineAutomatically(true)).callHandler(GreeterGrpc.getSayHelloMethod(), call -> { should.assertTrue(call.timeout() > 0L); - call.scheduleDeadline(); + Timer deadline = call.deadline(); + should.assertNotNull(deadline); + should.assertTrue(deadline.getDelay(TimeUnit.MILLISECONDS) > 0L); GrpcServerResponse response = call.response(); async.complete(); })); @@ -371,9 +373,8 @@ public void testTimeoutOnServerBeforeSendingResponse(TestContext should) throws @Test public void testTimeoutOnServerAfterSendingResponse(TestContext should) throws Exception { - startServer(GrpcServer.server(vertx).callHandler(GreeterGrpc.getSayHelloMethod(), call -> { + startServer(GrpcServer.server(vertx, new GrpcServerOptions().setScheduleDeadlineAutomatically(true)).callHandler(GreeterGrpc.getSayHelloMethod(), call -> { GrpcServerResponse response = call.response(); - call.scheduleDeadline(); response.end(); })); @@ -396,4 +397,32 @@ public void testTimeoutOnServerAfterSendingResponse(TestContext should) throws E async.awaitSuccess(); } + + @Test + public void testTimeoutPropagationOnServer(TestContext should) throws Exception { + startServer(GrpcServer.server(vertx, new GrpcServerOptions().setDeadlinePropagation(true)).callHandler(GreeterGrpc.getSayHelloMethod(), call -> { + GrpcServerResponse response = call.response(); + GrpcRequestLocal local = ((ContextInternal)vertx.getOrCreateContext()).getLocal(GrpcRequestLocal.CONTEXT_LOCAL_KEY); + should.assertNotNull(local); + should.assertTrue(local.deadline - System.currentTimeMillis() > 8000); + response.end(); + })); + + HttpClient client = vertx.createHttpClient(new HttpClientOptions() + .setHttp2ClearTextUpgrade(false) + .setProtocolVersion(HttpVersion.HTTP_2)); + Async async = should.async(); + client.request(HttpMethod.POST, port, "localhost", "/helloworld.Greeter/SayHello") + .onComplete(should.asyncAssertSuccess(req -> { + req.putHeader("grpc-timeout", "10S"); + req.response().onComplete(should.asyncAssertSuccess(resp -> { + resp.endHandler(v -> { + async.complete(); + }); + })); + req.sendHead(); + })); + + async.awaitSuccess(); + } }