Skip to content

Commit

Permalink
bug: idsse-429: PublishConfirm wait for internal setup (#32)
Browse files Browse the repository at this point in the history
* add threading.Event to block while rmq setup completes
* Update python/idsse_common/idsse/common/publish_confirm.py
Co-authored-by: Geary-Layne <[email protected]>

* publishconfirm: make Thread, all non-essential methods internal
* remove callback from PublishConfirm.start(), hide in internal method
---------

Co-authored-by: Geary-Layne <[email protected]>
  • Loading branch information
mackenzie-grimes-noaa and Geary-Layne authored Nov 3, 2023
1 parent 09d4f2c commit 84c2fb1
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 83 deletions.
151 changes: 94 additions & 57 deletions python/idsse_common/idsse/common/publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# Contributors:
# Geary Layne (1)
# Paul Hamer (2)
# Mackenzie Grimes (2)
# ----------------------------------------------------------------------------------
# pylint: disable=C0111,C0103,R0205

Expand All @@ -17,10 +18,11 @@
import json
import time
from dataclasses import dataclass, field
from threading import Thread
from random import randint
from threading import Thread, Event
from typing import Optional, Dict, NamedTuple, Union, Callable, cast

from pika import SelectConnection, URLParameters, BasicProperties
from pika import SelectConnection, ConnectionParameters, PlainCredentials, BasicProperties
from pika.channel import Channel
from pika.frame import Method
from pika.spec import Basic
Expand All @@ -47,13 +49,14 @@ class PublishConfirmRecords:
message_number: int = 0


class RabbitMqParams(NamedTuple):
"""Data class to hold rabbitmq configurations"""
class PublishConfirmParams(NamedTuple):
"""Data class to hold RabbitMQ configurations for PublishConfirm"""
conn: Conn
exchange: Exch
queue: Queue


class PublishConfirm(Thread):
class PublishConfirm():
"""This is a publisher that will handle unexpected interactions
with RabbitMQ such as channel and connection closures for any process.
If RabbitMQ closes the connection, it will reopen it. You should
Expand All @@ -68,97 +71,83 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
:param Exch exchange: The RabbitMQ exchange details
:param Queue queue: The RabbitMQ queue details
"""
super().__init__(daemon=True)
self._thread = Thread(name=f'{__name__}-{randint(0,9)}',
daemon=True,
target=self._run)

self._connection: Optional[SelectConnection] = None
self._channel: Optional[Channel] = None

self._records = PublishConfirmRecords()

self._stopping = False
self._url = (f'amqp://{conn.username}:{conn.password}@{conn.host}'
f':{str(conn.port)}/%2F?connection_attempts=3&heartbeat=3600')
self._rmq_params = RabbitMqParams(exchange=exchange, queue=queue)
self._rmq_params = PublishConfirmParams(conn, exchange, queue)

self._records = PublishConfirmRecords() # data class to track message activity
self._on_ready_callback: Optional[Callable[[], None]] = None

def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika.
:rtype: pika.SelectConnection
"""
logger.info('Connecting to %s', self._url)
return SelectConnection(
URLParameters(self._url),
on_open_callback=self._on_connection_open,
on_open_error_callback=self._on_connection_open_error,
on_close_callback=self._on_connection_closed)
set_corr_id_context_var('PublishConfirm')

def publish_message(self, message, key=None) -> bool:
def publish_message(self,
message: Dict,
routing_key = '',
corr_id: Optional[str] = None) -> bool:
"""If the class is not stopping, publish a message to RabbitMQ,
appending a list of deliveries with the message number that was sent.
This list will be used to check for delivery confirmations in the
on_delivery_confirmations method.
Args:
message (Dict): message to publish (should be valid json)
routing_key (str): routing_key to route the message to correct consumer.
Default is empty str
corr_id (Optional[str]): optional correlation_id to include in message
Returns:
bool: True if message successfully published to queue (channel was open and
publish did not throw exception)
Raises:
RuntimeError: if channel is uninitialized (start() not completed yet) or is closed
"""
if not (self._channel and self._channel.is_open):
# tried to publish message before start() was called (or while it was in progress)
channel_state = 'closed' if self._channel and self._channel.is_closed else 'None'
raise RuntimeError(f'Cannot publish messages yet; RabbitMQ channel is {channel_state}')
self._wait_for_channel_to_be_ready()

# We expect a JSON message format, do a check here...
try:
properties = BasicProperties(content_type='application/json',
content_encoding='utf-8')
self._channel.basic_publish(self._rmq_params.exchange.name, key,
content_encoding='utf-8',
correlation_id=corr_id)
self._channel.basic_publish(self._rmq_params.exchange.name, routing_key,
json.dumps(message, ensure_ascii=True),
properties)
self._records.message_number += 1
self._records.deliveries[self._records.message_number] = message
logger.debug('Published message # %i', self._records.message_number)
logger.debug('Published message # %i to exchange %s, queue %s, routing_key %s',
self._records.message_number, self._rmq_params.exchange.name,
self._rmq_params.queue.name, routing_key)
return True

except Exception as e: # pylint: disable=broad-exception-caught
logger.error('Publish message problem : %s', str(e))
return False

def run(self):
"""Run the thread, i.e. get connection etc...
def start(self):
"""Start thread to connect to RabbitMQ queue and prepare to publish messages, invoking
callback when setup complete.
"""
set_corr_id_context_var('PublishConfirm')

self._connection = self.connect()
self._connection.ioloop.start()

while not self._stopping:
time.sleep(5)

if self._connection is not None and not self._connection.is_closed:
# Finish closing
self._connection.ioloop.start()
logger.debug('Starting thread')
self._start()

def start(self, callback: Optional[Callable[[], None]] = None):
"""Start thread to connect RabbitMQ queue and prepare to publish messages. Must be called
before publish_message().
def _start(self, callback: Optional[Callable[[], None]] = None):
"""
Start a thread to handle PublishConfirm operations
Args:
callback (Optional[Callable[[], None]]): callback function to be invoked
once instance is ready to publish messages (all RabbitMQ connection and channel
are setup, delivery confirmation is enabled, etc.). Defaults to None
are setup, delivery confirmation is enabled, etc.). Default to None.
"""
logger.debug('Starting thread')
self._on_ready_callback = callback # to be invoked after all pika setup is done
super().start()

# callback not passed, so sleep momentarily to ensure all RabbitMQ callbacks can complete
if callback is None:
time.sleep(.2)
logger.debug('Starting thread with callback')
if callback is not None:
self._on_ready_callback = callback # to be invoked after all pika setup is done
self._thread.start()

def stop(self):
"""Stop the example by closing the channel and connection. We
Expand All @@ -174,6 +163,54 @@ def stop(self):
self._close_connection()
self._stopping = False # done stopping

def _run(self):
"""Run a new thread: get a new RMQ connection, and start looping until stop() is called"""
self._connection = self._create_connection()
self._connection.ioloop.start()
time.sleep(0.2)

while not self._stopping:
time.sleep(5)

if self._connection is not None and not self._connection.is_closed:
# Finish closing
self._connection.ioloop.start()

def _create_connection(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika.
:rtype: pika.SelectConnection
"""
conn = self._rmq_params.conn
logger.info('Connecting to RabbitMQ: %s', conn)
return SelectConnection(
parameters=ConnectionParameters(
host=conn.host,
virtual_host=conn.v_host,
port=conn.port,
credentials=PlainCredentials(conn.username, conn.password)
),
on_open_callback=self._on_connection_open,
on_open_error_callback=self._on_connection_open_error,
on_close_callback=self._on_connection_closed)

def _wait_for_channel_to_be_ready(self) -> None:
"""If connection or channel are not open, start the PublishConfirm to do needed
RabbitMQ setup. This method will not return until channel is confirmed ready for use"""

# validate that PublishConfirm thread has been setup and connected to RabbitMQ
if not (self._connection and self._connection.is_open
and self._channel and self._channel.is_open):
logger.debug('Channel is not ready to publish, calling _start() now')

# pass callback to flip is_ready flag, and block until flag changes
is_ready = Event()
self._start(callback=is_ready.set)
is_ready.wait()

logger.debug('Connection and channel setup complete, ready to publish message')

def _on_connection_open(self, _unused_connection):
"""This method is called by pika once the connection to RabbitMQ has
been established. It passes the handle to the connection object in
Expand Down Expand Up @@ -307,9 +344,9 @@ def _on_bindok(self, _unused_frame):
response from RabbitMQ. Since we know we're now setup and bound, it's
time to start publishing."""
logger.debug('Queue bound')
self.start_publishing()
self._start_publishing()

def start_publishing(self):
def _start_publishing(self):
"""This method will enable delivery confirmations and schedule the
first message to be sent to RabbitMQ
"""
Expand Down
6 changes: 6 additions & 0 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ class Queue(NamedTuple):
durable: bool
exclusive: bool
auto_delete: bool


class RabbitMqParams(NamedTuple):
"""Data class to hold configurations for RabbitMQ exchange/queue pair"""
exchange: Exch
queue: Queue
56 changes: 30 additions & 26 deletions python/idsse_common/test/test_publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import Callable, Union, Any, NamedTuple
from unittest.mock import Mock

from pytest import fixture, raises, MonkeyPatch
from pytest import fixture, MonkeyPatch

from pika.spec import Basic
from idsse.common.publish_confirm import PublishConfirm
Expand Down Expand Up @@ -170,34 +170,26 @@ def mock_confirm_delivery(self: context.Channel, callback: Callable[[Method], No
assert publish_confirm._records.acked == 0


def test_publish_message_success(publish_confirm: PublishConfirm):
message_data = {'data': 123}
def test_publish_message_success_without_calling_start(monkeypatch: MonkeyPatch, context: MockPika):
monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection)
pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE)
example_message = {'data': [123]}

publish_confirm.start()
result = publish_confirm.publish_message(message_data)
assert pub_conf._connection is None and pub_conf._channel is None
success = pub_conf.publish_message(example_message)

assert result
assert publish_confirm._records.message_number == 1
assert publish_confirm._records.deliveries[1] == message_data
# connection & channel should have been initialized internally, so publish should have worked
assert success
assert pub_conf._channel is not None and pub_conf._channel.is_open
assert pub_conf._records.message_number == 1
assert pub_conf._records.deliveries[1] == example_message


def test_publish_message_exception_when_channel_not_open(publish_confirm: PublishConfirm):
def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm, context: MockPika):
message_data = {'data': 123}

# missing a publish_confirm.start(), should fail
with raises(RuntimeError) as pytest_error:
publish_confirm.publish_message(message_data)

assert pytest_error is not None
assert 'RabbitMQ channel is None' in str(pytest_error.value)
assert publish_confirm._records.message_number == 0 # should not have logged message sent
assert len(publish_confirm._records.deliveries) == 0


def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm):
message_data = {'data': 123}

publish_confirm.start()
publish_confirm._connection = context.SelectConnection(None, Mock(), Mock(), Mock())
publish_confirm._channel = context.Channel()
publish_confirm._channel.basic_publish = Mock(side_effect=RuntimeError('ACCESS_REFUSED'))
success = publish_confirm.publish_message(message_data)

Expand All @@ -208,7 +200,10 @@ def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm):


def test_on_channel_closed(publish_confirm: PublishConfirm, context: MockPika):
publish_confirm.start()
publish_confirm._connection = context.SelectConnection(None, Mock(), Mock(), Mock())
publish_confirm._channel = context.Channel()
publish_confirm._channel.close()

publish_confirm._on_channel_closed(context.Channel(), 'ChannelClosedByClient')
assert publish_confirm._channel is None
assert publish_confirm._connection.is_closed
Expand All @@ -223,9 +218,9 @@ def test_callback():
assert success

assert publish_confirm._channel is None
publish_confirm.start(callback=test_callback)
publish_confirm._start(test_callback)

sleep(.1) # ensure that callback has time to run and send its message
sleep(.1) # ensure that our test's callback has time to run and send its message
assert publish_confirm._records.message_number == 1
assert publish_confirm._records.deliveries[1] == example_message

Expand All @@ -247,3 +242,12 @@ def mock_sleep_function(secs: float):
# between this test's thread and the PublishConfirm thread. Both results are acceptable
sleep_call_args = [call.args for call in mock_sleep.call_args_list]
assert set(sleep_call_args) in [set([(0.2,)]), set([(0.2,), (5,)])]


def test_wait_for_channel_returns_when_ready(monkeypatch: MonkeyPatch, context: MockPika):
monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection)
pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE)

assert pub_conf._channel is None
pub_conf._wait_for_channel_to_be_ready()
assert pub_conf._channel is not None and pub_conf._channel.is_open

0 comments on commit 84c2fb1

Please sign in to comment.