Skip to content

Commit

Permalink
Fixes #314 - Convert func to string before saving task in databse so …
Browse files Browse the repository at this point in the history
…that resubmitting failed task works (#554)
  • Loading branch information
kennyhei authored May 14, 2021
1 parent 728a1b8 commit 1eb5cf4
Showing 1 changed file with 28 additions and 27 deletions.
55 changes: 28 additions & 27 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Standard
import ast
import importlib
import inspect
import pydoc
import signal
import socket
import traceback
Expand Down Expand Up @@ -416,31 +417,22 @@ def worker(
f = task["func"]
# if it's not an instance try to get it from the string
if not callable(task["func"]):
try:
module, func = f.rsplit(".", 1)
m = importlib.import_module(module)
f = getattr(m, func)
except (ValueError, ImportError, AttributeError) as e:
result = (e, False)
if error_reporter:
error_reporter.report()
# We're still going
if not result:
close_old_django_connections()
timer_value = task.pop("timeout", timeout)
# signal execution
pre_execute.send(sender="django_q", func=f, task=task)
# execute the payload
timer.value = timer_value # Busy
try:
res = f(*task["args"], **task["kwargs"])
result = (res, True)
except Exception as e:
result = (f"{e} : {traceback.format_exc()}", False)
if error_reporter:
error_reporter.report()
if task.get("sync", False):
raise
f = pydoc.locate(f)
close_old_django_connections()
timer_value = task.pop("timeout", timeout)
# signal execution
pre_execute.send(sender="django_q", func=f, task=task)
# execute the payload
timer.value = timer_value # Busy
try:
res = f(*task["args"], **task["kwargs"])
result = (res, True)
except Exception as e:
result = (f"{e} : {traceback.format_exc()}", False)
if error_reporter:
error_reporter.report()
if task.get("sync", False):
raise
with timer.get_lock():
# Process result
task["result"] = result[0]
Expand Down Expand Up @@ -495,10 +487,19 @@ def save_task(task, broker: Broker):
broker.acknowledge(task['ack_id'])

else:
func = task["func"]
# convert func to string
if inspect.isfunction(func):
func = f"{func.__module__}.{func.__name__}"
elif inspect.ismethod(func):
func = (
f'{func.__self__.__module__}.'
f'{func.__self__.__name__}.{func.__name__}'
)
Task.objects.create(
id=task["id"],
name=task["name"],
func=task["func"],
func=func,
hook=task.get("hook"),
args=task["args"],
kwargs=task["kwargs"],
Expand Down

0 comments on commit 1eb5cf4

Please sign in to comment.