From 49f83e11f225e5bc7a8de01c308747d7db552569 Mon Sep 17 00:00:00 2001
From: Christopher DeCarolis
Date: Thu, 3 Oct 2024 17:23:39 -0700
Subject: [PATCH] [dagster-airlift][sensor] example of sensor pluggability (#25053)

## Summary & Motivation

Adds a new example of switching AssetMaterialization events to AssetObservation events using the sensor pluggability point.

## How I Tested These Changes

New example test

## Changelog

NOCHANGELOG
---
 .../examples/kitchen-sink/Makefile             |  3 +
 .../airflow_dags/simple_unproxied.py           | 27 +++++++
 .../dagster_defs/observation_defs.py           | 42 +++++++++++
 .../integration_tests/test_e2e_observation.py  | 75 +++++++++++++++++++
 4 files changed, 147 insertions(+)
 create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/simple_unproxied.py
 create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/observation_defs.py
 create mode 100644 examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_observation.py

diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/Makefile b/examples/experimental/dagster-airlift/examples/kitchen-sink/Makefile
index 2333caea39eba..b719d425ed14e 100644
--- a/examples/experimental/dagster-airlift/examples/kitchen-sink/Makefile
+++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/Makefile
@@ -34,6 +34,9 @@ run_dagster_mapped:
 run_dagster_automapped:
 	dagster dev -m kitchen_sink.dagster_defs.automapped_defs -p 3333
 
+run_observation_defs:
+	dagster dev -m kitchen_sink.dagster_defs.observation_defs -p 3333
+
 wipe: ## Wipe out all the files created by the Makefile
 	rm -rf $(AIRFLOW_HOME) $(DAGSTER_HOME)
 
diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/simple_unproxied.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/simple_unproxied.py
new file mode 100644
index 0000000000000..c765a66fcc6c5
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/airflow_dags/simple_unproxied.py
@@ -0,0 +1,27 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+
+
+def print_hello() -> None:
+    print("Hello")  # noqa: T201
+
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "start_date": datetime(2023, 1, 1),
+    "retries": 1,
+}
+
+
+with DAG(
+    "simple_unproxied_dag",
+    default_args=default_args,
+    schedule_interval=None,
+    is_paused_upon_creation=False,
+) as dag:
+    PythonOperator(task_id="print_task", python_callable=print_hello) >> PythonOperator(
+        task_id="downstream_print_task", python_callable=print_hello
+    )  # type: ignore
diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/observation_defs.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/observation_defs.py
new file mode 100644
index 0000000000000..3cb5492247a5b
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink/dagster_defs/observation_defs.py
@@ -0,0 +1,42 @@
+from typing import Iterable, Sequence
+
+from dagster import Definitions
+from dagster._core.definitions.asset_spec import AssetSpec
+from dagster._core.definitions.events import AssetMaterialization, AssetObservation
+from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs
+from dagster_airlift.core.sensor.event_translation import AssetEvent
+
+from .airflow_instance import local_airflow_instance
+
+
+def observations_from_materializations(
+    _context, _airflow_data, materializations: Sequence[AssetMaterialization]
+) -> Iterable[AssetEvent]:
+    """Construct AssetObservations from AssetMaterializations."""
+    for materialization in materializations:
+        yield AssetObservation(
+            asset_key=materialization.asset_key,
+            description=materialization.description,
+            metadata=materialization.metadata,
+            tags=materialization.tags,
+        )
+
+
+def build_mapped_defs() -> Definitions:
+    return build_defs_from_airflow_instance(
+        airflow_instance=local_airflow_instance(),
+        defs=Definitions.merge(
+            dag_defs(
+                "simple_unproxied_dag",
+                task_defs("print_task", Definitions(assets=[AssetSpec("my_asset")])),
+                task_defs(
+                    "downstream_print_task",
+                    Definitions(assets=[AssetSpec("my_downstream_asset", deps=["my_asset"])]),
+                ),
+            ),
+        ),
+        event_transformer_fn=observations_from_materializations,
+    )
+
+
+defs = build_mapped_defs()
diff --git a/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_observation.py b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_observation.py
new file mode 100644
index 0000000000000..d69243777e0ba
--- /dev/null
+++ b/examples/experimental/dagster-airlift/examples/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_observation.py
@@ -0,0 +1,75 @@
+import os
+import time
+from datetime import timedelta
+from typing import List
+
+import pytest
+from dagster import AssetKey, DagsterInstance
+from dagster._core.definitions.metadata.metadata_value import JsonMetadataValue
+from dagster._core.events.log import EventLogEntry
+from dagster._time import get_current_datetime
+
+from kitchen_sink_tests.integration_tests.conftest import makefile_dir
+
+RAW_METADATA_KEY = "Run Metadata (raw)"
+
+
+def dag_id_of_mat(event_log_entry: EventLogEntry) -> str:
+    assert event_log_entry.asset_materialization
+    assert isinstance(event_log_entry.asset_materialization.metadata, dict)
+    json_metadata_value = event_log_entry.asset_materialization.metadata[RAW_METADATA_KEY]
+    assert isinstance(json_metadata_value, JsonMetadataValue)
+    assert isinstance(json_metadata_value.data, dict)
+    return json_metadata_value.data["dag_id"]
+
+
+def poll_for_observation(
+    dagster_instance: DagsterInstance,
+    asset_key: AssetKey,
+) -> EventLogEntry:
+    start_time = get_current_datetime()
+    while get_current_datetime() - start_time < timedelta(seconds=30):
+        records = dagster_instance.fetch_observations(records_filter=asset_key, limit=1).records
+        if len(records) > 0:
+            return records[0].event_log_entry
+        time.sleep(0.1)
+
+    raise Exception(f"Timeout waiting for observation event on {asset_key}")
+
+
+@pytest.fixture(name="dagster_home")
+def dagster_home_fixture(local_env: None) -> str:
+    return os.environ["DAGSTER_HOME"]
+
+
+@pytest.fixture(name="dagster_dev_cmd")
+def dagster_dev_cmd_fixture() -> List[str]:
+    return ["make", "run_observation_defs", "-C", str(makefile_dir())]
+
+
+def test_observation_defs_are_observed(
+    airflow_instance: None,
+    dagster_dev: None,
+    dagster_home: str,
+) -> None:
+    """Test that assets can load properly, and that observations register."""
+    from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance
+
+    af_instance = local_airflow_instance()
+
+    expected_obs_per_dag = {
+        "simple_unproxied_dag": [AssetKey("my_asset"), AssetKey("my_downstream_asset")],
+    }
+
+    for dag_id, expected_asset_keys in expected_obs_per_dag.items():
+        airflow_run_id = af_instance.trigger_dag(dag_id=dag_id)
+        af_instance.wait_for_run_completion(dag_id=dag_id, run_id=airflow_run_id, timeout=60)
+        dagster_instance = DagsterInstance.get()
+
+        dag_asset_key = AssetKey(["my_airflow_instance", "dag", dag_id])
+        assert poll_for_observation(dagster_instance, dag_asset_key)
+
+        for expected_asset_key in expected_asset_keys:
+            obs_entry = poll_for_observation(dagster_instance, expected_asset_key)
+            assert obs_entry.asset_observation
+            assert obs_entry.asset_observation.asset_key == expected_asset_key
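
The pluggability point exercised by this change is the `event_transformer_fn` argument to `build_defs_from_airflow_instance`. Judging from the signature used in `observations_from_materializations`, the sensor hands the transformer a context, the Airflow data, and the batch of asset events it would otherwise emit, and then emits whatever the transformer yields instead (the behavior the new integration test asserts via `fetch_observations`). For comparison, a minimal pass-through sketch is shown below; it is illustrative only, not part of this diff, and `passthrough_transformer` assumes the same positional signature as the function in `observation_defs.py`:

```python
from typing import Iterable, Sequence

from dagster import AssetMaterialization
from dagster_airlift.core.sensor.event_translation import AssetEvent


def passthrough_transformer(
    _context, _airflow_data, materializations: Sequence[AssetMaterialization]
) -> Iterable[AssetEvent]:
    # Illustrative sketch: forward each materialization unchanged. The example in
    # observation_defs.py instead rebuilds each event as an AssetObservation, which
    # is why the test above polls for observations rather than materializations.
    yield from materializations
```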