Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check_repeaters() task iterates repeaters #34946

Closed
wants to merge 39 commits into from
Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
5a0e77a
Drop soft asserts
kaapstorm Aug 4, 2024
127912c
Add TODOs
kaapstorm Aug 3, 2024
f2b3aa0
`check_repeaters()` uses `iter_ready_repeaters()`
kaapstorm Aug 3, 2024
1267f51
Test `iter_ready_repeaters()`
kaapstorm Aug 3, 2024
1714f62
`process_repeater()` task
kaapstorm Aug 3, 2024
c6f603b
Update tests
kaapstorm Aug 4, 2024
3d40f91
Add tests
kaapstorm Aug 4, 2024
3d4e4cd
TestIterReadyRepeater extends SimpleTestCase
kaapstorm Aug 6, 2024
d90a5c1
Fix TestUpdateRepeater
kaapstorm Aug 6, 2024
86f1cce
Explain the purpose of MAX_REPEATER_WORKERS
kaapstorm Aug 6, 2024
3315c45
Update TestAppStructureRepeater test
kaapstorm Aug 6, 2024
7c4e3cf
Fix repeater locking
kaapstorm Aug 6, 2024
a916c34
Drop repeater lock
kaapstorm Aug 6, 2024
bb17648
Don't retry payload errors
kaapstorm Aug 7, 2024
3047c08
Fix `RepeatRecord.count_overdue()`
kaapstorm Aug 7, 2024
5370a17
Some client errors need a retry
kaapstorm Aug 8, 2024
f2f50e9
Polling overdue every 5 minutes seems enough
kaapstorm Aug 9, 2024
976d297
Improve query by filtering domains
kaapstorm Aug 9, 2024
a922568
Add metric for one loop over repeaters
kaapstorm Aug 9, 2024
1bdca83
Merge branch 'master' into nh/iter_repeaters
kaapstorm Aug 9, 2024
6458b3e
Index filtered fields, add `max_workers`
kaapstorm Aug 9, 2024
369c96e
A partial implementation of a lock using Postgres
kaapstorm Aug 9, 2024
74e6676
Lock repeater
kaapstorm Aug 9, 2024
5e4fbca
Add InvalidPayload state
kaapstorm Aug 10, 2024
5fe6d3c
Make repeat_record.state explicit
kaapstorm Aug 10, 2024
407d60d
Fix tests
kaapstorm Aug 14, 2024
7a02d6a
Revert "Lock repeater"
kaapstorm Aug 13, 2024
62369a4
Revert "A partial implementation of a lock using Postgres"
kaapstorm Aug 13, 2024
2bb7c59
Use Redis Lock
kaapstorm Aug 13, 2024
7d5abeb
Round-robin repeaters by domain
kaapstorm Aug 12, 2024
b8543a8
Skip rate-limited repeaters
kaapstorm Aug 12, 2024
2600e94
Make response status clearer
kaapstorm Aug 14, 2024
1b0306a
Use automatic rollback
kaapstorm Aug 14, 2024
d333eeb
Being locked out is not a problem
kaapstorm Aug 13, 2024
6b79a5e
Drop `attempt_forward_now()`
kaapstorm Aug 12, 2024
61a0a00
Drop `fire_synchronously` param
kaapstorm Aug 12, 2024
04c8177
Drop `send_request()`
kaapstorm Aug 12, 2024
f586d2c
Drop `get_payload()`
kaapstorm Aug 12, 2024
eb1f424
fixup! Index filtered fields, add `max_workers`
kaapstorm Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions corehq/apps/app_manager/tests/test_repeater.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from django.test import TestCase
from django.test.client import Client

from unittest.mock import patch

from corehq.apps.accounting.models import SoftwarePlanEdition
from corehq.apps.accounting.tests.utils import DomainSubscriptionMixin
from corehq.apps.accounting.utils import clear_plan_version_cache
Expand Down Expand Up @@ -64,18 +62,3 @@ def test_repeat_record_created(self):
later = datetime.utcnow() + timedelta(hours=48 + 1)
repeat_records = RepeatRecord.objects.filter(domain=self.domain, next_check__lt=later)
self.assertEqual(len(repeat_records), 1)

def test_repeat_record_forwarded(self):
"""
When an application with a repeater is saved, then HQ should try to forward the repeat record
"""
self.app_structure_repeater = AppStructureRepeater(domain=self.domain, connection_settings=self.conn)
self.app_structure_repeater.save()
self.addCleanup(self.app_structure_repeater.delete)

with patch('corehq.motech.repeaters.models.simple_request') as mock_fire:
self.application = Application(domain=self.domain)
self.application.save()
self.addCleanup(self.application.delete)

self.assertEqual(mock_fire.call_count, 1)
1 change: 1 addition & 0 deletions corehq/apps/domain/tests/test_deletion_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
'icds_reports',
'notifications',
'oauth2_provider',
'pg_lock',
'phonelog', # these are deleted after 60 days regardless
'pillow_retry',
'pillowtop',
Expand Down
1 change: 1 addition & 0 deletions corehq/apps/dump_reload/tests/test_dump_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"otp_static.StaticDevice",
"otp_static.StaticToken",
"otp_totp.TOTPDevice",
"pg_lock.PGLock",
"phone.SyncLogSQL", # not required and can be a lot of data
"pillow_retry.PillowError",
"pillowtop.DjangoPillowCheckpoint",
Expand Down
Empty file added corehq/apps/pg_lock/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions corehq/apps/pg_lock/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from django.apps import AppConfig


class PGLockConfig(AppConfig):
    """Django app configuration for the ``corehq.apps.pg_lock`` app."""
    name = 'corehq.apps.pg_lock'
23 changes: 23 additions & 0 deletions corehq/apps/pg_lock/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.2.14 on 2024-08-09 18:49

from django.db import migrations, models


class Migration(migrations.Migration):
    # Auto-generated initial migration for the pg_lock app. Creates the
    # PGLock table: one row per held lock, keyed by the lock name, with
    # an optional expiry timestamp (NULL = never expires).

    initial = True

    dependencies = []

    operations = [
        migrations.CreateModel(
            name="PGLock",
            fields=[
                (
                    "key",
                    models.CharField(max_length=255, primary_key=True, serialize=False),
                ),
                ("expires_at", models.DateTimeField(blank=True, null=True)),
            ],
        ),
    ]
Empty file.
82 changes: 82 additions & 0 deletions corehq/apps/pg_lock/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""
A partial implementation of a lock using PostgreSQL. It does not
implement blocking.

The intention of using Postgres for locking is to have a lock that can
be used for coordinating across multiple worker processes.
"""
from contextlib import contextmanager
from datetime import datetime, timedelta

from django.db import models, IntegrityError


class PGLock(models.Model):
    """One row per held lock.

    The lock key is the primary key, so inserting a duplicate key fails
    at the database level — ``Lock.acquire()`` relies on that for
    mutual exclusion.
    """
    # Lock name; doubles as the primary key to enforce uniqueness.
    key = models.CharField(max_length=255, primary_key=True)
    # NULL means the lock never expires; otherwise the lock is treated
    # as free once this moment has passed.
    expires_at = models.DateTimeField(null=True, blank=True)


class Lock:
    """A non-blocking, database-backed lock.

    Uses the ``PGLock`` table so the lock is visible across all worker
    processes that share the database. Blocking acquisition is not
    implemented.
    """

    def __init__(self, key):
        self.key = key

    def __str__(self):
        return self.key

    @property
    def name(self):  # Used by MeteredLock
        return self.key

    def acquire(self, blocking=True, timeout=-1):
        """Try to take the lock without blocking.

        :param blocking: Must be ``False``; blocking is not supported.
        :param timeout: Seconds until the lock expires on its own. A
            negative value (the default) means the lock never expires.
        :returns: ``True`` if the lock was acquired, else ``False``.
        :raises NotImplementedError: if ``blocking`` is ``True``.
        """
        if blocking:
            raise NotImplementedError("Blocking is not supported")

        if timeout >= 0:
            expires_at = datetime.utcnow() + timedelta(seconds=timeout)
        else:
            expires_at = None

        try:
            pg_lock, created = PGLock.objects.get_or_create(
                key=self.key,
                defaults={'expires_at': expires_at},
            )
        except IntegrityError:
            # Another process inserted the row between get_or_create's
            # existence check and its INSERT.
            return False
        if created:
            return True
        # The row already exists: take over the lock only if it has
        # expired. Doing this with a filtered UPDATE is atomic — if two
        # processes race here, the WHERE clause matches the expired row
        # for at most one of them. This closes the check-then-save race
        # of a read-modify-write takeover, where both racers could
        # observe the row as expired and both return True.
        # (NULL expires_at never matches the __lte filter, so a
        # never-expiring lock cannot be stolen.)
        updated = (
            PGLock.objects
            .filter(key=self.key, expires_at__lte=datetime.utcnow())
            .update(expires_at=expires_at)
        )
        return bool(updated)

    def release(self):
        """Release the lock.

        NOTE: deletes the row whether or not *this* instance acquired
        the lock, so any process may release any lock. Releasing a lock
        that is not held is a no-op.
        """
        PGLock.objects.filter(key=self.key).delete()

    def locked(self):
        """Return ``True`` if the lock is currently held and unexpired."""
        return (
            PGLock.objects
            .filter(key=self.key)
            .filter(
                models.Q(expires_at__isnull=True)
                | models.Q(expires_at__gt=datetime.utcnow())
            )
            .exists()
        )


@contextmanager
def lock(key, timeout=-1):
    """Context manager around ``Lock``.

    Yields ``True`` if the lock was acquired (non-blocking), otherwise
    ``False``. The lock is released on exit only if it was acquired
    here.
    """
    pg_lock = Lock(key)
    was_acquired = pg_lock.acquire(blocking=False, timeout=timeout)
    try:
        yield was_acquired
    finally:
        if was_acquired:
            pg_lock.release()
Empty file.
85 changes: 85 additions & 0 deletions corehq/apps/pg_lock/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from datetime import datetime
from unittest import expectedFailure
from unittest.mock import patch

from django.test import TestCase

from corehq.apps.pg_lock.models import Lock, lock
from dimagi.utils.couch import get_redis_lock, get_pg_lock


class PGLockTests(TestCase):
    """Behavioral tests for the Postgres-backed ``Lock``."""

    def test_acquires_lock_when_not_locked(self):
        lk = Lock('test_lock')
        self.assertTrue(lk.acquire(blocking=False))
        self.assertTrue(lk.locked())
        lk.release()

    def test_does_not_acquire_lock_when_already_locked(self):
        holder = Lock('test_lock')
        holder.acquire(blocking=False)
        contender = Lock('test_lock')
        self.assertFalse(contender.acquire(blocking=False))
        holder.release()

    def test_releases_lock(self):
        lk = Lock('test_lock')
        lk.acquire(blocking=False)
        lk.release()
        self.assertFalse(lk.locked())

    def test_releases_lock_not_acquired(self):
        holder = Lock('test_lock')
        holder.acquire(blocking=False)
        # A different instance may release a lock it never acquired.
        Lock('test_lock').release()
        self.assertFalse(holder.locked())

    @patch('corehq.apps.pg_lock.models.datetime')
    def test_acquires_lock_after_expiration(self, mock_datetime):
        lk = Lock('test_lock')
        mock_datetime.utcnow.return_value = datetime(2023, 1, 1, 12, 0, 0)
        lk.acquire(blocking=False, timeout=1)
        # Two seconds later, the one-second lock has expired.
        mock_datetime.utcnow.return_value = datetime(2023, 1, 1, 12, 0, 2)
        self.assertTrue(lk.acquire(blocking=False))
        lk.release()

    def test_context_manager_acquires_and_releases_lock(self):
        with lock('test_lock') as acquired:
            self.assertTrue(acquired)
            self.assertTrue(Lock('test_lock').locked())
        self.assertFalse(Lock('test_lock').locked())


class TestLockWorkers(TestCase):
    """Contrasts Redis- and Postgres-backed locks across "workers".

    Demonstrates that releasing a Redis lock from a process that did not
    acquire it raises (marked ``expectedFailure``), whereas the
    Postgres-backed lock allows any process to release any lock.
    """

    @expectedFailure
    def test_release_redis_lock_not_acquired(self):
        # Worker 1:
        lock1 = get_redis_lock('test-key', timeout=1, name='test-name')
        self.assertTrue(lock1.acquire(blocking=False))
        self.assertTrue(lock1.locked())

        # Worker 2:
        lock2 = get_redis_lock('test-key', timeout=1, name='test-name')
        # Raises because this worker does not own the lock:
        lock2.release()  # redis.exceptions.LockError: Cannot release an unlocked lock

        self.assertFalse(lock1.locked())

    def test_release_pg_lock_not_acquired(self):
        # Worker 1:
        lock1 = get_pg_lock('test-key', name='test-name')
        self.assertTrue(lock1.acquire(blocking=False))
        self.assertTrue(lock1.locked())

        # Worker 2:
        lock2 = get_pg_lock('test-key', name='test-name')
        # Succeeds: the PG lock deletes the row regardless of ownership.
        lock2.release()

        self.assertFalse(lock1.locked())
1 change: 1 addition & 0 deletions corehq/apps/reports/filters/select.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def options(self):
State.Pending,
State.Cancelled,
State.Fail,
State.InvalidPayload,
]]


Expand Down
6 changes: 6 additions & 0 deletions corehq/ex-submodules/dimagi/utils/couch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ def get_redis_lock(key, timeout=None, name=None, track_unreleased=True, **kw):
return MeteredLock(lock, name, track_unreleased)


def get_pg_lock(key, name, track_unreleased=True):
    """Return a ``MeteredLock`` wrapping a Postgres-backed ``Lock``.

    The import is local to avoid a module-level dependency on Django
    models from this utility module.
    """
    from corehq.apps.pg_lock.models import Lock

    return MeteredLock(Lock(key), name, track_unreleased)


def acquire_lock(lock, degrade_gracefully, **kwargs):
acquired = False
try:
Expand Down
10 changes: 9 additions & 1 deletion corehq/motech/repeaters/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
MIN_RETRY_WAIT = timedelta(minutes=60)
RATE_LIMITER_DELAY_RANGE = (timedelta(minutes=0), timedelta(minutes=15))
CHECK_REPEATERS_INTERVAL = timedelta(minutes=5)
CHECK_REPEATERS_PARTITION_COUNT = settings.CHECK_REPEATERS_PARTITION_COUNT
CHECK_REPEATERS_PARTITION_COUNT = settings.CHECK_REPEATERS_PARTITION_COUNT # TODO: Drop
CHECK_REPEATERS_KEY = 'check-repeaters-key'
# Number of attempts to an online endpoint before cancelling payload
MAX_ATTEMPTS = 3
# Number of exponential backoff attempts to an offline endpoint
MAX_BACKOFF_ATTEMPTS = 6
# The maximum number of workers that one repeater can use to send repeat
# records at the same time. (In other words, HQ's capacity to DDOS
# attack a remote API endpoint.) This is a guardrail to prevent one
# domain from hogging repeat record queue workers and to ensure that
# repeaters are iterated fairly.
MAX_REPEATER_WORKERS = 7


class State(IntegerChoices):
Expand All @@ -23,13 +29,15 @@ class State(IntegerChoices):
Success = 4, _('Succeeded')
Cancelled = 8, _('Cancelled')
Empty = 16, _('Empty') # There was nothing to send. Implies Success.
InvalidPayload = 32, _('Invalid Payload') # Implies Cancelled.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of adding this status in a separate PR? Seems like we could do it before we change repeater processing logic, and it would simplify this PR, which is getting pretty large.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea!



RECORD_PENDING_STATE = State.Pending
RECORD_SUCCESS_STATE = State.Success
RECORD_FAILURE_STATE = State.Fail
RECORD_CANCELLED_STATE = State.Cancelled
RECORD_EMPTY_STATE = State.Empty
RECORD_INVALIDPAYLOAD_STATE = State.InvalidPayload


class UCRRestrictionFFStatus(IntegerChoices):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ def create_case_repeater_register(repeater, domain, payload):
'domain': domain,
'doc_type': repeater.repeater_type
})
repeat_record.attempt_forward_now()
# TODO: No, the repeater sends the repeat record.
# repeat_record.attempt_forward_now()
return repeat_record


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from django.db import migrations, models


class Migration(migrations.Migration):
    # Adds Repeater.max_workers and indexes the fields used to filter
    # ready repeaters (is_paused, next_attempt_at, RepeatRecord.domain,
    # RepeatRecord.state).
    #
    # NOTE(review): the "state" choices below omit (32, "Invalid
    # Payload"), which this branch adds to the State enum — if left as
    # is, Django's autodetector will want a follow-up migration for the
    # choices change. Confirm this is intentional.

    dependencies = [
        ("repeaters", "0012_formexpressionrepeater_arcgisformexpressionrepeater"),
    ]

    operations = [
        migrations.AddField(
            model_name="repeater",
            name="max_workers",
            # Default mirrors MAX_REPEATER_WORKERS in repeaters/const.py.
            field=models.IntegerField(default=7),
        ),
        migrations.AlterField(
            model_name="repeater",
            name="is_paused",
            field=models.BooleanField(db_index=True, default=False),
        ),
        migrations.AlterField(
            model_name="repeater",
            name="next_attempt_at",
            field=models.DateTimeField(blank=True, db_index=True, null=True),
        ),
        migrations.AlterField(
            model_name="repeatrecord",
            name="domain",
            field=models.CharField(db_index=True, max_length=126),
        ),
        migrations.AlterField(
            model_name="repeatrecord",
            name="state",
            field=models.PositiveSmallIntegerField(
                choices=[
                    (1, "Pending"),
                    (2, "Failed"),
                    (4, "Succeeded"),
                    (8, "Cancelled"),
                    (16, "Empty"),
                ],
                db_index=True,
                default=1,
            ),
        ),
    ]
Loading
Loading