diff --git a/metadata-ingestion/src/datahub/ingestion/api/incremental_lineage_helper.py b/metadata-ingestion/src/datahub/ingestion/api/incremental_lineage_helper.py new file mode 100644 index 0000000000000..9478c5cf7efa2 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/api/incremental_lineage_helper.py @@ -0,0 +1,139 @@ +import copy +from typing import Dict, Iterable, Optional + +from datahub.emitter.mce_builder import datahub_guid, set_aspect +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.schema_classes import ( + FineGrainedLineageClass, + MetadataChangeEventClass, + SystemMetadataClass, + UpstreamClass, + UpstreamLineageClass, +) +from datahub.specific.dataset import DatasetPatchBuilder + + +def _convert_upstream_lineage_to_patch( + urn: str, + aspect: UpstreamLineageClass, + system_metadata: Optional[SystemMetadataClass], +) -> MetadataWorkUnit: + patch_builder = DatasetPatchBuilder(urn, system_metadata) + for upstream in aspect.upstreams: + patch_builder.add_upstream_lineage(upstream) + mcp = next(iter(patch_builder.build())) + return MetadataWorkUnit(id=f"{urn}-upstreamLineage", mcp_raw=mcp) + + +def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str: + return datahub_guid( + { + "upstreams": sorted(fine_upstream.upstreams or []), + "downstreams": sorted(fine_upstream.downstreams or []), + "transformOperation": fine_upstream.transformOperation, + } + ) + + +def _merge_upstream_lineage( + new_aspect: UpstreamLineageClass, gms_aspect: UpstreamLineageClass +) -> UpstreamLineageClass: + merged_aspect = copy.deepcopy(gms_aspect) + + upstreams_map: Dict[str, UpstreamClass] = { + upstream.dataset: upstream for upstream in merged_aspect.upstreams + } + + upstreams_updated = False + fine_upstreams_updated = False + + for table_upstream in new_aspect.upstreams: + if table_upstream.dataset not in upstreams_map or ( + table_upstream.auditStamp.time + > upstreams_map[table_upstream.dataset].auditStamp.time + ): + upstreams_map[table_upstream.dataset] = table_upstream + upstreams_updated = True + + if upstreams_updated: + merged_aspect.upstreams = list(upstreams_map.values()) + + if new_aspect.fineGrainedLineages and merged_aspect.fineGrainedLineages: + fine_upstreams_map: Dict[str, FineGrainedLineageClass] = { + get_fine_grained_lineage_key(fine_upstream): fine_upstream + for fine_upstream in merged_aspect.fineGrainedLineages + } + for column_upstream in new_aspect.fineGrainedLineages: + column_upstream_key = get_fine_grained_lineage_key(column_upstream) + + if column_upstream_key not in fine_upstreams_map or ( + column_upstream.confidenceScore + > fine_upstreams_map[column_upstream_key].confidenceScore + ): + fine_upstreams_map[column_upstream_key] = column_upstream + fine_upstreams_updated = True + + if fine_upstreams_updated: + merged_aspect.fineGrainedLineages = list(fine_upstreams_map.values()) + else: + merged_aspect.fineGrainedLineages = ( + new_aspect.fineGrainedLineages or gms_aspect.fineGrainedLineages + ) + + return merged_aspect + + +def _lineage_wu_via_read_modify_write( + graph: Optional[DataHubGraph], + urn: str, + aspect: UpstreamLineageClass, + system_metadata: Optional[SystemMetadataClass], +) -> MetadataWorkUnit: + if graph is None: + raise ValueError( + "Failed to handle incremental lineage, DataHubGraph is missing. " + "Use `datahub-rest` sink OR provide `datahub-api` config in recipe. " + ) + gms_aspect = graph.get_aspect(urn, UpstreamLineageClass) + if gms_aspect: + new_aspect = _merge_upstream_lineage(aspect, gms_aspect) + else: + new_aspect = aspect + + return MetadataChangeProposalWrapper( + entityUrn=urn, aspect=new_aspect, systemMetadata=system_metadata + ).as_workunit() + + +def auto_incremental_lineage( + graph: Optional[DataHubGraph], + incremental_lineage: bool, + stream: Iterable[MetadataWorkUnit], +) -> Iterable[MetadataWorkUnit]: + if not incremental_lineage: + yield from stream + return # early exit + + for wu in stream: + lineage_aspect: Optional[UpstreamLineageClass] = wu.get_aspect_of_type( + UpstreamLineageClass + ) + urn = wu.get_urn() + + if lineage_aspect: + if isinstance(wu.metadata, MetadataChangeEventClass): + set_aspect( + wu.metadata, None, UpstreamLineageClass + ) # we'll emit upstreamLineage separately below + if len(wu.metadata.proposedSnapshot.aspects) > 0: + yield wu + + yield _lineage_wu_via_read_modify_write( + graph, urn, lineage_aspect, wu.metadata.systemMetadata + ) if lineage_aspect.fineGrainedLineages else _convert_upstream_lineage_to_patch( + urn, lineage_aspect, wu.metadata.systemMetadata + ) + else: + yield wu diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py index 42f970e97c95f..7fc15cf829678 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py @@ -1,4 +1,3 @@ -import copy import logging from datetime import datetime, timezone from typing import ( @@ -16,14 +15,9 @@ ) from datahub.configuration.time_window_config import BaseTimeWindowConfig -from datahub.emitter.mce_builder import ( - datahub_guid, - make_dataplatform_instance_urn, - set_aspect, -) +from datahub.emitter.mce_builder import make_dataplatform_instance_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.graph.client import DataHubGraph from datahub.metadata.schema_classes import ( BrowsePathEntryClass, BrowsePathsClass, @@ -31,17 +25,12 @@ ChangeTypeClass, ContainerClass, DatasetUsageStatisticsClass, - FineGrainedLineageClass, MetadataChangeEventClass, MetadataChangeProposalClass, StatusClass, - SystemMetadataClass, TagKeyClass, TimeWindowSizeClass, - UpstreamClass, - UpstreamLineageClass, ) -from datahub.specific.dataset import DatasetPatchBuilder from datahub.telemetry import telemetry from datahub.utilities.urns.dataset_urn import DatasetUrn from datahub.utilities.urns.tag_urn import TagUrn @@ -377,128 +366,3 @@ def _prepend_platform_instance( return [BrowsePathEntryClass(id=urn, urn=urn)] + entries return entries - - -def auto_incremental_lineage( - graph: Optional[DataHubGraph], - incremental_lineage: bool, - include_column_level_lineage: bool, - stream: Iterable[MetadataWorkUnit], -) -> Iterable[MetadataWorkUnit]: - if not incremental_lineage: - yield from stream - return # early exit - - for wu in stream: - lineage_aspect: Optional[UpstreamLineageClass] = wu.get_aspect_of_type( - UpstreamLineageClass - ) - urn = wu.get_urn() - - if lineage_aspect: - if isinstance(wu.metadata, MetadataChangeEventClass): - set_aspect( - wu.metadata, None, UpstreamLineageClass - ) # we'll emit upstreamLineage separately below - if len(wu.metadata.proposedSnapshot.aspects) > 0: - yield wu - - yield _lineage_wu_via_read_modify_write( - graph, urn, lineage_aspect, wu.metadata.systemMetadata - ) if include_column_level_lineage else _convert_upstream_lineage_to_patch( - urn, lineage_aspect, wu.metadata.systemMetadata - ) - else: - yield wu - - -def _convert_upstream_lineage_to_patch( - urn: str, - aspect: UpstreamLineageClass, - system_metadata: Optional[SystemMetadataClass], -) -> MetadataWorkUnit: - patch_builder = DatasetPatchBuilder(urn, system_metadata) - for upstream in aspect.upstreams: - patch_builder.add_upstream_lineage(upstream) - mcp = next(iter(patch_builder.build())) - return MetadataWorkUnit(id=f"{urn}-upstreamLineage", mcp_raw=mcp) - - -def _lineage_wu_via_read_modify_write( - graph: Optional[DataHubGraph], - urn: str, - aspect: UpstreamLineageClass, - system_metadata: Optional[SystemMetadataClass], -) -> MetadataWorkUnit: - if graph is None: - raise ValueError( - "Failed to handle incremental lineage, DataHubGraph is missing. " - "Use `datahub-rest` sink OR provide `datahub-api` config in recipe. " - ) - gms_aspect = graph.get_aspect(urn, UpstreamLineageClass) - if gms_aspect: - new_aspect = _merge_upstream_lineage(aspect, gms_aspect) - else: - new_aspect = aspect - - return MetadataChangeProposalWrapper( - entityUrn=urn, aspect=new_aspect, systemMetadata=system_metadata - ).as_workunit() - - -def _merge_upstream_lineage( - new_aspect: UpstreamLineageClass, gms_aspect: UpstreamLineageClass -) -> UpstreamLineageClass: - merged_aspect = copy.deepcopy(gms_aspect) - - upstreams_map: Dict[str, UpstreamClass] = { - upstream.dataset: upstream for upstream in merged_aspect.upstreams - } - - upstreams_updated = False - fine_upstreams_updated = False - - for table_upstream in new_aspect.upstreams: - if table_upstream.dataset not in upstreams_map or ( - table_upstream.auditStamp.time - > upstreams_map[table_upstream.dataset].auditStamp.time - ): - upstreams_map[table_upstream.dataset] = table_upstream - upstreams_updated = True - - if upstreams_updated: - merged_aspect.upstreams = list(upstreams_map.values()) - - if new_aspect.fineGrainedLineages and merged_aspect.fineGrainedLineages: - fine_upstreams_map: Dict[str, FineGrainedLineageClass] = { - get_fine_grained_lineage_key(fine_upstream): fine_upstream - for fine_upstream in merged_aspect.fineGrainedLineages - } - for column_upstream in new_aspect.fineGrainedLineages: - column_upstream_key = get_fine_grained_lineage_key(column_upstream) - - if column_upstream_key not in fine_upstreams_map or ( - column_upstream.confidenceScore - > fine_upstreams_map[column_upstream_key].confidenceScore - ): - fine_upstreams_map[column_upstream_key] = column_upstream - fine_upstreams_updated = True - - if fine_upstreams_updated: - merged_aspect.fineGrainedLineages = list(fine_upstreams_map.values()) - else: - merged_aspect.fineGrainedLineages = ( - new_aspect.fineGrainedLineages or gms_aspect.fineGrainedLineages - ) - - return merged_aspect - - -def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str: - return datahub_guid( - { - "upstreams": sorted(fine_upstream.upstreams or []), - "downstreams": sorted(fine_upstream.downstreams or []), - "transformOperation": fine_upstream.transformOperation, - } - ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index e0848b5f9ab34..a5c07d9a3870c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -27,6 +27,7 @@ platform_name, support_status, ) +from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage from datahub.ingestion.api.source import ( CapabilityReport, MetadataWorkUnitProcessor, @@ -36,7 +37,6 @@ TestableSource, TestConnectionReport, ) -from datahub.ingestion.api.source_helpers import auto_incremental_lineage from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.glossary.classification_mixin import ClassificationHandler from datahub.ingestion.source.common.subtypes import ( @@ -517,8 +517,6 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: auto_incremental_lineage, self.ctx.graph, self.config.incremental_lineage, - self.config.include_column_lineage - or self.config.include_view_column_lineage, ), StaleEntityRemovalHandler.create( self, self.config, self.ctx diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py b/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py index 4078bda26c743..54a22d860285c 100644 --- a/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py +++ b/metadata-ingestion/tests/unit/api/source_helpers/test_incremental_lineage_helper.py @@ -6,7 +6,7 @@ import datahub.metadata.schema_classes as models from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.api.source_helpers import auto_incremental_lineage +from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.sink.file import write_metadata_file from tests.test_helpers import mce_helpers @@ -88,7 +88,6 @@ def test_incremental_table_lineage(tmp_path, pytestconfig): processed_wus = auto_incremental_lineage( graph=None, incremental_lineage=True, - include_column_level_lineage=False, stream=[ MetadataChangeProposalWrapper( entityUrn=urn, aspect=aspect, systemMetadata=system_metadata @@ -146,7 +145,6 @@ def test_incremental_column_level_lineage( processed_wus = auto_incremental_lineage( graph=mock_graph, incremental_lineage=True, - include_column_level_lineage=True, stream=[ MetadataChangeProposalWrapper( entityUrn=dataset_urn, @@ -184,7 +182,6 @@ def test_incremental_column_lineage_less_upstreams_in_gms_aspect( processed_wus = auto_incremental_lineage( graph=mock_graph, incremental_lineage=True, - include_column_level_lineage=True, stream=[ MetadataChangeProposalWrapper( entityUrn=urn, aspect=aspect, systemMetadata=system_metadata @@ -227,7 +224,6 @@ def test_incremental_column_lineage_more_upstreams_in_gms_aspect( processed_wus = auto_incremental_lineage( graph=mock_graph, incremental_lineage=True, - include_column_level_lineage=True, stream=[ MetadataChangeProposalWrapper( entityUrn=urn, aspect=aspect, systemMetadata=system_metadata