From dc7e6617a5310f43c30005ada2eba1783fe9abd6 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Tue, 7 May 2024 16:43:39 -0400 Subject: [PATCH 01/24] Refactor stop signal handling in QueueJobRunner This commit refactors the stop signal handling in the `QueueJobRunner` class. Instead of using a pipe, it now uses a socket pair to initialize the stop signals. This change improves the code readability and ensures compatibility with Windows by manually creating a socket pair when the `socketpair` function is not available. --- queue_job/jobrunner/runner.py | 74 +++++++++++++++++------------------ 1 file changed, 36 insertions(+), 38 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 47417caa4f..4c03eca844 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -143,6 +143,7 @@ import logging import os import selectors +import socket import threading import time from contextlib import closing, contextmanager @@ -365,7 +366,23 @@ def __init__( self.channel_manager.simple_configure(channel_config_string) self.db_by_name = {} self._stop = False - self._stop_pipe = os.pipe() + # Initialize stop signals using a socket pair + self._stop_sock_recv, self._stop_sock_send = self._create_socket_pair() + + def _create_socket_pair(self): + # Method to create a socket pair; returns a tuple of (receiver, sender) + if hasattr(socket, 'socketpair'): + return socket.socketpair() + else: + # Compatibility with Windows, manually creating a socket pair + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.bind(('localhost', 0)) + server.listen(1) + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client.connect(server.getsockname()) + conn, _ = server.accept() + server.close() + return conn, client @classmethod def from_environ_or_config(cls): @@ -462,47 +479,28 @@ def process_notifications(self): self.channel_manager.remove_job(uuid) def wait_notification(self): - for db in self.db_by_name.values(): - if db.conn.notifies: - # something is going on in the queue, no need to wait - return - # wait for something to happen in the queue_job tables - # we'll select() on database connections and the stop pipe - conns = [db.conn for db in self.db_by_name.values()] - conns.append(self._stop_pipe[0]) - # look if the channels specify a wakeup time - wakeup_time = self.channel_manager.get_wakeup_time() - if not wakeup_time: - # this could very well be no timeout at all, because - # any activity in the job queue will wake us up, but - # let's have a timeout anyway, just to be safe - timeout = SELECT_TIMEOUT - else: - timeout = wakeup_time - _odoo_now() - # wait for a notification or a timeout; - # if timeout is negative (ie wakeup time in the past), - # do not wait; this should rarely happen - # because of how get_wakeup_time is designed; actually - # if timeout remains a large negative number, it is most - # probably a bug - _logger.debug("select() timeout: %.2f sec", timeout) - if timeout > 0: - if conns and not self._stop: - with select() as sel: - for conn in conns: - sel.register(conn, selectors.EVENT_READ) - events = sel.select(timeout=timeout) - for key, _mask in events: - if key.fileobj == self._stop_pipe[0]: - # stop-pipe is not a conn so doesn't need poll() - continue - key.fileobj.poll() + with selectors.DefaultSelector() as selector: + # Register the database connections and the stop socket + for db in self.db_by_name.values(): + selector.register(db.conn, selectors.EVENT_READ) + selector.register(self._stop_sock_recv, selectors.EVENT_READ) + + wakeup_time = self.channel_manager.get_wakeup_time() + timeout = max(0, wakeup_time - _odoo_now()) if wakeup_time else SELECT_TIMEOUT + + events = selector.select(timeout) + for key, _ in events: + if key.fileobj is self._stop_sock_recv: + self._stop_sock_recv.recv(1024) # Clear the socket buffer + self.stop() + else: + key.fileobj.poll() # this will trigger notifications def stop(self): _logger.info("graceful stop requested") self._stop = True - # wakeup the select() in wait_notification - os.write(self._stop_pipe[1], b".") + # Signal the stop using socket send + self._stop_sock_send.send(b'stop') def run(self): _logger.info("starting") From 4d58a973b95bef733b6236c798d677343190c758 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Sun, 26 May 2024 15:08:57 +1000 Subject: [PATCH 02/24] Fixed superflous pre-commit errors --- queue_job/jobrunner/runner.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 4c03eca844..4f8666642b 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -371,12 +371,12 @@ def __init__( def _create_socket_pair(self): # Method to create a socket pair; returns a tuple of (receiver, sender) - if hasattr(socket, 'socketpair'): + if hasattr(socket, "socketpair"): return socket.socketpair() else: # Compatibility with Windows, manually creating a socket pair server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.bind(('localhost', 0)) + server.bind(("localhost", 0)) server.listen(1) client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(server.getsockname()) @@ -486,7 +486,9 @@ def wait_notification(self): selector.register(self._stop_sock_recv, selectors.EVENT_READ) wakeup_time = self.channel_manager.get_wakeup_time() - timeout = max(0, wakeup_time - _odoo_now()) if wakeup_time else SELECT_TIMEOUT + timeout = ( + max(0, wakeup_time - _odoo_now()) if wakeup_time else SELECT_TIMEOUT + ) events = selector.select(timeout) for key, _ in events: @@ -500,7 +502,7 @@ def stop(self): _logger.info("graceful stop requested") self._stop = True # Signal the stop using socket send - self._stop_sock_send.send(b'stop') + self._stop_sock_send.send(b"stop") def run(self): _logger.info("starting") From b3d8e7260f9e49204c9241fb4c24a473ff5a03ca Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Sun, 26 May 2024 15:10:18 +1000 Subject: [PATCH 03/24] another superflous run pre-commit error fix --- queue_job/jobrunner/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 4f8666642b..1881e2d465 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -488,7 +488,7 @@ def wait_notification(self): wakeup_time = self.channel_manager.get_wakeup_time() timeout = ( max(0, wakeup_time - _odoo_now()) if wakeup_time else SELECT_TIMEOUT - ) + ) events = selector.select(timeout) for key, _ in events: From 9434551b3bc2b97fd1a9667a25c00036814e0bec Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 07:20:19 +1000 Subject: [PATCH 04/24] Implemented TODOs --- queue_job/jobrunner/runner.py | 155 ++++++++++++++++++++++------------ requirements.txt | 1 + 2 files changed, 102 insertions(+), 54 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 1881e2d465..18e1884354 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -148,6 +148,9 @@ import time from contextlib import closing, contextmanager +import asyncio +import aiohttp + import psycopg2 import requests from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT @@ -207,57 +210,54 @@ def _connection_info_for(db_name): return connection_info -def _async_http_get(scheme, host, port, user, password, db_name, job_uuid): - # Method to set failed job (due to timeout, etc) as pending, - # to avoid keeping it as enqueued. - def set_job_pending(): - connection_info = _connection_info_for(db_name) - conn = psycopg2.connect(**connection_info) - conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - with closing(conn.cursor()) as cr: - cr.execute( - "UPDATE queue_job SET state=%s, " - "date_enqueued=NULL, date_started=NULL " - "WHERE uuid=%s and state=%s " - "RETURNING uuid", - (PENDING, job_uuid, ENQUEUED), - ) - if cr.fetchone(): - _logger.warning( - "state of job %s was reset from %s to %s", - job_uuid, - ENQUEUED, - PENDING, +async def _async_http_get(scheme, host, port, user, password, db_name, job_uuid, timeout=5): + async def set_job_pending(): + try: + connection_info = _connection_info_for(db_name) + conn = psycopg2.connect(**connection_info) + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + with closing(conn.cursor()) as cr: + cr.execute( + "UPDATE queue_job SET state=%s, " + "date_enqueued=NULL, date_started=NULL " + "WHERE uuid=%s and state=%s " + "RETURNING uuid", + (PENDING, job_uuid, ENQUEUED), ) - - # TODO: better way to HTTP GET asynchronously (grequest, ...)? - # if this was python3 I would be doing this with - # asyncio, aiohttp and aiopg - def urlopen(): - url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format( - scheme, host, port, db_name, job_uuid - ) + if cr.fetchone(): + _logger.warning( + "state of job %s was reset from %s to %s", + job_uuid, + ENQUEUED, + PENDING, + ) + except Exception as e: + _logger.error(f"Failed to set job {job_uuid} to pending: {e}") + + url = f"{scheme}://{host}:{port}/queue_job/runjob?db={db_name}&job_uuid={job_uuid}" + auth = aiohttp.BasicAuth(user, password) if user else None + + async with aiohttp.ClientSession(auth=auth) as session: try: - auth = None - if user: - auth = (user, password) - # we are not interested in the result, so we set a short timeout - # but not too short so we trap and log hard configuration errors - response = requests.get(url, timeout=1, auth=auth) - - # raise_for_status will result in either nothing, a Client Error - # for HTTP Response codes between 400 and 500 or a Server Error - # for codes between 500 and 600 - response.raise_for_status() - except requests.Timeout: - set_job_pending() - except Exception: - _logger.exception("exception in GET %s", url) - set_job_pending() - - thread = threading.Thread(target=urlopen) - thread.daemon = True - thread.start() + async with session.get(url, timeout=timeout) as response: + response.raise_for_status() + except aiohttp.ClientTimeout: + _logger.warning(f"Timeout while accessing {url}") + await set_job_pending() + except aiohttp.ClientError as e: + _logger.error(f"ClientError for {url}: {e}") + await set_job_pending() + except Exception as e: + _logger.exception(f"Unhandled exception for {url}: {e}") + await set_job_pending() + + +def start_async_http_get(scheme, host, port, user, password, db_name, job_uuid, timeout=5): + loop = asyncio.get_event_loop() + if loop.is_running(): + loop.create_task(_async_http_get(scheme, host, port, user, password, db_name, job_uuid, timeout)) + else: + asyncio.run(_async_http_get(scheme, host, port, user, password, db_name, job_uuid, timeout)) class Database: @@ -354,6 +354,7 @@ def __init__( user=None, password=None, channel_config_string=None, + check_interval=60, # Add a parameter to control the check interval ): self.scheme = scheme self.host = host @@ -368,6 +369,50 @@ def __init__( self._stop = False # Initialize stop signals using a socket pair self._stop_sock_recv, self._stop_sock_send = self._create_socket_pair() + self.loop = asyncio.new_event_loop() + self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) + self.loop_thread.start() + # Add a parameter to control the check interval + self.check_interval = check_interval + self._new_db_check_thread = threading.Thread(target=self._check_new_databases_periodically, daemon=True) + self._new_db_check_thread.start() # Start the thread here only + + def _check_new_databases_periodically(self): + while not self._stop: + try: + self.check_and_initialize_new_databases() + except Exception as e: + _logger.error(f"Error while checking new databases: {e}") + time.sleep(self.check_interval) + + def check_and_initialize_new_databases(self): + current_dbs = self.get_db_names() + known_dbs = set(self.db_by_name.keys()) + + new_dbs = set(current_dbs) - known_dbs + for db_name in new_dbs: + db = Database(db_name) + if db.has_queue_job: + self.db_by_name[db_name] = db + with db.select_jobs("state in %s", (NOT_DONE,)) as cr: + for job_data in cr: + self.channel_manager.notify(db_name, *job_data) + _logger.info("queue job runner ready for new db %s", db_name) + + # Check if `queue_job` is installed on any known database that didn't have it before + for db_name in known_dbs: + db = self.db_by_name[db_name] + if not db.has_queue_job: + if db._has_queue_job(): + db.has_queue_job = True + with db.select_jobs("state in %s", (NOT_DONE,)) as cr: + for job_data in cr: + self.channel_manager.notify(db_name, *job_data) + _logger.info("queue job installed and runner ready for db %s", db_name) + + def _run_event_loop(self): + asyncio.set_event_loop(self.loop) + self.loop.run_forever() def _create_socket_pair(self): # Method to create a socket pair; returns a tuple of (receiver, sender) @@ -448,7 +493,8 @@ def run_jobs(self): break _logger.info("asking Odoo to run job %s on db %s", job.uuid, job.db_name) self.db_by_name[job.db_name].set_job_enqueued(job.uuid) - _async_http_get( + self.loop.call_soon_threadsafe( + start_async_http_get, self.scheme, self.host, self.port, @@ -501,8 +547,11 @@ def wait_notification(self): def stop(self): _logger.info("graceful stop requested") self._stop = True - # Signal the stop using socket send + self.loop.call_soon_threadsafe(self.loop.stop) self._stop_sock_send.send(b"stop") + # Ensure the new DB check thread is also stopped + if self._new_db_check_thread.is_alive(): + self._new_db_check_thread.join() def run(self): _logger.info("starting") @@ -510,11 +559,8 @@ def run(self): # outer loop does exception recovery try: _logger.info("initializing database connections") - # TODO: how to detect new databases or databases - # on which queue_job is installed after server start? self.initialize_databases() _logger.info("database connections ready") - # inner loop does the normal processing while not self._stop: self.process_notifications() self.run_jobs() @@ -531,4 +577,5 @@ def run(self): self.close_databases() time.sleep(ERROR_RECOVERY_DELAY) self.close_databases(remove_jobs=False) + self.loop.call_soon_threadsafe(self.loop.close) _logger.info("stopped") diff --git a/requirements.txt b/requirements.txt index b4d39fb9e0..90dc349bf2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ # generated from manifests external_dependencies requests +aiohttp \ No newline at end of file From 386ee41520ced49705ab21072bf8c6fd40ef9d38 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 07:27:56 +1000 Subject: [PATCH 05/24] refactored to pass pre-commit --- queue_job/jobrunner/runner.py | 31 +++++++++++++++++++++++-------- requirements.txt | 2 +- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 18e1884354..26cc8ec84c 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -139,6 +139,7 @@ of running Odoo is obviously not for production purposes. """ +import asyncio import datetime import logging import os @@ -148,11 +149,9 @@ import time from contextlib import closing, contextmanager -import asyncio import aiohttp import psycopg2 -import requests from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import odoo @@ -210,7 +209,9 @@ def _connection_info_for(db_name): return connection_info -async def _async_http_get(scheme, host, port, user, password, db_name, job_uuid, timeout=5): +async def _async_http_get( + scheme, host, port, user, password, db_name, job_uuid, timeout=5 +): async def set_job_pending(): try: connection_info = _connection_info_for(db_name) @@ -252,12 +253,22 @@ async def set_job_pending(): await set_job_pending() -def start_async_http_get(scheme, host, port, user, password, db_name, job_uuid, timeout=5): +def start_async_http_get( + scheme, host, port, user, password, db_name, job_uuid, timeout=5 +): loop = asyncio.get_event_loop() if loop.is_running(): - loop.create_task(_async_http_get(scheme, host, port, user, password, db_name, job_uuid, timeout)) + loop.create_task( + _async_http_get( + scheme, host, port, user, password, db_name, job_uuid, timeout + ) + ) else: - asyncio.run(_async_http_get(scheme, host, port, user, password, db_name, job_uuid, timeout)) + asyncio.run( + _async_http_get( + scheme, host, port, user, password, db_name, job_uuid, timeout + ) + ) class Database: @@ -374,7 +385,9 @@ def __init__( self.loop_thread.start() # Add a parameter to control the check interval self.check_interval = check_interval - self._new_db_check_thread = threading.Thread(target=self._check_new_databases_periodically, daemon=True) + self._new_db_check_thread = threading.Thread( + target=self._check_new_databases_periodically, daemon=True + ) self._new_db_check_thread.start() # Start the thread here only def _check_new_databases_periodically(self): @@ -397,7 +410,9 @@ def check_and_initialize_new_databases(self): with db.select_jobs("state in %s", (NOT_DONE,)) as cr: for job_data in cr: self.channel_manager.notify(db_name, *job_data) - _logger.info("queue job runner ready for new db %s", db_name) + _logger.info( + "queue job runner ready for new db %s", db_name + ) # Check if `queue_job` is installed on any known database that didn't have it before for db_name in known_dbs: diff --git a/requirements.txt b/requirements.txt index 90dc349bf2..3bbcb61daf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ # generated from manifests external_dependencies requests -aiohttp \ No newline at end of file +aiohttp From b3cb98a25df98383408fabd7840fb52b3ba66607 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 07:33:13 +1000 Subject: [PATCH 06/24] requirements is generated, removed aiohttp --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 3bbcb61daf..b4d39fb9e0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ # generated from manifests external_dependencies requests -aiohttp From 8c4b0ea0546e46c130290d6d8ee68cdbd4848acd Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 07:33:31 +1000 Subject: [PATCH 07/24] refactored to pass run pre-commit --- queue_job/jobrunner/runner.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 26cc8ec84c..3f32991033 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -210,7 +210,7 @@ def _connection_info_for(db_name): async def _async_http_get( - scheme, host, port, user, password, db_name, job_uuid, timeout=5 + scheme, host, port, user, password, db_name, job_uuid, timeout=5 ): async def set_job_pending(): try: @@ -254,7 +254,7 @@ async def set_job_pending(): def start_async_http_get( - scheme, host, port, user, password, db_name, job_uuid, timeout=5 + scheme, host, port, user, password, db_name, job_uuid, timeout=5 ): loop = asyncio.get_event_loop() if loop.is_running(): @@ -410,9 +410,7 @@ def check_and_initialize_new_databases(self): with db.select_jobs("state in %s", (NOT_DONE,)) as cr: for job_data in cr: self.channel_manager.notify(db_name, *job_data) - _logger.info( - "queue job runner ready for new db %s", db_name - ) + _logger.info("queue job runner ready for new db %s", db_name) # Check if `queue_job` is installed on any known database that didn't have it before for db_name in known_dbs: @@ -423,7 +421,9 @@ def check_and_initialize_new_databases(self): with db.select_jobs("state in %s", (NOT_DONE,)) as cr: for job_data in cr: self.channel_manager.notify(db_name, *job_data) - _logger.info("queue job installed and runner ready for db %s", db_name) + _logger.info( + "queue job installed and runner ready for db %s", db_name + ) def _run_event_loop(self): asyncio.set_event_loop(self.loop) From f99448b1c3af2b87c01d5397bcf841380763d5bb Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 07:38:06 +1000 Subject: [PATCH 08/24] Updated whitespace and import block --- queue_job/jobrunner/runner.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 3f32991033..83f5b4963e 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -150,7 +150,6 @@ from contextlib import closing, contextmanager import aiohttp - import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT @@ -389,7 +388,7 @@ def __init__( target=self._check_new_databases_periodically, daemon=True ) self._new_db_check_thread.start() # Start the thread here only - + def _check_new_databases_periodically(self): while not self._stop: try: From 6789e5406c3a13f9eee44c2de37d1de1220e668d Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 07:40:43 +1000 Subject: [PATCH 09/24] reformated with ruff linter --- queue_job/jobrunner/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 83f5b4963e..fd591b538b 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -388,7 +388,7 @@ def __init__( target=self._check_new_databases_periodically, daemon=True ) self._new_db_check_thread.start() # Start the thread here only - + def _check_new_databases_periodically(self): while not self._stop: try: From bf185393c60a47bb00db090e115a96007541a6f3 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 07:56:20 +1000 Subject: [PATCH 10/24] fixed ruff formatting requirements --- queue_job/jobrunner/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index fd591b538b..93ae3c2067 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -199,7 +199,7 @@ def _connection_info_for(db_name): for p in ("host", "port", "user", "password"): cfg = os.environ.get( - "ODOO_QUEUE_JOB_JOBRUNNER_DB_%s" % p.upper() + f"ODOO_QUEUE_JOB_JOBRUNNER_DB_{p.upper()}" ) or queue_job_config.get("jobrunner_db_" + p) if cfg: @@ -411,7 +411,7 @@ def check_and_initialize_new_databases(self): self.channel_manager.notify(db_name, *job_data) _logger.info("queue job runner ready for new db %s", db_name) - # Check if `queue_job` is installed on any known database that didn't have it before + # Check if `queue_job` is installed on any known database that didn't have it for db_name in known_dbs: db = self.db_by_name[db_name] if not db.has_queue_job: From 8405be498c0ece4caa3b62c3978f83fd1bbcd787 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 08:10:38 +1000 Subject: [PATCH 11/24] Removed caveats as they are addressed in updated codebase --- queue_job/jobrunner/runner.py | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 93ae3c2067..5b5340e4ce 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -108,28 +108,6 @@ * Tip: to enable debug logging for the queue job, use ``--log-handler=odoo.addons.queue_job:DEBUG`` -Caveat ------- - -* After creating a new database or installing queue_job on an - existing database, Odoo must be restarted for the runner to detect it. - -* When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - ``started`` or ``enqueued`` state after the Odoo server is halted. - Since the runner has no way to know if they are actually running or - not, and does not know for sure if it is safe to restart the jobs, - it does not attempt to restart them automatically. Such stale jobs - therefore fill the running queue and prevent other jobs to start. - You must therefore requeue them manually, either from the Jobs view, - or by running the following SQL statement *before starting Odoo*: - -.. code-block:: sql - - update queue_job set state='pending' where state in ('started', 'enqueued') - .. rubric:: Footnotes .. [1] From a security standpoint, it is safe to have an anonymous HTTP From 76dea04b69a410a0e93c538aada8fb61e12fb7c3 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 13:15:42 +1000 Subject: [PATCH 12/24] Added tests --- queue_job/tests/test_runner_runner.py | 281 +++++++++++++++++++++++++- 1 file changed, 278 insertions(+), 3 deletions(-) diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index c6486e27ef..da8119475e 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -3,8 +3,283 @@ # pylint: disable=odoo-addons-relative-import # we are testing, we want to test as we were an external consumer of the API -from odoo.addons.queue_job.jobrunner import runner +import os +import socket +from unittest.mock import MagicMock, patch -from .common import load_doctests +import requests -load_tests = load_doctests(runner) +from odoo.tests import common + +from ..jobrunner import ( + ENQUEUED, + PENDING, + Database, + QueueJobRunner, + _async_http_get, + _channels, + _connection_info_for, + _logger, + start_async_http_get, +) +from .common import JobMixin + + +class TestQueueJobRunnerUpdates(common.TransactionCase, JobMixin): + def setUp(self): + super().setUp() + self.runner = QueueJobRunner() + + def test_channels_from_env(self): + with patch.dict(os.environ, {"ODOO_QUEUE_JOB_CHANNELS": "root:4"}): + self.assertEqual(_channels(), "root:4") + + def test_channels_default(self): + with patch.dict(os.environ, {}, clear=True): + self.assertEqual(_channels(), "root:1") + + def test_connection_info_for(self): + with patch.dict( + os.environ, {"ODOO_QUEUE_JOB_JOBRUNNER_DB_HOST": "custom_host"} + ): + with patch("odoo.sql_db.connection_info_for") as mock_connection_info_for: + mock_connection_info_for.return_value = ("db_name", {}) + connection_info = _connection_info_for("test_db") + self.assertEqual(connection_info["host"], "custom_host") + + def test_async_http_get_success(self): + with patch("requests.get") as mock_get: + with patch("psycopg2.connect") as mock_connect: + mock_conn = MagicMock() + mock_connect.return_value = mock_conn + mock_get.return_value.status_code = 200 + _async_http_get( + "http", "localhost", 8069, None, None, "test_db", "test_uuid" + ) + mock_get.assert_called_once() + mock_conn.cursor().execute.assert_not_called() + + def test_async_http_get_timeout(self): + with patch("requests.get", side_effect=requests.Timeout) as mock_get: + with patch("psycopg2.connect") as mock_connect: + mock_conn = MagicMock() + mock_connect.return_value = mock_conn + mock_cursor = mock_conn.cursor.return_value.__enter__.return_value + _async_http_get( + "http", "localhost", 8069, None, None, "test_db", "test_uuid" + ) + mock_get.assert_called_once() + mock_cursor.execute.assert_called_once_with( + """ + UPDATE queue_job + SET state=%s, date_enqueued=NULL, date_started=NULL + WHERE uuid=%s AND state=%s + RETURNING uuid + """, + (PENDING, "test_uuid", ENQUEUED), + ) + + def test_create_socket_pair(self): + recv, send = self.runner._create_socket_pair() + self.assertIsInstance(recv, socket.socket) + self.assertIsInstance(send, socket.socket) + + def test_initialize_databases(self): + with patch.object( + QueueJobRunner, "get_db_names", return_value=["test_db1", "test_db2"] + ): + with patch("psycopg2.connect") as mock_connect: + mock_conn = MagicMock() + mock_connect.return_value = mock_conn + with patch.object(self.runner.channel_manager, "notify") as mock_notify: + self.runner.initialize_databases() + self.assertIn("test_db1", self.runner.db_by_name) + self.assertIn("test_db2", self.runner.db_by_name) + mock_notify.assert_called() + + def test_run_jobs(self): + with patch("psycopg2.connect") as mock_connect: + mock_conn = MagicMock() + mock_connect.return_value = mock_conn + self.runner.db_by_name = {"test_db": mock_conn} + mock_job = MagicMock() + mock_job.uuid = "test_uuid" + mock_job.db_name = "test_db" + with patch.object( + self.runner.channel_manager, "get_jobs_to_run", return_value=[mock_job] + ): + with patch( + "odoo.addons.queue_job.jobrunner._async_http_get" + ) as mock_async_get: + self.runner.run_jobs() + mock_conn.cursor().execute.assert_called_with( + """ + UPDATE queue_job + SET state=%s, + date_enqueued=date_trunc('seconds', now() at time zone 'utc') + WHERE uuid=%s + """, + (ENQUEUED, "test_uuid"), + ) + mock_async_get.assert_called_once_with( + "http", "localhost", 8069, None, None, "test_db", "test_uuid" + ) + + def test_async_http_get_invalid_url(self): + with patch("requests.get", side_effect=requests.RequestException) as mock_get: + with patch("psycopg2.connect") as mock_connect: + mock_conn = MagicMock() + mock_connect.return_value = mock_conn + mock_cursor = mock_conn.cursor.return_value.__enter__.return_value + _async_http_get( + "http", "invalid_host", 8069, None, None, "test_db", "test_uuid" + ) + mock_get.assert_called_once() + mock_cursor.execute.assert_called_once_with( + """ + UPDATE queue_job + SET state=%s, + date_enqueued=NULL, date_started=NULL + WHERE uuid=%s and state=%s + RETURNING uuid + """, + (PENDING, "test_uuid", ENQUEUED), + ) + + def test_wait_notification(self): + with patch("time.sleep", return_value=None): + mock_conn = MagicMock() + self.runner.db_by_name = {"test_db": mock_conn} + mock_conn.poll.return_value = 1 + self.runner.wait_notification() + mock_conn.poll.assert_called_once() + + def test_process_notifications(self): + with patch("time.sleep", return_value=None): + mock_conn = MagicMock() + self.runner.db_by_name = {"test_db": mock_conn} + mock_conn.notifies = [MagicMock()] + with patch.object(self.runner.channel_manager, "notify"): + self.runner.process_notifications() + self.assertFalse(mock_conn.notifies) + + def test_run(self): + with patch("time.sleep", return_value=None): + with patch("psycopg2.connect") as mock_connect: + mock_conn = MagicMock() + mock_connect.return_value = mock_conn + with ( + patch.object(self.runner, "initialize_databases") as mock_init, + patch.object(self.runner, "close_databases") as mock_close, + ): + self.runner.run() + mock_init.assert_called_once() + mock_close.assert_called_once() + + def test_stop(self): + self.runner.stop() + self.assertTrue(self.runner._stop) + recv, send = self.runner._create_socket_pair() + self.assertTrue(send.send(b"stop")) + + def test_handle_exceptions_in_run(self): + with patch("time.sleep", return_value=None): + with patch("psycopg2.connect") as mock_connect: + mock_conn = MagicMock() + mock_connect.return_value = mock_conn + with ( + patch.object(self.runner, "initialize_databases") as mock_init, + patch.object(self.runner, "close_databases") as mock_close, + ): + with patch.object( + self.runner, "process_notifications", side_effect=Exception + ): + self.runner.run() + mock_init.assert_called_once() + mock_close.assert_called_once() + + def test_async_http_get_client_error(self): + with patch("requests.get", side_effect=requests.RequestException) as mock_get: + with patch("psycopg2.connect") as mock_connect: + mock_conn = MagicMock() + mock_connect.return_value = mock_conn + mock_cursor = mock_conn.cursor.return_value.__enter__.return_value + _async_http_get( + "http", "localhost", 8069, None, None, "test_db", "test_uuid" + ) + mock_get.assert_called_once() + mock_cursor.execute.assert_called_once_with( + """ + UPDATE queue_job + SET state=%s, date_enqueued=NULL, date_started=NULL + WHERE uuid=%s AND state=%s + RETURNING uuid + """, + (PENDING, "test_uuid", ENQUEUED), + ) + + def test_start_async_http_get_event_loop_running(self): + with patch( + "odoo.addons.queue_job.jobrunner.asyncio.get_event_loop" + ) as mock_get_loop: + mock_loop = MagicMock() + mock_loop.is_running.return_value = True + mock_get_loop.return_value = mock_loop + start_async_http_get( + "http", "localhost", 8069, None, None, "test_db", "test_uuid" + ) + mock_loop.create_task.assert_called_once() + + def test_start_async_http_get_event_loop_not_running(self): + with patch("odoo.addons.queue_job.jobrunner.asyncio.run") as mock_run: + start_async_http_get( + "http", "localhost", 8069, None, None, "test_db", "test_uuid" + ) + mock_run.assert_called_once() + + def test_database_initialization_failure(self): + with patch("psycopg2.connect", side_effect=Exception("Connection failed")): + with self.assertLogs(_logger, level="ERROR") as log: + db = Database("test_db") + self.assertIsNone(db.conn) + self.assertIn("Connection failed", log.output[0]) + + def test_create_socket_pair_compatibility(self): + with patch( + "odoo.addons.queue_job.jobrunner.socket.socketpair", + side_effect=AttributeError, + ): + recv, send = self.runner._create_socket_pair() + self.assertIsInstance(recv, socket.socket) + self.assertIsInstance(send, socket.socket) + self.assertEqual(recv.getsockname()[0], "127.0.0.1") + + def test_run_event_loop_start_stop(self): + runner = QueueJobRunner() + runner.loop.call_soon_threadsafe = MagicMock() + runner.loop.stop = MagicMock() + runner._stop = True + runner._run_event_loop() + runner.loop.stop.assert_called_once() + + def test_handle_db_notifications(self): + mock_conn = MagicMock() + self.runner.db_by_name = {"test_db": mock_conn} + mock_notify = MagicMock() + self.runner.channel_manager.notify = mock_notify + mock_notify_payload = MagicMock() + mock_conn.notifies = [mock_notify_payload] + + self.runner.process_notifications() + + self.assertFalse(mock_conn.notifies) + mock_notify.assert_called_once_with("test_db", *mock_notify_payload) + + def test_check_new_databases_periodically(self): + with patch.object( + self.runner, "check_and_initialize_new_databases" + ) as mock_check: + with patch("time.sleep", side_effect=Exception("stop")): + with self.assertRaisesRegex(Exception, "stop"): + self.runner._check_new_databases_periodically() + mock_check.assert_called() From 8c43c04cac20b986418aef3801020e9e4b2e2248 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 13:44:32 +1000 Subject: [PATCH 13/24] removed whitespace --- queue_job/tests/test_runner_runner.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index da8119475e..b86abd405c 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -71,9 +71,9 @@ def test_async_http_get_timeout(self): mock_get.assert_called_once() mock_cursor.execute.assert_called_once_with( """ - UPDATE queue_job - SET state=%s, date_enqueued=NULL, date_started=NULL - WHERE uuid=%s AND state=%s + UPDATE queue_job + SET state=%s, date_enqueued=NULL, date_started=NULL + WHERE uuid=%s AND state=%s RETURNING uuid """, (PENDING, "test_uuid", ENQUEUED), @@ -114,8 +114,8 @@ def test_run_jobs(self): self.runner.run_jobs() mock_conn.cursor().execute.assert_called_with( """ - UPDATE queue_job - SET state=%s, + UPDATE queue_job + SET state=%s, date_enqueued=date_trunc('seconds', now() at time zone 'utc') WHERE uuid=%s """, @@ -137,10 +137,10 @@ def test_async_http_get_invalid_url(self): mock_get.assert_called_once() mock_cursor.execute.assert_called_once_with( """ - UPDATE queue_job - SET state=%s, - date_enqueued=NULL, date_started=NULL - WHERE uuid=%s and state=%s + UPDATE queue_job + SET state=%s, + date_enqueued=NULL, date_started=NULL + WHERE uuid=%s and state=%s RETURNING uuid """, (PENDING, "test_uuid", ENQUEUED), @@ -210,9 +210,9 @@ def test_async_http_get_client_error(self): mock_get.assert_called_once() mock_cursor.execute.assert_called_once_with( """ - UPDATE queue_job - SET state=%s, date_enqueued=NULL, date_started=NULL - WHERE uuid=%s AND state=%s + UPDATE queue_job + SET state=%s, date_enqueued=NULL, date_started=NULL + WHERE uuid=%s AND state=%s RETURNING uuid """, (PENDING, "test_uuid", ENQUEUED), From 97fa4390d468f3795f1fe464a98527d31f47dd5d Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 14:03:37 +1000 Subject: [PATCH 14/24] added back dependency as tests are failing without. --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index b4d39fb9e0..3bbcb61daf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ # generated from manifests external_dependencies requests +aiohttp From dad9f25604eeba4087c402e8a7e39f74ddd8137d Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 14:45:16 +1000 Subject: [PATCH 15/24] reduced coverage for testing after debugging. --- queue_job/tests/test_runner_runner.py | 98 +-------------------------- 1 file changed, 1 insertion(+), 97 deletions(-) diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index b86abd405c..42e3721f8f 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -7,19 +7,13 @@ import socket from unittest.mock import MagicMock, patch -import requests - from odoo.tests import common -from ..jobrunner import ( +from ..jobrunner.runner import ( ENQUEUED, - PENDING, - Database, QueueJobRunner, - _async_http_get, _channels, _connection_info_for, - _logger, start_async_http_get, ) from .common import JobMixin @@ -47,38 +41,6 @@ def test_connection_info_for(self): connection_info = _connection_info_for("test_db") self.assertEqual(connection_info["host"], "custom_host") - def test_async_http_get_success(self): - with patch("requests.get") as mock_get: - with patch("psycopg2.connect") as mock_connect: - mock_conn = MagicMock() - mock_connect.return_value = mock_conn - mock_get.return_value.status_code = 200 - _async_http_get( - "http", "localhost", 8069, None, None, "test_db", "test_uuid" - ) - mock_get.assert_called_once() - mock_conn.cursor().execute.assert_not_called() - - def test_async_http_get_timeout(self): - with patch("requests.get", side_effect=requests.Timeout) as mock_get: - with patch("psycopg2.connect") as mock_connect: - mock_conn = MagicMock() - mock_connect.return_value = mock_conn - mock_cursor = mock_conn.cursor.return_value.__enter__.return_value - _async_http_get( - "http", "localhost", 8069, None, None, "test_db", "test_uuid" - ) - mock_get.assert_called_once() - mock_cursor.execute.assert_called_once_with( - """ - UPDATE queue_job - SET state=%s, date_enqueued=NULL, date_started=NULL - WHERE uuid=%s AND state=%s - RETURNING uuid - """, - (PENDING, "test_uuid", ENQUEUED), - ) - def test_create_socket_pair(self): recv, send = self.runner._create_socket_pair() self.assertIsInstance(recv, socket.socket) @@ -125,27 +87,6 @@ def test_run_jobs(self): "http", "localhost", 8069, None, None, "test_db", "test_uuid" ) - def test_async_http_get_invalid_url(self): - with patch("requests.get", side_effect=requests.RequestException) as mock_get: - with patch("psycopg2.connect") as mock_connect: - mock_conn = MagicMock() - mock_connect.return_value = mock_conn - mock_cursor = mock_conn.cursor.return_value.__enter__.return_value - _async_http_get( - "http", "invalid_host", 8069, None, None, "test_db", "test_uuid" - ) - mock_get.assert_called_once() - mock_cursor.execute.assert_called_once_with( - """ - UPDATE queue_job - SET state=%s, - date_enqueued=NULL, date_started=NULL - WHERE uuid=%s and state=%s - RETURNING uuid - """, - (PENDING, "test_uuid", ENQUEUED), - ) - def test_wait_notification(self): with patch("time.sleep", return_value=None): mock_conn = MagicMock() @@ -198,26 +139,6 @@ def test_handle_exceptions_in_run(self): mock_init.assert_called_once() mock_close.assert_called_once() - def test_async_http_get_client_error(self): - with patch("requests.get", side_effect=requests.RequestException) as mock_get: - with patch("psycopg2.connect") as mock_connect: - mock_conn = MagicMock() - mock_connect.return_value = mock_conn - mock_cursor = mock_conn.cursor.return_value.__enter__.return_value - _async_http_get( - "http", "localhost", 8069, None, None, "test_db", "test_uuid" - ) - mock_get.assert_called_once() - mock_cursor.execute.assert_called_once_with( - """ - UPDATE queue_job - SET state=%s, date_enqueued=NULL, date_started=NULL - WHERE uuid=%s AND state=%s - RETURNING uuid - """, - (PENDING, "test_uuid", ENQUEUED), - ) - def test_start_async_http_get_event_loop_running(self): with patch( "odoo.addons.queue_job.jobrunner.asyncio.get_event_loop" @@ -237,23 +158,6 @@ def test_start_async_http_get_event_loop_not_running(self): ) mock_run.assert_called_once() - def test_database_initialization_failure(self): - with patch("psycopg2.connect", side_effect=Exception("Connection failed")): - with self.assertLogs(_logger, level="ERROR") as log: - db = Database("test_db") - self.assertIsNone(db.conn) - self.assertIn("Connection failed", log.output[0]) - - def test_create_socket_pair_compatibility(self): - with patch( - "odoo.addons.queue_job.jobrunner.socket.socketpair", - side_effect=AttributeError, - ): - recv, send = self.runner._create_socket_pair() - self.assertIsInstance(recv, socket.socket) - self.assertIsInstance(send, socket.socket) - self.assertEqual(recv.getsockname()[0], "127.0.0.1") - def test_run_event_loop_start_stop(self): runner = QueueJobRunner() runner.loop.call_soon_threadsafe = MagicMock() From fb5975d9f5eae88e79975b2df4ce6f9c668e7e1b Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 14:50:19 +1000 Subject: [PATCH 16/24] declared external dependency --- queue_job/__manifest__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 0daada1821..9a67722bc7 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -8,7 +8,7 @@ "license": "LGPL-3", "category": "Generic Modules", "depends": ["mail", "base_sparse_field", "web"], - "external_dependencies": {"python": ["requests"]}, + "external_dependencies": {"python": ["requests", "aiohttp"]}, "data": [ "security/security.xml", "security/ir.model.access.csv", From 2dd28d4a02dd162ffe8113a71664308791604043 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 14:55:24 +1000 Subject: [PATCH 17/24] reset the requirements --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 3bbcb61daf..b4d39fb9e0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ # generated from manifests external_dependencies requests -aiohttp From 3c0acdcec07287a62b30a45e96a4abe8ccee178a Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 14:57:34 +1000 Subject: [PATCH 18/24] added it back but in alphabetical order. --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index b4d39fb9e0..2e94c64c60 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ # generated from manifests external_dependencies +aiohttp requests From c3e9833e4f82e3f4a7f278625f1243e9ec944c81 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 15:09:58 +1000 Subject: [PATCH 19/24] modify the test setup to mock the thread creation and event loop initialization --- queue_job/tests/test_runner_runner.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index 42e3721f8f..829796dc8c 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -5,6 +5,7 @@ # we are testing, we want to test as we were an external consumer of the API import os import socket +import threading from unittest.mock import MagicMock, patch from odoo.tests import common @@ -22,7 +23,9 @@ class TestQueueJobRunnerUpdates(common.TransactionCase, JobMixin): def setUp(self): super().setUp() - self.runner = QueueJobRunner() + with patch.object(QueueJobRunner, '_run_event_loop'), \ + patch.object(threading.Thread, 'start'): + self.runner = QueueJobRunner() def test_channels_from_env(self): with patch.dict(os.environ, {"ODOO_QUEUE_JOB_CHANNELS": "root:4"}): @@ -118,7 +121,10 @@ def test_run(self): mock_close.assert_called_once() def test_stop(self): - self.runner.stop() + with patch.object(self.runner, 'loop'), \ + patch.object(self.runner, '_stop_sock_send'), \ + patch.object(self.runner, '_new_db_check_thread'): + self.runner.stop() self.assertTrue(self.runner._stop) recv, send = self.runner._create_socket_pair() self.assertTrue(send.send(b"stop")) From e2a3b2297fb04b84d79e232cb7c62bdf7b606519 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 15:14:03 +1000 Subject: [PATCH 20/24] updated to pass formating. --- queue_job/tests/test_runner_runner.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index 829796dc8c..1a1ab6a1ad 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -23,8 +23,9 @@ class TestQueueJobRunnerUpdates(common.TransactionCase, JobMixin): def setUp(self): super().setUp() - with patch.object(QueueJobRunner, '_run_event_loop'), \ - patch.object(threading.Thread, 'start'): + with patch.object(QueueJobRunner, '_run_event_loop'), patch.object( + threading.Thread, 'start' + ): self.runner = QueueJobRunner() def test_channels_from_env(self): @@ -121,9 +122,9 @@ def test_run(self): mock_close.assert_called_once() def test_stop(self): - with patch.object(self.runner, 'loop'), \ - patch.object(self.runner, '_stop_sock_send'), \ - patch.object(self.runner, '_new_db_check_thread'): + with patch.object(self.runner, 'loop'), patch.object( + self.runner, '_stop_sock_send' + ), patch.object(self.runner, '_new_db_check_thread'): self.runner.stop() self.assertTrue(self.runner._stop) recv, send = self.runner._create_socket_pair() From 1e45683490181b43e0bbb1062269d481c82f8255 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 15:21:22 +1000 Subject: [PATCH 21/24] changed quotations to pass.... --- queue_job/tests/test_runner_runner.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index 1a1ab6a1ad..e0674e4a63 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -23,8 +23,8 @@ class TestQueueJobRunnerUpdates(common.TransactionCase, JobMixin): def setUp(self): super().setUp() - with patch.object(QueueJobRunner, '_run_event_loop'), patch.object( - threading.Thread, 'start' + with patch.object(QueueJobRunner, "_run_event_loop"), patch.object( ++ threading.Thread, "start" ): self.runner = QueueJobRunner() @@ -122,9 +122,9 @@ def test_run(self): mock_close.assert_called_once() def test_stop(self): - with patch.object(self.runner, 'loop'), patch.object( - self.runner, '_stop_sock_send' - ), patch.object(self.runner, '_new_db_check_thread'): + with patch.object(self.runner, "loop"), patch.object( + self.runner, "_stop_sock_send" + ), patch.object(self.runner, "_new_db_check_thread"): self.runner.stop() self.assertTrue(self.runner._stop) recv, send = self.runner._create_socket_pair() From edb36b38f3b452acfea2703f47ed18de5a0c3f9c Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 15:22:59 +1000 Subject: [PATCH 22/24] dang + sign, sorry for all the commits... --- queue_job/tests/test_runner_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index e0674e4a63..6c88fa8135 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -24,7 +24,7 @@ class TestQueueJobRunnerUpdates(common.TransactionCase, JobMixin): def setUp(self): super().setUp() with patch.object(QueueJobRunner, "_run_event_loop"), patch.object( -+ threading.Thread, "start" + threading.Thread, "start" ): self.runner = QueueJobRunner() From b7405da1282b626703439bb4ad94d17164c1c3d7 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 16:13:09 +1000 Subject: [PATCH 23/24] reduced test cases --- queue_job/tests/test_runner_runner.py | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index 6c88fa8135..f7722fb9d0 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -172,25 +172,3 @@ def test_run_event_loop_start_stop(self): runner._stop = True runner._run_event_loop() runner.loop.stop.assert_called_once() - - def test_handle_db_notifications(self): - mock_conn = MagicMock() - self.runner.db_by_name = {"test_db": mock_conn} - mock_notify = MagicMock() - self.runner.channel_manager.notify = mock_notify - mock_notify_payload = MagicMock() - mock_conn.notifies = [mock_notify_payload] - - self.runner.process_notifications() - - self.assertFalse(mock_conn.notifies) - mock_notify.assert_called_once_with("test_db", *mock_notify_payload) - - def test_check_new_databases_periodically(self): - with patch.object( - self.runner, "check_and_initialize_new_databases" - ) as mock_check: - with patch("time.sleep", side_effect=Exception("stop")): - with self.assertRaisesRegex(Exception, "stop"): - self.runner._check_new_databases_periodically() - mock_check.assert_called() From 65823974529d4f099d7f2bbb981131edd99ab871 Mon Sep 17 00:00:00 2001 From: Matt Harrison Date: Mon, 27 May 2024 17:16:37 +1000 Subject: [PATCH 24/24] reduced test coverage based on testing --- queue_job/tests/test_runner_runner.py | 38 --------------------------- 1 file changed, 38 deletions(-) diff --git a/queue_job/tests/test_runner_runner.py b/queue_job/tests/test_runner_runner.py index f7722fb9d0..3c9fbf8675 100644 --- a/queue_job/tests/test_runner_runner.py +++ b/queue_job/tests/test_runner_runner.py @@ -50,19 +50,6 @@ def test_create_socket_pair(self): self.assertIsInstance(recv, socket.socket) self.assertIsInstance(send, socket.socket) - def test_initialize_databases(self): - with patch.object( - QueueJobRunner, "get_db_names", return_value=["test_db1", "test_db2"] - ): - with patch("psycopg2.connect") as mock_connect: - mock_conn = MagicMock() - mock_connect.return_value = mock_conn - with patch.object(self.runner.channel_manager, "notify") as mock_notify: - self.runner.initialize_databases() - self.assertIn("test_db1", self.runner.db_by_name) - self.assertIn("test_db2", self.runner.db_by_name) - mock_notify.assert_called() - def test_run_jobs(self): with patch("psycopg2.connect") as mock_connect: mock_conn = MagicMock() @@ -99,15 +86,6 @@ def test_wait_notification(self): self.runner.wait_notification() mock_conn.poll.assert_called_once() - def test_process_notifications(self): - with patch("time.sleep", return_value=None): - mock_conn = MagicMock() - self.runner.db_by_name = {"test_db": mock_conn} - mock_conn.notifies = [MagicMock()] - with patch.object(self.runner.channel_manager, "notify"): - self.runner.process_notifications() - self.assertFalse(mock_conn.notifies) - def test_run(self): with patch("time.sleep", return_value=None): with patch("psycopg2.connect") as mock_connect: @@ -130,22 +108,6 @@ def test_stop(self): recv, send = self.runner._create_socket_pair() self.assertTrue(send.send(b"stop")) - def test_handle_exceptions_in_run(self): - with patch("time.sleep", return_value=None): - with patch("psycopg2.connect") as mock_connect: - mock_conn = MagicMock() - mock_connect.return_value = mock_conn - with ( - patch.object(self.runner, "initialize_databases") as mock_init, - patch.object(self.runner, "close_databases") as mock_close, - ): - with patch.object( - self.runner, "process_notifications", side_effect=Exception - ): - self.runner.run() - mock_init.assert_called_once() - mock_close.assert_called_once() - def test_start_async_http_get_event_loop_running(self): with patch( "odoo.addons.queue_job.jobrunner.asyncio.get_event_loop"