From 84c2fb181b16b74fbc587dba786b55214cb7743f Mon Sep 17 00:00:00 2001 From: Mackenzie Grimes - NOAA Affiliate <136493179+mackenzie-grimes-noaa@users.noreply.github.com> Date: Fri, 3 Nov 2023 14:22:25 -0600 Subject: [PATCH] bug: idsse-429: PublishConfirm wait for internal setup (#32) * add threading.Event to block while rmq setup completes * Update python/idsse_common/idsse/common/publish_confirm.py Co-authored-by: Geary-Layne <77741618+Geary-Layne@users.noreply.github.com> * publishconfirm: make Thread, all non-essential methods internal * remove callback from PublishConfirm.start(), hide in internal method --------- Co-authored-by: Geary-Layne <77741618+Geary-Layne@users.noreply.github.com> --- .../idsse/common/publish_confirm.py | 151 +++++++++++------- .../idsse/common/rabbitmq_utils.py | 6 + .../idsse_common/test/test_publish_confirm.py | 56 ++++--- 3 files changed, 130 insertions(+), 83 deletions(-) diff --git a/python/idsse_common/idsse/common/publish_confirm.py b/python/idsse_common/idsse/common/publish_confirm.py index 4b313057..bf586957 100644 --- a/python/idsse_common/idsse/common/publish_confirm.py +++ b/python/idsse_common/idsse/common/publish_confirm.py @@ -8,6 +8,7 @@ # Contributors: # Geary Layne (1) # Paul Hamer (2) +# Mackenzie Grimes (2) # ---------------------------------------------------------------------------------- # pylint: disable=C0111,C0103,R0205 @@ -17,10 +18,11 @@ import json import time from dataclasses import dataclass, field -from threading import Thread +from random import randint +from threading import Thread, Event from typing import Optional, Dict, NamedTuple, Union, Callable, cast -from pika import SelectConnection, URLParameters, BasicProperties +from pika import SelectConnection, ConnectionParameters, PlainCredentials, BasicProperties from pika.channel import Channel from pika.frame import Method from pika.spec import Basic @@ -47,13 +49,14 @@ class PublishConfirmRecords: message_number: int = 0 -class RabbitMqParams(NamedTuple): - """Data class to hold rabbitmq configurations""" +class PublishConfirmParams(NamedTuple): + """Data class to hold RabbitMQ configurations for PublishConfirm""" + conn: Conn exchange: Exch queue: Queue -class PublishConfirm(Thread): +class PublishConfirm(): """This is a publisher that will handle unexpected interactions with RabbitMQ such as channel and connection closures for any process. If RabbitMQ closes the connection, it will reopen it. You should @@ -68,97 +71,83 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue): :param Exch exchange: The RabbitMQ exchange details :param Queue queue: The RabbitMQ queue details """ - super().__init__(daemon=True) + self._thread = Thread(name=f'{__name__}-{randint(0,9)}', + daemon=True, + target=self._run) self._connection: Optional[SelectConnection] = None self._channel: Optional[Channel] = None - self._records = PublishConfirmRecords() - self._stopping = False - self._url = (f'amqp://{conn.username}:{conn.password}@{conn.host}' - f':{str(conn.port)}/%2F?connection_attempts=3&heartbeat=3600') - self._rmq_params = RabbitMqParams(exchange=exchange, queue=queue) + self._rmq_params = PublishConfirmParams(conn, exchange, queue) + self._records = PublishConfirmRecords() # data class to track message activity self._on_ready_callback: Optional[Callable[[], None]] = None - def connect(self): - """This method connects to RabbitMQ, returning the connection handle. - When the connection is established, the on_connection_open method - will be invoked by pika. - :rtype: pika.SelectConnection - """ - logger.info('Connecting to %s', self._url) - return SelectConnection( - URLParameters(self._url), - on_open_callback=self._on_connection_open, - on_open_error_callback=self._on_connection_open_error, - on_close_callback=self._on_connection_closed) + set_corr_id_context_var('PublishConfirm') - def publish_message(self, message, key=None) -> bool: + def publish_message(self, + message: Dict, + routing_key = '', + corr_id: Optional[str] = None) -> bool: """If the class is not stopping, publish a message to RabbitMQ, appending a list of deliveries with the message number that was sent. This list will be used to check for delivery confirmations in the on_delivery_confirmations method. + Args: + message (Dict): message to publish (should be valid json) + routing_key (str): routing_key to route the message to correct consumer. + Default is empty str + corr_id (Optional[str]): optional correlation_id to include in message + Returns: bool: True if message successfully published to queue (channel was open and publish did not throw exception) Raises: RuntimeError: if channel is uninitialized (start() not completed yet) or is closed """ - if not (self._channel and self._channel.is_open): - # tried to publish message before start() was called (or while it was in progress) - channel_state = 'closed' if self._channel and self._channel.is_closed else 'None' - raise RuntimeError(f'Cannot publish messages yet; RabbitMQ channel is {channel_state}') + self._wait_for_channel_to_be_ready() # We expect a JSON message format, do a check here... try: properties = BasicProperties(content_type='application/json', - content_encoding='utf-8') - self._channel.basic_publish(self._rmq_params.exchange.name, key, + content_encoding='utf-8', + correlation_id=corr_id) + self._channel.basic_publish(self._rmq_params.exchange.name, routing_key, json.dumps(message, ensure_ascii=True), properties) self._records.message_number += 1 self._records.deliveries[self._records.message_number] = message - logger.debug('Published message # %i', self._records.message_number) + logger.debug('Published message # %i to exchange %s, queue %s, routing_key %s', + self._records.message_number, self._rmq_params.exchange.name, + self._rmq_params.queue.name, routing_key) return True except Exception as e: # pylint: disable=broad-exception-caught logger.error('Publish message problem : %s', str(e)) return False - def run(self): - """Run the thread, i.e. get connection etc... + def start(self): + """Start thread to connect to RabbitMQ queue and prepare to publish messages, invoking + callback when setup complete. """ - set_corr_id_context_var('PublishConfirm') - - self._connection = self.connect() - self._connection.ioloop.start() - - while not self._stopping: - time.sleep(5) - - if self._connection is not None and not self._connection.is_closed: - # Finish closing - self._connection.ioloop.start() + logger.debug('Starting thread') + self._start() - def start(self, callback: Optional[Callable[[], None]] = None): - """Start thread to connect RabbitMQ queue and prepare to publish messages. Must be called - before publish_message(). + def _start(self, callback: Optional[Callable[[], None]] = None): + """ + Start a thread to handle PublishConfirm operations Args: callback (Optional[Callable[[], None]]): callback function to be invoked once instance is ready to publish messages (all RabbitMQ connection and channel - are setup, delivery confirmation is enabled, etc.). Defaults to None + are setup, delivery confirmation is enabled, etc.). Default to None. """ - logger.debug('Starting thread') - self._on_ready_callback = callback # to be invoked after all pika setup is done - super().start() - - # callback not passed, so sleep momentarily to ensure all RabbitMQ callbacks can complete - if callback is None: - time.sleep(.2) + logger.debug('Starting thread with callback') + if callback is not None: + self._on_ready_callback = callback # to be invoked after all pika setup is done + self._thread.start() def stop(self): """Stop the example by closing the channel and connection. We @@ -174,6 +163,54 @@ def stop(self): self._close_connection() self._stopping = False # done stopping + def _run(self): + """Run a new thread: get a new RMQ connection, and start looping until stop() is called""" + self._connection = self._create_connection() + self._connection.ioloop.start() + time.sleep(0.2) + + while not self._stopping: + time.sleep(5) + + if self._connection is not None and not self._connection.is_closed: + # Finish closing + self._connection.ioloop.start() + + def _create_connection(self): + """This method connects to RabbitMQ, returning the connection handle. + When the connection is established, the on_connection_open method + will be invoked by pika. + :rtype: pika.SelectConnection + """ + conn = self._rmq_params.conn + logger.info('Connecting to RabbitMQ: %s', conn) + return SelectConnection( + parameters=ConnectionParameters( + host=conn.host, + virtual_host=conn.v_host, + port=conn.port, + credentials=PlainCredentials(conn.username, conn.password) + ), + on_open_callback=self._on_connection_open, + on_open_error_callback=self._on_connection_open_error, + on_close_callback=self._on_connection_closed) + + def _wait_for_channel_to_be_ready(self) -> None: + """If connection or channel are not open, start the PublishConfirm to do needed + RabbitMQ setup. This method will not return until channel is confirmed ready for use""" + + # validate that PublishConfirm thread has been setup and connected to RabbitMQ + if not (self._connection and self._connection.is_open + and self._channel and self._channel.is_open): + logger.debug('Channel is not ready to publish, calling _start() now') + + # pass callback to flip is_ready flag, and block until flag changes + is_ready = Event() + self._start(callback=is_ready.set) + is_ready.wait() + + logger.debug('Connection and channel setup complete, ready to publish message') + def _on_connection_open(self, _unused_connection): """This method is called by pika once the connection to RabbitMQ has been established. It passes the handle to the connection object in @@ -307,9 +344,9 @@ def _on_bindok(self, _unused_frame): response from RabbitMQ. Since we know we're now setup and bound, it's time to start publishing.""" logger.debug('Queue bound') - self.start_publishing() + self._start_publishing() - def start_publishing(self): + def _start_publishing(self): """This method will enable delivery confirmations and schedule the first message to be sent to RabbitMQ """ diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index 971ec88c..04e177db 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -34,3 +34,9 @@ class Queue(NamedTuple): durable: bool exclusive: bool auto_delete: bool + + +class RabbitMqParams(NamedTuple): + """Data class to hold configurations for RabbitMQ exchange/queue pair""" + exchange: Exch + queue: Queue diff --git a/python/idsse_common/test/test_publish_confirm.py b/python/idsse_common/test/test_publish_confirm.py index 1dc2e7e3..c27e15ef 100644 --- a/python/idsse_common/test/test_publish_confirm.py +++ b/python/idsse_common/test/test_publish_confirm.py @@ -16,7 +16,7 @@ from typing import Callable, Union, Any, NamedTuple from unittest.mock import Mock -from pytest import fixture, raises, MonkeyPatch +from pytest import fixture, MonkeyPatch from pika.spec import Basic from idsse.common.publish_confirm import PublishConfirm @@ -170,34 +170,26 @@ def mock_confirm_delivery(self: context.Channel, callback: Callable[[Method], No assert publish_confirm._records.acked == 0 -def test_publish_message_success(publish_confirm: PublishConfirm): - message_data = {'data': 123} +def test_publish_message_success_without_calling_start(monkeypatch: MonkeyPatch, context: MockPika): + monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) + pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) + example_message = {'data': [123]} - publish_confirm.start() - result = publish_confirm.publish_message(message_data) + assert pub_conf._connection is None and pub_conf._channel is None + success = pub_conf.publish_message(example_message) - assert result - assert publish_confirm._records.message_number == 1 - assert publish_confirm._records.deliveries[1] == message_data + # connection & channel should have been initialized internally, so publish should have worked + assert success + assert pub_conf._channel is not None and pub_conf._channel.is_open + assert pub_conf._records.message_number == 1 + assert pub_conf._records.deliveries[1] == example_message -def test_publish_message_exception_when_channel_not_open(publish_confirm: PublishConfirm): +def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm, context: MockPika): message_data = {'data': 123} - # missing a publish_confirm.start(), should fail - with raises(RuntimeError) as pytest_error: - publish_confirm.publish_message(message_data) - - assert pytest_error is not None - assert 'RabbitMQ channel is None' in str(pytest_error.value) - assert publish_confirm._records.message_number == 0 # should not have logged message sent - assert len(publish_confirm._records.deliveries) == 0 - - -def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm): - message_data = {'data': 123} - - publish_confirm.start() + publish_confirm._connection = context.SelectConnection(None, Mock(), Mock(), Mock()) + publish_confirm._channel = context.Channel() publish_confirm._channel.basic_publish = Mock(side_effect=RuntimeError('ACCESS_REFUSED')) success = publish_confirm.publish_message(message_data) @@ -208,7 +200,10 @@ def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm): def test_on_channel_closed(publish_confirm: PublishConfirm, context: MockPika): - publish_confirm.start() + publish_confirm._connection = context.SelectConnection(None, Mock(), Mock(), Mock()) + publish_confirm._channel = context.Channel() + publish_confirm._channel.close() + publish_confirm._on_channel_closed(context.Channel(), 'ChannelClosedByClient') assert publish_confirm._channel is None assert publish_confirm._connection.is_closed @@ -223,9 +218,9 @@ def test_callback(): assert success assert publish_confirm._channel is None - publish_confirm.start(callback=test_callback) + publish_confirm._start(test_callback) - sleep(.1) # ensure that callback has time to run and send its message + sleep(.1) # ensure that our test's callback has time to run and send its message assert publish_confirm._records.message_number == 1 assert publish_confirm._records.deliveries[1] == example_message @@ -247,3 +242,12 @@ def mock_sleep_function(secs: float): # between this test's thread and the PublishConfirm thread. Both results are acceptable sleep_call_args = [call.args for call in mock_sleep.call_args_list] assert set(sleep_call_args) in [set([(0.2,)]), set([(0.2,), (5,)])] + + +def test_wait_for_channel_returns_when_ready(monkeypatch: MonkeyPatch, context: MockPika): + monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) + pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) + + assert pub_conf._channel is None + pub_conf._wait_for_channel_to_be_ready() + assert pub_conf._channel is not None and pub_conf._channel.is_open