diff --git a/src/anemoi/registry/commands/worker.py b/src/anemoi/registry/commands/worker.py index 1be55a2..7333d0b 100644 --- a/src/anemoi/registry/commands/worker.py +++ b/src/anemoi/registry/commands/worker.py @@ -66,6 +66,7 @@ def add_arguments(self, command_parser): help="See if there are tasks for this worker and exit with 0 if there are task to do.", action="store_true", ) + subparser.add_argument("--dry-run", help="Dry run, do not actually do anything", action="store_true") def run(self, args): kwargs = vars(args) diff --git a/src/anemoi/registry/workers/__init__.py b/src/anemoi/registry/workers/__init__.py index bff70e9..4d9364b 100644 --- a/src/anemoi/registry/workers/__init__.py +++ b/src/anemoi/registry/workers/__init__.py @@ -33,6 +33,7 @@ def __init__( loop=False, check_todo=False, timeout=None, + dry_run=False, ): """Run a worker that will process tasks in the queue. timeout: Kill itself after `timeout` seconds. @@ -42,6 +43,7 @@ def __init__( self.max_no_heartbeat = max_no_heartbeat self.loop = loop self.check_todo = check_todo + self.dry_run = dry_run self.wait = wait if timeout: @@ -87,16 +89,16 @@ def process_one_task(self): LOG.info(f"Processing task {uuid}: {task}") self.parse_task(task) # for checking only - task.take_ownership() + self.take_ownership(task) try: self.process_task_with_heartbeat(task) except Exception as e: LOG.error(f"Error for task {task}: {e}") LOG.exception("Exception occurred during task processing:", exc_info=e) - task.release_ownership() + self.release_ownership(task) return LOG.info(f"Task {uuid} completed.") - task.unregister() + self.unregister(task) LOG.info(f"Task {uuid} deleted.") def process_task_with_heartbeat(self, task): @@ -106,7 +108,7 @@ def process_task_with_heartbeat(self, task): def send_heartbeat(): while True: try: - task.set_status("running") + self.set_status(task, "running") except Exception: return for _ in range(self.heartbeat): @@ -145,7 +147,7 @@ def choose_task(self): for task in TaskCatalogueEntryList(status="queued", **self.filter_tasks): LOG.info("Found task") return task - LOG.info("No queued tasks found") + LOG.info(f"No queued tasks found with filter_tasks={self.filter_tasks}") if self.max_no_heartbeat == 0: return None @@ -167,7 +169,31 @@ def choose_task(self): LOG.warning( f"Task {task.key} has been running for more than {self.max_no_heartbeat} seconds, freeing it." ) - task.release_ownership() + self.release_ownership(task) + + def take_ownership(self, task): + if self.dry_run: + LOG.warning(f"Would take ownership of task {task.key} but this is only a dry run.") + return + task.take_ownership() + + def release_ownership(self, task): + if self.dry_run: + LOG.warning(f"Would release ownership of task {task.key} but this is only a dry run.") + return + task.release_ownership() + + def unregister(self, task): + if self.dry_run: + LOG.warning(f"Would unregister task {task.key} but this is only a dry run.") + return + task.unregister() + + def set_status(self, task, status): + if self.dry_run: + LOG.warning(f"Would set status of task {task.key} to {status} but this is only a dry run.") + return + task.set_status(status) def worker_process_task(self, task): raise NotImplementedError("Subclasses must implement this method.") diff --git a/src/anemoi/registry/workers/delete_dataset.py b/src/anemoi/registry/workers/delete_dataset.py index 6bfc820..43d96f1 100644 --- a/src/anemoi/registry/workers/delete_dataset.py +++ b/src/anemoi/registry/workers/delete_dataset.py @@ -42,6 +42,9 @@ def worker_process_task(self, task): if platform not in locations: LOG.warning(f"Dataset {dataset} has no locations on '{platform}'. Ignoring delete request.") return + if self.dry_run: + LOG.warning(f"Would delete {locations[platform]['path']} from '{platform}' but this is only a dry run.") + return path = locations[platform]["path"] LOG.warning(f"Deleting {path} from '{platform}'") diff --git a/src/anemoi/registry/workers/transfer_dataset.py b/src/anemoi/registry/workers/transfer_dataset.py index ac41f18..16ae156 100644 --- a/src/anemoi/registry/workers/transfer_dataset.py +++ b/src/anemoi/registry/workers/transfer_dataset.py @@ -91,7 +91,7 @@ def __init__( if not self.destination: raise ValueError("No destination platform specified") - if not os.path.exists(self.target_dir): + if not os.path.exists(self.target_dir) and not self.target_dir.startswith("s3://"): raise ValueError(f"Target directory {self.target_dir} must already exist") def worker_process_task(self, task): @@ -126,6 +126,7 @@ def get_source_path(): return from anemoi.utils.s3 import download + from anemoi.utils.s3 import upload LOG.info(f"Source path: {source_path}") LOG.info(f"Target path: {target_path}") @@ -133,13 +134,18 @@ def get_source_path(): if source_path.startswith("s3://"): source_path = source_path + "/" if not source_path.endswith("/") else source_path + if self.dry_run: + LOG.warning(f"Would tranfer {source_path} to {target_path} but this is only a dry run.") + return + progress = Progress(task, frequency=10) if target_path.startswith("s3://"): - LOG.warning("Uploading to S3 is experimental and has not been tested yet.") - download(source_path, target_path, resume=True, threads=self.threads, progress=progress) - return + # upload to S3 uses function upload() + LOG.info(f"Upload('{source_path}','{target_path}', resume=True, threads={self.threads})") + upload(source_path, target_path, resume=True, threads=self.threads, progress=progress) else: + # download to local uses function download() and a temporary path target_tmp_path = os.path.join(self.target_dir + "-downloading", basename) os.makedirs(os.path.dirname(target_tmp_path), exist_ok=True) download(source_path, target_tmp_path, resume=True, threads=self.threads, progress=progress)