Skip to content

Commit

Permalink
chore(sync): use asyncio.Runner for async_to_sync() on py311+ (#2216
Browse files Browse the repository at this point in the history
)

* chore(asyncio): replace `get_event_loop()` -> `get_running_loop()` where applicable

* chore(sync): use `asyncio.Runner` for `async_to_sync()` on py311+

* chore(sync): exempt a line from coverage as it can only be hit on 3.11+

* chore(tests/asgi): adapt to Uvicorn now propagating signals to retcode

* chore(tests/asgi): do not check ASGI server retcode on Windows

* chore(tests/asgi): check for a M$ Windows specific exit code constant

* chore(sync): use a nicer pattern to get the active runner
  • Loading branch information
vytas7 authored Mar 21, 2024
1 parent dc8d2d4 commit a78cfb3
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 23 deletions.
60 changes: 44 additions & 16 deletions falcon/util/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,42 @@
'wrap_sync_to_async_unsafe',
]

Result = TypeVar('Result')


class _DummyRunner:
def run(self, coro: Awaitable[Result]) -> Result: # pragma: nocover
# NOTE(vytas): Work around get_event_loop deprecation in 3.10 by going
# via get_event_loop_policy(). This should be equivalent for
# async_to_sync's use case as it is currently impossible to invoke
# run_until_complete() from a running loop anyway.
return self.get_loop().run_until_complete(coro)

def get_loop(self) -> asyncio.AbstractEventLoop: # pragma: nocover
return asyncio.get_event_loop_policy().get_event_loop()

def close(self) -> None: # pragma: nocover
pass


class _ActiveRunner:
def __init__(self, runner_cls: type):
self._runner_cls = runner_cls
self._runner = runner_cls()

# TODO(vytas): This typing is wrong on py311+, but mypy accepts it.
# It doesn't, OTOH, accept any of my ostensibly valid attempts to
# describe it.
def __call__(self) -> _DummyRunner:
# NOTE(vytas): Sometimes our runner's loop can get picked and consumed
# by other utilities and test methods. If that happens, recreate the runner.
if self._runner.get_loop().is_closed():
# NOTE(vytas): This condition is never hit on _DummyRunner.
self._runner = self._runner_cls() # pragma: nocover
return self._runner


_active_runner = _ActiveRunner(getattr(asyncio, 'Runner', _DummyRunner))
_one_thread_to_rule_them_all = ThreadPoolExecutor(max_workers=1)

create_task = asyncio.create_task
Expand Down Expand Up @@ -190,9 +226,6 @@ def _wrap_non_coroutine_unsafe(
return wrap_sync_to_async_unsafe(func)


Result = TypeVar('Result')


def async_to_sync(
coroutine: Callable[..., Awaitable[Result]], *args: Any, **kwargs: Any
) -> Result:
Expand All @@ -204,8 +237,13 @@ def async_to_sync(
one will be created.
Warning:
This method is very inefficient and is intended primarily for testing
and prototyping.
Executing async code in this manner is inefficient since it involves
synchronization via threading primitives, and is intended primarily for
testing, prototyping or compatibility purposes.
Note:
On Python 3.11+, this function leverages a module-wide
``asyncio.Runner``.
Args:
coroutine: A coroutine function to invoke.
Expand All @@ -214,17 +252,7 @@ def async_to_sync(
Keyword Args:
**kwargs: Additional args are passed through to the coroutine function.
"""

# TODO(vytas): The canonical way of doing this for simple use cases is
# asyncio.run(), but that would be a breaking change wrt the above
# documented behaviour; breaking enough to break some of our own tests.

# NOTE(vytas): Work around get_event_loop deprecation in 3.10 by going via
# get_event_loop_policy(). This should be equivalent for async_to_sync's
# use case as it is currently impossible to invoke run_until_complete()
# from a running loop anyway.
loop = asyncio.get_event_loop_policy().get_event_loop()
return loop.run_until_complete(coroutine(*args, **kwargs))
return _active_runner().run(coroutine(*args, **kwargs))


def runs_sync(coroutine: Callable[..., Awaitable[Result]]) -> Callable[..., Result]:
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ filterwarnings = [
"ignore:.cgi. is deprecated and slated for removal:DeprecationWarning",
"ignore:path is deprecated\\. Use files\\(\\) instead:DeprecationWarning",
"ignore:This process \\(.+\\) is multi-threaded",
"ignore:There is no current event loop",
]
testpaths = [
"tests"
Expand Down
10 changes: 8 additions & 2 deletions tests/asgi/test_asgi_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import platform
import random
import signal
import subprocess
import sys
import time
Expand All @@ -27,7 +28,9 @@
_SERVER_HOST = '127.0.0.1'
_SIZE_1_KB = 1024
_SIZE_1_MB = _SIZE_1_KB**2

# NOTE(vytas): Windows specific: {Application Exit by CTRL+C}.
# The application terminated as a result of a CTRL+C.
_STATUS_CONTROL_C_EXIT = 0xC000013A

_REQUEST_TIMEOUT = 10

Expand Down Expand Up @@ -620,7 +623,10 @@ def server_base_url(request):

yield base_url

assert server.returncode == 0
# NOTE(vytas): Starting with 0.29.0, Uvicorn will propagate signal
# values into the return code (which is a good practice in Unix);
# see also https://github.com/encode/uvicorn/pull/1600
assert server.returncode in (0, -signal.SIGTERM, _STATUS_CONTROL_C_EXIT)

break

Expand Down
6 changes: 3 additions & 3 deletions tests/asgi/test_scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_supported_asgi_version(version, supported):
resp_event_collector = testing.ASGIResponseEventCollector()

async def task():
coro = asyncio.get_event_loop().create_task(
coro = asyncio.get_running_loop().create_task(
app(scope, req_event_emitter, resp_event_collector)
)

Expand Down Expand Up @@ -142,7 +142,7 @@ def test_lifespan_scope_default_version():
scope = {'type': 'lifespan'}

async def t():
t = asyncio.get_event_loop().create_task(
t = asyncio.get_running_loop().create_task(
app(scope, req_event_emitter, resp_event_collector)
)

Expand Down Expand Up @@ -196,7 +196,7 @@ def test_lifespan_scope_version(spec_version, supported):
return

async def t():
t = asyncio.get_event_loop().create_task(
t = asyncio.get_running_loop().create_task(
app(scope, req_event_emitter, resp_event_collector)
)

Expand Down
2 changes: 1 addition & 1 deletion tests/dump_asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ async def app(scope, receive, send):
}
)

loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
loop.create_task(_say_hi())

0 comments on commit a78cfb3

Please sign in to comment.