feat: add async batch uploads & improve client-side upload latency
whoseoyster committed Oct 3, 2024
1 parent a9cf640 commit d7b8a85
Showing 4 changed files with 61 additions and 23 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -16,7 +16,9 @@ dependencies = [
"sniffio",
"cached-property; python_version < '3.8'",
"pandas; python_version >= '3.7'",
"pyarrow>=11.0.0",
"pyyaml>=6.0",
"requests_toolbelt>=1.0.0",
]
requires-python = ">= 3.7"
classifiers = [
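
The Arrow serialization path introduced in this commit makes pyarrow a hard runtime dependency. A quick, illustrative sanity check for an existing environment (not part of the SDK):

import pyarrow as pa

# pyproject.toml now requires pyarrow>=11.0.0 for the IPC stream writer used below.
print(pa.__version__)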
7 changes: 6 additions & 1 deletion src/openlayer/lib/data/__init__.py
@@ -4,9 +4,14 @@
"StorageType",
"upload_reference_dataframe",
"upload_batch_inferences",
"upload_batch_inferences_async",
"update_batch_inferences",
]

from ._upload import StorageType
from .batch_inferences import update_batch_inferences, upload_batch_inferences
from .batch_inferences import (
update_batch_inferences,
upload_batch_inferences,
upload_batch_inferences_async,
)
from .reference_dataset import upload_reference_dataframe
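
For reference, a minimal usage sketch of the newly exported coroutine; the pipeline id is a placeholder and the config keys are illustrative (see data_stream_params.Config for the accepted fields):

import asyncio

import pandas as pd

from openlayer import Openlayer
from openlayer.lib.data import upload_batch_inferences_async

client = Openlayer()  # assumes OPENLAYER_API_KEY is set in the environment

df = pd.DataFrame({"input": ["What is Openlayer?"], "output": ["An evaluation platform."]})

asyncio.run(
    upload_batch_inferences_async(
        client,
        inference_pipeline_id="YOUR_PIPELINE_ID",  # placeholder
        config={"output_column_name": "output"},  # illustrative config
        dataset_df=df,
    )
)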
1 change: 0 additions & 1 deletion src/openlayer/lib/data/_upload.py
@@ -5,7 +5,6 @@
"""

import os
import shutil
from enum import Enum
from typing import Optional

74 changes: 53 additions & 21 deletions src/openlayer/lib/data/batch_inferences.py
@@ -1,30 +1,29 @@
"""Upload a batch of inferences to the Openlayer platform."""

import os
import time
import shutil
import tarfile
import tempfile
from typing import Optional

import httpx
import pandas as pd
import pyarrow as pa

from . import StorageType, _upload
from .. import utils
from ... import Openlayer
from ..._utils import maybe_transform
from ...types.inference_pipelines import data_stream_params
import asyncio


def upload_batch_inferences(
async def upload_batch_inferences_async(
client: Openlayer,
inference_pipeline_id: str,
config: data_stream_params.Config,
dataset_df: Optional[pd.DataFrame] = None,
dataset_path: Optional[str] = None,
storage_type: Optional[StorageType] = None,
merge: bool = False,
verbose: bool = False,
) -> None:
"""Uploads a batch of inferences to the Openlayer platform."""
if dataset_df is None and dataset_path is None:
@@ -33,7 +32,7 @@ def upload_batch_inferences(
raise ValueError("Only one of dataset_df or dataset_path should be provided.")

uploader = _upload.Uploader(client, storage_type)
object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.tar.gz"
object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.arrow"

# Fetch presigned url
presigned_url_response = client.storage.presigned_url.create(
@@ -42,26 +41,34 @@

# Write dataset and config to temp directory
with tempfile.TemporaryDirectory() as tmp_dir:
temp_file_path = f"{tmp_dir}/dataset.csv"
# If DataFrame is provided, convert it to Arrow Table and write it using IPC
# writer
if dataset_df is not None:
dataset_df.to_csv(temp_file_path, index=False)
else:
shutil.copy(dataset_path, temp_file_path)
temp_file_path = f"{tmp_dir}/dataset.arrow"
if verbose:
print("Converting DataFrame to pyarrow Table...")
pa_table = pa.Table.from_pandas(dataset_df)
pa_schema = pa_table.schema

# Copy relevant files to tmp dir
config["label"] = "production"
utils.write_yaml(
maybe_transform(config, data_stream_params.Config),
f"{tmp_dir}/dataset_config.yaml",
)
if verbose:
print(
"Writing Arrow Table using RecordBatchStreamWriter to "
f"{temp_file_path}"
)
with pa.ipc.RecordBatchStreamWriter(temp_file_path, pa_schema) as writer:
writer.write_table(pa_table, max_chunksize=16384)
else:
object_name = f"batch_data_{time.time()}_{inference_pipeline_id}.csv"
temp_file_path = dataset_path

tar_file_path = os.path.join(tmp_dir, object_name)
with tarfile.open(tar_file_path, mode="w:gz") as tar:
tar.add(tmp_dir, arcname=os.path.basename("monitoring_data"))
# camelCase the config
config = maybe_transform(config, data_stream_params.Config)

# Upload to storage
# Upload dataset file to storage
if verbose:
print("Uploading dataset to storage via presigned URL...")
uploader.upload(
file_path=tar_file_path,
file_path=temp_file_path,
object_name=object_name,
presigned_url_response=presigned_url_response,
)
@@ -73,10 +80,35 @@
body={
"storageUri": presigned_url_response.storage_uri,
"performDataMerge": merge,
"config": config,
},
)


def upload_batch_inferences(
client: Openlayer,
inference_pipeline_id: str,
config: data_stream_params.Config,
dataset_df: Optional[pd.DataFrame] = None,
dataset_path: Optional[str] = None,
storage_type: Optional[StorageType] = None,
merge: bool = False,
verbose: bool = False,
) -> None:
asyncio.run(
upload_batch_inferences_async(
client,
inference_pipeline_id,
config,
dataset_df,
dataset_path,
storage_type,
merge,
verbose,
)
)


def update_batch_inferences(
client: Openlayer,
inference_pipeline_id: str,
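
The core of the latency win is replacing the CSV + tar.gz round trip with Arrow's IPC stream format, written in bounded-size record batches. A self-contained sketch of that pattern with a read-back check (the helper name is illustrative, not part of the SDK):

import pandas as pd
import pyarrow as pa


def write_dataframe_as_arrow(df: pd.DataFrame, path: str) -> None:
    """Write a DataFrame to an Arrow IPC stream file in 16,384-row batches."""
    table = pa.Table.from_pandas(df)
    with pa.ipc.RecordBatchStreamWriter(path, table.schema) as writer:
        # Chunking bounds peak memory while serializing large frames.
        writer.write_table(table, max_chunksize=16384)


df = pd.DataFrame({"x": range(100_000)})
write_dataframe_as_arrow(df, "dataset.arrow")

# Read the stream back to confirm the roundtrip.
with pa.OSFile("dataset.arrow", "rb") as source:
    roundtrip = pa.ipc.open_stream(source).read_all().to_pandas()
assert roundtrip.equals(df)

Note that the synchronous upload_batch_inferences above simply drives the coroutine to completion with asyncio.run. Because asyncio.run raises when called from an already-running event loop (e.g., inside Jupyter), callers in async contexts should await upload_batch_inferences_async directly.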
