Skip to content

Commit

Permalink
Improve limiter stability by using average window RTT.
Browse files Browse the repository at this point in the history
  • Loading branch information
elandau committed Jun 14, 2018
1 parent 8e3e42f commit 22fc4cd
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ interface SampleWindow {
*/
long getCandidateRttNanos();

/**
* @return Sum of all RTT samples in this window.
*/
long getRttSumNanos();

/**
* @return Maximum number of inflight observed during the sample window
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,13 @@ public ExpAvgMeasurement(int window, double filter) {

@Override
public Number add(Number sample) {
if (value == 0) {
if (value == 0.0) {
value = sample.doubleValue();
} else if (sample.doubleValue() < value) {
value = sample.doubleValue();
} else {
value = (1-ratio) * value + ratio * Math.min(value*filter, sample.doubleValue());
}
// // First sample seen
// if (count == 0) {
// value = sample.doubleValue();
// count = 1;
// // Adaptive average for the first <window> samples
// } else if (count < window) {
// count++;
// double tempRatio = 1.0 / count;
// value = (1-tempRatio) * value + tempRatio * Math.min(value*filter, sample.doubleValue());
// // Steady state
// } else {
// value = (1-ratio) * value + ratio * Math.min(value*filter, sample.doubleValue());
// }
return value;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ public final class GradientLimit implements Limit {
public static class Builder {
private int initialLimit = 50;
private int minLimit = 1;
private int maxLimit = 1000;
private int maxLimit = 200;

private double smoothing = 0.2;
private Function<Integer, Integer> queueSize = SquareRootFunction.create(4);
private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;
private int noLoadRttWindow = 1000;
private double noLoadRttFilter = 1.1;
private double rttTolerance = 2.0;

/**
* Minimum threshold for accepting a new rtt sample. Any RTT lower than this threshold
Expand Down Expand Up @@ -76,9 +77,9 @@ public Builder minLimit(int minLimit) {
* before reducing the limit. For example, a value of 2.0 means that a 2x increase in latency is acceptable.
* @return Chainable builder
*/
@Deprecated
public Builder rttTolerance(double rttTolerance) {
Preconditions.checkArgument(rttTolerance >= 1.0, "Tolerance must be >= 1.0");
this.rttTolerance = rttTolerance;
return this;
}

Expand Down Expand Up @@ -209,6 +210,8 @@ public static GradientLimit newDefault() {

private final int minLimit;

private final double rttTolerance;

private final Function<Integer, Integer> queueSize;

private final SampleListener minRttSampleListener;
Expand All @@ -223,6 +226,8 @@ private GradientLimit(Builder builder) {
this.minLimit = builder.minLimit;
this.queueSize = builder.queueSize;
this.smoothing = builder.smoothing;
this.rttTolerance = builder.rttTolerance;

this.rttNoLoadAccumulator = new ExpAvgMeasurement(builder.noLoadRttWindow, builder.noLoadRttFilter);

this.minRttSampleListener = builder.registry.registerDistribution(MetricIds.MIN_RTT_NAME);
Expand All @@ -234,18 +239,18 @@ private GradientLimit(Builder builder) {
public synchronized void update(SampleWindow sample) {
Preconditions.checkArgument(sample.getCandidateRttNanos() > 0, "rtt must be >0 but got " + sample.getCandidateRttNanos());

final long rttSample = sample.getCandidateRttNanos();
final long rttSample = sample.getRttSumNanos() / sample.getSampleCount();
minWindowRttSampleListener.addSample(rttSample);

final double queueSize = this.queueSize.apply((int)this.estimatedLimit);
queueSizeSampleListener.addSample(queueSize);

final double rttNoLoad = rttNoLoadAccumulator.add(rttSample).doubleValue();
final double rtt = (double)rttSample;

minRttSampleListener.addSample(rttNoLoad);
minRttSampleListener.addSample(rttSample);

final double gradient;
final double rtt = (double)rttSample / rttTolerance;
// rtt is lower than rtt_noload because of smoothing rtt noload updates
// set to 1.0 to indicate no queueing
if (rtt < rttNoLoad) {
Expand All @@ -264,23 +269,24 @@ public synchronized void update(SampleWindow sample) {
}

// Apply a smoothing factor when reducing the limit only
newLimit = (1-smoothing) * estimatedLimit + smoothing*(newLimit);
if (newLimit < estimatedLimit) {
newLimit = (1-smoothing) * estimatedLimit + smoothing*(newLimit);
}

newLimit = Math.max(Math.max(minLimit, queueSize), Math.min(maxLimit, newLimit));

if (LOG.isDebugEnabled()) {
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={}",
(int)newLimit,
TimeUnit.NANOSECONDS.toMicros((int)rttNoLoad)/1000.0,
TimeUnit.NANOSECONDS.toMicros((int)rttSample)/1000.0,
queueSize,
gradient);
}

if ((int)newLimit != (int)estimatedLimit) {
// Don't grow the limit if we are app limited
if (sample.getMaxInFlight() + queueSize < estimatedLimit) {
return;
}

if (LOG.isDebugEnabled()) {
LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={}",
(int)newLimit,
TimeUnit.NANOSECONDS.toMicros((int)rttNoLoad)/1000.0,
TimeUnit.NANOSECONDS.toMicros((int)rtt)/1000.0,
queueSize,
gradient);
}
// We are app limited, don't increase the limit to prevent upward drift
if (sample.getMaxInFlight() * 2 < estimatedLimit) {
return;
}

estimatedLimit = newLimit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
public final class DefaultLimiter<ContextT> implements Limiter<ContextT> {
private final Supplier<Long> nanoClock = System::nanoTime;

private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1);
private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1);
private static final long DEFAULT_MIN_RTT_THRESHOLD = TimeUnit.MICROSECONDS.toNanos(500);
private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(3);
private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(3);
private static final long DEFAULT_MIN_RTT_THRESHOLD = TimeUnit.MICROSECONDS.toNanos(100);

/**
* Minimum observed samples to filter out sample windows with not enough significant samples
*/
private static final int DEFAULT_WINDOW_SIZE = 10;
private static final int DEFAULT_WINDOW_SIZE = 100;

/**
* End time for the sampling window at which point the limit should be updated
Expand Down Expand Up @@ -181,7 +181,8 @@ public void onSuccess() {
if (isWindowReady(current)) {
sample.set(new ImmutableSampleWindow());

nextUpdateTime = endTime + Math.min(Math.max(current.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime);
long delta = Math.min(Math.max(current.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime);
nextUpdateTime = endTime + delta;
limit.update(current);
strategy.setLimit(limit.getLimit());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,53 @@
package com.netflix.concurrency.limits.limiter;

import com.netflix.concurrency.limits.Limit;

import java.util.concurrent.TimeUnit;

import com.netflix.concurrency.limits.Limit;

/**
* Class used to track immutable samples in an AtomicReference
*/
public class ImmutableSampleWindow implements Limit.SampleWindow {
final long minRtt;
final long sum;
final int maxInFlight;
final int sampleCount;
final boolean didDrop;

public ImmutableSampleWindow() {
this.minRtt = Long.MAX_VALUE;
this.sum = 0;
this.maxInFlight = 0;
this.sampleCount = 0;
this.didDrop = false;
}

public ImmutableSampleWindow(long minRtt, int maxInFlight, int sampleCount, boolean didDrop) {
public ImmutableSampleWindow(long minRtt, long sum, int maxInFlight, int sampleCount, boolean didDrop) {
this.minRtt = minRtt;
this.sum = sum;
this.maxInFlight = maxInFlight;
this.sampleCount = sampleCount;
this.didDrop = didDrop;
}

public ImmutableSampleWindow addSample(long rtt, int maxInFlight) {
return new ImmutableSampleWindow(Math.min(rtt, minRtt), Math.max(maxInFlight, this.maxInFlight), sampleCount+1, didDrop);
return new ImmutableSampleWindow(Math.min(rtt, minRtt), sum + rtt, Math.max(maxInFlight, this.maxInFlight), sampleCount+1, didDrop);
}

public ImmutableSampleWindow addDroppedSample(int maxInFlight) {
return new ImmutableSampleWindow(minRtt, Math.max(maxInFlight, this.maxInFlight), sampleCount, true);
return new ImmutableSampleWindow(minRtt, sum, Math.max(maxInFlight, this.maxInFlight), sampleCount, true);
}

@Override
public long getCandidateRttNanos() {
return minRtt;
}

@Override
public long getRttSumNanos() {
return sum;
}

@Override
public int getMaxInFlight() {
return maxInFlight;
Expand Down

0 comments on commit 22fc4cd

Please sign in to comment.