Skip to content

Commit

Permalink
Expose the Run retry configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Sep 6, 2024
1 parent 4415d6f commit 1d1a369
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
16 changes: 15 additions & 1 deletion python/restate/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ def clear(self, name: str) -> None:
def clear_all(self) -> None:
"""clear all the values in the store."""

@dataclass
class RunExponentialBackoffRetryConfig:
"""
Exponential backoff Retry Configuration.
Attributes:
initial_interval: The initial interval in milliseconds to wait before the first retry.
max_attempts: The maximum number of attempts to retry.
max_duration: The maximum duration in milliseconds to retry.
"""
initial_interval: typing.Optional[int]
max_attempts: typing.Optional[int]
max_duration: typing.Optional[timedelta]

class Context(abc.ABC):
"""
Expand All @@ -95,7 +108,8 @@ def request(self) -> Request:
def run(self,
name: str,
action: RunAction[T],
serde: Serde[T] = JsonSerde()) -> Awaitable[T]:
serde: Serde[T] = JsonSerde(),
retry: typing.Optional[RunExponentialBackoffRetryConfig] = None) -> Awaitable[T]:
"""
Runs the given action with the given name.
"""
Expand Down
19 changes: 16 additions & 3 deletions python/restate/server_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
import typing
import traceback

from restate.context import DurablePromise, ObjectContext, Request
from restate.context import DurablePromise, ObjectContext, Request, RunExponentialBackoffRetryConfig
from restate.exceptions import TerminalError
from restate.handler import Handler, handler_from_callable, invoke_handler
from restate.serde import BytesSerde, JsonSerde, Serde
from restate.server_types import Receive, Send
from restate.vm import Failure, Invocation, NotReady, SuspendedException, VMWrapper
from restate.vm import Failure, Invocation, NotReady, SuspendedException, VMWrapper, RunRetryConfig


T = TypeVar('T')
Expand Down Expand Up @@ -230,7 +230,8 @@ def request(self) -> Request:
async def run(self,
name: str,
action: Callable[[], T] | Callable[[], Awaitable[T]],
serde: Optional[Serde[T]] = JsonSerde()) -> T:
serde: Optional[Serde[T]] = JsonSerde(),
retry: typing.Optional[RunExponentialBackoffRetryConfig] = None) -> T:
assert serde is not None
res = self.vm.sys_run_enter(name)
if isinstance(res, Failure):
Expand All @@ -254,6 +255,18 @@ async def run(self,
await self.create_poll_coroutine(handle)
# unreachable
assert False
except Exception as e:
if retry is None:
raise e
failure = Failure(code=500, message=str(e))
max_duration_ms = None if retry.max_duration is None else int(retry.max_duration.total_seconds() * 1000)
config = RunRetryConfig(initial_interval=retry.initial_interval,
max_attempts=retry.max_attempts,
max_duration=max_duration_ms)
self.vm.sys_run_exit_transient(failure=failure,
config=config,
attempt_duration_ms=1)
raise e

def sleep(self, delta: timedelta) -> Awaitable[None]:
# convert timedelta to milliseconds
Expand Down

0 comments on commit 1d1a369

Please sign in to comment.