Skip to content

Commit

Permalink
Revert "Try calling update_repeater() synchronously"
Browse files (browse the repository at this point in the history)
That was a bad idea. As the Celery documentation warns:

> Avoid launching synchronous subtasks
>
> Having a task wait for the result of another task is really inefficient, and
> may even cause a deadlock if the worker pool is exhausted.
>
> -- https://docs.celeryq.dev/en/stable/userguide/tasks.html#task-synchronous-subtasks

This reverts commit 55a81e5.
  • Loading branch information
kaapstorm committed Jan 10, 2025
1 parent 55a81e5 commit b1dcbbd
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
15 changes: 7 additions & 8 deletions corehq/motech/repeaters/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@

from django.conf import settings

from celery import group
from celery import chord
from celery.schedules import crontab
from celery.utils.log import get_task_logger

Expand Down Expand Up @@ -426,11 +426,8 @@ def get_task_signature(repeat_record):

repeater = Repeater.objects.get(domain=domain, id=repeater_id)
repeat_records = repeater.repeat_records_ready[:repeater.num_workers]
# Rather than using a Celery chord, process repeat records and then
# call `update_repeater()` synchronously to unlock repeater immediately.
task_group = group(get_task_signature(rr) for rr in repeat_records)
repeat_record_states = task_group().get()
update_repeater(repeat_record_states, repeater, lock_token)
header_tasks = [get_task_signature(rr) for rr in repeat_records]
chord(header_tasks)(update_repeater.s(repeater_id, lock_token))


@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE)
Expand Down Expand Up @@ -518,7 +515,8 @@ def _get_wait_duration_seconds(repeat_record):
return int(wait_duration.total_seconds())


def update_repeater(repeat_record_states, repeater, lock_token):
@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def update_repeater(repeat_record_states, repeater_id, lock_token):
"""
Determines whether the repeater should back off, based on the
results of ``_process_repeat_record()`` tasks.
Expand All @@ -528,6 +526,7 @@ def update_repeater(repeat_record_states, repeater, lock_token):
# We can't tell anything about the remote endpoint.
return
success_or_invalid = (State.Success, State.InvalidPayload)
repeater = Repeater.objects.get(id=repeater_id)
if any(s in success_or_invalid for s in repeat_record_states):
# The remote endpoint appears to be healthy.
repeater.reset_backoff()
Expand All @@ -539,7 +538,7 @@ def update_repeater(repeat_record_states, repeater, lock_token):
)
repeater.set_backoff()
finally:
lock = get_repeater_lock(repeater.repeater_id)
lock = get_repeater_lock(repeater_id)
lock.local.token = lock_token
lock.release()

Expand Down
30 changes: 20 additions & 10 deletions corehq/motech/repeaters/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,51 +517,61 @@ def test_process_repeater_updates_repeater(self):
class TestUpdateRepeater(SimpleTestCase):

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
def test_update_repeater_resets_backoff_on_success(self, __):
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_resets_backoff_on_success(self, mock_get_repeater, __):
repeat_record_states = [State.Success, State.Fail, State.Empty, None]

mock_repeater = MagicMock()
update_repeater(repeat_record_states, mock_repeater, 'token')
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')

mock_repeater.set_backoff.assert_not_called()
mock_repeater.reset_backoff.assert_called_once()

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
def test_update_repeater_resets_backoff_on_invalid(self, __):
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_resets_backoff_on_invalid(self, mock_get_repeater, __):
repeat_record_states = [State.InvalidPayload, State.Fail, State.Empty, None]

mock_repeater = MagicMock()
update_repeater(repeat_record_states, mock_repeater, 'token')
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')

mock_repeater.set_backoff.assert_not_called()
mock_repeater.reset_backoff.assert_called_once()

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
def test_update_repeater_sets_backoff_on_failure(self, __):
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_sets_backoff_on_failure(self, mock_get_repeater, __):
repeat_record_states = [State.Fail, State.Empty, None]

mock_repeater = MagicMock()
update_repeater(repeat_record_states, mock_repeater, 'token')
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')

mock_repeater.set_backoff.assert_called_once()
mock_repeater.reset_backoff.assert_not_called()

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
def test_update_repeater_does_nothing_on_empty(self, __):
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_does_nothing_on_empty(self, mock_get_repeater, __):
repeat_record_states = [State.Empty]

mock_repeater = MagicMock()
update_repeater(repeat_record_states, mock_repeater, 'token')
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')

mock_repeater.set_backoff.assert_not_called()
mock_repeater.reset_backoff.assert_not_called()

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
def test_update_repeater_does_nothing_on_none(self, __):
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_does_nothing_on_none(self, mock_get_repeater, __):
repeat_record_states = [None]

mock_repeater = MagicMock()
update_repeater(repeat_record_states, mock_repeater, 'token')
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')

mock_repeater.set_backoff.assert_not_called()
mock_repeater.reset_backoff.assert_not_called()
Expand Down

0 comments on commit b1dcbbd

Please sign in to comment.