前言
本文将介绍RabbitMQ
的Hello World
程序,即简单队列模式。包含一个发送单个消息的生产者,和一个接收消息并打印出来的消费者。该模式的工作原理如下图所示:
P
表示生产者,C
表示消费者,中间的红框表示一个队列。
下面我们使用Java
客户端来实现该程序。
核心依赖
新建一个maven
项目,在pom.xml
中添加以下依赖:
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency>
|
编写消息生产者
创建消息生产者Producer
类,编写代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.helloworld;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Producer { private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/SunChaser"); factory.setUsername("guest"); factory.setPassword("guest"); try ( Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
|
编写消息消费者
创建消息消费者Consumer
类,编写代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package com.sunchaser.sparrow.middleware.mq.rabbitmq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Consumer { private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/SunChaser"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(" [x] Received '" + new String(message.getBody()) + "'"); };
CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费时的回调逻辑"); };
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
|
启动生产者和消费者
分别启动生产者Producer
和消费者Consumer
,观察控制台输出:
1
| [x] Received 'Hello World!'
|
可看到已经成功完成了一次消息的发送和消费。