前言

工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。即异步化,生产者将任务封装成消息并发送到队列中。在后台运行的消费者线程从队列中消费消息并最终执行任务。当有多个消费者时,任务在其之间进行共享。

该模式的工作原理如下图所示:

P表示生产者,C1C2表示两个消费者,中间的红框表示一个队列。

工具类封装

在简单队列模式的代码实现中,获取信道Channel对象的代码略显重复,这里将其抽取成一个静态方法放置在工具类RabbitMqHelper中。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.sunchaser.sparrow.middleware.mq.rabbitmq.common;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* rabbitmq 工具类
* 抽取一些公共方法
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/15
*/
public class RabbitMqHelper {

public static Channel getChannel() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// 设置host
factory.setHost("127.0.0.1");
// 端口号。默认值5672,可不进行设置。
factory.setPort(5672);
// 虚拟主机名。可不进行设置,默认值为"/"。
factory.setVirtualHost("/");
// 可不设置,username和password默认都为guest
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
return connection.createChannel();
}
}

编写生产者

创建消息生产者NewTask类,为了便捷快速地发送消息,这里直接从控制台读取消息内容进行发送。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.sunchaser.sparrow.middleware.mq.rabbitmq.workqueues;

import com.rabbitmq.client.Channel;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

import java.util.Scanner;

/**
* 工作队列模式 消息生产者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/18
*/
public class NewTask {
private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqHelper.getChannel()) {
/*
* 声明一个队列用来发送消息
*
* @param queue 队列名称
* @param durable 消息是否进行持久化(不持久化则保存在内存中)。true:进行持久化
* @param exclusive 是否声明为独占队列。当前队列只允许当前连接使用,其它连接不可用。true:为独占队列
* @param autoDelete 是否在消费完成后自动删除队列。true:自动删除
* @param arguments 队列的其它参数信息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 从控制台读取消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();

/*
* 往信道中发送一个消息
*
* @param exchange 交换机名称
* @param routingKey 路由key
* @param props 消息的其它参数信息
* @param body 消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}

编写消费者

创建消息消费者Worker类。编写代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.sunchaser.sparrow.middleware.mq.rabbitmq.workqueues;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.sunchaser.sparrow.middleware.mq.rabbitmq.common.RabbitMqHelper;

/**
* 工作队列模式 消息消费者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/15
*/
public class Worker {
private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqHelper.getChannel();

/*
* 消息传递(消费)时的回调
*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(" [x] Received '" + new String(message.getBody()) + "'");
};

/*
* 取消消费时的回调
*/
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费时的回调逻辑");
};

channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}

启动生产者和两个消费者

生产者直接进行启动;消费者需要进行简单配置后才能运行两个实例。

IDEA 2021.3.2为例,点击上方Edit Configurations...,点击Modify options,勾选Allow multiple instances,点击ApplyOK保存设置。

image-20220419210631882

随后启动两次Worker类的main方法即可启动两个消费者。

多次往生产者控制台输入消息并回车进行发送,观察两个消费者控制台的输出:

image-20220420000322224 image-20220420000411092
可看到消息是轮询进行分发,两个消费者“你来我往”的均匀消费消息。