前言
工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。即异步化,生产者将任务封装成消息并发送到队列中。在后台运行的消费者线程从队列中消费消息并最终执行任务。当有多个消费者时,任务在其之间进行共享。
该模式的工作原理如下图所示:
P
表示生产者,C1
和C2
表示两个消费者,中间的红框表示一个队列。
工具类封装
在简单队列模式的代码实现中,获取信道Channel
对象的代码略显重复,这里将其抽取成一个静态方法放置在工具类RabbitMqHelper
中。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.common;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqHelper {
public static Channel getChannel() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); return connection.createChannel(); } }
|
编写生产者
创建消息生产者NewTask
类,为了便捷快速地发送消息,这里直接从控制台读取消息内容进行发送。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.workqueues;
import com.rabbitmq.client.Channel; import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;
import java.util.Scanner;
public class NewTask { private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqHelper.getChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } } } }
|
编写消费者
创建消息消费者Worker
类。编写代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.workqueues;
import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;
public class Worker { private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqHelper.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(" [x] Received '" + new String(message.getBody()) + "'"); };
CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费时的回调逻辑"); };
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
|
启动生产者和两个消费者
生产者直接进行启动;消费者需要进行简单配置后才能运行两个实例。
以IDEA 2021.3.2
为例,点击上方Edit Configurations...
,点击Modify options
,勾选Allow multiple instances
,点击Apply
和OK
保存设置。
随后启动两次Worker
类的main
方法即可启动两个消费者。
多次往生产者控制台输入消息并回车进行发送,观察两个消费者控制台的输出:
可看到消息是轮询进行分发,两个消费者“你来我往”的均匀消费消息。