From 2ec128378ee4f4b17f63ce65dc9d488dd3d4728e Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 29 Feb 2024 10:51:54 +1300 Subject: [PATCH] Fix handling of 'submitted' message, for manual set. --- cylc/flow/flow_mgr.py | 8 ++++---- cylc/flow/task_events_mgr.py | 29 ++++++++++++++--------------- cylc/flow/task_pool.py | 21 +++++++++++++++++---- cylc/flow/task_proxy.py | 11 +++++++++-- cylc/flow/task_state.py | 15 +++++++++++---- tests/unit/test_task_state.py | 21 ++++++++++++++++++++- 6 files changed, 75 insertions(+), 30 deletions(-) diff --git a/cylc/flow/flow_mgr.py b/cylc/flow/flow_mgr.py index e371318a383..f5a930f3a6a 100644 --- a/cylc/flow/flow_mgr.py +++ b/cylc/flow/flow_mgr.py @@ -86,10 +86,10 @@ def validate_flow_opts(options): def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str: """Return a string representation of a set of flow numbers - If the set contains only the original flow 1, return an empty string - so that users can disregard flows unless they trigger new ones. - - Otherwise return e.g. "(1,2,3)". + Return: + - "none" for no flow + - "" for the original flow (flows only matter if there are several) + - otherwise e.g. "(flow=1,2,3)" Examples: >>> stringify_flow_nums({}) diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index e761b0b9a42..9db17679fc3 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -710,7 +710,7 @@ def process_message( f"[{itask}] setting implied output: {implied}") self.process_message( itask, INFO, implied, event_time, - self.FLAG_INTERNAL, submit_num + self.FLAG_INTERNAL, submit_num, forced ) if message == self.EVENT_STARTED: @@ -758,19 +758,12 @@ def process_message( elif message == self.EVENT_SUBMITTED: if ( flag == self.FLAG_RECEIVED - and itask.state.is_gt(TASK_STATUS_SUBMITTED) + and itask.state.is_gte(TASK_STATUS_SUBMITTED) ): # Already submitted. return True - if ( - itask.state.status == TASK_STATUS_PREPARING - or itask.tdef.run_mode == 'simulation' - ): - # If not in the preparing state we already assumed and handled - # job submission under the started event above... - # (sim mode does not have the job prep state) - self._process_message_submitted(itask, event_time, forced) - self.spawn_children(itask, TASK_OUTPUT_SUBMITTED) + self._process_message_submitted(itask, event_time, forced) + self.spawn_children(itask, TASK_OUTPUT_SUBMITTED) # ... but either way update the job ID in the job proxy (it only # comes in via the submission message). @@ -1290,6 +1283,7 @@ def _retry_task(self, itask, wallclock_time, submit_retry=False): os.getenv("CYLC_WORKFLOW_RUN_DIR") ) itask.state.add_xtrigger(label) + if itask.state_reset(TASK_STATUS_WAITING): self.data_store_mgr.delta_task_state(itask) @@ -1437,7 +1431,8 @@ def _process_message_submit_failed( # Register newly submit-failed job with the database and datastore. job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) - self._insert_task_job(itask, event_time, self.JOB_SUBMIT_FAIL_FLAG) + self._insert_task_job( + itask, event_time, self.JOB_SUBMIT_FAIL_FLAG, forced=forced) self.data_store_mgr.delta_job_state( job_tokens, TASK_STATUS_SUBMIT_FAILED @@ -1489,7 +1484,8 @@ def _process_message_submitted( # Register the newly submitted job with the database and datastore. # Do after itask has changed state - self._insert_task_job(itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG) + self._insert_task_job( + itask, event_time, self.JOB_SUBMIT_SUCCESS_FLAG, forced=forced) job_tokens = itask.tokens.duplicate(job=str(itask.submit_num)) self.data_store_mgr.delta_job_time( job_tokens, @@ -1513,7 +1509,8 @@ def _insert_task_job( self, itask: 'TaskProxy', event_time: str, - submit_status: int + submit_status: int, + forced: bool = False ): """Insert a new job proxy into the datastore. @@ -1526,7 +1523,9 @@ def _insert_task_job( # itask.jobs appends for automatic retries (which reuse the same task # proxy) but a retriggered task that was not already in the pool will # not see previous submissions (so can't use itask.jobs[submit_num-1]). - if itask.tdef.run_mode == "simulation": + # And transient tasks, used for setting outputs and spawning children, + # do not submit jobs. + if itask.tdef.run_mode == "simulation" or forced: job_conf = {"submit_num": 0} else: job_conf = itask.jobs[-1] diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index ebfc2bb6ee4..e695d35b813 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -116,7 +116,6 @@ def __init__( self.stop_point = config.stop_point or config.final_point self.workflow_db_mgr: 'WorkflowDatabaseManager' = workflow_db_mgr self.task_events_mgr: 'TaskEventsManager' = task_events_mgr - # TODO this is ugly: self.task_events_mgr.spawn_func = self.spawn_on_output self.data_store_mgr: 'DataStoreMgr' = data_store_mgr self.flow_mgr: 'FlowMgr' = flow_mgr @@ -1278,7 +1277,9 @@ def spawn_on_output(self, itask, output, forced=False): cycle=str(c_point), task=c_name, ).relative_id + c_task = self._get_task_by_id(c_taskid) + if c_task is not None and c_task != itask: # (Avoid self-suicide: A => !A) self.merge_flows(c_task, itask.flow_nums) @@ -1575,6 +1576,10 @@ def spawn_task( prev_completed and not prev_flow_wait and not force ): + LOG.warning( + f"({point}/{name} already completed" + f" in {stringify_flow_nums(flow_nums, full=True)})" + ) return None # If previously completed we just create a transient task proxy to use @@ -1770,6 +1775,12 @@ def set_prereqs_and_outputs( - future tasks must be specified individually - family names are not expanded to members + Note transient tasks are a subset of forced tasks (you can + force-trigger a task that is already in the pool so not transient) + + A forced output cannot cause a state change to submitted or running, + but it can complete a task so that it doesn't need to run. + Args: items: task ID match patterns prereqs: prerequisites to set @@ -1792,6 +1803,7 @@ def set_prereqs_and_outputs( ) for itask in itasks: + # Existing task proxies. self.merge_flows(itask, flow_nums) if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) @@ -1835,6 +1847,8 @@ def _set_outputs_itask( changed = True if changed and itask.transient: + # TODO DO WE NEED THIS? + print(f"DB UPDATE {itask}") self.workflow_db_mgr.put_update_task_state(itask) self.workflow_db_mgr.put_update_task_outputs(itask) @@ -1975,9 +1989,8 @@ def force_trigger_tasks( If the previous run was not flow-wait - run it, and try to spawn on outputs Else if the previous run was flow-wait: - - just try to spawn, unless flow-wait is set. - - ("try to spawn": unless the output already spawned in the flow) + - just spawn (if not already spawned in this flow) + unless flow-wait is set. """ # Get flow numbers for the tasks to be triggered. diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index c810be1489b..32fe473a326 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -299,9 +299,10 @@ def __str__(self) -> str: Format: "//{}:status". """ id_ = self.identity + if self.transient: + return f"{id_}{stringify_flow_nums(self.flow_nums)}(transient)" if not self.state(TASK_STATUS_WAITING, TASK_STATUS_EXPIRED): id_ += f"/{self.submit_num:02d}" - return ( f"{id_}{stringify_flow_nums(self.flow_nums)}:{self.state}" ) @@ -508,16 +509,22 @@ def state_reset( self, status=None, is_held=None, is_queued=None, is_runahead=None, silent=False, forced=False ) -> bool: - """Set new state and log the change. Return whether it changed.""" + """Set new state and log the change. Return whether it changed. + + """ before = str(self) + if status == TASK_STATUS_EXPIRED: is_queued = False + is_runahead = False + if self.state.reset( status, is_held, is_queued, is_runahead, forced ): if not silent and not self.transient: LOG.info(f"[{before}] => {self.state}") return True + return False def satisfy_me(self, outputs: 'Iterable[Tokens]') -> bool: diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py index 7844e491911..bc36f81866b 100644 --- a/cylc/flow/task_state.py +++ b/cylc/flow/task_state.py @@ -414,6 +414,8 @@ def reset( returns: whether state changed or not (bool) """ + req = status + current_status = ( self.status, self.is_held, @@ -432,11 +434,11 @@ def reset( if ( forced and - requested_status in [TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING] + req in [TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING] ): - # For manual setting of task outputs: return True but don't action - # the state change (there's no real job associated with the task). - return True + # Forced setting of outputs can cause state change to completed + # but not to submitted or running (there's no real job). + return False # perform the actual state change self.status, self.is_held, self.is_queued, self.is_runahead = ( @@ -460,6 +462,11 @@ def is_gt(self, status): return (TASK_STATUSES_ORDERED.index(self.status) > TASK_STATUSES_ORDERED.index(status)) + def is_gte(self, status): + """"Return True if self.status >= status.""" + return (TASK_STATUSES_ORDERED.index(self.status) >= + TASK_STATUSES_ORDERED.index(status)) + def _add_prerequisites(self, point, tdef): """Add task prerequisites.""" # Triggers for sequence_i only used if my cycle point is a diff --git a/tests/unit/test_task_state.py b/tests/unit/test_task_state.py index a64056ef574..e655c74b7bb 100644 --- a/tests/unit/test_task_state.py +++ b/tests/unit/test_task_state.py @@ -21,9 +21,12 @@ from cylc.flow.task_trigger import Dependency, TaskTrigger from cylc.flow.task_state import ( TaskState, + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMIT_FAILED, + TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED, - TASK_STATUS_FAILED, TASK_STATUS_WAITING, + TASK_STATUS_RUNNING, ) @@ -100,3 +103,19 @@ def test_task_prereq_duplicates(set_cycling_type): prereqs = [p.satisfied for p in tstate.prerequisites] assert prereqs == [{("1", "a", "succeeded"): False}] + + +def test_task_state_order(): + """Test is_gt and is_gte methods.""" + + tdef = TaskDef('foo', {}, 'live', IntegerPoint("1"), IntegerPoint("1")) + tstate = TaskState(tdef, IntegerPoint("1"), TASK_STATUS_SUBMITTED, False) + + assert tstate.is_gt(TASK_STATUS_WAITING) + assert tstate.is_gt(TASK_STATUS_PREPARING) + assert tstate.is_gt(TASK_STATUS_SUBMIT_FAILED) + assert not tstate.is_gt(TASK_STATUS_SUBMITTED) + assert tstate.is_gte(TASK_STATUS_SUBMITTED) + assert not tstate.is_gt(TASK_STATUS_RUNNING) + assert not tstate.is_gte(TASK_STATUS_RUNNING) +