feat(ingestion/bigquery): Use sqlglot_lineage for usage and add more perf timers (#9247)

Co-authored-by: Andrew Sikowitz <[email protected]>
shubhamjagtap639 and asikowitz authored Dec 28, 2023
1 parent 4efa46f commit 3635c1c
Showing 13 changed files with 159 additions and 505 deletions.
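At its core, the commit swaps the old sqllineage-based BigQuerySQLParser out of the usage extractor and instead resolves the tables read by a query with sqlglot_lineage, using a SchemaResolver seeded from the ingested schemas. A minimal sketch of the call shape follows; the import path and keyword arguments are taken from the diff below, the query text and project/dataset names are hypothetical, and running it requires acryl-datahub with its sqlglot parsing dependencies installed.

from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage

# An empty resolver still lets the parser qualify table names; in the source it
# is pre-populated via _init_schema_resolver() when schema ingestion is enabled.
schema_resolver = SchemaResolver(platform="bigquery")

result = sqlglot_lineage(
    "SELECT id FROM my_dataset.my_view",  # hypothetical query from the audit log
    schema_resolver,
    default_db="my-project",              # hypothetical default project
    default_schema="my_dataset",          # hypothetical default dataset
)
print(result.in_tables)  # upstream dataset URNs, later mapped back to BigQueryTableRef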
2 changes: 0 additions & 2 deletions metadata-ingestion/setup.py
@@ -295,8 +295,6 @@
"bigquery": sql_common
| bigquery_common
| {
# TODO: I doubt we need all three sql parsing libraries.
*sqllineage_lib,
*sqlglot_lib,
"sqlalchemy-bigquery>=1.4.1",
"google-cloud-datacatalog-lineage==0.2.2",
@@ -221,6 +221,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
)
self.sql_parser_schema_resolver = self._init_schema_resolver()

redundant_lineage_run_skip_handler: Optional[
RedundantLineageRunSkipHandler
@@ -253,6 +254,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
self.usage_extractor = BigQueryUsageExtractor(
config,
self.report,
schema_resolver=self.sql_parser_schema_resolver,
dataset_urn_builder=self.gen_dataset_urn_from_ref,
redundant_run_skip_handler=redundant_usage_run_skip_handler,
)
@@ -283,8 +285,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
# Maps view ref -> actual sql
self.view_definitions: FileBackedDict[str] = FileBackedDict()

self.sql_parser_schema_resolver = self._init_schema_resolver()

self.add_config_to_report()
atexit.register(cleanup, config)

@@ -371,7 +371,10 @@ def usage_capability_test(
report: BigQueryV2Report,
) -> CapabilityReport:
usage_extractor = BigQueryUsageExtractor(
connection_conf, report, lambda ref: ""
connection_conf,
report,
schema_resolver=SchemaResolver(platform="bigquery"),
dataset_urn_builder=lambda ref: "",
)
for project_id in project_ids:
try:
@@ -447,7 +450,9 @@ def _init_schema_resolver(self) -> SchemaResolver:
self.config.lineage_parse_view_ddl or self.config.lineage_use_sql_parser
)
schema_ingestion_enabled = (
self.config.include_views and self.config.include_tables
self.config.include_schema_metadata
and self.config.include_tables
and self.config.include_views
)

if schema_resolution_required and not schema_ingestion_enabled:
@@ -545,10 +550,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if not projects:
return

for project_id in projects:
self.report.set_ingestion_stage(project_id.id, METADATA_EXTRACTION)
logger.info(f"Processing project: {project_id.id}")
yield from self._process_project(project_id)
if self.config.include_schema_metadata:
for project_id in projects:
self.report.set_ingestion_stage(project_id.id, METADATA_EXTRACTION)
logger.info(f"Processing project: {project_id.id}")
yield from self._process_project(project_id)

if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
@@ -12,6 +12,7 @@
get_first_missing_key,
get_first_missing_key_any,
)
from datahub.utilities.urns.dataset_urn import DatasetUrn

AuditLogEntry = Any

@@ -178,6 +179,17 @@ def from_string_name(cls, ref: str) -> "BigQueryTableRef":
raise ValueError(f"invalid BigQuery table reference: {ref}")
return cls(BigqueryTableIdentifier(parts[1], parts[3], parts[5]))

@classmethod
def from_urn(cls, urn: str) -> "BigQueryTableRef":
"""Raises: ValueError if urn is not a valid BigQuery table URN."""
dataset_urn = DatasetUrn.create_from_string(urn)
split = dataset_urn.get_dataset_name().rsplit(".", 3)
if len(split) == 3:
project, dataset, table = split
else:
_, project, dataset, table = split
return cls(BigqueryTableIdentifier(project, dataset, table))

def is_temporary_table(self, prefixes: List[str]) -> bool:
for prefix in prefixes:
if self.table_identifier.dataset.startswith(prefix):
@@ -566,7 +578,7 @@ def from_query_event(
query_event: QueryEvent,
debug_include_full_payloads: bool = False,
) -> "ReadEvent":
readEvent = ReadEvent(
return ReadEvent(
actor_email=query_event.actor_email,
timestamp=query_event.timestamp,
resource=read_resource,
@@ -577,8 +589,6 @@ def from_query_event(
from_query=True,
)

return readEvent

@classmethod
def from_exported_bigquery_audit_metadata(
cls, row: BigQueryAuditMetadata, debug_include_full_payloads: bool = False
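The new BigQueryTableRef.from_urn classmethod above recovers the project, dataset, and table from a dataset URN's flat name. A standalone sketch of just that parsing step, with a hypothetical helper name and plain string handling so it runs without datahub installed:

def split_bigquery_dataset_name(dataset_name: str):
    # Mirrors the rsplit(".", 3) in BigQueryTableRef.from_urn: a plain
    # project.dataset.table name yields three parts; a name with one extra
    # leading segment yields four, and the first is discarded. Any other
    # shape raises ValueError via tuple unpacking, matching the docstring.
    parts = dataset_name.rsplit(".", 3)
    if len(parts) == 3:
        project, dataset, table = parts
    else:
        _, project, dataset, table = parts
    return project, dataset, table

# Hypothetical dataset name as it appears inside a BigQuery dataset URN.
print(split_bigquery_dataset_name("my-project.my_dataset.my_table"))
# -> ('my-project', 'my_dataset', 'my_table')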
@@ -94,6 +94,11 @@ class BigQueryV2Config(
description="Regex patterns for project_id to filter in ingestion.",
)

include_schema_metadata: bool = Field(
default=True,
description="Whether to ingest the BigQuery schema, i.e. projects, schemas, tables, and views.",
)

usage: BigQueryUsageConfig = Field(
default=BigQueryUsageConfig(), description="Usage related configs"
)
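The new include_schema_metadata flag allows usage- or lineage-only runs against projects whose schemas were ingested separately; get_workunits_internal (hunk above) only walks projects for metadata extraction when it is set, and _init_schema_resolver checks for the combination of SQL-parser-based lineage with schema ingestion disabled. A hypothetical recipe fragment, written as a Python dict rather than YAML; only include_schema_metadata is introduced by this commit, and the other flags are assumed to be existing BigQuery source options:

# Hypothetical BigQuery source configuration that skips schema extraction while
# keeping usage statistics and lineage; field names other than
# include_schema_metadata are assumed pre-existing options of the source.
bigquery_source = {
    "type": "bigquery",
    "config": {
        "include_schema_metadata": False,  # added in this commit, defaults to True
        "include_usage_statistics": True,
        "include_table_lineage": True,
    },
}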
@@ -33,6 +33,13 @@ class BigQueryAuditLogApiPerfReport(Report):
list_log_entries: PerfTimer = field(default_factory=PerfTimer)


@dataclass
class BigQueryProcessingPerfReport(Report):
sql_parsing_sec: PerfTimer = field(default_factory=PerfTimer)
store_usage_event_sec: PerfTimer = field(default_factory=PerfTimer)
usage_state_size: Optional[str] = None


@dataclass
class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
num_total_lineage_entries: TopKDict[str, int] = field(default_factory=TopKDict)
@@ -120,8 +127,6 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
read_reasons_stat: Counter[str] = field(default_factory=collections.Counter)
operation_types_stat: Counter[str] = field(default_factory=collections.Counter)

usage_state_size: Optional[str] = None

exclude_empty_projects: Optional[bool] = None

schema_api_perf: BigQuerySchemaApiPerfReport = field(
@@ -130,6 +135,9 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
audit_log_api_perf: BigQueryAuditLogApiPerfReport = field(
default_factory=BigQueryAuditLogApiPerfReport
)
processing_perf: BigQueryProcessingPerfReport = field(
default_factory=BigQueryProcessingPerfReport
)

lineage_start_time: Optional[datetime] = None
lineage_end_time: Optional[datetime] = None
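The two new processing_perf timers are ordinary PerfTimer fields; the usage code below enters them as context managers around the SQL-parsing and usage-storage steps, and the serialized report then exposes the measured time. A minimal sketch of that pattern, with the PerfTimer import path taken from this diff, the elapsed_seconds() accessor assumed from the utility's API, and the timed work replaced by a sleep:

import time

from datahub.utilities.perf_timer import PerfTimer

sql_parsing_sec = PerfTimer()

with sql_parsing_sec:   # times the wall-clock duration of the block
    time.sleep(0.1)     # stand-in for a sqlglot_lineage call

print(sql_parsing_sec.elapsed_seconds())  # roughly 0.1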
@@ -35,7 +35,6 @@
AuditEvent,
AuditLogEntry,
BigQueryAuditMetadata,
BigqueryTableIdentifier,
BigQueryTableRef,
QueryEvent,
ReadEvent,
@@ -60,9 +59,9 @@
USAGE_EXTRACTION_USAGE_AGGREGATION,
)
from datahub.metadata.schema_classes import OperationClass, OperationTypeClass
from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser
from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedDict
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage

logger: logging.Logger = logging.getLogger(__name__)

@@ -284,7 +283,7 @@ def delete_original_read_events_for_view_query_events(self) -> None:
)

def report_disk_usage(self, report: BigQueryV2Report) -> None:
report.usage_state_size = str(
report.processing_perf.usage_state_size = str(
{
"main": humanfriendly.format_size(os.path.getsize(self.conn.filename)),
"queries": humanfriendly.format_size(
@@ -310,11 +309,14 @@ def __init__(
self,
config: BigQueryV2Config,
report: BigQueryV2Report,
*,
schema_resolver: SchemaResolver,
dataset_urn_builder: Callable[[BigQueryTableRef], str],
redundant_run_skip_handler: Optional[RedundantUsageRunSkipHandler] = None,
):
self.config: BigQueryV2Config = config
self.report: BigQueryV2Report = report
self.schema_resolver = schema_resolver
self.dataset_urn_builder = dataset_urn_builder
# Replace hash of query with uuid if there are hash conflicts
self.uuid_to_query: Dict[str, str] = {}
@@ -415,10 +417,11 @@ def generate_read_events_from_query(
) -> Iterable[AuditEvent]:
try:
tables = self.get_tables_from_query(
query_event_on_view.project_id,
query_event_on_view.query,
default_project=query_event_on_view.project_id,
default_dataset=query_event_on_view.default_dataset,
)
assert tables is not None and len(tables) != 0
assert len(tables) != 0
for table in tables:
yield AuditEvent.create(
ReadEvent.from_query_event(table, query_event_on_view)
@@ -462,12 +465,15 @@ def _ingest_events(
self.report.num_view_query_events += 1

for new_event in self.generate_read_events_from_query(query_event):
num_generated += self._store_usage_event(
new_event, usage_state, table_refs
)
num_aggregated += self._store_usage_event(
audit_event, usage_state, table_refs
)
with self.report.processing_perf.store_usage_event_sec:
num_generated += self._store_usage_event(
new_event, usage_state, table_refs
)
with self.report.processing_perf.store_usage_event_sec:
num_aggregated += self._store_usage_event(
audit_event, usage_state, table_refs
)

except Exception as e:
logger.warning(
f"Unable to store usage event {audit_event}", exc_info=True
@@ -905,54 +911,38 @@ def _generate_filter(self, corrected_start_time, corrected_end_time):
)

def get_tables_from_query(
self, default_project: str, query: str
) -> Optional[List[BigQueryTableRef]]:
self, query: str, default_project: str, default_dataset: Optional[str] = None
) -> List[BigQueryTableRef]:
"""
This method attempts to parse bigquery objects read in the query
"""
if not query:
return None
return []

parsed_tables = set()
try:
parser = BigQuerySQLParser(
query,
self.config.sql_parser_use_external_process,
use_raw_names=self.config.lineage_sql_parser_use_raw_names,
)
tables = parser.get_tables()
except Exception as ex:
with self.report.processing_perf.sql_parsing_sec:
result = sqlglot_lineage(
query,
self.schema_resolver,
default_db=default_project,
default_schema=default_dataset,
)
except Exception:
logger.debug(
f"Sql parsing failed on this query on view: {query}. "
f"Usage won't be added. The error was {ex}."
f"Sql parsing failed on this query on view: {query}. Usage won't be added."
)
return None
logger.debug(result.debug_info)
return []

for table in tables:
parts = table.split(".")
if len(parts) == 2:
parsed_tables.add(
BigQueryTableRef(
BigqueryTableIdentifier(
project_id=default_project, dataset=parts[0], table=parts[1]
)
).get_sanitized_table_ref()
)
elif len(parts) == 3:
parsed_tables.add(
BigQueryTableRef(
BigqueryTableIdentifier(
project_id=parts[0], dataset=parts[1], table=parts[2]
)
).get_sanitized_table_ref()
)
else:
logger.debug(
f"Invalid table identifier {table} when parsing query on view {query}"
)
parsed_table_refs = []
for urn in result.in_tables:
try:
parsed_table_refs.append(BigQueryTableRef.from_urn(urn))
except ValueError:
logger.debug(f"Invalid urn {urn} when parsing query on view {query}")
self.report.num_view_query_events_failed_table_identification += 1

return list(parsed_tables)
return parsed_table_refs

def _report_error(
self, label: str, e: Exception, group: Optional[str] = None