Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fileinstall: run coros in the background if loop already running #2744

Merged
merged 1 commit into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions metomi/rose/config_processors/fileinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Process "file:*" sections in node of a metomi.rose.config_tree.ConfigTree.
"""

from contextlib import suppress
from fnmatch import fnmatch
from glob import glob
from io import BytesIO
Expand Down Expand Up @@ -106,6 +107,9 @@ def process(
finally:
if cwd != os.getcwd():
self.manager.fs_util.chdir(cwd)
if loc_dao.conn:
with suppress(Exception):
loc_dao.conn.close()

def _process(self, conf_tree, nodes, loc_dao, **kwargs):
"""Helper for self.process."""
Expand Down
36 changes: 29 additions & 7 deletions metomi/rose/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@
"""A multiprocessing runner of jobs with dependencies."""

import asyncio

from metomi.rose.reporter import Event


# Set holding strong references to "background" tasks/coroutines that are not
# referenced from any other code (i.e. are not directly awaited). The event
# loop keeps only a weak reference to running tasks, so registering a task in
# this set prevents it from being garbage collected mid-execution; see
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
_BACKGROUND_TASKS = set()
Comment on lines +24 to +28
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensures that background tasks are always referenced whilst they are running.

That is so long as the module doesn't get housekept 🙈

For details: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task



class JobEvent(Event):
"""Event raised when a job completes."""

Expand Down Expand Up @@ -175,19 +183,33 @@ def run(self, job_manager, *args, concurrency=6):
The maximum number of jobs to run concurrently.

"""
running = []
loop = asyncio.get_event_loop()
loop.set_exception_handler(self.job_processor.handle_event)
loop.run_until_complete(
asyncio.gather(
self._run_jobs(running, job_manager, args, concurrency),
self._post_process_jobs(running, job_manager, args),
)
)
coro = self._run(job_manager, *args, concurrency=concurrency)
try:
# event loop is not running (e.g. rose CLI use)
loop.run_until_complete(coro)
except RuntimeError:
# event loop is already running (e.g. cylc CLI use)
# WARNING: this starts the file installation running, but it
# doesn't wait for it to finish, that's your problem :(
task = loop.create_task(coro)
# reference this task from a global variable to prevent it from
# being garbage collected
_BACKGROUND_TASKS.add(task)
# tidy up afterwards
task.add_done_callback(_BACKGROUND_TASKS.discard)
dead_jobs = job_manager.get_dead_jobs()
if dead_jobs:
raise JobRunnerNotCompletedError(dead_jobs)

async def _run(self, job_manager, *args, concurrency=6):
    """Run jobs and post-process completed ones concurrently.

    The ``active`` list is shared between the two coroutines: the runner
    appends jobs to it as they start, and the post-processor consumes
    them as they finish.
    """
    active = []
    runner = self._run_jobs(active, job_manager, args, concurrency)
    post_processor = self._post_process_jobs(active, job_manager, args)
    await asyncio.gather(runner, post_processor)

async def _run_jobs(self, running, job_manager, args, concurrency):
"""Run pending jobs subject to the concurrency limit.

Expand Down
Loading