From f5697d3cfd8fc367bcd2b69f0107916948a48a88 Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Sun, 21 Jan 2024 15:48:20 -0800 Subject: [PATCH 1/8] feat(airflow): Override datajob external_url --- docs/lineage/airflow.md | 1 + .../src/datahub_airflow_plugin/_config.py | 10 ++++++++++ .../client/airflow_generator.py | 20 ++++++++++++++----- .../datahub_listener.py | 4 ++++ .../datahub_plugin_v22.py | 4 ++++ .../lineage/_lineage_core.py | 3 ++- .../datahub_airflow_plugin/lineage/datahub.py | 2 +- 7 files changed, 37 insertions(+), 7 deletions(-) diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index da3a36bc87be5..94eb69a2ed827 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -135,6 +135,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default | capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. | | capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. | | capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. | +| datajob_url_link | taskinstance | If set to taskinstance, the datajob URL links to the task instance list page in Airflow. Set it to grid to link to the DAG's grid view instead. | | graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. | #### Validate that the plugin is working diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 67843da2ba995..9ba873121667c 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -1,3 +1,4 @@ +from enum import Enum from typing import TYPE_CHECKING, Optional import datahub.emitter.mce_builder as builder @@ -8,6 +9,11 @@ from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook +class DatajobUrl(Enum): + GRID = "grid" + TASKINSTANCE = "taskinstance" + + class DatahubLineageConfig(ConfigModel): # This class is shared between the lineage backend and the Airflow plugin. # The defaults listed here are only relevant for the lineage backend. @@ -41,6 +47,8 @@ class DatahubLineageConfig(ConfigModel): # The Airflow plugin behaves as if it were set to True. graceful_exceptions: bool = True + datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE + def make_emitter_hook(self) -> "DatahubGenericHook": # This is necessary to avoid issues with circular imports. 
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook @@ -65,6 +73,7 @@ def get_lineage_config() -> DatahubLineageConfig: disable_openlineage_plugin = conf.get( "datahub", "disable_openlineage_plugin", fallback=True ) + datajob_url_link = conf.get("datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value) return DatahubLineageConfig( enabled=enabled, @@ -77,4 +86,5 @@ def get_lineage_config() -> DatahubLineageConfig: log_level=log_level, debug_emitter=debug_emitter, disable_openlineage_plugin=disable_openlineage_plugin, + datajob_url_link=datajob_url_link, ) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index e1d53be7bae6b..aa908e16818c6 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast, Any from airflow.configuration import conf from datahub.api.entities.datajob import DataFlow, DataJob @@ -13,6 +13,7 @@ from datahub.utilities.urns.data_job_urn import DataJobUrn from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED +from datahub_airflow_plugin._config import DatahubLineageConfig, DatajobUrl assert AIRFLOW_PATCHED @@ -208,6 +209,7 @@ def generate_datajob( set_dependencies: bool = True, capture_owner: bool = True, capture_tags: bool = True, + **kwargs: Any, ) -> DataJob: """ @@ -267,7 +269,11 @@ def generate_datajob( datajob.properties = job_property_bag base_url = conf.get("webserver", "base_url") - datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}" + + if kwargs.get("config") and kwargs.get("config").datajob_url_link == DatajobUrl.GRID: + datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}" + else: + datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}" if capture_owner and dag.owner: datajob.owners.add(dag.owner) @@ -290,9 +296,10 @@ def create_datajob_instance( task: "Operator", dag: "DAG", data_job: Optional[DataJob] = None, + **kwargs: Any, ) -> DataProcessInstance: if data_job is None: - data_job = AirflowGenerator.generate_datajob(cluster, task=task, dag=dag) + data_job = AirflowGenerator.generate_datajob(cluster, task=task, dag=dag, **kwargs) dpi = DataProcessInstance.from_datajob( datajob=data_job, id=task.task_id, clone_inlets=True, clone_outlets=True ) @@ -407,9 +414,10 @@ def run_datajob( datajob: Optional[DataJob] = None, attempt: Optional[int] = None, emit_templates: bool = True, + **kwargs: Any, ) -> DataProcessInstance: if datajob is None: - datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag) + datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag, **kwargs) assert dag_run.run_id dpi = DataProcessInstance.from_datajob( @@ -480,6 +488,8 @@ def complete_datajob( end_timestamp_millis: Optional[int] = None, result: Optional[InstanceRunResult] = None, datajob: Optional[DataJob] = None, + **kwargs: Any, + ) -> DataProcessInstance: """ @@ -494,7 +504,7 @@ def complete_datajob( :return: 
DataProcessInstance """ if datajob is None: - datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag) + datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag, **kwargs) if end_timestamp_millis is None: if ti.end_date: diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index a7f588a166dde..b89504cf8b799 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -376,6 +376,7 @@ def on_task_instance_running( dag=dag, capture_tags=self.config.capture_tags_info, capture_owner=self.config.capture_ownership_info, + config=self.config, ) # TODO: Make use of get_task_location to extract github urls. @@ -397,6 +398,7 @@ def on_task_instance_running( dag_run=dagrun, datajob=datajob, emit_templates=False, + config=self.config ) logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}") @@ -419,6 +421,7 @@ def on_task_instance_finish( dag=dag, capture_tags=self.config.capture_tags_info, capture_owner=self.config.capture_ownership_info, + config=self.config, ) # Add lineage info. @@ -436,6 +439,7 @@ def on_task_instance_finish( dag_run=dagrun, datajob=datajob, result=status, + config=self.config, ) logger.debug( f"Emitted DataHub DataProcess Instance with status {status}: {dpi}" diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py index 51a4151bc8207..e9490e625e257 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py @@ -120,6 +120,7 @@ def datahub_task_status_callback(context, status): dag=dag, capture_tags=config.capture_tags_info, capture_owner=config.capture_ownership_info, + config=config, ) datajob.inlets.extend( entities_to_dataset_urn_list([let.urn for let in task_inlets]) @@ -143,6 +144,7 @@ def datahub_task_status_callback(context, status): dag_run=context["dag_run"], datajob=datajob, start_timestamp_millis=int(ti.start_date.timestamp() * 1000), + config=config, ) task.log.info(f"Emitted Start Datahub Dataprocess Instance: {dpi}") @@ -185,6 +187,7 @@ def datahub_pre_execution(context): dag=dag, capture_tags=config.capture_tags_info, capture_owner=config.capture_ownership_info, + config=config, ) datajob.inlets.extend( entities_to_dataset_urn_list([let.urn for let in task_inlets]) @@ -208,6 +211,7 @@ def datahub_pre_execution(context): dag_run=context["dag_run"], datajob=datajob, start_timestamp_millis=int(ti.start_date.timestamp() * 1000), + config=config ) task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}") diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py index 75fc79443e49e..c93753fad4a71 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import TYPE_CHECKING, Dict, List +from typing import TYPE_CHECKING, Dict, List, Any 
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult @@ -51,6 +51,7 @@ def send_lineage_to_datahub( dag=dag, capture_tags=config.capture_tags_info, capture_owner=config.capture_ownership_info, + config=config, ) datajob.inlets.extend(entities_to_dataset_urn_list([let.urn for let in inlets])) datajob.outlets.extend(entities_to_dataset_urn_list([let.urn for let in outlets])) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py index 3ebe7831d08f9..86ff39e5aa5e2 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py @@ -71,7 +71,7 @@ def send_lineage( try: context = context or {} # ensure not None to satisfy mypy send_lineage_to_datahub( - config, operator, operator.inlets, operator.outlets, context + config, operator, operator.inlets, operator.outlets, context, config=config ) except Exception as e: operator.log.error(e) From dfa87c9d0626ac38500462dcfa7f71eab86c36ae Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Thu, 25 Jan 2024 13:59:32 -0800 Subject: [PATCH 2/8] format --- .../src/datahub_airflow_plugin/_config.py | 4 +++- .../datahub_airflow_plugin/client/airflow_generator.py | 10 +++++++--- .../src/datahub_airflow_plugin/datahub_listener.py | 2 +- .../src/datahub_airflow_plugin/datahub_plugin_v22.py | 2 +- .../src/datahub_airflow_plugin/lineage/datahub.py | 7 ++++++- 5 files changed, 18 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index 9ba873121667c..48d462b85702a 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -73,7 +73,9 @@ def get_lineage_config() -> DatahubLineageConfig: disable_openlineage_plugin = conf.get( "datahub", "disable_openlineage_plugin", fallback=True ) - datajob_url_link = conf.get("datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value) + datajob_url_link = conf.get( + "datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value + ) return DatahubLineageConfig( enabled=enabled, diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index aa908e16818c6..a932d73a77a9c 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -270,7 +270,10 @@ def generate_datajob( datajob.properties = job_property_bag base_url = conf.get("webserver", "base_url") - if kwargs.get("config") and kwargs.get("config").datajob_url_link == DatajobUrl.GRID: + if ( + kwargs.get("config") + and kwargs.get("config").datajob_url_link == DatajobUrl.GRID + ): datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}" else: datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}" @@ -299,7 +302,9 @@ def create_datajob_instance( **kwargs: Any, ) -> DataProcessInstance: if data_job 
is None: - data_job = AirflowGenerator.generate_datajob(cluster, task=task, dag=dag, **kwargs) + data_job = AirflowGenerator.generate_datajob( + cluster, task=task, dag=dag, **kwargs + ) dpi = DataProcessInstance.from_datajob( datajob=data_job, id=task.task_id, clone_inlets=True, clone_outlets=True ) @@ -489,7 +494,6 @@ def complete_datajob( result: Optional[InstanceRunResult] = None, datajob: Optional[DataJob] = None, **kwargs: Any, - ) -> DataProcessInstance: """ diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index b89504cf8b799..475f3791bc0c8 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -398,7 +398,7 @@ def on_task_instance_running( dag_run=dagrun, datajob=datajob, emit_templates=False, - config=self.config + config=self.config, ) logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}") diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py index e9490e625e257..7b8d719712d10 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py @@ -211,7 +211,7 @@ def datahub_pre_execution(context): dag_run=context["dag_run"], datajob=datajob, start_timestamp_millis=int(ti.start_date.timestamp() * 1000), - config=config + config=config, ) task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}") diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py index 86ff39e5aa5e2..ce533e2d47afa 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py @@ -71,7 +71,12 @@ def send_lineage( try: context = context or {} # ensure not None to satisfy mypy send_lineage_to_datahub( - config, operator, operator.inlets, operator.outlets, context, config=config + config, + operator, + operator.inlets, + operator.outlets, + context, + config=config ) except Exception as e: operator.log.error(e) From e981ceb8bc8d9dea140ae56a18444c88ade5ed0c Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Thu, 25 Jan 2024 16:04:06 -0800 Subject: [PATCH 3/8] fix --- .../src/datahub_airflow_plugin/client/airflow_generator.py | 2 +- .../src/datahub_airflow_plugin/lineage/datahub.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index a932d73a77a9c..a14af1634f25a 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -13,7 +13,7 @@ from datahub.utilities.urns.data_job_urn import DataJobUrn from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED -from datahub_airflow_plugin._config import DatahubLineageConfig, 
DatajobUrl +from datahub_airflow_plugin._config import DatajobUrl assert AIRFLOW_PATCHED diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py index ce533e2d47afa..6f81812ea766e 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py @@ -76,7 +76,6 @@ def send_lineage( operator.inlets, operator.outlets, context, - config=config ) except Exception as e: operator.log.error(e) From 670e45b725ecc6fc7b46695a497a9c3ba0545a12 Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Thu, 25 Jan 2024 16:17:57 -0800 Subject: [PATCH 4/8] Update --- .../client/airflow_generator.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index a14af1634f25a..c697d42203fdf 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -13,7 +13,7 @@ from datahub.utilities.urns.data_job_urn import DataJobUrn from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED -from datahub_airflow_plugin._config import DatajobUrl +from datahub_airflow_plugin._config import DatajobUrl, DatahubLineageConfig assert AIRFLOW_PATCHED @@ -209,7 +209,7 @@ def generate_datajob( set_dependencies: bool = True, capture_owner: bool = True, capture_tags: bool = True, - **kwargs: Any, + config: DatahubLineageConfig = None, ) -> DataJob: """ @@ -219,6 +219,7 @@ def generate_datajob( :param set_dependencies: bool - whether to extract dependencies from airflow task :param capture_owner: bool - whether to extract owner from airflow task :param capture_tags: bool - whether to set tags automatically from airflow task + :param config: DatahubLineageConfig :return: DataJob - returns the generated DataJob object """ dataflow_urn = DataFlowUrn.create_from_ids( @@ -271,8 +272,7 @@ def generate_datajob( base_url = conf.get("webserver", "base_url") if ( - kwargs.get("config") - and kwargs.get("config").datajob_url_link == DatajobUrl.GRID + config and config.datajob_url_link == DatajobUrl.GRID ): datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}" else: @@ -299,11 +299,11 @@ def create_datajob_instance( task: "Operator", dag: "DAG", data_job: Optional[DataJob] = None, - **kwargs: Any, + config: DatahubLineageConfig = None, ) -> DataProcessInstance: if data_job is None: data_job = AirflowGenerator.generate_datajob( - cluster, task=task, dag=dag, **kwargs + cluster, task=task, dag=dag, config=config ) dpi = DataProcessInstance.from_datajob( datajob=data_job, id=task.task_id, clone_inlets=True, clone_outlets=True @@ -419,10 +419,9 @@ def run_datajob( datajob: Optional[DataJob] = None, attempt: Optional[int] = None, emit_templates: bool = True, - **kwargs: Any, ) -> DataProcessInstance: if datajob is None: - datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag, **kwargs) + datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag) assert dag_run.run_id dpi = DataProcessInstance.from_datajob( @@ -493,7 +492,6 @@ def complete_datajob( 
end_timestamp_millis: Optional[int] = None, result: Optional[InstanceRunResult] = None, datajob: Optional[DataJob] = None, - **kwargs: Any, ) -> DataProcessInstance: """ @@ -508,7 +506,7 @@ def complete_datajob( :return: DataProcessInstance """ if datajob is None: - datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag, **kwargs) + datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag) if end_timestamp_millis is None: if ti.end_date: From afd466eb5b91547460ed6526f9a765b748a4ccb0 Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Mon, 29 Jan 2024 15:59:33 -0800 Subject: [PATCH 5/8] format --- .../src/datahub_airflow_plugin/client/airflow_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index c697d42203fdf..e340a5594091e 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -272,7 +272,7 @@ def generate_datajob( base_url = conf.get("webserver", "base_url") if ( - config and config.datajob_url_link == DatajobUrl.GRID + config and config.datajob_url_link == DatajobUrl.GRID ): datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}" else: From 3478c25a42d4b99965f5cc2d024faf633749015b Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Thu, 1 Feb 2024 09:51:22 -0800 Subject: [PATCH 6/8] update --- .../src/datahub_airflow_plugin/client/airflow_generator.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index e340a5594091e..3c77607a10fe4 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -271,9 +271,7 @@ def generate_datajob( datajob.properties = job_property_bag base_url = conf.get("webserver", "base_url") - if ( - config and config.datajob_url_link == DatajobUrl.GRID - ): + if config and config.datajob_url_link == DatajobUrl.GRID: datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}" else: datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}" From 5083956d55451809a85b8aba37b62cb0c1da3b6f Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Mon, 5 Feb 2024 16:17:06 -0800 Subject: [PATCH 7/8] Format --- .../datahub_airflow_plugin/client/airflow_generator.py | 8 ++++---- .../src/datahub_airflow_plugin/lineage/_lineage_core.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index 3c77607a10fe4..eca8feb0a5c9f 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing 
import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast, Any +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast from airflow.configuration import conf from datahub.api.entities.datajob import DataFlow, DataJob @@ -13,7 +13,7 @@ from datahub.utilities.urns.data_job_urn import DataJobUrn from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED -from datahub_airflow_plugin._config import DatajobUrl, DatahubLineageConfig +from datahub_airflow_plugin._config import DatahubLineageConfig, DatajobUrl assert AIRFLOW_PATCHED @@ -209,7 +209,7 @@ def generate_datajob( set_dependencies: bool = True, capture_owner: bool = True, capture_tags: bool = True, - config: DatahubLineageConfig = None, + config: DatahubLineageConfig = DatajobUrl.TASKINSTANCE, ) -> DataJob: """ @@ -297,7 +297,7 @@ def create_datajob_instance( task: "Operator", dag: "DAG", data_job: Optional[DataJob] = None, - config: DatahubLineageConfig = None, + config: DatahubLineageConfig = DatajobUrl.TASKINSTANCE, ) -> DataProcessInstance: if data_job is None: data_job = AirflowGenerator.generate_datajob( diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py index c93753fad4a71..daf45e1cd83f8 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import TYPE_CHECKING, Dict, List, Any +from typing import TYPE_CHECKING, Dict, List from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult From 9861d615d4c3d6c0e03a27ec1f7b46afddfa8f8a Mon Sep 17 00:00:00 2001 From: Peng Gao Date: Tue, 6 Feb 2024 17:39:38 -0800 Subject: [PATCH 8/8] update --- .../client/airflow_generator.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py index eca8feb0a5c9f..2fa15f13e848b 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py @@ -209,7 +209,7 @@ def generate_datajob( set_dependencies: bool = True, capture_owner: bool = True, capture_tags: bool = True, - config: DatahubLineageConfig = DatajobUrl.TASKINSTANCE, + config: Optional[DatahubLineageConfig] = None, ) -> DataJob: """ @@ -297,7 +297,7 @@ def create_datajob_instance( task: "Operator", dag: "DAG", data_job: Optional[DataJob] = None, - config: DatahubLineageConfig = DatajobUrl.TASKINSTANCE, + config: Optional[DatahubLineageConfig] = None, ) -> DataProcessInstance: if data_job is None: data_job = AirflowGenerator.generate_datajob( @@ -417,9 +417,12 @@ def run_datajob( datajob: Optional[DataJob] = None, attempt: Optional[int] = None, emit_templates: bool = True, + config: Optional[DatahubLineageConfig] = None, ) -> DataProcessInstance: if datajob is None: - datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag) + datajob = AirflowGenerator.generate_datajob( + cluster, ti.task, dag, config=config + ) assert dag_run.run_id dpi = DataProcessInstance.from_datajob( @@ -490,6 +493,7 @@ def 
complete_datajob( end_timestamp_millis: Optional[int] = None, result: Optional[InstanceRunResult] = None, datajob: Optional[DataJob] = None, + config: Optional[DatahubLineageConfig] = None, ) -> DataProcessInstance: """ @@ -501,10 +505,13 @@ def complete_datajob( :param end_timestamp_millis: Optional[int] :param result: Optional[str] One of the result from datahub.metadata.schema_class.RunResultTypeClass :param datajob: Optional[DataJob] + :param config: Optional[DatahubLineageConfig] :return: DataProcessInstance """ if datajob is None: - datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag) + datajob = AirflowGenerator.generate_datajob( + cluster, ti.task, dag, config=config + ) if end_timestamp_millis is None: if ti.end_date:
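
---

Reviewer note (not part of the series): a minimal, self-contained sketch of the URL selection this series adds to `AirflowGenerator.generate_datajob`, as it stands after PATCH 8/8. `LineageConfigStub`, `build_datajob_url`, and the base URL / DAG / task ids below are hypothetical stand-ins so the example runs without Airflow or DataHub installed:

```python
from enum import Enum
from typing import Optional


class DatajobUrl(Enum):
    # Same values as the enum added to _config.py in PATCH 1/8.
    GRID = "grid"
    TASKINSTANCE = "taskinstance"


class LineageConfigStub:
    # Hypothetical stand-in for DatahubLineageConfig, modeling only the
    # datajob_url_link field introduced by this series.
    def __init__(self, datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE):
        self.datajob_url_link = datajob_url_link


def build_datajob_url(
    base_url: str,
    dag_id: str,
    task_id: str,
    config: Optional[LineageConfigStub] = None,
) -> str:
    # Mirrors the branch in generate_datajob: the grid view when
    # datajob_url_link == grid; otherwise (including when no config is
    # passed) the pre-existing task-instance list URL.
    if config and config.datajob_url_link == DatajobUrl.GRID:
        return f"{base_url}/dags/{dag_id}/grid?task_id={task_id}"
    return (
        f"{base_url}/taskinstance/list/"
        f"?flt1_dag_id_equals={dag_id}&_flt_3_task_id={task_id}"
    )


if __name__ == "__main__":
    base = "http://localhost:8080"  # hypothetical webserver base_url
    print(build_datajob_url(base, "example_dag", "example_task"))
    # http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=example_dag&_flt_3_task_id=example_task
    print(
        build_datajob_url(
            base, "example_dag", "example_task", LineageConfigStub(DatajobUrl.GRID)
        )
    )
    # http://localhost:8080/dags/example_dag/grid?task_id=example_task
```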
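Assuming standard Airflow config mechanics, opting in is then a one-line change: set `datajob_url_link = grid` in the `[datahub]` section of `airflow.cfg`, or export the equivalent `AIRFLOW__DATAHUB__DATAJOB_URL_LINK=grid` environment variable. `get_lineage_config` reads the raw string via `conf.get("datahub", "datajob_url_link", ...)`, and `DatahubLineageConfig` (a pydantic `ConfigModel`) should coerce that string into the `DatajobUrl` enum.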