feat(airflow) Override datajob external_url #9681

Merged · 9 commits · Feb 7, 2024

Changes from 2 commits
2 changes: 2 additions & 0 deletions docs/lineage/airflow.md
@@ -135,6 +135,8 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| datajob_url_link | taskinstance | Controls which Airflow page the DataJob's external URL points to: taskinstance links to the filtered task instance list, while grid links to the DAG's grid view (see the example below). |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
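For example, to make DataJob links open Airflow's grid view, set the option in the `[datahub]` section of `airflow.cfg` (a minimal sketch; the `conn_id` line mirrors the earlier example, and any option omitted here keeps its default from the table above):

```ini
[datahub]
conn_id = datahub_rest_default
datajob_url_link = grid
```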

#### Validate that the plugin is working
@@ -1,3 +1,4 @@
from enum import Enum
from typing import TYPE_CHECKING, Optional

import datahub.emitter.mce_builder as builder
@@ -8,6 +9,11 @@
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook


class DatajobUrl(Enum):
GRID = "grid"
TASKINSTANCE = "taskinstance"


class DatahubLineageConfig(ConfigModel):
# This class is shared between the lineage backend and the Airflow plugin.
# The defaults listed here are only relevant for the lineage backend.
@@ -41,6 +47,8 @@ class DatahubLineageConfig(ConfigModel):
# The Airflow plugin behaves as if it were set to True.
graceful_exceptions: bool = True

datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE

def make_emitter_hook(self) -> "DatahubGenericHook":
# This is necessary to avoid issues with circular imports.
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -65,6 +73,9 @@ def get_lineage_config() -> DatahubLineageConfig:
disable_openlineage_plugin = conf.get(
"datahub", "disable_openlineage_plugin", fallback=True
)
datajob_url_link = conf.get(
"datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
)

return DatahubLineageConfig(
enabled=enabled,
@@ -77,4 +88,5 @@ def get_lineage_config() -> DatahubLineageConfig:
log_level=log_level,
debug_emitter=debug_emitter,
disable_openlineage_plugin=disable_openlineage_plugin,
datajob_url_link=datajob_url_link,
)
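Because `DatahubLineageConfig` is built on `ConfigModel` (a pydantic model under the hood), the raw string returned by `conf.get` is coerced into the `DatajobUrl` enum by value. A minimal sketch of that coercion, with plain pydantic standing in for `ConfigModel` (`DemoConfig` is hypothetical):

```python
from enum import Enum

from pydantic import BaseModel


class DatajobUrl(Enum):
    GRID = "grid"
    TASKINSTANCE = "taskinstance"


class DemoConfig(BaseModel):
    # pydantic converts "grid"/"taskinstance" strings into DatajobUrl members
    datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE


cfg = DemoConfig(datajob_url_link="grid")
assert cfg.datajob_url_link is DatajobUrl.GRID
```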
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union, cast

from airflow.configuration import conf
from datahub.api.entities.datajob import DataFlow, DataJob
@@ -13,6 +13,7 @@
from datahub.utilities.urns.data_job_urn import DataJobUrn

from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED
from datahub_airflow_plugin._config import DatahubLineageConfig, DatajobUrl

assert AIRFLOW_PATCHED

@@ -208,6 +209,7 @@ def generate_datajob(
set_dependencies: bool = True,
capture_owner: bool = True,
capture_tags: bool = True,
**kwargs: Any,
) -> DataJob:
"""

@@ -267,7 +269,14 @@

datajob.properties = job_property_bag
base_url = conf.get("webserver", "base_url")
datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}"

config = kwargs.get("config")
if config and config.datajob_url_link == DatajobUrl.GRID:
    datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}"
else:
    datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}"

if capture_owner and dag.owner:
datajob.owners.add(dag.owner)
@@ -290,9 +299,12 @@ def create_datajob_instance(
task: "Operator",
dag: "DAG",
data_job: Optional[DataJob] = None,
**kwargs: Any,
) -> DataProcessInstance:
if data_job is None:
data_job = AirflowGenerator.generate_datajob(cluster, task=task, dag=dag)
data_job = AirflowGenerator.generate_datajob(
cluster, task=task, dag=dag, **kwargs
)
dpi = DataProcessInstance.from_datajob(
datajob=data_job, id=task.task_id, clone_inlets=True, clone_outlets=True
)
@@ -407,9 +419,10 @@ def run_datajob(
datajob: Optional[DataJob] = None,
attempt: Optional[int] = None,
emit_templates: bool = True,
**kwargs: Any,
) -> DataProcessInstance:
if datajob is None:
datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag)
datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag, **kwargs)

assert dag_run.run_id
dpi = DataProcessInstance.from_datajob(
@@ -480,6 +493,7 @@ def complete_datajob(
end_timestamp_millis: Optional[int] = None,
result: Optional[InstanceRunResult] = None,
datajob: Optional[DataJob] = None,
**kwargs: Any,
) -> DataProcessInstance:
"""

@@ -494,7 +508,7 @@
:return: DataProcessInstance
"""
if datajob is None:
datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag)
datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag, **kwargs)

if end_timestamp_millis is None:
if ti.end_date:
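The two branches above produce URLs of the following shapes; the `base_url`, DAG id, and task id values here are hypothetical, for illustration only:

```python
# Hypothetical values for illustration only.
base_url = "http://localhost:8080"
dag_id = "example_dag"
task_id = "example_task"

# DatajobUrl.GRID: deep-link into the DAG's grid view, focused on the task.
grid_url = f"{base_url}/dags/{dag_id}/grid?task_id={task_id}"

# DatajobUrl.TASKINSTANCE (default): the filtered task instance list.
taskinstance_url = (
    f"{base_url}/taskinstance/list/"
    f"?flt1_dag_id_equals={dag_id}&_flt_3_task_id={task_id}"
)

print(grid_url)  # http://localhost:8080/dags/example_dag/grid?task_id=example_task
print(taskinstance_url)
```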
@@ -376,6 +376,7 @@ def on_task_instance_running(
dag=dag,
capture_tags=self.config.capture_tags_info,
capture_owner=self.config.capture_ownership_info,
config=self.config,
)

# TODO: Make use of get_task_location to extract github urls.
@@ -397,6 +398,7 @@ def on_task_instance_running(
dag_run=dagrun,
datajob=datajob,
emit_templates=False,
config=self.config,
)
logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}")

@@ -419,6 +421,7 @@ def on_task_instance_finish(
dag=dag,
capture_tags=self.config.capture_tags_info,
capture_owner=self.config.capture_ownership_info,
config=self.config,
)

# Add lineage info.
@@ -436,6 +439,7 @@ def on_task_instance_finish(
dag_run=dagrun,
datajob=datajob,
result=status,
config=self.config,
)
logger.debug(
f"Emitted DataHub DataProcess Instance with status {status}: {dpi}"
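The listener passes `config=self.config` once per handler; `run_datajob` and `complete_datajob` forward their `**kwargs` into `generate_datajob`, which reads the option back out. A simplified, self-contained sketch of that plumbing (bodies trimmed to the relevant field; `LineageConfig` is a hypothetical stand-in for `DatahubLineageConfig`):

```python
from typing import Any, Optional


class LineageConfig:
    """Hypothetical stand-in with just the field we need."""

    def __init__(self, datajob_url_link: str = "taskinstance") -> None:
        self.datajob_url_link = datajob_url_link


def generate_datajob(cluster: str, task_id: str, **kwargs: Any) -> str:
    config: Optional[LineageConfig] = kwargs.get("config")
    if config and config.datajob_url_link == "grid":
        return f"/dags/demo_dag/grid?task_id={task_id}"
    return f"/taskinstance/list/?_flt_3_task_id={task_id}"


def run_datajob(cluster: str, task_id: str, **kwargs: Any) -> str:
    # Forwarding **kwargs means a new option reaches generate_datajob
    # without changing every intermediate signature.
    return generate_datajob(cluster, task_id, **kwargs)


print(run_datajob("prod", "extract", config=LineageConfig("grid")))
```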
@@ -120,6 +120,7 @@ def datahub_task_status_callback(context, status):
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
config=config,
)
datajob.inlets.extend(
entities_to_dataset_urn_list([let.urn for let in task_inlets])
@@ -143,6 +144,7 @@ def datahub_task_status_callback(context, status):
dag_run=context["dag_run"],
datajob=datajob,
start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
config=config,
)

task.log.info(f"Emitted Start Datahub Dataprocess Instance: {dpi}")
@@ -185,6 +187,7 @@ def datahub_pre_execution(context):
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
config=config,
)
datajob.inlets.extend(
entities_to_dataset_urn_list([let.urn for let in task_inlets])
@@ -208,6 +211,7 @@ def datahub_pre_execution(context):
dag_run=context["dag_run"],
datajob=datajob,
start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
config=config,
)

task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}")
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import TYPE_CHECKING, Dict, List
from typing import TYPE_CHECKING, Any, Dict, List

from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult

@@ -51,6 +51,7 @@ def send_lineage_to_datahub(
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
config=config,
)
datajob.inlets.extend(entities_to_dataset_urn_list([let.urn for let in inlets]))
datajob.outlets.extend(entities_to_dataset_urn_list([let.urn for let in outlets]))
@@ -71,7 +71,12 @@ def send_lineage(
try:
context = context or {} # ensure not None to satisfy mypy
send_lineage_to_datahub(
config, operator, operator.inlets, operator.outlets, context
config,
operator,
operator.inlets,
operator.outlets,
context,
)
except Exception as e:
operator.log.error(e)
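Note that `config` is already supplied positionally in this call, so repeating it as a keyword (`config=config`) would raise a `TypeError`: Python rejects two values for one parameter. A minimal illustration (`send` is a hypothetical stand-in):

```python
def send(config, operator):
    print(config, operator)


send("cfg", "op")  # fine

try:
    send("cfg", "op", config="cfg")  # duplicate value for 'config'
except TypeError as e:
    print(e)  # send() got multiple values for argument 'config'
```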