diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/lakehouse.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/lakehouse.py index 0d14bed361b3b..d6f0ba3bc0875 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/lakehouse.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/lakehouse.py @@ -3,6 +3,7 @@ from dagster import AssetKey, AssetSpec, Definitions, multi_asset from dagster._core.definitions.asset_checks import AssetChecksDefinition +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, ) @@ -25,9 +26,9 @@ def specs_from_lakehouse( ] -def defs_from_lakehouse( +def lakehouse_assets_def( *, specs: Sequence[AssetSpec], csv_path: Path, duckdb_path: Path, columns: List[str] -) -> Definitions: +) -> AssetsDefinition: @multi_asset(specs=specs) def _multi_asset() -> None: load_csv_to_duckdb( @@ -36,7 +37,19 @@ def _multi_asset() -> None: columns=columns, ) - return Definitions(assets=[_multi_asset]) + return _multi_asset + + +def defs_from_lakehouse( + *, specs: Sequence[AssetSpec], csv_path: Path, duckdb_path: Path, columns: List[str] +) -> Definitions: + return Definitions( + assets=[ + lakehouse_assets_def( + specs=specs, csv_path=csv_path, duckdb_path=duckdb_path, columns=columns + ) + ] + ) def lakehouse_existence_check(csv_path: Path, duckdb_path: Path) -> AssetChecksDefinition: diff --git a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migrate.py b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migrate.py index e31f7091a5129..2b4262ab9c0d0 100644 --- a/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migrate.py +++ b/examples/experimental/dagster-airlift/examples/dbt-example/dbt_example/dagster_defs/migrate.py @@ -2,28 +2,19 @@ from dagster_airlift.core import ( AirflowInstance, BasicAuthBackend, + assets_with_task_mappings, build_defs_from_airflow_instance, - dag_defs, - task_defs, ) -from dagster_airlift.dbt import dbt_defs -from dagster_dbt import DbtProject from dbt_example.dagster_defs.lakehouse import ( - defs_from_lakehouse, - lakehouse_existence_check_defs, + lakehouse_assets_def, + lakehouse_existence_check, specs_from_lakehouse, ) from dbt_example.shared.load_iris import CSV_PATH, DB_PATH, IRIS_COLUMNS -from .constants import ( - AIRFLOW_BASE_URL, - AIRFLOW_INSTANCE_NAME, - PASSWORD, - USERNAME, - dbt_manifest_path, - dbt_project_path, -) +from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME +from .jaffle_shop import jaffle_shop_assets, jaffle_shop_resource airflow_instance = AirflowInstance( auth_backend=BasicAuthBackend( @@ -35,29 +26,24 @@ defs = build_defs_from_airflow_instance( airflow_instance=airflow_instance, - defs=Definitions.merge( - dag_defs( - "rebuild_iris_models", - task_defs( - "load_iris", - defs_from_lakehouse( - specs=specs_from_lakehouse(csv_path=CSV_PATH), - csv_path=CSV_PATH, - duckdb_path=DB_PATH, - columns=IRIS_COLUMNS, - ), - ), - task_defs( - "build_dbt_models", - dbt_defs( - manifest=dbt_manifest_path(), - project=DbtProject(dbt_project_path()), - ), - ), - ), - lakehouse_existence_check_defs( - csv_path=CSV_PATH, - duckdb_path=DB_PATH, - ), + defs=Definitions( + assets=[ + *assets_with_task_mappings( + dag_id="rebuild_iris_models", + task_mappings={ + "load_iris": [ + lakehouse_assets_def( + specs=specs_from_lakehouse(csv_path=CSV_PATH), + csv_path=CSV_PATH, + duckdb_path=DB_PATH, + columns=IRIS_COLUMNS, + ) + ], + "build_dbt_models": [jaffle_shop_assets], + }, + ) + ], + asset_checks=[lakehouse_existence_check(csv_path=CSV_PATH, duckdb_path=DB_PATH)], + resources={"dbt": jaffle_shop_resource()}, ), )