From 056d3619f0923757c1e4afe9f0e5a56de1a64dde Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Fri, 21 Jul 2023 14:14:06 -0700
Subject: [PATCH] feat(sdk): easily generate container urns (#8198)

Co-authored-by: Aseem Bansal
---
 docs/how/updating-datahub.md                  |  2 ++
 .../src/datahub/emitter/mce_builder.py        | 21 ++++++++++++--
 .../src/datahub/emitter/mcp_builder.py        | 29 ++++++++++++-------
 .../src/datahub/ingestion/graph/client.py     | 13 +++++----
 .../ingestion/source/bigquery_v2/bigquery.py  |  6 ++--
 .../data_lake_common/data_lake_utils.py       |  4 +--
 .../ingestion/source/powerbi/powerbi.py       |  5 ++--
 .../powerbi/rest_api_wrapper/data_classes.py  | 10 +++----
 .../datahub/ingestion/source/sql/athena.py    |  6 ++--
 .../datahub/ingestion/source/sql/sql_utils.py | 14 ++++-----
 .../source/sql/two_tier_sql_source.py         |  8 ++---
 .../src/datahub/ingestion/source/tableau.py   |  8 ++---
 .../datahub/ingestion/source/unity/source.py  |  4 +--
 13 files changed, 78 insertions(+), 52 deletions(-)

diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md
index 03b3d763ed247..b705c973cdbb5 100644
--- a/docs/how/updating-datahub.md
+++ b/docs/how/updating-datahub.md
@@ -20,6 +20,8 @@ individually enable / disable desired field metrics.
 
 ### Deprecations
 
+- #8198: In the Python SDK, the `PlatformKey` class has been renamed to `ContainerKey`.
+
 ### Other notable Changes
 
 ## 0.10.4
diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py
index 9c44949741297..47727d5784a19 100644
--- a/metadata-ingestion/src/datahub/emitter/mce_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -6,7 +6,17 @@
 import time
 from enum import Enum
 from hashlib import md5
-from typing import Any, List, Optional, Type, TypeVar, Union, cast, get_type_hints
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    List,
+    Optional,
+    Type,
+    TypeVar,
+    Union,
+    cast,
+    get_type_hints,
+)
 
 import typing_inspect
 
@@ -50,6 +60,9 @@
     os.getenv("DATAHUB_DATASET_URN_TO_LOWER", "false") == "true"
 )
 
+if TYPE_CHECKING:
+    from datahub.emitter.mcp_builder import DatahubKey
+
 
 # TODO: Delete this once lower-casing is the standard.
 def set_dataset_urn_to_lower(value: bool) -> None:
@@ -132,7 +145,11 @@ def dataset_key_to_urn(key: DatasetKeyClass) -> str:
     )
 
 
-def make_container_urn(guid: str) -> str:
+def make_container_urn(guid: Union[str, "DatahubKey"]) -> str:
+    from datahub.emitter.mcp_builder import DatahubKey
+
+    if isinstance(guid, DatahubKey):
+        guid = guid.guid()
     return f"urn:li:container:{guid}"
 
 
diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py
index 9051f2e82fa1f..40df214f49433 100644
--- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -54,7 +54,9 @@ def guid(self) -> str:
         return _stable_guid_from_dict(bag)
 
 
-class PlatformKey(DatahubKey):
+class ContainerKey(DatahubKey):
+    """Base class for container guid keys. Most users should use one of the subclasses instead."""
+
     platform: str
     instance: Optional[str] = None
 
@@ -81,8 +83,15 @@ def guid_dict(self) -> Dict[str, str]:
     def property_dict(self) -> Dict[str, str]:
         return self.dict(by_alias=True, exclude_none=True)
 
+    def as_urn(self) -> str:
+        return make_container_urn(guid=self.guid())
+
+
+# DEPRECATION: Keeping the `PlatformKey` name around for backwards compatibility.
+PlatformKey = ContainerKey
+
 
-class DatabaseKey(PlatformKey):
+class DatabaseKey(ContainerKey):
     database: str
 
 
@@ -90,11 +99,11 @@ class SchemaKey(DatabaseKey):
     db_schema: str = Field(alias="schema")
 
 
-class ProjectIdKey(PlatformKey):
+class ProjectIdKey(ContainerKey):
     project_id: str
 
 
-class MetastoreKey(PlatformKey):
+class MetastoreKey(ContainerKey):
     metastore: str
 
 
@@ -110,11 +119,11 @@ class BigQueryDatasetKey(ProjectIdKey):
     dataset_id: str
 
 
-class FolderKey(PlatformKey):
+class FolderKey(ContainerKey):
     folder_abs_path: str
 
 
-class BucketKey(PlatformKey):
+class BucketKey(ContainerKey):
     bucket_name: str
 
 
@@ -127,7 +136,7 @@ def default(self, obj: Any) -> Any:
         return json.JSONEncoder.default(self, obj)
 
 
-KeyType = TypeVar("KeyType", bound=PlatformKey)
+KeyType = TypeVar("KeyType", bound=ContainerKey)
 
 
 def add_domain_to_entity_wu(
@@ -188,7 +197,7 @@ def gen_containers(
     container_key: KeyType,
     name: str,
     sub_types: List[str],
-    parent_container_key: Optional[PlatformKey] = None,
+    parent_container_key: Optional[ContainerKey] = None,
     extra_properties: Optional[Dict[str, str]] = None,
     domain_urn: Optional[str] = None,
     description: Optional[str] = None,
@@ -199,9 +208,7 @@ def gen_containers(
     created: Optional[int] = None,
     last_modified: Optional[int] = None,
 ) -> Iterable[MetadataWorkUnit]:
-    container_urn = make_container_urn(
-        guid=container_key.guid(),
-    )
+    container_urn = container_key.as_urn()
     yield MetadataChangeProposalWrapper(
         entityUrn=f"{container_urn}",
         # entityKeyAspect=ContainerKeyClass(guid=parent_container_key.guid()),
diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py
index 2f817ee69a637..cac53c350f2ea 100644
--- a/metadata-ingestion/src/datahub/ingestion/graph/client.py
+++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -88,6 +88,12 @@ class RemovedStatusFilter(enum.Enum):
     """Search only soft-deleted entities."""
 
 
+@dataclass
+class RelatedEntity:
+    urn: str
+    relationship_type: str
+
+
 def _graphql_entity_type(entity_type: str) -> str:
     """Convert the entity types into GraphQL "EntityType" enum values."""
 
@@ -769,11 +775,6 @@ class RelationshipDirection(str, enum.Enum):
         INCOMING = "INCOMING"
         OUTGOING = "OUTGOING"
 
-    @dataclass
-    class RelatedEntity:
-        urn: str
-        relationship_type: str
-
     def get_related_entities(
         self,
         entity_urn: str,
@@ -794,7 +795,7 @@ def get_related_entities(
             },
         )
         for related_entity in response.get("entities", []):
-            yield DataHubGraph.RelatedEntity(
+            yield RelatedEntity(
                 urn=related_entity["urn"],
                 relationship_type=related_entity["relationshipType"],
             )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index 919c803222066..ccda00ba293ef 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -20,7 +20,7 @@
     set_dataset_urn_to_lower,
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.emitter.mcp_builder import BigQueryDatasetKey, PlatformKey, ProjectIdKey
+from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SupportStatus,
@@ -434,7 +434,7 @@ def get_dataplatform_instance_aspect(
             entityUrn=dataset_urn, aspect=aspect
         ).as_workunit()
 
-    def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey:
+    def gen_dataset_key(self, db_name: str, schema: str) -> ContainerKey:
         return BigQueryDatasetKey(
             project_id=db_name,
             dataset_id=schema,
@@ -443,7 +443,7 @@ def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey:
             backcompat_env_as_instance=True,
         )
 
-    def gen_project_id_key(self, database: str) -> PlatformKey:
+    def gen_project_id_key(self, database: str) -> ContainerKey:
         return ProjectIdKey(
             project_id=database,
             platform=self.platform,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py
index 0a65537772390..b04718a9eabba 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py
@@ -3,9 +3,9 @@
 
 from datahub.emitter.mcp_builder import (
     BucketKey,
+    ContainerKey,
     FolderKey,
     KeyType,
-    PlatformKey,
     add_dataset_to_container,
     gen_containers,
 )
@@ -45,7 +45,7 @@ def create_emit_containers(
         container_key: KeyType,
         name: str,
         sub_types: List[str],
-        parent_container_key: Optional[PlatformKey] = None,
+        parent_container_key: Optional[ContainerKey] = None,
         domain_urn: Optional[str] = None,
     ) -> Iterable[MetadataWorkUnit]:
         if container_key.guid() not in self.processed_containers:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
index 33596091e420d..919cb83e4d832 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
@@ -9,7 +9,7 @@
 import datahub.emitter.mce_builder as builder
 import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.emitter.mcp_builder import PlatformKey, gen_containers
+from datahub.emitter.mcp_builder import ContainerKey, gen_containers
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SourceCapability,
@@ -104,7 +104,7 @@ def __init__(
         self.__reporter = reporter
         self.__dataplatform_instance_resolver = dataplatform_instance_resolver
         self.processed_datasets: Set[powerbi_data_classes.PowerBIDataset] = set()
-        self.workspace_key: PlatformKey
+        self.workspace_key: ContainerKey
 
     @staticmethod
     def urn_to_lowercase(value: str, flag: bool) -> str:
@@ -256,7 +256,6 @@ def to_datahub_schema(
         self,
         table: powerbi_data_classes.Table,
     ) -> SchemaMetadataClass:
-        fields = []
 
         table_fields = (
             [self.to_datahub_schema_field(column) for column in table.columns]
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py
index 28a5fac8b127b..2d2d9f527788f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py
@@ -2,7 +2,7 @@
 from enum import Enum
 from typing import Any, Dict, List, Optional, Union
 
-from datahub.emitter.mcp_builder import PlatformKey
+from datahub.emitter.mcp_builder import ContainerKey
 from datahub.metadata.schema_classes import (
     BooleanTypeClass,
     DateTypeClass,
@@ -28,11 +28,11 @@
 }
 
 
-class WorkspaceKey(PlatformKey):
+class WorkspaceKey(ContainerKey):
     workspace: str
 
 
-class DatasetKey(PlatformKey):
+class DatasetKey(ContainerKey):
     dataset: str
 
 
@@ -57,7 +57,7 @@ def get_workspace_key(
         platform_name: str,
         platform_instance: Optional[str] = None,
         workspace_id_as_urn_part: Optional[bool] = False,
-    ) -> PlatformKey:
+    ) -> ContainerKey:
         return WorkspaceKey(
             workspace=self.get_urn_part(workspace_id_as_urn_part),
             platform=platform_name,
@@ -150,7 +150,7 @@ def __eq__(self, instance):
     def __hash__(self):
         return hash(self.__members())
 
-    def get_dataset_key(self, platform_name: str) -> PlatformKey:
+    def get_dataset_key(self, platform_name: str) -> ContainerKey:
         return DatasetKey(
             dataset=self.id,
             platform=platform_name,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
index 46f9fd240db04..8b2eed36ac6b3 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
@@ -9,7 +9,7 @@
 from sqlalchemy.engine.reflection import Inspector
 
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
-from datahub.emitter.mcp_builder import PlatformKey
+from datahub.emitter.mcp_builder import ContainerKey
 from datahub.ingestion.api.decorators import (
     SourceCapability,
     SupportStatus,
@@ -211,7 +211,7 @@ def gen_schema_containers(
             extra_properties=extra_properties,
         )
 
-    def get_database_container_key(self, db_name: str, schema: str) -> PlatformKey:
+    def get_database_container_key(self, db_name: str, schema: str) -> ContainerKey:
         # Because our overridden get_allowed_schemas method returns db_name as the schema name,
         # the db_name and schema here will be the same. Hence, we just ignore the schema parameter.
         # Based on community feedback, db_name only available if it is explicitly specified in the connection string.
@@ -232,7 +232,7 @@ def add_table_to_schema_container(
         dataset_urn: str,
         db_name: str,
         schema: str,
-        schema_container_key: Optional[PlatformKey] = None,
+        schema_container_key: Optional[ContainerKey] = None,
     ) -> Iterable[MetadataWorkUnit]:
         yield from add_table_to_schema_container(
             dataset_urn=dataset_urn,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py
index 7554dd5af3103..a5f5034d175c6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py
@@ -8,8 +8,8 @@
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import (
+    ContainerKey,
     DatabaseKey,
-    PlatformKey,
     SchemaKey,
     add_dataset_to_container,
     add_domain_to_entity_wu,
 )
@@ -28,7 +28,7 @@ def gen_schema_key(
     platform: str,
     platform_instance: Optional[str],
     env: Optional[str],
-) -> PlatformKey:
+) -> ContainerKey:
     return SchemaKey(
         database=db_name,
         schema=schema,
@@ -41,7 +41,7 @@ def gen_schema_key(
 
 def gen_database_key(
     database: str, platform: str, platform_instance: Optional[str], env: Optional[str]
-) -> PlatformKey:
+) -> ContainerKey:
     return DatabaseKey(
         database=database,
         platform=platform,
@@ -55,8 +55,8 @@ def gen_schema_container(
     schema: str,
     database: str,
     sub_types: List[str],
-    database_container_key: PlatformKey,
-    schema_container_key: PlatformKey,
+    database_container_key: ContainerKey,
+    schema_container_key: ContainerKey,
     domain_registry: Optional[DomainRegistry] = None,
     domain_config: Optional[Dict[str, AllowDenyPattern]] = None,
     name: Optional[str] = None,
@@ -113,7 +113,7 @@ def gen_domain_urn(
 
 def gen_database_container(
     database: str,
-    database_container_key: PlatformKey,
+    database_container_key: ContainerKey,
     sub_types: List[str],
     domain_config: Optional[Dict[str, AllowDenyPattern]] = None,
     domain_registry: Optional[DomainRegistry] = None,
@@ -152,7 +152,7 @@ def gen_database_container(
 
 def add_table_to_schema_container(
     dataset_urn: str,
-    parent_container_key: PlatformKey,
+    parent_container_key: ContainerKey,
 ) -> Iterable[MetadataWorkUnit]:
     yield from add_dataset_to_container(
         container_key=parent_container_key,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py
index f105829d874de..d9062cef06eae 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py
@@ -7,7 +7,7 @@
 
 from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
-from datahub.emitter.mcp_builder import PlatformKey
+from datahub.emitter.mcp_builder import ContainerKey
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.sql.sql_common import SQLAlchemySource, logger
 from datahub.ingestion.source.sql.sql_config import (
@@ -56,7 +56,7 @@ def __init__(self, config, ctx, platform):
         super().__init__(config, ctx, platform)
         self.config: TwoTierSQLAlchemyConfig = config
 
-    def get_database_container_key(self, db_name: str, schema: str) -> PlatformKey:
+    def get_database_container_key(self, db_name: str, schema: str) -> ContainerKey:
         # Because our overridden get_allowed_schemas method returns db_name as the schema name,
         # the db_name and schema here will be the same. Hence, we just ignore the schema parameter.
         assert db_name == schema
@@ -72,7 +72,7 @@ def add_table_to_schema_container(
         dataset_urn: str,
         db_name: str,
         schema: str,
-        schema_container_key: Optional[PlatformKey] = None,
+        schema_container_key: Optional[ContainerKey] = None,
     ) -> Iterable[MetadataWorkUnit]:
         yield from add_table_to_schema_container(
             dataset_urn=dataset_urn,
@@ -86,7 +86,7 @@ def get_allowed_schemas(
         # dbName itself as an allowed schema
         yield db_name
 
-    def gen_schema_key(self, db_name: str, schema: str) -> PlatformKey:
+    def gen_schema_key(self, db_name: str, schema: str) -> ContainerKey:
         # Sanity check that we don't try to generate schema containers for 2 tier databases.
         raise NotImplementedError
diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
index 5ad39425c3f73..67bd1af6c2d7f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -33,7 +33,7 @@
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import (
-    PlatformKey,
+    ContainerKey,
     add_entity_to_container,
     gen_containers,
 )
@@ -358,11 +358,11 @@ def projects_backward_compatibility(cls, values: Dict) -> Dict:
         return values
 
 
-class WorkbookKey(PlatformKey):
+class WorkbookKey(ContainerKey):
     workbook_id: str
 
 
-class ProjectKey(PlatformKey):
+class ProjectKey(ContainerKey):
     project_id: str
 
 
@@ -1682,7 +1682,7 @@ def emit_datasource(
         )
 
     def _get_datasource_container_key(self, datasource, workbook, is_embedded_ds):
-        container_key: Optional[PlatformKey] = None
+        container_key: Optional[ContainerKey] = None
         if is_embedded_ds:  # It is embedded then parent is container is workbook
             if workbook is not None:
                 container_key = self.gen_workbook_key(workbook)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
index 9d82a9e247a00..ec7d00c7bcc63 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
@@ -15,8 +15,8 @@
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import (
     CatalogKey,
+    ContainerKey,
     MetastoreKey,
-    PlatformKey,
     UnitySchemaKey,
     add_dataset_to_container,
     gen_containers,
 )
@@ -432,7 +432,7 @@ def gen_catalog_containers(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]
             external_url=f"{self.external_url_base}/{catalog.name}",
         )
 
-    def gen_schema_key(self, schema: Schema) -> PlatformKey:
+    def gen_schema_key(self, schema: Schema) -> ContainerKey:
         return UnitySchemaKey(
             unity_schema=schema.name,
             platform=self.platform,
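
Reviewer note: the sketches below illustrate the APIs this patch touches; all platform names, database names, and ids in them are invented for demonstration. First, the headline change: any ContainerKey subclass (here DatabaseKey) can now mint its own container urn via as_urn(), and make_container_urn() accepts either a raw guid string or a DatahubKey.

    from datahub.emitter.mce_builder import make_container_urn
    from datahub.emitter.mcp_builder import DatabaseKey

    # Illustrative values; any ContainerKey subclass works the same way.
    key = DatabaseKey(platform="snowflake", database="analytics")

    # New in this PR: mint the urn directly from the key ...
    urn_a = key.as_urn()
    # ... or pass the key object itself to make_container_urn, which now
    # accepts either a raw guid string or a DatahubKey instance.
    urn_b = make_container_urn(guid=key)
    assert urn_a == urn_b  # both resolve to urn:li:container:<stable guid>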
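
The RelatedEntity dataclass moves from a nested class on DataHubGraph to module scope, so the old DataHubGraph.RelatedEntity spelling goes away. A sketch of iterating relationships under that assumption — the server address and urn are placeholders, and this assumes RelationshipDirection stays nested on DataHubGraph as the hunk's context suggests:

    from datahub.ingestion.graph.client import (
        DataHubGraph,
        DatahubClientConfig,
        RelatedEntity,  # now importable at module level
    )

    graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
    for related in graph.get_related_entities(
        entity_urn="urn:li:container:0123456789abcdef",  # placeholder urn
        relationship_types=["IsPartOf"],
        direction=DataHubGraph.RelationshipDirection.INCOMING,
    ):
        assert isinstance(related, RelatedEntity)
        print(related.urn, related.relationship_type)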
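
gen_containers() is unchanged for callers; it now just derives the urn via container_key.as_urn() instead of calling make_container_urn() inline. A sketch of emitting a database container with a nested schema container, using made-up Postgres names (note that SchemaKey's db_schema field is populated through its "schema" alias):

    from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey, gen_containers

    db_key = DatabaseKey(platform="postgres", database="orders")
    schema_key = SchemaKey(platform="postgres", database="orders", schema="public")

    workunits = [
        *gen_containers(container_key=db_key, name="orders", sub_types=["Database"]),
        *gen_containers(
            container_key=schema_key,
            name="public",
            sub_types=["Schema"],
            parent_container_key=db_key,  # nests the schema under the database
        ),
    ]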
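
Why the rename is safe to roll out: the guid is a stable hash over the key's fields (via _stable_guid_from_dict in mcp_builder), so renaming the class changes no emitted container urns. A quick determinism check with made-up BigQuery identifiers:

    from datahub.emitter.mcp_builder import BigQueryDatasetKey

    key_a = BigQueryDatasetKey(
        platform="bigquery", project_id="my-project", dataset_id="my_dataset"
    )
    key_b = BigQueryDatasetKey(
        platform="bigquery", project_id="my-project", dataset_id="my_dataset"
    )

    # Equal fields -> equal guid -> equal urn, across processes and releases.
    assert key_a.guid() == key_b.guid()
    assert key_a.as_urn() == key_b.as_urn()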
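
The sql_utils helpers keep their signatures and simply annotate the renamed return type, so SQL-based sources need no call-site changes. A sketch with illustrative MySQL values, assuming the keyword parameter names shown in the hunks above:

    from datahub.ingestion.source.sql.sql_utils import gen_database_key, gen_schema_key

    database_key = gen_database_key(
        database="analytics", platform="mysql", platform_instance=None, env="PROD"
    )
    schema_key = gen_schema_key(
        db_name="analytics",
        schema="reporting",
        platform="mysql",
        platform_instance=None,
        env="PROD",
    )

    # Both now return ContainerKey instances, so .as_urn() is available downstream.
    print(database_key.as_urn(), schema_key.as_urn())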
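
Finally, because PlatformKey is now a plain alias of ContainerKey rather than a subclass, existing annotations and isinstance checks keep working under the deprecated name. A sanity check, with a locally defined key that mirrors the Tableau source's ProjectKey:

    from datahub.emitter.mcp_builder import ContainerKey, PlatformKey

    assert PlatformKey is ContainerKey  # same class object, old name

    class ProjectKey(ContainerKey):  # mirrors the key defined in tableau.py
        project_id: str

    key = ProjectKey(platform="tableau", project_id="placeholder-project-id")
    assert isinstance(key, PlatformKey)  # still true under the deprecated alias
    print(key.as_urn())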