diff --git a/datahub-web-react/src/app/onboarding/config/HomePageOnboardingConfig.tsx b/datahub-web-react/src/app/onboarding/config/HomePageOnboardingConfig.tsx
index 28a0465a1b2f74..8b361db5ab344c 100644
--- a/datahub-web-react/src/app/onboarding/config/HomePageOnboardingConfig.tsx
+++ b/datahub-web-react/src/app/onboarding/config/HomePageOnboardingConfig.tsx
@@ -94,8 +94,7 @@ export const HomePageOnboardingConfig: OnboardingStep[] = [
                 Here are your organization's Data Platforms. Data Platforms
                 represent specific third-party Data Systems or Tools.
                 Examples include Data Warehouses like
                 Snowflake,
-                Orchestrators like
-                Airflow, and Dashboarding tools like Looker.
+                Orchestrators like Airflow, and Dashboarding tools like Looker.
         ),
     },
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
index 9604931795ccb9..0428acf1d49cf2 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/hooks/datahub.py
@@ -63,6 +63,13 @@ def test_connection(self) -> Tuple[bool, str]:
         return True, "Successfully connected to DataHub."
 
     def _get_config(self) -> Tuple[str, Optional[str], Optional[int]]:
+        # We have a few places in the codebase that use this method directly, despite
+        # it being "private". For now, we retain backwards compatibility by keeping
+        # this method around, but should stop using it in the future.
+        config = self._get_config_v2()
+        return config[0], config[1], config[2].get("timeout_sec")
+
+    def _get_config_v2(self) -> Tuple[str, Optional[str], Dict]:
         conn: "Connection" = self.get_connection(self.datahub_rest_conn_id)
 
         host = conn.host
@@ -74,14 +81,19 @@ def _get_config(self) -> Tuple[str, Optional[str], Optional[int]]:
                     "host parameter should not contain a port number if the port is specified separately"
                 )
             host = f"{host}:{conn.port}"
-        password = conn.password
-        timeout_sec = conn.extra_dejson.get("timeout_sec")
-        return (host, password, timeout_sec)
+        token = conn.password
+
+        extra_args = conn.extra_dejson
+        return (host, token, extra_args)
 
     def make_emitter(self) -> "DatahubRestEmitter":
         import datahub.emitter.rest_emitter
 
-        return datahub.emitter.rest_emitter.DatahubRestEmitter(*self._get_config())
+        config = self._get_config_v2()
+
+        return datahub.emitter.rest_emitter.DataHubRestEmitter(
+            config[0], config[1], **config[2]
+        )
 
     def emit(
         self,
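The hook change is the substantive one here: previously only `timeout_sec` was plucked out of the connection's `extra` JSON, while every other key was silently dropped; now the whole extras dict is forwarded to `DataHubRestEmitter` as keyword arguments. A rough sketch of what that enables, with illustrative connection values (`extra_headers` is an assumed emitter parameter; check the emitter signature in your DataHub version):

```python
import json

from airflow.models import Connection

# Hypothetical connection: the password doubles as the DataHub access token,
# and every key in `extra` now reaches DataHubRestEmitter as a keyword argument.
conn = Connection(
    conn_id="datahub_rest_default",
    conn_type="datahub-rest",
    host="http://datahub-gms:8080",
    password="<datahub-access-token>",
    extra=json.dumps({"timeout_sec": 30, "extra_headers": {"X-Trace": "1"}}),
)

# The hook's make_emitter() now effectively does:
#   host, token, extra_args = self._get_config_v2()
#   DataHubRestEmitter(host, token, **extra_args)
# so both extras above take effect, where previously only timeout_sec did.
```

The legacy `_get_config()` keeps its old three-tuple shape for external callers, so existing code that reads `timeout_sec` positionally continues to work.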
diff --git a/metadata-ingestion/src/datahub/emitter/generic_emitter.py b/metadata-ingestion/src/datahub/emitter/generic_emitter.py
index 28138c61827583..54b3d6841fe9c6 100644
--- a/metadata-ingestion/src/datahub/emitter/generic_emitter.py
+++ b/metadata-ingestion/src/datahub/emitter/generic_emitter.py
@@ -1,4 +1,4 @@
-from typing import Any, Callable, Optional, Union
+from typing import Callable, Optional, Union
 
 from typing_extensions import Protocol
 
@@ -21,10 +21,7 @@ def emit(
         # required. However, this would be a breaking change that may need
         # more careful consideration.
         callback: Optional[Callable[[Exception, str], None]] = None,
-        # TODO: The rest emitter returns timestamps as the return type. For now
-        # we smooth over that detail using Any, but eventually we should
-        # standardize on a return type.
-    ) -> Any:
+    ) -> None:
         raise NotImplementedError
 
     def flush(self) -> None:
diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
index afb19df9791af3..0985cb4be3b5ab 100644
--- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py
+++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -1,10 +1,9 @@
-import datetime
 import functools
 import json
 import logging
 import os
 from json.decoder import JSONDecodeError
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
 
 import requests
 from deprecated import deprecated
@@ -208,8 +207,7 @@ def emit(
             UsageAggregation,
         ],
         callback: Optional[Callable[[Exception, str], None]] = None,
-    ) -> Tuple[datetime.datetime, datetime.datetime]:
-        start_time = datetime.datetime.now()
+    ) -> None:
         try:
             if isinstance(item, UsageAggregation):
                 self.emit_usage(item)
@@ -226,7 +224,6 @@
         else:
             if callback:
                 callback(None, "success")  # type: ignore
-        return start_time, datetime.datetime.now()
 
     def emit_mce(self, mce: MetadataChangeEvent) -> None:
         url = f"{self._gms_server}/entities?action=ingest"
diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
index d3abde0d36993a..fedd8520dde4d4 100644
--- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
+++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -4,10 +4,10 @@
 import logging
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
-from datetime import timedelta
+from datetime import datetime, timedelta
 from enum import auto
 from threading import BoundedSemaphore
-from typing import Union
+from typing import Tuple, Union
 
 from datahub.cli.cli_utils import set_env_variables_override_config
 from datahub.configuration.common import (
@@ -181,6 +181,18 @@ def _write_done_callback(
             self.report.report_failure({"e": e})
             write_callback.on_failure(record_envelope, Exception(e), {})
 
+    def _emit_wrapper(
+        self,
+        record: Union[
+            MetadataChangeEvent,
+            MetadataChangeProposal,
+            MetadataChangeProposalWrapper,
+        ],
+    ) -> Tuple[datetime, datetime]:
+        start_time = datetime.now()
+        self.emitter.emit(record)
+        return start_time, datetime.now()
+
     def write_record_async(
         self,
         record_envelope: RecordEnvelope[
@@ -194,7 +206,7 @@
     ) -> None:
         record = record_envelope.record
         if self.config.mode == SyncOrAsync.ASYNC:
-            write_future = self.executor.submit(self.emitter.emit, record)
+            write_future = self.executor.submit(self._emit_wrapper, record)
             write_future.add_done_callback(
                 functools.partial(
                     self._write_done_callback, record_envelope, write_callback
@@ -204,7 +216,7 @@
             )
         else:  # execute synchronously
             try:
-                (start, end) = self.emitter.emit(record)
+                (start, end) = self._emit_wrapper(record)
                 write_callback.on_success(record_envelope, success_metadata={})
             except Exception as e:
                 write_callback.on_failure(record_envelope, e, failure_metadata={})
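Since `Emitter.emit` now uniformly returns `None`, the timing responsibility moves out of the emitter and into the sink: the new `_emit_wrapper` brackets the call with timestamps, and both the synchronous path and the `ThreadPoolExecutor.submit` path route through it, so `_write_done_callback` can still recover a `(start, end)` tuple from the future's result. The underlying pattern, as a generic sketch rather than DataHub API:

```python
from datetime import datetime
from typing import Callable, Tuple


def timed_call(fn: Callable[[], None]) -> Tuple[datetime, datetime]:
    """Run a side-effecting call and report when it started and finished."""
    start_time = datetime.now()
    fn()  # e.g. emitter.emit(record); its return value is intentionally unused
    return start_time, datetime.now()
```

This keeps the `Emitter` protocol's contract simple for every implementation, while leaving latency bookkeeping where it is actually consumed, in the sink.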
diff --git a/metadata-ingestion/tests/test_helpers/graph_helpers.py b/metadata-ingestion/tests/test_helpers/graph_helpers.py
index 4c2c46c2f97ced..2e73f5e2c6cdb8 100644
--- a/metadata-ingestion/tests/test_helpers/graph_helpers.py
+++ b/metadata-ingestion/tests/test_helpers/graph_helpers.py
@@ -1,6 +1,5 @@
-from datetime import datetime
 from pathlib import Path
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union
+from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union
 
 from datahub.emitter.mce_builder import Aspect
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -22,7 +21,9 @@ class MockDataHubGraph(DataHubGraph):
-    def __init__(self, entity_graph: Dict[str, Dict[str, Any]] = {}) -> None:
+    def __init__(
+        self, entity_graph: Optional[Dict[str, Dict[str, Any]]] = None
+    ) -> None:
         self.emitted: List[
             Union[
                 MetadataChangeEvent,
@@ -30,7 +31,7 @@ def __init__(self, entity_graph: Dict[str, Dict[str, Any]] = {}) -> None:
                 MetadataChangeProposalWrapper,
             ]
         ] = []
-        self.entity_graph = entity_graph
+        self.entity_graph = entity_graph or {}
 
     def import_file(self, file: Path) -> None:
         """Imports metadata from any MCE/MCP file. Does not clear prior loaded data.
@@ -110,9 +111,8 @@ def emit(
             UsageAggregationClass,
         ],
         callback: Union[Callable[[Exception, str], None], None] = None,
-    ) -> Tuple[datetime, datetime]:
+    ) -> None:
         self.emitted.append(item)  # type: ignore
-        return (datetime.now(), datetime.now())
 
     def emit_mce(self, mce: MetadataChangeEvent) -> None:
         self.emitted.append(mce)
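Beyond matching the new `-> None` contract, the `MockDataHubGraph.__init__` change quietly fixes a classic Python pitfall: a mutable default like `entity_graph: Dict[...] = {}` is evaluated once at function definition time, so every instance constructed without an argument shares the same dict, letting state leak between tests. A standalone illustration (hypothetical `Bad`/`Good` classes, not from the codebase):

```python
from typing import Any, Dict, Optional


class Bad:
    def __init__(self, graph: Dict[str, Any] = {}) -> None:
        self.graph = graph  # all no-arg instances share one dict object


class Good:
    def __init__(self, graph: Optional[Dict[str, Any]] = None) -> None:
        self.graph = graph or {}  # each no-arg instance gets a fresh dict


a, b = Bad(), Bad()
a.graph["urn:li:corpuser:datahub"] = {"aspect": "value"}
assert "urn:li:corpuser:datahub" in b.graph  # surprising: b sees a's write

c, d = Good(), Good()
c.graph["urn:li:corpuser:datahub"] = {"aspect": "value"}
assert "urn:li:corpuser:datahub" not in d.graph  # isolated, as intended
```

One subtlety of the `entity_graph or {}` idiom: an explicitly passed empty dict is also replaced by a fresh one, which is harmless for this mock but worth knowing when reusing the pattern.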