在开始使用消息分组之前,我们必须手动创建分组才行,以下是几个和 Stream 分组有关的命令,我们先来学习一下它的使用。
1 | 127.0.0.1:6379> xgroup create mq group1 0-0 |
相关语法:
1 | xgroup create stream-key group-key ID |
其中:
如果要从当前最后一条消息向后读取,使用 $
即可,命令如下:
1 | 127.0.0.1:6379> xgroup create mq group2 $ |
1 | 127.0.0.1:6379> xreadgroup group group1 c1 count 1 streams mq > |
相关语法:
1 | xreadgroup group group-key consumer-key streams stream-key |
其中:
>
表示读取下一条消息;xreadgroup 命令和 xread 使用类似,也可以设置阻塞读取,命令如下:
1 | 127.0.0.1:6379> xreadgroup group group1 c2 streams mq > |
此时打开另一个命令行创建使用 xadd 添加一条消息,阻塞命令执行结果如下:
1 | 127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq > |
接收到消息之后,我们要手动确认一下(ack),命令如下:
1 | 127.0.0.1:6379> xack mq group1 1580959593553-0 |
相关语法:
1 | xack key group-key ID [ID ...] |
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:
1 | 127.0.0.1:6379> xpending mq group1 |
1. 查询流信息
1 | 127.0.0.1:6379> xinfo stream mq |
相关语法:
1 | xinfo stream stream-key |
2. 查询消费组消息
1 | 127.0.0.1:6379> xinfo groups mq |
相关语法:
1 | xinfo groups stream-key |
3. 查看消费者组成员信息
1 | 127.0.0.1:6379> xinfo consumers mq group1 |
相关语法:
1 | xinfo consumers stream group-key |
1 | 127.0.0.1:6379> xgroup delconsumer mq group1 c1 |
相关语法:
1 | xgroup delconsumer stream-key group-key consumer-key |
1 | 127.0.0.1:6379> xgroup destroy mq group1 |
相关语法:
1 | xgroup destroy stream-key group-key |
接下来我们使用 Jedis 来实现 Stream 分组消息队列,代码如下:
1 | import com.google.gson.Gson; |
以上代码运行结果如下:
1 | 消息添加成功 ID:1580971482344-0 |
其中,jedis.xreadGroup() 方法的第五个参数 noAck 表示是否自动确认消息,如果设置 true 收到消息会自动确认(ack)消息,否则则需要手动确认。
注意:Jedis 框架要使用最新版,低版本 block 设置大于 0 时,会有 bug 抛连接超时异常。
可以看出,同一个分组内的多个 consumer 会读取到不同消息,不同的 consumer 不会读取到分组内的同一条消息。
本文我们介绍了 Stream 分组的相关知识,使用 Jedis 的 xreadGroup() 方法实现了消息的阻塞读取,并且使用此方法自带 noAck 参数,实现了消息的自动确认,通过本文我们也知道了,一个分组内的多个 consumer 会轮询收到消息队列的消息,并且不会出现一个消息被多个 consumer 读取的情况。
如果你看了本文的知识还是觉得没看懂,那是因为你没有结合实践去理解,所以如果对本文还有疑问,跟着本文一步一步实践起来吧。