Luis Silva

Luis Silva

DevOps, Cloud Expert

In a lot of real life scenarios, services need to store information in databases tables. The tables contain critical information, where changes need to be tracked for security, accountability and regulatory requirements. Typically, this sort of action is not time critical, so adding an audit trail with the same availability and access times as the rest of your application is not advised, especially if you don't want your costs to increase unexpectedly.

In this blog post I will detail an example of how to extract changes made to an Amazon DynamoDB table and store the audit trail of those changes in an Amazon S3 (Amazon Simple Storage Service) bucket, making use of AWS Kinesis Data Stream and AWS Kinesis Firehose.

Architecture

afe2af11-e264-48ab-825c-4b24ac957eb1.png

Change data streaming

On the initial step of our design, an AWS Kinesis Data Stream is attached to the Amazon DynamoDB table. What this means is, every time a new change happens in the table, that change is captured and kept in a data stream.

The format of the data sent includes: a timestamp of the captured change, the primary key (will be the partitionKey + sortKey), and a before and after image of the changed item. With the before and after, it's possible to understand what particular values have changed.

After this, a consumer, in this case, AWS Kinesis Firehose, must retrieve data from this stream. Its only purpose on the architecture is to retrieve data from the AWS Kinesis Data Stream and into an Amazon S3 bucket.

Audit trail storage

When data reaches the Amazon S3 bucket, we could leave it as is and be done with this example, but in order to reduce costs of such a solution, we're going to use Amazon S3 lifecycle rules.

Lifecycle rules allow us to move older data to cheaper storage classes progressively and in an automated fashion. The reasoning behind this is, most recent auditing data is the most likely to be accessed, and older data should still be accessible, but we can live with it taking longer to get. This should be tweaked according to your use case.

Building the infrastructure

Using AWS CDK to deploy infrastructure

This post assumes you have AWS CDK Toolkit. You should have an AWS account created.

Amazon offers an Infrastructure-As-Code solution, called AWS Cloud Development Kit, or CDK for short. It uses AWS Cloudformation internally to define infrastructure in a reviewable, deployable manner. CDK is offered in multiple languages, we'll use Typescript in this example.

AWS CDK setup

Assuming you already have CDK Toolkit installed, create a folder for the CDK code and initialise the project:

mkdir audit-trail-cdk
cd audit-trail-cdk && cdk init app --language typescript  

cdk init app builds the initial scaffolding for the project. It creates a Stack, which will contain the pieces of infrastructure required. To finish up the setup, go to the generated code for the stack under bin/audit-trail-cdk.ts and uncomment the env: line and replace with your AWS account info:

// env: { account: '123456789012', region: 'us-east-1' }, 

Let's add the necessary pieces for building our infrastructure stack on lib/audit-trail-cdk-stack.ts:

import * as cdk from "aws-cdk-lib";
import { Duration } from "aws-cdk-lib";
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as iam from "aws-cdk-lib/aws-iam";
import * as kinesis from "aws-cdk-lib/aws-kinesis";
import * as kinesisfirehose from "aws-cdk-lib/aws-kinesisfirehose";
import * as s3 from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";

export class SimpleAwsAuditTrailStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const auditTrailStream = new kinesis.Stream(this, "Stream");

    new dynamodb.Table(this, "TableToAudit", {
      tableName: `TableToAudit`,
      partitionKey: {
        name: "partitionKey",
        type: dynamodb.AttributeType.STRING,
      },
      sortKey: {
        name: "sortKey",
        type: dynamodb.AttributeType.STRING,
      },
      kinesisStream: auditTrailStream,
    });

    const auditTrailBucket = new s3.Bucket(this, "AuditTrailDestination", {
      lifecycleRules: [
        {
          transitions: [
            {
              storageClass: s3.StorageClass.INFREQUENT_ACCESS,
              transitionAfter: Duration.days(30),
            },
            {
              storageClass: s3.StorageClass.INTELLIGENT_TIERING,
              transitionAfter: Duration.days(60),
            },
            {
              storageClass: s3.StorageClass.GLACIER,
              transitionAfter: Duration.days(90),
            },
            {
              storageClass: s3.StorageClass.DEEP_ARCHIVE,
              transitionAfter: Duration.days(180),
            },
          ],
        },
      ],
    });

    const deliveryRole = new iam.Role(this, "deliveryRole", {
      assumedBy: new iam.ServicePrincipal("firehose.amazonaws.com"),
      inlinePolicies: {
        s3: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              resources: [
                `arn:aws:s3:::${auditTrailBucket.bucketName}`,
                `arn:aws:s3:::${auditTrailBucket.bucketName}/*`,
              ],
              actions: [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
              ],
            }),
          ],
        }),
        kinesis: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              resources: [auditTrailStream.streamArn],
              actions: [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
              ],
            }),
          ],
        }),
        logs: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              resources: ["*"],
              actions: ["logs:PutLogEvents"],
            }),
          ],
        }),
      },
    });

    new kinesisfirehose.CfnDeliveryStream(this, "DeliveryStream", {
      deliveryStreamType: "KinesisStreamAsSource",
      kinesisStreamSourceConfiguration: {
        roleArn: deliveryRole.roleArn,
        kinesisStreamArn: auditTrailStream.streamArn,
      },
      s3DestinationConfiguration: {
        bucketArn: auditTrailBucket.bucketArn,
        roleArn: deliveryRole.roleArn,
      },
    });
  }
}

Understanding the infrastructure code

The code tries to emulate the architecture diagram as much as possible, but let's go through it in more detail:

  1. Create an AWS Kinesis Data Stream.
  2. Create an Amazon DynamoDB table and add the AWS Kinesis Data Stream created in the previous step.
  3. Create the Amazon S3 bucket, and define on it the lifecycle rules.
  4. Create the AWS IAM role to be used by AWS Kinesis Firehose. The role needs to have permissions to retrieve data from the AWS Kinesis Data Stream, and to store data in the AWS S3 bucket.
  5. Create the AWS Kinesis Firehose resource, with reference to the data stream and the S3 bucket.

Testing the solution

Let's create a record in the Amazon DynamoDB and make a change, to see what the data in the AWS S3 bucket looks like.

aws dynamodb put-item  \
--table-name TableToAudit \
--item '{"partitionKey": {"S": "1"}, "sortKey": {"S": "State"}, "Data": {"S": "Initial state"}}'

aws dynamodb put-item  \
--table-name TableToAudit \
--item '{"partitionKey": {"S": "1"}, "sortKey": {"S": "State"}, "Data": {"S": "Changed state"}}'

At this point, the Amazon DynamoDB table should look like this:

0481cd29-9829-4173-9dcf-744e568839ba.png

Let's see what the data looks like on the Amazon S3 bucket, inside a file.

fa79e3ec-9c1b-4008-b1a4-d7ce17bba922.png

{
  "awsRegion": "us-east-1",
  "eventID": "65eed7c0-4c74-4e71-b443-3b68b28f9455",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "TableToAudit",
  "dynamodb": {
    "ApproximateCreationDateTime": 1662992934392,
    "Keys": { "sortKey": { "S": "State" }, "partitionKey": { "S": "1" } },
    "NewImage": {
      "sortKey": { "S": "State" },
      "Data": { "S": "Initial state" },
      "partitionKey": { "S": "1" }
    },
    "SizeBytes": 67
  },
  "eventSource": "aws:dynamodb"
}
{
  "awsRegion": "us-east-1",
  "eventID": "55c911a3-d0a6-4d23-99ab-c1fd3e4d01b5",
  "eventName": "MODIFY",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "TableToAudit",
  "dynamodb": {
    "ApproximateCreationDateTime": 1662992947714,
    "Keys": { "sortKey": { "S": "State" }, "partitionKey": { "S": "1" } },
    "NewImage": {
      "sortKey": { "S": "State" },
      "Data": { "S": "Changed state" },
      "partitionKey": { "S": "1" }
    },
    "OldImage": {
      "sortKey": { "S": "State" },
      "Data": { "S": "Initial state" },
      "partitionKey": { "S": "1" }
    },
    "SizeBytes": 109
  },
  "eventSource": "aws:dynamodb"
}

As you can see, there's data about each change in a JSON Lines format, first the "INSERT", then the "MODIFY", where the modification has the "OldImage" and "NewImage". Data on changes is partitioned by default inside the bucket, with the smallest partition being on the hour of the day.

How to query the data

In the past, one of the negative sides of storing this kind of data in Amazon S3 would be losing some flexibility: typically, you would have to get (download) the files that were relevant, and then do some data analytics on it. That is no longer the case. There's a nice feature in Amazon S3 where you can query the data in files directly, using a SQL-like language. For example, to get all audit records with partitionKey equal to "1":

aws s3api select-object-content \
--bucket BUCKET_NAME \
--key BUCKET_KEY \
--expression "select * from s3object as s where s.dynamodb.Keys.partitionKey.S = '1'" \
--expression-type 'SQL' \
--input-serialization '{"JSON": {"Type": "LINES"}}' \
--output-serialization '{"JSON": {}}' "output.json"

Conclusion

In this blog post, a simple architecture to store audit trail from an Amazon DynamoDB table was presented. The data pipeline to export this data through AWS Kinesis was detailed, and information on how to query the final data was provided. Future improvements on this pattern include data preprocessing and scaling.

All code referenced in this blog post is available in this repository.

Frequently asked questions

The default format output to S3 has all the data required, but if you want to access that data and show it costumers, it's likely you will need to clean the data before storing it.

AWS Kinesis firehose allows you to add an AWS Lambda function to preprocess the data you receive. In this case, you can technically check the before and after images, make a diff, and store the information in a cleaner, more readable manner.

For this example, both would work, with DynamoDB Stream requiring more development effort to send data to Amazon S3. The differences start to appear with scale, as AWS Kinesis Data Stream overall scales better for tables with high throughput of changes. For more information about the differences, consult the AWS documentation.

Vetted experts, custom approach, dedication to meet deadlines

As your reliable partner, our team will use the right technology for your case, and turn your concept into a sustainable product.

Contact us
upwork iconclutch icon

Further reading