diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index effe02e910d..fab68c529c9 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1740,11 +1740,13 @@ async def main_loop(self) -> None: self.pool.set_expired_tasks() self.release_queued_tasks() - if ( self.pool.config.run_mode('simulation') and sim_time_check( - self.message_queue, self.pool.get_tasks()) + self.message_queue, + self.pool.get_tasks(), + self.broadcast_mgr + ) ): # A simulated task state change occurred. self.reset_inactivity_timer() diff --git a/cylc/flow/simulation.py b/cylc/flow/simulation.py index 15314f8e3e7..6a9173922b8 100644 --- a/cylc/flow/simulation.py +++ b/cylc/flow/simulation.py @@ -37,6 +37,10 @@ from cylc.flow.task_proxy import TaskProxy +# Exotic: Recursive Type hint. +NestedDict = Dict[str, Union['NestedDict', Any]] + + def configure_sim_modes(taskdefs, sim_mode): """Adjust task defs for simulation and dummy mode. @@ -45,17 +49,21 @@ def configure_sim_modes(taskdefs, sim_mode): for tdef in taskdefs: # Compute simulated run time by scaling the execution limit. - rtc = tdef.rtconfig - sleep_sec = get_simulated_run_len(rtc) + configure_sim_modes_rtc(tdef.rtconfig, dummy_mode) - rtc['execution time limit'] = ( - sleep_sec + DurationParser().parse(str( - rtc['simulation']['time limit buffer'])).get_seconds() - ) - rtc['simulation']['simulated run length'] = sleep_sec - rtc['submission retry delays'] = [1] +def configure_sim_modes_rtc(rtc, dummy_mode): + sleep_sec = get_simulated_run_len(rtc) + + rtc['execution time limit'] = ( + sleep_sec + DurationParser().parse(str( + rtc['simulation']['time limit buffer'])).get_seconds() + ) + + rtc['simulation']['simulated run length'] = sleep_sec + rtc['submission retry delays'] = [1] + if dummy_mode: # Generate dummy scripting. rtc['init-script'] = "" rtc['env-script'] = "" @@ -63,33 +71,49 @@ def configure_sim_modes(taskdefs, sim_mode): rtc['post-script'] = "" rtc['script'] = build_dummy_script( rtc, sleep_sec) if dummy_mode else "" + else: + rtc['script'] = "" + + disable_platforms(rtc) - disable_platforms(rtc) + rtc['platform'] = 'localhost' - # Disable environment, in case it depends on env-script. - rtc['environment'] = {} + # Disable environment, in case it depends on env-script. + rtc['environment'] = {} - rtc["simulation"][ - "fail cycle points" - ] = parse_fail_cycle_points( - rtc["simulation"]["fail cycle points"] - ) + rtc["simulation"][ + "fail cycle points" + ] = parse_fail_cycle_points( + rtc["simulation"]["fail cycle points"] + ) def get_simulated_run_len(rtc: Dict[str, Any]) -> int: """Get simulated run time. - rtc = run time config + Args: + rtc: run time config + + Returns: + Number of seconds to sleep for in sim mode. """ + # Simulated run length acts as a flag that this is at runtime: + # If durations have already been parsed, trying to parse them + # again will result in failures. + recalc = bool(rtc['simulation'].get('simulated run length', '')) limit = rtc['execution time limit'] speedup = rtc['simulation']['speedup factor'] - if limit and speedup: + + if limit and speedup and recalc: + sleep_sec = limit / speedup + elif limit and speedup: sleep_sec = (DurationParser().parse( str(limit)).get_seconds() / speedup) + elif recalc: + sleep_sec = rtc['simulation']['default run length'] else: - sleep_sec = DurationParser().parse( - str(rtc['simulation']['default run length']) - ).get_seconds() + default_run_len = str(rtc['simulation']['default run length']) + sleep_sec = DurationParser().parse(default_run_len).get_seconds() return sleep_sec @@ -147,7 +171,7 @@ def parse_fail_cycle_points( [] """ f_pts: 'Optional[List[PointBase]]' - if 'all' in f_pts_orig: + if f_pts_orig is None or 'all' in f_pts_orig: f_pts = None else: f_pts = [] @@ -156,8 +180,64 @@ def parse_fail_cycle_points( return f_pts +def unpack_dict(dict_: NestedDict, parent_key: str = '') -> Dict[str, Any]: + """Unpack a nested dict into a single layer. + + Examples: + >>> unpack_dict({'foo': 1, 'bar': {'baz': 2, 'qux':3}}) + {'foo': 1, 'bar.baz': 2, 'bar.qux': 3} + >>> unpack_dict({'foo': {'example': 42}, 'bar': {"1":2, "3":4}}) + {'foo.example': 42, 'bar.1': 2, 'bar.3': 4} + + """ + output = {} + for key, value in dict_.items(): + new_key = parent_key + '.' + key if parent_key else key + if isinstance(value, dict): + output.update(unpack_dict(value, new_key)) + else: + output[new_key] = value + + return output + + +def nested_dict_path_update( + dict_: NestedDict, path: List[Any], value: Any +) -> NestedDict: + """Set a value in a nested dict. + + Examples: + >>> nested_dict_path_update({'foo': {'bar': 1}}, ['foo', 'bar'], 42) + {'foo': {'bar': 42}} + """ + this = dict_ + for i in range(len(path)): + if isinstance(this[path[i]], dict): + this = this[path[i]] + else: + this[path[i]] = value + return dict_ + + +def update_nested_dict(rtc: NestedDict, dict_: NestedDict) -> None: + """Update one config nested dictionary with the contents of another. + + Examples: + >>> x = {'foo': {'bar': 12}, 'qux': 77} + >>> y = {'foo': {'bar': 42}} + >>> update_nested_dict(x, y) + >>> print(x) + {'foo': {'bar': 42}, 'qux': 77} + """ + for keylist, value in unpack_dict(dict_).items(): + keys = keylist.split('.') + rtc = nested_dict_path_update(rtc, keys, value) + + def sim_time_check( - message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]' + message_queue: 'Queue[TaskMsg]', + itasks: 'List[TaskProxy]', + broadcast_mgr: Optional[Any] = None ) -> bool: """Check if sim tasks have been "running" for as long as required. @@ -166,9 +246,16 @@ def sim_time_check( Returns: True if _any_ simulated task state has changed. """ + sim_task_state_changed = False now = time() for itask in itasks: + if broadcast_mgr: + broadcast = broadcast_mgr.get_broadcast(itask.tokens) + if broadcast: + update_nested_dict( + itask.tdef.rtconfig, broadcast) + configure_sim_modes_rtc(itask.tdef.rtconfig, False) if itask.state.status != TASK_STATUS_RUNNING: continue # Started time is not set on restart diff --git a/tests/functional/modes/04-simulation-runtime.t b/tests/functional/modes/04-simulation-runtime.t new file mode 100644 index 00000000000..d40bccc9781 --- /dev/null +++ b/tests/functional/modes/04-simulation-runtime.t @@ -0,0 +1,41 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# Test that we can broadcast an alteration to simulation mode. + +. "$(dirname "$0")/test_header" +set_test_number 3 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" +workflow_run_ok "${TEST_NAME_BASE}-run" \ + cylc play "${WORKFLOW_NAME}" --mode=simulation +SCHD_LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log" + +# If we speed up the simulated task we +# can make it finish before workflow timeout: +cylc broadcast "${WORKFLOW_NAME}" -s '[simulation]speedup factor = 600' + +# Wait for the workflow to finish (it wasn't run in no-detach mode): +poll_grep "INFO - DONE" "${SCHD_LOG}" + +# If we hadn't changed the speedup factor using broadcast +# The workflow timeout would have been hit: +grep_fail "WARNING - Orphaned tasks" "${SCHD_LOG}" + +purge +exit diff --git a/tests/functional/modes/04-simulation-runtime/flow.cylc b/tests/functional/modes/04-simulation-runtime/flow.cylc new file mode 100644 index 00000000000..54b85536698 --- /dev/null +++ b/tests/functional/modes/04-simulation-runtime/flow.cylc @@ -0,0 +1,15 @@ +[scheduler] + [[events]] + workflow timeout = PT30S + +[scheduling] + initial cycle point = 2359 + [[graph]] + R1 = get_observations + +[runtime] + [[get_observations]] + execution retry delays = PT10M + [[[simulation]]] + speedup factor = 1 + diff --git a/tests/functional/modes/04-simulation-runtime/reference.log b/tests/functional/modes/04-simulation-runtime/reference.log new file mode 100644 index 00000000000..2d14bc201fb --- /dev/null +++ b/tests/functional/modes/04-simulation-runtime/reference.log @@ -0,0 +1,2 @@ +23590101T0000Z/get_observations -triggered off [] in flow 1 +23590101T0000Z/get_observations -triggered off [] in flow 1 diff --git a/tests/unit/test_simulation.py b/tests/unit/test_simulation.py index 1c490f35c16..f9325f2d9bf 100644 --- a/tests/unit/test_simulation.py +++ b/tests/unit/test_simulation.py @@ -36,6 +36,8 @@ param(None, 10, 'PT1H', id='speedup-factor-alone'), param('PT1H', None, 'PT1H', id='execution-time-limit-alone'), param('P1D', 24, 'PT1M', id='speed-up-and-execution-tl'), + param(60 * 60 * 24, 24, 'PT1M', id='recalculation'), + param(1, None, 3600, id='recalculation'), ) ) def test_get_simulated_run_len( @@ -49,9 +51,11 @@ def test_get_simulated_run_len( 'execution time limit': execution_time_limit, 'simulation': { 'speedup factor': speedup_factor, - 'default run length': default_run_length - } + 'default run length': default_run_length, + }, } + if isinstance(execution_time_limit, int): + rtc['simulation']['simulated run length'] = 30 assert get_simulated_run_len(rtc) == 3600