Skip to content

Commit

Permalink
Handle redis errors on queues view (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
SlavaSkvortsov authored Oct 16, 2022
1 parent 2175aaa commit f90e39d
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 74 deletions.
35 changes: 21 additions & 14 deletions arq_admin/queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from dataclasses import dataclass
from typing import List, NamedTuple, Optional, Set
from typing import List, Optional, Set

from arq import ArqRedis
from arq.connections import RedisSettings
Expand All @@ -13,15 +13,18 @@
from arq_admin.redis import get_redis


class QueueStats(NamedTuple):
@dataclass
class QueueStats:
name: str
host: str
port: int
database: int

queued_jobs: int
running_jobs: int
deferred_jobs: int
queued_jobs: Optional[int] = None
running_jobs: Optional[int] = None
deferred_jobs: Optional[int] = None

error: Optional[str] = None


@dataclass
Expand Down Expand Up @@ -50,20 +53,24 @@ async def get_jobs(self, status: Optional[JobStatus] = None) -> List[JobInfo]:
return jobs

async def get_stats(self) -> QueueStats:
async with get_redis(self.redis_settings) as redis:
job_ids = await self._get_job_ids(redis)

statuses = await asyncio.gather(*[self._get_job_status(job_id, redis) for job_id in job_ids])

return QueueStats(
result = QueueStats(
name=self.name,
host=str(self.redis_settings.host),
port=self.redis_settings.port,
database=self.redis_settings.database,
queued_jobs=len([status for status in statuses if status == JobStatus.queued]),
running_jobs=len([status for status in statuses if status == JobStatus.in_progress]),
deferred_jobs=len([status for status in statuses if status == JobStatus.deferred]),
)
try:
async with get_redis(self.redis_settings) as redis:
job_ids = await self._get_job_ids(redis)
statuses = await asyncio.gather(*[self._get_job_status(job_id, redis) for job_id in job_ids])
except Exception as ex: # noqa: B902
result.error = str(ex)
else:
result.queued_jobs = len([status for status in statuses if status == JobStatus.queued])
result.running_jobs = len([status for status in statuses if status == JobStatus.in_progress])
result.deferred_jobs = len([status for status in statuses if status == JobStatus.deferred])

return result

async def get_job_by_id(self, job_id: str, redis: Optional[ArqRedis] = None) -> JobInfo:
if redis is None:
Expand Down
121 changes: 71 additions & 50 deletions arq_admin/templates/arq_admin/queues.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,67 +3,88 @@
{% block title %}Queues {{ block.super }}{% endblock %}

{% block extrastyle %}
{{ block.super }}
<style>table {width: 100%;}</style>
{{ block.super }}
<style>
table {
width: 100%;
}
</style>
{% endblock %}

{% block content_title %}<h1>ARQ Queues</h1>{% endblock %}

{% block breadcrumbs %}
<div class="breadcrumbs">
<a href="{% url 'admin:index' %}">Home</a> &rsaquo;
<a href="{% url 'arq_admin:home' %}">Django ARQ</a>
</div>
<div class="breadcrumbs">
<a href="{% url 'admin:index' %}">Home</a> &rsaquo;
<a href="{% url 'arq_admin:home' %}">Django ARQ</a>
</div>
{% endblock %}

{% block content %}

<div id="content-main">
<div id="content-main">

<div class="module">
<table>
<thead>
<tr>
<th>Name</th>
<th>Queued Jobs</th>
<th>Deferred Jobs</th>
<th>Running Jobs</th>
<th>Host</th>
<th>Port</th>
<th>DB</th>
</tr>
</thead>
<tbody>
{% for queue in object_list %}
<tr class = "{% cycle 'row1' 'row2' %}">
<th>
<a href="{% url 'arq_admin:all_jobs' queue.name %}">
{{ queue.name }}
</a>
</th>
<td>
<a href="{% url 'arq_admin:queued_jobs' queue.name %}">
{{ queue.queued_jobs }}
</a>
</td>
<th>
<a href="{% url 'arq_admin:deferred_jobs' queue.name %}">
{{ queue.deferred_jobs }}
</a>
</th>
<th>
<a href="{% url 'arq_admin:running_jobs' queue.name %}">
{{ queue.running_jobs }}
</a>
</th>
<td>{{ queue.host }}</td>
<td>{{ queue.port }}</td>
<td>{{ queue.database }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% for queue in object_list %}
{% if queue.error %}
<h2>{{ queue.name }} - <span style="color: red;">{{ queue.error }}</span></h2>
{% endif %}
{% endfor %}
<table>
<thead>
<tr>
<th>Name</th>
<th>Queued Jobs</th>
<th>Deferred Jobs</th>
<th>Running Jobs</th>
<th>Host</th>
<th>Port</th>
<th>DB</th>
</tr>
</thead>
<tbody>
{% for queue in object_list %}
<tr class="{% cycle 'row1' 'row2' %}">
<th>
<a href="{% url 'arq_admin:all_jobs' queue.name %}">
{{ queue.name }}
</a>
</th>
<td>
{% if queue.queued_jobs is None %}
{% else %}
<a href="{% url 'arq_admin:queued_jobs' queue.name %}">
{{ queue.queued_jobs }}
</a>
{% endif %}
</td>
<th>
{% if queue.deferred_jobs is None %}
{% else %}
<a href="{% url 'arq_admin:deferred_jobs' queue.name %}">
{{ queue.deferred_jobs }}
</a>
{% endif %}
</th>
<th>
{% if queue.running_jobs is None %}
{% else %}
<a href="{% url 'arq_admin:running_jobs' queue.name %}">
{{ queue.running_jobs }}
</a>
{% endif %}
</th>
<td>{{ queue.host }}</td>
<td>{{ queue.port }}</td>
<td>{{ queue.database }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>

{% endblock %}
13 changes: 8 additions & 5 deletions arq_admin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@ class QueueListView(ListView):
template_name = 'arq_admin/queues.html'

def get_queryset(self) -> List[QueueStats]:
async def _gather_queues() -> List[QueueStats]:
tasks = [Queue.from_name(name).get_stats() for name in ARQ_QUEUES.keys()]

return await asyncio.gather(*tasks)

return asyncio.run(_gather_queues())
result = asyncio.run(self._gather_queues())
return result

def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
context = super().get_context_data(**kwargs)
context.update(admin.site.each_context(self.request))

return context

@staticmethod
async def _gather_queues() -> List[QueueStats]:
tasks = [Queue.from_name(name).get_stats() for name in ARQ_QUEUES.keys()] # pragma: nocover

return await asyncio.gather(*tasks)


@method_decorator(staff_member_required, name='dispatch')
class BaseJobListView(ListView):
Expand Down
8 changes: 4 additions & 4 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ filterwarnings =
ignore::UserWarning:pytest.*:
ignore::ResourceWarning:redis.*:
junit_family = xunit1
;addopts =
; --cov=arq_admin
; --cov-fail-under 100
; --cov-report term-missing
addopts =
--cov=arq_admin
--cov-fail-under 100
--cov-report term-missing

[coverage:run]
source = arq_admin
Expand Down
17 changes: 16 additions & 1 deletion tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest.mock import MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from arq.constants import default_queue_name
Expand Down Expand Up @@ -42,6 +42,21 @@ async def test_stats() -> None:
)


@pytest.mark.asyncio()
@patch.object(Queue, '_get_job_ids')
async def test_stats_with_error(mocked_get_job_ids: AsyncMock) -> None:
error_text = 'test error'
mocked_get_job_ids.side_effect = Exception(error_text)
queue = Queue.from_name(default_queue_name)
assert await queue.get_stats() == QueueStats(
name=default_queue_name,
host=settings.REDIS_SETTINGS.host,
port=settings.REDIS_SETTINGS.port,
database=settings.REDIS_SETTINGS.database,
error=error_text,
)


@pytest.mark.asyncio()
@patch.object(Job, 'info')
async def test_deserialize_error(mocked_job_info: MagicMock, jobs_creator: JobsCreator) -> None:
Expand Down

0 comments on commit f90e39d

Please sign in to comment.