ETL Workflow using Apache NiFi

The project is developed with open-source big data tools from Apache. All the required tools are installed and configured from scratch in a Linux environment to explore the mechanics of the Apache big data ecosystem.

All the supporting tutorials are linked so that this document can be used as a development manual. The technical challenges encountered during development and their solutions are described in the relevant sections. In addition, all the required configuration and code files are attached to the project folder, and important code and configuration steps are listed in the glossary.

Scope

As part of the ETL process, the data needs to be fetched from the UCI Machine Learning Repository over an HTTPS link, and the compressed file extracted. The dataset contains 2,916,697 records and ten attributes (Dataset Description). The target column contains family names, including ransomware families (e.g., CryptXXX, CryptoLocker). A new column is added to flag probable ransomware transactions, and column names that clash with system keywords (e.g., year, day, and count) are renamed. After the required transformations, the data is inserted into the data warehouse (Hive) for future use.

Platform & Tools

Archive locations are shared below. Please check version compatibility before downloading the files.

| Purpose | Name | Version | Download Link |
| --- | --- | --- | --- |
| Virtualization | VMware Workstation Player | 16 | Click Here |
| OS | Ubuntu | 20.04 | Click Here |
| Big data framework | Apache Hadoop | 3.1.2 | Click Here |
| Database | Apache Hive | 3.1.2 | Click Here |
| ETL Tool | Apache NiFi | 1.16.0 | Click Here |

Set up Big Data

The following section briefly describes the steps required to configure Hadoop and Hive. For the setup, several .xml and .env files need to be configured. Detailed installation tutorial links are provided with each section. On top of that, cross-check with the Glossary file for any additional configuration and the essential parts included for this project. Copies of my configuration files are also shared in the Configs folder.

Install VMware and OS:

VMware is installed on a Windows machine (a laptop with six cores and 24GB of memory). Although the procedure is simple, memory and processor allocation should be planned carefully. 8GB of memory, three cores, and 30GB of disk space are allocated for this project.

VM and Ubuntu installation guide: Click Here

Hadoop:

As prerequisites for Hadoop, JDK 8 and OpenSSH need to be installed. A separate OS user is created for better management and security. Hadoop is downloaded from the link above. The Linux bash profile and configuration files should be updated with caution; the required commands are given in the glossary.
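As a rough sketch on Ubuntu (the user name and the passwordless-SSH setup below are typical choices, not necessarily the exact commands from the glossary):

sudo apt update
sudo apt install -y openjdk-8-jdk openssh-server openssh-client
sudo adduser hadoop                        # dedicated OS user for the Hadoop services
su - hadoop
ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa   # passwordless SSH, required by the Hadoop start scripts
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys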

Hadoop installation guide: Click Here

Hive:

Apache Hive is a data warehouse software project built on top of Apache Hadoop that provides SQL-like query and analysis features. Under the hood, it runs MapReduce jobs to extract data from HDFS. Hive is installed to store and manage the data for further analysis.

Hive requires a conventional relational database to store the metadata needed for its management; the default installation ships with a Derby database. However, accessing Hive from external tools or servers requires concurrent sessions, which HiveServer2 provides. In this setup, a MySQL database is used as the metastore database. Special care should be taken to download the correct version of the MySQL JDBC connector. The detailed steps are given in the glossary.
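As a rough sketch (the metastore database name, user, and password below are placeholders, not the values from the glossary), the connector is dropped into Hive's lib directory and the metastore schema is initialised with schematool:

# create the metastore database and user in MySQL
sudo mysql -e "CREATE DATABASE metastore; CREATE USER 'hive'@'localhost' IDENTIFIED BY 'hivepassword'; GRANT ALL ON metastore.* TO 'hive'@'localhost';"
# copy the MySQL JDBC connector into Hive's lib directory
cp mysql-connector-java-*.jar $HIVE_HOME/lib/
# initialise the metastore schema (hive-site.xml must already point at the MySQL database)
$HIVE_HOME/bin/schematool -dbType mysql -initSchema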

Hive installation guide: Click Here
HiveServer2 config details: Click Here
HiveServer2 config: Youtube Tutorial

NiFi:

Apache NiFi was built to automate the flow of data between systems. It supports almost all databases and sources with a GUI-based data flow design facility that is easy to understand and manage. A data flow can also be saved and imported as a template to rebuild recurring flows, which speeds up development. NiFi is backed by ZooKeeper and can run as a distributed cluster.

NiFi is designed to handle large data volumes with high throughput and low latency. For a production environment, it is advisable to install NiFi on a separate server with dedicated RAID storage for logs and content repositories; however, the same VM is used for this project. The archive configuration and Java heap size need to be adjusted for NiFi to run smoothly, and the log files should be checked regularly for warnings.
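For reference, the heap size is raised in conf/bootstrap.conf and the content archive is kept small in conf/nifi.properties; the values below are illustrative, not the exact ones used in this project.

# conf/bootstrap.conf (the default heap is only 512 MB)
java.arg.2=-Xms2g
java.arg.3=-Xmx2g

# conf/nifi.properties
nifi.content.repository.archive.enabled=true
nifi.content.repository.archive.max.retention.period=3 hours
nifi.content.repository.archive.max.usage.percentage=20%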

NiFi installation Guide: Click Here

Manage Services:

Start:

Once installed and configured, the services can be started with the commands below in a terminal.

cd /home/hadoop/hadoop-3.1.2/sbin
./start-dfs.sh
./start-yarn.sh
hive --service metastore
hiveserver2
sudo service nifi start
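A quick way to verify the startup is to run jps, which should list NameNode, DataNode, SecondaryNameNode, ResourceManager, and NodeManager; the Hive metastore and HiveServer2 show up as RunJar processes.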

Stop:

stop-dfs.sh 
stop-yarn.sh
sudo service nifi stop

Service Portals:

From the same VM, the URLs below can be accessed through a browser.
NameNode: http://localhost:9870
DataNode: http://localhost:9864
YARN Resource Manager: http://localhost:8088
HiveServer2 Web UI: http://localhost:10002/
NiFi: https://localhost:8443/nifi/login

Hive Database Setup

The destination tables are created before the workflow is designed. Both a regular (managed) table and an external table are created with the same structure; their purposes are discussed in the following sections.

Access through Terminal

beeline -u jdbc:hive2://localhost:10000

Create Database:

CREATE DATABASE ETL;
SHOW DATABASES;
USE ETL;

Create Normal Hive Table:

CREATE TABLE IF NOT EXISTS etl.uci_ransomware (address string, year_at int, day_at int, length int, weight string, count_of int, looped int, neighbors int, income string, label string, ransomware int)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',';

CREATE TABLE IF NOT EXISTS etl.uci_ransomware_v2 (address string, year_at int, day_at int, length int, weight string, count_of int, looped int, neighbors int, income string, label string, ransomware int)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',';

External Table Setup:

From the terminal, create the HDFS location and directory structure.

hdfs dfs -mkdir -p /user/hive/uci_ransomware_ext
hdfs dfs -chmod g+w /user/hive/uci_ransomware_ext
hdfs dfs -ls /user/hive

External Table Creation in Hive:

CREATE EXTERNAL TABLE IF NOT EXISTS etl.uci_ransomware_ext (address string, year_at int, day_at int, length int, weight string, count_of int, looped int, neighbors int, income string, label string, ransomware int)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/uci_ransomware_ext'
tblproperties ("skip.header.line.count"="1");

ETL Workflow Development in NiFi

NiFi's fundamental design concepts closely relate to the main ideas of flow-based programming. Data, packaged as a "FlowFile", is moved from one step to the next for the required processing and transformation. Each task is performed by a "FlowFile Processor", and connections define the relationships among processors.

Detailed overview of NiFi: Click Here

For the final NiFi workflow XML template, Click Here. The template can be imported directly once NiFi is configured properly.


NiFi Processor Design - Final Workflow
Blue: Common processors for Data Pulling
Orange: Insert directly into Hive Table Using JDBC Connection
Green: Insert Using External Hive Table

As shown above, a total of ten FlowFile processors are configured to complete all the steps discussed in the ETL workflow description. The processors marked blue handle data fetching, while data transformation and loading into the Hive tables are performed using two different methods (marked green and orange). The processes are described in the following sections.

Common Processors for Data Pulling:

InvokeHTTP:

InvokeHTTP is an HTTP client processor that can interact with a configurable HTTP endpoint and process API requests. The SSL certificate needs to be downloaded from the site and configured in the processor to fetch data over REST API/HTTPS. The figure below shows the required configuration, including the URL, HTTP method (GET/POST), SSL Context Service, and others. See the YouTube tutorial for the steps to configure the processor and the SSL certificate.

InvokeHTTP

UnpackContent:

This processor takes a compressed file as input and delivers the uncompressed files as output. The compression type and a file-name filter can be set in this processor.

UnpackContent

PutFile:

The uncompressed file is forwarded to the PutFile processor to store it in the local file system.

PutFile

Insert directly into Hive Table Using JDBC Connection:

This method ingests data into the Hive table straight from the NiFi application using the Hive JDBC connection.

Twitter Data Example: Click Here

ExecuteStreamCommand (ExecutePythonScript):

This processor executes an external command on the content of the FlowFile and creates a new FlowFile with the results. The incoming FlowFile content is available to the command as STDIN, and the processor forwards the command's STDOUT as output to the next processor.

The figure below shows the configuration of the processor. It takes a Python script (Click for the script) as the command. The script reads STDIN and updates the dataset with an additional "Ransomware" flag column based on the label value. In addition, NiFi supports inline code blocks (Groovy, Jython, JavaScript, JRuby) as an alternative to calling a script on the local machine.

ExecuteStreamCommand

ReplaceText (RenameHeader):

The ReplaceText processor is used to rename the header fields that clash with system keywords, such as year, day, and count.

ReplaceText
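For reference, the renaming follows the Hive DDL shown earlier (year to year_at, day to day_at, and count to count_of); the exact search/replace values can be taken from the attached workflow template.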

QueryRecord (FilterRecords):

The QueryRecord processor can run SQL-like queries directly on the FlowFile content, and the data format can be changed at the same time. In this case, the CSV input is converted into JSON for compatibility with the downstream processors. The Record Reader/Writer values need to be configured via the arrow icon on the right. In this figure, a new property, "data", is added with the "+" sign in the top-right corner, and an SQL query is provided as its value. The query should not end with a ";", otherwise the processor rejects it.

QueryRecords
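For reference, QueryRecord exposes the incoming FlowFile to the query as a table named FLOWFILE, so a simple pass-through such as SELECT * FROM FLOWFILE already performs the CSV-to-JSON conversion; the filter query actually used here is captured in the attached workflow template.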

ConvertJSONToSQL:

This processor transforms each record of the JSON file into an SQL INSERT statement. A database JDBC connection pool needs to be created for this processor; the details of the configuration are given below. The database connection URL, database user, and password are provided, and the path to hive-site.xml should be supplied in the Hive Configuration Resources box. Although the Validation Query is not a mandatory parameter, the connection could not be established without it ("Select 1 as Test_column"). Moreover, the table and schema names need to be provided as input, and the SQL parameter attribute prefix has to be defined; in this case, "hiveql" is the correct value. The output is a queue of FlowFiles containing INSERT statements. The Hive table creation DDL is given in the glossary.
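For reference, the database connection URL follows the same Hive JDBC form used with beeline earlier (jdbc:hive2://localhost:10000); the remaining pool settings are captured in the attached template and glossary.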

HiveConnectionPool

ConvertJsonToSQL

PutHiveQL:

This processor receives the INSERT statements as FlowFile input and executes them in the Hive database through a JDBC connection.

PutHiveQL

Once the flow completes, all the records are inserted into Hive.

Insert Using External Hive Table:

The method above executes single INSERT statements in a queue, which takes a long time due to JDBC connection overhead. In addition, Hive works differently from transactional databases and is not well suited to single-row inserts. Hence, to improve insertion time, the method below uses Hive's external table functionality.

In this method, the CSV data is transferred to a file location in HDFS. A Hive external table is defined in the database, pointing at that directory, and its definition must match the columns and delimiter of the CSV file. Essentially, the external table is an abstraction that presents the data in the CSV file as a table; it does not hold the data itself, and the data stays in the CSV file even if the external table definition is dropped. An INSERT statement from the external table into the normal Hive table then transfers all the data into the warehouse. Since the operation happens within HDFS, execution is much faster than issuing individual requests over the JDBC connection.

ExecuteStreamCommand (ExecuteShellScript):

A shell script (Click for the script) is called by this processor to add the ransomware flag column to the dataset and transfer the updated CSV file to the HDFS location.

ExecuteShellScript
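The actual script is linked above; purely as a sketch (the file names, paths, and the assumption that non-ransomware rows are labelled "white", as in the public BitcoinHeist dataset, are all illustrative):

# append a ransomware flag column: 1 for any label other than "white", 0 otherwise
awk -F',' 'NR==1 {print $0",ransomware"; next} {flag=($10=="white")?0:1; print $0","flag}' \
    /home/hadoop/data/BitcoinHeistData.csv > /tmp/uci_ransomware_flagged.csv
# move the updated file into the external table's HDFS directory
hdfs dfs -put -f /tmp/uci_ransomware_flagged.csv /user/hive/uci_ransomware_ext/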

SelectHiveQL (InsertFromExternalTable):

SelectHiveQL has an additional property, "HiveQL Post-Query", which is used in this step to execute the INSERT statement from the external table into the Hive table. For the primary "HiveQL Select Query", a dummy statement is provided.

SelectHiveQL
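As an illustration (which of the two identically structured tables is the destination here is an assumption), the post-query is a statement along the lines of INSERT INTO TABLE etl.uci_ransomware_v2 SELECT * FROM etl.uci_ransomware_ext, while the primary select can be a lightweight query such as the "Select 1" validation query mentioned earlier.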

Additional Configuration:

Please check the NiFi Developer Manual for proper configuration of the connections, loopbacks, and error handling. Buffer, wait time, and recurrence parameters also need to be configured for each processor based on the requirements.

Execution and Results

The workflow can be scheduled based on an event or on time (cron). Once the starting point is executed, the workflow runs to completion and the data is loaded into the final table. The data can then be accessed from the Hive beeline editor.
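For a time-based schedule, the entry processor's Scheduling Strategy can be set to CRON driven with a Quartz expression, for example 0 0 2 * * ? to trigger the flow daily at 2 AM (an illustrative value, not the schedule used in this project).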

Result

In this case, the external table method takes less than a minute to insert the 2.9M records into the Hive database. In contrast, the JDBC connection takes 5 minutes to execute a batch of 1,000 insert statements. Hence, the external table method should be preferred for bulk data insertion in a data warehouse environment.

Road Blocks and Solutions

  • Hive won't work without the exact matching version of the MySQL JDBC connector
  • MapReduce memory allocation needs to be changed
  • NiFi needs a restart after the default password is changed
  • The NiFi Hive JDBC connection won't work without a Validation Query
  • NiFi's default Java heap size needs to be increased
  • The NiFi archive needs to be purged regularly

Conclusion

Overall, NiFi is a reasonably simple ETL design tool, and the GUI makes it easy to understand. The 200+ built-in processors cover all the needs of modern data ingestion. Connections can hold FlowFiles in case of failure, which provides an efficient way to resume the workflow from the point of failure. Combined with Kafka, NiFi can serve as a message-queueing layer as well, and custom script execution makes it versatile enough for any custom operation. However, it loses cache information if the primary node gets disconnected; a NiFi cluster can solve this problem. Configuration and resource allocation are the most important considerations when working with big data platforms.
