Skip to content

Commit

Permalink
fix: remove async uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
whoseoyster committed Oct 5, 2024
1 parent 1e446eb commit 266fb94
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 40 deletions.
2 changes: 0 additions & 2 deletions src/openlayer/lib/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
"StorageType",
"upload_reference_dataframe",
"upload_batch_inferences",
"upload_batch_inferences_async",
"update_batch_inferences",
]

from ._upload import StorageType
from .batch_inferences import (
update_batch_inferences,
upload_batch_inferences,
upload_batch_inferences_async,
)
from .reference_dataset import upload_reference_dataframe
48 changes: 10 additions & 38 deletions src/openlayer/lib/data/batch_inferences.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Upload a batch of inferences to the Openlayer platform."""

import time
import logging
import tempfile
from typing import Optional

Expand All @@ -12,18 +13,18 @@
from ... import Openlayer
from ..._utils import maybe_transform
from ...types.inference_pipelines import data_stream_params
import asyncio

log: logging.Logger = logging.getLogger(__name__)

async def upload_batch_inferences_async(

def upload_batch_inferences(
client: Openlayer,
inference_pipeline_id: str,
config: data_stream_params.Config,
dataset_df: Optional[pd.DataFrame] = None,
dataset_path: Optional[str] = None,
storage_type: Optional[StorageType] = None,
merge: bool = False,
verbose: bool = False,
) -> None:
"""Uploads a batch of inferences to the Openlayer platform."""
if dataset_df is None and dataset_path is None:
Expand All @@ -45,16 +46,9 @@ async def upload_batch_inferences_async(
# writer
if dataset_df is not None:
temp_file_path = f"{tmp_dir}/dataset.arrow"
if verbose:
print("Converting DataFrame to pyarrow Table...")
pa_table = pa.Table.from_pandas(dataset_df)
pa_schema = pa_table.schema

if verbose:
print(
"Writing Arrow Table using RecordBatchStreamWriter to "
f"{temp_file_path}"
)
with pa.ipc.RecordBatchStreamWriter(temp_file_path, pa_schema) as writer:
writer.write_table(pa_table, max_chunksize=16384)
else:
Expand All @@ -64,14 +58,15 @@ async def upload_batch_inferences_async(
# camelCase the config
config = maybe_transform(config, data_stream_params.Config)

# Upload tarball to storage
if verbose:
print("Uploading dataset to storage via presigned URL...")
uploader.upload(
# Upload file to Openlayer storage
log.info("Uploading file to Openlayer")
response = uploader.upload(
file_path=temp_file_path,
object_name=object_name,
presigned_url_response=presigned_url_response,
)
if response.status_code != 200:
raise ValueError(f"Failed to upload file to storage: {response.text}")

# Notify the backend
client.post(
Expand All @@ -83,30 +78,7 @@ async def upload_batch_inferences_async(
"config": config,
},
)


def upload_batch_inferences(
client: Openlayer,
inference_pipeline_id: str,
config: data_stream_params.Config,
dataset_df: Optional[pd.DataFrame] = None,
dataset_path: Optional[str] = None,
storage_type: Optional[StorageType] = None,
merge: bool = False,
verbose: bool = False,
) -> None:
asyncio.run(
upload_batch_inferences_async(
client,
inference_pipeline_id,
config,
dataset_df,
dataset_path,
storage_type,
merge,
verbose,
)
)
log.info("Success! Uploaded batch inferences")


def update_batch_inferences(
Expand Down

0 comments on commit 266fb94

Please sign in to comment.