Skip to content

Commit

Permalink
Don't spawn parentless if removing after flow-stop.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Dec 7, 2023
1 parent 452c05d commit 6e0f050
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 27 deletions.
8 changes: 5 additions & 3 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2086,9 +2086,11 @@ def check_auto_shutdown(self):
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (
TASK_STATUS_RUNNING,
) or (
# This is because runahead limit gets truncated
# to stop_point if there is one, so tasks spawned
# beyond the stop_point must be runahead limited.
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead
)
Expand Down
47 changes: 27 additions & 20 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,12 @@ def release_runahead_tasks(self):

for itask in release_me:
self.rh_release_and_queue(itask)
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)
if itask.flow_nums:
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)
released = True

return released
Expand All @@ -303,6 +304,10 @@ def compute_runahead(self, force=False) -> bool:
* The max future offset might have changed.
* The runahead limit config or task pool might have changed (reload).
This is a collective task pool computation. Call it once at the end
of a group operation such as removal of multiple tasks (not after
every individual task operation).
Start from earliest point with unfinished tasks. Partially satisfied
and incomplete tasks count too because they still need to run.
Expand Down Expand Up @@ -767,7 +772,7 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:

def spawn_if_parentless(self, tdef, point, flow_nums):
"""Spawn a task if parentless, regardless of runahead limit."""
if point is not None and tdef.is_parentless(point):
if flow_nums and point is not None and tdef.is_parentless(point):
ntask = self.get_or_spawn_task(
point, tdef.name, flow_nums
)
Expand All @@ -777,7 +782,7 @@ def spawn_if_parentless(self, tdef, point, flow_nums):
def remove(self, itask, reason=""):
"""Remove a task from the pool."""

if itask.state.is_runahead:
if itask.state.is_runahead and itask.flow_nums:
# If removing a parentless runahead-limited task
# auto-spawn its next instance first.
self.spawn_if_parentless(
Expand All @@ -786,7 +791,7 @@ def remove(self, itask, reason=""):
itask.flow_nums
)

msg = "removed from active pool"
msg = "removed"
if reason:
msg += f" ({reason})"
try:
Expand Down Expand Up @@ -1009,6 +1014,9 @@ def reload_taskdefs(self, config: 'WorkflowConfig') -> None:
self.config.runtime['descendants']
)

if self.compute_runahead():
self.release_runahead_tasks()

# Now queue all tasks that are ready to run
for itask in self.get_tasks():
# Recreate data store elements from task pool.
Expand Down Expand Up @@ -1367,6 +1375,9 @@ def spawn_on_output(self, itask, output, forced=False):
# Task finished.
self.remove_if_complete(itask)

if self.compute_runahead():
self.release_runahead_tasks()

def remove_if_complete(self, itask):
"""Remove a finished task if required outputs are complete.
Expand All @@ -1384,13 +1395,8 @@ def remove_if_complete(self, itask):
(C7 failed tasks don't count toward runahead limit)
"""
if cylc.flow.flags.cylc7_back_compat:

if not itask.state(TASK_STATUS_FAILED, TASK_OUTPUT_SUBMIT_FAILED):
self.remove(itask, 'completed')

if self.compute_runahead():
self.release_runahead_tasks()

return

if itask.state(TASK_STATUS_EXPIRED):
Expand All @@ -1411,9 +1417,6 @@ def remove_if_complete(self, itask):

self.remove(itask, reason)

if self.compute_runahead():
self.release_runahead_tasks()

def spawn_on_all_outputs(
self, itask: TaskProxy, completed_only: bool = False
) -> None:
Expand Down Expand Up @@ -1710,6 +1713,9 @@ def set( # noqa: A003
if trans is not None:
self._set_outputs_itask(trans, outputs)

if self.compute_runahead():
self.release_runahead_tasks()

def _set_outputs_itask(
self,
itask: 'TaskProxy',
Expand Down Expand Up @@ -1920,17 +1926,18 @@ def stop_flow(self, flow_num):
*TASK_STATUSES_ACTIVE, TASK_STATUS_PREPARING)
and not itask.flow_nums
):
# Don't spawn successor if the task is parentless.
self.remove(itask, "flow stopped")

if self.compute_runahead():
self.release_runahead_tasks()

def log_task_pool(self, log_lvl=logging.DEBUG):
"""Log content of task pool, for debugging."""
LOG.log(
log_lvl,
"\n".join(
f"* {itask} status={itask.state.status}"
f" runahead={itask.state.is_runahead}"
f" queued={itask.state.is_queued}"
for itask in self.get_tasks()
f"* {itask}" for itask in self.get_tasks()
)
)

Expand Down
2 changes: 1 addition & 1 deletion tests/functional/hold-release/05-release.t
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
inherit = STOP
script = """
cylc__job__poll_grep_workflow_log -E \
'1/dog1/01:succeeded.* removed from active pool \(completed\)'
'1/dog1/01:succeeded.* removed \(completed\)'
cylc stop "${CYLC_WORKFLOW_ID}"
"""
__FLOW_CONFIG__
Expand Down
4 changes: 1 addition & 3 deletions tests/functional/spawn-on-demand/09-set-outputs/flow.cylc
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# Test that `cylc set` has the same effect as natural output
# completion: i.e. that downstream children are spawned as normal.

# DEBUG mode required: we search for "removed from active pool" in the log.

[scheduler]
[[events]]
abort on stall timeout = True
Expand Down Expand Up @@ -46,7 +44,7 @@
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/foo"
# Set bar outputs after it is gone from the pool.
cylc__job__poll_grep_workflow_log -E "1/bar.* removed from active pool"
cylc__job__poll_grep_workflow_log -E "1/bar.* removed"
cylc set --flow=2 --output=out1 --output=out2 "${CYLC_WORKFLOW_ID}//1/bar"
"""
[[qux, quw, fux, fuw]]
Expand Down

0 comments on commit 6e0f050

Please sign in to comment.