概述

Mqtt是常用的发布/订阅方案,在Node中,由于Node天生单进程的特点,在订阅并处理大量主题时可能会有些力不从心,且传统的Mqtt订阅/处理实现也不够优雅。可以通过Nest的依赖注入、AOP特性配合Cluster多进程解决这一问题,将消息订阅与具体处理逻辑进行进程分离,耗时的处理通过多进程进行,逻辑示意如下:

nest-mqtt-arch.png

创建工程

初始化工程

mkdir MqttHandler && cd MqttHandler && yarn init

创建typescript配置文件

在工程根目录下创建tsconfig.json文件,内容如下:

{
  "compilerOptions": {
    "module": "commonjs",  // 模块类型
    "declaration": true, // 是否生成类型声明文件,即*.d.ts
    "removeComments": true,  // 是否去除注释
    "emitDecoratorMetadata": true, // 是否可以添加和读取元数据
    "experimentalDecorators": true, // 启用装饰器
    "target": "esnext", // 编译目标版本
    "sourceMap": true, // 是否生成源码地图
    "outDir": "./dist",  // 编译输入目录
    "baseUrl": "./", // 根目录
    "incremental": true  // 开启增量编译
  },
  "include": [
      "src/**/*"
  ],
  "exclude": ["node_modules", "dist", "resource", "out"]
}

创建编译配置tsconfig.build.json,内容如下:

{
  "extends": "./tsconfig.json",
  "exclude": ["node_modules", "test", "dist", "**/*spec.ts"]
}

创建源码目录

mkdir src # 源码根目录
mkdir src/processor  # 业务处理进程代码目录
mkdir src/subscriber # 订阅进程代码目录

安装相关依赖

yarn add @nestjs/core @nestjs/common @nestjs/microservices rxjs reflect-metadata # nest所需相关依赖
yarn add mqtt # mqtt依赖

yarn add typescript --dev # typescript依赖
yarn add @types/node --dev # node类型库

订阅主进程实现

消息订阅采用nest的微服务实现

src/subscriber下创建sub.module.ts,作为订阅逻辑的根模块,用于容器初始化,代码如下:

// src/subscriber/sub.module.ts

import { Module } from "@nestjs/common";

@Module({})
export default class SubModule {}

然后src/subscriber下创建main.ts,做为订阅逻辑的入口文件,启动mqtt监听服务,如下:

import { NestFactory } from "@nestjs/core";
import { MicroserviceOptions, Transport } from "@nestjs/microservices";

import SubModule from "./sub.module";

async function bootstrap() {
    const app = await NestFactory.createMicroservice<MicroserviceOptions>(SubModule, {
        transport: Transport.MQTT,
        options: {
            url: 'mqtt://localhost:1883' // mqtt路径
        }
    });
    app.listen(() => console.log('mqtt listening ...'));
}

最后编写mqtt消息接收转发逻辑,在src目录下新建dto.ts,创建一个示例数据类型,对应于mqtt消息体格式,如下:

// src/dto.ts

export class TestDto {
    name: string;
    index: number;
}

然后在src/subscriber下创建dispatch.controller.ts进行消息订阅与进程转发:

// src/subscriber/dispatch.controller.ts
import * as cluster from 'cluster';

import { Ctx, EventPattern, MqttContext, Payload } from "@nestjs/microservices";

import { TestDto } from "../dto";

export default class DispatchController {
    private getWorkderId() {
        const workIds = Object.keys(cluster.workers); // 所有工作进程的ID
        const workId = Math.floor(Math.random() * workIds.length) % workIds.length; // 随机挑选一个工作进程
        return workIds[workId];
    }

    // 订阅主题,任何符合该主题通配规则的消息都会在此处处理
    @EventPattern('test/+/test')
    testDispatch(@Payload() payload: TestDto, @Ctx() ctx: MqttContext) {
        const topic = ctx.getTopic();
        // 不对消息做任何实质性处理,仅仅做一个到工作进程的转发
        cluster.workers[this.getWorkderId()].send({topic, payload});
    }
}

DispatchController添加到src/subscriber/sub.module.ts中,以便可以实例化:

// src/subscriber/sub.module.ts

import { Module } from "@nestjs/common";
import DispatchController from "./dispatch.controller";

@Module({
    controllers: [DispatchController] // 引入控制类
})
export default class SubModule {}

至此,订阅进程相关逻辑便完成了。

工作进程逻辑实现

下面对工作进程相关逻辑进行实现。工作进程的目的即为通过IPC通道接收主进程传过来的消息,然后将消息分发给相应处理函数进行具体业务处理,基于此,我们自行实现一个监听策略去监听IPC通道,在src/processor目录下新建ipc.strategy.ts,如下:

// src/processor/ipc.strategy.ts

import { CustomTransportStrategy, Server } from "@nestjs/microservices";

export default class IpcStrategy extends Server implements CustomTransportStrategy {
    /**
     * 根据主题获取具体处理函数的名称
     * @param {string} topic 主题
     */
    getHandlerName(topic: string) {
        if (/^test\/([1-9a-zA-Z-._]+)\/test$/.test(topic)) {
            // 匹配 test/+/test 格式的主题,该主题的消息都会交给名为 test 的处理函数处理,处理函数会在后面定义
            return 'test';
        } else {
            return null;
        }
    }

    /**
     * 监听IPC通道
     * @param {() => void} callback 回调函数
     */
    listen(callback: () => void) {
        process.on('message', (message: {topic: string, payload: any}) => {
            const { topic } = message;
            const handlerName = this.getHandlerName(topic); // 获取处理函数名称
            if (handlerName) {
                const handler = this.getHandlerByPattern(handlerName); // 获取具体处理函数
                if (handler) {
                    handler(message); // 调用处理函数
                }
            }
        });
        callback();
    }

    close() {
        process.exit(0);
    }
}

然后定义具体的处理函数,在src/processor目录下新建test-processor.controller.ts,如下:

// src/processor.test-processor.controller.ts

import { MessagePattern, Payload } from "@nestjs/microservices";
import { TestDto } from "../dto";

export default class TestProcessorController {

    // 具体业务处理函数,处理 test/+/test 主题对应的消息
    @MessagePattern('test') // DispatchController中便是根据此处名称获取到函数实例
    async processTest(@Payload() message: {topic: string, payload: TestDto}) {
        const {topic, payload} = message;
        // TODO 处理逻辑 ...
        console.log(`processing, process id = ${process.pid}, topic = ${topic}, paylaod = `, payload);
    }
}

然后定义模块容器,在src/processor目录下新建processor.module.ts,如下:

import { Module } from "@nestjs/common";
import TestProcessorController from "./test-processor.controller";

@Module({
    controllers: [TestProcessorController]
})
export default class ProcessorModule {}

最后定义入口函数,实例化相关模块并启动监听,在src/processor目录下新建main.ts,如下:

// src/processor/main.ts

import { NestFactory } from "@nestjs/core";
import { MicroserviceOptions } from "@nestjs/microservices";

import IpcStrategy from "./ipc.strategy";
import ProcessorModule from "./processor.module";

async function bootstrap() {
    const app = await NestFactory.createMicroservice<MicroserviceOptions>(ProcessorModule, {
        strategy: new IpcStrategy() // 应用自定义的IPC监听策略
    });
    // 启动监听
    app.listen(() => console.log(`worker processor listening, process id = ${process.pid}`));
}

程序主入口实现

至此,mqtt订阅与业务处理工作进程都已经实现,最后编写主入口逻辑,在主进程启动mqtt订阅服务,并启动多个工作线程,在src目录下新建main.ts,如下:

import * as cluster from 'cluster';

if (cluster.isMaster) {
    // 主进程引入mqtt订阅服务
    import('./subscriber/main').then(async suber => {
        await suber.bootstrap();
    });
    // 启动5个处理进程
    for (let i = 0; i < 5; i ++) {
        cluster.fork();
    }
} else {
    // 工作进程
    import ('./processor/main').then(async processor => {
        await processor.bootstrap();
    });
}

启动

package.json中加入启动命令,如下:

// package.json
{
  "name": "MqttHandler",
  "version": "1.0.0",
  "main": "dist/main.js",
  "author": "tensoar",
  "license": "MIT",
  "devDependencies": {
    "@types/node": "^14.14.22",
    "typescript": "^4.1.3"
  },
  "scripts": {
    "start": "tsc && node dist/main.js" // 启动
  },
  "dependencies": {
    "@nestjs/common": "^7.6.7",
    "@nestjs/core": "^7.6.7",
    "@nestjs/microservices": "^7.6.7",
    "mqtt": "^4.2.6",
    "reflect-metadata": "^0.1.13",
    "rxjs": "^6.6.3"
  }
}

启动:

yarn start //启动程序 或直接运行 npx tsc && node dist/main.js

测试

在根目录下新建test文件夹,并创建一个mqtt-publish.ts测试文件:

mkdir test && cd test && touch mqtt-publish.ts

发送一些消息,如下:

import { connect } from 'mqtt';

const client = connect('mqtt://localhost:1883');
// 发送10个消息
for (let i = 0; i < 10; i ++) {
    const payload ={
        name: `test${i}`,
        index: i
    };
    client.publish(`test/${i}/test`, JSON.stringify(payload), {qos: 0});
}

运行此文件:

npx tsc test\mqtt-publish.ts // 编译
node test\mqtt-publish.js // 运行

将会看到处理程序输出。