Skip to content

Commit

Permalink
test(decorators/assets): extend test_determine_kwargs to cover active…
Browse files Browse the repository at this point in the history
… asset
  • Loading branch information
Lee-W committed Nov 1, 2024
1 parent 951dbc5 commit 0ef189e
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions tests/decorators/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from airflow.assets import Asset
from airflow.decorators.assets import AssetRef, _AssetMainOperator, asset
from airflow.models.asset import AssetActive, AssetModel

pytestmark = pytest.mark.db_test

Expand Down Expand Up @@ -145,21 +146,26 @@ def test__attrs_post_init__(


class Test_AssetMainOperator:
def test_determine_kwargs(self, example_asset_func_with_valid_arg_as_inlet_asset):
def test_determine_kwargs(self, example_asset_func_with_valid_arg_as_inlet_asset, session):
example_asset = AssetModel(uri="s3://bucket/object1", name="inlet_asset_1")
session.add(example_asset)
session.add(AssetActive.for_asset(example_asset))
session.commit()

asset_definition = asset(schedule=None, uri="s3://bucket/object", group="MLModel", extra={"k": "v"})(
example_asset_func_with_valid_arg_as_inlet_asset
)

op = _AssetMainOperator(
task_id="__main__",
inlets=[],
inlets=[AssetRef(name="inlet_asset_1"), AssetRef(name="inlet_asset_2")],
outlets=[asset_definition],
python_callable=example_asset_func_with_valid_arg_as_inlet_asset,
definition_name="example_asset_func",
)
assert op.determine_kwargs(context={"k": "v"}) == {
"_self": Asset(name="example_asset_func"),
"context": {"k": "v"},
"inlet_asset_1": Asset(name="inlet_asset_1"),
"inlet_asset_1": Asset(name="inlet_asset_1", uri="s3://bucket/object1"),
"inlet_asset_2": Asset(name="inlet_asset_2"),
}

0 comments on commit 0ef189e

Please sign in to comment.