Skip to content

Commit

Permalink
Feat: RabbitMqUtils: subscribe to queue (#34)
Browse files Browse the repository at this point in the history
* add utility to subscribe connection and channel to rmq queue
  • Loading branch information
mackenzie-grimes-noaa authored Nov 14, 2023
1 parent 6bea7ef commit d700233
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 4 deletions.
6 changes: 4 additions & 2 deletions python/idsse_common/idsse/common/json_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
# ------------------------------------------------------------------------------

import json
from typing import Tuple, Union, Optional
from uuid import uuid4, UUID
from typing import Any, Dict, List, Optional, Tuple, Union
from uuid import UUID, uuid4

Json = Union[Dict[str, Any], List[Any], int, str, float, bool, type[None]]


def get_corr_id(
Expand Down
127 changes: 125 additions & 2 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Module for RabbitMQ client related data classes"""
"""Module for RabbitMQ client related data classes and utility functions"""
# ----------------------------------------------------------------------------------
# Created on Fri Sep 8 2023.
#
Expand All @@ -7,9 +7,23 @@
#
# Contributors:
# Paul Hamer (2)
# Mackenzie Grimes (2)
#
# ----------------------------------------------------------------------------------
from typing import NamedTuple

import logging
import logging.config
from typing import Callable, NamedTuple, Optional, Tuple, Union

from pika import BasicProperties, ConnectionParameters, PlainCredentials
from pika.adapters import BlockingConnection, blocking_connection
from pika.frame import Method
from pika.spec import Basic

logger = logging.getLogger(__name__)

# default pseudo-queue on default exchange that RabbitMQ designates for direct reply-to RPC
DIRECT_REPLY_QUEUE = 'amq.rabbitmq.reply-to'


class Conn(NamedTuple):
Expand All @@ -20,6 +34,15 @@ class Conn(NamedTuple):
username: str
password: str

def to_connection(self) -> BlockingConnection:
"""Establish a new RabbitMQ connection using attributes in Conn data class"""
return BlockingConnection(ConnectionParameters(
host=self.host,
virtual_host=self.v_host,
port=self.port,
credentials=PlainCredentials(self.username, self.password)
))


class Exch(NamedTuple):
"""An internal data class for holding the RabbitMQ exchange info"""
Expand All @@ -40,3 +63,103 @@ class RabbitMqParams(NamedTuple):
"""Data class to hold configurations for RabbitMQ exchange/queue pair"""
exchange: Exch
queue: Queue


def _initialize_exchange_and_queue(
channel: blocking_connection.BlockingChannel,
params: RabbitMqParams
) -> str:
"""Declare and bind RabbitMQ exchange and queue using the provided channel.
Returns:
str: the name of the newly-initialized queue.
"""
exch, queue = params
logger.info('Subscribing to exchange: %s', exch.name)

# Do not try to declare the default exchange. It already exists
if exch.name != '':
channel.exchange_declare(exchange=exch.name, exchange_type=exch.type)

# Do not try to declare or bind built-in queues. They are pseudo-queues that already exist
if queue.name.startswith('amq.rabbitmq.'):
return queue.name

frame: Method = channel.queue_declare(
queue=queue.name,
exclusive=queue.exclusive,
durable=queue.durable,
auto_delete=queue.auto_delete
)

# Bind queue to exchange with routing_key. May need to support multiple keys in the future
if exch.name != '':
logger.info(' binding key %s to queue: %s', queue.route_key, queue.name)
channel.queue_bind(queue.name, exch.name, queue.route_key)
return frame.method.queue


def subscribe_to_queue(
connection: Union[Conn, BlockingConnection],
params: RabbitMqParams,
on_message_callback: Callable[
[blocking_connection.BlockingChannel, Basic.Deliver, BasicProperties, bytes], None],
channel: Optional[blocking_connection.BlockingChannel] = None
) -> Tuple[BlockingConnection, blocking_connection.BlockingChannel]:
"""
Function that handles setup of consumer of RabbitMQ queue messages, declaring the exchange and
queue if needed, and invoking the provided callback when a message is received.
If an existing BlockingConnection or BlockingChannel are passed, they are used to
setup the subscription, but by default a new connection and channel will be established and
returned, which the caller can immediately begin doing RabbitMQ operations with.
For example: start a blocking consume of messages with channel.start_consuming(), or
close gracefully with connection.close()
Args:
connection (Union[Conn, BlockingConnection]): connection parameters to establish new
RabbitMQ connection, or existing RabbitMQ connection to reuse for this consumer.
params (RabbitMqParams): parameters for the RabbitMQ exchange and queue from which to
to consume messages.
on_message_callback (Callable[
[BlockingChannel, Basic.Deliver, BasicProperties, bytes], None]):
function to handle messages that are received over the subscribed exchange and queue.
channel (Optional[BlockingChannel]): optional existing (open) RabbitMQ channel to reuse.
Default is to create unique channel for this consumer.
Returns:
Tuple[BlockingConnection, BlockingChannel]: the connection and channel, which are now open
and subscribed to the provided queue.
"""
if isinstance(connection, Conn):
# Use connection as parameters to establish new connection
_connection = connection.to_connection()
logger.info('Established new RabbitMQ connection to %s on port %i',
connection.host, connection.port)
elif isinstance(connection, BlockingConnection):
# Or existing open connection was provided, so use that
_connection = connection
else:
# connection of unsupported type passed
raise ValueError(
(f'Cannot use or create new RabbitMQ connection using type {type(connection)}. '
'Should be one of: [Conn, pika.BlockingConnection]')
)

if channel is None:
logger.info('Creating new RabbitMQ channel')
_channel = _connection.channel()
else:
_channel = channel

queue_name = _initialize_exchange_and_queue(_channel, params)

# begin consuming messages
auto_ack = queue_name == DIRECT_REPLY_QUEUE
logger.info('Consuming messages from queue %s with auto_ack: %s', queue_name, auto_ack)

_channel.basic_qos(prefetch_count=1)
_channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback,
auto_ack=auto_ack)
return (_connection, _channel)
191 changes: 191 additions & 0 deletions python/idsse_common/test/test_rabbitmq_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""Testing for RabbitMqUtils functions"""
# ------------------------------------------------------------------------------
# Created on Wed Nov 8 2023
#
# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1)
# Copyright (c) 2023 Colorado State University. All rights reserved. (2)
#
# Contributors:
# Mackenzie Grimes (2)
#
# ------------------------------------------------------------------------------
# pylint: disable=missing-function-docstring,missing-class-docstring,too-few-public-methods
# pylint: disable=redefined-outer-name,unused-argument,protected-access,duplicate-code

from typing import NamedTuple
from unittest.mock import Mock

from pytest import fixture, raises, MonkeyPatch
from pika import BlockingConnection
from pika.adapters import blocking_connection

from idsse.common.rabbitmq_utils import Conn, Exch, Queue, RabbitMqParams, subscribe_to_queue

# Example data objects
CONN = Conn('localhost', '/', port=5672, username='user', password='password')
RMQ_PARAMS = RabbitMqParams(
Exch('ims_data', 'topic'),
Queue('ims_data', '', True, False, True)
)


class Method(NamedTuple):
"""Mock of pika.frame.Method"""
exchange: str = ' '
queue: str = ''
delivery_tag: int = 0
routing_key: str = ''


class Frame(NamedTuple):
"""Mock of pika.frame.Frame"""
method: Method


# fixtures
@fixture
def mock_channel() -> Mock:
"""Mock pika.adapters.blocking_connection.BlockingChannel object"""
def mock_queue_declare(queue: str, **_kwargs) -> Method:
return Frame(Method(queue=queue)) # create a usable (mock) Frame using queue name passed

mock_obj = Mock(spec=blocking_connection.BlockingChannel, name='MockChannel')
mock_obj.exchange_declare = Mock()
mock_obj.queue_declare = Mock(side_effect=mock_queue_declare)
mock_obj.queue_bind = Mock()
mock_obj.basic_qos = Mock()
mock_obj.close = Mock()

return mock_obj


@fixture
def mock_connection(monkeypatch: MonkeyPatch, mock_channel: Mock) -> Mock:
"""Mock pika.BlockingChannel object"""
mock_obj = Mock(name='MockConnection')
mock_obj.channel = Mock(return_value=mock_channel)
mock_obj.add_callback_threadsafe = Mock()
mock_obj.close = Mock()

return mock_obj


# tests
def test_connection_params_works(monkeypatch: MonkeyPatch, mock_connection: Mock):
mock_blocking_connection = Mock(return_value=mock_connection)
monkeypatch.setattr(
'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection
)

# run method
mock_callback_function = Mock()
_connection, _channel = subscribe_to_queue(CONN, RMQ_PARAMS, mock_callback_function)

# assert correct (mocked) pika calls were made
mock_blocking_connection.assert_called_once()
_connection.channel.assert_called_once()

_channel.basic_qos.assert_called_once()
_channel.basic_consume.assert_called_once()

# assert exchange was declared
_channel.exchange_declare.assert_called_once_with(
exchange=RMQ_PARAMS.exchange.name,
exchange_type=RMQ_PARAMS.exchange.type
)

# assert queue was declared and bound
_channel.queue_declare.assert_called_once_with(
queue=RMQ_PARAMS.queue.name,
exclusive=RMQ_PARAMS.queue.exclusive,
durable=RMQ_PARAMS.queue.durable,
auto_delete=RMQ_PARAMS.queue.auto_delete
)

_channel.queue_bind.assert_called_once_with(
RMQ_PARAMS.exchange.name,
RMQ_PARAMS.queue.name,
RMQ_PARAMS.queue.route_key
)

# assert queue connected to message callback
_channel.basic_consume.assert_called_once_with(
queue=RMQ_PARAMS.queue.name,
on_message_callback=mock_callback_function,
auto_ack=False
)


def test_passing_connection_does_not_create_new(mock_connection):
mock_connection.__class__ = BlockingConnection # set mock type to pika.BlockingConnection
mock_callback_function = Mock(name='on_message_callback')
new_connection, new_channel = subscribe_to_queue(
mock_connection, RMQ_PARAMS, mock_callback_function
)

mock_connection.assert_not_called()
assert new_connection == mock_connection
# confirm that all channel setup proceeds normally
new_channel.basic_consume.assert_called_once_with(
queue=RMQ_PARAMS.queue.name,
on_message_callback=mock_callback_function,
auto_ack=False
)


def test_passing_unsupported_connection_type_fails():
with raises(ValueError) as exc:
subscribe_to_queue('bad connection', RMQ_PARAMS, Mock(name='on_message_callback'))
assert exc is not None


def test_passing_channel_does_not_create_new(mock_connection: Mock, mock_channel: Mock):
mock_callback_func = Mock()
mock_connection.__class__ = BlockingConnection # make mock look like real BlockingConnection

_, new_channel = subscribe_to_queue(
mock_connection, RMQ_PARAMS, mock_callback_func, channel=mock_channel
)

mock_connection.channel.assert_not_called()
new_channel.basic_consume.assert_called_once()
assert new_channel == mock_channel

def test_direct_reply_does_not_try_to_declare_queue(
monkeypatch: MonkeyPatch, mock_connection: Mock
):
params = RabbitMqParams(
Exch('ims_data', 'topic'),
Queue('amq.rabbitmq.reply-to', '', True, False, True)
)

mock_blocking_connection = Mock(return_value=mock_connection)
monkeypatch.setattr(
'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection
)

_, new_channel = subscribe_to_queue(CONN, params, Mock(name='mock_callback'))

# assert that built-in Direct Reply-to queue was not recreated (pika would fail)
new_channel.queue_declare.assert_not_called()
new_channel.queue_bind.assert_not_called()
new_channel.basic_consume.assert_called_once()

def test_default_exchange_does_not_try_to_declare_exchange(
monkeypatch: MonkeyPatch, mock_connection: Mock
):
params = RabbitMqParams(
Exch('', 'topic'),
Queue('something', '', True, False, True)
)

mock_blocking_connection = Mock(return_value=mock_connection)
monkeypatch.setattr(
'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection
)

_, new_channel = subscribe_to_queue(CONN, params, Mock())

new_channel.exchange_declare.assert_not_called()
new_channel.queue_declare.assert_called_once()
new_channel.basic_consume.assert_called_once()

0 comments on commit d700233

Please sign in to comment.