## Init Client
- load aws credentials from profile
- create opensearch client
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
	opensearch "github.com/opensearch-project/opensearch-go/v2"
	"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
	requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2"
)

// EmbedResponse mirrors the "embedding" field of an embedding model reply.
type EmbedResponse struct {
	Embedding []float64 `json:"embedding"`
}

// Hits is the inner "hits" array of a search reply; each hit is kept as a
// generic map.
type Hits struct {
	Hits []map[string]interface{} `json:"hits"`
}

// AossResponse is the outer search reply; only its "hits" object is decoded.
type AossResponse struct {
	Hits Hits `json:"hits"`
}

// endpoint is the OpenSearch address the client talks to,
// e.g. https://opensearch-domain.region.com or Amazon OpenSearch Serverless endpoint
const endpoint = "https://yvp6plo4ijurgy8ymhdg.us-east-1.aoss.amazonaws.com"

// Package-level clients, populated once by init.
var (
	AOSSClient    *opensearch.Client
	bedrockClient *bedrockruntime.Client
)

// init loads AWS credentials from the "demo" shared-config profile in
// us-east-1, then builds the signed OpenSearch client and the Bedrock runtime
// client. Any failure terminates the process via log.Fatal.
func init() {
	cfg, err := config.LoadDefaultConfig(
		context.Background(),
		config.WithRegion("us-east-1"),
		config.WithSharedConfigProfile("demo"),
	)
	if err != nil {
		log.Fatal(err)
	}

	AOSSClient, err = newSignedClient(cfg)
	if err != nil {
		log.Fatal(err)
	}

	bedrockClient = bedrockruntime.NewFromConfig(cfg)
}

// newSignedClient creates an OpenSearch client for endpoint whose requests are
// signed for the "aoss" service using the given AWS configuration.
func newSignedClient(cfg aws.Config) (*opensearch.Client, error) {
	sigV4, err := requestsigner.NewSignerWithService(cfg, "aoss")
	if err != nil {
		return nil, err
	}
	return opensearch.NewClient(opensearch.Config{
		Addresses: []string{endpoint},
		Signer:    sigV4,
	})
}
## Query All
Let's query all items from an index.
First, let's create the struct types used later to decode the OpenSearch response:
// Types used to decode an OpenSearch search response.
type (
	// Hits holds the inner "hits" array. Each entry is kept as a generic map
	// so that fields such as "_id", "_index" and "_source" can be read by key.
	Hits struct {
		Hits []map[string]interface{} `json:"hits"`
	}

	// AossResponse is the top-level search response; only the outer "hits"
	// object is decoded.
	AossResponse struct {
		Hits Hits `json:"hits"`
	}
)
Then, query all items from the index:
func main() {// let query get all item in an indexcontent := strings.NewReader(`{"size": 5,"query": {"match_all": {}}}`)search := opensearchapi.SearchRequest{Index: []string{"demo"},Body: content,}searchResponse, err := search.Do(context.Background(), client)if err != nil {log.Fatal(err)}// fmt.Println(searchResponse.Body)var answer AossResponsejson.NewDecoder(searchResponse.Body).Decode(&answer)first := answer.Hits.Hits[0]fmt.Printf("id: %s\n, index: %s\n, text: %s", first["_id"], first["_index"], first["_source"].(map[string]interface{})["text"])// fmt.Println(answer.Hits.Hits[0]["_id"])}
## Query Vector
- call bedrock titan to get embedding vector
- query opensearch given the vector
First, let's write a function that gets an embedding vector from the Bedrock Titan model:
func getEmbedVector(string) ([]float64, error) {// create request body to titan modelbody := map[string]interface{}{"inputText": "what is amazon bedrock?",}bodyJson, err := json.Marshal(body)if err != nil {fmt.Println(err)return nil, err}// invoke bedrock titan model to convert string to embedding vectorresponse, error := bedrockClient.InvokeModel(context.Background(),&bedrockruntime.InvokeModelInput{Body: []byte(bodyJson),ModelId: aws.String("amazon.titan-embed-text-v1"),ContentType: aws.String("application/json"),},)if error != nil {fmt.Println(error)return nil, error}// assert response to mapvar embedResponse map[string]interface{}error = json.Unmarshal(response.Body, &embedResponse)if error != nil {fmt.Println(error)return nil, error}// assert response to arrayslice, ok := embedResponse["embedding"].([]interface{})if !ok {fmt.Println(ok)}// assert to array of float64values := make([]float64, len(slice))for k, v := range slice {values[k] = float64(v.(float64))}return values, nil}
Second, let's query OpenSearch with that vector:
func query(vec []float64) ([]string, error) {// let query get all item in an index// content := strings.NewReader(`{// "size": 5,// "query": {// "match_all": {}// }// }`)vecStr := make([]string, len(vec))// convert array float to stringfor k, v := range vec {if k < len(vec)-1 {vecStr[k] = fmt.Sprint(v) + ","} else {vecStr[k] = fmt.Sprint(v)}}// create request body to titan modelcontent := strings.NewReader(fmt.Sprintf(`{"size": 5,"query": {"knn": {"vector_field": {"vector": %s,"k": 5}}}}`, vecStr))// fmt.Println(content)search := opensearchapi.SearchRequest{Index: []string{"demo"},Body: content,}searchResponse, err := search.Do(context.Background(), AOSSClient)if err != nil {log.Fatal(err)}// fmt.Println(searchResponse)var answer AossResponsejson.NewDecoder(searchResponse.Body).Decode(&answer)// first := answer.Hits.Hits[0]// fmt.Printf("id: %s\n, index: %s\n, text: %s", first["_id"], first["_index"], first["_source"].(map[string]interface{})["text"])// fmt.Println(answer.Hits.Hits[0]["_id"])queryResult := answer.Hits.Hits[0]["_source"].(map[string]interface{})["text"]if queryResult == nil {return []string{"nil"}, nil}// extract hint text onlyhits := []string{}for k, v := range answer.Hits.Hits {if k > 0 {hits = append(hits, v["_source"].(map[string]interface{})["text"].(string))}}return hits, nil// return fmt.Sprint(queryResult), nil}
## Index Vector
- index normal text
- index embedding vector
First, let's index a plain-text item into an OpenSearch index in AOSS:
func indexOpenSearch() {fmt.Println("call index...")content := strings.NewReader(`{"title": "Hello","text": "Hello, world!"}`)index := opensearchapi.IndexRequest{Index: "testing",Body: content,}response, error := index.Do(context.Background(), AOSSClient2)if error != nil {fmt.Println(error)}fmt.Println(response)}
Second, let's index an item with an embedding vector into the OpenSearch index:
func indexVectorOpenSearch() {fmt.Println("call indexVector...")// get embedding vectorvec, error := gobedrock.GetEmbedVector("Hello, world!", BedrockClientTest)if error != nil {fmt.Println(error)}fmt.Println(vec)// convert vector of number to stringvecStr := make([]string, len(vec))// convert array float to stringfor k, v := range vec {if k < len(vec)-1 {vecStr[k] = fmt.Sprint(v) + ","} else {vecStr[k] = fmt.Sprint(v)}}content := strings.NewReader(fmt.Sprintf(`{"title": "Hello Indexing OpenSearch","text": "Hello Indexing OpenSearch","vector_field": %s}`, vecStr))index := opensearchapi.IndexRequest{Index: "demo",Body: content,}response, error := index.Do(context.Background(), AOSSClient2)if error != nil {fmt.Println(error)}fmt.Println(response)}
## Application
- create http server
- handle request
Here is the full Go server code:
main.go
// Command main serves a small HTTP API on :3000 that embeds a question with a
// Bedrock embedding model and runs a k-NN search in OpenSearch.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
	opensearch "github.com/opensearch-project/opensearch-go/v2"
	"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
	requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2"
)

// EmbedResponse is the part of the embedding model reply that is decoded.
type EmbedResponse struct {
	Embedding []float64 `json:"embedding"`
}

// Hits is the inner "hits" array of a search reply; each hit is kept as a
// generic map.
type Hits struct {
	Hits []map[string]interface{} `json:"hits"`
}

// AossResponse is the outer search reply; only its "hits" object is decoded.
type AossResponse struct {
	Hits Hits `json:"hits"`
}

// endpoint is the OpenSearch address the client talks to,
// e.g. https://opensearch-domain.region.com or Amazon OpenSearch Serverless endpoint
const endpoint = "https://yvp6plo4ijurgy8ymhdg.us-east-1.aoss.amazonaws.com"

// AOSSClient is the signed OpenSearch client created by init.
var AOSSClient *opensearch.Client

// bedrockClient is the Bedrock runtime client created by init.
var bedrockClient *bedrockruntime.Client

// init loads AWS credentials from the "demo" shared-config profile in
// us-east-1 and creates the OpenSearch and Bedrock clients. Any failure
// terminates the process.
func init() {
	awsCfg, err := config.LoadDefaultConfig(context.Background(),
		config.WithRegion("us-east-1"),
		config.WithSharedConfigProfile("demo"),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Requests are signed for the "aoss" service.
	signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss")
	if err != nil {
		log.Fatal(err)
	}

	AOSSClient, err = opensearch.NewClient(opensearch.Config{
		Addresses: []string{endpoint},
		Signer:    signer,
	})
	if err != nil {
		log.Fatal(err)
	}

	bedrockClient = bedrockruntime.NewFromConfig(awsCfg)
}

// query runs a k-NN search (k = 5, size = 5) on the "vector_field" field of
// the "demo" index and returns the "_source.text" of every hit, in the order
// the hits were returned.
//
// Hits without a string "text" field are skipped. When nothing matches, the
// result is an empty, non-nil slice so that it encodes to JSON as [] rather
// than null. Failures are returned as errors so that one bad request cannot
// terminate the server.
func query(vec []float64) ([]string, error) {
	if len(vec) == 0 {
		return nil, fmt.Errorf("query vector is empty")
	}

	// Marshal the request so the vector is always a well-formed JSON array.
	body, err := json.Marshal(map[string]interface{}{
		"size": 5,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				"vector_field": map[string]interface{}{
					"vector": vec,
					"k":      5,
				},
			},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("encode knn query: %w", err)
	}

	search := opensearchapi.SearchRequest{
		Index: []string{"demo"},
		Body:  strings.NewReader(string(body)),
	}

	searchResponse, err := search.Do(context.Background(), AOSSClient)
	if err != nil {
		return nil, fmt.Errorf("search demo index: %w", err)
	}
	defer searchResponse.Body.Close()

	if searchResponse.IsError() {
		return nil, fmt.Errorf("search demo index: %s", searchResponse.Status())
	}

	var answer AossResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&answer); err != nil {
		return nil, fmt.Errorf("decode search response: %w", err)
	}

	// Every hit is kept, including the first (best-scoring) one.
	hits := make([]string, 0, len(answer.Hits.Hits))
	for _, hit := range answer.Hits.Hits {
		source, ok := hit["_source"].(map[string]interface{})
		if !ok {
			continue
		}
		if text, ok := source["text"].(string); ok {
			hits = append(hits, text)
		}
	}

	return hits, nil
}

// getEmbedVector sends text to the Bedrock model "amazon.titan-embed-text-v1"
// and returns the "embedding" array from its JSON reply.
//
// It returns an error when the request cannot be encoded, the model call
// fails, the reply is not valid JSON, or the reply carries no embedding.
func getEmbedVector(text string) ([]float64, error) {
	// The request body carries the caller's text as "inputText".
	payload, err := json.Marshal(map[string]interface{}{
		"inputText": text,
	})
	if err != nil {
		return nil, fmt.Errorf("encode embedding request: %w", err)
	}

	output, err := bedrockClient.InvokeModel(
		context.Background(),
		&bedrockruntime.InvokeModelInput{
			Body:        payload,
			ModelId:     aws.String("amazon.titan-embed-text-v1"),
			ContentType: aws.String("application/json"),
		},
	)
	if err != nil {
		return nil, fmt.Errorf("invoke embedding model: %w", err)
	}

	var embedResponse EmbedResponse
	if err := json.Unmarshal(output.Body, &embedResponse); err != nil {
		return nil, fmt.Errorf("decode embedding response: %w", err)
	}
	if len(embedResponse.Embedding) == 0 {
		return nil, fmt.Errorf("embedding response contains no embedding")
	}

	return embedResponse.Embedding, nil
}

// main1 is a command-line smoke test: it embeds "hello", runs the k-NN query
// and prints each returned text followed by a separator line.
func main1() {
	fmt.Println("Hello")

	vec, err := getEmbedVector("hello")
	if err != nil {
		fmt.Println(err)
		return
	}

	answers, err := query(vec)
	if err != nil {
		fmt.Println(err)
		return
	}

	for i, answer := range answers {
		fmt.Println(i, answer)
		fmt.Println("====================================")
	}
}

// writeJSON encodes v as the JSON response body and logs encoding failures.
func writeJSON(w http.ResponseWriter, v interface{}) {
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(v); err != nil {
		log.Printf("write JSON response: %v", err)
	}
}

// main registers the HTTP handlers and serves them on :3000.
//
// Routes:
//   - /      plain "Hello" message
//   - /json  fixed JSON greeting
//   - /form  GET serves index.html, POST greets the submitted "name"
//   - /query embeds the "query" form value and returns the matching texts
func main() {
	mux := http.NewServeMux()

	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if _, err := w.Write([]byte("Hello")); err != nil {
			log.Printf("write response: %v", err)
		}
	})

	mux.HandleFunc("/json", func(w http.ResponseWriter, r *http.Request) {
		writeJSON(w, struct {
			Message string `json:"Message"`
		}{Message: "Hello"})
	})

	mux.HandleFunc("/form", func(w http.ResponseWriter, r *http.Request) {
		switch r.Method {
		case http.MethodPost:
			if err := r.ParseForm(); err != nil {
				http.Error(w, "invalid form data", http.StatusBadRequest)
				return
			}
			// Plain text keeps the echoed name from being rendered as HTML.
			w.Header().Set("Content-Type", "text/plain; charset=utf-8")
			if _, err := fmt.Fprintf(w, "Hello %s", r.FormValue("name")); err != nil {
				log.Printf("write response: %v", err)
			}
		case http.MethodGet:
			http.ServeFile(w, r, "index.html")
		default:
			w.Header().Set("Allow", "GET, POST")
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		}
	})

	mux.HandleFunc("/query", func(w http.ResponseWriter, r *http.Request) {
		question := r.FormValue("query")
		if question == "" {
			http.Error(w, "missing query parameter", http.StatusBadRequest)
			return
		}

		vec, err := getEmbedVector(question)
		if err != nil {
			log.Printf("embed question: %v", err)
			http.Error(w, "failed to embed question", http.StatusInternalServerError)
			return
		}

		answers, err := query(vec)
		if err != nil {
			log.Printf("query opensearch: %v", err)
			http.Error(w, "failed to query opensearch", http.StatusInternalServerError)
			return
		}

		writeJSON(w, struct {
			Messages []string `json:"Messages"`
		}{Messages: answers})
	})

	server := &http.Server{
		Addr:           ":3000",
		Handler:        mux,
		ReadTimeout:    10 * time.Second,
		WriteTimeout:   10 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}

	// ListenAndServe always returns a non-nil error; report it, don't drop it.
	log.Fatal(server.ListenAndServe())
}
Here is a simple frontend
index.html
<!DOCTYPE html>
<html>
  <head>
    <title>Query OpenSearch</title>
    <style>
      body {
        background-color: antiquewhite;
      }
      .button-submit {
        background-color: orange;
        padding: 5px 20px;
        border-radius: 5px;
        border: none;
        cursor: pointer;
        position: absolute;
        transform: translateY(-50%);
        top: 50%;
        right: 10px;
      }
      .input-query {
        padding: 10px 15px;
        width: 100%;
      }
      .container-input {
        position: relative;
      }
    </style>
  </head>
  <body>
    <div>
      <h1>Query OpenSearch</h1>
      <form id="form">
        <div class="container-input">
          <input type="text" id="query" name="query" class="input-query" />
          <button class="button-submit">Submit</button>
        </div>
      </form>
      <div id="list"></div>
    </div>
    <script>
      // Send the text typed into the input to /query and render each returned
      // message as a "Chunk" entry in the list container.
      const query = async () => {
        // The server reads the question from the "query" parameter.
        const question = document.getElementById('query').value
        const response = await fetch('/query?query=' + encodeURIComponent(question))
        if (!response.ok) {
          throw new Error(`query failed with status ${response.status}`)
        }
        const json = await response.json()
        console.log(json)

        // Messages may be null when the server has nothing to return.
        const items = json.Messages || []
        const listContainer = document.getElementById('list')

        // Remove the results of the previous query before adding new ones.
        listContainer.textContent = ''

        for (let i = 0; i < items.length; i++) {
          const listItem = document.createElement('div')
          listItem.style.marginBottom = '15px'
          listItem.style.borderBottom = '1px solid #0000FF'

          const header = document.createElement('h4')
          header.textContent = `Chunk ${i}`

          // A text node keeps the returned text from being parsed as HTML.
          const itemText = document.createTextNode(items[i])

          listItem.appendChild(header)
          listItem.appendChild(itemText)
          listContainer.appendChild(listItem)
        }
      }

      document.getElementById('form').addEventListener('submit', async event => {
        // Stay on the page; the request is made with fetch instead.
        event.preventDefault()
        console.log('submit form')
        try {
          await query()
        } catch (err) {
          console.error(err)
          document.getElementById('list').textContent = String(err)
        }
      })
    </script>
  </body>
</html>
Here is the AOSS test code:
test-aoss-index.go
// haimtran 14/04/2204// index and query opensearchpackage mainimport ("context""encoding/json"gobedrock "entest/gobedrock/bedrock""fmt""strings""github.com/aws/aws-sdk-go-v2/config""github.com/aws/aws-sdk-go-v2/service/bedrockruntime"opensearch "github.com/opensearch-project/opensearch-go/v2""github.com/opensearch-project/opensearch-go/v2/opensearchapi"requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2")// opensearch clientvar AOSSClient2 *opensearch.Client// bedrock runtime clientvar BedrockClientTest *bedrockruntime.Clientfunc init() {fmt.Println("create aoss client")awsCfg, err := config.LoadDefaultConfig(context.Background(),config.WithRegion("us-east-1"),)if err != nil {fmt.Println(err)}signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss")if err != nil {fmt.Println(err)}AOSSClient2, err = opensearch.NewClient(opensearch.Config{Addresses: []string{gobedrock.AOSS_ENDPOINT},Signer: signer,})if err != nil {fmt.Println(err)}fmt.Println(AOSSClient2)BedrockClientTest = bedrockruntime.NewFromConfig(awsCfg)}func getDocumentById(id string, indexName string) {fmt.Println("call getDocumentById...%s", id)query := opensearchapi.GetRequest{Index: indexName,DocumentID: id,}response, error := query.Do(context.Background(), AOSSClient2)if error != nil {fmt.Println(error)}fmt.Println(response)}func indexVectorOpenSearch() {fmt.Println("call indexVector...")// get embedding vectorvec, error := gobedrock.GetEmbedVector("Hello, world!", BedrockClientTest)if error != nil {fmt.Println(error)}fmt.Println(vec)// convert vector of number to stringvecStr := make([]string, len(vec))// convert array float to stringfor k, v := range vec {if k < len(vec)-1 {vecStr[k] = fmt.Sprint(v) + ","} else {vecStr[k] = fmt.Sprint(v)}}content := strings.NewReader(fmt.Sprintf(`{"title": "Hello Indexing OpenSearch","text": "Hello Indexing OpenSearch","vector_field": %s}`, vecStr))index := opensearchapi.IndexRequest{Index: "demo",Body: content,}response, error := 
index.Do(context.Background(), AOSSClient2)if error != nil {fmt.Println(error)}fmt.Println(response)}func indexOpenSearch() {fmt.Println("call index...")content := strings.NewReader(`{"title": "Hello","text": "Hello, world!"}`)index := opensearchapi.IndexRequest{Index: "testing",Body: content,}response, error := index.Do(context.Background(), AOSSClient2)if error != nil {fmt.Println(error)}fmt.Println(response)}func queryOpenSearch() {type Hit struct {Id string `json:"_id"`Index string `json:"_index"`Text string `json:"text"`}type Hits struct {Hits []map[string]interface{} `json:"hits"`}type AOSSResponse struct {Hits Hits `json:"hits"`}fmt.Println("call query...")content := strings.NewReader(`{"size": 10,"query": {"match_all": {}}}`)search := opensearchapi.SearchRequest{Index: []string{"demo"},Body: content,}response, error := search.Do(context.Background(), AOSSClient2)if error != nil {fmt.Println(error)}// fmt.Println(response)//var aossResponse AOSSResponsejson.NewDecoder(response.Body).Decode(&aossResponse)for _, hit := range aossResponse.Hits.Hits {fmt.Println(hit["_id"])// type assertion in golangfmt.Println(hit["_source"].(map[string]interface{})["text"])fmt.Println("=================================================")}}func queryOpenSearchByTitle(title string) {type Hit struct {Id string `json:"_id"`Index string `json:"_index"`Text string `json:"text"`}type Hits struct {Hits []map[string]interface{} `json:"hits"`}type AOSSResponse struct {Hits Hits `json:"hits"`}fmt.Println("call query...")content := strings.NewReader(`{"size": 10,"query": {"multi_match": {"query": "Hello Index OpenSearch","fields": ["title"]}}}`)search := opensearchapi.SearchRequest{Index: []string{"demo"},Body: content,}response, error := search.Do(context.Background(), AOSSClient2)if error != nil {fmt.Println(error)}// fmt.Println(response)//var aossResponse AOSSResponsejson.NewDecoder(response.Body).Decode(&aossResponse)for _, hit := range aossResponse.Hits.Hits 
{fmt.Println(hit["_id"])// type assertion in golangfmt.Println(hit["_source"].(map[string]interface{})["text"])fmt.Println("=================================================")}}func main() {// queryOpenSearch()// queryOpenSearchByTitle("Hello")// indexOpenSearch()// getDocumentById("1%3A0%3AgPUP344BM77PrxQVBumZ")// getDocumentById("1%3A0%3AgfUT5o4BM77PrxQVqOmc", "demo")// indexVectorOpenSearch()}