概述
Mqtt
是常用的发布/订阅方案,在Node
中,由于Node
天生单进程的特点,在订阅并处理大量主题时可能会有些力不从心,且传统的Mqtt
订阅/处理实现也不够优雅。可以通过Nest
的依赖注入、AOP
特性配合Cluster
多进程解决这一问题,将消息订阅与具体处理逻辑进行进程分离,耗时的处理通过多进程进行,逻辑示意如下:
创建工程
初始化工程
mkdir MqttHandler && cd MqttHandler && yarn init
创建typescript
配置文件
在工程根目录下创建tsconfig.json
文件,内容如下:
{
"compilerOptions": {
"module": "commonjs", // 模块类型
"declaration": true, // 是否生成类型声明文件,即*.d.ts
"removeComments": true, // 是否去除注释
"emitDecoratorMetadata": true, // 是否可以添加和读取元数据
"experimentalDecorators": true, // 启用装饰器
"target": "esnext", // 编译目标版本
"sourceMap": true, // 是否生成源码地图
"outDir": "./dist", // 编译输入目录
"baseUrl": "./", // 根目录
"incremental": true // 开启增量编译
},
"include": [
"src/**/*"
],
"exclude": ["node_modules", "dist", "resource", "out"]
}
创建编译配置tsconfig.build.json
,内容如下:
{
"extends": "./tsconfig.json",
"exclude": ["node_modules", "test", "dist", "**/*spec.ts"]
}
创建源码目录
mkdir src # 源码根目录
mkdir src/processor # 业务处理进程代码目录
mkdir src/subscriber # 订阅进程代码目录
安装相关依赖
yarn add @nestjs/core @nestjs/common @nestjs/microservices rxjs reflect-metadata # nest所需相关依赖
yarn add mqtt # mqtt依赖
yarn add typescript --dev # typescript依赖
yarn add @types/node --dev # node类型库
订阅主进程实现
消息订阅采用nest
的微服务实现
在src/subscriber
下创建sub.module.ts
,作为订阅逻辑的根模块,用于容器初始化,代码如下:
// src/subscriber/sub.module.ts
import { Module } from "@nestjs/common";
@Module({})
export default class SubModule {}
然后src/subscriber
下创建main.ts
,做为订阅逻辑的入口文件,启动mqtt
监听服务,如下:
import { NestFactory } from "@nestjs/core";
import { MicroserviceOptions, Transport } from "@nestjs/microservices";
import SubModule from "./sub.module";
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(SubModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883' // mqtt路径
}
});
app.listen(() => console.log('mqtt listening ...'));
}
最后编写mqtt
消息接收转发逻辑,在src
目录下新建dto.ts
,创建一个示例数据类型,对应于mqtt
消息体格式,如下:
// src/dto.ts
export class TestDto {
name: string;
index: number;
}
然后在src/subscriber
下创建dispatch.controller.ts
进行消息订阅与进程转发:
// src/subscriber/dispatch.controller.ts
import * as cluster from 'cluster';
import { Ctx, EventPattern, MqttContext, Payload } from "@nestjs/microservices";
import { TestDto } from "../dto";
export default class DispatchController {
private getWorkderId() {
const workIds = Object.keys(cluster.workers); // 所有工作进程的ID
const workId = Math.floor(Math.random() * workIds.length) % workIds.length; // 随机挑选一个工作进程
return workIds[workId];
}
// 订阅主题,任何符合该主题通配规则的消息都会在此处处理
@EventPattern('test/+/test')
testDispatch(@Payload() payload: TestDto, @Ctx() ctx: MqttContext) {
const topic = ctx.getTopic();
// 不对消息做任何实质性处理,仅仅做一个到工作进程的转发
cluster.workers[this.getWorkderId()].send({topic, payload});
}
}
将DispatchController
添加到src/subscriber/sub.module.ts
中,以便可以实例化:
// src/subscriber/sub.module.ts
import { Module } from "@nestjs/common";
import DispatchController from "./dispatch.controller";
@Module({
controllers: [DispatchController] // 引入控制类
})
export default class SubModule {}
至此,订阅进程相关逻辑便完成了。
工作进程逻辑实现
下面对工作进程相关逻辑进行实现。工作进程的目的即为通过IPC
通道接收主进程传过来的消息,然后将消息分发给相应处理函数进行具体业务处理,基于此,我们自行实现一个监听策略去监听IPC
通道,在src/processor
目录下新建ipc.strategy.ts
,如下:
// src/processor/ipc.strategy.ts
import { CustomTransportStrategy, Server } from "@nestjs/microservices";
export default class IpcStrategy extends Server implements CustomTransportStrategy {
/**
* 根据主题获取具体处理函数的名称
* @param {string} topic 主题
*/
getHandlerName(topic: string) {
if (/^test\/([1-9a-zA-Z-._]+)\/test$/.test(topic)) {
// 匹配 test/+/test 格式的主题,该主题的消息都会交给名为 test 的处理函数处理,处理函数会在后面定义
return 'test';
} else {
return null;
}
}
/**
* 监听IPC通道
* @param {() => void} callback 回调函数
*/
listen(callback: () => void) {
process.on('message', (message: {topic: string, payload: any}) => {
const { topic } = message;
const handlerName = this.getHandlerName(topic); // 获取处理函数名称
if (handlerName) {
const handler = this.getHandlerByPattern(handlerName); // 获取具体处理函数
if (handler) {
handler(message); // 调用处理函数
}
}
});
callback();
}
close() {
process.exit(0);
}
}
然后定义具体的处理函数,在src/processor
目录下新建test-processor.controller.ts
,如下:
// src/processor.test-processor.controller.ts
import { MessagePattern, Payload } from "@nestjs/microservices";
import { TestDto } from "../dto";
export default class TestProcessorController {
// 具体业务处理函数,处理 test/+/test 主题对应的消息
@MessagePattern('test') // DispatchController中便是根据此处名称获取到函数实例
async processTest(@Payload() message: {topic: string, payload: TestDto}) {
const {topic, payload} = message;
// TODO 处理逻辑 ...
console.log(`processing, process id = ${process.pid}, topic = ${topic}, paylaod = `, payload);
}
}
然后定义模块容器,在src/processor
目录下新建processor.module.ts
,如下:
import { Module } from "@nestjs/common";
import TestProcessorController from "./test-processor.controller";
@Module({
controllers: [TestProcessorController]
})
export default class ProcessorModule {}
最后定义入口函数,实例化相关模块并启动监听,在src/processor
目录下新建main.ts
,如下:
// src/processor/main.ts
import { NestFactory } from "@nestjs/core";
import { MicroserviceOptions } from "@nestjs/microservices";
import IpcStrategy from "./ipc.strategy";
import ProcessorModule from "./processor.module";
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(ProcessorModule, {
strategy: new IpcStrategy() // 应用自定义的IPC监听策略
});
// 启动监听
app.listen(() => console.log(`worker processor listening, process id = ${process.pid}`));
}
程序主入口实现
至此,mqtt
订阅与业务处理工作进程都已经实现,最后编写主入口逻辑,在主进程启动mqtt
订阅服务,并启动多个工作线程,在src
目录下新建main.ts
,如下:
import * as cluster from 'cluster';
if (cluster.isMaster) {
// 主进程引入mqtt订阅服务
import('./subscriber/main').then(async suber => {
await suber.bootstrap();
});
// 启动5个处理进程
for (let i = 0; i < 5; i ++) {
cluster.fork();
}
} else {
// 工作进程
import ('./processor/main').then(async processor => {
await processor.bootstrap();
});
}
启动
在package.json
中加入启动命令,如下:
// package.json
{
"name": "MqttHandler",
"version": "1.0.0",
"main": "dist/main.js",
"author": "tensoar",
"license": "MIT",
"devDependencies": {
"@types/node": "^14.14.22",
"typescript": "^4.1.3"
},
"scripts": {
"start": "tsc && node dist/main.js" // 启动
},
"dependencies": {
"@nestjs/common": "^7.6.7",
"@nestjs/core": "^7.6.7",
"@nestjs/microservices": "^7.6.7",
"mqtt": "^4.2.6",
"reflect-metadata": "^0.1.13",
"rxjs": "^6.6.3"
}
}
启动:
yarn start //启动程序 或直接运行 npx tsc && node dist/main.js
测试
在根目录下新建test
文件夹,并创建一个mqtt-publish.ts
测试文件:
mkdir test && cd test && touch mqtt-publish.ts
发送一些消息,如下:
import { connect } from 'mqtt';
const client = connect('mqtt://localhost:1883');
// 发送10个消息
for (let i = 0; i < 10; i ++) {
const payload ={
name: `test${i}`,
index: i
};
client.publish(`test/${i}/test`, JSON.stringify(payload), {qos: 0});
}
运行此文件:
npx tsc test\mqtt-publish.ts // 编译
node test\mqtt-publish.js // 运行
将会看到处理程序输出。