Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check_repeaters() task iterates repeaters #34946

Closed
wants to merge 39 commits into from
Closed

Conversation

kaapstorm
Copy link
Contributor

@kaapstorm kaapstorm commented Aug 5, 2024

Technical Summary

My oscillating level-of-annoyance wave and my availability-over-a-weekend wave overlapped, and the peak resulted in this PR.

Context:

I am working on this in my spare time because I am not resourced to this. I don't think there is a Jira ticket specifically for it.

I'm opening this as a draft for discussion. From the TODOs I have left in the code, you can see a couple of small things I plan to add / clean up. Notably:

  1. Adding a max_workers field to Repeater so that each repeater can reduce the number of outgoing requests sent in parallel to its API. If Repeater.max_workers is set to 1, payloads will be sent sequentially in chronological order.
  2. Removing attempt_forward_now() and its tests.

I have a few comments and questions. I will add them as comments so responses can be threaded.

Safety Assurance

Safety story

  • Tested locally
  • Testing on Staging

Automated test coverage

Tests included.

QA Plan

Not planned at this stage.

Migrations

Migrations not yet added, but when they are ...

  • The migrations in this code can be safely applied first independently of the code

Rollback instructions

  • This PR can be reverted after deploy with no further considerations

Labels & Review

  • Risk label is set correctly
  • The set of people pinged as reviewers is appropriate for the level of risk of the change

@kaapstorm kaapstorm added the product/invisible Change has no end-user visible impact label Aug 5, 2024
@kaapstorm
Copy link
Contributor Author

Once Repeater.max_workers is added, it may make sense to move MAX_REPEATER_WORKERS from consts to settings, so that it can vary across environments.

@kaapstorm
Copy link
Contributor Author

Is there a way for iter_ready_repeaters() to lock a repeater when it yields it, or alternatively not to yield locked repeaters? I played with locking the repeater in iter_ready_repeaters() but I couldn't unlock it in the update_repeater() task.

@kaapstorm
Copy link
Contributor Author

Currently if a remore API returns a 4XX response, we fail the repeat record and back off. Should we rather just cancel it?

@kaapstorm
Copy link
Contributor Author

There is an idea for some repeaters to implement a merge_records() method. The process_repeater() task would call this before spawning tasks to process repeat records.

For repeaters like CaseRepeater, CaseUpdateRepeater and DataSourceRepeater, the merge_records() method would merge case updates for the same case / data source, create a new repeat record with the new payload, and cancel (or delete?) the outdated repeat records.

Base automatically changed from nh/drop_unused to master August 5, 2024 18:27
Copy link
Contributor

@gherceg gherceg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for getting started on this Norman! I did a first pass and mainly left comments related to rate limiting.

Comment on lines 369 to 372
interval = random.uniform(*RATE_LIMITER_DELAY_RANGE)
Repeater.objects.filter(id=self.repeater_id).update(
next_attempt_at=datetime.utcnow() + interval,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention of random.uniform(*RATE_LIMITER_DELAY_RANGE) was to avoid clumping if rate limiting a large amount of records from a specific repeater simultaneously, since a different interval value would be calculated and applied to each individual record.

This code no longer does that, but that seems fine given a repeater will try one record at a time now (unless max repeaters is > 1, but then it is the project's choice).

So maybe the only thing to do is revert the random.uniform change back to a constant delay?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, rate limiting at the repeater level is pretty different than the existing behavior. If a project has a high volume of records, they might be rate limited at the minute window level, but would be able to successfully send records in the following minute (in the current world). Whereas postponing the entire repeater would delay any records from being sent by up to 15 minutes, which I don't think is desirable. So maybe pushing rate limiting back down to a record by record basis is best?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should chat through this idea offline. I might not be understanding how you're thinking of implementing this, because I worry that if we rate limit at the repeat record level, then we will be iterating repeaters that don't actually have repeat records ready to send. That would result in churning through process_repeater() workers with nothing to do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


try:
repeater = Repeater.objects.get(domain=domain, id=repeater_id)
repeat_records = repeater.repeat_records_ready[:repeater.num_workers]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit nit picky since this is much better than the current behavior, but once we call process_repeat_record, we have no way of rate limiting at that point right? Having MAX_REPEATERS set to 7 or anything on that scale does seem sufficient in preventing a project from hogging repeater resources, but I'm concerned that whoever changes MAX_REPEATERS down the line might not take into consideration the impact on rate limiting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a valid concern. I've updated the comment on MAX_REPEATER_WORKERS in 86f1cce -- Do you think that makes it clear?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think so. The alternative would be to include rate limiting code in the process_repeat_record method as well right? Just want to understand if you have specific hesitations about taking that step?

corehq/motech/repeaters/tests/test_tasks.py Outdated Show resolved Hide resolved
corehq/motech/repeaters/tests/test_tasks.py Outdated Show resolved Hide resolved
@mkangia
Copy link
Contributor

mkangia commented Aug 6, 2024

Hey @kaapstorm

Thanks for taking out time to work on this.

I have noted down a point in the tech spec, after which I am happy to take a look at this PR in depth.

We won't consider merging this PR right? its a PoC?

@kaapstorm
Copy link
Contributor Author

We won't consider merging this PR right?

There are open questions (e.g. the way in which locking should work, see comments above).

From conversations I've had around this change, it feels to me like we have alignment on the approach that this PR takes.

So once we have answers for those questions, I'd like to flip this to a normal PR.

Copy link
Contributor

@snopoke snopoke left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't done a thorough review, just a few initial comments

else process_repeat_record.s(rr.id, rr.domain)
for rr in repeat_records
]
chord(header_tasks)(update_repeater.s(repeater.repeater_id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't wait for the tasks to complete right? Just wondering about the lock and what it's purpose is since it's not actually locking the sending of the records. I'm not sure locking is needed at all.

Copy link
Contributor Author

@kaapstorm kaapstorm Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iter_ready_repeaters() can yield a repeater that is still being processed.

My first take at this locked the repeater in iter_ready_repeaters(), and released it in update_repeater(), but that didn't work, because it seems you can't release a Redis lock that was locked in a different thread. (This contradicts the documentation, which says, "The Lock interface is identical to the threading.Lock so you can use it as replacement." but threading.Lock says, "Release a lock. This can be called from any thread, not only the thread which has acquired the lock.")

This doesn't wait for the tasks to complete right?

You are right. This is a bug. This code used to wait for the chord to complete, and I dropped that.

So I guess my follow-up question would be, am I wanting the right thing -- to ensure that process_repeater() is only being executed for a particular repeater by one worker at a time?

If so, I'll keep digging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my first take was actually correct, and my tests were confusing me. 7c4e3cf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(My first take was not correct. I'm not sure why this isn't working with Redis, because I'm sure it should. I have dropped locking repeaters for now. I imagine that not locking is a problem especially when a remote endpoint is timing out. Locking using Postgres, either on the Repeater model, or using a lock table, might be an alternative.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering about the lock and what it's purpose is

I discovered an important purpose for the lock. If prefetch_multiplier is not set to a small number (it is "1" on Production, and not set on Staging) then Celery will keep iterating repeaters, and queue up as many tasks as it thinks is reasonable, and it is not reasonable. On Staging (which has concurrency set to 4) it queued up 4,814,913 tasks. This blocked the queue for all but the domain with the most repeat records. Pausing data forwarding for the offending domain had no effect because the tasks were already queued, and so Celery continued to try to process its repeat records. (Purging the tasks in the queue fixed the problem.)

(Edit: I moved this comment from the wrong thread.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



def _process_repeat_record(repeat_record):
if repeat_record.state == State.Cancelled:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this check still valid?

Copy link
Contributor Author

@kaapstorm kaapstorm Aug 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Assuming a repeater is only being processed by one worker at a time) this function isn't called unless the repeat record is either pending or failed. Repeater.repeat_records_ready ensures that. (I'll add tests to show that.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# Spread retries evenly over the range defined by RATE_LIMITER_DELAY_RANGE
# with the intent of avoiding clumping and spreading load
repeat_record.postpone_by(random.uniform(*RATE_LIMITER_DELAY_RANGE))
elif repeat_record.is_queued():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should queued state still be checked?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_queued() just checks self.state == State.Pending or self.state == State.Fail, and we know this must be true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless it was processed by another task before it got here right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried to make my intended use clear here: 5fe6d3c (Is what I'm intending correct -- that _process_repeat_record() is called by process_pending_repeat_record() and process_failed_repeat_record(), and when they are called they are pending or failed?

This branch doesn't process repeat records synchronously / outside of process_repeater(). Is there a race condition that I haven't considered?)

"""
while True:
yielded = False
for repeater in Repeater.objects.all_ready():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a naive suggestion, but I looked at this again with Evan and was wondering why you couldn't "lock" a repeater by setting a property on the Repeater itself, something like is_processing and only return repeaters where that is False. I imagine the two cons are:

  • this isn't actually a lock, so if somehow two process_repeater calls were made for the same repeater, it would still process simultaneously
  • adds another database update on the repeaters table

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the same thing. And I think we can avoid the first con by setting the property in the iterator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Locking that sucks less implemented here: 2bb7c59

return (
self.repeat_records
.filter(state__in=(State.Pending, State.Fail))
.order_by('registered_at')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm getting confused on next_check and where that is being set/checked, but is it possible that we would end up in a scenario where a repeater processes the same record multiple consecutive times because it's registered_at is the oldest, but it continues to fail? This would obviously have an upper limit of attempts (4 or 6), but I think would still not be ideal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one payload fails repeatedly, but another succeeds, that sounds like a problem with the payload and not the remote endpoint. Trying to send it again isn't going to help. I think when a remote API responds with a 4XX error, we should immediately cancel the repeat record.

RepeatRecord.next_check is no longer relevant. It only exists because we had no way of figuring out what was happening with the remote endpoint. But now that we are iterating Repeaters instead of RepeatRecords, we can back off based on the status of the endpoint, and not the failure of the payload. Repeater back-off happens in the update_repeater() task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think when a remote API responds with a 4XX error, we should immediately cancel the repeat record.

What about a 429 (Too Many Requests) response with a retry-after header, where the remote API is asking us to back off? That seems like exactly the type of situation where we'd want to retry later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimagimon dimagimon added the reindex/migration Reindex or migration will be required during or before deploy label Aug 9, 2024
@kaapstorm kaapstorm force-pushed the nh/iter_repeaters branch 3 times, most recently from 73f811c to 06de227 Compare August 10, 2024 01:22
Comment on lines 494 to 513
HTTPStatus.BAD_REQUEST,
HTTPStatus.UNAUTHORIZED,
HTTPStatus.PAYMENT_REQUIRED,
HTTPStatus.FORBIDDEN,
HTTPStatus.NOT_FOUND,
HTTPStatus.METHOD_NOT_ALLOWED,
HTTPStatus.NOT_ACCEPTABLE,
HTTPStatus.PROXY_AUTHENTICATION_REQUIRED,
HTTPStatus.GONE,
HTTPStatus.LENGTH_REQUIRED,
HTTPStatus.REQUEST_ENTITY_TOO_LARGE,
HTTPStatus.REQUEST_URI_TOO_LONG,
HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE,
HTTPStatus.EXPECTATION_FAILED,
HTTPStatus.IM_A_TEAPOT, # For completeness :)
HTTPStatus.MISDIRECTED_REQUEST,
HTTPStatus.UNPROCESSABLE_ENTITY,
HTTPStatus.REQUEST_HEADER_FIELDS_TOO_LARGE,
HTTPStatus.UNAVAILABLE_FOR_LEGAL_REASONS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error codes (particularly which are 4XX vs 5XX) are not obvious here. Actually, upon spot-checking, it looks like these may all be 4XX errors. Does this mean that all 5XX and a few 4XX errors will be retried? Is there a way to make that, esp. about 5XX errors, more obvious in the code?

Like maybe invert the condition: (pseudocode)

if status is 5XX or status in _4XX_RETRY_CODES:
    handle_failure
else:
    handle_payload_error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that all 5XX and a few 4XX errors will be retried?

Yes.

Like maybe invert the condition ...

Oh, nice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used for iterating repeaters and counting overdue repeat records.
"""
domains_can_forward = {
domain for domain in Domain.get_all_names()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will pull thousands of domain names on production, right? Are most of them allowed to forward?

If yes, seems like this will make some very big queries when using conditions like .filter(domain__in=domains). Have you profiled that on production?

If not, would it make sense to add a Couch view to make it efficient to grab the few domains that can forward?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are most of them allowed to forward?

Pro Plan and higher.

Have you profiled that on production?

Not yet.

An alternative is to filter out repeaters for domains that can't forward as we iterate the repeaters. That wouldn't be too bad, but it would make metrics like "overdue_repeat_records" less accurate. ("overdue_repeat_records" currently includes the repeat records of domains that have been paused. I'm undecided whether that's correct or not.)

Comment on lines 74 to 80
self.addCleanup(self.conn.delete)
self.repeater = FormRepeater(
domain=DOMAIN,
connection_settings=self.conn,
)
self.repeater.save()
self.addCleanup(self.repeater.delete)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these cleanups are unnecessary (and only make the tests slower) since SQL changes are automatically rolled back after each test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since SQL changes are automatically rolled back

Really? That's useful! Has this always been true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It's a Django TestCase feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! 1b0306a

@@ -69,6 +90,7 @@ def setUp(self) -> None:
connection_settings=self.conn,
)
r.save()
self.addCleanup(r.delete)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

last_attempt_at = models.DateTimeField(null=True, blank=True)
# TODO: max_workers = models.IntegerField(default=1)
max_workers = models.IntegerField(default=MAX_REPEATER_WORKERS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of changing this default to zero and changing the num_workers calculation below to self.max_workers or MAX_REPEATER_WORKERS? That way if we change the constant in code it will not require a database schema change as well.

I assume it will be rare for the max workers to be overridden on a repeater. Is that a good assumption? In fact, I'm curious when we ever expect to override the code constant with a database value? Currently it looks like a field that is not set anywhere in code, which implies that it is a custom setting for developers. Do I have that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of changing this default to zero and changing the num_workers calculation below to self.max_workers or MAX_REPEATER_WORKERS?

Excellent idea.

In fact, I'm curious when we ever expect to override the code constant with a database value?

I am thinking of two scenarios:

  1. The ability to ensure that payloads are always send in chronological order
  2. Remote endpoints that are unable to support MAX_REPEATER_WORKERS concurrent requests

At first I thought that a repeater should not be allowed to exceed the maximum. But if we rename MAX_REPEATER_WORKERS to DEFAULT_REPEATER_WORKERS that could better communicate that a repeater can exceed that value, if the endpoint can handle large numbers of workers. A high-volume repeater sending to a service built for that kind of volume (like Zapier) would be fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eb1f424

That's a fixup commit. I'd like to rebase before flipping this to a normal PR, to clean up the reverted commits and the migrations (which will probably conflict with master soon).

@@ -29,13 +29,15 @@ class State(IntegerChoices):
Success = 4, _('Succeeded')
Cancelled = 8, _('Cancelled')
Empty = 16, _('Empty') # There was nothing to send. Implies Success.
InvalidPayload = 32, _('Invalid Payload') # Implies Cancelled.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of adding this status in a separate PR? Seems like we could do it before we change repeater processing logic, and it would simplify this PR, which is getting pretty large.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea!

@kaapstorm kaapstorm mentioned this pull request Aug 27, 2024
4 tasks
@kaapstorm
Copy link
Contributor Author

Closing. Feedback is incorporated in PR #35033

@kaapstorm kaapstorm closed this Aug 27, 2024
@kaapstorm kaapstorm deleted the nh/iter_repeaters branch January 24, 2025 21:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
product/invisible Change has no end-user visible impact reindex/migration Reindex or migration will be required during or before deploy
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants