check_repeaters() task iterates repeaters #34946

Closed. Wants to merge 39 commits.

Changes from 18 commits.

Commits (39)
5a0e77a
Drop soft asserts
kaapstorm Aug 4, 2024
127912c
Add TODOs
kaapstorm Aug 3, 2024
f2b3aa0
`check_repeaters()` uses `iter_ready_repeaters()`
kaapstorm Aug 3, 2024
1267f51
Test `iter_ready_repeaters()`
kaapstorm Aug 3, 2024
1714f62
`process_repeater()` task
kaapstorm Aug 3, 2024
c6f603b
Update tests
kaapstorm Aug 4, 2024
3d40f91
Add tests
kaapstorm Aug 4, 2024
3d4e4cd
TestIterReadyRepeater extends SimpleTestCase
kaapstorm Aug 6, 2024
d90a5c1
Fix TestUpdateRepeater
kaapstorm Aug 6, 2024
86f1cce
Explain the purpose of MAX_REPEATER_WORKERS
kaapstorm Aug 6, 2024
3315c45
Update TestAppStructureRepeater test
kaapstorm Aug 6, 2024
7c4e3cf
Fix repeater locking
kaapstorm Aug 6, 2024
a916c34
Drop repeater lock
kaapstorm Aug 6, 2024
bb17648
Don't retry payload errors
kaapstorm Aug 7, 2024
3047c08
Fix `RepeatRecord.count_overdue()`
kaapstorm Aug 7, 2024
5370a17
Some client errors need a retry
kaapstorm Aug 8, 2024
f2f50e9
Polling overdue every 5 minutes seems enough
kaapstorm Aug 9, 2024
976d297
Improve query by filtering domains
kaapstorm Aug 9, 2024
a922568
Add metric for one loop over repeaters
kaapstorm Aug 9, 2024
1bdca83
Merge branch 'master' into nh/iter_repeaters
kaapstorm Aug 9, 2024
6458b3e
Index filtered fields, add `max_workers`
kaapstorm Aug 9, 2024
369c96e
A partial implementation of a lock using Postgres
kaapstorm Aug 9, 2024
74e6676
Lock repeater
kaapstorm Aug 9, 2024
5e4fbca
Add InvalidPayload state
kaapstorm Aug 10, 2024
5fe6d3c
Make repeat_record.state explicit
kaapstorm Aug 10, 2024
407d60d
Fix tests
kaapstorm Aug 14, 2024
7a02d6a
Revert "Lock repeater"
kaapstorm Aug 13, 2024
62369a4
Revert "A partial implementation of a lock using Postgres"
kaapstorm Aug 13, 2024
2bb7c59
Use Redis Lock
kaapstorm Aug 13, 2024
7d5abeb
Round-robin repeaters by domain
kaapstorm Aug 12, 2024
b8543a8
Skip rate-limited repeaters
kaapstorm Aug 12, 2024
2600e94
Make response status clearer
kaapstorm Aug 14, 2024
1b0306a
Use automatic rollback
kaapstorm Aug 14, 2024
d333eeb
Being locked out is not a problem
kaapstorm Aug 13, 2024
6b79a5e
Drop `attempt_forward_now()`
kaapstorm Aug 12, 2024
61a0a00
Drop `fire_synchronously` param
kaapstorm Aug 12, 2024
04c8177
Drop `send_request()`
kaapstorm Aug 12, 2024
f586d2c
Drop `get_payload()`
kaapstorm Aug 12, 2024
eb1f424
fixup! Index filtered fields, add `max_workers`
kaapstorm Aug 14, 2024
17 changes: 0 additions & 17 deletions corehq/apps/app_manager/tests/test_repeater.py
@@ -3,8 +3,6 @@
from django.test import TestCase
from django.test.client import Client

from unittest.mock import patch

from corehq.apps.accounting.models import SoftwarePlanEdition
from corehq.apps.accounting.tests.utils import DomainSubscriptionMixin
from corehq.apps.accounting.utils import clear_plan_version_cache
@@ -64,18 +62,3 @@ def test_repeat_record_created(self):
later = datetime.utcnow() + timedelta(hours=48 + 1)
repeat_records = RepeatRecord.objects.filter(domain=self.domain, next_check__lt=later)
self.assertEqual(len(repeat_records), 1)

def test_repeat_record_forwarded(self):
"""
When an application with a repeater is saved, then HQ should try to forward the repeat record
"""
self.app_structure_repeater = AppStructureRepeater(domain=self.domain, connection_settings=self.conn)
self.app_structure_repeater.save()
self.addCleanup(self.app_structure_repeater.delete)

with patch('corehq.motech.repeaters.models.simple_request') as mock_fire:
self.application = Application(domain=self.domain)
self.application.save()
self.addCleanup(self.application.delete)

self.assertEqual(mock_fire.call_count, 1)
8 changes: 7 additions & 1 deletion corehq/motech/repeaters/const.py
@@ -8,12 +8,18 @@
MIN_RETRY_WAIT = timedelta(minutes=60)
RATE_LIMITER_DELAY_RANGE = (timedelta(minutes=0), timedelta(minutes=15))
CHECK_REPEATERS_INTERVAL = timedelta(minutes=5)
CHECK_REPEATERS_PARTITION_COUNT = settings.CHECK_REPEATERS_PARTITION_COUNT
CHECK_REPEATERS_PARTITION_COUNT = settings.CHECK_REPEATERS_PARTITION_COUNT # TODO: Drop
CHECK_REPEATERS_KEY = 'check-repeaters-key'
# Number of attempts to an online endpoint before cancelling payload
MAX_ATTEMPTS = 3
# Number of exponential backoff attempts to an offline endpoint
MAX_BACKOFF_ATTEMPTS = 6
# The maximum number of workers that one repeater can use to send repeat
# records at the same time. (In other words, HQ's capacity to DDOS
# attack a remote API endpoint.) This is a guardrail to prevent one
# domain from hogging repeat record queue workers and to ensure that
# repeaters are iterated fairly.
MAX_REPEATER_WORKERS = 7


class State(IntegerChoices):
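An editorial aside, not part of the diff: the guardrail described above is meant to combine with the per-repeater max_workers field that the TODOs in models.py anticipate. A minimal sketch of that clamping, assuming a max_workers integer field with a default of 1 (the helper name is hypothetical):

```python
from corehq.motech.repeaters.const import MAX_REPEATER_WORKERS

def effective_worker_count(repeater):
    # Hypothetical helper: clamp the per-repeater setting (the assumed
    # `max_workers` field) to the global guardrail, so that one repeater
    # cannot monopolise the repeat record queue workers.
    requested = getattr(repeater, 'max_workers', 1) or 1
    return min(requested, MAX_REPEATER_WORKERS)
```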
@@ -527,7 +527,8 @@ def create_case_repeater_register(repeater, domain, payload):
'domain': domain,
'doc_type': repeater.repeater_type
})
repeat_record.attempt_forward_now()
# TODO: No, the repeater sends the repeat record.
# repeat_record.attempt_forward_now()
return repeat_record


130 changes: 103 additions & 27 deletions corehq/motech/repeaters/models.py
@@ -65,10 +65,12 @@
"""
import inspect
import json
import random
import traceback
import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from http import HTTPStatus
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

@@ -92,6 +94,7 @@

from corehq import toggles
from corehq.apps.accounting.utils import domain_has_privilege
from corehq.apps.domain.models import Domain
from corehq.apps.locations.models import SQLLocation
from corehq.apps.users.models import CommCareUser
from corehq.form_processor.exceptions import XFormNotFound
@@ -122,8 +125,10 @@
from .const import (
MAX_ATTEMPTS,
MAX_BACKOFF_ATTEMPTS,
MAX_REPEATER_WORKERS,
MAX_RETRY_WAIT,
MIN_RETRY_WAIT,
RATE_LIMITER_DELAY_RANGE,
State,
)
from .exceptions import RequestConnectionError, UnknownRepeater
@@ -220,6 +225,7 @@ def all_ready(self):
"""
Return all Repeaters ready to be forwarded.
"""
domains = get_domains_forwarding_enabled()
not_paused = models.Q(is_paused=False)
next_attempt_not_in_the_future = (
models.Q(next_attempt_at__isnull=True)
@@ -228,10 +234,13 @@
repeat_records_ready_to_send = models.Q(
repeat_records__state__in=(State.Pending, State.Fail)
)
return (self.get_queryset()
.filter(not_paused)
.filter(next_attempt_not_in_the_future)
.filter(repeat_records_ready_to_send))
return (
self.get_queryset()
.filter(domain__in=domains)
.filter(not_paused)
.filter(next_attempt_not_in_the_future)
.filter(repeat_records_ready_to_send)
)

def get_queryset(self):
repeater_obj = self.model()
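The commit list mentions iter_ready_repeaters() and round-robin iteration by domain, but that code isn't part of this excerpt. As a rough, hypothetical sketch of how all_ready() might feed a fair iteration (the function name and grouping are assumptions, not the PR's implementation):

```python
from collections import defaultdict
from itertools import zip_longest

def iter_ready_repeaters():
    # Hypothetical: group ready repeaters by domain, then round-robin
    # across domains so one busy domain cannot starve the others.
    # (Repeater is the model defined in this module.)
    by_domain = defaultdict(list)
    for repeater in Repeater.objects.all_ready():
        by_domain[repeater.domain].append(repeater)
    for batch in zip_longest(*by_domain.values()):
        yield from (repeater for repeater in batch if repeater is not None)
```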
@@ -258,6 +267,7 @@ class Repeater(RepeaterSuperProxy):
is_paused = models.BooleanField(default=False)
next_attempt_at = models.DateTimeField(null=True, blank=True)
last_attempt_at = models.DateTimeField(null=True, blank=True)
# TODO: max_workers = models.IntegerField(default=1)
options = JSONField(default=dict)
connection_settings_id = models.IntegerField(db_index=True)
is_deleted = models.BooleanField(default=False, db_index=True)
@@ -348,21 +358,19 @@ def _repeater_type(cls):

@property
def repeat_records_ready(self):
return self.repeat_records.filter(state__in=(State.Pending, State.Fail))
return (
self.repeat_records
.filter(state__in=(State.Pending, State.Fail))

Contributor:
We have an index next_check_not_null and a check constraint next_check_pending_or_null that validates and keeps next_check in sync with state. If we're dropping next_check, then we'll need to redefine that index to make this condition efficient. It looks like we'll want to include repeater_id in that new index.

.order_by('registered_at')

Contributor:
I'm getting confused on next_check and where that is being set/checked, but is it possible that we would end up in a scenario where a repeater processes the same record multiple consecutive times because its registered_at is the oldest, but it continues to fail? This would obviously have an upper limit of attempts (4 or 6), but I think it would still not be ideal.

Contributor Author:
If one payload fails repeatedly, but another succeeds, that sounds like a problem with the payload and not the remote endpoint. Trying to send it again isn't going to help. I think when a remote API responds with a 4XX error, we should immediately cancel the repeat record.

RepeatRecord.next_check is no longer relevant. It only exists because we had no way of figuring out what was happening with the remote endpoint. But now that we are iterating Repeaters instead of RepeatRecords, we can back off based on the status of the endpoint, and not the failure of the payload. Repeater back-off happens in the update_repeater() task.

Contributor:
> I think when a remote API responds with a 4XX error, we should immediately cancel the repeat record.

What about a 429 (Too Many Requests) response with a retry-after header, where the remote API is asking us to back off? That seems like exactly the type of situation where we'd want to retry later.

)

@property
def is_ready(self):
"""
Returns True if there are repeat records to be sent.
"""
if self.is_paused or toggles.PAUSE_DATA_FORWARDING.enabled(self.domain):
return False
if not (self.next_attempt_at is None
or self.next_attempt_at < timezone.now()):
return False
return self.repeat_records_ready.exists()
def rate_limit(self):
interval = random.uniform(*RATE_LIMITER_DELAY_RANGE)
Repeater.objects.filter(id=self.repeater_id).update(
next_attempt_at=datetime.utcnow() + interval,
)
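
A side note on rate_limit(): random.uniform() happens to accept timedelta arguments because it computes a + (b - a) * random(), and timedelta supports subtraction, float multiplication, and addition. A quick check of what the line above produces:

```python
import random
from datetime import datetime, timedelta

RATE_LIMITER_DELAY_RANGE = (timedelta(minutes=0), timedelta(minutes=15))

interval = random.uniform(*RATE_LIMITER_DELAY_RANGE)
assert isinstance(interval, timedelta)
next_attempt_at = datetime.utcnow() + interval  # somewhere in the next 15 minutes
```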

Contributor:
The intention of random.uniform(*RATE_LIMITER_DELAY_RANGE) was to avoid clumping when rate limiting a large number of records from a specific repeater simultaneously, since a different interval value would be calculated and applied to each individual record.

This code no longer does that, but that seems fine given that a repeater will try one record at a time now (unless max workers is > 1, but then it is the project's choice).

So maybe the only thing to do is revert the random.uniform change back to a constant delay?

Contributor:
On second thought, rate limiting at the repeater level is pretty different from the existing behavior. If a project has a high volume of records, they might be rate limited at the minute-window level, but would be able to successfully send records in the following minute (in the current world). Whereas postponing the entire repeater would delay any records from being sent by up to 15 minutes, which I don't think is desirable. So maybe pushing rate limiting back down to a record-by-record basis is best?

Contributor Author:
I think we should chat through this idea offline. I might not be understanding how you're thinking of implementing this, because I worry that if we rate limit at the repeat record level, then we will be iterating repeaters that don't actually have repeat records ready to send. That would result in churning through process_repeater() workers with nothing to do.

def set_next_attempt(self):
def set_backoff(self):
now = datetime.utcnow()
interval = _get_retry_interval(self.last_attempt_at, now)
self.last_attempt_at = now
@@ -375,7 +383,7 @@ def set_next_attempt(self):
next_attempt_at=now + interval,
)

def reset_next_attempt(self):
def reset_backoff(self):
if self.last_attempt_at or self.next_attempt_at:
self.last_attempt_at = None
self.next_attempt_at = None
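
The reply in the thread above says repeater back-off now happens in the update_repeater() task based on the state of the endpoint. That task isn't shown in this excerpt; a minimal sketch of the idea, assuming the task is told whether any attempt in the pass succeeded:

```python
# Hypothetical sketch, not the PR's tasks.py.
def update_repeater(repeater, any_attempt_succeeded):
    if any_attempt_succeeded:
        # The endpoint is reachable, so clear any exponential back-off.
        repeater.reset_backoff()
    else:
        # Every attempt in this pass failed, so back off the whole
        # repeater; set_backoff() grows the wait via _get_retry_interval().
        repeater.set_backoff()
```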
@@ -411,7 +419,8 @@ def register(self, payload, fire_synchronously=False):
# Prime the cache to prevent unnecessary lookup. Only do this for synchronous repeaters
# to prevent serializing the repeater in the celery task payload
repeat_record.__dict__["repeater"] = self
repeat_record.attempt_forward_now(fire_synchronously=fire_synchronously)
# TODO: No, send the repeat record when it's its turn.
# repeat_record.attempt_forward_now(fire_synchronously=fire_synchronously)
return repeat_record

def allowed_to_forward(self, payload):
@@ -432,6 +441,11 @@ def retire(self):
self.is_deleted = True
Repeater.objects.filter(id=self.repeater_id).update(is_deleted=True)

@property
def num_workers(self):
# TODO: return min(self.max_workers, MAX_REPEATER_WORKERS)
return MAX_REPEATER_WORKERS

def fire_for_record(self, repeat_record):
payload = self.get_payload(repeat_record)
try:
@@ -473,10 +487,36 @@ def handle_response(self, result, repeat_record):

result may be either a response object or an exception
"""
never_gonna_work = (
HTTPStatus.BAD_REQUEST,
HTTPStatus.UNAUTHORIZED,
HTTPStatus.PAYMENT_REQUIRED,
HTTPStatus.FORBIDDEN,
HTTPStatus.NOT_FOUND,
HTTPStatus.METHOD_NOT_ALLOWED,
HTTPStatus.NOT_ACCEPTABLE,
HTTPStatus.PROXY_AUTHENTICATION_REQUIRED,
HTTPStatus.GONE,
HTTPStatus.LENGTH_REQUIRED,
HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
HTTPStatus.REQUEST_URI_TOO_LONG,
HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
HTTPStatus.EXPECTATION_FAILED,
HTTPStatus.IM_A_TEAPOT, # For completeness :)
HTTPStatus.MISDIRECTED_REQUEST,
HTTPStatus.UNPROCESSABLE_ENTITY,
HTTPStatus.REQUEST_HEADER_FIELDS_TOO_LARGE,
HTTPStatus.UNAVAILABLE_FOR_LEGAL_REASONS,

Contributor:
The error codes (particularly which are 4XX vs 5XX) are not obvious here. Actually, upon spot-checking, it looks like these may all be 4XX errors. Does this mean that all 5XX and a few 4XX errors will be retried? Is there a way to make that, especially regarding 5XX errors, more obvious in the code?

Maybe invert the condition (pseudocode):

    if status is 5XX or status in _4XX_RETRY_CODES:
        handle_failure
    else:
        handle_payload_error

Contributor Author:
> Does this mean that all 5XX and a few 4XX errors will be retried?

Yes.

> Maybe invert the condition ...

Oh, nice.

)

if isinstance(result, Exception):
repeat_record.handle_exception(result)
elif is_response(result) and 200 <= result.status_code < 300 or result is True:
repeat_record.handle_success(result)
elif is_response(result) and result.status_code in never_gonna_work:
message = format_response(result)
repeat_record.handle_payload_error(message)
else:
repeat_record.handle_failure(result)
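
A minimal sketch of the inversion suggested in the thread above. The allow-list name _4XX_RETRY_CODES is hypothetical; 408, 409 and 429 are chosen only because they are absent from never_gonna_work (so the PR already retries them), and 429 is the retry-after case discussed earlier:

```python
from http import HTTPStatus

# Hypothetical allow-list of 4XX statuses that are still worth retrying.
_4XX_RETRY_CODES = (
    HTTPStatus.REQUEST_TIMEOUT,    # 408
    HTTPStatus.CONFLICT,           # 409
    HTTPStatus.TOO_MANY_REQUESTS,  # 429
)

def handle_response(self, result, repeat_record):
    if isinstance(result, Exception):
        repeat_record.handle_exception(result)
    elif is_response(result) and 200 <= result.status_code < 300 or result is True:
        repeat_record.handle_success(result)
    elif (
        is_response(result)
        and 400 <= result.status_code < 500
        and result.status_code not in _4XX_RETRY_CODES
    ):
        # Client errors that will never succeed: cancel instead of retrying.
        repeat_record.handle_payload_error(format_response(result))
    else:
        # 5XX and other transient failures: retry later.
        repeat_record.handle_failure(result)
```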

@@ -916,10 +956,21 @@ def count_by_repeater_and_state(self, domain):
return result

def count_overdue(self, threshold=timedelta(minutes=10)):
return self.filter(
next_check__isnull=False,
next_check__lt=datetime.utcnow() - threshold
).count()
overdue = datetime.utcnow() - threshold
domains = get_domains_forwarding_enabled()
repeater_not_paused = models.Q(repeater__is_paused=False)
repeater_next_attempt_overdue = models.Q(repeater__next_attempt_at__lt=overdue)
ready_to_send = models.Q(
state__in=(State.Pending, State.Fail)
)
return (
self.get_queryset()
.filter(domain__in=domains)
.filter(repeater_not_paused)
.filter(repeater_next_attempt_overdue)
.filter(ready_to_send)
.count()

Contributor:
Have you profiled this query? Will we need any new indexes to make it fast?

)

def iterate(self, domain, repeater_id=None, state=None, chunk_size=1000):
db = router.db_for_read(self.model)
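
On the index questions in the threads above (replacing the next_check_not_null index with one that includes repeater_id, and keeping count_overdue() fast): a hedged sketch of a partial index that would cover the "ready to send" filter and the registered_at ordering. The index name and exact field list are assumptions, not part of this PR:

```python
from django.db import models
from corehq.motech.repeaters.const import State

class RepeatRecord(models.Model):
    # ... existing fields ...

    class Meta:
        indexes = [
            # Hypothetical partial index on sendable records, matching
            # repeat_records_ready's filter and ordering.
            models.Index(
                fields=['repeater', 'registered_at'],
                name='repeatrecord_ready_idx',
                condition=models.Q(state__in=(State.Pending, State.Fail)),
            ),
        ]
```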
@@ -1154,9 +1205,12 @@ def fire(self, force_send=False):
except Exception as e:
log_repeater_error_in_datadog(self.domain, status_code=None,
repeater_type=self.repeater_type)
self.handle_payload_exception(e)
raise
self.handle_payload_error(str(e))
finally:
return self.state
return None

# TODO: Drop: `process_repeater` task will call `process_repeat_record` tasks directly
def attempt_forward_now(self, *, is_retry=False, fire_synchronously=False):
from corehq.motech.repeaters.tasks import (
process_repeat_record,
@@ -1206,8 +1260,8 @@ def handle_failure(self, response):
def handle_exception(self, exception):
self.add_client_failure_attempt(str(exception))

def handle_payload_exception(self, exception):
self.add_client_failure_attempt(str(exception), retry=False)
def handle_payload_error(self, message):
self.add_client_failure_attempt(message, retry=False)

def cancel(self):
self.state = State.Cancelled
@@ -1370,7 +1424,29 @@ def is_response(duck):


def domain_can_forward(domain):
"""
Checks whether ``domain`` has the privilege to forward data. Ignores
the status of the (temporary) ``PAUSE_DATA_FORWARDING`` toggle.

Used for registering repeat records.
"""
return domain and (
domain_has_privilege(domain, ZAPIER_INTEGRATION)
or domain_has_privilege(domain, DATA_FORWARDING)
)


def get_domains_forwarding_enabled():
"""
Returns a set of domains that are *currently* able to forward data.
Considers the status of the (temporary) ``PAUSE_DATA_FORWARDING``
toggle.

Used for iterating repeaters and counting overdue repeat records.
"""
domains_can_forward = {
domain for domain in Domain.get_all_names()

Contributor:
This will pull thousands of domain names on production, right? Are most of them allowed to forward?

If yes, it seems like this will make some very big queries when using conditions like .filter(domain__in=domains). Have you profiled that on production?

If not, would it make sense to add a Couch view to make it efficient to grab the few domains that can forward?

Contributor Author:
> Are most of them allowed to forward?

Pro Plan and higher.

> Have you profiled that on production?

Not yet.

An alternative is to filter out repeaters for domains that can't forward as we iterate the repeaters. That wouldn't be too bad, but it would make metrics like "overdue_repeat_records" less accurate. ("overdue_repeat_records" currently includes the repeat records of domains that have been paused. I'm undecided whether that's correct or not.)

if domain_can_forward(domain)
}
domains_paused = set(toggles.PAUSE_DATA_FORWARDING.get_enabled_domains())
return domains_can_forward - domains_paused
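
The alternative the author floats above (filter out non-forwarding domains while iterating repeaters, rather than passing every eligible domain name into the query) could look roughly like this. A sketch only, assuming an all_ready() without its domain filter; the function name is hypothetical:

```python
def iter_ready_repeaters_skipping_paused_domains():
    # Hypothetical variant: defer the domain check to iteration time, at
    # the cost of fetching (and skipping) repeaters for domains that are
    # not currently allowed to forward.
    paused_domains = set(toggles.PAUSE_DATA_FORWARDING.get_enabled_domains())
    for repeater in Repeater.objects.all_ready():
        if repeater.domain in paused_domains:
            continue
        if not domain_can_forward(repeater.domain):
            continue
        yield repeater
```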