Skip to content

Commit

Permalink
fix(ingest/snowflake): propagate table list from main to query extractor (#11222)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Aug 23, 2024
1 parent e0c13fd commit 1a09cb2
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ClassificationReportMixin:
class ClassificationSourceConfigMixin(ConfigModel):
classification: ClassificationConfig = Field(
default=ClassificationConfig(),
description="For details, refer [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
description="For details, refer to [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def _populate_external_lineage_from_copy_history(
def _process_external_lineage_result_row(
cls,
db_row: dict,
discovered_tables: Optional[List[str]],
discovered_tables: Optional[Collection[str]],
identifiers: SnowflakeIdentifierBuilder,
) -> Optional[KnownLineageMapping]:
# key is the down-stream table name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def __init__(
self.report = SnowflakeQueriesExtractorReport()
self.filters = filters
self.identifiers = identifiers
self.discovered_tables = discovered_tables
self.discovered_tables = set(discovered_tables) if discovered_tables else None

self._structured_report = structured_report

Expand Down Expand Up @@ -175,10 +175,24 @@ def local_temp_path(self) -> pathlib.Path:
return path

def is_temp_table(self, name: str) -> bool:
return any(
if any(
re.match(pattern, name, flags=re.IGNORECASE)
for pattern in self.config.temporary_tables_pattern
)
):
return True

# This is also a temp table if
# 1. this name would be allowed by the dataset patterns, and
# 2. we have a list of discovered tables, and
# 3. it's not in the discovered tables list
if (
self.filters.is_dataset_pattern_allowed(name, SnowflakeObjectDomain.TABLE)
and self.discovered_tables
and name not in self.discovered_tables
):
return True

return False

def is_allowed_table(self, name: str) -> bool:
if self.discovered_tables and name not in self.discovered_tables:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=schema_resolver,
discovered_tables=discovered_datasets,
)

# TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs
Expand Down

0 comments on commit 1a09cb2

Please sign in to comment.