From 75c2f91fd394f4cdb019ea75f1530540938ad9af Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Fri, 12 Jul 2024 13:28:24 +0000 Subject: [PATCH] add archive --- src/anemoi/registry/commands/experiments.py | 48 ++++++++++++++-- src/anemoi/registry/commands/list.py | 3 - src/anemoi/registry/config.yaml | 4 +- src/anemoi/registry/entry/experiment.py | 64 +++++++++++++++++---- tests/dummy-recipe-dataset.yaml | 4 +- tests/dummy-recipe-experiment.yaml | 55 ++---------------- 6 files changed, 105 insertions(+), 73 deletions(-) diff --git a/src/anemoi/registry/commands/experiments.py b/src/anemoi/registry/commands/experiments.py index cf95166..b25d7b4 100644 --- a/src/anemoi/registry/commands/experiments.py +++ b/src/anemoi/registry/commands/experiments.py @@ -43,13 +43,34 @@ def add_arguments(self, command_parser): ) # command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true") - command_parser.add_argument("--add-weights", nargs="+", help="Add weights to the experiment.") + command_parser.add_argument( + "--add-weights", + nargs="+", + help=( + "Add weights to the experiment and upload them do s3." + "Skip upload if these weights are already uploaded." + ), + ) command_parser.add_argument("--add-plots", nargs="+", help="Add plots to the experiment.") - command_parser.add_argument("--add-artefacts", nargs="+", help="Add artefacts to the experiment.") - command_parser.add_argument("--overwrite", help="Overwrite if already exists.", action="store_true") - def check_arguments(self, args): - pass + command_parser.add_argument( + "--set-archive", help="Input file to register as an archive metadata file to the catalogue." + ) + command_parser.add_argument( + "--get-archive", help="Output file to save the archive metadata file from the catalogue." + ) + command_parser.add_argument( + "--archive-platform", + help="Archive platform. Only relevant for --set-archive and --get-archive.", + ) + command_parser.add_argument( + "--run-number", help="The run number of the experiment. Relevant --set-archive and --get-archive." + ) + command_parser.add_argument( + "--archive-extra-metadata", help="Extra metadata. A list of key=value pairs.", nargs="+" + ) + + command_parser.add_argument("--overwrite", help="Overwrite if already exists.", action="store_true") def is_path(self, name_or_path): if not os.path.exists(name_or_path): @@ -62,8 +83,23 @@ def _run(self, entry, args): self.process_task(entry, args, "unregister") self.process_task(entry, args, "register", overwrite=args.overwrite) self.process_task(entry, args, "add_weights") - self.process_task(entry, args, "add_artefacts") self.process_task(entry, args, "add_plots") + self.process_task( + entry, + args, + "set_archive", + run_number=args.run_number, + platform=args.archive_platform, + overwrite=args.overwrite, + extras=args.archive_extra_metadata, + ) + self.process_task( + entry, + args, + "get_archive", + run_number=args.run_number, + platform=args.archive_platform, + ) command = Experiments diff --git a/src/anemoi/registry/commands/list.py b/src/anemoi/registry/commands/list.py index bd2dbbe..935741b 100644 --- a/src/anemoi/registry/commands/list.py +++ b/src/anemoi/registry/commands/list.py @@ -57,9 +57,6 @@ def add_arguments(self, command_parser): # tasks.add_argument("-l", "--long", help="Details", action="store_true") # tasks.add_argument("--sort", help="Sort by date", choices=["created", "updated"], default="updated") - def check_arguments(self, args): - pass - def run(self, args): if not args.subcommand: raise ValueError("Missing subcommand") diff --git a/src/anemoi/registry/config.yaml b/src/anemoi/registry/config.yaml index 789af5e..b7d82ab 100644 --- a/src/anemoi/registry/config.yaml +++ b/src/anemoi/registry/config.yaml @@ -1,8 +1,10 @@ registry: api_url: "https://anemoi.ecmwf.int/api/v1" + web_url: "https://anemoi.ecmwf.int" + + artefacts_uri_base: "s3://ml-artefacts" plots_uri_pattern: "s3://ml-artefacts/{expver}/{basename}" - artefacts_uri_pattern: "s3://ml-artefacts/{expver}/{basename}" datasets_uri_pattern: "s3://ml-datasets/{name}" weights_uri_pattern: "s3://ml-weights/{uuid}.ckpt" weights_platform: "ewc" diff --git a/src/anemoi/registry/entry/experiment.py b/src/anemoi/registry/entry/experiment.py index 456f8c5..9311545 100644 --- a/src/anemoi/registry/entry/experiment.py +++ b/src/anemoi/registry/entry/experiment.py @@ -5,10 +5,12 @@ # granted to it by virtue of its status as an intergovernmental organisation # nor does it submit to any jurisdiction. +import datetime import logging import os import yaml +from anemoi.utils.s3 import download from anemoi.utils.s3 import upload from .. import config @@ -34,27 +36,63 @@ def load_from_path(self, path): expver = metadata["expver"] self.key = expver - self.record = dict(expver=expver, metadata=metadata) + self.record = dict(expver=expver, metadata=metadata, runs={}) def add_plots(self, *paths, **kwargs): for path in paths: self._add_one_plot(path, **kwargs) - def _add_one_plot(self, path, **kwargs): - return self._add_one_plot_or_artefact("plot", path, **kwargs) - def add_weights(self, *paths, **kwargs): for path in paths: self._add_one_weights(path, **kwargs) - def add_artefacts(self, *paths, **kwargs): - for path in paths: - self._add_one_artefact(path, **kwargs) + def set_archive(self, path, platform, run_number, overwrite, extras): + if run_number is None: + raise ValueError("run_number must be set") + if platform is None: + raise ValueError("platform must be set") + if not os.path.exists(path): + raise FileNotFoundError(f"Could not find archive to upload at {path}") + extras = {v.split("=")[0]: v.split("=")[1] for v in extras} - def _add_one_artefact(self, path, **kwargs): - return self._add_one_plot_or_artefact("artefact", path, **kwargs) + _, ext = os.path.splitext(path) + target = config()["artefacts_uri_base"] + f"/{self.key}/runs/{run_number}/{platform}{ext}" + LOG.info(f"Uploading {path} to {target}.") + upload(path, target, overwrite=overwrite) + + dic = dict(url=target, path=path, updated=datetime.datetime.utcnow().isoformat(), **extras) + + if "runs" not in self.record: + # for backwards compatibility, create '/runs' if it does not exist + e = self.__class__(key=self.key) + if "runs" not in e.record: + e.rest_item.patch([{"op": "add", "path": "/runs", "value": {}}]) + self.record["runs"] = {} + + if str(run_number) not in self.record["runs"]: + # add run_number if it does not exist + self.rest_item.patch( + [ + {"op": "add", "path": "/runs", "value": self.record["runs"]}, + {"op": "add", "path": f"/runs/{run_number}", "value": dict(archives={})}, + ] + ) + + self.rest_item.patch([{"op": "add", "path": f"/runs/{run_number}/archives/{platform}", "value": dic}]) + + def get_archive(self, path, run_number, platform): + if os.path.exists(path): + raise FileExistsError(f"Path {path} already exists") + if run_number not in self.record["runs"]: + raise ValueError(f"Run number {run_number} not found") + if platform not in self.record["runs"][run_number]["archives"]: + raise ValueError(f"Platform {platform} not found") + url = self.record["runs"][run_number]["archives"][platform]["url"] + print(url) + download(url, path) - def _add_one_plot_or_artefact(self, kind, path, **kwargs): + def _add_one_plot(self, path, **kwargs): + kind = "plot" if not os.path.exists(path): raise FileNotFoundError(f"Could not find {kind} to upload at {path}") @@ -70,10 +108,16 @@ def _add_one_plot_or_artefact(self, kind, path, **kwargs): def _add_one_weights(self, path, **kwargs): weights = WeightCatalogueEntry(path=path) + if not WeightCatalogueEntry.key_exists(weights.key): + # weights with this uuid does not exist, register and upload them weights.register(ignore_existing=False, overwrite=False) weights.upload(path, overwrite=False) + else: + # Weights with this uuid already exist + # Skip if the weights are the same + # Raise an error if the weights are different other = WeightCatalogueEntry(key=weights.key) if other.record["metadata"]["timestamp"] == weights.record["metadata"]["timestamp"]: LOG.info( diff --git a/tests/dummy-recipe-dataset.yaml b/tests/dummy-recipe-dataset.yaml index 8df2ce7..2a73081 100644 --- a/tests/dummy-recipe-dataset.yaml +++ b/tests/dummy-recipe-dataset.yaml @@ -5,8 +5,8 @@ common: grid: 20./20. dates: - start: 2020-12-30 00:00:00 - end: 2021-01-03 18:00:00 + start: 1979-01-01 00:00:00 + end: 1979-01-03 18:00:00 frequency: 6h input: diff --git a/tests/dummy-recipe-experiment.yaml b/tests/dummy-recipe-experiment.yaml index 8a9bb75..5a22ec0 100644 --- a/tests/dummy-recipe-experiment.yaml +++ b/tests/dummy-recipe-experiment.yaml @@ -1,54 +1,7 @@ +description: Minimal config dates: - end: 2023-08-31 00:00:00 - frequency: 24 start: 2023-06-01 00:00:00 -description: Minimal config -ecflow: - host: ecflow-gen-mlx-001 - limits: - gpu: 10 - port: 3141 - target_running_user: mlx + end: 2023-06-03 00:00:00 + frequency: 24 evaluation: - name: quaver - scores: scorecard -input: - name: mars -metadata: - config_home: /home/mafp/prepml/i4df - expver: i4df - input: scorecard.yaml - owner: mafp - prepml_command: /home/mafp/venvs/mafp-dev24-02/bin/python3 -m prepml - prepml_module_version: '0.1' - time: '2024-02-22T17:10:31.433359' -model: - name: aifs -output: - class: rd - database: marsrd - name: mars -platform: - flavours: - cpu: - host: - slurm: ac-login - late: -c +23:59 - submit_arguments: - account: ecaifs - gpu: - host: - slurm: ac-login - late: -c +00:10 - submit_arguments: - account: ecaifs - cpus-per-task: '32' - gres: gpu:1 - mem: 64G - nice: '100' - partition: gpu - time: 0-00:10 - name: atos -runner: - name: ai-models-module - version: auto + name: quaver-basic