Skip to content

Commit

Permalink
Patch Zookeeper client and upgrade version to 2.7.1 (#848)
Browse files (browse the repository at this point in the history)
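We apply a patch to the Zookeeper client. If the environment variable ZOOKEEPER_HOSTS is unset (or set to an empty string), no Zookeeper client is instantiated and users can create and delete indexes without connecting to a Zookeeper server; in that case no deployment lock is acquired, a warning is logged, and concurrent index operations may result in race conditions. If the environment variable is set, a connection to the Zookeeper server is required to create or delete indexes.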
  • Loading branch information
wanliAlex authored May 28, 2024
1 parent 2dcd3c5 commit b0ef791
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 32 deletions.
1 change: 1 addition & 0 deletions run_marqo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ elif [ -z "$VESPA_QUERY_URL" ] && [ -z "$VESPA_DOCUMENT_URL" ] && [ -z "$VESPA_C
export VESPA_QUERY_URL="http://localhost:8080"
export VESPA_DOCUMENT_URL="http://localhost:8080"
export VESPA_CONFIG_URL="http://localhost:19071"
export ZOOKEEPER_HOSTS="localhost:2181"
export VESPA_IS_INTERNAL=True

else
Expand Down
2 changes: 1 addition & 1 deletion src/marqo/api/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def default_env_vars() -> dict:
EnvVars.VESPA_CONFIG_URL: "http://localhost:19071",
EnvVars.VESPA_QUERY_URL: "http://localhost:8080",
EnvVars.VESPA_DOCUMENT_URL: "http://localhost:8080",
EnvVars.ZOOKEEPER_HOSTS: "localhost:2181",
EnvVars.ZOOKEEPER_HOSTS: None,
EnvVars.VESPA_CONTENT_CLUSTER_NAME: "content_default",
EnvVars.VESPA_POOL_SIZE: 10,
EnvVars.VESPA_FEED_POOL_SIZE: 10,
Expand Down
2 changes: 1 addition & 1 deletion src/marqo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(
utils.read_env_vars_and_defaults(EnvVars.MARQO_BEST_AVAILABLE_DEVICE))

# Initialize Core layer dependencies
self.index_management = IndexManagement(vespa_client, zookeeper_client)
self.index_management = IndexManagement(vespa_client, zookeeper_client, enable_index_operations=True)
self.monitoring = Monitoring(vespa_client, self.index_management)
self.document = Document(vespa_client, self.index_management)
self.recommender = Recommender(vespa_client, self.index_management)
Expand Down
58 changes: 40 additions & 18 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import textwrap
import xml.etree.ElementTree as ET
from contextlib import contextmanager
from datetime import datetime
from typing import List
from typing import Optional
Expand All @@ -10,22 +11,20 @@
from marqo import version
from marqo.base_model import ImmutableStrictBaseModel
from marqo.core import constants
from marqo.core.distributed_lock.zookeeper_distributed_lock import get_deployment_lock
from marqo.vespa.zookeeper_client import ZookeeperClient
from marqo.core.distributed_lock.zookeeper_distributed_lock import ZookeeperDistributedLock
from marqo.core.exceptions import OperationConflictError
from marqo.core.distributed_lock.zookeeper_distributed_lock import get_deployment_lock
from marqo.core.exceptions import IndexExistsError, IndexNotFoundError
from marqo.core.exceptions import OperationConflictError
from marqo.core.exceptions import ZookeeperLockNotAcquiredError, InternalError
from marqo.core.models import MarqoIndex
from marqo.core.models.marqo_index_request import MarqoIndexRequest
from marqo.core.vespa_schema import for_marqo_index_request as vespa_schema_factory
from marqo.exceptions import InternalError
from marqo.tensor_search.enums import EnvVars
from marqo.tensor_search.models.index_settings import IndexSettings
from marqo.vespa.exceptions import VespaStatusError
from marqo.vespa.models import VespaDocument
from marqo.vespa.vespa_client import VespaClient
from marqo.core.exceptions import ZookeeperLockNotAcquiredError, InternalError
from contextlib import contextmanager

from marqo.vespa.zookeeper_client import ZookeeperClient

logger = marqo.logging.get_logger(__name__)

Expand Down Expand Up @@ -60,13 +59,23 @@ class IndexManagement:
'''
)

def __init__(self, vespa_client: VespaClient, zookeeper_client: Optional[ZookeeperClient] = None):
def __init__(self, vespa_client: VespaClient, zookeeper_client: Optional[ZookeeperClient] = None,
enable_index_operations: bool = False):
"""Instantiate an IndexManagement object.
Args:
vespa_client: VespaClient object
zookeeper_client: ZookeeperClient object
enable_index_operations: A flag to enable index operations. If set to True,
the object can create/delete indexes, otherwise, it raises an InternalError during index operations.
"""
self.vespa_client = vespa_client
self._zookeeper_client = zookeeper_client
self._zookeeper_deployment_lock: Optional[ZookeeperDistributedLock] = self._instantiate_deployment_lock()
self._enable_index_operations = enable_index_operations

def _instantiate_deployment_lock(self) -> Optional[ZookeeperDistributedLock]:
"""Instantiate a ZookeeperDistributedLock"""
"""Instantiate a ZookeeperDistributedLock."""
if self._zookeeper_client is None:
return None
else:
Expand Down Expand Up @@ -544,16 +553,29 @@ def _delete_index_settings_by_name(self, index_name: str):
def _deployment_lock_context_manager(self):
"""A context manager for deployment lock acquisition.
If the _enable_index_operations flag is set to True, the context manager tries to acquire the deployment lock.
If the lock is acquired, the context manager yields control to the caller. Or if the lock is not acquired,
it raises an OperationConflictError.
If the lock is None, the context manager yields control to the caller without acquiring the lock.
If the _enable_index_operations flag is set to False, the context manager raises an InternalError during
index operations.
Raises:
OperationConflictError: If another index creation/deletion operation is
in progress and the lock cannot be acquired
InternalError: If deployment lock is not instantiated
InternalError: If index_management object is not enabled for index operations
"""
if self._zookeeper_deployment_lock is None:
raise InternalError("Deployment lock is not instantiated and cannot be used for index creation/deletion")
try:
with self._zookeeper_deployment_lock:
yield
except ZookeeperLockNotAcquiredError:
raise OperationConflictError("Another index creation/deletion operation is in progress. "
"Your request is rejected. Please try again later")
if self._enable_index_operations:
if self._zookeeper_deployment_lock is None:
logger.warning(f"No Zookeeper client provided. "
f"Concurrent index operations may result in race conditions. ")
yield # No lock, proceed without locking
else:
try:
with self._zookeeper_deployment_lock:
yield
except ZookeeperLockNotAcquiredError:
raise OperationConflictError("Another index creation/deletion operation is in progress. "
"Your request is rejected. Please try again later")
else:
raise InternalError("You index_management object is not enabled for index operations. ")
3 changes: 2 additions & 1 deletion src/marqo/tensor_search/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ def generate_config() -> config.Config:
partial_update_pool_size=utils.read_env_vars_and_defaults_ints(EnvVars.VESPA_PARTIAL_UPDATE_POOL_SIZE),
)

# Zookeeper is only instantiated if the hosts are provided
zookeeper_client = ZookeeperClient(
zookeeper_connection_timeout=utils.read_env_vars_and_defaults_ints(EnvVars.ZOOKEEPER_CONNECTION_TIMEOUT),
hosts=utils.read_env_vars_and_defaults(EnvVars.ZOOKEEPER_HOSTS)
)
) if utils.read_env_vars_and_defaults(EnvVars.ZOOKEEPER_HOSTS) else None

# Determine default device
default_device = utils.read_env_vars_and_defaults(EnvVars.MARQO_BEST_AVAILABLE_DEVICE)
Expand Down
2 changes: 1 addition & 1 deletion src/marqo/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.7.0"
__version__ = "2.7.1"


def get_version() -> str:
Expand Down
29 changes: 21 additions & 8 deletions tests/core/index_management/test_index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from marqo.vespa.models import VespaDocument
from tests.marqo_test import MarqoTestCase
from marqo.core.exceptions import OperationConflictError
from marqo.core.distributed_lock.zookeeper_distributed_lock import ZookeeperDistributedLock
from unittest.mock import patch
import threading
import time
from marqo.core.exceptions import InternalError
Expand All @@ -22,7 +22,8 @@
class TestIndexManagement(MarqoTestCase):

def setUp(self):
self.index_management = IndexManagement(self.vespa_client, zookeeper_client=self.zookeeper_client)
self.index_management = IndexManagement(self.vespa_client, zookeeper_client=self.zookeeper_client,
enable_index_operations=True)

def test_bootstrap_vespa_doesNotExist_successful(self):
settings_schema_name = 'a' + str(uuid.uuid4()).replace('-', '')
Expand Down Expand Up @@ -307,24 +308,36 @@ def create_index(marqo_index_request):
t_1.join()
self.index_management.delete_index_by_name(index_name_1)

def test_createIndexFailIfNoZookeeperProvided(self):
def test_createIndexFailIfEnableIndexCreationIsFalse(self):
self.index_management = IndexManagement(self.vespa_client, zookeeper_client=None)
index_name = 'a' + str(uuid.uuid4()).replace('-', '')
marqo_index_request = self.unstructured_marqo_index_request(
name=index_name,
)
with self.assertRaises(InternalError) as e:
self.index_management.create_index(marqo_index_request)
self.assertEqual(str(e.exception), 'Deployment lock is not '
'instantiated and cannot be used for index creation/deletion')
self.assertIn("You index_management object is not enabled for index operations. ",
str(e.exception))

def test_deleteIndexFailIfNoZookeeperProvided(self):
def test_deleteIndexFailIfEnableIndexCreationIsFalse(self):
self.index_management = IndexManagement(self.vespa_client, zookeeper_client=None)
index_name = 'a' + str(uuid.uuid4()).replace('-', '')
with self.assertRaises(InternalError) as e:
self.index_management.delete_index_by_name(index_name)
self.assertEqual(str(e.exception), 'Deployment lock is not '
'instantiated and cannot be used for index creation/deletion')
self.assertIn("You index_management object is not enabled for index operations. ",
str(e.exception))

def test_createIndexWithoutZookeeperClient_success(self):
"""Test to ensure create_index requests can be made without Zookeeper client with a warning logged."""
self.index_management = IndexManagement(self.vespa_client, zookeeper_client=None, enable_index_operations=True)
index_name = 'a' + str(uuid.uuid4()).replace('-', '')
try:
with patch("marqo.core.index_management.index_management.logger.warning") as mock_logger_warning:
self.index_management.create_index(self.unstructured_marqo_index_request(name=index_name))
mock_logger_warning.assert_called_once()

finally:
self.index_management.delete_index_by_name(index_name)

def test_deploymentLockIsNone(self):
"""Test to ensure if no Zookeeper client is provided, deployment lock is None
Expand Down
2 changes: 1 addition & 1 deletion tests/marqo_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def setUpClass(cls) -> None:
cls.configure_request_metrics()
cls.vespa_client = vespa_client
cls.zookeeper_client = zookeeper_client
cls.index_management = IndexManagement(cls.vespa_client, cls.zookeeper_client)
cls.index_management = IndexManagement(cls.vespa_client, cls.zookeeper_client, enable_index_operations=True)
cls.monitoring = Monitoring(cls.vespa_client, cls.index_management)
cls.config = config.Config(vespa_client=vespa_client, default_device="cpu",
zookeeper_client=cls.zookeeper_client)
Expand Down
31 changes: 30 additions & 1 deletion tests/tensor_search/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import torch
from tests.marqo_test import MarqoTestCase
from marqo.tensor_search import enums

from marqo.tensor_search.api import generate_config
import os
from unittest import mock
from marqo.tensor_search.enums import EnvVars

@unittest.skip
class TestConfig(MarqoTestCase):
Expand Down Expand Up @@ -37,6 +40,8 @@ def test_device_for_clip(self):
assert str(enums.Device.cpu) == "cpu"




@unittest.skip
class TestConfigBackend(MarqoTestCase):

Expand All @@ -58,3 +63,27 @@ def test_init_custom_backend(self):
def test_init_custom_backend_as_string(self):
c = config.Config(url=self.endpoint, backend="elasticsearch")
assert c.backend == "elasticsearch"


class TestGenerateConfig(MarqoTestCase):

def test_configWithoutZookeeperHostsBeingSet(self):
"""Test that the config is generated correctly when ZOOKEEPER_HOSTS is not set or is an empty string."""
environment_variable_test_cases = [
{"ZOOKEEPER_HOSTS": ""}, # Empty string
dict() # Empty dict, unset
]
for env in environment_variable_test_cases:
with self.subTest(env):
with mock.patch.dict(os.environ, env):
c = generate_config()
self.assertIsNone(c._zookeeper_client)

def test_configWithZookeeperHostsBeingSet(self):
"""Test that the config is generated correctly when ZOOKEEPER_HOSTS is set to a value."""
env = {"ZOOKEEPER_HOSTS": "a.fake.url"}
with mock.patch.dict(os.environ, env):
with mock.patch("marqo.config.Config._connect_to_zookeeper") as mock_connect_to_zookeeper:
c = generate_config()
mock_connect_to_zookeeper.assert_called_once()
self.assertIsNotNone(c._zookeeper_client)

0 comments on commit b0ef791

Please sign in to comment.