diff --git a/bfabric_app_runner/docs/_design_notes/_slurm.md b/bfabric_app_runner/docs/_design_notes/_slurm.md new file mode 100644 index 00000000..968959d5 --- /dev/null +++ b/bfabric_app_runner/docs/_design_notes/_slurm.md @@ -0,0 +1,45 @@ +This document is a rough draft. + +## Job Configuration Levels + +One idea behind the new Slurm submitter is that it should be possible to merge configurations from several (but restricted) +sources. The main ones that come into consideration are the following: + +| Level | Description | +| ------------- | ------------------------------------------------------------------------------------------------- | +| Global config | The global configuration file for the submitter provides reasonable defaults to use for all jobs. | +| App config | The app configuration file provides default configuration that is specific to the app. | +| | It is also a static configuration. | +| WU config | The workunit configuration is specific to the workunit and can be specified by the user. | +| Dynamic/auto | The dynamic configuration is determined by some custom logic (to be defined). | + +### Merging operation + +To keep things sane, all configs are specified as if they were arguments of an `sbatch` command. +For each level, previous values will be replaced by this level's values. +`None` is a special value that means the value should not be set and the argument is to be omitted when +running the `sbatch` command. +This allows having a default value in the global config and removing that parameter completely in a more specific +configuration. + +This imposes the following limitations currently: + +- No argument without value can be set: as far as I can tell all relevant slurm arguments always come with a value +- No argument can be specified multiple times: one where this might have been potentially necessary is the `--dependency` + flag, but even that supports passing a comma-separated list.
+ +If either of these occurs in the future: + +- The first problem could be solved by introducing a special syntax or value. +- The second problem is a bit more difficult to handle, but at least we could provide a value for some extra arguments + which should be passed as-is, i.e. without any merging involved (or merge by concatenation). + +Since neither seems necessary right now, I propose picking the simpler solution for now. + +### Chaining jobs configuration + +One case that is not handled at this level, because it would introduce a new level of complexity, is chaining jobs. +The paradigm there would be that the first job gets queued with the conventional mechanism, and then it is able to chain +further jobs and submit these itself by its own logic. + +One thing that might be worth exploring is how the regular logic can be reused there. diff --git a/bfabric_app_runner/pyproject.toml b/bfabric_app_runner/pyproject.toml index 6be1b2a6..a3156912 100644 --- a/bfabric_app_runner/pyproject.toml +++ b/bfabric_app_runner/pyproject.toml @@ -23,11 +23,15 @@ dependencies = [ include = [ "src/bfabric_app_runner/**/*.py", "src/bfabric_app_runner/py.typed", - "src/bfabric_app_runner/resources/*.mk" + "src/bfabric_app_runner/resources/*.mk", + "src/bfabric_app_runner/**/*.mako", ] [project.scripts] "bfabric-app-runner"="bfabric_app_runner.cli.__main__:app" +#TODO +"bfabric-app-runner-wrapper-creator"="bfabric_app_runner.bfabric_app.wrapper_creator:app" +"bfabric-app-runner-submitter"="bfabric_app_runner.bfabric_app.submitter:app" [project.optional-dependencies] doc = [ diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/__init__.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/__init__.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/__init__.py new file
mode 100644 index 00000000..e69de29b diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/__init__.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/slurm_config.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/slurm_config.py new file mode 100644 index 00000000..a5848a88 --- /dev/null +++ b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/slurm_config.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from functools import cached_property + +from bfabric_app_runner.specs.app.app_version import AppVersion # noqa: TC001 +from bfabric_app_runner.specs.submitters_spec import SubmitterSlurmSpec # noqa: TC001 +from bfabric_app_runner.bfabric_app.slurm_submitter.config.slurm_workunit_params import ( + SlurmWorkunitParams, # noqa: TC001 +) # noqa: TC001 +from pydantic import BaseModel + + +class _SlurmConfigBase(BaseModel): + submitter_config: SubmitterSlurmSpec + app_version: AppVersion + workunit_config: SlurmWorkunitParams + + +class SlurmConfig(_SlurmConfigBase): + @cached_property + def sbatch_params(self) -> dict[str, str]: + # TODO consistent naming (params vs config) + merged = {**self.submitter_config.params, **self.app_version.submitter.params, **self.workunit_config.as_dict()} + return {key: value for key, value in merged.items() if value is not None} + + def get_scratch_dir(self) -> str: + # TODO check if cast is necessary later + return str(self.submitter_config.config.worker_scratch_dir) diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/slurm_config_template.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/slurm_config_template.py new file mode 100644 index 00000000..80483c8f --- /dev/null +++ 
b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/slurm_config_template.py @@ -0,0 +1,9 @@ +from bfabric_app_runner.specs.config_interpolation import VariablesApp, interpolate_config_strings, VariablesWorkunit +from bfabric_app_runner.bfabric_app.slurm_submitter.config.slurm_config import _SlurmConfigBase, SlurmConfig + + +class SlurmConfigTemplate(_SlurmConfigBase): + def evaluate(self, app: VariablesApp, workunit: VariablesWorkunit) -> SlurmConfig: + data_template = self.model_dump(mode="json") + data = interpolate_config_strings(data=data_template, variables={"app": app, "workunit": workunit}) + return SlurmConfig.model_validate(data) diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/slurm_workunit_params.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/slurm_workunit_params.py new file mode 100644 index 00000000..392b9171 --- /dev/null +++ b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/config/slurm_workunit_params.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from enum import Enum +from typing import Annotated + +from pydantic import BaseModel, field_validator, Field, AliasChoices + + +class SlurmWorkunitSpecialStrings(Enum): + default = "[default]" + auto = "[auto]" + + @classmethod + def parse(cls, string: str) -> SlurmWorkunitSpecialStrings | None: + if string == cls.default.value: + return cls.default + elif string == cls.auto.value: + return cls.auto + else: + return None + + +class SlurmWorkunitParams(BaseModel): + partition: Annotated[ + SlurmWorkunitSpecialStrings | str, Field(validation_alias=AliasChoices("partition", "--partition")) + ] = SlurmWorkunitSpecialStrings.default + nodelist: Annotated[ + SlurmWorkunitSpecialStrings | str, Field(validation_alias=AliasChoices("nodelist", "--nodelist")) + ] = SlurmWorkunitSpecialStrings.default + mem: Annotated[SlurmWorkunitSpecialStrings | str, 
Field(validation_alias=AliasChoices("mem", "--mem"))] = ( + SlurmWorkunitSpecialStrings.default + ) + time: Annotated[SlurmWorkunitSpecialStrings | str, Field(validation_alias=AliasChoices("time", "--time"))] = ( + SlurmWorkunitSpecialStrings.default + ) + + @classmethod + def _parse_string(cls, value: str | SlurmWorkunitSpecialStrings) -> SlurmWorkunitSpecialStrings | str: + if isinstance(value, SlurmWorkunitSpecialStrings): + return value + parsed = SlurmWorkunitSpecialStrings.parse(value) + if parsed is not None: + return parsed + return value + + @field_validator("partition", "nodelist", "mem", "time", mode="before") + def validate_special_string(cls, value: SlurmWorkunitSpecialStrings | str) -> SlurmWorkunitSpecialStrings | str: + return cls._parse_string(value) + + def _get_field(self, field_name: str) -> str | None: + """Returns the value to be used for the field, or None if the default should be used (when merging configs).""" + value = getattr(self, field_name) + if isinstance(value, str): + return value + if value == SlurmWorkunitSpecialStrings.default: + return None + else: + raise NotImplementedError(f"Currently unsupported value for {field_name}: {value}") + + def as_dict(self) -> dict[str, str | None]: + """Returns a dictionary with the values that should be passed to sbatch.""" + field_mapping = { + "partition": "--partition", + "nodelist": "--nodelist", + "mem": "--mem", + "time": "--time", + } + + values = {} + for field_name, flag in field_mapping.items(): + value = self._get_field(field_name) + if value is not None: + values[flag] = value + + return values diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/slurm_job_template.bash.mako b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/slurm_job_template.bash.mako new file mode 100644 index 00000000..67f40546 --- /dev/null +++ b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/slurm_job_template.bash.mako @@ -0,0 +1,32 @@ 
+#!/bin/bash +% for key, value in sbatch_params.items(): +#SBATCH ${key}=${value} +% endfor +set -euxo pipefail +hostname +id +mkdir -p "${working_directory}" +cd "${working_directory}" + +set +x +tee app_version.yml <&1 | ts '[%Y-%m-%d %H:%M:%S]' + +exit 0 diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/slurm_job_template.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/slurm_job_template.py new file mode 100644 index 00000000..682e445a --- /dev/null +++ b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/slurm_job_template.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Any +import shlex +import mako.template +import yaml +from loguru import logger +from pydantic import BaseModel +from bfabric_app_runner.specs.app.app_version import AppVersion # noqa: TC001 +from bfabric.experimental.workunit_definition import WorkunitDefinition # noqa: TC002 + +if TYPE_CHECKING: + import io + + +class Params(BaseModel): + app_version: AppVersion + workunit_definition: WorkunitDefinition + working_directory: Path + logging_resource_id: int | None + command: list[str] + sbatch_params: dict[str, str] + + +class SlurmJobTemplate: + def __init__(self, params: Params, path: Path) -> None: + self._params = params + self._path = path + + @classmethod + def for_params(cls, **params: Any) -> SlurmJobTemplate: + path = Path(__file__).parent / "slurm_job_template.bash.mako" + return cls(params=Params.model_validate(params), path=path) + + def render(self, target_file: io.TextIOBase) -> None: + params = { + "app_version_yml": yaml.safe_dump(self._params.app_version.model_dump(mode="json")), + "workunit_definition_yml": yaml.safe_dump(self._params.workunit_definition.model_dump(mode="json")), + "workunit_id": self._params.workunit_definition.registration.workunit_id, + "working_directory": str(self._params.working_directory), + 
"logging_resource_id": self._params.logging_resource_id, + "command": shlex.join(self._params.command), + "sbatch_params": self._params.sbatch_params, + } + logger.debug("Rendering {} with params: {}", self._path, params) + template = mako.template.Template(filename=str(self._path)) + target_file.write(template.render(**params)) diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/slurm_submitter.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/slurm_submitter.py new file mode 100644 index 00000000..d30f6861 --- /dev/null +++ b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/slurm_submitter/slurm_submitter.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import os +import shlex +import subprocess +from typing import TYPE_CHECKING + +from loguru import logger + +from bfabric_app_runner.bfabric_app.slurm_submitter.slurm_job_template import SlurmJobTemplate +from bfabric_app_runner.specs.config_interpolation import VariablesApp, VariablesWorkunit + +if TYPE_CHECKING: + from bfabric import Bfabric + from pathlib import Path + from bfabric_app_runner.bfabric_app.slurm_submitter.config.slurm_config import SlurmConfig + from bfabric_app_runner.bfabric_app.submitter import WorkunitWrapperData + from bfabric_app_runner.bfabric_app.slurm_submitter.config.slurm_config_template import SlurmConfigTemplate + + +class SlurmSubmitter: + def __init__(self, config_template: SlurmConfigTemplate) -> None: + self._config_template = config_template + + def submit(self, workunit_wrapper_data: WorkunitWrapperData, client: Bfabric) -> None: + slurm_config = self.evaluate_config(workunit_wrapper_data) + + # Determine the script path + workunit_id = workunit_wrapper_data.workunit_definition.registration.workunit_id + script_path = slurm_config.submitter_config.config.local_script_dir / f"workunitid-{workunit_id}_run.bash" + + # Generate the script + log_resource_id = self._create_log_resource(config=slurm_config, 
workunit_id=workunit_id, client=client) + self.generate_script( + target_path=script_path, + slurm_config=slurm_config, + wrapper_data=workunit_wrapper_data, + log_resource_id=log_resource_id, + ) + + # Submit the script + sbatch_bin = slurm_config.submitter_config.config.slurm_root / "bin" / "sbatch" + env = os.environ | {"SLURMROOT": slurm_config.submitter_config.config.slurm_root} + logger.info("Script written to {}", script_path) + cmd = [str(sbatch_bin), str(script_path)] + logger.info("Running {}", shlex.join(cmd)) + subprocess.run(cmd, env=env, check=True) + + def evaluate_config(self, workunit_wrapper_data: WorkunitWrapperData) -> SlurmConfig: + app = VariablesApp( + id=workunit_wrapper_data.workunit_definition.registration.application_id, + name=workunit_wrapper_data.workunit_definition.registration.application_name, + version=workunit_wrapper_data.app_version.version, + ) + workunit = VariablesWorkunit(id=workunit_wrapper_data.workunit_definition.registration.workunit_id) + return self._config_template.evaluate(app=app, workunit=workunit) + + def _create_log_resource(self, config: SlurmConfig, workunit_id: int, client: Bfabric) -> int | None: + """Creates the log resource and returns its id, or if no log storage ID is provided, returns None.""" + if config.submitter_config.config.log_storage_id is None: + logger.info("No log storage ID provided, skipping log resource creation") + return None + + return client.save( + "resource", + { + "name": config.submitter_config.config.log_storage_resource_name, + "relativepath": config.submitter_config.config.log_storage_filename, + "storageid": config.submitter_config.config.log_storage_id, + "workunitid": workunit_id, + "status": "pending", + }, + )[0]["id"] + + def generate_script( + self, + target_path: Path, + slurm_config: SlurmConfig, + wrapper_data: WorkunitWrapperData, + log_resource_id: int | None, + ) -> None: + main_command = self._get_main_command( + slurm_config=slurm_config, 
app_runner_version=wrapper_data.app_runner_version + ) + slurm_job_template = SlurmJobTemplate.for_params( + app_version=wrapper_data.app_version, + workunit_definition=wrapper_data.workunit_definition, + working_directory=slurm_config.submitter_config.config.worker_scratch_dir, + logging_resource_id=log_resource_id, + command=main_command, + sbatch_params=slurm_config.sbatch_params, + ) + with target_path.open("w") as target_file: + slurm_job_template.render(target_file=target_file) + + def _get_force_storage_flag(self, slurm_config: SlurmConfig) -> list[str]: + return ( + [] + if not slurm_config.submitter_config.config.force_storage + else ["--force-storage", str(slurm_config.submitter_config.config.force_storage)] + ) + + def _get_main_command(self, slurm_config: SlurmConfig, app_runner_version: str) -> list[str]: + spec = f"@{app_runner_version}" if "git" in app_runner_version else f"=={app_runner_version}" + force_storage_flags = self._get_force_storage_flag(slurm_config) + return [ + "uv", + "run", + "-p", + "3.13", + "--with", + f"bfabric_app_runner{spec}", + "bfabric-app-runner", + "app", + "run", + "--app-spec", + "app_version.yml", + "--workunit-ref", + "workunit_definition.yml", + *force_storage_flags, + "--work-dir", + ".", + ] diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/submitter.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/submitter.py new file mode 100644 index 00000000..b05e652c --- /dev/null +++ b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/submitter.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +import argparse +from functools import cached_property +from pathlib import Path +from typing import TYPE_CHECKING + +import yaml +from bfabric_app_runner.app_runner.resolve_app import resolve_app +from bfabric_app_runner.bfabric_app.slurm_submitter.config.slurm_config_template import SlurmConfigTemplate +from bfabric_app_runner.bfabric_app.slurm_submitter.config.slurm_workunit_params import 
SlurmWorkunitParams +from bfabric_app_runner.bfabric_app.slurm_submitter.slurm_submitter import SlurmSubmitter +from bfabric_app_runner.specs.app.app_spec import AppSpecTemplate +from bfabric_app_runner.specs.app.app_version import AppVersion # noqa: TC001 +from bfabric_app_runner.specs.config_interpolation import Variables, VariablesApp, VariablesWorkunit +from bfabric_app_runner.specs.submitters_spec import SubmittersSpecTemplate, SubmitterSlurmSpec +from loguru import logger +from pydantic import BaseModel + +from bfabric.entities import ExternalJob, Workunit, Executable +from bfabric.experimental.workunit_definition import WorkunitDefinition +from bfabric.utils.cli_integration import use_client + +if TYPE_CHECKING: + from bfabric import Bfabric + from bfabric_app_runner.specs.submitter_ref import SubmitterRef + + +class Submitter: + def __init__( + self, client: Bfabric, external_job: ExternalJob, submitters_spec_template: SubmittersSpecTemplate + ) -> None: + self._client = client + self._external_job = external_job + self._submitters_spec_template = submitters_spec_template + + @cached_property + def _workunit_id(self) -> int: + # TODO this should be lazy and only return the id + # (do we add workunit_id or implement something like a lazy getter) + return self._external_job.workunit.id + + def get_workunit_wrapper_data(self) -> WorkunitWrapperData: + return get_data(workunit=self._external_job.workunit) + + def get_submitter_spec(self, workunit_wrapper_data: WorkunitWrapperData) -> SubmitterSlurmSpec: + """Retrieves the submitter spec for the workunit.""" + # Get information on the submitter + submitter_ref: SubmitterRef = workunit_wrapper_data.app_version.submitter + submitter_name = submitter_ref.name + + # Construct the interpolation variables + variables = Variables( + app=VariablesApp( + id=workunit_wrapper_data.workunit_definition.registration.application_id, + name=workunit_wrapper_data.workunit_definition.registration.application_name, + 
version=workunit_wrapper_data.app_version.version, + ), + workunit=VariablesWorkunit(id=workunit_wrapper_data.workunit_definition.registration.workunit_id), + ) + + # Evaluate the submitters spec + submitters_spec = self._submitters_spec_template.evaluate(variables=variables) + submitter_spec = submitters_spec.submitters.get(submitter_name) + + if submitter_spec is None: + msg = f"Submitter '{submitter_name}' not found in submitters spec." + raise ValueError(msg) + if not isinstance(submitter_spec, SubmitterSlurmSpec): + msg = f"Submitter '{submitter_name}' is not a Slurm submitter." + raise ValueError(msg) + + return submitter_spec + + def run(self) -> None: + workunit_wrapper_data = self.get_workunit_wrapper_data() + workunit_config = SlurmWorkunitParams.model_validate( + workunit_wrapper_data.workunit_definition.execution.raw_parameters + ) + submitter_spec = self.get_submitter_spec(workunit_wrapper_data=workunit_wrapper_data) + slurm_config_template = SlurmConfigTemplate( + submitter_config=submitter_spec, + app_version=workunit_wrapper_data.app_version, + workunit_config=workunit_config, + ) + submitter = SlurmSubmitter(slurm_config_template) + submitter.submit(workunit_wrapper_data=workunit_wrapper_data, client=self._client) + + +class WorkunitWrapperData(BaseModel): + workunit_definition: WorkunitDefinition + app_version: AppVersion + app_runner_version: str + + +def get_application_yaml_path(executable: Executable) -> Path: + """Returns the path to the application YAML file.""" + # TODO for now this is hardcoded, but a better solution might be considered later + program_str = executable["program"] + if "fgcz_slurm_app_runner_compat.bash" in program_str: + return Path(program_str.split()[-1]) + else: + return Path(program_str) + + +def get_data(workunit: Workunit) -> WorkunitWrapperData: + """Returns the data to be written to WORKUNIT context executable.""" + # TODO this can certainly be cleaned up further, it's an interim refactoring state + 
workunit_definition = WorkunitDefinition.from_workunit(workunit=workunit) + path = get_application_yaml_path(workunit.application.executable) + logger.info("Reading app spec from: {}", path) + app_spec_template = AppSpecTemplate.model_validate(yaml.safe_load(path.read_text())) + app_spec = app_spec_template.evaluate( + app_id=workunit_definition.registration.application_id, + app_name=workunit_definition.registration.application_name, + ) + app_version = resolve_app(versions=app_spec, workunit_definition=workunit_definition) + app_runner_version = app_spec.bfabric.app_runner + return WorkunitWrapperData( + workunit_definition=workunit_definition, app_version=app_version, app_runner_version=app_runner_version + ) + + +@use_client +@logger.catch(reraise=True) +def app(*, client: Bfabric) -> None: + """CLI interface for slurm submitter.""" + parser = argparse.ArgumentParser() + parser.add_argument("--submitters-yml", type=Path) + parser.add_argument("-j", type=int) + args = parser.parse_args() + external_job = ExternalJob.find(id=args.j, client=client) + submitters_spec_template = SubmittersSpecTemplate.model_validate(yaml.safe_load(args.submitters_yml.read_text())) + submitter = Submitter(client=client, external_job=external_job, submitters_spec_template=submitters_spec_template) + submitter.run() + + +if __name__ == "__main__": + app() diff --git a/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/wrapper_creator.py b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/wrapper_creator.py new file mode 100644 index 00000000..62a95242 --- /dev/null +++ b/bfabric_app_runner/src/bfabric_app_runner/bfabric_app/wrapper_creator.py @@ -0,0 +1,7 @@ +def app() -> None: + """Wrapper creator CLI.""" + print("Dummy script") + + +if __name__ == "__main__": + app() diff --git a/bfabric_app_runner/src/bfabric_app_runner/cli/__main__.py b/bfabric_app_runner/src/bfabric_app_runner/cli/__main__.py index 06b2996e..35c34686 100644 --- 
a/bfabric_app_runner/src/bfabric_app_runner/cli/__main__.py +++ b/bfabric_app_runner/src/bfabric_app_runner/cli/__main__.py @@ -13,6 +13,7 @@ cmd_validate_outputs_spec, cmd_validate_app_spec, cmd_validate_app_spec_template, + cmd_validate_submitters_spec_template, ) package_version = importlib.metadata.version("bfabric_app_runner") @@ -21,10 +22,10 @@ version=package_version, ) -cmd_app = cyclopts.App("app", help="Run an app.") +cmd_app = cyclopts.App(help="Run an app.") cmd_app.command(cmd_app_dispatch, name="dispatch") cmd_app.command(cmd_app_run, name="run") -app.command(cmd_app) +app.command(cmd_app, name="app") cmd_inputs = cyclopts.App("inputs", help="Prepare input files for an app.") cmd_inputs.command(cmd_inputs_check, name="check") @@ -49,7 +50,7 @@ cmd_validate.command(cmd_validate_app_spec_template, name="app-spec-template") cmd_validate.command(cmd_validate_inputs_spec, name="inputs-spec") cmd_validate.command(cmd_validate_outputs_spec, name="outputs-spec") - +cmd_validate.command(cmd_validate_submitters_spec_template, name="submitters-spec-template") app.command(cmd_validate) if __name__ == "__main__": diff --git a/bfabric_app_runner/src/bfabric_app_runner/cli/validate.py b/bfabric_app_runner/src/bfabric_app_runner/cli/validate.py index 93637629..1c943f46 100644 --- a/bfabric_app_runner/src/bfabric_app_runner/cli/validate.py +++ b/bfabric_app_runner/src/bfabric_app_runner/cli/validate.py @@ -6,6 +6,7 @@ from bfabric_app_runner.specs.app.app_spec import AppSpecTemplate, AppSpec from bfabric_app_runner.specs.inputs_spec import InputsSpec from bfabric_app_runner.specs.outputs_spec import OutputsSpec +from bfabric_app_runner.specs.submitters_spec import SubmittersSpecTemplate def cmd_validate_app_spec_template(yaml_file: Path) -> None: @@ -14,7 +15,7 @@ def cmd_validate_app_spec_template(yaml_file: Path) -> None: pprint(app_spec_file) -def cmd_validate_app_spec(app_yaml: Path, app_id: str = "x", app_name: str = "y") -> None: +def 
cmd_validate_app_spec(app_yaml: Path, app_id: int = 0, app_name: str = "x") -> None: """Validates the app versions by expanding the relevant config info.""" versions = AppSpec.load_yaml(app_yaml, app_id=app_id, app_name=app_name) pprint(versions) @@ -30,3 +31,9 @@ def cmd_validate_outputs_spec(yaml_file: Path) -> None: """Validate an outputs spec file.""" outputs_spec = OutputsSpec.model_validate(yaml.safe_load(yaml_file.read_text())) pprint(outputs_spec) + + +def cmd_validate_submitters_spec_template(yaml_file: Path) -> None: + """Validate a submitters spec file.""" + submitters_spec = SubmittersSpecTemplate.model_validate(yaml.safe_load(yaml_file.read_text())) + pprint(submitters_spec) diff --git a/bfabric_app_runner/src/bfabric_app_runner/specs/app/app_spec.py b/bfabric_app_runner/src/bfabric_app_runner/specs/app/app_spec.py index 18def569..91649bea 100644 --- a/bfabric_app_runner/src/bfabric_app_runner/specs/app/app_spec.py +++ b/bfabric_app_runner/src/bfabric_app_runner/specs/app/app_spec.py @@ -6,6 +6,7 @@ from pydantic import BaseModel from bfabric_app_runner.specs.app.app_version import AppVersion, AppVersionMultiTemplate # noqa: TCH001 +from bfabric_app_runner.specs.config_interpolation import VariablesApp if TYPE_CHECKING: from pathlib import Path @@ -30,27 +31,32 @@ class AppSpecTemplate(BaseModel): The main difference is that this may contain usages of `Variables`. 
TODO """ - # TODO consider whether to reintroduce - # bfabric: BfabricAppSpec + bfabric: BfabricAppSpec versions: list[AppVersionMultiTemplate] - def evaluate(self, app_id: str, app_name: str) -> AppSpec: + # TODO this should take the variables as param instead + def evaluate(self, app_id: int, app_name: str) -> AppSpec: """Evaluates the template to a concrete ``AppSpec`` instance.""" - versions_templates = [expanded for version in self.versions for expanded in version.expand_versions()] - versions = [template.evaluate(app_id=app_id, app_name=app_name) for template in versions_templates] - return AppSpec.model_validate({"versions": versions}) + version_templates = [expanded for version in self.versions for expanded in version.expand_versions()] + versions = [] + for version_template in version_templates: + variables_app = VariablesApp(id=app_id, name=app_name, version=version_template.version) + versions.append(version_template.evaluate(variables_app=variables_app)) + # TODO add interpolation for bfabric config for consistency + return AppSpec.model_validate({"versions": versions, "bfabric": self.bfabric}) class AppSpec(BaseModel): """Parsed app versions from the app spec file.""" + bfabric: BfabricAppSpec versions: list[AppVersion] @classmethod def load_yaml(cls, app_yaml: Path, app_id: int | str, app_name: str) -> AppSpec: """Loads the app versions from the provided YAML file and evaluates the templates.""" app_spec_file = AppSpecTemplate.model_validate(yaml.safe_load(app_yaml.read_text())) - return app_spec_file.evaluate(app_id=str(app_id), app_name=str(app_name)) + return app_spec_file.evaluate(app_id=int(app_id), app_name=str(app_name)) @property def available_versions(self) -> set[str]: diff --git a/bfabric_app_runner/src/bfabric_app_runner/specs/app/app_version.py b/bfabric_app_runner/src/bfabric_app_runner/specs/app/app_version.py index 373185f9..459900f1 100644 --- a/bfabric_app_runner/src/bfabric_app_runner/specs/app/app_version.py +++ 
b/bfabric_app_runner/src/bfabric_app_runner/specs/app/app_version.py @@ -4,9 +4,8 @@ from pydantic import BaseModel, field_validator -from bfabric_app_runner.specs import config_interpolation from bfabric_app_runner.specs.app.commands_spec import CommandsSpec # noqa: TCH001 -from bfabric_app_runner.specs.config_interpolation import interpolate_config_strings +from bfabric_app_runner.specs.config_interpolation import interpolate_config_strings, VariablesApp from bfabric_app_runner.specs.submitter_ref import SubmitterRef # noqa: TCH001 @@ -30,11 +29,10 @@ class AppVersionTemplate(BaseModel): # TODO remove when new submitter becomes available reuse_default_resource: bool = True - def evaluate(self, app_id: str, app_name: str) -> AppVersion: + def evaluate(self, variables_app: VariablesApp) -> AppVersion: """Evaluates the template to a concrete ``AppVersion`` instance.""" - variables_app = config_interpolation.VariablesApp(id=app_id, name=app_name, version=self.version) data_template = self.model_dump(mode="json") - data = interpolate_config_strings(data_template, variables={"app": variables_app}) + data = interpolate_config_strings(data_template, variables={"app": variables_app, "workunit": None}) return AppVersion.model_validate(data) diff --git a/bfabric_app_runner/src/bfabric_app_runner/specs/config_interpolation.py b/bfabric_app_runner/src/bfabric_app_runner/specs/config_interpolation.py index 6493a41a..5a60364d 100644 --- a/bfabric_app_runner/src/bfabric_app_runner/specs/config_interpolation.py +++ b/bfabric_app_runner/src/bfabric_app_runner/specs/config_interpolation.py @@ -1,11 +1,12 @@ from __future__ import annotations import re +import secrets from typing import Any from loguru import logger from mako.template import Template -from pydantic import BaseModel, field_validator +from pydantic import BaseModel, field_validator, Field class VariablesApp(BaseModel): @@ -24,13 +25,19 @@ def validate_safe_name(cls, value: str) -> str: return characters.sub("_", 
value) +class VariablesWorkunit(BaseModel): + id: int + file_token: str = Field(default_factory=lambda: secrets.token_hex(16)) + + class Variables(BaseModel): """Variables that can be used in our config templates.""" app: VariablesApp + workunit: VariablesWorkunit | None def as_dict(self) -> dict[str, VariablesApp]: - return {"app": self.app} + return {"app": self.app, "workunit": self.workunit} def interpolate_config_strings(data: Any, variables: Variables | dict[str, Any]) -> Any: diff --git a/bfabric_app_runner/src/bfabric_app_runner/specs/submitter_ref.py b/bfabric_app_runner/src/bfabric_app_runner/specs/submitter_ref.py index e5f3f8d6..51aabb86 100644 --- a/bfabric_app_runner/src/bfabric_app_runner/specs/submitter_ref.py +++ b/bfabric_app_runner/src/bfabric_app_runner/specs/submitter_ref.py @@ -1,10 +1,10 @@ -from typing import Any +from typing import Annotated -from pydantic import BaseModel +from pydantic import BaseModel, StringConstraints class SubmitterRef(BaseModel): """Reference of a submitter and potential configuration overrides.""" name: str - config: dict[str, Any] = {} + params: dict[Annotated[str, StringConstraints(pattern="^--.*")], str | None] = {} diff --git a/bfabric_app_runner/src/bfabric_app_runner/specs/submitters_spec.py b/bfabric_app_runner/src/bfabric_app_runner/specs/submitters_spec.py new file mode 100644 index 00000000..ec03523d --- /dev/null +++ b/bfabric_app_runner/src/bfabric_app_runner/specs/submitters_spec.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Literal, Annotated + +from pydantic import BaseModel, ConfigDict, Field, StringConstraints, AliasChoices, model_validator + +from bfabric_app_runner.specs.config_interpolation import Variables, interpolate_config_strings + + +class SubmitterSlurmConfigSpec(BaseModel): + slurm_root: Path = Field(validation_alias=AliasChoices("slurm_root", "slurm-root")) + local_script_dir: Path = 
Field(validation_alias=AliasChoices("local_script_dir", "local-script-dir")) + worker_scratch_dir: Path = Field(validation_alias=AliasChoices("worker_scratch_dir", "worker-scratch-dir")) + log_storage_id: int | None = Field(validation_alias=AliasChoices("log_storage_id", "log-storage-id"), default=None) + log_storage_resource_name: str | None = Field( + validation_alias=AliasChoices("log_storage_resource_name", "log-storage-resource-name"), default=None + ) + log_storage_filename: str | None = Field( + validation_alias=AliasChoices("log_storage_filename", "log-storage-filename"), default=None + ) + force_storage: Path | None = Field(validation_alias=AliasChoices("force_storage", "force-storage"), default=None) + + @model_validator(mode="after") + def either_full_or_no_log_config(self) -> SubmitterSlurmConfigSpec: + if self.log_storage_id is None and self.log_storage_filename is None and self.log_storage_resource_name is None: + return self + if ( + self.log_storage_id is not None + and self.log_storage_resource_name is not None + and self.log_storage_filename is not None + ): + return self + raise ValueError("Either all log storage parameters must be provided, or none of them") + + +class SubmitterSlurmSpec(BaseModel): + model_config = ConfigDict(coerce_numbers_to_str=True) + + type: Literal["slurm"] + params: dict[Annotated[str, StringConstraints(pattern="^--.*")], str | None] + config: SubmitterSlurmConfigSpec + + +class SubmittersSpec(BaseModel): + submitters: dict[str, SubmitterSlurmSpec] + + +class SubmittersSpecTemplate(BaseModel): + submitters: dict[str, SubmitterSlurmSpec] + + def evaluate(self, variables: Variables) -> SubmittersSpec: + """Evaluates the template to a concrete ``SubmittersSpec`` instance.""" + template_data = self.model_dump(mode="json") + data = interpolate_config_strings(data=template_data, variables=variables) + return SubmittersSpec.model_validate(data) diff --git a/bfabric_app_runner/src/bfabric_app_runner/util/scp.py 
b/bfabric_app_runner/src/bfabric_app_runner/util/scp.py index 547bba54..10c5b387 100644 --- a/bfabric_app_runner/src/bfabric_app_runner/util/scp.py +++ b/bfabric_app_runner/src/bfabric_app_runner/util/scp.py @@ -33,8 +33,9 @@ def scp(source: str | Path, target: str | Path, *, user: str | None = None, mkdi if target_remote: host, path = target.split(":", 1) parent_path = Path(path).parent - logger.debug(f"ssh {host} mkdir -p {parent_path}") - subprocess.run(["ssh", host, "mkdir", "-p", parent_path], check=True) + cmd = ["ssh", host, shlex.join(["mkdir", "-p", str(parent_path)])] + logger.debug(shlex.join(cmd)) + subprocess.run(cmd, check=True) else: parent_path = Path(target).parent parent_path.mkdir(parents=True, exist_ok=True) diff --git a/tests/bfabric_app_runner/bfabric_app/__init__.py b/tests/bfabric_app_runner/bfabric_app/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/bfabric_app_runner/bfabric_app/fixtures/app_version.yml b/tests/bfabric_app_runner/bfabric_app/fixtures/app_version.yml new file mode 100644 index 00000000..4aa0bf3b --- /dev/null +++ b/tests/bfabric_app_runner/bfabric_app/fixtures/app_version.yml @@ -0,0 +1,13 @@ +version: "1.0.0" +submitter: + name: "test_submitter" +commands: + dispatch: + type: shell + command: echo "Dispatching" + process: + type: shell + command: echo "Processing" + collect: + type: shell + command: echo "Collecting" diff --git a/tests/bfabric_app_runner/bfabric_app/fixtures/submitter_slurm_spec.yml b/tests/bfabric_app_runner/bfabric_app/fixtures/submitter_slurm_spec.yml new file mode 100644 index 00000000..f2856c98 --- /dev/null +++ b/tests/bfabric_app_runner/bfabric_app/fixtures/submitter_slurm_spec.yml @@ -0,0 +1,8 @@ +type: slurm +params: + --nodes: 2 + --partition: compute +config: + slurm-root: /test/slurm-root + local-script-dir: /test/local-script-dir + worker-scratch-dir: /test/worker-scratch-dir/${app.id}_${app.name}/${workunit.id} diff --git 
a/tests/bfabric_app_runner/bfabric_app/fixtures/workunit_definition.yml b/tests/bfabric_app_runner/bfabric_app/fixtures/workunit_definition.yml new file mode 100644 index 00000000..cf7838cd --- /dev/null +++ b/tests/bfabric_app_runner/bfabric_app/fixtures/workunit_definition.yml @@ -0,0 +1,15 @@ +execution: + raw_parameters: + cpus: "4" + memory: "8G" + time: "1:00:00" + dataset: -1 +registration: + application_id: -1 + application_name: Testing App + workunit_id: -1 + workunit_name: Testing WU + container_id: -1 + storage_id: -1 + storage_output_folder: /dev/null/ + container_type: project diff --git a/tests/bfabric_app_runner/bfabric_app/test_submitter.py b/tests/bfabric_app_runner/bfabric_app/test_submitter.py new file mode 100644 index 00000000..b20d192d --- /dev/null +++ b/tests/bfabric_app_runner/bfabric_app/test_submitter.py @@ -0,0 +1,95 @@ +import base64 +from pathlib import Path +from typing import Any + +import pytest +import yaml +from pytest_mock import MockerFixture + +from bfabric import Bfabric +from bfabric.entities import Workunit, ExternalJob +from bfabric.experimental.workunit_definition import WorkunitDefinition +from bfabric_app_runner.bfabric_app.submitter import Submitter +from bfabric_app_runner.bfabric_app.submitter import WorkunitWrapperData +from bfabric_app_runner.specs.app.app_version import AppVersion +from bfabric_app_runner.specs.submitters_spec import SubmittersSpecTemplate, SubmitterSlurmSpec +from tests.conftest import yaml_fixture + +WORKUNIT_ID = 500 + + +@pytest.fixture +def mock_client(mocker: MockerFixture) -> Bfabric: + return mocker.MagicMock() + + +@pytest.fixture +def mock_workunit(mocker: MockerFixture) -> Workunit: + return mocker.MagicMock(id=WORKUNIT_ID) + + +@pytest.fixture +def mock_external_job(mocker: MockerFixture, mock_workunit: Workunit) -> ExternalJob: + return mocker.MagicMock(workunit=mock_workunit) + + +@pytest.fixture +def mock_submitters_spec_template(mock_submitter_slurm_spec) -> SubmittersSpecTemplate: 
+ return SubmittersSpecTemplate(submitters={"test_submitter": mock_submitter_slurm_spec}) + + +@pytest.fixture +def mock_submitter( + mock_client: Bfabric, mock_external_job: ExternalJob, mock_submitters_spec_template: SubmittersSpecTemplate +) -> Submitter: + return Submitter( + client=mock_client, external_job=mock_external_job, submitters_spec_template=mock_submitters_spec_template + ) + + +mock_app_version = yaml_fixture(AppVersion, "app_version") +mock_workunit_definition = yaml_fixture(WorkunitDefinition, "workunit_definition") +mock_submitter_slurm_spec = yaml_fixture(SubmitterSlurmSpec, "submitter_slurm_spec") + + +@pytest.fixture +def mock_workunit_wrapper_data( + mock_app_version: AppVersion, mock_workunit_definition: WorkunitDefinition +) -> WorkunitWrapperData: + return WorkunitWrapperData( + workunit_definition=mock_workunit_definition, app_version=mock_app_version, app_runner_version="1.0.0" + ) + + +@pytest.fixture +def mock_executable_data(mock_workunit_wrapper_data: WorkunitWrapperData) -> dict[str, Any]: + yaml_str = yaml.safe_dump(mock_workunit_wrapper_data.model_dump(mode="json")) + base64_str = base64.b64encode(yaml_str.encode()).decode() + return {"1": {"context": "WORKUNIT", "base64": base64_str}} + + +def test_get_submitter_spec( + mock_submitter: Submitter, + mock_submitters_spec_template: SubmittersSpecTemplate, + mock_workunit_wrapper_data: WorkunitWrapperData, +) -> None: + # Execute + result = mock_submitter.get_submitter_spec(mock_workunit_wrapper_data) + + # Verify + assert isinstance(result, SubmitterSlurmSpec) + assert result.params["--partition"] == "compute" + assert result.config.worker_scratch_dir == Path("/test/worker-scratch-dir/-1_Testing_App/-1") + + +def test_get_submitter_spec_not_found( + mock_submitter: Submitter, + mock_submitters_spec_template: SubmittersSpecTemplate, + mock_workunit_wrapper_data: WorkunitWrapperData, +) -> None: + # Setup + mock_workunit_wrapper_data.app_version.submitter.name = 
"nonexistent_submitter" + + # Execute and verify + with pytest.raises(ValueError, match="Submitter 'nonexistent_submitter' not found"): + mock_submitter.get_submitter_spec(mock_workunit_wrapper_data) diff --git a/tests/bfabric_app_runner/specs/test_app_spec.py b/tests/bfabric_app_runner/specs/test_app_spec.py index 5c6c5c42..9064d5cf 100644 --- a/tests/bfabric_app_runner/specs/test_app_spec.py +++ b/tests/bfabric_app_runner/specs/test_app_spec.py @@ -57,8 +57,8 @@ def serialized() -> str: type: docker reuse_default_resource: true submitter: - config: {} name: submitter + params: {} version: 0.0.1""" diff --git a/tests/bfabric_app_runner/specs/test_config_interpolation.py b/tests/bfabric_app_runner/specs/test_config_interpolation.py index b8c94658..95b5d8aa 100644 --- a/tests/bfabric_app_runner/specs/test_config_interpolation.py +++ b/tests/bfabric_app_runner/specs/test_config_interpolation.py @@ -9,7 +9,7 @@ @pytest.fixture def basic_variables(): - return Variables(app=VariablesApp(id=1000, name="Test Application", version="1.0.0")) + return Variables(app=VariablesApp(id=1000, name="Test Application", version="1.0.0"), workunit=None) def test_variables_app_model(): @@ -69,7 +69,7 @@ def test_interpolate_non_string_values(basic_variables): def test_interpolate_with_dict_variables(): - variables = {"app": {"id": "2000", "name": "Dict App", "version": "2.0.0"}} + variables = {"app": {"id": "2000", "name": "Dict App", "version": "2.0.0"}, "workunit": None} template = "${app.name} ${app.version}" result = interpolate_config_strings(template, variables) assert result == "Dict_App 2.0.0" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..735bda77 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,44 @@ +from pathlib import Path +from typing import TypeVar + +import pytest +import yaml +from pydantic import BaseModel + + +T = TypeVar("T", bound=BaseModel) + + +def yaml_fixture(model_class: type[T], fixture_name: str): + """Create a pytest 
fixture that loads a YAML file into a Pydantic model. + + :param model_class: The Pydantic model class to parse the YAML into + :type model_class: Type[T] + :param fixture_name: Name of the YAML file (without extension) + :type fixture_name: str + :return: A pytest fixture function that returns the parsed model + :rtype: pytest.fixture + + :Example: + + .. code-block:: python + + class User(BaseModel): + name: str + email: str + + + # Will load from tests/fixtures/test_user.yml + user = yaml_fixture(User, "test_user") + """ + + @pytest.fixture + def _fixture(request) -> T: + fixtures_dir = Path(request.module.__file__).parent / "fixtures" + fixture_path = fixtures_dir / f"{fixture_name}.yml" + + with fixture_path.open() as f: + data = yaml.safe_load(f) + return model_class.model_validate(data) + + return _fixture