diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py
index 0daada1821..9a67722bc7 100644
--- a/queue_job/__manifest__.py
+++ b/queue_job/__manifest__.py
@@ -8,7 +8,7 @@
     "license": "LGPL-3",
     "category": "Generic Modules",
     "depends": ["mail", "base_sparse_field", "web"],
-    "external_dependencies": {"python": ["requests"]},
+    "external_dependencies": {"python": ["requests", "aiohttp"]},
     "data": [
         "security/security.xml",
         "security/ir.model.access.csv",
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 47417caa4f..5b5340e4ce 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -108,28 +108,6 @@
 * Tip: to enable debug logging for the queue job, use
   ``--log-handler=odoo.addons.queue_job:DEBUG``
 
-Caveat
-------
-
-* After creating a new database or installing queue_job on an
-  existing database, Odoo must be restarted for the runner to detect it.
-
-* When Odoo shuts down normally, it waits for running jobs to finish.
-  However, when the Odoo server crashes or is otherwise force-stopped,
-  running jobs are interrupted while the runner has no chance to know
-  they have been aborted. In such situations, jobs may remain in
-  ``started`` or ``enqueued`` state after the Odoo server is halted.
-  Since the runner has no way to know if they are actually running or
-  not, and does not know for sure if it is safe to restart the jobs,
-  it does not attempt to restart them automatically. Such stale jobs
-  therefore fill the running queue and prevent other jobs to start.
-  You must therefore requeue them manually, either from the Jobs view,
-  or by running the following SQL statement *before starting Odoo*:
-
-.. code-block:: sql
-
-  update queue_job set state='pending' where state in ('started', 'enqueued')
-
 .. rubric:: Footnotes
 
 .. [1] From a security standpoint, it is safe to have an anonymous HTTP
@@ -139,16 +117,18 @@
    of running Odoo is obviously not for production purposes.
 """
 
+import asyncio
 import datetime
 import logging
 import os
 import selectors
+import socket
 import threading
 import time
 from contextlib import closing, contextmanager
 
+import aiohttp
 import psycopg2
-import requests
 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 
 import odoo
@@ -197,7 +177,7 @@ def _connection_info_for(db_name):
 
     for p in ("host", "port", "user", "password"):
         cfg = os.environ.get(
-            "ODOO_QUEUE_JOB_JOBRUNNER_DB_%s" % p.upper()
+            f"ODOO_QUEUE_JOB_JOBRUNNER_DB_{p.upper()}"
         ) or queue_job_config.get("jobrunner_db_" + p)
 
         if cfg:
@@ -206,57 +186,68 @@ def _connection_info_for(db_name):
     return connection_info
 
 
-def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
-    # Method to set failed job (due to timeout, etc) as pending,
-    # to avoid keeping it as enqueued.
-    def set_job_pending():
-        connection_info = _connection_info_for(db_name)
-        conn = psycopg2.connect(**connection_info)
-        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
-        with closing(conn.cursor()) as cr:
-            cr.execute(
-                "UPDATE queue_job SET state=%s, "
-                "date_enqueued=NULL, date_started=NULL "
-                "WHERE uuid=%s and state=%s "
-                "RETURNING uuid",
-                (PENDING, job_uuid, ENQUEUED),
-            )
-            if cr.fetchone():
-                _logger.warning(
-                    "state of job %s was reset from %s to %s",
-                    job_uuid,
-                    ENQUEUED,
-                    PENDING,
-                )
-
-    # TODO: better way to HTTP GET asynchronously (grequest, ...)?
-    # if this was python3 I would be doing this with
-    # asyncio, aiohttp and aiopg
-    def urlopen():
-        url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format(
-            scheme, host, port, db_name, job_uuid
-        )
-        try:
-            auth = None
-            if user:
-                auth = (user, password)
-            # we are not interested in the result, so we set a short timeout
-            # but not too short so we trap and log hard configuration errors
-            response = requests.get(url, timeout=1, auth=auth)
-
-            # raise_for_status will result in either nothing, a Client Error
-            # for HTTP Response codes between 400 and 500 or a Server Error
-            # for codes between 500 and 600
-            response.raise_for_status()
-        except requests.Timeout:
-            set_job_pending()
-        except Exception:
-            _logger.exception("exception in GET %s", url)
-            set_job_pending()
-
-    thread = threading.Thread(target=urlopen)
-    thread.daemon = True
-    thread.start()
+async def _async_http_get(
+    scheme, host, port, user, password, db_name, job_uuid, timeout=5
+):
+    # Reset a job that could not be delivered (timeout, ...) to pending,
+    # to avoid leaving it stuck in enqueued state.
+    async def set_job_pending():
+        try:
+            connection_info = _connection_info_for(db_name)
+            conn = psycopg2.connect(**connection_info)
+            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+            with closing(conn.cursor()) as cr:
+                cr.execute(
+                    "UPDATE queue_job SET state=%s, "
+                    "date_enqueued=NULL, date_started=NULL "
+                    "WHERE uuid=%s and state=%s "
+                    "RETURNING uuid",
+                    (PENDING, job_uuid, ENQUEUED),
+                )
+                if cr.fetchone():
+                    _logger.warning(
+                        "state of job %s was reset from %s to %s",
+                        job_uuid,
+                        ENQUEUED,
+                        PENDING,
+                    )
+        except Exception:
+            _logger.exception("Failed to set job %s to pending", job_uuid)
+
+    url = f"{scheme}://{host}:{port}/queue_job/runjob?db={db_name}&job_uuid={job_uuid}"
+    auth = aiohttp.BasicAuth(user, password) if user else None
+
+    async with aiohttp.ClientSession(auth=auth) as session:
+        try:
+            async with session.get(url, timeout=timeout) as response:
+                response.raise_for_status()
+        except asyncio.TimeoutError:
+            _logger.warning("Timeout while accessing %s", url)
+            await set_job_pending()
+        except aiohttp.ClientError as e:
+            _logger.error("ClientError for %s: %s", url, e)
+            await set_job_pending()
+        except Exception:
+            _logger.exception("Unhandled exception for %s", url)
+            await set_job_pending()
+
+
+def start_async_http_get(
+    scheme, host, port, user, password, db_name, job_uuid, timeout=5
+):
+    loop = asyncio.get_event_loop()
+    if loop.is_running():
+        loop.create_task(
+            _async_http_get(
+                scheme, host, port, user, password, db_name, job_uuid, timeout
+            )
+        )
+    else:
+        asyncio.run(
+            _async_http_get(
+                scheme, host, port, user, password, db_name, job_uuid, timeout
+            )
+        )
 
 
 class Database:
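``start_async_http_get`` dispatches the coroutine onto whichever loop is
current: when it executes inside the runner's event-loop thread (via
``call_soon_threadsafe``) the loop is running and ``create_task`` is used;
called from a thread without a running loop, it falls back to ``asyncio.run``.
For reference, the stdlib also provides ``asyncio.run_coroutine_threadsafe``
for handing a coroutine to a loop owned by another thread without first
entering that thread. A minimal, self-contained sketch (names are
illustrative, not part of the patch):

.. code-block:: python

    import asyncio
    import threading

    # a dedicated loop thread, mirroring what QueueJobRunner.__init__ sets up
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()

    async def ping(url):
        # stand-in for _async_http_get; does no real I/O
        await asyncio.sleep(0)
        return url

    # unlike loop.create_task, run_coroutine_threadsafe may be called from
    # any thread and returns a concurrent.futures.Future
    future = asyncio.run_coroutine_threadsafe(ping("http://localhost:8069"), loop)
    print(future.result(timeout=5))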
@@ -353,6 +344,7 @@ def __init__(
         user=None,
         password=None,
         channel_config_string=None,
+        check_interval=60,  # seconds between checks for new databases
     ):
         self.scheme = scheme
         self.host = host
@@ -365,7 +357,71 @@ def __init__(
             self.channel_manager.simple_configure(channel_config_string)
         self.db_by_name = {}
         self._stop = False
-        self._stop_pipe = os.pipe()
+        # stop signaling uses a socket pair: select()-able on all platforms
+        self._stop_sock_recv, self._stop_sock_send = self._create_socket_pair()
+        self.loop = asyncio.new_event_loop()
+        self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
+        self.loop_thread.start()
+        # interval (in seconds) between checks for new databases
+        self.check_interval = check_interval
+        self._new_db_check_thread = threading.Thread(
+            target=self._check_new_databases_periodically, daemon=True
+        )
+        self._new_db_check_thread.start()
+
+    def _check_new_databases_periodically(self):
+        while not self._stop:
+            try:
+                self.check_and_initialize_new_databases()
+            except Exception:
+                _logger.exception("Error while checking for new databases")
+            time.sleep(self.check_interval)
+
+    def check_and_initialize_new_databases(self):
+        current_dbs = self.get_db_names()
+        known_dbs = set(self.db_by_name.keys())
+
+        new_dbs = set(current_dbs) - known_dbs
+        for db_name in new_dbs:
+            db = Database(db_name)
+            if db.has_queue_job:
+                self.db_by_name[db_name] = db
+                with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
+                    for job_data in cr:
+                        self.channel_manager.notify(db_name, *job_data)
+                _logger.info("queue job runner ready for new db %s", db_name)
+
+        # check whether queue_job was installed on a known database after startup
+        for db_name in known_dbs:
+            db = self.db_by_name[db_name]
+            if not db.has_queue_job:
+                if db._has_queue_job():
+                    db.has_queue_job = True
+                    with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
+                        for job_data in cr:
+                            self.channel_manager.notify(db_name, *job_data)
+                    _logger.info(
+                        "queue_job installed and runner ready for db %s", db_name
+                    )
+
+    def _run_event_loop(self):
+        asyncio.set_event_loop(self.loop)
+        self.loop.run_forever()
+
+    def _create_socket_pair(self):
+        # create a socket pair; returns a tuple of (receiver, sender)
+        if hasattr(socket, "socketpair"):
+            return socket.socketpair()
+        else:
+            # Windows compatibility: build the pair manually over localhost
+            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            server.bind(("localhost", 0))
+            server.listen(1)
+            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            client.connect(server.getsockname())
+            conn, _ = server.accept()
+            server.close()
+            return conn, client
 
     @classmethod
     def from_environ_or_config(cls):
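The socket pair replaces the former ``os.pipe()`` wakeup channel because
``select()`` on Windows only accepts sockets; the fallback builds an
equivalent pair over a localhost TCP connection. The wakeup pattern in
isolation (an illustrative sketch, not part of the patch):

.. code-block:: python

    import selectors
    import socket

    recv_sock, send_sock = socket.socketpair()

    sel = selectors.DefaultSelector()
    sel.register(recv_sock, selectors.EVENT_READ)

    # from any thread: unblock the waiting select()
    send_sock.send(b".")

    # in the waiting thread: returns as soon as the byte arrives
    for key, _mask in sel.select(timeout=60):
        if key.fileobj is recv_sock:
            recv_sock.recv(1024)  # drain so the next select() blocks again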
@@ -431,7 +487,8 @@ def run_jobs(self):
                 break
             _logger.info("asking Odoo to run job %s on db %s", job.uuid, job.db_name)
             self.db_by_name[job.db_name].set_job_enqueued(job.uuid)
-            _async_http_get(
+            self.loop.call_soon_threadsafe(
+                start_async_http_get,
                 self.scheme,
                 self.host,
                 self.port,
@@ -462,47 +519,35 @@ def process_notifications(self):
             self.channel_manager.remove_job(uuid)
 
     def wait_notification(self):
-        for db in self.db_by_name.values():
-            if db.conn.notifies:
-                # something is going on in the queue, no need to wait
-                return
-        # wait for something to happen in the queue_job tables
-        # we'll select() on database connections and the stop pipe
-        conns = [db.conn for db in self.db_by_name.values()]
-        conns.append(self._stop_pipe[0])
-        # look if the channels specify a wakeup time
-        wakeup_time = self.channel_manager.get_wakeup_time()
-        if not wakeup_time:
-            # this could very well be no timeout at all, because
-            # any activity in the job queue will wake us up, but
-            # let's have a timeout anyway, just to be safe
-            timeout = SELECT_TIMEOUT
-        else:
-            timeout = wakeup_time - _odoo_now()
-        # wait for a notification or a timeout;
-        # if timeout is negative (ie wakeup time in the past),
-        # do not wait; this should rarely happen
-        # because of how get_wakeup_time is designed; actually
-        # if timeout remains a large negative number, it is most
-        # probably a bug
-        _logger.debug("select() timeout: %.2f sec", timeout)
-        if timeout > 0:
-            if conns and not self._stop:
-                with select() as sel:
-                    for conn in conns:
-                        sel.register(conn, selectors.EVENT_READ)
-                    events = sel.select(timeout=timeout)
-                    for key, _mask in events:
-                        if key.fileobj == self._stop_pipe[0]:
-                            # stop-pipe is not a conn so doesn't need poll()
-                            continue
-                        key.fileobj.poll()
+        with selectors.DefaultSelector() as selector:
+            # register the database connections and the stop socket
+            for db in self.db_by_name.values():
+                selector.register(db.conn, selectors.EVENT_READ)
+            selector.register(self._stop_sock_recv, selectors.EVENT_READ)
+
+            # a wakeup time in the past clamps to 0 (poll without blocking)
+            wakeup_time = self.channel_manager.get_wakeup_time()
+            timeout = (
+                max(0, wakeup_time - _odoo_now()) if wakeup_time else SELECT_TIMEOUT
+            )
+
+            events = selector.select(timeout)
+            for key, _ in events:
+                if key.fileobj is self._stop_sock_recv:
+                    # stop() has already set self._stop; just drain the socket
+                    self._stop_sock_recv.recv(1024)
+                else:
+                    key.fileobj.poll()  # this will trigger notifications
 
     def stop(self):
         _logger.info("graceful stop requested")
         self._stop = True
-        # wakeup the select() in wait_notification
-        os.write(self._stop_pipe[1], b".")
+        # stop the event loop and wake up the select() in wait_notification
+        self.loop.call_soon_threadsafe(self.loop.stop)
+        self._stop_sock_send.send(b"stop")
+        # let the new-database check thread wind down, but don't block shutdown
+        if self._new_db_check_thread.is_alive():
+            self._new_db_check_thread.join(timeout=5)
 
     def run(self):
         _logger.info("starting")
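The single ``timeout`` expression above replaces the old if/else ladder: a
wakeup time in the past clamps to ``0`` so ``select()`` polls without
blocking, and a missing wakeup time falls back to ``SELECT_TIMEOUT``. An
illustrative check of the clamping behaviour (standalone, not part of the
patch):

.. code-block:: python

    SELECT_TIMEOUT = 60  # mirrors the runner's constant

    def select_timeout(wakeup_time, now):
        return max(0, wakeup_time - now) if wakeup_time else SELECT_TIMEOUT

    assert select_timeout(None, 100.0) == 60   # no scheduled wakeup
    assert select_timeout(130.0, 100.0) == 30  # wake up in 30 seconds
    assert select_timeout(90.0, 100.0) == 0    # overdue: poll, don't block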
@@ -510,11 +555,8 @@ def run(self):
         # outer loop does exception recovery
         try:
             _logger.info("initializing database connections")
-            # TODO: how to detect new databases or databases
-            # on which queue_job is installed after server start?
             self.initialize_databases()
             _logger.info("database connections ready")
-            # inner loop does the normal processing
             while not self._stop:
                 self.process_notifications()
                 self.run_jobs()
@@ -531,4 +573,9 @@ def run(self):
             self.close_databases()
             time.sleep(ERROR_RECOVERY_DELAY)
         self.close_databases(remove_jobs=False)
+        # the loop was stopped in stop(); wait for its thread to exit before
+        # closing the loop (close() must not run on a running event loop)
+        if self.loop_thread.is_alive():
+            self.loop_thread.join(timeout=5)
+        self.loop.close()
         _logger.info("stopped")
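The shutdown ordering at the end of ``run()`` matters: ``stop()`` halts the
loop with ``call_soon_threadsafe(loop.stop)``, and only once the loop thread
has unwound may ``loop.close()`` be called; closing a loop that is still
running raises ``RuntimeError``. The lifecycle in miniature (illustrative
sketch, not part of the patch):

.. code-block:: python

    import asyncio
    import threading

    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    # ... hand coroutines to the loop from other threads ...

    loop.call_soon_threadsafe(loop.stop)  # run_forever() returns in the thread
    thread.join(timeout=5)                # wait for the thread to unwind
    loop.close()                          # safe only once nothing runs on it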
diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py
index c6486e27ef..3c9fbf8675 100644
--- a/queue_job/tests/test_runner_runner.py
+++ b/queue_job/tests/test_runner_runner.py
@@ -3,8 +3,134 @@
 # pylint: disable=odoo-addons-relative-import
 # we are testing, we want to test as we were an external consumer of the API
-from odoo.addons.queue_job.jobrunner import runner
+import os
+import selectors
+import socket
+import threading
+from unittest.mock import MagicMock, patch
 
-from .common import load_doctests
+from odoo.tests import common
 
-load_tests = load_doctests(runner)
+from odoo.addons.queue_job.jobrunner.runner import (
+    QueueJobRunner,
+    _channels,
+    _connection_info_for,
+    start_async_http_get,
+)
+from .common import JobMixin
+
+
+class TestQueueJobRunnerUpdates(common.TransactionCase, JobMixin):
+    def setUp(self):
+        super().setUp()
+        # build the runner without starting its event loop or helper threads
+        with patch.object(QueueJobRunner, "_run_event_loop"), patch.object(
+            threading.Thread, "start"
+        ):
+            self.runner = QueueJobRunner()
+
+    def test_channels_from_env(self):
+        with patch.dict(os.environ, {"ODOO_QUEUE_JOB_CHANNELS": "root:4"}):
+            self.assertEqual(_channels(), "root:4")
+
+    def test_channels_default(self):
+        with patch.dict(os.environ, {}, clear=True):
+            self.assertEqual(_channels(), "root:1")
+
+    def test_connection_info_for(self):
+        with patch.dict(
+            os.environ, {"ODOO_QUEUE_JOB_JOBRUNNER_DB_HOST": "custom_host"}
+        ):
+            with patch("odoo.sql_db.connection_info_for") as mock_connection_info_for:
+                mock_connection_info_for.return_value = ("db_name", {})
+                connection_info = _connection_info_for("test_db")
+                self.assertEqual(connection_info["host"], "custom_host")
+
+    def test_create_socket_pair(self):
+        recv, send = self.runner._create_socket_pair()
+        self.assertIsInstance(recv, socket.socket)
+        self.assertIsInstance(send, socket.socket)
+
+    def test_run_jobs(self):
+        mock_db = MagicMock()
+        self.runner.db_by_name = {"test_db": mock_db}
+        mock_job = MagicMock()
+        mock_job.uuid = "test_uuid"
+        mock_job.db_name = "test_db"
+        # make call_soon_threadsafe invoke the callback synchronously
+        self.runner.loop = MagicMock()
+        self.runner.loop.call_soon_threadsafe.side_effect = (
+            lambda func, *args: func(*args)
+        )
+        with patch.object(
+            self.runner.channel_manager, "get_jobs_to_run", return_value=[mock_job]
+        ):
+            with patch(
+                "odoo.addons.queue_job.jobrunner.runner.start_async_http_get"
+            ) as mock_async_get:
+                self.runner.run_jobs()
+        mock_db.set_job_enqueued.assert_called_once_with("test_uuid")
+        mock_async_get.assert_called_once_with(
+            "http", "localhost", 8069, None, None, "test_db", "test_uuid"
+        )
+
+    def test_wait_notification(self):
+        mock_db = MagicMock()
+        self.runner.db_by_name = {"test_db": mock_db}
+        mock_key = MagicMock()
+        mock_key.fileobj = mock_db.conn
+        with patch(
+            "odoo.addons.queue_job.jobrunner.runner.selectors.DefaultSelector"
+        ) as mock_selector_cls:
+            mock_selector = mock_selector_cls.return_value.__enter__.return_value
+            mock_selector.select.return_value = [(mock_key, selectors.EVENT_READ)]
+            self.runner.wait_notification()
+        mock_db.conn.poll.assert_called_once()
+
+    def test_run(self):
+        with patch.object(
+            self.runner, "initialize_databases"
+        ) as mock_init, patch.object(self.runner, "close_databases") as mock_close:
+            # stop the runner right after initialization so run() returns
+            mock_init.side_effect = lambda: setattr(self.runner, "_stop", True)
+            self.runner.run()
+        mock_init.assert_called_once()
+        mock_close.assert_called_once_with(remove_jobs=False)
+
+    def test_stop(self):
+        with patch.object(self.runner, "loop") as mock_loop, patch.object(
+            self.runner, "_stop_sock_send"
+        ) as mock_send, patch.object(self.runner, "_new_db_check_thread"):
+            self.runner.stop()
+        self.assertTrue(self.runner._stop)
+        mock_loop.call_soon_threadsafe.assert_called_once_with(mock_loop.stop)
+        mock_send.send.assert_called_once_with(b"stop")
+
+    def test_start_async_http_get_event_loop_running(self):
+        with patch(
+            "odoo.addons.queue_job.jobrunner.runner.asyncio.get_event_loop"
+        ) as mock_get_loop:
+            mock_loop = MagicMock()
+            mock_loop.is_running.return_value = True
+            mock_get_loop.return_value = mock_loop
+            start_async_http_get(
+                "http", "localhost", 8069, None, None, "test_db", "test_uuid"
+            )
+            mock_loop.create_task.assert_called_once()
+
+    def test_start_async_http_get_event_loop_not_running(self):
+        with patch(
+            "odoo.addons.queue_job.jobrunner.runner.asyncio.get_event_loop"
+        ) as mock_get_loop, patch(
+            "odoo.addons.queue_job.jobrunner.runner.asyncio.run"
+        ) as mock_run:
+            mock_get_loop.return_value.is_running.return_value = False
+            start_async_http_get(
+                "http", "localhost", 8069, None, None, "test_db", "test_uuid"
+            )
+            mock_run.assert_called_once()
+
+    def test_run_event_loop_start_stop(self):
+        # schedule an immediate stop so run_forever() returns right away
+        self.runner.loop.call_soon(self.runner.loop.stop)
+        self.runner._run_event_loop()
+        self.assertFalse(self.runner.loop.is_running())
diff --git a/requirements.txt b/requirements.txt
index b4d39fb9e0..2e94c64c60 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 # generated from manifests external_dependencies
+aiohttp
 requests
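For reference, the request the runner now issues through aiohttp can be
reproduced standalone; the database name, job UUID, and endpoint values below
are placeholders, not values from the patch:

.. code-block:: python

    import asyncio

    import aiohttp

    async def ping_runjob(db="mydb", job_uuid="0000-placeholder"):
        url = f"http://localhost:8069/queue_job/runjob?db={db}&job_uuid={job_uuid}"
        # auth is optional, exactly as in the runner; pass aiohttp.BasicAuth
        # credentials only when a jobrunner HTTP user is configured
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as response:
                return response.status

    # asyncio.run(ping_runjob())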