Skip to content

Commit

Permalink
add ProcessingEngineRunFacet to OL DAG Start event (#43213)
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <[email protected]>
  • Loading branch information
mobuchowski authored Nov 14, 2024
1 parent 7728139 commit f60886c
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 13 deletions.
14 changes: 3 additions & 11 deletions providers/src/airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
nominal_time_run,
ownership_job,
parent_run,
processing_engine_run,
source_code_location_job,
)
from openlineage.client.uuid import generate_static_uuid
Expand All @@ -42,6 +41,7 @@
OpenLineageRedactor,
get_airflow_debug_facet,
get_airflow_state_run_facet,
get_processing_engine_facet,
)
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -195,18 +195,10 @@ def start_task(
:param task: metadata container with information extracted from operator
:param run_facets: custom run facets
"""
from airflow.version import version as AIRFLOW_VERSION

processing_engine_version_facet = processing_engine_run.ProcessingEngineRunFacet(
version=AIRFLOW_VERSION,
name="Airflow",
openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,
)

run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}
run_facets["processing_engine"] = processing_engine_version_facet # type: ignore
run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore
event = RunEvent(
eventType=RunState.START,
eventTime=event_time,
Expand Down Expand Up @@ -362,7 +354,7 @@ def dag_started(
job_name=dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets={**run_facets, **get_airflow_debug_facet()},
run_facets={**run_facets, **get_airflow_debug_facet(), **get_processing_engine_facet()},
),
inputs=[],
outputs=[],
Expand Down
16 changes: 14 additions & 2 deletions providers/src/airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
# TODO: move this maybe to Airflow's logic?
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
from airflow.providers.common.compat.assets import Asset
from airflow.providers.openlineage import conf
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
AirflowDebugRunFacet,
Expand All @@ -65,7 +65,7 @@

if TYPE_CHECKING:
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
from openlineage.client.facet_v2 import RunFacet
from openlineage.client.facet_v2 import RunFacet, processing_engine_run

from airflow.models import TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState
Expand Down Expand Up @@ -428,6 +428,18 @@ def _get_all_packages_installed() -> dict[str, str]:
return {dist.metadata["Name"]: dist.version for dist in metadata.distributions()}


def get_processing_engine_facet() -> dict[str, processing_engine_run.ProcessingEngineRunFacet]:
from openlineage.client.facet_v2 import processing_engine_run

return {
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=AIRFLOW_VERSION,
name="Airflow",
openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,
)
}


def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:
if not conf.debug_mode():
return {}
Expand Down
3 changes: 3 additions & 0 deletions providers/tests/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,9 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat
nominalStartTime=event_time.isoformat(),
nominalEndTime=event_time.isoformat(),
),
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
),
"airflowDagRun": AirflowDagRunFacet(
dag=expected_dag_info,
dagRun={
Expand Down
17 changes: 17 additions & 0 deletions providers/tests/openlineage/plugins/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
get_airflow_debug_facet,
get_airflow_run_facet,
get_fully_qualified_class_name,
get_processing_engine_facet,
is_operator_disabled,
)
from airflow.serialization.enums import DagAttributeTypes
Expand Down Expand Up @@ -438,3 +439,19 @@ def test_serialize_timetable_2_8():
],
}
}


@pytest.mark.parametrize(
("airflow_version", "ol_version"),
[
("2.9.3", "1.12.2"),
("2.10.1", "1.13.0"),
("3.0.0", "1.14.0"),
],
)
def test_get_processing_engine_facet(airflow_version, ol_version):
with patch("airflow.providers.openlineage.utils.utils.AIRFLOW_VERSION", airflow_version):
with patch("airflow.providers.openlineage.utils.utils.OPENLINEAGE_PROVIDER_VERSION", ol_version):
result = get_processing_engine_facet()
assert result["processing_engine"].version == airflow_version
assert result["processing_engine"].openlineageAdapterVersion == ol_version

0 comments on commit f60886c

Please sign in to comment.