Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release and recreate all locks when lock.acquire fails #58

Merged
1 commit merged on Jan 17, 2025
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def read_config(filename=None, options=None):
'drop_slot_countdown': 300,
'replication_slots_polling': None,
'max_allowed_switchover_lag_ms': 60000,
'release_lock_after_acquire_failed': 'yes',
},
'primary': {
'change_replication_type': 'yes',
Expand Down
2 changes: 1 addition & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ def _acquire_replication_source_slot_lock(self, source):
)
if source:
# And acquire lock (then new_primary will create replication slot)
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True, release_on_fail=True)
self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True)

def _return_to_cluster(self, new_primary, role, is_dead=False):
"""
Expand Down
13 changes: 7 additions & 6 deletions src/zk.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class Zookeeper(object):
def __init__(self, config, plugins):
self._plugins = plugins
self._zk_hosts = config.get('global', 'zk_hosts')
self._release_lock_after_acquire_failed = config.getboolean('global', 'release_lock_after_acquire_failed')
self._timeout = config.getfloat('global', 'iteration_timeout')
self._zk_connect_max_delay = config.getfloat('global', 'zk_connect_max_delay')
self._zk_auth = config.getboolean('global', 'zk_auth')
Expand Down Expand Up @@ -197,7 +198,7 @@ def _init_lock(self, name, read_lock=False):
lock = self._zk.Lock(path, helpers.get_hostname())
self._locks[name] = lock

def _acquire_lock(self, name, allow_queue, timeout, read_lock=False, release_on_fail=False):
def _acquire_lock(self, name, allow_queue, timeout, read_lock=False):
if timeout is None:
timeout = self._timeout
if self._zk.state != KazooState.CONNECTED:
Expand Down Expand Up @@ -230,7 +231,7 @@ def _acquire_lock(self, name, allow_queue, timeout, read_lock=False, release_on_
for line in traceback.format_exc().split('\n'):
logging.error(line.rstrip())
acquired = False
if not acquired and release_on_fail:
if not acquired and self._release_lock_after_acquire_failed:
logging.debug('Try to release and delete lock "%s", to recreate on next iter', name)
try:
self.release_lock(name)
Expand Down Expand Up @@ -495,19 +496,19 @@ def get_current_lock_holder(self, name=None, catch_except=True):
else:
return None

def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False):
result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail)
def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False):
result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)
if not result:

raise ZookeeperException(f'Failed to acquire lock {lock_type}')
logging.debug(f'Success acquire lock: {lock_type}')

def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False):
def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False):
"""
Acquire lock (leader by default)
"""
lock_type = lock_type or self.PRIMARY_LOCK_PATH
return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail)
return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock)

def release_lock(self, lock_type=None, wait=0):
"""
Expand Down
Loading