Skip to content

Commit

Permalink
better managing replication source locks (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
munakoiso authored Oct 16, 2024
1 parent ea8ecd7 commit 616e931
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ def primary_iter(self, db_state, zk_state):

self.checks['primary_switch'] = 0

# release replication source locks
self._acquire_replication_source_slot_lock(None)

self._handle_slots()

self._store_replics_info(db_state, zk_state)
Expand Down Expand Up @@ -603,7 +606,7 @@ def replica_return(self, db_state, zk_state):
# Try to resume WAL replaying, it can be paused earlier
self.db.pg_wal_replay_resume()

if not self._check_archive_recovery(limit) and not self._wait_for_streaming(limit):
if not self._check_archive_recovery(limit) and not self._wait_for_streaming(holder, limit):
# Wal receiver is not running and
# postgresql isn't in archive recovery
# We should try to restart
Expand Down Expand Up @@ -1035,7 +1038,7 @@ def _simple_primary_switch(self, limit, new_primary, is_dead):
# timeline N-1 before current recovery point M".
# Checking it with the info from ZK.
#
if self._wait_for_streaming(limit, new_primary):
if self._wait_for_streaming(new_primary, limit):
#
# The easy way succeeded.
#
Expand Down Expand Up @@ -1091,7 +1094,7 @@ def _attach_to_primary(self, new_primary, limit):
self.checks['primary_switch'] = 0
return None

if not self._wait_for_streaming(limit):
if not self._wait_for_streaming(new_primary, limit):
self.checks['primary_switch'] = 0
return None

Expand Down Expand Up @@ -1168,8 +1171,9 @@ def _acquire_replication_source_slot_lock(self, source):
'Could not get all hosts list from ZK.'
'Can not release old replication slot locks. We will skip it this time'
)
# And acquire lock (then new_primary will create replication slot)
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True, release_on_fail=True)
if source:
# And acquire lock (then new_primary will create replication slot)
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True, release_on_fail=True)

def _return_to_cluster(self, new_primary, role, is_dead=False):
"""
Expand Down Expand Up @@ -1617,11 +1621,12 @@ def _check_postgresql_streaming(self, primary=None):

return None

def _wait_for_streaming(self, limit=-1, primary=None):
def _wait_for_streaming(self, primary, limit=-1):
"""
Stop until postgresql start streaming from primary.
With limit=-1 the loop here can be infinite.
"""
self._acquire_replication_source_slot_lock(primary)
check_streaming = functools.partial(self._check_postgresql_streaming, primary)
return helpers.await_for_value(check_streaming, limit, 'PostgreSQL started streaming from primary')

Expand Down

0 comments on commit 616e931

Please sign in to comment.