From a78cfb38a0c0f6031cc3ff39ff8bf4afd03ef008 Mon Sep 17 00:00:00 2001 From: Vytautas Liuolia Date: Thu, 21 Mar 2024 20:59:26 +0100 Subject: [PATCH] chore(sync): use `asyncio.Runner` for `async_to_sync()` on py311+ (#2216) * chore(asyncio): replace `get_event_loop()` -> `get_running_loop()` where applicable * chore(sync): use `asyncio.Runner` for `async_to_sync()` on py311+ * chore(sync): exempt a line from coverage as it can only be hit on 3.11+ * chore(tests/asgi): adapt to Uvicorn now propagating signals to retcode * chore(tests/asgi): do not check ASGI server retcode on Windows * chore(tests/asgi): check for a M$ Windows specific exit code constant * chore(sync): use a nicer pattern to get the active runner --- falcon/util/sync.py | 60 ++++++++++++++++++++++++--------- pyproject.toml | 1 - tests/asgi/test_asgi_servers.py | 10 ++++-- tests/asgi/test_scope.py | 6 ++-- tests/dump_asgi.py | 2 +- 5 files changed, 56 insertions(+), 23 deletions(-) diff --git a/falcon/util/sync.py b/falcon/util/sync.py index db9e21e37..96d05c058 100644 --- a/falcon/util/sync.py +++ b/falcon/util/sync.py @@ -22,6 +22,42 @@ 'wrap_sync_to_async_unsafe', ] +Result = TypeVar('Result') + + +class _DummyRunner: + def run(self, coro: Awaitable[Result]) -> Result: # pragma: nocover + # NOTE(vytas): Work around get_event_loop deprecation in 3.10 by going + # via get_event_loop_policy(). This should be equivalent for + # async_to_sync's use case as it is currently impossible to invoke + # run_until_complete() from a running loop anyway. + return self.get_loop().run_until_complete(coro) + + def get_loop(self) -> asyncio.AbstractEventLoop: # pragma: nocover + return asyncio.get_event_loop_policy().get_event_loop() + + def close(self) -> None: # pragma: nocover + pass + + +class _ActiveRunner: + def __init__(self, runner_cls: type): + self._runner_cls = runner_cls + self._runner = runner_cls() + + # TODO(vytas): This typing is wrong on py311+, but mypy accepts it. + # It doesn't, OTOH, accept any of my ostensibly valid attempts to + # describe it. + def __call__(self) -> _DummyRunner: + # NOTE(vytas): Sometimes our runner's loop can get picked and consumed + # by other utilities and test methods. If that happens, recreate the runner. + if self._runner.get_loop().is_closed(): + # NOTE(vytas): This condition is never hit on _DummyRunner. + self._runner = self._runner_cls() # pragma: nocover + return self._runner + + +_active_runner = _ActiveRunner(getattr(asyncio, 'Runner', _DummyRunner)) _one_thread_to_rule_them_all = ThreadPoolExecutor(max_workers=1) create_task = asyncio.create_task @@ -190,9 +226,6 @@ def _wrap_non_coroutine_unsafe( return wrap_sync_to_async_unsafe(func) -Result = TypeVar('Result') - - def async_to_sync( coroutine: Callable[..., Awaitable[Result]], *args: Any, **kwargs: Any ) -> Result: @@ -204,8 +237,13 @@ def async_to_sync( one will be created. Warning: - This method is very inefficient and is intended primarily for testing - and prototyping. + Executing async code in this manner is inefficient since it involves + synchronization via threading primitives, and is intended primarily for + testing, prototyping or compatibility purposes. + + Note: + On Python 3.11+, this function leverages a module-wide + ``asyncio.Runner``. Args: coroutine: A coroutine function to invoke. @@ -214,17 +252,7 @@ def async_to_sync( Keyword Args: **kwargs: Additional args are passed through to the coroutine function. """ - - # TODO(vytas): The canonical way of doing this for simple use cases is - # asyncio.run(), but that would be a breaking change wrt the above - # documented behaviour; breaking enough to break some of our own tests. - - # NOTE(vytas): Work around get_event_loop deprecation in 3.10 by going via - # get_event_loop_policy(). This should be equivalent for async_to_sync's - # use case as it is currently impossible to invoke run_until_complete() - # from a running loop anyway. - loop = asyncio.get_event_loop_policy().get_event_loop() - return loop.run_until_complete(coroutine(*args, **kwargs)) + return _active_runner().run(coroutine(*args, **kwargs)) def runs_sync(coroutine: Callable[..., Awaitable[Result]]) -> Callable[..., Result]: diff --git a/pyproject.toml b/pyproject.toml index ad445ce55..5ed0c5fab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,7 +97,6 @@ filterwarnings = [ "ignore:.cgi. is deprecated and slated for removal:DeprecationWarning", "ignore:path is deprecated\\. Use files\\(\\) instead:DeprecationWarning", "ignore:This process \\(.+\\) is multi-threaded", - "ignore:There is no current event loop", ] testpaths = [ "tests" diff --git a/tests/asgi/test_asgi_servers.py b/tests/asgi/test_asgi_servers.py index 26f51ad0c..321e41f96 100644 --- a/tests/asgi/test_asgi_servers.py +++ b/tests/asgi/test_asgi_servers.py @@ -4,6 +4,7 @@ import os import platform import random +import signal import subprocess import sys import time @@ -27,7 +28,9 @@ _SERVER_HOST = '127.0.0.1' _SIZE_1_KB = 1024 _SIZE_1_MB = _SIZE_1_KB**2 - +# NOTE(vytas): Windows specific: {Application Exit by CTRL+C}. +# The application terminated as a result of a CTRL+C. +_STATUS_CONTROL_C_EXIT = 0xC000013A _REQUEST_TIMEOUT = 10 @@ -620,7 +623,10 @@ def server_base_url(request): yield base_url - assert server.returncode == 0 + # NOTE(vytas): Starting with 0.29.0, Uvicorn will propagate signal + # values into the return code (which is a good practice in Unix); + # see also https://github.com/encode/uvicorn/pull/1600 + assert server.returncode in (0, -signal.SIGTERM, _STATUS_CONTROL_C_EXIT) break diff --git a/tests/asgi/test_scope.py b/tests/asgi/test_scope.py index bb60ed0e7..e368f6576 100644 --- a/tests/asgi/test_scope.py +++ b/tests/asgi/test_scope.py @@ -70,7 +70,7 @@ def test_supported_asgi_version(version, supported): resp_event_collector = testing.ASGIResponseEventCollector() async def task(): - coro = asyncio.get_event_loop().create_task( + coro = asyncio.get_running_loop().create_task( app(scope, req_event_emitter, resp_event_collector) ) @@ -142,7 +142,7 @@ def test_lifespan_scope_default_version(): scope = {'type': 'lifespan'} async def t(): - t = asyncio.get_event_loop().create_task( + t = asyncio.get_running_loop().create_task( app(scope, req_event_emitter, resp_event_collector) ) @@ -196,7 +196,7 @@ def test_lifespan_scope_version(spec_version, supported): return async def t(): - t = asyncio.get_event_loop().create_task( + t = asyncio.get_running_loop().create_task( app(scope, req_event_emitter, resp_event_collector) ) diff --git a/tests/dump_asgi.py b/tests/dump_asgi.py index 0742a3ca0..0dfdb4b0a 100644 --- a/tests/dump_asgi.py +++ b/tests/dump_asgi.py @@ -23,5 +23,5 @@ async def app(scope, receive, send): } ) - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() loop.create_task(_say_hi())