diff --git a/providers/src/airflow/providers/openlineage/plugins/adapter.py b/providers/src/airflow/providers/openlineage/plugins/adapter.py index fb58cc5dc022..199df880e79a 100644 --- a/providers/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/src/airflow/providers/openlineage/plugins/adapter.py @@ -32,7 +32,6 @@ nominal_time_run, ownership_job, parent_run, - processing_engine_run, source_code_location_job, ) from openlineage.client.uuid import generate_static_uuid @@ -42,6 +41,7 @@ OpenLineageRedactor, get_airflow_debug_facet, get_airflow_state_run_facet, + get_processing_engine_facet, ) from airflow.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin @@ -195,18 +195,10 @@ def start_task( :param task: metadata container with information extracted from operator :param run_facets: custom run facets """ - from airflow.version import version as AIRFLOW_VERSION - - processing_engine_version_facet = processing_engine_run.ProcessingEngineRunFacet( - version=AIRFLOW_VERSION, - name="Airflow", - openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION, - ) - run_facets = run_facets or {} if task: run_facets = {**task.run_facets, **run_facets} - run_facets["processing_engine"] = processing_engine_version_facet # type: ignore + run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore event = RunEvent( eventType=RunState.START, eventTime=event_time, @@ -362,7 +354,7 @@ def dag_started( job_name=dag_id, nominal_start_time=nominal_start_time, nominal_end_time=nominal_end_time, - run_facets={**run_facets, **get_airflow_debug_facet()}, + run_facets={**run_facets, **get_airflow_debug_facet(), **get_processing_engine_facet()}, ), inputs=[], outputs=[], diff --git a/providers/src/airflow/providers/openlineage/utils/utils.py b/providers/src/airflow/providers/openlineage/utils/utils.py index 8c67c32f95b8..99faa3c4d5ce 100644 --- a/providers/src/airflow/providers/openlineage/utils/utils.py +++ b/providers/src/airflow/providers/openlineage/utils/utils.py @@ -38,7 +38,7 @@ # TODO: move this maybe to Airflow's logic? from airflow.models import DAG, BaseOperator, DagRun, MappedOperator from airflow.providers.common.compat.assets import Asset -from airflow.providers.openlineage import conf +from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf from airflow.providers.openlineage.plugins.facets import ( AirflowDagRunFacet, AirflowDebugRunFacet, @@ -65,7 +65,7 @@ if TYPE_CHECKING: from openlineage.client.event_v2 import Dataset as OpenLineageDataset - from openlineage.client.facet_v2 import RunFacet + from openlineage.client.facet_v2 import RunFacet, processing_engine_run from airflow.models import TaskInstance from airflow.utils.state import DagRunState, TaskInstanceState @@ -428,6 +428,18 @@ def _get_all_packages_installed() -> dict[str, str]: return {dist.metadata["Name"]: dist.version for dist in metadata.distributions()} +def get_processing_engine_facet() -> dict[str, processing_engine_run.ProcessingEngineRunFacet]: + from openlineage.client.facet_v2 import processing_engine_run + + return { + "processing_engine": processing_engine_run.ProcessingEngineRunFacet( + version=AIRFLOW_VERSION, + name="Airflow", + openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION, + ) + } + + def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]: if not conf.debug_mode(): return {} diff --git a/providers/tests/openlineage/plugins/test_adapter.py b/providers/tests/openlineage/plugins/test_adapter.py index f0928dd70db0..73145f9e4b1c 100644 --- a/providers/tests/openlineage/plugins/test_adapter.py +++ b/providers/tests/openlineage/plugins/test_adapter.py @@ -606,6 +606,9 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat nominalStartTime=event_time.isoformat(), nominalEndTime=event_time.isoformat(), ), + "processing_engine": processing_engine_run.ProcessingEngineRunFacet( + version=ANY, name="Airflow", openlineageAdapterVersion=ANY + ), "airflowDagRun": AirflowDagRunFacet( dag=expected_dag_info, dagRun={ diff --git a/providers/tests/openlineage/plugins/test_utils.py b/providers/tests/openlineage/plugins/test_utils.py index 22b80120bb6a..e84fac118657 100644 --- a/providers/tests/openlineage/plugins/test_utils.py +++ b/providers/tests/openlineage/plugins/test_utils.py @@ -40,6 +40,7 @@ get_airflow_debug_facet, get_airflow_run_facet, get_fully_qualified_class_name, + get_processing_engine_facet, is_operator_disabled, ) from airflow.serialization.enums import DagAttributeTypes @@ -438,3 +439,19 @@ def test_serialize_timetable_2_8(): ], } } + + +@pytest.mark.parametrize( + ("airflow_version", "ol_version"), + [ + ("2.9.3", "1.12.2"), + ("2.10.1", "1.13.0"), + ("3.0.0", "1.14.0"), + ], +) +def test_get_processing_engine_facet(airflow_version, ol_version): + with patch("airflow.providers.openlineage.utils.utils.AIRFLOW_VERSION", airflow_version): + with patch("airflow.providers.openlineage.utils.utils.OPENLINEAGE_PROVIDER_VERSION", ol_version): + result = get_processing_engine_facet() + assert result["processing_engine"].version == airflow_version + assert result["processing_engine"].openlineageAdapterVersion == ol_version