Refactor: configurable BigQuery offline store export parquet file size (#13)

* Refactor by using BigQueryOfflineStoreConfig

* Remove unused import
KarolisKont authored Nov 8, 2023
1 parent affbc4d commit fd2d8a2
Showing 1 changed file with 11 additions and 20 deletions.
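
With this change, the export file-size limit is read from `BigQueryOfflineStoreConfig` (set under `offline_store` in `feature_store.yaml`) instead of the `BQ_EXPORT_PARQUET_FILE_SIZE_MB` environment variable, and the hand-rolled range checks become pydantic field constraints. A minimal standalone sketch of that validation behavior, assuming only pydantic is installed (the model name is illustrative, not Feast's):

    from typing import Optional

    from pydantic import BaseModel, Field, ValidationError


    class OfflineStoreConfigSketch(BaseModel):
        # Mirrors the new field: optional, with pydantic enforcing 1 <= value <= 1000.
        gcs_staging_file_size_mb: Optional[int] = Field(None, ge=1, le=1000)


    OfflineStoreConfigSketch()                               # unset: BigQuery sizes export files itself
    OfflineStoreConfigSketch(gcs_staging_file_size_mb=256)   # within range: accepted

    try:
        OfflineStoreConfigSketch(gcs_staging_file_size_mb=5000)  # > 1000: rejected at config load
    except ValidationError as err:
        print(err)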
31 changes: 11 additions & 20 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -1,5 +1,4 @@
 import contextlib
-import os
 import tempfile
 import uuid
 from datetime import date, datetime, timedelta
@@ -20,7 +19,7 @@
 import pandas as pd
 import pyarrow
 import pyarrow.parquet
-from pydantic import ConstrainedStr, StrictStr, validator
+from pydantic import ConstrainedStr, Field, StrictStr, validator
 from pydantic.typing import Literal
 from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
 
@@ -104,6 +103,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
     gcs_staging_location: Optional[str] = None
     """ (optional) GCS location used for offloading BigQuery results as parquet files."""
 
+    gcs_staging_file_size_mb: Optional[int] = Field(None, ge=1, le=1000)
+    """ (optional) Specify the staging file size in Megabytes. If it is not set, the BigQuery export function will determine the export file size automatically."""
+
     table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
     """ (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""
 
@@ -571,27 +573,16 @@ def to_remote_storage(self) -> List[str]:
                 "offline store when executing `to_remote_storage()`"
             )
 
-        table = self.to_bigquery()
+        assert isinstance(self.config.offline_store, BigQueryOfflineStoreConfig)
 
-        parquet_file_size_mb = os.getenv("BQ_EXPORT_PARQUET_FILE_SIZE_MB")
-        if parquet_file_size_mb is not None:
-            if not parquet_file_size_mb.isdigit():
-                raise ValueError(
-                    "The value for the BQ_EXPORT_PARQUET_FILE_SIZE_MB environment variable must "
-                    "be a numeric digit, but it was set to: %s",
-                    parquet_file_size_mb,
-                )
-
-            parquet_file_size_mb_int = int(parquet_file_size_mb)
-            if parquet_file_size_mb_int > 1000:
-                raise ValueError(
-                    "The value for the BQ_EXPORT_PARQUET_FILE_SIZE_MB environment variable cannot "
-                    "exceed 1000; however, it was set to: %s.",
-                    parquet_file_size_mb_int,
-                )
+        table = self.to_bigquery()
 
+        if self.config.offline_store.gcs_staging_file_size_mb is not None:
             table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
-            number_of_files = max(1, table_size_in_mb // parquet_file_size_mb_int)
+            number_of_files = max(
+                1,
+                table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb,
+            )
             destination_uris = [
                 f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
            ]
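
For intuition, here is a worked example of the sharding arithmetic in `to_remote_storage()`. All numbers and the bucket path are made up, and the cast to `int` is added here because floor division on a float returns a float:

    # Hypothetical figures: a 10 GiB table exported as ~256 MB parquet files.
    num_bytes = 10 * 1024 * 1024 * 1024           # BigQuery reports table size in bytes
    gcs_staging_file_size_mb = 256                # configured via BigQueryOfflineStoreConfig

    table_size_in_mb = num_bytes / 1024 / 1024    # 10240.0
    number_of_files = max(1, int(table_size_in_mb // gcs_staging_file_size_mb))  # 40

    gcs_path = "gs://example-bucket/staging"      # stand-in for self._gcs_path
    destination_uris = [f"{gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)]

    print(destination_uris[0])    # gs://example-bucket/staging/000000000000.parquet
    print(destination_uris[-1])   # gs://example-bucket/staging/000000000039.parquet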
