diff --git a/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java b/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java index bba06fc22bf..c536179001b 100644 --- a/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java +++ b/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java @@ -12,8 +12,8 @@ package io.vertx.core.impl.future; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; import io.vertx.core.Handler; /** @@ -64,8 +64,12 @@ private CompositeFutureImpl(int op, boolean initializing, Future... results) private void init() { for (Future result : results) { - FutureInternal internal = (FutureInternal) result; - internal.addListener(this); + if (result instanceof FutureInternal) { + FutureInternal internal = (FutureInternal) result; + internal.addListener(this); + } else { + result.onComplete(this::onSuccess, this::onFailure); + } } Object o; synchronized (this) { @@ -201,8 +205,10 @@ public int size() { private void complete(Object result) { for (Future r : results) { - FutureInternal internal = (FutureInternal) r; - internal.removeListener(this); + if (r instanceof FutureInternal) { + FutureInternal internal = (FutureInternal) r; + internal.removeListener(this); + } } if (result == this) { tryComplete(this); diff --git a/src/test/java/io/vertx/core/CompositeFutureTest.java b/src/test/java/io/vertx/core/CompositeFutureTest.java index 5873e3c3773..baef042d0d8 100644 --- a/src/test/java/io/vertx/core/CompositeFutureTest.java +++ b/src/test/java/io/vertx/core/CompositeFutureTest.java @@ -11,12 +11,9 @@ package io.vertx.core; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - import io.vertx.core.impl.future.FutureImpl; import io.vertx.core.impl.future.Listener; import io.vertx.test.core.Repeat; - import org.assertj.core.api.ThrowableAssert.ThrowingCallable; import org.junit.Test; @@ -33,6 +30,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + /** * @author Julien Viet */ @@ -574,4 +573,103 @@ public void testAnyRemovesListeners2() { Future.any(f, Future.succeededFuture()); assertEquals(Collections.emptySet(), f.listeners); } + + @Test + public void testCustomFuture() { + Promise p1 = Promise.promise(); + Promise p2 = Promise.promise(); + Promise p3 = Promise.promise(); + + CompositeFuture cf = Future.all(p1.future(), new MyFuture(p2), p3.future()); + + p1.complete(null); + p2.complete(null); + p3.complete(null); + + assertTrue(cf.isComplete()); + } + + private static class MyFuture implements Future { + + private final Future delegate; + + private MyFuture(Promise promise) { + delegate = promise.future(); + } + + @Override + public boolean isComplete() { + return delegate.isComplete(); + } + + @Override + public Future onComplete(Handler> handler) { + return delegate.onComplete(handler); + } + + @Override + public Void result() { + return delegate.result(); + } + + @Override + public Throwable cause() { + return delegate.cause(); + } + + @Override + public boolean succeeded() { + return delegate.succeeded(); + } + + @Override + public boolean failed() { + return delegate.failed(); + } + + @Override + public Future compose(Function> successMapper, Function> failureMapper) { + return delegate.compose(successMapper, failureMapper); + } + + @Override + public Future transform(Function, Future> mapper) { + return delegate.transform(mapper); + } + + @Override + public Future eventually(Function> function) { + return delegate.eventually(function); + } + + @Override + public Future map(Function mapper) { + return delegate.map(mapper); + } + + @Override + public Future map(V value) { + return delegate.map(value); + } + + @Override + public Future otherwise(Function mapper) { + return delegate.otherwise(mapper); + } + + @Override + public Future otherwise(Void value) { + return delegate.otherwise(value); + } + + @Override + public Future expecting(Expectation expectation) { + return delegate.expecting(expectation); + } + + @Override + public Future timeout(long delay, TimeUnit unit) { + return delegate.timeout(delay, unit); + } + } }