From f80913dc68683325747e09ae6cfeb878994e525f Mon Sep 17 00:00:00 2001
From: Martynas Jurkus
Date: Thu, 9 Nov 2023 12:49:43 +0200
Subject: [PATCH] Remove temporary tables after data export, add logging (#16)

---
 .../feast/infra/offline_stores/bigquery.py | 89 +++++++++++--------
 1 file changed, 51 insertions(+), 38 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 39c13202b0..8e21503501 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -1,4 +1,5 @@
 import contextlib
+import logging
 import tempfile
 import uuid
 from datetime import date, datetime, timedelta
@@ -46,7 +47,6 @@
 from feast.repo_config import FeastConfigBaseModel, RepoConfig
 from feast.saved_dataset import SavedDatasetStorage
 from feast.usage import get_user_agent, log_exceptions_and_usage
-
 from .bigquery_source import (
     BigQueryLoggingDestination,
     BigQuerySource,
@@ -67,6 +67,8 @@
     raise FeastExtrasDependencyImportError("gcp", str(e))
 
 
+logger = logging.getLogger(__name__)
+
 
 def get_http_client_info():
     return http_client_info.ClientInfo(user_agent=get_user_agent())
@@ -575,46 +577,57 @@ def to_remote_storage(self) -> List[str]:
 
         assert isinstance(self.config.offline_store, BigQueryOfflineStoreConfig)
 
-        table = self.to_bigquery()
-
-        if self.config.offline_store.gcs_staging_file_size_mb is not None:
-            table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
-            number_of_files = max(
-                1,
-                int(table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb),
-            )
-            destination_uris = [
-                f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
-            ]
-        else:
-            destination_uris = [f"{self._gcs_path}/*.parquet"]
+        table = None
+        try:
+            logger.info(f"Starting data export to '{self._gcs_path}'")
+            table = self.to_bigquery()
+            logger.info(f"Data exported to table '{table}'")
+
+            if self.config.offline_store.gcs_staging_file_size_mb is not None:
+                table_size_in_mb = self.client.get_table(table).num_bytes / 1024 / 1024
+                number_of_files = max(
+                    1,
+                    int(table_size_in_mb // self.config.offline_store.gcs_staging_file_size_mb),
+                )
+                destination_uris = [
+                    f"{self._gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
+                ]
+            else:
+                destination_uris = [f"{self._gcs_path}/*.parquet"]
 
-        job_config = bigquery.job.ExtractJobConfig()
-        job_config.destination_format = "PARQUET"
+            job_config = bigquery.job.ExtractJobConfig()
+            job_config.destination_format = "PARQUET"
 
-        extract_job = self.client.extract_table(
-            table,
-            destination_uris=destination_uris,
-            location=self.config.offline_store.location,
-            job_config=job_config,
-        )
-        extract_job.result()
+            logger.info(f"Starting data extraction from '{table}' to '{self._gcs_path}'")
+            extract_job = self.client.extract_table(
+                table,
+                destination_uris=destination_uris,
+                location=self.config.offline_store.location,
+                job_config=job_config,
+            )
+            extract_job.result()
 
-        bucket: str
-        prefix: str
-        if self.config.offline_store.billing_project_id:
-            storage_client = StorageClient(project=self.config.offline_store.project_id)
-        else:
-            storage_client = StorageClient(project=self.client.project)
-        bucket, prefix = self._gcs_path[len("gs://") :].split("/", 1)
-        if prefix.startswith("/"):
-            prefix = prefix[1:]
-
-        blobs = storage_client.list_blobs(bucket, prefix=prefix)
-        results = []
-        for b in blobs:
-            results.append(f"gs://{b.bucket.name}/{b.name}")
-        return results
+            bucket: str
+            prefix: str
+            if self.config.offline_store.billing_project_id:
+                storage_client = StorageClient(project=self.config.offline_store.project_id)
+            else:
+                storage_client = StorageClient(project=self.client.project)
+            bucket, prefix = self._gcs_path[len("gs://"):].split("/", 1)
+            if prefix.startswith("/"):
+                prefix = prefix[1:]
+
+            blobs = storage_client.list_blobs(bucket, prefix=prefix)
+            results = []
+            for b in blobs:
+                results.append(f"gs://{b.bucket.name}/{b.name}")
+
+            logger.info(f"Data extraction completed. Extracted to {len(results)} files")
+            return results
+        finally:
+            if table:
+                logger.info(f"Cleanup: Deleting temporary table '{table}'")
+                self.client.delete_table(table=table, not_found_ok=True)
 
 
 def block_until_done(
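
The flow the patch introduces — export to a temporary BigQuery table, extract it to Parquet files on GCS (optionally sharded by gcs_staging_file_size_mb), then always drop the table — can be sketched in isolation. The sketch below is a minimal illustration, not the Feast implementation: the function name export_and_cleanup, its parameters, and the example table/bucket names are placeholders invented for the example; only google-cloud-bigquery calls that also appear in the patch (get_table, extract_table, delete_table) are used.

    # Minimal sketch of the export-then-cleanup pattern, assuming a temporary
    # BigQuery table already holds the data to be staged. Names are placeholders.
    from typing import Optional

    from google.cloud import bigquery


    def export_and_cleanup(
        client: bigquery.Client,
        table_id: str,       # e.g. "project.dataset.tmp_table" (placeholder)
        gcs_path: str,       # e.g. "gs://bucket/staging/run-1" (placeholder)
        staging_file_size_mb: Optional[int] = None,
    ) -> None:
        try:
            if staging_file_size_mb is not None:
                # Shard the extract so each Parquet file is roughly the target size.
                table_size_in_mb = client.get_table(table_id).num_bytes / 1024 / 1024
                number_of_files = max(1, int(table_size_in_mb // staging_file_size_mb))
                destination_uris = [
                    f"{gcs_path}/{n:0>12}.parquet" for n in range(number_of_files)
                ]
            else:
                # A single wildcard URI lets BigQuery decide how many files to write.
                destination_uris = [f"{gcs_path}/*.parquet"]

            job_config = bigquery.job.ExtractJobConfig()
            job_config.destination_format = "PARQUET"
            client.extract_table(
                table_id, destination_uris=destination_uris, job_config=job_config
            ).result()
        finally:
            # Runs whether or not the extract succeeded, so the staging table
            # never lingers; not_found_ok avoids an error if it was never created.
            client.delete_table(table_id, not_found_ok=True)

Deleting in a finally block with not_found_ok=True is what keeps staging tables from accumulating when the extract job fails, and it tolerates the case where to_bigquery() raised before the table existed. The patched to_remote_storage additionally lists the written blobs with the google-cloud-storage client so it can return concrete gs:// file paths rather than the wildcard pattern.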