前面我们通过消息手动应答的方式保证了即使消费者挂掉,消息也不会丢失。但是它不能保证RabbitMQ服务端停止的情况,一旦服务端停止,消息仍会丢失。

为了确保服务端停止消息仍不丢失,我们需要将队列和消息都进行持久化。

队列持久化

在声明队列时,设置durable=true将队列声明为持久的。

1
2
// durable:true(持久化队列)
channel.queueDeclare(PERSISTENT_QUEUE_NAME, true, false, false, null);

注意:RabbitMQ不允许使用不同的参数重新定义现有队列。所以我们应该使用前面未使用过的队列名。

消息持久化

在发送消息时,传递MessageProperties.PERSISTENT_TEXT_PLAIN参数将消息标记为持久的。

1
2
// MessageProperties.PERSISTENT_TEXT_PLAIN:持久化消息
channel.basicPublish("", PERSISTENT_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

注意:这里并不能保证消息绝对的持久化,虽然传递MessageProperties.PERSISTENT_TEXT_PLAIN参数后RabbitMQ会将消息保存到磁盘,但是当RabbitMQ接收消息并且还没保存它时,仍然有很短的时间窗口。此外,RabbitMQ不会对每条消息都执行fsync(2)方法,它可能只是保存到缓存中,而不是真正写入磁盘。持久化保证并不强,但对于一般的工作队列来说可用性已经很高了,如果需要保证绝对持久化,可以使用生产者的发布确认。

持久化队列和消息的生产者完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.sunchaser.sparrow.middleware.mq.rabbitmq.persistent;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

import java.util.Scanner;

/**
* 持久化消息和队列 生产者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class PersistentProducer {
private static final String PERSISTENT_QUEUE_NAME = "persistent_queue";

public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqHelper.getChannel()) {
// durable:true(持久化队列)
channel.queueDeclare(PERSISTENT_QUEUE_NAME, true, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
// MessageProperties.PERSISTENT_TEXT_PLAIN:持久化消息
channel.basicPublish("", PERSISTENT_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] persistent_queue Sent '" + message + "'");
}
}
}
}

公平消费

在工作队列模式中,消息是轮询分发给两个消费者的。看似公平,实则不然。例如,现在有两个消费者同时在消费,如果其中一个消费者处理速度很快,而另一个消费者处理速度较慢,那么处理速度快的消费者将很快处理完消息进入空闲状态;而处理速度慢的消费者将一直忙于处理消息。俗称:“旱的旱死,涝的涝死”。RabbitMQ并不知道消费者此时的状态,仍会轮询分发消息。

为了解决这个问题,我们可以调用basicQos方法并传递参数prefetchCount=1

1
2
// 设置不公平分发
channel.basicQos(1);

prefetchCount:预取值,表示channel上允许的未确认消息的最大数量。

RabbitMQ会查看消费者未进行应答的消息数量,如果某个消费者持有一条(prefetchCount=1)未应答的消息,则不会给该消费者投递新的消息,而是将消息分发给其它空闲的消费者。这才是真正的公平消费,“能者多劳”。

公平消费的消费者完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.sunchaser.sparrow.middleware.mq.rabbitmq.qos;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

/**
* 不公平分发 消费者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class QosConsumer {
private static final String PERSISTENT_QUEUE_NAME = "persistent_queue";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqHelper.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
Thread.sleep(1000);
// Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" [x] Received persistent_queue '" + new String(message.getBody()) + "'");

// 设置不公平分发
channel.basicQos(1);

/*
* 手动应答
*
* @param deliveryTag 消息标记tag
* @param multiple 是否批量应答:false:不批量;true:批量。
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费时的回调逻辑");
};
channel.basicConsume(PERSISTENT_QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}