Skip to content

chobostar/pg_listener

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Go application to capture inserts to PostgreSQL table and produce events to Apache Kafka using replication slot and WAL

It connects to the replication slot and listens to INSERTs into the specified tables, from where it copies the value of the topic and payload columns, then sends this message to Kafka. Listening table example:

CREATE TABLE queue.events (
    id bigserial primary key,
    added_at timestamp NOT NULL default clock_timestamp(),
    topic text NOT NULL,
    payload json
);

Installing

make build

Example run

make up
make init
make build
make demo

make insert to postgres:

$ echo "insert into queue.events(topic, payload) values('demo_topic', '{\"id\": \"file\"}'::json);" | docker-compose exec -T postgres psql -U postgres

INSERT 0 1

see topics list:

$ docker-compose exec -T broker /bin/kafka-topics --bootstrap-server=localhost:9092 --list

demo_topic

read content:

$ docker-compose exec -T broker /bin/kafka-console-consumer --bootstrap-server=localhost:9092 --topic demo_topic --from-beginning 

{"id": "file"}

check metrics:

$ curl -s http://localhost:9938/metrics | grep pg_listener_processed_messages_total
# HELP pg_listener_processed_messages_total Total amount processed logical messages
# TYPE pg_listener_processed_messages_total counter
pg_listener_processed_messages_total 1

clean infra after all:

make down

Configure

PostgreSQL connection:

  • PGLSN_DB_HOST
  • PGLSN_DB_PORT
  • PGLSN_DB_NAME
  • PGLSN_DB_USER
  • PGLSN_DB_PASS

wal2json options (see also https://github.com/eulerto/wal2json#parameters):

  • PGLSN_TABLE_NAMES is add-tables - comma "," separated table names which INSERTs are produced to Kafka
  • PGLSN_CHUNKS is write-in-chunks, if "1", write after every change instead of every changeset

Postgres replication options:

  • PGLSN_SLOT - replication slot name where to connect
  • PGLSN_LSN - Instructs server to start streaming WAL, starting at WAL location XXX/XXX. 0/0 by default.

Apache Kafka connection:

  • PGLSN_KAFKA_HOSTS - comma separated hostname:port

Exported metrics

it exports metrics on port 9938 of http path /metrics

  • pg_listener_last_committed_wal_location - Last successfully commited to producer wal
  • pg_listener_logged_errors_total - Total amount logged errors
  • pg_listener_processed_messages_total - Total amount processed logical messages
  • pg_listener_raw_buffer_bytes - Current amount of raw buffer size of logical message
  • pg_listener_received_heartbeats_total - Total amount received heartbeats

Сaution

About

Go application to capture inserts into PostgreSQL table and produce events to Apache Kafka using replication slot and WAL

Topics

Resources

Stars

Watchers

Forks

Packages

No packages published