Make use of Postgres pub/sub ? #86

Open
codeliner opened this issue Jun 20, 2017 · 14 comments

Comments

@codeliner
Member

I read recently that PostgreSQL supports durable pub/sub.

I'm wondering if this is useful for our projections to avoid polling event streams directly ...
The mechanism is relatively simple: when creating a stream or appending to it, we could use NOTIFY to indicate that new events are available.

Running projections can use pg_get_notify to check for new notifications and only query the affected event streams if new events are available.
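
A minimal sketch of that consumer side, assuming (hypothetically) that every NOTIFY carries the name of the changed stream as its payload. The channel name `event_streams`, the payload convention, and both function names are illustrative and not part of pdo-event-store. With `PGSQL_ASSOC`, `pg_get_notify()` returns an array with `message` (the channel), `pid` and `payload` keys, or `false` when nothing is pending.

```php
<?php
declare(strict_types=1);

/**
 * Drain every pending notification from a pgsql connection.
 * Requires ext-pgsql and a connection that has already issued LISTEN.
 *
 * @param resource|\PgSql\Connection $connection
 */
function drainNotifications($connection): array
{
    $pending = [];
    // pg_get_notify() returns false once no notification is queued.
    while (($notification = pg_get_notify($connection, PGSQL_ASSOC)) !== false) {
        $pending[] = $notification;
    }
    return $pending;
}

/**
 * Reduce a batch of notifications to the distinct stream names to query.
 * Assumes the payload is the stream name (a convention of this sketch).
 */
function affectedStreams(array $notifications, string $channel = 'event_streams'): array
{
    $streams = [];
    foreach ($notifications as $notification) {
        if (($notification['message'] ?? null) !== $channel) {
            continue; // Notification for a channel we do not care about.
        }
        $payload = $notification['payload'] ?? '';
        if ($payload !== '') {
            $streams[] = $payload;
        }
    }
    // Several appends to one stream should cause a single query.
    return array_values(array_unique($streams));
}
```

A projection would then call `affectedStreams(drainNotifications($connection))` and skip its query entirely when the result is empty.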

pg_get_notify still uses polling, so I'm not sure it is worth the effort. I also checked amphp, but it uses pg_get_notify too. Still, an AmphpProjectionManager would be nice, so that a single non-blocking Postgres connection could be shared between multiple projections. But that's a different story.

Thoughts @prolic @basz @sandrokeil @oqq @shochdoerfer ?

@shochdoerfer Do you use that feature or do you know someone who uses it?

@shochdoerfer
Contributor

I am aware of that feature but have never tried it myself.

@Ocramius

IMO, if you go down this road, it's more efficient to just do everything in the DB here.

@sandrokeil
Member

This should be more efficient than polling the table, especially at high polling frequencies. We should give it a try. As @Ocramius mentioned, a trigger should work.

@Ocramius

Interestingly, I was reading about this stuff yesterday:

That's Haskell keeping a connection open to the DB, but the NOTIFY should probably stay in the database itself instead of looping through PHP (which can become an extreme bottleneck).
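
One way to keep the NOTIFY inside the database is a statement-level trigger on the stream table. The sketch below only builds the DDL as a string. The table name, the channel name, and the helper `notifyTriggerSql()` are hypothetical, and the payload convention (the table name) is an assumption of this example rather than anything pdo-event-store defines.

```php
<?php
declare(strict_types=1);

/**
 * Build DDL for a trigger that calls pg_notify() after inserts on a table.
 * Table and channel names are hypothetical; adapt them to your schema.
 */
function notifyTriggerSql(string $table, string $channel): string
{
    // Only plain identifiers are accepted, since they are interpolated into DDL.
    foreach ([$table, $channel] as $identifier) {
        if (preg_match('/^[a-z_][a-z0-9_]*$/', $identifier) !== 1) {
            throw new InvalidArgumentException("Unsafe identifier: $identifier");
        }
    }

    return <<<SQL
CREATE OR REPLACE FUNCTION {$table}_notify() RETURNS trigger AS \$\$
BEGIN
    -- The payload is the table name, so listeners know which stream changed.
    PERFORM pg_notify('{$channel}', TG_TABLE_NAME::text);
    RETURN NULL;
END;
\$\$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS {$table}_notify ON {$table};
CREATE TRIGGER {$table}_notify
    AFTER INSERT ON {$table}
    FOR EACH STATEMENT
    EXECUTE PROCEDURE {$table}_notify();
SQL;
}
```

The returned string could be run once per stream table, for example via `PDO::exec()`. Postgres delivers notifications only when the sending transaction commits, so a listener is never told about events that were rolled back.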

@codeliner
Member Author

@Ocramius

> Interestingly, I was reading about this stuff yesterday:

Follow my first link "durable pub/sub". You will be surprised :D

@codeliner
Member Author

codeliner commented Jun 20, 2017

A DB trigger for NOTIFY is a great idea. This would keep the pdo-event-store logic untouched. Even better, we can keep everything untouched and just wrap a projection with a pg_get_notify loop that runs the projection manually whenever a new notification is received. One can put that into an amphp loop or use Node.js or even Haskell projections :D
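
A rough sketch of such a wrapper loop, with the notification source and the projection injected as callables so the loop itself does not depend on a database. `runProjectionOnNotify()` and its parameters are hypothetical names. In a real setup `$nextNotification` could wrap `pg_get_notify($connection, PGSQL_ASSOC)`, and `$runProjection` could wrap a single, non-continuous run of the projection (for instance `$projection->run(false)`, assuming the prooph v7 projector signature).

```php
<?php
declare(strict_types=1);

/**
 * Run a projection once per batch of notifications.
 *
 * @param callable $nextNotification Returns a notification, or false when none is pending.
 * @param callable $runProjection    Catches the projection up with the event streams.
 * @param callable $shouldStop       Returns true when the loop should exit.
 * @param int      $idleSleepMicros  Pause between polls that found nothing.
 * @return int Number of times the projection was run.
 */
function runProjectionOnNotify(
    callable $nextNotification,
    callable $runProjection,
    callable $shouldStop,
    int $idleSleepMicros = 100000
): int {
    $runs = 0;
    while (!$shouldStop()) {
        $received = 0;
        // Drain the queue first so a burst of notifications triggers one run.
        while ($nextNotification() !== false) {
            $received++;
        }
        if ($received > 0) {
            $runProjection();
            $runs++;
        } else {
            usleep($idleSleepMicros); // Nothing new: wait instead of querying streams.
        }
    }
    return $runs;
}
```

Draining before running is a design choice of this sketch: many appends in quick succession cost one projection run instead of one run per event.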

@Ocramius

> You will be surprised :D

More like: "I don't know how to read, duh"

@trowski

trowski commented Jun 21, 2017

Amp v2's postgres package supports fully async listening and notifying. pg_get_notify is used in a watcher callback that is only invoked when data is available, so it never blocks. The code below is taken from the listening example.

```php
use Amp\Loop;
use Amp\Postgres;

Loop::run(function () {
    $pool = Postgres\pool('host=localhost user=postgres');

    $channel = "test";

    /** @var \Amp\Postgres\Listener $listener */
    $listener = yield $pool->listen($channel);

    printf("Listening on channel '%s'\n", $listener->getChannel());

    Loop::delay(3000, function () use ($listener) { // Unlisten in 3 seconds.
        printf("Unlistening from channel '%s'\n", $listener->getChannel());
        return $listener->unlisten();
    });

    Loop::delay(1000, function () use ($pool, $channel) {
        return $pool->notify($channel, "Data 1"); // Send first notification.
    });

    Loop::delay(2000, function () use ($pool, $channel) {
        return $pool->notify($channel, "Data 2"); // Send second notification.
    });

    while (yield $listener->advance()) {
        $notification = $listener->getCurrent();
        printf(
            "Received notification from PID %d on channel '%s' with payload: %s\n",
            $notification->pid,
            $notification->channel,
            $notification->payload
        );
    }
});
```

@codeliner
Member Author

@trowski wow, that's nice. I did not read the source code carefully enough. Just to make sure I understand correctly: the watcher callback is only invoked if data is available. But it will also be invoked if I notify on a different channel, say otherChannel, right?

Anyway, that is a killer feature for an amp postgres projection. Thx for the reply!

@trowski

trowski commented Jun 21, 2017

@codeliner Yes, the watcher callback is only invoked when the connection associated with that watcher has data available. This is done by creating the watcher using the stream socket resource returned from pg_socket(). The watcher is invoked anytime data arrives from a listener, query, prepare, etc.
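
Outside of an event loop, the same idea can be approximated in plain PHP by waiting on the socket returned by `pg_socket()` with `stream_select()`. This is only a sketch of the mechanism, not how amphp/postgres is implemented internally. The function names are hypothetical, and `$connection` is assumed to be an ext-pgsql connection that has already executed LISTEN.

```php
<?php
declare(strict_types=1);

/**
 * Block until the stream is readable or the timeout elapses.
 *
 * @param resource $stream
 */
function waitReadable($stream, int $timeoutSeconds): bool
{
    $read = [$stream];
    $write = null;
    $except = null;
    // stream_select() returns the number of ready streams, 0 on timeout, false on error.
    $ready = stream_select($read, $write, $except, $timeoutSeconds);
    return $ready !== false && $ready > 0;
}

/**
 * Wait on the connection's socket and hand each notification to $onNotify.
 *
 * @param resource|\PgSql\Connection $connection
 * @return int Number of notifications dispatched.
 */
function dispatchPendingNotifications($connection, callable $onNotify, int $timeoutSeconds = 5): int
{
    $socket = pg_socket($connection);
    if ($socket === false || !waitReadable($socket, $timeoutSeconds)) {
        return 0; // Nothing arrived on the socket, so nothing is queried.
    }
    pg_consume_input($connection); // Pull the received bytes into the client library.
    $count = 0;
    while (($notification = pg_get_notify($connection, PGSQL_ASSOC)) !== false) {
        $onNotify($notification);
        $count++;
    }
    return $count;
}
```

Because the process sleeps inside `stream_select()` until the server sends something, an idle listener costs neither queries nor CPU time.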

You can listen and notify on multiple channels. All connection operations return promises that, once complete, resolve with an object that depends on the operation. Make a new project requiring amphp/postgres and give the script below a try.

```php
#!/usr/bin/env php
<?php

require __DIR__ . '/vendor/autoload.php';

use Amp\Iterator;
use Amp\Loop;
use Amp\Postgres;

Loop::run(function () {
    $pool = Postgres\pool('host=localhost user=postgres');

    $channel1 = "test1";
    $channel2 = "test2";

    /** @var \Amp\Postgres\Listener $listener1 */
    $listener1 = yield $pool->listen($channel1);

    printf("Listening on channel '%s'\n", $listener1->getChannel());

    /** @var \Amp\Postgres\Listener $listener2 */
    $listener2 = yield $pool->listen($channel2);

    printf("Listening on channel '%s'\n", $listener2->getChannel());

    Loop::delay(6000, function () use ($listener1) { // Unlisten in 6 seconds.
        printf("Unlistening from channel '%s'\n", $listener1->getChannel());
        return $listener1->unlisten();
    });

    Loop::delay(4000, function () use ($listener2) { // Unlisten in 4 seconds.
        printf("Unlistening from channel '%s'\n", $listener2->getChannel());
        return $listener2->unlisten();
    });

    Loop::delay(1000, function () use ($pool, $channel1) {
        return $pool->notify($channel1, "Data 1.1");
    });

    Loop::delay(2000, function () use ($pool, $channel2) {
        return $pool->notify($channel2, "Data 2.1");
    });

    Loop::delay(3000, function () use ($pool, $channel2) {
        return $pool->notify($channel2, "Data 2.2");
    });

    Loop::delay(5000, function () use ($pool, $channel1) {
        return $pool->notify($channel1, "Data 1.2");
    });

    // Merge both listeners into single iterator.
    $listener = Iterator\merge([$listener1, $listener2]);

    while (yield $listener->advance()) {
        $notification = $listener->getCurrent();
        printf(
            "Received notification from PID %d on channel '%s' with payload: %s\n",
            $notification->pid,
            $notification->channel,
            $notification->payload
        );
    }
});
```

A connection can have multiple listeners, though a connection can only perform a single query at a time. The library also provides a connection pool that sends concurrent queries over multiple connections. In general you will always want to use a connection pool.

@prolic
Member

prolic commented Sep 23, 2018

Coming back to this today (more than one year later) as we are planning event-store v8, I think this is another reason to pin this new major release to Postgres only. I'll leave this issue open for now, until we actually get to implement and work on v8.

@sandrokeil
Member

> I think this is another reason to pin this new major release to postgres only.

But MongoDB 4 has such a feature too and it is implemented here.

@gquemener
Contributor

For reference/inspiration, a similar PR has been opened on symfony/messenger (symfony/symfony#35485).

@codeliner
Member Author

@gquemener thanks for the link. That's indeed a good inspiration.
