roveo/streamz_postgres

streamz_postgres

API of this plugin is considered unstable and subject to change without notice!

Home-made Postgres CDC for Streamz.

Detecting changes in databases is a hard problem. Most solutions require extensive infrastructure and heavy tweaking of the source database. If you're looking for Postgres CDC, you'll probably end up reading about logical replication and Debezium. These are great (and scalable) solutions, but they're a bit of overkill for smaller projects.

If you need CDC on a table with a couple of million rows and don't have a dedicated data engineering team, you probably won't like CDC tutorials that start with "first, let's install ZooKeeper". You can do almost the same thing with a glorified while True: loop and a few SELECT queries.
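To make the "glorified loop" idea concrete, here is a minimal, library-free sketch of such a polling loop. It is not this plugin's code. The names poll_changes and fetch_since are hypothetical, and it assumes each row comes with a "version" number that grows whenever the row is written (for Postgres that could be xmin cast to bigint). The loop remembers the highest version it has emitted and, on every poll, asks only for rows newer than that.

```python
import time
from typing import Callable, Iterable, Iterator, Mapping, Optional, Tuple

# Hypothetical shape of a change: (version, row). "version" is assumed to
# grow every time a row is written, e.g. Postgres' xmin cast to bigint.
Change = Tuple[int, Mapping[str, object]]


def poll_changes(
    fetch_since: Callable[[int], Iterable[Change]],
    polling_interval: float = 10.0,
    max_iterations: Optional[int] = None,
) -> Iterator[Mapping[str, object]]:
    """Yield rows whose version is newer than anything emitted so far."""
    cursor = 0  # highest version emitted so far
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        # fetch_since is expected to return changes ordered by version
        for version, row in fetch_since(cursor):
            yield row
            cursor = max(cursor, version)
        iteration += 1
        time.sleep(polling_interval)
```

Note that this naive cursor can miss rows written by a long-running transaction that commits after a transaction with a higher version has already been seen; any real implementation has to account for that somehow.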

Installation

pip install git+https://github.com/roveo/streamz_postgres.git

Usage

There are some requirements for the source table.

  • It needs an integer primary key: that's how updated rows are identified.

  • It shouldn't be too big. The source generates queries that are essentially sequential scans of the table, since you can't create indexes on the xmin system column. You can work around this with triggers and a dedicated integer column on the table, but this plugin doesn't support that yet. I'll probably add this functionality, along with a tutorial on how to set it up, sometime in the future.
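The plugin's actual SQL isn't shown in this README. As a hypothetical sketch of why an xmin-based poll ends up scanning the whole table, a query builder could look like the one below (build_poll_query and quote_ident are made-up names). Postgres xid values only support equality comparison, so a common idiom is to cast xmin through text to bigint before using >. That expression is computed per row, which is why no index can help. The sketch assumes an unqualified table name and ignores 32-bit xid wraparound.

```python
def quote_ident(name: str) -> str:
    """Quote a Postgres identifier: wrap in double quotes, double inner quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_poll_query(table: str, pk: str = "id") -> str:
    """Return SQL selecting rows whose xmin is newer than a given value.

    The single %s placeholder (psycopg2 paramstyle) is the last version seen.
    """
    return (
        f"SELECT xmin::text::bigint AS _version, * FROM {quote_ident(table)} "
        "WHERE xmin::text::bigint > %s "
        f"ORDER BY _version, {quote_ident(pk)}"
    )
```

With psycopg2 this would be run as cur.execute(build_poll_query("my_table"), (last_version,)), keeping the value as a bound parameter rather than formatting it into the string.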

from streamz import Stream

src = Stream.from_postgres_cdc(
    "my_table",
    pk="id",  # this is the default
    polling_interval=10,  # seconds
    connection_params=dict(
        host="localhost",
        dbname="postgres",
        user="user",
        password="password"
    )
)
L = src.sink_to_list()
src.start()

That's it. On each poll, rows updated since the previous poll end up in L. Each row is a psycopg2 DictRow.
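A common next step is to keep a local copy of the table that is always up to date. The sketch below is illustrative only (apply_changes is a hypothetical helper, not part of this plugin): it upserts each changed row into a plain dict keyed by the primary key, copying each row with dict(), which should work for a DictRow as for any mapping.

```python
from typing import Any, Dict, Iterable, Mapping


def apply_changes(
    state: Dict[Any, Dict[str, Any]],
    rows: Iterable[Mapping[str, Any]],
    pk: str = "id",
) -> Dict[Any, Dict[str, Any]]:
    """Upsert each changed row into state, keyed by its primary key."""
    for row in rows:
        # dict() copies a DictRow (or any mapping) into a plain dict,
        # so later changes replace earlier versions of the same row
        state[row[pk]] = dict(row)
    return state
```

For example, apply_changes(replica, L) folds everything collected so far into replica. Since the source emits rows one at a time, streamz's sink could do the same continuously: src.sink(lambda row: apply_changes(replica, [row])).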

Caveats

  • As mentioned above, each query triggers a sequential scan, so be mindful of performance. You probably want to use a read replica as the source.
  • The source won't emit anything when a row is deleted. Most modern systems implement soft deletes, so this shouldn't be a problem, but if yours doesn't, you're out of luck.
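With soft deletes, a "delete" is just an UPDATE that sets a flag column, so it shows up in the stream like any other change. Downstream you then need to tell live rows from deleted ones. Here is a minimal sketch, assuming a hypothetical deleted_at column (NULL for live rows); split_soft_deleted is likewise a made-up helper, not part of this plugin.

```python
from typing import Any, Iterable, List, Mapping, Tuple


def split_soft_deleted(
    rows: Iterable[Mapping[str, Any]],
    flag: str = "deleted_at",
) -> Tuple[List[Mapping[str, Any]], List[Mapping[str, Any]]]:
    """Partition changed rows into (live, deleted) using a soft-delete column."""
    live: List[Mapping[str, Any]] = []
    deleted: List[Mapping[str, Any]] = []
    for row in rows:
        # A truthy flag (a timestamp, or True for a boolean is_deleted
        # column) marks the row as soft-deleted; NULL/False means live.
        (deleted if row[flag] else live).append(row)
    return live, deleted
```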
