S3 event and DynamoDB stream trigger Lambda
CDK stack
Use the same role for both Lambda functions:

const role = new aws_iam.Role(this, 'RoleForLambdaIcaServerlessDemo', {
  assumedBy: new aws_iam.ServicePrincipal('lambda.amazonaws.com'),
  roleName: 'RoleForLambdaIcaServerlessDemo'
})

// inline policies
role.attachInlinePolicy(
  new aws_iam.Policy(this, 'PolicyForLambdaIcaServerlessDemo', {
    policyName: 'PolicyForLambdaIcaServerlessDemo',
    statements: [
      // access S3
      new aws_iam.PolicyStatement({
        effect: aws_iam.Effect.ALLOW,
        actions: ['s3:*', 's3-object-lambda:*'],
        resources: ['arn:aws:s3:::haimtran-workspace/*']
      }),
      // write to DynamoDB
      new aws_iam.PolicyStatement({
        effect: aws_iam.Effect.ALLOW,
        actions: ['dynamodb:*'],
        resources: ['*']
      }),
      // publish to SNS
      new aws_iam.PolicyStatement({
        effect: aws_iam.Effect.ALLOW,
        actions: ['sns:*'],
        resources: ['*']
      })
    ]
  })
)
Lambda function that writes to DynamoDB:

const func = new aws_lambda.Function(this, 'CdkLambdaIcaDemo', {
  functionName: 'CdkLambdaIcaDemo',
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  memorySize: 512,
  timeout: Duration.seconds(15),
  code: aws_lambda.Code.fromAsset(path.join(__dirname, './../lambda')),
  handler: 'lambda_write_ddb.handler',
  role: role
})
Lambda function that sends SNS messages:

const lambda_sns = new aws_lambda.Function(this, 'IcaLambdaSnsDemo', {
  functionName: 'LambdaSnsIcaDemo',
  code: aws_lambda.Code.fromAsset(path.join(__dirname, './../lambda')),
  handler: 'lambda_send_sns.handler',
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  role: role
})
An existing S3 bucket triggers the Lambda function:

// an existing S3 bucket triggers the Lambda function
const bucket = aws_s3.Bucket.fromBucketName(
  this,
  'haimtran-bucket-id',
  'haimtran-workspace'
)

// only objects created under the notify-lambda/ prefix trigger the function
bucket.addEventNotification(
  aws_s3.EventType.OBJECT_CREATED,
  new aws_s3_notifications.LambdaDestination(func),
  { prefix: 'notify-lambda/' }
)
DynamoDB table with a stream enabled:

const table = new aws_dynamodb.Table(this, 'S3LambdaEventTable', {
  tableName: 'S3LambdaEventTable',
  partitionKey: {
    name: 'id',
    type: aws_dynamodb.AttributeType.STRING
  },
  billingMode: aws_dynamodb.BillingMode.PAY_PER_REQUEST,
  // the stream carries the item as it looks after each write
  stream: aws_dynamodb.StreamViewType.NEW_IMAGE
})
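The handler code itself (lambda_write_ddb.handler, a Python file) is not shown in this post. To make the data flow concrete, here is a minimal sketch, written in TypeScript to match the stack code, of how an S3 notification could be mapped to items keyed by `id`. The item shape, the type names and the helper names are assumptions for illustration, not the author's implementation.

```typescript
// Minimal local types covering only the fields used below; a real S3
// notification payload carries many more fields.
interface S3NotificationRecord {
  eventName: string; // e.g. "ObjectCreated:Put"
  s3: { bucket: { name: string }; object: { key: string } };
}
interface S3NotificationEvent {
  Records: S3NotificationRecord[];
}

// Hypothetical item shape; only `id` is dictated by the table's partition key.
interface UploadItem {
  id: string;
  bucket: string;
  eventName: string;
}

// Object keys arrive URL-encoded in S3 notifications, with spaces sent as "+".
function decodeS3Key(rawKey: string): string {
  return decodeURIComponent(rawKey.replace(/\+/g, " "));
}

// Turn each notification record into an item, keeping only keys under the prefix.
function toUploadItems(
  event: S3NotificationEvent,
  prefix: string = "notify-lambda/"
): UploadItem[] {
  return event.Records
    .map((record) => ({
      id: decodeS3Key(record.s3.object.key),
      bucket: record.s3.bucket.name,
      eventName: record.eventName,
    }))
    .filter((item) => item.id.startsWith(prefix));
}
```

The prefix check repeats the filter already configured on the bucket notification, so it only acts as a guard if the function is ever invoked from another source.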
The DynamoDB stream triggers the Lambda function:

lambda_sns.addEventSource(
  new aws_lambda_event_sources.DynamoEventSource(table, {
    startingPosition: aws_lambda.StartingPosition.LATEST,
    batchSize: 1,
    retryAttempts: 2
  })
)
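Because the table uses the NEW_IMAGE stream view, each INSERT or MODIFY record delivered to the function carries the item in DynamoDB's typed attribute format. A rough sketch of turning those records into notification text is below; the type and function names are hypothetical, only string and number attributes are handled, and the actual lambda_send_sns.handler is not shown in this post.

```typescript
// DynamoDB's typed attribute format, limited to the string and number cases.
type AttrValue = { S: string } | { N: string };

interface StreamRecordSketch {
  eventName: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb: {
    Keys: { [attr: string]: AttrValue };
    NewImage?: { [attr: string]: AttrValue };
  };
}
interface StreamEventSketch {
  Records: StreamRecordSketch[];
}

// Unwrap a typed attribute into a plain JavaScript value.
function plainValue(value: AttrValue): string | number {
  return "S" in value ? value.S : Number(value.N);
}

// Build one message per record that has a new image; REMOVE records have none.
function newImagesToMessages(event: StreamEventSketch): string[] {
  const messages: string[] = [];
  for (const record of event.Records) {
    const image = record.dynamodb.NewImage;
    if (record.eventName === "REMOVE" || image === undefined) continue;
    const plain: { [attr: string]: string | number } = {};
    for (const attr of Object.keys(image)) {
      const value = image[attr];
      if (value !== undefined) plain[attr] = plainValue(value);
    }
    messages.push(`${record.eventName}: ${JSON.stringify(plain)}`);
  }
  return messages;
}
```

With batchSize set to 1, as in the event source above, each invocation would normally see a single record, so the loop runs once.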
SNS topic and subscription:

// create an SNS topic
const topic = new aws_sns.Topic(this, 'SnsTopicIcaDemo', {
  topicName: 'SnsTopicIcaDemo'
})

// subscribe an email address to the topic
topic.addSubscription(
  new aws_sns_subscriptions.EmailSubscription('hai@entest.io')
)
API Gateway Lambda Integration
Create an API Gateway REST API:

const api_gw = new aws_apigateway.RestApi(this, 'ApiGwIcaDemo', {
  restApiName: 'lambda-api-demo'
})

Create an API resource:

const api_resource = api_gw.root.addResource('books')

Add a GET method backed by the Lambda function:

api_resource.addMethod('GET', new aws_apigateway.LambdaIntegration(func))
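LambdaIntegration uses proxy integration by default, so API Gateway expects the function to return an object with a statusCode and a string body. A small sketch of that response shape follows; the helper name and the sample payload are made up for illustration.

```typescript
// Response shape expected by API Gateway for a Lambda proxy integration.
interface ProxyResponseSketch {
  statusCode: number;
  headers: { [name: string]: string };
  body: string; // must be a string, so JSON payloads are serialized
}

// Wrap any JSON-serializable payload in a proxy-style response.
function toProxyResponse(statusCode: number, payload: unknown): ProxyResponseSketch {
  return {
    statusCode,
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  };
}

// Hypothetical payload for GET /books.
const booksResponse = toProxyResponse(200, { books: ["sample-book"] });
```

Returning a bare object without statusCode and a string body typically surfaces to the caller as a 502 error from API Gateway.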
API Gateway SQS Lambda Integration
API Gateway integrates with an SQS queue through the aws_apigateway.AwsIntegration class, and it needs a role that grants it permission to write messages to the queue.

Note: after the Lambda function has processed a message successfully, it should return normally (for example with statusCode: 200) so that the processed message is deleted from the queue. Messages that keep failing or raise an exception end up in a dead-letter queue (DLQ), provided one is configured for the queue.
Role that allows API Gateway to write messages to the SQS queue

// `queue` is the SQS queue that API Gateway writes to (its definition is not shown here)
const role = new aws_iam.Role(this, 'apiGatewayWriteToSqsRole', {
  assumedBy: new aws_iam.ServicePrincipal('apigateway.amazonaws.com')
})

role.attachInlinePolicy(
  new aws_iam.Policy(this, 'writeToSqsPolicy', {
    statements: [
      new aws_iam.PolicyStatement({
        effect: aws_iam.Effect.ALLOW,
        actions: ['sqs:SendMessage'],
        resources: [queue.queueArn]
      })
    ]
  })
)
API Gateway

const api_gw = new aws_apigateway.RestApi(this, 'apiGatewaySqsDemo', {
  restApiName: 'api-gateway-sqs-demo'
})
API Gateway integration with the SQS queue:

const integration = new aws_apigateway.AwsIntegration({
  service: 'sqs',
  path: 'sqsQueueApiGatewayDemo',
  integrationHttpMethod: 'POST',
  options: {
    credentialsRole: role,
    // SQS expects a form-encoded request body
    requestParameters: {
      'integration.request.header.Content-Type': `'application/x-www-form-urlencoded'`
    },
    // map the `message` query string parameter to the SendMessage body
    requestTemplates: {
      'application/json': `Action=SendMessage&MessageBody=$util.urlEncode("$method.request.querystring.message")`
    },
    integrationResponses: [
      {
        statusCode: '200',
        // double quotes so the response body is valid JSON
        responseTemplates: {
          'application/json': `{"done": true}`
        }
      }
    ]
  }
})
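To see what the request template produces, the sketch below mimics the mapping in plain TypeScript: the `message` query string parameter becomes the URL-encoded MessageBody of a form-encoded SendMessage call. The function name is hypothetical, and encodeURIComponent is only a stand-in for $util.urlEncode; the two may encode a few characters (such as spaces) differently.

```typescript
// Approximation of the request template:
//   Action=SendMessage&MessageBody=$util.urlEncode("<message>")
function buildSendMessageBody(message: string): string {
  return `Action=SendMessage&MessageBody=${encodeURIComponent(message)}`;
}

// A request such as GET /queue?message=order 42 would be forwarded roughly as:
const sampleSqsBody = buildSendMessageBody("order 42");
```

Encoding matters because characters such as `&` and `=` inside the message would otherwise be read as extra form fields.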
API Gateway resource (path):

const resource = api_gw.root.addResource('queue')

Add a method to the resource:

resource.addMethod('GET', integration, {
  methodResponses: [{ statusCode: '200' }]
})
Lambda function to process messages from the queue
Create a Lambda function:

const fn = new aws_lambda.Function(this, 'lambdaConsumeSqsMessageDemo', {
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  code: aws_lambda.Code.fromAsset(path.join(__dirname, 'lambda')),
  handler: 'index.handler'
})
Event source so that the queue triggers the Lambda function:

// SqsEventSource comes from aws_lambda_event_sources
fn.addEventSource(
  new SqsEventSource(queue, {
    batchSize: 1,
    maxBatchingWindow: Duration.minutes(1),
    reportBatchItemFailures: true
  })
)
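With reportBatchItemFailures enabled, the function can tell Lambda which messages failed by returning their message IDs in a batchItemFailures list; messages not listed are treated as processed and removed from the queue. The sketch below shows that contract with hypothetical names and a placeholder processing step. It is synchronous and in TypeScript for brevity, whereas the function in this stack is a Python handler (index.handler) that is not shown.

```typescript
// Only the SQS record fields used in this sketch.
interface QueueMessageSketch {
  messageId: string;
  body: string;
}
interface QueueEventSketch {
  Records: QueueMessageSketch[];
}
// Response shape Lambda reads when reportBatchItemFailures is enabled.
interface BatchResponseSketch {
  batchItemFailures: { itemIdentifier: string }[];
}

// Placeholder business logic: reject empty message bodies.
function processMessage(body: string): void {
  if (body.trim() === "") throw new Error("empty message body");
}

// Process every record and report the IDs of those that raised an error.
function handleQueueEvent(
  event: QueueEventSketch,
  work: (body: string) => void = processMessage
): BatchResponseSketch {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of event.Records) {
    try {
      work(record.body);
    } catch {
      // Reported messages become visible on the queue again for a retry.
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
}
```

An empty batchItemFailures list signals that the whole batch succeeded, which matters more once batchSize is raised above 1.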
Grant the Lambda function permission to publish messages to an SNS topic:

// existing topic
const topic = aws_sns.Topic.fromTopicArn(
  this,
  'lambdaSendMessageToSnsDemo',
  'arn:aws:sns:ap-southeast-1:account_id:CodePipelineNotification'
)

// grant publish permission to the Lambda function
topic.grantPublish(fn)