## Introduction
- Set up an Amazon OpenSearch Service domain
- Interact with it via curl
- Use the Python SDK (opensearch-py)
- Learn the basics of the query DSL
## Architecture
- OpenSearch cluster
- Lambda functions to index and query
- Optional API Gateway or DynamoDB stream
## OpenSearch Cluster
Let's create a standard OpenSearch cluster whose OpenSearch Dashboards are accessible with a username and password.
Then configure permissions from OpenSearch Dashboards.
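## Upload Data
OpenSearch domain (set DOMAIN to the domain endpoint host, INDEX to the index name, and USER/PASS to the Dashboards credentials):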
# Expects: DOMAIN = domain endpoint host (no scheme), INDEX = index name,
# USER/PASS = OpenSearch credentials.
# NOTE: $USER is normally preset to your OS login name; export USER=<opensearch user> first.
# Index one sample document: documents are written to /<index>/_doc/<id>;
# the _search endpoint only runs queries.
curl -XPUT -u "$USER:$PASS" "https://$DOMAIN/$INDEX/_doc/1" \
  -H 'Content-Type: application/json' \
  -d '{"author": "hai","title":"hello minh tran","year": "2022"}'
# From here on DOMAIN holds the search URL used by the queries below.
export DOMAIN="https://$DOMAIN/$INDEX/_search"
## Basic Search
Match a field: OpenSearch analyzes the query text and scores each hit. A full match gets the highest score, but a title that contains any of the query words is still a match.
# match query on DocumentTitle (analyzed, scored)
curl --request GET --user $USER:$PASS $DOMAIN \
  --header 'Content-Type: application/json' \
  --data '{"query": {"match": {"DocumentTitle": "ebs volume pricing"}}}'
# match query on title
curl --request GET --user $USER:$PASS $DOMAIN \
  --header 'Content-Type: application/json' \
  --data '{"query": {"match": {"title": "Hello"}}}'
Query string query:
# query_string query for the term "ebs"
curl --request GET --user $USER:$PASS $DOMAIN \
  --header 'Content-Type: application/json' \
  --data '{"query": {"query_string": {"query": "ebs"}}}'
## Python SDK
# opensearch-py is the client; boto3 supplies the AWS credentials used for SigV4 signing.
python3 -m pip install opensearch-py boto3
Create a client:
import json
import uuid

import boto3
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection

# Endpoint host of the OpenSearch domain (no scheme, no path).
OPENSEARCH_DOMAIN = "search-xxx.us-east-1.es.amazonaws.com"

# Requests are SigV4-signed with credentials from the default boto3 chain.
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region="us-east-1")

# Shared client used by the helper functions that follow.
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_DOMAIN, "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
Assign a unique `DocumentId` (UUID) to every item in the search data:
def update_search_data(
    src_path: str = "./search-data.json",
    dst_path: str = "./search-data-update.json",
) -> None:
    """Give every item of a JSON array a fresh random UUID ``DocumentId``.

    Reads the JSON array at *src_path*, sets each item's ``"DocumentId"`` to
    ``str(uuid.uuid4())`` (replacing any existing value), and writes the
    result, indented by 2 spaces, to *dst_path*. The source file is not
    modified.

    Args:
        src_path: JSON file containing a list of objects.
        dst_path: Output file; created or overwritten.
    """
    with open(src_path, "r", encoding="utf-8") as file:
        data = json.load(file)

    for item in data:
        item["DocumentId"] = str(uuid.uuid4())

    # Written to a separate file so the original export stays intact.
    with open(dst_path, "w", encoding="utf-8") as file:
        file.write(json.dumps(data, indent=2))
create opensearch index
def create_os_index(index_name, number_of_shards=1):
    """Create an OpenSearch index with the given number of primary shards.

    Uses the module-level ``client``. The response is printed and returned.

    Args:
        index_name: Name of the index to create.
        number_of_shards: Primary shard count; defaults to 1.

    Returns:
        The response dict from ``client.indices.create``.
    """
    resp = client.indices.create(
        index=index_name,
        body={"settings": {"index": {"number_of_shards": number_of_shards}}},
    )
    print(resp)
    return resp
delete opensearch index
def delete_os_index(index_name):
    """Delete the index named *index_name* via the module-level ``client``.

    The raw client response is printed and then returned.
    """
    response = client.indices.delete(index=index_name)
    print(response)
    return response
upload to opensearch
def upload_data_to_os(index_name, data_path="./search-data.json"):
    """Index every item of a JSON export into *index_name*, one request per item.

    Each item must provide ``DocumentTitle["Text"]``, ``DocumentExcerpt["Text"]``,
    ``DocumentURI`` and ``DocumentId``. ``DocumentId`` is used as the
    OpenSearch document id. Each response is printed.

    Args:
        index_name: Target index.
        data_path: JSON file containing the list of items. To upload the
            output of ``update_search_data`` (which adds ``DocumentId``),
            pass ``"./search-data-update.json"``.

    Raises:
        KeyError: if an item lacks one of the fields above.
    """
    with open(data_path, "r", encoding="utf-8") as file:
        items = json.load(file)

    for item in items:
        document = {
            "DocumentTitle": item["DocumentTitle"]["Text"],
            "DocumentExcerpt": item["DocumentExcerpt"]["Text"],
            "DocumentURI": item["DocumentURI"],
        }
        resp = client.index(index=index_name, id=item["DocumentId"], body=document)
        print(resp)
search
def search_index(
    index_name,
    query_text="credit",
    fields=("DocumentTitle", "DocumentExcerpt", "DocumentURI"),
):
    """Run a ``multi_match`` keyword search against *index_name*.

    Args:
        index_name: Index to search.
        query_text: Text to match; defaults to ``"credit"``.
        fields: Document fields searched by ``multi_match``.

    Returns:
        The raw response dict from ``client.search``.
    """
    query = {
        "query": {
            "multi_match": {
                "query": query_text,
                "fields": list(fields),
            }
        }
    }
    return client.search(index=index_name, body=query)
## Lambda Function
- A Lambda function to index documents into OpenSearch
- A Lambda function to query OpenSearch
- Deploy using CDK
lambda_index_os.py
"""
haimtran 05/11/2022
lambda opensearch

Lambda handler that copies DynamoDB stream records into an OpenSearch index.
"""
import json
import os

import boto3
from opensearchpy import (
    OpenSearch,
    RequestsHttpConnection,
    AWSV4SignerAuth,
)

# opensearch domain endpoint host (no scheme), provided by the CDK stack
OPENSEARCH_DOMAIN = os.environ["OPENSEARCH_DOMAIN"]
INDEX = "cdk-entest"
# SigV4 auth from the Lambda execution role's credentials
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region=os.environ["REGION"])
# created at import time so warm invocations reuse the same client
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_DOMAIN, "port": 443}],
    use_ssl=True,
    verify_certs=True,
    http_auth=auth,
    connection_class=RequestsHttpConnection,
)


def handler(event, context):
    """Index the new image of every non-REMOVE stream record into ``INDEX``.

    The DynamoDB key ``id`` (string) becomes the document id. The document is
    built from the string attributes DocumentTitle, DocumentExcerpt and
    DocumentURI of ``NewImage``. This presumably requires a stream view type
    that includes new images; confirm on the table.

    Returns:
        An API-Gateway-style response whose body is the JSON of the last
        index response, or ``null`` when the batch had nothing to index.
    """
    # Initialised so an all-REMOVE (or empty) batch does not hit an unbound name.
    resp = None
    for record in event["Records"]:
        doc_id = record["dynamodb"]["Keys"]["id"]["S"]
        if record["eventName"] == "REMOVE":
            # NOTE(review): deletions are not propagated, so removed items stay
            # searchable; consider client.delete(index=INDEX, id=doc_id).
            continue
        item = record["dynamodb"]["NewImage"]
        document = {
            "DocumentTitle": item["DocumentTitle"]["S"],
            "DocumentExcerpt": item["DocumentExcerpt"]["S"],
            "DocumentURI": item["DocumentURI"]["S"],
        }
        resp = client.index(index=INDEX, id=doc_id, body=document)
        print(resp)
    # return
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps(resp),
    }
And a Lambda function to query OpenSearch:
lambda_query_os.py
"""
haimtran 05/11/2022
lambda opensearch

Lambda handler behind API Gateway that runs a multi_match search against an
OpenSearch domain.
"""
import json
import os

import boto3
from opensearchpy import (
    OpenSearch,
    RequestsHttpConnection,
    AWSV4SignerAuth,
)

# Fallbacks used when a request omits a query-string parameter.
DEFAULT_QUERY = "ebs"
DEFAULT_INDEX = "cdk-entest"

# opensearch domain endpoint host (no scheme)
OPENSEARCH_DOMAIN = os.environ["OPENSEARCH_DOMAIN"]
# SigV4 auth from the Lambda execution role's credentials
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region=os.environ["REGION"])
# created at import time so warm invocations reuse the same client
client = OpenSearch(
    hosts=[{"host": OPENSEARCH_DOMAIN, "port": 443}],
    use_ssl=True,
    verify_certs=True,
    http_auth=auth,
    connection_class=RequestsHttpConnection,
)


def handler(event, context):
    """Search with the ``query`` and ``index`` query-string parameters.

    Each parameter is read independently. A missing one falls back to
    ``DEFAULT_QUERY`` or ``DEFAULT_INDEX``, so a request that supplies only
    ``query`` keeps its query. The search is a ``multi_match`` over
    DocumentTitle, DocumentExcerpt and DocumentURI.

    Returns:
        An API Gateway proxy response whose body is the JSON-encoded search
        response, with permissive CORS headers.
    """
    # API Gateway sends None (not {}) when the request has no query string.
    params = event.get("queryStringParameters") or {}
    query_request = params.get("query", DEFAULT_QUERY)
    # NOTE(review): the index name is caller-controlled, so any index this
    # role can read is searchable; consider an allow-list.
    index = params.get("index", DEFAULT_INDEX)

    query = {
        "query": {
            "multi_match": {
                "query": query_request,
                "fields": [
                    "DocumentTitle",
                    "DocumentExcerpt",
                    "DocumentURI",
                ],
            }
        }
    }
    resp = client.search(index=index, body=query)
    print(resp)
    # return
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps(resp),
    }


if __name__ == "__main__":
    resp_test = handler(
        event={
            "queryStringParameters": {
                "query": "ebs",
                "index": "cdk-entest",
            }
        },
        context=None,
    )
    print(resp_test)
Finally, here is a CDK stack that deploys the Lambda functions, with their Python dependencies installed locally into the `package` folder (hence `PYTHONPATH=/var/task/package`).
lambda-stack.ts
import * as cdk from 'aws-cdk-lib'
import { Duration, Stack, StackProps } from 'aws-cdk-lib'
import { Construct } from 'constructs'
import * as path from 'path'
import * as fs from 'fs'
import { Effect } from 'aws-cdk-lib/aws-iam'

// CDK attaches this managed policy only to roles it creates itself. A function
// given an explicit `role` needs it added, or it cannot write CloudWatch logs.
const LAMBDA_BASIC_EXECUTION = 'service-role/AWSLambdaBasicExecutionRole'

interface OpenSearchLambdaProps extends StackProps {
  // OpenSearch domain endpoint host, passed to the function as OPENSEARCH_DOMAIN
  opensearchDomain: string
}

/**
 * Query Lambda (lambda_query_os.handler) exposed through a REST API as GET /cdk-entest.
 */
export class OpensearchLambdaStack extends cdk.Stack {
  public readonly apigw: cdk.aws_apigateway.RestApi

  constructor(scope: Construct, id: string, props: OpenSearchLambdaProps) {
    super(scope, id, props)

    // lambda function (no explicit role, so CDK creates one with log permissions)
    const func = new cdk.aws_lambda.Function(this, 'TestOpenSearchLambda', {
      functionName: 'TestOpenSearchLambda',
      handler: 'lambda_query_os.handler',
      runtime: cdk.aws_lambda.Runtime.PYTHON_3_9,
      memorySize: 512,
      timeout: Duration.seconds(10),
      code: cdk.aws_lambda.Code.fromAsset(path.join(__dirname, './../lambda')),
      environment: {
        OPENSEARCH_DOMAIN: props.opensearchDomain,
        // dependencies are expected under lambda/package (installed locally)
        PYTHONPATH: '/var/task/package',
        REGION: this.region
      }
    })

    // apigateway
    this.apigw = new cdk.aws_apigateway.RestApi(this, 'OpenSearchApiGwLambda', {
      restApiName: 'OpenSearchApiGwLambda'
    })
    const resource = this.apigw.root.addResource('cdk-entest')
    // integrate with lambda function
    resource.addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(func))
  }
}

interface DDBStreamProps extends StackProps {
  openSearchDomain: string
  tableArn: string
  tableStreamArn: string
}

/**
 * Index Lambda (lambda_index_os.handler) triggered by an existing DynamoDB table stream.
 */
export class DDBStreamStack extends Stack {
  constructor(scope: Construct, id: string, props: DDBStreamProps) {
    super(scope, id, props)

    // role for the index lambda; OpenSearch-side access must still be
    // configured on the domain (e.g. from Dashboards)
    const role = new cdk.aws_iam.Role(this, 'RoleForLambdaIndexOpenSearch', {
      roleName: 'RoleForLambdaIndexOpenSearch',
      assumedBy: new cdk.aws_iam.ServicePrincipal('lambda.amazonaws.com'),
      managedPolicies: [cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName(LAMBDA_BASIC_EXECUTION)]
    })

    // lambda function
    const func = new cdk.aws_lambda.Function(this, 'LambdaIndexOpenSearch', {
      functionName: 'LambdaIndexOpenSearch',
      memorySize: 512,
      timeout: Duration.seconds(10),
      code: cdk.aws_lambda.Code.fromAsset(path.join(__dirname, './../lambda')),
      handler: 'lambda_index_os.handler',
      runtime: cdk.aws_lambda.Runtime.PYTHON_3_9,
      role: role,
      environment: {
        OPENSEARCH_DOMAIN: props.openSearchDomain,
        PYTHONPATH: '/var/task/package',
        REGION: this.region
      }
    })

    // existing ddb table
    const table = cdk.aws_dynamodb.Table.fromTableAttributes(this, 'NoteTableStream', {
      tableArn: props.tableArn,
      tableStreamArn: props.tableStreamArn
    })

    // configure ddb stream to trigger lambda (the event source also grants
    // the role read access to the stream)
    func.addEventSource(
      new cdk.aws_lambda_event_sources.DynamoEventSource(table, {
        startingPosition: cdk.aws_lambda.StartingPosition.LATEST,
        batchSize: 5,
        maxBatchingWindow: Duration.seconds(1),
        bisectBatchOnError: true,
        retryAttempts: 2,
        enabled: true
      })
    )
  }
}

interface ReportProps extends StackProps {
  apigwId: string
  rootId: string
  bucketArn: string
  bucketName: string
}

/**
 * Adds GET /report, backed by an inline report Lambda, to an existing REST API.
 */
export class ReportApiStack extends Stack {
  constructor(scope: Construct, id: string, props: ReportProps) {
    super(scope, id, props)

    // look up existing api
    const apigw = cdk.aws_apigateway.RestApi.fromRestApiAttributes(this, 'ExistingApigwLookUp', {
      restApiId: props.apigwId,
      rootResourceId: props.rootId
    })

    // role for the report lambda: bucket access plus CloudWatch logs
    const role = new cdk.aws_iam.Role(this, 'RoleForGenerateReportFunction', {
      roleName: 'RoleForGenerateReportFunction',
      assumedBy: new cdk.aws_iam.ServicePrincipal('lambda.amazonaws.com'),
      managedPolicies: [cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName(LAMBDA_BASIC_EXECUTION)]
    })
    // NOTE(review): grants every S3 action under public/*; narrow to the
    // actions lambda_report.py actually needs once confirmed.
    role.addToPolicy(
      new cdk.aws_iam.PolicyStatement({
        effect: Effect.ALLOW,
        resources: [`${props.bucketArn}/public/*`],
        actions: ['s3:*']
      })
    )

    // lambda function to generate report; inline source is deployed as
    // index.py, hence handler 'index.handler'
    const func = new cdk.aws_lambda.Function(this, 'GenerateReportLambda', {
      functionName: 'GenerateReportLambda',
      code: cdk.aws_lambda.Code.fromInline(
        fs.readFileSync(path.resolve(__dirname, './../lambda/lambda_report.py'), {
          encoding: 'utf-8'
        })
      ),
      runtime: cdk.aws_lambda.Runtime.PYTHON_3_9,
      handler: 'index.handler',
      timeout: Duration.seconds(10),
      memorySize: 512,
      role: role,
      environment: {
        BUCKET_NAME: props.bucketName
      }
    })

    // integrate with api gateway
    const report = apigw.root.addResource('report')
    report.addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(func))
  }
}
## OpenSearch Serverless
Let's experiment with OpenSearch Serverless (AOSS):
- Create a collection
- Python client

Let's create a Lambda function for indexing. Here is the client:
# Collection endpoint host and region, read from the Lambda environment.
host = os.environ["OPENSEARCH_DOMAIN"]
service = "aoss"  # SigV4 service name for OpenSearch Serverless
region = os.environ["REGION"]
credentials = boto3.Session().get_credentials()
# auth: SigV4 signer built from the credentials captured at this point
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)
# opensearch client
client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)
lambda-index.py
# haimtran 07 DEC 2022
# opensearch serverless
# Lambda handler that copies DynamoDB stream records into an OpenSearch
# Serverless (AOSS) collection.
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
import json
import os

# Target index; handler() needs this name at module scope.
INDEX = "cdk-entest"

# opensearch domain: placeholders when run outside Lambda (local testing)
if "OPENSEARCH_DOMAIN" not in os.environ:
    os.environ["OPENSEARCH_DOMAIN"] = ""
    os.environ["REGION"] = "us-east-1"

# host and opensearch client
host = os.environ["OPENSEARCH_DOMAIN"]
service = "aoss"  # SigV4 service name for OpenSearch Serverless
region = os.environ["REGION"]
credentials = boto3.Session().get_credentials()
# auth: SigV4 signer built from the credentials captured at import time
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)
# opensearch client
# NOTE(review): timeout=300 s exceeds the 10 s Lambda timeout used by the CDK stack.
client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)


def handler(event, context):
    """Index the new image of every non-REMOVE stream record into ``INDEX``.

    The DynamoDB key ``id`` (string) becomes the document id. The document is
    built from the string attributes DocumentTitle, DocumentExcerpt and
    DocumentURI of ``NewImage``.

    Returns:
        An API-Gateway-style response whose body is the JSON of the last
        index response, or ``null`` when the batch had nothing to index.
    """
    # Initialised so an all-REMOVE (or empty) batch does not hit an unbound name.
    resp = None
    for record in event["Records"]:
        doc_id = record["dynamodb"]["Keys"]["id"]["S"]
        if record["eventName"] == "REMOVE":
            # NOTE(review): deletions are not propagated to the collection.
            continue
        item = record["dynamodb"]["NewImage"]
        document = {
            "DocumentTitle": item["DocumentTitle"]["S"],
            "DocumentExcerpt": item["DocumentExcerpt"]["S"],
            "DocumentURI": item["DocumentURI"]["S"],
        }
        resp = client.index(index=INDEX, id=doc_id, body=document)
        print(resp)
    # return
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps(resp),
    }
Similarly, let's create a Lambda function to query:
lambda-query.py
# haimtran 07 DEC 2022
# opensearch serverless
# Lambda handler behind API Gateway that runs a query_string search against
# an OpenSearch Serverless (AOSS) collection.
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
import json
import os

# Fallbacks used when a request omits a query-string parameter.
DEFAULT_QUERY = "ebs"
DEFAULT_INDEX = "cdk-entest"

# opensearch domain: placeholders when run outside Lambda (local testing)
if "OPENSEARCH_DOMAIN" not in os.environ:
    os.environ["OPENSEARCH_DOMAIN"] = ""
    os.environ["REGION"] = "us-east-1"

# host and opensearch client
host = os.environ["OPENSEARCH_DOMAIN"]
service = "aoss"  # SigV4 service name for OpenSearch Serverless
region = os.environ["REGION"]
credentials = boto3.Session().get_credentials()
# auth: SigV4 signer built from the credentials captured at import time
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    service,
    session_token=credentials.token,
)
# opensearch client
client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)


def handler(event, context):
    """Run a ``query_string`` search from the ``query``/``index`` parameters.

    Each parameter is read independently. A missing one falls back to
    ``DEFAULT_QUERY`` or ``DEFAULT_INDEX``.

    Returns:
        An API Gateway proxy response whose body is the JSON-encoded search
        response, with permissive CORS headers.
    """
    # API Gateway sends None (not {}) when the request has no query string.
    params = event.get("queryStringParameters") or {}
    query_request = params.get("query", DEFAULT_QUERY)
    # NOTE(review): the index name is caller-controlled; consider an allow-list.
    index = params.get("index", DEFAULT_INDEX)

    query = {"query": {"query_string": {"query": query_request}}}
    resp = client.search(index=index, body=query)
    print(resp)
    # return
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps(resp),
    }


if __name__ == "__main__":
    handler(
        event={
            "queryStringParameters": {
                "query": "ebs volume pricing",
                "index": "cdk-entest",
            }
        },
        context=None,
    )
To deploy the Lambda functions with CDK, we need a requirements.txt and a CDK stack.
requirements.txt
boto3==1.26.25
boto3-stubs==1.24.26
botocore==1.29.25
botocore-stubs==1.27.42.post1
requests-aws4auth==1.1.2
opensearch-py==2.0.0
And the cdk stack
lambda-stack.ts
import * as cdk from 'aws-cdk-lib'
import { Duration, Stack, StackProps } from 'aws-cdk-lib'
import { Construct } from 'constructs'
import * as path from 'path'

// CDK attaches this managed policy only to roles it creates itself, so the
// explicit roles below add it to let the functions write CloudWatch logs.
const LAMBDA_BASIC_EXECUTION = 'service-role/AWSLambdaBasicExecutionRole'

// Creates a Lambda service role that can write logs and call the OpenSearch
// Serverless data plane. The collection's data access policy must still grant
// this role access to the index. The construct id equals the role name.
const createAossLambdaRole = (scope: Construct, roleName: string): cdk.aws_iam.Role => {
  const role = new cdk.aws_iam.Role(scope, roleName, {
    roleName: roleName,
    assumedBy: new cdk.aws_iam.ServicePrincipal('lambda.amazonaws.com'),
    managedPolicies: [cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName(LAMBDA_BASIC_EXECUTION)]
  })
  role.addToPolicy(
    new cdk.aws_iam.PolicyStatement({
      effect: cdk.aws_iam.Effect.ALLOW,
      actions: ['aoss:APIAccessAll'],
      // TODO: scope down to the collection ARN
      resources: ['*']
    })
  )
  return role
}

interface OpensearchNoteProps extends StackProps {
  // collection endpoint host, passed to the function as OPENSEARCH_DOMAIN
  opensearchDomain: string
}

/**
 * Container-image query Lambda (../lambda-query-oss) behind GET /cdk-entest.
 */
export class OpensearchNoteStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: OpensearchNoteProps) {
    super(scope, id, props)

    // role for lambda to read opensearch
    const role = createAossLambdaRole(this, 'RoleForLambdaQueryOpenSearch')

    // lambda function to query opensearch
    const func = new cdk.aws_lambda.Function(this, 'LamdaQueryOpenSearch', {
      functionName: 'LamdaQueryOpenSearch',
      memorySize: 512,
      timeout: Duration.seconds(10),
      code: cdk.aws_lambda.EcrImageCode.fromAssetImage(path.join(__dirname, './../lambda-query-oss')),
      handler: cdk.aws_lambda.Handler.FROM_IMAGE,
      runtime: cdk.aws_lambda.Runtime.FROM_IMAGE,
      environment: {
        OPENSEARCH_DOMAIN: props.opensearchDomain,
        PYTHONPATH: '/var/task/package',
        REGION: this.region
      },
      role: role
    })

    // api gateway
    const apigw = new cdk.aws_apigateway.RestApi(this, 'OpenSearchApi', {
      restApiName: 'opensearch'
    })
    const resource = apigw.root.addResource('cdk-entest')
    resource.addMethod('GET', new cdk.aws_apigateway.LambdaIntegration(func))
  }
}

interface DDBStreamProps extends StackProps {
  openSearchDomain: string
  tableArn: string
  tableStreamArn: string
}

/**
 * Container-image index Lambda (../lambda-index-oss) triggered by an existing DynamoDB stream.
 */
export class DDBStreamStack extends Stack {
  constructor(scope: Construct, id: string, props: DDBStreamProps) {
    super(scope, id, props)

    // role for lambda to write to opensearch
    const role = createAossLambdaRole(this, 'RoleForLambdaIndexOpenSearch')

    // lambda function
    const func = new cdk.aws_lambda.Function(this, 'LambdaIndexOpenSearch', {
      functionName: 'LambdaIndexOpenSearch',
      memorySize: 512,
      timeout: Duration.seconds(10),
      code: cdk.aws_lambda.EcrImageCode.fromAssetImage(path.join(__dirname, './../lambda-index-oss')),
      handler: cdk.aws_lambda.Handler.FROM_IMAGE,
      runtime: cdk.aws_lambda.Runtime.FROM_IMAGE,
      environment: {
        OPENSEARCH_DOMAIN: props.openSearchDomain,
        PYTHONPATH: '/var/task/package',
        REGION: this.region
      },
      role: role
    })

    // existing ddb table
    const table = cdk.aws_dynamodb.Table.fromTableAttributes(this, 'NoteTableStream', {
      tableArn: props.tableArn,
      tableStreamArn: props.tableStreamArn
    })

    // configure ddb stream to trigger lambda (the event source also grants
    // the role read access to the stream)
    func.addEventSource(
      new cdk.aws_lambda_event_sources.DynamoEventSource(table, {
        startingPosition: cdk.aws_lambda.StartingPosition.LATEST,
        batchSize: 5,
        maxBatchingWindow: Duration.seconds(1),
        bisectBatchOnError: true,
        retryAttempts: 2,
        enabled: true
      })
    )
  }
}