From ab2dda2011ab27007650c4918d3704f3bf7ac13d Mon Sep 17 00:00:00 2001 From: Rishabh Mittal Date: Mon, 30 Oct 2023 23:39:26 +0530 Subject: [PATCH 01/10] =?UTF-8?q?=F0=9F=94=A8=20Adding=20a=20job=20counter?= =?UTF-8?q?=20to=20address=20Semaphore=20issues=20(#408)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 🔨 Adding a job counter to address Semaphore issues * 🧪 Test function for semaphore blocker --- arq/worker.py | 25 +++++++++++++++++++++---- tests/test_worker.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/arq/worker.py b/arq/worker.py index 81afd5b7..398409b5 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -236,7 +236,11 @@ def __init__( self.on_job_start = on_job_start self.on_job_end = on_job_end self.after_job_end = after_job_end - self.sem = asyncio.BoundedSemaphore(max_jobs) + + self.max_jobs = max_jobs + self.sem = asyncio.BoundedSemaphore(max_jobs + 1) + self.job_counter: int = 0 + self.job_timeout_s = to_seconds(job_timeout) self.keep_result_s = to_seconds(keep_result) self.keep_result_forever = keep_result_forever @@ -374,13 +378,13 @@ async def _poll_iteration(self) -> None: return count = min(burst_jobs_remaining, count) if self.allow_pick_jobs: - async with self.sem: # don't bother with zrangebyscore until we have "space" to run the jobs + if self.job_counter < self.max_jobs: now = timestamp_ms() job_ids = await self.pool.zrangebyscore( self.queue_name, min=float('-inf'), start=self._queue_read_offset, num=count, max=now ) - await self.start_jobs(job_ids) + await self.start_jobs(job_ids) if self.allow_abort_jobs: await self._cancel_aborted_jobs() @@ -419,12 +423,23 @@ async def _cancel_aborted_jobs(self) -> None: self.aborting_tasks.update(aborted) await self.pool.zrem(abort_jobs_ss, *aborted) + def _release_sem_dec_counter_on_complete(self) -> None: + self.job_counter = self.job_counter - 1 + self.sem.release() + async def start_jobs(self, job_ids: List[bytes]) -> None: """ For each job id, get the job definition, check it's not running and start it in a task """ for job_id_b in job_ids: await self.sem.acquire() + + if self.job_counter >= self.max_jobs: + self.sem.release() + return None + + self.job_counter = self.job_counter + 1 + job_id = job_id_b.decode() in_progress_key = in_progress_key_prefix + job_id async with self.pool.pipeline(transaction=True) as pipe: @@ -433,6 +448,7 @@ async def start_jobs(self, job_ids: List[bytes]) -> None: score = await pipe.zscore(self.queue_name, job_id) if ongoing_exists or not score: # job already started elsewhere, or already finished and removed from queue + self.job_counter = self.job_counter - 1 self.sem.release() logger.debug('job %s already running elsewhere', job_id) continue @@ -445,11 +461,12 @@ async def start_jobs(self, job_ids: List[bytes]) -> None: await pipe.execute() except (ResponseError, WatchError): # job already started elsewhere since we got 'existing' + self.job_counter = self.job_counter - 1 self.sem.release() logger.debug('multi-exec error, job %s already started elsewhere', job_id) else: t = self.loop.create_task(self.run_job(job_id, int(score))) - t.add_done_callback(lambda _: self.sem.release()) + t.add_done_callback(lambda _: self._release_sem_dec_counter_on_complete()) self.tasks[job_id] = t async def run_job(self, job_id: str, score: int) -> None: # noqa: C901 diff --git a/tests/test_worker.py b/tests/test_worker.py index aa56085b..23dd91d2 100644 --- a/tests/test_worker.py +++ 
b/tests/test_worker.py @@ -984,6 +984,36 @@ async def test(ctx): assert result['called'] == 4 +async def test_job_cancel_on_max_jobs(arq_redis: ArqRedis, worker, caplog): + async def longfunc(ctx): + await asyncio.sleep(3600) + + async def wait_and_abort(job, delay=0.1): + await asyncio.sleep(delay) + assert await job.abort() is True + + caplog.set_level(logging.INFO) + await arq_redis.zadd(abort_jobs_ss, {b'foobar': int(1e9)}) + job = await arq_redis.enqueue_job('longfunc', _job_id='testing') + + worker: Worker = worker( + functions=[func(longfunc, name='longfunc')], allow_abort_jobs=True, poll_delay=0.1, max_jobs=1 + ) + assert worker.jobs_complete == 0 + assert worker.jobs_failed == 0 + assert worker.jobs_retried == 0 + await asyncio.gather(wait_and_abort(job), worker.main()) + await worker.main() + assert worker.jobs_complete == 0 + assert worker.jobs_failed == 1 + assert worker.jobs_retried == 0 + log = re.sub(r'\d+.\d\ds', 'X.XXs', '\n'.join(r.message for r in caplog.records)) + assert 'X.XXs → testing:longfunc()\n X.XXs ⊘ testing:longfunc aborted' in log + assert worker.aborting_tasks == set() + assert worker.tasks == {} + assert worker.job_tasks == {} + + async def test_worker_timezone_defaults_to_system_timezone(worker): worker = worker(functions=[func(foobar)]) assert worker.timezone is not None From ec1532b29d490e66b5c22424b51781865ea4cf44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Kr=C3=BCger=20Svensson?= Date: Fri, 29 Mar 2024 15:36:05 +0100 Subject: [PATCH 02/10] docs: add documentation on how to retrieve running jobs (#377) --- docs/examples/job_ids.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/examples/job_ids.py b/docs/examples/job_ids.py index 9de9b8cc..1148591c 100644 --- a/docs/examples/job_ids.py +++ b/docs/examples/job_ids.py @@ -2,6 +2,8 @@ from arq import create_pool from arq.connections import RedisSettings +from arq.jobs import Job + async def the_task(ctx): print('running the task with id', ctx['job_id']) @@ -37,6 +39,14 @@ async def main(): > None """ + # you can retrieve jobs by using arq.jobs.Job + await redis.enqueue_job('the_task', _job_id='my_job') + job5 = Job(job_id='my_job', redis=redis) + print(job5) + """ + + """ + class WorkerSettings: functions = [the_task] From e27ad93c1bd93624acfa9ef9ee67239cf4301dc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Kr=C3=BCger=20Svensson?= Date: Mon, 1 Apr 2024 12:23:14 +0200 Subject: [PATCH 03/10] feat: add job_id to JobDef, closing #376 (#378) --- arq/connections.py | 1 + arq/jobs.py | 6 +++++- arq/worker.py | 3 +++ tests/test_jobs.py | 24 +++++++++++++++++++----- tests/test_main.py | 9 ++++++--- 5 files changed, 34 insertions(+), 9 deletions(-) diff --git a/arq/connections.py b/arq/connections.py index d4fc4434..69ac8ce2 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -193,6 +193,7 @@ async def _get_job_def(self, job_id: bytes, score: int) -> JobDef: assert v is not None, f'job "{key}" not found' jd = deserialize_job(v, deserializer=self.job_deserializer) jd.score = score + jd.job_id = job_id.decode() return jd async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef]: diff --git a/arq/jobs.py b/arq/jobs.py index 8028cbe7..d0c0a5ef 100644 --- a/arq/jobs.py +++ b/arq/jobs.py @@ -47,6 +47,7 @@ class JobDef: job_try: int enqueue_time: datetime score: Optional[int] + job_id: Optional[str] def __post_init__(self) -> None: if isinstance(self.score, float): @@ -60,7 +61,6 @@ class JobResult(JobDef): start_time: datetime finish_time: datetime 
queue_name: str - job_id: Optional[str] = None class Job: @@ -238,6 +238,7 @@ def serialize_result( finished_ms: int, ref: str, queue_name: str, + job_id: str, *, serializer: Optional[Serializer] = None, ) -> Optional[bytes]: @@ -252,6 +253,7 @@ def serialize_result( 'st': start_ms, 'ft': finished_ms, 'q': queue_name, + 'id': job_id, } if serializer is None: serializer = pickle.dumps @@ -281,6 +283,7 @@ def deserialize_job(r: bytes, *, deserializer: Optional[Deserializer] = None) -> job_try=d['t'], enqueue_time=ms_to_datetime(d['et']), score=None, + job_id=None, ) except Exception as e: raise DeserializationError('unable to deserialize job') from e @@ -315,6 +318,7 @@ def deserialize_result(r: bytes, *, deserializer: Optional[Deserializer] = None) start_time=ms_to_datetime(d['st']), finish_time=ms_to_datetime(d['ft']), queue_name=d.get('q', ''), + job_id=d.get('id', ''), ) except Exception as e: raise DeserializationError('unable to deserialize job result') from e diff --git a/arq/worker.py b/arq/worker.py index 398409b5..7ff5393a 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -501,6 +501,7 @@ async def job_failed(exc: BaseException) -> None: ref=f'{job_id}:{function_name}', serializer=self.job_serializer, queue_name=self.queue_name, + job_id=job_id, ) await asyncio.shield(self.finish_failed_job(job_id, result_data_)) @@ -556,6 +557,7 @@ async def job_failed(exc: BaseException) -> None: timestamp_ms(), ref, self.queue_name, + job_id=job_id, serializer=self.job_serializer, ) return await asyncio.shield(self.finish_failed_job(job_id, result_data)) @@ -649,6 +651,7 @@ async def job_failed(exc: BaseException) -> None: finished_ms, ref, self.queue_name, + job_id=job_id, serializer=self.job_serializer, ) diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 634a8b03..f8f6c8c4 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -2,7 +2,7 @@ import pickle import pytest -from dirty_equals import IsNow +from dirty_equals import IsNow, IsStr from arq import Worker, func from arq.connections import ArqRedis, RedisSettings, create_pool @@ -89,6 +89,7 @@ async def foobar(ctx, *args, **kwargs): finish_time=IsNow(tz='utc'), score=None, queue_name=expected_queue_name, + job_id=IsStr(), ) results = await arq_redis.all_job_results() assert results == [ @@ -139,9 +140,9 @@ class Foobar: def __getstate__(self): raise TypeError("this doesn't pickle") - r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue') + r1 = serialize_result('foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', 'job_1') assert isinstance(r1, bytes) - r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue') + r2 = serialize_result('foobar', (Foobar(),), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', 'job_1') assert r2 is None @@ -154,7 +155,19 @@ def custom_serializer(x): return b'0123456789' r1 = serialize_result( - 'foobar', (1,), {}, 1, 123, True, Foobar(), 123, 123, 'testing', 'test-queue', serializer=custom_serializer + 'foobar', + (1,), + {}, + 1, + 123, + True, + Foobar(), + 123, + 123, + 'testing', + 'test-queue', + 'job_1', + serializer=custom_serializer, ) assert r1 == b'0123456789' r2 = serialize_result( @@ -169,6 +182,7 @@ def custom_serializer(x): 123, 'testing', 'test-queue', + 'job_1', serializer=custom_serializer, ) assert r2 == b'0123456789' @@ -213,7 +227,7 @@ async def test_get_job_result(arq_redis: ArqRedis): async def test_result_pole_delay_dep(arq_redis: ArqRedis): j 
= Job('foobar', arq_redis) - r = serialize_result('foobar', (1,), {}, 1, 123, True, 42, 123, 123, 'testing', 'test-queue') + r = serialize_result('foobar', (1,), {}, 1, 123, True, 42, 123, 123, 'testing', 'test-queue', 'job_1') await arq_redis.set(result_key_prefix + j.job_id, r) with pytest.warns( DeprecationWarning, match='"pole_delay" is deprecated, use the correct spelling "poll_delay" instead' diff --git a/tests/test_main.py b/tests/test_main.py index 7c3a9835..198c815b 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -238,11 +238,11 @@ async def foobar(ctx): async def test_get_jobs(arq_redis: ArqRedis): - await arq_redis.enqueue_job('foobar', a=1, b=2, c=3) + await arq_redis.enqueue_job('foobar', a=1, b=2, c=3, _job_id='1') await asyncio.sleep(0.01) - await arq_redis.enqueue_job('second', 4, b=5, c=6) + await arq_redis.enqueue_job('second', 4, b=5, c=6, _job_id='2') await asyncio.sleep(0.01) - await arq_redis.enqueue_job('third', 7, b=8) + await arq_redis.enqueue_job('third', 7, b=8, _job_id='3') jobs = await arq_redis.queued_jobs() assert [dataclasses.asdict(j) for j in jobs] == [ { @@ -252,6 +252,7 @@ async def test_get_jobs(arq_redis: ArqRedis): 'job_try': None, 'enqueue_time': IsNow(tz='utc'), 'score': IsInt(), + 'job_id': '1', }, { 'function': 'second', @@ -260,6 +261,7 @@ async def test_get_jobs(arq_redis: ArqRedis): 'job_try': None, 'enqueue_time': IsNow(tz='utc'), 'score': IsInt(), + 'job_id': '2', }, { 'function': 'third', @@ -268,6 +270,7 @@ async def test_get_jobs(arq_redis: ArqRedis): 'job_try': None, 'enqueue_time': IsNow(tz='utc'), 'score': IsInt(), + 'job_id': '3', }, ] assert jobs[0].score < jobs[1].score < jobs[2].score From b59e71674634ceb53e2f6a55fcd6d3c6e65b8598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Kr=C3=BCger=20Svensson?= Date: Mon, 1 Apr 2024 12:30:38 +0200 Subject: [PATCH 04/10] chore: update dependencies, fixing tests (#382) * chore: update dependencies, fixing tests * Commit message --------- Co-authored-by: Marcelo Trylesinski --- arq/cli.py | 2 +- requirements/docs.txt | 4 ++-- requirements/linting.txt | 10 ++-------- requirements/pyproject.txt | 14 +++----------- requirements/testing.in | 2 +- requirements/testing.txt | 18 +++++------------- 6 files changed, 14 insertions(+), 36 deletions(-) diff --git a/arq/cli.py b/arq/cli.py index e4d2ef96..3d3aa300 100644 --- a/arq/cli.py +++ b/arq/cli.py @@ -60,7 +60,7 @@ async def watch_reload(path: str, worker_settings: 'WorkerSettingsType') -> None except ImportError as e: # pragma: no cover raise ImportError('watchfiles not installed, use `pip install watchfiles`') from e - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() stop_event = asyncio.Event() def worker_on_stop(s: Signals) -> None: diff --git a/requirements/docs.txt b/requirements/docs.txt index 5d1651e5..7473b36b 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with python 3.9 +# This file is autogenerated by pip-compile with python 3.11 # To update, run: # # pip-compile --output-file=requirements/docs.txt requirements/docs.in @@ -35,7 +35,7 @@ requests==2.28.1 snowballstemmer==2.2.0 # via sphinx sphinx==5.1.1 - # via -r docs.in + # via -r requirements/docs.in sphinxcontrib-applehelp==1.0.2 # via sphinx sphinxcontrib-devhelp==1.0.2 diff --git a/requirements/linting.txt b/requirements/linting.txt index 57176e06..faf0a6ba 100644 --- a/requirements/linting.txt +++ b/requirements/linting.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by 
pip-compile with python 3.9 +# This file is autogenerated by pip-compile with python 3.11 # To update, run: # # pip-compile --output-file=requirements/linting.txt requirements/linting.in @@ -34,15 +34,9 @@ pycodestyle==2.9.1 # via flake8 pyflakes==2.5.0 # via flake8 -tomli==2.0.1 - # via - # black - # mypy types-pytz==2022.2.1.0 # via -r requirements/linting.in types-redis==4.2.8 # via -r requirements/linting.in typing-extensions==4.3.0 - # via - # black - # mypy + # via mypy diff --git a/requirements/pyproject.txt b/requirements/pyproject.txt index 5c605c88..c2c38af6 100644 --- a/requirements/pyproject.txt +++ b/requirements/pyproject.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with python 3.9 +# This file is autogenerated by pip-compile with python 3.11 # To update, run: # # pip-compile --extra=watch --output-file=requirements/pyproject.txt pyproject.toml @@ -10,17 +10,11 @@ async-timeout==4.0.2 # via redis click==8.1.3 # via arq (pyproject.toml) -deprecated==1.2.13 - # via redis -hiredis==2.0.0 +hiredis==2.1.0 # via redis idna==3.3 # via anyio -packaging==21.3 - # via redis -pyparsing==3.0.9 - # via packaging -redis[hiredis]==4.3.4 +redis[hiredis]==4.4.0 # via arq (pyproject.toml) sniffio==1.2.0 # via anyio @@ -28,5 +22,3 @@ typing-extensions==4.3.0 # via arq (pyproject.toml) watchfiles==0.16.1 # via arq (pyproject.toml) -wrapt==1.14.1 - # via deprecated diff --git a/requirements/testing.in b/requirements/testing.in index 2b39e898..5a32ec5d 100644 --- a/requirements/testing.in +++ b/requirements/testing.in @@ -3,7 +3,7 @@ dirty-equals>=0.4,<1 msgpack>=1,<2 pydantic>=1.9.2,<2 pytest>=7,<8 -pytest-asyncio>=0.19,<0.20 +pytest-asyncio>=0.20.3 pytest-mock>=3,<4 pytest-sugar>=0.9,<1 pytest-timeout>=2,<3 diff --git a/requirements/testing.txt b/requirements/testing.txt index 243d054a..a639ab8a 100644 --- a/requirements/testing.txt +++ b/requirements/testing.txt @@ -1,5 +1,5 @@ # -# This file is autogenerated by pip-compile with python 3.9 +# This file is autogenerated by pip-compile with python 3.11 # To update, run: # # pip-compile --output-file=requirements/testing.txt requirements/testing.in @@ -10,8 +10,6 @@ attrs==22.1.0 # via pytest coverage[toml]==6.4.4 # via -r requirements/testing.in -deprecated==1.2.13 - # via redis dirty-equals==0.4 # via -r requirements/testing.in iniconfig==1.1.1 @@ -22,7 +20,6 @@ packaging==21.3 # via # pytest # pytest-sugar - # redis pluggy==1.0.0 # via pytest psutil==5.9.1 @@ -40,7 +37,7 @@ pytest==7.1.2 # pytest-mock # pytest-sugar # pytest-timeout -pytest-asyncio==0.19.0 +pytest-asyncio==0.20.3 # via -r requirements/testing.in pytest-mock==3.8.2 # via -r requirements/testing.in @@ -52,21 +49,16 @@ pytz==2022.2.1 # via # -r requirements/testing.in # dirty-equals -# manually removed to avoid conflict with redis version from pyproject.toml -# redis==4.2.2 -# # via redislite +redis==4.4.0 + # via redislite redislite==6.2.805324 # via -r requirements/testing.in termcolor==1.1.0 # via pytest-sugar tomli==2.0.1 - # via - # coverage - # pytest + # via pytest typing-extensions==4.3.0 # via pydantic -wrapt==1.14.1 - # via deprecated # The following packages are considered to be unsafe in a requirements file: # setuptools From 5769e1095cfdd20af7b135c36670272559eb3352 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Kr=C3=BCger=20Svensson?= Date: Mon, 1 Apr 2024 12:32:25 +0200 Subject: [PATCH 05/10] refactor: refactor all asserts into raise , close #371 (#379) --- arq/connections.py | 14 ++++++++------ arq/cron.py | 11 +++++++---- arq/worker.py | 9 
++++++--- 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/arq/connections.py b/arq/connections.py index 69ac8ce2..b3a91b48 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -50,7 +50,8 @@ class RedisSettings: @classmethod def from_dsn(cls, dsn: str) -> 'RedisSettings': conf = urlparse(dsn) - assert conf.scheme in {'redis', 'rediss', 'unix'}, 'invalid DSN scheme' + if conf.scheme not in {'redis', 'rediss', 'unix'}: + raise RuntimeError('invalid DSN scheme') query_db = parse_qs(conf.query).get('db') if query_db: # e.g. redis://localhost:6379?db=1 @@ -138,7 +139,8 @@ async def enqueue_job( _queue_name = self.default_queue_name job_id = _job_id or uuid4().hex job_key = job_key_prefix + job_id - assert not (_defer_until and _defer_by), "use either 'defer_until' or 'defer_by' or neither, not both" + if _defer_until and _defer_by: + raise RuntimeError("use either 'defer_until' or 'defer_by' or neither, not both") defer_by_ms = to_ms(_defer_by) expires_ms = to_ms(_expires) @@ -190,7 +192,8 @@ async def all_job_results(self) -> List[JobResult]: async def _get_job_def(self, job_id: bytes, score: int) -> JobDef: key = job_key_prefix + job_id.decode() v = await self.get(key) - assert v is not None, f'job "{key}" not found' + if v is None: + raise RuntimeError(f'job "{key}" not found') jd = deserialize_job(v, deserializer=self.job_deserializer) jd.score = score jd.job_id = job_id.decode() @@ -222,9 +225,8 @@ async def create_pool( """ settings: RedisSettings = RedisSettings() if settings_ is None else settings_ - assert not ( - type(settings.host) is str and settings.sentinel - ), "str provided for 'host' but 'sentinel' is true; list of sentinels expected" + if isinstance(settings.host, str) and settings.sentinel: + raise RuntimeError("str provided for 'host' but 'sentinel' is true; list of sentinels expected") if settings.sentinel: diff --git a/arq/cron.py b/arq/cron.py index 2eca6c75..f62ea0bd 100644 --- a/arq/cron.py +++ b/arq/cron.py @@ -58,9 +58,10 @@ def _get_next_dt(dt_: datetime, options: Options) -> Optional[datetime]: # noqa next_v = getattr(dt_, field) if isinstance(v, int): mismatch = next_v != v - else: - assert isinstance(v, (set, list, tuple)), v + elif isinstance(v, (set, list, tuple)): mismatch = next_v not in v + else: + raise RuntimeError(v) # print(field, v, next_v, mismatch) if mismatch: micro = max(dt_.microsecond - options.microsecond, 0) @@ -82,7 +83,8 @@ def _get_next_dt(dt_: datetime, options: Options) -> Optional[datetime]: # noqa elif field == 'second': return dt_ + timedelta(seconds=1) - timedelta(microseconds=micro) else: - assert field == 'microsecond', field + if field != 'microsecond': + raise RuntimeError(field) return dt_ + timedelta(microseconds=options.microsecond - dt_.microsecond) return None @@ -173,7 +175,8 @@ def cron( else: coroutine_ = coroutine - assert asyncio.iscoroutinefunction(coroutine_), f'{coroutine_} is not a coroutine function' + if not asyncio.iscoroutinefunction(coroutine_): + raise RuntimeError(f'{coroutine_} is not a coroutine function') timeout = to_seconds(timeout) keep_result = to_seconds(keep_result) diff --git a/arq/worker.py b/arq/worker.py index 7ff5393a..2bdab0f0 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -85,7 +85,8 @@ def func( else: coroutine_ = coroutine - assert asyncio.iscoroutinefunction(coroutine_), f'{coroutine_} is not a coroutine function' + if not asyncio.iscoroutinefunction(coroutine_): + raise RuntimeError(f'{coroutine_} is not a coroutine function') timeout = to_seconds(timeout) keep_result 
= to_seconds(keep_result) @@ -226,10 +227,12 @@ def __init__( self.queue_name = queue_name self.cron_jobs: List[CronJob] = [] if cron_jobs is not None: - assert all(isinstance(cj, CronJob) for cj in cron_jobs), 'cron_jobs, must be instances of CronJob' + if not all(isinstance(cj, CronJob) for cj in cron_jobs): + raise RuntimeError('cron_jobs, must be instances of CronJob') self.cron_jobs = list(cron_jobs) self.functions.update({cj.name: cj for cj in self.cron_jobs}) - assert len(self.functions) > 0, 'at least one function or cron_job must be registered' + if len(self.functions) == 0: + raise RuntimeError('at least one function or cron_job must be registered') self.burst = burst self.on_startup = on_startup self.on_shutdown = on_shutdown From 94cd8782b4f0764a17962186a349d32125cb98e3 Mon Sep 17 00:00:00 2001 From: Piotr Janiszewski <13190648+iamlikeme@users.noreply.github.com> Date: Mon, 1 Apr 2024 12:33:12 +0200 Subject: [PATCH 06/10] Fix: timezone info occasionally removed from cron job execution time (#383) * Expose the bug in tests * Do not remove timezone when incrementing month in _get_next_dt --- arq/cron.py | 4 +-- tests/test_cron.py | 88 ++++++++++++++++++++++++++++++++++++---------- 2 files changed, 71 insertions(+), 21 deletions(-) diff --git a/arq/cron.py b/arq/cron.py index f62ea0bd..53f053f7 100644 --- a/arq/cron.py +++ b/arq/cron.py @@ -67,9 +67,9 @@ def _get_next_dt(dt_: datetime, options: Options) -> Optional[datetime]: # noqa micro = max(dt_.microsecond - options.microsecond, 0) if field == 'month': if dt_.month == 12: - return datetime(dt_.year + 1, 1, 1) + return datetime(dt_.year + 1, 1, 1, tzinfo=dt_.tzinfo) else: - return datetime(dt_.year, dt_.month + 1, 1) + return datetime(dt_.year, dt_.month + 1, 1, tzinfo=dt_.tzinfo) elif field in ('day', 'weekday'): return ( dt_ diff --git a/tests/test_cron.py b/tests/test_cron.py index 5300041d..16b5b507 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -1,7 +1,7 @@ import asyncio import logging import re -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from random import random import pytest @@ -12,37 +12,87 @@ from arq.constants import in_progress_key_prefix from arq.cron import cron, next_cron +tz = timezone(offset=timedelta(hours=3)) + @pytest.mark.parametrize( 'previous,expected,kwargs', [ - (datetime(2016, 6, 1, 12, 10, 10), datetime(2016, 6, 1, 12, 10, 20, microsecond=123_456), dict(second=20)), - (datetime(2016, 6, 1, 12, 10, 10), datetime(2016, 6, 1, 12, 11, 0, microsecond=123_456), dict(minute=11)), - (datetime(2016, 6, 1, 12, 10, 10), datetime(2016, 6, 1, 12, 10, 20), dict(second=20, microsecond=0)), - (datetime(2016, 6, 1, 12, 10, 10), datetime(2016, 6, 1, 12, 11, 0), dict(minute=11, microsecond=0)), ( - datetime(2016, 6, 1, 12, 10, 11), - datetime(2017, 6, 1, 12, 10, 10, microsecond=123_456), + datetime(2016, 6, 1, 12, 10, 10, tzinfo=tz), + datetime(2016, 6, 1, 12, 10, 20, microsecond=123_456, tzinfo=tz), + dict(second=20), + ), + ( + datetime(2016, 6, 1, 12, 10, 10, tzinfo=tz), + datetime(2016, 6, 1, 12, 11, 0, microsecond=123_456, tzinfo=tz), + dict(minute=11), + ), + ( + datetime(2016, 6, 1, 12, 10, 10, tzinfo=tz), + datetime(2016, 6, 1, 12, 10, 20, tzinfo=tz), + dict(second=20, microsecond=0), + ), + ( + datetime(2016, 6, 1, 12, 10, 10, tzinfo=tz), + datetime(2016, 6, 1, 12, 11, 0, tzinfo=tz), + dict(minute=11, microsecond=0), + ), + ( + datetime(2016, 6, 1, 12, 10, 11, tzinfo=tz), + datetime(2017, 6, 1, 12, 10, 10, microsecond=123_456, tzinfo=tz), 
dict(month=6, day=1, hour=12, minute=10, second=10), ), ( - datetime(2016, 6, 1, 12, 10, 10, microsecond=1), - datetime(2016, 7, 1, 12, 10, 10), + datetime(2016, 6, 1, 12, 10, 10, microsecond=1, tzinfo=tz), + datetime(2016, 7, 1, 12, 10, 10, tzinfo=tz), dict(day=1, hour=12, minute=10, second=10, microsecond=0), ), - (datetime(2032, 1, 31, 0, 0, 0), datetime(2032, 2, 28, 0, 0, 0, microsecond=123_456), dict(day=28)), - (datetime(2032, 1, 1, 0, 5), datetime(2032, 1, 1, 4, 0, microsecond=123_456), dict(hour=4)), - (datetime(2032, 1, 1, 0, 0), datetime(2032, 1, 1, 4, 2, microsecond=123_456), dict(hour=4, minute={2, 4, 6})), - (datetime(2032, 1, 1, 0, 5), datetime(2032, 1, 1, 4, 2, microsecond=123_456), dict(hour=4, minute={2, 4, 6})), - (datetime(2032, 2, 5, 0, 0, 0), datetime(2032, 3, 31, 0, 0, 0, microsecond=123_456), dict(day=31)), ( - datetime(2001, 1, 1, 0, 0, 0), # Monday - datetime(2001, 1, 7, 0, 0, 0, microsecond=123_456), + datetime(2032, 1, 31, 0, 0, 0, tzinfo=tz), + datetime(2032, 2, 28, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(day=28), + ), + ( + datetime(2032, 1, 1, 0, 5, tzinfo=tz), + datetime(2032, 1, 1, 4, 0, microsecond=123_456, tzinfo=tz), + dict(hour=4), + ), + ( + datetime(2032, 1, 1, 0, 0, tzinfo=tz), + datetime(2032, 1, 1, 4, 2, microsecond=123_456, tzinfo=tz), + dict(hour=4, minute={2, 4, 6}), + ), + ( + datetime(2032, 1, 1, 0, 5, tzinfo=tz), + datetime(2032, 1, 1, 4, 2, microsecond=123_456, tzinfo=tz), + dict(hour=4, minute={2, 4, 6}), + ), + ( + datetime(2032, 2, 5, 0, 0, 0, tzinfo=tz), + datetime(2032, 3, 31, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(day=31), + ), + ( + datetime(2001, 1, 1, 0, 0, 0, tzinfo=tz), # Monday + datetime(2001, 1, 7, 0, 0, 0, microsecond=123_456, tzinfo=tz), dict(weekday='Sun'), # Sunday ), - (datetime(2001, 1, 1, 0, 0, 0), datetime(2001, 1, 7, 0, 0, 0, microsecond=123_456), dict(weekday=6)), # Sunday - (datetime(2001, 1, 1, 0, 0, 0), datetime(2001, 11, 7, 0, 0, 0, microsecond=123_456), dict(month=11, weekday=2)), - (datetime(2001, 1, 1, 0, 0, 0), datetime(2001, 1, 3, 0, 0, 0, microsecond=123_456), dict(weekday='wed')), + ( + datetime(2001, 1, 1, 0, 0, 0, tzinfo=tz), + datetime(2001, 1, 7, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(weekday=6), + ), # Sunday + ( + datetime(2001, 1, 1, 0, 0, 0, tzinfo=tz), + datetime(2001, 11, 7, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(month=11, weekday=2), + ), + ( + datetime(2001, 1, 1, 0, 0, 0, tzinfo=tz), + datetime(2001, 1, 3, 0, 0, 0, microsecond=123_456, tzinfo=tz), + dict(weekday='wed'), + ), ], ) def test_next_cron(previous, expected, kwargs): From 8321dc197651a582a0a327a163c290970a485e2a Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Mon, 1 Apr 2024 13:23:42 +0100 Subject: [PATCH 07/10] 3.12 support, drop 3.7, uprev dependencies (#439) * support python 3.12, drop 3.7 * fix linting and type checking * add optional deps to requirements/pyproject.txt * make test_max_bursts_multiple less flakey :fingers_crossed: * mypy strict, ruff improvement * fix github release job --- .github/workflows/ci.yml | 74 +++++++++++++++++++++---------------- .pre-commit-config.yaml | 6 +-- Makefile | 28 ++++++++++---- arq/connections.py | 19 +++++----- arq/jobs.py | 16 ++++---- arq/typing.py | 4 +- arq/worker.py | 48 +++++++++++------------- pyproject.toml | 45 ++++++----------------- requirements/docs.txt | 44 ++++++++++------------ requirements/linting.in | 9 ++--- requirements/linting.txt | 52 ++++++++++---------------- requirements/pyproject.txt | 24 ++++++------ requirements/testing.in | 
19 +++++----- requirements/testing.txt | 75 +++++++++++++++++--------------------- tests/check_tag.py | 15 -------- tests/conftest.py | 8 ---- tests/test_jobs.py | 11 ------ tests/test_utils.py | 6 +-- tests/test_worker.py | 14 +++++-- 19 files changed, 226 insertions(+), 291 deletions(-) delete mode 100755 tests/check_tag.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3d79f197..d6e09f61 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,7 +4,6 @@ on: push: branches: - main - - v0.23-branch tags: - '**' pull_request: {} @@ -14,12 +13,12 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: set up python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: - python-version: '3.10' + python-version: '3.11' - run: pip install -r requirements/linting.txt -r requirements/pyproject.txt pre-commit @@ -29,12 +28,12 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: set up python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: - python-version: '3.10' + python-version: '3.11' - run: pip install -r requirements/docs.txt -r requirements/pyproject.txt - run: pip install . @@ -42,7 +41,7 @@ jobs: - run: make docs - name: Store docs site - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: docs path: docs/_build/ @@ -53,13 +52,13 @@ jobs: fail-fast: false matrix: os: [ubuntu] - python: ['3.7', '3.8', '3.9', '3.10', '3.11'] + python: ['3.8', '3.9', '3.10', '3.11', '3.12'] redis: ['5'] include: - - python: '3.10' + - python: '3.11' redis: '6' os: 'ubuntu' - - python: '3.10' + - python: '3.11' redis: '7' os: 'ubuntu' @@ -67,7 +66,7 @@ jobs: PYTHON: ${{ matrix.python }} OS: ${{ matrix.os }} - runs-on: ${{ format('{0}-latest', matrix.os) }} + runs-on: ${{ matrix.os }}-latest services: redis: @@ -77,10 +76,10 @@ jobs: options: --entrypoint redis-server steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: set up python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python }} @@ -90,50 +89,61 @@ jobs: - run: coverage xml - - uses: codecov/codecov-action@v2 + - uses: codecov/codecov-action@v4 with: file: ./coverage.xml env_vars: PYTHON,OS - deploy: - name: Deploy + check: + if: always() needs: [lint, docs, test] + runs-on: ubuntu-latest + + steps: + - name: Decide whether the needed jobs succeeded or failed + uses: re-actors/alls-green@release/v1 + id: all-green + with: + jobs: ${{ toJSON(needs) }} + + release: + name: Release + needs: [check] if: "success() && startsWith(github.ref, 'refs/tags/')" runs-on: ubuntu-latest + environment: release + + permissions: + id-token: write steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: get docs - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: name: docs path: docs/_build/ - name: set up python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: - python-version: '3.10' + python-version: '3.11' - name: install - run: pip install -U twine build packaging + run: pip install -U build - name: check version id: check-version - run: python <(curl -Ls https://gist.githubusercontent.com/samuelcolvin/4e1ad439c5489e8d6478cdee3eb952ef/raw/check_version.py) - env: - VERSION_PATH: 'arq/version.py' + uses: samuelcolvin/check-python-version@v3.2 + with: + version_file_path: 'arq/version.py' - name: build run: python -m 
build - - run: twine check dist/* - - - name: upload to pypi - run: twine upload dist/* - env: - TWINE_USERNAME: __token__ - TWINE_PASSWORD: ${{ secrets.pypi_token }} + - name: Upload package to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 - name: publish docs if: '!fromJSON(steps.check-version.outputs.IS_PRERELEASE)' diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index aa774cdf..2d77e128 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,9 +11,9 @@ repos: - repo: local hooks: - - id: lint - name: Lint - entry: make lint + - id: format + name: Format + entry: make format types: [python] language: system pass_filenames: false diff --git a/Makefile b/Makefile index f0c2d044..f5417c05 100644 --- a/Makefile +++ b/Makefile @@ -1,24 +1,36 @@ .DEFAULT_GOAL := all -isort = isort arq tests -black = black arq tests +sources = arq tests .PHONY: install install: - pip install -U pip pre-commit + pip install -U pip pre-commit pip-tools pip install -r requirements/all.txt pip install -e .[watch] pre-commit install +.PHONY: refresh-lockfiles +refresh-lockfiles: + find requirements/ -name '*.txt' ! -name 'all.txt' -type f -delete + make update-lockfiles + +.PHONY: update-lockfiles +update-lockfiles: + @echo "Updating requirements/*.txt files using pip-compile" + pip-compile -q --strip-extras -o requirements/linting.txt requirements/linting.in + pip-compile -q --strip-extras -o requirements/testing.txt requirements/testing.in + pip-compile -q --strip-extras -o requirements/docs.txt requirements/docs.in + pip-compile -q --strip-extras -o requirements/pyproject.txt pyproject.toml --all-extras + pip install --dry-run -r requirements/all.txt + .PHONY: format format: - $(isort) - $(black) + ruff check --fix $(sources) + ruff format $(sources) .PHONY: lint lint: - flake8 --max-complexity 10 --max-line-length 120 --ignore E203,W503 arq/ tests/ - $(isort) --check-only --df - $(black) --check + ruff check $(sources) + ruff format --check $(sources) .PHONY: test test: diff --git a/arq/connections.py b/arq/connections.py index b3a91b48..ec11b8c7 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from datetime import datetime, timedelta from operator import attrgetter -from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union, cast from urllib.parse import parse_qs, urlparse from uuid import uuid4 @@ -163,8 +163,8 @@ async def enqueue_job( job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer) pipe.multi() - pipe.psetex(job_key, expires_ms, job) # type: ignore[no-untyped-call] - pipe.zadd(_queue_name, {job_id: score}) # type: ignore[unused-coroutine] + pipe.psetex(job_key, expires_ms, job) + pipe.zadd(_queue_name, {job_id: score}) try: await pipe.execute() except WatchError: @@ -210,7 +210,7 @@ async def queued_jobs(self, *, queue_name: Optional[str] = None) -> List[JobDef] async def create_pool( - settings_: RedisSettings = None, + settings_: Optional[RedisSettings] = None, *, retry: int = 0, job_serializer: Optional[Serializer] = None, @@ -237,7 +237,8 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis: ssl=settings.ssl, **kwargs, ) - return client.master_for(settings.sentinel_master, redis_class=ArqRedis) + redis = client.master_for(settings.sentinel_master, redis_class=ArqRedis) + return cast(ArqRedis, redis) else: pool_factory = functools.partial( @@ -288,10 
+289,10 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis: async def log_redis_info(redis: 'Redis[bytes]', log_func: Callable[[str], Any]) -> None: async with redis.pipeline(transaction=False) as pipe: - pipe.info(section='Server') # type: ignore[unused-coroutine] - pipe.info(section='Memory') # type: ignore[unused-coroutine] - pipe.info(section='Clients') # type: ignore[unused-coroutine] - pipe.dbsize() # type: ignore[unused-coroutine] + pipe.info(section='Server') + pipe.info(section='Memory') + pipe.info(section='Clients') + pipe.dbsize() info_server, info_memory, info_clients, key_count = await pipe.execute() redis_version = info_server.get('redis_version', '?') diff --git a/arq/jobs.py b/arq/jobs.py index d0c0a5ef..15b7231e 100644 --- a/arq/jobs.py +++ b/arq/jobs.py @@ -83,7 +83,7 @@ def __init__( self._deserializer = _deserializer async def result( - self, timeout: Optional[float] = None, *, poll_delay: float = 0.5, pole_delay: float = None + self, timeout: Optional[float] = None, *, poll_delay: float = 0.5, pole_delay: Optional[float] = None ) -> Any: """ Get the result of the job or, if the job raised an exception, reraise it. @@ -103,8 +103,8 @@ async def result( async for delay in poll(poll_delay): async with self._redis.pipeline(transaction=True) as tr: - tr.get(result_key_prefix + self.job_id) # type: ignore[unused-coroutine] - tr.zscore(self._queue_name, self.job_id) # type: ignore[unused-coroutine] + tr.get(result_key_prefix + self.job_id) + tr.zscore(self._queue_name, self.job_id) v, s = await tr.execute() if v: @@ -154,9 +154,9 @@ async def status(self) -> JobStatus: Status of the job. """ async with self._redis.pipeline(transaction=True) as tr: - tr.exists(result_key_prefix + self.job_id) # type: ignore[unused-coroutine] - tr.exists(in_progress_key_prefix + self.job_id) # type: ignore[unused-coroutine] - tr.zscore(self._queue_name, self.job_id) # type: ignore[unused-coroutine] + tr.exists(result_key_prefix + self.job_id) + tr.exists(in_progress_key_prefix + self.job_id) + tr.zscore(self._queue_name, self.job_id) is_complete, is_in_progress, score = await tr.execute() if is_complete: @@ -180,8 +180,8 @@ async def abort(self, *, timeout: Optional[float] = None, poll_delay: float = 0. 
job_info = await self.info() if job_info and job_info.score and job_info.score > timestamp_ms(): async with self._redis.pipeline(transaction=True) as tr: - tr.zrem(self._queue_name, self.job_id) # type: ignore[unused-coroutine] - tr.zadd(self._queue_name, {self.job_id: 1}) # type: ignore[unused-coroutine] + tr.zrem(self._queue_name, self.job_id) + tr.zadd(self._queue_name, {self.job_id: 1}) await tr.execute() await self._redis.zadd(abort_jobs_ss, {self.job_id: timestamp_ms()}) diff --git a/arq/typing.py b/arq/typing.py index bde59053..945e3370 100644 --- a/arq/typing.py +++ b/arq/typing.py @@ -19,8 +19,8 @@ if TYPE_CHECKING: - from .cron import CronJob # noqa F401 - from .worker import Function # noqa F401 + from .cron import CronJob + from .worker import Function OptionType = Union[None, Set[int], int] WEEKDAYS = 'mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun' diff --git a/arq/worker.py b/arq/worker.py index 2bdab0f0..4c33b677 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -41,7 +41,7 @@ ) if TYPE_CHECKING: - from .typing import SecondsTimedelta, StartupShutdown, WorkerCoroutine, WorkerSettingsType # noqa F401 + from .typing import SecondsTimedelta, StartupShutdown, WorkerCoroutine, WorkerSettingsType logger = logging.getLogger('arq.worker') no_result = object() @@ -189,8 +189,8 @@ def __init__( *, queue_name: Optional[str] = default_queue_name, cron_jobs: Optional[Sequence[CronJob]] = None, - redis_settings: RedisSettings = None, - redis_pool: ArqRedis = None, + redis_settings: Optional[RedisSettings] = None, + redis_pool: Optional[ArqRedis] = None, burst: bool = False, on_startup: Optional['StartupShutdown'] = None, on_shutdown: Optional['StartupShutdown'] = None, @@ -357,7 +357,7 @@ async def main(self) -> None: if self.on_startup: await self.on_startup(self.ctx) - async for _ in poll(self.poll_delay_s): # noqa F841 + async for _ in poll(self.poll_delay_s): await self._poll_iteration() if self.burst: @@ -405,10 +405,8 @@ async def _cancel_aborted_jobs(self) -> None: Go through job_ids in the abort_jobs_ss sorted set and cancel those tasks. 
""" async with self.pool.pipeline(transaction=True) as pipe: - pipe.zrange(abort_jobs_ss, start=0, end=-1) # type: ignore[unused-coroutine] - pipe.zremrangebyscore( # type: ignore[unused-coroutine] - abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf') - ) + pipe.zrange(abort_jobs_ss, start=0, end=-1) + pipe.zremrangebyscore(abort_jobs_ss, min=timestamp_ms() + abort_job_max_age, max=float('inf')) abort_job_ids, _ = await pipe.execute() aborted: Set[str] = set() @@ -457,9 +455,7 @@ async def start_jobs(self, job_ids: List[bytes]) -> None: continue pipe.multi() - pipe.psetex( # type: ignore[no-untyped-call] - in_progress_key, int(self.in_progress_timeout_s * 1000), b'1' - ) + pipe.psetex(in_progress_key, int(self.in_progress_timeout_s * 1000), b'1') try: await pipe.execute() except (ResponseError, WatchError): @@ -475,11 +471,11 @@ async def start_jobs(self, job_ids: List[bytes]) -> None: async def run_job(self, job_id: str, score: int) -> None: # noqa: C901 start_ms = timestamp_ms() async with self.pool.pipeline(transaction=True) as pipe: - pipe.get(job_key_prefix + job_id) # type: ignore[unused-coroutine] - pipe.incr(retry_key_prefix + job_id) # type: ignore[unused-coroutine] - pipe.expire(retry_key_prefix + job_id, 88400) # type: ignore[unused-coroutine] + pipe.get(job_key_prefix + job_id) + pipe.incr(retry_key_prefix + job_id) + pipe.expire(retry_key_prefix + job_id, 88400) if self.allow_abort_jobs: - pipe.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine] + pipe.zrem(abort_jobs_ss, job_id) v, job_try, _, abort_job = await pipe.execute() else: v, job_try, _ = await pipe.execute() @@ -692,35 +688,35 @@ async def finish_job( if keep_in_progress is None: delete_keys += [in_progress_key] else: - tr.pexpire(in_progress_key, to_ms(keep_in_progress)) # type: ignore[unused-coroutine] + tr.pexpire(in_progress_key, to_ms(keep_in_progress)) if finish: if result_data: expire = None if keep_result_forever else result_timeout_s - tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) # type: ignore[unused-coroutine] + tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) delete_keys += [retry_key_prefix + job_id, job_key_prefix + job_id] - tr.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine] - tr.zrem(self.queue_name, job_id) # type: ignore[unused-coroutine] + tr.zrem(abort_jobs_ss, job_id) + tr.zrem(self.queue_name, job_id) elif incr_score: - tr.zincrby(self.queue_name, incr_score, job_id) # type: ignore[unused-coroutine] + tr.zincrby(self.queue_name, incr_score, job_id) if delete_keys: - tr.delete(*delete_keys) # type: ignore[unused-coroutine] + tr.delete(*delete_keys) await tr.execute() async def finish_failed_job(self, job_id: str, result_data: Optional[bytes]) -> None: async with self.pool.pipeline(transaction=True) as tr: - tr.delete( # type: ignore[unused-coroutine] + tr.delete( retry_key_prefix + job_id, in_progress_key_prefix + job_id, job_key_prefix + job_id, ) - tr.zrem(abort_jobs_ss, job_id) # type: ignore[unused-coroutine] - tr.zrem(self.queue_name, job_id) # type: ignore[unused-coroutine] + tr.zrem(abort_jobs_ss, job_id) + tr.zrem(self.queue_name, job_id) # result_data would only be None if serializing the result fails keep_result = self.keep_result_forever or self.keep_result_s > 0 if result_data is not None and keep_result: # pragma: no branch expire = 0 if self.keep_result_forever else self.keep_result_s - tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) # type: ignore[unused-coroutine] + 
tr.set(result_key_prefix + job_id, result_data, px=to_ms(expire)) await tr.execute() async def heart_beat(self) -> None: @@ -883,7 +879,7 @@ def get_kwargs(settings_cls: 'WorkerSettingsType') -> Dict[str, NameError]: def create_worker(settings_cls: 'WorkerSettingsType', **kwargs: Any) -> Worker: - return Worker(**{**get_kwargs(settings_cls), **kwargs}) # type: ignore + return Worker(**{**get_kwargs(settings_cls), **kwargs}) def run_worker(settings_cls: 'WorkerSettingsType', **kwargs: Any) -> Worker: diff --git a/pyproject.toml b/pyproject.toml index 7d88ada4..d05aedd2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,11 +24,11 @@ classifiers = [ 'Programming Language :: Python', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3 :: Only', - 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', 'Topic :: Internet', 'Topic :: Software Development :: Libraries :: Python Modules', 'Topic :: System :: Clustering', @@ -38,7 +38,7 @@ classifiers = [ ] requires-python = '>=3.7' dependencies = [ - 'redis[hiredis]>=4.2.0', + 'redis[hiredis]>=4.2.0,<5', 'click>=8.0', 'typing-extensions>=4.1.0', ] @@ -76,39 +76,16 @@ exclude_lines = [ '@overload', ] -[tool.black] -color = true +[tool.ruff] line-length = 120 -target-version = ['py39'] -skip-string-normalization = true -[tool.isort] -line_length = 120 -known_third_party = 'foxglove' -multi_line_output = 3 -include_trailing_comma = true -force_grid_wrap = 0 -combine_as_imports = true -color_output = true +[tool.ruff.lint] +extend-select = ['Q', 'RUF100', 'C90', 'UP', 'I'] +flake8-quotes = {inline-quotes = 'single', multiline-quotes = 'double'} +mccabe = { max-complexity = 13 } -[tool.mypy] -show_error_codes = true -follow_imports = 'silent' -strict_optional = true -warn_redundant_casts = true -warn_unused_ignores = true -disallow_any_generics = true -check_untyped_defs = true -no_implicit_reexport = true -warn_unused_configs = true -disallow_subclassing_any = true -disallow_incomplete_defs = true -disallow_untyped_decorators = true -disallow_untyped_calls = true - -# for strict mypy: (this is the tricky one :-)) -disallow_untyped_defs = true +[tool.ruff.format] +quote-style = 'single' -# remaining arguments from `mypy --strict` which cause errors -#no_implicit_optional = true -#warn_return_any = true +[tool.mypy] +strict = true diff --git a/requirements/docs.txt b/requirements/docs.txt index 7473b36b..2ce9f8c2 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -1,52 +1,48 @@ # -# This file is autogenerated by pip-compile with python 3.11 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.12 +# by the following command: # -# pip-compile --output-file=requirements/docs.txt requirements/docs.in +# pip-compile --output-file=requirements/docs.txt --strip-extras requirements/docs.in # -alabaster==0.7.12 +alabaster==0.7.16 # via sphinx -babel==2.10.3 +babel==2.14.0 # via sphinx -certifi==2022.12.7 +certifi==2024.2.2 # via requests -charset-normalizer==2.1.1 +charset-normalizer==3.3.2 # via requests docutils==0.19 # via sphinx -idna==3.3 +idna==3.6 # via requests imagesize==1.4.1 # via sphinx -jinja2==3.0.3 +jinja2==3.1.3 # via sphinx -markupsafe==2.1.1 +markupsafe==2.1.5 # via jinja2 -packaging==21.3 +packaging==24.0 # via sphinx -pygments==2.13.0 +pygments==2.17.2 # via sphinx -pyparsing==3.0.9 
- # via packaging -pytz==2022.2.1 - # via babel -requests==2.28.1 +requests==2.31.0 # via sphinx snowballstemmer==2.2.0 # via sphinx -sphinx==5.1.1 +sphinx==5.3.0 # via -r requirements/docs.in -sphinxcontrib-applehelp==1.0.2 +sphinxcontrib-applehelp==1.0.8 # via sphinx -sphinxcontrib-devhelp==1.0.2 +sphinxcontrib-devhelp==1.0.6 # via sphinx -sphinxcontrib-htmlhelp==2.0.0 +sphinxcontrib-htmlhelp==2.0.5 # via sphinx sphinxcontrib-jsmath==1.0.1 # via sphinx -sphinxcontrib-qthelp==1.0.3 +sphinxcontrib-qthelp==1.0.7 # via sphinx -sphinxcontrib-serializinghtml==1.1.5 +sphinxcontrib-serializinghtml==1.1.10 # via sphinx -urllib3==1.26.12 +urllib3==2.2.1 # via requests diff --git a/requirements/linting.in b/requirements/linting.in index ae67c86a..72cdaa0d 100644 --- a/requirements/linting.in +++ b/requirements/linting.in @@ -1,7 +1,4 @@ -black>=22,<23 -flake8>=5,<6 -flake8-quotes>=3,<4 -isort[colors]>=5,<6 -mypy<1 +ruff +mypy types-pytz -types_redis>=4.2,<4.3 +types_redis diff --git a/requirements/linting.txt b/requirements/linting.txt index faf0a6ba..873ee0c1 100644 --- a/requirements/linting.txt +++ b/requirements/linting.txt @@ -1,42 +1,28 @@ # -# This file is autogenerated by pip-compile with python 3.11 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.12 +# by the following command: # -# pip-compile --output-file=requirements/linting.txt requirements/linting.in +# pip-compile --output-file=requirements/linting.txt --strip-extras requirements/linting.in # -black==22.6.0 - # via -r requirements/linting.in -click==8.1.3 - # via black -colorama==0.4.5 - # via isort -flake8==5.0.4 +cffi==1.16.0 + # via cryptography +cryptography==42.0.5 # via - # -r requirements/linting.in - # flake8-quotes -flake8-quotes==3.3.1 - # via -r requirements/linting.in -isort[colors]==5.10.1 + # types-pyopenssl + # types-redis +mypy==1.9.0 # via -r requirements/linting.in -mccabe==0.7.0 - # via flake8 -mypy==0.971 +mypy-extensions==1.0.0 + # via mypy +pycparser==2.22 + # via cffi +ruff==0.3.4 # via -r requirements/linting.in -mypy-extensions==0.4.3 - # via - # black - # mypy -pathspec==0.9.0 - # via black -platformdirs==2.5.2 - # via black -pycodestyle==2.9.1 - # via flake8 -pyflakes==2.5.0 - # via flake8 -types-pytz==2022.2.1.0 +types-pyopenssl==24.0.0.20240311 + # via types-redis +types-pytz==2024.1.0.20240203 # via -r requirements/linting.in -types-redis==4.2.8 +types-redis==4.6.0.20240311 # via -r requirements/linting.in -typing-extensions==4.3.0 +typing-extensions==4.10.0 # via mypy diff --git a/requirements/pyproject.txt b/requirements/pyproject.txt index c2c38af6..17a8bb5a 100644 --- a/requirements/pyproject.txt +++ b/requirements/pyproject.txt @@ -1,24 +1,22 @@ # -# This file is autogenerated by pip-compile with python 3.11 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.12 +# by the following command: # -# pip-compile --extra=watch --output-file=requirements/pyproject.txt pyproject.toml +# pip-compile --all-extras --output-file=requirements/pyproject.txt --strip-extras pyproject.toml # -anyio==3.6.1 +anyio==4.3.0 # via watchfiles -async-timeout==4.0.2 - # via redis -click==8.1.3 +click==8.1.7 # via arq (pyproject.toml) -hiredis==2.1.0 +hiredis==2.3.2 # via redis -idna==3.3 +idna==3.6 # via anyio -redis[hiredis]==4.4.0 +redis==4.6.0 # via arq (pyproject.toml) -sniffio==1.2.0 +sniffio==1.3.1 # via anyio -typing-extensions==4.3.0 +typing-extensions==4.10.0 # via arq (pyproject.toml) -watchfiles==0.16.1 +watchfiles==0.21.0 # via arq (pyproject.toml) diff 
--git a/requirements/testing.in b/requirements/testing.in index 5a32ec5d..eb019d9d 100644 --- a/requirements/testing.in +++ b/requirements/testing.in @@ -1,11 +1,10 @@ -coverage[toml]>=6,<7 -dirty-equals>=0.4,<1 -msgpack>=1,<2 -pydantic>=1.9.2,<2 -pytest>=7,<8 -pytest-asyncio>=0.20.3 -pytest-mock>=3,<4 -pytest-sugar>=0.9,<1 -pytest-timeout>=2,<3 +coverage[toml] +dirty-equals +msgpack +pydantic +pytest +pytest-asyncio +pytest-mock +pytest-pretty +pytest-timeout pytz -redislite diff --git a/requirements/testing.txt b/requirements/testing.txt index a639ab8a..77e186a6 100644 --- a/requirements/testing.txt +++ b/requirements/testing.txt @@ -1,64 +1,55 @@ # -# This file is autogenerated by pip-compile with python 3.11 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.12 +# by the following command: # -# pip-compile --output-file=requirements/testing.txt requirements/testing.in +# pip-compile --output-file=requirements/testing.txt --strip-extras requirements/testing.in # -async-timeout==4.0.2 - # via redis -attrs==22.1.0 - # via pytest -coverage[toml]==6.4.4 +annotated-types==0.6.0 + # via pydantic +coverage==7.4.4 # via -r requirements/testing.in -dirty-equals==0.4 +dirty-equals==0.7.1.post0 # via -r requirements/testing.in -iniconfig==1.1.1 +iniconfig==2.0.0 # via pytest -msgpack==1.0.4 +markdown-it-py==3.0.0 + # via rich +mdurl==0.1.2 + # via markdown-it-py +msgpack==1.0.8 # via -r requirements/testing.in -packaging==21.3 - # via - # pytest - # pytest-sugar -pluggy==1.0.0 +packaging==24.0 # via pytest -psutil==5.9.1 - # via redislite -py==1.11.0 +pluggy==1.4.0 # via pytest -pydantic==1.9.2 +pydantic==2.6.4 # via -r requirements/testing.in -pyparsing==3.0.9 - # via packaging -pytest==7.1.2 +pydantic-core==2.16.3 + # via pydantic +pygments==2.17.2 + # via rich +pytest==8.1.1 # via # -r requirements/testing.in # pytest-asyncio # pytest-mock - # pytest-sugar + # pytest-pretty # pytest-timeout -pytest-asyncio==0.20.3 +pytest-asyncio==0.23.6 # via -r requirements/testing.in -pytest-mock==3.8.2 +pytest-mock==3.14.0 # via -r requirements/testing.in -pytest-sugar==0.9.5 +pytest-pretty==1.2.0 # via -r requirements/testing.in -pytest-timeout==2.1.0 +pytest-timeout==2.3.1 # via -r requirements/testing.in -pytz==2022.2.1 +pytz==2024.1 # via # -r requirements/testing.in # dirty-equals -redis==4.4.0 - # via redislite -redislite==6.2.805324 - # via -r requirements/testing.in -termcolor==1.1.0 - # via pytest-sugar -tomli==2.0.1 - # via pytest -typing-extensions==4.3.0 - # via pydantic - -# The following packages are considered to be unsafe in a requirements file: -# setuptools +rich==13.7.1 + # via pytest-pretty +typing-extensions==4.10.0 + # via + # pydantic + # pydantic-core diff --git a/tests/check_tag.py b/tests/check_tag.py deleted file mode 100755 index 210b9bbc..00000000 --- a/tests/check_tag.py +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env python3 -import os -import sys - -from arq.version import VERSION - -git_tag = os.getenv('TRAVIS_TAG') -if git_tag: - if git_tag.lower().lstrip('v') != str(VERSION).lower(): - print('โœ– "TRAVIS_TAG" environment variable does not match arq.version: "%s" vs. "%s"' % (git_tag, VERSION)) - sys.exit(1) - else: - print('โœ“ "TRAVIS_TAG" environment variable matches arq.version: "%s" vs. 
"%s"' % (git_tag, VERSION)) -else: - print('โœ“ "TRAVIS_TAG" not defined') diff --git a/tests/conftest.py b/tests/conftest.py index 755aeec6..3b050be5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,7 +5,6 @@ import msgpack import pytest -from redislite import Redis from arq.connections import ArqRedis, create_pool from arq.worker import Worker @@ -31,13 +30,6 @@ async def arq_redis(loop): await redis_.close(close_connection_pool=True) -@pytest.fixture -async def unix_socket_path(loop, tmp_path): - rdb = Redis(str(tmp_path / 'redis_test.db')) - yield rdb.socket_file - rdb.close() - - @pytest.fixture async def arq_redis_msgpack(loop): redis_ = ArqRedis( diff --git a/tests/test_jobs.py b/tests/test_jobs.py index f8f6c8c4..c30113d7 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -110,17 +110,6 @@ async def foobar(ctx, *args, **kwargs): ] -async def test_enqueue_job_with_unix_socket(worker, unix_socket_path): - """Test initializing arq_redis using a unix socket connection, and the worker using it.""" - settings = RedisSettings(unix_socket_path=unix_socket_path) - arq_redis = await create_pool(settings, default_queue_name='socket_queue') - await test_enqueue_job( - arq_redis, - lambda functions, **_: worker(functions=functions, arq_redis=arq_redis, queue_name=None), - queue_name=None, - ) - - async def test_enqueue_job_alt_queue(arq_redis: ArqRedis, worker): await test_enqueue_job(arq_redis, worker, queue_name='custom_queue') diff --git a/tests/test_utils.py b/tests/test_utils.py index e499d85f..997c137d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -4,7 +4,7 @@ from datetime import timedelta import pytest -from pydantic import BaseModel, validator +from pydantic import BaseModel, field_validator from redis.asyncio import ConnectionError, ResponseError import arq.typing @@ -112,7 +112,7 @@ def test_redis_settings_validation(): class Settings(BaseModel): redis_settings: RedisSettings - @validator('redis_settings', always=True, pre=True) + @field_validator('redis_settings', mode='before') def parse_redis_settings(cls, v): if isinstance(v, str): return RedisSettings.from_dsn(v) @@ -129,7 +129,7 @@ def parse_redis_settings(cls, v): assert s2.redis_settings.host == 'testing.com' assert s2.redis_settings.port == 6379 - with pytest.raises(ValueError, match='1 validation error for Settings\nredis_settings -> ssl'): + with pytest.raises(ValueError, match='1 validation error for Settings\nredis_settings.ssl'): Settings(redis_settings={'ssl': 123}) s3 = Settings(redis_settings={'ssl': True}) diff --git a/tests/test_worker.py b/tests/test_worker.py index 23dd91d2..192a0d87 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,7 +4,7 @@ import re import signal import sys -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from unittest.mock import MagicMock import msgpack @@ -760,8 +760,12 @@ async def foo(ctx, v): assert worker.jobs_complete == 1 assert worker.jobs_retried == 0 assert worker.jobs_failed == 0 - assert 'foo(1)' in caplog.text - assert 'foo(2)' not in caplog.text + # either foo(1) or foo(2) can be run, but not both + if 'foo(1)' in caplog.text: + assert 'foo(2)' not in caplog.text + else: + assert 'foo(2)' in caplog.text + assert 'foo(1)' not in caplog.text async def test_max_bursts_dont_get(arq_redis: ArqRedis, worker): @@ -879,7 +883,9 @@ async def longfunc(ctx): caplog.set_level(logging.INFO) - job = await arq_redis.enqueue_job('longfunc', _job_id='testing', _defer_until=datetime.utcnow() + 
timedelta(days=1)) + job = await arq_redis.enqueue_job( + 'longfunc', _job_id='testing', _defer_until=datetime.now(timezone.utc) + timedelta(days=1) + ) worker: Worker = worker(functions=[func(longfunc, name='longfunc')], allow_abort_jobs=True, poll_delay=0.1) assert worker.jobs_complete == 0 From 8fe4fc555e7e9d32054695ea6b54671051608763 Mon Sep 17 00:00:00 2001 From: mernmic <92449806+mernmic@users.noreply.github.com> Date: Mon, 1 Apr 2024 07:31:04 -0500 Subject: [PATCH 08/10] Extend RedisSettings to include redis Retry Helper settings (#387) * extend RedisSettings retry settings * fix type and settings test * add redis.Retry type * fix test to allow arbitrary types * add testing for retry settings * update tests * granular patch handling * update comment * stop patch when exists * update retry type to asyncio * chore: test cleanup * fix exception type --------- Co-authored-by: Samuel Colvin --- arq/connections.py | 8 +++++ tests/conftest.py | 40 ++++++++++++++++++++++ tests/test_utils.py | 4 +-- tests/test_worker.py | 79 +++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 128 insertions(+), 3 deletions(-) diff --git a/arq/connections.py b/arq/connections.py index ec11b8c7..8aac55ff 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -9,6 +9,7 @@ from uuid import uuid4 from redis.asyncio import ConnectionPool, Redis +from redis.asyncio.retry import Retry from redis.asyncio.sentinel import Sentinel from redis.exceptions import RedisError, WatchError @@ -47,6 +48,10 @@ class RedisSettings: sentinel: bool = False sentinel_master: str = 'mymaster' + retry_on_timeout: bool = False + retry_on_error: Optional[List[Exception]] = None + retry: Optional[Retry] = None + @classmethod def from_dsn(cls, dsn: str) -> 'RedisSettings': conf = urlparse(dsn) @@ -254,6 +259,9 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis: ssl_ca_certs=settings.ssl_ca_certs, ssl_ca_data=settings.ssl_ca_data, ssl_check_hostname=settings.ssl_check_hostname, + retry=settings.retry, + retry_on_timeout=settings.retry_on_timeout, + retry_on_error=settings.retry_on_error, ) while True: diff --git a/tests/conftest.py b/tests/conftest.py index 3b050be5..b9332eed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,9 @@ import msgpack import pytest +import redis.exceptions +from redis.asyncio.retry import Retry +from redis.backoff import NoBackoff from arq.connections import ArqRedis, create_pool from arq.worker import Worker @@ -44,6 +47,21 @@ async def arq_redis_msgpack(loop): await redis_.close(close_connection_pool=True) +@pytest.fixture +async def arq_redis_retry(loop): + redis_ = ArqRedis( + host='localhost', + port=6379, + encoding='utf-8', + retry=Retry(backoff=NoBackoff(), retries=3), + retry_on_timeout=True, + retry_on_error=[redis.exceptions.ConnectionError], + ) + await redis_.flushall() + yield redis_ + await redis_.close(close_connection_pool=True) + + @pytest.fixture async def worker(arq_redis): worker_: Worker = None @@ -61,6 +79,28 @@ def create(functions=[], burst=True, poll_delay=0, max_jobs=10, arq_redis=arq_re await worker_.close() +@pytest.fixture +async def worker_retry(arq_redis_retry): + worker_retry_: Worker = None + + def create(functions=[], burst=True, poll_delay=0, max_jobs=10, arq_redis=arq_redis_retry, **kwargs): + nonlocal worker_retry_ + worker_retry_ = Worker( + functions=functions, + redis_pool=arq_redis, + burst=burst, + poll_delay=poll_delay, + max_jobs=max_jobs, + **kwargs, + ) + return worker_retry_ + + yield create + + if worker_retry_: + await 
worker_retry_.close() + + @pytest.fixture(name='create_pool') async def fix_create_pool(loop): pools = [] diff --git a/tests/test_utils.py b/tests/test_utils.py index 997c137d..c1156db4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -21,7 +21,7 @@ def test_settings_changed(): "RedisSettings(host='localhost', port=123, unix_socket_path=None, database=0, username=None, password=None, " "ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs='required', ssl_ca_certs=None, " 'ssl_ca_data=None, ssl_check_hostname=False, conn_timeout=1, conn_retries=5, conn_retry_delay=1, ' - "sentinel=False, sentinel_master='mymaster')" + "sentinel=False, sentinel_master='mymaster', retry_on_timeout=False, retry_on_error=None, retry=None)" ) == str(settings) @@ -109,7 +109,7 @@ def test_typing(): def test_redis_settings_validation(): - class Settings(BaseModel): + class Settings(BaseModel, arbitrary_types_allowed=True): redis_settings: RedisSettings @field_validator('redis_settings', mode='before') diff --git a/tests/test_worker.py b/tests/test_worker.py index 192a0d87..a25f0f1d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -5,10 +5,11 @@ import signal import sys from datetime import datetime, timedelta, timezone -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import msgpack import pytest +import redis.exceptions from arq.connections import ArqRedis, RedisSettings from arq.constants import abort_jobs_ss, default_queue_name, expires_extra_ms, health_check_key_suffix, job_key_prefix @@ -1024,3 +1025,79 @@ async def test_worker_timezone_defaults_to_system_timezone(worker): worker = worker(functions=[func(foobar)]) assert worker.timezone is not None assert worker.timezone == datetime.now().astimezone().tzinfo + + +@pytest.mark.parametrize( + 'exception_thrown', + [ + redis.exceptions.ConnectionError('Error while reading from host'), + redis.exceptions.TimeoutError('Timeout reading from host'), + ], +) +async def test_worker_retry(mocker, worker_retry, exception_thrown): + # Testing redis exceptions, with retry settings specified + worker = worker_retry(functions=[func(foobar)]) + + # patch db read_response to mimic connection exceptions + p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) + + # baseline + await worker.main() + await worker._poll_iteration() + + # spy method handling call_with_retry failure + spy = mocker.spy(worker.pool, '_disconnect_raise') + + try: + # start patch + p.start() + + # assert exception thrown + with pytest.raises(type(exception_thrown)): + await worker._poll_iteration() + + # assert retry counts and no exception thrown during '_disconnect_raise' + assert spy.call_count == 4 # retries setting + 1 + assert spy.spy_exception is None + + finally: + # stop patch to allow worker cleanup + p.stop() + + +@pytest.mark.parametrize( + 'exception_thrown', + [ + redis.exceptions.ConnectionError('Error while reading from host'), + redis.exceptions.TimeoutError('Timeout reading from host'), + ], +) +async def test_worker_crash(mocker, worker, exception_thrown): + # Testing redis exceptions, no retry settings specified + worker = worker(functions=[func(foobar)]) + + # patch db read_response to mimic connection exceptions + p = patch.object(worker.pool.connection_pool.connection_class, 'read_response', side_effect=exception_thrown) + + # baseline + await worker.main() + await worker._poll_iteration() + + # spy method handling call_with_retry failure + spy = 
mocker.spy(worker.pool, '_disconnect_raise') + + try: + # start patch + p.start() + + # assert exception thrown + with pytest.raises(type(exception_thrown)): + await worker._poll_iteration() + + # assert no retry counts and exception thrown during '_disconnect_raise' + assert spy.call_count == 1 + assert spy.spy_exception == exception_thrown + + finally: + # stop patch to allow worker cleanup + p.stop() From d4a37f3e906bfa6193587a66687b5e7c3f5f6875 Mon Sep 17 00:00:00 2001 From: Victor Golovanenko <43933400+drygdryg@users.noreply.github.com> Date: Mon, 1 Apr 2024 17:22:36 +0300 Subject: [PATCH 09/10] =?UTF-8?q?Fix=20connections.py:=20allow=20to=20conn?= =?UTF-8?q?ect=20to=20Redis=20using=20a=20Unix=20socket=20URL=E2=80=A6=20(?= =?UTF-8?q?#392)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix connections.py: allow to connect to Redis using a Unix socket URL without specifying the database number * add tests --------- Co-authored-by: Samuel Colvin --- arq/connections.py | 4 +++- tests/test_utils.py | 56 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/arq/connections.py b/arq/connections.py index 8aac55ff..c79ec534 100644 --- a/arq/connections.py +++ b/arq/connections.py @@ -61,8 +61,10 @@ def from_dsn(cls, dsn: str) -> 'RedisSettings': if query_db: # e.g. redis://localhost:6379?db=1 database = int(query_db[0]) - else: + elif conf.scheme != 'unix': database = int(conf.path.lstrip('/')) if conf.path else 0 + else: + database = 0 return RedisSettings( host=conf.hostname or 'localhost', port=conf.port or 6379, diff --git a/tests/test_utils.py b/tests/test_utils.py index c1156db4..74b90eca 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,6 +1,7 @@ import logging import re import sys +from dataclasses import asdict from datetime import timedelta import pytest @@ -197,3 +198,58 @@ def test_import_string_invalid_short(): def test_import_string_invalid_missing(): with pytest.raises(ImportError, match='Module "math" does not define a "foobar" attribute'): arq.utils.import_string('math.foobar') + + +def test_settings_plain(): + settings = RedisSettings() + assert asdict(settings) == { + 'host': 'localhost', + 'port': 6379, + 'unix_socket_path': None, + 'database': 0, + 'username': None, + 'password': None, + 'ssl': False, + 'ssl_keyfile': None, + 'ssl_certfile': None, + 'ssl_cert_reqs': 'required', + 'ssl_ca_certs': None, + 'ssl_ca_data': None, + 'ssl_check_hostname': False, + 'conn_timeout': 1, + 'conn_retries': 5, + 'conn_retry_delay': 1, + 'sentinel': False, + 'sentinel_master': 'mymaster', + 'retry_on_timeout': False, + 'retry_on_error': None, + 'retry': None, + } + + +def test_settings_from_socket_dsn(): + settings = RedisSettings.from_dsn('unix:///run/redis/redis.sock') + # insert_assert(asdict(settings)) + assert asdict(settings) == { + 'host': 'localhost', + 'port': 6379, + 'unix_socket_path': '/run/redis/redis.sock', + 'database': 0, + 'username': None, + 'password': None, + 'ssl': False, + 'ssl_keyfile': None, + 'ssl_certfile': None, + 'ssl_cert_reqs': 'required', + 'ssl_ca_certs': None, + 'ssl_ca_data': None, + 'ssl_check_hostname': False, + 'conn_timeout': 1, + 'conn_retries': 5, + 'conn_retry_delay': 1, + 'sentinel': False, + 'sentinel_master': 'mymaster', + 'retry_on_timeout': False, + 'retry_on_error': None, + 'retry': None, + } From 3de6d8724e25cce5cf5086a03b3452fcb3258d51 Mon Sep 17 00:00:00 2001 From: vvmruder Date: Mon, 1 Apr 2024 16:23:55 +0200 Subject: [PATCH 
10/10] Update connections.py (#396)

Allows values like `float('inf')` to retry indefinitely. This can be useful in cluster contexts where a Redis service might be temporarily unavailable; rather than crashing, the worker keeps waiting until Redis is available again.
---
 arq/connections.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arq/connections.py b/arq/connections.py
index c79ec534..63c7ab82 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -280,7 +280,7 @@ def pool_factory(*args: Any, **kwargs: Any) -> ArqRedis:
         except (ConnectionError, OSError, RedisError, asyncio.TimeoutError) as e:
             if retry < settings.conn_retries:
                 logger.warning(
-                    'redis connection error %s:%s %s %s, %d retries remaining...',
+                    'redis connection error %s:%s %s %s, %s retries remaining...',
                     settings.host,
                     settings.port,
                     e.__class__.__name__,