Resurrected scheduler changes. #535

Draft · wants to merge 1 commit into base: develop
3 changes: 1 addition & 2 deletions lib/pavilion/jobs.py
@@ -114,8 +114,7 @@ def save_node_data(self, nodes: Nodes):
with (self.path/self.NODE_INFO_FN).open('wb') as data_file:
pickle.dump(nodes, data_file)
except OSError as err:
raise JobError(
"Could not save node data: {}".format(err))
raise JobError("Could not save node data: {}".format(err))

def load_sched_data(self) -> Nodes:
"""Load the scheduler data that was saved from the kickoff time."""
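The change above only reflows the `raise`; the surrounding pattern it sits in is a pickle round trip guarded by `JobError`. A minimal, self-contained sketch of that pattern (the class, function, and filename below are stand-ins, not Pavilion's actual definitions):

```python
import pickle
from pathlib import Path


class JobError(RuntimeError):
    """Stand-in for pavilion.jobs.JobError."""


NODE_INFO_FN = 'nodes.pkl'  # stand-in for the job's node-data filename


def save_node_data(job_path: Path, nodes: dict):
    """Persist node data next to the job, wrapping I/O failures."""
    try:
        with (job_path / NODE_INFO_FN).open('wb') as data_file:
            pickle.dump(nodes, data_file)
    except OSError as err:
        raise JobError("Could not save node data: {}".format(err))


def load_node_data(job_path: Path) -> dict:
    """Read the node data back, as the kickoff-time loader would."""
    try:
        with (job_path / NODE_INFO_FN).open('rb') as data_file:
            return pickle.load(data_file)
    except (OSError, pickle.UnpicklingError) as err:
        raise JobError("Could not load node data: {}".format(err))
```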
2 changes: 1 addition & 1 deletion lib/pavilion/parsers/expressions.py
@@ -346,7 +346,7 @@ def math_expr(self, items) -> lark.Token:

return base_tok

# This have been generalized.
# These have been generalized.
add_expr = math_expr
mult_expr = math_expr

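The `add_expr = math_expr` / `mult_expr = math_expr` aliases are the generalization the comment refers to: several grammar rules share one handler. Below is a self-contained sketch of the same pattern, assuming a `lark.Transformer`-style parser; the grammar and names are illustrative, not Pavilion's expression grammar:

```python
from lark import Lark, Transformer

# Illustrative grammar with separate rules for addition-level and
# multiplication-level expressions.
GRAMMAR = r"""
    ?start: add_expr
    add_expr: mult_expr (ADD_OP mult_expr)*
    mult_expr: NUMBER (MUL_OP NUMBER)*

    ADD_OP: "+" | "-"
    MUL_OP: "*" | "/"

    %import common.NUMBER
    %import common.WS
    %ignore WS
"""


class Calc(Transformer):
    """One folding handler serves every binary-operator rule."""

    def math_expr(self, items):
        # items alternate operand, operator, operand, ...; a lone operand
        # simply passes through.
        result = float(items[0])
        for op, val in zip(items[1::2], items[2::2]):
            if op == '+':
                result += float(val)
            elif op == '-':
                result -= float(val)
            elif op == '*':
                result *= float(val)
            else:
                result /= float(val)
        return result

    # The generalization: both rules alias the same method.
    add_expr = math_expr
    mult_expr = math_expr


print(Calc().transform(Lark(GRAMMAR).parse("2 + 3 * 4")))  # 14.0
```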
45 changes: 42 additions & 3 deletions lib/pavilion/resolver.py
@@ -598,6 +598,15 @@ def load_raw_configs(self, tests, host, modes):
test_cfg['host'] = host
test_cfg['modes'] = modes

# Set up a schedule section if one doesn't already exist, and
# add a default chunking group (the suite.test_name) if needed
schedule = test_cfg.get('schedule', {})
test_cfg['schedule'] = schedule
chunking = schedule.get('chunking', {})
test_cfg['schedule']['chunking'] = chunking
if not chunking.get('group'):
chunking['group'] = '{}.{}'.format(test_suite, test_cfg_name)

all_tests[test_suite] = suite_tests

# Now that we know we've loaded and resolved a given suite,
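The hunk above guarantees that every test config ends up with a `schedule.chunking.group`, defaulting to `<suite>.<test>`. A dependency-free sketch of that defaulting (using `setdefault`/`get` rather than the exact lines in the diff; the suite and test names are made up):

```python
def default_chunk_group(test_cfg: dict, test_suite: str, test_cfg_name: str) -> dict:
    """Ensure schedule.chunking exists and give it a '<suite>.<test>' group."""
    schedule = test_cfg.setdefault('schedule', {})
    chunking = schedule.setdefault('chunking', {})
    if not chunking.get('group'):
        chunking['group'] = '{}.{}'.format(test_suite, test_cfg_name)
    return test_cfg


cfg = default_chunk_group({}, 'mysuite', 'mytest')
assert cfg['schedule']['chunking']['group'] == 'mysuite.mytest'
```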
@@ -1049,17 +1058,47 @@ def resolve_permutations(self, test_cfg: Dict, base_var_man: variables.VariableS
# that they are permutation variables. These are variables that refer to themselves
# (either directly (a.b->a.c) or indirectly (a.b -> d -> a.c).
all_var_men = []
resolvable = []
for var_man in var_men:
basic_per_vars = [var for var_set, var in used_per_vars if var_set == 'var']
_, could_resolve = var_man.resolve_references(partial=True, skip_deps=basic_per_vars)

# Resolve permutations only for those 'could_resolve' variables that
# we actually permute over.
all_var_men.extend(var_man.get_permutations(
[('var', var_name) for var_name in could_resolve
if var_name in could_resolve]))

resolvable = [('var', var_name) for var_name in could_resolve
if var_name in could_resolve]
all_var_men.extend(var_man.get_permutations(resolvable))
# Remove vars we just resolved. It's ok that we grab the set from the last permutation,
# as they will always be the same across all permutations.
for resolved in resolvable:
used_per_vars.remove(resolved)

var_men = all_var_men

# Permute on 'sched.chunks'. This one is special - it depends on the scheduler
# configuration, but it must be permuted on before the scheduler vars are pulled.
if ('sched', 'chunks') in used_per_vars:
used_per_vars.remove(('sched', 'chunks'))
# Make and resolve a dummy variable manager.
dummy_var_man = copy.deepcopy(var_men[0])
dummy_var_man.resolve_references(partial=True)
# Resolve the scheduler section and get a chunk count, otherwise just use 1 chunk.
try:
dummy_sched_cfg = resolve.test_config(sched_cfg, dummy_var_man)
except KeyError:
chunk_count = 1
else:
chunk_count = sched.get_chunk_count(dummy_sched_cfg)

# There's not actually anything to permute across yet, so just make extra duplicates
# of each test for each chunk. They'll get chunk ids assigned automatically.
all_var_men = []
for var_man in var_men:
for i in range(chunk_count):
all_var_men.append(copy.deepcopy(var_man))
var_men = all_var_men

# Everything left at this point will require the sched vars to deal with.
all_var_men = []
for var_man in var_men:
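The `sched.chunks` permutation above reduces to multiplying the existing permutations by the chunk count, which is obtained by resolving the schedule section against a throwaway variable manager. A minimal sketch of just the multiplication step (plain dicts stand in for Pavilion's variable managers):

```python
import copy


def permute_over_chunks(var_men: list, chunk_count: int) -> list:
    """Duplicate every variable-manager permutation once per chunk.

    Chunk ids are assigned later, so the copies start out identical."""
    expanded = []
    for var_man in var_men:
        for _ in range(chunk_count):
            expanded.append(copy.deepcopy(var_man))
    return expanded


# Two permutations and three chunks yield six variable managers.
print(len(permute_over_chunks([{'a': 1}, {'a': 2}], chunk_count=3)))  # 6
```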
166 changes: 64 additions & 102 deletions lib/pavilion/schedulers/advanced.py
@@ -10,7 +10,8 @@
from pavilion.status_file import STATES
from pavilion.test_run import TestRun
from pavilion.types import NodeInfo, Nodes, NodeList, NodeSet, NodeRange
from .config import validate_config, AVAILABLE, BACKFILL, calc_node_range
from .chunking import ChunkSetManager, Chunks
from .config import validate_config, AVAILABLE, calc_node_range, SchedConfigError
from .scheduler import SchedulerPlugin, SchedulerPluginError
from .vars import SchedulerVariables

@@ -30,7 +31,7 @@ def __init__(self, name, description, priority=SchedulerPlugin.PRIO_COMMON):

self._nodes = None # type: Union[Nodes, None]
self._node_lists = [] # type: List[NodeList]
self._chunks = ChunksByNodeListId({}) # type: ChunksByNodeListId
self._chunk_sets = ChunkSetManager()

# These additional methods need to be defined for advanced schedulers.

@@ -77,6 +78,37 @@ def _transform_raw_node_data(self, sched_config, node_data, extra) -> NodeInfo:

raise NotImplementedError("This must be implemented by the scheduler plugin.")

def get_chunk_count(self, raw_sched_config: dict) -> int:
"""Get the number of chunks for the given configuration. On config errors, 1 is
returned and errors are deferred until actual variable retrieval time."""

try:
sched_config = validate_config(raw_sched_config)
except SchedConfigError:
return 1

if self._nodes is None:
self._nodes = self._get_system_inventory(sched_config)
filtered_nodes, filter_reasons = self._filter_nodes(sched_config)
filtered_nodes.sort()

chunking = sched_config['chunking']
chunks = Chunks(filtered_nodes, chunking['size'], chunking['extra'],
chunking['node_selection'])
return len(chunks)

def _get_node_list_id(self, nodes: NodeList) -> int:
"""Get the node list lookup id given a list of nodes."""
# NOTE: This looks kind of bad, and is 40% slower than creating a hashable list
# and using a dict, but that only amounts to 0.02 seconds for 1000 lists of 1000 items.
try:
node_list_id = self._node_lists.index(nodes)
except ValueError:
node_list_id = len(self._node_lists)
self._node_lists.append(nodes)

return node_list_id

def _get_initial_vars(self, sched_config: dict) -> SchedulerVariables:
"""Get initial variables (and chunks) for this scheduler."""

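`get_chunk_count()` delegates the actual counting to the new `Chunks` class, which is not part of this diff. Going by the removed `_get_chunks()` further down, the arithmetic is roughly the node count divided by the chunk size, with 0/None meaning the whole node list. A back-of-the-envelope sketch of that relationship (an estimate, not the `Chunks` implementation):

```python
def estimated_chunk_count(node_count: int, chunk_size) -> int:
    """Rough chunk count: full chunks of `chunk_size` nodes, at least one."""
    if not node_count:
        return 1
    if chunk_size in (0, None) or chunk_size > node_count:
        chunk_size = node_count  # 0/None means "use every node"
    return max(node_count // chunk_size, 1)


print(estimated_chunk_count(100, 30))    # 3 full chunks of 30 nodes
print(estimated_chunk_count(100, None))  # 1 chunk holding all 100 nodes
```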
@@ -120,16 +152,30 @@ def _get_initial_vars(self, sched_config: dict) -> SchedulerVariables:
.format(min_nodes, max_nodes, len(filtered_nodes),
reasons, pprint.pformat(sched_config)))

try:
node_list_id = self._node_lists.index(filtered_nodes)
except ValueError:
node_list_id = len(self._node_lists)
self._node_lists.append(filtered_nodes)
node_list_id = self._get_node_list_id(filtered_nodes)

chunks = self._get_chunks(node_list_id, sched_config)
chunking = sched_config['chunking']
if self._chunk_sets.has_chunk_set(node_list_id, chunking):
chunks = self._chunk_sets.get_chunk_set(node_list_id, chunking)
else:
chunks = self._chunk_sets.create_chunk_set(node_list_id, chunking,
filtered_nodes)
chunk_id, chunk = chunks.get_group_chunk(chunking['group'])

test_nodes = None
if not sched_config['share_allocation']:
if sched_config['chunking']['size'] not in (0, None):
chunk_min, chunk_max = calc_node_range(sched_config, len(chunk))
test_nodes = list(chunk[0:chunk_max])

sched_vars = self.VAR_CLASS(
sched_config=sched_config,
node_info=self._nodes,
node_list_id=node_list_id,
chunks=chunks.chunks,
chunk_id=chunk_id,
test_nodes=test_nodes)

sched_vars = self.VAR_CLASS(sched_config, nodes=self._nodes, chunks=chunks,
node_list_id=node_list_id)
sched_vars.add_errors(errors)
return sched_vars

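`ChunkSetManager` replaces the old `_chunks` dict that was keyed on `(node_list_id, size, node_selection, extra)`; its implementation isn't shown in this diff, but the `has_chunk_set`/`get_chunk_set`/`create_chunk_set` calls above amount to a get-or-create cache. A generic sketch of that pattern with a plain dict (the key mirrors the removed code's tuple; the chunk-building step is stubbed out):

```python
class ChunkCache:
    """Illustrative get-or-create cache; not Pavilion's ChunkSetManager."""

    def __init__(self):
        self._cache = {}

    @staticmethod
    def _key(node_list_id: int, chunking: dict) -> tuple:
        return (node_list_id, chunking['size'],
                chunking['node_selection'], chunking['extra'])

    def has_chunk_set(self, node_list_id: int, chunking: dict) -> bool:
        return self._key(node_list_id, chunking) in self._cache

    def get_chunk_set(self, node_list_id: int, chunking: dict):
        return self._cache[self._key(node_list_id, chunking)]

    def create_chunk_set(self, node_list_id: int, chunking: dict, nodes: list):
        # The real code builds a Chunks(nodes, size, extra, node_selection)
        # object here; a single chunk of all nodes stands in for it.
        chunk_set = [list(nodes)]
        self._cache[self._key(node_list_id, chunking)] = chunk_set
        return chunk_set
```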
@@ -148,7 +194,7 @@ def get_final_vars(self, test: TestRun) -> SchedulerVariables:

sched_config = validate_config(test.config['schedule'])

return self.VAR_CLASS(sched_config, nodes=nodes, deferred=False)
return self.VAR_CLASS.finalize(sched_config, nodes)

def _get_system_inventory(self, sched_config: dict) -> Union[Nodes, None]:
"""Returns a dictionary of node data, or None if the scheduler does not
@@ -241,55 +287,6 @@ def _filter_custom(self, sched_config: dict, node_name: str, node: NodeInfo) \

return None

def _get_chunks(self, node_list_id, sched_config) -> List[NodeSet]:
"""Chunking is specific to the node list, chunk size, and node selection
settings of a job. The actual chunk used by a test_run won't be known until
after the test is at least partially resolved, however. Until then, it only
knows what chunks are available.

This method retrieves or creates a list of ChunkInfo objects, and returns
it."""

nodes = list(self._node_lists[node_list_id])

chunk_size = sched_config['chunking']['size']
# Chunk size 0/null is all the nodes.
if chunk_size in (0, None) or chunk_size > len(nodes):
chunk_size = len(nodes)
chunk_extra = sched_config['chunking']['extra']
node_select = sched_config['chunking']['node_selection']

chunk_id = (node_list_id, chunk_size, node_select, chunk_extra)
# If we already have chunks for our node list and settings, just return what
# we've got.
if chunk_id in self._chunks:
return self._chunks[chunk_id]

# We can potentially have no nodes, in which case return an empty chunk.
if chunk_size == 0:
self._chunks[chunk_id] = [NodeSet(frozenset([]))]
return self._chunks[chunk_id]

chunks = []
for i in range(len(nodes)//chunk_size):
# Apply the selection function and get our chunk nodes.
chunk = self.NODE_SELECTION[node_select](nodes, chunk_size)
# Filter out any chosen from our node list.
nodes = [node for node in nodes if node not in chunk]
chunks.append(chunk)

if nodes and chunk_extra == BACKFILL:
backfill = chunks[-1][:chunk_size - len(nodes)]
chunks.append(backfill + nodes)

chunk_info = []
for chunk in chunks:
chunk_info.append(NodeSet(frozenset(chunk)))

self._chunks[chunk_id] = chunk_info

return chunk_info

def schedule_tests(self, pav_cfg, tests: List[TestRun]):
"""Schedule each of the given tests using this scheduler using a
separate allocation (if applicable) for each.
@@ -301,52 +298,14 @@ def schedule_tests(self, pav_cfg, tests: List[TestRun]):

# type: Dict[FrozenSet[str], List[TestRun]]
by_chunk = collections.defaultdict(lambda: [])
usage = collections.defaultdict(lambda: 0) # type: Dict[FrozenSet[str], int]
sched_configs = {} # type: Dict[str, dict]

for test in tests:
node_list_id = int(test.var_man.get('sched.node_list_id'))

sched_config = validate_config(test.config['schedule'])
sched_configs[test.full_id] = sched_config
chunk_spec = test.config.get('chunk')
if chunk_spec != 'any':
# This is validated in test object creation.
chunk_spec = int(chunk_spec)

chunk_size = sched_config['chunking']['size']
node_select = sched_config['chunking']['node_selection']
chunk_extra = sched_config['chunking']['extra']

node_list = self._node_lists[node_list_id]
if chunk_size in (None, 0) or chunk_size > len(node_list):
chunk_size = len(node_list)

chunks_id = (node_list_id, chunk_size, node_select, chunk_extra)

chunks = self._chunks[chunks_id]

if chunk_spec == 'any':
least_used = None
least_used_chunk = None
for chunk in chunks:
chunk_usage = usage[chunk]
if chunk_usage == 0:
least_used_chunk = chunk
break
elif least_used is None or chunk_usage < least_used:
least_used = chunk_usage
least_used_chunk = chunk

usage[least_used_chunk] += 1
by_chunk[least_used_chunk].append(test)
else:
if chunk_spec > len(chunks):
raise SchedulerPluginError(
"Test selected chunk '{}', but there are only {} chunks "
"available.".format(chunk_spec, len(chunks)))
chunk = chunks[chunk_spec]
by_chunk[chunk].append(test)

chunk = test.var_man.get('sched.chunk')
by_chunk[chunk].append(test)

for chunk, tests in by_chunk.items():
self._schedule_chunk(pav_cfg, chunk, tests, sched_configs)
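After this change `schedule_tests()` no longer recomputes chunk membership; each test already carries its chunk in `sched.chunk`, so scheduling is just a group-by. A toy model of that grouping (plain dicts stand in for `TestRun` and its variable manager):

```python
import collections


def group_by_chunk(tests: list) -> dict:
    """Bucket tests by the chunk recorded in their variables."""
    by_chunk = collections.defaultdict(list)
    for test in tests:
        by_chunk[test['sched.chunk']].append(test)
    return by_chunk


tests = [{'name': 'a', 'sched.chunk': '0'},
         {'name': 'b', 'sched.chunk': '1'},
         {'name': 'c', 'sched.chunk': '0'}]
for chunk, grouped in group_by_chunk(tests).items():
    print(chunk, [t['name'] for t in grouped])  # 0 ['a', 'c'] then 1 ['b']
```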
@@ -361,9 +320,11 @@ def _schedule_chunk(self, pav_cfg, chunk: NodeSet, tests: List[TestRun],
sched_configs: Dict[str, dict]):

# There are three types of test launches.
# 1. Tests that can share an allocation (
# 1. Tests that can share an allocation
share_groups = collections.defaultdict(list)
# 2. Tests that want the scheduler to figure out their exact nodes.
flex_tests: List[TestRun] = []
# 3. Tests that run by themselves.
indi_tests: List[TestRun] = []

for test in tests:
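Picking up on the comment block above (the rest of the loop body is elided in this view): the method sorts tests into the three launch styles before handing them off. The sketch below models only that bookkeeping; the predicates deciding which bucket a test lands in are placeholders, since those rules live in the elided code:

```python
import collections
from typing import Callable, Dict, List, Tuple


def split_tests(tests: List[dict],
                shares_alloc: Callable[[dict], bool],
                is_flex: Callable[[dict], bool],
                share_key: Callable[[dict], str],
                ) -> Tuple[Dict[str, List[dict]], List[dict], List[dict]]:
    """Split tests into shared-allocation groups, flex tests, and individual tests."""
    share_groups = collections.defaultdict(list)  # 1. share an allocation
    flex_tests = []                               # 2. scheduler picks exact nodes
    indi_tests = []                               # 3. run by themselves
    for test in tests:
        if shares_alloc(test):
            share_groups[share_key(test)].append(test)
        elif is_flex(test):
            flex_tests.append(test)
        else:
            indi_tests.append(test)
    return share_groups, flex_tests, indi_tests
```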
@@ -470,6 +431,7 @@ def _schedule_flex_chunk(self, pav_cfg, tests: List[TestRun],
chunking, leaving the node picking to the scheduler."""

for test in tests:
print(chunk)
node_info = {node: self._nodes[node] for node in chunk}

try:
23 changes: 19 additions & 4 deletions lib/pavilion/schedulers/basic.py
@@ -7,7 +7,7 @@
from pavilion.jobs import Job, JobError
from pavilion.status_file import STATES
from pavilion.test_run import TestRun
from pavilion.types import NodeInfo, Nodes
from pavilion.types import NodeInfo, Nodes, NodeList
from .config import validate_config, calc_node_range
from .scheduler import SchedulerPlugin, SchedulerPluginError
from .vars import SchedulerVariables
@@ -18,9 +18,16 @@ class SchedulerPluginBasic(SchedulerPlugin, ABC):
on manually set parameters in 'schedule.cluster_info'."""

def _get_initial_vars(self, sched_config: dict) -> SchedulerVariables:
"""Get the initial variables for the basic scheduler."""
"""Get the initial variables for the basic scheduler.
"""

return self.VAR_CLASS(sched_config)
return self.VAR_CLASS(
sched_config=sched_config,
node_info=Nodes({}),
chunks=[NodeList([])],
chunk_id=0,
node_list_id=0,
)

def get_final_vars(self, test: TestRun) -> SchedulerVariables:
"""Gather node information from within the allocation."""
Expand All @@ -37,7 +44,15 @@ def get_final_vars(self, test: TestRun) -> SchedulerVariables:
for node in alloc_nodes:
nodes[node] = self._get_alloc_node_info(node)

return self.VAR_CLASS(sched_config, nodes=nodes, deferred=False)
return self.VAR_CLASS(
sched_config=sched_config,
node_info=nodes,
chunks=[NodeList(list(nodes.keys()))],
chunk_id=0,
node_list_id=0,
test_nodes=NodeList(list(nodes.keys())),
deferred=False,
)

def _get_alloc_node_info(self, node_name) -> NodeInfo:
"""Given that this is running on an allocation, get information about
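The basic scheduler now fills the same keyword arguments as the advanced one: empty placeholders before kickoff, and the allocation's real node list afterwards. A toy model of that before/after shape (`SchedVars` is a stand-in dataclass, not Pavilion's `SchedulerVariables`):

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class SchedVars:
    """Stand-in for SchedulerVariables; only the shape matters here."""
    node_info: Dict[str, dict]
    chunks: List[List[str]]
    chunk_id: int = 0
    node_list_id: int = 0
    test_nodes: List[str] = field(default_factory=list)
    deferred: bool = True


def initial_vars() -> SchedVars:
    # Before kickoff nothing is known, so everything is an empty placeholder.
    return SchedVars(node_info={}, chunks=[[]])


def final_vars(alloc_nodes: List[str]) -> SchedVars:
    # Inside the allocation the node list is real; one chunk holds all of it.
    info = {node: {} for node in alloc_nodes}
    return SchedVars(node_info=info, chunks=[list(alloc_nodes)],
                     test_nodes=list(alloc_nodes), deferred=False)


print(final_vars(['node01', 'node02']).test_nodes)  # ['node01', 'node02']
```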