diff --git a/src/__init__.py b/src/__init__.py index d816b7f..f127073 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -80,6 +80,7 @@ def read_config(filename=None, options=None): 'drop_slot_countdown': 300, 'replication_slots_polling': None, 'max_allowed_switchover_lag_ms': 60000, + 'release_lock_after_acquire_failed': 'yes', }, 'primary': { 'change_replication_type': 'yes', diff --git a/src/main.py b/src/main.py index c88528e..ba4cb25 100644 --- a/src/main.py +++ b/src/main.py @@ -1222,7 +1222,7 @@ def _acquire_replication_source_slot_lock(self, source): ) if source: # And acquire lock (then new_primary will create replication slot) - self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True, release_on_fail=True) + self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True) def _return_to_cluster(self, new_primary, role, is_dead=False): """ diff --git a/src/zk.py b/src/zk.py index b8c0e17..ab48ee8 100644 --- a/src/zk.py +++ b/src/zk.py @@ -76,6 +76,7 @@ class Zookeeper(object): def __init__(self, config, plugins): self._plugins = plugins self._zk_hosts = config.get('global', 'zk_hosts') + self._release_lock_after_acquire_failed = config.getboolean('global', 'release_lock_after_acquire_failed') self._timeout = config.getfloat('global', 'iteration_timeout') self._zk_connect_max_delay = config.getfloat('global', 'zk_connect_max_delay') self._zk_auth = config.getboolean('global', 'zk_auth') @@ -197,7 +198,7 @@ def _init_lock(self, name, read_lock=False): lock = self._zk.Lock(path, helpers.get_hostname()) self._locks[name] = lock - def _acquire_lock(self, name, allow_queue, timeout, read_lock=False, release_on_fail=False): + def _acquire_lock(self, name, allow_queue, timeout, read_lock=False): if timeout is None: timeout = self._timeout if self._zk.state != KazooState.CONNECTED: @@ -230,7 +231,7 @@ def _acquire_lock(self, name, allow_queue, timeout, read_lock=False, release_on_ for line in traceback.format_exc().split('\n'): logging.error(line.rstrip()) acquired = False - if not acquired and release_on_fail: + if not acquired and self._release_lock_after_acquire_failed: logging.debug('Try to release and delete lock "%s", to recreate on next iter', name) try: self.release_lock(name) @@ -495,19 +496,19 @@ def get_current_lock_holder(self, name=None, catch_except=True): else: return None - def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False): - result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail) + def acquire_lock(self, lock_type, allow_queue=False, timeout=None, read_lock=False): + result = self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock) if not result: raise ZookeeperException(f'Failed to acquire lock {lock_type}') logging.debug(f'Success acquire lock: {lock_type}') - def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False, release_on_fail=False): + def try_acquire_lock(self, lock_type=None, allow_queue=False, timeout=None, read_lock=False): """ Acquire lock (leader by default) """ lock_type = lock_type or self.PRIMARY_LOCK_PATH - return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock, release_on_fail=release_on_fail) + return self._acquire_lock(lock_type, allow_queue, timeout, read_lock=read_lock) def release_lock(self, lock_type=None, wait=0): """