diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 7690723837165..1107a54a1896b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -1,5 +1,4 @@ import atexit -import hashlib import logging import os import re @@ -146,10 +145,6 @@ def cleanup(config: BigQueryV2Config) -> None: os.unlink(config._credentials_path) -def _generate_sql_id(sql: str) -> str: - return hashlib.md5(sql.encode("utf-8")).hexdigest() - - @platform_name("BigQuery", doc_order=1) @config_class(BigQueryV2Config) @support_status(SupportStatus.CERTIFIED) @@ -286,10 +281,9 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): # Global store of table identifiers for lineage filtering self.table_refs: Set[str] = set() - # We do this so that the SQL is stored in a file-backed dict, but the sql IDs are stored in memory. - # Maps project -> view_ref -> sql ID (will be used when generating lineage) - self.view_definition_ids: Dict[str, Dict[str, str]] = defaultdict(dict) - # Maps sql ID -> actual sql + # Maps project -> view_ref, so we can find all views in a project + self.view_refs_by_project: Dict[str, Set[str]] = defaultdict(set) + # Maps view ref -> actual sql self.view_definitions: FileBackedDict[str] = FileBackedDict() self.sql_parser_schema_resolver = SchemaResolver( @@ -684,10 +678,8 @@ def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]: ) if self.config.lineage_parse_view_ddl: - for view, view_definition_id in self.view_definition_ids[ - project_id - ].items(): - view_definition = self.view_definitions[view_definition_id] + for view in self.view_refs_by_project[project_id]: + view_definition = self.view_definitions[view] raw_view_lineage = sqlglot_lineage( view_definition, schema_resolver=self.sql_parser_schema_resolver, @@ -896,10 +888,9 @@ def _process_view( BigQueryTableRef(table_identifier).get_sanitized_table_ref() ) self.table_refs.add(table_ref) - if self.config.lineage_parse_view_ddl: - view_definition_id = _generate_sql_id(view.view_definition) - self.view_definition_ids[project_id][table_ref] = view_definition_id - self.view_definitions[view_definition_id] = view.view_definition + if self.config.lineage_parse_view_ddl and view.view_definition: + self.view_refs_by_project[project_id].add(table_ref) + self.view_definitions[table_ref] = view.view_definition view.column_count = len(columns) if not view.column_count: @@ -989,7 +980,7 @@ def gen_view_dataset_workunits( view_properties_aspect = ViewProperties( materialized=view.materialized, viewLanguage="SQL", - viewLogic=view_definition_string, + viewLogic=view_definition_string or "", ) yield MetadataChangeProposalWrapper( entityUrn=self.gen_dataset_urn( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 90b751c875add..e561ed0e2d146 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -770,7 +770,8 @@ def _process_schema( if self.config.parse_view_ddl: for view in views: key = self.get_dataset_identifier(view.name, schema_name, db_name) - self.view_definitions[key] = view.view_definition + if view.view_definition: + self.view_definitions[key] = view.view_definition if self.config.include_technical_schema or self.config.parse_view_ddl: for view in views: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic.py index aa0493a18ab58..345f5bd57b44c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic.py @@ -44,7 +44,7 @@ class BaseView: comment: Optional[str] created: Optional[datetime] last_altered: Optional[datetime] - view_definition: str + view_definition: Optional[str] size_in_bytes: Optional[int] = None rows_count: Optional[int] = None column_count: Optional[int] = None