feature_request(BREAKING_CHANGE): automatically set parquet_options.enable_list_inference = True when the source format is parquet #2008
My teammate, Micah, commented on a similar Arrow + BigQuery issue with what might be going on. He mentions a feature in BigQuery called "enable_list_inference". We set this automatically in the "load pandas DataFrame into BigQuery" code path (python-bigquery/google/cloud/bigquery/client.py, line 2730 in ea69fe3), and we also enable compliant nested types in that same code path (python-bigquery/google/cloud/bigquery/client.py, line 2815 in ea69fe3).

For parquet files from polars, it'd be good to double check that the parquet schemas produced are compatible with BigQuery via "compliant nested types".
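As a minimal sketch of that check (the column name "foo" and the in-memory buffers are just illustrative), one can write the same nested data with both engines and print the resulting Parquet-level schemas:

from io import BytesIO

import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq

data = {"foo": [[1, 2], [3, 4]]}

# Schema of a parquet file written by the polars engine.
polars_buffer = BytesIO()
pl.DataFrame(data).write_parquet(polars_buffer)
polars_buffer.seek(0)
print(pq.ParquetFile(polars_buffer).schema)

# Schema of a parquet file written by pyarrow with compliant nested types
# (the default in recent pyarrow releases). A compliant list column prints
# as the 3-level structure from the Parquet spec:
#   optional group foo (List) {
#     repeated group list {
#       optional int64 element;
#     }
#   }
arrow_buffer = BytesIO()
pq.write_table(pa.Table.from_pydict(data), arrow_buffer, use_compliant_nested_type=True)
arrow_buffer.seek(0)
print(pq.ParquetFile(arrow_buffer).schema)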
In https://gist.github.com/tswast/4e2fb2cca1c1fecf8fb697e94102358f, I have confirmed that parquet files created with the polars write engine use the "compliant" schema, which is equivalent to a list of repeated groups with a single field named "element". (See the Parquet spec.) Modifying your code samples to add

job_config = bigquery.LoadJobConfig(...)
parquet_options = bigquery.ParquetOptions()
parquet_options.enable_list_inference = True
job_config.parquet_options = parquet_options

both ways (pyarrow and polars) succeed.

Code:

from io import BytesIO
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery
import polars as pl

PROJECT = "swast-scratch"


def create_and_return_table(table_name: str, client: bigquery.Client) -> bigquery.Table:
    schema = [bigquery.SchemaField("foo", "INTEGER", mode="REPEATED")]
    table = bigquery.Table(f"{PROJECT}.testing.{table_name}", schema=schema)
    client.delete_table(table, not_found_ok=True)
    return client.create_table(table)


def polars_way(table: bigquery.Table, client: bigquery.Client):
    df = pl.DataFrame({"foo": [[1, 2], [3, 4]]})

    with BytesIO() as stream:
        df.write_parquet(stream)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        parquet_options = bigquery.ParquetOptions()
        parquet_options.enable_list_inference = True
        job_config.parquet_options = parquet_options

        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )
        job.result()


def pyarrow_way(table: bigquery.Table, client: bigquery.Client):
    pyarrow_schema = pa.schema([pa.field("foo", pa.large_list(pa.int64()))])
    pyarrow_table = pa.Table.from_pydict(
        {"foo": [[1, 2], [3, 4]]}, schema=pyarrow_schema
    )

    with BytesIO() as stream:
        writer = pq.ParquetWriter(stream, pyarrow_schema)
        writer.write(pyarrow_table)
        writer.close()

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
            # Default option, but make it explicit
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            # If the schema is provided, the operation succeeds, but the data is not
            # correctly inserted. Empty lists are inserted instead.
            # schema=table.schema,
        )
        parquet_options = bigquery.ParquetOptions()
        parquet_options.enable_list_inference = True
        job_config.parquet_options = parquet_options

        job = client.load_table_from_file(
            stream,
            destination=table,
            rewind=True,
            job_config=job_config,
        )
        job.result()


def main():
    client = bigquery.Client()

    table = create_and_return_table("test_pl", client)
    polars_way(table, client)
    print(client.list_rows(table).to_arrow())

    table = create_and_return_table("test_pa", client)
    pyarrow_way(table, client)
    print(client.list_rows(table).to_arrow())


if __name__ == "__main__":
    main()

Output:
All that said, I think this issue can be converted to a feature request to change the default: automatically set parquet_options.enable_list_inference = True when the source format is parquet. This would be a breaking change, so it should only happen whenever we do a 4.0.0 release.
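To make the proposal concrete, here is a hypothetical sketch (the helper name is made up; this is not the library's implementation) of the behavior such a default could have:

from google.cloud import bigquery


def with_parquet_defaults(job_config: bigquery.LoadJobConfig) -> bigquery.LoadJobConfig:
    # Hypothetical helper, not part of google-cloud-bigquery: mimic the
    # proposed 4.0.0 default by enabling list inference for Parquet loads
    # unless the caller has already supplied their own ParquetOptions.
    if (
        job_config.source_format == bigquery.SourceFormat.PARQUET
        and job_config.parquet_options is None
    ):
        parquet_options = bigquery.ParquetOptions()
        parquet_options.enable_list_inference = True
        job_config.parquet_options = parquet_options
    return job_config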
Hi @tswast, thank you very much for your work, comments, and PR to the Polars docs!
Environment details

- google-cloud-bigquery version: 3.25.0

Steps to reproduce

- Use client.load_table_from_file with a parquet file written from memory to a BytesIO buffer
- If no schema is provided to bigquery.LoadJobConfig, the operation fails
- If the table schema is provided to bigquery.LoadJobConfig, the operation does not raise, but instead incorrectly inserts empty arrays into the table

Issue details

I am unable to use client.load_table_from_file with a parquet file to append to an existing table with a REPEATED field. This issue is somewhat similar to #1981, except related to REPEATED fields rather than REQUIRED fields.
Code example

Apologies in advance that the example is a bit long. It demonstrates that Parquet files written to BytesIO buffers from both Polars and PyArrow cannot be loaded into a BigQuery table with mode=REPEATED.
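Per the comment above, the original samples were the script shown earlier minus the parquet_options lines. A condensed sketch of the failing polars path (the table id is a placeholder):

from io import BytesIO

import polars as pl
from google.cloud import bigquery

client = bigquery.Client()
# "my-project.testing.test_pl" is a placeholder; any table with a single
# REPEATED INTEGER column "foo" reproduces the problem.
table = client.get_table("my-project.testing.test_pl")

df = pl.DataFrame({"foo": [[1, 2], [3, 4]]})
with BytesIO() as stream:
    df.write_parquet(stream)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        # No parquet_options here -- this is the failing configuration.
    )
    job = client.load_table_from_file(
        stream,
        destination=table,
        rewind=True,
        job_config=job_config,
    )
    job.result()  # raises, since list inference is not enabled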
Stack trace
Both the polars_way and the pyarrow_way raise with the error. Here they both are.