Introduction#
- Create a stack for an OpenSearch Serverless collection
- Transform text into embedding vectors
- Load the embedding vectors into the collection
- Query the collection
- Create a QA retrieval chain with chat history
Stack#
We need to prepare an access policy, a network policy, and an encryption policy. First, let's create the access policy
[{"Description": "Rule 1","Rules": [{"ResourceType": "collection","Resource": ["collection/demo"],"Permission": ["aoss:CreateCollectionItems","aoss:DeleteCollectionItems","aoss:UpdateCollectionItems","aoss:DescribeCollectionItems"]},{"ResourceType": "index","Resource": ["index/demo/*"],"Permission": ["aoss:CreateIndex","aoss:DeleteIndex","aoss:UpdateIndex","aoss:DescribeIndex","aoss:ReadDocument","aoss:WriteDocument"]}],"Principal": ["arn:aws:sts::111222333444:assumed-role/TeamRole/MasterKey"]}]
Then create the network policy
[{"Rules": [{"ResourceType": "collection","Resource": ["collection/demo*"]},{"ResourceType": "dashboard","Resource": ["collection/demo*"]}],"AllowFromPublic": true}]
Then, create the encryption policy
{"Rules": [{"ResourceType": "collection","Resource": ["collection/demo*"]}],"AWSOwnedKey": true}
Finally, create the collection. The collection can only be created once its policies exist, so the stack adds explicit dependencies
import { Stack, StackProps, aws_opensearchserverless } from "aws-cdk-lib";
import { Construct } from "constructs";
import * as fs from "fs";
import * as path from "path";

const strAccessPolicy = JSON.stringify(
  JSON.parse(
    fs.readFileSync(path.join(__dirname, "./../policy/access-policy.json"), "utf-8")
  )
);

const strNetworkPolicy = JSON.stringify(
  JSON.parse(
    fs.readFileSync(path.join(__dirname, "./../policy/network-policy.json"), "utf-8")
  )
);

const strEncryptPolicy = JSON.stringify(
  JSON.parse(
    fs.readFileSync(path.join(__dirname, "./../policy/encryption-policy.json"), "utf-8")
  )
);

interface AOSSProps extends StackProps {
  arnPrincipal: string;
}

export class AOSSStack extends Stack {
  constructor(scope: Construct, id: string, props: AOSSProps) {
    super(scope, id, props);

    const collection = new aws_opensearchserverless.CfnCollection(this, "demo", {
      name: "demo",
      description: "vector search demo",
      type: "VECTORSEARCH",
      standbyReplicas: "DISABLED",
    });

    const accessPolicy = new aws_opensearchserverless.CfnAccessPolicy(this, "accessPolicyDemo", {
      name: "demo-access-policy",
      type: "data",
      description: "access policy demo",
      policy: strAccessPolicy,
    });

    const networkPolicy = new aws_opensearchserverless.CfnSecurityPolicy(this, "networkPolicyDemo", {
      name: "network-policy-demo",
      type: "network",
      description: "network policy demo",
      policy: strNetworkPolicy,
    });

    const encryptionPolicy = new aws_opensearchserverless.CfnSecurityPolicy(this, "encryptionPolicyDemo", {
      name: "encryption-policy-demo",
      type: "encryption",
      description: "encryption policy demo",
      policy: strEncryptPolicy,
    });

    collection.addDependency(networkPolicy);
    collection.addDependency(encryptionPolicy);
    collection.addDependency(accessPolicy);
  }
}
Vectors#
Let's load sample data, split it into chunks, and transform the chunks into embedding vectors. There are many embedding models to choose from, such as OpenAIEmbeddings or Bedrock.
First, let's load the OpenAI token from an environment variable
import os

# OpenAIEmbeddings reads the token from the OPENAI_API_KEY environment variable
assert "OPENAI_API_KEY" in os.environ, "please set OPENAI_API_KEY"
Then load some sample data and split it into chunks
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter

# load sample data
loader = TextLoader("state_of_the_union.txt")
documents = loader.load()

# split into chunks
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
Finally, transform the text chunks into embedding vectors, with a size of 3072 in this case.
# openai embedding
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

# transform one text chunk into a numeric vector
response = embeddings.embed_query(docs[0].page_content)

# transform all chunks into numeric vectors
vectors = [embeddings.embed_query(docs[i].page_content) for i in range(len(docs))]
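As a quick sanity check, text-embedding-3-large produces 3072-dimensional vectors, which must match the dimension declared in the index mapping later

# each vector should have dimension 3072, matching the index mapping below
assert len(response) == 3072
print(f"embedded {len(vectors)} chunks, dimension {len(response)}")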
Build the bulk indexing data
import json

index_name = "demo"

# build the bulk request body: an action line followed by a document line per chunk
data = ""
for i in range(len(docs)):
    data += json.dumps({"index": {"_index": index_name}}) + "\n"
    data += json.dumps({"vector_field": vectors[i], "text": docs[i].page_content, "title": f"chunk {i}"}) + "\n"
Indexing#
Create an array of vectors, then use a bulk request to index the data into the OpenSearch collection.
Let's create the AOSS client. The endpoint host comes from the same OASS_URL environment variable used with LangChain below
import os

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

# sigv4 signing for opensearch serverless
service = "aoss"
region = "us-east-1"
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)

# collection endpoint host
host = os.environ["OASS_URL"]

client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
Create an index named demo that supports the knn vector type
# create index with vector support
client.indices.create(
    index="demo",
    body={
        "settings": {"index.knn": True},
        "mappings": {
            "properties": {
                "vector_field": {"type": "knn_vector", "dimension": 3072},
                "text": {"type": "text"},
                "title": {"type": "text"},
            }
        },
    },
)
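To double-check, the mapping can be read back with the same client

# verify the index exists and the knn mapping took effect
print(client.indices.get(index="demo"))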
Then bulk load the data
response = client.bulk(data)
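The bulk API reports per-item statuses rather than failing outright, so it's worth checking the response for partial failures; a minimal sketch

# fail fast on any per-item indexing error
if response.get("errors"):
    failed = [item["index"] for item in response["items"] if "error" in item["index"]]
    print(f"{len(failed)} documents failed to index")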
Query#
Let's simply get a document by its id
# get a document by its id
x = client.get(
    index=index_name,
    id="1%3A0%3AYc8Iyo0BE34oY8mOyId1",
)
Query by providing a vector
# knn query with a vector
client.search(
    index=index_name,
    body={
        "size": 5,
        "query": {"knn": {"vector_field": {"vector": vectors[1], "k": 5}}},
    },
)
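The matches come back under hits.hits; capturing the response makes them easy to inspect

# capture the search response and print the top matches
results = client.search(
    index=index_name,
    body={"size": 5, "query": {"knn": {"vector_field": {"vector": vectors[1], "k": 5}}}},
)
for hit in results["hits"]["hits"]:
    print(hit["_score"], hit["_source"]["title"])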
LangChain#
Let's create a docsearch using LangChain
from langchain_community.vectorstores import OpenSearchVectorSearch

docsearch = OpenSearchVectorSearch(
    embedding_function=embeddings,
    opensearch_url=f"https://{os.environ['OASS_URL']}",
    http_auth=awsauth,
    timeout=300,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    index_name="demo",
    engine="nmslib",
)
Then perform a simple similarity search
x = docsearch.similarity_search(
    query="Putin's latest attack on Ukraine was premeditated and unprovoked.",
    vector_field="vector_field",
    text_field="text",
    k=2,
)
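similarity_search returns a list of LangChain Documents; a quick look at what came back

# preview each matched chunk
for doc in x:
    print(doc.page_content[:120])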
Retriever#
Let's use LangChain to create
- Simple chain (llm, retriever, prompt)
- Single question
Create the Bedrock LLM
import boto3
from langchain.chains import RetrievalQA
from langchain.llms.bedrock import Bedrock
from langchain.prompts import PromptTemplate

# bedrock client
bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")

# bedrock llm
bedrock_llm = Bedrock(
    model_id="anthropic.claude-v2",
    client=bedrock_client,
    model_kwargs={"temperature": 0.8},
)
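A quick smoke test of the Bedrock LLM before wiring it into a chain; any short prompt works

# smoke test the bedrock llm
print(bedrock_llm.invoke("Say hello in one short sentence."))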
Create a prompt template
# prompt template
prompt_template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. Don't include harmful content.

{context}

Question: {question}
Answer:"""

PROMPT = PromptTemplate(
    template=prompt_template,
    input_variables=["context", "question"],
)
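To see exactly what the model will receive, the template can be rendered with sample values (the values here are just illustrative)

# preview the rendered prompt with illustrative values
print(PROMPT.format(context="<retrieved chunks go here>", question="Who is Putin?"))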
Then create the first RetrievalQA chain
qa = RetrievalQA.from_chain_type(
    llm=bedrock_llm,
    chain_type="stuff",
    retriever=docsearch.as_retriever(),
    return_source_documents=True,
    chain_type_kwargs={"prompt": PROMPT, "verbose": True},
    verbose=True,
)
Let's ask a single question
response = qa("Who is Putin?", return_only_outputs=False)
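Because return_source_documents=True, the response dict carries the retrieved chunks alongside the answer

# the answer plus the chunks it was grounded on
print(response["result"])
for doc in response["source_documents"]:
    print(doc.page_content[:80])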
Alternatively, we can use the ChatOpenAI model
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0.85)
llm.invoke("how to cook chicken soup?")
Then create a chain
qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=docsearch.as_retriever(),
    return_source_documents=True,
    chain_type_kwargs={"prompt": PROMPT, "verbose": True},
    verbose=True,
)
Let's ask a question. Under the hood, context is retrieved from the OpenSearch collection and sent together with the prompt to the LLM.
response = qa("Tell me what is Puttion's opinion about Ukraine war?",return_only_outputs=False)
Historical Awareness#
Let's use an LCEL chain to add history awareness. There are two prompts
- Based on the chat history, LangChain prompts the LLM to generate a search query
- Based on the retrieved context and the user input, LangChain prompts the LLM for the final answer
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
Let's create the first prompt, which generates the search query
# 1. the prompt and chat history are used to generate a search query;
# invoking the retriever chain returns documents
prompt = ChatPromptTemplate.from_messages([
    MessagesPlaceholder(variable_name="chat_history"),
    ("user", "{input}"),
    ("user", "Given the above conversation, generate a search query to look up in order to get information relevant to the conversation"),
])

retriever_chain = create_history_aware_retriever(
    llm=llm,
    retriever=docsearch.as_retriever(),
    prompt=prompt,
)

chat_history = [
    HumanMessage(content="Who is Putin?"),
    AIMessage(content="Putin is the president of Russia"),
]
Let's invoke it and get a set of documents relevant to the chat history and the user question
# invoke to get a set of documents
response = retriever_chain.invoke({
    "chat_history": chat_history,
    "input": "Tell me Putin's opinion about the Ukraine war",
})
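Note that the history-aware retriever returns a list of Documents, not an answer; it only rewrites the question using the chat history and searches

# inspect the documents resolved from the history-aware query
for doc in response:
    print(doc.page_content[:80])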
Now let's create the end-to-end chain
# given the new retriever, create an end-to-end chain
prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the user's questions based on the below context:\n\n{context}"),
    MessagesPlaceholder(variable_name="chat_history"),
    ("user", "{input}"),
])

document_chain = create_stuff_documents_chain(llm, prompt)
retrieval_chain = create_retrieval_chain(retriever_chain, document_chain)

chat_history = [
    HumanMessage(content="Who is Putin?"),
    AIMessage(content="Putin is the president of Russia"),
]
Let's ask a question
response = retrieval_chain.invoke({
    "chat_history": chat_history,
    "input": "Tell me about Putin and the Ukraine war",
})
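create_retrieval_chain returns a dict holding the input, the retrieved context, and the final answer

# the final answer and the context it was grounded on
print(response["answer"])
for doc in response["context"]:
    print(doc.page_content[:80])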