Init Client

  • load aws credentials from profile
  • create opensearch client
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
opensearch "github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2"
)
// JSON shapes decoded by this program.
type (
	// EmbedResponse decodes a JSON object that carries an "embedding" array
	// of numbers.
	EmbedResponse struct {
		Embedding []float64 `json:"embedding"`
	}

	// Hits is the inner "hits" object of a search response; each element of
	// its "hits" array is kept as a generic JSON object.
	Hits struct {
		Hits []map[string]interface{} `json:"hits"`
	}

	// AossResponse is the top level of a search response; only its "hits"
	// object is decoded.
	AossResponse struct {
		Hits Hits `json:"hits"`
	}
)
// endpoint is the base URL the OpenSearch client connects to, e.g.
// https://opensearch-domain.region.com or an Amazon OpenSearch Serverless
// endpoint.
const endpoint = "https://yvp6plo4ijurgy8ymhdg.us-east-1.aoss.amazonaws.com"

// Package-level clients; both are assigned in init.
var (
	// AOSSClient is the request-signing OpenSearch client.
	AOSSClient *opensearch.Client
	// bedrockClient is the Bedrock runtime client.
	bedrockClient *bedrockruntime.Client
)
// init builds the package-level clients before main runs. It loads the AWS
// configuration for region us-east-1 from the shared-config profile "demo",
// creates a request signer for the "aoss" service, and uses it to construct
// AOSSClient. The same AWS configuration backs bedrockClient. Any failure
// ends the process through log.Fatal.
func init() {
	fmt.Println("init and create an opensearch client")

	ctx := context.Background()

	// Region and profile are fixed for this demo.
	cfg, err := config.LoadDefaultConfig(
		ctx,
		config.WithRegion("us-east-1"),
		config.WithSharedConfigProfile("demo"),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Requests sent by the OpenSearch client are signed for "aoss".
	sigV4, err := requestsigner.NewSignerWithService(cfg, "aoss")
	if err != nil {
		log.Fatal(err)
	}

	osCfg := opensearch.Config{
		Addresses: []string{endpoint},
		Signer:    sigV4,
	}
	if AOSSClient, err = opensearch.NewClient(osCfg); err != nil {
		log.Fatal(err)
	}

	bedrockClient = bedrockruntime.NewFromConfig(cfg)
}

Query All

Let's query all items from an index.

First, let's create the struct types used later to decode the OpenSearch response.

// Shapes used to decode a search response.
type (
	// Hits is the inner "hits" object of a search response; each element of
	// its "hits" array is kept as a generic JSON object.
	Hits struct {
		Hits []map[string]interface{} `json:"hits"`
	}

	// AossResponse is the top level of a search response; only its "hits"
	// object is decoded.
	AossResponse struct {
		Hits Hits `json:"hits"`
	}
)

Then query all items from an index

// main runs a match_all search (size 5) against the "demo" index and prints
// the id, index and text of the first hit. Any request or decoding failure
// ends the process through log.Fatal.
func main() {
	content := strings.NewReader(`{
		"size": 5,
		"query": {
			"match_all": {}
		}
	}`)
	search := opensearchapi.SearchRequest{
		Index: []string{"demo"},
		Body:  content,
	}

	// The client created in init is AOSSClient.
	searchResponse, err := search.Do(context.Background(), AOSSClient)
	if err != nil {
		log.Fatal(err)
	}
	if searchResponse.IsError() {
		log.Fatalf("search failed: %s", searchResponse.String())
	}

	var answer AossResponse
	err = json.NewDecoder(searchResponse.Body).Decode(&answer)
	// Close explicitly: a deferred Close would be skipped by log.Fatal.
	searchResponse.Body.Close()
	if err != nil {
		log.Fatalf("decoding search response: %v", err)
	}

	// An empty index yields no hits; indexing element 0 would panic.
	if len(answer.Hits.Hits) == 0 {
		fmt.Println("no hits")
		return
	}
	first := answer.Hits.Hits[0]
	// A failed assertion leaves source nil; reading a nil map is safe.
	source, _ := first["_source"].(map[string]interface{})
	fmt.Printf("id: %v\nindex: %v\ntext: %v\n", first["_id"], first["_index"], source["text"])
}

Query Vector

  • call bedrock titan to get embedding vector
  • query opensearch given the vector

First, let's write a function that gets an embedding vector from the Bedrock Titan model.

// getEmbedVector sends text to the Bedrock model "amazon.titan-embed-text-v1"
// through the package-level bedrockClient and returns the "embedding" array
// from the response.
//
// It returns an error when the request cannot be encoded, the invocation
// fails, the response cannot be decoded, or the response has no embedding.
func getEmbedVector(text string) ([]float64, error) {
	const modelID = "amazon.titan-embed-text-v1"

	// The text to embed travels under the "inputText" key.
	payload, err := json.Marshal(map[string]string{"inputText": text})
	if err != nil {
		return nil, fmt.Errorf("encoding embedding request: %w", err)
	}

	response, err := bedrockClient.InvokeModel(
		context.Background(),
		&bedrockruntime.InvokeModelInput{
			Body:        payload,
			ModelId:     aws.String(modelID),
			ContentType: aws.String("application/json"),
		},
	)
	if err != nil {
		return nil, fmt.Errorf("invoking model %q: %w", modelID, err)
	}

	// Decoding into a typed slice reports a malformed element as an error
	// instead of panicking in a type assertion.
	var decoded struct {
		Embedding []float64 `json:"embedding"`
	}
	if err := json.Unmarshal(response.Body, &decoded); err != nil {
		return nil, fmt.Errorf("decoding embedding response: %w", err)
	}
	if len(decoded.Embedding) == 0 {
		return nil, fmt.Errorf("model %q returned no embedding", modelID)
	}
	return decoded.Embedding, nil
}

Second, let's query OpenSearch with that vector.

// query runs a knn search (size 5, k 5) for vec on "vector_field" of the
// "demo" index through AOSSClient and returns the "_source.text" string of
// every hit, in the order the service returned them. Hits without a string
// "text" are skipped.
//
// On success the returned slice is never nil, so it encodes to a JSON array
// even when nothing matched. Failures are returned to the caller rather than
// terminating the process.
func query(vec []float64) ([]string, error) {
	const neighbours = 5

	if len(vec) == 0 {
		return nil, fmt.Errorf("query vector is empty")
	}

	// Marshal the body rather than formatting it by hand, so the vector is
	// always emitted as a valid JSON array.
	payload, err := json.Marshal(map[string]interface{}{
		"size": neighbours,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				"vector_field": map[string]interface{}{
					"vector": vec,
					"k":      neighbours,
				},
			},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("encoding search request: %w", err)
	}

	search := opensearchapi.SearchRequest{
		Index: []string{"demo"},
		Body:  strings.NewReader(string(payload)),
	}
	searchResponse, err := search.Do(context.Background(), AOSSClient)
	if err != nil {
		return nil, fmt.Errorf("searching index: %w", err)
	}
	defer searchResponse.Body.Close()
	if searchResponse.IsError() {
		return nil, fmt.Errorf("searching index: %s", searchResponse.String())
	}

	var answer AossResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&answer); err != nil {
		return nil, fmt.Errorf("decoding search response: %w", err)
	}

	// Every hit is returned, including the first one.
	hits := make([]string, 0, len(answer.Hits.Hits))
	for _, hit := range answer.Hits.Hits {
		source, ok := hit["_source"].(map[string]interface{})
		if !ok {
			continue
		}
		if text, ok := source["text"].(string); ok {
			hits = append(hits, text)
		}
	}
	return hits, nil
}

Index Vector

  • index normal text
  • index embedding vector

First, let's index a plain-text item into an OpenSearch index in AOSS.

// indexOpenSearch indexes one fixed title/text document into the "testing"
// index through AOSSClient2 and prints the response. A request error is
// printed and the function returns without a response.
func indexOpenSearch() {
	fmt.Println("call index...")
	content := strings.NewReader(`{
		"title": "Hello",
		"text": "Hello, world!"
	}`)
	index := opensearchapi.IndexRequest{
		Index: "testing",
		Body:  content,
	}
	response, err := index.Do(context.Background(), AOSSClient2)
	if err != nil {
		// There is no response to print or close when the request failed.
		fmt.Println(err)
		return
	}
	// Release the underlying connection once the response has been printed.
	defer response.Body.Close()
	fmt.Println(response)
}

Second, let's index an item together with its embedding vector into an OpenSearch index.

// indexVectorOpenSearch embeds the string "Hello, world!" with
// gobedrock.GetEmbedVector and indexes a document holding a fixed title, a
// fixed text and that vector (under "vector_field") into the "demo" index
// through AOSSClient2. Errors are printed and abort the operation.
func indexVectorOpenSearch() {
	fmt.Println("call indexVector...")

	vec, err := gobedrock.GetEmbedVector("Hello, world!", BedrockClientTest)
	if err != nil {
		// Without a vector there is nothing meaningful to index.
		fmt.Println(err)
		return
	}
	fmt.Println(vec)

	// Marshal the document so the vector is always a valid JSON array.
	document, err := json.Marshal(map[string]interface{}{
		"title":        "Hello Indexing OpenSearch",
		"text":         "Hello Indexing OpenSearch",
		"vector_field": vec,
	})
	if err != nil {
		fmt.Println(err)
		return
	}

	index := opensearchapi.IndexRequest{
		Index: "demo",
		Body:  strings.NewReader(string(document)),
	}
	response, err := index.Do(context.Background(), AOSSClient2)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer response.Body.Close()
	fmt.Println(response)
}

Application

  • create http server
  • handle request

Here is the full Go server code.

main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
opensearch "github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2"
)
// JSON shapes decoded by this program.
type (
	// EmbedResponse decodes a JSON object that carries an "embedding" array
	// of numbers.
	EmbedResponse struct {
		Embedding []float64 `json:"embedding"`
	}

	// Hits is the inner "hits" object of a search response; each element of
	// its "hits" array is kept as a generic JSON object.
	Hits struct {
		Hits []map[string]interface{} `json:"hits"`
	}

	// AossResponse is the top level of a search response; only its "hits"
	// object is decoded.
	AossResponse struct {
		Hits Hits `json:"hits"`
	}
)
const endpoint = "https://yvp6plo4ijurgy8ymhdg.us-east-1.aoss.amazonaws.com" // e.g. https://opensearch-domain.region.com or Amazon OpenSearch Serverless endpoint
var AOSSClient \*opensearch.Client
var bedrockClient \*bedrockruntime.Client
// init builds the package-level clients before main runs. It loads the AWS
// configuration for region us-east-1 from the shared-config profile "demo",
// creates a request signer for the "aoss" service, and uses it to construct
// AOSSClient. The same AWS configuration backs bedrockClient. Any failure
// ends the process through log.Fatal.
func init() {
	fmt.Println("init and create an opensearch client")

	ctx := context.Background()

	// Region and profile are fixed for this demo.
	cfg, err := config.LoadDefaultConfig(
		ctx,
		config.WithRegion("us-east-1"),
		config.WithSharedConfigProfile("demo"),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Requests sent by the OpenSearch client are signed for "aoss".
	sigV4, err := requestsigner.NewSignerWithService(cfg, "aoss")
	if err != nil {
		log.Fatal(err)
	}

	osCfg := opensearch.Config{
		Addresses: []string{endpoint},
		Signer:    sigV4,
	}
	if AOSSClient, err = opensearch.NewClient(osCfg); err != nil {
		log.Fatal(err)
	}

	bedrockClient = bedrockruntime.NewFromConfig(cfg)
}
// query runs a knn search (size 5, k 5) for vec on "vector_field" of the
// "demo" index through AOSSClient and returns the "_source.text" string of
// every hit, in the order the service returned them. Hits without a string
// "text" are skipped.
//
// On success the returned slice is never nil, so it encodes to a JSON array
// even when nothing matched. Failures are returned to the caller rather than
// terminating the process, which matters because this runs inside an HTTP
// handler.
func query(vec []float64) ([]string, error) {
	const neighbours = 5

	if len(vec) == 0 {
		return nil, fmt.Errorf("query vector is empty")
	}

	// Marshal the body rather than formatting it by hand, so the vector is
	// always emitted as a valid JSON array.
	payload, err := json.Marshal(map[string]interface{}{
		"size": neighbours,
		"query": map[string]interface{}{
			"knn": map[string]interface{}{
				"vector_field": map[string]interface{}{
					"vector": vec,
					"k":      neighbours,
				},
			},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("encoding search request: %w", err)
	}

	search := opensearchapi.SearchRequest{
		Index: []string{"demo"},
		Body:  strings.NewReader(string(payload)),
	}
	searchResponse, err := search.Do(context.Background(), AOSSClient)
	if err != nil {
		return nil, fmt.Errorf("searching index: %w", err)
	}
	defer searchResponse.Body.Close()
	if searchResponse.IsError() {
		return nil, fmt.Errorf("searching index: %s", searchResponse.String())
	}

	var answer AossResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&answer); err != nil {
		return nil, fmt.Errorf("decoding search response: %w", err)
	}

	// Every hit is returned, including the first one.
	hits := make([]string, 0, len(answer.Hits.Hits))
	for _, hit := range answer.Hits.Hits {
		source, ok := hit["_source"].(map[string]interface{})
		if !ok {
			continue
		}
		if text, ok := source["text"].(string); ok {
			hits = append(hits, text)
		}
	}
	return hits, nil
}
// getEmbedVector sends text to the Bedrock model "amazon.titan-embed-text-v1"
// through the package-level bedrockClient and returns the "embedding" array
// from the response.
//
// It returns an error when the request cannot be encoded, the invocation
// fails, the response cannot be decoded, or the response has no embedding.
func getEmbedVector(text string) ([]float64, error) {
	const modelID = "amazon.titan-embed-text-v1"

	// The text to embed travels under the "inputText" key.
	payload, err := json.Marshal(map[string]string{"inputText": text})
	if err != nil {
		return nil, fmt.Errorf("encoding embedding request: %w", err)
	}

	response, err := bedrockClient.InvokeModel(
		context.Background(),
		&bedrockruntime.InvokeModelInput{
			Body:        payload,
			ModelId:     aws.String(modelID),
			ContentType: aws.String("application/json"),
		},
	)
	if err != nil {
		return nil, fmt.Errorf("invoking model %q: %w", modelID, err)
	}

	// Decoding into a typed slice reports a malformed element as an error
	// instead of panicking in a type assertion.
	var decoded struct {
		Embedding []float64 `json:"embedding"`
	}
	if err := json.Unmarshal(response.Body, &decoded); err != nil {
		return nil, fmt.Errorf("decoding embedding response: %w", err)
	}
	if len(decoded.Embedding) == 0 {
		return nil, fmt.Errorf("model %q returned no embedding", modelID)
	}
	return decoded.Embedding, nil
}
// main1 is a console check of the search path: it embeds the string "hello",
// runs query with the resulting vector and prints each returned text with
// its position. It stops at the first error instead of querying with an
// unusable vector.
func main1() {
	fmt.Println("Hello")

	vec, err := getEmbedVector("hello")
	if err != nil {
		fmt.Println(err)
		return
	}

	answers, err := query(vec)
	if err != nil {
		fmt.Println(err)
		return
	}

	for i, answer := range answers {
		fmt.Println(i, answer)
		fmt.Println("====================================")
	}
}
// main starts an HTTP server on :3000 with four routes:
//
//	/       writes "Hello"
//	/json   writes a fixed JSON greeting
//	/form   GET serves index.html, POST greets the submitted "name"
//	/query  embeds the "query" form value, searches OpenSearch and writes
//	        the matching texts as {"Messages": [...]}
//
// The process exits through log.Fatal when the server stops.
func main() {
	mux := http.NewServeMux()

	// writeJSON encodes v as the response body and labels it as JSON.
	writeJSON := func(w http.ResponseWriter, v interface{}) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Println("encoding response:", err)
		}
	}

	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("Hello"))
	})

	mux.HandleFunc("/json", func(w http.ResponseWriter, r *http.Request) {
		writeJSON(w, struct {
			Message string `json:"Message"`
		}{Message: "Hello"})
	})

	mux.HandleFunc("/form", func(w http.ResponseWriter, r *http.Request) {
		switch r.Method {
		case http.MethodPost:
			if err := r.ParseForm(); err != nil {
				http.Error(w, "invalid form data", http.StatusBadRequest)
				return
			}
			// The name is echoed back, so declare the body as plain text to
			// keep it from ever being interpreted as markup.
			w.Header().Set("Content-Type", "text/plain; charset=utf-8")
			fmt.Fprintf(w, "Hello %s", r.FormValue("name"))
		case http.MethodGet:
			http.ServeFile(w, r, "index.html")
		default:
			w.Header().Set("Allow", "GET, POST")
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		}
	})

	mux.HandleFunc("/query", func(w http.ResponseWriter, r *http.Request) {
		question := r.FormValue("query")

		vec, err := getEmbedVector(question)
		if err != nil {
			log.Println("embedding question:", err)
			http.Error(w, "failed to embed query", http.StatusInternalServerError)
			return
		}

		answers, err := query(vec)
		if err != nil {
			log.Println("querying opensearch:", err)
			http.Error(w, "failed to query opensearch", http.StatusInternalServerError)
			return
		}

		writeJSON(w, struct {
			Messages []string `json:"Messages"`
		}{Messages: answers})
	})

	server := http.Server{
		Addr:           ":3000",
		Handler:        mux,
		ReadTimeout:    10 * time.Second,
		WriteTimeout:   10 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}
	// ListenAndServe always returns a non-nil error (for example when the
	// port is already in use); report it instead of exiting silently.
	log.Fatal(server.ListenAndServe())
}

Here is a simple frontend

index.html
<!DOCTYPE html>
<html>
  <head>
    <title>Query OpenSearch</title>
    <style>
      body {
        background-color: antiquewhite;
      }
      .button-submit {
        background-color: orange;
        padding: 5px 20px;
        border-radius: 5px;
        border: none;
        cursor: pointer;
        position: absolute;
        transform: translateY(-50%);
        top: 50%;
        right: 10px;
      }
      .input-query {
        padding: 10px 15px;
        width: 100%;
        /* Keep the padding inside the 100% width so the input does not overflow. */
        box-sizing: border-box;
      }
      .container-input {
        position: relative;
      }
    </style>
  </head>
  <body>
    <div>
      <h1>Query OpenSearch</h1>
      <form id="form">
        <div class="container-input">
          <input type="text" id="query" name="query" class="input-query" />
          <button class="button-submit">Submit</button>
        </div>
      </form>
      <!-- Search results are rendered into this container. -->
      <div id="list"></div>
    </div>
    <!-- The script lives inside <body>, after the elements it looks up. -->
    <script>
      console.log('hello')

      // Send the typed question to /query and render each returned text.
      const query = async () => {
        const question = document.getElementById('query').value
        // The question travels as the "query" URL parameter of a GET request.
        const response = await fetch('/query?query=' + encodeURIComponent(question))
        if (!response.ok) {
          console.error('query failed with status', response.status)
          return
        }
        const json = await response.json()
        console.log(json)

        // Messages may be null when the server has nothing to return.
        const items = json.Messages || []
        const listContainer = document.getElementById('list')
        // Remove the results of the previous search before adding new ones.
        listContainer.innerHTML = ''

        items.forEach((text, i) => {
          const listItem = document.createElement('div')
          listItem.style.marginBottom = '15px'
          listItem.style.borderBottom = '1px solid #0000FF'
          const header = document.createElement('h4')
          header.textContent = `Chunk ${i}`
          // A text node keeps the returned text from being parsed as HTML.
          const itemText = document.createTextNode(text)
          listItem.appendChild(header)
          listItem.appendChild(itemText)
          listContainer.appendChild(listItem)
        })
      }

      document.getElementById('form').addEventListener('submit', async event => {
        // Stay on the page; the request is sent with fetch instead.
        event.preventDefault()
        console.log('submit form')
        await query()
      })
    </script>
  </body>
</html>

Here is the AOSS test code.

test-aoss-index.go
// haimtran 14/04/2024
// index and query opensearch
package main
import (
"context"
"encoding/json"
gobedrock "entest/gobedrock/bedrock"
"fmt"
"strings"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
opensearch "github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2"
)
// Clients shared by the functions in this file; both are assigned in init.
var (
	// AOSSClient2 is the OpenSearch client.
	AOSSClient2 *opensearch.Client
	// BedrockClientTest is the Bedrock runtime client.
	BedrockClientTest *bedrockruntime.Client
)
// init builds AOSSClient2 and BedrockClientTest before main runs. It loads
// the default AWS configuration for region us-east-1, creates a request
// signer for the "aoss" service and constructs the OpenSearch client for
// gobedrock.AOSS_ENDPOINT.
//
// A setup failure panics: every function in this file dereferences these
// clients, so continuing with a half-built client would only fail later and
// less clearly.
func init() {
	fmt.Println("create aoss client")

	awsCfg, err := config.LoadDefaultConfig(context.Background(),
		config.WithRegion("us-east-1"),
	)
	if err != nil {
		panic(fmt.Errorf("loading aws config: %w", err))
	}

	signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss")
	if err != nil {
		panic(fmt.Errorf("creating aoss request signer: %w", err))
	}

	AOSSClient2, err = opensearch.NewClient(opensearch.Config{
		Addresses: []string{gobedrock.AOSS_ENDPOINT},
		Signer:    signer,
	})
	if err != nil {
		panic(fmt.Errorf("creating opensearch client: %w", err))
	}
	fmt.Println(AOSSClient2)

	BedrockClientTest = bedrockruntime.NewFromConfig(awsCfg)
}
// getDocumentById fetches the document with the given id from indexName
// through AOSSClient2 and prints the response. A request error is printed
// and the function returns without a response.
func getDocumentById(id string, indexName string) {
	// Printf, not Println: the message contains a %s verb.
	fmt.Printf("call getDocumentById...%s\n", id)

	request := opensearchapi.GetRequest{
		Index:      indexName,
		DocumentID: id,
	}
	response, err := request.Do(context.Background(), AOSSClient2)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the underlying connection once the response has been printed.
	defer response.Body.Close()
	fmt.Println(response)
}
// indexVectorOpenSearch embeds the string "Hello, world!" with
// gobedrock.GetEmbedVector and indexes a document holding a fixed title, a
// fixed text and that vector (under "vector_field") into the "demo" index
// through AOSSClient2. Errors are printed and abort the operation.
func indexVectorOpenSearch() {
	fmt.Println("call indexVector...")

	vec, err := gobedrock.GetEmbedVector("Hello, world!", BedrockClientTest)
	if err != nil {
		// Without a vector there is nothing meaningful to index.
		fmt.Println(err)
		return
	}
	fmt.Println(vec)

	// Marshal the document so the vector is always a valid JSON array.
	document, err := json.Marshal(map[string]interface{}{
		"title":        "Hello Indexing OpenSearch",
		"text":         "Hello Indexing OpenSearch",
		"vector_field": vec,
	})
	if err != nil {
		fmt.Println(err)
		return
	}

	index := opensearchapi.IndexRequest{
		Index: "demo",
		Body:  strings.NewReader(string(document)),
	}
	response, err := index.Do(context.Background(), AOSSClient2)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer response.Body.Close()
	fmt.Println(response)
}
// indexOpenSearch indexes one fixed title/text document into the "testing"
// index through AOSSClient2 and prints the response. A request error is
// printed and the function returns without a response.
func indexOpenSearch() {
	fmt.Println("call index...")
	content := strings.NewReader(`{
		"title": "Hello",
		"text": "Hello, world!"
	}`)
	index := opensearchapi.IndexRequest{
		Index: "testing",
		Body:  content,
	}
	response, err := index.Do(context.Background(), AOSSClient2)
	if err != nil {
		// There is no response to print or close when the request failed.
		fmt.Println(err)
		return
	}
	// Release the underlying connection once the response has been printed.
	defer response.Body.Close()
	fmt.Println(response)
}
// queryOpenSearch runs a match_all search (size 10) against the "demo" index
// through AOSSClient2 and prints the "_id" and "_source.text" of every hit,
// followed by a separator line. Errors are printed and abort the operation.
func queryOpenSearch() {
	// Only the nested hits array is decoded; each hit stays a generic object.
	type searchResult struct {
		Hits struct {
			Hits []map[string]interface{} `json:"hits"`
		} `json:"hits"`
	}

	fmt.Println("call query...")
	content := strings.NewReader(`{
		"size": 10,
		"query": {
			"match_all": {}
		}
	}`)
	search := opensearchapi.SearchRequest{
		Index: []string{"demo"},
		Body:  content,
	}
	response, err := search.Do(context.Background(), AOSSClient2)
	if err != nil {
		// response is unusable here; reading its body would panic.
		fmt.Println(err)
		return
	}
	defer response.Body.Close()

	var result searchResult
	if err := json.NewDecoder(response.Body).Decode(&result); err != nil {
		fmt.Println(err)
		return
	}

	for _, hit := range result.Hits.Hits {
		fmt.Println(hit["_id"])
		// A failed assertion leaves source nil; reading a nil map is safe
		// and prints <nil> rather than panicking.
		source, _ := hit["_source"].(map[string]interface{})
		fmt.Println(source["text"])
		fmt.Println("=================================================")
	}
}
// queryOpenSearchByTitle runs a multi_match search (size 10) for title on the
// "title" field of the "demo" index through AOSSClient2 and prints the "_id"
// and "_source.text" of every hit, followed by a separator line. Errors are
// printed and abort the operation.
func queryOpenSearchByTitle(title string) {
	// Only the nested hits array is decoded; each hit stays a generic object.
	type searchResult struct {
		Hits struct {
			Hits []map[string]interface{} `json:"hits"`
		} `json:"hits"`
	}

	fmt.Println("call query...")

	// Marshal the body so the caller's title is escaped correctly.
	payload, err := json.Marshal(map[string]interface{}{
		"size": 10,
		"query": map[string]interface{}{
			"multi_match": map[string]interface{}{
				"query":  title,
				"fields": []string{"title"},
			},
		},
	})
	if err != nil {
		fmt.Println(err)
		return
	}

	search := opensearchapi.SearchRequest{
		Index: []string{"demo"},
		Body:  strings.NewReader(string(payload)),
	}
	response, err := search.Do(context.Background(), AOSSClient2)
	if err != nil {
		// response is unusable here; reading its body would panic.
		fmt.Println(err)
		return
	}
	defer response.Body.Close()

	var result searchResult
	if err := json.NewDecoder(response.Body).Decode(&result); err != nil {
		fmt.Println(err)
		return
	}

	for _, hit := range result.Hits.Hits {
		fmt.Println(hit["_id"])
		// A failed assertion leaves source nil; reading a nil map is safe
		// and prints <nil> rather than panicking.
		source, _ := hit["_source"].(map[string]interface{})
		fmt.Println(source["text"])
		fmt.Println("=================================================")
	}
}
// main is the manual entry point for this file. Every call below is
// commented out, so main itself does nothing; uncomment the call you want
// to run.
func main() {
// queryOpenSearch()
// queryOpenSearchByTitle("Hello")
// indexOpenSearch()
// NOTE(review): the next call passes one argument, but getDocumentById takes
// (id, indexName); add the index name before uncommenting it.
// getDocumentById("1%3A0%3AgPUP344BM77PrxQVBumZ")
// getDocumentById("1%3A0%3AgfUT5o4BM77PrxQVqOmc", "demo")
// indexVectorOpenSearch()
}

Reference

  • go client