Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: idsse-897 rabbitmq_utils Rpc #87

Merged
merged 31 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
429ebab
add optional channel arg to Publisher to reuse existing RMQ Channel
mackenzie-grimes-noaa Nov 18, 2024
45a2536
correct args in method docstring
mackenzie-grimes-noaa Nov 18, 2024
32fd903
Update python/idsse_common/idsse/common/rabbitmq_utils.py
mackenzie-grimes-noaa Nov 19, 2024
7f4ac64
add rpc class to rabbitmq_utils
mackenzie-grimes-noaa Nov 19, 2024
37828aa
do away with setup() method and channel kwarg, improve docstrings
mackenzie-grimes-noaa Nov 19, 2024
eafa939
improve error message
mackenzie-grimes-noaa Nov 19, 2024
adba72f
Merge branch 'feat/publisher-channel' into feat/idsse-897/rabbitmq-rpc
mackenzie-grimes-noaa Nov 19, 2024
0207f5e
seeing pika error due to Publisher reusing Consumer's rmq channel
mackenzie-grimes-noaa Nov 20, 2024
cda6a21
switching which client (publisher/consumer) creates the shared RMQ ch…
mackenzie-grimes-noaa Nov 20, 2024
857fe40
drop Consumer/Publisher support of reusing channel, move _publish and…
mackenzie-grimes-noaa Nov 22, 2024
ca9cdb2
copy unit tests from RiskProcessor Rpc; to repurpose
mackenzie-grimes-noaa Nov 22, 2024
17541e8
add protections for Publisher/Consumer using default exch or queue
mackenzie-grimes-noaa Dec 2, 2024
2f69b80
fix spacing
mackenzie-grimes-noaa Dec 2, 2024
a25c777
merge feat/idsse-897/publisher-refactor to this branch
mackenzie-grimes-noaa Dec 2, 2024
f375ef5
revert simple_publisher unit test; it no longer supports re-using cha…
mackenzie-grimes-noaa Dec 2, 2024
342a59f
restore accidentally deleted Rpc class, unit test
mackenzie-grimes-noaa Dec 2, 2024
bac2e00
add some basic unit tests for Rpc, mocking away mutithread Consumer
mackenzie-grimes-noaa Dec 2, 2024
bfe4023
lots of complex pika thread mocking, but unit test passes
mackenzie-grimes-noaa Dec 2, 2024
f254e3b
bug fix: stop tracking Future for timed out/errored RPC requests
mackenzie-grimes-noaa Dec 2, 2024
ff18321
rebuild Rpc unit tests for error cases
mackenzie-grimes-noaa Dec 2, 2024
f6b74fd
Rpc: mock higher up in threadsafe chain
mackenzie-grimes-noaa Dec 3, 2024
80e5fd3
cleanup todo
mackenzie-grimes-noaa Dec 3, 2024
0646b8b
Update rabbitmq_utils.py
Geary-Layne Dec 3, 2024
85d0ee3
fix linter warning
mackenzie-grimes-noaa Dec 3, 2024
7ca549a
Merge pull request #89 from NOAA-GSL/Geary-Layne-patch-1
Geary-Layne Dec 3, 2024
73c70fc
re-order rabbitmq_utils classes and functions
mackenzie-grimes-noaa Dec 3, 2024
6874206
reorder public and private functions
mackenzie-grimes-noaa Dec 3, 2024
d5bd2b1
Merge branch 'feat/idsse-897/publisher-refactor' into feat/idsse-897/…
mackenzie-grimes-noaa Dec 3, 2024
1d7edcf
Merge branch 'main' into feat/idsse-897/rabbitmq-rpc
mackenzie-grimes-noaa Dec 4, 2024
d9ccee8
Update python/idsse_common/idsse/common/rabbitmq_utils.py
mackenzie-grimes-noaa Dec 6, 2024
9f28b7b
remove duplicative RpcResponse data class
mackenzie-grimes-noaa Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 140 additions & 1 deletion python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import logging.config
import uuid

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future, ThreadPoolExecutor
from collections.abc import Callable
from functools import partial
from threading import Event, Thread
Expand Down Expand Up @@ -98,6 +98,12 @@ class PublishMessageParams(NamedTuple):
route_key: str | None = None


class RpcResponse(NamedTuple):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this class in addition to PublishMessageParams?

Copy link
Contributor Author

@mackenzie-grimes-noaa mackenzie-grimes-noaa Dec 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically no, but PublishMessageParams is used for outbound RMQ messages and RpcResponse was originally written for inbound responses. I just don't know what to name the class if it's used for both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "RabbitMqMessage"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That works, or RmqMessage if you want shorter.

"""RabbitMQ response that came back from the recipient client of an RPC request."""
body: str
properties: BasicProperties


class Consumer(Thread):
"""
RabbitMQ consumer, runs in own thread to not block heartbeat. A thread pool
Expand Down Expand Up @@ -287,6 +293,139 @@ def stop(self):
self.connection.close)


class Rpc:
"""
RabbitMQ RPC (remote procedure call) client, runs in own thread to not block heartbeat.
The start() and stop() methods should be called from the same thread that created the instance.

This RPC class can be used to send "requests" (outbound messages) over RabbitMQ and block until
a "response" (inbound message) comes back from the receiving app. All producing to/consuming of
different queues, and associating requests with their responses, is abstracted away.

Note that RPC by RabbitMQ convention uses the built-in Direct Reply-To queue to field the
responses messages, rather than creating its own queue. Directing responses to a custom queue
is not yet supported by Rpc.

Example usage:

my_client = RpcClient(...insert params here...)

response = my_client.send_message('{"some": "json"}') # blocks while waiting for response

logger.info(f'Response from external service: {response}')
"""
def __init__(self, conn_params: Conn, exch: Exch, timeout: float | None = None):
"""
Args:
conn_params (Conn): parameters to connect to RabbitMQ server
exch (Exch): parameters of RMQ Exchange where messages should be sent
timeout (float | None): optional timeout to give up on receiving each response.
mackenzie-grimes-noaa marked this conversation as resolved.
Show resolved Hide resolved
Default is None, meaning wait indefinitely for response from external RMQ service.
"""
self._exch = exch
self._timeout = timeout
# only publish to built-in Direct Reply-to queue (recommended for RPC, less setup needed)
self._queue = Queue(DIRECT_REPLY_QUEUE, '', True, False, False)

# worklist to track corr_ids sent to remote service, and associated response when it arrives
self._pending_requests: dict[str, Future] = {}

# Start long-running thread to consume any messages from response queue
self.consumer = Consumer(
conn_params,
RabbitMqParamsAndCallback(RabbitMqParams(Exch('', 'direct'), self._queue),
self._response_callback)
)

@property
def is_open(self) -> bool:
"""Returns True if RabbitMQ connection (Publisher) is open and ready to send messages"""
return self.consumer.is_alive() and self.consumer.channel.is_open

def send_request(self, request_body: str | bytes) -> RpcResponse | None:
"""Send message to remote RabbitMQ service using thread-safe RPC. Will block until response
is received back, or timeout occurs.

Returns:
RpcResponse | None: The response message (body and properties), or None on request
timeout or error handling response.
"""
if not self.is_open:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that this gets handled

logger.debug('RPC thread not yet initialized. Setting up now')
self.start()

# generate unique ID to associate our request to external service's response
request_id = str(uuid.uuid4())

# send request to external RMQ service, providing the queue where it should respond
properties = BasicProperties(content_type='application/json',
correlation_id=request_id,
reply_to=self._queue.name)

# add future to dict where callback can retrieve it and set result
request_future = Future()
self._pending_requests[request_id] = request_future

logger.debug('Publishing request message to external service with body: %s', request_body)
_blocking_publish(self.consumer.channel,
self._exch,
PublishMessageParams(request_body, properties, self._exch.route_key),
self._queue)

try:
# block until callback runs (we'll know when the future's result has been changed)
return request_future.result(timeout=self._timeout)
except TimeoutError:
logger.warning('Timed out waiting for response. correlation_id: %s', request_id)
self._pending_requests.pop(request_id) # stop tracking request Future
return None
except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning('Unexpected response from external service: %s', str(exc))
self._pending_requests.pop(request_id) # stop tracking request Future
return None

def start(self):
"""Start dedicated threads to asynchronously send and receive RPC messages using a new
RabbitMQ connection and channel. Note: this method can be called externally, but it is
not required to use the client. It will automatically call this internally as needed."""
if not self.is_open:
logger.debug('Starting RPC thread to send and consume messages')
self.consumer.start()

def stop(self):
"""Unsubscribe to Direct Reply-To queue and cleanup thread"""
logger.debug('Shutting down RPC threads')
if not self.is_open:
logger.debug('RPC threads not running, nothing to cleanup')
return

# tell Consumer cleanup RabbitMQ resources and wait for thread to terminate
self.consumer.stop()
self.consumer.join()

def _response_callback(
self,
channel: Channel,
method: Basic.Deliver,
properties: BasicProperties,
body: bytes
):
"""Handle RabbitMQ message emitted to response queue."""
logger.debug('Received response message with routing_key: %s, content_type: %s, message: %i',
method.routing_key, properties.content_type, str(body, encoding='utf-8'))

# remove future from pending list. we will update result shortly
request_future = self._pending_requests.pop(properties.correlation_id)

# messages sent through RabbitMQ Direct reply-to are auto acked
is_direct_reply = str(method.routing_key).startswith(DIRECT_REPLY_QUEUE)
if not is_direct_reply:
channel.basic_ack(delivery_tag=method.delivery_tag)

# update future with response body to communicate it back up to main thread
return request_future.set_result(RpcResponse(str(body, encoding='utf-8'), properties))


def subscribe_to_queue(
connection: Conn | BlockingConnection,
rmq_params: RabbitMqParams,
Expand Down
2 changes: 0 additions & 2 deletions python/idsse_common/idsse/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def exec_cmd(commands: Sequence[str], timeout: int | None = None) -> Sequence[st

def to_iso(date_time: datetime) -> str:
"""Format a datetime instance to an ISO string"""
logger.debug('Datetime (%s) to iso', datetime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've wondered if these need to be here a few times.

return (f'{date_time.strftime("%Y-%m-%dT%H:%M")}:'
f'{(date_time.second + date_time.microsecond / 1e6):06.3f}'
'Z' if date_time.tzname() in [None, str(timezone.utc)]
Expand All @@ -126,7 +125,6 @@ def to_iso(date_time: datetime) -> str:

def to_compact(date_time: datetime) -> str:
"""Format a datetime instance to a compact string"""
logger.debug('Datetime (%s) to compact -- %s', datetime, __name__)
return date_time.strftime('%Y%m%d%H%M%S')


Expand Down
Loading
Loading