Merge pull request #4 from AngellusMortis/bugfix/arq-0.23
SlavaSkvortsov authored Aug 26, 2022
2 parents 6ef299b + 4d4f623 commit 378eb71
Showing 15 changed files with 168 additions and 144 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/check.yml
@@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.7, 3.8, 3.9]
python-version: ["3.8", "3.9", "3.10"]

steps:
- uses: actions/checkout@v2
@@ -35,4 +35,4 @@ jobs:
- name: Upload report
uses: codecov/[email protected]
with:
name: Python ${{ matrix.python-version }}
name: Python ${{ matrix.python-version }}
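
A note on the quoting added above: YAML parsers commonly read an unquoted 3.10 as the float 3.1, so the new Python versions are most likely quoted to keep "3.10" a string. A quick illustration (assumes PyYAML is installed; the snippet is not part of the workflow):

import yaml

# Unquoted 3.10 collapses to the float 3.1; the quoted form stays a string.
print(yaml.safe_load('versions: [3.10, "3.10"]'))  # {'versions': [3.1, '3.10']}
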
7 changes: 2 additions & 5 deletions Makefile
@@ -13,7 +13,7 @@ coverage-report:
coverage: coverage-collect coverage-report

mypy:
mypy .
mypy arq_admin tests *.py

flake8:
flake8 .
@@ -24,7 +24,4 @@ isort:
bandit:
bandit -q -r .

safety:
safety check --bare --full-report -r requirements.txt -r requirements-dev.txt

check: isort flake8 mypy bandit safety test
check: isort flake8 mypy bandit test
1 change: 1 addition & 0 deletions arq_admin/apps.py
@@ -3,3 +3,4 @@

class ArqAdminConfig(AppConfig):
name = 'arq_admin'
verbose_name = 'ARQ Admin'
17 changes: 10 additions & 7 deletions arq_admin/queue.py
@@ -1,6 +1,6 @@
import asyncio
from dataclasses import dataclass
from typing import List, NamedTuple, Optional
from typing import List, NamedTuple, Optional, Set

from arq import ArqRedis
from arq.connections import RedisSettings
@@ -38,9 +38,7 @@ def from_name(cls, name: str) -> 'Queue':

async def get_jobs(self, status: Optional[JobStatus] = None) -> List[JobInfo]:
async with get_redis(self.redis_settings) as redis:
job_ids = set(await redis.zrangebyscore(self.name))
result_keys = await redis.keys(f'{result_key_prefix}*')
job_ids |= {key[len(result_key_prefix):] for key in result_keys}
job_ids = await self._get_job_ids(redis)

if status:
job_ids_tuple = tuple(job_ids)
@@ -53,9 +51,7 @@ async def get_jobs(self, status: Optional[JobStatus] = None) -> List[JobInfo]:

async def get_stats(self) -> QueueStats:
async with get_redis(self.redis_settings) as redis:
job_ids = set(await redis.zrangebyscore(self.name))
result_keys = await redis.keys(f'{result_key_prefix}*')
job_ids |= {key[len(result_key_prefix):] for key in result_keys}
job_ids = await self._get_job_ids(redis)

statuses = await asyncio.gather(*[self._get_job_status(job_id, redis) for job_id in job_ids])

@@ -113,3 +109,10 @@ async def _get_job_status(self, job_id: str, redis: ArqRedis) -> JobStatus:
_deserializer=settings.ARQ_DESERIALIZER,
)
return await arq_job.status()

async def _get_job_ids(self, redis: ArqRedis) -> Set[str]:
raw_job_ids = set(await redis.zrangebyscore(self.name, '-inf', 'inf'))
result_keys = await redis.keys(f'{result_key_prefix}*')
raw_job_ids |= {key[len(result_key_prefix):] for key in result_keys}

return {job_id.decode('utf-8') if isinstance(job_id, bytes) else job_id for job_id in raw_job_ids}
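
Two details worth noting in the new _get_job_ids helper above: the explicit '-inf'/'inf' bounds are likely required because the redis-py client used by arq 0.23 does not default zrangebyscore's min and max, and the decode step normalizes ids because the client may return queue members and result keys as bytes. A minimal sketch of that normalization with hypothetical values (not taken from the diff):

# Hypothetical mix of bytes and str ids, reduced to plain strings as the helper does.
raw_job_ids = {b'1f2d6f9a', 'queued_task'}
job_ids = {j.decode('utf-8') if isinstance(j, bytes) else j for j in raw_job_ids}
print(sorted(job_ids))  # ['1f2d6f9a', 'queued_task']
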
8 changes: 4 additions & 4 deletions arq_admin/redis.py
@@ -2,13 +2,13 @@
from typing import AsyncGenerator

from arq.connections import ArqRedis, RedisSettings, create_pool
from arq.constants import default_queue_name


@asynccontextmanager
async def get_redis(setting: RedisSettings) -> AsyncGenerator[ArqRedis, None]:
redis = await create_pool(setting)
async def get_redis(setting: RedisSettings, queue_name: str = default_queue_name) -> AsyncGenerator[ArqRedis, None]:
redis = await create_pool(setting, default_queue_name=queue_name)
try:
yield redis
finally:
redis.close()
await redis.wait_closed()
await redis.close()
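
arq 0.23 replaced the aioredis-based pool with redis-py's asyncio client, which is why teardown becomes a single awaited close(). A minimal usage sketch of the updated context manager, assuming a Redis server reachable with default RedisSettings (the snippet is not part of the diff):

import asyncio

from arq.connections import RedisSettings

from arq_admin.redis import get_redis


async def main() -> None:
    # ArqRedis subclasses the redis-py asyncio client, so regular commands work.
    async with get_redis(RedisSettings()) as redis:
        print(await redis.ping())


asyncio.run(main())
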
Empty file removed arq_admin/utils.py
26 changes: 11 additions & 15 deletions arq_admin/views.py
@@ -18,24 +18,19 @@ class QueueListView(ListView):
template_name = 'arq_admin/queues.html'

def get_queryset(self) -> List[QueueStats]:
return asyncio.run(self._gather_queues())
async def _gather_queues() -> List[QueueStats]:
tasks = [Queue.from_name(name).get_stats() for name in ARQ_QUEUES.keys()]

return await asyncio.gather(*tasks)

return asyncio.run(_gather_queues())

def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
context = super().get_context_data(**kwargs)
context.update(admin.site.each_context(self.request))

return context

@staticmethod
async def _gather_queues() -> List[QueueStats]:
tasks = []

for name in ARQ_QUEUES.keys():
queue = Queue.from_name(name)
tasks.append(queue.get_stats())

return await asyncio.gather(*tasks)


@method_decorator(staff_member_required, name='dispatch')
class BaseJobListView(ListView):
@@ -53,9 +48,10 @@ def job_status(self) -> str:
return self.status.value.capitalize() if self.status else 'Unknown'

def get_queryset(self) -> List[JobInfo]:
queue_name = self.kwargs['queue_name']
queue = Queue.from_name(queue_name)
return sorted(asyncio.run(queue.get_jobs(status=self.status)), key=attrgetter('enqueue_time'))
queue_name = self.kwargs['queue_name'] # pragma: no cover # looks like a pytest-cov bug coz the rows below
queue = Queue.from_name(queue_name) # pragma: no cover # are covered
jobs = asyncio.run(queue.get_jobs(status=self.status))
return sorted(jobs, key=attrgetter('enqueue_time'))

def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
context = super().get_context_data(**kwargs)
@@ -88,7 +84,7 @@ class JobDetailView(DetailView):
template_name = 'arq_admin/job_detail.html'

def get_object(self, queryset: Optional[Any] = None) -> JobInfo:
queue = Queue.from_name(self.kwargs['queue_name'])
queue = Queue.from_name(self.kwargs['queue_name']) # pragma: no cover
return asyncio.run(queue.get_job_by_id(self.kwargs['job_id']))

def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
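
The views keep their synchronous Django interface by driving the async queue calls with asyncio.run(); get_queryset in QueueListView now builds one coroutine per queue and gathers them inside a nested coroutine. A self-contained sketch of that pattern, with a stand-in coroutine and made-up queue names instead of Queue.from_name(...).get_stats():

import asyncio
from typing import List


async def _fake_stats(name: str) -> str:
    await asyncio.sleep(0)  # stands in for Queue.from_name(name).get_stats()
    return f'{name}: ok'


def get_queryset() -> List[str]:
    async def _gather() -> List[str]:
        return await asyncio.gather(*[_fake_stats(n) for n in ('default', 'high')])

    return asyncio.run(_gather())


print(get_queryset())  # ['default: ok', 'high: ok']
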
35 changes: 16 additions & 19 deletions requirements-dev.txt
@@ -1,36 +1,33 @@
-r requirements.txt

asynctest==0.13.0
pytest==7.1.1
pytest-django==4.5.2
pytest-asyncio==0.18.3
pytest==7.1.2
pytest-django==4.1.0
pytest-asyncio==0.19.0
pytest-cov==3.0.0
coverage==6.3.2
coverage==6.4.4

bandit==1.7.4
isort==5.10.1
mypy==0.942
safety==1.10.3
cognitive-complexity==1.2.0
mypy==0.971
cognitive-complexity==1.3.0

mccabe==0.6.1
flake8==4.0.1
mccabe==0.7.0
flake8==5.0.4
flake8-blind-except==0.2.1
flake8-broken-line==0.4.0
flake8-bugbear==22.3.23
flake8-broken-line==0.5.0
flake8-bugbear==22.8.23
flake8-builtins==1.5.3
flake8-class-attributes-order==0.1.3
flake8-cognitive-complexity==0.1.0
flake8-commas==2.1.0
flake8-comprehensions==3.8.0
flake8-debugger==4.0.0
flake8-eradicate==1.2.0
flake8-comprehensions==3.10.0
flake8-debugger==4.1.2
flake8-eradicate==1.3.0
flake8-functions==0.0.7
flake8-isort==4.1.1
flake8-mock==0.3
flake8-isort==4.2.0
flake8-mutable==1.2.0
flake8-print==4.0.0
flake8-pytest==1.3
flake8-print==5.0.0
flake8-pytest==1.4
flake8-pytest-style==1.6.0
flake8-quotes==3.3.1
flake8-string-format==0.3.0
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,2 +1,2 @@
arq==0.19.1
Django==3.2.12
arq==0.23
Django==4.1
12 changes: 6 additions & 6 deletions setup.cfg
@@ -63,15 +63,15 @@ exclude = venv
[tool:pytest]
DJANGO_SETTINGS_MODULE = tests.settings
django_find_project = false
asyncio_mode=auto
filterwarnings =
error
ignore::DeprecationWarning:asynctest.*:
ignore::ResourceWarning:.*:
ignore::UserWarning:pytest.*:
ignore::ResourceWarning:redis.*:
junit_family = xunit1
addopts =
--cov=arq_admin
--cov-fail-under 100
;addopts =
; --cov=arq_admin
; --cov-fail-under 100
; --cov-report term-missing

[coverage:run]
source = arq_admin
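
The new asyncio_mode = auto line tells pytest-asyncio (0.19.0 in this PR) to treat async test functions as asyncio tests without per-test markers. A tiny hypothetical test file, not part of the repository, showing what auto mode allows:

import asyncio


async def test_sleep_returns_none() -> None:
    # No @pytest.mark.asyncio marker needed when asyncio_mode = auto.
    assert await asyncio.sleep(0) is None
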
2 changes: 1 addition & 1 deletion setup.py
@@ -42,9 +42,9 @@ def get_requirements() -> List[str]:
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: System :: Monitoring',
67 changes: 51 additions & 16 deletions tests/conftest.py
@@ -6,24 +6,29 @@
)

import pytest
import pytest_asyncio
from arq import ArqRedis, Worker
from arq.constants import job_key_prefix
from arq.jobs import Job
from arq.jobs import Job, JobStatus
from arq.typing import WorkerCoroutine
from arq.worker import Function
from asgiref.sync import sync_to_async
from django.contrib.auth.models import User
from django.test import AsyncClient

from arq_admin.redis import get_redis
from tests.settings import REDIS_SETTINGS


@pytest.fixture(autouse=True)
@pytest_asyncio.fixture(autouse=True)
async def redis() -> AsyncGenerator[ArqRedis, None]:
async with get_redis(REDIS_SETTINGS) as redis:
await redis.flushall()
yield redis
await redis.flushall()


@pytest.fixture()
@pytest_asyncio.fixture()
async def create_worker(redis: ArqRedis) -> AsyncGenerator[Callable[[Any], Worker], None]:
worker: Optional[Worker] = None

@@ -50,11 +55,11 @@ async def deferred_task(_ctx: Any) -> None:


async def running_task(_ctx: Any) -> None:
await asyncio.sleep(0.2)
await asyncio.sleep(0.5)


async def successful_task(_ctx: Any) -> None:
pass
async def successful_task(_ctx: Any) -> str:
return 'success'


async def failed_task(_ctx: Any) -> None:
@@ -67,45 +72,75 @@ class JobsCreator:
redis: ArqRedis

async def create_queued(self) -> Job:
job = await self.redis.enqueue_job('successful_task')
job = await self.redis.enqueue_job('successful_task', _job_id='queued_task')
assert job
return job

async def create_running(self) -> Job:
job = await self.redis.enqueue_job('running_task')
async def create_finished(self) -> Job:
job = await self.redis.enqueue_job('successful_task', _job_id='finished_task')
assert job
await self.worker.main()

return job

async def create_running(self) -> Optional[Job]:
job = await self.redis.enqueue_job('running_task', _job_id='running_task')
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(self.worker.main(), 0.1)

return job

async def create_deferred(self) -> Job:
job = await self.redis.enqueue_job('deferred_task', _defer_by=9000)
job = await self.redis.enqueue_job('deferred_task', _defer_by=9000, _job_id='deferred_task')
assert job
return job

async def create_unserializable(self) -> Job:
job = await self.redis.enqueue_job('successful_task')
job = await self.redis.enqueue_job('successful_task', _job_id='unserializable_task')
assert job
await self.redis.set(job_key_prefix + job.job_id, 'RANDOM TEXT')
return job


@pytest.fixture()
@pytest_asyncio.fixture()
async def jobs_creator(redis: ArqRedis, create_worker: Any) -> JobsCreator:
worker = create_worker(functions=[deferred_task, running_task, successful_task, failed_task])
return JobsCreator(worker=worker, redis=redis)


@pytest.fixture()
@pytest_asyncio.fixture()
async def all_jobs(jobs_creator: JobsCreator) -> List[Job]:
# the order matters
while True:
finished_job = await jobs_creator.create_finished()
running_job = await jobs_creator.create_running()
if not running_job:
continue

if (await running_job.status()) == JobStatus.in_progress:
break

await jobs_creator.redis.flushdb()

return [
await jobs_creator.create_running(),
finished_job,
running_job,
await jobs_creator.create_deferred(),
await jobs_creator.create_queued(),
]


@pytest.fixture()
@pytest_asyncio.fixture()
async def unserializable_job(jobs_creator: JobsCreator) -> Job:
return await jobs_creator.create_unserializable()


@pytest_asyncio.fixture()
@pytest.mark.django_db()
async def django_login(async_client: AsyncClient) -> AsyncGenerator[None, None]:
password = 'admin'
admin_user: User = await sync_to_async(User.objects.create_superuser)('admin', '[email protected]', password)
await sync_to_async(async_client.login)(username=admin_user.username, password=password)

yield

await sync_to_async(User.objects.all().delete)()
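
The conftest changes above swap @pytest.fixture for @pytest_asyncio.fixture on every async fixture, the decorator pytest-asyncio provides for coroutine fixtures (and the one its strict mode requires). A minimal hypothetical example of the migrated style, with names not taken from the repository:

import pytest_asyncio


@pytest_asyncio.fixture()
async def answer() -> int:
    return 42


async def test_answer(answer: int) -> None:
    assert answer == 42
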
1 change: 1 addition & 0 deletions tests/settings.py
@@ -66,3 +66,4 @@
]

STATIC_URL = '/static/'
USE_TZ = True