Spring Boot整合RabbitMQ

核心依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

创建工程

创建Spring Boot项目chunyu-rabbitmq,引入amqp及一些必要的web依赖,完整pom.xml文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sunchaser-chunyu</artifactId>
<groupId>com.sunchaser.chunyu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>chunyu-rabbitmq</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<springboot.version>2.6.4</springboot.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${springboot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>

基本配置

创建Spring Boot配置文件application.yml,添加以下基本配置项:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 127.0.0.1 # host
port: 5672 # 端口号,默认值5672。
username: guest # 用户名,默认值guest。
password: guest # 密码,默认值guest。
virtual-host: / # 虚拟主机,默认值/。

消息发送

这里以topic主题类型交换机为例进行消息发送。

topic主题类型交换机可通过设置特殊的绑定键实现fanout扇出类型交换机和direct直接类型交换机的效果。

  • fanout扇出类型交换机:绑定键为#
  • direct直接类型交换机:绑定键中不包含*#

基本信息配置

使用配置文件配置交换机名称、队列名称及绑定键名称。

创建RabbitMQ属性配置信息类RabbitMQProperties,编写代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.sunchaser.chunyu.rabbitmq.config.property;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ 属性配置信息类
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Configuration
@ConfigurationProperties(prefix = "my.rabbitmq")
@Data
public class RabbitMQProperties {

/**
* 交换机名称
*/
private String topicExchangeName;

/**
* 队列名称
*/
private String topicQueueName;

/**
* 绑定键
*/
private String topicRoutingKey;
}

然后在application.yml中进行自定义配置,例如:

1
2
3
4
5
6
# 自定义的交换机、队列及绑定键的名称
my:
rabbitmq:
topic-exchange-name: boot_topic
topic-queue-name: boot_topic_queue
topic-routing-key: boot.#

主题交换机关系配置

配置交换机、队列及它们之间的绑定关系。

创建RabbitMQ交换机关系配置类BootTopicConfig,编写代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.sunchaser.chunyu.rabbitmq.config;

import com.sunchaser.chunyu.rabbitmq.config.property.RabbitMQProperties;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ 交换机、队列、binding关系 配置类
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Configuration
public class BootTopicConfig {

@Autowired
private RabbitMQProperties rabbitMQProperties;

/**
* 创建主题交换机。
* ExchangeBuilder中默认durable=true,持久化。
*
* @return TopicExchange
* @see org.springframework.amqp.core.TopicExchange
*/
@Bean
public Exchange bootTopicExchange() {
return ExchangeBuilder.topicExchange(rabbitMQProperties.getTopicExchangeName())
.build();
}

/**
* 创建队列。
* durable方法创建的是持久化队列
*
* @return Queue
*/
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(rabbitMQProperties.getTopicQueueName())
.build();
}

/**
* 交换机和队列进行绑定
*
* @return Binding
*/
@Bean
public Binding bindingBootQueueExchange() {
return BindingBuilder.bind(bootQueue())
.to(bootTopicExchange())
.with(rabbitMQProperties.getTopicRoutingKey())
.noargs();
}
}

消息投递

调用RabbitTemplate.convertAndSend方法将消息用指定的routingKey(路由键)发送到指定交换机boot_topic

创建消息生产者类RabbitMQProducer,编写代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.sunchaser.chunyu.rabbitmq.mq.producer;

import com.sunchaser.chunyu.rabbitmq.config.property.RabbitMQProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* topic交换机 消息生产者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Component
public class RabbitMQProducer {

@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitMQProperties rabbitMQProperties;

public void send(String msg, String routingKey) {
rabbitTemplate.convertAndSend(
rabbitMQProperties.getTopicExchangeName(),
routingKey,
msg
);
}
}

为了方便触发消息发送,我们暴露出一个HTTP接口。创建控制器类RabbitMQController,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.sunchaser.chunyu.rabbitmq.controller;

import com.sunchaser.chunyu.rabbitmq.mq.producer.RabbitMQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 消息发送入口
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@RestController
public class RabbitMQController {

@Autowired
private RabbitMQProducer rabbitMQProducer;

@GetMapping("/send")
public void send(String msg, String routingKey) {
rabbitMQProducer.send(msg, routingKey);
}
}

启动服务后访问 http://localhost:8080/send?msg=xxx&routingKey=boot.xxx 即可发送消息。

消息消费

使用@RabbitListener@RabbitHandler注解进行消息监听与处理。

创建消息消费者类RabbitMQConsumer,编写代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.sunchaser.chunyu.rabbitmq.mq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 消息消费者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Component
@RabbitListener(queues = "${my.rabbitmq.topic-queue-name}")
@Slf4j
public class RabbitMQConsumer {

@RabbitHandler
public void listener(String msg, Message message, Channel channel) {
log.info("RabbitMQ - [RabbitMQConsumer] 消费端 消费到消息,msg={}, message={}", msg, message);
}
}

@RabbitHandler注解标记的listener方法的第一个参数msg为生产者发送的消息对象,类型为String;第二个参数message为标准AMQP协议的Message对象;第三个参数channel是消息发送的信道。

启动服务后访问 http://localhost:8080/send?msg=xxx&routingKey=boot.xxx 发送消息,消费者接收到消息后将消息内容打印在控制台:

1
2022-04-26 17:17:01.315  INFO 20092 --- [ntContainer#0-1] c.s.c.r.m.c.RabbitMQConsumer       : RabbitMQ - [RabbitMQConsumer] 消费端 消费到消息,msg=xxx, message=(Body:'xxx' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=boot_topic, receivedRoutingKey=boot.xxx, deliveryTag=1, consumerTag=amq.ctag-J6pgj9XCNuV9LeCRkY825g, consumerQueue=boot_topic_queue])

至此,我们就完成了Spring BootRabbitMQ的整合,可以对消息进行简单的发送与接收。

发送自定义实体消息

实际业务中,不可能仅发送String类型的消息。如果不是直接将消息转化为byte数组并设置到org.springframework.amqp.core.Message类中进行发送,Spring Boot AMQP会使用MessageConverter消息格式转化器在消息发送和接收时自动将消息内容序列化为byte数组或将byte数组反序列化为消息,默认使用的转化器是SimpleMessageConverter,前面我们发送的String类型的消息能被正常接收就是有转化器的存在。它支持以下三种格式的消息:

  1. byte数组
  2. String
  3. 实现了java.io.Serializable接口的对象。

例如现在我们想将一个业务DTO对象作为消息进行发送,默认情况下需要DTO类实现Serializable接口。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.sunchaser.chunyu.rabbitmq.model;

import lombok.Data;

import java.io.Serializable;

/**
* 自定义实体消息
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/26
*/
@Data
public class MsgDTO implements Serializable {
private static final long serialVersionUID = -51410032238146012L;
private String msg;
}

然后我们就可以将MsgDTO类的对象作为消息进行发送了。消息生产者代码如下:

1
2
3
4
5
6
7
public void send(MsgDTO msgDTO, String routingKey) {
rabbitTemplate.convertAndSend(
rabbitMQProperties.getTopicExchangeName(),
routingKey,
msgDTO
);
}

同样地,我们暴露一个HTTP接口便于触发消息发送。代码如下:

1
2
3
4
@PostMapping("/send")
public void send(@RequestBody MsgDTO msgDTO, String routingKey) {
rabbitMQProducer.send(msgDTO, routingKey);
}

接下来在消费端RabbitMQConsumer类中,我们可以提供一个重载的listener方法进行消费。代码示例如下:

1
2
3
4
@RabbitHandler
public void listener(MsgDTO msg, Message message, Channel channel) {
log.info("RabbitMQ - [RabbitMQConsumer] 消费端 消费到消息,msg={}, message={}", msg, message);
}

重启服务后使用命令curl --location --request POST 'localhost:8080/send?routingKey=boot.xxx' --header 'Content-Type: application/json' --data-raw '{"msg": "xxx"}'发送消息。控制台输出如下:

1
2022-04-26 17:01:11.244  INFO 22279 --- [ntContainer#0-1] c.s.c.r.m.c.RabbitMQConsumer       : RabbitMQ - [RabbitMQConsumer] 消费端 消费到消息,msg=MsgDTO(msg=xxx), message=(Body:'{"msg":"xxx"}' MessageProperties [headers={__TypeId__=com.sunchaser.chunyu.rabbitmq.model.MsgDTO}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=boot_topic, receivedRoutingKey=boot.xxx, deliveryTag=1, consumerTag=amq.ctag-MpLMZE0hHfGnLzFrc_XoCQ, consumerQueue=boot_topic_queue])

Jackson2JsonMessageConverter

SimpleMessageConverter消息格式转换器序列化自定义实体类时使用的是JDK的序列化方式,性能略差,且不支持跨语言发送消息,不推荐使用。实际生产环境中通常会使用Jackson2JsonMessageConverter转化器,它基于JSON格式,底层用的是Jackson

Spring Boot配置Jackson2JsonMessageConverter转化器的方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.sunchaser.chunyu.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Objects;

/**
* RabbitMQ 基本配置类
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Configuration
@Slf4j
public class RabbitMQConfig {

/**
* 消息内容使用JSON序列化和反序列化
* 消息生产端RabbitTemplate会通过RabbitTemplateConfigurer自动注入该Bean
* 消息消费端SimpleRabbitListenerContainerFactory会自动注入该Bean
*
* @return Jackson2JsonMessageConverter
* @see org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
* @see org.springframework.boot.autoconfigure.amqp.RabbitTemplateConfigurer
* @see org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}

消息可靠投递

实际生产环境中,我们经常需要考虑的一个问题是:如何保证消息不丢失?

对于RabbitMQ来说,可以从以下三个方面进行考虑:

  • 生产端Ack机制。消息生产者将消息投递至交换机时进行发布确认。
  • 消息回退机制。交换机路由消息至队列时,如果找不到合适的队列进行投递,则将消息回退给消息发起者。
  • 消费端Ack机制。消息消费者可以开启手动应答,当消息的处理逻辑执行成功后,由消费者主动发起肯定应答;如果消息处理过程中发生异常,则发起否定应答。如果应答过程中消费者线程挂掉无法进行应答,RabbitMQ还提供了应答超时时间进行兜底处理。

生产端Ack

配置spring.rabbitmq.publisher-confirm-type参数开启生产者的发布确认。共三个取值:

  • NONE:默认值。禁用发布确认。
  • CORRELATED:异步发布确认。触发ConfirmCallback回调。建议使用。
  • SIMPLE:同步发布确认。发送消息成功后主动调用RabbitTemplate#waitForConfirmsRabbitTemplate#waitForConfirmsOrDie方法等待确认结果。需要保证发送消息和等待确认这两步操作在同一个作用域内,可以使用RabbitTemplate.invoke方法实现。同步确认效率较低,不推荐使用。

示例配置如下:

1
2
3
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启生产者发布确认:异步模式。会触发ConfirmCallback回调。

创建回调类RabbitMQCallback,实现RabbitTemplate.ConfirmCallback回调接口重写confirm方法,同时注入RabbitTemplate并为其配置ConfirmCallback回调。完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.sunchaser.chunyu.rabbitmq.config;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import java.util.Objects;

/**
* RabbitMQ消息可靠投递 生产者发布确认回调
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Configuration
@Slf4j
public class RabbitMQCallback implements InitializingBean, RabbitTemplate.ConfirmCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@Override
public void afterPropertiesSet() throws Exception {
rabbitTemplate.setConfirmCallback(this);
}

/**
* 生产者发布确认回调
* 需要开启spring.rabbitmq.publisher-confirm-type参数,建议设置为correlated,异步回调。
*
* @param correlationData 可保存消息ID和消息内容,需要发送方在消息发送时自行设置
* @param ack true:交换机收到消息;false:交换机未收到消息
* @param cause 投递到交换机失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = Objects.isNull(correlationData) ? "" : correlationData.getId();
if (ack) {
log.info("RabbitMQ - [ConfirmCallback] 生产端ID={}的消息投递至交换机 成功", id);
} else {
log.error("RabbitMQ - [ConfirmCallback] 生产端ID={}的消息投递至交换机 失败,原因:{}", id, cause);
}
}
}

发布正常回调

重启服务后通过 http://localhost:8080/send?msg=xxx&routingKey=boot.xxx 接口发送一条消息,消息正常投递到了交换机,回调ack=true。控制台输出如下:

1
2022-04-26 17:18:51.644  INFO 20147 --- [nectionFactory1] c.s.o.rabbitmq.config.RabbitMQCallback   : RabbitMQ - [ConfirmCallback] 生产端ID=的消息投递至交换机 成功

发布异常回调

修改消息发送者的代码,将消息发送给一个不存在的交换机,模拟发布异常。代码示例如下:

1
2
3
4
5
6
7
public void send(String msg, String routingKey) {
rabbitTemplate.convertAndSend(
rabbitMQProperties.getTopicExchangeName() + "xxx",
routingKey,
msg
);
}

重启服务后通过 http://localhost:8080/send?msg=xxx&routingKey=boot.xxx 接口发送一条消息,这时生产者会找不到指定交换机,消息投递失败,回调ack=false。控制台输出如下:

1
2
2022-04-26 17:19:23.544 ERROR 20166 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'boot_topicxxx' in vhost '/', class-id=60, method-id=40)
2022-04-26 17:19:23.548 ERROR 20166 --- [nectionFactory2] c.s.c.rabbitmq.config.RabbitMQCallback : RabbitMQ - [ConfirmCallback] 生产端ID=的消息投递至交换机 失败,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'boot_topicxxx' in vhost '/', class-id=60, method-id=40)

设置生产者ID

注意到控制台输出的生产端ID一直为空,该IDCorrelationData类的id字段,它是由发送方在消息发送时自行设置的。CorrelationData类还可存储原消息内容,可用于消息投递给交换机失败回调时进行重试处理。

RabbitTemplate有一个包含四个参数的convertAndSend重载的方法可用于设置CorrelationData。示例用法如下:

1
2
3
4
5
6
7
8
public void send(String msg, String routingKey) {
rabbitTemplate.convertAndSend(
rabbitMQProperties.getTopicExchangeName(),
routingKey,
msg,
new CorrelationData()
);
}

CorrelationData类的无参构造函数将ID设置为了UUID。重启服务后通过 http://localhost:8080/send?msg=xxx&routingKey=boot.xxx 接口发送一条消息,控制台输出如下:

1
2022-04-26 17:20:30.678  INFO 20203 --- [nectionFactory1] c.s.c.rabbitmq.config.RabbitMQCallback   : RabbitMQ - [ConfirmCallback] 生产端ID=b643ed26-a28c-43a1-9b12-af7aed3c18d5的消息投递至交换机 成功

消息回退机制

配置spring.rabbitmq.publisher-returns=true开启消息回退机制。当交换机找不到合适的队列发送消息时,触发ReturnsCallback回调,需要配合spring.rabbitmq.template.mandatory=true使用。

示例配置如下:

1
2
3
4
5
spring:
rabbitmq:
publisher-returns: true # 开启消息回退,消息路由不到合适的队列时,触发ReturnsCallback回调,配合RabbitTemplate的mandatory=true使用。
template:
mandatory: true # true:消息路由不到合适的队列时,强制触发ReturnsCallback回调。

让回调类RabbitMQCallback实现RabbitTemplate.ReturnsCallback回调接口重写returnedMessage方法,然后给RabbitTemplate配置ReturnsCallback回调。

RabbitMQCallback类完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.sunchaser.chunyu.rabbitmq.config;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import java.util.Objects;

/**
* RabbitMQ消息可靠投递 生产者发布确认回调和消息回退回调
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Configuration
@Slf4j
public class RabbitMQCallback implements InitializingBean, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@Override
public void afterPropertiesSet() throws Exception {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

/**
* 生产者发布确认回调
* 需要开启spring.rabbitmq.publisher-confirm-type参数,建议设置为correlated,异步回调。
*
* @param correlationData 可保存消息ID和消息内容,需要发送方在消息发送时自行设置
* @param ack true:交换机收到消息;false:交换机未收到消息
* @param cause 投递到交换机失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = Objects.isNull(correlationData) ? "" : correlationData.getId();
if (ack) {
log.info("RabbitMQ - [ConfirmCallback] 生产端ID={}的消息投递至交换机 成功", id);
} else {
log.error("RabbitMQ - [ConfirmCallback] 生产端ID={}的消息投递至交换机 失败,原因:{}", id, cause);
}
}

/**
* 交换机路由消息至队列失败 - 消息回退回调
* 需要开启spring.rabbitmq.publisher-returns参数,设置为true。
* 同时需要配合spring.rabbitmq.template.mandatory参数使用,也设置为true。
*
* @param returned 退回的消息
*/
@Override
public void returnedMessage(@NonNull ReturnedMessage returned) {
log.error("RabbitMQ - [ReturnsCallback] 交换机路由消息至队列失败,消息退回发起者,消息内容为:{}", returned);
}
}

重启服务后通过 http://localhost:8080/send?msg=xxx&routingKey=xxxboot.xxx 接口发送消息,修改请求参数routingKey路由键为xxxboot.xxx,模拟交换机路由消息至队列失败。此时的情况是生产端投递消息给交换机成功,交换机路由消息至队列失败,触发ReturnsCallback回调。控制台输出如下:

1
2
2022-04-26 17:21:49.365 ERROR 20225 --- [nectionFactory1] c.s.c.rabbitmq.config.RabbitMQCallback   : RabbitMQ - [ReturnsCallback] 交换机路由消息至队列失败,消息退回发起者,消息内容为:ReturnedMessage [message=(Body:'xxx' MessageProperties [headers={spring_returned_message_correlation=66b1b1a1-8d16-4f1e-a615-980cc6dd299b}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=boot_topic, routingKey=xxxboot.xxx]
2022-04-26 17:21:49.366 INFO 20225 --- [nectionFactory2] c.s.c.rabbitmq.config.RabbitMQCallback : RabbitMQ - [ConfirmCallback] 生产端ID=66b1b1a1-8d16-4f1e-a615-980cc6dd299b的消息投递至交换机 成功

消费端Ack

配置spring.rabbitmq.listener.simple.acknowledge-mode=manual开启消费端手动ack应答。示例配置如下:

1
2
3
4
5
6
7
spring:
rabbitmq:
listener:
# 容器类型:默认值为simple,即SimpleMessageListenerContainer。
type: simple
simple:
acknowledge-mode: manual # manual:开启消费端手动ack应答。默认值为auto自动应答。

改造消费者RabbitMQConsumerlistener方法,使用try-catch语法包裹消息处理部分,并在处理完成后调用channel.basicAck方法进行肯定应答,在catch代码块中调用channel.basicNack方法进行否定应答。

channel.basicAck方法有两个参数:

  • deliveryTag:消息传递标记。可通过调用message.getMessageProperties().getDeliveryTag()获取。
  • multiple:是否批量应答。设置为true时会应答通道内该消息之前的所有未应答消息。建议设置为false不批量。

channel.basicNack方法在basicAck方法基础上新增一个参数requeue:否定应答后是否将消息重新投递至队列。可用于消费异常后将消息重新投递至队列进行重试。

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.sunchaser.chunyu.rabbitmq.mq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* 消息消费者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Component
@RabbitListener(queues = "${my.rabbitmq.topic-queue-name}")
@Slf4j
public class RabbitMQConsumer {

@RabbitHandler
public void listener(String msg, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("RabbitMQ - [RabbitMQConsumer] 消费端 消费到消息,msg={}, message={}", msg, message);
if ("error".equals(msg)) {
throw new RuntimeException("error");
}
// 手动ack,肯定应答,multiple=false表示不批量
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("RabbitMQ - [RabbitMQConsumer] 消费端 消费消息时发生异常", e);
try {
// 否定应答,multiple=false表示不批量,requeue=false表示不重新投递至原队列
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
log.error("RabbitMQ - [RabbitMQConsumer] 消费端 否定应答消息时发生异常", ex);
}
}
}
}

当投递的消息内容为error时会抛出异常进行否定应答。重启服务后分别访问 http://localhost:8080/send?msg=xxx&routingKey=boot.xxx 发送正常消息和 http://localhost:8080/send?msg=error&routingKey=boot.xxx 发送”异常“消息。

正常消息控制台输出如下:

1
2
2022-04-26 18:07:32.538  INFO 20521 --- [ntContainer#0-1] c.s.c.r.m.c.RabbitMQConsumer       : RabbitMQ - [RabbitMQConsumer] 消费端 消费到消息,msg=xxx, message=(Body:'xxx' MessageProperties [headers={spring_listener_return_correlation=31a0cc2c-9551-442e-bbdb-99ce744d1357, spring_returned_message_correlation=04bc3584-c62b-4afa-ba09-3d13393dc7d9}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=boot_topic, receivedRoutingKey=boot.xxx, deliveryTag=3, consumerTag=amq.ctag-4hgZDTG0hVmjOA3nltqx0A, consumerQueue=boot_topic_queue])
2022-04-26 18:07:32.544 INFO 20521 --- [nectionFactory2] c.s.c.rabbitmq.config.RabbitMQCallback : RabbitMQ - [ConfirmCallback] 生产端ID=04bc3584-c62b-4afa-ba09-3d13393dc7d9的消息投递至交换机 成功

异常消息控制台输出如下:

1
2
3
4
5
6
7
2022-04-26 18:09:46.099  INFO 21303 --- [nectionFactory1] c.s.c.rabbitmq.config.RabbitMQCallback   : RabbitMQ - [ConfirmCallback] 生产端ID=7d88f590-b237-48b1-84b0-e9467dd3f969的消息投递至交换机 成功
2022-04-26 18:09:46.106 INFO 21303 --- [ntContainer#0-1] c.s.c.r.m.c.RabbitMQConsumer : RabbitMQ - [RabbitMQConsumer] 消费端 消费到消息,msg=error, message=(Body:'error' MessageProperties [headers={spring_listener_return_correlation=1cafb3d1-ceca-4c6d-9bf7-be1fe7955506, spring_returned_message_correlation=7d88f590-b237-48b1-84b0-e9467dd3f969}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=boot_topic, receivedRoutingKey=boot.xxx, deliveryTag=1, consumerTag=amq.ctag-0u75YXhiAWNFddHyktk44w, consumerQueue=boot_topic_queue])
2022-04-26 18:09:46.110 ERROR 21303 --- [ntContainer#0-1] c.s.c.r.m.c.RabbitMQConsumer : RabbitMQ - [RabbitMQConsumer] 消费端 消费消息时发生异常

java.lang.RuntimeException: error
at com.sunchaser.chunyu.rabbitmq.mq.consumer.RabbitMQConsumer.listener(RabbitMQConsumer.java:29) ~[classes/:na]
...

死信消息

在某些特殊情况下,正常队列中的消息可能会无法被消费到,这样的消息称为死信。通常,死信有以下三个来源:

  1. 消息到达TTL过期时间。
  2. 队列超过了长度限制。
  3. 消费者使用basic.nackbasic.reject否定应答消息,并将requeue参数设置为false

对于死信,我们可以为其配置一个特殊的交换机和队列,将死信转发到特殊交换机和队列上进行后续处理。

原理图

核心是在创建正常队列时为其配置死信交换机,同时可选择配置死信队列的绑定键。当正常队列中出现死信时,就会自动将其转发到死信交换机,然后路由到死信队列进行消费。

基本信息配置

在属性配置信息类RabbitMQProperties中添加正常交换机和死信交换机等字段,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.sunchaser.chunyu.rabbitmq.config.property;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ 属性配置信息类
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Configuration
@ConfigurationProperties(prefix = "my.rabbitmq")
@Data
public class RabbitMQProperties {

// ...省略前文主题交换机及队列等字段

/**
* 正常交换机名称
*/
private String normalExchangeName;

/**
* 正常队列名称
*/
private String normalQueueName;

/**
* 正常队列绑定键
*/
private String normalRoutingKey;

/**
* 死信交换机名称
*/
private String deadLetterExchangeName;

/**
* 死信队列名称
*/
private String deadLetterQueueName;

/**
* 死信队列绑定键
*/
private String deadLetterRoutingKey;
}

然后在application.yml文件中进行配置。示例如下:

1
2
3
4
5
6
7
8
my:
rabbitmq:
normal-exchange-name: normal_exchange
normal-queue-name: normal_queue
normal-routing-key: normal.#
dead-letter-exchange-name: dead_exchange
dead-letter-queue-name: dead_queue
dead-letter-routing-key: dead.#

绑定正常交换机和正常队列

将正常收发消息的交换机和队列进行绑定,同时配置死信交换机。代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.sunchaser.chunyu.rabbitmq.config;

import com.sunchaser.chunyu.rabbitmq.config.property.RabbitMQProperties;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 死信队列 - 正常收发消息的交换机与队列关系配置
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/27
*/
@Configuration
public class NormalConfig {

@Autowired
private RabbitMQProperties rabbitMQProperties;

/**
* 正常交换机。
*
* @return TopicExchange
*/
@Bean
public Exchange normalExchange() {
return ExchangeBuilder.topicExchange(rabbitMQProperties.getNormalExchangeName())
.build();
}

/**
* 创建队列。
* durable方法创建的是持久化队列
* deadLetterExchange方法设置死信交换机
* deadLetterRoutingKey方法设置死信队列绑定键
*
* @return Queue
*/
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(rabbitMQProperties.getNormalQueueName())
.deadLetterExchange(rabbitMQProperties.getDeadLetterExchangeName())
.deadLetterRoutingKey(rabbitMQProperties.getDeadLetterRoutingKey())
.build();
}

/**
* 交换机和队列进行绑定
*
* @return Binding
*/
@Bean
public Binding bindingNormalQueueExchange() {
return BindingBuilder.bind(normalQueue())
.to(normalExchange())
.with(rabbitMQProperties.getNormalRoutingKey())
.noargs();
}
}

调用deadLetterExchange方法会给队列配置x-dead-letter-exchange参数表示设置死信交换机,调用deadLetterRoutingKey方法会给队列配置x-dead-letter-routing-key参数表示设置死信交换机和队列的routingKey绑定键。

绑定死信交换机和死信队列

将死信交换机和队列进行绑定。代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.sunchaser.chunyu.rabbitmq.config;

import com.sunchaser.chunyu.rabbitmq.config.property.RabbitMQProperties;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 死信交换机与队列配置
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/27
*/
@Configuration
public class DeadLetterConfig {

@Autowired
private RabbitMQProperties rabbitMQProperties;

/**
* 死信交换机
*
* @return TopicExchange
*/
@Bean
public Exchange deadLetterExchange() {
return ExchangeBuilder.topicExchange(rabbitMQProperties.getDeadLetterExchangeName())
.build();
}

/**
* 死信队列
*
* @return Queue
*/
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(rabbitMQProperties.getDeadLetterQueueName())
.build();
}

/**
* 死信交换机和死信队列进行绑定
*
* @return Binding
*/
@Bean
public Binding bindingDeadQueueExchange() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with(rabbitMQProperties.getDeadLetterRoutingKey())
.noargs();
}
}

发送带TTL过期时间的消息

让消息具有TTL过期时间有两种方式:

  1. 通过队列属性x-message-ttl进行设置。设置后队列中所有消息都具有相同的过期时间。
  2. 由发送方为每条消息单独设置TTL时间。更加灵活,推荐使用。

注意:如果两种方式同时使用,则以TTL时间设置的较短的为准。

RabbitTemplate有一个convertAndSend重载方法的参数中包含MessagePostProcessor类,可以通过MessagePostProcessor类为消息设置TTL过期时间。示例用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void send(MsgDTO msgDTO, String routingKey, String expire) {
rabbitTemplate.convertAndSend(
rabbitMQProperties.getNormalExchangeName(),
routingKey,
msgDTO,
message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration(expire);// 消息过期时间,单位ms。
return message;
},
new CorrelationData()
);
}

同样地,我们暴露一个HTTP接口便于触发TTL消息的发送。代码如下:

1
2
3
4
@PostMapping("/send-ttl")
public void send(@RequestBody MsgDTO msgDTO, String routingKey, String expire) {
rabbitMQProducer.send(msgDTO, routingKey, expire);
}

重启服务后使用命令curl --location --request POST 'localhost:8080/send-ttl?routingKey=normal.xxx&expire=10000' --header 'Content-Type: application/json' --data-raw '{"msg": "xxx"}'往正常队列中发送一条TTL=10s的消息。

如果消息到达TTL时间后还未被消费,则会被转发到配置好的死信队列中,可在控制台页面进行查看。

normal_queue队列中的消息数量Ready=Total=1,将在TTL=10s到期后被转发至dead_queue死信队列。

消费正常队列中的消息

消息体为MsgDTO自定义实体对象,如果与前文的消费逻辑一致,直接在@RabbitListener注解上添加队列名my.rabbitmq.normal-queue-name即可,示例如下:

1
@RabbitListener(queues = {"${my.rabbitmq.topic-queue-name}", "${my.rabbitmq.normal-queue-name}"})

这样两个队列中类型为MsgDTO的消息的消费逻辑就都是以下方法:

1
2
@RabbitHandler
public void listener(MsgDTO msg, Message message, Channel channel) {...}

消费死信队列中的消息

@RabbitListener注解上添加死信队列名my.rabbitmq.dead-letter-queue-name。代码示例如下:

1
2
3
4
5
@RabbitListener(queues = {
"${my.rabbitmq.topic-queue-name}",
"${my.rabbitmq.normal-queue-name}",
"${my.rabbitmq.dead-letter-queue-name}"
})

这样,死信队列中类型为MsgDTO的消息的消费逻辑也为:

1
2
@RabbitHandler
public void listener(MsgDTO msg, Message message, Channel channel) {...}

如果死信队列中的消息需要进行特殊告警等处理,可以进行单独的消费封装。这里是为了复用消费逻辑。

队列超过长度限制

给正常队列配置队列长度。代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 创建队列。
* durable方法创建的是持久化队列
* deadLetterExchange方法设置死信交换机
* deadLetterRoutingKey方法设置死信队列绑定键
* maxLength方法设置队列长度
*
* @return Queue
*/
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(rabbitMQProperties.getNormalQueueName())
.deadLetterExchange(rabbitMQProperties.getDeadLetterExchangeName())
.deadLetterRoutingKey(rabbitMQProperties.getDeadLetterRoutingKey())
.maxLength(6)
.build();
}

给队列添加x-max-length参数以限制队列最大长度。这里限制最大长度为6。由于队列一旦创建后便无法修改其定义,所以我们需要去控制台手动删除队列,然后为了看到效果,我们先不要消费普通队列和死信队列中的消息,注释掉@RabbitListener注解中的${my.rabbitmq.normal-queue-name}${my.rabbitmq.dead-letter-queue-name}

生产者代码如下:

1
2
3
4
5
6
7
8
public void sendNormalExchange(MsgDTO msgDTO, String routingKey) {
rabbitTemplate.convertAndSend(
rabbitMQProperties.getNormalExchangeName(),
routingKey,
msgDTO,
new CorrelationData()
);
}

另外,我们还要暴露一个HTTP接口便于我们往正常队列中发送消息,控制器MQController代码如下:

1
2
3
4
@PostMapping("/send-normal")
public void sendNormalExchange(@RequestBody MsgDTO msgDTO, String routingKey) {
rabbitMQProducer.sendNormalExchange(msgDTO, routingKey);
}

重启服务后使用命令

1
2
3
4
for i in {1..10}
do
curl --location --request POST 'localhost:8080/send-normal?routingKey=normal.xxx' --header 'Content-Type: application/json' --data-raw '{"msg": "xxx"}'
done

一次性发送10条消息,观察控制台。

可看到normal_queue队列中存在6条消息,而dead_queue死信队列中存在4条。(测试完成后打开消费者的注释重启服务即可消费掉消息)

消费者否定应答(消息被拒)

修改MsgDTO类型消息消费者代码,当MsgDTO对象中的msg属性值为error时抛出异常,然后拒绝消息。代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RabbitHandler
public void listener(MsgDTO msg, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("RabbitMQ - [RabbitMQConsumer] 消费端 消费到消息,msg={}, message={}", msg, message);
if ("error".equals(msg.getMsg())) {
throw new RuntimeException("error");
}
// 手动ack,肯定应答,multiple=false表示不批量
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("RabbitMQ - [RabbitMQConsumer] 消费端 消费消息时发生异常", e);
try {
// 否定应答,multiple=false表示不批量,requeue=false表示不重新投递至原队列,如果配置了死信交换机则会自动转发
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
log.error("RabbitMQ - [RabbitMQConsumer] 消费端 否定应答消息时发生异常", ex);
}
}
}

打开对${my.rabbitmq.normal-queue-name}正常队列的消费,同时暂时注释掉对${my.rabbitmq.dead-letter-queue-name}死信队列的消费。重启服务后是用以下命令

1
2
3
4
5
6
7
8
for i in {1..5}
do
if [ "$i" -eq "2" ]; then
curl --location --request POST 'localhost:8080/send-normal?routingKey=normal.xxx' --header 'Content-Type: application/json' --data-raw '{"msg": "error"}'
else
curl --location --request POST 'localhost:8080/send-normal?routingKey=normal.xxx' --header 'Content-Type: application/json' --data-raw '{"msg": "xxx"}'
fi
done

往正常队列中发送5条消息,其中第二条消息的msg字段为error,即该消息会被否定应答,进入死信队列。查看IDEA控制台可看到正常消费了四条消息,剩下一条消息消费失败抛出了异常;查看RabbitMQ控制台可看到死信队列中存在一条消息。

延迟消息

实现方式之一是给队列设置过期时间,基于死信来实现,和死信原理图一样,需要配置两个交换机和队列,给正常队列设置延迟时间但不进行消费,消费者仅监听死信队列,当到达指定延迟时间后,正常队列中的消息就会被转发到死信队列从而被消费,实现延迟消息的效果。此方式实现的延迟消息存在一定的缺陷:正常队列中的所有消息延迟时间都是相同的,如果想要实现不同的延迟时间,就需要不断地增加正常队列。

另一种方式是前文介绍的直接由生产者发送带TTL过期时间的消息,正常队列不设置过期时间,从而实现不同延迟时间,但是这样也会存在问题,不同延迟时间的消息在正常队列中是有序的,如果先发送一个延迟10秒的消息后发送一个延迟2秒的消息,RabbitMQ只会检查处于队首位置的消息是否到达过期时间,后面的消息不会被检查,所以就导致后发送的延迟2秒的消息会一直等待先发送的延迟10秒的消息被转发走才被转发,从而出现消息延迟时间不准确的问题。

基于插件的延迟消息

使用RabbitMQ社区提供的插件rabbitmq-delayed-message-exchange来实现延迟消息。下面基于Docker来安装使用该插件。

制作镜像

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/latest上下载最新.ez插件文件并存放至/plugins目录,然后使用命令rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange开启延迟消息插件。Dockerfile文件如下:

1
2
3
4
5
6
7
8
9
10
11
# 2022-05-04 21:11:39 最新rabbitmq-delayed-message-exchange插件支持的rabbitmq server版本为3.9.x.
# FROM rabbitmq:management-alpine
FROM rabbitmq:3.9.16-management-alpine

ARG URL=https://api.github.com/repos/rabbitmq/rabbitmq-delayed-message-exchange/releases/latest
RUN apk add curl jq wget && \
VERSION=`curl $URL | jq -r ".tag_name"` && \
wget -P /plugins https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/${VERSION}/rabbitmq_delayed_message_exchange-${VERSION}.ez && \
rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

LABEL maintainer=admin@lilu.org.cn

制作镜像命令参考:docker build -t sunchaserlilu/rabbitmq-management-delayed:latest .

参见:

Docker Compose启动

使用Docker Componse工具编排,docker-compose.yml文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
version: "3.9"

services:
rabbitmq-management:
restart: always
image: sunchaserlilu/rabbitmq-management-delayed:latest
container_name: rabbitmq
ports:
- 15672:15672
- 5672:5672
environment:
- TZ=Asia/Shanghai

执行命令:

1
docker-compose -f docker-compose.yml up -d

访问 http://127.0.0.1:15672 即可看到RabbitMQ控制台。

插件基本原理

基于插件的延迟消息原理图如下:

核心是一个类型为x-delayed-message的交换机,该交换机会先将消息保存至MnesiaErlang语言生态中的一个分布式数据库管理系统),然后尝试确认消息是否过期,如果过期,则将消息投递至绑定的队列,整个延迟消息过程结束。

基本信息配置

在属性配置信息类RabbitMQProperties中添加延迟交换机、延迟队列及绑定键字段。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.sunchaser.chunyu.rabbitmq.config.property;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ 属性配置信息类
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/25
*/
@Configuration
@ConfigurationProperties(prefix = "my.rabbitmq")
@Data
public class RabbitMQProperties {

// ...省略前文字段

/**
* 延迟交换机名称
*/
private String delayedExchangeName;

/**
* 延迟队列名称
*/
private String delayedQueueName;

/**
* 延迟队列绑定键
*/
private String delayedRoutingKey;
}

然后在application.yml文件中进行配置。示例如下:

1
2
3
4
5
my:
rabbitmq:
delayed-exchange-name: delayed_exchange
delayed-queue-name: delayed_queue
delayed-routing-key: delayed.#

延迟交换机关系配置

配置延迟交换机、队列及它们之间的绑定关系。

创建延迟交换机关系配置类DelayedMessageConfig,编写代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.sunchaser.chunyu.rabbitmq.config;

import com.google.common.collect.Maps;
import com.sunchaser.chunyu.rabbitmq.config.property.RabbitMQProperties;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
* 基于rabbitmq-delayed-message-exchange插件的延迟消息配置类
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/5/5
*/
@Configuration
public class DelayedMessageConfig {

@Autowired
private RabbitMQProperties rabbitMQProperties;

/**
* x-delayed-message类型的交换机
*
* @return CustomExchange
*/
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = Maps.newHashMapWithExpectedSize(1);
arguments.put("x-delayed-type", ExchangeTypes.TOPIC);
return new CustomExchange(rabbitMQProperties.getDelayedExchangeName(), "x-delayed-message", true, false, arguments);
}

/**
* 延迟队列
*
* @return Queue
*/
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(rabbitMQProperties.getDelayedQueueName())
.build();
}

/**
* 延迟交换机和队列进行绑定
*
* @return Binding
*/
@Bean
public Binding bindingDelayedQueueExchange() {
return BindingBuilder.bind(delayedQueue())
.to(delayedExchange())
.with(rabbitMQProperties.getDelayedRoutingKey())
.noargs();
}
}

延迟消息投递

通过MessagePostProcessor类为消息设置x-delay延迟时间。示例用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void send(MsgDTO msgDTO, String routingKey, Integer expire) {
rabbitTemplate.convertAndSend(
rabbitMQProperties.getDelayedExchangeName(),
routingKey,
msgDTO,
message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setDelay(expire); // 消息延迟时间,单位ms。
return message;
},
new CorrelationData()
);
}

同样地,我们暴露一个HTTP接口便于触发延迟消息的发送。代码如下:

1
2
3
4
@PostMapping("/send-delayed")
public void send(@RequestBody MsgDTO msgDTO, String routingKey, Integer delay) {
rabbitMQProducer.send(msgDTO, routingKey, delay);
}

启动服务后使用命令curl --location --request POST 'localhost:8080/send-delayed?routingKey=delayed.xxx&delay=3000' --header 'Content-Type: application/json' --data-raw '{"msg": "xxx"}'往延迟交换机中发送一条delay=3s的消息。

消费延迟消息

由于消息体为MsgDTO,且消费逻辑一致。可直接在消费者RabbitMQConsumer中的@RabbitListener注解上添加${my.rabbitmq.delayed-queue-name}延迟队列进行消费。代码示例如下:

1
2
3
4
5
6
@RabbitListener(queues = {
"${my.rabbitmq.topic-queue-name}",
// "${my.rabbitmq.normal-queue-name}",
"${my.rabbitmq.dead-letter-queue-name}",
"${my.rabbitmq.delayed-queue-name}"
})

重启服务后使用命令curl --location --request POST 'localhost:8080/send-delayed?routingKey=delayed.xxx&delay=3000' --header 'Content-Type: application/json' --data-raw '{"msg": "xxx"}'往延迟交换机中发送一条delay=3s的消息。控制台输出如下:

1
2
3
2022-05-05 17:11:06.845 ERROR 41081 --- [nectionFactory1] c.s.c.rabbitmq.config.RabbitMQCallback   : RabbitMQ - [ReturnsCallback] 交换机路由消息至队列失败,消息退回发起者,消息内容为:ReturnedMessage [message=(Body:'{"msg":"xxx"}' MessageProperties [headers={spring_returned_message_correlation=ea0839b0-9f6e-4efd-88e9-c449e04f8092, __TypeId__=com.sunchaser.chunyu.rabbitmq.model.MsgDTO}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, receivedDelay=3000, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=delayed_exchange, routingKey=delayed.xxx]
2022-05-05 17:11:06.849 INFO 41081 --- [nectionFactory1] c.s.c.rabbitmq.config.RabbitMQCallback : RabbitMQ - [ConfirmCallback] 生产端ID=ea0839b0-9f6e-4efd-88e9-c449e04f8092的消息投递至交换机 成功
2022-05-05 17:11:09.900 INFO 41081 --- [ntContainer#0-1] c.s.c.r.mq.consumer.RabbitMQConsumer : RabbitMQ - [RabbitMQConsumer] 消费端 消费到消息,msg=MsgDTO(msg=xxx), message=(Body:'{"msg":"xxx"}' MessageProperties [headers={spring_listener_return_correlation=021fc745-7417-4cba-9f6c-b7bdd32209f3, spring_returned_message_correlation=ea0839b0-9f6e-4efd-88e9-c449e04f8092, __TypeId__=com.sunchaser.chunyu.rabbitmq.model.MsgDTO}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=delayed_exchange, receivedRoutingKey=delayed.xxx, receivedDelay=3000, deliveryTag=1, consumerTag=amq.ctag-4hR_XSMUwvEYTX1lvrNx4Q, consumerQueue=delayed_queue])

可看到消息被延迟了3秒才被消费到。同时我们注意到控制台还输出了ReturnsCallback的回调日志,原因是延迟交换机确实没有将消息路由至队列,而是将消息暂存至了Mnesia中,等延迟时间到了才会路由至队列。建议在ReturnsCallback回调中根据交换机名称进行判断处理,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 交换机路由消息至队列失败 - 消息回退回调
* 需要开启spring.rabbitmq.publisher-returns参数,设置为true。
* 同时需要配合spring.rabbitmq.template.mandatory参数使用,也设置为true。
*
* @param returned 退回的消息
*/
@Override
public void returnedMessage(@NonNull ReturnedMessage returned) {
if (rabbitMQProperties.getDelayedExchangeName().equals(returned.getExchange())) {
// 处理rabbitmq-delayed-message-exchange插件的延迟交换机回调
return;
}
log.error("RabbitMQ - [ReturnsCallback] 交换机路由消息至队列失败,消息退回发起者,消息内容为:{}", returned);
}

个人建议:如果业务只需要固定延迟时间的延迟消息则优先选择基于死信+延迟队列的实现;否则选用插件。注意插件也存在一定局限性,如果业务中包含百万级别以上的延迟消息,该插件也不能保证延迟时间的绝对准确。详见https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72

其它类型队列

优先队列

消息可按照指定的优先级进行顺序消费。它需要将队列设置为优先队列(调用QueueBuilder#maxPriority方法设置x-max-priority参数),同时消息发送时给消息指定优先级(调用MessageProperties#setPriority),然后将所有消息全部发送至队列后才能启动消费者进行消费(全部发送至队列后才能按照优先级进行排序,否则会是发一条消费一条)。实际应用场景并不多。

惰性队列

惰性队列会将消息尽可能的存储在磁盘中,只有当消费者消费到对应消息时才会加载到内存,极大的减小了内存开销,但同时也降低了消费者的消费速度。另外一点是消息存储在磁盘可以让RabbitMQ存储更多的消息。

设置惰性队列:调用QueueBuilder#lazy方法设置x-queue-mode参数为lazy

镜像队列

RabbitMQ主从集群中的主节点如果挂掉,其队列中未被消费的消息就会丢失,无法自动进行故障转移。镜像队列的做法是将主节点队列中的消息备份至集群其它节点上,形成整个集群的高可用。具体设置方法可参考https://www.rabbitmq.com/parameters.html#policies