Skip to content

Commit

Permalink
Get poll to return task failure if job/log has been removed.
Browse files Browse the repository at this point in the history
added unit tests for JobRunnerMgr._jobs_poll_status_files

test the task_job_mgr end
  • Loading branch information
wxtim committed Jan 28, 2025
1 parent b942907 commit 2661a36
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 0 deletions.
1 change: 1 addition & 0 deletions changes.d/6577.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Cylc poll will now return a task failure if the job log directory has been deleted whilst the task is active, fixing a bug where premature houskeeping left tasks permenantly submitted or running.
6 changes: 6 additions & 0 deletions cylc/flow/job_runner_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,12 @@ def _filter_submit_output(cls, st_file_path, job_runner, out, err):
def _jobs_poll_status_files(self, job_log_root, job_log_dir):
"""Helper 1 for self.jobs_poll(job_log_root, job_log_dirs)."""
ctx = JobPollContext(job_log_dir)
# If the log directory has been deleted prematurely, return a task
# failure and an explanation:
if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)):
ctx.run_status = 1
ctx.run_signal = 'ERR/Job files have been removed'
return ctx
try:
with open(
os.path.join(job_log_root, ctx.job_log_dir, JOB_LOG_STATUS)
Expand Down
6 changes: 6 additions & 0 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,12 @@ def _manip_task_jobs_callback(
or (ctx.ret_code and ctx.ret_code != 255)
):
LOG.error(ctx)
# A polling task lets us know that a task has failed because it's
# log folder has been deleted whilst the task was active:
if 'Job files have been removed' in ctx.out:
LOG.error(
f'Task {ctx.cmd[-1]} failed because task log directory'
f'\n {"/".join(ctx.cmd[-2:])}\n has been removed.')
# A dict for easy reference of (CYCLE, NAME, SUBMIT_NUM) -> TaskProxy
#
# Note for "reload": A TaskProxy instance may be replaced on reload, so
Expand Down
24 changes: 24 additions & 0 deletions tests/integration/test_task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from contextlib import suppress
import logging
from types import SimpleNamespace
from typing import Any as Fixture

from cylc.flow import CYLC_LOG
Expand Down Expand Up @@ -233,3 +234,26 @@ async def test_broadcast_platform_change(
assert schd.pool.get_tasks()[0].platform['name'] == 'foo'
# ... and that remote init failed because all hosts bad:
assert log_filter(regex=r"platform: foo .*\(no hosts were reachable\)")


async def test_poll_job_deleted_log_folder(
one_conf, flow, scheduler, start, caplog
):
"""Capture a task error caused by polling finding the job log dir deleted.
https://github.com/cylc/cylc-flow/issues/6425
"""
ctx = SimpleNamespace()
ctx.out = 'ERR/Job files have been removed'
ctx.ret_code = None
ctx.cmd = ['foo', 'bar']

schd = scheduler(flow(one_conf), run_mode='live', paused_start=False)
async with start(schd):
schd.task_job_mgr._manip_task_jobs_callback(ctx, '', [], '')

assert (
'Task bar failed because task log directory'
'\n foo/bar\n has been removed.'
in caplog.messages
)
77 changes: 77 additions & 0 deletions tests/unit/test_job_runner_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.job_runner_mgr import JobRunnerManager

jrm = JobRunnerManager()


SAMPLE_STATUS = """
ignore me, I have no = sign
CYLC_JOB_RUNNER_NAME=pbs
CYLC_JOB_ID=2361713
CYLC_JOB_RUNNER_SUBMIT_TIME=2025-01-28T14:46:04Z
CYLC_JOB_PID=2361713
CYLC_JOB_INIT_TIME=2025-01-28T14:46:05Z
CYLC_MESSAGE=2025-01-28T14:46:05Z|INFO|sleep 31
CYLC_JOB_RUNNER_EXIT_POLLED=2025-01-28T14:46:08Z
CYLC_JOB_EXIT=SUCCEEDED
CYLC_JOB_EXIT_TIME=2025-01-28T14:46:38Z
"""


def test__job_poll_status_files(tmp_path):
"""Good Path: A valid job.status files exists"""
(tmp_path / 'sub').mkdir()
(tmp_path / 'sub' / 'job.status').write_text(SAMPLE_STATUS)
ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub')
assert ctx.job_runner_name == 'pbs'
assert ctx.job_id == '2361713'
assert ctx.job_runner_exit_polled == 1
assert ctx.pid == '2361713'
assert ctx.time_submit_exit == '2025-01-28T14:46:04Z'
assert ctx.time_run == '2025-01-28T14:46:05Z'
assert ctx.time_run_exit == '2025-01-28T14:46:38Z'
assert ctx.run_status == 0
assert ctx.messages == ['2025-01-28T14:46:05Z|INFO|sleep 31']


def test__job_poll_status_files_task_failed(tmp_path):
"""Good Path: A valid job.status files exists"""
(tmp_path / 'sub').mkdir()
(tmp_path / 'sub' / 'job.status').write_text("CYLC_JOB_EXIT=FOO")
ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub')
assert ctx.run_status == 1
assert ctx.run_signal == 'FOO'


def test__job_poll_status_files_deleted_logdir():
"""The log dir has been deleted whilst the task is still active.
Return the context with the message that the task has failed.
"""
ctx = jrm._jobs_poll_status_files('foo', 'bar')
assert ctx.run_signal == 'ERR/Job files have been removed'
assert ctx.run_status == 1


def test__job_poll_status_files_ioerror(tmp_path, capsys):
"""There is no readable file.
"""
(tmp_path / 'sub').mkdir()
jrm._jobs_poll_status_files(str(tmp_path), 'sub')
cap = capsys.readouterr()
assert '[Errno 2] No such file or directory' in cap.err

0 comments on commit 2661a36

Please sign in to comment.