diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index f9b4d4c3243..0a65baea1a9 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -35,6 +35,7 @@ from typing import ( TYPE_CHECKING, Any, + Callable, Dict, List, NamedTuple, @@ -437,6 +438,9 @@ class TaskEventsManager(): workflow_cfg: Dict[str, Any] uuid_str: str + # To be set by the task pool: + spawn_func: Callable[['TaskProxy', str], Any] + mail_interval: float = 0 mail_smtp: Optional[str] = None mail_footer: Optional[str] = None @@ -459,8 +463,6 @@ def __init__( self._event_timers: Dict[EventKey, Any] = {} # NOTE: flag for DB use self.event_timers_updated = True - # To be set by the task pool: - self.spawn_func = None self.timestamp = timestamp self.bad_hosts = bad_hosts @@ -1966,7 +1968,7 @@ def reset_bad_hosts(self): ) self.bad_hosts.clear() - def spawn_children(self, itask, output): + def spawn_children(self, itask: 'TaskProxy', output: str) -> None: # update DB task outputs self.workflow_db_mgr.put_update_task_outputs(itask) # spawn child-tasks diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index e0252956911..7f83df4192f 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1329,9 +1329,7 @@ def check_abort_on_task_fails(self): """ return self.abort_task_failed - def spawn_on_output( - self, itask: TaskProxy, output: str, forced: bool = False - ) -> None: + def spawn_on_output(self, itask: TaskProxy, output: str) -> None: """Spawn child-tasks of given output, into the pool. Remove the parent task from the pool if complete. @@ -1352,7 +1350,6 @@ def spawn_on_output( Args: output: output to spawn on. - forced: True if called from manual set task command """ if ( @@ -1363,7 +1360,7 @@ def spawn_on_output( self.abort_task_failed = True children = [] - if itask.flow_nums or forced: + if itask.flow_nums: with suppress(KeyError): children = itask.graph_children[output] @@ -1394,10 +1391,7 @@ def spawn_on_output( if c_task is not None and c_task != itask: # (Avoid self-suicide: A => !A) self.merge_flows(c_task, itask.flow_nums) - elif ( - c_task is None - and (itask.flow_nums or forced) - ): + elif c_task is None and itask.flow_nums: # If child is not in the pool already, and parent belongs to a # flow (so it can spawn children), and parent is not waiting # for an upcoming flow merge before spawning ... then spawn it. diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 9f349c7e78c..e6c50bf8180 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1085,12 +1085,9 @@ async def test_no_flow_tasks_dont_spawn( 'R1': 'a => b => c' } }, - 'scheduler': { - 'allow implicit tasks': 'true', - }, }) - schd = scheduler(id_) + schd: Scheduler = scheduler(id_) async with start(schd): task_a = schd.pool.get_tasks()[0] @@ -1099,29 +1096,22 @@ async def test_no_flow_tasks_dont_spawn( # Set as completed: should not spawn children. schd.pool.set_prereqs_and_outputs( - [task_a.identity], None, None, [FLOW_NONE]) + [task_a.identity], [], [], [FLOW_NONE] + ) + assert not schd.pool.get_tasks() - for flow_nums, force, pool in ( + for flow_nums, expected_pool in ( # outputs yielded from a no-flow task should not spawn downstreams - (set(), False, []), - # forced spawning downstream of a no-flow task should spawn - # downstreams with flow_nums={} - (set(), True, [('1/b', set())]), + (set(), []), # outputs yielded from a task with flow numbers should spawn # downstreams in the same flow - ({1}, False, [('1/b', {1})]), - # forced spawning should work in the same way - ({1}, True, [('1/b', {1})]), + ({1}, [('1/b', {1})]), ): # set the flow-nums on 1/a task_a.flow_nums = flow_nums # spawn on the succeeded output - schd.pool.spawn_on_output( - task_a, - TASK_OUTPUT_SUCCEEDED, - forced=force, - ) + schd.pool.spawn_on_output(task_a, TASK_OUTPUT_SUCCEEDED) schd.pool.spawn_on_all_outputs(task_a) @@ -1129,7 +1119,7 @@ async def test_no_flow_tasks_dont_spawn( assert [ (itask.identity, itask.flow_nums) for itask in schd.pool.get_tasks() - ] == pool + ] == expected_pool async def test_task_proxy_remove_from_queues(