Просмотр исходного кода

feat:死信队列,死信交换机与死信交换机的绑定,死信队列的声明,死信队列的绑定,死信队列的消费者,处理死信队列的demo

yang yi 1 месяц назад
Родитель
Сommit
76efbbebac

+ 59 - 0
RabbitMQ-learn/src/main/java/space/anyi/rabbitMQ_learn/dead_letter/Consumer.java

@@ -0,0 +1,59 @@
+package space.anyi.rabbitMQ_learn.dead_letter;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * @ProjectName: RabbitMQ-learn
+ * @FileName: Consumer
+ * @Author: 杨逸
+ * @Data:2025/10/9 18:59
+ * @Description:
+ * 消费者拒绝工作队列的信息模拟死信的产生
+ * 模拟其他程序消费死信队列的信息
+ */
+public class Consumer {
+    public static final String WORK_QUEUE1 = "work_queue1";
+    public static final String WORK_QUEUE2 = "work_queue2";
+    public static final String DLX_QUEUE_NAME1 = "dead_letter_queue1";
+    public static final String DLX_QUEUE_NAME2 = "dead_letter_queue2";
+    public static void main(String[] args) throws IOException, TimeoutException {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        connectionFactory.setHost("localhost");
+        Connection connection = connectionFactory.newConnection();
+        Channel channel = connection.createChannel();
+
+        //工作信息的处理
+        DeliverCallback deliverCallback = (consumerTag,delivery)->{
+            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+            String routingKey = delivery.getEnvelope().getRoutingKey();
+            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
+            //拒绝信息,模拟死信的产生
+            channel.basicNack(deliveryTag,false,false);
+            System.out.println(" [worker] Received '" + routingKey + "':'" + message + "已拒绝'");
+        };
+        //死信队列的处理
+        DeliverCallback deliverCallback1 = (consumerTag,delivery)->{
+            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
+            String routingKey = delivery.getEnvelope().getRoutingKey();
+            System.out.println(" [dlx1] Received '" + routingKey + "':'" + message + "'");
+        };
+        DeliverCallback deliverCallback2 = (consumerTag,delivery)->{
+            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
+            String routingKey = delivery.getEnvelope().getRoutingKey();
+            System.out.println(" [dlx2] Received '" + routingKey + "':'" + message + "'");
+        };
+        //消费工作信息
+        channel.basicConsume(WORK_QUEUE1,false,deliverCallback, consumerTag->{});
+        channel.basicConsume(WORK_QUEUE2,false,deliverCallback, consumerTag->{});
+        //消费死信队列的信息
+        channel.basicConsume(DLX_QUEUE_NAME1,true,deliverCallback1,consumerTag->{});
+        channel.basicConsume(DLX_QUEUE_NAME2,true,deliverCallback2,consumerTag->{});
+    }
+}

+ 79 - 0
RabbitMQ-learn/src/main/java/space/anyi/rabbitMQ_learn/dead_letter/Producer.java

@@ -0,0 +1,79 @@
+package space.anyi.rabbitMQ_learn.dead_letter;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import java.util.Scanner;
+/**
+ * @ProjectName: RabbitMQ-learn
+ * @FileName: Producer
+ * @Author: 杨逸
+ * @Data:2025/10/9 18:39
+ * @Description: 死信队列机制
+ * 工作交换机将通过路由键转发到工作信息队列
+ * 当工作队列产生死信时,死信交换机通过路由键转发到死信队列
+ */
+public class Producer {
+    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
+    public static final String WORK_EXCHANGE = "work_exchange";
+    public static final String WORK_QUEUE1 = "work_queue1";
+    public static final String WORK_QUEUE2 = "work_queue2";
+    public static final Scanner input = new Scanner(System.in);
+    public static void main(String[] args) throws IOException, TimeoutException {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        connectionFactory.setHost("localhost");
+        try (Connection connection = connectionFactory.newConnection();
+             Channel channel = connection.createChannel();) {
+            //创建工作交换机
+            channel.exchangeDeclare(WORK_EXCHANGE, "direct");
+            //创建死信交换机
+            channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct");
+
+            //创建死信队列
+            String dlx_name1 = "dead_letter_queue1";
+            String dlx_name2 = "dead_letter_queue2";
+            channel.queueDeclare(dlx_name1, false, false, false, null);
+            channel.queueDeclare(dlx_name2, false, false, false, null);
+            //死信队列绑定死信交换机
+            String routingKey1 = "key1";
+            String routingKey2 = "key2";
+            channel.queueBind(dlx_name1, DEAD_LETTER_EXCHANGE, routingKey1);
+            channel.queueBind(dlx_name2, DEAD_LETTER_EXCHANGE, routingKey2);
+
+            //死信交换机配置
+            Map<String,Object> arguments1 = new HashMap<>();
+            Map<String,Object> arguments2 = new HashMap<>();
+            /**
+             * 通过x-dead-letter-exchange参数指定死信交换机(exchange)的名称
+             * 通过x-dead-letter-routing-key参数指定死信转发的路由键(routingKey)
+             */
+            arguments1.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
+            arguments1.put("x-dead-letter-routing-key", routingKey1);
+            arguments2.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
+            arguments2.put("x-dead-letter-routing-key", routingKey2);
+            //创建工作信息队列,通过arguments参数配置死信交换机
+            channel.queueDeclare(WORK_QUEUE1, false, false, false, arguments1);
+            channel.queueDeclare(WORK_QUEUE2, false, false, false, arguments2);
+            //工作队列绑定工作交换机
+            channel.queueBind(WORK_QUEUE1, WORK_EXCHANGE, routingKey1);
+            channel.queueBind(WORK_QUEUE2, WORK_EXCHANGE, routingKey2);
+
+            System.out.println("请输入路由键:");
+            while (input.hasNext()) {
+                String routingKey = input.nextLine();
+                System.out.println("请输入信息:");
+                String message = input.nextLine();
+                //发送信息
+                channel.basicPublish(WORK_EXCHANGE,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
+                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
+                System.out.println("请输入路由键:");
+            }
+        }
+    }
+}