Make the worker timeout a per-Sentinel setting and revive the right process.

* Cluster stores Conf.TIMEOUT as self.timeout and hands it to the Sentinel
  process; Sentinel accepts it as a new `timeout` keyword (default
  Conf.TIMEOUT) and uses self.timeout wherever it previously read
  Conf.TIMEOUT, so a test can run a Sentinel with a short timeout.
* Sentinel.guard now passes the monitor / pusher process itself to
  reincarnate(). Previously both checks passed self.monitor.pid; passing the
  worker loop variable `p` instead would recycle a healthy worker (or raise
  NameError on an empty pool) and leave the dead monitor/pusher down.
* Adds a never-ending `count_forever` task and a `test_timeout` test that
  expects exactly one reincarnation with timeout=1.

NOTE(review): the `index` lines below are kept from the original patch; the
post-image hash for core.py no longer matches after the guard() correction.
Plain `git apply` / `patch` do not need it.

diff --git a/django_q/core.py b/django_q/core.py
index 9bf6b0d8..5101eeb0 100644
--- a/django_q/core.py
+++ b/django_q/core.py
@@ -66,6 +66,7 @@ def __init__(self, list_key=Conf.Q_LIST):
         self.pid = current_process().pid
         self.host = socket.gethostname()
         self.list_key = list_key
+        self.timeout = Conf.TIMEOUT
         signal.signal(signal.SIGTERM, self.sig_handler)
         signal.signal(signal.SIGINT, self.sig_handler)
 
@@ -79,7 +80,7 @@ def dummy_close():
         # Start Sentinel
         self.stop_event = Event()
         self.start_event = Event()
-        self.sentinel = Process(target=Sentinel, args=(self.stop_event, self.start_event, self.list_key))
+        self.sentinel = Process(target=Sentinel, args=(self.stop_event, self.start_event, self.list_key, self.timeout))
         self.sentinel.start()
         logger.info('Q Cluster-{} starting.'.format(self.pid))
         while not self.start_event.is_set():
@@ -129,7 +130,7 @@ def is_idle(self):
 
 
 class Sentinel(object):
-    def __init__(self, stop_event, start_event, list_key=Conf.Q_LIST, start=True):
+    def __init__(self, stop_event, start_event, list_key=Conf.Q_LIST, timeout=Conf.TIMEOUT, start=True):
         # Make sure we catch signals for the pool
         signal.signal(signal.SIGINT, signal.SIG_IGN)
         signal.signal(signal.SIGTERM, signal.SIG_DFL)
@@ -144,6 +145,7 @@ def __init__(self, stop_event, start_event, list_key=Conf.Q_LIST, start=True):
         self.start_event = start_event
         self.pool_size = Conf.WORKERS
         self.pool = []
+        self.timeout = timeout
         self.task_queue = Queue()
         self.done_queue = Queue()
         self.event_out = Event()
@@ -206,7 +208,7 @@ def reincarnate(self, process):
         else:
             self.pool.remove(process)
             self.spawn_worker()
-            if int(process.timer.value) >= Conf.TIMEOUT:
+            if int(process.timer.value) >= self.timeout:
                 logger.warn("reincarnated worker {} after timeout.".format(process.pid))
             else:
                 logger.error("reincarnated worker {} after sudden death.".format(process.pid))
@@ -231,7 +233,7 @@ def guard(self):
             # Check Workers
             for p in self.pool:
                 # Are you alive?
-                if not p.is_alive() or (Conf.TIMEOUT and int(p.timer.value) >= Conf.TIMEOUT):
+                if not p.is_alive() or (self.timeout and int(p.timer.value) >= self.timeout):
                     self.reincarnate(p)
                     continue
                 # Increment timer if work is being done
@@ -239,10 +241,10 @@ def guard(self):
                     p.timer.value += 1
             # Check Monitor
             if not self.monitor.is_alive():
-                self.reincarnate(self.monitor.pid)
+                self.reincarnate(self.monitor)
             # Check Pusher
             if not self.pusher.is_alive():
-                self.reincarnate(self.monitor.pid)
+                self.reincarnate(self.pusher)
             # Call scheduler once a minute (or so)
             counter += 1
             if counter > 60:
diff --git a/django_q/tests/tasks.py b/django_q/tests/tasks.py
index a03f317a..1263a817 100644
--- a/django_q/tests/tasks.py
+++ b/django_q/tests/tasks.py
@@ -1,4 +1,7 @@
 # simple countdown, returns nothing
+from time import sleep
+
+
 def countdown(n):
     while n > 0:
         n -= 1
@@ -23,6 +26,11 @@ def word_multiply(x, word=''):
     return len(word) * x
 
 
+def count_forever():
+    while True:
+        sleep(0.5)
+
+
 def get_task_name(task):
     return task.name
 
diff --git a/django_q/tests/test_cluster.py b/django_q/tests/test_cluster.py
index 57552cba..d9abf177 100644
--- a/django_q/tests/test_cluster.py
+++ b/django_q/tests/test_cluster.py
@@ -1,6 +1,8 @@
 import sys
 import os
 from multiprocessing import Queue, Event, Value
+import threading
+from time import sleep
 
 import pytest
 
@@ -181,6 +183,20 @@ def test_async(r):
     assert result(h) == 12
     r.delete(list_key)
 
+@pytest.mark.django_db
+def test_timeout(r):
+    # set up the Sentinel
+    list_key = 'timeout_test:q'
+    async('django_q.tests.tasks.count_forever', list_key=list_key)
+    start_event = Event()
+    stop_event = Event()
+    # Set a timer to stop the Sentinel
+    threading.Timer(3, stop_event.set).start()
+    s = Sentinel(stop_event, start_event, list_key=list_key, timeout=1)
+    assert start_event.is_set()
+    assert s.status() == Conf.STOPPED
+    assert s.reincarnations == 1
+
 
 @pytest.mark.django_db
 def assert_result(task):