From 21e1efdba0625ff2b57d67db0a515a258f8d488b Mon Sep 17 00:00:00 2001 From: dreamlike_ocean Date: Fri, 11 Oct 2024 23:58:46 +0800 Subject: [PATCH 1/2] add new ThreadingModel to support reuse current eventLoop for deploying verticle --- .../java/io/vertx/core/ThreadingModel.java | 5 ++ .../java/io/vertx/core/impl/VertxImpl.java | 1 + .../core/impl/deployment/Deployment.java | 3 + .../CurrentEventLoopDeploymentTest.java | 58 +++++++++++++++++++ 4 files changed, 67 insertions(+) create mode 100644 vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java diff --git a/vertx-core/src/main/java/io/vertx/core/ThreadingModel.java b/vertx-core/src/main/java/io/vertx/core/ThreadingModel.java index 877a47539ae..aceca3d451d 100644 --- a/vertx-core/src/main/java/io/vertx/core/ThreadingModel.java +++ b/vertx-core/src/main/java/io/vertx/core/ThreadingModel.java @@ -22,6 +22,11 @@ public enum ThreadingModel { */ EVENT_LOOP, + /** + * Tasks are scheduled on the current event-loop thread. + */ + CURRENT_EVENT_LOOP, + /** * Tasks are scheduled on a worker pool of platform threads managed by the vertx instance. */ diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index ccfc8614b0e..1767c44711f 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -584,6 +584,7 @@ public ContextImpl createContext(ThreadingModel threadingModel, WorkerPool wp; switch (threadingModel) { case EVENT_LOOP: + case CURRENT_EVENT_LOOP: wp = workerPool != null ? workerPool : this.workerPool; eventExecutor = eventLoopExecutor; break; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java index 032d399373e..67404989d51 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java @@ -141,6 +141,9 @@ public Future deploy(DeploymentContext deployment) { context = vertx.createVirtualThreadContext(deployment, closeFuture, workerLoop, tccl); } break; + case CURRENT_EVENT_LOOP: + context = vertx.createContext(ThreadingModel.CURRENT_EVENT_LOOP, vertx.getOrCreateContext().nettyEventLoop(),closeFuture, workerPool, deployment, tccl); + break; default: context = vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl); break; diff --git a/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java b/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java new file mode 100644 index 00000000000..5b9c2f52a69 --- /dev/null +++ b/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java @@ -0,0 +1,58 @@ +package io.vertx.tests.deployment; + +import io.netty.channel.EventLoop; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Promise; +import io.vertx.core.ThreadingModel; +import io.vertx.core.internal.ContextInternal; +import org.junit.Test; + +/** + * @author dremalike + */ +public class CurrentEventLoopDeploymentTest extends AbstractVerticleTest { + + @Test + public void testDeploy() throws InterruptedException { + waitFor(1); + ContextInternal currentContext = (ContextInternal) vertx.getOrCreateContext(); + EventLoop targetEventLoop = currentContext.nettyEventLoop(); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start() { + EventLoop currentEventLoop = ((ContextInternal) context).nettyEventLoop(); + assertEquals(targetEventLoop, currentEventLoop); + assertNotSame(currentContext, context); + } + + }, new DeploymentOptions().setThreadingModel(ThreadingModel.CURRENT_EVENT_LOOP)) + .onSuccess(this::assertNotNull) + .onFailure(this::fail) + .onComplete(s -> testComplete()); + + await(); + } + + @Test + public void testExecuteBlocking() { + waitFor(1); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start(Promise startPromise) throws Exception { + Thread eventLoopThread = Thread.currentThread(); + vertx.executeBlocking(() -> { + assertNotSame(eventLoopThread, Thread.currentThread()); + startPromise.complete(); + return null; + }); + } + }, new DeploymentOptions().setThreadingModel(ThreadingModel.CURRENT_EVENT_LOOP)) + .onSuccess(this::assertNotNull) + .onFailure(this::fail) + .onComplete(s -> testComplete()); + await(); + } + + +} From 5c86bfd295b3b38fe428bfe156224a6aca50c698 Mon Sep 17 00:00:00 2001 From: dreamlike_ocean Date: Sat, 12 Oct 2024 22:10:32 +0800 Subject: [PATCH 2/2] By using the reuseCurrentEventLoop deployment option to control whether to reuse the current EventLoop instead of using the ThreadModel. --- .../core/DeploymentOptionsConverter.java | 6 +++++ .../java/io/vertx/core/DeploymentOptions.java | 23 +++++++++++++++++++ .../java/io/vertx/core/ThreadingModel.java | 5 ---- .../java/io/vertx/core/impl/VertxImpl.java | 1 - .../core/impl/deployment/Deployment.java | 7 +++--- .../io/vertx/core/internal/VertxInternal.java | 7 ++++++ .../CurrentEventLoopDeploymentTest.java | 4 ++-- 7 files changed, 41 insertions(+), 12 deletions(-) diff --git a/vertx-core/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java b/vertx-core/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java index b498e0b5be9..d0c8306c568 100644 --- a/vertx-core/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java +++ b/vertx-core/src/main/generated/io/vertx/core/DeploymentOptionsConverter.java @@ -54,6 +54,11 @@ static void fromJson(Iterable> json, Deploym obj.setMaxWorkerExecuteTimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); } break; + case "reuseCurrentEventLoop": + if (member.getValue() instanceof Boolean) { + obj.setReuseCurrentEventLoop((Boolean)member.getValue()); + } + break; } } } @@ -79,5 +84,6 @@ static void toJson(DeploymentOptions obj, java.util.Map json) { if (obj.getMaxWorkerExecuteTimeUnit() != null) { json.put("maxWorkerExecuteTimeUnit", obj.getMaxWorkerExecuteTimeUnit().name()); } + json.put("reuseCurrentEventLoop", obj.getReuseCurrentEventLoop()); } } diff --git a/vertx-core/src/main/java/io/vertx/core/DeploymentOptions.java b/vertx-core/src/main/java/io/vertx/core/DeploymentOptions.java index 6e6ffb6f6b3..2a03e1e4468 100644 --- a/vertx-core/src/main/java/io/vertx/core/DeploymentOptions.java +++ b/vertx-core/src/main/java/io/vertx/core/DeploymentOptions.java @@ -32,6 +32,7 @@ public class DeploymentOptions { public static final boolean DEFAULT_WORKER = false; public static final boolean DEFAULT_HA = false; public static final int DEFAULT_INSTANCES = 1; + public static final boolean DEFAULT_REUSE_CURRENT_EVENT_LOOP = false; private JsonObject config; private ThreadingModel threadingModel; @@ -42,6 +43,7 @@ public class DeploymentOptions { private int workerPoolSize; private long maxWorkerExecuteTime; private TimeUnit maxWorkerExecuteTimeUnit; + private boolean reuseCurrentEventLoop; /** * Default constructor @@ -54,6 +56,7 @@ public DeploymentOptions() { this.workerPoolSize = VertxOptions.DEFAULT_WORKER_POOL_SIZE; this.maxWorkerExecuteTime = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME; this.maxWorkerExecuteTimeUnit = VertxOptions.DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT; + this.reuseCurrentEventLoop = DEFAULT_REUSE_CURRENT_EVENT_LOOP; } /** @@ -292,6 +295,26 @@ public DeploymentOptions setClassLoader(ClassLoader classLoader) { return this; } + /** + * @return true if the verticle should reuse the current event loop + */ + public boolean getReuseCurrentEventLoop() { + return reuseCurrentEventLoop; + } + + /** + * Set whether the verticle should reuse the current event loop + *

when reuseCurrentEventLoop is set to true, the verticle will be deployed on the current event loop. + * If no Context is set for the current thread, a Context will be created for the current thread. + * + * @param reuseCurrentEventLoop true if the verticle should reuse the current event loop + * @return a reference to this, so the API can be used fluently + */ + public DeploymentOptions setReuseCurrentEventLoop(boolean reuseCurrentEventLoop) { + this.reuseCurrentEventLoop = reuseCurrentEventLoop; + return this; + } + /** * Convert this to JSON * diff --git a/vertx-core/src/main/java/io/vertx/core/ThreadingModel.java b/vertx-core/src/main/java/io/vertx/core/ThreadingModel.java index aceca3d451d..877a47539ae 100644 --- a/vertx-core/src/main/java/io/vertx/core/ThreadingModel.java +++ b/vertx-core/src/main/java/io/vertx/core/ThreadingModel.java @@ -22,11 +22,6 @@ public enum ThreadingModel { */ EVENT_LOOP, - /** - * Tasks are scheduled on the current event-loop thread. - */ - CURRENT_EVENT_LOOP, - /** * Tasks are scheduled on a worker pool of platform threads managed by the vertx instance. */ diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index 1767c44711f..ccfc8614b0e 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -584,7 +584,6 @@ public ContextImpl createContext(ThreadingModel threadingModel, WorkerPool wp; switch (threadingModel) { case EVENT_LOOP: - case CURRENT_EVENT_LOOP: wp = workerPool != null ? workerPool : this.workerPool; eventExecutor = eventLoopExecutor; break; diff --git a/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java index 67404989d51..e8f60f0e808 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/deployment/Deployment.java @@ -141,11 +141,10 @@ public Future deploy(DeploymentContext deployment) { context = vertx.createVirtualThreadContext(deployment, closeFuture, workerLoop, tccl); } break; - case CURRENT_EVENT_LOOP: - context = vertx.createContext(ThreadingModel.CURRENT_EVENT_LOOP, vertx.getOrCreateContext().nettyEventLoop(),closeFuture, workerPool, deployment, tccl); - break; default: - context = vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl); + context = options.getReuseCurrentEventLoop() + ? vertx.createEventLoopContext(vertx.getOrCreateContext().nettyEventLoop(), deployment,closeFuture,workerPool, tccl) + : vertx.createEventLoopContext(deployment, closeFuture, workerPool, tccl); break; } Instance instance = new Instance(verticle, context); diff --git a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java index 9ec4f47c3fe..734d193b06f 100644 --- a/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java +++ b/vertx-core/src/main/java/io/vertx/core/internal/VertxInternal.java @@ -166,6 +166,13 @@ default ContextInternal createEventLoopContext(DeploymentContext deployment, Clo return createContext(ThreadingModel.EVENT_LOOP, deployment, closeFuture, workerPool, tccl); } + /** + * @return event loop context + */ + default ContextInternal createEventLoopContext(EventLoop eventLoop, DeploymentContext deployment, CloseFuture closeFuture, WorkerPool workerPool, ClassLoader tccl) { + return createContext(ThreadingModel.EVENT_LOOP, eventLoop, closeFuture, workerPool, deployment, tccl); + } + /** * @return event loop context */ diff --git a/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java b/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java index 5b9c2f52a69..61d4fc8eb17 100644 --- a/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java +++ b/vertx-core/src/test/java/io/vertx/tests/deployment/CurrentEventLoopDeploymentTest.java @@ -26,7 +26,7 @@ public void start() { assertNotSame(currentContext, context); } - }, new DeploymentOptions().setThreadingModel(ThreadingModel.CURRENT_EVENT_LOOP)) + }, new DeploymentOptions().setReuseCurrentEventLoop(true)) .onSuccess(this::assertNotNull) .onFailure(this::fail) .onComplete(s -> testComplete()); @@ -47,7 +47,7 @@ public void start(Promise startPromise) throws Exception { return null; }); } - }, new DeploymentOptions().setThreadingModel(ThreadingModel.CURRENT_EVENT_LOOP)) + }, new DeploymentOptions().setReuseCurrentEventLoop(true)) .onSuccess(this::assertNotNull) .onFailure(this::fail) .onComplete(s -> testComplete());