Procházet zdrojové kódy

feat:工作队列demo代码,一个消息生产者对应一个队列,一个队列对应多个消费者

yangyi před 2 měsíci
rodič
revize
921718f157

+ 35 - 0
RabbitMQ-learn/src/main/java/space/anyi/rabbitMQ_learn/workQueue/NewTask.java

@@ -0,0 +1,35 @@
+package space.anyi.rabbitMQ_learn.workQueue;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+
+import java.util.Scanner;
+
+public class NewTask {
+
+    private static final String TASK_QUEUE_NAME = "task_queue";
+    private static final Scanner input = new Scanner(System.in);
+
+    public static void main(String[] argv) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        try (Connection connection = factory.newConnection();
+             Channel channel = connection.createChannel()) {
+            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
+
+            while (input.hasNext()) {
+                //接收控制台的消息,用于发送多条消息
+                String message = input.nextLine();
+
+                //使用MessageProperties.PERSISTENT_TEXT_PLAIN表示发布的这个消息是纯文本的
+                channel.basicPublish("", TASK_QUEUE_NAME,
+                        MessageProperties.PERSISTENT_TEXT_PLAIN,
+                        message.getBytes("UTF-8"));
+                System.out.println(" [x] Sent '" + message + "'");
+            }
+        }
+    }
+
+}

+ 73 - 0
RabbitMQ-learn/src/main/java/space/anyi/rabbitMQ_learn/workQueue/Worker.java

@@ -0,0 +1,73 @@
+package space.anyi.rabbitMQ_learn.workQueue;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+
+public class Worker {
+
+    private static final String TASK_QUEUE_NAME = "task_queue";
+
+    public static void main(String[] argv) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        final Connection connection = factory.newConnection();
+        final Channel channel = connection.createChannel();
+        //设置消息不再自动确认,需要手动确认
+        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
+        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
+        //Qos一次处理多少条消息,即从消息队列一次取多少条消息
+        channel.basicQos(1);
+
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+
+            System.out.println(" [x] Received '" + message + "'");
+            try {
+                //业务逻辑
+                doWork(message);
+            } finally {
+                System.out.println(" [x] Done");
+                //通过delivery.getEnvelope().getDeliveryTag()获取消息的Tag,可以认为是消息的ID
+                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+                // 消息处理完成,确认消息成功接收
+                /**
+                 * 消息确认
+                 * 第一个参数(deliveryTag)是消息的Tag
+                 * 第二个参数(multiple)是是否批量确认消息,即确认这一次从消息队列取回的所以消息(有些消息可能还没处理完),一般不使用批量确认,处理完一条就确认一条消息
+                 */
+                channel.basicAck(deliveryTag, false);
+                /**
+                 * 消息拒绝
+                 * 第一个参数(deliveryTag)是消息的Tag
+                 * 第二个参数(multiple)是是否批量确认消息
+                 * 第三个参数(requeue)表示是否将消息重新放回消息队列
+                 */
+                //channel.basicNack(deliveryTag,false,true);
+                /**
+                 * 消息拒绝
+                 * 第一个参数(deliveryTag)是消息的Tag
+                 * 第二个参数(requeue)表示是否将消息重新放回消息队列
+                 */
+                //channel.basicReject(deliveryTag,true);
+            }
+        };
+        //连接到服务端,开始消费消息,取消自动确认消息,改为手动确认
+        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
+    }
+
+    /**
+     * 消息的处理
+     * @param task 消息任务
+     */
+    private static void doWork(String task) {
+        try {
+            //模拟消费消息消耗的时间
+            Thread.sleep(1000*5);
+            System.out.println("处理消息:%s".formatted(task));
+        } catch (InterruptedException _ignored) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}

Rozdílová data souboru nebyla zobrazena, protože soubor je příliš velký
+ 0 - 0
RabbitMQ-learn/src/main/resources/workQueu.svg


Některé soubory nejsou zobrazeny, neboť je v těchto rozdílových datech změněno mnoho souborů