Getting Started with Step Functions
In this GitHub repository, I implement several Step Functions state machines described in the AWS docs. Step Functions can be used for:
- Data processing workflows
- Machine Learning pipelines
- Implementing backend logic with branches (if/else)
Iterate a Lambda Function
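get the number of iterations
// imports used by the snippets on this page (top of the stack file)
import * as fs from 'fs'
import * as path from 'path'
import { aws_dynamodb, aws_lambda, aws_stepfunctions, aws_stepfunctions_tasks, Duration } from 'aws-cdk-lib'

// Lambda that provides the number of iterations
const getNumIterLambda = new aws_lambda.Function(this, 'GetNumIterLambda', {
  code: aws_lambda.Code.fromInline(
    fs.readFileSync(path.resolve(__dirname, './../lambda/get-num-iter-lambda.py'), { encoding: 'utf-8' })
  ),
  handler: 'index.main',
  runtime: aws_lambda.Runtime.PYTHON_3_8
})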
process one iteration
const iterLambda = new aws_lambda.Function(this, 'IterLambda', {
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  code: aws_lambda.Code.fromInline(
    fs.readFileSync(path.resolve(__dirname, './../lambda/iter-lambda.py'), { encoding: 'utf-8' })
  ),
  handler: 'index.main'
})
task to get the number of iterations
const getNumIter = new aws_stepfunctions_tasks.LambdaInvoke(this, 'GetNumIter', {
  lambdaFunction: getNumIterLambda,
  outputPath: '$.Payload'
})
task to process one iteration
const nextIter = new aws_stepfunctions_tasks.LambdaInvoke(this, 'LambdaProcessJob', {
  lambdaFunction: iterLambda,
  outputPath: '$.Payload'
})
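Both tasks set `outputPath: '$.Payload'`. `LambdaInvoke` wraps the function's return value in an envelope, and `outputPath` selects the part of it that is handed to the next state. The sketch below mimics that selection in plain TypeScript. `selectPath` is a hypothetical helper that only understands `$` and simple dotted paths, not the real JSONPath engine, and the envelope values are made up for illustration.

```typescript
// Simplified JSON value type for this sketch
type Json = string | number | boolean | null | Json[] | { [key: string]: Json }

// Hypothetical helper: resolves '$' or simple dotted paths such as '$.Payload'.
// It is a stand-in for the path evaluation that Step Functions performs.
function selectPath(input: Json, path: string): Json {
  if (path === '$') return input
  if (path.indexOf('$.') !== 0) throw new Error(`unsupported path: ${path}`)
  let current: Json = input
  for (const key of path.slice(2).split('.')) {
    if (current === null || typeof current !== 'object' || Array.isArray(current)) {
      throw new Error(`cannot read '${key}' from a non-object value`)
    }
    const next: Json | undefined = current[key]
    if (next === undefined) throw new Error(`missing key: ${key}`)
    current = next
  }
  return current
}

// Illustrative shape of a LambdaInvoke result (values are assumptions)
const invokeResult: Json = {
  StatusCode: 200,
  Payload: { counter: 3 }
}

// With outputPath '$.Payload' the next state only sees the function's return value
const nextStateInput = selectPath(invokeResult, '$.Payload')
```

With `'$'` the whole envelope is passed on unchanged, which is why a later state that reads `$.counter` needs the `'$.Payload'` selection.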
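state machine definition
// Choice state that decides whether another iteration is needed
const loopCompleted = new aws_stepfunctions.Choice(this, 'Loop Completed?')

const definition = getNumIter.next(
  loopCompleted
    .when(
      aws_stepfunctions.Condition.numberEquals('$.counter', 0),
      new aws_stepfunctions.Succeed(this, 'Finish')
    )
    .when(
      aws_stepfunctions.Condition.numberGreaterThan('$.counter', 0),
      // return to the Choice after each iteration; this assumes iter-lambda.py
      // returns a decremented counter, otherwise the loop never ends
      nextIter.next(loopCompleted)
    )
)

new aws_stepfunctions.StateMachine(this, 'LambdaIterMachine', {
  stateMachineName: 'LambdaIter',
  definition: definition
})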
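A counter-driven loop of this kind can be traced in plain TypeScript. This is a minimal sketch under two assumptions that the Lambda sources would have to confirm: the first function returns `{ counter: n }`, and each iteration returns the counter decremented by one before control goes back to the `Loop Completed?` choice. `getNumIterStub`, `iterateStub` and `runLoop` are hypothetical names, not part of the stack.

```typescript
// State payload passed between states; 'counter' mirrors the '$.counter' path
interface LoopState {
  counter: number
}

// Hypothetical stand-in for get-num-iter-lambda.py
function getNumIterStub(total: number): LoopState {
  return { counter: total }
}

// Hypothetical stand-in for iter-lambda.py: one unit of work, then decrement
function iterateStub(state: LoopState): LoopState {
  return { counter: state.counter - 1 }
}

// Returns the names of the states visited, in order
function runLoop(total: number): string[] {
  const visited: string[] = ['GetNumIter']
  let state = getNumIterStub(total)
  while (true) {
    visited.push('Loop Completed?')
    if (state.counter === 0) {
      visited.push('Finish')
      return visited
    }
    if (state.counter > 0) {
      visited.push('LambdaProcessJob')
      state = iterateStub(state)
      continue
    }
    // Neither rule matches a negative counter and the Choice has no default
    throw new Error('no Choice rule matched')
  }
}

const trace = runLoop(2)
```

For `runLoop(2)` the trace visits `LambdaProcessJob` twice and ends at `Finish`. A negative counter matches neither rule, so adding an `otherwise` branch to the Choice is worth considering.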
Poller Job State Machine
create the submit lambda
const submit = new aws_lambda.Function(this, 'SubmitLambda', {
  functionName: 'SubmitFunction',
  code: new aws_lambda.InlineCode(
    fs.readFileSync(path.resolve(__dirname, './../lambda/submit.py'), { encoding: 'utf-8' })
  ),
  handler: 'index.main',
  timeout: Duration.seconds(10),
  runtime: aws_lambda.Runtime.PYTHON_3_8
})
create the check status lambda
// get status lambda function
const checkStatus = new aws_lambda.Function(this, 'CheckStatusFunction', {
  functionName: 'CheckStatusFunction',
  code: new aws_lambda.InlineCode(
    fs.readFileSync(path.resolve(__dirname, './../lambda/check_status.py'), { encoding: 'utf-8' })
  ),
  handler: 'index.main',
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  timeout: Duration.seconds(10)
})
create a submit task
const submitTask = new aws_stepfunctions_tasks.LambdaInvoke(this, 'SubmitJob', {
  lambdaFunction: submit,
  outputPath: '$'
})
create a wait state
const waitX = new aws_stepfunctions.Wait(this, 'WaitXSeconds', {
  time: aws_stepfunctions.WaitTime.duration(Duration.seconds(10))
})
create a check status task
const checkStatusTask = new aws_stepfunctions_tasks.LambdaInvoke(this, 'CheckStatusTask', {
  lambdaFunction: checkStatus,
  outputPath: '$.Payload'
})
create the job failed state
const jobFailed = new aws_stepfunctions.Fail(this, 'JobFailed', {
  cause: 'AWS Batch Job Failed',
  error: 'Described returned FAILED'
})
create the final status task
const finalStatus = new aws_stepfunctions_tasks.LambdaInvoke(this, 'GetFinalJobStatus', {
  lambdaFunction: checkStatus,
  outputPath: '$.Payload'
})
chain tasks into a state machine
// create chain
const definition = submitTask
  .next(waitX)
  .next(checkStatusTask)
  .next(
    new aws_stepfunctions.Choice(this, 'Job Complete?')
      .when(aws_stepfunctions.Condition.stringEquals('$.status', 'FAILED'), jobFailed)
      .when(aws_stepfunctions.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus)
      // any other status also ends in JobFailed; routing to waitX here instead would keep polling
      .otherwise(jobFailed)
  )

// state machine
const stateMachine = new aws_stepfunctions.StateMachine(this, 'StateMachineDemo', {
  timeout: Duration.minutes(2),
  definition: definition
})
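In general, a job poller repeats a wait-and-check cycle until the job reports a terminal status or a time budget runs out. The sketch below walks a scripted list of statuses in that way. `pollJob`, the `RUNNING` status and the `maxChecks` budget (a stand-in for the state machine timeout) are assumptions for illustration, not values read from `check_status.py`.

```typescript
type JobStatus = 'RUNNING' | 'SUCCEEDED' | 'FAILED'
type Outcome = 'SUCCEEDED' | 'FAILED' | 'TIMED_OUT'

// Hypothetical poller: each scripted status represents one "wait, then check" cycle
function pollJob(statuses: JobStatus[], maxChecks: number): { outcome: Outcome; checks: number } {
  let checks = 0
  for (const status of statuses) {
    if (checks >= maxChecks) break
    checks += 1
    if (status === 'SUCCEEDED' || status === 'FAILED') {
      // terminal status: stop polling
      return { outcome: status, checks }
    }
    // any other status: wait and check again on the next cycle
  }
  // ran out of checks (or scripted statuses) before a terminal status appeared
  return { outcome: 'TIMED_OUT', checks }
}

const finished = pollJob(['RUNNING', 'RUNNING', 'SUCCEEDED'], 10)
const timedOut = pollJob(['RUNNING', 'RUNNING', 'RUNNING'], 2)
```

Here `finished` reports `SUCCEEDED` after three checks and `timedOut` gives up after two, which is the role the two-minute `timeout` plays for the state machine.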
Transfer Data Record State Machine
create a table
const table = new aws_dynamodb.Table(this, 'TableStepFuncDemo', {
  tableName: 'TableStepFuncDemo',
  partitionKey: {
    name: 'MessageId',
    type: aws_dynamodb.AttributeType.STRING
  }
})
create a lambda function
const func = new aws_lambda.Function(this, 'SeedDDBFunction', {
  functionName: 'SeedDDBFunction',
  code: aws_lambda.Code.fromInline(
    fs.readFileSync(path.join(__dirname, './../lambda/seed_ddb.py'), { encoding: 'utf-8' })
  ),
  handler: 'index.handler',
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  environment: {
    TABLE_NAME: table.tableName
  }
})

// allow the function to write items into the table
table.grantWriteData(func)
create a task to seed/generate data into the table
const seedTask = new aws_stepfunctions_tasks.LambdaInvoke(this, 'SeedDDBTable', {
  lambdaFunction: func,
  outputPath: '$.Payload'
})
create a task to retrieve an item from the table
const readNextItem = new aws_stepfunctions_tasks.DynamoGetItem(this, 'GetItemFromDb', {
  table: table,
  key: {
    MessageId: aws_stepfunctions_tasks.DynamoAttributeValue.fromString('MessageNo1')
  },
  resultPath: '$.DynamoDB'
})
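Unlike `outputPath`, `resultPath: '$.DynamoDB'` does not replace the state input. It attaches the task result to the input under the `DynamoDB` key, so fields such as `List` remain available to later states. A minimal sketch of that merge follows. `applyResultPath` is a hypothetical helper that only handles a single top-level key, and the sample input and result are made up.

```typescript
type StateData = { [key: string]: unknown }

// Hypothetical helper: mimics a resultPath of the form '$.Key' by copying the
// input and attaching the task result under that key
function applyResultPath(input: StateData, resultPath: string, result: unknown): StateData {
  const match = /^\$\.([A-Za-z0-9_]+)$/.exec(resultPath)
  if (match === null) throw new Error(`unsupported resultPath: ${resultPath}`)
  const key = match[1]
  if (key === undefined) throw new Error(`unsupported resultPath: ${resultPath}`)
  return { ...input, [key]: result }
}

// Illustrative state input and GetItem result (values are assumptions)
const stateInput: StateData = { List: ['MessageNo1', 'DONE'] }
const getItemResult = { Item: { MessageId: { S: 'MessageNo1' } } }

// The output keeps List and gains a DynamoDB field holding the result
const stateOutput = applyResultPath(stateInput, '$.DynamoDB', getItemResult)
```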
create a task to pop the first item from the list
const popItemFromList = new aws_stepfunctions.Pass(this, 'PopItemFromList', {
  parameters: {
    'List.$': '$.List[1:]'
  }
})
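The `.$` suffix on `List.$` marks the value as a path rather than a literal, and `$.List[1:]` is a slice that drops the first element. Because `parameters` builds the payload that the `Pass` state forwards, the output contains only the `List` key, and other fields of the input are not carried along. The following sketch shows the equivalent transformation in plain TypeScript. The function and the sample input are illustrative assumptions.

```typescript
// Input may carry extra fields next to List (for example a DynamoDB result)
interface ListInput {
  List: string[]
  [other: string]: unknown
}

// Plain TypeScript equivalent of parameters: { 'List.$': '$.List[1:]' }
function popItemFromListStub(input: ListInput): { List: string[] } {
  // slice(1) plays the role of the [1:] slice: everything except the first element
  return { List: input.List.slice(1) }
}

const beforePop: ListInput = {
  List: ['MessageNo1', 'MessageNo2', 'DONE'],
  DynamoDB: { Item: {} }
}

// afterPop is { List: ['MessageNo2', 'DONE'] }; the DynamoDB field is gone
const afterPop = popItemFromListStub(beforePop)
```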
the condition loop
const conditionLoop = new aws_stepfunctions.Choice(this, 'ConditionLoop', {})
chain tasks into a state machine
const definition = seedTask.next(
  conditionLoop
    .when(
      aws_stepfunctions.Condition.stringEquals('$.List[0]', 'DONE'),
      new aws_stepfunctions.Succeed(this, 'Finish')
    )
    // otherwise read an item, pop it from the list, and test the list again
    .otherwise(readNextItem.next(popItemFromList).next(conditionLoop))
)

const stateMachine = new aws_stepfunctions.StateMachine(this, 'TransferRecordStateMachine', {
  stateMachineName: 'TransferRecord',
  definition: definition
})
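The loop ends only when the head of `List` equals the `DONE` sentinel, so the seeding function is expected to return a list that finishes with that value. The trace below is a sketch under that assumption. `traceTransfer` is a hypothetical helper that records which states would be visited, and it treats an empty list as an error because `$.List[0]` would have nothing to resolve.

```typescript
// Returns the names of the states visited for a given seeded list
function traceTransfer(seeded: string[]): string[] {
  const visited: string[] = ['SeedDDBTable']
  let list = seeded
  while (true) {
    visited.push('ConditionLoop')
    if (list.length === 0) {
      // assumption: a list without the sentinel eventually leaves nothing to compare
      throw new Error("'$.List[0]' cannot be resolved on an empty list")
    }
    if (list[0] === 'DONE') {
      visited.push('Finish')
      return visited
    }
    visited.push('GetItemFromDb')
    visited.push('PopItemFromList')
    // same effect as the '$.List[1:]' slice in the Pass state
    list = list.slice(1)
  }
}

const transferTrace = traceTransfer(['MessageNo1', 'DONE'])
```

For a list with one message followed by `DONE`, the trace passes through `GetItemFromDb` and `PopItemFromList` once and then reaches `Finish`.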