前面我们用direct直接类型交换机重建了日志系统。它可以让消费者有选择性的订阅消息,但这个选择还不够灵活。例如我们的日志类型有info.authinfo.cronerror.autherror.cron等等,类型非常多,有的消费者希望能订阅所有的消息,有的消费者希望能订阅其中部分消息,但有的消费者只想订阅其中一个类型例如error.cron类型的消息,如果使用direct直接类型交换机,我们就需要给订阅所有消息的消费者提供全量类型的绑定键。如果类型特别多,特别是当这些绑定键有一些共同特征,例如有相同的前缀info或后缀cron等时,我们的代码写法就显得很不优雅。

topic主题

发送到topic主题交换机的消息不能随意指定routingKey路由键,它必须是单词列表,由点进行分隔。单词可以任意,一般会与消息内容有关联。例如:“stock.used.nyse”、“nyse.vmw”、“quick.orange.rabbit”。路由键中可以有任意多的单词,但最多不能超过255个字节。

队列的绑定键也必须采用相同的格式。但是绑定键有两个特殊规则:

  • *(星号)可以只替换一个单词。
  • #(井号)可以替换零个或多个单词。

主题类型交换机投递消息的原则有点类似于direct直接交换机:消息只进入和路由键相匹配的绑定键的所有队列。

举个栗子

python-five

我们将发送描述动物的消息。消息使用由三个单词(包含两个点)组成的路由键发送。路由键中的第一个单词表示速度,第二个单词表示颜色,第三个单词表示物种:“<speed>.<colour>.<species>”。然后创建了三个绑定:队列Q1使用绑定键“*.orange.*”进行绑定;队列Q2使用绑定键“*.*.rabbit”和“lazy.#”进行绑定。其含义为:队列Q1订阅了所有橙色orange动物的消息;队列Q2订阅了兔子rabbit和懒惰lazy动物的消息。

我们来看一些路由键的示例:

  • quick.orange.rabbit
  • lazy.orange.elephant
  • quick.orange.fox
  • lazy.brown.fox
  • lazy.pink.rabbit
  • quick.brown.fox
  • quick.orange.male.rabbit
  • lazy.orange.male.rabbit

主题交换机会根据*#的规则去匹配这些路由键与队列Q1Q2的绑定键。如果匹配成功则进行投递,否则将丢弃消息。下面是匹配情况说明:

路由键队列说明
quick.orange.rabbitQ1Q2匹配了*.orange.**.*.rabbit,被队列Q1Q2接收。
lazy.orange.elephantQ1Q2匹配了*.orange.**.*.rabbit,被队列Q1Q2接收。
quick.orange.foxQ1匹配了*.orange.*,被队列Q1接收。
lazy.brown.foxQ2匹配了lazy.#,被队列Q2接收。
lazy.pink.rabbitQ2匹配了*.*.rabbitlazy.#,都是队列Q2的绑定键,只会被队列Q2接收一次。
quick.brown.fox——不匹配任何绑定键,丢弃该消息。
quick.orange.male.rabbit——不匹配任何绑定键,丢弃该消息。
lazy.orange.male.rabbitQ2匹配了lazy.#,被队列Q2接收。

下面用代码进行实现。

编写生产者

创建消息生产者EmitLogTopic类,定义一个名为topic_logstopic主题类型交换机,依次发送指定了路由键的消息。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

import java.util.HashMap;
import java.util.Map;

/**
* topic exchange 主题类型交换机 消息生产者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqHelper.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列Q1 Q2接收\n");
bindingKeyMap.put("lazy.orange.elephant", "被队列Q1 Q2接收\n");
bindingKeyMap.put("quick.orange.fox", "被队列Q1接收\n");
bindingKeyMap.put("lazy.brown.fox", "被队列Q2接收\n");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只会被队列Q2接收一次\n");
bindingKeyMap.put("quick.brown.fox", "没有队列接收,丢弃\n");
bindingKeyMap.put("quick.orange.male.rabbit", "没有队列接收,丢弃\n");
bindingKeyMap.put("lazy.orange.male.rabbit", "被队列Q2接收\n");
for (Map.Entry<String, String> bindingKey : bindingKeyMap.entrySet()) {
String routingKey = bindingKey.getKey();
String message = bindingKey.getValue();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Topic Exchange Sent '" + routingKey + "':" + message);
}
}
}
}

编写消费者

模仿之前构建的日志系统,我们将队列Q1接收的消息在控制台打印,队列Q2接收的消息写入文件。

创建消费者TopicReceiveLogsInConsole类,使用绑定键*.orange.*将队列Q1topic_logs主题交换机进行绑定,将消费到的消息打印在控制台。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

/**
* topic exchange 主题类型交换机 消息消费者:绑定队列Q1
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class TopicReceiveLogsInConsole {
private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqHelper.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 队列Q1
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println(" [*] " + queueName + " Waiting for messages. Print *.orange.* log in console. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':" + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}

创建消费者TopicReceiveLogsInFile类,使用绑定键*.*.rabbitlazy.#将队列Q2topic_logs主题交换机进行绑定,将消费到的消息写入topic_log文件。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.topic;

import cn.hutool.core.io.FileUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

import java.io.File;

/**
* topic exchange 直接类型交换机 消息消费者:绑定队列Q2
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class TopicReceiveLogsInFile {
private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqHelper.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 队列Q1
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println(" [*] " + queueName + " Waiting for messages. Write *.*.rabbit and lazy.# log to file. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
File file = new File("/Users/sunchaser/workspace/idea-projects/sunchaser-sparrow/middleware/mq/rabbitmq-sample/src/main/resources/topic_log.txt");
FileUtil.appendUtf8String(message, file);
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':" + message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}

启动生产者和两个消费者

直接运行三个类的main方法,可看到生产者EmitLogTopic控制台输出如下:

1
2
3
4
5
6
7
8
[x] Topic Exchange Sent 'quick.orange.male.rabbit':没有队列接收,丢弃
[x] Topic Exchange Sent 'quick.brown.fox':没有队列接收,丢弃
[x] Topic Exchange Sent 'lazy.orange.elephant':被队列Q1 Q2接收
[x] Topic Exchange Sent 'lazy.brown.fox':被队列Q2接收
[x] Topic Exchange Sent 'quick.orange.rabbit':被队列Q1 Q2接收
[x] Topic Exchange Sent 'quick.orange.fox':被队列Q1接收
[x] Topic Exchange Sent 'lazy.pink.rabbit':虽然满足两个绑定但只会被队列Q2接收一次
[x] Topic Exchange Sent 'lazy.orange.male.rabbit':被队列Q2接收

消费者TopicReceiveLogsInConsole控制台输出如下:

1
2
3
4
[*] Q1 Waiting for messages. Print *.orange.* log in console. To exit press CTRL+C
[x] Received 'lazy.orange.elephant':被队列Q1 Q2接收
[x] Received 'quick.orange.rabbit':被队列Q1 Q2接收
[x] Received 'quick.orange.fox':被队列Q1接收

消费者TopicReceiveLogsInFile写入的topic_log.txt文件内容如下:

1
2
3
4
5
被队列Q1 Q2接收
被队列Q2接收
被队列Q1 Q2接收
虽然满足两个绑定但只会被队列Q2接收一次
被队列Q2接收

总结

主题类型交换机功能十分强大。特定情况下可以实现其它类型交换机的效果。例如当绑定键为#时,无论路由键是什么,它将接收所有消息,就和fanout扇出类型交换机类似。又比如当绑定键中不存在特殊字符* 和#时,匹配的规则和direct直接类型交换机一样。