From ac7c773e0427d50d81a176d708973a769d988a20 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Tue, 27 Feb 2024 15:59:35 +1300 Subject: [PATCH] Tweak output logging. --- cylc/flow/task_events_mgr.py | 7 +++++++ cylc/flow/task_pool.py | 15 +++++++++------ cylc/flow/taskdef.py | 7 +++++++ tests/functional/cylc-set/02-off-flow-out.t | 6 +++--- tests/functional/cylc-set/03-set-failed.t | 11 +++++------ tests/integration/test_task_pool.py | 6 +++--- 6 files changed, 34 insertions(+), 18 deletions(-) diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 159b9947ae8..76de7adca3f 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -739,6 +739,7 @@ def process_message( flag == self.FLAG_RECEIVED and itask.state.is_gt(TASK_STATUS_FAILED) ): + # Already failed. return True if self._process_message_failed( itask, event_time, self.JOB_FAILED, forced @@ -750,6 +751,7 @@ def process_message( flag == self.FLAG_RECEIVED and itask.state.is_gt(TASK_STATUS_SUBMIT_FAILED) ): + # Already submit-failed return True if self._process_message_submit_failed( itask, event_time, submit_num, forced @@ -761,6 +763,7 @@ def process_message( flag == self.FLAG_RECEIVED and itask.state.is_gt(TASK_STATUS_SUBMITTED) ): + # Already submitted. return True if ( itask.state.status == TASK_STATUS_PREPARING @@ -787,6 +790,7 @@ def process_message( flag == self.FLAG_RECEIVED and itask.state.is_gt(TASK_STATUS_FAILED) ): + # Already failed. return True signal = message[len(FAIL_MESSAGE_PREFIX):] self._db_events_insert(itask, "signaled", signal) @@ -803,6 +807,7 @@ def process_message( flag == self.FLAG_RECEIVED and itask.state.is_gt(TASK_STATUS_FAILED) ): + # Already failed. return True aborted_with = message[len(ABORT_MESSAGE_PREFIX):] self._db_events_insert(itask, "aborted", message) @@ -835,6 +840,8 @@ def process_message( elif completed_output: # Message of a custom task output. # No state change. + # Log completion of o (not needed for standard outputs) + LOG.info(f"[{itask}] completed output {completed_output}") self.setup_event_handlers(itask, completed_output, message) self.spawn_children(itask, msg0) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index e06c3e4f039..fe01a9bc53b 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1101,13 +1101,17 @@ def log_unsatisfied_prereqs(self) -> bool: task_point = itask.point if self.stop_point and task_point > self.stop_point: continue - for pre in itask.state.get_unsatisfied_prerequisites(): - point, name, output = pre + for point, task, msg in ( + itask.state.get_unsatisfied_prerequisites() + ): if get_point(point) > self.stop_point: continue if itask.identity not in unsat: unsat[itask.identity] = [] - unsat[itask.identity].append(f"{point}/{name}:{output}") + unsat[itask.identity].append( + f"{point}/{task}:" + f"{self.config.get_taskdef(task).get_output(msg)}" + ) if unsat: LOG.warning( "Partially satisfied prerequisites:\n" @@ -1696,7 +1700,7 @@ def _get_task_proxy( return itask def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]': - """Convert prerequisite names to task messages.""" + """Convert prerequisites to task output messages.""" _prereqs = [] for pre in [ Tokens(prereq, relative=True) @@ -1717,7 +1721,7 @@ def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]': def _standardise_outputs( self, point: 'PointBase', tdef: 'TaskDef', outputs: List[str] ) -> List[str]: - """Convert output names to task messages.""" + """Convert output names to task output messages.""" _outputs = [] for output in outputs: try: @@ -1825,7 +1829,6 @@ def _set_outputs_itask( self.task_events_mgr.process_message( itask, logging.INFO, output, forced=True) changed = True - LOG.info(f"output {itask.identity}:{output} completed") if changed and itask.transient: self.workflow_db_mgr.put_update_task_state(itask) diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 868f96d9ab3..0289701afd0 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -179,6 +179,13 @@ def add_output(self, output, message): # optional/required is None until defined by the graph self.outputs[output] = (message, None) + def get_output(self, message): + """Return output name corresponding to task message.""" + for name, (msg, _) in self.outputs.items(): + if msg == message: + return name + raise KeyError(f"Unknown task output message: {message}") + def _add_std_outputs(self): """Add the standard outputs.""" # optional/required is None until defined by the graph diff --git a/tests/functional/cylc-set/02-off-flow-out.t b/tests/functional/cylc-set/02-off-flow-out.t index 1d1afdeac64..a18d3e61fbf 100644 --- a/tests/functional/cylc-set/02-off-flow-out.t +++ b/tests/functional/cylc-set/02-off-flow-out.t @@ -31,14 +31,14 @@ reftest_run grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/a_cold.* setting implied output: submitted' grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/a_cold.* setting implied output: started' -grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" 'output 1/a_cold:succeeded completed' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* task completed' grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/b_cold.* setting implied output: submitted' grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/b_cold.* setting implied output: started' -grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" 'output 1/b_cold:succeeded completed' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* task completed' grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/c_cold.* setting implied output: submitted' grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/c_cold.* setting implied output: started' -grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" 'output 1/c_cold:succeeded completed' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* task completed' purge diff --git a/tests/functional/cylc-set/03-set-failed.t b/tests/functional/cylc-set/03-set-failed.t index b61df13496b..1910e5b3120 100644 --- a/tests/functional/cylc-set/03-set-failed.t +++ b/tests/functional/cylc-set/03-set-failed.t @@ -20,7 +20,7 @@ # https://cylc.github.io/cylc-admin/proposal-cylc-set.html#4-set-jobs-to-failed-when-a-job-platform-is-known-to-be-down . "$(dirname "$0")/test_header" -set_test_number 4 +set_test_number 3 install_and_validate @@ -30,16 +30,15 @@ poll_grep_workflow_log -E "1/foo.* \(internal\)submitted" cylc set -o failed "${WORKFLOW_NAME}//1/foo" -poll_grep_workflow_log -E "1/foo.* => failed" -poll_grep_workflow_log -E "1/foo.* did not complete required outputs" - -cylc stop --now --now --interval=2 --max-polls=5 "${WORKFLOW_NAME}" # Check the log for: # - set completion message # - implied outputs reported as already completed -grep_workflow_log_ok "${TEST_NAME_BASE}-grep-3" 'output 1/foo:failed completed' +poll_grep_workflow_log -E "1/foo.* => failed" +poll_grep_workflow_log -E "1/foo.* did not complete required outputs" + +cylc stop --now --now --interval=2 --max-polls=5 "${WORKFLOW_NAME}" # Check the DB records all the outputs. sqlite3 ~/cylc-run/"${WORKFLOW_NAME}"/log/db \ diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 63698503f85..754c44e90a5 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1297,7 +1297,7 @@ async def test_set_failed_complete( schd.pool.set_prereqs_and_outputs([one.identity], None, None, ['all']) assert log_filter( - log, contains='output 1/one:succeeded completed') + log, contains=f'[{one}] task completed') db_outputs = db_select( schd, True, 'task_outputs', 'outputs', @@ -1491,8 +1491,8 @@ async def test_set_outputs_future( flow=['all'] ) assert log_filter(log, contains="output 1/a:cheese not found") - assert log_filter(log, contains="output 1/a:xylophone completed") - assert log_filter(log, contains="output 1/a:yacht completed") + assert log_filter(log, contains="completed output x") + assert log_filter(log, contains="completed output y") async def test_prereq_satisfaction(