Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: idsse-368: publish confirm tests #26

Merged
merged 5 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions python/idsse_common/idsse/common/grid_proj.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
# ----------------------------------------------------------------------------------
# Created on Mon Jul 31 2023
#
# Copyright (c) 2023 Colorado State University. All rights reserved. (1)
# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (2)
# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1)
# Copyright (c) 2023 Colorado State University. All rights reserved. (2)
#
# Contributors:
# Mackenzie Grimes (1)
# Geary Layne (2)
# Mackenzie Grimes (2)
# Geary Layne (1)
#
# ----------------------------------------------------------------------------------
# pylint: disable=invalid-name
Expand Down
137 changes: 66 additions & 71 deletions python/idsse_common/idsse/common/publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# ----------------------------------------------------------------------------------
# Created on Fri Jun 23 2023.
#
# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1)
# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1)
# Copyright (c) 2023 Colorado State University. All rights reserved. (2)
#
# Contributors:
Expand All @@ -18,14 +18,16 @@
import time
from dataclasses import dataclass, field
from threading import Thread
from typing import Optional, Dict
from typing import Optional, Dict, NamedTuple, Union, cast

import pika
from pika import SelectConnection
from pika.exchange_type import ExchangeType
from pika import SelectConnection, URLParameters, BasicProperties
from pika.channel import Channel
from pika.frame import Method
from pika.spec import Basic


from idsse.common.rabbitmq_utils import Conn, Exch, Queue
from idsse.common.log_util import get_default_log_config, set_corr_id_context_var
from idsse.common.log_util import set_corr_id_context_var

logger = logging.getLogger(__name__)

Expand All @@ -46,6 +48,12 @@ class PublishConfirmRecords:
message_number: int = 0


class RabbitMqParams(NamedTuple):
"""Data class to hold rabbitmq configurations"""
exchange: Exch
queue: Queue


class PublishConfirm(Thread):
"""This is a publisher that will handle unexpected interactions
with RabbitMQ such as channel and connection closures for any process.
Expand All @@ -71,8 +79,7 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
self._stopping = False
self._url = (f'amqp://{conn.username}:{conn.password}@{conn.host}'
f':{str(conn.port)}/%2F?connection_attempts=3&heartbeat=3600')
self._exchange = exchange
self._queue = queue
self._rmq_params = RabbitMqParams(exchange=exchange, queue=queue)

def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
Expand All @@ -81,8 +88,8 @@ def connect(self):
:rtype: pika.SelectConnection
"""
logger.info('Connecting to %s', self._url)
return pika.SelectConnection(
pika.URLParameters(self._url),
return SelectConnection(
URLParameters(self._url),
on_open_callback=self._on_connection_open,
on_open_error_callback=self._on_connection_open_error,
on_close_callback=self._on_connection_closed)
Expand All @@ -95,28 +102,30 @@ def publish_message(self, message, key=None) -> bool:

Returns:
bool: True if message successfully published to queue (channel was open and
publish did not throw
exception)
publish did not throw exception)
Raises:
RuntimeError: if channel is uninitialized (start() not completed yet) or is closed
"""
success = False
if self._channel and self._channel.is_open:

# We expect a JSON message format, do a check here...
try:
properties = pika.BasicProperties(content_type='application/json',
content_encoding='utf-8')

self._channel.basic_publish(self._exchange.name, key,
json.dumps(message, ensure_ascii=True),
properties)
self._records.message_number += 1
self._records.deliveries[self._records.message_number] = message
logger.debug('Published message # %i', self._records.message_number)
success = True

except Exception as e: # pylint: disable=broad-exception-caught
logger.error('Publish message problem : %s', str(e))
return success
if not (self._channel and self._channel.is_open):
# tried to publish message before start() was called (or while it was in progress)
channel_state = 'closed' if self._channel and self._channel.is_closed else 'None'
raise RuntimeError(f'Cannot publish messages yet; RabbitMQ channel is {channel_state}')

# We expect a JSON message format, do a check here...
try:
properties = BasicProperties(content_type='application/json',
content_encoding='utf-8')
self._channel.basic_publish(self._rmq_params.exchange.name, key,
json.dumps(message, ensure_ascii=True),
properties)
self._records.message_number += 1
self._records.deliveries[self._records.message_number] = message
logger.debug('Published message # %i', self._records.message_number)
return True

except Exception as e: # pylint: disable=broad-exception-caught
logger.error('Publish message problem : %s', str(e))
return False

def run(self):
"""Run the thread, i.e. get connection etc...
Expand All @@ -133,6 +142,13 @@ def run(self):
# Finish closing
self._connection.ioloop.start()

def start(self):
"""Start thread to connect RabbitMQ queue and prepare to publish messages. Must be called
before publish_message().
"""
logger.debug('Starting thread')
super().start()

def stop(self):
"""Stop the example by closing the channel and connection. We
set a flag here so that we stop scheduling new messages to be
Expand All @@ -145,6 +161,7 @@ def stop(self):
self._stopping = True
self._close_channel()
self._close_connection()
self._stopping = False # done stopping

def _on_connection_open(self, _unused_connection):
"""This method is called by pika once the connection to RabbitMQ has
Expand Down Expand Up @@ -198,7 +215,7 @@ def _on_channel_open(self, channel: Channel):
logger.debug('Channel opened')
self._channel = channel
self._add_on_channel_close_callback()
self._setup_exchange(self._exchange)
self._setup_exchange(self._rmq_params.exchange)

def _add_on_channel_close_callback(self):
"""This method tells pika to call the on_channel_closed method if
Expand All @@ -219,7 +236,7 @@ def _on_channel_closed(self, channel, reason):
logger.warning('Channel %i was closed: %s', channel, reason)
self._channel = None
if not self._stopping:
self._connection.close()
self._close_connection()

def _setup_exchange(self, exchange: Exch):
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
Expand All @@ -233,8 +250,8 @@ def _setup_exchange(self, exchange: Exch):
cb = functools.partial(self._on_exchange_declareok,
userdata=exchange.name)
self._channel.exchange_declare(exchange=exchange.name,
exchange_type=exchange.type,
callback=cb)
exchange_type=exchange.type,
callback=cb)

def _on_exchange_declareok(self, _unused_frame, userdata):
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
Expand All @@ -243,7 +260,7 @@ def _on_exchange_declareok(self, _unused_frame, userdata):
:param str|unicode userdata: Extra user data (exchange name)
"""
logger.debug('Exchange declared: %s', userdata)
self._setup_queue(self._queue)
self._setup_queue(self._rmq_params.queue)

def _setup_queue(self, queue: Queue):
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
Expand All @@ -266,9 +283,11 @@ def _on_queue_declareok(self, _unused_frame):
be invoked by pika.
:param pika.frame.Method _unused_frame: The Queue.DeclareOk frame
"""
logger.debug('Binding %s to %s with %s', self._exchange.name, self._queue.name, '#')
self._channel.queue_bind(self._queue.name,
self._exchange.name,
logger.debug('Binding %s to %s with #',
self._rmq_params.exchange.name,
self._rmq_params.queue.name)
self._channel.queue_bind(self._rmq_params.queue.name,
self._rmq_params.exchange.name,
routing_key='#', # Default wildcard key to consume everything
callback=self._on_bindok)

Expand Down Expand Up @@ -298,9 +317,10 @@ def _enable_delivery_confirmations(self):
"""
logger.debug('Issuing Confirm.Select RPC command')
if self._channel is not None:
self._records.deliveries[0] = 'Confirm.SelectOk' # track the confirmation message
self._channel.confirm_delivery(self._on_delivery_confirmation)

def _on_delivery_confirmation(self, method_frame):
def _on_delivery_confirmation(self, method_frame: Method):
"""Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
command, passing in either a Basic.Ack or Basic.Nack frame with
the delivery tag of the message that was published. The delivery tag
Expand All @@ -311,9 +331,12 @@ def _on_delivery_confirmation(self, method_frame):
that are pending confirmation.
:param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
"""
confirmation_type = method_frame.method.NAME.split('.')[1].lower()
ack_multiple = method_frame.method.multiple
delivery_tag = method_frame.method.delivery_tag
# tell python type checker that method will be an Ack or Nack (per pika docs)
method = cast(Union[Basic.Ack, Basic.Nack], method_frame.method)

confirmation_type = method.NAME.split('.')[1].lower()
ack_multiple = method.multiple
delivery_tag = method.delivery_tag

logger.debug('Received %s for delivery tag: %i (multiple: %s)',
confirmation_type, delivery_tag, ack_multiple)
Expand Down Expand Up @@ -351,31 +374,3 @@ def _close_connection(self):
if self._connection is not None:
logger.debug('Closing connection')
self._connection.close()


def main():
logging.config.dictConfig(get_default_log_config('INFO'))

# Setup a test instance...
conn = Conn('localhost', '/', '5672', 'guest', 'guest')
exch = Exch('pub.conf.test', ExchangeType.topic)
queue = Queue('pub.conf', '#', durable=False, exclusive=False, auto_delete=True)
expub = PublishConfirm(conn, exch, queue)
# Start the object thread, give it a moment...
expub.start()
time.sleep(1)

while True:
try:
msg = input()
key = 'publish.confirm.test'
expub.publish_message(msg, key)
except Exception as e: # pylint: disable=broad-exception-caught
logger.info('Exiting from test loop : %s', str(e))
break
expub.stop()
logger.info('Stopping...')


if __name__ == '__main__':
main()
8 changes: 4 additions & 4 deletions python/idsse_common/test/test_grid_proj.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
# ----------------------------------------------------------------------------------
# Created on Wed Aug 2 2023
#
# Copyright (c) 2023 Colorado State University. All rights reserved. (1)
# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (2)
# Copyright (c) 2023 Regents of the University of Colorado. All rights reserved. (1)
# Copyright (c) 2023 Colorado State University. All rights reserved. (2)
#
# Contributors:
# Mackenzie Grimes (1)
# Geary Layne (2)
# Mackenzie Grimes (2)
# Geary Layne (1)
#
# ----------------------------------------------------------------------------------
# pylint: disable=missing-function-docstring,redefined-outer-name,invalid-name,protected-access
Expand Down
Loading
Loading