Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(brokers): make option props more correct #10242

Merged
merged 3 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/brokers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pnpm add @discordjs/brokers
import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

const broker = new PubSubRedisBroker({ redisClient: new Redis() });
const broker = new PubSubRedisBroker(new Redis());

await broker.publish('test', 'Hello World!');
await broker.destroy();
Expand All @@ -49,7 +49,7 @@ await broker.destroy();
import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

const broker = new PubSubRedisBroker({ redisClient: new Redis() });
const broker = new PubSubRedisBroker(new Redis());
broker.on('test', ({ data, ack }) => {
console.log(data);
void ack();
Expand All @@ -65,7 +65,7 @@ await broker.subscribe('subscribers', ['test']);
import { RPCRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

const broker = new RPCRedisBroker({ redisClient: new Redis() });
const broker = new RPCRedisBroker(new Redis());

console.log(await broker.call('testcall', 'Hello World!'));
await broker.destroy();
Expand All @@ -74,7 +74,7 @@ await broker.destroy();
import { RPCRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis';

const broker = new RPCRedisBroker({ redisClient: new Redis() });
const broker = new RPCRedisBroker(new Redis());
broker.on('testcall', ({ data, ack, reply }) => {
console.log('responder', data);
void ack();
Expand Down
2 changes: 1 addition & 1 deletion packages/brokers/__tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const mockRedisClient = {
test('pubsub with custom encoding', async () => {
const encode = vi.fn((data) => data);

const broker = new PubSubRedisBroker({ redisClient: mockRedisClient, encode });
const broker = new PubSubRedisBroker(mockRedisClient, { encode });
await broker.publish('test', 'test');
expect(encode).toHaveBeenCalledWith('test');
});
18 changes: 0 additions & 18 deletions packages/brokers/src/brokers/Broker.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
import { Buffer } from 'node:buffer';
import { randomBytes } from 'node:crypto';
import { encode, decode } from '@msgpack/msgpack';
import type { AsyncEventEmitter } from '@vladfrangu/async_event_emitter';

/**
* Base options for a broker implementation
*/
export interface BaseBrokerOptions {
/**
* How long to block for messages when polling
*/
blockTimeout?: number;
/**
* Function to use for decoding messages
*/
Expand All @@ -21,25 +16,12 @@ export interface BaseBrokerOptions {
*/
// eslint-disable-next-line @typescript-eslint/method-signature-style
encode?: (data: unknown) => Buffer;
/**
* Max number of messages to poll at once
*/
maxChunk?: number;
/**
* Unique consumer name.
*
* @see {@link https://redis.io/commands/xreadgroup/}
*/
name?: string;
}

/**
* Default broker options
*/
export const DefaultBrokerOptions = {
name: randomBytes(20).toString('hex'),
maxChunk: 10,
blockTimeout: 5_000,
encode: (data): Buffer => {
const encoded = encode(data);
return Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength);
Expand Down
42 changes: 33 additions & 9 deletions packages/brokers/src/brokers/redis/BaseRedis.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Buffer } from 'node:buffer';
import { randomBytes } from 'node:crypto';
import { readFileSync } from 'node:fs';
import { resolve } from 'node:path';
import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter';
Expand All @@ -19,11 +20,31 @@ declare module 'ioredis' {
*/
export interface RedisBrokerOptions extends BaseBrokerOptions {
/**
* The Redis client to use
* How long to block for messages when polling
*/
redisClient: Redis;
blockTimeout?: number;
/**
* Max number of messages to poll at once
*/
maxChunk?: number;
/**
* Unique consumer name.
*
* @see {@link https://redis.io/commands/xreadgroup/}
*/
name?: string;
}

/**
* Default broker options for redis
*/
export const DefaultRedisBrokerOptions = {
...DefaultBrokerOptions,
name: randomBytes(20).toString('hex'),
maxChunk: 10,
blockTimeout: 5_000,
} as const satisfies Required<RedisBrokerOptions>;

/**
* Helper class with shared Redis logic
*/
Expand Down Expand Up @@ -56,14 +77,17 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
*/
protected listening = false;

public constructor(options: RedisBrokerOptions) {
public constructor(
protected readonly redisClient: Redis,
options: RedisBrokerOptions,
) {
super();
this.options = { ...DefaultBrokerOptions, ...options };
options.redisClient.defineCommand('xcleangroup', {
this.options = { ...DefaultRedisBrokerOptions, ...options };
redisClient.defineCommand('xcleangroup', {
numberOfKeys: 1,
lua: readFileSync(resolve(__dirname, '..', 'scripts', 'xcleangroup.lua'), 'utf8'),
});
this.streamReadClient = options.redisClient.duplicate();
this.streamReadClient = redisClient.duplicate();
}

/**
Expand All @@ -75,7 +99,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
events.map(async (event) => {
this.subscribedEvents.add(event as string);
try {
return await this.options.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM');
return await this.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM');
} catch (error) {
if (!(error instanceof ReplyError)) {
throw error;
Expand All @@ -97,7 +121,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
commands[idx + 1] = ['xcleangroup', event as string, group];
}

await this.options.redisClient.pipeline(commands).exec();
await this.redisClient.pipeline(commands).exec();

for (const event of events) {
this.subscribedEvents.delete(event as string);
Expand Down Expand Up @@ -162,7 +186,7 @@ export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
*/
public async destroy() {
this.streamReadClient.disconnect();
this.options.redisClient.disconnect();
this.redisClient.disconnect();
}

/**
Expand Down
13 changes: 4 additions & 9 deletions packages/brokers/src/brokers/redis/PubSubRedis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { BaseRedisBroker } from './BaseRedis.js';
* import { PubSubRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new PubSubRedisBroker({ redisClient: new Redis() });
* const broker = new PubSubRedisBroker(new Redis());
*
* await broker.publish('test', 'Hello World!');
* await broker.destroy();
Expand All @@ -20,7 +20,7 @@ import { BaseRedisBroker } from './BaseRedis.js';
* import { PubSubRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new PubSubRedisBroker({ redisClient: new Redis() });
* const broker = new PubSubRedisBroker(new Redis());
* broker.on('test', ({ data, ack }) => {
* console.log(data);
* void ack();
Expand All @@ -37,19 +37,14 @@ export class PubSubRedisBroker<TEvents extends Record<string, any>>
* {@inheritDoc IPubSubBroker.publish}
*/
public async publish<Event extends keyof TEvents>(event: Event, data: TEvents[Event]): Promise<void> {
await this.options.redisClient.xadd(
event as string,
'*',
BaseRedisBroker.STREAM_DATA_KEY,
this.options.encode(data),
);
await this.redisClient.xadd(event as string, '*', BaseRedisBroker.STREAM_DATA_KEY, this.options.encode(data));
}

protected emitEvent(id: Buffer, group: string, event: string, data: unknown) {
const payload: { ack(): Promise<void>; data: unknown } = {
data,
ack: async () => {
await this.options.redisClient.xack(event, group, id);
await this.redisClient.xack(event, group, id);
},
};

Expand Down
22 changes: 11 additions & 11 deletions packages/brokers/src/brokers/redis/RPCRedis.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { Buffer } from 'node:buffer';
import { clearTimeout, setTimeout } from 'node:timers';
import type Redis from 'ioredis/built/Redis.js';
import type { IRPCBroker } from '../Broker.js';
import { DefaultBrokerOptions } from '../Broker.js';
import type { RedisBrokerOptions } from './BaseRedis.js';
import { BaseRedisBroker } from './BaseRedis.js';
import { BaseRedisBroker, DefaultRedisBrokerOptions } from './BaseRedis.js';

interface InternalPromise {
reject(error: any): void;
Expand All @@ -22,9 +22,9 @@ export interface RPCRedisBrokerOptions extends RedisBrokerOptions {
* Default values used for the {@link RPCRedisBrokerOptions}
*/
export const DefaultRPCRedisBrokerOptions = {
...DefaultBrokerOptions,
...DefaultRedisBrokerOptions,
timeout: 5_000,
} as const satisfies Required<Omit<RPCRedisBrokerOptions, 'redisClient'>>;
} as const satisfies Required<RPCRedisBrokerOptions>;

/**
* RPC broker powered by Redis
Expand All @@ -35,7 +35,7 @@ export const DefaultRPCRedisBrokerOptions = {
* import { RPCRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new RPCRedisBroker({ redisClient: new Redis() });
* const broker = new RPCRedisBroker(new Redis());
*
* console.log(await broker.call('testcall', 'Hello World!'));
* await broker.destroy();
Expand All @@ -44,7 +44,7 @@ export const DefaultRPCRedisBrokerOptions = {
* import { RPCRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new RPCRedisBroker({ redisClient: new Redis() });
* const broker = new RPCRedisBroker(new Redis());
* broker.on('testcall', ({ data, ack, reply }) => {
* console.log('responder', data);
* void ack();
Expand All @@ -65,8 +65,8 @@ export class RPCRedisBroker<TEvents extends Record<string, any>, TResponses exte

protected readonly promises = new Map<string, InternalPromise>();

public constructor(options: RPCRedisBrokerOptions) {
super(options);
public constructor(redisClient: Redis, options: RPCRedisBrokerOptions) {
super(redisClient, options);
this.options = { ...DefaultRPCRedisBrokerOptions, ...options };

this.streamReadClient.on('messageBuffer', (channel: Buffer, message: Buffer) => {
Expand All @@ -88,7 +88,7 @@ export class RPCRedisBroker<TEvents extends Record<string, any>, TResponses exte
data: TEvents[Event],
timeoutDuration: number = this.options.timeout,
): Promise<TResponses[Event]> {
const id = await this.options.redisClient.xadd(
const id = await this.redisClient.xadd(
event as string,
'*',
BaseRedisBroker.STREAM_DATA_KEY,
Expand Down Expand Up @@ -118,10 +118,10 @@ export class RPCRedisBroker<TEvents extends Record<string, any>, TResponses exte
const payload: { ack(): Promise<void>; data: unknown; reply(data: unknown): Promise<void> } = {
data,
ack: async () => {
await this.options.redisClient.xack(event, group, id);
await this.redisClient.xack(event, group, id);
},
reply: async (data) => {
await this.options.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));
await this.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));
},
};

Expand Down