Skip to content

Commit

Permalink
Merge pull request #35 from NOAA-GSL/idsse-450
Browse files Browse the repository at this point in the history
Added TTL and max length to _setup_queue for any private confirm queu…
  • Loading branch information
paulhamer-noaa authored Nov 21, 2023
2 parents 0a1ba1a + a3f33bc commit 5f7cc96
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
13 changes: 9 additions & 4 deletions python/idsse_common/idsse/common/publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class PublishConfirmParams(NamedTuple):
queue: Queue


class PublishConfirm():
class PublishConfirm:
"""This is a publisher that will handle unexpected interactions
with RabbitMQ such as channel and connection closures for any process.
If RabbitMQ closes the connection, it will reopen it. You should
Expand Down Expand Up @@ -142,7 +142,7 @@ def _start(self, callback: Optional[Callable[[], None]] = None):
Args:
callback (Optional[Callable[[], None]]): callback function to be invoked
once instance is ready to publish messages (all RabbitMQ connection and channel
are setup, delivery confirmation is enabled, etc.). Default to None.
are set up, delivery confirmation is enabled, etc.). Default to None.
"""
logger.debug('Starting thread with callback')
if callback is not None:
Expand Down Expand Up @@ -199,7 +199,7 @@ def _wait_for_channel_to_be_ready(self) -> None:
"""If connection or channel are not open, start the PublishConfirm to do needed
RabbitMQ setup. This method will not return until channel is confirmed ready for use"""

# validate that PublishConfirm thread has been setup and connected to RabbitMQ
# validate that PublishConfirm thread has been set up and connected to RabbitMQ
if not (self._connection and self._connection.is_open
and self._channel and self._channel.is_open):
logger.debug('Channel is not ready to publish, calling _start() now')
Expand Down Expand Up @@ -317,8 +317,13 @@ def _setup_queue(self, queue: Queue):
:param str|unicode queue_name: The name of the queue to declare.
"""
logger.debug('Declaring queue %s', queue.name)
args = {} # If we have a 'private' queue, i.e. one that is not consumed but used to support message publishing
if queue.name.startswith('_'):
# Set message time-to-live (TTL) to 10 seconds
args = {'x-message-ttl': 10000}
self._channel.queue_declare(queue=queue.name,
durable=queue.durable,
arguments=args,
exclusive=queue.exclusive,
auto_delete=queue.auto_delete,
callback=self._on_queue_declareok)
Expand Down Expand Up @@ -390,7 +395,7 @@ def _on_delivery_confirmation(self, method_frame: Method):
ack_multiple = method.multiple
delivery_tag = method.delivery_tag

logger.debug('Received %s for delivery tag: %i (multiple: %s)',
logger.info('Received %s for delivery tag: %i (multiple: %s)',
confirmation_type, delivery_tag, ack_multiple)

if confirmation_type == 'ack':
Expand Down
2 changes: 1 addition & 1 deletion python/idsse_common/test/test_publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def exchange_declare(self, exchange, exchange_type, callback: Callable[[Any, str

# pylint: disable=too-many-arguments
def queue_declare(
self, queue, durable, exclusive, auto_delete, callback: Callable[[Any], None]
self, queue, durable, arguments, exclusive, auto_delete, callback: Callable[[Any], None]
):
callback(None)

Expand Down

0 comments on commit 5f7cc96

Please sign in to comment.