
A data streaming pipeline built with Apache Kafka and Faust to perform automatic word counting of scientific files dropped in AWS S3 bucket


Zak-Musah/Apache-Kafka-Data-Pipeline-in-AWS


Introduction

Scientific publications are continuously loaded to Amazon S3 and processed with Apache Kafka to create an output topic with word counts of the text corpus. In this mini project, a data streaming pipeline is built with Apache Kafka and Faust to perform the word counting. Faust is a stream processing library that, as its maintainers describe it, ports the ideas from Kafka Streams to Python.

By default, the maximum message size that can be sent to Kafka is 1 MB. To handle text files larger than 1 MB, we leverage Boto3, another Python library from Amazon, which can read objects in S3 transparently and feed them to the Faust application agents that complete the word counting.

Fig 1: Pipeline architecture (S3 bucket, Faust worker on EC2, Kafka cluster)

The Python Kafka producer, also known as the Faust application or worker, sits on the EC2 instance and interacts with the S3 bucket and the Kafka cluster to complete the pipeline, as shown in Fig 1. Faust relies heavily on Python's relatively new asyncio library and uses Apache Kafka as a message broker.
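
Because Faust simply consumes from a Kafka topic, the trick for large files is to split them before they ever become messages. A rough sketch of how an S3 object could be streamed line by line with Boto3 so that no single Kafka message approaches the 1 MB limit (the bucket name below is a placeholder, not taken from this repository):

import boto3

BUCKET = "scientific-publications"  # placeholder bucket name for illustration

s3 = boto3.client("s3")

def stream_lines(key):
    # Yield the lines of an S3 object one at a time instead of loading
    # the whole (possibly >1 MB) file into memory.
    body = s3.get_object(Bucket=BUCKET, Key=key)["Body"]
    for raw_line in body.iter_lines():
        yield raw_line.decode("utf-8")

Each yielded line can then be sent to the Faust application's input topic, keeping every individual message well under the broker limit.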

Description

The infrastructure used in this project is bundled as code with AWS CloudFormation. The resources created consist of a multi-broker, serverless Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster, an EC2 instance serving as the client that connects to the brokers, and an S3 bucket. Specifically, three brokers are created and distributed evenly across three availability zones.

Image

It is worth noting that almost every service in AWS follows the principle of 'least privilege', which says that a user or another AWS service cannot access a service unless explicitly granted permission; in AWS this is achieved with roles and policies. Therefore the EC2 instance, being the backbone of the pipeline, is granted permission to access both the MSK cluster and S3 by assigning it a role with attached policies. The AWS account user also requires access keys to programmatically access resources (S3) through the Boto3 API.
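
As a quick sanity check before running the pipeline, the access keys can be exercised from Python. This is a sketch only; Boto3 reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment, which may differ from the variable names used in this repository's .env file:

import os
import boto3

# Build a session from the access keys exported in the environment.
session = boto3.session.Session(
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

# Confirm the keys are valid and that S3 is reachable.
print(session.client("sts").get_caller_identity()["Arn"])
print([b["Name"] for b in session.client("s3").list_buckets()["Buckets"]])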

Implementation

Step 1: Dependencies

All the dependencies, whether access control or software, are taken care of in the CloudFormation code.

AWS Related

  • AWS Account
  • EC2 KeyPairs
  • Access key ID / Secret access key - Rename .env.example to .env and set the values of the keys
  • AWS MSK
  • Role and policy granting access to S3 and the MSK cluster
  • Inbound Security Group Rules to access MSK and EC2 instance remotely

Non-AWS Related

  • Python 3.7
  • Faust 1.0
  • Confluent-Kafka
  • kafka_2.12-2.2.1

Step 2: Create – Access Keys

In the IAM console click on Manage Security Credentials

Image

Click on Create New Access Key and then Download Key File

Step 3: Create - EC2 KeyPairs

In the EC2 Key pairs page click on Create key pair and provide a name and file format.

Now click on Create key pair and MSKkeyPair is downloaded automatically.

Image

Step 4: Deploying the MSK Cluster + S3 Bucket

In the AWS console, under Services, go to CloudFormation and then click Create stack.

Image

Select the location of the CloudFormation document and click Next.

Image

Provide a name and select the KeyName created in step 3

Image

Optionally add a tag Name and Value

Image

Acknowledge and Create Stack

Image

When stack creation is done, the status of all resources will show as CREATE_COMPLETE.

Image
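
The stack status can also be checked programmatically instead of refreshing the console. A sketch, assuming the stack was named MSK-Stack (substitute whatever stack name was chosen above):

import boto3

cf = boto3.client("cloudformation")

# Block until every resource in the stack reports CREATE_COMPLETE.
cf.get_waiter("stack_create_complete").wait(StackName="MSK-Stack")
print("Stack is ready")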

Everything is now set to go 😄

Running and Testing

To run the producer, first start the Kafka cluster. SSH into the Kafka client machine using the key pair obtained in Step 3 and the public IP of the EC2 instance that came with the Kafka cluster. The screenshot below shows how to obtain the public IP address.

Image

Starting Zookeeper:

cd kafka/kafka_2.12-2.2.1
bin/zookeeper-server-start.sh config/zookeeper.properties

Starting Kafka Server:

cd kafka/kafka_2.12-2.2.1  
bin/kafka-server-start.sh config/server.properties

Starting the producer and consumer:

Upload wordcount.py to the /home/ec2-user directory. Three files with different names but the same content are uploaded to S3 in turn, and the word counts in each round are observed through the Faust app tables and through the output topic in Kafka via the Kafka console consumer. The worker is built with a task timer of 30 s; that is, the worker checks for new objects in the S3 bucket every thirty seconds.
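
A minimal sketch of how such a worker can be structured is shown below. The actual wordcount.py may differ and the bucket name is a placeholder, but the app id (word-count), table name (word_counts), broker address and 30-second timer match the rest of this walkthrough:

import boto3
import faust

app = faust.App("word-count", broker="kafka://localhost:9092")
lines_topic = app.topic("lines", value_type=str)
word_counts = app.Table("word_counts", default=int)  # changelog topic: word-count-word_counts-changelog

s3 = boto3.client("s3")
BUCKET = "scientific-publications"  # placeholder bucket name
seen = set()                        # S3 keys already processed

@app.timer(interval=30.0)
async def poll_s3():
    # Every 30 seconds, look for new objects in the bucket and feed their lines to the agent.
    for obj in s3.list_objects_v2(Bucket=BUCKET).get("Contents", []):
        key = obj["Key"]
        if key in seen:
            continue
        seen.add(key)
        body = s3.get_object(Bucket=BUCKET, Key=key)["Body"]
        for raw_line in body.iter_lines():
            await count_words.send(value=raw_line.decode("utf-8"))

@app.agent(lines_topic)
async def count_words(lines):
    # Split each line into words and accumulate counts in the table.
    async for line in lines:
        for word in line.split():
            word_counts[word] += 1

Running the worker as in step 1 below starts both the timer and the agent in a single process.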

Watch Out!

1- With the S3 bucket still empty, run the Faust application from the /home/ec2-user directory with the following command:

faust -A wordcount worker -l info

Image

The screenshot shows the worker in a waiting state. Note that by default Faust combines the app id, the table name and the suffix 'changelog' to form the name of the output topic sent to Kafka; in this example the console consumer therefore consumes from word-count-word_counts-changelog.

2- Run Kafka Console Consumer in another window with:

cd kafka/kafka_2.12-2.2.1  
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic word-count-word_counts-changelog --from-beginning --property print.key=true --property print.value=true

3- Place the console consumer window to the right of the producer and upload file A1.headed.txt to the S3 bucket.

Image

4- Next upload A2.headed.txt.

Image

5- Finally upload A3.headed.txt.

Image

Conclusion

Among the last words printed by the console consumer, "the" and "star" appear 120 times and once respectively after the first file upload to S3, accumulating to 360 and 3 respectively after the third upload, indicating that the Word Count App is working as expected. The application has been tested against all the requirements of the use case.
