Skip to content

Commit

Permalink
fix(lost-jobs): Jobs get early uuid assignment; tests use job.id; mak…
Browse files Browse the repository at this point in the history
…e a removeJob test be deterministic
  • Loading branch information
hughsw committed Apr 19, 2020
1 parent 3430506 commit 20d411e
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 48 deletions.
6 changes: 6 additions & 0 deletions lib/helpers.js
Expand Up @@ -24,6 +24,11 @@ function finallyRejectsWithInitial(promise, fn) {

const promiseUtils = require('promise-callbacks');

// A simple way to get up to 14 random hex digits, not cryptographic
const randomHex14 = () => {
return Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(16);
};

module.exports = {
asCallback: promiseUtils.asCallback,
callAsync: promiseUtils.callAsync,
Expand All @@ -34,4 +39,5 @@ module.exports = {
waitOn: promiseUtils.waitOn,
withTimeout: promiseUtils.withTimeout,
wrapAsync: promiseUtils.wrapAsync,
makeJobId: randomHex14,
};
34 changes: 20 additions & 14 deletions lib/job.js
Expand Up @@ -52,49 +52,55 @@ class Job extends Emitter {
save(cb) {
const toKey = this.queue.toKey.bind(this.queue);

const isDefaultId = !this.id;
if (isDefaultId) {
this.id = helpers.makeJobId();
}

// We add to the stored-jobs set before addJob (or addDelayedJob) so that
// this job is already in queue.jobs. This supports the case of the job
// getting popped off 'waiting', being run, completing, and making callbacks
// (which use the id in queue.jobs) all before the _evalScript promise has
// resolved.
if (this.queue.settings.storeJobs) {
this.queue.jobs.set(this.id, this);
}

let promise;
if (this.options.delay) {
promise = this.queue._evalScript(
'addDelayedJob',
4,
toKey('id'),
isDefaultId ? toKey('id') : '',
toKey('jobs'),
toKey('delayed'),
toKey('earlierDelayed'),
this.id || '',
this.id,
this.toData(),
this.options.delay
);

if (this.queue.settings.activateDelayedJobs) {
promise = promise.then((jobId) => {
// Only reschedule if the job was actually created.
// Only reschedule if a new job was actually created.
if (jobId) {
this.queue._delayedTimer.schedule(this.options.delay);
}
return jobId;
});
}
} else {
promise = this.queue._evalScript(
'addJob',
3,
toKey('id'),
isDefaultId ? toKey('id') : '',
toKey('jobs'),
toKey('waiting'),
this.id || '',
this.id,
this.toData()
);
}

promise = promise.then((jobId) => {
this.id = jobId;
// If the jobId is not null, then store the job in the job map.
if (jobId && this.queue.settings.storeJobs) {
this.queue.jobs.set(jobId, this);
}
return this;
});
promise = promise.then(() => this);

if (cb) helpers.asCallback(promise, cb);
return promise;
Expand Down
6 changes: 2 additions & 4 deletions lib/lua/addDelayedJob.lua
@@ -1,5 +1,5 @@
--[[
key 1 -> bq:name:id (job ID counter)
key 1 -> bq:name:id (job counter)
key 2 -> bq:name:jobs
key 3 -> bq:name:delayed
key 4 -> bq:name:earlierDelayed
Expand All @@ -9,10 +9,8 @@ arg 3 -> job delay timestamp
]]

local jobId = ARGV[1]
if jobId == "" then
jobId = "" .. redis.call("incr", KEYS[1])
end
if redis.call("hexists", KEYS[2], jobId) == 1 then return nil end
if KEYS[1] then redis.call("incr", KEYS[1]) end
redis.call("hset", KEYS[2], jobId, ARGV[2])
redis.call("zadd", KEYS[3], tonumber(ARGV[3]), jobId)

Expand Down
10 changes: 3 additions & 7 deletions lib/lua/addJob.lua
@@ -1,17 +1,13 @@
--[[
key 1 -> bq:name:id (job ID counter)
key 1 -> bq:name:id (job counter)
key 2 -> bq:name:jobs
key 3 -> bq:name:waiting
arg 1 -> job id
arg 2 -> job data
]]

local jobId = ARGV[1]
if jobId == "" then
jobId = "" .. redis.call("incr", KEYS[1])
end
if redis.call("hexists", KEYS[2], jobId) == 1 then return nil end
if redis.call("hexists", KEYS[2], jobId) == 1 then return end
if KEYS[1] then redis.call("incr", KEYS[1]) end
redis.call("hset", KEYS[2], jobId, ARGV[2])
redis.call("lpush", KEYS[3], jobId)

return jobId
46 changes: 23 additions & 23 deletions test/queue-test.js
@@ -1,6 +1,7 @@
import {describe} from 'ava-spec';

import Queue from '../lib/queue';
import Job from '../lib/job';
import helpers from '../lib/helpers';
import sinon from 'sinon';

Expand Down Expand Up @@ -443,7 +444,7 @@ describe('Queue', (it) => {
const jobs = spitter();
queue.process((job) => jobs.pushSuspend(job));

await queue.createJob({}).save();
const job = await queue.createJob({}).save();
const [, finishJob] = await jobs.shift();

await t.throws(queue.close(10));
Expand All @@ -455,7 +456,7 @@ describe('Queue', (it) => {
count = errors.length;
t.context.queueErrors = errors.filter((err) => {
return (
err.message !== 'unable to update the status of succeeded job 1'
err.message !== `unable to update the status of succeeded job ${job.id}`
);
});
t.is(t.context.queueErrors.length, count - 1);
Expand Down Expand Up @@ -561,12 +562,12 @@ describe('Queue', (it) => {
const waitJob = queue._waitForJob,
wait = helpers.deferred();
let waitDone = wait.defer();
queue._waitForJob = function (...args) {
queue._waitForJob = function () {
if (waitDone) {
waitDone();
waitDone = null;
}
return waitJob.apply(this, args);
return waitJob.call(this);
};

await wait;
Expand Down Expand Up @@ -1008,34 +1009,33 @@ describe('Queue', (it) => {
});

it.describe('removeJob', (it) => {
it('should not cause an error if immediately removed', async (t) => {
it('should not error if active then removed before running', async (t) => {
const queue = t.context.makeQueue();

queue.process(async (job) => {
if (job.id === 'deadjob') {
t.fail('should not be able to process the job');
}
});

const waitJob = queue._waitForJob,
wait = helpers.deferred();
// Override Job.fromId. The first time fromId is called (from
// queue._waitForJob) we remove the job between when it is moved from
// 'waiting' to 'active' and when Queue._waitForJob() returns its id
const wait = helpers.deferred();
let waitDone = wait.defer();
queue._waitForJob = function (...args) {
const fromId = Job.fromId;
Job.fromId = async (queue, jobId, cb) => {
if (waitDone) {
await queue.removeJob(jobId);
waitDone();
waitDone = null;
}
return waitJob.apply(this, args);
return fromId(queue, jobId, cb);
};

await wait;
queue.process(async (job) => {
if (job.id === 'deadjob') {
t.fail('should not be able to process the job');
}
});

const job = queue.createJob({foo: 'bar'}).setId('deadjob');
await Promise.all([
job.save(),
queue.removeJob(job.id),
queue.createJob({foo: 'bar'}).setId('goodjob').save(),
]);
await queue.createJob({foo: 'bar'}).setId('deadjob').save();
await wait;
await queue.createJob({foo: 'bar'}).setId('goodjob').save();

const goodJob = await helpers.waitOn(queue, 'succeeded');
t.is(goodJob.id, 'goodjob');
Expand Down Expand Up @@ -1292,7 +1292,7 @@ describe('Queue', (it) => {
const [[failedJob, err]] = fail.args;

t.truthy(failedJob);
t.is(job.id, '1');
t.is(job.id, failedJob.id);
t.is(failedJob.data.foo, 'bar');
t.is(err.message, `Job ${job.id} timed out (10 ms)`);
});
Expand Down

0 comments on commit 20d411e

Please sign in to comment.