Skip to content

Commit

Permalink
Tweak output logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Feb 27, 2024
1 parent 2dfc50b commit ac7c773
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 18 deletions.
7 changes: 7 additions & 0 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ def process_message(
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_FAILED)
):
# Already failed.
return True
if self._process_message_failed(
itask, event_time, self.JOB_FAILED, forced
Expand All @@ -750,6 +751,7 @@ def process_message(
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_SUBMIT_FAILED)
):
# Already submit-failed
return True
if self._process_message_submit_failed(
itask, event_time, submit_num, forced
Expand All @@ -761,6 +763,7 @@ def process_message(
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_SUBMITTED)
):
# Already submitted.
return True
if (
itask.state.status == TASK_STATUS_PREPARING
Expand All @@ -787,6 +790,7 @@ def process_message(
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_FAILED)
):
# Already failed.
return True
signal = message[len(FAIL_MESSAGE_PREFIX):]
self._db_events_insert(itask, "signaled", signal)
Expand All @@ -803,6 +807,7 @@ def process_message(
flag == self.FLAG_RECEIVED
and itask.state.is_gt(TASK_STATUS_FAILED)
):
# Already failed.
return True
aborted_with = message[len(ABORT_MESSAGE_PREFIX):]
self._db_events_insert(itask, "aborted", message)
Expand Down Expand Up @@ -835,6 +840,8 @@ def process_message(
elif completed_output:
# Message of a custom task output.
# No state change.
# Log completion of o (not needed for standard outputs)
LOG.info(f"[{itask}] completed output {completed_output}")
self.setup_event_handlers(itask, completed_output, message)
self.spawn_children(itask, msg0)

Expand Down
15 changes: 9 additions & 6 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,13 +1101,17 @@ def log_unsatisfied_prereqs(self) -> bool:
task_point = itask.point
if self.stop_point and task_point > self.stop_point:
continue
for pre in itask.state.get_unsatisfied_prerequisites():
point, name, output = pre
for point, task, msg in (
itask.state.get_unsatisfied_prerequisites()
):
if get_point(point) > self.stop_point:
continue
if itask.identity not in unsat:
unsat[itask.identity] = []
unsat[itask.identity].append(f"{point}/{name}:{output}")
unsat[itask.identity].append(
f"{point}/{task}:"
f"{self.config.get_taskdef(task).get_output(msg)}"
)
if unsat:
LOG.warning(
"Partially satisfied prerequisites:\n"
Expand Down Expand Up @@ -1696,7 +1700,7 @@ def _get_task_proxy(
return itask

def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]':
"""Convert prerequisite names to task messages."""
"""Convert prerequisites to task output messages."""
_prereqs = []
for pre in [
Tokens(prereq, relative=True)
Expand All @@ -1717,7 +1721,7 @@ def _standardise_prereqs(self, prereqs: 'List[str]') -> 'List[Tokens]':
def _standardise_outputs(
self, point: 'PointBase', tdef: 'TaskDef', outputs: List[str]
) -> List[str]:
"""Convert output names to task messages."""
"""Convert output names to task output messages."""
_outputs = []
for output in outputs:
try:
Expand Down Expand Up @@ -1825,7 +1829,6 @@ def _set_outputs_itask(
self.task_events_mgr.process_message(
itask, logging.INFO, output, forced=True)
changed = True
LOG.info(f"output {itask.identity}:{output} completed")

if changed and itask.transient:
self.workflow_db_mgr.put_update_task_state(itask)
Expand Down
7 changes: 7 additions & 0 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,13 @@ def add_output(self, output, message):
# optional/required is None until defined by the graph
self.outputs[output] = (message, None)

def get_output(self, message):
"""Return output name corresponding to task message."""
for name, (msg, _) in self.outputs.items():
if msg == message:
return name
raise KeyError(f"Unknown task output message: {message}")

Check warning on line 187 in cylc/flow/taskdef.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/taskdef.py#L187

Added line #L187 was not covered by tests

def _add_std_outputs(self):
"""Add the standard outputs."""
# optional/required is None until defined by the graph
Expand Down
6 changes: 3 additions & 3 deletions tests/functional/cylc-set/02-off-flow-out.t
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ reftest_run

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/a_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/a_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" 'output 1/a_cold:succeeded completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" '1/a_cold.* task completed'

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/b_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/b_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" 'output 1/b_cold:succeeded completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" '1/b_cold.* task completed'

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/c_cold.* setting implied output: submitted'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/c_cold.* setting implied output: started'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" 'output 1/c_cold:succeeded completed'
grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" '1/c_cold.* task completed'

purge
11 changes: 5 additions & 6 deletions tests/functional/cylc-set/03-set-failed.t
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# https://cylc.github.io/cylc-admin/proposal-cylc-set.html#4-set-jobs-to-failed-when-a-job-platform-is-known-to-be-down

. "$(dirname "$0")/test_header"
set_test_number 4
set_test_number 3

install_and_validate

Expand All @@ -30,16 +30,15 @@ poll_grep_workflow_log -E "1/foo.* \(internal\)submitted"

cylc set -o failed "${WORKFLOW_NAME}//1/foo"

poll_grep_workflow_log -E "1/foo.* => failed"
poll_grep_workflow_log -E "1/foo.* did not complete required outputs"

cylc stop --now --now --interval=2 --max-polls=5 "${WORKFLOW_NAME}"

# Check the log for:
# - set completion message
# - implied outputs reported as already completed

grep_workflow_log_ok "${TEST_NAME_BASE}-grep-3" 'output 1/foo:failed completed'
poll_grep_workflow_log -E "1/foo.* => failed"
poll_grep_workflow_log -E "1/foo.* did not complete required outputs"

cylc stop --now --now --interval=2 --max-polls=5 "${WORKFLOW_NAME}"

# Check the DB records all the outputs.
sqlite3 ~/cylc-run/"${WORKFLOW_NAME}"/log/db \
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,7 @@ async def test_set_failed_complete(
schd.pool.set_prereqs_and_outputs([one.identity], None, None, ['all'])

assert log_filter(
log, contains='output 1/one:succeeded completed')
log, contains=f'[{one}] task completed')

db_outputs = db_select(
schd, True, 'task_outputs', 'outputs',
Expand Down Expand Up @@ -1491,8 +1491,8 @@ async def test_set_outputs_future(
flow=['all']
)
assert log_filter(log, contains="output 1/a:cheese not found")
assert log_filter(log, contains="output 1/a:xylophone completed")
assert log_filter(log, contains="output 1/a:yacht completed")
assert log_filter(log, contains="completed output x")
assert log_filter(log, contains="completed output y")


async def test_prereq_satisfaction(
Expand Down

0 comments on commit ac7c773

Please sign in to comment.