diff --git a/src/main/java/io/vertx/core/impl/ContextImpl.java b/src/main/java/io/vertx/core/impl/ContextImpl.java index 2499a8044ea..bd17689ae09 100644 --- a/src/main/java/io/vertx/core/impl/ContextImpl.java +++ b/src/main/java/io/vertx/core/impl/ContextImpl.java @@ -56,7 +56,7 @@ static void setResultHandler(ContextInternal ctx, Future fut, Handler close() { + Future fut; if (closeFuture == owner.closeFuture()) { - return Future.future(p -> orderedTasks.shutdown(eventLoop, p)); + fut = Future.succeededFuture(); } else { - return closeFuture.close().eventually(() -> Future.future(p -> orderedTasks.shutdown(eventLoop, p))); + fut = closeFuture.close(); } + fut = fut.eventually(() -> Future.future(p -> executeBlockingTasks.shutdown(eventLoop, p))); + if (executor instanceof WorkerExecutor) { + WorkerExecutor workerExec = (WorkerExecutor) executor; + fut = fut.eventually(() -> Future.future(p -> workerExec.taskQueue().shutdown(eventLoop, p))); + } + return fut; } public Deployment getDeployment() { @@ -136,12 +142,12 @@ public Future executeBlockingInternal(Callable action, boolean ordered @Override public Future executeBlocking(Handler> blockingCodeHandler, boolean ordered) { - return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? orderedTasks : null); + return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? executeBlockingTasks : null); } @Override public Future executeBlocking(Callable blockingCodeHandler, boolean ordered) { - return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? orderedTasks : null); + return executeBlocking(this, blockingCodeHandler, workerPool, ordered ? executeBlockingTasks : null); } @Override diff --git a/src/main/java/io/vertx/core/impl/DuplicatedContext.java b/src/main/java/io/vertx/core/impl/DuplicatedContext.java index e984fce5135..fa2b307926c 100644 --- a/src/main/java/io/vertx/core/impl/DuplicatedContext.java +++ b/src/main/java/io/vertx/core/impl/DuplicatedContext.java @@ -16,7 +16,6 @@ import io.vertx.core.spi.tracing.VertxTracer; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -136,12 +135,12 @@ public Future executeBlockingInternal(Callable action, boolean ordered @Override public final Future executeBlocking(Handler> action, boolean ordered) { - return ContextImpl.executeBlocking(this, action, delegate.workerPool, ordered ? delegate.orderedTasks : null); + return ContextImpl.executeBlocking(this, action, delegate.workerPool, ordered ? delegate.executeBlockingTasks : null); } @Override public final Future executeBlocking(Callable blockingCodeHandler, boolean ordered) { - return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? delegate.orderedTasks : null); + return ContextImpl.executeBlocking(this, blockingCodeHandler, delegate.workerPool, ordered ? delegate.executeBlockingTasks : null); } @Override diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java index fec8358599c..f701650375a 100644 --- a/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -563,27 +563,24 @@ private ContextImpl createEventLoopContext(EventLoop eventLoop, CloseFuture clos ThreadingModel threadingModel = ThreadingModel.EVENT_LOOP; EventExecutor eventExecutor = new EventLoopExecutor(eventLoop); WorkerPool wp = workerPool != null ? workerPool : this.workerPool; - WorkerTaskQueue taskQueue = new WorkerTaskQueue(); - return createContext(threadingModel, eventLoop, closeFuture, deployment, tccl, eventExecutor, wp, taskQueue); + return createContext(threadingModel, eventLoop, closeFuture, deployment, tccl, eventExecutor, wp); } private ContextImpl createWorkerContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) { - WorkerTaskQueue orderedTasks = new WorkerTaskQueue(); WorkerPool wp = workerPool != null ? workerPool : this.workerPool; - return createContext(ThreadingModel.WORKER, eventLoop, closeFuture, deployment, tccl, new WorkerExecutor(wp, orderedTasks), wp, orderedTasks); + return createContext(ThreadingModel.WORKER, eventLoop, closeFuture, deployment, tccl, new WorkerExecutor(wp, new WorkerTaskQueue()), wp); } private ContextImpl createVirtualThreadContext(EventLoop eventLoop, CloseFuture closeFuture, Deployment deployment, ClassLoader tccl) { if (!isVirtualThreadAvailable()) { throw new IllegalStateException("This Java runtime does not support virtual threads"); } - WorkerTaskQueue orderedTasks = new WorkerTaskQueue(); - return createContext(ThreadingModel.VIRTUAL_THREAD, eventLoop, closeFuture, deployment, tccl, new WorkerExecutor(virtualThreaWorkerPool, orderedTasks), virtualThreaWorkerPool, orderedTasks); + return createContext(ThreadingModel.VIRTUAL_THREAD, eventLoop, closeFuture, deployment, tccl, new WorkerExecutor(virtualThreaWorkerPool, new WorkerTaskQueue()), virtualThreaWorkerPool); } - private ContextImpl createContext(ThreadingModel threadingModel, EventLoop eventLoop, CloseFuture closeFuture, Deployment deployment, ClassLoader tccl, EventExecutor eventExecutor, WorkerPool wp, WorkerTaskQueue taskQueue) { - return new ContextImpl(this, contextLocalsLength, threadingModel, eventLoop, eventExecutor, internalWorkerPool, wp, taskQueue, deployment, closeFuture, disableTCCL ? null : tccl); + private ContextImpl createContext(ThreadingModel threadingModel, EventLoop eventLoop, CloseFuture closeFuture, Deployment deployment, ClassLoader tccl, EventExecutor eventExecutor, WorkerPool wp) { + return new ContextImpl(this, contextLocalsLength, threadingModel, eventLoop, eventExecutor, internalWorkerPool, wp, deployment, closeFuture, disableTCCL ? null : tccl); } @Override diff --git a/src/main/java/io/vertx/core/impl/WorkerExecutor.java b/src/main/java/io/vertx/core/impl/WorkerExecutor.java index 4cb5725244a..74b20aa7279 100644 --- a/src/main/java/io/vertx/core/impl/WorkerExecutor.java +++ b/src/main/java/io/vertx/core/impl/WorkerExecutor.java @@ -10,6 +10,7 @@ */ package io.vertx.core.impl; +import io.vertx.core.Future; import io.vertx.core.ThreadingModel; import io.vertx.core.Vertx; import io.vertx.core.spi.metrics.PoolMetrics; @@ -43,10 +44,10 @@ public static io.vertx.core.impl.WorkerExecutor unwrapWorkerExecutor() { } private final WorkerPool workerPool; - private final TaskQueue orderedTasks; + private final WorkerTaskQueue orderedTasks; private final ThreadLocal inThread = new ThreadLocal<>(); - public WorkerExecutor(WorkerPool workerPool, TaskQueue orderedTasks) { + public WorkerExecutor(WorkerPool workerPool, WorkerTaskQueue orderedTasks) { this.workerPool = workerPool; this.orderedTasks = orderedTasks; } @@ -75,6 +76,10 @@ protected void execute() { orderedTasks.execute(task, workerPool.executor()); } + WorkerTaskQueue taskQueue() { + return orderedTasks; + } + /** * Suspend the current task execution until the task is resumed, the next task in the queue will be executed * when there is one. diff --git a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java index bc5f03588a6..108beec1ddc 100644 --- a/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java +++ b/src/main/java/io/vertx/core/impl/WorkerExecutorImpl.java @@ -66,14 +66,14 @@ public WorkerPool getPool() { } ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); ContextImpl impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextImpl) context; - return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.orderedTasks : null); + return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.executeBlockingTasks : null); } @Override public Future<@Nullable T> executeBlocking(Callable blockingCodeHandler, boolean ordered) { ContextInternal context = vertx.getOrCreateContext(); ContextImpl impl = context instanceof DuplicatedContext ? ((DuplicatedContext)context).delegate : (ContextImpl) context; - return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.orderedTasks : null); + return ContextImpl.executeBlocking(context, blockingCodeHandler, pool, ordered ? impl.executeBlockingTasks : null); } public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, Handler> asyncResultHandler) { diff --git a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java index c74575dbc6a..599a3897826 100644 --- a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java +++ b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java @@ -40,7 +40,6 @@ public static ContextInternal create(Vertx vertx) { EXECUTOR, impl.internalWorkerPool, impl.workerPool, - new WorkerTaskQueue(), null, null, Thread.currentThread().getContextClassLoader() diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java index 76bb7f5879a..2ebf178aacb 100644 --- a/src/test/java/io/vertx/core/ContextTest.java +++ b/src/test/java/io/vertx/core/ContextTest.java @@ -441,6 +441,27 @@ public void testInternalExecuteBlockingWithQueue(List { + ctx.executeBlocking(() -> { + latch.countDown(); + return 0; + }); + boolean timedOut; + try { + timedOut = !latch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertFalse(timedOut); + testComplete(); + }); + await(); + } + @Test public void testEventLoopContextDispatchReportsFailure() { ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext(); diff --git a/src/test/java/io/vertx/core/NamedWorkerPoolTest.java b/src/test/java/io/vertx/core/NamedWorkerPoolTest.java index baea26376bd..dbfc25b79df 100644 --- a/src/test/java/io/vertx/core/NamedWorkerPoolTest.java +++ b/src/test/java/io/vertx/core/NamedWorkerPoolTest.java @@ -164,27 +164,25 @@ public void testUnordered() throws Exception { public void testUseDifferentExecutorWithSameTaskQueue() throws Exception { int count = 10; waitFor(count); - vertx.deployVerticle(new AbstractVerticle() { - @Override - public void start() throws Exception { - WorkerExecutor exec = vertx.createSharedWorkerExecutor("vert.x-the-executor"); - Thread startThread = Thread.currentThread(); - AtomicReference currentThread = new AtomicReference<>(); - for (int i = 0;i < count;i++) { - int val = i; - exec.executeBlocking(fut -> { - Thread current = Thread.currentThread(); - assertNotSame(startThread, current); - if (val == 0) { - assertNull(currentThread.getAndSet(current)); - } else { - assertSame(current, currentThread.get()); - } - fut.complete(); - }, true, onSuccess(v -> complete())); + WorkerExecutor exec = vertx.createSharedWorkerExecutor("vert.x-the-executor"); + Thread startThread = Thread.currentThread(); + AtomicReference currentThread = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + for (int i = 0;i < count;i++) { + int val = i; + exec.executeBlocking(() -> { + Thread current = Thread.currentThread(); + assertNotSame(startThread, current); + if (val == 0) { + assertNull(currentThread.getAndSet(current)); + awaitLatch(latch); + } else { + assertSame(current, currentThread.get()); } - } - }, new DeploymentOptions().setWorker(true), onSuccess(id -> {})); + return null; + }, true).onComplete(onSuccess(v -> complete())); + latch.countDown(); + } await(); }