Skip to content

Commit

Permalink
Try calling update_repeater() synchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
kaapstorm committed Jan 10, 2025
1 parent 0e5c5ec commit 55a81e5
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 27 deletions.
15 changes: 8 additions & 7 deletions corehq/motech/repeaters/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@

from django.conf import settings

from celery import chord
from celery import group
from celery.schedules import crontab
from celery.utils.log import get_task_logger

Expand Down Expand Up @@ -426,8 +426,11 @@ def get_task_signature(repeat_record):

repeater = Repeater.objects.get(domain=domain, id=repeater_id)
repeat_records = repeater.repeat_records_ready[:repeater.num_workers]
header_tasks = [get_task_signature(rr) for rr in repeat_records]
chord(header_tasks)(update_repeater.s(repeater_id, lock_token))
# Rather than using a Celery chord, process repeat records and then
# call `update_repeater()` synchronously to unlock repeater immediately.
task_group = group(get_task_signature(rr) for rr in repeat_records)
repeat_record_states = task_group().get()
update_repeater(repeat_record_states, repeater, lock_token)


@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE)
Expand Down Expand Up @@ -515,8 +518,7 @@ def _get_wait_duration_seconds(repeat_record):
return int(wait_duration.total_seconds())


@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE)
def update_repeater(repeat_record_states, repeater_id, lock_token):
def update_repeater(repeat_record_states, repeater, lock_token):
"""
Determines whether the repeater should back off, based on the
results of ``_process_repeat_record()`` tasks.
Expand All @@ -526,7 +528,6 @@ def update_repeater(repeat_record_states, repeater_id, lock_token):
# We can't tell anything about the remote endpoint.
return
success_or_invalid = (State.Success, State.InvalidPayload)
repeater = Repeater.objects.get(id=repeater_id)
if any(s in success_or_invalid for s in repeat_record_states):
# The remote endpoint appears to be healthy.
repeater.reset_backoff()
Expand All @@ -538,7 +539,7 @@ def update_repeater(repeat_record_states, repeater_id, lock_token):
)
repeater.set_backoff()
finally:
lock = get_repeater_lock(repeater_id)
lock = get_repeater_lock(repeater.repeater_id)
lock.local.token = lock_token
lock.release()

Expand Down
30 changes: 10 additions & 20 deletions corehq/motech/repeaters/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,61 +517,51 @@ def test_process_repeater_updates_repeater(self):
class TestUpdateRepeater(SimpleTestCase):

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_resets_backoff_on_success(self, mock_get_repeater, __):
def test_update_repeater_resets_backoff_on_success(self, __):
repeat_record_states = [State.Success, State.Fail, State.Empty, None]

mock_repeater = MagicMock()
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')
update_repeater(repeat_record_states, mock_repeater, 'token')

mock_repeater.set_backoff.assert_not_called()
mock_repeater.reset_backoff.assert_called_once()

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_resets_backoff_on_invalid(self, mock_get_repeater, __):
def test_update_repeater_resets_backoff_on_invalid(self, __):
repeat_record_states = [State.InvalidPayload, State.Fail, State.Empty, None]

mock_repeater = MagicMock()
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')
update_repeater(repeat_record_states, mock_repeater, 'token')

mock_repeater.set_backoff.assert_not_called()
mock_repeater.reset_backoff.assert_called_once()

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_sets_backoff_on_failure(self, mock_get_repeater, __):
def test_update_repeater_sets_backoff_on_failure(self, __):
repeat_record_states = [State.Fail, State.Empty, None]

mock_repeater = MagicMock()
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')
update_repeater(repeat_record_states, mock_repeater, 'token')

mock_repeater.set_backoff.assert_called_once()
mock_repeater.reset_backoff.assert_not_called()

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_does_nothing_on_empty(self, mock_get_repeater, __):
def test_update_repeater_does_nothing_on_empty(self, __):
repeat_record_states = [State.Empty]

mock_repeater = MagicMock()
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')
update_repeater(repeat_record_states, mock_repeater, 'token')

mock_repeater.set_backoff.assert_not_called()
mock_repeater.reset_backoff.assert_not_called()

@patch('corehq.motech.repeaters.tasks.get_repeater_lock')
@patch('corehq.motech.repeaters.tasks.Repeater.objects.get')
def test_update_repeater_does_nothing_on_none(self, mock_get_repeater, __):
def test_update_repeater_does_nothing_on_none(self, __):
repeat_record_states = [None]

mock_repeater = MagicMock()
mock_get_repeater.return_value = mock_repeater
update_repeater(repeat_record_states, 1, 'token')
update_repeater(repeat_record_states, mock_repeater, 'token')

mock_repeater.set_backoff.assert_not_called()
mock_repeater.reset_backoff.assert_not_called()
Expand Down

0 comments on commit 55a81e5

Please sign in to comment.