15/advisability Changes #560

Closed · wants to merge 8 commits
2 changes: 1 addition & 1 deletion queue_job/__manifest__.py
@@ -7,7 +7,7 @@
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
"category": "Generic Modules",
"depends": ["mail", "base_sparse_field"],
"depends": ["mail", "base_sparse_field", "alfaleads_utils"],
"external_dependencies": {"python": ["requests"]},
"data": [
"security/security.xml",
12 changes: 12 additions & 0 deletions queue_job/data/queue_data.xml
@@ -28,6 +28,18 @@
<field name="state">code</field>
<field name="code">model.autovacuum()</field>
</record>
+        <record id="ir_cron_autovacuum_failed_queue_jobs" model="ir.cron">
+            <field name="name">AutoVacuum Failed Jobs</field>
+            <field ref="model_queue_job" name="model_id" />
+            <field eval="True" name="active" />
+            <field name="user_id" ref="base.user_root" />
+            <field name="interval_number">1</field>
+            <field name="interval_type">days</field>
+            <field name="numbercall">-1</field>
+            <field eval="False" name="doall" />
+            <field name="state">code</field>
+            <field name="code">model.autovacuum(vacuum_failed=True)</field>
+        </record>
</data>
<data noupdate="0">
<record model="queue.job.channel" id="channel_root">
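Note: the new cron simply drives the `vacuum_failed=True` mode of `queue.job.autovacuum()` introduced further down in this diff. A minimal sketch of the equivalent manual call, assuming an Odoo shell with `env` available:

```python
# Equivalent to the cron's code field, run once from an Odoo shell:
env["queue.job"].autovacuum(vacuum_failed=True)
```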
4 changes: 4 additions & 0 deletions queue_job/delay.py
@@ -447,6 +447,7 @@ class Delayable:
"description",
"channel",
"identity_key",
"retryable_exceptions",
)
__slots__ = _properties + (
"recordset",
@@ -466,6 +467,7 @@ def __init__(
description=None,
channel=None,
identity_key=None,
+        retryable_exceptions=None,
):
self._graph = DelayableGraph()
self._graph.add_vertex(self)
@@ -478,6 +480,7 @@
self.description = description
self.channel = channel
self.identity_key = identity_key
+        self.retryable_exceptions = retryable_exceptions

self._job_method = None
self._job_args = ()
@@ -547,6 +550,7 @@ def _build_job(self):
description=self.description,
channel=self.channel,
identity_key=self.identity_key,
+            retryable_exceptions=self.retryable_exceptions,
)
return self._generated_job

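Note: a usage sketch of the new parameter as it flows through `Delayable` (the model and job method here are hypothetical; the list contains `StringifyExceptions` member names, which `Job.perform()` resolves back to exception classes):

```python
# Retry on UniqueViolation/ConnectionError instead of failing the job outright.
delayable = env["res.partner"].browse(1).delayable(
    max_retries=5,
    retryable_exceptions=["UniqueViolation", "ConnectionError"],
)
delayable.some_job_method()  # hypothetical job method; records the call
delayable.delay()            # enqueue the job graph
```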
23 changes: 23 additions & 0 deletions queue_job/exception.py
@@ -1,5 +1,14 @@
# Copyright 2012-2016 Camptocamp
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
+from enum import Enum

+from odoo.exceptions import CacheMiss
+from psycopg2.errors import (
+    LockNotAvailable,
+    SerializationFailure,
+    UniqueViolation,
+)
+from requests.exceptions import ConnectionError


class BaseQueueJobError(Exception):
@@ -41,3 +50,17 @@ class NothingToDoJob(JobError):

class ChannelNotFound(BaseQueueJobError):
"""A channel could not be found"""


+class UnusedException(Exception):
+    """An exception class that is never raised by any code anywhere"""
+
+
+class StringifyExceptions(Enum):
+    UnusedException = UnusedException
+
+    UniqueViolation = UniqueViolation
+    LockNotAvailable = LockNotAvailable
+    SerializationFailure = SerializationFailure
+    CacheMiss = CacheMiss
+    ConnectionError = ConnectionError
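Note: the enum doubles as a whitelist that maps serialized member names back to exception classes; an unknown name raises `KeyError`. A minimal sketch of the lookup that `Job.perform()` does:

```python
from odoo.addons.queue_job.exception import StringifyExceptions

names = ["UniqueViolation", "ConnectionError"]
expected = tuple(StringifyExceptions[name].value for name in names)
# expected == (psycopg2.errors.UniqueViolation,
#              requests.exceptions.ConnectionError)
# StringifyExceptions["NoSuchName"] would raise KeyError instead.
```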
39 changes: 30 additions & 9 deletions queue_job/job.py
@@ -14,7 +14,12 @@

import odoo

-from .exception import FailedJobError, NoSuchJobError, RetryableJobError
+from .exception import (
+    FailedJobError,
+    NoSuchJobError,
+    RetryableJobError,
+    StringifyExceptions,
+)

WAIT_DEPENDENCIES = "wait_dependencies"
PENDING = "pending"
@@ -281,6 +286,7 @@ def _load_from_db_record(cls, job_db_record):
job_.exc_info = stored.exc_info if stored.exc_info else None
job_.retry = stored.retry
job_.max_retries = stored.max_retries
+        job_.retryable_exceptions = stored.retryable_exceptions
if stored.company_id:
job_.company_id = stored.company_id.id
job_.identity_key = stored.identity_key
@@ -390,6 +396,7 @@ def __init__(
description=None,
channel=None,
identity_key=None,
+        retryable_exceptions=None,
):
"""Create a Job

@@ -466,6 +473,7 @@

self.date_created = datetime.now()
self._description = description
+        self.retryable_exceptions = retryable_exceptions

if isinstance(identity_key, str):
self._identity_key = identity_key
@@ -507,29 +515,41 @@ def add_depends(self, jobs):
if any(j.state != DONE for j in jobs):
self.state = WAIT_DEPENDENCIES

+    def failed_retries(self):
+        type_, value, traceback = sys.exc_info()
+        # change the exception type but keep the original
+        # traceback and message:
+        # http://blog.ianbicking.org/2007/09/12/re-raising-exceptions/
+        return FailedJobError(
+            "Max. retries (%d) reached: %s" % (self.max_retries, value or type_)
+        )

def perform(self):
"""Execute the job.

The job is executed with the user which has initiated it.
"""
self.retry += 1
+        if self.retryable_exceptions:
+            expected_errors = tuple(
+                [StringifyExceptions[exc].value for exc in self.retryable_exceptions]
+            )
+        else:
+            expected_errors = (StringifyExceptions.UnusedException.value,)
try:
self.result = self.func(*tuple(self.args), **self.kwargs)
+        except expected_errors as err:
+            if self.max_retries and self.retry >= self.max_retries:
+                raise self.failed_retries()
+            raise RetryableJobError(msg="Postponed, %s" % str(err))
except RetryableJobError as err:
if err.ignore_retry:
self.retry -= 1
raise
elif not self.max_retries: # infinite retries
raise
elif self.retry >= self.max_retries:
-                type_, value, traceback = sys.exc_info()
-                # change the exception type but keep the original
-                # traceback and message:
-                # http://blog.ianbicking.org/2007/09/12/re-raising-exceptions/
-                new_exc = FailedJobError(
-                    "Max. retries (%d) reached: %s" % (self.max_retries, value or type_)
-                )
-                raise new_exc from err
+                raise self.failed_retries() from err
raise

return self.result
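Note: a `RetryableJobError` raised inside the `except expected_errors` handler is not caught by the sibling `except RetryableJobError` clause of the same `try`; it propagates to the job runner, which postpones the job as usual. End to end, with illustrative counts:

```python
# Job with max_retries=3 and retryable_exceptions=["ConnectionError"]:
#   runs 1-2: ConnectionError is caught, re-raised as
#             RetryableJobError("Postponed, ..."), and the job is rescheduled.
#   run 3:    self.retry (3) >= max_retries (3), so failed_retries() builds
#             FailedJobError("Max. retries (3) reached: ...") and the job fails.
```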
@@ -646,6 +666,7 @@ def _store_values(self, create=False):
"records": self.recordset,
"args": self.args,
"kwargs": self.kwargs,
"retryable_exceptions": self.retryable_exceptions,
}
)

38 changes: 34 additions & 4 deletions queue_job/jobrunner/runner.py
@@ -146,10 +146,12 @@
import threading
import time
from contextlib import closing, contextmanager
+from urllib.parse import urlparse

import psycopg2
import requests
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
+from psycopg2.errors import UndefinedTable

import odoo
from odoo.tools import config
@@ -365,13 +367,38 @@ def __init__(
self._stop = False
self._stop_pipe = os.pipe()

+    @classmethod
+    def get_web_base_url(cls):
+        scheme, hostname = None, None
+        for db_name in cls.get_db_names():
+            db = Database(db_name)
+            with closing(db.conn.cursor()) as cr:
+                try:
+                    cr.execute(
+                        "SELECT value FROM ir_config_parameter WHERE key='web.base.url' limit 1"
+                    )
+                    res = cr.fetchone()
+                    if res:
+                        url = urlparse(res[0])
+                        scheme, hostname = url.scheme, url.hostname
+                except UndefinedTable:
+                    _logger.warning(
+                        "No ir_config_parameter table - maybe this is the first run with -i option"
+                    )
+                except Exception:
+                    _logger.exception("Getting web.base.url failed")
+            db.close()
+        return scheme, hostname

@classmethod
def from_environ_or_config(cls):
-        scheme = os.environ.get("ODOO_QUEUE_JOB_SCHEME") or queue_job_config.get(
-            "scheme"
+        web_base_scheme, web_base_host = cls.get_web_base_url()
+        scheme = (
+            web_base_scheme
+            or os.environ.get("ODOO_QUEUE_JOB_SCHEME")
+            or queue_job_config.get("scheme")
        )
host = (
os.environ.get("ODOO_QUEUE_JOB_HOST")
web_base_host
or os.environ.get("ODOO_QUEUE_JOB_HOST")
or queue_job_config.get("host")
or config["http_interface"]
)
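Note: `get_web_base_url()` keeps only scheme and hostname from the stored URL; port resolution is untouched by this hunk (the existing `ODOO_QUEUE_JOB_PORT`/config path), and a database `web.base.url` now takes precedence over the environment variables. A quick sketch of the `urlparse` split, value illustrative:

```python
from urllib.parse import urlparse

url = urlparse("https://odoo.example.com:8069/web")
url.scheme    # 'https'
url.hostname  # 'odoo.example.com' (port and path are dropped here)
```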
@@ -395,7 +422,8 @@ def from_environ_or_config(cls):
)
return runner

-    def get_db_names(self):
+    @staticmethod
+    def get_db_names():
if config["db_name"]:
db_names = config["db_name"].split(",")
else:
@@ -516,6 +544,8 @@ def run(self):
except InterruptedError:
# Interrupted system call, i.e. KeyboardInterrupt during select
self.stop()
+            except OSError:
+                self.stop()
except Exception:
_logger.exception(
"exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY
2 changes: 2 additions & 0 deletions queue_job/models/base.py
@@ -72,6 +72,7 @@ def delayable(
description=None,
channel=None,
identity_key=None,
+        retryable_exceptions=None,
):
"""Return a ``Delayable``

@@ -143,6 +144,7 @@ def delayable(
description=description,
channel=channel,
identity_key=identity_key,
+            retryable_exceptions=retryable_exceptions,
)

def _patch_job_auto_delay(self, method_name, context_key=None):
56 changes: 34 additions & 22 deletions queue_job/models/queue_job.py
@@ -3,14 +3,13 @@

import logging
import random
-from datetime import datetime, timedelta
+from datetime import timedelta

from odoo import _, api, exceptions, fields, models
+from odoo.addons.base_sparse_field.models.fields import Serialized
from odoo.osv import expression
from odoo.tools import config, html_escape

-from odoo.addons.base_sparse_field.models.fields import Serialized

from ..delay import Graph
from ..exception import JobError
from ..fields import JobSerialized
@@ -33,7 +32,11 @@ class QueueJob(models.Model):

_name = "queue.job"
_description = "Queue Job"
_inherit = ["mail.thread", "mail.activity.mixin"]
_inherit = [
"mail.thread",
"mail.activity.mixin",
"alfaleads_utils.autovacuum_mixin",
]
_log_access = False

_order = "date_created DESC, date_done DESC"
@@ -129,6 +132,7 @@ class QueueJob(models.Model):

identity_key = fields.Char(readonly=True)
worker_pid = fields.Integer(readonly=True)
+    retryable_exceptions = JobSerialized(readonly=True, base_type=list)

def init(self):
self._cr.execute(
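Note: `JobSerialized(base_type=list)` persists the whitelist as JSON, so only member names round-trip through the database; that is why `perform()` needs the `StringifyExceptions` indirection. A sketch:

```python
job.retryable_exceptions = ["UniqueViolation"]  # stored as '["UniqueViolation"]'
# After _load_from_db_record(), perform() rebuilds the classes:
expected = tuple(
    StringifyExceptions[name].value for name in job.retryable_exceptions
)
```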
@@ -390,32 +394,40 @@ def _needaction_domain_get(self):
"""
return [("state", "=", "failed")]

-    def autovacuum(self):
+    def autovacuum(self, batch_size=1000, limit_batches=0, vacuum_failed=False):
"""Delete all jobs done based on the removal interval defined on the
channel

Called from a cron.
"""

for channel in self.env["queue.job.channel"].search([]):
-            deadline = datetime.now() - timedelta(days=int(channel.removal_interval))
-            while True:
-                jobs = self.search(
-                    [
-                        "|",
-                        ("date_done", "<=", deadline),
-                        ("date_cancelled", "<=", deadline),
-                        ("channel", "=", channel.complete_name),
-                    ],
-                    limit=1000,
-                )
-                if jobs:
-                    jobs.unlink()
-                    if not config["test_enable"]:
-                        self.env.cr.commit()  # pylint: disable=E8102
-                else:
-                    break
+            self._vacuum(
+                domain=self._get_vacuum_domain(vacuum_failed, channel),
+                batch_size=batch_size,
+                limit_batches=limit_batches,
+            )
return True

+    @staticmethod
+    def _get_vacuum_domain(vacuum_failed, channel):
+        deadline = fields.Datetime.now() - timedelta(days=int(channel.removal_interval))
+        if vacuum_failed:
+            domain = [
+                ("state", "=", FAILED),
+                ("date_created", "<=", deadline),
+                ("channel", "=", channel.complete_name),
+            ]
+        else:
+            domain = [
+                "|",
+                ("date_done", "<=", deadline),
+                ("date_cancelled", "<=", deadline),
+                ("channel", "=", channel.complete_name),
+            ]
+
+        return domain

def requeue_stuck_jobs(self, enqueued_delta=5, started_delta=0):
"""Fix jobs that are in a bad states

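Note: one subtlety in these domains: Odoo's prefix `"|"` operator binds only the next two leaves, so the channel test stays AND-ed in both modes. Normalized for readability:

```python
# default mode:
#   (date_done <= deadline OR date_cancelled <= deadline)
#   AND channel == channel.complete_name
# vacuum_failed mode (implicit AND between leaves):
#   state == "failed" AND date_created <= deadline
#   AND channel == channel.complete_name
```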