Skip to content

Commit

Permalink
Be less aggressive about stopping the event loop
Browse files Browse the repository at this point in the history
If we stop the event loop during aiorun's shutdown phase, we can cause
all sorts of pain (the loop may stop before all tasks have had a chance
to finish shutting down). We *should* only see `async_main` being
cancelled if aiorun is the one doing it (I hope), and if another
exception is thrown then aiorun's exception handler should see it and
stop the loop for us. Probably.
  • Loading branch information
sersorrel committed Oct 15, 2024
1 parent 9930735 commit c9b7ea3
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 34 deletions.
44 changes: 21 additions & 23 deletions src/farmer/reporter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,29 +322,27 @@ async def get_job_details(self, *, job_id: str) -> Any:


async def async_main():
try:
reporter = FarmerReporter()
await reporter.start()
# TODO: retry logic?
# (there should be some built into fastapi_websocket_rpc)
disconnected = asyncio.Event()
async def on_disconnect(channel):
disconnected.set()
scheme, *rest = urllib.parse.urlparse(os.environ["FARMER_SERVER_BASE_URL"])
match scheme:
case "http":
scheme = "ws"
case "https":
scheme = "wss"
async with WebSocketRpcClient(
urllib.parse.urljoin(urlunparse((scheme, *rest)), "/internal/ws"),
FarmerReporterRpc(reporter),
on_disconnect=[on_disconnect],
):
await disconnected.wait()
await reporter.stop()
finally:
asyncio.get_running_loop().stop()
reporter = FarmerReporter()
await reporter.start()
# TODO: retry logic?
# (there should be some built into fastapi_websocket_rpc)
disconnected = asyncio.Event()
async def on_disconnect(channel):
disconnected.set()
scheme, *rest = urllib.parse.urlparse(os.environ["FARMER_SERVER_BASE_URL"])
match scheme:
case "http":
scheme = "ws"
case "https":
scheme = "wss"
async with WebSocketRpcClient(
urllib.parse.urljoin(urlunparse((scheme, *rest)), "/internal/ws"),
FarmerReporterRpc(reporter),
on_disconnect=[on_disconnect],
):
await disconnected.wait()
await reporter.stop()
asyncio.get_running_loop().stop()


def main():
Expand Down
20 changes: 9 additions & 11 deletions src/farmer/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,19 +520,17 @@ async def server_cleanup():


async def async_main():
server = uvicorn.Server(uvicorn.Config(ws_app, host="0.0.0.0", port=int(os.environ.get("PORT", 8234)), lifespan="off"))
slack = AsyncSocketModeHandler(slack_bot, os.environ["SLACK_APP_TOKEN"])
# slack.start_async() would not properly dispose of resources on
# exit, so we do it by hand...
await slack.connect_async()
try:
server = uvicorn.Server(uvicorn.Config(ws_app, host="0.0.0.0", port=int(os.environ.get("PORT", 8234)), lifespan="off"))
slack = AsyncSocketModeHandler(slack_bot, os.environ["SLACK_APP_TOKEN"])
# slack.start_async() would not properly dispose of resources on
# exit, so we do it by hand...
await slack.connect_async()
try:
await serve_uvicorn(server)
finally:
logging.debug("farmer quitting")
await slack.close_async()
await serve_uvicorn(server)
finally:
asyncio.get_running_loop().stop()
logging.debug("farmer quitting")
await slack.close_async()
asyncio.get_running_loop().stop()


def main():
Expand Down

0 comments on commit c9b7ea3

Please sign in to comment.