Introduction#
GitHub: this note shows how to stream a response from Amazon Bedrock to a web application through AWS AppSync:
- Set up AppSync and Cognito auth using CDK
- Create a Lambda resolver that calls Bedrock
- Build a simple web subscription using the Amplify GraphQL client
NodejsFunction#
Let's create a simple Lambda function first:
```ts
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3'

const s3Client = new S3Client({ region: 'ap-southeast-1' })

export const handler = async (event: any) => {
  const response = await s3Client.send(
    new PutObjectCommand({
      Bucket: 'bedrock-demo-03122023',
      Key: 'hehe.txt',
      Body: 'hello'
    })
  )
  console.log(response)
  return {
    body: JSON.stringify({ message: 'hello' }),
    statusCode: 200
  }
}
```
The package.json (and its package-lock.json) for the Lambda function:
```json
{
  "name": "bedrocklambda",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "types": "./index.d.ts",
  "dependencies": {
    "@aws-sdk/client-bedrock-runtime": "^3.428.0",
    "@aws-sdk/client-s3": "^3.465.0",
    "@types/aws-lambda": "^8.10.124",
    "aws-lambda": "^1.0.7"
  }
}
```
Use the CDK NodejsFunction construct to bundle the Lambda function written in TypeScript:
```ts
import { Duration, Stack, StackProps, aws_lambda_nodejs } from 'aws-cdk-lib'
import { Construct } from 'constructs'
import * as aws_lambda from 'aws-cdk-lib/aws-lambda'
import * as path from 'path'

export class BackendStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props)

    const bedrockLambda = new aws_lambda_nodejs.NodejsFunction(
      this,
      'BedrockLambda',
      {
        functionName: 'BedrockLambda',
        entry: path.join(__dirname, './../lambda/index.ts'),
        handler: 'handler',
        runtime: aws_lambda.Runtime.NODEJS_18_X,
        timeout: Duration.seconds(29),
        bundling: {
          nodeModules: [
            '@aws-sdk/client-bedrock-runtime',
            '@types/aws-lambda',
            '@aws-sdk/client-s3'
          ]
        },
        depsLockFilePath: path.join(__dirname, './../lambda/package-lock.json')
      }
    )
  }
}
```
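The Lambda also needs permission to call Bedrock. That step is not shown above, so here is a minimal sketch, assuming the function only needs the invoke actions on the Claude v2 model used later:

```ts
import { aws_iam } from 'aws-cdk-lib'

// assumption: grant only the Claude v2 model and the invoke actions
// used by the streaming handler shown later in this note
bedrockLambda.addToRolePolicy(
  new aws_iam.PolicyStatement({
    actions: ['bedrock:InvokeModel', 'bedrock:InvokeModelWithResponseStream'],
    resources: [
      'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2'
    ]
  })
)
```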
Cognito#
Let's create a Cognito user pool and identity pool for authentication and authorization:
```ts
import { aws_cognito } from 'aws-cdk-lib'
// the IdentityPool L2 construct comes from the alpha module
import {
  IdentityPool,
  UserPoolAuthenticationProvider
} from '@aws-cdk/aws-cognito-identitypool-alpha'

const userPool = new aws_cognito.UserPool(this, 'BedrockUserPool', {
  userPoolName: 'bedrock',
  selfSignUpEnabled: true,
  autoVerify: {
    email: true
  },
  accountRecovery: aws_cognito.AccountRecovery.EMAIL_ONLY,
  userVerification: {
    emailStyle: aws_cognito.VerificationEmailStyle.CODE
  },
  standardAttributes: {
    email: {
      required: true,
      mutable: true
    }
  }
})

const userPoolClient = new aws_cognito.UserPoolClient(
  this,
  'BedrockUserPoolClient',
  {
    userPool
  }
)

const identityPool = new IdentityPool(this, 'BedrockIdentityPool', {
  identityPoolName: 'BedrockIdentityPool',
  allowUnauthenticatedIdentities: true,
  authenticationProviders: {
    userPools: [
      new UserPoolAuthenticationProvider({
        userPool: userPool,
        userPoolClient: userPoolClient
      })
    ]
  }
})
```
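The web client will need these IDs later. A small sketch of exporting them as stack outputs (the output names here are just illustrative):

```ts
import { CfnOutput } from 'aws-cdk-lib'

// assumption: output names are illustrative, pick any names you like
new CfnOutput(this, 'UserPoolId', { value: userPool.userPoolId })
new CfnOutput(this, 'UserPoolClientId', {
  value: userPoolClient.userPoolClientId
})
new CfnOutput(this, 'IdentityPoolId', { value: identityPool.identityPoolId })
```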
Appsync Amplify Construct#
Use @aws-amplify/graphql-api-construct to create data models and resolvers in AppSync. This high-level construct automatically creates models and resolvers from the provided schema.graphql.
This will create a nested stack:
```ts
import {
  AmplifyGraphqlApi,
  AmplifyGraphqlDefinition
} from '@aws-amplify/graphql-api-construct'

new AmplifyGraphqlApi(this, 'BedrockApi', {
  apiName: 'BedrockApi',
  definition: AmplifyGraphqlDefinition.fromFiles(
    path.join(__dirname, 'schema.graphql')
  ),
  authorizationModes: {
    defaultAuthorizationMode: 'API_KEY',
    apiKeyConfig: {
      expires: Duration.days(30)
    },
    userPoolConfig: {
      userPool
    },
    iamConfig: {
      identityPoolId: identityPool.identityPoolId,
      unauthenticatedUserRole: identityPool.unauthenticatedRole,
      authenticatedUserRole: identityPool.authenticatedRole
    }
  }
})
```
Here is the schema.graphql:
```graphql
type Todo @model @auth(rules: [{ allow: public }]) {
  id: ID!
  content: String
}

type Query {
  summaryNote(prompt: String): String
}
```
Lambda Resolver#
There are two ways to wire up the Lambda resolver:
- Leverage the @function directive in schema.graphql together with AmplifyGraphqlApi's functionNameMap
- Use the addLambdaDataSource and addResolver methods
The first way is to leverage schema.graphql:
```graphql
type Note
  @model
  @auth(rules: [{ allow: owner }, { allow: public, operations: [read] }]) {
  name: String!
  completed: Boolean!
  owner: String @auth(rules: [{ allow: owner, operations: [read, delete] }])
}

type Query {
  noteSummary(msg: String): String
    @function(name: "noteSummary")
    @auth(rules: [{ allow: private }])
}
```
Then update the AmplifyGraphqlApi with a functionNameMap as below:
```ts
new AmplifyGraphqlApi(this, 'BedrockApi', {
  apiName: 'BedrockApi',
  definition: AmplifyGraphqlDefinition.fromFiles(
    path.join(__dirname, 'schema.graphql')
  ),
  authorizationModes: {
    defaultAuthorizationMode: 'API_KEY',
    apiKeyConfig: {
      expires: Duration.days(30)
    },
    userPoolConfig: {
      userPool
    },
    iamConfig: {
      identityPoolId: identityPool.identityPoolId,
      unauthenticatedUserRole: identityPool.unauthenticatedRole,
      authenticatedUserRole: identityPool.authenticatedRole
    }
  },
  // HERE map to the lambda resolver
  functionNameMap: { noteSummary: bedrockLambda }
})
```
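With the @function directive, AppSync invokes the mapped Lambda with the query arguments available under event.arguments. A minimal echo handler, just to illustrate the event shape (the real handler that streams from Bedrock comes later):

```ts
import { AppSyncResolverEvent } from 'aws-lambda'

// assumption: a placeholder handler for noteSummary(msg: String),
// shown only to illustrate the event shape
export const handler = async (
  event: AppSyncResolverEvent<{ msg?: string }>
) => {
  return `summary of: ${event.arguments.msg ?? ''}`
}
```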
The second method is to add a Lambda data source and a resolver explicitly on the AmplifyGraphqlApi construct:
```ts
import { Code, FunctionRuntime } from 'aws-cdk-lib/aws-appsync'

const appsync = new AmplifyGraphqlApi(this, 'AmplifyCdkGraphQlApi', {
  definition: AmplifyGraphqlDefinition.fromFiles(
    path.join(__dirname, 'schema.graphql')
  ),
  authorizationModes: {
    defaultAuthorizationMode: 'API_KEY',
    apiKeyConfig: {
      expires: Duration.days(30)
    },
    userPoolConfig: {
      userPool: userPool
    },
    iamConfig: {
      identityPoolId: identityPool.identityPoolId,
      unauthenticatedUserRole: identityPool.unauthenticatedRole,
      authenticatedUserRole: identityPool.authenticatedRole
    }
  }
})

const lambdaDataSource = appsync.addLambdaDataSource(
  'TestLambdaDataSource',
  bedrockLambda
)

appsync.addResolver('TestLambdaResolver', {
  dataSource: lambdaDataSource,
  typeName: 'Query',
  // the field name must match the Query field declared in schema.graphql
  fieldName: 'summaryNote',
  code: Code.fromAsset(
    path.join(__dirname, './../resolver/generateTacoRecipe.js')
  ),
  runtime: FunctionRuntime.JS_1_0_0
})
```
Here is the content of the resolver which invokes the Lambda:
```js
import { util } from '@aws-appsync/utils'

export function request(ctx) {
  const { source, args } = ctx
  return {
    operation: 'Invoke',
    payload: { field: ctx.info.fieldName, arguments: args, source }
  }
}

export function response(ctx) {
  return ctx.result
}
```
Please take note of the schema.graphql for this method:
```graphql
type Todo @model @auth(rules: [{ allow: public }]) {
  id: ID!
  content: String
}

type Query {
  summaryNote(prompt: String): String
}
```
HTTP Resolver#
Alternatively, AppSync can call Bedrock directly through an HTTP data source signed with IAM:
```ts
import { PolicyStatement } from 'aws-cdk-lib/aws-iam'

const bedrockDataSource = appsync.addHttpDataSource(
  'BedrockDataSource',
  'https://bedrock-runtime.us-east-1.amazonaws.com',
  {
    authorizationConfig: {
      signingRegion: 'us-east-1',
      signingServiceName: 'bedrock'
    }
  }
)

bedrockDataSource.grantPrincipal.addToPrincipalPolicy(
  new PolicyStatement({
    resources: [
      'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2'
    ],
    actions: ['bedrock:InvokeModel']
  })
)

const bedrockResolver = appsync.addResolver('BedrockResolver', {
  dataSource: bedrockDataSource,
  typeName: 'Query',
  fieldName: 'bedrock',
  code: Code.fromAsset(path.join(__dirname, './../resolver/bedrock.js')),
  runtime: FunctionRuntime.JS_1_0_0
})
```
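The resolver/bedrock.js file is not shown here, so below is a minimal sketch, assuming a Query field like bedrock(prompt: String): String and the non-streaming InvokeModel HTTP API of Claude v2:

```js
// a sketch of resolver/bedrock.js (assumed, not from the original note)
export function request(ctx) {
  const prompt = `\n\nHuman: ${ctx.args.prompt} \n\nAssistant:`
  return {
    method: 'POST',
    resourcePath: '/model/anthropic.claude-v2/invoke',
    params: {
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify({
        prompt,
        max_tokens_to_sample: 2048,
        temperature: 0.5
      })
    }
  }
}

export function response(ctx) {
  // the InvokeModel response body contains the completion text
  return JSON.parse(ctx.result.body).completion
}
```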
Subscription#
- Handle duplicate subscribers
- Encode and decode base64
- Call the Lambda resolver
First, a submit button sends a query to the Lambda resolver, which sends the prompt to Bedrock:
```ts
const onsubmit = async () => {
  setMessages([])
  let prompt = (document.getElementById('prompt') as HTMLInputElement).value
  console.log(prompt)
  client.graphql({
    query: summaryNote,
    variables: {
      prompt: prompt
    }
  })
}
```
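Here summaryNote is assumed to be the generated GraphQL document for the Query field in schema.graphql; without codegen, a hand-written equivalent could look like this:

```ts
// assumption: hand-written equivalent of the generated summaryNote query
const summaryNote = /* GraphQL */ `
  query SummaryNote($prompt: String) {
    summaryNote(prompt: $prompt)
  }
`
```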
Second, subscribe to onCreateTodo and decode the base64 content:
```ts
const subscribe = async () => {
  const sub = client.graphql({ query: onCreateTodo }).subscribe({
    next: ({ data }) => {
      console.log(data)
      const todo = data.onCreateTodo
      const message = JSON.parse(
        Buffer.from(todo.content!, 'base64').toString()
      )
      setMessages(old => [...old, message.completion + '\n'])
    },
    error: error => console.log(error)
  })
  console.log('subscribe ...', sub)
  return () => sub.unsubscribe()
}
```
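To avoid duplicate subscribers, the subscription can be created once when the component mounts and cleaned up on unmount. A sketch, assuming a React function component:

```ts
// sketch: subscribe once on mount, unsubscribe on unmount
useEffect(() => {
  let cleanup: (() => void) | undefined
  subscribe().then(unsub => {
    cleanup = unsub
  })
  return () => cleanup?.()
}, [])
```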
The Lambda function processes the stream from Bedrock and writes each chunk, base64 encoded, to the Todo table through a GraphQL mutation:
```ts
import {
  BedrockRuntimeClient,
  InvokeModelWithResponseStreamCommand
} from '@aws-sdk/client-bedrock-runtime'
import { AppSyncResolverEvent } from 'aws-lambda'

// arguments coming from the summaryNote query
interface AppSyncEvent {
  prompt: string
}

// assumption: Bedrock client region
const bedrock = new BedrockRuntimeClient({ region: 'us-east-1' })

export const handler = async (event: AppSyncResolverEvent<AppSyncEvent>) => {
  // parse prompt from event
  let prompt = ''
  try {
    prompt = event.arguments.prompt
  } catch (error) {
    prompt = 'how to cook a chicken soup?'
  }

  const claudePrompt = `\n\nHuman: ${prompt} \n\nAssistant:`

  const config = {
    prompt: claudePrompt,
    max_tokens_to_sample: 2048,
    temperature: 0.5,
    top_k: 250,
    top_p: 1,
    stop_sequences: ['\n\nHuman:']
  }

  const command = new InvokeModelWithResponseStreamCommand({
    body: JSON.stringify(config),
    modelId: 'anthropic.claude-v2',
    accept: 'application/json',
    contentType: 'application/json'
  })

  const response = await bedrock.send(command)

  if (response.body) {
    for await (const chunk of response.body) {
      if (chunk.chunk) {
        // encode the chunk bytes as a base64 string
        const base64 = Buffer.from(chunk.chunk.bytes as Uint8Array).toString(
          'base64'
        )
        // write the chunk to the Todo table via a GraphQL mutation
        await createTodo(base64)
      }
    }
  }
}
```
Here is the code in the Lambda that performs the createTodo mutation:
```ts
import axios from 'axios'

// endpoint and apiKey are the AppSync GraphQL endpoint and API key,
// for example passed to the Lambda as environment variables
const createTodo = async (message: string) => {
  const createTodoQuery = `
    mutation createTodo {
      createTodo(input: { content: "${message}" }) {
        content
        createdAt
        id
        updatedAt
      }
    }
  `
  try {
    const response = await axios.post(
      endpoint,
      {
        query: createTodoQuery,
        variables: {},
        operationName: 'createTodo'
      },
      {
        headers: {
          'x-api-key': apiKey
        }
      }
    )
  } catch (error) {
    // console.log("error update graphql", error);
  }
}
```
Hosting#
- Update next.config.js
- Do a simple manual deploy from the CLI, which zips the build and uploads it to S3 and CloudFront
- This S3 bucket and CloudFront distribution are managed by Amplify hosting and do not show up in your account
Please update the next.config.js first
```js
/** @type {import('next').NextConfig} */
const nextConfig = {
  output: 'export'
}

module.exports = nextConfig
```
For manual hosting, first init a new Amplify project:
```bash
amplify init
```
Then add hosting
```bash
amplify add hosting
```
Then push and publish
```bash
amplify push
amplify publish
```
Python Client#
Let's create a simple GraphQL client in Python:
```python
from urllib import request
from datetime import datetime
import simplejson


class GraphqlClient:
    """query graphql appsync"""

    def __init__(self, endpoint, headers):
        self.endpoint = endpoint
        self.headers = headers

    @staticmethod
    def serialization_helper(o):
        """serialize datetime objects to ISO strings"""
        if isinstance(o, datetime):
            return o.strftime("%Y-%m-%dT%H:%M:%S.000Z")

    def execute(self, query, operation_name, variables={}):
        """send a graphql request to the appsync endpoint"""
        data = simplejson.dumps(
            {"query": query, "variables": variables, "operationName": operation_name},
            default=self.serialization_helper,
            ignore_nan=True,
        )
        r = request.Request(
            headers=self.headers,
            url=self.endpoint,
            method="POST",
            data=data.encode("utf8"),
        )
        response = request.urlopen(r).read()
        return response.decode("utf8")
```
Let's create a Todo via the createTodo mutation:
```python
def create_todo(client):
    """create a todo with the given graphql client"""
    message = "should be working now man"
    query = f"""
      mutation createTodo {{
        createTodo(input: {{content: "{message}" }}) {{
          content
          createdAt
          id
          updatedAt
        }}
      }}
    """
    operation_name = "createTodo"
    response = client.execute(query=query, operation_name=operation_name)
    return response
```