From 9870bf560b5e6dab0a604493d839649f01ac251e Mon Sep 17 00:00:00 2001
From: Paul Ferrell
Date: Thu, 8 Sep 2022 08:45:14 -0600
Subject: [PATCH] Resurrected scheduler changes.

---
 lib/pavilion/jobs.py                      |   3 +-
 lib/pavilion/parsers/expressions.py       |   2 +-
 lib/pavilion/resolver.py                  |  45 +++++-
 lib/pavilion/schedulers/advanced.py       | 166 ++++++++-------------
 lib/pavilion/schedulers/basic.py          |  23 ++-
 lib/pavilion/schedulers/chunking.py       | 166 +++++++++++++++++++++
 lib/pavilion/schedulers/config.py         |   9 ++
 lib/pavilion/schedulers/node_selection.py |  36 -----
 lib/pavilion/schedulers/plugins/slurm.py  |   6 +-
 lib/pavilion/schedulers/scheduler.py      |  14 +-
 lib/pavilion/schedulers/vars.py           | 169 +++++++++++++++-------
 lib/pavilion/test_config/file_format.py   |   7 +-
 lib/pavilion/var_dict.py                  |   5 +-
 test/tests/sched_tests.py                 |  10 +-
 test/tests/slurm_tests.py                 |   6 +-
 15 files changed, 444 insertions(+), 223 deletions(-)
 create mode 100644 lib/pavilion/schedulers/chunking.py
 delete mode 100644 lib/pavilion/schedulers/node_selection.py

diff --git a/lib/pavilion/jobs.py b/lib/pavilion/jobs.py
index 5e94bcb10..9c3ec44ac 100644
--- a/lib/pavilion/jobs.py
+++ b/lib/pavilion/jobs.py
@@ -114,8 +114,7 @@ def save_node_data(self, nodes: Nodes):
             with (self.path/self.NODE_INFO_FN).open('wb') as data_file:
                 pickle.dump(nodes, data_file)
         except OSError as err:
-            raise JobError(
-                "Could not save node data: {}".format(err))
+            raise JobError("Could not save node data: {}".format(err))
 
     def load_sched_data(self) -> Nodes:
         """Load the scheduler data that was saved from the kickoff time."""
diff --git a/lib/pavilion/parsers/expressions.py b/lib/pavilion/parsers/expressions.py
index 185fbef91..30ee2172f 100644
--- a/lib/pavilion/parsers/expressions.py
+++ b/lib/pavilion/parsers/expressions.py
@@ -346,7 +346,7 @@ def math_expr(self, items) -> lark.Token:
 
         return base_tok
 
-    # This have been generalized.
+    # These have been generalized.
     add_expr = math_expr
     mult_expr = math_expr
 
diff --git a/lib/pavilion/resolver.py b/lib/pavilion/resolver.py
index 3348215b5..0f289d59b 100644
--- a/lib/pavilion/resolver.py
+++ b/lib/pavilion/resolver.py
@@ -598,6 +598,15 @@ def load_raw_configs(self, tests, host, modes):
                 test_cfg['host'] = host
                 test_cfg['modes'] = modes
 
+                # Set up a schedule section if one doesn't already exist, and
+                # add a default chunking group (the suite.test_name) if needed.
+                schedule = test_cfg.get('schedule', {})
+                test_cfg['schedule'] = schedule
+                chunking = schedule.get('chunking', {})
+                test_cfg['schedule']['chunking'] = chunking
+                if not chunking.get('group'):
+                    chunking['group'] = '{}.{}'.format(test_suite, test_cfg_name)
+
             all_tests[test_suite] = suite_tests
 
             # Now that we know we've loaded and resolved a given suite,
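The defaulting above guarantees that every loaded test config leaves the loader with a
chunking group. A minimal sketch of the net effect (the suite and test names here are
hypothetical):

    # Hypothetical raw config with no 'schedule' section at all.
    test_cfg = {}
    test_suite, test_cfg_name = 'supermagic', 'fs_test'

    schedule = test_cfg.get('schedule', {})
    test_cfg['schedule'] = schedule
    chunking = schedule.get('chunking', {})
    test_cfg['schedule']['chunking'] = chunking
    if not chunking.get('group'):
        chunking['group'] = '{}.{}'.format(test_suite, test_cfg_name)

    assert test_cfg == {'schedule': {'chunking': {'group': 'supermagic.fs_test'}}}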
@@ -1049,17 +1058,47 @@ def resolve_permutations(self, test_cfg: Dict, base_var_man: variables.VariableS
         # that they are permutation variables. These are variables that refer to themselves
         # (either directly (a.b->a.c) or indirectly (a.b -> d -> a.c).
         all_var_men = []
+        resolvable = []
         for var_man in var_men:
             basic_per_vars = [var for var_set, var in used_per_vars if var_set == 'var']
             _, could_resolve = var_man.resolve_references(partial=True, skip_deps=basic_per_vars)
             # Resolve permutations only for those 'could_resolve' variables that
             # we actually permute over.
-            all_var_men.extend(var_man.get_permutations(
-                [('var', var_name) for var_name in could_resolve
-                 if var_name in could_resolve]))
+
+            resolvable = [('var', var_name) for var_name in could_resolve
+                          if ('var', var_name) in used_per_vars]
+            all_var_men.extend(var_man.get_permutations(resolvable))
+
+        # Remove vars we just resolved. It's ok that we grab the set from the last
+        # permutation, as they will always be the same across all permutations.
+        for resolved in resolvable:
+            used_per_vars.remove(resolved)
+
         var_men = all_var_men
 
+        # Permute on 'sched.chunks'. This one is special - it depends on the scheduler
+        # configuration, but it must be permuted on before the scheduler vars are pulled.
+        if ('sched', 'chunks') in used_per_vars:
+            used_per_vars.remove(('sched', 'chunks'))
+
+            # Make and resolve a dummy variable manager.
+            dummy_var_man = copy.deepcopy(var_men[0])
+            dummy_var_man.resolve_references(partial=True)
+
+            # Resolve the scheduler section and get a chunk count, otherwise just use 1 chunk.
+            try:
+                dummy_sched_cfg = resolve.test_config(sched_cfg, dummy_var_man)
+            except KeyError:
+                chunk_count = 1
+            else:
+                chunk_count = sched.get_chunk_count(dummy_sched_cfg)
+
+            # There's not actually anything to permute across yet, so just make extra
+            # duplicates of each test for each chunk. They'll get chunk ids assigned
+            # automatically.
+            all_var_men = []
+            for var_man in var_men:
+                for _ in range(chunk_count):
+                    all_var_men.append(copy.deepcopy(var_man))
+            var_men = all_var_men
 
         # Everything left at this point will require the sched vars to deal with.
         all_var_men = []
         for var_man in var_men:
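Nothing is actually permuted at the 'sched.chunks' stage above; each existing permutation
is simply duplicated once per chunk, and chunk ids are assigned later. A sketch with
assumed counts (var_man_a and var_man_b are hypothetical variable managers):

    import copy

    chunk_count = 4                    # e.g. from sched.get_chunk_count(dummy_sched_cfg)
    var_men = [var_man_a, var_man_b]   # two existing permutations

    all_var_men = []
    for var_man in var_men:
        for _ in range(chunk_count):
            all_var_men.append(copy.deepcopy(var_man))

    assert len(all_var_men) == 8       # 2 permutations x 4 chunks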
diff --git a/lib/pavilion/schedulers/advanced.py b/lib/pavilion/schedulers/advanced.py
index e47e4f408..042e996e7 100644
--- a/lib/pavilion/schedulers/advanced.py
+++ b/lib/pavilion/schedulers/advanced.py
@@ -10,7 +10,8 @@
 from pavilion.status_file import STATES
 from pavilion.test_run import TestRun
 from pavilion.types import NodeInfo, Nodes, NodeList, NodeSet, NodeRange
-from .config import validate_config, AVAILABLE, BACKFILL, calc_node_range
+from .chunking import ChunkSetManager, Chunks
+from .config import validate_config, AVAILABLE, calc_node_range, SchedConfigError
 from .scheduler import SchedulerPlugin, SchedulerPluginError
 from .vars import SchedulerVariables
 
@@ -30,7 +31,7 @@ def __init__(self, name, description, priority=SchedulerPlugin.PRIO_COMMON):
 
         self._nodes = None  # type: Union[Nodes, None]
         self._node_lists = []  # type: List[NodeList]
-        self._chunks = ChunksByNodeListId({})  # type: ChunksByNodeListId
+        self._chunk_sets = ChunkSetManager()
 
     # These additional methods need to be defined for advanced schedulers.
 
@@ -77,6 +78,37 @@ def _transform_raw_node_data(self, sched_config, node_data, extra) -> NodeInfo:
 
         raise NotImplementedError("This must be implemented by the scheduler plugin.")
 
+    def get_chunk_count(self, raw_sched_config: dict) -> int:
+        """Get the number of chunks for the given configuration. On config errors, 1 is
+        returned and errors are deferred until actual variable retrieval time."""
+
+        try:
+            sched_config = validate_config(raw_sched_config)
+        except SchedConfigError:
+            return 1
+
+        if self._nodes is None:
+            self._nodes = self._get_system_inventory(sched_config)
+
+        filtered_nodes, filter_reasons = self._filter_nodes(sched_config)
+        filtered_nodes.sort()
+
+        chunking = sched_config['chunking']
+        chunks = Chunks(filtered_nodes, chunking['size'], chunking['extra'],
+                        chunking['node_selection'])
+        return len(chunks)
+
+    def _get_node_list_id(self, nodes: NodeList) -> int:
+        """Get the node list lookup id given a list of nodes."""
+
+        # NOTE: This looks kind of bad, and is 40% slower than creating a hashable list
+        # and using a dict, but that only amounts to 0.02 seconds for 1000 lists of
+        # 1000 items.
+        try:
+            node_list_id = self._node_lists.index(nodes)
+        except ValueError:
+            node_list_id = len(self._node_lists)
+            self._node_lists.append(nodes)
+
+        return node_list_id
+
     def _get_initial_vars(self, sched_config: dict) -> SchedulerVariables:
         """Get initial variables (and chunks) for this scheduler."""
 
@@ -120,16 +152,31 @@ def _get_initial_vars(self, sched_config: dict) -> SchedulerVariables:
                 .format(min_nodes, max_nodes, len(filtered_nodes), reasons,
                         pprint.pformat(sched_config)))
 
-        try:
-            node_list_id = self._node_lists.index(filtered_nodes)
-        except ValueError:
-            node_list_id = len(self._node_lists)
-            self._node_lists.append(filtered_nodes)
+        node_list_id = self._get_node_list_id(filtered_nodes)
 
-        chunks = self._get_chunks(node_list_id, sched_config)
+        chunking = sched_config['chunking']
+        if self._chunk_sets.has_chunk_set(node_list_id, chunking):
+            chunks = self._chunk_sets.get_chunk_set(node_list_id, chunking)
+        else:
+            chunks = self._chunk_sets.create_chunk_set(node_list_id, chunking,
+                                                       filtered_nodes)
+        chunk_id, chunk = chunks.get_group_chunk(chunking['group'])
+
+        test_nodes = None
+        if not sched_config['share_allocation']:
+            if sched_config['chunking']['size'] not in (0, None):
+                chunk_min, chunk_max = calc_node_range(sched_config, len(chunk))
+                # NodeSets are frozenset based, so order the nodes before slicing.
+                test_nodes = sorted(chunk)[:chunk_max]
+
+        sched_vars = self.VAR_CLASS(
+            sched_config=sched_config,
+            node_info=self._nodes,
+            node_list_id=node_list_id,
+            chunks=chunks.chunks,
+            chunk_id=chunk_id,
+            test_nodes=test_nodes)
 
-        sched_vars = self.VAR_CLASS(sched_config, nodes=self._nodes, chunks=chunks,
-                                    node_list_id=node_list_id)
         sched_vars.add_errors(errors)
 
         return sched_vars
@@ -148,7 +194,7 @@ def get_final_vars(self, test: TestRun) -> SchedulerVariables:
 
         sched_config = validate_config(test.config['schedule'])
 
-        return self.VAR_CLASS(sched_config, nodes=nodes, deferred=False)
+        return self.VAR_CLASS.finalize(sched_config, nodes)
 
     def _get_system_inventory(self, sched_config: dict) -> Union[Nodes, None]:
         """Returns a dictionary of node data, or None if the scheduler does not
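The sorted() call when picking test_nodes above is load-bearing: NodeSet wraps a
frozenset, which has no defined order and cannot be sliced. An illustration:

    chunk = frozenset(['node03', 'node01', 'node02'])
    chunk_max = 2

    # chunk[0:chunk_max] would raise TypeError: 'frozenset' object is not subscriptable.
    test_nodes = sorted(chunk)[:chunk_max]
    assert test_nodes == ['node01', 'node02']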
@@ -241,55 +287,6 @@ def _filter_custom(self, sched_config: dict, node_name: str, node: NodeInfo) \
 
         return None
 
-    def _get_chunks(self, node_list_id, sched_config) -> List[NodeSet]:
-        """Chunking is specific to the node list, chunk size, and node selection
-        settings of a job. The actual chunk used by a test_run won't be known until
-        after the test is at least partially resolved, however. Until then, it only
-        knows what chunks are available.
-
-        This method retrieves or creates a list of ChunkInfo objects, and returns
-        it."""
-
-        nodes = list(self._node_lists[node_list_id])
-
-        chunk_size = sched_config['chunking']['size']
-        # Chunk size 0/null is all the nodes.
-        if chunk_size in (0, None) or chunk_size > len(nodes):
-            chunk_size = len(nodes)
-        chunk_extra = sched_config['chunking']['extra']
-        node_select = sched_config['chunking']['node_selection']
-
-        chunk_id = (node_list_id, chunk_size, node_select, chunk_extra)
-        # If we already have chunks for our node list and settings, just return what
-        # we've got.
-        if chunk_id in self._chunks:
-            return self._chunks[chunk_id]
-
-        # We can potentially have no nodes, in which case return an empty chunk.
-        if chunk_size == 0:
-            self._chunks[chunk_id] = [NodeSet(frozenset([]))]
-            return self._chunks[chunk_id]
-
-        chunks = []
-        for i in range(len(nodes)//chunk_size):
-            # Apply the selection function and get our chunk nodes.
-            chunk = self.NODE_SELECTION[node_select](nodes, chunk_size)
-            # Filter out any chosen from our node list.
-            nodes = [node for node in nodes if node not in chunk]
-            chunks.append(chunk)
-
-        if nodes and chunk_extra == BACKFILL:
-            backfill = chunks[-1][:chunk_size - len(nodes)]
-            chunks.append(backfill + nodes)
-
-        chunk_info = []
-        for chunk in chunks:
-            chunk_info.append(NodeSet(frozenset(chunk)))
-
-        self._chunks[chunk_id] = chunk_info
-
-        return chunk_info
-
     def schedule_tests(self, pav_cfg, tests: List[TestRun]):
         """Schedule each of the given tests using this scheduler using a
         separate allocation (if applicable) for each.
 
@@ -301,52 +298,17 @@ def schedule_tests(self, pav_cfg, tests: List[TestRun]):
 
         # type: Dict[FrozenSet[str], List[TestRun]]
         by_chunk = collections.defaultdict(lambda: [])
-        usage = collections.defaultdict(lambda: 0)  # type: Dict[FrozenSet[str], int]
         sched_configs = {}  # type: Dict[str, dict]
 
         for test in tests:
-            node_list_id = int(test.var_man.get('sched.node_list_id'))
-
             sched_config = validate_config(test.config['schedule'])
             sched_configs[test.full_id] = sched_config
-            chunk_spec = test.config.get('chunk')
-            if chunk_spec != 'any':
-                # This is validated in test object creation.
-                chunk_spec = int(chunk_spec)
-
-            chunk_size = sched_config['chunking']['size']
-            node_select = sched_config['chunking']['node_selection']
-            chunk_extra = sched_config['chunking']['extra']
-
-            node_list = self._node_lists[node_list_id]
-            if chunk_size in (None, 0) or chunk_size > len(node_list):
-                chunk_size = len(node_list)
-
-            chunks_id = (node_list_id, chunk_size, node_select, chunk_extra)
-
-            chunks = self._chunks[chunks_id]
-
-            if chunk_spec == 'any':
-                least_used = None
-                least_used_chunk = None
-                for chunk in chunks:
-                    chunk_usage = usage[chunk]
-
-                    if chunk_usage == 0:
-                        least_used_chunk = chunk
-                        break
-
-                    elif least_used is None or chunk_usage < least_used:
-                        least_used = chunk_usage
-                        least_used_chunk = chunk
-
-                usage[least_used_chunk] += 1
-                by_chunk[least_used_chunk].append(test)
-            else:
-                if chunk_spec > len(chunks):
-                    raise SchedulerPluginError(
-                        "Test selected chunk '{}', but there are only {} chunks "
-                        "available.".format(chunk_spec, len(chunks)))
-
-                chunk = chunks[chunk_spec]
-                by_chunk[chunk].append(test)
+
+            # Recover the chunk this test selected when its variables were resolved.
+            node_list_id = int(test.var_man.get('sched.node_list_id'))
+            chunk_id = int(test.var_man.get('sched.chunk_id'))
+            chunk_set = self._chunk_sets.get_chunk_set(node_list_id,
+                                                       sched_config['chunking'])
+            by_chunk[chunk_set.chunks[chunk_id]].append(test)
 
         for chunk, tests in by_chunk.items():
             self._schedule_chunk(pav_cfg, chunk, tests, sched_configs)
 
@@ -361,9 +320,11 @@ def _schedule_chunk(self, pav_cfg, chunk: NodeSet, tests: List[TestRun],
                         sched_configs: Dict[str, dict]):
 
         # There are three types of test launches.
-        # 1. Tests that can share an allocation (
+        # 1. Tests that can share an allocation.
         share_groups = collections.defaultdict(list)
+        # 2. Tests that want the scheduler to figure out their exact nodes.
         flex_tests: List[TestRun] = []
+        # 3. Tests that run by themselves.
         indi_tests: List[TestRun] = []
 
         for test in tests:
diff --git a/lib/pavilion/schedulers/basic.py b/lib/pavilion/schedulers/basic.py
index 5612434e9..95eef5c71 100644
--- a/lib/pavilion/schedulers/basic.py
+++ b/lib/pavilion/schedulers/basic.py
@@ -7,7 +7,7 @@
 from pavilion.jobs import Job, JobError
 from pavilion.status_file import STATES
 from pavilion.test_run import TestRun
-from pavilion.types import NodeInfo, Nodes
+from pavilion.types import NodeInfo, Nodes, NodeList
 from .config import validate_config, calc_node_range
 from .scheduler import SchedulerPlugin, SchedulerPluginError
 from .vars import SchedulerVariables
@@ -18,9 +18,14 @@ class SchedulerPluginBasic(SchedulerPlugin, ABC):
     on manually set parameters in 'schedule.cluster_info'."""
 
     def _get_initial_vars(self, sched_config: dict) -> SchedulerVariables:
         """Get the initial variables for the basic scheduler."""
 
-        return self.VAR_CLASS(sched_config)
+        return self.VAR_CLASS(
+            sched_config=sched_config,
+            node_info=Nodes({}),
+            chunks=[NodeList([])],
+            chunk_id=0,
+            node_list_id=0,
+        )
 
     def get_final_vars(self, test: TestRun) -> SchedulerVariables:
         """Gather node information from within the allocation."""
@@ -37,7 +44,15 @@ def get_final_vars(self, test: TestRun) -> SchedulerVariables:
         for node in alloc_nodes:
             nodes[node] = self._get_alloc_node_info(node)
 
-        return self.VAR_CLASS(sched_config, nodes=nodes, deferred=False)
+        return self.VAR_CLASS(
+            sched_config=sched_config,
+            node_info=nodes,
+            chunks=[NodeList(list(nodes.keys()))],
+            chunk_id=0,
+            node_list_id=0,
+            test_nodes=NodeList(list(nodes.keys())),
+            deferred=False,
+        )
 
     def _get_alloc_node_info(self, node_name) -> NodeInfo:
         """Given that this is running on an allocation, get information about
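For reference, the four selection callbacks in the new chunking module below behave like
this on a toy eight-node list (the random results are illustrative only):

    nodes = ['n{:02d}'.format(i) for i in range(8)]

    contiguous(nodes, 4)     # ['n00', 'n01', 'n02', 'n03']
    distributed(nodes, 4)    # ['n00', 'n02', 'n04', 'n06'] - every (len//size)th node
    random_chunks(nodes, 4)  # e.g. ['n05', 'n00', 'n07', 'n03']
    rand_dist(nodes, 4)      # e.g. ['n01', 'n03', 'n04', 'n07'] - one node per segment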
diff --git a/lib/pavilion/schedulers/chunking.py b/lib/pavilion/schedulers/chunking.py
new file mode 100644
index 000000000..dddbb3176
--- /dev/null
+++ b/lib/pavilion/schedulers/chunking.py
@@ -0,0 +1,166 @@
+"""Classes and functions for managing node chunks."""
+
+import random
+from typing import Dict, List, Tuple, Union
+
+from pavilion.schedulers.config import BACKFILL
+from pavilion.types import NodeList, NodeSet
+
+
+def contiguous(node_list: List[str], chunk_size: int) -> List[str]:
+    """Just return a sequence of nodes. This can probably be improved."""
+
+    return node_list[:chunk_size]
+
+
+def random_chunks(node_list: List[str], chunk_size: int) -> List[str]:
+    """Select nodes randomly from the node list."""
+
+    return random.sample(node_list, chunk_size)
+
+
+def rand_dist(node_list: List[str], chunk_size: int) -> List[str]:
+    """Divide the nodes into segments across the breadth of those available, and
+    randomly choose one node from each segment."""
+
+    picked = []
+    step = len(node_list)//chunk_size
+
+    for i in range(chunk_size):
+        picked.append(node_list[i*step + random.randint(0, step-1)])
+
+    return picked
+
+
+def distributed(node_list: List[str], chunk_size: int) -> List[str]:
+    """Pick an evenly spaced selection of nodes."""
+
+    step = len(node_list)//chunk_size
+
+    return [node_list[i*step] for i in range(chunk_size)]
+
+
+class Chunks:
+    """A collection of chunks and their usage information."""
+
+    NODE_SELECTION = {
+        'contiguous': contiguous,
+        'random': random_chunks,
+        'rand_dist': rand_dist,
+        'distributed': distributed,
+    }
+
+    def __init__(self, nodes: NodeList, size: Union[int, float], extra: str,
+                 select: str):
+
+        self.chunks = self._create_chunks(nodes, size, extra, select)
+
+        self._group_usage = {}
+
+    def __len__(self):
+        """Get the number of chunks."""
+
+        return len(self.chunks)
+
+    def _create_chunks(self, nodes: NodeList, size: Union[int, float], extra: str,
+                       select: str) -> List[NodeSet]:
+        """Create chunks from the given node list, and return the chunk list."""
+
+        if isinstance(size, float):
+            # Consider float sizes to be a percentage of the node count (with a
+            # minimum of one node).
+            size = max(int(size * len(nodes)), 1)
+
+        chunks = []
+        for _ in range(len(nodes)//size):
+            # Apply the selection function and get our chunk nodes.
+            chunk = self.NODE_SELECTION[select](nodes, size)
+            # Filter out any chosen from our node list.
+            nodes = [node for node in nodes if node not in chunk]
+            chunks.append(chunk)
+
+        if nodes and extra == BACKFILL:
+            backfill = chunks[-1][:size - len(nodes)]
+            chunks.append(backfill + nodes)
+
+        chunk_info = []
+        for chunk in chunks:
+            chunk_info.append(NodeSet(frozenset(chunk)))
+
+        return chunk_info
+
+    def get_group_chunk(self, group: str) -> Tuple[int, NodeSet]:
+        """Return the next chunk for the given group. Chunking groups reuse chunks
+        only when all chunks have been used by the group. The exception is groups
+        named for a numerical chunk index - they always use that chunk (modulo the
+        number of chunks). Given a choice of chunks, chunks are returned according to
+        overall usage (iteratively - we can't know what chunks later tests will
+        request).
+
+        :returns: The chunk id and the node set for that chunk."""
+
+        if group.isdigit():
+            chunk_id = int(group) % len(self.chunks)
+            return chunk_id, self.chunks[chunk_id]
+
+        # Initialize the usage order for new groups.
+        if group not in self._group_usage:
+            order = list(range(len(self.chunks)))
+            random.shuffle(order)
+            self._group_usage[group] = order
+
+        chunk_id = self._group_usage[group].pop(0)
+        self._group_usage[group].append(chunk_id)
+
+        return chunk_id, self.chunks[chunk_id]
+
+
+class ChunkSetManager:
+    """Organizes chunk lists by chunk properties and node_list id."""
+
+    # These are the properties that differentiate different sets of chunks.
+    CHUNK_SELECT_PROPS = [
+        'extra',
+        'node_selection',
+        'size',
+    ]
+
+    def __init__(self):
+        self._chunk_sets = {}
+
+    # NOTE: 'nid' stands for node list ID. It's a unique id that identifies which
+    # list of nodes each set of chunks belongs to.
+    def _mk_id_tuple(self, nid: int, chunking: dict):
+        """Create a hashable tuple from the node_list id and chunking properties."""
+
+        return (nid,) + tuple(chunking[prop] for prop in self.CHUNK_SELECT_PROPS)
+
+    def get_chunk_set(self, nid, chunking: dict) -> Chunks:
+        """Get the chunk set for the given chunking properties."""
+
+        id_tpl = self._mk_id_tuple(nid, chunking)
+        if id_tpl not in self._chunk_sets:
+            # This should never happen (we should always call has_chunk_set first).
+            raise KeyError("Chunk set with properties {} does not exist for node_list {}."
+                           .format(chunking, nid))
+
+        return self._chunk_sets[id_tpl]
+
+    def has_chunk_set(self, nid, chunking: Dict):
+        """Return whether a chunk set exists for the given node list and chunking
+        properties."""
+
+        return self._mk_id_tuple(nid, chunking) in self._chunk_sets
+
+    def create_chunk_set(self, nid, chunking, nodes) -> Chunks:
+        """Create, store, and return a chunk set for the given node list and
+        chunking properties."""
+
+        id_tpl = self._mk_id_tuple(nid, chunking)
+
+        chunk_size = chunking['size']
+        # Chunk size 0/null is all the nodes.
+        if chunk_size in (0, None) or chunk_size > len(nodes):
+            chunk_size = len(nodes)
+        extra = chunking['extra']
+        select = chunking['node_selection']
+
+        chunks = Chunks(nodes, chunk_size, extra, select)
+        self._chunk_sets[id_tpl] = chunks
+        return chunks
+
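Taken together, a Chunks instance behaves as sketched below (a minimal sketch, assuming
an eight-node list that divides evenly and the 'discard' extra policy; the group name
is illustrative):

    chunks = Chunks(['n%02d' % i for i in range(8)], 2, 'discard', 'contiguous')
    assert len(chunks) == 4

    # A named group walks all four chunks (in a shuffled order) before
    # repeating any of them.
    seen = [chunks.get_group_chunk('suite.mytest')[0] for _ in range(4)]
    assert sorted(seen) == [0, 1, 2, 3]

    # Purely numeric group names always pin a chunk, modulo the chunk count.
    assert chunks.get_group_chunk('5')[0] == 1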
diff --git a/lib/pavilion/schedulers/config.py b/lib/pavilion/schedulers/config.py
index d078cccbc..10aac8510 100644
--- a/lib/pavilion/schedulers/config.py
+++ b/lib/pavilion/schedulers/config.py
@@ -120,6 +120,13 @@ class ScheduleConfig(yc.KeyedElem):
                         " 'backfill (default) - The extra nodes will be padded "
                         "  out with nodes from the last chunk.\n"
                         " 'discard' - Don't use the extra nodes.\n"),
+                yc.StrElem(
+                    'group',
+                    help_text="Chunking 'group' to use. Defaults to the suite.test_name "
+                              "of each test (subtitle is ignored). Tests in the same "
+                              "chunking group won't overlap when selecting chunks.\n"
+                              "This may also be given as an integer, to specify a "
+                              "specific chunk (modulo the number of chunks).")
             ]
         ),
         yc.KeyedElem(
@@ -429,6 +436,7 @@ def _validate_node_list(items) -> List[str]:
         'size': min_int('chunk.size', min_val=0),
         'node_selection': NODE_SELECT_OPTIONS,
         'extra': NODE_EXTRA_OPTIONS,
+        'group': None,
     },
     'tasks_per_node': _validate_tasks_per_node,
     'min_tasks_per_node': min_int('min_tasks_per_node', min_val=1, required=False),
@@ -455,6 +463,7 @@ def _validate_node_list(items) -> List[str]:
         'size': '0',
         'node_selection': CONTIGUOUS,
         'extra': BACKFILL,
+        'group': 'unset',
     },
     'tasks_per_node': '1',
     'min_tasks_per_node': None,
diff --git a/lib/pavilion/schedulers/node_selection.py b/lib/pavilion/schedulers/node_selection.py
deleted file mode 100644
index d907c37e5..000000000
--- a/lib/pavilion/schedulers/node_selection.py
+++ /dev/null
@@ -1,36 +0,0 @@
-"""Callback functions for selecting nodes from a node list."""
-
-from typing import List
-import random as rnd
-
-
-def contiguous(node_list: List[str], chunk_size: int) -> List[str]:
-    """Just return a sequence of nodes. 
This can probably be improved.""" - return node_list[:chunk_size] - - -def random(node_list: List[str], chunk_size: int) -> List[str]: - """Select nodes randomly from the node list.""" - - return rnd.sample(node_list, chunk_size) - - -def rand_dist(node_list: List[str], chunk_size: int) -> List[str]: - """Divide the nodes into segments across the breadth of those available, and - randomly choose one node from each segment.""" - - picked = [] - step = len(node_list)//chunk_size - - for i in range(chunk_size): - picked.append(node_list[i*step + rnd.randint(0, step-1)]) - - return picked - - -def distributed(node_list: List[str], chunk_size: int) -> List[str]: - """Pick an evenly spaced selection of nodes.""" - - step = len(node_list)//chunk_size - - return [node_list[i*step] for i in range(chunk_size)] diff --git a/lib/pavilion/schedulers/plugins/slurm.py b/lib/pavilion/schedulers/plugins/slurm.py index 61d0090a2..a06b0f2c2 100644 --- a/lib/pavilion/schedulers/plugins/slurm.py +++ b/lib/pavilion/schedulers/plugins/slurm.py @@ -131,14 +131,14 @@ def test_cmd(self): slurm_conf = self._sched_config['slurm'] - nodes = len(self._nodes) + nodes = len(self._node_info) tasks = int(self.tasks_per_node()) * nodes if self._sched_config['slurm']['mpi_cmd'] == Slurm.MPI_CMD_SRUN: cmd = ['srun', '-N', str(nodes), - '-w', Slurm.compress_node_list(self._nodes.keys()), + '-w', Slurm.compress_node_list(self._node_info.keys()), '-n', str(tasks)] cmd.extend(slurm_conf['srun_extra']) @@ -156,7 +156,7 @@ def test_cmd(self): for mca_opt in mca: cmd.extend(['--mca', mca_opt]) - hostlist = ','.join(self._nodes.keys()) + hostlist = ','.join(self._node_info.keys()) cmd.extend(['--host', hostlist]) cmd.extend(self._sched_config['slurm']['mpirun_extra']) diff --git a/lib/pavilion/schedulers/scheduler.py b/lib/pavilion/schedulers/scheduler.py index 3e38ea6d4..84d04218e 100644 --- a/lib/pavilion/schedulers/scheduler.py +++ b/lib/pavilion/schedulers/scheduler.py @@ -17,7 +17,7 @@ from pavilion.types import NodeList, PickedNodes from yapsy import IPlugin from . import config -from . import node_selection +from . import chunking from .config import validate_config, SchedConfigError, ScheduleConfig from .vars import SchedulerVariables @@ -106,13 +106,6 @@ class SchedulerPlugin(IPlugin.IPlugin): KICKOFF_SCRIPT_HEADER_CLASS = KickoffScriptHeader """The class to use when generating headers for kickoff scripts.""" - NODE_SELECTION = { - 'contiguous': node_selection.contiguous, - 'random': node_selection.random, - 'rand_dist': node_selection.rand_dist, - 'distributed': node_selection.distributed, - } - def __init__(self, name, description, priority=PRIO_CORE): """Scheduler plugin that is expected to be overriden by subclasses. The plugin will populate a set of expected 'sched' variables.""" @@ -194,6 +187,11 @@ def schedule_tests(self, pav_cfg, tests: List[TestRun]): raise NotImplementedError("Implemented in Basic/Advanced child classes.") # The remaining methods are shared by all plugins. + def get_chunk_count(self, raw_sched_config: dict) -> int: + """Get the number of chunks for the given configuration. 
Returns 1 on errors + and when chunking isn't supported.""" + + return 1 def get_initial_vars(self, raw_sched_config: dict) -> SchedulerVariables: """Queries the scheduler to auto-detect its current state, and returns the diff --git a/lib/pavilion/schedulers/vars.py b/lib/pavilion/schedulers/vars.py index de8b84edf..574818daa 100644 --- a/lib/pavilion/schedulers/vars.py +++ b/lib/pavilion/schedulers/vars.py @@ -1,11 +1,11 @@ """Scheduler variable base class.""" import math -from typing import List +from typing import Union, List from pavilion.deferred import DeferredVariable from pavilion.var_dict import VarDict, var_method, dfr_var_method -from ..types import NodeInfo, Nodes, NodeList, NodeSet from .config import calc_node_range +from ..types import Nodes, NodeList class SchedulerVariables(VarDict): @@ -51,9 +51,11 @@ class of this that contains all the variable functions it provides. then harvesting the results of the test run.""" def __init__(self, sched_config: dict, - nodes: Nodes = None, - chunks: List[NodeSet] = None, - node_list_id: int = None, + node_info: Nodes, + chunks: List[NodeList], + chunk_id: int, + node_list_id: int, + test_nodes: NodeList = None, deferred=True): """Initialize the scheduler var dictionary. This will be initialized when preliminary variables are gathered vs when it is no longer deferred. @@ -62,26 +64,50 @@ def __init__(self, sched_config: dict, nodes that are part of the actual allocation. 'chunks' is not given in this case. - :param nodes: The dict of node names to node data. If None, will default to + :param node_info: The dict of node names to node data. If None, will default to an empty dict. :param sched_config: The scheduler configuration for the corresponding test. - :param chunks: The list of chunks, each of which is a list of node names. If - None, will default to an empty list. :param node_list_id: Should always be included when chunks is included. Provides the scheduler with a way to recover the original node list that was chunked without having to store it. + :param chunks: The list of chunks this could have used. + :param chunk_id: The id of the selected chunk. + :param test_nodes: The final list of nodes allocated for the test. :param deferred: Whether the variables are deferred. """ super().__init__('sched', deferred=deferred) self._sched_config = sched_config - self._nodes = nodes if nodes else {} - self._chunks = chunks if chunks else [] + self._node_info = node_info + self._chunk = chunks[chunk_id] + self._chunks = len(chunks) self._node_list_id = node_list_id + self._chunk_id = chunk_id + self._test_nodes = test_nodes self._keys = self._find_vars() + @classmethod + def finalize(cls, sched_config, node_info): + """Set up the scheduler variables with the finalized node list. The 'node_info' + variable should contain only those nodes for the final allocation, everything else + will be cut down to that or faked. Only those final nodes are needed to deal with + any deferred variables, everything involving chunks and general node lists will + have already been resolved and won't be queried.""" + + nodes = NodeList(list(node_info.keys())) + + return cls( + sched_config=sched_config, + node_info=node_info, + node_list_id=0, + chunks=[nodes], + chunk_id=0, + test_nodes=nodes, + deferred=False, + ) + NO_EXAMPLE = '' def info(self, key): @@ -118,13 +144,12 @@ def __repr__(self): # front-end nodes have less resources than any compute node. 
Note that
     # they are all non-deferred, so they're safe to use in build scripts,
 
-    def _get_min(self, nodes: List[NodeInfo], attr: str, default: int):
-        """Get the minimum of the given attribute across the list of nodes,
-        settling for the cluster_info value, and then the default."""
+    def _get_min(self, values, attr: str, default: int):
+        """Get the minimum of the given values (skipping Nones), settling for the
+        'cluster_info' value for the given attribute, and then the default."""
 
         min_val = None
-        for node in nodes:
-            val = node.get(attr)
+        for val in values:
             if val is not None:
                 if min_val is None or val < min_val:
                     min_val = val
 
@@ -172,7 +197,7 @@ def tasks_per_node(self) -> int:
             else:
                 tasks = tasks_per_node
         else:  # Should be a float
-            if self._nodes:
+            if self._node_info:
                 tasks = max(math.floor(tasks_per_node * int(self.min_cpus())), 1)
             else:
                 tasks = 1
@@ -182,31 +207,47 @@ def tasks_per_node(self) -> int:
         return tasks
 
     @var_method
-    def chunk_ids(self):
-        """A list of indices of the available chunks."""
-        return list(range(len(self._chunks)))
+    def chunks(self):
+        """A list of the available chunk ids. For the specific chunk selected for
+        a test, see 'sched.chunk_id' - even when permuting over 'sched.chunks'."""
+
+        return list(range(self._chunks))
 
     @var_method
     def chunk_size(self):
         """The size of each chunk."""
 
-        if self._chunks:
-            return str(len(self._chunks[0]))
-        else:
-            return ''
+        return str(len(self._chunk))
+
+    @var_method
+    def chunk_nodes(self):
+        """A list of nodes in the selected chunk. The allocation will be on these
+        nodes or a subset of them."""
+
+        return [node for node in self._chunk]
+
+    @var_method
+    def chunk_id(self):
+        """The id of the chunk that was selected."""
+
+        return self._chunk_id
+
+    @var_method
+    def chunk_ids(self):
+        """This variable is deprecated and always returns an empty list. The
+        test resolver will warn the user if this is used."""
+
+        return []
 
     @var_method
     def requested_nodes(self):
-        """Number of requested nodes."""
+        """The requested node count or range."""
 
-        if self._chunks:
-            nmin, nmax = calc_node_range(self._sched_config, len(self._chunks[0]))
-            if nmin == nmax:
-                return str(nmax)
-            else:
-                return '{}-{}'.format(nmin, nmax)
+        nmin, nmax = calc_node_range(self._sched_config, len(self._chunk))
+        if nmin == nmax:
+            return str(nmax)
         else:
-            return ''
+            return '{}-{}'.format(nmin, nmax)
 
@@ -220,17 +261,23 @@ def node_list_id(self):
 
     @var_method
     def min_cpus(self):
-        """Get a minimum number of cpus available on each (filtered) noded. Defaults to
+        """Get a minimum number of cpus available on each (filtered) node. Defaults to
         1 if unknown."""
 
-        return self._get_min(list(self._nodes.values()), 'cpus', 1)
+        return self._get_min([node.get('cpus') for node in self._node_info.values()],
+                             'cpus', 1)
 
     @var_method
     def min_mem(self):
         """Get a minimum for any node across each (filtered) nodes. Returns a value
         in bytes (4 GB if unknown)."""
 
-        return self._get_min(list(self._nodes.values()), 'mem', 4*1024**3)
+        return self._get_min([node.get('mem') for node in self._node_info.values()],
+                             'mem', 4*1024**3)
 
     @var_method
     def nodes(self) -> int:
@@ -238,8 +285,8 @@ def nodes(self) -> int:
         supports auto-detection, this will be the filtered count of nodes. Otherwise,
         this will be the 'cluster_info.node_count' value, or 1 if that isn't set."""
 
-        if self._nodes:
-            return len(self._nodes)
+        if self._node_info:
+            return len(self._node_info)
 
         if self._sched_config['cluster_info'].get('node_count') is None:
             return 1
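The None-skipping in _get_min is what makes these defaults behave when node data is
incomplete; missing attributes simply drop out of the comparison. A sketch with made-up
node info:

    node_info = {
        'node01': {'cpus': 16, 'mem': 64 * 1024**3},
        'node02': {'cpus': 8,  'mem': 128 * 1024**3},
        'node03': {},                        # this node reported no data
    }

    cpus = [node.get('cpus') for node in node_info.values()]   # [16, 8, None]
    # self._get_min(cpus, 'cpus', 1) -> 8; only if every value were None would it
    # fall back to the cluster_info value and then the default.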
@@ -251,42 +298,58 @@
     def node_list(self) -> NodeList:
         """The list of node names on the system. If the scheduler supports
         auto-detection, will be the filtered list. This list will otherwise be
         empty."""
 
-        if self._nodes:
-            return NodeList(list(self._nodes.keys()))
+        if self._node_info:
+            return NodeList(list(self._node_info.keys()))
         else:
             return NodeList([])
 
-    @dfr_var_method
-    def test_nodes(self) -> int:
-        """The number of nodes for this specific test, determined once the test
-        has an allocation. Note that the allocation size may be larger than this
-        number."""
+    @var_method
+    def test_nodes(self) -> Union[int, DeferredVariable]:
+        """The number of nodes for this specific test's job. This may not be known
+        (and hence deferred) until the test has acquired the allocation."""
 
-        return len(self._nodes)
+        if self._test_nodes:
+            return len(self._test_nodes)
+        else:
+            return DeferredVariable()
 
-    @dfr_var_method
-    def test_node_list(self) -> NodeList:
-        """The list of nodes by name allocated for this test. Note that more
-        nodes than this may exist in the allocation."""
+    @var_method
+    def test_node_list(self) -> Union[NodeList, DeferredVariable]:
+        """The list of nodes by name allocated for this test's job. This is available
+        at test creation time when using chunking, but is otherwise deferred."""
 
-        return NodeList(list(self._nodes.keys()))
+        if self._test_nodes:
+            return self._test_nodes
+        else:
+            return DeferredVariable()
 
-    @dfr_var_method
+    @var_method
     def test_min_cpus(self):
         """The min cpus for each node in the chunk. Defaults to 1 if no info
         is available."""
 
-        return self._get_min(list(self._nodes.values()), 'cpus', 1)
+        if self._test_nodes:
+            cpus = [self._node_info[node].get('cpus') for node in self._test_nodes]
+            return self._get_min(cpus, 'cpus', 1)
+        else:
+            return DeferredVariable()
 
-    @dfr_var_method
+    @var_method
     def test_min_mem(self):
         """The min memory for each node in the chunk in bytes. Defaults to 4 GB if
         no info is available."""
 
-        return self._get_min(list(self._nodes.values()), 'mem', 4*1024**3)
+        if self._test_nodes:
+            mems = [self._node_info[node].get('mem') for node in self._test_nodes]
+            return self._get_min(mems, 'mem', 4*1024**3)
+        else:
+            return DeferredVariable()
 
-    @dfr_var_method
+    @var_method
     def tasks_total(self):
         """Total tasks to create, based on number of nodes actually acquired."""
 
-        return self._sched_config.get('tasks_per_node', 1) * len(self._nodes)
+        if self._test_nodes:
+            return self._sched_config.get('tasks_per_node', 1) * len(self._test_nodes)
+        else:
+            return DeferredVariable()
diff --git a/lib/pavilion/test_config/file_format.py b/lib/pavilion/test_config/file_format.py
index c8a2ee9e2..88827acd0 100644
--- a/lib/pavilion/test_config/file_format.py
+++ b/lib/pavilion/test_config/file_format.py
@@ -331,10 +331,9 @@ class TestConfigLoader(yc.YamlConfigLoader):
                 "group of tests. Not to be set by the user."),
         yc.StrElem(
             'chunk', default='any',
-            help_text="The scheduler chunk to run on. Will run on 'any' chunk "
-                      "by default, but the chunk may be specified by number. The "
-                      "available chunk ids are in the sched.chunks variable, and"
-                      "can be permuted on."
+            help_text="This config element is deprecated. Use "
+                      "schedule.chunking.group to set the chunking group "
+                      "(or specify a specific chunk) instead."
         ),
         CondCategoryElem(
             'only_if', sub_elem=yc.ListElem(sub_elem=yc.StrElem()),
diff --git a/lib/pavilion/var_dict.py b/lib/pavilion/var_dict.py
index 9f1223f5d..b01d20201 100644
--- a/lib/pavilion/var_dict.py
+++ b/lib/pavilion/var_dict.py
@@ -28,6 +28,9 @@ def _func(self):
         # This is primarily to enforce the fact that these can't take arguments
         value = func(self)
 
+        # Deferred values can't be normalized yet; pass them through untouched.
+        if isinstance(value, DeferredVariable):
+            return value
+
         norm_value = normalize_value(value)
         if norm_value is None:
             raise ValueError(
@@ -162,7 +165,7 @@ def __getitem__(self, key):
             if self.DEFER_ERRORS:
                 formatted_err = traceback.format_exc()
                 msg = "Error getting key '{}' (See logs for full traceback): {}"\
-                      .format(key, err)
+                    .format(key, err)
                 self.data[key] = "<{}>".format(msg)
                 self._errors.append(msg + '\n' + formatted_err)
             else:
diff --git a/test/tests/sched_tests.py b/test/tests/sched_tests.py
index 5c4788f92..6604e0701 100644
--- a/test/tests/sched_tests.py
+++ b/test/tests/sched_tests.py
@@ -91,7 +91,8 @@ def test_sched_var_values(self):
 
         sched_vars = schedulers.SchedulerVariables(
             config,
-            nodes=nodes,
+            node_info=nodes,
             chunks=chunks,
+            chunk_id=0,
             node_list_id=5,
             deferred=False,
@@ -144,7 +145,8 @@ def test_sched_var_values_basic(self):
 
         sched_vars = schedulers.SchedulerVariables(
             config,
-            nodes=nodes,
+            node_info=nodes,
             chunks=chunks,
+            chunk_id=0,
             node_list_id=5,
             deferred=False,
@@ -252,7 +254,7 @@ def test_chunking_size(self):
             chunk_size = size
 
-        chunk_id = (node_list_id, chunk_size, 'contiguous', extra)
-        chunks = dummy._chunks[chunk_id]
+        chunking = {'size': size, 'node_selection': 'contiguous', 'extra': extra}
+        chunks = dummy._chunk_sets.get_chunk_set(node_list_id, chunking).chunks
 
         for chunk in chunks:
             self.assertEqual(len(chunk), chunk_size)
@@ -294,8 +296,9 @@ def test_node_selection(self):
         # Exercise each node selection method.
         sched_vars = dummy.get_initial_vars(sched_config)
         node_list_id = int(sched_vars['node_list_id'])
-        chunks = dummy._chunks[(node_list_id, chunk_size,
-                                select, sconfig.BACKFILL)]
+        chunking = {'size': chunk_size, 'node_selection': select,
+                    'extra': sconfig.BACKFILL}
+        chunks = dummy._chunk_sets.get_chunk_set(node_list_id, chunking).chunks
 
         # This is here to debug node selection and visually examine the node
         # selection algorithm results, as they are mostly random.
diff --git a/test/tests/slurm_tests.py b/test/tests/slurm_tests.py
index 612c1e836..4102d6367 100644
--- a/test/tests/slurm_tests.py
+++ b/test/tests/slurm_tests.py
@@ -115,7 +115,11 @@ def test_node_list_parsing(self):
         )
 
         for ex, answer in examples:
-            nodes = slurm.parse_node_list(ex)
+            try:
+                nodes = slurm.parse_node_list(ex)
+            except Exception as err:
+                self.fail("Failed on example '{}': {}".format(ex, err))
+
             self.assertEqual(nodes, answer)
 
         bad_examples = (