Skip to content

Commit

Permalink
Fix: tasks.repeat_every() and related tests (#310)
Browse files Browse the repository at this point in the history
* fix(tasks): Fix @repeat_every blocking app startup

* fix(tasks): Add missing await on ensure_future

* Revert "fix(tasks): Add missing await on ensure_future"

This reverts commit 3d9ba0d.

See the conversation on #308 for further details.

* added on_complete parameter for pytest purpose

* added loop completed state and loop_completed function

* formatting improvements (for mypy)

* use Union for typing support in older Python versions

* on_exception feature added and deprecation warnings for logger and raise_exception added

* Fix TestRepeatEveryWithAsynchronousFunction so that it runs with async functions

* add pytest-timeout to package and fix typo

* fix function type problems by removing conditional types

---------

Co-authored-by: Martin <[email protected]>
Co-authored-by: Yuval Levi <[email protected]>
  • Loading branch information
3 people authored Jun 10, 2024
1 parent 57b7657 commit fdd2939
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 81 deletions.
59 changes: 48 additions & 11 deletions fastapi_utils/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
import warnings
from functools import wraps
from traceback import format_exception
from typing import Any, Callable, Coroutine, Union
Expand All @@ -10,7 +11,26 @@

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[[Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]], NoArgsNoReturnAsyncFuncT]
ExcArgNoReturnFuncT = Callable[[Exception], None]
ExcArgNoReturnAsyncFuncT = Callable[[Exception], Coroutine[Any, Any, None]]
NoArgsNoReturnAnyFuncT = Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]
ExcArgNoReturnAnyFuncT = Union[ExcArgNoReturnFuncT, ExcArgNoReturnAsyncFuncT]
NoArgsNoReturnDecorator = Callable[[NoArgsNoReturnAnyFuncT], NoArgsNoReturnAsyncFuncT]


async def _handle_func(func: NoArgsNoReturnAnyFuncT) -> None:
if asyncio.iscoroutinefunction(func):
await func()
else:
await run_in_threadpool(func)


async def _handle_exc(exc: Exception, on_exception: ExcArgNoReturnAnyFuncT | None) -> None:
if on_exception:
if asyncio.iscoroutinefunction(on_exception):
await on_exception(exc)
else:
await run_in_threadpool(on_exception, exc)


def repeat_every(
Expand All @@ -20,6 +40,8 @@ def repeat_every(
logger: logging.Logger | None = None,
raise_exceptions: bool = False,
max_repetitions: int | None = None,
on_complete: NoArgsNoReturnAnyFuncT | None = None,
on_exception: ExcArgNoReturnAnyFuncT | None = None,
) -> NoArgsNoReturnDecorator:
"""
This function returns a decorator that modifies a function so it is periodically re-executed after its first call.
Expand All @@ -34,47 +56,62 @@ def repeat_every(
wait_first: float (default None)
If not None, the function will wait for the given duration before the first call
logger: Optional[logging.Logger] (default None)
Warning: This parameter is deprecated and will be removed in the 1.0 release.
The logger to use to log any exceptions raised by calls to the decorated function.
If not provided, exceptions will not be logged by this function (though they may be handled by the event loop).
raise_exceptions: bool (default False)
Warning: This parameter is deprecated and will be removed in the 1.0 release.
If True, errors raised by the decorated function will be raised to the event loop's exception handler.
Note that if an error is raised, the repeated execution will stop.
Otherwise, exceptions are just logged and the execution continues to repeat.
See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.set_exception_handler for more info.
max_repetitions: Optional[int] (default None)
The maximum number of times to call the repeated function. If `None`, the function is repeated forever.
on_complete: Optional[Callable[[], None]] (default None)
A function to call after the final repetition of the decorated function.
on_exception: Optional[Callable[[Exception], None]] (default None)
A function to call when an exception is raised by the decorated function.
"""

def decorator(func: NoArgsNoReturnAsyncFuncT | NoArgsNoReturnFuncT) -> NoArgsNoReturnAsyncFuncT:
def decorator(func: NoArgsNoReturnAnyFuncT) -> NoArgsNoReturnAsyncFuncT:
"""
Converts the decorated function into a repeated, periodically-called version of itself.
"""
is_coroutine = asyncio.iscoroutinefunction(func)

@wraps(func)
async def wrapped() -> None:
repetitions = 0

async def loop() -> None:
nonlocal repetitions
if wait_first is not None:
await asyncio.sleep(wait_first)

repetitions = 0
while max_repetitions is None or repetitions < max_repetitions:
try:
if is_coroutine:
await func() # type: ignore
else:
await run_in_threadpool(func)
await _handle_func(func)

except Exception as exc:
if logger is not None:
warnings.warn(
"'logger' is to be deprecated in favor of 'on_exception' in the 1.0 release.",
DeprecationWarning,
)
formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__))
logger.error(formatted_exception)
if raise_exceptions:
warnings.warn(
"'raise_exceptions' is to be deprecated in favor of 'on_exception' in the 1.0 release.",
DeprecationWarning,
)
raise exc
await _handle_exc(exc, on_exception)

repetitions += 1
await asyncio.sleep(seconds)

await loop()
if on_complete:
await _handle_func(on_complete)

asyncio.ensure_future(loop())

return wrapped

Expand Down
31 changes: 28 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ session = ["sqlalchemy"]

[tool.poetry.group.dev.dependencies]
codecov = "^2.1.13"
pytest-timeout = "^2.3.1"

[tool.black]
line-length = 120
Expand Down
Loading

0 comments on commit fdd2939

Please sign in to comment.