Remove temporary tables after data export, add logging #16

Merged 1 commit on Nov 9, 2023
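
The diff below wraps the export in a try/finally block so that the intermediate BigQuery table created by to_bigquery() is always dropped once the Parquet extract to GCS has finished (or failed), and it adds INFO-level log messages around each stage. For orientation, here is a minimal sketch of that cleanup pattern in isolation, assuming a google-cloud-bigquery Client; build_temp_table is a hypothetical stand-in for the step that materializes the temporary table, not a Feast API.

import logging

from google.cloud import bigquery

logger = logging.getLogger(__name__)


def export_with_cleanup(client: bigquery.Client, build_temp_table, gcs_path: str) -> None:
    # build_temp_table is assumed to create a temporary BigQuery table
    # (e.g. via a CTAS query) and return its fully qualified table id.
    table = None
    try:
        logger.info("Starting data export to '%s'", gcs_path)
        table = build_temp_table()

        job_config = bigquery.job.ExtractJobConfig()
        job_config.destination_format = "PARQUET"

        extract_job = client.extract_table(
            table,
            destination_uris=[f"{gcs_path}/*.parquet"],
            job_config=job_config,
        )
        extract_job.result()  # block until the extract job finishes
    finally:
        # Drop the temporary table even if the extract failed; not_found_ok=True
        # avoids an error if it was never created or is already gone.
        if table:
            logger.info("Cleanup: deleting temporary table '%s'", table)
            client.delete_table(table, not_found_ok=True)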
89 changes: 51 additions & 38 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -1,4 +1,5 @@
import contextlib
+import logging
import tempfile
import uuid
from datetime import date, datetime, timedelta
@@ -46,7 +47,6 @@
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import get_user_agent, log_exceptions_and_usage
-
from .bigquery_source import (
    BigQueryLoggingDestination,
    BigQuerySource,
@@ -67,6 +67,8 @@

    raise FeastExtrasDependencyImportError("gcp", str(e))

+logger = logging.getLogger(__name__)
+

def get_http_client_info():
    return http_client_info.ClientInfo(user_agent=get_user_agent())
@@ -575,46 +577,57 @@ def to_remote_storage(self) -> List[str]:

        assert isinstance(self.config.offline_store, BigQueryOfflineStoreConfig)

-        table = self.to_bigquery()
-
-        if self.config.offline_store.gcs_staging_file_size_mb is not None:
-            table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
-            number_of_files = max(
-                1,
-                int(table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb),
-            )
-            destination_uris = [
-                f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
-            ]
-        else:
-            destination_uris = [f"{self._gcs_path}/*.parquet"]
+        table = None
+        try:
+            logger.info(f"Starting data export to '{self._gcs_path}'")
+            table = self.to_bigquery()
+            logger.info(f"Data exported to table '{table}'")
+
+            if self.config.offline_store.gcs_staging_file_size_mb is not None:
+                table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
+                number_of_files = max(
+                    1,
+                    int(table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb),
+                )
+                destination_uris = [
+                    f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
+                ]
+            else:
+                destination_uris = [f"{self._gcs_path}/*.parquet"]

-        job_config = bigquery.job.ExtractJobConfig()
-        job_config.destination_format = "PARQUET"
+            job_config = bigquery.job.ExtractJobConfig()
+            job_config.destination_format = "PARQUET"

-        extract_job = self.client.extract_table(
-            table,
-            destination_uris=destination_uris,
-            location=self.config.offline_store.location,
-            job_config=job_config,
-        )
-        extract_job.result()
+            logger.info(f"Starting data extraction from '{table}' to '{self._gcs_path}'")
+            extract_job = self.client.extract_table(
+                table,
+                destination_uris=destination_uris,
+                location=self.config.offline_store.location,
+                job_config=job_config,
+            )
+            extract_job.result()

-        bucket: str
-        prefix: str
-        if self.config.offline_store.billing_project_id:
-            storage_client = StorageClient(project=self.config.offline_store.project_id)
-        else:
-            storage_client = StorageClient(project=self.client.project)
-        bucket, prefix = self._gcs_path[len("gs://") :].split("/", 1)
-        if prefix.startswith("/"):
-            prefix = prefix[1:]
-
-        blobs = storage_client.list_blobs(bucket, prefix=prefix)
-        results = []
-        for b in blobs:
-            results.append(f"gs://{b.bucket.name}/{b.name}")
-        return results
+            bucket: str
+            prefix: str
+            if self.config.offline_store.billing_project_id:
+                storage_client = StorageClient(project=self.config.offline_store.project_id)
+            else:
+                storage_client = StorageClient(project=self.client.project)
+            bucket, prefix = self._gcs_path[len("gs://"):].split("/", 1)
+            if prefix.startswith("/"):
+                prefix = prefix[1:]
+
+            blobs = storage_client.list_blobs(bucket, prefix=prefix)
+            results = []
+            for b in blobs:
+                results.append(f"gs://{b.bucket.name}/{b.name}")
+
+            logger.info(f"Data extraction completed. Extracted to {len(results)} files")
+            return results
+        finally:
+            if table:
+                logger.info(f"Cleanup: Deleting temporary table '{table}'")
+                self.client.delete_table(table=table, not_found_ok=True)


def block_until_done(
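
Note that the new messages go through a module-level logger at INFO level, so they are only visible if the application embedding Feast configures logging to at least that level. A minimal sketch of one way to surface them, using only the standard library (the retrieval-job call at the end is illustrative, not part of this diff):

import logging

# Emit INFO-level records, including those from the BigQuery offline store module.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logging.getLogger("feast.infra.offline_stores.bigquery").setLevel(logging.INFO)

# Then run the export as usual, e.g. on a BigQueryRetrievalJob returned by
# get_historical_features(...):
# files = job.to_remote_storage()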