diff --git a/exodus_gw/aws/dynamodb.py b/exodus_gw/aws/dynamodb.py index afbb3aff..b2705ea4 100644 --- a/exodus_gw/aws/dynamodb.py +++ b/exodus_gw/aws/dynamodb.py @@ -25,12 +25,14 @@ def __init__( from_date: str, env_obj: Environment | None = None, deadline: datetime | None = None, + mirror_writes: bool = False ): self.env = env self.settings = settings self.from_date = from_date self.env_obj = env_obj or get_environment(env) self.deadline = deadline + self.mirror_writes = mirror_writes self.client = DynamoDBClientWrapper(self.env_obj.aws_profile).client self._lock = Lock() self._definitions = None @@ -208,6 +210,29 @@ def query_definitions(self) -> dict[str, Any]: out = json.loads(item_json) return out + def uris_for_item(self, item) -> list[str]: + """Returns all URIs to be written for the given item. + + In practice, always returns either one or two URIs depending on + configured aliases and other settings, though the caller should + assume any number of URIs. + """ + + # Resolve aliases. We only write to the deepest path + # after all alias resolution, hence only using the + # first result from uri_alias. + uris = [uri_alias(item.web_uri, self.aliases_for_write)[0]] + + # We only want to mirror writes for release ver aliases. Recalculating + # the aliases completely is a bit inefficient, but I'd rather not + # duplicate any alias logic. + if self.mirror_writes and \ + uri_alias( + item.web_uri, self._aliases(["releasever_alias"]) + )[0] != item.web_uri: + uris.append(item.web_uri) + return uris + def create_request( self, items: list[models.Item], @@ -216,8 +241,6 @@ def create_request( """Create the dictionary structure expected by batch_write_item.""" table_name = self.env_obj.table request: dict[str, list[Any]] = {table_name: []} - uri_aliases = self.aliases_for_write - for item in items: # Items carry their own from_date. This effectively resolves # conflicts in the case of two publishes updating the same web_uri @@ -226,35 +249,32 @@ def create_request( # updated timestamp. from_date = str(item.updated) - # Resolve aliases. We only write to the deepest path - # after all alias resolution, hence only using the - # first result from uri_alias. - web_uri = uri_alias(item.web_uri, uri_aliases)[0] - - if delete: - request[table_name].append( - { - "DeleteRequest": { - "Key": { - "from_date": {"S": from_date}, - "web_uri": {"S": web_uri}, + for web_uri in self.uris_for_item(item): + if delete: + request[table_name].append( + { + "DeleteRequest": { + "Key": { + "from_date": {"S": from_date}, + "web_uri": {"S": web_uri}, + } } } - } - ) - else: - request[table_name].append( - { - "PutRequest": { - "Item": { - "from_date": {"S": from_date}, - "web_uri": {"S": web_uri}, - "object_key": {"S": item.object_key}, - "content_type": {"S": item.content_type}, + ) + else: + request[table_name].append( + { + "PutRequest": { + "Item": { + "from_date": {"S": from_date}, + "web_uri": {"S": web_uri}, + "object_key": {"S": item.object_key}, + "content_type": {"S": item.content_type}, + } } } - } - ) + ) + return request def create_config_request(self, config): diff --git a/exodus_gw/settings.py b/exodus_gw/settings.py index 381a9150..50151af9 100644 --- a/exodus_gw/settings.py +++ b/exodus_gw/settings.py @@ -475,6 +475,10 @@ class Settings(BaseSettings): s3_pool_size: int = 3 """Number of S3 clients to cache""" + mirror_writes_enabled: bool = True + """Whether both the original url and releasever alias are written during + phase 1 commits.""" + model_config = SettingsConfigDict(env_prefix="exodus_gw_") diff --git a/exodus_gw/worker/publish.py b/exodus_gw/worker/publish.py index 0365ef63..c947435a 100644 --- a/exodus_gw/worker/publish.py +++ b/exodus_gw/worker/publish.py @@ -186,9 +186,14 @@ def dynamodb(self): self.from_date, self.env_obj, self.task.deadline, + self.should_mirror_writes, ) return self._dynamodb + @property + def should_mirror_writes(self): + return False + @property def task_ready(self) -> bool: task = self.task @@ -446,6 +451,10 @@ class CommitPhase1(CommitBase): # phase1 commit is allowed to proceed in either of these states. PUBLISH_STATES = [PublishStates.committing, PublishStates.pending] + @property + def should_mirror_writes(self): + return self.settings.mirror_writes_enabled + @property def item_select(self): # Query for items to be handled by phase1 commit. diff --git a/tests/aws/test_dynamodb.py b/tests/aws/test_dynamodb.py index 32d223df..4bd1e624 100644 --- a/tests/aws/test_dynamodb.py +++ b/tests/aws/test_dynamodb.py @@ -7,6 +7,7 @@ import pytest from botocore.exceptions import EndpointConnectionError +from exodus_gw.models import Publish, Item from exodus_gw.aws import dynamodb from exodus_gw.settings import Settings @@ -63,6 +64,21 @@ } } }, + { + "PutRequest": { + "Item": { + "web_uri": { + "S": "/content/testproduct/1/repo/repomd.xml" + }, + "object_key": { + "S": "3f449eb3b942af58e9aca4c1cffdef89" + "c3f1552c20787ae8c966767a1fedd3a5" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + "content_type": {"S": None}, + } + } + }, { "PutRequest": { "Item": { @@ -78,6 +94,21 @@ } } }, + { + "PutRequest": { + "Item": { + "web_uri": { + "S": "/content/testproduct/1/repo/.__exodus_autoindex" + }, + "object_key": { + "S": "5891b5b522d5df086d0ff0b110fbd9d2" + "1bb4fc7163af34d08286a2e846f6be03" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + "content_type": {"S": None}, + } + } + }, ], }, ), @@ -111,6 +142,16 @@ } } }, + { + "DeleteRequest": { + "Key": { + "web_uri": { + "S": "/content/testproduct/1/repo/repomd.xml" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + } + } + }, { "DeleteRequest": { "Key": { @@ -121,6 +162,16 @@ } } }, + { + "DeleteRequest": { + "Key": { + "web_uri": { + "S": "/content/testproduct/1/repo/.__exodus_autoindex" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + } + } + }, ], }, ), @@ -130,7 +181,7 @@ def test_batch_write( mock_boto3_client, fake_publish, delete, expected_request ): - ddb = dynamodb.DynamoDB("test", Settings(), NOW_UTC) + ddb = dynamodb.DynamoDB("test", Settings(), NOW_UTC, mirror_writes=True) request = ddb.create_request(fake_publish.items, delete=delete) @@ -145,6 +196,285 @@ def test_batch_write( ) +@pytest.mark.parametrize( + "mirror,expected_request", + [ + ( + True, + { + "my-table": [ + { + "PutRequest": { + "Item": { + "web_uri": {"S": "/some/path"}, + "object_key": { + "S": "0bacfc5268f9994065dd858ece3359fd" + "7a99d82af5be84202b8e84c2a5b07ffa" + }, + # Note these timestamps come from the canned values + # on fake_publish.items + "from_date": {"S": "2023-10-04 03:52:00"}, + "content_type": {"S": None}, + } + } + }, + { + "PutRequest": { + "Item": { + "web_uri": {"S": "/other/path"}, + "object_key": { + "S": "e448a4330ff79a1b20069d436fae9480" + "6a0e2e3a6b309cd31421ef088c6439fb" + }, + "from_date": {"S": "2023-10-04 03:52:01"}, + "content_type": {"S": None}, + } + } + }, + { + "PutRequest": { + "Item": { + "web_uri": { + "S": "/content/testproduct/1.1.0/repo/repomd.xml" + }, + "object_key": { + "S": "3f449eb3b942af58e9aca4c1cffdef89" + "c3f1552c20787ae8c966767a1fedd3a5" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + "content_type": {"S": None}, + } + } + }, + { + "PutRequest": { + "Item": { + "web_uri": { + "S": "/content/testproduct/1/repo/repomd.xml" + }, + "object_key": { + "S": "3f449eb3b942af58e9aca4c1cffdef89" + "c3f1552c20787ae8c966767a1fedd3a5" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + "content_type": {"S": None}, + } + } + }, + { + "PutRequest": { + "Item": { + "web_uri": { + "S": "/content/testproduct/1.1.0/repo/.__exodus_autoindex" + }, + "object_key": { + "S": "5891b5b522d5df086d0ff0b110fbd9d2" + "1bb4fc7163af34d08286a2e846f6be03" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + "content_type": {"S": None}, + } + } + }, + { + "PutRequest": { + "Item": { + "web_uri": { + "S": "/content/testproduct/1/repo/.__exodus_autoindex" + }, + "object_key": { + "S": "5891b5b522d5df086d0ff0b110fbd9d2" + "1bb4fc7163af34d08286a2e846f6be03" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + "content_type": {"S": None}, + } + } + }, + ], + }, + ), + ( + False, + { + "my-table": [ + { + "PutRequest": { + "Item": { + "web_uri": {"S": "/some/path"}, + "object_key": { + "S": "0bacfc5268f9994065dd858ece3359fd" + "7a99d82af5be84202b8e84c2a5b07ffa" + }, + # Note these timestamps come from the canned values + # on fake_publish.items + "from_date": {"S": "2023-10-04 03:52:00"}, + "content_type": {"S": None}, + } + } + }, + { + "PutRequest": { + "Item": { + "web_uri": {"S": "/other/path"}, + "object_key": { + "S": "e448a4330ff79a1b20069d436fae9480" + "6a0e2e3a6b309cd31421ef088c6439fb" + }, + "from_date": {"S": "2023-10-04 03:52:01"}, + "content_type": {"S": None}, + } + } + }, + { + "PutRequest": { + "Item": { + "web_uri": { + "S": "/content/testproduct/1.1.0/repo/repomd.xml" + }, + "object_key": { + "S": "3f449eb3b942af58e9aca4c1cffdef89" + "c3f1552c20787ae8c966767a1fedd3a5" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + "content_type": {"S": None}, + } + } + }, + { + "PutRequest": { + "Item": { + "web_uri": { + "S": "/content/testproduct/1.1.0/repo/.__exodus_autoindex" + }, + "object_key": { + "S": "5891b5b522d5df086d0ff0b110fbd9d2" + "1bb4fc7163af34d08286a2e846f6be03" + }, + "from_date": {"S": "2023-10-04 03:52:02"}, + "content_type": {"S": None}, + } + } + }, + ], + }, + ), + ], + ids=["Mirror-Enabled", "Mirror-Disabled"], +) +def test_batch_write_mirror_configurable( + mock_boto3_client, fake_publish, mirror, expected_request +): + ddb = dynamodb.DynamoDB("test", Settings(), NOW_UTC, mirror_writes=mirror) + + request = ddb.create_request(fake_publish.items, delete=False) + + # Represent successful write/delete of all items to the table. + mock_boto3_client.batch_write_item.return_value = {"UnprocessedItems": {}} + + ddb.batch_write(request) + + # Should've requested write of all items. + mock_boto3_client.batch_write_item.assert_called_once_with( + RequestItems=expected_request + ) + + +def test_write_mirror(mock_boto3_client): + expected_request = {'my-table': [ + # publish.items[0] both sides of the alias are mirrored. + {'PutRequest': { + 'Item': {'from_date': {'S': '2023-10-04 03:52:00'}, + 'web_uri': { + 'S': '/content/dist/rhel8/8.5/x86_64/baseos/os/repodata/abc123-primary.xml.gz'}, + 'object_key': { + 'S': '0bacfc5268f9994065dd858ece3359fd7a99d82af5be84202b8e84c2a5b07ffa'}, + 'content_type': {'S': None}}}} + , {'PutRequest': { + 'Item': {'from_date': {'S': '2023-10-04 03:52:00'}, 'web_uri': { + 'S': '/content/dist/rhel8/8/x86_64/baseos/os/repodata/abc123-primary.xml.gz'}, + 'object_key': { + 'S': '0bacfc5268f9994065dd858ece3359fd7a99d82af5be84202b8e84c2a5b07ffa'}, + 'content_type': {'S': None}}}}, + # publish.items[1] no alias matches, so it's just the provided uri + {'PutRequest': { + 'Item': {'from_date': {'S': '2023-10-04 03:52:01'}, + 'web_uri': { + 'S': '/content/dist/rhel9/9/x86_64/baseos/os/repodata/abc-primary.xml.gz'}, + 'object_key': { + 'S': 'e448a4330ff79a1b20069d436fae94806a0e2e3a6b309cd31421ef088c6439fb'}, + 'content_type': {'S': None}}}}, + # publish.items[2], dest of the alias, so no mirroring occurs. + {'PutRequest': { + 'Item': {'from_date': {'S': '2023-10-04 03:52:02'}, + 'web_uri': { + 'S': '/content/dist/rhel8/8.5/aarch64/appstream/debug/repodata/xyz-primary.xml.gz'}, + 'object_key': { + 'S': '3f449eb3b942af58e9aca4c1cffdef89c3f1552c20787ae8c966767a1fedd3a5'}, + 'content_type': {'S': None}}}}, + # publish.items[3] RHUI is not mirrored. + {'PutRequest': { + 'Item': {'from_date': {'S': '2023-10-04 03:52:02'}, + 'web_uri': { + 'S': '/content/dist/rhel8/rhui/8/aarch64/appstream/debug/repodata/ijk-primary.xml.gz'}, + 'object_key': { + 'S': '5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03'}, + 'content_type': {'S': None}}}}, + ] + } + publish = Publish( + id="123e4567-e89b-12d3-a456-426614174000", + env="test", + state="PENDING", + ) + publish.items = [ + # Matches the 8 => 8.5 alias in conftest.py, should mirror + Item( + web_uri="/content/dist/rhel8/8/x86_64/baseos/os/repodata/abc123-primary.xml.gz", + object_key="0bacfc5268f9994065dd858ece3359fd7a99d82af5be84202b8e84c2a5b07ffa", + publish_id=publish.id, + updated=datetime(2023, 10, 4, 3, 52, 0), + ), + # Doesn't match any aliases, so no mirroring. + Item( + web_uri="/content/dist/rhel9/9/x86_64/baseos/os/repodata/abc-primary.xml.gz", + object_key="e448a4330ff79a1b20069d436fae94806a0e2e3a6b309cd31421ef088c6439fb", + publish_id=publish.id, + updated=datetime(2023, 10, 4, 3, 52, 1), + ), + # The destination side of 8 => 8.5, should not mirror. + Item( + web_uri="/content/dist/rhel8/8.5/aarch64/appstream/debug/repodata/xyz-primary.xml.gz", + object_key="3f449eb3b942af58e9aca4c1cffdef89c3f1552c20787ae8c966767a1fedd3a5", + publish_id=publish.id, + updated=datetime(2023, 10, 4, 3, 52, 2), + ), + # RHUI aliases are not aliased on write, so we expect the same uri + # with no mirroring. rhsm-pulp is configured upstream to block + # writing directly to rhui, this test is here to document the + # current behaviour. + Item( + web_uri="/content/dist/rhel8/rhui/8/aarch64/appstream/debug/repodata/ijk-primary.xml.gz", + object_key="5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03", + publish_id=publish.id, + updated=datetime(2023, 10, 4, 3, 52, 2), + ), + ] + ddb = dynamodb.DynamoDB("test", Settings(), NOW_UTC, mirror_writes=True) + + request = ddb.create_request(publish.items, delete=False) + + # Represent successful write/delete of all items to the table. + mock_boto3_client.batch_write_item.return_value = {"UnprocessedItems": {}} + + ddb.batch_write(request) + + # Should've requested write of all items. + mock_boto3_client.batch_write_item.assert_called_once_with( + RequestItems=expected_request + ) + + def test_batch_write_item_limit(mock_boto3_client, fake_publish, caplog): items = fake_publish.items * 9 ddb = dynamodb.DynamoDB("test", Settings(), NOW_UTC) diff --git a/tests/worker/test_publish.py b/tests/worker/test_publish.py index 81d53a64..301ed6fe 100644 --- a/tests/worker/test_publish.py +++ b/tests/worker/test_publish.py @@ -757,3 +757,46 @@ def test_commit_missing_object_key( # It should've logged the reason why. assert "BUG: missing object_key for /some/path/to/link-src" in caplog.text + + +@mock.patch("exodus_gw.worker.publish.CurrentMessage.get_current_message") +@pytest.mark.parametrize( + "commit_class,expected", + [ + (worker.publish.CommitPhase1, [ + "/content/dist/rhel8/8.5/aarch64/appstream/debug/repodata/abc123-comps.xml", + "/content/dist/rhel8/8/aarch64/appstream/debug/repodata/abc123-comps.xml"]), + (worker.publish.CommitPhase2, [ + "/content/dist/rhel8/8.5/aarch64/appstream/debug/repodata/abc123-comps.xml"]) + ], + ids=["phase1", "phase2"] +) +def test_phase2_wont_mirror(mock_get_msg, fake_publish, db, commit_class, expected): + """ + Phase2 is set up to not mirror aliases. Test shows a url that would be mirrored by + Phase1 is not mirrored in Phase2. In reality, different phases work on + different files, but this at least confirms the general behaviour. + """ + + # Construct task that would be generated by caller. + task = _task(fake_publish.id) + # Construct dramatiq message that would be generated by caller. + mock_get_msg.return_value = mock.MagicMock( + message_id=task.id, kwargs={"publish_id": fake_publish.id} + ) + item = models.Item( + web_uri="/content/dist/rhel8/8/aarch64/appstream/debug/repodata/abc123-comps.xml", + link_to="/some/link-dest", + publish_id=fake_publish.id, + updated=datetime(2023, 10, 4, 3, 52, 0), + ) + db.add(fake_publish) + db.add(task) + # Caller would've set publish state to COMMITTING. + fake_publish.state = "COMMITTING" + db.commit() + commit_instance = commit_class(fake_publish.id, fake_publish.env, NOW_UTC, task.id, load_settings()) + + item_uris = commit_instance.dynamodb.uris_for_item(item) + + assert item_uris == expected \ No newline at end of file