Skip to content

Commit

Permalink
add archive
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 12, 2024
1 parent 6521a98 commit 75c2f91
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 73 deletions.
48 changes: 42 additions & 6 deletions src/anemoi/registry/commands/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,34 @@ def add_arguments(self, command_parser):
)
# command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true")

command_parser.add_argument("--add-weights", nargs="+", help="Add weights to the experiment.")
# Register --add-weights. The help text is built from two adjacent string
# literals, so the first one must end with a space to keep the sentences apart.
command_parser.add_argument(
    "--add-weights",
    nargs="+",
    help=(
        "Add weights to the experiment and upload them to s3. "
        "Skip upload if these weights are already uploaded."
    ),
)
command_parser.add_argument("--add-plots", nargs="+", help="Add plots to the experiment.")
command_parser.add_argument("--add-artefacts", nargs="+", help="Add artefacts to the experiment.")
command_parser.add_argument("--overwrite", help="Overwrite if already exists.", action="store_true")

def check_arguments(self, args):
pass
command_parser.add_argument(
"--set-archive", help="Input file to register as an archive metadata file to the catalogue."
)
command_parser.add_argument(
"--get-archive", help="Output file to save the archive metadata file from the catalogue."
)
command_parser.add_argument(
"--archive-platform",
help="Archive platform. Only relevant for --set-archive and --get-archive.",
)
command_parser.add_argument(
"--run-number", help="The run number of the experiment. Relevant --set-archive and --get-archive."
)
command_parser.add_argument(
"--archive-extra-metadata", help="Extra metadata. A list of key=value pairs.", nargs="+"
)

command_parser.add_argument("--overwrite", help="Overwrite if already exists.", action="store_true")

def is_path(self, name_or_path):
if not os.path.exists(name_or_path):
Expand All @@ -62,8 +83,23 @@ def _run(self, entry, args):
self.process_task(entry, args, "unregister")
self.process_task(entry, args, "register", overwrite=args.overwrite)
self.process_task(entry, args, "add_weights")
self.process_task(entry, args, "add_artefacts")
self.process_task(entry, args, "add_plots")
self.process_task(
entry,
args,
"set_archive",
run_number=args.run_number,
platform=args.archive_platform,
overwrite=args.overwrite,
extras=args.archive_extra_metadata,
)
self.process_task(
entry,
args,
"get_archive",
run_number=args.run_number,
platform=args.archive_platform,
)


command = Experiments
3 changes: 0 additions & 3 deletions src/anemoi/registry/commands/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ def add_arguments(self, command_parser):
# tasks.add_argument("-l", "--long", help="Details", action="store_true")
# tasks.add_argument("--sort", help="Sort by date", choices=["created", "updated"], default="updated")

def check_arguments(self, args):
pass

def run(self, args):
if not args.subcommand:
raise ValueError("Missing subcommand")
Expand Down
4 changes: 3 additions & 1 deletion src/anemoi/registry/config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
registry:
api_url: "https://anemoi.ecmwf.int/api/v1"
web_url: "https://anemoi.ecmwf.int"

artefacts_uri_base: "s3://ml-artefacts"

plots_uri_pattern: "s3://ml-artefacts/{expver}/{basename}"
artefacts_uri_pattern: "s3://ml-artefacts/{expver}/{basename}"
datasets_uri_pattern: "s3://ml-datasets/{name}"
weights_uri_pattern: "s3://ml-weights/{uuid}.ckpt"
weights_platform: "ewc"
64 changes: 54 additions & 10 deletions src/anemoi/registry/entry/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

import datetime
import logging
import os

import yaml
from anemoi.utils.s3 import download
from anemoi.utils.s3 import upload

from .. import config
Expand All @@ -34,27 +36,63 @@ def load_from_path(self, path):
expver = metadata["expver"]

self.key = expver
self.record = dict(expver=expver, metadata=metadata)
self.record = dict(expver=expver, metadata=metadata, runs={})

def add_plots(self, *paths, **kwargs):
    """Add every plot in ``paths`` to this experiment, one at a time.

    Each path is handed to ``_add_one_plot`` in the order given, with the
    same keyword arguments forwarded unchanged on every call.
    """
    for plot_path in paths:
        self._add_one_plot(plot_path, **kwargs)

def _add_one_plot(self, path, **kwargs):
return self._add_one_plot_or_artefact("plot", path, **kwargs)

def add_weights(self, *paths, **kwargs):
    """Add every weights file in ``paths`` to this experiment, one at a time.

    Each path is handed to ``_add_one_weights`` in the order given, with the
    same keyword arguments forwarded unchanged on every call.
    """
    for weights_path in paths:
        self._add_one_weights(weights_path, **kwargs)

def add_artefacts(self, *paths, **kwargs):
for path in paths:
self._add_one_artefact(path, **kwargs)
def set_archive(self, path, platform, run_number, overwrite, extras):
    """Upload an archive metadata file and register it in the catalogue.

    The file is uploaded below the configured ``artefacts_uri_base`` as
    ``<expver>/runs/<run_number>/<platform><ext>`` and then recorded under
    ``/runs/<run_number>/archives/<platform>`` of this entry through
    ``rest_item.patch``.

    Args:
        path: Local file to upload. Must exist.
        platform: Archive platform name; used in the target name and as the
            key below ``archives``.
        run_number: Run number of the experiment.
        overwrite: Forwarded to ``upload``.
        extras: Optional list of ``key=value`` strings stored next to the
            archive url. ``None`` or an empty list means no extra metadata.
            Only the first ``=`` separates key from value, so values may
            themselves contain ``=``.

    Raises:
        ValueError: If ``run_number`` or ``platform`` is None, or if an
            extra is not of the form ``key=value`` or uses a reserved key.
        FileNotFoundError: If ``path`` does not exist.
    """
    if run_number is None:
        raise ValueError("run_number must be set")
    if platform is None:
        raise ValueError("platform must be set")

    # Validate the extras before touching the filesystem or uploading, so a
    # malformed argument cannot leave a half-registered archive behind.
    # `extras` is None when the command line option was not given.
    reserved_keys = ("url", "path", "updated")
    extra_metadata = {}
    for item in extras or ():
        name, separator, value = item.partition("=")
        if not separator or not name:
            raise ValueError(f"Invalid extra metadata '{item}', expected key=value")
        if name in reserved_keys:
            raise ValueError(f"Extra metadata key '{name}' is reserved")
        extra_metadata[name] = value

    if not os.path.exists(path):
        raise FileNotFoundError(f"Could not find archive to upload at {path}")

    _, ext = os.path.splitext(path)
    target = config()["artefacts_uri_base"] + f"/{self.key}/runs/{run_number}/{platform}{ext}"
    LOG.info(f"Uploading {path} to {target}.")
    upload(path, target, overwrite=overwrite)

    # Naive UTC timestamp, same format as the deprecated datetime.utcnow().
    updated = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
    dic = dict(url=target, path=path, updated=updated, **extra_metadata)

    if "runs" not in self.record:
        # for backwards compatibility, create '/runs' if it does not exist
        e = self.__class__(key=self.key)
        if "runs" not in e.record:
            e.rest_item.patch([{"op": "add", "path": "/runs", "value": {}}])
        # Start from the runs of the freshly built entry (if any) rather than
        # from an empty dict: the 'add /runs' operation below sends this
        # value and would otherwise replace runs that already exist.
        # NOTE(review): assumes building the entry from its key yields the
        # current catalogue record - confirm.
        self.record["runs"] = dict(e.record.get("runs") or {})

    run_key = str(run_number)
    if run_key not in self.record["runs"]:
        # add run_number if it does not exist
        self.rest_item.patch(
            [
                {"op": "add", "path": "/runs", "value": self.record["runs"]},
                {"op": "add", "path": f"/runs/{run_number}", "value": dict(archives={})},
            ]
        )
        # Mirror the change locally so a later call on this object does not
        # send a stale copy of the runs.
        self.record["runs"][run_key] = dict(archives={})

    self.rest_item.patch([{"op": "add", "path": f"/runs/{run_number}/archives/{platform}", "value": dic}])
    self.record["runs"][run_key].setdefault("archives", {})[platform] = dic

def get_archive(self, path, run_number, platform):
    """Download the archive metadata file registered for a run.

    Looks up the url stored under ``runs/<run_number>/archives/<platform>``
    in this entry's record, prints it and downloads it to ``path``.

    Args:
        path: Local destination file. Must not exist yet.
        run_number: Run number of the experiment. It is compared as a
            string, the same way ``set_archive`` checks for existing runs.
        platform: Archive platform the archive was registered under.

    Raises:
        FileExistsError: If ``path`` already exists.
        ValueError: If ``run_number`` or ``platform`` is None, or if no
            archive is registered for that run and platform.
    """
    if os.path.exists(path):
        raise FileExistsError(f"Path {path} already exists")
    if run_number is None:
        raise ValueError("run_number must be set")
    if platform is None:
        raise ValueError("platform must be set")

    # Older records may have no 'runs' entry at all; treat that as "no runs"
    # instead of failing with a KeyError.
    runs = self.record.get("runs") or {}
    run_key = str(run_number)
    if run_key not in runs:
        raise ValueError(f"Run number {run_number} not found")

    archives = runs[run_key].get("archives") or {}
    if platform not in archives:
        raise ValueError(f"Platform {platform} not found")

    url = archives[platform]["url"]
    print(url)
    download(url, path)

def _add_one_plot_or_artefact(self, kind, path, **kwargs):
def _add_one_plot(self, path, **kwargs):
kind = "plot"
if not os.path.exists(path):
raise FileNotFoundError(f"Could not find {kind} to upload at {path}")

Expand All @@ -70,10 +108,16 @@ def _add_one_plot_or_artefact(self, kind, path, **kwargs):

def _add_one_weights(self, path, **kwargs):
weights = WeightCatalogueEntry(path=path)

if not WeightCatalogueEntry.key_exists(weights.key):
# weights with this uuid does not exist, register and upload them
weights.register(ignore_existing=False, overwrite=False)
weights.upload(path, overwrite=False)

else:
# Weights with this uuid already exist
# Skip if the weights are the same
# Raise an error if the weights are different
other = WeightCatalogueEntry(key=weights.key)
if other.record["metadata"]["timestamp"] == weights.record["metadata"]["timestamp"]:
LOG.info(
Expand Down
4 changes: 2 additions & 2 deletions tests/dummy-recipe-dataset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ common:
grid: 20./20.

dates:
start: 2020-12-30 00:00:00
end: 2021-01-03 18:00:00
start: 1979-01-01 00:00:00
end: 1979-01-03 18:00:00
frequency: 6h

input:
Expand Down
55 changes: 4 additions & 51 deletions tests/dummy-recipe-experiment.yaml
Original file line number Diff line number Diff line change
@@ -1,54 +1,7 @@
description: Minimal config
dates:
end: 2023-08-31 00:00:00
frequency: 24
start: 2023-06-01 00:00:00
description: Minimal config
ecflow:
host: ecflow-gen-mlx-001
limits:
gpu: 10
port: 3141
target_running_user: mlx
end: 2023-06-03 00:00:00
frequency: 24
evaluation:
name: quaver
scores: scorecard
input:
name: mars
metadata:
config_home: /home/mafp/prepml/i4df
expver: i4df
input: scorecard.yaml
owner: mafp
prepml_command: /home/mafp/venvs/mafp-dev24-02/bin/python3 -m prepml
prepml_module_version: '0.1'
time: '2024-02-22T17:10:31.433359'
model:
name: aifs
output:
class: rd
database: marsrd
name: mars
platform:
flavours:
cpu:
host:
slurm: ac-login
late: -c +23:59
submit_arguments:
account: ecaifs
gpu:
host:
slurm: ac-login
late: -c +00:10
submit_arguments:
account: ecaifs
cpus-per-task: '32'
gres: gpu:1
mem: 64G
nice: '100'
partition: gpu
time: 0-00:10
name: atos
runner:
name: ai-models-module
version: auto
name: quaver-basic

0 comments on commit 75c2f91

Please sign in to comment.