Skip to content

Commit

Permalink
bigquery: add a function to get the SQL table ID from a Table
Browse files Browse the repository at this point in the history
  • Loading branch information
cgsheeh committed Nov 1, 2024
1 parent 4331bae commit 5d8fa44
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,8 @@ def create_staging_tables(
staging table.
"""
return {
table.full_table_id: bq_client.create_table(
bigquery.Table(staging_table_id(table.full_table_id), schema=table.schema),
sql_table_id(table): bq_client.create_table(
bigquery.Table(staging_table_id(sql_table_id(table)), schema=table.schema),
exists_ok=True,
)
for table in tables.values()
Expand Down Expand Up @@ -512,6 +512,15 @@ def staging_table_id(table_id: str) -> str:
return f"{table_id}_staging"


def sql_table_id(table: bigquery.Table) -> str:
"""Return a fully-qualified ID in standard SQL format.
Return in the format `project.dataset_id.table_id`, since `Table.full_table_id`
returns as `project:dataset_id.table_id`.
"""
return f"{table.project}.{table.dataset_id}.{table.table_id}"


def merge_into_bigquery(
bq_client: bigquery.Client,
table_id: str,
Expand Down Expand Up @@ -560,11 +569,11 @@ def submit_to_bigquery(

# Insert rows into staging table.
for chunk in chunked(rows, 500):
errors = bq_client.insert_rows_json(table.full_table_id, chunk)
errors = bq_client.insert_rows_json(sql_table_id(table), chunk)
if errors:
logging.error(
"Encountered errors while inserting rows to "
f"{table.full_table_id}: {errors}."
f"{sql_table_id(table)}: {errors}."
)
sys.exit(1)

Expand Down

0 comments on commit 5d8fa44

Please sign in to comment.