From 08cbe4d9938169c05aea0d322b41fd8077b2dba5 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Fri, 5 Jul 2024 15:31:16 +0200 Subject: [PATCH] up --- src/anemoi/registry/commands/worker.py | 8 ++++- src/anemoi/registry/workers.py | 50 +++++++++++++++++--------- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/src/anemoi/registry/commands/worker.py b/src/anemoi/registry/commands/worker.py index acd341b..904039e 100644 --- a/src/anemoi/registry/commands/worker.py +++ b/src/anemoi/registry/commands/worker.py @@ -47,7 +47,13 @@ def add_arguments(self, command_parser): command_parser.add_argument("--request", help="Filter tasks to process (key=value list)", nargs="*", default=[]) command_parser.add_argument("--threads", help="Number of threads to use", type=int, default=1) command_parser.add_argument("--heartbeat", help="Heartbeat interval", type=int, default=60) - command_parser.add_argument("--max-no-heartbeat", help="Max interval without heartbeat", type=int, default=3600) + command_parser.add_argument( + "--max-no-heartbeat", + help="Max interval without heartbeat before considering task needs to be freed.", + type=int, + default=0, + ) + command_parser.add_argument("--loop", help="Run in a loop", action="store_true") def run(self, args): kwargs = vars(args) diff --git a/src/anemoi/registry/workers.py b/src/anemoi/registry/workers.py index 41e85aa..1499594 100644 --- a/src/anemoi/registry/workers.py +++ b/src/anemoi/registry/workers.py @@ -34,7 +34,8 @@ def __init__( auto_register=True, threads=1, heartbeat=60, - max_no_heartbeat=3600, + max_no_heartbeat=0, + loop=False, request={}, ): """Run a worker that will process tasks in the queue. @@ -54,6 +55,7 @@ def __init__( self.threads = threads self.heartbeat = heartbeat self.max_no_heartbeat = max_no_heartbeat + self.loop = loop self.wait = wait self.stop_if_finished = stop_if_finished @@ -65,16 +67,18 @@ def __init__( raise ValueError(f"Target directory {target_dir} must already exist") def run(self): + if self.loop: + while True: + res = self.process_one_task() - while True: - res = self.process_one_task() - - if self.stop_if_finished and res is None: - LOG.info("All tasks have been processed, stopping.") - return + if self.stop_if_finished and res is None: + LOG.info("All tasks have been processed, stopping.") + return - LOG.info(f"Waiting {self.wait} seconds before checking again.") - time.sleep(self.wait) + LOG.info(f"Waiting {self.wait} seconds before checking again.") + time.sleep(self.wait) + else: + self.process_one_task() def choose_task(self): request = self.request.copy() @@ -86,6 +90,9 @@ def choose_task(self): return entry # else if a task is running, check if it has been running for too long, and free it + if self.max_no_heartbeat == 0: + return None + cat = TaskCatalogueEntryList(status="running", **request) if not cat: LOG.info("No queued tasks found") @@ -110,17 +117,14 @@ def process_one_task(self): self.parse_entry(entry) # for checking only entry.take_ownership() - self.process_entry(entry) + self.process_entry_with_heartbeat(entry) LOG.info(f"Task {uuid} completed.") entry.unregister() LOG.info(f"Task {uuid} deleted.") return True - def process_entry(self, entry): - - destination, source, dataset = self.parse_entry(entry) - - dataset_entry = DatasetCatalogueEntry(key=dataset) + def process_entry_with_heartbeat(self, entry): + STOP = [] # create another thread to send heartbeat def send_heartbeat(): @@ -129,11 +133,25 @@ def send_heartbeat(): entry.set_status("running") except Exception: return - time.sleep(self.heartbeat) + for _ in range(self.heartbeat): + time.sleep(1) + if len(STOP) > 0: + STOP.pop() + return thread = threading.Thread(target=send_heartbeat) thread.start() + try: + self.process_entry(entry) + finally: + STOP.append(1) # stop the heartbeat thread + thread.join() + + def process_entry(self, entry): + destination, source, dataset = self.parse_entry(entry) + dataset_entry = DatasetCatalogueEntry(key=dataset) + LOG.info(f"Transferring {dataset} from '{source}' to '{destination}'") def get_source_path():