Skip to content

Commit

Permalink
Test dynamic partitions and remove old ones (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
C-Loftus authored Dec 23, 2024
1 parent b2668df commit be0f605
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 4 deletions.
16 changes: 15 additions & 1 deletion userCode/lib/dagster_env.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
from dagster import DynamicPartitionsDefinition
from dagster import DagsterInstance, DynamicPartitionsDefinition, get_dagster_logger

# This is the list of sources to crawl that is dynamically generated at runtime by parsing the geoconnex config
sources_partitions_def = DynamicPartitionsDefinition(name="sources_partitions_def")


def filter_partitions(
instance: DagsterInstance, partition_name: str, keys_to_keep: set[str]
) -> None:
"""Remove all old partitions that are not in the list of keys to keep.
This is needed since dagster does not remove old partitions but keeps them by default for historical monitoring"""
sources_partitions_def = instance.get_dynamic_partitions(partition_name)
for key in sources_partitions_def:
if key not in keys_to_keep:
get_dagster_logger().info(
f"Deleting partition: {key} from {partition_name}"
)
instance.delete_dynamic_partition(partition_name, key)
14 changes: 11 additions & 3 deletions userCode/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
template_rclone,
)
from urllib.parse import urlparse
from .lib.dagster_env import sources_partitions_def
from .lib.dagster_env import filter_partitions, sources_partitions_def
from .lib.env import (
GLEANER_GRAPH_URL,
GLEANER_HEADLESS_ENDPOINT,
Expand Down Expand Up @@ -91,12 +91,14 @@ def gleaner_config(context: AssetExecutionContext):
Lines: list[str] = [sitemap.findNext("loc").text for sitemap in sitemapTags]

sources = []
names = set()
names: set[str] = set()

assert (
len(Lines) > 0
), f"No sitemaps found in sitemap index {REMOTE_GLEANER_SITEMAP}"

# context.instance.delete_dynamic_partition("sources_partitions_def")

for line in Lines:
basename = REMOTE_GLEANER_SITEMAP.removesuffix(".xml")
name = (
Expand All @@ -108,7 +110,9 @@ def gleaner_config(context: AssetExecutionContext):
)
name = remove_non_alphanumeric(name)
if name in names:
print(f"Warning! Skipping duplicate name {name}")
get_dagster_logger().warning(
f"Found duplicate name '{name}' in line '{line}' in sitemap {REMOTE_GLEANER_SITEMAP}. Skipping adding it again"
)
continue

parsed_url = urlparse(REMOTE_GLEANER_SITEMAP)
Expand All @@ -128,6 +132,10 @@ def gleaner_config(context: AssetExecutionContext):
names.add(name)
sources.append(data)

get_dagster_logger().info(f"Found {len(sources)} sources in the sitemap")

filter_partitions(context.instance, "sources_partitions_def", names)

# Each source is a partition that can be crawled independently
context.instance.add_dynamic_partitions(
partitions_def_name="sources_partitions_def", partition_keys=list(names)
Expand Down
55 changes: 55 additions & 0 deletions userCode/test/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,58 @@ def test_materialize_ref_hu02():
)

assert result.success, "Job execution failed for partition 'ref_hu02_hu02__0'"


def test_dynamic_partitions():
"""Make sure that a new materialization of the gleaner config will create new partitions"""
instance = DagsterInstance.ephemeral()
mocked_partition_keys = ["test_partition1", "test_partition2", "test_partition3"]
instance.add_dynamic_partitions(
partitions_def_name="sources_partitions_def",
partition_keys=list(mocked_partition_keys),
)

assert (
instance.get_dynamic_partitions("sources_partitions_def")
== mocked_partition_keys
)

assets = load_assets_from_modules([main])
# It is possible to load certain asset types that cannot be passed into
# Materialize so we filter them to avoid a pyright type error
filtered_assets = [
asset
for asset in assets
if isinstance(asset, (AssetsDefinition, AssetSpec, SourceAsset))
]
# These three assets are needed to generate the dynamic partition.
result = materialize(
assets=filtered_assets,
selection=["gleaner_config"],
instance=instance,
)
assert result.success, "Expected gleaner config to materialize"

assert (
instance.get_dynamic_partitions("sources_partitions_def")
!= mocked_partition_keys
)
newPartitions = instance.get_dynamic_partitions("sources_partitions_def")

# Make sure that the old partition keys aren't in the asset but
# the new ones are
for key in mocked_partition_keys:
assert key not in newPartitions
assert "ref_hu02_hu02__0" in newPartitions
assert "ref_hu04_hu04__0" in newPartitions

# Check what happens when we delete a specific key in the dynamic partition
instance.delete_dynamic_partition("sources_partitions_def", "ref_hu02_hu02__0")

# Make sure that
partitionsAfterDelete = instance.get_dynamic_partitions("sources_partitions_def")
assert "ref_hu02_hu02__0" not in partitionsAfterDelete
assert "ref_hu04_hu04__0" in partitionsAfterDelete
assert len(partitionsAfterDelete) == len(newPartitions) - 1
for key in mocked_partition_keys:
assert key not in partitionsAfterDelete

0 comments on commit be0f605

Please sign in to comment.