diff --git a/src/anemoi/registry/commands/base.py b/src/anemoi/registry/commands/base.py index 2599df6..8bf1168 100644 --- a/src/anemoi/registry/commands/base.py +++ b/src/anemoi/registry/commands/base.py @@ -25,9 +25,6 @@ class BaseCommand(Command): internal = True timestamp = True - def check_arguments(self, args): - pass - def is_path(self, name_or_path): return os.path.exists(name_or_path) diff --git a/src/anemoi/registry/commands/datasets.py b/src/anemoi/registry/commands/datasets.py index cc91a5d..c4579fb 100644 --- a/src/anemoi/registry/commands/datasets.py +++ b/src/anemoi/registry/commands/datasets.py @@ -46,9 +46,6 @@ def add_arguments(self, command_parser): ) command_parser.add_argument("--platform", help="Platform to add the location to.") - def check_arguments(self, args): - pass - def _run(self, entry, args): # order matters self.process_task(entry, args, "unregister") diff --git a/src/anemoi/registry/commands/entry.py b/src/anemoi/registry/commands/entry.py index b57262f..8012d17 100644 --- a/src/anemoi/registry/commands/entry.py +++ b/src/anemoi/registry/commands/entry.py @@ -83,9 +83,6 @@ def add_arguments(self, command_parser): "--yaml", action="store_true", help="Use the YAML format with ``--dump`` and ``--edit``." 
) - def check_arguments(self, args): - pass - def run(self, args): path = args.path if "/" not in path[1:] or not path.startswith("/"): diff --git a/src/anemoi/registry/commands/weights.py b/src/anemoi/registry/commands/weights.py index b191113..6d74196 100644 --- a/src/anemoi/registry/commands/weights.py +++ b/src/anemoi/registry/commands/weights.py @@ -44,9 +44,6 @@ def add_arguments(self, command_parser): command_parser.add_argument("--platform", help="Platform where to add the location.") command_parser.add_argument("--overwrite", help="Overwrite any existing weights.", action="store_true") - def check_arguments(self, args): - pass - def _run(self, entry, args): self.process_task(entry, args, "unregister") self.process_task(entry, args, "register", overwrite=args.overwrite) diff --git a/src/anemoi/registry/entry/__init__.py b/src/anemoi/registry/entry/__init__.py index 00c513f..f972567 100644 --- a/src/anemoi/registry/entry/__init__.py +++ b/src/anemoi/registry/entry/__init__.py @@ -31,18 +31,26 @@ class CatalogueEntry: path = None key = None - def __init__(self, key=None, path=None): + def __init__(self, key=None, path=None, must_exist=True): assert key is not None or path is not None, "key or path must be provided" if path is not None: assert key is None self.load_from_path(path) - assert self.record is not None - else: - assert key is not None - self.load_from_key(key) - assert self.record is not None + if key is not None: + assert path is None + if self.key_exists(key): + # found in catalogue so load it + self.load_from_key(key) + else: + # not found in catalogue, so create a new one + if must_exist: + raise CatalogueEntryNotFound(f"Could not find any {self.collection} with key={key}") + else: + self.create_from_new_new(key) + + assert self.record is not None assert self.key is not None, "key must be provided" self.rest_item = RestItem(self.collection, self.key) @@ -55,6 +63,9 @@ def as_json(self): def key_exists(cls, key): return RestItem(cls.collection, 
def set_progress(self, progress):
    """Store a progress report by patching ``/progress`` via ``self.rest_item``.

    ``progress`` may be an integer or any other value (the patch comment
    mentions a dict).  An integer is treated as a percentage: it must lie
    between 0 and 100 inclusive and is wrapped as ``{"percent": value}``
    before being sent.  Any non-integer value is sent unchanged.

    Raises:
        ValueError: if an integer ``progress`` is outside the 0..100 range.
    """
    if isinstance(progress, int):
        # Bare integers are percentages; reject anything out of range.
        if progress < 0 or progress > 100:
            raise ValueError("Progress must be between 0 and 100")
        progress = {"percent": progress}

    operation = {"op": "add", "path": "/progress", "value": progress}
    self.rest_item.patch([operation])
class Progress:
    """Rate-limited progress callback that forwards transfer status to a task.

    An instance is passed as the ``progress`` callback of a transfer.  Each
    call builds a status dict and hands it to ``task.set_progress``, but at
    most once every ``frequency`` seconds; calls arriving sooner are dropped.
    """

    # Time (aware UTC datetime) of the last update actually sent, or None if
    # nothing has been sent yet.
    latest_progress = None

    def __init__(self, task, frequency=60):
        """Create the callback.

        Args:
            task: object exposing ``set_progress(dict)``.
            frequency: minimum number of seconds between two updates.
        """
        self.task = task
        self.frequency = frequency

    def __call__(self, number_of_files, total_size, total_transferred, transfering, **kwargs):
        """Report the current transfer state unless an update was sent recently.

        All arguments (including any extra keyword arguments) are copied
        into the dict passed to ``task.set_progress``, together with a
        computed ``percentage`` entry.
        """
        now = datetime.datetime.now(datetime.timezone.utc)

        if self.latest_progress is not None:
            # total_seconds() rather than .seconds: the latter only holds the
            # sub-day remainder and would wrap for long gaps.
            elapsed = (now - self.latest_progress).total_seconds()
            if elapsed < self.frequency:
                # already updated recently
                return

        p = dict(
            number_of_files=number_of_files,
            total_size=total_size,
            total_transferred=total_transferred,
            transfering=transfering,
            **kwargs,
        )
        # Percentage is only meaningful while transferring and when the total
        # size is known and non-zero; otherwise report 0.
        p["percentage"] = 100 * total_transferred / total_size if total_size and transfering else 0

        self.task.set_progress(p)

        # Remember when the update went out so the throttle above takes
        # effect; without this every call would be forwarded.
        self.latest_progress = now
2023-06-03 00:00:00 + end: 2023-08-31 00:00:00 frequency: 24 -evaluation: - name: quaver-basic + start: 2023-06-01 00:00:00 +description: Minimal config +input: + name: mars +metadata: + config_home: /home/user/prepml/i4df + expver: i4df + input: scorecard.yaml + owner: user + prepml_command: /home/user/venvs/user-dev24-02/bin/python3 -m prepml + prepml_module_version: '0.1' + time: '2024-02-22T17:10:31.433359'