Provide a better event-loop to context message transfer queue #5180
Conversation
force-pushed from 9cbef84 to 5e02d8a
@franz1981 @jponge mostly ready for review
force-pushed from 1d4d9e1 to e91c179
force-pushed from 5f9788d to 42d189a
This is now ready for review @tsegismont @franz1981 @jponge @cescoffier and anyone else who wants to review this
(still on my todo)
@jponge ok
force-pushed from 949fa93 to c7ff6aa
 * @param amount the number of messages to consume
 */
public final void fetch(long amount) {
  if (amount < 0L) {
You might want to handle 0 as well and reject it.
(or accept it in the other branch, but in Reactive Streams land 0 is not accepted)
I think it's fine to tolerate 0 as a no-op; any reactive-streams implementation that uses it will never pass 0
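The consensus above (reject negative amounts, tolerate 0 as a no-op) can be sketched together with a pause/resume/fetch demand counter in the spirit of the ReadStream API this PR deals with. This is a hypothetical illustration, not the actual Vert.x code; the class and method names are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch (not the actual Vert.x implementation) of a demand
// counter: negative amounts are rejected, 0 is tolerated as a no-op, and
// accumulated demand saturates at Long.MAX_VALUE (the Reactive Streams
// convention for unbounded demand).
public class DemandSketch<M> {
  private final Queue<M> pending = new ArrayDeque<>();
  private final Consumer<M> handler;
  private long demand = Long.MAX_VALUE;

  public DemandSketch(Consumer<M> handler) {
    this.handler = handler;
  }

  public void fetch(long amount) {
    if (amount < 0L) {
      throw new IllegalArgumentException("Invalid amount: " + amount);
    }
    if (amount == 0L) {
      return; // tolerated as a no-op: reactive-streams callers never pass 0
    }
    demand += amount;
    if (demand < 0L) {
      demand = Long.MAX_VALUE; // overflow clamps to unbounded
    }
    deliver();
  }

  public void pause() {
    demand = 0L;
  }

  public void resume() {
    demand = Long.MAX_VALUE;
    deliver();
  }

  public void write(M msg) {
    pending.add(msg);
    deliver();
  }

  // Delivers buffered messages while demand is positive.
  private void deliver() {
    while (demand > 0L && !pending.isEmpty()) {
      if (demand != Long.MAX_VALUE) {
        demand--;
      }
      handler.accept(pending.poll());
    }
  }
}
```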
The Vert.x event-loop thread to context thread message transfer relies on the InboundBuffer implementation. InboundBuffer performs the add/dispatch in the same method call, assuming that the same thread actually handles the dispatch of the message, forcing the dispatch to then schedule the message delivery on the context thread.

Inbound read queue
The InboundReadQueue design splits this operation in two separate operations, providing control to the caller of the message dispatch:
- the add operation queues a message and lets the producer know whether messages should be drained from the queue (e.g. if a drain operation is already in progress, then there is no need to schedule another drain)
- the drain operation lets the consumer consume messages from the queue as needed

The event-loop to context message dispatch then depends on the use case.
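As an illustration of the add/drain split, here is a minimal single-threaded sketch; the class name and signatures are assumptions, not the actual InboundReadQueue API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Minimal single-threaded sketch of the add/drain split: add tells the
// producer whether it must schedule a drain; drain delivers queued messages
// to the consumer. Names are illustrative only.
public class ReadQueueSketch<M> {
  private final Queue<M> pending = new ArrayDeque<>();
  private boolean draining;

  // Queues a message; returns true when the caller must schedule a drain,
  // false when a drain is already in progress.
  public boolean add(M msg) {
    pending.add(msg);
    if (draining) {
      return false;
    }
    draining = true;
    return true;
  }

  // Lets the consumer consume the queued messages, then clears the flag so
  // the next add schedules a new drain.
  public void drain(Consumer<M> consumer) {
    M msg;
    while ((msg = pending.poll()) != null) {
      consumer.accept(msg);
    }
    draining = false;
  }
}
```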
Event-loop thread dispatch

In this use case, the InboundReadQueue assumes the same thread is producing and consuming messages, and therefore no memory visibility is actually required. In practice, the event-loop adds to the queue and then the drain delivers the message to the connection.

Generic context thread dispatch
In this use case, the queue will be drained by the context thread, and an SPSC queue plus a volatile WIP (work-in-progress) counter is used. This behaviour can also optimise the message delivery, since we no longer need to schedule a task for every message received: the event-loop thread and the context thread can work in an SPSC producer/consumer design.
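The WIP scheme can be sketched as follows; a JDK concurrent queue stands in for a specialized SPSC queue, and Executor stands in for the Vert.x context. Names and structure are illustrative assumptions, not the actual implementation.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Sketch of the "volatile WIP" scheme: the producer schedules a drain task
// only when the WIP counter goes 0 -> 1; while a drain is running, further
// adds just bump the counter, and the drain loop keeps going until the
// counter drops back to 0, so no wakeup is ever lost.
public class WipQueueSketch<M> {
  private final Queue<M> queue = new ConcurrentLinkedQueue<>();
  private final AtomicInteger wip = new AtomicInteger();

  public void add(M msg, Executor context, Consumer<M> consumer) {
    queue.add(msg);
    if (wip.getAndIncrement() == 0) {
      context.execute(() -> drain(consumer)); // first add schedules the drain
    }
  }

  private void drain(Consumer<M> consumer) {
    int missed = 1;
    do {
      M msg;
      while ((msg = queue.poll()) != null) {
        consumer.accept(msg);
      }
      missed = wip.addAndGet(-missed); // loop again if adds raced with us
    } while (missed != 0);
  }
}
```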
This use case holds for:
Back pressure

The context thread deals with the message back-pressure and cannot control the inbound channel back-pressure without races. Like the OutboundWriteQueue, the InboundReadQueue relies on an internal buffer, and the queue acts as an intermediary for back-pressure:
- the add operation signals when the queue becomes un-writable (e.g. so the producer can turn off Netty auto-read)

Inbound message queue
The InboundMessageQueue is a construct integrating the InboundReadQueue with the Vert.x context and a demand counter (ReadStream). The consumer deals with pause/resume/fetch to control demand; the producer implements the message flow pause/resume. This queue is the InboundBuffer replacement.

ConnectionBase message flow changes
doPause/doResume of ConnectionBase have been rewritten to be strict concerning the delivery of messages. Previously these operations were turning channel auto-read on/off; however, this did not control the reads already in progress. This change makes these operations control a paused flag and buffer, while paused, any messages to be handled by the connection base.

This change has been made to let the Http1xServerConnection control the message flow precisely when processing the content of an HTTP pipelined request. Previously the content was poured into the pending queue of the pipelined request, which is complicated to deliver when the pipelined request is processed and requires some hacks with respect to the request demand. After this change, Http1xServerConnection can immediately pause the connection after receiving a pipelined request, with the guarantee that no message will be processed until it is resumed. When the pipelined request is processed, the connection is resumed and delivers the messages with the regular handleMessage flow.

Performance
Plaintext
Microbenchmark
- InboundMessageQueue (synchronised long)
- InboundMessageQueue (volatile long)
- InboundBuffer (master)
- InboundBuffer (4.x)
Integration with Vert.x Core
This integrates with
Past attempts to solve this: