diff --git a/.github/actions/integration_tests/action.yml b/.github/actions/integration_tests/action.yml index 0d3a6304..630d1fb2 100644 --- a/.github/actions/integration_tests/action.yml +++ b/.github/actions/integration_tests/action.yml @@ -15,7 +15,7 @@ runs: steps: - uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: Install Hatch shell: bash run: pip install hatch==${{ env.HATCH_VERSION }} diff --git a/.github/workflows/compliance.yml b/.github/workflows/compliance.yml index bc28bec7..2e348a1f 100644 --- a/.github/workflows/compliance.yml +++ b/.github/workflows/compliance.yml @@ -20,7 +20,7 @@ jobs: - uses: actions/setup-python@b55428b1882923874294fa556849718a1d7f2ca5 with: - python-version: 3.8 + python-version: "3.10" - name: Install prod dependencies run: | diff --git a/.github/workflows/continuous-integration.yml b/.github/workflows/continuous-integration.yml index d67c287e..93766589 100644 --- a/.github/workflows/continuous-integration.yml +++ b/.github/workflows/continuous-integration.yml @@ -14,7 +14,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: Install Hatch run: pip install hatch==${{ env.HATCH_VERSION }} - name: Run black @@ -28,7 +28,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: Install Hatch run: pip install hatch==${{ env.HATCH_VERSION }} - name: Run mypy @@ -41,7 +41,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: Install Hatch run: pip install hatch==${{ env.HATCH_VERSION }} - name: Run pylint @@ -54,7 +54,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: Install Hatch run: pip install hatch==${{ env.HATCH_VERSION }} - name: Run mypy @@ -67,7 +67,7 @@ jobs: - 
uses: actions/checkout@v4 - uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: Install Hatch run: pip install hatch==${{ env.HATCH_VERSION }} - name: Run pydocstyle @@ -96,7 +96,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: Install Hatch run: pip install hatch==${{ env.HATCH_VERSION }} - name: Run unit tests diff --git a/.github/workflows/merge-queue.yml b/.github/workflows/merge-queue.yml index ff0ba088..9afa43dc 100644 --- a/.github/workflows/merge-queue.yml +++ b/.github/workflows/merge-queue.yml @@ -43,7 +43,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.10" - name: Install Hatch run: pip install hatch==${{ env.HATCH_VERSION }} - name: Build diff --git a/deepset_cloud_sdk/_api/deepset_cloud_api.py b/deepset_cloud_sdk/_api/deepset_cloud_api.py index 8a7cf0bf..c9fdf73e 100644 --- a/deepset_cloud_sdk/_api/deepset_cloud_api.py +++ b/deepset_cloud_sdk/_api/deepset_cloud_api.py @@ -106,8 +106,9 @@ async def post( workspace_name: str, endpoint: str, params: Optional[Dict[str, Any]] = None, - data: Optional[Dict[str, Any]] = None, + json: Optional[Dict[str, Any]] = None, files: Optional[Dict[str, Any]] = None, + data: Optional[Dict[str, Any]] = None, timeout_s: int = 20, ) -> Response: """Make a POST request to the deepset Cloud API. 
@@ -123,7 +124,8 @@ async def post( response = await self.client.post( f"{self.base_url(workspace_name)}/{endpoint}", params=params or {}, - json=data or {}, + json=json or {}, + data=data or {}, files=files, headers=self.headers, timeout=timeout_s, diff --git a/deepset_cloud_sdk/_api/files.py b/deepset_cloud_sdk/_api/files.py index f264669d..6fcc1def 100644 --- a/deepset_cloud_sdk/_api/files.py +++ b/deepset_cloud_sdk/_api/files.py @@ -7,6 +7,7 @@ import datetime import inspect +import json from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional, Union @@ -16,14 +17,23 @@ from httpx import codes from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI +from deepset_cloud_sdk._api.upload_sessions import WriteMode logger = structlog.get_logger(__name__) +class NotMatchingFileTypeException(Exception): + """Exception raised when a file is not matching the file type.""" + + class FileNotFoundInDeepsetCloudException(Exception): """Exception raised when a file is not found.""" +class FailedToUploadFileException(Exception): + """Exception raised when a file failed to be uploaded.""" + + @dataclass class File: """File primitive from deepset Cloud. This dataclass is used for all file-related operations that don't include thea actual file content.""" @@ -157,6 +167,77 @@ async def _save_to_disk(self, file_dir: Path, file_name: str, content: bytes) -> file.write(content) return new_filename + async def direct_upload_path( + self, + workspace_name: str, + file_path: Union[Path, str], + file_name: Optional[str] = None, + meta: Optional[Dict[str, Any]] = None, + write_mode: WriteMode = WriteMode.KEEP, + ) -> UUID: + """Directly upload a file to deepset Cloud. + + :param workspace_name: Name of the workspace to use. + :param file_path: Path to the file to upload. + :param file_name: Name of the file to upload. + :param meta: Meta information to attach to the file. + :return: ID of the uploaded file. 
+ """ + if isinstance(file_path, str): + file_path = Path(file_path) + + if file_name is None: + file_name = file_path.name + + with file_path.open("rb") as file: + response = await self._deepset_cloud_api.post( + workspace_name, + "files", + files={"file": (file_name, file)}, + json={"meta": meta}, + params={"write_mode": write_mode.value}, + ) + if response.status_code != codes.CREATED or response.json().get("file_id") is None: + raise FailedToUploadFileException( + f"Failed to upload file with status code {response.status_code}. response was: {response.text}" + ) + file_id: UUID = UUID(response.json()["file_id"]) + return file_id + + async def direct_upload_text( + self, + workspace_name: str, + text: str, + file_name: str, + meta: Optional[Dict[str, Any]] = None, + write_mode: WriteMode = WriteMode.KEEP, + ) -> UUID: + """Directly upload text to deepset Cloud. + + :param workspace_name: Name of the workspace to use. + :param text: Text content of the file to upload. + :param file_name: Name of the file to upload. + :param meta: Meta information to attach to the file. + :return: ID of the uploaded file. + """ + if not file_name.endswith(".txt"): + raise NotMatchingFileTypeException( + f"File name {file_name} is not a textfile. Please use '.txt' for text uploads." + ) + + response = await self._deepset_cloud_api.post( + workspace_name, + "files", + data={"text": text, "meta": json.dumps(meta)}, + params={"write_mode": write_mode.value, "file_name": file_name}, + ) + if response.status_code != codes.CREATED or response.json().get("file_id") is None: + raise FailedToUploadFileException( + f"Failed to upload file with status code {response.status_code}. 
response was: {response.text}" + ) + file_id: UUID = UUID(response.json()["file_id"]) + return file_id + async def download( self, workspace_name: str, diff --git a/deepset_cloud_sdk/_api/upload_sessions.py b/deepset_cloud_sdk/_api/upload_sessions.py index 28fbb7d7..03633e55 100644 --- a/deepset_cloud_sdk/_api/upload_sessions.py +++ b/deepset_cloud_sdk/_api/upload_sessions.py @@ -131,7 +131,7 @@ async def create(self, workspace_name: str, write_mode: WriteMode = WriteMode.KE :return: UploadSession object. """ response = await self._deepset_cloud_api.post( - workspace_name=workspace_name, endpoint="upload_sessions", data={"write_mode": write_mode.value} + workspace_name=workspace_name, endpoint="upload_sessions", json={"write_mode": write_mode.value} ) if response.status_code != codes.CREATED: logger.error( diff --git a/deepset_cloud_sdk/_s3/upload.py b/deepset_cloud_sdk/_s3/upload.py index 3112d8d1..eb832216 100644 --- a/deepset_cloud_sdk/_s3/upload.py +++ b/deepset_cloud_sdk/_s3/upload.py @@ -117,7 +117,7 @@ async def _upload_file_with_retries( file_data = self._build_file_data(content, aws_safe_name, aws_config) async with client_session.post( redirect_url, - data=file_data, + json=file_data, allow_redirects=False, ) as response: response.raise_for_status() diff --git a/deepset_cloud_sdk/_service/files_service.py b/deepset_cloud_sdk/_service/files_service.py index 4b4da4d6..cec4b69f 100644 --- a/deepset_cloud_sdk/_service/files_service.py +++ b/deepset_cloud_sdk/_service/files_service.py @@ -2,11 +2,12 @@ from __future__ import annotations import asyncio +import json import os import time from contextlib import asynccontextmanager from pathlib import Path -from typing import AsyncGenerator, List, Optional, Union +from typing import Any, AsyncGenerator, Dict, List, Optional, Union from uuid import UUID import structlog @@ -27,11 +28,14 @@ UploadSessionStatus, WriteMode, ) -from deepset_cloud_sdk._s3.upload import S3, S3UploadSummary +from 
deepset_cloud_sdk._s3.upload import S3, S3UploadResult, S3UploadSummary from deepset_cloud_sdk.models import DeepsetCloudFile logger = structlog.get_logger(__name__) +ALLOWED_TYPE_SUFFIXES = [".txt", ".pdf"] +DIRECT_UPLOAD_THRESHOLD = 20 + class FilesService: """Service for all file-related operations.""" @@ -129,6 +133,40 @@ async def _create_upload_session( finally: await self._upload_sessions.close(workspace_name=workspace_name, session_id=upload_session.session_id) + async def _wrapped_direct_upload_path( + self, workspace_name: str, file_path: Path, meta: Dict[str, Any], write_mode: WriteMode + ) -> S3UploadResult: + try: + await self._files.direct_upload_path( + workspace_name=workspace_name, + file_path=file_path, + meta=meta, + file_name=file_path.name, + write_mode=write_mode, + ) + logger.info("Successfully uploaded file.", file_path=file_path) + return S3UploadResult(file_name=file_path.name, success=True) + except Exception as error: + logger.error("Failed uploading file.", file_path=file_path, error=error) + return S3UploadResult(file_name=file_path.name, success=False, exception=error) + + async def _wrapped_direct_upload_text( + self, workspace_name: str, text: str, file_name: str, meta: Dict[str, Any], write_mode: WriteMode + ) -> S3UploadResult: + try: + await self._files.direct_upload_text( + workspace_name=workspace_name, + text=text, + meta=meta, + file_name=file_name, + write_mode=write_mode, + ) + logger.info("Successfully uploaded file.", file_name=file_name) + return S3UploadResult(file_name=file_name, success=True) + except Exception as error: + logger.error("Failed uploading file.", file_name=file_name, error=error) + return S3UploadResult(file_name=file_name, success=False, exception=error) + async def upload_file_paths( self, workspace_name: str, @@ -152,6 +190,35 @@ async def upload_file_paths( :show_progress If True, shows a progress bar for S3 uploads. 
:raises TimeoutError: If blocking is True and the ingestion takes longer than timeout_s. """ + if len(file_paths) <= DIRECT_UPLOAD_THRESHOLD: + logger.info("Uploading files to deepset Cloud.", file_paths=file_paths) + _coroutines = [] + _raw_files = [path for path in file_paths if path.suffix.lower() in ALLOWED_TYPE_SUFFIXES] + for file_path in _raw_files: + meta: Dict[str, Any] = {} + meta_path = Path(str(file_path) + ".meta.json") + if meta_path in file_paths: + with meta_path.open("r") as meta_file: + meta = json.loads(meta_file.read()) + + _coroutines.append( + self._wrapped_direct_upload_path( + workspace_name=workspace_name, file_path=file_path, meta=meta, write_mode=write_mode + ) + ) + result = await asyncio.gather(*_coroutines) + logger.info( + "Finished uploading files.", + number_of_successful_files=len(_raw_files), + failed_files=[r for r in result if r.success is False], + ) + return S3UploadSummary( + total_files=len(_raw_files), + successful_upload_count=len([r for r in result if r.success]), + failed_upload_count=len([r for r in result if r.success is False]), + failed=[r for r in result if r.success is False], + ) + # create session to upload files to async with self._create_upload_session(workspace_name=workspace_name, write_mode=write_mode) as upload_session: # upload file paths to session @@ -248,7 +315,7 @@ def _preprocess_paths(paths: List[Path], spinner: yaspin.Spinner = None, recursi file_paths = [ path for path in all_files - if path.is_file() and ((path.suffix in [".txt", ".pdf"]) or path.name.endswith("meta.json")) + if path.is_file() and ((path.suffix in ALLOWED_TYPE_SUFFIXES) or path.name.endswith("meta.json")) ] if len(file_paths) < len(all_files): @@ -430,6 +497,32 @@ async def upload_texts( :show_progress If True, shows a progress bar for S3 uploads. :raises TimeoutError: If blocking is True and the ingestion takes longer than timeout_s. 
""" + if len(files) <= DIRECT_UPLOAD_THRESHOLD: + logger.info("Uploading files to deepset Cloud.", total_text_files=len(files)) + _coroutines = [] + for file in files: + _coroutines.append( + self._wrapped_direct_upload_text( + workspace_name=workspace_name, + file_name=file.name, + meta=file.meta or {}, + text=file.text, + write_mode=write_mode, + ) + ) + result = await asyncio.gather(*_coroutines) + logger.info( + "Finished uploading files.", + number_of_successful_files=len(files), + failed_files=[r for r in result if r.success is False], + ) + return S3UploadSummary( + total_files=len(files), + successful_upload_count=len([r for r in result if r.success]), + failed_upload_count=len([r for r in result if r.success is False]), + failed=[r for r in result if r.success is False], + ) + # create session to upload files to async with self._create_upload_session(workspace_name=workspace_name, write_mode=write_mode) as upload_session: upload_summary = await self._s3.upload_texts( diff --git a/docs/_images/favicon.svg b/docs/_images/favicon.svg index 24ab2fb6..41b78938 100644 --- a/docs/_images/favicon.svg +++ b/docs/_images/favicon.svg @@ -1,3 +1,3 @@ - + diff --git a/docs/_images/white-logo.svg b/docs/_images/white-logo.svg index b62472ae..5271b69c 100644 --- a/docs/_images/white-logo.svg +++ b/docs/_images/white-logo.svg @@ -1,3 +1,3 @@ - + diff --git a/pyproject.toml b/pyproject.toml index 8086201f..ba233566 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ name = "deepset-cloud-sdk" dynamic = ["version"] description = 'deepset Cloud SDK' readme = "README.md" -requires-python = ">=3.8" +requires-python = ">= 3.8" license = "Apache-2.0" keywords = [] authors = [ @@ -54,7 +54,7 @@ tests-unit = "pytest --cov-report=term-missing --cov-config=pyproject.toml --cov tests-integration = "pytest tests/integration" [[tool.hatch.envs.all.matrix]] -python = ["3.8"] +python = ["3.10"] [tool.hatch.envs.default] dependencies = [ @@ -80,7 +80,7 @@ dependencies = [ 
[tool.hatch.envs.code-quality] -python = "3.8" +python = "3.10" template='default' detached = false # Please keep these aligned with the versions defined in .pre-commit-config.yaml @@ -136,7 +136,7 @@ exclude_lines = [ line-length = 120 [tool.mypy] -python_version = "3.8" +python_version = "3.10" warn_return_any = true warn_unused_configs = true ignore_missing_imports = true diff --git a/test-upload/example.txt b/test-upload/example.txt new file mode 100644 index 00000000..891c48d5 --- /dev/null +++ b/test-upload/example.txt @@ -0,0 +1 @@ +this is my text diff --git a/test-upload/example.txt.meta.json b/test-upload/example.txt.meta.json new file mode 100644 index 00000000..20d339c3 --- /dev/null +++ b/test-upload/example.txt.meta.json @@ -0,0 +1,4 @@ +{ + "key": "value", + "key2": "value2" +} diff --git a/test-upload/example2.txt b/test-upload/example2.txt new file mode 100644 index 00000000..5652f64c --- /dev/null +++ b/test-upload/example2.txt @@ -0,0 +1 @@ +this is my text 2 diff --git a/test-upload/example2.txt.meta.json b/test-upload/example2.txt.meta.json new file mode 100644 index 00000000..20d339c3 --- /dev/null +++ b/test-upload/example2.txt.meta.json @@ -0,0 +1,4 @@ +{ + "key": "value", + "key2": "value2" +} diff --git a/tests/data/direct_upload/example.txt b/tests/data/direct_upload/example.txt new file mode 100644 index 00000000..8bd6648e --- /dev/null +++ b/tests/data/direct_upload/example.txt @@ -0,0 +1 @@ +asdf diff --git a/tests/data/direct_upload/example.txt.meta.json b/tests/data/direct_upload/example.txt.meta.json new file mode 100644 index 00000000..21da3b26 --- /dev/null +++ b/tests/data/direct_upload/example.txt.meta.json @@ -0,0 +1,3 @@ +{ + "key": "value" +} diff --git a/tests/integration/service/test_integration_files_service.py b/tests/integration/service/test_integration_files_service.py index 6226d4fb..9445b0eb 100644 --- a/tests/integration/service/test_integration_files_service.py +++ 
b/tests/integration/service/test_integration_files_service.py @@ -4,6 +4,7 @@ from typing import List import pytest +from _pytest.monkeypatch import MonkeyPatch from deepset_cloud_sdk._api.config import CommonConfig from deepset_cloud_sdk._api.files import File @@ -13,7 +14,26 @@ @pytest.mark.asyncio class TestUploadsFileService: - async def test_upload(self, integration_config: CommonConfig, workspace_name: str) -> None: + async def test_direct_upload_path(self, integration_config: CommonConfig, workspace_name: str) -> None: + async with FilesService.factory(integration_config) as file_service: + timeout = 120 if "dev.cloud.dpst.dev" in integration_config.api_url else 300 + + result = await file_service.upload( + workspace_name=workspace_name, + paths=[Path("./tests/test_data/msmarco.10")], + blocking=True, + write_mode=WriteMode.KEEP, + timeout_s=timeout, + ) + assert result.total_files == 10 + assert result.successful_upload_count == 10 + assert result.failed_upload_count == 0 + assert len(result.failed) == 0 + + async def test_async_upload( + self, integration_config: CommonConfig, workspace_name: str, monkeypatch: MonkeyPatch + ) -> None: + monkeypatch.setattr("deepset_cloud_sdk._service.files_service.DIRECT_UPLOAD_THRESHOLD", 1) async with FilesService.factory(integration_config) as file_service: timeout = 120 if "dev.cloud.dpst.dev" in integration_config.api_url else 300 @@ -30,6 +50,30 @@ async def test_upload(self, integration_config: CommonConfig, workspace_name: st assert len(result.failed) == 0 async def test_upload_texts(self, integration_config: CommonConfig, workspace_name: str) -> None: + async with FilesService.factory(integration_config) as file_service: + files = [ + DeepsetCloudFile("file1", "file1.txt", {"which": 1}), + DeepsetCloudFile("file2", "file2.txt", {"which": 2}), + DeepsetCloudFile("file3", "file3.txt", {"which": 3}), + DeepsetCloudFile("file4", "file4.txt", {"which": 4}), + DeepsetCloudFile("file5", "file5.txt", {"which": 5}), + ] + 
result = await file_service.upload_texts( + workspace_name=workspace_name, + files=files, + blocking=True, + write_mode=WriteMode.KEEP, + timeout_s=120, + ) + assert result.total_files == 5 + assert result.successful_upload_count == 5 + assert result.failed_upload_count == 0 + assert len(result.failed) == 0 + + async def test_upload_texts_less_than_session_threshold( + self, integration_config: CommonConfig, workspace_name: str, monkeypatch: MonkeyPatch + ) -> None: + monkeypatch.setattr("deepset_cloud_sdk._service.files_service.DIRECT_UPLOAD_THRESHOLD", -1) async with FilesService.factory(integration_config) as file_service: files = [ DeepsetCloudFile("file1", "file1.txt", {"which": 1}), diff --git a/tests/unit/api/test_deepset_cloud_api.py b/tests/unit/api/test_deepset_cloud_api.py index f8cd2c34..14e6b883 100644 --- a/tests/unit/api/test_deepset_cloud_api.py +++ b/tests/unit/api/test_deepset_cloud_api.py @@ -123,7 +123,8 @@ async def test_post( "default", "endpoint", params={"param_key": "param_value"}, - data={"data_key": "data_value"}, + json={"data_key": "data_value"}, + data={"raw": "data_sent_as_form_data"}, files={"file": ("my_file", "fake-file-binary", "text/csv")}, timeout_s=123, ) @@ -133,6 +134,7 @@ async def test_post( "https://fake.dc.api/api/v1/workspaces/default/endpoint", params={"param_key": "param_value"}, json={"data_key": "data_value"}, + data={"raw": "data_sent_as_form_data"}, files={"file": ("my_file", "fake-file-binary", "text/csv")}, headers={ "Accept": "application/json", diff --git a/tests/unit/api/test_files.py b/tests/unit/api/test_files.py index 1c9cab5c..7f562598 100644 --- a/tests/unit/api/test_files.py +++ b/tests/unit/api/test_files.py @@ -1,19 +1,23 @@ import datetime +import json import tempfile from pathlib import Path from typing import Any -from unittest.mock import Mock +from unittest.mock import ANY, Mock from uuid import UUID import httpx import pytest from deepset_cloud_sdk._api.files import ( + 
FailedToUploadFileException, File, FileList, FileNotFoundInDeepsetCloudException, FilesAPI, + NotMatchingFileTypeException, ) +from deepset_cloud_sdk._api.upload_sessions import WriteMode @pytest.fixture @@ -292,3 +296,134 @@ def mock_response(*args: Any, **kwargs: Any) -> httpx.Response: with Path.open(Path(tmp_dir + "/silly_things_1_2.txt.meta.json")) as _file: assert _file.read() == '{"key": "value"}' + + +@pytest.mark.asyncio +class TestDirectUploadFilePath: + @pytest.mark.parametrize("error_code", [httpx.codes.NOT_FOUND, httpx.codes.SERVICE_UNAVAILABLE]) + async def test_direct_upload_file_failed( + self, files_api: FilesAPI, mocked_deepset_cloud_api: Mock, error_code: int + ) -> None: + mocked_deepset_cloud_api.post.return_value = httpx.Response( + status_code=error_code, + ) + with pytest.raises(FailedToUploadFileException): + await files_api.direct_upload_path( + workspace_name="test_workspace", + file_path=Path("./tests/test_data/basic.txt"), + meta={}, + ) + + async def test_direct_upload_file(self, files_api: FilesAPI, mocked_deepset_cloud_api: Mock) -> None: + mocked_deepset_cloud_api.post.return_value = httpx.Response( + status_code=httpx.codes.CREATED, + json={"file_id": "cd16435f-f6eb-423f-bf6f-994dc8a36a10"}, + ) + file_id = await files_api.direct_upload_path( + workspace_name="test_workspace", + file_path=Path("./tests/test_data/basic.txt"), + meta={"key": "value"}, + write_mode=WriteMode.OVERWRITE, + ) + assert file_id == UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10") + mocked_deepset_cloud_api.post.assert_called_once_with( + "test_workspace", + "files", + files={"file": ("basic.txt", ANY)}, + json={"meta": {"key": "value"}}, + params={ + "write_mode": "OVERWRITE", + }, + ) + + async def test_direct_upload_file_with_name(self, files_api: FilesAPI, mocked_deepset_cloud_api: Mock) -> None: + mocked_deepset_cloud_api.post.return_value = httpx.Response( + status_code=httpx.codes.CREATED, + json={"file_id": "cd16435f-f6eb-423f-bf6f-994dc8a36a10"}, + ) + 
file_id = await files_api.direct_upload_path( + workspace_name="test_workspace", + file_path=Path("./tests/test_data/basic.txt"), + meta={"key": "value"}, + file_name="my_file.txt", + write_mode=WriteMode.OVERWRITE, + ) + assert file_id == UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10") + mocked_deepset_cloud_api.post.assert_called_once_with( + "test_workspace", + "files", + files={"file": ("my_file.txt", ANY)}, + json={"meta": {"key": "value"}}, + params={"write_mode": "OVERWRITE"}, + ) + + async def test_direct_upload_with_path_as_string(self, files_api: FilesAPI, mocked_deepset_cloud_api: Mock) -> None: + mocked_deepset_cloud_api.post.return_value = httpx.Response( + status_code=httpx.codes.CREATED, + json={"file_id": "cd16435f-f6eb-423f-bf6f-994dc8a36a10"}, + ) + file_id = await files_api.direct_upload_path( + workspace_name="test_workspace", + file_path="./tests/test_data/basic.txt", + meta={"key": "value"}, + file_name="my_file.txt", + write_mode=WriteMode.FAIL, + ) + assert file_id == UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10") + mocked_deepset_cloud_api.post.assert_called_once_with( + "test_workspace", + "files", + files={"file": ("my_file.txt", ANY)}, + json={"meta": {"key": "value"}}, + params={"write_mode": "FAIL"}, + ) + + +@pytest.mark.asyncio +class TestDirectUploadText: + async def test_direct_upload_file_for_wrong_file_type_name(self, files_api: FilesAPI) -> None: + with pytest.raises(NotMatchingFileTypeException): + await files_api.direct_upload_text( + workspace_name="test_workspace", + file_name="basic.json", + text="some text", + meta={}, + ) + + @pytest.mark.parametrize("error_code", [httpx.codes.NOT_FOUND, httpx.codes.SERVICE_UNAVAILABLE]) + async def test_direct_upload_file_failed( + self, files_api: FilesAPI, mocked_deepset_cloud_api: Mock, error_code: int + ) -> None: + mocked_deepset_cloud_api.post.return_value = httpx.Response( + status_code=error_code, + ) + with pytest.raises(FailedToUploadFileException): + await 
files_api.direct_upload_text( + workspace_name="test_workspace", + file_name="basic.txt", + text="some text", + meta={}, + ) + + async def test_direct_upload_file(self, files_api: FilesAPI, mocked_deepset_cloud_api: Mock) -> None: + mocked_deepset_cloud_api.post.return_value = httpx.Response( + status_code=httpx.codes.CREATED, + json={"file_id": "cd16435f-f6eb-423f-bf6f-994dc8a36a10"}, + ) + file_id = await files_api.direct_upload_text( + workspace_name="test_workspace", + file_name="basic.txt", + text="some text", + meta={"key": "value"}, + write_mode=WriteMode.OVERWRITE, + ) + assert file_id == UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10") + mocked_deepset_cloud_api.post.assert_called_once_with( + "test_workspace", + "files", + data={"text": "some text", "meta": json.dumps({"key": "value"})}, + params={ + "write_mode": "OVERWRITE", + "file_name": "basic.txt", + }, + ) diff --git a/tests/unit/service/test_files_service.py b/tests/unit/service/test_files_service.py index b24cb5da..b95e7fc7 100644 --- a/tests/unit/service/test_files_service.py +++ b/tests/unit/service/test_files_service.py @@ -10,6 +10,7 @@ from deepset_cloud_sdk._api.config import CommonConfig from deepset_cloud_sdk._api.files import ( + FailedToUploadFileException, File, FileList, FileNotFoundInDeepsetCloudException, @@ -24,7 +25,7 @@ UploadSessionWriteModeEnum, WriteMode, ) -from deepset_cloud_sdk._s3.upload import S3UploadSummary +from deepset_cloud_sdk._s3.upload import S3UploadResult, S3UploadSummary from deepset_cloud_sdk._service.files_service import DeepsetCloudFile, FilesService from deepset_cloud_sdk.models import UserInfo @@ -42,7 +43,10 @@ async def test_upload_file_paths( mocked_upload_sessions_api: Mock, upload_session_response: UploadSession, mocked_s3: Mock, + monkeypatch: MonkeyPatch, ) -> None: + # enforce batch upload + monkeypatch.setattr("deepset_cloud_sdk._service.files_service.DIRECT_UPLOAD_THRESHOLD", -1) upload_summary = S3UploadSummary(total_files=1, 
successful_upload_count=1, failed_upload_count=0, failed=[]) mocked_s3.upload_files_from_paths.return_value = upload_summary mocked_upload_sessions_api.create.return_value = upload_session_response @@ -84,7 +88,10 @@ async def test_upload_file_paths_with_timeout( file_service: FilesService, mocked_upload_sessions_api: Mock, upload_session_response: UploadSession, + monkeypatch: MonkeyPatch, ) -> None: + # enforce batch upload + monkeypatch.setattr("deepset_cloud_sdk._service.files_service.DIRECT_UPLOAD_THRESHOLD", -1) mocked_upload_sessions_api.create.return_value = upload_session_response mocked_upload_sessions_api.status.return_value = UploadSessionStatus( session_id=upload_session_response.session_id, @@ -100,6 +107,91 @@ async def test_upload_file_paths_with_timeout( workspace_name="test_workspace", file_paths=[Path("./tmp/my-file")], blocking=True, timeout_s=0 ) + async def test_upload_file_with_direct_upload_path( + self, + file_service: FilesService, + mocked_upload_sessions_api: Mock, + upload_session_response: UploadSession, + mocked_files_api: Mock, + mocked_s3: Mock, + ) -> None: + # enforce batch upload + # enforce batch upload + upload_summary = S3UploadSummary(total_files=1, successful_upload_count=1, failed_upload_count=0, failed=[]) + mocked_s3.upload_files_from_paths.return_value = upload_summary + mocked_upload_sessions_api.create.return_value = upload_session_response + mocked_upload_sessions_api.status.return_value = UploadSessionStatus( + session_id=upload_session_response.session_id, + expires_at=upload_session_response.expires_at, + documentation_url=upload_session_response.documentation_url, + ingestion_status=UploadSessionIngestionStatus( + failed_files=0, + finished_files=1, + ), + ) + result = await file_service.upload_file_paths( + workspace_name="test_workspace", + file_paths=[ + Path("./tests/data/direct_upload/example.txt"), + Path("./tests/data/direct_upload/example.txt.meta.json"), + ], + write_mode=WriteMode.OVERWRITE, + 
blocking=True, + timeout_s=300, + ) + assert result == upload_summary + + mocked_files_api.direct_upload_path.assert_called_once_with( + workspace_name="test_workspace", + file_path=Path("./tests/data/direct_upload/example.txt"), + file_name="example.txt", + meta={"key": "value"}, + write_mode=WriteMode.OVERWRITE, + ) + + assert not mocked_upload_sessions_api.create.called, "We should not have created a session for a single file" + + async def test_upload_file_with_direct_upload_and_one_fail( + self, + file_service: FilesService, + mocked_upload_sessions_api: Mock, + mocked_files_api: Mock, + mocked_s3: Mock, + ) -> None: + upload_summary = S3UploadSummary(total_files=1, successful_upload_count=1, failed_upload_count=0, failed=[]) + mocked_s3.upload_files_from_paths.return_value = upload_summary + expected_exception = FailedToUploadFileException() + mocked_files_api.direct_upload_path.side_effect = [ + expected_exception, + UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10"), + ] + + result = await file_service.upload_file_paths( + workspace_name="test_workspace", + file_paths=[ + Path("./tests/data/direct_upload/example.txt"), + Path("./tests/data/direct_upload/example.txt.meta.json"), + Path("./tests/data/direct_upload/example2.txt"), + ], + write_mode=WriteMode.OVERWRITE, + blocking=True, + timeout_s=300, + ) + assert result == S3UploadSummary( + total_files=2, + successful_upload_count=1, + failed_upload_count=1, + failed=[ + S3UploadResult( + file_name="example.txt", + success=False, + exception=expected_exception, + ) + ], + ) + + assert not mocked_upload_sessions_api.create.called, "We should not have created a session for a single file" + @pytest.mark.asyncio class TestUpload: @@ -197,13 +289,15 @@ async def test_upload_paths_to_file( @pytest.mark.asyncio class TestUploadTexts: - async def test_upload_texts( + async def test_upload_texts_via_sessions( self, file_service: FilesService, mocked_upload_sessions_api: Mock, upload_session_response: UploadSession, 
mocked_s3: Mock, + monkeypatch: MonkeyPatch, + ) -> None: + monkeypatch.setattr("deepset_cloud_sdk._service.files_service.DIRECT_UPLOAD_THRESHOLD", -1) upload_summary = S3UploadSummary(total_files=1, successful_upload_count=1, failed_upload_count=0, failed=[]) mocked_s3.upload_texts.return_value = upload_summary files = [ @@ -248,13 +342,62 @@ async def test_upload_texts( workspace_name="test_workspace", session_id=upload_session_response.session_id ) + + async def test_upload_texts_via_sync_upload( + self, + file_service: FilesService, + mocked_upload_sessions_api: Mock, + upload_session_response: UploadSession, + mocked_s3: Mock, + mocked_files_api: Mock, + ) -> None: + upload_summary = S3UploadSummary(total_files=1, successful_upload_count=1, failed_upload_count=0, failed=[]) + mocked_s3.upload_texts.return_value = upload_summary + files = [ + DeepsetCloudFile( + name="test_file.txt", + text="test content", + meta={"test": "test"}, + ) + ] + mocked_upload_sessions_api.create.return_value = upload_session_response + mocked_upload_sessions_api.status.return_value = UploadSessionStatus( + session_id=upload_session_response.session_id, + expires_at=upload_session_response.expires_at, + documentation_url=upload_session_response.documentation_url, + ingestion_status=UploadSessionIngestionStatus( + failed_files=0, + finished_files=1, + ), + ) + result = await file_service.upload_texts( + workspace_name="test_workspace", + files=files, + write_mode=WriteMode.OVERWRITE, + blocking=True, + timeout_s=300, + show_progress=False, + ) + assert result == upload_summary + + assert not mocked_upload_sessions_api.create.called, "We should not have created a session for a single file" + + mocked_files_api.direct_upload_text.assert_called_once_with( + workspace_name="test_workspace", + text="test content", + meta={"test": "test"}, + file_name="test_file.txt", + write_mode=WriteMode.OVERWRITE, + ) + @pytest.mark.asyncio async def test_upload_file_paths_with_timeout( file_service: 
FilesService, mocked_upload_sessions_api: Mock, upload_session_response: UploadSession, + monkeypatch: MonkeyPatch, ) -> None: + monkeypatch.setattr("deepset_cloud_sdk._service.files_service.DIRECT_UPLOAD_THRESHOLD", -1) mocked_upload_sessions_api.create.return_value = upload_session_response mocked_upload_sessions_api.status.return_value = UploadSessionStatus( session_id=upload_session_response.session_id,