Skip to content

Commit

Permalink
Support configuring the AMQP heartbeat interval, or disabling heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
natefoo committed Mar 26, 2024
1 parent 9eb6a8e commit d340cdd
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 2 deletions.
4 changes: 4 additions & 0 deletions app.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@
## this value is used as the timeout argument to the producer.publish function.
#amqp_publish_timeout: 2.0

# AMQP heartbeat interval (see: https://www.rabbitmq.com/docs/heartbeats). Set
# to false or 0 to disable heartbeats.
#amqp_heartbeat: 580

# AMQP does not guarantee that a published message is received by the AMQP
# server, so Pulsar can request that the consumer acknowledge messages and will
# resend them if acknowledgement is not received after a configurable timeout
Expand Down
5 changes: 5 additions & 0 deletions docs/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ In the event that the connection to the AMQP server is lost during message
publish, the Pulsar server can retry the connection, governed by the
``amqp_publish*`` options documented in `app.yml.sample`_.

By default, `AMQP heartbeats`_ are enabled at an interval of 580 seconds. This
can be adjusted by setting ``amqp_heartbeat`` or disabled by setting it to
``false`` or ``0``.

Caching (Experimental)
----------------------

Expand All @@ -241,3 +245,4 @@ and future plans and progress can be tracked on `this Trello card <https://trell
.. _RabbitMQ: https://www.rabbitmq.com/
.. _app.yml.sample: https://github.com/galaxyproject/pulsar/blob/master/app.yml.sample
.. _Two Generals Problem: https://en.wikipedia.org/wiki/Two_Generals%27_Problem
.. _AMQP heartbeats: https://www.rabbitmq.com/docs/heartbeats
9 changes: 7 additions & 2 deletions pulsar/client/amqp_exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(
publish_uuid_store=None,
consume_uuid_store=None,
republish_time=DEFAULT_REPUBLISH_TIME,
heartbeat=DEFAULT_HEARTBEAT,
):
"""
"""
Expand Down Expand Up @@ -102,6 +103,7 @@ def __init__(
self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE)
self.__timeout = timeout
self.__republish_time = republish_time
self.__heartbeat = heartbeat
# Be sure to log message publishing failures.
if publish_kwds.get("retry", False):
if "retry_policy" not in publish_kwds:
Expand Down Expand Up @@ -137,10 +139,13 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}):
callbacks.append(callback)
while check:
heartbeat_thread = None
if self.__heartbeat:
connection_kwargs["heartbeat"] = self.__heartbeat
try:
with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection:
with self.connection(self.__url, **connection_kwargs) as connection:
with kombu.Consumer(connection, queues=[queue], callbacks=callbacks, accept=['json']):
heartbeat_thread = self.__start_heartbeat(queue_name, connection)
if self.__heartbeat:
heartbeat_thread = self.__start_heartbeat(queue_name, connection)
while check and connection.connected:
try:
connection.drain_events(timeout=self.__timeout)
Expand Down
2 changes: 2 additions & 0 deletions pulsar/client/amqp_exchange_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ def get_exchange(url, manager_name, params):
)
if params.get('amqp_acknowledge', False):
exchange_kwds.update(parse_ack_kwds(params, manager_name))
if params.get("amqp_heartbeat") is not None:
exchange_kwds["heartbeat"] = params.get("amqp_heartbeat")
timeout = params.get('amqp_consumer_timeout', False)
if timeout is not False:
exchange_kwds['timeout'] = timeout
Expand Down

0 comments on commit d340cdd

Please sign in to comment.