-
Notifications
You must be signed in to change notification settings - Fork 27
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement grpc-timeout and deadlines.
- Loading branch information
Showing
18 changed files
with
585 additions
and
93 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,7 @@ | |
*/ | ||
package io.vertx.grpc.client.impl; | ||
|
||
import io.vertx.core.AsyncResult; | ||
import io.grpc.Context; | ||
import io.vertx.core.Future; | ||
import io.vertx.core.Handler; | ||
import io.vertx.core.MultiMap; | ||
|
@@ -19,10 +19,11 @@ | |
|
||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import io.vertx.core.http.HttpConnection; | ||
import io.vertx.core.impl.ContextInternal; | ||
import io.vertx.core.impl.future.FutureInternal; | ||
import io.vertx.core.impl.future.PromiseInternal; | ||
import io.vertx.grpc.client.GrpcClientRequest; | ||
import io.vertx.grpc.client.GrpcClientResponse; | ||
import io.vertx.grpc.common.CodecException; | ||
|
@@ -32,12 +33,14 @@ | |
import io.vertx.grpc.common.GrpcMessageEncoder; | ||
import io.vertx.grpc.common.ServiceName; | ||
import io.vertx.grpc.common.impl.GrpcMessageImpl; | ||
import io.vertx.grpc.common.impl.VertxScheduledExecutorService; | ||
|
||
/** | ||
* @author <a href="mailto:[email protected]">Julien Viet</a> | ||
*/ | ||
public class GrpcClientRequestImpl<Req, Resp> implements GrpcClientRequest<Req, Resp> { | ||
|
||
private final ContextInternal context; | ||
private final HttpClientRequest httpRequest; | ||
private final GrpcMessageEncoder<Req> messageEncoder; | ||
private ServiceName serviceName; | ||
|
@@ -48,16 +51,21 @@ public class GrpcClientRequestImpl<Req, Resp> implements GrpcClientRequest<Req, | |
boolean trailersSent; | ||
private Future<GrpcClientResponse<Req, Resp>> response; | ||
private MultiMap headers; | ||
private long timeout; | ||
private TimeUnit timeoutUnit; | ||
private io.grpc.Context grpcContext; | ||
|
||
public GrpcClientRequestImpl(HttpClientRequest httpRequest, GrpcMessageEncoder<Req> messageEncoder, GrpcMessageDecoder<Resp> messageDecoder) { | ||
|
||
this.context = ((PromiseInternal<?>)httpRequest.response()).context(); | ||
this.httpRequest = httpRequest; | ||
this.messageEncoder = messageEncoder; | ||
this.response = httpRequest.response().map(httpResponse -> { | ||
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(this, httpResponse, messageDecoder); | ||
grpcResponse.init(); | ||
return grpcResponse; | ||
}); | ||
this.response = httpRequest | ||
.response() | ||
.map(httpResponse -> { | ||
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, grpcContext, this, httpResponse, messageDecoder); | ||
grpcResponse.init(); | ||
return grpcResponse; | ||
}); | ||
} | ||
|
||
@Override | ||
|
@@ -126,6 +134,19 @@ public GrpcClientRequest<Req, Resp> drainHandler(Handler<Void> handler) { | |
return this; | ||
} | ||
|
||
@Override | ||
public GrpcClientRequest<Req, Resp> timeout(long timeout, TimeUnit unit) { | ||
if (timeout < 0L) { | ||
throw new IllegalArgumentException("Timeout must be positive"); | ||
} | ||
if (headersSent) { | ||
throw new IllegalStateException("Timeout must be set before sending request headers"); | ||
} | ||
this.timeoutUnit = Objects.requireNonNull(unit); | ||
this.timeout = timeout; | ||
return this; | ||
} | ||
|
||
@Override | ||
public GrpcClientRequest<Req, Resp> idleTimeout(long timeout) { | ||
httpRequest.idleTimeout(timeout); | ||
|
@@ -198,6 +219,9 @@ private Future<Void> writeMessage(GrpcMessage message, boolean end) { | |
requestHeaders.add(header.getKey(), header.getValue()); | ||
} | ||
} | ||
if (timeout > 0L) { | ||
httpRequest.putHeader("grpc-timeout", timeoutUnit.toMicros(timeout) + "u"); | ||
} | ||
String uri = serviceName.pathOf(methodName); | ||
httpRequest.putHeader("content-type", "application/grpc"); | ||
if (encoding != null) { | ||
|
@@ -209,7 +233,15 @@ private Future<Void> writeMessage(GrpcMessage message, boolean end) { | |
httpRequest.setURI(uri); | ||
headersSent = true; | ||
} | ||
|
||
if (timeout > 0L) { | ||
Context.CancellableContext deadlineContext = Context.current().withDeadlineAfter(timeout, timeoutUnit, new VertxScheduledExecutorService(context)); | ||
deadlineContext.addListener(ctx_ -> { | ||
cancel(); | ||
}, Runnable::run); | ||
grpcContext = deadlineContext; | ||
} else { | ||
grpcContext = Context.current(); | ||
} | ||
if (end) { | ||
trailersSent = true; | ||
return httpRequest.end(GrpcMessageImpl.encode(message)); | ||
|
@@ -238,9 +270,7 @@ public void cancel() { | |
return; | ||
} | ||
cancelled = true; | ||
// That's a bit convoluted, the reset API should be improved instead | ||
ContextInternal ctx = ((FutureInternal) (response)).context(); | ||
ctx.execute(() -> { | ||
context.execute(() -> { | ||
boolean responseEnded; | ||
if (response.failed()) { | ||
return; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.