RabbitMq配置

rabbitmq实现延迟队列有两种方式,一是利用死信队列,二是利用rabbitmq-delayed-message-exchange插件,插件的方式直接提供了延迟队列的实现,实现起来更为简单方便,直接安装启用即可,docker下提供了相应的镜像,可以直接拉取运行,不需要额外配置,docker-compose文件如下:

version: '3.1'

services:
  rabbitmq:
    image: heidiks/rabbitmq-delayed-message-exchange
    container_name: rabbitmq
    privileged: true
    network_mode: bridge
    ports:
      - 15672:15672
      - 5672:5672
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: rabbit
      RABBITMQ_DEFAULT_PASS: rabbit

使用非docker方式部署时,由于插件是第三方插件,默认没有包含在初始安装包里,因此需要先下载编译好的.ez插件,直接在github下载即可,然后将其拷贝到插件安装目录,启用即可:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

SpringBoot中实现

依赖配置

加入amqp依赖如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yaml中配置rabbitmq信息:

spring:
  rabbitmq:
    username: rabbit
    password: rabbit
    host: localhost
    port: 5672
    publisher-returns: true

队列及交换机配置

首先将交换机名称、队列名称、路由名称封装到常量里:

package ink.labrador.mq.constants;

public class DelayQueueConstants {
    private final static String ExchangeName = "delayTest.direct"; // 交换机名称
    private final static String QueueName = "delayTest.queueTest"; // 队列名称
    private final static String RouteKey = "delayTest.queueTest"; // 路由值
}

创建配置类,配置交换机与队列信息,交换机或队列不存在时,自动声明创建:

package ink.labrador.mq.configuration;

import ink.labrador.mq.constants.DelayQueueConstants;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfiguration {

    /**
     * 声明交换机、队列及绑定关系,不存在时自动创建
     * @param connectionFactory 连接工厂,自动注入
     * @return 管理者对象
     */
    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareExchange(delayedExchange());
        rabbitAdmin.declareQueue(delayQueue());
        rabbitAdmin.declareBinding(delayBinding());
        return rabbitAdmin;
    }

    /**
     * 创建交换机
     * @return 交换机
     */
    @Bean
    public CustomExchange delayedExchange() {
        // 因为是第三方插件,需要创建自定义交换机,声明类型为 x-delayed-message
        Map<String, Object> map = new HashMap<String, Object>() {{
            put("x-delayed-type", "direct");  // 路由类型
        }};
        return new CustomExchange(DelayQueueConstants.ExchangeName, "x-delayed-message", true, false, map);
    }

    /**
     * 创建队列
     * @return 延迟队列
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(DelayQueueConstants.QueueName);
    }

    /**
     * 创建队列与交换机绑定关系
     * @return 列与交换机绑定关系
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder
                .bind(delayQueue())
                .to(delayedExchange())
                .with(DelayQueueConstants.RouteKey)
                .noargs();
    }

}

监听队列

创建一个监听者,监听延迟队列:

package ink.labrador.mq.listener;

import ink.labrador.mq.constants.DelayQueueConstants;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = {DelayQueueConstants.QueueName}) // 监听的队列名
public class DelayQueueListener {
    // 处理函数
    @RabbitHandler
    public void handleQueueMessage(String message) {
        System.out.println("message received at " + System.currentTimeMillis() + ": " + message);
    }
}

测试

创建测试类,发送消息进行验证:

package ink.labrador.mq;

import ink.labrador.mq.constants.DelayQueueConstants;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class DelayQueueTest {
    @Autowired private AmqpTemplate amqpTemplate;

    @Test
    void sendDelayMessage() throws InterruptedException {
        amqpTemplate.convertAndSend(
                DelayQueueConstants.ExchangeName,
                DelayQueueConstants.RouteKey,
                "I'm delay message",
                message -> {
                    // 设置延迟时间,毫秒,通过x-delay头部控制,延迟3秒
                    message.getMessageProperties().setHeader("x-delay", 3 * 1000);
                    return message;
                }
        );
        System.out.println("message was sent at " + System.currentTimeMillis());
        // 阻塞4秒,确保能收到消息
        Thread.sleep(4 * 1000);
    }
}

运行即可看到输出:

image-20220327120636554