Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/snowflake): missing view downstream cll if platform instance is set #8966

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.snowflake.constants import (
LINEAGE_PERMISSION_ERROR,
SnowflakeEdition,
SnowflakeObjectDomain,
)
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
Expand Down Expand Up @@ -53,7 +53,6 @@
sqlglot_lineage,
)
from datahub.utilities.time import ts_millis_to_datetime
from datahub.utilities.urns.dataset_urn import DatasetUrn

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -196,19 +195,6 @@ def get_table_upstream_workunits(
f"Upstream lineage detected for {self.report.num_tables_with_upstreams} tables.",
)

def _gen_workunit_from_sql_parsing_result(
    self,
    dataset_identifier: str,
    result: SqlParsingResult,
) -> MetadataWorkUnit:
    """Build an upstream-lineage workunit for one view from its parsed SQL.

    Resolves the view's dataset URN, extracts table- and column-level
    upstreams from *result*, and wraps them into a lineage workunit.
    """
    downstream_urn = self.dataset_urn_builder(dataset_identifier)
    upstreams, fine_upstreams = self.get_upstreams_from_sql_parsing_result(
        downstream_urn, result
    )
    # Record one more successfully-parsed view in the ingestion report.
    self.report.num_views_with_upstreams += 1
    return self._create_upstream_lineage_workunit(
        dataset_identifier, upstreams, fine_upstreams
    )

def _gen_workunits_from_query_result(
self,
discovered_assets: Collection[str],
Expand Down Expand Up @@ -242,26 +228,39 @@ def get_view_upstream_workunits(
schema_resolver: SchemaResolver,
view_definitions: MutableMapping[str, str],
) -> Iterable[MetadataWorkUnit]:
views_processed = set()
views_failed_parsing = set()
if self.config.include_view_column_lineage:
with PerfTimer() as timer:
builder = SqlParsingBuilder(
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
)
for view_identifier, view_definition in view_definitions.items():
result = self._run_sql_parser(
view_identifier, view_definition, schema_resolver
)
if result:
views_processed.add(view_identifier)
yield self._gen_workunit_from_sql_parsing_result(
view_identifier, result
if result and result.out_tables:
self.report.num_views_with_upstreams += 1
# This does not yield any workunits but we use
# yield here to execute this method
yield from builder.process_sql_parsing_result(
result=result,
query=view_definition,
is_view_ddl=True,
)
else:
views_failed_parsing.add(view_identifier)

yield from builder.gen_workunits()
self.report.view_lineage_parse_secs = timer.elapsed_seconds()

with PerfTimer() as timer:
results = self._fetch_upstream_lineages_for_views()

if results:
yield from self._gen_workunits_from_query_result(
set(discovered_views) - views_processed,
views_failed_parsing,
results,
upstream_for_view=True,
)
Expand Down Expand Up @@ -349,39 +348,6 @@ def get_upstreams_from_query_result_row(

return upstreams, fine_upstreams

def get_upstreams_from_sql_parsing_result(
    self, downstream_table_urn: str, result: SqlParsingResult
) -> Tuple[List[UpstreamClass], List[FineGrainedLineage]]:
    """Translate a SQL parsing result into table- and column-level lineage.

    Returns a pair of (table-level upstreams, fine-grained column lineages)
    for the given downstream dataset URN.

    Note: the out_tables section of the parsing result is ignored.
    """
    # Table-level lineage: each distinct input table becomes a VIEW upstream.
    table_upstreams = [
        UpstreamClass(dataset=urn, type=DatasetLineageTypeClass.VIEW)
        for urn in set(result.in_tables)
    ]

    # Column-level lineage: map each downstream column to the set of
    # upstream columns that contribute to it.
    column_sources: Dict[str, Set[SnowflakeColumnId]] = defaultdict(set)
    for cll in result.column_lineage or []:
        downstream_col = cll.downstream.column
        for upstream_col in cll.upstreams:
            source_table = DatasetUrn.create_from_string(
                upstream_col.table
            ).get_dataset_name()
            column_sources[downstream_col].add(
                SnowflakeColumnId(
                    columnName=upstream_col.column,
                    objectName=source_table,
                    objectDomain=SnowflakeObjectDomain.VIEW.value,
                )
            )

    fine_upstreams = [
        self.build_finegrained_lineage(downstream_table_urn, col, sources)
        for col, sources in column_sources.items()
    ]
    # build_finegrained_lineage may yield falsy entries; drop them.
    return table_upstreams, [fg for fg in fine_upstreams if fg]

def _populate_external_lineage_map(self, discovered_tables: List[str]) -> None:
with PerfTimer() as timer:
self.report.num_external_table_edges_scanned = 0
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def default_query_results( # noqa: C901
"name": "VIEW_{}".format(view_idx),
"created_on": datetime(2021, 6, 8, 0, 0, 0, 0),
"comment": "Comment for View",
"text": None,
"text": f"create view view_{view_idx} as select * from table_{view_idx}",
}
for view_idx in range(1, num_views + 1)
]
Expand Down
Loading
Loading