Skip to content

Commit

Permalink
Allow broadcasts to modify sim mode tasks.
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Oct 26, 2023
1 parent 8606f93 commit fdc4cef
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 27 deletions.
6 changes: 4 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1740,11 +1740,13 @@ async def main_loop(self) -> None:

self.pool.set_expired_tasks()
self.release_queued_tasks()

if (
self.pool.config.run_mode('simulation')
and sim_time_check(
self.message_queue, self.pool.get_tasks())
self.message_queue,
self.pool.get_tasks(),
self.broadcast_mgr
)
):
# A simulated task state change occurred.
self.reset_inactivity_timer()
Expand Down
133 changes: 110 additions & 23 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
from cylc.flow.task_proxy import TaskProxy


# Exotic: Recursive Type hint.
NestedDict = Dict[str, Union['NestedDict', Any]]


def configure_sim_modes(taskdefs, sim_mode):
"""Adjust task defs for simulation and dummy mode.
Expand All @@ -45,51 +49,71 @@ def configure_sim_modes(taskdefs, sim_mode):

for tdef in taskdefs:
# Compute simulated run time by scaling the execution limit.
rtc = tdef.rtconfig
sleep_sec = get_simulated_run_len(rtc)
configure_sim_modes_rtc(tdef.rtconfig, dummy_mode)

rtc['execution time limit'] = (
sleep_sec + DurationParser().parse(str(
rtc['simulation']['time limit buffer'])).get_seconds()
)

rtc['simulation']['simulated run length'] = sleep_sec
rtc['submission retry delays'] = [1]
def configure_sim_modes_rtc(rtc, dummy_mode):
sleep_sec = get_simulated_run_len(rtc)

rtc['execution time limit'] = (
sleep_sec + DurationParser().parse(str(
rtc['simulation']['time limit buffer'])).get_seconds()
)

rtc['simulation']['simulated run length'] = sleep_sec
rtc['submission retry delays'] = [1]

if dummy_mode:
# Generate dummy scripting.
rtc['init-script'] = ""
rtc['env-script'] = ""
rtc['pre-script'] = ""
rtc['post-script'] = ""
rtc['script'] = build_dummy_script(
rtc, sleep_sec) if dummy_mode else ""
else:
rtc['script'] = ""

disable_platforms(rtc)

disable_platforms(rtc)
rtc['platform'] = 'localhost'

# Disable environment, in case it depends on env-script.
rtc['environment'] = {}
# Disable environment, in case it depends on env-script.
rtc['environment'] = {}

rtc["simulation"][
"fail cycle points"
] = parse_fail_cycle_points(
rtc["simulation"]["fail cycle points"]
)
rtc["simulation"][
"fail cycle points"
] = parse_fail_cycle_points(
rtc["simulation"]["fail cycle points"]
)


def get_simulated_run_len(rtc: Dict[str, Any]) -> int:
"""Get simulated run time.
rtc = run time config
Args:
rtc: run time config
Returns:
Number of seconds to sleep for in sim mode.
"""
# Simulated run length acts as a flag that this is at runtime:
# If durations have already been parsed, trying to parse them
# again will result in failures.
recalc = bool(rtc['simulation'].get('simulated run length', ''))
limit = rtc['execution time limit']
speedup = rtc['simulation']['speedup factor']
if limit and speedup:

if limit and speedup and recalc:
sleep_sec = limit / speedup
elif limit and speedup:
sleep_sec = (DurationParser().parse(
str(limit)).get_seconds() / speedup)
elif recalc:
sleep_sec = rtc['simulation']['default run length']
else:
sleep_sec = DurationParser().parse(
str(rtc['simulation']['default run length'])
).get_seconds()
default_run_len = str(rtc['simulation']['default run length'])
sleep_sec = DurationParser().parse(default_run_len).get_seconds()

return sleep_sec

Expand Down Expand Up @@ -147,7 +171,7 @@ def parse_fail_cycle_points(
[]
"""
f_pts: 'Optional[List[PointBase]]'
if 'all' in f_pts_orig:
if f_pts_orig is None or 'all' in f_pts_orig:
f_pts = None
else:
f_pts = []
Expand All @@ -156,8 +180,64 @@ def parse_fail_cycle_points(
return f_pts


def unpack_dict(dict_: NestedDict, parent_key: str = '') -> Dict[str, Any]:
"""Unpack a nested dict into a single layer.
Examples:
>>> unpack_dict({'foo': 1, 'bar': {'baz': 2, 'qux':3}})
{'foo': 1, 'bar.baz': 2, 'bar.qux': 3}
>>> unpack_dict({'foo': {'example': 42}, 'bar': {"1":2, "3":4}})
{'foo.example': 42, 'bar.1': 2, 'bar.3': 4}
"""
output = {}
for key, value in dict_.items():
new_key = parent_key + '.' + key if parent_key else key
if isinstance(value, dict):
output.update(unpack_dict(value, new_key))
else:
output[new_key] = value

return output


def nested_dict_path_update(
dict_: NestedDict, path: List[Any], value: Any
) -> NestedDict:
"""Set a value in a nested dict.
Examples:
>>> nested_dict_path_update({'foo': {'bar': 1}}, ['foo', 'bar'], 42)
{'foo': {'bar': 42}}
"""
this = dict_
for i in range(len(path)):
if isinstance(this[path[i]], dict):
this = this[path[i]]
else:
this[path[i]] = value
return dict_


def update_nested_dict(rtc: NestedDict, dict_: NestedDict) -> None:
"""Update one config nested dictionary with the contents of another.
Examples:
>>> x = {'foo': {'bar': 12}, 'qux': 77}
>>> y = {'foo': {'bar': 42}}
>>> update_nested_dict(x, y)
>>> print(x)
{'foo': {'bar': 42}, 'qux': 77}
"""
for keylist, value in unpack_dict(dict_).items():
keys = keylist.split('.')
rtc = nested_dict_path_update(rtc, keys, value)


def sim_time_check(
message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]'
message_queue: 'Queue[TaskMsg]',
itasks: 'List[TaskProxy]',
broadcast_mgr: Optional[Any] = None
) -> bool:
"""Check if sim tasks have been "running" for as long as required.
Expand All @@ -166,9 +246,16 @@ def sim_time_check(
Returns:
True if _any_ simulated task state has changed.
"""

sim_task_state_changed = False
now = time()
for itask in itasks:
if broadcast_mgr:
broadcast = broadcast_mgr.get_broadcast(itask.tokens)
if broadcast:
update_nested_dict(
itask.tdef.rtconfig, broadcast)
configure_sim_modes_rtc(itask.tdef.rtconfig, False)
if itask.state.status != TASK_STATUS_RUNNING:
continue
# Started time is not set on restart
Expand Down
41 changes: 41 additions & 0 deletions tests/functional/modes/04-simulation-runtime.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Test that we can broadcast an alteration to simulation mode.

. "$(dirname "$0")/test_header"
set_test_number 3

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play "${WORKFLOW_NAME}" --mode=simulation
SCHD_LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"

# If we speed up the simulated task we
# can make it finish before workflow timeout:
cylc broadcast "${WORKFLOW_NAME}" -s '[simulation]speedup factor = 600'

# Wait for the workflow to finish (it wasn't run in no-detach mode):
poll_grep "INFO - DONE" "${SCHD_LOG}"

# If we hadn't changed the speedup factor using broadcast
# The workflow timeout would have been hit:
grep_fail "WARNING - Orphaned tasks" "${SCHD_LOG}"

purge
exit
15 changes: 15 additions & 0 deletions tests/functional/modes/04-simulation-runtime/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[scheduler]
[[events]]
workflow timeout = PT30S

[scheduling]
initial cycle point = 2359
[[graph]]
R1 = get_observations

[runtime]
[[get_observations]]
execution retry delays = PT10M
[[[simulation]]]
speedup factor = 1

2 changes: 2 additions & 0 deletions tests/functional/modes/04-simulation-runtime/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
23590101T0000Z/get_observations -triggered off [] in flow 1
23590101T0000Z/get_observations -triggered off [] in flow 1
8 changes: 6 additions & 2 deletions tests/unit/test_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
param(None, 10, 'PT1H', id='speedup-factor-alone'),
param('PT1H', None, 'PT1H', id='execution-time-limit-alone'),
param('P1D', 24, 'PT1M', id='speed-up-and-execution-tl'),
param(60 * 60 * 24, 24, 'PT1M', id='recalculation'),
param(1, None, 3600, id='recalculation'),
)
)
def test_get_simulated_run_len(
Expand All @@ -49,9 +51,11 @@ def test_get_simulated_run_len(
'execution time limit': execution_time_limit,
'simulation': {
'speedup factor': speedup_factor,
'default run length': default_run_length
}
'default run length': default_run_length,
},
}
if isinstance(execution_time_limit, int):
rtc['simulation']['simulated run length'] = 30
assert get_simulated_run_len(rtc) == 3600


Expand Down

0 comments on commit fdc4cef

Please sign in to comment.