Getting Started with Stepfunctions#

In this GitHub repository, I implement some Step Functions state machines described in the AWS docs. Step Functions can be used for:

  • Data processing workflows
  • Machine Learning pipelines
  • Implement backend logic with branches (if/else)
Stepfunctions Input Output Processing

Iterate a Lambda Function#

get the number of iterations

// Lambda function 'GetNumIterLambda': Python 3.8 runtime, handler 'index.main'.
// NOTE(review): this snippet appears truncated — the closing brackets are
// missing, and Code.fromInline receives a resolved path plus an options
// object here, whereas the other Lambda snippets in this file wrap the path
// in fs.readFileSync(...). Confirm against the full source.
const getNumIterLambda = new aws_lambda.Function(this, 'GetNumIterLambda', {
code: aws_lambda.Code.fromInline(
path.resolve(__dirname, './../lambda/'),
{ encoding: 'utf-8' }
handler: 'index.main',
runtime: aws_lambda.Runtime.PYTHON_3_8

processing an iteration

// Lambda function 'IterLambda': Python 3.8 runtime, handler 'index.main',
// with its code supplied inline from a file read as UTF-8 text.
// NOTE(review): snippet appears truncated — closing brackets are missing and
// the path passed to readFileSync ends at the './../lambda/' directory
// (file name presumably dropped) — confirm against the full source.
const iterLambda = new aws_lambda.Function(this, 'IterLambda', {
runtime: aws_lambda.Runtime.PYTHON_3_8,
code: aws_lambda.Code.fromInline(
fs.readFileSync(path.resolve(__dirname, './../lambda/'), {
encoding: 'utf-8'
handler: 'index.main'

task to get the number of iterations

// LambdaInvoke task that calls getNumIterLambda; outputPath '$.Payload'
// selects the Payload field of the task result as the state output.
// NOTE(review): the scope/id arguments and closing brackets are not visible
// in this snippet — it appears truncated.
const getNumIter = new aws_stepfunctions_tasks.LambdaInvoke(
lambdaFunction: getNumIterLambda,
outputPath: '$.Payload'

task to process one iteration

// LambdaInvoke task that calls iterLambda; outputPath '$.Payload' selects
// the Payload field of the task result as the state output.
// NOTE(review): the scope/id arguments and closing brackets are not visible
// in this snippet — it appears truncated.
const nextIter = new aws_stepfunctions_tasks.LambdaInvoke(
lambdaFunction: iterLambda,
outputPath: '$.Payload'

state machine definition

// State machine definition for the iteration loop, followed by the
// 'LambdaIterMachine' state machine (name 'LambdaIter') that uses it.
// The visible pieces are a Choice state 'Loop Completed?', a condition
// "$.counter == 0" next to a Succeed state 'Finish', and a condition
// "$.counter > 0".
// NOTE(review): the calls that chain these pieces together (presumably
// .next()/.when(), plus the target of the "> 0" branch) are missing from
// this snippet — confirm against the full source.
const definition =
new aws_stepfunctions.Choice(this, 'Loop Completed?')
aws_stepfunctions.Condition.numberEquals('$.counter', 0),
new aws_stepfunctions.Succeed(this, 'Finish')
aws_stepfunctions.Condition.numberGreaterThan('$.counter', 0),
new aws_stepfunctions.StateMachine(this, 'LambdaIterMachine', {
stateMachineName: 'LambdaIter',
definition: definition

Poller Job State Machine#

create the submit lambda

// Lambda function 'SubmitFunction' (construct id 'SubmitLambda'): Python 3.8
// runtime, handler 'index.main', 10-second timeout, code supplied inline
// from a file read as UTF-8 text.
// NOTE(review): snippet appears truncated — closing brackets are missing and
// the readFileSync path ends at the './../lambda/' directory — confirm.
const submit = new aws_lambda.Function(this, 'SubmitLambda', {
functionName: 'SubmitFunction',
code: new aws_lambda.InlineCode(
fs.readFileSync(path.resolve(__dirname, './../lambda/'), {
encoding: 'utf-8'
handler: 'index.main',
timeout: Duration.seconds(10),
runtime: aws_lambda.Runtime.PYTHON_3_8

create the check status lambda

// Lambda function 'CheckStatusFunction': Python 3.8 runtime, handler
// 'index.main', 10-second timeout, code supplied inline from a file read as
// UTF-8 text. It is referenced by both checkStatusTask and finalStatus.
// NOTE(review): snippet appears truncated — closing brackets are missing and
// the readFileSync path ends at the './../lambda/' directory — confirm.
const checkStatus = new aws_lambda.Function(this, 'CheckStatusFunction', {
functionName: 'CheckStatusFunction',
code: new aws_lambda.InlineCode(
fs.readFileSync(path.resolve(__dirname, './../lambda/'), {
encoding: 'utf-8'
handler: 'index.main',
runtime: aws_lambda.Runtime.PYTHON_3_8,
timeout: Duration.seconds(10)

create a submit task

// LambdaInvoke task 'SubmitJob' that calls the submit Lambda.
// outputPath '$' passes the whole task result on as the state output.
// NOTE(review): the other LambdaInvoke tasks in this file use '$.Payload';
// confirm '$' is intended here. Closing brackets are missing (truncated).
const submitTask = new aws_stepfunctions_tasks.LambdaInvoke(this, 'SubmitJob', {
lambdaFunction: submit,
outputPath: '$'

create a wait

// Wait state 'WaitXSeconds' with a fixed 10-second duration.
// NOTE(review): closing brackets are missing — snippet appears truncated.
const waitX = new aws_stepfunctions.Wait(this, 'WaitXSeconds', {
time: aws_stepfunctions.WaitTime.duration(Duration.seconds(10))

create a check status

// LambdaInvoke task that calls the checkStatus Lambda; outputPath
// '$.Payload' selects the Payload field of the task result.
// NOTE(review): the scope/id arguments and closing brackets are not visible
// in this snippet — it appears truncated.
const checkStatusTask = new aws_stepfunctions_tasks.LambdaInvoke(
lambdaFunction: checkStatus,
outputPath: '$.Payload'

create the job failed

// Fail state 'JobFailed' carrying a fixed cause and error string.
// NOTE(review): closing brackets are missing — snippet appears truncated.
const jobFailed = new aws_stepfunctions.Fail(this, 'JobFailed', {
cause: 'AWS Batch Job Failed',
error: 'Described returned FAILED'

create the final status

// LambdaInvoke task that calls the same checkStatus Lambda as
// checkStatusTask; outputPath '$.Payload' selects the Payload field.
// NOTE(review): the scope/id arguments and closing brackets are not visible
// in this snippet — it appears truncated.
const finalStatus = new aws_stepfunctions_tasks.LambdaInvoke(
lambdaFunction: checkStatus,
outputPath: '$.Payload'

chain tasks into a state machine

// create chain
// The chain starts at submitTask; the visible pieces are a Choice state
// 'Job Complete?' and two conditions on $.status ('FAILED', 'SUCCEEDED').
// NOTE(review): the chaining calls (presumably .next()/.when() linking
// waitX, checkStatusTask, jobFailed and finalStatus) are missing from this
// snippet — confirm against the full source.
const definition = submitTask
new aws_stepfunctions.Choice(this, 'Job Complete?')
aws_stepfunctions.Condition.stringEquals('$.status', 'FAILED'),
aws_stepfunctions.Condition.stringEquals('$.status', 'SUCCEEDED'),
// state machine
// State machine built from the chain above with a 2-minute timeout.
// NOTE(review): the scope/id arguments and closing brackets are not visible.
const stateMachine = new aws_stepfunctions.StateMachine(
timeout: Duration.minutes(2),
definition: definition

Transfer Data Record State Machine#

Stepfunctions DynamoDB

create a table

// DynamoDB table 'TableStepFuncDemo' with a string partition key 'MessageId'.
// NOTE(review): closing brackets are missing — snippet appears truncated, so
// any further table options are not visible here.
const table = new aws_dynamodb.Table(this, 'TableStepFuncDemo', {
tableName: 'TableStepFuncDemo',
partitionKey: {
name: 'MessageId',
type: aws_dynamodb.AttributeType.STRING

create a lambda function

// Lambda function 'SeedDDBFunction': Python 3.8 runtime, handler
// 'index.handler', code supplied inline from a file read as UTF-8 text.
// The table name is passed to the function via the TABLE_NAME env variable.
// NOTE(review): snippet appears truncated — closing brackets are missing and
// the readFileSync path ends at the './../lambda/' directory. No grant of
// table access to the function is visible in this snippet — confirm.
const func = new aws_lambda.Function(this, 'SeedDDBFunction', {
functionName: 'SeedDDBFunction',
code: aws_lambda.Code.fromInline(
fs.readFileSync(path.join(__dirname, './../lambda/'), {
encoding: 'utf-8'
handler: 'index.handler',
runtime: aws_lambda.Runtime.PYTHON_3_8,
environment: {
TABLE_NAME: table.tableName

create a task to seed/generate data into the table

// LambdaInvoke task that calls the seeding Lambda (func); outputPath
// '$.Payload' selects the Payload field of the task result.
// NOTE(review): the scope/id arguments and closing brackets are not visible
// in this snippet — it appears truncated.
const seedTask = new aws_stepfunctions_tasks.LambdaInvoke(
lambdaFunction: func,
outputPath: '$.Payload'

create a task to retrieve an item from the table

// DynamoGetItem task that reads from the table and stores the result under
// '$.DynamoDB' in the state data (resultPath).
// NOTE(review): the contents of `key` are missing from this snippet, as are
// the scope/id arguments and closing brackets — confirm which attribute
// (presumably MessageId) is used for the lookup.
const readNextItem = new aws_stepfunctions_tasks.DynamoGetItem(
table: table,
key: {
resultPath: '$.DynamoDB'

create a task to pop the item from the retrieved list

// Pass state 'PopItemFromList': sets List to the slice '$.List[1:]', i.e.
// the incoming list without its first element.
// NOTE(review): closing brackets are missing — snippet appears truncated.
const popItemFromList = new aws_stepfunctions.Pass(this, 'PopItemFromList', {
parameters: {
'List.$': '$.List[1:]'

the condition loop

// Choice state 'ConditionLoop', created with an empty props object.
const conditionLoop = new aws_stepfunctions.Choice(
  this,
  'ConditionLoop',
  {}
);

chain tasks into a state machine

// Definition and state machine for 'TransferRecord'. The visible pieces are
// a condition that checks whether the first list element ($.List[0]) equals
// 'DONE', and a Succeed state 'Finish'.
// NOTE(review): the chaining calls (presumably seedTask/conditionLoop with
// .when()/.otherwise() linking readNextItem and popItemFromList) and the
// StateMachine scope/id arguments are missing from this snippet — confirm
// against the full source.
const definition =
aws_stepfunctions.Condition.stringEquals('$.List[0]', 'DONE'),
new aws_stepfunctions.Succeed(this, 'Finish')
const stateMachine = new aws_stepfunctions.StateMachine(
stateMachineName: 'TransferRecord',
definition: definition


