消息队列RabbitMQ实战 - 扇出(fanout)类型交换机(发布订阅模式)
|字数总计:1.1k|阅读时长:4分钟|阅读量:|
fanout
扇出
扇出类型交换机非常简单,它将收到的所有消息广播到它知道的所有队列。这种模式被称为“发布/订阅”。
下面使用扇出类型交换机构建一个简单的日志系统,系统交互如下图所示:
首先生产者EmitLog
将日志消息发送到扇出类型交换机logs
,然后交换机logs
会绑定两个临时队列,消费者FanoutReceiveLogsInConsole
消费其中一个队列的消息将日志输出在控制台,另一个消费者FanoutReceiveLogsInFile
则消费另外一个队列的消息将日志写入磁盘,整个过程结束。下面我们来看代码实现。
编写生产者
创建消息生产者EmitLog
类,定义一个名称为logs
的fanout
类型交换机从控制台读取消息进行发送。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.fanout;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;
import java.util.Scanner;
public class EmitLog { private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqHelper.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Fanout Exchange Sent '" + message + "'"); } } } }
|
定义交换机类型时,可以直接使用字符串"fanout"
,也可以用枚举类BuiltinExchangeType
的FANOUT
。推荐使用枚举,不容易出现拼写错误。
编写消费者FanoutReceiveLogsInConsole
创建消息消费者FanoutReceiveLogsInConsole
类,将logs
交换机绑定到一个临时队列,然后消费消息,将日志输出在控制台。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.fanout;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;
public class FanoutReceiveLogsInConsole { private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqHelper.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. Print message in console. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
|
编写消费者FanoutReceiveLogsInFile
创建消息消费者FanoutReceiveLogsInFile
类,将logs
交换机绑定到一个临时队列,然后消费消息,将日志写入磁盘。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.exchange.fanout;
import cn.hutool.core.io.FileUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;
import java.io.File; import java.nio.charset.StandardCharsets;
public class FanoutReceiveLogsInFile { private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqHelper.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. Write message to file. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); File file = new File("/Users/sunchaser/workspace/idea-projects/sunchaser-sparrow/middleware/mq/rabbitmq-sample/src/main/resources/fanout_log.txt"); FileUtil.appendUtf8String(message, file); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
|
这里将字符串写入文件用的是hutool
工具包中FileUtil
类的appendString
,需要在pom.xml
中引入hutool
的依赖:
1 2 3 4 5
| <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.22</version> </dependency>
|
启动生产者和两个消费者
直接运行三个类的main
方法,用生产者的控制台发送一条消息,可看到两个消费者都消费到了消息,一个将消息内容输出在了控制台,另一个将消息写入了文件。