Introduction#

  • Create a CDK stack that provisions an OpenSearch Serverless collection
  • Transform text into embedding vectors
  • Load the embedding vectors into the collection
  • Query the collection
  • Create a QA retrieval chain with chat history

Stack#

We need to prepare an access policy, a network policy, and an encryption policy. First, let's create the access policy.

[
  {
    "Description": "Rule 1",
    "Rules": [
      {
        "ResourceType": "collection",
        "Resource": ["collection/demo"],
        "Permission": [
          "aoss:CreateCollectionItems",
          "aoss:DeleteCollectionItems",
          "aoss:UpdateCollectionItems",
          "aoss:DescribeCollectionItems"
        ]
      },
      {
        "ResourceType": "index",
        "Resource": ["index/demo/*"],
        "Permission": [
          "aoss:CreateIndex",
          "aoss:DeleteIndex",
          "aoss:UpdateIndex",
          "aoss:DescribeIndex",
          "aoss:ReadDocument",
          "aoss:WriteDocument"
        ]
      }
    ],
    "Principal": ["arn:aws:sts::111222333444:assumed-role/TeamRole/MasterKey"]
  }
]

Then create the network policy.

[
  {
    "Rules": [
      {
        "ResourceType": "collection",
        "Resource": ["collection/demo*"]
      },
      {
        "ResourceType": "dashboard",
        "Resource": ["collection/demo*"]
      }
    ],
    "AllowFromPublic": true
  }
]

Then, create the encryption policy.

{
  "Rules": [
    {
      "ResourceType": "collection",
      "Resource": ["collection/demo*"]
    }
  ],
  "AWSOwnedKey": true
}

Finally, create the collection with CDK. The collection can only be created after its three policies exist, so we add explicit dependencies.

import { Stack, StackProps, aws_opensearchserverless } from "aws-cdk-lib";
import { Construct } from "constructs";
import * as fs from "fs";
import * as path from "path";

const strAccessPolicy = JSON.stringify(
  JSON.parse(
    fs.readFileSync(path.join(__dirname, "./../policy/access-policy.json"), "utf-8")
  )
);

const strNetworkPolicy = JSON.stringify(
  JSON.parse(
    fs.readFileSync(path.join(__dirname, "./../policy/network-policy.json"), "utf-8")
  )
);

const strEncryptPolicy = JSON.stringify(
  JSON.parse(
    fs.readFileSync(path.join(__dirname, "./../policy/encryption-policy.json"), "utf-8")
  )
);

interface AOSSProps extends StackProps {
  arnPrincipal: string;
}

export class AOSSStack extends Stack {
  constructor(scope: Construct, id: string, props: AOSSProps) {
    super(scope, id, props);

    const collection = new aws_opensearchserverless.CfnCollection(this, "demo", {
      name: "demo",
      description: "vector search demo",
      type: "VECTORSEARCH",
      standbyReplicas: "DISABLED",
    });

    const accessPolicy = new aws_opensearchserverless.CfnAccessPolicy(this, "accessPolicyDemo", {
      name: "demo-access-policy",
      type: "data",
      description: "access policy demo",
      policy: strAccessPolicy,
    });

    const networkPolicy = new aws_opensearchserverless.CfnSecurityPolicy(this, "networkPolicyDemo", {
      name: "network-policy-demo",
      type: "network",
      description: "network policy demo",
      policy: strNetworkPolicy,
    });

    const encryptionPolicy = new aws_opensearchserverless.CfnSecurityPolicy(this, "encryptionPolicyDemo", {
      name: "encryption-policy-demo",
      type: "encryption",
      description: "encryption policy demo",
      policy: strEncryptPolicy,
    });

    collection.addDependency(networkPolicy);
    collection.addDependency(encryptionPolicy);
    collection.addDependency(accessPolicy);
  }
}

Vectors#

Let's load sample data, split it into chunks, and transform the chunks into embedding vectors. There are many embedding models to choose from, such as OpenAIEmbeddings or Bedrock (a Bedrock sketch is shown at the end of this section).

First, let's load the OpenAI token from an environment variable.

import os
# OpenAIEmbeddings reads the token from the OPENAI_API_KEY environment variable
assert "OPENAI_API_KEY" in os.environ, "please export OPENAI_API_KEY"

Then load the sample data and split it into chunks.

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
# load sample data
loader = TextLoader("state_of_the_union.txt")
documents = loader.load()
# split into chunks
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)

Finally, transform the text chunks into embedding vectors, of size 3072 in this case (the output dimension of text-embedding-3-large).

# openai embedding
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
# transform a single text chunk into an embedding vector
response = embeddings.embed_query(docs[0].page_content)
# transform all chunks into embedding vectors
vectors = [embeddings.embed_query(docs[i].page_content) for i in range(len(docs))]
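
If you prefer Bedrock over OpenAI, here is a minimal sketch using BedrockEmbeddings, assuming the amazon.titan-embed-text-v1 model; the index dimension defined later must match the model's vector size.

# alternative: Bedrock embeddings instead of OpenAI
import boto3
from langchain_community.embeddings import BedrockEmbeddings
bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")
bedrock_embeddings = BedrockEmbeddings(
    client=bedrock_client,
    model_id="amazon.titan-embed-text-v1",
)
# titan-embed-text-v1 returns 1536-dimensional vectors, so the knn_vector
# dimension in the index mapping would be 1536 instead of 3072
vector = bedrock_embeddings.embed_query(docs[0].page_content)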

Build the bulk indexing payload: for each chunk, one action line followed by one document line.

import json

index_name = "demo"
# build the bulk indexing payload
data = ""
for i in range(len(docs)):
    data += json.dumps({"index": {"_index": index_name}}) + "\n"
    data += json.dumps({"vector_field": vectors[i], "text": docs[i].page_content, "title": f"chunk {i}"}) + "\n"

Indexing#

With the embedding vectors and the bulk payload from the previous section, we can index the data into the OpenSearch collection.

Let's create the OpenSearch Serverless (aoss) client.

# create the OpenSearch Serverless client
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
import os

service = 'aoss'
region = 'us-east-1'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)
# OASS_URL holds the collection endpoint host, e.g. <collection-id>.us-east-1.aoss.amazonaws.com
client = OpenSearch(
    hosts=[{'host': os.environ['OASS_URL'], 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)

Create an index named demo that supports the knn_vector field type. The dimension must match the embedding model (3072 for text-embedding-3-large).

# create index with vector support
client.indices.create(
    index='demo',
    body={
        "settings": {"index.knn": True},
        "mappings": {
            "properties": {
                "vector_field": {"type": "knn_vector", "dimension": 3072},
                "text": {"type": "text"},
                "title": {"type": "text"},
            }
        },
    },
)

Then bulk load the data.

client.bulk(data)
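
To verify the bulk load, a quick sanity check (a minimal sketch; indexing in OpenSearch Serverless is asynchronous, so the count may lag briefly):

# count the indexed documents
res = client.search(index=index_name, body={"size": 0, "query": {"match_all": {}}})
print(res["hits"]["total"])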

Query#

Let's simply get a document by its id.

# get a document by its id
x = client.get(
    index=index_name,
    id="1%3A0%3AYc8Iyo0BE34oY8mOyId1",
)

Query the index by providing a vector (k-NN search).

# query the index with a k-NN search
response = client.search(
    index=index_name,
    body={
        "size": 5,
        "query": {
            "knn": {
                "vector_field": {
                    "vector": vectors[1],
                    "k": 5,
                }
            }
        },
    },
)
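
The response is a standard search result; to inspect the matches, a minimal sketch:

# print the score and title of each matched chunk
for hit in response["hits"]["hits"]:
    print(hit["_score"], hit["_source"]["title"])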

LangChain#

Let's create a docsearch (vector store) using LangChain.

from langchain_community.vectorstores import OpenSearchVectorSearch

docsearch = OpenSearchVectorSearch(
    embedding_function=embeddings,
    opensearch_url=f"https://{os.environ['OASS_URL']}",
    http_auth=awsauth,
    timeout=300,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    index_name='demo',
    engine='nmslib',
)

Then perform a simple similarity search.

x = docsearch.similarity_search(
    query='Putin’s latest attack on Ukraine was premeditated and unprovoked.',
    vector_field='vector_field',
    text_field='text',
    k=2,
)
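
similarity_search returns a list of LangChain Document objects:

# print the beginning of each retrieved chunk
for doc in x:
    print(doc.page_content[:200])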

Retriever#

Let's use LangChain to:

  • Create a simple chain (LLM, retriever, prompt)
  • Ask a single question

Create the Bedrock LLM.

from langchain.chains import RetrievalQA
from langchain.llms.bedrock import Bedrock
from langchain.prompts import PromptTemplate

# bedrock client
bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")
# bedrock llm
bedrock_llm = Bedrock(
    model_id="anthropic.claude-v2",
    client=bedrock_client,
    model_kwargs={'temperature': 0.8},
)
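
A quick optional sanity check of the Bedrock LLM:

# the llm can be invoked directly with a plain text prompt
print(bedrock_llm.invoke("Say hello in one sentence"))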

Create the prompt template.

# prompt template
prompt_template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer, and don't include harmful content.
{context}
Question: {question}
Answer:"""
PROMPT = PromptTemplate(
    template=prompt_template,
    input_variables=["context", "question"],
)

Then create the first RetrievalQA chain.

qa = RetrievalQA.from_chain_type(
    llm=bedrock_llm,
    chain_type="stuff",
    retriever=docsearch.as_retriever(),
    return_source_documents=True,
    chain_type_kwargs={"prompt": PROMPT, "verbose": True},
    verbose=True,
)

Let's ask a single question.

response = qa("Who is Putin?", return_only_outputs=False)
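
The chain returns the generated answer under result and, because return_source_documents=True, the retrieved chunks under source_documents:

# inspect the answer and the chunks used as context
print(response["result"])
for doc in response["source_documents"]:
    print(doc.page_content[:100])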

Alternatively, we can use the ChatOpenAI model.

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0.85)
llm.invoke("how to cook chicken soup?")

Then create a chain

qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=docsearch.as_retriever(),
    return_source_documents=True,
    chain_type_kwargs={"prompt": PROMPT, "verbose": True},
    verbose=True,
)

Let's ask a question. Under the hood, the context is retrieved from the OpenSearch collection and injected into the prompt sent to the LLM.

response = qa(
    "What is Putin's opinion about the Ukraine war?",
    return_only_outputs=False,
)

Historical Awareness#

Let's use an LCEL chain to add chat-history awareness. There are two prompts:

  1. Based on the chat history, LangChain prompts the LLM to generate a search query
  2. Based on the retrieved context and the user input, LangChain prompts the LLM for the final answer

from langchain.chains import create_history_aware_retriever
from langchain_core.prompts import MessagesPlaceholder
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage, AIMessage
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain

Let's create the first prompt, which generates the search query.

# 1. use the chat history and the user input to generate a search query
# a prompt that we pass into the LLM to generate this search query
# when invoked, the retriever chain returns documents
prompt = ChatPromptTemplate.from_messages([
    MessagesPlaceholder(variable_name="chat_history"),
    ("user", "{input}"),
    ("user", "Given the above conversation, generate a search query to look up in order to get information relevant to the conversation"),
])
retriever_chain = create_history_aware_retriever(
    llm=llm,
    retriever=docsearch.as_retriever(),
    prompt=prompt,
)
chat_history = [
    HumanMessage(content="Who is Putin?"),
    AIMessage(content="Putin is the president of Russia"),
]

Let's invoke it and get a set of documents relevant to the chat history and the user's question.

# invoke to get a set of documents
response = retriever_chain.invoke({
    "chat_history": chat_history,
    "input": "Tell me Putin's opinion about the Ukraine war",
})
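
The history-aware retriever returns the matching documents directly:

# print the beginning of each retrieved document
for doc in response:
    print(doc.page_content[:200])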

Now let's create the end-to-end chain.

# given the new retriever, create an end-to-end chat chain
prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer the user's questions based on the below context:\n\n{context}"),
    MessagesPlaceholder(variable_name="chat_history"),
    ("user", "{input}"),
])
document_chain = create_stuff_documents_chain(llm, prompt)
retrieval_chain = create_retrieval_chain(retriever_chain, document_chain)
chat_history = [
    HumanMessage(content="Who is Putin?"),
    AIMessage(content="Putin is the president of Russia"),
]

Let's ask a question.

response = retrieval_chain.invoke({
    "chat_history": chat_history,
    "input": "Tell me about Putin and the Ukraine war",
})
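
The retrieval chain returns a dict; the generated answer is under the answer key and the retrieved documents under context:

# print the final answer produced by the llm
print(response["answer"])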

Reference#