Introduction#

Build a simple GenAI app using NextJS and Amazon Bedrock

  • chat and simple prompt
  • simple vector search
  • simple rag
  • image generation
  • image analysis

Setup NextJS#

Let's create a new NextJS project

npx create-next-app@latest

Then install dependencies, here is package.json

package.json
{
  "name": "next-prisma-hello",
  "version": "0.1.0",
  "private": true,
  "scripts": {
    "dev": "next dev",
    "build": "next build",
    "start": "next start",
    "lint": "next lint"
  },
  "dependencies": {
    "@aws-sdk/client-bedrock-runtime": "^3.490.0",
    "@aws-sdk/client-s3": "^3.525.0",
    "@aws-sdk/credential-provider-node": "^3.525.0",
    "@aws-sdk/credential-providers": "^3.525.0",
    "@aws-sdk/s3-request-presigner": "^3.525.0",
    "@opensearch-project/opensearch": "^2.5.0",
    "@prisma/client": "^5.8.0",
    "ai": "^2.2.33",
    "aws-sdk": "^2.1569.0",
    "langchain": "^0.1.25",
    "next": "14.0.4",
    "react": "^18",
    "react-dom": "^18"
  },
  "devDependencies": {
    "@types/node": "^20",
    "@types/react": "^18",
    "@types/react-dom": "^18",
    "autoprefixer": "^10.0.1",
    "eslint": "^8",
    "eslint-config-next": "14.0.4",
    "postcss": "^8",
    "prisma": "^5.8.0",
    "tailwindcss": "^3.3.0",
    "typescript": "^5"
  }
}

Project structure

|--app
|--page.tsx
|--api
|--chat
|--route.ts
|--rag
|--route.js
|--image
|--page.tsx
|--rag
|--page.tsx
|--aoss
|--page.tsx
|--action.ts
|--cdk
|--bin
|--cdk.ts
|--lib
|--cdk-stack.ts
|--next.config.js
|--package.json
|--Dockerfile
|--.dockerignore
|--build.py
|--.env

Chat Function#

  • Simple conversation memory by using vercel SDK and bedrock
  • Streaming chat response
  • Frontend uses useChat react hook

Let's implement route.ts for the chat API

import {
  BedrockRuntime,
  InvokeModelWithResponseStreamCommand
} from '@aws-sdk/client-bedrock-runtime'
import { AWSBedrockAnthropicStream, StreamingTextResponse } from 'ai'
import { experimental_buildAnthropicPrompt } from 'ai/prompts'

// IMPORTANT! Set the runtime to edge if desired
// export const runtime = "edge";

// Region comes from the environment with a us-east-1 fallback so the
// route works unchanged both locally and on AppRunner (the other
// snippets in this project read the region from the environment too).
const bedrockClient = new BedrockRuntime({
  region: process.env.AWS_REGION ?? 'us-east-1'
})

/**
 * POST /api/chat -- stream a Claude completion back to the client.
 * Body: { messages } as sent by the vercel `useChat` hook.
 */
export async function POST(req: Request) {
  // Extract the conversation history from the request body.
  const { messages } = await req.json()
  // Ask Claude for a streaming chat completion given the prompt.
  const bedrockResponse = await bedrockClient.send(
    new InvokeModelWithResponseStreamCommand({
      modelId: 'anthropic.claude-v2',
      contentType: 'application/json',
      accept: 'application/json',
      body: JSON.stringify({
        prompt: experimental_buildAnthropicPrompt(messages),
        max_tokens_to_sample: 2048
      })
    })
  )
  // Convert the Bedrock event stream into a text stream for the browser.
  const stream = AWSBedrockAnthropicStream(bedrockResponse)
  return new StreamingTextResponse(stream)
}

The useChat hook and submit chat on enter key

// Wire the chat UI to /api/chat; useChat manages the message list, the
// input value and the streaming updates coming back from the server.
const { messages, input, handleInputChange, handleSubmit } = useChat({
  api: './api/chat'
})
// Submitting the form (enter key) posts the current input to the API.
;<form onSubmit={handleSubmit}>
  <input
    className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
    value={input}
    placeholder="Say something..."
    onChange={handleInputChange}
  />
</form>

Image Function#

Then let's implement the server function for generating images

  • Generate image
  • Upload to S3
  • Generate signed url
'use server'
import {
  GetObjectCommand,
  S3Client,
  PutObjectCommand
} from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import {
  BedrockRuntime,
  InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'

const bedrock = new BedrockRuntime({ region: 'us-east-1' })
const s3Client = new S3Client({ region: 'us-east-1' })

/**
 * Generate an image with the Bedrock Stable Diffusion XL model, upload
 * it to S3 and return a presigned GET url (1 hour expiry).
 * Returns an empty string when generation or upload fails.
 */
const genImage = async ({ prompt }: { prompt: string }) => {
  let url = ''
  const body = {
    text_prompts: [{ text: prompt, weight: 1 }],
    seed: 3,
    cfg_scale: 10,
    samples: 1,
    steps: 50,
    style_preset: 'anime',
    height: 1024,
    width: 1024
  }
  const command = new InvokeModelCommand({
    body: JSON.stringify(body),
    modelId: 'stability.stable-diffusion-xl-v1',
    contentType: 'application/json',
    accept: 'image/png'
  })
  try {
    // unique object key per generation
    const key = `next-vercel-ai/sample${Date.now()}.png`
    const response = await bedrock.send(command)
    // upload the generated png to the S3 input location
    await s3Client.send(
      new PutObjectCommand({
        Bucket: process.env.BUCKET,
        Key: key,
        Body: response.body
      })
    )
    // generate a presigned url so the browser can display the image
    url = await getSignedUrl(
      s3Client,
      new GetObjectCommand({ Bucket: process.env.BUCKET, Key: key }),
      { expiresIn: 3600 }
    )
  } catch (error) {
    // best effort: log and fall through to return ''
    console.log(error)
  }
  return url
}
export { genImage }

OpenSearch Permission#

[!IMPORTANT]

Please update data access policy in OpenSearch Serverless to grant principal such as Lambda ARN Role, CodeBuild Role, IAM to query and index collections.

  • AppRunner instance role
  • OpenSearch data access policy

Instance role to access AOSS opensearch

// Runtime (instance) role assumed by the AppRunner service at run time.
const instanceRole = new aws_iam.Role(this, 'InstanceRoleForApprunerBedrock', {
  assumedBy: new aws_iam.ServicePrincipal('tasks.apprunner.amazonaws.com'),
  roleName: 'InstanceRoleForApprunnerBedrock'
})
// allow invoking the three Bedrock models used by the app
instanceRole.addToPolicy(
  new aws_iam.PolicyStatement({
    effect: Effect.ALLOW,
    resources: [
      'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2',
      'arn:aws:bedrock:us-east-1::foundation-model/stability.stable-diffusion-xl-v1',
      'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'
    ],
    actions: ['bedrock:InvokeModel', 'bedrock:InvokeModelWithResponseStream']
  })
)
// allow reading and writing generated images in the app bucket
instanceRole.addToPolicy(
  new aws_iam.PolicyStatement({
    effect: Effect.ALLOW,
    resources: [`arn:aws:s3:::${props.bucket}/*`],
    actions: ['s3:PutObject', 's3:GetObject']
  })
)
// allow data-plane access to the OpenSearch Serverless collection;
// the AOSS data access policy must also list this role as a principal
instanceRole.addToPolicy(
  new aws_iam.PolicyStatement({
    effect: Effect.ALLOW,
    resources: [props.aossCollectionArn],
    actions: ['aoss:APIAccessAll']
  })
)

We also need to configure the OpenSearch data access policy

[
{
"Rules": [
{
"Resource": ["collection/demo"],
"Permission": [
"aoss:CreateCollectionItems",
"aoss:DeleteCollectionItems",
"aoss:UpdateCollectionItems",
"aoss:DescribeCollectionItems"
],
"ResourceType": "collection"
},
{
"Resource": ["index/demo/*"],
"Permission": [
"aoss:CreateIndex",
"aoss:DeleteIndex",
"aoss:UpdateIndex",
"aoss:DescribeIndex",
"aoss:ReadDocument",
"aoss:WriteDocument"
],
"ResourceType": "index"
}
],
"Principal": [
"arn:aws:iam::$ACCOUNT_ID:role/InstanceRoleForApprunnerBedrock"
],
"Description": "Rule 1"
}
]

OpenSearch Client#

  • SignatureV4
  • Index, Query
  • Bulk data

It is possible to comment out the aws-sdk v2 fallback, as shown in this script

AwsSigv4Signer.js
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
'use strict'
const Connection = require('../Connection')
const Transport = require('../Transport')
const aws4 = require('aws4')
const AwsSigv4SignerError = require('./errors')
const crypto = require('crypto')
const { toMs } = Transport.internals
// Resolve a credentials provider from the installed AWS SDK.
// Only the v3 path is active; the v2 fallback present upstream has been
// removed so the v2 SDK is not pulled into the bundle.
const getAwsSDKCredentialsProvider = async () => {
  try {
    const sdkV3 = await import('@aws-sdk/credential-provider-node')
    if (typeof sdkV3.defaultProvider === 'function') {
      return sdkV3.defaultProvider()
    }
  } catch (err) {
    // SDK v3 not installed -- fall through to the error below.
  }
  throw new AwsSigv4SignerError(
    'Unable to find a valid AWS SDK, please provide a valid getCredentials function to AwsSigv4Signer options.'
  )
}
// Default getCredentials implementation: locate an SDK credentials
// provider and invoke it, propagating any failure to the caller.
const awsDefaultCredentialsProvider = async () => {
  const provider = await getAwsSDKCredentialsProvider()
  return provider()
}
/**
 * Build the Transport/Connection pair that signs every OpenSearch
 * request with AWS SigV4.
 *
 * opts.region         (required) region to sign for
 * opts.service        signing service name, defaults to 'es'
 *                     (use 'aoss' for OpenSearch Serverless)
 * opts.getCredentials optional credentials factory; defaults to the
 *                     AWS SDK default provider chain
 */
function AwsSigv4Signer(opts = {}) {
  // Credentials are cached here and refreshed lazily by the Transport.
  const credentialsState = {
    credentials: null
  }
  if (!opts.region) {
    throw new AwsSigv4SignerError('Region cannot be empty')
  }
  if (!opts.service) {
    opts.service = 'es'
  }
  if (typeof opts.getCredentials !== 'function') {
    opts.getCredentials = awsDefaultCredentialsProvider
  }
  // Sign a request object in place with aws4 and add the
  // x-amz-content-sha256 header (required by OpenSearch Serverless).
  function buildSignedRequestObject(request = {}) {
    request.service = opts.service
    request.region = opts.region
    request.headers = request.headers || {}
    request.headers['host'] = request.hostname
    const signed = aws4.sign(request, credentialsState.credentials)
    signed.headers['x-amz-content-sha256'] = crypto
      .createHash('sha256')
      .update(request.body || '', 'utf8')
      .digest('hex')
    return signed
  }
  // Connection that signs each outgoing request with the cached credentials.
  class AwsSigv4SignerConnection extends Connection {
    buildRequestObject(params) {
      const request = super.buildRequestObject(params)
      return buildSignedRequestObject(request)
    }
  }
  // Transport that refreshes credentials before a request when needed.
  class AwsSigv4SignerTransport extends Transport {
    request(params, options = {}, callback = undefined) {
      // options is optional so if options is a function, it's the callback.
      if (typeof options === 'function') {
        callback = options
        options = {}
      }
      const currentCredentials = credentialsState.credentials
      /**
       * For AWS SDK V3
       * Make sure token will expire no earlier than `expiryBufferMs` milliseconds in the future.
       */
      const expiryBufferMs = toMs(options.requestTimeout || this.requestTimeout)
      let expired = false
      if (!currentCredentials) {
        // Credentials haven't been acquired yet.
        expired = true
      }
      // AWS SDK V2, needsRefresh should be available.
      else if (typeof currentCredentials.needsRefresh === 'function') {
        expired = currentCredentials.needsRefresh()
      }
      // AWS SDK V2, alternative to needsRefresh.
      else if (currentCredentials.expired === true) {
        expired = true
      }
      // AWS SDK V2, alternative to needsRefresh and expired.
      else if (
        currentCredentials.expireTime &&
        currentCredentials.expireTime < new Date()
      ) {
        expired = true
      }
      // AWS SDK V3, Credentials.expiration is a Date object
      else if (
        currentCredentials.expiration &&
        currentCredentials.expiration.getTime() - Date.now() < expiryBufferMs
      ) {
        expired = true
      }
      if (!expired) {
        // Fast path: credentials still valid, forward the request as-is.
        if (typeof callback === 'undefined') {
          return super.request(params, options)
        }
        super.request(params, options, callback)
        return
      }
      // In AWS SDK V2 Credentials.refreshPromise should be available.
      if (
        currentCredentials &&
        typeof currentCredentials.refreshPromise === 'function'
      ) {
        if (typeof callback === 'undefined') {
          return currentCredentials.refreshPromise().then(() => {
            return super.request(params, options)
          })
        } else {
          currentCredentials
            .refreshPromise()
            .then(() => {
              super.request(params, options, callback)
            })
            .catch(callback)
          return
        }
      }
      // For AWS SDK V3 or when the client has not acquired credentials yet.
      if (typeof callback === 'undefined') {
        return opts.getCredentials().then(credentials => {
          credentialsState.credentials = credentials
          return super.request(params, options)
        })
      } else {
        opts
          .getCredentials()
          .then(credentials => {
            credentialsState.credentials = credentials
            super.request(params, options, callback)
          })
          .catch(callback)
      }
    }
  }
  return {
    Transport: AwsSigv4SignerTransport,
    Connection: AwsSigv4SignerConnection,
    buildSignedRequestObject
  }
}
module.exports = AwsSigv4Signer

Index and Query#

Let's create a page to index and query, using server functions

index-query.tsx
'use client'
import { indexAoss, queryAoss } from './action'

/**
 * Page to index a document into OpenSearch Serverless and to run a
 * vector-search query, both through the server actions in ./action.
 */
const IndexPage = () => {
  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      <div>
        <form
          onSubmit={event => {
            event.preventDefault()
          }}
        >
          <textarea
            className="bg-gray-200 w-full p-5"
            rows={15}
            id="doc"
            name="doc"
            placeholder="document ..."
          ></textarea>
          <button
            className="px-10 py-3 rounded-sm bg-orange-400"
            onClick={async event => {
              event.preventDefault()
              // BUG FIX: index the textarea content -- the original
              // version always indexed an empty string
              const doc = (
                document.getElementById('doc') as HTMLTextAreaElement
              ).value
              await indexAoss({ doc: doc, title: '' })
            }}
          >
            Index
          </button>
        </form>
        <div className="mt-10">
          <form
            onSubmit={event => {
              event.preventDefault()
            }}
          >
            <input
              className="w-full px-5 py-3 bg-gray-200"
              placeholder="query ..."
              type="text"
              id="query"
              name="query"
            ></input>
            <button
              className="px-10 py-3 rounded-sm bg-orange-400 mt-2"
              onClick={async event => {
                event.preventDefault()
                // read the query text and show the search result below
                const query = (
                  document.getElementById('query') as HTMLInputElement
                ).value
                const result = await queryAoss({ query: query })
                document.getElementById('result')!.innerText = result
              }}
            >
              Query
            </button>
          </form>
        </div>
        <div>
          <p id="result"></p>
        </div>
      </div>
    </div>
  )
}
export default IndexPage

Here are the server functions which index and query AOSS. First, create a Bedrock client for embedding vectors and an AOSS client

'use server'
import {
  BedrockRuntimeClient,
  InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'
const { defaultProvider } = require('@aws-sdk/credential-provider-node')
const { Client } = require('@opensearch-project/opensearch')
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws')

const decoder = new TextDecoder()

// Bedrock client used to create embedding vectors.
const bedrockClient = new BedrockRuntimeClient({
  region: process.env.REGION
})

// OpenSearch Serverless client signed with SigV4 (service name 'aoss').
const aossClient = new Client({
  ...AwsSigv4Signer({
    region: process.env.REGION,
    service: 'aoss',
    getCredentials: () => {
      const credentialsProvider = defaultProvider({})
      return credentialsProvider()
    }
  }),
  // BUG FIX: env var was misspelled OASS_URL; the rag route reads AOSS_URL
  node: process.env.AOSS_URL
})

Create a function for embedding vector

// Embed a document with the Titan embedding model and return the vector.
const createEmbedVector = async ({ doc }: { doc: string }) => {
  const command = new InvokeModelCommand({
    modelId: 'amazon.titan-embed-text-v1',
    contentType: 'application/json',
    accept: 'application/json',
    body: JSON.stringify({ inputText: doc })
  })
  const response = await bedrockClient.send(command)
  const payload = JSON.parse(decoder.decode(response.body))
  return payload['embedding']
}

Create functions to query and index

// kNN search over the 'demo' index; returns the concatenated text of
// the top 2 hits.
export const queryAoss = async ({ query }: { query: string }) => {
  const vec = await createEmbedVector({ doc: query })
  const response = await aossClient.search({
    index: 'demo',
    body: {
      size: 2,
      query: {
        knn: {
          vector_field: { vector: vec, k: 2 }
        }
      }
    }
  })
  const hits = response.body.hits.hits
  const result = hits.map(hit => hit._source.text).join('')
  console.log(result)
  return result
}
// Index one document as a single chunk. OpenSearch Serverless does not
// support client-supplied document ids, so none is passed.
export const indexAoss = async ({
  doc,
  title
}: {
  doc: string
  title: string
}) => {
  const vec = await createEmbedVector({ doc })
  const response = await aossClient.index({
    index: 'demo',
    body: {
      vector_field: vec,
      text: doc,
      title: title
    }
  })
  console.log(response)
}

Simple RAG#

  • Use vercel SDK for handling streaming messages
  • Query AOSS to get context
  • Create prompt with context

Here is simple frontend page

rag.tsx
'use client'
import { Message } from 'ai'
import { useChat } from 'ai/react'

// Chat UI backed by /api/rag; useChat streams the assistant reply into
// the message list as it arrives.
export default function Chat() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: './api/rag'
  })
  // Generate a map of message role to text color
  const roleToColorMap: Record<Message['role'], string> = {
    system: 'red',
    user: 'black',
    function: 'blue',
    tool: 'purple',
    assistant: 'green',
    data: 'orange'
  }
  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      {messages.length > 0
        ? messages.map(m => (
            <div
              key={m.id}
              className="whitespace-pre-wrap"
              style={{ color: roleToColorMap[m.role] }}
            >
              <strong>{`${m.role}: `}</strong>
              {m.content || JSON.stringify(m.function_call)}
              <br />
              <br />
            </div>
          ))
        : null}
      {/* submitting the form (enter key) posts the input to the API */}
      <form onSubmit={handleSubmit}>
        <input
          className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
          value={input}
          placeholder="Say something..."
          onChange={handleInputChange}
        />
      </form>
    </div>
  )
}

Here is /api/rag which handles POST requests from the client

route.js
import {
  BedrockRuntime,
  InvokeModelCommand,
  InvokeModelWithResponseStreamCommand
} from '@aws-sdk/client-bedrock-runtime'
import { AWSBedrockAnthropicStream, StreamingTextResponse } from 'ai'
const { defaultProvider } = require('@aws-sdk/credential-provider-node')
const { Client } = require('@opensearch-project/opensearch')
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws')
// IMPORTANT! Set the runtime to edge
// export const runtime = "edge";
const decoder = new TextDecoder()
// Bedrock client used for both the embedding call and the streamed
// chat completion.
const bedrockClient = new BedrockRuntime({
  region: process.env.REGION
  // region: process.env.AWS_REGION ?? process.env.REGION,
  // credentials: {
  //   accessKeyId: process.env.AWS_ACCESS_KEY_ID ?? "",
  //   secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY ?? "",
  //   sessionToken: process.env.AWS_SESSION_TOKEN ?? "",
  // },
})
// OpenSearch Serverless client, SigV4-signed with service name 'aoss'.
const aossClient = new Client({
  ...AwsSigv4Signer({
    region: process.env.REGION,
    service: 'aoss',
    getCredentials: () => {
      const credentialsProvider = defaultProvider()
      return credentialsProvider()
    }
  }),
  node: process.env.AOSS_URL
})
// Embed a document with the Titan embedding model; returns the vector.
const createEmbedVector = async doc => {
  const response = await bedrockClient.send(
    new InvokeModelCommand({
      modelId: 'amazon.titan-embed-text-v1',
      contentType: 'application/json',
      accept: 'application/json',
      body: JSON.stringify({
        inputText: doc
      })
    })
  )
  const vec = JSON.parse(decoder.decode(response.body))['embedding']
  console.log(vec.length)
  return vec
}
// kNN search over the 'demo' index; concatenates the text of the top 2
// hits into one context string.
const queryAoss = async query => {
  const vec = await createEmbedVector(query)
  const body = {
    size: 2,
    query: {
      knn: {
        vector_field: {
          vector: vec,
          k: 2
        }
      }
    }
  }
  var response = await aossClient.search({
    index: 'demo',
    body: body
  })
  let result = ''
  for (let hit of response.body.hits.hits) {
    result += hit._source.text
  }
  return result
}
// POST /api/rag -- retrieve context from AOSS for the latest user
// message, build a RAG prompt and stream Claude's answer back.
export async function POST(req) {
  // Extract the message history from the body of the request
  const { messages } = await req.json()
  console.log(messages)
  // the question is the last message (pop mutates only the local array)
  const question = messages.pop().content
  console.log(question)
  // query opensearch for context; on failure fall back to an empty
  // context so the chat still answers
  let context = ''
  try {
    context = await queryAoss(question)
    console.log(context)
  } catch (error) {
    console.log(error)
  }
  const prompt = `\n\nHuman: Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. don't include harmful content \n\n ${context} \n\nHuman: ${question} \n\nAssistant:`
  // Ask Claude for a streaming chat completion given the prompt
  const bedrockResponse = await bedrockClient.send(
    new InvokeModelWithResponseStreamCommand({
      modelId: 'anthropic.claude-v2',
      contentType: 'application/json',
      accept: 'application/json',
      body: JSON.stringify({
        prompt: prompt,
        max_tokens_to_sample: 2048
      })
    })
  )
  // Convert the response into a friendly text-stream
  const stream = AWSBedrockAnthropicStream(bedrockResponse)
  // Respond with the stream
  return new StreamingTextResponse(stream)
}

Here is the most simple prompt with context

const prompt = `\n\nHuman: Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. don't include harmful content \n\n ${context} \n\nHuman: ${question} \n\nAssistant:`

Docker#

To build and deploy with Apprunner please take note the next.config.js

/** @type {import('next').NextConfig} */
const nextConfig = {
  // standalone output is required for the minimal Docker runtime image
  // (next build then emits .next/standalone/server.js)
  output: 'standalone'
}
module.exports = nextConfig

Then prepare a multi-stage Dockerfile to optimize the image size

Dockerfile
FROM node:18-alpine AS base
# Install dependencies only when needed
FROM base AS deps
# Check https://github.com/nodejs/docker-node/tree/b4117f9333da4138b03a546ec926ef50a31506c3#nodealpine to understand why libc6-compat might be needed.
RUN apk add --no-cache libc6-compat
WORKDIR /app
# Install dependencies based on the preferred package manager
COPY package.json yarn.lock* package-lock.json* pnpm-lock.yaml* ./
RUN \
if [ -f yarn.lock ]; then yarn --frozen-lockfile; \
elif [ -f package-lock.json ]; then npm ci; \
elif [ -f pnpm-lock.yaml ]; then corepack enable pnpm && pnpm i --frozen-lockfile; \
else echo "Lockfile not found." && exit 1; \
fi
# Rebuild the source code only when needed
FROM base AS builder
WORKDIR /app
COPY --from=deps /app/node_modules ./node_modules
COPY . .
# Next.js collects completely anonymous telemetry data about general usage.
# Learn more here: https://nextjs.org/telemetry
# Uncomment the following line in case you want to disable telemetry during the build.
# ENV NEXT_TELEMETRY_DISABLED 1
RUN \
if [ -f yarn.lock ]; then yarn run build; \
elif [ -f package-lock.json ]; then npm run build; \
elif [ -f pnpm-lock.yaml ]; then corepack enable pnpm && pnpm run build; \
else echo "Lockfile not found." && exit 1; \
fi
# Production image, copy all the files and run next
FROM base AS runner
WORKDIR /app
ENV NODE_ENV production
# Uncomment the following line in case you want to disable telemetry during runtime.
# ENV NEXT_TELEMETRY_DISABLED 1
# Create an unprivileged user to run the server
RUN addgroup --system --gid 1001 nodejs
RUN adduser --system --uid 1001 nextjs
COPY --from=builder /app/public ./public
# Set the correct permission for prerender cache
RUN mkdir .next
RUN chown nextjs:nodejs .next
# Automatically leverage output traces to reduce image size
# https://nextjs.org/docs/advanced-features/output-file-tracing
COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./
COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static
# run as the non-root user created above
USER nextjs
EXPOSE 3000
ENV PORT 3000
# bind to all interfaces so the server is reachable from outside the container
ENV HOSTNAME "0.0.0.0"
# server.js is created by next build from the standalone output
# https://nextjs.org/docs/pages/api-reference/next-config-js/output
CMD ["node", "server.js"]

And .dockerignore file

.dockerignore
node_modules
npm-debug.log
README.md
.next
.git

Docker Command#

Stop all running containers and remove images

docker stop $(docker ps -a -q)
docker rmi -f $(docker images -aq)

Delete existing images

docker rmi -f $(docker images -aq)

Build and tag image

docker build --tag entest .

Stop all processes running on port 3000

sudo kill -9 $(lsof -i:3000 -t)

Here is build script

build.py
import os
import boto3

# parameters
deploy = 1  # 1 => trigger an AppRunner deployment after pushing the image
REGION = "us-east-1"
ACCOUNT = os.environ["ACCOUNT_ID"]
# delete all docker images
os.system("sudo docker system prune -a")
# build next-bedrock image
os.system("sudo docker build -t next-bedrock . ")
# aws ecr login
os.system(f"aws ecr get-login-password --region {REGION} | sudo docker login --username AWS --password-stdin {ACCOUNT}.dkr.ecr.{REGION}.amazonaws.com")
# get image id
IMAGE_ID = os.popen("sudo docker images -q next-bedrock:latest").read()
# tag next-bedrock image
os.system(f"sudo docker tag {IMAGE_ID.strip()} {ACCOUNT}.dkr.ecr.{REGION}.amazonaws.com/next-bedrock:latest")
# create ecr repository (exit code ignored, so reruns are harmless)
os.system(f"aws ecr create-repository --registry-id {ACCOUNT} --repository-name next-bedrock")
# push image to ecr
os.system(f"sudo docker push {ACCOUNT}.dkr.ecr.{REGION}.amazonaws.com/next-bedrock:latest")
# run locally to test
# os.system(f"sudo docker run -d -p 3000:3000 next-bedrock:latest")
# apprunner deploy -- replace xxx with the real service id
if deploy == 1:
    apprunner = boto3.client('apprunner')
    apprunner.start_deployment(
        ServiceArn=f"arn:aws:apprunner:{REGION}:{ACCOUNT}:service/NextBedrockService/xxx"
    )

AppRunner#

Let's create a stack to deploy the app on AppRunner

  • Build role to pull ecr image
  • Task role to invoke bedrock model
import { Stack, StackProps, aws_apprunner, aws_iam } from 'aws-cdk-lib'
import { Effect } from 'aws-cdk-lib/aws-iam'
import { Construct } from 'constructs'

interface AppRunnerProps extends StackProps {
  // ECR image identifier to deploy
  ecr: string
  // bucket holding generated images
  bucket: string
  // ARN of the OpenSearch Serverless collection the app queries
  // BUG FIX: this property was referenced below but missing here
  aossCollectionArn: string
}

/**
 * Deploy the NextJS app on AppRunner:
 * - build role pulls the container image from ECR
 * - instance role invokes Bedrock models, reads/writes S3, queries AOSS
 */
export class AppRunnerStack extends Stack {
  constructor(scope: Construct, id: string, props: AppRunnerProps) {
    super(scope, id, props)
    // role used by AppRunner to pull the container image from ECR
    const buildRole = new aws_iam.Role(this, 'RoleForAppRunnerPullEcrBedrock', {
      assumedBy: new aws_iam.ServicePrincipal('build.apprunner.amazonaws.com'),
      roleName: 'RoleForAppRunnerPullEcrBedrock'
    })
    buildRole.addToPolicy(
      new aws_iam.PolicyStatement({
        effect: Effect.ALLOW,
        resources: ['*'],
        actions: ['ecr:*']
      })
    )
    // runtime (task) role assumed by the running service
    const instanceRole = new aws_iam.Role(
      this,
      'InstanceRoleForApprunerBedrock',
      {
        assumedBy: new aws_iam.ServicePrincipal(
          'tasks.apprunner.amazonaws.com'
        ),
        roleName: 'InstanceRoleForApprunnerBedrock'
      }
    )
    // invoke the three Bedrock models used by the app
    instanceRole.addToPolicy(
      new aws_iam.PolicyStatement({
        effect: Effect.ALLOW,
        resources: [
          'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2',
          'arn:aws:bedrock:us-east-1::foundation-model/stability.stable-diffusion-xl-v1',
          'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'
        ],
        actions: [
          'bedrock:InvokeModel',
          'bedrock:InvokeModelWithResponseStream'
        ]
      })
    )
    // read/write generated images in the bucket
    instanceRole.addToPolicy(
      new aws_iam.PolicyStatement({
        effect: Effect.ALLOW,
        resources: [`arn:aws:s3:::${props.bucket}/*`],
        actions: ['s3:PutObject', 's3:GetObject']
      })
    )
    // data-plane access to the OpenSearch Serverless collection
    instanceRole.addToPolicy(
      new aws_iam.PolicyStatement({
        effect: Effect.ALLOW,
        resources: [props.aossCollectionArn],
        actions: ['aoss:APIAccessAll']
      })
    )
    const autoscaling = new aws_apprunner.CfnAutoScalingConfiguration(
      this,
      'AutoScalingForGoApp',
      {
        autoScalingConfigurationName: 'AutoScalingForGoApp',
        // min number instance
        minSize: 1,
        // max number instance
        maxSize: 10,
        // max concurrent request per instance
        maxConcurrency: 100
      }
    )
    const apprunner = new aws_apprunner.CfnService(this, 'NextBedrockService', {
      serviceName: 'NextBedrockService',
      sourceConfiguration: {
        authenticationConfiguration: {
          accessRoleArn: buildRole.roleArn
        },
        autoDeploymentsEnabled: false,
        imageRepository: {
          imageIdentifier: props.ecr,
          imageRepositoryType: 'ECR',
          imageConfiguration: {
            port: '3000',
            runtimeEnvironmentVariables: [
              { name: 'BUCKET', value: 'demo' },
              { name: 'HOSTNAME', value: '0.0.0.0' },
              { name: 'PORT', value: '3000' }
            ]
            // startCommand: "",
          }
        }
      },
      instanceConfiguration: {
        cpu: '1 vCPU',
        memory: '2 GB',
        instanceRoleArn: instanceRole.roleArn
      },
      observabilityConfiguration: {
        observabilityEnabled: false
      },
      autoScalingConfigurationArn: autoscaling.ref
    })
    // CloudFormation must create the autoscaling config before the service
    apprunner.addDependency(autoscaling)
  }
}

FrontEnd Chat#

First let's create a chat page which submits on the enter key

'use client'
import { Message } from 'ai'
import { useChat } from 'ai/react'

// Chat UI backed by /api/chat; useChat streams the assistant reply into
// the message list as it arrives.
export default function Chat() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: './api/chat'
  })
  // Generate a map of message role to text color
  const roleToColorMap: Record<Message['role'], string> = {
    system: 'red',
    user: 'black',
    function: 'blue',
    tool: 'purple',
    assistant: 'green',
    data: 'orange'
  }
  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      {messages.length > 0
        ? messages.map(m => (
            <div
              key={m.id}
              className="whitespace-pre-wrap"
              style={{ color: roleToColorMap[m.role] }}
            >
              <strong>{`${m.role}: `}</strong>
              {m.content || JSON.stringify(m.function_call)}
              <br />
              <br />
            </div>
          ))
        : null}
      {/* submitting the form (enter key) posts the input to the API */}
      <form onSubmit={handleSubmit}>
        <input
          className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
          value={input}
          placeholder="Say something..."
          onChange={handleInputChange}
        />
      </form>
    </div>
  )
}

FrontEnd Image#

Second let create an image generation page

'use client'
import { useEffect, useState } from 'react'
import { genImage } from './actions'

// Paper-plane send icon.
const SendIcon = ({ className }: { className?: string }) => {
  return (
    <svg
      xmlns="http://www.w3.org/2000/svg"
      viewBox="0 0 16 16"
      fill="none"
      className={className}
      strokeWidth="2"
    >
      <path
        d="M.5 1.163A1 1 0 0 1 1.97.28l12.868 6.837a1 1 0 0 1 0 1.766L1.969 15.72A1 1 0 0 1 .5 14.836V10.33a1 1 0 0 1 .816-.983L8.5 8 1.316 6.653A1 1 0 0 1 .5 5.67V1.163Z"
        fill="currentColor"
      ></path>
    </svg>
  )
}

/**
 * Image generation page: the prompt can be submitted with the enter key
 * or with the send button; a wait modal is shown while generating.
 */
const HomePage = () => {
  const [url, setUrl] = useState<string>('')
  useEffect(() => {}, [url])

  // Shared submit path (was duplicated in onKeyDown and onClick):
  // clear the image, show the modal, call the server action, then
  // display the signed url and hide the modal again.
  const generate = async () => {
    setUrl('')
    document.getElementById('modal')!.style.display = 'block'
    const signedUrl = await genImage({
      prompt: (document.getElementById('prompt') as HTMLInputElement).value
    })
    setUrl(signedUrl)
    document.getElementById('modal')!.style.display = 'none'
  }

  return (
    <main>
      <div className="max-w-3xl mx-auto">
        <form className="mt-10 px-10">
          <div className="relative ">
            <textarea
              rows={2}
              id="prompt"
              name="prompt"
              className="w-[100%] bg-gray-300 px-5 py-5 rounded-sm"
              onKeyDown={async event => {
                // enter submits, shift+enter inserts a newline
                if (event.key === 'Enter' && event.shiftKey === false) {
                  event.preventDefault()
                  await generate()
                }
              }}
            ></textarea>
            <button
              className="absolute top-[50%] translate-y-[-50%] right-1 flex items-center justify-center rounded-md w-10 h-16 bg-green-500 hover:bg-green-600"
              onClick={async event => {
                event.preventDefault()
                await generate()
              }}
            >
              <SendIcon className="h-4 w-4 text-white"></SendIcon>
            </button>
          </div>
        </form>
        <div className="mt-10 px-10">
          <img src={url}></img>
        </div>
        <div
          id="modal"
          className="fixed top-0 left-0 bg-slate-400 min-h-screen w-full opacity-60"
          hidden
        >
          <div className="min-h-screen flex justify-center items-center">
            <h1>Wait a few second!</h1>
          </div>
        </div>
      </div>
    </main>
  )
}
export default HomePage

And here is the image generation server function

  • Call bedrock stability model
  • Save image to S3
  • Return signed URL and display image
'use server'
import {
  GetObjectCommand,
  S3Client,
  PutObjectCommand
} from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import {
  BedrockRuntime,
  InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'

const bedrock = new BedrockRuntime({ region: 'us-east-1' })
const s3Client = new S3Client({ region: 'us-east-1' })

/**
 * Generate an image with the Bedrock Stable Diffusion XL model, upload
 * it to S3 and return a presigned GET url (1 hour expiry).
 * Returns an empty string when generation or upload fails.
 */
const genImage = async ({ prompt }: { prompt: string }) => {
  let url = ''
  const body = {
    text_prompts: [{ text: prompt, weight: 1 }],
    seed: 3,
    cfg_scale: 10,
    samples: 1,
    steps: 50,
    style_preset: 'anime',
    height: 1024,
    width: 1024
  }
  const command = new InvokeModelCommand({
    body: JSON.stringify(body),
    modelId: 'stability.stable-diffusion-xl-v1',
    contentType: 'application/json',
    accept: 'image/png'
  })
  try {
    // unique object key per generation
    const key = `next-vercel-ai/sample${Date.now()}.png`
    const response = await bedrock.send(command)
    // upload the generated png to the S3 input location
    await s3Client.send(
      new PutObjectCommand({
        Bucket: process.env.BUCKET,
        Key: key,
        Body: response.body
      })
    )
    // generate a presigned url so the browser can display the image
    url = await getSignedUrl(
      s3Client,
      new GetObjectCommand({ Bucket: process.env.BUCKET, Key: key }),
      { expiresIn: 3600 }
    )
  } catch (error) {
    // best effort: log and fall through to return ''
    console.log(error)
  }
  return url
}
export { genImage }

Upload Document#

Let create a page for uploading documents to S3 which then trigger a Lambda function to index documents to OpenSearch

  • create cognito identity pool for guest users for uploading to s3
  • create s3 client
  • frontend

First let's create a Cognito identity pool for guest users with the following policy

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["cognito-identity:GetCredentialsForIdentity"],
"Resource": ["*"]
},
{
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:GetObject"],
"Resource": ["arn:aws:s3:::$BUCKET/documents/*"]
}
]
}

Second let's create an S3 client for uploading

import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
import { fromCognitoIdentityPool } from '@aws-sdk/credential-providers'

// Upload a file to the documents/ prefix of the bucket using guest
// (unauthenticated) credentials from a Cognito identity pool.
// NOTE(review): the identity pool id and bucket name are hard coded in
// client-side code -- consider moving them to NEXT_PUBLIC_* env vars.
const uploadToS3 = async (file: File) => {
  console.log('upload to s3 ')
  // s3 client with guest Cognito credentials
  const s3Client = new S3Client({
    region: 'us-east-1',
    credentials: fromCognitoIdentityPool({
      clientConfig: { region: 'us-east-1' },
      identityPoolId: 'us-east-1:887c4756-e061-4fb0-a44a-cc9a6a59d96d',
      logins: {
        // [process.env.COGNITO_POOL_ID ?? ""]: idToken,
      }
    })
  })
  // command to upload to s3
  const command = new PutObjectCommand({
    Bucket: 'cdk-entest-videos',
    Key: `documents/${file.name}`,
    Body: file
  })
  // upload file to s3; the wait modal is visible during the transfer
  try {
    document.getElementById('modal')!.style.display = 'block'
    const res = await s3Client.send(command)
    console.log(res)
  } catch (error) {
    console.log('erorr upload to s3 ', error)
  }
  document.getElementById('modal')!.style.display = 'none'
}

Finally, let's create a simple frontend

upload-s3.tsx
'use client'
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
import { fromCognitoIdentityPool } from '@aws-sdk/credential-providers'
/**
 * Upload a file to the documents/ prefix of the S3 bucket using guest
 * (unauthenticated) Cognito identity pool credentials. A blocking modal
 * (#modal) is shown for the duration of the upload.
 */
const uploadToS3 = async (file: File) => {
  console.log('upload to s3 ')
  // s3 client — guest credentials resolved from the Cognito identity pool
  const s3Client = new S3Client({
    region: 'us-east-1',
    credentials: fromCognitoIdentityPool({
      clientConfig: { region: 'us-east-1' },
      identityPoolId: 'us-east-1:111222333444',
      logins: {
        // [process.env.COGNITO_POOL_ID ?? ""]: idToken,
      }
    })
  })
  // command to upload to s3
  const command = new PutObjectCommand({
    Bucket: 'bucket-name',
    Key: `documents/${file.name}`,
    Body: file
  })
  // upload file to s3; hide the modal in finally so it always disappears
  try {
    document.getElementById('modal')!.style.display = 'block'
    const res = await s3Client.send(command)
    console.log(res)
  } catch (error) {
    console.log('error upload to s3 ', error)
  } finally {
    document.getElementById('modal')!.style.display = 'none'
  }
}
// Upload page: a file input whose form action uploads the selected file
// to S3; the #modal overlay blocks the UI while the upload is in flight.
const IndexPage = () => {
  // form action: read the chosen file from the form data and upload it
  const submit = async (data: FormData) => {
    const file = data.get('upload') as File
    await uploadToS3(file)
  }
  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      <div>
        <form className="mb-5" action={submit}>
          <div className="flex flex-row w-full bg-gray-200 justify-center items-center py-3 px-3">
            <input
              type="file"
              id="upload"
              name="upload"
              className="w-full cursor-pointer"
            ></input>
            <button
              id="upload-button"
              className="bg-orange-400 px-10 py-3 rounded-sm"
              onClick={event => {
                console.log('upload file ...')
              }}
            >
              Upload
            </button>
          </div>
        </form>
        <div>
          <p id="result"></p>
        </div>
      </div>
      {/* blocking overlay toggled by uploadToS3 while uploading */}
      <div
        id="modal"
        className="fixed top-0 left-0 bg-slate-400 min-h-screen w-full opacity-60"
        hidden
      >
        <div className="min-h-screen flex justify-center items-center">
          <h1>Wait a few second!</h1>
        </div>
      </div>
    </div>
  )
}
export default IndexPage

OpenSearch#

Let's create a page for querying and indexing OpenSearch Serverless (AOSS). We need an AOSS client, and a Bedrock client for creating embedding vectors.

'use server'
import {
BedrockRuntimeClient,
InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
const { defaultProvider } = require('@aws-sdk/credential-provider-node')
const { Client } = require('@opensearch-project/opensearch')
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws')
import { writeFile } from 'fs/promises'
import { join } from 'path'
// decoder for turning Bedrock response bytes into a JSON string
const decoder = new TextDecoder()
// Bedrock runtime client (embeddings + text generation)
const bedrockClient = new BedrockRuntimeClient({
  region: process.env.REGION
})
// S3 client used to store uploaded documents
const s3Client = new S3Client({
  region: process.env.REGION
  // credentials: fromCognitoIdentityPool({
  // clientConfig: { region: config.REGION },
  // identityPoolId: config.IDENTITY_POOL_ID,
  // logins: {
  // [config.COGNITO_POOL_ID]: idToken,
  // },
  // }),
})
// OpenSearch Serverless client, SigV4-signed with the default AWS
// credential provider chain (service name must be 'aoss')
const aossClient = new Client({
  ...AwsSigv4Signer({
    region: process.env.REGION,
    service: 'aoss',
    getCredentials: () => {
      const credentialsProvider = defaultProvider({})
      return credentialsProvider()
    }
  }),
  node: process.env.AOSS_URL
})

Create an embedding vector using the Bedrock Titan model

/**
 * Embed a piece of text with the Amazon Titan embedding model on
 * Bedrock and return the embedding vector.
 */
const createEmbedVector = async ({ doc }: { doc: string }) => {
  const command = new InvokeModelCommand({
    modelId: 'amazon.titan-embed-text-v1',
    contentType: 'application/json',
    accept: 'application/json',
    body: JSON.stringify({ inputText: doc })
  })
  const response = await bedrockClient.send(command)
  const payload = JSON.parse(decoder.decode(response.body))
  return payload['embedding']
}

Query AOSS

/**
 * kNN search over the `demo` index: embed the query text and return the
 * top-2 nearest hits (raw OpenSearch hit objects).
 */
export const queryAoss = async ({ query }: { query: string }) => {
  // embed the query text with the Titan model
  const vec = await createEmbedVector({ doc: query })
  // kNN request body: top-2 neighbours on vector_field
  const body = {
    size: 2,
    query: {
      knn: {
        vector_field: {
          vector: vec,
          k: 2
        }
      }
    }
  }
  // const instead of var: block-scoped binding that is never reassigned
  const response = await aossClient.search({
    index: 'demo',
    body: body
  })
  const docs = response.body.hits.hits
  console.log(docs)
  return docs
}

Index AOSS

/**
 * Index a single document into the `demo` aoss index: embed the whole
 * document (no chunk splitting) and store vector, text and title.
 */
export const indexAoss = async ({
  doc,
  title
}: {
  doc: string
  title: string
}) => {
  // no chunk split
  const vec = await createEmbedVector({ doc: doc })
  // serverless does not support id
  const body = {
    vector_field: vec,
    text: doc,
    title: title
  }
  // const instead of var: block-scoped binding that is never reassigned
  const response = await aossClient.index({
    index: 'demo',
    body: body
  })
  console.log(response)
}

Finally RAG

/**
 * Retrieval-augmented generation: fetch the most relevant documents
 * from OpenSearch and ask Claude to answer grounded in that context.
 */
const qaRag = async ({ question }: { question: string }) => {
  // query opensearch get context (raw hits)
  const docs = await queryAoss({ query: question })
  // BUG FIX: interpolating the raw hit objects into the prompt produced
  // "[object Object]"; join the stored text of each hit instead.
  const context = docs.map((hit: any) => hit._source.text).join('\n')
  // build prompt
  const prompt = `\n\nHuman: Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. don't include harmful content \n\n ${context} \n\nHuman: ${question} \n\nAssistant:`
  console.log(prompt)
  // prompt bedrock anthropic
  const response = await bedrockClient.send(
    new InvokeModelCommand({
      modelId: 'anthropic.claude-v2',
      contentType: 'application/json',
      accept: 'application/json',
      body: JSON.stringify({
        prompt: prompt,
        max_tokens_to_sample: 2048
      })
    })
  )
  console.log(response)
  console.log(decoder.decode(response.body))
}

Here is the detailed code

aoss.ts
'use server'
import {
BedrockRuntimeClient,
InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
const { defaultProvider } = require('@aws-sdk/credential-provider-node')
const { Client } = require('@opensearch-project/opensearch')
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws')
import { writeFile } from 'fs/promises'
import { join } from 'path'
// decoder for turning Bedrock response bytes into a JSON string
const decoder = new TextDecoder()
// Bedrock runtime client (embeddings + text generation)
const bedrockClient = new BedrockRuntimeClient({
  region: process.env.REGION
})
// S3 client used to store uploaded documents
const s3Client = new S3Client({
  region: process.env.REGION
  // credentials: fromCognitoIdentityPool({
  // clientConfig: { region: config.REGION },
  // identityPoolId: config.IDENTITY_POOL_ID,
  // logins: {
  // [config.COGNITO_POOL_ID]: idToken,
  // },
  // }),
})
// OpenSearch Serverless client, SigV4-signed with the default AWS
// credential provider chain (service name must be 'aoss')
const aossClient = new Client({
  ...AwsSigv4Signer({
    region: process.env.REGION,
    service: 'aoss',
    getCredentials: () => {
      const credentialsProvider = defaultProvider({})
      return credentialsProvider()
    }
  }),
  node: process.env.AOSS_URL
})
/**
 * Embed a piece of text with the Amazon Titan embedding model on
 * Bedrock and return the embedding vector.
 */
const createEmbedVector = async ({ doc }: { doc: string }) => {
  const command = new InvokeModelCommand({
    modelId: 'amazon.titan-embed-text-v1',
    contentType: 'application/json',
    accept: 'application/json',
    body: JSON.stringify({ inputText: doc })
  })
  const response = await bedrockClient.send(command)
  const payload = JSON.parse(decoder.decode(response.body))
  return payload['embedding']
}
/**
 * kNN search over the `demo` index: embed the query text and return the
 * top-2 nearest hits (raw OpenSearch hit objects).
 */
export const queryAoss = async ({ query }: { query: string }) => {
  // embed the query text with the Titan model
  const vec = await createEmbedVector({ doc: query })
  // kNN request body: top-2 neighbours on vector_field
  const body = {
    size: 2,
    query: {
      knn: {
        vector_field: {
          vector: vec,
          k: 2
        }
      }
    }
  }
  // const instead of var: block-scoped binding that is never reassigned
  const response = await aossClient.search({
    index: 'demo',
    body: body
  })
  // let result = "";
  // for (let hit of response.body.hits.hits) {
  // result += hit._source.text;
  // }
  // console.log(result);
  // return result;
  const docs = response.body.hits.hits
  console.log(docs)
  return docs
}
/**
 * Index a single document into the `demo` aoss index: embed the whole
 * document (no chunk splitting) and store vector, text and title.
 */
export const indexAoss = async ({
  doc,
  title
}: {
  doc: string
  title: string
}) => {
  // no chunk split
  const vec = await createEmbedVector({ doc: doc })
  // serverless does not support id
  const body = {
    vector_field: vec,
    text: doc,
    title: title
  }
  // const instead of var: block-scoped binding that is never reassigned
  const response = await aossClient.index({
    index: 'demo',
    body: body
  })
  console.log(response)
}
/**
 * Retrieval-augmented generation: fetch the most relevant documents
 * from OpenSearch and ask Claude to answer grounded in that context.
 */
const qaRag = async ({ question }: { question: string }) => {
  // query opensearch get context (raw hits)
  const docs = await queryAoss({ query: question })
  // BUG FIX: interpolating the raw hit objects into the prompt produced
  // "[object Object]"; join the stored text of each hit instead.
  const context = docs.map((hit: any) => hit._source.text).join('\n')
  // build prompt
  const prompt = `\n\nHuman: Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. don't include harmful content \n\n ${context} \n\nHuman: ${question} \n\nAssistant:`
  console.log(prompt)
  // prompt bedrock anthropic
  const response = await bedrockClient.send(
    new InvokeModelCommand({
      modelId: 'anthropic.claude-v2',
      contentType: 'application/json',
      accept: 'application/json',
      body: JSON.stringify({
        prompt: prompt,
        max_tokens_to_sample: 2048
      })
    })
  )
  console.log(response)
  console.log(decoder.decode(response.body))
}
/**
 * Server action: save the uploaded file to /tmp (for local inspection)
 * and upload it to the bucket's documents/ prefix, which triggers the
 * indexing Lambda. Returns a simple status object.
 */
export const uploadFile = async (data: FormData) => {
  console.log('file: ', data.get('upload'))
  const file: File | null = data.get('upload') as unknown as File
  if (!file) {
    throw new Error('No file uploaded')
  }
  // file buffer
  const bytes = await file.arrayBuffer()
  const buffer = Buffer.from(bytes)
  // write to local
  const path = join('/tmp', file.name)
  await writeFile(path, buffer)
  console.log(`open ${path} to see the upload file`)
  // write to s3 — reuse the buffer instead of allocating a second copy
  const command = new PutObjectCommand({
    Bucket: process.env.BUCKET,
    Key: `documents/${file.name}`,
    Body: buffer
  })
  // upload file to s3
  try {
    const res = await s3Client.send(command)
    console.log(res)
  } catch (error) {
    console.log('error upload to s3 ', error)
  }
  return {
    status: 'OK',
    message: 'message sent OK'
  }
}
// createEmbedVector({ doc: "hello" });
// queryAoss({ query: "What is Amazon Bedrock" });
// indexAoss({
// doc: "Amazon Bedrock is a fully managed service that makes high-performing foundation models (FMs) from leading AI startups and Amazon available for your use through a unified API. You can choose from a wide range of foundation models to find the model that is best suited for your use case. Amazon Bedrock also offers a broad set of capabilities to build generative AI applications with security, privacy, and responsible AI. Using Amazon Bedrock, you can easily experiment with and evaluate top foundation models for your use cases, privately customize them with your data using techniques such as fine-tuning and Retrieval Augmented Generation (RAG), and build agents that execute tasks using your enterprise systems and data sources. With Amazon Bedrock's serverless experience, you can get started quickly, privately customize foundation models with your own data, and easily and securely integrate and deploy them into your applications using AWS tools without having to manage any infrastructure.",
// title: "What is Amazon Bedrock",
// });
// qaRag({ question: "what is developing on aws course?" });

And here is frontend

aoss.tsx
'use client'
import { useState } from 'react'
import { indexAoss, queryAoss } from './action'
// AOSS demo page: one form to index a (title, document) pair and one
// form to run a kNN query; results render below, and the #modal overlay
// blocks the UI while indexing is in flight.
const IndexPage = () => {
  // hits returned by the last query (raw OpenSearch hit objects)
  const [docs, setDocs] = useState<any[]>([])
  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      <div>
        {/* indexing form: title + document textarea */}
        <form
          className="mb-3"
          onSubmit={event => {
            event.preventDefault()
          }}
        >
          <input
            type="text"
            id="title"
            name="title"
            placeholder="title"
            className="w-full bg-gray-200 p-5 mb-3"
          ></input>
          <textarea
            className="bg-gray-200 w-full p-5"
            rows={15}
            placeholder="document ..."
            id="document"
            name="document"
          ></textarea>
          <button
            className="px-10 py-3 rounded-sm bg-orange-400"
            onClick={async event => {
              event.preventDefault()
              const title = (
                document.getElementById('title') as HTMLInputElement
              ).value
              const doc = (
                document.getElementById('document') as HTMLInputElement
              ).value
              console.log(title, doc)
              // show blocking modal while the server action runs
              document.getElementById('modal')!.style.display = 'block'
              await indexAoss({ doc: doc, title: title })
              document.getElementById('modal')!.style.display = 'none'
            }}
          >
            Index
          </button>
        </form>
        <div>
          {/* query form: kNN search against the demo index */}
          <form
            onSubmit={event => {
              event.preventDefault()
            }}
          >
            <input
              className="w-full px-5 py-3 bg-gray-200"
              placeholder="query ..."
              type="text"
              id="query"
              name="query"
            ></input>
            <button
              className="px-10 py-3 rounded-sm bg-orange-400 mt-2"
              onClick={async event => {
                event.preventDefault()
                const query = (
                  document.getElementById('query') as HTMLInputElement
                ).value
                console.log(query)
                const docs = await queryAoss({ query: query })
                setDocs(docs)
                console.log(docs)
                // document.getElementById("result")!.innerText = docs;
              }}
            >
              Query
            </button>
          </form>
        </div>
        {/* query results: one card per hit */}
        <div>
          {docs.map(doc => (
            <div key={doc._id} className="mb-3">
              <h3 className="font-bold">{doc._source.title}</h3>
              <p>{doc._source.text}</p>
              <hr className="bg-blue-500 border-1 border-blue-500"></hr>
            </div>
          ))}
        </div>
      </div>
      {/* blocking overlay shown while indexing */}
      <div
        id="modal"
        className="fixed top-0 left-0 bg-slate-400 min-h-screen w-full opacity-90"
        hidden
      >
        <div className="min-h-screen flex justify-center items-center">
          <h1>Wait a few second!</h1>
        </div>
      </div>
    </div>
  )
}
export default IndexPage

And RAG frontend page

rag.tsx
'use client'
import { Message } from 'ai'
import { useChat } from 'ai/react'
// RAG chat page: streams answers from the ./api/rag route using the
// Vercel AI SDK useChat hook.
export default function Chat() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: './api/rag'
  })
  // Generate a map of message role to text color
  const roleToColorMap: Record<Message['role'], string> = {
    system: 'red',
    user: 'black',
    function: 'blue',
    tool: 'purple',
    assistant: 'green',
    data: 'orange'
  }
  return (
    <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
      {/* transcript: one colored line per message */}
      {messages.length > 0
        ? messages.map(m => (
            <div
              key={m.id}
              className="whitespace-pre-wrap"
              style={{ color: roleToColorMap[m.role] }}
            >
              <strong>{`${m.role}: `}</strong>
              {m.content || JSON.stringify(m.function_call)}
              <br />
              <br />
            </div>
          ))
        : null}
      {/* input box pinned to the bottom; submit sends to the rag route */}
      <form onSubmit={handleSubmit}>
        <input
          className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
          value={input}
          placeholder="Say something..."
          onChange={handleInputChange}
        />
      </form>
    </div>
  )
}

Lambda AOSS#

Let's create a Lambda function which is triggered when a PDF file is uploaded. This function will index the document into AOSS. Here is the logic

# haimtran 07 DEC 2022
# opensearch serverless
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter
import boto3
import json
import os
from datetime import datetime
#
# target OpenSearch index name
INDEX = "demo"
# chunk size (characters) used when splitting documents
CHUNK_SIZE = 1000
# default bucket when not provided by the Lambda environment
if "BUCKET" in os.environ:
    pass
else:
    os.environ["BUCKET"] = "BUCKET-NAME"
# opensearch domain — default endpoint for local runs
if "OPENSEARCH_DOMAIN" in os.environ:
    pass
else:
    os.environ["OPENSEARCH_DOMAIN"] = (
        "domain.us-east-1.aoss.amazonaws.com"
    )
os.environ["REGION"] = "us-east-1"
# bedrock client
bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")
# s3 client
s3_client = boto3.client("s3")
# host and opensearch client
host = os.environ["OPENSEARCH_DOMAIN"]
client = boto3.client("opensearchserverless")
region = os.environ["REGION"]
credentials = boto3.Session().get_credentials()
# SigV4 auth for the "aoss" service using the Lambda role credentials
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    "aoss",
    session_token=credentials.token,
)
# opensearch client
aoss_client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)
def get_embedded_vector(query: str):
    """
    Convert text to an embedding vector using the Bedrock Titan model.
    """
    # request payload for the Titan embedding model
    payload = json.dumps({"inputText": query})
    response = bedrock_client.invoke_model(
        body=payload,
        modelId="amazon.titan-embed-text-v1",
        accept="application/json",
        contentType="application/json",
    )
    # parse the streamed response body and extract the vector
    result = json.loads(response["body"].read())
    return result["embedding"]
def load_pdf_to_vectors(key: str):
    """
    Download a PDF from S3, split it into fixed-size chunks and embed
    each chunk with the Bedrock Titan model.

    :param key: S3 object key of the uploaded PDF
    :return: (vectors, docs) — one embedding per chunk plus the chunks
    """
    # unique local filename derived from the key and the current time
    filename = (
        key.split("/").pop().split(".")[0] + "-" + datetime.now().isoformat() + ".pdf"
    )
    # download the pdf to /tmp (the only writable path inside Lambda);
    # BUG FIX: use the computed filename instead of a garbled placeholder
    s3_client.download_file(os.environ["BUCKET"], key, f"/tmp/{filename}")
    # read pdf file
    loader = PyPDFLoader(f"/tmp/{filename}")
    pages = loader.load_and_split()
    # chunk with fixed size
    text_splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=0)
    docs = text_splitter.split_documents(documents=pages)
    # embedding
    vectors = [get_embedded_vector(doc.page_content) for doc in docs]
    # return
    return vectors, docs
def index_vectors_to_aoss(vectors, docs, filename):
    """
    Bulk-index chunk embeddings and texts into OpenSearch Serverless.

    :param vectors: list of embedding vectors, parallel to docs
    :param docs: list of chunk documents (page_content per chunk)
    :param filename: source filename used to title each chunk
    :return: the NDJSON bulk payload that was sent
    """
    # build the NDJSON bulk body: action line then document line per chunk
    data = ""
    for index, doc in enumerate(docs):
        data += json.dumps({"index": {"_index": INDEX}}) + "\n"
        data += (
            json.dumps(
                {
                    "vector_field": vectors[index],
                    "text": doc.page_content,
                    # BUG FIX: title each chunk with the real source
                    # filename instead of a garbled placeholder
                    "title": f"{filename} chunk {index}",
                }
            )
            + "\n"
        )
    # bulk index
    aoss_client.bulk(data)
    # return
    return data
def handler(event, context):
    """
    Lambda entry point: for each uploaded S3 object in the event, embed
    the PDF's chunks and index them into OpenSearch Serverless.
    """
    # get filename from event
    for record in event["Records"]:
        # get key object
        key = record["s3"]["object"]["key"]
        print(key)
        # filename (last path component) used to title the chunks
        filename = key.split("/").pop()
        # load pdf to vectors
        vectors, docs = load_pdf_to_vectors(key=key)
        print(vectors)
        # index vectors to aoss
        data = index_vectors_to_aoss(vectors=vectors, docs=docs, filename=filename)
        print(data)
    # return a CORS-friendly empty response
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps({}),
    }
# local smoke test: invoke the handler with a synthetic S3 event
if __name__ == "__main__":
    # vecs, docs = load_pdf_to_vectors("book/datasheet.pdf")
    # data = index_vectors_to_aoss(vecs, docs)
    # print(data)
    handler(
        event={"Records": [{"s3": {"object": {"key": "book/datasheet.pdf"}}}]},
        context=None,
    )

Let's deploy it using CDK and a container. Here are requirements.txt and the Dockerfile

boto3==1.34.55
opensearch-py==2.4.2
langchain==0.1.10
pypdf==4.1.0
requests-aws4auth==1.2.3

Dockerfile

FROM public.ecr.aws/lambda/python:3.9
# create code dir inside container
RUN mkdir ${LAMBDA_TASK_ROOT}/source
# copy code to container
COPY "requirements.txt" ${LAMBDA_TASK_ROOT}/source
# copy handler function to container
COPY ./index.py ${LAMBDA_TASK_ROOT}
# install dependencies into the Lambda task root so they are importable
RUN pip3 install -r ./source/requirements.txt --target "${LAMBDA_TASK_ROOT}"
# upgrade boto3 (base image may ship an older version without bedrock-runtime)
RUN python3 -m pip install boto3 --upgrade
# set the CMD to your handler (module.function)
CMD [ "index.handler" ]

Finally, here is the CDK stack. Note that to grant the Lambda function permission to index and query AOSS, we need to:

  • Setup IAM role for Lambda with permission to do aoss:ApiCall
  • In the AOSS data access policy, add this IAM role

Here is the cdk stack.

lambda-stack.ts
// Props for the indexing-Lambda stack: the aoss endpoint and collection
// to index into, plus the bucket whose uploads trigger indexing.
interface LambdaAossProps extends StackProps {
  opensearchDomain: string
  aossCollectionArn: string
  bucketName: string
}
// Stack deploying the container-based Lambda that indexes uploaded PDFs
// into OpenSearch Serverless, with an execution role allowing S3 reads,
// Bedrock model invocation and aoss API access.
export class LambdaAossStack extends Stack {
  constructor(scope: Construct, id: string, props: LambdaAossProps) {
    super(scope, id, props)
    // role for lambda to read opensearch
    const role = new aws_iam.Role(this, 'RoleForLambdaIndexAossBedrock', {
      roleName: 'RoleForLambdaIndexAossBedrock',
      assumedBy: new aws_iam.ServicePrincipal('lambda.amazonaws.com')
    })
    // read uploaded documents from the bucket
    role.addToPolicy(
      new aws_iam.PolicyStatement({
        effect: Effect.ALLOW,
        resources: [`arn:aws:s3:::${props.bucketName}/*`],
        actions: ['s3:GetObject']
      })
    )
    // invoke the Bedrock foundation models used by the app
    role.addToPolicy(
      new aws_iam.PolicyStatement({
        effect: Effect.ALLOW,
        resources: [
          `arn:aws:bedrock:${this.region}::foundation-model/anthropic.claude-v2`,
          `arn:aws:bedrock:${this.region}::foundation-model/stability.stable-diffusion-xl-v1`,
          `arn:aws:bedrock:${this.region}::foundation-model/amazon.titan-embed-text-v1`
        ],
        actions: [
          'bedrock:InvokeModel',
          'bedrock:InvokeModelWithResponseStream'
        ]
      })
    )
    // call the OpenSearch Serverless data-plane API on the collection
    role.addToPolicy(
      new aws_iam.PolicyStatement({
        effect: Effect.ALLOW,
        resources: [props.aossCollectionArn],
        actions: ['aoss:APIAccessAll']
      })
    )
    // lambda function to query opensearch — built from the local Docker
    // image in lambda/lambda-index-aoss/
    new aws_lambda.Function(this, 'LamdaQueryOpenSearch', {
      functionName: 'LambdaIndexAossBedrock',
      memorySize: 2048,
      timeout: Duration.seconds(300),
      code: aws_lambda.EcrImageCode.fromAssetImage(
        path.join(__dirname, './../lambda/lambda-index-aoss/')
      ),
      handler: aws_lambda.Handler.FROM_IMAGE,
      runtime: aws_lambda.Runtime.FROM_IMAGE,
      environment: {
        OPENSEARCH_DOMAIN: props.opensearchDomain,
        PYTHONPATH: '/var/task/package',
        REGION: this.region,
        BUCKET: props.bucketName
      },
      role: role
    })
  }
}

Here is both apprunner and Lambda

apprunner-lambda.ts
import {
Duration,
Stack,
StackProps,
aws_apprunner,
aws_iam,
aws_lambda
} from 'aws-cdk-lib'
import { Effect } from 'aws-cdk-lib/aws-iam'
import { Construct } from 'constructs'
import * as path from 'path'
interface AppRunnerProps extends StackProps {
ecr: string
bucket: string
aossCollectionArn: string
}
export class AppRunnerStack extends Stack {
constructor(scope: Construct, id: string, props: AppRunnerProps) {
super(scope, id, props)
const buildRole = new aws_iam.Role(this, 'RoleForAppRunnerPullEcrBedrock', {
assumedBy: new aws_iam.ServicePrincipal('build.apprunner.amazonaws.com'),
roleName: 'RoleForAppRunnerPullEcrBedrock'
})
buildRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: ['*'],
actions: ['ecr:*']
})
)
const instanceRole = new aws_iam.Role(
this,
'InstanceRoleForApprunerBedrock',
{
assumedBy: new aws_iam.ServicePrincipal(
'tasks.apprunner.amazonaws.com'
),
roleName: 'InstanceRoleForApprunnerBedrock'
}
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [
'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2',
'arn:aws:bedrock:us-east-1::foundation-model/stability.stable-diffusion-xl-v1',
'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'
],
actions: [
'bedrock:InvokeModel',
'bedrock:InvokeModelWithResponseStream'
]
})
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [`arn:aws:s3:::${props.bucket}/*`],
actions: ['s3:PutObject', 's3:GetObject']
})
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [props.aossCollectionArn],
actions: ['aoss:APIAccessAll']
})
)
const autoscaling = new aws_apprunner.CfnAutoScalingConfiguration(
this,
'AutoScalingForGoApp',
{
autoScalingConfigurationName: 'AutoScalingForGoApp',
// min number instance
minSize: 1,