diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java index e1de0b4e..7eb5d5bd 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limit.java @@ -14,6 +14,11 @@ interface SampleWindow { */ long getCandidateRttNanos(); + /** + * @return Sum of all RTT samples in this window. + */ + long getRttSumNanos(); + /** * @return Maximum number of inflight observed during the sample window */ diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurement.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurement.java index bd0e45ae..78779033 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurement.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurement.java @@ -14,26 +14,13 @@ public ExpAvgMeasurement(int window, double filter) { @Override public Number add(Number sample) { - if (value == 0) { + if (value == 0.0) { value = sample.doubleValue(); } else if (sample.doubleValue() < value) { value = sample.doubleValue(); } else { value = (1-ratio) * value + ratio * Math.min(value*filter, sample.doubleValue()); } -// // First sample seen -// if (count == 0) { -// value = sample.doubleValue(); -// count = 1; -// // Adaptive average for the first samples -// } else if (count < window) { -// count++; -// double tempRatio = 1.0 / count; -// value = (1-tempRatio) * value + tempRatio * Math.min(value*filter, sample.doubleValue()); -// // Steady state -// } else { -// value = (1-ratio) * value + ratio * Math.min(value*filter, sample.doubleValue()); -// } return value; } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java index 7b8d027b..cc026a14 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java @@ -27,13 +27,14 @@ public final class GradientLimit implements Limit { public static class Builder { private int initialLimit = 50; private int minLimit = 1; - private int maxLimit = 1000; + private int maxLimit = 200; private double smoothing = 0.2; private Function queueSize = SquareRootFunction.create(4); private MetricRegistry registry = EmptyMetricRegistry.INSTANCE; private int noLoadRttWindow = 1000; private double noLoadRttFilter = 1.1; + private double rttTolerance = 2.0; /** * Minimum threshold for accepting a new rtt sample. Any RTT lower than this threshold @@ -76,9 +77,9 @@ public Builder minLimit(int minLimit) { * before reducing the limit. For example, a value of 2.0 means that a 2x increase in latency is acceptable. * @return Chainable builder */ - @Deprecated public Builder rttTolerance(double rttTolerance) { Preconditions.checkArgument(rttTolerance >= 1.0, "Tolerance must be >= 1.0"); + this.rttTolerance = rttTolerance; return this; } @@ -209,6 +210,8 @@ public static GradientLimit newDefault() { private final int minLimit; + private final double rttTolerance; + private final Function queueSize; private final SampleListener minRttSampleListener; @@ -223,6 +226,8 @@ private GradientLimit(Builder builder) { this.minLimit = builder.minLimit; this.queueSize = builder.queueSize; this.smoothing = builder.smoothing; + this.rttTolerance = builder.rttTolerance; + this.rttNoLoadAccumulator = new ExpAvgMeasurement(builder.noLoadRttWindow, builder.noLoadRttFilter); this.minRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME); @@ -234,18 +239,18 @@ private GradientLimit(Builder builder) { public synchronized void update(SampleWindow sample) { Preconditions.checkArgument(sample.getCandidateRttNanos() > 0, "rtt must be >0 but got " + sample.getCandidateRttNanos()); - final long rttSample = sample.getCandidateRttNanos(); + final long rttSample = sample.getRttSumNanos() / sample.getSampleCount(); minWindowRttSampleListener.addSample(rttSample); final double queueSize = this.queueSize.apply((int)this.estimatedLimit); queueSizeSampleListener.addSample(queueSize); final double rttNoLoad = rttNoLoadAccumulator.add(rttSample).doubleValue(); - final double rtt = (double)rttSample; - minRttSampleListener.addSample(rttNoLoad); + minRttSampleListener.addSample(rttSample); final double gradient; + final double rtt = (double)rttSample / rttTolerance; // rtt is lower than rtt_noload because of smoothing rtt noload updates // set to 1.0 to indicate no queueing if (rtt < rttNoLoad) { @@ -264,23 +269,24 @@ public synchronized void update(SampleWindow sample) { } // Apply a smoothing factor when reducing the limit only - newLimit = (1-smoothing) * estimatedLimit + smoothing*(newLimit); + if (newLimit < estimatedLimit) { + newLimit = (1-smoothing) * estimatedLimit + smoothing*(newLimit); + } + newLimit = Math.max(Math.max(minLimit, queueSize), Math.min(maxLimit, newLimit)); + + if (LOG.isDebugEnabled()) { + LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={}", + (int)newLimit, + TimeUnit.NANOSECONDS.toMicros((int)rttNoLoad)/1000.0, + TimeUnit.NANOSECONDS.toMicros((int)rttSample)/1000.0, + queueSize, + gradient); + } - if ((int)newLimit != (int)estimatedLimit) { - // Don't grow the limit if we are app limited - if (sample.getMaxInFlight() + queueSize < estimatedLimit) { - return; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={}", - (int)newLimit, - TimeUnit.NANOSECONDS.toMicros((int)rttNoLoad)/1000.0, - TimeUnit.NANOSECONDS.toMicros((int)rtt)/1000.0, - queueSize, - gradient); - } + // We are app limited, don't increase the limit to prevent upward drift + if (sample.getMaxInFlight() * 2 < estimatedLimit) { + return; } estimatedLimit = newLimit; diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java index e6dcba9d..b873d825 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java @@ -21,14 +21,14 @@ public final class DefaultLimiter implements Limiter { private final Supplier nanoClock = System::nanoTime; - private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1); - private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1); - private static final long DEFAULT_MIN_RTT_THRESHOLD = TimeUnit.MICROSECONDS.toNanos(500); + private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(3); + private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(3); + private static final long DEFAULT_MIN_RTT_THRESHOLD = TimeUnit.MICROSECONDS.toNanos(100); /** * Minimum observed samples to filter out sample windows with not enough significant samples */ - private static final int DEFAULT_WINDOW_SIZE = 10; + private static final int DEFAULT_WINDOW_SIZE = 100; /** * End time for the sampling window at which point the limit should be updated @@ -181,7 +181,8 @@ public void onSuccess() { if (isWindowReady(current)) { sample.set(new ImmutableSampleWindow()); - nextUpdateTime = endTime + Math.min(Math.max(current.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime); + long delta = Math.min(Math.max(current.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime); + nextUpdateTime = endTime + delta; limit.update(current); strategy.setLimit(limit.getLimit()); } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSampleWindow.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSampleWindow.java index 8ded87ef..84b8ea3b 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSampleWindow.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSampleWindow.java @@ -1,38 +1,41 @@ package com.netflix.concurrency.limits.limiter; -import com.netflix.concurrency.limits.Limit; - import java.util.concurrent.TimeUnit; +import com.netflix.concurrency.limits.Limit; + /** * Class used to track immutable samples in an AtomicReference */ public class ImmutableSampleWindow implements Limit.SampleWindow { final long minRtt; + final long sum; final int maxInFlight; final int sampleCount; final boolean didDrop; public ImmutableSampleWindow() { this.minRtt = Long.MAX_VALUE; + this.sum = 0; this.maxInFlight = 0; this.sampleCount = 0; this.didDrop = false; } - public ImmutableSampleWindow(long minRtt, int maxInFlight, int sampleCount, boolean didDrop) { + public ImmutableSampleWindow(long minRtt, long sum, int maxInFlight, int sampleCount, boolean didDrop) { this.minRtt = minRtt; + this.sum = sum; this.maxInFlight = maxInFlight; this.sampleCount = sampleCount; this.didDrop = didDrop; } public ImmutableSampleWindow addSample(long rtt, int maxInFlight) { - return new ImmutableSampleWindow(Math.min(rtt, minRtt), Math.max(maxInFlight, this.maxInFlight), sampleCount+1, didDrop); + return new ImmutableSampleWindow(Math.min(rtt, minRtt), sum + rtt, Math.max(maxInFlight, this.maxInFlight), sampleCount+1, didDrop); } public ImmutableSampleWindow addDroppedSample(int maxInFlight) { - return new ImmutableSampleWindow(minRtt, Math.max(maxInFlight, this.maxInFlight), sampleCount, true); + return new ImmutableSampleWindow(minRtt, sum, Math.max(maxInFlight, this.maxInFlight), sampleCount, true); } @Override @@ -40,6 +43,11 @@ public long getCandidateRttNanos() { return minRtt; } + @Override + public long getRttSumNanos() { + return sum; + } + @Override public int getMaxInFlight() { return maxInFlight;