From 63903efe5e46e1cf83ca9a5a77d08ca2facb64b9 Mon Sep 17 00:00:00 2001 From: jakubmonhart Date: Sat, 6 Jan 2024 15:46:45 +0100 Subject: [PATCH 1/2] Read correct attribute of message. --- shepherd/shepherd/shepherd.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/shepherd/shepherd/shepherd.py b/shepherd/shepherd/shepherd.py index 9f86409..130b3dc 100644 --- a/shepherd/shepherd/shepherd.py +++ b/shepherd/shepherd/shepherd.py @@ -300,12 +300,11 @@ async def _listen(self) -> None: "exception_traceback": message.exception_traceback }) await self._report_job_failed(job_id, error, sheep) - self._job_status.pop(job_id) - logging.info('Job `%s` from sheep `%s` failed (%s)', job_id, sheep_id, message.short_error) + logging.info('Job `%s` from sheep `%s` failed (%s)', job_id, sheep_id, message.message) - # notify about the finished job sheep.in_progress.remove(job_id) + # notify about the finished job async with self.job_done_condition: self.job_done_condition.notify_all() From 4877acf037adb1a9a6fd813700b3aa1c8a69b26e Mon Sep 17 00:00:00 2001 From: jakubmonhart Date: Mon, 8 Jan 2024 11:20:30 +0100 Subject: [PATCH 2/2] Notify about the finished job only once. --- shepherd/shepherd/shepherd.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/shepherd/shepherd/shepherd.py b/shepherd/shepherd/shepherd.py index 130b3dc..c08802e 100644 --- a/shepherd/shepherd/shepherd.py +++ b/shepherd/shepherd/shepherd.py @@ -293,6 +293,11 @@ async def _listen(self) -> None: status.finished_at = datetime.utcnow() await self._job_status_update_queue.enqueue_task(self._storage.set_job_status(job_id, status.copy())) logging.info('Job `%s` from sheep `%s` done', job_id, sheep_id) + + # notify about the finished job + async with self.job_done_condition: + self.job_done_condition.notify_all() + elif isinstance(message, ErrorMessage): error = ErrorModel({ "message": message.message, @@ -303,11 +308,7 @@ async def _listen(self) -> None: logging.info('Job `%s` from sheep `%s` failed (%s)', job_id, sheep_id, message.message) sheep.in_progress.remove(job_id) - - # notify about the finished job - async with self.job_done_condition: - self.job_done_condition.notify_all() - + def get_status(self) -> Generator[Tuple[str, SheepModel], None, None]: """ Get status information for all sheep