Aravind V
Dev Post

๐Ÿ‡ AWS CDK 101 - ๐Ÿญ StateMachine and StepFunctions replacing our SQS based lambda trigger

๐Ÿ‡ AWS CDK 101 - ๐Ÿญ StateMachine and StepFunctions replacing our SQS based lambda trigger

Apr 16, 2022


🔰 Beginners new to AWS CDK, please do look at my previous articles in this series, one by one.

In case you missed my previous article, you can find it at the links below.

๐Ÿ” Original previous post at ๐Ÿ”— Dev Post

๐Ÿ” Reposted previous post at ๐Ÿ”— dev to @aravindvcyber

In this article, let us refactor our previous event rule, which targets messages to a queue that triggers our lambda directly, into a new rule that invokes a state machine, which will in turn invoke our lambda as a step in its workflow.

Benefits achieved in this approach 🚣‍♀️

There are multiple benefits associated with this approach, as follows.

  • AWS Step Functions lets you coordinate multiple AWS services into serverless workflows, so you can build and update apps quickly.

  • We can replace the direct invocation of our lambda by SQS with an indirect invocation via a state machine.

  • Using a state machine, we can perform transformations and conditional checks, and orchestrate the work by adding several steps, each of which can be identified as a distinct chunk of our workflow.

  • Thus, we can eventually refactor most of the business flow logic out of the lambda and into the state machine definition, making our lambdas/processors more generalized so that they can be shared among other tasks as well.

  • The state machine also provides plenty of metrics, logs, and a visual reference to the actual point of failure, which can be hard to trace inside a traditional monolithic lambda.

New construct for state machine 🚧

Let us start by creating a new file, constructs/sfn-simple.ts.

We will start by importing the common modules along with the aws-stepfunctions and aws-stepfunctions-tasks modules, as follows.

import * as lambda from "aws-cdk-lib/aws-lambda";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import { Construct } from "constructs";

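This construct also needs a few other imports (Duration and RemovalPolicy from aws-cdk-lib, LogGroup and RetentionDays from aws-cdk-lib/aws-logs, LogLevel from aws-cdk-lib/aws-stepfunctions, and Policy, PolicyStatement and Effect from aws-cdk-lib/aws-iam). These were discussed in earlier articles, so we will not cover them again here.

Let us add a new props interface, sfnProps, to receive the required input information from the stack where this construct is used.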


export interface sfnProps {
  triggerFunction: lambda.Function;
  timeout: Duration;
}

In the above block of code, triggerFunction is our backend lambda, which we will write later, once we are done with the state machine definition. timeout limits the maximum total time a state machine execution may take.

Construct skeleton

Let us create the construct template as shown below.


export class simpleSfnConstruct extends Construct {

  public readonly sfnMachine: sfn.StateMachine

  constructor(scope: Construct, id: string, props: sfnProps) {
    super(scope, id);
  }
}

Note the read-only property sfnMachine, which the stack will use to reference the state machine created inside the constructor.

const { triggerFunction, timeout } = props;

Inside the constructor, we start with the usual destructuring of our props.

Lambda payload with taskToken 🔑


const sfnTaskPayload = sfn.TaskInput.fromObject({
      "MyTaskToken": sfn.JsonPath.taskToken,
      "Record": {
        "messageId.$": "$.id",
        "createdAt.$": "$.time",
        "event.$": "States.StringToJson($.detail.message)"
      }
});

Here sfnTaskPayload defines the payload that our step passes to the lambda when invoking it, before waiting for its completion.

The important properties to note here are:

  • MyTaskToken receives sfn.JsonPath.taskToken from the context object, after which the step pauses and waits. The lambda later reports the result via SendTaskSuccess or SendTaskFailure using this token, so the state machine does not have to poll the lambda again and again for its status, which also saves us some state transitions.

  • In the Record section, compared to our previous article, we have offloaded part of the message-level data extraction from the processor code to the step function itself, using JSON path syntax. Transforming and extracting data before invoking the compute gives us finer-grained control over our workflow logic, which is very useful for whoever designs and maintains the state machine.
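To make the ".$" mapping concrete, here is a small local simulation, a sketch only, of how these parameters would resolve against an incoming EventBridge event. In the synthesized ASL, sfn.JsonPath.taskToken appears as "MyTaskToken.$": "$$.Task.Token", and the token itself is supplied by Step Functions at run time. The function resolvePayload, the SampleEvent type, the sample timestamp, the message and the token string are hypothetical, and only the exact paths used above are modelled.

```typescript
// Hypothetical local model of the sfnTaskPayload mapping.
// Step Functions performs the real resolution; this only mirrors the paths used here.
interface SampleEvent {
  id: string;
  time: string;
  detail: { message: string };
}

interface RecordPayload {
  MyTaskToken: string;
  Record: { messageId: string; createdAt: string; event: unknown };
}

function resolvePayload(event: SampleEvent, taskToken: string): RecordPayload {
  return {
    // "MyTaskToken.$": "$$.Task.Token" (taken from the context object)
    MyTaskToken: taskToken,
    Record: {
      messageId: event.id, // "messageId.$": "$.id"
      createdAt: event.time, // "createdAt.$": "$.time"
      // "event.$": "States.StringToJson($.detail.message)" parses the string into JSON
      event: JSON.parse(event.detail.message),
    },
  };
}

// Sample event: the id matches the one queried later; time and message are made up.
const sampleEvent: SampleEvent = {
  id: "cdd51245-987a-b3c7-eecf-6d6d63046073",
  time: "2022-04-16T10:00:00Z",
  detail: { message: '{"greeting":"hello"}' },
};

const resolvedPayload = resolvePayload(sampleEvent, "example-task-token");
console.log(JSON.stringify(resolvedPayload, null, 2));
```

The point of the design is that the lambda receives an already-parsed event object, so it no longer needs to know the EventBridge envelope layout.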

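Next, we define the Record Message step, which invokes triggerFunction with this payload using the WAIT_FOR_TASK_TOKEN integration pattern, so the step stays paused until the lambda calls back with the token (or until its one-minute timeout expires).

const recordMsg = new tasks.LambdaInvoke(this, "Record Message", {
  lambdaFunction: triggerFunction,
  timeout: Duration.minutes(1),
  comment: "Record message in dynamo",
  integrationPattern: sfn.IntegrationPattern.WAIT_FOR_TASK_TOKEN,
  inputPath: "$",
  payload: sfnTaskPayload,
  resultSelector: {
    "Payload.$": "$",
    "StatusCode.$": "$.statusCode",
  },
  resultPath: "$.recordResult",
});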
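The resultSelector and resultPath settings above decide what the next state sees. The sketch below is a hypothetical local model, not the Step Functions engine: with WAIT_FOR_TASK_TOKEN, the step's result is the JSON that the lambda passes as output to SendTaskSuccess; resultSelector reshapes it into Payload and StatusCode, and resultPath "$.recordResult" attaches that object to the original input instead of replacing it. The helper names and sample values are assumptions.

```typescript
// Hypothetical local model of ResultSelector + ResultPath for the "Record Message" step.
type Json = { [key: string]: unknown };

// With WAIT_FOR_TASK_TOKEN, the task result is the parsed `output`
// string that the lambda passes to SendTaskSuccess.
function applyResultSelector(taskResult: Json): Json {
  return {
    Payload: taskResult, // "Payload.$": "$"
    StatusCode: taskResult["statusCode"], // "StatusCode.$": "$.statusCode"
  };
}

// resultPath "$.recordResult" keeps the original state input and
// adds the selected result under the recordResult key.
function applyResultPath(stateInput: Json, selected: Json): Json {
  return { ...stateInput, recordResult: selected };
}

const stateInput: Json = { id: "cdd51245-987a-b3c7-eecf-6d6d63046073" };
const sendTaskSuccessOutput =
  '{"statusCode":200,"putStatus":{"messageId":"cdd51245-987a-b3c7-eecf-6d6d63046073"}}';

const stateOutput = applyResultPath(
  stateInput,
  applyResultSelector(JSON.parse(sendTaskSuccessOutput) as Json),
);
console.log(JSON.stringify(stateOutput));
```

Keeping the original input alongside recordResult means later steps can still read fields such as the event id.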

(Screenshot: lambda connected with sfn)

Final status steps 📲

These are simply formal success and failure end states, which we add to our state machine to better visualize what happened during the workflow execution.

const jobFailed = new sfn.Fail(this, "Job Failed", {
      comment: "Job Failed"
});
const jobSucceed = new sfn.Succeed(this, "Job Succeed", {
      comment: "Job Succeed"
});

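Choice step for branching the workflow ✈️

The Choice step serves as a visual and functional reference for making a decision based on the lambda job status from the previous step: it selects the next step using that step's output. Note that when the lambda reports an error via SendTaskFailure, the Record Message task itself fails, so unless a catch is added to that step, the execution typically fails there without reaching this Choice; the 500 branch applies only when a 500 status is reported through SendTaskSuccess.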

const checkStatus = new sfn.Choice(this, "Check Status?",{
      inputPath: "$.recordResult"
    })
    .when(sfn.Condition.numberEquals("$.StatusCode", 500), jobFailed)
    .when(sfn.Condition.numberEquals("$.StatusCode", 200), jobSucceed)
    .otherwise(jobFailed);
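As a rough sketch, the decision made by Check Status? can be written as a plain function. It assumes, as the inputPath does, that it only receives the recordResult object; the function name and types are illustrative and not part of the CDK API.

```typescript
// Hypothetical plain-function model of the "Check Status?" Choice state.
type NextState = "Job Succeed" | "Job Failed";

// Because of inputPath "$.recordResult", the Choice only sees this object.
interface RecordResult {
  StatusCode: number;
}

function checkStatus(recordResult: RecordResult): NextState {
  if (recordResult.StatusCode === 500) return "Job Failed"; // first .when()
  if (recordResult.StatusCode === 200) return "Job Succeed"; // second .when()
  return "Job Failed"; // .otherwise(jobFailed)
}

console.log(checkStatus({ StatusCode: 200 })); // prints Job Succeed
```

Choice rules are evaluated in order and the first match wins, which is why the otherwise branch only applies when neither status code matches.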

Stepfunction chaining into statemachine definition 🔗

As the code below shows, the state machine definition is simply a chain of the steps we defined earlier. Each step is connected to the next one in a simple chain, although the chain can also contain branching steps such as the Choice state.

const sfnDef = recordMsg.next(checkStatus);
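For orientation, here is an approximate, simplified view of the definition that recordMsg.next(checkStatus) produces, written as a TypeScript object, together with a tiny walker that follows the Next and Choice links. The real synthesized JSON contains more fields (Parameters, Retry, the function ARN and so on); the walk function and the types are hypothetical helpers for illustration only.

```typescript
// Approximate, simplified shape of the generated Amazon States Language definition.
interface ChoiceRule {
  Variable: string;
  NumericEquals: number;
  Next: string;
}

type State =
  | { Type: "Task"; Resource: string; ResultPath: string; Next: string }
  | { Type: "Choice"; InputPath: string; Choices: ChoiceRule[]; Default: string }
  | { Type: "Succeed" }
  | { Type: "Fail" };

const definition: { StartAt: string; States: { [name: string]: State } } = {
  StartAt: "Record Message",
  States: {
    "Record Message": {
      Type: "Task",
      Resource: "arn:aws:states:::lambda:invoke.waitForTaskToken",
      ResultPath: "$.recordResult",
      Next: "Check Status?",
    },
    "Check Status?": {
      Type: "Choice",
      InputPath: "$.recordResult",
      Choices: [
        { Variable: "$.StatusCode", NumericEquals: 500, Next: "Job Failed" },
        { Variable: "$.StatusCode", NumericEquals: 200, Next: "Job Succeed" },
      ],
      Default: "Job Failed",
    },
    "Job Failed": { Type: "Fail" },
    "Job Succeed": { Type: "Succeed" },
  },
};

// Follows the chain for a given status code and returns the visited state names.
// InputPath is not modelled; statusCode stands for $.recordResult.StatusCode.
function walk(statusCode: number): string[] {
  const visited: string[] = [];
  let current: string | undefined = definition.StartAt;
  while (current !== undefined) {
    visited.push(current);
    const state = definition.States[current];
    switch (state.Type) {
      case "Task":
        current = state.Next;
        break;
      case "Choice": {
        const match = state.Choices.find((rule) => rule.NumericEquals === statusCode);
        current = match ? match.Next : state.Default;
        break;
      }
      default:
        current = undefined; // Succeed and Fail are terminal states
    }
  }
  return visited;
}

console.log(walk(200).join(" -> ")); // prints Record Message -> Check Status? -> Job Succeed
```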

(Screenshot: state machine json)

Statemachine log group 🌼

A new log group is created to hold the logs from the state machine executions, as shown below. It will be used when we define the state machine itself.


const sfnLog = new LogGroup(this, "sfnLog", {
  logGroupName: "sfnLogGroup",
  removalPolicy: RemovalPolicy.DESTROY,
  retention: RetentionDays.ONE_WEEK,
});

(Screenshot: statemachine logging)

Statemachine specification

With that, we can now define the state machine properties as shown below, including the sfnDef and sfnLog created earlier.

const stateMachine = new sfn.StateMachine(this, "msgStateMachine", {
  definition: sfnDef,
  timeout: timeout,
  logs: {
    destination: sfnLog,
    includeExecutionData: true,
    level: LogLevel.ALL,
  },
});

(Screenshot: statemachine workflow)

Granting invoke lambda to statemachine 🌹

To let the state machine execution role invoke our lambda, we attach a new IAM policy, sfnLambdaInvokePolicy, as shown below. The Record Message step in the workflow above relies on this permission. (The LambdaInvoke task construct may already add a similar invoke grant to the role automatically, so this statement mainly makes the permission explicit.)

const sfnLambdaInvokePolicy = new Policy(this, "sfnLambdaInvokePolicy");
sfnLambdaInvokePolicy.addStatements(
  new PolicyStatement({
    actions: ["lambda:InvokeFunction"],
    effect: Effect.ALLOW,
    resources: [`${triggerFunction.functionArn}:$LATEST`],
    sid: "sfnLambdaInvokePolicy",
  })
);
stateMachine.role.attachInlinePolicy(sfnLambdaInvokePolicy);
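To see what this inline policy amounts to, the sketch below builds the equivalent JSON statement for a given function ARN. It is a hypothetical helper, the ARN uses a placeholder account id and function name, and the CDK-synthesized template will differ in detail (for example, the ARN is resolved with CloudFormation functions at deploy time).

```typescript
// Hypothetical helper showing the policy statement that sfnLambdaInvokePolicy roughly corresponds to.
interface PolicyStatementJson {
  Sid: string;
  Effect: "Allow" | "Deny";
  Action: string[];
  Resource: string[];
}

function lambdaInvokeStatement(functionArn: string): PolicyStatementJson {
  return {
    Sid: "sfnLambdaInvokePolicy",
    Effect: "Allow",
    Action: ["lambda:InvokeFunction"],
    // Qualified ARN pointing at the $LATEST version, as in the statement above.
    Resource: [`${functionArn}:$LATEST`],
  };
}

// Placeholder account id and function name, for illustration only.
const exampleArn = "arn:aws:lambda:us-east-1:123456789012:function:MessageRecorderHandler";
const policyDocument = {
  Version: "2012-10-17",
  Statement: [lambdaInvokeStatement(exampleArn)],
};
console.log(JSON.stringify(policyDocument, null, 2));
```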

(Screenshot: sfn exec role inline policy)

Granting the lambda execution role permission to send status updates

Since we are not going to poll the lambda again and again to find its status, we expect the lambda to call back the state machine with the job's completion result. We have already sent the task token to the lambda as part of the payload, and the lambda will post a message back to the state machine reporting success or failure, depending on the scenario. Until then, the state machine stays paused at its current step. The IAM policy statement below is attached to the processor lambda's execution role to make this possible.

const lambdaSfnStatusUpdatePolicy = new Policy(this, "lambdaSfnStatusUpdatePolicy");
lambdaSfnStatusUpdatePolicy.addStatements(
  new PolicyStatement({
    actions: ["states:SendTaskSuccess", "states:SendTaskFailure"],
    effect: Effect.ALLOW,
    resources: ["*"],
    sid: "lambdaSfnStatusUpdatePolicy",
  })
);
triggerFunction.role?.attachInlinePolicy(lambdaSfnStatusUpdatePolicy);
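The pause-and-callback behaviour described above can be modelled in memory. In this hypothetical sketch (TokenRegistry and its methods are illustrative names, not the Step Functions SDK), starting a task issues a token and a promise that settles only when someone calls sendTaskSuccess or sendTaskFailure with that token, so the waiting side never polls. Unlike the real service, tokens here are simple counters, and in this model reusing a completed token throws.

```typescript
// Hypothetical in-memory model of the WAIT_FOR_TASK_TOKEN callback pattern.
type TaskOutcome =
  | { ok: true; output: string }
  | { ok: false; error: string; cause: string };

class TokenRegistry {
  private readonly pending = new Map<string, (outcome: TaskOutcome) => void>();
  private counter = 0;

  // "State machine" side: issue a token and wait on a promise, without polling.
  startTask(): { token: string; done: Promise<TaskOutcome> } {
    const token = `token-${++this.counter}`;
    const done = new Promise<TaskOutcome>((resolve) => {
      this.pending.set(token, resolve);
    });
    return { token, done };
  }

  // "Lambda" side: report the result using the token it received in the payload.
  sendTaskSuccess(token: string, output: string): void {
    this.complete(token, { ok: true, output });
  }

  sendTaskFailure(token: string, error: string, cause: string): void {
    this.complete(token, { ok: false, error, cause });
  }

  private complete(token: string, outcome: TaskOutcome): void {
    const resolve = this.pending.get(token);
    if (resolve === undefined) {
      throw new Error(`Unknown or already completed token: ${token}`);
    }
    this.pending.delete(token);
    resolve(outcome);
  }
}

// Usage: the task stays pending until the worker calls back with its token.
const registry = new TokenRegistry();
const task = registry.startTask();
task.done.then((outcome) => console.log("task finished:", outcome));
registry.sendTaskSuccess(task.token, JSON.stringify({ statusCode: 200 }));
```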

(Screenshot: lambda role policy)

Setting the sfnMachine readonly property

This lets the stack where the construct is used access the state machine object and use it for further integration.

stateMachine.applyRemovalPolicy(RemovalPolicy.DESTROY);

this.sfnMachine = stateMachine

New lambda to record the message into DynamoDB 🌷

Let us create a new file, lambda/message-recorder.ts. In this lambda, we only implement saving the message to the specified DynamoDB table, along with the logic to send the status callback for the success or failure scenarios, with the right output message.

import { PutItemInput } from "aws-sdk/clients/dynamodb";
import { DynamoDB, StepFunctions } from "aws-sdk";

// Create the SDK clients once so warm invocations can reuse them.
const dynamo = new DynamoDB();
const sfn = new StepFunctions({ apiVersion: "2016-11-23" });

exports.processor = async function (event: any) {
  // event.Record was shaped by the state machine payload (messageId, createdAt, event).
  const msg = event.Record;
  const crt_time: number = new Date(msg.createdAt).getTime();
  const putData: PutItemInput = {
    TableName: process.env.MESSAGES_TABLE_NAME || "",
    Item: {
      messageId: { S: msg.messageId },
      createdAt: { N: `${crt_time}` },
      event: { S: JSON.stringify(msg.event) },
    },
    ReturnConsumedCapacity: "TOTAL",
  };

  let result: DynamoDB.PutItemOutput | undefined;
  try {
    result = await dynamo.putItem(putData).promise();
  } catch (err: any) {
    // Report the failure to the paused "Record Message" step via its task token.
    const sendFailure: StepFunctions.SendTaskFailureInput = {
      // error is a short error name (the API limits it to 256 characters).
      error: err?.code || err?.name || "PutItemFailed",
      cause: JSON.stringify({
        statusCode: 500,
        headers: { "Content-Type": "text/json" },
        putStatus: {
          messageId: msg.messageId,
          // Plain Error objects serialize to {} with JSON.stringify, so keep the message.
          ProcessorResult: err?.message || String(err),
        },
      }),
      taskToken: event.MyTaskToken,
    };
    // Use .promise() (without a callback) so that await really waits for the call.
    await sfn.sendTaskFailure(sendFailure).promise();
    return sendFailure;
  }

  // Report success; this output becomes the result of the "Record Message" step.
  const sendSuccess: StepFunctions.SendTaskSuccessInput = {
    output: JSON.stringify({
      statusCode: 200,
      headers: { "Content-Type": "text/json" },
      putStatus: {
        messageId: msg.messageId,
        ProcessorResult: result,
      },
    }),
    taskToken: event.MyTaskToken,
  };
  await sfn.sendTaskSuccess(sendSuccess).promise();
  return sendSuccess;
};
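One option, shown here as a hypothetical refactor rather than the code used in this article, is to pull the record-to-item mapping out of the handler as a pure function, so it can be unit-tested without calling AWS. The attribute shapes ({ S: ... }, { N: ... }) follow the low-level DynamoDB item format used above; the function name, the types and the invalid-date check are assumptions added for illustration.

```typescript
// Hypothetical pure helper extracted from the recorder lambda for unit testing.
interface RecordInput {
  messageId: string;
  createdAt: string; // ISO-8601 timestamp taken from the event's $.time
  event: unknown; // already parsed by States.StringToJson
}

interface MessageItem {
  messageId: { S: string };
  createdAt: { N: string };
  event: { S: string };
}

function toMessageItem(record: RecordInput): MessageItem {
  const createdAtMs = new Date(record.createdAt).getTime();
  if (Number.isNaN(createdAtMs)) {
    throw new Error(`Invalid createdAt: ${record.createdAt}`);
  }
  return {
    messageId: { S: record.messageId },
    // DynamoDB numbers are sent as strings in the low-level API.
    createdAt: { N: `${createdAtMs}` },
    event: { S: JSON.stringify(record.event) },
  };
}

console.log(toMessageItem({ messageId: "abc", createdAt: "2022-04-16T00:00:00Z", event: { a: 1 } }));
```

The handler would then only wire this item into putItem and the task-token callbacks, keeping the AWS calls thin.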

Defining the lambda inside the stack

Here we will be using the above code asset to define the lambda resource inside our CDK stack.


const messageRecorder = new lambda.Function(this, "MessageRecorderHandler", {
      runtime: lambda.Runtime.NODEJS_14_X,
      code: lambda.Code.fromAsset("lambda"),
      handler: "message-recorder.processor",
      logRetention: logs.RetentionDays.ONE_MONTH,
      environment: {
        MESSAGES_TABLE_NAME: envParams.messages.tableName || "",
      },
    });

messageRecorder.applyRemovalPolicy(RemovalPolicy.DESTROY);

Implementing the new sfn construct inside our stack 🌴

First, import the construct created earlier.

import { simpleSfnConstruct } from "../constructs/sfn-simple";

Then pass the required props and initialize the construct to get a reference to the instance.

const sfnMachine = new simpleSfnConstruct(this, "sfnMachine", {
  timeout: Duration.seconds(30),
  triggerFunction: messageRecorder,
});

(Screenshot: statemachine list)

(Screenshot: statemachine iam role)

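Event Target to statemachine 📢

Next, we create an IAM role that EventBridge can assume and an SfnStateMachine target for our state machine. The target forwards the whole event (RuleTargetInput.fromEventPath("$")) as the execution input, retries delivery up to 3 times, and sends events that still cannot be delivered to our existing dead-letter queue.

const sfnRole = new Role(this, 'Role', {
  assumedBy: new ServicePrincipal('events.amazonaws.com'),
});
const sfnCommonEventTarget = new eventTargets.SfnStateMachine(sfnMachine.sfnMachine, {
  deadLetterQueue: commonEventProcessorQueueDLQ.queue,
  retryAttempts: 3,
  input: RuleTargetInput.fromEventPath("$"),
  role: sfnRole,
});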

(Screenshot: event rule targets)

New event rule for the event target 🔩

In this event rule, we use our commonBus and the same eventPattern as before to forward the events to the state machine defined above.

const sfnEventRule = new Rule(this, `sfnCommonEventProcessorRule`, {
      eventBus: commonBus,
      eventPattern: { source: [`com.devpost.commonevent`] },
      targets: [sfnCommonEventTarget],
      ruleName: "sfnCommonEventProcessorRule",
      enabled: true
});
sfnEventRule.applyRemovalPolicy(RemovalPolicy.DESTROY);
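The rule only forwards events whose source is exactly com.devpost.commonevent. The sketch below is a heavily simplified, hypothetical model of that matching: it only handles top-level fields compared against a list of exact string values, whereas real EventBridge patterns support nested fields and many more operators. The matchesPattern function and the sample detail-type value are assumptions for illustration.

```typescript
// Hypothetical, simplified model of EventBridge matching for this rule's pattern.
// Each pattern field lists the allowed exact values; every field must match.
type SimplePattern = { [field: string]: string[] };

function matchesPattern(pattern: SimplePattern, event: { [field: string]: unknown }): boolean {
  return Object.keys(pattern).every((field) => {
    const value = event[field];
    return typeof value === "string" && pattern[field].indexOf(value) !== -1;
  });
}

const commonEventPattern: SimplePattern = { source: ["com.devpost.commonevent"] };

console.log(matchesPattern(commonEventPattern, { source: "com.devpost.commonevent", "detail-type": "message" })); // true
console.log(matchesPattern(commonEventPattern, { source: "com.example.other" })); // false
```

An event matches when every field in the pattern matches; fields the pattern does not mention, such as detail-type here, are ignored.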

(Screenshot: new event rule)

Testing with postman 🎿

Once the solution is deployed to my AWS environment, I test it by sending a message to the API endpoint, as shown below.

(Screenshot: postman)

Querying the DynamoDB table with the messageId, as follows:

SELECT * FROM "MessagesTable" WHERE messageId = 'cdd51245-987a-b3c7-eecf-6d6d63046073'

We can now find the message in DynamoDB.

(Screenshot: dynamodb query)

Inspecting from AWS console 🎳

You can now check the workflow progress and execution logs from the AWS console.

(Screenshot: statemachine executions)

(Screenshot: event history steps)

(Screenshot: sfn execution)

Finding the logs with the event messageId

(Screenshot: logging)

(Screenshot: statemachine success)

Thus, we have defined a new state machine and added a new rule on our existing event bus, which delivers messages to the state machine we built in this article.

We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.

⭐ We have our next article in serverless, do check it out.

🎉 Thanks for supporting! 🙏

It would be great if you'd like to ☕ Buy Me a Coffee, to help boost my efforts.

🔁 Original post at 🔗 Dev Post

🔁 Reposted at 🔗 dev to @aravindvcyber

๐Ÿ‡ AWS CDK 101 - ๐Ÿญ StateMachine and StepFunctions replacing our SQS based lambda trigger

And much more in my pagehttps://t.co/CuYxnKr0Ig#awscdk #dynamodb #serverless #typescript #awslambda https://t.co/086UGXD0OV

โ€” Aravind V (@Aravind_V7) April 16, 2022

Also, you can find my dev.to profile at ✨ @aravindvcyber for similar posts and contributions to the community.

Also, you can follow me on Twitter to find my latest articles and shares 📣 @Aravind_V7