Skip to content

Commit

Permalink
Mirror release version alias writes [RHELDST-28333]
Browse files Browse the repository at this point in the history
During the RHEL GA process some repos would temporarily appear to be
corrupted from the customer's point of view, usually showing up as 404
errors when fetching repodata files. This is due to aliases being
updated and a delay in data being flushed.

This change introduces mirroring writes on both the source and
destination, which will prevent these errors from occurring.
  • Loading branch information
amcmahon-rh committed Feb 4, 2025
1 parent 1183517 commit 3fd9380
Show file tree
Hide file tree
Showing 5 changed files with 434 additions and 28 deletions.
74 changes: 47 additions & 27 deletions exodus_gw/aws/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ def __init__(
from_date: str,
env_obj: Environment | None = None,
deadline: datetime | None = None,
mirror_writes: bool = False
):
self.env = env
self.settings = settings
self.from_date = from_date
self.env_obj = env_obj or get_environment(env)
self.deadline = deadline
self.mirror_writes = mirror_writes
self.client = DynamoDBClientWrapper(self.env_obj.aws_profile).client
self._lock = Lock()
self._definitions = None
Expand Down Expand Up @@ -208,6 +210,29 @@ def query_definitions(self) -> dict[str, Any]:
out = json.loads(item_json)
return out

def uris_for_item(self, item) -> list[str]:
    """Return every URI that should be written for ``item``.

    The first entry is always the item's web_uri after resolving the
    write aliases. When mirror_writes is enabled and a releasever alias
    changes the item's web_uri, the unresolved web_uri is appended as
    well.

    In practice this yields one or two URIs, but callers should be
    prepared to handle any number.
    """
    source_uri = item.web_uri

    # Only the deepest path after full alias resolution is written,
    # which is the first element returned by uri_alias.
    resolved = [uri_alias(source_uri, self.aliases_for_write)[0]]

    if not self.mirror_writes:
        return resolved

    # Mirroring applies solely to releasever aliases. Re-running the
    # alias resolution is slightly wasteful, but it avoids duplicating
    # any of the alias logic here.
    releasever_target = uri_alias(
        source_uri, self._aliases(["releasever_alias"])
    )[0]
    if releasever_target != source_uri:
        resolved.append(source_uri)

    return resolved

def create_request(
self,
items: list[models.Item],
Expand All @@ -216,8 +241,6 @@ def create_request(
"""Create the dictionary structure expected by batch_write_item."""
table_name = self.env_obj.table
request: dict[str, list[Any]] = {table_name: []}
uri_aliases = self.aliases_for_write

for item in items:
# Items carry their own from_date. This effectively resolves
# conflicts in the case of two publishes updating the same web_uri
Expand All @@ -226,35 +249,32 @@ def create_request(
# updated timestamp.
from_date = str(item.updated)

# Resolve aliases. We only write to the deepest path
# after all alias resolution, hence only using the
# first result from uri_alias.
web_uri = uri_alias(item.web_uri, uri_aliases)[0]

if delete:
request[table_name].append(
{
"DeleteRequest": {
"Key": {
"from_date": {"S": from_date},
"web_uri": {"S": web_uri},
for web_uri in self.uris_for_item(item):
if delete:
request[table_name].append(
{
"DeleteRequest": {
"Key": {
"from_date": {"S": from_date},
"web_uri": {"S": web_uri},
}
}
}
}
)
else:
request[table_name].append(
{
"PutRequest": {
"Item": {
"from_date": {"S": from_date},
"web_uri": {"S": web_uri},
"object_key": {"S": item.object_key},
"content_type": {"S": item.content_type},
)
else:
request[table_name].append(
{
"PutRequest": {
"Item": {
"from_date": {"S": from_date},
"web_uri": {"S": web_uri},
"object_key": {"S": item.object_key},
"content_type": {"S": item.content_type},
}
}
}
}
)
)

return request

def create_config_request(self, config):
Expand Down
4 changes: 4 additions & 0 deletions exodus_gw/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,10 @@ class Settings(BaseSettings):
s3_pool_size: int = 3
"""Number of S3 clients to cache"""

mirror_writes_enabled: bool = True
"""Whether both the original url and releasever alias are written during
phase 1 commits."""

model_config = SettingsConfigDict(env_prefix="exodus_gw_")


Expand Down
9 changes: 9 additions & 0 deletions exodus_gw/worker/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,14 @@ def dynamodb(self):
self.from_date,
self.env_obj,
self.task.deadline,
self.should_mirror_writes,
)
return self._dynamodb

@property
def should_mirror_writes(self):
    """Whether DynamoDB writes should be mirrored.

    Always False here; CommitPhase1 overrides this property.
    """
    return False

@property
def task_ready(self) -> bool:
task = self.task
Expand Down Expand Up @@ -446,6 +451,10 @@ class CommitPhase1(CommitBase):
# phase1 commit is allowed to proceed in either of these states.
PUBLISH_STATES = [PublishStates.committing, PublishStates.pending]

@property
def should_mirror_writes(self):
    """Whether DynamoDB writes should be mirrored.

    Taken directly from the ``mirror_writes_enabled`` setting.
    """
    enabled = self.settings.mirror_writes_enabled
    return enabled

@property
def item_select(self):
# Query for items to be handled by phase1 commit.
Expand Down
Loading

0 comments on commit 3fd9380

Please sign in to comment.