Skip to content

Commit

Permalink
feat: support dynamic create mqtt consumer (#3699)
Browse files Browse the repository at this point in the history
  • Loading branch information
czy88840616 committed May 6, 2024
1 parent a44f795 commit 107539a
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 30 deletions.
11 changes: 8 additions & 3 deletions packages/mock/src/creator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -646,13 +646,18 @@ class BootstrapAppStarter implements IBootstrapAppStarter {

/**
* Create a real project but not ready or a virtual project
* @param baseDir
* @param baseDirOrOptions
* @param options
*/
export async function createLightApp(
baseDir = '',
baseDirOrOptions: string | MockAppConfigurationOptions,
options: MockAppConfigurationOptions = {}
): Promise<IMidwayApplication> {
if (baseDirOrOptions && typeof baseDirOrOptions === 'object') {
options = baseDirOrOptions;
baseDirOrOptions = options.baseDir || '';
}

Framework()(LightFramework);
options.globalConfig = Object.assign(
{
Expand All @@ -676,7 +681,7 @@ export async function createLightApp(
options.moduleLoadType = pkgJSON?.type === 'module' ? 'esm' : 'commonjs';
}

return createApp(baseDir, {
return createApp(baseDirOrOptions as string, {
...options,
imports: [
await transformFrameworkToConfiguration(
Expand Down
65 changes: 43 additions & 22 deletions packages/mqtt/src/framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {
IMidwayMQTTApplication,
IMidwayMQTTConfigurationOptions,
IMidwayMQTTContext,
IMqttSubscriber,
MqttSubscriberOptions,
} from './interface';
import { connect, IClientOptions, MqttClient } from 'mqtt';
import { MQTT_DECORATOR_KEY } from './decorator';
Expand Down Expand Up @@ -53,30 +55,12 @@ export class MidwayMQTTFramework extends BaseFramework<
}

for (const customKey in sub) {
const consumer = await this.createSubscriber(
await this.createSubscriber(
sub[customKey].connectOptions,
sub[customKey].subscribeOptions,
mqttSubscriberMap[customKey],
customKey
);

await consumer.subscribeAsync(
sub[customKey].subscribeOptions.topicObject,
sub[customKey].subscribeOptions.opts
);

consumer.on('message', async (topic, message, packet) => {
const ctx = this.app.createAnonymousContext();
ctx.topic = topic;
ctx.packet = packet;
ctx.message = message;
const fn = await this.applyMiddleware(async ctx => {
const instance = await ctx.requestContext.getAsync(
mqttSubscriberMap[customKey]
);
// eslint-disable-next-line prefer-spread
return await instance['subscribe'].call(instance, ctx);
});
return await fn(ctx);
});
}
}

Expand All @@ -87,11 +71,28 @@ export class MidwayMQTTFramework extends BaseFramework<
}
}

/**
* dynamic create subscriber
*/
public async createSubscriber(
/**
* mqtt connection options
*/
connectionOptions: IClientOptions,
/**
* mqtt subscribe options
*/
subscribeOptions: MqttSubscriberOptions,
/**
* midway mqtt subscriber class
*/
ClzProvider: new () => IMqttSubscriber,
/**
* midway mqtt component instance name, if not set, will be manager by your self
*/
clientName?: string
) {
return new Promise<MqttClient>(resolve => {
const consumer = await new Promise<MqttClient>(resolve => {
const client = connect(connectionOptions);
client.on('connect', () => {
if (clientName) {
Expand All @@ -107,6 +108,26 @@ export class MidwayMQTTFramework extends BaseFramework<
this.mqttLogger.error(err);
});
});

consumer.on('message', async (topic, message, packet) => {
const ctx = this.app.createAnonymousContext();
ctx.topic = topic;
ctx.packet = packet;
ctx.message = message;
const fn = await this.applyMiddleware(async ctx => {
const instance = await ctx.requestContext.getAsync(ClzProvider);
// eslint-disable-next-line prefer-spread
return await instance['subscribe'].call(instance, ctx);
});
return await fn(ctx);
});

await consumer.subscribeAsync(
subscribeOptions.topicObject,
subscribeOptions.opts
);

return consumer;
}

public getSubscriber(name: string) {
Expand Down
1 change: 1 addition & 0 deletions packages/mqtt/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export { MidwayMQTTFramework as Framework } from './framework';
export { MQTTConfiguration as Configuration } from './configuration';
export * from './decorator';
export * from './service';
export * as Mqtt from 'mqtt';
2 changes: 1 addition & 1 deletion packages/mqtt/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ export interface Context extends IMidwayMQTTContext {}
export type NextFunction = BaseNextFunction;

export interface IMqttSubscriber {
subscribe(ctx: IMidwayMQTTContext): Promise<void>;
subscribe(ctx: IMidwayMQTTContext): Promise<any>;
}

4 changes: 2 additions & 2 deletions packages/mqtt/test/fixtures/base-app/src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import * as mqtt from '../../../../src';
port: 1883,
},
subscribeOptions: {
topicObject: 'test',
topicObject: 'test_midway',
},
},
},
Expand All @@ -41,7 +41,7 @@ export class AutoConfiguration implements ILifeCycle {
async onServerReady(container: IMidwayContainer) {
const producer = await container.getAsync(DefaultMqttProducer);
console.log('onServerReady and send message');
await producer.publishAsync('test', 'hello world', {
await producer.publishAsync('test_midway', 'hello world', {
qos: 2
});
}
Expand Down
54 changes: 52 additions & 2 deletions packages/mqtt/test/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { createApp, close } from '@midwayjs/mock';
import { sleep } from '@midwayjs/core';
import { close, createApp, createLightApp } from '@midwayjs/mock';
import { IMidwayApplication, Provide, Scope, ScopeEnum, sleep } from '@midwayjs/core';
import { Framework, IMqttSubscriber, Mqtt, MqttProducerFactory } from '../src';

describe('/test/index.test.ts', () => {

it('should test Mqtt export ', () => {
expect(Mqtt).toBeDefined();
});

it('should test subscribe topic and send message', async () => {
// create app and got data
const app = await createApp('base-app');
Expand All @@ -16,4 +22,48 @@ describe('/test/index.test.ts', () => {
await sleep();
await close(app);
});

it('should test dynamic create mqtt subscribe case', async () => {
let app: IMidwayApplication;
@Provide()
@Scope(ScopeEnum.Request)
class TestSubscriber implements IMqttSubscriber {
async subscribe() {
app.setAttr('subscribe', true);
}
}
app = await createLightApp({
imports: [
require('../src')
],
preloadModules: [
TestSubscriber
]
});
await (app.getFramework() as Framework).createSubscriber({
host: 'test.mosquitto.org',
port: 1883,
}, {
topicObject: 'test_midway_dynamic',
}, TestSubscriber, 'test');

await sleep();

// send
const producerService = await app.getApplicationContext().getAsync(MqttProducerFactory);
const producer = await producerService.createInstance({
host: 'test.mosquitto.org',
port: 1883,
}, 'test');

producer.publish('test_midway_dynamic', 'hello world', {
qos: 2
});

await sleep();

expect(app.getAttr('subscribe')).toBe(true);

await close(app);
});
});

0 comments on commit 107539a

Please sign in to comment.