๐Ÿ‰ AWS CDK 101 - ๐Ÿ‡ Using batched dynamodb stream to delete item on another dynamodb table

๐Ÿ‰ AWS CDK 101 - ๐Ÿ‡ Using batched dynamodb stream to delete item on another dynamodb table

Aravind V
May 14, 2022


Beginners new to AWS CDK, please look at my previous articles in this series one by one.

In case you missed my previous article, you can find it at the links below.

Original previous post at Dev Post

Reposted previous post at dev.to @aravindvcyber

In this article, let us add a new asynchronous batch integration to our messages dynamodb table, which will help us delete the processed records from the staging dynamodb table.

Benefits of this approach

  • In this approach we clear the staging table data asynchronously, with the dynamodb stream directly invoking a lambda.
  • So we can use this in systems where we do not want to do the scavenging synchronously inside a compute-intensive workload, saving compute by avoiding the wait time for this I/O.
  • Some may argue that I am making a separate lambda call elsewhere anyway, but this can also be seen as a good decoupling strategy.
  • Invocation charges and compute still apply there. Yet we should note one more thing: the stream records can be read in batches by our lambda, and the default batch size is 100, so up to 100 records can be handled in one invocation.
  • At the same time, the handler can use a batch request to delete the data from dynamodb in a single request.
  • So not only can you read a batch of stream records, you can also perform a batch delete, which is fast. The delete operations are grouped into chunks of at most 25, which greatly reduces the number of I/O operations from the lambda.
  • Maybe I will write a separate article about the batch delete later; for now, the handler here performs a 1:1 delete operation per record.
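The chunking idea from the list above can be sketched as follows. This is only a minimal illustration, not the handler used in this article: the names `chunk`, `buildBatchDeleteInputs`, and the table name `stgMessages` are assumptions made up for the example, and it only builds the request payloads in the shape that BatchWriteItem expects without calling AWS.

```typescript
// Hypothetical types and helper names, for illustration only.
type AttributeValue = { S?: string; N?: string };
type Key = Record<string, AttributeValue>;

// BatchWriteItem accepts at most 25 put/delete requests per call.
const MAX_BATCH_WRITE_ITEMS = 25;

// Split an array into consecutive chunks of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Build one BatchWriteItem input per chunk of keys.
function buildBatchDeleteInputs(tableName: string, keys: Key[]) {
  return chunk(keys, MAX_BATCH_WRITE_ITEMS).map((group) => ({
    RequestItems: {
      [tableName]: group.map((key) => ({ DeleteRequest: { Key: key } })),
    },
  }));
}

// 100 keys (one full stream batch) become 4 requests of 25 deletes each.
const sampleKeys: Key[] = Array.from({ length: 100 }, (_, i) => ({
  messageId: { S: `id-${i}` },
  createdAt: { N: String(1652355039000 + i) },
}));
const inputs = buildBatchDeleteInputs("stgMessages", sampleKeys);
console.log(inputs.length); // prints 4
```

Each element of `inputs` could then be passed to a BatchWriteItem call. A real handler would also need to retry any `UnprocessedItems` returned in the response, which this sketch leaves out.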


Planning

Here we will be making use of the dynamodb streams to trigger the deleteItem action by invoking a new lambda handler function which achieves our objective.

Construction

We need new lambda function code with a fresh handler that receives the dynamodb stream event object and processes it as shown below.

Create a new lambda function in the file lambda/message-stream.ts.

Here we only act on records whose event name is INSERT. In the same way, we can have finer control over the desired outcome during these stream invocations, as shown below.

New handler function logic

exports.created = async function (event: any) {
  console.log("Received stream:", JSON.stringify(event, undefined, 2));
  const results: any[] = [];
  // Process every record of the batch concurrently.
  await Promise.all(
    event.Records.map(async (Record: any) => {
      console.log(JSON.stringify(Record, undefined, 2));
      // Only newly inserted items trigger a delete on the staging table.
      if (Record.eventName === "INSERT") {
        results.push(await deleteDbItem(Record.dynamodb.Keys));
      }
    })
  );
  results.forEach((res) => console.log(res));
};
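The event-name filter in the handler can also be pulled out into a small typed helper, which makes it easy to test without touching dynamodb. The sketch below is an assumption-laden illustration: the `StreamRecord` and `StreamEvent` types and the `keysForEvent` function are hypothetical names covering only the fields used here (the `@types/aws-lambda` package offers fuller `DynamoDBStreamEvent` typings if you prefer those).

```typescript
// Hypothetical minimal types covering only the fields used in this sketch.
type AttributeValue = { S?: string; N?: string };
type Keys = Record<string, AttributeValue>;
type EventName = "INSERT" | "MODIFY" | "REMOVE";

interface StreamRecord {
  eventName?: EventName;
  dynamodb?: { Keys?: Keys };
}

interface StreamEvent {
  Records: StreamRecord[];
}

// Collect the keys of all records that match the given event name.
function keysForEvent(event: StreamEvent, eventName: EventName): Keys[] {
  return event.Records
    .filter((record) => record.eventName === eventName)
    .map((record) => record.dynamodb?.Keys)
    .filter((keys): keys is Keys => keys !== undefined);
}

// A made-up batch with one INSERT and one MODIFY record.
const sampleEvent: StreamEvent = {
  Records: [
    { eventName: "INSERT", dynamodb: { Keys: { messageId: { S: "a" } } } },
    { eventName: "MODIFY", dynamodb: { Keys: { messageId: { S: "b" } } } },
  ],
};
const insertedKeys = keysForEvent(sampleEvent, "INSERT");
console.log(insertedKeys.length); // prints 1
```

Switching the second argument to "MODIFY" or "REMOVE" would target updates or deletions on the source table instead.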

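Helper function dynamodb deleteItem

Simple helper function to perform deleteItem on a dynamodb table.

import { DynamoDB } from "aws-sdk";
import { DeleteItemInput } from "aws-sdk/clients/dynamodb";

// Assumption: a plain AWS SDK v2 low-level client; adapt this if your setup wraps the SDK (for example with X-Ray).
const dynamo = new DynamoDB();

const deleteDbItem: any = async (keys: any) => {
  console.log("Deleting: ", { keys });
  const deleteData: DeleteItemInput = {
    TableName: process.env.STAGING_MESSAGES_TABLE_NAME || "",
    Key: keys,
    ReturnConsumedCapacity: "TOTAL",
  };
  console.log("deleteItem: ", JSON.stringify(deleteData, undefined, 2));
  return await dynamo.deleteItem(deleteData).promise();
};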

Minor changes to the dynamodb table definition

I have marked the necessary change with a comment: we need to enable dynamodb stream generation for our table.

Most importantly, I have requested both the new and old images, which carry all the necessary data. However, we are not going to use all of it; this is only for demonstration purposes. Since the handler only reads the keys, a KEYS_ONLY stream view would also be enough for this use case.

const messages = new dynamodb.Table(this, "MessagesTable", {
  tableName: process.env.messagesTable,
  sortKey: { name: "createdAt", type: dynamodb.AttributeType.NUMBER },
  partitionKey: { name: "messageId", type: dynamodb.AttributeType.STRING },
  encryption: dynamodb.TableEncryption.AWS_MANAGED,
  readCapacity: 5,
  writeCapacity: 5,
  // New item added below
  stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
});


Defining the new lambda

Here we will use the code block from above to provision our lambda inside our master stack, as shown below.

const messageStreamFunc = new lambda.Function(this, "messageStreamFunc", {
  runtime: lambda.Runtime.NODEJS_14_X,
  code: lambda.Code.fromAsset("lambda"),
  handler: "message-stream.created",
  logRetention: logs.RetentionDays.ONE_MONTH,
  tracing: Tracing.ACTIVE,
  layers: [nodejsUtils, nodejsXray],
  environment: {
    STAGING_MESSAGES_TABLE_NAME: envParams.messages.stgTableName || "",
  },
});
messageStreamFunc.applyRemovalPolicy(RemovalPolicy.DESTROY);


Grant permission for the handler to write to the table

Also, our handler must have sufficient access to delete from the other dynamodb table stgMessages.

stgMessages.grantWriteData(messageStreamFunc);


Adding Event source to the lambda function

It is now time to connect the handler function to the dynamodb event source as follows.

messageStreamFunc.addEventSource(
  new DynamoEventSource(messages, {
    startingPosition: lambda.StartingPosition.LATEST,
  })
);

Or you can tune the batch size and the batching window (in seconds), so that the lambda waits to collect more records before each invocation, as follows.

messageStreamFunc.addEventSource(
  new DynamoEventSource(messages, {
    startingPosition: lambda.StartingPosition.LATEST,
    batchSize: 100,
    maxBatchingWindow: Duration.seconds(60),
  })
);
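To get a feel for what these settings can save, here is a rough back-of-the-envelope sketch. It assumes the best case where every invocation receives a full batch; in practice Lambda invokes the function once the batch size is reached or the batching window expires, whichever comes first, so real batches can be smaller. The function name `estimateCalls` and its parameters are hypothetical, chosen only for this illustration.

```typescript
// Best-case estimate of lambda invocations and dynamodb calls for a burst of
// stream records, assuming every invocation receives a full batch.
function estimateCalls(
  recordCount: number,
  batchSize = 100, // stream batch size configured on the event source
  maxDeletesPerCall = 25 // BatchWriteItem limit per request
) {
  let invocations = 0;
  let batchWriteCalls = 0;
  for (let remaining = recordCount; remaining > 0; remaining -= batchSize) {
    const inThisInvocation = Math.min(remaining, batchSize);
    invocations += 1;
    batchWriteCalls += Math.ceil(inThisInvocation / maxDeletesPerCall);
  }
  return {
    invocations,
    batchWriteCalls,
    // One deleteItem call per record, as in the handler of this article.
    singleDeleteCalls: recordCount,
  };
}

const estimate = estimateCalls(250);
console.log(estimate);
```

For 250 inserted records this gives 3 invocations (100, 100, and 50 records), 10 batched delete calls, and 250 calls when deleting one item at a time.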

Sample dynamodb stream object

Below is the dynamodb stream object that was used as the payload to invoke our handler lambda.


{
  "Records": [
    {
      "eventID": "5c9aa5395f970324e088a32578ee0a66",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "ap-south-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1652355068,
        "Keys": {
          "createdAt": {
            "N": "1652355039000"
          },
          "messageId": {
            "S": "47d97141-01e7-42a1-b6b3-0c59a6a3827e"
          }
        },
        "NewImage": {
          "createdAt": {
            "N": "1652355039000"
          },
          "messageId": {
            "S": "47d97141-01e7-42a1-b6b3-0c59a6a3827e"
          },
          "event": {
            "S": "{\n    \"message\": {\"new\": \"A secret message\"}\n}"
          }
        },
        "SequenceNumber": "20927100000000035334874026",
        "SizeBytes": 173,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:ap-south-1:8888888:table/MessagesTable/stream/2022-05-*****"
    }
  ]
}
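Note that the `Keys` in the payload above use the typed attribute format (`S`, `N`), which the low-level deleteItem call accepts as-is. If you ever need plain values instead, for example with the SDK's DocumentClient, they have to be converted first. The sketch below is a hand-rolled illustration with a hypothetical `toPlainKey` helper that handles only string and number keys; the AWS SDK ships its own converters, which are the better choice in real code.

```typescript
// Hypothetical helper: convert typed key attributes into plain values.
// Only the S (string) and N (number) types are handled in this sketch.
type AttributeValue = { S?: string; N?: string };

function toPlainKey(
  keys: Record<string, AttributeValue>
): Record<string, string | number> {
  const plain: Record<string, string | number> = {};
  for (const [name, value] of Object.entries(keys)) {
    if (value.S !== undefined) {
      plain[name] = value.S;
    } else if (value.N !== undefined) {
      // Numbers arrive as strings in the stream payload.
      plain[name] = Number(value.N);
    } else {
      throw new Error(`Unsupported attribute type for key "${name}"`);
    }
  }
  return plain;
}

// Keys copied from the sample stream record above.
const plainKey = toPlainKey({
  createdAt: { N: "1652355039000" },
  messageId: { S: "47d97141-01e7-42a1-b6b3-0c59a6a3827e" },
});
console.log(plainKey);
```

Converting with `Number` is fine for timestamps like the one here, but very large or high-precision numeric keys would lose precision this way.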

Console log during execution

Finally, after execution, we can find the above JSON payload that we received in the event object in the CloudWatch logs. The payload is then used to delete the item from our staging table, and the results are logged there as well.


In the next article, we will demonstrate how to use a similar approach to delete an object from S3, which we created previously.

We will be adding more connections to our stack and making it more usable in the upcoming articles by creating new constructs, so do consider following and subscribing to my newsletter.

โญ We have our next article in serverless, do check out

๐ŸŽ‰ Thanks for supporting! ๐Ÿ™


๐Ÿ” Original post at ๐Ÿ”— Dev Post

๐Ÿ” Reposted at ๐Ÿ”— dev to @aravindvcyber

๐Ÿ‰ AWS CDK 101 - ๐Ÿ‡ Using batched dynamodb stream to delete item on another dynamodb table
@hashnode

Checkout more like this in my pagehttps://t.co/CuYxnKr0Ig#TheHashnodeWriteathon#awscdk #aws #awslambda #thwcloud-computing https://t.co/RlrahA0wrI

โ€” Aravind V (@Aravind_V7) May 14, 2022

Did you find this article valuable?

Support Aravind V by becoming a sponsor. Any amount is appreciated!

See recent sponsors |ย Learn more about Hashnode Sponsors
ย 
Share this

Impressum

Also you can find my dev.to profile at โœจ @aravindvcyber for similar posts and contributions to community.

Are you wondering about reading more posts like this or writing your own post with hashnode.

Hashnode is free to ๐Ÿ”— sign-up, using my link helps to enable more features in my blog.

Would you like to โ˜• Buy Me a Coffee to help boost my efforts.

Also you can follow me on twitter, to find my latest articles and shares ๐Ÿ“ฃ @Aravind_V7