diff --git a/corehq/motech/repeaters/tasks.py b/corehq/motech/repeaters/tasks.py index be9fb7993138..1a003cedb064 100644 --- a/corehq/motech/repeaters/tasks.py +++ b/corehq/motech/repeaters/tasks.py @@ -78,7 +78,7 @@ from django.conf import settings -from celery import group +from celery import chord from celery.schedules import crontab from celery.utils.log import get_task_logger @@ -426,11 +426,8 @@ def get_task_signature(repeat_record): repeater = Repeater.objects.get(domain=domain, id=repeater_id) repeat_records = repeater.repeat_records_ready[:repeater.num_workers] - # Rather than using a Celery chord, process repeat records and then - # call `update_repeater()` synchronously to unlock repeater immediately. - task_group = group(get_task_signature(rr) for rr in repeat_records) - repeat_record_states = task_group().get() - update_repeater(repeat_record_states, repeater, lock_token) + header_tasks = [get_task_signature(rr) for rr in repeat_records] + chord(header_tasks)(update_repeater.s(repeater_id, lock_token)) @task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) @@ -518,7 +515,8 @@ def _get_wait_duration_seconds(repeat_record): return int(wait_duration.total_seconds()) -def update_repeater(repeat_record_states, repeater, lock_token): +@task(queue=settings.CELERY_REPEAT_RECORD_QUEUE) +def update_repeater(repeat_record_states, repeater_id, lock_token): """ Determines whether the repeater should back off, based on the results of ``_process_repeat_record()`` tasks. @@ -528,6 +526,7 @@ def update_repeater(repeat_record_states, repeater, lock_token): # We can't tell anything about the remote endpoint. return success_or_invalid = (State.Success, State.InvalidPayload) + repeater = Repeater.objects.get(id=repeater_id) if any(s in success_or_invalid for s in repeat_record_states): # The remote endpoint appears to be healthy. repeater.reset_backoff() @@ -539,7 +538,7 @@ def update_repeater(repeat_record_states, repeater, lock_token): ) repeater.set_backoff() finally: - lock = get_repeater_lock(repeater.repeater_id) + lock = get_repeater_lock(repeater_id) lock.local.token = lock_token lock.release() diff --git a/corehq/motech/repeaters/tests/test_tasks.py b/corehq/motech/repeaters/tests/test_tasks.py index 435e6dc27748..17eb0561ebda 100644 --- a/corehq/motech/repeaters/tests/test_tasks.py +++ b/corehq/motech/repeaters/tests/test_tasks.py @@ -517,51 +517,61 @@ def test_process_repeater_updates_repeater(self): class TestUpdateRepeater(SimpleTestCase): @patch('corehq.motech.repeaters.tasks.get_repeater_lock') - def test_update_repeater_resets_backoff_on_success(self, __): + @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_resets_backoff_on_success(self, mock_get_repeater, __): repeat_record_states = [State.Success, State.Fail, State.Empty, None] mock_repeater = MagicMock() - update_repeater(repeat_record_states, mock_repeater, 'token') + mock_get_repeater.return_value = mock_repeater + update_repeater(repeat_record_states, 1, 'token') mock_repeater.set_backoff.assert_not_called() mock_repeater.reset_backoff.assert_called_once() @patch('corehq.motech.repeaters.tasks.get_repeater_lock') - def test_update_repeater_resets_backoff_on_invalid(self, __): + @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_resets_backoff_on_invalid(self, mock_get_repeater, __): repeat_record_states = [State.InvalidPayload, State.Fail, State.Empty, None] mock_repeater = MagicMock() - update_repeater(repeat_record_states, mock_repeater, 'token') + mock_get_repeater.return_value = mock_repeater + update_repeater(repeat_record_states, 1, 'token') mock_repeater.set_backoff.assert_not_called() mock_repeater.reset_backoff.assert_called_once() @patch('corehq.motech.repeaters.tasks.get_repeater_lock') - def test_update_repeater_sets_backoff_on_failure(self, __): + @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_sets_backoff_on_failure(self, mock_get_repeater, __): repeat_record_states = [State.Fail, State.Empty, None] mock_repeater = MagicMock() - update_repeater(repeat_record_states, mock_repeater, 'token') + mock_get_repeater.return_value = mock_repeater + update_repeater(repeat_record_states, 1, 'token') mock_repeater.set_backoff.assert_called_once() mock_repeater.reset_backoff.assert_not_called() @patch('corehq.motech.repeaters.tasks.get_repeater_lock') - def test_update_repeater_does_nothing_on_empty(self, __): + @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_does_nothing_on_empty(self, mock_get_repeater, __): repeat_record_states = [State.Empty] mock_repeater = MagicMock() - update_repeater(repeat_record_states, mock_repeater, 'token') + mock_get_repeater.return_value = mock_repeater + update_repeater(repeat_record_states, 1, 'token') mock_repeater.set_backoff.assert_not_called() mock_repeater.reset_backoff.assert_not_called() @patch('corehq.motech.repeaters.tasks.get_repeater_lock') - def test_update_repeater_does_nothing_on_none(self, __): + @patch('corehq.motech.repeaters.tasks.Repeater.objects.get') + def test_update_repeater_does_nothing_on_none(self, mock_get_repeater, __): repeat_record_states = [None] mock_repeater = MagicMock() - update_repeater(repeat_record_states, mock_repeater, 'token') + mock_get_repeater.return_value = mock_repeater + update_repeater(repeat_record_states, 1, 'token') mock_repeater.set_backoff.assert_not_called() mock_repeater.reset_backoff.assert_not_called()