Skip to content

Commit

Permalink
release and recreate all locks if acquire is failed (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
munakoiso authored Jan 17, 2025
1 parent 1e8fb69 commit 86752c1
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def read_config(filename=None, options=None):
'drop_slot_countdown': 300,
'replication_slots_polling': None,
'max_allowed_switchover_lag_ms': 60000,
'release_lock_after_acquire_failed': 'yes',
},
'primary': {
'change_replication_type': 'yes',
Expand Down
2 changes: 1 addition & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ def _acquire_replication_source_slot_lock(self, source):
)
if source:
# And acquire lock (then new_primary will create replication slot)
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True, release_on_fail=True)
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True)

def _return_to_cluster(self, new_primary, role, is_dead=False):
"""
Expand Down
13 changes: 7 additions & 6 deletions src/zk.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class Zookeeper(object):
def __init__(self, config, plugins):
self._plugins = plugins
self._zk_hosts = config.get('global', 'zk_hosts')
self._release_lock_after_acquire_failed = config.getboolean('global', 'release_lock_after_acquire_failed')
self._timeout = config.getfloat('global', 'iteration_timeout')
self._zk_connect_max_delay = config.getfloat('global', 'zk_connect_max_delay')
self._zk_auth = config.getboolean('global', 'zk_auth')
Expand Down Expand Up @@ -197,7 +198,7 @@ def _init_lock(self, name, read_lock=False):
lock = self._zk.Lock(path, helpers.get_hostname())
self._locks[name] = lock

def _acquire_lock(self, name, allow_queue, timeout, read_lock=False, release_on_fail=False):
def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
if timeout is None:
timeout = self._timeout
if self._zk.state != KazooState.CONNECTED:
Expand Down Expand Up @@ -230,7 +231,7 @@ def _acquire_lock(self, name, allow_queue, timeout, read_lock=False, release_on_
for line in traceback.format_exc().split('\n'):
logging.error(line.rstrip())
acquired = False
if not acquired and release_on_fail:
if not acquired and self._release_lock_after_acquire_failed:
logging.debug('Try to release and delete lock "%s", to recreate on next iter', name)
try:
self.release_lock(name)
Expand Down Expand Up @@ -495,19 +496,19 @@ def get_current_lock_holder(self, name=None, catch_except=True):
else:
return None

def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False):
result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail)
def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False):
result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)
if not result:

raise ZookeeperException(f'Failed to acquire lock {lock_type}')
logging.debug(f'Success acquire lock: {lock_type}')

def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False):
def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False):
"""
Acquire lock (leader by default)
"""
lock_type = lock_type or self.PRIMARY_LOCK_PATH
return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail)
return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)

def release_lock(self, lock_type=None, wait=0):
"""
Expand Down

0 comments on commit 86752c1

Please sign in to comment.