Skip to content

Commit

Permalink
Fix handling of 'submitted' message, for manual set.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Mar 3, 2024
1 parent d105dff commit 2ec1283
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 30 deletions.
8 changes: 4 additions & 4 deletions cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ def validate_flow_opts(options):
def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str:
"""Return a string representation of a set of flow numbers
If the set contains only the original flow 1, return an empty string
so that users can disregard flows unless they trigger new ones.
Otherwise return e.g. "(1,2,3)".
Return:
- "none" for no flow
- "" for the original flow (flows only matter if there are several)
- otherwise e.g. "(flow=1,2,3)"
Examples:
>>> stringify_flow_nums({})
Expand Down
29 changes: 14 additions & 15 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ def process_message(
f"[{itask}] setting implied output: {implied}")
self.process_message(
itask, INFO, implied, event_time,
self.FLAG_INTERNAL, submit_num
self.FLAG_INTERNAL, submit_num, forced
)

if message == self.EVENT_STARTED:
Expand Down Expand Up @@ -758,19 +758,12 @@ def process_message(
elif message == self.EVENT_SUBMITTED:
if (
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_SUBMITTED)
and itask.state.is_gte(TASK_STATUS_SUBMITTED)
):
# Already submitted.
return True
if (
itask.state.status == TASK_STATUS_PREPARING
or itask.tdef.run_mode == 'simulation'
):
# If not in the preparing state we already assumed and handled
# job submission under the started event above...
# (sim mode does not have the job prep state)
self._process_message_submitted(itask, event_time, forced)
self.spawn_children(itask, TASK_OUTPUT_SUBMITTED)
self._process_message_submitted(itask, event_time, forced)
self.spawn_children(itask, TASK_OUTPUT_SUBMITTED)

# ... but either way update the job ID in the job proxy (it only
# comes in via the submission message).
Expand Down Expand Up @@ -1290,6 +1283,7 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False):
os.getenv("CYLC_WORKFLOW_RUN_DIR")
)
itask.state.add_xtrigger(label)

if itask.state_reset(TASK_STATUS_WAITING):
self.data_store_mgr.delta_task_state(itask)

Expand Down Expand Up @@ -1437,7 +1431,8 @@ def _process_message_submit_failed(

# Register newly submit-failed job with the database and datastore.
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
self._insert_task_job(itask, event_time, self.JOB_SUBMIT_FAIL_FLAG)
self._insert_task_job(
itask, event_time, self.JOB_SUBMIT_FAIL_FLAG, forced=forced)
self.data_store_mgr.delta_job_state(
job_tokens,
TASK_STATUS_SUBMIT_FAILED
Expand Down Expand Up @@ -1489,7 +1484,8 @@ def _process_message_submitted(

# Register the newly submitted job with the database and datastore.
# Do after itask has changed state
self._insert_task_job(itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG)
self._insert_task_job(
itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG, forced=forced)
job_tokens = itask.tokens.duplicate(job=str(itask.submit_num))
self.data_store_mgr.delta_job_time(
job_tokens,
Expand All @@ -1513,7 +1509,8 @@ def _insert_task_job(
self,
itask: 'TaskProxy',
event_time: str,
submit_status: int
submit_status: int,
forced: bool = False
):
"""Insert a new job proxy into the datastore.
Expand All @@ -1526,7 +1523,9 @@ def _insert_task_job(
# itask.jobs appends for automatic retries (which reuse the same task
# proxy) but a retriggered task that was not already in the pool will
# not see previous submissions (so can't use itask.jobs[submit_num-1]).
if itask.tdef.run_mode == "simulation":
# And transient tasks, used for setting outputs and spawning children,
# do not submit jobs.
if itask.tdef.run_mode == "simulation" or forced:
job_conf = {"submit_num": 0}
else:
job_conf = itask.jobs[-1]
Expand Down
21 changes: 17 additions & 4 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def __init__(
self.stop_point = config.stop_point or config.final_point
self.workflow_db_mgr: 'WorkflowDatabaseManager' = workflow_db_mgr
self.task_events_mgr: 'TaskEventsManager' = task_events_mgr
# TODO this is ugly:
self.task_events_mgr.spawn_func = self.spawn_on_output
self.data_store_mgr: 'DataStoreMgr' = data_store_mgr
self.flow_mgr: 'FlowMgr' = flow_mgr
Expand Down Expand Up @@ -1278,7 +1277,9 @@ def spawn_on_output(self, itask, output, forced=False):
cycle=str(c_point),
task=c_name,
).relative_id

c_task = self._get_task_by_id(c_taskid)

if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
Expand Down Expand Up @@ -1575,6 +1576,10 @@ def spawn_task(
prev_completed and not prev_flow_wait
and not force
):
LOG.warning(
f"({point}/{name} already completed"
f" in {stringify_flow_nums(flow_nums, full=True)})"
)
return None

# If previously completed we just create a transient task proxy to use
Expand Down Expand Up @@ -1770,6 +1775,12 @@ def set_prereqs_and_outputs(
- future tasks must be specified individually
- family names are not expanded to members
Note transient tasks are a subset of forced tasks (you can
force-trigger a task that is already in the pool so not transient)
A forced output cannot cause a state change to submitted or running,
but it can complete a task so that it doesn't need to run.
Args:
items: task ID match patterns
prereqs: prerequisites to set
Expand All @@ -1792,6 +1803,7 @@ def set_prereqs_and_outputs(
)

for itask in itasks:
# Existing task proxies.
self.merge_flows(itask, flow_nums)
if prereqs:
self._set_prereqs_itask(itask, prereqs, flow_nums)
Expand Down Expand Up @@ -1835,6 +1847,8 @@ def _set_outputs_itask(
changed = True

if changed and itask.transient:
# TODO DO WE NEED THIS?
print(f"DB UPDATE {itask}")
self.workflow_db_mgr.put_update_task_state(itask)
self.workflow_db_mgr.put_update_task_outputs(itask)

Expand Down Expand Up @@ -1975,9 +1989,8 @@ def force_trigger_tasks(
If the previous run was not flow-wait
- run it, and try to spawn on outputs
Else if the previous run was flow-wait:
- just try to spawn, unless flow-wait is set.
("try to spawn": unless the output already spawned in the flow)
- just spawn (if not already spawned in this flow)
unless flow-wait is set.
"""
# Get flow numbers for the tasks to be triggered.
Expand Down
11 changes: 9 additions & 2 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,10 @@ def __str__(self) -> str:
Format: "<point>/<name>/<job>{<flows>}:status".
"""
id_ = self.identity
if self.transient:
return f"{id_}{stringify_flow_nums(self.flow_nums)}(transient)"
if not self.state(TASK_STATUS_WAITING, TASK_STATUS_EXPIRED):
id_ += f"/{self.submit_num:02d}"

return (
f"{id_}{stringify_flow_nums(self.flow_nums)}:{self.state}"
)
Expand Down Expand Up @@ -508,16 +509,22 @@ def state_reset(
self, status=None, is_held=None, is_queued=None, is_runahead=None,
silent=False, forced=False
) -> bool:
"""Set new state and log the change. Return whether it changed."""
"""Set new state and log the change. Return whether it changed.
"""
before = str(self)

if status == TASK_STATUS_EXPIRED:
is_queued = False
is_runahead = False

if self.state.reset(
status, is_held, is_queued, is_runahead, forced
):
if not silent and not self.transient:
LOG.info(f"[{before}] => {self.state}")
return True

return False

def satisfy_me(self, outputs: 'Iterable[Tokens]') -> bool:
Expand Down
15 changes: 11 additions & 4 deletions cylc/flow/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ def reset(
returns: whether state changed or not (bool)
"""
req = status

current_status = (
self.status,
self.is_held,
Expand All @@ -432,11 +434,11 @@ def reset(

if (
forced and
requested_status in [TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING]
req in [TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING]
):
# For manual setting of task outputs: return True but don't action
# the state change (there's no real job associated with the task).
return True
# Forced setting of outputs can cause state change to completed
# but not to submitted or running (there's no real job).
return False

# perform the actual state change
self.status, self.is_held, self.is_queued, self.is_runahead = (
Expand All @@ -460,6 +462,11 @@ def is_gt(self, status):
return (TASK_STATUSES_ORDERED.index(self.status) >
TASK_STATUSES_ORDERED.index(status))

def is_gte(self, status):
""""Return True if self.status >= status."""
return (TASK_STATUSES_ORDERED.index(self.status) >=

Check warning on line 467 in cylc/flow/task_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_state.py#L467

Added line #L467 was not covered by tests
TASK_STATUSES_ORDERED.index(status))

def _add_prerequisites(self, point, tdef):
"""Add task prerequisites."""
# Triggers for sequence_i only used if my cycle point is a
Expand Down
21 changes: 20 additions & 1 deletion tests/unit/test_task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
from cylc.flow.task_trigger import Dependency, TaskTrigger
from cylc.flow.task_state import (
TaskState,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED,
TASK_STATUS_WAITING,
TASK_STATUS_RUNNING,
)


Expand Down Expand Up @@ -100,3 +103,19 @@ def test_task_prereq_duplicates(set_cycling_type):
prereqs = [p.satisfied for p in tstate.prerequisites]

assert prereqs == [{("1", "a", "succeeded"): False}]


def test_task_state_order():
"""Test is_gt and is_gte methods."""

tdef = TaskDef('foo', {}, 'live', IntegerPoint("1"), IntegerPoint("1"))
tstate = TaskState(tdef, IntegerPoint("1"), TASK_STATUS_SUBMITTED, False)

assert tstate.is_gt(TASK_STATUS_WAITING)
assert tstate.is_gt(TASK_STATUS_PREPARING)
assert tstate.is_gt(TASK_STATUS_SUBMIT_FAILED)
assert not tstate.is_gt(TASK_STATUS_SUBMITTED)
assert tstate.is_gte(TASK_STATUS_SUBMITTED)
assert not tstate.is_gt(TASK_STATUS_RUNNING)
assert not tstate.is_gte(TASK_STATUS_RUNNING)

0 comments on commit 2ec1283

Please sign in to comment.