diff --git a/src/anemoi/registry/commands/tasks.py b/src/anemoi/registry/commands/tasks.py
index e1558d7..6de4bc0 100644
--- a/src/anemoi/registry/commands/tasks.py
+++ b/src/anemoi/registry/commands/tasks.py
@@ -15,6 +15,7 @@ import logging
 
 from anemoi.registry.commands.base import BaseCommand
+from anemoi.registry.entry import CatalogueEntryNotFound
 from anemoi.registry.tasks import TaskCatalogueEntry
 from anemoi.registry.tasks import TaskCatalogueEntryList
 from anemoi.registry.utils import list_to_dict
 
@@ -45,22 +46,21 @@ def add_arguments(self, command_parser):
         group.add_argument(
             "--take-one", help="Take ownership of the oldest entry with status=queued", nargs="*", metavar="K=V"
         )
-        group.add_argument("--list", help="List some queue entries", nargs="*", metavar="K=V")
-        group.add_argument("--worker", help="Run a worker, taking ownership of the oldest task, running it.", nargs="*")
+        group.add_argument("--list", help="List tasks", nargs="*", metavar="K=V")
+        group.add_argument("--delete-many", help="Batch remove multiple tasks", nargs="*", metavar="K=V")
 
         command_parser.add_argument(
             "--sort",
-            help="Sort by date. Use with --list, --worker, --take-one",
+            help="Sort by date. Use with --list, --take-one",
             choices=["created", "updated"],
             default="updated",
         )
         command_parser.add_argument("-l", "--long", help="Details, use with --list", action="store_true")
+        command_parser.add_argument("-y", "--yes", help="Assume yes", action="store_true")
 
     def run(self, args):
-        if args.TASK is not None and (
-            args.new is not None or args.take_one is not None or args.list is not None or args.worker is not None
-        ):
-            raise ValueError("Cannot use positional argument TASK with --new, --take-one or --list or --worker")
+        if args.TASK is not None and (args.new is not None or args.take_one is not None or args.list is not None):
+            raise ValueError("Cannot use positional argument TASK with --new, --take-one or --list")
 
         if args.TASK:
             return self.run_with_uuid(args.TASK, args)
@@ -70,8 +70,9 @@ def run(self, args):
             self.run_take_one(args)
         if args.list is not None:
             self.run_list(args)
-        if args.worker is not None:
-            self.run_worker(args)
+        if args.delete_many is not None:
+            assert args.TASK is None
+            self.run_delete_many(args)
 
     def run_with_uuid(self, uuid, args):
 
@@ -93,6 +94,33 @@ def run_list(self, args):
         cat = TaskCatalogueEntryList(*args.list, sort=args.sort)
         print(cat.to_str(args.long))
 
+    def run_delete_many(self, args):
+        """Delete every task matching the K=V filters passed to --delete-many.
+
+        Prompts for confirmation unless --yes is given.
+        """
+        cat = TaskCatalogueEntryList(*args.delete_many, sort=args.sort)
+        # Snapshot the matches once (only len() and indexing are relied upon) so the
+        # loop is bounded and the final count reflects what was actually deleted.
+        entries = [cat[i] for i in range(len(cat))]
+        if not entries:
+            LOG.info("No tasks found")
+            return
+        if not args.yes:
+            print(f"Do you really want to delete these {len(entries)} entries? (y/n)", end=" ")
+            if input("").lower() != "y":
+                return
+        deleted = 0
+        for entry in entries:
+            try:
+                entry.unregister()
+            except CatalogueEntryNotFound:
+                LOG.warning(f"Task {entry.key} not found.")
+                continue
+            deleted += 1
+            LOG.info(f"Task {entry.key} deleted.")
+        LOG.info(f"{deleted} tasks deleted.")
+
     def run_take_one(self, args):
         cat = TaskCatalogueEntryList(*args.take_one, status="queued", sort=args.sort)
         uuid = cat.take_last()
diff --git a/src/anemoi/registry/workers.py b/src/anemoi/registry/workers.py
index faf064d..4552257 100644
--- a/src/anemoi/registry/workers.py
+++ b/src/anemoi/registry/workers.py
@@ -40,6 +40,9 @@ def __init__(self, action, destination, timeout=None, wait=1, stop_if_finished=T
         if timeout:
             signal.alarm(timeout)
 
+        if not os.path.exists(target_dir):
+            raise ValueError(f"Target directory {target_dir} must already exist")
+
     def run(self):
         while True:
             res = self.process_one_task()
@@ -101,13 +104,25 @@ def get_source_path():
 
         source_path = get_source_path()
         basename = os.path.basename(source_path)
-        target_path = f"{self.target_dir}/{basename}"
+        target_path = os.path.join(self.target_dir, basename)
+        if os.path.exists(target_path):
+            LOG.error(f"Target path {target_path} already exists, skipping.")
+            return
 
         from anemoi.utils.s3 import download
 
         LOG.info(f"Source path: {source_path}")
         LOG.info(f"Target path: {target_path}")
-        download(source_path, target_path, resume=True)
+
+        if target_path.startswith("s3://"):
+            # untested
+            download(source_path, target_path, resume=True)
+            return
+        else:
+            target_tmp_path = os.path.join(self.target_dir + "-downloading", basename)
+            os.makedirs(os.path.dirname(target_tmp_path), exist_ok=True)
+            download(source_path, target_tmp_path, resume=True)
+            os.rename(target_tmp_path, target_path)
 
     @classmethod
     def parse_entry(cls, entry):