From 12a7bfe8c8f5b85899359b3cc112cb47a23530d0 Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Tue, 26 Sep 2023 17:31:16 +0530 Subject: [PATCH] feat(ingest/snowflake): initialize schema resolver from datahub for lineage-only ingestion --- .../source/snowflake/snowflake_config.py | 4 +-- .../source/snowflake/snowflake_v2.py | 33 ++++++++++++------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 95f6444384408..032bdef178fdf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -101,8 +101,8 @@ class SnowflakeV2Config( ) include_view_column_lineage: bool = Field( - default=False, - description="Populates view->view and table->view column lineage.", + default=True, + description="Populates view->view and table->view column lineage using DataHub's sql parser.", ) _check_role_grants_removed = pydantic_removed_field("check_role_grants") diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 240e0ffa1a0b6..8d57bb83b3758 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -301,14 +301,11 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config): # Caches tables for a single database. Consider moving to disk or S3 when possible. self.db_tables: Dict[str, List[SnowflakeTable]] = {} - self.sql_parser_schema_resolver = SchemaResolver( - platform=self.platform, - platform_instance=self.config.platform_instance, - env=self.config.env, - ) self.view_definitions: FileBackedDict[str] = FileBackedDict() self.add_config_to_report() + self.sql_parser_schema_resolver = self._init_schema_resolver() + @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source": config = SnowflakeV2Config.parse_obj(config_dict) @@ -493,6 +490,24 @@ def query(query): return _report + def _init_schema_resolver(self) -> SchemaResolver: + if not self.config.include_technical_schema and self.config.parse_view_ddl: + if self.ctx.graph: + return self.ctx.graph.initialize_schema_resolver_from_datahub( + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + )[0] + else: + logger.warning( + "Failed to load schema info from DataHub as DataHubGraph is missing.", + ) + return SchemaResolver( + platform=self.platform, + platform_instance=self.config.platform_instance, + env=self.config.env, + ) + def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), @@ -764,7 +779,7 @@ def _process_schema( ) self.db_tables[schema_name] = tables - if self.config.include_technical_schema or self.config.parse_view_ddl: + if self.config.include_technical_schema: for table in tables: yield from self._process_table(table, schema_name, db_name) @@ -776,7 +791,7 @@ def _process_schema( if view.view_definition: self.view_definitions[key] = view.view_definition - if self.config.include_technical_schema or self.config.parse_view_ddl: + if self.config.include_technical_schema: for view in views: yield from self._process_view(view, schema_name, db_name) @@ -892,8 +907,6 @@ def _process_table( yield from self._process_tag(tag) yield from self.gen_dataset_workunits(table, schema_name, db_name) - elif self.config.parse_view_ddl: - self.gen_schema_metadata(table, schema_name, db_name) def fetch_sample_data_for_classification( self, table: SnowflakeTable, schema_name: str, db_name: str, dataset_name: str @@ -1004,8 +1017,6 @@ def _process_view( yield from self._process_tag(tag) yield from self.gen_dataset_workunits(view, schema_name, db_name) - elif self.config.parse_view_ddl: - self.gen_schema_metadata(view, schema_name, db_name) def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]: tag_identifier = tag.identifier()