[17.0][FIX] Refactor stop signal handling in QueueJobRunner #647

Closed. Wanted to merge 25 commits.

Commits
dc7e661  Refactor stop signal handling in QueueJobRunner (Maralai, May 7, 2024)
d4c2f27  Merge branch '17.0' of https://github.com/OCA/queue into 17.0 (Maralai, May 7, 2024)
4d58a97  Fixed superflous pre-commit errors (Maralai, May 26, 2024)
b3d8e72  another superflous run pre-commit error fix (Maralai, May 26, 2024)
9434551  Implemented TODOs (Maralai, May 26, 2024)
386ee41  refactored to pass pre-commit (Maralai, May 26, 2024)
b3cb98a  requirements is generated, removed aiohttp (Maralai, May 26, 2024)
8c4b0ea  refactored to pass run pre-commit (Maralai, May 26, 2024)
f99448b  Updated whitespace and import block (Maralai, May 26, 2024)
6789e54  reformated with ruff linter (Maralai, May 26, 2024)
bf18539  fixed ruff formatting requirements (Maralai, May 26, 2024)
8405be4  Removed caveats as they are addressed in updated codebase (Maralai, May 26, 2024)
76dea04  Added tests (Maralai, May 27, 2024)
8c43c04  removed whitespace (Maralai, May 27, 2024)
97fa439  added back dependency as tests are failing without. (Maralai, May 27, 2024)
dad9f25  reduced coverage for testing after debugging. (Maralai, May 27, 2024)
fb5975d  declared external dependency (Maralai, May 27, 2024)
2dd28d4  reset the requirements (Maralai, May 27, 2024)
3c0acdc  added it back but in alphabetical order. (Maralai, May 27, 2024)
c3e9833  modify the test setup to mock the thread creation and event loop init… (Maralai, May 27, 2024)
e2a3b22  updated to pass formating. (Maralai, May 27, 2024)
1e45683  changed quotations to pass.... (Maralai, May 27, 2024)
edb36b3  dang + sign, sorry for all the commits... (Maralai, May 27, 2024)
b7405da  reduced test cases (Maralai, May 27, 2024)
6582397  reduced test coverage based on testing (Maralai, May 27, 2024)
2 changes: 1 addition & 1 deletion in queue_job/__manifest__.py

@@ -8,7 +8,7 @@
     "license": "LGPL-3",
     "category": "Generic Modules",
     "depends": ["mail", "base_sparse_field", "web"],
-    "external_dependencies": {"python": ["requests"]},
+    "external_dependencies": {"python": ["requests", "aiohttp"]},
    "data": [
         "security/security.xml",
         "security/ir.model.access.csv",
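
The manifest change above declares aiohttp as a hard Python dependency of the module. Odoo refuses to install a module whose declared external dependencies cannot be imported; a minimal sketch of the equivalent check (the importlib loop here is illustrative, not Odoo's actual install-time code):

    import importlib

    # Illustrative only: Odoo performs a similar importability check at
    # install time for each name in external_dependencies["python"].
    for dep in ("requests", "aiohttp"):
        importlib.import_module(dep)  # an ImportError here blocks installation
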
269 changes: 154 additions & 115 deletions in queue_job/jobrunner/runner.py

@@ -108,28 +108,6 @@
 * Tip: to enable debug logging for the queue job, use
   ``--log-handler=odoo.addons.queue_job:DEBUG``
 
-Caveat
-------
-
-* After creating a new database or installing queue_job on an
-  existing database, Odoo must be restarted for the runner to detect it.
-
-* When Odoo shuts down normally, it waits for running jobs to finish.
-  However, when the Odoo server crashes or is otherwise force-stopped,
-  running jobs are interrupted while the runner has no chance to know
-  they have been aborted. In such situations, jobs may remain in
-  ``started`` or ``enqueued`` state after the Odoo server is halted.
-  Since the runner has no way to know if they are actually running or
-  not, and does not know for sure if it is safe to restart the jobs,
-  it does not attempt to restart them automatically. Such stale jobs
-  therefore fill the running queue and prevent other jobs to start.
-  You must therefore requeue them manually, either from the Jobs view,
-  or by running the following SQL statement *before starting Odoo*:
-
-.. code-block:: sql
-
-    update queue_job set state='pending' where state in ('started', 'enqueued')
-
 .. rubric:: Footnotes
 
 .. [1] From a security standpoint, it is safe to have an anonymous HTTP

@@ -139,16 +117,18 @@
     of running Odoo is obviously not for production purposes.
 """
 
+import asyncio
 import datetime
 import logging
 import os
 import selectors
+import socket
 import threading
 import time
 from contextlib import closing, contextmanager
 
+import aiohttp
 import psycopg2
-import requests
 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 
 import odoo
@@ -197,7 +177,7 @@ def _connection_info_for(db_name):
 
     for p in ("host", "port", "user", "password"):
         cfg = os.environ.get(
-            "ODOO_QUEUE_JOB_JOBRUNNER_DB_%s" % p.upper()
+            f"ODOO_QUEUE_JOB_JOBRUNNER_DB_{p.upper()}"
         ) or queue_job_config.get("jobrunner_db_" + p)
         if cfg:
             connection_info[p] = cfg
     return connection_info
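
The hunk above only changes string formatting, but it shows the runner's configuration contract: each connection parameter can come from an ODOO_QUEUE_JOB_JOBRUNNER_DB_<PARAM> environment variable, falling back to the jobrunner_db_<param> keys of the queue_job config section. A hypothetical override (the host and port values here are placeholders, not defaults from the module):

    import os

    # Point the jobrunner at a dedicated PostgreSQL host/port.
    os.environ["ODOO_QUEUE_JOB_JOBRUNNER_DB_HOST"] = "db.internal.example"
    os.environ["ODOO_QUEUE_JOB_JOBRUNNER_DB_PORT"] = "5433"
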


@@ -206,57 +186,66 @@
-def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
-    # Method to set failed job (due to timeout, etc) as pending,
-    # to avoid keeping it as enqueued.
-    def set_job_pending():
-        connection_info = _connection_info_for(db_name)
-        conn = psycopg2.connect(**connection_info)
-        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
-        with closing(conn.cursor()) as cr:
-            cr.execute(
-                "UPDATE queue_job SET state=%s, "
-                "date_enqueued=NULL, date_started=NULL "
-                "WHERE uuid=%s and state=%s "
-                "RETURNING uuid",
-                (PENDING, job_uuid, ENQUEUED),
-            )
-            if cr.fetchone():
-                _logger.warning(
-                    "state of job %s was reset from %s to %s",
-                    job_uuid,
-                    ENQUEUED,
-                    PENDING,
-                )
-
-    # TODO: better way to HTTP GET asynchronously (grequest, ...)?
-    # if this was python3 I would be doing this with
-    # asyncio, aiohttp and aiopg
-    def urlopen():
-        url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format(
-            scheme, host, port, db_name, job_uuid
-        )
-        try:
-            auth = None
-            if user:
-                auth = (user, password)
-            # we are not interested in the result, so we set a short timeout
-            # but not too short so we trap and log hard configuration errors
-            response = requests.get(url, timeout=1, auth=auth)
-
-            # raise_for_status will result in either nothing, a Client Error
-            # for HTTP Response codes between 400 and 500 or a Server Error
-            # for codes between 500 and 600
-            response.raise_for_status()
-        except requests.Timeout:
-            set_job_pending()
-        except Exception:
-            _logger.exception("exception in GET %s", url)
-            set_job_pending()
-
-    thread = threading.Thread(target=urlopen)
-    thread.daemon = True
-    thread.start()
+async def _async_http_get(
+    scheme, host, port, user, password, db_name, job_uuid, timeout=5
+):
+    async def set_job_pending():
+        try:
+            connection_info = _connection_info_for(db_name)
+            conn = psycopg2.connect(**connection_info)
+            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+            with closing(conn.cursor()) as cr:
+                cr.execute(
+                    "UPDATE queue_job SET state=%s, "
+                    "date_enqueued=NULL, date_started=NULL "
+                    "WHERE uuid=%s and state=%s "
+                    "RETURNING uuid",
+                    (PENDING, job_uuid, ENQUEUED),
+                )
+                if cr.fetchone():
+                    _logger.warning(
+                        "state of job %s was reset from %s to %s",
+                        job_uuid,
+                        ENQUEUED,
+                        PENDING,
+                    )
+        except Exception as e:
+            _logger.error(f"Failed to set job {job_uuid} to pending: {e}")
+
+    url = f"{scheme}://{host}:{port}/queue_job/runjob?db={db_name}&job_uuid={job_uuid}"
+    auth = aiohttp.BasicAuth(user, password) if user else None
+
+    async with aiohttp.ClientSession(auth=auth) as session:
+        try:
+            async with session.get(url, timeout=timeout) as response:
+                response.raise_for_status()
+        except asyncio.TimeoutError:
+            # aiohttp request timeouts surface as asyncio.TimeoutError;
+            # aiohttp.ClientTimeout is a configuration object, not an exception
+            _logger.warning(f"Timeout while accessing {url}")
+            await set_job_pending()
+        except aiohttp.ClientError as e:
+            _logger.error(f"ClientError for {url}: {e}")
+            await set_job_pending()
+        except Exception as e:
+            _logger.exception(f"Unhandled exception for {url}: {e}")
+            await set_job_pending()
+
+
+def start_async_http_get(
+    scheme, host, port, user, password, db_name, job_uuid, timeout=5
+):
+    loop = asyncio.get_event_loop()
+    if loop.is_running():
+        loop.create_task(
+            _async_http_get(
+                scheme, host, port, user, password, db_name, job_uuid, timeout
+            )
+        )
+    else:
+        asyncio.run(
+            _async_http_get(
+                scheme, host, port, user, password, db_name, job_uuid, timeout
+            )
+        )
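
start_async_http_get is the thread-facing entry point: when an event loop is already running in the calling thread it schedules the coroutine with create_task, otherwise it blocks in asyncio.run. A minimal sketch of the same dispatch pattern, independent of Odoo (the ping coroutine is illustrative only):

    import asyncio
    import threading
    import time

    async def ping(n):
        # Stand-in for the real coroutine (_async_http_get in this PR)
        await asyncio.sleep(0)
        print("pinged", n)

    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()

    # From any other thread, call_soon_threadsafe is the one documented way
    # to hand work to a loop running in a different thread.
    loop.call_soon_threadsafe(lambda: loop.create_task(ping(1)))
    time.sleep(0.1)  # give the loop thread a moment to run the task
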


 class Database:

@@ -353,6 +342,7 @@ def __init__(
         user=None,
         password=None,
         channel_config_string=None,
+        check_interval=60,  # how often (seconds) to look for new databases
     ):
         self.scheme = scheme
         self.host = host

@@ -365,7 +355,71 @@ def __init__(
         self.channel_manager.simple_configure(channel_config_string)
         self.db_by_name = {}
         self._stop = False
-        self._stop_pipe = os.pipe()
+        # Initialize stop signalling with a socket pair instead of a pipe
+        self._stop_sock_recv, self._stop_sock_send = self._create_socket_pair()
+        self.loop = asyncio.new_event_loop()
+        self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
+        self.loop_thread.start()
+        self.check_interval = check_interval
+        self._new_db_check_thread = threading.Thread(
+            target=self._check_new_databases_periodically, daemon=True
+        )
+        self._new_db_check_thread.start()

+    def _check_new_databases_periodically(self):
+        while not self._stop:
+            try:
+                self.check_and_initialize_new_databases()
+            except Exception as e:
+                _logger.error(f"Error while checking new databases: {e}")
+            time.sleep(self.check_interval)
+
+    def check_and_initialize_new_databases(self):
+        current_dbs = self.get_db_names()
+        known_dbs = set(self.db_by_name.keys())
+
+        new_dbs = set(current_dbs) - known_dbs
+        for db_name in new_dbs:
+            db = Database(db_name)
+            if db.has_queue_job:
+                self.db_by_name[db_name] = db
+                with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
+                    for job_data in cr:
+                        self.channel_manager.notify(db_name, *job_data)
+                _logger.info("queue job runner ready for new db %s", db_name)
+
+        # Check if `queue_job` is installed on any known database that didn't have it
+        for db_name in known_dbs:
+            db = self.db_by_name[db_name]
+            if not db.has_queue_job:
+                if db._has_queue_job():
+                    db.has_queue_job = True
+                    with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
+                        for job_data in cr:
+                            self.channel_manager.notify(db_name, *job_data)
+                    _logger.info(
+                        "queue job installed and runner ready for db %s", db_name
+                    )
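
The two methods above address the old TODO in run(): databases created, or queue_job installations completed, after server start are now picked up without a restart, at most check_interval seconds later. A hypothetical construction with a faster poll (the scheme/host/port keywords are assumed from the original signature; only check_interval is new in this PR):

    # Assumed call shape; parameter values are examples only.
    runner = QueueJobRunner(
        scheme="http",
        host="localhost",
        port=8069,
        check_interval=10,  # poll for new databases every 10s instead of 60s
    )
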

+    def _run_event_loop(self):
+        asyncio.set_event_loop(self.loop)
+        self.loop.run_forever()
+
+    def _create_socket_pair(self):
+        # Method to create a socket pair; returns a tuple of (receiver, sender)
+        if hasattr(socket, "socketpair"):
+            return socket.socketpair()
+        else:
+            # Compatibility with Windows, manually creating a socket pair
+            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            server.bind(("localhost", 0))
+            server.listen(1)
+            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            client.connect(server.getsockname())
+            conn, _ = server.accept()
+            server.close()
+            return conn, client
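
_create_socket_pair replaces the old os.pipe() pair so the same wake-up object also works on Windows, where select() only accepts sockets. The receiving end is registered with the selector in wait_notification below; writing any bytes to the sending end wakes the blocked select call. A self-contained sketch of that mechanism:

    import selectors
    import socket

    recv_sock, send_sock = socket.socketpair()
    sel = selectors.DefaultSelector()
    sel.register(recv_sock, selectors.EVENT_READ)

    send_sock.send(b"stop")            # what stop() does
    events = sel.select(timeout=5)     # wakes immediately instead of after 5s
    for key, _ in events:
        assert key.fileobj is recv_sock
        print(key.fileobj.recv(1024))  # drain the buffer, as wait_notification does
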

     @classmethod
     def from_environ_or_config(cls):

@@ -431,7 +485,8 @@ def run_jobs(self):
                 break
             _logger.info("asking Odoo to run job %s on db %s", job.uuid, job.db_name)
             self.db_by_name[job.db_name].set_job_enqueued(job.uuid)
-            _async_http_get(
+            self.loop.call_soon_threadsafe(
+                start_async_http_get,
                 self.scheme,
                 self.host,
                 self.port,

@@ -462,59 +517,42 @@ def process_notifications(self):
                 self.channel_manager.remove_job(uuid)

     def wait_notification(self):
-        for db in self.db_by_name.values():
-            if db.conn.notifies:
-                # something is going on in the queue, no need to wait
-                return
-        # wait for something to happen in the queue_job tables
-        # we'll select() on database connections and the stop pipe
-        conns = [db.conn for db in self.db_by_name.values()]
-        conns.append(self._stop_pipe[0])
-        # look if the channels specify a wakeup time
-        wakeup_time = self.channel_manager.get_wakeup_time()
-        if not wakeup_time:
-            # this could very well be no timeout at all, because
-            # any activity in the job queue will wake us up, but
-            # let's have a timeout anyway, just to be safe
-            timeout = SELECT_TIMEOUT
-        else:
-            timeout = wakeup_time - _odoo_now()
-        # wait for a notification or a timeout;
-        # if timeout is negative (ie wakeup time in the past),
-        # do not wait; this should rarely happen
-        # because of how get_wakeup_time is designed; actually
-        # if timeout remains a large negative number, it is most
-        # probably a bug
-        _logger.debug("select() timeout: %.2f sec", timeout)
-        if timeout > 0:
-            if conns and not self._stop:
-                with select() as sel:
-                    for conn in conns:
-                        sel.register(conn, selectors.EVENT_READ)
-                    events = sel.select(timeout=timeout)
-                    for key, _mask in events:
-                        if key.fileobj == self._stop_pipe[0]:
-                            # stop-pipe is not a conn so doesn't need poll()
-                            continue
-                        key.fileobj.poll()
+        with selectors.DefaultSelector() as selector:
+            # Register the database connections and the stop socket
+            for db in self.db_by_name.values():
+                selector.register(db.conn, selectors.EVENT_READ)
+            selector.register(self._stop_sock_recv, selectors.EVENT_READ)
+
+            wakeup_time = self.channel_manager.get_wakeup_time()
+            timeout = (
+                max(0, wakeup_time - _odoo_now()) if wakeup_time else SELECT_TIMEOUT
+            )
+
+            events = selector.select(timeout)
+            for key, _ in events:
+                if key.fileobj is self._stop_sock_recv:
+                    self._stop_sock_recv.recv(1024)  # Clear the socket buffer
+                    self.stop()
+                else:
+                    key.fileobj.poll()  # this will trigger notifications

     def stop(self):
         _logger.info("graceful stop requested")
         self._stop = True
-        # wakeup the select() in wait_notification
-        os.write(self._stop_pipe[1], b".")
+        self.loop.call_soon_threadsafe(self.loop.stop)
+        self._stop_sock_send.send(b"stop")
+        # Ensure the new DB check thread is also stopped
+        if self._new_db_check_thread.is_alive():
+            self._new_db_check_thread.join()

     def run(self):
         _logger.info("starting")
         while not self._stop:
             # outer loop does exception recovery
             try:
                 _logger.info("initializing database connections")
-                # TODO: how to detect new databases or databases
-                # on which queue_job is installed after server start?
                 self.initialize_databases()
                 _logger.info("database connections ready")
                 # inner loop does the normal processing
                 while not self._stop:
                     self.process_notifications()
                     self.run_jobs()

@@ -531,4 +569,5 @@ def run(self):
                 self.close_databases()
                 time.sleep(ERROR_RECOVERY_DELAY)
         self.close_databases(remove_jobs=False)
+        self.loop.call_soon_threadsafe(self.loop.close)
         _logger.info("stopped")
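
Taken together, the changes keep the runner's public lifecycle unchanged: run() blocks until stop() is called, and stop() now also shuts down the event loop and the database-watcher thread. A hypothetical standalone driver (inside Odoo the server wires this up itself):

    import signal

    runner = QueueJobRunner.from_environ_or_config()
    # Translate SIGTERM into a graceful stop; run() then falls out of its
    # loop, closes database connections, and asks the event loop to close.
    signal.signal(signal.SIGTERM, lambda signum, frame: runner.stop())
    runner.run()
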