From 1fe808de96fbc01b0f5ea5428395aec6ea9f3e16 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Thu, 17 Aug 2023 00:44:51 -0400 Subject: [PATCH 1/2] remove(ingest/snowflake): Remove legacy snowflake lineage --- .../source/snowflake/snowflake_config.py | 11 +- .../source/snowflake/snowflake_v2.py | 18 +- .../integration/snowflake/test_snowflake.py | 2 - .../snowflake/test_snowflake_failures.py | 1 - .../test_snowflake_failures_legacy_lineage.py | 291 ------------------ .../test_snowflake_legacy_lineage.py | 207 ------------- 6 files changed, 6 insertions(+), 524 deletions(-) delete mode 100644 metadata-ingestion/tests/integration/snowflake/test_snowflake_failures_legacy_lineage.py delete mode 100644 metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index e8e80e172a9ce..7699d89ce9ac2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -91,13 +91,8 @@ class SnowflakeV2Config( description="Whether `schema_pattern` is matched against fully qualified schema name `.`.", ) - use_legacy_lineage_method: bool = Field( - default=False, - description=( - "Whether to use the legacy lineage computation method. " - "By default, uses new optimised lineage extraction method that requires less ingestion process memory. " - "Table-to-view and view-to-view column-level lineage are not supported with the legacy method." - ), + _use_legacy_lineage_method_removed = pydantic_removed_field( + "use_legacy_lineage_method" ) validate_upstreams_against_patterns: bool = Field( @@ -113,7 +108,7 @@ class SnowflakeV2Config( # This is required since access_history table does not capture whether the table was temporary table. temporary_tables_pattern: List[str] = Field( default=DEFAULT_TABLES_DENY_LIST, - description="[Advanced] Regex patterns for temporary tables to filter in lineage ingestion. Specify regex to match the entire table name in database.schema.table format. Defaults are to set in such a way to ignore the temporary staging tables created by known ETL tools. Not used if `use_legacy_lineage_method=True`", + description="[Advanced] Regex patterns for temporary tables to filter in lineage ingestion. Specify regex to match the entire table name in database.schema.table format. 
Defaults are to set in such a way to ignore the temporary staging tables created by known ETL tools.", ) rename_upstreams_deny_pattern_to_temporary_table_pattern = pydantic_renamed_field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 7dd51d5b20e8e..40c4d32525a51 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -51,9 +51,6 @@ SnowflakeV2Config, TagOption, ) -from datahub.ingestion.source.snowflake.snowflake_lineage_legacy import ( - SnowflakeLineageExtractor as SnowflakeLineageLegacyExtractor, -) from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import ( SnowflakeLineageExtractor, ) @@ -240,19 +237,10 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config): # For database, schema, tables, views, etc self.data_dictionary = SnowflakeDataDictionary() - self.lineage_extractor: Union[ - SnowflakeLineageExtractor, SnowflakeLineageLegacyExtractor - ] if config.include_table_lineage: - # For lineage - if self.config.use_legacy_lineage_method: - self.lineage_extractor = SnowflakeLineageLegacyExtractor( - config, self.report, dataset_urn_builder=self.gen_dataset_urn - ) - else: - self.lineage_extractor = SnowflakeLineageExtractor( - config, self.report, dataset_urn_builder=self.gen_dataset_urn - ) + self.lineage_extractor = SnowflakeLineageExtractor( + config, self.report, dataset_urn_builder=self.gen_dataset_urn + ) if config.include_usage_stats or config.include_operational_stats: self.usage_extractor = SnowflakeUsageExtractor( diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py index 53b2bcb236cd9..6135b0b3b3274 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py @@ -121,7 +121,6 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph): include_table_lineage=True, include_view_lineage=True, include_usage_stats=True, - use_legacy_lineage_method=False, validate_upstreams_against_patterns=False, include_operational_stats=True, email_as_user_identifier=True, @@ -213,7 +212,6 @@ def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_ include_column_lineage=False, include_views=False, include_view_lineage=False, - use_legacy_lineage_method=False, include_usage_stats=False, include_operational_stats=False, start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace( diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py index 73a261bb3cb6e..4963e71ae4d96 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py @@ -55,7 +55,6 @@ def snowflake_pipeline_config(tmp_path): schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]), include_view_lineage=False, include_usage_stats=False, - use_legacy_lineage_method=False, start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace( tzinfo=timezone.utc ), diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures_legacy_lineage.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures_legacy_lineage.py deleted file 
mode 100644 index a5993793e574d..0000000000000 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures_legacy_lineage.py +++ /dev/null @@ -1,291 +0,0 @@ -from datetime import datetime, timezone -from typing import cast -from unittest import mock - -from freezegun import freeze_time -from pytest import fixture - -from datahub.configuration.common import AllowDenyPattern, DynamicTypedConfig -from datahub.ingestion.run.pipeline import Pipeline -from datahub.ingestion.run.pipeline_config import PipelineConfig, SourceConfig -from datahub.ingestion.source.snowflake import snowflake_query -from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config -from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery -from tests.integration.snowflake.common import ( - FROZEN_TIME, - NUM_TABLES, - default_query_results, -) - - -def query_permission_error_override(fn, override_for_query, error_msg): - def my_function(query): - if query in override_for_query: - raise Exception(error_msg) - else: - return fn(query) - - return my_function - - -def query_permission_response_override(fn, override_for_query, response): - def my_function(query): - if query in override_for_query: - return response - else: - return fn(query) - - return my_function - - -@fixture(scope="function") -def snowflake_pipeline_legacy_lineage_config(tmp_path): - output_file = tmp_path / "snowflake_test_events_permission_error.json" - config = PipelineConfig( - source=SourceConfig( - type="snowflake", - config=SnowflakeV2Config( - account_id="ABC12345.ap-south-1.aws", - username="TST_USR", - password="TST_PWD", - role="TEST_ROLE", - warehouse="TEST_WAREHOUSE", - include_technical_schema=True, - match_fully_qualified_names=True, - schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]), - include_view_lineage=False, - include_usage_stats=False, - use_legacy_lineage_method=True, - start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace( - tzinfo=timezone.utc - ), - end_time=datetime(2022, 6, 7, 7, 17, 0, 0).replace(tzinfo=timezone.utc), - ), - ), - sink=DynamicTypedConfig(type="file", config={"filename": str(output_file)}), - ) - return config - - -@freeze_time(FROZEN_TIME) -def test_snowflake_missing_role_access_causes_pipeline_failure( - pytestconfig, - snowflake_pipeline_legacy_lineage_config, -): - with mock.patch("snowflake.connector.connect") as mock_connect: - # Snowflake connection fails role not granted error - mock_connect.side_effect = Exception( - "250001 (08001): Failed to connect to DB: abc12345.ap-south-1.snowflakecomputing.com:443. Role 'TEST_ROLE' specified in the connect string is not granted to this user. Contact your local system administrator, or attempt to login with another role, e.g. 
PUBLIC" - ) - - pipeline = Pipeline(snowflake_pipeline_legacy_lineage_config) - pipeline.run() - assert "permission-error" in pipeline.source.get_report().failures.keys() - - -@freeze_time(FROZEN_TIME) -def test_snowflake_missing_warehouse_access_causes_pipeline_failure( - pytestconfig, - snowflake_pipeline_legacy_lineage_config, -): - with mock.patch("snowflake.connector.connect") as mock_connect: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - - # Current warehouse query leads to blank result - sf_cursor.execute.side_effect = query_permission_response_override( - default_query_results, - [SnowflakeQuery.current_warehouse()], - [(None,)], - ) - pipeline = Pipeline(snowflake_pipeline_legacy_lineage_config) - pipeline.run() - assert "permission-error" in pipeline.source.get_report().failures.keys() - - -@freeze_time(FROZEN_TIME) -def test_snowflake_no_databases_with_access_causes_pipeline_failure( - pytestconfig, - snowflake_pipeline_legacy_lineage_config, -): - with mock.patch("snowflake.connector.connect") as mock_connect: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - - # Error in listing databases - sf_cursor.execute.side_effect = query_permission_error_override( - default_query_results, - [SnowflakeQuery.get_databases("TEST_DB")], - "Database 'TEST_DB' does not exist or not authorized.", - ) - pipeline = Pipeline(snowflake_pipeline_legacy_lineage_config) - pipeline.run() - assert "permission-error" in pipeline.source.get_report().failures.keys() - - -@freeze_time(FROZEN_TIME) -def test_snowflake_no_tables_causes_pipeline_failure( - pytestconfig, - snowflake_pipeline_legacy_lineage_config, -): - with mock.patch("snowflake.connector.connect") as mock_connect: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - - # Error in listing databases - no_tables_fn = query_permission_response_override( - default_query_results, - [SnowflakeQuery.tables_for_schema("TEST_SCHEMA", "TEST_DB")], - [], - ) - sf_cursor.execute.side_effect = query_permission_response_override( - no_tables_fn, - [SnowflakeQuery.show_views_for_schema("TEST_SCHEMA", "TEST_DB")], - [], - ) - - pipeline = Pipeline(snowflake_pipeline_legacy_lineage_config) - pipeline.run() - assert "permission-error" in pipeline.source.get_report().failures.keys() - - -@freeze_time(FROZEN_TIME) -def test_snowflake_list_columns_error_causes_pipeline_warning( - pytestconfig, - snowflake_pipeline_legacy_lineage_config, -): - with mock.patch("snowflake.connector.connect") as mock_connect: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - - # Error in listing columns - sf_cursor.execute.side_effect = query_permission_error_override( - default_query_results, - [ - SnowflakeQuery.columns_for_table( - "TABLE_{}".format(tbl_idx), "TEST_SCHEMA", "TEST_DB" - ) - for tbl_idx in range(1, NUM_TABLES + 1) - ], - "Database 'TEST_DB' does not exist or not authorized.", - ) - pipeline = Pipeline(snowflake_pipeline_legacy_lineage_config) - pipeline.run() - pipeline.raise_from_status() # pipeline should not fail - assert ( - "Failed to get columns for table" - in pipeline.source.get_report().warnings.keys() - ) - - 
-@freeze_time(FROZEN_TIME) -def test_snowflake_list_primary_keys_error_causes_pipeline_warning( - pytestconfig, - snowflake_pipeline_legacy_lineage_config, -): - with mock.patch("snowflake.connector.connect") as mock_connect: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - - # Error in listing keys leads to warning - sf_cursor.execute.side_effect = query_permission_error_override( - default_query_results, - [SnowflakeQuery.show_primary_keys_for_schema("TEST_SCHEMA", "TEST_DB")], - "Insufficient privileges to operate on TEST_DB", - ) - pipeline = Pipeline(snowflake_pipeline_legacy_lineage_config) - pipeline.run() - pipeline.raise_from_status() # pipeline should not fail - assert ( - "Failed to get primary key for table" - in pipeline.source.get_report().warnings.keys() - ) - - -@freeze_time(FROZEN_TIME) -def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure( - pytestconfig, - snowflake_pipeline_legacy_lineage_config, -): - with mock.patch("snowflake.connector.connect") as mock_connect: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - - # Error in getting lineage - sf_cursor.execute.side_effect = query_permission_error_override( - default_query_results, - [ - snowflake_query.SnowflakeQuery.table_to_table_lineage_history( - 1654473600000, 1654586220000, True - ), - ], - "Database 'SNOWFLAKE' does not exist or not authorized.", - ) - pipeline = Pipeline(snowflake_pipeline_legacy_lineage_config) - pipeline.run() - assert ( - "lineage-permission-error" in pipeline.source.get_report().failures.keys() - ) - - -@freeze_time(FROZEN_TIME) -def test_snowflake_missing_snowflake_operations_permission_causes_pipeline_failure( - pytestconfig, - snowflake_pipeline_legacy_lineage_config, -): - with mock.patch("snowflake.connector.connect") as mock_connect: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - - # Error in getting access history date range - sf_cursor.execute.side_effect = query_permission_error_override( - default_query_results, - [snowflake_query.SnowflakeQuery.get_access_history_date_range()], - "Database 'SNOWFLAKE' does not exist or not authorized.", - ) - pipeline = Pipeline(snowflake_pipeline_legacy_lineage_config) - pipeline.run() - assert "usage-permission-error" in pipeline.source.get_report().failures.keys() - - -@freeze_time(FROZEN_TIME) -def test_snowflake_unexpected_snowflake_view_lineage_error_causes_pipeline_warning( - pytestconfig, - snowflake_pipeline_legacy_lineage_config, -): - with mock.patch("snowflake.connector.connect") as mock_connect: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - - # Error in getting view lineage - sf_cursor.execute.side_effect = query_permission_error_override( - default_query_results, - [snowflake_query.SnowflakeQuery.view_dependencies()], - "Unexpected Error", - ) - - snowflake_pipeline_config1 = snowflake_pipeline_legacy_lineage_config.copy() - cast( - SnowflakeV2Config, - cast(PipelineConfig, snowflake_pipeline_config1).source.config, - ).include_view_lineage = True - pipeline = Pipeline(snowflake_pipeline_config1) - pipeline.run() - pipeline.raise_from_status() # pipeline should not 
fail - assert "view-upstream-lineage" in pipeline.source.get_report().warnings.keys() diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py deleted file mode 100644 index 59da7ddf695d8..0000000000000 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_legacy_lineage.py +++ /dev/null @@ -1,207 +0,0 @@ -import random -from datetime import datetime, timezone -from unittest import mock - -import pandas as pd -import pytest -from freezegun import freeze_time - -from datahub.configuration.common import AllowDenyPattern, DynamicTypedConfig -from datahub.ingestion.glossary.classifier import ( - ClassificationConfig, - DynamicTypedClassifierConfig, -) -from datahub.ingestion.glossary.datahub_classifier import ( - DataHubClassifierConfig, - InfoTypeConfig, - PredictionFactorsAndWeights, - ValuesFactorConfig, -) -from datahub.ingestion.run.pipeline import Pipeline -from datahub.ingestion.run.pipeline_config import PipelineConfig, SourceConfig -from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig -from datahub.ingestion.source.snowflake.snowflake_config import ( - SnowflakeV2Config, - TagOption, -) -from tests.integration.snowflake.common import FROZEN_TIME, default_query_results -from tests.integration.snowflake.test_snowflake import random_cloud_region, random_email -from tests.test_helpers import mce_helpers - - -@pytest.mark.integration -def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph): - test_resources_dir = pytestconfig.rootpath / "tests/integration/snowflake" - - # Run the metadata ingestion pipeline. - output_file = tmp_path / "snowflake_test_events.json" - golden_file = test_resources_dir / "snowflake_golden.json" - - with mock.patch("snowflake.connector.connect") as mock_connect, mock.patch( - "datahub.ingestion.source.snowflake.snowflake_v2.SnowflakeV2Source.get_sample_values_for_table" - ) as mock_sample_values: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - - sf_cursor.execute.side_effect = default_query_results - - mock_sample_values.return_value = pd.DataFrame( - data={ - "col_1": [random.randint(1, 80) for i in range(20)], - "col_2": [random_email() for i in range(20)], - "col_3": [random_cloud_region() for i in range(20)], - } - ) - - datahub_classifier_config = DataHubClassifierConfig( - minimum_values_threshold=10, - confidence_level_threshold=0.58, - info_types_config={ - "Age": InfoTypeConfig( - Prediction_Factors_and_Weights=PredictionFactorsAndWeights( - Name=0, Values=1, Description=0, Datatype=0 - ) - ), - "CloudRegion": InfoTypeConfig( - Prediction_Factors_and_Weights=PredictionFactorsAndWeights( - Name=0, - Description=0, - Datatype=0, - Values=1, - ), - Values=ValuesFactorConfig( - prediction_type="regex", - regex=[ - r"(af|ap|ca|eu|me|sa|us)-(central|north|(north(?:east|west))|south|south(?:east|west)|east|west)-\d+" - ], - ), - ), - }, - ) - - pipeline = Pipeline( - config=PipelineConfig( - source=SourceConfig( - type="snowflake", - config=SnowflakeV2Config( - account_id="ABC12345.ap-south-1.aws", - username="TST_USR", - password="TST_PWD", - match_fully_qualified_names=True, - schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]), - include_technical_schema=True, - include_table_lineage=True, - include_view_lineage=True, - include_usage_stats=True, - 
use_legacy_lineage_method=True, - validate_upstreams_against_patterns=False, - include_operational_stats=True, - start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace( - tzinfo=timezone.utc - ), - end_time=datetime(2022, 6, 7, 7, 17, 0, 0).replace( - tzinfo=timezone.utc - ), - classification=ClassificationConfig( - enabled=True, - classifiers=[ - DynamicTypedClassifierConfig( - type="datahub", config=datahub_classifier_config - ) - ], - ), - profiling=GEProfilingConfig( - enabled=True, - profile_if_updated_since_days=None, - profile_table_row_limit=None, - profile_table_size_limit=None, - profile_table_level_only=True, - ), - extract_tags=TagOption.without_lineage, - ), - ), - sink=DynamicTypedConfig( - type="file", config={"filename": str(output_file)} - ), - ) - ) - pipeline.run() - pipeline.pretty_print_summary() - pipeline.raise_from_status() - - # Verify the output. - - mce_helpers.check_golden_file( - pytestconfig, - output_path=output_file, - golden_path=golden_file, - ignore_paths=[ - r"root\[\d+\]\['aspect'\]\['json'\]\['timestampMillis'\]", - r"root\[\d+\]\['aspect'\]\['json'\]\['created'\]", - r"root\[\d+\]\['aspect'\]\['json'\]\['lastModified'\]", - r"root\[\d+\]\['aspect'\]\['json'\]\['fields'\]\[\d+\]\['glossaryTerms'\]\['auditStamp'\]\['time'\]", - r"root\[\d+\]\['systemMetadata'\]", - ], - ) - - -@freeze_time(FROZEN_TIME) -@pytest.mark.integration -def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_graph): - test_resources_dir = pytestconfig.rootpath / "tests/integration/snowflake" - - # Run the metadata ingestion pipeline. - output_file = tmp_path / "snowflake_privatelink_test_events.json" - golden_file = test_resources_dir / "snowflake_privatelink_golden.json" - - with mock.patch("snowflake.connector.connect") as mock_connect: - sf_connection = mock.MagicMock() - sf_cursor = mock.MagicMock() - mock_connect.return_value = sf_connection - sf_connection.cursor.return_value = sf_cursor - sf_cursor.execute.side_effect = default_query_results - - pipeline = Pipeline( - config=PipelineConfig( - source=SourceConfig( - type="snowflake", - config=SnowflakeV2Config( - account_id="ABC12345.ap-south-1.privatelink", - username="TST_USR", - password="TST_PWD", - schema_pattern=AllowDenyPattern(allow=["test_schema"]), - include_technical_schema=True, - include_table_lineage=True, - include_column_lineage=False, - include_views=False, - include_view_lineage=False, - use_legacy_lineage_method=True, - include_usage_stats=False, - include_operational_stats=False, - start_time=datetime(2022, 6, 6, 7, 17, 0, 0).replace( - tzinfo=timezone.utc - ), - end_time=datetime(2022, 6, 7, 7, 17, 0, 0).replace( - tzinfo=timezone.utc - ), - ), - ), - sink=DynamicTypedConfig( - type="file", config={"filename": str(output_file)} - ), - ) - ) - pipeline.run() - pipeline.pretty_print_summary() - pipeline.raise_from_status() - - # Verify the output. 
- - mce_helpers.check_golden_file( - pytestconfig, - output_path=output_file, - golden_path=golden_file, - ignore_paths=[], - ) From 1a219afb998d1edfb3bf0f5eb0649069d97bcd93 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Thu, 17 Aug 2023 01:20:46 -0400 Subject: [PATCH 2/2] more removals --- .../snowflake/snowflake_lineage_legacy.py | 664 ------------------ .../source/snowflake/snowflake_query.py | 29 - .../tests/integration/snowflake/common.py | 9 - 3 files changed, 702 deletions(-) delete mode 100644 metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_legacy.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_legacy.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_legacy.py deleted file mode 100644 index 832a072c619f8..0000000000000 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_legacy.py +++ /dev/null @@ -1,664 +0,0 @@ -import json -import logging -from collections import defaultdict -from dataclasses import dataclass, field -from typing import Any, Callable, Dict, FrozenSet, Iterable, List, Optional, Set - -from pydantic import Field -from pydantic.error_wrappers import ValidationError -from snowflake.connector import SnowflakeConnection - -import datahub.emitter.mce_builder as builder -from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.aws.s3_util import make_s3_urn -from datahub.ingestion.source.snowflake.constants import ( - LINEAGE_PERMISSION_ERROR, - SnowflakeEdition, - SnowflakeObjectDomain, -) -from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config -from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery -from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report -from datahub.ingestion.source.snowflake.snowflake_usage_v2 import ( - SnowflakeColumnReference, -) -from datahub.ingestion.source.snowflake.snowflake_utils import ( - SnowflakeCommonMixin, - SnowflakeConnectionMixin, - SnowflakePermissionError, - SnowflakeQueryMixin, -) -from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( - FineGrainedLineage, - FineGrainedLineageDownstreamType, - FineGrainedLineageUpstreamType, - UpstreamLineage, -) -from datahub.metadata.schema_classes import DatasetLineageTypeClass, UpstreamClass -from datahub.utilities.perf_timer import PerfTimer - -logger: logging.Logger = logging.getLogger(__name__) - - -class SnowflakeColumnWithLineage(SnowflakeColumnReference): - class Config: - # This is for backward compatibility and can be removed later - allow_population_by_field_name = True - - directSourceColumns: Optional[List[SnowflakeColumnReference]] = Field( - default=None, alias="directSources" - ) - - -@dataclass(frozen=True) -class SnowflakeColumnId: - columnName: str - objectName: str - objectDomain: Optional[str] = None - - -@dataclass(frozen=True) -class SnowflakeColumnFineGrainedLineage: - """ - Fie grained upstream of column, - which represents a transformation applied on input columns""" - - inputColumns: FrozenSet[SnowflakeColumnId] - # Transform function, query etc can be added here - - -@dataclass -class SnowflakeColumnUpstreams: - """All upstreams of a column""" - - upstreams: Set[SnowflakeColumnFineGrainedLineage] = field( - default_factory=set, init=False - ) - - def update_column_lineage( - self, directSourceColumns: List[SnowflakeColumnReference] - ) -> None: - 
input_columns = frozenset( - [ - SnowflakeColumnId( - upstream_col.columnName, - upstream_col.objectName, - upstream_col.objectDomain, - ) - for upstream_col in directSourceColumns - if upstream_col.objectName - ] - ) - if not input_columns: - return - upstream = SnowflakeColumnFineGrainedLineage(inputColumns=input_columns) - if upstream not in self.upstreams: - self.upstreams.add(upstream) - - -@dataclass -class SnowflakeUpstreamTable: - upstreamDataset: str - upstreamColumns: List[SnowflakeColumnReference] - downstreamColumns: List[SnowflakeColumnWithLineage] - - @classmethod - def from_dict( - cls, - dataset: str, - upstreams_columns_json: Optional[str], - downstream_columns_json: Optional[str], - ) -> "SnowflakeUpstreamTable": - try: - upstreams_columns_list = [] - downstream_columns_list = [] - if upstreams_columns_json is not None: - upstreams_columns_list = json.loads(upstreams_columns_json) - if downstream_columns_json is not None: - downstream_columns_list = json.loads(downstream_columns_json) - - table_with_upstreams = cls( - dataset, - [ - SnowflakeColumnReference.parse_obj(col) - for col in upstreams_columns_list - ], - [ - SnowflakeColumnWithLineage.parse_obj(col) - for col in downstream_columns_list - ], - ) - except ValidationError: - # Earlier versions of column lineage did not include columnName, only columnId - table_with_upstreams = cls(dataset, [], []) - return table_with_upstreams - - -@dataclass -class SnowflakeTableLineage: - # key: upstream table name - upstreamTables: Dict[str, SnowflakeUpstreamTable] = field( - default_factory=dict, init=False - ) - - # key: downstream column name - columnLineages: Dict[str, SnowflakeColumnUpstreams] = field( - default_factory=lambda: defaultdict(SnowflakeColumnUpstreams), init=False - ) - - def update_lineage( - self, table: SnowflakeUpstreamTable, include_column_lineage: bool = True - ) -> None: - if table.upstreamDataset not in self.upstreamTables.keys(): - self.upstreamTables[table.upstreamDataset] = table - - if include_column_lineage and table.downstreamColumns: - for col in table.downstreamColumns: - if col.directSourceColumns: - self.columnLineages[col.columnName].update_column_lineage( - col.directSourceColumns - ) - - -class SnowflakeLineageExtractor( - SnowflakeQueryMixin, SnowflakeConnectionMixin, SnowflakeCommonMixin -): - """ - Extracts Lineage from Snowflake. - Following lineage edges are considered. - - 1. "Table to View" lineage via `snowflake.account_usage.object_dependencies` view - 2. "S3 to Table" lineage via `show external tables` query. - 3. "View to Table" lineage via `snowflake.account_usage.access_history` view (requires Snowflake Enterprise Edition or above) - 4. "Table to Table" lineage via `snowflake.account_usage.access_history` view (requires Snowflake Enterprise Edition or above) - 5. "S3 to Table" lineage via `snowflake.account_usage.access_history` view (requires Snowflake Enterprise Edition or above) - - Edition Note - Snowflake Standard Edition does not have Access History Feature. So it does not support lineage extraction for edges 3, 4, 5 mentioned above. 
- """ - - def __init__( - self, - config: SnowflakeV2Config, - report: SnowflakeV2Report, - dataset_urn_builder: Callable[[str], str], - ) -> None: - self._lineage_map: Dict[str, SnowflakeTableLineage] = defaultdict( - SnowflakeTableLineage - ) - self._external_lineage_map: Dict[str, Set[str]] = defaultdict(set) - self.config = config - self.report = report - self.logger = logger - self.dataset_urn_builder = dataset_urn_builder - self.connection: Optional[SnowflakeConnection] = None - - # Kwargs used by new snowflake lineage extractor need to be ignored here - def get_workunits( - self, discovered_tables: List[str], discovered_views: List[str], **_kwargs: Any - ) -> Iterable[MetadataWorkUnit]: - self.connection = self.create_connection() - if self.connection is None: - return - - self._populate_table_lineage() - - if self.config.include_view_lineage: - if len(discovered_views) > 0: - self._populate_view_lineage() - else: - logger.info("No views found. Skipping View Lineage Extraction.") - - self._populate_external_lineage() - - if ( - len(self._lineage_map.keys()) == 0 - and len(self._external_lineage_map.keys()) == 0 - ): - logger.debug("No lineage found.") - return - - yield from self.get_table_upstream_workunits(discovered_tables) - yield from self.get_view_upstream_workunits(discovered_views) - - def _populate_table_lineage(self): - if self.report.edition == SnowflakeEdition.STANDARD: - logger.info( - "Snowflake Account is Standard Edition. Table to Table Lineage Feature is not supported." - ) # See Edition Note above for why - else: - with PerfTimer() as timer: - self._populate_lineage() - self.report.table_lineage_query_secs = timer.elapsed_seconds() - - def get_table_upstream_workunits(self, discovered_tables): - if self.config.include_table_lineage: - for dataset_name in discovered_tables: - upstream_lineage = self._get_upstream_lineage_info(dataset_name) - if upstream_lineage is not None: - yield MetadataChangeProposalWrapper( - entityUrn=self.dataset_urn_builder(dataset_name), - aspect=upstream_lineage, - ).as_workunit() - - def get_view_upstream_workunits(self, discovered_views): - if self.config.include_view_lineage: - for view_name in discovered_views: - upstream_lineage = self._get_upstream_lineage_info(view_name) - if upstream_lineage is not None: - yield MetadataChangeProposalWrapper( - entityUrn=self.dataset_urn_builder(view_name), - aspect=upstream_lineage, - ).as_workunit() - - def _get_upstream_lineage_info( - self, dataset_name: str - ) -> Optional[UpstreamLineage]: - lineage = self._lineage_map[dataset_name] - external_lineage = self._external_lineage_map[dataset_name] - if not (lineage.upstreamTables or lineage.columnLineages or external_lineage): - logger.debug(f"No lineage found for {dataset_name}") - return None - - upstream_tables: List[UpstreamClass] = [] - finegrained_lineages: List[FineGrainedLineage] = [] - - # Populate the table-lineage in aspect - self.update_upstream_tables_lineage(upstream_tables, lineage) - - # Populate the column-lineage in aspect - self.update_upstream_columns_lineage( - self.dataset_urn_builder(dataset_name), finegrained_lineages, lineage - ) - - # Populate the external-table-lineage(s3->snowflake) in aspect - self.update_external_tables_lineage(upstream_tables, external_lineage) - - if len(upstream_tables) > 0: - logger.debug( - f"Upstream lineage of '{dataset_name}': {[u.dataset for u in upstream_tables]}" - ) - if self.config.upstream_lineage_in_report: - self.report.upstream_lineage[dataset_name] = [ - u.dataset for u in 
upstream_tables - ] - return UpstreamLineage( - upstreams=upstream_tables, - fineGrainedLineages=sorted( - finegrained_lineages, key=lambda x: (x.downstreams, x.upstreams) - ) - or None, - ) - else: - return None - - def _populate_view_lineage(self) -> None: - with PerfTimer() as timer: - self._populate_view_upstream_lineage() - self.report.view_upstream_lineage_query_secs = timer.elapsed_seconds() - - if self.report.edition == SnowflakeEdition.STANDARD: - logger.info( - "Snowflake Account is Standard Edition. View to Table Lineage Feature is not supported." - ) # See Edition Note above for why - else: - with PerfTimer() as timer: - self._populate_view_downstream_lineage() - self.report.view_downstream_lineage_query_secs = timer.elapsed_seconds() - - def _populate_external_lineage(self) -> None: - with PerfTimer() as timer: - self.report.num_external_table_edges_scanned = 0 - - if self.report.edition == SnowflakeEdition.STANDARD: - logger.info( - "Snowflake Account is Standard Edition. External Lineage Feature via Access History is not supported." - ) # See Edition Note above for why - else: - self._populate_external_lineage_from_access_history() - - self._populate_external_lineage_from_show_query() - - logger.info( - f"Found {self.report.num_external_table_edges_scanned} external lineage edges." - ) - - self.report.external_lineage_queries_secs = timer.elapsed_seconds() - - # Handles the case for explicitly created external tables. - # NOTE: Snowflake does not log this information to the access_history table. - def _populate_external_lineage_from_show_query(self): - external_tables_query: str = SnowflakeQuery.show_external_tables() - try: - for db_row in self.query(external_tables_query): - key = self.get_dataset_identifier( - db_row["name"], db_row["schema_name"], db_row["database_name"] - ) - - if not self._is_dataset_pattern_allowed( - key, SnowflakeObjectDomain.TABLE - ): - continue - self._external_lineage_map[key].add(db_row["location"]) - logger.debug( - f"ExternalLineage[Table(Down)={key}]:External(Up)={self._external_lineage_map[key]} via show external tables" - ) - self.report.num_external_table_edges_scanned += 1 - except Exception as e: - logger.debug(e, exc_info=e) - self.report_warning( - "external_lineage", - f"Populating external table lineage from Snowflake failed due to error {e}.", - ) - - # Handles the case where a table is populated from an external location via copy. - # Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv'; - def _populate_external_lineage_from_access_history(self): - query: str = SnowflakeQuery.external_table_lineage_history( - start_time_millis=int(self.config.start_time.timestamp() * 1000) - if not self.config.ignore_start_time_lineage - else 0, - end_time_millis=int(self.config.end_time.timestamp() * 1000), - ) - - try: - for db_row in self.query(query): - self._process_external_lineage_result_row(db_row) - except Exception as e: - if isinstance(e, SnowflakePermissionError): - error_msg = "Failed to get external lineage. Please grant imported privileges on SNOWFLAKE database. 
" - self.warn_if_stateful_else_error(LINEAGE_PERMISSION_ERROR, error_msg) - else: - logger.debug(e, exc_info=e) - self.report_warning( - "external_lineage", - f"Populating table external lineage from Snowflake failed due to error {e}.", - ) - - def _process_external_lineage_result_row(self, db_row): - # key is the down-stream table name - key: str = self.get_dataset_identifier_from_qualified_name( - db_row["DOWNSTREAM_TABLE_NAME"] - ) - if not self._is_dataset_pattern_allowed(key, SnowflakeObjectDomain.TABLE): - return - - if db_row["UPSTREAM_LOCATIONS"] is not None: - external_locations = json.loads(db_row["UPSTREAM_LOCATIONS"]) - - for loc in external_locations: - if loc not in self._external_lineage_map[key]: - self._external_lineage_map[key].add(loc) - self.report.num_external_table_edges_scanned += 1 - - logger.debug( - f"ExternalLineage[Table(Down)={key}]:External(Up)={self._external_lineage_map[key]} via access_history" - ) - - def _populate_lineage(self) -> None: - query: str = SnowflakeQuery.table_to_table_lineage_history( - start_time_millis=int(self.config.start_time.timestamp() * 1000) - if not self.config.ignore_start_time_lineage - else 0, - end_time_millis=int(self.config.end_time.timestamp() * 1000), - include_column_lineage=self.config.include_column_lineage, - ) - self.report.num_table_to_table_edges_scanned = 0 - try: - for db_row in self.query(query): - self._process_table_lineage_row(db_row) - except Exception as e: - if isinstance(e, SnowflakePermissionError): - error_msg = "Failed to get table to table lineage. Please grant imported privileges on SNOWFLAKE database. " - self.warn_if_stateful_else_error(LINEAGE_PERMISSION_ERROR, error_msg) - else: - logger.debug(e, exc_info=e) - self.report_warning( - "table-lineage", - f"Extracting lineage from Snowflake failed due to error {e}.", - ) - logger.info( - f"A total of {self.report.num_table_to_table_edges_scanned} Table->Table edges found" - f" for {len(self._lineage_map)} downstream tables.", - ) - - def _process_table_lineage_row(self, db_row): - # key is the down-stream table name - key: str = self.get_dataset_identifier_from_qualified_name( - db_row["DOWNSTREAM_TABLE_NAME"] - ) - upstream_table_name = self.get_dataset_identifier_from_qualified_name( - db_row["UPSTREAM_TABLE_NAME"] - ) - if not self._is_dataset_pattern_allowed( - key, SnowflakeObjectDomain.TABLE - ) or not ( - self._is_dataset_pattern_allowed( - upstream_table_name, SnowflakeObjectDomain.TABLE, is_upstream=True - ) - ): - return - self._lineage_map[key].update_lineage( - # (, , ) - SnowflakeUpstreamTable.from_dict( - upstream_table_name, - db_row["UPSTREAM_TABLE_COLUMNS"], - db_row["DOWNSTREAM_TABLE_COLUMNS"], - ), - self.config.include_column_lineage, - ) - self.report.num_table_to_table_edges_scanned += 1 - logger.debug(f"Lineage[Table(Down)={key}]:Table(Up)={self._lineage_map[key]}") - - def _populate_view_upstream_lineage(self) -> None: - # NOTE: This query captures only the upstream lineage of a view (with no column lineage). - # For more details see: https://docs.snowflake.com/en/user-guide/object-dependencies.html#object-dependencies - # and also https://docs.snowflake.com/en/sql-reference/account-usage/access_history.html#usage-notes for current limitations on capturing the lineage for views. 
- view_upstream_lineage_query: str = SnowflakeQuery.view_dependencies() - - self.report.num_table_to_view_edges_scanned = 0 - - try: - for db_row in self.query(view_upstream_lineage_query): - self._process_view_upstream_lineage_row(db_row) - except Exception as e: - if isinstance(e, SnowflakePermissionError): - error_msg = "Failed to get table to view lineage. Please grant imported privileges on SNOWFLAKE database." - self.warn_if_stateful_else_error(LINEAGE_PERMISSION_ERROR, error_msg) - else: - logger.debug(e, exc_info=e) - self.report_warning( - "view-upstream-lineage", - f"Extracting the upstream view lineage from Snowflake failed due to error {e}.", - ) - logger.info( - f"A total of {self.report.num_table_to_view_edges_scanned} View upstream edges found." - ) - - def _process_view_upstream_lineage_row(self, db_row): - # Process UpstreamTable/View/ExternalTable/Materialized View->View edge. - view_upstream: str = self.get_dataset_identifier_from_qualified_name( - db_row["VIEW_UPSTREAM"] - ) - view_name: str = self.get_dataset_identifier_from_qualified_name( - db_row["DOWNSTREAM_VIEW"] - ) - - if not self._is_dataset_pattern_allowed( - dataset_name=view_name, - dataset_type=db_row["REFERENCING_OBJECT_DOMAIN"], - ) or not self._is_dataset_pattern_allowed( - view_upstream, db_row["REFERENCED_OBJECT_DOMAIN"], is_upstream=True - ): - return - # key is the downstream view name - self._lineage_map[view_name].update_lineage( - # (, , ) - SnowflakeUpstreamTable.from_dict(view_upstream, None, None), - self.config.include_column_lineage, - ) - self.report.num_table_to_view_edges_scanned += 1 - logger.debug( - f"Upstream->View: Lineage[View(Down)={view_name}]:Upstream={view_upstream}" - ) - - def _populate_view_downstream_lineage(self) -> None: - # This query captures the downstream table lineage for views. - # See https://docs.snowflake.com/en/sql-reference/account-usage/access_history.html#usage-notes for current limitations on capturing the lineage for views. - # Eg: For viewA->viewB->ViewC->TableD, snowflake does not yet log intermediate view logs, resulting in only the viewA->TableD edge. - view_lineage_query: str = SnowflakeQuery.view_lineage_history( - start_time_millis=int(self.config.start_time.timestamp() * 1000) - if not self.config.ignore_start_time_lineage - else 0, - end_time_millis=int(self.config.end_time.timestamp() * 1000), - include_column_lineage=self.config.include_column_lineage, - ) - - self.report.num_view_to_table_edges_scanned = 0 - - try: - for db_row in self.query(view_lineage_query): - self._process_view_downstream_lineage_row(db_row) - except Exception as e: - if isinstance(e, SnowflakePermissionError): - error_msg = "Failed to get view to table lineage. Please grant imported privileges on SNOWFLAKE database. " - self.warn_if_stateful_else_error(LINEAGE_PERMISSION_ERROR, error_msg) - else: - logger.debug(e, exc_info=e) - self.report_warning( - "view-downstream-lineage", - f"Extracting the view lineage from Snowflake failed due to error {e}.", - ) - - logger.info( - f"Found {self.report.num_view_to_table_edges_scanned} View->Table edges." 
- ) - - def _process_view_downstream_lineage_row(self, db_row): - view_name: str = self.get_dataset_identifier_from_qualified_name( - db_row["VIEW_NAME"] - ) - downstream_table: str = self.get_dataset_identifier_from_qualified_name( - db_row["DOWNSTREAM_TABLE_NAME"] - ) - if not self._is_dataset_pattern_allowed( - view_name, db_row["VIEW_DOMAIN"], is_upstream=True - ) or not self._is_dataset_pattern_allowed( - downstream_table, db_row["DOWNSTREAM_TABLE_DOMAIN"] - ): - return - - # Capture view->downstream table lineage. - self._lineage_map[downstream_table].update_lineage( - # (, , ) - SnowflakeUpstreamTable.from_dict( - view_name, - db_row["VIEW_COLUMNS"], - db_row["DOWNSTREAM_TABLE_COLUMNS"], - ), - self.config.include_column_lineage, - ) - self.report.num_view_to_table_edges_scanned += 1 - - logger.debug( - f"View->Table: Lineage[Table(Down)={downstream_table}]:View(Up)={self._lineage_map[downstream_table]}" - ) - - def update_upstream_tables_lineage( - self, upstream_tables: List[UpstreamClass], lineage: SnowflakeTableLineage - ) -> None: - for lineage_entry in sorted( - lineage.upstreamTables.values(), key=lambda x: x.upstreamDataset - ): - upstream_table_name = lineage_entry.upstreamDataset - upstream_table = UpstreamClass( - dataset=self.dataset_urn_builder(upstream_table_name), - type=DatasetLineageTypeClass.TRANSFORMED, - ) - upstream_tables.append(upstream_table) - - def update_upstream_columns_lineage( - self, - dataset_urn: str, - finegrained_lineages: List[FineGrainedLineage], - lineage: SnowflakeTableLineage, - ) -> None: - # For every column for which upstream lineage is available - for col, col_upstreams in lineage.columnLineages.items(): - # For every upstream of column - self.update_upstream_columns_lineage_of_column( - dataset_urn, col, finegrained_lineages, col_upstreams - ) - - def update_upstream_columns_lineage_of_column( - self, - dataset_urn: str, - col: str, - finegrained_lineages: List[FineGrainedLineage], - col_upstreams: SnowflakeColumnUpstreams, - ) -> None: - for fine_upstream in col_upstreams.upstreams: - finegrained_lineage_entry = self.build_finegrained_lineage( - dataset_urn, col, fine_upstream - ) - if finegrained_lineage_entry.upstreams: - finegrained_lineages.append(finegrained_lineage_entry) - - def build_finegrained_lineage( - self, - dataset_urn: str, - col: str, - fine_upstream: SnowflakeColumnFineGrainedLineage, - ) -> FineGrainedLineage: - fieldPath = col - - column_upstreams = self.build_finegrained_lineage_upstreams(fine_upstream) - finegrained_lineage_entry = FineGrainedLineage( - upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, - # Sorting the list of upstream lineage events in order to avoid creating multiple aspects in backend - # even if the lineage is same but the order is different. 
- upstreams=sorted(column_upstreams), - downstreamType=FineGrainedLineageDownstreamType.FIELD, - downstreams=[ - builder.make_schema_field_urn( - dataset_urn, self.snowflake_identifier(fieldPath) - ) - ], - ) - - return finegrained_lineage_entry - - def build_finegrained_lineage_upstreams( - self, fine_upstream: SnowflakeColumnFineGrainedLineage - ) -> List[str]: - column_upstreams = [] - for upstream_col in fine_upstream.inputColumns: - if ( - upstream_col.objectName - and upstream_col.columnName - and self._is_dataset_pattern_allowed( - upstream_col.objectName, upstream_col.objectDomain, is_upstream=True - ) - ): - upstream_dataset_name = self.get_dataset_identifier_from_qualified_name( - upstream_col.objectName - ) - column_upstreams.append( - builder.make_schema_field_urn( - self.dataset_urn_builder(upstream_dataset_name), - self.snowflake_identifier(upstream_col.columnName), - ) - ) - return column_upstreams - - def update_external_tables_lineage( - self, upstream_tables: List[UpstreamClass], external_lineage: Set[str] - ) -> None: - for external_lineage_entry in sorted(external_lineage): - # For now, populate only for S3 - if external_lineage_entry.startswith("s3://"): - external_upstream_table = UpstreamClass( - dataset=make_s3_urn(external_lineage_entry, self.config.env), - type=DatasetLineageTypeClass.COPY, - ) - upstream_tables.append(external_upstream_table) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 039eac1e93819..265098e2b1599 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -505,35 +505,6 @@ def view_dependencies_v2() -> str: def show_external_tables() -> str: return "show external tables in account" - # Note - This method should be removed once legacy lineage is removed - @staticmethod - def external_table_lineage_history( - start_time_millis: int, end_time_millis: int - ) -> str: - return f""" - WITH external_table_lineage_history AS ( - SELECT - r.value:"locations" AS upstream_locations, - w.value:"objectName"::varchar AS downstream_table_name, - w.value:"objectDomain"::varchar AS downstream_table_domain, - w.value:"columns" AS downstream_table_columns, - t.query_start_time AS query_start_time - FROM - (SELECT * from snowflake.account_usage.access_history) t, - lateral flatten(input => t.BASE_OBJECTS_ACCESSED) r, - lateral flatten(input => t.OBJECTS_MODIFIED) w - WHERE r.value:"locations" IS NOT NULL - AND w.value:"objectId" IS NOT NULL - AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3) - AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)) - SELECT - upstream_locations AS "UPSTREAM_LOCATIONS", - downstream_table_name AS "DOWNSTREAM_TABLE_NAME", - downstream_table_columns AS "DOWNSTREAM_TABLE_COLUMNS" - FROM external_table_lineage_history - WHERE downstream_table_domain = '{SnowflakeObjectDomain.TABLE.capitalize()}' - QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name ORDER BY query_start_time DESC) = 1""" - @staticmethod def copy_lineage_history( start_time_millis: int, diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index 43f5e04fbc89f..81e307a78ae9e 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -434,11 
+434,6 @@ def default_query_results( # noqa: C901 } for op_idx in range(1, num_ops + 1) ] - elif query == snowflake_query.SnowflakeQuery.external_table_lineage_history( - 1654473600000, - 1654586220000, - ): - return [] elif query in [ snowflake_query.SnowflakeQuery.view_dependencies(), ]: @@ -509,10 +504,6 @@ def default_query_results( # noqa: C901 } ] elif query in [ - snowflake_query.SnowflakeQuery.external_table_lineage_history( - 1654473600000, - 1654586220000, - ), snowflake_query.SnowflakeQuery.view_dependencies_v2(), snowflake_query.SnowflakeQuery.view_dependencies(), snowflake_query.SnowflakeQuery.show_external_tables(),
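
The config change in PATCH 1/2 above swaps the `use_legacy_lineage_method` field for `_use_legacy_lineage_method_removed = pydantic_removed_field("use_legacy_lineage_method")`, so recipes that still set the removed option get flagged rather than silently ignored. The sketch below shows the general shape of such a removed-field validator. It is a minimal, hypothetical stand-in: the helper name `removed_field_validator`, the `ExampleConfig` model, the pydantic v1 style, and the raise-on-use behavior are all assumptions for illustration, not DataHub's actual `pydantic_removed_field` implementation (which may warn instead of raising).

# Minimal sketch of a removed-field validator (assumed pydantic v1 style);
# not DataHub's actual pydantic_removed_field implementation.
from typing import Any, Dict

import pydantic


def removed_field_validator(field_name: str) -> classmethod:
    """Build a pre root-validator that rejects a config key that no longer exists."""

    def _check(cls: Any, values: Dict[str, Any]) -> Dict[str, Any]:
        # Runs before field parsing, so it sees the raw recipe keys.
        if field_name in values:
            raise ValueError(
                f"The `{field_name}` option has been removed; "
                "delete it from your recipe."
            )
        return values

    # allow_reuse lets the same factory be attached to several config models.
    return pydantic.root_validator(pre=True, allow_reuse=True)(_check)


class ExampleConfig(pydantic.BaseModel):
    include_table_lineage: bool = True

    _use_legacy_lineage_method_removed = removed_field_validator(
        "use_legacy_lineage_method"
    )

Under these assumptions, `ExampleConfig(use_legacy_lineage_method=True)` fails validation with a message telling the user to drop the stale key, while recipes that never set the option are unaffected.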