From 08cf6c5cb5b226b745ba9eb0ca431900a9897335 Mon Sep 17 00:00:00 2001 From: Florian Pinault Date: Thu, 18 Jul 2024 13:21:57 +0200 Subject: [PATCH] more robust --- src/anemoi/registry/workers/__init__.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/anemoi/registry/workers/__init__.py b/src/anemoi/registry/workers/__init__.py index b39e50f..bff70e9 100644 --- a/src/anemoi/registry/workers/__init__.py +++ b/src/anemoi/registry/workers/__init__.py @@ -81,18 +81,23 @@ def run(self): def process_one_task(self): task = self.choose_task() if not task: - return False + return uuid = task.key LOG.info(f"Processing task {uuid}: {task}") self.parse_task(task) # for checking only task.take_ownership() - self.process_task_with_heartbeat(task) + try: + self.process_task_with_heartbeat(task) + except Exception as e: + LOG.error(f"Error for task {task}: {e}") + LOG.exception("Exception occurred during task processing:", exc_info=e) + task.release_ownership() + return LOG.info(f"Task {uuid} completed.") task.unregister() LOG.info(f"Task {uuid} deleted.") - return True def process_task_with_heartbeat(self, task): STOP = [] @@ -149,7 +154,7 @@ def choose_task(self): if not cat: LOG.info("No queued tasks found") else: - LOG.info(cat.to_str(long=True)) + LOG.info(f"Tasks list \n{cat.to_str(long=True)}") # if a task is running, check if it has been running for too long, and free it for task in cat: