Introduction#
This GitHub repository shows a basic architecture and examples with Amazon MSK and Kinesis Data Analytics for Apache Flink:
- Set up an S3 VPC endpoint and a Glue VPC endpoint for MSK (see the boto3 sketch after this list)
- Create an MSK cluster
- Create a client and pub/sub to a topic
- Update the cluster configuration
- Create a Zeppelin notebook
- Do some simple streaming analytics
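For the first step, here is a minimal boto3 sketch, assuming placeholder values for the VPC, route table, subnets, and security group (these IDs are not from this repo):

```python
import boto3

# assumed placeholders - replace with your own VPC resources
REGION = "ap-southeast-1"
VPC_ID = "vpc-xxxxxxxx"
ROUTE_TABLE_ID = "rtb-xxxxxxxx"
SUBNET_IDS = ["subnet-aaaa", "subnet-bbbb"]
SECURITY_GROUP_IDS = ["sg-xxxxxxxx"]

ec2 = boto3.client("ec2", region_name=REGION)

# gateway endpoint so the Flink sink can write to S3 from private subnets
ec2.create_vpc_endpoint(
    VpcEndpointType="Gateway",
    VpcId=VPC_ID,
    ServiceName=f"com.amazonaws.{REGION}.s3",
    RouteTableIds=[ROUTE_TABLE_ID],
)

# interface endpoint so the notebook can reach the Glue catalog without a NAT
ec2.create_vpc_endpoint(
    VpcEndpointType="Interface",
    VpcId=VPC_ID,
    ServiceName=f"com.amazonaws.{REGION}.glue",
    SubnetIds=SUBNET_IDS,
    SecurityGroupIds=SECURITY_GROUP_IDS,
    PrivateDnsEnabled=True,
)
```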
Setup Client#
Set up permissions for the client with the following policy; the same policy applies to both
- the EC2 instance used for the demo
- the Zeppelin notebook
{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Action": ["kafka-cluster:Connect","kafka-cluster:AlterCluster","kafka-cluster:DescribeCluster"],"Resource": ["arn:aws:kafka:ap-southeast-1:1111222233334444:cluster/*"]},{"Effect": "Allow","Action": ["kafka-cluster:*Topic*","kafka-cluster:WriteData","kafka-cluster:ReadData"],"Resource": ["arn:aws:kafka:ap-southeast-1:1111222233334444:topic/*"]},{"Effect": "Allow","Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],"Resource": ["arn:aws:kafka:ap-southeast-1:1111222233334444:group/*"]}]}
Describe a cluster
```bash
export CLUSTER_ARN=
aws kafka describe-cluster --cluster-arn $CLUSTER_ARN
```
Install Java on the EC2 instance
```bash
sudo yum -y install java-11
```
then download the Kafka client tools
```bash
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
```
Download the MSK IAM authentication library
```bash
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
```
Create the client.properties file with the content below
```conf
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```
Then put it all together in the user data.
UserData#
Use this user data to configure an EC2 instance that connects to the MSK cluster
```bash
#!/bin/bash
sudo yum -y install java-11
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
tar -xvf kafka_2.13-2.8.1.tgz
cd kafka_2.13-2.8.1/libs/
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
cd ../bin/
echo 'security.protocol=SASL_SSL' >> client.properties
echo 'sasl.mechanism=AWS_MSK_IAM' >> client.properties
echo 'sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;' >> client.properties
echo 'sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler' >> client.properties
chmod 700 client.properties
```
Pub and Sub#
Check the MSK client information to get the broker endpoints, then run the command below to create a topic. First export the list of IAM bootstrap brokers.
export ENDPOINT="IAM_BOOTSTRAP_BROKERS_GO_HERE"
bin/kafka-topics.sh \--create --bootstrap-server $ENDPOINT \--command-config bin/client.properties \--replication-factor 3 --partitions 3 \--topic sensor-topic
List topics
```bash
bin/kafka-topics.sh \
  --list --bootstrap-server $ENDPOINT \
  --command-config bin/client.properties
```
Describe a topic
```bash
bin/kafka-topics.sh \
  --describe --bootstrap-server $ENDPOINT \
  --command-config bin/client.properties \
  --topic sensor-topic
```
Send messages using a console producer
```bash
bin/kafka-console-producer.sh --broker-list \
  $ENDPOINT \
  --producer.config bin/client.properties \
  --topic sensor-topic
```
Receive messages using a console consumer
```bash
bin/kafka-console-consumer.sh \
  --bootstrap-server $ENDPOINT \
  --consumer.config bin/client.properties \
  --topic sensor-topic \
  --from-beginning
```
Producer Python#
Please update the MSK cluster security settings first; for quick testing, you can allow unauthenticated access.
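If you want to script that change, here is a hedged boto3 sketch using the MSK UpdateSecurity API (the cluster ARN is a placeholder, and the exact ClientAuthentication shape should be checked against the current MSK API docs):

```python
import boto3

kafka = boto3.client("kafka", region_name="ap-southeast-1")

# placeholder - use your own cluster ARN
CLUSTER_ARN = "CLUSTER_ARN_GOES_HERE"

# the current cluster version is required by UpdateSecurity
current_version = kafka.describe_cluster(
    ClusterArn=CLUSTER_ARN
)["ClusterInfo"]["CurrentVersion"]

# keep IAM auth enabled and additionally allow unauthenticated (TLS) clients
kafka.update_security(
    ClusterArn=CLUSTER_ARN,
    CurrentVersion=current_version,
    ClientAuthentication={
        "Sasl": {"Iam": {"Enabled": True}},
        "Unauthenticated": {"Enabled": True},
    },
)
```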
Install the Python Kafka client
```bash
pip install confluent-kafka
```
then create a producer; please use the SSL bootstrap brokers (SSL_BOOTSTRAP_CLUSTER).
```python
import datetime
import json
import random
import time

from confluent_kafka import Producer

# bootstrap servers (SSL listener)
BOOTSTRAP_SERVERS = "SSL_BOOTSTRAP_CLUSTER"
# topic name
TOPIC = "stock-topic"


# delivery callback
def delivery_report(error, message):
    """Report whether a message was delivered or failed."""
    if error is not None:
        print(f"Delivery failed: {error}")
    else:
        print(f"Delivered to {message.topic()} [{message.partition()}]")


# producer
producer = Producer({
    'bootstrap.servers': BOOTSTRAP_SERVERS,
    'security.protocol': 'SSL'
})

# send events to the topic
while True:
    # create an event
    event = {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)
    }
    # convert dict to bytes
    data = json.dumps(event, indent=2).encode("utf-8")
    # send the event to the topic
    producer.produce(TOPIC, data, callback=delivery_report)
    producer.poll(0)
    time.sleep(1)
```
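To verify that events actually land on the topic, here is a matching consumer sketch with confluent_kafka; it uses the same SSL bootstrap placeholder, and the group id is an arbitrary choice for this demo:

```python
import json
from confluent_kafka import Consumer

BOOTSTRAP_SERVERS = "SSL_BOOTSTRAP_CLUSTER"
TOPIC = "stock-topic"

# consumer with an arbitrary group id, reading from the beginning of the topic
consumer = Consumer({
    'bootstrap.servers': BOOTSTRAP_SERVERS,
    'security.protocol': 'SSL',
    'group.id': 'stock-demo-consumer',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe([TOPIC])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # decode the JSON event produced above
        event = json.loads(msg.value().decode("utf-8"))
        print(event)
finally:
    consumer.close()
```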
Notebook#
Please update the VPC configuration of the notebook first so it can access the MSK cluster inside the VPC. Then create a table that connects to the Kafka topic stream; please use the IAM bootstrap brokers. In the producer, please use the correct datetime format for event_time:
```python
'event_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
```
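The VPC update mentioned above can also be scripted. Here is a hedged boto3 sketch against the Kinesis Analytics application that backs the Studio notebook; the application name, subnets, and security groups are assumed placeholders:

```python
import boto3

kda = boto3.client("kinesisanalyticsv2", region_name="ap-southeast-1")

# placeholder - the application name behind the Studio notebook
APP_NAME = "studio-notebook-app"

# the current application version id is required for the update
desc = kda.describe_application(ApplicationName=APP_NAME)
version_id = desc["ApplicationDetail"]["ApplicationVersionId"]

# attach a VPC configuration so the notebook can reach the MSK brokers
kda.add_application_vpc_configuration(
    ApplicationName=APP_NAME,
    CurrentApplicationVersionId=version_id,
    VpcConfiguration={
        "SubnetIds": ["subnet-aaaa", "subnet-bbbb"],  # placeholders
        "SecurityGroupIds": ["sg-xxxxxxxx"],          # placeholder
    },
)
```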
Create the table that connects to the Kafka topic
```sql
%flink.ssql(type=update)
DROP TABLE IF EXISTS stock_stream;
CREATE TABLE stock_stream (
  ticker STRING,
  price DOUBLE,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'stock-topic',
  'properties.bootstrap.servers' = 'b-2.democluster2.vidd98.c3.kafka.ap-southeast-1.amazonaws.com:9098,b-1.democluster2.vidd98.c3.kafka.ap-southeast-1.amazonaws.com:9098,b-3.democluster2.vidd98.c3.kafka.ap-southeast-1.amazonaws.com:9098',
  'properties.group.id' = 'KdaStudioGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler'
);
```
Run a simple query
```sql
%flink.ssql(type=update)
SELECT * FROM stock_stream
```
Create a sink table for writing to S3
```sql
%flink.ssql(type=update)
CREATE TABLE stock_output_table (
  ticker STRING,
  price DOUBLE,
  event_time TIMESTAMP(3)
) PARTITIONED BY (ticker) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://data-lake-stream-20072023/kafka-data/',
  'format' = 'csv',
  'sink.partition-commit.policy.kind' = 'success-file',
  'sink.partition-commit.delay' = '1 min'
);
```
Enable checkpointing
```python
%flink.pyflink
st_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "1min")
st_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
```
Insert data into the sink table. Because the application runs inside the VPC to reach MSK, it needs the S3 VPC endpoint in order to write to S3.
```sql
%flink.ssql(type=update)
INSERT INTO stock_output_table
SELECT
  ticker,
  price,
  event_time
FROM stock_stream
```
Similarly, we can write in JSON format with another sink table
```sql
%flink.ssql(type=update)
CREATE TABLE stock_output_table_json (
  ticker STRING,
  price DOUBLE,
  event_time TIMESTAMP(3)
) PARTITIONED BY (ticker) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://data-lake-stream-20072023/kafka-data-json/',
  'format' = 'json',
  'sink.rolling-policy.rollover-interval' = '60s',
  'sink.rolling-policy.check-interval' = '30s'
);
```
Then insert data into the JSON table
```sql
%flink.ssql(type=update)
INSERT INTO stock_output_table_json
SELECT
  ticker,
  price,
  event_time
FROM stock_stream
```
Policy for Client#
A permissive policy for quick testing (resources set to *)
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": ["*"]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData"
      ],
      "Resource": ["*"]
    },
    {
      "Effect": "Allow",
      "Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],
      "Resource": ["*"]
    }
  ]
}
```
and policy statements to access Kafka and Glue
```json
{
  "Sid": "ReadGlue",
  "Effect": "Allow",
  "Action": ["glue:*"],
  "Resource": ["*"]
},
{
  "Sid": "ReadKafka",
  "Effect": "Allow",
  "Action": ["kafka:*"],
  "Resource": ["*"]
},
{
  "Sid": "AccessMSK",
  "Effect": "Allow",
  "Action": ["kafka-cluster:*"],
  "Resource": ["*"]
},
```
Please double-check the following managed policies attached to the notebook role:
- CloudWatchFullAccess
- CloudWatchLogsFullAccess
- AmazonKinesisFullAccess
- AmazonS3FullAccess
- AWSGlueServiceRole
To access an MSK cluster inside a VPC, we need to attach AmazonVPCFullAccess to the notebook role.
AmazonVPCFullAccess
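A small boto3 sketch for attaching that managed policy; the notebook role name is an assumed placeholder:

```python
import boto3

iam = boto3.client("iam")

# role assumed by the Zeppelin/Studio notebook (name is a placeholder)
NOTEBOOK_ROLE = "kda-studio-notebook-role"

# attach the AWS managed policy so the notebook can manage ENIs inside the VPC
iam.attach_role_policy(
    RoleName=NOTEBOOK_ROLE,
    PolicyArn="arn:aws:iam::aws:policy/AmazonVPCFullAccess",
)
```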
Best-practice policy scoped to resource ARNs
{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Action": ["kafka-cluster:Connect","kafka-cluster:AlterCluster","kafka-cluster:DescribeCluster"],"Resource": ["arn:aws:kafka:ap-southeast-1:111222333444:cluster/demo-cluster-1/*"]},{"Effect": "Allow","Action": ["kafka-cluster:*Topic*","kafka-cluster:WriteData","kafka-cluster:ReadData"],"Resource": ["arn:aws:kafka:ap-southeast-1:111222333444:topic/demo-cluster-1/*"]},{"Effect": "Allow","Action": ["kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup"],"Resource": ["arn:aws:kafka:ap-southeast-1:111222333444:group/demo-cluster-1/*"]}]}
Troubleshooting#
- Ensure that the notebook (inside the VPC) can access the Glue catalog via NAT or a VPC endpoint
- Update the VPC configuration of the notebook so it can access the MSK cluster inside the VPC
- Update the role of the Zeppelin notebook
- Update the security settings of MSK (e.g. allow unauthenticated access for the demo)
- Double-check the name of the topic/stream
Describe a cluster
```bash
aws kafka describe-cluster --cluster-arn "CLUSTER_ARN"
```
Troubleshooting#
> [!IMPORTANT]
> Please take note of the difference between IAM_BOOTSTRAP_BROKERS_GO_HERE and SSL_BOOTSTRAP_CLUSTER. For the demo, enable the required authentication options on the cluster and check the client connection information.
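A quick boto3 sketch to print both broker strings so you can see which one to use where; the field names come from the GetBootstrapBrokers API, and the cluster ARN is a placeholder:

```python
import boto3

kafka = boto3.client("kafka", region_name="ap-southeast-1")

brokers = kafka.get_bootstrap_brokers(ClusterArn="CLUSTER_ARN_GOES_HERE")

# SASL/IAM brokers (port 9098) - use these for the console client and the Flink table
print("IAM :", brokers.get("BootstrapBrokerStringSaslIam"))

# TLS brokers (port 9094) - use these for the SSL Python producer
print("SSL :", brokers.get("BootstrapBrokerStringTls"))
```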
Install pip
```bash
python3 -m ensurepip --upgrade
```
Here is the full connection script run from the terminal
```bash
# export endpoint - iam
export ENDPOINT=""
export TOPIC=stock-topic

# create a topic
bin/kafka-topics.sh \
  --create --bootstrap-server $ENDPOINT \
  --command-config bin/client.properties \
  --replication-factor 2 --partitions 1 \
  --topic $TOPIC

# list topics
bin/kafka-topics.sh \
  --list --bootstrap-server $ENDPOINT \
  --command-config bin/client.properties

# describe a topic
bin/kafka-topics.sh \
  --describe --bootstrap-server $ENDPOINT \
  --command-config bin/client.properties \
  --topic $TOPIC

# publish to the topic
bin/kafka-console-producer.sh --broker-list \
  $ENDPOINT \
  --producer.config bin/client.properties \
  --topic $TOPIC

# subscribe to the topic
bin/kafka-console-consumer.sh \
  --bootstrap-server $ENDPOINT \
  --consumer.config bin/client.properties \
  --topic $TOPIC \
  --from-beginning
```