diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index dce1b0316a0..1da7401d2d5 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -56,8 +56,9 @@ from cylc.flow.platforms import ( fail_if_platform_and_host_conflict, get_platform_deprecated_settings, is_platform_definition_subshell) +from cylc.flow.run_modes import RunMode from cylc.flow.task_events_mgr import EventData -from cylc.flow.task_state import RunMode +from cylc.flow.run_modes import TASK_CONFIG_RUN_MODES # Regex to check whether a string is a command @@ -1338,8 +1339,8 @@ def get_script_common_text(this: str, example: Optional[str] = None): ) Conf( 'run mode', VDR.V_STRING, - options=list(RunMode.OVERRIDING_MODES.value) + [''], - default='', + options=list(TASK_CONFIG_RUN_MODES), + default=RunMode.LIVE.value, desc=f''' For a workflow run in live mode run this task in skip mode. diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index 6bd71c431ab..30fa96344d2 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -76,9 +76,10 @@ from cylc.flow.log_level import log_level_to_verbosity from cylc.flow.network.schema import WorkflowStopMode from cylc.flow.parsec.exceptions import ParsecError +from cylc.flow.run_modes import RunMode from cylc.flow.task_id import TaskID from cylc.flow.task_state import ( - TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED, RunMode) + TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED) from cylc.flow.workflow_status import StopMode from metomi.isodatetime.parsers import TimePointParser diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 9ae2a3e132c..49ee30aa73a 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -99,7 +99,7 @@ get_trigger_completion_variable_maps, trigger_to_completion_variable, ) -from cylc.flow.task_state import RunMode +from cylc.flow.run_modes import RunMode from cylc.flow.task_trigger import TaskTrigger, Dependency from cylc.flow.taskdef import TaskDef from cylc.flow.unicode_rules import ( @@ -1738,10 +1738,6 @@ def process_config_env(self): ] ) - def run_mode(self) -> str: - """Return the run mode.""" - return RunMode.get(self.options) - def _check_task_event_handlers(self): """Check custom event handler templates can be expanded. @@ -2493,7 +2489,10 @@ def _get_taskdef(self, name: str) -> TaskDef: # Get the taskdef object for generating the task proxy class taskd = TaskDef( - name, rtcfg, self.run_mode(), self.start_point, + name, + rtcfg, + RunMode.get(self.options), + self.start_point, self.initial_point) # TODO - put all taskd.foo items in a single config dict diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index efd4bae8415..c98439a020a 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -85,6 +85,7 @@ pdeepcopy, poverride ) +from cylc.flow.run_modes import RunMode from cylc.flow.workflow_status import ( get_workflow_status, get_workflow_status_msg, @@ -699,8 +700,7 @@ def generate_definition_elements(self): time_zone_info = TIME_ZONE_LOCAL_INFO for key, val in time_zone_info.items(): setbuff(workflow.time_zone_info, key, val) - - workflow.run_mode = config.run_mode() + workflow.run_mode = RunMode.get(config.options) workflow.cycling_mode = config.cfg['scheduling']['cycling mode'] workflow.workflow_log_dir = self.schd.workflow_log_dir workflow.job_log_names.extend(list(JOB_LOG_OPTS.values())) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index ba8cad6890d..564d80ba69c 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -49,9 +49,10 @@ ) from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE from cylc.flow.id import Tokens +from cylc.flow.run_modes import ( + TASK_CONFIG_RUN_MODES, WORKFLOW_RUN_MODES, RunMode) from cylc.flow.task_outputs import SORT_ORDERS from cylc.flow.task_state import ( - RunMode, TASK_STATUSES_ORDERED, TASK_STATUS_DESC, TASK_STATUS_WAITING, @@ -605,20 +606,19 @@ def describe_run_mode(run_mode: Optional['Enum']) -> str: return getattr(RunMode, run_mode.value.upper()).__doc__ +# The run mode for the workflow. WorkflowRunMode = graphene.Enum( 'WorkflowRunMode', - [(m.capitalize(), m) for m in RunMode.WORKFLOW_MODES.value], - description=describe_run_mode, + [(m.capitalize(), m) for m in WORKFLOW_RUN_MODES], + description=lambda x: RunMode(x.value).describe() if x else None, ) -"""The run mode for the workflow.""" - +# The run mode for the task. TaskRunMode = graphene.Enum( 'TaskRunMode', - [(m.capitalize(), m) for m in RunMode.WORKFLOW_MODES.value], - description=describe_run_mode, + [(m.capitalize(), m) for m in TASK_CONFIG_RUN_MODES], + description=lambda x: RunMode(x.value).describe() if x else None, ) -"""The run mode for tasks.""" class Workflow(ObjectType): diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py index fa49e598ec2..02ff15c5462 100644 --- a/cylc/flow/platforms.py +++ b/cylc/flow/platforms.py @@ -31,7 +31,7 @@ PlatformLookupError, CylcError, NoHostsError, NoPlatformsError) from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.hostuserutil import is_remote_host -from cylc.flow.task_state import RunMode +from cylc.flow.run_modes import JOBLESS_MODES if TYPE_CHECKING: from cylc.flow.parsec.OrderedDict import OrderedDictWithDefaults @@ -267,7 +267,7 @@ def platform_from_name( return platform_data # If platform name in run mode and not otherwise defined: - if platform_name in RunMode.JOBLESS_MODES.value: + if platform_name in JOBLESS_MODES: return platforms['localhost'] raise PlatformLookupError( @@ -652,7 +652,7 @@ def get_install_target_to_platforms_map( Return {install_target_1: [platform_1_dict, platform_2_dict, ...], ...} """ ret: Dict[str, List[Dict[str, Any]]] = {} - for p_name in set(platform_names) - set(RunMode.JOBLESS_MODES.value): + for p_name in set(platform_names) - set(JOBLESS_MODES): try: platform = platform_from_name(p_name) except PlatformLookupError as exc: @@ -665,10 +665,10 @@ def get_install_target_to_platforms_map( # Map jobless modes to localhost. if 'localhost' in ret: ret['localhost'] += [ - {'name': mode} for mode in RunMode.JOBLESS_MODES.value] + {'name': mode} for mode in JOBLESS_MODES] else: ret['localhost'] = [ - {'name': mode} for mode in RunMode.JOBLESS_MODES.value] + {'name': mode} for mode in JOBLESS_MODES] return ret diff --git a/cylc/flow/prerequisite.py b/cylc/flow/prerequisite.py index 04ea4596c09..ba9300bd75d 100644 --- a/cylc/flow/prerequisite.py +++ b/cylc/flow/prerequisite.py @@ -272,6 +272,7 @@ def satisfy_me( """ satisfied_message: SatisfiedState + if mode != 'live': satisfied_message = self.DEP_STATE_SATISFIED_BY.format( mode) # type: ignore diff --git a/cylc/flow/run_modes/__init__.py b/cylc/flow/run_modes/__init__.py new file mode 100644 index 00000000000..529be513c6c --- /dev/null +++ b/cylc/flow/run_modes/__init__.py @@ -0,0 +1,141 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from enum import Enum +from typing import TYPE_CHECKING, Callable, Optional, Tuple + +if TYPE_CHECKING: + from optparse import Values + from cylc.flow.task_job_mgr import TaskJobManager + from cylc.flow.task_proxy import TaskProxy + + # The interface for submitting jobs + SubmissionInterface = Callable[ + [ # Args: + # the task job manager instance + 'TaskJobManager', + # the task to submit + 'TaskProxy', + # the task's runtime config (with broadcasts applied) + dict, + # the workflow ID + str, + # the current time as (float_unix_time, str_ISO8601) + Tuple[float, str] + ], + # Return False if the job requires live-mode submission + # (dummy mode does this), else return True. + bool + ] + + +class RunMode(Enum): + """The possible run modes of a task/workflow.""" + + LIVE = 'live' + """Task will run normally.""" + + SIMULATION = 'simulation' + """Simulates job submission with configurable exection time + and succeeded/failed outcomes(but does not submit real jobs).""" + + DUMMY = 'dummy' + """Submits real jobs with empty scripts.""" + + SKIP = 'skip' + """Skips job submission; sets required outputs (by default) or + configured outputs.""" + + def describe(self): + """Return user friendly description of run mode. + + For use by configuration spec documenter. + """ + if self == self.LIVE: + return "Task will run normally." + if self == self.SKIP: + return ( + "Skips job submission; sets required outputs" + " (by default) or configured outputs.") + if self == self.DUMMY: + return "Submits real jobs with empty scripts." + if self == self.SIMULATION: + return ( + "Simulates job submission with configurable" + " exection time and succeeded/failed outcomes" + "(but does not submit real jobs).") + raise KeyError(f'No description for {self}.') + + @staticmethod + def get(options: 'Values') -> str: + """Return the workflow run mode from the options.""" + if hasattr(options, 'run_mode') and options.run_mode: + return options.run_mode + else: + return RunMode.LIVE.value + + def get_submit_method(self) -> 'Optional[SubmissionInterface]': + """Return the job submission method for this run mode. + + This returns None for live-mode jobs as these use a + different code pathway for job submission. + """ + if self == RunMode.DUMMY: + from cylc.flow.run_modes.dummy import ( + submit_task_job as dummy_submit_task_job) + return dummy_submit_task_job + elif self == RunMode.SIMULATION: + from cylc.flow.run_modes.simulation import ( + submit_task_job as simulation_submit_task_job) + return simulation_submit_task_job + elif self == RunMode.SKIP: + from cylc.flow.run_modes.skip import ( + submit_task_job as skip_submit_task_job) + return skip_submit_task_job + return None + + +def disable_task_event_handlers(itask: 'TaskProxy'): + """Should we disable event handlers for this task? + + No event handlers in simulation mode, or in skip mode + if we don't deliberately enable them: + """ + mode = itask.run_mode + return ( + mode == RunMode.SIMULATION.value + or ( + mode == RunMode.SKIP.value + and itask.platform.get( + 'disable task event handlers', False) + ) + ) + + +# Modes available for running a whole workflow: +WORKFLOW_RUN_MODES = frozenset(i.value for i in { + RunMode.LIVE, RunMode.DUMMY, RunMode.SIMULATION}) + +# Modes which can be set in task config: +TASK_CONFIG_RUN_MODES = frozenset( + i.value for i in (RunMode.LIVE, RunMode.SKIP)) +# And those only available to the workflow: +WORKFLOW_ONLY_MODES = frozenset( + i.value for i in RunMode) - TASK_CONFIG_RUN_MODES + +# Modes which completely ignore the standard submission path: +JOBLESS_MODES = frozenset(i.value for i in { + RunMode.SKIP, RunMode.SIMULATION}) diff --git a/cylc/flow/run_modes/dummy.py b/cylc/flow/run_modes/dummy.py index 91935ee5c3b..26d887d87dc 100644 --- a/cylc/flow/run_modes/dummy.py +++ b/cylc/flow/run_modes/dummy.py @@ -26,7 +26,7 @@ get_simulated_run_len, parse_fail_cycle_points ) -from cylc.flow.task_state import RunMode +from cylc.flow.run_modes import RunMode from cylc.flow.platforms import get_platform diff --git a/cylc/flow/run_modes/nonlive.py b/cylc/flow/run_modes/nonlive.py index 5bea9f70be5..0add79cba4f 100644 --- a/cylc/flow/run_modes/nonlive.py +++ b/cylc/flow/run_modes/nonlive.py @@ -19,7 +19,7 @@ from cylc.flow import LOG from cylc.flow.run_modes.skip import check_task_skip_config -from cylc.flow.task_state import RunMode +from cylc.flow.run_modes import RunMode if TYPE_CHECKING: from cylc.flow.taskdef import TaskDef diff --git a/cylc/flow/run_modes/simulation.py b/cylc/flow/run_modes/simulation.py index 122277bcf4c..796a5803ceb 100644 --- a/cylc/flow/run_modes/simulation.py +++ b/cylc/flow/run_modes/simulation.py @@ -36,7 +36,7 @@ TASK_STATUS_SUCCEEDED, ) from cylc.flow.wallclock import get_unix_time_from_time_string -from cylc.flow.task_state import RunMode +from cylc.flow.run_modes import RunMode if TYPE_CHECKING: @@ -73,8 +73,12 @@ def submit_task_job( itask.submit_num += 1 itask.platform = { - 'name': RunMode.SIMULATION.value, 'install target': 'localhost'} - itask.platform['name'] = RunMode.SIMULATION.value + 'name': RunMode.SIMULATION.value, + 'install target': 'localhost', + 'hosts': ['localhost'], + 'disable task event handlers': + rtconfig['simulation']['disable task event handlers'], + } itask.summary['job_runner_name'] = RunMode.SIMULATION.value itask.summary[task_job_mgr.KEY_EXECUTE_TIME_LIMIT] = ( itask.mode_settings.simulated_run_length @@ -311,7 +315,10 @@ def sim_time_check( for itask in itasks: if ( itask.state.status != TASK_STATUS_RUNNING - or itask.run_mode and itask.run_mode != RunMode.SIMULATION.value + or ( + itask.run_mode + and itask.run_mode != RunMode.SIMULATION.value + ) ): continue diff --git a/cylc/flow/run_modes/skip.py b/cylc/flow/run_modes/skip.py index 960301bfabc..8347c71e3dc 100644 --- a/cylc/flow/run_modes/skip.py +++ b/cylc/flow/run_modes/skip.py @@ -26,7 +26,7 @@ TASK_OUTPUT_FAILED, TASK_OUTPUT_STARTED ) -from cylc.flow.task_state import RunMode +from cylc.flow.run_modes import RunMode if TYPE_CHECKING: from cylc.flow.taskdef import TaskDef @@ -39,6 +39,7 @@ def submit_task_job( task_job_mgr: 'TaskJobManager', itask: 'TaskProxy', rtconfig: Dict, + _workflow: str, now: Tuple[float, str] ) -> 'Literal[True]': """Submit a task in skip mode. @@ -46,10 +47,6 @@ def submit_task_job( Returns: True - indicating that TaskJobManager need take no further action. """ - # Don't do anything if task is held: - if itask.state.is_held: - return True - task_job_mgr._set_retry_timers(itask, rtconfig) itask.summary['started_time'] = now[0] itask.waiting_on_job_prep = False @@ -63,7 +60,6 @@ def submit_task_job( rtconfig['skip']['disable task event handlers'], 'execution polling intervals': [] } - itask.platform['name'] = RunMode.SKIP.value itask.summary['job_runner_name'] = RunMode.SKIP.value itask.run_mode = RunMode.SKIP.value task_job_mgr.workflow_db_mgr.put_insert_task_jobs( diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 133d430fff2..081a5276637 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -113,6 +113,7 @@ from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager from cylc.flow.workflow_events import WorkflowEventHandler from cylc.flow.workflow_status import StopMode, AutoRestartMode +from cylc.flow.run_modes import RunMode, WORKFLOW_ONLY_MODES from cylc.flow.taskdef import TaskDef from cylc.flow.task_events_mgr import TaskEventsManager from cylc.flow.task_job_mgr import TaskJobManager @@ -131,8 +132,7 @@ TASK_STATUS_PREPARING, TASK_STATUS_RUNNING, TASK_STATUS_SUBMITTED, - TASK_STATUS_WAITING, - RunMode) + TASK_STATUS_WAITING) from cylc.flow.templatevars import get_template_vars from cylc.flow.timer import Timer from cylc.flow.util import cli_format @@ -1195,7 +1195,7 @@ def run_event_handlers(self, event, reason=""): Run workflow events only in live mode or skip mode. """ - if self.get_run_mode() in RunMode.NON_OVERRIDABLE_MODES.value: + if self.get_run_mode() in WORKFLOW_ONLY_MODES: return self.workflow_event_handler.handle(self, event, str(reason)) diff --git a/cylc/flow/scheduler_cli.py b/cylc/flow/scheduler_cli.py index ec450e4cad4..251e941e3a5 100644 --- a/cylc/flow/scheduler_cli.py +++ b/cylc/flow/scheduler_cli.py @@ -54,7 +54,7 @@ from cylc.flow.remote import cylc_server_cmd from cylc.flow.scheduler import Scheduler, SchedulerError from cylc.flow.scripts.common import cylc_header -from cylc.flow.task_state import RunMode +from cylc.flow.run_modes import WORKFLOW_RUN_MODES from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager from cylc.flow.workflow_files import ( SUITERC_DEPR_MSG, @@ -130,14 +130,15 @@ RUN_MODE = OptionSettings( ["-m", "--mode"], help=( - f"Run mode: {RunMode.WORKFLOW_MODES.value} (default live)." - " Live mode executes the tasks as defined in the runtime section." + f"Run mode: {WORKFLOW_RUN_MODES} (default live)." + " Live mode executes the tasks as defined in the runtime" + " section." " Simulation, skip and dummy modes ignore part of tasks'" " runtime configurations. Simulation and dummy modes are" " designed for testing, and skip mode is for flow control." ), metavar="STRING", action='store', dest="run_mode", - choices=list(RunMode.WORKFLOW_MODES.value), + choices=list(WORKFLOW_RUN_MODES), ) PLAY_RUN_MODE = deepcopy(RUN_MODE) diff --git a/cylc/flow/scripts/validate.py b/cylc/flow/scripts/validate.py index 9e4f8f7cb89..443557375cd 100755 --- a/cylc/flow/scripts/validate.py +++ b/cylc/flow/scripts/validate.py @@ -54,7 +54,7 @@ from cylc.flow.task_proxy import TaskProxy from cylc.flow.templatevars import get_template_vars from cylc.flow.terminal import cli_function -from cylc.flow.task_state import RunMode +from cylc.flow.run_modes import RunMode if TYPE_CHECKING: from cylc.flow.option_parsers import Values diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 5bf6ccae66a..e33ee5afd7d 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -67,6 +67,8 @@ JOB_LOG_OUT, JOB_LOG_ERR, ) +from cylc.flow.run_modes import ( + JOBLESS_MODES, RunMode, disable_task_event_handlers) from cylc.flow.task_message import ( ABORT_MESSAGE_PREFIX, FAIL_MESSAGE_PREFIX, VACATION_MESSAGE_PREFIX) from cylc.flow.task_state import ( @@ -79,7 +81,6 @@ TASK_STATUS_EXPIRED, TASK_STATUS_SUCCEEDED, TASK_STATUS_WAITING, - RunMode, ) from cylc.flow.task_outputs import ( TASK_OUTPUT_EXPIRED, @@ -938,7 +939,7 @@ def _process_message_check( def setup_event_handlers(self, itask, event, message): """Set up handlers for a task event.""" - if RunMode.disable_task_event_handlers(itask): + if disable_task_event_handlers(itask): return msg = "" if message != f"job {event}": @@ -1539,7 +1540,7 @@ def _insert_task_job( # do not submit jobs. if ( not itask.run_mode - or itask.run_mode in RunMode.JOBLESS_MODES.value + or itask.run_mode in JOBLESS_MODES or forced ): job_conf = {"submit_num": itask.submit_num} diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index bd5a221319f..036aa881980 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -35,7 +35,7 @@ ) from shutil import rmtree from time import time -from typing import TYPE_CHECKING, Any, List, Tuple, Union, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Tuple, Union, Optional from cylc.flow import LOG from cylc.flow.job_runner_mgr import JobPollContext @@ -63,14 +63,9 @@ get_platform, ) from cylc.flow.remote import construct_ssh_cmd -from cylc.flow.run_modes.simulation import ( - submit_task_job as simulation_submit_task_job) -from cylc.flow.run_modes.skip import ( - submit_task_job as skip_submit_task_job) -from cylc.flow.run_modes.dummy import ( - submit_task_job as dummy_submit_task_job) from cylc.flow.subprocctx import SubProcContext from cylc.flow.subprocpool import SubProcPool +from cylc.flow.run_modes import RunMode, WORKFLOW_ONLY_MODES from cylc.flow.task_action_timer import ( TaskActionTimer, TimerFlags @@ -109,7 +104,6 @@ TASK_STATUS_RUNNING, TASK_STATUS_WAITING, TASK_STATUSES_ACTIVE, - RunMode ) from cylc.flow.wallclock import ( get_current_time_string, @@ -253,7 +247,7 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True): return [prepared_tasks, bad_tasks] def submit_task_jobs(self, workflow, itasks, curve_auth, - client_pub_key_dir, run_mode='live'): + client_pub_key_dir, run_mode=RunMode.LIVE): """Prepare for job submission and submit task jobs. Preparation (host selection, remote host init, and remote install) @@ -268,28 +262,42 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, Return (list): list of tasks that attempted submission. """ - itasks, nonlive_tasks = self._nonlive_submit_task_jobs( - itasks, workflow, run_mode) + # submit "simulation/skip" mode tasks, modify "dummy" task configs: + itasks, submitted_nonlive_tasks = self.submit_nonlive_task_jobs( + workflow, itasks, run_mode) + + # submit "live" mode tasks (and "dummy" mode tasks) + submitted_live_tasks = self.submit_livelike_task_jobs( + workflow, itasks, curve_auth, client_pub_key_dir) + + return submitted_nonlive_tasks + submitted_live_tasks + + def submit_livelike_task_jobs( + self, workflow, itasks, curve_auth, client_pub_key_dir + ) -> 'List[TaskProxy]': + """Submission for live tasks and dummy tasks. + """ + done_tasks: 'List[TaskProxy]' = [] + # {platform: [itask, ...], ...} + auth_itasks: 'Dict[str, List[TaskProxy]]' = {} - # Prepare tasks for job submission prepared_tasks, bad_tasks = self.prep_submit_task_jobs( workflow, itasks) # Reset consumed host selection results self.task_remote_mgr.subshell_eval_reset() - if not prepared_tasks and not nonlive_tasks: + if not prepared_tasks: return bad_tasks - elif not prepared_tasks: - return nonlive_tasks - auth_itasks = {} # {platform: [itask, ...], ...} for itask in prepared_tasks: platform_name = itask.platform['name'] auth_itasks.setdefault(platform_name, []) auth_itasks[platform_name].append(itask) + # Submit task jobs for each platform - done_tasks = bad_tasks + nonlive_tasks + # Non-prepared tasks can be considered done for now: + done_tasks = bad_tasks for _, itasks in sorted(auth_itasks.items()): # Find the first platform where >1 host has not been tried and @@ -1013,10 +1021,10 @@ def _set_retry_timers( except KeyError: itask.try_timers[key] = TaskActionTimer(delays=delays) - def _nonlive_submit_task_jobs( + def submit_nonlive_task_jobs( self: 'TaskJobManager', - itasks: 'List[TaskProxy]', workflow: str, + itasks: 'List[TaskProxy]', workflow_run_mode: str, ) -> 'Tuple[List[TaskProxy], List[TaskProxy]]': """Identify task mode and carry out alternative submission @@ -1047,15 +1055,15 @@ def _nonlive_submit_task_jobs( # Get task config with broadcasts applied: rtconfig = self.task_events_mgr.broadcast_mgr.get_updated_rtconfig( itask) - # Apply task run mode - if workflow_run_mode in RunMode.NON_OVERRIDABLE_MODES.value: + if workflow_run_mode in WORKFLOW_ONLY_MODES: # Task run mode cannot override workflow run-mode sim or dummy: run_mode = workflow_run_mode else: # If workflow mode is skip or live and task mode is set, # override workflow mode, else use workflow mode. run_mode = rtconfig.get('run mode', None) or workflow_run_mode + # Store the run mode of the this submission: itask.run_mode = run_mode @@ -1063,19 +1071,15 @@ def _nonlive_submit_task_jobs( # tasks to list of tasks to put through live # submission pipeline - We decide based on the output # of the submit method: - is_nonlive = False - if run_mode == RunMode.DUMMY.value: - is_nonlive = dummy_submit_task_job( - self, itask, rtconfig, workflow, now) - elif run_mode == RunMode.SIMULATION.value: - is_nonlive = simulation_submit_task_job( + submit_func = RunMode(run_mode).get_submit_method() + if not submit_func: + # Return to nonlive. + nonlive_mode = False + else: + nonlive_mode = submit_func( self, itask, rtconfig, workflow, now) - elif run_mode == RunMode.SKIP.value: - is_nonlive = skip_submit_task_job( - self, itask, rtconfig, now) - # Assign task to list: - if is_nonlive: + if nonlive_mode: nonlive_tasks.append(itask) else: lively_tasks.append(itask) diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index 5fb5a934935..92a810f16b6 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -642,7 +642,6 @@ def iter_required_messages( set(self._message_to_compvar.values()), force_optional=exclude ).items(): - # breakpoint(header=f"=== {compvar=}, {is_optional=} ===") if is_optional is False: for message, _compvar in self._message_to_compvar.items(): if _compvar == compvar: diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 31a57788451..9de77bbfe4f 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -42,6 +42,7 @@ from cylc.flow.id import Tokens, detokenise from cylc.flow.id_cli import contains_fnmatch from cylc.flow.id_match import filter_ids +from cylc.flow.run_modes import RunMode from cylc.flow.workflow_status import StopMode from cylc.flow.task_action_timer import TaskActionTimer, TimerFlags from cylc.flow.task_events_mgr import ( @@ -53,7 +54,6 @@ from cylc.flow.task_id import TaskID from cylc.flow.task_proxy import TaskProxy from cylc.flow.task_state import ( - RunMode, TASK_STATUSES_ACTIVE, TASK_STATUSES_FINAL, TASK_STATUS_WAITING, @@ -1416,9 +1416,10 @@ def spawn_on_output(self, itask, output, forced=False): tasks = [c_task] for t in tasks: + t.satisfy_me( [itask.tokens.duplicate(task_sel=output)], - getattr(itask.tdef, 'run_mode', RunMode.LIVE.value) + mode=itask.run_mode ) self.data_store_mgr.delta_task_prerequisite(t) if not in_pool: @@ -1544,7 +1545,7 @@ def spawn_on_all_outputs( if completed_only: c_task.satisfy_me( [itask.tokens.duplicate(task_sel=message)], - itask.run_mode + mode=itask.run_mode ) self.data_store_mgr.delta_task_prerequisite(c_task) self.add_to_pool(c_task) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index b25fa8403ae..edf30c803b1 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -38,9 +38,9 @@ from cylc.flow import LOG from cylc.flow.flow_mgr import stringify_flow_nums from cylc.flow.platforms import get_platform +from cylc.flow.run_modes import RunMode from cylc.flow.task_action_timer import TimerFlags from cylc.flow.task_state import ( - RunMode, TaskState, TASK_STATUS_WAITING, TASK_STATUS_EXPIRED, @@ -560,6 +560,7 @@ def satisfy_me( Return a set of unmatched task messages. """ + used = self.state.satisfy_me(task_messages, mode) return set(task_messages) - used diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py index 838414198f6..8447a7bed6d 100644 --- a/cylc/flow/task_state.py +++ b/cylc/flow/task_state.py @@ -16,8 +16,6 @@ """Task state related logic.""" - -from enum import Enum from typing import ( TYPE_CHECKING, Dict, @@ -41,7 +39,6 @@ if TYPE_CHECKING: from cylc.flow.cycling import PointBase - from cylc.flow.option_parsers import Values from cylc.flow.id import Tokens from cylc.flow.prerequisite import PrereqMessage from cylc.flow.taskdef import TaskDef @@ -179,73 +176,6 @@ } -class RunMode(Enum): - """The possible run modes of a task/workflow.""" - - LIVE = 'live' - """Task will run normally.""" - - SIMULATION = 'simulation' - """Simulates job submission with configurable exection time - and succeeded/failed outcomes(does not submit real jobs).""" - - DUMMY = 'dummy' - """Submits real jobs with empty scripts.""" - - SKIP = 'skip' - """Skips job submission; sets required outputs (by default) or - configured outputs.""" - - WORKFLOW_MODES = (LIVE, DUMMY, SIMULATION, SKIP) - """Workflow mode not sensible mode for workflow. - - n.b. not using a set to ensure ordering in CLI - """ - - OVERRIDING_MODES = frozenset({LIVE, SKIP}) - """Modes which can be set in task config.""" - - NON_OVERRIDABLE_MODES = frozenset({SIMULATION, DUMMY}) - - JOBLESS_MODES = frozenset({SKIP, SIMULATION}) - """Modes which completely ignore the standard submission path.""" - - def describe(self): - """Return user friendly description of run mode. - - For use by configuration spec documenter. - """ - if self == self.LIVE: - return "Task will run normally." - if self == self.SKIP: - return ( - "Skips job submission; sets required outputs" - " (by default) or configured outputs.") - raise KeyError(f'No description for {self}.') - - @staticmethod - def get(options: 'Values') -> str: - """Return the workflow run mode from the options.""" - return getattr(options, 'run_mode', None) or RunMode.LIVE.value - - @staticmethod - def disable_task_event_handlers(itask): - """Should we disable event handlers for this task? - - No event handlers in simulation mode, or in skip mode - if we don't deliberately enable them: - """ - mode = itask.run_mode - return ( - mode == RunMode.SIMULATION.value - or ( - mode == RunMode.SKIP.value - and itask.platform.get( - 'disable task event handlers', False) - ) - ) - - def status_leq(status_a, status_b): """"Return True if status_a <= status_b""" return (TASK_STATUSES_ORDERED.index(status_a) <= diff --git a/cylc/flow/unicode_rules.py b/cylc/flow/unicode_rules.py index 0dbb5aa22f9..b24d576332d 100644 --- a/cylc/flow/unicode_rules.py +++ b/cylc/flow/unicode_rules.py @@ -22,8 +22,9 @@ _TASK_NAME_CHARACTERS, _TASK_NAME_PREFIX, ) +from cylc.flow.run_modes import RunMode from cylc.flow.task_qualifiers import TASK_QUALIFIERS -from cylc.flow.task_state import TASK_STATUSES_ORDERED, RunMode +from cylc.flow.task_state import TASK_STATUSES_ORDERED ENGLISH_REGEX_MAP = { r'\w': 'alphanumeric', diff --git a/tests/functional/cylc-config/00-simple/section2.stdout b/tests/functional/cylc-config/00-simple/section2.stdout index 049db739435..559d1c2556c 100644 --- a/tests/functional/cylc-config/00-simple/section2.stdout +++ b/tests/functional/cylc-config/00-simple/section2.stdout @@ -15,7 +15,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[meta]]] title = description = @@ -94,7 +94,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[meta]]] title = description = @@ -173,7 +173,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[meta]]] title = description = @@ -252,7 +252,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = serial [[[meta]]] @@ -332,7 +332,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = parallel [[[meta]]] @@ -412,7 +412,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = serial [[[meta]]] @@ -492,7 +492,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = serial [[[meta]]] @@ -572,7 +572,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = parallel [[[meta]]] @@ -652,7 +652,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = parallel [[[meta]]] @@ -732,7 +732,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = serial [[[meta]]] @@ -812,7 +812,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = serial [[[meta]]] @@ -892,7 +892,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = parallel [[[meta]]] @@ -972,7 +972,7 @@ execution time limit = submission polling intervals = submission retry delays = - run mode = + run mode = live [[[directives]]] job_type = parallel [[[meta]]] diff --git a/tests/functional/run_modes/06-run-mode-overrides.t b/tests/functional/run_modes/06-run-mode-overrides.t index f6d4faafb30..c7fc3325b8f 100644 --- a/tests/functional/run_modes/06-run-mode-overrides.t +++ b/tests/functional/run_modes/06-run-mode-overrides.t @@ -18,7 +18,7 @@ # Testing Skip mode functionality. . "$(dirname "$0")/test_header" -set_test_number 11 +set_test_number 6 # Install and run the workflow in live mode (default). # Check that tasks with run mode unset and run mode = live @@ -41,26 +41,5 @@ done JOB_LOGS="${WORKFLOW_RUN_DIR}/log/job/1001" run_fail "${TEST_NAME}:broadcast run mode=skip" ls "${JOB_LOGS}/default_/" -purge - -# Install and run the workflow in skip mode. -# Check that tasks with run mode unset and run mode = skip -# don't leave log files, and that skip mode tasks does. -TEST_NAME="${TEST_NAME_BASE}:skip-workflow" -install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" -workflow_run_ok "${TEST_NAME}:run" \ - cylc play "${WORKFLOW_NAME}" \ - --no-detach \ - --mode skip \ - --set='changemode="live"' \ - --final-cycle-point=1000 - -JOB_LOGS="${WORKFLOW_RUN_DIR}/log/job/1000" -run_ok "${TEST_NAME}:run mode=live" ls "${JOB_LOGS}/live_" -run_fail "${TEST_NAME}:run mode=default" ls "${JOB_LOGS}/default_" -run_fail "${TEST_NAME}:run mode=skip" ls "${JOB_LOGS}/skip_" -JOB_LOGS="${WORKFLOW_RUN_DIR}/log/job/1000" -named_grep_ok "${TEST_NAME}:run mode=live" "===.*===" "${JOB_LOGS}/live_/NN/job.out" - purge exit 0 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index bce6ea64e9f..2f0aa5afab4 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -33,6 +33,7 @@ install as cylc_install, get_option_parser as install_gop ) +from cylc.flow.task_state import TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED from cylc.flow.util import serialise_set from cylc.flow.wallclock import get_current_time_string from cylc.flow.workflow_files import infer_latest_run_from_id @@ -47,6 +48,7 @@ _start_flow, ) + if TYPE_CHECKING: from cylc.flow.network.client import WorkflowRuntimeClient from cylc.flow.scheduler import Scheduler @@ -672,3 +674,43 @@ async def _reftest( return triggers return _reftest + + +@pytest.fixture +def capture_live_submissions(capcall, monkeypatch): + """Capture live submission attempts. + + This prevents real jobs from being submitted to the system. + + If you call this fixture from a test, it will return a set of tasks that + would have been submitted had this fixture not been used. + """ + def fake_submit(self, _workflow, itasks, *_): + self.submit_nonlive_task_jobs(_workflow, itasks, 'simulation') + for itask in itasks: + for status in (TASK_STATUS_SUBMITTED, TASK_STATUS_SUCCEEDED): + self.task_events_mgr.process_message( + itask, + 'INFO', + status, + '2000-01-01T00:00:00Z', + '(received)', + ) + return itasks + + # suppress and capture live submissions + submit_live_calls = capcall( + 'cylc.flow.task_job_mgr.TaskJobManager.submit_livelike_task_jobs', + fake_submit) + + + + def get_submissions(): + nonlocal submit_live_calls + return { + itask.identity + for ((_self, _workflow, itasks, *_), _kwargs) in submit_live_calls + for itask in itasks + } + + return get_submissions diff --git a/tests/integration/run_modes/test_mode_overrides.py b/tests/integration/run_modes/test_mode_overrides.py index f9ab318e0e6..c54065e8e21 100644 --- a/tests/integration/run_modes/test_mode_overrides.py +++ b/tests/integration/run_modes/test_mode_overrides.py @@ -30,48 +30,41 @@ import pytest +from cylc.flow.run_modes import WORKFLOW_RUN_MODES -@pytest.mark.parametrize( - 'workflow_run_mode', [('live'), ('skip')]) + +@pytest.mark.parametrize('workflow_run_mode', sorted(WORKFLOW_RUN_MODES)) async def test_run_mode_override_from_config( - workflow_run_mode, flow, scheduler, run, complete, log_filter + capture_live_submissions, + flow, + scheduler, + run, + complete, + workflow_run_mode ): - """Test that ``[runtime][TASK]run mode`` overrides workflow modes. - """ - cfg = { - "scheduler": {"cycle point format": "%Y"}, - "scheduling": { - "initial cycle point": "1000", - "final cycle point": "1000", - "graph": {"P1Y": "live_\nskip_\ndefault_"}}, - "runtime": { - "skip_": {"run mode": "skip"}, - "live_": {"run mode": "live"} + """Test that `[runtime][]run mode` overrides workflow modes.""" + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': 'live & skip', + }, + }, + 'runtime': { + 'live': {'run mode': 'live'}, + 'skip': {'run mode': 'skip'}, } - } - id_ = flow(cfg) + }) schd = scheduler(id_, run_mode=workflow_run_mode, paused_start=False) - expect_template = ( - '[1000/{}_/01:preparing] submitted to localhost:background') - - async with run(schd) as log: + async with run(schd): await complete(schd) - # Live task has been really submitted: - assert log_filter(log, contains=expect_template.format('live')) - - # Default is the same as workflow: - if workflow_run_mode == 'live': - assert log_filter(log, contains=expect_template.format('default')) - else: - assert log_filter( - log, contains='[1000/default_/01:running] => succeeded') - assert not log_filter( - log, contains=expect_template.format('default')) - - # Skip task has run, but not actually been submitted: - assert log_filter(log, contains='[1000/skip_/01:running] => succeeded') - assert not log_filter(log, contains=expect_template.format('skip')) + if workflow_run_mode == 'live': + assert capture_live_submissions() == {'1/live'} + elif workflow_run_mode == 'dummy': + # Skip mode doesn't override dummy mode: + assert capture_live_submissions() == {'1/live', '1/skip'} + else: + assert capture_live_submissions() == set() async def test_force_trigger_does_not_override_run_mode( @@ -81,34 +74,19 @@ async def test_force_trigger_does_not_override_run_mode( ): """Force-triggering a task will not override the run mode. - Tasks with run mode = skip will continue to abide by - the is_held flag as normal. - Taken from spec at - https://github.com/cylc/cylc-admin/blob/master/ - docs/proposal-skip-mode.md#proposal + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md#proposal """ wid = flow({ 'scheduling': {'graph': {'R1': 'foo'}}, 'runtime': {'foo': {'run mode': 'skip'}} }) - schd = scheduler(wid) + schd = scheduler(wid, run_mode="live") async with start(schd): - # Check that task isn't held at first foo = schd.pool.get_tasks()[0] - assert foo.state.is_held is False - # Hold task, check that it's held: - schd.pool.hold_tasks('1/foo') - assert foo.state.is_held is True - - # Trigger task, check that it's _still_ held: + # Force trigger task: schd.pool.force_trigger_tasks('1/foo', [1]) - assert foo.state.is_held is True - - # run_mode will always be simulation from test - # workflow before submit routine... - assert not foo.run_mode # ... but job submission will always change this to the correct mode: schd.task_job_mgr.submit_task_jobs( @@ -116,11 +94,45 @@ async def test_force_trigger_does_not_override_run_mode( [foo], schd.server.curve_auth, schd.server.client_pub_key_dir) + assert foo.run_mode == 'skip' +async def test_run_mode_skip_abides_by_held( + flow, + scheduler, + run, + complete +): + """Tasks with run mode = skip will continue to abide by the + is_held flag as normal. + + Taken from spec at + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md#proposal + """ + wid = flow({ + 'scheduling': {'graph': {'R1': 'foo'}}, + 'runtime': {'foo': {'run mode': 'skip'}} + }) + schd = scheduler(wid, run_mode="live", paused_start=False) + async with run(schd): + foo = schd.pool.get_tasks()[0] + assert foo.state.is_held is False + + # Hold task, check that it's held: + schd.pool.hold_tasks('1/foo') + assert foo.state.is_held is True + + # Run to completion, should happen if task isn't held: + with pytest.raises( + Exception, + match="Timeout waiting for workflow to shut down" + ): + await complete(schd, timeout=5) + + async def test_run_mode_override_from_broadcast( - flow, scheduler, run, complete, log_filter + flow, scheduler, start, complete, log_filter, capture_live_submissions ): """Test that run_mode modifications only apply to one task. """ @@ -136,7 +148,7 @@ async def test_run_mode_override_from_broadcast( id_ = flow(cfg) schd = scheduler(id_, run_mode='live', paused_start=False) - async with run(schd): + async with start(schd): schd.broadcast_mgr.put_broadcast( ['1000'], ['foo'], [{'run mode': 'skip'}]) @@ -147,6 +159,5 @@ async def test_run_mode_override_from_broadcast( [foo_1000, foo_1001], schd.server.curve_auth, schd.server.client_pub_key_dir) - assert foo_1000.run_mode == 'skip' - assert foo_1001.run_mode == 'live' + assert capture_live_submissions() == {'1001/foo'} diff --git a/tests/integration/run_modes/test_nonlive.py b/tests/integration/run_modes/test_nonlive.py index 42ddca128ce..973f39e3301 100644 --- a/tests/integration/run_modes/test_nonlive.py +++ b/tests/integration/run_modes/test_nonlive.py @@ -22,11 +22,11 @@ 'flow_nums': '[1]', 'is_manual_submit': 0, 'try_num': 1, - 'submit_status': None, + 'submit_status': 0, 'run_signal': None, - 'run_status': None, - 'platform_name': 'localhost', - 'job_runner_name': 'background', + 'run_status': 0, + 'platform_name': 'simulation', + 'job_runner_name': 'simulation', 'job_id': None}, 'skip': { 'flow_nums': '[1]', @@ -47,7 +47,7 @@ def not_time(data: Dict[str, Any]): return {k: v for k, v in data.items() if 'time' not in k} -async def test_task_jobs(flow, scheduler, start): +async def test_task_jobs(flow, scheduler, start, capture_live_submissions): """Ensure that task job data is added to the database correctly for each run mode. """ @@ -58,6 +58,8 @@ async def test_task_jobs(flow, scheduler, start): mode: {'run mode': mode} for mode in KGO} })) async with start(schd): + task_proxies = schd.pool.get_tasks() + schd.task_job_mgr.submit_task_jobs( schd.workflow, schd.pool.get_tasks(), @@ -88,8 +90,13 @@ async def test_task_jobs(flow, scheduler, start): assert taskdata == kgo, ( f'Mode {mode}: incorrect db entries.') + assert task_proxies[0].run_mode == 'simulation' + assert task_proxies[1].run_mode == 'skip' -async def test_mean_task_time(flow, scheduler, run, complete): + +async def test_mean_task_time( + flow, scheduler, start, complete, capture_live_submissions +): """Non-live tasks are not added to the list of task times, so skipping tasks will not affect how long Cylc expects tasks to run. """ @@ -100,21 +107,26 @@ async def test_mean_task_time(flow, scheduler, run, complete): 'graph': {'P1Y': 'foo'}} }), run_mode='live') - async with run(schd): - tasks = schd.pool.get_tasks() - tdef = tasks[0].tdef - assert list(tdef.elapsed_times) == [] + async with start(schd): + itask = schd.pool.get_tasks()[0] + assert list(itask.tdef.elapsed_times) == [] # Make the task run in skip mode at one cycle: schd.broadcast_mgr.put_broadcast( ['1000'], ['foo'], [{'run mode': 'skip'}]) + # Fake adding some other examples of the task: + itask.tdef.elapsed_times.extend([133.0, 132.4]) + # Submit two tasks: schd.task_job_mgr.submit_task_jobs( schd.workflow, - tasks[:2], + [itask], schd.server.curve_auth, schd.server.client_pub_key_dir ) - await complete(schd, '10010101T0000Z/foo') - assert len(tdef.elapsed_times) == 1 + + # Ensure that the skipped task has succeeded, and that the + # number of items in the elapsed_times has not changed. + assert itask.state.status == 'succeeded' + assert len(itask.tdef.elapsed_times) == 2 diff --git a/tests/integration/run_modes/test_simulation.py b/tests/integration/run_modes/test_simulation.py index 4c48a572b15..b8a42ff1a27 100644 --- a/tests/integration/run_modes/test_simulation.py +++ b/tests/integration/run_modes/test_simulation.py @@ -62,8 +62,8 @@ def _run_simjob(schd, point, task): itask = schd.pool.get_task(point, task) itask.state.is_queued = False monkeytime(0) - schd.task_job_mgr._nonlive_submit_task_jobs( - [itask], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [itask], 'simulation') monkeytime(itask.mode_settings.timeout + 1) # Run Time Check @@ -170,8 +170,8 @@ def test_fail_once(sim_time_check_setup, itask, point, results, monkeypatch): for i, result in enumerate(results): itask.try_timers['execution-retry'].num = i - schd.task_job_mgr._nonlive_submit_task_jobs( - [itask], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [itask], 'simulation') assert itask.mode_settings.sim_task_fails is result @@ -190,11 +190,11 @@ def test_task_finishes(sim_time_check_setup, monkeytime, caplog): fail_all_1066 = schd.pool.get_task(ISO8601Point('1066'), 'fail_all') fail_all_1066.state.status = 'running' fail_all_1066.state.is_queued = False - schd.task_job_mgr._nonlive_submit_task_jobs( - [fail_all_1066], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [fail_all_1066], 'simulation') # For the purpose of the test delete the started time set by - # _nonlive_submit_task_jobs. + # submit_nonlive_task_jobs. fail_all_1066.summary['started_time'] = 0 # Before simulation time is up: @@ -220,8 +220,8 @@ def test_task_sped_up(sim_time_check_setup, monkeytime): # Run the job submission method: monkeytime(0) - schd.task_job_mgr._nonlive_submit_task_jobs( - [fast_forward_1066], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [fast_forward_1066], 'simulation') fast_forward_1066.state.is_queued = False result = sim_time_check(schd.task_events_mgr, [fast_forward_1066], '') @@ -274,8 +274,8 @@ async def test_settings_restart( async with start(schd): og_timeouts = {} for itask in schd.pool.get_tasks(): - schd.task_job_mgr._nonlive_submit_task_jobs( - [itask], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [itask], 'simulation') og_timeouts[itask.identity] = itask.mode_settings.timeout @@ -399,8 +399,8 @@ async def test_settings_broadcast( itask.state.is_queued = False # Submit the first - the sim task will fail: - schd.task_job_mgr._nonlive_submit_task_jobs( - [itask], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [itask], 'simulation') assert itask.mode_settings.sim_task_fails is True # Let task finish. @@ -418,14 +418,14 @@ async def test_settings_broadcast( 'simulation': {'fail cycle points': ''} }]) # Submit again - result is different: - schd.task_job_mgr._nonlive_submit_task_jobs( - [itask], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [itask], 'simulation') assert itask.mode_settings.sim_task_fails is False # Assert Clearing the broadcast works schd.broadcast_mgr.clear_broadcast() - schd.task_job_mgr._nonlive_submit_task_jobs( - [itask], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [itask], 'simulation') assert itask.mode_settings.sim_task_fails is True # Assert that list of broadcasts doesn't change if we submit @@ -435,8 +435,8 @@ async def test_settings_broadcast( ['1066'], ['one'], [{ 'simulation': {'fail cycle points': 'higadfuhasgiurguj'} }]) - schd.task_job_mgr._nonlive_submit_task_jobs( - [itask], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [itask], 'simulation') assert ( 'Invalid ISO 8601 date representation: higadfuhasgiurguj' in log.messages[-1]) @@ -449,8 +449,8 @@ async def test_settings_broadcast( ['1066'], ['one'], [{ 'simulation': {'fail cycle points': '1'} }]) - schd.task_job_mgr._nonlive_submit_task_jobs( - [itask], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [itask], 'simulation') assert ( 'Invalid ISO 8601 date representation: 1' in log.messages[-1]) @@ -461,8 +461,8 @@ async def test_settings_broadcast( 'simulation': {'fail cycle points': '1945, 1977, 1066'}, 'execution retry delays': '3*PT2S' }]) - schd.task_job_mgr._nonlive_submit_task_jobs( - [itask], schd.workflow, 'simulation') + schd.task_job_mgr.submit_nonlive_task_jobs( + schd.workflow, [itask], 'simulation') assert itask.mode_settings.sim_task_fails is True assert itask.try_timers['execution-retry'].delays == [2.0, 2.0, 2.0] # n.b. rtconfig should remain unchanged, lest we cancel broadcasts: diff --git a/tests/integration/run_modes/test_skip.py b/tests/integration/run_modes/test_skip.py index bc9f29116f2..79ad573ba13 100644 --- a/tests/integration/run_modes/test_skip.py +++ b/tests/integration/run_modes/test_skip.py @@ -16,7 +16,6 @@ """Test for skip mode integration. """ - async def test_settings_override_from_broadcast( flow, scheduler, start, complete, log_filter ): @@ -78,15 +77,16 @@ async def test_broadcast_changes_set_skip_outputs( | The skip keyword should not be allowed in custom outputs. """ wid = flow({ - 'scheduling': {'graph': {'R1': 'foo:expect_this'}}, - 'runtime': {'foo': {'outputs': {'expect_this': 'some message'}}} + 'scheduling': {'graph': {'R1': 'foo:x?\nfoo:y?'}}, + 'runtime': {'foo': {'outputs': { + 'x': 'some message', 'y': 'another message'}}} }) schd = scheduler(wid, run_mode='live') async with start(schd): schd.broadcast_mgr.put_broadcast( ['1'], ['foo'], - [{'skip': {'outputs': 'expect_this'}}], + [{'skip': {'outputs': 'x'}}], ) foo, = schd.pool.get_tasks() schd.pool.set_prereqs_and_outputs( @@ -94,14 +94,18 @@ async def test_broadcast_changes_set_skip_outputs( foo_outputs = foo.state.outputs.get_completed_outputs() - assert 'expect_this' in foo_outputs - assert foo_outputs['expect_this'] == '(manually completed)' + assert foo_outputs == { + 'submitted': '(manually completed)', + 'started': '(manually completed)', + 'succeeded': '(manually completed)', + 'x': '(manually completed)'} async def test_skip_mode_outputs( flow, scheduler, reftest, ): - """Nearly a functional test of the output emission of skip mode tasks + """Skip mode can be configured by the `[runtime][][skip]` + section. Skip mode proposal point 2 https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md @@ -110,7 +114,7 @@ async def test_skip_mode_outputs( # By default, all required outputs will be generated # plus succeeded if success is optional: foo? & foo:required_out => success_if_optional & required_outs - + # The outputs submitted and started are always produced # and do not need to be defined in outputs: foo:submitted => submitted_always @@ -159,7 +163,7 @@ async def test_skip_mode_outputs( async def test_doesnt_release_held_tasks( - one_conf, flow, scheduler, start, log_filter + one_conf, flow, scheduler, run, log_filter, capture_live_submissions ): """Point 5 of the proposal https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md @@ -168,59 +172,30 @@ async def test_doesnt_release_held_tasks( | flag as normal. """ - schd = scheduler(flow(one_conf), run_mode='skip') - async with start(schd) as log: + one_conf['runtime'] = {'one': {'run mode': 'skip'}} + schd = scheduler(flow(one_conf), run_mode='live', paused_start=False) + async with run(schd) as log: itask = schd.pool.get_tasks()[0] msg = 'held tasks shoudn\'t {}' # Set task to held and check submission in skip mode doesn't happen: itask.state.is_held = True - schd.task_job_mgr.submit_task_jobs( - schd.workflow, - [itask], - schd.server.curve_auth, - schd.server.client_pub_key_dir, - run_mode=schd.get_run_mode() - ) + + # Relinquish contol to the main loop. + schd.release_queued_tasks() + assert not log_filter(log, contains='=> running'), msg.format('run') assert not log_filter(log, contains='=> succeeded'), msg.format( 'succeed') # Release held task and assert that it now skips successfully: schd.pool.release_held_tasks(['1/one']) - schd.task_job_mgr.submit_task_jobs( - schd.workflow, - [itask], - schd.server.curve_auth, - schd.server.client_pub_key_dir, - run_mode=schd.get_run_mode() - ) + schd.release_queued_tasks() + assert log_filter(log, contains='=> running'), msg.format('run') assert log_filter(log, contains='=> succeeded'), msg.format('succeed') -async def test_force_trigger_doesnt_change_mode( - flow, scheduler, run, complete -): - """Point 6 from the skip mode proposal - https://github.com/cylc/cylc-admin/blob/master/docs/proposal-skip-mode.md - - | Force-triggering a task will not override the run mode. - """ - wid = flow({ - 'scheduling': {'graph': {'R1': 'slow => skip'}}, - 'runtime': { - 'slow': {'script': 'sleep 6'}, - 'skip': {'script': 'exit 1', 'run mode': 'skip'} - } - }) - schd = scheduler(wid, run_mode='live', paused_start=False) - async with run(schd): - schd.pool.force_trigger_tasks(['1/skip'], [1]) - # This will timeout if the skip task has become live on triggering: - await complete(schd, '1/skip', timeout=6) - - async def test_prereqs_marked_satisfied_by_skip_mode( flow, scheduler, start, log_filter, complete ): @@ -232,10 +207,11 @@ async def test_prereqs_marked_satisfied_by_skip_mode( | rather than "satisfied naturally" for provenance reasons. """ schd = scheduler(flow({ - 'scheduling': {'graph': {'R1': 'foo => bar'}} - }), run_mode='skip') + 'scheduling': {'graph': {'R1': 'foo => bar'}}, + 'runtime': {'foo': {'run mode': 'skip'}} + }), run_mode='live') - async with start(schd) as log: + async with start(schd): foo, = schd.pool.get_tasks() schd.task_job_mgr.submit_task_jobs( schd.workflow, diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index 3da32733ffc..fef15e3e3dc 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -115,10 +115,6 @@ def __make_scheduler(id_: str, **opts: Any) -> Scheduler: schd.workflow_db_mgr.on_workflow_shutdown() -def caplogprinter(caplog): - _ = [print(i) for i in caplog.messages] - - @asynccontextmanager async def _start_flow( caplog: Optional[pytest.LogCaptureFixture], @@ -128,8 +124,6 @@ async def _start_flow( """Start a scheduler but don't set the main loop running.""" if caplog: caplog.set_level(level, CYLC_LOG) - # Debug functionality - caplog.print = lambda: caplogprinter(caplog) await schd.install() @@ -160,8 +154,6 @@ async def _run_flow( """Start a scheduler and set the main loop running.""" if caplog: caplog.set_level(level, CYLC_LOG) - # Debug functionality - caplog.print = lambda: caplogprinter(caplog) await schd.install() diff --git a/tests/unit/run_modes/test_nonlive.py b/tests/unit/run_modes/test_nonlive_units.py similarity index 100% rename from tests/unit/run_modes/test_nonlive.py rename to tests/unit/run_modes/test_nonlive_units.py diff --git a/tests/unit/run_modes/test_simulation.py b/tests/unit/run_modes/test_simulation_units.py similarity index 100% rename from tests/unit/run_modes/test_simulation.py rename to tests/unit/run_modes/test_simulation_units.py diff --git a/tests/unit/run_modes/test_skip.py b/tests/unit/run_modes/test_skip_units.py similarity index 100% rename from tests/unit/run_modes/test_skip.py rename to tests/unit/run_modes/test_skip_units.py diff --git a/tests/unit/test_platforms.py b/tests/unit/test_platforms.py index 3167afabf70..89e5d2f19f8 100644 --- a/tests/unit/test_platforms.py +++ b/tests/unit/test_platforms.py @@ -34,7 +34,6 @@ PlatformLookupError, GlobalConfigError ) -from cylc.flow.task_state import RunMode PLATFORMS = { diff --git a/tests/unit/test_task_state.py b/tests/unit/test_task_state.py index 1a2041fcba5..2854bd60c29 100644 --- a/tests/unit/test_task_state.py +++ b/tests/unit/test_task_state.py @@ -19,9 +19,9 @@ from cylc.flow.taskdef import TaskDef from cylc.flow.cycling.integer import IntegerSequence, IntegerPoint +from cylc.flow.run_modes import RunMode, disable_task_event_handlers from cylc.flow.task_trigger import Dependency, TaskTrigger from cylc.flow.task_state import ( - RunMode, TaskState, TASK_STATUS_PREPARING, TASK_STATUS_SUBMIT_FAILED, @@ -147,4 +147,4 @@ def test_disable_task_event_handlers(itask_run_mode, disable_handlers, expect): 'skip': {'disable task event handlers': disable_handlers}}) ) # Check method: - assert RunMode.disable_task_event_handlers(itask) is expect + assert disable_task_event_handlers(itask) is expect