Skip to content

Commit

Permalink
Allow broadcasts to modify sim mode tasks.
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Oct 26, 2023
1 parent 8606f93 commit e8ca0c2
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 31 deletions.
1 change: 1 addition & 0 deletions changes.d/5721.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow users to broadcast run_mode to tasks.
6 changes: 4 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1740,11 +1740,13 @@ async def main_loop(self) -> None:

self.pool.set_expired_tasks()
self.release_queued_tasks()

if (
self.pool.config.run_mode('simulation')
and sim_time_check(
self.message_queue, self.pool.get_tasks())
self.message_queue,
self.pool.get_tasks(),
self.broadcast_mgr
)
):
# A simulated task state change occurred.
self.reset_inactivity_timer()
Expand Down
146 changes: 119 additions & 27 deletions cylc/flow/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,59 +37,86 @@
from cylc.flow.task_proxy import TaskProxy


def configure_sim_modes(taskdefs, sim_mode):
"""Adjust task defs for simulation and dummy mode.
# Exotic: Recursive Type hint.
NestedDict = Dict[str, Union['NestedDict', Any]]


def configure_sim_modes(taskdefs, sim_mode):
"""Adjust task definitions for simulation and dummy modes.
"""
dummy_mode = bool(sim_mode == 'dummy')

for tdef in taskdefs:
# Compute simulated run time by scaling the execution limit.
rtc = tdef.rtconfig
sleep_sec = get_simulated_run_len(rtc)
configure_rtc_sim_mode(tdef.rtconfig, dummy_mode)

rtc['execution time limit'] = (
sleep_sec + DurationParser().parse(str(
rtc['simulation']['time limit buffer'])).get_seconds()
)

rtc['simulation']['simulated run length'] = sleep_sec
rtc['submission retry delays'] = [1]
def configure_rtc_sim_mode(rtc, dummy_mode):
"""Change a task proxy's runtime config to simulation mode settings.
"""
sleep_sec = get_simulated_run_len(rtc)

rtc['execution time limit'] = (
sleep_sec + DurationParser().parse(str(
rtc['simulation']['time limit buffer'])).get_seconds()
)

rtc['simulation']['simulated run length'] = sleep_sec
rtc['submission retry delays'] = [1]

if dummy_mode:
# Generate dummy scripting.
rtc['init-script'] = ""
rtc['env-script'] = ""
rtc['pre-script'] = ""
rtc['post-script'] = ""
rtc['script'] = build_dummy_script(
rtc, sleep_sec) if dummy_mode else ""
else:
rtc['script'] = ""

disable_platforms(rtc)
disable_platforms(rtc)

# Disable environment, in case it depends on env-script.
rtc['environment'] = {}
rtc['platform'] = 'localhost'

rtc["simulation"][
"fail cycle points"
] = parse_fail_cycle_points(
rtc["simulation"]["fail cycle points"]
)
# Disable environment, in case it depends on env-script.
rtc['environment'] = {}

rtc["simulation"][
"fail cycle points"
] = parse_fail_cycle_points(
rtc["simulation"]["fail cycle points"]
)


def get_simulated_run_len(rtc: Dict[str, Any]) -> int:
"""Get simulated run time.
rtc = run time config
Args:
rtc: run time config
Returns:
Number of seconds to sleep for in sim mode.
"""
# Simulated run length acts as a flag that this is at runtime:
# If durations have already been parsed, trying to parse them
# again will result in failures.
recalc = bool(rtc['simulation'].get('simulated run length', ''))
limit = rtc['execution time limit']
speedup = rtc['simulation']['speedup factor']
if limit and speedup:
sleep_sec = (DurationParser().parse(
str(limit)).get_seconds() / speedup)

if recalc:
if limit and speedup:
sleep_sec = limit / speedup
else:
sleep_sec = rtc['simulation']['default run length']
else:
sleep_sec = DurationParser().parse(
str(rtc['simulation']['default run length'])
).get_seconds()
if limit and speedup:
sleep_sec = (DurationParser().parse(
str(limit)).get_seconds() / speedup)
else:
default_run_len = str(rtc['simulation']['default run length'])
sleep_sec = DurationParser().parse(default_run_len).get_seconds()

return sleep_sec

Expand Down Expand Up @@ -147,7 +174,7 @@ def parse_fail_cycle_points(
[]
"""
f_pts: 'Optional[List[PointBase]]'
if 'all' in f_pts_orig:
if f_pts_orig is None or 'all' in f_pts_orig:
f_pts = None
else:
f_pts = []
Expand All @@ -156,19 +183,84 @@ def parse_fail_cycle_points(
return f_pts


def unpack_dict(dict_: NestedDict, parent_key: str = '') -> Dict[str, Any]:
"""Unpack a nested dict into a single layer.
Examples:
>>> unpack_dict({'foo': 1, 'bar': {'baz': 2, 'qux':3}})
{'foo': 1, 'bar.baz': 2, 'bar.qux': 3}
>>> unpack_dict({'foo': {'example': 42}, 'bar': {"1":2, "3":4}})
{'foo.example': 42, 'bar.1': 2, 'bar.3': 4}
"""
output = {}
for key, value in dict_.items():
new_key = parent_key + '.' + key if parent_key else key
if isinstance(value, dict):
output.update(unpack_dict(value, new_key))
else:
output[new_key] = value

return output


def nested_dict_path_update(
dict_: NestedDict, path: List[Any], value: Any
) -> NestedDict:
"""Set a value in a nested dict.
Examples:
>>> nested_dict_path_update({'foo': {'bar': 1}}, ['foo', 'bar'], 42)
{'foo': {'bar': 42}}
"""
this = dict_
for i in range(len(path)):
if isinstance(this[path[i]], dict):
this = this[path[i]]
else:
this[path[i]] = value
return dict_


def update_nested_dict(rtc: NestedDict, dict_: NestedDict) -> None:
"""Update one config nested dictionary with the contents of another.
Examples:
>>> x = {'foo': {'bar': 12}, 'qux': 77}
>>> y = {'foo': {'bar': 42}}
>>> update_nested_dict(x, y)
>>> print(x)
{'foo': {'bar': 42}, 'qux': 77}
"""
for keylist, value in unpack_dict(dict_).items():
keys = keylist.split('.')
rtc = nested_dict_path_update(rtc, keys, value)


def sim_time_check(
message_queue: 'Queue[TaskMsg]', itasks: 'List[TaskProxy]'
message_queue: 'Queue[TaskMsg]',
itasks: 'List[TaskProxy]',
broadcast_mgr: Optional[Any] = None
) -> bool:
"""Check if sim tasks have been "running" for as long as required.
If they have change the task state.
If broadcasts are active and they apply to tasks in itasks update
itasks.rtconfig.
Returns:
True if _any_ simulated task state has changed.
"""

sim_task_state_changed = False
now = time()
for itask in itasks:
if broadcast_mgr:
broadcast = broadcast_mgr.get_broadcast(itask.tokens)
if broadcast:
update_nested_dict(
itask.tdef.rtconfig, broadcast)
configure_rtc_sim_mode(itask.tdef.rtconfig, False)
if itask.state.status != TASK_STATUS_RUNNING:
continue
# Started time is not set on restart
Expand Down
41 changes: 41 additions & 0 deletions tests/functional/modes/04-simulation-runtime.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Test that we can broadcast an alteration to simulation mode.

. "$(dirname "$0")/test_header"
set_test_number 3

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play "${WORKFLOW_NAME}" --mode=simulation
SCHD_LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"

# If we speed up the simulated task we
# can make it finish before workflow timeout:
cylc broadcast "${WORKFLOW_NAME}" -s '[simulation]speedup factor = 600'

# Wait for the workflow to finish (it wasn't run in no-detach mode):
poll_grep "INFO - DONE" "${SCHD_LOG}"

# If we hadn't changed the speedup factor using broadcast
# The workflow timeout would have been hit:
grep_fail "WARNING - Orphaned tasks" "${SCHD_LOG}"

purge
exit
15 changes: 15 additions & 0 deletions tests/functional/modes/04-simulation-runtime/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[scheduler]
[[events]]
workflow timeout = PT30S

[scheduling]
initial cycle point = 2359
[[graph]]
R1 = get_observations

[runtime]
[[get_observations]]
execution retry delays = PT10M
[[[simulation]]]
speedup factor = 1

2 changes: 2 additions & 0 deletions tests/functional/modes/04-simulation-runtime/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
23590101T0000Z/get_observations -triggered off [] in flow 1
23590101T0000Z/get_observations -triggered off [] in flow 1
8 changes: 6 additions & 2 deletions tests/unit/test_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
param(None, 10, 'PT1H', id='speedup-factor-alone'),
param('PT1H', None, 'PT1H', id='execution-time-limit-alone'),
param('P1D', 24, 'PT1M', id='speed-up-and-execution-tl'),
param(60 * 60 * 24, 24, 'PT1M', id='recalculation'),
param(1, None, 3600, id='recalculation'),
)
)
def test_get_simulated_run_len(
Expand All @@ -49,9 +51,11 @@ def test_get_simulated_run_len(
'execution time limit': execution_time_limit,
'simulation': {
'speedup factor': speedup_factor,
'default run length': default_run_length
}
'default run length': default_run_length,
},
}
if isinstance(execution_time_limit, int):
rtc['simulation']['simulated run length'] = 30
assert get_simulated_run_len(rtc) == 3600


Expand Down

0 comments on commit e8ca0c2

Please sign in to comment.