diff --git a/docs/indexes/pinecone-sync-routes.ipynb b/docs/indexes/pinecone-sync-routes.ipynb new file mode 100644 index 00000000..d156e9a0 --- /dev/null +++ b/docs/indexes/pinecone-sync-routes.ipynb @@ -0,0 +1,391 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "#!pip install -qU \"semantic-router[pinecone]==0.0.73\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Syncing Routes with Pinecone Index" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "When using the `PineconeIndex`, our `RouteLayer` is stored in two places:\n", + "\n", + "* We keep route layer metadata locally.\n", + "* Vectors alongside a backup of our metadata is stored remotely in Pinecone.\n", + "\n", + "By storing some data locally and some remotely we achieve improved persistence and the ability to recover our local state if lost. However, it does come with challenges around keep our local and remote instances synchronized. Fortunately, we have [several synchronization options](https://docs.aurelio.ai/semantic-router/route_layer/sync.html). In this example, we'll see how to use these options to keep our local and remote Pinecone instances synchronized." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from semantic_router import Route\n", + "\n", + "# we could use this as a guide for our chatbot to avoid political conversations\n", + "politics = Route(\n", + " name=\"politics\",\n", + " utterances=[\n", + " \"isn't politics the best thing ever\",\n", + " \"why don't you tell me about your political opinions\",\n", + " \"don't you just love the president\",\n", + " \"don't you just hate the president\",\n", + " \"they're going to destroy this country!\",\n", + " \"they will save the country!\",\n", + " ],\n", + ")\n", + "\n", + "# this could be used as an indicator to our chatbot to switch to a more\n", + "# conversational prompt\n", + "chitchat = Route(\n", + " name=\"chitchat\",\n", + " utterances=[\n", + " \"how's the weather today?\",\n", + " \"how are things going?\",\n", + " \"lovely weather today\",\n", + " \"the weather is horrendous\",\n", + " \"let's go to the chippy\",\n", + " ],\n", + ")\n", + "\n", + "# we place both of our decisions together into single list\n", + "routes = [politics, chitchat]" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from getpass import getpass\n", + "from semantic_router.encoders import OpenAIEncoder\n", + "\n", + "# get at platform.openai.com\n", + "os.environ[\"OPENAI_API_KEY\"] = os.environ.get(\"OPENAI_API_KEY\") or getpass(\"Enter OpenAI API key: \")\n", + "\n", + "encoder = OpenAIEncoder(name=\"text-embedding-3-small\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "For our `PineconeIndex` we do the exact same thing, ie we initialize as usual:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from semantic_router.index.pinecone import PineconeIndex\n", + "\n", + "# get at app.pinecone.io\n", + "os.environ[\"PINECONE_API_KEY\"] = os.environ.get(\"PINECONE_API_KEY\") or getpass(\"Enter Pinecone API key: \")\n", + "\n", + "pc_index = PineconeIndex(\n", + " dimensions=1536,\n", + " init_async_index=True, # enables asynchronous methods, it's optional\n", + " sync=None, # defines whether we sync between local and remote route layers\n", + " # when sync is None, no sync is performed\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## RouteLayer" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `RouteLayer` class supports both sync and async operations by default, so we initialize as usual:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "from semantic_router.layer import RouteLayer\n", + "\n", + "rl = RouteLayer(encoder=encoder, routes=routes, index=pc_index)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can check our route layer and index information as usual:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['politics', 'chitchat']" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "rl.list_route_names()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "0" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(rl.index)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's see if our local and remote instances are synchronized..." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "hash_id: sr_hash#\n" + ] + }, + { + "data": { + "text/plain": [ + "ConfigParameter(field='sr_hash', value='f8f04794014c855bd68e283e64c57d8cc7a92f2ecd143386105de98f57c55e04', namespace='', created_at='2024-11-10T21:41:35.991948')" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import time\n", + "\n", + "# due to pinecone indexing latency we wait 3 seconds\n", + "time.sleep(3)\n", + "rl.index._read_hash()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "hash_id: sr_hash#\n" + ] + }, + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "rl.is_synced()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "It looks like everything is synced! Let's try deleting our local route layer, initializing it with just the politics route, and checking again." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "hash_id: sr_hash#\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "\u001b[33m2024-11-10 22:41:41 WARNING semantic_router.utils.logger Local and remote route layers were not aligned. Remote hash not updated. Use `RouteLayer.get_utterance_diff()` to see details.\u001b[0m\n" + ] + } + ], + "source": [ + "del rl\n", + "\n", + "rl = RouteLayer(encoder=encoder, routes=[politics], index=pc_index)\n", + "time.sleep(3)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's try `rl.is_synced()` again:" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "hash_id: sr_hash#\n" + ] + }, + { + "data": { + "text/plain": [ + "False" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "rl.is_synced()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can use the `get_utterance_diff` method to see exactly _why_ our local and remote are not synced" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['chitchat: how are things going?', \"chitchat: how's the weather today?\", \"chitchat: let's go to the chippy\", 'chitchat: lovely weather today', 'chitchat: the weather is horrendous', \"politics: don't you just hate the president\", \"politics: don't you just love the president\", \"politics: isn't politics the best thing ever\", 'politics: they will save the country!', \"politics: they're going to destroy this country!\", \"politics: why don't you tell me about your political opinions\"]\n", + "[\"politics: don't you just hate the president\", \"politics: don't you just love the president\", \"politics: isn't politics the best thing ever\", 'politics: they will save the country!', \"politics: they're going to destroy this country!\", \"politics: why don't you tell me about your political opinions\"]\n" + ] + }, + { + "data": { + "text/plain": [ + "['+ chitchat: how are things going?',\n", + " \"+ chitchat: how's the weather today?\",\n", + " \"+ chitchat: let's go to the chippy\",\n", + " '+ chitchat: lovely weather today',\n", + " '+ chitchat: the weather is horrendous',\n", + " \" politics: don't you just hate the president\",\n", + " \" politics: don't you just love the president\",\n", + " \" politics: isn't politics the best thing ever\",\n", + " ' politics: they will save the country!',\n", + " \" politics: they're going to destroy this country!\",\n", + " \" politics: why don't you tell me about your political opinions\"]" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "rl.get_utterance_diff()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "semantic_router_1", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.5" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/semantic_router/index/pinecone.py b/semantic_router/index/pinecone.py index 2c4c4961..8bcc9ff3 100644 --- a/semantic_router/index/pinecone.py +++ b/semantic_router/index/pinecone.py @@ -633,7 +633,11 @@ def query( def _read_hash(self) -> ConfigParameter: if self.index is None: - raise ValueError("Index has not been initialized.") + return ConfigParameter( + field="sr_hash", + value="", + namespace=self.namespace, + ) hash_id = f"sr_hash#{self.namespace}" print(f"hash_id: {hash_id}") hash_record = self.index.fetch( diff --git a/semantic_router/layer.py b/semantic_router/layer.py index c0550bf7..0e2339e4 100644 --- a/semantic_router/layer.py +++ b/semantic_router/layer.py @@ -515,6 +515,16 @@ def from_config(cls, config: LayerConfig, index: Optional[BaseIndex] = None): return cls(encoder=encoder, routes=config.routes, index=index) def add(self, route: Route): + """Add a route to the local RouteLayer and index. + + :param route: The route to add. + :type route: Route + """ + current_local_hash = self._get_hash() + current_remote_hash = self.index._read_hash() + if current_remote_hash.value == "": + # if remote hash is empty, the index is to be initialized + current_remote_hash = current_local_hash embedded_utterances = self.encoder(route.utterances) self.index.add( embeddings=embedded_utterances, @@ -530,7 +540,14 @@ def add(self, route: Route): ) self.routes.append(route) - self._write_hash() # update current hash in index + if current_local_hash.value == current_remote_hash.value: + self._write_hash() # update current hash in index + else: + logger.warning( + "Local and remote route layers were not aligned. Remote hash " + "not updated. Use `RouteLayer.get_utterance_diff()` to see " + "details." + ) def list_route_names(self) -> List[str]: return [route.name for route in self.routes] @@ -549,6 +566,11 @@ def update( The name must exist within the local RouteLayer, if not a KeyError will be raised. """ + current_local_hash = self._get_hash() + current_remote_hash = self.index._read_hash() + if current_remote_hash.value == "": + # if remote hash is empty, the index is to be initialized + current_remote_hash = current_local_hash if threshold is None and utterances is None: raise ValueError( @@ -570,12 +592,27 @@ def update( else: raise ValueError(f"Route '{name}' not found. Nothing updated.") + if current_local_hash.value == current_remote_hash.value: + self._write_hash() # update current hash in index + else: + logger.warning( + "Local and remote route layers were not aligned. Remote hash " + "not updated. Use `RouteLayer.get_utterance_diff()` to see " + "details." + ) + def delete(self, route_name: str): """Deletes a route given a specific route name. :param route_name: the name of the route to be deleted :type str: """ + current_local_hash = self._get_hash() + current_remote_hash = self.index._read_hash() + if current_remote_hash.value == "": + # if remote hash is empty, the index is to be initialized + current_remote_hash = current_local_hash + if route_name not in [route.name for route in self.routes]: err_msg = f"Route `{route_name}` not found in RouteLayer" logger.warning(err_msg) @@ -587,6 +624,15 @@ def delete(self, route_name: str): self.routes = [route for route in self.routes if route.name != route_name] self.index.delete(route_name=route_name) + if current_local_hash.value == current_remote_hash.value: + self._write_hash() # update current hash in index + else: + logger.warning( + "Local and remote route layers were not aligned. Remote hash " + "not updated. Use `RouteLayer.get_utterance_diff()` to see " + "details." + ) + def _refresh_routes(self): """Pulls out the latest routes from the index.""" raise NotImplementedError("This method has not yet been implemented.") @@ -605,6 +651,12 @@ def _refresh_routes(self): self.routes.append(route) def _add_routes(self, routes: List[Route]): + current_local_hash = self._get_hash() + current_remote_hash = self.index._read_hash() + if current_remote_hash.value == "": + # if remote hash is empty, the index is to be initialized + current_remote_hash = current_local_hash + if not routes: logger.warning("No routes provided to add.") return @@ -626,7 +678,14 @@ def _add_routes(self, routes: List[Route]): logger.error(f"Failed to add routes to the index: {e}") raise Exception("Indexing error occurred") from e - self._write_hash() + if current_local_hash.value == current_remote_hash.value: + self._write_hash() # update current hash in index + else: + logger.warning( + "Local and remote route layers were not aligned. Remote hash " + "not updated. Use `RouteLayer.get_utterance_diff()` to see " + "details." + ) def _get_hash(self) -> ConfigParameter: config = self.to_config() @@ -686,14 +745,18 @@ def get_utterance_diff(self) -> List[str]: # sort local and remote utterances local_utterances.sort() remote_utterances.sort() - print(remote_utterances) - print(local_utterances) # now get diff differ = Differ() diff = list(differ.compare(local_utterances, remote_utterances)) return diff def _add_and_sync_routes(self, routes: List[Route]): + # get current local hash + current_local_hash = self._get_hash() + current_remote_hash = self.index._read_hash() + if current_remote_hash.value == "": + # if remote hash is empty, the index is to be initialized + current_remote_hash = current_local_hash # create embeddings for all routes and sync at startup with remote ones based on sync setting local_route_names, local_utterances, local_function_schemas, local_metadata = ( self._extract_routes_details(routes, include_metadata=True) @@ -754,7 +817,15 @@ def _add_and_sync_routes(self, routes: List[Route]): metadata=data.get("metadata", {}), ) ) - self._write_hash() + # update hash IF index and local hash were aligned + if current_local_hash.value == current_remote_hash.value: + self._write_hash() + else: + logger.warning( + "Local and remote route layers were not aligned. Remote hash " + "not updated. Use `RouteLayer.get_utterance_diff()` to see " + "details." + ) def _extract_routes_details( self, routes: List[Route], include_metadata: bool = False