Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: improved GPU job support #173

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions snakemake_executor_plugin_slurm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
JobExecutorInterface,
)
from snakemake_interface_common.exceptions import WorkflowError
from snakemake_executor_plugin_slurm_jobstep import get_cpus_per_task
from snakemake_executor_plugin_slurm_jobstep import get_cpu_setting

from .utils import delete_slurm_environment, delete_empty_dirs
from .utils import delete_slurm_environment, delete_empty_dirs, set_gres_string


@dataclass
Expand Down Expand Up @@ -219,6 +219,8 @@ def run_job(self, job: JobExecutorInterface):
if self.workflow.executor_settings.requeue:
call += " --requeue"

call += set_gres_string(job)

if job.resources.get("clusters"):
call += f" --clusters {job.resources.clusters}"

Expand Down Expand Up @@ -249,7 +251,11 @@ def run_job(self, job: JobExecutorInterface):

# fixes #40 - set ntasks regardless of mpi, because
# SLURM v22.05 will require it for all jobs
call += f" --ntasks={job.resources.get('tasks', 1)}"
gpu_job = job.resources.get("gpus") or "gpu" in job.resources.get("gres", "")
if gpu_job:
call += f" --ntasks-per-gpu={job.resources.get('tasks', 1)}"
else:
call += f" --ntasks={job.resources.get('tasks', 1)}"
# MPI job
if job.resources.get("mpi", False):
if not job.resources.get("tasks_per_node") and not job.resources.get(
Expand All @@ -260,8 +266,9 @@ def run_job(self, job: JobExecutorInterface):
"specified. Assuming 'tasks_per_node=1'."
"Probably not what you want."
)

call += f" --cpus-per-task={get_cpus_per_task(job)}"
# we need to set cpus-per-task OR cpus-per-gpu, the function
# will return a string with the corresponding value
call += f" {get_cpu_setting(job, gpu_job)}"

if job.resources.get("slurm_extra"):
self.check_slurm_extra(job)
Expand Down
67 changes: 67 additions & 0 deletions snakemake_executor_plugin_slurm/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# utility functions for the SLURM executor plugin

import os
import re
from pathlib import Path

from snakemake_interface_executor_plugins.jobs import (
JobExecutorInterface,
)

cmeesters marked this conversation as resolved.
Show resolved Hide resolved

def delete_slurm_environment():
"""
Expand Down Expand Up @@ -40,3 +45,65 @@ def delete_empty_dirs(path: Path) -> None:
except (OSError, FileNotFoundError) as e:
# Provide more context in the error message
raise OSError(f"Failed to remove empty directory {path}: {e}") from e


def set_gres_string(job: JobExecutorInterface) -> str:
"""
Function to set the gres string for the SLURM job
based on the resources requested in the job.
"""
# generic resources (GRES) arguments can be of type
# "string:int" or "string:string:int"
gres_re = re.compile(r"^[a-zA-Z0-9_]+(:[a-zA-Z0-9_]+)?:\d+$")
# gpu model arguments can be of type "string"
gpu_model_re = re.compile(r"^[a-zA-Z0-9_]+$")
# gpus can be of type "int" or "string:int"
gpus_re = re.compile(r"^\d+$|^[a-zA-Z0-9_]+:\d+$")

if job.resources.get("gres"):
# Validate GRES format (e.g., "gpu:1", "gpu:tesla:2")
gres = job.resources.gres
if not gres_re.match(gres):
raise WorkflowError(
f"Invalid GRES format: {gres}. Expected format: "
"'<name>:<number>' or '<name>:<type>:<number>' "
"(e.g., 'gpu:1' or 'gpu:tesla:2')"
)
gres_string = f" --gres={job.resources.gres}"
if job.resources.get("gpu"):
# ensure that gres is not set, if gpu and gpu_model are set
if job.resources.get("gres"):
raise WorkflowError("GRES and GPU are set. Please only set one of them.")
# ensure that 'gpu' is an integer
if not isinstance(job.resources.gpu, int):
raise WorkflowError(
"The 'gpu' resource must be an integer. "
f"Got: {job.resources.gpu} ({type(job.resources.gpu)})."
)
gres_string = f" --gpus={job.resources.gpu}"
if job.resources.get("gpu"):
# ensure that gres is not set, if gpu and gpu_model are set
if job.resources.get("gres"):
raise WorkflowError("GRES and GPU are set. Please only set one" " of them.")
# validate GPU format
if not gpus_re.match(str(job.resources.gpu)):
raise WorkflowError(
f"Invalid GPU format: {job.resources.gpu}. "
"Expected format: '<number>' or '<name>:<number>' "
"(e.g., '1' or 'tesla:2')"
)
gres_string = f" --gpus={job.resources.gpu}"
elif job.resources.get("gpu_model") and job.resources.get("gpu"):
# validate GPU model format
if not gpu_model_re.match(job.resources.gpu_model):
raise WorkflowError(
f"Invalid GPU model format: {job.resources.gpu_model}. "
"Expected format: '<name>' "
"(e.g., 'tesla')"
)
gres_string = f" --gpus:{job.resources.gpu_model}:{job.resources.gpu}"
elif job.resources.get("gpu_model") and not job.resources.get("gpu"):
raise WorkflowError(
"GPU model is set, but no GPU number is given. " "Please set 'gpu' as well."
)
return gres_string if job.resources.get("gres") or job.resources.get("gpu") else ""
cmeesters marked this conversation as resolved.
Show resolved Hide resolved
Loading