Skip to content

Commit

Permalink
Merge pull request #1523 from fractal-analytics-platform/1520-placeholder-re-organize-job-folders-with-per-task-subfolders
Browse files Browse the repository at this point in the history

Re-organize job folders with per-task subfolders
  • Loading branch information
tcompa authored May 28, 2024
2 parents a9fcd59 + a2e9827 commit c627f96
Show file tree
Hide file tree
Showing 25 changed files with 586 additions and 375 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
* API:
* Remove catch of `IntegrityError` in `POST /api/v1/project` (\#1530).
* Dependencies
* Add `fabric` to `dev` dependencies (\#1518).
* Runner:
* Change structure of job folders, introducing per-task subfolders (\#1523).
* Deployment
* Add custom gunicorn/uvicorn worker to handle SIGABRT signal (\#1526).
* Dependencies
* Add `fabric` to `dev` dependencies (\#1518).
* Testing:
* Install and run SSH daemon in CI containers (\#1518).
* Add unit test of SSH connection via fabric/paramiko (\#1518).
Expand Down
148 changes: 114 additions & 34 deletions fractal_server/app/runner/executors/slurm/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class SlurmJob:
submitted for execution on the `FractalSlurmExecutor`; this is
needed in the `_copy_files_from_user_to_server` method, and also to
construct the names of per-task input/output pickle files.
wftask_subfolder_name:
Name of the per-task subfolder (e.g. `7_task_name`).
slurm_script:
Path of SLURM submission script.
slurm_stdout:
Expand Down Expand Up @@ -145,6 +147,7 @@ class SlurmJob:
# Per-task attributes
workerids: tuple[str, ...]
wftask_file_prefixes: tuple[str, ...]
wftask_subfolder_name: str
input_pickle_files: tuple[Path, ...]
output_pickle_files: tuple[Path, ...]
# Slurm configuration
Expand Down Expand Up @@ -214,6 +217,7 @@ class FractalSlurmExecutor(SlurmExecutor):
map_jobid_to_slurm_files: dict[str, tuple[str, str, str]]
keep_pickle_files: bool
slurm_account: Optional[str]
jobs: dict[str, tuple[Future, SlurmJob]]

def __init__(
self,
Expand Down Expand Up @@ -294,32 +298,60 @@ def _cleanup(self, jobid: str) -> None:
self.map_jobid_to_slurm_files.pop(jobid)

def get_input_pickle_file_path(
self, arg: str, prefix: Optional[str] = None
self, *, arg: str, subfolder_name: str, prefix: Optional[str] = None
) -> Path:

prefix = prefix or "cfut"
return self.working_dir / f"{prefix}_in_{arg}.pickle"
output = (
self.working_dir / subfolder_name / f"{prefix}_in_{arg}.pickle"
)
return output

def get_output_pickle_file_path(
self, arg: str, prefix: Optional[str] = None
self, *, arg: str, subfolder_name: str, prefix: Optional[str] = None
) -> Path:
prefix = prefix or "cfut"
return self.working_dir_user / f"{prefix}_out_{arg}.pickle"
return (
self.working_dir_user
/ subfolder_name
/ f"{prefix}_out_{arg}.pickle"
)

def get_slurm_script_file_path(self, prefix: Optional[str] = None) -> Path:
def get_slurm_script_file_path(
self, *, subfolder_name: str, prefix: Optional[str] = None
) -> Path:
prefix = prefix or "_temp"
return self.working_dir / f"{prefix}_slurm_submit.sbatch"
return (
self.working_dir / subfolder_name / f"{prefix}_slurm_submit.sbatch"
)

def get_slurm_stdout_file_path(
self, arg: str = "%j", prefix: Optional[str] = None
self,
*,
subfolder_name: str,
arg: str = "%j",
prefix: Optional[str] = None,
) -> Path:
prefix = prefix or "slurmpy.stdout"
return self.working_dir_user / f"{prefix}_slurm_{arg}.out"
return (
self.working_dir_user
/ subfolder_name
/ f"{prefix}_slurm_{arg}.out"
)

def get_slurm_stderr_file_path(
self, arg: str = "%j", prefix: Optional[str] = None
self,
*,
subfolder_name: str,
arg: str = "%j",
prefix: Optional[str] = None,
) -> Path:
prefix = prefix or "slurmpy.stderr"
return self.working_dir_user / f"{prefix}_slurm_{arg}.err"
return (
self.working_dir_user
/ subfolder_name
/ f"{prefix}_slurm_{arg}.err"
)

def submit(
self,
Expand Down Expand Up @@ -599,6 +631,8 @@ def _submit_job(
)
job.single_task_submission = True
job.wftask_file_prefixes = (task_files.file_prefix,)
job.wftask_subfolder_name = task_files.subfolder_name

else:
if not components or len(components) < 1:
raise ValueError(
Expand All @@ -613,47 +647,77 @@ def _submit_job(
)

_prefixes = []
_subfolder_names = []
for component in components:
if isinstance(component, dict):
# This is needed for V2
actual_component = component.get(_COMPONENT_KEY_, None)
else:
actual_component = component
_prefixes.append(
get_task_file_paths(
workflow_dir=task_files.workflow_dir,
workflow_dir_user=task_files.workflow_dir_user,
task_order=task_files.task_order,
component=actual_component,
).file_prefix
_task_file_paths = get_task_file_paths(
workflow_dir=task_files.workflow_dir,
workflow_dir_user=task_files.workflow_dir_user,
task_name=task_files.task_name,
task_order=task_files.task_order,
component=actual_component,
)
_prefixes.append(_task_file_paths.file_prefix)
_subfolder_names.append(_task_file_paths.subfolder_name)
job.wftask_file_prefixes = tuple(_prefixes)

num_subfolders = len(set(_subfolder_names))
if num_subfolders != 1:
error_msg_short = (
f"[_submit_job] Subfolder list has {num_subfolders} "
"different values, but it must have only one (since "
"workflow tasks are executed one by one)."
)
error_msg_detail = (
"[_submit_job] Current unique subfolder names: "
f"{set(_subfolder_names)}"
)
logger.error(error_msg_short)
logger.error(error_msg_detail)
raise ValueError(error_msg_short)
job.wftask_subfolder_name = _subfolder_names[0]

# Check that server-side subfolder exists
subfolder_path = self.working_dir / job.wftask_subfolder_name
if not subfolder_path.exists():
raise FileNotFoundError(
f"Missing folder {subfolder_path.as_posix()}."
)

# Define I/O pickle file names/paths
job.input_pickle_files = tuple(
self.get_input_pickle_file_path(
job.workerids[ind],
arg=job.workerids[ind],
subfolder_name=job.wftask_subfolder_name,
prefix=job.wftask_file_prefixes[ind],
)
for ind in range(job.num_tasks_tot)
)
job.output_pickle_files = tuple(
self.get_output_pickle_file_path(
job.workerids[ind],
arg=job.workerids[ind],
subfolder_name=job.wftask_subfolder_name,
prefix=job.wftask_file_prefixes[ind],
)
for ind in range(job.num_tasks_tot)
)

# Define SLURM-job file names/paths
job.slurm_script = self.get_slurm_script_file_path(
prefix=job.slurm_file_prefix
subfolder_name=job.wftask_subfolder_name,
prefix=job.slurm_file_prefix,
)
job.slurm_stdout = self.get_slurm_stdout_file_path(
prefix=job.slurm_file_prefix
subfolder_name=job.wftask_subfolder_name,
prefix=job.slurm_file_prefix,
)
job.slurm_stderr = self.get_slurm_stderr_file_path(
prefix=job.slurm_file_prefix
subfolder_name=job.wftask_subfolder_name,
prefix=job.slurm_file_prefix,
)

# Dump serialized versions+function+args+kwargs to pickle file
Expand Down Expand Up @@ -773,10 +837,14 @@ def _completion(self, jobid: str) -> None:
slurm_stderr_file,
) = self.map_jobid_to_slurm_files[jobid]
new_slurm_stdout_file = str(
self.working_dir / Path(slurm_stdout_file).name
self.working_dir
/ job.wftask_subfolder_name
/ Path(slurm_stdout_file).name
)
new_slurm_stderr_file = str(
self.working_dir / Path(slurm_stderr_file).name
self.working_dir
/ job.wftask_subfolder_name
/ Path(slurm_stderr_file).name
)
with self.jobs_lock:
self.map_jobid_to_slurm_files[jobid] = (
Expand All @@ -787,7 +855,8 @@ def _completion(self, jobid: str) -> None:

in_paths = job.input_pickle_files
out_paths = tuple(
self.working_dir / f.name for f in job.output_pickle_files
(self.working_dir / job.wftask_subfolder_name / f.name)
for f in job.output_pickle_files
)

outputs = []
Expand Down Expand Up @@ -932,14 +1001,24 @@ def _copy_files_from_user_to_server(
Raises:
JobExecutionError: If a `cat` command fails.
"""
logger.debug("Enter _copy_files_from_user_to_server")
logger.debug("[_copy_files_from_user_to_server] Start")

if self.working_dir_user == self.working_dir:
logger.debug(
"[_copy_files_from_user_to_server] "
"working_dir corresponds to working_dir_user, return."
)
return

subfolder_name = job.wftask_subfolder_name
prefixes = set(
[job.slurm_file_prefix] + list(job.wftask_file_prefixes)
)

logger.debug(
"[_copy_files_from_user_to_server] "
f"WorkflowTask subfolder_name: {subfolder_name}"
)
logger.debug(f"[_copy_files_from_user_to_server] {prefixes=}")
logger.debug(
f"[_copy_files_from_user_to_server] {str(self.working_dir_user)=}"
Expand All @@ -949,13 +1028,13 @@ def _copy_files_from_user_to_server(

if prefix == job.slurm_file_prefix:
files_to_copy = _glob_as_user(
folder=str(self.working_dir_user),
folder=str(self.working_dir_user / subfolder_name),
user=self.slurm_user,
startswith=prefix,
)
else:
files_to_copy = _glob_as_user_strict(
folder=str(self.working_dir_user),
folder=str(self.working_dir_user / subfolder_name),
user=self.slurm_user,
startswith=prefix,
)
Expand All @@ -972,7 +1051,7 @@ def _copy_files_from_user_to_server(
"contains whitespaces"
)
source_file_path = str(
self.working_dir_user / source_file_name
self.working_dir_user / subfolder_name / source_file_name
)

# Read source_file_path (requires sudo)
Expand All @@ -991,7 +1070,9 @@ def _copy_files_from_user_to_server(
logger.error(info)
raise JobExecutionError(info)
# Write to dest_file_path (including empty files)
dest_file_path = str(self.working_dir / source_file_name)
dest_file_path = str(
self.working_dir / subfolder_name / source_file_name
)
with open(dest_file_path, "wb") as f:
f.write(res.stdout)
logger.debug("[_copy_files_from_user_to_server] End")
Expand Down Expand Up @@ -1131,12 +1212,11 @@ def get_default_task_files(self) -> TaskFiles:
This will be called when self.submit or self.map are called from
outside fractal-server, and then lack some optional arguments.
"""
import random

task_files = TaskFiles(
workflow_dir=self.working_dir,
workflow_dir_user=self.working_dir_user,
task_order=random.randint(10000, 99999), # nosec
task_order=None,
task_name="name",
)
return task_files

Expand All @@ -1154,7 +1234,7 @@ def shutdown(self, wait=True, *, cancel_futures=False):
while self.jobs:
jobid, fut_and_job = self.jobs.popitem()
slurm_jobs_to_scancel.append(jobid)
fut, job = fut_and_job[:]
fut = fut_and_job[0]
self.map_jobid_to_slurm_files.pop(jobid)
if not fut.cancelled():
fut.set_exception(
Expand Down
Loading

0 comments on commit c627f96

Please sign in to comment.