Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 3, 2024
1 parent 8ee92a6 commit 1cf9f18
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 98 deletions.
2 changes: 0 additions & 2 deletions src/anemoi/registry/commands/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def add_arguments(self, command_parser):
action="store_true",
)
# command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true")
command_parser.add_argument("--json", help="Output json record", action="store_true")

command_parser.add_argument("--set-status", help="Set the status to the dataset")
command_parser.add_argument("--add-location", nargs="+", help="Add a location to the dataset")
Expand All @@ -52,7 +51,6 @@ def _run(self, entry, args):
self.process_task(entry, args, "add_location")
self.process_task(entry, args, "add_recipe")
self.process_task(entry, args, "set_status")
self.process_task(entry, args, "json")


command = Datasets
2 changes: 0 additions & 2 deletions src/anemoi/registry/commands/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def add_arguments(self, command_parser):
help="Remove from catalogue (without deleting all)",
action="store_true",
)
command_parser.add_argument("--json", help="Output json record", action="store_true")
# command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true")

command_parser.add_argument("--add-weights", nargs="+", help="Add weights to the experiment")
Expand All @@ -57,7 +56,6 @@ def _run(self, entry, args):
self.process_task(entry, args, "register", overwrite=args.overwrite)
self.process_task(entry, args, "add_weights")
self.process_task(entry, args, "add_plots")
self.process_task(entry, args, "json")


command = Experiments
57 changes: 28 additions & 29 deletions src/anemoi/registry/commands/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

from anemoi.registry.commands.base import BaseCommand
from anemoi.registry.entry import list_to_dict
from anemoi.registry.queue_manager import TaskCatalogueEntry
from anemoi.registry.rest import RestItemList
from anemoi.registry.tasks import TaskCatalogueEntry
from anemoi.registry.tasks import Worker

LOG = logging.getLogger(__name__)

Expand All @@ -35,20 +36,27 @@ def task_list_to_str(data, long):
updated = datetime.datetime.fromisoformat(v.pop("updated"))

uuid = v.pop("uuid")
content = " ".join(f"{k}={v}" for k, v in v.items())
status = v.pop("status")
progress = v.pop("progress", "")
action = v.pop("action", "")
if not long:
content = content[:20] + "..."
if "worker" in v:
v["worker"] = v["worker"].get("host")
content = " ".join(f"{k}={v}" for k, v in v.items())
rows.append(
[
action,
when(created),
when(updated),
v.pop("status"),
v.pop("progress", ""),
status,
progress,
content,
uuid,
]
)
return table(rows, ["Created", "Updated", "Status", "%", "Details", "UUID"], ["<", "<", "<", "<", "<", "<"])
return table(
rows, ["Action", "Created", "Updated", "Status", "%", "Details", "UUID"], ["<", "<", "<", "<", "<", "<", "<"]
)


class Tasks(BaseCommand):
Expand All @@ -60,12 +68,12 @@ class Tasks(BaseCommand):

def add_arguments(self, command_parser):
command_parser.add_argument("TASK", help="The uuid of the task", nargs="?")
command_parser.add_argument("--remove", help="remove from queue")
command_parser.add_argument("--set-status", help="--set-status <status>")
command_parser.add_argument("--set-progress", help="--set-progress <progress>", type=int)
command_parser.add_argument("--set-status", help="Set status of the given task", metavar="STATUS")
command_parser.add_argument(
"--set-progress", help="Set progress of the given task (0 to 100 percents)", type=int, metavar="N"
)
command_parser.add_argument("--own", help="Take ownership of a task", action="store_true")
command_parser.add_argument("--disown", help="Release a task and requeue it", action="store_true")
command_parser.add_argument("--own", help="Release a task and requeue it", action="store_true")
command_parser.add_argument("--json", help="Output json record", action="store_true")

group = command_parser.add_mutually_exclusive_group()
group.add_argument("--new", help="Add a new queue entry", nargs="*", metavar="K=V")
Expand All @@ -87,6 +95,11 @@ def add_arguments(self, command_parser):
)

def run(self, args):
if args.TASK is not None and (
args.new is not None or args.take_one is not None or args.list is not None or args.worker is not None
):
raise ValueError("Cannot use positional argument TASK with --new, --take-one or --list or --worker")

if args.TASK:
return self.run_with_uuid(args.TASK, args)
if args.new is not None:
Expand All @@ -103,30 +116,16 @@ def run_with_uuid(self, uuid, args):
uuid = args.TASK
entry = self.entry_class(key=uuid)

self.process_task(entry, args, "remove")
self.process_task(entry, args, "disown", "release_ownership")
self.process_task(entry, args, "own", "take_ownership")
self.process_task(entry, args, "set_status")
self.process_task(entry, args, "set_progress")
self.process_task(entry, args, "json")

def run_worker(self, args):
if args.timeout:
import signal

signal.alarm(args.timeout)

data = self._retrieve_task_list(args.worker, args.sort)
if data:
self._process_one_task(data[-1]["uuid"])
else:
LOG.info("No tasks found")

def _process_one_task(self, uuid):
entry = self.entry_class(key=uuid)
LOG.info(f"Processing task {uuid}: {entry}")
res = entry.take_ownership()
print(res)
Worker(
*args.worker,
timeout=args.timeout,
).run()

def run_new(self, args):
res = TaskCatalogueEntry.new(*args.new)
Expand Down
2 changes: 0 additions & 2 deletions src/anemoi/registry/commands/weights.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def add_arguments(self, command_parser):
action="store_true",
)
# command_parser.add_argument("--delete", help=f"Delete the {self.kind} from the catalogue and from any other location", action="store_true")
command_parser.add_argument("--json", help="Output json record", action="store_true")

command_parser.add_argument("--add-location", nargs="+", help="Add a location to the weights")

Expand All @@ -54,7 +53,6 @@ def _run(self, entry, args):
self.process_task(entry, args, "unregister")
self.process_task(entry, args, "register", overwrite=args.overwrite)
self.process_task(entry, args, "add_location")
self.process_task(entry, args, "json")


command = Weights
1 change: 1 addition & 0 deletions src/anemoi/registry/entry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class CatalogueEntryNotFound(Exception):
def list_to_dict(lst):
assert isinstance(lst, (tuple, list)), f"lst must be a list. Got {lst} of type {type(lst)}."
for x in lst:
assert isinstance(x, str), f"lst must be a list of strings. Got {x} of type {type(x)}."
if "=" not in x:
raise ValueError(f"Invalid key-value pairs format '{x}', use 'key1=value1 key2=value2' list.")
return {x.split("=")[0]: x.split("=")[1] for x in lst}
Expand Down
63 changes: 0 additions & 63 deletions src/anemoi/registry/queue_manager.py

This file was deleted.

162 changes: 162 additions & 0 deletions src/anemoi/registry/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# (C) Copyright 2023 European Centre for Medium-Range Weather Forecasts.
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

import datetime
import logging
import os

from anemoi.registry import config
from anemoi.registry.entry import CatalogueEntry
from anemoi.registry.rest import RestItemList
from anemoi.registry.rest import trace_info

# from anemoi.utils.provenance import trace_info

LOG = logging.getLogger(__name__)


class Actor:
def __init__(self, **kwargs):
for k, v in kwargs.items():
if k not in ["action", "status", "created", "updated", "uuid"]:
LOG.error(f"Unknown attribute {k}={v} in actor {kwargs}")

def check(self):
LOG.info(f"Checking {self}")


class TransferDataset(Actor):
def __init__(self, *, status, source, destination, dataset, target, **kwargs):
super().__init__(**kwargs)
self.source = source
self.destination = destination
self.target = target
self.dataset = dataset

def __repr__(self):
return f"{self.__class__.__name__}(source={self.source}, target={self.target}, dataset={self.dataset})"

def run(self):
LOG.info(f"Transferring {self.dataset} from {self.source} to {self.target}")
LOG.info(f"anemoi-datasets copy {self.source}/{self.dataset} {self.target}/{self.dataset}")
c = config()
print(c)

def check(self):
super().check()
if "/" in self.destination:
raise ValueError(f"Destination {self.destination} must not contain '/', this is a platform name")

if "/" in self.source:
raise ValueError(f"Source {self.source} must not contain '/', this is a platform name")

if not os.path.exists(self.target) or not os.path.isdir(self.target):
raise ValueError(f"Target {self.target} must exist and be a directory")

if "." in self.dataset:
raise ValueError(f"The dataset {self.dataset} must not contain a '.', this is the name of the dataset.")


class Worker:
def __init__(self, *args, timeout=None):
if timeout:
import signal

signal.alarm(args.timeout)

self.args = TaskCatalogueEntry.list_to_dict(args)

def run(self):
while self.run_one_task():
pass

def run_one_task(self):
request = self.args.copy()
request["status"] = "queued"
data = RestItemList(TaskCatalogueEntry.collection).get(params=request)

if not data:
LOG.info("No tasks found")
return False

uuid = data[-1]["uuid"]
entry = TaskCatalogueEntry(key=uuid)
LOG.info(f"Processing task {uuid}: {entry}")
entry.to_actor().check()

entry.take_ownership()
actor = entry.to_actor()
actor.run()
LOG.info(f"Task {uuid} completed.")
entry.unregister()
LOG.info(f"Task {uuid} deleted.")
return True


def actor_factory(**record):
LOG.info(f"Converting task {record} to actor")
record = record.copy()
action = record.pop("action").replace("-", "_").lower()

ACTIONS = dict(
transfer_dataset=TransferDataset,
# delete_dataset=DeleteDataset,
)
if action not in ACTIONS:
raise ValueError(f"Unknown action {action}")

return ACTIONS[action](**record)


class TaskCatalogueEntry(CatalogueEntry):
collection = "queues"
main_key = "uuid"

def to_actor(self):
return actor_factory(**self.record)

@classmethod
def new(cls, *args, **kwargs):
if args:
return cls.new(**cls.list_to_dict(args))
rest_collection = RestItemList(cls.collection)
actor_factory(status=None, **kwargs).check()
return rest_collection.post(kwargs)

def set_status(self, status):
patch = [{"op": "add", "path": "/status", "value": status}]
self.rest_item.patch(patch)

def unregister(self):
return self.rest_item.delete()

def take_ownership(self):
trace = trace_info()
trace["timestamp"] = datetime.datetime.now().isoformat()
return self.rest_item.patch(
[
{"op": "test", "path": "/status", "value": "queued"},
{"op": "replace", "path": "/status", "value": "running"},
{"op": "add", "path": "/worker", "value": trace},
]
)

def release_ownership(self):
self.rest_item.patch(
[
{"op": "test", "path": "/status", "value": "running"},
{"op": "replace", "path": "/status", "value": "queued"},
{"op": "remove", "path": "/worker"},
]
)

def set_progress(self, progress):
assert isinstance(progress, int), progress
if not (0 <= progress <= 100):
raise ValueError("Progress must be between 0 and 100")
patch = [{"op": "add", "path": "/progress", "value": progress}]
self.rest_item.patch(patch)

0 comments on commit 1cf9f18

Please sign in to comment.