Use a regex instead of split by colon when parsing arq keys (#10)
fedtf authored Jul 11, 2023
1 parent e757685 commit d5a9bc7
Showing 2 changed files with 24 additions and 6 deletions.
arq_admin/queue.py (8 additions & 6 deletions)
@@ -1,4 +1,5 @@
 import asyncio
+import re
 from contextlib import suppress
 from dataclasses import dataclass, field
 from typing import Any, Dict, List, Optional
@@ -13,6 +14,7 @@
 from arq_admin.job import JobInfo

 ARQ_PREFIX = 'arq:'
+ARQ_KEY_REGEX = re.compile(r'arq\:(?P<prefix>.+?)\:(?P<job_id>.+)')
 PREFIX_PRIORITY = {prefix: i for i, prefix in enumerate(['job', 'in-progress', 'result'])}


@@ -153,16 +155,16 @@ async def _get_job_id_to_status_map(self) -> Dict[str, JobStatus]:
             await pipe.zrange(self.name, withscores=True, start=0, end=-1)
             all_arq_keys, job_ids_with_scores = await pipe.execute()

-        # iter over lists of type [job_id, prefix];
-        # can't use dict here because we can have multiple keys for one job and need to use the more specific one
-        job_ids_with_prefixes = (
-            key.decode('utf-8')[len(ARQ_PREFIX):].split(':')[::-1] for key in all_arq_keys
-        )
+        regex_matches_from_arq_keys = (ARQ_KEY_REGEX.match(key.decode('utf-8')) for key in all_arq_keys)
+        # iterate over dicts with job ids and their keys' prefixes;
+        # we can't build the id-to-prefix mapping right away here
+        # because one job can have multiple keys with different prefixes and we need the most specific one
+        job_ids_with_prefixes = (match.groupdict() for match in regex_matches_from_arq_keys if match is not None)

         job_ids_to_scores = {key[0].decode('utf-8'): key[1] for key in job_ids_with_scores}
         job_ids_to_prefixes = dict(sorted(
             # not only ensure that we don't get a key error but also filter out stuff that's not a client job
-            ([job_id, prefix] for job_id, prefix in job_ids_with_prefixes if prefix in PREFIX_PRIORITY),
+            ([key['job_id'], key['prefix']] for key in job_ids_with_prefixes if key['prefix'] in PREFIX_PRIORITY),
             # make sure that more specific prefixes go after less specific ones
             key=lambda job_id_with_prefix: PREFIX_PRIORITY[job_id_with_prefix[-1]],
         ))
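Why the regex: arq keys have the form 'arq:<prefix>:<job_id>' (the ARQ_PREFIX constant and the regex above both reflect this), so splitting on every colon breaks as soon as a job id itself contains a colon. A minimal standalone sketch of the difference; the key value below is made up for illustration:

    import re

    ARQ_KEY_REGEX = re.compile(r'arq\:(?P<prefix>.+?)\:(?P<job_id>.+)')

    key = 'arq:in-progress:queued:task'  # hypothetical key for the job id 'queued:task'

    # Old approach: strip the 'arq:' prefix and split on every colon.
    # A colon inside the job id yields three pieces, so the later
    # `for job_id, prefix in ...` unpacking raises ValueError.
    print(key[len('arq:'):].split(':')[::-1])  # ['task', 'queued', 'in-progress']

    # New approach: the non-greedy prefix group stops at the first colon,
    # and the job_id group keeps everything after it, colons included.
    print(ARQ_KEY_REGEX.match(key).groupdict())  # {'prefix': 'in-progress', 'job_id': 'queued:task'}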
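The dict(sorted(...)) step that consumes these pairs works because dict keeps the last value written for a duplicate key, so sorting by PREFIX_PRIORITY lets the most specific prefix win. A small sketch with made-up job ids:

    PREFIX_PRIORITY = {prefix: i for i, prefix in enumerate(['job', 'in-progress', 'result'])}

    # One job can appear under several keys, e.g. both 'job' and 'in-progress'.
    pairs = [['a', 'in-progress'], ['a', 'job'], ['b', 'result'], ['b', 'job']]

    # Sorting by priority puts more specific prefixes later, and dict()
    # keeps the last value seen for each repeated key.
    print(dict(sorted(pairs, key=lambda p: PREFIX_PRIORITY[p[-1]])))
    # {'a': 'in-progress', 'b': 'result'}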
tests/test_queue.py (16 additions & 0 deletions)
@@ -65,6 +65,22 @@ async def test_stats_with_running_job_wo_zscore(redis: ArqRedis, queue: Queue) -> None:
     )


+@pytest.mark.asyncio()
+@pytest.mark.usefixtures('all_jobs')
+async def test_stats_job_with_colon_in_the_name(redis: ArqRedis, queue: Queue) -> None:
+    await redis.enqueue_job('new_task', _job_id='queued:task')
+
+    assert await queue.get_stats() == QueueStats(
+        name=default_queue_name,
+        host=settings.REDIS_SETTINGS.host,
+        port=settings.REDIS_SETTINGS.port,
+        database=settings.REDIS_SETTINGS.database,
+        queued_jobs=2,
+        running_jobs=1,
+        deferred_jobs=1,
+    )
+
+
 @pytest.mark.asyncio()
 @patch.object(Queue, '_get_job_id_to_status_map')
 async def test_stats_with_error(mocked_get_job_ids: AsyncMock, queue: Queue) -> None:
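The job id 'queued:task' in the test above is exactly the case the old parser mishandled. Assuming arq's usual key naming ('arq:' plus the prefixes listed in PREFIX_PRIORITY; the key names below are hypothetical), every key form for that job now parses cleanly:

    import re

    ARQ_KEY_REGEX = re.compile(r'arq\:(?P<prefix>.+?)\:(?P<job_id>.+)')

    # Hypothetical keys for the job id used in the test.
    for key in ('arq:job:queued:task', 'arq:in-progress:queued:task', 'arq:result:queued:task'):
        match = ARQ_KEY_REGEX.match(key)
        print(match.group('prefix'), '->', match.group('job_id'))
    # job -> queued:task
    # in-progress -> queued:task
    # result -> queued:task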
