Skip to content

Commit

Permalink
feat(queue): allow manual connection (#708)
Browse files Browse the repository at this point in the history
  • Loading branch information
billyen2012 committed Nov 2, 2023
1 parent ae047a1 commit 425fb89
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 50 deletions.
17 changes: 17 additions & 0 deletions README.md
Expand Up @@ -322,6 +322,7 @@ const queue = new Queue('test', {
removeOnSuccess: false,
removeOnFailure: false,
redisScanCount: 100,
autoConnect: true,
});
```

Expand Down Expand Up @@ -349,6 +350,7 @@ The `settings` fields are:
- `removeOnFailure`: boolean. Enable to have this worker automatically remove its failed jobs from Redis, so as to keep memory usage down. This will not remove jobs that are set to retry unless they fail all their retries.
- `quitCommandClient`: boolean. Whether to `QUIT` the redis command client (the client it sends normal operations over) when `Queue#close` is called. This defaults to `true` for normal usage, and `false` if an existing `RedisClient` object was provided to the `redis` option.
- `redisScanCount`: number. For setting the value of the `SSCAN` Redis command used in `Queue#getJobs` for succeeded and failed job types.
- `autoConnect`: if set to `false`, then `queue.connect()` must be called to connect to the redis host. This is useful when the timing of connection to the redis need to be strictly controlled.

### Properties

Expand Down Expand Up @@ -604,6 +606,21 @@ process.on('uncaughtException', async () => {
});
```

#### Queue#connect()

Establish the queue's connections to Redis. Will only works if `settings.autoConnect` is set to `false`

```js
const Queue = require('bee-queue');
const queue = new Queue('example', {
redis: redis: redis.createClient(process.env.REDIS_URL),
autoConnect: false;
});
await queue.connect();
queue.createJob({...})
//....
```

#### Queue#isRunning()

Returns `true` unless the Queue is shutting down due to a call to `Queue#close()`.
Expand Down
2 changes: 2 additions & 0 deletions index.d.ts
Expand Up @@ -31,6 +31,7 @@ declare class BeeQueue<T = any> extends EventEmitter {

isRunning(): boolean;

connect(): Promise<boolean>;
createJob<U extends T>(data: U): BeeQueue.Job<U>;

getJob(jobId: string, cb: (job: BeeQueue.Job<T>) => void): void;
Expand Down Expand Up @@ -101,6 +102,7 @@ declare namespace BeeQueue {
removeOnFailure?: boolean;
quitCommandClient?: boolean;
redisScanCount?: number;
autoConnect?: boolean;
}

interface Job<T> extends EventEmitter {
Expand Down
8 changes: 8 additions & 0 deletions lib/helpers.js
Expand Up @@ -6,6 +6,13 @@ function has(object, name) {
return hasOwn.call(object, name);
}

function bool(input, defaultValue) {
if (typeof input === 'boolean') {
return input;
}
return defaultValue;
}

/**
* A variant of the Promise#finally implementation. Instead of rejecting with
* the error that occurs in the finally clause, it rejects with the error from
Expand All @@ -26,6 +33,7 @@ const promiseUtils = require('promise-callbacks');

module.exports = {
asCallback: promiseUtils.asCallback,
bool,
callAsync: promiseUtils.callAsync,
deferred: promiseUtils.deferred,
delay: promiseUtils.delay,
Expand Down
119 changes: 69 additions & 50 deletions lib/queue.js
Expand Up @@ -12,7 +12,7 @@ const EagerTimer = require('./eager-timer');
const finally_ = require('p-finally');

class Queue extends Emitter {
constructor(name, settings) {
constructor(name, settings = {}) {
super();

this.name = name;
Expand All @@ -33,13 +33,16 @@ class Queue extends Emitter {
this.bclient = null;
this.eclient = null;

settings = settings || {};
this.settings = {
redis: settings.redis || {},
quitCommandClient: settings.quitCommandClient,
keyPrefix: (settings.prefix || defaults.prefix) + ':' + this.name + ':',
autoConnect: helpers.bool(settings.autoConnect, true),
};

this._isReady = false;
this._ready = false;

for (const prop in defaults) {
const def = defaults[prop],
setting = settings[prop],
Expand Down Expand Up @@ -72,57 +75,73 @@ class Queue extends Emitter {
this._delayedTimer.on('trigger', this._activateDelayed.bind(this));
}

const makeClient = (clientName, createNew) => {
return redis
.createClient(this.settings.redis, createNew)
.then((client) => {
// This event gets cleaned up and removed in Queue#close for the
// primary client if quitCommandClient is disabled.
client.on('error', this._emitError);
return (this[clientName] = client);
});
};

let eventsPromise = null;

if (this.settings.getEvents || this.settings.activateDelayedJobs) {
eventsPromise = makeClient('eclient', true).then(() => {
this.eclient.on('message', this._onMessage.bind(this));
const channels = [];
if (this.settings.getEvents) {
channels.push(this.toKey('events'));
}
if (this.settings.activateDelayedJobs) {
channels.push(this.toKey('earlierDelayed'));
}
return Promise.all(
channels.map((channel) =>
helpers.callAsync((done) => this.eclient.subscribe(channel, done))
)
);
});
if (this.settings.autoConnect) {
this.connect();
}
}

this._isReady = false;
makeClient(clientName, createNew) {
return redis.createClient(this.settings.redis, createNew).then((client) => {
// This event gets cleaned up and removed in Queue#close for the
// primary client if quitCommandClient is disabled.
client.on('error', this._emitError);
return (this[clientName] = client);
});
}

// Wait for Lua scripts and client connections to load. Also wait for
// bclient and eclient/subscribe if they're needed.
this._ready = Promise.all([
// Make the clients
makeClient('client', false),
this.settings.isWorker ? makeClient('bclient', true) : null,
eventsPromise,
])
.then(() => {
if (this.settings.ensureScripts) {
return lua.buildCache(this.client);
}
})
.then(() => {
this._isReady = true;
setImmediate(() => this.emit('ready'));
return this;
});
connect() {
return new Promise((resolve, reject) => {
try {
if (this._isReady) return resolve(this._isReady);

const getEventPromise = () => {
if (this.settings.getEvents || this.settings.activateDelayedJobs) {
return this.makeClient('eclient', true).then(() => {
this.eclient.on('message', this._onMessage.bind(this));
const channels = [];
if (this.settings.getEvents) {
channels.push(this.toKey('events'));
}
if (this.settings.activateDelayedJobs) {
channels.push(this.toKey('earlierDelayed'));
}
return Promise.all(
channels.map((channel) =>
helpers.callAsync((done) =>
this.eclient.subscribe(channel, done)
)
)
);
});
}

return null;
};

const eventsPromise = getEventPromise();
// Wait for Lua scripts and client connections to load. Also wait for
// bclient and eclient/subscribe if they're needed.
this._ready = Promise.all([
// Make the clients
this.makeClient('client', false),
this.settings.isWorker ? this.makeClient('bclient', true) : null,
eventsPromise,
])
.then(() => {
if (this.settings.ensureScripts) {
return lua.buildCache(this.client);
}
})
.then(() => {
this._isReady = true;
setImmediate(() => this.emit('ready'));
resolve(this._isReady);
return this;
});
} catch (err) {
reject(err);
}
});
}

_onMessage(channel, message) {
Expand Down
28 changes: 28 additions & 0 deletions test/queue-test.js
Expand Up @@ -726,6 +726,34 @@ describe('Queue', (it) => {

await t.notThrows(() => queue.createJob().save());
});

it('should connect to redis automatically if autoConnect=true', async (t) => {
const client = actualRedis.createClient(redisUrl);

const queue = t.context.makeQueue({
redis: client,
autoConnect: true,
});

await queue.ready();
t.is(queue._isReady, true);
});

it('should connect to redis only if connect() is called while setting autoConnect=false', async (t) => {
const client = actualRedis.createClient(redisUrl);

const queue = t.context.makeQueue({
redis: client,
autoConnect: false,
});

await queue.ready();
t.is(queue._isReady, false);

await queue.connect();

t.is(queue._isReady, true);
});
});

it('adds a job with correct prefix', async (t) => {
Expand Down

0 comments on commit 425fb89

Please sign in to comment.