Skip to content

Commit

Permalink
Merge branch 'main' into create_replication_slots_on_swithover
Browse files (browse the repository at this point in the history)
  • Loading branch information
munakoiso authored Oct 15, 2024
2 parents 7cf7260 + ea8ecd7 commit 946fc9b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ def _acquire_replication_source_slot_lock(self, source):
)
if source:
# And acquire lock (then new_primary will create replication slot)
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True)
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True, release_on_fail=True)

def _return_to_cluster(self, new_primary, role, is_dead=False):
"""
Expand Down
25 changes: 17 additions & 8 deletions src/zk.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def _init_lock(self, name, read_lock=False):
lock = self._zk.Lock(path, helpers.get_hostname())
self._locks[name] = lock

def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
def _acquire_lock(self, name, allow_queue, timeout, read_lock=False, release_on_fail=False):
if timeout is None:
timeout = self._timeout
if self._zk.state != KazooState.CONNECTED:
Expand All @@ -220,14 +220,22 @@ def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
logging.warning('%s lock is already taken by %s.', name[0].upper() + name[1:], contenders[0])
return False
try:
return lock.acquire(blocking=True, timeout=timeout)
acquired = lock.acquire(blocking=True, timeout=timeout)
if not acquired:
logging.warning('Unable to acquire lock "%s", but not because of timeout...', name)
except LockTimeout:
logging.warning('Unable to obtain lock %s within timeout (%s s)', name, timeout)
return False
acquired = False
except Exception:
for line in traceback.format_exc().split('\n'):
logging.error(line.rstrip())
return False
acquired = False
if not acquired and release_on_fail:
logging.debug('Try to release and delete lock "%s", to recreate on next iter', name)
self.release_lock(name)
if name in self._locks:
del self._locks[name]
return acquired

def _release_lock(self, name):
if name in self._locks:
Expand Down Expand Up @@ -482,18 +490,19 @@ def get_current_lock_holder(self, name=None, catch_except=True):
else:
return None

def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False):
result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)
def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False):
result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail)
if not result:

raise ZookeeperException(f'Failed to acquire lock {lock_type}')
logging.debug(f'Success acquire lock: {lock_type}')

def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False):
def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False):
"""
Acquire lock (leader by default)
"""
lock_type = lock_type or self.PRIMARY_LOCK_PATH
return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)
return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail)

def release_lock(self, lock_type=None, wait=0):
"""
Expand Down

0 comments on commit 946fc9b

Please sign in to comment.