Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-airlift] Migrate complete to assets_with_task_mappings #25095

Open
wants to merge 1 commit into
base: undo-definitions-based-api-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from dagster import AssetKey, AssetSpec, Definitions, multi_asset
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
Expand All @@ -25,9 +26,9 @@ def specs_from_lakehouse(
]


def defs_from_lakehouse(
def lakehouse_assets_def(
*, specs: Sequence[AssetSpec], csv_path: Path, duckdb_path: Path, columns: List[str]
) -> Definitions:
) -> AssetsDefinition:
@multi_asset(specs=specs)
def _multi_asset() -> None:
load_csv_to_duckdb(
Expand All @@ -36,7 +37,19 @@ def _multi_asset() -> None:
columns=columns,
)

return Definitions(assets=[_multi_asset])
return _multi_asset


def defs_from_lakehouse(
    *, specs: Sequence[AssetSpec], csv_path: Path, duckdb_path: Path, columns: List[str]
) -> Definitions:
    """Wrap the single lakehouse multi-asset in a ``Definitions`` object.

    Thin convenience wrapper kept for callers that want a ``Definitions``
    rather than a bare assets definition; every argument is forwarded
    unchanged to ``lakehouse_assets_def``.
    """
    # Build the underlying multi-asset first, then wrap it.
    assets_def = lakehouse_assets_def(
        specs=specs,
        csv_path=csv_path,
        duckdb_path=duckdb_path,
        columns=columns,
    )
    return Definitions(assets=[assets_def])


def lakehouse_existence_check(csv_path: Path, duckdb_path: Path) -> AssetChecksDefinition:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,19 @@
from dagster_airlift.core import (
AirflowInstance,
BasicAuthBackend,
assets_with_task_mappings,
build_defs_from_airflow_instance,
dag_defs,
task_defs,
)
from dagster_airlift.dbt import dbt_defs
from dagster_dbt import DbtProject

from dbt_example.dagster_defs.lakehouse import (
defs_from_lakehouse,
lakehouse_existence_check_defs,
lakehouse_assets_def,
lakehouse_existence_check,
specs_from_lakehouse,
)
from dbt_example.shared.load_iris import CSV_PATH, DB_PATH, IRIS_COLUMNS

from .constants import (
AIRFLOW_BASE_URL,
AIRFLOW_INSTANCE_NAME,
PASSWORD,
USERNAME,
dbt_manifest_path,
dbt_project_path,
)
from .constants import AIRFLOW_BASE_URL, AIRFLOW_INSTANCE_NAME, PASSWORD, USERNAME
from .jaffle_shop import jaffle_shop_assets, jaffle_shop_resource

airflow_instance = AirflowInstance(
auth_backend=BasicAuthBackend(
Expand All @@ -35,29 +26,24 @@

defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
defs=Definitions.merge(
dag_defs(
"rebuild_iris_models",
task_defs(
"load_iris",
defs_from_lakehouse(
specs=specs_from_lakehouse(csv_path=CSV_PATH),
csv_path=CSV_PATH,
duckdb_path=DB_PATH,
columns=IRIS_COLUMNS,
),
),
task_defs(
"build_dbt_models",
dbt_defs(
manifest=dbt_manifest_path(),
project=DbtProject(dbt_project_path()),
),
),
),
lakehouse_existence_check_defs(
csv_path=CSV_PATH,
duckdb_path=DB_PATH,
),
defs=Definitions(
assets=[
*assets_with_task_mappings(
dag_id="rebuild_iris_models",
task_mappings={
"load_iris": [
lakehouse_assets_def(
specs=specs_from_lakehouse(csv_path=CSV_PATH),
csv_path=CSV_PATH,
duckdb_path=DB_PATH,
columns=IRIS_COLUMNS,
)
],
"build_dbt_models": [jaffle_shop_assets],
},
)
],
asset_checks=[lakehouse_existence_check(csv_path=CSV_PATH, duckdb_path=DB_PATH)],
resources={"dbt": jaffle_shop_resource()},
),
)