diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java index 275cad15..fb846ee8 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Gradient2Limit.java @@ -205,8 +205,8 @@ private Gradient2Limit(Builder builder) { this.minLimit = builder.minLimit; this.queueSize = builder.queueSize; this.smoothing = builder.smoothing; - this.shortRtt = new ExpAvgMeasurement(builder.shortWindow,10, Math::min); - this.longRtt = new ExpAvgMeasurement(builder.longWindow,10, Math::min); + this.shortRtt = new ExpAvgMeasurement(builder.shortWindow,10); + this.longRtt = new ExpAvgMeasurement(builder.longWindow,10); this.maxDriftIntervals = builder.shortWindow * builder.driftMultiplier; this.longRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME); @@ -226,24 +226,17 @@ public int _update(final long startTime, final long rtt, final int inflight, fin // the short term RTT window. Since both short and long term RTT trend higher this state results in the limit // slowly trending upwards, increasing the queue and latency. To mitigate this we drop both the limit and // long term latency value to effectively probe for less queueing and better latency. - if (shortRtt < longRtt) { - intervalsAbove = 0; - } else if (shortRtt > longRtt){ + if (shortRtt > longRtt) { intervalsAbove++; - } - - if (intervalsAbove > maxDriftIntervals) { + if (intervalsAbove > maxDriftIntervals) { + intervalsAbove = 0; + int newLimit = (int) Math.max(minLimit, queueSize); + this.longRtt.reset(); + estimatedLimit = newLimit; + return (int) estimatedLimit; + } + } else { intervalsAbove = 0; - int newLimit = (int)Math.max(minLimit, queueSize); - this.longRtt.reset(); - estimatedLimit = newLimit; - return (int)estimatedLimit; - } - // Because we're using lots of averages it's possible for the short term RTT to be substantially lower than - // the longer term Rtt. When that happens we need to 'reset' the long term RTT. - if (shortRtt < longRtt) { - this.longRtt.reset(); - return (int)estimatedLimit; } shortRttSampleListener.addSample(shortRtt); diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/ExpAvgMeasurement.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/ExpAvgMeasurement.java index cd382e92..6d935a14 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/ExpAvgMeasurement.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/measurement/ExpAvgMeasurement.java @@ -15,30 +15,29 @@ */ package com.netflix.concurrency.limits.limit.measurement; -import java.util.function.BiFunction; import java.util.function.Function; public class ExpAvgMeasurement implements Measurement { - private final BiFunction accumulator; private Double value = 0.0; + private Double sum = 0.0; private final int window; private final int warmupWindow; private int count = 0; - public ExpAvgMeasurement(int window, int warmupWindow, BiFunction accumulator) { + public ExpAvgMeasurement(int window, int warmupWindow) { this.window = window; this.warmupWindow = warmupWindow; - this.accumulator = accumulator; } @Override public Number add(Number sample) { if (count == 0) { count++; - value = sample.doubleValue(); + sum = value = sample.doubleValue(); } else if (count < warmupWindow) { count++; - value = accumulator.apply(value.doubleValue(), sample.doubleValue()); + sum += sample.doubleValue(); + value = sum.doubleValue() / count; } else { double factor = factor(window); value = value * (1-factor) + sample.doubleValue() * factor; @@ -59,6 +58,7 @@ public Number get() { public void reset() { value = 0.0; count = 0; + sum = 0.0; } @Override diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurementTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurementTest.java index cff93ce6..b4e080ca 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurementTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/ExpAvgMeasurementTest.java @@ -4,17 +4,21 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; +import java.util.List; + public class ExpAvgMeasurementTest { @Test public void testWarmup() { - ExpAvgMeasurement avg = new ExpAvgMeasurement(10, 10, Math::min); + ExpAvgMeasurement avg = new ExpAvgMeasurement(100, 10); + double expected[] = new double[]{10.0, 10.5, 11, 11.5, 12, 12.5, 13, 13.5, 14, 14.5}; for (int i = 0; i < 10; i++) { - avg.add(i + 10); - Assert.assertEquals(10.0, avg.get().doubleValue(), 0.01); + double value = avg.add(i + 10).doubleValue(); + Assert.assertEquals(expected[i], avg.get().doubleValue(), 0.01); } avg.add(100); - Assert.assertEquals(26.36, avg.get().doubleValue(), 0.01); + Assert.assertEquals(16.19, avg.get().doubleValue(), 0.01); } } diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java index 4fcf28d9..77fe6a86 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java @@ -34,10 +34,10 @@ public static void main(String[] args) throws IOException { final LatencyCollector latency = new LatencyCollector(); final Driver driver = Driver.newBuilder() - .exponentialRps(100, 60, TimeUnit.SECONDS) - .exponentialRps(200, 500, TimeUnit.SECONDS) - .exponentialRps(100, 500, TimeUnit.SECONDS) - .exponentialRps(75, 500, TimeUnit.SECONDS) + .exponentialRps(75, 200, TimeUnit.SECONDS) + .exponentialRps(100, 200, TimeUnit.SECONDS) + .exponentialRps(200, 200, TimeUnit.SECONDS) + .exponentialRps(100, 200, TimeUnit.SECONDS) .latencyAccumulator(latency) .runtime(1, TimeUnit.HOURS) .port(server.getPort()) diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/PartitionedExample.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/PartitionedExample.java index 270bb15f..61043355 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/PartitionedExample.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/PartitionedExample.java @@ -28,7 +28,7 @@ public static void main(String[] args) throws IOException, ExecutionException, I .partition("1", 1.0) .partition("2", 0.0) // .partition("3", 0.0) - .partitionRejectDelay("2", 1000, TimeUnit.MILLISECONDS) +// .partitionRejectDelay("2", 1000, TimeUnit.MILLISECONDS) // .partitionRejectDelay("3", 1000, TimeUnit.MILLISECONDS) .limit(WindowedLimit.newBuilder() .minWindowTime(1, TimeUnit.SECONDS)