Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process Repeaters, Part 1 #35033

Open
wants to merge 57 commits into
base: master
Choose a base branch
from
Open

Process Repeaters, Part 1 #35033

wants to merge 57 commits into from

Conversation

kaapstorm
Copy link
Contributor

@kaapstorm kaapstorm commented Aug 24, 2024

Technical Summary

At the moment, repeat records are processed independently, and this prevents us from making better decisions about the APIs that we are sending data to. e.g. If a remote server is offline, we will keep trying to send new payloads to it even if all the previous payloads failed.

The approach taken in this PR intends to process repeaters smarter, and also to share repeat record queue workers more fairly across domains. It honors rate limiting by skipping repeaters or domains when they are rate-limited.

Context:

This branch implements the feedback given in the draft PR #34946 with commits squashed and rebased. 🐟 🐠 🐬

Almost all new functionality is behind a feature flag. The one change that is not is that up to once a minute (less often if there are repeaters being processed) we loop through the results of RepeaterManager.all_ready(). (That happens here.)

Enabling the "process_repeaters" feature flag will switch a domain from the current approach to the new approach. The domain can be switched back to the current approach without consequences.

There are two migrations in this PR. I have kept them separate to make it easier to modify the second one. I am looking for confirmation that the indexes in Index fields used by RepeaterManager.all_ready() are correct.

Currently there are no new Datadog metrics in this PR. I expect that as we test this more / switch domains over, we will add metrics in this PR and follow-up PRs.

Feature Flag

process_repeaters

Safety Assurance

Safety story

  • Tested locally and on Staging.
  • Behavior is behind a feature flag.

Automated test coverage

Includes coverage. Please highlight any gaps.

QA Plan

QA requested. Ticket pending.

Migrations

  • The migrations in this code can be safely applied first independently of the code

Rollback instructions

  • This PR can be reverted after deploy with no further considerations

Labels & Review

  • Risk label is set correctly
  • The set of people pinged as reviewers is appropriate for the level of risk of the change

@dimagimon dimagimon added the reindex/migration Reindex or migration will be required during or before deploy label Aug 24, 2024
@kaapstorm kaapstorm force-pushed the nh/iter_repeaters_1 branch 7 times, most recently from e06a034 to e83e925 Compare August 27, 2024 14:39
@kaapstorm kaapstorm added the product/feature-flag Change will only affect users who have a specific feature flag enabled label Aug 27, 2024
@kaapstorm kaapstorm marked this pull request as ready for review August 27, 2024 16:52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indexes on boolean fields typically don't get used but a useful approach can be to use a partial index to match the query you are trying to optimize. In this case I think it it may work to create a partial index on next_attempt_at with a condition for is_paused=False.

Copy link
Contributor Author

@kaapstorm kaapstorm Aug 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like so? d8d9642

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Still worth testing that to make sure the query uses it.

corehq/motech/repeaters/tasks.py Outdated Show resolved Hide resolved
@kaapstorm kaapstorm force-pushed the nh/iter_repeaters_1 branch from e83e925 to db093d0 Compare August 28, 2024 17:01
@kaapstorm kaapstorm force-pushed the nh/iter_repeaters_1 branch from db093d0 to 48c3d7c Compare August 29, 2024 00:13
Copy link
Contributor

@gherceg gherceg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass, didn't dig into tests much.

corehq/motech/repeaters/models.py Outdated Show resolved Hide resolved
Comment on lines 246 to 247
for domain, repeater_id, lock_token in iter_ready_repeater_ids_forever():
process_repeater.delay(domain, repeater_id, lock_token)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to be checking how long this task has been running for and exit if we are about to exceed the process_repeater_lock timeout value? I assume it is very unlikely that there are always repeaters that we can iterate over since once we kick them off, they will no longer be returned in this query, but is it safe to still have a check of some sort?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to be checking how long this task has been running for and exit if we are about to exceed the process_repeater_lock timeout value?

process_repeater() should take a maximum of five minutes, if that repeater's requests are timing out. The lock timeout is three hours.

I expect we would only exceed the timeout if Celery was killed and the update_repeater() task was never called. So the timeout is intended to prevent a repeater from being locked forever.

I think it could be useful to use Datadog to monitor how long the task takes, but probably only to identify problematic endpoints, not to exit the task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may not be understanding, but I'm referring to the process_repeaters task and the process_repeater_lock with the 24 hour timeout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for my confusion. I didn't read your question properly.

I wondered about that timeout too, because there is no problem if this loop just keeps going for more than 24 hours as long as new repeat records are being sent in a reasonable time.

What do you think of not having a timeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

corehq/motech/repeaters/tasks.py Outdated Show resolved Hide resolved
# repeater can use to send repeat records at the same time. This is a
# guardrail to prevent one repeater from hogging repeat_record_queue
# workers and to ensure that repeaters are iterated fairly.
MAX_REPEATER_WORKERS = 144
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a function of # of celery workers right? It is out of scope of this PR, but it would be pretty cool if we could figure out how many workers celery has dedicated to a specific queue and set this to some percentage of that (looks like that is 50% at the moment).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. It would be excellent to be able to calculate this value.

(It is 144 here because that was the number of workers at the time of writing. It was recently doubled.)

migrations.AddIndex(
model_name="repeatrecord",
index=models.Index(
condition=models.Q(("state__in", (1, 2))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what are the computational differences in Postgres for state IN (1, 2) vs state = 1 OR state = 2? I hope they are virtually the same, but it would be a shame if IN (...) does something complicated with a loop.

corehq/motech/repeaters/migrations/0016_add_indexes.py Outdated Show resolved Hide resolved
corehq/motech/repeaters/tests/test_tasks.py Show resolved Hide resolved
corehq/motech/repeaters/tasks.py Outdated Show resolved Hide resolved
set per repeater) and spawns tasks to send them in parallel. The results
of all the send attempts are passed to ``update_repeater()``. If all the
send attempts failed, the **repeater** (not the repeat record) is backed
off. If any send attempts succeeded, the backoff is reset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imagine a scenario where the first attempt usually succeeds, but subsequent attempts usually fail. I think we would ideally want to reduce the batch size for such a fragile repeater endpoint, but I assume we have no automated way of doing that right now. Do we have any way of detecting that scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any way of detecting that scenario?

Not yet, but once this PR is merged, we will be tying up a few loose ends, and this is one of them. If we get a 429 response, we will automatically reduce Repeater.num_workers. This doesn't specifically take your example into account, but if the remote API uses 429 responses, then it will handle, say, the first 5 attempts succeeding and the last two failing.

Comment on lines 353 to 360
if lock.acquire(blocking=False, token=lock_token):
yielded = True
yield domain, repeater_id, lock_token
else:
metrics_counter(
'commcare.repeaters.process_repeaters.repeater_locked',
tags={'domain': domain},
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@millerdev @gherceg I wonder whether you can help my to understand what is happening here on Staging.

Take a look at the "Repeater locked by domain" chart on Datadog for a few hours last night.

Staging has one Celery machine, and the repeat records queue has 4 gevent workers. I created 10 repeaters in each of 5 domains, and gave them 1000 repeat records each.

What I expected: for domain, repeater_id in iter_ready_repeater_ids_once() would loop through the 50 repeaters. For each repeater, this loop would lock the repeater, then the workers would send 7 payloads, and then update_repeater() would unlock the repeater (8 tasks across 4 workers). So by the time the outer while True loop got back to the same repeater it would have been unlocked.

(That is how that works when testing locally with only one worker, and CELERY_TASK_ALWAYS_EAGER = True.)

But Datadog shows that that is not what is happening on Staging. I am really scratching my head to understand why all the repeaters are still locked when the outer loop comes back around.

When this function returns, and then gets called again 5 minutes later by the process_repeaters() task, then the repeaters are unlocked.

Is update_repeater() never getting called, maybe, and the locks are timing out every ten minutes, so only half the repeaters are processed every five minutes? 🤔 Maybe. But if that is what is happening, then why does update_repeater() get called locally (and in unit tests) but not in practice on Staging?

Is there something about Celery or Staging that I'm missing?

Or can you spot something in the code? Although I wasn't logging repeater lock-out at the time, I am pretty sure it was this commit that changed this behavior: 9c8d9fb -- Prior to returning only distinct repeater IDs, the repeaters did get unlocked, and the outer loop did yield them again.

... Is it simply that Celery chord takes a while?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've figured out what I'm missing:

The process_repeaters() task (note the final "s") spawns an asynchronous process_repeater() task for every repeater. This takes next to no time at all. Then it loops again, and of course all the repeaters are still locked, because not a single sent payload has got a reply yet. So it exits and waits a minute.

@kaapstorm kaapstorm force-pushed the nh/iter_repeaters_1 branch from b1dcbbd to 79377f8 Compare January 10, 2025 17:04
@kaapstorm kaapstorm force-pushed the nh/iter_repeaters_1 branch 4 times, most recently from d03740c to c26109d Compare January 11, 2025 05:10
@kaapstorm kaapstorm force-pushed the nh/iter_repeaters_1 branch from c26109d to 988ed56 Compare January 11, 2025 06:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
product/feature-flag Change will only affect users who have a specific feature flag enabled
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants