Event processing in a serverless environment
Feb 19, 2016
AWS Serverless Lambda

Some time ago we implemented a method that identifies requests to our sites that seem to have been generated by anything other than normal users - like scraping bots. This situation is unpleasant for us because these requests generate unwanted load on our infrastructure and on our partners' systems, without resulting in any income at all.

We call this method of detection Banhammer. Without going into any details, Banhammer examines requests and takes appropriate actions depending on the results. As part of what it does, Banhammer writes events about its findings to our Unified Log (an implementation of the unified log vision inspired by this article - perhaps another blog post will tell that story in more detail). These events, along with others, are processed in batches to produce useful information for us.

At some point we started entertaining the thought of sharing Banhammer data with other partners, who could also contribute their own information. This would create a much more powerful data set by accumulating more data points, multiplying its effectiveness through the network effect. For extra fun points, we decided to try this out as a practical experiment using AWS Lambda.

Today, Banhammer writes events to AWS Kinesis. Events written there are massaged by our unified log infrastructure and made available to consumers that feed business intelligence and reporting tools, create custom views by joining disparate pieces of information, or simply run ad-hoc queries. All well and good - however, while our unified log is fast, it does not provide access to information in real time, and latency grows the farther one moves away from Kinesis. What we wanted was to capture and process this information as quickly as possible, without instituting an ETL-like cycle.

Figure 1: Our unified log.
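
For reference, writing a Banhammer event to the stream might look something like the sketch below. The stream name and the double-encoded "Data" envelope here are assumptions, inferred from how the consumer function later in this post decodes records.

// Hypothetical producer sketch; stream name and envelope are assumed.
var AWS = require('aws-sdk');
var kinesis = new AWS.Kinesis();

var innerEvent = {
    "banhammer_classification": "PriceScraper",
    "client_ip": "203.0.113.7",                    // illustrative address
    "created_at": new Date().toISOString()
};

kinesis.putRecord({
    "StreamName": "banhammer-events",              // assumed stream name
    "PartitionKey": innerEvent.client_ip,
    "Data": JSON.stringify({"Data": JSON.stringify(innerEvent)})
}, function (err, data) {
    if (err) console.log(err);
});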

The first step was to grab the events by creating a Lambda function that uses the Banhammer Kinesis stream as an event source. That's easy enough, as Lambda can be triggered by Kinesis as well as S3, IoT, DynamoDB, SNS and CloudWatch. In essence, Lambda implements the KCL functionality for us, so we don't have to do anything except receive and process the event. The Lambda function that does that follows.

const TableName = "bots";

var AWS = require('aws-sdk');
var table = new AWS.DynamoDB({params: {TableName: TableName}});

/* Insert or update a bot entry in DynamoDB. Calls done(err) when finished. */
var Updater = function (clientIp, createdAt, registeredBy, updatedBy, classification, dynamoItem, done) {
    var item;

    if (dynamoItem != null) {
        // Update an existing item: bump the counter, keep the original
        // created_at and registered_by, and refresh everything else.
        var encounteredTimes = parseInt(dynamoItem.encountered_times.N, 10) + 1;
        item = {
            "ip_address": {"S": clientIp},
            "created_at": {"S": dynamoItem.created_at.S},
            "updated_at": {"S": createdAt},
            "registered_by": {"S": dynamoItem.registered_by.S},
            "updated_by": {"S": updatedBy},
            "encountered_times": {"N": String(encounteredTimes)},
            "classification": {"S": classification}
        };
    }
    else {
        // Add a new item with the counter starting at 1.
        item = {
            "ip_address": {"S": clientIp},
            "created_at": {"S": createdAt},
            "updated_at": {"S": createdAt},
            "registered_by": {"S": registeredBy},
            "updated_by": {"S": updatedBy},
            "encountered_times": {"N": "1"},
            "classification": {"S": classification}
        };
    }

    table.putItem({"TableName": TableName, "Item": item}, function(err, data) {
        done(err);
    });
};

exports.handler = function(event, context) {

    // A batch may contain several records; end the invocation only after
    // every record has been handled, otherwise an early context.done()
    // would cut off in-flight DynamoDB calls for the remaining records.
    var pending = event.Records.length;

    var recordDone = function (err) {
        if (err) {
            console.log(err);
            context.fail(err);
            return;
        }
        pending -= 1;
        if (pending === 0) {
            context.done();
        }
    };

    event.Records.forEach(function(record) {
        // Kinesis delivers record payloads base64-encoded.
        var payload = new Buffer(record.kinesis.data, 'base64').toString('utf8');
        var parsed = JSON.parse(payload);

        // Our unified log wraps the actual event in a "Data" envelope,
        // itself a JSON string.
        var parsedInner = JSON.parse(parsed["Data"]);
        var classification = parsedInner["banhammer_classification"];

        if (classification === "PriceScraper" || classification === "SearchEngine") {
            var clientIp = parsedInner["client_ip"];
            var createdAt = parsedInner["created_at"];
            var registeredBy = "e-Travel";
            var updatedBy = "e-Travel";

            var params = {
                "ConsistentRead": true,
                "TableName": TableName,
                "Key": {
                    "ip_address": {
                        "S": clientIp
                    }
                }
            };

            // Look the IP up first; Updater decides between insert and update.
            table.getItem(params, function(err, data) {
                if (err) {
                    recordDone(err);
                }
                else {
                    Updater(clientIp, createdAt, registeredBy, updatedBy, classification, data.Item, recordDone);
                }
            });
        }
        else {
            // Not a classification we track; mark the record as handled.
            recordDone(null);
        }
    });
};

The function selects only events identified as originating from scraping bots and search engines and disregards the rest. Once such an event is detected, the function looks up the client's IP address in a DynamoDB table. If no entry is found, it adds a new one and sets the encountered_times counter to 1. Otherwise, it increments the counter, refreshes the updated_at timestamp and writes the entry back. This takes care of processing the Banhammer events and digesting them into useful information per IP address. After a while, the result in DynamoDB looks like this.

Figure 2: Aggregated bot info.
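
As an aside, the get-then-put sequence above can race if two invocations process the same IP concurrently. DynamoDB can perform the whole upsert as a single atomic UpdateItem call instead. Here is a minimal sketch reusing the same table handle and per-event variables as the function above; the expression wording is ours, not taken from the original code.

// Sketch: atomic upsert with UpdateItem instead of getItem + putItem.
var upsertParams = {
    "TableName": TableName,
    "Key": {"ip_address": {"S": clientIp}},
    // if_not_exists keeps the original created_at / registered_by on updates;
    // ADD initializes encountered_times to 0 before the first increment.
    "UpdateExpression":
        "SET created_at = if_not_exists(created_at, :now), " +
        "updated_at = :now, " +
        "registered_by = if_not_exists(registered_by, :who), " +
        "updated_by = :who, " +
        "classification = :class " +
        "ADD encountered_times :one",
    "ExpressionAttributeValues": {
        ":now": {"S": createdAt},
        ":who": {"S": "e-Travel"},
        ":class": {"S": classification},
        ":one": {"N": "1"}
    }
};

table.updateItem(upsertParams, function(err, data) {
    recordDone(err);
});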

This looked useful, so we wanted to serve it. First, we wrote another small Lambda that accepts an ip_address parameter, searches the table for it and, if there's an entry, returns it.

const TableName = "bots";

var AWS = require('aws-sdk');
var table = new AWS.DynamoDB({params: {TableName: TableName}});

exports.handler = function(event, context) {

    var clientIp = (event.ip_address === undefined ? 'none' : event.ip_address);

    var params = {
        "ConsistentRead": true,
        "TableName": TableName,
        "Key": {
            "ip_address": {
                "S": clientIp
            }
        }
    };

    table.getItem(params, function(err, data) {
        if (err) {
            console.log(err);
            context.fail();
        }
        else {
            // Note: when the key is absent, getItem returns no Item attribute
            // at all, so check for undefined as well as null.
            if (data.Item != null) {
                context.done(null, {
                    "ip_address":clientIp,
                    "created_at":data.Item.created_at.S,
                    "updated_at":data.Item.updated_at.S,
                    "registered_by":data.Item.registered_by.S,
                    "updated_by":data.Item.updated_by.S,
                    "encountered_times":data.Item.encountered_times.N,
                    "classification": data.Item.classification.S
                });
            }
            else {
                context.done(null, {});
            }
        }
    });
};

The code is simple enough, and now comes the really fun part. We used API Gateway to expose the Lambda function through a simple HTTPS GET method. We created a new API, and all we had to do was add a query string parameter and map it to the ip_address parameter expected by the Lambda. The mapping is done by adding a new template for application/json that looks like this:

#set($inputRoot = $input.path('$'))
{
  "ip_address" : "$input.params('ip_address')"
}
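
With the template in place, a request like GET /bots?ip_address=203.0.113.7 (a made-up address) reaches the Lambda as the event:

{
  "ip_address" : "203.0.113.7"
}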

We deployed the API, and the information in DynamoDB was instantly available in our web browser. Creating the API was an exercise that took literally a few minutes from start to deployment.

So what happened here? We massaged an existing piece of data already flowing through our systems into something small and useful, and we served it to the world. All of this without managing any servers at all, and with a total implementation time from inception to delivery measured in hours.


What a mindshift! After our little experiment was over, we started seeing opportunities to use the Lambda and API Gateway combo throughout our infrastructure instead of spawning microservices on our servers. Data mashups, small interfaces provided to partners, CRUD scaffolds, small pieces of glue and integration code... why not write a Lambda for it? Of course, it's pretty dangerous to view everything as a nail when you have a shiny new hammer. And the development and operational aspects of this paradigm, which is new to us, will take some getting used to. But the possibilities seem endless.

Some aspects worth mentioning:

  • It’s pretty straightforward to use the integration and method responses of API Gateway to return a 404 response when the ip_address isn’t found - a sketch follows this list.
  • While the created API is pretty basic, API Gateway has more surprises in store to take care of other concerns as well. It lets you generate and use API keys, apply throttling settings to the API, add a custom domain, use a client certificate, maintain separate environments (staging/production) with per-environment variables that configure the API’s behavior, enable caching with a specified capacity, and more. Oh, and the whole thing can be exported to Swagger.
  • A POST method can be implemented in the same manner as the GET method: create another Lambda that takes care of insert/update and expose it through the API. In fact, it might be interesting to refactor the original event-processing Lambda to call the insert/update Lambda, either directly or through the API itself.
  • We had to wait until our Lambda had been called a few tens of thousands of times before a cost of $0.06 showed up for it on our monthly AWS bill.
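
To sketch the 404 idea from the first bullet (the error message format, the regex and the itemToResponse helper are illustrative, not part of the original code): the lookup Lambda fails with a recognizable message instead of returning an empty object, and an API Gateway integration response whose Lambda error regex matches that message (e.g. ^NotFound.*) is bound to a 404 method response.

// Inside the getItem callback of the lookup Lambda:
if (data.Item != null) {
    context.done(null, itemToResponse(data.Item)); // success path as before
}
else {
    // API Gateway matches this message against the "Lambda error regex"
    // of an integration response mapped to HTTP status 404.
    context.fail("NotFound: no entry for " + clientIp);
}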

And one concluding thought. It is almost scary how easy, quick and tempting it is to just dive into Lambda and code away whatever is required to get immediate results and satisfaction. We must pause to remind ourselves of the rest of the continuous integration and delivery pipeline: issue tracking, unit & integration tests, change tracking & management, deployment and rollback scripts, and all the practices that help turn development into a discipline instead of disorganized chaos. Some interesting links on how to approach these (and other) topics relevant to Lambda development can be found here.
