Using the Search Stack with LangChain#
LangChain is a popular framework for building applications with large language models, and you can use Aryn’s Conversational Search Stack to power apps created with LangChain. In this example, we will show how to use OpenSearch’s Conversation Memory feature set, which is part of the stack, to store chat history for a LangChain app. This tutorial walks through creating an implementation of chat history that integrates with LangChain primitives and calls out to the OpenSearch Conversation Memory APIs.
OpenSearch Conversation Memory Client#
Since the Conversation Memory API is quite new, it isn’t yet available in the various language clients that integrate with OpenSearch. Luckily, it’s easy to write a small client ourselves. First, the dependencies: this requires the opensearch-py package, OpenSearch’s Python client:
pip install opensearch-py
Then, imports:
from opensearchpy.client.utils import NamespacedClient, _make_path
from opensearchpy import OpenSearch
import json
Now, we’ll create our ConversationMemoryOpensearchClient as a subclass of NamespacedClient, an abstract client class that wraps a basic OpenSearch client and provides the functionality to hit any endpoint exposed by a cluster. We will define a method for each of the Conversation Memory APIs:
class ConversationMemoryOpensearchClient(NamespacedClient):
def create_conversation(self, name: str=None):
return self.transport.perform_request(
method="POST",
url=_make_path("_plugins", "_ml", "memory", "conversation"),
body=({"name": name} if name is not None else None)
)
def create_interaction(self, conversation_id: str, input: str,
prompt: str, response: str, origin: str,
additional_info: dict):
return self.transport.perform_request(
method="POST",
url=_make_path("_plugins", "_ml", "memory", "conversation", conversation_id),
body={
"input": input,
"prompt": prompt,
"response": response,
"origin": origin,
"additional_info": json.dumps(additional_info)
}
)
def get_conversations(self, max_results: int = None, next_token: int = None):
params = {}
if max_results:
params["max_results"] = max_results
if next_token:
params["next_token"] = next_token
return self.transport.perform_request(
method="GET",
url=_make_path("_plugins", "_ml", "memory", "conversation"),
params=params if len(params) != 0 else None
)
def get_interactions(self, conversation_id: str, max_results: int = None,
next_token: int = None):
params = {}
if max_results:
params["max_results"] = max_results
if next_token:
params["next_token"] = next_token
return self.transport.perform_request(
method="GET",
url=_make_path("_plugins", "_ml", "memory", "conversation", conversation_id),
params=params if len(params) != 0 else None
)
def delete_conversation(self, conversation_id: str):
return self.transport.perform_request(
method="DELETE",
url=_make_path("_plugins", "_ml", "memory", "conversation", conversation_id),
)
We can construct one like this:
opensearch_client = OpenSearch(
hosts = [{'host': 'localhost', 'port': 9200}],
http_compress = True, # enables gzip compression for request bodies
use_ssl = False,
verify_certs = False,
ssl_assert_hostname = False,
ssl_show_warn = False
)
conversation_client = ConversationMemoryOpensearchClient(opensearch_client)
With this implementation, we hit the endpoints of the Conversation Memory API with the appropriate arguments, using the perform_request method provided by NamespacedClient, which does just that. The return values are all parsed from JSON into Python dicts and lists, so all that’s required is a little logic to determine which optional parameters to send.
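For a quick sanity check, here is a minimal sketch that exercises the client against the local cluster from above (the conversation name, message strings, and origin are placeholders for this example):
# Create a conversation; the response is a plain Python dict
# like {"conversation_id": "..."}
response = conversation_client.create_conversation(name="demo-conversation")
conversation_id = response["conversation_id"]

# Record one query/response pair as an interaction
conversation_client.create_interaction(
    conversation_id,
    input="Who are you?",
    prompt="Answer the question: Who are you?",
    response="I am a helpful assistant.",
    origin="demo-app",
    additional_info={},
)

# List the interactions in this conversation, then clean up
print(conversation_client.get_interactions(conversation_id))
conversation_client.delete_conversation(conversation_id)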
LangChain ChatHistory#
Now let’s use this client to implement a LangChain chat history management object. This will be a subclass of BaseChatMessageHistory, LangChain’s base class for handling chat history over a number of backends. We need to implement a constructor, a messages property, an add_message method, and a clear method. First, if you don’t have LangChain installed, install it:
pip install langchain
Next, imports:
from typing import List
from langchain.schema import BaseChatMessageHistory
from langchain.schema import BaseMessage, HumanMessage, AIMessage
Now, for the implementation:
class OpenSearchChatMessageHistory(BaseChatMessageHistory):
def __init__(
self,
client: OpenSearch,
conversation_id: str = None
):
self.conversation_client = ConversationMemoryOpensearchClient(client)
self.conversation_id = conversation_id
self.pending = []
# Validate that this conversation id exists
if not self._validate_conversation_id():
self.conversation_id = None
    def _validate_conversation_id(self):
        if self.conversation_id is None:
            return False
        next_token = 0
        while True:
            conversations_response = self.conversation_client.get_conversations(next_token=next_token, max_results=100)
            conversations = [c["conversation_id"] for c in conversations_response.get("conversations")]
            if self.conversation_id in conversations:
                return True
            if "next_token" not in conversations_response:
                return False
            next_token = conversations_response.get("next_token")
@property
def messages(self) -> List[BaseMessage]:
if self.conversation_id is None:
return []
messages = []
next_token = 0
while True:
response = self.conversation_client.get_interactions(self.conversation_id, next_token=next_token, max_results=100)
for interaction in response.get("interactions"):
messages.insert(0, AIMessage(content=interaction.get("response")))
messages.insert(0, HumanMessage(content=interaction.get("input")))
if "next_token" not in response:
return messages + self.pending
next_token = response.get("next_token")
def add_message(self, message: BaseMessage):
# If no conversation is active, create one
if self.conversation_id is None:
self.conversation_id = self.conversation_client.create_conversation(name=message.content).get("conversation_id")
# If no pending message then no pairs are possible
if len(self.pending) == 0:
self.pending.append(message)
# If the pending messages are the same type as this message, then it's not a pair
elif type(message) == type(self.pending[0]):
self.pending.append(message)
        # If the pending messages are a different type than this message, it's a pair and we can write
        else:
            # Flush the older unpaired messages, in order, as interactions
            # with an empty input or response
            for msg in self.pending[:-1]:
                if type(msg) == HumanMessage:
                    self.conversation_client.create_interaction(
                        self.conversation_id, msg.content, "", "", "", {}
                    )
                if type(msg) == AIMessage:
                    self.conversation_client.create_interaction(
                        self.conversation_id, "", "", msg.content, "", {}
                    )
            # Pair the new message with the most recent pending message
            if type(message) == HumanMessage:
                self.conversation_client.create_interaction(
                    self.conversation_id, message.content, "", self.pending[-1].content, "", {}
                )
            if type(message) == AIMessage:
                self.conversation_client.create_interaction(
                    self.conversation_id, self.pending[-1].content, "", message.content, "", {}
                )
            self.pending = []
    def clear(self):
        # Delete the conversation, forget its ID, and drop any pending messages
        if self.conversation_id is not None:
            self.conversation_client.delete_conversation(self.conversation_id)
        self.conversation_id = None
        self.pending = []
Let’s go over this method by method.

- __init__: We make a ConversationMemoryOpensearchClient to use for all our API calls, and we construct this from the OpenSearch client object to keep complexity to a minimum. We’ll also be holding onto a single conversation_id for the duration of this chat history. Additionally, we introduce a list called pending, because LangChain expects to be able to put individual messages into history one at a time. On the other hand, OpenSearch’s Conversation API is built around interactions, which represent a pair of messages (e.g. query and response). So, pending holds messages that don’t yet have a pair. For this example, we use a list, because it’s possible for a user to submit several HumanMessages before adding an AIMessage, or vice versa (although we think this scenario is unlikely). Finally, we validate that if you passed in a conversation ID, it’s a conversation that exists. Otherwise, we set the conversation_id to None.
- _validate_conversation_id: The job of this method is to determine whether the currently held conversation ID exists. We do this by retrieving the list of conversations and checking whether the currently held conversation ID is in that list. Since the GetConversations API is paginated, we iteratively step through it in case the conversation we’re looking for isn’t on the first page. If we run out of pages, the conversation doesn’t exist.
- messages: This property retrieves the list of messages in the conversation and arranges them for LangChain. There are a few oddities here. First, the GetInteractions API is paginated, but LangChain expects to receive all of the messages at once, so we again have to iterate through the pages. Second, the GetInteractions API retrieves interactions sorted from most recent to least recent, so we have to build the list of messages backwards. Third, an interaction consists of two messages, so we have to decompose it and add two messages for every interaction. Lastly, any pending messages will not be returned by the Conversation API, since they have yet to enter the OpenSearch indices, so we need to tack those on at the end too.
- add_message: This method adds a message to Conversational Memory. Because of the mismatch between LangChain messages and Conversational Memory interactions described above, there is some added complexity:
  - If nothing is pending, the new message doesn’t have a pair, so we put it in the pending container to wait for one.
  - If the new message is the same kind of message as the pending messages (e.g. HumanMessages), then we still don’t have a pair of Human/AI messages, so we add the new message onto the pending list.
  - If the new message is a different kind of message from the pending messages, then we do have a pair. We assume that the pair comes from the most recent pending message (this preserves the message order), so we iteratively enter the older unpaired messages into Conversational Memory as interactions with empty responses or inputs, then write the paired messages as an interaction and empty out the pending list.
- clear: This method resets everything. It deletes the conversation from memory, unsets the current conversation ID, and empties out the pending list.
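Before wiring this into a chain, here is a minimal sketch of using the chat history on its own, assuming the opensearch_client from earlier is available (the message contents are placeholders):
history = OpenSearchChatMessageHistory(opensearch_client)

# The first add_message creates a conversation and leaves the message pending
history.add_message(HumanMessage(content="What is conversational memory?"))
# The AIMessage completes the pair, which is written as one interaction
history.add_message(AIMessage(content="A way to store chat history in OpenSearch."))

# Reads back [HumanMessage(...), AIMessage(...)]
print(history.messages)

# Delete the conversation and reset the local state
history.clear()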
LLM Chain with Memory#
This is an example of how to create an LLMChain that uses the OpenSearch Conversation Memory API as a remote conversation memory store.
from opensearchpy import OpenSearch
from langchain.memory import ConversationBufferMemory
from langchain.llms import OpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
template = """You are a chatbot having a conversation with a human.
{chat_history}
Human: {human_input}
Chatbot:"""
prompt = PromptTemplate(
input_variables=["chat_history", "human_input"], template=template
)
opensearch_client = OpenSearch(
hosts = [{'host': 'localhost', 'port': 9200}],
http_compress = True, # enables gzip compression for request bodies
use_ssl = False,
verify_certs = False,
ssl_assert_hostname = False,
ssl_show_warn = False
)
message_history = OpenSearchChatMessageHistory(opensearch_client)
memory = ConversationBufferMemory(
memory_key="chat_history",
chat_memory=message_history
)
llm_chain = LLMChain(
llm=OpenAI(temperature=0),
prompt=prompt,
verbose=True,
memory=memory
)
And now, this LLMChain will write its interactions to OpenSearch Conversation Memory.
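To try it out, you can run the chain a few times; each call appends a Human/AI message pair that the chat history flushes to OpenSearch as an interaction (this sketch assumes an OPENAI_API_KEY environment variable is set for the OpenAI LLM):
# Each call stores one query/response pair in Conversational Memory
llm_chain.predict(human_input="Hi there, my friend!")
llm_chain.predict(human_input="What did I just say?")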