diff --git a/airflow/decorators/assets.py b/airflow/decorators/assets.py index 3b7d357bd5e8..5f87d489525a 100644 --- a/airflow/decorators/assets.py +++ b/airflow/decorators/assets.py @@ -63,7 +63,7 @@ def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]: @attrs.define(kw_only=True) -class AssetDefinition: +class AssetDefinition(Asset): """ Asset representation from decorating a function with ``@asset``. @@ -71,7 +71,6 @@ class AssetDefinition: """ name: str # TODO: This should be stored on Asset. - asset: Asset function: types.FunctionType schedule: ScheduleArg @@ -82,7 +81,7 @@ def __attrs_post_init__(self) -> None: task_id="__main__", # TODO: This should use the name argument instead. inlets=[Asset(uri=k) for k in parameters if k not in ("self", "context")], - outlets=[self.asset], + outlets=[self], python_callable=self.function, definition_name=self.name, ) @@ -107,10 +106,8 @@ def __call__(self, f: types.FunctionType) -> AssetDefinition: raise ValueError(f"prohibited name for asset: {name}") return AssetDefinition( name=name, - asset=Asset( - uri=name if self.uri is None else str(self.uri), - extra=self.extra, - ), + uri=name if self.uri is None else str(self.uri), + extra=self.extra, function=f, schedule=self.schedule, )