## Introduction
- CDK stack to create an MSK Serverless cluster
- Publish/subscribe from the terminal (Kafka CLI tools)
- Publish/subscribe with Python clients
- Integrate with a Flink application
> [!IMPORTANT]
> Ensure that the client and the cluster are in the same AWS region.

> [!IMPORTANT]
> These examples use IAM authentication and the AWS MSK IAM client libraries for MSK Serverless.
## Serverless Cluster
First, create a network stack:
import { aws_ec2, Stack, StackProps } from "aws-cdk-lib";
import { Construct } from "constructs";

interface VpcProps extends StackProps {
  /** IPv4 CIDR block for the VPC, e.g. "10.0.0.0/16". */
  cidr: string;
  /** Name given to the VPC; also used as the prefix of its construct id. */
  name: string;
}

/**
 * Networking for the MSK Serverless demo.
 *
 * The stack provisions four things:
 * - a VPC spanning up to 3 AZs;
 * - a public subnet tier and a private-with-egress subnet tier, sharing one managed NAT gateway;
 * - a self-referencing security group shared by the cluster and its clients;
 * - an STS interface endpoint in the private subnets.
 */
export class NetworkStack extends Stack {
  public readonly vpc: aws_ec2.Vpc;
  public readonly MskSecurityGroup: aws_ec2.SecurityGroup;

  constructor(scope: Construct, id: string, props: VpcProps) {
    super(scope, id, props);

    // One /24 per tier per AZ; CDK creates the IGW, NAT and route tables itself.
    const tiers: aws_ec2.SubnetConfiguration[] = [
      { name: "PublicSubnet", cidrMask: 24, subnetType: aws_ec2.SubnetType.PUBLIC },
      {
        name: "PrivateSubnetNat",
        cidrMask: 24,
        subnetType: aws_ec2.SubnetType.PRIVATE_WITH_EGRESS,
      },
    ];
    const privateSubnets: aws_ec2.SubnetSelection = {
      subnetType: aws_ec2.SubnetType.PRIVATE_WITH_EGRESS,
    };

    const network = new aws_ec2.Vpc(this, `${props.name}-Vpc`, {
      vpcName: props.name,
      ipAddresses: aws_ec2.IpAddresses.cidr(props.cidr),
      maxAzs: 3,
      enableDnsSupport: true,
      enableDnsHostnames: true,
      // Managed NAT gateway service (not a NAT instance); a single one serves every AZ.
      natGatewayProvider: aws_ec2.NatProvider.gateway(),
      natGateways: 1,
      subnetConfiguration: tiers,
    });

    const sharedSg = new aws_ec2.SecurityGroup(this, "MskSecurityGroup", {
      vpc: network,
      securityGroupName: "MskSecurityGroup",
    });
    // Any resource carrying this group may reach any other resource carrying it, on every port.
    sharedSg.addIngressRule(sharedSg, aws_ec2.Port.allTraffic(), "self reference security group");

    // Private STS endpoint; `open: true` additionally admits traffic from the VPC CIDR.
    network.addInterfaceEndpoint("STSVpcEndpoint", {
      service: aws_ec2.InterfaceVpcEndpointAwsService.STS,
      open: true,
      subnets: privateSubnets,
      securityGroups: [sharedSg],
    });

    this.vpc = network;
    this.MskSecurityGroup = sharedSg;
  }
}
Then create the cluster:
import { Stack, StackProps, aws_ec2, aws_msk } from "aws-cdk-lib";
import { Construct } from "constructs";

interface MskServerlessProps extends StackProps {
  /** Name given to the MSK Serverless cluster. */
  clusterName: string;
  /** Security group attached to the cluster's VPC configuration. */
  securityGroup: aws_ec2.SecurityGroup;
  /** VPC whose public subnets the cluster is placed in. */
  vpc: aws_ec2.Vpc;
}

/**
 * MSK Serverless cluster with IAM (SASL) client authentication.
 *
 * The cluster is attached to the public subnets of `props.vpc` and protected by `props.securityGroup`.
 */
export class MSKServerlessStack extends Stack {
  /** The underlying CloudFormation cluster resource (e.g. to read its `attrArn`). */
  public readonly cluster: aws_msk.CfnServerlessCluster;

  constructor(scope: Construct, id: string, props: MskServerlessProps) {
    super(scope, id, props);

    const subnetIds = props.vpc.publicSubnets.map((subnet) => subnet.subnetId);

    this.cluster = new aws_msk.CfnServerlessCluster(this, "MSKServerlessDemo", {
      // Use the caller-supplied name instead of a hard-coded one.
      clusterName: props.clusterName,
      clientAuthentication: {
        sasl: {
          iam: {
            enabled: true,
          },
        },
      },
      vpcConfigs: [
        {
          subnetIds,
          securityGroups: [props.securityGroup.securityGroupId],
        },
      ],
    });
  }
}
## Set Up the Client
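Set up permissions for the client with the following IAM policy. Replace the region and account ID in the ARNs with your own; the region must be the cluster's region. This policy applies to both:
- the EC2 instance used for the demo
- the Zeppelin notebook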
{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Action": ["kafka-cluster:Connect","kafka-cluster:AlterCluster","kafka-cluster:DescribeCluster"],"Resource": ["arn:aws:kafka:ap-southeast-1:1111222233334444:cluster/*"]},{"Effect": "Allow","Action": ["kafka-cluster:*Topic*","kafka-cluster:WriteData","kafka-cluster:ReadData"],"Resource": ["arn:aws:kafka:ap-southeast-1:1111222233334444:topic/*"]},{"Effect": "Allow","Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],"Resource": ["arn:aws:kafka:ap-southeast-1:1111222233334444:group/*"]}]}
## Pub/Sub Client
# Bootstrap server string of the MSK Serverless cluster (fill in before running).
export ENDPOINT=""
export TOPIC=sensor-topic

# bin/client.properties is assumed to enable IAM auth. For example
# (see the aws-msk-iam-auth README; the aws-msk-iam-auth jar is typically placed in libs/):
#   security.protocol=SASL_SSL
#   sasl.mechanism=AWS_MSK_IAM
#   sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
#   sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

# create a topic
bin/kafka-topics.sh \
  --create --bootstrap-server "$ENDPOINT" \
  --command-config bin/client.properties \
  --replication-factor 3 --partitions 1 \
  --topic "$TOPIC"

# list topics
bin/kafka-topics.sh \
  --list --bootstrap-server "$ENDPOINT" \
  --command-config bin/client.properties

# describe the topic
bin/kafka-topics.sh \
  --describe --bootstrap-server "$ENDPOINT" \
  --command-config bin/client.properties \
  --topic "$TOPIC"

# publish to the topic (--bootstrap-server replaces the deprecated --broker-list)
bin/kafka-console-producer.sh \
  --bootstrap-server "$ENDPOINT" \
  --producer.config bin/client.properties \
  --topic "$TOPIC"

# subscribe to the topic, starting from the earliest offset
bin/kafka-console-consumer.sh \
  --bootstrap-server "$ENDPOINT" \
  --consumer.config bin/client.properties \
  --topic "$TOPIC" \
  --from-beginning
## Python Client
Install the dependencies:
confluent-kafka==2.2.0kafka-python==2.0.2aws-msk-iam-sasl-signer-python
First option: create a producer and a consumer with the Confluent Kafka client (`confluent-kafka`).
# haimtran 20/07/2023
# Demo producer/consumer for MSK Serverless using the confluent-kafka client
# with IAM authentication (SASL/OAUTHBEARER tokens from aws-msk-iam-sasl-signer).
import socket
import datetime
import random
import time
import json
from confluent_kafka import Producer, Consumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# bootstrap server(s) of the cluster with IAM auth (fill in before running)
BOOTSTRAP_SERVERS = ""
# AWS region of the cluster; must match the region the cluster lives in
REGION = "us-east-1"
# topic name
TOPIC = "sensor-topic"


def oauth_cb(oauth_config=None):
    """OAUTHBEARER callback: sign and return a fresh MSK IAM auth token.

    confluent-kafka expects ``(token, expiry_in_seconds_since_epoch)``, while
    the signer returns the expiry in milliseconds, hence the division.
    The token is a credential, so it is intentionally not printed.
    """
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(REGION)
    return auth_token, expiry_ms / 1000


def delivery_report(error, message):
    """Per-message delivery callback, served from producer.poll()/flush().

    Prints the failure reason if delivery failed, otherwise where the message landed.
    """
    if error is not None:
        print(f"delivery failed: {error}")
    else:
        print(
            f"delivered to {message.topic()} "
            f"[{message.partition()}] @ offset {message.offset()}"
        )


def _client_config():
    """Connection and IAM-auth settings shared by the producer and the consumer."""
    return {
        "bootstrap.servers": BOOTSTRAP_SERVERS,
        "client.id": socket.gethostname(),
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "OAUTHBEARER",
        "oauth_cb": oauth_cb,
    }


def _make_event():
    """Build one random ticker event, JSON-encoded as UTF-8 bytes."""
    event = {
        "event_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
        "ticker": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
        "price": round(random.random() * 100, 2),
    }
    return json.dumps(event, indent=2).encode("utf-8")


def get_data():
    """Consume TOPIC from the earliest offset and print each message value."""
    consumer = Consumer({
        **_client_config(),
        "group.id": "mygroup",
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe([TOPIC])
    try:
        while True:
            message = consumer.poll(5)
            if message is None:
                continue
            if message.error():
                print(f"consumer error: {message.error()}")
                continue
            print(message.value())
    finally:
        # always release the consumer (and leave the group) on exit, incl. Ctrl+C
        consumer.close()


def send_data():
    """Produce one random event to TOPIC roughly every second until interrupted."""
    producer = Producer(_client_config())
    while True:
        try:
            producer.produce(TOPIC, _make_event(), callback=delivery_report)
            # flush per message so delivery_report fires immediately (demo, not throughput)
            producer.flush()
        except Exception as error:
            # `Exception` (not a bare except) so Ctrl+C still stops the loop
            print(f"not able to send message: {error}")
        time.sleep(1)


if __name__ == "__main__":
    send_data()
    # get_data()
Second option: create a producer with `kafka-python`.
# Interactive producer for MSK Serverless using kafka-python with IAM auth
# (SASL/OAUTHBEARER tokens from aws-msk-iam-sasl-signer).
from kafka import KafkaProducer
from kafka.errors import KafkaError
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# bootstrap server(s) of the cluster with IAM auth (fill in before running)
BOOTSTRAP_SERVERS = ""
# AWS region of the cluster; must match the region the cluster lives in
REGION = "us-east-1"
TOPIC = "sensor-topic"


class MSKTokenProvider():
    """Token provider passed to kafka-python as ``sasl_oauth_token_provider``.

    Each ``token()`` call signs a fresh MSK IAM auth token for REGION; the
    signer's expiry value is not needed by kafka-python and is discarded.
    """

    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(REGION)
        return token


tp = MSKTokenProvider()
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
)

try:
    while True:
        try:
            inp = input("Hello me")
        except EOFError:
            # stdin closed: stop instead of spinning on the same error forever
            break
        try:
            producer.send(TOPIC, inp.encode())
            producer.flush()
            print("Produced!")
        except Exception as e:
            print("Failed to send message:", e)
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    # previously unreachable after the infinite loop; now always runs
    producer.close()