Skip to content

Commit

Permalink
feat(sdk): easily generate container urns (#8198)
Browse files Browse the repository at this point in the history
Co-authored-by: Aseem Bansal <[email protected]>
  • Loading branch information
hsheth2 and anshbansal authored Jul 21, 2023
1 parent bec0182 commit 056d361
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 52 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ individually enable / disable desired field metrics.

### Deprecations

- #8198: In the Python SDK, the `PlatformKey` class has been renamed to `ContainerKey`.

### Other Notable Changes

## 0.10.4
Expand Down
21 changes: 19 additions & 2 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@
import time
from enum import Enum
from hashlib import md5
from typing import Any, List, Optional, Type, TypeVar, Union, cast, get_type_hints
from typing import (
TYPE_CHECKING,
Any,
List,
Optional,
Type,
TypeVar,
Union,
cast,
get_type_hints,
)

import typing_inspect

Expand Down Expand Up @@ -50,6 +60,9 @@
os.getenv("DATAHUB_DATASET_URN_TO_LOWER", "false") == "true"
)

if TYPE_CHECKING:
from datahub.emitter.mcp_builder import DatahubKey


# TODO: Delete this once lower-casing is the standard.
def set_dataset_urn_to_lower(value: bool) -> None:
Expand Down Expand Up @@ -132,7 +145,11 @@ def dataset_key_to_urn(key: DatasetKeyClass) -> str:
)


def make_container_urn(guid: Union[str, "DatahubKey"]) -> str:
    """Build a container urn from a guid string or a DatahubKey.

    Accepts either a precomputed guid string or a ``DatahubKey`` instance
    (e.g. a ``ContainerKey`` subclass), in which case the stable guid is
    derived via ``.guid()``.
    """
    if not isinstance(guid, str):
        # Deferred, local import to avoid a circular dependency between
        # mce_builder and mcp_builder; only needed when a key object is passed.
        from datahub.emitter.mcp_builder import DatahubKey

        if isinstance(guid, DatahubKey):
            guid = guid.guid()
    return f"urn:li:container:{guid}"


Expand Down
29 changes: 18 additions & 11 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ def guid(self) -> str:
return _stable_guid_from_dict(bag)


class PlatformKey(DatahubKey):
class ContainerKey(DatahubKey):
"""Base class for container guid keys. Most users should use one of the subclasses instead."""

platform: str
instance: Optional[str] = None

Expand All @@ -81,20 +83,27 @@ def guid_dict(self) -> Dict[str, str]:
def property_dict(self) -> Dict[str, str]:
return self.dict(by_alias=True, exclude_none=True)

def as_urn(self) -> str:
return make_container_urn(guid=self.guid())


# DEPRECATION: Keeping the `PlatformKey` name around for backwards compatibility.
PlatformKey = ContainerKey


class DatabaseKey(PlatformKey):
class DatabaseKey(ContainerKey):
database: str


class SchemaKey(DatabaseKey):
db_schema: str = Field(alias="schema")


class ProjectIdKey(PlatformKey):
class ProjectIdKey(ContainerKey):
project_id: str


class MetastoreKey(PlatformKey):
class MetastoreKey(ContainerKey):
metastore: str


Expand All @@ -110,11 +119,11 @@ class BigQueryDatasetKey(ProjectIdKey):
dataset_id: str


class FolderKey(PlatformKey):
class FolderKey(ContainerKey):
folder_abs_path: str


class BucketKey(PlatformKey):
class BucketKey(ContainerKey):
bucket_name: str


Expand All @@ -127,7 +136,7 @@ def default(self, obj: Any) -> Any:
return json.JSONEncoder.default(self, obj)


KeyType = TypeVar("KeyType", bound=PlatformKey)
KeyType = TypeVar("KeyType", bound=ContainerKey)


def add_domain_to_entity_wu(
Expand Down Expand Up @@ -188,7 +197,7 @@ def gen_containers(
container_key: KeyType,
name: str,
sub_types: List[str],
parent_container_key: Optional[PlatformKey] = None,
parent_container_key: Optional[ContainerKey] = None,
extra_properties: Optional[Dict[str, str]] = None,
domain_urn: Optional[str] = None,
description: Optional[str] = None,
Expand All @@ -199,9 +208,7 @@ def gen_containers(
created: Optional[int] = None,
last_modified: Optional[int] = None,
) -> Iterable[MetadataWorkUnit]:
container_urn = make_container_urn(
guid=container_key.guid(),
)
container_urn = container_key.as_urn()
yield MetadataChangeProposalWrapper(
entityUrn=f"{container_urn}",
# entityKeyAspect=ContainerKeyClass(guid=parent_container_key.guid()),
Expand Down
13 changes: 7 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ class RemovedStatusFilter(enum.Enum):
"""Search only soft-deleted entities."""


@dataclass
class RelatedEntity:
    """A single entity related to a queried entity, as returned by the
    relationships endpoint (one item of the response's ``entities`` list)."""

    # Urn of the related entity (response field ``urn``).
    urn: str
    # Relationship type name as returned by the graph API
    # (response field ``relationshipType``).
    relationship_type: str


def _graphql_entity_type(entity_type: str) -> str:
"""Convert the entity types into GraphQL "EntityType" enum values."""

Expand Down Expand Up @@ -769,11 +775,6 @@ class RelationshipDirection(str, enum.Enum):
INCOMING = "INCOMING"
OUTGOING = "OUTGOING"

@dataclass
class RelatedEntity:
urn: str
relationship_type: str

def get_related_entities(
self,
entity_urn: str,
Expand All @@ -794,7 +795,7 @@ def get_related_entities(
},
)
for related_entity in response.get("entities", []):
yield DataHubGraph.RelatedEntity(
yield RelatedEntity(
urn=related_entity["urn"],
relationship_type=related_entity["relationshipType"],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
set_dataset_urn_to_lower,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import BigQueryDatasetKey, PlatformKey, ProjectIdKey
from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
Expand Down Expand Up @@ -434,7 +434,7 @@ def get_dataplatform_instance_aspect(
entityUrn=dataset_urn, aspect=aspect
).as_workunit()

def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey:
def gen_dataset_key(self, db_name: str, schema: str) -> ContainerKey:
return BigQueryDatasetKey(
project_id=db_name,
dataset_id=schema,
Expand All @@ -443,7 +443,7 @@ def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey:
backcompat_env_as_instance=True,
)

def gen_project_id_key(self, database: str) -> PlatformKey:
def gen_project_id_key(self, database: str) -> ContainerKey:
return ProjectIdKey(
project_id=database,
platform=self.platform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from datahub.emitter.mcp_builder import (
BucketKey,
ContainerKey,
FolderKey,
KeyType,
PlatformKey,
add_dataset_to_container,
gen_containers,
)
Expand Down Expand Up @@ -45,7 +45,7 @@ def create_emit_containers(
container_key: KeyType,
name: str,
sub_types: List[str],
parent_container_key: Optional[PlatformKey] = None,
parent_container_key: Optional[ContainerKey] = None,
domain_urn: Optional[str] = None,
) -> Iterable[MetadataWorkUnit]:
if container_key.guid() not in self.processed_containers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import datahub.emitter.mce_builder as builder
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import PlatformKey, gen_containers
from datahub.emitter.mcp_builder import ContainerKey, gen_containers
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand Down Expand Up @@ -104,7 +104,7 @@ def __init__(
self.__reporter = reporter
self.__dataplatform_instance_resolver = dataplatform_instance_resolver
self.processed_datasets: Set[powerbi_data_classes.PowerBIDataset] = set()
self.workspace_key: PlatformKey
self.workspace_key: ContainerKey

@staticmethod
def urn_to_lowercase(value: str, flag: bool) -> str:
Expand Down Expand Up @@ -256,7 +256,6 @@ def to_datahub_schema(
self,
table: powerbi_data_classes.Table,
) -> SchemaMetadataClass:

fields = []
table_fields = (
[self.to_datahub_schema_field(column) for column in table.columns]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from datahub.emitter.mcp_builder import PlatformKey
from datahub.emitter.mcp_builder import ContainerKey
from datahub.metadata.schema_classes import (
BooleanTypeClass,
DateTypeClass,
Expand All @@ -28,11 +28,11 @@
}


class WorkspaceKey(PlatformKey):
class WorkspaceKey(ContainerKey):
workspace: str


class DatasetKey(PlatformKey):
class DatasetKey(ContainerKey):
dataset: str


Expand All @@ -57,7 +57,7 @@ def get_workspace_key(
platform_name: str,
platform_instance: Optional[str] = None,
workspace_id_as_urn_part: Optional[bool] = False,
) -> PlatformKey:
) -> ContainerKey:
return WorkspaceKey(
workspace=self.get_urn_part(workspace_id_as_urn_part),
platform=platform_name,
Expand Down Expand Up @@ -150,7 +150,7 @@ def __eq__(self, instance):
def __hash__(self):
return hash(self.__members())

def get_dataset_key(self, platform_name: str) -> PlatformKey:
def get_dataset_key(self, platform_name: str) -> ContainerKey:
return DatasetKey(
dataset=self.id,
platform=platform_name,
Expand Down
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlalchemy.engine.reflection import Inspector

from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.emitter.mcp_builder import PlatformKey
from datahub.emitter.mcp_builder import ContainerKey
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
Expand Down Expand Up @@ -211,7 +211,7 @@ def gen_schema_containers(
extra_properties=extra_properties,
)

def get_database_container_key(self, db_name: str, schema: str) -> PlatformKey:
def get_database_container_key(self, db_name: str, schema: str) -> ContainerKey:
# Because our overridden get_allowed_schemas method returns db_name as the schema name,
# the db_name and schema here will be the same. Hence, we just ignore the schema parameter.
# Based on community feedback, db_name only available if it is explicitly specified in the connection string.
Expand All @@ -232,7 +232,7 @@ def add_table_to_schema_container(
dataset_urn: str,
db_name: str,
schema: str,
schema_container_key: Optional[PlatformKey] = None,
schema_container_key: Optional[ContainerKey] = None,
) -> Iterable[MetadataWorkUnit]:
yield from add_table_to_schema_container(
dataset_urn=dataset_urn,
Expand Down
14 changes: 7 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
ContainerKey,
DatabaseKey,
PlatformKey,
SchemaKey,
add_dataset_to_container,
add_domain_to_entity_wu,
Expand All @@ -28,7 +28,7 @@ def gen_schema_key(
platform: str,
platform_instance: Optional[str],
env: Optional[str],
) -> PlatformKey:
) -> ContainerKey:
return SchemaKey(
database=db_name,
schema=schema,
Expand All @@ -41,7 +41,7 @@ def gen_schema_key(

def gen_database_key(
database: str, platform: str, platform_instance: Optional[str], env: Optional[str]
) -> PlatformKey:
) -> ContainerKey:
return DatabaseKey(
database=database,
platform=platform,
Expand All @@ -55,8 +55,8 @@ def gen_schema_container(
schema: str,
database: str,
sub_types: List[str],
database_container_key: PlatformKey,
schema_container_key: PlatformKey,
database_container_key: ContainerKey,
schema_container_key: ContainerKey,
domain_registry: Optional[DomainRegistry] = None,
domain_config: Optional[Dict[str, AllowDenyPattern]] = None,
name: Optional[str] = None,
Expand Down Expand Up @@ -113,7 +113,7 @@ def gen_domain_urn(

def gen_database_container(
database: str,
database_container_key: PlatformKey,
database_container_key: ContainerKey,
sub_types: List[str],
domain_config: Optional[Dict[str, AllowDenyPattern]] = None,
domain_registry: Optional[DomainRegistry] = None,
Expand Down Expand Up @@ -152,7 +152,7 @@ def gen_database_container(

def add_table_to_schema_container(
dataset_urn: str,
parent_container_key: PlatformKey,
parent_container_key: ContainerKey,
) -> Iterable[MetadataWorkUnit]:
yield from add_dataset_to_container(
container_key=parent_container_key,
Expand Down
Loading

0 comments on commit 056d361

Please sign in to comment.