From 3b0f5abf6005433a04b4f1f176f03ee82ef188c6 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 11 Jan 2024 13:55:16 +0000 Subject: [PATCH 1/2] xtriggers: test db / restart interaction --- tests/integration/test_xtrigger_mgr.py | 76 ++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py index b4d2d503799..c116e6968a5 100644 --- a/tests/integration/test_xtrigger_mgr.py +++ b/tests/integration/test_xtrigger_mgr.py @@ -13,10 +13,13 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Tests for the behaviour of xtrigger manager. -""" +"""Tests for the behaviour of xtrigger manager.""" -from pytest_mock import mocker +import asyncio +from pathlib import Path +from textwrap import dedent + +from cylc.flow.pathutil import get_workflow_run_dir async def test_2_xtriggers(flow, start, scheduler, monkeypatch): """Test that if an itask has 2 wall_clock triggers with different @@ -118,4 +121,69 @@ async def test_1_xtrigger_2_tasks(flow, start, scheduler, monkeypatch, mocker): # resulting in two calls to put_xtriggers. This test fails # on master, but with call count 0 (not 2) because the main # loop doesn't run in this test. - + + +async def test_xtriggers_restart(flow, start, scheduler, db_select): + """It should write xtrigger results to the DB and load them on restart.""" + # define a workflow which uses a custom xtrigger + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'xtriggers': { + 'mytrig': 'mytrig()' + }, + 'graph': { + 'R1': '@mytrig => foo' + }, + } + }) + + # add a custom xtrigger to the workflow + run_dir = Path(get_workflow_run_dir(id_)) + xtrig_dir = run_dir / 'lib/python' + xtrig_dir.mkdir(parents=True) + (xtrig_dir / 'mytrig.py').write_text(dedent(''' + from random import random + + def mytrig(*args, **kwargs): + # return a different random number each time + return True, {"x": str(random())} + ''')) + + # start the workflow & run the xtrigger + schd = scheduler(id_) + async with start(schd): + # run all xtriggers + for task in schd.pool.get_tasks(): + schd.xtrigger_mgr.call_xtriggers_async(task) + # one xtrigger should have been scheduled to run + assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 1 + # wait for it to return + for _ in range(50): + await asyncio.sleep(0.1) + schd.proc_pool.process() + if len(schd.proc_pool.runnings) == 0: + break + else: + raise Exception('Process pool did not clear') + + # the xtrigger should be written to the DB + db_xtriggers = db_select(schd, True, 'xtriggers') + assert len(db_xtriggers) == 1 + assert db_xtriggers[0][0] == 'mytrig()' + assert db_xtriggers[0][1].startswith('{"x":') + + # restart the workflow, the xtrigger should *not* run again + schd = scheduler(id_) + async with start(schd): + # run all xtriggers + for task in schd.pool.get_tasks(): + schd.xtrigger_mgr.call_xtriggers_async(task) + # the xtrigger should have been loaded from the DB + # (so no xtriggers should be scheduled to run) + assert len(schd.proc_pool.queuings) + len(schd.proc_pool.runnings) == 0 + + # check the DB to ensure no additional entries have been created + assert db_select(schd, True, 'xtriggers') == db_xtriggers From d0eb392acc5e91d85d0fda33d2ae5a0bddca23a0 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Fri, 12 Jan 2024 10:57:41 +0000 Subject: [PATCH 2/2] ensure logger set for all caplog.set_level --- tests/unit/test_flow_mgr.py | 3 ++- tests/unit/test_id_cli.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_flow_mgr.py b/tests/unit/test_flow_mgr.py index 765fdf0c522..5fea08c1b97 100644 --- a/tests/unit/test_flow_mgr.py +++ b/tests/unit/test_flow_mgr.py @@ -22,6 +22,7 @@ from cylc.flow.flow_mgr import FlowMgr from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager +from cylc.flow import CYLC_LOG FAKE_NOW = datetime.datetime(2020, 12, 25, 17, 5, 55) @@ -44,7 +45,7 @@ def test_all( ): db_mgr = WorkflowDatabaseManager() flow_mgr = FlowMgr(db_mgr) - caplog.set_level(logging.INFO) + caplog.set_level(logging.INFO, CYLC_LOG) count = 1 meta = "the quick brown fox" diff --git a/tests/unit/test_id_cli.py b/tests/unit/test_id_cli.py index 7649e9fa0f2..3db329436bf 100644 --- a/tests/unit/test_id_cli.py +++ b/tests/unit/test_id_cli.py @@ -19,6 +19,7 @@ from pathlib import Path import pytest +from cylc.flow import CYLC_LOG from cylc.flow.async_util import pipe from cylc.flow.exceptions import InputError, WorkflowFilesError from cylc.flow.id import detokenise, tokenise, Tokens @@ -537,7 +538,7 @@ def test_validate_workflow_ids_basic(tmp_run_dir): def test_validate_workflow_ids_warning(caplog): """It should warn when the run number is provided as a cycle point.""" - caplog.set_level(logging.WARN) + caplog.set_level(logging.WARN, CYLC_LOG) _validate_workflow_ids(Tokens('workflow/run1//cycle/task'), src_path='') assert caplog.messages == []