feat(airflow) Override datajob external_url #9681

Merged: 9 commits, Feb 7, 2024
docs/lineage/airflow.md (2 additions, 0 deletions)

@@ -135,6 +135,8 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| datajob_url_link | taskinstance | If `taskinstance`, the DataJob URL links to the task-instance list page in Airflow; if `grid`, it links to the DAG's grid view instead. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
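
For example, to make DataJob links open the grid view, the `[datahub]` section of `airflow.cfg` might look like this (a sketch; the `conn_id` line is taken from the docs above, all other keys omitted):

```ini
[datahub]
conn_id = datahub_rest_default # or datahub_kafka_default
datajob_url_link = grid
```

Omitting `datajob_url_link` (or setting it to `taskinstance`) keeps the default task-instance list links.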

#### Validate that the plugin is working
---

datahub_airflow_plugin/_config.py

@@ -1,3 +1,4 @@
from enum import Enum
from typing import TYPE_CHECKING, Optional

import datahub.emitter.mce_builder as builder
@@ -8,6 +9,11 @@
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook


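# Possible targets for the DataJob external URL that gets emitted to DataHub.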
class DatajobUrl(Enum):
GRID = "grid"
TASKINSTANCE = "taskinstance"


class DatahubLineageConfig(ConfigModel):
# This class is shared between the lineage backend and the Airflow plugin.
# The defaults listed here are only relevant for the lineage backend.
@@ -41,6 +47,8 @@ class DatahubLineageConfig(ConfigModel):
# The Airflow plugin behaves as if it were set to True.
graceful_exceptions: bool = True

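# Which Airflow page the DataJob's external URL points at: the
# task-instance list (default) or the DAG's grid view.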
datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

def make_emitter_hook(self) -> "DatahubGenericHook":
# This is necessary to avoid issues with circular imports.
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -65,6 +73,9 @@ def get_lineage_config() -> DatahubLineageConfig:
disable_openlineage_plugin = conf.get(
"datahub", "disable_openlineage_plugin", fallback=True
)
datajob_url_link = conf.get(
"datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
)

return DatahubLineageConfig(
enabled=enabled,
@@ -77,4 +88,5 @@ def get_lineage_config() -> DatahubLineageConfig:
log_level=log_level,
debug_emitter=debug_emitter,
disable_openlineage_plugin=disable_openlineage_plugin,
datajob_url_link=datajob_url_link,
)
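
Since `ConfigModel` is pydantic-based, the plain string read by `conf.get` is coerced into the `DatajobUrl` enum when `DatahubLineageConfig` is constructed. A minimal sketch of that coercion, with plain pydantic standing in for `ConfigModel` (the `LineageConfigSketch` name is illustrative):

```python
from enum import Enum

from pydantic import BaseModel


class DatajobUrl(Enum):
    GRID = "grid"
    TASKINSTANCE = "taskinstance"


class LineageConfigSketch(BaseModel):
    # Mirrors the new field on DatahubLineageConfig.
    datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE


# The fallback string from conf.get(...) round-trips into the enum member.
cfg = LineageConfigSketch(datajob_url_link=DatajobUrl.TASKINSTANCE.value)
assert cfg.datajob_url_link is DatajobUrl.TASKINSTANCE

# A user-supplied "grid" selects the grid view.
assert LineageConfigSketch(datajob_url_link="grid").datajob_url_link is DatajobUrl.GRID
```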
---

@@ -13,6 +13,7 @@
from datahub.utilities.urns.data_job_urn import DataJobUrn

from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED
from datahub_airflow_plugin._config import DatahubLineageConfig, DatajobUrl

assert AIRFLOW_PATCHED

@@ -208,6 +209,7 @@ def generate_datajob(
set_dependencies: bool = True,
capture_owner: bool = True,
capture_tags: bool = True,
config: Optional[DatahubLineageConfig] = None,
) -> DataJob:
"""
@@ -217,6 +219,7 @@
:param set_dependencies: bool - whether to extract dependencies from airflow task
:param capture_owner: bool - whether to extract owner from airflow task
:param capture_tags: bool - whether to set tags automatically from airflow task
:param config: Optional[DatahubLineageConfig] - config used to select the DataJob URL target (task-instance list or grid view)
:return: DataJob - returns the generated DataJob object
"""
dataflow_urn = DataFlowUrn.create_from_ids(
@@ -267,7 +270,11 @@

datajob.properties = job_property_bag
base_url = conf.get("webserver", "base_url")
datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}"

if config and config.datajob_url_link == DatajobUrl.GRID:
datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}"
else:
datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}"

if capture_owner and dag.owner:
datajob.owners.add(dag.owner)
@@ -290,9 +297,12 @@ def create_datajob_instance(
task: "Operator",
dag: "DAG",
data_job: Optional[DataJob] = None,
config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
if data_job is None:
data_job = AirflowGenerator.generate_datajob(cluster, task=task, dag=dag)
data_job = AirflowGenerator.generate_datajob(
cluster, task=task, dag=dag, config=config
)
dpi = DataProcessInstance.from_datajob(
datajob=data_job, id=task.task_id, clone_inlets=True, clone_outlets=True
)
@@ -407,9 +417,12 @@ def run_datajob(
datajob: Optional[DataJob] = None,
attempt: Optional[int] = None,
emit_templates: bool = True,
config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
if datajob is None:
datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag)
datajob = AirflowGenerator.generate_datajob(
cluster, ti.task, dag, config=config
)

assert dag_run.run_id
dpi = DataProcessInstance.from_datajob(
@@ -480,6 +493,7 @@ def complete_datajob(
end_timestamp_millis: Optional[int] = None,
result: Optional[InstanceRunResult] = None,
datajob: Optional[DataJob] = None,
config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
"""
@@ -491,10 +505,13 @@
:param end_timestamp_millis: Optional[int]
:param result: Optional[str] One of the result from datahub.metadata.schema_class.RunResultTypeClass
:param datajob: Optional[DataJob]
:param config: Optional[DatahubLineageConfig] - passed through to generate_datajob when the datajob must be created
:return: DataProcessInstance
"""
if datajob is None:
datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag)
datajob = AirflowGenerator.generate_datajob(
cluster, ti.task, dag, config=config
)

if end_timestamp_millis is None:
if ti.end_date:
---

@@ -376,6 +376,7 @@ def on_task_instance_running(
dag=dag,
capture_tags=self.config.capture_tags_info,
capture_owner=self.config.capture_ownership_info,
config=self.config,
)

# TODO: Make use of get_task_location to extract github urls.
@@ -397,6 +398,7 @@
dag_run=dagrun,
datajob=datajob,
emit_templates=False,
config=self.config,
)
logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}")

@@ -419,6 +421,7 @@ def on_task_instance_finish(
dag=dag,
capture_tags=self.config.capture_tags_info,
capture_owner=self.config.capture_ownership_info,
config=self.config,
)

# Add lineage info.
@@ -436,6 +439,7 @@
dag_run=dagrun,
datajob=datajob,
result=status,
config=self.config,
)
logger.debug(
f"Emitted DataHub DataProcess Instance with status {status}: {dpi}"
---

@@ -120,6 +120,7 @@ def datahub_task_status_callback(context, status):
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
config=config,
)
datajob.inlets.extend(
entities_to_dataset_urn_list([let.urn for let in task_inlets])
@@ -143,6 +144,7 @@ def datahub_task_status_callback(context, status):
dag_run=context["dag_run"],
datajob=datajob,
start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
config=config,
)

task.log.info(f"Emitted Start Datahub Dataprocess Instance: {dpi}")
@@ -185,6 +187,7 @@ def datahub_pre_execution(context):
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
config=config,
)
datajob.inlets.extend(
entities_to_dataset_urn_list([let.urn for let in task_inlets])
@@ -208,6 +211,7 @@ def datahub_pre_execution(context):
dag_run=context["dag_run"],
datajob=datajob,
start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
config=config,
)

task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}")
---

@@ -51,6 +51,7 @@ def send_lineage_to_datahub(
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
config=config,
)
datajob.inlets.extend(entities_to_dataset_urn_list([let.urn for let in inlets]))
datajob.outlets.extend(entities_to_dataset_urn_list([let.urn for let in outlets]))
---

@@ -71,7 +71,11 @@ def send_lineage(
try:
context = context or {} # ensure not None to satisfy mypy
send_lineage_to_datahub(
config, operator, operator.inlets, operator.outlets, context
config,
operator,
operator.inlets,
operator.outlets,
context,
)
except Exception as e:
operator.log.error(e)