Introduction#

Build a simple GenAI app using Next.js and Amazon Bedrock:

  • Chat and simple prompts
  • Simple vector search
  • Simple RAG
  • Image generation
  • Image analysis

Setup Next.js#

Let's create a new Next.js project

npx create-next-app@latest

Then install dependencies. Here is the package.json:

package.json
{
"name": "next-prisma-hello",
"version": "0.1.0",
"private": true,
"scripts": {
"dev": "next dev",
"build": "next build",
"start": "next start",
"lint": "next lint"
},
"dependencies": {
"@aws-sdk/client-bedrock-runtime": "^3.490.0",
"@aws-sdk/client-s3": "^3.525.0",
"@aws-sdk/credential-provider-node": "^3.525.0",
"@aws-sdk/credential-providers": "^3.525.0",
"@aws-sdk/s3-request-presigner": "^3.525.0",
"@opensearch-project/opensearch": "^2.5.0",
"@prisma/client": "^5.8.0",
"ai": "^2.2.33",
"aws-sdk": "^2.1569.0",
"i": "^0.3.7",
"langchain": "^0.1.25",
"next": "14.0.4",
"npm": "^10.5.0",
"package.json": "^2.0.1",
"react": "^18",
"react-dom": "^18"
},
"devDependencies": {
"@types/node": "^20",
"@types/react": "^18",
"@types/react-dom": "^18",
"autoprefixer": "^10.0.1",
"eslint": "^8",
"eslint-config-next": "14.0.4",
"postcss": "^8",
"prisma": "^5.8.0",
"tailwindcss": "^3.3.0",
"typescript": "^5"
}
}

Project structure

|--app
|  |--page.tsx
|  |--api
|  |  |--chat
|  |  |  |--route.ts
|  |  |--rag
|  |  |  |--route.js
|  |--image
|  |  |--page.tsx
|  |--rag
|  |  |--page.tsx
|  |--aoss
|  |  |--page.tsx
|  |  |--action.ts
|--cdk
|  |--bin
|  |  |--cdk.ts
|  |--lib
|  |  |--cdk-stack.ts
|--next.config.js
|--package.json
|--Dockerfile
|--.dockerignore
|--build.py
|--.env

Chat Function#

  • Simple conversation memory using the Vercel AI SDK and Bedrock
  • Streaming chat responses
  • The frontend uses the useChat React hook

Let's implement route.ts for the chat:

import {
BedrockRuntime,
InvokeModelWithResponseStreamCommand
} from '@aws-sdk/client-bedrock-runtime'
import { AWSBedrockAnthropicStream, StreamingTextResponse } from 'ai'
import { experimental_buildAnthropicPrompt } from 'ai/prompts'
// IMPORTANT! Set the runtime to edge
// export const runtime = "edge";
const bedrockClient = new BedrockRuntime({
region: 'us-east-1'
// region: process.env.AWS_REGION ?? "us-east-1",
// credentials: {
// accessKeyId: process.env.AWS_ACCESS_KEY_ID ?? "",
// secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY ?? "",
// sessionToken: process.env.AWS_SESSION_TOKEN ?? "",
// },
})
export async function POST(req: Request) {
// Extract the `prompt` from the body of the request
const { messages } = await req.json()
console.log(messages)
console.log(experimental_buildAnthropicPrompt(messages))
// Ask Claude for a streaming chat completion given the prompt
const bedrockResponse = await bedrockClient.send(
new InvokeModelWithResponseStreamCommand({
modelId: 'anthropic.claude-v2',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
prompt: experimental_buildAnthropicPrompt(messages),
max_tokens_to_sample: 2048
})
})
)
// Convert the response into a friendly text-stream
const stream = AWSBedrockAnthropicStream(bedrockResponse)
// Respond with the stream
return new StreamingTextResponse(stream)
}
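
Before wiring the frontend, a quick way to smoke-test the route is to POST a messages array and read the text stream. This is a minimal sketch, assuming the dev server runs on http://localhost:3000 and Node 18+ (the filename test-chat.ts is an assumption):

// minimal smoke test for /api/chat; run with e.g. npx tsx test-chat.ts
const testChat = async () => {
  const res = await fetch('http://localhost:3000/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      messages: [{ role: 'user', content: 'hello, who are you?' }]
    })
  })
  // the route streams plain text chunks
  const reader = res.body!.getReader()
  const decoder = new TextDecoder()
  while (true) {
    const { value, done } = await reader.read()
    if (done) break
    process.stdout.write(decoder.decode(value))
  }
}

testChat()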

The useChat hook wires up the input, and the chat submits on the Enter key because the input sits inside a form:

const { messages, input, handleInputChange, handleSubmit } = useChat({
  api: './api/chat'
})

<form onSubmit={handleSubmit}>
  <input
    className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
    value={input}
    placeholder="Say something..."
    onChange={handleInputChange}
  />
</form>

Image Function#

Then let's implement the server function for generating images

  • Generate the image
  • Upload it to S3
  • Generate a signed URL
'use server'
import {
GetObjectCommand,
S3Client,
PutObjectCommand
} from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import {
BedrockRuntime,
InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'
import * as fs from 'fs'
const bedrock = new BedrockRuntime({ region: 'us-east-1' })
const s3Client = new S3Client({ region: 'us-east-1' })
const genImage = async ({ prompt }: { prompt: string }) => {
let url = ''
const body = {
text_prompts: [{ text: prompt, weight: 1 }],
seed: 3,
cfg_scale: 10,
samples: 1,
steps: 50,
style_preset: 'anime',
height: 1024,
width: 1024
}
const command = new InvokeModelCommand({
body: JSON.stringify(body),
modelId: 'stability.stable-diffusion-xl-v1',
contentType: 'application/json',
accept: 'image/png'
})
try {
console.log(prompt)
const imageName = 'sample' + Date.now().toString() + '.png'
const key = `next-vercel-ai/${imageName}`
const response = await bedrock.send(command)
// fs.writeFile(`./public/${imageName}`, response["body"], () => {
// console.log("OK");
// });
// upload to s3 input location
await s3Client.send(
new PutObjectCommand({
Bucket: process.env.BUCKET,
Key: key,
Body: response['body']
})
)
// generate signed url
const commandGetUrl = new GetObjectCommand({
Bucket: process.env.BUCKET,
Key: key
})
url = await getSignedUrl(s3Client as any, commandGetUrl as any, {
expiresIn: 3600
})
console.log(url)
} catch (error) {
console.log(error)
}
return url
}
export { genImage }

OpenSearch Permission#

[!IMPORTANT]

Please update the data access policy in OpenSearch Serverless to grant principals, such as a Lambda role, CodeBuild role, or IAM user, permission to query and index collections.

  • AppRunner instance role
  • OpenSearch data access policy

Instance role to access OpenSearch Serverless (AOSS):

const instanceRole = new aws_iam.Role(this, 'InstanceRoleForApprunerBedrock', {
assumedBy: new aws_iam.ServicePrincipal('tasks.apprunner.amazonaws.com'),
roleName: 'InstanceRoleForApprunnerBedrock'
})
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [
'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2',
'arn:aws:bedrock:us-east-1::foundation-model/stability.stable-diffusion-xl-v1',
'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'
],
actions: ['bedrock:InvokeModel', 'bedrock:InvokeModelWithResponseStream']
})
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [`arn:aws:s3:::${props.bucket}/*`],
actions: ['s3:PutObject', 's3:GetObject']
})
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [props.aossCollectionArn],
actions: ['aoss:APIAccessAll']
})
)

We also need to configure the OpenSearch data access policy

[
{
"Rules": [
{
"Resource": ["collection/demo"],
"Permission": [
"aoss:CreateCollectionItems",
"aoss:DeleteCollectionItems",
"aoss:UpdateCollectionItems",
"aoss:DescribeCollectionItems"
],
"ResourceType": "collection"
},
{
"Resource": ["index/demo/*"],
"Permission": [
"aoss:CreateIndex",
"aoss:DeleteIndex",
"aoss:UpdateIndex",
"aoss:DescribeIndex",
"aoss:ReadDocument",
"aoss:WriteDocument"
],
"ResourceType": "index"
}
],
"Principal": [
"arn:aws:iam::$ACCOUNT_ID:role/InstanceRoleForApprunnerBedrock"
],
"Description": "Rule 1"
}
]
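
For reference, the policy can also be applied programmatically. A sketch with the AWS SDK v3 control-plane client (the policy name demo-data-access and the policy.json file path are assumptions; the console or CLI works just as well):

import {
  OpenSearchServerlessClient,
  CreateAccessPolicyCommand
} from '@aws-sdk/client-opensearchserverless'
import { readFileSync } from 'fs'

const client = new OpenSearchServerlessClient({ region: 'us-east-1' })

// create the data access policy from the JSON document shown above
await client.send(
  new CreateAccessPolicyCommand({
    name: 'demo-data-access', // assumed policy name
    type: 'data',
    policy: readFileSync('./policy.json', 'utf8') // assumed file path
  })
)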

OpenSearch Client#

  • SignatureV4
  • Index, Query
  • Bulk data

It is possible to comment out the aws-sdk v2 fallback, as in this script, so the bundler does not pull in the v2 SDK:

AwsSigv4Signer.js
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/
'use strict'
const Connection = require('../Connection')
const Transport = require('../Transport')
const aws4 = require('aws4')
const AwsSigv4SignerError = require('./errors')
const crypto = require('crypto')
const { toMs } = Transport.internals
const getAwsSDKCredentialsProvider = async () => {
// First try V3
try {
const awsV3 = await import('@aws-sdk/credential-provider-node')
if (typeof awsV3.defaultProvider === 'function') {
return awsV3.defaultProvider()
}
} catch (err) {
// Ignore
}
// try {
// const awsV2 = await import('aws-sdk');
// if (awsV2.default && typeof awsV2.default.config.getCredentials === 'function') {
// return () =>
// new Promise((resolve, reject) => {
// awsV2.default.config.getCredentials((err, credentials) => {
// if (err) {
// reject(err);
// } else {
// resolve(credentials);
// }
// });
// });
// }
// } catch (err) {
// // Ignore
// }
throw new AwsSigv4SignerError(
'Unable to find a valid AWS SDK, please provide a valid getCredentials function to AwsSigv4Signer options.'
)
}
const awsDefaultCredentialsProvider = () =>
new Promise((resolve, reject) => {
getAwsSDKCredentialsProvider()
.then(provider => {
provider().then(resolve).catch(reject)
})
.catch(err => {
reject(err)
})
})
function AwsSigv4Signer(opts = {}) {
const credentialsState = {
credentials: null
}
if (!opts.region) {
throw new AwsSigv4SignerError('Region cannot be empty')
}
if (!opts.service) {
opts.service = 'es'
}
if (typeof opts.getCredentials !== 'function') {
opts.getCredentials = awsDefaultCredentialsProvider
}
function buildSignedRequestObject(request = {}) {
request.service = opts.service
request.region = opts.region
request.headers = request.headers || {}
request.headers['host'] = request.hostname
const signed = aws4.sign(request, credentialsState.credentials)
signed.headers['x-amz-content-sha256'] = crypto
.createHash('sha256')
.update(request.body || '', 'utf8')
.digest('hex')
return signed
}
class AwsSigv4SignerConnection extends Connection {
buildRequestObject(params) {
const request = super.buildRequestObject(params)
return buildSignedRequestObject(request)
}
}
class AwsSigv4SignerTransport extends Transport {
request(params, options = {}, callback = undefined) {
// options is optional so if options is a function, it's the callback.
if (typeof options === 'function') {
callback = options
options = {}
}
const currentCredentials = credentialsState.credentials
/**
* For AWS SDK V3
* Make sure token will expire no earlier than `expiryBufferMs` milliseconds in the future.
*/
const expiryBufferMs = toMs(options.requestTimeout || this.requestTimeout)
let expired = false
if (!currentCredentials) {
// Credentials haven't been acquired yet.
expired = true
}
// AWS SDK V2, needsRefresh should be available.
else if (typeof currentCredentials.needsRefresh === 'function') {
expired = currentCredentials.needsRefresh()
}
// AWS SDK V2, alternative to needsRefresh.
else if (currentCredentials.expired === true) {
expired = true
}
// AWS SDK V2, alternative to needsRefresh and expired.
else if (
currentCredentials.expireTime &&
currentCredentials.expireTime < new Date()
) {
expired = true
}
// AWS SDK V3, Credentials.expiration is a Date object
else if (
currentCredentials.expiration &&
currentCredentials.expiration.getTime() - Date.now() < expiryBufferMs
) {
expired = true
}
if (!expired) {
if (typeof callback === 'undefined') {
return super.request(params, options)
}
super.request(params, options, callback)
return
}
// In AWS SDK V2 Credentials.refreshPromise should be available.
if (
currentCredentials &&
typeof currentCredentials.refreshPromise === 'function'
) {
if (typeof callback === 'undefined') {
return currentCredentials.refreshPromise().then(() => {
return super.request(params, options)
})
} else {
currentCredentials
.refreshPromise()
.then(() => {
super.request(params, options, callback)
})
.catch(callback)
return
}
}
// For AWS SDK V3 or when the client has not acquired credentials yet.
if (typeof callback === 'undefined') {
return opts.getCredentials().then(credentials => {
credentialsState.credentials = credentials
return super.request(params, options)
})
} else {
opts
.getCredentials()
.then(credentials => {
credentialsState.credentials = credentials
super.request(params, options, callback)
})
.catch(callback)
}
}
}
return {
Transport: AwsSigv4SignerTransport,
Connection: AwsSigv4SignerConnection,
buildSignedRequestObject
}
}
module.exports = AwsSigv4Signer

Index and Query#

Let's create a page to index and query, using server functions

index-query.tsx
'use client'
import { indexAoss, queryAoss } from './action'
const IndexPage = () => {
return (
<div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
<div>
<form
onSubmit={event => {
event.preventDefault()
}}
>
<textarea
className="bg-gray-200 w-full p-5"
rows={15}
placeholder="document ..."
></textarea>
<button
className="px-10 py-3 rounded-sm bg-orange-400"
onClick={async event => {
event.preventDefault()
const result = await indexAoss({ doc: '', title: '' })
}}
>
Index
</button>
</form>
<div className="mt-10">
<form
onSubmit={event => {
event.preventDefault()
}}
>
<input
className="w-full px-5 py-3 bg-gray-200"
placeholder="query ..."
type="text"
id="query"
name="query"
></input>
<button
className="px-10 py-3 rounded-sm bg-orange-400 mt-2"
onClick={async event => {
event.preventDefault()
const query = (
document.getElementById('query') as HTMLInputElement
).value
console.log(query)
const result = await queryAoss({ query: query })
document.getElementById('result')!.innerText = result
}}
>
Query
</button>
</form>
</div>
<div>
<p id="result"></p>
</div>
</div>
</div>
)
}
export default IndexPage

Here are the server functions which index and query AOSS. First, the Bedrock client for creating embedding vectors and the AOSS client:

'use server'
import {
BedrockRuntimeClient,
InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'
const { defaultProvider } = require('@aws-sdk/credential-provider-node')
const { Client } = require('@opensearch-project/opensearch')
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws')
const decoder = new TextDecoder()
const bedrockClient = new BedrockRuntimeClient({
region: process.env.REGION
})
const aossClient = new Client({
...AwsSigv4Signer({
region: process.env.REGION,
service: 'aoss',
getCredentials: () => {
const credentialsProvider = defaultProvider({})
return credentialsProvider()
}
}),
node: process.env.AOSS_URL
})

Create a function for creating embedding vectors:

const createEmbedVector = async ({ doc }: { doc: string }) => {
const response = await bedrockClient.send(
new InvokeModelCommand({
modelId: 'amazon.titan-embed-text-v1',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
inputText: doc
})
})
)
const vec = JSON.parse(decoder.decode(response.body))['embedding']
return vec
}
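
Note that amazon.titan-embed-text-v1 returns a 1536-dimension vector, and the demo index must already exist with a knn_vector mapping for vector_field. A minimal one-off setup sketch using the same client (the index settings and hnsw/faiss method are assumptions; adjust to your collection):

// one-off setup: create the knn index that queryAoss and indexAoss expect
const createIndex = async () => {
  await aossClient.indices.create({
    index: 'demo',
    body: {
      settings: { 'index.knn': true },
      mappings: {
        properties: {
          // titan-embed-text-v1 produces 1536-dimension vectors
          vector_field: {
            type: 'knn_vector',
            dimension: 1536,
            method: { name: 'hnsw', engine: 'faiss' }
          }
        }
      }
    }
  })
}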

Create functions to query and index

export const queryAoss = async ({ query }: { query: string }) => {
const vec = await createEmbedVector({ doc: query })
const body = {
size: 2,
query: {
knn: {
vector_field: {
vector: vec,
k: 2
}
}
}
}
var response = await aossClient.search({
index: 'demo',
body: body
})
let result = ''
for (let hit of response.body.hits.hits) {
result += hit._source.text
}
console.log(result)
return result
}
export const indexAoss = async ({
doc,
title
}: {
doc: string
title: string
}) => {
// no chunk split
const vec = await createEmbedVector({ doc: doc })
// serverless does not support id
const body = {
vector_field: vec,
text: doc,
title: title
}
var response = await aossClient.index({
index: 'demo',
body: body
})
console.log(response)
}
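
As the comment above notes, documents are indexed without chunk splitting. A small sketch of splitting long documents before indexing (chunkText, indexAossChunked, and the 1000-character size are assumptions, not part of the original code):

// hypothetical helper: split a long document into fixed-size chunks
const chunkText = (doc: string, chunkSize = 1000): string[] => {
  const chunks: string[] = []
  for (let i = 0; i < doc.length; i += chunkSize) {
    chunks.push(doc.slice(i, i + chunkSize))
  }
  return chunks
}

// index each chunk as its own document
export const indexAossChunked = async ({
  doc,
  title
}: {
  doc: string
  title: string
}) => {
  const chunks = chunkText(doc)
  for (const [i, chunk] of chunks.entries()) {
    await indexAoss({ doc: chunk, title: `${title} chunk ${i}` })
  }
}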

Simple RAG#

  • Use the Vercel AI SDK for handling streaming messages
  • Query AOSS to get the context
  • Create a prompt with the context

Here is a simple frontend page:

rag.tsx
'use client'
import { Message } from 'ai'
import { useChat } from 'ai/react'
export default function Chat() {
const { messages, input, handleInputChange, handleSubmit } = useChat({
api: './api/rag'
})
// Generate a map of message role to text color
const roleToColorMap: Record<Message['role'], string> = {
system: 'red',
user: 'black',
function: 'blue',
tool: 'purple',
assistant: 'green',
data: 'orange'
}
return (
<div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
{messages.length > 0
? messages.map(m => (
<div
key={m.id}
className="whitespace-pre-wrap"
style={{ color: roleToColorMap[m.role] }}
>
<strong>{`${m.role}: `}</strong>
{m.content || JSON.stringify(m.function_call)}
<br />
<br />
</div>
))
: null}
<form onSubmit={handleSubmit}>
<input
className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
value={input}
placeholder="Say something..."
onChange={handleInputChange}
/>
</form>
</div>
)
}

Here is /api/rag, which handles the POST request from the client:

route.js
import {
BedrockRuntime,
InvokeModelCommand,
InvokeModelWithResponseStreamCommand
} from '@aws-sdk/client-bedrock-runtime'
import { AWSBedrockAnthropicStream, StreamingTextResponse } from 'ai'
const { defaultProvider } = require('@aws-sdk/credential-provider-node')
const { Client } = require('@opensearch-project/opensearch')
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws')
// IMPORTANT! Set the runtime to edge
// export const runtime = "edge";
const decoder = new TextDecoder()
const bedrockClient = new BedrockRuntime({
region: process.env.REGION
// region: process.env.AWS_REGION ?? process.env.REGION,
// credentials: {
// accessKeyId: process.env.AWS_ACCESS_KEY_ID ?? "",
// secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY ?? "",
// sessionToken: process.env.AWS_SESSION_TOKEN ?? "",
// },
})
const aossClient = new Client({
...AwsSigv4Signer({
region: process.env.REGION,
service: 'aoss',
getCredentials: () => {
const credentialsProvider = defaultProvider()
return credentialsProvider()
}
}),
node: process.env.AOSS_URL
})
const createEmbedVector = async doc => {
const response = await bedrockClient.send(
new InvokeModelCommand({
modelId: 'amazon.titan-embed-text-v1',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
inputText: doc
})
})
)
const vec = JSON.parse(decoder.decode(response.body))['embedding']
console.log(vec.length)
return vec
}
const queryAoss = async query => {
const vec = await createEmbedVector(query)
const body = {
size: 2,
query: {
knn: {
vector_field: {
vector: vec,
k: 2
}
}
}
}
var response = await aossClient.search({
index: 'demo',
body: body
})
// for (let hit of response.body.hits.hits) {
// console.log(hit);
// }
let result = ''
for (let hit of response.body.hits.hits) {
result += hit._source.text
}
// console.log(result);
return result
}
export async function POST(req) {
// Extract the `prompt` from the body of the request
const { messages } = await req.json()
console.log(messages)
// question last messages
// const question = "what is developing on aws course?";
const question = messages.pop().content
console.log(question)
// query opensearch get context
let context = ''
try {
context = await queryAoss(question)
console.log(context)
} catch (error) {
console.log(error)
}
const prompt = `\n\nHuman: Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. don't include harmful content \n\n ${context} \n\nHuman: ${question} \n\nAssistant:`
// Ask Claude for a streaming chat completion given the prompt
const bedrockResponse = await bedrockClient.send(
new InvokeModelWithResponseStreamCommand({
modelId: 'anthropic.claude-v2',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
prompt: prompt,
max_tokens_to_sample: 2048
})
})
)
// Convert the response into a friendly text-stream
const stream = AWSBedrockAnthropicStream(bedrockResponse)
// Respond with the stream
return new StreamingTextResponse(stream)
}

Here is the simplest prompt with context:

const prompt = `\n\nHuman: Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. don't include harmful content \n\n ${context} \n\nHuman: ${question} \n\nAssistant:`

Docker#

To build and deploy with App Runner, take note of next.config.js:

/** @type {import('next').NextConfig} */
const nextConfig = {
output: 'standalone'
}
module.exports = nextConfig

Then prepare a multi-stage Dockerfile to optimize the image size:

Dockerfile
FROM node:18-alpine AS base
# Install dependencies only when needed
FROM base AS deps
# Check https://github.com/nodejs/docker-node/tree/b4117f9333da4138b03a546ec926ef50a31506c3#nodealpine to understand why libc6-compat might be needed.
RUN apk add --no-cache libc6-compat
WORKDIR /app
# Install dependencies based on the preferred package manager
COPY package.json yarn.lock* package-lock.json* pnpm-lock.yaml* ./
RUN \
if [ -f yarn.lock ]; then yarn --frozen-lockfile; \
elif [ -f package-lock.json ]; then npm ci; \
elif [ -f pnpm-lock.yaml ]; then corepack enable pnpm && pnpm i --frozen-lockfile; \
else echo "Lockfile not found." && exit 1; \
fi
# Rebuild the source code only when needed
FROM base AS builder
WORKDIR /app
COPY --from=deps /app/node_modules ./node_modules
COPY . .
# Next.js collects completely anonymous telemetry data about general usage.
# Learn more here: https://nextjs.org/telemetry
# Uncomment the following line in case you want to disable telemetry during the build.
# ENV NEXT_TELEMETRY_DISABLED 1
RUN \
if [ -f yarn.lock ]; then yarn run build; \
elif [ -f package-lock.json ]; then npm run build; \
elif [ -f pnpm-lock.yaml ]; then corepack enable pnpm && pnpm run build; \
else echo "Lockfile not found." && exit 1; \
fi
# Production image, copy all the files and run next
FROM base AS runner
WORKDIR /app
ENV NODE_ENV production
# Uncomment the following line in case you want to disable telemetry during runtime.
# ENV NEXT_TELEMETRY_DISABLED 1
RUN addgroup --system --gid 1001 nodejs
RUN adduser --system --uid 1001 nextjs
COPY --from=builder /app/public ./public
# Set the correct permission for prerender cache
RUN mkdir .next
RUN chown nextjs:nodejs .next
# Automatically leverage output traces to reduce image size
# https://nextjs.org/docs/advanced-features/output-file-tracing
COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./
COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static
USER nextjs
EXPOSE 3000
ENV PORT 3000
# set hostname to localhost
ENV HOSTNAME "0.0.0.0"
# server.js is created by next build from the standalone output
# https://nextjs.org/docs/pages/api-reference/next-config-js/output
CMD ["node", "server.js"]

And the .dockerignore file:

.dockerignore
Dockerfile
.dockerignore
node_modules
npm-debug.log
README.md
.next
.git

Docker Command#

Stop all running containers and remove images

docker stop $(docker ps -a -q)
docker rmi -f $(docker images -aq)

Delete existing images

docker rmi -f $(docker images -aq)

Build and tag image

docker build --tag entest .

Stop all processes running on port 3000

sudo kill -9 $(lsof -i:3000 -t)

Here is the build script:

build.py
import os
import boto3

# parameters
deploy = 1
REGION = "us-east-1"
ACCOUNT = os.environ["ACCOUNT_ID"]
# delete all docker images
os.system("sudo docker system prune -a")
# build next-bedrock image
os.system("sudo docker build -t next-bedrock . ")
# aws ecr login
os.system(
    f"aws ecr get-login-password --region {REGION} | sudo docker login --username AWS --password-stdin {ACCOUNT}.dkr.ecr.{REGION}.amazonaws.com"
)
# get image id
IMAGE_ID = os.popen("sudo docker images -q next-bedrock:latest").read()
# tag next-bedrock image
os.system(
    f"sudo docker tag {IMAGE_ID.strip()} {ACCOUNT}.dkr.ecr.{REGION}.amazonaws.com/next-bedrock:latest"
)
# create ecr repository
os.system(
    f"aws ecr create-repository --registry-id {ACCOUNT} --repository-name next-bedrock"
)
# push image to ecr
os.system(f"sudo docker push {ACCOUNT}.dkr.ecr.{REGION}.amazonaws.com/next-bedrock:latest")
# run locally to test
# os.system("sudo docker run -d -p 3000:3000 next-bedrock:latest")
# apprunner deploy
if deploy == 1:
    apprunner = boto3.client("apprunner")
    apprunner.start_deployment(
        ServiceArn=f"arn:aws:apprunner:{REGION}:{ACCOUNT}:service/NextBedrockService/xxx"
    )

AppRunner#

Let's create a stack to deploy the app on App Runner

  • Build role to pull the ECR image
  • Instance role to invoke Bedrock models
import { Stack, StackProps, aws_apprunner, aws_iam } from 'aws-cdk-lib'
import { Effect } from 'aws-cdk-lib/aws-iam'
import { Construct } from 'constructs'
interface AppRunnerProps extends StackProps {
ecr: string
bucket: string
aossCollectionArn: string
}
export class AppRunnerStack extends Stack {
constructor(scope: Construct, id: string, props: AppRunnerProps) {
super(scope, id, props)
const buildRole = new aws_iam.Role(this, 'RoleForAppRunnerPullEcrBedrock', {
assumedBy: new aws_iam.ServicePrincipal('build.apprunner.amazonaws.com'),
roleName: 'RoleForAppRunnerPullEcrBedrock'
})
buildRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: ['*'],
actions: ['ecr:*']
})
)
const instanceRole = new aws_iam.Role(
this,
'InstanceRoleForApprunerBedrock',
{
assumedBy: new aws_iam.ServicePrincipal(
'tasks.apprunner.amazonaws.com'
),
roleName: 'InstanceRoleForApprunnerBedrock'
}
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [
'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2',
'arn:aws:bedrock:us-east-1::foundation-model/stability.stable-diffusion-xl-v1',
'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'
],
actions: [
'bedrock:InvokeModel',
'bedrock:InvokeModelWithResponseStream'
]
})
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [`arn:aws:s3:::${props.bucket}/*`],
actions: ['s3:PutObject', 's3:GetObject']
})
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [props.aossCollectionArn],
actions: ['aoss:APIAccessAll']
})
)
const autoscaling = new aws_apprunner.CfnAutoScalingConfiguration(
this,
'AutoScalingForGoApp',
{
autoScalingConfigurationName: 'AutoScalingForGoApp',
// min number instance
minSize: 1,
// max number instance
maxSize: 10,
// max concurrent request per instance
maxConcurrency: 100
}
)
const apprunner = new aws_apprunner.CfnService(this, 'NextBedrockService', {
serviceName: 'NextBedrockService',
sourceConfiguration: {
authenticationConfiguration: {
accessRoleArn: buildRole.roleArn
},
autoDeploymentsEnabled: false,
imageRepository: {
imageIdentifier: props.ecr,
imageRepositoryType: 'ECR',
imageConfiguration: {
port: '3000',
runtimeEnvironmentVariables: [
{
name: 'BUCKET',
value: 'demo'
},
{
name: 'HOSTNAME',
value: '0.0.0.0'
},
{
name: 'PORT',
value: '3000'
}
]
// startCommand: "",
}
}
},
instanceConfiguration: {
cpu: '1 vCPU',
memory: '2 GB',
instanceRoleArn: instanceRole.roleArn
},
observabilityConfiguration: {
observabilityEnabled: false
},
autoScalingConfigurationArn: autoscaling.ref
})
apprunner.addDependency(autoscaling)
}
}

FrontEnd Chat#

First, let's create a chat page which submits on the Enter key:

'use client'
import { Message } from 'ai'
import { useChat } from 'ai/react'
export default function Chat() {
const { messages, input, handleInputChange, handleSubmit } = useChat({
api: './api/chat'
})
// Generate a map of message role to text color
const roleToColorMap: Record<Message['role'], string> = {
system: 'red',
user: 'black',
function: 'blue',
tool: 'purple',
assistant: 'green',
data: 'orange'
}
return (
<div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
{messages.length > 0
? messages.map(m => (
<div
key={m.id}
className="whitespace-pre-wrap"
style={{ color: roleToColorMap[m.role] }}
>
<strong>{`${m.role}: `}</strong>
{m.content || JSON.stringify(m.function_call)}
<br />
<br />
</div>
))
: null}
<form onSubmit={handleSubmit}>
<input
className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
value={input}
placeholder="Say something..."
onChange={handleInputChange}
/>
</form>
</div>
)
}

FrontEnd Image#

Second, let's create an image generation page:

'use client'
import { useEffect, useState } from 'react'
import { genImage } from './actions'
const SendIcon = ({ className }: { className?: string }) => {
return (
<svg
xmlns="http://www.w3.org/2000/svg"
viewBox="0 0 16 16"
fill="none"
className={className}
strokeWidth="2"
>
<path
d="M.5 1.163A1 1 0 0 1 1.97.28l12.868 6.837a1 1 0 0 1 0 1.766L1.969 15.72A1 1 0 0 1 .5 14.836V10.33a1 1 0 0 1 .816-.983L8.5 8 1.316 6.653A1 1 0 0 1 .5 5.67V1.163Z"
fill="currentColor"
></path>
</svg>
)
}
const HomePage = () => {
const [url, setUrl] = useState<string>('')
useEffect(() => {}, [url])
return (
<main>
<div className="max-w-3xl mx-auto">
<form className="mt-10 px-10">
<div className="relative ">
<textarea
rows={2}
id="prompt"
name="prompt"
className="w-[100%] bg-gray-300 px-5 py-5 rounded-sm"
onKeyDown={async event => {
if (event.key === 'Enter' && event.shiftKey === false) {
event.preventDefault()
setUrl('')
document.getElementById('modal')!.style.display = 'block'
const url = await genImage({
prompt: (
document.getElementById('prompt') as HTMLInputElement
).value
})
setUrl(url)
document.getElementById('modal')!.style.display = 'none'
}
}}
></textarea>
<button
className="absolute top-[50%] translate-y-[-50%] right-1 flex items-center justify-center rounded-md w-10 h-16 bg-green-500 hover:bg-green-600"
onClick={async event => {
event.preventDefault()
setUrl('')
document.getElementById('modal')!.style.display = 'block'
const url = await genImage({
prompt: (
document.getElementById('prompt') as HTMLInputElement
).value
})
setUrl(url)
document.getElementById('modal')!.style.display = 'none'
}}
>
<SendIcon className="h-4 w-4 text-white"></SendIcon>
</button>
</div>
</form>
<div className="mt-10 px-10">
<img src={url}></img>
</div>
<div
id="modal"
className="fixed top-0 left-0 bg-slate-400 min-h-screen w-full opacity-60"
hidden
>
<div className="min-h-screen flex justify-center items-center">
<h1>Wait a few seconds!</h1>
</div>
</div>
</div>
</main>
)
}
export default HomePage

And here is the image generation server function

  • Call the Bedrock Stability model
  • Save the image to S3
  • Return a signed URL and display the image
'use server'
import {
GetObjectCommand,
S3Client,
PutObjectCommand
} from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import {
BedrockRuntime,
InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'
import * as fs from 'fs'
const bedrock = new BedrockRuntime({ region: 'us-east-1' })
const s3Client = new S3Client({ region: 'us-east-1' })
const genImage = async ({ prompt }: { prompt: string }) => {
let url = ''
const body = {
text_prompts: [{ text: prompt, weight: 1 }],
seed: 3,
cfg_scale: 10,
samples: 1,
steps: 50,
style_preset: 'anime',
height: 1024,
width: 1024
}
const command = new InvokeModelCommand({
body: JSON.stringify(body),
modelId: 'stability.stable-diffusion-xl-v1',
contentType: 'application/json',
accept: 'image/png'
})
try {
console.log(prompt)
const imageName = 'sample' + Date.now().toString() + '.png'
const key = `next-vercel-ai/${imageName}`
const response = await bedrock.send(command)
// fs.writeFile(`./public/${imageName}`, response["body"], () => {
// console.log("OK");
// });
// upload to s3 input location
await s3Client.send(
new PutObjectCommand({
Bucket: process.env.BUCKET,
Key: key,
Body: response['body']
})
)
// generate signed url
const commandGetUrl = new GetObjectCommand({
Bucket: process.env.BUCKET,
Key: key
})
url = await getSignedUrl(s3Client as any, commandGetUrl as any, {
expiresIn: 3600
})
console.log(url)
} catch (error) {
console.log(error)
}
return url
}
export { genImage }

Upload Document#

Let's create a page for uploading documents to S3, which then triggers a Lambda function to index the documents into OpenSearch.

  • Create a Cognito identity pool so guest users can upload to S3
  • Create an S3 client
  • Build the frontend

First, let's create a Cognito identity pool for guest users with the following policy:

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["cognito-identity:GetCredentialsForIdentity"],
"Resource": ["*"]
},
{
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:GetObject"],
"Resource": ["arn:aws:s3:::$BUCKET/documents/*"]
}
]
}
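
A hedged CDK sketch of that identity pool, to run inside a stack (construct IDs, role name, and bucketName are assumptions); the S3 statement above becomes an inline policy on the guest role:

import { aws_cognito, aws_iam } from 'aws-cdk-lib'

// identity pool that allows unauthenticated (guest) identities
const identityPool = new aws_cognito.CfnIdentityPool(this, 'GuestIdentityPool', {
  allowUnauthenticatedIdentities: true
})

// role assumed by guests via cognito-identity
const guestRole = new aws_iam.Role(this, 'CognitoGuestRole', {
  assumedBy: new aws_iam.FederatedPrincipal(
    'cognito-identity.amazonaws.com',
    {
      StringEquals: {
        'cognito-identity.amazonaws.com:aud': identityPool.ref
      },
      'ForAnyValue:StringLike': {
        'cognito-identity.amazonaws.com:amr': 'unauthenticated'
      }
    },
    'sts:AssumeRoleWithWebIdentity'
  )
})

// attach the s3 upload permissions from the policy above
guestRole.addToPolicy(
  new aws_iam.PolicyStatement({
    actions: ['s3:PutObject', 's3:GetObject'],
    resources: [`arn:aws:s3:::${bucketName}/documents/*`] // bucketName is assumed
  })
)

new aws_cognito.CfnIdentityPoolRoleAttachment(this, 'GuestRoleAttachment', {
  identityPoolId: identityPool.ref,
  roles: { unauthenticated: guestRole.roleArn }
})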

Second, let's create an S3 client for uploading:

import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
import { fromCognitoIdentityPool } from '@aws-sdk/credential-providers'
const uploadToS3 = async (file: File) => {
console.log('upload to s3 ')
// s3 client
const s3Client = new S3Client({
region: 'us-east-1',
credentials: fromCognitoIdentityPool({
clientConfig: { region: 'us-east-1' },
identityPoolId: 'us-east-1:887c4756-e061-4fb0-a44a-cc9a6a59d96d',
logins: {
// [process.env.COGNITO_POOL_ID ?? ""]: idToken,
}
})
})
// command to upload to s3
const command = new PutObjectCommand({
Bucket: 'cdk-entest-videos',
Key: `documents/${file.name}`,
Body: file
})
// upload file to s3
try {
document.getElementById('modal')!.style.display = 'block'
const res = await s3Client.send(command)
console.log(res)
} catch (error) {
console.log('error upload to s3 ', error)
}
document.getElementById('modal')!.style.display = 'none'
}

Finally, create a simple frontend:

upload-s3.tsx
'use client'
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
import { fromCognitoIdentityPool } from '@aws-sdk/credential-providers'
const uploadToS3 = async (file: File) => {
console.log('upload to s3 ')
// s3 client
const s3Client = new S3Client({
region: 'us-east-1',
credentials: fromCognitoIdentityPool({
clientConfig: { region: 'us-east-1' },
identityPoolId: 'us-east-1:111222333444',
logins: {
// [process.env.COGNITO_POOL_ID ?? ""]: idToken,
}
})
})
// command to upload to s3
const command = new PutObjectCommand({
Bucket: 'bucket-name',
Key: `documents/${file.name}`,
Body: file
})
// upload file to s3
try {
document.getElementById('modal')!.style.display = 'block'
const res = await s3Client.send(command)
console.log(res)
} catch (error) {
console.log('error upload to s3 ', error)
}
document.getElementById('modal')!.style.display = 'none'
}
const IndexPage = () => {
const submit = async (data: FormData) => {
const file = data.get('upload') as File
await uploadToS3(file)
}
return (
<div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
<div>
<form className="mb-5" action={submit}>
<div className="flex flex-row w-full bg-gray-200 justify-center items-center py-3 px-3">
<input
type="file"
id="upload"
name="upload"
className="w-full cursor-pointer"
></input>
<button
id="upload-button"
className="bg-orange-400 px-10 py-3 rounded-sm"
onClick={event => {
console.log('upload file ...')
}}
>
Upload
</button>
</div>
</form>
<div>
<p id="result"></p>
</div>
</div>
<div
id="modal"
className="fixed top-0 left-0 bg-slate-400 min-h-screen w-full opacity-60"
hidden
>
<div className="min-h-screen flex justify-center items-center">
<h1>Wait a few seconds!</h1>
</div>
</div>
</div>
)
}
export default IndexPage

OpenSearch#

Let's create a page for querying and indexing OpenSearch Serverless (AOSS). We need an AOSS client and a Bedrock client for creating embedding vectors.

'use server'
import {
BedrockRuntimeClient,
InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
const { defaultProvider } = require('@aws-sdk/credential-provider-node')
const { Client } = require('@opensearch-project/opensearch')
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws')
import { writeFile } from 'fs/promises'
import { join } from 'path'
const decoder = new TextDecoder()
const bedrockClient = new BedrockRuntimeClient({
region: process.env.REGION
})
const s3Client = new S3Client({
region: process.env.REGION
// credentials: fromCognitoIdentityPool({
// clientConfig: { region: config.REGION },
// identityPoolId: config.IDENTITY_POOL_ID,
// logins: {
// [config.COGNITO_POOL_ID]: idToken,
// },
// }),
})
const aossClient = new Client({
...AwsSigv4Signer({
region: process.env.REGION,
service: 'aoss',
getCredentials: () => {
const credentialsProvider = defaultProvider({})
return credentialsProvider()
}
}),
node: process.env.AOSS_URL
})

Create an embedding vector using the Bedrock Titan model:

const createEmbedVector = async ({ doc }: { doc: string }) => {
const response = await bedrockClient.send(
new InvokeModelCommand({
modelId: 'amazon.titan-embed-text-v1',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
inputText: doc
})
})
)
const vec = JSON.parse(decoder.decode(response.body))['embedding']
return vec
}

Query AOSS

export const queryAoss = async ({ query }: { query: string }) => {
const vec = await createEmbedVector({ doc: query })
const body = {
size: 2,
query: {
knn: {
vector_field: {
vector: vec,
k: 2
}
}
}
}
var response = await aossClient.search({
index: 'demo',
body: body
})
const docs = response.body.hits.hits
console.log(docs)
return docs
}

Index AOSS

export const indexAoss = async ({
doc,
title
}: {
doc: string
title: string
}) => {
// no chunk split
const vec = await createEmbedVector({ doc: doc })
// serverless does not support id
const body = {
vector_field: vec,
text: doc,
title: title
}
var response = await aossClient.index({
index: 'demo',
body: body
})
console.log(response)
}

Finally, the RAG function:

const qaRag = async ({ question }: { question: string }) => {
// query opensearch to get context documents
const docs = await queryAoss({ query: question })
// join the retrieved texts into a single context string
const context = docs.map((doc: any) => doc._source.text).join('\n')
// build prompt
const prompt = `\n\nHuman: Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. don't include harmful content \n\n ${context} \n\nHuman: ${question} \n\nAssistant:`
console.log(prompt)
// prompt bedrock anthropic
const response = await bedrockClient.send(
new InvokeModelCommand({
modelId: 'anthropic.claude-v2',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
prompt: prompt,
max_tokens_to_sample: 2048
})
})
)
console.log(response)
console.log(decoder.decode(response.body))
}
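
The non-streaming InvokeModel response body is JSON; for Claude v2 the generated text sits in the completion field, so qaRag could return the answer instead of only logging it. A small sketch:

// at the end of qaRag: parse the claude v2 body and return the answer text
const answer = JSON.parse(decoder.decode(response.body))['completion'] as string
return answer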

Here is the detailed code:

aoss.ts
'use server'
import {
BedrockRuntimeClient,
InvokeModelCommand
} from '@aws-sdk/client-bedrock-runtime'
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
const { defaultProvider } = require('@aws-sdk/credential-provider-node')
const { Client } = require('@opensearch-project/opensearch')
const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws')
import { writeFile } from 'fs/promises'
import { join } from 'path'
const decoder = new TextDecoder()
const bedrockClient = new BedrockRuntimeClient({
region: process.env.REGION
})
const s3Client = new S3Client({
region: process.env.REGION
// credentials: fromCognitoIdentityPool({
// clientConfig: { region: config.REGION },
// identityPoolId: config.IDENTITY_POOL_ID,
// logins: {
// [config.COGNITO_POOL_ID]: idToken,
// },
// }),
})
const aossClient = new Client({
...AwsSigv4Signer({
region: process.env.REGION,
service: 'aoss',
getCredentials: () => {
const credentialsProvider = defaultProvider({})
return credentialsProvider()
}
}),
node: process.env.AOSS_URL
})
const createEmbedVector = async ({ doc }: { doc: string }) => {
const response = await bedrockClient.send(
new InvokeModelCommand({
modelId: 'amazon.titan-embed-text-v1',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
inputText: doc
})
})
)
const vec = JSON.parse(decoder.decode(response.body))['embedding']
return vec
}
export const queryAoss = async ({ query }: { query: string }) => {
const vec = await createEmbedVector({ doc: query })
const body = {
size: 2,
query: {
knn: {
vector_field: {
vector: vec,
k: 2
}
}
}
}
var response = await aossClient.search({
index: 'demo',
body: body
})
// let result = "";
// for (let hit of response.body.hits.hits) {
// result += hit._source.text;
// }
// console.log(result);
// return result;
const docs = response.body.hits.hits
console.log(docs)
return docs
}
export const indexAoss = async ({
doc,
title
}: {
doc: string
title: string
}) => {
// no chunk split
const vec = await createEmbedVector({ doc: doc })
// serverless does not support id
const body = {
vector_field: vec,
text: doc,
title: title
}
var response = await aossClient.index({
index: 'demo',
body: body
})
console.log(response)
}
const qaRag = async ({ question }: { question: string }) => {
// query opensearch to get context documents
const docs = await queryAoss({ query: question })
// join the retrieved texts into a single context string
const context = docs.map((doc: any) => doc._source.text).join('\n')
// build prompt
const prompt = `\n\nHuman: Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. don't include harmful content \n\n ${context} \n\nHuman: ${question} \n\nAssistant:`
console.log(prompt)
// prompt bedrock anthropic
const response = await bedrockClient.send(
new InvokeModelCommand({
modelId: 'anthropic.claude-v2',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
prompt: prompt,
max_tokens_to_sample: 2048
})
})
)
console.log(response)
console.log(decoder.decode(response.body))
}
export const uploadFile = async (data: FormData) => {
console.log('file: ', data.get('upload'))
const file: File | null = data.get('upload') as unknown as File
if (!file) {
throw new Error('No file uploaded')
}
// file buffer
const bytes = await file.arrayBuffer()
const buffer = Buffer.from(bytes)
// write to local
const path = join('/tmp', file.name)
await writeFile(path, buffer)
console.log(`open ${path} to see the upload file`)
// write to s3
const command = new PutObjectCommand({
Bucket: process.env.BUCKET,
Key: `documents/${file.name}`,
Body: Buffer.from(bytes)
})
// upload file to s3
try {
const res = await s3Client.send(command)
console.log(res)
} catch (error) {
console.log('erorr upload to s3 ', error)
}
return {
status: 'OK',
message: 'message sent OK'
}
}
// createEmbedVector({ doc: "hello" });
// queryAoss({ query: "What is Amazon Bedrock" });
// indexAoss({
// doc: "Amazon Bedrock is a fully managed service that makes high-performing foundation models (FMs) from leading AI startups and Amazon available for your use through a unified API. You can choose from a wide range of foundation models to find the model that is best suited for your use case. Amazon Bedrock also offers a broad set of capabilities to build generative AI applications with security, privacy, and responsible AI. Using Amazon Bedrock, you can easily experiment with and evaluate top foundation models for your use cases, privately customize them with your data using techniques such as fine-tuning and Retrieval Augmented Generation (RAG), and build agents that execute tasks using your enterprise systems and data sources. With Amazon Bedrock's serverless experience, you can get started quickly, privately customize foundation models with your own data, and easily and securely integrate and deploy them into your applications using AWS tools without having to manage any infrastructure.",
// title: "What is Amazon Bedrock",
// });
// qaRag({ question: "what is developing on aws course?" });

And here is the frontend:

aoss.tsx
'use client'
import { useState } from 'react'
import { indexAoss, queryAoss } from './action'
const IndexPage = () => {
const [docs, setDocs] = useState<any[]>([])
return (
<div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
<div>
<form
className="mb-3"
onSubmit={event => {
event.preventDefault()
}}
>
<input
type="text"
id="title"
name="title"
placeholder="title"
className="w-full bg-gray-200 p-5 mb-3"
></input>
<textarea
className="bg-gray-200 w-full p-5"
rows={15}
placeholder="document ..."
id="document"
name="document"
></textarea>
<button
className="px-10 py-3 rounded-sm bg-orange-400"
onClick={async event => {
event.preventDefault()
const title = (
document.getElementById('title') as HTMLInputElement
).value
const doc = (
document.getElementById('document') as HTMLInputElement
).value
console.log(title, doc)
document.getElementById('modal')!.style.display = 'block'
await indexAoss({ doc: doc, title: title })
document.getElementById('modal')!.style.display = 'none'
}}
>
Index
</button>
</form>
<div>
<form
onSubmit={event => {
event.preventDefault()
}}
>
<input
className="w-full px-5 py-3 bg-gray-200"
placeholder="query ..."
type="text"
id="query"
name="query"
></input>
<button
className="px-10 py-3 rounded-sm bg-orange-400 mt-2"
onClick={async event => {
event.preventDefault()
const query = (
document.getElementById('query') as HTMLInputElement
).value
console.log(query)
const docs = await queryAoss({ query: query })
setDocs(docs)
console.log(docs)
// document.getElementById("result")!.innerText = docs;
}}
>
Query
</button>
</form>
</div>
<div>
{docs.map(doc => (
<div key={doc._id} className="mb-3">
<h3 className="font-bold">{doc._source.title}</h3>
<p>{doc._source.text}</p>
<hr className="bg-blue-500 border-1 border-blue-500"></hr>
</div>
))}
</div>
</div>
<div
id="modal"
className="fixed top-0 left-0 bg-slate-400 min-h-screen w-full opacity-90"
hidden
>
<div className="min-h-screen flex justify-center items-center">
<h1>Wait a few seconds!</h1>
</div>
</div>
</div>
)
}
export default IndexPage

And the RAG frontend page:

rag.tsx
'use client'
import { Message } from 'ai'
import { useChat } from 'ai/react'
export default function Chat() {
const { messages, input, handleInputChange, handleSubmit } = useChat({
api: './api/rag'
})
// Generate a map of message role to text color
const roleToColorMap: Record<Message['role'], string> = {
system: 'red',
user: 'black',
function: 'blue',
tool: 'purple',
assistant: 'green',
data: 'orange'
}
return (
<div className="flex flex-col w-full max-w-md py-24 mx-auto stretch">
{messages.length > 0
? messages.map(m => (
<div
key={m.id}
className="whitespace-pre-wrap"
style={{ color: roleToColorMap[m.role] }}
>
<strong>{`${m.role}: `}</strong>
{m.content || JSON.stringify(m.function_call)}
<br />
<br />
</div>
))
: null}
<form onSubmit={handleSubmit}>
<input
className="fixed bottom-0 w-full max-w-md p-2 mb-8 border border-gray-300 rounded shadow-xl"
value={input}
placeholder="Say something..."
onChange={handleInputChange}
/>
</form>
</div>
)
}

Lambda AOSS#

Let's create a Lambda function which is triggered when a PDF file is uploaded. This function will index AOSS. Here is the logic:

# haimtran 07 DEC 2022
# opensearch serverless
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter
import boto3
import json
import os
from datetime import datetime

# index name
INDEX = "demo"
# chunk size
CHUNK_SIZE = 1000
# bucket name
if "BUCKET" not in os.environ:
    os.environ["BUCKET"] = "BUCKET-NAME"
# opensearch domain
if "OPENSEARCH_DOMAIN" not in os.environ:
    os.environ["OPENSEARCH_DOMAIN"] = "domain.us-east-1.aoss.amazonaws.com"
os.environ["REGION"] = "us-east-1"
# bedrock client
bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")
# s3 client
s3_client = boto3.client("s3")
# host and opensearch client
host = os.environ["OPENSEARCH_DOMAIN"]
client = boto3.client("opensearchserverless")
region = os.environ["REGION"]
credentials = boto3.Session().get_credentials()
# sigv4 auth
awsauth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    region,
    "aoss",
    session_token=credentials.token,
)
# opensearch client
aoss_client = OpenSearch(
    hosts=[{"host": host, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    timeout=300,
)


def get_embedded_vector(query: str):
    """
    convert text to an embedding vector using bedrock
    """
    # request body
    body = json.dumps({"inputText": query})
    # call bedrock titan
    response = bedrock_client.invoke_model(
        body=body,
        modelId="amazon.titan-embed-text-v1",
        accept="application/json",
        contentType="application/json",
    )
    # get embedding vector
    vector = json.loads(response["body"].read())["embedding"]
    return vector


def load_pdf_to_vectors(key: str):
    """
    load a pdf from s3 and convert its chunks to embedding vectors
    """
    # filename
    filename = (
        key.split("/").pop().split(".")[0] + "-" + datetime.now().isoformat() + ".pdf"
    )
    # download the file from s3
    s3_client.download_file(os.environ["BUCKET"], key, f"/tmp/{filename}")
    # read pdf file
    loader = PyPDFLoader(f"/tmp/{filename}")
    pages = loader.load_and_split()
    # chunk with fixed size
    text_splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=0)
    docs = text_splitter.split_documents(documents=pages)
    # embedding
    vectors = [get_embedded_vector(doc.page_content) for doc in docs]
    return vectors, docs


def index_vectors_to_aoss(vectors, docs, filename):
    """
    bulk index data to opensearch
    """
    # build the bulk indexing payload
    data = ""
    for index, doc in enumerate(docs):
        data += json.dumps({"index": {"_index": INDEX}}) + "\n"
        data += (
            json.dumps(
                {
                    "vector_field": vectors[index],
                    "text": doc.page_content,
                    "title": f"{filename} chunk {index}",
                }
            )
            + "\n"
        )
    # bulk index
    aoss_client.bulk(data)
    return data


def handler(event, context):
    """
    index uploaded pdf files to opensearch
    """
    # get filename from event
    for record in event["Records"]:
        # get key object
        key = record["s3"]["object"]["key"]
        print(key)
        # filename
        filename = key.split("/").pop()
        # load pdf to vectors
        vectors, docs = load_pdf_to_vectors(key=key)
        print(vectors)
        # index vectors to aoss
        data = index_vectors_to_aoss(vectors=vectors, docs=docs, filename=filename)
        print(data)
    return {
        "statusCode": 200,
        "headers": {
            "Access-Control-Allow-Headers": "*",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "OPTIONS,POST,GET",
        },
        "body": json.dumps({}),
    }


if __name__ == "__main__":
    # vecs, docs = load_pdf_to_vectors("book/datasheet.pdf")
    # data = index_vectors_to_aoss(vecs, docs)
    # print(data)
    handler(
        event={"Records": [{"s3": {"object": {"key": "book/datasheet.pdf"}}}]},
        context=None,
    )

Let's deploy it using CDK and a container. Here are requirements.txt and the Dockerfile:

boto3==1.34.55
opensearch-py==2.4.2
langchain==0.1.10
pypdf==4.1.0
requests-aws4auth==1.2.3

Dockerfile

FROM public.ecr.aws/lambda/python:3.9
# create code dir inside container
RUN mkdir ${LAMBDA_TASK_ROOT}/source
# copy code to container
COPY "requirements.txt" ${LAMBDA_TASK_ROOT}/source
# copy handler function to container
COPY ./index.py ${LAMBDA_TASK_ROOT}
# install dependencies for running time environment
RUN pip3 install -r ./source/requirements.txt --target "${LAMBDA_TASK_ROOT}"
RUN python3 -m pip install boto3 --upgrade
# set the CMD to your handler
CMD [ "index.handler" ]

Finally, here is the CDK stack. Note that to grant the Lambda function permission to index and query AOSS, we need to:

  • Set up an IAM role for Lambda with permission to call aoss:APIAccessAll
  • Add that IAM role as a principal in the AOSS data access policy

lambda-stack.ts
interface LambdaAossProps extends StackProps {
opensearchDomain: string
aossCollectionArn: string
bucketName: string
}
export class LambdaAossStack extends Stack {
constructor(scope: Construct, id: string, props: LambdaAossProps) {
super(scope, id, props)
// role for lambda to read opensearch
const role = new aws_iam.Role(this, 'RoleForLambdaIndexAossBedrock', {
roleName: 'RoleForLambdaIndexAossBedrock',
assumedBy: new aws_iam.ServicePrincipal('lambda.amazonaws.com')
})
role.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [`arn:aws:s3:::${props.bucketName}/*`],
actions: ['s3:GetObject']
})
)
role.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [
`arn:aws:bedrock:${this.region}::foundation-model/anthropic.claude-v2`,
`arn:aws:bedrock:${this.region}::foundation-model/stability.stable-diffusion-xl-v1`,
`arn:aws:bedrock:${this.region}::foundation-model/amazon.titan-embed-text-v1`
],
actions: [
'bedrock:InvokeModel',
'bedrock:InvokeModelWithResponseStream'
]
})
)
role.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [props.aossCollectionArn],
actions: ['aoss:APIAccessAll']
})
)
// lambda function to query opensearch
new aws_lambda.Function(this, 'LamdaQueryOpenSearch', {
functionName: 'LambdaIndexAossBedrock',
memorySize: 2048,
timeout: Duration.seconds(300),
code: aws_lambda.EcrImageCode.fromAssetImage(
path.join(__dirname, './../lambda/lambda-index-aoss/')
),
handler: aws_lambda.Handler.FROM_IMAGE,
runtime: aws_lambda.Runtime.FROM_IMAGE,
environment: {
OPENSEARCH_DOMAIN: props.opensearchDomain,
PYTHONPATH: '/var/task/package',
REGION: this.region,
BUCKET: props.bucketName
},
role: role
})
}
}
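
The stack above creates the function but does not wire the S3 trigger that this section assumes. As a hedged sketch (assuming the aws_lambda.Function above is assigned to a variable fn, and that the bucket already exists), the subscription could look like:

import { aws_s3, aws_s3_notifications } from 'aws-cdk-lib'

// look up the existing bucket by name (assumed to live outside this stack)
const bucket = aws_s3.Bucket.fromBucketName(
  this,
  'DocumentBucket',
  props.bucketName
)

// invoke the lambda whenever a pdf lands under documents/
bucket.addEventNotification(
  aws_s3.EventType.OBJECT_CREATED,
  new aws_s3_notifications.LambdaDestination(fn),
  { prefix: 'documents/', suffix: '.pdf' }
)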

Here are both the App Runner and Lambda stacks:

apprunner-lambda.ts
import {
Duration,
Stack,
StackProps,
aws_apprunner,
aws_iam,
aws_lambda
} from 'aws-cdk-lib'
import { Effect } from 'aws-cdk-lib/aws-iam'
import { Construct } from 'constructs'
import * as path from 'path'
interface AppRunnerProps extends StackProps {
ecr: string
bucket: string
aossCollectionArn: string
}
export class AppRunnerStack extends Stack {
constructor(scope: Construct, id: string, props: AppRunnerProps) {
super(scope, id, props)
const buildRole = new aws_iam.Role(this, 'RoleForAppRunnerPullEcrBedrock', {
assumedBy: new aws_iam.ServicePrincipal('build.apprunner.amazonaws.com'),
roleName: 'RoleForAppRunnerPullEcrBedrock'
})
buildRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: ['*'],
actions: ['ecr:*']
})
)
const instanceRole = new aws_iam.Role(
this,
'InstanceRoleForApprunerBedrock',
{
assumedBy: new aws_iam.ServicePrincipal(
'tasks.apprunner.amazonaws.com'
),
roleName: 'InstanceRoleForApprunnerBedrock'
}
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [
'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2',
'arn:aws:bedrock:us-east-1::foundation-model/stability.stable-diffusion-xl-v1',
'arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v1'
],
actions: [
'bedrock:InvokeModel',
'bedrock:InvokeModelWithResponseStream'
]
})
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [`arn:aws:s3:::${props.bucket}/*`],
actions: ['s3:PutObject', 's3:GetObject']
})
)
instanceRole.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [props.aossCollectionArn],
actions: ['aoss:APIAccessAll']
})
)
const autoscaling = new aws_apprunner.CfnAutoScalingConfiguration(
this,
'AutoScalingForGoApp',
{
autoScalingConfigurationName: 'AutoScalingForGoApp',
// min number instance
minSize: 1,
// max number instance
maxSize: 10,
// max concurrent request per instance
maxConcurrency: 100
}
)
const apprunner = new aws_apprunner.CfnService(this, 'NextBedrockService', {
serviceName: 'NextBedrockService',
sourceConfiguration: {
authenticationConfiguration: {
accessRoleArn: buildRole.roleArn
},
autoDeploymentsEnabled: false,
imageRepository: {
imageIdentifier: props.ecr,
imageRepositoryType: 'ECR',
imageConfiguration: {
port: '3000',
runtimeEnvironmentVariables: [
{
name: 'BUCKET',
value: props.bucket
},
{
name: 'HOSTNAME',
value: '0.0.0.0'
},
{
name: 'PORT',
value: '3000'
}
]
// startCommand: "",
}
}
},
instanceConfiguration: {
cpu: '1 vCPU',
memory: '2 GB',
instanceRoleArn: instanceRole.roleArn
},
observabilityConfiguration: {
observabilityEnabled: false
},
autoScalingConfigurationArn: autoscaling.ref
})
apprunner.addDependency(autoscaling)
}
}
interface LambdaAossProps extends StackProps {
opensearchDomain: string
aossCollectionArn: string
bucketName: string
}
export class LambdaAossStack extends Stack {
constructor(scope: Construct, id: string, props: LambdaAossProps) {
super(scope, id, props)
// role for lambda to read opensearch
const role = new aws_iam.Role(this, 'RoleForLambdaIndexAossBedrock', {
roleName: 'RoleForLambdaIndexAossBedrock',
assumedBy: new aws_iam.ServicePrincipal('lambda.amazonaws.com')
})
role.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [`arn:aws:s3:::${props.bucketName}/*`],
actions: ['s3:GetObject']
})
)
role.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [
`arn:aws:bedrock:${this.region}::foundation-model/anthropic.claude-v2`,
`arn:aws:bedrock:${this.region}::foundation-model/stability.stable-diffusion-xl-v1`,
`arn:aws:bedrock:${this.region}::foundation-model/amazon.titan-embed-text-v1`
],
actions: [
'bedrock:InvokeModel',
'bedrock:InvokeModelWithResponseStream'
]
})
)
role.addToPolicy(
new aws_iam.PolicyStatement({
effect: Effect.ALLOW,
resources: [props.aossCollectionArn],
actions: ['aoss:APIAccessAll']
})
)
// lambda function to query opensearch
new aws_lambda.Function(this, 'LamdaQueryOpenSearch', {
functionName: 'LambdaIndexAossBedrock',
memorySize: 2048,
timeout: Duration.seconds(300),
code: aws_lambda.EcrImageCode.fromAssetImage(
path.join(__dirname, './../lambda/lambda-index-aoss/')
),
handler: aws_lambda.Handler.FROM_IMAGE,
runtime: aws_lambda.Runtime.FROM_IMAGE,
environment: {
OPENSEARCH_DOMAIN: props.opensearchDomain,
PYTHONPATH: '/var/task/package',
REGION: this.region,
BUCKET: props.bucketName
},
role: role
})
}
}

Claude 3 Sonnet#

Since the Vercel AI SDK does not support this model ID yet, let's stream it to the client ourselves. First, create /app/api/claude/route.ts

route.ts
import {
BedrockRuntime,
InvokeModelWithResponseStreamCommand
} from '@aws-sdk/client-bedrock-runtime'
import { NextRequest } from 'next/server'
const decoder = new TextDecoder()
const bedrock = new BedrockRuntime({ region: 'us-east-1' })
async function* makeIterator(prompt: string) {
const claudePrompt = `\n\nHuman: ${prompt} \n\nAssistant:`
const config = {
prompt: claudePrompt,
max_tokens_to_sample: 2048,
temperature: 0.5,
top_k: 250,
top_p: 1,
stop_sequences: ['\n\nHuman:']
}
// const command = new InvokeModelWithResponseStreamCommand({
// // body: JSON.stringify(config),
// body: body,
// modelId: "anthropic.claude-3-sonnet-20240229-v1:0",
// accept: "application/json",
// contentType: "application/json",
// });
const command = new InvokeModelWithResponseStreamCommand({
modelId: 'anthropic.claude-3-sonnet-20240229-v1:0',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
anthropic_version: 'bedrock-2023-05-31',
max_tokens: 1000,
messages: [
{
role: 'user',
content: [
{
type: 'text',
text: prompt
}
]
}
]
})
})
try {
console.log('call bedrock ...')
const response = await bedrock.send(command)
if (response.body) {
console.log(response.body)
for await (const chunk of response.body) {
if (chunk.chunk) {
try {
const json = JSON.parse(decoder.decode(chunk.chunk.bytes))
// only content_block_delta events carry text; skip message_start, etc.
if (json.type === 'content_block_delta') {
yield json.delta.text
}
} catch (error) {
console.log(error)
yield ' '
}
}
}
}
} catch (error) {
console.log(error)
}
}
function iteratorToStream(iterator: any) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
}
})
}
export async function GET() {
const iterator = makeIterator('how to cook chicken soup?')
const stream = iteratorToStream(iterator)
return new Response(stream)
}
export async function POST(request: NextRequest) {
const res = await request.json()
console.log(res)
const iterator = makeIterator(res.prompt)
const stream = iteratorToStream(iterator)
return new Response(stream)
}

Note that we parse the model's stream response on the Node server side (inside makeIterator above); this is less error prone than parsing the raw event chunks in the frontend client, which then only has to read plain text.

Next, create a simple frontend at /app/claude/page.tsx

page.tsx
'use client'
const ChatPage = () => {
const callBedrock = async () => {
const prompt = (document.getElementById('prompt') as HTMLInputElement).value
const story = document.getElementById('story-output')
story!.innerText = ''
// console.log("call bedrock ", prompt);
try {
const response = await fetch('/api/claude', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ prompt: prompt })
})
// console.log(response);
const reader = response.body!.getReader()
const decoder = new TextDecoder()
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
try {
const json = decoder.decode(value)
story!.innerText += json
} catch (error) {
console.log(error)
// story!.innerText += "ERROR";
}
}
} catch (error) {
console.log(error)
}
}
return (
<div className="min-h-screen">
<div className="max-w-3xl mx-auto px-10 py-10">
<div>
<form className="relative mb-3">
<input
type="text"
className="border-2 border-blue-500 border- w-full p-3"
id="prompt"
></input>
<button
type="submit"
className="bg-orange-500 px-10 py-2.5 rounded-md cursor-pointer absolute top-0 right-0 translate-y-1 mr-2"
onClick={event => {
event.preventDefault()
callBedrock()
}}
>
Submit
</button>
</form>
</div>
<p id="story-output"></p>
</div>
</div>
)
}
export default ChatPage

Claude3 Haiku#

Similar to Sonnet, let's write a simple Next.js API route (server function) to invoke Bedrock and stream the response to the client chunk by chunk

import {
BedrockRuntime,
InvokeModelWithResponseStreamCommand
} from '@aws-sdk/client-bedrock-runtime'
import { NextRequest } from 'next/server'
const decoder = new TextDecoder()
const bedrock = new BedrockRuntime({ region: 'us-east-1' })
async function* makeIterator(prompt: string) {
const command = new InvokeModelWithResponseStreamCommand({
modelId: 'anthropic.claude-3-haiku-20240307-v1:0',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
anthropic_version: 'bedrock-2023-05-31',
max_tokens: 2048,
messages: [
{
role: 'user',
content: [
{
type: 'text',
text: prompt
}
]
}
]
})
})
try {
console.log('call bedrock ...')
const response = await bedrock.send(command)
if (response.body) {
console.log(response.body)
for await (const chunk of response.body) {
if (chunk.chunk) {
try {
const json = JSON.parse(decoder.decode(chunk.chunk.bytes))
console.log(json)
if (json.type == 'content_block_delta') {
yield json.delta.text
// yield chunk.chunk.bytes;
}
} catch (error) {
console.log(error)
yield ' '
}
}
}
}
} catch (error) {
console.log(error)
}
}
function iteratorToStream(iterator: any) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
}
})
}
export async function GET() {
const iterator = makeIterator('how to cook chicken soup?')
const stream = iteratorToStream(iterator)
return new Response(stream)
}
export async function POST(request: NextRequest) {
const res = await request.json()
console.log(res)
const iterator = makeIterator(res.prompt)
const stream = iteratorToStream(iterator)
return new Response(stream)
}
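
You can sanity check the streaming route from a terminal before wiring up a frontend. Below is a minimal Node 18+ script (a sketch; the /api/claudehaiku route path is an assumption, adjust it to wherever you mounted the handler). Run it with npx ts-node test-stream.ts while the dev server is running.

test-stream.ts
// POST a prompt and print the streamed chunks as they arrive
const main = async () => {
  const response = await fetch('http://localhost:3000/api/claudehaiku', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt: 'how to cook chicken soup?' })
  })
  const reader = response.body!.getReader()
  const decoder = new TextDecoder()
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // each chunk is a piece of the model answer
    process.stdout.write(decoder.decode(value))
  }
}
main()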

Analyse Image#

  • Call Claude3 Sonnet and Haiku to analyse an image
  • Capture image from camera

Invoke Bedrock Claude3 in batch mode

import { NextRequest } from 'next/server'
import {
BedrockRuntimeClient,
InvokeModelCommand,
InvokeModelWithResponseStreamCommand
} from '@aws-sdk/client-bedrock-runtime'
const decoder = new TextDecoder()
const bedrockClient = new BedrockRuntimeClient({
region: 'us-east-1'
})
const describeImageBedrockBatch = async ({ image }: { image: string }) => {
// command to invoke Claude 3 Haiku
const commandHaiku = new InvokeModelCommand({
modelId: 'anthropic.claude-3-haiku-20240307-v1:0',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
anthropic_version: 'bedrock-2023-05-31',
max_tokens: 1000,
messages: [
{
role: 'user',
content: [
{
type: 'image',
source: {
type: 'base64',
media_type: 'image/jpeg',
data: image
}
},
{
type: 'text',
text: 'this is an image about fashion, please describe it in detail, and give positive and fun comments'
}
]
}
]
})
})
const response = await bedrockClient.send(commandHaiku)
// console.log(JSON.parse(new TextDecoder().decode(response.body)));
return JSON.parse(new TextDecoder().decode(response.body))
}
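
If batch mode is enough, the route's POST handler can simply await this helper and return JSON. Here is a sketch of that alternative handler, mirroring the commented-out batch path in the streaming route shown below (which is what this post actually uses):

// alternative batch-mode handler: wait for the full description, no streaming
export async function POST(request: NextRequest) {
  const res = await request.json()
  // strip the "data:image/jpeg;base64," prefix from the data URL
  const image = res.image.split(',').pop()
  const description = await describeImageBedrockBatch({ image })
  return Response.json(description)
}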

Invoke Bedrock Claude3 in stream mode

async function* makeIterator(image: string) {
const command = new InvokeModelWithResponseStreamCommand({
modelId: 'anthropic.claude-3-haiku-20240307-v1:0',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
anthropic_version: 'bedrock-2023-05-31',
max_tokens: 2048,
messages: [
{
role: 'user',
content: [
{
type: 'image',
source: {
type: 'base64',
media_type: 'image/jpeg',
data: image
}
},
{
type: 'text',
text: 'this is an image about fashion, please describe it in detail, and give positive and fun comments'
}
]
}
]
})
})
try {
console.log('call bedrock ...')
const response = await bedrockClient.send(command)
if (response.body) {
console.log(response.body)
for await (const chunk of response.body) {
if (chunk.chunk) {
try {
const json = JSON.parse(decoder.decode(chunk.chunk.bytes))
console.log(json)
if (json.type == 'content_block_delta') {
yield json.delta.text
}
} catch (error) {
console.log(error)
yield ' '
}
}
}
}
} catch (error) {
console.log(error)
}
}
function iteratorToStream(iterator: any) {
return new ReadableStream({
async pull(controller) {
const { value, done } = await iterator.next()
if (done) {
controller.close()
} else {
controller.enqueue(value)
}
}
})
}
export async function GET() {
return new Response('hello')
}
export async function POST(request: NextRequest) {
// parse image from request
const res = await request.json()
const image = res.image.split(',').pop()
// call bedrock batch to describe the image
// const description = await describeImageBedrockBatch({ image: image });
// return Response.json(description);
// invoke bedrock stream
const iterator = makeIterator(image)
const stream = iteratorToStream(iterator)
return new Response(stream)
}

Here is the frontend code. It captures a frame from the camera, posts the JPEG data URL to the API route, and streams the description back. Note that getUserMedia only works in a secure context (HTTPS or localhost).

analyze-image.tsx
'use client'
import Script from 'next/script'
const decoder = new TextDecoder()
const MirrorPage = () => {
return (
<div>
<div className="max-w-3xl mx-auto">
<div className="flex flex-row space-x-2 mt-5">
<a href="/mirror">
<button className="bg-orange-500 px-5 py-3 rounded-sm cursor-pointer">
Claude Haiku
</button>
</a>
<a href="/mirror-claude-sonet">
<button className="bg-orange-500 px-5 py-3 rounded-sm cursor-pointer">
Claude Sonnet
</button>
</a>
<a href="/mirror-llava">
<button className="bg-orange-500 px-5 py-3 rounded-sm cursor-pointer">
LLaVA
</button>
</a>
</div>
</div>
<div className="max-w-3xl mx-auto">
<div className="min-h-screen flex flex-col justify-center items-center">
<video id="video" width={800} height={600}></video>
<canvas
id="canvas"
width={800}
height={600}
style={{ display: 'none' }}
></canvas>
<button
className="bg-green-500 px-10 py-3 rounded-sm cursor-pointer mt-5"
onClick={async event => {
event.preventDefault()
document.getElementById('modal')!.classList.toggle('hidden')
const canvas = document.getElementById('canvas')
const video = document.getElementById('video')
;(canvas as any)
.getContext('2d')
.drawImage(
video,
0,
0,
(canvas as any).width,
(canvas as any).height
)
const dataUrl = (canvas as any).toDataURL('image/jpeg')
document
.getElementById('modal-photo')!
.setAttribute('src', dataUrl)
document.getElementById('image-description')!.innerHTML =
'I am thinking ... '
const response = await fetch('/api/image-haiku', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ image: dataUrl })
})
// parse batch response
// const desc = await response.text();
// document.getElementById("image-description")!.innerHTML =
// JSON.parse(desc).content[0].text;
// parse stream response
const story = document.getElementById('image-description')
story!.innerText = ''
const reader = response.body!.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
try {
const json = decoder.decode(value)
story!.innerText += json
console.log(json)
} catch (error) {
console.log(error)
// story!.innerText += "ERROR";
}
}
}}
>
Submit
</button>
</div>
<div
className="bg-gray-100 min-h-screen w-[100%] fixed top-0 left-0 opacity-100 hidden"
id="modal"
>
<div className="flex flex-col min-h-screen justify-center items-center">
<div className="relative">
<img width={800} height={600} id="modal-photo"></img>
<p
id="image-description"
className="absolute bottom-0 left-0 bg-gray-100 p-5 opacity-85"
>
Lorem ipsum dolor sit, amet consectetur adipisicing elit. Sequi,
enim veniam! Blanditiis magni quasi id quia harum voluptatibus,
tempora voluptates illum vel cum. Consequuntur quam ipsa, unde
eos inventore nemo. Lorem ipsum dolor sit amet, consectetur
adipisicing elit. Dolorum, expedita nam eos voluptatibus
consequatur modi quis aut ducimus veritatis omnis laborum
accusantium? Illum inventore soluta suscipit ipsam velit
consectetur incidunt?
</p>
</div>
<button
className="bg-orange-500 px-10 py-3 rounded-sm mt-5"
onClick={event => {
event.preventDefault()
document.getElementById('modal')!.classList.toggle('hidden')
}}
>
Close
</button>
</div>
</div>
</div>
<Script>
{`
const video = document.getElementById("video");
const getCamera = async () => {
let stream = null;
try {
stream = await navigator.mediaDevices.getUserMedia({
video: { width: 1280, height: 720 },
});
video.srcObject = stream;
video.onloadedmetadata = () => {
video.play();
};
} catch (error) {
console.log(error);
}
};
const main = async () => {
await getCamera();
};
main();
`}
</Script>
</div>
)
}
export default MirrorPage
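
As an aside, instead of injecting a vanilla-JS <Script>, the camera can also be started from a React effect when the component mounts. Here is a minimal sketch of a hypothetical useCamera hook (not part of the code above); calling useCamera('video') inside MirrorPage would replace the <Script> block.

'use client'
import { useEffect } from 'react'

// start the camera on mount and stop it on unmount
const useCamera = (videoId: string) => {
  useEffect(() => {
    const video = document.getElementById(videoId) as HTMLVideoElement | null
    if (!video) return
    let stream: MediaStream | null = null
    navigator.mediaDevices
      .getUserMedia({ video: { width: 1280, height: 720 } })
      .then(s => {
        stream = s
        video.srcObject = s
        video.onloadedmetadata = () => video.play()
      })
      .catch(console.log)
    // release the camera when the component unmounts
    return () => stream?.getTracks().forEach(track => track.stop())
  }, [videoId])
}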

Process PDF#

  • Use Amazon Textract and handle async jobs
  • Use langchain PyPDFLoader and CharacterTextSplitter
  • Load documents (chunks) to AOSS

First, let's use Textract to process a PDF uploaded to S3

import boto3
import time

# data in s3
BUCKET = ''
FILE = 'book/datasheet.pdf'

# create textract client
client = boto3.client("textract")

# start an async text detection job
response = client.start_document_text_detection(
    DocumentLocation={
        'S3Object': {
            'Bucket': BUCKET,
            'Name': FILE
        }
    }
)
job_id = response['JobId']

# poll until the job finishes
while True:
    response = client.get_document_text_detection(JobId=job_id)
    if response['JobStatus'] in ('SUCCEEDED', 'FAILED'):
        break
    time.sleep(1)

# collect all result pages, following NextToken
pages = [response]
next_token = response.get('NextToken')
while next_token:
    time.sleep(1)
    response = client.get_document_text_detection(
        JobId=job_id,
        NextToken=next_token
    )
    pages.append(response)
    next_token = response.get('NextToken')

# assemble the detected text from LINE blocks (assumes the job succeeded)
lines = []
for page in pages:
    for block in page['Blocks']:
        if block['BlockType'] == 'LINE':
            lines.append(block['Text'])

Second, let's load and chunk a PDF with langchain's PyPDFLoader and CharacterTextSplitter, which the Lambda function below uses to process a PDF uploaded to S3 and index it to AOSS

from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter

loader = PyPDFLoader("datasheet.pdf")
pages = loader.load_and_split()
# number of pages
len(pages)
# print page contents
for page in pages:
    print(page.page_content)
# define chunk length and overlap window
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
# split pages into docs, each chunk roughly 1000 characters long
docs = text_splitter.split_documents(documents=pages)
# check the content of one doc (chunk)
docs[0].page_content

Here is the Python Lambda function that uses PyPDFLoader and langchain to load the PDF and embed its chunks for AOSS

load-pdf-index-aoss.py
import boto3
import json
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter

BUCKET = 'cdk-entest-videos'
FILE = 'book/datasheet.pdf'
# chunk size
CHUNK_SIZE = 1000
# bedrock client
bedrock_client = boto3.client("bedrock-runtime")
# s3 client
s3_client = boto3.client("s3")

def load_pdf_to_aoss(filename: str):
    """
    load a pdf from s3 and embed its chunks for opensearch
    """
    # download the s3 file
    s3_client.download_file(BUCKET, FILE, f'/tmp/{filename}')
    # read the pdf file
    loader = PyPDFLoader(f'/tmp/{filename}')
    pages = loader.load_and_split()
    # chunk with fixed size
    text_splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=0)
    docs = text_splitter.split_documents(documents=pages)
    # embed each chunk with Titan
    vectors = []
    for doc in docs:
        body = json.dumps({"inputText": doc.page_content})
        response = bedrock_client.invoke_model(
            body=body,
            modelId='amazon.titan-embed-text-v1',
            accept='application/json',
            contentType='application/json')
        vector = json.loads(response['body'].read())['embedding']
        vectors.append(vector)
    # return the embedding vectors; indexing into the AOSS collection is done separately
    return vectors

Finally, let's write a function that chunks a document (plain text) and embeds each chunk for AOSS

import boto3
import json
from langchain.text_splitter import CharacterTextSplitter

# chunk size
CHUNK_SIZE = 1000
# bedrock client
bedrock_client = boto3.client("bedrock-runtime")

def load_document_to_aoss(document: str):
    """
    chunk a document and embed each chunk for opensearch
    """
    # chunk with fixed size
    text_splitter = CharacterTextSplitter(
        separator="\n\n",
        chunk_size=CHUNK_SIZE,
        chunk_overlap=0,
        length_function=len,
        is_separator_regex=False)
    docs = text_splitter.create_documents([document])
    # embed each chunk with Titan
    vectors = []
    for doc in docs:
        body = json.dumps({"inputText": doc.page_content})
        response = bedrock_client.invoke_model(
            body=body,
            modelId='amazon.titan-embed-text-v1',
            accept='application/json',
            contentType='application/json')
        vector = json.loads(response['body'].read())['embedding']
        vectors.append(vector)
    # return the embedding vectors; indexing into the AOSS collection is done separately
    return vectors

Basic Prompt#

Let's support multiple conversation turns, i.e., a chat history shared between client and server. First we have to update the client to store the chat message history.

Here is an example

const commandHaiku = new InvokeModelWithResponseStreamCommand({
modelId: 'anthropic.claude-3-haiku-20240307-v1:0',
contentType: 'application/json',
accept: 'application/json',
body: JSON.stringify({
anthropic_version: 'bedrock-2023-05-31',
max_tokens: 1024,
system: `Please respond in Vietnamese only!`,
messages: [
{
role: 'user',
content: [
{
type: 'text',
text: 'How to cook chicken soup?'
}
]
},
{
role: 'assistant',
content: [
{
type: 'text',
text: `Here is a basic recipe for cooking chicken soup: ...`
}
]
},
{
role: 'user',
content: [
{
type: 'text',
text: prompt
}
]
}
]
})
})
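
Claude 3 expects the conversation to start with a user turn and to alternate user/assistant roles, so when keeping only a sliding window of history it helps to trim it accordingly. Here is a small illustrative helper (trimHistory is a hypothetical name, not part of the code below):

type ChatMessage = { role: 'user' | 'assistant'; content: string }

// keep at most maxMessages recent messages and drop a leading
// assistant message so the history still starts with a user turn
const trimHistory = (messages: ChatMessage[], maxMessages = 9): ChatMessage[] => {
  let recent = messages.slice(-maxMessages)
  while (recent.length > 0 && recent[0].role === 'assistant') {
    recent = recent.slice(1)
  }
  return recent
}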

Here is the full code for testing

bedrock-prompt.ts
import {
BedrockRuntimeClient,
InvokeModelWithResponseStreamCommand,
} from "@aws-sdk/client-bedrock-runtime";
const decoder = new TextDecoder();
const bedrockClient = new BedrockRuntimeClient({ region: "us-east-1" });
const chatBedrock = async ({ prompt }: { prompt: string }) => {
let answer = "";
// two equivalent history formats: messages uses content blocks,
// while messages1 (used below) uses plain string content
const messages = [
{
role: "user",
content: [
{
type: "text",
text: "How to cook chicken soup?",
},
],
},
{
role: "assistant",
content: [
{
type: "text",
text: `Here is a basic recipe for cooking chicken soup:
Ingredients:
- 1 whole chicken or 3-4 lbs of chicken parts (breasts, thighs, drumsticks)
- 8 cups of water or chicken broth
- 2 large carrots, peeled and sliced
- 2 celery stalks, sliced
- 1 onion, diced
- 2 cloves garlic, minced
- 1 bay leaf
- 1 tsp dried thyme
- Salt and pepper to taste
- Egg noodles or other pasta (optional)
Instructions:
1. Place the whole chicken or chicken parts in a large pot and cover with the water or broth. Bring to a boil over high heat.
2. Once boiling, reduce heat to medium-low, cover and simmer for 45-60 minutes, until the chicken is cooked through.
3. Remove the chicken from the pot and set aside. Skim any foam or fat that rises to the surface.
4. Add the carrots, celery, onion, garlic, bay leaf and thyme to the pot. Simmer for 15-20 minutes until the vegetables are tender.
5. Meanwhile, remove the meat from the chicken bones and shred or chop it into bite-size pieces.
6. Add the cooked chicken back to the pot and season with salt and pepper to taste.
7. If using, add the egg noodles or other pasta and cook for the time specified on the package.
8. Serve the chicken soup hot. You can garnish with fresh parsley or dill if desired.
The longer you simmer the soup, the more flavorful it will become. You can also add other vegetables like potatoes, peas, or corn. Adjust seasoning as needed.`,
},
],
},
{
role: "user",
content: [
{
type: "text",
text: prompt,
},
],
},
];
const messages1 = [
{
role: "user",
content: "How to cook chicken soup?",
},
{
role: "assistant",
content: `Here is a basic recipe for cooking chicken soup:
Ingredients:
- 1 whole chicken or 3-4 lbs of chicken parts (breasts, thighs, drumsticks)
- 8 cups of water or chicken broth
- 2 large carrots, peeled and sliced
- 2 celery stalks, sliced
- 1 onion, diced
- 2 cloves garlic, minced
- 1 bay leaf
- 1 tsp dried thyme
- Salt and pepper to taste
- Egg noodles or other pasta (optional)
Instructions:
1. Place the whole chicken or chicken parts in a large pot and cover with the water or broth. Bring to a boil over high heat.
2. Once boiling, reduce heat to medium-low, cover and simmer for 45-60 minutes, until the chicken is cooked through.
3. Remove the chicken from the pot and set aside. Skim any foam or fat that rises to the surface.
4. Add the carrots, celery, onion, garlic, bay leaf and thyme to the pot. Simmer for 15-20 minutes until the vegetables are tender.
5. Meanwhile, remove the meat from the chicken bones and shred or chop it into bite-size pieces.
6. Add the cooked chicken back to the pot and season with salt and pepper to taste.
7. If using, add the egg noodles or other pasta and cook for the time specified on the package.
8. Serve the chicken soup hot. You can garnish with fresh parsley or dill if desired.
The longer you simmer the soup, the more flavorful it will become. You can also add other vegetables like potatoes, peas, or corn. Adjust seasoning as needed.`,
},
{
role: "user",
content: prompt,
},
];
const commandHaiku = new InvokeModelWithResponseStreamCommand({
modelId: "anthropic.claude-3-haiku-20240307-v1:0",
contentType: "application/json",
accept: "application/json",
body: JSON.stringify({
anthropic_version: "bedrock-2023-05-31",
max_tokens: 1024,
system: `Please respond in Vietnamese only!`,
messages: messages1,
}),
});
const response = await bedrockClient.send(commandHaiku);
console.log(response.body);
if (response.body) {
for await (const chunk of response.body) {
if (chunk.chunk) {
try {
const data = JSON.parse(decoder.decode(chunk.chunk.bytes));
if (data.type == "content_block_start") {
console.log(data.content_block.text);
answer += data.content_block.text;
} else if (data.type == "content_block_delta") {
console.log(data.delta.text);
answer += data.delta.text;
}
} catch (error) {
console.log(error);
}
// console.log(JSON.parse(decoder.decode(chunk.chunk.bytes)));
}
}
}
console.log(answer);
};
const main = async () => {
await chatBedrock({
prompt: "How to customize it for 3 years old girl?",
});
};
main();

And here is the frontend code

haiku.tsx
"use client";
import { SendIcon } from "@/components/icons";
import { Layout } from "@/components/layout";
const ChatPage = () => {
// conversation turns
let messages: any[] = [];
const callBedrock = async () => {
// get user question from the frontend
const userQuestion = (
document.getElementById("user-question") as HTMLInputElement
).value;
// present model response to frontend
const modelAnswer = document.getElementById("model-answer");
modelAnswer!.innerText = "";
// push user userQuestion to messages
messages.push({ role: "user", content: userQuestion });
// keep the most recent 9 messages [user,bot,user,...]
messages = messages.slice(-9);
try {
const response = await fetch("/api/claudehaiku", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ messages: messages }),
});
// console.log(response);
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
try {
const json = decoder.decode(value);
modelAnswer!.innerText += json;
console.log(json);
} catch (error) {
console.log(error);
// modelAnswer!.innerText += "ERROR";
}
}
// push bot response to messages
messages.push({ role: "assistant", content: modelAnswer!.innerText });
} catch (error) {
console.log(error);
}
};
const children = (
<div>
<form>
<div className="fixed top-0 mt-8 w-full max-w-md px-5">
<input
type="text"
className="w-full p-2 border border-gray-300 rounded shadow-xl"
id="user-question"
placeholder="say something enter ..."
></input>
<button
className="px-5 py-1 rounded-sm absolute top-[50%] right-2 translate-y-[-50%] opacity-80"
onClick={(event) => {
event.preventDefault();
callBedrock();
}}
>
<SendIcon className="w-5 h-5 text-green-500" />
</button>
</div>
</form>
<p
id="model-answer"
className="px-5"
style={{ color: "green", marginBottom: "10px" }}
></p>
</div>
);
return <Layout>{children}</Layout>;
};
export default ChatPage;

The server code barely changes: parse messages from the request body and send it to Bedrock, as sketched below.
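
Here is a minimal sketch of that route, assuming it lives at /app/api/claudehaiku/route.ts and reusing the makeIterator / iteratorToStream pattern from the Haiku section; the client's messages array (plain string content) is forwarded to Bedrock as-is.

route.ts
import { NextRequest } from 'next/server'
import {
  BedrockRuntime,
  InvokeModelWithResponseStreamCommand
} from '@aws-sdk/client-bedrock-runtime'

const decoder = new TextDecoder()
const bedrock = new BedrockRuntime({ region: 'us-east-1' })

// messages is the [user, assistant, user, ...] history sent by the client
async function* makeIterator(messages: any[]) {
  const command = new InvokeModelWithResponseStreamCommand({
    modelId: 'anthropic.claude-3-haiku-20240307-v1:0',
    contentType: 'application/json',
    accept: 'application/json',
    body: JSON.stringify({
      anthropic_version: 'bedrock-2023-05-31',
      max_tokens: 2048,
      messages: messages
    })
  })
  const response = await bedrock.send(command)
  if (response.body) {
    for await (const chunk of response.body) {
      if (chunk.chunk) {
        const json = JSON.parse(decoder.decode(chunk.chunk.bytes))
        // only content_block_delta events carry text
        if (json.type === 'content_block_delta') {
          yield json.delta.text
        }
      }
    }
  }
}

function iteratorToStream(iterator: any) {
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next()
      if (done) {
        controller.close()
      } else {
        controller.enqueue(value)
      }
    }
  })
}

export async function POST(request: NextRequest) {
  // the only change versus the single-prompt routes: forward the whole history
  const { messages } = await request.json()
  return new Response(iteratorToStream(makeIterator(messages)))
}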

Architecture#

[Architecture diagram: next-bedrock-arch]

Reference#