diff --git a/src/anemoi/registry/workers/__init__.py b/src/anemoi/registry/workers/__init__.py index b39e50f..bff70e9 100644 --- a/src/anemoi/registry/workers/__init__.py +++ b/src/anemoi/registry/workers/__init__.py @@ -81,18 +81,23 @@ def run(self): def process_one_task(self): task = self.choose_task() if not task: - return False + return uuid = task.key LOG.info(f"Processing task {uuid}: {task}") self.parse_task(task) # for checking only task.take_ownership() - self.process_task_with_heartbeat(task) + try: + self.process_task_with_heartbeat(task) + except Exception as e: + LOG.error(f"Error for task {task}: {e}") + LOG.exception("Exception occurred during task processing:", exc_info=e) + task.release_ownership() + return LOG.info(f"Task {uuid} completed.") task.unregister() LOG.info(f"Task {uuid} deleted.") - return True def process_task_with_heartbeat(self, task): STOP = [] @@ -149,7 +154,7 @@ def choose_task(self): if not cat: LOG.info("No queued tasks found") else: - LOG.info(cat.to_str(long=True)) + LOG.info(f"Tasks list \n{cat.to_str(long=True)}") # if a task is running, check if it has been running for too long, and free it for task in cat: