Skip to content

Commit

Permalink
[dagster-airlift][sensor] example of sensor pluggability (#25053)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Adds a new example of switching AssetMaterialization events to
AssetObservation events using the sensor pluggability point.
## How I Tested These Changes
New example test
## Changelog
NOCHANGELOG
  • Loading branch information
dpeng817 authored Oct 4, 2024
1 parent 9bd94e9 commit 49f83e1
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ run_dagster_mapped:
run_dagster_automapped:
dagster dev -m kitchen_sink.dagster_defs.automapped_defs -p 3333

run_observation_defs: ## Run dagster dev against the observation_defs example module
dagster dev -m kitchen_sink.dagster_defs.observation_defs -p 3333

wipe: ## Wipe out all the files created by the Makefile
rm -rf $(AIRFLOW_HOME) $(DAGSTER_HOME)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def print_hello() -> None:
    """Emit a hello greeting on stdout (used as the body of both tasks)."""
    greeting = "Hello"
    print(greeting)  # noqa: T201


# Baseline task arguments shared by every task in the DAG defined below.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    start_date=datetime(2023, 1, 1),
    retries=1,
)


# A two-task DAG: print_task feeds downstream_print_task; both just print.
with DAG(
    "simple_unproxied_dag",
    default_args=default_args,
    schedule_interval=None,
    is_paused_upon_creation=False,
) as dag:
    upstream = PythonOperator(task_id="print_task", python_callable=print_hello)
    downstream = PythonOperator(
        task_id="downstream_print_task", python_callable=print_hello
    )
    upstream >> downstream  # type: ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Iterable, Sequence

from dagster import Definitions
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.events import AssetMaterialization, AssetObservation
from dagster_airlift.core import build_defs_from_airflow_instance, dag_defs, task_defs
from dagster_airlift.core.sensor.event_translation import AssetEvent

from .airflow_instance import local_airflow_instance


def observations_from_materializations(
    _context, _airflow_data, materializations: Sequence[AssetMaterialization]
) -> Iterable[AssetEvent]:
    """Translate each AssetMaterialization into an equivalent AssetObservation.

    The observation carries over the asset key, description, metadata, and
    tags from the source materialization unchanged.
    """
    return (
        AssetObservation(
            asset_key=mat.asset_key,
            description=mat.description,
            metadata=mat.metadata,
            tags=mat.tags,
        )
        for mat in materializations
    )


def build_mapped_defs() -> Definitions:
    """Build airlift Definitions for simple_unproxied_dag with observation events.

    Maps the dag's two print tasks to assets and installs
    ``observations_from_materializations`` as the event transformer, so the
    sensor emits observations instead of materializations.
    """
    print_task_defs = task_defs(
        "print_task", Definitions(assets=[AssetSpec("my_asset")])
    )
    downstream_task_defs = task_defs(
        "downstream_print_task",
        Definitions(assets=[AssetSpec("my_downstream_asset", deps=["my_asset"])]),
    )
    mapped = dag_defs("simple_unproxied_dag", print_task_defs, downstream_task_defs)
    return build_defs_from_airflow_instance(
        airflow_instance=local_airflow_instance(),
        defs=Definitions.merge(mapped),
        event_transformer_fn=observations_from_materializations,
    )


defs = build_mapped_defs()
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import os
import time
from datetime import timedelta
from typing import List

import pytest
from dagster import AssetKey, DagsterInstance
from dagster._core.definitions.metadata.metadata_value import JsonMetadataValue
from dagster._core.events.log import EventLogEntry
from dagster._time import get_current_datetime

from kitchen_sink_tests.integration_tests.conftest import makefile_dir

# Metadata key on materialization events holding the raw run payload as JSON
# (presumably attached by dagster-airlift — confirm against the sensor code).
RAW_METADATA_KEY = "Run Metadata (raw)"


def dag_id_of_mat(event_log_entry: EventLogEntry) -> str:
    """Extract the originating Airflow dag_id from a materialization event.

    Asserts the entry carries an asset materialization whose metadata contains
    the raw run-metadata JSON blob under ``RAW_METADATA_KEY``, then returns the
    ``dag_id`` field from that blob.

    Note: the original annotation said ``-> bool``, but the function returns
    the dag_id value pulled from the JSON metadata, not a boolean.
    """
    assert event_log_entry.asset_materialization
    assert isinstance(event_log_entry.asset_materialization.metadata, dict)
    json_metadata_value = event_log_entry.asset_materialization.metadata[RAW_METADATA_KEY]
    assert isinstance(json_metadata_value, JsonMetadataValue)
    assert isinstance(json_metadata_value.data, dict)
    return json_metadata_value.data["dag_id"]


def poll_for_observation(
    dagster_instance: DagsterInstance,
    asset_key: AssetKey,
) -> EventLogEntry:
    """Poll the instance until an observation lands on asset_key (30s limit)."""
    deadline = get_current_datetime() + timedelta(seconds=30)
    while get_current_datetime() < deadline:
        result = dagster_instance.fetch_observations(records_filter=asset_key, limit=1)
        if result.records:
            return result.records[0].event_log_entry
        time.sleep(0.1)

    raise Exception(f"Timeout waiting for observation event on {asset_key}")


@pytest.fixture(name="dagster_home")
def dagster_home_fixture(local_env: None) -> str:
    """Return DAGSTER_HOME from the environment (populated via local_env)."""
    dagster_home = os.environ["DAGSTER_HOME"]
    return dagster_home


@pytest.fixture(name="dagster_dev_cmd")
def dagster_dev_cmd_fixture() -> List[str]:
    """Command the dagster_dev fixture runs: the observation defs make target."""
    cmd = ["make", "run_observation_defs"]
    cmd.extend(["-C", str(makefile_dir())])
    return cmd


def test_observation_defs_are_observed(
    airflow_instance: None,
    dagster_dev: None,
    dagster_home: str,
) -> None:
    """Test that assets can load properly, and that observations register."""
    # Imported lazily so the module only loads once the fixtures have set up
    # the environment (airflow_instance / dagster_dev / dagster_home).
    from kitchen_sink.dagster_defs.airflow_instance import local_airflow_instance

    af_instance = local_airflow_instance()

    # dag_id -> asset keys we expect to see *observed* once the event
    # transformer rewrites materializations into observations.
    expected_obs_per_dag = {
        "simple_unproxied_dag": [AssetKey("my_asset"), AssetKey("my_downstream_asset")],
    }

    for dag_id, expected_asset_keys in expected_obs_per_dag.items():
        # Trigger the Airflow run and block until Airflow reports completion.
        airflow_run_id = af_instance.trigger_dag(dag_id=dag_id)
        af_instance.wait_for_run_completion(dag_id=dag_id, run_id=airflow_run_id, timeout=60)
        dagster_instance = DagsterInstance.get()

        # Asset keyed by [instance, "dag", dag_id] — presumably the per-dag
        # asset created by airlift; it should also receive an observation.
        dag_asset_key = AssetKey(["my_airflow_instance", "dag", dag_id])
        assert poll_for_observation(dagster_instance, dag_asset_key)

        # Each mapped task asset should have an observation with its own key.
        for expected_asset_key in expected_asset_keys:
            obs_entry = poll_for_observation(dagster_instance, expected_asset_key)
            assert obs_entry.asset_observation
            assert obs_entry.asset_observation.asset_key == expected_asset_key

0 comments on commit 49f83e1

Please sign in to comment.