From 7c91ee49213b1f73a30e7fd5964e0dbd38304a37 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Thu, 25 May 2023 09:35:03 +0200 Subject: [PATCH 01/12] Protocol in plantuml --- .../java/io/vertx/core/eventbus/eventbus.md | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 src/test/java/io/vertx/core/eventbus/eventbus.md diff --git a/src/test/java/io/vertx/core/eventbus/eventbus.md b/src/test/java/io/vertx/core/eventbus/eventbus.md new file mode 100644 index 00000000000..deeebb32efc --- /dev/null +++ b/src/test/java/io/vertx/core/eventbus/eventbus.md @@ -0,0 +1,52 @@ +```plantuml +title Send +hide footbox +participant Producer as P +[-> P: send(message) +participant Consumer as C +P -> C: MSG(request) +``` + +```plantuml +title Request and response +hide footbox +participant Producer as P +[-> P: send(request) +create Source as S +P --> S: bind ephemeral address +participant Consumer as C +P -> C: SYN(Source)/MSG(request) +S <- C: FIN/MSG(response) +Destroy S +[<- S: reply(response) +``` + +```plantuml +title General case +hide footbox +participant Producer as P +[-> P: begin +create Source as S +P --> S: src = bind ephemeral address +participant Consumer as C +P -> C: SYN(Source) +create Destination as D +C --> D: bind ephemeral address +S <- C: ACK(Destination) +S -> D: MSG +D -> S: MSG +S -> D: MSG +S -> D: FIN +Destroy D +D -> S: MSG +D -> S: MSG +D -> S: FIN +Destroy S +``` + +. Todo request and reply +. Fragmentation +. Gauge interest +.. Pascal K. +.. Stephane Martin +. service-proxy usage From 3c3753e880f41a6df4387af1895c9ef9d2211937 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 22 May 2023 10:21:45 +0200 Subject: [PATCH 02/12] Decouple from outbound delivery context --- .../core/eventbus/impl/EventBusImpl.java | 34 ++++++---- .../eventbus/impl/MessageProducerImpl.java | 10 +-- .../impl/OutboundDeliveryContext.java | 27 ++++++-- .../impl/clustered/ClusteredEventBus.java | 66 +++++++++---------- .../impl/clustered/ConnectionHolder.java | 31 +++++---- 5 files changed, 94 insertions(+), 74 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 9b95ab0b4c2..1a62b44bd4b 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -116,7 +116,7 @@ public EventBus send(String address, Object message) { @Override public EventBus send(String address, Object message, DeliveryOptions options) { MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName()); - sendOrPubInternal(msg, options, null, null); + sendOrPubInternal(msg, options, null); return this; } @@ -124,7 +124,7 @@ public EventBus send(String address, Object message, DeliveryOptions options) { public Future> request(String address, Object message, DeliveryOptions options) { MessageImpl msg = createMessage(true, address, options.getHeaders(), message, options.getCodecName()); ReplyHandler handler = createReplyHandler(msg, true, options); - sendOrPubInternal(msg, options, handler, null); + sendOrPubInternal(msg, options, handler); return handler.result(); } @@ -161,7 +161,7 @@ public EventBus publish(String address, Object message) { @Override public EventBus publish(String address, Object message, DeliveryOptions options) { - sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null, null); + sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null); return this; } @@ -321,20 +321,24 @@ protected void sendReply(MessageImpl replyMessage, DeliveryOptions options, if (replyMessage.address() == null) { throw new IllegalStateException("address not specified"); } else { - sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler, null)); + sendOrPubInternal(new OutboundDeliveryContext<>(vertx.getOrCreateContext(), replyMessage, options, replyHandler)); } } + protected void sendOrPub(ContextInternal ctx, MessageImpl message, DeliveryOptions options, Promise writePromise) { + sendLocally(message, options, writePromise); + } + protected void sendOrPub(OutboundDeliveryContext sendContext) { - sendLocally(sendContext); + sendOrPub(sendContext.ctx, sendContext.message, sendContext.options, sendContext); } - private void sendLocally(OutboundDeliveryContext sendContext) { - ReplyException failure = deliverMessageLocally(sendContext.message); + protected void sendLocally(MessageImpl message, DeliveryOptions options, Promise writePromise) { + ReplyException failure = deliverMessageLocally(message); if (failure != null) { - sendContext.written(failure); + writePromise.tryFail(failure); } else { - sendContext.written(null); + writePromise.tryComplete(); } } @@ -403,8 +407,8 @@ ReplyHandler createReplyHandler(MessageImpl message, } public OutboundDeliveryContext newSendContext(MessageImpl message, DeliveryOptions options, - ReplyHandler handler, Promise writePromise) { - return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler, writePromise); + ReplyHandler handler) { + return new OutboundDeliveryContext<>(vertx.getOrCreateContext(), message, options, handler); } public void sendOrPubInternal(OutboundDeliveryContext senderCtx) { @@ -414,10 +418,12 @@ public void sendOrPubInternal(OutboundDeliveryContext senderCtx) { senderCtx.next(); } - public void sendOrPubInternal(MessageImpl message, DeliveryOptions options, - ReplyHandler handler, Promise writePromise) { + public Future sendOrPubInternal(MessageImpl message, DeliveryOptions options, + ReplyHandler handler) { checkStarted(); - sendOrPubInternal(newSendContext(message, options, handler, writePromise)); + OutboundDeliveryContext ctx = newSendContext(message, options, handler); + sendOrPubInternal(ctx); + return ctx.writePromise.future(); } private Future unregisterAll() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java index 41d4bb2be81..18f02b646f6 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageProducerImpl.java @@ -45,14 +45,8 @@ public synchronized MessageProducer deliveryOptions(DeliveryOptions options) @Override public Future write(T body) { - Promise promise = ((VertxInternal)vertx).getOrCreateContext().promise(); - write(body, promise); - return promise.future(); - } - - private void write(T data, Promise handler) { - MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), data, options.getCodecName()); - bus.sendOrPubInternal(msg, options, null, handler); + MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), body, options.getCodecName()); + return bus.sendOrPubInternal(msg, options, null); } @Override diff --git a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java index 657ae791543..29c26f40803 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java +++ b/src/main/java/io/vertx/core/eventbus/impl/OutboundDeliveryContext.java @@ -11,6 +11,7 @@ package io.vertx.core.eventbus.impl; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.core.eventbus.DeliveryOptions; @@ -25,31 +26,43 @@ import java.util.function.BiConsumer; -public class OutboundDeliveryContext extends DeliveryContextBase implements Handler> { +public class OutboundDeliveryContext extends DeliveryContextBase implements Promise { public final ContextInternal ctx; public final DeliveryOptions options; public final ReplyHandler replyHandler; - private final Promise writePromise; + public final Promise writePromise; private boolean src; EventBusImpl bus; EventBusMetrics metrics; - OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler replyHandler, Promise writePromise) { + OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler replyHandler) { super(message, message.bus.outboundInterceptors(), ctx); this.ctx = ctx; this.options = options; this.replyHandler = replyHandler; - this.writePromise = writePromise; + this.writePromise = ctx.promise(); } @Override - public void handle(AsyncResult event) { - written(event.cause()); + public boolean tryComplete(Void result) { + written(null); + return true; } - public void written(Throwable failure) { + @Override + public boolean tryFail(Throwable cause) { + written(cause); + return false; + } + + @Override + public Future future() { + throw new UnsupportedOperationException(); + } + + private void written(Throwable failure) { // Metrics if (metrics != null) { diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 3fe279ecc00..ffb970a3edf 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -11,12 +11,10 @@ package io.vertx.core.eventbus.impl.clustered; -import io.vertx.core.Handler; -import io.vertx.core.MultiMap; -import io.vertx.core.Promise; -import io.vertx.core.VertxOptions; +import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.AddressHelper; +import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.EventBusOptions; import io.vertx.core.eventbus.MessageCodec; import io.vertx.core.eventbus.impl.CodecManager; @@ -24,11 +22,11 @@ import io.vertx.core.eventbus.impl.HandlerHolder; import io.vertx.core.eventbus.impl.HandlerRegistration; import io.vertx.core.eventbus.impl.MessageImpl; -import io.vertx.core.eventbus.impl.OutboundDeliveryContext; import io.vertx.core.impl.CloseFuture; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.EventLoopContext; import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.future.PromiseInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.impl.utils.ConcurrentCyclicSequence; @@ -193,42 +191,42 @@ protected void onLocalUnregistration(HandlerHolder handlerHolder, Promise } @Override - protected void sendOrPub(OutboundDeliveryContext sendContext) { - if (((ClusteredMessage) sendContext.message).getRepliedTo() != null) { - clusteredSendReply(((ClusteredMessage) sendContext.message).getRepliedTo(), sendContext); - } else if (sendContext.options.isLocalOnly()) { - super.sendOrPub(sendContext); + protected void sendOrPub(ContextInternal ctx, MessageImpl message, DeliveryOptions options, Promise writePromise) { + if (((ClusteredMessage) message).getRepliedTo() != null) { + clusteredSendReply(message, options, writePromise, ((ClusteredMessage) message).getRepliedTo()); + } else if (options.isLocalOnly()) { + super.sendOrPub(ctx, message, options, writePromise); } else { - Serializer serializer = Serializer.get(sendContext.ctx); - if (sendContext.message.isSend()) { - Promise promise = sendContext.ctx.promise(); - serializer.queue(sendContext.message, nodeSelector::selectForSend, promise); + Serializer serializer = Serializer.get(ctx); + if (message.isSend()) { + Promise promise = Promise.promise(); + serializer.queue(message, nodeSelector::selectForSend, promise); promise.future().onComplete(ar -> { if (ar.succeeded()) { - sendToNode(sendContext, ar.result()); + sendToNode(ar.result(), message, options, writePromise); } else { - sendOrPublishFailed(sendContext, ar.cause()); + sendOrPublishFailed(writePromise, ar.cause()); } }); } else { - Promise> promise = sendContext.ctx.promise(); - serializer.queue(sendContext.message, nodeSelector::selectForPublish, promise); + Promise> promise = Promise.promise(); + serializer.queue(message, nodeSelector::selectForPublish, promise); promise.future().onComplete(ar -> { if (ar.succeeded()) { - sendToNodes(sendContext, ar.result()); + sendToNodes(ar.result(), message, options, writePromise); } else { - sendOrPublishFailed(sendContext, ar.cause()); + sendOrPublishFailed(writePromise, ar.cause()); } }); } } } - private void sendOrPublishFailed(OutboundDeliveryContext sendContext, Throwable cause) { + private void sendOrPublishFailed(Promise promise, Throwable cause) { if (log.isDebugEnabled()) { log.error("Failed to send message", cause); } - sendContext.written(cause); + promise.tryFail(cause); } @Override @@ -329,39 +327,39 @@ public void handle(Buffer buff) { }; } - private void sendToNode(OutboundDeliveryContext sendContext, String nodeId) { + private void sendToNode(String nodeId, MessageImpl message, DeliveryOptions options, Promise writePromise) { if (nodeId != null && !nodeId.equals(this.nodeId)) { - sendRemote(sendContext, nodeId, sendContext.message); + sendRemote(nodeId, message, options, writePromise); } else { - super.sendOrPub(sendContext); + super.sendOrPub(ebContext, message, options, writePromise); } } - private void sendToNodes(OutboundDeliveryContext sendContext, Iterable nodeIds) { + private void sendToNodes(Iterable nodeIds, MessageImpl message, DeliveryOptions options, Promise writePromise) { boolean sentRemote = false; if (nodeIds != null) { for (String nid : nodeIds) { if (!sentRemote) { sentRemote = true; } - sendToNode(sendContext, nid); + // Write promise might be completed several times!!!! + sendToNode(nid, message, options, writePromise); } } if (!sentRemote) { - super.sendOrPub(sendContext); + super.sendOrPub(ebContext, message, options, writePromise); } } - private void clusteredSendReply(String replyDest, OutboundDeliveryContext sendContext) { - MessageImpl message = sendContext.message; + private void clusteredSendReply(MessageImpl message, DeliveryOptions options, Promise writePromise, String replyDest) { if (!replyDest.equals(nodeId)) { - sendRemote(sendContext, replyDest, message); + sendRemote(replyDest, message, options, writePromise); } else { - super.sendOrPub(sendContext); + super.sendOrPub(ebContext, message, options, writePromise); } } - private void sendRemote(OutboundDeliveryContext sendContext, String remoteNodeId, MessageImpl message) { + private void sendRemote(String remoteNodeId, MessageImpl message, DeliveryOptions options, Promise writePromise) { // We need to deal with the fact that connecting can take some time and is async, and we cannot // block to wait for it. So we add any sends to a pending list if not connected yet. // Once we connect we send them. @@ -380,7 +378,7 @@ private void sendRemote(OutboundDeliveryContext sendContext, String remoteNod holder.connect(); } } - holder.writeMessage(sendContext); + holder.writeMessage(message, writePromise); } ConcurrentMap connections() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java index 07348e82440..7301756b6c6 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java @@ -14,7 +14,7 @@ import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.EventBusOptions; -import io.vertx.core.eventbus.impl.OutboundDeliveryContext; +import io.vertx.core.eventbus.impl.MessageImpl; import io.vertx.core.eventbus.impl.codecs.PingMessageCodec; import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.logging.Logger; @@ -41,12 +41,21 @@ class ConnectionHolder { private final VertxInternal vertx; private final EventBusMetrics metrics; - private Queue> pending; + private Queue pending; private NetSocket socket; private boolean connected; private long timeoutID = -1; private long pingTimeoutID = -1; + private static class SomeTask { + final MessageImpl message; + final Promise writePromise; + SomeTask(MessageImpl message, Promise writePromise) { + this.message = message; + this.writePromise = writePromise; + } + } + ConnectionHolder(ClusteredEventBus eventBus, String remoteNodeId) { this.eventBus = eventBus; this.remoteNodeId = remoteNodeId; @@ -70,13 +79,13 @@ void connect() { } // TODO optimise this (contention on monitor) - synchronized void writeMessage(OutboundDeliveryContext ctx) { + synchronized void writeMessage(MessageImpl message, Promise writePromise) { if (connected) { - Buffer data = ((ClusteredMessage) ctx.message).encodeToWire(); + Buffer data = ((ClusteredMessage) message).encodeToWire(); if (metrics != null) { - metrics.messageWritten(ctx.message.address(), data.length()); + metrics.messageWritten(message.address(), data.length()); } - socket.write(data).onComplete(ctx); + socket.write(data).onComplete(writePromise); } else { if (pending == null) { if (log.isDebugEnabled()) { @@ -84,7 +93,7 @@ synchronized void writeMessage(OutboundDeliveryContext ctx) { } pending = new ArrayDeque<>(); } - pending.add(ctx); + pending.add(new SomeTask(message, writePromise)); } } @@ -100,10 +109,10 @@ private void close(Throwable cause) { vertx.cancelTimer(pingTimeoutID); } synchronized (this) { - OutboundDeliveryContext msg; + SomeTask msg; if (pending != null) { while ((msg = pending.poll()) != null) { - msg.written(cause); + msg.writePromise.tryFail(cause); } } } @@ -150,12 +159,12 @@ private synchronized void connected(NetSocket socket) { if (log.isDebugEnabled()) { log.debug("Draining the queue for server " + remoteNodeId); } - for (OutboundDeliveryContext ctx : pending) { + for (SomeTask ctx : pending) { Buffer data = ((ClusteredMessage)ctx.message).encodeToWire(); if (metrics != null) { metrics.messageWritten(ctx.message.address(), data.length()); } - socket.write(data).onComplete(ctx); + socket.write(data).onComplete(ctx.writePromise); } } pending = null; From 19c96f074c548be30f99d32350b4fb0dd5f9756f Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 22 May 2023 13:48:54 +0200 Subject: [PATCH 03/12] Decouple HandlerRegistration from HandlerHolder --- .../java/io/vertx/core/eventbus/impl/EventBusImpl.java | 7 +++++-- .../io/vertx/core/eventbus/impl/HandlerRegistration.java | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 1a62b44bd4b..aca3afb9623 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -257,10 +258,12 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers, return msg; } - protected HandlerHolder addRegistration(String address, HandlerRegistration registration, boolean replyHandler, boolean localOnly, Promise promise) { + protected Consumer> addRegistration(String address, HandlerRegistration registration, boolean replyHandler, boolean localOnly, Promise promise) { HandlerHolder holder = addLocalRegistration(address, registration, replyHandler, localOnly); onLocalRegistration(holder, promise); - return holder; + return p -> { + removeRegistration(holder, p); + }; } protected void onLocalRegistration(HandlerHolder handlerHolder, Promise promise) { diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index a1c15ea6c6d..81da3106d93 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -20,13 +20,15 @@ import io.vertx.core.spi.tracing.VertxTracer; import io.vertx.core.tracing.TracingPolicy; +import java.util.function.Consumer; + public abstract class HandlerRegistration implements Closeable { public final ContextInternal context; public final EventBusImpl bus; public final String address; public final boolean src; - private HandlerHolder registered; + private Consumer> registered; private Object metric; HandlerRegistration(ContextInternal context, @@ -74,7 +76,7 @@ public Future unregister() { Promise promise = context.promise(); synchronized (this) { if (registered != null) { - bus.removeRegistration(registered, promise); + registered.accept(promise); registered = null; if (bus.metrics != null) { bus.metrics.handlerUnregistered(metric); From 272a328ece0b368e1168e720e8b627732a47de1a Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 23 May 2023 17:18:32 +0200 Subject: [PATCH 04/12] Some --- src/test/java/io/vertx/core/eventbus/eventbus.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/test/java/io/vertx/core/eventbus/eventbus.md b/src/test/java/io/vertx/core/eventbus/eventbus.md index deeebb32efc..a9332bf4b68 100644 --- a/src/test/java/io/vertx/core/eventbus/eventbus.md +++ b/src/test/java/io/vertx/core/eventbus/eventbus.md @@ -44,9 +44,10 @@ D -> S: FIN Destroy S ``` -. Todo request and reply -. Fragmentation -. Gauge interest -.. Pascal K. -.. Stephane Martin -. service-proxy usage +## Todo + +- Fragmentation +- Gauge interest + - Pascal K. + - Stephane Martin +- service-proxy usage From 9980a85dc3f8e77b0d08ea3be20a00b0dd0accb1 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 23 May 2023 17:41:50 +0200 Subject: [PATCH 05/12] Remove DeliveryOptions wherever possible --- .../core/eventbus/impl/EventBusImpl.java | 4 +-- .../impl/clustered/ClusteredEventBus.java | 29 +++++++++---------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index aca3afb9623..fbca1f9f1d2 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -329,14 +329,14 @@ protected void sendReply(MessageImpl replyMessage, DeliveryOptions options, } protected void sendOrPub(ContextInternal ctx, MessageImpl message, DeliveryOptions options, Promise writePromise) { - sendLocally(message, options, writePromise); + sendLocally(message, writePromise); } protected void sendOrPub(OutboundDeliveryContext sendContext) { sendOrPub(sendContext.ctx, sendContext.message, sendContext.options, sendContext); } - protected void sendLocally(MessageImpl message, DeliveryOptions options, Promise writePromise) { + protected void sendLocally(MessageImpl message, Promise writePromise) { ReplyException failure = deliverMessageLocally(message); if (failure != null) { writePromise.tryFail(failure); diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index ffb970a3edf..6667eca0a57 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -26,7 +26,6 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.EventLoopContext; import io.vertx.core.impl.VertxInternal; -import io.vertx.core.impl.future.PromiseInternal; import io.vertx.core.impl.logging.Logger; import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.impl.utils.ConcurrentCyclicSequence; @@ -193,9 +192,9 @@ protected void onLocalUnregistration(HandlerHolder handlerHolder, Promise @Override protected void sendOrPub(ContextInternal ctx, MessageImpl message, DeliveryOptions options, Promise writePromise) { if (((ClusteredMessage) message).getRepliedTo() != null) { - clusteredSendReply(message, options, writePromise, ((ClusteredMessage) message).getRepliedTo()); + clusteredSendReply(message, writePromise, ((ClusteredMessage) message).getRepliedTo()); } else if (options.isLocalOnly()) { - super.sendOrPub(ctx, message, options, writePromise); + sendLocally(message, writePromise); } else { Serializer serializer = Serializer.get(ctx); if (message.isSend()) { @@ -203,7 +202,7 @@ protected void sendOrPub(ContextInternal ctx, MessageImpl message, Del serializer.queue(message, nodeSelector::selectForSend, promise); promise.future().onComplete(ar -> { if (ar.succeeded()) { - sendToNode(ar.result(), message, options, writePromise); + sendToNode(ar.result(), message, writePromise); } else { sendOrPublishFailed(writePromise, ar.cause()); } @@ -213,7 +212,7 @@ protected void sendOrPub(ContextInternal ctx, MessageImpl message, Del serializer.queue(message, nodeSelector::selectForPublish, promise); promise.future().onComplete(ar -> { if (ar.succeeded()) { - sendToNodes(ar.result(), message, options, writePromise); + sendToNodes(ar.result(), message, writePromise); } else { sendOrPublishFailed(writePromise, ar.cause()); } @@ -327,15 +326,15 @@ public void handle(Buffer buff) { }; } - private void sendToNode(String nodeId, MessageImpl message, DeliveryOptions options, Promise writePromise) { + private void sendToNode(String nodeId, MessageImpl message, Promise writePromise) { if (nodeId != null && !nodeId.equals(this.nodeId)) { - sendRemote(nodeId, message, options, writePromise); + sendRemote(nodeId, message, writePromise); } else { - super.sendOrPub(ebContext, message, options, writePromise); + sendLocally(message, writePromise); } } - private void sendToNodes(Iterable nodeIds, MessageImpl message, DeliveryOptions options, Promise writePromise) { + private void sendToNodes(Iterable nodeIds, MessageImpl message, Promise writePromise) { boolean sentRemote = false; if (nodeIds != null) { for (String nid : nodeIds) { @@ -343,23 +342,23 @@ private void sendToNodes(Iterable nodeIds, MessageImpl message sentRemote = true; } // Write promise might be completed several times!!!! - sendToNode(nid, message, options, writePromise); + sendToNode(nid, message, writePromise); } } if (!sentRemote) { - super.sendOrPub(ebContext, message, options, writePromise); + sendLocally(message, writePromise); } } - private void clusteredSendReply(MessageImpl message, DeliveryOptions options, Promise writePromise, String replyDest) { + private void clusteredSendReply(MessageImpl message, Promise writePromise, String replyDest) { if (!replyDest.equals(nodeId)) { - sendRemote(replyDest, message, options, writePromise); + sendRemote(replyDest, message, writePromise); } else { - super.sendOrPub(ebContext, message, options, writePromise); + sendLocally(message, writePromise); } } - private void sendRemote(String remoteNodeId, MessageImpl message, DeliveryOptions options, Promise writePromise) { + private void sendRemote(String remoteNodeId, MessageImpl message, Promise writePromise) { // We need to deal with the fact that connecting can take some time and is async, and we cannot // block to wait for it. So we add any sends to a pending list if not connected yet. // Once we connect we send them. From 20746ba04586f049470f209ccb4dc58907f05865 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 24 May 2023 11:46:11 +0200 Subject: [PATCH 06/12] Node selector should care about addresses instead of messages --- .../core/eventbus/impl/clustered/Serializer.java | 10 +++++----- .../java/io/vertx/core/spi/cluster/NodeSelector.java | 9 ++------- .../core/spi/cluster/impl/DefaultNodeSelector.java | 12 ++++-------- .../core/spi/cluster/impl/selector/Selectors.java | 3 +-- .../vertx/core/eventbus/CustomNodeSelectorTest.java | 4 ++-- .../eventbus/MessageQueueOnWorkerThreadTest.java | 4 ++-- .../core/eventbus/WriteHandlerLookupFailureTest.java | 4 ++-- .../vertx/core/spi/cluster/WrappedNodeSelector.java | 8 ++++---- 8 files changed, 22 insertions(+), 32 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java index 145011725b9..08d0e403ae8 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java @@ -60,7 +60,7 @@ public static Serializer get(ContextInternal context) { return serializer; } - public void queue(Message message, BiConsumer, Promise> selectHandler, Promise promise) { + public void queue(Message message, BiConsumer> selectHandler, Promise promise) { ctx.emit(v -> { String address = message.address(); SerializerQueue queue = queues.computeIfAbsent(address, SerializerQueue::new); @@ -110,7 +110,7 @@ void checkPending() { } } - void add(Message msg, BiConsumer, Promise> selectHandler, Promise promise) { + void add(Message msg, BiConsumer> selectHandler, Promise promise) { SerializedTask serializedTask = new SerializedTask<>(ctx, msg, selectHandler); Future fut = serializedTask.internalPromise.future(); fut.onComplete(promise); @@ -136,20 +136,20 @@ void close() { private class SerializedTask implements Handler> { final Message msg; - final BiConsumer, Promise> selectHandler; + final BiConsumer> selectHandler; final Promise internalPromise; SerializedTask( ContextInternal context, Message msg, - BiConsumer, Promise> selectHandler) { + BiConsumer> selectHandler) { this.msg = msg; this.selectHandler = selectHandler; this.internalPromise = context.promise(); } void process() { - selectHandler.accept(msg, internalPromise); + selectHandler.accept(msg.address(), internalPromise); } @Override diff --git a/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java b/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java index 062322c24cf..f767a029a6e 100644 --- a/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java +++ b/src/main/java/io/vertx/core/spi/cluster/NodeSelector.java @@ -13,7 +13,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.eventbus.Message; import io.vertx.core.impl.VertxBuilder; import io.vertx.core.spi.VertxServiceProvider; @@ -47,20 +46,16 @@ default void init(VertxBuilder builder) { * *

The provided {@code promise} needs to be completed with {@link Promise#tryComplete} and {@link Promise#tryFail} * as it might completed outside the selector. - * - * @throws IllegalArgumentException if {@link Message#isSend()} returns {@code false} */ - void selectForSend(Message message, Promise promise); + void selectForSend(String address, Promise promise); /** * Select a node for publishing the given {@code message}. * *

The provided {@code promise} needs to be completed with {@link Promise#tryComplete} and {@link Promise#tryFail} * as it might completed outside the selector. - * - * @throws IllegalArgumentException if {@link Message#isSend()} returns {@code true} */ - void selectForPublish(Message message, Promise> promise); + void selectForPublish(String address, Promise> promise); /** * Invoked by the {@link ClusterManager} when messaging handler registrations are added or removed. diff --git a/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java b/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java index 6198477d340..72007135316 100644 --- a/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java +++ b/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java @@ -13,8 +13,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.eventbus.Message; -import io.vertx.core.impl.Arguments; import io.vertx.core.spi.cluster.ClusterManager; import io.vertx.core.spi.cluster.NodeSelector; import io.vertx.core.spi.cluster.RegistrationUpdateEvent; @@ -37,17 +35,15 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { - Arguments.require(message.isSend(), "selectForSend used for publishing"); - selectors.withSelector(message, promise, (prom, selector) -> { + public void selectForSend(String address, Promise promise) { + selectors.withSelector(address, promise, (prom, selector) -> { prom.tryComplete(selector.selectForSend()); }); } @Override - public void selectForPublish(Message message, Promise> promise) { - Arguments.require(!message.isSend(), "selectForPublish used for sending"); - selectors.withSelector(message, promise, (prom, selector) -> { + public void selectForPublish(String address, Promise> promise) { + selectors.withSelector(address, promise, (prom, selector) -> { prom.tryComplete(selector.selectForPublish()); }); } diff --git a/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java b/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java index 2244e85cc6f..0e5423e0153 100644 --- a/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java +++ b/src/main/java/io/vertx/core/spi/cluster/impl/selector/Selectors.java @@ -35,8 +35,7 @@ public Selectors(ClusterManager clusterManager) { this.clusterManager = clusterManager; } - public void withSelector(Message message, Promise promise, BiConsumer, RoundRobinSelector> task) { - String address = message.address(); + public void withSelector(String address, Promise promise, BiConsumer, RoundRobinSelector> task) { SelectorEntry entry = map.compute(address, (addr, curr) -> { return curr == null ? new SelectorEntry() : (curr.isNotReady() ? curr.increment() : curr); }); diff --git a/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java b/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java index 9b1645d09d2..f2762d0db70 100644 --- a/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java +++ b/src/test/java/io/vertx/core/eventbus/CustomNodeSelectorTest.java @@ -105,12 +105,12 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { + public void selectForSend(String address, Promise promise) { promise.fail("Not implemented"); } @Override - public void selectForPublish(Message message, Promise> promise) { + public void selectForPublish(String address, Promise> promise) { List nodes = clusterManager.getNodes(); CompositeFuture future = nodes.stream() .map(nodeId -> { diff --git a/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java b/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java index 28ff053e0e6..d08744e087d 100644 --- a/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java +++ b/src/test/java/io/vertx/core/eventbus/MessageQueueOnWorkerThreadTest.java @@ -89,7 +89,7 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { + public void selectForSend(String address, Promise promise) { try { NANOSECONDS.sleep(150); } catch (InterruptedException e) { @@ -99,7 +99,7 @@ public void selectForSend(Message message, Promise promise) { } @Override - public void selectForPublish(Message message, Promise> promise) { + public void selectForPublish(String address, Promise> promise) { throw new UnsupportedOperationException(); } diff --git a/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java b/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java index be2b2b02835..92dd56431d3 100644 --- a/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java +++ b/src/test/java/io/vertx/core/eventbus/WriteHandlerLookupFailureTest.java @@ -38,12 +38,12 @@ public void test() { .setPort(0); NodeSelector nodeSelector = new DefaultNodeSelector() { @Override - public void selectForSend(Message message, Promise promise) { + public void selectForSend(String address, Promise promise) { promise.fail(cause); } @Override - public void selectForPublish(Message message, Promise> promise) { + public void selectForPublish(String address, Promise> promise) { promise.fail("Not implemented"); } }; diff --git a/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java b/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java index 76e2597863f..479d6a98a8e 100644 --- a/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java +++ b/src/test/java/io/vertx/core/spi/cluster/WrappedNodeSelector.java @@ -34,13 +34,13 @@ public void eventBusStarted() { } @Override - public void selectForSend(Message message, Promise promise) { - delegate.selectForSend(message, promise); + public void selectForSend(String address, Promise promise) { + delegate.selectForSend(address, promise); } @Override - public void selectForPublish(Message message, Promise> promise) { - delegate.selectForPublish(message, promise); + public void selectForPublish(String address, Promise> promise) { + delegate.selectForPublish(address, promise); } @Override From b51073aa48c8eaa83a74e754beb920e4d476aacb Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 24 May 2023 14:11:16 +0200 Subject: [PATCH 07/12] Remove reply handler flag from HandlerHolder --- .../core/eventbus/impl/EventBusImpl.java | 28 +++++++++---- .../core/eventbus/impl/HandlerHolder.java | 8 +--- .../core/eventbus/impl/ReplyHandler.java | 2 +- .../impl/clustered/ClusteredEventBus.java | 42 ++++++++----------- .../clustered/ClusteredHandlerHolder.java | 4 +- 5 files changed, 40 insertions(+), 44 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index fbca1f9f1d2..bdbe44128d7 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -259,10 +259,16 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers, } protected Consumer> addRegistration(String address, HandlerRegistration registration, boolean replyHandler, boolean localOnly, Promise promise) { - HandlerHolder holder = addLocalRegistration(address, registration, replyHandler, localOnly); - onLocalRegistration(holder, promise); + HandlerHolder holder = addLocalRegistration(address, registration, localOnly); + if (!replyHandler) { + onLocalRegistration(holder, promise); + } else { + if (promise != null) { + promise.complete(); + } + } return p -> { - removeRegistration(holder, p); + removeRegistration(holder, replyHandler, p); }; } @@ -273,12 +279,12 @@ protected void onLocalRegistration(HandlerHolder handlerHolder, Promise HandlerHolder addLocalRegistration(String address, HandlerRegistration registration, - boolean replyHandler, boolean localOnly) { + boolean localOnly) { Objects.requireNonNull(address, "address"); ContextInternal context = registration.context; - HandlerHolder holder = createHandlerHolder(registration, replyHandler, localOnly, context); + HandlerHolder holder = createHandlerHolder(registration, localOnly, context); ConcurrentCyclicSequence handlers = new ConcurrentCyclicSequence().add(holder); ConcurrentCyclicSequence actualHandlers = handlerMap.merge( @@ -293,13 +299,17 @@ private HandlerHolder addLocalRegistration(String address, HandlerRegistr return holder; } - protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean replyHandler, boolean localOnly, ContextInternal context) { - return new HandlerHolder<>(registration, replyHandler, localOnly, context); + protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean localOnly, ContextInternal context) { + return new HandlerHolder<>(registration, localOnly, context); } - protected void removeRegistration(HandlerHolder handlerHolder, Promise promise) { + protected void removeRegistration(HandlerHolder handlerHolder, boolean replyHandler, Promise promise) { removeLocalRegistration(handlerHolder); - onLocalUnregistration(handlerHolder, promise); + if (!replyHandler) { + onLocalUnregistration(handlerHolder, promise); + } else { + promise.complete(); + } } protected void onLocalUnregistration(HandlerHolder handlerHolder, Promise promise) { diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java index d1135b2b2f0..5a0e32c1497 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerHolder.java @@ -22,14 +22,12 @@ public class HandlerHolder { public final ContextInternal context; public final HandlerRegistration handler; - public final boolean replyHandler; public final boolean localOnly; private boolean removed; - public HandlerHolder(HandlerRegistration handler, boolean replyHandler, boolean localOnly, ContextInternal context) { + public HandlerHolder(HandlerRegistration handler, boolean localOnly, ContextInternal context) { this.context = context; this.handler = handler; - this.replyHandler = replyHandler; this.localOnly = localOnly; } @@ -76,10 +74,6 @@ public HandlerRegistration getHandler() { return handler; } - public boolean isReplyHandler() { - return replyHandler; - } - public boolean isLocalOnly() { return localOnly; } diff --git a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java index 649668090e1..b725cef1da4 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java +++ b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java @@ -83,7 +83,7 @@ protected boolean doReceive(Message reply) { } void register() { - register(repliedAddress, true, null); + register(repliedAddress, false, null); } @Override diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 6667eca0a57..5b386a7c2b9 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -156,37 +156,29 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers, @Override protected void onLocalRegistration(HandlerHolder handlerHolder, Promise promise) { - if (!handlerHolder.isReplyHandler()) { - RegistrationInfo registrationInfo = new RegistrationInfo( - nodeId, - handlerHolder.getSeq(), - handlerHolder.isLocalOnly() - ); - clusterManager.addRegistration(handlerHolder.getHandler().address, registrationInfo, Objects.requireNonNull(promise)); - } else if (promise != null) { - promise.complete(); - } + RegistrationInfo registrationInfo = new RegistrationInfo( + nodeId, + handlerHolder.getSeq(), + handlerHolder.isLocalOnly() + ); + clusterManager.addRegistration(handlerHolder.getHandler().address, registrationInfo, Objects.requireNonNull(promise)); } @Override - protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean replyHandler, boolean localOnly, ContextInternal context) { - return new ClusteredHandlerHolder<>(registration, replyHandler, localOnly, context, handlerSequence.getAndIncrement()); + protected HandlerHolder createHandlerHolder(HandlerRegistration registration, boolean localOnly, ContextInternal context) { + return new ClusteredHandlerHolder<>(registration, localOnly, context, handlerSequence.getAndIncrement()); } @Override protected void onLocalUnregistration(HandlerHolder handlerHolder, Promise completionHandler) { - if (!handlerHolder.isReplyHandler()) { - RegistrationInfo registrationInfo = new RegistrationInfo( - nodeId, - handlerHolder.getSeq(), - handlerHolder.isLocalOnly() - ); - Promise promise = Promise.promise(); - clusterManager.removeRegistration(handlerHolder.getHandler().address, registrationInfo, promise); - promise.future().onComplete(completionHandler); - } else { - completionHandler.complete(); - } + RegistrationInfo registrationInfo = new RegistrationInfo( + nodeId, + handlerHolder.getSeq(), + handlerHolder.isLocalOnly() + ); + Promise promise = Promise.promise(); + clusterManager.removeRegistration(handlerHolder.getHandler().address, registrationInfo, promise); + promise.future().onComplete(completionHandler); } @Override @@ -249,7 +241,7 @@ protected HandlerHolder nextHandler(ConcurrentCyclicSequence hand Iterator iterator = handlers.iterator(false); while (iterator.hasNext()) { HandlerHolder next = iterator.next(); - if (next.isReplyHandler() || !next.isLocalOnly()) { + if (!next.isLocalOnly()) { handlerHolder = next; break; } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java index 8940eff5184..c5703d86117 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredHandlerHolder.java @@ -19,8 +19,8 @@ public class ClusteredHandlerHolder extends HandlerHolder { private final long seq; - public ClusteredHandlerHolder(HandlerRegistration handler, boolean replyHandler, boolean localOnly, ContextInternal context, long seq) { - super(handler, replyHandler, localOnly, context); + public ClusteredHandlerHolder(HandlerRegistration handler, boolean localOnly, ContextInternal context, long seq) { + super(handler, localOnly, context); this.seq = seq; } From ca06d218d44949de63c577f79c9a256b1fd00dc7 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 24 May 2023 14:22:20 +0200 Subject: [PATCH 08/12] Use a broadcast flag when registering a handler registration --- .../java/io/vertx/core/eventbus/impl/EventBusImpl.java | 10 +++++----- .../vertx/core/eventbus/impl/HandlerRegistration.java | 6 +++--- .../vertx/core/eventbus/impl/MessageConsumerImpl.java | 2 +- .../java/io/vertx/core/eventbus/impl/ReplyHandler.java | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index bdbe44128d7..d9194e559b2 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -258,9 +258,9 @@ public MessageImpl createMessage(boolean send, String address, MultiMap headers, return msg; } - protected Consumer> addRegistration(String address, HandlerRegistration registration, boolean replyHandler, boolean localOnly, Promise promise) { + protected Consumer> addRegistration(String address, HandlerRegistration registration, boolean broadcast, boolean localOnly, Promise promise) { HandlerHolder holder = addLocalRegistration(address, registration, localOnly); - if (!replyHandler) { + if (broadcast) { onLocalRegistration(holder, promise); } else { if (promise != null) { @@ -268,7 +268,7 @@ protected Consumer> addRegistration(String address, HandlerReg } } return p -> { - removeRegistration(holder, replyHandler, p); + removeRegistration(holder, broadcast, p); }; } @@ -303,9 +303,9 @@ protected HandlerHolder createHandlerHolder(HandlerRegistration regist return new HandlerHolder<>(registration, localOnly, context); } - protected void removeRegistration(HandlerHolder handlerHolder, boolean replyHandler, Promise promise) { + protected void removeRegistration(HandlerHolder handlerHolder, boolean broadcast, Promise promise) { removeLocalRegistration(handlerHolder); - if (!replyHandler) { + if (broadcast) { onLocalUnregistration(handlerHolder, promise); } else { promise.complete(); diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index 81da3106d93..5727cc59a47 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -58,13 +58,13 @@ void receive(MessageImpl msg) { protected abstract void dispatch(Message msg, ContextInternal context, Handler> handler); - synchronized void register(String repliedAddress, boolean localOnly, Promise promise) { + synchronized void register(boolean broadcast, boolean localOnly, Promise promise) { if (registered != null) { throw new IllegalStateException(); } - registered = bus.addRegistration(address, this, repliedAddress != null, localOnly, promise); + registered = bus.addRegistration(address, this, broadcast, localOnly, promise); if (bus.metrics != null) { - metric = bus.metrics.handlerRegistered(address, repliedAddress); + metric = bus.metrics.handlerRegistered(address, null /* regression */); } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 776cbe0800d..0c9d0429ca5 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -216,7 +216,7 @@ public synchronized MessageConsumer handler(Handler> h) { registered = true; Promise p = result; Promise registration = context.promise(); - register(null, localOnly, registration); + register(true, localOnly, registration); registration.future().onComplete(ar -> { if (ar.succeeded()) { p.tryComplete(); diff --git a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java index b725cef1da4..2ce310eedde 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java +++ b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java @@ -83,7 +83,7 @@ protected boolean doReceive(Message reply) { } void register() { - register(repliedAddress, false, null); + register(false, false, null); } @Override From d044fb1a915bfeda92672a4f2524d909598fef2e Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 24 May 2023 12:59:12 +0200 Subject: [PATCH 09/12] Try to introduce Frame for what we exchange between peers --- .../core/eventbus/impl/EventBusImpl.java | 59 ++++++++++--------- .../io/vertx/core/eventbus/impl/Frame.java | 23 ++++++++ .../vertx/core/eventbus/impl/MessageImpl.java | 13 +++- .../impl/clustered/ClusteredEventBus.java | 14 ++--- .../impl/clustered/ConnectionHolder.java | 9 +-- 5 files changed, 77 insertions(+), 41 deletions(-) create mode 100644 src/main/java/io/vertx/core/eventbus/impl/Frame.java diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index d9194e559b2..97283663f27 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -346,7 +346,7 @@ protected void sendOrPub(OutboundDeliveryContext sendContext) { sendOrPub(sendContext.ctx, sendContext.message, sendContext.options, sendContext); } - protected void sendLocally(MessageImpl message, Promise writePromise) { + protected void sendLocally(Frame message, Promise writePromise) { ReplyException failure = deliverMessageLocally(message); if (failure != null) { writePromise.tryFail(failure); @@ -355,42 +355,47 @@ protected void sendLocally(MessageImpl message, Promise writePro } } - protected boolean isMessageLocal(MessageImpl msg) { + protected boolean isMessageLocal(Frame msg) { return true; } - protected ReplyException deliverMessageLocally(MessageImpl msg) { - ConcurrentCyclicSequence handlers = handlerMap.get(msg.address()); - boolean messageLocal = isMessageLocal(msg); - if (handlers != null) { - if (msg.isSend()) { - //Choose one - HandlerHolder holder = nextHandler(handlers, messageLocal); - if (metrics != null) { - metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0); - } - if (holder != null) { - holder.handler.receive(msg.copyBeforeReceive()); + protected ReplyException deliverMessageLocally(Frame frame) { + ConcurrentCyclicSequence handlers = handlerMap.get(frame.address()); + boolean messageLocal = isMessageLocal(frame); + if (frame instanceof MessageImpl) { + MessageImpl msg = (MessageImpl) frame; + if (handlers != null) { + if (msg.isSend()) { + //Choose one + HandlerHolder holder = nextHandler(handlers, messageLocal); + if (metrics != null) { + metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0); + } + if (holder != null) { + holder.handler.receive(msg.copyBeforeReceive()); + } else { + // RACY issue !!!!! + } } else { - // RACY issue !!!!! + // Publish + if (metrics != null) { + metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, handlers.size()); + } + for (HandlerHolder holder: handlers) { + if (messageLocal || !holder.isLocalOnly()) { + holder.handler.receive(msg.copyBeforeReceive()); + } + } } + return null; } else { - // Publish if (metrics != null) { - metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, handlers.size()); - } - for (HandlerHolder holder: handlers) { - if (messageLocal || !holder.isLocalOnly()) { - holder.handler.receive(msg.copyBeforeReceive()); - } + metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0); } + return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address); } - return null; } else { - if (metrics != null) { - metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0); - } - return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address); + return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + frame.address()); } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/Frame.java b/src/main/java/io/vertx/core/eventbus/impl/Frame.java new file mode 100644 index 00000000000..0f0dadf94cb --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/Frame.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.buffer.Buffer; + +public interface Frame { + + String address(); + + Buffer encodeToWire(); + + boolean isFromWire(); + +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java index c2919156d20..67395486fb8 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageImpl.java @@ -13,6 +13,7 @@ import io.vertx.core.Future; import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.*; import java.util.List; @@ -21,7 +22,7 @@ /** * @author Tim Fox */ -public class MessageImpl implements Message { +public class MessageImpl implements Message, Frame { protected MessageCodec messageCodec; protected final EventBusImpl bus; @@ -140,4 +141,14 @@ public MessageCodec codec() { protected boolean isLocal() { return true; } + + @Override + public Buffer encodeToWire() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFromWire() { + throw new UnsupportedOperationException(); + } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 5b386a7c2b9..6b78ef0fc59 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -17,11 +17,7 @@ import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.EventBusOptions; import io.vertx.core.eventbus.MessageCodec; -import io.vertx.core.eventbus.impl.CodecManager; -import io.vertx.core.eventbus.impl.EventBusImpl; -import io.vertx.core.eventbus.impl.HandlerHolder; -import io.vertx.core.eventbus.impl.HandlerRegistration; -import io.vertx.core.eventbus.impl.MessageImpl; +import io.vertx.core.eventbus.impl.*; import io.vertx.core.impl.CloseFuture; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.EventLoopContext; @@ -227,7 +223,7 @@ protected String generateReplyAddress() { } @Override - protected boolean isMessageLocal(MessageImpl msg) { + protected boolean isMessageLocal(Frame msg) { ClusteredMessage clusteredMessage = (ClusteredMessage) msg; return !clusteredMessage.isFromWire(); } @@ -318,7 +314,7 @@ public void handle(Buffer buff) { }; } - private void sendToNode(String nodeId, MessageImpl message, Promise writePromise) { + private void sendToNode(String nodeId, Frame message, Promise writePromise) { if (nodeId != null && !nodeId.equals(this.nodeId)) { sendRemote(nodeId, message, writePromise); } else { @@ -326,7 +322,7 @@ private void sendToNode(String nodeId, MessageImpl message, Promise void sendToNodes(Iterable nodeIds, MessageImpl message, Promise writePromise) { + private void sendToNodes(Iterable nodeIds, Frame message, Promise writePromise) { boolean sentRemote = false; if (nodeIds != null) { for (String nid : nodeIds) { @@ -350,7 +346,7 @@ private void clusteredSendReply(MessageImpl message, Promise wri } } - private void sendRemote(String remoteNodeId, MessageImpl message, Promise writePromise) { + private void sendRemote(String remoteNodeId, Frame message, Promise writePromise) { // We need to deal with the fact that connecting can take some time and is async, and we cannot // block to wait for it. So we add any sends to a pending list if not connected yet. // Once we connect we send them. diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java index 7301756b6c6..11684358915 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ConnectionHolder.java @@ -14,6 +14,7 @@ import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.EventBusOptions; +import io.vertx.core.eventbus.impl.Frame; import io.vertx.core.eventbus.impl.MessageImpl; import io.vertx.core.eventbus.impl.codecs.PingMessageCodec; import io.vertx.core.impl.VertxInternal; @@ -48,9 +49,9 @@ class ConnectionHolder { private long pingTimeoutID = -1; private static class SomeTask { - final MessageImpl message; + final Frame message; final Promise writePromise; - SomeTask(MessageImpl message, Promise writePromise) { + SomeTask(Frame message, Promise writePromise) { this.message = message; this.writePromise = writePromise; } @@ -79,9 +80,9 @@ void connect() { } // TODO optimise this (contention on monitor) - synchronized void writeMessage(MessageImpl message, Promise writePromise) { + synchronized void writeMessage(Frame message, Promise writePromise) { if (connected) { - Buffer data = ((ClusteredMessage) message).encodeToWire(); + Buffer data = message.encodeToWire(); if (metrics != null) { metrics.messageWritten(message.address(), data.length()); } From 82f2e244c70c1d456207f24eacc6a17220ff9937 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 24 May 2023 14:39:16 +0200 Subject: [PATCH 10/12] Handler registration should process generic frames --- .../java/io/vertx/core/eventbus/impl/EventBusImpl.java | 8 +++++++- .../vertx/core/eventbus/impl/HandlerRegistration.java | 10 ++++++---- .../vertx/core/eventbus/impl/MessageConsumerImpl.java | 9 +++++++++ .../java/io/vertx/core/eventbus/impl/ReplyHandler.java | 10 +++++++--- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 97283663f27..7f299d480bb 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -395,7 +395,13 @@ protected ReplyException deliverMessageLocally(Frame frame) { return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address); } } else { - return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + frame.address()); + if (handlers != null) { + HandlerHolder holder = nextHandler(handlers, messageLocal); + holder.handler.receive(frame); + return null; + } else { + return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + frame.address()); + } } } diff --git a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index 5727cc59a47..840cf7266f3 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -41,20 +41,22 @@ public abstract class HandlerRegistration implements Closeable { this.address = address; } - void receive(MessageImpl msg) { + void receive(Frame msg) { if (bus.metrics != null) { - bus.metrics.scheduleMessage(metric, msg.isLocal()); + bus.metrics.scheduleMessage(metric, ((MessageImpl)msg).isLocal()); // Will CCE } context.executor().execute(() -> { // Need to check handler is still there - the handler might have been removed after the message were sent but // before it was received if (!doReceive(msg)) { - discard(msg); + if (msg instanceof Message) { + discard((Message) msg); + } } }); } - protected abstract boolean doReceive(Message msg); + protected abstract boolean doReceive(Frame msg); protected abstract void dispatch(Message msg, ContextInternal context, Handler> handler); diff --git a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 0c9d0429ca5..7304bd955c6 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -131,6 +131,15 @@ public synchronized Future unregister() { return fut; } + @Override + protected boolean doReceive(Frame msg) { + if (msg instanceof Message) { + return doReceive((Message) msg); + } else { + return false; + } + } + protected boolean doReceive(Message message) { Handler> theHandler; synchronized (this) { diff --git a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java index 2ce310eedde..2eae66ca26f 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java +++ b/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java @@ -77,9 +77,13 @@ public void handle(Long id) { } @Override - protected boolean doReceive(Message reply) { - dispatch(null, reply, context); - return true; + protected boolean doReceive(Frame msg) { + if (msg instanceof Message) { + dispatch(null, (Message) msg, context); + return true; + } else { + return false; + } } void register() { From 761f252d3137ed38e25907fdc8dbcb057d3a7760 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Wed, 24 May 2023 15:49:41 +0200 Subject: [PATCH 11/12] Basic stream implementation --- .../java/io/vertx/core/eventbus/EventBus.java | 5 +- .../io/vertx/core/eventbus/MessageStream.java | 27 +++++++ .../core/eventbus/impl/ClientStream.java | 27 +++++++ .../core/eventbus/impl/EventBusImpl.java | 25 +++++++ .../io/vertx/core/eventbus/impl/FinFrame.java | 27 +++++++ .../vertx/core/eventbus/impl/StreamBase.java | 75 +++++++++++++++++++ .../core/eventbus/impl/StreamServer.java | 42 +++++++++++ .../io/vertx/core/eventbus/impl/SynFrame.java | 29 +++++++ .../core/eventbus/LocalEventBusTest.java | 29 +++++++ 9 files changed, 285 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/vertx/core/eventbus/MessageStream.java create mode 100644 src/main/java/io/vertx/core/eventbus/impl/ClientStream.java create mode 100644 src/main/java/io/vertx/core/eventbus/impl/FinFrame.java create mode 100644 src/main/java/io/vertx/core/eventbus/impl/StreamBase.java create mode 100644 src/main/java/io/vertx/core/eventbus/impl/StreamServer.java create mode 100644 src/main/java/io/vertx/core/eventbus/impl/SynFrame.java diff --git a/src/main/java/io/vertx/core/eventbus/EventBus.java b/src/main/java/io/vertx/core/eventbus/EventBus.java index 140808e5c11..3d718093149 100644 --- a/src/main/java/io/vertx/core/eventbus/EventBus.java +++ b/src/main/java/io/vertx/core/eventbus/EventBus.java @@ -15,7 +15,6 @@ import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.Nullable; import io.vertx.codegen.annotations.VertxGen; -import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.eventbus.impl.DefaultSerializableChecker; @@ -199,6 +198,10 @@ default Future> request(String address, @Nullable Object message) */ MessageProducer publisher(String address, DeliveryOptions options); + Future bindStream(String address, Handler handler); + + Future connectStream(String address); + /** * Register a message codec. *

diff --git a/src/main/java/io/vertx/core/eventbus/MessageStream.java b/src/main/java/io/vertx/core/eventbus/MessageStream.java new file mode 100644 index 00000000000..c1d2952fd9c --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/MessageStream.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2011-2021 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.eventbus; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Handler; + +@VertxGen +public interface MessageStream { + + void handler(Handler> handler); + + void endHandler(Handler handler); + + void write(String msg); + + void end(); + +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java b/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java new file mode 100644 index 00000000000..872a9089800 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/ClientStream.java @@ -0,0 +1,27 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Promise; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; + +class ClientStream extends StreamBase { + + private final Promise promise2; + + public ClientStream(EventBusImpl eventBus, String sourceAddress, ContextInternal ctx, Promise promise2) { + super(sourceAddress, ctx, eventBus, sourceAddress, true); + this.promise2 = promise2; + } + + @Override + protected boolean doReceive(Frame frame) { + if (frame instanceof SynFrame) { + SynFrame syn = (SynFrame) frame; + remoteAddress = syn.src; + promise2.complete(this); + return true; + } else { + return super.doReceive(frame); + } + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 7f299d480bb..2f4155570b2 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -155,6 +155,31 @@ public MessageProducer publisher(String address, DeliveryOptions options) return new MessageProducerImpl<>(vertx, address, false, options); } + @Override + public Future bindStream(String address, Handler handler) { + ContextInternal ctx = vertx.getOrCreateContext(); + HandlerRegistration reg = new StreamServer(this, ctx, address, handler); + Promise promise = ctx.promise(); + reg.register(true, false, promise); + return promise.future(); + } + + @Override + public Future connectStream(String address) { + ContextInternal ctx = vertx.getOrCreateContext(); + String sourceAddress = generateReplyAddress(); + Promise promise2 = ctx.promise(); + StreamBase reg = new ClientStream(this, sourceAddress, ctx, promise2); + Promise promise = ctx.promise(); + reg.register(false, false, promise); + promise.future().onComplete(ar -> { + if (ar.succeeded()) { + sendLocally(new SynFrame(sourceAddress, address), ctx.promise()); + } + }); + return promise2.future(); + } + @Override public EventBus publish(String address, Object message) { return publish(address, message, new DeliveryOptions()); diff --git a/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java b/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java new file mode 100644 index 00000000000..70309c74ff7 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/FinFrame.java @@ -0,0 +1,27 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.buffer.Buffer; + +class FinFrame implements Frame { + + final String addr; + + public FinFrame(String addr) { + this.addr = addr; + } + + @Override + public String address() { + return addr; + } + + @Override + public Buffer encodeToWire() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFromWire() { + return false; + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java b/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java new file mode 100644 index 00000000000..3b663b1566f --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/StreamBase.java @@ -0,0 +1,75 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; + +class StreamBase extends HandlerRegistration implements MessageStream { + + private Handler> handler; + private Handler endHandler; + final String localAddress; + String remoteAddress; + private boolean halfClosed; + + StreamBase(String localAddress, ContextInternal context, EventBusImpl bus, String address, boolean src) { + super(context, bus, address, src); + this.localAddress = localAddress; + } + + @Override + protected boolean doReceive(Frame frame) { + if (frame instanceof MessageImpl) { + MessageImpl msg = (MessageImpl) frame; + Handler> h = handler; + if (h != null) { + h.handle(msg); + } + } else if (frame instanceof FinFrame) { + Handler h = endHandler; + if (h != null) { + h.handle(null); + } + if (halfClosed) { + unregister(); + } else { + halfClosed = true; + } + } + return true; + } + + @Override + protected void dispatch(Message msg, ContextInternal context, Handler handler) { + + } + + @Override + public void handler(Handler> handler) { + this.handler = handler; + } + + @Override + public void endHandler(Handler handler) { + this.endHandler = handler; + } + + @Override + public void write(String body) { + MessageImpl msg = new MessageImpl(remoteAddress, MultiMap.caseInsensitiveMultiMap(), body, CodecManager.STRING_MESSAGE_CODEC, true, bus); + bus.sendLocally(msg, context.promise()); + } + + @Override + public void end() { + FinFrame fin = new FinFrame(remoteAddress); + bus.sendLocally(fin, context.promise()); + if (halfClosed) { + unregister(); + } else { + halfClosed = true; + } + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java b/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java new file mode 100644 index 00000000000..f60a5d87109 --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/StreamServer.java @@ -0,0 +1,42 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageStream; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.future.PromiseInternal; + +class StreamServer extends HandlerRegistration { + private final EventBusImpl eventBus; + private final Handler handler; + + public StreamServer(EventBusImpl eventBus, ContextInternal ctx, String address, Handler handler) { + super(ctx, eventBus, address, false); + this.eventBus = eventBus; + this.handler = handler; + } + + @Override + protected boolean doReceive(Frame frame) { + if (frame instanceof SynFrame) { + SynFrame syn = (SynFrame) frame; + String localAddress = eventBus.generateReplyAddress(); + StreamBase ss = new StreamBase(localAddress, context, eventBus, localAddress, false); + ss.remoteAddress = syn.src; + PromiseInternal p = context.promise(); + ss.register(false, true, p); + p.onComplete(ar -> { + if (ar.succeeded()) { + SynFrame reply = new SynFrame(localAddress, syn.src); + eventBus.sendLocally(reply, context.promise()); + handler.handle(ss); + } + }); + } + return true; + } + + @Override + protected void dispatch(Message msg, ContextInternal context, Handler handler) { + } +} diff --git a/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java b/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java new file mode 100644 index 00000000000..d55d6250a7e --- /dev/null +++ b/src/main/java/io/vertx/core/eventbus/impl/SynFrame.java @@ -0,0 +1,29 @@ +package io.vertx.core.eventbus.impl; + +import io.vertx.core.buffer.Buffer; + +public class SynFrame implements Frame { + + final String src; + final String dst; + + public SynFrame(String src, String dst) { + this.src = src; + this.dst = dst; + } + + @Override + public String address() { + return dst; + } + + @Override + public Buffer encodeToWire() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFromWire() { + return false; + } +} diff --git a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java index 43107da8963..f8784d2e369 100644 --- a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java @@ -1531,5 +1531,34 @@ public void testEarlyTimeoutOnHandlerUnregistration() { }); await(); } + + @Test + public void testStream() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + vertx.eventBus().bindStream(ADDRESS1, stream -> { + stream.handler(msg -> { + assertEquals("ping", msg.body()); + stream.write(msg.body()); + }); + stream.endHandler(v -> { + stream.end(); + }); + }).onComplete(onSuccess(v -> { + latch.countDown(); + })); + awaitLatch(latch); + vertx.eventBus().connectStream(ADDRESS1).onComplete(onSuccess(stream -> { + stream.write("ping"); + stream.handler(msg -> { + assertEquals("ping", msg.body()); + stream.end(); + }); + stream.endHandler(v -> { + testComplete(); + }); + })); + await(); + } + } From 42bc778e01e23a46bb96eba747d4f1e3753bfbb4 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Fri, 26 May 2023 15:53:08 +0200 Subject: [PATCH 12/12] Update eventbus markdown --- .../java/io/vertx/core/eventbus/eventbus.md | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/test/java/io/vertx/core/eventbus/eventbus.md b/src/test/java/io/vertx/core/eventbus/eventbus.md index a9332bf4b68..a041cb1d1b6 100644 --- a/src/test/java/io/vertx/core/eventbus/eventbus.md +++ b/src/test/java/io/vertx/core/eventbus/eventbus.md @@ -11,37 +11,44 @@ P -> C: MSG(request) title Request and response hide footbox participant Producer as P -[-> P: send(request) +[-> P: request(request_message) create Source as S P --> S: bind ephemeral address participant Consumer as C -P -> C: SYN(Source)/MSG(request) -S <- C: FIN/MSG(response) +P -> C: SYN(Source)/MSG(request_message) +S <- C: FIN/MSG(reply_message) Destroy S -[<- S: reply(response) +[<- S: response(reply_message) ``` ```plantuml title General case hide footbox participant Producer as P -[-> P: begin +[-> P: stream = connect("consumer") create Source as S P --> S: src = bind ephemeral address participant Consumer as C -P -> C: SYN(Source) +P -> C: SYN(src) create Destination as D -C --> D: bind ephemeral address -S <- C: ACK(Destination) -S -> D: MSG -D -> S: MSG -S -> D: MSG +C --> D: dst = bind ephemeral address +S <- C: ACK(dst) +[-> S: stream.write(m1) +S -> D: MSG(m1) +D -> S: MSG(m2) +[<- S: handler.handle(m2) +[-> S: stream.write(m3) +S -> D: MSG(m3) +[-> S: stream.end() S -> D: FIN Destroy D -D -> S: MSG -D -> S: MSG +[<- S: handler.handle(m4) +D -> S: MSG(m4) +[<- S: handler.handle(m5) +D -> S: MSG(m5) D -> S: FIN Destroy S +[<- S: endHandler.handle(null) ``` ## Todo