Skip to content

Commit

Permalink
use tmp to download
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 4, 2024
1 parent f630732 commit 1d563e5
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
37 changes: 28 additions & 9 deletions src/anemoi/registry/commands/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging

from anemoi.registry.commands.base import BaseCommand
from anemoi.registry.entry import CatalogueEntryNotFound
from anemoi.registry.tasks import TaskCatalogueEntry
from anemoi.registry.tasks import TaskCatalogueEntryList
from anemoi.registry.utils import list_to_dict
Expand Down Expand Up @@ -45,22 +46,21 @@ def add_arguments(self, command_parser):
group.add_argument(
"--take-one", help="Take ownership of the oldest entry with status=queued", nargs="*", metavar="K=V"
)
group.add_argument("--list", help="List some queue entries", nargs="*", metavar="K=V")
group.add_argument("--worker", help="Run a worker, taking ownership of the oldest task, running it.", nargs="*")
group.add_argument("--list", help="List tasks", nargs="*", metavar="K=V")
group.add_argument("--delete-many", help="Batch remove multiple tasks", nargs="*", metavar="K=V")

command_parser.add_argument(
"--sort",
help="Sort by date. Use with --list, --worker, --take-one",
help="Sort by date. Use with --list, --take-one",
choices=["created", "updated"],
default="updated",
)
command_parser.add_argument("-l", "--long", help="Details, use with --list", action="store_true")
command_parser.add_argument("-y", "--yes", help="Assume yes", action="store_true")

def run(self, args):
if args.TASK is not None and (
args.new is not None or args.take_one is not None or args.list is not None or args.worker is not None
):
raise ValueError("Cannot use positional argument TASK with --new, --take-one or --list or --worker")
if args.TASK is not None and (args.new is not None or args.take_one is not None or args.list is not None):
raise ValueError("Cannot use positional argument TASK with --new, --take-one or --list")

if args.TASK:
return self.run_with_uuid(args.TASK, args)
Expand All @@ -70,8 +70,9 @@ def run(self, args):
self.run_take_one(args)
if args.list is not None:
self.run_list(args)
if args.worker is not None:
self.run_worker(args)
if args.delete_many is not None:
assert args.TASK is None
self.run_delete_many(args)

def run_with_uuid(self, uuid, args):

Expand All @@ -93,6 +94,24 @@ def run_list(self, args):
cat = TaskCatalogueEntryList(*args.list, sort=args.sort)
print(cat.to_str(args.long))

def run_delete_many(self, args):
cat = TaskCatalogueEntryList(*args.delete_many, sort=args.sort)
if not cat:
LOG.info("No tasks found")
return
if not args.yes:
print(f"Do you really want to delete these {len(cat)} entries? (y/n)", end=" ")
if input("").lower() != "y":
return
while cat:
try:
entry = cat[0]
entry.unregister()
LOG.info(f"Task {entry.key} deleted.")
except CatalogueEntryNotFound:
LOG.warning(f"Task {entry.key} not found.")
LOG.info(f"{len(cat)} tasks deleted.")

def run_take_one(self, args):
cat = TaskCatalogueEntryList(*args.take_one, status="queued", sort=args.sort)
uuid = cat.take_last()
Expand Down
19 changes: 17 additions & 2 deletions src/anemoi/registry/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def __init__(self, action, destination, timeout=None, wait=1, stop_if_finished=T
if timeout:
signal.alarm(timeout)

if not os.path.exists(target_dir):
raise ValueError(f"Target directory {target_dir} must already exist")

def run(self):
while True:
res = self.process_one_task()
Expand Down Expand Up @@ -101,13 +104,25 @@ def get_source_path():

source_path = get_source_path()
basename = os.path.basename(source_path)
target_path = f"{self.target_dir}/{basename}"
target_path = os.path.join(self.target_dir, basename)
if os.path.exists(target_path):
LOG.error(f"Target path {target_path} already exists, skipping.")
return

from anemoi.utils.s3 import download

LOG.info(f"Source path: {source_path}")
LOG.info(f"Target path: {target_path}")
download(source_path, target_path, resume=True)

if target_path.startswith("s3://"):
# untested
download(source_path, target_path, resume=True)
return
else:
target_tmp_path = os.path.join(self.target_dir + "-downloading", basename)
os.makedirs(os.path.dirname(target_tmp_path), exist_ok=True)
download(source_path, target_tmp_path, resume=True)
os.rename(target_tmp_path, target_path)

@classmethod
def parse_entry(cls, entry):
Expand Down

0 comments on commit 1d563e5

Please sign in to comment.