diff --git a/python/idsse_common/idsse/common/grid_proj.py b/python/idsse_common/idsse/common/grid_proj.py index 282d98d8..d62d6dff 100644 --- a/python/idsse_common/idsse/common/grid_proj.py +++ b/python/idsse_common/idsse/common/grid_proj.py @@ -2,12 +2,12 @@ # ---------------------------------------------------------------------------------- # Created on Mon Jul 31 2023 # -# Copyright (c) 2023 Colorado State University. All rights reserved. (1) -# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (2) +# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1) +# Copyright (c) 2023 Colorado State University. All rights reserved. (2) # # Contributors: -# Mackenzie Grimes (1) -# Geary Layne (2) +# Mackenzie Grimes (2) +# Geary Layne (1) # # ---------------------------------------------------------------------------------- # pylint: disable=invalid-name diff --git a/python/idsse_common/idsse/common/publish_confirm.py b/python/idsse_common/idsse/common/publish_confirm.py index f400e9ef..f1b3e38f 100644 --- a/python/idsse_common/idsse/common/publish_confirm.py +++ b/python/idsse_common/idsse/common/publish_confirm.py @@ -2,7 +2,7 @@ # ---------------------------------------------------------------------------------- # Created on Fri Jun 23 2023. # -# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1) +# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1) # Copyright (c) 2023 Colorado State University. All rights reserved. (2) # # Contributors: @@ -18,14 +18,16 @@ import time from dataclasses import dataclass, field from threading import Thread -from typing import Optional, Dict +from typing import Optional, Dict, NamedTuple, Union, cast -import pika -from pika import SelectConnection -from pika.exchange_type import ExchangeType +from pika import SelectConnection, URLParameters, BasicProperties from pika.channel import Channel +from pika.frame import Method +from pika.spec import Basic + + from idsse.common.rabbitmq_utils import Conn, Exch, Queue -from idsse.common.log_util import get_default_log_config, set_corr_id_context_var +from idsse.common.log_util import set_corr_id_context_var logger = logging.getLogger(__name__) @@ -46,6 +48,12 @@ class PublishConfirmRecords: message_number: int = 0 +class RabbitMqParams(NamedTuple): + """Data class to hold rabbitmq configurations""" + exchange: Exch + queue: Queue + + class PublishConfirm(Thread): """This is a publisher that will handle unexpected interactions with RabbitMQ such as channel and connection closures for any process. @@ -71,8 +79,7 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue): self._stopping = False self._url = (f'amqp://{conn.username}:{conn.password}@{conn.host}' f':{str(conn.port)}/%2F?connection_attempts=3&heartbeat=3600') - self._exchange = exchange - self._queue = queue + self._rmq_params = RabbitMqParams(exchange=exchange, queue=queue) def connect(self): """This method connects to RabbitMQ, returning the connection handle. @@ -81,8 +88,8 @@ def connect(self): :rtype: pika.SelectConnection """ logger.info('Connecting to %s', self._url) - return pika.SelectConnection( - pika.URLParameters(self._url), + return SelectConnection( + URLParameters(self._url), on_open_callback=self._on_connection_open, on_open_error_callback=self._on_connection_open_error, on_close_callback=self._on_connection_closed) @@ -95,28 +102,30 @@ def publish_message(self, message, key=None) -> bool: Returns: bool: True if message successfully published to queue (channel was open and - publish did not throw - exception) + publish did not throw exception) + Raises: + RuntimeError: if channel is uninitialized (start() not completed yet) or is closed """ - success = False - if self._channel and self._channel.is_open: - - # We expect a JSON message format, do a check here... - try: - properties = pika.BasicProperties(content_type='application/json', - content_encoding='utf-8') - - self._channel.basic_publish(self._exchange.name, key, - json.dumps(message, ensure_ascii=True), - properties) - self._records.message_number += 1 - self._records.deliveries[self._records.message_number] = message - logger.debug('Published message # %i', self._records.message_number) - success = True - - except Exception as e: # pylint: disable=broad-exception-caught - logger.error('Publish message problem : %s', str(e)) - return success + if not (self._channel and self._channel.is_open): + # tried to publish message before start() was called (or while it was in progress) + channel_state = 'closed' if self._channel and self._channel.is_closed else 'None' + raise RuntimeError(f'Cannot publish messages yet; RabbitMQ channel is {channel_state}') + + # We expect a JSON message format, do a check here... + try: + properties = BasicProperties(content_type='application/json', + content_encoding='utf-8') + self._channel.basic_publish(self._rmq_params.exchange.name, key, + json.dumps(message, ensure_ascii=True), + properties) + self._records.message_number += 1 + self._records.deliveries[self._records.message_number] = message + logger.debug('Published message # %i', self._records.message_number) + return True + + except Exception as e: # pylint: disable=broad-exception-caught + logger.error('Publish message problem : %s', str(e)) + return False def run(self): """Run the thread, i.e. get connection etc... @@ -133,6 +142,13 @@ def run(self): # Finish closing self._connection.ioloop.start() + def start(self): + """Start thread to connect RabbitMQ queue and prepare to publish messages. Must be called + before publish_message(). + """ + logger.debug('Starting thread') + super().start() + def stop(self): """Stop the example by closing the channel and connection. We set a flag here so that we stop scheduling new messages to be @@ -145,6 +161,7 @@ def stop(self): self._stopping = True self._close_channel() self._close_connection() + self._stopping = False # done stopping def _on_connection_open(self, _unused_connection): """This method is called by pika once the connection to RabbitMQ has @@ -198,7 +215,7 @@ def _on_channel_open(self, channel: Channel): logger.debug('Channel opened') self._channel = channel self._add_on_channel_close_callback() - self._setup_exchange(self._exchange) + self._setup_exchange(self._rmq_params.exchange) def _add_on_channel_close_callback(self): """This method tells pika to call the on_channel_closed method if @@ -219,7 +236,7 @@ def _on_channel_closed(self, channel, reason): logger.warning('Channel %i was closed: %s', channel, reason) self._channel = None if not self._stopping: - self._connection.close() + self._close_connection() def _setup_exchange(self, exchange: Exch): """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC @@ -233,8 +250,8 @@ def _setup_exchange(self, exchange: Exch): cb = functools.partial(self._on_exchange_declareok, userdata=exchange.name) self._channel.exchange_declare(exchange=exchange.name, - exchange_type=exchange.type, - callback=cb) + exchange_type=exchange.type, + callback=cb) def _on_exchange_declareok(self, _unused_frame, userdata): """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC @@ -243,7 +260,7 @@ def _on_exchange_declareok(self, _unused_frame, userdata): :param str|unicode userdata: Extra user data (exchange name) """ logger.debug('Exchange declared: %s', userdata) - self._setup_queue(self._queue) + self._setup_queue(self._rmq_params.queue) def _setup_queue(self, queue: Queue): """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC @@ -266,9 +283,11 @@ def _on_queue_declareok(self, _unused_frame): be invoked by pika. :param pika.frame.Method _unused_frame: The Queue.DeclareOk frame """ - logger.debug('Binding %s to %s with %s', self._exchange.name, self._queue.name, '#') - self._channel.queue_bind(self._queue.name, - self._exchange.name, + logger.debug('Binding %s to %s with #', + self._rmq_params.exchange.name, + self._rmq_params.queue.name) + self._channel.queue_bind(self._rmq_params.queue.name, + self._rmq_params.exchange.name, routing_key='#', # Default wildcard key to consume everything callback=self._on_bindok) @@ -298,9 +317,10 @@ def _enable_delivery_confirmations(self): """ logger.debug('Issuing Confirm.Select RPC command') if self._channel is not None: + self._records.deliveries[0] = 'Confirm.SelectOk' # track the confirmation message self._channel.confirm_delivery(self._on_delivery_confirmation) - def _on_delivery_confirmation(self, method_frame): + def _on_delivery_confirmation(self, method_frame: Method): """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC command, passing in either a Basic.Ack or Basic.Nack frame with the delivery tag of the message that was published. The delivery tag @@ -311,9 +331,12 @@ def _on_delivery_confirmation(self, method_frame): that are pending confirmation. :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame """ - confirmation_type = method_frame.method.NAME.split('.')[1].lower() - ack_multiple = method_frame.method.multiple - delivery_tag = method_frame.method.delivery_tag + # tell python type checker that method will be an Ack or Nack (per pika docs) + method = cast(Union[Basic.Ack, Basic.Nack], method_frame.method) + + confirmation_type = method.NAME.split('.')[1].lower() + ack_multiple = method.multiple + delivery_tag = method.delivery_tag logger.debug('Received %s for delivery tag: %i (multiple: %s)', confirmation_type, delivery_tag, ack_multiple) @@ -351,31 +374,3 @@ def _close_connection(self): if self._connection is not None: logger.debug('Closing connection') self._connection.close() - - -def main(): - logging.config.dictConfig(get_default_log_config('INFO')) - - # Setup a test instance... - conn = Conn('localhost', '/', '5672', 'guest', 'guest') - exch = Exch('pub.conf.test', ExchangeType.topic) - queue = Queue('pub.conf', '#', durable=False, exclusive=False, auto_delete=True) - expub = PublishConfirm(conn, exch, queue) - # Start the object thread, give it a moment... - expub.start() - time.sleep(1) - - while True: - try: - msg = input() - key = 'publish.confirm.test' - expub.publish_message(msg, key) - except Exception as e: # pylint: disable=broad-exception-caught - logger.info('Exiting from test loop : %s', str(e)) - break - expub.stop() - logger.info('Stopping...') - - -if __name__ == '__main__': - main() diff --git a/python/idsse_common/test/test_grid_proj.py b/python/idsse_common/test/test_grid_proj.py index 7fb0216a..45811513 100644 --- a/python/idsse_common/test/test_grid_proj.py +++ b/python/idsse_common/test/test_grid_proj.py @@ -2,12 +2,12 @@ # ---------------------------------------------------------------------------------- # Created on Wed Aug 2 2023 # -# Copyright (c) 2023 Colorado State University. All rights reserved. (1) -# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (2) +# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1) +# Copyright (c) 2023 Colorado State University. All rights reserved. (2) # # Contributors: -# Mackenzie Grimes (1) -# Geary Layne (2) +# Mackenzie Grimes (2) +# Geary Layne (1) # # ---------------------------------------------------------------------------------- # pylint: disable=missing-function-docstring,redefined-outer-name,invalid-name,protected-access diff --git a/python/idsse_common/test/test_publish_confirm.py b/python/idsse_common/test/test_publish_confirm.py new file mode 100644 index 00000000..c8625912 --- /dev/null +++ b/python/idsse_common/test/test_publish_confirm.py @@ -0,0 +1,223 @@ +"""Test suite for publish_confirm.py""" +# ---------------------------------------------------------------------------------- +# Created on Wed Oct 18 2023 +# +# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1) +# Copyright (c) 2023 Colorado State University. All rights reserved. (2) +# +# Contributors: +# Mackenzie Grimes (2) +# +# ---------------------------------------------------------------------------------- +# pylint: disable=missing-function-docstring,redefined-outer-name,invalid-name,protected-access +# pylint: disable=too-few-public-methods,unused-argument + +from time import sleep +from typing import Callable, Union, Any, NamedTuple +from unittest.mock import Mock + +from pytest import fixture, raises, MonkeyPatch + +from pika.spec import Basic +from idsse.common.publish_confirm import PublishConfirm +from idsse.common.rabbitmq_utils import Conn, Exch, Queue + +EXAMPLE_CONN = Conn('localhost', '/', 5672, 'guest', 'guest') +EXAMPLE_EXCH = Exch('pub.conf.test', 'topic') +EXAMPLE_QUEUE = Queue('pub.conf', '#', False, False, True) + + +class Method(NamedTuple): + """mock of pika.frame.Method data class""" + method: Union[Basic.Ack, Basic.Nack] + + +class MockPika: + """ + Mock classes to imitate pika functionality, callbacks, etc. + + Note that classes here are reduced functionality by far; only properties/methods/interfaces + that exist here are the ones used by PublishConfirm (at the time tests were written) + """ + def __init__(self): + self.delivery_tag = 0 # pseudo-global to track messages we have "sent" to our mock server + + class Channel: + """mock of pika.channel.Channel""" + def __init__(self): + self._context = MockPika() + self.channel_number = 0 + self.is_open = True + self.is_closed = False + + def __int__(self): + """Return int representation of channel""" + return self.channel_number + + def add_on_close_callback(self, callback): + pass + + def exchange_declare(self, exchange, exchange_type, callback: Callable[[Any, str], None]): + callback('userdata') + + # pylint: disable=too-many-arguments + def queue_declare( + self, queue, durable, exclusive, auto_delete, callback: Callable[[Any], None] + ): + callback(None) + + def queue_bind(self, queue, exchange, routing_key, callback: Callable[[Any], None]): + callback(None) # connection expected, but PublishConfirm doesn't actually use it + + def confirm_delivery(self, callback: Callable[[Method], None]): + """ + Args: + callback (Callable[[Method], None]) + """ + # may need to make this mockable in the future to pass Nack or customize delivery_tag + method = Method(Basic.Ack(delivery_tag=self._context.delivery_tag)) + self._context.delivery_tag += 1 # MockPika needs to track this message ID as "sent" + + callback(method) # send new Ack message back to PublishConfirm + + def basic_publish(self, exchange: str, key: str, body: str, properties): + self._context.delivery_tag += 1 + + def close(self): + self.is_open = False + self.is_closed = True + + + class IOLoop: + """mock of pika.SelectConnection.ioloop""" + def __init__(self, on_open: Callable[[Any], None], on_close: Callable[[Any, str], None]): + self.on_open = on_open + self.on_close = on_close + + def start(self): + self.on_open(None) + + def stop(self): + self.on_close(None, 'some_reason') + + def call_later(self): + pass + + + class SelectConnection: + """mock of pika.SelectConnection""" + def __init__(self, parameters, on_open_callback, on_open_error_callback, on_close_callback): + self.is_open = True + self.is_closed = False + self._context = MockPika() + + self.ioloop = self._context.IOLoop( + on_open=on_open_callback, on_close=on_close_callback + ) + + def channel(self, on_open_callback: Callable[[Any], None]): + """ + Args: + on_open_callback (Callable[[MockPika.Channel], None]) + """ + on_open_callback(self._context.Channel()) + + def close(self): + self.is_open = False + self.is_closed = True + + +# fixtures +@fixture +def context() -> MockPika: + """Create an instance of our mocked pika library. delivery_tag initialized to 0""" + return MockPika() + + +@fixture +def publish_confirm(monkeypatch: MonkeyPatch, context: MockPika) -> PublishConfirm: + monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) + return PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) + + +# tests +def test_publish_confirm_start_and_stop(publish_confirm: PublishConfirm): + publish_confirm.start() + sleep(.1) + + assert publish_confirm._connection and publish_confirm._connection.is_open + assert publish_confirm._channel and publish_confirm._channel.is_open + assert publish_confirm._records.acked == 1 # channel.confirm_delivery() sent our first message + + publish_confirm.stop() + while publish_confirm._stopping: + # Ensure stop() has completed executing before validating everything is closed + # This race condition is possible (though unlikely) since PublishConfirm has its own Thread + sleep(.1) + + assert publish_confirm._connection.is_closed + assert publish_confirm._channel.is_closed + + +def test_delivery_confirmation_handles_nack(publish_confirm: PublishConfirm, context: MockPika): + def mock_confirm_delivery(self: context.Channel, callback: Callable[[Method], None]): + method = Method(Basic.Nack(delivery_tag=context.delivery_tag)) + self._context.delivery_tag += 1 + callback(method) + + publish_confirm._records.deliveries[0] = 'Confirm.Select' + context.Channel.confirm_delivery = mock_confirm_delivery + + publish_confirm.start() + sleep(.1) + assert publish_confirm._records.nacked == 1 + assert publish_confirm._records.acked == 0 + + +def test_publish_message_success(publish_confirm: PublishConfirm): + message_data = {'data': 123} + + publish_confirm.start() + # sleep to keep our unit test (Main thread) from outrunning PublishConfirm Thread. Not ideal + sleep(.1) + result = publish_confirm.publish_message(message_data) + + assert result + assert publish_confirm._records.message_number == 1 + assert publish_confirm._records.deliveries[1] == message_data + + +def test_publish_message_exception_when_channel_not_open(publish_confirm: PublishConfirm): + message_data = {'data': 123} + + # missing a publish_confirm.start(), should fail + with raises(RuntimeError) as pytest_error: + publish_confirm.publish_message(message_data) + + assert pytest_error is not None + assert 'RabbitMQ channel is None' in str(pytest_error.value) + assert publish_confirm._records.message_number == 0 # should not have logged message sent + assert len(publish_confirm._records.deliveries) == 0 + + +def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm, context: MockPika): + message_data = {'data': 123} + context.Channel.basic_publish = Mock(side_effect=RuntimeError('ACCESS_REFUSED')) + + publish_confirm.start() + sleep(.1) + success = publish_confirm.publish_message(message_data) + + # publish should have returned failure and not recorded a message delivery + assert not success + assert publish_confirm._records.message_number == 0 + assert len(publish_confirm._records.deliveries) == 0 + + +def test_on_channel_closed(publish_confirm: PublishConfirm, context: MockPika): + publish_confirm.start() + sleep(.1) + + publish_confirm._on_channel_closed(context.Channel(), 'ChannelClosedByClient') + assert publish_confirm._channel is None + assert publish_confirm._connection.is_closed