diff --git a/deepset_cloud_sdk/_api/config.py b/deepset_cloud_sdk/_api/config.py index 7dcf055d..0ec17aed 100644 --- a/deepset_cloud_sdk/_api/config.py +++ b/deepset_cloud_sdk/_api/config.py @@ -48,6 +48,8 @@ def load_environment() -> bool: # configuration to use a selectd workspace DEFAULT_WORKSPACE_NAME: str = os.getenv("DEFAULT_WORKSPACE_NAME", "") +ASYNC_CLIENT_TIMEOUT: int = int(os.getenv("ASYNC_CLIENT_TIMEOUT", "300")) + @dataclass class CommonConfig: diff --git a/deepset_cloud_sdk/_api/deepset_cloud_api.py b/deepset_cloud_sdk/_api/deepset_cloud_api.py index a3403df8..2a838e19 100644 --- a/deepset_cloud_sdk/_api/deepset_cloud_api.py +++ b/deepset_cloud_sdk/_api/deepset_cloud_api.py @@ -169,6 +169,12 @@ async def delete( ) return response + @retry( + retry=retry_if_exception_type(httpx.ConnectError), + stop=stop_after_attempt(3), + wait=wait_fixed(1), + reraise=True, + ) async def put( self, workspace_name: str, diff --git a/deepset_cloud_sdk/_s3/upload.py b/deepset_cloud_sdk/_s3/upload.py index 68e77c3a..52b63d0a 100644 --- a/deepset_cloud_sdk/_s3/upload.py +++ b/deepset_cloud_sdk/_s3/upload.py @@ -14,6 +14,7 @@ from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed from tqdm.asyncio import tqdm +from deepset_cloud_sdk._api.config import ASYNC_CLIENT_TIMEOUT from deepset_cloud_sdk._api.upload_sessions import ( AWSPrefixedRequestConfig, UploadSession, @@ -26,7 +27,9 @@ class RetryableHttpError(Exception): """An error that indicates a function should be retried.""" - def __init__(self, error: Union[aiohttp.ClientResponseError, aiohttp.ServerDisconnectedError]) -> None: + def __init__( + self, error: Union[aiohttp.ClientResponseError, aiohttp.ServerDisconnectedError, aiohttp.ClientConnectionError] + ) -> None: """Store the original exception.""" self.error = error @@ -79,7 +82,7 @@ def __init__(self, concurrency: int = 120, rate_limit: Rate = Rate(3000, Duratio @retry( retry=retry_if_exception_type(RetryableHttpError), - stop=stop_after_attempt(3), + stop=stop_after_attempt(5), wait=wait_fixed(0.5), reraise=True, ) @@ -141,6 +144,8 @@ async def _upload_file_with_retries( ]: raise RetryableHttpError(cre) from cre raise + except aiohttp.ClientConnectionError as cre: + raise RetryableHttpError(cre) from cre def _build_file_data( self, content: Any, aws_safe_name: str, aws_config: AWSPrefixedRequestConfig @@ -276,7 +281,9 @@ async def upload_in_memory( :param show_progress: Whether to show a progress bar on the upload. :return: S3UploadSummary object. """ - async with aiohttp.ClientSession(connector=self.connector) as client_session: + async with aiohttp.ClientSession( + connector=self.connector, timeout=aiohttp.ClientTimeout(total=ASYNC_CLIENT_TIMEOUT) + ) as client_session: tasks = [] for file in files: diff --git a/tests/unit/s3/test_upload.py b/tests/unit/s3/test_upload.py index b88ec029..8b8f301a 100644 --- a/tests/unit/s3/test_upload.py +++ b/tests/unit/s3/test_upload.py @@ -228,3 +228,16 @@ async def test_upload_file_does_not_retry_for_exception( with pytest.raises(aiohttp.ClientResponseError): await s3._upload_file_with_retries("one.txt", upload_session_response, "123", mock_session) + + @patch("aiohttp.ClientSession") + async def test_upload_file_retries_for_client_connection_exception( + self, + mock_session: Mock, + upload_session_response: UploadSession, + ) -> None: + exception = aiohttp.ClientConnectionError() + with patch.object(aiohttp.ClientSession, "post", side_effect=exception): + s3 = S3() + + with pytest.raises(RetryableHttpError): + await s3._upload_file_with_retries("one.txt", upload_session_response, "123", mock_session)