Skip to content

Commit

Permalink
feat(ingest/unity-catalog): Support external S3 lineage (#9025)
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored Oct 17, 2023
1 parent 6366b63 commit 9fec602
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 5 deletions.
11 changes: 8 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,26 @@ def get_bucket_relative_path(s3_uri: str) -> str:
return "/".join(strip_s3_prefix(s3_uri).split("/")[1:])


def make_s3_urn(s3_uri: str, env: str, remove_extension: bool = True) -> str:
    """Build a dataset URN on the ``s3`` data platform for ``s3_uri``.

    The URI is first passed through ``strip_s3_prefix`` and a single trailing
    ``/`` is dropped from the result.

    Args:
        s3_uri: The S3 URI to convert.
        env: Environment string placed in the last slot of the URN.
        remove_extension: When true (the default) and the name carries a file
            extension, the extension is folded into the name with an
            underscore, e.g. ``bucket/key.csv`` becomes ``bucket/key_csv``.
            When false the name is used as-is.

    Returns:
        A string of the form
        ``urn:li:dataset:(urn:li:dataPlatform:s3,<name>,<env>)``.
    """
    s3_name = strip_s3_prefix(s3_uri)
    if s3_name.endswith("/"):
        s3_name = s3_name[:-1]

    dataset_name = s3_name
    if remove_extension:
        stem, suffix = os.path.splitext(s3_name)
        if suffix:
            # `suffix` includes the leading dot; drop it before joining.
            dataset_name = f"{stem}_{suffix[1:]}"

    return f"urn:li:dataset:(urn:li:dataPlatform:s3,{dataset_name},{env})"


def make_s3_urn_for_lineage(s3_uri: str, env: str) -> str:
    """Return the S3 dataset URN for ``s3_uri`` with its file extension kept.

    Delegates to ``make_s3_urn`` with ``remove_extension=False``.
    """
    # Ideally every S3 URN would be built this way; `make_s3_urn` keeps its
    # extension-folding default because changing it for glue, sagemaker, and
    # athena did not feel safe.
    return make_s3_urn(s3_uri=s3_uri, env=env, remove_extension=False)


def get_bucket_name(s3_uri: str) -> str:
if not is_s3_uri(s3_uri):
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.snowflake.constants import (
LINEAGE_PERMISSION_ERROR,
SnowflakeEdition,
Expand Down Expand Up @@ -652,7 +652,9 @@ def get_external_upstreams(self, external_lineage: Set[str]) -> List[UpstreamCla
# For now, populate only for S3
if external_lineage_entry.startswith("s3://"):
external_upstream_table = UpstreamClass(
dataset=make_s3_urn(external_lineage_entry, self.config.env),
dataset=make_s3_urn_for_lineage(
external_lineage_entry, self.config.env
),
type=DatasetLineageTypeClass.COPY,
)
external_upstreams.append(external_upstream_table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ class UnityCatalogSourceConfig(
description="Option to enable/disable lineage generation.",
)

include_external_lineage: bool = pydantic.Field(
default=True,
description=(
"Option to enable/disable lineage generation for external tables."
" Only external S3 tables are supported at the moment."
),
)

include_notebooks: bool = pydantic.Field(
default=False,
description="Ingest notebooks, represented as DataHub datasets.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
ALLOWED_STATEMENT_TYPES,
Catalog,
Column,
ExternalTableReference,
Metastore,
Notebook,
Query,
Expand Down Expand Up @@ -248,6 +249,13 @@ def table_lineage(self, table: Table, include_entity_lineage: bool) -> None:
)
if table_ref:
table.upstreams[table_ref] = {}
elif "fileInfo" in item:
external_ref = ExternalTableReference.create_from_lineage(
item["fileInfo"]
)
if external_ref:
table.external_upstreams.add(external_ref)

for notebook in item.get("notebookInfos") or []:
table.upstream_notebooks.add(notebook["notebook_id"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CatalogType,
ColumnTypeName,
DataSourceFormat,
SecurableType,
TableType,
)
from databricks.sdk.service.sql import QueryStatementType
Expand Down Expand Up @@ -176,6 +177,35 @@ def external_path(self) -> str:
return f"{self.catalog}/{self.schema}/{self.table}"


@dataclass(frozen=True, order=True)
class ExternalTableReference:
    """Immutable, hashable reference to an external file upstream of a table.

    Instances are built from a lineage payload dict via
    :meth:`create_from_lineage`.
    """

    # Path of the external location; the only required payload key.
    path: str
    # Whether the caller may see this upstream; True unless the payload
    # explicitly says otherwise.
    has_permission: bool
    # Value of the payload's "securable_name" key, if any.
    name: Optional[str]
    # Parsed "securable_type"; None when missing or not a known SecurableType.
    type: Optional[SecurableType]
    # Value of the payload's "storage_location" key, if any.
    storage_location: Optional[str]

    @classmethod
    def create_from_lineage(cls, d: dict) -> Optional["ExternalTableReference"]:
        """Create a reference from a lineage payload dict.

        Args:
            d: Payload dict. ``"path"`` is required; ``"has_permission"``,
                ``"securable_name"``, ``"securable_type"`` and
                ``"storage_location"`` are optional.

        Returns:
            The new reference, or ``None`` (after logging a warning) if the
            payload could not be converted, e.g. because ``"path"`` is missing.
        """
        try:
            securable_type: Optional[SecurableType]
            try:
                # `or ""` also covers a key that is present but null, which
                # would otherwise fail on `.lower()` and discard the reference.
                securable_type = SecurableType(
                    (d.get("securable_type") or "").lower()
                )
            except ValueError:
                securable_type = None

            # Default to True only when the flag is absent or null. The former
            # `d.get(...) or True` evaluated to True even for an explicit False.
            raw_permission = d.get("has_permission")
            has_permission = True if raw_permission is None else bool(raw_permission)

            return cls(
                path=d["path"],
                has_permission=has_permission,
                name=d.get("securable_name"),
                type=securable_type,
                storage_location=d.get("storage_location"),
            )
        except Exception as e:
            # Best-effort parsing: a malformed entry is logged and skipped
            # rather than aborting the caller.
            logger.warning(f"Failed to create ExternalTableReference from {d}: {e}")
            return None


@dataclass
class Table(CommonProperty):
schema: Schema
Expand All @@ -193,6 +223,7 @@ class Table(CommonProperty):
view_definition: Optional[str]
properties: Dict[str, str]
upstreams: Dict[TableReference, Dict[str, List[str]]] = field(default_factory=dict)
external_upstreams: Set[ExternalTableReference] = field(default_factory=set)
upstream_notebooks: Set[NotebookId] = field(default_factory=set)
downstream_notebooks: Set[NotebookId] = field(default_factory=set)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook")

num_column_lineage_skipped_column_count: int = 0
num_external_upstreams_lacking_permissions: int = 0
num_external_upstreams_unsupported: int = 0

num_queries: int = 0
num_queries_dropped_parse_failure: int = 0
Expand Down
23 changes: 23 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
Expand Down Expand Up @@ -455,6 +456,28 @@ def _generate_lineage_aspect(
)
)

if self.config.include_external_lineage:
for external_ref in table.external_upstreams:
if not external_ref.has_permission or not external_ref.path:
self.report.num_external_upstreams_lacking_permissions += 1
logger.warning(
f"Lacking permissions for external file upstream on {table.ref}"
)
elif external_ref.path.startswith("s3://"):
upstreams.append(
UpstreamClass(
dataset=make_s3_urn_for_lineage(
external_ref.path, self.config.env
),
type=DatasetLineageTypeClass.COPY,
)
)
else:
self.report.num_external_upstreams_unsupported += 1
logger.warning(
f"Unsupported external file upstream on {table.ref}: {external_ref.path}"
)

if upstreams:
return UpstreamLineageClass(
upstreams=upstreams,
Expand Down

0 comments on commit 9fec602

Please sign in to comment.