From a8da5028afeaeba7a784150bf9f420fb20c7aa33 Mon Sep 17 00:00:00 2001 From: Eran Landau Date: Thu, 3 May 2018 21:48:59 -0700 Subject: [PATCH] - Fix bug filtering sample window for high noload RTT - Continue using the existing sample window when it fails the filter to avoid starving the algorithm - Set limit to queueSize when probing for minRtt - Add GRPC example --- .../limits/limit/GradientLimit.java | 37 +++--- .../concurrency/limits/limit/Measurement.java | 4 + .../limits/limit/MinimumMeasurement.java | 7 ++ .../limits/limiter/DefaultLimiter.java | 90 ++++++++------ .../limits/limiter/ImmutableSample.java | 16 ++- concurrency-limits-grpc/build.gradle | 1 + .../limits/grpc/server/example/Example.java | 114 ++++++++++++++++++ 7 files changed, 211 insertions(+), 58 deletions(-) create mode 100644 concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java index 49508ebb..02646e27 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/GradientLimit.java @@ -1,13 +1,5 @@ package com.netflix.concurrency.limits.limit; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.netflix.concurrency.limits.Limit; import com.netflix.concurrency.limits.MetricIds; import com.netflix.concurrency.limits.MetricRegistry; @@ -16,6 +8,14 @@ import com.netflix.concurrency.limits.internal.Preconditions; import com.netflix.concurrency.limits.limit.functions.SquareRootFunction; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Concurrency limit algorithm that adjust the limits based on the gradient of change in the * samples minimum RTT and absolute minimum RTT allowing for a queue of square root of the @@ -41,7 +41,7 @@ public static class Builder { private Supplier resetRttCounterSupplier; private Builder() { - probeNoLoadRtt(1000, 2000); + probeNoLoadRtt(500, 1000); } /** @@ -215,21 +215,24 @@ public synchronized void update(SampleWindow sample) { return; } - final double queueSize = this.queueSize.apply((int)this.estimatedLimit); - queueSizeSampleListener.addSample(queueSize); - // Reset or probe for a new RTT and a new estimatedLimit. It's necessary to cut the limit // in half to avoid having the limit drift upwards when the RTT is probed during heavy load. // To avoid decreasing the limit too much we don't allow it to go lower than the queueSize. if (resetRttCounter != DISABLED && resetRttCounter-- <= 0) { - LOG.debug("Probe for a new noload RTT"); resetRttCounter = this.resetRttCounterSupplier.get(); - estimatedLimit = Math.max(queueSize, estimatedLimit/2); - rttNoLoad.reset(); + long currrentQueueSize = this.queueSize.apply((int)this.estimatedLimit); + estimatedLimit = currrentQueueSize; + long nextRttNoLoad = rttNoLoad.update(value -> value * 2); + LOG.debug("Probe MinRTT {}", TimeUnit.NANOSECONDS.toMicros(nextRttNoLoad)/1000.0); + return; } + final double queueSize = this.queueSize.apply((int)this.estimatedLimit); + queueSizeSampleListener.addSample(queueSize); + if (rttNoLoad.add(rtt)) { - LOG.debug("New MinRTT {}", rtt); + LOG.debug("New MinRTT {}", TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0); + return; } minRttSampleListener.addSample(rttNoLoad.get()); @@ -250,7 +253,7 @@ public synchronized void update(SampleWindow sample) { if ((int)newLimit != (int)estimatedLimit) { if (LOG.isDebugEnabled()) { LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={} gradient={} resetCounter={}", - (int)estimatedLimit, + (int)newLimit, TimeUnit.NANOSECONDS.toMicros(rttNoLoad.get())/1000.0, TimeUnit.NANOSECONDS.toMicros(rtt)/1000.0, queueSize, diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java index 37fdef77..345668ba 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/Measurement.java @@ -1,5 +1,7 @@ package com.netflix.concurrency.limits.limit; +import java.util.function.Function; + /** * Contract for tracking a measurement such as a minimum or average of a sample set */ @@ -11,6 +13,8 @@ public interface Measurement { */ boolean add(long sample); + long update(Function func); + /** * @return Return the current value */ diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java index a3dd7e05..2d9cb80f 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/MinimumMeasurement.java @@ -1,5 +1,7 @@ package com.netflix.concurrency.limits.limit; +import java.util.function.Function; + public class MinimumMeasurement implements Measurement { private long value = 0; @@ -22,4 +24,9 @@ public void reset() { value = 0; } + @Override + public long update(Function func) { + value = func.apply(value); + return value; + } } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java index 83ab8e2a..8213bf7f 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/DefaultLimiter.java @@ -1,12 +1,5 @@ package com.netflix.concurrency.limits.limiter; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - import com.netflix.concurrency.limits.Limit; import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.Strategy; @@ -14,6 +7,12 @@ import com.netflix.concurrency.limits.internal.Preconditions; import com.netflix.concurrency.limits.limit.VegasLimit; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + /** * {@link Limiter} that combines a plugable limit algorithm and enforcement strategy to * enforce concurrency limits to a fixed resource. @@ -23,42 +22,31 @@ public final class DefaultLimiter implements Limiter { private final Supplier nanoClock = System::nanoTime; private static final long DEFAULT_MIN_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1); - private static final int DEFAULT_WINDOW_SIZE = 10; - + private static final long DEFAULT_MAX_WINDOW_TIME = TimeUnit.SECONDS.toNanos(1); + /** * Minimum observed samples to filter out sample windows with not enough significant samples */ - private static final int MIN_WINDOW_SAMPLE_COUNT = 10; + private static final int DEFAULT_WINDOW_SIZE = 10; /** * Minimum observed max inflight to filter out sample windows with not enough significant data */ - private static final int MIN_WINDOW_MAX_INFLIGHT = 1; + private static final int MIN_WINDOW_MAX_INFLIGHT = 2; /** * End time for the sampling window at which point the limit should be updated */ - private final AtomicLong nextUpdateTime = new AtomicLong(); + private volatile long nextUpdateTime = 0; - /** - * Algorithm used to determine the new limit based on the current limit and minimum - * measured RTT in the sample window - */ private final Limit limit; - /** - * Strategy for enforcing the limit - */ private final Strategy strategy; - /** - * Minimum window size in nanonseconds for sampling a new minRtt - */ private final long minWindowTime; - /** - * Sampling window size in multiple of the measured minRtt - */ + private final long maxWindowTime; + private final int windowSize; /** @@ -73,27 +61,50 @@ public final class DefaultLimiter implements Limiter { public static class Builder { private Limit limit = VegasLimit.newDefault(); + private long maxWindowTime = DEFAULT_MAX_WINDOW_TIME; private long minWindowTime = DEFAULT_MIN_WINDOW_TIME; private int windowSize = DEFAULT_WINDOW_SIZE; + /** + * Algorithm used to determine the new limit based on the current limit and minimum + * measured RTT in the sample window + */ public Builder limit(Limit limit) { Preconditions.checkArgument(limit != null, "Algorithm may not be null"); this.limit = limit; return this; } + /** + * Minimum window duration for sampling a new minRtt + */ public Builder minWindowTime(long minWindowTime, TimeUnit units) { Preconditions.checkArgument(minWindowTime >= units.toMillis(100), "minWindowTime must be >= 100 ms"); this.minWindowTime = units.toNanos(minWindowTime); return this; } + /** + * Maximum window duration for sampling a new minRtt + */ + public Builder maxWindowTime(long maxWindowTime, TimeUnit units) { + Preconditions.checkArgument(maxWindowTime >= units.toMillis(100), "minWindowTime must be >= 100 ms"); + this.maxWindowTime = units.toNanos(maxWindowTime); + return this; + } + + /** + * Minimum sampling window size for finding a new minimum rtt + */ public Builder windowSize(int windowSize) { Preconditions.checkArgument(windowSize >= 10, "Window size must be >= 10"); this.windowSize = windowSize; return this; } + /** + * @param strategy Strategy for enforcing the limit + */ public DefaultLimiter build(Strategy strategy) { Preconditions.checkArgument(strategy != null, "Strategy may not be null"); return new DefaultLimiter(this, strategy); @@ -117,12 +128,14 @@ public DefaultLimiter(Limit limit, Strategy strategy) { this.strategy = strategy; this.windowSize = DEFAULT_WINDOW_SIZE; this.minWindowTime = DEFAULT_MIN_WINDOW_TIME; + this.maxWindowTime = DEFAULT_MAX_WINDOW_TIME; strategy.setLimit(limit.getLimit()); } private DefaultLimiter(Builder builder, Strategy strategy) { this.limit = builder.limit; this.minWindowTime = builder.minWindowTime; + this.maxWindowTime = builder.maxWindowTime; this.windowSize = builder.windowSize; this.strategy = strategy; strategy.setLimit(limit.getLimit()); @@ -151,16 +164,17 @@ public void onSuccess() { sample.getAndUpdate(current -> current.addSample(rtt, currentMaxInFlight)); - long updateTime = nextUpdateTime.get(); - if (endTime >= updateTime) { - long nextUpdate = endTime + Math.max(minWindowTime, rtt * windowSize); - if (nextUpdateTime.compareAndSet(updateTime, nextUpdate)) { - ImmutableSample last = sample.getAndUpdate(ImmutableSample::reset); - if (last.getCandidateRttNanos() < Integer.MAX_VALUE - && last.getSampleCount() > MIN_WINDOW_SAMPLE_COUNT - && last.getMaxInFlight() > MIN_WINDOW_MAX_INFLIGHT) { - limit.update(last); - strategy.setLimit(limit.getLimit()); + if (endTime >= nextUpdateTime) { + synchronized (this) { + // Double check inside the lock + if (endTime >= nextUpdateTime) { + ImmutableSample last = sample.get(); + if (isSampleReady(last)) { + nextUpdateTime = endTime + Math.min(Math.max(last.getCandidateRttNanos() * 2, minWindowTime), maxWindowTime); + sample.set(new ImmutableSample()); + limit.update(last); + strategy.setLimit(limit.getLimit()); + } } } } @@ -181,6 +195,12 @@ public void onDropped() { }); } + private boolean isSampleReady(ImmutableSample sample) { + return sample.getCandidateRttNanos() < Long.MAX_VALUE + && sample.getSampleCount() > windowSize + && sample.getMaxInFlight() > MIN_WINDOW_MAX_INFLIGHT; + } + protected int getLimit() { return limit.getLimit(); } diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSample.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSample.java index 9060c5bb..b3137c66 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSample.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/ImmutableSample.java @@ -2,6 +2,8 @@ import com.netflix.concurrency.limits.Limit; +import java.util.concurrent.TimeUnit; + /** * Class used to track immutable samples in an AtomicReference */ @@ -12,16 +14,12 @@ public class ImmutableSample implements Limit.SampleWindow { final boolean didDrop; public ImmutableSample() { - this.minRtt = Integer.MAX_VALUE; + this.minRtt = Long.MAX_VALUE; this.maxInFlight = 0; this.sampleCount = 0; this.didDrop = false; } - public ImmutableSample reset() { - return new ImmutableSample(); - } - public ImmutableSample(long minRtt, int maxInFlight, int sampleCount, boolean didDrop) { this.minRtt = minRtt; this.maxInFlight = maxInFlight; @@ -34,7 +32,7 @@ public ImmutableSample addSample(long rtt, int maxInFlight) { } public ImmutableSample addDroppedSample(int maxInFlight) { - return new ImmutableSample(minRtt, Math.max(maxInFlight, this.maxInFlight), sampleCount+1, true); + return new ImmutableSample(minRtt, Math.max(maxInFlight, this.maxInFlight), sampleCount, true); } @Override @@ -56,4 +54,10 @@ public int getSampleCount() { public boolean didDrop() { return didDrop; } + + @Override + public String toString() { + return "ImmutableSample [minRtt=" + TimeUnit.NANOSECONDS.toMicros(minRtt) / 1000.0 + ", maxInFlight=" + maxInFlight + ", sampleCount=" + sampleCount + + ", didDrop=" + didDrop + "]"; + } } \ No newline at end of file diff --git a/concurrency-limits-grpc/build.gradle b/concurrency-limits-grpc/build.gradle index 6d05b795..0938fe09 100644 --- a/concurrency-limits-grpc/build.gradle +++ b/concurrency-limits-grpc/build.gradle @@ -15,4 +15,5 @@ dependencies { testCompile "io.grpc:grpc-stub:1.9.0" testCompile "junit:junit-dep:4.10" testCompile "org.slf4j:slf4j-log4j12:1.7.+" + testCompile "org.apache.commons:commons-math3:3.6.1" } diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java new file mode 100644 index 00000000..164a21e3 --- /dev/null +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/example/Example.java @@ -0,0 +1,114 @@ +package com.netflix.concurrency.limits.grpc.server.example; + +import com.google.common.util.concurrent.Uninterruptibles; +import com.netflix.concurrency.limits.grpc.StringMarshaller; +import com.netflix.concurrency.limits.grpc.server.ConcurrencyLimitServerInterceptor; +import com.netflix.concurrency.limits.grpc.server.GrpcServerLimiterBuilder; +import com.netflix.concurrency.limits.limit.GradientLimit; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.Server; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; +import io.grpc.Status; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.ClientCalls; +import io.grpc.stub.ServerCalls; +import io.grpc.stub.StreamObserver; + +import java.io.IOException; +import java.text.MessageFormat; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import org.apache.commons.math3.distribution.ExponentialDistribution; + +public class Example { + private static final MethodDescriptor METHOD_DESCRIPTOR = MethodDescriptor.newBuilder() + .setType(MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(StringMarshaller.INSTANCE) + .setResponseMarshaller(StringMarshaller.INSTANCE) + .build(); + + public static ServerCallHandler createServerHandler(int concurrency) { + final ExponentialDistribution distribution = new ExponentialDistribution(10.0); + final Supplier latency = () -> 100 + (int)distribution.sample(); + final Semaphore sem = new Semaphore(concurrency, true); + + return ServerCalls.asyncUnaryCall((req, observer) -> { + try { + sem.acquire(); + Uninterruptibles.sleepUninterruptibly(latency.get(), TimeUnit.MILLISECONDS); + observer.onNext("response"); + observer.onCompleted(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + observer.onError(Status.UNKNOWN.asRuntimeException()); + } finally { + sem.release(); + } + }); + } + + public static void main(String[] args) throws IOException { + GradientLimit limit = GradientLimit.newBuilder() + .build(); + + // Create a server + Server server = NettyServerBuilder.forPort(0) + .addService(ServerInterceptors.intercept(ServerServiceDefinition.builder("service") + .addMethod(METHOD_DESCRIPTOR, createServerHandler(20)) + .build(), new ConcurrencyLimitServerInterceptor(new GrpcServerLimiterBuilder() + .limiter(builder -> builder + .limit(limit) + .minWindowTime(200, TimeUnit.MILLISECONDS) + ) + .build()) + )) + .build() + .start(); + + // Report progress + AtomicInteger dropCount = new AtomicInteger(0); + AtomicInteger successCount = new AtomicInteger(0); + + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { + System.out.println(MessageFormat.format("{0}, {1}, {2}", limit.getLimit(), successCount.getAndSet(0), dropCount.getAndSet(0))); + }, 1, 1, TimeUnit.SECONDS); + + // Create a client + Channel channel = NettyChannelBuilder.forTarget("localhost:" + server.getPort()) + .usePlaintext(true) + .build(); + + while (true) { + Uninterruptibles.sleepUninterruptibly(6, TimeUnit.MILLISECONDS); + ClientCalls.asyncUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withWaitForReady()), "request", + new StreamObserver() { + @Override + public void onNext(String value) { + } + + @Override + public void onError(Throwable t) { + dropCount.incrementAndGet(); + } + + @Override + public void onCompleted() { + successCount.incrementAndGet(); + } + + }); + } + } +}