在工作队列模式中,一个消费者完成一个任务可能需要消耗一段时间,如果其中一个消费者在执行一个耗时较长的任务过程中挂掉了,会出现什么情况?

由于生产者是轮询分发消息,消费者在执行耗时较长的任务过程中还会不断地接收到消息,这些消息都在等待前面耗时较长的任务执行完毕,如果此时消费者挂掉,按照现有消费者的代码:

1
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

由于设置了参数autoAck=true,已被接收但未被消费的消息和执行了一半的消息均会丢失。

autoAck=true的含义是:一旦RabbitMQ将消息传递给消费者,RabbitMQ会立即将消息标记为删除。

消息确认

我们肯定不希望出现消息丢失的情况,理想状态是:如果一个消费者挂掉,消息能被转递到另一个正常的消费者。

RabbitMQ为了确保消息不丢失,提供了手动应答的方式。由消费者主动向RabbitMQ回应一个ack,告诉RabbitMQ该消息已被接收和处理,可以自由的进行删除。如果消费者在没有发送ack的情况下挂掉,RabbitMQ将认为消息未完全处理从而让消息重新进行排队,这时如果有其它消费者在线,RabbitMQ会迅速将该消息投递给其它消费者进行处理。

这样,就保证了即使消费者挂掉,只要未回应ack,消息就能被重新排队,从而可以被投递到其它消费者处,避免消息丢失问题。

手动应答的方法

Java客户端提供了三个方法用于手动应答:

  • Channel.basicAck(long deliveryTag, boolean multiple):用于肯定应答。消息已经正确处理完毕,可以进行删除,可设置multiple=true进行批量应答。
  • Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):用于否定应答。消息未正确处理。可设置multiple=true进行批量应答,设置requeue=true将消息重新入队。
  • Channel.basicReject(long deliveryTag, boolean requeue):用于否定应答。AMQP规范中定义的方法,与basicNack相比仅不支持批量。

multiple参数

例如当前channel上有deliveryTag=5, 6, 7, 8的多条消息,当前被消费的消息的deliveryTag等于8,那么当multiple被设置为true时,前面deliveryTag=5, 6, 7中还未应答的消息均会被确认应答;而当multiple被设置为false时,则只会应答当前deliveryTag=8这条消息。一般在生产环境中建议设置multiple=false,不进行批量应答。

开启手动应答只需要消费者设置autoAck=false。下面我们编写手动应答的示例代码。

编写生产者

创建手动应答消息生产者AckProducer类,仍然是从控制台读取消息内容进行发送。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.sunchaser.sparrow.middleware.mq.rabbitmq.ack;

import com.rabbitmq.client.Channel;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

import java.util.Scanner;

/**
* 手动应答 消息生产者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class AckProducer {
private static final String ACK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqHelper.getChannel()) {
channel.queueDeclare(ACK_QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", ACK_QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] ack_queue Sent '" + message + "'");
}
}
}
}

编写消费者

创建手动应答消息消费者AckConsumer类,编写代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.sunchaser.sparrow.middleware.mq.rabbitmq.ack;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

/**
* 手动应答 消息消费者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class AckConsumer {
private static final String ACK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqHelper.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
Thread.sleep(1000);
// Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" [x] Received '" + new String(message.getBody()) + "'");

/*
* 手动应答
*
* @param deliveryTag 消息标记tag
* @param multiple 是否批量应答:false:不批量;true:批量。
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费时的回调逻辑");
};
// 设置autoAck = false开启手动应答
channel.basicConsume(ACK_QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}

手动应答务必要在消息处理逻辑完成后执行,这里使用Thread.sleep睡眠模拟消息处理逻辑的耗时。

启动生产者和两个消费者

生产者直接进行启动,消费者启动时设置其中一个消费者每次消费时sleep睡眠1秒,记为C1,另外一个消费者每次消费时sleep睡眠5秒,记为C2

全部启动成功后我们在生产者的控制台迅速发送多条消息,根据工作队列模式的特点,生产者生产的消息将轮询地投递给两个消费者C1C2。由于C1每次消费仅sleep睡眠1秒,消费速度较快,很快就将消息全部消费成功,而C2每次消费会sleep睡眠5秒,它还在睡眠等待,此时如果C2挂掉(手动停止程序),其channel中未进行手动ack的消息将被重新排队转递给C1

image-20220420193446990 image-20220420193552626 image-20220420193552626

忘记手动应答

设置autoAck=false后很可能忘记调用basicAck进行应答,这时消息虽然会被消费处理(执行消息处理逻辑),但消息的状态会一直处于Unacked状态,一旦当前消费者停止或重启,其channel中存在的Unacked状态的消息就会被重新排队,从而出现重复投递,而如果另外的消费者也不进行ack,该消息将始终处于Unacked状态,这些消息也将堆积在RabbitMQ服务端内存中。

RabbitMQ针对上述情况提供了消费者手动应答的强制超时时间,默认为30分钟。如果一条消息超过了强制超时时间仍未进行ack应答,则其对应channel将记录PRECONDITION_FAILED异常并关闭,该channel中所有未ack的消息将被重新排队。

强制超时时间可在rabbitmq.conf文件中配置,单位为毫秒:

1
2
# 30分钟(30 * 60 * 1000毫秒)
consumer_timeout = 1800000

回调出现异常

DeliverCallback回调中,如果出现异常未进行捕获,RabbitMQ客户端会关闭信道channel,当前消费者线程虽然仍存活,但却无法再接收到任何消息,已接收的消息会被重新排队。关闭channel的逻辑可查看com.rabbitmq.client.impl.StrictExceptionHandler#handleConsumerException方法。