Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 8, 2024
1 parent 7141a12 commit 05e7a72
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 30 deletions.
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,18 @@ dynamic = [
dependencies = [
"anemoi-datasets",
"jsonpatch",
"requests",
]

optional-dependencies.all = [
"boto3",
]

optional-dependencies.dev = [
"boto3",
"nbsphinx",
"pandoc",
"pytest",
"requests",
"sphinx",
"sphinx-argparse",
"sphinx-rtd-theme",
Expand All @@ -71,14 +74,17 @@ optional-dependencies.dev = [
optional-dependencies.docs = [
"nbsphinx",
"pandoc",
"requests",
"sphinx",
"sphinx-argparse",
"sphinx-rtd-theme",
"termcolor",
"tomli",
]

optional-dependencies.s3 = [
"boto3",
]

optional-dependencies.tests = [
"pytest",
]
Expand Down
10 changes: 7 additions & 3 deletions src/anemoi/registry/commands/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ def add_arguments(self, command_parser):
action="store_true",
)
# command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true")

command_parser.add_argument("--set-status", help="Set the status to the dataset")
command_parser.add_argument("--add-location", nargs="+", help="Add a location to the dataset")
command_parser.add_argument("--add-recipe", help="Add a recipe file")
command_parser.add_argument(
"--add-location",
nargs="+",
help="Path to add a location to the dataset. Implies --platform",
)
command_parser.add_argument("--platform", help="Platform to add the location to.")

def check_arguments(self, args):
    """Hook for validating parsed arguments; this command performs no checks."""
    return None
Expand All @@ -50,7 +54,7 @@ def _run(self, entry, args):
self.process_task(entry, args, "unregister")
self.process_task(entry, args, "register")
# self.process_task(entry, args, "remove_location")
self.process_task(entry, args, "add_location")
self.process_task(entry, args, "add_location", platform=args.platform)
self.process_task(entry, args, "add_recipe")
self.process_task(entry, args, "set_status")

Expand Down
12 changes: 3 additions & 9 deletions src/anemoi/registry/commands/weights.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,17 @@ def add_arguments(self, command_parser):
)
# command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true")

command_parser.add_argument("--add-location", nargs="+", help="Add a location to the weights")

command_parser.add_argument("--add-location", help="Add a location to the weights.")
command_parser.add_argument("--platform", help="Platform where to add the location.")
command_parser.add_argument("--overwrite", help="Overwrite any existing weights", action="store_true")

def check_arguments(self, args):
    """Hook for validating parsed arguments; this command performs no checks."""
    return None

def parse_location(self, location):
    """Convert a list of ``key=value`` strings into a dictionary.

    Only the first ``=`` of each item separates the key from the value, so
    values that themselves contain ``=`` (for example URLs with query
    strings) are kept intact instead of being truncated.

    Args:
        location: Iterable of strings, each of the form ``key=value``.

    Returns:
        A dict mapping each key to its value. Later items override earlier
        ones that share the same key.

    Raises:
        ValueError: If any item does not contain ``=``.
    """
    parsed = {}
    for item in location:
        key, separator, value = item.partition("=")
        if not separator:
            raise ValueError(f"Invalid location format '{item}', use 'key1=value1 key2=value2' list.")
        parsed[key] = value
    return parsed

def _run(self, entry, args):
self.process_task(entry, args, "unregister")
self.process_task(entry, args, "register", overwrite=args.overwrite)
self.process_task(entry, args, "add_location")
self.process_task(entry, args, "add_location", platform=args.platform)


command = Weights
7 changes: 5 additions & 2 deletions src/anemoi/registry/commands/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ def add_arguments(self, command_parser):
default="transfer-dataset",
nargs="?",
)
command_parser.add_argument("--target-dir", help="Target directory", default=".")
command_parser.add_argument("--destination", help="Platform destination (e.g. leonardo, lumi)")
command_parser.add_argument(
"--target-dir", help="The actual target directory where the worker will write.", default="."
)
command_parser.add_argument("--published-target-dir", help="The target directory published in the catalogue.")
command_parser.add_argument("--destination", help="Platform destination (e.g. leonardo, lumi, marenostrum)")
command_parser.add_argument("--request", help="Filter tasks to process (key=value list)", nargs="*", default=[])
command_parser.add_argument("--threads", help="Number of threads to use", type=int, default=1)
command_parser.add_argument("--heartbeat", help="Heartbeat interval", type=int, default=60)
Expand Down
7 changes: 1 addition & 6 deletions src/anemoi/registry/entry/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import yaml
from anemoi.datasets import open_dataset

from anemoi.registry.utils import list_to_dict

from . import CatalogueEntry

LOG = logging.getLogger(__name__)
Expand All @@ -25,10 +23,7 @@ class DatasetCatalogueEntry(CatalogueEntry):
def set_status(self, status):
    """Set the ``/status`` field of the catalogue record to *status*.

    Sends a single "add" patch operation through ``self.rest_item``.
    """
    operation = {"op": "add", "path": "/status", "value": status}
    self.rest_item.patch([operation])

def add_location(self, *args, platform=None, path=None):
if args:
return self.add_location(**list_to_dict(args))

def add_location(self, path, platform):
    """Record *path* as the location of this entry on *platform*.

    Sends an "add" patch operation that stores ``{"path": path}`` under
    ``/locations/<platform>`` of the catalogue record.

    Args:
        path: Location of the data on the given platform.
        platform: Name of the platform; used as the key under ``/locations``.

    Raises:
        ValueError: If *platform* is empty or ``None``. Without this guard the
            location would be stored under the literal key ``None``.
    """
    if not platform:
        raise ValueError("Cannot add a location without a platform name.")
    self.rest_item.patch([{"op": "add", "path": f"/locations/{platform}", "value": {"path": path}}])

def add_recipe(self, file):
Expand Down
7 changes: 1 addition & 6 deletions src/anemoi/registry/entry/weights.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
from anemoi.utils.checkpoints import load_metadata as load_checkpoint_metadata
from anemoi.utils.s3 import upload

from anemoi.registry.utils import list_to_dict

from .. import config
from . import CatalogueEntry

Expand All @@ -23,10 +21,7 @@ class WeightCatalogueEntry(CatalogueEntry):
collection = "weights"
main_key = "uuid"

def add_location(self, *args, platform=None, path=None):
if args:
return self.add_location(**list_to_dict(args))

def add_location(self, path, platform):
    """Record *path* as the location of these weights on *platform*.

    Sends an "add" patch operation that stores ``{"path": path}`` under
    ``/locations/<platform>`` of the catalogue record.

    Args:
        path: Location of the weights on the given platform.
        platform: Name of the platform; used as the key under ``/locations``.

    Raises:
        ValueError: If *platform* is empty or ``None``. Without this guard the
            location would be stored under the literal key ``None``.
    """
    if not platform:
        raise ValueError("Cannot add a location without a platform name.")
    self.rest_item.patch([{"op": "add", "path": f"/locations/{platform}", "value": {"path": path}}])

def default_location(self, **kwargs):
Expand Down
17 changes: 15 additions & 2 deletions src/anemoi/registry/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(
wait=60,
stop_if_finished=True,
target_dir=".",
publish_target_dir=None,
auto_register=True,
threads=1,
heartbeat=60,
Expand All @@ -53,6 +54,7 @@ def __init__(

self.destination = destination
self.target_dir = target_dir
self.publish_target_dir = publish_target_dir or target_dir
self.request = request
self.threads = threads
self.heartbeat = heartbeat
Expand Down Expand Up @@ -210,19 +212,30 @@ def get_source_path():
os.rename(target_tmp_path, target_path)

if self.auto_register:
dataset_entry.add_location(platform=destination, path=target_path)
published_target_path = os.path.join(self.publish_target_dir, basename)
dataset_entry.add_location(platform=destination, path=published_target_path)

@classmethod
def parse_entry(cls, entry):
data = entry.record.copy()

assert isinstance(data, dict), data
assert data["action"] == "transfer-dataset", data["action"]

def is_alphanumeric(s):
    """Return True if *s* is a non-empty string of alphanumerics, ``-`` or ``_``.

    The empty string is rejected explicitly: ``all()`` over no characters is
    True, which would let an empty name pass validation.
    """
    assert isinstance(s, str), s
    if not s:
        return False
    return all(c.isalnum() or c in ("-", "_") for c in s)

destination = data.pop("destination")
source = data.pop("source")
dataset = data.pop("dataset")

assert is_alphanumeric(destination), destination
assert is_alphanumeric(source), source
assert is_alphanumeric(dataset), dataset
# Warn about any leftover key that is not part of the known task fields.
for k in data:
    if k not in ("action", "status", "progress", "created", "updated", "uuid"):
        # Braces around data[k] so the value is interpolated, not printed literally.
        LOG.warning(f"Unknown key {k}={data[k]}")
data = None

if "/" in destination:
raise ValueError(f"Destination {destination} must not contain '/', this is a platform name")
Expand Down

0 comments on commit 05e7a72

Please sign in to comment.