Skip to content

Commit

Permalink
Add idle timeout support to HTTP client request
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 14, 2023
1 parent 1b95677 commit 727efa1
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ public interface GrpcClientRequest<Req, Resp> extends GrpcWriteStream<Req> {
@Override
GrpcClientRequest<Req, Resp> drainHandler(@Nullable Handler<Void> handler);

/**
* Sets the amount of time after which, if the request does not return any data within the timeout period,
* the request/response is cancelled and the related futures.
*
* @param timeout the amount of time in milliseconds.
* @return a reference to this, so the API can be used fluently
*/
@Fluent
GrpcClientRequest<Req, Resp> idleTimeout(long timeout);

/**
* @return the underlying HTTP connection
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ public GrpcClientRequest<Req, Resp> drainHandler(Handler<Void> handler) {
return this;
}

@Override
public GrpcClientRequest<Req, Resp> idleTimeout(long timeout) {
httpRequest.idleTimeout(timeout);
return this;
}

@Override public Future<Void> writeMessage(GrpcMessage message) {
return writeMessage(message, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,49 @@ public void onCompleted() {
}));
}

@Test
public void testIdleTimeout(TestContext should) throws Exception {

CompletableFuture<Void> cf = new CompletableFuture<>();

Async done = should.async(2);
startServer(new StreamingGrpc.StreamingImplBase() {
@Override
public StreamObserver<Item> sink(StreamObserver<Empty> responseObserver) {
return new StreamObserver<Item>() {
@Override
public void onNext(Item item) {
cf.complete(null);
}
@Override
public void onError(Throwable t) {
should.assertEquals(StatusRuntimeException.class, t.getClass());
StatusRuntimeException sre = (StatusRuntimeException) t;
should.assertEquals(Status.CANCELLED.getCode(), sre.getStatus().getCode());
done.countDown();
}
@Override
public void onCompleted() {
}
};
}
});

client = GrpcClient.client(vertx);
client.request(SocketAddress.inetSocketAddress(port, "localhost"), StreamingGrpc.getSinkMethod())
.onComplete(should.asyncAssertSuccess(callRequest -> {
callRequest.write(Item.getDefaultInstance());
cf.whenComplete((v, t) -> {
long now = System.currentTimeMillis();
callRequest.idleTimeout(1000);
callRequest.exceptionHandler(err -> {
should.assertTrue(System.currentTimeMillis() - now >= 1000);
done.countDown();
});
});
}));
}

@Test
public void testCall(TestContext should) throws IOException {

Expand Down

0 comments on commit 727efa1

Please sign in to comment.