From c9b7ea3c1be3b320ecd77f7149f19af5f8a1c1d3 Mon Sep 17 00:00:00 2001 From: ash Date: Tue, 15 Oct 2024 11:05:56 +0100 Subject: [PATCH] Be less aggressive about stopping the event loop If we stop the event loop during aiorun's shutdown phase, we can cause all sorts of pain (the loop may stop before all tasks have had a chance to finish shutting down). We *should* only see `async_main` being cancelled if aiorun is the one doing it (I hope), and if another exception is thrown then aiorun's exception handler should see it and stop the loop for us. Probably. --- src/farmer/reporter/__init__.py | 44 ++++++++++++++++----------------- src/farmer/server/__init__.py | 20 +++++++-------- 2 files changed, 30 insertions(+), 34 deletions(-) diff --git a/src/farmer/reporter/__init__.py b/src/farmer/reporter/__init__.py index 1ca5cf0..55b97b9 100644 --- a/src/farmer/reporter/__init__.py +++ b/src/farmer/reporter/__init__.py @@ -322,29 +322,27 @@ async def get_job_details(self, *, job_id: str) -> Any: async def async_main(): - try: - reporter = FarmerReporter() - await reporter.start() - # TODO: retry logic? - # (there should be some built into fastapi_websocket_rpc) - disconnected = asyncio.Event() - async def on_disconnect(channel): - disconnected.set() - scheme, *rest = urllib.parse.urlparse(os.environ["FARMER_SERVER_BASE_URL"]) - match scheme: - case "http": - scheme = "ws" - case "https": - scheme = "wss" - async with WebSocketRpcClient( - urllib.parse.urljoin(urlunparse((scheme, *rest)), "/internal/ws"), - FarmerReporterRpc(reporter), - on_disconnect=[on_disconnect], - ): - await disconnected.wait() - await reporter.stop() - finally: - asyncio.get_running_loop().stop() + reporter = FarmerReporter() + await reporter.start() + # TODO: retry logic? + # (there should be some built into fastapi_websocket_rpc) + disconnected = asyncio.Event() + async def on_disconnect(channel): + disconnected.set() + scheme, *rest = urllib.parse.urlparse(os.environ["FARMER_SERVER_BASE_URL"]) + match scheme: + case "http": + scheme = "ws" + case "https": + scheme = "wss" + async with WebSocketRpcClient( + urllib.parse.urljoin(urlunparse((scheme, *rest)), "/internal/ws"), + FarmerReporterRpc(reporter), + on_disconnect=[on_disconnect], + ): + await disconnected.wait() + await reporter.stop() + asyncio.get_running_loop().stop() def main(): diff --git a/src/farmer/server/__init__.py b/src/farmer/server/__init__.py index 28d43d2..effb15d 100644 --- a/src/farmer/server/__init__.py +++ b/src/farmer/server/__init__.py @@ -520,19 +520,17 @@ async def server_cleanup(): async def async_main(): + server = uvicorn.Server(uvicorn.Config(ws_app, host="0.0.0.0", port=int(os.environ.get("PORT", 8234)), lifespan="off")) + slack = AsyncSocketModeHandler(slack_bot, os.environ["SLACK_APP_TOKEN"]) + # slack.start_async() would not properly dispose of resources on + # exit, so we do it by hand... + await slack.connect_async() try: - server = uvicorn.Server(uvicorn.Config(ws_app, host="0.0.0.0", port=int(os.environ.get("PORT", 8234)), lifespan="off")) - slack = AsyncSocketModeHandler(slack_bot, os.environ["SLACK_APP_TOKEN"]) - # slack.start_async() would not properly dispose of resources on - # exit, so we do it by hand... - await slack.connect_async() - try: - await serve_uvicorn(server) - finally: - logging.debug("farmer quitting") - await slack.close_async() + await serve_uvicorn(server) finally: - asyncio.get_running_loop().stop() + logging.debug("farmer quitting") + await slack.close_async() + asyncio.get_running_loop().stop() def main():