mosquitto的安装与配置

安装环境: Ubuntu18.04.1

通过apt即可安装mosquitto:

#获取最新版本mosquitto
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa #若提示未找到repository错误,先运行:sudo apt-get install python-software-properties
sudo apt update
sudo apt install mosquitto mosquitto-clients

配置文件位于/etc/mosquitto目录下,名为mosquitto.conf,打开该文件,内容及说明如下:

# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example

pid_file /var/run/mosquitto.pid #pid文件路径,当mosquitto服务启动时,就会将主进程的PID信息写入该文件,当mosquitto服务关闭时,会自动删除该文件。

persistence true #持久化开关,当mosquitto的服务质量设置为至少一次(Qos=1)或恰好一次(Qos=2)时,mosquitto会将消息持久化存储到磁盘上,以防止连接或服务崩溃时消息丢失。
persistence_location /var/lib/mosquitto/ #持久化文件的存储路径

log_dest file /var/log/mosquitto/mosquitto.log  #日志路径
include_dir /etc/mosquitto/conf.d #加载的配置目录

下面创建一个用于客户端连接的用户名和密码:

sudo mosquitto_passwd -c /etc/mosquitto/passwd.conf mqdemo #注意此处的-c,c是小写

运行该命令后,将会提示你创建密码,最终该条命令会在/etc/mosquitto/目录下创建一个名为passwd.conf的文件,并将用户名和密码信息保存在passwd.conf文件中。打开该文件就会看到相应信息,且密码是加密后写在里面。若要删除用户名,命令为:

sudo mosquitto_passwd -D /etc/mosquitto/passwd.conf mqdemo

然后将用户名信息文件加载到配置文件中,在/etc/mosquitto/mosquitto.conf中加入以下配置:

password_file /etc/mosquitto/passwd.conf

启动

通过以下命令即可启动mosquitto:

# -c 指定要加载的配置文件
# -p 指定监听的端口(若不指定,默认为1883)
# -d 以后台服务运行
mosquitto -c /etc/mosquitto/mosquitto.conf -p 1883 -d

若是报错:Unable to open log file /var/log/mosquitto/mosquitto.log for writing,给予文件足够权限即可:

sudo chmod 666  /var/log/mosquitto/mosquitto.log

停止服务时,直接找到pid杀死即可:

ps -aux | grep mosquitto #当然,mosquitto启动时即已把pid信息写入/var/run/mosquitto.pid文件中,也可从其获取pid
sudo kill pid

使用

下面在Node下实现简单的mqtt消息收发。新建名为mosquitto_demo_publish.jsmosquitto_demo_subscribe.jsNode脚本,在脚本文件夹下打开一个terminal终端,安装mqtt模块:

npm install mqtt --save

mosquitto_demo_publish.js脚本用于发送mqtt消息,代码如下:

const mqtt = require('mqtt')

const mq_broker_url = 'mqtt://192.168.1.8'  //mqtt服务器地址
const mq_options = {
    username: 'mqdemo', //用户名
    password: '123456', //密码
}
const mqClient = mqtt.connect(mq_broker_url, mq_options)    //连接mqtt服务,返回一个client对象

mqClient.on('connect', () => {  //链接成功时触发
    console.log('MQTT Connected ...')
})

/***********************发布10个消息*************************/
let topic = 'demo_topic'
let message = {msg: "I'm msg"}
let options = {
    qos: 1, //服务质量 0:至多一次 1:至少一次 2:刚好一次,默认0
    retain: false, //是否持久化 默认false
    dup: false //重发标志 默认false
}
for (let i = 0; i < 10; i ++) { //发布十个消息
    message.msg = `I'm msg ` + i.toString()
    mqClient.publish(topic, JSON.stringify(message), options, () => {  //发布消息
        console.log('msg ' + i + ' published with topic1...')
    })
}

mosquitto_demo_subscribe.js脚本用于订阅并处理mqtt消息,代码如下:

const mqtt = require('mqtt')

const mq_broker_url = 'mqtt://192.168.1.8'
const mq_options = {
    username: 'mqdemo',
    password: '123456',
}
const mqClient = mqtt.connect(mq_broker_url, mq_options)

let topic = 'demo_topic'
mqClient.on('connect', () => {
    console.log('MQTT Connected ...')
    mqClient.subscribe(topic, {qos: 1}, (err, granted) => { //若订阅多个主题,topic可以是一个数组
        if(!err)
            granted.forEach(item => console.log('topic ' + item.topic + ' has been subscribed, QoS = ' + item.qos + '...'))
    })
})

/*****************处理消息************************/
mqClient.on('message', (tp, msg) => {   //处理消息
    console.log('topic = ', tp)
    console.log('msg = ', JSON.parse(msg))
})

打开一个控制台,运行mosquitto_demo_subscribe.js,输出如下:

Snipaste_2018-12-09_22-27-25.png

然后打开另一个控制台,运行mosquitto_demo_publish.js,输出如下:

Snipaste_2018-12-09_22-21-50.png

然后查看mosquitto_demo_subscribe.js脚本的控制台,有了以下输出:

Snipaste_2018-12-09_22-27-38.png

证明我们成功通过mosquitto发布并订阅、处理了mqtt消息,在此基础上便可以实现消息推送、即时通讯等功能需求。

通过protobuf格式传输

上面已经成功发送并订阅了消息,但mqtt传输是基于tcp的,是未加密的传输方式,且上面的传输格式是用的JSON,也是一种明文传输方式,这都决定了消息传输的不安全性。虽然mosquitto支持ssl加密,但实际应用时,要申请权威的ssl证书,较为繁琐,这里通过将消息以protobuf格式传输,可以做到较为安全,且protobuf格式相较于JSON,其编码后的消息体积可以大大减少,可以有效节约带宽。
首先引入protobufjs模块:

npm install protobufjs --save

protobuf需要后缀为.proto的消息描述文件,新建一个demo.proto文件,并定义一个消息实体(一个.proto文件中可以定义多个消息实体,且消息实体间可以嵌套),内容如下:

syntax = "proto3";

message DemoMsg {
    string name = 1;
    double number = 2;
}

开始的syntax字段表明使用proto3语法,若不加该字段,默认使用proto2proto3语法相对更为简洁。message为消息关键字,这里定义了一个名为DemoMsg的消息实体,消息实体的名称一定要采用大驼峰命名方式,消息中有两个字段,类型分别为stringdoubleid分别为12,消息字段名称要采用下划线命名格式,在将消息编译为类时,会自动将下划线方式命名的字段转化为小驼峰式命名。
然后与上面类似,定义两个脚本实现消息收发。定义脚本protobuf_publish_demo.js,代码如下:

const mqtt = require('mqtt')
const protobuf = require('protobufjs')

const mq_broker_url = 'mqtt://192.168.1.8'
const mq_options = {
    username: 'mqdemo',
    password: '123456',
}
const mqClient = mqtt.connect(mq_broker_url, mq_options)
mqClient.on('connect', () => {  //链接成功时触发
    console.log('MQTT Connected ...')
})

let topic = 'proto_buf_test'

protobuf.load('./demo.proto', (err, demoRoot) => {  //加载.proto文件并处理
    if(err){
        console.log('err = ', err)
        return
    }
    let DemoMsg = demoRoot.lookupType('DemoMsg')    //获取消息实体

    let demoMsgPayLoad = {  //消息加载类,与消息实体的描述相对应
        name: 'demo',
        number: 1.2
    }

    //检验消息加载类的结构、数据类型等是否与消息实体中的描述一致
    if(DemoMsg.verify(demoMsgPayLoad)){
        console.log('格式错误')
        return
    }

    let demoMsg = DemoMsg.create(demoMsgPayLoad)    //创建消息

    let demoMsgBuf = DemoMsg.encode(demoMsg).finish()   //编码

    mqClient.publish(topic, demoMsgBuf, {qos: 1}, () => {   //发布
        console.log('msg has published ...')
    })
})

然后定义脚本protobuf_subscribe_demo.js,内容如下:

const mqtt = require('mqtt')
const protobuf = require('protobufjs')

const mq_broker_url = 'mqtt://192.168.1.8'
const mq_options = {
    username: 'mqdemo',
    password: '123456'
}
const mqClient = mqtt.connect(mq_broker_url, mq_options)

let topic = 'proto_buf_test'
mqClient.on('connect', () => {
    console.log("MQTT connected ... ")
    mqClient.subscribe(topic, {qos: 1}, (err, granted) => {
        if (!err)
            granted.forEach(g => console.log('topic ' + g.topic + ' has been subscribed, QoS = ' + g.qos + ' ...'))
    })
})

mqClient.on('message', (tp, msg) => {
    console.log('topic = ', tp)
    console.log('msg = ', msg)
    //console.log('msg_parse = ', JSON.parse(msg)) //若是直接以这种方式打印,会发现得到的是乱码

    protobuf.load('./demo.proto', (err, demoRoot) => {    //加载.proto消息描述文件并处理
        if(err){
            console.log('err = ', err)
            return
        }
        let DemoMsg = demoRoot.lookupType('DemoMsg')  //加载消息实体

        let msgDecode = DemoMsg.toObject(DemoMsg.decode(msg))  //根据消息描述实体解码,这也展现出只有拿到消息实体,才能进行解码,一定程度上保障了安全性。
        console.log('msg_decode = ', msgDecode);
    })
})

首先打开一个终端,运行protobuf_subscribe_demo.js脚本,输出如下:

Snipaste_2018-12-10_16-43-01.png

然后代开另一个终端,运行protobuf_publish_demo.js脚本,输出如下:

Snipaste_2018-12-10_16-45-36.png

再看protobuf_subscribe_demo.js脚本的终端,输出如下:

Snipaste_2018-12-10_16-45-55.png

可见,得到的消息直接打印,是一堆buffer数据,且直接利用JSON.parse等转换时,得到的是乱码,只有拿到消息实体,才能进行解码,得到正确的数据。