diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AIMDLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AIMDLimit.java index 1884e835..1addef4c 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AIMDLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/AIMDLimit.java @@ -16,26 +16,44 @@ package com.netflix.concurrency.limits.limit; import com.netflix.concurrency.limits.Limit; +import com.netflix.concurrency.limits.internal.Preconditions; + +import java.util.concurrent.TimeUnit; /** * Loss based dynamic {@link Limit} that does an additive increment as long as * there are no errors and a multiplicative decrement when there is an error. */ public final class AIMDLimit extends AbstractLimit { - + private static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toNanos(5); + public static class Builder { private int initialLimit = 10; private double backoffRatio = 0.9; - + private long timeout = DEFAULT_TIMEOUT; + public Builder initialLimit(int initialLimit) { this.initialLimit = initialLimit; return this; } - + public Builder backoffRatio(double backoffRatio) { + Preconditions.checkArgument(backoffRatio < 1.0 && backoffRatio >= 0.5, "Backoff ratio must be in the range [0.5, 1.0)"); this.backoffRatio = backoffRatio; return this; } + + /** + * Timeout threshold that when exceeded equates to a drop. + * @param timeout + * @param units + * @return Chainable builder + */ + public Builder timeout(long timeout, TimeUnit units) { + Preconditions.checkArgument(timeout > 0, "Timeout must be positive"); + this.timeout = units.toNanos(timeout); + return this; + } public AIMDLimit build() { return new AIMDLimit(this); @@ -47,20 +65,24 @@ public static Builder newBuilder() { } private final double backoffRatio; + private final long timeout; private AIMDLimit(Builder builder) { super(builder.initialLimit); this.backoffRatio = builder.backoffRatio; + this.timeout = builder.timeout; } @Override protected int _update(long startTime, long rtt, int inflight, boolean didDrop) { - if (didDrop) { - return Math.max(1, Math.min(getLimit() - 1, (int) (getLimit() * backoffRatio))); - } else if (inflight >= getLimit()) { - return getLimit() + 1; + final int currentLimit = getLimit(); + + if (didDrop || rtt > timeout) { + return Math.max(1, Math.min(currentLimit - 1, (int) (currentLimit * backoffRatio))); + } else if (inflight * 2 >= currentLimit) { + return currentLimit + 1; } else { - return getLimit(); + return currentLimit; } }