Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 4, 2024
1 parent 7599842 commit 1dab082
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/anemoi/registry/commands/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def add_arguments(self, command_parser):
command_parser.add_argument("--target-dir", help="Target directory", default=".")
command_parser.add_argument("--destination", help="Platform destination (e.g. leonardo, lumi)")
command_parser.add_argument("--request", help="Filter tasks to process (key=value list)", nargs="*", default=[])
command_parser.add_argument("--threads", help="Number of threads to use", type=int, default=1)

def run(self, args):
kwargs = vars(args)
Expand Down
8 changes: 5 additions & 3 deletions src/anemoi/registry/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(
stop_if_finished=True,
target_dir=".",
auto_register=True,
threads=1,
request={},
):
"""Run a worker that will process tasks in the queue.
Expand All @@ -44,6 +45,7 @@ def __init__(
self.destination = destination
self.target_dir = target_dir
self.request = request
self.threads = threads

self.wait = wait
self.stop_if_finished = stop_if_finished
Expand Down Expand Up @@ -92,7 +94,7 @@ def process_entry(self, entry):

dataset_entry = DatasetCatalogueEntry(key=dataset)

LOG.info(f"Transferring {dataset} from {source} to {destination}")
LOG.info(f"Transferring {dataset} from '{source}' to '{destination}'")

def get_source_path():
e = dataset_entry.record
Expand Down Expand Up @@ -129,12 +131,12 @@ def get_source_path():

if target_path.startswith("s3://"):
# untested
download(source_path, target_path, resume=True)
download(source_path, target_path, resume=True, threads=self.threads)
return
else:
target_tmp_path = os.path.join(self.target_dir + "-downloading", basename)
os.makedirs(os.path.dirname(target_tmp_path), exist_ok=True)
download(source_path, target_tmp_path, resume=True)
download(source_path, target_tmp_path, resume=True, threads=self.threads)
os.rename(target_tmp_path, target_path)

if self.auto_register:
Expand Down

0 comments on commit 1dab082

Please sign in to comment.