Introduction#
This note shows how to use Glue Interactive Sessions with a local notebook to develop ETL jobs:
- Set up the Glue kernels for local development
- Set up a role for the Glue notebook
- Read data from S3 into a DataFrame
- Write data to S3 and the Glue catalog
Setup Kernel#
- Create a virtual environment with Python
- Install the Glue Spark kernels
- Set up the glue_role_arn
It is good practice to set up a virtual environment first
python3 -m venv env
Then, install Jupyter, Boto3, and the Glue interactive sessions package using pip
python -m pip install --user --upgrade jupyter boto3 aws-glue-sessions
Check kernel list
jupyter-kernelspec list
Install the Glue PySpark kernel
cd env/lib/python3.8/site-packages/aws_glue_interactive_sessions_kernel
jupyter-kernelspec install glue_pyspark
Install the Glue Spark (Scala) kernel
jupyter-kernelspec install glue_spark
Then check the result
jupyter-kernelspec list
If VS Code does not see the Glue PySpark kernel, we need to update kernel.json at
/home/ubuntu/.local/share/jupyter/kernels/glue_pyspark/kernel.json
with the full path to the Python interpreter:
{"argv": ["/usr/bin/python3","-m","aws_glue_interactive_sessions_kernel.glue_pyspark.GlueKernel","-f","{connection_file}"],"display_name": "Glue PySpark","language": "python"}
Role for Glue#
A Glue ETL job or notebook usually needs to:
- Read data from S3
- Run the ETL transformations and write the transformed data to S3
- Create tables in the Glue catalog
To enable local VS Code or a notebook to run a Glue session, we need two roles:
- glue_role_arn, which Glue assumes when running the ETL job (it can be passed to the session as shown in the sketch right after this list)
- An IAM role or IAM user for the AWS CLI, as normal
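A quick way to tell the local kernel which role to assume is the %iam_role magic; alternatively, glue_role_arn can be set in the AWS CLI profile. The account ID below is a placeholder, and the role name matches the RoleForGlueNotebook created in the CDK example later in this section.
%iam_role arn:aws:iam::111122223333:role/RoleForGlueNotebook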
There are two options to grant permissions to the Glue role for the notebook:
- Use IAM
- Use Lake Formation (a sketch follows the IAM example below)
First option: set up an IAM role for the Glue notebook using CDK
const role = new aws_iam.Role(this, "RoleForGlueNotebook", {
  roleName: "RoleForGlueNotebook",
  assumedBy: new aws_iam.ServicePrincipal("glue.amazonaws.com"),
});
role.addManagedPolicy(
  aws_iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole")
);
const policy = new aws_iam.Policy(
  this,
  "LeastPriviledgePolicyForGlueNotebookRole",
  {
    policyName: "LeastPriviledgePolicyForGlueNotebookRole",
    statements: [
      // pass iam role
      new aws_iam.PolicyStatement({
        actions: ["iam:PassRole", "iam:GetRole"],
        effect: Effect.ALLOW,
        resources: ["*"],
      }),
      // athena
      new aws_iam.PolicyStatement({
        actions: ["athena:*"],
        effect: Effect.ALLOW,
        resources: ["*"],
      }),
      // access s3
      new aws_iam.PolicyStatement({
        actions: ["s3:*"],
        effect: Effect.ALLOW,
        resources: [
          props.athenaResultBucketArn,
          `${props.athenaResultBucketArn}/*`,
          props.sourceBucketArn,
          `${props.sourceBucketArn}/*`,
        ],
      }),
      // access glue catalog
      new aws_iam.PolicyStatement({
        actions: ["glue:*"],
        effect: Effect.ALLOW,
        resources: [
          `arn:aws:glue:${this.region}:*:table/${props.databaseName}/*`,
          `arn:aws:glue:${this.region}:*:database/${props.databaseName}`,
          `arn:aws:glue:${this.region}:*:catalog`,
        ],
      }),
    ],
  }
);
policy.attachToRole(role);
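Second option: instead of the broad glue:* statement above, catalog permissions can be granted through Lake Formation. Below is a minimal Boto3 sketch, not a complete setup; the principal ARN, database, and table names are assumptions for illustration, and a real configuration also needs Lake Formation data location permissions on the S3 path.
import boto3

# Lake Formation client in the same region as the Glue catalog
lakeformation = boto3.client("lakeformation", region_name="ap-southeast-1")

# grant the notebook role SELECT on an assumed catalog table
lakeformation.grant_permissions(
    Principal={
        "DataLakePrincipalIdentifier": "arn:aws:iam::111122223333:role/RoleForGlueNotebook"
    },
    Resource={
        "Table": {
            "DatabaseName": "default",
            "Name": "amazon_review_tsv_parquet",
        }
    },
    Permissions=["SELECT"],
)
With Lake Formation in place, the IAM policy for the role can stay minimal and table-level access is managed centrally.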
Magics#
We use cell magics to set up the idle timeout, Glue version, number of workers, etc. For example:
%additional_python_modules matplotlib, numpy, pandas
%idle_timeout 15
%glue_version 3.0
%number_of_workers 5
Please do not forget to stop the interactive session to save cost; otherwise it will stay around for up to 48 hours (the default timeout)
%stop_session
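To check for sessions that are still running (for example from other notebooks) before they accrue cost, the %list_sessions magic lists the sessions for the current profile and region:
%list_sessions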
Glue Context#
Let's import awsglue and pyspark to create the Glue and Spark contexts
from awsglue.context import GlueContext
from pyspark.context import SparkContext

# create spark and glue context
sparkContext = SparkContext.getOrCreate()
glueContext = GlueContext(sparkContext)

# create spark session
spark = glueContext.spark_session

# create spark session another way
from pyspark.sql import SparkSession

spark2 = SparkSession.builder.appName("Test").enableHiveSupport().getOrCreate()
Read Data#
There are different ways to read data from S3:
- Glue DynamicFrame reading directly from S3
- Glue DynamicFrame reading from an existing catalog table
- Spark DataFrame reading from S3
First, let's read data from S3 using a Glue DynamicFrame
df_glue = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": "\t",
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://amazon-reviews-pds/tsv/"],
        "recurse": True,
    },
    transformation_ctx="df_glue",
)
Second, let's read data from S3 using a Spark DataFrame. Note that it is possible to enforce a schema
from pyspark.sql.types import StructType, StringType, IntegerType

schema = StructType() \
    .add("marketplace", StringType(), True) \
    .add("customer_id", StringType(), True) \
    .add("review_id", StringType(), True) \
    .add("product_id", StringType(), True) \
    .add("product_parent", StringType(), True) \
    .add("product_title", StringType(), True) \
    .add("product_category", StringType(), True) \
    .add("star_rating", StringType(), True) \
    .add("helpful_vote", StringType(), True) \
    .add("total_vote", StringType(), True) \
    .add("vine", StringType(), True) \
    .add("verified_purchase", StringType(), True) \
    .add("review_headline", StringType(), True) \
    .add("review_body", StringType(), True) \
    .add("myyear", StringType(), True)
Then pass the schema into spark.read
df = spark.read.format("csv") \
    .option("header", False) \
    .option("delimiter", "\t") \
    .option("quote", '"') \
    .schema(schema) \
    .load("s3://amazon-reviews-pds/tsv/")
Third, assuming there is already an existing catalog table, let's read it using a Glue DynamicFrame
df2 = glueContext.create_dynamic_frame.from_catalog(
    database="default",
    table_name=table_name,
)
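To quickly inspect what was read, a DynamicFrame can be converted to a Spark DataFrame with toDF(); a small usage sketch, reusing df_glue from the first example:
# convert the DynamicFrame to a Spark DataFrame for ad-hoc inspection
df_inspect = df_glue.toDF()
df_inspect.printSchema()
df_inspect.show(5)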
Write Data#
There are different ways to write data to S3 and the catalog table:
- Glue DynamicFrame writing to an existing catalog table
- Glue DynamicFrame writing only data to S3
- Spark DataFrame writing only data to S3
Let's first create a catalog table using Boto3
import boto3

# create glue client
client = boto3.client(region_name="ap-southeast-1", service_name="glue")

# create table in catalog
resp = client.create_table(
    CatalogId=catalog_id,
    DatabaseName=database_name,
    TableInput={
        "Name": table_name,
        "Description": "test",
        "TableType": "EXTERNAL",
        "Parameters": {"classification": "parquet"},
        "StorageDescriptor": {
            "Columns": [
                {"Name": "marketplace", "Type": "string"},
                {"Name": "customer_id", "Type": "string"},
                {"Name": "review_id", "Type": "string"},
                {"Name": "product_id", "Type": "string"},
                {"Name": "product_parent", "Type": "string"},
                {"Name": "product_title", "Type": "string"},
                {"Name": "product_category", "Type": "string"},
                {"Name": "star_rating", "Type": "string"},
                {"Name": "helpful_vote", "Type": "string"},
                {"Name": "total_vote", "Type": "string"},
                {"Name": "vine", "Type": "string"},
                {"Name": "verified_purchase", "Type": "string"},
                {"Name": "review_headline", "Type": "string"},
                {"Name": "review_body", "Type": "string"},
                {"Name": "myyear", "Type": "string"},
            ],
            "Location": "s3://{0}/{1}/".format(lake_bucket_name, table_name),
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "Name": "ParquetHiveSerDe",
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
            },
            "Compressed": False,
        },
    },
)
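The write examples below use a DynamicFrame called glue_df. If only the Spark DataFrame df from the read section exists, one way to obtain it (an assumption, not part of the original snippet) is DynamicFrame.fromDF:
from awsglue.dynamicframe import DynamicFrame

# wrap the Spark DataFrame df as a Glue DynamicFrame for the writers below
glue_df = DynamicFrame.fromDF(df, glueContext, "glue_df")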
First option: use a Glue DynamicFrame to write data to the catalog table
glueContext.write_dynamic_frame.from_catalog(
    frame=glue_df,
    database="default",
    table_name=table_name,
    transformation_ctx="S3bucket_node3",
)
Second option: use a Glue DynamicFrame to write only the data to S3
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=glue_df,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://{0}/glue-dynamicframe-parquet/".format(lake_bucket_name),
        # "partitionKeys": ["product_category"],
        # "enableUpdateCatalog": True,
        # "database": "default",
        # "table": "amazon_reviews_parquet_table",
    },
    format_options={"compression": "uncompressed"},
    transformation_ctx="S3bucket_node3",
)
Third option: use a Spark DataFrame to write the data to S3
df.write.parquet(f"s3://{lake_bucket_name}/spark-dataframe/", mode='overwrite')
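If partitioned output is wanted here as well (mirroring the commented-out partitionKeys in the DynamicFrame example above), the Spark writer supports partitionBy; a sketch using the product_category column and an assumed output prefix:
# write Parquet partitioned by product_category
df.write.partitionBy("product_category").mode("overwrite").parquet(
    f"s3://{lake_bucket_name}/spark-dataframe-partitioned/"
)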
Finally, there is another option: write to S3 with glueContext.getSink, which can also update the Glue catalog
sink = glueContext.getSink(
    path="s3://{0}/amazon-review-tsv-parquet/".format(data_lake_bucket),
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    # compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="write_sink",
)
sink.setCatalogInfo(
    catalogDatabase="default",
    catalogTableName="amazon_review_tsv_parquet",
)
sink.setFormat("glueparquet")
sink.writeFrame(df)