diff --git a/arq_admin/queue.py b/arq_admin/queue.py index db1c92e..cf38c11 100644 --- a/arq_admin/queue.py +++ b/arq_admin/queue.py @@ -1,6 +1,6 @@ import asyncio from dataclasses import dataclass -from typing import List, NamedTuple, Optional, Set +from typing import List, Optional, Set from arq import ArqRedis from arq.connections import RedisSettings @@ -13,15 +13,18 @@ from arq_admin.redis import get_redis -class QueueStats(NamedTuple): +@dataclass +class QueueStats: name: str host: str port: int database: int - queued_jobs: int - running_jobs: int - deferred_jobs: int + queued_jobs: Optional[int] = None + running_jobs: Optional[int] = None + deferred_jobs: Optional[int] = None + + error: Optional[str] = None @dataclass @@ -50,20 +53,24 @@ async def get_jobs(self, status: Optional[JobStatus] = None) -> List[JobInfo]: return jobs async def get_stats(self) -> QueueStats: - async with get_redis(self.redis_settings) as redis: - job_ids = await self._get_job_ids(redis) - - statuses = await asyncio.gather(*[self._get_job_status(job_id, redis) for job_id in job_ids]) - - return QueueStats( + result = QueueStats( name=self.name, host=str(self.redis_settings.host), port=self.redis_settings.port, database=self.redis_settings.database, - queued_jobs=len([status for status in statuses if status == JobStatus.queued]), - running_jobs=len([status for status in statuses if status == JobStatus.in_progress]), - deferred_jobs=len([status for status in statuses if status == JobStatus.deferred]), ) + try: + async with get_redis(self.redis_settings) as redis: + job_ids = await self._get_job_ids(redis) + statuses = await asyncio.gather(*[self._get_job_status(job_id, redis) for job_id in job_ids]) + except Exception as ex: # noqa: B902 + result.error = str(ex) + else: + result.queued_jobs = len([status for status in statuses if status == JobStatus.queued]) + result.running_jobs = len([status for status in statuses if status == JobStatus.in_progress]) + result.deferred_jobs = len([status for status in statuses if status == JobStatus.deferred]) + + return result async def get_job_by_id(self, job_id: str, redis: Optional[ArqRedis] = None) -> JobInfo: if redis is None: diff --git a/arq_admin/templates/arq_admin/queues.html b/arq_admin/templates/arq_admin/queues.html index 2859d80..7bfec9c 100644 --- a/arq_admin/templates/arq_admin/queues.html +++ b/arq_admin/templates/arq_admin/queues.html @@ -3,67 +3,88 @@ {% block title %}Queues {{ block.super }}{% endblock %} {% block extrastyle %} - {{ block.super }} - + {{ block.super }} + {% endblock %} {% block content_title %}

ARQ Queues

{% endblock %} {% block breadcrumbs %} - + {% endblock %} {% block content %} -
+
- - - - - - - - - - - - - - {% for queue in object_list %} - - - - - - - - - - {% endfor %} - -
NameQueued JobsDeferred JobsRunning JobsHostPortDB
- - {{ queue.name }} - - - - {{ queue.queued_jobs }} - - - - {{ queue.deferred_jobs }} - - - - {{ queue.running_jobs }} - - {{ queue.host }}{{ queue.port }}{{ queue.database }}
+ {% for queue in object_list %} + {% if queue.error %} +

{{ queue.name }} - {{ queue.error }}

+ {% endif %} + {% endfor %} + + + + + + + + + + + + + + {% for queue in object_list %} + + + + + + + + + + {% endfor %} + +
NameQueued JobsDeferred JobsRunning JobsHostPortDB
+ + {{ queue.name }} + + + {% if queue.queued_jobs is None %} + — + {% else %} + + {{ queue.queued_jobs }} + + {% endif %} + + {% if queue.deferred_jobs is None %} + — + {% else %} + + {{ queue.deferred_jobs }} + + {% endif %} + + {% if queue.running_jobs is None %} + — + {% else %} + + {{ queue.running_jobs }} + + {% endif %} + {{ queue.host }}{{ queue.port }}{{ queue.database }}
-
+
{% endblock %} diff --git a/arq_admin/views.py b/arq_admin/views.py index 95a609a..48d8d81 100644 --- a/arq_admin/views.py +++ b/arq_admin/views.py @@ -18,12 +18,9 @@ class QueueListView(ListView): template_name = 'arq_admin/queues.html' def get_queryset(self) -> List[QueueStats]: - async def _gather_queues() -> List[QueueStats]: - tasks = [Queue.from_name(name).get_stats() for name in ARQ_QUEUES.keys()] - return await asyncio.gather(*tasks) - - return asyncio.run(_gather_queues()) + result = asyncio.run(self._gather_queues()) + return result def get_context_data(self, **kwargs: Any) -> Dict[str, Any]: context = super().get_context_data(**kwargs) @@ -31,6 +28,12 @@ def get_context_data(self, **kwargs: Any) -> Dict[str, Any]: return context + @staticmethod + async def _gather_queues() -> List[QueueStats]: + tasks = [Queue.from_name(name).get_stats() for name in ARQ_QUEUES.keys()] # pragma: nocover + + return await asyncio.gather(*tasks) + @method_decorator(staff_member_required, name='dispatch') class BaseJobListView(ListView): diff --git a/setup.cfg b/setup.cfg index 688a380..e052911 100644 --- a/setup.cfg +++ b/setup.cfg @@ -68,10 +68,10 @@ filterwarnings = ignore::UserWarning:pytest.*: ignore::ResourceWarning:redis.*: junit_family = xunit1 -;addopts = -; --cov=arq_admin -; --cov-fail-under 100 -; --cov-report term-missing +addopts = + --cov=arq_admin + --cov-fail-under 100 + --cov-report term-missing [coverage:run] source = arq_admin diff --git a/tests/test_queue.py b/tests/test_queue.py index 804bf53..8a41f89 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,4 +1,4 @@ -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from arq.constants import default_queue_name @@ -42,6 +42,21 @@ async def test_stats() -> None: ) +@pytest.mark.asyncio() +@patch.object(Queue, '_get_job_ids') +async def test_stats_with_error(mocked_get_job_ids: AsyncMock) -> None: + error_text = 'test error' + mocked_get_job_ids.side_effect = Exception(error_text) + queue = Queue.from_name(default_queue_name) + assert await queue.get_stats() == QueueStats( + name=default_queue_name, + host=settings.REDIS_SETTINGS.host, + port=settings.REDIS_SETTINGS.port, + database=settings.REDIS_SETTINGS.database, + error=error_text, + ) + + @pytest.mark.asyncio() @patch.object(Job, 'info') async def test_deserialize_error(mocked_job_info: MagicMock, jobs_creator: JobsCreator) -> None: