๐Ÿ“ฑ AWS CDK 101 - ๐Ÿ‘ฏ Fetching JSON from S3 through stepfunction

๐Ÿ“ฑ AWS CDK 101 - ๐Ÿ‘ฏ Fetching JSON from S3 through stepfunction

ยท

6 min read

๐Ÿ”ฐ Beginners new to AWS CDK, please do look at my previous articles one by one in this series.

If in case missed my previous article, do find it with the below links.

๐Ÿ” Original previous post at ๐Ÿ”— Dev Post

๐Ÿ” Reposted previous post at ๐Ÿ”— dev to @aravindvcyber

In this article, let us refactor one of our previous step functions which invoked lambda which puts records to dynamo using full message from the event data, into one which can read message data from s3 with keys during the event data. Also, this is a direct follow-up to my previous article mentioned above.

Benefits of using S3 data โŒ›๏ธ

These are not the only benefits, it is just only what I just observed in my deployments.

  • Using S3 data directly will make the event detail payload always of the same payload size and very light

  • While a huge message is pumped into the api gateway S3 will act as the place to capture the full message payload and only the s3 object identifier is pushed into the event detail so that we are not pushing a lot of into the actual moving parts.

  • This also helps to simply preprocess the actual message received by checking for the format, data integrity, transformation, and even antivirus scanning.

  • S3 in itself can trigger various event-driven actions asynchronously by its event notifications.

  • Pumping the event details into the step functions will be now much more efficient as we are no longer worried about the size of the data.

  • Also here we have using S3 as staging, if we need high performance and high throughput we can change it to dynamodb for storing this staging data.

Construction Plan ๐Ÿ—

As we mentioned earlier we are trying to read a JSON from s3, from our message recorder lambda function which we have refined in our previous articles for writing to dynamodb.

import { S3 } from "aws-sdk";

const s3 = new S3();

Refactoring put operation inside the lambda into a helper function ๐ŸŽŠ

We have refactored the previously used message recorder function for better reuse of the components as shown below.


const dbPut: any = async (Record: any, msg: any) => {
  const dynamo = new DynamoDB();
  const crt_time: number = new Date(msg.createdAt).getTime();
  const putData: PutItemInput = {
    TableName: process.env.MESSAGES_TABLE_NAME || "",
    Item: {
      messageId: { S: msg.messageId },
      createdAt: { N: `${crt_time}` },
      event: { S: msg.event },
    },
    ReturnConsumedCapacity: "TOTAL",
  };

  console.log("putData", JSON.stringify(putData, undefined, 2));

  await dynamo.putItem(putData).promise();
};

Refactoring the sendSuccess Helper ๐Ÿ”ฎ

We will also make use of this opportunity to refactor the existing SendTaskSuccess statement into a dedicated function that we could efficiently.


const funcSuccess: any = (res: any, mid: string, token: string) => {
  console.log("sending success ", { res });
  const sendSuccess: StepFunctions.SendTaskSuccessInput = {
    output: JSON.stringify({
      statusCode: 200,
      headers: { "Content-Type": "text/json" },
      putStatus: {
        messageId: mid,
        ProcessorResult: res,
      },
    }),
    taskToken: token,
  };
  const resultStatus = sfn.sendTaskSuccess(
    sendSuccess,
    (err: any, data: any) => {
      if (err) console.log(err, err.stack);
      else console.log(data);
    }
  );
  console.log("sent success: ", { resultStatus, sendSuccess });
};

Refractoring the sendFailure Helper ๐Ÿ„

Similarly, we will also refactor and create sendTaskFailure module into a new helper function.

const funcFailure: any = (err: any, mid: string, token: string) => {
  console.log("sending failure ", { err });
  const sendFailure: StepFunctions.SendTaskFailureInput = {
    error: JSON.stringify(err),
    cause: JSON.stringify({
      statusCode: 500,
      headers: { "Content-Type": "text/json" },
      putStatus: {
        messageId: mid,
        ProcessorResult: err,
      },
    }),
    taskToken: token,
  };
  const resultStatus = sfn.sendTaskFailure(
    sendFailure,
    (err: any, data: any) => {
      if (err) console.log(err, err.stack);
      else console.log(data);
    }
  );
  console.log("sent failure: ", { resultStatus, sendFailure });
};

Granting read Object access to lambda ๐Ÿƒ

In our previous article we have granted access to the entry handler to putObject, here we will grant access to the recorder function to read data as follows.


stgMsgBucket.grantWrite(eventCounterBus.handler);
stgMsgBucket.grantRead(messageRecorder);

S3 read access

Changes inside the lambda ๐ŸŒ

The below changes will be used inside the lambda handler to read every message using the event.detail content which now has the bucket name and the key from the event.

Here can find that the bucket name and object key is extracted from the message and used to retrieve an object from S3, then it is put into dynamodb for recording using the various helper functions created above.

await Promise.all(
    event.Records.map(async (Record: any) => {
      console.log("Received message:", JSON.stringify(event, undefined, 2));
      const msg = JSON.parse(Record.body).Record;
      const s3Get = await s3
        .getObject({
          Bucket: msg.bucket,
          Key: msg.key,
        })
        .promise();
      const data = s3Get.Body?.toString("utf-8");
      if (data) {
        msg.event = data;
        const token = JSON.parse(Record.body).MyTaskToken;
        await dbPut(Record, msg)
          .then(async (data: any) => {
            await funcSuccess(data, msg.messageId, token);
          })
          .catch(async (err: any) => {
            await funcFailure(err, msg.messageId, token);
          });
      }
    })
  );

Once we get the message, we are updating the message object with the message content and inserting it into dynamodb with the dbPut we have refracted earlier.

Removing the event content from statemachine invocation payload ๐Ÿญ

Now we can remove the usage of the actual message body across the entire pipeline, which makes it reduce the storage used in transmission call during various invocations call.

const sfnTaskPayload = sfn.TaskInput.fromObject({
      MyTaskToken: sfn.JsonPath.taskToken,
      Record: {
        "messageId.$": "$.id",
        "createdAt.$": "$.time",
        // "event.$": "States.StringToJson($.detail.message)",
       // "event.$": "$.detail.message",
        "bucket.$": "$.detail.message.bucket",
        "key.$": "$.detail.message.key"
      },
    });

Also in the entry handler function discussed in the last article as shown below

   const message = JSON.parse(event.body);
  message.uuid = getUuid();
  message.handler = context.awsRequestId;
  message.key = `uploads/${message.uuid}.json`;
  message.bucket = process.env.BucketName || "";
  console.log("Initial request:", JSON.stringify(message, undefined, 2));
  delete message.message;  //new line added, since s3 will have the data

Integration testing ๐Ÿ

New message

sfn instance

s3 data

Final destination

Keypoint async-await ๐Ÿ””

One important thing you may have to learn here would be how we have handled the async-await, to fetch from S3 first and then write to dynamodb, among other async operations.

Beginners most likely may go wrong in using async-await and they may mesh up by getting execution leaks when they implemented callbacks poorly.

By then the trace below can help you understand where the problem is present.

trace

We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.

โญ We have our next article in serverless, do check out

๐ŸŽ‰ Thanks for supporting! ๐Ÿ™

Would be great if you like to โ˜• Buy Me a Coffee, to help boost my efforts.

๐Ÿ” Original post at ๐Ÿ”— Dev Post

๐Ÿ” Reposted at ๐Ÿ”— dev to @aravindvcyber

๐Ÿš AWS CDK 101 - ๐Ÿฌ Fetching JSON from dynamodb vs S3 through stepfunction @hashnode

Checkout the full collectionhttps://t.co/CuYxnKr0Ig

โ€” Aravind V (@Aravind_V7) May 8, 2022

Did you find this article valuable?

Support Aravind V by becoming a sponsor. Any amount is appreciated!

ย