Fix rq exception handling for the failing state (#1548)
robhudson authored Nov 14, 2024
1 parent 454f126 commit 98d0bc8
Showing 2 changed files with 30 additions and 18 deletions.
19 changes: 14 additions & 5 deletions basket/base/rq.py
@@ -9,6 +9,7 @@
 import requests
 import sentry_sdk
 from rq import Callback, Retry, SimpleWorker
+from rq.job import JobStatus
 from rq.queue import Queue
 from rq.serializers import JSONSerializer
 from silverpop.api import SilverpopResponseException
@@ -174,11 +175,16 @@ def store_task_exception_handler(job, *exc_info):
     if task_name.endswith("snitch"):
         return
 
-    # A job will retry if it's failed but not yet reached the max retries.
-    # We know when a job is going to be retried if the status is `is_scheduled`, otherwise the
-    # status is set to `is_failed`.
+    # NOTE: We are deliberately not using `job.is_scheduled` or `job.is_failed` properties because
+    # they trigger a `get_status` call, which refreshes the status from Redis by default. Since the code
+    # is modifying the `job._status` property directly, Redis does not accurately reflect the job's
+    # status until processing completes. This custom exception handler is triggered in the middle of
+    # that process, so we must access the `_status` property directly to get the current state.
 
-    if job.is_scheduled:
+    # A job will retry if it has failed but has not yet reached the maximum number of retries.
+    # A job is scheduled for retry when its status is `SCHEDULED`; otherwise, its status is set to `FAILED`.
+
+    if job._status == JobStatus.SCHEDULED:
         # Job failed but is scheduled for a retry.
         metrics.incr("base.tasks.retried", tags=[f"task:{task_name}"])
 
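The comments in the hunk above describe rq's retry flow. As a hedged illustration of how a job ends up in the `SCHEDULED` state (this is a generic rq sketch for orientation, not code from this repository; `flaky_task` and the queue setup are hypothetical), a task enqueued with a Retry policy is rescheduled after a failure while retries remain and only reaches `FAILED` once they are exhausted:

# Illustrative sketch of rq's Retry API; everything here is hypothetical.
from redis import Redis
from rq import Queue, Retry
from rq.serializers import JSONSerializer

def flaky_task():
    raise ValueError("boom")

queue = Queue(connection=Redis(), serializer=JSONSerializer)

# With max=2 and a non-zero interval, the first two failures reschedule the
# job (status SCHEDULED, the branch above); the third failure leaves it in
# FAILED, which is handled by the next hunk.
queue.enqueue(flaky_task, retry=Retry(max=2, interval=[10, 30]))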
@@ -194,7 +200,7 @@ def store_task_exception_handler(job, *exc_info):
             scope.set_tag("action", "retried")
             sentry_sdk.capture_exception()
 
-    elif job.is_failed:
+    elif job._status == JobStatus.FAILED:
         # Job failed but no retries left.
         metrics.incr("base.tasks.failed", tags=[f"task:{task_name}"])
 
@@ -221,3 +227,6 @@ def store_task_exception_handler(job, *exc_info):
         with sentry_sdk.isolation_scope() as scope:
             scope.set_tag("action", "failed")
             sentry_sdk.capture_exception()
+
+    # Returning `False` prevents any subsequent exception handlers from running.
+    return False
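Taken together, the handler now follows the shape sketched below. This is a condensed editorial summary for orientation only, not a drop-in replacement: the real function in basket/base/rq.py also stores a FailedTask, reports to Sentry, and defines `task_name` earlier in the function.

import logging

from rq.job import JobStatus

log = logging.getLogger(__name__)

def store_task_exception_handler(job, *exc_info):
    # Simplified stand-in: the real handler derives task_name elsewhere.
    task_name = job.func_name

    # Read the in-memory status: while the worker is still handling the
    # failure, Redis has not yet been updated, so the `job.is_scheduled` /
    # `job.is_failed` properties (which refresh from Redis) would mislead.
    if job._status == JobStatus.SCHEDULED:
        # Failed, but a retry is already scheduled; the real handler
        # increments "base.tasks.retried" here.
        log.info("retried: %s", task_name)
    elif job._status == JobStatus.FAILED:
        # Failed with no retries left; the real handler increments
        # "base.tasks.failed", stores a FailedTask, and reports to Sentry.
        log.error("failed: %s", task_name)

    # rq stops calling any remaining exception handlers when a handler
    # returns False.
    return False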
29 changes: 16 additions & 13 deletions basket/base/tests/test_rq_utils.py
@@ -84,7 +84,10 @@ def test_get_worker(self):
         assert worker._exc_handlers == [store_task_exception_handler]
         assert worker.serializer == JSONSerializer
 
-    @override_settings(RQ_EXCEPTION_HANDLERS=["basket.base.rq.store_task_exception_handler"])
+    @override_settings(
+        RQ_EXCEPTION_HANDLERS=["basket.base.rq.store_task_exception_handler"],
+        RQ_IS_ASYNC=True,
+    )
     def test_on_failure(self, metricsmock):
         """
         Test that the on_failure function creates a FailedTask object and sends
@@ -97,22 +100,22 @@ def test_on_failure(self, metricsmock):
         job = failing_job.delay(*args, **kwargs)
 
         worker = get_worker()
+        assert worker._exc_handlers == [store_task_exception_handler]
         worker.work(burst=True)  # Burst = worker will quit after all jobs consumed.
 
         assert job.is_failed
 
-        # TODO: Determine why the `store_task_exception_handler` is not called.
-        # assert FailedTask.objects.count() == 1
-        # fail = FailedTask.objects.get()
-        # assert fail.name == "news.tasks.failing_job"
-        # assert fail.task_id is not None
-        # assert fail.args == args
-        # assert fail.kwargs == kwargs
-        # assert fail.exc == 'ValueError("An exception to trigger the failure handler.")'
-        # assert "Traceback (most recent call last):" in fail.einfo
-        # assert "ValueError: An exception to trigger the failure handler." in fail.einfo
-        # metricsmock.assert_incr_once("news.tasks.failure_total")
-        # metricsmock.assert_incr_once("news.tasks.failing_job.failure")
+        assert FailedTask.objects.count() == 1
+        fail = FailedTask.objects.get()
+        assert fail.name == "basket.base.tests.tasks.failing_job"
+        assert fail.task_id is not None
+        assert fail.args == args
+        assert fail.kwargs == kwargs
+        assert fail.exc == "ValueError('An exception to trigger the failure handler.')"
+        assert "Traceback (most recent call last):" in fail.einfo
+        assert "ValueError: An exception to trigger the failure handler." in fail.einfo
+        metricsmock.assert_timing_once("task.timings", tags=["task:basket.base.tests.tasks.failing_job", "status:failure"])
+        metricsmock.assert_incr_once("base.tasks.failed", tags=["task:basket.base.tests.tasks.failing_job"])
 
     @override_settings(MAINTENANCE_MODE=True)
     def test_on_failure_maintenance(self, metricsmock):
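The updated test exercises the handler end to end: with RQ_IS_ASYNC=True the job is queued rather than run eagerly, and a burst worker consumes it so the exception handler actually fires. Below is a minimal generic-rq sketch of that pattern, using a plain Redis connection and SimpleWorker instead of basket's get_worker() helper; it assumes a local Redis and is not code from this repository.

from redis import Redis
from rq import Queue, SimpleWorker

# rq resolves jobs by dotted path, so these helpers must be importable by
# the worker process (with SimpleWorker, that is this same process).
def failing_job():
    # Stand-in for basket.base.tests.tasks.failing_job.
    raise ValueError("An exception to trigger the failure handler.")

def record_failure(job, *exc_info):
    # Stand-in for store_task_exception_handler; exc_info is (type, value, tb).
    print(f"{job.func_name} failed: {exc_info[1]!r}")
    return False  # stop the handler chain, as in the commit

redis = Redis()
queue = Queue(connection=redis)
job = queue.enqueue(failing_job)

# Burst mode: the worker drains the queue and exits, so assertions can run
# right after work() returns. Exception handlers run during this call.
worker = SimpleWorker([queue], connection=redis, exception_handlers=[record_failure])
worker.work(burst=True)

assert job.is_failed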
