Skip to content

mojolicious/minion.js

Repository files navigation

Screenshot

Coverage Status npm

A high performance job queue for Node.js and PostgreSQL. With support for alternative database backends. Written in TypeScript. Also available for Perl.

import Minion from '@minionjs/core';

// Use the default high performance PostgreSQL backend
const minion = new Minion('postgres://user:password@localhost:5432/database');

// Update the database schema to the latest version
await minion.update();

// Add tasks
minion.addTask('somethingSlow', async (job, ...args) => {
  console.log('This is a background worker process.');
});

// Enqueue jobs
await minion.enqueue('somethingSlow', ['foo', 'bar']);
await minion.enqueue('somethingSlow', [1, 2, 3], {priority: 5});

// Perform jobs for testing
await minion.enqueue('somethingSlow', ['foo', 'bar']);
await minion.performJobs();

// Start a worker to perform up to 12 jobs concurrently
const worker = minion.worker();
worker.status.jobs = 12;
await worker.start();

Features

  • Multiple named queues
  • Priorities
  • High priority fast lane
  • Delayed jobs
  • Job dependencies
  • Job progress
  • Job results
  • Retries with backoff
  • Rate limiting
  • Unique jobs
  • Expiring jobs
  • Statistics
  • Distributed workers
  • Parallel processing
  • Remote control
  • Multiple backends (such as PostgreSQL)
  • mojo.js admin ui

Job Queue

Job queues allow you to process time and/or computationally intensive tasks in background processes, outside of the request/response lifecycle of web applications. Among those tasks you'll commonly find image resizing, spam filtering, HTTP downloads, building tarballs, warming caches and basically everything else you can imagine that's not super fast.

Web Applications                      +--------------+                     Minion
|- Node.js [1]       enqueue job ->   |              |   -> dequeue job    |- Worker [1]
|- Node.js [2]                        |  PostgreSQL  |                     |- Worker [2]
|- Node.js [3]   retrieve result <-   |              |   <- store result   |- Worker [3]
+- Node.js [4]                        +--------------+                     |- Worker [4]
                                                                           +- Worker [5]

They are not to be confused with time based job schedulers, such as cron or systemd timers. Both serve very different purposes, and cron jobs are in fact commonly used to enqueue Minion jobs that need to follow a schedule. For example to perform regular maintenance tasks.

Consistency

Every new job starts out as inactive, then progresses to active when it is dequeued by a worker, and finally ends up as finished or failed, depending on its result. Every failed job can then be retried to progress back to the inactive state and start all over again.

                                                   +----------+
                                                   |          |
                                          +----->  | finished |
+----------+            +--------+        |        |          |
|          |            |        |        |        +----------+
| inactive |  ------->  | active |  ------+
|          |            |        |        |        +----------+
+----------+            +--------+        |        |          |
                                          +----->  |  failed  |  -----+
     ^                                             |          |       |
     |                                             +----------+       |
     |                                                                |
     +----------------------------------------------------------------+

The system is eventually consistent and will preserve job results for as long as you like, depending on the value of the minion.removeAfter property. But be aware that failed results are preserved indefinitely, and need to be manually removed by an administrator if they are out of automatic retries.

While individual workers can fail in the middle of processing a job, the system will detect this and ensure that no job is left in an uncertain state, depending on the value of the minion.missingAfter property. Jobs that do not get processed after a certain amount of time, will be considered stuck and fail automatically, depending on the value of the minion.stuckAfter property. So an admin can take a look and resolve the issue.

Examples

This distribution also contains a great example application you can use for inspiration. The link checker will show you how to integrate background jobs into well-structured mojo.js applications.

API

Minion uses a PostgreSQL backend by default, but allows for 3rd party implementations of alternative backends. See the PgBackend class and its tests for inspiration.

// Default PostgreSQL backend
const minion = new Minion('postgres://user:password@localhost:5432/database', {
  // Amount of time in milliseconds after which workers without a heartbeat will be considered missing and removed from
  // the registry, defaults to 30 minutes
  missingAfter: 1800000,

  // Amount of time in seconds after which jobs that have reached the state finished and have no unresolved
  // dependencies will be removed automatically by "repair", defaults to 2 days (it is not recommended to set this
  // value below 2 days)
  removeAfter: 172800000,

  // Amount of time in seconds after which jobs that have not been processed will be considered stuck and transition to
  // the failed state, defaults to 2 days
  stuckAfter: 172800000
});

// Custom 3rd party backend
const minion = new Minion('sqlite:test.db', {backendClass: SQLiteBackend});

Enqueue

New jobs are created with the minion.enqueue() method, which requires a task name to tell the worker what kind of workload the job represents, an array with job arguments, and an object with optional features to use for processing this job. Every newly created job has a unique id that can be used to check its current status.

const jobId = await minion.enqueue('task', ['arg1', 'arg2', 'arg3'], {

  // Number of times performing this job will be attempted (defaults to 0), with a computed increasing delay
  attempts: 5,

  // Delay job for this many milliseconds (from now)
  delay: 5000,

  // Job is valid for this many milliseconds (from now) before it expires
  expire: 10000,

  // Existing jobs this job depends on may also have transitioned to the "failed" state to allow for it to be processed
  lax: true,

  // Object with arbitrary metadata for this job that gets serialized as JSON
  notes: {foo: 'bar'},

  // One or more existing jobs this job depends on, and that need to have transitioned to the state "finished" before
  // it can be processed
  parents: [23, 24, 25],

  // Job priority (defaults to 0), jobs with a higher priority get performed first, priorities can be positive or
  // negative, but should be in the range between 100 and -100
  priority: 9,

  // Queue to put job in (defaults to "default")
  queue: 'important'
});

Tasks

Tasks are created with minion.addTask(), and are async functions that represent the individual workloads workers can perform. Not all workers need to have the same tasks, but it is recommended for easier to maintain code. If you want to route individual jobs to specific workers it is better to use named queues for that.

// Task without result
minion.addTask('somethingSlow', async job => {
  console.log('This is a background worker process.');
});

// Task with result
minion.addTask('somethingWithResult', async (job, num1, num2) => {
  const rersult = num1 + num2;
  await job.finish(result);
});

Jobs

Individual jobs are represented as instances of the Job class, which are the first argument passed to all task functions. To check the current status of a specific job you can use the minion.job() method.

// Request a specific job (this does not prevent workers from processing the job)
const job = await minion.job(23);

// Job properties
const jobId   = job.id;
const task    = job.task;
const args    = job.args;
const retries = job.retries;

// Request current state of the job
const info     = await job.info();
const attempts = info.attempts;
const children = info.children;
const created  = info.created;
const delayed  = info.delayed;
const expires  = info.expires;
const finished = info.finished;
const lax      = info.lax;
const notes    = info.notes;
const parents  = info.parents;
const priority = info.priority;
const queue    = info.queue;
const result   = info.result;
const retried  = info.retried;
const started  = info.started;
const state    = info.state;
const time     = info.time;
const worker   = info.worker;

// Merge notes (remove a note by setting it to "null")
const success = await job.note({just: 'a note', another: ['note'], foo: null});

// Remove job from database
const success = await job.remove();

// Manually finish/fail the job (the result is an arbitrary data structure that will be serialized as JSON)
const success = await job.finish('Huge success!');
const success = await job.fail('Something went wrong!');

Every job still in the database can be retried at any time, this is also the only way to change most of the available processing options. A worker already processing this job will not be able to assign a result afterwards, but it will not stop processing.

const success = await job.retry({

  // Number of times performing this job will be attempted
  attempts: 3,

  // Delay job for this many milliseconds (from now)
  delay: 3000,

  // Job is valid for this many milliseconds (from now) before it expires
  expire: 5000,

  // Existing jobs this job depends on may also have transitioned to the "failed" state to allow for it to be processed
  lax: false,

  // One or more existing jobs this job depends on, and that need to have transitioned to the state "finished" before
  // it can be processed
  parents: [23, 25],

  // Job priority (defaults to 0), jobs with a higher priority get performed first, priorities can be positive or
  // negative, but should be in the range between 100 and -100
  priority: 5,

  // Queue to put job in
  queue: 'unimportant'
});

The iterator API allows you to search through all jobs currently in the database.

const jobs = minion.jobs({

  // List only jobs with these ids
  ids: [23, 24],

  // List only jobs with one of these notes
  notes: ['foo', 'bar'],

  // List only jobs in these queues
  queues: ['important', 'unimportant'],

  // List only jobs in these states
  states: ['inactive', 'active'],

  // List only jobs for these tasks
  tasks: ['foo', 'bar']
});

// Total number of results
const total = await jobs.total();

// Iterate current job states
for await (const info of jobs) {
  const {id, state} = info;
  console.log(`${id}: ${state}`);
}

Locks

Named locks are a tool that can be used for many things, inside and outside of task functions. They expire automatically after a certain amont of time in milliseconds. You can release the lock manually with minion.unlock() to limit concurrency, or let it expire fro rate limiting.

// Acquire a named lock
const success = await minion.lock('fragile_backend_service', 5000, {

  // Number of shared locks with the same name that can be active at the same time (defaults to "1")
  limit: 5
});

// Check if a named lock currently exists
const success = await minion.isLocked('fragile_backend_service');

// Release a named lock
const success = await minion.unlock('fragile_backend_service');

The most common use for named locks is limiting access to shared resources.

// Only one job should run at a time (unique job)
minion.addTask('do_unique_stuff', async job => {
  if (await minion.lock('fragile_web_service', 7200000) !== true) {
    await minion.finish('Previous job still active')
    return;
  }
  ...
  await minion.unlock('fragile_web_service');
});

// Only five jobs should run at a time and we wait for our turn
minion.addTask('do_concurrent_stuff', async job => {
  while (await minion.lock('some_web_service', 60000, {limit: 5}) !== true) {
    await sleep(1000);
  }
  ...
  await minion.unlock('some_web_service');
});

// Only a hundred jobs should run per hour and we try again later if necessary
minion.addTask('do_rate_limit_stuff', async job => {
  if (await minion.lock('another_web_service', 360000, {limit: 100}) !== true) {
    await minion.retry({delay: 3600000})
    return;
  }
  ...
});

An expiration time of 0 can be used to check if a named lock could have been acquired without creating one.

// Check if the lock "foo" could have been acquired
const success = await $minion.lock('foo', 0);

Utilities

// Retry job in "minion_foreground" queue, then perform it right away with a temporary worker in this process, very
// useful for debugging
const success = await minion.foreground(23);

// Broadcast "jobs" command to pause worker 23
const success = await minion.broadcast('jobs', [0], [23]);

// Reset job queue
await minion.reset({

  // Reset everything
  all: true,

  // Reset only locks
  locks: true
});

mojo.js

You can use Minion as a standalone job queue, or integrate it into mojo.js applications with minionPlugin.

import {minionPlugin} from '@minionjs/core';
import mojo from '@mojojs/core';

export const app = mojo();

app.plugin(minionPlugin, {config: 'postgres://user:password@localhost:5432/database'});

// Slow task
app.models.minion.addTask('poke_mojo', async job => {
  await job.app.ua.get('mojolicious.org');
  job.app.log.debug('We have poked mojolicious.org for a visitor');
});

// Perform job in a background worker process
app.get('/', async ctx => {
  await ctx.models.minion.enqueue('poke_mojo');
  await ctx.render({text: 'We will poke mojolicious.org for you soon.'});
});

app.start();

Background worker processes are usually started with the minion-worker command, which becomes automatically available when an application loads minionPlugin.

$ node index.js minion-worker

Jobs can be managed right from the command line with the minion-job command.

$ node index.js minion-job

You can also add an admin ui to your application by loading minionAdminPlugin. Just make sure to secure access before making your application publicly accessible.

import {minionPlugin, minionAdminPlugin} from '@minionjs/core';
import mojo from '@mojojs/core';

export const app = mojo();

const minionPrefix = app.any('/minion');

app.plugin(minionPlugin, {config: 'postgres://user:password@localhost:5432/database'});
app.plugin(minionAdminPlugin, {route: minionPrefix});

app.start();

Deployment

To manage background worker processes with systemd, you can use a unit configuration file like this.

[Unit]
Description=My mojo.js application worker
After=postgresql.service

[Service]
Type=simple
ExecStart=NODE_ENV=production node /home/sri/myapp/myapp.js minion-worker
KillMode=process

[Install]
WantedBy=multi-user.target

Installation

All you need is Node.js 16.0.0 (or newer).

$ npm install @minionjs/core

Support

If you have any questions the documentation might not yet answer, don't hesitate to ask in the Forum, on Matrix, or IRC.