Skip to content

Commit

Permalink
[IMP] queue_job: Add split method
Browse files Browse the repository at this point in the history
  • Loading branch information
paradoxxxzero committed Jun 4, 2024
1 parent 76a42e2 commit 6131cf1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 2 deletions.
51 changes: 49 additions & 2 deletions queue_job/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,45 @@ def delay(self):
"""Delay the whole graph"""
self._graph.delay()

def split(self, size=None):
"""Split the Delayable into a DelayableGroup containing batches
of size `size`
"""

total_records = len(self.recordset)
count = 1 + total_records // size

delayables = []
for batch in range(count):
start = batch * size
end = min((batch + 1) * size, total_records)
if end > start:
recordset = self.recordset[start:end]
delayable = Delayable(
recordset,
priority=self.priority,
eta=self.eta,
max_retries=self.max_retries,
description="%s (split %d/%d)"
% (self.description or "/", batch + 1, count),
channel=self.channel,
identity_key=self.identity_key,
)
if self._job_method:
# Update the __self__
delayable._job_method = getattr(
recordset, self._job_method.__name__
)
delayable._job_args = self._job_args
delayable._job_kwargs = self._job_kwargs

delayables.append(delayable)

# Prevent warning on deletion
self._generated_job = True

return DelayableGroup(*delayables)

def _build_job(self):
if self._generated_job:
return self._generated_job
Expand Down Expand Up @@ -586,7 +625,7 @@ class DelayableRecordset(object):
by :meth:`~odoo.addons.queue_job.models.base.Base.with_delay`
"""

__slots__ = ("delayable",)
__slots__ = ("delayable", "split")

def __init__(
self,
Expand All @@ -597,6 +636,7 @@ def __init__(
description=None,
channel=None,
identity_key=None,
split=None,
):
self.delayable = Delayable(
recordset,
Expand All @@ -607,14 +647,21 @@ def __init__(
channel=channel,
identity_key=identity_key,
)
self.split = split

@property
def recordset(self):
return self.delayable.recordset

def __getattr__(self, name):
def _delay_delayable(*args, **kwargs):
getattr(self.delayable, name)(*args, **kwargs).delay()
delayable = getattr(self.delayable, name)(*args, **kwargs)
if self.split:
group = delayable.split(self.split)
group.delay()
return [d._generated_job for d in group._delayables]

delayable.delay()
return self.delayable._generated_job

return _delay_delayable
Expand Down
2 changes: 2 additions & 0 deletions queue_job/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def with_delay(
description=None,
channel=None,
identity_key=None,
split=None,
):
"""Return a ``DelayableRecordset``
Expand Down Expand Up @@ -62,6 +63,7 @@ def with_delay(
description=description,
channel=channel,
identity_key=identity_key,
split=split,
)

def delayable(
Expand Down
1 change: 1 addition & 0 deletions test_queue_job/tests/test_delay_mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ def test_mock_with_delay(self):
"identity_key": identity_exact,
"max_retries": 1,
"priority": 15,
"split": None,
},
)

Expand Down

0 comments on commit 6131cf1

Please sign in to comment.