Skip to content

Commit

Permalink
feat: upload less than 20 files via sync endpoints (#142)
Browse files Browse the repository at this point in the history
* feat: use sync upload for less than 100 files

* fix: action

* fix: actions

* fix

* fix

* fix

* fix

* ux: add logs

* fix: request

* fix

* fix: adjust total uploaded files for direct upload

* tests: add unit tests

* feat: reduce to 20 files

* fix: error code types

* fix: logs

* fix: unit tests

* feat: upload texts

* fix: unit tests

* fix: docstrings

* fix

* fix: upload single file

* fix

* fix

* fix: metadata upload

---------

Co-authored-by: Rohan Janjua <[email protected]>
  • Loading branch information
ArzelaAscoIi and rjanjua authored Dec 4, 2023
1 parent ae32df5 commit 119db5c
Show file tree
Hide file tree
Showing 22 changed files with 541 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .github/actions/integration_tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ runs:
steps:
- uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.10"
- name: Install Hatch
shell: bash
run: pip install hatch==${{ env.HATCH_VERSION }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/compliance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:

- uses: actions/setup-python@b55428b1882923874294fa556849718a1d7f2ca5
with:
python-version: 3.8
python-version: "3.10"

- name: Install prod dependencies
run: |
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/continuous-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.10"
- name: Install Hatch
run: pip install hatch==${{ env.HATCH_VERSION }}
- name: Run black
Expand All @@ -28,7 +28,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.10"
- name: Install Hatch
run: pip install hatch==${{ env.HATCH_VERSION }}
- name: Run mypy
Expand All @@ -41,7 +41,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.10"
- name: Install Hatch
run: pip install hatch==${{ env.HATCH_VERSION }}
- name: Run pylint
Expand All @@ -54,7 +54,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.10"
- name: Install Hatch
run: pip install hatch==${{ env.HATCH_VERSION }}
- name: Run mypy
Expand All @@ -67,7 +67,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.10"
- name: Install Hatch
run: pip install hatch==${{ env.HATCH_VERSION }}
- name: Run pydocstyle
Expand Down Expand Up @@ -96,7 +96,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.10"
- name: Install Hatch
run: pip install hatch==${{ env.HATCH_VERSION }}
- name: Run unit tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/merge-queue.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.10"
- name: Install Hatch
run: pip install hatch==${{ env.HATCH_VERSION }}
- name: Build
Expand Down
6 changes: 4 additions & 2 deletions deepset_cloud_sdk/_api/deepset_cloud_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ async def post(
workspace_name: str,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None,
json: Optional[Dict[str, Any]] = None,
files: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None,
timeout_s: int = 20,
) -> Response:
"""Make a POST request to the deepset Cloud API.
Expand All @@ -123,7 +124,8 @@ async def post(
response = await self.client.post(
f"{self.base_url(workspace_name)}/{endpoint}",
params=params or {},
json=data or {},
json=json or {},
data=data or {},
files=files,
headers=self.headers,
timeout=timeout_s,
Expand Down
81 changes: 81 additions & 0 deletions deepset_cloud_sdk/_api/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import datetime
import inspect
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
Expand All @@ -16,14 +17,23 @@
from httpx import codes

from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI
from deepset_cloud_sdk._api.upload_sessions import WriteMode

logger = structlog.get_logger(__name__)


class NotMatchingFileTypeException(Exception):
    """Raised when a file's type does not match the expected type, e.g. a text upload whose file name does not end in ``.txt``."""


class FileNotFoundInDeepsetCloudException(Exception):
    """Raised when a requested file does not exist in deepset Cloud."""


class FailedToUploadFileException(Exception):
    """Raised when uploading a file to deepset Cloud failed, e.g. the API did not answer with HTTP 201 or returned no file ID."""

@dataclass
class File:
"""File primitive from deepset Cloud. This dataclass is used for all file-related operations that don't include thea actual file content."""
Expand Down Expand Up @@ -157,6 +167,77 @@ async def _save_to_disk(self, file_dir: Path, file_name: str, content: bytes) ->
file.write(content)
return new_filename

async def direct_upload_path(
self,
workspace_name: str,
file_path: Union[Path, str],
file_name: Optional[str] = None,
meta: Optional[Dict[str, Any]] = None,
write_mode: WriteMode = WriteMode.KEEP,
) -> UUID:
"""Directly upload a file to deepset Cloud.
:param workspace_name: Name of the workspace to use.
:param file_path: Path to the file to upload.
:param file_name: Name of the file to upload.
:param meta: Meta information to attach to the file.
:return: ID of the uploaded file.
"""
if isinstance(file_path, str):
file_path = Path(file_path)

if file_name is None:
file_name = file_path.name

with file_path.open("rb") as file:
response = await self._deepset_cloud_api.post(
workspace_name,
"files",
files={"file": (file_name, file)},
json={"meta": meta},
params={"write_mode": write_mode.value},
)
if response.status_code != codes.CREATED or response.json().get("file_id") is None:
raise FailedToUploadFileException(
f"Failed to upload file with status code {response.status_code}. response was: {response.text}"
)
file_id: UUID = UUID(response.json()["file_id"])
return file_id

async def direct_upload_text(
self,
workspace_name: str,
text: str,
file_name: str,
meta: Optional[Dict[str, Any]] = None,
write_mode: WriteMode = WriteMode.KEEP,
) -> UUID:
"""Directly upload text to deepset Cloud.
:param workspace_name: Name of the workspace to use.
:param file_path: Path to the file to upload.
:param file_name: Name of the file to upload.
:param meta: Meta information to attach to the file.
:return: ID of the uploaded file.
"""
if not file_name.endswith(".txt"):
raise NotMatchingFileTypeException(
f"File name {file_name} is not a textfile. Please use '.txt' for text uploads."
)

response = await self._deepset_cloud_api.post(
workspace_name,
"files",
data={"text": text, "meta": json.dumps(meta)},
params={"write_mode": write_mode.value, "file_name": file_name},
)
if response.status_code != codes.CREATED or response.json().get("file_id") is None:
raise FailedToUploadFileException(
f"Failed to upload file with status code {response.status_code}. response was: {response.text}"
)
file_id: UUID = UUID(response.json()["file_id"])
return file_id

async def download(
self,
workspace_name: str,
Expand Down
2 changes: 1 addition & 1 deletion deepset_cloud_sdk/_api/upload_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async def create(self, workspace_name: str, write_mode: WriteMode = WriteMode.KE
:return: UploadSession object.
"""
response = await self._deepset_cloud_api.post(
workspace_name=workspace_name, endpoint="upload_sessions", data={"write_mode": write_mode.value}
workspace_name=workspace_name, endpoint="upload_sessions", json={"write_mode": write_mode.value}
)
if response.status_code != codes.CREATED:
logger.error(
Expand Down
2 changes: 1 addition & 1 deletion deepset_cloud_sdk/_s3/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async def _upload_file_with_retries(
file_data = self._build_file_data(content, aws_safe_name, aws_config)
async with client_session.post(
redirect_url,
data=file_data,
json=file_data,
allow_redirects=False,
) as response:
response.raise_for_status()
Expand Down
99 changes: 96 additions & 3 deletions deepset_cloud_sdk/_service/files_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from __future__ import annotations

import asyncio
import json
import os
import time
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncGenerator, List, Optional, Union
from typing import Any, AsyncGenerator, Dict, List, Optional, Union
from uuid import UUID

import structlog
Expand All @@ -27,11 +28,14 @@
UploadSessionStatus,
WriteMode,
)
from deepset_cloud_sdk._s3.upload import S3, S3UploadSummary
from deepset_cloud_sdk._s3.upload import S3, S3UploadResult, S3UploadSummary
from deepset_cloud_sdk.models import DeepsetCloudFile

logger = structlog.get_logger(__name__)

ALLOWED_TYPE_SUFFIXES = [".txt", ".pdf"]
DIRECT_UPLOAD_THRESHOLD = 20


class FilesService:
"""Service for all file-related operations."""
Expand Down Expand Up @@ -129,6 +133,40 @@ async def _create_upload_session(
finally:
await self._upload_sessions.close(workspace_name=workspace_name, session_id=upload_session.session_id)

async def _wrapped_direct_upload_path(
self, workspace_name: str, file_path: Path, meta: Dict[str, Any], write_mode: WriteMode
) -> S3UploadResult:
try:
await self._files.direct_upload_path(
workspace_name=workspace_name,
file_path=file_path,
meta=meta,
file_name=file_path.name,
write_mode=write_mode,
)
logger.info("Successfully uploaded file.", file_path=file_path)
return S3UploadResult(file_name=file_path.name, success=True)
except Exception as error:
logger.error("Failed uploading file.", file_path=file_path, error=error)
return S3UploadResult(file_name=file_path.name, success=False, exception=error)

async def _wrapped_direct_upload_text(
self, workspace_name: str, text: str, file_name: str, meta: Dict[str, Any], write_mode: WriteMode
) -> S3UploadResult:
try:
await self._files.direct_upload_text(
workspace_name=workspace_name,
text=text,
meta=meta,
file_name=file_name,
write_mode=write_mode,
)
logger.info("Successfully uploaded file.", file_name=file_name)
return S3UploadResult(file_name=file_name, success=True)
except Exception as error:
logger.error("Failed uploading file.", file_name=file_name, error=error)
return S3UploadResult(file_name=file_name, success=False, exception=error)

async def upload_file_paths(
self,
workspace_name: str,
Expand All @@ -152,6 +190,35 @@ async def upload_file_paths(
:show_progress If True, shows a progress bar for S3 uploads.
:raises TimeoutError: If blocking is True and the ingestion takes longer than timeout_s.
"""
if len(file_paths) <= DIRECT_UPLOAD_THRESHOLD:
logger.info("Uploading files to deepset Cloud.", file_paths=file_paths)
_coroutines = []
_raw_files = [path for path in file_paths if path.suffix.lower() in ALLOWED_TYPE_SUFFIXES]
for file_path in _raw_files:
meta: Dict[str, Any] = {}
meta_path = Path(str(file_path) + ".meta.json")
if meta_path in file_paths:
with meta_path.open("r") as meta_file:
meta = json.loads(meta_file.read())

_coroutines.append(
self._wrapped_direct_upload_path(
workspace_name=workspace_name, file_path=file_path, meta=meta, write_mode=write_mode
)
)
result = await asyncio.gather(*_coroutines)
logger.info(
"Finished uploading files.",
number_of_successful_files=len(_raw_files),
failed_files=[r for r in result if r.success is False],
)
return S3UploadSummary(
total_files=len(_raw_files),
successful_upload_count=len([r for r in result if r.success]),
failed_upload_count=len([r for r in result if r.success is False]),
failed=[r for r in result if r.success is False],
)

# create session to upload files to
async with self._create_upload_session(workspace_name=workspace_name, write_mode=write_mode) as upload_session:
# upload file paths to session
Expand Down Expand Up @@ -248,7 +315,7 @@ def _preprocess_paths(paths: List[Path], spinner: yaspin.Spinner = None, recursi
file_paths = [
path
for path in all_files
if path.is_file() and ((path.suffix in [".txt", ".pdf"]) or path.name.endswith("meta.json"))
if path.is_file() and ((path.suffix in ALLOWED_TYPE_SUFFIXES) or path.name.endswith("meta.json"))
]

if len(file_paths) < len(all_files):
Expand Down Expand Up @@ -430,6 +497,32 @@ async def upload_texts(
:show_progress If True, shows a progress bar for S3 uploads.
:raises TimeoutError: If blocking is True and the ingestion takes longer than timeout_s.
"""
if len(files) <= DIRECT_UPLOAD_THRESHOLD:
logger.info("Uploading files to deepset Cloud.", total_text_files=len(files))
_coroutines = []
for file in files:
_coroutines.append(
self._wrapped_direct_upload_text(
workspace_name=workspace_name,
file_name=file.name,
meta=file.meta or {},
text=file.text,
write_mode=write_mode,
)
)
result = await asyncio.gather(*_coroutines)
logger.info(
"Finished uploading files.",
number_of_successful_files=len(files),
failed_files=[r for r in result if r.success is False],
)
return S3UploadSummary(
total_files=len(files),
successful_upload_count=len([r for r in result if r.success]),
failed_upload_count=len([r for r in result if r.success is False]),
failed=[r for r in result if r.success is False],
)

# create session to upload files to
async with self._create_upload_session(workspace_name=workspace_name, write_mode=write_mode) as upload_session:
upload_summary = await self._s3.upload_texts(
Expand Down
2 changes: 1 addition & 1 deletion docs/_images/favicon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 119db5c

Please sign in to comment.