diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java index cafe685a..0796b651 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java @@ -31,34 +31,6 @@ public abstract class AbstractLimiter implements Limiter { public static final String ID_TAG = "id"; public static final String STATUS_TAG = "status"; - /** - * Constructs a new builder with a list of bypass resolvers. - * If the predicate condition in any of the resolver is satisfied, - * the call is bypassed without increasing the limiter inflight count - * and affecting the algorithm. - */ - public abstract static class BypassLimiterBuilder, ContextT> extends Builder { - - private final Predicate ALWAYS_FALSE = (context) -> false; - private Predicate bypassResolver = ALWAYS_FALSE; - - /** - * Add a chainable bypass resolver predicate from context. Multiple resolvers may be added and if any of the - * predicate condition returns true the call is bypassed without increasing the limiter inflight count and - * affecting the algorithm. Will not bypass any calls by default if no resolvers are added. - * @param shouldBypass Predicate condition to bypass limit - * @return Chainable builder - */ - public BuilderT bypassLimitResolver(Predicate shouldBypass) { - if (this.bypassResolver == ALWAYS_FALSE) { - this.bypassResolver = shouldBypass; - } else { - this.bypassResolver = bypassResolver.or(shouldBypass); - } - return self(); - } - } - public abstract static class Builder> { private static final AtomicInteger idCounter = new AtomicInteger(); @@ -68,6 +40,9 @@ public abstract static class Builder> { protected String name = "unnamed-" + idCounter.incrementAndGet(); protected MetricRegistry registry = EmptyMetricRegistry.INSTANCE; + private final Predicate ALWAYS_FALSE = (context) -> false; + private Predicate bypassResolver = ALWAYS_FALSE; + public BuilderT named(String name) { this.name = name; return self(); @@ -89,6 +64,26 @@ public BuilderT metricRegistry(MetricRegistry registry) { } protected abstract BuilderT self(); + + /** + * Add a chainable bypass resolver predicate from context. Multiple resolvers may be added and if any of the + * predicate condition returns true the call is bypassed without increasing the limiter inflight count and + * affecting the algorithm. Will not bypass any calls by default if no resolvers are added. + * + * Due to the builders not having access to the ContextT, it is the duty of subclasses to ensure that + * implementations are type safe. + * + * @param shouldBypass Predicate condition to bypass limit + * @return Chainable builder + */ + protected final BuilderT bypassLimitResolverInternal(Predicate shouldBypass) { + if (this.bypassResolver == ALWAYS_FALSE) { + this.bypassResolver = (Predicate) shouldBypass; + } else { + this.bypassResolver = bypassResolver.or((Predicate) shouldBypass); + } + return self(); + } } private final AtomicInteger inFlight = new AtomicInteger(); @@ -99,7 +94,7 @@ public BuilderT metricRegistry(MetricRegistry registry) { private final MetricRegistry.Counter ignoredCounter; private final MetricRegistry.Counter rejectedCounter; private final MetricRegistry.Counter bypassCounter; - private Predicate bypassResolver = (context) -> false; + private final Predicate bypassResolver; private volatile int limit; @@ -108,9 +103,8 @@ protected AbstractLimiter(Builder builder) { this.limitAlgorithm = builder.limit; this.limit = limitAlgorithm.getLimit(); this.limitAlgorithm.notifyOnChange(this::onNewLimit); - if (builder instanceof BypassLimiterBuilder) { - this.bypassResolver = ((BypassLimiterBuilder) builder).bypassResolver; - } + this.bypassResolver = (Predicate) builder.bypassResolver; + builder.registry.gauge(MetricIds.LIMIT_NAME, this::getLimit); this.successCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "success"); this.droppedCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "dropped"); diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java index 53b28940..1f0d0856 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java @@ -37,7 +37,7 @@ public abstract class AbstractPartitionedLimiter extends AbstractLimit private static final Logger LOG = LoggerFactory.getLogger(AbstractPartitionedLimiter.class); private static final String PARTITION_TAG_NAME = "partition"; - public abstract static class Builder, ContextT> extends AbstractLimiter.BypassLimiterBuilder { + public abstract static class Builder, ContextT> extends AbstractLimiter.Builder { private List> partitionResolvers = new ArrayList<>(); private final Map partitions = new LinkedHashMap<>(); private int maxDelayedThreads = 100; diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java index 6792e054..afd8acdc 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java @@ -23,18 +23,6 @@ import java.util.concurrent.Semaphore; public class SimpleLimiter extends AbstractLimiter { - - public static class BypassLimiterBuilder extends AbstractLimiter.BypassLimiterBuilder, ContextT> { - public SimpleLimiter build() { - return new SimpleLimiter<>(this); - } - - @Override - protected BypassLimiterBuilder self() { - return this; - } - } - public static class Builder extends AbstractLimiter.Builder { public SimpleLimiter build() { return new SimpleLimiter<>(this); @@ -46,10 +34,6 @@ protected Builder self() { } } - public static BypassLimiterBuilder newBypassLimiterBuilder() { - return new BypassLimiterBuilder<>(); - } - public static Builder newBuilder() { return new Builder(); } @@ -58,6 +42,7 @@ public static Builder newBuilder() { public SimpleLimiter(AbstractLimiter.Builder builder) { super(builder); + this.inflightDistribution = builder.registry.distribution(MetricIds.INFLIGHT_NAME); this.semaphore = new AdjustableSemaphore(getLimit()); } diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java index b2a548ae..a827c6a3 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java @@ -173,7 +173,7 @@ public void testBypassPartitionedLimiter() { .partition("batch", 0.1) .partition("live", 0.9) .limit(FixedLimit.of(10)) - .bypassLimitResolver(new ShouldBypassPredicate()) + .bypassLimitResolverInternal(new ShouldBypassPredicate()) .build(); Assert.assertTrue(limiter.acquire("batch").isPresent()); @@ -200,7 +200,7 @@ public void testBypassSimpleLimiter() { SimpleLimiter limiter = (SimpleLimiter) TestPartitionedLimiter.newBuilder() .limit(FixedLimit.of(10)) - .bypassLimitResolver(new ShouldBypassPredicate()) + .bypassLimitResolverInternal(new ShouldBypassPredicate()) .build(); int inflightCount = 0; diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/SimpleLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/SimpleLimiterTest.java index ebc80f83..cc8c2642 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/SimpleLimiterTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/SimpleLimiterTest.java @@ -48,9 +48,9 @@ public void testReleaseLimit() { @Test public void testSimpleBypassLimiter() { - SimpleLimiter limiter = SimpleLimiter.newBypassLimiterBuilder() + SimpleLimiter limiter = SimpleLimiter.newBuilder() .limit(FixedLimit.of(10)) - .bypassLimitResolver((context) -> context.equals("admin")) + .bypassLimitResolverInternal((context) -> context.equals("admin")) .build(); for (int i = 0; i < 10; i++) { @@ -68,7 +68,7 @@ public void testSimpleBypassLimiter() { @Test public void testSimpleBypassLimiterDefault() { - SimpleLimiter limiter = SimpleLimiter.newBypassLimiterBuilder() + SimpleLimiter limiter = SimpleLimiter.newBuilder() .limit(FixedLimit.of(10)) .build(); diff --git a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java index da084cbc..5beff578 100644 --- a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java +++ b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java @@ -19,6 +19,7 @@ import com.netflix.concurrency.limits.limiter.AbstractPartitionedLimiter; import com.netflix.concurrency.limits.limiter.BlockingLimiter; import io.grpc.CallOptions; +import java.util.function.Predicate; /** * Builder to simplify creating a {@link Limiter} specific to GRPC clients. @@ -34,6 +35,18 @@ public GrpcClientLimiterBuilder partitionByCallOption(CallOptions.Key op return partitionResolver(context -> context.getCallOptions().getOption(option)); } + /** + * Add a chainable bypass resolver predicate from context. Multiple resolvers may be added and if any of the + * predicate condition returns true the call is bypassed without increasing the limiter inflight count and + * affecting the algorithm. Will not bypass any calls by default if no resolvers are added. + * + * @param shouldBypass Predicate condition to bypass limit + * @return Chainable builder + */ + public GrpcClientLimiterBuilder bypassLimitResolver(Predicate shouldBypass) { + return bypassLimitResolverInternal(shouldBypass); + } + /** * Bypass limit if the request's full method name matches the specified gRPC method's full name. * @param fullMethodName The full method name to check against the {@link GrpcClientRequestContext}'s method. diff --git a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java index 21dad243..c82ffc6c 100644 --- a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java +++ b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java @@ -18,6 +18,7 @@ import com.netflix.concurrency.limits.limiter.AbstractPartitionedLimiter; import io.grpc.Attributes; import io.grpc.Metadata; +import java.util.function.Predicate; public class GrpcServerLimiterBuilder extends AbstractPartitionedLimiter.Builder { /** @@ -44,6 +45,19 @@ public GrpcServerLimiterBuilder partitionByAttribute(Attributes.Key attr return partitionResolver(context -> context.getCall().getAttributes().get(attribute)); } + + /** + * Add a chainable bypass resolver predicate from context. Multiple resolvers may be added and if any of the + * predicate condition returns true the call is bypassed without increasing the limiter inflight count and + * affecting the algorithm. Will not bypass any calls by default if no resolvers are added. + * + * @param shouldBypass Predicate condition to bypass limit + * @return Chainable builder + */ + public GrpcServerLimiterBuilder bypassLimitResolver(Predicate shouldBypass) { + return bypassLimitResolverInternal(shouldBypass); + } + /** * Bypass limit if the request's full method name matches the specified gRPC method's full name. * @param fullMethodName The full method name to check against the {@link GrpcServerRequestContext}'s method. diff --git a/concurrency-limits-servlet-jakarta/src/main/java/com/netflix/concurrency/limits/servlet/jakarta/ServletLimiterBuilder.java b/concurrency-limits-servlet-jakarta/src/main/java/com/netflix/concurrency/limits/servlet/jakarta/ServletLimiterBuilder.java index c9d6fe3d..f400bb51 100644 --- a/concurrency-limits-servlet-jakarta/src/main/java/com/netflix/concurrency/limits/servlet/jakarta/ServletLimiterBuilder.java +++ b/concurrency-limits-servlet-jakarta/src/main/java/com/netflix/concurrency/limits/servlet/jakarta/ServletLimiterBuilder.java @@ -22,6 +22,7 @@ import java.security.Principal; import java.util.Optional; import java.util.function.Function; +import java.util.function.Predicate; /** * Builder to simplify creating a {@link Limiter} specific to a Servlet filter. By default, @@ -73,6 +74,18 @@ public ServletLimiterBuilder partitionByPathInfo(Function pathTo return partitionResolver(request -> Optional.ofNullable(request.getPathInfo()).map(pathToGroup).orElse(null)); } + /** + * Add a chainable bypass resolver predicate from context. Multiple resolvers may be added and if any of the + * predicate condition returns true the call is bypassed without increasing the limiter inflight count and + * affecting the algorithm. Will not bypass any calls by default if no resolvers are added. + * + * @param shouldBypass Predicate condition to bypass limit + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitResolver(Predicate shouldBypass) { + return bypassLimitResolverInternal(shouldBypass); + } + /** * Bypass limit if the value of the provided header name matches the specified value. * @param name The name of the header to check. diff --git a/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java b/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java index 6bc07d15..1496e785 100644 --- a/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java +++ b/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java @@ -18,6 +18,7 @@ import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.limiter.AbstractPartitionedLimiter; +import java.util.function.Predicate; import javax.servlet.http.HttpServletRequest; import java.security.Principal; import java.util.Optional; @@ -73,6 +74,18 @@ public ServletLimiterBuilder partitionByPathInfo(Function pathTo return partitionResolver(request -> Optional.ofNullable(request.getPathInfo()).map(pathToGroup).orElse(null)); } + /** + * Add a chainable bypass resolver predicate from context. Multiple resolvers may be added and if any of the + * predicate condition returns true the call is bypassed without increasing the limiter inflight count and + * affecting the algorithm. Will not bypass any calls by default if no resolvers are added. + * + * @param shouldBypass Predicate condition to bypass limit + * @return Chainable builder + */ + public ServletLimiterBuilder bypassLimitResolver(Predicate shouldBypass) { + return bypassLimitResolverInternal(shouldBypass); + } + /** * Bypass the limit if the value of the provided header name matches the specified value. * @param name The name of the header to check.