diff --git a/cylc/flow/run_modes/simulation.py b/cylc/flow/run_modes/simulation.py
index 796a5803ceb..4eaaf48ac56 100644
--- a/cylc/flow/run_modes/simulation.py
+++ b/cylc/flow/run_modes/simulation.py
@@ -92,6 +92,7 @@ def submit_task_job(
     task_job_mgr.workflow_db_mgr.put_insert_task_jobs(
         itask, {
             'time_submit': now[1],
+            'time_run': now[1],
             'try_num': itask.get_try_num(),
             'flow_nums': str(list(itask.flow_nums)),
             'is_manual_submit': itask.is_manual_submit,
diff --git a/tests/integration/run_modes/test_nonlive.py b/tests/integration/run_modes/test_nonlive.py
index 82222b53824..aa0cd576a7e 100644
--- a/tests/integration/run_modes/test_nonlive.py
+++ b/tests/integration/run_modes/test_nonlive.py
@@ -14,6 +14,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+import pytest
 from typing import Any, Dict
 
 # Define here to ensure test doesn't just mirror code:
@@ -47,7 +48,41 @@ def not_time(data: Dict[str, Any]):
     return {k: v for k, v in data.items() if 'time' not in k}
 
 
-async def test_db_task_jobs(flow, scheduler, start, capture_live_submissions):
+@pytest.fixture
+def submit_and_check_db():
+    """Wraps up testing that we want to do repeatedly in
+    test_db_task_jobs.
+    """
+    def _inner(schd):
+        # Submit task jobs:
+        schd.task_job_mgr.submit_task_jobs(
+            schd.workflow,
+            schd.pool.get_tasks(),
+            schd.server.curve_auth,
+            schd.server.client_pub_key_dir
+        )
+        # Make sure that db changes are enacted:
+        schd.workflow_db_mgr.process_queued_ops()
+
+        for mode, kgo in KGO.items():
+            task_jobs = schd.workflow_db_mgr.pub_dao.select_task_job(1, mode)
+
+            # Check all non-datetime items against KGO:
+            assert not_time(task_jobs) == kgo, (
+                f'Mode {mode}: incorrect db entries.')
+
+            # Check that timestamps have been created:
+            for timestamp in [
+                'time_submit', 'time_submit_exit', 'time_run', 'time_run_exit'
+            ]:
+                assert task_jobs[timestamp] is not None
+    return _inner
+
+
+async def test_db_task_jobs(
+    flow, scheduler, start, capture_live_submissions,
+    submit_and_check_db
+):
     """Ensure that task job data is added to the database correctly
     for each run mode.
     """
@@ -58,37 +93,16 @@ async def test_db_task_jobs(flow, scheduler, start, capture_live_submissions):
         mode: {'run mode': mode} for mode in KGO}
     }))
     async with start(schd):
+        # Reference all task proxies so we can examine them
+        # at the end of the test:
         task_proxies = schd.pool.get_tasks()
 
-        schd.task_job_mgr.submit_task_jobs(
-            schd.workflow,
-            schd.pool.get_tasks(),
-            schd.server.curve_auth,
-            schd.server.client_pub_key_dir
-        )
-        schd.workflow_db_mgr.process_queued_ops()
-
-        for mode, kgo in KGO.items():
-            taskdata = not_time(
-                schd.workflow_db_mgr.pub_dao.select_task_job(1, mode))
-            assert taskdata == kgo, (
-                f'Mode {mode}: incorrect db entries.')
+        submit_and_check_db(schd)
 
+        # Set outputs to failed:
         schd.pool.set_prereqs_and_outputs('*', ['failed'], [], [])
 
-        schd.task_job_mgr.submit_task_jobs(
-            schd.workflow,
-            schd.pool.get_tasks(),
-            schd.server.curve_auth,
-            schd.server.client_pub_key_dir
-        )
-        schd.workflow_db_mgr.process_queued_ops()
-
-        for mode, kgo in KGO.items():
-            taskdata = not_time(
-                schd.workflow_db_mgr.pub_dao.select_task_job(1, mode))
-            assert taskdata == kgo, (
-                f'Mode {mode}: incorrect db entries.')
+        submit_and_check_db(schd)
 
         assert task_proxies[0].run_mode == 'simulation'
         assert task_proxies[1].run_mode == 'skip'
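
Not part of the patch above: a minimal, self-contained pytest sketch of the pattern the new submit_and_check_db fixture uses, where a fixture returns an inner callable so the same assertions can be re-run at several points within one test. The names check_divisible and test_divisibility are illustrative only and do not appear in the diff.

import pytest


@pytest.fixture
def check_divisible():
    """Return a callable, so the fixture can be invoked repeatedly in a test.

    This mirrors the shape of submit_and_check_db: the fixture does no work
    at setup time and simply hands the test a function to call when needed.
    """
    def _inner(value, divisor):
        # Keeping the assertion here keeps every call site short.
        assert value % divisor == 0, f'{value} is not divisible by {divisor}'
    return _inner


def test_divisibility(check_divisible):
    # The same checks can be re-run after each change of state, just as
    # test_db_task_jobs calls submit_and_check_db(schd) twice.
    check_divisible(12, 3)
    check_divisible(12, 4)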