A very simple disk-backed message queue implemented in Java.
We needed a simple message queue with the following characteristics:
- One or many message sources.
- At most one message sink.
- Sources and sinks connect to the message queue by TCP/IP socket and can operate completely asynchronously.
- Each message from a source is synchronously acked by the message queue when it has been safely persisted by the message queue.
- Each message received by a sink is acked by the sink, after which the message queue is free to discard it. The source does not need to know when (or even whether) a message has been delivered to a sink, but can assume that an acked message will be delivered to a sink, eventually.
- When it is restarted, it is acceptable for the message queue to resend messages to a sink which may have already been acked by a sink. That is, on restart, it is ok for the message queue to be not certain whether some messages persisted to disk from a previous message queue instantiation have been acked by a sink, and to send them again. If the message queue is not certain whether a message has been previously sent, it tells the sink that the message may be a "replay". It is up to the sink to decide what to do with possibly duplicate messages.
- Each message consists of 2 "fields": a message id of 16 bytes and a message content as a variable length byte array. The source can choose to generate the id or leave it to the message queue on a per-message basis. Messages provided by the source are "opaque" to the message queue: they are not examined, they do not need to be in any sequence or to be unique. If a message id is not provided by the source, it will be generated by the message queue; such ids are guaranteed to be unique amongst ids generated by the message queue, but could duplicate ids generated by a source.
- Messages must be delivered to the sink in the order they were received from the source(s). If there are multiple sources, the vagaries of thread scheduling make it very difficult to guarantee delivery order across messages from multiple sources, but guaranteeing that messages will be delivered in the order in which they were acked to the sources is good enough.
- A sink will never want to "skip" a message, that is, will never want to get a second message before acking the first message.
- The amount of memory used by the message queue and the level of replaying messages on restart should be roughly configurable.
- Performance is not a dominant design criterion. Rather, simplicity, ease of configuration, operation and use, and robustness are most important.
There are already dozens of message queues, so implementing another one must seem strange. However, it was reasonable fun, and the result is, at least, small and understandable, so hopefully bugs will be correctable!
Everything is packaged as projectComputing.MessageQueue. Here's what each source file is:
- MessageQueue.java: The message queue. It can run in its own JVM, started with its main(), or wrapped by any code as part of another JVM. It contains these classes:
- MessageQueueSourceListener - listens for source connections in its own thread
- MessageQueueSource - communicates with a source in its own thread
- MessageQueueSinkListener - listens for a sink connection in its own thread. Because there can only be one sink connected at a time, it does not start a new thread to communicate to the sink.
- MessageQueueFile - manages the files which comprise the on-disk message queue.
- Message - an internal representation of a message whilst it is on the in-memory message-queue.
A message queue consists of an in-memory queue and files on disk. Each message received is always appended to the currently-open MessageQueueFile. The in-memory message queue can be in one of two modes:
1. Containing messages re-read from disk. This happens once the in-memory queue has been filled to capacity: new messages will only be written to disk (not to memory) until the entire backlog of messages written to disk has been consumed by a sink.
2. Containing messages written to memory (as well as being written to disk) as they are received.
The names of the disk files containing persisted messages allow processing those files in the sequence in which they were generated.
Disk files are deleted when the last message they contain is acked by a sink.
The approximate memory to be used by the in-memory queue is configurable, as is the approximate size of each disk file. It is suggested that the disk file size should be much smaller than the memory-queue size: on restart and when processing a backlog of messages, entire files are read into memory, not just part of a file. Smaller disk files also reduce the number of messages that may be replayed to the sink following a restart of the message queue.
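The switch between the two modes can be hard to picture, so here is a toy model of it. This is only a sketch under stated assumptions, not the code in MessageQueue.java: the class name QueueModeSketch is hypothetical, capacities are counted in messages rather than bytes, in-memory lists stand in for disk files, and acking and file deletion are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the two in-memory queue modes (hypothetical, for illustration only).
class QueueModeSketch {
    private final int memoryCapacity;   // max messages accepted directly into memory
    private final int fileCapacity;     // max messages per simulated disk file
    private final Deque<String> memory = new ArrayDeque<>();
    // Simulated disk files holding messages that are NOT yet in memory, oldest first.
    private final Deque<List<String>> unreadFiles = new ArrayDeque<>();
    private boolean backlogMode = false; // true: memory is fed only by re-reading disk

    QueueModeSketch(int memoryCapacity, int fileCapacity) {
        this.memoryCapacity = memoryCapacity;
        this.fileCapacity = fileCapacity;
    }

    // Called for each message from a source (every message is assumed persisted).
    void receive(String message) {
        if (!backlogMode && memory.size() < memoryCapacity) {
            memory.add(message);         // mode 2: in memory as well as on disk
            return;
        }
        backlogMode = true;              // mode 1: disk only until the backlog drains
        List<String> current = unreadFiles.peekLast();
        if (current == null || current.size() >= fileCapacity) {
            current = new ArrayList<>(); // start a new simulated disk file
            unreadFiles.addLast(current);
        }
        current.add(message);
    }

    // Called when the sink wants the next message; returns null if none is queued.
    String deliver() {
        if (memory.isEmpty() && backlogMode) {
            List<String> oldest = unreadFiles.pollFirst();
            if (oldest != null) {
                memory.addAll(oldest);   // an entire file is read, never part of one
            } else {
                backlogMode = false;     // backlog fully consumed: back to mode 2
            }
        }
        return memory.poll();
    }

    boolean inBacklogMode() {
        return backlogMode;
    }

    public static void main(String[] args) {
        QueueModeSketch q = new QueueModeSketch(2, 2);
        for (int i = 1; i <= 5; i++) {
            q.receive("m" + i);
        }
        String m;
        while ((m = q.deliver()) != null) {
            System.out.println(m);       // messages come out in arrival order
        }
    }
}
```

Because whole files are loaded at once, the model also shows why a file capacity well below the memory capacity keeps memory use predictable.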
- MessageQueueWriter.java: the class that message queue sources should use to send messages to a message queue.
- MessageQueueReader.java: the class that message queue sinks should use to request messages from a message queue.
- ReceivedMessageQueueMessage.java: the class representing a message received by a MessageQueueReader from the message queue and returned to a message sink.
javac projectComputing/MessageQueue/MessageQueue.java
javac projectComputing/MessageQueue/MessageQueueWriter.java
javac projectComputing/MessageQueue/MessageQueueReader.java
javac projectComputing/MessageQueue/ReceivedMessageQueueMessage.java
There are 5 properties of the message queue that can be defined at runtime, either as Java system properties or by setting MessageQueue.java fields from your own message queue instantiation code:
- messageQueueSourcePort - the TCP/IP port used by message sources. Default: 6211
- messageQueueSinkPort - the TCP/IP port used by message sinks. Default: 6212
- maxMemoryQueueSize - very approximate max size in bytes of in-memory message queue. Default: 64000000 (ie, 64mb).
- diskFileSizeDivisor - number to divide into maxMemoryQueueSize to give approx max disk file size; eg, 4 makes disk file 1 quarter the size. Default: 4 (ie, 16mb).
- messageQueueDirectoryName - directory used for persisting messages. Default: "messageStore".
These properties can be set on the java command line (eg, -DmessageStore="/usr/local/messageQueueFiles"), or by the code which instantiates the message queue, before the message queue begins operation:
MessageQueue mq = new MessageQueue() ;
mq.messageStore = "/usr/local/messageQueueFiles" ;
mq.maxMemoryQueueSize = 10000000 ;
mq.begin() ;
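To make the relationship between maxMemoryQueueSize and diskFileSizeDivisor concrete, the sketch below resolves integer settings from -D system properties with the documented defaults and derives the approximate disk file size. It is an illustration only: the class QueueSettingsSketch and its methods are hypothetical, and MessageQueue.java may read its properties differently.

```java
// Hypothetical helper showing how the documented defaults combine.
class QueueSettingsSketch {

    // Returns the integer value of a -D system property, or the default if unset or unparseable.
    static int intSetting(String name, int defaultValue) {
        return Integer.getInteger(name, defaultValue);
    }

    // Approximate max disk file size: the memory budget divided by the divisor.
    static int approxDiskFileSize(int maxMemoryQueueSize, int diskFileSizeDivisor) {
        return maxMemoryQueueSize / diskFileSizeDivisor;
    }

    public static void main(String[] args) {
        int maxMemoryQueueSize = intSetting("maxMemoryQueueSize", 64000000);
        int diskFileSizeDivisor = intSetting("diskFileSizeDivisor", 4);
        System.out.println("Approx disk file size in bytes: "
                + approxDiskFileSize(maxMemoryQueueSize, diskFileSizeDivisor));
    }
}
```

With the defaults, 64000000 divided by 4 gives 16000000 bytes per disk file, matching the 16mb noted in the property list.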
To run the message queue as a stand-alone JVM with default properties:
java -classpath . projectComputing.MessageQueue.MessageQueue
Don't forget to set the java heap size appropriately!
Create an instance of projectComputing.MessageQueue.MessageQueueWriter by passing it the IP address or DNS name and the "source" TCP/IP port which has been bound to a message queue. Then invoke write() for each message, passing an optional message id and the message contents. The write() method will return when the message has been acked by the server, or will throw an exception if the message cannot be delivered.
Here's some sample code:
import projectComputing.MessageQueue.MessageQueueWriter ;
...
MessageQueueWriter mqWriter = new MessageQueueWriter(serverAddr, serverSourcePort) ;
byte[] contents ;
byte[] id = new byte[16] ; // only if generating message ids
while (moreToSend) {
    // set the byte[] contents somehow...
    // if generating and sending your own 16 byte message ids
    // set the byte[16] id somehow, then:
    mqWriter.write(id, contents) ;
    // else, if not generating your own message ids:
    mqWriter.write(contents) ;
}
mqWriter.close() ; // release the connection
See the sample "message queue source" program, projectComputing/MessageQueue/DemoSource.java for a simple but complete, working example.
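If you do want to supply your own ids, one convenient source of 16 bytes is a random java.util.UUID, which is exactly 128 bits wide. The sketch below is one possible approach, not something the message queue requires; the class name MessageIdSketch is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

// Hypothetical helper that turns a UUID into a 16-byte message id.
class MessageIdSketch {

    // Packs the two 64-bit halves of the UUID into a 16-byte array, most significant half first.
    static byte[] toBytes(UUID uuid) {
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }

    // Generates a fresh random 16-byte id.
    static byte[] newId() {
        return toBytes(UUID.randomUUID());
    }

    public static void main(String[] args) {
        System.out.println("Generated id length: " + newId().length);
    }
}
```

The resulting array could then be passed as the id argument of mqWriter.write(id, contents).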
Create an instance of projectComputing.MessageQueue.MessageQueueReader by passing it the IP address or DNS name and the "sink" TCP/IP port which has been bound to a message queue. Then invoke read(), which will return when a message has been received from the server ("acking" its receipt), or will throw an exception if the message cannot be delivered.
Note, the MessageQueueReader code always acks receipt of the message (hence telling the message queue that it has been received and can be discarded from the queue) before it is returned to your code. If this is not appropriate (for example, your code may crash whilst processing the message), then you should reimplement MessageQueueReader to move the acking into a separate method. Also note, however, that it is not possible to "skip" a message without acking it!
Here's some sample code:
import projectComputing.MessageQueue.MessageQueueReader ;
import projectComputing.MessageQueue.ReceivedMessageQueueMessage ;
MessageQueueReader mqReader = new MessageQueueReader(serverAddr, serverSinkPort) ;
while (moreToRead) { // moreToRead is a placeholder for your own stop condition
    ReceivedMessageQueueMessage message = mqReader.read() ;
    System.out.println("Received message id: " + message.idAsString() + ", contents: " + message.contentsAsString() +
        ", possibly replayed: " + message.possiblyReplayed) ;
    // to get id as 16-byte array, use message.messageId; to get contents as byte array, use message.contents
}
mqReader.close() ; // release the connection
Note, only one sink can connect to the message queue at a time. The instantiation of the MessageQueueReader will hang if another sink is connected to the nominated message queue sink end-point.
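Since the message queue may replay messages after a restart, a sink that cannot tolerate duplicates needs its own check. The sketch below shows one simple option, a bounded in-memory set of recently seen ids. It rests on assumptions: the class RecentIdFilter is hypothetical, it only helps if your message ids are unique per message (which source-supplied ids need not be), and it does not survive a restart of the sink itself.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sink-side filter remembering the most recently seen message ids.
class RecentIdFilter {
    private final Map<ByteBuffer, Boolean> seen;

    RecentIdFilter(final int maxRemembered) {
        // Insertion-ordered map that drops its oldest entry once the limit is exceeded.
        this.seen = new LinkedHashMap<ByteBuffer, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<ByteBuffer, Boolean> eldest) {
                return size() > maxRemembered;
            }
        };
    }

    // Returns true the first time an id is offered, false if it is still remembered.
    boolean firstSighting(byte[] messageId) {
        // ByteBuffer compares by content, so equal byte arrays map to the same key.
        ByteBuffer key = ByteBuffer.wrap(messageId.clone());
        return seen.put(key, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        RecentIdFilter filter = new RecentIdFilter(1000);
        byte[] id = new byte[16];
        System.out.println(filter.firstSighting(id)); // first time this id is seen
        System.out.println(filter.firstSighting(id)); // same id offered again
    }
}
```

In the reader loop you would call firstSighting(message.messageId) for every message, not just those flagged as possibly replayed, and skip processing whenever it returns false.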
John Evershed and Kent Fitch, www.projectcomputing.com
Copyright (C) 2012 John Evershed and Kent Fitch
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.