From 6131cf146cbe224fe31abe15c4ab326a88777f2b Mon Sep 17 00:00:00 2001 From: Florian Mounier Date: Tue, 4 Jun 2024 10:34:56 +0200 Subject: [PATCH] [IMP] queue_job: Add split method --- queue_job/delay.py | 51 +++++++++++++++++++++++- queue_job/models/base.py | 2 + test_queue_job/tests/test_delay_mocks.py | 1 + 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/queue_job/delay.py b/queue_job/delay.py index 4b2ed5c001..5eac64b069 100644 --- a/queue_job/delay.py +++ b/queue_job/delay.py @@ -534,6 +534,45 @@ def delay(self): """Delay the whole graph""" self._graph.delay() + def split(self, size=None): + """Split the Delayable into a DelayableGroup containing batches + of size `size` + """ + + total_records = len(self.recordset) + count = 1 + total_records // size + + delayables = [] + for batch in range(count): + start = batch * size + end = min((batch + 1) * size, total_records) + if end > start: + recordset = self.recordset[start:end] + delayable = Delayable( + recordset, + priority=self.priority, + eta=self.eta, + max_retries=self.max_retries, + description="%s (split %d/%d)" + % (self.description or "/", batch + 1, count), + channel=self.channel, + identity_key=self.identity_key, + ) + if self._job_method: + # Update the __self__ + delayable._job_method = getattr( + recordset, self._job_method.__name__ + ) + delayable._job_args = self._job_args + delayable._job_kwargs = self._job_kwargs + + delayables.append(delayable) + + # Prevent warning on deletion + self._generated_job = True + + return DelayableGroup(*delayables) + def _build_job(self): if self._generated_job: return self._generated_job @@ -586,7 +625,7 @@ class DelayableRecordset(object): by :meth:`~odoo.addons.queue_job.models.base.Base.with_delay` """ - __slots__ = ("delayable",) + __slots__ = ("delayable", "split") def __init__( self, @@ -597,6 +636,7 @@ def __init__( description=None, channel=None, identity_key=None, + split=None, ): self.delayable = Delayable( recordset, @@ -607,6 +647,7 @@ def __init__( channel=channel, identity_key=identity_key, ) + self.split = split @property def recordset(self): @@ -614,7 +655,13 @@ def recordset(self): def __getattr__(self, name): def _delay_delayable(*args, **kwargs): - getattr(self.delayable, name)(*args, **kwargs).delay() + delayable = getattr(self.delayable, name)(*args, **kwargs) + if self.split: + group = delayable.split(self.split) + group.delay() + return [d._generated_job for d in group._delayables] + + delayable.delay() return self.delayable._generated_job return _delay_delayable diff --git a/queue_job/models/base.py b/queue_job/models/base.py index 16f106450a..fc98087853 100644 --- a/queue_job/models/base.py +++ b/queue_job/models/base.py @@ -29,6 +29,7 @@ def with_delay( description=None, channel=None, identity_key=None, + split=None, ): """Return a ``DelayableRecordset`` @@ -62,6 +63,7 @@ def with_delay( description=description, channel=channel, identity_key=identity_key, + split=split, ) def delayable( diff --git a/test_queue_job/tests/test_delay_mocks.py b/test_queue_job/tests/test_delay_mocks.py index d513a90797..5a775e684b 100644 --- a/test_queue_job/tests/test_delay_mocks.py +++ b/test_queue_job/tests/test_delay_mocks.py @@ -285,6 +285,7 @@ def test_mock_with_delay(self): "identity_key": identity_exact, "max_retries": 1, "priority": 15, + "split": None, }, )