diff --git a/vertx-core/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java
index 9053ec00d54..779769c9953 100644
--- a/vertx-core/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java
+++ b/vertx-core/src/main/java/io/vertx/core/impl/future/CompositeFutureImpl.java
@@ -61,8 +61,12 @@ private CompositeFutureImpl(int op, boolean initializing, Future>... results)
private void init() {
for (Future> result : results) {
- FutureBase internal = (FutureBase>) result;
- internal.addListener(this);
+ if (result instanceof FutureBase) {
+ FutureBase internal = (FutureBase>) result;
+ internal.addListener(this);
+ } else {
+ result.onComplete(this);
+ }
}
Object o;
synchronized (this) {
@@ -205,8 +209,10 @@ public int size() {
private void doComplete(Object result) {
for (Future> r : results) {
- FutureBase internal = (FutureBase>) r;
- internal.removeListener(this);
+ if (r instanceof FutureBase) {
+ FutureBase internal = (FutureBase>) r;
+ internal.removeListener(this);
+ }
}
if (result == this) {
tryComplete(this);
diff --git a/vertx-core/src/test/java/io/vertx/tests/future/CompositeFutureTest.java b/vertx-core/src/test/java/io/vertx/tests/future/CompositeFutureTest.java
index 08b8056266d..e40b5b8b75e 100644
--- a/vertx-core/src/test/java/io/vertx/tests/future/CompositeFutureTest.java
+++ b/vertx-core/src/test/java/io/vertx/tests/future/CompositeFutureTest.java
@@ -11,15 +11,9 @@
package io.vertx.tests.future;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import io.vertx.core.Completable;
-import io.vertx.core.CompositeFuture;
-import io.vertx.core.Future;
-import io.vertx.core.Promise;
+import io.vertx.core.*;
import io.vertx.core.impl.future.FutureImpl;
import io.vertx.test.core.Repeat;
-
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.Test;
@@ -29,13 +23,12 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
/**
* @author Julien Viet
*/
@@ -577,4 +570,103 @@ public void testAnyRemovesListeners2() {
Future.any(f, Future.succeededFuture());
assertEquals(Collections.emptySet(), f.listeners);
}
+
+ @Test
+ public void testCustomFuture() {
+ Promise p1 = Promise.promise();
+ Promise p2 = Promise.promise();
+ Promise p3 = Promise.promise();
+
+ CompositeFuture cf = Future.all(p1.future(), new MyFuture(p2), p3.future());
+
+ p1.complete(null);
+ p2.complete(null);
+ p3.complete(null);
+
+ assertTrue(cf.isComplete());
+ }
+
+ private static class MyFuture implements Future {
+
+ private final Future delegate;
+
+ private MyFuture(Promise promise) {
+ delegate = promise.future();
+ }
+
+ @Override
+ public boolean isComplete() {
+ return delegate.isComplete();
+ }
+
+ @Override
+ public Future onComplete(Handler> handler) {
+ return delegate.onComplete(handler);
+ }
+
+ @Override
+ public Void result() {
+ return delegate.result();
+ }
+
+ @Override
+ public Throwable cause() {
+ return delegate.cause();
+ }
+
+ @Override
+ public boolean succeeded() {
+ return delegate.succeeded();
+ }
+
+ @Override
+ public boolean failed() {
+ return delegate.failed();
+ }
+
+ @Override
+ public Future compose(Function super Void, Future> successMapper, Function> failureMapper) {
+ return delegate.compose(successMapper, failureMapper);
+ }
+
+ @Override
+ public Future transform(Function, Future> mapper) {
+ return delegate.transform(mapper);
+ }
+
+ @Override
+ public Future eventually(Supplier> mapper) {
+ return delegate.eventually(mapper);
+ }
+
+ @Override
+ public Future map(Function super Void, U> mapper) {
+ return delegate.map(mapper);
+ }
+
+ @Override
+ public Future map(V value) {
+ return delegate.map(value);
+ }
+
+ @Override
+ public Future otherwise(Function mapper) {
+ return delegate.otherwise(mapper);
+ }
+
+ @Override
+ public Future otherwise(Void value) {
+ return delegate.otherwise(value);
+ }
+
+ @Override
+ public Future expecting(Expectation super Void> expectation) {
+ return delegate.expecting(expectation);
+ }
+
+ @Override
+ public Future timeout(long delay, TimeUnit unit) {
+ return delegate.timeout(delay, unit);
+ }
+ }
}