The workflow is reported in: here
The goal of this project is to:
- Capture data changes in a Postgres database and deliver them to a Google Cloud Storage sink
- Capture data changes in a Postgres database and deliver them to a Google Cloud BigQuery sink
- Postgres, distributed with the Citus extension, stores a sample co-located transaction table across the Citus cluster
- The Debezium connector captures data changes in the Postgres database and publishes them as messages to Apache Kafka
- Apache Kafka acts as a message queue that decouples source and destination. For development, this project uses a single-node Kafka broker
- Debezium (Kafka Connect), together with the Confluent Platform sink connectors, reads the data from Apache Kafka and loads it into Google Cloud Storage and Google Cloud BigQuery
- Kafka UI is used to manage and monitor the Kafka cluster
See all images here: https://github.com/zaivi/cdc-debezium-postgres-gcp-pipeline/pkgs/container/cdc-debezium-postgres-gcp-pipeline/versions
Pull image from the command line:
$ docker pull ghcr.io/zaivi/cdc-debezium-postgres-gcp-pipeline:<tag>
Use as base image in Dockerfile:
FROM ghcr.io/zaivi/cdc-debezium-postgres-gcp-pipeline:<tag>
Assuming Docker is installed, run the following command to build and start the containers, choosing either postgres.docker-compose.yaml or citus.docker-compose.yaml as the first file:
docker compose -f [postgres.docker-compose.yaml|citus.docker-compose.yaml] -f kafka.docker-compose.yaml -f debezium.docker-compose.yaml up
To shut down the containers, run:
docker compose -f [postgres.docker-compose.yaml|citus.docker-compose.yaml] -f kafka.docker-compose.yaml -f debezium.docker-compose.yaml down
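The bracket notation in the two commands above means "pick one of these files". A minimal sketch of a wrapper that makes the choice explicit is shown here; the function name `compose_files` is hypothetical and not part of the repository, while the three file names are taken from the commands above.

```shell
# Hypothetical helper: print the -f arguments for the chosen database flavour.
# Accepts exactly "postgres" or "citus"; anything else is rejected.
compose_files() {
  case "$1" in
    postgres|citus) ;;
    *) echo "usage: compose_files postgres|citus" >&2; return 1 ;;
  esac
  printf '%s\n' "-f $1.docker-compose.yaml -f kafka.docker-compose.yaml -f debezium.docker-compose.yaml"
}

# Usage (word splitting of the unquoted output is intentional):
#   docker compose $(compose_files citus) up
#   docker compose $(compose_files citus) down
```

Passing the same argument to both `up` and `down` keeps the set of compose files consistent, so that `down` removes exactly what `up` created.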
Example: register the Citus source connector with Kafka Connect:
curl -X POST -H "Content-Type: application/json" --data @./Kafka-Connect/citus-customer-info-connection.json localhost:8083/connectors
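For orientation, the following sketch writes a connector definition of the general shape that the Kafka Connect REST API expects in a POST to `/connectors`. It is not the contents of the repository's `citus-customer-info-connection.json`. The connector name comes from the commands in this README; the host name, credentials, table name, slot name, topic prefix, and output path are all assumptions chosen for illustration. The property names are standard Debezium Postgres connector settings, and `topic.prefix` assumes Debezium 2.x (older releases use `database.server.name` instead).

```shell
# Hypothetical output path; the repository's real file lives under ./Kafka-Connect/.
CONFIG_FILE="${CONFIG_FILE:-./example-citus-connection.json}"

# Assumed values: database.hostname, database.user/password, topic.prefix,
# table.include.list and slot.name are placeholders, not the project's settings.
cat > "$CONFIG_FILE" <<'JSON'
{
  "name": "citus-customer-info-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "citus-master",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "citus",
    "table.include.list": "public.customer_info",
    "slot.name": "customer_info_slot"
  }
}
JSON
```

A file like this would be submitted with the same `curl -X POST ... --data @<file>` form shown above.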
To get connector information, run:
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/citus-customer-info-source-connector
To delete the connector, run:
curl -i -X DELETE localhost:8083/connectors/citus-customer-info-source-connector
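Beyond fetching and deleting a connector, it is often useful to check whether it is actually running. The sketch below wraps the Kafka Connect REST endpoint `GET /connectors/<name>/status`; the helper names `connector_url` and `connector_status` and the `CONNECT_URL` variable are hypothetical, and the default address is the `localhost:8083` used in the commands above.

```shell
# Assumed default: the Kafka Connect REST address used elsewhere in this README.
CONNECT_URL="${CONNECT_URL:-localhost:8083}"

# Hypothetical helper: build the REST URL for a connector, with an optional suffix.
connector_url() {
  printf '%s/connectors/%s%s\n' "$CONNECT_URL" "$1" "${2:-}"
}

# Hypothetical helper: fetch the connector's status document as JSON.
# Requires a running Kafka Connect instance; defined here but not invoked.
connector_status() {
  curl -s -H "Accept:application/json" "$(connector_url "$1" /status)"
}

# Usage:
#   connector_status citus-customer-info-source-connector
```

The status response reports the state of the connector and of each of its tasks, which helps tell a failed task apart from a connector that was never registered.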
-- insert some events
INSERT INTO events (device_id, data)
SELECT s % 100, ('{"measurement":'||random()||'}')::jsonb FROM generate_series(1,1000000) s;
-- get the last 3 events for device 1, routed to a single node
SELECT * FROM events WHERE device_id = 1 ORDER BY event_time DESC, event_id DESC LIMIT 3;
┌───────────┬──────────┬───────────────────────────────┬───────────────────────────────────────┐
│ device_id │ event_id │ event_time │ data │
├───────────┼──────────┼───────────────────────────────┼───────────────────────────────────────┤
│ 1 │ 1999901 │ 2023-05-22 15:28:23.700068+00 │ {"measurement": 0.88722643925054} │
│ 1 │ 1999801 │ 2023-05-22 15:28:23.700068+00 │ {"measurement": 0.6512231304621992} │
│ 1 │ 1999701 │ 2023-05-22 15:28:23.700068+00 │ {"measurement": 0.019368766051897524} │
└───────────┴──────────┴───────────────────────────────┴───────────────────────────────────────┘
(3 rows)
Query complete 00:00:00.181
-- explain plan for a query that is parallelized across shards, which shows the plan for
-- the query on one of the shards and how the aggregation across shards is done
EXPLAIN (VERBOSE ON) SELECT count(*) FROM events;
┌───────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ QUERY PLAN │
├───────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ Aggregate (cost=250.00..250.02 rows=1 width=8) │
│ Output: COALESCE((pg_catalog.sum(remote_scan.count))::bigint, '0'::bigint) │
│ -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=8) │
│ ... │
│ -> Task │
│ Query: SELECT count(*) AS count FROM events_102008 events WHERE true │
│ Node: host=cdc-debezium-postgres-pipeline-worker-1 port=5432 dbname=postgres │
│ -> Aggregate (cost=1450.00..1450.01 rows=1 width=8) │
│ -> Seq Scan on public.events_102008 events (cost=0.00..1300.00 rows=60000 width=0) │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────┘
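The queries above run against an `events` table whose definition is not shown. The sketch below writes one plausible definition to a file, inferred from the column names in the query output and distributed by `device_id` with Citus's `create_distributed_table` function. The column types, the primary key, the file name, and the `psql` connection parameters are assumptions, not the project's actual schema.

```shell
# Hypothetical output path for the schema sketch.
SQL_FILE="${SQL_FILE:-./create-events.sql}"

cat > "$SQL_FILE" <<'SQL'
-- Assumed schema, inferred from the columns device_id, event_id, event_time, data.
CREATE TABLE events (
  device_id  bigint,
  event_id   bigserial,
  event_time timestamptz DEFAULT now(),
  data       jsonb NOT NULL,
  PRIMARY KEY (device_id, event_id)
);

-- Shard the table by device_id so that queries filtering on a single device
-- are routed to one node.
SELECT create_distributed_table('events', 'device_id');
SQL

# To apply it (connection parameters are placeholders):
#   psql -h localhost -U postgres -d postgres -f "$SQL_FILE"
```

The primary key includes `device_id` because Citus requires unique constraints on a distributed table to contain the distribution column.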
- Citus cluster with 1 master, 1 worker, and 1 manager