feat(airflow) Override datajob external_url (#9681)
Co-authored-by: Peng Gao <[email protected]>
gp1105739 and Peng Gao authored Feb 7, 2024
1 parent e6d7066 commit ea0ae8c
Showing 7 changed files with 49 additions and 5 deletions.
2 changes: 2 additions & 0 deletions docs/lineage/airflow.md
@@ -135,6 +135,8 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| datajob_url_link | taskinstance | If set to taskinstance, the DataJob URL links to the task instance list in Airflow. Set it to grid to link to Airflow's grid view instead. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
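
For example, to make DataJob links open Airflow's grid view instead of the task instance list, set the option in the `[datahub]` section of `airflow.cfg`. A minimal sketch (the section and key names are taken from the plugin's `conf.get("datahub", "datajob_url_link", ...)` call in this commit):

```ini
[datahub]
# DataHub connection used by the plugin
conn_id = datahub_rest_default
# Link emitted DataJobs to the grid view instead of the task instance list
datajob_url_link = grid
```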

#### Validate that the plugin is working
datahub_airflow_plugin/_config.py
@@ -1,3 +1,4 @@
from enum import Enum
from typing import TYPE_CHECKING, Optional

import datahub.emitter.mce_builder as builder
@@ -8,6 +9,11 @@
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook


class DatajobUrl(Enum):
GRID = "grid"
TASKINSTANCE = "taskinstance"


class DatahubLineageConfig(ConfigModel):
# This class is shared between the lineage backend and the Airflow plugin.
# The defaults listed here are only relevant for the lineage backend.
@@ -41,6 +47,8 @@ class DatahubLineageConfig(ConfigModel):
# The Airflow plugin behaves as if it were set to True.
graceful_exceptions: bool = True

datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

def make_emitter_hook(self) -> "DatahubGenericHook":
# This is necessary to avoid issues with circular imports.
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -65,6 +73,9 @@ def get_lineage_config() -> DatahubLineageConfig:
disable_openlineage_plugin = conf.get(
"datahub", "disable_openlineage_plugin", fallback=True
)
datajob_url_link = conf.get(
"datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
)

return DatahubLineageConfig(
enabled=enabled,
@@ -77,4 +88,5 @@ def get_lineage_config() -> DatahubLineageConfig:
log_level=log_level,
debug_emitter=debug_emitter,
disable_openlineage_plugin=disable_openlineage_plugin,
datajob_url_link=datajob_url_link,
)
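
A quick sketch of how the setting flows through `get_lineage_config` — runnable only inside an Airflow environment with the plugin installed and `datajob_url_link = grid` configured; it also assumes the pydantic-based `ConfigModel` coerces the raw string into the enum by value:

```python
from datahub_airflow_plugin._config import DatajobUrl, get_lineage_config

config = get_lineage_config()
# conf.get(...) returns the raw string "grid"; DatahubLineageConfig
# validates it into the DatajobUrl enum.
assert config.datajob_url_link == DatajobUrl.GRID
```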
@@ -13,6 +13,7 @@
from datahub.utilities.urns.data_job_urn import DataJobUrn

from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED
from datahub_airflow_plugin._config import DatahubLineageConfig, DatajobUrl

assert AIRFLOW_PATCHED

@@ -208,6 +209,7 @@ def generate_datajob(
set_dependencies: bool = True,
capture_owner: bool = True,
capture_tags: bool = True,
config: Optional[DatahubLineageConfig] = None,
) -> DataJob:
"""
@@ -217,6 +219,7 @@
:param set_dependencies: bool - whether to extract dependencies from airflow task
:param capture_owner: bool - whether to extract owner from airflow task
:param capture_tags: bool - whether to set tags automatically from airflow task
:param config: DatahubLineageConfig
:return: DataJob - returns the generated DataJob object
"""
dataflow_urn = DataFlowUrn.create_from_ids(
@@ -267,7 +270,11 @@

datajob.properties = job_property_bag
base_url = conf.get("webserver", "base_url")
-datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}"
+
+if config and config.datajob_url_link == DatajobUrl.GRID:
+    datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}"
+else:
+    datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}"

if capture_owner and dag.owner:
datajob.owners.add(dag.owner)
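
Concretely, for a hypothetical deployment with `base_url = http://localhost:8080`, a DAG `example_dag`, and a task `example_task`, the two settings produce:

```
# datajob_url_link = taskinstance (default)
http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=example_dag&_flt_3_task_id=example_task

# datajob_url_link = grid
http://localhost:8080/dags/example_dag/grid?task_id=example_task
```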
@@ -290,9 +297,12 @@ def create_datajob_instance(
task: "Operator",
dag: "DAG",
data_job: Optional[DataJob] = None,
config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
if data_job is None:
-data_job = AirflowGenerator.generate_datajob(cluster, task=task, dag=dag)
+data_job = AirflowGenerator.generate_datajob(
+    cluster, task=task, dag=dag, config=config
+)
dpi = DataProcessInstance.from_datajob(
datajob=data_job, id=task.task_id, clone_inlets=True, clone_outlets=True
)
@@ -407,9 +417,12 @@ def run_datajob(
datajob: Optional[DataJob] = None,
attempt: Optional[int] = None,
emit_templates: bool = True,
config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
if datajob is None:
-datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag)
+datajob = AirflowGenerator.generate_datajob(
+    cluster, ti.task, dag, config=config
+)

assert dag_run.run_id
dpi = DataProcessInstance.from_datajob(
@@ -480,6 +493,7 @@ def complete_datajob(
end_timestamp_millis: Optional[int] = None,
result: Optional[InstanceRunResult] = None,
datajob: Optional[DataJob] = None,
config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
"""
@@ -491,10 +505,13 @@
:param end_timestamp_millis: Optional[int]
:param result: Optional[str] One of the result types from datahub.metadata.schema_class.RunResultTypeClass
:param datajob: Optional[DataJob]
:param config: Optional[DatahubLineageConfig]
:return: DataProcessInstance
"""
if datajob is None:
-datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag)
+datajob = AirflowGenerator.generate_datajob(
+    cluster, ti.task, dag, config=config
+)

if end_timestamp_millis is None:
if ti.end_date:
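A minimal sketch of the generator entry point after this change, using hypothetical `task` and `dag` objects and assuming `AirflowGenerator` is imported from the plugin's client module (in practice these calls are made by the plugin's listener and callbacks, shown below):

```python
from datahub_airflow_plugin._config import get_lineage_config

config = get_lineage_config()
datajob = AirflowGenerator.generate_datajob(
    cluster="prod",
    task=task,
    dag=dag,
    capture_owner=config.capture_ownership_info,
    capture_tags=config.capture_tags_info,
    config=config,  # new: generate_datajob uses this to pick the URL style
)
# With datajob_url_link = grid, datajob.url points at the grid view.
```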
@@ -376,6 +376,7 @@ def on_task_instance_running(
dag=dag,
capture_tags=self.config.capture_tags_info,
capture_owner=self.config.capture_ownership_info,
config=self.config,
)

# TODO: Make use of get_task_location to extract github urls.
@@ -397,6 +398,7 @@ def on_task_instance_running(
dag_run=dagrun,
datajob=datajob,
emit_templates=False,
config=self.config,
)
logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}")

@@ -419,6 +421,7 @@
dag=dag,
capture_tags=self.config.capture_tags_info,
capture_owner=self.config.capture_ownership_info,
config=self.config,
)

# Add lineage info.
@@ -436,6 +439,7 @@
dag_run=dagrun,
datajob=datajob,
result=status,
config=self.config,
)
logger.debug(
f"Emitted DataHub DataProcess Instance with status {status}: {dpi}"
@@ -120,6 +120,7 @@ def datahub_task_status_callback(context, status):
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
config=config,
)
datajob.inlets.extend(
entities_to_dataset_urn_list([let.urn for let in task_inlets])
@@ -143,6 +144,7 @@ def datahub_task_status_callback(context, status):
dag_run=context["dag_run"],
datajob=datajob,
start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
config=config,
)

task.log.info(f"Emitted Start Datahub Dataprocess Instance: {dpi}")
@@ -185,6 +187,7 @@ def datahub_pre_execution(context):
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
config=config,
)
datajob.inlets.extend(
entities_to_dataset_urn_list([let.urn for let in task_inlets])
@@ -208,6 +211,7 @@ def datahub_pre_execution(context):
dag_run=context["dag_run"],
datajob=datajob,
start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
config=config,
)

task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}")
@@ -51,6 +51,7 @@ def send_lineage_to_datahub(
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
config=config,
)
datajob.inlets.extend(entities_to_dataset_urn_list([let.urn for let in inlets]))
datajob.outlets.extend(entities_to_dataset_urn_list([let.urn for let in outlets]))
@@ -71,7 +71,11 @@ def send_lineage(
try:
context = context or {} # ensure not None to satisfy mypy
send_lineage_to_datahub(
-config, operator, operator.inlets, operator.outlets, context
+config,
+operator,
+operator.inlets,
+operator.outlets,
+context,
)
except Exception as e:
operator.log.error(e)
