Parcourir la source

feat:路由信息队列,一个生产者对应一个交换机,一个交换机对应多个队列,direct交换机根据路由键进行信息转发

yang yi il y a 1 mois
Parent
commit
3051d90815

+ 60 - 0
RabbitMQ-learn/src/main/java/space/anyi/rabbitMQ_learn/routing/EmitLogDirect.java

@@ -0,0 +1,60 @@
+package space.anyi.rabbitMQ_learn.routing;
+
+import java.util.Scanner;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+/**
+ * @ProjectName: RabbitMQ-learn
+ * @FileName: EmitLogDirect
+ * @Author: 杨逸
+ * @Data:2025/10/9 8:33
+ * @Description:
+ */
+
+public class EmitLogDirect {
+
+    private static final String EXCHANGE_NAME = "direct_logs";
+    public static final String[]  queueNames = new String[]{"error","waring","info"};
+    public static final Scanner input = new Scanner(System.in);
+
+    public static void main(String[] argv) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        try (Connection connection = factory.newConnection();
+             Channel channel = connection.createChannel()) {
+            //声明一个交换机,使用direct类型的交换机
+            /**
+             * direct类型的交换机转发信息需要根据路由键
+             */
+            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
+
+            //声明队列,三个表示不同类型日志的信息队列
+            for (int i = 0; i < queueNames.length; i++) {
+                String queueName = queueNames[i];
+                String routingKey = queueName;
+                //信息队列声明
+                channel.queueDeclare(queueName, false, false, false, null);
+                //交换机绑定队列
+                /**
+                 * 交换机绑定信息队列,指定路由键(routingKey)
+                 */
+                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
+            }
+
+            while (input.hasNext()) {
+                System.out.println("请输入路由键:");
+                String routingKey = input.nextLine();
+                System.out.println("请输入信息:");
+                String message = input.nextLine();
+                //发布信息时,指定交换机exchange和路由键routingKey
+                /**
+                 * 根据路由键转发到对应的信息队列
+                 */
+                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
+                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
+            }
+
+        }
+    }
+}

+ 52 - 0
RabbitMQ-learn/src/main/java/space/anyi/rabbitMQ_learn/routing/ReceiveLogsDirect.java

@@ -0,0 +1,52 @@
+package space.anyi.rabbitMQ_learn.routing;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+
+/**
+ * @ProjectName: RabbitMQ-learn
+ * @FileName: ReceiveLogsDirect
+ * @Author: 杨逸
+ * @Data:2025/10/9 8:48
+ * @Description:
+ */
+
+public class ReceiveLogsDirect {
+
+    private static final String EXCHANGE_NAME = "direct_logs";
+    public static final String[]  queueNames = new String[]{"error","waring","info"};
+
+    public static void main(String[] argv) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        Connection connection = factory.newConnection();
+        Channel channel = connection.createChannel();
+
+        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
+
+        for (int i = 0; i < queueNames.length; i++) {
+            String queueName = queueNames[i];
+            String routingKey = queueName;
+            //信息队列声明
+            channel.queueDeclare(queueName, false, false, false, null);
+            //交换机绑定队列
+            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
+        }
+
+        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
+
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            //获取信息的routingKey
+            String routingKey = delivery.getEnvelope().getRoutingKey();
+            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
+        };
+        //模拟多个客户端消费信息
+        for (int i = 0; i < queueNames.length; i++) {
+            String queueName = queueNames[i];
+            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
+        }
+    }
+}

Fichier diff supprimé car celui-ci est trop grand
+ 0 - 0
RabbitMQ-learn/src/main/resources/routing.svg


Certains fichiers n'ont pas été affichés car il y a eu trop de fichiers modifiés dans ce diff