feat: add external limiter support #183

Open · wants to merge 1 commit into base: master
53 changes: 52 additions & 1 deletion README.md
@@ -37,6 +37,7 @@ Thanks to the folks at [Mixmax](https://mixmax.com), Bee-Queue is once again bei
- Concurrent processing
- Job timeouts, retries, and retry strategies
- Scheduled jobs
- External rate limiter support
- Pass events via Pub/Sub
- Progress reporting
- Send job results back to producers
@@ -189,6 +190,48 @@ subQueue.process(10, function (job, done) {
});
```

## Use With a Rate Limiter

You can integrate Bee-Queue with any external rate limiter of your choice to throttle jobs.

`Queue.process` accepts an optional limiter query function as a final argument after the handler. If provided, it runs just before each job executes, letting you consult an external limiter and reschedule the job when it is not cleared to run. Because jobs that are not ready are rescheduled rather than held, the queue can rapidly scan for runnable jobs instead of tying up its concurrency with waiting ones.

The limiter query function can use a callback or return a promise. Either way, it should produce an object containing at minimum a boolean `ready` property: `true` if the job is ready to run.

```js
limiterQueue.process(
async (job) => {
console.log(`Processing job ${job.id} after ${job.data.tries} tries`);
// Do some work
},
async (job) => {
const tries = job.data.tries + 1;
const data = { ...job.data, tries };
// You can plug in any non-blocking external rate limiter here
const result = await limiter.check();
if (result.ok) {
return {
ready: true,
// (optional) this will replace the existing job data
data
}
} else {
return {
ready: false,
// (optional) this will replace the existing job data
data,
// (optional) reschedule the job to try again at this timestamp
// if 0 or not specified, the job goes to the end of the waiting queue
delayUntil: Date.now() + result.msUntilReady
}
}
}
);
```
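The `limiter` object above is a placeholder for whatever rate limiter you use. As a rough illustration of the interface the example assumes (a `check()` method resolving to `{ ok, msUntilReady }` — these names are hypothetical, not part of Bee-Queue), here is a minimal in-memory token bucket. In production you would typically back this with Redis or a dedicated rate-limiting service so that all workers share one budget:

```javascript
// Minimal in-memory token bucket matching the hypothetical
// `limiter.check()` interface used in the example above.
class TokenBucketLimiter {
  constructor({ capacity, refillIntervalMs }) {
    this.capacity = capacity; // max tokens available at once
    this.refillIntervalMs = refillIntervalMs; // one token restored per interval
    this.tokens = capacity;
    this.lastRefill = Date.now();
  }

  async check() {
    // Restore tokens based on elapsed time since the last refill
    const now = Date.now();
    const refilled = Math.floor((now - this.lastRefill) / this.refillIntervalMs);
    if (refilled > 0) {
      this.tokens = Math.min(this.capacity, this.tokens + refilled);
      this.lastRefill = now;
    }
    if (this.tokens > 0) {
      this.tokens -= 1;
      return { ok: true, msUntilReady: 0 };
    }
    // Out of tokens: report how long until the next one is restored
    return { ok: false, msUntilReady: this.refillIntervalMs - (now - this.lastRefill) };
  }
}
```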

Additionally, if the limiter query promise rejects, or an error is passed to its callback, the job is immediately failed with no retry logic.
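Because a rejection fails the job outright, you may want to decide explicitly whether limiter outages should count as job failures. Here is a sketch of a wrapper (the name and the five-second back-off are arbitrary choices, not part of Bee-Queue) that instead treats limiter errors as "not ready":

```javascript
// Wrap a limiter query so that errors thrown by the limiter itself delay
// the job instead of failing it. `innerQuery` is any promise-returning
// limiter query function as described above.
function tolerantLimiterQuery(innerQuery, { retryDelayMs = 5000 } = {}) {
  return async (job) => {
    try {
      return await innerQuery(job);
    } catch (err) {
      // Limiter unavailable: treat the job as rate-limited and retry later
      return { ready: false, delayUntil: Date.now() + retryDelayMs };
    }
  };
}
```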


## Progress Reporting

Handlers can send progress reports, which will be received as events on the original job instance:
@@ -508,7 +551,7 @@ Looks up jobs by their queue type. When looking up jobs of type `waiting`, `acti

Note that large values of the attributes of `page` may cause excess load on the Redis server.

-#### Queue#process([concurrency], handler(job, done))
+#### Queue#process([concurrency], handler(job, done), [limiterQuery(job, done)])

Begins processing jobs with the provided handler function.

@@ -527,6 +570,14 @@ The handler function should either:

_N.B. If the handler returns a `Promise`, calls to the `done` callback will be ignored._

The limiter query function works the same way as the handler function above, except that its promise (or callback) should produce an object with the following properties:

- `ready`: boolean, `true` if the job is ready to run
- `delayUntil`: (optional) Date or integer, the timestamp at which the job should be rescheduled. If 0 or not specified, the job is placed at the end of the waiting queue.
- `data`: (optional) any, replaces `job.data` if present

If the promise rejects or the callback returns an error, the job will be immediately failed without any retry logic.
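For the callback style, a limiter query might look like the following sketch (the `tokens` counter stands in for a real limiter's state):

```javascript
// Callback-style limiter query: pass the result object (or an error)
// to `done` instead of returning a promise.
let tokens = 1; // stand-in for a real limiter's shared state

function limiterQuery(job, done) {
  if (tokens > 0) {
    tokens -= 1;
    done(null, { ready: true });
  } else {
    // Not ready: send the job back, retrying in one second
    done(null, { ready: false, delayUntil: Date.now() + 1000 });
  }
}
```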

#### Queue#checkStalledJobs([interval], [cb])

Checks for jobs that appear to be stalling and thus need to be retried, then re-enqueues them.
28 changes: 24 additions & 4 deletions index.d.ts
@@ -29,10 +29,24 @@ declare class BeeQueue {
getJobs(type: string, page: BeeQueue.Page, cb: (jobs: BeeQueue.Job[]) => void): void;
getJobs(type: string, page: BeeQueue.Page): Promise<BeeQueue.Job[]>;

process<T>(handler: (job: BeeQueue.Job) => Promise<T>): void;
process<T>(concurrency: number, handler: (job: BeeQueue.Job) => Promise<T>): void;
process<T>(handler: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<T>) => void): void;
process<T>(concurrency: number, handler: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<T>) => void): void;
process<T>(
handler: (job: BeeQueue.Job) => Promise<T>,
limiterQuery?: (job: BeeQueue.Job) => Promise<BeeQueue.LimiterQueryResult>
): void;
process<T>(
concurrency: number,
handler: (job: BeeQueue.Job) => Promise<T>,
limiterQuery?: (job: BeeQueue.Job) => Promise<BeeQueue.LimiterQueryResult>
): void;
process<T>(
handler: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<T>) => void,
limiterQuery?: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<BeeQueue.LimiterQueryResult>) => void
): void;
process<T>(
concurrency: number,
handler: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<T>) => void,
limiterQuery?: (job: BeeQueue.Job, done: BeeQueue.DoneCallback<BeeQueue.LimiterQueryResult>) => void
): void;

checkStalledJobs(interval?: number): Promise<number>;
checkStalledJobs(interval: number, cb: (err: Error, numStalled: number) => void): void
@@ -110,6 +124,12 @@ declare namespace BeeQueue {
newestJob?: string;
}

interface LimiterQueryResult {
ready: boolean;
delayUntil?: Date | number;
data?: any;
}

type DoneCallback<T> = (error: Error | null, result?: T) => void;
}

160 changes: 134 additions & 26 deletions lib/queue.js
@@ -494,6 +494,77 @@ class Queue extends Emitter {
return jobPromise;
}

_checkLimiter(job, limiterQuery) {
// Returns a promise that resolves to true if the job is ready to run
// otherwise false
return limiterQuery(job)
.then(result => {
if (result.ready) {
if (result.data) {
// Update the job if data has changed
job.data = result.data;
this.jobs.set(job.id, job);
const promise = helpers.deferred();
this.client.multi()
.hset(this.toKey('jobs'), job.id, job.toData())
.exec(promise.defer());
return promise.then(() => true);
}
return true;
} else {
// The job is not ready to run
// remove it from the active jobs list
const multi = this.client.multi();
this._deactivateJob(job, multi);
// If the retry is delayed, add it to the delayed queue
if (result.delayUntil) {
job.delayUntil(result.delayUntil);
}
if (job.options.delay) {
this._delayJob(job, multi, job.options.delay);
} else {
// Otherwise move it to the end of the waiting queue
multi.lpush(this.toKey('waiting'), job.id);
}
// If the job data has been modified, save it
if (result.data) {
job.data = result.data;
multi.hset(this.toKey('jobs'), job.id, job.toData());
}
this.jobs.set(job.id, job);
const promise = helpers.deferred();
multi.exec(promise.defer());
return promise.then(() => {
this.emit('limited', job, job.options.delay || 0);
return false;
});
}
}, (err) => {
// If the limiterQuery function rejects,
// fail the job immediately with no retry
const errInfo = err.stack || err.message || err;
job.options.stacktraces.unshift(errInfo);
const multi = this.client.multi();
this._deactivateJob(job, multi);
this._removeFailedJob(job, multi);
/* istanbul ignore else */
if (this.settings.sendEvents) {
const jobEvent = {
id: job.id,
event: job.status,
data: errInfo
};
multi.publish(this.toKey('events'), JSON.stringify(jobEvent));
}
const promise = helpers.deferred();
multi.exec(promise.defer());
return promise.then(() => {
this.emit('error', err);
return false;
});
});
}

_preventStall(jobId) {
const promise = helpers.deferred(), cb = promise.defer();
this.client.srem(this.toKey('stalling'), jobId, cb);
@@ -508,9 +579,8 @@
throw new Error(`unable to update the status of ${status} job ${job.id}`);
}

-const multi = this.client.multi()
-.lrem(this.toKey('active'), 0, job.id)
-.srem(this.toKey('stalling'), job.id);
+const multi = this.client.multi();
+this._deactivateJob(job, multi);

const jobEvent = {
id: job.id,
@@ -530,13 +600,7 @@
: null;
const delay = strategy ? strategy(job) : -1;
if (delay < 0) {
-job.status = 'failed';
-if (this.settings.removeOnFailure) {
-multi.hdel(this.toKey('jobs'), job.id);
-} else {
-multi.hset(this.toKey('jobs'), job.id, job.toData());
-multi.sadd(this.toKey('failed'), job.id);
-}
+this._removeFailedJob(job, multi);
} else {
job.options.retries -= 1;
job.status = 'retrying';
@@ -546,8 +610,7 @@
multi.lpush(this.toKey('waiting'), job.id);
} else {
const time = Date.now() + delay;
-multi.zadd(this.toKey('delayed'), time, job.id)
-.publish(this.toKey('earlierDelayed'), time);
+this._delayJob(job, multi, time);
}
}
} else {
@@ -572,7 +635,7 @@
return promise.then(() => [status, result]);
}

-process(concurrency, handler) {
+process(concurrency, handler, limiterQuery) {
if (!this.settings.isWorker) {
throw new Error('Cannot call Queue#process on a non-worker');
}
@@ -586,6 +649,7 @@
}

if (typeof concurrency === 'function') {
limiterQuery = handler;
handler = concurrency;
concurrency = defaults['#process'].concurrency;
}
@@ -598,6 +662,10 @@
this.queued = 1;
this.concurrency = concurrency;

if (limiterQuery) {
this.limiterQuery = helpers.wrapAsync(limiterQuery, catchExceptions);
}

const jobTick = () => {
if (this.paused) {
this.queued -= 1;
@@ -607,6 +675,21 @@
// invariant: in this code path, this.running < this.concurrency, always
// after spoolup, this.running + this.queued === this.concurrency
this._getNextJob().then((job) => {
const execJob = () => {
return this._runJob(job).catch((err) => {
this.emit('error', err);
}).then((results) => {
this.running -= 1;
this.queued += 1;
setImmediate(jobTick);
/* istanbul ignore else */
if (results) {
const status = results[0], result = results[1];
this.emit(status, job, result);
}
});
};

// We're shutting down.
if (this.paused) {
// This job will get picked up later as a stalled job if we happen to
@@ -633,20 +716,23 @@
return;
}

-return this._runJob(job).catch((err) => {
-this.emit('error', err);
-}).then((results) => {
-this.running -= 1;
-this.queued += 1;
-
-setImmediate(jobTick);
+// If a limiterQuery function is provided, check the limiter before
+// running the job
+if (this.limiterQuery) {
+return this._checkLimiter(job, this.limiterQuery)
+.then(ready => {
+if (ready) {
+return execJob();
+} else {
+this.running -= 1;
+this.queued += 1;
+setImmediate(jobTick);
+}
+});
+} else {
+return execJob();
+}
-
-/* istanbul ignore else */
-if (results) {
-const status = results[0], result = results[1];
-this.emit(status, job, result);
-}
-});
}, (err) => {
setImmediate(jobTick);
throw err;
@@ -659,6 +745,28 @@
return this;
}

_deactivateJob(job, multi) {
multi
.lrem(this.toKey('active'), 0, job.id)
.srem(this.toKey('stalling'), job.id);
}

_removeFailedJob(job, multi) {
job.status = 'failed';
if (this.settings.removeOnFailure) {
multi.hdel(this.toKey('jobs'), job.id);
} else {
multi.hset(this.toKey('jobs'), job.id, job.toData());
multi.sadd(this.toKey('failed'), job.id);
}
}

_delayJob(job, multi, time) {
multi
.zadd(this.toKey('delayed'), time, job.id)
.publish(this.toKey('earlierDelayed'), time);
}

_doStalledJobCheck() {
return this._evalScript('checkStalledJobs', 4, this.toKey('stallBlock'),
this.toKey('stalling'), this.toKey('waiting'), this.toKey('active'),