Skip to content

Commit

Permalink
(ingestion) bug fix: emit platform instance aspect for dataset in Dat…
Browse files Browse the repository at this point in the history
…abricks ingestion (#8671)
  • Loading branch information
jinlintt authored Aug 28, 2023
1 parent 3acd25b commit 437b787
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class UnityCatalogSourceConfig(
description="Name of the workspace. Default to deployment name present in workspace_url",
)

# Opt-in flag (defaults to False) controlling whether a data platform
# instance aspect is emitted for each dataset during ingestion.
ingest_data_platform_instance_aspect: Optional[bool] = pydantic.Field(
default=False,
description="Option to enable/disable ingestion of the data platform instance aspect. The default data platform instance id for a dataset is workspace_name",
)

# Passes the old `only_ingest_assigned_metastore` option name to
# pydantic_removed_field. NOTE(review): presumably this lets configs that
# still set the removed option be handled gracefully -- confirm against the
# helper's implementation.
_only_ingest_assigned_metastore_removed = pydantic_removed_field(
"only_ingest_assigned_metastore"
)
Expand Down
17 changes: 17 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_schema_field_urn,
Expand Down Expand Up @@ -68,6 +69,7 @@
ViewProperties,
)
from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
DomainsClass,
Expand Down Expand Up @@ -278,6 +280,7 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
operation = self._create_table_operation_aspect(table)
domain = self._get_domain_aspect(dataset_name=table.ref.qualified_table_name)
ownership = self._create_table_ownership_aspect(table)
data_platform_instance = self._create_data_platform_instance_aspect(table)

lineage: Optional[UpstreamLineageClass] = None
if self.config.include_column_lineage:
Expand All @@ -299,6 +302,7 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
operation,
domain,
ownership,
data_platform_instance,
lineage,
],
)
Expand Down Expand Up @@ -558,6 +562,19 @@ def _create_table_ownership_aspect(self, table: Table) -> Optional[OwnershipClas
)
return None

def _create_data_platform_instance_aspect(
    self, table: Table
) -> Optional[DataPlatformInstanceClass]:
    """Build the data platform instance aspect, if the config enables it.

    Args:
        table: The table being processed. It is accepted for signature
            parity with the other aspect builders but is not read here.

    Returns:
        A ``DataPlatformInstanceClass`` whose platform and instance URNs are
        derived from ``self.platform`` and ``self.platform_instance_name``,
        or ``None`` when ``ingest_data_platform_instance_aspect`` is falsy.
    """
    # The aspect is opt-in: bail out early unless the flag is set.
    if not self.config.ingest_data_platform_instance_aspect:
        return None

    platform_urn = make_data_platform_urn(self.platform)
    instance_urn = make_dataplatform_instance_urn(
        self.platform, self.platform_instance_name
    )
    return DataPlatformInstanceClass(platform=platform_urn, instance=instance_urn)

def _create_table_sub_type_aspect(self, table: Table) -> SubTypesClass:
return SubTypesClass(
typeNames=[DatasetSubTypes.VIEW if table.is_view else DatasetSubTypes.TABLE]
Expand Down

0 comments on commit 437b787

Please sign in to comment.