Skip to content

Commit

Permalink
fix(assets): extend Asset as_expression methods to include name, grou…
Browse files Browse the repository at this point in the history
…p fields (also AssetAlias group field)
  • Loading branch information
Lee-W committed Nov 14, 2024
1 parent cd30368 commit a57ed8b
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 13 deletions.
10 changes: 6 additions & 4 deletions airflow/assets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def as_expression(self) -> Any:
:meta private:
"""
return self.uri
return {"asset": {"uri": self.uri, "name": self.name, "group": self.group}}

def iter_assets(self) -> Iterator[tuple[str, Asset]]:
yield self.uri, self
Expand Down Expand Up @@ -380,7 +380,8 @@ def __init__(self, *objects: BaseAsset) -> None:
raise TypeError("expect asset expressions in condition")

self.objects = [
_AssetAliasCondition(obj.name) if isinstance(obj, AssetAlias) else obj for obj in objects
_AssetAliasCondition(name=obj.name, group=obj.group) if isinstance(obj, AssetAlias) else obj
for obj in objects
]

def evaluate(self, statuses: dict[str, bool]) -> bool:
Expand Down Expand Up @@ -440,8 +441,9 @@ class _AssetAliasCondition(AssetAny):
:meta private:
"""

def __init__(self, name: str) -> None:
def __init__(self, name: str, group: str) -> None:
self.name = name
self.group = group
self.objects = expand_alias_to_assets(name)

def __repr__(self) -> str:
Expand All @@ -453,7 +455,7 @@ def as_expression(self) -> Any:
:meta private:
"""
return {"alias": self.name}
return {"alias": {"name": self.name, "group": self.group}}

def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
yield self.name, AssetAlias(self.name)
Expand Down
2 changes: 1 addition & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ def detect_task_dependencies(task: Operator) -> list[DagDependency]:
)
)
elif isinstance(obj, AssetAlias):
cond = _AssetAliasCondition(obj.name)
cond = _AssetAliasCondition(name=obj.name, group=obj.group)

deps.extend(cond.iter_dag_dependencies(source=task.dag_id, target=""))
return deps
Expand Down
4 changes: 3 additions & 1 deletion airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ def __init__(self, assets: BaseAsset) -> None:
super().__init__()
self.asset_condition = assets
if isinstance(self.asset_condition, AssetAlias):
self.asset_condition = _AssetAliasCondition(self.asset_condition.name)
self.asset_condition = _AssetAliasCondition(
name=self.asset_condition.name, group=self.asset_condition.group
)

if not next(self.asset_condition.iter_assets(), False):
self._summary = AssetTriggeredTimetable.UNRESOLVED_ALIAS_SUMMARY
Expand Down
4 changes: 3 additions & 1 deletion tests/api_fastapi/core_api/routes/ui/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def test_next_run_assets(test_client, dag_maker):

assert response.status_code == 200
assert response.json() == {
"asset_expression": {"all": ["s3://bucket/key/1"]},
"asset_expression": {
"all": [{"asset": {"uri": "s3://bucket/key/1", "name": "s3://bucket/key/1", "group": ""}}]
},
"events": [{"id": 17, "uri": "s3://bucket/key/1", "lastUpdate": None}],
}
10 changes: 5 additions & 5 deletions tests/assets/test_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,22 +597,22 @@ def resolved_asset_alias_2(self, session, asset_1):
return asset_alias_2

def test_init(self, asset_alias_1, asset_1, resolved_asset_alias_2):
cond = _AssetAliasCondition(name=asset_alias_1.name)
cond = _AssetAliasCondition(name=asset_alias_1.name, group=asset_alias_1.group)
assert cond.objects == []

cond = _AssetAliasCondition(name=resolved_asset_alias_2.name)
cond = _AssetAliasCondition(name=resolved_asset_alias_2.name, group=resolved_asset_alias_2.group)
assert cond.objects == [Asset(uri=asset_1.uri)]

def test_as_expression(self, asset_alias_1, resolved_asset_alias_2):
for assset_alias in (asset_alias_1, resolved_asset_alias_2):
cond = _AssetAliasCondition(assset_alias.name)
cond = _AssetAliasCondition(name=assset_alias.name, group=assset_alias.group)
assert cond.as_expression() == {"alias": assset_alias.name}

def test_evalute(self, asset_alias_1, resolved_asset_alias_2, asset_1):
cond = _AssetAliasCondition(asset_alias_1.name)
cond = _AssetAliasCondition(name=asset_alias_1.name, group=asset_alias_1.group)
assert cond.evaluate({asset_1.uri: True}) is False

cond = _AssetAliasCondition(resolved_asset_alias_2.name)
cond = _AssetAliasCondition(name=resolved_asset_alias_2.name, group=resolved_asset_alias_2.group)
assert cond.evaluate({asset_1.uri: True}) is True


Expand Down
4 changes: 3 additions & 1 deletion tests/www/views/test_views_grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,9 @@ def test_next_run_assets(admin_client, dag_maker, session, app, monkeypatch):

assert resp.status_code == 200, resp.json
assert resp.json == {
"asset_expression": {"all": ["s3://bucket/key/1", "s3://bucket/key/2"]},
"asset_expression": {
"all": [{"asset": {"uri": "s3://bucket/key/1", "name": "s3://bucket/key/2", "group": ""}}]
},
"events": [
{"id": asset1_id, "uri": "s3://bucket/key/1", "lastUpdate": "2022-08-02T02:00:00+00:00"},
{"id": asset2_id, "uri": "s3://bucket/key/2", "lastUpdate": None},
Expand Down

0 comments on commit a57ed8b

Please sign in to comment.