# 分布式消息队列(RabbitMQ)学习记录 本文使用的操作系统为windows,使用JDK17 [RabbitMQ官网](https://www.rabbitmq.com/) [分布式消息队列技术选型参考](https://developer.aliyun.com/article/953777) 分布式消息队列的使用场景: - 异步处理 - 应用解耦 - 流量削峰 - 流量削峰 ## 基本概念 - [AMQP协议](https://www.rabbitmq.com/tutorials/amqp-concepts)(Advanced Message Queue Protocol):高级消息队列协议 - 生产者:生产消息 - 消费者:消费消息 - 交换机:转发消息到对应的队列 - 路由:转发消息的路径 - 队列:储存消息 ![](https://www.rabbitmq.com/assets/images/hello-world-example-routing-cbe9a872b37956a4072a5e13f9d76e7b.png) ## 下载安装RabbitMQ [Installing RabbitMQ | RabbitMQ](https://www.rabbitmq.com/docs/download) [RabbitMQ4.1.4安装包链接](https://github.com/rabbitmq/rabbitmq-server/releases/download/v4.1.4/rabbitmq-server-4.1.4.exe) [Erlang26.2安装包链接](https://github.com/erlang/otp/releases/download/OTP-26.2/otp_win64_26.2.exe) 下载RabbitMQ4.1.4 使用Erlang26.2 RabbitMQ是使用Erlang开发的消息队列,安装RabbitMQ需要先安装Erlang,Erlang是一门高性能的编程语言 RabbitMQ版本与Erlang[版本的对应关系](https://www.rabbitmq.com/docs/which-erlang) 先安装Erlang,然后安装RabbitMQ ![](http://tuchuang.anyi.space/imgs/Snipaste_2025-10-07_12-57-21.png) ![](http://tuchuang.anyi.space/imgs/Snipaste_2025-10-07_13-00-59.png) 安装成功后可以在windows的服务管理面板看到RabbitMQ服务 ![](http://tuchuang.anyi.space/imgs/Snipaste_2025-10-07_13-10-16.png) ## 安装RabbitMQ的监控面板 进入RabbitMQ安装目录下的"sbin"目录,使用CMD运行下面的脚本启用Rabiit的管理面板 ```shell rabbitmq-plugins.bat enable rabbitmq_management ``` 输出以下信息表示安装成功 ![](http://tuchuang.anyi.space/imgs/Snipaste_2025-10-07_13-02-55.png) 重启RabbitMQ访问"http://localhost:15672" ![](http://tuchuang.anyi.space/imgs/Snipaste_2025-10-07_13-06-48.png) 默认的账号密码都是guest,登陆后的界面 这个guest用户只能本地访问,远程访问需要新建一个用户 ![](http://tuchuang.anyi.space/imgs/Snipaste_2025-10-07_13-08-25.png) ## 快速入门(Hello World) [官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-one-java) 一对一的场景,一个消息生产者往一个队列发送消息,一个消息消费者从一个队列中消费消息 ![](http://tuchuang.anyi.space/imgs/HelloWord.svg) 在项目中加入操作RabbitMQ的客户端依赖 ```xml com.rabbitmq amqp-client 5.26.0 ``` ### 生产者代码 1. 创建连接工厂 ```java //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); ``` 2. 配置 ```java //2.配置 //设置RabbitMQ服务端的主机地址 factory.setHost("localhost"); //实际生产中要配置用户名和密码等等 // factory.setUsername(""); // factory.setPassword(""); ``` 3. 创建连接,并拿到channel ```java //3.创建连接,并拿到channel //我们通过channel来操作RabbitMQ Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) ``` 4. 给channel配置队列的消息,当对应名称的信息队列不存在时会自动创建 ```java //4.给channel配置队列的消息 /** * 第一个参数(queue)是队列的名称 * 第二参数是(durable)这是是否持久化队列 * 第三个参数(exclusive)表示是否为独占队列,独占队列只允许创建该队列的连接进行操作,其他连接无法操作该队列(服务重启后可能谁也无法操作这个队列) * 第四个参数(autoDelete)表示是否为自动删除队列,该队列没有连接使用时,自动删除队列 * 第五个参数(arguments)表示其它额外的参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); ``` 方法原型 ```java Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException; ``` 5. 生产发布消息 ```java //5.生产消息 /** * 第一个参数(exchange)指定一个交换机 * 第二个参数(routingKey)表示队列的名称 * 第三个参数(props)表示携带的额外属性 * 第四个参数(body)是要生产的消息本身 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); ``` 方法原型 ```java void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; ``` ```java package space.anyi.rabbitMQ_learn.helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; //生产者 public class Send { //队列的名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.配置 //设置RabbitMQ服务端的主机地址 factory.setHost("localhost"); //实际生产中要配置用户名和密码等等 // factory.setUsername(""); // factory.setPassword(""); //3.创建连接,并拿到channel //我们通过channel来操作RabbitMQ try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //4.给channel配置队列的消息 /** * 第一个参数(queue)是队列的名称 * 第二参数是(durable)这是是否持久化队列 * 第三个参数(exclusive)表示是否为独占队列,独占队列只允许创建该队列的连接进行操作,其他连接无法操作该队列(服务重启后可能谁也无法操作这个队列) * 第四个参数(autoDelete)表示是否为自动删除队列,该队列没有连接使用时,自动删除队列 * 第五个参数(arguments)表示其它额外的参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //5.生产消息 /** * 第一个参数(exchange)指定一个交换机 * 第二个参数(routingKey)表示队列的名称 * 第三个参数(props)表示携带的额外属性 * 第四个参数(body)是要生产的消息本身 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } } } ``` ### 消费者代码 1. 创建连接工厂 ```java //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); ``` 2. 配置 ```java //2.配置 factory.setHost("localhost"); ``` 3. 获取连接和对应的channel ```java //3.获取连接和对应的channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); ``` 4. 配置队列(这一步不是必须的,当对应名称的信息队列已经存在时,可以省略,一般为了代码的健壮性都会加上) ```java //4.配置队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); ``` 5. 消费消息 ```JAVA /** * 第一个参数(queue)是队列的名称 * 第二个参数(autoAck)表示获取消息后是否自动响应消息被消费成功 * 第三个参数(deliverCallback)是消息的处理回调方法 * 第四个参数(cancelCallback)是消息被取消的回调方法 */ //5.消费消息,会持续阻塞,等待消息队列有可以消费的消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); ``` 方法原型 ```java String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; ``` 消费信息的回调 ```java DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; ``` ```java package space.anyi.rabbitMQ_learn.helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; //消费者 public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.配置 factory.setHost("localhost"); //3.获取连接和对应的channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //4.配置队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; /** * 第一个参数(queue)是队列的名称 * 第二个参数(autoAck)表示获取消息后是否自动响应消息被消费成功 * 第三个参数(deliverCallback)是消息的处理回调方法 * 第四个参数(cancelCallback)是消息被取消的回调方法 */ //5.消费消息,会持续阻塞,等待消息队列有可以消费的消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } ``` ## 工作队列(Work Queue) 一个消息生产者对应一个队列,一个队列对应多个消费者 场景:单个消费者处理消息能力不够强,需要加机器进行并行处理 ![](http://tuchuang.anyi.space/imgs/workQueu.svg) [官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-two-java) ### 生产者代码 - 队列持久化 ```java //队列持久化,将durable设置为true channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); ``` 方法原型 ```java Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException; ``` - 消息持久化 ```java //使用MessageProperties.PERSISTENT_TEXT_PLAIN表示发布的这个消息是纯文本的,并且表示这个消息是以持久化的模式发送的 /** * MessageProperties.PERSISTENT_TEXT_PLAIN中有两个属性: * contentType:text/plain 表示纯文本 * deliveryMode:2 表示消息持久化 */ channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); ``` 方法原型 ```java void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; ``` ```java package space.anyi.rabbitMQ_learn.workQueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.util.Scanner; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; private static final Scanner input = new Scanner(System.in); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //队列持久化,将durable设置为true channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); while (input.hasNext()) { //接收控制台的消息,用于发送多条消息 String message = input.nextLine(); //使用MessageProperties.PERSISTENT_TEXT_PLAIN表示发布的这个消息是纯文本的,并且表示这个消息是以持久化的模式发送的 /** * contentType:text/plain 表示纯文本 * deliveryMode:2 表示消息持久化 */ channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } } ``` ### 消费者代码 - 取消消息自动确认 ```java //设置消息不再自动确认,需要手动确认 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); ``` - 设置一次取多少条消息 ```java //Qos一次处理多少条消息,即从消息队列一次取多少条消息 channel.basicQos(1); ``` 方法原型 ```java void basicQos(int prefetchCount) throws IOException; ``` - 消息确认 ```java /** * 消息确认 * 第一个参数(deliveryTag)是消息的Tag * 第二个参数(multiple)是是否批量确认消息,即确认这一次从消息队列取回的所以消息(有些消息可能还没处理完),一般不使用批量确认,处理完一条就确认一条消息 */ channel.basicAck(deliveryTag, false); ``` 方法原型 ```java void basicAck(long deliveryTag, boolean multiple) throws IOException; ``` - 消息拒绝 - basicNackh和basicReject都可以拒绝消息,basicNack可以批量拒绝,basicReject只能拒绝一条 ```java /** * 消息拒绝 * 第一个参数(deliveryTag)是消息的Tag * 第二个参数(multiple)是是否批量确认消息 * 第三个参数(requeue)表示是否将消息重新放回消息队列 */ channel.basicNack(deliveryTag,false,true); ``` ```java /** * 消息拒绝 * 第一个参数(deliveryTag)是消息的Tag * 第二个参数(requeue)表示是否将消息重新放回消息队列 */ channel.basicReject(deliveryTag,true); ``` 方法原型 ```java void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException; void basicReject(long deliveryTag, boolean requeue) throws IOException; ``` #### RabbitMQ的消息确认机制 [官方文档](https://www.rabbitmq.com/docs/confirms) ```mermaid stateDiagram-v2 [*] --> Ready : 消息入队 Ready --> Unacked : 消费者取走 Unacked --> Ready : 消费者断开连接 Unacked --> Acked : 消费者确认 Acked --> [*] : 从队列中删除 Unacked --> Ready : 消费者拒绝(并重新入队) Unacked --> [*] : 消费者拒绝(并丢弃) note right of Ready 消息在队列中, 等待被消费 end note note right of Unacked 消息已被消费者持有, 但未得到最终确认。 此状态消息不会被删除。 end note ``` - ready:消息未被消费者取走 - Unacked:消息被消费者取走,但还未确认 - acked:消息被消费者取走,且已经确认 - nack:信息消费失败 - reject:消息被消费者拒绝 1. 还未被消费者取走的消息才能被消费者取走 2. 消费者取走消息但未确认,消息不会从消息队列中删除 3. 只有消费者确认的消息才会从消息队列中删除 4. 被拒绝的消息可以重新放回消息队列,也可能被丢弃 ```java package space.anyi.rabbitMQ_learn.workQueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //设置消息不再自动确认,需要手动确认 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //Qos一次处理多少条消息,即从消息队列一次取多少条消息 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { //业务逻辑 doWork(message); } finally { System.out.println(" [x] Done"); //通过delivery.getEnvelope().getDeliveryTag()获取消息的Tag,可以认为是消息的ID long deliveryTag = delivery.getEnvelope().getDeliveryTag(); // 消息处理完成,确认消息成功接收 /** * 消息确认 * 第一个参数(deliveryTag)是消息的Tag * 第二个参数(multiple)是是否批量确认消息,即确认这一次从消息队列取回的所以消息(有些消息可能还没处理完),一般不使用批量确认,处理完一条就确认一条消息 */ channel.basicAck(deliveryTag, false); /** * 消息拒绝 * 第一个参数(deliveryTag)是消息的Tag * 第二个参数(multiple)是是否批量确认消息 * 第三个参数(requeue)表示是否将消息重新放回消息队列 */ //channel.basicNack(deliveryTag,false,true); /** * 消息拒绝 * 第一个参数(deliveryTag)是消息的Tag * 第二个参数(requeue)表示是否将消息重新放回消息队列 */ //channel.basicReject(deliveryTag,true); } }; //连接到服务端,开始消费消息,取消自动确认消息,改为手动确认 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } /** * 消息的处理 * @param task 消息任务 */ private static void doWork(String task) { try { //模拟消费消息消耗的时间 Thread.sleep(1000*5); System.out.println("处理消息:%s".formatted(task)); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } ``` ## 发布订阅(Publish/Subcribe)信息队列 [官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-three-java#putting-it-all-together) ![](http://tuchuang.anyi.space/imgs/publish_subscribe.svg) 一个生产者对应一个交换机,一个交换机对应多个队列 场景:一个系统产生的信息被多个外部系统使用,交换机将生产的信息转发到外部系统使用的信息队列中 引入交换机的概念,交换机负责信息的转发,将信息转发到对应满足条件的信息队列 交换机有四种类型: - direct - topic - headers - fanout 这里的案例以fanout类型为例,fanout直译为扇出,可以理解为广播 ### 生产者代码 1. 先声明交换机,指定名称和类型 ```java /** * 第一个参数(exchange)是交换机的名称 * 第二个参数(type)是交换机的类型,支持的类型有:direct, topic, headers and fanout * 这里使用fanout类型,即广播类型,会将消息发送给所有绑定到该交换机的队列 */ channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); ``` 方法原型 ```java Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; ``` 2. 然后声明消息队列 3. 再将消息队列绑定到交换机上 ```java /** * 一个fanout交换机绑定多个信息队列(表示不同外部系统使用的消息队列) * 第一个参数(queue)是队列的名称 * 第二个参数(exchange)是交换机的名称 * 第三个参数(routingKey)是路由键,这里使用空字符串表示不需要路由键 */ channel.queueBind(queue1, EXCHANGE_NAME, ""); channel.queueBind(queue2, EXCHANGE_NAME, ""); ``` 方法原型 ```java Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException; ``` 4. 最后发布信息 ```java //4.发布信息,向交换机发布信息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); ``` ```java package space.anyi.rabbitMQ_learn.publish_subscribe; import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ProjectName: RabbitMQ-learn * @FileName: EmitLog * @Author: 杨逸 * @Data:2025/10/7 21:47 * @Description: 信息生产者 */ public class EmitLog { //交换机的名称 private static final String EXCHANGE_NAME = "logs"; public static final Scanner input = new Scanner(System.in); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //1.声明一个交换机 /** * 第一个参数(exchange)是交换机的名称 * 第二个参数(type)是交换机的类型,支持的类型有:direct, topic, headers and fanout * 这里使用fanout类型,即广播类型,会将消息发送给所有绑定到该交换机的队列 */ channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queue1 = "queue1"; String queue2 = "queue2"; //2.声明队列 channel.queueDeclare(queue1, false, false, false, null); channel.queueDeclare(queue2, false, false, false, null); //3.绑定队列到交换机上(注意需要先声明队列才能绑定队列) /** * 一个fanout交换机绑定多个信息队列(表示不同外部系统使用的消息队列) * 第一个参数(queue)是队列的名称 * 第二个参数(exchange)是交换机的名称 * 第三个参数(routingKey)是路由键,这里使用空字符串表示不需要路由键 */ channel.queueBind(queue1, EXCHANGE_NAME, ""); channel.queueBind(queue2, EXCHANGE_NAME, ""); while (input.hasNext()) { String message = input.nextLine(); //4.发布信息,向交换机发布信息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } } ``` ### 消费者代码 使用两个不同的channel表示不同的外部系统消费不同的信息队列 1. 拿到channel 2. 消费对应的信息队列 ```java package space.anyi.rabbitMQ_learn.publish_subscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; /** * @ProjectName: RabbitMQ-learn * @FileName: ReceiveLogs * @Author: 杨逸 * @Data:2025/10/7 22:00 * @Description: 信息消费者 */ public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); //1.获取两个channel模拟绑定到fanout交换机的多个队列同时进行消费 Channel channel1 = connection.createChannel(); Channel channel2 = connection.createChannel(); //消费者不需要绑定到交换机 //channel1.exchangeDeclare(EXCHANGE_NAME, "fanout"); //channel2.exchangeDeclare(EXCHANGE_NAME, "fanout"); //消费者也不需要绑定到信息队列 String queueName1 = "queue1"; String queueName2 = "queue2"; //channel1.queueBind(queueName1, EXCHANGE_NAME, ""); //channel2.queueBind(queueName2, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //两个消费信息的回调 DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [消费者1] Received '" + message + "'"); }; DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [消费者2] Received '" + message + "'"); }; //2.消费对应队列的信息 channel1.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { }); channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { }); } } ``` ## 路由(Routing)信息队列 [官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-four-java) 一个信息生产者对应一个交换机,一个交换机对应多个信息队列,交换机转发信息根据routingKey进行转发到对应的信息队列,direct交换机的路由键routingKey的匹配规则是字符串的完全匹配 场景:一个系统生产的信息有等级层次划分,有些信息只希望授权过的外部系统才能访问,根据路由键这些信息只发往特定的信息队列 ![](http://tuchuang.anyi.space/imgs/routing.svg) ### 生产者代码 - 交换机使用direct类型,direct交换机的工作方式是根据路由键(routingKey)进行信息转发,路由键的匹配方式是字符串完全匹配 - 绑定队列指定交换机和路由键 - 发布信息指定交换机的路由键 ```java package space.anyi.rabbitMQ_learn.routing; import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @ProjectName: RabbitMQ-learn * @FileName: EmitLogDirect * @Author: 杨逸 * @Data:2025/10/9 8:33 * @Description: */ public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static final String[] queueNames = new String[]{"error","waring","info"}; public static final Scanner input = new Scanner(System.in); public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //声明一个交换机,使用direct类型的交换机 /** * direct类型的交换机转发信息需要根据路由键 */ channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //声明队列,三个表示不同类型日志的信息队列 for (int i = 0; i < queueNames.length; i++) { String queueName = queueNames[i]; String routingKey = queueName; //信息队列声明 channel.queueDeclare(queueName, false, false, false, null); //交换机绑定队列 /** * 交换机绑定信息队列,指定路由键(routingKey) */ channel.queueBind(queueName, EXCHANGE_NAME, routingKey); } while (input.hasNext()) { System.out.println("请输入路由键:"); String routingKey = input.nextLine(); System.out.println("请输入信息:"); String message = input.nextLine(); //发布信息时,指定交换机exchange和路由键routingKey /** * 根据路由键转发到对应的信息队列 */ channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } } } ``` ### 消费者代码 - 消费不同的信息队列 ```java package space.anyi.rabbitMQ_learn.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; /** * @ProjectName: RabbitMQ-learn * @FileName: ReceiveLogsDirect * @Author: 杨逸 * @Data:2025/10/9 8:48 * @Description: */ public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static final String[] queueNames = new String[]{"error","waring","info"}; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); for (int i = 0; i < queueNames.length; i++) { String queueName = queueNames[i]; String routingKey = queueName; //信息队列声明 channel.queueDeclare(queueName, false, false, false, null); //交换机绑定队列 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); //获取信息的routingKey String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); }; //模拟多个客户端消费信息 for (int i = 0; i < queueNames.length; i++) { String queueName = queueNames[i]; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } } ``` ## 主题(Topic)信息队列 [官方文档案例](https://www.rabbitmq.com/tutorials/tutorial-five-java#topics) topic交换机与direct交换机的主要区别是,topic交换机的路由键routingKey支持通配符的格式 场景:特定的一类信息给特定的一类程序处理 ![](http://tuchuang.anyi.space/imgs/topic.svg) topic类型交换机的路由键是支持通配符(类Ant格式)的,一个单词有多个字符,单词与单词之间以点(.)分割 - "*":匹配一个单词 - "#":匹配零个或多个单词 - "\*.orange.\*" :匹配"a.orange.b",也匹配"abc.orange.abc",但不匹配"aorangeb" - "a.#" :匹配"a.b.c",也匹配"a.b",也匹配"a",但不匹配"ab" ### 生产者代码 ```java package space.anyi.rabbitMQ_learn.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Scanner; /** * @ProjectName: RabbitMQ-learn * @FileName: EmitLogTopic * @Author: 杨逸 * @Data:2025/10/9 9:45 * @Description: */ public class EmitLogTopic { public static final Scanner input = new Scanner(System.in); private static final String EXCHANGE_NAME = "topic_logs"; public static final String[] queueNames = new String[]{"q1","q2"}; public static final String[] routingKeys = new String[]{"*.orange.*","a.#"}; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //创建一个topic类型的交换机 /** * topic类型交换机的路由键是支持通配符(类Ant格式)的,一个单词有多个字符,单词与单词之间以点(.)分割 * * :匹配一个单词 * # :匹配零个或多个单词 * * e.g: * *.orange.* :匹配a.orange.b,也匹配abc.orange.abc,但不匹配aorangeb * a.# :匹配a.b.c,也匹配a.b,也匹配a,但不匹配ab */ channel.exchangeDeclare(EXCHANGE_NAME, "topic"); for (int i = 0; i < queueNames.length; i++) { String queueName = queueNames[i]; String routingKey = routingKeys[i]; //创建信息队列 channel.queueDeclare(queueName, false, false, false, null); //将队列绑定到交换机 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); } System.out.println("请输入路由键:"); while (input.hasNext()) { String routingKey = input.nextLine(); System.out.println("请输入消息:"); String message = input.nextLine(); //生产信息 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); System.out.println("请输入路由键:"); } } } } ``` ### 消费者代码 消费者通过信息队列的名称进行信息消费即可 ```java package space.anyi.rabbitMQ_learn.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; /** * @ProjectName: RabbitMQ-learn * @FileName: ReceiveLogsTopic * @Author: 杨逸 * @Data:2025/10/9 10:05 * @Description: */ public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static final String[] queueNames = new String[]{"q1","q2"}; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //获取信息队列的名称 //String queueName = channel.queueDeclare().getQueue(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; //消费信息 for (String queueName : queueNames) { channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } } ``` ## 核心特性 ### [信息确认机制](#RabbitMQ的消息确认机制) ### 信息过期机制 [官方文档](https://www.rabbitmq.com/docs/ttl) 场景:有些信息的时效比较高,一段时间不处理后可能就不需要处理了, - 给信息队列指定过期时间(信息队列里的每一条信息都有固定的过期时间) - 在创建信息队列时指定过期时间 ```java //信息队列的额外配置参数 Map arguments = new HashMap(); //设置队列中的消息的过期时间为五秒钟 arguments.put("x-message-ttl",5*1000); //创建信息队列时,通过arguments设置队列的过期时间 channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); ``` >信息在过期时间过后仍没有被消费,信息就会被丢弃 > >信息被读取,但未确认,在信息过期时间到后不会被丢弃 ![](http://tuchuang.anyi.space/imgs/Snipaste_2025-10-09_16-52-45.png) - 在信息发布时给信息指定过期时间 ```java //创建一个信息的额外配置,设置消息的过期时间为十秒钟 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(10*1000+"").build(); //在发布信息时给特定信息指定过期时间 channel.basicPublish("", QUEUE_NAME, properties, message.getBytes()); ``` > 给信息队列设置的过期时间对信息队列中的每一条信息都生效 > > 给信息设置的过期时间只对这条信息生效 ```java package space.anyi.rabbitMQ_learn.ttl; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; /** * @ProjectName: RabbitMQ-learn * @FileName: Producer * @Author: 杨逸 * @Data:2025/10/9 16:40 * @Description: */ public class Producer { private final static String QUEUE_NAME = "ttl_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //信息队列的额外配置参数 Map arguments = new HashMap(); //设置队列中的消息的过期时间为五秒钟 arguments.put("x-message-ttl",5*1000); //创建信息队列时,通过arguments设置队列的过期时间 channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); String message = "Hello World!"; //创建一个信息的额外配置,设置消息的过期时间为十秒钟 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration(10*1000+"") .build(); //在发布信息时给特定信息指定过期时间 channel.basicPublish("", QUEUE_NAME, properties, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); /** * 总结: * 给信息队列设置的过期时间对信息队列中的每一条信息都生效 * 给信息设置的过期时间只对这条信息生效 */ } } } ``` ### 死信队列 [官方文档](https://www.rabbitmq.com/docs/dlx) 死信:因为各种原因导致无法被处理的信息被称为死信,比如信息过期,信息消费失败又没有重新放到信息队列,信息队列满了信息放不进去 死信队列:保存死信的队列,就是个普通的信息队列,只是保存的是死信 死信交换机:将死信转发到死信队列的交换机,也是一个普通的交换机 > 死信队列是为了给系统提供的一种容错机制,让无法被处理的信息,也有机会可以被处理,防止信息的丢失 - 死信的处理方法 1. 给信息队列绑定一个出现死信使用的交换机exchange 2. 给信息队列绑定一个出现死信使用的路由键routingKey ```java //创建工作交换机 channel.exchangeDeclare(WORK_EXCHANGE, "direct"); //1.创建死信交换机 channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct"); //2.创建死信队列 String dlx_name1 = "dead_letter_queue1"; channel.queueDeclare(dlx_name1, false, false, false, null); //3.死信队列绑定死信交换机 String routingKey1 = "key1"; channel.queueBind(dlx_name1, DEAD_LETTER_EXCHANGE, routingKey1); //死信交换机配置 Map arguments1 = new HashMap<>(); /** * 通过x-dead-letter-exchange参数指定死信交换机(exchange)的名称 * 通过x-dead-letter-routing-key参数指定死信转发的路由键(routingKey) */ arguments1.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); arguments1.put("x-dead-letter-routing-key", routingKey1); //4.创建工作信息队列,通过arguments参数配置死信交换机 channel.queueDeclare(WORK_QUEUE1, false, false, false, arguments1); //工作队列绑定工作交换机 channel.queueBind(WORK_QUEUE1, WORK_EXCHANGE, routingKey1); ``` - 生产者代码 ```java package space.anyi.rabbitMQ_learn.dead_letter; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import java.util.Scanner; /** * @ProjectName: RabbitMQ-learn * @FileName: Producer * @Author: 杨逸 * @Data:2025/10/9 18:39 * @Description: 死信队列机制 * 工作交换机将通过路由键转发到工作信息队列 * 当工作队列产生死信时,死信交换机通过路由键转发到死信队列 */ public class Producer { public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange"; public static final String WORK_EXCHANGE = "work_exchange"; public static final String WORK_QUEUE1 = "work_queue1"; public static final String WORK_QUEUE2 = "work_queue2"; public static final Scanner input = new Scanner(System.in); public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();) { //创建工作交换机 channel.exchangeDeclare(WORK_EXCHANGE, "direct"); //创建死信交换机 channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct"); //创建死信队列 String dlx_name1 = "dead_letter_queue1"; String dlx_name2 = "dead_letter_queue2"; channel.queueDeclare(dlx_name1, false, false, false, null); channel.queueDeclare(dlx_name2, false, false, false, null); //死信队列绑定死信交换机 String routingKey1 = "key1"; String routingKey2 = "key2"; channel.queueBind(dlx_name1, DEAD_LETTER_EXCHANGE, routingKey1); channel.queueBind(dlx_name2, DEAD_LETTER_EXCHANGE, routingKey2); //死信交换机配置 Map arguments1 = new HashMap<>(); Map arguments2 = new HashMap<>(); /** * 通过x-dead-letter-exchange参数指定死信交换机(exchange)的名称 * 通过x-dead-letter-routing-key参数指定死信转发的路由键(routingKey) */ arguments1.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); arguments1.put("x-dead-letter-routing-key", routingKey1); arguments2.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); arguments2.put("x-dead-letter-routing-key", routingKey2); //创建工作信息队列,通过arguments参数配置死信交换机 channel.queueDeclare(WORK_QUEUE1, false, false, false, arguments1); channel.queueDeclare(WORK_QUEUE2, false, false, false, arguments2); //工作队列绑定工作交换机 channel.queueBind(WORK_QUEUE1, WORK_EXCHANGE, routingKey1); channel.queueBind(WORK_QUEUE2, WORK_EXCHANGE, routingKey2); System.out.println("请输入路由键:"); while (input.hasNext()) { String routingKey = input.nextLine(); System.out.println("请输入信息:"); String message = input.nextLine(); //发送信息 channel.basicPublish(WORK_EXCHANGE,routingKey,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); System.out.println("请输入路由键:"); } } } } ``` - 消费者代码 ```java package space.anyi.rabbitMQ_learn.dead_letter; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; /** * @ProjectName: RabbitMQ-learn * @FileName: Consumer * @Author: 杨逸 * @Data:2025/10/9 18:59 * @Description: * 消费者拒绝工作队列的信息模拟死信的产生 * 模拟其他程序消费死信队列的信息 */ public class Consumer { public static final String WORK_QUEUE1 = "work_queue1"; public static final String WORK_QUEUE2 = "work_queue2"; public static final String DLX_QUEUE_NAME1 = "dead_letter_queue1"; public static final String DLX_QUEUE_NAME2 = "dead_letter_queue2"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //工作信息的处理 DeliverCallback deliverCallback = (consumerTag,delivery)->{ long deliveryTag = delivery.getEnvelope().getDeliveryTag(); String routingKey = delivery.getEnvelope().getRoutingKey(); String message = new String(delivery.getBody(), StandardCharsets.UTF_8); //拒绝信息,模拟死信的产生 channel.basicNack(deliveryTag,false,false); System.out.println(" [worker] Received '" + routingKey + "':'" + message + "已拒绝'"); }; //死信队列的处理 DeliverCallback deliverCallback1 = (consumerTag,delivery)->{ String message = new String(delivery.getBody(), StandardCharsets.UTF_8); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [dlx1] Received '" + routingKey + "':'" + message + "'"); }; DeliverCallback deliverCallback2 = (consumerTag,delivery)->{ String message = new String(delivery.getBody(), StandardCharsets.UTF_8); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [dlx2] Received '" + routingKey + "':'" + message + "'"); }; //消费工作信息 channel.basicConsume(WORK_QUEUE1,false,deliverCallback, consumerTag->{}); channel.basicConsume(WORK_QUEUE2,false,deliverCallback, consumerTag->{}); //消费死信队列的信息 channel.basicConsume(DLX_QUEUE_NAME1,true,deliverCallback1,consumerTag->{}); channel.basicConsume(DLX_QUEUE_NAME2,true,deliverCallback2,consumerTag->{}); } } ``` ## RabbitMQ常用API - 常规操作 ```java //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.配置 //设置RabbitMQ服务端的主机地址 factory.setHost("localhost"); //实际生产中要配置用户名和密码等等 factory.setUsername(""); factory.setPassword(""); //3.创建连接,并拿到channel //我们通过channel来操作RabbitMQ Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) ``` - 创建交换机 ```java channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //方法原型 Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; ``` - 创建信息队列 ```java channel.queueDeclare(QUEUE_NAME, false, false, false, null); //方法原型 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException; ``` - 信息队列绑定交换机 ```java channel.queueBind(queue1, EXCHANGE_NAME, ""); //方法原型 Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException; ``` - 发布信息 ```java channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //方法原型 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; ``` - 消费信息 ```java channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); //方法原型 String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; ``` - 给信息队列设置额外的参数 ```java //信息队列的额外配置参数 Map arguments = new HashMap(); //设置队列中的消息的过期时间为五秒钟 arguments.put("x-message-ttl",5*1000); //创建信息队列时,通过arguments设置队列的过期时间 channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); ``` - 给信息设置额外的参数 ```java //创建一个信息的额外配置,设置消息的过期时间为十秒钟 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(10*1000+"").build(); //在发布信息时给特定信息指定过期时间 channel.basicPublish("", QUEUE_NAME, properties, message.getBytes()); ``` - 信息消费的处理回调 ```java DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; ``` ## RabbitMQ集成到项目中 通过springboot starter集成到项目中 导入依赖 ```xml org.springframework.boot spring-boot-starter-amqp 2.7.16 ``` 配置RabbitMQ ```yaml spring: rabbitmq: host: localhost port: 5672 username: guest password: guest ``` 自定义信息生产者 ```java package space.anyi.BI.mq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @ProjectName: serve * @FileName: MessageProducer * @Author: 杨逸 * @Data:2025/10/9 20:17 * @Description: 信息生产者 */ @Component public class MessageProducer { //通过RabbitTemplate操作RabbitMQ @Resource private RabbitTemplate rabbitTemplate; /** * 发送信息 * @param exchange 交换机 * @param routingKey 路由键 * @param message 信息 * @description: * @author: 杨逸 * @data:2025/10/09 20:19:06 * @since 1.0.0 */ public void sendMessage(String exchange,String routingKey,String message) { //发送信息 rabbitTemplate.convertAndSend(exchange,routingKey,message); } } ``` 自定义信息消费者 ```java package space.anyi.BI.mq; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /** * @ProjectName: serve * @FileName: MessageConsumer * @Author: 杨逸 * @Data:2025/10/9 20:22 * @Description: 信息消费者 */ @Component public class MessageConsumer { /** * @param message 信息 * @param channel 通道 * @param deliverTag 消息标签 * @description: * @author: 杨逸 * @data:2025/10/09 20:29:40 * @since 1.0.0 */ //接收信息 @RabbitListener(queues = {"queue_name"},ackMode = "MANUAL") public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag) { //消费信息的代码 } } ``` ## RabbitMQ的重要特性 > 面试考点 1. 消息队列的概念,模型,应用场景 2. 交换机的类别,路由的绑定关系 3. 信息可靠性 1. 信息确认机制(ack,nack,reject) 2. 信息持久化(durable) 3. 信息过期机制 4. 死信队列 4. 延迟队列(类似死信队列) 5. 顺序消费,消费幂等性 6. 可扩展性 1. 集群 2. 故障恢复机制 3. 镜像 7. 运维监控告警