
kafka-project-coinbase

(Pipeline architecture / tech-stack diagram)

Introduction

This final project repo contains a real-time Coinbase market data streaming pipeline.
The diagram above summarizes the pipeline. My Kafka producer, written in Python, ingests data from Coinbase and publishes it to a Confluent Kafka topic. Before consumption, I use ksqlDB for the essential stream processing and transformation. To consume the data, I use a managed Confluent connector as the Kafka consumer, which reads the transformed messages and writes them to a BigQuery table. Finally, I built a simple Looker Studio dashboard on top of the BigQuery table.

Problem description:

This repository addresses the need for real-time monitoring of Coinbase market data updates, specifically orders and trades. By implementing a streaming data pipeline, it gives traders up-to-date information on trading volume and values across the various virtual currencies on Coinbase.
The data is streamed from Coinbase's "Exchange Websocket Direct Market Data" feed.

The streaming data pipeline encompasses the following key aspects:

Cloud:

(Cloud services logos)

The project is developed using Confluent Cloud and BigQuery. Terraform serves as the Infrastructure as Code (IaC) tool for resource creation.

Data ingestion:

Producer:
Kafka is the streaming tool. The producer_coinbase.py script ingests real-time market data from the Coinbase WebSocket feed: running as a local producer, it retrieves data from the WebSocket, processes it, and publishes messages to Confluent Cloud topics, serving as the link between the Coinbase feed and Confluent Cloud.
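For orientation, here is a stripped-down sketch of that producer loop. It is not the actual producer_coinbase.py (which also handles Avro serialization via Schema Registry and authenticated channels); the connection settings, topic name, and sandbox feed URL below are placeholders and assumptions.

    # Simplified sketch of the producer loop; see producer_coinbase.py for the real logic.
    import json
    import websocket                      # pip install websocket-client
    from confluent_kafka import Producer  # pip install confluent-kafka

    # Placeholder values -- the real script reads these from .env / config files.
    CONF = {
        "bootstrap.servers": "<CONFLUENT_BOOTSTRAP_SERVER>",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "<CLUSTER_API_KEY>",
        "sasl.password": "<CLUSTER_API_SECRET>",
    }
    TOPIC = "coinbase_avro"  # assumed topic name
    WS_URL = "wss://ws-feed-public.sandbox.exchange.coinbase.com"  # assumed sandbox feed URL

    producer = Producer(CONF)

    ws = websocket.create_connection(WS_URL)
    ws.send(json.dumps({
        "type": "subscribe",
        "product_ids": ["BTC-USD", "BTC-EUR"],
        "channels": ["ticker"],  # public channel; the real script authenticates for richer channels
    }))

    try:
        while True:
            msg = ws.recv()                     # one JSON message from Coinbase
            producer.produce(TOPIC, value=msg)  # publish the raw message to the Confluent topic
            producer.poll(0)                    # serve delivery callbacks, keep the queue drained
    finally:
        producer.flush()
        ws.close()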
Consumer:
A separate consumer script is not needed in this setup, because Confluent's managed BigQuery Sink Connector v2 consumes the data and sends it directly to BigQuery. Visit this LINK.

Data warehouse:

Data is streamed into BigQuery tables, which are partitioned and clustered to optimize downstream queries. Refer to bigquery_partition.sql for details.
Partitioning the data on the TIME column at the hourly level notably improves performance for time-based queries, and clustering by PRODUCT_ID ensures that data within each partition is logically sorted by product ID, which aligns well with GROUP BY clauses.
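As a rough illustration of that step (the actual statements live in bigquery_partition.sql), the partitioned and clustered copy can be created with a CREATE TABLE ... AS SELECT, here submitted through the google-cloud-bigquery client. The project, dataset, and table names are placeholders, and the sketch assumes TIME is a TIMESTAMP column.

    # Illustration only -- see bigquery_partition.sql for the actual statements.
    from google.cloud import bigquery  # pip install google-cloud-bigquery

    client = bigquery.Client()  # authenticates via GOOGLE_APPLICATION_CREDENTIALS

    ddl = """
    CREATE TABLE `<PROJECT>.<DATASET>.<TABLE_NAME_PARTITIONED_CLUSTERED>`
    PARTITION BY TIMESTAMP_TRUNC(time, HOUR)   -- hourly partitions on the TIME column
    CLUSTER BY product_id                      -- sort rows within each partition by PRODUCT_ID
    AS SELECT * FROM `<PROJECT>.<DATASET>.<TABLE_NAME>`;
    """

    client.query(ddl).result()  # waits for the DDL job to finish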

Transformations:

ksqlDB performs real-time transformations, enrichments, and aggregations on the incoming Coinbase data streams. Refer to ksqldb/transform_changes.sql for details.
One reason for the transformation is that the data includes an attribute called "changes", which is a nested array. Nested arrays are supported by Avro on Confluent Kafka, but not yet by Avro on BigQuery, so the streams are transformed to meet the type requirements of Avro on BigQuery. Refer to this LINK.
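To make the shape of the problem concrete, here is a plain-Python illustration of the kind of explode-and-flatten the ksqlDB queries perform; the field names follow Coinbase's level2 "changes" format, but the actual logic is in ksqldb/transform_changes.sql.

    # Conceptual illustration of flattening the nested "changes" array (not the real ksqlDB logic).

    # A level2-style update from Coinbase: "changes" is a nested array of [side, price, size].
    raw_message = {
        "type": "l2update",
        "product_id": "BTC-EUR",
        "time": "2024-04-13T07:00:00.000000Z",
        "changes": [["buy", "60000.01", "0.25"], ["sell", "60010.99", "0.10"]],
    }

    # Explode + flatten: one output row per element of "changes", with scalar columns only,
    # so the records satisfy the types that the BigQuery sink can accept.
    flat_rows = [
        {
            "product_id": raw_message["product_id"],
            "time": raw_message["time"],
            "side": side,
            "price": float(price),
            "size": float(size),
        }
        for side, price, size in raw_message["changes"]
    ]

    print(flat_rows)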

Dashboard:

My interactive Looker Studio dashboard visualizes simple analytical results after 10 hours of continuous streaming.
(Dashboard screenshots)

Reproduce the Pipeline

Please follow the steps below to reproduce the pipeline.
From the beginning to the end of the project, all required services are free!

  1. Setup
  2. Usage

1. Setup

Environment/Prerequisites:

OS: WSL2 (Linux AMD64) on Windows 10
Package Manager: Miniconda
Git
BigQuery Free Account
Confluent Cloud Free Account

Step-by-step Setup:

  1. git clone this repo and navigate to project directory
    git clone https://github.com/josephj1o4e1/kafka-project-coinbase.git
    cd kafka-project-coinbase

  2. Create the conda environment
    conda env create -f environment.yml
    conda activate dezoom-project-reproduce

  3. Create a BigQuery project
    Follow this LINK.

  4. Add a BigQuery API Key
    Under the project folder, create a keys/ folder inside the terraform/ folder (kafka-project-coinbase/terraform/keys).
    In the GCP Cloud Console, create a service account:

    • Go to IAM & Admin -> Service Accounts -> Create Service Account -> grant only the BigQuery Admin role
    • Click the 3-dots icon -> Manage keys -> Create new key (JSON) -> save the .json file to the terraform/keys/ folder
  5. Add a Confluent Cloud API Key

    • In the Confluent Cloud Console:
      (Screenshot: where to create the Cloud API key in the Confluent Cloud Console)
  6. Prepare a secret.tfvars file
    Copy template_secret.tfvars to secret.tfvars and start filling in the variables.
    This is for running terraform.
    GCP:

    • gcp_credentials:
      File path of your (credential) .json file.
    • gcp_project:
      Project id of your GCP project.

    Confluent Cloud:

    • confluent_cloud_api_key, confluent_cloud_api_secret:
      Your Confluent Cloud API key and API secret.
  7. Run terraform (without BigQueryConnector)

    • Install Terraform if you haven't already (I use Linux AMD64)
      https://developer.hashicorp.com/terraform/install (use the terraform --help command to confirm the installation)
    • cd terraform/
    • terraform init (get providers)
    • terraform plan -var-file="secret.tfvars" (this makes sure your credentials work and lets you inspect the planned resources)
    • terraform apply -var-file="secret.tfvars" (takes a couple of minutes)
  8. Run queries in ksqlDB editor

    • Go to the Confluent Cloud Console. In your current cluster, open the ksqlDB Editor tab and run the three queries in ksqldb/transform_changes.sql, one at a time.
    • After that, you should have three ksqlDB streams: coinbase_avro, coinbase_avro_explode, and coinbase_avro_flat. You should also have two corresponding topics, whose names are suffixed with COINBASE_AVRO_EXPLODE and COINBASE_AVRO_FLAT, respectively. (If you prefer to verify this from the terminal, see the optional sketch at the end of this setup list.)
  9. Run terraform (with BigQueryConnector)

    • Uncomment the last part of main.tf, which is the confluent_connector resource.
    • terraform plan -var-file="secret.tfvars"
    • terraform apply -var-file="secret.tfvars" (takes a couple of minutes)
  10. Prepare a .env file
    Copy template.env to .env and start filling in the variables.
    Coinbase Sandbox API:

    • SANDBOX_API_KEY, SANDBOX_PASSPHRASE, SANDBOX_SECRET_KEY:
      Sign up and log in to the sandbox web interface, then go to the "API" tab to create an API key.
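Optional: to confirm the ksqlDB streams from the terminal instead of the Console (see step 8), the ksqlDB REST API can be queried directly. This is a minimal sketch, assuming you have created a ksqlDB API key and copied the ksqlDB endpoint URL from the Confluent Cloud Console; the placeholder values are not real.

    # Optional sanity check: list the ksqlDB streams via the ksqlDB REST API.
    import requests  # pip install requests

    KSQLDB_ENDPOINT = "https://<YOUR-KSQLDB-ENDPOINT>"  # shown on the ksqlDB cluster settings page
    KSQLDB_API_KEY = "<KSQLDB_API_KEY>"
    KSQLDB_API_SECRET = "<KSQLDB_API_SECRET>"

    resp = requests.post(
        f"{KSQLDB_ENDPOINT}/ksql",
        auth=(KSQLDB_API_KEY, KSQLDB_API_SECRET),
        json={"ksql": "SHOW STREAMS;", "streamsProperties": {}},
        timeout=30,
    )
    resp.raise_for_status()
    for stream in resp.json()[0]["streams"]:
        print(stream["name"])  # expect COINBASE_AVRO, COINBASE_AVRO_EXPLODE, COINBASE_AVRO_FLAT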

2. Usage

After finishing all the setup steps above:

  1. Simply run python producer_coinbase.py. Streaming begins.
    It should look something like this:
    (Screenshot: terminal output during streaming)

    Check that your data is arriving in the BigQuery table (one way to do this from Python is sketched after this list).

  2. BigQuery table partitioning and clustering.
    Have a look at bigquery_partition.sql and run the SQL query in your BigQuery project to partition and cluster the table.
    Change TABLE_NAME and TABLE_NAME_PARTITIONED_CLUSTERED to your desired table names.
    The table is partitioned by time (hourly) and clustered by product_id.
    After partitioning and clustering the original table, you can compare the performance improvement like this:

    -- Performance before partitioning and clustering:
    -- processes 22.13 MB
    SELECT * FROM <TABLE_NAME>
    WHERE PRODUCT_ID = 'BTC-EUR' AND time BETWEEN '2024-04-13T07:00:00' AND '2024-04-13T09:00:00'
    LIMIT 1000
    ;
    -- Performance after partitioning and clustering:
    -- processes 2.35 MB
    SELECT * FROM <TABLE_NAME_PARTITIONED_CLUSTERED>
    WHERE PRODUCT_ID = 'BTC-EUR' AND time BETWEEN '2024-04-13T07:00:00' AND '2024-04-13T09:00:00'
    LIMIT 1000
    ;
    
  3. Looker Studio.
    Visualize the data in Looker Studio.
    Here's the link to my simple analysis and visualization.

  4. Destroy all resources: terraform destroy -var-file="secret.tfvars"
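As mentioned in step 1, you can check from Python that rows are landing in the BigQuery table. A minimal sketch, assuming the google-cloud-bigquery client is installed and authenticated via your service-account key, and using placeholder project/dataset/table names:

    # Quick sanity check that rows are landing in BigQuery (names below are placeholders).
    from google.cloud import bigquery  # pip install google-cloud-bigquery

    client = bigquery.Client()
    query = """
    SELECT product_id, COUNT(*) AS n_rows
    FROM `<PROJECT>.<DATASET>.<TABLE_NAME>`
    GROUP BY product_id
    ORDER BY n_rows DESC
    """
    for row in client.query(query).result():
        print(row.product_id, row.n_rows)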

Feel free to provide comments, issues, or contributions to the project. Your feedback and involvement are highly valued and appreciated.

About

Real-time Coinbase market data streaming pipeline with visualizations. Much appreciation to the DataTalks.Club Data Engineering Zoomcamp: https://github.com/DataTalksClub/data-engineering-zoomcamp
