前言

本文将介绍RabbitMQHello World程序,即简单队列模式。包含一个发送单个消息的生产者,和一个接收消息并打印出来的消费者。该模式的工作原理如下图所示:

P表示生产者,C表示消费者,中间的红框表示一个队列。

下面我们使用Java客户端来实现该程序。

核心依赖

新建一个maven项目,在pom.xml中添加以下依赖:

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>

编写消息生产者

创建消息生产者Producer类,编写代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.sunchaser.sparrow.middleware.mq.rabbitmq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* rabbitmq 简单队列模式 生产者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/15
*/
public class Producer {
private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置host
factory.setHost("127.0.0.1");
// 端口号。默认值5672,可不进行设置。
factory.setPort(5672);
// 虚拟主机名。可不进行设置,默认值为"/"。
factory.setVirtualHost("/SunChaser");
// 可不设置,username和password默认都为guest
factory.setUsername("guest");
factory.setPassword("guest");

// Connection和Channel都实现了java.io.Closeable接口
// 生产者中可使用try-with-resources语法
try (// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel()) {

/*
* 声明一个队列用来发送消息
*
* @param queue 队列名称
* @param durable 消息是否进行持久化(不持久化则保存在内存中)。true:进行持久化
* @param exclusive 是否声明为独占队列。当前队列只允许当前连接使用,其它连接不可用。true:为独占队列
* @param autoDelete 是否在消费完成后自动删除队列。true:自动删除
* @param arguments 队列的其它参数信息
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";

/*
* 往信道中发送一个消息
*
* @param exchange 交换机名称
* @param routingKey 路由key
* @param props 消息的其它参数信息
* @param body 消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}

编写消息消费者

创建消息消费者Consumer类,编写代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.sunchaser.sparrow.middleware.mq.rabbitmq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* rabbitmq 简单队列模式 消费者
*
* @author sunchaser admin@lilu.org.cn
* @since JDK8 2022/4/15
*/
public class Consumer {
private static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 设置host
factory.setHost("127.0.0.1");
// 端口号。默认值5672,可不进行设置。
factory.setPort(5672);
// 虚拟主机名。可不进行设置,默认值为"/"。
factory.setVirtualHost("/SunChaser");
// 可不设置,username和password默认都为guest
factory.setUsername("guest");
factory.setPassword("guest");

// 消费者不需要使用try-with-resources语法自动关闭Connection和Channel。
// 因为消息的发送是异步的,有可能消费者先启动,如果连接和信道都关闭了,则无法进行消费消息
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();

/*
* 消息传递(消费)时的回调
*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(" [x] Received '" + new String(message.getBody()) + "'");
};

/*
* 取消消费时的回调
*/
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费时的回调逻辑");
};

/*
* 消费者消费消息
*
* @param queue 队列名称
* @param autoAck 消费成功之后是否自动回应ack。true:自动;false:手动。
* @param deliverCallback 消息传递时的回调
* @param cancelCallback 消费者取消消费时的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}

启动生产者和消费者

分别启动生产者Producer和消费者Consumer,观察控制台输出:

1
[x] Sent 'Hello World!'
1
[x] Received 'Hello World!'

可看到已经成功完成了一次消息的发送和消费。