From ea8ecd72cf54502093919aa156905b4ec49d5cd8 Mon Sep 17 00:00:00 2001 From: munakoiso Date: Tue, 15 Oct 2024 18:38:20 +0500 Subject: [PATCH] release and recreate replication source lock when lock.acquire is failed (#39) --- src/main.py | 2 +- src/zk.py | 25 +++++++++++++++++-------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/main.py b/src/main.py index ca0fbce..00660b5 100644 --- a/src/main.py +++ b/src/main.py @@ -1169,7 +1169,7 @@ def _acquire_replication_source_slot_lock(self, source): 'Can not release old replication slot locks. We will skip it this time' ) # And acquire lock (then new_primary will create replication slot) - self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True) + self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True, release_on_fail=True) def _return_to_cluster(self, new_primary, role, is_dead=False): """ diff --git a/src/zk.py b/src/zk.py index 1a0e76e..448efae 100644 --- a/src/zk.py +++ b/src/zk.py @@ -197,7 +197,7 @@ def _init_lock(self, name, read_lock=False): lock = self._zk.Lock(path, helpers.get_hostname()) self._locks[name] = lock - def _acquire_lock(self, name, allow_queue, timeout, read_lock=False): + def _acquire_lock(self, name, allow_queue, timeout, read_lock=False, release_on_fail=False): if timeout is None: timeout = self._timeout if self._zk.state != KazooState.CONNECTED: @@ -220,14 +220,22 @@ def _acquire_lock(self, name, allow_queue, timeout, read_lock=False): logging.warning('%s lock is already taken by %s.', name[0].upper() + name[1:], contenders[0]) return False try: - return lock.acquire(blocking=True, timeout=timeout) + acquired = lock.acquire(blocking=True, timeout=timeout) + if not acquired: + logging.warning('Unable to acquire lock "%s", but not because of timeout...', name) except LockTimeout: logging.warning('Unable to obtain lock %s within timeout (%s s)', name, timeout) - return False + acquired = False except Exception: for line in traceback.format_exc().split('\n'): logging.error(line.rstrip()) - return False + acquired = False + if not acquired and release_on_fail: + logging.debug('Try to release and delete lock "%s", to recreate on next iter', name) + self.release_lock(name) + if name in self._locks: + del self._locks[name] + return acquired def _release_lock(self, name): if name in self._locks: @@ -482,18 +490,19 @@ def get_current_lock_holder(self, name=None, catch_except=True): else: return None - def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False): - result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock) + def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False): + result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail) if not result: + raise ZookeeperException(f'Failed to acquire lock {lock_type}') logging.debug(f'Success acquire lock: {lock_type}') - def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False): + def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False): """ Acquire lock (leader by default) """ lock_type = lock_type or self.PRIMARY_LOCK_PATH - return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock) + return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail) def release_lock(self, lock_type=None, wait=0): """