Introduction#

This note shows how to use Glue Interactive Sessions with a local notebook to develop ETL jobs

  • Set up the Glue kernels for local development
  • Set up a role for the Glue notebook
  • Read data from S3 into a DataFrame
  • Write data to S3 and the Glue catalog

Setup Kernel#

  • Create a Python virtual environment
  • Install the Glue Spark kernels
  • Set up the glue_role_arn

It is good practice to set up a virtual environment

python3 -m venv env

Then, install Jupyter, Boto3 and the Glue interactive sessions kernels using pip

python -m pip install --user --upgrade jupyter boto3 aws-glue-sessions

Check kernel list

jupyter-kernelspec list

Install Glue PySpark kernel

cd env/lib/python3.8/site-packages/aws_glue_interactive_sessions_kernel
jupyter-kernelspec install glue_pyspark

Install Glue Scala kernel

jupyter-kernelspec install glue_spark

Then check the result

jupyter-kernelspec list

If VS Code does not see the Glue PySpark kernel, we need to update the kernel.json at

/home/ubuntu/.local/share/jupyter/kernels/glue_pyspark/kernel.json

and set the full path to the Python interpreter in kernel.json

{
  "argv": [
    "/usr/bin/python3",
    "-m",
    "aws_glue_interactive_sessions_kernel.glue_pyspark.GlueKernel",
    "-f",
    "{connection_file}"
  ],
  "display_name": "Glue PySpark",
  "language": "python"
}

Role for Glue#

A Glue ETL job or notebook usually needs to

  • Read data from S3
  • Transform the data and write the transformed data to S3
  • Create tables in the Glue catalog

To let the local VS Code notebook run a Glue interactive session, we need two roles

  • glue_role_arn, which Glue assumes when running the ETL job (set as shown below)
  • An IAM role or IAM user for the AWS CLI, as usual
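
The interactive sessions kernel reads glue_role_arn from the AWS config file, or it can be passed per session with the %iam_role magic. A minimal ~/.aws/config might look like the following sketch; the account ID and role name are placeholders

[default]
region = ap-southeast-1
glue_role_arn = arn:aws:iam::123456789012:role/RoleForGlueNotebook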

Then there are two options to grant data permissions to the Glue notebook role

  • Use IAM
  • Use Lake Formation (a sketch follows the IAM example below)

First option, set up an IAM role for the Glue notebook using CDK

const role = new aws_iam.Role(this, "RoleForGlueNotebook", {
  roleName: "RoleForGlueNotebook",
  assumedBy: new aws_iam.ServicePrincipal("glue.amazonaws.com"),
});
role.addManagedPolicy(
  aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
    "service-role/AWSGlueServiceRole"
  )
);
const policy = new aws_iam.Policy(
  this,
  "LeastPrivilegePolicyForGlueNotebookRole",
  {
    policyName: "LeastPrivilegePolicyForGlueNotebookRole",
    statements: [
      // pass the notebook role to Glue
      new aws_iam.PolicyStatement({
        actions: ["iam:PassRole", "iam:GetRole"],
        effect: Effect.ALLOW,
        resources: ["*"],
      }),
      // athena
      new aws_iam.PolicyStatement({
        actions: ["athena:*"],
        effect: Effect.ALLOW,
        resources: ["*"],
      }),
      // access the source and Athena result buckets in S3
      new aws_iam.PolicyStatement({
        actions: ["s3:*"],
        effect: Effect.ALLOW,
        resources: [
          props.athenaResultBucketArn,
          `${props.athenaResultBucketArn}/*`,
          props.sourceBucketArn,
          `${props.sourceBucketArn}/*`,
        ],
      }),
      // access the glue catalog
      new aws_iam.PolicyStatement({
        actions: ["glue:*"],
        effect: Effect.ALLOW,
        resources: [
          `arn:aws:glue:${this.region}:*:table/${props.databaseName}/*`,
          `arn:aws:glue:${this.region}:*:database/${props.databaseName}`,
          `arn:aws:glue:${this.region}:*:catalog`,
        ],
      }),
    ],
  }
);
policy.attachToRole(role);
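
Second option, grant the database and table permissions to the notebook role through Lake Formation instead of IAM resource policies. A minimal sketch using Boto3, assuming Lake Formation already governs the default database; the role ARN is a placeholder

import boto3
# lake formation client
lakeformation = boto3.client("lakeformation", region_name="ap-southeast-1")
# grant the notebook role permissions on the Glue database
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/RoleForGlueNotebook"},
    Resource={"Database": {"Name": "default"}},
    Permissions=["CREATE_TABLE", "DESCRIBE"],
)
# grant the notebook role permissions on all tables in the database
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/RoleForGlueNotebook"},
    Resource={"Table": {"DatabaseName": "default", "TableWildcard": {}}},
    Permissions=["SELECT", "INSERT", "ALTER", "DESCRIBE"],
)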

Magics#

We use cell magics to set the idle timeout, Glue version, number of workers, etc. For example

%additional_python_modules matplotlib, numpy, pandas
%idle_timeout 15
%glue_version 3.0
%number_of_workers 5
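
If glue_role_arn has not been set in ~/.aws/config, the role can also be passed with the %iam_role magic; the ARN below is a placeholder

%iam_role arn:aws:iam::123456789012:role/RoleForGlueNotebook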

Please do not forget to stop the interactive session to save cost; otherwise it stays alive until the idle timeout expires (up to 48 hours by default)

%stop_session

Glue Context#

Let us import awsglue and pyspark to create the Glue and Spark contexts

from awsglue.context import GlueContext
from pyspark.context import SparkContext
# create spark and glue context
sparkContext = SparkContext.getOrCreate()
glueContext = GlueContext(sparkContext)
# create spark session
spark = glueContext.spark_session
# create spark session another way
from pyspark.sql import SparkSession
spark2 = SparkSession.builder.appName("Test").enableHiveSupport().getOrCreate()
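
Running the first cell starts the interactive session, which can take a minute. As a quick sanity check that the session is up, we can print the Spark version

# confirm the session responds by printing the Spark version
print(spark.version)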

Read Data#

There are different ways to read data from S3

  • Glue DynamicFrame reading data from S3
  • Glue DynamicFrame reading from an existing catalog table
  • Spark DataFrame reading from S3

First, let us read data from S3 using a Glue DynamicFrame

df_glue = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": "\t",
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://amazon-reviews-pds/tsv/"],
        "recurse": True,
    },
    transformation_ctx="df_glue",
)
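
To quickly inspect what was read, the DynamicFrame can be converted to a Spark DataFrame (the column names come from the TSV header)

# convert the Glue DynamicFrame to a Spark DataFrame for inspection
df_spark = df_glue.toDF()
df_spark.printSchema()
df_spark.show(5)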

Second, let us read data from S3 using a Spark DataFrame. Note that it is possible to enforce a schema

from pyspark.sql.types import StructType, StringType, IntegerType
schema = StructType() \
    .add("marketplace", StringType(), True) \
    .add("customer_id", StringType(), True) \
    .add("review_id", StringType(), True) \
    .add("product_id", StringType(), True) \
    .add("product_parent", StringType(), True) \
    .add("product_title", StringType(), True) \
    .add("product_category", StringType(), True) \
    .add("star_rating", StringType(), True) \
    .add("helpful_vote", StringType(), True) \
    .add("total_vote", StringType(), True) \
    .add("vine", StringType(), True) \
    .add("verified_purchase", StringType(), True) \
    .add("review_headline", StringType(), True) \
    .add("review_body", StringType(), True) \
    .add("myyear", StringType(), True)

Then pass the schema to spark.read

df = spark.read.format("csv") \
    .option("header", False) \
    .option("delimiter", "\t") \
    .option("quote", '"') \
    .schema(schema) \
    .load("s3://amazon-reviews-pds/tsv/")

Third, assuming there is already an existing catalog table, let us read it using a Glue DynamicFrame

df2 = glueContext.create_dynamic_frame.from_catalog(
    database="default",
    table_name=table_name,
)
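
If the catalog table is partitioned, Glue can prune partitions at read time with a push-down predicate. A sketch, assuming a hypothetical partition column product_category

# read only selected partitions (the partition column here is hypothetical)
df3 = glueContext.create_dynamic_frame.from_catalog(
    database="default",
    table_name=table_name,
    push_down_predicate="product_category = 'Books'",
)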

Write Data#

There are different ways to write data to S3 and the Glue catalog

  • Glue DynamicFrame writing to an existing catalog table
  • Glue DynamicFrame writing only data to S3
  • Spark DataFrame writing only data to S3

Let us create a catalog table using Boto3

import boto3
# create glue client
client = boto3.client("glue", region_name="ap-southeast-1")
# columns of the table, all stored as string in this example
column_names = [
    "marketplace",
    "customer_id",
    "review_id",
    "product_id",
    "product_parent",
    "product_title",
    "product_category",
    "star_rating",
    "helpful_vote",
    "total_vote",
    "vine",
    "verified_purchase",
    "review_headline",
    "review_body",
    "myyear",
]
# create the table in the Glue catalog
resp = client.create_table(
    CatalogId=catalog_id,
    DatabaseName=database_name,
    TableInput={
        "Name": table_name,
        "Description": "test",
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {"classification": "parquet"},
        "StorageDescriptor": {
            "Columns": [{"Name": name, "Type": "string"} for name in column_names],
            "Location": "s3://{0}/{1}/".format(lake_bucket_name, table_name),
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "Name": "ParquetHiveSerDe",
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
            },
            "Compressed": False,
        },
    },
)
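
We can check that the table was registered by reading it back from the catalog

# verify the table now exists in the Glue catalog
resp = client.get_table(DatabaseName=database_name, Name=table_name)
print(resp["Table"]["StorageDescriptor"]["Location"])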

First option, we can use Glue DynamicFrame to write data to the catalog table

glueContext.write_dynamic_frame.from_catalog(
    frame=df_glue,
    database="default",
    table_name=table_name,
    transformation_ctx="S3bucket_node3",
)

Second option, we can use Glue DynamicFrame to write data to S3

S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=df_glue,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://{0}/glue-dynamicframe-parquet/".format(lake_bucket_name),
        # "partitionKeys": ["product_category"],
        # "enableUpdateCatalog": True,
        # "database": "default",
        # "table": "amazon_reviews_parquet_table",
    },
    format_options={"compression": "uncompressed"},
    transformation_ctx="S3bucket_node3",
)

Third option, use Spark DataFrame to write data to S3

df.write.parquet(f"s3://{lake_bucket_name}/spark-dataframe/", mode='overwrite')
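
Spark can also write partitioned Parquet; the S3 prefix below is an assumption

# write Parquet partitioned by product_category
df.write.partitionBy("product_category").parquet(
    f"s3://{lake_bucket_name}/spark-dataframe-partitioned/", mode="overwrite"
)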

Finally, there is another option to write to S3 and update the Glue catalog at the same time using glueContext.getSink

sink = glueContext.getSink(
    path="s3://{0}/amazon-review-tsv-parquet/".format(data_lake_bucket),
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    # compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="write_sink",
)
sink.setCatalogInfo(
    catalogDatabase="default",
    catalogTableName="amazon_review_tsv_parquet",
)
sink.setFormat("glueparquet")
sink.writeFrame(df)
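
Note that writeFrame expects a Glue DynamicFrame; if we start from the Spark DataFrame df as above, convert it first

from awsglue.dynamicframe import DynamicFrame
# convert the Spark DataFrame to a Glue DynamicFrame before writing to the sink
dynamic_df = DynamicFrame.fromDF(df, glueContext, "dynamic_df")
sink.writeFrame(dynamic_df)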

Reference#