Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@uppy/companion: switch from node-redis to ioredis #4623

Merged
merged 12 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e/mock-server.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const requestListener = (req, res) => {
export default function startMockServer (host, port) {
const server = http.createServer(requestListener)
server.listen(port, host, () => {
console.log(`Server is running on http://${host}:${port}`)
console.log(`Mock server is running on http://${host}:${port}`)
})
}

Expand Down
2 changes: 1 addition & 1 deletion packages/@uppy/companion/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"got": "^13.0.0",
"grant": "5.4.22",
"helmet": "^4.6.0",
"ioredis": "^5.3.2",
"ipaddr.js": "^2.0.1",
"jsonwebtoken": "9.0.2",
"lodash": "^4.17.21",
Expand All @@ -60,7 +61,6 @@
"ms": "2.1.3",
"node-schedule": "2.1.1",
"prom-client": "14.0.1",
"redis": "4.6.13",
"serialize-error": "^2.1.0",
"serialize-javascript": "^6.0.0",
"tus-js-client": "^3.1.3",
Expand Down
7 changes: 4 additions & 3 deletions packages/@uppy/companion/src/server/Uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ class StreamableBlob {
}

class Uploader {
/** @type {import('ioredis').Redis} */
storage

/**
* Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
* For tus uploads, the deferredLength option is enabled, because file size value can be unreliable
Expand Down Expand Up @@ -446,9 +449,7 @@ class Uploader {
// https://github.com/transloadit/uppy/issues/3748
const keyExpirySec = 60 * 60 * 24
const redisKey = `${Uploader.STORAGE_PREFIX}:${this.token}`
this.storage.set(redisKey, jsonStringify(state), {
EX: keyExpirySec,
})
this.storage.set(redisKey, jsonStringify(state), 'EX', keyExpirySec)
}

throttledEmitProgress = throttle((dataToEmit) => {
Expand Down
38 changes: 29 additions & 9 deletions packages/@uppy/companion/src/server/emitter/redis-emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@ const logger = require('../logger')
* This module simulates the builtin events.EventEmitter but with the use of redis.
* This is useful for when companion is running on multiple instances and events need
* to be distributed across.
*
* @param {import('ioredis').Redis} redisClient
* @param {string} redisPubSubScope
* @returns
*/
module.exports = (redisClient, redisPubSubScope) => {
const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
const publisher = redisClient.duplicate()
publisher.on('error', err => logger.error('publisher redis error', err))
const publisher = redisClient.duplicate({ lazyConnect: true })
publisher.on('error', err => logger.error('publisher redis error', err.toString()))
/** @type {import('ioredis').Redis} */
let subscriber

const connectedPromise = publisher.connect().then(() => {
subscriber = publisher.duplicate()
subscriber.on('error', err => logger.error('subscriber redis error', err))
subscriber.on('error', err => logger.error('subscriber redis error', err.toString()))
return subscriber.connect()
})

Expand Down Expand Up @@ -55,20 +60,32 @@ module.exports = (redisClient, redisPubSubScope) => {
handlersByThisEventName.delete(handler)
if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)

return subscriber.pUnsubscribe(getPrefixedEventName(eventName), actualHandler)
subscriber.off('pmessage', actualHandler)
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
}

/**
*
* @param {string} eventName
* @param {*} handler
* @param {*} _once
*/
function addListener (eventName, handler, _once = false) {
function actualHandler (message) {
function actualHandler (pattern, channel, message) {
if (pattern !== getPrefixedEventName(eventName)) {
return
}

if (_once) removeListener(eventName, handler)
let args
try {
args = JSON.parse(message)
} catch (ex) {
return handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
return
}
return handler(...args)
handler(...args)
}

let handlersByThisEventName = handlersByEvent.get(eventName)
Expand All @@ -78,7 +95,10 @@ module.exports = (redisClient, redisPubSubScope) => {
}
handlersByThisEventName.set(handler, actualHandler)

runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler))
runWhenConnected(() => {
subscriber.on('pmessage', actualHandler)
return subscriber.psubscribe(getPrefixedEventName(eventName))
})
}

/**
Expand Down Expand Up @@ -134,7 +154,7 @@ module.exports = (redisClient, redisPubSubScope) => {

return runWhenConnected(() => {
handlersByEvent.delete(eventName)
return subscriber.pUnsubscribe(getPrefixedEventName(eventName))
return subscriber.punsubscribe(getPrefixedEventName(eventName))
})
}

Expand Down
37 changes: 14 additions & 23 deletions packages/@uppy/companion/src/server/redis.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,34 @@
const redis = require('redis')
const Redis = require('ioredis').default

const logger = require('./logger')

/** @type {import('ioredis').Redis} */
let redisClient

/**
* A Singleton module that provides a single redis client through out
* the lifetime of the server
*
* @param {{ redisUrl?: string, redisOptions?: Record<string, any> }} [companionOptions] options
* @param {string} [redisUrl] ioredis url
* @param {Record<string, any>} [redisOptions] ioredis client options
*/
function createClient (companionOptions) {
function createClient (redisUrl, redisOptions) {
if (!redisClient) {
const { redisUrl, redisOptions } = companionOptions
redisClient = redis.createClient({
...redisOptions,
...(redisUrl && { url: redisUrl }),
})

redisClient.on('error', err => logger.error('redis error', err))

;(async () => {
try {
// fire and forget.
// any requests made on the client before connection is established will be auto-queued by node-redis
await redisClient.connect()
} catch (err) {
logger.error(err.message, 'redis.error')
}
})()
if (redisUrl) {
redisClient = new Redis(redisUrl, redisOptions)
} else {
redisClient = new Redis(redisOptions)
}
redisClient.on('error', err => logger.error('redis error', err.toString()))
}

return redisClient
}

module.exports.client = (companionOptions) => {
if (!companionOptions?.redisUrl && !companionOptions?.redisOptions) {
module.exports.client = ({ redisUrl, redisOptions } = { redisUrl: undefined, redisOptions: undefined }) => {
if (!redisUrl && !redisOptions) {
return redisClient
}

return createClient(companionOptions)
return createClient(redisUrl, redisOptions)
}
4 changes: 2 additions & 2 deletions packages/@uppy/companion/src/standalone/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ const getConfigFromEnv = () => {
periodicPingCount: process.env.COMPANION_PERIODIC_PING_COUNT
? parseInt(process.env.COMPANION_PERIODIC_PING_COUNT, 10) : undefined,
filePath: process.env.COMPANION_DATADIR,
redisUrl: process.env.COMPANION_REDIS_URL,
redisPubSubScope: process.env.COMPANION_REDIS_PUBSUB_SCOPE,
// redisOptions refers to https://www.npmjs.com/package/redis#options-object-properties
redisUrl: process.env.COMPANION_REDIS_URL,
// redisOptions refers to https://redis.github.io/ioredis/index.html#RedisOptions
redisOptions: (() => {
try {
if (!process.env.COMPANION_REDIS_OPTIONS) {
Expand Down