一、目标
springboot
实现mqtt
消息的订阅处理,基于spring integration
,做到:
- 业务处理分离,通过
@Handler
与@MessagePattern
自定义注解标记处理类与处理方法,接收到Mqtt
消息后自动依据主题去匹配相应的处理方法。 - 通过
@MessagePattern
注解自动搜索所有需处理的主题并订阅。 - 每个消息到达后异步进行处理并自动对消息体进行类型转换。
大概流程如下:
二、配置
在applocation.yaml
中加入mqtt broker
的配置:
mqtt:
host: 127.0.0.1
port: 1883
maven
配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>top.wteng</groupId>
<artifactId>MqttBootIntegration</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>MqttBootIntegration</name>
<description>Mqtt integration for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- spring integration相关依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- json转换 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<!-- 开发者依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<!-- 测试相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<!-- 插件仓库 -->
<repository>
<id>Spring Plugins Repository</id>
<name>Spring Plugins Repository</name>
<url>https://repo.spring.io/plugins-release/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三、 自定义注解
自定义两个注解,一个Handler
,类似于Controller
注解,标记一个类为处理类,接收到mqtt
消息后去应用了该注解的类中查找处理方法,另一个MessagePattern
,应用在方法上,其value
参数表明处理的主题,订阅也是订阅的所有MessagePattern
声明了的主题,可以应用通配符,启动时,会先扫描所有应用了Handler
的类,在类中获取到所有MessagePattern
注解的value
值进行订阅,当收到消息后,遍历所有应用了MessagePattern
的方法,匹配消息的topic
与注解的value
值,若匹配成功,则应用相应的方法去处理。
Handler
实现如下:
package top.wteng.mqttbootintegration.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Handler {
}
MessagePattern
实现如下:
package top.wteng.mqttbootintegration.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MessagePattern {
// 主题,可以用通配符,应用启动时会取出该值进行订阅,收到消息时也会依据此值与主题进行匹配,确定处理函数
String value();
}
四、处理类
假设有一个temperature
主题需要处理,其消息体格式对应的实体如下:
package top.wteng.mqttbootintegration.entity;
public class Temperature {
private Integer id;
private float value;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public float getValue() {
return value;
}
public void setValue(float value) {
this.value = value;
}
@Override
public String toString() {
return "Temperature{" +
"id=" + id +
", value=" + value +
'}';
}
}
要实现的目的便是只需为其定义一个处理类,无需任何其他操作,即可自动订阅并分发处理,只是约定处理函数第一个参数为字符串类型的topic
,第二个参数为消息的payload
,第二个参数的类型应为string
或payload
对应的实体类型,若类型不为string
,则会用JSON.parseObject
方法进行解析转换,如下:
package top.wteng.mqttbootintegration.handler;
import org.springframework.stereotype.Component;
import top.wteng.mqttbootintegration.annotation.Handler;
import top.wteng.mqttbootintegration.annotation.MessagePattern;
import top.wteng.mqttbootintegration.entity.Temperature;
@Handler // 表明是处理类
@Component
public class TemperatureHandler {
// 程序启动时自动取出value值进行订阅并在收到消息后依据value值匹配到该函数进行处理
@MessagePattern(value = "temperature")
public void temperatureHandler(String topic, Temperature temperature) throws InterruptedException {
// 因为第二个参数类型为Temperature,在应用该函数时,会通过JSON.parseObject对原始字符串类型的payload进行转换
System.out.printf("thread id: %d, temperature = %s%n", Thread.currentThread().getId(), temperature.toString());
}
}
五、Mqtt
集成
处理类都有了,下一步就是将mqtt
集成进来,在spring integration
下mqtt
的集成本身很简单,只需在启动类中定义几个bean
即可,只是在程序启动后,需要在mqtt
中自动取出并订阅前面MessagePattern
中的值,为实现这个目的,先实现一个工具类如下:
package top.wteng.mqttbootintegration.util;
import java.lang.reflect.Method;
import java.util.*;
import java.util.stream.Collectors;
import org.springframework.context.ApplicationContext;
import top.wteng.mqttbootintegration.annotation.Handler;
import top.wteng.mqttbootintegration.annotation.MessagePattern;
import top.wteng.mqttbootintegration.entity.HandlerCache;
public class HandlerBeanUtil {
// 保存了主题与处理函数等的对应关系,收到消息后直接遍历该列表进行匹配调用即可,这样就无需再扫描一遍注解,可以省去扫描过程中的反射开销
private static final List<HandlerCache> messageHandlers = new ArrayList<>();
private static final String WILDCARD_ALL = "#";
private static final String WILDCARD_PART = "+";
private static final String SEPARATOR = "/";
// 获取所有MessagePattern注解中的值并存入列表
public static List<String> getAndCacheMessagePatterns(ApplicationContext context) {
return Arrays.asList(context.getBeanNamesForAnnotation(Handler.class))
.parallelStream()
.map(context::getBean) // 所有应用了Handler注解的bean
.flatMap(bean -> {
Class<?> handlerClass = bean.getClass();
List<HandlerCache> mhs = new ArrayList<>();
// 所有应用了MessagePatter注解的函数
Method[] methods = handlerClass.getDeclaredMethods();
for (Method m: methods) {
MessagePattern mp = m.getDeclaredAnnotation(MessagePattern.class);
if (mp != null && !isMessageHandlersHas(mp.value())) {
Class<?>[] parameterTypes = m.getParameterTypes();
mhs.add(new HandlerCache(mp.value(), bean, m, parameterTypes.length > 1 ? parameterTypes[1] : String.class));
}
}
messageHandlers.addAll(mhs);
return mhs.stream();
})
.map(HandlerCache::getMessagePattern)
.collect(Collectors.toList());
}
private static boolean isMessageHandlersHas(String pattern) {
return messageHandlers.stream()
.filter(hc -> hc.getMessagePattern().equals(pattern)).findFirst().orElse(null) != null;
}
private static boolean isMatchPattern(String pattern, String topic) {
if (!pattern.contains(WILDCARD_ALL) && !pattern.contains(WILDCARD_PART)) {
// 不是通配订阅
return pattern.equals(topic);
}
// 订阅中含有通配符
String[] patternSegments = pattern.split(SEPARATOR);
String[] topicSegments = topic.split(SEPARATOR);
for (int i = 0; i < patternSegments.length; i ++) {
// 对各个主题层级进行匹配
String curPatternSeg = patternSegments[i];
String curTopicSeg = topicSegments.length > i ? topicSegments[i]: null;
if (curTopicSeg == null && !curPatternSeg.equals(WILDCARD_ALL)) {
// 主题层级比订阅层级少且相应的订阅层级不是#
return false;
}
if ("".equals(curTopicSeg) && "".equals(curPatternSeg)) {
// 可能以 / 开头
continue;
}
if (curPatternSeg.equals(WILDCARD_ALL)) {
// 是#通配,则#必须是最后一级
return i == patternSegments.length - 1;
}
// 当前层级不是通配,需要字符串相等
if (!curPatternSeg.equals(WILDCARD_PART) && !curPatternSeg.equals(curTopicSeg)) {
return false;
}
}
return patternSegments.length == topicSegments.length;
}
public static HandlerCache getHandlerMethod(String topic) {
return messageHandlers.stream()
.filter(hc -> isMatchPattern(hc.getMessagePattern(), topic)).findFirst().orElse(null);
}
}
其中HandlerCache
保存了主题(MessagePattern
中的值)、处理函数、消息体对应的实体类型(处理方法的第二个参数的类型)、处理函数所在对象的bean
,如下:
package top.wteng.mqttbootintegration.entity;
import java.lang.reflect.Method;
public class HandlerCache {
private Object handlerBean; // 处理函数所在的对象引用
private Method handlerMethod; // 处理函数的反射
private String messagePattern; // 主题,@MessagePattern注解中的值
private Class<?> convertType; // 消息体对应的实体类型,即处理函数第二个参数的类型
public HandlerCache(String messagePattern, Object handlerBean, Method handlerMethod, Class<?> convertType) {
this.messagePattern = messagePattern;
this.handlerBean = handlerBean;
this.handlerMethod = handlerMethod;
this.convertType = convertType;
}
public Object getHandlerBean() {
return handlerBean;
}
public void setHandlerBean(Object handlerBean) {
this.handlerBean = handlerBean;
}
public Method getHandlerMethod() {
return handlerMethod;
}
public void setHandlerMethod(Method handlerMethod) {
this.handlerMethod = handlerMethod;
}
public String getMessagePattern() {
return messagePattern;
}
public void setMessagePattern(String messagePattern) {
this.messagePattern = messagePattern;
}
public Class<?> getConvertType() {
return convertType;
}
public void setConvertType(Class<?> convertType) {
this.convertType = convertType;
}
}
最后在启动类中定义mqtt
相关的bean
如下:
package top.wteng.mqttbootintegration;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.annotation.EnableAsync;
import top.wteng.mqttbootintegration.util.DispatchUtil;
import top.wteng.mqttbootintegration.util.HandlerBeanUtil;
@SpringBootApplication
@EnableAsync // 开启异步
public class MqttBootIntegrationApplication implements CommandLineRunner{
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired private ApplicationContext applicationContext;
@Autowired private DispatchUtil dispatchUtil;
@Value("${mqtt.host}") private String mqttHost;
@Value("${mqtt.port}") private String mqttPort;
public static void main(String[] args) {
SpringApplication.run(MqttBootIntegrationApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("MqttBootIntegrationApplication is running ...");
}
@Bean
// mqtt接收频道
public MessageChannel mqttReceiverChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer mqttReceiver() {
// 取出所有主题进行订阅
List<String> mps = HandlerBeanUtil.getAndCacheMessagePatterns(applicationContext);
String[] mpArr = mps.toArray(new String[0]);
String mqttUrl = String.format("tcp://%s:%s", mqttHost, mqttPort);
String clientId = UUID.randomUUID().toString();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttUrl, clientId, mpArr);
logger.info(String.format("connected to mqtt %s with client id %s ...", mqttUrl, clientId));
logger.info(String.format("topic subscribed: %s", String.join(",", mpArr)));
adapter.setOutputChannel(mqttReceiverChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttReceiverChannel")
// 绑定了mqtt接收频道,收到消息后会触发此函数
public MessageHandler messageHandler() {
// 分发消息,异步处理
return message ->
dispatchUtil.dispatchMessage((String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), (String)message.getPayload());
}
}
其中用于匹配处理函数的dispatchMessage
单独定义到了一个DispatchUtil
类中,这是因为要为其应用@Async
注解实现异步,但该注解不能应用于EnableAsync
标记的同一类中,不然不会生效,DispatchUtil
实现如下:
package top.wteng.mqttbootintegration.util;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import top.wteng.mqttbootintegration.entity.HandlerCache;
import java.lang.reflect.InvocationTargetException;
@Component
public class DispatchUtil {
private final Logger logger = LoggerFactory.getLogger(DispatchUtil.class);
@Async // 实现异步
public void dispatchMessage(String topic, String payload) {
HandlerCache hc = HandlerBeanUtil.getHandlerMethod(topic);
if (hc == null) {
logger.warn(String.format("not find handler method for topic %s ...", topic));
return;
}
try {
Class<?> type = hc.getConvertType();
hc.getHandlerMethod().invoke(hc.getHandlerBean(), topic, type.getTypeName().equals("java.lang.String") ? payload: JSON.parseObject(payload, type));
} catch (InvocationTargetException e) {
logger.warn(String.format("invoke handler method for topic %s failed ...", topic));
} catch (IllegalAccessException e) {
logger.warn(String.format("access handler method for topic %s failed ...", topic));
}
}
}
至此,集成完毕,启动程序并发送几个消息,即可看到打印输出:
完整示例看这里。