Setup#

Let's create a domain.

create-oass-collection

Then select the test environment, which creates a single node.

create-oass-collection

Then select the software version.

create-oass-collection

Next, let's make it publicly accessible.

create-oass-collection

Finally, set up permissions and authentication.

create-oass-collection

For a simple demo, use only fine-grained access control.

Go to the domain dashboard and update permissions

create-oass-collection

Client#

First, install the dependencies. Here is requirements.txt:

boto3==1.26.25
boto3-stubs==1.24.26
botocore==1.29.25
botocore-stubs==1.27.42.post1
requests-aws4auth==1.1.2
opensearch-py==2.0.0

Authentication (Signature Version 4)

import os

import boto3
from dotenv import load_dotenv
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection

# Load environment variables (including OSS_URL, read below) from the
# local env file before anything looks them up.
load_dotenv(".demo.env")

# Signing settings. Use service = 'aoss' for serverless.
service = 'es'
region = 'us-east-1'

# Credentials come from the default boto3 session.
credentials = boto3.Session().get_credentials()

# NOTE(review): `service` is defined above but is not passed to the signer
# here; confirm whether the installed opensearch-py version accepts a
# service argument before relying on the 'aoss' setting.
awsauth = AWSV4SignerAuth(credentials, region=region)

# Module-level client that signs every request with `awsauth` and talks to
# the endpoint over HTTPS on port 443.
client = OpenSearch(
    hosts=[{'host': os.environ["OSS_URL"], 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)

Operation#

Let's query an index.

# Run a query_string search for "Hello" against the "cdk-entest" index
# using the module-level client, then print the raw response.
search_body = {"query": {"query_string": {"query": "Hello"}}}
response = client.search(index="cdk-entest", body=search_body)
print(response)

Let's write a function to index data.

# index data
def test_index_data(host):
    """Index one sample document into "cdk-entest" and print the response.

    A new OpenSearch client is created for ``host`` on port 443, signed with
    the module-level ``awsauth``. The client is printed, then a single
    document is written under id "1" and the response is printed.

    Args:
        host: Hostname of the OpenSearch endpoint to connect to.
    """
    endpoint = {'host': host, 'port': 443}
    document = {
        "title": "Hello",
        "creator": "Larry David",
        "year": 1989,
    }

    client = OpenSearch(
        hosts=[endpoint],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=300,
    )
    print(client)

    # Writing with a fixed id targets the same document on every call.
    response = client.index(index="cdk-entest", body=document, id="1")
    print(response)

Let's write a function to query data.

# query index
def test_query_index(host):
    """Search the "cdk-entest" index for "Hello" and print the response.

    A new OpenSearch client is created for ``host`` on port 443, signed with
    the module-level ``awsauth``. The client is printed, then a query_string
    search is issued and the response is printed.

    Args:
        host: Hostname of the OpenSearch endpoint to connect to.
    """
    endpoint = {'host': host, 'port': 443}
    search_body = {
        "query": {
            "query_string": {"query": "Hello"},
        },
    }

    client = OpenSearch(
        hosts=[endpoint],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=300,
    )
    print(client)

    response = client.search(index="cdk-entest", body=search_body)
    print(response)