Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout and deadline #78

Merged
merged 3 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions vertx-grpc-client/src/main/asciidoc/client.adoc
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
== Vert.x gRPC Client

Vert.x gRPC Client is a new gRPC client powered by Vert.x HTTP client superseding the integrated Netty based gRPC client.
Vert.x gRPC Client is a gRPC client powered by Vert.x HTTP client.

This client provides a gRPC request/response oriented API as well as a the generated stub approach with a gRPC Channel
This client provides a gRPC request/response oriented API as well as a generated stub approach with a gRPC Channel

=== Using Vert.x gRPC Client

Expand Down Expand Up @@ -41,7 +41,7 @@ You can easily create the gRPC client

==== Request/response

Any interaction with a gRPC server involves creating a request to the remote gRPC service
Interacting with a gRPC server involves creating a request to the remote gRPC service

[source,java]
----
Expand All @@ -65,7 +65,7 @@ Future composition can combine all the previous steps together in a compact fash

==== Streaming request

A streaming request involves calling `{@link io.vertx.grpc.client.GrpcClientRequest#write}` for each element of the stream
Streaming requests involve calling `{@link io.vertx.grpc.client.GrpcClientRequest#write}` for each element of the stream
and using `{@link io.vertx.grpc.client.GrpcClientRequest#end()}` to end the stream

[source,java]
Expand All @@ -75,7 +75,7 @@ and using `{@link io.vertx.grpc.client.GrpcClientRequest#end()}` to end the stre

==== Streaming response

You can set handlers to process response events
You can set handlers to process response events of a streaming response

[source,java]
----
Expand Down Expand Up @@ -104,6 +104,26 @@ You can pause/resume/fetch a response
{@link examples.GrpcClientExamples#responseFlowControl}
----

=== Timeout and deadlines

The gRPC client handles timeout and deadlines, setting a timeout on a gRPC request instructs the client to send the timeout
information to make the server aware that the client desires a response within a defined time.

In addition, the client shall be configured to schedule a deadline: when a timeout is set on a request, the client schedules
locally a timer to cancel the request when the response has not been received in time.

[source,java]
----
{@link examples.GrpcClientExamples#requestWithDeadline}
----

The timeout can also be set on a per-request basis.

[source,java]
----
{@link examples.GrpcClientExamples#requestWithDeadline2}
----

=== Cancellation

You can call `{@link io.vertx.grpc.client.GrpcClientRequest#cancel}` to cancel a request
Expand All @@ -126,17 +146,28 @@ You can compress request messages by setting the request encoding *prior* before

=== Decompression

Decompression is done transparently by the client when the server send encoded responses.
Decompression is achieved transparently by the client when the server sends encoded responses.

=== Stub API

The Vert.x gRPC Client provides a gRPC channel to use with a generated client stub in a more traditional fashion

[source,java]
----
{@link examples.GrpcClientExamples#stubExample}
{@link examples.GrpcClientExamples#stub}
----

Timeout and deadlines are supported through the usual gRPC API.

[source,java]
----
{@link examples.GrpcClientExamples#stubWithDeadline}
----

Deadline are cascaded, e.g. when the current `io.grpc.Context` carries a deadline and the stub has no explicit deadline
set, the client automatically inherits the implicit deadline. Such deadline can be set when using a stub within a gRPC server
call.

=== Message level API

The client provides a message level API to interact directly with protobuf encoded gRPC messages.
Expand Down
46 changes: 40 additions & 6 deletions vertx-grpc-client/src/main/java/examples/GrpcClientExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.SocketAddress;
import io.vertx.docgen.Source;
import io.vertx.grpc.client.GrpcClient;
import io.vertx.grpc.client.GrpcClientChannel;
import io.vertx.grpc.client.GrpcClientRequest;
import io.vertx.grpc.client.GrpcClientResponse;
import io.vertx.grpc.client.*;
import io.vertx.grpc.common.GrpcMessage;
import io.vertx.grpc.common.ServiceName;

import java.util.concurrent.TimeUnit;

@Source
public class GrpcClientExamples {

Expand Down Expand Up @@ -122,26 +121,61 @@ public void requestCompression(GrpcClientRequest<Item, Empty> request) {
request.write(Item.newBuilder().setValue("item-3").build());
}

public void stubExample(GrpcClient client) {
public void stub(GrpcClient client) {

GrpcClientChannel channel = new GrpcClientChannel(client, SocketAddress.inetSocketAddress(443, "example.com"));

GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel);

greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), new StreamObserver<HelloReply>() {
StreamObserver<HelloReply> observer = new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply value) {
// Process response
}

@Override
public void onCompleted() {
// Done
}

@Override
public void onError(Throwable t) {
// Something went bad
}
};

greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), observer);
}

public void stubWithDeadline(GrpcClientChannel channel, StreamObserver<HelloReply> observer) {

GreeterGrpc.GreeterStub greeter = GreeterGrpc.newStub(channel).withDeadlineAfter(10, TimeUnit.SECONDS);

greeter.sayHello(HelloRequest.newBuilder().setName("Bob").build(), observer);
}

public void requestWithDeadline(Vertx vertx) {

// Set a 10 seconds timeout that will be sent to the gRPC service
// Let the client schedule a deadline
GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions()
.setTimeout(10)
.setTimeoutUnit(TimeUnit.SECONDS)
.setScheduleDeadlineAutomatically(true));
}

public void requestWithDeadline2(GrpcClient client, SocketAddress server, MethodDescriptor<HelloRequest, HelloReply> sayHelloMethod) {

Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, sayHelloMethod);
fut.onSuccess(request -> {

request
// Given this request, set a 10 seconds timeout that will be sent to the gRPC service
.timeout(10, TimeUnit.SECONDS);

request.end(HelloRequest.newBuilder().setName("Bob").build());
});

}

public void protobufLevelAPI(GrpcClient client, Buffer protoHello, SocketAddress server) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,28 @@ static GrpcClient client(Vertx vertx) {
return new GrpcClientImpl(vertx);
}

/**
* Create a client.
*
* @param vertx the vertx instance
* @return the created client
*/
static GrpcClient client(Vertx vertx, GrpcClientOptions options) {
return new GrpcClientImpl(vertx, options, new HttpClientOptions().setHttp2ClearTextUpgrade(false));
}

/**
* Create a client with the specified {@code options}.
*
* @param vertx the vertx instance
* @param grpcOptions the http client options
* @param httpOptions the http client options
* @return the created client
*/
static GrpcClient client(Vertx vertx, GrpcClientOptions grpcOptions, HttpClientOptions httpOptions) {
return new GrpcClientImpl(vertx, grpcOptions, httpOptions);
}

/**
* Create a client with the specified {@code options}.
*
Expand All @@ -60,7 +82,7 @@ static GrpcClient client(Vertx vertx) {
* @return the created client
*/
static GrpcClient client(Vertx vertx, HttpClientOptions options) {
return new GrpcClientImpl(vertx, options);
return new GrpcClientImpl(vertx, new GrpcClientOptions(), options);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@
*/
package io.vertx.grpc.client;

import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.MethodDescriptor;
import io.grpc.*;
import io.vertx.core.net.SocketAddress;

import java.util.concurrent.Executor;
Expand All @@ -34,25 +30,25 @@ public GrpcClientChannel(GrpcClient client, SocketAddress server) {

@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {

String encoding = callOptions.getCompressor();

Compressor compressor;
if (encoding != null) {
compressor = CompressorRegistry.getDefaultInstance().lookupCompressor(encoding);
} else {
compressor = null;
}


Executor exec = callOptions.getExecutor();

return new VertxClientCall<>(client, server, exec, methodDescriptor, encoding, compressor);
Context ctx = Context.current();
Deadline deadline = callOptions.getDeadline();
Deadline contextDeadline = ctx.getDeadline();
if (contextDeadline != null && (deadline == null || contextDeadline.isBefore(deadline))) {
deadline = contextDeadline;
}
return new VertxClientCall<>(client, server, exec, methodDescriptor, encoding, compressor, deadline);
}

@Override
public String authority() {
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.client;

import io.vertx.codegen.annotations.DataObject;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Options configuring a gRPC client.
*/
@DataObject
public class GrpcClientOptions {

/**
* The default value for automatic deadline schedule = {@code false}.
*/
public static final boolean DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY = false;

/**
* The default value of the timeout = {@code 0} (no timeout).
*/
public static final int DEFAULT_TIMEOUT = 0;

/**
* The default value of the timeout unit = {@link TimeUnit#SECONDS}.
*/
public static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.SECONDS;

private boolean scheduleDeadlineAutomatically;
private int timeout;
private TimeUnit timeoutUnit;

/**
* Default constructor.
*/
public GrpcClientOptions() {
scheduleDeadlineAutomatically = DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY;
timeout = DEFAULT_TIMEOUT;
timeoutUnit = DEFAULT_TIMEOUT_UNIT;
}

/**
* Copy constructor.
*
* @param other the options to copy
*/
public GrpcClientOptions(GrpcClientOptions other) {
scheduleDeadlineAutomatically = other.scheduleDeadlineAutomatically;
timeout = other.timeout;
timeoutUnit = other.timeoutUnit;
}

/**
* @return whether the client will automatically schedule a deadline when a request carrying a timeout is sent.
*/
public boolean getScheduleDeadlineAutomatically() {
return scheduleDeadlineAutomatically;
}

/**
* <p>Set whether a deadline is automatically scheduled when a request carrying a timeout (either set explicitly or through this
* options instance) is sent.</p>
*
* <ul>
* <li>When the automatic deadline is set and a request carrying a timeout is sent, a deadline (timer) is created to cancel the request
* when the response has not been timely received. The deadline can be obtained with {@link GrpcClientRequest#deadline()}.</li>
* <li>When the deadline is not set and a request carrying a timeout is sent, the timeout is sent to the server and it remains the
* responsibility of the caller to eventually cancel the request. Note: the server might cancel the request as well when its local deadline is met.</li>
* </ul>
*
* @param handleDeadlineAutomatically whether to automatically set
* @return a reference to this, so the API can be used fluently
*/
public GrpcClientOptions setScheduleDeadlineAutomatically(boolean handleDeadlineAutomatically) {
this.scheduleDeadlineAutomatically = handleDeadlineAutomatically;
return this;
}

/**
* Return the default timeout set when sending gRPC requests, the initial value is {@code 0} which does not
* send a timeout.
*
* @return the default timeout.
*/
public int getTimeout() {
return timeout;
}

/**
* Set the default timeout set when sending gRPC requests, this value should be set along with {@link #setTimeoutUnit(TimeUnit)}.
*
* @param timeout the timeout value
* @return a reference to this, so the API can be used fluently
*/
public GrpcClientOptions setTimeout(int timeout) {
if (timeout < 0L) {
throw new IllegalArgumentException("Timeout value must be >= 0");
}
this.timeout = timeout;
return this;
}

/**
* @return the unit of time of the default timeout.
*/
public TimeUnit getTimeoutUnit() {
return timeoutUnit;
}

/**
* Set the unit of time of the default timeout value.
*
* @param timeoutUnit the unit of time
* @return a reference to this, so the API can be used fluently
*/
public GrpcClientOptions setTimeoutUnit(TimeUnit timeoutUnit) {
this.timeoutUnit = Objects.requireNonNull(timeoutUnit);
return this;
}
}
Loading
Loading