一、目标

springboot实现mqtt消息的订阅处理,基于spring integration,做到:

  • 业务处理分离,通过@Handler@MessagePattern自定义注解标记处理类与处理方法,接收到Mqtt消息后自动依据主题去匹配相应的处理方法。
  • 通过@MessagePattern注解自动搜索所有需处理的主题并订阅。
  • 每个消息到达后异步进行处理并自动对消息体进行类型转换。

大概流程如下:

mqtt-boot-tu.png

二、配置

applocation.yaml中加入mqtt broker的配置:

mqtt:
  host: 127.0.0.1
  port: 1883

maven配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>top.wteng</groupId>
    <artifactId>MqttBootIntegration</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>MqttBootIntegration</name>
    <description>Mqtt integration for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- spring integration相关依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <!-- json转换 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>

        <!-- 开发者依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!-- 测试相关 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
     </dependencies>

     <repositories>
         <!-- 插件仓库 -->
        <repository>
            <id>Spring Plugins Repository</id>
            <name>Spring Plugins Repository</name>
            <url>https://repo.spring.io/plugins-release/</url>
        </repository>
     </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

三、 自定义注解

自定义两个注解,一个Handler,类似于Controller注解,标记一个类为处理类,接收到mqtt消息后去应用了该注解的类中查找处理方法,另一个MessagePattern,应用在方法上,其value参数表明处理的主题,订阅也是订阅的所有MessagePattern声明了的主题,可以应用通配符,启动时,会先扫描所有应用了Handler的类,在类中获取到所有MessagePattern注解的value值进行订阅,当收到消息后,遍历所有应用了MessagePattern的方法,匹配消息的topic与注解的value值,若匹配成功,则应用相应的方法去处理。

Handler实现如下:

package top.wteng.mqttbootintegration.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Handler {
}

MessagePattern实现如下:

package top.wteng.mqttbootintegration.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MessagePattern {
    // 主题,可以用通配符,应用启动时会取出该值进行订阅,收到消息时也会依据此值与主题进行匹配,确定处理函数
    String value();
}

四、处理类

假设有一个temperature主题需要处理,其消息体格式对应的实体如下:

package top.wteng.mqttbootintegration.entity;

public class Temperature {
    private Integer id;
    private float value;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public float getValue() {
        return value;
    }

    public void setValue(float value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Temperature{" +
                "id=" + id +
                ", value=" + value +
                '}';
    }
}

要实现的目的便是只需为其定义一个处理类,无需任何其他操作,即可自动订阅并分发处理,只是约定处理函数第一个参数为字符串类型的topic,第二个参数为消息的payload,第二个参数的类型应为stringpayload对应的实体类型,若类型不为string,则会用JSON.parseObject方法进行解析转换,如下:

package top.wteng.mqttbootintegration.handler;

import org.springframework.stereotype.Component;

import top.wteng.mqttbootintegration.annotation.Handler;
import top.wteng.mqttbootintegration.annotation.MessagePattern;
import top.wteng.mqttbootintegration.entity.Temperature;

@Handler // 表明是处理类
@Component
public class TemperatureHandler {

       // 程序启动时自动取出value值进行订阅并在收到消息后依据value值匹配到该函数进行处理
    @MessagePattern(value = "temperature")
    public void temperatureHandler(String topic, Temperature temperature) throws InterruptedException {
        // 因为第二个参数类型为Temperature,在应用该函数时,会通过JSON.parseObject对原始字符串类型的payload进行转换
        System.out.printf("thread id: %d, temperature = %s%n", Thread.currentThread().getId(), temperature.toString());
    }

}

五、Mqtt集成

处理类都有了,下一步就是将mqtt集成进来,在spring integrationmqtt的集成本身很简单,只需在启动类中定义几个bean即可,只是在程序启动后,需要在mqtt中自动取出并订阅前面MessagePattern中的值,为实现这个目的,先实现一个工具类如下:

package top.wteng.mqttbootintegration.util;

import java.lang.reflect.Method;
import java.util.*;
import java.util.stream.Collectors;

import org.springframework.context.ApplicationContext;

import top.wteng.mqttbootintegration.annotation.Handler;
import top.wteng.mqttbootintegration.annotation.MessagePattern;
import top.wteng.mqttbootintegration.entity.HandlerCache;

public class HandlerBeanUtil {
    // 保存了主题与处理函数等的对应关系,收到消息后直接遍历该列表进行匹配调用即可,这样就无需再扫描一遍注解,可以省去扫描过程中的反射开销
    private static final List<HandlerCache> messageHandlers = new ArrayList<>();
    private static final String WILDCARD_ALL = "#";
    private static final String WILDCARD_PART = "+";
    private static final String SEPARATOR = "/";
    
    // 获取所有MessagePattern注解中的值并存入列表
    public static List<String> getAndCacheMessagePatterns(ApplicationContext context) {
        return Arrays.asList(context.getBeanNamesForAnnotation(Handler.class))
                .parallelStream()
                .map(context::getBean) // 所有应用了Handler注解的bean
                .flatMap(bean -> {
                    Class<?> handlerClass = bean.getClass();
                    List<HandlerCache> mhs = new ArrayList<>();
                    // 所有应用了MessagePatter注解的函数
                    Method[] methods = handlerClass.getDeclaredMethods();
                    for (Method m: methods) {
                        MessagePattern mp = m.getDeclaredAnnotation(MessagePattern.class);
                        if (mp != null && !isMessageHandlersHas(mp.value())) {
                            Class<?>[] parameterTypes = m.getParameterTypes();
                            mhs.add(new HandlerCache(mp.value(), bean, m, parameterTypes.length > 1 ? parameterTypes[1] : String.class));
                        }
                    }
                    messageHandlers.addAll(mhs);
                    return mhs.stream();
                })
                .map(HandlerCache::getMessagePattern)
                .collect(Collectors.toList());
    }

    private static boolean isMessageHandlersHas(String pattern) {
        return messageHandlers.stream()
                .filter(hc -> hc.getMessagePattern().equals(pattern)).findFirst().orElse(null) != null;
    }

    private static boolean isMatchPattern(String pattern, String topic) {
        if (!pattern.contains(WILDCARD_ALL) && !pattern.contains(WILDCARD_PART)) {
            // 不是通配订阅
            return pattern.equals(topic);
        }
        // 订阅中含有通配符
        String[] patternSegments = pattern.split(SEPARATOR);
        String[] topicSegments = topic.split(SEPARATOR);
        for (int i = 0; i < patternSegments.length; i ++) {
            // 对各个主题层级进行匹配
            String curPatternSeg = patternSegments[i];
            String curTopicSeg = topicSegments.length > i ? topicSegments[i]: null;
            if (curTopicSeg == null && !curPatternSeg.equals(WILDCARD_ALL)) {
                // 主题层级比订阅层级少且相应的订阅层级不是#
                return false;
            }
            if ("".equals(curTopicSeg) && "".equals(curPatternSeg)) {
                // 可能以 / 开头
                continue;
            }
            if (curPatternSeg.equals(WILDCARD_ALL)) {
                // 是#通配,则#必须是最后一级
                return i == patternSegments.length - 1;
            }
            // 当前层级不是通配,需要字符串相等
            if (!curPatternSeg.equals(WILDCARD_PART) && !curPatternSeg.equals(curTopicSeg)) {
                return false;
            }
        }
        return patternSegments.length == topicSegments.length;
    }

    public static HandlerCache getHandlerMethod(String topic) {
        return messageHandlers.stream()
                        .filter(hc -> isMatchPattern(hc.getMessagePattern(), topic)).findFirst().orElse(null);
    }
}

其中HandlerCache保存了主题(MessagePattern中的值)、处理函数、消息体对应的实体类型(处理方法的第二个参数的类型)、处理函数所在对象的bean,如下:

package top.wteng.mqttbootintegration.entity;


import java.lang.reflect.Method;

public class HandlerCache {
    private Object handlerBean; // 处理函数所在的对象引用
    private Method handlerMethod; // 处理函数的反射
    private String messagePattern; // 主题,@MessagePattern注解中的值
    private Class<?> convertType; // 消息体对应的实体类型,即处理函数第二个参数的类型
    public HandlerCache(String messagePattern, Object handlerBean, Method handlerMethod, Class<?> convertType) {
        this.messagePattern = messagePattern;
        this.handlerBean = handlerBean;
        this.handlerMethod = handlerMethod;
        this.convertType = convertType;
    }
    public Object getHandlerBean() {
        return handlerBean;
    }

    public void setHandlerBean(Object handlerBean) {
        this.handlerBean = handlerBean;
    }

    public Method getHandlerMethod() {
        return handlerMethod;
    }

    public void setHandlerMethod(Method handlerMethod) {
        this.handlerMethod = handlerMethod;
    }

    public String getMessagePattern() {
        return messagePattern;
    }

    public void setMessagePattern(String messagePattern) {
        this.messagePattern = messagePattern;
    }

    public Class<?> getConvertType() {
        return convertType;
    }

    public void setConvertType(Class<?> convertType) {
        this.convertType = convertType;
    }
}

最后在启动类中定义mqtt相关的bean如下:

package top.wteng.mqttbootintegration;

import java.util.List;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.annotation.EnableAsync;

import top.wteng.mqttbootintegration.util.DispatchUtil;
import top.wteng.mqttbootintegration.util.HandlerBeanUtil;

@SpringBootApplication
@EnableAsync // 开启异步
public class MqttBootIntegrationApplication implements CommandLineRunner{
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired private ApplicationContext applicationContext;
    @Autowired private DispatchUtil dispatchUtil;
    @Value("${mqtt.host}") private String mqttHost;
    @Value("${mqtt.port}") private String mqttPort;

    public static void main(String[] args) {
        SpringApplication.run(MqttBootIntegrationApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("MqttBootIntegrationApplication is running ...");
    }

    @Bean
    // mqtt接收频道
    public MessageChannel mqttReceiverChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer mqttReceiver() {
        // 取出所有主题进行订阅
        List<String> mps = HandlerBeanUtil.getAndCacheMessagePatterns(applicationContext);
        String[] mpArr = mps.toArray(new String[0]);
        String mqttUrl = String.format("tcp://%s:%s", mqttHost, mqttPort);
        String clientId = UUID.randomUUID().toString();
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttUrl, clientId, mpArr);
        logger.info(String.format("connected to mqtt %s with client id %s ...", mqttUrl, clientId));
        logger.info(String.format("topic subscribed: %s", String.join(",", mpArr)));
        adapter.setOutputChannel(mqttReceiverChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttReceiverChannel")
    // 绑定了mqtt接收频道,收到消息后会触发此函数
    public MessageHandler messageHandler() {
        // 分发消息,异步处理
        return message ->
                dispatchUtil.dispatchMessage((String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), (String)message.getPayload());
    }
}

其中用于匹配处理函数的dispatchMessage单独定义到了一个DispatchUtil类中,这是因为要为其应用@Async注解实现异步,但该注解不能应用于EnableAsync标记的同一类中,不然不会生效,DispatchUtil实现如下:

package top.wteng.mqttbootintegration.util;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import top.wteng.mqttbootintegration.entity.HandlerCache;

import java.lang.reflect.InvocationTargetException;

@Component
public class DispatchUtil {
    private final Logger logger = LoggerFactory.getLogger(DispatchUtil.class);

    @Async // 实现异步
    public void dispatchMessage(String topic, String payload) {
        HandlerCache hc = HandlerBeanUtil.getHandlerMethod(topic);
        if (hc == null) {
            logger.warn(String.format("not find handler method for topic %s ...", topic));
            return;
        }
        try {
            Class<?> type = hc.getConvertType();
            hc.getHandlerMethod().invoke(hc.getHandlerBean(), topic, type.getTypeName().equals("java.lang.String") ? payload: JSON.parseObject(payload, type));
        } catch (InvocationTargetException e) {
            logger.warn(String.format("invoke handler method for topic %s failed ...", topic));
        } catch (IllegalAccessException e) {
            logger.warn(String.format("access handler method for topic %s failed ...", topic));
        }
    }
}

至此,集成完毕,启动程序并发送几个消息,即可看到打印输出:

mqtt-boot-async.png

完整示例看这里