Indexing DynamoDB Items to ElasticSearch using AWS Lambda

Meric Taze
4 Jul 2018


DynamoDB is Amazon’s NoSQL database, offering single-digit millisecond latency. It is great for a variety of use cases, but when you need to run complex search queries on your dataset, you quickly realise it was not designed for that.

You can try to cover your needs with composite primary keys and local and global secondary indexes. But as the queries get more complex, you might find those are not enough. Don’t worry, you are not alone. Many people who have faced this problem chose a similar solution: using ElasticSearch for complex search operations while keeping DynamoDB as the source of truth for the data.

Here we are going to learn how ElasticSearch can be plugged into a DynamoDB table with a click of a button using CloudFormation.

The same steps can be done in the AWS Web Console, but I think CloudFormation is the better choice, since you often need to create the same resources for multiple stages and regions.

Creating DynamoDB Table

Let’s start by creating a DynamoDB table using CloudFormation. Here we create a table called ‘OrderTable’ whose key is orderId, with read and write capacity both set to 5. Most importantly, we also enable DynamoDB Streams: whenever an item is created, updated or deleted, a record describing the change is written to the stream automatically. With the NEW_IMAGE view type, each record contains the item as it looks after the change. If you also need the old image, or just the keys, use NEW_AND_OLD_IMAGES, OLD_IMAGE or KEYS_ONLY instead.

OrderTable:
  Type: 'AWS::DynamoDB::Table'
  Properties:
    AttributeDefinitions:
      - AttributeName: 'orderId'
        AttributeType: 'S'
    KeySchema:
      - AttributeName: 'orderId'
        KeyType: 'HASH'
    ProvisionedThroughput:
      ReadCapacityUnits: 5
      WriteCapacityUnits: 5
    TableName: 'OrderTable'
    StreamSpecification:
      StreamViewType: 'NEW_IMAGE'
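
To make the NEW_IMAGE choice concrete, here is a trimmed, hand-written example of what a single stream record looks like when it reaches Lambda, along with a small helper that picks out the changed item. The record fields follow the documented DynamoDB Streams record shape. The values, including the status attribute, are made up for illustration, and itemAfterChange is a hypothetical helper that is not part of any SDK.

```javascript
// A trimmed DynamoDB Streams record as delivered to Lambda (values are made up;
// `status` is a hypothetical attribute, OrderTable itself only defines orderId)
const sampleRecord = {
  eventName: 'MODIFY', // one of INSERT, MODIFY, REMOVE
  eventSource: 'aws:dynamodb',
  dynamodb: {
    Keys: { orderId: { S: 'order-1' } },
    NewImage: { orderId: { S: 'order-1' }, status: { S: 'SHIPPED' } },
    StreamViewType: 'NEW_IMAGE'
  }
};

// A REMOVE record still carries the keys, but with NEW_IMAGE there is no "after" image
const removeRecord = {
  eventName: 'REMOVE',
  eventSource: 'aws:dynamodb',
  dynamodb: {
    Keys: { orderId: { S: 'order-1' } },
    StreamViewType: 'NEW_IMAGE'
  }
};

// Hypothetical helper: returns the item as it looks after the change, or null for deletions
function itemAfterChange(record) {
  if (record.eventName === 'REMOVE') return null;
  return record.dynamodb.NewImage;
}

console.log(itemAfterChange(sampleRecord));
console.log(itemAfterChange(removeRecord));
```

Note that the image is still in DynamoDB's typed format (`{ S: 'SHIPPED' }`), not a plain JSON object.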

If you already have a table and don’t want to create a new one, copy its stream ARN from the AWS Web Console (streams must be enabled on the table). Then use it wherever the following sections reference OrderTable.StreamArn.
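
The table ARN and the stream ARN are easy to mix up when you copy them by hand, and the event source mapping needs the stream ARN. Below is a minimal sketch of a check, assuming the usual stream ARN layout `arn:aws:dynamodb:<region>:<account>:table/<TableName>/stream/<label>`. The function name parseStreamArn is hypothetical.

```javascript
// Hypothetical helper: returns { tableName, streamLabel } if `arn` looks like a
// DynamoDB Streams ARN, or null otherwise (e.g. for a plain table ARN).
// The partition may be aws, aws-cn or aws-us-gov, hence `aws[\w-]*`.
function parseStreamArn(arn) {
  const match = /^arn:aws[\w-]*:dynamodb:[\w-]+:\d{12}:table\/([^\/]+)\/stream\/(.+)$/.exec(arn);
  return match ? { tableName: match[1], streamLabel: match[2] } : null;
}

// The account id and stream label below are placeholders
console.log(parseStreamArn('arn:aws:dynamodb:eu-west-1:123456789012:table/OrderTable/stream/2018-07-04T10:00:00.000'));
console.log(parseStreamArn('arn:aws:dynamodb:eu-west-1:123456789012:table/OrderTable'));
```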

Adding a Lambda function

For now, the function just logs the records it receives from the DynamoDB stream. It runs under an IAM role, LambdaRole, which grants access to the AWS resources it needs; we’ll cover it later. The ZipFile property lets us write the code inline in the CloudFormation template.

EsIndexerFunction:
  Type: 'AWS::Lambda::Function'
  Properties:
    Handler: 'index.handler'
    Runtime: nodejs6.10
    Role: !GetAtt LambdaRole.Arn
    Code:
      ZipFile: |
        exports.handler = (event, context, callback) => {
          event.Records.forEach(record => {
            console.log(record);
          });
        }
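
Before deploying, you can exercise a handler like this locally in Node by calling it with a hand-made event. The sketch below assumes a simplified event containing only the fields shown; real stream events carry more. The handler copy calls callback once all records are logged, so the local caller can tell when it has finished.

```javascript
// Local copy of the logging handler; it reports how many records it logged via `callback`
const handler = (event, context, callback) => {
  event.Records.forEach(record => {
    console.log(record);
  });
  callback(null, 'Processed ' + event.Records.length + ' records');
};

// Hand-made event with two records (a simplified stand-in for a real stream event)
const fakeEvent = {
  Records: [
    { eventName: 'INSERT', dynamodb: { Keys: { orderId: { S: 'order-1' } } } },
    { eventName: 'INSERT', dynamodb: { Keys: { orderId: { S: 'order-2' } } } }
  ]
};

let result;
handler(fakeEvent, {}, (err, res) => {
  if (err) throw err;
  result = res;
  console.log(result);
});
```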

Mapping between the stream and lambda function

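This connects the DynamoDB stream to our Lambda function: whenever new records arrive in the stream, Lambda invokes the function with them. BatchSize sets the maximum number of records passed to a single invocation. StartingPosition: 'LATEST' means only records written after the mapping is created are read (TRIM_HORIZON would instead start from the oldest record still in the stream).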

TableStreamLambdaMapping:
  Type: 'AWS::Lambda::EventSourceMapping'
  Properties:
    BatchSize: 2
    EventSourceArn: !GetAtt OrderTable.StreamArn
    FunctionName: !GetAtt EsIndexerFunction.Arn
    StartingPosition: 'LATEST'
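
To picture what BatchSize: 2 means, the sketch below groups a sequence of stream records into invocation payloads of at most two records each. It is a simplified model: real batching also depends on shards and on how many records are available at the time. The toBatches function is hypothetical.

```javascript
// Simplified model of an event source mapping: each invocation gets at most
// `batchSize` records, in stream order (real Lambda batching is more involved)
function toBatches(records, batchSize) {
  const batches = [];
  for (let i = 0; i < records.length; i += batchSize) {
    batches.push(records.slice(i, i + batchSize));
  }
  return batches;
}

// Five records with BatchSize 2 -> three invocations: [r1, r2], [r3, r4], [r5]
const records = ['r1', 'r2', 'r3', 'r4', 'r5'];
const invocations = toBatches(records, 2).map(batch => ({ Records: batch }));
console.log(invocations.length);
```

A small batch size like 2 keeps each invocation short, but it means more invocations for the same amount of traffic.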

Lambda IAM role

This role lets our Lambda function write logs, so that we can read them in CloudWatch, and read from the DynamoDB stream.

LambdaRole:
  Type: 'AWS::IAM::Role'
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: 'Allow'
          Principal:
            Service: 'lambda.amazonaws.com'
          Action: 'sts:AssumeRole'
    Path: '/'
    Policies:
      - PolicyName: 'LambdaRolePolicy'
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: 'Allow'
              Action:
                - logs:CreateLogGroup
                - logs:CreateLogStream
                - logs:PutLogEvents
              Resource: 'arn:aws:logs:*:*:*'
            - Effect: 'Allow'
              Action:
                - dynamodb:DescribeStream
                - dynamodb:GetRecords
                - dynamodb:GetShardIterator
                - dynamodb:ListStreams
              Resource: !GetAtt OrderTable.StreamArn
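
Lambda needs four stream actions to poll the stream: DescribeStream, GetRecords, GetShardIterator and ListStreams. If you trim the policy later, a quick sanity check can catch a missing action. The sketch below is a deliberately simplified matcher, not a real IAM policy evaluator. It only looks at Allow statements and their Action lists (with `*` wildcards, case-insensitive), and it ignores Deny, Resource and Condition. The function names are hypothetical.

```javascript
// Stream actions the event source mapping needs on the role
const requiredStreamActions = [
  'dynamodb:DescribeStream',
  'dynamodb:GetRecords',
  'dynamodb:GetShardIterator',
  'dynamodb:ListStreams'
];

// LambdaRolePolicy from the template as a JS object; the stream ARN is a
// placeholder because !GetAtt is only resolved by CloudFormation
const lambdaRolePolicy = {
  Version: '2012-10-17',
  Statement: [
    { Effect: 'Allow', Action: ['logs:CreateLogGroup', 'logs:CreateLogStream', 'logs:PutLogEvents'], Resource: 'arn:aws:logs:*:*:*' },
    { Effect: 'Allow', Action: ['dynamodb:DescribeStream', 'dynamodb:GetRecords', 'dynamodb:GetShardIterator', 'dynamodb:ListStreams'], Resource: 'STREAM_ARN_PLACEHOLDER' }
  ]
};

function escapeRegExp(text) {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// IAM action names are case-insensitive and may use '*' as a wildcard
function actionMatches(pattern, action) {
  const regex = new RegExp('^' + pattern.split('*').map(escapeRegExp).join('.*') + '$', 'i');
  return regex.test(action);
}

// Simplified: returns the required actions not granted by any Allow statement
function missingActions(policy, required) {
  const allowed = policy.Statement
    .filter(statement => statement.Effect === 'Allow')
    .reduce((all, statement) => all.concat(statement.Action), []);
  return required.filter(action => !allowed.some(pattern => actionMatches(pattern, action)));
}

console.log(missingActions(lambdaRolePolicy, requiredStreamActions));
```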

Adding ElasticSearch

This creates a single t2.micro ElasticSearch instance with a 10 GB EBS volume, which is included in the AWS Free Tier. Its access policy allows LambdaRole to call the domain’s APIs.

ElasticsearchDomain:
  Type: 'AWS::Elasticsearch::Domain'
  Properties:
    DomainName: 'es-order'
    ElasticsearchClusterConfig:
      InstanceType: 't2.micro.elasticsearch'
      InstanceCount: 1
    EBSOptions:
      EBSEnabled: true
      Iops: 0
      VolumeSize: 10
      VolumeType: 'standard'
    AccessPolicies:
      Version: '2012-10-17'
      Statement:
        - Effect: 'Allow'
          Principal:
            AWS: !GetAtt LambdaRole.Arn
          Action: 'es:*'
          Resource: '*'
    AdvancedOptions:
      rest.action.multi.allow_explicit_index: 'true'
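
The rest.action.multi.allow_explicit_index option controls whether multi-document requests, such as the _bulk API, may name the target index inside the request body. If you later send one bulk request per batch instead of one request per record, the body is newline-delimited JSON: an action line followed by the document, with a trailing newline. The sketch below builds such a body. It assumes orderId as the document id, and it uses the index and type names from the indexing function ('test' and 'order'). The buildBulkBody function is hypothetical, and the body would still need to be signed and POSTed to /_bulk.

```javascript
// Builds an NDJSON body for the _bulk API: for each document, an action line that
// names the index, type and id explicitly, then the document itself on its own line
function buildBulkBody(docs, index, type) {
  return docs
    .map(doc =>
      JSON.stringify({ index: { _index: index, _type: type, _id: doc.orderId } }) + '\n' +
      JSON.stringify(doc) + '\n')
    .join('');
}

// `status` is a hypothetical attribute used only for illustration
const body = buildBulkBody(
  [{ orderId: 'order-1', status: 'NEW' }, { orderId: 'order-2', status: 'SHIPPED' }],
  'test',
  'order'
);
process.stdout.write(body);
```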

Updating Lambda to index documents

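Now let’s update our previous Lambda function to index the DynamoDB stream records into ElasticSearch. The domain endpoint and region are passed to the function as environment variables. Requests to the domain must be signed with AWS Signature Version 4 using the function’s credentials; otherwise you’ll get an authorization error.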
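
AWS.Signers.V4 takes care of signing in the function below, but it can help to see what Signature Version 4 does underneath. It derives a signing key through a chain of HMAC-SHA256 operations over the date, region and service name ('es' for ElasticSearch). It then uses that key to sign a "string to sign" built from the canonical request. The sketch below uses only Node's crypto module and covers just the key derivation and the final HMAC. Building the canonical request and the string to sign is left out, and the secret and string to sign are placeholders.

```javascript
const crypto = require('crypto');

function hmac(key, data) {
  return crypto.createHmac('sha256', key).update(data, 'utf8').digest();
}

// SigV4 signing key: HMAC chain over date (YYYYMMDD), region, service and 'aws4_request'
function getSigningKey(secretAccessKey, dateStamp, region, service) {
  const kDate = hmac('AWS4' + secretAccessKey, dateStamp);
  const kRegion = hmac(kDate, region);
  const kService = hmac(kRegion, service);
  return hmac(kService, 'aws4_request');
}

// The signature is HMAC-SHA256(signingKey, stringToSign), hex-encoded
function sign(signingKey, stringToSign) {
  return crypto.createHmac('sha256', signingKey).update(stringToSign, 'utf8').digest('hex');
}

// Placeholder values; never hard-code real credentials
const key = getSigningKey('EXAMPLE-SECRET', '20180704', 'eu-west-1', 'es');
const signature = sign(key, 'example string to sign');
console.log(signature);
```

Because the region and service are baked into the key, a request signed for the wrong region or service is rejected. This is one reason the function passes ES_REGION explicitly.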

If you deploy the function code with S3Bucket and S3Key instead of ZipFile, you can bundle npm dependencies. In that case, consider the http-aws-es library, which makes things much easier: it plugs into the elasticsearch-js client and handles the request signing for you.

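EsIndexerFunction:
  Type: 'AWS::Lambda::Function'
  Properties:
    Handler: 'index.handler'
    Runtime: nodejs6.10
    Role: !GetAtt LambdaRole.Arn
    Environment:
      Variables:
        ES_ENDPOINT: !GetAtt ElasticsearchDomain.DomainEndpoint
        ES_REGION: !Ref AWS::Region
    Code:
      ZipFile: |
        var AWS = require('aws-sdk');
        var path = require('path');
        var creds = new AWS.EnvironmentCredentials('AWS');
        var esDomain = {
          endpoint: process.env.ES_ENDPOINT,
          region: process.env.ES_REGION,
          index: 'test',
          doctype: 'order'
        };
        var endpoint = new AWS.Endpoint(esDomain.endpoint);

        exports.handler = (event, context, callback) => {
          // Index every record in the batch; REMOVE events carry no NewImage, so skip them
          var requests = event.Records
            .filter(record => record.dynamodb.NewImage)
            .map(record => postDocumentToES(record.dynamodb.NewImage));
          // Finish only after all requests are done, so no record in the batch is cut off
          Promise.all(requests)
            .then(() => callback(null, 'Indexed ' + requests.length + ' records'))
            .catch(err => callback(err));
        };

        function postDocumentToES(newImage) {
          // Convert DynamoDB JSON ({"orderId": {"S": "..."}}) into a plain object
          var doc = AWS.DynamoDB.Converter.unmarshall(newImage);
          var req = new AWS.HttpRequest(endpoint);
          // PUT with orderId as the document id, so an updated item overwrites
          // its earlier version instead of creating a duplicate document
          req.method = 'PUT';
          req.path = path.join('/', esDomain.index, esDomain.doctype, encodeURIComponent(doc.orderId));
          req.region = esDomain.region;
          req.body = JSON.stringify(doc);
          req.headers['presigned-expires'] = false;
          req.headers['Host'] = endpoint.host;
          req.headers['Content-Type'] = 'application/json';
          // Sign the request (SigV4)
          var signer = new AWS.Signers.V4(req, 'es');
          signer.addAuthorization(creds, new Date());
          // Send the document to ES and settle once the response has been read
          return new Promise((resolve, reject) => {
            var send = new AWS.NodeHttpClient();
            send.handleRequest(req, null, httpResp => {
              var body = '';
              httpResp.on('data', chunk => body += chunk);
              httpResp.on('end', () => {
                if (httpResp.statusCode >= 300) {
                  reject(new Error('ES returned ' + httpResp.statusCode + ': ' + body));
                } else {
                  resolve(body);
                }
              });
            }, err => reject(err));
          });
        }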
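
Stream records carry items in DynamoDB's typed JSON format, for example `{ "orderId": { "S": "order-1" } }`. Indexing that format as-is would put the type wrappers into ElasticSearch as well. The aws-sdk provides AWS.DynamoDB.Converter.unmarshall for turning it into a plain object. The dependency-free sketch below shows roughly what that conversion does. It is a simplification: sets come back as plain arrays, binary types are not handled, and numbers are converted with Number(), which can lose precision for very large values. The function names and the total, paid and items attributes are hypothetical.

```javascript
// Simplified conversion of one DynamoDB attribute value to a plain JS value
function unmarshallValue(attr) {
  if ('S' in attr) return attr.S;
  if ('N' in attr) return Number(attr.N); // numbers arrive as strings
  if ('BOOL' in attr) return attr.BOOL;
  if ('NULL' in attr) return null;
  if ('M' in attr) return unmarshallImage(attr.M);
  if ('L' in attr) return attr.L.map(unmarshallValue);
  if ('SS' in attr) return attr.SS.slice();
  if ('NS' in attr) return attr.NS.map(Number);
  throw new Error('Unsupported attribute type: ' + Object.keys(attr).join(','));
}

// Converts a whole image (attribute name -> typed value) into a plain object
function unmarshallImage(image) {
  const result = {};
  Object.keys(image).forEach(name => {
    result[name] = unmarshallValue(image[name]);
  });
  return result;
}

const newImage = {
  orderId: { S: 'order-1' },
  total: { N: '42.5' },
  paid: { BOOL: true },
  items: { L: [{ M: { sku: { S: 'abc' }, qty: { N: '2' } } }] }
};
const order = unmarshallImage(newImage);
console.log(order);
```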

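So at this point we have everything we need: every item created or updated in OrderTable is indexed into ElasticSearch behind the scenes. Since the mapping starts at the LATEST stream position, only changes made after it is created are picked up; items that were already in the table are not indexed retroactively.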
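
With orders in the index, the complex searches that motivated this setup can be written in the ElasticSearch query DSL. The sketch below builds a search body under some stated assumptions. The customerId and total fields are hypothetical attributes, since OrderTable only defines orderId. The bool query's filter clause assumes ElasticSearch 2.x or later; the domain above does not set ElasticsearchVersion, so check which version yours runs. The body would be sent, signed like the indexing requests, to /test/_search.

```javascript
// Hypothetical search builder: filter orders by customer and a minimum total,
// highest totals first. Field names are illustrative assumptions.
function buildOrderSearch({ customerId, minTotal, size = 20 }) {
  const filters = [];
  if (customerId !== undefined) filters.push({ match: { customerId: customerId } });
  if (minTotal !== undefined) filters.push({ range: { total: { gte: minTotal } } });
  return {
    size: size,
    query: { bool: { filter: filters } },
    sort: [{ total: { order: 'desc' } }]
  };
}

const searchBody = buildOrderSearch({ customerId: 'customer-42', minTotal: 100 });
console.log(JSON.stringify(searchBody, null, 2));
```

Using filter rather than must skips relevance scoring, which is usually what you want for exact criteria like these.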

You can also see the full CloudFormation template here

Feel free to comment below if you have any questions or feedback.