Skip to content

Commit

Permalink
fix(ingest/bigquery): Filter out fine grained lineage with no upstrea…
Browse files Browse the repository at this point in the history
…ms (#8758)
  • Loading branch information
asikowitz authored Aug 31, 2023
1 parent 21b2851 commit a4e7268
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def make_lineage_edges_from_parsing_result(
column_mapping=frozenset(
LineageEdgeColumnMapping(out_column=out_column, in_columns=in_columns)
for out_column, in_columns in column_mapping.items()
if in_columns
),
auditStamp=audit_stamp,
type=lineage_type,
Expand Down
50 changes: 44 additions & 6 deletions metadata-ingestion/tests/unit/test_bigquery_lineage.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import datetime
from typing import Dict, List, Set

import pytest

from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
BigQueryTableRef,
QueryEvent,
Expand All @@ -14,15 +16,17 @@
from datahub.utilities.sqlglot_lineage import SchemaResolver


def test_lineage_with_timestamps():
config = BigQueryV2Config()
report = BigQueryV2Report()
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report)
lineage_entries: List[QueryEvent] = [
@pytest.fixture
def lineage_entries() -> List[QueryEvent]:
return [
QueryEvent(
timestamp=datetime.datetime.now(tz=datetime.timezone.utc),
actor_email="[email protected]",
query="testQuery",
query="""
INSERT INTO `my_project.my_dataset.my_table`
SELECT first.a, second.b FROM `my_project.my_dataset.my_source_table1` first
LEFT JOIN `my_project.my_dataset.my_source_table2` second ON first.id = second.id
""",
statementType="SELECT",
project_id="proj_12344",
end_time=None,
Expand Down Expand Up @@ -73,6 +77,12 @@ def test_lineage_with_timestamps():
),
]


def test_lineage_with_timestamps(lineage_entries: List[QueryEvent]) -> None:
config = BigQueryV2Config()
report = BigQueryV2Report()
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report)

bq_table = BigQueryTableRef.from_string_name(
"projects/my_project/datasets/my_dataset/tables/my_table"
)
Expand All @@ -90,3 +100,31 @@ def test_lineage_with_timestamps():
)
assert upstream_lineage
assert len(upstream_lineage.upstreams) == 4


def test_column_level_lineage(lineage_entries: List[QueryEvent]) -> None:
config = BigQueryV2Config(extract_column_lineage=True, incremental_lineage=False)
report = BigQueryV2Report()
extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(config, report)

bq_table = BigQueryTableRef.from_string_name(
"projects/my_project/datasets/my_dataset/tables/my_table"
)

lineage_map: Dict[str, Set[LineageEdge]] = extractor._create_lineage_map(
lineage_entries[:1],
sql_parser_schema_resolver=SchemaResolver(platform="bigquery"),
)

upstream_lineage = extractor.get_lineage_for_table(
bq_table=bq_table,
bq_table_urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.my_dataset.my_table,PROD)",
lineage_metadata=lineage_map,
platform="bigquery",
)
assert upstream_lineage
assert len(upstream_lineage.upstreams) == 2
assert (
upstream_lineage.fineGrainedLineages
and len(upstream_lineage.fineGrainedLineages) == 2
)

0 comments on commit a4e7268

Please sign in to comment.