fanout扇出

扇出类型交换机非常简单,它将收到的所有消息广播到它知道的所有队列。这种模式被称为“发布/订阅”。

下面使用扇出类型交换机构建一个简单的日志系统,系统交互如下图所示:

简单日志系统

首先生产者EmitLog将日志消息发送到扇出类型交换机logs,然后交换机logs会绑定两个临时队列,消费者FanoutReceiveLogsInConsole消费其中一个队列的消息将日志输出在控制台,另一个消费者FanoutReceiveLogsInFile则消费另外一个队列的消息将日志写入磁盘,整个过程结束。下面我们来看代码实现。

编写生产者

创建消息生产者EmitLog类,定义一个名称为logsfanout类型交换机从控制台读取消息进行发送。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

import java.util.Scanner;

/**
* fanout exchange 扇出(发布订阅)类型交换机 消息生产者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqHelper.getChannel()) {
// 定义fanout类型的交换机
// channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Fanout Exchange Sent '" + message + "'");
}
}
}
}

定义交换机类型时,可以直接使用字符串"fanout",也可以用枚举类BuiltinExchangeTypeFANOUT。推荐使用枚举,不容易出现拼写错误。

编写消费者FanoutReceiveLogsInConsole

创建消息消费者FanoutReceiveLogsInConsole类,将logs交换机绑定到一个临时队列,然后消费消息,将日志输出在控制台。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

/**
* fanout exchange 扇出(发布订阅)类型交换机 消息消费者:消费日志消息后在控制台进行打印
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class FanoutReceiveLogsInConsole {
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqHelper.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 临时队列:具有随机生成名称的非持久、独占、自动删除的队列。
String queueName = channel.queueDeclare().getQueue();
// 交换机绑定队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. Print message in console. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}

编写消费者FanoutReceiveLogsInFile

创建消息消费者FanoutReceiveLogsInFile类,将logs交换机绑定到一个临时队列,然后消费消息,将日志写入磁盘。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.fanout;

import cn.hutool.core.io.FileUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

import java.io.File;
import java.nio.charset.StandardCharsets;

/**
* fanout exchange 扇出(发布订阅)类型交换机 消息消费者:消费日志消息后将消息内容写入文件
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class FanoutReceiveLogsInFile {
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqHelper.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 临时队列:具有随机生成名称的非持久、独占、自动删除的队列。
String queueName = channel.queueDeclare().getQueue();
// 交换机绑定队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. Write message to file. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
File file = new File("/Users/sunchaser/workspace/idea-projects/sunchaser-sparrow/middleware/mq/rabbitmq-sample/src/main/resources/fanout_log.txt");
FileUtil.appendUtf8String(message, file);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}

这里将字符串写入文件用的是hutool工具包中FileUtil类的appendString,需要在pom.xml中引入hutool的依赖:

1
2
3
4
5
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.22</version>
</dependency>

启动生产者和两个消费者

直接运行三个类的main方法,用生产者的控制台发送一条消息,可看到两个消费者都消费到了消息,一个将消息内容输出在了控制台,另一个将消息写入了文件。