Setup

Implementation

In this module, you’ll set up the resources needed to process records from the wildrydes Kinesis Data Stream: an Amazon DynamoDB table, an Amazon SQS queue, an IAM role, and an AWS Lambda function.

Resources

  1. Create an Amazon DynamoDB Table
  2. Create an SQS On-Error Destination Queue
  3. Create an IAM Role
  4. Create a Lambda Function

1. Create an Amazon DynamoDB table


Use the Amazon DynamoDB console to create a new table. This table stores the unicorn data written by the AWS Lambda function. Name the table UnicornSensorData and give it a Partition key called Name of type String and a Sort key called StatusTime of type String. Use the defaults for all other settings.

After you’ve created the table, note the Amazon Resource Name (ARN) for use in the next section.
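DynamoDB table ARNs follow a fixed format, so you can also construct one yourself if you know the Region, account ID, and table name. A minimal sketch (the account ID below is a hypothetical placeholder, and `buildTableArn` is an illustrative helper, not part of any SDK):

```javascript
// Build a DynamoDB table ARN from its parts.
// Format: arn:aws:dynamodb:<region>:<account-id>:table/<table-name>
function buildTableArn(region, accountId, tableName) {
  return `arn:aws:dynamodb:${region}:${accountId}:table/${tableName}`;
}

// Example with a hypothetical account ID:
console.log(buildTableArn("us-east-1", "123456789012", "UnicornSensorData"));
// arn:aws:dynamodb:us-east-1:123456789012:table/UnicornSensorData
```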

✅ Step-by-step Instructions

  1. Go to the AWS Management Console, choose Services then select DynamoDB under Database. Alternatively, you can use the search bar and type DynamoDB in the search dialog box.

  2. Click Create table.

  3. Enter UnicornSensorData for the Table name.

  4. Enter Name for the Partition key and select String for the key type.

  5. Tick the Add sort key checkbox. Enter StatusTime for the Sort key and select String for the key type.

  6. Leave the Use default settings box checked and click Create table.

  7. Once the table is created, click the table name hyperlink. On the table details screen, under General information, you will see the Amazon Resource Name (ARN). Copy and save the ARN in the scratchpad; you will use it when creating the IAM role.


2. Create an SQS On-Error Destination Queue


Use the Amazon SQS console to create a new queue named wildrydes-queue. Your Lambda function will send messages to this queue when processing fails after the configured retries.

After you’ve created the queue, note the Amazon Resource Name (ARN) for use in later sections.
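SQS queue ARNs have a slightly different shape than table ARNs: the queue name is the final segment, with no resource-type prefix. A sketch of the format (hypothetical account ID, illustrative helper):

```javascript
// Build an SQS queue ARN from its parts.
// Format: arn:aws:sqs:<region>:<account-id>:<queue-name>
function buildQueueArn(region, accountId, queueName) {
  return `arn:aws:sqs:${region}:${accountId}:${queueName}`;
}

console.log(buildQueueArn("us-east-1", "123456789012", "wildrydes-queue"));
// arn:aws:sqs:us-east-1:123456789012:wildrydes-queue
```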

✅ Step-by-step Instructions

  1. Go to the AWS Management Console, choose Services then select Simple Queue Service under Application Integration. Alternatively, you can use the search bar and type Simple Queue Service in the search dialog box.

  2. Click the orange Create queue button.

  3. For the Name field, enter wildrydes-queue.

  4. Leave the rest of the options as the defaults and click Create queue.

  5. Copy and save the ARN of the SQS queue in the scratchpad; you will need it when configuring the Lambda function.


3. Create an IAM role for your Lambda function


Use the IAM console to create a new role. Name it WildRydesStreamProcessorRole and select Lambda for the role type. Create a policy that allows dynamodb:BatchWriteItem access to the DynamoDB table created in the last section and sqs:SendMessage access to the DLQ for failed messages, and attach it to the new role. Also attach the managed policy AWSLambdaKinesisExecutionRole to this role in order to grant permissions for your function to read from Amazon Kinesis streams and to log to Amazon CloudWatch Logs.
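In JSON form, the customer-managed policy described above looks roughly like this. This is a sketch: the Region and account ID are placeholders you would replace with your own values.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "dynamodb:BatchWriteItem",
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/UnicornSensorData"
    },
    {
      "Effect": "Allow",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:123456789012:wildrydes-queue"
    }
  ]
}
```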

✅ Step-by-step Instructions

  1. From the AWS Console, click on Services and then select IAM in the Security, Identity & Compliance section. Alternatively, you can use the search bar and type IAM in the search dialog box.

  2. Select Policies from the left navigation and then click Create policy.

  3. Using the Visual editor, we’re going to create an IAM policy to allow our Lambda function access to the DynamoDB table created in the last section. To begin, click Service, begin typing DynamoDB in Find a service, and click DynamoDB.

  4. Click Action, begin typing BatchWriteItem in Filter actions, and tick the BatchWriteItem checkbox.

  5. Click Resources, then click Add ARN in the table section. Copy the ARN of the DynamoDB table from the scratchpad and paste it in Specify ARN for table. Alternatively, you can construct the ARN of the DynamoDB table you created in the previous section by specifying the Region, Account, and Table Name.

    In Region, enter the AWS Region in which you created the DynamoDB table in the previous section, e.g.: us-east-1.

    In Table Name, enter UnicornSensorData.

    You should see your ARN in the Specify ARN for table field and it should look similar to:

  6. Click Add.

    When completed, your console should look similar to this:

  7. Next, add permissions to allow the Lambda function to send messages to the SQS On-Error Destination Queue. Click Add additional permissions, click Service, begin typing SQS in Find a service, and click SQS.

  8. Click Action, begin typing SendMessage in Filter actions, and tick the SendMessage checkbox.

  9. Click Resources, click Add ARN in queue. Copy the ARN of the SQS queue from the scratchpad and paste it in Specify ARN for queue. Alternatively, you can construct the ARN of the SQS queue you created in the previous section by specifying the Region, Account, and Queue Name.

    In Region, enter the AWS Region in which you created the SQS queue in the previous section, e.g.: us-east-1.

    In Queue Name, enter wildrydes-queue.

    You should see your ARN in the Specify ARN for queue field and it should look similar to:

  10. Click Next: Tags.

  11. Click Next: Review.

  12. Enter WildRydesDynamoDBWritePolicy in the Name field.

  13. Click Create policy.

  14. Select Roles from the left navigation and then click Create role.

  15. Click Lambda for the role type from the AWS service section.

  16. Click Next: Permissions.

  17. Begin typing AWSLambdaKinesisExecutionRole in the Filter text box and check the box next to that role.

  18. Begin typing WildRydesDynamoDBWritePolicy in the Filter text box and check the box next to that policy.

  19. Click Next: Tags.

  20. Click Next: Review.

  21. Enter WildRydesStreamProcessorRole for the Role name.

  22. Click Create role.

  23. Begin typing WildRydesStreamProcessorRole in the Search text box.

  24. Click on WildRydesStreamProcessorRole and it should look similar to:


4. Create a Lambda function to process the stream


Create a Lambda function called WildRydesStreamProcessor that will be triggered whenever a new record is available in the wildrydes stream. Use the provided index.js implementation for your function code. Create an environment variable with the key TABLE_NAME and the value UnicornSensorData. Configure the function to use the WildRydesStreamProcessorRole role created in the previous section.
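Each Kinesis record delivers its payload base64-encoded, so the provided index.js decodes and parses it before writing to DynamoDB. The decoding step can be sketched in isolation (the unicorn payload below is a made-up example):

```javascript
// Decode a Kinesis record payload the same way the function code does:
// base64 -> string -> JSON object.
function decodeRecord(record) {
  const json = Buffer.from(record.kinesis.data, "base64").toString("ascii");
  return JSON.parse(json);
}

// Simulate a record as Lambda would receive it (hypothetical payload).
const payload = { Name: "Shadowfax", StatusTime: "2020-01-01 00:00:00.000" };
const record = {
  kinesis: { data: Buffer.from(JSON.stringify(payload)).toString("base64") },
};

console.log(decodeRecord(record).Name); // Shadowfax
```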

✅ Step-by-step Instructions

  1. Go to the AWS Management Console, choose Services then select Lambda under Compute. Alternatively, you can use the search bar and type Lambda in the search dialog box.

  2. Click Create a function.

  3. Enter WildRydesStreamProcessor in the Function name field.

  4. Select Node.js 14.x from Runtime.

  5. Under Permissions, expand Change default execution role, select Use an existing role, and choose WildRydesStreamProcessorRole from the Existing role dropdown.

  6. Click Create function.

  7. Scroll down to the Code source section.

  8. Copy and paste the JavaScript code below into the code editor.

    "use strict";
    
    const AWS = require("aws-sdk");
    const dynamoDB = new AWS.DynamoDB.DocumentClient();
    const tableName = process.env.TABLE_NAME;
    
    // Entrypoint for Lambda Function
    exports.handler = function (event, context, callback) {
      const requestItems = buildRequestItems(event.Records);
      const requests = buildRequests(requestItems);
    
      Promise.all(requests)
        .then(() => callback(null, `Delivered ${event.Records.length} records`))
        .catch(callback);
    };
    
    // Build DynamoDB request payload
    
    function buildRequestItems(records) {
      return records.map((record) => {
        const json = Buffer.from(record.kinesis.data, "base64").toString(
          "ascii"
        );
        const item = JSON.parse(json);
    
        return {
          PutRequest: {
            Item: item,
          },
        };
      });
    }
    
    function buildRequests(requestItems) {
      const requests = [];
      // Batch Write 25 request items from the beginning of the list at a time
      while (requestItems.length > 0) {
        const request = batchWrite(requestItems.splice(0, 25));
    
        requests.push(request);
      }
    
      return requests;
    }
    
    // Batch write items into DynamoDB table using DynamoDB API
    function batchWrite(requestItems, attempt = 0) {
      const params = {
        RequestItems: {
          [tableName]: requestItems,
        },
      };
    
      let delay = 0;
    
      if (attempt > 0) {
        delay = 50 * Math.pow(2, attempt);
      }
    
      return new Promise(function (resolve, reject) {
        setTimeout(function () {
          dynamoDB
            .batchWrite(params)
            .promise()
            .then(function (data) {
              if (data.UnprocessedItems.hasOwnProperty(tableName)) {
                return batchWrite(data.UnprocessedItems[tableName], attempt + 1);
              }
            })
            .then(resolve)
            .catch(reject);
        }, delay);
      });
    }
    

  9. Now you will add the DynamoDB table name as an environment variable. On the Configuration tab, select the Environment variables section.

  10. Click Edit and then Add environment variable.

    • Enter TABLE_NAME in Key and UnicornSensorData in Value.

    • Add another environment variable with AWS_NODEJS_CONNECTION_REUSE_ENABLED in Key and 1 in Value. This setting lets the AWS SDK for Node.js reuse TCP connections, which reduces request latency.

    • Click Save.

  11. Now you will add the event source mapping (ESM) that lets AWS Lambda read from Kinesis. Scroll up and click Add trigger in the Function overview section.

  12. In the Trigger configuration section, select Kinesis.

  13. Select wildrydes from Kinesis stream.

  14. Leave Batch size set to 100 and Starting position set to Latest.

  15. Expand the Additional settings section.

  16. Under On-failure destination, enter the ARN of the wildrydes-queue SQS queue you saved earlier.

  17. Make sure Enable trigger is checked.

  18. Click Add.

  19. Go back to the Code tab and click Deploy. The status will change to Changes deployed.
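Two details of the function code above are worth isolating: it splits request items into batches of 25 (the BatchWriteItem per-request limit) and retries unprocessed items with an exponentially growing delay. A sketch of both pieces, with the splice-based mutation of the original replaced by a copy for clarity:

```javascript
// Split items into chunks of up to 25, mirroring buildRequests in the function code.
function chunk(items, size = 25) {
  const chunks = [];
  // Copy first so the caller's array is not mutated (unlike splice in the original).
  const remaining = items.slice();
  while (remaining.length > 0) {
    chunks.push(remaining.splice(0, size));
  }
  return chunks;
}

// Backoff delay in milliseconds, mirroring batchWrite: 0 on the first attempt,
// then 50 * 2^attempt for each retry.
function backoffDelay(attempt) {
  return attempt > 0 ? 50 * Math.pow(2, attempt) : 0;
}

console.log(chunk(Array.from({ length: 60 }, (_, i) => i)).map((c) => c.length)); // [ 25, 25, 10 ]
console.log([0, 1, 2, 3].map(backoffDelay)); // [ 0, 100, 200, 400 ]
```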

⭐ Recap

🔑 You can subscribe Lambda functions to your Kinesis stream so that they automatically read batches of records off the stream and process them as records arrive.

🔧 In this module, you’ve set up a DynamoDB table for storing unicorn data, a Dead Letter Queue (DLQ) for receiving failed messages, and a Lambda function that reads data from the Kinesis Data Stream and stores it in DynamoDB.

Next

✅ Proceed to the next module, Stream Processing, where you’ll use the Lambda function you just created to process the stream and store the data in DynamoDB. You will also verify the processing.