Introduction#

  • setup aws opensearch
  • interact via curl
  • using python sdk
  • basic dsl language

Architecture#

  • Opensearch cluster
  • Lambda to index and query
  • Optional API Gateway or DynamoDB stream
alb and aurora

OpenSearch Cluster#

Let's create a standard OpenSearch cluster whose dashboard is accessible with a username and password

alb and aurora

Configure permission from dashboard

alb and aurora

Upload Data#

opensearch domain

# Search endpoint of the index; the query examples reuse $DOMAIN as-is.
export DOMAIN=https://$DOMAIN/$INDEX/_search
# Index one document. Documents are written through the _doc endpoint
# (POST lets OpenSearch generate the id); _search only accepts queries,
# so the /_search suffix is stripped from $DOMAIN for this call.
curl -XPOST -u $USER:$PASS "${DOMAIN%/_search}/_doc" \
-H 'Content-Type: application/json' \
-d '{
"author": "hai",
"title":"hello minh tran",
"year": "2022"}'

Match a field: OpenSearch analyzes the query text and scores every match. A full match gets the highest score, but a title that contains any one of the query words is still a match.

# Full-text match of "ebs volume pricing" against the DocumentTitle field.
curl --request GET --user $USER:$PASS $DOMAIN \
--header 'Content-Type: application/json' \
--data '{
"query": {
"match": {
"DocumentTitle": "ebs volume pricing"
}
}
}'
# Full-text match of "Hello" against the title field.
curl --request GET --user $USER:$PASS $DOMAIN \
--header 'Content-Type: application/json' \
--data '{
"query": {
"match": {
"title": "Hello"
}
}
}'

query string

# query_string search for "ebs"; no field is named in the query.
curl --request GET --user $USER:$PASS $DOMAIN \
--header 'Content-Type: application/json' \
--data '{
"query": {
"query_string": {
"query": "ebs"
}
}
}'

Python SDK#

python3 -m pip install opensearch-py

create a client

import json
import uuid
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
# Host name of the OpenSearch domain (host only, no scheme or path).
OPENSEARCH_DOMAIN = "search-xxx.us-east-1.es.amazonaws.com"

# Requests are signed with the credentials boto3 resolves for this session.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region="us-east-1")

# HTTPS client for the domain, certificates verified.
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_DOMAIN, "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

upload to opensearch

def update_search_data(
    source_path: str = "./search-data.json",
    target_path: str = "./search-data-update.json",
) -> None:
    """Assign a fresh UUID to every item of the search data set.

    Reads a JSON array of objects from ``source_path``, sets each item's
    ``"DocumentId"`` to a new random UUID4 string (replacing any existing
    value) and writes the result, indented by two spaces, to ``target_path``.
    With the default paths the source file is not modified.
    """
    with open(source_path, "r", encoding="utf-8") as file:
        data = json.load(file)
    # every item simply receives its own id; no positional index is needed
    for item in data:
        item["DocumentId"] = str(uuid.uuid4())
    # the updated data goes to a separate file, not over the input
    with open(target_path, "w", encoding="utf-8") as file:
        file.write(json.dumps(data, indent=2))

create opensearch index

def create_os_index(index_name):
    """Create the index ``index_name`` with ``number_of_shards`` set to 1.

    The response of the create call is printed and returned.
    """
    index_body = {"settings": {"index": {"number_of_shards": 1}}}
    resp = client.indices.create(index_name, index_body)
    print(resp)
    return resp

delete opensearch index

def delete_os_index(index_name):
    """Delete the index ``index_name``; print and return the response."""
    outcome = client.indices.delete(index=index_name)
    print(outcome)
    return outcome

upload to opensearch

def upload_data_to_os(index_name):
    """Index every item of ``./search-data.json`` into ``index_name``.

    Each item contributes the text of its title and excerpt plus its URI;
    the item's ``DocumentId`` becomes the document id. The response of every
    index call is printed.
    """
    with open("./search-data.json", "r", encoding="utf-8") as file:
        entries = json.load(file)

    for entry in entries:
        # flatten the nested {"Text": ...} fields into plain strings
        payload = {
            "DocumentTitle": entry["DocumentTitle"]["Text"],
            "DocumentExcerpt": entry["DocumentExcerpt"]["Text"],
            "DocumentURI": entry["DocumentURI"],
        }
        resp = client.index(index=index_name, id=entry["DocumentId"], body=payload)
        print(resp)

search

def search_index(index_name):
    """Run a sample keyword search for ``credit`` against ``index_name``.

    The term is looked up in the title, excerpt and URI fields with a
    ``multi_match`` query. The response is printed and returned.
    """
    searched_fields = ["DocumentTitle", "DocumentExcerpt", "DocumentURI"]
    multi_match = {"query": "credit", "fields": searched_fields}
    resp = client.search(
        index=index_name,
        body={"query": {"multi_match": multi_match}},
    )
    print(resp)
    return resp

Lambda Function#

  • lambda to index oss
  • lambda to query oss
  • deploy using cdk
lambda_index_os.py
"""
haimtran 05/11/2022
lambda opensearch
"""
import json
import os
import boto3
from opensearchpy import (
OpenSearch,
RequestsHttpConnection,
AWSV4SignerAuth,
)
# Host of the OpenSearch domain, read from the environment.
OPENSEARCH_DOMAIN = os.environ["OPENSEARCH_DOMAIN"]
# Index the handler writes documents to.
INDEX = "cdk-entest"

# Requests are signed for the region named by the REGION variable, with the
# credentials boto3 resolves for this session.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region=os.environ["REGION"])

# HTTPS client for the domain, certificates verified.
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_DOMAIN, "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
def handler(event, context):
    """Index the documents carried by a batch of DynamoDB stream records.

    Every record whose ``eventName`` is not ``REMOVE`` is written to
    ``INDEX`` under the ``id`` string key of the record, using the
    ``DocumentTitle``, ``DocumentExcerpt`` and ``DocumentURI`` string
    attributes of its ``NewImage``. ``REMOVE`` records are skipped.

    Returns a response dict with status 200, CORS headers and a body holding
    the JSON-encoded result of the last index call, or ``null`` when no
    record was indexed.
    """
    # stays None when the batch is empty or contains only REMOVE events, so
    # building the response never touches an unbound name
    resp = None
    for record in event["Records"]:
        if record["eventName"] == "REMOVE":
            # removals are not propagated to the index
            continue
        document_id = record["dynamodb"]["Keys"]["id"]["S"]
        item = record["dynamodb"]["NewImage"]
        document = {
            "DocumentTitle": item["DocumentTitle"]["S"],
            "DocumentExcerpt": item["DocumentExcerpt"]["S"],
            "DocumentURI": item["DocumentURI"]["S"],
        }
        resp = client.index(index=INDEX, id=document_id, body=document)
        print(resp)
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps(resp),
    }

And lambda to query oss

lambda_query_os.py
"""
haimtran 05/11/2022
lambda opensearch
"""
import json
import os
import boto3
from opensearchpy import (
OpenSearch,
RequestsHttpConnection,
AWSV4SignerAuth,
)
# Host of the OpenSearch domain, read from the environment.
OPENSEARCH_DOMAIN = os.environ["OPENSEARCH_DOMAIN"]

# Requests are signed for the region named by the REGION variable, with the
# credentials boto3 resolves for this session.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region=os.environ["REGION"])

# HTTPS client for the domain, certificates verified.
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_DOMAIN, "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
def handler(event, context):
    """Search an index for the text given in the request's query string.

    ``query`` and ``index`` are read from ``event["queryStringParameters"]``.
    Each one falls back to its own default (``"ebs"`` and ``"cdk-entest"``)
    when it is absent, so a request that omits only ``index`` still searches
    for the caller's ``query``. The text is matched against the title,
    excerpt and URI fields with a ``multi_match`` query.

    Returns a response dict with status 200, CORS headers and the
    JSON-encoded search response as body.
    """
    # Only a missing or unusable parameter mapping triggers the defaults;
    # any other error is allowed to surface instead of being swallowed.
    try:
        params = event["queryStringParameters"] or {}
    except (KeyError, TypeError):
        params = {}
    query_request = params.get("query", "ebs")
    index = params.get("index", "cdk-entest")

    query = {
        "query": {
            "multi_match": {
                "query": query_request,
                "fields": [
                    "DocumentTitle",
                    "DocumentExcerpt",
                    "DocumentURI",
                ],
            }
        }
    }
    resp = client.search(index=index, body=query)
    print(resp)
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps(resp),
    }
if __name__ == "__main__":
    # manual run: call the handler once with a sample event and show the result
    sample_event = {
        "queryStringParameters": {
            "query": "ebs",
            "index": "cdk-entest",
        }
    }
    resp_test = handler(event=sample_event, context=None)
    print(resp_test)

Finally, here is a stack that deploys the lambda functions with their dependencies installed locally

lambda-stack.ts
import * as cdk from 'aws-cdk-lib'
import { Duration, Stack, StackProps } from 'aws-cdk-lib'
import { Construct } from 'constructs'
import * as path from 'path'
import * as fs from 'fs'
import { Effect } from 'aws-cdk-lib/aws-iam'
// Props of OpensearchLambdaStack.
interface OpenSearchLambdaProps extends StackProps {
  // handed to the lambda as OPENSEARCH_DOMAIN
  opensearchDomain: string
}

/**
 * Deploys the query lambda (lambda_query_os.handler) and exposes it as
 * GET /cdk-entest on a new REST API, which is published as `apigw`.
 */
export class OpensearchLambdaStack extends cdk.Stack {
  public readonly apigw: cdk.aws_apigateway.RestApi

  constructor(scope: Construct, id: string, props: OpenSearchLambdaProps) {
    super(scope, id, props)

    // directory shipped as the function code
    const lambdaAssetDir = path.join(__dirname, './../lambda')

    const queryFunction = new cdk.aws_lambda.Function(
      this,
      'TestOpenSearchLambda',
      {
        functionName: 'TestOpenSearchLambda',
        runtime: cdk.aws_lambda.Runtime.PYTHON_3_9,
        handler: 'lambda_query_os.handler',
        code: cdk.aws_lambda.Code.fromAsset(lambdaAssetDir),
        memorySize: 512,
        timeout: Duration.seconds(10),
        environment: {
          OPENSEARCH_DOMAIN: props.opensearchDomain,
          PYTHONPATH: '/var/task/package',
          REGION: this.region
        }
      }
    )

    this.apigw = new cdk.aws_apigateway.RestApi(this, 'OpenSearchApiGwLambda', {
      restApiName: 'OpenSearchApiGwLambda'
    })

    // GET /cdk-entest -> query lambda
    this.apigw.root
      .addResource('cdk-entest')
      .addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(queryFunction))
  }
}
// Props of DDBStreamStack.
interface DDBStreamProps extends StackProps {
  // handed to the lambda as OPENSEARCH_DOMAIN
  openSearchDomain: string
  // ARN of the existing DynamoDB table
  tableArn: string
  // ARN of that table's stream, used as the lambda event source
  tableStreamArn: string
}

/**
 * Connects the stream of an existing DynamoDB table to the indexing lambda
 * (lambda_index_os.handler).
 */
export class DDBStreamStack extends Stack {
  constructor(scope: Construct, id: string, props: DDBStreamProps) {
    super(scope, id, props)

    // Execution role of the indexing lambda. Access to OpenSearch itself
    // still needs to be configured from the OpenSearch side.
    const role = new cdk.aws_iam.Role(this, 'RoleForLambdaIndexOpenSearch', {
      roleName: 'RoleForLambdaIndexOpenSearch',
      assumedBy: new cdk.aws_iam.ServicePrincipal('lambda.amazonaws.com'),
      // A role supplied to the function does not receive the basic execution
      // policy CDK attaches to its default role; without it the function
      // cannot write its logs to CloudWatch.
      managedPolicies: [
        cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
          'service-role/AWSLambdaBasicExecutionRole'
        )
      ]
    })

    // lambda function that indexes the stream records
    const func = new cdk.aws_lambda.Function(this, 'LambdaIndexOpenSearch', {
      functionName: 'LambdaIndexOpenSearch',
      memorySize: 512,
      timeout: Duration.seconds(10),
      code: cdk.aws_lambda.Code.fromAsset(path.join(__dirname, './../lambda')),
      handler: 'lambda_index_os.handler',
      runtime: cdk.aws_lambda.Runtime.PYTHON_3_9,
      role: role,
      environment: {
        OPENSEARCH_DOMAIN: props.openSearchDomain,
        PYTHONPATH: '/var/task/package',
        REGION: this.region
      }
    })

    // reference to the existing table and its stream
    const table = cdk.aws_dynamodb.Table.fromTableAttributes(
      this,
      'NoteTableStream',
      {
        tableArn: props.tableArn,
        tableStreamArn: props.tableStreamArn
      }
    )

    // the stream triggers the lambda with batches of up to 5 records
    func.addEventSource(
      new cdk.aws_lambda_event_sources.DynamoEventSource(table, {
        startingPosition: cdk.aws_lambda.StartingPosition.LATEST,
        batchSize: 5,
        maxBatchingWindow: Duration.seconds(1),
        bisectBatchOnError: true,
        retryAttempts: 2,
        enabled: true
      })
    )
  }
}
// Props of ReportApiStack.
interface ReportProps extends StackProps {
  // id of the existing REST API
  apigwId: string
  // id of that API's root resource
  rootId: string
  // ARN of the bucket the lambda role is granted access to
  bucketArn: string
  // handed to the lambda as BUCKET_NAME
  bucketName: string
}

/**
 * Adds GET /report to an existing REST API, backed by a lambda whose code is
 * read from lambda/lambda_report.py and deployed inline.
 */
export class ReportApiStack extends Stack {
  constructor(scope: Construct, id: string, props: ReportProps) {
    super(scope, id, props)

    // look up existing api
    const apigw = cdk.aws_apigateway.RestApi.fromRestApiAttributes(
      this,
      'ExistingApigwLookUp',
      {
        restApiId: props.apigwId,
        rootResourceId: props.rootId
      }
    )

    // Execution role of the report lambda.
    const role = new cdk.aws_iam.Role(this, 'RoleForGenerateReportFunction', {
      roleName: 'RoleForGenerateReportFunction',
      assumedBy: new cdk.aws_iam.ServicePrincipal('lambda.amazonaws.com'),
      // A role supplied to the function does not receive the basic execution
      // policy CDK attaches to its default role; without it the function
      // cannot write its logs to CloudWatch.
      managedPolicies: [
        cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
          'service-role/AWSLambdaBasicExecutionRole'
        )
      ]
    })

    // access limited to objects under public/ in the bucket
    // NOTE(review): 's3:*' allows every S3 action on those objects - narrow
    // it once the actions lambda_report.py needs are confirmed
    role.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        effect: Effect.ALLOW,
        resources: [`${props.bucketArn}/public/*`],
        actions: ['s3:*']
      })
    )

    // lambda function to generate report
    const func = new cdk.aws_lambda.Function(this, 'GenerateReportLambda', {
      functionName: 'GenerateReportLambda',
      code: cdk.aws_lambda.Code.fromInline(
        fs.readFileSync(
          path.resolve(__dirname, './../lambda/lambda_report.py'),
          {
            encoding: 'utf-8'
          }
        )
      ),
      runtime: cdk.aws_lambda.Runtime.PYTHON_3_9,
      handler: 'index.handler',
      timeout: Duration.seconds(10),
      memorySize: 512,
      role: role,
      environment: {
        BUCKET_NAME: props.bucketName
      }
    })

    // GET /report -> report lambda
    const report = apigw.root.addResource('report')
    report.addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(func))
  }
}

OpenSearch Serverless#

Let's experiment with OpenSearch Serverless (AOSS)

  • Create collection
  • Python client

Let's create a lambda function for indexing. The client is created as follows

# Collection endpoint host and signing parameters for OpenSearch Serverless.
host = os.environ["OPENSEARCH_DOMAIN"]
service = "aoss"
region = os.environ["REGION"]
credentials = boto3.Session().get_credentials()
# SigV4 auth for the "aoss" service, built from the session credentials
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)
# `client` is bound once, to the OpenSearch client every request goes
# through; no boto3 service client is created under the same name first.
client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)
lambda-index.py
# haimtran 07 DEC 2022
# opensearch serverless
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
import json
import os
#
# Index the handler writes documents to.
INDEX = 'cdk-entest'

# Placeholder settings when OPENSEARCH_DOMAIN is not provided by the
# environment (e.g. a local run).
if "OPENSEARCH_DOMAIN" not in os.environ:
    os.environ["OPENSEARCH_DOMAIN"] = ""
    os.environ["REGION"] = "us-east-1"

# Collection endpoint host and signing parameters for OpenSearch Serverless.
host = os.environ["OPENSEARCH_DOMAIN"]
service = "aoss"
region = os.environ["REGION"]
credentials = boto3.Session().get_credentials()
# SigV4 auth for the "aoss" service, built from the session credentials
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)
# `client` is bound once, to the OpenSearch client every request goes
# through; no boto3 service client is created under the same name first.
client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)
def handler(event, context):
    """Index the documents carried by a batch of DynamoDB stream records.

    Every record whose ``eventName`` is not ``REMOVE`` is written to
    ``INDEX`` under the ``id`` string key of the record, using the
    ``DocumentTitle``, ``DocumentExcerpt`` and ``DocumentURI`` string
    attributes of its ``NewImage``. ``REMOVE`` records are skipped.

    Returns a response dict with status 200, CORS headers and a body holding
    the JSON-encoded result of the last index call, or ``null`` when no
    record was indexed.
    """
    # stays None when the batch is empty or contains only REMOVE events, so
    # building the response never touches an unbound name
    resp = None
    for record in event["Records"]:
        if record["eventName"] == "REMOVE":
            # removals are not propagated to the index
            continue
        document_id = record["dynamodb"]["Keys"]["id"]["S"]
        item = record["dynamodb"]["NewImage"]
        document = {
            "DocumentTitle": item["DocumentTitle"]["S"],
            "DocumentExcerpt": item["DocumentExcerpt"]["S"],
            "DocumentURI": item["DocumentURI"]["S"],
        }
        resp = client.index(index=INDEX, id=document_id, body=document)
        print(resp)
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps(resp),
    }

Similarly, let's create a lambda function to query

lambda-query.py
# haimtran 07 DEC 2022
# opensearch serverless
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
import json
import os
# Placeholder settings for testing, used when OPENSEARCH_DOMAIN is not
# provided by the environment.
if "OPENSEARCH_DOMAIN" not in os.environ:
    os.environ["OPENSEARCH_DOMAIN"] = ""
    os.environ["REGION"] = "us-east-1"

# Collection endpoint host and signing parameters for OpenSearch Serverless.
host = os.environ["OPENSEARCH_DOMAIN"]
service = "aoss"
region = os.environ["REGION"]
credentials = boto3.Session().get_credentials()
# SigV4 auth for the "aoss" service, built from the session credentials
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)
# `client` is bound once, to the OpenSearch client every request goes
# through; no boto3 service client is created under the same name first.
client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)
def handler(event, context):
    """Search an index for the text given in the request's query string.

    ``query`` and ``index`` are read from ``event["queryStringParameters"]``.
    Each one falls back to its own default (``"ebs"`` and ``"cdk-entest"``)
    when it is absent, so a request that omits only ``index`` still searches
    for the caller's ``query``. The text is sent as a ``query_string`` query.

    Returns a response dict with status 200, CORS headers and the
    JSON-encoded search response as body.
    """
    # Only a missing or unusable parameter mapping triggers the defaults;
    # any other error is allowed to surface instead of being swallowed.
    try:
        params = event["queryStringParameters"] or {}
    except (KeyError, TypeError):
        params = {}
    query_request = params.get("query", "ebs")
    index = params.get("index", "cdk-entest")

    query = {"query": {"query_string": {"query": query_request}}}
    resp = client.search(index=index, body=query)
    print(resp)
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps(resp),
    }
if __name__ == "__main__":
    # manual run: call the handler once with a sample event
    sample_event = {
        "queryStringParameters": {
            "query": "ebs volume pricing",
            "index": "cdk-entest",
        }
    }
    handler(event=sample_event, context=None)

To deploy the lambdas with CDK, we need a requirements.txt and a CDK stack

requirements.txt
boto3==1.26.25
boto3-stubs==1.24.26
botocore==1.29.25
botocore-stubs==1.27.42.post1
requests-aws4auth==1.1.2
opensearch-py==2.0.0

And the cdk stack

lambda-stack.ts
import * as cdk from 'aws-cdk-lib'
import { Duration, Stack, StackProps } from 'aws-cdk-lib'
import { Construct } from 'constructs'
import * as path from 'path'
// Props of OpensearchNoteStack.
interface OpensearchNoteProps extends StackProps {
  // handed to the lambda as OPENSEARCH_DOMAIN
  opensearchDomain: string
}

/**
 * Deploys the query lambda from the container image built out of
 * lambda-query-oss and exposes it as GET /cdk-entest on a new REST API.
 */
export class OpensearchNoteStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: OpensearchNoteProps) {
    super(scope, id, props)

    // Execution role of the query lambda.
    // NOTE(review): no OpenSearch permission is attached in this stack -
    // presumably granted elsewhere; confirm.
    const role = new cdk.aws_iam.Role(this, 'RoleForLambdaQueryOpenSearch', {
      roleName: 'RoleForLambdaQueryOpenSearch',
      assumedBy: new cdk.aws_iam.ServicePrincipal('lambda.amazonaws.com'),
      // A role supplied to the function does not receive the basic execution
      // policy CDK attaches to its default role; without it the function
      // cannot write its logs to CloudWatch.
      managedPolicies: [
        cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
          'service-role/AWSLambdaBasicExecutionRole'
        )
      ]
    })

    // lambda function to query opensearch
    const func = new cdk.aws_lambda.Function(this, 'LamdaQueryOpenSearch', {
      functionName: 'LamdaQueryOpenSearch',
      memorySize: 512,
      timeout: Duration.seconds(10),
      code: cdk.aws_lambda.EcrImageCode.fromAssetImage(
        path.join(__dirname, './../lambda-query-oss')
      ),
      // handler and runtime are taken from the image
      handler: cdk.aws_lambda.Handler.FROM_IMAGE,
      runtime: cdk.aws_lambda.Runtime.FROM_IMAGE,
      environment: {
        OPENSEARCH_DOMAIN: props.opensearchDomain,
        PYTHONPATH: '/var/task/package',
        REGION: this.region
      },
      role: role
    })

    // api gateway: GET /cdk-entest -> query lambda
    const apigw = new cdk.aws_apigateway.RestApi(this, 'OpenSearchApi', {
      restApiName: 'opensearch'
    })
    const resource = apigw.root.addResource('cdk-entest')
    resource.addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(func))
  }
}
// Props of DDBStreamStack.
interface DDBStreamProps extends StackProps {
  // handed to the lambda as OPENSEARCH_DOMAIN
  openSearchDomain: string
  // ARN of the existing DynamoDB table
  tableArn: string
  // ARN of that table's stream, used as the lambda event source
  tableStreamArn: string
}

/**
 * Connects the stream of an existing DynamoDB table to the indexing lambda,
 * deployed from the container image built out of lambda-index-oss.
 */
export class DDBStreamStack extends Stack {
  constructor(scope: Construct, id: string, props: DDBStreamProps) {
    super(scope, id, props)

    // Execution role of the indexing lambda.
    // NOTE(review): no OpenSearch permission is attached in this stack -
    // presumably granted elsewhere; confirm.
    const role = new cdk.aws_iam.Role(this, 'RoleForLambdaIndexOpenSearch', {
      roleName: 'RoleForLambdaIndexOpenSearch',
      assumedBy: new cdk.aws_iam.ServicePrincipal('lambda.amazonaws.com'),
      // A role supplied to the function does not receive the basic execution
      // policy CDK attaches to its default role; without it the function
      // cannot write its logs to CloudWatch.
      managedPolicies: [
        cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
          'service-role/AWSLambdaBasicExecutionRole'
        )
      ]
    })

    // lambda function that indexes the stream records
    const func = new cdk.aws_lambda.Function(this, 'LambdaIndexOpenSearch', {
      functionName: 'LambdaIndexOpenSearch',
      memorySize: 512,
      timeout: Duration.seconds(10),
      code: cdk.aws_lambda.EcrImageCode.fromAssetImage(
        path.join(__dirname, './../lambda-index-oss')
      ),
      // handler and runtime are taken from the image
      handler: cdk.aws_lambda.Handler.FROM_IMAGE,
      runtime: cdk.aws_lambda.Runtime.FROM_IMAGE,
      environment: {
        OPENSEARCH_DOMAIN: props.openSearchDomain,
        PYTHONPATH: '/var/task/package',
        REGION: this.region
      },
      role: role
    })

    // reference to the existing table and its stream
    const table = cdk.aws_dynamodb.Table.fromTableAttributes(
      this,
      'NoteTableStream',
      {
        tableArn: props.tableArn,
        tableStreamArn: props.tableStreamArn
      }
    )

    // the stream triggers the lambda with batches of up to 5 records
    func.addEventSource(
      new cdk.aws_lambda_event_sources.DynamoEventSource(table, {
        startingPosition: cdk.aws_lambda.StartingPosition.LATEST,
        batchSize: 5,
        maxBatchingWindow: Duration.seconds(1),
        bisectBatchOnError: true,
        retryAttempts: 2,
        enabled: true
      })
    )
  }
}