消息队列RabbitMQ实战 - 主题(topic)类型交换机(主题模式) | 字数总计: 2.1k | 阅读时长: 8分钟 | 阅读量: |
前面我们用direct
直接类型交换机重建了日志系统。它可以让消费者有选择性的订阅消息,但这个选择还不够灵活。例如我们的日志类型有info.auth
、info.cron
、error.auth
及error.cron
等等,类型非常多,有的消费者希望能订阅所有的消息,有的消费者希望能订阅其中部分消息,但有的消费者只想订阅其中一个类型例如error.cron
类型的消息,如果使用direct
直接类型交换机,我们就需要给订阅所有消息的消费者提供全量类型的绑定键。如果类型特别多,特别是当这些绑定键有一些共同特征,例如有相同的前缀info
或后缀cron
等时,我们的代码写法就显得很不优雅。
topic
主题发送到topic
主题交换机的消息不能随意指定routingKey
路由键,它必须是单词列表,由点进行分隔。单词可以任意,一般会与消息内容有关联。例如:“stock.used.nyse
”、“nyse.vmw
”、“quick.orange.rabbit
”。路由键中可以有任意多的单词,但最多不能超过255
个字节。
队列的绑定键也必须采用相同的格式。但是绑定键有两个特殊规则:
*
(星号)可以只替换一个单词。#
(井号)可以替换零个或多个单词。主题类型交换机投递消息的原则有点类似于direct
直接交换机:消息只进入和路由键相匹配的绑定键的所有队列。
举个栗子
我们将发送描述动物的消息。消息使用由三个单词(包含两个点)组成的路由键发送。路由键中的第一个单词表示速度,第二个单词表示颜色,第三个单词表示物种:“<speed>.<colour>.<species>
”。然后创建了三个绑定:队列Q1
使用绑定键“*.orange.*
”进行绑定;队列Q2
使用绑定键“*.*.rabbit
”和“lazy.#
”进行绑定。其含义为:队列Q1
订阅了所有橙色orange
动物的消息;队列Q2
订阅了兔子rabbit
和懒惰lazy
动物的消息。
我们来看一些路由键的示例:
quick.orange.rabbit
lazy.orange.elephant
quick.orange.fox
lazy.brown.fox
lazy.pink.rabbit
quick.brown.fox
quick.orange.male.rabbit
lazy.orange.male.rabbit
主题交换机会根据*
和#
的规则去匹配这些路由键与队列Q1
、Q2
的绑定键。如果匹配成功则进行投递,否则将丢弃消息。下面是匹配情况说明:
路由键 队列 说明 quick.orange.rabbit
Q1
、Q2
匹配了*.orange.*
和*.*.rabbit
,被队列Q1
和Q2
接收。 lazy.orange.elephant
Q1
、Q2
匹配了*.orange.*
和*.*.rabbit
,被队列Q1
和Q2
接收。 quick.orange.fox
Q1
匹配了*.orange.*
,被队列Q1
接收。 lazy.brown.fox
Q2
匹配了lazy.#
,被队列Q2
接收。 lazy.pink.rabbit
Q2
匹配了*.*.rabbit
和lazy.#
,都是队列Q2
的绑定键,只会被队列Q2
接收一次。 quick.brown.fox
—— 不匹配任何绑定键,丢弃该消息。 quick.orange.male.rabbit
—— 不匹配任何绑定键,丢弃该消息。 lazy.orange.male.rabbit
Q2
匹配了lazy.#
,被队列Q2
接收。
下面用代码进行实现。
编写生产者 创建消息生产者EmitLogTopic
类,定义一个名为topic_logs
的topic
主题类型交换机,依次发送指定了路由键的消息。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;import java.util.HashMap;import java.util.Map;public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws Exception { try (Channel channel = RabbitMqHelper.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String, String> bindingKeyMap = new HashMap <>(); bindingKeyMap.put("quick.orange.rabbit" , "被队列Q1 Q2接收\n" ); bindingKeyMap.put("lazy.orange.elephant" , "被队列Q1 Q2接收\n" ); bindingKeyMap.put("quick.orange.fox" , "被队列Q1接收\n" ); bindingKeyMap.put("lazy.brown.fox" , "被队列Q2接收\n" ); bindingKeyMap.put("lazy.pink.rabbit" , "虽然满足两个绑定但只会被队列Q2接收一次\n" ); bindingKeyMap.put("quick.brown.fox" , "没有队列接收,丢弃\n" ); bindingKeyMap.put("quick.orange.male.rabbit" , "没有队列接收,丢弃\n" ); bindingKeyMap.put("lazy.orange.male.rabbit" , "被队列Q2接收\n" ); for (Map.Entry<String, String> bindingKey : bindingKeyMap.entrySet()) { String routingKey = bindingKey.getKey(); String message = bindingKey.getValue(); channel.basicPublish(EXCHANGE_NAME, routingKey, null , message.getBytes()); System.out.println(" [x] Topic Exchange Sent '" + routingKey + "':" + message); } } } }
编写消费者 模仿之前构建的日志系统,我们将队列Q1
接收的消息在控制台打印,队列Q2
接收的消息写入文件。
创建消费者TopicReceiveLogsInConsole
类,使用绑定键*.orange.*
将队列Q1
和topic_logs
主题交换机进行绑定,将消费到的消息打印在控制台。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.topic;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;public class TopicReceiveLogsInConsole { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqHelper.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = "Q1" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*" ); System.out.println(" [*] " + queueName + " Waiting for messages. Print *.orange.* log in console. To exit press CTRL+C" ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody()); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':" + message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> {}); } }
创建消费者TopicReceiveLogsInFile
类,使用绑定键*.*.rabbit
和lazy.#
将队列Q2
和topic_logs
主题交换机进行绑定,将消费到的消息写入topic_log
文件。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.topic;import cn.hutool.core.io.FileUtil;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;import java.io.File;public class TopicReceiveLogsInFile { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqHelper.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = "Q2" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit" ); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#" ); System.out.println(" [*] " + queueName + " Waiting for messages. Write *.*.rabbit and lazy.# log to file. To exit press CTRL+C" ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody()); File file = new File ("/Users/sunchaser/workspace/idea-projects/sunchaser-sparrow/middleware/mq/rabbitmq-sample/src/main/resources/topic_log.txt" ); FileUtil.appendUtf8String(message, file); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':" + message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> {}); } }
启动生产者和两个消费者 直接运行三个类的main
方法,可看到生产者EmitLogTopic
控制台输出如下:
1 2 3 4 5 6 7 8 [x] Topic Exchange Sent 'quick.orange.male.rabbit':没有队列接收,丢弃 [x] Topic Exchange Sent 'quick.brown.fox':没有队列接收,丢弃 [x] Topic Exchange Sent 'lazy.orange.elephant':被队列Q1 Q2接收 [x] Topic Exchange Sent 'lazy.brown.fox':被队列Q2接收 [x] Topic Exchange Sent 'quick.orange.rabbit':被队列Q1 Q2接收 [x] Topic Exchange Sent 'quick.orange.fox':被队列Q1接收 [x] Topic Exchange Sent 'lazy.pink.rabbit':虽然满足两个绑定但只会被队列Q2接收一次 [x] Topic Exchange Sent 'lazy.orange.male.rabbit':被队列Q2接收
消费者TopicReceiveLogsInConsole
控制台输出如下:
1 2 3 4 [*] Q1 Waiting for messages. Print *.orange.* log in console. To exit press CTRL+C [x] Received 'lazy.orange.elephant':被队列Q1 Q2接收 [x] Received 'quick.orange.rabbit':被队列Q1 Q2接收 [x] Received 'quick.orange.fox':被队列Q1接收
消费者TopicReceiveLogsInFile
写入的topic_log.txt
文件内容如下:
1 2 3 4 5 被队列Q1 Q2接收 被队列Q2接收 被队列Q1 Q2接收 虽然满足两个绑定但只会被队列Q2接收一次 被队列Q2接收
总结 主题类型交换机功能十分强大。特定情况下可以实现其它类型交换机的效果。例如当绑定键为#
时,无论路由键是什么,它将接收所有消息,就和fanout
扇出类型交换机类似。又比如当绑定键中不存在特殊字符*
和#
时,匹配的规则和direct
直接类型交换机一样。