diff --git a/stats.py b/stats.py index 135ba90..a1f6756 100755 --- a/stats.py +++ b/stats.py @@ -462,8 +462,8 @@ def create_staging_tables( staging table. """ return { - table.full_table_id: bq_client.create_table( - bigquery.Table(staging_table_id(table.full_table_id), schema=table.schema), + sql_table_id(table): bq_client.create_table( + bigquery.Table(staging_table_id(sql_table_id(table)), schema=table.schema), exists_ok=True, ) for table in tables.values() @@ -512,6 +512,15 @@ def staging_table_id(table_id: str) -> str: return f"{table_id}_staging" +def sql_table_id(table: bigquery.Table) -> str: + """Return a fully-qualified ID in standard SQL format. + + Return in the format `project.dataset_id.table_id`, since `Table.full_table_id` + returns as `project:dataset_id.table_id`. + """ + return f"{table.project}.{table.dataset_id}.{table.table_id}" + + def merge_into_bigquery( bq_client: bigquery.Client, table_id: str, @@ -560,11 +569,11 @@ def submit_to_bigquery( # Insert rows into staging table. for chunk in chunked(rows, 500): - errors = bq_client.insert_rows_json(table.full_table_id, chunk) + errors = bq_client.insert_rows_json(sql_table_id(table), chunk) if errors: logging.error( "Encountered errors while inserting rows to " - f"{table.full_table_id}: {errors}." + f"{sql_table_id(table)}: {errors}." ) sys.exit(1)