Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support dynamic create mqtt consumer #3699

Merged
merged 1 commit into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions packages/mock/src/creator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -646,13 +646,18 @@ class BootstrapAppStarter implements IBootstrapAppStarter {

/**
* Create a real project but not ready or a virtual project
* @param baseDir
* @param baseDirOrOptions
* @param options
*/
export async function createLightApp(
baseDir = '',
baseDirOrOptions: string | MockAppConfigurationOptions,
options: MockAppConfigurationOptions = {}
): Promise<IMidwayApplication> {
if (baseDirOrOptions && typeof baseDirOrOptions === 'object') {
options = baseDirOrOptions;
baseDirOrOptions = options.baseDir || '';
}

Framework()(LightFramework);
options.globalConfig = Object.assign(
{
Expand All @@ -676,7 +681,7 @@ export async function createLightApp(
options.moduleLoadType = pkgJSON?.type === 'module' ? 'esm' : 'commonjs';
}

return createApp(baseDir, {
return createApp(baseDirOrOptions as string, {
...options,
imports: [
await transformFrameworkToConfiguration(
Expand Down
65 changes: 43 additions & 22 deletions packages/mqtt/src/framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {
IMidwayMQTTApplication,
IMidwayMQTTConfigurationOptions,
IMidwayMQTTContext,
IMqttSubscriber,
MqttSubscriberOptions,
} from './interface';
import { connect, IClientOptions, MqttClient } from 'mqtt';
import { MQTT_DECORATOR_KEY } from './decorator';
Expand Down Expand Up @@ -53,30 +55,12 @@ export class MidwayMQTTFramework extends BaseFramework<
}

for (const customKey in sub) {
const consumer = await this.createSubscriber(
await this.createSubscriber(
sub[customKey].connectOptions,
sub[customKey].subscribeOptions,
mqttSubscriberMap[customKey],
customKey
);

await consumer.subscribeAsync(
sub[customKey].subscribeOptions.topicObject,
sub[customKey].subscribeOptions.opts
);

consumer.on('message', async (topic, message, packet) => {
const ctx = this.app.createAnonymousContext();
ctx.topic = topic;
ctx.packet = packet;
ctx.message = message;
const fn = await this.applyMiddleware(async ctx => {
const instance = await ctx.requestContext.getAsync(
mqttSubscriberMap[customKey]
);
// eslint-disable-next-line prefer-spread
return await instance['subscribe'].call(instance, ctx);
});
return await fn(ctx);
});
}
}

Expand All @@ -87,11 +71,28 @@ export class MidwayMQTTFramework extends BaseFramework<
}
}

/**
* dynamic create subscriber
*/
public async createSubscriber(
/**
* mqtt connection options
*/
connectionOptions: IClientOptions,
/**
* mqtt subscribe options
*/
subscribeOptions: MqttSubscriberOptions,
/**
* midway mqtt subscriber class
*/
ClzProvider: new () => IMqttSubscriber,
/**
* midway mqtt component instance name, if not set, will be manager by your self
*/
clientName?: string
) {
return new Promise<MqttClient>(resolve => {
const consumer = await new Promise<MqttClient>(resolve => {
const client = connect(connectionOptions);
client.on('connect', () => {
if (clientName) {
Expand All @@ -107,6 +108,26 @@ export class MidwayMQTTFramework extends BaseFramework<
this.mqttLogger.error(err);
});
});

consumer.on('message', async (topic, message, packet) => {
const ctx = this.app.createAnonymousContext();
ctx.topic = topic;
ctx.packet = packet;
ctx.message = message;
const fn = await this.applyMiddleware(async ctx => {
const instance = await ctx.requestContext.getAsync(ClzProvider);
// eslint-disable-next-line prefer-spread
return await instance['subscribe'].call(instance, ctx);
});
return await fn(ctx);
});

await consumer.subscribeAsync(
subscribeOptions.topicObject,
subscribeOptions.opts
);

return consumer;
}

public getSubscriber(name: string) {
Expand Down
1 change: 1 addition & 0 deletions packages/mqtt/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export { MidwayMQTTFramework as Framework } from './framework';
export { MQTTConfiguration as Configuration } from './configuration';
export * from './decorator';
export * from './service';
export * as Mqtt from 'mqtt';
2 changes: 1 addition & 1 deletion packages/mqtt/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ export interface Context extends IMidwayMQTTContext {}
export type NextFunction = BaseNextFunction;

export interface IMqttSubscriber {
subscribe(ctx: IMidwayMQTTContext): Promise<void>;
subscribe(ctx: IMidwayMQTTContext): Promise<any>;
}

4 changes: 2 additions & 2 deletions packages/mqtt/test/fixtures/base-app/src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import * as mqtt from '../../../../src';
port: 1883,
},
subscribeOptions: {
topicObject: 'test',
topicObject: 'test_midway',
},
},
},
Expand All @@ -41,7 +41,7 @@ export class AutoConfiguration implements ILifeCycle {
async onServerReady(container: IMidwayContainer) {
const producer = await container.getAsync(DefaultMqttProducer);
console.log('onServerReady and send message');
await producer.publishAsync('test', 'hello world', {
await producer.publishAsync('test_midway', 'hello world', {
qos: 2
});
}
Expand Down
54 changes: 52 additions & 2 deletions packages/mqtt/test/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { createApp, close } from '@midwayjs/mock';
import { sleep } from '@midwayjs/core';
import { close, createApp, createLightApp } from '@midwayjs/mock';
import { IMidwayApplication, Provide, Scope, ScopeEnum, sleep } from '@midwayjs/core';
import { Framework, IMqttSubscriber, Mqtt, MqttProducerFactory } from '../src';

describe('/test/index.test.ts', () => {

it('should test Mqtt export ', () => {
expect(Mqtt).toBeDefined();
});

it('should test subscribe topic and send message', async () => {
// create app and got data
const app = await createApp('base-app');
Expand All @@ -16,4 +22,48 @@ describe('/test/index.test.ts', () => {
await sleep();
await close(app);
});

it('should test dynamic create mqtt subscribe case', async () => {
let app: IMidwayApplication;
@Provide()
@Scope(ScopeEnum.Request)
class TestSubscriber implements IMqttSubscriber {
async subscribe() {
app.setAttr('subscribe', true);
}
}
app = await createLightApp({
imports: [
require('../src')
],
preloadModules: [
TestSubscriber
]
});
await (app.getFramework() as Framework).createSubscriber({
host: 'test.mosquitto.org',
port: 1883,
}, {
topicObject: 'test_midway_dynamic',
}, TestSubscriber, 'test');

await sleep();

// send
const producerService = await app.getApplicationContext().getAsync(MqttProducerFactory);
const producer = await producerService.createInstance({
host: 'test.mosquitto.org',
port: 1883,
}, 'test');

producer.publish('test_midway_dynamic', 'hello world', {
qos: 2
});

await sleep();

expect(app.getAttr('subscribe')).toBe(true);

await close(app);
});
});