前面我们通过消息手动应答的方式保证了即使消费者挂掉,消息也不会丢失。但是它不能保证RabbitMQ
服务端停止的情况,一旦服务端停止,消息仍会丢失。
为了确保服务端停止消息仍不丢失,我们需要将队列和消息都进行持久化。
队列持久化
在声明队列时,设置durable=true
将队列声明为持久的。
1 2
| // durable:true(持久化队列) channel.queueDeclare(PERSISTENT_QUEUE_NAME, true, false, false, null);
|
注意:RabbitMQ
不允许使用不同的参数重新定义现有队列。所以我们应该使用前面未使用过的队列名。
消息持久化
在发送消息时,传递MessageProperties.PERSISTENT_TEXT_PLAIN
参数将消息标记为持久的。
1 2
| // MessageProperties.PERSISTENT_TEXT_PLAIN:持久化消息 channel.basicPublish("", PERSISTENT_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
|
注意:这里并不能保证消息绝对的持久化,虽然传递MessageProperties.PERSISTENT_TEXT_PLAIN
参数后RabbitMQ
会将消息保存到磁盘,但是当RabbitMQ
接收消息并且还没保存它时,仍然有很短的时间窗口。此外,RabbitMQ
不会对每条消息都执行fsync(2)
方法,它可能只是保存到缓存中,而不是真正写入磁盘。持久化保证并不强,但对于一般的工作队列来说可用性已经很高了,如果需要保证绝对持久化,可以使用生产者的发布确认。
持久化队列和消息的生产者完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.persistent;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;
import java.util.Scanner;
public class PersistentProducer { private static final String PERSISTENT_QUEUE_NAME = "persistent_queue";
public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqHelper.getChannel()) { channel.queueDeclare(PERSISTENT_QUEUE_NAME, true, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", PERSISTENT_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] persistent_queue Sent '" + message + "'"); } } } }
|
公平消费
在工作队列模式中,消息是轮询分发给两个消费者的。看似公平,实则不然。例如,现在有两个消费者同时在消费,如果其中一个消费者处理速度很快,而另一个消费者处理速度较慢,那么处理速度快的消费者将很快处理完消息进入空闲状态;而处理速度慢的消费者将一直忙于处理消息。俗称:“旱的旱死,涝的涝死”。RabbitMQ
并不知道消费者此时的状态,仍会轮询分发消息。
为了解决这个问题,我们可以调用basicQos
方法并传递参数prefetchCount=1
。
1 2
| // 设置不公平分发 channel.basicQos(1);
|
prefetchCount
:预取值,表示channel
上允许的未确认消息的最大数量。
RabbitMQ
会查看消费者未进行应答的消息数量,如果某个消费者持有一条(prefetchCount=1
)未应答的消息,则不会给该消费者投递新的消息,而是将消息分发给其它空闲的消费者。这才是真正的公平消费,“能者多劳”。
公平消费的消费者完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.qos;
import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;
public class QosConsumer { private static final String PERSISTENT_QUEUE_NAME = "persistent_queue";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqHelper.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(" [x] Received persistent_queue '" + new String(message.getBody()) + "'");
channel.basicQos(1);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费时的回调逻辑"); }; channel.basicConsume(PERSISTENT_QUEUE_NAME, false, deliverCallback, cancelCallback); } }
|