From 266fb941cecae146fa7494707da876b9f2fedbfc Mon Sep 17 00:00:00 2001
From: Rishab Ramanathan
Date: Sat, 5 Oct 2024 15:02:13 -0700
Subject: [PATCH] fix: remove async uploads

---
 src/openlayer/lib/data/__init__.py         |  2 -
 src/openlayer/lib/data/batch_inferences.py | 48 +++++-----------
 2 files changed, 10 insertions(+), 40 deletions(-)

diff --git a/src/openlayer/lib/data/__init__.py b/src/openlayer/lib/data/__init__.py
index 5072e313..a4e035ff 100644
--- a/src/openlayer/lib/data/__init__.py
+++ b/src/openlayer/lib/data/__init__.py
@@ -4,7 +4,6 @@
     "StorageType",
     "upload_reference_dataframe",
     "upload_batch_inferences",
-    "upload_batch_inferences_async",
     "update_batch_inferences",
 ]
 
@@ -12,6 +11,5 @@
 from .batch_inferences import (
     update_batch_inferences,
     upload_batch_inferences,
-    upload_batch_inferences_async,
 )
 from .reference_dataset import upload_reference_dataframe
diff --git a/src/openlayer/lib/data/batch_inferences.py b/src/openlayer/lib/data/batch_inferences.py
index c8821c1a..77172ab0 100644
--- a/src/openlayer/lib/data/batch_inferences.py
+++ b/src/openlayer/lib/data/batch_inferences.py
@@ -1,6 +1,7 @@
 """Upload a batch of inferences to the Openlayer platform."""
 
 import time
+import logging
 import tempfile
 from typing import Optional
 
@@ -12,10 +13,11 @@
 from ... import Openlayer
 from ..._utils import maybe_transform
 from ...types.inference_pipelines import data_stream_params
-import asyncio
 
+log: logging.Logger = logging.getLogger(__name__)
 
-async def upload_batch_inferences_async(
+
+def upload_batch_inferences(
     client: Openlayer,
     inference_pipeline_id: str,
     config: data_stream_params.Config,
@@ -23,7 +25,6 @@ async def upload_batch_inferences_async(
     dataset_path: Optional[str] = None,
     storage_type: Optional[StorageType] = None,
     merge: bool = False,
-    verbose: bool = False,
 ) -> None:
     """Uploads a batch of inferences to the Openlayer platform."""
     if dataset_df is None and dataset_path is None:
@@ -45,16 +46,9 @@
         # writer
         if dataset_df is not None:
             temp_file_path = f"{tmp_dir}/dataset.arrow"
-            if verbose:
-                print("Converting DataFrame to pyarrow Table...")
             pa_table = pa.Table.from_pandas(dataset_df)
             pa_schema = pa_table.schema
 
-            if verbose:
-                print(
-                    "Writing Arrow Table using RecordBatchStreamWriter to "
-                    f"{temp_file_path}"
-                )
             with pa.ipc.RecordBatchStreamWriter(temp_file_path, pa_schema) as writer:
                 writer.write_table(pa_table, max_chunksize=16384)
         else:
@@ -64,14 +58,15 @@
         # camelCase the config
         config = maybe_transform(config, data_stream_params.Config)
 
-        # Upload tarball to storage
-        if verbose:
-            print("Uploading dataset to storage via presigned URL...")
-        uploader.upload(
+        # Upload file to Openlayer storage
+        log.info("Uploading file to Openlayer")
+        response = uploader.upload(
             file_path=temp_file_path,
             object_name=object_name,
             presigned_url_response=presigned_url_response,
         )
+        if response.status_code != 200:
+            raise ValueError(f"Failed to upload file to storage: {response.text}")
 
     # Notify the backend
     client.post(
@@ -83,30 +78,7 @@
             "config": config,
         },
     )
-
-
-def upload_batch_inferences(
-    client: Openlayer,
-    inference_pipeline_id: str,
-    config: data_stream_params.Config,
-    dataset_df: Optional[pd.DataFrame] = None,
-    dataset_path: Optional[str] = None,
-    storage_type: Optional[StorageType] = None,
-    merge: bool = False,
-    verbose: bool = False,
-) -> None:
-    asyncio.run(
-        upload_batch_inferences_async(
-            client,
-            inference_pipeline_id,
-            config,
-            dataset_df,
-            dataset_path,
-            storage_type,
-            merge,
-            verbose,
-        )
-    )
+    log.info("Success! Uploaded batch inferences")
 
 
 def update_batch_inferences(
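
Usage note: after this patch, upload_batch_inferences is a plain synchronous
call. It converts the DataFrame to an Arrow IPC stream, uploads it via a
presigned URL, raises ValueError on a non-200 storage response, and notifies
the backend before returning. Below is a minimal sketch of calling the new
signature; the pipeline id, DataFrame columns, and config values are
illustrative placeholders, not values taken from this patch.

    import os

    import pandas as pd

    from openlayer import Openlayer
    from openlayer.lib.data import upload_batch_inferences

    # The client reads OPENLAYER_API_KEY from the environment by default.
    client = Openlayer()

    # Hypothetical batch of inferences; column names are placeholders.
    df = pd.DataFrame(
        {
            "user_query": ["How do I upload inferences?"],
            "output": ["Call upload_batch_inferences()."],
        }
    )

    # Blocks until the Arrow file is uploaded and the backend is notified;
    # raises ValueError if the storage upload does not return HTTP 200.
    upload_batch_inferences(
        client=client,
        inference_pipeline_id="YOUR_INFERENCE_PIPELINE_ID",  # placeholder
        dataset_df=df,
        config={
            "input_variable_names": ["user_query"],
            "output_column_name": "output",
        },
    )

Callers that previously awaited upload_batch_inferences_async, or relied on
the removed verbose flag, should switch to this call and configure the
standard logging module to see the new log messages.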