Quellcode durchsuchen

feat:发布订阅信息信息队列,一个生产者对应一个交换机,一个交换机对应多个队列

yang yi vor 1 Monat
Ursprung
Commit
850952653c

+ 57 - 0
RabbitMQ-learn/src/main/java/space/anyi/rabbitMQ_learn/publish_subscribe/EmitLog.java

@@ -0,0 +1,57 @@
+package space.anyi.rabbitMQ_learn.publish_subscribe;
+
+import java.util.Scanner;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+/**
+ * @ProjectName: RabbitMQ-learn
+ * @FileName: EmitLog
+ * @Author: 杨逸
+ * @Data:2025/10/7 21:47
+ * @Description: 信息生产者
+ */
+public class EmitLog {
+    //交换机的名称
+    private static final String EXCHANGE_NAME = "logs";
+    public static final Scanner input = new Scanner(System.in);
+
+    public static void main(String[] argv) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        try (Connection connection = factory.newConnection();
+             Channel channel = connection.createChannel()) {
+            //1.声明一个交换机
+            /**
+             * 第一个参数(exchange)是交换机的名称
+             * 第二个参数(type)是交换机的类型,支持的类型有:direct, topic, headers and fanout
+             * 这里使用fanout类型,即广播类型,会将消息发送给所有绑定到该交换机的队列
+             */
+            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
+
+            String queue1 = "queue1";
+            String queue2 = "queue2";
+            //2.声明队列
+            channel.queueDeclare(queue1, false, false, false, null);
+            channel.queueDeclare(queue2, false, false, false, null);
+
+            //3.绑定队列到交换机上(注意需要先声明队列才能绑定队列)
+            /**
+             * 一个fanout交换机绑定多个信息队列(表示不同外部系统使用的消息队列)
+             * 第一个参数(queue)是队列的名称
+             * 第二个参数(exchange)是交换机的名称
+             * 第三个参数(routingKey)是路由键,这里使用空字符串表示不需要路由键
+             */
+            channel.queueBind(queue1, EXCHANGE_NAME, "");
+            channel.queueBind(queue2, EXCHANGE_NAME, "");
+
+            while (input.hasNext()) {
+                String message = input.nextLine();
+                //4.发布信息,向交换机发布信息
+                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
+                System.out.println(" [x] Sent '" + message + "'");
+            }
+        }
+    }
+}

+ 49 - 0
RabbitMQ-learn/src/main/java/space/anyi/rabbitMQ_learn/publish_subscribe/ReceiveLogs.java

@@ -0,0 +1,49 @@
+package space.anyi.rabbitMQ_learn.publish_subscribe;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+
+/**
+ * @ProjectName: RabbitMQ-learn
+ * @FileName: ReceiveLogs
+ * @Author: 杨逸
+ * @Data:2025/10/7 22:00
+ * @Description: 信息消费者
+ */
+
+public class ReceiveLogs {
+    private static final String EXCHANGE_NAME = "logs";
+
+    public static void main(String[] argv) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        Connection connection = factory.newConnection();
+        //1.获取两个channel模拟绑定到fanout交换机的多个队列同时进行消费
+        Channel channel1 = connection.createChannel();
+        Channel channel2 = connection.createChannel();
+        //消费者不需要绑定到交换机
+        //channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
+        //channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
+        //消费者也不需要绑定到信息队列
+        String queueName1 = "queue1";
+        String queueName2 = "queue2";
+        //channel1.queueBind(queueName1, EXCHANGE_NAME, "");
+        //channel2.queueBind(queueName2, EXCHANGE_NAME, "");
+
+        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
+        //两个消费信息的回调
+        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(" [消费者1] Received '" + message + "'");
+        };
+        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(" [消费者2] Received '" + message + "'");
+        };
+        //2.消费对应队列的信息
+        channel1.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });
+        channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
+    }
+}

Datei-Diff unterdrückt, da er zu groß ist
+ 0 - 0
RabbitMQ-learn/src/main/resources/publish_subscribe.svg


Einige Dateien werden nicht angezeigt, da zu viele Dateien in diesem Diff geändert wurden.