diff --git a/django_q/cluster.py b/django_q/cluster.py index 9b0113f1..7ad1e7d0 100644 --- a/django_q/cluster.py +++ b/django_q/cluster.py @@ -1,6 +1,7 @@ # Standard import ast -import importlib +import inspect +import pydoc import signal import socket import traceback @@ -416,31 +417,22 @@ def worker( f = task["func"] # if it's not an instance try to get it from the string if not callable(task["func"]): - try: - module, func = f.rsplit(".", 1) - m = importlib.import_module(module) - f = getattr(m, func) - except (ValueError, ImportError, AttributeError) as e: - result = (e, False) - if error_reporter: - error_reporter.report() - # We're still going - if not result: - close_old_django_connections() - timer_value = task.pop("timeout", timeout) - # signal execution - pre_execute.send(sender="django_q", func=f, task=task) - # execute the payload - timer.value = timer_value # Busy - try: - res = f(*task["args"], **task["kwargs"]) - result = (res, True) - except Exception as e: - result = (f"{e} : {traceback.format_exc()}", False) - if error_reporter: - error_reporter.report() - if task.get("sync", False): - raise + f = pydoc.locate(f) + close_old_django_connections() + timer_value = task.pop("timeout", timeout) + # signal execution + pre_execute.send(sender="django_q", func=f, task=task) + # execute the payload + timer.value = timer_value # Busy + try: + res = f(*task["args"], **task["kwargs"]) + result = (res, True) + except Exception as e: + result = (f"{e} : {traceback.format_exc()}", False) + if error_reporter: + error_reporter.report() + if task.get("sync", False): + raise with timer.get_lock(): # Process result task["result"] = result[0] @@ -495,10 +487,19 @@ def save_task(task, broker: Broker): broker.acknowledge(task['ack_id']) else: + func = task["func"] + # convert func to string + if inspect.isfunction(func): + func = f"{func.__module__}.{func.__name__}" + elif inspect.ismethod(func): + func = ( + f'{func.__self__.__module__}.' + f'{func.__self__.__name__}.{func.__name__}' + ) Task.objects.create( id=task["id"], name=task["name"], - func=task["func"], + func=func, hook=task.get("hook"), args=task["args"], kwargs=task["kwargs"],