AWS Serverless Data Lake for Bid Requests

This experiment simulates data ingestion of bid requests to a serverless data lake and data analytics pipeline deployed on AWS. As a result, you get a real-time dashboard and a BI tool to analyze your stream of bid requests.

Overview of the real-time CloudWatch dashboard

Services in use: AWS Fargate, Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, AWS Lambda, Amazon S3, Amazon CloudWatch, and Amazon QuickSight.

The data used for this experiment come from the Kaggle Display Advertising Challenge Dataset published in 2014 by Criteo. If you are curious or want to push the Criteo dataset further, you can refer to their 2015 announcement and the related download.

Wherever possible, this experiment leverages the AWS CDK to deploy the required infrastructure.

Table of contents

  • Architecture overview
  • Prerequisites
  • Deployment of the experiment
  • Exploring the demo
  • Cost
  • Develop
  • Clean up
  • Inspiring sources of information

Architecture overview

  1. Producer: AWS Fargate simulates bid request pushes to Amazon Kinesis Data Firehose from the TSV "mock" file
  2. Ingestion: Amazon Kinesis Data Firehose ingests the data into Amazon S3
  3. Enhancement: Amazon Kinesis Data Analytics
    • enhances the data with a catalog stored in Amazon S3
    • computes counters from the ingestion stream of records
    • triggers an AWS Lambda function to store real-time counts in Amazon CloudWatch
  4. Visualization:
    • Amazon CloudWatch allows viewing of custom near real-time metrics
    • Amazon QuickSight allows reporting on raw data stored in Amazon S3

Architecture
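
This experiment describes the whole pipeline as code with the AWS CDK. Purely as an illustration of how the four layers above can be composed, here is a minimal TypeScript sketch of the stack; the construct and property names are assumptions (only ./cdk/lib/visualization-construct.ts is referenced later in this document), so refer to the ./cdk/lib folder for the actual code.

import * as cdk from '@aws-cdk/core';
// Hypothetical construct modules; the actual file names under ./cdk/lib may differ.
import { ProducerConstruct } from './producer-construct';
import { IngestionConstruct } from './ingestion-construct';
import { EnhancementConstruct } from './enhancement-construct';
import { VisualizationConstruct } from './visualization-construct';

export class DataExperimentStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // 1. Producer: Fargate task that replays the mock TSV file
    new ProducerConstruct(this, 'ProducerLayer');

    // 2. Ingestion: Kinesis Data Firehose delivery stream writing to the S3 data lake
    const ingestion = new IngestionConstruct(this, 'IngestionLayer');

    // 3. Enhancement: Kinesis Data Analytics SQL application with a Lambda destination
    const enhancement = new EnhancementConstruct(this, 'EnhancementLayer', {
      deliveryStream: ingestion.deliveryStream,
    });

    // 4. Visualization: CloudWatch dashboard (QuickSight is configured manually)
    new VisualizationConstruct(this, 'VisualizationLayer', {
      metricNamespace: enhancement.metricNamespace,
    });
  }
}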

Prerequisites

For this experiment, you will need the following:

  • The AWS CLI
  • An AWS account. If you don't have an AWS account, you can create a free account here.
  • Node.js (>= 8.10). To install Node.js visit the node.js website. You can also install the node version manager: nvm
  • The AWS CDK toolkit: $> npm install -g aws-cdk

If this is the first time you deploy a CDK application in an AWS environment, you need to bootstrap it: cdk bootstrap. Please take a look at the bootstrap section of the CDK workshop.

Deployment of the experiment

To deploy this experiment in your account, you have four actions to take:

  1. Download necessary data
  2. Build the CDK application
  3. Deploy the stack and upload the data
  4. Deploy Amazon QuickSight

Downloading necessary data

Download the data and extract the zip file to the data directory.

Important: we don't want to upload the whole dataset. Therefore we take only a small portion of it with the following command, run in the data directory:

$> head -5000000 train.txt > bidrequests.txt

Data fields explanation:

  • C0 (Integer) - Indicates if an ad was clicked (1) or not (0).
  • C1-C13 - 13 columns of integer features mostly representing count features.
  • C14-C39 - 26 columns of categorical features. The values of these features have been hashed onto 32 bits.

Building the CDK application

At the root of the repository:

$> npm install

This will install all the AWS CDK project dependencies.

$> npm run build

This command builds the CDK application, compiling the TypeScript code into JavaScript.

Deploying the stack and uploading the data

To deploy the CDK application:

$> cdk deploy

This command will generate an AWS CloudFormation stack that will be pushed to your configured account. It will create around 60 resources (roles, streams, Lambda functions, a container registry, etc.) and will also upload the prepared data to the AWS Cloud.

Current limitations:

  • If you change parameters or code of the CloudWatch dashboard, you must delete it in the console before deploying the update with cdk.
  • The CloudWatch dashboard is configured for specific input and output IDs of the Kinesis Data Analytics application. If the deployed dashboard doesn't work, please check the current inputId and outputId of the Kinesis Data Analytics application (with: aws kinesisanalytics describe-application --application-name EnhancementSQLApplication), update ./cdk/lib/visualization-construct.ts accordingly, and redeploy the CDK application.

Deploying Amazon QuickSight

In order to deploy the Amazon QuickSight dashboard, you must do the following:

  1. Preparing a manifest.json file
  2. Signing-up
  3. Creating a dataset

A prerequisite to the deployment of Amazon QuickSight using an S3 bucket is that the data actually exist in the bucket. So, please follow this part only once you have launched the data producer.

Preparing the Manifest file

On your local computer, edit the manifest.json file in the visualization folder. Use the output DataExperimentStack.VisualizationLayerQuickSightManifestFile of the deployed stack, or replace XXXXXXXXXXXXX with the URI of your bucket in the provided manifest.json file.

For more information on the manifest file, please have a look at Supported Formats for Amazon S3 Manifest Files.

Signing-up

If you have already signed up for Amazon QuickSight, or if you didn't select this experiment's raw data bucket during sign-up, please allow QuickSight to read the bucket of this experiment. You can find the bucket name in the output of the cdk deploy command or in the CloudFormation console.

Creating a dataset

In this section, you are going to create a new QuickSight dataset using Amazon S3 files.

From the QuickSight home page:

  1. Click on "Manage Data"
  2. Click on "New Data Set"
  3. Select "S3"
  4. Enter a "Data Source Name" and select your local manifest.json file.

New S3 data source

  1. Click "Connect"

You should see the following screen:

Finish Data Set Creation

The QuickSight "deployment" is finished. For an exploration of QuickSight, see Exploring the demo.

Once the import is finished, you will get the following screen:

Import Complete

Note: Amazon QuickSight has certain Data Source Limits. In particular, the total size of the files specified in the manifest file can't exceed 25 GB, and you can't reference more than 1,000 files. Therefore, as we are pointing to raw data, we should only indicate a particular day in the manifest.json file. For instance:

{
  "fileLocations": [
    {
      "URIPrefixes": [
        "https://s3.us-east-1.amazonaws.com/dataexperimentstack-bidrequestexperimentstoragecc-XXXXXXXXXXXXX/raw-data/2019/09/01/"
      ]
    }
  ],
  "globalUploadSettings": {
    "format": "TSV",
    "delimiter": "\t",
    "containsHeader": "false"
  }
}

Exploring the demo

Before starting the exploration of the demo, let's launch the producer and application. This will populate the demonstration with data, and we will have something to look at.

Launching the experiment

Launching the producer

As part of the demo, we deployed a Lambda function to simplify launching the producer on AWS Fargate.

To launch the producer, execute the command line that you get as an output of the deployed demo in the output named DataExperimentStack.ProducerLayerlaunchProducerOnMacOS or DataExperimentStack.ProducerLayerlaunchProducerOnLinux. On macOS, it will look like this:

$> aws lambda invoke --function-name arn:aws:lambda:us-east-1:XXXXXXXXXXXX:function:DataExperimentStack-ProducerLayerProducerLauncherD-XXXXXXXXXXXXX --payload '{}' /tmp/out --log-type Tail --query 'LogResult' --output text | base64 -D

As an output, you get the ARN of the running task.

Note: If you encounter the error "Unable to assume the service linked role. Please verify that the ECS service linked role exists." while launching the producer, please follow the instructions here and create the service-linked role:

$> aws iam create-service-linked-role --aws-service-name ecs.amazonaws.com

Launching the Kinesis Data Analytics Application

In order to get enhanced results and real-time metrics, you need to also launch the Kinesis Data Analytics Application. To do so, execute the following command:

$> aws kinesisanalytics start-application --application-name EnhancementSQLApplication --input-configurations Id=1.1,InputStartingPositionConfiguration={InputStartingPosition=LAST_STOPPED_POINT}

Depending on the number of deployments and changes you have made to the CDK application, the input ID of the Kinesis Data Analytics application may change. You can get the right InputId with the following command: aws kinesisanalytics describe-application --application-name EnhancementSQLApplication.

Note: you can also stop the application with the AWS CLI with the following command line:

$> aws kinesisanalytics stop-application --application-name EnhancementSQLApplication

Producer

Architecture overview of the producer layer

Architecture overview of the producer layer

The data are pushed to Amazon Kinesis Data Firehose by a producer layer based on a Python program running in a container on AWS Fargate.

The key components of the producer are:

  • A virtual private cloud to host your producer (not detailed here),
  • A Lambda function that eases the launch of the producer,
  • A container registry to host the Docker image of the producer (not detailed here),
  • A task definition that defines how to run the producer.

You can get a list of all deployed resources related to the producer by opening the CloudFormation console for the DataExperimentStack stack and searching for "Producer" in the resources search field.

Lambda function

AWS Lambda is a compute service that lets you run code without provisioning or managing servers.

In the present case, we use AWS Lambda to launch the Fargate task from the information provided at deployment time by the CDK application.

To access the deployed lambda function:

  1. Go to your AWS Account,
  2. Search for Lambda,
  3. Look for a function name starting with "DataExperimentStack-ProducerLayerProducerLauncher"
  4. Click on the function name

You will see the following screen:

Lambda function

If you scroll down the page, you will notably see the "Environment variables" that are provided to the Lambda function at deployment time and are necessary to launch the Fargate task.

Lambda function environment variables
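
The repository ships its own handler; purely as an illustration of how such a launcher works, here is a minimal TypeScript sketch that reads those environment variables and starts the Fargate task through the AWS SDK. The environment variable names are assumptions; the actual function may differ.

// Minimal sketch of a producer launcher (environment variable names are assumptions).
import * as AWS from 'aws-sdk';

const ecs = new AWS.ECS();

export const handler = async (): Promise<string> => {
  const result = await ecs.runTask({
    cluster: process.env.CLUSTER_NAME!,               // ECS cluster hosting the producer
    taskDefinition: process.env.TASK_DEFINITION_ARN!, // producer task definition
    launchType: 'FARGATE',
    count: 1,
    networkConfiguration: {
      awsvpcConfiguration: {
        subnets: (process.env.SUBNETS || '').split(','),
        securityGroups: [process.env.SECURITY_GROUP!],
        assignPublicIp: 'ENABLED',
      },
    },
  }).promise();

  // Return the ARN of the running task, as described in "Launching the producer".
  const task = result.tasks && result.tasks[0];
  return task && task.taskArn ? task.taskArn : 'no task started';
};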

AWS Fargate

AWS Fargate is a compute engine for Amazon ECS that allows you to run containers without having to manage servers or clusters.

In the present case, we leverage AWS Fargate to host the program that will continuously - until we stop it or it has pushed all records - push records to the Kinesis Firehose ingestion stream.
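
The producer shipped with the repository is a Python program; purely as a language-agnostic illustration of the loop it runs, here is a minimal TypeScript sketch that streams the mock TSV file record by record to the Firehose delivery stream. The file path, stream name, and log message are assumptions.

// Illustration only: the real producer is a Python program running on Fargate.
// Requires Node 12+ for async iteration over the readline interface.
import * as AWS from 'aws-sdk';
import * as fs from 'fs';
import * as readline from 'readline';

const firehose = new AWS.Firehose();
const streamName = process.env.DELIVERY_STREAM_NAME || 'BidRequestExperimentIngestionLayer';

async function produce(filePath: string): Promise<void> {
  const lines = readline.createInterface({ input: fs.createReadStream(filePath) });
  for await (const line of lines) {
    // Push one bid request per record; Firehose buffers and writes them to S3.
    const response = await firehose.putRecord({
      DeliveryStreamName: streamName,
      Record: { Data: line + '\n' },
    }).promise();
    // The real producer logs a similar success message (see Troubleshooting below).
    console.log(`SUCCESS: record pushed, record ID ${response.RecordId}`);
  }
}

produce('./bidrequests.txt').catch(console.error);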

  1. Search for the ECS service,
  2. Click on "Clusters" on the left panel
  3. Search for "Producer"

Producer ECS cluster

  1. Click on the cluster name, then on the "Tasks" tab. You will get the following screen.

Producer ECS cluster details

  1. Click on the "Running" task

Running Task

From here, you can check the status of the task and access logs.

Troubleshooting: if you want to check that your producer is successfully sending events to your ingestion layer, you can look at the logs of your Fargate task. If everything is going well, you will see messages like "SUCCESS: your request ID is : ebc2c2b9-c94a-b850-be24-30ee9c33a5e7".

Ingestion

Architecture overview of the ingestion layer

Architecture overview of the ingestion layer

The data generated by the producer are pushed to an ingestion layer made of:

Kinesis Data Firehose

Amazon Kinesis Data Firehose is a fully managed service to load streaming data into data stores and analytics tools. It can capture, transform, compress, and load streaming data into various destinations such as Amazon S3 or Amazon Elasticsearch Service. It automatically scales to the load. Further details here.

  1. Open the Kinesis Dashboard.

Kinesis Dashboard

  1. From here, open the "BidRequestExperimentIngestionLayer" Kinesis delivery stream
  2. Inspect the details of the stream; in particular, take a look at the Amazon S3 destination configuration

Kinesis delivery stream S3 destination configuration

The buffer conditions of "128 MB or 60 seconds" mean that the data will be written to Amazon S3 every 60 seconds or whenever the buffer reaches 128 MB, whichever comes first. In our case, data are written to S3 every 60 seconds (see the next paragraph).
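
As an illustration, those buffer conditions correspond to the BufferingHints of the delivery stream. A minimal CDK sketch, assuming the low-level CfnDeliveryStream resource and placeholder bucket and role names (not the repository's exact code), could look like this:

import * as cdk from '@aws-cdk/core';
import * as iam from '@aws-cdk/aws-iam';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import * as s3 from '@aws-cdk/aws-s3';

export class IngestionSketchStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string) {
    super(scope, id);

    const rawDataBucket = new s3.Bucket(this, 'RawDataBucket');
    const deliveryRole = new iam.Role(this, 'DeliveryRole', {
      assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
    });
    rawDataBucket.grantWrite(deliveryRole);

    new firehose.CfnDeliveryStream(this, 'IngestionLayer', {
      deliveryStreamName: 'BidRequestExperimentIngestionLayer',
      deliveryStreamType: 'DirectPut',
      extendedS3DestinationConfiguration: {
        bucketArn: rawDataBucket.bucketArn,
        roleArn: deliveryRole.roleArn,
        prefix: 'raw-data/',            // see the S3 section below
        bufferingHints: {
          intervalInSeconds: 60,        // flush every 60 seconds...
          sizeInMBs: 128,               // ...or as soon as 128 MB are buffered
        },
      },
    });
  }
}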

  1. Click on the "Monitoring" tab

Kinesis delivery stream monitoring

This tab provides a monitoring view of the stream, such as the number of "Incoming records" per minute or the "Incoming Bytes". You can access all Firehose delivery stream metrics in Amazon CloudWatch.

S3

Amazon Simple Storage Service (S3) is a storage solution designed for Internet scale. In this experiment, S3 serves as a fully managed serverless data lake. It automatically scales, and you don't need to provision any storage space.

  1. From the Kinesis Firehose delivery stream details page, click on the S3 bucket link from the "Amazon S3 destination" section.
  2. Open the "raw-data" folder, then navigate down the structure to a list of files.

Firehose has been configured to prefix all data with "raw-data" in S3. All data are pushed under this prefix, followed by a date prefix of the form "YYYY/MM/DD/HH" (UTC).

S3 data structure
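
For instance, an object delivered at 14:05 UTC on September 1st, 2019 would land under a key similar to the following. The object name itself follows Firehose's default pattern (delivery stream name, version, timestamp, and a random suffix), so the exact key will differ in your account:

raw-data/2019/09/01/14/BidRequestExperimentIngestionLayer-1-2019-09-01-14-05-12-<random-suffix>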

Enhancement

Architecture overview of the enhancement layer

Architecture overview of the enhancement layer

The ingested data are processed through an SQL application that enhances the data with a referential stored on S3 and computes analytics on top of the initial ingestion stream. The results of this application are pushed as custom metrics to Amazon CloudWatch. This enhancement layer is made of:

S3 for referential data

S3 is used to store the referential file that is then connected to the Kinesis application.

  1. Open the Kinesis Dashboard.
  2. Open the "EnhancementSQLApplication" in the Kinesis Analytics Application card.

EnhancementSQLApplication configuration

  1. You can see the Amazon S3 object used as reference data and its associated "In-application reference table name", which can be used in the SQL application (see below)

See Example: Adding Reference Data to a Kinesis Data Analytics Application for further details on the topic.
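
As an illustration of how this wiring can be expressed, here is a hedged CDK sketch using the low-level CfnApplicationReferenceDataSource resource. The bucket, role, and file key are hypothetical, and the CSV format is an assumption; only the column names (CODE and REFERENCE) are taken from the SQL application shown below.

// Sketch of attaching the S3 referential to the SQL application (names are assumptions).
import * as iam from '@aws-cdk/aws-iam';
import * as kinesisanalytics from '@aws-cdk/aws-kinesisanalytics';
import * as s3 from '@aws-cdk/aws-s3';
import * as cdk from '@aws-cdk/core';

declare const stack: cdk.Stack;            // the surrounding stack or construct
declare const referenceBucket: s3.Bucket;  // bucket holding the referential file
declare const referenceRole: iam.Role;     // role allowing Kinesis Data Analytics to read it

new kinesisanalytics.CfnApplicationReferenceDataSource(stack, 'Referential', {
  applicationName: 'EnhancementSQLApplication',
  referenceDataSource: {
    tableName: 'referential',              // in-application reference table name used in the SQL
    s3ReferenceDataSource: {
      bucketArn: referenceBucket.bucketArn,
      fileKey: 'reference-data/ads.csv',   // hypothetical key
      referenceRoleArn: referenceRole.roleArn,
    },
    referenceSchema: {
      recordColumns: [
        { name: 'CODE', sqlType: 'VARCHAR(12)' },      // joined against the AD code of the input stream
        { name: 'REFERENCE', sqlType: 'VARCHAR(12)' }, // human-readable label
      ],
      recordFormat: {
        recordFormatType: 'CSV',
        mappingParameters: {
          csvMappingParameters: { recordColumnDelimiter: ',', recordRowDelimiter: '\n' },
        },
      },
    },
  },
});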

Kinesis Data Analytics SQL application

A Kinesis Data Analytics application continuously reads and processes streaming data in real time. You write application code using SQL or Java to process the incoming streaming data and produce output(s). In our case, we use an SQL application.

Kinesis Data Analytics then writes the output to a configured destination. The following diagram illustrates a typical application architecture.

Kinesis Analytics Application

This experiment leverages the following elements:

  • Source:
    • The in-application input stream from the ingestion layer
    • A reference table (see above)
  • Real-time analytics:
    • SQL code
    • 2 in-application output streams
  • Destination:
    • An AWS Lambda function (detailed below)

To see it in the console:
  1. Open the Kinesis Dashboard.
  2. Open the "EnhancementSQLApplication" in the Kinesis Analytics Application card.
  3. You can see the Firehose delivery stream as the "Streaming data" source and its associated in-application stream name that can be used in the SQL application
  4. Click "Go to SQL results"

SQL editor

From here, you can navigate the application, edit the SQL, see the incoming data, and view the real-time computed analytics.

SQL code:

CREATE OR REPLACE STREAM "enhanced_stream" (INGESTION_TIME BIGINT, AD VARCHAR(12));

CREATE OR REPLACE PUMP "enhanced_stream_pump" AS INSERT INTO "enhanced_stream"
      SELECT STREAM UNIX_TIMESTAMP(APPROXIMATE_ARRIVAL_TIME), "r"."REFERENCE" as "AD"
      FROM "input_stream_001" LEFT JOIN "referential" as "r"
      ON "input_stream_001"."AD" = "r"."CODE";

CREATE OR REPLACE STREAM "count_stream" (AD VARCHAR(12), INGESTION_TIME BIGINT, NBR INTEGER);

CREATE OR REPLACE PUMP "count_stream_pump" AS INSERT INTO "count_stream"
    SELECT STREAM AD, MIN(INGESTION_TIME), COUNT(AD)
        FROM "enhanced_stream"
        GROUP BY AD,
            STEP("enhanced_stream".ROWTIME BY INTERVAL '30' SECOND);

The SQL language supported by Kinesis Data Analytics applications is based on the SQL:2008 standard, with some extensions to enable operations on streaming data, such as the CREATE OR REPLACE STREAM statement, which creates a stream accessible to other statements in the SQL application and adds a continuous delivery stream output to the application.

This stream can then be connected to a destination: a Kinesis stream, a Kinesis Firehose delivery stream or an AWS Lambda function.

A pump is a continuously running insert query that inserts data from one in-application stream into another in-application stream.

This SQL application is commonly called a multi-step application:

  1. We create a stream and extend the data with a referential.
  2. We use that stream to perform an aggregation over a tumbling window (non-overlapping windows, here every 30 seconds).

AWS Lambda as a destination for a Kinesis Data Analytics application

  1. Open the Kinesis Dashboard.
  2. Open the "EnhancementSQLApplication" in the Kinesis Analytics Application card.
  3. You can see the Lambda function as a destination of the Kinesis Analytics application.
  4. Click the lambda function name

Lambda destination for a Kinesis Analytics application

  1. Inspect the code that pushes custom metrics to CloudWatch
  2. Open the "Monitoring" tab

Lambda Monitoring of a Kinesis Analytics application

You will notice that the Lambda function is called 10 times every 5 minutes. From Using a Lambda Function as Output:

If records are emitted to the destination in-application stream within the data analytics application as a tumbling window, the AWS Lambda destination function is invoked per tumbling window trigger. For example, if a tumbling window of 60 seconds is used to emit the records to the destination in-application stream, the Lambda function is invoked once every 60 seconds.

As our tumbling window is 30 seconds, the function is called twice per minute, i.e. 10 times every 5 minutes.
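
To make the flow concrete, here is a minimal TypeScript sketch of such a destination function: it decodes the records emitted by count_stream, pushes them to CloudWatch as custom metrics, and acknowledges every record back to Kinesis Data Analytics. The namespace, metric name, and JSON output format are assumptions; the repository's actual function may differ.

// Minimal sketch of a Kinesis Data Analytics Lambda destination (names are assumptions).
import * as AWS from 'aws-sdk';

const cloudwatch = new AWS.CloudWatch();

interface AnalyticsOutputEvent {
  records: { recordId: string; data: string }[]; // data: base64-encoded JSON row of count_stream
}

export const handler = async (event: AnalyticsOutputEvent) => {
  const metricData = event.records.map((record) => {
    // Each record carries one row of count_stream: { AD, INGESTION_TIME, NBR }
    const row = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));
    return {
      MetricName: 'BidRequests',                    // hypothetical metric name
      Dimensions: [{ Name: 'Ad', Value: row.AD }],
      Value: row.NBR,
      Unit: 'Count',
    };
  });

  // For larger batches, putMetricData calls would need to be chunked.
  await cloudwatch.putMetricData({
    Namespace: 'DataExperiment',                    // hypothetical namespace
    MetricData: metricData,
  }).promise();

  // Tell Kinesis Data Analytics that every record was delivered successfully.
  return { records: event.records.map((r) => ({ recordId: r.recordId, result: 'Ok' })) };
};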

Visualization

Architecture overview of the visualization layer

Architecture overview of the visualization layer

The data ingested by the ingestion layer and the custom metrics generated by the enhancement layer are displayed through two distinct visualization systems with different purposes:

CloudWatch

Amazon CloudWatch dashboards are customizable home pages in the CloudWatch console that you can use to monitor your resources in a single view. You can include custom metrics in these dashboards.

  1. Open the CloudWatch console
  2. On the left panel click on "Dashboards"
  3. Click on the BidRequestRealTimeDashboard

CloudWatch Dashboard

In the above example, you can see a spike between 9 AM and 9:30 AM. It occurred because a second producer was temporarily launched. Kinesis Data Firehose adjusts automatically to ingest the additional bid requests.

The dashboard provides two views:

  • "Nbr of Bid requests per minutes": it is based on the custom metrics from the enhancement layer. It sums the values every minute.
  • "Statistics over the last hour": it is based on the standard metrics provided by Kinesis Data Analytics. It sums the values over the last hour.

You can further explore the source code of the widgets and the dashboard, and easily extend the dashboard as you wish.
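
As an illustration of how such a widget is declared with the CDK, here is a hedged sketch of a graph widget plotting a custom metric summed per minute. The namespace and metric name are the same assumptions as above; see ./cdk/lib/visualization-construct.ts for the real code.

// Sketch of a dashboard widget (namespace/metric names are assumptions).
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
import * as cdk from '@aws-cdk/core';

declare const stack: cdk.Stack;

const dashboard = new cloudwatch.Dashboard(stack, 'RealTimeDashboard', {
  dashboardName: 'BidRequestRealTimeDashboard',
});

dashboard.addWidgets(
  new cloudwatch.GraphWidget({
    title: 'Nbr of Bid requests per minutes',
    left: [
      new cloudwatch.Metric({
        namespace: 'DataExperiment',      // hypothetical namespace
        metricName: 'BidRequests',        // hypothetical metric name
        statistic: 'Sum',
        period: cdk.Duration.minutes(1),  // sum the values every minute
      }),
    ],
  }),
);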

QuickSight

Amazon QuickSight is a fast, cloud-native business intelligence service that makes it easy to deliver insights to everyone in your organization. It is a fully managed service that scales automatically to your needs. You pay only per editor and per viewer session; you don't need to provision capacity unless you need extra performance.

  1. Go to your AWS Account,
  2. Search for QuickSight and open it.
  3. You can create and add analyses and visuals,
  4. And publish and share your work as dashboards.

Here is a sample analysis I made based on the ingested data.

QuickSight Analysis.

Cost

This paragraph details, by layer and service, the cost of this experiment for one hour of data produced by one producer. That roughly represents 270k records and 240 custom metrics (60 * 2 * 2).

All prices are for the us-east-1 AWS region.

Usage details:

  • Producer:
    • Lambda:
      • Memory: 128MB
      • Duration: 2000 ms
    • Fargate:
      • Task: 1
      • vCPU: 0.5 (512 CPU unit requested, see here for details)
      • Memory: 512 MB
    • Amazon S3:
      • Mock file storage: 1.1 GB
  • Ingestion
  • Enhancement
  • Visualization

Total costs:

  • Lambda (assuming the whole lambda free tier has been consumed):
    • Cost: 121 executions * $0.0000002 + 34000 / 100 * $0.000000208 = $0
  • Fargate:
    • CPU charges: 1 task * 0.5 vCPU * $0.04048 * 1 hour = $0.02024
    • Memory charges: 1 task * 0.5 GB * $0.004445 * 1 hour = $0.0022225
  • Kinesis Data Firehose:
    • Total record size (each record rounded to nearest 5KB): 1.35GB
    • Cost: 1.35 * $0.029 = $0.03915
  • Kinesis Data Analytics:
    • Cost: 1 KPU * $0.11 = $0.11
  • CloudWatch (assuming the whole CloudWatch free tier has been consumed):
    • Cost Dashboard: $3
    • Cost custom metrics: 1 metric * $0.3 = $0.3
    • Cost API Calls (push custom metrics + dashboard display) ~ to 1000 calls: 1 * $0.01 = $0.01
  • Amazon S3 (Assuming data will stay on S3 for one month):
    • Cost storage: ~ 1.4 GB * $0.023 = $0.0322
    • Cost API ~ 1000 PUT calls and 1000 GET calls: 1 * $0.005 + 1 * $0.0004 = $0.0054
  • QuickSight (Assuming paying for one month):
    • Cost author: $24

Total: < $30

Alternative solutions

Although the cost doesn't simply multiply when you scale your application - tiered pricing notably applies to several services, and the QuickSight fees would stay mostly the same until you require more capacity - it is interesting to look at specific parts of this architecture and estimate what the cost would be with the presented technology and some alternatives.

Example: What would be the cost of the ingestion layer for 800,000 bid requests per second for 1 hour?

Architecture alternatives to Kinesis Data Firehose for data ingestion:

  • Amazon Kinesis Data Streams: It is a massively scalable and durable real-time data streaming service. It works in a similar fashion to Amazon Kinesis Data Firehose, except that you must manage shards.
  • Amazon DynamoDB: It is a fully managed and serverless key-value and document database that delivers single-digit millisecond performance at any scale. We could use its API to push records directly into the database and use them later on.
  • Amazon MSK: it is a fully managed Apache Kafka service that makes it easy for you to build and run applications to process streaming data.

Kinesis Data Firehose:

  • Total record size (each record rounded to nearest 5KB): 800000 * 60 * 60 * 5KB = 14.4 TB (14400 GB)
  • Cost: $0.029 * 14400 = $417.6

Kinesis Data Stream:

  • "One shard provides ingest capacity of 1MB/sec or 1000 records/sec"
  • "A PUT Payload Unit is counted in 25KB payload β€œchunks” that comprise a record"
  • Each record is 0.5 KB, so 1,000 records represent 0.5 MB. One shard will handle 1,000 records/sec
  • Each record represents 1 PUT Payload Unit
  • At 800,000 records/sec:
    • 800 shards are necessary
    • it represents 2,880,000,000 PUT Payload Units (2,880 million)
  • Cost: 800 shards * $0.015 + 2,880 million PUT Payload Units * $0.014 (per million) = $52.32

Amazon DynamoDB:

  • "For items up to 1 KB in size, one WCU can perform one standard write request per second."
  • 800000 WCU are necessary to write records in the table
  • Cost write: 800000/s over one hour * $0.00065/hour = $520 (Doesn't include reading cost)
  • Cost storage: 800000 * 60 * 60 * 0.5 KB = 1440 GB * $0.25 = $360

Amazon MSK:

  • Assuming a broker instance kafka.m5.24xlarge can handle 200000 requests / sec
  • Broker cost: 4 * $10.08 = $40.32
  • Broker Storage Cost: 800000 * 60 * 60 * 0.5 KB = 1440 GB * $0.1 = $144

In this example, Amazon Kinesis Data Streams is the winner. However, contrary to Kinesis Data Firehose, it would be necessary to manage the scaling (in and out) of the Kinesis Data Stream: 800 shards won't always be necessary.

At a larger scale, some optimization and best practices will be necessary, and other alternatives might be tested (such as an EC2 fleet behind a load balancer):

  • Records grouping to reduce the number of records to ingest
  • Geo distribution of ingestion layer to reduce latency
  • Enhancement of Elasticity

To learn more about real-time bidding on AWS, read the whitepaper.

Develop

Since this CDK project is TypeScript based, sources need to be compiled to JavaScript every time you modify the source files. This project is configured with a nice little npm script called watch that automatically compiles the .js files every time you make a change.

Start watching for changes

In the project root directory, open a new terminal and enter:

$> npm run watch

Useful commands

  • npm run build compile typescript to js
  • npm run watch watch for changes as you edit the application and compile
  • cdk deploy deploy this stack to your default AWS account/region
  • cdk diff compare deployed stack with current state
  • cdk synth emits the synthesized CloudFormation template
  • cdk destroy destroy the CDK application in your default AWS account/region

Note: a cdk diff command might take up to several seconds. The main reason is that the CDK command computes a hash of a very big file (train.txt) that we upload as an asset of the application.

Clean up

Delete data of the Raw Data Bucket (output rawBucket of the stack) and destroy the CDK application:

$> aws s3 rm s3://dataexperimentstack-bidrequestexperimentstoragecc-XXXXXXXXXXXXX --recursive
$> cdk destroy

Inspiring sources of information

  1. Producer:
  2. Ingestion:
  3. Enhancement:
  4. Visualization:
