RabbitMq
配置
rabbitmq
实现延迟队列有两种方式,一是利用死信队列,二是利用rabbitmq-delayed-message-exchange
插件,插件的方式直接提供了延迟队列的实现,实现起来更为简单方便,直接安装启用即可,docker
下提供了相应的镜像,可以直接拉取运行,不需要额外配置,docker-compose
文件如下:
version: '3.1'
services:
rabbitmq:
image: heidiks/rabbitmq-delayed-message-exchange
container_name: rabbitmq
privileged: true
network_mode: bridge
ports:
- 15672:15672
- 5672:5672
environment:
TZ: Asia/Shanghai
RABBITMQ_DEFAULT_USER: rabbit
RABBITMQ_DEFAULT_PASS: rabbit
使用非docker
方式部署时,由于插件是第三方插件,默认没有包含在初始安装包里,因此需要先下载编译好的.ez
插件,直接在github
下载即可,然后将其拷贝到插件安装目录,启用即可:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
SpringBoot
中实现
依赖配置
加入amqp
依赖如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yaml
中配置rabbitmq
信息:
spring:
rabbitmq:
username: rabbit
password: rabbit
host: localhost
port: 5672
publisher-returns: true
队列及交换机配置
首先将交换机名称、队列名称、路由名称封装到常量里:
package ink.labrador.mq.constants;
public class DelayQueueConstants {
private final static String ExchangeName = "delayTest.direct"; // 交换机名称
private final static String QueueName = "delayTest.queueTest"; // 队列名称
private final static String RouteKey = "delayTest.queueTest"; // 路由值
}
创建配置类,配置交换机与队列信息,交换机或队列不存在时,自动声明创建:
package ink.labrador.mq.configuration;
import ink.labrador.mq.constants.DelayQueueConstants;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class QueueConfiguration {
/**
* 声明交换机、队列及绑定关系,不存在时自动创建
* @param connectionFactory 连接工厂,自动注入
* @return 管理者对象
*/
@Bean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.declareExchange(delayedExchange());
rabbitAdmin.declareQueue(delayQueue());
rabbitAdmin.declareBinding(delayBinding());
return rabbitAdmin;
}
/**
* 创建交换机
* @return 交换机
*/
@Bean
public CustomExchange delayedExchange() {
// 因为是第三方插件,需要创建自定义交换机,声明类型为 x-delayed-message
Map<String, Object> map = new HashMap<String, Object>() {{
put("x-delayed-type", "direct"); // 路由类型
}};
return new CustomExchange(DelayQueueConstants.ExchangeName, "x-delayed-message", true, false, map);
}
/**
* 创建队列
* @return 延迟队列
*/
@Bean
public Queue delayQueue() {
return new Queue(DelayQueueConstants.QueueName);
}
/**
* 创建队列与交换机绑定关系
* @return 列与交换机绑定关系
*/
@Bean
public Binding delayBinding() {
return BindingBuilder
.bind(delayQueue())
.to(delayedExchange())
.with(DelayQueueConstants.RouteKey)
.noargs();
}
}
监听队列
创建一个监听者,监听延迟队列:
package ink.labrador.mq.listener;
import ink.labrador.mq.constants.DelayQueueConstants;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = {DelayQueueConstants.QueueName}) // 监听的队列名
public class DelayQueueListener {
// 处理函数
@RabbitHandler
public void handleQueueMessage(String message) {
System.out.println("message received at " + System.currentTimeMillis() + ": " + message);
}
}
测试
创建测试类,发送消息进行验证:
package ink.labrador.mq;
import ink.labrador.mq.constants.DelayQueueConstants;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DelayQueueTest {
@Autowired private AmqpTemplate amqpTemplate;
@Test
void sendDelayMessage() throws InterruptedException {
amqpTemplate.convertAndSend(
DelayQueueConstants.ExchangeName,
DelayQueueConstants.RouteKey,
"I'm delay message",
message -> {
// 设置延迟时间,毫秒,通过x-delay头部控制,延迟3秒
message.getMessageProperties().setHeader("x-delay", 3 * 1000);
return message;
}
);
System.out.println("message was sent at " + System.currentTimeMillis());
// 阻塞4秒,确保能收到消息
Thread.sleep(4 * 1000);
}
}
运行即可看到输出: