Skip to content

Commit

Permalink
implement datasets --upload
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 19, 2024
1 parent 2e541e4 commit 7a3b0dc
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 32 deletions.
5 changes: 0 additions & 5 deletions docs/configuring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ this file are optional and have default values.
[registry]
api_url = "https://anemoi.ecmwf.int/api/v1"
plots_uri_pattern = "s3://ml-artefacts/{expver}/{basename}"
datasets_uri_pattern = "s3://ml-datasets/{name}"
weights_uri_pattern = "s3://ml-weights/{uuid}.ckpt"
weights_platform = "ewc"
The second config file is ``~/.config/anemoi/config.secret.toml``. This
file must have the right permissions set to avoid unauthorized access
(`chmod 600 <filename>`). None of the keys in this file has a default value.
Expand Down
4 changes: 2 additions & 2 deletions src/anemoi/registry/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ def run(self, args):

def get_entry(self, name_or_path):
if self.is_path(name_or_path):
LOG.info(f"Found local {self.kind} at {name_or_path}")
LOG.debug(f"Found local {self.kind} at {name_or_path}")
return self.entry_class(path=name_or_path)

if self.is_identifier(name_or_path):
LOG.info(f"Processing {self.kind} with identifier '{name_or_path}'")
LOG.debug(f"Processing {self.kind} with identifier '{name_or_path}'")
return self.entry_class(key=name_or_path)

def run_from_identifier(self, *args, **kwargs):
Expand Down
79 changes: 66 additions & 13 deletions src/anemoi/registry/commands/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
"""

import argparse
import logging
import os

from ..entry.dataset import DatasetCatalogueEntry
from .base import BaseCommand
Expand All @@ -29,31 +31,82 @@ class Datasets(BaseCommand):
kind = "dataset"

def add_arguments(self, command_parser):
command_parser.add_argument("NAME_OR_PATH", help=f"The name or the path of a {self.kind}.")
command_parser.add_argument("--register", help=f"Register a {self.kind} in the catalogue.", action="store_true")
command_parser.add_argument("NAME_OR_PATH", help="The name or the path of a dataset.")
command_parser.add_argument("--register", help="Register a dataset in the catalogue.", action="store_true")
command_parser.add_argument(
"--unregister",
help=f"Remove a {self.kind} from catalogue (without deleting it from its locations)",
help="Remove a dataset from catalogue (without deleting it from its locations). Ignore all other options.",
action="store_true",
)
# command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true")
command_parser.add_argument("--set-status", help="Set the status to the {self.kind}.")
command_parser.add_argument("--set-recipe", help="Set the recipe file to [re-]build the {self.kind}.")
command_parser.add_argument("--url", help="Print the URL of the dataset.", action="store_true")
# command_parser.add_argument("--delete", help=f"Delete the dataset from the catalogue and from any other location", action="store_true")
command_parser.add_argument("--set-status", help="Set the status to the dataset.", metavar="STATUS")
command_parser.add_argument(
"--add-location",
nargs="+",
help="Path to add a location to the dataset. Implies --platform",
"--set-recipe", help="Set the recipe file to [re-]build the dataset.", metavar="FILE"
)
command_parser.add_argument("--platform", help="Platform to add the location to. Implies --add-location")
command_parser.add_argument(
"--add-local",
help=("Platform name to add a new location to the NAME_OR_PATH. " "Requires that NAME_OR_PATH is a path."),
metavar="PLATFORM",
)

command_parser.add_argument("--add-location", help="Platform name to add a new location.", metavar="PLATFORM")
command_parser.add_argument(
"--uri-pattern",
help="Path of the new location using {name}, such as 's3://ml-datasets/{name}.zarr' . Requires a platform name in --add-location.",
metavar="PATH",
)
command_parser.add_argument(
"--upload",
help="Upload the dataset. Requires a platform name in --add-location.",
action=argparse.BooleanOptionalAction,
default=False,
)

command_parser.add_argument("--remove-location", help="Platform name to remove.", metavar="PLATFORM")

def _run(self, entry, args):
if entry is None:
raise ValueError(f"Dataset {args.NAME_OR_PATH} not found in the catalogue and path does not exists.")

if args.unregister:
entry.unregister()
return

if args.add_local and not os.path.exists(args.NAME_OR_PATH):
raise ValueError(f"Path {args.NAME_OR_PATH} does not exists. Cannot use --add-local.")

if args.upload:
if not os.path.exists(args.NAME_OR_PATH):
raise ValueError(f"Path {args.NAME_OR_PATH} does not exists. Cannot use --upload.")
if not args.add_location:
raise ValueError("Cannot use --upload without --add-location.")

if args.uri_pattern is not None:
if not args.add_location:
raise ValueError("Cannot use --uri-pattern without --add-location.")
if "{name}" not in args.uri_pattern:
raise ValueError(f"URI pattern {args.uri_pattern} does not contain '{{name}}'")

# order matters
self.process_task(entry, args, "unregister")
self.process_task(entry, args, "register")
# self.process_task(entry, args, "remove_location")
self.process_task(entry, args, "add_location", platform=args.platform)
self.process_task(entry, args, "set_recipe")
self.process_task(entry, args, "set_status")
self.process_task(entry, args, "remove_location")

if args.add_local:
entry.add_location(args.add_local, path=args.NAME_OR_PATH)

if args.upload or args.add_location:
path = entry.build_location_path(platform=args.add_location, uri_pattern=args.uri_pattern)
if args.upload:
entry.upload(source=args.NAME_OR_PATH, target=path, platform=args.add_location)
if args.add_location:
LOG.info(f"Adding location to {args.add_location}: {path}")
entry.add_location(platform=args.add_location, path=path)

if args.url:
print(entry.url)


command = Datasets
3 changes: 3 additions & 0 deletions src/anemoi/registry/commands/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def add_arguments(self, command_parser):
help="Remove from catalogue (without deleting the experiment from other locations)",
action="store_true",
)
command_parser.add_argument("--url", help="Print the URL of the experiment.", action="store_true")
command_parser.add_argument(
"--delete-artefacts",
help="Remove experiments artefacts (such as plots)",
Expand Down Expand Up @@ -106,6 +107,8 @@ def _run(self, entry, args):
run_number=args.run_number,
platform=args.archive_platform,
)
if args.url:
print(entry.url)


command = Experiments
15 changes: 10 additions & 5 deletions src/anemoi/registry/commands/weights.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,24 @@ def add_arguments(self, command_parser):
)
command_parser.add_argument(
"--unregister",
help="Remove from catalogue (without deleting it from its actual locations).",
help="Remove from catalogue (without deleting it from its actual locations). Ignore all other options.",
action="store_true",
)
# command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true")

command_parser.add_argument("--add-location", help="Add a location to the weights.")
command_parser.add_argument("--platform", help="Platform where to add the location.")
command_parser.add_argument("--add-location", help="Platform to add location to the weights.")
command_parser.add_argument("--location-path", help="Path of the new location using {{uuid}}.", metavar="PATH")
command_parser.add_argument("--overwrite", help="Overwrite any existing weights.", action="store_true")
command_parser.add_argument("--url", help="Print the URL of the dataset.", action="store_true")

def _run(self, entry, args):
self.process_task(entry, args, "unregister")
if args.unregister:
entry.unregister()
return
self.process_task(entry, args, "register", overwrite=args.overwrite)
self.process_task(entry, args, "add_location", platform=args.platform)
self.process_task(entry, args, "add_location", path=args.location_path)
if args.url:
print(entry.url)


command = Weights
5 changes: 4 additions & 1 deletion src/anemoi/registry/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ registry:
artefacts_uri_base: "s3://ml-artefacts"

plots_uri_pattern: "s3://ml-artefacts/{expver}/{basename}"
datasets_uri_pattern: "s3://ml-datasets/{name}"

datasets_uri_pattern: "s3://ml-datasets/{name}.zarr"
datasets_platform: "ewc"

weights_uri_pattern: "s3://ml-weights/{uuid}.ckpt"
weights_platform: "ewc"

Expand Down
5 changes: 5 additions & 0 deletions src/anemoi/registry/entry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class CatalogueEntry:
record = None
path = None
key = None
collection = None

@property
def url(self):
    """Return the web address of this entry.

    The address is ``<web_url>/<collection>/<key>``, where ``web_url`` is
    read from the configuration and ``collection`` / ``key`` are attributes
    of this entry.
    """
    base_url = config()["web_url"]
    return f"{base_url}/{self.collection}/{self.key}"

def __init__(self, key=None, path=None, must_exist=True):
assert key is not None or path is not None, "key or path must be provided"
Expand Down
69 changes: 68 additions & 1 deletion src/anemoi/registry/entry/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

import datetime
import logging
import os

import yaml
from anemoi.datasets import open_dataset
from anemoi.utils.humanize import when

from anemoi.registry import config

from . import CatalogueEntry

Expand All @@ -23,12 +27,75 @@ class DatasetCatalogueEntry(CatalogueEntry):
def set_status(self, status):
    """Write ``status`` to the ``/status`` field of the catalogue record.

    The change is sent as a single "add" patch operation through
    ``self.rest_item``.
    """
    operation = {"op": "add", "path": "/status", "value": status}
    self.rest_item.patch([operation])

def add_location(self, path, platform):
def build_location_path(self, platform, uri_pattern=None):
    """Build the path of this dataset on a platform from a URI pattern.

    Parameters
    ----------
    platform : str
        Name of the platform the location is built for. Only checked when
        ``uri_pattern`` is not given.
    uri_pattern : str, optional
        Pattern formatted with ``name=self.key``. When omitted, the
        ``datasets_uri_pattern`` configuration value is used; this default
        is only accepted for the platform named by ``datasets_platform``.

    Returns
    -------
    str
        The pattern with ``{name}`` replaced by the entry key.

    Raises
    ------
    ValueError
        If no pattern is given and ``platform`` is not the configured
        default platform.
    """
    if uri_pattern is None:
        default_platform = config()["datasets_platform"]
        # Explicit check instead of ``assert``: the platform comes from user
        # input and the check must not disappear under ``python -O``.
        if platform != default_platform:
            raise ValueError(
                f"No default uri pattern for platform '{platform}' "
                f"(only available for '{default_platform}'). Please provide a uri pattern."
            )
        uri_pattern = config()["datasets_uri_pattern"]
        LOG.debug(f"Using uri pattern from config: {uri_pattern}")
    else:
        LOG.debug(f"Using uri pattern: {uri_pattern}")
    return uri_pattern.format(name=self.key)

def add_location(self, platform, path):
    """Record ``path`` as the location of this dataset on ``platform``.

    Sends one "add" patch operation targeting ``/locations/<platform>``
    through ``self.rest_item`` and returns ``path`` unchanged.
    """
    LOG.debug(f"Adding location to {platform}: {path}")
    operation = {
        "op": "add",
        "path": f"/locations/{platform}",
        "value": {"path": path},
    }
    self.rest_item.patch([operation])
    return path

def remove_location(self, platform):
    """Drop the ``/locations/<platform>`` field from the catalogue record.

    The change is sent as a single "remove" patch operation through
    ``self.rest_item``.
    """
    operation = {"op": "remove", "path": f"/locations/{platform}"}
    self.rest_item.patch([operation])

def upload(self, source, target, platform="unknown", resume=True):
    """Upload a local dataset to S3, tracked by a "transfer-dataset" task.

    A task describing the transfer is looked up in the task catalogue and
    created when none matches. It is set to "running" for the duration of
    the upload, set to "stopped" if the upload raises, and unregistered
    once the upload completes.

    Parameters
    ----------
    source : str
        Local path of the dataset; converted to an absolute path.
    target : str
        Destination URI; must start with ``s3://``.
    platform : str, optional
        Stored as the ``destination`` of the task.
    resume : bool, optional
        Whether to continue a matching transfer task that already exists.
        The value is also forwarded to the underlying upload call.

    Raises
    ------
    ValueError
        If a matching transfer task already exists and ``resume`` is false.
    """
    LOG.info(f"Uploading from {source} to {target} ")
    assert target.startswith("s3://"), target

    source_path = os.path.abspath(source)
    task_spec = dict(
        action="transfer-dataset",
        source="cli",
        source_path=source_path,
        destination=platform,
        target_path=target,
        dataset=self.key,
    )
    LOG.info(f"Task: {task_spec}")

    # Imported here, as in the original code, rather than at module level.
    from anemoi.utils.s3 import upload

    from anemoi.registry.tasks import TaskCatalogueEntry
    from anemoi.registry.tasks import TaskCatalogueEntryList
    from anemoi.registry.workers.transfer_dataset import Progress

    def find_or_create_task(**kwargs):
        # Query the catalogue once and reuse the result for both the
        # emptiness check and the lookup of the existing task.
        existing = TaskCatalogueEntryList(**kwargs)

        if not existing:
            LOG.info("No runnning transfer found, starting one.")
            uuid = TaskCatalogueEntryList().add_new_task(**kwargs)
            return TaskCatalogueEntry(key=uuid)

        task = existing[0]
        updated = datetime.datetime.fromisoformat(task.record["updated"])
        if not resume:
            raise ValueError(f"Transfer already in progress (last update {when(updated)})")
        LOG.info(f"Resuming from previous transfer (last update {when(updated)})")
        return task

    task = find_or_create_task(**task_spec)
    task.set_status("running")

    progress = Progress(task, frequency=10)
    LOG.info(f"Upload('{source_path}','{target}', resume={resume}, threads=2)")
    try:
        # Honour the caller's ``resume`` choice instead of always resuming.
        upload(source_path, target, resume=resume, threads=2, progress=progress)
    except BaseException:
        # BaseException on purpose: an interrupted upload (e.g. Ctrl-C) must
        # also leave the task marked as stopped before propagating.
        task.set_status("stopped")
        raise

    task.unregister()

def set_recipe(self, file):
if not os.path.exists(file):
raise FileNotFoundError(f"Recipe file not found: {file}")
Expand Down
3 changes: 2 additions & 1 deletion src/anemoi/registry/entry/weights.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ class WeightCatalogueEntry(CatalogueEntry):
collection = "weights"
main_key = "uuid"

def add_location(self, path, platform):
def add_location(self, platform, path):
    """Record ``path`` as the location of these weights on ``platform``.

    Sends one "add" patch operation targeting ``/locations/<platform>``
    through ``self.rest_item`` and returns ``path`` unchanged.
    """
    operation = {
        "op": "add",
        "path": f"/locations/{platform}",
        "value": {"path": path},
    }
    self.rest_item.patch([operation])
    return path

def default_location(self, **kwargs):
uri = config()["weights_uri_pattern"]
Expand Down
5 changes: 5 additions & 0 deletions src/anemoi/registry/workers/delete_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ def worker_process_task(self, task):
LOG.warning(f"Deleting {path} from '{platform}'")

tmp_path = path + ".deleting"
i = 0
while os.path.exists(tmp_path):
i += 1
tmp_path = path + ".deleting." + str(i)
os.rename(path, tmp_path)

# shutil.rmtree(tmp_path)
LOG.warning(f"Deleted {path} from '{platform}'")

Expand Down
29 changes: 25 additions & 4 deletions tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ def run(*args):
def setup_module():
teardown_module(raise_if_error=False)
run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml", "--register")
run("anemoi-registry", "experiments", "./dummy-recipe-experiment.yaml")

run("anemoi-registry", "weights", "./dummy-checkpoint.ckpt", "--register")
run("anemoi-registry", "weights", "./dummy-checkpoint.ckpt")

if not os.path.exists(DATASET_PATH):
run("anemoi-datasets", "create", "./dummy-recipe-dataset.yaml", DATASET_PATH, "--overwrite")
assert os.path.exists(DATASET_PATH)

os.symlink(DATASET_PATH, TMP_DATASET_PATH)
run("anemoi-registry", "datasets", TMP_DATASET_PATH, "--register")
run("anemoi-registry", "datasets", TMP_DATASET_PATH)
print("# Setup done")


Expand Down Expand Up @@ -60,8 +64,25 @@ def test_datasets():
run("anemoi-registry", "datasets", TMP_DATASET)
run("anemoi-registry", "datasets", TMP_DATASET, "--set-recipe", "./dummy-recipe-dataset.yaml")
run("anemoi-registry", "datasets", TMP_DATASET, "--set-status", "testing")
run("anemoi-registry", "datasets", TMP_DATASET, "--add-location", "/the/dataset/path", "--platform", "atos")
run("anemoi-registry", "datasets", TMP_DATASET, "--add-location", "/other/path", "--platform", "leonardo")
run(
"anemoi-registry",
"datasets",
TMP_DATASET,
"--add-location",
"atos",
"--uri-pattern",
"/the/dataset/path/{name}",
)
run(
"anemoi-registry",
"datasets",
TMP_DATASET,
"--add-location",
"leonardo",
"--uri-pattern",
"https://other/{name}/path",
)
run("anemoi-registry", "datasets", TMP_DATASET, "--add-location", "ewc")


def test_weights():
Expand All @@ -72,9 +93,9 @@ def test_weights():
"weights",
"./dummy-checkpoint.ckpt",
"--add-location",
"s3://ml-weights/a5275e04-0000-0000-a0f6-be19591b09fe.ckpt",
"--platform",
"ewc",
"--location-path",
"s3://ml-weights/a5275e04-0000-0000-a0f6-be19591b09fe.ckpt",
)


Expand Down

0 comments on commit 7a3b0dc

Please sign in to comment.