refactor: update PineconeIndex to use aiohttp for async requests and … #520

Open
italianconcerto wants to merge 11 commits into base: main

Conversation

italianconcerto (Contributor) commented Jan 23, 2025

User description

…improve error handling

  • Replaced async_client with aiohttp.ClientSession for making asynchronous HTTP requests.
  • Added headers for API requests to enhance security and compatibility.
  • Simplified error handling by removing checks for async_client initialization.
  • Improved code readability and maintainability by consolidating request logic.

PR Type

Enhancement, Bug fix


Description

  • Replaced async_client with aiohttp.ClientSession for async HTTP requests.

  • Added standardized headers for API requests.

  • Simplified error handling by removing async_client checks.

  • Improved maintainability by consolidating request logic.


Changes walkthrough 📝

Relevant files
Enhancement
pinecone.py
Refactor async HTTP requests and error handling                   

semantic_router/index/pinecone.py

  • Replaced async_client with aiohttp.ClientSession for HTTP requests.
  • Added headers for API requests to enhance security and compatibility.
  • Removed checks for async_client initialization in error handling.
  • Updated all async methods to use aiohttp.ClientSession.
  • +81/-57 


    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 No relevant tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Possible Issue

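    Every async method opens a new aiohttp.ClientSession on each call, and _async_get_all opens one per page inside its pagination loop. Each session brings its own connection pool, so connections are never reused, which can cause performance problems or resource exhaustion under load. Consider reusing a single session instance across the class.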

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"https://{self.host}/query",
                json=params,
                headers=self.headers,
            ) as response:
                return await response.json(content_type=None)
    
    async def _async_list_indexes(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.base_url}/indexes",
                headers=self.headers,
            ) as response:
                return await response.json(content_type=None)
    
    async def _async_upsert(
        self,
        vectors: list[dict],
        namespace: str = "",
    ):
        params = {
            "vectors": vectors,
            "namespace": namespace,
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"https://{self.host}/vectors/upsert",
                json=params,
                headers=self.headers,
            ) as response:
                res = await response.json(content_type=None)
                return res
    
    async def _async_create_index(
        self,
        name: str,
        dimension: int,
        cloud: str,
        region: str,
        metric: str = "dotproduct",
    ):
        params = {
            "name": name,
            "dimension": dimension,
            "metric": metric,
            "spec": {"serverless": {"cloud": cloud, "region": region}},
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.base_url}/indexes",
                json=params,
                headers=self.headers,
            ) as response:
                return await response.json(content_type=None)
    
    async def _async_delete(self, ids: list[str], namespace: str = ""):
        params = {
            "ids": ids,
            "namespace": namespace,
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"https://{self.host}/vectors/delete",
                json=params,
                headers=self.headers,
            ) as response:
                return await response.json(content_type=None)
    
    async def _async_describe_index(self, name: str):
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.base_url}/indexes/{name}",
                headers=self.headers,
            ) as response:
                return await response.json(content_type=None)
    
    async def _async_get_all(
        self, prefix: Optional[str] = None, include_metadata: bool = False
    ) -> tuple[list[str], list[dict]]:
        """Retrieves all vector IDs from the Pinecone index using pagination
        asynchronously.
    
        :param prefix: The prefix to filter the vectors by.
        :type prefix: Optional[str]
        :param include_metadata: Whether to include metadata in the response.
        :type include_metadata: bool
        :return: A tuple containing a list of vector IDs and a list of metadata dictionaries.
        :rtype: tuple[list[str], list[dict]]
        """
        if self.index is None:
            raise ValueError("Index is None, could not retrieve vector IDs.")
        if self.host == "":
            raise ValueError("self.host is not initialized.")
    
        all_vector_ids = []
        next_page_token = None
    
        if prefix:
            prefix_str = f"?prefix={prefix}"
        else:
            prefix_str = ""
    
        list_url = f"https://{self.host}/vectors/list{prefix_str}"
        params: dict = {}
        if self.namespace:
            params["namespace"] = self.namespace
        metadata = []
    
        while True:
            if next_page_token:
                params["paginationToken"] = next_page_token
    
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    list_url,
                    params=params,
                    headers=self.headers,
                ) as response:
    Error Handling

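    HTTP error handling is inconsistent. _async_get_all and _async_fetch_metadata log an error when the response status is not 200, while the other methods in this excerpt (for example _async_list_indexes, _async_upsert, _async_create_index, _async_delete, and _async_describe_index) parse the body without checking the status. Apply one policy throughout, and consider raising an exception for non-200 responses.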

                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(f"Error fetching vectors: {error_text}")
                        break
    
                response_data = await response.json(content_type=None)
    
            vector_ids = [vec["id"] for vec in response_data.get("vectors", [])]
            if not vector_ids:
                break
            all_vector_ids.extend(vector_ids)
    
            if include_metadata:
                metadata_tasks = [self._async_fetch_metadata(id) for id in vector_ids]
                metadata_results = await asyncio.gather(*metadata_tasks)
                metadata.extend(metadata_results)
    
            next_page_token = response_data.get("pagination", {}).get("next")
            if not next_page_token:
                break
    
        return all_vector_ids, metadata
    
    async def _async_fetch_metadata(
        self,
        vector_id: str,
        namespace: str | None = None,
    ) -> dict:
        """Fetch metadata for a single vector ID asynchronously using the
        async_client.
    
        :param vector_id: The ID of the vector to fetch metadata for.
        :type vector_id: str
        :param namespace: The namespace to fetch metadata for.
        :type namespace: str | None
        :return: A dictionary containing the metadata for the vector.
        :rtype: dict
        """
        if self.host == "":
            raise ValueError("self.host is not initialized.")
        url = f"https://{self.host}/vectors/fetch"
    
        params = {
            "ids": [vector_id],
        }
    
        if namespace:
            params["namespace"] = [namespace]
        elif self.namespace:
            params["namespace"] = [self.namespace]
    
        headers = {
            "Api-Key": self.api_key,
        }
    
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    logger.error(f"Error fetching metadata: {error_text}")
                    return {}
    
                try:
                    response_data = await response.json(content_type=None)
                except Exception as e:
                    logger.warning(f"No metadata found for vector {vector_id}: {e}")
                    return {}
    
                return (
                    response_data.get("vectors", {})
                    .get(vector_id, {})
                    .get("metadata", {})
                )
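
The status checks flagged under Error Handling could be centralized so that every method fails the same way. The following is a minimal sketch, not the PR's code: `PineconeRequestError` and `ensure_ok` are hypothetical names, and the helper works on a plain status code and body text so it does not depend on the HTTP client.

```python
import logging

logger = logging.getLogger(__name__)


class PineconeRequestError(RuntimeError):
    """Hypothetical exception carrying the HTTP status and response body."""

    def __init__(self, context: str, status: int, body: str):
        # Truncate the body in the message so huge error pages stay readable.
        super().__init__(f"{context} failed with HTTP {status}: {body[:200]}")
        self.context = context
        self.status = status
        self.body = body


def ensure_ok(status: int, body: str, context: str) -> None:
    """Raise for any non-2xx status, logging the status code with the body."""
    if 200 <= status < 300:
        return
    logger.error("%s failed: status=%s body=%s", context, status, body)
    raise PineconeRequestError(context, status, body)
```

Inside a request method this might be called as `ensure_ok(response.status, await response.text(), "query")` while the `async with session.post(...)` block is still open, before the body is decoded. aiohttp's own `response.raise_for_status()` is an alternative if a custom exception type is not needed.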


    PR Code Suggestions ✨

    Explore these optional code suggestions:

    Category | Suggestion | Score
    General
    Reuse aiohttp.ClientSession for efficiency

    Ensure that aiohttp.ClientSession is reused instead of creating a new session for
    every request, as creating multiple sessions can lead to resource exhaustion and
    performance issues.

    semantic_router/index/pinecone.py [729-735]

    -async with aiohttp.ClientSession() as session:
    -    async with session.post(
    -        f"https://{self.host}/query",
    -        json=params,
    -        headers=self.headers,
    -    ) as response:
    -        return await response.json(content_type=None)
    +async with self.session.post(
    +    f"https://{self.host}/query",
    +    json=params,
    +    headers=self.headers,
    +) as response:
    +    return await response.json(content_type=None)
    Suggestion importance[1-10]: 9

    Why: Reusing aiohttp.ClientSession is a significant improvement as it avoids resource exhaustion and enhances performance. The suggestion is accurate and directly addresses a potential issue in the PR.

    9
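
One way to act on this suggestion is to let the index object own a single session and create it lazily. The sketch below is an assumption, not the PR's code: `SharedSession`, `get`, and `aclose` are hypothetical names, and the session factory is injectable so the holder can be exercised without network access. It relies only on `aiohttp.ClientSession` exposing a `closed` property and an awaitable `close()`.

```python
from typing import Any, Callable, Optional


def _default_session_factory() -> Any:
    # Imported lazily so the holder can also be used with a stand-in factory.
    import aiohttp

    return aiohttp.ClientSession()


class SharedSession:
    """Hypothetical holder that owns one HTTP session for its owner's lifetime."""

    def __init__(self, session_factory: Callable[[], Any] = _default_session_factory):
        self._session_factory = session_factory
        self._session: Optional[Any] = None

    def get(self) -> Any:
        # Create the session on first use (call this from inside a running
        # event loop) and recreate it only if it has since been closed.
        if self._session is None or self._session.closed:
            self._session = self._session_factory()
        return self._session

    async def aclose(self) -> None:
        # Close the session once, when the owner shuts down.
        if self._session is not None and not self._session.closed:
            await self._session.close()
        self._session = None
```

A request method would then use `async with holder.get().post(url, json=params, headers=headers) as response:`, and the owner would call `await holder.aclose()` on shutdown so the connection pool is released.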
    Handle invalid JSON responses gracefully

    Add error handling for cases where await response.json() fails due to invalid JSON,
    to prevent unhandled exceptions and ensure robustness.

    semantic_router/index/pinecone.py [853]

    -response_data = await response.json(content_type=None)
    +try:
    +    response_data = await response.json(content_type=None)
    +except aiohttp.ContentTypeError:
    +    logger.error("Invalid JSON response received")
    +    return {}
    Suggestion importance[1-10]: 8

    Why: Adding error handling for invalid JSON responses improves robustness and prevents unhandled exceptions. The suggestion is relevant and correctly addresses a potential issue in the PR.

    8
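
A caveat on the exception type: when `json()` is called with `content_type=None`, aiohttp skips its content-type check, so a malformed body would be expected to surface as `json.JSONDecodeError` rather than `aiohttp.ContentTypeError`, and an empty body decodes to `None`. The sketch below handles both cases; `read_json_or_empty` is a hypothetical helper, and it accepts any object with an awaitable `json(content_type=None)` method.

```python
import json
import logging
from typing import Any

logger = logging.getLogger(__name__)


async def read_json_or_empty(response: Any) -> dict:
    """Return the decoded JSON object, or {} when the body is empty or malformed."""
    try:
        data = await response.json(content_type=None)
    except json.JSONDecodeError as exc:
        logger.error("Invalid JSON response received: %s", exc)
        return {}
    # An empty body decodes to None; normalize any non-dict payload to {}.
    return data if isinstance(data, dict) else {}
```

Returning `{}` keeps the `response_data.get(...)` calls in the pagination loop working; raising instead would be the stricter choice if silent empty results are undesirable.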
    Log HTTP status codes for errors

    Log the HTTP status code and response body when a non-200 status code is encountered
    to aid in debugging and monitoring.

    semantic_router/index/pinecone.py [848-850]

     if response.status != 200:
         error_text = await response.text()
    -    logger.error(f"Error fetching vectors: {error_text}")
    +    logger.error(f"Error fetching vectors: {error_text}, Status Code: {response.status}")
         break
    Suggestion importance[1-10]: 6

    Why: Logging HTTP status codes along with error messages provides better debugging information. The suggestion is useful but offers a relatively minor improvement compared to other suggestions.

    6
    Possible issue
    Validate URLs before making requests

    Validate that self.host and self.base_url are properly set before making requests to
    avoid runtime errors due to malformed or missing URLs.

    semantic_router/index/pinecone.py [738-743]

    +if not self.base_url:
    +    raise ValueError("Base URL is not initialized.")
     async with aiohttp.ClientSession() as session:
         async with session.get(
             f"{self.base_url}/indexes",
             headers=self.headers,
         ) as response:
             return await response.json(content_type=None)
    Suggestion importance[1-10]: 7

    Why: Validating self.base_url ensures that requests are made to valid URLs, reducing the risk of runtime errors. The suggestion is appropriate and enhances the reliability of the code.

    7
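
The truthiness check in this suggestion catches a missing value but not a malformed one. A slightly stricter variant is sketched below, assuming `base_url` is meant to be an absolute http(s) URL; `require_base_url` is a hypothetical helper and the example URL is a placeholder.

```python
from typing import Optional
from urllib.parse import urlsplit


def require_base_url(value: Optional[str], name: str = "base_url") -> str:
    """Return the URL without a trailing slash, or raise ValueError."""
    if not value:
        raise ValueError(f"{name} is not initialized.")
    parts = urlsplit(value)
    # Require an explicit scheme and host so f"{base_url}/indexes" is well formed.
    if parts.scheme not in ("http", "https") or not parts.netloc:
        raise ValueError(f"{name} must be an absolute http(s) URL, got {value!r}.")
    return value.rstrip("/")
```

Running this once in the constructor, rather than before every request, would keep the request methods free of repeated guards.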

    italianconcerto and others added 10 commits January 23, 2025 10:05
    …ient
    
    - Removed the async_client attribute and its related initialization logic.
    - Consolidated API key and headers setup into the constructor for clarity.
    - Enhanced code readability by eliminating redundant code and improving structure.
    - Updated the constructor to retrieve the API key from environment variables if not provided.
    - Added a validation step to ensure the API key is present, raising a ValueError if missing.
    - Consolidated header management by using the instance's headers attribute for API requests, improving code clarity and maintainability.
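
The constructor changes described in these commits could look roughly like the sketch below. It is illustrative only: `build_headers` is a hypothetical function, the `PINECONE_API_KEY` variable name and the `Content-Type` header are assumptions, and only the `Api-Key` header name is taken from the code shown in this PR.

```python
import os
from typing import Mapping, Optional


def build_headers(
    api_key: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
    env_var: str = "PINECONE_API_KEY",  # assumed variable name
) -> dict:
    """Resolve the API key once and return the headers shared by all requests."""
    # Prefer the explicit argument, then fall back to the environment.
    key = api_key or env.get(env_var)
    if not key:
        raise ValueError(
            f"Pinecone API key is required: pass api_key or set {env_var}."
        )
    return {"Api-Key": key, "Content-Type": "application/json"}
```

Storing the result as `self.headers` lets every request method pass the same mapping instead of rebuilding an `Api-Key` dictionary locally.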