diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index c06f7b49d8..af56dca086 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -2,12 +2,14 @@ # Copyright 2015-2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) import logging +import os from functools import total_ordering from heapq import heappop, heappush from weakref import WeakValueDictionary from ..exception import ChannelNotFound from ..job import CANCELLED, DONE, ENQUEUED, FAILED, PENDING, STARTED, WAIT_DEPENDENCIES +from . import queue_job_config NOT_DONE = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED, FAILED) @@ -411,7 +413,7 @@ def __init__(self, name, parent, capacity=None, sequential=False, throttle=0): self._running = SafeSet() self._failed = SafeSet() self._pause_until = 0 # utc seconds since the epoch - self.capacity = capacity + self.capacity = capacity or _default_subchannel_capacity() self.throttle = throttle # seconds self.sequential = sequential @@ -933,7 +935,7 @@ def get_channel_from_config(self, config): If the channel does not exist it is created. The configuration is applied on the channel before returning it. If some of the parent channels are missing when creating a subchannel, - the parent channels are auto created with an infinite capacity + the parent channels are auto created with the default subchannel capacity (except for the root channel, which defaults to a capacity of 1 when not configured explicity). """ @@ -1077,3 +1079,11 @@ def get_jobs_to_run(self, now): def get_wakeup_time(self): return self._root_channel.get_wakeup_time() + + +def _default_subchannel_capacity(): + capacity = os.environ.get( + "ODOO_QUEUE_JOB_DEFAULT_SUBCHANNEL_CAPACITY" + ) or queue_job_config.get("default_subchannel_capacity") + if capacity: + return int(capacity) diff --git a/queue_job/readme/CONFIGURE.rst b/queue_job/readme/CONFIGURE.rst index b9547b9465..95fff95648 100644 --- a/queue_job/readme/CONFIGURE.rst +++ b/queue_job/readme/CONFIGURE.rst @@ -5,6 +5,9 @@ - ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels configuration. The default is ``root:1`` + - ``ODOO_QUEUE_JOB_DEFAULT_SUBCHANNEL_CAPACITY=1`` + The default is infinite capacity. + - if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069`` * Start Odoo with ``--load=web,queue_job`` @@ -23,6 +26,7 @@ (...) [queue_job] channels = root:2 + # default_subchannel_capacity = 1 * Confirm the runner is starting correctly by checking the odoo log file: diff --git a/queue_job/tests/test_runner_channels.py b/queue_job/tests/test_runner_channels.py index d323d00683..872550521a 100644 --- a/queue_job/tests/test_runner_channels.py +++ b/queue_job/tests/test_runner_channels.py @@ -1,6 +1,11 @@ # Copyright 2015-2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +import os +from unittest.mock import patch + +from odoo.tests import BaseCase + # pylint: disable=odoo-addons-relative-import # we are testing, we want to test as we were an external consumer of the API from odoo.addons.queue_job.jobrunner import channels @@ -8,3 +13,16 @@ from .common import load_doctests load_tests = load_doctests(channels) + + +class TestDefaultSubchannelCapacity(BaseCase): + @patch.dict(os.environ, {"ODOO_QUEUE_JOB_DEFAULT_SUBCHANNEL_CAPACITY": "1"}) + def test_default_subchannel_capacity_env(self): + self.assertEqual(channels._default_subchannel_capacity(), 1) + + @patch.dict(channels.queue_job_config, {"default_subchannel_capacity": "1"}) + def test_default_subchannel_capacity_conf(self): + self.assertEqual(channels._default_subchannel_capacity(), 1) + + def test_default_subchannel_capacity_omit(self): + self.assertIs(channels._default_subchannel_capacity(), None)