
Design/Implement stream/batch architecture on NYC taxi data | #DE


yennanliu/NYC_Taxi_Pipeline


NYC Taxi Pipeline

INTRO

Architect batch and stream data processing systems on the nyc-tlc-trip-records-data dataset. The batch side is an ETL process: E (extract : tlc-trip-record-data.page -> S3) -> T (transform : S3 -> Spark) -> L (load : Spark -> MySQL). The stream side is: Event -> Event digest -> Event storage. The system can then support calculations such as Top Driver By area, Order by time window, latest-top-driver, and Top busy areas.

Batch data : nyc-tlc-trip-records-data

Stream data : TaxiEvent, stream from file.
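As a rough illustration of what a calculation like "Top busy areas" means, the sketch below counts pickups per zone in a trip CSV using plain shell tools. It is not the repo's Spark implementation; the function name `top_busy_areas` is hypothetical, and it assumes the CSV has a header row with a `PULocationID` column (pickup zone ID) and no quoted commas.

```shell
#!/bin/sh
# top_busy_areas FILE [N]
# Hypothetical helper: print the N busiest pickup zones as "count zone_id".
# Assumes a comma-separated file whose header contains PULocationID.
# Prints nothing if that column is not present.
top_busy_areas() {
  file=$1
  n=${2:-10}
  awk -F, '
    # Header row: locate the pickup-zone column by name.
    NR == 1 { for (i = 1; i <= NF; i++) if ($i == "PULocationID") col = i; next }
    # Data rows: count one pickup per non-empty zone ID.
    col && $col != "" { count[$col]++ }
    END { for (k in count) print count[k], k }
  ' "$file" | sort -k1,1nr -k2,2n | head -n "$n"
}
```

Calling `top_busy_areas <trip_data.csv> 5` would list the five zones with the most pickups, busiest first. For the full dataset the Spark jobs are the better fit; this only shows the shape of the aggregation.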

See also NYC_Taxi_Trip_Duration if you are interested in data science projects on a similar taxi dataset.

Architecture

  • Architecture idea (Batch):
  • Architecture idea (Stream):

File structure

├── Dockerfile    : Scala spark Dockerfile
├── build.sbt     : Scala sbt build file
├── config        : configuration files for DB/Kafka/AWS..
├── data          : Raw/processed/output data (batch/stream)
├── doc           : All repo reference/doc/pic
├── elk           : ELK (Elasticsearch, Logstash, Kibana) config/scripts
├── fluentd       : Fluentd helper scripts
├── kafka         : Kafka helper scripts
├── pyspark       : Legacy pipeline code (Python)
├── requirements.txt
├── script        : Helper scripts (env/services)
├── src           : Batch/stream process scripts (Scala)
└── utility       : Helper scripts (pipeline)

Prerequisites


Quick start

Quick-Start-Batch-Pipeline-Manually
# STEP 1) Download the dataset
bash script/download_sample_data.sh

# STEP 2) sbt build
sbt compile
sbt assembly

# STEP 3) Load data 
spark-submit \
 --class DataLoad.LoadReferenceData \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

spark-submit \
 --class DataLoad.LoadGreenTripData \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

spark-submit \
 --class DataLoad.LoadYellowTripData \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

# STEP 4) Transform data 
spark-submit \
 --class DataTransform.TransformGreenTaxiData \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

spark-submit \
 --class DataTransform.TransformYellowTaxiData \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

# STEP 5) Create view 
spark-submit \
 --class CreateView.CreateMaterializedView \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

# STEP 6) Save to JDBC (mysql)
spark-submit \
 --class SaveToDB.JDBCToMysql \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

# STEP 7) Save to Hive
spark-submit \
 --class SaveToHive.SaveMaterializedviewToHive \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
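The batch steps 3 to 7 above all follow the same `spark-submit --class ... <jar>` pattern, so they can be chained in a small wrapper that stops at the first failing step. This is a minimal sketch, not a script shipped in the repo: the function name `run_batch_pipeline` and the `SPARK_SUBMIT` and `JAR` variables are assumptions introduced here, while the class names and jar path are the ones listed above.

```shell
#!/bin/sh
# Assumed overridable settings (not defined by the repo itself).
SPARK_SUBMIT="${SPARK_SUBMIT:-spark-submit}"
JAR="${JAR:-target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar}"

# run_batch_pipeline
# Submit each batch job in the documented order; return 1 as soon as
# one job exits with a non-zero status so later steps do not run on bad data.
run_batch_pipeline() {
  for class in \
    DataLoad.LoadReferenceData \
    DataLoad.LoadGreenTripData \
    DataLoad.LoadYellowTripData \
    DataTransform.TransformGreenTaxiData \
    DataTransform.TransformYellowTaxiData \
    CreateView.CreateMaterializedView \
    SaveToDB.JDBCToMysql \
    SaveToHive.SaveMaterializedviewToHive
  do
    echo ">>> running $class" >&2
    "$SPARK_SUBMIT" --class "$class" "$JAR" || {
      echo "step failed: $class" >&2
      return 1
    }
  done
}
```

After downloading the data and building the jar (steps 1 and 2), calling `run_batch_pipeline` runs the remaining steps in sequence. Setting `SPARK_SUBMIT=echo` first gives a dry run that only prints the arguments each step would receive.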
Quick-Start-Stream-Pipeline-Manually
# STEP 1) sbt build
sbt compile
sbt assembly

# STEP 2) Create Taxi event
spark-submit \
 --class TaxiEvent.CreateBasicTaxiEvent \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

# check the event
curl localhost:44444

# STEP 3) Process Taxi event
spark-submit \
 --class EventLoad.SparkStream_demo_LoadTaxiEvent \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

# STEP 4) Send Taxi event to Kafka
# start zookeeper, kafka
brew services start zookeeper
brew services start kafka

# create kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic first_topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-taxi

# curl event to kafka producer
curl localhost:44444 | kafka-console-producer  --broker-list  127.0.0.1:9092 --topic first_topic

# STEP 5) Spark process kafka stream
spark-submit \
 --class KafkaEventLoad.LoadKafkaEventExample \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

# STEP 6) Spark process kafka stream and write the result back to Kafka
spark-submit \
 --class KafkaEventLoad.LoadTaxiKafkaEventWriteToKafka \
 target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar

# STEP 7) Run Elasticsearch, Kibana, Logstash
# make sure curl localhost:44444 can get the taxi event
# each command below runs in the foreground, so start each in its own terminal
cd ~ 
kibana-7.6.1-darwin-x86_64/bin/kibana
elasticsearch-7.6.1/bin/elasticsearch
logstash-7.6.1/bin/logstash -f /Users/$USER/NYC_Taxi_Pipeline/elk/logstash/logstash_taxi_event_file.conf

# test: insert toy data into logstash
# (logstash config: elk/logstash.conf)
#nc 127.0.0.1 5000 < data/event_sample.json

# then visit the kibana UI : localhost:5601
# go to "management" -> "index_patterns" -> "Create index pattern"
# create a new index pattern : logstash-* (do not select a timestamp field as the time filter)
# then open the "discover" tab and check the data
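Several of the stream steps above depend on a service already listening: the taxi event source on port 44444, ZooKeeper on 2181, Kafka on 9092, and Kibana on 5601. A small retry helper can make that waiting explicit. This is a sketch under assumptions: `wait_for` and `port_open` are hypothetical helpers, not part of the repo, and `port_open` assumes a netcat build that supports the `-z` flag.

```shell
#!/bin/sh
# wait_for TRIES DELAY CMD...
# Re-run CMD until it exits 0, at most TRIES times, sleeping DELAY seconds
# between attempts. Returns 0 on success, 1 if every attempt failed.
wait_for() {
  tries=$1
  delay=$2
  shift 2
  attempt=1
  while [ "$attempt" -le "$tries" ]; do
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Do not sleep after the final failed attempt.
    if [ "$attempt" -lt "$tries" ]; then
      sleep "$delay"
    fi
    attempt=$((attempt + 1))
  done
  return 1
}

# port_open HOST PORT
# Succeeds if a TCP connection can be opened (assumes nc supports -z).
port_open() {
  nc -z "$1" "$2"
}

# Example use before creating topics or starting the Spark stream jobs:
#   wait_for 30 1 port_open localhost 2181    # ZooKeeper
#   wait_for 30 1 port_open localhost 9092    # Kafka broker
#   wait_for 30 1 port_open localhost 44444   # taxi event source
```

A port check is used instead of `curl` because the event source streams continuously, so a plain `curl localhost:44444` would not return on its own.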

Dependency

  1. Spark 2.4.3

  2. Java 8

  3. Apache Hadoop 2.7

  4. Jars

  5. build.sbt

Ref

  • ref.md - dataset link ref, code ref, other ref
  • doc - All ref docs

TODO

# 1. Tune the main pipeline for large-scale data (to process the whole nyc-tlc-trip dataset)
# 2. Add a front-end UI (Flask, to visualize supply & demand and surge pricing)
# 3. Add tests
# 4. Dockerize the project
# 5. Tune the Spark batch/stream code
# 6. Tune the Kafka and ZooKeeper cluster settings