Skip to content

Commit

Permalink
Adds timeout and reincarnation test
Browse files Browse the repository at this point in the history
Also found a bug because of this.
  • Loading branch information
Koed00 committed Jul 2, 2015
1 parent a9a9d8e commit 5399865
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 6 deletions.
14 changes: 8 additions & 6 deletions django_q/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(self, list_key=Conf.Q_LIST):
self.pid = current_process().pid
self.host = socket.gethostname()
self.list_key = list_key
self.timeout = Conf.TIMEOUT
signal.signal(signal.SIGTERM, self.sig_handler)
signal.signal(signal.SIGINT, self.sig_handler)

Expand All @@ -79,7 +80,7 @@ def dummy_close():
# Start Sentinel
self.stop_event = Event()
self.start_event = Event()
self.sentinel = Process(target=Sentinel, args=(self.stop_event, self.start_event, self.list_key))
self.sentinel = Process(target=Sentinel, args=(self.stop_event, self.start_event, self.list_key, self.timeout))
self.sentinel.start()
logger.info('Q Cluster-{} starting.'.format(self.pid))
while not self.start_event.is_set():
Expand Down Expand Up @@ -129,7 +130,7 @@ def is_idle(self):


class Sentinel(object):
def __init__(self, stop_event, start_event, list_key=Conf.Q_LIST, start=True):
def __init__(self, stop_event, start_event, list_key=Conf.Q_LIST, timeout=Conf.TIMEOUT, start=True):
# Make sure we catch signals for the pool
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
Expand All @@ -144,6 +145,7 @@ def __init__(self, stop_event, start_event, list_key=Conf.Q_LIST, start=True):
self.start_event = start_event
self.pool_size = Conf.WORKERS
self.pool = []
self.timeout = timeout
self.task_queue = Queue()
self.done_queue = Queue()
self.event_out = Event()
Expand Down Expand Up @@ -206,7 +208,7 @@ def reincarnate(self, process):
else:
self.pool.remove(process)
self.spawn_worker()
if int(process.timer.value) >= Conf.TIMEOUT:
if int(process.timer.value) >= self.timeout:
logger.warn("reincarnated worker {} after timeout.".format(process.pid))
else:
logger.error("reincarnated worker {} after sudden death.".format(process.pid))
Expand All @@ -231,18 +233,18 @@ def guard(self):
# Check Workers
for p in self.pool:
# Are you alive?
if not p.is_alive() or (Conf.TIMEOUT and int(p.timer.value) >= Conf.TIMEOUT):
if not p.is_alive() or (self.timeout and int(p.timer.value) >= self.timeout):
self.reincarnate(p)
continue
# Increment timer if work is being done
if p.timer.value >= 0:
p.timer.value += 1
# Check Monitor
if not self.monitor.is_alive():
self.reincarnate(self.monitor.pid)
self.reincarnate(p)
# Check Pusher
if not self.pusher.is_alive():
self.reincarnate(self.monitor.pid)
self.reincarnate(p)
# Call scheduler once a minute (or so)
counter += 1
if counter > 60:
Expand Down
8 changes: 8 additions & 0 deletions django_q/tests/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# simple countdown, returns nothing
from time import sleep


def countdown(n):
while n > 0:
n -= 1
Expand All @@ -23,6 +26,11 @@ def word_multiply(x, word=''):
return len(word) * x


def count_forever():
while True:
sleep(0.5)


def get_task_name(task):
return task.name

Expand Down
16 changes: 16 additions & 0 deletions django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import sys
import os
from multiprocessing import Queue, Event, Value
import threading
from time import sleep

import pytest

Expand Down Expand Up @@ -181,6 +183,20 @@ def test_async(r):
assert result(h) == 12
r.delete(list_key)

@pytest.mark.django_db
def test_timeout(r):
# set up the Sentinel
list_key = 'timeout_test:q'
async('django_q.tests.tasks.count_forever', list_key=list_key)
start_event = Event()
stop_event = Event()
# Set a timer to stop the Sentinel
threading.Timer(3, stop_event.set).start()
s = Sentinel(stop_event, start_event, list_key=list_key, timeout=1)
assert start_event.is_set()
assert s.status() == Conf.STOPPED
assert s.reincarnations == 1


@pytest.mark.django_db
def assert_result(task):
Expand Down

0 comments on commit 5399865

Please sign in to comment.