fix mypy issues
hsheth2 committed Sep 27, 2023
1 parent ed55d67 commit 6463500
Showing 11 changed files with 80 additions and 473 deletions.
7 changes: 4 additions & 3 deletions metadata-ingestion-modules/airflow-plugin/setup.py
@@ -1,5 +1,6 @@
 import os
 import pathlib
+from typing import Dict, Set
 
 import setuptools

@@ -29,7 +30,7 @@ def get_long_description():
     *rest_common,
 }
 
-plugins = {
+plugins: Dict[str, Set[str]] = {
     "datahub-rest": {
         f"acryl-datahub[datahub-rest]{_self_pin}",
     },
@@ -48,7 +49,7 @@ def get_long_description():
 }
 
 # Include datahub-rest in the base requirements.
-base_requirements |= plugins["datahub-rest"]
+base_requirements.update(plugins["datahub-rest"])
 
 
 mypy_stubs = {
@@ -158,7 +159,7 @@ def get_long_description():
     # Dependencies.
     install_requires=list(base_requirements),
     extras_require={
-        **{plugin: list(dependencies) for (plugin, dependencies) in plugins.items()},
+        **{plugin: list(dependencies) for plugin, dependencies in plugins.items()},
         "dev": list(dev_requirements),
         "integration-tests": list(integration_test_requirements),
     },
@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import TYPE_CHECKING, Dict, List, Optional, Set, cast
+from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union, cast
 
 from airflow.configuration import conf
 from datahub.api.entities.datajob import DataFlow, DataJob
@@ -227,7 +227,7 @@ def generate_datajob(
 
     job_property_bag: Dict[str, str] = {}
 
-    allowed_task_keys = [
+    allowed_task_keys: List[Union[str, Tuple[str, ...]]] = [
         "_task_type",
         "_task_module",
         "depends_on_past",
@@ -247,13 +247,17 @@
     ]
 
     for key in allowed_task_keys:
-        old_key = None
         if isinstance(key, tuple):
-            key, old_key = key
-        if hasattr(task, key):
-            job_property_bag[key] = repr(getattr(task, key))
-        elif old_key is not None and hasattr(task, old_key):
-            job_property_bag[key] = repr(getattr(task, old_key))
+            out_key: str = key[0]
+            try_keys = key
+        else:
+            out_key = key
+            try_keys = (key,)
+
+        for k in try_keys:
+            if hasattr(task, k):
+                job_property_bag[out_key] = repr(getattr(task, k))
+                break
 
     datajob.properties = job_property_bag
     base_url = conf.get("webserver", "base_url")
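
The rewritten loop supports fallback attribute names: a plain string is both the property name and the attribute to read, while for a tuple the first element names the property and each element is tried as an attribute until one exists. A standalone sketch of that behavior, with the FakeTask class and the ("_task_type", "task_type") pairing invented for illustration:

from typing import Dict, List, Tuple, Union

class FakeTask:
    # Hypothetical stand-in for an Airflow operator whose attribute names
    # changed across versions (a private name gaining a public alias).
    task_type = "BashOperator"
    depends_on_past = False

allowed_task_keys: List[Union[str, Tuple[str, ...]]] = [
    ("_task_type", "task_type"),  # try "_task_type" first, then "task_type"
    "depends_on_past",
]

task = FakeTask()
job_property_bag: Dict[str, str] = {}
for key in allowed_task_keys:
    if isinstance(key, tuple):
        out_key, try_keys = key[0], key
    else:
        out_key, try_keys = key, (key,)
    for k in try_keys:
        if hasattr(task, k):
            job_property_bag[out_key] = repr(getattr(task, k))
            break

print(job_property_bag)
# {'_task_type': "'BashOperator'", 'depends_on_past': 'False'}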
@@ -192,6 +192,39 @@ def callback(exc, msg):
     emit_mcps = emit
 
 
+class SynchronizedFileHook(BaseHook):
+    conn_type = "datahub-file"
+
+    def __init__(self, datahub_conn_id: str) -> None:
+        super().__init__()
+        self.datahub_conn_id = datahub_conn_id
+
+    def make_emitter(self) -> "SynchronizedFileEmitter":
+        from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter
+
+        conn = self.get_connection(self.datahub_conn_id)
+        filename = conn.host
+        if not filename:
+            raise AirflowException("filename parameter is required")
+
+        return SynchronizedFileEmitter(filename=filename)
+
+    def emit(
+        self,
+        items: Sequence[
+            Union[
+                MetadataChangeEvent,
+                MetadataChangeProposal,
+                MetadataChangeProposalWrapper,
+            ]
+        ],
+    ) -> None:
+        emitter = self.make_emitter()
+
+        for item in items:
+            emitter.emit(item)
+
+
 class DatahubGenericHook(BaseHook):
     """
     Emits Metadata Change Events using either the DatahubRestHook or the
@@ -205,7 +238,9 @@ def __init__(self, datahub_conn_id: str) -> None:
         super().__init__()
         self.datahub_conn_id = datahub_conn_id
 
-    def get_underlying_hook(self) -> Union[DatahubRestHook, DatahubKafkaHook]:
+    def get_underlying_hook(
+        self,
+    ) -> Union[DatahubRestHook, DatahubKafkaHook, SynchronizedFileHook]:
         conn = self.get_connection(self.datahub_conn_id)
 
         # We need to figure out the underlying hook type. First check the
@@ -251,18 +286,3 @@ def emit(
 
 # Retained for backwards compatibility.
 emit_mces = emit
-
-
-class SynchronizedFileHook(BaseHook):
-    conn_type = "datahub-file"
-
-    def __init__(self, datahub_conn_id: str) -> None:
-        super().__init__()
-        self.datahub_conn_id = datahub_conn_id
-
-    def make_emitter(self) -> "SynchronizedFileEmitter":
-        from datahub.emitter.synchronized_file_emitter import SynchronizedFileEmitter
-
-        conn = self.get_connection(self.datahub_conn_id)
-
-        return SynchronizedFileEmitter(filename=conn.host)
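
Taken together, these hunks move SynchronizedFileHook above DatahubGenericHook so the widened Union return type can reference it directly, and make_emitter now raises if conn.host is empty rather than passing a possibly-None filename to SynchronizedFileEmitter. A hypothetical usage sketch (the connection id, file path, and URN are invented; the import paths assume the plugin's usual layout):

# Assumed one-time setup, e.g.:
#   airflow connections add datahub_file_default \
#       --conn-type datahub-file --conn-host /tmp/datahub_events.json
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook

hook = DatahubGenericHook(datahub_conn_id="datahub_file_default")

# get_underlying_hook() inspects the connection and resolves to one of
# DatahubRestHook, DatahubKafkaHook, or SynchronizedFileHook; with the
# "datahub-file" conn type above, emit() appends each item to the file.
hook.emit(
    [
        MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)",
            aspect=StatusClass(removed=False),
        )
    ]
)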
@@ -74,7 +74,7 @@
     "downstream_task_ids": "set()",
     "inlets": "[Dataset(platform='snowflake', name='mydb.schema.tableA', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableB', env='DEV', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableC', env='PROD', platform_instance='cloud'), Urn(_urn='urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)')]",
     "outlets": "[Dataset(platform='snowflake', name='mydb.schema.tableD', env='PROD', platform_instance=None), Dataset(platform='snowflake', name='mydb.schema.tableE', env='PROD', platform_instance=None)]",
-    "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_lock_for_execution\": true, \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_on_exit_code\": [99], \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"wait_for_past_depends_before_skipping\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
+    "openlineage_run_facet_unknownSourceAttribute": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.2.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"unknownItems\": [{\"name\": \"BashOperator\", \"properties\": {\"_BaseOperator__from_mapped\": false, \"_BaseOperator__init_kwargs\": {\"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"dag\": \"<<non-serializable: DAG>>\", \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"task_id\": \"run_data_task\"}, \"_BaseOperator__instantiated\": true, \"_dag\": \"<<non-serializable: DAG>>\", \"_log\": \"<<non-serializable: Logger>>\", \"append_env\": false, \"bash_command\": \"echo 'This is where you might run your data tooling.'\", \"depends_on_past\": false, \"do_xcom_push\": true, \"downstream_task_ids\": [], \"email_on_failure\": true, \"email_on_retry\": true, \"executor_config\": {}, \"ignore_first_depends_on_past\": true, \"inlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableA\", \"platform\": \"snowflake\"}, {\"env\": \"DEV\", \"name\": \"mydb.schema.tableB\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableC\", \"platform\": \"snowflake\", \"platform_instance\": \"cloud\"}, {\"_urn\": \"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)\"}], \"outlets\": [{\"env\": \"PROD\", \"name\": \"mydb.schema.tableD\", \"platform\": \"snowflake\"}, {\"env\": \"PROD\", \"name\": \"mydb.schema.tableE\", \"platform\": \"snowflake\"}], \"output_encoding\": \"utf-8\", \"owner\": \"airflow\", \"params\": \"<<non-serializable: ParamsDict>>\", \"pool\": \"default_pool\", \"pool_slots\": 1, \"priority_weight\": 1, \"queue\": \"default\", \"retries\": 0, \"retry_delay\": \"<<non-serializable: timedelta>>\", \"retry_exponential_backoff\": false, \"skip_exit_code\": 99, \"start_date\": \"<<non-serializable: DateTime>>\", \"task_group\": \"<<non-serializable: TaskGroup>>\", \"task_id\": \"run_data_task\", \"trigger_rule\": \"all_success\", \"upstream_task_ids\": [], \"wait_for_downstream\": false, \"weight_rule\": \"downstream\"}, \"type\": \"operator\"}]}"
     },
     "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
     "name": "run_data_task",