Skip to content

Commit

Permalink
implement also upload to s3
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jul 18, 2024
1 parent 31718ef commit 2e541e4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/anemoi/registry/commands/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def add_arguments(self, command_parser):
help="See if there are tasks for this worker and exit with 0 if there are task to do.",
action="store_true",
)
subparser.add_argument("--dry-run", help="Dry run, do not actually do anything", action="store_true")

def run(self, args):
kwargs = vars(args)
Expand Down
38 changes: 32 additions & 6 deletions src/anemoi/registry/workers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(
loop=False,
check_todo=False,
timeout=None,
dry_run=False,
):
"""Run a worker that will process tasks in the queue.
timeout: Kill itself after `timeout` seconds.
Expand All @@ -42,6 +43,7 @@ def __init__(
self.max_no_heartbeat = max_no_heartbeat
self.loop = loop
self.check_todo = check_todo
self.dry_run = dry_run

self.wait = wait
if timeout:
Expand Down Expand Up @@ -87,16 +89,16 @@ def process_one_task(self):
LOG.info(f"Processing task {uuid}: {task}")
self.parse_task(task) # for checking only

task.take_ownership()
self.take_ownership(task)
try:
self.process_task_with_heartbeat(task)
except Exception as e:
LOG.error(f"Error for task {task}: {e}")
LOG.exception("Exception occurred during task processing:", exc_info=e)
task.release_ownership()
self.release_ownership(task)
return
LOG.info(f"Task {uuid} completed.")
task.unregister()
self.unregister(task)
LOG.info(f"Task {uuid} deleted.")

def process_task_with_heartbeat(self, task):
Expand All @@ -106,7 +108,7 @@ def process_task_with_heartbeat(self, task):
def send_heartbeat():
while True:
try:
task.set_status("running")
self.set_status(task, "running")
except Exception:
return
for _ in range(self.heartbeat):
Expand Down Expand Up @@ -145,7 +147,7 @@ def choose_task(self):
for task in TaskCatalogueEntryList(status="queued", **self.filter_tasks):
LOG.info("Found task")
return task
LOG.info("No queued tasks found")
LOG.info(f"No queued tasks found with filter_tasks={self.filter_tasks}")

if self.max_no_heartbeat == 0:
return None
Expand All @@ -167,7 +169,31 @@ def choose_task(self):
LOG.warning(
f"Task {task.key} has been running for more than {self.max_no_heartbeat} seconds, freeing it."
)
task.release_ownership()
self.release_ownership(task)

def take_ownership(self, task):
if self.dry_run:
LOG.warning(f"Would take ownership of task {task.key} but this is only a dry run.")
return
task.take_ownership()

def release_ownership(self, task):
if self.dry_run:
LOG.warning(f"Would release ownership of task {task.key} but this is only a dry run.")
return
task.release_ownership()

def unregister(self, task):
if self.dry_run:
LOG.warning(f"Would unregister task {task.key} but this is only a dry run.")
return
task.unregister()

def set_status(self, task, status):
if self.dry_run:
LOG.warning(f"Would set status of task {task.key} to {status} but this is only a dry run.")
return
task.set_status(status)

def worker_process_task(self, task):
raise NotImplementedError("Subclasses must implement this method.")
Expand Down
3 changes: 3 additions & 0 deletions src/anemoi/registry/workers/delete_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def worker_process_task(self, task):
if platform not in locations:
LOG.warning(f"Dataset {dataset} has no locations on '{platform}'. Ignoring delete request.")
return
if self.dry_run:
LOG.warning(f"Would delete {locations[platform]['path']} from '{platform}' but this is only a dry run.")
return

path = locations[platform]["path"]
LOG.warning(f"Deleting {path} from '{platform}'")
Expand Down
14 changes: 10 additions & 4 deletions src/anemoi/registry/workers/transfer_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(
if not self.destination:
raise ValueError("No destination platform specified")

if not os.path.exists(self.target_dir):
if not os.path.exists(self.target_dir) and not self.target_dir.startswith("s3://"):
raise ValueError(f"Target directory {self.target_dir} must already exist")

def worker_process_task(self, task):
Expand Down Expand Up @@ -126,20 +126,26 @@ def get_source_path():
return

from anemoi.utils.s3 import download
from anemoi.utils.s3 import upload

LOG.info(f"Source path: {source_path}")
LOG.info(f"Target path: {target_path}")

if source_path.startswith("s3://"):
source_path = source_path + "/" if not source_path.endswith("/") else source_path

if self.dry_run:
LOG.warning(f"Would tranfer {source_path} to {target_path} but this is only a dry run.")
return

progress = Progress(task, frequency=10)

if target_path.startswith("s3://"):
LOG.warning("Uploading to S3 is experimental and has not been tested yet.")
download(source_path, target_path, resume=True, threads=self.threads, progress=progress)
return
# upload to S3 uses function upload()
LOG.info(f"Upload('{source_path}','{target_path}', resume=True, threads={self.threads})")
upload(source_path, target_path, resume=True, threads=self.threads, progress=progress)
else:
# download to local uses function download() and a temporary path
target_tmp_path = os.path.join(self.target_dir + "-downloading", basename)
os.makedirs(os.path.dirname(target_tmp_path), exist_ok=True)
download(source_path, target_tmp_path, resume=True, threads=self.threads, progress=progress)
Expand Down

0 comments on commit 2e541e4

Please sign in to comment.