Skip to content

Commit

Permalink
[IMP] queue_job: HA job runner using session level advisory lock
Browse files Browse the repository at this point in the history
  • Loading branch information
sbidoul committed Jul 2, 2024
1 parent cefb0a8 commit 54620ab
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,17 @@

# NOTE(review): presumably the timeout, in seconds, of the selector wait in
# the runner main loop; its use is not visible in this hunk -- confirm.
SELECT_TIMEOUT = 60
# Seconds the runner sleeps before retrying after an exception or after
# losing the master election.
ERROR_RECOVERY_DELAY = 5
# Key passed to pg_try_advisory_lock() when a runner tries to become master.
PG_ADVISORY_LOCK_ID = 2293787760715711918

_logger = logging.getLogger(__name__)

# Alias for the default selector class provided by the selectors module.
select = selectors.DefaultSelector


class MasterElectionLost(Exception):
    """Signal that this runner could not acquire the master advisory lock.

    Raised by ``acquire_master_lock`` when ``pg_try_advisory_lock`` reports
    that the lock was not obtained; the runner's main loop catches it, closes
    its database connections, sleeps and retries.
    """


# Unfortunately, it is not possible to extend the Odoo
# server command line arguments, so we resort to environment variables
# to configure the runner (channels mostly).
Expand Down Expand Up @@ -280,6 +285,14 @@ def close(self):
pass
self.conn = None

def acquire_master_lock(self):
    """Try to take the master-runner advisory lock on this database.

    Runs ``pg_try_advisory_lock`` with ``PG_ADVISORY_LOCK_ID`` on this
    object's connection. The call does not block: it reports immediately
    whether the lock was obtained.

    Raises:
        MasterElectionLost: if the lock was not obtained. The database
            connection is closed before raising.
    """
    with closing(self.conn.cursor()) as cr:
        cr.execute("SELECT pg_try_advisory_lock(%s)", (PG_ADVISORY_LOCK_ID,))
        acquired = cr.fetchone()[0]
    # The cursor is closed at this point, so the connection can be torn
    # down without leaving an open cursor behind.
    if not acquired:
        self.close()
        msg = f"Could not acquire master runner lock on {self.db_name}"
        raise MasterElectionLost(msg)

Check warning on line 294 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L292-L294

Added lines #L292 - L294 were not covered by tests

def _has_queue_job(self):
with closing(self.conn.cursor()) as cr:
cr.execute(
Expand Down Expand Up @@ -409,7 +422,7 @@ def get_db_names(self):
db_names = config["db_name"].split(",")
else:
db_names = odoo.service.db.list_dbs(True)
return db_names
return sorted(db_names)

Check warning on line 425 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L425

Added line #L425 was not covered by tests

def close_databases(self, remove_jobs=True):
for db_name, db in self.db_by_name.items():
Expand All @@ -424,6 +437,7 @@ def close_databases(self, remove_jobs=True):
def initialize_databases(self):
for db_name in self.get_db_names():
db = Database(db_name)
db.acquire_master_lock()
if db.has_queue_job:
self.db_by_name[db_name] = db
with db.select_jobs("state in %s", (NOT_DONE,)) as cr:
Expand Down Expand Up @@ -516,7 +530,7 @@ def run(self):
while not self._stop:
# outer loop does exception recovery
try:
_logger.info("initializing database connections")
_logger.debug("initializing database connections")

Check warning on line 533 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L533

Added line #L533 was not covered by tests
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
Expand All @@ -531,6 +545,14 @@ def run(self):
except InterruptedError:
# Interrupted system call, i.e. KeyboardInterrupt during select
self.stop()
except MasterElectionLost as e:
_logger.debug(
"master election lost: %s, sleeping %ds and retrying",

Check warning on line 550 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L548-L550

Added lines #L548 - L550 were not covered by tests
e,
ERROR_RECOVERY_DELAY,
)
self.close_databases()
time.sleep(ERROR_RECOVERY_DELAY)

Check warning on line 555 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L553-L555

Added lines #L553 - L555 were not covered by tests
except Exception:
_logger.exception(
"exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY
Expand Down

0 comments on commit 54620ab

Please sign in to comment.