Introduction#

web-css-color

GitHub this note shows how to stream response from Bedrock to web application through Appsync

  • Setup appsync and cognito auth using CDK
  • Create Lambda resolver and Bedrock
  • Simple web subscription using Amplify graphql client

NodejsFunction#

Let create a lambda function

import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3'
const s3Client = new S3Client({
region: 'ap-southeast-1'
})
export const handler = async (event: any) => {
const response = await s3Client.send(
new PutObjectCommand({
Bucket: 'bedrock-demo-03122023',
Key: 'hehe.txt',
Body: 'hello'
})
)
console.log(response)
return {
body: JSON.stringify({ message: 'hello' }),
statusCode: 200
}
}

The package.json and package-lock.json

{
"name": "bedrocklambda",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"types": "./index.d.ts",
"dependencies": {
"@aws-sdk/client-bedrock-runtime": "^3.428.0",
"@aws-sdk/client-s3": "^3.465.0",
"@types/aws-lambda": "^8.10.124",
"aws-lambda": "^1.0.7",
"package.json": "^2.0.1"
}
}

Use CDK NodejsFunction to bundle the lambda function written in TypeScript

import { Duration, Stack, StackProps, aws_lambda_nodejs } from 'aws-cdk-lib'
import { Construct } from 'constructs'
import * as aws_lambda from 'aws-cdk-lib/aws-lambda'
import * as path from 'path'
export class BackendStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props)
const bedrockLambda = new aws_lambda_nodejs.NodejsFunction(
this,
'BedrockLambda',
{
functionName: 'BedrockLambda',
entry: path.join(__dirname, './../lambda/index.ts'),
handler: 'handler',
runtime: aws_lambda.Runtime.NODEJS_18_X,
timeout: Duration.seconds(29),
bundling: {
nodeModules: [
'@aws-sdk/client-bedrock-runtime',
'@types/aws-lambda',
'@aws-sdk/client-s3'
]
},
depsLockFilePath: path.join(__dirname, './../lambda/package-lock.json')
}
)
}
}

Cognito#

Let create cognito userpool and identity pool for authentication and authorization

const userPool = new aws_cognito.UserPool(this, 'BedrockUserPool', {
userPoolName: 'bedrock',
selfSignUpEnabled: true,
autoVerify: {
email: true
},
accountRecovery: aws_cognito.AccountRecovery.EMAIL_ONLY,
userVerification: {
emailStyle: aws_cognito.VerificationEmailStyle.CODE
},
standardAttributes: {
email: {
required: true,
mutable: true
}
}
})
const userPoolClient = new aws_cognito.UserPoolClient(
this,
'BedrockUserPoolClient',
{
userPool
}
)
const identityPool = new IdentityPool(this, 'BedrockIdentityPool', {
identityPoolName: 'BedrockIdentityPool',
allowUnauthenticatedIdentities: true,
authenticationProviders: {
userPools: [
new UserPoolAuthenticationProvider({
userPool: userPool,
userPoolClient: userPoolClient
})
]
}
})

Appsync Amplify Construct#

Use @aws-amplify/graphql-api-construct to create data models and resolvers in Appsync. This high level construct automatly create models and resolvers from the provided schema.graphql.

This will create a nested stack

new AmplifyGraphqlApi(this, 'BedrockApi', {
apiName: 'BedrockApi',
definition: AmplifyGraphqlDefinition.fromFiles(
path.join(__dirname, 'schema.graphql')
),
authorizationModes: {
defaultAuthorizationMode: 'API_KEY',
apiKeyConfig: {
expires: Duration.days(30)
},
userPoolConfig: {
userPool
},
iamConfig: {
identityPoolId: identityPool.identityPoolId,
unauthenticatedUserRole: identityPool.unauthenticatedRole,
authenticatedUserRole: identityPool.authenticatedRole
}
}
})

Here is schema.graphql

type Todo @model @auth(rules: [{ allow: public }]) {
id: ID!
content: String
}
type Query {
summaryNote(prompt: String): String
}

Lambda Resolver#

  • Leverage schema.graphql and AmplifyGraphqlApi
  • Use addLambdaDataSource and addResolver method

Firs way is to leverage schema.graphql

type Note
@model
@auth(rules: [{ allow: owner }, { allow: public, operations: [read] }]) {
name: String!
completed: Boolean!
owner: String @auth(rules: [{ allow: owner, operations: [read, delete] }])
}
type Query {
noteSummary(msg: String): String
@function(name: "noteSummary")
@auth(rules: [{ allow: private }])
}

and update the AmplifyGraphqlApi with functionNameMap as below

new AmplifyGraphqlApi(this, 'BedrockApi', {
apiName: 'BedrockApi',
definition: AmplifyGraphqlDefinition.fromFiles(
path.join(__dirname, 'schema.graphql')
),
authorizationModes: {
defaultAuthorizationMode: 'API_KEY',
apiKeyConfig: {
expires: Duration.days(30)
},
userPoolConfig: {
userPool
},
iamConfig: {
identityPoolId: identityPool.identityPoolId,
unauthenticatedUserRole: identityPool.unauthenticatedRole,
authenticatedUserRole: identityPool.authenticatedRole
}
},
// HERE map to the lambda resolver
functionNameMap: { noteSummary: bedrockLambda }
})

Second method, it is possible to use normal Appsync construct to build data models and resolve

const appsync = new AmplifyGraphqlApi(this, 'AmplifyCdkGraphQlApi', {
definition: AmplifyGraphqlDefinition.fromFiles(
path.join(__dirname, 'schema.graphql')
),
authorizationModes: {
defaultAuthorizationMode: 'API_KEY',
apiKeyConfig: {
expires: Duration.days(30)
},
userPoolConfig: {
userPool: userPool
},
iamConfig: {
identityPoolId: identityPoolId,
unauthenticatedUserRole: unauthenticatedUserRole,
authenticatedUserRole: authenticatedUserRole
}
}
})
const lambdaDataSource = appsync.addLambdaDataSource(
'TestLambdaDataSource',
bedrockLambda
)
appsync.addResolver('TestLambdaResolver', {
dataSource: lambdaDataSource,
typeName: 'Query',
fieldName: 'generateTacoRecipe',
code: Code.fromAsset(
path.join(__dirname, './../resolver/generateTacoRecipe.js')
),
runtime: FunctionRuntime.JS_1_0_0
})

Here is content of resolver which invoke lambda

import { util } from '@aws-appsync/utils'
export function request(ctx) {
const { source, args } = ctx
return {
operation: 'Invoke',
payload: { field: ctx.info.fieldName, arguments: args, source }
}
}
export function response(ctx) {
return ctx.result
}

Please take note the schema.graphql for this method

type Todo @model @auth(rules: [{ allow: public }]) {
id: ID!
content: String
}
type Query {
summaryNote(prompt: String): String
}

HTTP Resolver#

const bedrockDataSource = appsync.addHttpDataSource(
'BedrockDataSource',
'https://bedrock-runtime.us-east-1.amazonaws.com',
{
authorizationConfig: {
signingRegion: 'us-east-1',
signingServiceName: 'bedrock'
}
}
)
bedrockDataSource.grantPrincipal.addToPrincipalPolicy(
new PolicyStatement({
resources: [
'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2'
],
actions: ['bedrock:InvokeModel']
})
)
const generateTacoRecipeResolver = appsync.addResolver('BedrockResolver', {
dataSource: bedrockDataSource,
typeName: 'Query',
fieldName: 'bedrock',
code: Code.fromAsset(path.join(__dirname, './../resolver/bedrock.js')),
runtime: FunctionRuntime.JS_1_0_0
})

Subscription#

  • Handle duplicate subscriber
  • Encode and decode base64
  • Call the Lambda resolver

First submit button to send a query to Lambda resolver which send a prompt to Bedrod

const onsubmit = async () => {
setMessages([])
let prompt = (document.getElementById('prompt') as HTMLInputElement).value
console.log(prompt)
client.graphql({
query: summaryNote,
variables: {
prompt: prompt
}
})
}

Second subscribe and decode

const subscribe = async () => {
const sub = client.graphql({ query: onCreateTodo }).subscribe({
next: ({ data }) => {
console.log(data)
const todo = data.onCreateTodo
const message = JSON.parse(
Buffer.from(todo.content!, 'base64').toString()
)
setMessages(old => [...old, message.completion + '\n'])
},
error: error => console.log(error)
})
console.log('subscribe ...', sub)
return () => sub.unsubscribe()
}

The Lambda function which process the stream from Bedrock and update table in graphql

export const handler = async (event: AppSyncResolverEvent<AppSyncEvent>) => {
// parse prompt from event
let prompt = ''
try {
prompt = event.arguments.prompt
} catch (error) {
prompt = 'how to cook a chicken soup?'
}
const claudePrompt = `\n\nHuman: ${prompt} \n\nAssistant:`
const config = {
prompt: claudePrompt,
max_tokens_to_sample: 2048,
temperature: 0.5,
top_k: 250,
top_p: 1,
stop_sequences: ['\n\nHuman:']
}
const command = new InvokeModelWithResponseStreamCommand({
body: JSON.stringify(config),
modelId: 'anthropic.claude-v2',
accept: 'application/json',
contentType: 'application/json'
})
const response = await bedrock.send(command)
if (response.body) {
for await (const chunk of response.body) {
if (chunk.chunk) {
// convert bytes to base64 string
const based64 = Buffer.from(
chunk.chunk.bytes as any,
'base64'
).toString('base64')
// update graphql
createTodo(based64)
}
}
}
}

The code in Lambda to update or do mutation on creaeTodo table

const createTodo = async (message: string) => {
const createTodoQuery = `
mutation createTodo {
createTodo(input: {content: "${message}"}) {
content
createdAt
id
updatedAt
}
}
`
try {
const response = await axios.post(
endpoint,
{
query: createTodoQuery,
variables: {},
operationName: 'createTodo'
},
{
headers: {
'x-api-key': apiKey
}
}
)
} catch (error) {
// console.log("error update graphql", error);
}
}

Hosting#

  • Update next.config.js
  • Simple manual deploy from CLI which zip and upload to S3 and CloudFront
  • This S3 and CloudFront does not show up in your account

Please update the next.config.js first

/** @type {import('next').NextConfig} */
const nextConfig = {
output: 'export'
}
module.exports = nextConfig

Manuall hosting, first init a new amplify project

amplify init

Then add hosting

amplify add hosting

Then push and publish

amplify push
amplify publish

Python Client#

let create a simple graphql client in python

from urllib import request
from datetime import datetime
import simplejson
class GraphqlClient:
"""
query graphql appsync
"""
def __init__(self, endpoint, headers):
self.endpoint = endpoint
self.headers = headers
@staticmethod
def serialization_helper(o):
"""
test
"""
if isinstance(o, datetime):
return o.strftime("%Y-%m-%dT%H:%M:%S.000Z")
def execute(self, query, operation_name, variables={}):
"""
test
"""
data = simplejson.dumps(
{"query": query, "variables": variables, "operationName": operation_name},
default=self.serialization_helper,
ignore_nan=True,
)
r = request.Request(
headers=self.headers,
url=self.endpoint,
method="POST",
data=data.encode("utf8"),
)
response = request.urlopen(r).read()
return response.decode("utf8")

Let create a mutation todo

def create_todo(client):
"""
test
"""
message = "should be working now man"
query = f"""
mutation createTodo {{
createTodo(input: {{content: "{message}" }}) {{
content
createdAt
id
updatedAt
}}
}}
"""
#
operation_name = "createTodo"
#
print(query)
response = client.execute(
query=query,
operation_name=operation_name
)
#
print(response)

Reference#