From 1cf9f18cb2c0c92c3d36671e154948d6fec2c493 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Wed, 3 Jul 2024 17:56:53 +0200 Subject: [PATCH] wip --- src/anemoi/registry/commands/datasets.py | 2 - src/anemoi/registry/commands/experiments.py | 2 - src/anemoi/registry/commands/tasks.py | 57 ++++--- src/anemoi/registry/commands/weights.py | 2 - src/anemoi/registry/entry/__init__.py | 1 + src/anemoi/registry/queue_manager.py | 63 -------- src/anemoi/registry/tasks.py | 162 ++++++++++++++++++++ 7 files changed, 191 insertions(+), 98 deletions(-) delete mode 100644 src/anemoi/registry/queue_manager.py create mode 100644 src/anemoi/registry/tasks.py diff --git a/src/anemoi/registry/commands/datasets.py b/src/anemoi/registry/commands/datasets.py index 8ea5507..c7ef749 100644 --- a/src/anemoi/registry/commands/datasets.py +++ b/src/anemoi/registry/commands/datasets.py @@ -35,7 +35,6 @@ def add_arguments(self, command_parser): action="store_true", ) # command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true") - command_parser.add_argument("--json", help="Output json record", action="store_true") command_parser.add_argument("--set-status", help="Set the status to the dataset") command_parser.add_argument("--add-location", nargs="+", help="Add a location to the dataset") @@ -52,7 +51,6 @@ def _run(self, entry, args): self.process_task(entry, args, "add_location") self.process_task(entry, args, "add_recipe") self.process_task(entry, args, "set_status") - self.process_task(entry, args, "json") command = Datasets diff --git a/src/anemoi/registry/commands/experiments.py b/src/anemoi/registry/commands/experiments.py index 837882e..e810028 100644 --- a/src/anemoi/registry/commands/experiments.py +++ b/src/anemoi/registry/commands/experiments.py @@ -35,7 +35,6 @@ def add_arguments(self, command_parser): help="Remove from catalogue (without deleting all)", action="store_true", ) - command_parser.add_argument("--json", help="Output json record", action="store_true") # command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true") command_parser.add_argument("--add-weights", nargs="+", help="Add weights to the experiment") @@ -57,7 +56,6 @@ def _run(self, entry, args): self.process_task(entry, args, "register", overwrite=args.overwrite) self.process_task(entry, args, "add_weights") self.process_task(entry, args, "add_plots") - self.process_task(entry, args, "json") command = Experiments diff --git a/src/anemoi/registry/commands/tasks.py b/src/anemoi/registry/commands/tasks.py index a35a565..b6e8cf9 100644 --- a/src/anemoi/registry/commands/tasks.py +++ b/src/anemoi/registry/commands/tasks.py @@ -20,8 +20,9 @@ from anemoi.registry.commands.base import BaseCommand from anemoi.registry.entry import list_to_dict -from anemoi.registry.queue_manager import TaskCatalogueEntry from anemoi.registry.rest import RestItemList +from anemoi.registry.tasks import TaskCatalogueEntry +from anemoi.registry.tasks import Worker LOG = logging.getLogger(__name__) @@ -35,20 +36,27 @@ def task_list_to_str(data, long): updated = datetime.datetime.fromisoformat(v.pop("updated")) uuid = v.pop("uuid") - content = " ".join(f"{k}={v}" for k, v in v.items()) + status = v.pop("status") + progress = v.pop("progress", "") + action = v.pop("action", "") if not long: - content = content[:20] + "..." + if "worker" in v: + v["worker"] = v["worker"].get("host") + content = " ".join(f"{k}={v}" for k, v in v.items()) rows.append( [ + action, when(created), when(updated), - v.pop("status"), - v.pop("progress", ""), + status, + progress, content, uuid, ] ) - return table(rows, ["Created", "Updated", "Status", "%", "Details", "UUID"], ["<", "<", "<", "<", "<", "<"]) + return table( + rows, ["Action", "Created", "Updated", "Status", "%", "Details", "UUID"], ["<", "<", "<", "<", "<", "<", "<"] + ) class Tasks(BaseCommand): @@ -60,12 +68,12 @@ class Tasks(BaseCommand): def add_arguments(self, command_parser): command_parser.add_argument("TASK", help="The uuid of the task", nargs="?") - command_parser.add_argument("--remove", help="remove from queue") - command_parser.add_argument("--set-status", help="--set-status ") - command_parser.add_argument("--set-progress", help="--set-progress ", type=int) + command_parser.add_argument("--set-status", help="Set status of the given task", metavar="STATUS") + command_parser.add_argument( + "--set-progress", help="Set progress of the given task (0 to 100 percents)", type=int, metavar="N" + ) + command_parser.add_argument("--own", help="Take ownership of a task", action="store_true") command_parser.add_argument("--disown", help="Release a task and requeue it", action="store_true") - command_parser.add_argument("--own", help="Release a task and requeue it", action="store_true") - command_parser.add_argument("--json", help="Output json record", action="store_true") group = command_parser.add_mutually_exclusive_group() group.add_argument("--new", help="Add a new queue entry", nargs="*", metavar="K=V") @@ -87,6 +95,11 @@ def add_arguments(self, command_parser): ) def run(self, args): + if args.TASK is not None and ( + args.new is not None or args.take_one is not None or args.list is not None or args.worker is not None + ): + raise ValueError("Cannot use positional argument TASK with --new, --take-one or --list or --worker") + if args.TASK: return self.run_with_uuid(args.TASK, args) if args.new is not None: @@ -103,30 +116,16 @@ def run_with_uuid(self, uuid, args): uuid = args.TASK entry = self.entry_class(key=uuid) - self.process_task(entry, args, "remove") self.process_task(entry, args, "disown", "release_ownership") self.process_task(entry, args, "own", "take_ownership") self.process_task(entry, args, "set_status") self.process_task(entry, args, "set_progress") - self.process_task(entry, args, "json") def run_worker(self, args): - if args.timeout: - import signal - - signal.alarm(args.timeout) - - data = self._retrieve_task_list(args.worker, args.sort) - if data: - self._process_one_task(data[-1]["uuid"]) - else: - LOG.info("No tasks found") - - def _process_one_task(self, uuid): - entry = self.entry_class(key=uuid) - LOG.info(f"Processing task {uuid}: {entry}") - res = entry.take_ownership() - print(res) + Worker( + *args.worker, + timeout=args.timeout, + ).run() def run_new(self, args): res = TaskCatalogueEntry.new(*args.new) diff --git a/src/anemoi/registry/commands/weights.py b/src/anemoi/registry/commands/weights.py index 5de3110..c315489 100644 --- a/src/anemoi/registry/commands/weights.py +++ b/src/anemoi/registry/commands/weights.py @@ -35,7 +35,6 @@ def add_arguments(self, command_parser): action="store_true", ) # command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true") - command_parser.add_argument("--json", help="Output json record", action="store_true") command_parser.add_argument("--add-location", nargs="+", help="Add a location to the weights") @@ -54,7 +53,6 @@ def _run(self, entry, args): self.process_task(entry, args, "unregister") self.process_task(entry, args, "register", overwrite=args.overwrite) self.process_task(entry, args, "add_location") - self.process_task(entry, args, "json") command = Weights diff --git a/src/anemoi/registry/entry/__init__.py b/src/anemoi/registry/entry/__init__.py index a9f646e..3fafd4e 100644 --- a/src/anemoi/registry/entry/__init__.py +++ b/src/anemoi/registry/entry/__init__.py @@ -29,6 +29,7 @@ class CatalogueEntryNotFound(Exception): def list_to_dict(lst): assert isinstance(lst, (tuple, list)), f"lst must be a list. Got {lst} of type {type(lst)}." for x in lst: + assert isinstance(x, str), f"lst must be a list of strings. Got {x} of type {type(x)}." if "=" not in x: raise ValueError(f"Invalid key-value pairs format '{x}', use 'key1=value1 key2=value2' list.") return {x.split("=")[0]: x.split("=")[1] for x in lst} diff --git a/src/anemoi/registry/queue_manager.py b/src/anemoi/registry/queue_manager.py deleted file mode 100644 index 1351723..0000000 --- a/src/anemoi/registry/queue_manager.py +++ /dev/null @@ -1,63 +0,0 @@ -# (C) Copyright 2023 European Centre for Medium-Range Weather Forecasts. -# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation -# nor does it submit to any jurisdiction. - -import datetime -import logging - -from anemoi.registry.entry import CatalogueEntry -from anemoi.registry.rest import RestItemList -from anemoi.registry.rest import trace_info - -# from anemoi.utils.provenance import trace_info - -LOG = logging.getLogger(__name__) - - -class TaskCatalogueEntry(CatalogueEntry): - collection = "queues" - main_key = "uuid" - - @classmethod - def new(cls, *args, **kwargs): - if args: - return cls.new(**cls.list_to_dict(args)) - rest_collection = RestItemList(cls.collection) - return rest_collection.post(kwargs) - - def set_status(self, status): - patch = [{"op": "add", "path": "/status", "value": status}] - self.rest_item.patch(patch) - - def unregister(self): - return self.rest_item.delete() - - def take_ownership(self): - trace = trace_info() - trace["timestamp"] = datetime.datetime.now().isoformat() - return self.rest_item.patch( - [ - {"op": "test", "path": "/status", "value": "queued"}, - {"op": "replace", "path": "/status", "value": "running"}, - {"op": "add", "path": "/worker", "value": trace}, - ] - ) - - def release_ownership(self): - self.rest_item.patch( - [ - {"op": "test", "path": "/status", "value": "running"}, - {"op": "replace", "path": "/status", "value": "queued"}, - {"op": "remove", "path": "/worker"}, - ] - ) - - def set_progress(self, progress): - assert isinstance(progress, int), progress - if not (0 <= progress <= 100): - raise ValueError("Progress must be between 0 and 100") - patch = [{"op": "add", "path": "/progress", "value": progress}] - self.rest_item.patch(patch) diff --git a/src/anemoi/registry/tasks.py b/src/anemoi/registry/tasks.py new file mode 100644 index 0000000..17f9369 --- /dev/null +++ b/src/anemoi/registry/tasks.py @@ -0,0 +1,162 @@ +# (C) Copyright 2023 European Centre for Medium-Range Weather Forecasts. +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +import datetime +import logging +import os + +from anemoi.registry import config +from anemoi.registry.entry import CatalogueEntry +from anemoi.registry.rest import RestItemList +from anemoi.registry.rest import trace_info + +# from anemoi.utils.provenance import trace_info + +LOG = logging.getLogger(__name__) + + +class Actor: + def __init__(self, **kwargs): + for k, v in kwargs.items(): + if k not in ["action", "status", "created", "updated", "uuid"]: + LOG.error(f"Unknown attribute {k}={v} in actor {kwargs}") + + def check(self): + LOG.info(f"Checking {self}") + + +class TransferDataset(Actor): + def __init__(self, *, status, source, destination, dataset, target, **kwargs): + super().__init__(**kwargs) + self.source = source + self.destination = destination + self.target = target + self.dataset = dataset + + def __repr__(self): + return f"{self.__class__.__name__}(source={self.source}, target={self.target}, dataset={self.dataset})" + + def run(self): + LOG.info(f"Transferring {self.dataset} from {self.source} to {self.target}") + LOG.info(f"anemoi-datasets copy {self.source}/{self.dataset} {self.target}/{self.dataset}") + c = config() + print(c) + + def check(self): + super().check() + if "/" in self.destination: + raise ValueError(f"Destination {self.destination} must not contain '/', this is a platform name") + + if "/" in self.source: + raise ValueError(f"Source {self.source} must not contain '/', this is a platform name") + + if not os.path.exists(self.target) or not os.path.isdir(self.target): + raise ValueError(f"Target {self.target} must exist and be a directory") + + if "." in self.dataset: + raise ValueError(f"The dataset {self.dataset} must not contain a '.', this is the name of the dataset.") + + +class Worker: + def __init__(self, *args, timeout=None): + if timeout: + import signal + + signal.alarm(args.timeout) + + self.args = TaskCatalogueEntry.list_to_dict(args) + + def run(self): + while self.run_one_task(): + pass + + def run_one_task(self): + request = self.args.copy() + request["status"] = "queued" + data = RestItemList(TaskCatalogueEntry.collection).get(params=request) + + if not data: + LOG.info("No tasks found") + return False + + uuid = data[-1]["uuid"] + entry = TaskCatalogueEntry(key=uuid) + LOG.info(f"Processing task {uuid}: {entry}") + entry.to_actor().check() + + entry.take_ownership() + actor = entry.to_actor() + actor.run() + LOG.info(f"Task {uuid} completed.") + entry.unregister() + LOG.info(f"Task {uuid} deleted.") + return True + + +def actor_factory(**record): + LOG.info(f"Converting task {record} to actor") + record = record.copy() + action = record.pop("action").replace("-", "_").lower() + + ACTIONS = dict( + transfer_dataset=TransferDataset, + # delete_dataset=DeleteDataset, + ) + if action not in ACTIONS: + raise ValueError(f"Unknown action {action}") + + return ACTIONS[action](**record) + + +class TaskCatalogueEntry(CatalogueEntry): + collection = "queues" + main_key = "uuid" + + def to_actor(self): + return actor_factory(**self.record) + + @classmethod + def new(cls, *args, **kwargs): + if args: + return cls.new(**cls.list_to_dict(args)) + rest_collection = RestItemList(cls.collection) + actor_factory(status=None, **kwargs).check() + return rest_collection.post(kwargs) + + def set_status(self, status): + patch = [{"op": "add", "path": "/status", "value": status}] + self.rest_item.patch(patch) + + def unregister(self): + return self.rest_item.delete() + + def take_ownership(self): + trace = trace_info() + trace["timestamp"] = datetime.datetime.now().isoformat() + return self.rest_item.patch( + [ + {"op": "test", "path": "/status", "value": "queued"}, + {"op": "replace", "path": "/status", "value": "running"}, + {"op": "add", "path": "/worker", "value": trace}, + ] + ) + + def release_ownership(self): + self.rest_item.patch( + [ + {"op": "test", "path": "/status", "value": "running"}, + {"op": "replace", "path": "/status", "value": "queued"}, + {"op": "remove", "path": "/worker"}, + ] + ) + + def set_progress(self, progress): + assert isinstance(progress, int), progress + if not (0 <= progress <= 100): + raise ValueError("Progress must be between 0 and 100") + patch = [{"op": "add", "path": "/progress", "value": progress}] + self.rest_item.patch(patch)