Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenSearch as a cache and vector store #623

Open
eswarthammana opened this issue Apr 8, 2024 · 1 comment
Open

OpenSearch as a cache and vector store #623

eswarthammana opened this issue Apr 8, 2024 · 1 comment

Comments

@eswarthammana
Copy link

eswarthammana commented Apr 8, 2024

What modifications do I have to make to use OpenSearch as a cache for exact matches and, in the case of semantic caching, as both a vector store and a cache?

Thank you

@eswarthammana
Copy link
Author

I have code that adds AI and user messages to OpenSearch. This is just a sample and still exploratory: it currently lacks automatic sessions, cache policies, etc., for which I found you have better algorithms.

from time import time
from typing import List, Optional
import json

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
    BaseMessage,
    message_to_dict,
    messages_from_dict,
)

from opensearchpy import OpenSearch
from logs.logger import Log #my custom logger class


class OpenSearchChatMessageHistory(Log, BaseChatMessageHistory):
    """Chat message history that stores history in OpenSearch.

    Each message is stored as its own document holding the session id, a
    creation timestamp in epoch milliseconds, and the JSON-serialized message.

    Args:
        index (str): Name of the index to use.
        session_id (str): Arbitrary key that is used to store the messages
            of a single chat session.
        opensearch_url (Optional[str]): URL of the OpenSearch instance to connect to.
            Defaults to "http://localhost:9200".
        ensure_ascii (Optional[bool]): Passed to json.dumps; when True, non-ASCII
            characters are escaped in the stored JSON. Defaults to True.
        max_messages (int): Maximum number of messages fetched per session when
            reading the history. Defaults to 10000, which is OpenSearch's
            default ``index.max_result_window``.
    """

    def __init__(
        self,
        index: str,
        session_id: str,
        opensearch_url: Optional[str] = "http://localhost:9200",
        ensure_ascii: Optional[bool] = True,
        max_messages: int = 10000,
    ) -> None:
        super().__init__()
        self.log_info("Initializing the OpenSearchChatMessageHistory class.")
        self.index: str = index
        self.session_id: str = session_id
        self.ensure_ascii: bool = ensure_ascii
        self.max_messages: int = max_messages

        self.client: OpenSearch = OpenSearch([opensearch_url])

        if self.client.indices.exists(index=index):
            self.log_info(
                f"Chat history index '{index}' already exists, skipping creation."
            )
        else:
            self.log_info(f"Creating index '{index}' for storing chat history.")
            self.client.indices.create(
                index=index,
                body={
                    "mappings": {
                        "properties": {
                            # keyword (not text) so the term queries below
                            # match the session id exactly.
                            "session_id": {"type": "keyword"},
                            "created_at": {"type": "date"},
                            "history": {"type": "text"},
                        }
                    }
                },
            )
        self.log_info("OpenSearchChatMessageHistory class initialized successfully.")

    @property
    def messages(self) -> List[BaseMessage]:
        """Retrieve this session's messages from OpenSearch, oldest first.

        Returns:
            The messages of the session ordered by ascending ``created_at``;
            an empty list when the session has no stored messages.
        """
        self.log_info("Loading messages from OpenSearch to buffer.")
        result = self.client.search(
            index=self.index,
            body={
                "query": {
                    "term": {
                        "session_id": self.session_id
                    }
                }
            },
            sort="created_at:asc",
            # Without an explicit size OpenSearch returns only 10 hits, which
            # would silently truncate longer conversations.
            size=self.max_messages,
        )

        hits = (result or {}).get("hits", {}).get("hits", [])
        items = [json.loads(hit["_source"]["history"]) for hit in hits]

        self.log_info("Messages loaded from OpenSearch to buffer.")
        # messages_from_dict takes the whole sequence of message dicts and
        # returns a flat list of messages; it must not be called per item.
        return messages_from_dict(items)

    def add_message(self, message: BaseMessage) -> None:
        """Add a message to the chat session in OpenSearch.

        Args:
            message: The message to serialize and index under this session.
        """
        self.log_info("Adding messages to OpenSearch.")
        self.client.index(
            index=self.index,
            body={
                "session_id": self.session_id,
                # Epoch milliseconds; used to order messages on retrieval.
                "created_at": round(time() * 1000),
                "history": json.dumps(
                    message_to_dict(message),
                    ensure_ascii=self.ensure_ascii,
                ),
            },
            # Refresh so the message is visible to an immediately following read.
            refresh=True,
        )
        self.log_info("Messages added to OpenSearch.")

    def clear(self) -> None:
        """Delete every stored message belonging to this session."""
        self.log_info("Purging data in OpenSearch started.")
        self.client.delete_by_query(
            index=self.index,
            body={
                "query": {
                    "term": {
                        "session_id": self.session_id
                    }
                }
            },
            refresh=True,
        )
        self.log_info("OpenSearch data purged.")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant