Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/bigquery): Handle null view_definition; remove view definition hash ids #8747

Merged
merged 2 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import atexit
import hashlib
import logging
import os
import re
Expand Down Expand Up @@ -146,10 +145,6 @@ def cleanup(config: BigQueryV2Config) -> None:
os.unlink(config._credentials_path)


def _generate_sql_id(sql: str) -> str:
return hashlib.md5(sql.encode("utf-8")).hexdigest()


@platform_name("BigQuery", doc_order=1)
@config_class(BigQueryV2Config)
@support_status(SupportStatus.CERTIFIED)
Expand Down Expand Up @@ -286,10 +281,9 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
# Global store of table identifiers for lineage filtering
self.table_refs: Set[str] = set()

# We do this so that the SQL is stored in a file-backed dict, but the sql IDs are stored in memory.
# Maps project -> view_ref -> sql ID (will be used when generating lineage)
self.view_definition_ids: Dict[str, Dict[str, str]] = defaultdict(dict)
# Maps sql ID -> actual sql
# Maps project -> view_ref, so we can find all views in a project
self.view_refs_by_project: Dict[str, Set[str]] = defaultdict(set)
# Maps view ref -> actual sql
self.view_definitions: FileBackedDict[str] = FileBackedDict()

self.sql_parser_schema_resolver = SchemaResolver(
Expand Down Expand Up @@ -684,10 +678,8 @@ def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
)

if self.config.lineage_parse_view_ddl:
for view, view_definition_id in self.view_definition_ids[
project_id
].items():
view_definition = self.view_definitions[view_definition_id]
for view in self.view_refs_by_project[project_id]:
view_definition = self.view_definitions[view]
raw_view_lineage = sqlglot_lineage(
view_definition,
schema_resolver=self.sql_parser_schema_resolver,
Expand Down Expand Up @@ -896,10 +888,9 @@ def _process_view(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
self.table_refs.add(table_ref)
if self.config.lineage_parse_view_ddl:
view_definition_id = _generate_sql_id(view.view_definition)
self.view_definition_ids[project_id][table_ref] = view_definition_id
self.view_definitions[view_definition_id] = view.view_definition
if self.config.lineage_parse_view_ddl and view.view_definition:
self.view_refs_by_project[project_id].add(table_ref)
self.view_definitions[table_ref] = view.view_definition

view.column_count = len(columns)
if not view.column_count:
Expand Down Expand Up @@ -989,7 +980,7 @@ def gen_view_dataset_workunits(
view_properties_aspect = ViewProperties(
materialized=view.materialized,
viewLanguage="SQL",
viewLogic=view_definition_string,
viewLogic=view_definition_string or "",
)
yield MetadataChangeProposalWrapper(
entityUrn=self.gen_dataset_urn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,8 @@ def _process_schema(
if self.config.parse_view_ddl:
for view in views:
key = self.get_dataset_identifier(view.name, schema_name, db_name)
self.view_definitions[key] = view.view_definition
if view.view_definition:
self.view_definitions[key] = view.view_definition

if self.config.include_technical_schema or self.config.parse_view_ddl:
for view in views:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class BaseView:
comment: Optional[str]
created: Optional[datetime]
last_altered: Optional[datetime]
view_definition: str
view_definition: Optional[str]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would `view_definition` be null?

size_in_bytes: Optional[int] = None
rows_count: Optional[int] = None
column_count: Optional[int] = None
Expand Down
Loading