Skip to content

Commit

Permalink
Merge pull request #87 from elandau/bugfix/math_overflow
Browse files Browse the repository at this point in the history
Make blocking limiter timeout a requirement.
  • Loading branch information
elandau authored Oct 1, 2018
2 parents 63777de + 7229594 commit 6c95e64
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.concurrency.limits.limiter;

import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.internal.Preconditions;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -29,38 +30,54 @@
* @param <ContextT>
*/
public final class BlockingLimiter<ContextT> implements Limiter<ContextT> {
public static final Duration MAX_TIMEOUT = Duration.ofHours(1);

/**
* Wrap a limiter such that acquire will block up to {@link BlockingLimiter#MAX_TIMEOUT} if the limit was reached
* instead of return an empty listener immediately
* @param delegate Non-blocking limiter to wrap
* @return Wrapped limiter
*/
public static <ContextT> BlockingLimiter<ContextT> wrap(Limiter<ContextT> delegate) {
return new BlockingLimiter<>(delegate, Optional.empty());
return new BlockingLimiter<>(delegate, MAX_TIMEOUT);
}

/**
* Wrap a limiter such that acquire will block up to a provided timeout if the limit was reached
* instead of return an empty listener immediately
*
* @param delegate Non-blocking limiter to wrap
* @param timeout Max amount of time to wait for the wait for the limit to be released. Cannot exceed {@link BlockingLimiter#MAX_TIMEOUT}
* @return Wrapped limiter
*/
public static <ContextT> BlockingLimiter<ContextT> wrap(Limiter<ContextT> delegate, Duration timeout) {
return new BlockingLimiter<>(delegate, Optional.of(timeout));
Preconditions.checkArgument(timeout.compareTo(MAX_TIMEOUT) < 0, "Timeout cannot be greater than " + MAX_TIMEOUT);
return new BlockingLimiter<>(delegate, timeout);
}

private final Limiter<ContextT> delegate;
private final Optional<Duration> timeout;
private final Duration timeout;

/**
* Lock used to block and unblock callers as the limit is reached
*/
private final Object lock = new Object();

private BlockingLimiter(Limiter<ContextT> limiter, Optional<Duration> timeout) {
private BlockingLimiter(Limiter<ContextT> limiter, Duration timeout) {
this.delegate = limiter;
this.timeout = timeout;
}

private Optional<Listener> tryAcquire(ContextT context) {
Instant deadline = timeout.map(t -> Instant.now().plus(t)).orElse(Instant.MAX);
final Instant deadline = Instant.now().plus(timeout);
synchronized (lock) {
while (true) {
Instant now = Instant.now();
final Instant now = Instant.now();
if (!now.isBefore(deadline)) {
return Optional.empty();
}
// Try to acquire a token and return immediately if successful
Optional<Listener> listener;
listener = delegate.acquire(context);
final Optional<Listener> listener = delegate.acquire(context);
if (listener.isPresent()) {
return listener;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limit.SettableLimit;
import org.junit.Assert;
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -17,7 +19,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class BlockingLimiterTest {
Expand Down Expand Up @@ -62,11 +63,32 @@ public void testTimeout() {
Duration timeout = Duration.ofMillis(50);
SettableLimit limit = SettableLimit.startingAt(1);
BlockingLimiter<Void> limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build(), timeout);

// Acquire first, will succeeed an not block
limiter.acquire(null);

// Second acquire should time out after at least 50 millis
Instant before = Instant.now();
assertEquals(Optional.empty(), limiter.acquire(null));
Assert.assertFalse(limiter.acquire(null).isPresent());
Instant after = Instant.now();
Duration interval = Duration.between(before, after);
assertTrue(interval.compareTo(timeout) >= 0);

Duration delay = Duration.between(before, after);
assertTrue("Delay was " + delay.toMillis() + " millis", delay.compareTo(timeout) >= 0);
}

@Test(expected=TimeoutException.class)
public void testNoTimeout() throws InterruptedException, ExecutionException, TimeoutException {
SettableLimit limit = SettableLimit.startingAt(1);
BlockingLimiter<Void> limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build());
limiter.acquire(null);

CompletableFuture<Optional<Limiter.Listener>> future = CompletableFuture.supplyAsync(() -> limiter.acquire(null));
future.get(1, TimeUnit.SECONDS);
}

@Test(expected = IllegalArgumentException.class)
public void failOnHighTimeout() {
SettableLimit limit = SettableLimit.startingAt(1);
BlockingLimiter<Void> limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build(), Duration.ofDays(1));
}
}

0 comments on commit 6c95e64

Please sign in to comment.