Introduction#

The GitHub repository shows the EventBridge event format, how events are produced and consumed, and how to create a rule and add targets. Part 1 uses Lambda functions to produce and consume custom events; Part 2 creates an S3 rule with a key prefix filter.


Part 1. Producer and Consumer#

  • A Lambda producer sends events to the event bus
  • A Lambda consumer is triggered by the event rule
  • The event rule maps events to targets (the Lambda consumer)
  • The EventPattern specifies how events are matched to targets
  • Amazon S3 event notifications are used with Amazon EventBridge

[Figure: EventBridge architecture diagram]

Event Format#

Send (put) an event with Python boto3. The Detail field must be a string containing a valid JSON object, which is why json.dumps is used here (or JSON.stringify in JavaScript).

resp = eventClient.put_events(
    Entries=[
        {
            'Time': datetime.now(),
            'Source': 'io.entest.demo',
            'Detail': json.dumps({"title": "purchase event"}),
            'DetailType': 'purchase',
            'Resources': ['arn:aws:lambda...']
        }
    ]
)
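As a minimal sketch of the entry format above, the helper below builds one entry without calling AWS, so the shape can be checked locally. The name `build_entry` and the dict-only check are assumptions made for illustration; they are not part of the original stack.

```python
import json
from datetime import datetime, timezone


def build_entry(source, detail_type, detail, resources=None):
    """Build one PutEvents entry (hypothetical helper, not from the repo)."""
    # Detail has to serialize to a JSON object, so only dicts are accepted here.
    if not isinstance(detail, dict):
        raise TypeError("detail must be a dict")
    return {
        "Time": datetime.now(timezone.utc),
        "Source": source,
        "DetailType": detail_type,
        "Detail": json.dumps(detail),  # sent as a string, not as a dict
        "Resources": resources or [],
    }


entry = build_entry("io.entest.demo", "purchase", {"title": "purchase event"})
print(entry["Detail"])
```

The resulting dict can be passed inside the `Entries` list of `put_events`.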

The event rule uses an eventPattern to map purchase events to the process-purchase Lambda.

const purchaseRule = new aws_events.Rule(this, 'TriggerProcessPurchaseLambda', {
  ruleName: 'TriggerProcessPurchaseLambda',
  description: '',
  eventPattern: { source: ['io.entest.demo'], detailType: ['purchase'] }
})
purchaseRule.addTarget(
  new aws_events_targets.LambdaFunction(processPurchaseLambda.lambda)
)

The event as received by the Python consumer (shown as JSON):

{
  "version": "0",
  "id": "9b8aa069-94a8-2283-dcb0-0d3511df4345",
  "detail-type": "purchase",
  "source": "io.entest.demo",
  "account": "ACCOUNT_ID",
  "time": "2022-07-30T03:22:23Z",
  "region": "REGION",
  "resources": ["arn:aws:lambda..."],
  "detail": { "title": "purchase event" }
}
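A short sketch of how a consumer might read this event in Python. Two details are easy to miss: the "detail-type" key contains a hyphen, and "detail" arrives as an already parsed dict even though the producer sent it as a string. The function name `summarize` is an assumption made for illustration.

```python
def summarize(event):
    """Pick the fields a consumer usually needs (hypothetical helper)."""
    return {
        "type": event["detail-type"],  # hyphenated key, so bracket access
        "source": event["source"],
        "title": event["detail"].get("title"),  # detail is a dict here
    }


sample = {
    "version": "0",
    "detail-type": "purchase",
    "source": "io.entest.demo",
    "detail": {"title": "purchase event"},
}
print(summarize(sample))
```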

CDK Stack#

Create the Lambda producer and allow it to put events.

const producerLambda = new aws_lambda.Function(this, 'ProducerLambda', {
  functionName: 'ProducerLambda',
  code: aws_lambda.Code.fromInline(
    fs.readFileSync(path.join(__dirname, './../lambdas/producer.py'), {
      encoding: 'utf-8'
    })
  ),
  handler: 'index.handler',
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  timeout: Duration.seconds(10)
})
producerLambda.addToRolePolicy(
  new PolicyStatement({
    effect: Effect.ALLOW,
    resources: ['*'],
    actions: ['events:PutEvents']
  })
)

Create a Lambda-based construct shared by the purchase and order services.

export class LambdaService extends Construct {
  public readonly lambda: aws_lambda.Function
  constructor(scope: Construct, id: string, props: LambdaServiceProps) {
    super(scope, id)
    const func = new aws_lambda.Function(this, props.functionName, {
      functionName: props.functionName,
      code: aws_lambda.Code.fromInline(
        fs.readFileSync(path.join(__dirname, props.functionCode), {
          encoding: 'utf-8'
        })
      ),
      handler: 'index.handler',
      runtime: aws_lambda.Runtime.PYTHON_3_8,
      timeout: Duration.seconds(10),
      // provide a topic ARN if the consumer should publish to SNS
      environment: {
        TOPIC_ARN: props.topicArn ? props.topicArn : ''
      }
    })
    func.addToRolePolicy(
      new PolicyStatement({
        effect: Effect.ALLOW,
        resources: ['*'],
        actions: ['sns:*']
      })
    )
    this.lambda = func
  }
}

The event rule maps purchase events to the process-purchase Lambda.

const purchaseRule = new aws_events.Rule(this, 'TriggerProcessPurchaseLambda', {
  ruleName: 'TriggerProcessPurchaseLambda',
  description: '',
  eventPattern: { source: ['io.entest.demo'], detailType: ['purchase'] }
})
purchaseRule.addTarget(
  new aws_events_targets.LambdaFunction(processPurchaseLambda.lambda)
)

The event rule maps order events to the process-order Lambda.

const orderRule = new aws_events.Rule(this, 'TriggerProcessOrderLambda', {
  ruleName: 'TriggerProcessOrderLambda',
  description: '',
  eventPattern: { source: ['io.entest.demo'], detailType: ['order'] }
})
orderRule.addTarget(
  new aws_events_targets.LambdaFunction(processOrderLambda.lambda)
)

Lambda Producer#

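from datetime import datetime
import json
import boto3
eventClient = boto3.client('events')
def handler(event, context):
    """
    send event to eventbridge
    """
    # Detail must serialize to a JSON object; if the body arrives as a
    # string, parse it with json.loads before passing it on
    request_body = event['body']
    print(request_body)
    resp = eventClient.put_events(
        Entries=[
            {
                'Time': datetime.now(),
                'Source': 'io.entest.demo',
                'Detail': json.dumps(request_body),
                # the rules in this stack match 'purchase' and 'order';
                # use one of those values to trigger a consumer
                'DetailType': 'service_status',
            }
        ]
    )
    print(resp)
    return {
        'message': 'producer'
    }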

Lambda Consumer#

import os
import json
import boto3
snsClient = boto3.client('sns')
def handler(event, context):
    """
    consume event and send sns
    """
    # publish only if an SNS topic ARN is available
    if os.environ['TOPIC_ARN'] != '':
        snsClient.publish(
            TopicArn=os.environ['TOPIC_ARN'],
            Message=json.dumps(event)
        )
    return {
        'message': event
    }

Part 2. S3 Event Rule Prefix#

S3 Event#

An event sent from S3 to EventBridge has the following format:

{
  "version": "0",
  "id": "17793124-05d4-b198-2fde-7ededc63b103",
  "detail-type": "Object Created",
  "source": "aws.s3",
  "account": "111122223333",
  "time": "2021-11-12T00:00:00Z",
  "region": "ca-central-1",
  "resources": ["arn:aws:s3:::DOC-EXAMPLE-BUCKET1"],
  "detail": {
    "version": "0",
    "bucket": {
      "name": "DOC-EXAMPLE-BUCKET1"
    },
    "object": {
      "key": "example-key",
      "size": 5,
      "etag": "b1946ac92492d2347c6235b4d2611184",
      "version-id": "IYV3p45BT0ac8hjHg1houSdS1a.Mro8e",
      "sequencer": "617f08299329d189"
    },
    "request-id": "N4N7GDK58NMKJ12R",
    "requester": "123456789012",
    "source-ip-address": "1.2.3.4",
    "reason": "PutObject"
  }
}

Rule - Prefix#

const eventRule = new aws_events.Rule(this, 'S3PutObjectTriggerLambdaRule', {
  ruleName: 'S3PutObjectTriggerLambdaRule',
  eventPattern: {
    source: ['aws.s3'],
    detailType: ['Object Created'],
    detail: {
      // the bucket name is nested under detail.bucket.name in the S3 event
      bucket: {
        name: [props.bucketName]
      },
      object: {
        key: [{ prefix: 'eventbridge-demo/' }]
      }
    }
  }
})
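To make the prefix rule concrete, here is a deliberately simplified Python sketch of how a pattern like this one selects events. It is not EventBridge's real matching engine: it only handles exact values, nested objects and the `prefix` operator. The pattern is written in raw JSON form, with the bucket name nested under `bucket` to follow the S3 event shape shown earlier. The names `matches`, `value_matches` and `make_event` are assumptions made for illustration.

```python
def value_matches(rule, actual):
    """A rule is a literal value or a {'prefix': ...} operator."""
    if isinstance(rule, dict) and "prefix" in rule:
        return isinstance(actual, str) and actual.startswith(rule["prefix"])
    return rule == actual


def matches(pattern, event):
    """Every pattern key must be present and match; extra event keys are ignored."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # nested pattern: recurse into the nested event object
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif not any(value_matches(rule, actual) for rule in expected):
            # a list of rules matches if any one rule matches
            return False
    return True


pattern = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
        "bucket": {"name": ["DOC-EXAMPLE-BUCKET1"]},
        "object": {"key": [{"prefix": "eventbridge-demo/"}]},
    },
}


def make_event(key):
    """Build a trimmed S3 'Object Created' event for the given object key."""
    return {
        "source": "aws.s3",
        "detail-type": "Object Created",
        "detail": {
            "bucket": {"name": "DOC-EXAMPLE-BUCKET1"},
            "object": {"key": key, "size": 5},
        },
    }


print(matches(pattern, make_event("eventbridge-demo/report.csv")))
print(matches(pattern, make_event("other/report.csv")))
```

Only the first key starts with `eventbridge-demo/`, so only the first event would reach the target.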

CDK Stack#

Look up an existing bucket and enable EventBridge notifications on it.

const bucket = aws_s3.Bucket.fromBucketName(
  this,
  'S3BucketEventSource',
  props.bucketName
)
bucket.enableEventBridgeNotification()

Create the Lambda function that processes the event.

const func = new aws_lambda.Function(this, 'ProcessEventS3PutObject', {
  functionName: 'ProcessEventS3PutObject',
  code: aws_lambda.Code.fromInline(
    fs.readFileSync(
      path.resolve(__dirname, './../lambdas/lambda_prefix_s3.py'),
      {
        encoding: 'utf-8'
      }
    )
  ),
  handler: 'index.main',
  runtime: aws_lambda.Runtime.PYTHON_3_8
})

Then add the Lambda function as a target of the rule.

eventRule.addTarget(new aws_events_targets.LambdaFunction(func))

Do not forget to grant the Lambda function read access to the bucket.

bucket.grantRead(func)
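The stack points at lambdas/lambda_prefix_s3.py with the handler index.main, but the file itself is not shown. The sketch below is a hypothetical version of it, assuming the handler only needs to pull the bucket name and object key out of the S3 event; the real function may do more, such as reading the object with boto3.

```python
def main(event, context):
    """Hypothetical handler: extract bucket and key from an S3 EventBridge event."""
    detail = event.get("detail", {})
    bucket = detail.get("bucket", {}).get("name")
    obj = detail.get("object", {})
    key = obj.get("key")
    size = obj.get("size")
    print(f"object created: s3://{bucket}/{key} ({size} bytes)")
    return {"bucket": bucket, "key": key, "size": size}


# local check with a trimmed copy of the S3 event format
sample_event = {
    "detail-type": "Object Created",
    "source": "aws.s3",
    "detail": {
        "bucket": {"name": "DOC-EXAMPLE-BUCKET1"},
        "object": {"key": "eventbridge-demo/example-key", "size": 5},
    },
}
result = main(sample_event, None)
```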