Skip to content

Commit

Permalink
Improve InboundMessageQueue to execute handle resume on event-loop th…
Browse files Browse the repository at this point in the history
…read
  • Loading branch information
vietj committed Apr 10, 2024
1 parent b975d80 commit f6b17bd
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ private static class StreamImpl extends Stream implements HttpClientStream {
super(context, promise, id);

this.conn = conn;
this.queue = new InboundMessageQueue<>(context) {
this.queue = new InboundMessageQueue<>(conn.context.nettyEventLoop(), context) {
@Override
protected void handleResume() {
conn.doResume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void handleWsFrame(WebSocketFrame msg) {
w = webSocket;
}
if (w != null) {
w.context.execute(frame, w::handleFrame);
w.handleFrame(frame);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public class Http1xServerRequest extends HttpServerRequestInternal implements io
this.context = context;
this.request = request;
this.parked = parked;
this.queue = new InboundMessageQueue<>(context) {
this.queue = new InboundMessageQueue<>(context.nettyEventLoop(), context) {
@Override
protected void handle(Object elt) {
if (elt == InboundBuffer.END_SENTINEL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public abstract class WebSocketImplBase<S extends WebSocket> implements WebSocke
this.context = context;
this.maxWebSocketFrameSize = maxWebSocketFrameSize;
this.maxWebSocketMessageSize = maxWebSocketMessageSize;
this.pending = new InboundMessageQueue<>(context) {
this.pending = new InboundMessageQueue<>(context.nettyEventLoop(), context) {
@Override
protected void handleResume() {
conn.doResume();
Expand Down
17 changes: 11 additions & 6 deletions src/main/java/io/vertx/core/net/impl/InboundMessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
package io.vertx.core.net.impl;

import io.netty.channel.EventLoop;
import io.vertx.core.ThreadingModel;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.streams.impl.InboundReadQueue;
Expand All @@ -20,19 +21,21 @@
public class InboundMessageQueue<M> implements Predicate<M>, Runnable {

private final ContextInternal context;
private final EventLoop eventLoop;
private final InboundReadQueue<M> readQueue;
private final AtomicLong demand = new AtomicLong(Long.MAX_VALUE);
private boolean draining;

public InboundMessageQueue(ContextInternal context) {
public InboundMessageQueue(EventLoop eventLoop, ContextInternal context) {
InboundReadQueue.Factory readQueueFactory;
if (context.threadingModel() == ThreadingModel.EVENT_LOOP) {
if (context.threadingModel() == ThreadingModel.EVENT_LOOP && context.nettyEventLoop() == eventLoop) {
readQueueFactory = InboundReadQueue.SINGLE_THREADED;
} else {
readQueueFactory = InboundReadQueue.SPSC;
}
this.readQueue = readQueueFactory.create(this);
this.context = context;
this.eventLoop = eventLoop;
}

@Override
Expand Down Expand Up @@ -70,6 +73,7 @@ protected void handle(M msg) {
}

public boolean add(M msg) {
assert eventLoop.inEventLoop();
int res = readQueue.add(msg);
if ((res & InboundReadQueue.QUEUE_UNWRITABLE_MASK) != 0) {
handlePause();
Expand All @@ -90,8 +94,9 @@ public void run() {
}
draining = true;
try {
if ((readQueue.drain() & InboundReadQueue.QUEUE_WRITABLE_MASK) != 0) {
handleResume();
int drain = readQueue.drain();
if ((drain & InboundReadQueue.QUEUE_WRITABLE_MASK) != 0) {
eventLoop.execute(this::handleResume);
}
} finally {
draining = false;
Expand All @@ -116,7 +121,7 @@ public void demand(long value) {
}
demand.set(value);
if (value > 0L) {
drain();
context.executor().execute(this); // TODO TEST THIS
}
}

Expand All @@ -137,6 +142,6 @@ public void fetch(long amount) {
break;
}
}
drain();
context.executor().execute(this); // TODO TEST THIS
}
}
9 changes: 5 additions & 4 deletions src/main/java/io/vertx/core/net/impl/NetSocketImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public NetSocketImpl(ContextInternal context,
this.metrics = metrics;
this.messageHandler = new DataMessageHandler();
this.negotiatedApplicationLayerProtocol = negotiatedApplicationLayerProtocol;
this.pending = new InboundMessageQueue<>(context) {
this.pending = new InboundMessageQueue<>(context.nettyEventLoop(), context) {
@Override
protected void handleResume() {
NetSocketImpl.this.doResume();
Expand Down Expand Up @@ -184,7 +184,7 @@ private synchronized Handler<Object> messageHandler() {

@Override
public synchronized NetSocketInternal messageHandler(Handler<Object> handler) {
messageHandler = handler == null ? new DataMessageHandler() : handler;
messageHandler = handler == null ? new DataMessageHandler() : msg -> context.emit(msg, handler);
return this;
}

Expand Down Expand Up @@ -355,13 +355,14 @@ public Future<Void> end() {

@Override
protected void handleClosed() {
context.emit(InboundBuffer.END_SENTINEL, pending::write);
pending.write(InboundBuffer.END_SENTINEL);
super.handleClosed();
}

@Override
public void handleMessage(Object msg) {
context.emit(msg, messageHandler());
Handler<Object> handler = messageHandler();
handler.handle(msg);
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/io/vertx/core/http/HttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3336,6 +3336,7 @@ public void start(Promise<Void> startPromise) {
await();
}

@Repeat(times = 16)
@Test
public void testServerReadStreamInWorker() throws Exception {
int numReq = 16;
Expand All @@ -3348,7 +3349,7 @@ public void start(Promise<Void> startPromise) {
server.requestHandler(req -> {
req.end().onComplete(onSuccess(v -> req.response().end()));
req.pause();
vertx.setTimer(250, id -> {
vertx.setTimer(10, id -> {
req.resume();
});
}).listen(testAddress)
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/io/vertx/core/http/WebSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.vertx.core.net.*;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.test.core.CheckingSender;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.proxy.HAProxy;
Expand Down Expand Up @@ -2017,6 +2018,7 @@ public void testRaceConditionWithWebSocketClientEventLoop() {
testRaceConditionWithWebSocketClient(vertx.getOrCreateContext());
}

@Repeat(times = 100)
@Test
public void testRaceConditionWithWebSocketClientWorker() throws Exception {
CompletableFuture<Context> fut = new CompletableFuture<>();
Expand Down

0 comments on commit f6b17bd

Please sign in to comment.