Skip to content

Commit

Permalink
Mirror write WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
amcmahon-rh committed Jan 24, 2025
1 parent 1183517 commit 122d916
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 0 deletions.
22 changes: 22 additions & 0 deletions exodus_gw/aws/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ def __init__(
from_date: str,
env_obj: Environment | None = None,
deadline: datetime | None = None,
mirror_writes: bool = True
):
self.env = env
self.settings = settings
self.from_date = from_date
self.env_obj = env_obj or get_environment(env)
self.deadline = deadline
self.mirror_writes = mirror_writes
self.client = DynamoDBClientWrapper(self.env_obj.aws_profile).client
self._lock = Lock()
self._definitions = None
Expand Down Expand Up @@ -208,6 +210,13 @@ def query_definitions(self) -> dict[str, Any]:
out = json.loads(item_json)
return out

def should_mirror_write(self, uri):
    """Decide whether ``uri`` needs an additional, mirrored write.

    Mirroring applies only to URIs affected by a releasever alias: the
    URI is resolved against the ``releasever_alias`` definitions and a
    mirror write is requested only when the first resolved URI differs
    from the one given.

    When ``self.mirror_writes`` is falsy, that value is returned as-is
    and no alias resolution is performed.
    """
    mirroring = self.mirror_writes
    if not mirroring:
        return mirroring

    # The alias set is rebuilt on every call. That is a little wasteful,
    # but it avoids duplicating any of the alias logic in this method.
    releasever_aliases = self._aliases(["releasever_alias"])
    resolved = uri_alias(uri, releasever_aliases)[0]
    return resolved != uri

def create_request(
self,
items: list[models.Item],
Expand Down Expand Up @@ -255,6 +264,19 @@ def create_request(
}
}
)
if self.should_mirror_write(item.web_uri):
request[table_name].append(
{
"PutRequest": {
"Item": {
"from_date": {"S": from_date},
"web_uri": {"S": item.web_uri},
"object_key": {"S": item.object_key},
"content_type": {"S": item.content_type},
}
}
}
)
return request

def create_config_request(self, config):
Expand Down
2 changes: 2 additions & 0 deletions exodus_gw/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ class Settings(BaseSettings):
s3_pool_size: int = 3
"""Number of S3 clients to cache"""

mirror_writes_enabled: bool = True
"""Whether both the original URL and its releasever alias are written during phase 1 commits."""
model_config = SettingsConfigDict(env_prefix="exodus_gw_")


Expand Down
9 changes: 9 additions & 0 deletions exodus_gw/worker/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,14 @@ def dynamodb(self):
self.from_date,
self.env_obj,
self.task.deadline,
self.should_mirror_writes,
)
return self._dynamodb

@property
def should_mirror_writes(self) -> bool:
    """Whether DynamoDB writes should be mirrored; always ``False`` here.

    NOTE(review): presumably this is the base-class default that specific
    commit phases override -- confirm against the enclosing class.
    """
    return False

@property
def task_ready(self) -> bool:
task = self.task
Expand Down Expand Up @@ -446,6 +451,10 @@ class CommitPhase1(CommitBase):
# phase1 commit is allowed to proceed in either of these states.
PUBLISH_STATES = [PublishStates.committing, PublishStates.pending]

@property
def should_mirror_writes(self) -> bool:
    """Whether writes are mirrored, as set by ``mirror_writes_enabled``."""
    settings = self.settings
    return settings.mirror_writes_enabled

@property
def item_select(self):
# Query for items to be handled by phase1 commit.
Expand Down
215 changes: 215 additions & 0 deletions tests/aws/test_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@
}
}
},
{
"PutRequest": {
"Item": {
"web_uri": {
"S": "/content/testproduct/1/repo/repomd.xml"
},
"object_key": {
"S": "3f449eb3b942af58e9aca4c1cffdef89"
"c3f1552c20787ae8c966767a1fedd3a5"
},
"from_date": {"S": "2023-10-04 03:52:02"},
"content_type": {"S": None},
}
}
},
{
"PutRequest": {
"Item": {
Expand All @@ -78,6 +93,21 @@
}
}
},
{
"PutRequest": {
"Item": {
"web_uri": {
"S": "/content/testproduct/1/repo/.__exodus_autoindex"
},
"object_key": {
"S": "5891b5b522d5df086d0ff0b110fbd9d2"
"1bb4fc7163af34d08286a2e846f6be03"
},
"from_date": {"S": "2023-10-04 03:52:02"},
"content_type": {"S": None},
}
}
},
],
},
),
Expand Down Expand Up @@ -145,6 +175,191 @@ def test_batch_write(
)




def _put_request(web_uri, object_key, from_date):
    """Build the expected PutRequest entry for a single item.

    ``content_type`` is always ``None`` in the expectations of this test.
    """
    return {
        "PutRequest": {
            "Item": {
                "web_uri": {"S": web_uri},
                "object_key": {"S": object_key},
                "from_date": {"S": from_date},
                "content_type": {"S": None},
            }
        }
    }


# Expected rows, shared by both parametrized cases so that the only visible
# difference between the cases is the presence of the mirrored rows.
# Note these timestamps come from the canned values on fake_publish.items
_SOME_PATH = _put_request(
    "/some/path",
    "0bacfc5268f9994065dd858ece3359fd" "7a99d82af5be84202b8e84c2a5b07ffa",
    "2023-10-04 03:52:00",
)
_OTHER_PATH = _put_request(
    "/other/path",
    "e448a4330ff79a1b20069d436fae9480" "6a0e2e3a6b309cd31421ef088c6439fb",
    "2023-10-04 03:52:01",
)

_REPOMD_KEY = (
    "3f449eb3b942af58e9aca4c1cffdef89" "c3f1552c20787ae8c966767a1fedd3a5"
)
_REPOMD = _put_request(
    "/content/testproduct/1.1.0/repo/repomd.xml",
    _REPOMD_KEY,
    "2023-10-04 03:52:02",
)
_REPOMD_MIRROR = _put_request(
    "/content/testproduct/1/repo/repomd.xml",
    _REPOMD_KEY,
    "2023-10-04 03:52:02",
)

_AUTOINDEX_KEY = (
    "5891b5b522d5df086d0ff0b110fbd9d2" "1bb4fc7163af34d08286a2e846f6be03"
)
_AUTOINDEX = _put_request(
    "/content/testproduct/1.1.0/repo/.__exodus_autoindex",
    _AUTOINDEX_KEY,
    "2023-10-04 03:52:02",
)
_AUTOINDEX_MIRROR = _put_request(
    "/content/testproduct/1/repo/.__exodus_autoindex",
    _AUTOINDEX_KEY,
    "2023-10-04 03:52:02",
)


@pytest.mark.parametrize(
    "mirror,expected_request",
    [
        (
            True,
            {
                "my-table": [
                    _SOME_PATH,
                    _OTHER_PATH,
                    _REPOMD,
                    _REPOMD_MIRROR,
                    _AUTOINDEX,
                    _AUTOINDEX_MIRROR,
                ],
            },
        ),
        (
            False,
            {
                "my-table": [
                    _SOME_PATH,
                    _OTHER_PATH,
                    _REPOMD,
                    _AUTOINDEX,
                ],
            },
        ),
    ],
    ids=["Mirror-Enabled", "Mirror-Disabled"],
)
def test_batch_write_mirror(
    mock_boto3_client, fake_publish, mirror, expected_request
):
    """Mirrored rows are only requested when ``mirror_writes`` is enabled.

    With mirroring on, each item expected under
    ``/content/testproduct/1.1.0/`` is additionally expected under
    ``/content/testproduct/1/`` with the same object key and timestamp;
    with mirroring off, only the ``1.1.0`` rows are expected.
    """
    ddb = dynamodb.DynamoDB("test", Settings(), NOW_UTC, mirror_writes=mirror)

    request = ddb.create_request(fake_publish.items, delete=False)

    # Represent successful write/delete of all items to the table.
    mock_boto3_client.batch_write_item.return_value = {"UnprocessedItems": {}}

    ddb.batch_write(request)

    # Should've requested write of all items.
    mock_boto3_client.batch_write_item.assert_called_once_with(
        RequestItems=expected_request
    )

def test_batch_write_item_limit(mock_boto3_client, fake_publish, caplog):
items = fake_publish.items * 9
ddb = dynamodb.DynamoDB("test", Settings(), NOW_UTC)
Expand Down

0 comments on commit 122d916

Please sign in to comment.