diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java index ae6d3bbb..1f60116c 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java @@ -1,49 +1,29 @@ package com.netflix.concurrency.limits; +import java.util.function.Consumer; + /** * Contract for an algorithm that calculates a concurrency limit based on * rtt measurements */ public interface Limit { - /** - * Details of the current sample window - */ - interface SampleWindow { - /** - * @return Candidate RTT in the sample window. This is traditionally the minimum rtt. - */ - long getCandidateRttNanos(); - - /** - * @return Average RTT in the sample window. Excludes timeouts and dropped rtt. - */ - long getAverateRttNanos(); - - /** - * @return Maximum number of inflight observed during the sample window - */ - int getMaxInFlight(); - - /** - * @return Number of observed RTTs in the sample window - */ - int getSampleCount(); - - /** - * @return True if there was a timeout - */ - boolean didDrop(); - } - /** * @return Current estimated limit */ int getLimit(); - + + /** + * Register a callback to receive notification whenever the limit is updated to a new value + * @param consumer + */ + void notifyOnChange(Consumer consumer); + /** - * Update the concurrency limit using a new rtt sample - * - * @param sample Data from the last sampling window such as RTT + * Update the limiter with a sample + * @param startTime + * @param rtt + * @param inflight + * @param didDrop */ - void update(SampleWindow sample); + void onSample(long startTime, long rtt, int inflight, boolean didDrop); } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limiter.java index e370f9fe..fdc24a51 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limiter.java @@ -1,7 +1,5 @@ package com.netflix.concurrency.limits; -import com.netflix.concurrency.limits.strategy.LookupPartitionStrategy; - import java.util.Optional; /** @@ -40,8 +38,7 @@ interface Listener { * If acquired the caller must call one of the Listener methods when the operation has been completed * to release the count. * - * @param context Context for the request. The context is used by advanced startegies such as - * {@link LookupPartitionStrategy}. + * @param context Context for the request * @return Optional.empty() if limit exceeded. */ Optional acquire(ContextT context); diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Strategy.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Strategy.java deleted file mode 100644 index 3975cc7f..00000000 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Strategy.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.netflix.concurrency.limits; - -/** - * Contract for enforcing a concurrency limit with optional partitioning - * of the limit. - * - * @param Context type used to partition the limit. Void if none. - */ -public interface Strategy { - /** - * Representation of a single acquired Token from the strategy. - */ - interface Token { - /** - * @return true if acquired or false if limit has been reached - */ - boolean isAcquired(); - - /** - * @return Get number of pending requests - */ - int getInFlightCount(); - - /** - * Release the acquired token and decrement the current inflight count. - */ - void release(); - - public static Token newNotAcquired(int inFlight) { - return new Token() { - @Override - public boolean isAcquired() { - return false; - } - - @Override - public int getInFlightCount() { - return inFlight; - } - - @Override - public void release() { - } - }; - } - - public static Token newAcquired(int inFlight, Runnable release) { - return new Token() { - @Override - public boolean isAcquired() { - return true; - } - - @Override - public int getInFlightCount() { - return inFlight; - } - - @Override - public void release() { - release.run(); - } - }; - } - } - - /** - * Try to acquire a token from the limiter. - * - * @param context Context of the request for partitioned limits - * @return Optional.empty() if limit exceeded or a {@link Token} that must be released when - * the operation completes. - */ - Token tryAcquire(ContextT context); - - /** - * Update the strategy with a new limit - * @param limit - */ - void setLimit(int limit); -} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AIMDLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AIMDLimit.java index 831f6b9e..f1689566 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AIMDLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AIMDLimit.java @@ -6,7 +6,7 @@ * Loss based dynamic {@link Limit} that does an additive increment as long as * there are no errors and a multiplicative decrement when there is an error. */ -public final class AIMDLimit implements Limit { +public final class AIMDLimit extends AbstractLimit { public static class Builder { private int initialLimit = 10; @@ -31,30 +31,26 @@ public static Builder newBuilder() { return new Builder(); } - private volatile int limit; private final double backoffRatio; private AIMDLimit(Builder builder) { - this.limit = builder.initialLimit; + super(builder.initialLimit); this.backoffRatio = builder.backoffRatio; } @Override - public int getLimit() { - return limit; - } - - @Override - public void update(SampleWindow sample) { - if (sample.didDrop()) { - limit = Math.max(1, Math.min(limit - 1, (int) (limit * backoffRatio))); - } else if (sample.getMaxInFlight() >= limit) { - limit = limit + 1; + protected int _update(long startTime, long rtt, int inflight, boolean didDrop) { + if (didDrop) { + return Math.max(1, Math.min(getLimit() - 1, (int) (getLimit() * backoffRatio))); + } else if (inflight >= getLimit()) { + return getLimit() + 1; + } else { + return getLimit(); } } @Override public String toString() { - return "AIMDLimit [limit=" + limit + "]"; + return "AIMDLimit [limit=" + getLimit() + "]"; } } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AbstractLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AbstractLimit.java new file mode 100644 index 00000000..07366cb1 --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AbstractLimit.java @@ -0,0 +1,41 @@ +package com.netflix.concurrency.limits.limit; + +import com.netflix.concurrency.limits.Limit; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +public abstract class AbstractLimit implements Limit { + private volatile int limit; + private final List> listeners = new CopyOnWriteArrayList<>(); + + protected AbstractLimit(int initialLimit) { + this.limit = initialLimit; + } + + @Override + public final synchronized void onSample(long startTime, long rtt, int inflight, boolean didDrop) { + setLimit(_update(startTime, rtt, inflight, didDrop)); + } + + protected abstract int _update(long startTime, long rtt, int inflight, boolean didDrop); + + @Override + public final int getLimit() { + return limit; + } + + protected synchronized void setLimit(int newLimit) { + if (newLimit != limit) { + limit = newLimit; + listeners.forEach(listener -> listener.accept(newLimit)); + } + } + + public void notifyOnChange(Consumer consumer) { + this.listeners.add(consumer); + } + + +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/FixedLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/FixedLimit.java index 44913e23..5b742ee2 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/FixedLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/FixedLimit.java @@ -1,33 +1,25 @@ package com.netflix.concurrency.limits.limit; -import com.netflix.concurrency.limits.Limit; - /** * Non dynamic limit with fixed value */ -public final class FixedLimit implements Limit { - - private final int limit; +public final class FixedLimit extends AbstractLimit { public static FixedLimit of(int limit) { return new FixedLimit(limit); } private FixedLimit(int limit) { - this.limit = limit; - } - - @Override - public int getLimit() { - return limit; + super(limit); } @Override - public void update(SampleWindow sample) { + public int _update(long startTime, long rtt, int inflight, boolean didDrop) { + return getLimit(); } @Override public String toString() { - return "FixedLimit [limit=" + limit + "]"; + return "FixedLimit [limit=" + getLimit() + "]"; } } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java new file mode 100644 index 00000000..68b54f4d --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java @@ -0,0 +1,275 @@ +package com.netflix.concurrency.limits.limit; + +import com.netflix.concurrency.limits.MetricIds; +import com.netflix.concurrency.limits.MetricRegistry; +import com.netflix.concurrency.limits.MetricRegistry.SampleListener; +import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; +import com.netflix.concurrency.limits.limit.measurement.ExpAvgMeasurement; +import com.netflix.concurrency.limits.limit.measurement.Measurement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * Concurrency limit algorithm that adjust the limits based on the gradient of change in the + * samples minimum RTT and absolute minimum RTT allowing for a queue of square root of the + * current limit. Why square root? Because it's better than a fixed queue size that becomes too + * small for large limits but still prevents the limit from growing too much by slowing down + * growth as the limit grows. + */ +public final class Gradient2Limit extends AbstractLimit { + private static final int DISABLED = -1; + + private static final Logger LOG = LoggerFactory.getLogger(Gradient2Limit.class); + + public static class Builder { + private int initialLimit = 4; + private int minLimit = 4; + private int maxConcurrency = 1000; + + private double smoothing = 0.2; + private Function queueSize = concurrency -> 4; + private MetricRegistry registry = EmptyMetricRegistry.INSTANCE; + private int shortWindow = 10; + private int longWindow = 100; + + /** + * Initial limit used by the limiter + * @param initialLimit + * @return Chainable builder + */ + public Builder initialLimit(int initialLimit) { + this.initialLimit = initialLimit; + return this; + } + + /** + * Minimum concurrency limit allowed. The minimum helps prevent the algorithm from adjust the limit + * too far down. Note that this limit is not desirable when use as backpressure for batch apps. + * + * @param minLimit + * @return Chainable builder + */ + public Builder minLimit(int minLimit) { + this.minLimit = minLimit; + return this; + } + + /** + * Maximum allowable concurrency. Any estimated concurrency will be capped + * at this value + * @param maxConcurrency + * @return Chainable builder + */ + public Builder maxConcurrency(int maxConcurrency) { + this.maxConcurrency = maxConcurrency; + return this; + } + + /** + * Fixed amount the estimated limit can grow while latencies remain low + * @param queueSize + * @return Chainable builder + */ + public Builder queueSize(int queueSize) { + this.queueSize = (ignore) -> queueSize; + return this; + } + + /** + * Function to dynamically determine the amount the estimated limit can grow while + * latencies remain low as a function of the current limit. + * @param queueSize + * @return Chainable builder + */ + public Builder queueSize(Function queueSize) { + this.queueSize = queueSize; + return this; + } + + /** + * Smoothing factor to limit how aggressively the estimated limit can shrink + * when queuing has been detected. + * @param smoothing Value of 0.0 to 1.0 where 1.0 means the limit is completely + * replicated by the new estimate. + * @return Chainable builder + */ + public Builder smoothing(double smoothing) { + this.smoothing = smoothing; + return this; + } + + /** + * Registry for reporting metrics about the limiter's internal state. + * @param registry + * @return Chainable builder + */ + public Builder metricRegistry(MetricRegistry registry) { + this.registry = registry; + return this; + } + + public Builder shortWindow(int n) { + this.shortWindow = n; + return this; + } + + public Builder longWindow(int n) { + this.longWindow = n; + return this; + } + + public Gradient2Limit build() { + return new Gradient2Limit(this); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static Gradient2Limit newDefault() { + return newBuilder().build(); + } + + /** + * Estimated concurrency limit based on our algorithm + */ + private volatile double estimatedLimit; + + /** + * Tracks a measurement of the short time, and more volatile, RTT meant to represent the current system latency + */ + private final Measurement shortRtt; + + /** + * Tracks a measurement of the long term, less volatile, RTT meant to represent the baseline latency. When the system + * is under load this number is expect to trend higher. + */ + private final Measurement longRtt; + + /** + * Maximum allowed limit providing an upper bound failsafe + */ + private final int maxLimit; + + private final int minLimit; + + private final Function queueSize; + + private final double smoothing; + + private final SampleListener longRttSampleListener; + + private final SampleListener shortRttSampleListener; + + private final SampleListener queueSizeSampleListener; + + private final int maxDriftIntervals; + + private int intervalsAbove = 0; + + private Gradient2Limit(Builder builder) { + super(builder.initialLimit); + + this.estimatedLimit = builder.initialLimit; + this.maxLimit = builder.maxConcurrency; + this.minLimit = builder.minLimit; + this.queueSize = builder.queueSize; + this.smoothing = builder.smoothing; + this.shortRtt = new ExpAvgMeasurement(builder.shortWindow); + this.longRtt = new ExpAvgMeasurement(builder.longWindow); + this.maxDriftIntervals = builder.shortWindow * 10; + + this.longRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME); + this.shortRttSampleListener = builder.registry.registerDistribution(MetricIds.WINDOW_MIN_RTT_NAME); + this.queueSizeSampleListener = builder.registry.registerDistribution(MetricIds.WINDOW_QUEUE_SIZE_NAME); + } + + @Override + public int _update(final long startTime, final long rtt, final int inflight, final boolean didDrop) { + final double queueSize = this.queueSize.apply((int)this.estimatedLimit); + + final double shortRtt = this.shortRtt.add(rtt).doubleValue(); + final double longRtt = this.longRtt.add(rtt).doubleValue(); + + // Under steady state we expect the short and long term RTT to whipsaw. We can identify that a system is under + // long term load when there is no crossover detected for a certain number of internals, normally a multiple of + // the short term RTT window. Since both short and long term RTT trend higher this state results in the limit + // slowly trending upwards, increasing the queue and latency. To mitigate this we drop both the limit and + // long term latency value to effectively probe for less queueing and better latency. + if (shortRtt < longRtt) { + intervalsAbove = 0; + } else if (shortRtt > longRtt){ + intervalsAbove++; + } + + if (intervalsAbove > maxDriftIntervals) { + intervalsAbove = 0; + int newLimit = (int)Math.max(minLimit, queueSize); + // This helps prevent an unreasonably low long term RTT + if (newLimit * 2 < estimatedLimit) { + this.longRtt.update(current -> current.doubleValue() / 2); + } + estimatedLimit = newLimit; + return (int)estimatedLimit; + } + // Because we're using lots of averages it's possible for the short term RTT to be substantially lower than + // the longer term Rtt. When that happens we need to 'reset' the long term RTT. Averaging the short and long + // term rtt values smooths out this 'reset'. + if (shortRtt < longRtt) { + this.longRtt.update(current -> (longRtt + shortRtt) / 2); + return (int)estimatedLimit; + } + + shortRttSampleListener.addSample(shortRtt); + longRttSampleListener.addSample(longRtt); + queueSizeSampleListener.addSample(queueSize); + + // Rtt could be higher than rtt_noload because of smoothing rtt noload updates + // so set to 1.0 to indicate no queuing. Otherwise calculate the slope and don't + // allow it to be reduced by more than half to avoid aggressive load-sheding due to + // outliers. + final double gradient = Math.max(0.5, Math.min(1.0, longRtt / shortRtt)); + + double newLimit; + // Don't grow the limit if we are app limited + if (inflight < estimatedLimit / 2) { + return (int) estimatedLimit; + } + + newLimit = estimatedLimit * gradient + queueSize; + if (newLimit < estimatedLimit) { + newLimit = Math.max(minLimit, estimatedLimit * (1-smoothing) + smoothing*(newLimit)); + } + newLimit = Math.max(queueSize, Math.min(maxLimit, newLimit)); + + if ((int)estimatedLimit != newLimit) { + LOG.debug("New limit={} shortRtt={} ms longRtt={} ms queueSize={} gradient={}", + (int) newLimit, + getShortRtt(TimeUnit.MICROSECONDS) / 1000.0, + getLongRtt(TimeUnit.MICROSECONDS) / 1000.0, + queueSize, + gradient); + } + + estimatedLimit = newLimit; + + return (int)estimatedLimit; + } + + public long getShortRtt(TimeUnit units) { + return units.convert(shortRtt.get().longValue(), TimeUnit.NANOSECONDS); + } + + public long getLongRtt(TimeUnit units) { + return units.convert(longRtt.get().longValue(), TimeUnit.NANOSECONDS); + } + + @Override + public String toString() { + return "GradientLimit [limit=" + (int)estimatedLimit + "]"; + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java index 244db848..e73785e5 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java @@ -1,19 +1,19 @@ package com.netflix.concurrency.limits.limit; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.concurrency.limits.Limit; import com.netflix.concurrency.limits.MetricIds; import com.netflix.concurrency.limits.MetricRegistry; import com.netflix.concurrency.limits.MetricRegistry.SampleListener; import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; import com.netflix.concurrency.limits.internal.Preconditions; import com.netflix.concurrency.limits.limit.functions.SquareRootFunction; +import com.netflix.concurrency.limits.limit.measurement.Measurement; +import com.netflix.concurrency.limits.limit.measurement.MinimumMeasurement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; /** * Concurrency limit algorithm that adjust the limits based on the gradient of change in the @@ -22,7 +22,7 @@ * small for large limits but still prevents the limit from growing too much by slowing down * growth as the limit grows. */ -public final class GradientLimit implements Limit { +public final class GradientLimit extends AbstractLimit { private static final int DISABLED = -1; private static final Logger LOG = LoggerFactory.getLogger(GradientLimit.class); @@ -199,6 +199,7 @@ public static GradientLimit newDefault() { private int resetRttCounter; private GradientLimit(Builder builder) { + super(builder.initialLimit); this.estimatedLimit = builder.initialLimit; this.maxLimit = builder.maxConcurrency; this.minLimit = builder.minLimit; @@ -222,10 +223,7 @@ private int nextProbeCountdown() { } @Override - public synchronized void update(SampleWindow sample) { - Preconditions.checkArgument(sample.getCandidateRttNanos() > 0, "rtt must be >0 but got " + sample.getCandidateRttNanos()); - - final long rtt = sample.getAverateRttNanos(); + public int _update(final long startTime, final long rtt, final int inflight, final boolean didDrop) { minWindowRttSampleListener.addSample(rtt); final double queueSize = this.queueSize.apply((int)this.estimatedLimit); @@ -240,7 +238,7 @@ public synchronized void update(SampleWindow sample) { estimatedLimit = Math.max(minLimit, queueSize); rttNoLoadMeasurement.reset(); LOG.debug("Probe MinRTT limit={}", getLimit()); - return; + return (int)estimatedLimit; } final long rttNoLoad = rttNoLoadMeasurement.add(rtt).longValue(); @@ -254,16 +252,14 @@ public synchronized void update(SampleWindow sample) { double newLimit; // Reduce the limit aggressively if there was a drop - if (sample.didDrop()) { + if (didDrop) { newLimit = estimatedLimit/2; // Don't grow the limit if we are app limited - } else if (sample.getMaxInFlight() < estimatedLimit / 2) { - return; - // Normal update to the limit - } else { - newLimit = estimatedLimit * gradient + queueSize; + } else if (inflight < estimatedLimit / 2) { + return (int)estimatedLimit; } - + + newLimit = estimatedLimit * gradient + queueSize; if (newLimit < estimatedLimit) { newLimit = Math.max(minLimit, estimatedLimit * (1-smoothing) + smoothing*(newLimit)); } @@ -280,10 +276,6 @@ public synchronized void update(SampleWindow sample) { } estimatedLimit = newLimit; - } - - @Override - public int getLimit() { return (int)estimatedLimit; } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSampleWindow.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/ImmutableSampleWindow.java similarity index 89% rename from concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSampleWindow.java rename to concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/ImmutableSampleWindow.java index 45175d75..d2908979 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSampleWindow.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/ImmutableSampleWindow.java @@ -1,13 +1,11 @@ -package com.netflix.concurrency.limits.limiter; - -import com.netflix.concurrency.limits.Limit; +package com.netflix.concurrency.limits.limit; import java.util.concurrent.TimeUnit; /** * Class used to track immutable samples in an AtomicReference */ -public class ImmutableSampleWindow implements Limit.SampleWindow { +class ImmutableSampleWindow { final long minRtt; final int maxInFlight; final int sampleCount; @@ -38,27 +36,22 @@ public ImmutableSampleWindow addDroppedSample(int maxInFlight) { return new ImmutableSampleWindow(minRtt, sum, Math.max(maxInFlight, this.maxInFlight), sampleCount, true); } - @Override public long getCandidateRttNanos() { return minRtt; } - @Override public long getAverateRttNanos() { return sampleCount == 0 ? 0 : sum / sampleCount; } - @Override public int getMaxInFlight() { return maxInFlight; } - @Override public int getSampleCount() { return sampleCount; } - @Override public boolean didDrop() { return didDrop; } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/SettableLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/SettableLimit.java index 82f397c6..00adc619 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/SettableLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/SettableLimit.java @@ -6,33 +6,27 @@ * {@link Limit} to be used mostly for testing where the limit can be manually * adjusted. */ -public class SettableLimit implements Limit { - - private int limit; +public class SettableLimit extends AbstractLimit { public static SettableLimit startingAt(int limit) { return new SettableLimit(limit); } public SettableLimit(int limit) { - this.limit = limit; + super(limit); } @Override - public synchronized int getLimit() { - return limit; - } - - @Override - public void update(SampleWindow sample) { + protected int _update(long startTime, long rtt, int inflight, boolean didDrop) { + return getLimit(); } public synchronized void setLimit(int limit) { - this.limit = limit; + super.setLimit(limit); } @Override public String toString() { - return "SettableLimit [limit=" + limit + "]"; + return "SettableLimit [limit=" + getLimit() + "]"; } } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/TracingLimitDecorator.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/TracingLimitDecorator.java index 8fe4c35d..a232765b 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/TracingLimitDecorator.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/TracingLimitDecorator.java @@ -1,11 +1,11 @@ package com.netflix.concurrency.limits.limit; -import java.util.concurrent.TimeUnit; - +import com.netflix.concurrency.limits.Limit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.netflix.concurrency.limits.Limit; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; public class TracingLimitDecorator implements Limit { private static final Logger LOG = LoggerFactory.getLogger(TracingLimitDecorator.class); @@ -26,12 +26,15 @@ public int getLimit() { } @Override - public void update(SampleWindow sample) { - LOG.debug("sampleCount={} maxInFlight={} minRtt={} ms", - sample.getSampleCount(), - sample.getMaxInFlight(), - TimeUnit.NANOSECONDS.toMicros(sample.getCandidateRttNanos()) / 1000.0); - delegate.update(sample); + public void onSample(long startTime, long rtt, int inflight, boolean didDrop) { + LOG.debug("maxInFlight={} minRtt={} ms", + inflight, + TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0); + delegate.onSample(startTime, rtt, inflight, didDrop); } + @Override + public void notifyOnChange(Consumer consumer) { + delegate.notifyOnChange(consumer); + } } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java index e32db8ca..885e3048 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java @@ -1,19 +1,17 @@ package com.netflix.concurrency.limits.limit; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.concurrency.limits.Limit; import com.netflix.concurrency.limits.MetricIds; import com.netflix.concurrency.limits.MetricRegistry; import com.netflix.concurrency.limits.MetricRegistry.SampleListener; import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; import com.netflix.concurrency.limits.internal.Preconditions; import com.netflix.concurrency.limits.limit.functions.Log10RootFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; /** * Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is small ({@literal <} alpha) @@ -25,7 +23,7 @@ * For traditional TCP Vegas alpha is typically 2-3 and beta is typically 4-6. To allow for better growth and * stability at higher limits we set alpha=Max(3, 10% of the current limit) and beta=Max(6, 20% of the current limit) */ -public class VegasLimit implements Limit { +public class VegasLimit extends AbstractLimit { private static final Logger LOG = LoggerFactory.getLogger(VegasLimit.class); private static final Function LOG10 = Log10RootFunction.create(0); @@ -160,6 +158,7 @@ public static VegasLimit newDefault() { private int probeCountdown; private VegasLimit(Builder builder) { + super(builder.initialLimit); this.estimatedLimit = builder.initialLimit; this.maxLimit = builder.maxConcurrency; this.alphaFunc = builder.alphaFunc; @@ -180,38 +179,37 @@ private int nextProbeCountdown() { } @Override - public synchronized void update(SampleWindow sample) { - long rtt = sample.getCandidateRttNanos(); + protected int _update(long startTime, long rtt, int inflight, boolean didDrop) { Preconditions.checkArgument(rtt > 0, "rtt must be >0 but got " + rtt); if (probeCountdown != DISABLED && probeCountdown-- <= 0) { LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0); probeCountdown = nextProbeCountdown(); rtt_noload = rtt; - return; + return (int)estimatedLimit; } if (rtt_noload == 0 || rtt < rtt_noload) { LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0); rtt_noload = rtt; - return; + return (int)estimatedLimit; } rttSampleListener.addSample(rtt_noload); - updateEstimatedLimit(sample, rtt); + return updateEstimatedLimit(rtt, inflight, didDrop); } - private void updateEstimatedLimit(SampleWindow sample, long rtt) { + private int updateEstimatedLimit(long rtt, int inflight, boolean didDrop) { final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt)); double newLimit; // Treat any drop (i.e timeout) as needing to reduce the limit - if (sample.didDrop()) { + if (didDrop) { newLimit = decreaseFunc.apply(estimatedLimit); // Prevent upward drift if not close to the limit - } else if (sample.getMaxInFlight() * 2 < estimatedLimit) { - return; + } else if (inflight * 2 < estimatedLimit) { + return (int)estimatedLimit; } else { int alpha = alphaFunc.apply((int)estimatedLimit); int beta = betaFunc.apply((int)estimatedLimit); @@ -228,7 +226,7 @@ private void updateEstimatedLimit(SampleWindow sample, long rtt) { newLimit = decreaseFunc.apply(estimatedLimit); // We're within he sweet spot so nothing to do } else { - return; + return (int)estimatedLimit; } } @@ -242,10 +240,6 @@ private void updateEstimatedLimit(SampleWindow sample, long rtt) { queueSize); } estimatedLimit = newLimit; - } - - @Override - public int getLimit() { return (int)estimatedLimit; } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/WindowedLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/WindowedLimit.java new file mode 100644 index 00000000..c7d1d682 --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/WindowedLimit.java @@ -0,0 +1,140 @@ +package com.netflix.concurrency.limits.limit; + +import com.netflix.concurrency.limits.Limit; +import com.netflix.concurrency.limits.internal.Preconditions; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +public class WindowedLimit implements Limit { + private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1); + private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1); + private static final long DEFAULT_MIN_RTT_THRESHOLD = TimeUnit.MICROSECONDS.toNanos(100); + + /** + * Minimum observed samples to filter out sample windows with not enough significant samples + */ + private static final int DEFAULT_WINDOW_SIZE = 100; + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private long maxWindowTime = DEFAULT_MAX_WINDOW_TIME; + private long minWindowTime = DEFAULT_MIN_WINDOW_TIME; + private int windowSize = DEFAULT_WINDOW_SIZE; + private long minRttThreshold = DEFAULT_MIN_RTT_THRESHOLD; + + /** + * Minimum window duration for sampling a new minRtt + */ + public Builder minWindowTime(long minWindowTime, TimeUnit units) { + Preconditions.checkArgument(units.toMillis(minWindowTime) >= 100, "minWindowTime must be >= 100 ms"); + this.minWindowTime = units.toNanos(minWindowTime); + return this; + } + + /** + * Maximum window duration for sampling a new minRtt + */ + public Builder maxWindowTime(long maxWindowTime, TimeUnit units) { + Preconditions.checkArgument(maxWindowTime >= units.toMillis(100), "minWindowTime must be >= 100 ms"); + this.maxWindowTime = units.toNanos(maxWindowTime); + return this; + } + + /** + * Minimum sampling window size for finding a new minimum rtt + */ + public Builder windowSize(int windowSize) { + Preconditions.checkArgument(windowSize >= 10, "Window size must be >= 10"); + this.windowSize = windowSize; + return this; + } + + public Builder minRttThreshold(long threshold, TimeUnit units) { + this.minRttThreshold = units.toNanos(threshold); + return this; + } + + public WindowedLimit build(Limit delegate) { + return new WindowedLimit(this, delegate); + } + } + + private final Limit delegate; + + /** + * End time for the sampling window at which point the limit should be updated + */ + private volatile long nextUpdateTime = 0; + + private final long minWindowTime; + + private final long maxWindowTime; + + private final int windowSize; + + private final long minRttThreshold; + + private final Object lock = new Object(); + + /** + * Object tracking stats for the current sample window + */ + private final AtomicReference sample = new AtomicReference<>(new ImmutableSampleWindow()); + + private WindowedLimit(Builder builder, Limit delegate) { + this.delegate = delegate; + this.minWindowTime = builder.minWindowTime; + this.maxWindowTime = builder.maxWindowTime; + this.windowSize = builder.windowSize; + this.minRttThreshold = builder.minRttThreshold; + } + + @Override + public void notifyOnChange(Consumer consumer) { + delegate.notifyOnChange(consumer); + } + + @Override + public void onSample(long startTime, long rtt, int inflight, boolean didDrop) { + long endTime = startTime + rtt; + + if (rtt < minRttThreshold) { + return; + } + + if (didDrop) { + sample.updateAndGet(current -> current.addDroppedSample(inflight)); + } else { + sample.updateAndGet(window -> window.addSample(rtt, inflight)); + } + + if (startTime + rtt > nextUpdateTime) { + synchronized (lock) { + // Double check under the lock + if (endTime > nextUpdateTime) { + ImmutableSampleWindow current = sample.get(); + if (isWindowReady(current)) { + sample.set(new ImmutableSampleWindow()); + + nextUpdateTime = endTime + Math.min(Math.max(current.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime); + delegate.onSample(startTime, current.getAverateRttNanos(), current.getMaxInFlight(), current.didDrop()); + } + } + } + } + } + + private boolean isWindowReady(ImmutableSampleWindow sample) { + return sample.getCandidateRttNanos() < Long.MAX_VALUE && sample.getSampleCount() > windowSize; + } + + @Override + public int getLimit() { + return delegate.getLimit(); + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/ExpAvgMeasurement.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/ExpAvgMeasurement.java new file mode 100644 index 00000000..b18d5c9c --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/ExpAvgMeasurement.java @@ -0,0 +1,49 @@ +package com.netflix.concurrency.limits.limit.measurement; + +import java.util.function.Function; + +public class ExpAvgMeasurement implements Measurement { + private Double value = 0.0; + private final int window; + private int count = 0; + + public ExpAvgMeasurement(int window) { + this.window = window; + } + + @Override + public Number add(Number sample) { + if (count == 0) { + count++; + value = sample.doubleValue(); + } else if (count < window) { + count++; + double factor = factor(count); + value = value * (1-factor) + sample.doubleValue() * factor; + } else { + double factor = factor(window); + value = value * (1-factor) + sample.doubleValue() * factor; + } + return value; + } + + private static double factor(int n) { + return 2.0 / (n + 1); + } + + @Override + public Number get() { + return value; + } + + @Override + public void reset() { + value = 0.0; + count = 0; + } + + @Override + public void update(Function operation) { + this.value = operation.apply(value).doubleValue(); + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/Measurement.java similarity index 76% rename from concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java rename to concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/Measurement.java index 517183d0..840235c0 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/Measurement.java @@ -1,4 +1,6 @@ -package com.netflix.concurrency.limits.limit; +package com.netflix.concurrency.limits.limit.measurement; + +import java.util.function.Function; /** * Contract for tracking a measurement such as a minimum or average of a sample set @@ -20,4 +22,6 @@ public interface Measurement { * Reset the internal state as if no samples were ever added */ void reset(); + + void update(Function operation); } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/MinimumMeasurement.java similarity index 69% rename from concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java rename to concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/MinimumMeasurement.java index 3c4bf6bc..075f2a6f 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/MinimumMeasurement.java @@ -1,4 +1,6 @@ -package com.netflix.concurrency.limits.limit; +package com.netflix.concurrency.limits.limit.measurement; + +import java.util.function.Function; public class MinimumMeasurement implements Measurement { private Double value = 0.0; @@ -20,4 +22,9 @@ public Number get() { public void reset() { value = 0.0; } + + @Override + public void update(Function operation) { + + } } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java new file mode 100644 index 00000000..04665dfc --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiter.java @@ -0,0 +1,83 @@ +package com.netflix.concurrency.limits.limiter; + +import com.netflix.concurrency.limits.Limit; +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.MetricRegistry; +import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; +import com.netflix.concurrency.limits.limit.VegasLimit; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +public abstract class AbstractLimiter implements Limiter { + public abstract static class Builder, ContextT> { + private Limit limit = VegasLimit.newDefault(); + private Supplier clock = System::nanoTime; + protected MetricRegistry registry = EmptyMetricRegistry.INSTANCE; + + public BuilderT limit(Limit limit) { + this.limit = limit; + return self(); + } + + public BuilderT clock(Supplier clock) { + this.clock = clock; + return self(); + } + + public BuilderT metricRegistry(MetricRegistry registry) { + this.registry = registry; + return self(); + } + + protected abstract BuilderT self(); + } + + private final AtomicInteger inFlight = new AtomicInteger(); + private final Supplier clock; + private final Limit limitAlgorithm; + private volatile int limit; + + protected AbstractLimiter(Builder builder) { + this.clock = builder.clock; + this.limitAlgorithm = builder.limit; + this.limit = limitAlgorithm.getLimit(); + this.limitAlgorithm.notifyOnChange(this::onNewLimit); + } + + protected Listener createListener() { + final long startTime = clock.get(); + final int currentInflight = inFlight.incrementAndGet(); + return new Listener() { + @Override + public void onSuccess() { + inFlight.decrementAndGet(); + + limitAlgorithm.onSample(startTime, clock.get() - startTime, currentInflight, false); + } + + @Override + public void onIgnore() { + inFlight.decrementAndGet(); + } + + @Override + public void onDropped() { + inFlight.decrementAndGet(); + + limitAlgorithm.onSample(startTime, clock.get() - startTime, currentInflight, true); + } + }; + } + + public int getLimit() { + return limit; + } + + public int getInflight() { return inFlight.get(); } + + protected void onNewLimit(int newLimit) { + limit = newLimit; + } + +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiterBuilder.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiterBuilder.java deleted file mode 100644 index 517d8d0e..00000000 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractLimiterBuilder.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.netflix.concurrency.limits.limiter; - -import com.netflix.concurrency.limits.Limit; -import com.netflix.concurrency.limits.Limiter; -import com.netflix.concurrency.limits.Strategy; -import com.netflix.concurrency.limits.internal.Preconditions; -import com.netflix.concurrency.limits.limit.VegasLimit; -import com.netflix.concurrency.limits.strategy.LookupPartitionStrategy; -import com.netflix.concurrency.limits.strategy.LookupPartitionStrategy.Builder; -import com.netflix.concurrency.limits.strategy.SimpleStrategy; - -import java.util.function.Consumer; -import java.util.function.Function; - -/** - * Base class through which an RPC specific limiter may be constructed - * - * @param Concrete builder type specific to an RPC mechanism - * @param Type capturing the context of a request for the concrete RPC mechanism - */ -public abstract class AbstractLimiterBuilder, ContextT> { - protected Strategy strategy; - protected Builder builder; - private DefaultLimiter.Builder limiterBuilder = DefaultLimiter.newBuilder(); - - /** - * Configure the strategy for partitioning the limit. - * @param contextToGroup Mapper from the context to a name group associated with a percentage of the limit - * @param configurer Function through which the value set support by the func may be assigned - * a percentage of the limit - * @return Chainable builder - */ - public BuilderT partitionByLookup( - Function contextToGroup, - Consumer> configurer) { - Preconditions.checkState(this.strategy == null && this.builder == null, "strategy already set"); - - builder = LookupPartitionStrategy.newBuilder(contextToGroup); - configurer.accept(builder); - return self(); - } - - /** - * Strategy for acquiring a token from the limiter based on the context. - * @param strategy - * @return Chainable builder - */ - public BuilderT strategy(Strategy strategy) { - Preconditions.checkState(this.strategy == null && this.builder == null, "strategy already set"); - - this.strategy = strategy; - return self(); - } - - /** - * Set the limit algorithm to use. Default is {@link VegasLimit} - * @param limit Limit algorithm to use - * @return Chainable builder - * @deprecated Use {@link AbstractLimiterBuilder#limiter(Consumer)} - */ - @Deprecated - public BuilderT limit(Limit limit) { - return limiter(builder -> builder.limit(limit)); - } - - public BuilderT limiter(Consumer consumer) { - consumer.accept(this.limiterBuilder); - return self(); - } - - protected Limiter buildLimiter() { - return limiterBuilder.build(getFinalStrategy()); - } - - private Strategy getFinalStrategy() { - if (builder != null) { - return builder.build(); - } else if (strategy != null) { - return strategy; - } else { - return new SimpleStrategy<>(); - } - } - - protected abstract BuilderT self(); -} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java new file mode 100644 index 00000000..97f98aa6 --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiter.java @@ -0,0 +1,178 @@ +package com.netflix.concurrency.limits.limiter; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.MetricIds; +import com.netflix.concurrency.limits.MetricRegistry; +import com.netflix.concurrency.limits.internal.Preconditions; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +public abstract class AbstractPartitionedLimiter extends AbstractLimiter { + private static final String PARTITION_TAG_NAME = "partition"; + + public abstract static class Builder, ContextT> extends AbstractLimiter.Builder { + private List> partitionResolvers = new ArrayList<>(); + private final Map partitions = new LinkedHashMap<>(); + + public BuilderT partitionResolver(Function contextToPartition) { + this.partitionResolvers.add(contextToPartition); + return self(); + } + + public BuilderT partition(String name, double percent) { + partitions.put(name, new Partition(name, percent)); + return self(); + } + + protected boolean hasPartitions() { + return !partitions.isEmpty(); + } + + public Limiter build() { + return (this.hasPartitions() && !partitionResolvers.isEmpty()) + ? new AbstractPartitionedLimiter(this) {} + : new SimpleLimiter(this); + } + } + + static class Partition { + private final double percent; + private final String name; + private int limit; + private int busy; + private MetricRegistry.SampleListener inflightDistribution; + + Partition(String name, double pct) { + this.name = name; + this.percent = pct; + } + + void updateLimit(int totalLimit) { + // Calculate this bin's limit while rounding up and ensuring the value + // is at least 1. With this technique the sum of bin limits may end up being + // higher than the concurrency limit. + this.limit = (int)Math.max(1, Math.ceil(totalLimit * percent)); + } + + boolean isLimitExceeded() { + return busy >= limit; + } + + void acquire() { + busy++; + } + + void release() { + busy--; + } + + int getLimit() { + return limit; + } + + public int getInflight() { + return busy; + } + + double getPercent() { + return percent; + } + + void createMetrics(MetricRegistry registry) { + this.inflightDistribution = registry.registerDistribution(MetricIds.INFLIGHT_GUAGE_NAME, PARTITION_TAG_NAME, name); + registry.registerGauge(MetricIds.PARTITION_LIMIT_GUAGE_NAME, this::getLimit, PARTITION_TAG_NAME, name); + } + + @Override + public String toString() { + return "Partition [pct=" + percent + ", limit=" + limit + ", busy=" + busy + "]"; + } + } + + private final Map partitions; + private final Partition unknownPartition; + private final List> partitionResolvers; + + public AbstractPartitionedLimiter(Builder builder) { + super(builder); + + Preconditions.checkArgument(!builder.partitions.isEmpty(), "No partitions specified"); + Preconditions.checkArgument(builder.partitions.values().stream().map(Partition::getPercent).reduce(0.0, Double::sum) <= 1.0, + "Sum of percentages must be <= 1.0"); + + this.partitions = new HashMap<>(builder.partitions); + this.partitions.forEach((name, partition) -> partition.createMetrics(builder.registry)); + + this.unknownPartition = new Partition("unknown", 0.0); + this.unknownPartition.createMetrics(builder.registry); + + this.partitionResolvers = builder.partitionResolvers; + + builder.registry.registerGauge(MetricIds.LIMIT_GUAGE_NAME, this::getLimit); + + onNewLimit(getLimit()); + } + + private Partition resolvePartition(ContextT context) { + for (Function resolver : this.partitionResolvers) { + String name = resolver.apply(context); + if (name != null) { + Partition partition = partitions.get(name); + if (partition != null) { + return partition; + } + } + } + return unknownPartition; + } + + @Override + public synchronized Optional acquire(ContextT context) { + final Partition partition = resolvePartition(context); + + if (getInflight() >= getLimit() && partition.isLimitExceeded()) { + return Optional.empty(); + } + + partition.acquire(); + final Listener listener = createListener(); + return Optional.of(new Listener() { + @Override + public void onSuccess() { + listener.onSuccess(); + releasePartition(partition); + } + + @Override + public void onIgnore() { + listener.onIgnore(); + releasePartition(partition); + } + + @Override + public void onDropped() { + listener.onDropped(); + releasePartition(partition); + } + }); + } + + private synchronized void releasePartition(Partition partition) { + partition.release(); + } + + @Override + protected synchronized void onNewLimit(int newLimit) { + partitions.forEach((name, partition) -> partition.updateLimit(newLimit)); + } + + Partition getPartition(String name) { + return partitions.get(name); + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java deleted file mode 100644 index 7c3b47f7..00000000 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java +++ /dev/null @@ -1,226 +0,0 @@ -package com.netflix.concurrency.limits.limiter; - -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - -import com.netflix.concurrency.limits.Limit; -import com.netflix.concurrency.limits.Limiter; -import com.netflix.concurrency.limits.Strategy; -import com.netflix.concurrency.limits.Strategy.Token; -import com.netflix.concurrency.limits.internal.Preconditions; -import com.netflix.concurrency.limits.limit.VegasLimit; - -/** - * {@link Limiter} that combines a plugable limit algorithm and enforcement strategy to - * enforce concurrency limits to a fixed resource. - * @param - */ -public final class DefaultLimiter implements Limiter { - private final Supplier nanoClock = System::nanoTime; - - private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1); - private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1); - private static final long DEFAULT_MIN_RTT_THRESHOLD = TimeUnit.MICROSECONDS.toNanos(100); - - /** - * Minimum observed samples to filter out sample windows with not enough significant samples - */ - private static final int DEFAULT_WINDOW_SIZE = 100; - - /** - * End time for the sampling window at which point the limit should be updated - */ - private volatile long nextUpdateTime = 0; - - private final Limit limit; - - private final Strategy strategy; - - private final long minWindowTime; - - private final long maxWindowTime; - - private final int windowSize; - - private final long minRttThreshold; - - /** - * Object tracking stats for the current sample window - */ - private final AtomicReference sample = new AtomicReference<>(new ImmutableSampleWindow()); - - /** - * Counter tracking the current number of inflight requests - */ - private final AtomicInteger inFlight = new AtomicInteger(); - - private final Object lock = new Object(); - - public static class Builder { - private Limit limit = VegasLimit.newDefault(); - private long maxWindowTime = DEFAULT_MAX_WINDOW_TIME; - private long minWindowTime = DEFAULT_MIN_WINDOW_TIME; - private int windowSize = DEFAULT_WINDOW_SIZE; - private long minRttThreshold = DEFAULT_MIN_RTT_THRESHOLD; - - /** - * Algorithm used to determine the new limit based on the current limit and minimum - * measured RTT in the sample window - */ - public Builder limit(Limit limit) { - Preconditions.checkArgument(limit != null, "Algorithm may not be null"); - this.limit = limit; - return this; - } - - /** - * Minimum window duration for sampling a new minRtt - */ - public Builder minWindowTime(long minWindowTime, TimeUnit units) { - Preconditions.checkArgument(units.toMillis(minWindowTime) >= 100, "minWindowTime must be >= 100 ms"); - this.minWindowTime = units.toNanos(minWindowTime); - return this; - } - - /** - * Maximum window duration for sampling a new minRtt - */ - public Builder maxWindowTime(long maxWindowTime, TimeUnit units) { - Preconditions.checkArgument(maxWindowTime >= units.toMillis(100), "minWindowTime must be >= 100 ms"); - this.maxWindowTime = units.toNanos(maxWindowTime); - return this; - } - - /** - * Minimum sampling window size for finding a new minimum rtt - */ - public Builder windowSize(int windowSize) { - Preconditions.checkArgument(windowSize >= 10, "Window size must be >= 10"); - this.windowSize = windowSize; - return this; - } - - public Builder minRttThreshold(long threshold, TimeUnit units) { - this.minRttThreshold = units.toNanos(threshold); - return this; - } - - /** - * @param strategy Strategy for enforcing the limit - */ - public DefaultLimiter build(Strategy strategy) { - Preconditions.checkArgument(strategy != null, "Strategy may not be null"); - return new DefaultLimiter(this, strategy); - } - } - - public static Builder newBuilder() { - return new Builder(); - } - - /** - * @deprecated Use {@link DefaultLimiter#newBuilder} - * @param limit - * @param strategy - */ - @Deprecated - public DefaultLimiter(Limit limit, Strategy strategy) { - Preconditions.checkArgument(limit != null, "Algorithm may not be null"); - Preconditions.checkArgument(strategy != null, "Strategy may not be null"); - this.limit = limit; - this.strategy = strategy; - this.windowSize = DEFAULT_WINDOW_SIZE; - this.minWindowTime = DEFAULT_MIN_WINDOW_TIME; - this.maxWindowTime = DEFAULT_MAX_WINDOW_TIME; - this.minRttThreshold = DEFAULT_MIN_RTT_THRESHOLD; - strategy.setLimit(limit.getLimit()); - } - - private DefaultLimiter(Builder builder, Strategy strategy) { - this.limit = builder.limit; - this.minWindowTime = builder.minWindowTime; - this.maxWindowTime = builder.maxWindowTime; - this.windowSize = builder.windowSize; - this.strategy = strategy; - this.minRttThreshold = builder.minRttThreshold; - strategy.setLimit(limit.getLimit()); - } - - @Override - public Optional acquire(final ContextT context) { - // Did we exceed the limit - final Token token = strategy.tryAcquire(context); - if (!token.isAcquired()) { - return Optional.empty(); - } - - final long startTime = nanoClock.get(); - int currentMaxInFlight = inFlight.incrementAndGet(); - - return Optional.of(new Listener() { - @Override - public void onSuccess() { - inFlight.decrementAndGet(); - token.release(); - - final long endTime = nanoClock.get(); - final long rtt = endTime - startTime; - - if (rtt < minRttThreshold) { - return; - } - sample.updateAndGet(window -> window.addSample(rtt, currentMaxInFlight)); - - if (endTime > nextUpdateTime) { - synchronized (lock) { - // Double check under the lock - if (endTime > nextUpdateTime) { - ImmutableSampleWindow current = sample.get(); - if (isWindowReady(current)) { - sample.set(new ImmutableSampleWindow()); - - nextUpdateTime = endTime + Math.min(Math.max(current.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime); - limit.update(current); - strategy.setLimit(limit.getLimit()); - } - } - } - } - } - - @Override - public void onIgnore() { - inFlight.decrementAndGet(); - token.release(); - } - - @Override - public void onDropped() { - inFlight.decrementAndGet(); - token.release(); - - sample.getAndUpdate(current -> current.addDroppedSample(currentMaxInFlight)); - } - }); - } - - private boolean isWindowReady(ImmutableSampleWindow sample) { - return sample.getCandidateRttNanos() < Long.MAX_VALUE && sample.getSampleCount() > windowSize; - } - - protected int getLimit() { - return limit.getLimit(); - } - - @Override - public String toString() { - return "DefaultLimiter [RTT_candidate=" + TimeUnit.NANOSECONDS.toMicros(sample.get().getCandidateRttNanos()) / 1000.0 - + ", maxInFlight=" + inFlight - + ", " + limit - + ", " + strategy - + "]"; - } -} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java new file mode 100644 index 00000000..a7de1881 --- /dev/null +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/SimpleLimiter.java @@ -0,0 +1,32 @@ +package com.netflix.concurrency.limits.limiter; + +import java.util.Optional; + +public class SimpleLimiter extends AbstractLimiter { + public static class Builder extends AbstractLimiter.Builder, ContextT> { + public SimpleLimiter build() { + return new SimpleLimiter(this); + } + + @Override + protected Builder self() { + return this; + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public SimpleLimiter(AbstractLimiter.Builder builder) { + super(builder); + } + + @Override + public Optional acquire(ContextT context) { + if (getInflight() > getLimit()) { + return Optional.empty(); + } + return Optional.of(createListener()); + } +} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/strategy/LookupPartitionStrategy.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/strategy/LookupPartitionStrategy.java deleted file mode 100644 index 02aec046..00000000 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/strategy/LookupPartitionStrategy.java +++ /dev/null @@ -1,179 +0,0 @@ -package com.netflix.concurrency.limits.strategy; - -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; - -import com.netflix.concurrency.limits.MetricIds; -import com.netflix.concurrency.limits.MetricRegistry; -import com.netflix.concurrency.limits.MetricRegistry.SampleListener; -import com.netflix.concurrency.limits.Strategy; -import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; -import com.netflix.concurrency.limits.internal.Preconditions; - -/** - * Strategy for partitioning the limiter by named groups where the allocation of - * group to percentage is provided up front. - * @param - */ -public class LookupPartitionStrategy implements Strategy { - private static final String PARTITION_TAG_NAME = "partition"; - - public static class Builder { - private final Function lookup; - private final Map partitions = new LinkedHashMap<>(); - private MetricRegistry registry = EmptyMetricRegistry.INSTANCE; - - protected Builder(Function lookup) { - this.lookup = lookup; - } - - public Builder metricRegistry(MetricRegistry registry) { - this.registry = registry; - return this; - } - - public Builder assign(String group, Double percent) { - partitions.put(group, new Partition(group, percent)); - return this; - } - - public LookupPartitionStrategy build() { - return new LookupPartitionStrategy(this); - } - - public boolean hasPartitions() { - return !partitions.isEmpty(); - } - } - - public static Builder newBuilder(Function lookup) { - return new Builder(lookup); - } - - private final Map partitions; - private final Partition unknownPartition; - private final Function lookup; - private int busy = 0; - private int limit = 0; - - private LookupPartitionStrategy(Builder builder) { - Preconditions.checkArgument(!builder.partitions.isEmpty(), "No partitions specified"); - Preconditions.checkArgument(builder.partitions.values().stream().map(Partition::getPercent).reduce(0.0, Double::sum) <= 1.0, - "Sum of percentages must be <= 1.0"); - - this.partitions = new HashMap<>(builder.partitions); - this.partitions.forEach((name, partition) -> partition.createMetrics(builder.registry)); - - this.unknownPartition = new Partition("unknown", 0.0); - this.unknownPartition.createMetrics(builder.registry); - - this.lookup = builder.lookup; - - builder.registry.registerGauge(MetricIds.LIMIT_GUAGE_NAME, this::getLimit); - } - - @Override - public synchronized Token tryAcquire(T type) { - final Partition partition = partitions.getOrDefault(lookup.apply(type), this.unknownPartition); - - if (busy >= limit && partition.isLimitExceeded()) { - return Token.newNotAcquired(busy); - } - busy++; - partition.acquire(); - return Token.newAcquired(busy, () -> releasePartition(partition)); - } - - private synchronized void releasePartition(Partition partition) { - busy--; - partition.release(); - } - - @Override - public synchronized void setLimit(int newLimit) { - if (this.limit != newLimit) { - this.limit = newLimit; - partitions.forEach((name, partition) -> partition.updateLimit(newLimit)); - } - } - - public synchronized int getLimit() { - return limit; - } - - private static class Partition { - private final double percent; - private final String name; - private SampleListener busyDistribution; - private int limit; - private int busy; - - public Partition(String name, double pct) { - this.name = name; - this.percent = pct; - } - - public void createMetrics(MetricRegistry registry) { - this.busyDistribution = registry.registerDistribution(MetricIds.INFLIGHT_GUAGE_NAME, PARTITION_TAG_NAME, name); - registry.registerGauge(MetricIds.PARTITION_LIMIT_GUAGE_NAME, this::getLimit, PARTITION_TAG_NAME, name); - } - - public void updateLimit(int totalLimit) { - // Calculate this bin's limit while rounding up and ensuring the value - // is at least 1. With this technique the sum of bin limits may end up being - // higher than the concurrency limit. - this.limit = (int)Math.max(1, Math.ceil(totalLimit * percent)); - } - - public boolean isLimitExceeded() { - return busy >= limit; - } - - public void acquire() { - busy++; - busyDistribution.addSample(busy); - } - - public void release() { - busy--; - } - - public int getLimit() { - return limit; - } - - public double getPercent() { - return percent; - } - - @Override - public String toString() { - return "Partition [pct=" + percent + ", limit=" + limit + ", busy=" + busy + "]"; - } - } - - synchronized int getBinBusyCount(String key) { - return Optional.ofNullable(partitions.get(key)) - .orElseThrow(() -> new IllegalArgumentException("Invalid group " + key)) - .busy; - } - - synchronized int getBinLimit(String key) { - return Optional.ofNullable(partitions.get(key)) - .orElseThrow(() -> new IllegalArgumentException("Invalid group " + key)) - .limit; - } - - synchronized int getBusyCount() { - return busy; - } - - @Override - public String toString() { - return "LookupPartitionedStrategy [partitions=" + partitions + ", unknownPartition=" + unknownPartition - + ", limit=" + limit + "]"; - } -} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/strategy/PredicatePartitionStrategy.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/strategy/PredicatePartitionStrategy.java deleted file mode 100644 index 94281e06..00000000 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/strategy/PredicatePartitionStrategy.java +++ /dev/null @@ -1,174 +0,0 @@ -package com.netflix.concurrency.limits.strategy; - -import java.util.ArrayList; -import java.util.List; -import java.util.function.Predicate; - -import com.netflix.concurrency.limits.MetricIds; -import com.netflix.concurrency.limits.MetricRegistry; -import com.netflix.concurrency.limits.MetricRegistry.SampleListener; -import com.netflix.concurrency.limits.Strategy; -import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; -import com.netflix.concurrency.limits.internal.Preconditions; - -/** - * Concurrency limiter that guarantees a certain percentage of the limit to specific callers - * while allowing callers to borrow from underutilized callers. - * - * Callers are identified by their index into an array of percentages passed in during initialization. - * A percentage of 0.0 means that a caller may only use excess capacity and can be completely - * starved when the limit is reached. A percentage of 1.0 means that the caller - * is guaranteed to get the entire limit. - * - * grpc.server.call.inflight (group=[a, b, c]) - * grpc.server.call.limit (group=[a,b,c]) - */ -public final class PredicatePartitionStrategy implements Strategy { - private static final String PARTITION_TAG_NAME = "partition"; - - public static class Builder { - private final List> partitions = new ArrayList<>(); - private MetricRegistry registry = EmptyMetricRegistry.INSTANCE; - - public Builder metricRegistry(MetricRegistry registry) { - this.registry = registry; - return this; - } - - public Builder add(String name, Double pct, Predicate predicate) { - partitions.add(new Partition(name, pct, predicate)); - return this; - } - - public PredicatePartitionStrategy build() { - return new PredicatePartitionStrategy(this); - } - - public boolean hasPartitions() { - return !partitions.isEmpty(); - } - } - - public static Builder newBuilder() { - return new Builder(); - } - - private final List> partitions; - private int busy = 0; - private int limit = 0; - - private PredicatePartitionStrategy(Builder builder) { - Preconditions.checkArgument(builder.partitions.stream().map(Partition::getPercent).reduce(0.0, Double::sum) <= 1.0, - "Sum of percentages must be <= 1.0"); - - this.partitions = new ArrayList<>(builder.partitions); - this.partitions.forEach(partition -> partition.createMetrics(builder.registry)); - - builder.registry.registerGauge(MetricIds.LIMIT_GUAGE_NAME, this::getLimit); - } - - @Override - public synchronized Token tryAcquire(T type) { - for (final Partition partition : partitions) { - if (partition.predicate.test(type)) { - if (busy >= limit && partition.isLimitExceeded()) { - break; - } - busy++; - partition.acquire(); - return Token.newAcquired(busy, () -> releasePartition(partition)); - } - } - return Token.newNotAcquired(busy); - } - - private synchronized void releasePartition(Partition partition) { - busy--; - partition.release(); - } - - @Override - public synchronized void setLimit(int newLimit) { - if (this.limit != newLimit) { - this.limit = newLimit; - partitions.forEach(partition -> partition.updateLimit(newLimit)); - } - } - - public synchronized int getLimit() { - return limit; - } - - private static class Partition { - private final double percent; - private final Predicate predicate; - private final String name; - private SampleListener inflightDistribution; - private int limit; - private int busy; - - public Partition(String name, double pct, Predicate predicate) { - this.name = name; - this.percent = pct; - this.predicate = predicate; - } - - public void createMetrics(MetricRegistry registry) { - this.inflightDistribution = registry.registerDistribution(MetricIds.INFLIGHT_GUAGE_NAME, PARTITION_TAG_NAME, name); - registry.registerGauge(MetricIds.PARTITION_LIMIT_GUAGE_NAME, this::getLimit, PARTITION_TAG_NAME, name); - } - - public void updateLimit(int totalLimit) { - // Calculate this bin's limit while rounding up and ensuring the value - // is at least 1. With this technique the sum of bin limits may end up being - // higher than the concurrency limit. - this.limit = (int)Math.max(1, Math.ceil(totalLimit * percent)); - } - - public boolean isLimitExceeded() { - return busy >= limit; - } - - public void acquire() { - busy++; - inflightDistribution.addSample(busy); - } - - public void release() { - busy--; - } - - public int getLimit() { - return limit; - } - - public double getPercent() { - return percent; - } - - @Override - public String toString() { - return "Partition [pct=" + percent + ", limit=" + limit + ", busy=" + busy + "]"; - } - } - - synchronized int getBinBusyCount(int index) { - Preconditions.checkArgument(index >= 0 && index < partitions.size(), "Invalid bin index " + index); - return partitions.get(index).busy; - } - - synchronized int getBinLimit(int index) { - Preconditions.checkArgument(index >= 0 && index < partitions.size(), "Invalid bin index " + index); - return partitions.get(index).limit; - } - - synchronized public int getBusyCount() { - return busy; - } - - @Override - public String toString() { - final int maxLen = 10; - return "PercentageStrategy [" + partitions.subList(0, Math.min(partitions.size(), maxLen)) + "]"; - } -} diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/strategy/SimpleStrategy.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/strategy/SimpleStrategy.java deleted file mode 100644 index c0f44108..00000000 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/strategy/SimpleStrategy.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.netflix.concurrency.limits.strategy; - -import java.util.concurrent.atomic.AtomicInteger; - -import com.netflix.concurrency.limits.MetricIds; -import com.netflix.concurrency.limits.MetricRegistry; -import com.netflix.concurrency.limits.MetricRegistry.SampleListener; -import com.netflix.concurrency.limits.Strategy; -import com.netflix.concurrency.limits.internal.EmptyMetricRegistry; - -/** - * Simplest strategy for enforcing a concurrency limit that has a single counter - * for tracking total usage. - */ -public final class SimpleStrategy implements Strategy { - - private final AtomicInteger busy = new AtomicInteger(); - private volatile int limit = 1; - private final SampleListener inflightMetric; - - public SimpleStrategy() { - this(EmptyMetricRegistry.INSTANCE); - } - - public SimpleStrategy(MetricRegistry registry) { - this.inflightMetric = registry.registerDistribution(MetricIds.INFLIGHT_GUAGE_NAME); - registry.registerGauge(MetricIds.LIMIT_GUAGE_NAME, this::getLimit); - } - - @Override - public Token tryAcquire(T context) { - final int currentBusy = busy.get(); - if (currentBusy >= limit) { - inflightMetric.addSample(currentBusy); - return Token.newNotAcquired(currentBusy); - } - - final int inflight = busy.incrementAndGet(); - inflightMetric.addSample(inflight); - return Token.newAcquired(inflight, busy::decrementAndGet); - } - - @Override - public void setLimit(int limit) { - if (limit < 1) { - limit = 1; - } - this.limit = limit; - } - - // Visible for testing - int getLimit() { - return limit; - } - - int getBusyCount() { - return busy.get(); - } - - @Override - public String toString() { - return "SimpleStrategy [busy=" + busy.get() + ", limit=" + limit + "]"; - } -} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/executor/BlockingAdaptiveExecutorSimulation.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/executor/BlockingAdaptiveExecutorSimulation.java index 46bdf5c1..8387c428 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/executor/BlockingAdaptiveExecutorSimulation.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/executor/BlockingAdaptiveExecutorSimulation.java @@ -8,6 +8,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limiter.SimpleLimiter; import org.junit.Ignore; import org.junit.Test; @@ -16,14 +18,12 @@ import com.netflix.concurrency.limits.limit.GradientLimit; import com.netflix.concurrency.limits.limit.TracingLimitDecorator; import com.netflix.concurrency.limits.limit.VegasLimit; -import com.netflix.concurrency.limits.limiter.DefaultLimiter; -import com.netflix.concurrency.limits.strategy.SimpleStrategy; @Ignore("These are simulations and not tests") public class BlockingAdaptiveExecutorSimulation { @Test public void test() { - DefaultLimiter limiter = DefaultLimiter.newBuilder().limit(AIMDLimit.newBuilder().initialLimit(10).build()).build(new SimpleStrategy<>()); + Limiter limiter = SimpleLimiter.newBuilder().limit(AIMDLimit.newBuilder().initialLimit(10).build()).build(); Executor executor = new BlockingAdaptiveExecutor(limiter); run(10000, 20, executor, randomLatency(50, 150)); @@ -31,22 +31,22 @@ public void test() { @Test public void testVegas() { - DefaultLimiter limiter = DefaultLimiter.newBuilder() + Limiter limiter = SimpleLimiter.newBuilder() .limit(TracingLimitDecorator.wrap(VegasLimit.newBuilder() .initialLimit(100) .build())) - .build(new SimpleStrategy<>()); + .build(); Executor executor = new BlockingAdaptiveExecutor(limiter); run(10000, 50, executor, randomLatency(50, 150)); } @Test public void testGradient() { - DefaultLimiter limiter = DefaultLimiter.newBuilder() + Limiter limiter = SimpleLimiter.newBuilder() .limit(TracingLimitDecorator.wrap(GradientLimit.newBuilder() .initialLimit(100) .build())) - .build(new SimpleStrategy<>()); + .build(); Executor executor = new BlockingAdaptiveExecutor(limiter); run(100000, 50, executor, randomLatency(50, 150)); } diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/AIMDLimitTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/AIMDLimitTest.java index 6b5e7e9f..4e08e39a 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/AIMDLimitTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/AIMDLimitTest.java @@ -1,12 +1,9 @@ package com.netflix.concurrency.limits.limit; -import java.util.concurrent.TimeUnit; - +import junit.framework.Assert; import org.junit.Test; -import com.netflix.concurrency.limits.limiter.ImmutableSampleWindow; - -import junit.framework.Assert; +import java.util.concurrent.TimeUnit; public class AIMDLimitTest { @Test @@ -18,14 +15,14 @@ public void testDefault() { @Test public void increaseOnSuccess() { AIMDLimit limiter = AIMDLimit.newBuilder().initialLimit(10).build(); - limiter.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(1), 10)); + limiter.onSample(0, TimeUnit.MILLISECONDS.toNanos(1), 10, false); Assert.assertEquals(11, limiter.getLimit()); } @Test public void decreaseOnDrops() { AIMDLimit limiter = AIMDLimit.newBuilder().initialLimit(10).build(); - limiter.update(new ImmutableSampleWindow().addDroppedSample(1)); + limiter.onSample(0, 0, 0, true); Assert.assertEquals(9, limiter.getLimit()); } } diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurementTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurementTest.java new file mode 100644 index 00000000..3be11102 --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurementTest.java @@ -0,0 +1,17 @@ +package com.netflix.concurrency.limits.limit; + +import com.netflix.concurrency.limits.limit.measurement.ExpAvgMeasurement; +import org.junit.Test; + +public class ExpAvgMeasurementTest { + @Test + public void test() { + ExpAvgMeasurement avg = new ExpAvgMeasurement(10); + + for (int i = 0; i < 10; i++) { + avg.add(1); + } + + avg.add(10); + } +} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java index dc59813b..3edb62a0 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java @@ -1,12 +1,9 @@ package com.netflix.concurrency.limits.limit; -import java.util.concurrent.TimeUnit; - +import junit.framework.Assert; import org.junit.Test; -import com.netflix.concurrency.limits.limiter.ImmutableSampleWindow; - -import junit.framework.Assert; +import java.util.concurrent.TimeUnit; public class VegasLimitTest { public static VegasLimit create() { @@ -28,27 +25,27 @@ public void initialLimit() { @Test public void increaseLimit() { VegasLimit limit = create(); - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 10)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 10, false); Assert.assertEquals(10, limit.getLimit()); - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 11)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 11, false); Assert.assertEquals(16, limit.getLimit()); } @Test public void decreaseLimit() { VegasLimit limit = create(); - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 10)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 10, false); Assert.assertEquals(10, limit.getLimit()); - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(50), 11)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(50), 11, false); Assert.assertEquals(9, limit.getLimit()); } @Test public void noChangeIfWithinThresholds() { VegasLimit limit = create(); - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 10)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 10, false); Assert.assertEquals(10, limit.getLimit()); - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(14), 14)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(14), 14, false); Assert.assertEquals(10, limit.getLimit()); } @@ -62,15 +59,15 @@ public void decreaseSmoothing() { .build(); // Pick up first min-rtt - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 100)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 100, false); Assert.assertEquals(100, limit.getLimit()); // First decrease - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(20), 100)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); Assert.assertEquals(75, limit.getLimit()); // Second decrease - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(20), 100)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); Assert.assertEquals(56, limit.getLimit()); } @@ -83,15 +80,15 @@ public void decreaseWithoutSmoothing() { .build(); // Pick up first min-rtt - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(10), 100)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 100, false); Assert.assertEquals(100, limit.getLimit()); // First decrease - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(20), 100)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); Assert.assertEquals(50, limit.getLimit()); // Second decrease - limit.update(new ImmutableSampleWindow().addSample(TimeUnit.MILLISECONDS.toNanos(20), 100)); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); Assert.assertEquals(25, limit.getLimit()); } } diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java new file mode 100644 index 00000000..cf81b191 --- /dev/null +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/AbstractPartitionedLimiterTest.java @@ -0,0 +1,159 @@ +package com.netflix.concurrency.limits.limiter; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.FixedLimit; +import com.netflix.concurrency.limits.limit.SettableLimit; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; +import java.util.function.Function; + +public class AbstractPartitionedLimiterTest { + public static class TestPartitionedLimiter extends AbstractPartitionedLimiter { + public static class Builder extends AbstractPartitionedLimiter.Builder { + @Override + protected Builder self() { + return this; + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + public TestPartitionedLimiter(Builder builder) { + super(builder); + } + } + + @Test + public void limitAllocatedToBins() { + AbstractPartitionedLimiter limiter = (AbstractPartitionedLimiter) TestPartitionedLimiter.newBuilder() + .partitionResolver(Function.identity()) + .partition("batch", 0.3) + .partition("live", 0.7) + .limit(FixedLimit.of(10)) + .build(); + + Assert.assertEquals(3, limiter.getPartition("batch").getLimit()); + Assert.assertEquals(7, limiter.getPartition("live").getLimit()); + } + + @Test + public void useExcessCapacityUntilTotalLimit() { + AbstractPartitionedLimiter limiter = (AbstractPartitionedLimiter) TestPartitionedLimiter.newBuilder() + .partitionResolver(Function.identity()) + .partition("batch", 0.3) + .partition("live", 0.7) + .limit(FixedLimit.of(10)) + .build(); + + for (int i = 0; i < 10; i++) { + Assert.assertTrue(limiter.acquire("batch").isPresent()); + Assert.assertEquals(i+1, limiter.getPartition("batch").getInflight()); + } + + Assert.assertFalse(limiter.acquire("batch").isPresent()); + } + + @Test + public void exceedTotalLimitForUnusedBin() { + AbstractPartitionedLimiter limiter = (AbstractPartitionedLimiter) TestPartitionedLimiter.newBuilder() + .partitionResolver(Function.identity()) + .partition("batch", 0.3) + .partition("live", 0.7) + .limit(FixedLimit.of(10)) + .build(); + + for (int i = 0; i < 10; i++) { + Assert.assertTrue(limiter.acquire("batch").isPresent()); + Assert.assertEquals(i+1, limiter.getPartition("batch").getInflight()); + } + + Assert.assertFalse(limiter.acquire("batch").isPresent()); + + for (int i = 0; i < 7; i++) { + Assert.assertTrue(limiter.acquire("live").isPresent()); + Assert.assertEquals(i+1, limiter.getPartition("live").getInflight()); + } + + Assert.assertFalse(limiter.acquire("live").isPresent()); + } + + @Test + public void rejectOnceAllLimitsReached() { + AbstractPartitionedLimiter limiter = (AbstractPartitionedLimiter) TestPartitionedLimiter.newBuilder() + .partitionResolver(Function.identity()) + .partition("batch", 0.3) + .partition("live", 0.7) + .limit(FixedLimit.of(10)) + .build(); + + for (int i = 0; i < 3; i++) { + Assert.assertTrue(limiter.acquire("batch").isPresent()); + Assert.assertEquals(i+1, limiter.getPartition("batch").getInflight()); + Assert.assertEquals(i+1, limiter.getInflight()); + } + + for (int i = 0; i < 7; i++) { + Assert.assertTrue(limiter.acquire("live").isPresent()); + Assert.assertEquals(i+1, limiter.getPartition("live").getInflight()); + Assert.assertEquals(i+4, limiter.getInflight()); + } + + Assert.assertFalse(limiter.acquire("batch").isPresent()); + Assert.assertFalse(limiter.acquire("live").isPresent()); + } + + @Test + public void releaseLimit() { + AbstractPartitionedLimiter limiter = (AbstractPartitionedLimiter) TestPartitionedLimiter.newBuilder() + .partitionResolver(Function.identity()) + .partition("batch", 0.3) + .partition("live", 0.7) + .limit(FixedLimit.of(10)) + .build(); + + Optional completion = limiter.acquire("batch"); + for (int i = 1; i < 10; i++) { + Assert.assertTrue(limiter.acquire("batch").isPresent()); + Assert.assertEquals(i+1, limiter.getPartition("batch").getInflight()); + } + + Assert.assertEquals(10, limiter.getInflight()); + + Assert.assertFalse(limiter.acquire("batch").isPresent()); + + completion.get().onSuccess(); + Assert.assertEquals(9, limiter.getPartition("batch").getInflight()); + Assert.assertEquals(9, limiter.getInflight()); + + Assert.assertTrue(limiter.acquire("batch").isPresent()); + Assert.assertEquals(10, limiter.getPartition("batch").getInflight()); + Assert.assertEquals(10, limiter.getInflight()); + } + + @Test + public void setLimitReservesBusy() { + SettableLimit limit = SettableLimit.startingAt(10); + + AbstractPartitionedLimiter limiter = (AbstractPartitionedLimiter) TestPartitionedLimiter.newBuilder() + .partitionResolver(Function.identity()) + .partition("batch", 0.3) + .partition("live", 0.7) + .limit(limit) + .build(); + + limit.setLimit(10); + Assert.assertEquals(3, limiter.getPartition("batch").getLimit()); + Assert.assertTrue(limiter.acquire("batch").isPresent()); + Assert.assertEquals(1, limiter.getPartition("batch").getInflight()); + Assert.assertEquals(1, limiter.getInflight()); + + limit.setLimit(20); + Assert.assertEquals(6, limiter.getPartition("batch").getLimit()); + Assert.assertEquals(1, limiter.getPartition("batch").getInflight()); + Assert.assertEquals(1, limiter.getInflight()); + } +} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java index bcd4c6b8..3abc0c65 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java @@ -2,7 +2,6 @@ import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.limit.SettableLimit; -import com.netflix.concurrency.limits.strategy.SimpleStrategy; import org.junit.Test; import java.util.LinkedList; @@ -19,7 +18,7 @@ public class BlockingLimiterTest { @Test public void test() { SettableLimit limit = SettableLimit.startingAt(10); - BlockingLimiter limiter = BlockingLimiter.wrap(DefaultLimiter.newBuilder().limit(limit).build(new SimpleStrategy<>())); + BlockingLimiter limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build()); LinkedList listeners = new LinkedList<>(); for (int i = 0; i < 10; i++) { @@ -39,7 +38,7 @@ public void test() { public void testMultipleBlockedThreads() throws InterruptedException, ExecutionException, TimeoutException { int numThreads = 8; SettableLimit limit = SettableLimit.startingAt(1); - BlockingLimiter limiter = BlockingLimiter.wrap(DefaultLimiter.newBuilder().limit(limit).build(new SimpleStrategy<>())); + BlockingLimiter limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build()); ExecutorService executorService = Executors.newFixedThreadPool(numThreads); try { for (Future future : IntStream.range(0, numThreads) diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/strategy/PercentageStrategyTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/strategy/PercentageStrategyTest.java deleted file mode 100644 index 33ed619c..00000000 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/strategy/PercentageStrategyTest.java +++ /dev/null @@ -1,128 +0,0 @@ -package com.netflix.concurrency.limits.strategy; - -import org.junit.Assert; -import org.junit.Test; - -import com.netflix.concurrency.limits.Strategy.Token; - -public class PercentageStrategyTest { - @Test - public void limitAllocatedToBins() { - PredicatePartitionStrategy strategy = PredicatePartitionStrategy.newBuilder() - .add("batch", 0.3, str -> str.equals("batch")) - .add("live", 0.7, str -> str.equals("live")) - .build(); - strategy.setLimit(10); - - Assert.assertEquals(3, strategy.getBinLimit(0)); - Assert.assertEquals(7, strategy.getBinLimit(1)); - } - - @Test - public void useExcessCapacityUntilTotalLimit() { - PredicatePartitionStrategy strategy = PredicatePartitionStrategy.newBuilder() - .add("batch", 0.3, str -> str.equals("batch")) - .add("live", 0.7, str -> str.equals("live")) - .build(); - strategy.setLimit(10); - for (int i = 0; i < 10; i++) { - Assert.assertTrue(strategy.tryAcquire("batch").isAcquired()); - Assert.assertEquals(i+1, strategy.getBinBusyCount(0)); - } - - Assert.assertFalse(strategy.tryAcquire("batch").isAcquired()); - } - - @Test - public void exceedTotalLimitForUnusedBin() { - PredicatePartitionStrategy strategy = PredicatePartitionStrategy.newBuilder() - .add("batch", 0.3, str -> str.equals("batch")) - .add("live", 0.7, str -> str.equals("live")) - .build(); - - strategy.setLimit(10); - for (int i = 0; i < 10; i++) { - Assert.assertTrue(strategy.tryAcquire("batch").isAcquired()); - Assert.assertEquals(i+1, strategy.getBinBusyCount(0)); - } - - Assert.assertFalse(strategy.tryAcquire("batch").isAcquired()); - - for (int i = 0; i < 7; i++) { - Assert.assertTrue(strategy.tryAcquire("live").isAcquired()); - Assert.assertEquals(i+1, strategy.getBinBusyCount(1)); - } - - Assert.assertFalse(strategy.tryAcquire("live").isAcquired()); - } - - @Test - public void rejectOnceAllLimitsReached() { - PredicatePartitionStrategy strategy = PredicatePartitionStrategy.newBuilder() - .add("batch", 0.3, str -> str.equals("batch")) - .add("live", 0.7, str -> str.equals("live")) - .build(); - - strategy.setLimit(10); - for (int i = 0; i < 3; i++) { - Assert.assertTrue(strategy.tryAcquire("batch").isAcquired()); - Assert.assertEquals(i+1, strategy.getBinBusyCount(0)); - Assert.assertEquals(i+1, strategy.getBusyCount()); - } - - for (int i = 0; i < 7; i++) { - Assert.assertTrue(strategy.tryAcquire("live").isAcquired()); - Assert.assertEquals(i+1, strategy.getBinBusyCount(1)); - Assert.assertEquals(i+4, strategy.getBusyCount()); - } - - Assert.assertFalse(strategy.tryAcquire("batch").isAcquired()); - Assert.assertFalse(strategy.tryAcquire("live").isAcquired()); - } - - @Test - public void releaseLimit() { - PredicatePartitionStrategy strategy = PredicatePartitionStrategy.newBuilder() - .add("batch", 0.3, str -> str.equals("batch")) - .add("live", 0.7, str -> str.equals("live")) - .build(); - - strategy.setLimit(10); - Token completion = strategy.tryAcquire("batch"); - for (int i = 1; i < 10; i++) { - Assert.assertTrue(strategy.tryAcquire("batch").isAcquired()); - Assert.assertEquals(i+1, strategy.getBinBusyCount(0)); - } - - Assert.assertEquals(10, strategy.getBusyCount()); - - Assert.assertFalse(strategy.tryAcquire("batch").isAcquired()); - - completion.release(); - Assert.assertEquals(9, strategy.getBinBusyCount(0)); - Assert.assertEquals(9, strategy.getBusyCount()); - - Assert.assertTrue(strategy.tryAcquire("batch").isAcquired()); - Assert.assertEquals(10, strategy.getBinBusyCount(0)); - Assert.assertEquals(10, strategy.getBusyCount()); - } - - @Test - public void setLimitReservesBusy() { - PredicatePartitionStrategy strategy = PredicatePartitionStrategy.newBuilder() - .add("batch", 0.3, str -> str.equals("batch")) - .add("live", 0.7, str -> str.equals("live")) - .build(); - - strategy.setLimit(10); - Assert.assertEquals(3, strategy.getBinLimit(0)); - Assert.assertTrue(strategy.tryAcquire("batch").isAcquired()); - Assert.assertEquals(1, strategy.getBinBusyCount(0)); - Assert.assertEquals(1, strategy.getBusyCount()); - - strategy.setLimit(20); - Assert.assertEquals(6, strategy.getBinLimit(0)); - Assert.assertEquals(1, strategy.getBinBusyCount(0)); - Assert.assertEquals(1, strategy.getBusyCount()); - } -} diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/strategy/SimpleStrategyTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/strategy/SimpleStrategyTest.java deleted file mode 100644 index 2bcaecf0..00000000 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/strategy/SimpleStrategyTest.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.netflix.concurrency.limits.strategy; - -import org.junit.Test; - -import com.netflix.concurrency.limits.Strategy.Token; - -import junit.framework.Assert; - -public class SimpleStrategyTest { - @Test - public void limitLessThanZeroSetAs1() { - SimpleStrategy strategy = new SimpleStrategy(); - - strategy.setLimit(-10); - Assert.assertEquals(1, strategy.getLimit()); - } - - @Test - public void initialState() { - SimpleStrategy strategy = new SimpleStrategy(); - Assert.assertEquals(1, strategy.getLimit()); - Assert.assertEquals(0, strategy.getBusyCount()); - } - - @Test - public void acquireIncrementsBusy() { - SimpleStrategy strategy = new SimpleStrategy(); - Assert.assertEquals(0, strategy.getBusyCount()); - Assert.assertTrue(strategy.tryAcquire(null).isAcquired()); - Assert.assertEquals(1, strategy.getBusyCount()); - } - - @Test - public void exceedingLimitReturnsFalse() { - SimpleStrategy strategy = new SimpleStrategy(); - Assert.assertTrue(strategy.tryAcquire(null).isAcquired()); - Assert.assertFalse(strategy.tryAcquire(null).isAcquired()); - Assert.assertEquals(1, strategy.getBusyCount()); - } - - @Test - public void acquireAndRelease() { - SimpleStrategy strategy = new SimpleStrategy(); - Token completion = strategy.tryAcquire(null); - Assert.assertTrue(completion.isAcquired()); - Assert.assertEquals(1, strategy.getBusyCount()); - - completion.release(); - - Assert.assertEquals(0, strategy.getBusyCount()); - - Assert.assertTrue(strategy.tryAcquire(null).isAcquired()); - Assert.assertEquals(1, strategy.getBusyCount()); - } -} diff --git a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java index c07b6203..11dc62a7 100644 --- a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java +++ b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/client/GrpcClientLimiterBuilder.java @@ -1,26 +1,23 @@ package com.netflix.concurrency.limits.grpc.client; import com.netflix.concurrency.limits.Limiter; -import com.netflix.concurrency.limits.limiter.AbstractLimiterBuilder; +import com.netflix.concurrency.limits.limiter.AbstractPartitionedLimiter; import com.netflix.concurrency.limits.limiter.BlockingLimiter; -import com.netflix.concurrency.limits.strategy.LookupPartitionStrategy; - +import com.netflix.concurrency.limits.limiter.SimpleLimiter; import io.grpc.CallOptions; -import java.util.function.Consumer; - /** * Builder to simplify creating a {@link Limiter} specific to GRPC clients. */ -public final class GrpcClientLimiterBuilder extends AbstractLimiterBuilder { +public final class GrpcClientLimiterBuilder extends AbstractPartitionedLimiter.Builder { private boolean blockOnLimit = false; - - public GrpcClientLimiterBuilder partitionByMethod(Consumer> configurer) { - return partitionByLookup(context -> context.getMethod().getFullMethodName(), configurer); + + public GrpcClientLimiterBuilder partitionByMethod() { + return partitionResolver(context -> context.getMethod().getFullMethodName()); } - public GrpcClientLimiterBuilder partitionByCallOption(CallOptions.Key option, Consumer> configurer) { - return partitionByLookup(context -> context.getCallOptions().getOption(option), configurer); + public GrpcClientLimiterBuilder partitionByCallOption(CallOptions.Key option) { + return partitionResolver(context -> context.getCallOptions().getOption(option)); } /** @@ -40,7 +37,8 @@ protected GrpcClientLimiterBuilder self() { } public Limiter build() { - Limiter limiter = buildLimiter(); + Limiter limiter = super.build(); + if (blockOnLimit) { limiter = BlockingLimiter.wrap(limiter); } diff --git a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java index ecebcc9e..95d44924 100644 --- a/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java +++ b/concurrency-limits-grpc/src/main/java/com/netflix/concurrency/limits/grpc/server/GrpcServerLimiterBuilder.java @@ -1,56 +1,38 @@ package com.netflix.concurrency.limits.grpc.server; import com.netflix.concurrency.limits.Limiter; -import com.netflix.concurrency.limits.limiter.AbstractLimiterBuilder; -import com.netflix.concurrency.limits.strategy.LookupPartitionStrategy; - +import com.netflix.concurrency.limits.limiter.AbstractPartitionedLimiter; +import com.netflix.concurrency.limits.limiter.SimpleLimiter; import io.grpc.Attributes; import io.grpc.Metadata; -import java.util.function.Consumer; - -/** - * Builder to simplify creating a {@link Limiter} specific to GRPC server. By default, - * the same concurrency limit is shared by all requests. The limiter can be partitioned - * based on one of many request attributes. Only one type of partition may be specified. - */ -public final class GrpcServerLimiterBuilder extends AbstractLimiterBuilder { +public class GrpcServerLimiterBuilder extends AbstractPartitionedLimiter.Builder { /** * Partition the limit by method - * @param configurer Configuration function though which method percentages may be specified - * Unspecified methods may only use excess capacity. * @return Chainable builder */ - public GrpcServerLimiterBuilder partitionByMethod(Consumer> configurer) { - return partitionByLookup(context -> context.getCall().getMethodDescriptor().getFullMethodName(), configurer); + public GrpcServerLimiterBuilder partitionByMethod() { + return partitionResolver((GrpcServerRequestContext context) -> context.getCall().getMethodDescriptor().getFullMethodName()); } - + /** - * Partition the limit by a request header. - * @param configurer Configuration function though which header value percentages may be specified. - * Unspecified header values may only use excess capacity. + * Partition the limit by a request header. * @return Chainable builder */ - public GrpcServerLimiterBuilder partitionByHeader(Metadata.Key header, Consumer> configurer) { - return partitionByLookup(context -> context.getHeaders().get(header), configurer); + public GrpcServerLimiterBuilder partitionByHeader(Metadata.Key header) { + return partitionResolver(context -> context.getHeaders().get(header)); } - + /** - * Partition the limit by a request attribute. - * @param configurer Configuration function though which attribute value percentages may be specified. - * Unspecified attribute values may only use excess capacity. + * Partition the limit by a request attribute. * @return Chainable builder */ - public GrpcServerLimiterBuilder partitionByAttribute(Attributes.Key attribute, Consumer> configurer) { - return partitionByLookup(context -> context.getCall().getAttributes().get(attribute), configurer); + public GrpcServerLimiterBuilder partitionByAttribute(Attributes.Key attribute) { + return partitionResolver(context -> context.getCall().getAttributes().get(attribute)); } - + @Override protected GrpcServerLimiterBuilder self() { return this; } - - public Limiter build() { - return buildLimiter(); - } } diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java index c45daf65..2adf79c7 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java @@ -7,10 +7,8 @@ import com.netflix.concurrency.limits.limit.FixedLimit; import com.netflix.concurrency.limits.limit.VegasLimit; import com.netflix.concurrency.limits.spectator.SpectatorMetricRegistry; -import com.netflix.concurrency.limits.strategy.SimpleStrategy; import com.netflix.spectator.api.DefaultRegistry; import com.netflix.spectator.api.Registry; - import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.Metadata; @@ -26,6 +24,8 @@ import io.grpc.stub.MetadataUtils; import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; +import org.junit.Ignore; +import org.junit.Test; import java.io.IOException; import java.util.concurrent.Executor; @@ -34,9 +34,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; -import org.junit.Ignore; -import org.junit.Test; - public class ConcurrencyLimitServerInterceptorTest { private static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.newBuilder() .setType(MethodType.UNARY) @@ -66,14 +63,15 @@ public void simulation() throws IOException, InterruptedException { observer.onNext("response"); observer.onCompleted(); })) - .build(), new ConcurrencyLimitServerInterceptor(new GrpcServerLimiterBuilder() - .limit(FixedLimit.of(50)) - .partitionByHeader(ID_HEADER, builder -> builder - .metricRegistry(serverRegistry) - .assign("0", 0.1) - .assign("1", 0.2) - .assign("2", 0.7)) - .build()) + .build(), ConcurrencyLimitServerInterceptor.newBuilder( + new GrpcServerLimiterBuilder() + .limit(FixedLimit.of(50)) + .partitionByHeader(ID_HEADER) + .partition("0", 0.1) + .partition("1", 0.2) + .partition("2", 0.7) + .build()) + .build() )) .build() .start(); @@ -114,7 +112,7 @@ private void simulateClient(int id, AtomicLongArray counters, AtomicLong drops, .usePlaintext(true) .intercept(MetadataUtils.newAttachHeadersInterceptor(headers)) .intercept(new ConcurrencyLimitClientInterceptor(new GrpcClientLimiterBuilder() - .strategy(new SimpleStrategy<>(clientRegistry)) + .metricRegistry(clientRegistry) .limit(limit) .blockOnLimit(true) .build())) diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java index 587539e6..18e79390 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java @@ -1,5 +1,10 @@ package com.netflix.concurrency.limits.grpc.server.example; +import com.netflix.concurrency.limits.grpc.server.GrpcServerLimiterBuilder; +import com.netflix.concurrency.limits.limit.Gradient2Limit; +import com.netflix.concurrency.limits.limit.GradientLimit; +import com.netflix.concurrency.limits.limit.WindowedLimit; + import java.io.IOException; import java.text.MessageFormat; import java.util.concurrent.Executors; @@ -8,13 +13,11 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import com.netflix.concurrency.limits.grpc.server.GrpcServerLimiterBuilder; -import com.netflix.concurrency.limits.limit.GradientLimit; -import com.netflix.concurrency.limits.limiter.LifoBlockingLimiter; - public class Example { public static void main(String[] args) throws IOException { - final GradientLimit limit = GradientLimit.newBuilder() + final Gradient2Limit limit = Gradient2Limit.newBuilder() + .shortWindow(10) + .longWindow(100) .build(); // Create a server @@ -22,15 +25,11 @@ public static void main(String[] args) throws IOException { .concurrency(2) .lognormal(20, 1, TimeUnit.MINUTES) .limiter( - LifoBlockingLimiter.newBuilder( - new GrpcServerLimiterBuilder() - .limiter(builder -> builder - .limit(limit) - .minWindowTime(1, TimeUnit.SECONDS) - ) - .build() - ) - .maxBacklogSize(200) + new GrpcServerLimiterBuilder() + .limit(WindowedLimit.newBuilder() + .minWindowTime(1, TimeUnit.SECONDS) + .windowSize(10) + .build(limit)) .build() ) .build(); @@ -38,10 +37,12 @@ public static void main(String[] args) throws IOException { final AtomicInteger successCounter = new AtomicInteger(0); final AtomicInteger dropCounter = new AtomicInteger(0); final LatencyCollector latency = new LatencyCollector(); - + final Driver driver = Driver.newBuilder() - .exponentialRps(100, 90, TimeUnit.SECONDS) - .exponentialRps(200, 3, TimeUnit.SECONDS) + .exponentialRps(100, 60, TimeUnit.SECONDS) + .exponentialRps(200, 500, TimeUnit.SECONDS) + .exponentialRps(100, 500, TimeUnit.SECONDS) + .exponentialRps(75, 500, TimeUnit.SECONDS) .successAction(successCounter::incrementAndGet) .dropAction(dropCounter::incrementAndGet) .latencyAccumulator(latency) @@ -51,15 +52,17 @@ public static void main(String[] args) throws IOException { // Report progress final AtomicInteger counter = new AtomicInteger(0); + System.out.println("iteration, limit, success, drop, latency, shortRtt, longRtt"); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { - System.out.println(MessageFormat.format("{0,number,#}, {1,number,#}, {2,number,#}, {3,number,#}, {4,number,#}, {5,number,#}", + System.out.println(MessageFormat.format("{0,number,#}, {1,number,#}, {2,number,#}, {3,number,#}, {4,number,#}, {5,number,#}, {6,number,#}", counter.incrementAndGet(), limit.getLimit(), successCounter.getAndSet(0), dropCounter.getAndSet(0), TimeUnit.NANOSECONDS.toMillis(latency.getAndReset()), - TimeUnit.NANOSECONDS.toMillis(limit.getRttNoLoad()) - )); + limit.getShortRtt(TimeUnit.MILLISECONDS), + limit.getLongRtt(TimeUnit.MILLISECONDS) + )) ; }, 1, 1, TimeUnit.SECONDS); // Create a client diff --git a/concurrency-limits-grpc/src/test/resources/log4j.properties b/concurrency-limits-grpc/src/test/resources/log4j.properties new file mode 100644 index 00000000..ed76b055 --- /dev/null +++ b/concurrency-limits-grpc/src/test/resources/log4j.properties @@ -0,0 +1,24 @@ +# +# Copyright 2012 Netflix, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger=ERROR, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d %-5p %c{1}:%L %x %m [%t]%n + +log4j.additivity.com.netflix.concurrency.limits=false +#log4j.logger.com.netflix.concurrency.limits=DEBUG, console diff --git a/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java b/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java index a954b256..1903031c 100644 --- a/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java +++ b/concurrency-limits-servlet/src/main/java/com/netflix/concurrency/limits/servlet/ServletLimiterBuilder.java @@ -1,92 +1,65 @@ package com.netflix.concurrency.limits.servlet; import com.netflix.concurrency.limits.Limiter; -import com.netflix.concurrency.limits.limiter.AbstractLimiterBuilder; -import com.netflix.concurrency.limits.strategy.LookupPartitionStrategy; +import com.netflix.concurrency.limits.limiter.AbstractPartitionedLimiter; +import javax.servlet.http.HttpServletRequest; import java.security.Principal; import java.util.Optional; -import java.util.function.Consumer; import java.util.function.Function; -import javax.servlet.http.HttpServletRequest; - /** * Builder to simplify creating a {@link Limiter} specific to a Servlet filter. By default, * the same concurrency limit is shared by all requests. The limiter can be partitioned * based on one of many request attributes. Only one type of partition may be specified. */ -public final class ServletLimiterBuilder extends AbstractLimiterBuilder { +public final class ServletLimiterBuilder extends AbstractPartitionedLimiter.Builder { /** * Partition the limit by header - * @param configurer Configuration function though which header percentages may be specified - * Unspecified header values may only use excess capacity. * @return Chainable builder */ - public ServletLimiterBuilder partitionByHeader(String name, Consumer> configurer) { - return partitionByLookup( - request -> Optional.ofNullable(request.getHeader(name)).orElse(null), - configurer); + public ServletLimiterBuilder partitionByHeader(String name) { + return partitionResolver(request -> Optional.ofNullable(request.getHeader(name)).orElse(null)); } /** * Partition the limit by {@link Principal}. Percentages of the limit are partitioned to named * groups. Group membership is derived from the provided mapping function. * @param principalToGroup Mapping function from {@link Principal} to a named group. - * @param configurer Configuration function though which group percentages may be specified - * Unspecified group values may only use excess capacity. * @return Chainable builder */ - public ServletLimiterBuilder partitionByUserPrincipal(Function principalToGroup, Consumer> configurer) { - return partitionByLookup( - request -> Optional.ofNullable(request.getUserPrincipal()).map(principalToGroup).orElse(null), - configurer); + public ServletLimiterBuilder partitionByUserPrincipal(Function principalToGroup) { + return partitionResolver(request -> Optional.ofNullable(request.getUserPrincipal()).map(principalToGroup).orElse(null)); } /** * Partition the limit by request attribute - * @param configurer Configuration function though which attribute percentages may be specified - * Unspecified attribute values may only use excess capacity. * @return Chainable builder */ - public ServletLimiterBuilder partitionByAttribute(String name, Consumer> configurer) { - return partitionByLookup( - request -> Optional.ofNullable(request.getAttribute(name)).map(Object::toString).orElse(null), - configurer); + public ServletLimiterBuilder partitionByAttribute(String name) { + return partitionResolver(request -> Optional.ofNullable(request.getAttribute(name)).map(Object::toString).orElse(null)); } /** * Partition the limit by request parameter - * @param configurer Configuration function though which parameter value percentages may be specified - * Unspecified parameter values may only use excess capacity. * @return Chainable builder */ - public ServletLimiterBuilder partitionByParameter(String name, Consumer> configurer) { - return partitionByLookup( - request -> Optional.ofNullable(request.getParameter(name)).orElse(null), - configurer); + public ServletLimiterBuilder partitionByParameter(String name) { + return partitionResolver(request -> Optional.ofNullable(request.getParameter(name)).orElse(null)); } /** * Partition the limit by the full path. Percentages of the limit are partitioned to named * groups. Group membership is derived from the provided mapping function. * @param pathToGroup Mapping function from full path to a named group. - * @param configurer Configuration function though which group percentages may be specified - * Unspecified group values may only use excess capacity. * @return Chainable builder */ - public ServletLimiterBuilder partitionByPathInfo(Function pathToGroup, Consumer> configurer) { - return partitionByLookup( - request -> Optional.ofNullable(request.getPathInfo()).map(pathToGroup).orElse(null), - configurer); + public ServletLimiterBuilder partitionByPathInfo(Function pathToGroup) { + return partitionResolver(request -> Optional.ofNullable(request.getPathInfo()).map(pathToGroup).orElse(null)); } @Override protected ServletLimiterBuilder self() { return this; } - - public Limiter build() { - return buildLimiter(); - } } diff --git a/concurrency-limits-servlet/src/test/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java b/concurrency-limits-servlet/src/test/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java index 811e51ce..1f2a3b1b 100644 --- a/concurrency-limits-servlet/src/test/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java +++ b/concurrency-limits-servlet/src/test/com/netflix/concurrency/limits/ConcurrencyLimitServletFilterTest.java @@ -3,10 +3,8 @@ import com.netflix.concurrency.limits.executors.BlockingAdaptiveExecutor; import com.netflix.concurrency.limits.limit.FixedLimit; import com.netflix.concurrency.limits.limit.VegasLimit; -import com.netflix.concurrency.limits.limiter.DefaultLimiter; import com.netflix.concurrency.limits.servlet.ConcurrencyLimitServletFilter; import com.netflix.concurrency.limits.servlet.ServletLimiterBuilder; -import com.netflix.concurrency.limits.strategy.SimpleStrategy; import java.io.IOException; import java.security.Principal;