Skip to content

Commit

Permalink
refactor(brokers): make option props more correct (#10242)
Browse files Browse the repository at this point in the history
* refactor(brokers): make option props more correct

BREAKING CHANGE: Classes now take redis client as standalone parameter, various props from the base option interface moved to redis options

* chore: update comment

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
  • Loading branch information
didinele and kodiakhq[bot] committed May 11, 2024
1 parent 20258f9 commit 393ded4
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 52 deletions.
8 changes: 4 additions & 4 deletions packages/brokers/README.md
Expand Up @@ -40,7 +40,7 @@ pnpm add @discordjs/brokers
import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

const broker = new PubSubRedisBroker({ redisClient: new Redis() });
const broker = new PubSubRedisBroker(new Redis());

await broker.publish('test', 'Hello World!');
await broker.destroy();
Expand All @@ -49,7 +49,7 @@ await broker.destroy();
import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

const broker = new PubSubRedisBroker({ redisClient: new Redis() });
const broker = new PubSubRedisBroker(new Redis());
broker.on('test', ({ data, ack }) => {
console.log(data);
void ack();
Expand All @@ -65,7 +65,7 @@ await broker.subscribe('subscribers', ['test']);
import { RPCRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

const broker = new RPCRedisBroker({ redisClient: new Redis() });
const broker = new RPCRedisBroker(new Redis());

console.log(await broker.call('testcall', 'Hello World!'));
await broker.destroy();
Expand All @@ -74,7 +74,7 @@ await broker.destroy();
import { RPCRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

const broker = new RPCRedisBroker({ redisClient: new Redis() });
const broker = new RPCRedisBroker(new Redis());
broker.on('testcall', ({ data, ack, reply }) => {
console.log('responder', data);
void ack();
Expand Down
2 changes: 1 addition & 1 deletion packages/brokers/__tests__/index.test.ts
Expand Up @@ -17,7 +17,7 @@ const mockRedisClient = {
test('pubsub with custom encoding', async () => {
const encode = vi.fn((data) => data);

const broker = new PubSubRedisBroker({ redisClient: mockRedisClient, encode });
const broker = new PubSubRedisBroker(mockRedisClient, { encode });
await broker.publish('test', 'test');
expect(encode).toHaveBeenCalledWith('test');
});
18 changes: 0 additions & 18 deletions packages/brokers/src/brokers/Broker.ts
@@ -1,16 +1,11 @@
import { Buffer } from 'node:buffer';
import { randomBytes } from 'node:crypto';
import { encode, decode } from '@msgpack/msgpack';
import type { AsyncEventEmitter } from '@vladfrangu/async_event_emitter';

/**
* Base options for a broker implementation
*/
export interface BaseBrokerOptions {
/**
* How long to block for messages when polling
*/
blockTimeout?: number;
/**
* Function to use for decoding messages
*/
Expand All @@ -21,25 +16,12 @@ export interface BaseBrokerOptions {
*/
// eslint-disable-next-line @typescript-eslint/method-signature-style
encode?: (data: unknown) => Buffer;
/**
* Max number of messages to poll at once
*/
maxChunk?: number;
/**
* Unique consumer name.
*
* @see {@link https://redis.io/commands/xreadgroup/}
*/
name?: string;
}

/**
* Default broker options
*/
export const DefaultBrokerOptions = {
name: randomBytes(20).toString('hex'),
maxChunk: 10,
blockTimeout: 5_000,
encode: (data): Buffer => {
const encoded = encode(data);
return Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength);
Expand Down
42 changes: 33 additions & 9 deletions packages/brokers/src/brokers/redis/BaseRedis.ts
@@ -1,4 +1,5 @@
import type { Buffer } from 'node:buffer';
import { randomBytes } from 'node:crypto';
import { readFileSync } from 'node:fs';
import { resolve } from 'node:path';
import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter';
Expand All @@ -19,11 +20,31 @@ declare module 'ioredis' {
*/
export interface RedisBrokerOptions extends BaseBrokerOptions {
/**
* The Redis client to use
* How long to block for messages when polling
*/
redisClient: Redis;
blockTimeout?: number;
/**
* Max number of messages to poll at once
*/
maxChunk?: number;
/**
* Unique consumer name.
*
* @see {@link https://redis.io/commands/xreadgroup/}
*/
name?: string;
}

/**
* Default broker options for redis
*/
export const DefaultRedisBrokerOptions = {
...DefaultBrokerOptions,
name: randomBytes(20).toString('hex'),
maxChunk: 10,
blockTimeout: 5_000,
} as const satisfies Required<RedisBrokerOptions>;

/**
* Helper class with shared Redis logic
*/
Expand Down Expand Up @@ -56,14 +77,17 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
*/
protected listening = false;

public constructor(options: RedisBrokerOptions) {
public constructor(
protected readonly redisClient: Redis,
options: RedisBrokerOptions,
) {
super();
this.options = { ...DefaultBrokerOptions, ...options };
options.redisClient.defineCommand('xcleangroup', {
this.options = { ...DefaultRedisBrokerOptions, ...options };
redisClient.defineCommand('xcleangroup', {
numberOfKeys: 1,
lua: readFileSync(resolve(__dirname, '..', 'scripts', 'xcleangroup.lua'), 'utf8'),
});
this.streamReadClient = options.redisClient.duplicate();
this.streamReadClient = redisClient.duplicate();
}

/**
Expand All @@ -75,7 +99,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
events.map(async (event) => {
this.subscribedEvents.add(event as string);
try {
return await this.options.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM');
return await this.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM');
} catch (error) {
if (!(error instanceof ReplyError)) {
throw error;
Expand All @@ -97,7 +121,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
commands[idx + 1] = ['xcleangroup', event as string, group];
}

await this.options.redisClient.pipeline(commands).exec();
await this.redisClient.pipeline(commands).exec();

for (const event of events) {
this.subscribedEvents.delete(event as string);
Expand Down Expand Up @@ -162,7 +186,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
*/
public async destroy() {
this.streamReadClient.disconnect();
this.options.redisClient.disconnect();
this.redisClient.disconnect();
}

/**
Expand Down
13 changes: 4 additions & 9 deletions packages/brokers/src/brokers/redis/PubSubRedis.ts
Expand Up @@ -11,7 +11,7 @@ import { BaseRedisBroker } from './BaseRedis.js';
* import { PubSubRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new PubSubRedisBroker({ redisClient: new Redis() });
* const broker = new PubSubRedisBroker(new Redis());
*
* await broker.publish('test', 'Hello World!');
* await broker.destroy();
Expand All @@ -20,7 +20,7 @@ import { BaseRedisBroker } from './BaseRedis.js';
* import { PubSubRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new PubSubRedisBroker({ redisClient: new Redis() });
* const broker = new PubSubRedisBroker(new Redis());
* broker.on('test', ({ data, ack }) => {
* console.log(data);
* void ack();
Expand All @@ -37,19 +37,14 @@ export class PubSubRedisBroker<TEvents extends Record<string, any>>
* {@inheritDoc IPubSubBroker.publish}
*/
public async publish<Event extends keyof TEvents>(event: Event, data: TEvents[Event]): Promise<void> {
await this.options.redisClient.xadd(
event as string,
'*',
BaseRedisBroker.STREAM_DATA_KEY,
this.options.encode(data),
);
await this.redisClient.xadd(event as string, '*', BaseRedisBroker.STREAM_DATA_KEY, this.options.encode(data));
}

protected emitEvent(id: Buffer, group: string, event: string, data: unknown) {
const payload: { ack(): Promise<void>; data: unknown } = {
data,
ack: async () => {
await this.options.redisClient.xack(event, group, id);
await this.redisClient.xack(event, group, id);
},
};

Expand Down
22 changes: 11 additions & 11 deletions packages/brokers/src/brokers/redis/RPCRedis.ts
@@ -1,9 +1,9 @@
import type { Buffer } from 'node:buffer';
import { clearTimeout, setTimeout } from 'node:timers';
import type Redis from 'ioredis/built/Redis.js';
import type { IRPCBroker } from '../Broker.js';
import { DefaultBrokerOptions } from '../Broker.js';
import type { RedisBrokerOptions } from './BaseRedis.js';
import { BaseRedisBroker } from './BaseRedis.js';
import { BaseRedisBroker, DefaultRedisBrokerOptions } from './BaseRedis.js';

interface InternalPromise {
reject(error: any): void;
Expand All @@ -22,9 +22,9 @@ export interface RPCRedisBrokerOptions extends RedisBrokerOptions {
* Default values used for the {@link RPCRedisBrokerOptions}
*/
export const DefaultRPCRedisBrokerOptions = {
...DefaultBrokerOptions,
...DefaultRedisBrokerOptions,
timeout: 5_000,
} as const satisfies Required<Omit<RPCRedisBrokerOptions, 'redisClient'>>;
} as const satisfies Required<RPCRedisBrokerOptions>;

/**
* RPC broker powered by Redis
Expand All @@ -35,7 +35,7 @@ export const DefaultRPCRedisBrokerOptions = {
* import { RPCRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new RPCRedisBroker({ redisClient: new Redis() });
* const broker = new RPCRedisBroker(new Redis());
*
* console.log(await broker.call('testcall', 'Hello World!'));
* await broker.destroy();
Expand All @@ -44,7 +44,7 @@ export const DefaultRPCRedisBrokerOptions = {
* import { RPCRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new RPCRedisBroker({ redisClient: new Redis() });
* const broker = new RPCRedisBroker(new Redis());
* broker.on('testcall', ({ data, ack, reply }) => {
* console.log('responder', data);
* void ack();
Expand All @@ -65,8 +65,8 @@ export class RPCRedisBroker<TEvents extends Record<string, any>, TResponses exte

protected readonly promises = new Map<string, InternalPromise>();

public constructor(options: RPCRedisBrokerOptions) {
super(options);
public constructor(redisClient: Redis, options: RPCRedisBrokerOptions) {
super(redisClient, options);
this.options = { ...DefaultRPCRedisBrokerOptions, ...options };

this.streamReadClient.on('messageBuffer', (channel: Buffer, message: Buffer) => {
Expand All @@ -88,7 +88,7 @@ export class RPCRedisBroker<TEvents extends Record<string, any>, TResponses exte
data: TEvents[Event],
timeoutDuration: number = this.options.timeout,
): Promise<TResponses[Event]> {
const id = await this.options.redisClient.xadd(
const id = await this.redisClient.xadd(
event as string,
'*',
BaseRedisBroker.STREAM_DATA_KEY,
Expand Down Expand Up @@ -118,10 +118,10 @@ export class RPCRedisBroker<TEvents extends Record<string, any>, TResponses exte
const payload: { ack(): Promise<void>; data: unknown; reply(data: unknown): Promise<void> } = {
data,
ack: async () => {
await this.options.redisClient.xack(event, group, id);
await this.redisClient.xack(event, group, id);
},
reply: async (data) => {
await this.options.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));
await this.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));
},
};

Expand Down

0 comments on commit 393ded4

Please sign in to comment.