消息队列RabbitMQ实战 - 消费者的手动应答
在工作队列模式中,一个消费者完成一个任务可能需要消耗一段时间,如果其中一个消费者在执行一个耗时较长的任务过程中挂掉了,会出现什么情况?
由于生产者是轮询分发消息,消费者在执行耗时较长的任务过程中还会不断地接收到消息,这些消息都在等待前面耗时较长的任务执行完毕,如果此时消费者挂掉,按照现有消费者的代码:
1 | channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); |
由于设置了参数autoAck=true
,已被接收但未被消费的消息和执行了一半的消息均会丢失。
autoAck=true
的含义是:一旦RabbitMQ
将消息传递给消费者,RabbitMQ
会立即将消息标记为删除。
消息确认
我们肯定不希望出现消息丢失的情况,理想状态是:如果一个消费者挂掉,消息能被转递到另一个正常的消费者。
RabbitMQ
为了确保消息不丢失,提供了手动应答的方式。由消费者主动向RabbitMQ
回应一个ack
,告诉RabbitMQ
该消息已被接收和处理,可以自由的进行删除。如果消费者在没有发送ack
的情况下挂掉,RabbitMQ
将认为消息未完全处理从而让消息重新进行排队,这时如果有其它消费者在线,RabbitMQ
会迅速将该消息投递给其它消费者进行处理。
这样,就保证了即使消费者挂掉,只要未回应ack
,消息就能被重新排队,从而可以被投递到其它消费者处,避免消息丢失问题。
手动应答的方法
Java
客户端提供了三个方法用于手动应答:
Channel.basicAck(long deliveryTag, boolean multiple)
:用于肯定应答。消息已经正确处理完毕,可以进行删除,可设置multiple=true
进行批量应答。Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
:用于否定应答。消息未正确处理。可设置multiple=true
进行批量应答,设置requeue=true
将消息重新入队。Channel.basicReject(long deliveryTag, boolean requeue)
:用于否定应答。AMQP
规范中定义的方法,与basicNack
相比仅不支持批量。
multiple
参数
例如当前channel
上有deliveryTag=5, 6, 7, 8
的多条消息,当前被消费的消息的deliveryTag
等于8
,那么当multiple
被设置为true
时,前面deliveryTag=5, 6, 7
中还未应答的消息均会被确认应答;而当multiple
被设置为false
时,则只会应答当前deliveryTag=8
这条消息。一般在生产环境中建议设置multiple=false
,不进行批量应答。
开启手动应答只需要消费者设置autoAck=false
。下面我们编写手动应答的示例代码。
编写生产者
创建手动应答消息生产者AckProducer
类,仍然是从控制台读取消息内容进行发送。代码如下:
1 | package com.sunchaser.sparrow.middleware.mq.rabbitmq.ack; |
编写消费者
创建手动应答消息消费者AckConsumer
类,编写代码如下:
1 | package com.sunchaser.sparrow.middleware.mq.rabbitmq.ack; |
手动应答务必要在消息处理逻辑完成后执行,这里使用Thread.sleep
睡眠模拟消息处理逻辑的耗时。
启动生产者和两个消费者
生产者直接进行启动,消费者启动时设置其中一个消费者每次消费时sleep
睡眠1
秒,记为C1
,另外一个消费者每次消费时sleep
睡眠5
秒,记为C2
。
全部启动成功后我们在生产者的控制台迅速发送多条消息,根据工作队列模式的特点,生产者生产的消息将轮询地投递给两个消费者C1
和C2
。由于C1
每次消费仅sleep
睡眠1
秒,消费速度较快,很快就将消息全部消费成功,而C2
每次消费会sleep
睡眠5
秒,它还在睡眠等待,此时如果C2
挂掉(手动停止程序),其channel
中未进行手动ack
的消息将被重新排队转递给C1
。
忘记手动应答
设置autoAck=false
后很可能忘记调用basicAck
进行应答,这时消息虽然会被消费处理(执行消息处理逻辑),但消息的状态会一直处于Unacked
状态,一旦当前消费者停止或重启,其channel
中存在的Unacked
状态的消息就会被重新排队,从而出现重复投递,而如果另外的消费者也不进行ack
,该消息将始终处于Unacked
状态,这些消息也将堆积在RabbitMQ
服务端内存中。
RabbitMQ
针对上述情况提供了消费者手动应答的强制超时时间,默认为30
分钟。如果一条消息超过了强制超时时间仍未进行ack
应答,则其对应channel
将记录PRECONDITION_FAILED
异常并关闭,该channel
中所有未ack
的消息将被重新排队。
强制超时时间可在
rabbitmq.conf
文件中配置,单位为毫秒:
1
2 # 30分钟(30 * 60 * 1000毫秒)
consumer_timeout = 1800000
回调出现异常
在DeliverCallback
回调中,如果出现异常未进行捕获,RabbitMQ
客户端会关闭信道channel
,当前消费者线程虽然仍存活,但却无法再接收到任何消息,已接收的消息会被重新排队。关闭channel
的逻辑可查看com.rabbitmq.client.impl.StrictExceptionHandler#handleConsumerException
方法。