Skip to content

Commit

Permalink
Switch to ioredis
Browse files Browse the repository at this point in the history
  • Loading branch information
dschmidt committed Aug 14, 2023
1 parent 3f6606e commit 892a117
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 118 deletions.
2 changes: 1 addition & 1 deletion packages/@uppy/companion/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"got": "11",
"grant": "5.4.21",
"helmet": "^4.6.0",
"ioredis": "5.3.2",
"ipaddr.js": "^2.0.1",
"jsonwebtoken": "8.5.1",
"lodash": "^4.17.21",
Expand All @@ -62,7 +63,6 @@
"ms": "2.1.3",
"node-schedule": "2.1.0",
"prom-client": "14.0.1",
"redis": "4.2.0",
"semver": "7.5.3",
"serialize-error": "^2.1.0",
"serialize-javascript": "^6.0.0",
Expand Down
22 changes: 15 additions & 7 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ const logger = require('../logger')
module.exports = (redisClient, redisPubSubScope) => {
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
const publisher = redisClient.duplicate()
publisher.on('error', err => logger.error('publisher redis error', err))
const publisher = redisClient.duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
let subscriber

const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err))
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
return subscriber.connect()
})

Expand Down Expand Up @@ -55,12 +55,17 @@ module.exports = (redisClient, redisPubSubScope) => {
handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)

return subscriber.pUnsubscribe(getPrefixedEventName(eventName), actualHandler)
subscriber.off('message', actualHandler)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
}

function addListener (eventName, handler, _once = false) {
function actualHandler (message) {
function actualHandler (pattern, channel, message) {
if (pattern !== getPrefixedEventName(eventName)) {
return
}

if (_once) removeListener(eventName, handler)
let args
try {
Expand All @@ -78,7 +83,10 @@ module.exports = (redisClient, redisPubSubScope) => {
}
handlersByThisEventName.set(handler, actualHandler)

runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler))
runWhenConnected(() => {
subscriber.on('pmessage', actualHandler)
return subscriber.psubscribe(getPrefixedEventName(eventName))
})
}

/**
Expand Down Expand Up @@ -124,7 +132,7 @@ module.exports = (redisClient, redisPubSubScope) => {

return runWhenConnected(() => {
handlersByEvent.delete(eventName)
return subscriber.pUnsubscribe(getPrefixedEventName(eventName))
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
}

Expand Down
36 changes: 13 additions & 23 deletions packages/@uppy/companion/src/server/redis.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const redis = require('redis')
const Redis = require('ioredis').default

const logger = require('./logger')

Expand All @@ -8,36 +8,26 @@ let redisClient
* A Singleton module that provides a single redis client through out
* the lifetime of the server
*
* @param {{ redisUrl?: string, redisOptions?: Record<string, any> }} [companionOptions] options
* @param {string} [redisUrl] ioredis url
* @param {Record<string, any>} [redisOptions] ioredis client options
*/
function createClient (companionOptions) {
function createClient (redisUrl, redisOptions) {
if (!redisClient) {
const { redisUrl, redisOptions } = companionOptions
redisClient = redis.createClient({
...redisOptions,
...(redisUrl && { url: redisUrl }),
})

redisClient.on('error', err => logger.error('redis error', err))

;(async () => {
try {
// fire and forget.
// any requests made on the client before connection is established will be auto-queued by node-redis
await redisClient.connect()
} catch (err) {
logger.error(err.message, 'redis.error')
}
})()
if (redisUrl) {
redisClient = new Redis(redisUrl, redisOptions)
} else {
redisClient = new Redis(redisOptions)
}
redisClient.on('error', err => logger.error('redis error', err.toString()))
}

return redisClient
}

module.exports.client = (companionOptions) => {
if (!companionOptions?.redisUrl && !companionOptions?.redisOptions) {
module.exports.client = ({ redisUrl, redisOptions } = { redisUrl: undefined, redisOptions: undefined }) => {
if (!redisUrl && !redisOptions) {
return redisClient
}

return createClient(companionOptions)
return createClient(redisUrl, redisOptions)
}
4 changes: 2 additions & 2 deletions packages/@uppy/companion/src/standalone/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ const getConfigFromEnv = () => {
periodicPingCount: process.env.COMPANION_PERIODIC_PING_COUNT
? parseInt(process.env.COMPANION_PERIODIC_PING_COUNT, 10) : undefined,
filePath: process.env.COMPANION_DATADIR,
redisUrl: process.env.COMPANION_REDIS_URL,
redisPubSubScope: process.env.COMPANION_REDIS_PUBSUB_SCOPE,
// redisOptions refers to https://www.npmjs.com/package/redis#options-object-properties
redisUrl: process.env.COMPANION_REDIS_URL,
// redisOptions refers to https://redis.github.io/ioredis/index.html#RedisOptions
redisOptions: (() => {
try {
if (!process.env.COMPANION_REDIS_OPTIONS) {
Expand Down

0 comments on commit 892a117

Please sign in to comment.