From 64635003132bec18955df68e3b29e92bbd8b3df7 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 27 Sep 2023 16:57:33 -0700
Subject: [PATCH] fix mypy issues

---
 .../airflow-plugin/setup.py                   |   7 +-
 .../client/airflow_generator.py               |  20 +-
 .../datahub_airflow_plugin/hooks/datahub.py   |  52 ++-
 .../integration/goldens/v2_basic_iolets.json  |   2 +-
 .../integration/goldens/v2_simple_dag.json    | 387 ------------------
 .../api/entities/corpgroup/corpgroup.py       |  33 +-
 .../datahub/api/entities/corpuser/corpuser.py |   9 +-
 .../dataprocess/dataprocess_instance.py       |  21 +-
 .../api/entities/dataproduct/dataproduct.py   |  10 +-
 .../emitter/synchronized_file_emitter.py      |   8 +-
 .../src/datahub/ingestion/graph/client.py     |   4 +-
 11 files changed, 80 insertions(+), 473 deletions(-)

diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py
index 221b956ac6be3..4db70f1efb6f8 100644
--- a/metadata-ingestion-modules/airflow-plugin/setup.py
+++ b/metadata-ingestion-modules/airflow-plugin/setup.py
@@ -1,5 +1,6 @@
 import os
 import pathlib
+from typing import Dict, Set
 
 import setuptools
 
@@ -29,7 +30,7 @@ def get_long_description():
     *rest_common,
 }
 
-plugins = {
+plugins: Dict[str, Set[str]] = {
     "datahub-rest": {
         f"acryl-datahub[datahub-rest]{_self_pin}",
     },
@@ -48,7 +49,7 @@ def get_long_description():
 }
 
 # Include datahub-rest in the base requirements.
-base_requirements |= plugins["datahub-rest"]
+base_requirements.update(plugins["datahub-rest"])
 
 
 mypy_stubs = {
@@ -158,7 +159,7 @@ def get_long_description():
     # Dependencies.
     install_requires=list(base_requirements),
     extras_require={
-        **{plugin: list(dependencies) for (plugin, dependencies) in plugins.items()},
+        **{plugin: list(dependencies) for plugin, dependencies in plugins.items()},
         "dev": list(dev_requirements),
         "integration-tests": list(integration_test_requirements),
     },
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
index 0e12a980d1d73..c32ed4a8dc3f5 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import TYPE_CHECKING, Dict, List, Optional, Set, cast
+from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast
 
 from airflow.configuration import conf
 from datahub.api.entities.datajob import DataFlow, DataJob
@@ -227,7 +227,7 @@ def generate_datajob(
 
         job_property_bag: Dict[str, str] = {}
 
-        allowed_task_keys = [
+        allowed_task_keys: List[Union[str, Tuple[str, ...]]] = [
            "_task_type",
            "_task_module",
            "depends_on_past",
@@ -247,13 +247,17 @@
         ]
 
         for key in allowed_task_keys:
-            old_key = None
             if isinstance(key, tuple):
-                key, old_key = key
-            if hasattr(task, key):
-                job_property_bag[key] = repr(getattr(task, key))
-            elif old_key is not None and hasattr(task, old_key):
-                job_property_bag[key] = repr(getattr(task, old_key))
+                out_key: str = key[0]
+                try_keys = key
+            else:
+                out_key = key
+                try_keys = (key,)
+
+            for k in try_keys:
+                if hasattr(task, k):
+                    job_property_bag[out_key] = repr(getattr(task, k))
+                    break
 
         datajob.properties = job_property_bag
         base_url = conf.get("webserver", "base_url")
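A quick illustration (not part of this commit) of the lookup behavior the hunk above implements: for a tuple entry, the first element is the output property name and every element is tried as an attribute name in order, keeping the first match. `FakeTask` and the tuple entry are hypothetical stand-ins:

    from typing import Dict, List, Tuple, Union

    class FakeTask:
        # Hypothetical stand-in for an Airflow operator; newer Airflow
        # exposes `task_type`, older versions used `_task_type`.
        task_type = "BashOperator"

    allowed_task_keys: List[Union[str, Tuple[str, ...]]] = [
        ("task_type", "_task_type"),  # output name first, fallbacks after
        "depends_on_past",
    ]

    task = FakeTask()
    job_property_bag: Dict[str, str] = {}
    for key in allowed_task_keys:
        if isinstance(key, tuple):
            out_key, try_keys = key[0], key
        else:
            out_key, try_keys = key, (key,)
        for k in try_keys:
            if hasattr(task, k):
                # repr() matches how the plugin serializes property values.
                job_property_bag[out_key] = repr(getattr(task, k))
                break

    assert job_property_bag == {"task_type": "'BashOperator'"}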
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
index 6c40ca0f2e4f7..9604931795ccb 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
@@ -192,6 +192,39 @@ def callback(exc, msg):
     emit_mcps = emit
 
 
+class SynchronizedFileHook(BaseHook):
+    conn_type = "datahub-file"
+
+    def __init__(self, datahub_conn_id: str) -> None:
+        super().__init__()
+        self.datahub_conn_id = datahub_conn_id
+
+    def make_emitter(self) -> "SynchronizedFileEmitter":
+        from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter
+
+        conn = self.get_connection(self.datahub_conn_id)
+        filename = conn.host
+        if not filename:
+            raise AirflowException("filename parameter is required")
+
+        return SynchronizedFileEmitter(filename=filename)
+
+    def emit(
+        self,
+        items: Sequence[
+            Union[
+                MetadataChangeEvent,
+                MetadataChangeProposal,
+                MetadataChangeProposalWrapper,
+            ]
+        ],
+    ) -> None:
+        emitter = self.make_emitter()
+
+        for item in items:
+            emitter.emit(item)
+
+
 class DatahubGenericHook(BaseHook):
     """
     Emits Metadata Change Events using either the DatahubRestHook or the
@@ -205,7 +238,9 @@ def __init__(self, datahub_conn_id: str) -> None:
         super().__init__()
         self.datahub_conn_id = datahub_conn_id
 
-    def get_underlying_hook(self) -> Union[DatahubRestHook, DatahubKafkaHook]:
+    def get_underlying_hook(
+        self,
+    ) -> Union[DatahubRestHook, DatahubKafkaHook, SynchronizedFileHook]:
         conn = self.get_connection(self.datahub_conn_id)
 
         # We need to figure out the underlying hook type. First check the
@@ -251,18 +286,3 @@ def emit(
 
     # Retained for backwards compatibility.
     emit_mces = emit
-
-
-class SynchronizedFileHook(BaseHook):
-    conn_type = "datahub-file"
-
-    def __init__(self, datahub_conn_id: str) -> None:
-        super().__init__()
-        self.datahub_conn_id = datahub_conn_id
-
-    def make_emitter(self) -> "SynchronizedFileEmitter":
-        from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter
-
-        conn = self.get_connection(self.datahub_conn_id)
-
-        return SynchronizedFileEmitter(filename=conn.host)
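The move above also puts `SynchronizedFileHook` before `DatahubGenericHook`, so the name is already defined when it appears in `get_underlying_hook`'s return annotation, and `make_emitter` now rejects a missing filename instead of passing `None` through. A hedged usage sketch — it assumes an Airflow connection of type `datahub-file` whose host field holds the output path; `"datahub_file_default"` is a hypothetical connection id:

    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    from datahub.metadata.schema_classes import StatusClass
    from datahub_airflow_plugin.hooks.datahub import SynchronizedFileHook

    hook = SynchronizedFileHook(datahub_conn_id="datahub_file_default")
    # emit() accepts a sequence of MCEs, MCPs, or MCP wrappers.
    hook.emit(
        [
            MetadataChangeProposalWrapper(
                entityUrn="urn:li:corpuser:jdoe",
                aspect=StatusClass(removed=False),
            )
        ]
    )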
[{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" + "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_log\": \"<>\", 
\"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" }, "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task", "name": "run_data_task", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json index 26ad51a251b13..e3b49688ac033 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json @@ -55,54 +55,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", - "changeType": "UPSERT", - "aspectName": "dataJobInfo", - "aspect": { - "json": { - "customProperties": { - "depends_on_past": "False", - "email": "None", - "label": "'task_1'", - "execution_timeout": "None", - "sla": "None", - "task_id": "'task_1'", - "trigger_rule": "", - "wait_for_downstream": "False", - "downstream_task_ids": "{'run_another_data_task'}", - "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]", - "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None)]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 1'\", \"dag\": \"<>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"task_id\": \"task_1\"}, 
\"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 1'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [\"run_another_data_task\"], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"task_1\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}" - }, - "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1", - "name": "task_1", - "type": { - "string": "COMMAND" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" - ], - "outputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)" - ], - "inputDatajobs": [], - "fineGrainedLineages": [] - } - } -}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", @@ -136,163 +88,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", - "changeType": "UPSERT", - "aspectName": "ownership", - "aspect": { - "json": { - "owners": [ - { - "owner": "urn:li:corpuser:airflow", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:airflow" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", - "changeType": "UPSERT", - "aspectName": "globalTags", - "aspect": { - "json": { - "tags": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceProperties", - "aspect": { - "json": { - "customProperties": { - "run_id": "manual_run_test", - "duration": "None", - "start_date": "2023-09-27 22:40:45.392053+00:00", - "end_date": "None", - "execution_date": "2023-09-27 21:34:38+00:00", - "try_number": "0", - "hostname": "aluminum-b.local", - "max_tries": "0", - "external_executor_id": "None", - "pid": "73891", - "state": "running", - "operator": "BashOperator", - "priority_weight": "2", - "unixname": "hsheth", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1" - }, - "externalUrl": 
"http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", - "name": "simple_dag_task_1_manual_run_test", - "type": "BATCH_AD_HOC", - "created": { - "time": 1695854445392, - "actor": "urn:li:corpuser:datahub" - } - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRelationships", - "aspect": { - "json": { - "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", - "upstreamInstances": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", - "aspect": { - "json": { - "inputs": [ - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)" - ] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceOutput", - "aspect": { - "json": { - "outputs": [ - "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)" - ] - } - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableD,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1695854445392, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "status": "STARTED", - "attempt": 1 - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", @@ -444,26 +239,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:fdbbbcd638bc0e91bbf8d7775efbecaf", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1695854445796, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "status": "COMPLETE", - "result": { - "type": "SUCCESS", - "nativeResultType": "airflow" - } - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", @@ -542,167 +317,5 @@ "tags": [] } } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceProperties", - "aspect": { - "json": { - "customProperties": { - "run_id": "manual_run_test", - "duration": "None", - "start_date": "2023-09-27 22:40:50.033987+00:00", - 
"end_date": "None", - "execution_date": "2023-09-27 21:34:38+00:00", - "try_number": "0", - "hostname": "aluminum-b.local", - "max_tries": "0", - "external_executor_id": "None", - "pid": "73894", - "state": "running", - "operator": "BashOperator", - "priority_weight": "1", - "unixname": "hsheth", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1" - }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", - "name": "simple_dag_run_another_data_task_manual_run_test", - "type": "BATCH_AD_HOC", - "created": { - "time": 1695854450033, - "actor": "urn:li:corpuser:datahub" - } - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRelationships", - "aspect": { - "json": { - "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", - "upstreamInstances": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1695854450033, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "status": "STARTED", - "attempt": 1 - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", - "changeType": "UPSERT", - "aspectName": "dataJobInfo", - "aspect": { - "json": { - "customProperties": { - "depends_on_past": "False", - "email": "None", - "label": "'run_another_data_task'", - "execution_timeout": "None", - "sla": "None", - "task_id": "'run_another_data_task'", - "trigger_rule": "", - "wait_for_downstream": "False", - "downstream_task_ids": "set()", - "inlets": "[]", - "outlets": "[]", - "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'task 2'\", \"dag\": \"<>\", \"task_id\": \"run_another_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<>\", \"_lock_for_execution\": true, \"_log\": \"<>\", \"append_env\": false, \"bash_command\": \"echo 'task 2'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [], \"outlets\": [], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<>\", \"task_group\": \"<>\", \"task_id\": \"run_another_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [\"task_1\"], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": 
\"downstream\"}, \"type\": \"operator\"}]}" - }, - "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task", - "name": "run_another_data_task", - "type": { - "string": "COMMAND" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)" - ], - "fineGrainedLineages": [] - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", - "changeType": "UPSERT", - "aspectName": "ownership", - "aspect": { - "json": { - "owners": [ - { - "owner": "urn:li:corpuser:airflow", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:airflow" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", - "changeType": "UPSERT", - "aspectName": "globalTags", - "aspect": { - "json": { - "tags": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:888f71b79d9a0b162fe44acad7b2c2ae", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1695854450466, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "status": "COMPLETE", - "result": { - "type": "SUCCESS", - "nativeResultType": "airflow" - } - } - } } ] \ No newline at end of file diff --git a/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py b/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py index 796786beba21b..a898e35bb810e 100644 --- a/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py +++ b/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py @@ -2,7 +2,7 @@ import logging from dataclasses import dataclass -from typing import TYPE_CHECKING, Callable, Iterable, List, Optional, Union +from typing import Callable, Iterable, List, Optional, Union import pydantic from pydantic import BaseModel @@ -11,9 +11,10 @@ from datahub.api.entities.corpuser.corpuser import CorpUser, CorpUserGenerationConfig from datahub.configuration.common import ConfigurationError from datahub.configuration.validate_field_rename import pydantic_renamed_field +from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph +from datahub.ingestion.graph.client import DataHubGraph from datahub.metadata.schema_classes import ( CorpGroupEditableInfoClass, CorpGroupInfoClass, @@ -25,9 +26,6 @@ _Aspect, ) -if TYPE_CHECKING: - from datahub.emitter.kafka_emitter import DatahubKafkaEmitter - logger = logging.getLogger(__name__) @@ -194,30 +192,9 @@ def generate_mcp( entityUrn=urn, aspect=StatusClass(removed=False) ) - @staticmethod - def _datahub_graph_from_datahub_rest_emitter( - rest_emitter: DatahubRestEmitter, - ) -> DataHubGraph: - """ - Create a datahub graph instance from a REST Emitter. 
-
-        A stop-gap implementation which is expected to be removed after PATCH support is implemented
-        for membership updates for users <-> groups
-        """
-        graph = DataHubGraph(
-            config=DatahubClientConfig(
-                server=rest_emitter._gms_server,
-                token=rest_emitter._token,
-                timeout_sec=rest_emitter._connect_timeout_sec,
-                retry_status_codes=rest_emitter._retry_status_codes,
-                extra_headers=rest_emitter._session.headers,
-                disable_ssl_verification=rest_emitter._session.verify is False,
-            )
-        )
-        return graph
-
     def emit(
         self,
-        emitter: Union[DatahubRestEmitter, "DatahubKafkaEmitter"],
+        emitter: Emitter,
         callback: Optional[Callable[[Exception, str], None]] = None,
     ) -> None:
         """
@@ -235,7 +212,7 @@ def emit(
             # who are passing in a DataHubRestEmitter today
             # we won't need this in the future once PATCH support is implemented as all emitters
             # will work
-            datahub_graph = self._datahub_graph_from_datahub_rest_emitter(emitter)
+            datahub_graph = emitter.to_graph()
             for mcp in self.generate_mcp(
                 generation_config=CorpGroupGenerationConfig(
                     override_editable=self.overrideEditable, datahub_graph=datahub_graph
diff --git a/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py
index c67eb02a870a5..9fe1ebedafca7 100644
--- a/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py
+++ b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py
@@ -1,14 +1,14 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Callable, Iterable, List, Optional, Union
+from typing import Callable, Iterable, List, Optional
 
 import pydantic
 
 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import ConfigModel
+from datahub.emitter.generic_emitter import Emitter
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.emitter.rest_emitter import DatahubRestEmitter
 from datahub.metadata.schema_classes import (
     CorpUserEditableInfoClass,
     CorpUserInfoClass,
@@ -16,9 +16,6 @@
     StatusClass,
 )
 
-if TYPE_CHECKING:
-    from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
-
 
 @dataclass
 class CorpUserGenerationConfig:
@@ -144,7 +141,7 @@ def generate_mcp(
 
     def emit(
         self,
-        emitter: Union[DatahubRestEmitter, "DatahubKafkaEmitter"],
+        emitter: Emitter,
         callback: Optional[Callable[[Exception, str], None]] = None,
     ) -> None:
         """
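With the two hunks above, `CorpGroup.emit` and `CorpUser.emit` accept any implementation of the new `Emitter` protocol instead of a closed `Union` of the REST and Kafka emitters. A hedged sketch of what this enables — the server URL and filename are placeholders, not values from this commit:

    from datahub.api.entities.corpuser.corpuser import CorpUser
    from datahub.emitter.rest_emitter import DatahubRestEmitter
    from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter

    user = CorpUser(id="jdoe", display_name="John Doe")

    # Any Emitter implementation works; these two are just examples.
    for emitter in [
        DatahubRestEmitter(gms_server="http://localhost:8080"),
        SynchronizedFileEmitter(filename="corpuser_mcps.json"),
    ]:
        user.emit(emitter)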
diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
index 9ec389c3a0989..cf6080c7072e6 100644
--- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
+++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
@@ -1,9 +1,10 @@
 import time
 from dataclasses import dataclass, field
 from enum import Enum
-from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Union, cast
+from typing import Callable, Dict, Iterable, List, Optional, Union, cast
 
 from datahub.api.entities.datajob import DataFlow, DataJob
+from datahub.emitter.generic_emitter import Emitter
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import DatahubKey
 from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
@@ -26,10 +27,6 @@
 from datahub.utilities.urns.data_process_instance_urn import DataProcessInstanceUrn
 from datahub.utilities.urns.dataset_urn import DatasetUrn
 
-if TYPE_CHECKING:
-    from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
-    from datahub.emitter.rest_emitter import DatahubRestEmitter
-
 
 class DataProcessInstanceKey(DatahubKey):
     cluster: str
@@ -106,7 +103,7 @@ def start_event_mcp(
 
     def emit_process_start(
         self,
-        emitter: Union["DatahubRestEmitter", "DatahubKafkaEmitter"],
+        emitter: Emitter,
         start_timestamp_millis: int,
         attempt: Optional[int] = None,
         emit_template: bool = True,
@@ -197,7 +194,7 @@ def end_event_mcp(
 
     def emit_process_end(
         self,
-        emitter: Union["DatahubRestEmitter", "DatahubKafkaEmitter"],
+        emitter: Emitter,
         end_timestamp_millis: int,
         result: InstanceRunResult,
         result_type: Optional[str] = None,
@@ -207,7 +204,7 @@ def emit_process_end(
         """
         Generate an DataProcessInstance finish event and emits is
 
-        :param emitter: (Union[DatahubRestEmitter, DatahubKafkaEmitter]) the datahub emitter to emit generated mcps
+        :param emitter: (Emitter) the datahub emitter to emit generated mcps
         :param end_timestamp_millis: (int) the end time of the execution in milliseconds
         :param result: (InstanceRunResult) The result of the run
         :param result_type: (string) It identifies the system where the native result comes from like Airflow, Azkaban
@@ -261,24 +258,24 @@ def generate_mcp(
     @staticmethod
     def _emit_mcp(
         mcp: MetadataChangeProposalWrapper,
-        emitter: Union["DatahubRestEmitter", "DatahubKafkaEmitter"],
+        emitter: Emitter,
         callback: Optional[Callable[[Exception, str], None]] = None,
     ) -> None:
         """
-        :param emitter: (Union[DatahubRestEmitter, DatahubKafkaEmitter]) the datahub emitter to emit generated mcps
+        :param emitter: (Emitter) the datahub emitter to emit generated mcps
         :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
         """
         emitter.emit(mcp, callback)
 
     def emit(
         self,
-        emitter: Union["DatahubRestEmitter", "DatahubKafkaEmitter"],
+        emitter: Emitter,
         callback: Optional[Callable[[Exception, str], None]] = None,
     ) -> None:
         """
-        :param emitter: (Union[DatahubRestEmitter, DatahubKafkaEmitter]) the datahub emitter to emit generated mcps
+        :param emitter: (Emitter) the datahub emitter to emit generated mcps
         :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
         """
         for mcp in self.generate_mcp():
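The same protocol widening applies to the run-event helpers above. A hedged end-to-end sketch of the lifecycle they support — the flow/job ids, server URL, and timestamps are illustrative, not taken from this commit:

    import time

    from datahub.api.entities.datajob import DataFlow, DataJob
    from datahub.api.entities.dataprocess.dataprocess_instance import (
        DataProcessInstance,
        InstanceRunResult,
    )
    from datahub.emitter.rest_emitter import DatahubRestEmitter

    emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
    dataflow = DataFlow(orchestrator="airflow", id="simple_dag", env="prod")
    datajob = DataJob(id="task_1", flow_urn=dataflow.urn)

    # Create an instance of the job and emit start/end run events for it.
    instance = DataProcessInstance.from_datajob(datajob=datajob, id="manual_run_test")
    now_millis = int(time.time() * 1000)
    instance.emit_process_start(emitter, start_timestamp_millis=now_millis)
    instance.emit_process_end(
        emitter,
        end_timestamp_millis=now_millis,
        result=InstanceRunResult.SUCCESS,
        result_type="airflow",
    )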
diff --git a/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py b/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py
index 04f12b4f61d1e..e1a1555016dba 100644
--- a/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py
+++ b/metadata-ingestion/src/datahub/api/entities/dataproduct/dataproduct.py
@@ -19,8 +19,8 @@
 
 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import ConfigModel
+from datahub.emitter.generic_emitter import Emitter
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.emitter.rest_emitter import DatahubRestEmitter
 from datahub.ingestion.graph.client import DataHubGraph
 from datahub.metadata.schema_classes import (
     AuditStampClass,
@@ -43,9 +43,6 @@
 from datahub.utilities.registries.domain_registry import DomainRegistry
 from datahub.utilities.urns.urn import Urn
 
-if TYPE_CHECKING:
-    from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
-
 
 def patch_list(
     orig_list: Optional[list],
@@ -225,7 +222,6 @@ def _generate_properties_mcp(
     def generate_mcp(
         self, upsert: bool
     ) -> Iterable[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
-
         if self._resolved_domain_urn is None:
             raise Exception(
                 f"Unable to generate MCP-s because we were unable to resolve the domain {self.domain} to an urn."
@@ -282,7 +278,7 @@ def generate_mcp(
 
     def emit(
         self,
-        emitter: Union[DatahubRestEmitter, "DatahubKafkaEmitter"],
+        emitter: Emitter,
         upsert: bool,
         callback: Optional[Callable[[Exception, str], None]] = None,
     ) -> None:
@@ -440,7 +436,6 @@ def patch_yaml(
         original_dataproduct: DataProduct,
         output_file: Path,
     ) -> bool:
-
         update_needed = False
         if not original_dataproduct._original_yaml_dict:
             raise Exception("Original Data Product was not loaded from yaml")
@@ -523,7 +518,6 @@ def to_yaml(
         self,
         file: Path,
     ) -> None:
-
         with open(file, "w") as fp:
             yaml = YAML(typ="rt")  # default, if not specfied, is 'rt' (round-trip)
             yaml.indent(mapping=2, sequence=4, offset=2)
diff --git a/metadata-ingestion/src/datahub/emitter/synchronized_file_emitter.py b/metadata-ingestion/src/datahub/emitter/synchronized_file_emitter.py
index 67469e728f6ef..a5c581d244877 100644
--- a/metadata-ingestion/src/datahub/emitter/synchronized_file_emitter.py
+++ b/metadata-ingestion/src/datahub/emitter/synchronized_file_emitter.py
@@ -1,5 +1,5 @@
 import pathlib
-from typing import Callable, Union
+from typing import Callable, Optional, Union
 
 import filelock
 
@@ -32,7 +32,7 @@ def emit(
         item: Union[
             MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
         ],
-        callback: Callable[[Exception, str], None] | None = None,
+        callback: Optional[Callable[[Exception, str], None]] = None,
     ) -> None:
         with self._lock:
             if self._filename.exists():
@@ -44,6 +44,10 @@ def emit(
 
         write_metadata_file(self._filename, metadata)
 
+    def flush(self) -> None:
+        # No-op.
+        pass
+
     def close(self) -> None:
         # No-op.
         pass
diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py
index ce5c2ca2a878b..cd112a158de08 100644
--- a/metadata-ingestion/src/datahub/ingestion/graph/client.py
+++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -162,11 +162,11 @@ def from_emitter(cls, emitter: DatahubRestEmitter) -> "DataHubGraph":
                 timeout_sec=emitter._read_timeout_sec,
                 retry_status_codes=emitter._retry_status_codes,
                 retry_max_times=emitter._retry_max_times,
+                extra_headers=emitter._session.headers,
+                disable_ssl_verification=emitter._session.verify is False,
                 # TODO: Support these headers.
-                # extra_headers=emitter._extra_headers,
                 # ca_certificate_path=emitter._ca_certificate_path,
                 # client_certificate_path=emitter._client_certificate_path,
-                # disable_ssl_verification=emitter._disable_ssl_verification,
             )
         )
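A hedged sketch of the final hunk's effect: `DataHubGraph.from_emitter` now carries a REST emitter's extra headers and SSL-verification setting into the graph config, so a graph built from a configured emitter behaves the same way. The server, token, and header values below are placeholders:

    from datahub.emitter.rest_emitter import DatahubRestEmitter
    from datahub.ingestion.graph.client import DataHubGraph

    emitter = DatahubRestEmitter(
        gms_server="http://localhost:8080",
        token="example-token",
        extra_headers={"X-Custom-Header": "value"},
        disable_ssl_verification=True,
    )
    graph = DataHubGraph.from_emitter(emitter)
    # The derived config now reflects the emitter's headers and SSL setting.
    assert graph.config.disable_ssl_verification is True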