Skip to content

Commit

Permalink
Merge pull request #138 from elandau/grpc_dont_ignore_on_cancel
Browse files Browse the repository at this point in the history
Don't onIgnore in gRPC cancel
  • Loading branch information
elandau authored Jul 16, 2019
2 parents 75ad03d + 19fa88a commit 9414389
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
package com.netflix.concurrency.limits.grpc.server;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.internal.Preconditions;
import io.grpc.Context;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
Expand All @@ -30,8 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -51,12 +47,6 @@ public class ConcurrencyLimitServerInterceptor implements ServerInterceptor {

private Supplier<Metadata> trailerSupplier;

private static final Executor executor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("concurrency-limit-cleanup-%d")
.build());

public static class Builder {
private Supplier<Status> statusSupplier = () -> LIMIT_EXCEEDED_STATUS;
private Supplier<Metadata> trailerSupplier = Metadata::new;
Expand Down Expand Up @@ -158,8 +148,6 @@ void safeComplete(Runnable action) {

@Override
public Listener<ReqT> apply(Limiter.Listener listener) {
Context.current().addListener(context -> safeComplete(listener::onIgnore), executor);

final Listener<ReqT> delegate;

try {
Expand Down Expand Up @@ -214,15 +202,6 @@ public void onHalfClose() {
throw t;
}
}

@Override
public void onCancel() {
try {
super.onCancel();
} finally {
safeComplete(listener::onIgnore);
}
}
};
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,33 @@ public void releaseOnUncaughtException() throws IOException {
@Test
public void releaseOnCancellation() {
// Setup server
startServer((req, observer) -> {});
startServer((req, observer) -> {
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
observer.onNext("delayed_response");
observer.onCompleted();
});

ListenableFuture<String> future = ClientCalls.futureUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT), "foo");
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
future.cancel(true);

// Verify
Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class));
Mockito.verify(listener.getResult().get(), Mockito.timeout(1000).times(1)).onIgnore();
Mockito.verify(listener.getResult().get(), Mockito.times(0)).onIgnore();

verifyCounts(0, 1, 0, 0);
Mockito.verify(listener.getResult().get(), Mockito.timeout(2000).times(1)).onSuccess();

verifyCounts(0, 0, 1, 0);
}

@Test
public void releaseOnDeadlineExceeded() {
// Setup server
startServer((req, observer) -> {});
startServer((req, observer) -> {
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
observer.onNext("delayed_response");
observer.onCompleted();
});

try {
ClientCalls.blockingUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS)), "foo");
Expand All @@ -182,28 +193,11 @@ public void releaseOnDeadlineExceeded() {
}
// Verify
Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class));
Mockito.verify(listener.getResult().get(), Mockito.timeout(1000).times(1)).onIgnore();

verifyCounts(0, 1, 0, 0);
}
Mockito.verify(listener.getResult().get(), Mockito.times(0)).onIgnore();

@Test
public void releaseOnMissingHalfClose() {
// Setup server
startServer((req, observer) -> {});

ClientCall<String, String> call = channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withDeadlineAfter(500, TimeUnit.MILLISECONDS));
call.start(new ClientCall.Listener<String>() {}, new Metadata());
call.request(2);
call.sendMessage("foo");

Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
Mockito.verify(listener.getResult().get(), Mockito.timeout(2000).times(1)).onSuccess();

// Verify
Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class));
Mockito.verify(listener.getResult().get(), Mockito.timeout(1000).times(1)).onIgnore();

verifyCounts(0, 1, 0, 0);
verifyCounts(0, 0, 1, 0);
}

public void verifyCounts(int dropped, int ignored, int success, int rejected) {
Expand Down

0 comments on commit 9414389

Please sign in to comment.