Skip to content

Commit

Permalink
Merge pull request #82 from elandau/feature/gradient2_rtt_reset
Browse files Browse the repository at this point in the history
Tweak rtt reset
  • Loading branch information
elandau authored Sep 19, 2018
2 parents 35f89f1 + b44965b commit 87565b7
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ private Gradient2Limit(Builder builder) {
this.minLimit = builder.minLimit;
this.queueSize = builder.queueSize;
this.smoothing = builder.smoothing;
this.shortRtt = new ExpAvgMeasurement(builder.shortWindow,10, Math::min);
this.longRtt = new ExpAvgMeasurement(builder.longWindow,10, Math::min);
this.shortRtt = new ExpAvgMeasurement(builder.shortWindow,10);
this.longRtt = new ExpAvgMeasurement(builder.longWindow,10);
this.maxDriftIntervals = builder.shortWindow * builder.driftMultiplier;

this.longRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME);
Expand All @@ -226,24 +226,17 @@ public int _update(final long startTime, final long rtt, final int inflight, fin
// the short term RTT window. Since both short and long term RTT trend higher this state results in the limit
// slowly trending upwards, increasing the queue and latency. To mitigate this we drop both the limit and
// long term latency value to effectively probe for less queueing and better latency.
if (shortRtt < longRtt) {
intervalsAbove = 0;
} else if (shortRtt > longRtt){
if (shortRtt > longRtt) {
intervalsAbove++;
}

if (intervalsAbove > maxDriftIntervals) {
if (intervalsAbove > maxDriftIntervals) {
intervalsAbove = 0;
int newLimit = (int) Math.max(minLimit, queueSize);
this.longRtt.reset();
estimatedLimit = newLimit;
return (int) estimatedLimit;
}
} else {
intervalsAbove = 0;
int newLimit = (int)Math.max(minLimit, queueSize);
this.longRtt.reset();
estimatedLimit = newLimit;
return (int)estimatedLimit;
}
// Because we're using lots of averages it's possible for the short term RTT to be substantially lower than
// the longer term Rtt. When that happens we need to 'reset' the long term RTT.
if (shortRtt < longRtt) {
this.longRtt.reset();
return (int)estimatedLimit;
}

shortRttSampleListener.addSample(shortRtt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,29 @@
*/
package com.netflix.concurrency.limits.limit.measurement;

import java.util.function.BiFunction;
import java.util.function.Function;

public class ExpAvgMeasurement implements Measurement {
private final BiFunction<Double, Double, Double> accumulator;
private Double value = 0.0;
private Double sum = 0.0;
private final int window;
private final int warmupWindow;
private int count = 0;

public ExpAvgMeasurement(int window, int warmupWindow, BiFunction<Double, Double, Double> accumulator) {
public ExpAvgMeasurement(int window, int warmupWindow) {
this.window = window;
this.warmupWindow = warmupWindow;
this.accumulator = accumulator;
}

@Override
public Number add(Number sample) {
if (count == 0) {
count++;
value = sample.doubleValue();
sum = value = sample.doubleValue();
} else if (count < warmupWindow) {
count++;
value = accumulator.apply(value.doubleValue(), sample.doubleValue());
sum += sample.doubleValue();
value = sum.doubleValue() / count;
} else {
double factor = factor(window);
value = value * (1-factor) + sample.doubleValue() * factor;
Expand All @@ -59,6 +58,7 @@ public Number get() {
public void reset() {
value = 0.0;
count = 0;
sum = 0.0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

public class ExpAvgMeasurementTest {
@Test
public void testWarmup() {
ExpAvgMeasurement avg = new ExpAvgMeasurement(10, 10, Math::min);
ExpAvgMeasurement avg = new ExpAvgMeasurement(100, 10);

double expected[] = new double[]{10.0, 10.5, 11, 11.5, 12, 12.5, 13, 13.5, 14, 14.5};
for (int i = 0; i < 10; i++) {
avg.add(i + 10);
Assert.assertEquals(10.0, avg.get().doubleValue(), 0.01);
double value = avg.add(i + 10).doubleValue();
Assert.assertEquals(expected[i], avg.get().doubleValue(), 0.01);
}

avg.add(100);
Assert.assertEquals(26.36, avg.get().doubleValue(), 0.01);
Assert.assertEquals(16.19, avg.get().doubleValue(), 0.01);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public static void main(String[] args) throws IOException {
final LatencyCollector latency = new LatencyCollector();

final Driver driver = Driver.newBuilder()
.exponentialRps(100, 60, TimeUnit.SECONDS)
.exponentialRps(200, 500, TimeUnit.SECONDS)
.exponentialRps(100, 500, TimeUnit.SECONDS)
.exponentialRps(75, 500, TimeUnit.SECONDS)
.exponentialRps(75, 200, TimeUnit.SECONDS)
.exponentialRps(100, 200, TimeUnit.SECONDS)
.exponentialRps(200, 200, TimeUnit.SECONDS)
.exponentialRps(100, 200, TimeUnit.SECONDS)
.latencyAccumulator(latency)
.runtime(1, TimeUnit.HOURS)
.port(server.getPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static void main(String[] args) throws IOException, ExecutionException, I
.partition("1", 1.0)
.partition("2", 0.0)
// .partition("3", 0.0)
.partitionRejectDelay("2", 1000, TimeUnit.MILLISECONDS)
// .partitionRejectDelay("2", 1000, TimeUnit.MILLISECONDS)
// .partitionRejectDelay("3", 1000, TimeUnit.MILLISECONDS)
.limit(WindowedLimit.newBuilder()
.minWindowTime(1, TimeUnit.SECONDS)
Expand Down

0 comments on commit 87565b7

Please sign in to comment.