# Getting Started with Step Functions

In this repository, I implement several Step Functions state machines described in the AWS documentation. Step Functions can be used for:

  • Data processing workflows
  • Machine learning pipelines
  • Implementing backends with branching logic (if/else)
Step Functions input and output processing

## Iterate a Lambda Function

Screen Shot 2022-08-11 at 13 30 22

Get the number of iterations:

// Lambda that supplies the iteration count (per the section caption;
// TODO confirm against get-num-iter-lambda.py). The Python source is read
// at synth time and inlined into the function.
const getNumIterSource = fs.readFileSync(
  path.resolve(__dirname, './../lambda/get-num-iter-lambda.py'),
  { encoding: 'utf-8' }
);
const getNumIterLambda = new aws_lambda.Function(this, 'GetNumIterLambda', {
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  handler: 'index.main',
  code: aws_lambda.Code.fromInline(getNumIterSource),
});

Process one iteration:

// Lambda that processes a single loop iteration; its Python source is read
// from iter-lambda.py at synth time and inlined into the function.
const iterSource = fs.readFileSync(
  path.resolve(__dirname, './../lambda/iter-lambda.py'),
  { encoding: 'utf-8' }
);
const iterLambda = new aws_lambda.Function(this, 'IterLambda', {
  handler: 'index.main',
  code: aws_lambda.Code.fromInline(iterSource),
  runtime: aws_lambda.Runtime.PYTHON_3_8,
});

Task that gets the number of iterations:

// Step Functions task that invokes getNumIterLambda; `outputPath` narrows
// the state output to the Lambda's returned Payload.
const getNumIter = new aws_stepfunctions_tasks.LambdaInvoke(this, 'GetNumIter', {
  outputPath: '$.Payload',
  lambdaFunction: getNumIterLambda,
});

Task that processes one iteration:

// Step Functions task that runs one iteration through iterLambda; only the
// Lambda's returned Payload becomes this state's output.
const nextIter = new aws_stepfunctions_tasks.LambdaInvoke(this, 'LambdaProcessJob', {
  outputPath: '$.Payload',
  lambdaFunction: iterLambda,
});

State machine definition:

// Loop: GetNumIter -> "Loop Completed?" -> (counter > 0) LambdaProcessJob ->
// back to "Loop Completed?", finishing once the counter is no longer positive.
// NOTE(review): assumes both Lambdas return a payload containing a numeric
// `counter` and that iter-lambda.py decrements it; otherwise the loop never
// terminates. Confirm against the Python sources.
const loopCompleted = new aws_stepfunctions.Choice(this, 'Loop Completed?');
const definition = getNumIter.next(
  loopCompleted
    .when(
      // `<= 0` rather than `== 0`, so a negative counter ends the loop
      // instead of matching no branch (States.NoChoiceMatched).
      aws_stepfunctions.Condition.numberLessThanEquals('$.counter', 0),
      new aws_stepfunctions.Succeed(this, 'Finish')
    )
    .when(
      aws_stepfunctions.Condition.numberGreaterThan('$.counter', 0),
      // Return to the Choice after every iteration so processing repeats.
      nextIter.next(loopCompleted)
    )
);
new aws_stepfunctions.StateMachine(this, 'LambdaIterMachine', {
  stateMachineName: 'LambdaIter',
  definition: definition,
});

## Job Poller State Machine

Screen Shot 2022-07-31 at 15 30 38

Create the submit Lambda function:

// Lambda that submits the job; its Python source is read from submit.py at
// synth time and inlined. Invocations are capped at 10 seconds.
const submitSource = fs.readFileSync(
  path.resolve(__dirname, './../lambda/submit.py'),
  { encoding: 'utf-8' }
);
const submit = new aws_lambda.Function(this, 'SubmitLambda', {
  functionName: 'SubmitFunction',
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  handler: 'index.main',
  timeout: Duration.seconds(10),
  code: new aws_lambda.InlineCode(submitSource),
});

Create the check-status Lambda function:

// Lambda that reports the job status. It backs both the polling task
// (CheckStatusTask) and the final-status task (GetFinalJobStatus).
const checkStatusSource = fs.readFileSync(
  path.resolve(__dirname, './../lambda/check_status.py'),
  { encoding: 'utf-8' }
);
const checkStatus = new aws_lambda.Function(this, 'CheckStatusFunction', {
  functionName: 'CheckStatusFunction',
  timeout: Duration.seconds(10),
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  handler: 'index.main',
  code: new aws_lambda.InlineCode(checkStatusSource),
});

Create the submit task:

// Invoke the submit Lambda. Unlike the other tasks, outputPath '$' forwards
// the entire task result (not just '$.Payload') to the next state.
const submitTask = new aws_stepfunctions_tasks.LambdaInvoke(this, 'SubmitJob', {
  outputPath: '$',
  lambdaFunction: submit,
});

Create a wait state:

// Fixed 10-second pause before the job status is checked.
const pollInterval = Duration.seconds(10);
const waitX = new aws_stepfunctions.Wait(this, 'WaitXSeconds', {
  time: aws_stepfunctions.WaitTime.duration(pollInterval),
});

Create the check-status task:

// Query the job status through the check-status Lambda. The "Job Complete?"
// Choice that follows reads `$.status` from this task's Payload.
const checkStatusTask = new aws_stepfunctions_tasks.LambdaInvoke(this, 'CheckStatusTask', {
  outputPath: '$.Payload',
  lambdaFunction: checkStatus,
});

Create the job-failed state:

// Terminal Fail state targeted by the "Job Complete?" Choice.
const jobFailed = new aws_stepfunctions.Fail(this, 'JobFailed', {
  error: 'Described returned FAILED',
  cause: 'AWS Batch Job Failed',
});

Create the final-status task:

// Success path: call the check-status Lambda once more and emit its Payload
// as the final output.
const finalStatus = new aws_stepfunctions_tasks.LambdaInvoke(this, 'GetFinalJobStatus', {
  outputPath: '$.Payload',
  lambdaFunction: checkStatus,
});

Chain the tasks into a state machine:

// Job poller: submit the job, wait, check its status, then branch on $.status.
// FAILED -> JobFailed, SUCCEEDED -> GetFinalJobStatus. Any other status
// (e.g. a job that is still running) goes back to the wait state and polls
// again; the 2-minute state machine timeout bounds how long polling lasts.
// NOTE(review): on a re-poll, CheckStatusTask receives its own previous
// output ($.Payload) instead of SubmitJob's full result. Confirm that
// check_status.py accepts that input shape (e.g. it echoes the job id).
const definition = submitTask
  .next(waitX)
  .next(checkStatusTask)
  .next(
    new aws_stepfunctions.Choice(this, 'Job Complete?')
      .when(
        aws_stepfunctions.Condition.stringEquals('$.status', 'FAILED'),
        jobFailed
      )
      .when(
        aws_stepfunctions.Condition.stringEquals('$.status', 'SUCCEEDED'),
        finalStatus
      )
      // Not finished yet: loop back to wait and poll again rather than failing.
      .otherwise(waitX)
  );
// State machine wrapping the polling loop.
const stateMachine = new aws_stepfunctions.StateMachine(
  this,
  'StateMachineDemo',
  {
    timeout: Duration.minutes(2),
    definition: definition,
  }
);

## Transfer Data Records State Machine

Stepfunctions DynamoDB

Create a table:

// DynamoDB table whose partition key is the string attribute MessageId.
const messageIdKey = {
  name: 'MessageId',
  type: aws_dynamodb.AttributeType.STRING,
};
const table = new aws_dynamodb.Table(this, 'TableStepFuncDemo', {
  partitionKey: messageIdKey,
  tableName: 'TableStepFuncDemo',
});

Create a Lambda function:

// Lambda that seeds the demo table. It receives the table name through the
// TABLE_NAME environment variable and is granted write access to the table.
const seedSource = fs.readFileSync(
  path.join(__dirname, './../lambda/seed_ddb.py'),
  { encoding: 'utf-8' }
);
const func = new aws_lambda.Function(this, 'SeedDDBFunction', {
  functionName: 'SeedDDBFunction',
  runtime: aws_lambda.Runtime.PYTHON_3_8,
  handler: 'index.handler',
  environment: { TABLE_NAME: table.tableName },
  code: aws_lambda.Code.fromInline(seedSource),
});
table.grantWriteData(func);

Create a task that seeds (generates) data into the table:

// First state: run the seeding Lambda and pass its Payload on. The loop that
// follows reads `$.List` from this payload.
const seedTask = new aws_stepfunctions_tasks.LambdaInvoke(this, 'SeedDDBTable', {
  outputPath: '$.Payload',
  lambdaFunction: func,
});

Create a task that retrieves an item from the table:

// Fetch the item whose MessageId is the current head of $.List and store the
// GetItem result under $.DynamoDB, keeping the rest of the state (including
// List) intact. The key is resolved at run time. A hard-coded key would read
// the same item on every loop pass.
// NOTE(review): assumes the seeding Lambda returns List entries that are
// MessageId values of seeded items — confirm against seed_ddb.py.
const readNextItem = new aws_stepfunctions_tasks.DynamoGetItem(
  this,
  'GetItemFromDb',
  {
    table: table,
    key: {
      MessageId: aws_stepfunctions_tasks.DynamoAttributeValue.fromString(
        aws_stepfunctions.JsonPath.stringAt('$.List[0]')
      ),
    },
    resultPath: '$.DynamoDB',
  }
);

Create a state that pops the first item from the list:

// Pass state whose output is { List: <$.List without its first element> }, so
// the next loop check of $.List[0] sees the following entry.
const popParameters = { 'List.$': '$.List[1:]' };
const popItemFromList = new aws_stepfunctions.Pass(this, 'PopItemFromList', {
  parameters: popParameters,
});

The loop condition:

// Choice state that drives the record-transfer loop; its branches are attached
// when the definition is chained.
const conditionLoop = new aws_stepfunctions.Choice(this, 'ConditionLoop');

Chain the tasks into a state machine:

// Seed the table, then loop. When the head of $.List is 'DONE', finish.
// Otherwise read the matching item, pop the list and re-check the condition.
const finish = new aws_stepfunctions.Succeed(this, 'Finish');
const isDone = aws_stepfunctions.Condition.stringEquals('$.List[0]', 'DONE');
const transferOne = readNextItem.next(popItemFromList).next(conditionLoop);
const definition = seedTask.next(
  conditionLoop.when(isDone, finish).otherwise(transferOne)
);
const stateMachine = new aws_stepfunctions.StateMachine(this, 'TransferRecordStateMachine', {
  definition: definition,
  stateMachineName: 'TransferRecord',
});

## References

  1. Invoke a Lambda function by ARN and save the payload
  2. Step Functions job poller
  3. Step Functions workshop
  4. Low-code speech ML with Step Functions
  5. SageMaker Data Wrangler in MLOps workflows