
supa_queue Extension for Postgres

Overview

The supa_queue extension is a simple but robust message queue system designed to work seamlessly with Postgres. It allows you to manage and process asynchronous jobs efficiently within your Postgres database. You can read the blog post explaining this system here: Building a Queue System with Supabase and PostgreSQL

Features

  • Simple message queue system.
  • Support for various HTTP methods (GET, POST, DELETE).
  • Retry mechanism for failed jobs.
  • Concurrent processing using multiple workers.
  • Integration with external services via HTTP requests.
  • Flexible scheduling with pg_cron for optimal job management.

Installation

To install the supa_queue extension, follow these steps:

  1. Make sure you have Postgres installed and running.

  2. Install the required extensions pg_cron and pg_net if not already installed.

  3. Install dbdev (the database.dev package manager):

create extension if not exists http with schema extensions;
create extension if not exists pg_tle;
select pgtle.uninstall_extension_if_exists('supabase-dbdev');
drop extension if exists "supabase-dbdev";
select
    pgtle.install_extension(
        'supabase-dbdev',
        resp.contents ->> 'version',
        'PostgreSQL package manager',
        resp.contents ->> 'sql'
    )
from http(
    (
        'GET',
        'https://api.database.dev/rest/v1/'
        || 'package_versions?select=sql,version'
        || '&package_name=eq.supabase-dbdev'
        || '&order=version.desc'
        || '&limit=1',
        array[
            ('apiKey', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InhtdXB0cHBsZnZpaWZyYndtbXR2Iiwicm9sZSI6ImFub24iLCJpYXQiOjE2ODAxMDczNzIsImV4cCI6MTk5NTY4MzM3Mn0.z2CN0mvO2No8wSi46Gw59DFGCTJrzM0AQKsu_5k134s')::http_header
        ],
        null,
        null
    )
) x,
lateral (
    select
        ((row_to_json(x) -> 'content') #>> '{}')::json -> 0
) resp(contents);
create extension "supabase-dbdev";
select dbdev.install('supabase-dbdev');
drop extension if exists "supabase-dbdev";
create extension "supabase-dbdev";

  4. Install supa_queue with the following code:

select dbdev.install('mansueli-supa_queue');
create extension "mansueli-supa_queue"
    version '1.0.4';

You can also install it in a different schema with:

create schema supa_queue;
select dbdev.install('mansueli-supa_queue');
create extension "mansueli-supa_queue"
    schema supa_queue
    version '1.0.4';
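
To confirm the installation went through, you can check the Postgres extension catalog (a generic sanity check, not specific to supa_queue):

-- Verify the extension is installed and see which version was pulled in.
SELECT extname, extversion
FROM pg_extension
WHERE extname = 'mansueli-supa_queue';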

Usage

Inserting Jobs

To add a job to the queue, insert a new record into the job_queue table. Specify the HTTP verb (GET, POST, DELETE), payload, and other relevant information. The job will be processed asynchronously.

INSERT INTO job_queue (http_verb, payload, url_path) VALUES ('GET', '{"key": "value"}', '/api/resource');
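
For instance, a POST job can carry a JSON payload for the consuming Edge Function, a routing path, and a custom retry limit; the payload shape and path below are placeholders:

-- Hypothetical example: queue a POST request with a custom retry limit.
INSERT INTO job_queue (http_verb, payload, url_path, retry_limit)
VALUES ('POST', '{"user_id": 123, "action": "welcome_email"}', '/send-email', 5);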

Processing Jobs

Jobs are processed automatically using the provided functions. The process_job() trigger function processes newly inserted jobs. The process_current_jobs_if_unlocked() function assigns jobs to available workers for execution.
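
To keep an eye on jobs as they move through the queue, you can query the table directly; this is just an ad-hoc inspection query over the columns defined further below:

-- Inspect the most recent jobs, their status, and the response content.
SELECT job_id, status, retry_count, content
FROM job_queue
ORDER BY created_at DESC
LIMIT 10;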

Retrying Failed Jobs

Failed jobs are automatically retried with the retry_failed_jobs() function, increasing job reliability. Jobs with a status of 'failed' and within the retry limit will be retried.
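
To see which jobs are currently eligible for a retry, a query along these lines works (it simply restates the condition above in SQL):

-- Failed jobs that have not yet exhausted their retry limit.
SELECT job_id, http_verb, url_path, retry_count, retry_limit
FROM job_queue
WHERE status = 'failed'
  AND retry_count < retry_limit;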

Configuration

You can configure various aspects of the supa_queue extension by modifying the provided SQL functions and cron schedules to suit your specific use case.
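
For example, schedules are adjusted through pg_cron. The job name below is a placeholder, not necessarily what the extension created; list cron.job first to see the actual names, then re-schedule by name (cron.schedule() replaces an existing job with the same name):

-- List the cron jobs currently registered in this database.
SELECT jobid, jobname, schedule, command FROM cron.job;

-- Hypothetical: run the worker pass every minute instead of its current schedule.
SELECT cron.schedule(
    'process_current_jobs',              -- placeholder job name
    '* * * * *',
    $$SELECT process_current_jobs_if_unlocked();$$
);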

Note that you'll need to set these values in Vault:

  • service_role: the service role key, which you can get from the project dashboard.
  • consumer_function: the URL of the Edge Function that will consume the tasks.
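
A minimal sketch of creating those secrets with Supabase Vault's vault.create_secret() helper; the placeholder values below are yours to fill in:

-- Store the service role key and the consumer Edge Function URL in Vault.
SELECT vault.create_secret('<your-service-role-key>', 'service_role', 'Service role key used by supa_queue');
SELECT vault.create_secret('https://<project-ref>.supabase.co/functions/v1/consumer', 'consumer_function', 'Edge Function that consumes queued jobs');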

Using the queue

The job_queue table comes with sensible defaults for most columns; you can use the standard values or tweak them for more advanced cases. Here's the definition of each column:

Parameters you can set when adding tasks:

  • http_verb: HTTP verb used when calling the consuming function (GET, POST, or DELETE)
  • payload: the JSON payload of the task in the queue
  • status: the status of the request (⚠️you should leave this empty⚠️)
  • retry_limit: the number of times a job can be retried
  • url_path: additional path passed to the consuming Edge Function for routing purposes

Internal parameters (managed by the queue; you should not change these):

  • job_id: the generated identifier of the job
  • retry_count: how many times the job has been retried so far
  • content: the content received from the Edge Function (response)
  • created_at: timestamp of when the task entered the queue

CREATE TABLE job_queue (
    job_id BIGINT GENERATED BY DEFAULT AS IDENTITY,
    http_verb TEXT NOT NULL CHECK (http_verb IN ('GET', 'POST', 'DELETE')),
    payload jsonb,
    status TEXT NOT NULL DEFAULT '',
    retry_count INTEGER DEFAULT 0,
    retry_limit INTEGER DEFAULT 10,
    url_path TEXT DEFAULT '',
    content TEXT DEFAULT '',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

License

This extension is provided under the license included in the repository.

Contributing

If you'd like to contribute to this project or report issues, please visit the GitHub repository for more information.
