fix: avoid collisions in auto-generated Job IDs causing dropped jobs #237

Open: wants to merge 5 commits into base: master
Changes from 3 commits

2 changes: 1 addition & 1 deletion README.md
@@ -558,7 +558,7 @@ Note that for calls that specify an interval, you must provide a callback if you

#### Queue#checkHealth([cb])

Check the "health" of the queue. Returns a promise that resolves to the number of jobs in each state (`waiting`, `active`, `succeeded`, `failed`, `delayed`), and the newest job ID (if using the default ID behavior) in `newestJob`. You can periodically query the `newestJob` ID to estimate the job creation throughput, and can infer the job processing throughput by incorporating the `waiting` and `active` counts.
Check the "health" of the queue. Returns a promise that resolves to the number of jobs in each state (`waiting`, `active`, `succeeded`, `failed`, `delayed`), and the total number of jobs saved (with a default ID) in `newestJob`. You can periodically query `newestJob` to estimate the job creation throughput, and can infer the job processing throughput by incorporating the `waiting` and `active` counts.
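
As a rough sketch of the throughput estimate described above, the helper below compares two `checkHealth()` results taken some time apart. The name `estimateThroughput` and the `time` field (milliseconds, added by the caller) are assumptions for illustration and are not part of the library. It also assumes that every job uses a default ID and that delayed jobs can be ignored.

```js
// Hypothetical helper, not part of the library. Each sample is the object
// that checkHealth() resolves to, plus a caller-supplied `time` in ms.
function estimateThroughput(prev, curr) {
  const seconds = (curr.time - prev.time) / 1000;
  if (seconds <= 0) throw new RangeError('samples must be in time order');

  // Default-ID jobs saved between the two samples.
  const created = curr.newestJob - prev.newestJob;

  // Jobs that were created but no longer sit in waiting/active have been
  // processed (succeeded or failed) in the meantime.
  const backlog = (sample) => sample.waiting + sample.active;
  const processed = created - (backlog(curr) - backlog(prev));

  return {
    createdPerSecond: created / seconds,
    processedPerSecond: processed / seconds,
  };
}

const rates = estimateThroughput(
  {time: 0, newestJob: 100, waiting: 10, active: 2},
  {time: 10000, newestJob: 150, waiting: 20, active: 2}
);
console.log(rates); // { createdPerSecond: 5, processedPerSecond: 4 }
```

Jobs saved with a custom ID do not advance the counter, so a queue that mixes both kinds of ID would need a different source for the creation count.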

```js
const counts = await queue.checkHealth();
6 changes: 5 additions & 1 deletion lib/helpers.js
@@ -1,5 +1,8 @@
'use strict';

const crypto = require('crypto');
const promiseUtils = require('promise-callbacks');

const hasOwn = Object.prototype.hasOwnProperty;

function has(object, name) {
@@ -22,7 +25,7 @@ function finallyRejectsWithInitial(promise, fn) {
);
}

const promiseUtils = require('promise-callbacks');
const makeJobId = () => crypto.randomBytes(7).toString('hex');

module.exports = {
asCallback: promiseUtils.asCallback,
@@ -34,4 +37,5 @@ module.exports = {
waitOn: promiseUtils.waitOn,
withTimeout: promiseUtils.withTimeout,
wrapAsync: promiseUtils.wrapAsync,
makeJobId: makeJobId,
};
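
To give a feel for what `makeJobId` produces, here is a minimal sketch that reuses the same expression and pairs it with a birthday-bound estimate of the collision risk. The `collisionProbability` function is a hypothetical addition for illustration; it assumes IDs are drawn uniformly from a 56-bit space (7 random bytes).

```js
const crypto = require('crypto');

// Same expression as the makeJobId helper in this diff.
const makeJobId = () => crypto.randomBytes(7).toString('hex');

// Birthday-bound approximation: probability that at least two of `n`
// uniformly random IDs from a space of 2^bits values collide.
// Hypothetical helper, not part of the PR.
function collisionProbability(n, bits) {
  const space = Math.pow(2, bits);
  return -Math.expm1(-(n * (n - 1)) / (2 * space));
}

const id = makeJobId();
console.log(id.length); // 14 (7 bytes, two hex characters each)

const p = collisionProbability(1e6, 56);
console.log(p);
```

Under these assumptions, one million live jobs give a collision probability on the order of 7 in a million. The `hexists` check in the Lua scripts still rejects a duplicate ID if one ever occurs.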
34 changes: 20 additions & 14 deletions lib/job.js
@@ -52,49 +52,55 @@ class Job extends Emitter {
save(cb) {
const toKey = this.queue.toKey.bind(this.queue);

const isDefaultId = !this.id;
if (isDefaultId) {
this.id = helpers.makeJobId();
}

// We add to the stored-jobs set before addJob (or addDelayedJob) so that
// this job is already in queue.jobs. This supports the case of the job
// getting popped off 'waiting', being run, completing, and making callbacks
// (which use the id in queue.jobs) all before the _evalScript promise has
// resolved.
if (this.queue.settings.storeJobs) {
this.queue.jobs.set(this.id, this);
}

let promise;
if (this.options.delay) {
promise = this.queue._evalScript(
'addDelayedJob',
4,
toKey('id'),
isDefaultId ? toKey('id') : '',
toKey('jobs'),
toKey('delayed'),
toKey('earlierDelayed'),
this.id || '',
this.id,
this.toData(),
this.options.delay
);

if (this.queue.settings.activateDelayedJobs) {
promise = promise.then((jobId) => {
// Only reschedule if the job was actually created.
// Only reschedule if a new job was actually created.
if (jobId) {
this.queue._delayedTimer.schedule(this.options.delay);
}
return jobId;
});
}
} else {
promise = this.queue._evalScript(
'addJob',
3,
toKey('id'),
isDefaultId ? toKey('id') : '',
toKey('jobs'),
toKey('waiting'),
this.id || '',
this.id,
this.toData()
);
}

promise = promise.then((jobId) => {
this.id = jobId;
// If the jobId is not null, then store the job in the job map.
if (jobId && this.queue.settings.storeJobs) {
this.queue.jobs.set(jobId, this);
}
return this;
});
promise = promise.then(() => this);

if (cb) helpers.asCallback(promise, cb);
return promise;
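
The ordering concern in the comment above (register the job in `queue.jobs` before the script call) can be sketched without Redis. Everything here is a hypothetical stand-in: `jobs` plays the role of `queue.jobs`, `enqueue` plays the role of `_evalScript`, and `onJobFinished` plays the role of a completion callback that looks the job up by ID.

```js
// Hypothetical stand-ins for queue.jobs and the completion callback.
const jobs = new Map();
const seen = [];

function onJobFinished(id) {
  seen.push(jobs.has(id) ? 'found' : 'missing');
}

// Simulates a job that is picked up and finishes before the enqueue
// promise resolves back to the caller.
function enqueue(id) {
  return new Promise((resolve) => {
    onJobFinished(id); // completion event arrives first
    setImmediate(() => resolve(id));
  });
}

async function saveRegisterFirst(job) {
  jobs.set(job.id, job); // register before the round trip
  await enqueue(job.id);
  return job;
}

async function saveRegisterAfter(job) {
  const id = await enqueue(job.id);
  jobs.set(id, job); // too late for the early completion event
  return job;
}

Promise.all([
  saveRegisterFirst({id: 'a'}),
  saveRegisterAfter({id: 'b'}),
]).then(() => console.log(seen)); // [ 'found', 'missing' ]
```

Registering first is only possible because the ID is now generated on the client before the script runs; with server-side sequential IDs the job had no ID to register under until the promise resolved.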
6 changes: 2 additions & 4 deletions lib/lua/addDelayedJob.lua
@@ -1,5 +1,5 @@
--[[
key 1 -> bq:name:id (job ID counter)
key 1 -> bq:name:id (job counter)
Contributor commented:

What is stored in `bq:name:id`? My understanding from this PR is that it's the number of jobs that don't have a custom ID. What is that used for?

Collaborator commented:

I believe that is correct. The key is poorly named; the name appears to come from the fact that auto-generated IDs are currently sequential, so job ID 1000 would be the 1000th job. This PR changes that, but it keeps the key name to maximize compatibility: the stat now holds the actual count rather than the last job ID from which you would infer the count.

Contributor commented:

Where is that information ("the number of jobs that don't have a custom ID") used?

key 2 -> bq:name:jobs
key 3 -> bq:name:delayed
key 4 -> bq:name:earlierDelayed
@@ -9,10 +9,8 @@ arg 3 -> job delay timestamp
]]

local jobId = ARGV[1]
if jobId == "" then
jobId = "" .. redis.call("incr", KEYS[1])
end
if redis.call("hexists", KEYS[2], jobId) == 1 then return nil end
-- An empty string is truthy in Lua, so compare against "" explicitly.
if KEYS[1] ~= "" then redis.call("incr", KEYS[1]) end
redis.call("hset", KEYS[2], jobId, ARGV[2])
redis.call("zadd", KEYS[3], tonumber(ARGV[3]), jobId)

10 changes: 3 additions & 7 deletions lib/lua/addJob.lua
@@ -1,17 +1,13 @@
--[[
key 1 -> bq:name:id (job ID counter)
key 1 -> bq:name:id (job counter)
key 2 -> bq:name:jobs
key 3 -> bq:name:waiting
arg 1 -> job id
arg 2 -> job data
]]

local jobId = ARGV[1]
if jobId == "" then
jobId = "" .. redis.call("incr", KEYS[1])
end
if redis.call("hexists", KEYS[2], jobId) == 1 then return nil end
if redis.call("hexists", KEYS[2], jobId) == 1 then return end
-- An empty string is truthy in Lua, so compare against "" explicitly.
if KEYS[1] ~= "" then redis.call("incr", KEYS[1]) end
redis.call("hset", KEYS[2], jobId, ARGV[2])
redis.call("lpush", KEYS[3], jobId)

return jobId
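
For readers less familiar with Redis scripting, the intended behavior of the new `addJob.lua` can be modeled in plain JavaScript. This is an in-memory approximation only: `makeStore` and the `isDefaultId` parameter are hypothetical, and the real script runs atomically inside Redis.

```js
// In-memory stand-in for the three Redis keys the script touches.
// Hypothetical model for illustration only.
function makeStore() {
  return {counter: 0, jobs: new Map(), waiting: []};
}

// Mirrors the intent of the new script: refuse duplicates, count
// default-ID jobs only, then store the data and queue the ID.
function addJob(store, jobId, data, isDefaultId) {
  if (store.jobs.has(jobId)) return null; // hexists -> return nil
  if (isDefaultId) store.counter += 1;    // incr bq:name:id
  store.jobs.set(jobId, data);            // hset bq:name:jobs
  store.waiting.unshift(jobId);           // lpush bq:name:waiting
  return jobId;
}

const store = makeStore();
const results = [
  addJob(store, 'a1b2c3', '{}', true),  // generated ID
  addJob(store, 'custom', '{}', false), // caller-supplied ID
  addJob(store, 'custom', '{}', false), // duplicate, rejected
];
console.log(results, store.counter, store.waiting);
```

The duplicate check runs before the counter is touched, so a rejected job never inflates the count that `checkHealth` reports.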
51 changes: 25 additions & 26 deletions test/queue-test.js
@@ -1,6 +1,7 @@
import {describe} from 'ava-spec';

import Queue from '../lib/queue';
import Job from '../lib/job';
import helpers from '../lib/helpers';
import sinon from 'sinon';

@@ -443,7 +444,7 @@ describe('Queue', (it) => {
const jobs = spitter();
queue.process((job) => jobs.pushSuspend(job));

await queue.createJob({}).save();
const job = await queue.createJob({}).save();
const [, finishJob] = await jobs.shift();

await t.throws(queue.close(10));
@@ -454,9 +455,8 @@ describe('Queue', (it) => {
const errors = t.context.queueErrors,
count = errors.length;
t.context.queueErrors = errors.filter((err) => {
return (
err.message !== 'unable to update the status of succeeded job 1'
);
const ref = `unable to update the status of succeeded job ${job.id}`;
return err.message !== ref;
});
t.is(t.context.queueErrors.length, count - 1);
t.context.handleErrors(t);
@@ -561,12 +561,12 @@ describe('Queue', (it) => {
const waitJob = queue._waitForJob,
wait = helpers.deferred();
let waitDone = wait.defer();
queue._waitForJob = function (...args) {
queue._waitForJob = function () {
if (waitDone) {
waitDone();
waitDone = null;
}
return waitJob.apply(this, args);
return waitJob.call(this);
};

await wait;
@@ -1008,7 +1008,7 @@ describe('Queue', (it) => {
});

it.describe('removeJob', (it) => {
it('should not cause an error if immediately removed', async (t) => {
it('should not error if active then removed before running', async (t) => {
const queue = t.context.makeQueue();

queue.process(async (job) => {
@@ -1017,25 +1017,24 @@ describe('Queue', (it) => {
}
});

const waitJob = queue._waitForJob,
wait = helpers.deferred();
let waitDone = wait.defer();
queue._waitForJob = function (...args) {
if (waitDone) {
// Wrap Job.fromId. When our wrapper is called with 'deadjob'
// (from queue._waitForJob) we undo the wrapping and we remove
// the job (from 'active')
const wait = helpers.deferred();
const waitDone = wait.defer();
const fromId = Job.fromId;
Job.fromId = async (queue, jobId, cb) => {
if (jobId === 'deadjob') {
Job.fromId = fromId;
await queue.removeJob(jobId);
waitDone();
waitDone = null;
}
return waitJob.apply(this, args);
return fromId(queue, jobId, cb);
};

await queue.createJob({foo: 'bar'}).setId('deadjob').save();
await wait;

const job = queue.createJob({foo: 'bar'}).setId('deadjob');
await Promise.all([
job.save(),
queue.removeJob(job.id),
queue.createJob({foo: 'bar'}).setId('goodjob').save(),
]);
await queue.createJob({foo: 'bar'}).setId('goodjob').save();

const goodJob = await helpers.waitOn(queue, 'succeeded');
t.is(goodJob.id, 'goodjob');
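
The rewritten test relies on a wrapper around `Job.fromId` that restores the original as soon as it sees the target ID. A simplified, synchronous sketch of that wrap-and-self-restore pattern follows; `Registry`, `lookup`, and `interceptOnce` are hypothetical names used only for illustration.

```js
// Hypothetical stand-in for a class with a static lookup method.
const Registry = {
  lookup(id) {
    return `record:${id}`;
  },
};

const calls = [];

// Wrap Registry.lookup so the first call for `targetId` runs `hook`.
// The original is restored before the hook runs, so it fires at most once.
function interceptOnce(targetId, hook) {
  const original = Registry.lookup;
  Registry.lookup = function (id) {
    if (id === targetId) {
      Registry.lookup = original;
      hook(id);
    }
    return original.call(this, id);
  };
}

interceptOnce('deadjob', (id) => calls.push(id));
Registry.lookup('other');   // passes through, wrapper stays installed
Registry.lookup('deadjob'); // hook runs, original restored
Registry.lookup('deadjob'); // plain original, hook does not run again
console.log(calls); // [ 'deadjob' ]
```

Restoring before invoking the hook matters when the hook itself may trigger another lookup, as `queue.removeJob` could in the test above.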
@@ -1229,16 +1228,16 @@ describe('Queue', (it) => {
queue.jobs = new Map();

const failed = await queue.getJobs('failed');
const failedErrors = new Set(
failed.map((job) => {
const failedErrors = failed
.map((job) => {
t.is(job.options.stacktraces.length, 1);
return job.options.stacktraces[0];
})
);
.sort();

t.deepEqual(
failedErrors,
new Set([stack, 'has message', 'is string', true])
[stack, 'has message', 'is string', true].sort()
);
});

@@ -1292,7 +1291,7 @@ describe('Queue', (it) => {
const [[failedJob, err]] = fail.args;

t.truthy(failedJob);
t.is(job.id, '1');
t.is(job.id, failedJob.id);
t.is(failedJob.data.foo, 'bar');
t.is(err.message, `Job ${job.id} timed out (10 ms)`);
});