diff --git a/docs/configuring.rst b/docs/configuring.rst index 374f3c2..3453d99 100644 --- a/docs/configuring.rst +++ b/docs/configuring.rst @@ -14,11 +14,6 @@ this file are optional and have default values. [registry] api_url = "https://anemoi.ecmwf.int/api/v1" - plots_uri_pattern = "s3://ml-artefacts/{expver}/{basename}" - datasets_uri_pattern = "s3://ml-datasets/{name}" - weights_uri_pattern = "s3://ml-weights/{uuid}.ckpt" - weights_platform = "ewc" - The second config file is ``~/.config/anemoi/config.secret.toml``. This file must have the right permissions set to avoid unauthorized access (``chmod 600 ~/.config/anemoi/config.secret.toml``). All keys in this file have no default values. diff --git a/src/anemoi/registry/commands/base.py b/src/anemoi/registry/commands/base.py index 8bf1168..4564907 100644 --- a/src/anemoi/registry/commands/base.py +++ b/src/anemoi/registry/commands/base.py @@ -81,11 +81,11 @@ def run(self, args): def get_entry(self, name_or_path): if self.is_path(name_or_path): - LOG.info(f"Found local {self.kind} at {name_or_path}") + LOG.debug(f"Found local {self.kind} at {name_or_path}") return self.entry_class(path=name_or_path) if self.is_identifier(name_or_path): - LOG.info(f"Processing {self.kind} with identifier '{name_or_path}'") + LOG.debug(f"Processing {self.kind} with identifier '{name_or_path}'") return self.entry_class(key=name_or_path) def run_from_identifier(self, *args, **kwargs): diff --git a/src/anemoi/registry/commands/datasets.py b/src/anemoi/registry/commands/datasets.py index 0a601a6..1749eb2 100644 --- a/src/anemoi/registry/commands/datasets.py +++ b/src/anemoi/registry/commands/datasets.py @@ -12,7 +12,9 @@ """ +import argparse import logging +import os from ..entry.dataset import DatasetCatalogueEntry from .base import BaseCommand @@ -29,31 +31,82 @@ class Datasets(BaseCommand): kind = "dataset" def add_arguments(self, command_parser): - command_parser.add_argument("NAME_OR_PATH", help=f"The name or the path of a {self.kind}.") - 
command_parser.add_argument("--register", help=f"Register a {self.kind} in the catalogue.", action="store_true") + command_parser.add_argument("NAME_OR_PATH", help="The name or the path of a dataset.") + command_parser.add_argument("--register", help="Register a dataset in the catalogue.", action="store_true") command_parser.add_argument( "--unregister", - help=f"Remove a {self.kind} from catalogue (without deleting it from its locations)", + help="Remove a dataset from catalogue (without deleting it from its locations). Ignore all other options.", action="store_true", ) - # command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true") - command_parser.add_argument("--set-status", help="Set the status to the {self.kind}.") - command_parser.add_argument("--set-recipe", help="Set the recipe file to [re-]build the {self.kind}.") + command_parser.add_argument("--url", help="Print the URL of the dataset.", action="store_true") + # command_parser.add_argument("--delete", help=f"Delete the dataset from the catalogue and from any other location", action="store_true") + command_parser.add_argument("--set-status", help="Set the status to the dataset.", metavar="STATUS") command_parser.add_argument( - "--add-location", - nargs="+", - help="Path to add a location to the dataset. Implies --platform", + "--set-recipe", help="Set the recipe file to [re-]build the dataset.", metavar="FILE" ) - command_parser.add_argument("--platform", help="Platform to add the location to. Implies --add-location") + command_parser.add_argument( + "--add-local", + help=("Platform name to add a new location to the NAME_OR_PATH. 
" "Requires that NAME_OR_PATH is a path."), + metavar="PLATFORM", + ) + + command_parser.add_argument("--add-location", help="Platform name to add a new location.", metavar="PLATFORM") + command_parser.add_argument( + "--uri-pattern", + help="Path of the new location using {name}, such as 's3://ml-datasets/{name}.zarr' . Requires a platform name in --add-location.", + metavar="PATH", + ) + command_parser.add_argument( + "--upload", + help="Upload the dataset. Requires a platform name in --add-location.", + action=argparse.BooleanOptionalAction, + default=False, + ) + + command_parser.add_argument("--remove-location", help="Platform name to remove.", metavar="PLATFORM") def _run(self, entry, args): + if entry is None: + raise ValueError(f"Dataset {args.NAME_OR_PATH} not found in the catalogue and path does not exists.") + + if args.unregister: + entry.unregister() + return + + if args.add_local and not os.path.exists(args.NAME_OR_PATH): + raise ValueError(f"Path {args.NAME_OR_PATH} does not exists. Cannot use --add-local.") + + if args.upload: + if not os.path.exists(args.NAME_OR_PATH): + raise ValueError(f"Path {args.NAME_OR_PATH} does not exists. 
Cannot use --upload.") + if not args.add_location: + raise ValueError("Cannot use --upload without --add-location.") + + if args.uri_pattern is not None: + if not args.add_location: + raise ValueError("Cannot use --uri-pattern without --add-location.") + if "{name}" not in args.uri_pattern: + raise ValueError(f"URI pattern {args.uri_pattern} does not contain '{{name}}'") + # order matters - self.process_task(entry, args, "unregister") self.process_task(entry, args, "register") - # self.process_task(entry, args, "remove_location") - self.process_task(entry, args, "add_location", platform=args.platform) self.process_task(entry, args, "set_recipe") self.process_task(entry, args, "set_status") + self.process_task(entry, args, "remove_location") + + if args.add_local: + entry.add_location(args.add_local, path=args.NAME_OR_PATH) + + if args.upload or args.add_location: + path = entry.build_location_path(platform=args.add_location, uri_pattern=args.uri_pattern) + if args.upload: + entry.upload(source=args.NAME_OR_PATH, target=path, platform=args.add_location) + if args.add_location: + LOG.info(f"Adding location to {args.add_location}: {path}") + entry.add_location(platform=args.add_location, path=path) + + if args.url: + print(entry.url) command = Datasets diff --git a/src/anemoi/registry/commands/experiments.py b/src/anemoi/registry/commands/experiments.py index 9c162e9..73aa59e 100644 --- a/src/anemoi/registry/commands/experiments.py +++ b/src/anemoi/registry/commands/experiments.py @@ -41,6 +41,7 @@ def add_arguments(self, command_parser): help="Remove from catalogue (without deleting the experiment from other locations)", action="store_true", ) + command_parser.add_argument("--url", help="Print the URL of the experiment.", action="store_true") command_parser.add_argument( "--delete-artefacts", help="Remove experiments artefacts (such as plots)", @@ -106,6 +107,8 @@ def _run(self, entry, args): run_number=args.run_number, platform=args.archive_platform, ) + if 
args.url: + print(entry.url) command = Experiments diff --git a/src/anemoi/registry/commands/weights.py b/src/anemoi/registry/commands/weights.py index 6d74196..d26473e 100644 --- a/src/anemoi/registry/commands/weights.py +++ b/src/anemoi/registry/commands/weights.py @@ -35,19 +35,24 @@ def add_arguments(self, command_parser): ) command_parser.add_argument( "--unregister", - help="Remove from catalogue (without deleting it from its actual locations).", + help="Remove from catalogue (without deleting it from its actual locations). Ignore all other options.", action="store_true", ) # command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true") - command_parser.add_argument("--add-location", help="Add a location to the weights.") - command_parser.add_argument("--platform", help="Platform where to add the location.") + command_parser.add_argument("--add-location", help="Platform to add location to the weights.") + command_parser.add_argument("--location-path", help="Path of the new location using {{uuid}}.", metavar="PATH") command_parser.add_argument("--overwrite", help="Overwrite any existing weights.", action="store_true") + command_parser.add_argument("--url", help="Print the URL of the dataset.", action="store_true") def _run(self, entry, args): - self.process_task(entry, args, "unregister") + if args.unregister: + entry.unregister() + return self.process_task(entry, args, "register", overwrite=args.overwrite) - self.process_task(entry, args, "add_location", platform=args.platform) + self.process_task(entry, args, "add_location", path=args.location_path) + if args.url: + print(entry.url) command = Weights diff --git a/src/anemoi/registry/config.yaml b/src/anemoi/registry/config.yaml index 7720c09..adaf732 100644 --- a/src/anemoi/registry/config.yaml +++ b/src/anemoi/registry/config.yaml @@ -5,7 +5,10 @@ registry: artefacts_uri_base: "s3://ml-artefacts" plots_uri_pattern: 
"s3://ml-artefacts/{expver}/{basename}" - datasets_uri_pattern: "s3://ml-datasets/{name}" + + datasets_uri_pattern: "s3://ml-datasets/{name}.zarr" + datasets_platform: "ewc" + weights_uri_pattern: "s3://ml-weights/{uuid}.ckpt" weights_platform: "ewc" diff --git a/src/anemoi/registry/entry/__init__.py b/src/anemoi/registry/entry/__init__.py index c7509fd..7c2b17d 100644 --- a/src/anemoi/registry/entry/__init__.py +++ b/src/anemoi/registry/entry/__init__.py @@ -30,6 +30,11 @@ class CatalogueEntry: record = None path = None key = None + collection = None + + @property + def url(self): + return f"{config()['web_url']}/{self.collection}/{self.key}" def __init__(self, key=None, path=None, must_exist=True): assert key is not None or path is not None, "key or path must be provided" diff --git a/src/anemoi/registry/entry/dataset.py b/src/anemoi/registry/entry/dataset.py index 028838f..749a64b 100644 --- a/src/anemoi/registry/entry/dataset.py +++ b/src/anemoi/registry/entry/dataset.py @@ -5,11 +5,15 @@ # granted to it by virtue of its status as an intergovernmental organisation # nor does it submit to any jurisdiction. +import datetime import logging import os import yaml from anemoi.datasets import open_dataset +from anemoi.utils.humanize import when + +from anemoi.registry import config from . 
import CatalogueEntry @@ -23,12 +27,75 @@ class DatasetCatalogueEntry(CatalogueEntry): def set_status(self, status): self.rest_item.patch([{"op": "add", "path": "/status", "value": status}]) - def add_location(self, path, platform): + def build_location_path(self, platform, uri_pattern=None): + if uri_pattern is None: + assert platform == config()["datasets_platform"] + uri_pattern = config()["datasets_uri_pattern"] + LOG.debug(f"Using uri pattern from config: {uri_pattern}") + else: + LOG.debug(f"Using uri pattern: {uri_pattern}") + return uri_pattern.format(name=self.key) + + def add_location(self, platform, path): + LOG.debug(f"Adding location to {platform}: {path}") self.rest_item.patch([{"op": "add", "path": f"/locations/{platform}", "value": {"path": path}}]) + return path def remove_location(self, platform): self.rest_item.patch([{"op": "remove", "path": f"/locations/{platform}"}]) + def upload(self, source, target, platform="unknown", resume=True): + LOG.info(f"Uploading from {source} to {target} ") + assert target.startswith("s3://"), target + + source_path = os.path.abspath(source) + kwargs = dict( + action="transfer-dataset", + source="cli", + source_path=source_path, + destination=platform, + target_path=target, + dataset=self.key, + ) + LOG.info(f"Task: {kwargs}") + + from anemoi.utils.s3 import upload + + from anemoi.registry.tasks import TaskCatalogueEntry + from anemoi.registry.tasks import TaskCatalogueEntryList + from anemoi.registry.workers.transfer_dataset import Progress + + def find_or_create_task(**kwargs): + lst = TaskCatalogueEntryList(**kwargs) + + if not lst: + LOG.info("No runnning transfer found, starting one.") + uuid = TaskCatalogueEntryList().add_new_task(**kwargs) + task = TaskCatalogueEntry(key=uuid) + return task + + lst = TaskCatalogueEntryList(**kwargs) + task = lst[0] + updated = datetime.datetime.fromisoformat(task.record["updated"]) + if resume: + LOG.info(f"Resuming from previous transfer (last update {when(updated)})") + 
else: + raise ValueError(f"Transfer already in progress (last update {when(updated)})") + return task + + task = find_or_create_task(**kwargs) + task.set_status("running") + + progress = Progress(task, frequency=10) + LOG.info(f"Upload('{source_path}','{target}', resume=True, threads=2)") + try: + upload(source_path, target, resume=True, threads=2, progress=progress) + except: + task.set_status("stopped") + raise + + task.unregister() + def set_recipe(self, file): if not os.path.exists(file): raise FileNotFoundError(f"Recipe file not found: {file}") diff --git a/src/anemoi/registry/entry/weights.py b/src/anemoi/registry/entry/weights.py index 0c5a6aa..70b975a 100644 --- a/src/anemoi/registry/entry/weights.py +++ b/src/anemoi/registry/entry/weights.py @@ -21,8 +21,9 @@ class WeightCatalogueEntry(CatalogueEntry): collection = "weights" main_key = "uuid" - def add_location(self, path, platform): + def add_location(self, platform, path): self.rest_item.patch([{"op": "add", "path": f"/locations/{platform}", "value": {"path": path}}]) + return path def default_location(self, **kwargs): uri = config()["weights_uri_pattern"] diff --git a/src/anemoi/registry/workers/delete_dataset.py b/src/anemoi/registry/workers/delete_dataset.py index 43d96f1..7fcf8c2 100644 --- a/src/anemoi/registry/workers/delete_dataset.py +++ b/src/anemoi/registry/workers/delete_dataset.py @@ -50,7 +50,12 @@ def worker_process_task(self, task): LOG.warning(f"Deleting {path} from '{platform}'") tmp_path = path + ".deleting" + i = 0 + while os.path.exists(tmp_path): + i += 1 + tmp_path = path + ".deleting." 
+ str(i) os.rename(path, tmp_path) + # shutil.rmtree(tmp_path) LOG.warning(f"Deleted {path} from '{platform}'") diff --git a/tests/test_all.py b/tests/test_all.py index f61e745..c3a6719 100755 --- a/tests/test_all.py +++ b/tests/test_all.py @@ -23,7 +23,10 @@ def run(*args): def setup_module(): teardown_module(raise_if_error=False) run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml", "--register") + run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml") + run("anemoi-registry", "weights", "./dummy-checkpoint.ckpt", "--register") + run("anemoi-registry", "weights", "./dummy-checkpoint.ckpt") if not os.path.exists(DATASET_PATH): run("anemoi-datasets", "create", "./dummy-recipe-dataset.yaml", DATASET_PATH, "--overwrite") @@ -31,6 +34,7 @@ def setup_module(): os.symlink(DATASET_PATH, TMP_DATASET_PATH) run("anemoi-registry", "datasets", TMP_DATASET_PATH, "--register") + run("anemoi-registry", "datasets", TMP_DATASET_PATH) print("# Setup done") @@ -60,8 +64,25 @@ def test_datasets(): run("anemoi-registry", "datasets", TMP_DATASET) run("anemoi-registry", "datasets", TMP_DATASET, "--set-recipe", "./dummy-recipe-dataset.yaml") run("anemoi-registry", "datasets", TMP_DATASET, "--set-status", "testing") - run("anemoi-registry", "datasets", TMP_DATASET, "--add-location", "/the/dataset/path", "--platform", "atos") - run("anemoi-registry", "datasets", TMP_DATASET, "--add-location", "/other/path", "--platform", "leonardo") + run( + "anemoi-registry", + "datasets", + TMP_DATASET, + "--add-location", + "atos", + "--uri-pattern", + "/the/dataset/path/{name}", + ) + run( + "anemoi-registry", + "datasets", + TMP_DATASET, + "--add-location", + "leonardo", + "--uri-pattern", + "https://other/{name}/path", + ) + run("anemoi-registry", "datasets", TMP_DATASET, "--add-location", "ewc") def test_weights(): @@ -72,9 +93,9 @@ def test_weights(): "weights", "./dummy-checkpoint.ckpt", "--add-location", - 
"s3://ml-weights/a5275e04-0000-0000-a0f6-be19591b09fe.ckpt", - "--platform", "ewc", + "--location-path", + "s3://ml-weights/a5275e04-0000-0000-a0f6-be19591b09fe.ckpt", )