Skip to content

Commit

Permalink
Force-trigger refactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jan 18, 2024
1 parent 140686d commit 5f71daf
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 85 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/scheduler_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,4 +661,4 @@ def _play(parser: COP, options: 'Values', id_: str):
*options.starttask,
relative=True,
)
import colorama, pudb; colorama.deinit(); pudb.set_trace(); colorama.init(); return asyncio.run(scheduler_cli(options, id_))
return asyncio.run(scheduler_cli(options, id_))
221 changes: 140 additions & 81 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,6 @@ def spawn_on_output(self, itask, output, forced=False):
outputs to satisfy any tasks with absolute prerequisites).
Args:
tasks: list of identifiers or task globs.
output: output to spawn on.
forced: True if called from manual set task command
Expand Down Expand Up @@ -1467,75 +1466,100 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:

return True

def _get_task_history(self, name, point, flow_nums):
"""Get details of previous submits for this task.
"""
info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
)
try:
submit_num = max(s[0] for s in info)
except ValueError:
# never spawned before in any flow
submit_num = 0

prev_completed = False # did not complete in the flow
prev_flow_wait = False # did not wait in the flow
# orig_fnums = set()

for snum, f_wait, old_fnums, is_complete in info:
if set.intersection(flow_nums, old_fnums):
# matching flows
prev_completed = is_complete
prev_flow_wait = f_wait
# orig_fnums = old_fnums
if not prev_completed:
# There may be multiple entries with flow overlap due
# to merges (they'll have have same snum and f_wait);
# keep going to find the complete one, if any .
continue
LOG.warning(
f"{point}/{name} already completed for flow"
f" {stringify_flow_nums(flow_nums, full=True)} via"
f" {point}/{name}/{snum:02d}"
f"{stringify_flow_nums(old_fnums, full=True)}"
)
break

return submit_num, prev_completed, prev_flow_wait

def _load_historical_outputs(self, itask):
"""Load a task's historical outputs from the DB."""
info = self.workflow_db_mgr.pri_dao.select_task_outputs(
itask.tdef.name, str(itask.point))
if not info:
# task never ran before
self.db_add_new_flow_rows(itask)

Check warning on line 1513 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1513

Added line #L1513 was not covered by tests
else:
for outputs_str, fnums in info.items():
if itask.flow_nums.intersection(fnums):
for msg in json.loads(outputs_str):
itask.state.outputs.set_completed_by_msg(msg)

def spawn_task(
self,
name: str,
point: 'PointBase',
flow_nums: Set[int],
force: bool = False,
is_manual_submit: bool = False,
flow_wait: bool = False,
) -> Optional[TaskProxy]:
"""Spawn a task if not already completed for this flow, or if forced.
If already completed in flow wait, spawn its children via a transient.
# (TODO - use "cylc set" machinery for this?)
The creates the task proxy but does not add it to the pool.
If completed previously with flow wait, just try to spawn children.
# (TODO - reuse "cylc set" machinery for this?)
"""
if not self.can_be_spawned(name, point):
return None

# Get previous submit info for this task.
info = self.workflow_db_mgr.pri_dao.select_prev_instances(
name, str(point)
submit_num, prev_completed, prev_flow_wait = (
self._get_task_history(name, point, flow_nums)
)

try:
submit_num = max(s[0] for s in info)
except ValueError:
# never spawned before.
submit_num = 0

flow_wait_done = False
completed = False
# orig_fnums = set()

# Don't spawn if already completed in this flow, unless forced.
if not force:
for snum, f_wait, old_fnums, is_complete in info:
if set.intersection(flow_nums, old_fnums):
completed = is_complete
flow_wait_done = f_wait
# orig_fnums = old_fnums
if not completed:
# There may be multiple entries with flow overlap due
# to merges (they'll have have same snum and f_wait);
# keep going to find the complete one, if any .
continue
LOG.warning(
f"{point}/{name} already completed for flow"
f" {stringify_flow_nums(flow_nums, full=True)} via"
f" {point}/{name}/{snum:02d}"
f"{stringify_flow_nums(old_fnums, full=True)}"
)
break

if completed and not flow_wait_done and not force:
# If already completed and spawned on outputs, quit unless forced.
if (
prev_completed and not prev_flow_wait
and not force
):
return None

# A transient task is only used for spawning on outputs.
if force:
# spawn into the task pool
transient = False

Check warning on line 1553 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1553

Added line #L1553 was not covered by tests
else:
# spawn into the pool only if not completed in this flow
transient = completed
# transient if completed, else run again.
transient = prev_completed

itask = self._get_task_proxy(
point,
self.config.get_taskdef(name),
flow_nums,
submit_num=submit_num,
is_manual_submit=is_manual_submit,
flow_wait=flow_wait,
transient=transient
)
Expand Down Expand Up @@ -1584,15 +1608,17 @@ def spawn_task(
[f"{a[0]}/{a[1]}:{a[2]}" for a in self.abs_outputs_done]
)

if flow_wait_done:
if prev_flow_wait:
self._spawn_after_flow_wait(itask)
return None

self.db_add_new_flow_rows(itask) # TODO: move this higher up
self.db_add_new_flow_rows(itask)
return itask

def _spawn_after_flow_wait(self, itask: TaskProxy) -> None:
LOG.info(f"spawning children of {itask.identity} after flow wait")
LOG.info(
f"spawning children of {itask.identity} after flow wait"
)
self.spawn_on_all_outputs(itask, completed_only=True)
# update flow wait status in the DB
itask.flow_wait = False
Expand Down Expand Up @@ -1839,6 +1865,27 @@ def _get_flow_nums(
return None
return flow_nums

def _force_trigger(self, itask):
"""Assumes task is in the pool"""
# TODO is this flag still needed, and consistent with "cylc set"?
itask.is_manual_submit = True
itask.reset_try_timers()
if itask.state_reset(TASK_STATUS_WAITING):
# (could also be unhandled failed)
self.data_store_mgr.delta_task_state(itask)
# (No need to set prerequisites satisfied here).
if itask.state.is_runahead:
# Release from runahead, and queue it.
self.rh_release_and_queue(itask)
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)
else:
# De-queue it to run now.
self.task_queue_mgr.force_release_task(itask)

def force_trigger_tasks(
self, items: Iterable[str],
flow: List[str],
Expand All @@ -1847,58 +1894,70 @@ def force_trigger_tasks(
):
"""Force a task to trigger (user command).
Always run the task, even if a previous run was flow-waited.
If the task did not run before in the flow:
- run it, and spawn on outputs unless flow-wait is set.
(but load the previous outputs from the DB)
Else if the task ran before in the flow:
- load previous outputs
If the previous run was not flow-wait
- run it, and try to spawn on outputs
Else if the previous run was flow-wait:
- just try to spawn, unless flow-wait is set.
("try to spawn": unless the output already spawned in the flow)
"""
# TODO CHECK FLOW MERGE BASED ON CLI FLOWS

# Get flow numbers for the tasks to be triggered.
flow_nums = self._get_flow_nums(flow, flow_descr)
if flow_nums is None:
return

# Get matching tasks proxies, and matching future task IDs.
itasks, future_tasks, unmatched = self.filter_task_proxies(
items,
future=True,
warn=False,
existing_tasks, future_ids, unmatched = self.filter_task_proxies(
items, future=True, warn=False,
)

# Spawn future task proxies and add to the list.
for name, point in future_tasks:
# (Flow values already validated by the trigger client).
itask = self.spawn_task(
name,
# Trigger existing tasks.
for itask in existing_tasks:
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
continue
self._force_trigger(itask)

# Spawn and trigger future tasks.
for name, point in future_ids:

if not self.can_be_spawned(name, point):
continue

Check warning on line 1936 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1936

Added line #L1936 was not covered by tests

submit_num, _, prev_fwait = self._get_task_history(
name, point, flow_nums)

itask = TaskProxy(
self.tokens,
self.config.get_taskdef(name),
point,
flow_nums,
force=True,
is_manual_submit=True,
flow_wait=flow_wait
flow_wait=flow_wait,
submit_num=submit_num,
)
if itask is None:
continue
itasks.append(itask)

# Trigger matched tasks if not already active.
for itask in itasks:
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
continue
itask.is_manual_submit = True
itask.reset_try_timers()
# (If None, spawner reports cycle bounds errors).
if itask.state_reset(TASK_STATUS_WAITING):
# (could also be unhandled failed)
self.data_store_mgr.delta_task_state(itask)
# (No need to set prerequisites satisfied here).
self.db_add_new_flow_rows(itask)

if prev_fwait:
# update completed outputs from the DB
self._load_historical_outputs(itask)

# run it (or run it again for incomplete flow-wait)
self.add_to_pool(itask)
if itask.state.is_runahead:
# Release from runahead, and queue it.
self.rh_release_and_queue(itask)
self.spawn_to_rh_limit(
itask.tdef,
itask.tdef.next_point(itask.point),
itask.flow_nums
)
else:
# De-queue it to run now.
self.task_queue_mgr.force_release_task(itask)
self._force_trigger(itask)

def clock_expire_tasks(self):
"""Expire any tasks past their clock-expiry time."""
Expand Down
4 changes: 2 additions & 2 deletions tests/functional/flow-triggers/12-all-future-multi/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
)); then
# trigger 3/a in a new flow
cylc trigger --flow=new ${CYLC_WORKFLOW_ID}//3/a
cylc__job__poll_grep_workflow_log -E '3/a.*started'
cylc__job__poll_grep_workflow_log -E '3/a.*=> running'
# trigger 5/a in all flows
cylc trigger ${CYLC_WORKFLOW_ID}//5/a
cylc__job__poll_grep_workflow_log -E '5/a.*started'
cylc__job__poll_grep_workflow_log -E '5/a.*=> running'
fi
"""
2 changes: 1 addition & 1 deletion tests/functional/special/08-clock-trigger-retry.t
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ workflow_run_ok "${TEST_NAME_BASE}-run" cylc play --no-detach "$WORKFLOW_NAME"

log_scan "${TEST_NAME_BASE}-log-scan" \
"${WORKFLOW_RUN_DIR}/log/scheduler/log" 2 1 \
"\[20150101.*/foo/01.* retrying in PT5S" \
"\[20150101.*/foo/.* retrying in PT5S" \
"xtrigger satisfied: _cylc_retry_20150101"
# (if task resubmits immediately instead of waiting PT5S, xtrigger msg will not appear)

Expand Down

0 comments on commit 5f71daf

Please sign in to comment.