Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db.postgres: implement wait() and post() based on Postgres notifications #428

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

starius
Copy link
Contributor

@starius starius commented Apr 6, 2016

Postgres supports publish-subscribe feature. One client listens on a channel
(provided as a string), while another (or same) client send a notification.
It can be used to send notifications accross machines and nginx workers.

The notification can be delivered with any read operation. To decouple
receiving notifications from reading results of normal queries, a
background handler with independent database socket is created. The
handler starts two light threads: a reader and a writer. The reader reads
new notifications from the database. The writer pushes commands "listen"
and "unlisten" to the database. Signals to/from the background handler
are delivered using openresty's semaphores.

Two new methods of "lapis.db.postgres" (and "lapis.db") were added:

  • wait(channel) -- waits on the channel until it is notified with post().
    Returns a payload passed by caller of post().
  • post(channel, payload) -- notifies the channel with the payload.

All waiting light threads in nginx worker share the same database socket.

All light threads waiting on the same channel use the same semaphore
object which is triggered by reader light thread of the background handler.
When new channel is started being listened, it is provided to writer light
thread of the background handler and it sends command "listen channel".
When a notification is received, the corresponding channel is "unlistened"
by writer light thread and the corresponding semaphore is removed. It is
needed to prevent resource leaks in nginx worker and in database worker.

All methods are 100% non-blocking.
Notifications add zero overhead when not used.

Example:

  location /wait {
    default_type text/html;
    lua_check_client_abort on;
    content_by_lua '
      local channel = ngx.req.get_uri_args().channel
      while true do
        ngx.say(require("lapis.db").wait(channel))
        ngx.flush()
      end
    ';
  }

  location /post {
    default_type text/html;
    content_by_lua '
      local channel = ngx.req.get_uri_args().channel
      local payload = ngx.req.get_uri_args().payload
      require("lapis.db").post(channel, payload)
    ';
  }
$ curl http://localhost:8080/wait?channel=foo

In another terminal:

$ curl 'http://localhost:25516/post?channel=foo&payload=bar'

The first terminal should output "bar".

This pull request depends on leafo/pgmoon#28

Postgres supports publish-subscribe feature. One client listens on a channel
(provided as a string), while another (or same) client send a notification.
It can be used to send notifications accross machines and nginx workers.

The notification can be delivered with any read operation. To decouple
receiving notifications from reading results of normal queries, a
background handler with independent database socket is created. The
handler starts two light threads: a reader and a writer. The reader reads
new notifications from the database. The writer pushes commands "listen"
and "unlisten" to the database. Signals to/from the background handler
are delivered using openresty's semaphores.

Two new methods of "lapis.db.postgres" (and "lapis.db") were added:

  * wait(channel) -- waits on the channel until it is notified with post().
    Returns a payload passed by caller of post().

  * post(channel, payload) -- notifies the channel with the payload.

All waiting light threads in nginx worker share the same database socket.

All light threads waiting on the same channel use the same semaphore
object which is triggered by reader light thread of the background handler.
When new channel is started being listened, it is provided to writer light
thread of the background handler and it sends command "listen channel".
When a notification is received, the corresponding channel is "unlistened"
by writer light thread and the corresponding semaphore is removed. It is
needed to prevent resource leaks in nginx worker and in database worker.

All methods are 100% non-blocking.
Notifications add zero overhead when not used.

Example:

  location /wait {
    default_type text/html;
    lua_check_client_abort on;
    content_by_lua '
      local channel = ngx.req.get_uri_args().channel
      while true do
        ngx.say(require("lapis.db").wait(channel))
        ngx.flush()
      end
    ';
  }

  location /post {
    default_type text/html;
    content_by_lua '
      local channel = ngx.req.get_uri_args().channel
      local payload = ngx.req.get_uri_args().payload
      require("lapis.db").post(channel, payload)
    ';
  }

$ curl http://localhost:8080/wait?channel=foo

In another terminal:

$ curl 'http://localhost:25516/post?channel=foo&payload=bar'

The first terminal should output "bar".
In general, read operation is limited by settings of the server or firewall.
In case of errors in reader or writer, new background Lua handler runs.
All channels being listened are re-listened.

Timeout of socket is also set to 5 minutes to have a value.
In previous implementation, resources were leaked in case of disconnection
of all clients (sema:count() == 0) and no notification on the channel.
The entry of "queues" in nginx and the channel listener in postres were
leaked.

Function cleanup_queues was introduced. It checks all queues and removes
unused items and unlistens corresponding channels. It is called when
reader light thread detects read timeout (after 5 minutes of no
notifications) or once in 10000 received notifications.

Relistening channels after restart of the background Lua handler
was moved to function listen_queues.
Instead of checking all channels once in a while, put them
to weak table and unlisten in a finalizer.
Send UNLISTEN command after a channel being unused for some time.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant