Error Handling with Checkpoint and Bisect On Batch

While Bisect On Batch helps narrow down the problematic messages, it can cause previously successful messages to be processed more than once. With the Checkpoint feature, your function can return the sequence identifier of the failed message, which gives you more precise control over where processing of the stream resumes. For example, take a batch of 9 messages where the fifth message fails: Lambda processes the batch (messages 1-9), the fifth message fails, and the function returns the sequence identifier of the failed message. Lambda then retries only messages 5 to 9.
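
The 9-message example can be sketched as a small local model. This is only an illustration of the checkpoint idea, not Lambda's actual implementation: `recordsToRetry` is a hypothetical helper, the sequence numbers `'1'` to `'9'` are stand-ins for real Kinesis sequence numbers, and treating an unknown identifier as a full-batch retry is an assumption of this sketch.

```javascript
// Simplified model of checkpointing: given a batch and the handler's
// response, return the records that would be retried.
// recordsToRetry is a hypothetical helper, not part of any AWS SDK.
function recordsToRetry(records, response) {
  const failures = (response && response.batchItemFailures) || [];
  if (failures.length === 0) {
    return []; // nothing reported: the whole batch counts as processed
  }
  const failedIds = new Set(failures.map((f) => f.itemIdentifier));
  // Records arrive in sequence order, so the first match is the checkpoint.
  const checkpoint = records.findIndex((r) =>
    failedIds.has(r.kinesis.sequenceNumber)
  );
  // Assumption: an identifier that matches nothing retries the whole batch.
  return checkpoint === -1 ? records : records.slice(checkpoint);
}

// Batch of 9 records with stand-in sequence numbers '1'..'9'.
const batch = Array.from({ length: 9 }, (_, i) => ({
  kinesis: { sequenceNumber: String(i + 1) },
}));

// The handler reports that the fifth record failed.
const response = { batchItemFailures: [{ itemIdentifier: '5' }] };

const retried = recordsToRetry(batch, response).map(
  (r) => r.kinesis.sequenceNumber
);
console.log(retried);
```

Messages 1 to 4 sit before the checkpoint and are not sent again, which is the difference from Bisect On Batch on its own.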

  1. Go to the AWS Management Console, choose Services then select Lambda under Compute. Alternatively, you can use the search bar and type Lambda in the search dialog box.

  2. Review and apply the changes to the WildRydesStreamProcessor function. The changes store the Kinesis sequence number (kinesis.sequenceNumber) of the failing record and return that sequence number from the catch block.

    'use strict';
        
    const AWS = require('aws-sdk');
    const dynamoDB = new AWS.DynamoDB.DocumentClient();
    const tableName = process.env.TABLE_NAME;
    const errorString = 'error';
        
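    exports.handler = function(event, context, callback) {
      console.log('Number of Records sent for each invocation of Lambda function = ', event.Records.length)
      const requestItems = buildRequestItems(event.Records);
        
      // On failure, buildRequestItems returns a { batchItemFailures: [...] } object
      // instead of an array. Hand it back to Lambda so the batch is retried from
      // the failed sequence number. Records parsed before the failure are not
      // written in this invocation.
      if (!Array.isArray(requestItems)) {
        return callback(null, requestItems);
      }
        
      const requests = buildRequests(requestItems);
        
      Promise.all(requests)
        .then(() => callback(null, `Delivered ${event.Records.length} records`))
        .catch(callback);
    };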
        
    function buildRequestItems(records) {
      let sequenceNumber = 0;
      try {
        return records.map((record) => {
          sequenceNumber = record.kinesis.sequenceNumber;
          console.log('Processing record with Sequence Number = ', sequenceNumber);
          const json = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
          const item = JSON.parse(json);
              
          if(item.InputData.toLowerCase().includes(errorString)) {
            console.log ('Error record is = ', item);
            throw new Error('kaboom')
          }
            
          return {
            PutRequest: {
              Item: item,
            },
          };
        });
      } catch (err) {
        console.log('Returning Failure Sequence Number =', sequenceNumber)
        return { "batchItemFailures": [ {"itemIdentifier": sequenceNumber} ] }
      }
    }
        
    function buildRequests(requestItems) {
      const requests = [];
        
      while (requestItems.length > 0) {
        const request = batchWrite(requestItems.splice(0, 25));
        
        requests.push(request);
      }
        
      return requests;
    }
        
    function batchWrite(requestItems, attempt = 0) {
      const params = {
        RequestItems: {
          [tableName]: requestItems,
        },
      };
        
      let delay = 0;
        
      if (attempt > 0) {
        delay = 50 * Math.pow(2, attempt);
      }
        
      return new Promise(function(resolve, reject) {
        setTimeout(function() {
          dynamoDB.batchWrite(params).promise()
            .then(function(data) {
              if (data.UnprocessedItems.hasOwnProperty(tableName)) {
                return batchWrite(data.UnprocessedItems[tableName], attempt + 1);
              }
            })
            .then(resolve)
            .catch(reject);
        }, delay);
      });
    }
    
    
  3. Click Deploy to deploy the changes to the WildRydesStreamProcessor Lambda function.

  4. Remove the existing Kinesis Data Stream mapping: click the Configuration tab above the code editor, select Kinesis:wildrydes (Enabled), and delete the trigger. (This step is needed only if the Lambda function already has a Kinesis Data Stream mapping or any other event source mapping.)

  5. Add a new Kinesis Data Stream mapping from the Configuration tab. The mapping configuration is the same as before, except that you also check the additional field Report batch item failures.

  6. Test your changes by inserting data into the Kinesis Data Stream.

  7. Monitor CloudWatch logs and query DynamoDB by repeating the steps in the section [Monitor and Query][monitor-and-query].
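
Before sending test data to the stream, the failure-reporting path can be checked locally with a dry run along these lines. This is a minimal sketch under stated assumptions: `fakeEvent` and `findFirstFailure` are hypothetical helpers, the sequence numbers starting at 1000 are placeholders, and only the `InputData` field and the `record.kinesis` shape are taken from the function above. It needs no AWS access and writes nothing to DynamoDB.

```javascript
// Build a fake Kinesis event. Payloads are hypothetical and base64-encoded,
// matching how the function above reads record.kinesis.data.
function fakeEvent(inputs) {
  return {
    Records: inputs.map((inputData, i) => ({
      kinesis: {
        sequenceNumber: String(1000 + i), // placeholder sequence number
        data: Buffer.from(JSON.stringify({ InputData: inputData })).toString(
          'base64'
        ),
      },
    })),
  };
}

// Same check as buildRequestItems, reduced to the failure-reporting path.
// findFirstFailure is a hypothetical helper for this dry run only.
function findFirstFailure(records, errorString = 'error') {
  for (const record of records) {
    const json = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    const item = JSON.parse(json);
    if (item.InputData.toLowerCase().includes(errorString)) {
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  return { batchItemFailures: [] };
}

// Nine records; the fifth one contains the error marker.
const event = fakeEvent([
  'a', 'b', 'c', 'd', 'This is an ERROR', 'f', 'g', 'h', 'i',
]);
const result = findFirstFailure(event.Records);
console.log(JSON.stringify(result));
```

With the fifth record carrying the error marker, the reported `itemIdentifier` is the fifth placeholder sequence number. In the CloudWatch logs of the deployed function, the matching log line is the one that starts with `Returning Failure Sequence Number =`.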