Skip to content

Commit

Permalink
test(decorators/assets): add test cases to AssetDefinition
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Oct 30, 2024
1 parent a12e17a commit 2b7dbda
Showing 1 changed file with 43 additions and 2 deletions.
45 changes: 43 additions & 2 deletions tests/decorators/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
# under the License.
from __future__ import annotations

from unittest import mock

import pytest

from airflow.assets import Asset
from airflow.decorators.assets import asset

pytestmark = pytest.mark.db_test
Expand Down Expand Up @@ -46,13 +49,20 @@ def _example_asset_func():
return _example_asset_func


@pytest.fixture
def example_asset_definition(example_asset_func):
return asset(schedule=None, uri="s3://bucket/object", group="MLModel", extra={"k": "v"})(
example_asset_func
)


@pytest.fixture
def example_asset_func_with_valid_arg_as_inlet_asset():
def _example_asset_func(inlet_asset_1, inlet_asset_2):
return "This is example_asset"

_example_asset_func.__name__ = "context"
_example_asset_func.__qualname__ = "context"
_example_asset_func.__name__ = "example_asset_func"
_example_asset_func.__qualname__ = "example_asset_func"
return _example_asset_func


Expand Down Expand Up @@ -105,3 +115,34 @@ def test_with_invalid_asset_name(self, example_asset_func):
asset(schedule=None)(example_asset_func)

assert err.value.args[0].startswith("prohibited name for asset: ")


class TestAssetDefinition:
def test_serialzie(self, example_asset_definition):
assert example_asset_definition.serialize() == {
"extra": {"k": "v"},
"group": "MLModel",
"name": "example_asset_func",
"uri": "s3://bucket/object",
}

@mock.patch("airflow.decorators.assets._AssetMainOperator")
@mock.patch("airflow.decorators.assets.DAG")
def test__attrs_post_init__(
self, DAG, _AssetMainOperator, example_asset_func_with_valid_arg_as_inlet_asset
):
asset_definition = asset(schedule=None, uri="s3://bucket/object", group="MLModel", extra={"k": "v"})(
example_asset_func_with_valid_arg_as_inlet_asset
)

DAG.assert_called_once_with(dag_id="example_asset_func", schedule=None, auto_register=True)
_AssetMainOperator.assert_called_once_with(
task_id="__main__",
inlets=[
Asset(name="inlet_asset_1", uri="inlet_asset_1", group="", extra={}),
Asset(name="inlet_asset_2", uri="inlet_asset_2", group="", extra={}),
],
outlets=[asset_definition],
python_callable=example_asset_func_with_valid_arg_as_inlet_asset,
definition_name="example_asset_func",
)

0 comments on commit 2b7dbda

Please sign in to comment.