Skip to content

Commit

Permalink
handle restart/reload/remove
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Oct 11, 2023
1 parent bee0a2d commit 49f01eb
Showing 1 changed file with 49 additions and 19 deletions.
68 changes: 49 additions & 19 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,31 +734,41 @@ def rh_release_and_queue(self, itask) -> None:
self.queue_task(itask)

def _get_spawned_or_merged_task(
self, point: 'PointBase', name: str, flow_nums: 'FlowNums'
) -> 'Tuple[Optional[TaskProxy], bool]':
self, point: 'PointBase', tdef: 'TaskDef', flow_nums: 'FlowNums'
) -> 'Tuple[Optional[TaskProxy], bool, bool]':
"""Return new or existing task point/name with merged flow_nums"""
taskid = Tokens(cycle=str(point), task=name).relative_id
taskid = Tokens(cycle=str(point), task=tdef.name).relative_id
ntask = (
self._get_hidden_task_by_id(taskid)
or self._get_main_task_by_id(taskid)
)
is_in_pool = False
is_xtrig_sequential = False
if ntask is None:
# ntask does not exist: spawn it in the flow.
ntask = self.spawn_task(name, point, flow_nums)
ntask = self.spawn_task(tdef.name, point, flow_nums)
# if the task was found set xtrigger checking type.
if (
ntask is not None
and set(ntask.state.xtriggers.keys()).intersection(
if ntask is not None:
if set(ntask.state.xtriggers.keys()).intersection(
self.xtrigger_mgr.sequential_xtrigger_labels
)
):
ntask.is_xtrigger_sequential = True
is_xtrig_sequential = True
elif {
xtrig_label
for sequence, xtrig_labels in tdef.xtrig_labels.items()
for xtrig_label in xtrig_labels
if sequence.is_valid(point)
}.intersection(
self.xtrigger_mgr.sequential_xtrigger_labels
):
ntask.is_xtrigger_sequential = True
is_xtrig_sequential = True
else:
# ntask already exists (n=0 or incomplete): merge flows.
is_in_pool = True
self.merge_flows(ntask, flow_nums)
return ntask, is_in_pool # may be None
is_xtrig_sequential = ntask.is_xtrigger_sequential
return ntask, is_in_pool, is_xtrig_sequential # may be None

def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
"""Spawn parentless task instances from point to runahead limit.
Expand All @@ -775,29 +785,34 @@ def spawn_to_rh_limit(self, tdef, point, flow_nums) -> None:
if self.runahead_limit_point is None:
self.compute_runahead()

is_sequential = False
is_xtrig_sequential = False
while point is not None and (point <= self.runahead_limit_point):
if tdef.is_parentless(point):
ntask, is_in_pool = self._get_spawned_or_merged_task(
point, tdef.name, flow_nums
ntask, is_in_pool, is_xtrig_sequential = (
self._get_spawned_or_merged_task(
point,
tdef,
flow_nums
)
)
if ntask is not None:
if not is_in_pool:
self.add_to_pool(ntask)
self.rh_release_and_queue(ntask)
if ntask.is_xtrigger_sequential:
is_sequential = True
break
if is_xtrig_sequential:
break
point = tdef.next_point(point)

# Once more (for the rh-limited task: don't rh release it!)
if (
point is not None
and tdef.is_parentless(point)
and not is_sequential
and not is_xtrig_sequential
):
ntask, is_in_pool = self._get_spawned_or_merged_task(
point, tdef.name, flow_nums
ntask, is_in_pool, _ = self._get_spawned_or_merged_task(
point,
tdef,
flow_nums
)
if ntask is not None and not is_in_pool:
self.add_to_pool(ntask)
Expand Down Expand Up @@ -1048,6 +1063,14 @@ def reload_taskdefs(self, config: 'WorkflowConfig') -> None:
self.check_task_output,
)
self._swap_out(new_task)
# Set xtrigger checking type for parentless spawning.
if (
new_task.tdef.is_parentless(new_task.point)
and set(new_task.state.xtriggers.keys()).intersection(
self.xtrigger_mgr.sequential_xtrigger_labels
)
):
new_task.is_xtrigger_sequential = True
self.data_store_mgr.delta_task_prerequisite(new_task)
LOG.info(f"[{itask}] reloaded task definition")
if itask.state(*TASK_STATUSES_ACTIVE):
Expand Down Expand Up @@ -1705,6 +1728,13 @@ def remove_tasks(self, items):
"""Remove tasks from the pool."""
itasks, _, bad_items = self.filter_task_proxies(items)
for itask in itasks:
# Spawn next occurance of xtrigger sequential task.
if itask.is_xtrigger_sequential:
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)
self.remove(itask, 'request')
if self.compute_runahead():
self.release_runahead_tasks()
Expand Down

0 comments on commit 49f01eb

Please sign in to comment.