Skip to content

Commit

Permalink
feat(ingest): refactor + simplify incremental lineage helper (#8976)
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Oct 10, 2023
1 parent 8d175ef commit 57f855e
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 145 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import copy
from typing import Dict, Iterable, Optional

from datahub.emitter.mce_builder import datahub_guid, set_aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
FineGrainedLineageClass,
MetadataChangeEventClass,
SystemMetadataClass,
UpstreamClass,
UpstreamLineageClass,
)
from datahub.specific.dataset import DatasetPatchBuilder


def _convert_upstream_lineage_to_patch(
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
patch_builder = DatasetPatchBuilder(urn, system_metadata)
for upstream in aspect.upstreams:
patch_builder.add_upstream_lineage(upstream)
mcp = next(iter(patch_builder.build()))
return MetadataWorkUnit(id=f"{urn}-upstreamLineage", mcp_raw=mcp)


def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str:
return datahub_guid(
{
"upstreams": sorted(fine_upstream.upstreams or []),
"downstreams": sorted(fine_upstream.downstreams or []),
"transformOperation": fine_upstream.transformOperation,
}
)


def _merge_upstream_lineage(
new_aspect: UpstreamLineageClass, gms_aspect: UpstreamLineageClass
) -> UpstreamLineageClass:
merged_aspect = copy.deepcopy(gms_aspect)

upstreams_map: Dict[str, UpstreamClass] = {
upstream.dataset: upstream for upstream in merged_aspect.upstreams
}

upstreams_updated = False
fine_upstreams_updated = False

for table_upstream in new_aspect.upstreams:
if table_upstream.dataset not in upstreams_map or (
table_upstream.auditStamp.time
> upstreams_map[table_upstream.dataset].auditStamp.time
):
upstreams_map[table_upstream.dataset] = table_upstream
upstreams_updated = True

if upstreams_updated:
merged_aspect.upstreams = list(upstreams_map.values())

if new_aspect.fineGrainedLineages and merged_aspect.fineGrainedLineages:
fine_upstreams_map: Dict[str, FineGrainedLineageClass] = {
get_fine_grained_lineage_key(fine_upstream): fine_upstream
for fine_upstream in merged_aspect.fineGrainedLineages
}
for column_upstream in new_aspect.fineGrainedLineages:
column_upstream_key = get_fine_grained_lineage_key(column_upstream)

if column_upstream_key not in fine_upstreams_map or (
column_upstream.confidenceScore
> fine_upstreams_map[column_upstream_key].confidenceScore
):
fine_upstreams_map[column_upstream_key] = column_upstream
fine_upstreams_updated = True

if fine_upstreams_updated:
merged_aspect.fineGrainedLineages = list(fine_upstreams_map.values())
else:
merged_aspect.fineGrainedLineages = (
new_aspect.fineGrainedLineages or gms_aspect.fineGrainedLineages
)

return merged_aspect


def _lineage_wu_via_read_modify_write(
graph: Optional[DataHubGraph],
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
if graph is None:
raise ValueError(
"Failed to handle incremental lineage, DataHubGraph is missing. "
"Use `datahub-rest` sink OR provide `datahub-api` config in recipe. "
)
gms_aspect = graph.get_aspect(urn, UpstreamLineageClass)
if gms_aspect:
new_aspect = _merge_upstream_lineage(aspect, gms_aspect)
else:
new_aspect = aspect

return MetadataChangeProposalWrapper(
entityUrn=urn, aspect=new_aspect, systemMetadata=system_metadata
).as_workunit()


def auto_incremental_lineage(
graph: Optional[DataHubGraph],
incremental_lineage: bool,
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
if not incremental_lineage:
yield from stream
return # early exit

for wu in stream:
lineage_aspect: Optional[UpstreamLineageClass] = wu.get_aspect_of_type(
UpstreamLineageClass
)
urn = wu.get_urn()

if lineage_aspect:
if isinstance(wu.metadata, MetadataChangeEventClass):
set_aspect(
wu.metadata, None, UpstreamLineageClass
) # we'll emit upstreamLineage separately below
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
) if lineage_aspect.fineGrainedLineages else _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
else:
yield wu
138 changes: 1 addition & 137 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import copy
import logging
from datetime import datetime, timezone
from typing import (
Expand All @@ -16,32 +15,22 @@
)

from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mce_builder import (
datahub_guid,
make_dataplatform_instance_urn,
set_aspect,
)
from datahub.emitter.mce_builder import make_dataplatform_instance_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsClass,
BrowsePathsV2Class,
ChangeTypeClass,
ContainerClass,
DatasetUsageStatisticsClass,
FineGrainedLineageClass,
MetadataChangeEventClass,
MetadataChangeProposalClass,
StatusClass,
SystemMetadataClass,
TagKeyClass,
TimeWindowSizeClass,
UpstreamClass,
UpstreamLineageClass,
)
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.telemetry import telemetry
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.tag_urn import TagUrn
Expand Down Expand Up @@ -377,128 +366,3 @@ def _prepend_platform_instance(
return [BrowsePathEntryClass(id=urn, urn=urn)] + entries

return entries


def auto_incremental_lineage(
graph: Optional[DataHubGraph],
incremental_lineage: bool,
include_column_level_lineage: bool,
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
if not incremental_lineage:
yield from stream
return # early exit

for wu in stream:
lineage_aspect: Optional[UpstreamLineageClass] = wu.get_aspect_of_type(
UpstreamLineageClass
)
urn = wu.get_urn()

if lineage_aspect:
if isinstance(wu.metadata, MetadataChangeEventClass):
set_aspect(
wu.metadata, None, UpstreamLineageClass
) # we'll emit upstreamLineage separately below
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
) if include_column_level_lineage else _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
else:
yield wu


def _convert_upstream_lineage_to_patch(
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
patch_builder = DatasetPatchBuilder(urn, system_metadata)
for upstream in aspect.upstreams:
patch_builder.add_upstream_lineage(upstream)
mcp = next(iter(patch_builder.build()))
return MetadataWorkUnit(id=f"{urn}-upstreamLineage", mcp_raw=mcp)


def _lineage_wu_via_read_modify_write(
graph: Optional[DataHubGraph],
urn: str,
aspect: UpstreamLineageClass,
system_metadata: Optional[SystemMetadataClass],
) -> MetadataWorkUnit:
if graph is None:
raise ValueError(
"Failed to handle incremental lineage, DataHubGraph is missing. "
"Use `datahub-rest` sink OR provide `datahub-api` config in recipe. "
)
gms_aspect = graph.get_aspect(urn, UpstreamLineageClass)
if gms_aspect:
new_aspect = _merge_upstream_lineage(aspect, gms_aspect)
else:
new_aspect = aspect

return MetadataChangeProposalWrapper(
entityUrn=urn, aspect=new_aspect, systemMetadata=system_metadata
).as_workunit()


def _merge_upstream_lineage(
new_aspect: UpstreamLineageClass, gms_aspect: UpstreamLineageClass
) -> UpstreamLineageClass:
merged_aspect = copy.deepcopy(gms_aspect)

upstreams_map: Dict[str, UpstreamClass] = {
upstream.dataset: upstream for upstream in merged_aspect.upstreams
}

upstreams_updated = False
fine_upstreams_updated = False

for table_upstream in new_aspect.upstreams:
if table_upstream.dataset not in upstreams_map or (
table_upstream.auditStamp.time
> upstreams_map[table_upstream.dataset].auditStamp.time
):
upstreams_map[table_upstream.dataset] = table_upstream
upstreams_updated = True

if upstreams_updated:
merged_aspect.upstreams = list(upstreams_map.values())

if new_aspect.fineGrainedLineages and merged_aspect.fineGrainedLineages:
fine_upstreams_map: Dict[str, FineGrainedLineageClass] = {
get_fine_grained_lineage_key(fine_upstream): fine_upstream
for fine_upstream in merged_aspect.fineGrainedLineages
}
for column_upstream in new_aspect.fineGrainedLineages:
column_upstream_key = get_fine_grained_lineage_key(column_upstream)

if column_upstream_key not in fine_upstreams_map or (
column_upstream.confidenceScore
> fine_upstreams_map[column_upstream_key].confidenceScore
):
fine_upstreams_map[column_upstream_key] = column_upstream
fine_upstreams_updated = True

if fine_upstreams_updated:
merged_aspect.fineGrainedLineages = list(fine_upstreams_map.values())
else:
merged_aspect.fineGrainedLineages = (
new_aspect.fineGrainedLineages or gms_aspect.fineGrainedLineages
)

return merged_aspect


def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str:
return datahub_guid(
{
"upstreams": sorted(fine_upstream.upstreams or []),
"downstreams": sorted(fine_upstream.downstreams or []),
"transformOperation": fine_upstream.transformOperation,
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.source import (
CapabilityReport,
MetadataWorkUnitProcessor,
Expand All @@ -36,7 +37,6 @@
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.api.source_helpers import auto_incremental_lineage
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.glossary.classification_mixin import ClassificationHandler
from datahub.ingestion.source.common.subtypes import (
Expand Down Expand Up @@ -517,8 +517,6 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
auto_incremental_lineage,
self.ctx.graph,
self.config.incremental_lineage,
self.config.include_column_lineage
or self.config.include_view_column_lineage,
),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_incremental_lineage
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.sink.file import write_metadata_file
from tests.test_helpers import mce_helpers
Expand Down Expand Up @@ -88,7 +88,6 @@ def test_incremental_table_lineage(tmp_path, pytestconfig):
processed_wus = auto_incremental_lineage(
graph=None,
incremental_lineage=True,
include_column_level_lineage=False,
stream=[
MetadataChangeProposalWrapper(
entityUrn=urn, aspect=aspect, systemMetadata=system_metadata
Expand Down Expand Up @@ -146,7 +145,6 @@ def test_incremental_column_level_lineage(
processed_wus = auto_incremental_lineage(
graph=mock_graph,
incremental_lineage=True,
include_column_level_lineage=True,
stream=[
MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
Expand Down Expand Up @@ -184,7 +182,6 @@ def test_incremental_column_lineage_less_upstreams_in_gms_aspect(
processed_wus = auto_incremental_lineage(
graph=mock_graph,
incremental_lineage=True,
include_column_level_lineage=True,
stream=[
MetadataChangeProposalWrapper(
entityUrn=urn, aspect=aspect, systemMetadata=system_metadata
Expand Down Expand Up @@ -227,7 +224,6 @@ def test_incremental_column_lineage_more_upstreams_in_gms_aspect(
processed_wus = auto_incremental_lineage(
graph=mock_graph,
incremental_lineage=True,
include_column_level_lineage=True,
stream=[
MetadataChangeProposalWrapper(
entityUrn=urn, aspect=aspect, systemMetadata=system_metadata
Expand Down

0 comments on commit 57f855e

Please sign in to comment.