|
@@ -0,0 +1,1583 @@
|
|
|
|
|
+# 分布式消息队列(RabbitMQ)学习记录
|
|
|
|
|
+
|
|
|
|
|
+本文使用的操作系统为windows,使用JDK17
|
|
|
|
|
+
|
|
|
|
|
+[RabbitMQ官网](https://www.rabbitmq.com/)
|
|
|
|
|
+
|
|
|
|
|
+[分布式消息队列技术选型参考](https://developer.aliyun.com/article/953777)
|
|
|
|
|
+
|
|
|
|
|
+分布式消息队列的使用场景:
|
|
|
|
|
+
|
|
|
|
|
+- 异步处理
|
|
|
|
|
+- 应用解耦
|
|
|
|
|
+- 流量削峰
|
|
|
|
|
+- 流量削峰
|
|
|
|
|
+
|
|
|
|
|
+## 基本概念
|
|
|
|
|
+
|
|
|
|
|
+- [AMQP协议](https://www.rabbitmq.com/tutorials/amqp-concepts)(Advanced Message Queue Protocol):高级消息队列协议
|
|
|
|
|
+- 生产者:生产消息
|
|
|
|
|
+- 消费者:消费消息
|
|
|
|
|
+- 交换机:转发消息到对应的队列
|
|
|
|
|
+- 路由:转发消息的路径
|
|
|
|
|
+- 队列:储存消息
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+## 下载安装RabbitMQ
|
|
|
|
|
+
|
|
|
|
|
+[Installing RabbitMQ | RabbitMQ](https://www.rabbitmq.com/docs/download)
|
|
|
|
|
+
|
|
|
|
|
+[RabbitMQ4.1.4安装包链接](https://github.com/rabbitmq/rabbitmq-server/releases/download/v4.1.4/rabbitmq-server-4.1.4.exe)
|
|
|
|
|
+
|
|
|
|
|
+[Erlang26.2安装包链接](https://github.com/erlang/otp/releases/download/OTP-26.2/otp_win64_26.2.exe)
|
|
|
|
|
+
|
|
|
|
|
+下载RabbitMQ4.1.4
|
|
|
|
|
+
|
|
|
|
|
+使用Erlang26.2
|
|
|
|
|
+
|
|
|
|
|
+RabbitMQ是使用Erlang开发的消息队列,安装RabbitMQ需要先安装Erlang,Erlang是一门高性能的编程语言
|
|
|
|
|
+
|
|
|
|
|
+RabbitMQ版本与Erlang[版本的对应关系](https://www.rabbitmq.com/docs/which-erlang)
|
|
|
|
|
+
|
|
|
|
|
+先安装Erlang,然后安装RabbitMQ
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+安装成功后可以在windows的服务管理面板看到RabbitMQ服务
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+## 安装RabbitMQ的监控面板
|
|
|
|
|
+
|
|
|
|
|
+进入RabbitMQ安装目录下的"sbin"目录,使用CMD运行下面的脚本启用Rabiit的管理面板
|
|
|
|
|
+
|
|
|
|
|
+```shell
|
|
|
|
|
+rabbitmq-plugins.bat enable rabbitmq_management
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+输出以下信息表示安装成功
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+重启RabbitMQ访问"http://localhost:15672"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+默认的账号密码都是guest,登陆后的界面
|
|
|
|
|
+
|
|
|
|
|
+这个guest用户只能本地访问,远程访问需要新建一个用户
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+## 快速入门(Hello World)
|
|
|
|
|
+
|
|
|
|
|
+[官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-one-java)
|
|
|
|
|
+
|
|
|
|
|
+一对一的场景,一个消息生产者往一个队列发送消息,一个消息消费者从一个队列中消费消息
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+在项目中加入操作RabbitMQ的客户端依赖
|
|
|
|
|
+
|
|
|
|
|
+```xml
|
|
|
|
|
+ <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
|
|
|
|
|
+ <dependency>
|
|
|
|
|
+ <groupId>com.rabbitmq</groupId>
|
|
|
|
|
+ <artifactId>amqp-client</artifactId>
|
|
|
|
|
+ <version>5.26.0</version>
|
|
|
|
|
+ </dependency>
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+### 生产者代码
|
|
|
|
|
+
|
|
|
|
|
+1. 创建连接工厂
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //1.创建连接工厂
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+2. 配置
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //2.配置
|
|
|
|
|
+ //设置RabbitMQ服务端的主机地址
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ //实际生产中要配置用户名和密码等等
|
|
|
|
|
+ // factory.setUsername("");
|
|
|
|
|
+ // factory.setPassword("");
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+3. 创建连接,并拿到channel
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //3.创建连接,并拿到channel
|
|
|
|
|
+ //我们通过channel来操作RabbitMQ
|
|
|
|
|
+ Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel())
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+4. 给channel配置队列的消息,当对应名称的信息队列不存在时会自动创建
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //4.给channel配置队列的消息
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 第一个参数(queue)是队列的名称
|
|
|
|
|
+ * 第二参数是(durable)这是是否持久化队列
|
|
|
|
|
+ * 第三个参数(exclusive)表示是否为独占队列,独占队列只允许创建该队列的连接进行操作,其他连接无法操作该队列(服务重启后可能谁也无法操作这个队列)
|
|
|
|
|
+ * 第四个参数(autoDelete)表示是否为自动删除队列,该队列没有连接使用时,自动删除队列
|
|
|
|
|
+ * 第五个参数(arguments)表示其它额外的参数
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+5. 生产发布消息
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //5.生产消息
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 第一个参数(exchange)指定一个交换机
|
|
|
|
|
+ * 第二个参数(routingKey)表示队列的名称
|
|
|
|
|
+ * 第三个参数(props)表示携带的额外属性
|
|
|
|
|
+ * 第四个参数(body)是要生产的消息本身
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.helloworld;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+//生产者
|
|
|
|
|
+public class Send {
|
|
|
|
|
+ //队列的名称
|
|
|
|
|
+ private final static String QUEUE_NAME = "hello";
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ //1.创建连接工厂
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ //2.配置
|
|
|
|
|
+ //设置RabbitMQ服务端的主机地址
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ //实际生产中要配置用户名和密码等等
|
|
|
|
|
+// factory.setUsername("");
|
|
|
|
|
+// factory.setPassword("");
|
|
|
|
|
+ //3.创建连接,并拿到channel
|
|
|
|
|
+ //我们通过channel来操作RabbitMQ
|
|
|
|
|
+ try (Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel()) {
|
|
|
|
|
+ //4.给channel配置队列的消息
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 第一个参数(queue)是队列的名称
|
|
|
|
|
+ * 第二参数是(durable)这是是否持久化队列
|
|
|
|
|
+ * 第三个参数(exclusive)表示是否为独占队列,独占队列只允许创建该队列的连接进行操作,其他连接无法操作该队列(服务重启后可能谁也无法操作这个队列)
|
|
|
|
|
+ * 第四个参数(autoDelete)表示是否为自动删除队列,该队列没有连接使用时,自动删除队列
|
|
|
|
|
+ * 第五个参数(arguments)表示其它额外的参数
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
|
|
|
|
+ String message = "Hello World!";
|
|
|
|
|
+ //5.生产消息
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 第一个参数(exchange)指定一个交换机
|
|
|
|
|
+ * 第二个参数(routingKey)表示队列的名称
|
|
|
|
|
+ * 第三个参数(props)表示携带的额外属性
|
|
|
|
|
+ * 第四个参数(body)是要生产的消息本身
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
|
|
|
|
|
+ System.out.println(" [x] Sent '" + message + "'");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+### 消费者代码
|
|
|
|
|
+
|
|
|
|
|
+1. 创建连接工厂
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //1.创建连接工厂
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+2. 配置
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //2.配置
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+3. 获取连接和对应的channel
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //3.获取连接和对应的channel
|
|
|
|
|
+ Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel();
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+4. 配置队列(这一步不是必须的,当对应名称的信息队列已经存在时,可以省略,一般为了代码的健壮性都会加上)
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //4.配置队列
|
|
|
|
|
+ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+5. 消费消息
|
|
|
|
|
+
|
|
|
|
|
+ ```JAVA
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 第一个参数(queue)是队列的名称
|
|
|
|
|
+ * 第二个参数(autoAck)表示获取消息后是否自动响应消息被消费成功
|
|
|
|
|
+ * 第三个参数(deliverCallback)是消息的处理回调方法
|
|
|
|
|
+ * 第四个参数(cancelCallback)是消息被取消的回调方法
|
|
|
|
|
+ */
|
|
|
|
|
+ //5.消费消息,会持续阻塞,等待消息队列有可以消费的消息
|
|
|
|
|
+ channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 消费信息的回调
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
|
|
|
|
+ String message = new String(delivery.getBody(), "UTF-8");
|
|
|
|
|
+ System.out.println(" [x] Received '" + message + "'");
|
|
|
|
|
+ };
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.helloworld;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+import com.rabbitmq.client.DeliverCallback;
|
|
|
|
|
+
|
|
|
|
|
+//消费者
|
|
|
|
|
+public class Recv {
|
|
|
|
|
+
|
|
|
|
|
+ private final static String QUEUE_NAME = "hello";
|
|
|
|
|
+
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ //1.创建连接工厂
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ //2.配置
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ //3.获取连接和对应的channel
|
|
|
|
|
+ Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel();
|
|
|
|
|
+ //4.配置队列
|
|
|
|
|
+ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
|
|
|
|
+ System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
|
|
|
|
|
+
|
|
|
|
|
+ DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
|
|
|
|
+ String message = new String(delivery.getBody(), "UTF-8");
|
|
|
|
|
+ System.out.println(" [x] Received '" + message + "'");
|
|
|
|
|
+ };
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 第一个参数(queue)是队列的名称
|
|
|
|
|
+ * 第二个参数(autoAck)表示获取消息后是否自动响应消息被消费成功
|
|
|
|
|
+ * 第三个参数(deliverCallback)是消息的处理回调方法
|
|
|
|
|
+ * 第四个参数(cancelCallback)是消息被取消的回调方法
|
|
|
|
|
+ */
|
|
|
|
|
+ //5.消费消息,会持续阻塞,等待消息队列有可以消费的消息
|
|
|
|
|
+ channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+## 工作队列(Work Queue)
|
|
|
|
|
+
|
|
|
|
|
+一个消息生产者对应一个队列,一个队列对应多个消费者
|
|
|
|
|
+
|
|
|
|
|
+场景:单个消费者处理消息能力不够强,需要加机器进行并行处理
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+[官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-two-java)
|
|
|
|
|
+
|
|
|
|
|
+### 生产者代码
|
|
|
|
|
+
|
|
|
|
|
+- 队列持久化
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //队列持久化,将durable设置为true
|
|
|
|
|
+ channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+- 消息持久化
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //使用MessageProperties.PERSISTENT_TEXT_PLAIN表示发布的这个消息是纯文本的,并且表示这个消息是以持久化的模式发送的
|
|
|
|
|
+ /**
|
|
|
|
|
+ * MessageProperties.PERSISTENT_TEXT_PLAIN中有两个属性:
|
|
|
|
|
+ * contentType:text/plain 表示纯文本
|
|
|
|
|
+ * deliveryMode:2 表示消息持久化
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.workQueue;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+import com.rabbitmq.client.MessageProperties;
|
|
|
|
|
+
|
|
|
|
|
+import java.util.Scanner;
|
|
|
|
|
+
|
|
|
|
|
+public class NewTask {
|
|
|
|
|
+
|
|
|
|
|
+ private static final String TASK_QUEUE_NAME = "task_queue";
|
|
|
|
|
+ private static final Scanner input = new Scanner(System.in);
|
|
|
|
|
+
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ try (Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel()) {
|
|
|
|
|
+ //队列持久化,将durable设置为true
|
|
|
|
|
+ channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
|
|
|
|
|
+
|
|
|
|
|
+ while (input.hasNext()) {
|
|
|
|
|
+ //接收控制台的消息,用于发送多条消息
|
|
|
|
|
+ String message = input.nextLine();
|
|
|
|
|
+
|
|
|
|
|
+ //使用MessageProperties.PERSISTENT_TEXT_PLAIN表示发布的这个消息是纯文本的,并且表示这个消息是以持久化的模式发送的
|
|
|
|
|
+ /**
|
|
|
|
|
+ * contentType:text/plain 表示纯文本
|
|
|
|
|
+ * deliveryMode:2 表示消息持久化
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.basicPublish("", TASK_QUEUE_NAME,
|
|
|
|
|
+ MessageProperties.PERSISTENT_TEXT_PLAIN,
|
|
|
|
|
+ message.getBytes("UTF-8"));
|
|
|
|
|
+ System.out.println(" [x] Sent '" + message + "'");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+}
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+### 消费者代码
|
|
|
|
|
+
|
|
|
|
|
+- 取消消息自动确认
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //设置消息不再自动确认,需要手动确认
|
|
|
|
|
+ channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+- 设置一次取多少条消息
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //Qos一次处理多少条消息,即从消息队列一次取多少条消息
|
|
|
|
|
+ channel.basicQos(1);
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ void basicQos(int prefetchCount) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+- 消息确认
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消息确认
|
|
|
|
|
+ * 第一个参数(deliveryTag)是消息的Tag
|
|
|
|
|
+ * 第二个参数(multiple)是是否批量确认消息,即确认这一次从消息队列取回的所以消息(有些消息可能还没处理完),一般不使用批量确认,处理完一条就确认一条消息
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.basicAck(deliveryTag, false);
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ void basicAck(long deliveryTag, boolean multiple) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+- 消息拒绝
|
|
|
|
|
+
|
|
|
|
|
+ - basicNackh和basicReject都可以拒绝消息,basicNack可以批量拒绝,basicReject只能拒绝一条
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消息拒绝
|
|
|
|
|
+ * 第一个参数(deliveryTag)是消息的Tag
|
|
|
|
|
+ * 第二个参数(multiple)是是否批量确认消息
|
|
|
|
|
+ * 第三个参数(requeue)表示是否将消息重新放回消息队列
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.basicNack(deliveryTag,false,true);
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消息拒绝
|
|
|
|
|
+ * 第一个参数(deliveryTag)是消息的Tag
|
|
|
|
|
+ * 第二个参数(requeue)表示是否将消息重新放回消息队列
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.basicReject(deliveryTag,true);
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException;
|
|
|
|
|
+ void basicReject(long deliveryTag, boolean requeue) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+#### RabbitMQ的消息确认机制
|
|
|
|
|
+
|
|
|
|
|
+[官方文档](https://www.rabbitmq.com/docs/confirms)
|
|
|
|
|
+
|
|
|
|
|
+```mermaid
|
|
|
|
|
+stateDiagram-v2
|
|
|
|
|
+ [*] --> Ready : 消息入队
|
|
|
|
|
+ Ready --> Unacked : 消费者取走
|
|
|
|
|
+ Unacked --> Ready : 消费者断开连接
|
|
|
|
|
+ Unacked --> Acked : 消费者确认
|
|
|
|
|
+ Acked --> [*] : 从队列中删除
|
|
|
|
|
+ Unacked --> Ready : 消费者拒绝(并重新入队)
|
|
|
|
|
+ Unacked --> [*] : 消费者拒绝(并丢弃)
|
|
|
|
|
+
|
|
|
|
|
+ note right of Ready
|
|
|
|
|
+ 消息在队列中,
|
|
|
|
|
+ 等待被消费
|
|
|
|
|
+ end note
|
|
|
|
|
+
|
|
|
|
|
+ note right of Unacked
|
|
|
|
|
+ 消息已被消费者持有,
|
|
|
|
|
+ 但未得到最终确认。
|
|
|
|
|
+ 此状态消息不会被删除。
|
|
|
|
|
+ end note
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+- ready:消息未被消费者取走
|
|
|
|
|
+- Unacked:消息被消费者取走,但还未确认
|
|
|
|
|
+- acked:消息被消费者取走,且已经确认
|
|
|
|
|
+- nack:信息消费失败
|
|
|
|
|
+- reject:消息被消费者拒绝
|
|
|
|
|
+
|
|
|
|
|
+1. 还未被消费者取走的消息才能被消费者取走
|
|
|
|
|
+2. 消费者取走消息但未确认,消息不会从消息队列中删除
|
|
|
|
|
+3. 只有消费者确认的消息才会从消息队列中删除
|
|
|
|
|
+4. 被拒绝的消息可以重新放回消息队列,也可能被丢弃
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.workQueue;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+import com.rabbitmq.client.DeliverCallback;
|
|
|
|
|
+
|
|
|
|
|
+public class Worker {
|
|
|
|
|
+
|
|
|
|
|
+ private static final String TASK_QUEUE_NAME = "task_queue";
|
|
|
|
|
+
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ final Connection connection = factory.newConnection();
|
|
|
|
|
+ final Channel channel = connection.createChannel();
|
|
|
|
|
+ //设置消息不再自动确认,需要手动确认
|
|
|
|
|
+ channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
|
|
|
|
|
+ System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
|
|
|
|
|
+ //Qos一次处理多少条消息,即从消息队列一次取多少条消息
|
|
|
|
|
+ channel.basicQos(1);
|
|
|
|
|
+
|
|
|
|
|
+ DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
|
|
|
|
+ String message = new String(delivery.getBody(), "UTF-8");
|
|
|
|
|
+
|
|
|
|
|
+ System.out.println(" [x] Received '" + message + "'");
|
|
|
|
|
+ try {
|
|
|
|
|
+ //业务逻辑
|
|
|
|
|
+ doWork(message);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ System.out.println(" [x] Done");
|
|
|
|
|
+ //通过delivery.getEnvelope().getDeliveryTag()获取消息的Tag,可以认为是消息的ID
|
|
|
|
|
+ long deliveryTag = delivery.getEnvelope().getDeliveryTag();
|
|
|
|
|
+ // 消息处理完成,确认消息成功接收
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消息确认
|
|
|
|
|
+ * 第一个参数(deliveryTag)是消息的Tag
|
|
|
|
|
+ * 第二个参数(multiple)是是否批量确认消息,即确认这一次从消息队列取回的所以消息(有些消息可能还没处理完),一般不使用批量确认,处理完一条就确认一条消息
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.basicAck(deliveryTag, false);
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消息拒绝
|
|
|
|
|
+ * 第一个参数(deliveryTag)是消息的Tag
|
|
|
|
|
+ * 第二个参数(multiple)是是否批量确认消息
|
|
|
|
|
+ * 第三个参数(requeue)表示是否将消息重新放回消息队列
|
|
|
|
|
+ */
|
|
|
|
|
+ //channel.basicNack(deliveryTag,false,true);
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消息拒绝
|
|
|
|
|
+ * 第一个参数(deliveryTag)是消息的Tag
|
|
|
|
|
+ * 第二个参数(requeue)表示是否将消息重新放回消息队列
|
|
|
|
|
+ */
|
|
|
|
|
+ //channel.basicReject(deliveryTag,true);
|
|
|
|
|
+ }
|
|
|
|
|
+ };
|
|
|
|
|
+ //连接到服务端,开始消费消息,取消自动确认消息,改为手动确认
|
|
|
|
|
+ channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 消息的处理
|
|
|
|
|
+ * @param task 消息任务
|
|
|
|
|
+ */
|
|
|
|
|
+ private static void doWork(String task) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ //模拟消费消息消耗的时间
|
|
|
|
|
+ Thread.sleep(1000*5);
|
|
|
|
|
+ System.out.println("处理消息:%s".formatted(task));
|
|
|
|
|
+ } catch (InterruptedException _ignored) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+## 发布订阅(Publish/Subcribe)信息队列
|
|
|
|
|
+
|
|
|
|
|
+[官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-three-java#putting-it-all-together)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+一个生产者对应一个交换机,一个交换机对应多个队列
|
|
|
|
|
+
|
|
|
|
|
+场景:一个系统产生的信息被多个外部系统使用,交换机将生产的信息转发到外部系统使用的信息队列中
|
|
|
|
|
+
|
|
|
|
|
+引入交换机的概念,交换机负责信息的转发,将信息转发到对应满足条件的信息队列
|
|
|
|
|
+
|
|
|
|
|
+交换机有四种类型:
|
|
|
|
|
+
|
|
|
|
|
+- direct
|
|
|
|
|
+- topic
|
|
|
|
|
+- headers
|
|
|
|
|
+- fanout
|
|
|
|
|
+
|
|
|
|
|
+这里的案例以fanout类型为例,fanout直译为扇出,可以理解为广播
|
|
|
|
|
+
|
|
|
|
|
+### 生产者代码
|
|
|
|
|
+
|
|
|
|
|
+1. 先声明交换机,指定名称和类型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 第一个参数(exchange)是交换机的名称
|
|
|
|
|
+ * 第二个参数(type)是交换机的类型,支持的类型有:direct, topic, headers and fanout
|
|
|
|
|
+ * 这里使用fanout类型,即广播类型,会将消息发送给所有绑定到该交换机的队列
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+2. 然后声明消息队列
|
|
|
|
|
+
|
|
|
|
|
+3. 再将消息队列绑定到交换机上
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 一个fanout交换机绑定多个信息队列(表示不同外部系统使用的消息队列)
|
|
|
|
|
+ * 第一个参数(queue)是队列的名称
|
|
|
|
|
+ * 第二个参数(exchange)是交换机的名称
|
|
|
|
|
+ * 第三个参数(routingKey)是路由键,这里使用空字符串表示不需要路由键
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.queueBind(queue1, EXCHANGE_NAME, "");
|
|
|
|
|
+ channel.queueBind(queue2, EXCHANGE_NAME, "");
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+ 方法原型
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+4. 最后发布信息
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //4.发布信息,向交换机发布信息
|
|
|
|
|
+ channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.publish_subscribe;
|
|
|
|
|
+
|
|
|
|
|
+import java.util.Scanner;
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: RabbitMQ-learn
|
|
|
|
|
+ * @FileName: EmitLog
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/7 21:47
|
|
|
|
|
+ * @Description: 信息生产者
|
|
|
|
|
+ */
|
|
|
|
|
+public class EmitLog {
|
|
|
|
|
+ //交换机的名称
|
|
|
|
|
+ private static final String EXCHANGE_NAME = "logs";
|
|
|
|
|
+ public static final Scanner input = new Scanner(System.in);
|
|
|
|
|
+
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ try (Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel()) {
|
|
|
|
|
+ //1.声明一个交换机
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 第一个参数(exchange)是交换机的名称
|
|
|
|
|
+ * 第二个参数(type)是交换机的类型,支持的类型有:direct, topic, headers and fanout
|
|
|
|
|
+ * 这里使用fanout类型,即广播类型,会将消息发送给所有绑定到该交换机的队列
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
|
|
|
|
+
|
|
|
|
|
+ String queue1 = "queue1";
|
|
|
|
|
+ String queue2 = "queue2";
|
|
|
|
|
+ //2.声明队列
|
|
|
|
|
+ channel.queueDeclare(queue1, false, false, false, null);
|
|
|
|
|
+ channel.queueDeclare(queue2, false, false, false, null);
|
|
|
|
|
+
|
|
|
|
|
+ //3.绑定队列到交换机上(注意需要先声明队列才能绑定队列)
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 一个fanout交换机绑定多个信息队列(表示不同外部系统使用的消息队列)
|
|
|
|
|
+ * 第一个参数(queue)是队列的名称
|
|
|
|
|
+ * 第二个参数(exchange)是交换机的名称
|
|
|
|
|
+ * 第三个参数(routingKey)是路由键,这里使用空字符串表示不需要路由键
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.queueBind(queue1, EXCHANGE_NAME, "");
|
|
|
|
|
+ channel.queueBind(queue2, EXCHANGE_NAME, "");
|
|
|
|
|
+
|
|
|
|
|
+ while (input.hasNext()) {
|
|
|
|
|
+ String message = input.nextLine();
|
|
|
|
|
+ //4.发布信息,向交换机发布信息
|
|
|
|
|
+ channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
|
|
|
|
|
+ System.out.println(" [x] Sent '" + message + "'");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+### 消费者代码
|
|
|
|
|
+
|
|
|
|
|
+使用两个不同的channel表示不同的外部系统消费不同的信息队列
|
|
|
|
|
+
|
|
|
|
|
+1. 拿到channel
|
|
|
|
|
+2. 消费对应的信息队列
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.publish_subscribe;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+import com.rabbitmq.client.DeliverCallback;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: RabbitMQ-learn
|
|
|
|
|
+ * @FileName: ReceiveLogs
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/7 22:00
|
|
|
|
|
+ * @Description: 信息消费者
|
|
|
|
|
+ */
|
|
|
|
|
+
|
|
|
|
|
+public class ReceiveLogs {
|
|
|
|
|
+ private static final String EXCHANGE_NAME = "logs";
|
|
|
|
|
+
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ Connection connection = factory.newConnection();
|
|
|
|
|
+ //1.获取两个channel模拟绑定到fanout交换机的多个队列同时进行消费
|
|
|
|
|
+ Channel channel1 = connection.createChannel();
|
|
|
|
|
+ Channel channel2 = connection.createChannel();
|
|
|
|
|
+ //消费者不需要绑定到交换机
|
|
|
|
|
+ //channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
|
|
|
|
+ //channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
|
|
|
|
+ //消费者也不需要绑定到信息队列
|
|
|
|
|
+ String queueName1 = "queue1";
|
|
|
|
|
+ String queueName2 = "queue2";
|
|
|
|
|
+ //channel1.queueBind(queueName1, EXCHANGE_NAME, "");
|
|
|
|
|
+ //channel2.queueBind(queueName2, EXCHANGE_NAME, "");
|
|
|
|
|
+
|
|
|
|
|
+ System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
|
|
|
|
|
+ //两个消费信息的回调
|
|
|
|
|
+ DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
|
|
|
|
|
+ String message = new String(delivery.getBody(), "UTF-8");
|
|
|
|
|
+ System.out.println(" [消费者1] Received '" + message + "'");
|
|
|
|
|
+ };
|
|
|
|
|
+ DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
|
|
|
|
|
+ String message = new String(delivery.getBody(), "UTF-8");
|
|
|
|
|
+ System.out.println(" [消费者2] Received '" + message + "'");
|
|
|
|
|
+ };
|
|
|
|
|
+ //2.消费对应队列的信息
|
|
|
|
|
+ channel1.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });
|
|
|
|
|
+ channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+## 路由(Routing)信息队列
|
|
|
|
|
+
|
|
|
|
|
+[官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-four-java)
|
|
|
|
|
+
|
|
|
|
|
+一个信息生产者对应一个交换机,一个交换机对应多个信息队列,交换机转发信息根据routingKey进行转发到对应的信息队列,direct交换机的路由键routingKey的匹配规则是字符串的完全匹配
|
|
|
|
|
+
|
|
|
|
|
+场景:一个系统生产的信息有等级层次划分,有些信息只希望授权过的外部系统才能访问,根据路由键这些信息只发往特定的信息队列
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+### 生产者代码
|
|
|
|
|
+
|
|
|
|
|
+- 交换机使用direct类型,direct交换机的工作方式是根据路由键(routingKey)进行信息转发,路由键的匹配方式是字符串完全匹配
|
|
|
|
|
+- 绑定队列指定交换机和路由键
|
|
|
|
|
+- 发布信息指定交换机的路由键
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.routing;
|
|
|
|
|
+
|
|
|
|
|
+import java.util.Scanner;
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: RabbitMQ-learn
|
|
|
|
|
+ * @FileName: EmitLogDirect
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/9 8:33
|
|
|
|
|
+ * @Description:
|
|
|
|
|
+ */
|
|
|
|
|
+
|
|
|
|
|
+public class EmitLogDirect {
|
|
|
|
|
+
|
|
|
|
|
+ private static final String EXCHANGE_NAME = "direct_logs";
|
|
|
|
|
+ public static final String[] queueNames = new String[]{"error","waring","info"};
|
|
|
|
|
+ public static final Scanner input = new Scanner(System.in);
|
|
|
|
|
+
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ try (Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel()) {
|
|
|
|
|
+ //声明一个交换机,使用direct类型的交换机
|
|
|
|
|
+ /**
|
|
|
|
|
+ * direct类型的交换机转发信息需要根据路由键
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.exchangeDeclare(EXCHANGE_NAME, "direct");
|
|
|
|
|
+
|
|
|
|
|
+ //声明队列,三个表示不同类型日志的信息队列
|
|
|
|
|
+ for (int i = 0; i < queueNames.length; i++) {
|
|
|
|
|
+ String queueName = queueNames[i];
|
|
|
|
|
+ String routingKey = queueName;
|
|
|
|
|
+ //信息队列声明
|
|
|
|
|
+ channel.queueDeclare(queueName, false, false, false, null);
|
|
|
|
|
+ //交换机绑定队列
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 交换机绑定信息队列,指定路由键(routingKey)
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ while (input.hasNext()) {
|
|
|
|
|
+ System.out.println("请输入路由键:");
|
|
|
|
|
+ String routingKey = input.nextLine();
|
|
|
|
|
+ System.out.println("请输入信息:");
|
|
|
|
|
+ String message = input.nextLine();
|
|
|
|
|
+ //发布信息时,指定交换机exchange和路由键routingKey
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 根据路由键转发到对应的信息队列
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
|
|
|
|
|
+ System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+### 消费者代码
|
|
|
|
|
+
|
|
|
|
|
+- 消费不同的信息队列
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.routing;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+import com.rabbitmq.client.DeliverCallback;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: RabbitMQ-learn
|
|
|
|
|
+ * @FileName: ReceiveLogsDirect
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/9 8:48
|
|
|
|
|
+ * @Description:
|
|
|
|
|
+ */
|
|
|
|
|
+
|
|
|
|
|
+public class ReceiveLogsDirect {
|
|
|
|
|
+
|
|
|
|
|
+ private static final String EXCHANGE_NAME = "direct_logs";
|
|
|
|
|
+ public static final String[] queueNames = new String[]{"error","waring","info"};
|
|
|
|
|
+
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel();
|
|
|
|
|
+
|
|
|
|
|
+ channel.exchangeDeclare(EXCHANGE_NAME, "direct");
|
|
|
|
|
+
|
|
|
|
|
+ for (int i = 0; i < queueNames.length; i++) {
|
|
|
|
|
+ String queueName = queueNames[i];
|
|
|
|
|
+ String routingKey = queueName;
|
|
|
|
|
+ //信息队列声明
|
|
|
|
|
+ channel.queueDeclare(queueName, false, false, false, null);
|
|
|
|
|
+ //交换机绑定队列
|
|
|
|
|
+ channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
|
|
|
|
|
+
|
|
|
|
|
+ DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
|
|
|
|
+ String message = new String(delivery.getBody(), "UTF-8");
|
|
|
|
|
+ //获取信息的routingKey
|
|
|
|
|
+ String routingKey = delivery.getEnvelope().getRoutingKey();
|
|
|
|
|
+ System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
|
|
|
|
|
+ };
|
|
|
|
|
+ //模拟多个客户端消费信息
|
|
|
|
|
+ for (int i = 0; i < queueNames.length; i++) {
|
|
|
|
|
+ String queueName = queueNames[i];
|
|
|
|
|
+ channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+## 主题(Topic)信息队列
|
|
|
|
|
+
|
|
|
|
|
+[官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-five-java#topics)
|
|
|
|
|
+
|
|
|
|
|
+topic交换机与direct交换机的主要区别是,topic交换机的路由键routingKey支持通配符的格式
|
|
|
|
|
+
|
|
|
|
|
+场景:特定的一类信息给特定的一类程序处理
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+topic类型交换机的路由键是支持通配符(类Ant格式)的,一个单词有多个字符,单词与单词之间以点(.)分割
|
|
|
|
|
+
|
|
|
|
|
+- "*":匹配一个单词
|
|
|
|
|
+- "#":匹配零个或多个单词
|
|
|
|
|
+- "\*.orange.\*" :匹配"a.orange.b",也匹配"abc.orange.abc",但不匹配"aorangeb"
|
|
|
|
|
+- "a.#" :匹配"a.b.c",也匹配"a.b",也匹配"a",但不匹配"ab"
|
|
|
|
|
+
|
|
|
|
|
+### 生产者代码
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.topic;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+import java.util.Scanner;
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: RabbitMQ-learn
|
|
|
|
|
+ * @FileName: EmitLogTopic
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/9 9:45
|
|
|
|
|
+ * @Description:
|
|
|
|
|
+ */
|
|
|
|
|
+
|
|
|
|
|
+public class EmitLogTopic {
|
|
|
|
|
+ public static final Scanner input = new Scanner(System.in);
|
|
|
|
|
+ private static final String EXCHANGE_NAME = "topic_logs";
|
|
|
|
|
+ public static final String[] queueNames = new String[]{"q1","q2"};
|
|
|
|
|
+ public static final String[] routingKeys = new String[]{"*.orange.*","a.#"};
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ try (Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel()) {
|
|
|
|
|
+
|
|
|
|
|
+ //创建一个topic类型的交换机
|
|
|
|
|
+ /**
|
|
|
|
|
+ * topic类型交换机的路由键是支持通配符(类Ant格式)的,一个单词有多个字符,单词与单词之间以点(.)分割
|
|
|
|
|
+ * * :匹配一个单词
|
|
|
|
|
+ * # :匹配零个或多个单词
|
|
|
|
|
+ *
|
|
|
|
|
+ * e.g:
|
|
|
|
|
+ * *.orange.* :匹配a.orange.b,也匹配abc.orange.abc,但不匹配aorangeb
|
|
|
|
|
+ * a.# :匹配a.b.c,也匹配a.b,也匹配a,但不匹配ab
|
|
|
|
|
+ */
|
|
|
|
|
+ channel.exchangeDeclare(EXCHANGE_NAME, "topic");
|
|
|
|
|
+ for (int i = 0; i < queueNames.length; i++) {
|
|
|
|
|
+ String queueName = queueNames[i];
|
|
|
|
|
+ String routingKey = routingKeys[i];
|
|
|
|
|
+ //创建信息队列
|
|
|
|
|
+ channel.queueDeclare(queueName, false, false, false, null);
|
|
|
|
|
+ //将队列绑定到交换机
|
|
|
|
|
+ channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ System.out.println("请输入路由键:");
|
|
|
|
|
+ while (input.hasNext()) {
|
|
|
|
|
+ String routingKey = input.nextLine();
|
|
|
|
|
+ System.out.println("请输入消息:");
|
|
|
|
|
+ String message = input.nextLine();
|
|
|
|
|
+ //生产信息
|
|
|
|
|
+ channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
|
|
|
|
|
+ System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
|
|
|
|
|
+ System.out.println("请输入路由键:");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+### 消费者代码
|
|
|
|
|
+
|
|
|
|
|
+消费者通过信息队列的名称进行信息消费即可
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.topic;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+import com.rabbitmq.client.DeliverCallback;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: RabbitMQ-learn
|
|
|
|
|
+ * @FileName: ReceiveLogsTopic
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/9 10:05
|
|
|
|
|
+ * @Description:
|
|
|
|
|
+ */
|
|
|
|
|
+
|
|
|
|
|
+public class ReceiveLogsTopic {
|
|
|
|
|
+
|
|
|
|
|
+ private static final String EXCHANGE_NAME = "topic_logs";
|
|
|
|
|
+ public static final String[] queueNames = new String[]{"q1","q2"};
|
|
|
|
|
+
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel();
|
|
|
|
|
+ //声明交换机
|
|
|
|
|
+ channel.exchangeDeclare(EXCHANGE_NAME, "topic");
|
|
|
|
|
+ //获取信息队列的名称
|
|
|
|
|
+ //String queueName = channel.queueDeclare().getQueue();
|
|
|
|
|
+
|
|
|
|
|
+ DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
|
|
|
|
+ String message = new String(delivery.getBody(), "UTF-8");
|
|
|
|
|
+ System.out.println(" [x] Received '" +
|
|
|
|
|
+ delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
|
|
|
|
|
+ };
|
|
|
|
|
+ //消费信息
|
|
|
|
|
+ for (String queueName : queueNames) {
|
|
|
|
|
+ channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+## 核心特性
|
|
|
|
|
+
|
|
|
|
|
+### [信息确认机制](#RabbitMQ的消息确认机制)
|
|
|
|
|
+
|
|
|
|
|
+### 信息过期机制
|
|
|
|
|
+
|
|
|
|
|
+[官方文档](https://www.rabbitmq.com/docs/ttl)
|
|
|
|
|
+
|
|
|
|
|
+场景:有些信息的时效比较高,一段时间不处理后可能就不需要处理了,
|
|
|
|
|
+
|
|
|
|
|
+- 给信息队列指定过期时间(信息队列里的每一条信息都有固定的过期时间)
|
|
|
|
|
+
|
|
|
|
|
+- 在创建信息队列时指定过期时间
|
|
|
|
|
+
|
|
|
|
|
+ ```java
|
|
|
|
|
+ //信息队列的额外配置参数
|
|
|
|
|
+ Map<String, Object> arguments = new HashMap<String, Object>();
|
|
|
|
|
+ //设置队列中的消息的过期时间为五秒钟
|
|
|
|
|
+ arguments.put("x-message-ttl",5*1000);
|
|
|
|
|
+ //创建信息队列时,通过arguments设置队列的过期时间
|
|
|
|
|
+ channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
|
|
|
|
|
+ ```
|
|
|
|
|
+
|
|
|
|
|
+>信息在过期时间过后仍没有被消费,信息就会被丢弃
|
|
|
|
|
+>
|
|
|
|
|
+>信息被读取,但未确认,在信息过期时间到后不会被丢弃
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+- 在信息发布时给信息指定过期时间
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+//创建一个信息的额外配置,设置消息的过期时间为十秒钟
|
|
|
|
|
+AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(10*1000+"").build();
|
|
|
|
|
+//在发布信息时给特定信息指定过期时间
|
|
|
|
|
+channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+> 给信息队列设置的过期时间对信息队列中的每一条信息都生效
|
|
|
|
|
+>
|
|
|
|
|
+> 给信息设置的过期时间只对这条信息生效
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.ttl;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.AMQP;
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+
|
|
|
|
|
+import java.util.HashMap;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: RabbitMQ-learn
|
|
|
|
|
+ * @FileName: Producer
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/9 16:40
|
|
|
|
|
+ * @Description:
|
|
|
|
|
+ */
|
|
|
|
|
+public class Producer {
|
|
|
|
|
+ private final static String QUEUE_NAME = "ttl_queue";
|
|
|
|
|
+ public static void main(String[] argv) throws Exception {
|
|
|
|
|
+ ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+ factory.setHost("localhost");
|
|
|
|
|
+ try (Connection connection = factory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel()) {
|
|
|
|
|
+ //信息队列的额外配置参数
|
|
|
|
|
+ Map<String, Object> arguments = new HashMap<String, Object>();
|
|
|
|
|
+ //设置队列中的消息的过期时间为五秒钟
|
|
|
|
|
+ arguments.put("x-message-ttl",5*1000);
|
|
|
|
|
+ //创建信息队列时,通过arguments设置队列的过期时间
|
|
|
|
|
+ channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
|
|
|
|
|
+ String message = "Hello World!";
|
|
|
|
|
+ //创建一个信息的额外配置,设置消息的过期时间为十秒钟
|
|
|
|
|
+ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
|
|
|
|
|
+ .expiration(10*1000+"")
|
|
|
|
|
+ .build();
|
|
|
|
|
+ //在发布信息时给特定信息指定过期时间
|
|
|
|
|
+ channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
|
|
|
|
|
+ System.out.println(" [x] Sent '" + message + "'");
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 总结:
|
|
|
|
|
+ * 给信息队列设置的过期时间对信息队列中的每一条信息都生效
|
|
|
|
|
+ * 给信息设置的过期时间只对这条信息生效
|
|
|
|
|
+ */
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+### 死信队列
|
|
|
|
|
+
|
|
|
|
|
+[官方文档](https://www.rabbitmq.com/docs/dlx)
|
|
|
|
|
+
|
|
|
|
|
+死信:因为各种原因导致无法被处理的信息被称为死信,比如信息过期,信息消费失败又没有重新放到信息队列,信息队列满了信息放不进去
|
|
|
|
|
+
|
|
|
|
|
+死信队列:保存死信的队列,就是个普通的信息队列,只是保存的是死信
|
|
|
|
|
+
|
|
|
|
|
+死信交换机:将死信转发到死信队列的交换机,也是一个普通的交换机
|
|
|
|
|
+
|
|
|
|
|
+> 死信队列是为了给系统提供的一种容错机制,让无法被处理的信息,也有机会可以被处理,防止信息的丢失
|
|
|
|
|
+
|
|
|
|
|
+- 死信的处理方法
|
|
|
|
|
+ 1. 给信息队列绑定一个出现死信使用的交换机exchange
|
|
|
|
|
+ 2. 给信息队列绑定一个出现死信使用的路由键routingKey
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+//创建工作交换机
|
|
|
|
|
+channel.exchangeDeclare(WORK_EXCHANGE, "direct");
|
|
|
|
|
+//1.创建死信交换机
|
|
|
|
|
+channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct");
|
|
|
|
|
+
|
|
|
|
|
+//2.创建死信队列
|
|
|
|
|
+String dlx_name1 = "dead_letter_queue1";
|
|
|
|
|
+channel.queueDeclare(dlx_name1, false, false, false, null);
|
|
|
|
|
+//3.死信队列绑定死信交换机
|
|
|
|
|
+String routingKey1 = "key1";
|
|
|
|
|
+channel.queueBind(dlx_name1, DEAD_LETTER_EXCHANGE, routingKey1);
|
|
|
|
|
+
|
|
|
|
|
+//死信交换机配置
|
|
|
|
|
+Map<String,Object> arguments1 = new HashMap<>();
|
|
|
|
|
+/**
|
|
|
|
|
+* 通过x-dead-letter-exchange参数指定死信交换机(exchange)的名称
|
|
|
|
|
+* 通过x-dead-letter-routing-key参数指定死信转发的路由键(routingKey)
|
|
|
|
|
+*/
|
|
|
|
|
+arguments1.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
|
|
|
|
|
+arguments1.put("x-dead-letter-routing-key", routingKey1);
|
|
|
|
|
+//4.创建工作信息队列,通过arguments参数配置死信交换机
|
|
|
|
|
+channel.queueDeclare(WORK_QUEUE1, false, false, false, arguments1);
|
|
|
|
|
+//工作队列绑定工作交换机
|
|
|
|
|
+channel.queueBind(WORK_QUEUE1, WORK_EXCHANGE, routingKey1);
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+- 生产者代码
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.dead_letter;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+
|
|
|
|
|
+import java.io.IOException;
|
|
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
|
|
+import java.util.HashMap;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
+import java.util.Scanner;
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: RabbitMQ-learn
|
|
|
|
|
+ * @FileName: Producer
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/9 18:39
|
|
|
|
|
+ * @Description: 死信队列机制
|
|
|
|
|
+ * 工作交换机将通过路由键转发到工作信息队列
|
|
|
|
|
+ * 当工作队列产生死信时,死信交换机通过路由键转发到死信队列
|
|
|
|
|
+ */
|
|
|
|
|
+public class Producer {
|
|
|
|
|
+ public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
|
|
|
|
|
+ public static final String WORK_EXCHANGE = "work_exchange";
|
|
|
|
|
+ public static final String WORK_QUEUE1 = "work_queue1";
|
|
|
|
|
+ public static final String WORK_QUEUE2 = "work_queue2";
|
|
|
|
|
+ public static final Scanner input = new Scanner(System.in);
|
|
|
|
|
+ public static void main(String[] args) throws IOException, TimeoutException {
|
|
|
|
|
+ ConnectionFactory connectionFactory = new ConnectionFactory();
|
|
|
|
|
+ connectionFactory.setHost("localhost");
|
|
|
|
|
+ try (Connection connection = connectionFactory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel();) {
|
|
|
|
|
+ //创建工作交换机
|
|
|
|
|
+ channel.exchangeDeclare(WORK_EXCHANGE, "direct");
|
|
|
|
|
+ //创建死信交换机
|
|
|
|
|
+ channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct");
|
|
|
|
|
+
|
|
|
|
|
+ //创建死信队列
|
|
|
|
|
+ String dlx_name1 = "dead_letter_queue1";
|
|
|
|
|
+ String dlx_name2 = "dead_letter_queue2";
|
|
|
|
|
+ channel.queueDeclare(dlx_name1, false, false, false, null);
|
|
|
|
|
+ channel.queueDeclare(dlx_name2, false, false, false, null);
|
|
|
|
|
+ //死信队列绑定死信交换机
|
|
|
|
|
+ String routingKey1 = "key1";
|
|
|
|
|
+ String routingKey2 = "key2";
|
|
|
|
|
+ channel.queueBind(dlx_name1, DEAD_LETTER_EXCHANGE, routingKey1);
|
|
|
|
|
+ channel.queueBind(dlx_name2, DEAD_LETTER_EXCHANGE, routingKey2);
|
|
|
|
|
+
|
|
|
|
|
+ //死信交换机配置
|
|
|
|
|
+ Map<String,Object> arguments1 = new HashMap<>();
|
|
|
|
|
+ Map<String,Object> arguments2 = new HashMap<>();
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 通过x-dead-letter-exchange参数指定死信交换机(exchange)的名称
|
|
|
|
|
+ * 通过x-dead-letter-routing-key参数指定死信转发的路由键(routingKey)
|
|
|
|
|
+ */
|
|
|
|
|
+ arguments1.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
|
|
|
|
|
+ arguments1.put("x-dead-letter-routing-key", routingKey1);
|
|
|
|
|
+ arguments2.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
|
|
|
|
|
+ arguments2.put("x-dead-letter-routing-key", routingKey2);
|
|
|
|
|
+ //创建工作信息队列,通过arguments参数配置死信交换机
|
|
|
|
|
+ channel.queueDeclare(WORK_QUEUE1, false, false, false, arguments1);
|
|
|
|
|
+ channel.queueDeclare(WORK_QUEUE2, false, false, false, arguments2);
|
|
|
|
|
+ //工作队列绑定工作交换机
|
|
|
|
|
+ channel.queueBind(WORK_QUEUE1, WORK_EXCHANGE, routingKey1);
|
|
|
|
|
+ channel.queueBind(WORK_QUEUE2, WORK_EXCHANGE, routingKey2);
|
|
|
|
|
+
|
|
|
|
|
+ System.out.println("请输入路由键:");
|
|
|
|
|
+ while (input.hasNext()) {
|
|
|
|
|
+ String routingKey = input.nextLine();
|
|
|
|
|
+ System.out.println("请输入信息:");
|
|
|
|
|
+ String message = input.nextLine();
|
|
|
|
|
+ //发送信息
|
|
|
|
|
+ channel.basicPublish(WORK_EXCHANGE,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
|
|
|
|
|
+ System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
|
|
|
|
|
+ System.out.println("请输入路由键:");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+- 消费者代码
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.rabbitMQ_learn.dead_letter;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import com.rabbitmq.client.Connection;
|
|
|
|
|
+import com.rabbitmq.client.ConnectionFactory;
|
|
|
|
|
+import com.rabbitmq.client.DeliverCallback;
|
|
|
|
|
+
|
|
|
|
|
+import java.io.IOException;
|
|
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: RabbitMQ-learn
|
|
|
|
|
+ * @FileName: Consumer
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/9 18:59
|
|
|
|
|
+ * @Description:
|
|
|
|
|
+ * 消费者拒绝工作队列的信息模拟死信的产生
|
|
|
|
|
+ * 模拟其他程序消费死信队列的信息
|
|
|
|
|
+ */
|
|
|
|
|
+public class Consumer {
|
|
|
|
|
+ public static final String WORK_QUEUE1 = "work_queue1";
|
|
|
|
|
+ public static final String WORK_QUEUE2 = "work_queue2";
|
|
|
|
|
+ public static final String DLX_QUEUE_NAME1 = "dead_letter_queue1";
|
|
|
|
|
+ public static final String DLX_QUEUE_NAME2 = "dead_letter_queue2";
|
|
|
|
|
+ public static void main(String[] args) throws IOException, TimeoutException {
|
|
|
|
|
+ ConnectionFactory connectionFactory = new ConnectionFactory();
|
|
|
|
|
+ connectionFactory.setHost("localhost");
|
|
|
|
|
+ Connection connection = connectionFactory.newConnection();
|
|
|
|
|
+ Channel channel = connection.createChannel();
|
|
|
|
|
+
|
|
|
|
|
+ //工作信息的处理
|
|
|
|
|
+ DeliverCallback deliverCallback = (consumerTag,delivery)->{
|
|
|
|
|
+ long deliveryTag = delivery.getEnvelope().getDeliveryTag();
|
|
|
|
|
+ String routingKey = delivery.getEnvelope().getRoutingKey();
|
|
|
|
|
+ String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
|
|
|
|
|
+ //拒绝信息,模拟死信的产生
|
|
|
|
|
+ channel.basicNack(deliveryTag,false,false);
|
|
|
|
|
+ System.out.println(" [worker] Received '" + routingKey + "':'" + message + "已拒绝'");
|
|
|
|
|
+ };
|
|
|
|
|
+ //死信队列的处理
|
|
|
|
|
+ DeliverCallback deliverCallback1 = (consumerTag,delivery)->{
|
|
|
|
|
+ String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
|
|
|
|
|
+ String routingKey = delivery.getEnvelope().getRoutingKey();
|
|
|
|
|
+ System.out.println(" [dlx1] Received '" + routingKey + "':'" + message + "'");
|
|
|
|
|
+ };
|
|
|
|
|
+ DeliverCallback deliverCallback2 = (consumerTag,delivery)->{
|
|
|
|
|
+ String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
|
|
|
|
|
+ String routingKey = delivery.getEnvelope().getRoutingKey();
|
|
|
|
|
+ System.out.println(" [dlx2] Received '" + routingKey + "':'" + message + "'");
|
|
|
|
|
+ };
|
|
|
|
|
+ //消费工作信息
|
|
|
|
|
+ channel.basicConsume(WORK_QUEUE1,false,deliverCallback, consumerTag->{});
|
|
|
|
|
+ channel.basicConsume(WORK_QUEUE2,false,deliverCallback, consumerTag->{});
|
|
|
|
|
+ //消费死信队列的信息
|
|
|
|
|
+ channel.basicConsume(DLX_QUEUE_NAME1,true,deliverCallback1,consumerTag->{});
|
|
|
|
|
+ channel.basicConsume(DLX_QUEUE_NAME2,true,deliverCallback2,consumerTag->{});
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+## RabbitMQ常用API
|
|
|
|
|
+
|
|
|
|
|
+- 常规操作
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+//1.创建连接工厂
|
|
|
|
|
+ConnectionFactory factory = new ConnectionFactory();
|
|
|
|
|
+//2.配置
|
|
|
|
|
+//设置RabbitMQ服务端的主机地址
|
|
|
|
|
+factory.setHost("localhost");
|
|
|
|
|
+//实际生产中要配置用户名和密码等等
|
|
|
|
|
+factory.setUsername("");
|
|
|
|
|
+factory.setPassword("");
|
|
|
|
|
+//3.创建连接,并拿到channel
|
|
|
|
|
+//我们通过channel来操作RabbitMQ
|
|
|
|
|
+Connection connection = factory.newConnection();
|
|
|
|
|
+Channel channel = connection.createChannel())
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+- 创建交换机
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
|
|
|
|
+//方法原型
|
|
|
|
|
+Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+- 创建信息队列
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
|
|
|
|
+//方法原型
|
|
|
|
|
+Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+- 信息队列绑定交换机
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+channel.queueBind(queue1, EXCHANGE_NAME, "");
|
|
|
|
|
+//方法原型
|
|
|
|
|
+Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+- 发布信息
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
|
|
|
|
|
+//方法原型
|
|
|
|
|
+void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+- 消费信息
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
|
|
|
|
|
+//方法原型
|
|
|
|
|
+String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+- 给信息队列设置额外的参数
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+//信息队列的额外配置参数
|
|
|
|
|
+Map<String, Object> arguments = new HashMap<String, Object>();
|
|
|
|
|
+//设置队列中的消息的过期时间为五秒钟
|
|
|
|
|
+arguments.put("x-message-ttl",5*1000);
|
|
|
|
|
+//创建信息队列时,通过arguments设置队列的过期时间
|
|
|
|
|
+channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+- 给信息设置额外的参数
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+//创建一个信息的额外配置,设置消息的过期时间为十秒钟
|
|
|
|
|
+AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(10*1000+"").build();
|
|
|
|
|
+//在发布信息时给特定信息指定过期时间
|
|
|
|
|
+channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+- 信息消费的处理回调
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
|
|
|
|
+ String message = new String(delivery.getBody(), "UTF-8");
|
|
|
|
|
+ System.out.println(" [x] Received '" + message + "'");
|
|
|
|
|
+ };
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+## RabbitMQ集成到项目中
|
|
|
|
|
+
|
|
|
|
|
+通过springboot starter集成到项目中
|
|
|
|
|
+
|
|
|
|
|
+导入依赖
|
|
|
|
|
+
|
|
|
|
|
+```xml
|
|
|
|
|
+<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
|
|
|
|
|
+<dependency>
|
|
|
|
|
+ <groupId>org.springframework.boot</groupId>
|
|
|
|
|
+ <artifactId>spring-boot-starter-amqp</artifactId>
|
|
|
|
|
+ <version>2.7.16</version>
|
|
|
|
|
+</dependency>
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+配置RabbitMQ
|
|
|
|
|
+
|
|
|
|
|
+```yaml
|
|
|
|
|
+spring:
|
|
|
|
|
+ rabbitmq:
|
|
|
|
|
+ host: localhost
|
|
|
|
|
+ port: 5672
|
|
|
|
|
+ username: guest
|
|
|
|
|
+ password: guest
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+自定义信息生产者
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.BI.mq;
|
|
|
|
|
+
|
|
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+
|
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: serve
|
|
|
|
|
+ * @FileName: MessageProducer
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/9 20:17
|
|
|
|
|
+ * @Description: 信息生产者
|
|
|
|
|
+ */
|
|
|
|
|
+@Component
|
|
|
|
|
+public class MessageProducer {
|
|
|
|
|
+ //通过RabbitTemplate操作RabbitMQ
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private RabbitTemplate rabbitTemplate;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 发送信息
|
|
|
|
|
+ * @param exchange 交换机
|
|
|
|
|
+ * @param routingKey 路由键
|
|
|
|
|
+ * @param message 信息
|
|
|
|
|
+ * @description:
|
|
|
|
|
+ * @author: 杨逸
|
|
|
|
|
+ * @data:2025/10/09 20:19:06
|
|
|
|
|
+ * @since 1.0.0
|
|
|
|
|
+ */
|
|
|
|
|
+ public void sendMessage(String exchange,String routingKey,String message) {
|
|
|
|
|
+ //发送信息
|
|
|
|
|
+ rabbitTemplate.convertAndSend(exchange,routingKey,message);
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+自定义信息消费者
|
|
|
|
|
+
|
|
|
|
|
+```java
|
|
|
|
|
+package space.anyi.BI.mq;
|
|
|
|
|
+
|
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
|
|
+import org.springframework.amqp.support.AmqpHeaders;
|
|
|
|
|
+import org.springframework.messaging.handler.annotation.Header;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @ProjectName: serve
|
|
|
|
|
+ * @FileName: MessageConsumer
|
|
|
|
|
+ * @Author: 杨逸
|
|
|
|
|
+ * @Data:2025/10/9 20:22
|
|
|
|
|
+ * @Description: 信息消费者
|
|
|
|
|
+ */
|
|
|
|
|
+@Component
|
|
|
|
|
+public class MessageConsumer {
|
|
|
|
|
+ /**
|
|
|
|
|
+ * @param message 信息
|
|
|
|
|
+ * @param channel 通道
|
|
|
|
|
+ * @param deliverTag 消息标签
|
|
|
|
|
+ * @description:
|
|
|
|
|
+ * @author: 杨逸
|
|
|
|
|
+ * @data:2025/10/09 20:29:40
|
|
|
|
|
+ * @since 1.0.0
|
|
|
|
|
+ */
|
|
|
|
|
+ //接收信息
|
|
|
|
|
+ @RabbitListener(queues = {"queue_name"},ackMode = "MANUAL")
|
|
|
|
|
+ public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag) {
|
|
|
|
|
+ //消费信息的代码
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+```
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+## RabbitMQ的重要特性
|
|
|
|
|
+
|
|
|
|
|
+> 面试考点
|
|
|
|
|
+
|
|
|
|
|
+1. 消息队列的概念,模型,应用场景
|
|
|
|
|
+2. 交换机的类别,路由的绑定关系
|
|
|
|
|
+3. 信息可靠性
|
|
|
|
|
+ 1. 信息确认机制(ack,nack,reject)
|
|
|
|
|
+ 2. 信息持久化(durable)
|
|
|
|
|
+ 3. 信息过期机制
|
|
|
|
|
+ 4. 死信队列
|
|
|
|
|
+4. 延迟队列(类似死信队列)
|
|
|
|
|
+5. 顺序消费,消费幂等性
|
|
|
|
|
+6. 可扩展性
|
|
|
|
|
+ 1. 集群
|
|
|
|
|
+ 2. 故障恢复机制
|
|
|
|
|
+ 3. 镜像
|
|
|
|
|
+7. 运维监控告警
|
|
|
|
|
+
|