diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py index 59da50a12dee0..8966af292c850 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/load_defs.py @@ -17,6 +17,7 @@ DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS, build_airflow_polling_sensor_defs, ) +from dagster_airlift.core.sensor.event_translation import DagsterEventTransformerFn from dagster_airlift.core.serialization.compute import compute_serialized_data from dagster_airlift.core.serialization.defs_construction import ( construct_automapped_dag_assets_defs, @@ -57,6 +58,7 @@ def build_defs_from_airflow_instance( airflow_instance: AirflowInstance, defs: Optional[Definitions] = None, sensor_minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS, + event_transformer_fn: Optional[DagsterEventTransformerFn] = None, ) -> Definitions: resolved_defs = AirflowInstanceDefsLoader( airflow_instance=airflow_instance, @@ -70,7 +72,7 @@ def build_defs_from_airflow_instance( airflow_instance=airflow_instance, resolved_airflow_defs=resolved_defs ), minimum_interval_seconds=sensor_minimum_interval_seconds, - event_translation_fn=None, + event_transformer_fn=event_transformer_fn, ), ) @@ -117,7 +119,7 @@ def build_full_automapped_dags_from_airflow_instance( airflow_data=AirflowDefinitionsData( resolved_airflow_defs=resolved_defs, airflow_instance=airflow_instance ), - event_translation_fn=None, + event_transformer_fn=None, ), ) diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/builder.py b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/builder.py index f67e44ee6d8ec..025afaac04c7c 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/builder.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/builder.py @@ -12,11 
+12,18 @@ _check as check, sensor, ) +from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation from dagster._core.definitions.asset_selection import AssetSelection from dagster._core.definitions.definitions_class import Definitions +from dagster._core.definitions.events import AssetObservation from dagster._core.definitions.repository_definition.repository_definition import ( RepositoryDefinition, ) +from dagster._core.errors import ( + DagsterInvariantViolationError, + DagsterUserCodeExecutionError, + user_code_error_boundary, +) from dagster._core.storage.dagster_run import RunsFilter from dagster._grpc.client import DEFAULT_SENSOR_GRPC_TIMEOUT from dagster._record import record @@ -27,12 +34,14 @@ from dagster_airlift.constants import ( AUTOMAPPED_TASK_METADATA_KEY, DAG_RUN_ID_TAG_KEY, + EFFECTIVE_TIMESTAMP_METADATA_KEY, TASK_ID_TAG_KEY, ) from dagster_airlift.core.airflow_defs_data import AirflowDefinitionsData from dagster_airlift.core.airflow_instance import AirflowInstance, DagRun, TaskInstance from dagster_airlift.core.sensor.event_translation import ( - AirflowEventTranslationFn, + AssetEvent, + DagsterEventTransformerFn, get_timestamp_from_materialization, materializations_for_dag_run, synthetic_mats_for_mapped_asset_keys, @@ -54,6 +63,10 @@ class AirflowPollingSensorCursor: dag_query_offset: Optional[int] = None +class AirliftSensorEventTransformerError(DagsterUserCodeExecutionError): + """Error raised when an error occurs in the event transformer function.""" + + def check_keys_for_asset_keys( repository_def: RepositoryDefinition, asset_keys: Set[AssetKey] ) -> Iterable[AssetCheckKey]: @@ -65,7 +78,7 @@ def check_keys_for_asset_keys( def build_airflow_polling_sensor_defs( airflow_data: AirflowDefinitionsData, - event_translation_fn: Optional[AirflowEventTranslationFn], + event_transformer_fn: Optional[DagsterEventTransformerFn], minimum_interval_seconds: int = DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS, ) -> Definitions: @sensor( 
@@ -102,7 +115,6 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult: end_date_lte=end_date_lte, offset=current_dag_offset, airflow_data=airflow_data, - event_translation_fn=event_translation_fn, ) all_asset_events: List[AssetMaterialization] = [] all_check_keys: Set[AssetCheckKey] = set() @@ -132,13 +144,20 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult: end_date_lte=None, dag_query_offset=0, ) + updated_asset_events = _get_transformer_result( + event_transformer_fn=event_transformer_fn, + context=context, + airflow_data=airflow_data, + all_asset_events=all_asset_events, + ) + context.update_cursor(serialize_value(new_cursor)) context.log.info( - f"************Exitting sensor for {airflow_data.airflow_instance.name}***********" + f"************Exiting sensor for {airflow_data.airflow_instance.name}***********" ) return SensorResult( - asset_events=sorted_asset_events(all_asset_events, repository_def), + asset_events=sorted_asset_events(updated_asset_events, repository_def), run_requests=[RunRequest(asset_check_keys=list(all_check_keys))] if all_check_keys else None, @@ -148,22 +167,51 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult: def sorted_asset_events( - all_materializations: Sequence[AssetMaterialization], + asset_events: Sequence[AssetEvent], repository_def: RepositoryDefinition, -) -> List[AssetMaterialization]: +) -> List[AssetEvent]: """Sort materializations by end date and toposort order.""" topo_aks = repository_def.asset_graph.toposorted_asset_keys materializations_and_timestamps = [ - (get_timestamp_from_materialization(mat), mat) for mat in all_materializations + (get_timestamp_from_materialization(mat), mat) for mat in asset_events ] return [ - sorted_mat[1] - for sorted_mat in sorted( + sorted_event[1] + for sorted_event in sorted( materializations_and_timestamps, key=lambda x: (x[0], topo_aks.index(x[1].asset_key)) ) ] +def _get_transformer_result( + 
event_transformer_fn: Optional[DagsterEventTransformerFn], + context: SensorEvaluationContext, + airflow_data: AirflowDefinitionsData, + all_asset_events: Sequence[AssetMaterialization], +) -> Sequence[AssetEvent]: + if not event_transformer_fn: + return all_asset_events + + with user_code_error_boundary( + AirliftSensorEventTransformerError, + lambda: f"Error occurred during event transformation for {airflow_data.airflow_instance.name}", + ): + updated_asset_events = list(event_transformer_fn(context, airflow_data, all_asset_events)) + + for asset_event in updated_asset_events: + if not isinstance( + asset_event, (AssetMaterialization, AssetObservation, AssetCheckEvaluation) + ): + raise DagsterInvariantViolationError( + f"Event transformer function must return AssetMaterialization, AssetObservation, or AssetCheckEvaluation objects. Got {type(asset_event)}." + ) + if EFFECTIVE_TIMESTAMP_METADATA_KEY not in asset_event.metadata: + raise DagsterInvariantViolationError( + f"All returned events must have an effective timestamp, but {asset_event} does not. An effective timestamp can be added by setting the dagster_airlift.constants.EFFECTIVE_TIMESTAMP_METADATA_KEY metadata key to a dagster.TimestampMetadataValue."
+ ) + return updated_asset_events + + @record class BatchResult: idx: int @@ -177,7 +225,6 @@ def materializations_and_requests_from_batch_iter( end_date_lte: float, offset: int, airflow_data: AirflowDefinitionsData, - event_translation_fn: Optional[AirflowEventTranslationFn], ) -> Iterator[Optional[BatchResult]]: runs = airflow_data.airflow_instance.get_dag_runs_batch( dag_ids=list(airflow_data.all_dag_ids), @@ -188,7 +235,7 @@ def materializations_and_requests_from_batch_iter( context.log.info(f"Found {len(runs)} dag runs for {airflow_data.airflow_instance.name}") context.log.info(f"All runs {runs}") for i, dag_run in enumerate(runs): - # TODO: add pluggability here (ignoring `event_translation_fn` for now) + # TODO: add pluggability here (ignoring `event_transformer_fn` for now) dag_mats = materializations_for_dag_run(dag_run, airflow_data) synthetic_mats = build_synthetic_asset_materializations( diff --git a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/event_translation.py b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/event_translation.py index baca9d04f7d76..93a65bcbb8a72 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/event_translation.py +++ b/examples/experimental/dagster-airlift/dagster_airlift/core/sensor/event_translation.py @@ -1,12 +1,15 @@ -from typing import AbstractSet, Any, Callable, Iterable, Mapping, Sequence +from typing import AbstractSet, Any, Callable, Iterable, Mapping, Sequence, Union from dagster import ( AssetMaterialization, + AssetObservation, JsonMetadataValue, MarkdownMetadataValue, + SensorEvaluationContext, TimestampMetadataValue, _check as check, ) +from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation from dagster._core.definitions.asset_key import AssetKey from dagster._time import get_current_timestamp @@ -14,14 +17,17 @@ from dagster_airlift.core.airflow_defs_data import AirflowDefinitionsData from 
dagster_airlift.core.airflow_instance import DagRun, TaskInstance -AirflowEventTranslationFn = Callable[ - [DagRun, Sequence[TaskInstance], AirflowDefinitionsData], Iterable[AssetMaterialization] +AssetEvent = Union[AssetMaterialization, AssetObservation, AssetCheckEvaluation] +DagsterEventTransformerFn = Callable[ + [SensorEvaluationContext, AirflowDefinitionsData, Sequence[AssetMaterialization]], + Iterable[AssetEvent], ] -def get_timestamp_from_materialization(mat: AssetMaterialization) -> float: +def get_timestamp_from_materialization(event: AssetEvent) -> float: return check.float_param( - mat.metadata[EFFECTIVE_TIMESTAMP_METADATA_KEY].value, "Materialization Effective Timestamp" + event.metadata[EFFECTIVE_TIMESTAMP_METADATA_KEY].value, + "Materialization Effective Timestamp", ) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py index d9bab97bdd255..d80698e85afea 100644 --- a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py @@ -1,6 +1,6 @@ from collections import defaultdict from datetime import datetime, timedelta -from typing import Dict, Generator, List, Sequence, Tuple, Union +from typing import Dict, Generator, List, Optional, Sequence, Tuple, Union import pytest from dagster import ( @@ -28,6 +28,7 @@ from dagster_airlift.core import ( build_defs_from_airflow_instance as build_defs_from_airflow_instance, ) +from dagster_airlift.core.sensor.event_translation import DagsterEventTransformerFn from dagster_airlift.core.utils import metadata_for_task_mapping from dagster_airlift.test import make_dag_run, make_instance @@ -40,9 +41,13 @@ def fully_loaded_repo_from_airflow_asset_graph( assets_per_task: Dict[str, Dict[str, List[Tuple[str, List[str]]]]], additional_defs: Definitions = Definitions(), create_runs: 
bool = True, + event_transformer_fn: Optional[DagsterEventTransformerFn] = None, ) -> RepositoryDefinition: defs = load_definitions_airflow_asset_graph( - assets_per_task, additional_defs=additional_defs, create_runs=create_runs + assets_per_task, + additional_defs=additional_defs, + create_runs=create_runs, + event_transformer_fn=event_transformer_fn, ) repo_def = defs.get_repository_def() repo_def.load_all_definitions() @@ -54,6 +59,7 @@ def load_definitions_airflow_asset_graph( additional_defs: Definitions = Definitions(), create_runs: bool = True, create_assets_defs: bool = True, + event_transformer_fn: Optional[DagsterEventTransformerFn] = None, ) -> Definitions: assets = [] dag_and_task_structure = defaultdict(list) @@ -96,7 +102,9 @@ def _asset(): additional_defs, Definitions(assets=assets), ) - return build_defs_from_airflow_instance(airflow_instance=instance, defs=defs) + return build_defs_from_airflow_instance( + airflow_instance=instance, defs=defs, event_transformer_fn=event_transformer_fn + ) def build_and_invoke_sensor( @@ -104,9 +112,10 @@ def build_and_invoke_sensor( assets_per_task: Dict[str, Dict[str, List[Tuple[str, List[str]]]]], instance: DagsterInstance, additional_defs: Definitions = Definitions(), + event_transformer_fn: Optional[DagsterEventTransformerFn] = None, ) -> Tuple[SensorResult, SensorEvaluationContext]: repo_def = fully_loaded_repo_from_airflow_asset_graph( - assets_per_task, additional_defs=additional_defs + assets_per_task, additional_defs=additional_defs, event_transformer_fn=event_transformer_fn ) sensor = next(iter(repo_def.sensor_defs)) sensor_context = build_sensor_context(repository_def=repo_def, instance=instance) diff --git a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_sensor.py b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_sensor.py index 6611b0f199779..ffa0acf1e48aa 100644 --- 
a/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_sensor.py +++ b/examples/experimental/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_sensor.py @@ -1,6 +1,8 @@ from datetime import datetime, timedelta, timezone +from typing import Sequence import mock +import pytest from dagster import ( AssetCheckKey, AssetKey, @@ -16,14 +18,24 @@ from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.events import AssetMaterialization from dagster._core.definitions.materialize import materialize +from dagster._core.definitions.metadata.metadata_value import TimestampMetadataValue +from dagster._core.definitions.sensor_definition import SensorEvaluationContext +from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.execute_in_process_result import ExecuteInProcessResult from dagster._core.test_utils import freeze_time from dagster._serdes import deserialize_value from dagster._time import get_current_datetime -from dagster_airlift.constants import DAG_ID_TAG_KEY, DAG_RUN_ID_TAG_KEY, TASK_ID_TAG_KEY +from dagster_airlift.constants import ( + DAG_ID_TAG_KEY, + DAG_RUN_ID_TAG_KEY, + EFFECTIVE_TIMESTAMP_METADATA_KEY, + TASK_ID_TAG_KEY, +) from dagster_airlift.core import dag_defs, task_defs +from dagster_airlift.core.airflow_defs_data import AirflowDefinitionsData from dagster_airlift.core.load_defs import build_full_automapped_dags_from_airflow_instance from dagster_airlift.core.sensor import AirflowPollingSensorCursor +from dagster_airlift.core.sensor.builder import AirliftSensorEventTransformerError from dagster_airlift.core.serialization.defs_construction import ( key_for_automapped_task_asset, make_default_dag_asset_key, @@ -581,3 +593,113 @@ def simulate_materialize_from_proxy_operator( TASK_ID_TAG_KEY: task_id, } return materialize(assets=[assets_def], instance=instance, tags=tags) + + +def test_pluggable_transformation(init_load_context: None, 
instance: DagsterInstance) -> None: + """Test the case where a custom transformation is provided to the sensor.""" + + def pluggable_event_transformer( + context: SensorEvaluationContext, + airflow_data: AirflowDefinitionsData, + events: Sequence[AssetMaterialization], + ) -> Sequence[AssetMaterialization]: + assert isinstance(context, SensorEvaluationContext) + assert isinstance(airflow_data, AirflowDefinitionsData) + # Change the timestamp, which should also change the order. We expect this to be respected by the sensor. + new_events = [] + for event in events: + if AssetKey(["a"]) == event.asset_key: + new_events.append( + event._replace( + metadata={ + "test": "test", + EFFECTIVE_TIMESTAMP_METADATA_KEY: TimestampMetadataValue(1.0), + } + ) + ) + elif AssetKey.from_user_string(make_dag_key_str("dag")) == event.asset_key: + new_events.append( + event._replace( + metadata={ + "test": "test", + EFFECTIVE_TIMESTAMP_METADATA_KEY: TimestampMetadataValue(0.0), + } + ) + ) + return new_events + + result, context = build_and_invoke_sensor( + assets_per_task={ + "dag": {"task": [("a", [])]}, + }, + instance=instance, + event_transformer_fn=pluggable_event_transformer, + ) + assert len(result.asset_events) == 2 + assert_expected_key_order(result.asset_events, [make_dag_key_str("dag"), "a"]) + for event in result.asset_events: + assert set(event.metadata.keys()) == {"test", EFFECTIVE_TIMESTAMP_METADATA_KEY} + + +def test_user_code_error_pluggable_transformation( + init_load_context: None, instance: DagsterInstance +) -> None: + """Test the case where a custom transformation is provided to the sensor, and the user code raises an error.""" + + def pluggable_event_transformer( + context: SensorEvaluationContext, + airflow_data: AirflowDefinitionsData, + events: Sequence[AssetMaterialization], + ) -> Sequence[AssetMaterialization]: + raise ValueError("User code error") + + with pytest.raises(AirliftSensorEventTransformerError): + build_and_invoke_sensor( + assets_per_task={ + 
"dag": {"task": [("a", [])]}, + }, + instance=instance, + event_transformer_fn=pluggable_event_transformer, + ) + + +def test_missing_effective_timestamp_pluggable_impl( + init_load_context: None, instance: DagsterInstance +) -> None: + """Test the case where a custom transformation is provided to the sensor, and the user doesn't include effective timestamp metadata.""" + + def missing_effective_timestamp( + context: SensorEvaluationContext, + airflow_data: AirflowDefinitionsData, + events: Sequence[AssetMaterialization], + ) -> Sequence[AssetMaterialization]: + return [event._replace(metadata={}) for event in events] + + with pytest.raises(DagsterInvariantViolationError): + build_and_invoke_sensor( + assets_per_task={ + "dag": {"task": [("a", [])]}, + }, + instance=instance, + event_transformer_fn=missing_effective_timestamp, + ) + + +def test_nonsense_result_pluggable_impl(init_load_context: None, instance: DagsterInstance) -> None: + """Test the case where a nonsense result is returned from the custom transformation.""" + + def nonsense_result( + context: SensorEvaluationContext, + airflow_data: AirflowDefinitionsData, + events: Sequence[AssetMaterialization], + ) -> Sequence[AssetMaterialization]: + return [1, 2, 3] # type: ignore # intentionally wrong + + with pytest.raises(DagsterInvariantViolationError): + build_and_invoke_sensor( + assets_per_task={ + "dag": {"task": [("a", [])]}, + }, + instance=instance, + event_transformer_fn=nonsense_result, + )