Introduction#

This note shows a simple workflow using AWS Glue. Here are some basic concepts:

  • a workflow is a container for multiple Glue jobs
  • a Glue job runs your code on data processing units (DPUs) or a worker type
  • there are different job types, such as pythonshell and Spark (glueetl)
  • a trigger kicks off a job
  • a starting trigger (one per workflow) starts a workflow
  • a trigger has multiple types: ON_DEMAND, SCHEDULED (cron), and CONDITIONAL
  • a Glue job bookmark tracks data that has already been processed
(figure: glue-workflow-1)

Prepare ETL Code#

In CDK, we can use an S3 asset to upload our ETL code to an S3 bucket; the Glue job then picks up this code and runs it on workers (DPU, G.1X, G.2X).

const pyScriptPath = new Asset(this, 'py-glue-demo', {
  path: path.join(__dirname, './../script/python_demo.py')
})
const sparkScriptPath = new Asset(this, 'spark-script-demo', {
  path: path.join(__dirname, './../script/read_amazon_reviews_tsv.py')
})

Role for Glue#

The Glue job needs to access S3 to read its code and to save results to the destination bucket. When creating tables in a lake registered with Lake Formation, Lake Formation also needs to grant data location permissions to the Glue role (see the sketch after the role definition below).

const jobRole = new cdk.aws_iam.Role(this, 'RoleForGlueJobDemo', {
  roleName: 'AWSGlueServiceRole-Demo',
  assumedBy: new cdk.aws_iam.ServicePrincipal('glue.amazonaws.com')
})
jobRole.addManagedPolicy(
  cdk.aws_iam.ManagedPolicy.fromManagedPolicyArn(
    this,
    'AWSGlueServicePolicy',
    'arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole'
  )
)
jobRole.addToPolicy(
  new cdk.aws_iam.PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['logs:*'],
    resources: ['*']
  })
)
jobRole.addToPolicy(
  new cdk.aws_iam.PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['s3:*'],
    resources: ['*']
  })
)
jobRole.addToPolicy(
  new cdk.aws_iam.PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['lakeformation:GetDataAccess'],
    resources: ['*']
  })
)
jobRole.addToPolicy(
  new cdk.aws_iam.PolicyStatement({
    effect: Effect.ALLOW,
    actions: ['iam:PassRole'],
    resources: ['*']
  })
)
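
When the destination is registered with Lake Formation, the role also needs a data location grant. Below is a minimal sketch using CfnPermissions; the bucket ARN is a hypothetical placeholder for the registered location:

new cdk.aws_lakeformation.CfnPermissions(this, 'GrantDataLocationToGlueRole', {
  dataLakePrincipal: {
    // grant to the Glue job role created above
    dataLakePrincipalIdentifier: jobRole.roleArn
  },
  resource: {
    dataLocationResource: {
      // hypothetical destination bucket registered with Lake Formation
      s3Resource: 'arn:aws:s3:::my-destination-bucket'
    }
  },
  permissions: ['DATA_LOCATION_ACCESS']
})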

Pythonshell Job#

A Python shell job runs with either 0.0625 DPU or 1 DPU (4 vCPUs and 16 GB of RAM); it does not run on a Spark cluster. These jobs have a 1-minute minimum billing duration. Compatible Python versions are 2.7, 3.6, and 3.9; please check the supported libraries here

const pythonJob = new CfnJob(this, 'SimpleJob', {
  name: 'SimpleJobTest',
  command: {
    name: 'pythonshell',
    pythonVersion: '3.9',
    scriptLocation: pyScriptPath.s3ObjectUrl
  },
  defaultArguments: {
    '--name': ''
  },
  role: jobRole.roleArn,
  executionProperty: {
    maxConcurrentRuns: 1
  },
  maxRetries: 2,
  timeout: 60,
  maxCapacity: 1
})

Spark Job#

This job can run on multiple DPUs or worker types: G.1X (4 vCPUs, 16 GB), G.2X (8 vCPUs, 32 GB), G.025X (2 vCPUs, 4 GB), and Standard (4 vCPUs, 16 GB). The command name should be glueetl, and you should check glueVersion for the supported Spark and Python versions. By default, AWS Glue allocates 10 DPUs to each Spark job and 2 DPUs to each Spark streaming job. Jobs using AWS Glue version 0.9 or 1.0 have a 10-minute minimum billing duration, while jobs that use AWS Glue version 2.0 and later have a 1-minute minimum. detail

const sparkJob = new CfnJob(this, 'WorflowDemo-Read-S3', {
  name: 'WorkFlowDemo-Read-S3',
  command: {
    name: 'glueetl',
    pythonVersion: '3',
    scriptLocation: sparkScriptPath.s3ObjectUrl
  },
  defaultArguments: {
    '--source_data_path': props.sourceDataPath,
    '--dest_data_path': props.destDataPath
  },
  role: jobRole.roleArn,
  executionProperty: {
    maxConcurrentRuns: 1
  },
  // spark job
  glueVersion: '3.0',
  maxRetries: 1,
  timeout: 1200,
  // a spark etl job can use more than 2 dpu
  // when using maxCapacity (DPU), do not set a worker type
  // maxCapacity: 6,
  numberOfWorkers: 5,
  workerType: 'G.1X'
})

Glue Crawler#

  • Set up a classifier to skip the header row in CSV files
  • Lake Formation grants data location permissions for creating catalog tables

Create a CSV classifier

// classifierName is assumed to be defined elsewhere in the stack, for example:
// const classifierName = 'AmazonReviewCsvClassifier'
const classifier = new CfnClassifier(this, 'Classifier-GlueWorkFlow-S3-Stack', {
  csvClassifier: {
    name: classifierName,
    containsHeader: 'PRESENT',
    delimiter: ',',
    header: [
      'marketplace',
      'customer_id',
      'review_id',
      'product_id',
      'product_parent',
      'product_title',
      'product_category',
      'star_rating',
      'helpful_votes',
      'total_votes',
      'vine',
      'verified_purchase',
      'review_headline',
      'review_body',
      'review_date'
    ],
    quoteSymbol: '"'
  }
})

Then create a crawler which crawls the transformed data and creates a catalog table.

const crawler = new CfnCrawler(this, `Crawler-GlueWorkFlowDemo-S3-Stack`, {
  name: 'Crawler-GlueWorkFlowDemo-S3-Stack',
  role: jobRole.roleArn,
  targets: {
    s3Targets: [
      {
        path: props.destDataPath,
        sampleSize: 1
      }
    ]
  },
  databaseName: 'default',
  description: 'crawl transformed data',
  tablePrefix: props.tableName,
  classifiers: [
    (classifier.csvClassifier as CfnClassifier.CsvClassifierProperty).name!
  ]
})

Glue Trigger#

A trigger should live in a workflow, so let's create a workflow first.

const workflow = new CfnWorkflow(this, 'WorkFlowDemo-S3-Stack', {
  name: 'WorflowDemo-S3-Stack',
  description: 'demo workflow'
})

There is only one starting trigger per workflow. detail

const triggerPythonJob = new CfnTrigger(this, 'TriggerPythonJob-S3-Stack', {
  name: 'TriggerPythonJob-S3-Stack',
  description: 'trigger the simple job',
  actions: [
    {
      jobName: pythonJob.name,
      timeout: 300
    },
    {
      jobName: sparkJob.name,
      timeout: 300
    }
  ],
  workflowName: workflow.name,
  type: 'ON_DEMAND'
})
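
Since the starting trigger is ON_DEMAND, a workflow run has to be kicked off explicitly via StartWorkflowRun. A small sketch using the AWS SDK for JavaScript v3 (assuming the workflow name above):

import { GlueClient, StartWorkflowRunCommand } from '@aws-sdk/client-glue'

const client = new GlueClient({})
// start a run of the demo workflow; the name must match the CfnWorkflow above
const run = await client.send(
  new StartWorkflowRunCommand({ Name: 'WorflowDemo-S3-Stack' })
)
console.log(run.RunId)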

Then jobs can be chained using a conditional trigger. Please note that you have to activate the trigger or set startOnCreation to true.

const conditionTrigger = new CfnTrigger(this, 'ConditionTrigger-S3-Stack', {
  name: 'ConditionTrigger-S3-Stack',
  description: 'when both jobs done trigger this',
  startOnCreation: true,
  actions: [
    {
      crawlerName: crawler.name,
      timeout: 600
    }
  ],
  workflowName: workflow.name,
  type: 'CONDITIONAL',
  predicate: {
    conditions: [
      {
        logicalOperator: 'EQUALS',
        state: 'SUCCEEDED',
        jobName: sparkJob.name
      },
      {
        logicalOperator: 'EQUALS',
        state: 'SUCCEEDED',
        jobName: pythonJob.name
      }
    ],
    logical: 'AND'
  }
})
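
The introduction also lists scheduled (cron) triggers. Since a workflow can have only one starting trigger, the sketch below shows a standalone scheduled trigger (hypothetical name and cron expression) that runs the simple job daily:

const scheduledTrigger = new CfnTrigger(this, 'ScheduledTrigger-S3-Stack', {
  name: 'ScheduledTrigger-S3-Stack',
  description: 'run the simple job every day at 01:00 UTC',
  startOnCreation: true,
  actions: [
    {
      jobName: pythonJob.name,
      timeout: 300
    }
  ],
  type: 'SCHEDULED',
  schedule: 'cron(0 1 * * ? *)'
})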

Job Bookmark#

A job bookmark tracks state information of the job and prevents re-processing data which has already been processed:

  • for S3 sources, it uses object timestamps
  • for JDBC sources (e.g. RDS), it uses a bookmark key column which keeps increasing

Let's create a job with bookmarks enabled.

new aws_glue.CfnJob(this, "GlueJobBookMarkDemo", {
  name: "GlueJobBookMarkDemo1",
  command: {
    name: "glueetl",
    pythonVersion: "3",
    scriptLocation: sparkScriptPath.s3ObjectUrl,
  },
  defaultArguments: {
    "--source_data_path": props.sourceDataPath,
    "--dest_data_path": props.destDataPath,
    "--job-bookmark-option": JobBookMarkOption.ENABLE,
    // job arguments are passed as strings
    "--enable-auto-scaling": "true",
  },
  role: role.roleArn,
  executionProperty: {
    maxConcurrentRuns: 1,
  },
  glueVersion: "3.0",
  maxRetries: 0,
  timeout: 1200,
  // with auto scaling enabled, numberOfWorkers is the maximum number of workers
  numberOfWorkers: 5,
  workerType: "G.1X",
});
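
JobBookMarkOption above is assumed to be a small helper defined elsewhere in the stack; a minimal sketch mapping to the values Glue accepts for --job-bookmark-option:

export enum JobBookMarkOption {
  ENABLE = "job-bookmark-enable",
  DISABLE = "job-bookmark-disable",
  PAUSE = "job-bookmark-pause",
}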

Let's create a simple PySpark ETL script which reads data from S3, transforms it, and writes it to another S3 location.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# create spark and glue context
sc = SparkContext()
glueContext = GlueContext(sc)

# parse arguments
args = getResolvedOptions(sys.argv, ["JOB_NAME", "source_data_path", "dest_data_path"])

# job bookmark enable
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# read data from source
df_glue = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": "\t",
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [args["source_data_path"]],
        "recurse": True,
    },
    transformation_ctx="df_glue",
)

# transform (placeholder, apply your own transformations here)
df_clean = df_glue

# write data to destination
glueContext.write_dynamic_frame.from_options(
    frame=df_clean,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": args["dest_data_path"],
    },
    format_options={"compression": "uncompressed"},
    transformation_ctx="S3bucket_node3",
)

# job bookmark commit
job.commit()

To check how it works, upload a file to the source prefix

aws s3 cp xxx s3://bucket-name/source-csv/001.tsv.gz

Run the Glue job and check the output

aws s3 ls s3://bucket-name/job-bookmark/

Run the job again, and validate that no new data is created in the output

aws s3 ls s3://bucket-name/job-bookmark/

Upload another file to the source prefix and run the job again

aws s3 cp xxx s3://bucket-name/source-csv/002.tsv.gz

Check the output and validate that only one new file has been created

aws s3 ls s3://bucket-name/job-bookmark/

Reference#