From dc087dd1f8b0d00a0c105e2cb24706fdcaaae7f2 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Tue, 7 Jan 2025 02:00:01 +0000 Subject: [PATCH] Extend integration tests. --- tests/integration/test_job_runner_mgr.py | 71 ++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/integration/test_job_runner_mgr.py b/tests/integration/test_job_runner_mgr.py index 39db087f7b..10f4a3193e 100644 --- a/tests/integration/test_job_runner_mgr.py +++ b/tests/integration/test_job_runner_mgr.py @@ -24,6 +24,14 @@ from cylc.flow.pathutil import get_workflow_run_job_dir from cylc.flow.task_state import TASK_STATUS_RUNNING from cylc.flow.subprocctx import SubProcContext +from cylc.flow.task_job_logs import ( + JOB_LOG_JOB, + JOB_LOG_ERR, + JOB_LOG_OUT, + JOB_LOG_STATUS, + JOB_LOG_XTRACE, + JOB_LOG_ACTIVITY +) async def test_kill_error(one, start, test_dir, capsys, log_filter): @@ -82,3 +90,66 @@ async def test_kill_error(one, start, test_dir, capsys, log_filter): level=logging.WARNING, ) assert itask.state(TASK_STATUS_RUNNING) + + +async def test_create_nn_new(one, start, test_dir, capsys, log_filter): + """Test _create_nn. + + It should the NN symlink. + """ + async with start(one): + # make it look like the task is running + itask = one.pool.get_tasks()[0] + + workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow)) + job_id = itask.tokens.duplicate(job='01').relative_id + job_log_dir = Path(workflow_job_log_dir, job_id) + job_log_dir.mkdir(parents=True) + + # call _create_nn + JobRunnerManager()._create_nn(job_log_dir / 'job.out') + + # check the symlink exists + assert (job_log_dir.parent / "NN").is_symlink() + + +async def test_create_nn_old(one, start, test_dir, capsys, log_filter): + """Test _create_nn. + + It should remove existing job logs, if the dir already exists. + """ + async with start(one): + itask = one.pool.get_tasks()[0] + + # fake some old job logs + workflow_job_log_dir = Path(get_workflow_run_job_dir(one.workflow)) + job_id = itask.tokens.duplicate(job='01').relative_id + job_log_dir = Path(workflow_job_log_dir, job_id) + job_log_dir.mkdir(parents=True) + + job_logs = [] + for name in ( + JOB_LOG_JOB, + JOB_LOG_ERR, + JOB_LOG_OUT, + JOB_LOG_STATUS, + JOB_LOG_XTRACE, + JOB_LOG_ACTIVITY + ): + job_logs.append(job_log_dir / name) + + # create the logs + for job_log in job_logs: + job_log.touch() + + # check they exist + for job_log in job_logs: + assert job_log.is_file() + + # call _create_nn + for job_log in job_logs: + JobRunnerManager()._create_nn(job_log) + + # check they were removed + for job_log in job_logs: + assert not job_log.is_file()