From f3f0e83a8afc211166cbe1e2bca629beb65078d1 Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Tue, 7 Jan 2025 22:25:44 -0800 Subject: [PATCH 1/8] Scaffolding for workflow_manager object type --- .../defaults/base_workflow_manager_repos.yaml | 14 ++++++++ .../defaults/workflow_manager_repos.yaml | 14 ++++++++ lib/ramble/ramble/application.py | 23 +++++++++++++ lib/ramble/ramble/cmd/style.py | 7 ++++ lib/ramble/ramble/config.py | 4 +++ lib/ramble/ramble/namespace.py | 1 + lib/ramble/ramble/repository.py | 30 +++++++++++++++++ .../schema/base_workflow_manager_repos.py | 32 +++++++++++++++++++ .../ramble/schema/workflow_manager_repos.py | 32 +++++++++++++++++++ .../builtin/base_workflow_managers/.gitignore | 0 .../builtin/workflow_managers/.gitignore | 0 11 files changed, 157 insertions(+) create mode 100644 etc/ramble/defaults/base_workflow_manager_repos.yaml create mode 100644 etc/ramble/defaults/workflow_manager_repos.yaml create mode 100644 lib/ramble/ramble/schema/base_workflow_manager_repos.py create mode 100644 lib/ramble/ramble/schema/workflow_manager_repos.py create mode 100644 var/ramble/repos/builtin/base_workflow_managers/.gitignore create mode 100644 var/ramble/repos/builtin/workflow_managers/.gitignore diff --git a/etc/ramble/defaults/base_workflow_manager_repos.yaml b/etc/ramble/defaults/base_workflow_manager_repos.yaml new file mode 100644 index 000000000..38a87ffa7 --- /dev/null +++ b/etc/ramble/defaults/base_workflow_manager_repos.yaml @@ -0,0 +1,14 @@ +# ------------------------------------------------------------------------- +# This is the default ramble repository configuration. It includes the +# builtin ramble base workflow manager repository. +# +# Users can override these settings by editing the following files. +# +# Per-ramble-instance settings (overrides defaults): +# $RAMBLE_ROOT/etc/ramble/base_workflow_manager_repos.yaml +# +# Per-user settings (overrides default and site settings): +# ~/.ramble/base_workflow_manager_repos.yaml +# ------------------------------------------------------------------------- +base_workflow_manager_repos: + - $ramble/var/ramble/repos/builtin diff --git a/etc/ramble/defaults/workflow_manager_repos.yaml b/etc/ramble/defaults/workflow_manager_repos.yaml new file mode 100644 index 000000000..820030257 --- /dev/null +++ b/etc/ramble/defaults/workflow_manager_repos.yaml @@ -0,0 +1,14 @@ +# ------------------------------------------------------------------------- +# This is the default ramble repository configuration. It includes the +# builtin ramble application repository. +# +# Users can override these settings by editing the following files. +# +# Per-ramble-instance settings (overrides defaults): +# $RAMBLE_ROOT/etc/ramble/repos.yaml +# +# Per-user settings (overrides default and site settings): +# ~/.ramble/repos.yaml +# ------------------------------------------------------------------------- +workflow_manager_repos: + - $ramble/var/ramble/repos/builtin diff --git a/lib/ramble/ramble/application.py b/lib/ramble/ramble/application.py index 57c59e6e4..3c39383cd 100644 --- a/lib/ramble/ramble/application.py +++ b/lib/ramble/ramble/application.py @@ -181,6 +181,8 @@ def __init__(self, file_path): self.license_path = "" self.license_file = "" + self.workflow_manager = None + ramble.util.directives.define_directive_methods(self) def experiment_lock(self): @@ -236,7 +238,10 @@ def set_variants(self, variants): experiment. """ self.variants = variants.copy() + self._set_package_manager() + self._set_workflow_manager() + def _set_package_manager(self): if namespace.package_manager in self.variants: pkgman_name = self.expander.expand_var( self.variants[namespace.package_manager], typed=True @@ -277,6 +282,24 @@ def set_variants(self, variants): } ) + def _set_workflow_manager(self): + if namespace.workflow_manager in self.variants: + workflow_name = self.expander.expand_var( + self.variants[namespace.workflow_manager], typed=True + ) + + if workflow_name is not None: + try: + wfman_type = ramble.repository.ObjectTypes.workflow_managers + self.workflow_manager = ramble.repository.get(workflow_name, wfman_type).copy() + self.workflow_manager.set_application(self) + except ramble.repository.UnknownObjectError: + logger.die( + f"{workflow_name} is not a valid workflow manager. " + "Valid workflow managers can be listed via:\n" + "\tramble list --type workflow_managers" + ) + def build_phase_order(self): if self._pipeline_graphs is not None: return diff --git a/lib/ramble/ramble/cmd/style.py b/lib/ramble/ramble/cmd/style.py index 4c5ca394c..925676624 100644 --- a/lib/ramble/ramble/cmd/style.py +++ b/lib/ramble/ramble/cmd/style.py @@ -94,6 +94,13 @@ def is_object(f): "F403": [r"^from ramble.pkgmankit import \*$"], **common_object_exemptions, }, + # exemptions applied only to workflow_manager.py files. + r"workflow_manager.py$": { + # Allow 'from ramble.modkit import *' in workflow_managers, + # but no other wildcards + "F403": [r"^from ramble.wmkit import \*$"], + **common_object_exemptions, + }, # exemptions applied to all files. r".py$": { "E501": [ diff --git a/lib/ramble/ramble/config.py b/lib/ramble/ramble/config.py index b27b8354a..d22738931 100644 --- a/lib/ramble/ramble/config.py +++ b/lib/ramble/ramble/config.py @@ -69,9 +69,11 @@ import ramble.schema.repos import ramble.schema.modifier_repos import ramble.schema.package_manager_repos +import ramble.schema.workflow_manager_repos import ramble.schema.base_application_repos import ramble.schema.base_modifier_repos import ramble.schema.base_package_manager_repos +import ramble.schema.base_workflow_manager_repos from ramble.error import RambleError from ramble.util.logger import logger @@ -99,9 +101,11 @@ "repos": ramble.schema.repos.schema, "modifier_repos": ramble.schema.modifier_repos.schema, "package_manager_repos": ramble.schema.package_manager_repos.schema, + "workflow_manager_repos": ramble.schema.workflow_manager_repos.schema, "base_application_repos": ramble.schema.base_application_repos.schema, "base_modifier_repos": ramble.schema.base_modifier_repos.schema, "base_package_manager_repos": ramble.schema.base_package_manager_repos.schema, + "base_workflow_manager_repos": ramble.schema.base_workflow_manager_repos.schema, } # Same as above, but including keys for workspaces diff --git a/lib/ramble/ramble/namespace.py b/lib/ramble/ramble/namespace.py index 6d0771b13..156ac06a9 100644 --- a/lib/ramble/ramble/namespace.py +++ b/lib/ramble/ramble/namespace.py @@ -62,5 +62,6 @@ class namespace: # For variants package_manager = "package_manager" + workflow_manager = "workflow_manager" metadata = "metadata" diff --git a/lib/ramble/ramble/repository.py b/lib/ramble/ramble/repository.py index b3d1f80c7..e270c7fa3 100644 --- a/lib/ramble/ramble/repository.py +++ b/lib/ramble/ramble/repository.py @@ -62,9 +62,11 @@ "applications", "modifiers", "package_managers", + "workflow_managers", "base_applications", "base_modifiers", "base_package_managers", + "base_workflow_managers", ], ) @@ -99,6 +101,14 @@ "accepted_configs": ["package_manager_repo.yaml", unified_config], "singular": "package manager", }, + ObjectTypes.workflow_managers: { + "file_name": "workflow_manager.py", + "dir_name": "workflow_managers", + "abbrev": "wm", + "config_section": "workflow_manager_repos", + "accepted_configs": ["workflow_manager_repo.yaml", unified_config], + "singular": "workflow manager", + }, ObjectTypes.base_applications: { "file_name": "base_application.py", "dir_name": "base_applications", @@ -123,6 +133,14 @@ "accepted_configs": ["base_package_manager_repo.yaml", unified_config], "singular": "base package manager", }, + ObjectTypes.base_workflow_managers: { + "file_name": "base_workflow_manager.py", + "dir_name": "base_workflow_managers", + "abbrev": "base_wm", + "config_section": "base_workflow_manager_repos", + "accepted_configs": ["base_workflow_manager_repo.yaml", unified_config], + "singular": "base workflow manager", + }, } @@ -141,6 +159,11 @@ def _package_managers(repo_dirs=None): return _gen_path(repo_dirs=repo_dirs, obj_type=ObjectTypes.package_managers) +def _workflow_managers(repo_dirs=None): + """Get the workflow managers singleton RepoPath instance for Ramble.""" + return _gen_path(repo_dirs=repo_dirs, obj_type=ObjectTypes.workflow_managers) + + def _base_apps(repo_dirs=None): """Get the base applications singleton RepoPath instance for Ramble.""" return _gen_path(repo_dirs=repo_dirs, obj_type=ObjectTypes.base_applications) @@ -156,13 +179,20 @@ def _base_package_managers(repo_dirs=None): return _gen_path(repo_dirs=repo_dirs, obj_type=ObjectTypes.base_package_managers) +def _base_workflow_managers(repo_dirs=None): + """Get the base workflow managers singleton RepoPath instance for Ramble.""" + return _gen_path(repo_dirs=repo_dirs, obj_type=ObjectTypes.base_workflow_managers) + + paths = { ObjectTypes.applications: llnl.util.lang.Singleton(_apps), ObjectTypes.modifiers: llnl.util.lang.Singleton(_mods), ObjectTypes.package_managers: llnl.util.lang.Singleton(_package_managers), + ObjectTypes.workflow_managers: llnl.util.lang.Singleton(_workflow_managers), ObjectTypes.base_applications: llnl.util.lang.Singleton(_base_apps), ObjectTypes.base_modifiers: llnl.util.lang.Singleton(_base_mods), ObjectTypes.base_package_managers: llnl.util.lang.Singleton(_base_package_managers), + ObjectTypes.base_workflow_managers: llnl.util.lang.Singleton(_base_workflow_managers), } ##################################### diff --git a/lib/ramble/ramble/schema/base_workflow_manager_repos.py b/lib/ramble/ramble/schema/base_workflow_manager_repos.py new file mode 100644 index 000000000..5b8ee733a --- /dev/null +++ b/lib/ramble/ramble/schema/base_workflow_manager_repos.py @@ -0,0 +1,32 @@ +# Copyright 2022-2025 The Ramble Authors +# +# Licensed under the Apache License, Version 2.0 or the MIT license +# , at your +# option. This file may not be copied, modified, or distributed +# except according to those terms. + +"""Schema for base_workflow_manager_repos.yaml configuration file. +.. literalinclude:: _ramble_root/lib/ramble/ramble/schema/base_workflow_manager_repos.py + :lines: 13- +""" + + +#: Properties for inclusion in other schemas +properties = { + "base_workflow_manager_repos": { + "type": "array", + "default": [], + "items": {"type": "string"}, + }, +} + + +#: Full schema with metadata +schema = { + "$schema": "http://json-schema.org/schema#", + "title": "Ramble base workflow manager repository configuration file schema", + "type": "object", + "additionalProperties": False, + "properties": properties, +} diff --git a/lib/ramble/ramble/schema/workflow_manager_repos.py b/lib/ramble/ramble/schema/workflow_manager_repos.py new file mode 100644 index 000000000..88c7e075d --- /dev/null +++ b/lib/ramble/ramble/schema/workflow_manager_repos.py @@ -0,0 +1,32 @@ +# Copyright 2022-2025 The Ramble Authors +# +# Licensed under the Apache License, Version 2.0 or the MIT license +# , at your +# option. This file may not be copied, modified, or distributed +# except according to those terms. + +"""Schema for workflow_manager_repos.yaml configuration file. +.. literalinclude:: _ramble_root/lib/ramble/ramble/schema/workflow_manager_repos.py + :lines: 13- +""" + + +#: Properties for inclusion in other schemas +properties = { + "workflow_manager_repos": { + "type": "array", + "default": [], + "items": {"type": "string"}, + }, +} + + +#: Full schema with metadata +schema = { + "$schema": "http://json-schema.org/schema#", + "title": "Ramble workflow manager repository configuration file schema", + "type": "object", + "additionalProperties": False, + "properties": properties, +} diff --git a/var/ramble/repos/builtin/base_workflow_managers/.gitignore b/var/ramble/repos/builtin/base_workflow_managers/.gitignore new file mode 100644 index 000000000..e69de29bb diff --git a/var/ramble/repos/builtin/workflow_managers/.gitignore b/var/ramble/repos/builtin/workflow_managers/.gitignore new file mode 100644 index 000000000..e69de29bb From 364c5c39146264638bf178e848c9ced5005cf0ff Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Tue, 7 Jan 2025 23:08:30 -0800 Subject: [PATCH 2/8] Implement slurm workflow manager --- lib/ramble/ramble/application.py | 14 ++ lib/ramble/ramble/cmd/common/info.py | 2 + lib/ramble/ramble/language/language_base.py | 2 +- .../language/workflow_manager_language.py | 47 +++++ lib/ramble/ramble/wmkit.py | 23 +++ lib/ramble/ramble/workflow_manager.py | 80 ++++++++ .../builtin/workflow_managers/.gitignore | 0 .../workflow_managers/slurm/batch_cancel.tpl | 10 + .../workflow_managers/slurm/batch_query.tpl | 27 +++ .../workflow_managers/slurm/batch_submit.tpl | 2 + .../slurm/slurm_execute_experiment.tpl | 8 + .../slurm/workflow_manager.py | 175 ++++++++++++++++++ 12 files changed, 389 insertions(+), 1 deletion(-) create mode 100644 lib/ramble/ramble/language/workflow_manager_language.py create mode 100644 lib/ramble/ramble/wmkit.py create mode 100644 lib/ramble/ramble/workflow_manager.py delete mode 100644 var/ramble/repos/builtin/workflow_managers/.gitignore create mode 100644 var/ramble/repos/builtin/workflow_managers/slurm/batch_cancel.tpl create mode 100644 var/ramble/repos/builtin/workflow_managers/slurm/batch_query.tpl create mode 100644 var/ramble/repos/builtin/workflow_managers/slurm/batch_submit.tpl create mode 100644 var/ramble/repos/builtin/workflow_managers/slurm/slurm_execute_experiment.tpl create mode 100644 var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py diff --git a/lib/ramble/ramble/application.py b/lib/ramble/ramble/application.py index 3c39383cd..f2c4fb2e2 100644 --- a/lib/ramble/ramble/application.py +++ b/lib/ramble/ramble/application.py @@ -502,6 +502,10 @@ def build_used_variables(self, workspace): for var in self.package_manager.package_manager_variables.values(): self.variables[var.name] = var.default + if self.workflow_manager is not None: + for var in self.workflow_manager.wm_vars.values(): + self.variables[var.name] = var.default + ########################################## # Expand used variables to track all usage ########################################## @@ -965,6 +969,9 @@ def _set_default_experiment_variables(self): for mod_inst in self._modifier_instances: var_sets.append(mod_inst.mode_variables()) + if self.workflow_manager is not None: + var_sets.append(self.workflow_manager.wm_vars) + for var_set in var_sets: for var, val in var_set.items(): if var not in self.variables.keys(): @@ -2318,6 +2325,13 @@ def _get_template_config( tpl_config, obj_type=ramble.repository.ObjectTypes.package_managers, ) + if self.workflow_manager is not None: + for tpl_config in self.workflow_manager.templates.values(): + yield _get_template_config( + self.workflow_manager, + tpl_config, + obj_type=ramble.repository.ObjectTypes.workflow_managers, + ) def _render_object_templates(self, extra_vars): run_dir = self.expander.experiment_run_dir diff --git a/lib/ramble/ramble/cmd/common/info.py b/lib/ramble/ramble/cmd/common/info.py index e494a4d93..fa2677e14 100644 --- a/lib/ramble/ramble/cmd/common/info.py +++ b/lib/ramble/ramble/cmd/common/info.py @@ -55,6 +55,8 @@ "package_manager_requirements": None, # Package manager specific: "package_manager_variables": None, + # Workflow manager specific: + "workflow_manager_variables": "wm_vars", } diff --git a/lib/ramble/ramble/language/language_base.py b/lib/ramble/ramble/language/language_base.py index 7b2bfe10d..001e80a41 100644 --- a/lib/ramble/ramble/language/language_base.py +++ b/lib/ramble/ramble/language/language_base.py @@ -27,7 +27,7 @@ #: them reserved_names = [] -namespaces = ["ramble.app", "ramble.mod", "ramble.pkg_man", "ramble.package_manager"] +namespaces = ["ramble.app", "ramble.mod", "ramble.pkg_man", "ramble.package_manager", "ramble.wm"] class DirectiveMeta(type): diff --git a/lib/ramble/ramble/language/workflow_manager_language.py b/lib/ramble/ramble/language/workflow_manager_language.py new file mode 100644 index 000000000..fa16fb8a6 --- /dev/null +++ b/lib/ramble/ramble/language/workflow_manager_language.py @@ -0,0 +1,47 @@ +# Copyright 2022-2025 The Ramble Authors +# +# Licensed under the Apache License, Version 2.0 or the MIT license +# , at your +# option. This file may not be copied, modified, or distributed +# except according to those terms. + +from typing import Optional + +import ramble.language.shared_language + + +class WorkflowManagerMeta(ramble.language.shared_language.SharedMeta): + _directive_names = set() + _directives_to_be_executed = [] + + +workflow_manager_directive = WorkflowManagerMeta.directive + + +@workflow_manager_directive("wm_vars") +def workflow_manager_variable( + name: str, + default, + description: str, + values: Optional[list] = None, +): + """Define a variable for this wm + Args: + name: Name of variable + default: Default value if the variable is not defined + description: Description of the variable + values: Optional list of suggested values for this variable + """ + + def _define_wm_variable(wm): + import ramble.workload + + wm.wm_vars[name] = ramble.workload.WorkloadVariable( + name, + default=default, + description=description, + values=values, + ) + + return _define_wm_variable diff --git a/lib/ramble/ramble/wmkit.py b/lib/ramble/ramble/wmkit.py new file mode 100644 index 000000000..109b931a5 --- /dev/null +++ b/lib/ramble/ramble/wmkit.py @@ -0,0 +1,23 @@ +# Copyright 2022-2025 The Ramble Authors +# +# Licensed under the Apache License, Version 2.0 or the MIT license +# , at your +# option. This file may not be copied, modified, or distributed +# except according to those terms. + +# flake8: noqa: F401 +"""wmkit is a set of useful modules to import when writing workflow managers +""" + +from ramble.language.workflow_manager_language import * +from ramble.language.shared_language import * + +from ramble.workflow_manager import WorkflowManagerBase + +from ramble.util.command_runner import ( + CommandRunner, + RunnerError, +) + +from ramble.util.logger import logger diff --git a/lib/ramble/ramble/workflow_manager.py b/lib/ramble/ramble/workflow_manager.py new file mode 100644 index 000000000..8de70c590 --- /dev/null +++ b/lib/ramble/ramble/workflow_manager.py @@ -0,0 +1,80 @@ +# Copyright 2022-2025 The Ramble Authors +# +# Licensed under the Apache License, Version 2.0 or the MIT license +# , at your +# option. This file may not be copied, modified, or distributed +# except according to those terms. +"""Define base classes for workflow manager definitions""" + +from typing import List + +from ramble.language.workflow_manager_language import WorkflowManagerMeta +from ramble.language.shared_language import SharedMeta +from ramble.util.naming import NS_SEPARATOR +import ramble.util.class_attributes +import ramble.util.directives +from ramble.expander import ExpanderError + + +class WorkflowManagerBase(metaclass=WorkflowManagerMeta): + name = None + _builtin_name = NS_SEPARATOR.join(("workflow_manager_builtin", "{obj_name}", "{name}")) + _language_classes = [WorkflowManagerMeta, SharedMeta] + _pipelines = [ + "analyze", + "setup", + "execute", + ] + maintainers: List[str] = [] + tags: List[str] = [] + + def __init__(self, file_path): + super().__init__() + + ramble.util.class_attributes.convert_class_attributes(self) + + self._file_path = file_path + + ramble.util.directives.define_directive_methods(self) + + self.app_inst = None + self.runner = None + + def set_application(self, app_inst): + """Set a reference to the associated app_inst""" + self.app_inst = app_inst + + def get_status(self, workspace): + """Return status of a given job""" + return None + + def conditional_expand(self, templates): + """Return a (potentially empty) list of expanded strings + + Args: + templates: A list of templates to expand. + If the template cannot be fully expanded, it's skipped. + Returns: + A list of expanded strings + """ + expander = self.app_inst.expander + expanded = [] + for tpl in templates: + try: + rendered = expander.expand_var(tpl, allow_passthrough=False) + if rendered: + expanded.append(rendered) + except ExpanderError: + # Skip a particular entry if any of the vars are not defined + continue + return expanded + + def copy(self): + """Deep copy a workflow manager instance""" + new_copy = type(self)(self._file_path) + + return new_copy + + def __str__(self): + return self.name diff --git a/var/ramble/repos/builtin/workflow_managers/.gitignore b/var/ramble/repos/builtin/workflow_managers/.gitignore deleted file mode 100644 index e69de29bb..000000000 diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/batch_cancel.tpl b/var/ramble/repos/builtin/workflow_managers/slurm/batch_cancel.tpl new file mode 100644 index 000000000..862707d4c --- /dev/null +++ b/var/ramble/repos/builtin/workflow_managers/slurm/batch_cancel.tpl @@ -0,0 +1,10 @@ +#!/bin/bash + +# Ensure job_id is present +job_id=$(< {experiment_run_dir}/.slurm_job) +if [ -z "${job_id:-}" ]; then + echo "No valid job_id found" 1>&2 + exit 1 +fi + +scancel ${job_id} diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/batch_query.tpl b/var/ramble/repos/builtin/workflow_managers/slurm/batch_query.tpl new file mode 100644 index 000000000..ff462102b --- /dev/null +++ b/var/ramble/repos/builtin/workflow_managers/slurm/batch_query.tpl @@ -0,0 +1,27 @@ +#!/bin/bash + +# Ensure job_id is present +job_id=$(< {experiment_run_dir}/.slurm_job) +if [ -z "${job_id:-}" ]; then + echo "No valid job_id found" 1>&2 + exit 1 +fi + +status=$(squeue -h -o "%t" -j "${job_id}" 2>/dev/null) +if [ -z "$status" ]; then + status=$(sacct -j "${job_id}" -o state -X -n | xargs) +fi +if [ ! -z "$status" ]; then + # Define a mapping between sacct/squeue status to ramble counterpart + declare -A status_map + status_map["PD"]="SETUP" + status_map["R"]="RUNNING" + status_map["CF"]="SETUP" + status_map["CG"]="COMPLETE" + status_map["COMPLETED"]="COMPLETE" + status_map["CANCELLED+"]="CANCELLED" + if [ -v status_map["$status"] ]; then + status=${status_map["$status"]} + fi +fi +echo "job {job_name} with id ${job_id} has status: $status" diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/batch_submit.tpl b/var/ramble/repos/builtin/workflow_managers/slurm/batch_submit.tpl new file mode 100644 index 000000000..e42c1493a --- /dev/null +++ b/var/ramble/repos/builtin/workflow_managers/slurm/batch_submit.tpl @@ -0,0 +1,2 @@ +#!/bin/bash +sbatch {slurm_execute_experiment} | tee >(awk '{print $NF}' > {experiment_run_dir}/.slurm_job) diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/slurm_execute_experiment.tpl b/var/ramble/repos/builtin/workflow_managers/slurm/slurm_execute_experiment.tpl new file mode 100644 index 000000000..2b3507a3d --- /dev/null +++ b/var/ramble/repos/builtin/workflow_managers/slurm/slurm_execute_experiment.tpl @@ -0,0 +1,8 @@ +#!/bin/bash +{sbatch_headers_str} + +cd {experiment_run_dir} + +scontrol show hostnames > {experiment_run_dir}/hostfile + +{command} diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py new file mode 100644 index 000000000..75cdc4c12 --- /dev/null +++ b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py @@ -0,0 +1,175 @@ +# Copyright 2022-2025 The Ramble Authors +# +# Licensed under the Apache License, Version 2.0 or the MIT license +# , at your +# option. This file may not be copied, modified, or distributed +# except according to those terms. + +import os + +from ramble.wmkit import * +from ramble.expander import ExpanderError +from ramble.application import experiment_status + +from spack.util.executable import ProcessError + +# Mapping from squeue/sacct status to Ramble status +_STATUS_MAP = { + "PD": "SETUP", + "R": "RUNNING", + "CF": "SETUP", + "CG": "COMPLETE", + "COMPLETED": "COMPLETE", + "CANCELLED": "CANCELLED", + "CANCELLED+": "CANCELLED", +} + +_ensure_job_id_snippet = r""" +job_id=$(< {experiment_run_dir}/.slurm_job) +if [ -z "${job_id:-}" ]; then + echo "No valid job_id found" 1>&2 + exit 1 +fi +""" + + +class Slurm(WorkflowManagerBase): + """Slurm workflow manager""" + + name = "slurm" + + maintainers("linsword13") + + tags("workflow", "slurm") + + def __init__(self, file_path): + super().__init__(file_path) + + self.runner = SlurmRunner() + + workflow_manager_variable( + name="job_name", + default="{application_name}_{workload_name}_{experiment_name}", + description="Slurm job name", + ) + + workflow_manager_variable( + name="extra_sbatch_headers", + default="", + description="Extra sbatch headers added to slurm job script", + ) + + workflow_manager_variable( + name="hostlist", + default="$SLURM_JOB_NODELIST", + description="hostlist variable used by various modifiers", + ) + + register_template( + name="batch_submit", + src_name="batch_submit.tpl", + dest_name="batch_submit", + ) + + register_template( + name="batch_query", src_name="batch_query.tpl", dest_name="batch_query" + ) + + register_template( + name="batch_cancel", + src_name="batch_cancel.tpl", + dest_name="batch_cancel", + ) + + register_template( + name="slurm_execute_experiment", + src_name="slurm_execute_experiment.tpl", + dest_name="slurm_execute_experiment", + extra_vars_func="execute_vars", + ) + + def _execute_vars(self): + expander = self.app_inst.expander + # Adding pre-defined and custom headers + pragmas = [ + ("#SBATCH -N {n_nodes}"), + ("#SBATCH -p {partition}"), + ("#SBATCH --ntasks-per-node {processes_per_node}"), + ("#SBATCH -J {job_name}"), + ("#SBATCH -o {experiment_run_dir}/slurm-%j.out"), + ("#SBATCH -e {experiment_run_dir}/slurm-%j.err"), + ("#SBATCH --gpus-per-node {gpus_per_node}"), + ] + try: + extra_sbatch_headers_raw = expander.expand_var_name( + "extra_sbatch_headers", allow_passthrough=False + ) + extra_sbatch_headers = extra_sbatch_headers_raw.strip().split("\n") + pragmas = pragmas + extra_sbatch_headers + except ExpanderError: + pass + header_str = "\n".join(self.conditional_expand(pragmas)) + return {"sbatch_headers_str": header_str} + + def get_status(self, workspace): + expander = self.app_inst.expander + run_dir = expander.expand_var_name("experiment_run_dir") + job_id_file = os.path.join(run_dir, ".slurm_job") + status = experiment_status.UNKNOWN + if not os.path.isfile(job_id_file): + logger.warn("job_id file is missing") + return status + with open(job_id_file) as f: + job_id = f.read().strip() + self.runner.set_dry_run(workspace.dry_run) + wm_status_raw = self.runner.get_status(job_id) + wm_status = _STATUS_MAP.get(wm_status_raw) + if wm_status is not None and hasattr(experiment_status, wm_status): + status = getattr(experiment_status, wm_status) + return status + + +class SlurmRunner: + """Runner for executing slurm commands""" + + def __init__(self, dry_run=False): + self.dry_run = dry_run + self.squeue_runner = None + self.sacct_runner = None + self.run_dir = None + + def _ensure_runner(self, runner_name: str): + attr = f"{runner_name}_runner" + if getattr(self, attr) is None: + setattr( + self, + attr, + CommandRunner(name=runner_name, command=runner_name), + ) + + def set_dry_run(self, dry_run=False): + """ + Set the dry_run state of this runner + """ + self.dry_run = dry_run + + def get_status(self, job_id): + if self.dry_run: + return None + self._ensure_runner("squeue") + squeue_args = ["-h", "-o", "%t", "-j", job_id] + try: + status_out = self.squeue_runner.command( + *squeue_args, output=str, error=os.devnull + ) + except ProcessError as e: + status_out = "" + logger.debug( + f"squeue returns error {e}. This is normal if the job has already been completed." + ) + if not status_out: + self._ensure_runner("sacct") + sacct_args = ["-o", "state", "-X", "-n", "-j", job_id] + status_out = self.sacct_runner.command(*sacct_args, output=str) + return status_out.strip() From 168f00a53b9c0cbc9d46916ce56b16a02f86722e Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Wed, 8 Jan 2025 23:06:17 -0800 Subject: [PATCH 3/8] Include in workflow status in workspace analyze --- lib/ramble/ramble/application.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/ramble/ramble/application.py b/lib/ramble/ramble/application.py index f2c4fb2e2..edef93aac 100644 --- a/lib/ramble/ramble/application.py +++ b/lib/ramble/ramble/application.py @@ -65,7 +65,8 @@ from enum import Enum experiment_status = Enum( - "experiment_status", ["UNKNOWN", "SETUP", "RUNNING", "COMPLETE", "SUCCESS", "FAILED"] + "experiment_status", + ["UNKNOWN", "SETUP", "RUNNING", "COMPLETE", "SUCCESS", "FAILED", "CANCELLED"], ) _NULL_CONTEXT = "null" @@ -1745,10 +1746,13 @@ def format_context(context_match, context_format): success = True success = success and criteria_list.passed() - if success: - self.set_status(status=experiment_status.SUCCESS) - else: - self.set_status(status=experiment_status.FAILED) + status = experiment_status.SUCCESS if success else experiment_status.FAILED + # When workflow_manager is present, only use app_status when workflow is completed. + if self.workflow_manager is not None: + wm_status = self.workflow_manager.get_status(workspace) + if not wm_status == experiment_status.COMPLETE: + status = wm_status + self.set_status(status) self._init_result() From 3ba58bbb41c98cfd0f6fc3aee74b4bf4a33b7d44 Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Wed, 8 Jan 2025 23:07:17 -0800 Subject: [PATCH 4/8] Add tests --- lib/ramble/ramble/test/conftest.py | 60 +++++++++--- .../config/base_workflow_manager_repos.yaml | 2 + .../data/config/workflow_manager_repos.yaml | 2 + .../slurm_workflow_manager.py | 91 +++++++++++++++++++ 4 files changed, 142 insertions(+), 13 deletions(-) create mode 100644 lib/ramble/ramble/test/data/config/base_workflow_manager_repos.yaml create mode 100644 lib/ramble/ramble/test/data/config/workflow_manager_repos.yaml create mode 100644 lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py diff --git a/lib/ramble/ramble/test/conftest.py b/lib/ramble/ramble/test/conftest.py index b5e686390..a9dd8a3fb 100644 --- a/lib/ramble/ramble/test/conftest.py +++ b/lib/ramble/ramble/test/conftest.py @@ -148,6 +148,12 @@ def mock_pkg_mans_repo_path(): yield ramble.repository.Repo(ramble.paths.mock_builtin_path, obj_type) +@pytest.fixture(scope="function") +def mock_wms_repo_path(): + obj_type = ramble.repository.ObjectTypes.workflow_managers + yield ramble.repository.Repo(ramble.paths.mock_builtin_path, obj_type) + + @pytest.fixture(scope="function") def mutable_apps_repo_path(): obj_type = ramble.repository.ObjectTypes.applications @@ -166,6 +172,12 @@ def mutable_pkg_mans_repo_path(): yield ramble.repository.Repo(ramble.paths.builtin_path, obj_type) +@pytest.fixture(scope="function") +def mutable_wms_repo_path(): + obj_type = ramble.repository.ObjectTypes.workflow_managers + yield ramble.repository.Repo(ramble.paths.builtin_path, obj_type) + + @pytest.fixture(scope="function") def mock_applications(mock_apps_repo_path): """Use the 'builtin.mock' repository for applications instead of 'builtin'""" @@ -187,18 +199,25 @@ def mock_modifiers(mock_mods_repo_path): @pytest.fixture(scope="function") -def mock_package_managers(mock_mods_repo_path): +def mock_package_managers(mock_pkg_mans_repo_path): """Use the 'builtin.mock' repository for package managers of 'builtin'""" obj_type = ramble.repository.ObjectTypes.package_managers with ramble.repository.use_repositories( - mock_mods_repo_path, object_type=obj_type - ) as mock_mods_repo: - yield mock_mods_repo + mock_pkg_mans_repo_path, object_type=obj_type + ) as mock_pkg_mans_repo: + yield mock_pkg_mans_repo + + +@pytest.fixture(scope="function") +def mock_workflow_managers(mock_wms_repo_path): + """Use the 'builtin.mock' repository for package managers of 'builtin'""" + obj_type = ramble.repository.ObjectTypes.workflow_managers + with ramble.repository.use_repositories(mock_wms_repo_path, object_type=obj_type) as mock_repo: + yield mock_repo @pytest.fixture(scope="function") def mutable_applications(mutable_apps_repo_path): - """Use the 'builtin.mock' repository for applications instead of 'builtin'""" obj_type = ramble.repository.ObjectTypes.applications with ramble.repository.use_repositories( mutable_apps_repo_path, object_type=obj_type @@ -208,7 +227,6 @@ def mutable_applications(mutable_apps_repo_path): @pytest.fixture(scope="function") def mutable_modifiers(mutable_mods_repo_path): - """Use the 'builtin.mock' repository for modifiers instead of 'builtin'""" obj_type = ramble.repository.ObjectTypes.modifiers with ramble.repository.use_repositories( mutable_mods_repo_path, object_type=obj_type @@ -216,14 +234,21 @@ def mutable_modifiers(mutable_mods_repo_path): yield mods_repo -@pytest.fixture(scope="function") -def mutable_package_managers(mutable_mods_repo_path): - """Use the 'builtin.mock' repository for package_mangers instead of 'builtin'""" +def mutable_package_managers(mutable_pkg_mans_repo_path): obj_type = ramble.repository.ObjectTypes.package_managers with ramble.repository.use_repositories( - mutable_mods_repo_path, object_type=obj_type - ) as mods_repo: - yield mods_repo + mutable_pkg_mans_repo_path, object_type=obj_type + ) as pkg_mans_repo: + yield pkg_mans_repo + + +@pytest.fixture(scope="function") +def mutable_workflow_managers(mutable_wms_repo_path): + obj_type = ramble.repository.ObjectTypes.workflow_managers + with ramble.repository.use_repositories( + mutable_wms_repo_path, object_type=obj_type + ) as wms_repo: + yield wms_repo @pytest.fixture(scope="function") @@ -245,7 +270,7 @@ def mutable_mock_mods_repo(mock_mods_repo_path): @pytest.fixture(scope="function") -def mutable_mock_pkg_mans_repo(mock_mods_repo_path): +def mutable_mock_pkg_mans_repo(mock_pkg_mans_repo_path): """Function-scoped mock package managers, for tests that need to modify them.""" obj_type = ramble.repository.ObjectTypes.package_managers mock_repo = ramble.repository.Repo(ramble.paths.mock_builtin_path, object_type=obj_type) @@ -253,6 +278,15 @@ def mutable_mock_pkg_mans_repo(mock_mods_repo_path): yield mock_repo_path +@pytest.fixture(scope="function") +def mutable_mock_wms_repo(mock_wms_repo_path): + """Function-scoped mock package managers, for tests that need to modify them.""" + obj_type = ramble.repository.ObjectTypes.workflow_managers + mock_repo = ramble.repository.Repo(ramble.paths.mock_builtin_path, object_type=obj_type) + with ramble.repository.use_repositories(mock_repo, object_type=obj_type) as mock_repo_path: + yield mock_repo_path + + @pytest.fixture(scope="function") def default_config(): """Isolates the default configuration from the user configs. diff --git a/lib/ramble/ramble/test/data/config/base_workflow_manager_repos.yaml b/lib/ramble/ramble/test/data/config/base_workflow_manager_repos.yaml new file mode 100644 index 000000000..70fd089b5 --- /dev/null +++ b/lib/ramble/ramble/test/data/config/base_workflow_manager_repos.yaml @@ -0,0 +1,2 @@ +base_workflow_manager_repos: + - $ramble/var/ramble/repos/builtin diff --git a/lib/ramble/ramble/test/data/config/workflow_manager_repos.yaml b/lib/ramble/ramble/test/data/config/workflow_manager_repos.yaml new file mode 100644 index 000000000..dd5a5ed44 --- /dev/null +++ b/lib/ramble/ramble/test/data/config/workflow_manager_repos.yaml @@ -0,0 +1,2 @@ +workflow_manager_repos: + - $ramble/var/ramble/repos/builtin diff --git a/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py b/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py new file mode 100644 index 000000000..a2aae7605 --- /dev/null +++ b/lib/ramble/ramble/test/workflow_manager_functionality/slurm_workflow_manager.py @@ -0,0 +1,91 @@ +# Copyright 2022-2025 The Ramble Authors +# +# Licensed under the Apache License, Version 2.0 or the MIT license +# , at your +# option. This file may not be copied, modified, or distributed +# except according to those terms. + +import os + +import pytest + +import ramble.workspace +from ramble.main import RambleCommand + +workspace = RambleCommand("workspace") + +pytestmark = pytest.mark.usefixtures( + "mutable_config", + "mutable_mock_workspace_path", +) + + +def test_slurm_workflow(): + workspace_name = "test_slurm_workflow" + + test_config = """ +ramble: + variants: + workflow_manager: '{wm_name}' + variables: + # This batch_submit is overridden with slurm workflow manager + batch_submit: echo {wm_name} + mpi_command: mpirun -n {n_ranks} -hostfile hostfile + processes_per_node: 1 + wm_name: ['None', 'slurm'] + applications: + hostname: + workloads: + local: + experiments: + test_{wm_name}: + variables: + n_nodes: 1 + extra_sbatch_headers: "#SBATCH --gpus-per-task={n_threads}" +""" + with ramble.workspace.create(workspace_name) as ws: + ws.write() + config_path = os.path.join(ws.config_dir, ramble.workspace.config_file_name) + with open(config_path, "w+") as f: + f.write(test_config) + ws._re_read() + workspace("setup", "--dry-run", global_args=["-D", ws.root]) + + # assert the batch_submit is overridden, pointing to the generated script + all_exec_file = os.path.join(ws.root, "all_experiments") + with open(all_exec_file) as f: + content = f.read() + assert "echo None" in content + assert "echo slurm" not in content + assert os.path.join("hostname", "local", "test_slurm", "batch_submit") in content + + # Assert on no workflow manager + path = os.path.join(ws.experiment_dir, "hostname", "local", "test_None") + files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))] + assert "slurm_execute_experiment" not in files + assert "batch_submit" not in files + assert "batch_query" not in files + assert "batch_cancel" not in files + + # Assert on slurm workflow manager + path = os.path.join(ws.experiment_dir, "hostname", "local", "test_slurm") + files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))] + assert "slurm_execute_experiment" in files + assert "batch_submit" in files + assert "batch_query" in files + assert "batch_cancel" in files + with open(os.path.join(path, "batch_submit")) as f: + content = f.read() + assert "slurm_execute_experiment" in content + assert ".slurm_job" in content + with open(os.path.join(path, "slurm_execute_experiment")) as f: + content = f.read() + assert "scontrol show hostnames" in content + assert "#SBATCH --gpus-per-task=1" in content + with open(os.path.join(path, "batch_query")) as f: + content = f.read() + assert "squeue" in content + with open(os.path.join(path, "batch_cancel")) as f: + content = f.read() + assert "scancel" in content From c531964348bf8209cf438b5c8f490331f240372e Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Wed, 8 Jan 2025 23:13:22 -0800 Subject: [PATCH 5/8] Cleanup a unused variable --- .../builtin/workflow_managers/slurm/workflow_manager.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py index 75cdc4c12..82513b983 100644 --- a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py +++ b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py @@ -25,14 +25,6 @@ "CANCELLED+": "CANCELLED", } -_ensure_job_id_snippet = r""" -job_id=$(< {experiment_run_dir}/.slurm_job) -if [ -z "${job_id:-}" ]; then - echo "No valid job_id found" 1>&2 - exit 1 -fi -""" - class Slurm(WorkflowManagerBase): """Slurm workflow manager""" From 10e749213f5237d099ba0497c666cb20176d2145 Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Wed, 8 Jan 2025 23:47:23 -0800 Subject: [PATCH 6/8] Add var def for `mpi_command`, serving as a default --- .../workflow_managers/slurm/workflow_manager.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py index 82513b983..d5cdd5af5 100644 --- a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py +++ b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py @@ -58,6 +58,18 @@ def __init__(self, file_path): description="hostlist variable used by various modifiers", ) + workflow_manager_variable( + name="srun_args", + default="-n {n_ranks}", + description="Arguments supplied to srun", + ) + + workflow_manager_variable( + name="mpi_command", + default="srun {srun_args}", + description="mpirun prefix, mostly served as an overridable default", + ) + register_template( name="batch_submit", src_name="batch_submit.tpl", From 235f59d4ce209610caafda930c262e876166a27b Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Thu, 9 Jan 2025 11:50:58 -0800 Subject: [PATCH 7/8] Update completion --- share/ramble/ramble-completion.bash | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/share/ramble/ramble-completion.bash b/share/ramble/ramble-completion.bash index 3716cafec..e82b0994b 100755 --- a/share/ramble/ramble-completion.bash +++ b/share/ramble/ramble-completion.bash @@ -264,7 +264,7 @@ complete -o bashdefault -o default -F _bash_completion_ramble ramble _ramble() { if $list_options then - RAMBLE_COMPREPLY="-h --help -H --all-help --color -c --config -C --config-scope -d --debug --disable-passthrough -N --disable-logger -P --disable-progress-bar --timestamp --pdb -w --workspace -D --workspace-dir -W --no-workspace --use-workspace-repo -k --insecure -l --enable-locks -L --disable-locks -m --mock --mock-applications --mock-modifiers --mock-package-managers --mock-base-applications --mock-base-modifiers --mock-base-package-managers -p --profile --sorted-profile --lines -v --verbose --stacktrace -V --version --print-shell-vars" + RAMBLE_COMPREPLY="-h --help -H --all-help --color -c --config -C --config-scope -d --debug --disable-passthrough -N --disable-logger -P --disable-progress-bar --timestamp --pdb -w --workspace -D --workspace-dir -W --no-workspace --use-workspace-repo -k --insecure -l --enable-locks -L --disable-locks -m --mock --mock-applications --mock-modifiers --mock-package-managers --mock-workflow-managers --mock-base-applications --mock-base-modifiers --mock-base-package-managers --mock-base-workflow-managers -p --profile --sorted-profile --lines -v --verbose --stacktrace -V --version --print-shell-vars" else RAMBLE_COMPREPLY="attributes clean commands config debug deployment docs edit flake8 help info license list mirror mods on python repo results software-definitions style unit-test workspace" fi @@ -273,7 +273,7 @@ _ramble() { _ramble_attributes() { if $list_options then - RAMBLE_COMPREPLY="-h --help --defined --undefined -a --all --by-attribute --applications --modifiers --package_managers --base_applications --base_modifiers --base_package_managers --maintainers --tags" + RAMBLE_COMPREPLY="-h --help --defined --undefined -a --all --by-attribute --applications --modifiers --package_managers --workflow_managers --base_applications --base_modifiers --base_package_managers --base_workflow_managers --maintainers --tags" else RAMBLE_COMREPLY="" fi From ab82b5d24bced08371ddef088b40abea8de29291 Mon Sep 17 00:00:00 2001 From: Lin Guo Date: Thu, 9 Jan 2025 16:31:36 -0800 Subject: [PATCH 8/8] Add a `SUBMITTED` ramble status Also generate the bash associative array declaration from the python status_map. --- lib/ramble/ramble/application.py | 2 +- .../workflow_managers/slurm/batch_query.tpl | 12 ++++-------- .../workflow_managers/slurm/workflow_manager.py | 15 +++++++++++++-- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/lib/ramble/ramble/application.py b/lib/ramble/ramble/application.py index edef93aac..2f9671df3 100644 --- a/lib/ramble/ramble/application.py +++ b/lib/ramble/ramble/application.py @@ -66,7 +66,7 @@ experiment_status = Enum( "experiment_status", - ["UNKNOWN", "SETUP", "RUNNING", "COMPLETE", "SUCCESS", "FAILED", "CANCELLED"], + ["UNKNOWN", "SETUP", "SUBMITTED", "RUNNING", "COMPLETE", "SUCCESS", "FAILED", "CANCELLED"], ) _NULL_CONTEXT = "null" diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/batch_query.tpl b/var/ramble/repos/builtin/workflow_managers/slurm/batch_query.tpl index ff462102b..8995edd75 100644 --- a/var/ramble/repos/builtin/workflow_managers/slurm/batch_query.tpl +++ b/var/ramble/repos/builtin/workflow_managers/slurm/batch_query.tpl @@ -7,19 +7,15 @@ if [ -z "${job_id:-}" ]; then exit 1 fi +# Set up the status_map mapping between +# sacct/squeue status to ramble counterpart +{declare_status_map} + status=$(squeue -h -o "%t" -j "${job_id}" 2>/dev/null) if [ -z "$status" ]; then status=$(sacct -j "${job_id}" -o state -X -n | xargs) fi if [ ! -z "$status" ]; then - # Define a mapping between sacct/squeue status to ramble counterpart - declare -A status_map - status_map["PD"]="SETUP" - status_map["R"]="RUNNING" - status_map["CF"]="SETUP" - status_map["CG"]="COMPLETE" - status_map["COMPLETED"]="COMPLETE" - status_map["CANCELLED+"]="CANCELLED" if [ -v status_map["$status"] ]; then status=${status_map["$status"]} fi diff --git a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py index d5cdd5af5..105a1c855 100644 --- a/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py +++ b/var/ramble/repos/builtin/workflow_managers/slurm/workflow_manager.py @@ -16,7 +16,7 @@ # Mapping from squeue/sacct status to Ramble status _STATUS_MAP = { - "PD": "SETUP", + "PD": "SUBMITTED", "R": "RUNNING", "CF": "SETUP", "CG": "COMPLETE", @@ -26,6 +26,14 @@ } +def _declare_status_map(): + """A utility to convert the `_STATUS_MAP` into a bash array""" + entries = ["declare -A status_map"] + for k, v in _STATUS_MAP.items(): + entries.append(f'status_map["{k}"]="{v}"') + return "\n".join(entries) + + class Slurm(WorkflowManagerBase): """Slurm workflow manager""" @@ -77,7 +85,10 @@ def __init__(self, file_path): ) register_template( - name="batch_query", src_name="batch_query.tpl", dest_name="batch_query" + name="batch_query", + src_name="batch_query.tpl", + dest_name="batch_query", + extra_vars={"declare_status_map": _declare_status_map()}, ) register_template(