Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry support to catch_and_convert_errors #90

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 43 additions & 19 deletions spice/spice.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ def __init__(
logging_dir: Optional[Path | str] = None,
logging_callback: Optional[Callable[[SpiceResponse, str, str], None]] = None,
default_temperature: Optional[float] = None,
max_retries: int = 0, # Add this line
base_delay: float = 1.0, # Add this line
max_delay: float = 32.0, # Add this line
):
"""
Creates a new Spice client.
Expand Down Expand Up @@ -268,6 +271,11 @@ def __init__(
self._default_embeddings_model = embeddings_model
self._default_temperature = default_temperature

# Initialize retry configuration parameters
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay

# TODO: Should we validate model aliases?
self._model_aliases = model_aliases

Expand All @@ -278,6 +286,30 @@ def __init__(
self.logging_callback = logging_callback
self.new_run("spice")

async def call_llm(self, client: WrappedClient, call_args: SpiceCallArgs, streaming_callback: Optional[Callable[[str], None]] = None):
retries = 0
delay = self.base_delay
while retries <= self.max_retries:
try:
with client.catch_and_convert_errors():
if streaming_callback is not None:
stream = await client.get_chat_completion_or_stream(call_args)
stream = cast(AsyncIterator, stream)
streaming_spice_response = StreamingSpiceResponse(
self._get_text_model(call_args.model), call_args, client, stream, None, streaming_callback
)
return await streaming_spice_response.complete_response()
else:
chat_completion = await client.get_chat_completion_or_stream(call_args)
text, input_tokens, output_tokens = client.extract_text_and_tokens(chat_completion, call_args)
return text, input_tokens, output_tokens
except (APIConnectionError, APIError) as e:
if retries == self.max_retries:
raise e
time.sleep(min(delay, self.max_delay))
delay *= 2
retries += 1

def new_run(self, name: str):
"""
Create a new run. All llm calls will be logged in a folder with the run name and a timestamp.
Expand Down Expand Up @@ -451,23 +483,12 @@ async def get_response(
elif i > 1 and call_args.temperature is not None:
call_args.temperature = max(0.5, call_args.temperature)

with client.catch_and_convert_errors():
if streaming_callback is not None:
stream = await client.get_chat_completion_or_stream(call_args)
stream = cast(AsyncIterator, stream)
streaming_spice_response = StreamingSpiceResponse(
text_model, call_args, client, stream, None, streaming_callback
)
chat_completion = await streaming_spice_response.complete_response()
text, input_tokens, output_tokens = (
chat_completion.text,
chat_completion.input_tokens,
chat_completion.output_tokens,
)

else:
chat_completion = await client.get_chat_completion_or_stream(call_args)
text, input_tokens, output_tokens = client.extract_text_and_tokens(chat_completion, call_args)
try:
text, input_tokens, output_tokens = await self.call_llm(client, call_args, streaming_callback)
except (APIConnectionError, APIError) as e:
if i == retries:
raise e
continue

completion_cost = text_request_cost(text_model, input_tokens, output_tokens)
if completion_cost is not None:
Expand Down Expand Up @@ -542,8 +563,11 @@ async def stream_response(
client = self._get_client(text_model, provider)
call_args = self._fix_call_args(messages, text_model, True, temperature, max_tokens, response_format)

with client.catch_and_convert_errors():
stream = await client.get_chat_completion_or_stream(call_args)
try:
stream = await self.call_llm(client, call_args, streaming_callback)
except (APIConnectionError, APIError) as e:
raise e

stream = cast(AsyncIterator, stream)

def callback(response: SpiceResponse, cache: List[float] = [0]):
Expand Down
3 changes: 1 addition & 2 deletions spice/wrapped_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import base64
import io
import mimetypes
import time
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You didn't add the implementation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I apologize for the oversight. I'll implement the retry logic in the catch_and_convert_errors method for the WrappedOpenAIClient and WrappedAnthropicClient classes. Thank you for catching that.

from abc import ABC, abstractmethod
from contextlib import contextmanager
from pathlib import Path
Expand Down Expand Up @@ -120,7 +121,6 @@ def extract_text_and_tokens(self, chat_completion, call_args: SpiceCallArgs):
@override
@contextmanager
def catch_and_convert_errors(self):
# TODO: Do we catch all errors? I think we should catch APIStatusError
try:
yield
except openai.APIConnectionError as e:
Expand Down Expand Up @@ -398,7 +398,6 @@ def catch_and_convert_errors(self):
except anthropic.APIStatusError as e:
raise APIError(f"Anthropic Status Error: {e.message}") from e

# Anthropic doesn't give us a way to count tokens, so we just use OpenAI's token counting functions and multiply by a pre-determined multiplier
class _FakeWrappedOpenAIClient(WrappedOpenAIClient):
def __init__(self):
pass
Expand Down
Loading