Skip to content

Commit

Permalink
Merge pull request #74 from elandau/feature/refactor
Browse files Browse the repository at this point in the history
Eliminate Strategy abstraction + Gradient2 algorithm
  • Loading branch information
elandau authored Sep 11, 2018
2 parents cefc43f + 3882327 commit d3c9861
Show file tree
Hide file tree
Showing 39 changed files with 1,195 additions and 1,293 deletions.
Original file line number Diff line number Diff line change
@@ -1,49 +1,29 @@
package com.netflix.concurrency.limits;

import java.util.function.Consumer;

/**
* Contract for an algorithm that calculates a concurrency limit based on
* rtt measurements
*/
public interface Limit {
/**
* Details of the current sample window
*/
interface SampleWindow {
/**
* @return Candidate RTT in the sample window. This is traditionally the minimum rtt.
*/
long getCandidateRttNanos();

/**
* @return Average RTT in the sample window. Excludes timeouts and dropped rtt.
*/
long getAverateRttNanos();

/**
* @return Maximum number of inflight observed during the sample window
*/
int getMaxInFlight();

/**
* @return Number of observed RTTs in the sample window
*/
int getSampleCount();

/**
* @return True if there was a timeout
*/
boolean didDrop();
}

/**
* @return Current estimated limit
*/
int getLimit();


/**
* Register a callback to receive notification whenever the limit is updated to a new value
* @param consumer
*/
void notifyOnChange(Consumer<Integer> consumer);

/**
* Update the concurrency limit using a new rtt sample
*
* @param sample Data from the last sampling window such as RTT
* Update the limiter with a sample
* @param startTime
* @param rtt
* @param inflight
* @param didDrop
*/
void update(SampleWindow sample);
void onSample(long startTime, long rtt, int inflight, boolean didDrop);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.netflix.concurrency.limits;

import com.netflix.concurrency.limits.strategy.LookupPartitionStrategy;

import java.util.Optional;

/**
Expand Down Expand Up @@ -40,8 +38,7 @@ interface Listener {
* If acquired the caller must call one of the Listener methods when the operation has been completed
* to release the count.
*
* @param context Context for the request. The context is used by advanced startegies such as
* {@link LookupPartitionStrategy}.
* @param context Context for the request
* @return Optional.empty() if limit exceeded.
*/
Optional<Listener> acquire(ContextT context);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Loss based dynamic {@link Limit} that does an additive increment as long as
* there are no errors and a multiplicative decrement when there is an error.
*/
public final class AIMDLimit implements Limit {
public final class AIMDLimit extends AbstractLimit {

public static class Builder {
private int initialLimit = 10;
Expand All @@ -31,30 +31,26 @@ public static Builder newBuilder() {
return new Builder();
}

private volatile int limit;
private final double backoffRatio;

private AIMDLimit(Builder builder) {
this.limit = builder.initialLimit;
super(builder.initialLimit);
this.backoffRatio = builder.backoffRatio;
}

@Override
public int getLimit() {
return limit;
}

@Override
public void update(SampleWindow sample) {
if (sample.didDrop()) {
limit = Math.max(1, Math.min(limit - 1, (int) (limit * backoffRatio)));
} else if (sample.getMaxInFlight() >= limit) {
limit = limit + 1;
protected int _update(long startTime, long rtt, int inflight, boolean didDrop) {
if (didDrop) {
return Math.max(1, Math.min(getLimit() - 1, (int) (getLimit() * backoffRatio)));
} else if (inflight >= getLimit()) {
return getLimit() + 1;
} else {
return getLimit();
}
}

@Override
public String toString() {
return "AIMDLimit [limit=" + limit + "]";
return "AIMDLimit [limit=" + getLimit() + "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.netflix.concurrency.limits.limit;

import com.netflix.concurrency.limits.Limit;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public abstract class AbstractLimit implements Limit {
private volatile int limit;
private final List<Consumer<Integer>> listeners = new CopyOnWriteArrayList<>();

protected AbstractLimit(int initialLimit) {
this.limit = initialLimit;
}

@Override
public final synchronized void onSample(long startTime, long rtt, int inflight, boolean didDrop) {
setLimit(_update(startTime, rtt, inflight, didDrop));
}

protected abstract int _update(long startTime, long rtt, int inflight, boolean didDrop);

@Override
public final int getLimit() {
return limit;
}

protected synchronized void setLimit(int newLimit) {
if (newLimit != limit) {
limit = newLimit;
listeners.forEach(listener -> listener.accept(newLimit));
}
}

public void notifyOnChange(Consumer<Integer> consumer) {
this.listeners.add(consumer);
}


}
Original file line number Diff line number Diff line change
@@ -1,33 +1,25 @@
package com.netflix.concurrency.limits.limit;

import com.netflix.concurrency.limits.Limit;

/**
* Non dynamic limit with fixed value
*/
public final class FixedLimit implements Limit {

private final int limit;
public final class FixedLimit extends AbstractLimit {

public static FixedLimit of(int limit) {
return new FixedLimit(limit);
}

private FixedLimit(int limit) {
this.limit = limit;
}

@Override
public int getLimit() {
return limit;
super(limit);
}

@Override
public void update(SampleWindow sample) {
public int _update(long startTime, long rtt, int inflight, boolean didDrop) {
return getLimit();
}

@Override
public String toString() {
return "FixedLimit [limit=" + limit + "]";
return "FixedLimit [limit=" + getLimit() + "]";
}
}
Loading

0 comments on commit d3c9861

Please sign in to comment.