- Produce persistent data to PostgreSQL.
- Identify and track changes to data in a database using Debezium Connector.
- Read the streaming data from Kafka topic using PySpark (Spark Streaming).
- Convert to Delta Lake format and write the streaming data to MinIO (S3-compatible object storage).
- Use Trino to query data and display in DBeaver.
System Architecture
Before running this project, ensure you have the following installed:
- Python 3.10
- Docker
- Debezium (Debezium UI)
- PostgreSQL
- Confluent Containers (Zookeeper, Kafka, Schema Registry, Connect, Control Center)
- Apache Spark
- Delta Lake
- MinIO, Trino, DBeaver
Clone the repository:
git clone https://github.com/trannhatnguyen2/streaming_data_processing
Start our data streaming infrastructure
docker compose -f docker-compose.yaml -f storage-docker-compose.yaml up -d
This command will download the necessary Docker images, create containers, and start the services in detached mode.
Set up the environment
Create and activate a conda environment, then install the required packages:
conda create -n streaming python=3.10 -y
conda activate streaming
pip install -r requirements.txt
Access the Services
- Postgres is accessible on the default port 5432.
- Debezium UI is accessible at http://localhost:8080.
- Kafka Control Center is accessible at http://localhost:9021.
- MinIO is accessible at http://localhost:9001.
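As a quick sanity check before moving on, the web UIs listed above can be polled with a short Python script. This is only a convenience sketch; the ports come from the list above, and the timeout value is an arbitrary choice:

```python
import urllib.request
import urllib.error

# Web UIs listed above. Postgres (port 5432) speaks its own wire
# protocol, so it is not checked over HTTP here.
SERVICES = {
    "Debezium UI": "http://localhost:8080",
    "Kafka Control Center": "http://localhost:9021",
    "MinIO Console": "http://localhost:9001",
}

def check(url: str, timeout: float = 3.0) -> bool:
    """Return True if the service answers any HTTP response at all."""
    try:
        urllib.request.urlopen(url, timeout=timeout)
        return True
    except urllib.error.HTTPError:
        return True   # the server answered, even if with an error status
    except (urllib.error.URLError, OSError):
        return False  # connection refused or timed out

if __name__ == "__main__":
    for name, url in SERVICES.items():
        print(f"{name:22s} {url:25s} {'up' if check(url) else 'DOWN'}")
```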
Create Connector Postgres to Debezium
First, modify your configuration in configs/orders-cdc.json, then register the connector:
cd debezium/
bash run.sh register_connector configs/orders-cdc.json
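Under the hood, registering a connector means POSTing the JSON config to the Kafka Connect REST API. A minimal Python sketch of that request is shown below; the Connect port 8083 is an assumption (it is the Kafka Connect default), so adjust it to match your docker-compose file:

```python
import json
import urllib.request

def build_request(config: dict,
                  connect_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build the POST request that registers a connector with Kafka Connect."""
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(config).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

if __name__ == "__main__":
    with open("configs/orders-cdc.json") as f:
        req = build_request(json.load(f))
    with urllib.request.urlopen(req) as resp:
        print(resp.status, resp.read().decode())
```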
Create an empty table in PostgreSQL and insert new records into it
python utils/create_table.py
python utils/insert_table.py
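The two utility scripts are not reproduced in this guide; a rough sketch of what they might do is shown below. The table name, column subset, and connection credentials are assumptions (the columns are borrowed from the Trino table defined later in this guide), so the real scripts may differ:

```python
# Sketch only: the real utils/create_table.py and utils/insert_table.py
# may differ. Table name, columns, and credentials are assumptions.
CREATE_SQL = """
CREATE TABLE IF NOT EXISTS orders (
    order_number      VARCHAR,
    order_line_number SMALLINT,
    customer_name     VARCHAR,
    product_name      VARCHAR,
    order_quantity    SMALLINT,
    unit_price        DOUBLE PRECISION
);
"""

INSERT_SQL = """
INSERT INTO orders (order_number, order_line_number, customer_name,
                    product_name, order_quantity, unit_price)
VALUES (%s, %s, %s, %s, %s, %s);
"""

if __name__ == "__main__":
    import psycopg2  # assumed installed via requirements.txt

    # Placeholder credentials: match them to your docker-compose settings.
    conn = psycopg2.connect(host="localhost", port=5432,
                            dbname="postgres", user="postgres",
                            password="postgres")
    with conn, conn.cursor() as cur:
        cur.execute(CREATE_SQL)
        cur.execute(INSERT_SQL, ("SO-0001", 1, "Alice", "Widget", 2, 9.99))
    conn.close()
```

Once a row lands in this table, the Debezium connector registered earlier should emit a change event to the Kafka topic.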
Read and store data to MinIO
python spark_streaming/orders_delta_spark_to_minio.py
This script reads data from the Kafka topic and pushes it to MinIO in Delta Lake format.
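Conceptually, orders_delta_spark_to_minio.py does something like the following. This is a sketch rather than the actual script: the topic name, MinIO endpoint, credentials, and bucket paths are all placeholders:

```python
# Sketch of a Kafka -> Delta-on-MinIO streaming job. Topic name, endpoint,
# credentials, and paths are placeholders; the real script may differ.
KAFKA_OPTIONS = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "orders-topic",        # assumed topic name
    "startingOffsets": "earliest",
}
S3_OPTIONS = {
    "fs.s3a.endpoint": "http://localhost:9000",  # MinIO S3 API (the console is 9001)
    "fs.s3a.access.key": "minio_access_key",     # placeholder
    "fs.s3a.secret.key": "minio_secret_key",     # placeholder
    "fs.s3a.path.style.access": "true",
}
OUTPUT_PATH = "s3a://lakehouse/orders"
CHECKPOINT_PATH = "s3a://lakehouse/checkpoints/orders"

if __name__ == "__main__":
    from pyspark.sql import SparkSession

    builder = (SparkSession.builder.appName("orders_delta_to_minio")
               .config("spark.sql.extensions",
                       "io.delta.sql.DeltaSparkSessionExtension")
               .config("spark.sql.catalog.spark_catalog",
                       "org.apache.spark.sql.delta.catalog.DeltaCatalog"))
    for key, value in S3_OPTIONS.items():
        builder = builder.config(f"spark.hadoop.{key}", value)
    spark = builder.getOrCreate()

    # Kafka delivers key/value as binary; the real script would parse the
    # Debezium JSON payload into typed columns before writing.
    df = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
    query = (df.selectExpr("CAST(value AS STRING) AS value", "timestamp")
               .writeStream.format("delta")
               .option("checkpointLocation", CHECKPOINT_PATH)
               .start(OUTPUT_PATH))
    query.awaitTermination()
```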
After the files have been written to MinIO, access the trino container with the following command:
docker exec -ti datalake-trino bash
Once inside the container, type trino to enter interactive mode.
After that, run the following commands to register a new schema and table for our data:
CREATE SCHEMA IF NOT EXISTS lakehouse.orders
WITH (location = 's3://lakehouse/');
CREATE TABLE IF NOT EXISTS lakehouse.orders.orders (
event_timestamp TIMESTAMP(3) WITH TIME ZONE,
order_date VARCHAR,
order_time VARCHAR,
order_number VARCHAR,
order_line_number TINYINT,
customer_name VARCHAR,
product_name VARCHAR,
store VARCHAR,
promotion VARCHAR,
order_quantity TINYINT,
unit_price DOUBLE,
unit_cost DOUBLE,
unit_discount DOUBLE,
sales_amount DOUBLE
) WITH (
location = 's3://lakehouse/orders'
);
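Besides the Trino CLI and DBeaver, the table can also be queried programmatically with the trino Python client. This is a sketch, assuming the client is installed (pip install trino); the host, port, and user are placeholders, so check your docker-compose file for the actual Trino port (8080 is already taken by the Debezium UI in this stack):

```python
TRINO_QUERY = "SELECT count(*) FROM lakehouse.orders.orders"

if __name__ == "__main__":
    import trino  # assumed: pip install trino

    # Placeholder connection details; adjust to match your setup.
    conn = trino.dbapi.connect(
        host="localhost",
        port=8081,
        user="trino",
        catalog="lakehouse",
        schema="orders",
    )
    cur = conn.cursor()
    cur.execute(TRINO_QUERY)
    print(cur.fetchall())
```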
- Install DBeaver as in the following guide.
- Connect to our database (type trino) using the following information (empty password):
© 2024 NhatNguyen