Sfoglia il codice sorgente

feat:使用RabbitMQ进行异步化改造,解决使用线程池因程序重启导致任务丢失的问题

yang yi 1 mese fa
parent
commit
ff44e6c859

+ 6 - 0
serve/pom.xml

@@ -86,6 +86,12 @@
             <artifactId>redisson</artifactId>
             <version>3.23.3</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+            <version>2.7.16</version>
+        </dependency>
 
 
     </dependencies>

+ 97 - 0
serve/src/main/java/space/anyi/BI/mq/MessageConsumer.java

@@ -0,0 +1,97 @@
+package space.anyi.BI.mq;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.support.AmqpHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+import space.anyi.BI.entity.Chart;
+import space.anyi.BI.entity.deepseek.DeepSeekHttpRequestData;
+import space.anyi.BI.entity.deepseek.DeepSeekHttpResponseData;
+import space.anyi.BI.exception.SystemException;
+import space.anyi.BI.service.ChartService;
+import space.anyi.BI.util.DeepSeekAIUtil;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @ProjectName: serve
+ * @FileName: MessageConsumer
+ * @Author: 杨逸
+ * @Data:2025/10/9 20:22
+ * @Description: 信息消费者
+ */
+@Component
+public class MessageConsumer {
+    private final static Logger log = LoggerFactory.getLogger(MessageConsumer.class);
+    @Resource
+    private ChartService chartService;
+    @Resource
+    private ObjectMapper objectMapper;
+    /**
+     * @param message 信息
+     * @param channel 通道
+     * @param deliverTag 消息标签
+     * @description:
+     * @author: 杨逸
+     * @data:2025/10/09 20:29:40
+     * @since 1.0.0
+     */
+    //接收信息
+    @RabbitListener(queues = {RabbitMQMessageConstant.QUEUE_NAME},ackMode = "MANUAL")
+    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag) {
+        //消费信息的代码
+        //通过信息获取处理任务
+        Chart chart = chartService.getById(Long.parseLong(message));
+        chart.setState("生成中");
+        chartService.updateById(chart);
+
+        StringBuilder prompt = new StringBuilder("原始数据:\n");
+        prompt.append(chart.getChartData());
+        prompt.append("分析目标:\n");
+        prompt.append(chart.getAnalysisTarget());
+        prompt.append("\n.使用").append(chart.getChartType()).append("进行可视化分析.\n");
+        //配置prompt向AI发送请求
+        DeepSeekHttpRequestData requestData = DeepSeekAIUtil.createDefaultRequestData(prompt.toString());
+
+        try {
+            DeepSeekHttpResponseData responseData = DeepSeekAIUtil.doChat(requestData);
+            //解析AI返回的数据
+            String content = responseData.getChoices().get(0).getMessage().getContent();
+            log.info("AI返回的数据为:{}", content);
+            //AI生成的指定内容
+            Map contentMap = objectMapper.readValue(content, Map.class);
+            //数据可视化,Echarts的option代码
+            chart.setGeneratedChartData(contentMap.get("option").toString());
+            //分析结论
+            chart.setAnalysisConclusion(contentMap.get("conclusion").toString());
+            //保存到数据库
+            chart.setState("成功");
+            chart.setExecuteMessage("AI生成图表成功");
+            chartService.updateById(chart);
+
+            //确认信息消费成功
+            channel.basicAck(deliverTag,false);
+            //todo:考虑后续添加出现异常的重试机制
+        } catch (JsonProcessingException e) {
+            chart.setState("失败");
+            chart.setExecuteMessage("AI生成图表失败");
+            chartService.updateById(chart);
+            throw new SystemException(500, "AI生成图表失败");
+        }catch (SystemException e){
+            chart.setState("失败");
+            chart.setExecuteMessage("AI生成图表失败");
+            chartService.updateById(chart);
+            throw e;
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+    }
+}

+ 39 - 0
serve/src/main/java/space/anyi/BI/mq/MessageProducer.java

@@ -0,0 +1,39 @@
+package space.anyi.BI.mq;
+
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * @ProjectName: serve
+ * @FileName: MessageProducer
+ * @Author: 杨逸
+ * @Data:2025/10/9 20:17
+ * @Description: 信息生产者
+ */
+@Component
+public class MessageProducer {
+    //通过RabbitTemplate操作RabbitMQ
+    @Resource
+    private RabbitTemplate rabbitTemplate;
+
+    /**
+     * 发送信息
+     * @param exchange 交换机
+     * @param routingKey 路由键
+     * @param message 信息
+     * @description:
+     * @author: 杨逸
+     * @data:2025/10/09 20:19:06
+     * @since 1.0.0
+     */
+    public void sendMessage(String exchange,String routingKey,String message) {
+        //发送信息
+        rabbitTemplate.convertAndSend(exchange,routingKey,message);
+    }
+    public void sendMessage(String message) {
+        //发送信息
+        sendMessage(RabbitMQMessageConstant.EXCHANGE_NAME,RabbitMQMessageConstant.ROUTING_KEY,message);
+    }
+}

+ 14 - 0
serve/src/main/java/space/anyi/BI/mq/RabbitMQMessageConstant.java

@@ -0,0 +1,14 @@
+package space.anyi.BI.mq;
+
+/**
+ * @ProjectName: serve
+ * @FileName: RabbitMQMessageConstant
+ * @Author: 杨逸
+ * @Data:2025/10/10 22:19
+ * @Description: RabbitMQ使用的信息常量
+ */
+public class RabbitMQMessageConstant {
+    public static final String EXCHANGE_NAME = "bi_exchange";
+    public static final String QUEUE_NAME = "bi_queue";
+    public static final String ROUTING_KEY = "bi_routing_key";
+}

+ 36 - 0
serve/src/main/java/space/anyi/BI/mq/RabbitMQinit.java

@@ -0,0 +1,36 @@
+package space.anyi.BI.mq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * @ProjectName: serve
+ * @FileName: RabbitMQinit
+ * @Author: 杨逸
+ * @Data:2025/10/10 22:36
+ * @Description: RabbitMQ初始化,仅在部署项目时运行一次
+ */
+public class RabbitMQinit {
+    public static void main(String[] args) throws IOException, TimeoutException {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        //基础配置
+        connectionFactory.setHost("localhost");
+        connectionFactory.setPort(5672);
+        connectionFactory.setUsername("guest");
+        connectionFactory.setPassword("guest");
+        //业务配置
+        try (Connection connection = connectionFactory.newConnection();
+             Channel channel = connection.createChannel();) {
+            //声明交换机
+            channel.exchangeDeclare(RabbitMQMessageConstant.EXCHANGE_NAME, "direct");
+            //声明队列
+            channel.queueDeclare(RabbitMQMessageConstant.QUEUE_NAME,true,false,false,null);
+            //绑定队列和交换机
+            channel.queueBind(RabbitMQMessageConstant.QUEUE_NAME,RabbitMQMessageConstant.EXCHANGE_NAME,RabbitMQMessageConstant.ROUTING_KEY);
+        }
+    }
+}

+ 4 - 51
serve/src/main/java/space/anyi/BI/service/impl/ChartServiceImpl.java

@@ -17,6 +17,7 @@ import space.anyi.BI.entity.vo.PageVO;
 import space.anyi.BI.entity.deepseek.DeepSeekHttpRequestData;
 import space.anyi.BI.entity.deepseek.DeepSeekHttpResponseData;
 import space.anyi.BI.exception.SystemException;
+import space.anyi.BI.mq.MessageProducer;
 import space.anyi.BI.service.ChartService;
 import space.anyi.BI.mapper.ChartMapper;
 import org.springframework.stereotype.Service;
@@ -43,6 +44,8 @@ public class ChartServiceImpl extends ServiceImpl<ChartMapper, Chart>
     private ThreadPoolExecutor threadPoolExecutor;
     @Resource
     private ObjectMapper objectMapper;
+    @Resource
+    private MessageProducer messageProducer;
 
     /**
      * 异步使用AI生成图表
@@ -79,57 +82,7 @@ public class ChartServiceImpl extends ServiceImpl<ChartMapper, Chart>
         }
         chart.setChartData(csvData);
         save(chart);
-        //使用线程池优化生成图表的逻辑
-        String finalCsvData = csvData;
-        try {
-            threadPoolExecutor.execute(()->{
-                chart.setState("生成中");
-               updateById(chart);
-
-                StringBuilder message = new StringBuilder("原始数据:\n");
-                message.append(finalCsvData);
-                message.append("分析目标:\n");
-                message.append(chartDTO.getAnalysisTarget());
-                message.append("\n.使用").append(chartDTO.getChartType()).append("进行可视化分析.\n");
-                //配置prompt向AI发送请求
-                DeepSeekHttpRequestData requestData = DeepSeekAIUtil.createDefaultRequestData(message.toString());
-
-                //AI生成的指定内容
-                Map contentMap = null;
-                try {
-                    DeepSeekHttpResponseData responseData = DeepSeekAIUtil.doChat(requestData);
-                    //解析AI返回的数据
-                    String content = responseData.getChoices().get(0).getMessage().getContent();
-                    log.info("AI返回的数据为:{}", content);
-                    contentMap = objectMapper.readValue(content, Map.class);
-                } catch (JsonProcessingException e) {
-                    chart.setState("失败");
-                    chart.setExecuteMessage("AI生成图表失败");
-                    updateById(chart);
-                    throw new SystemException(500, "AI生成图表失败");
-                }catch (SystemException e){
-                    chart.setState("失败");
-                    chart.setExecuteMessage("AI生成图表失败");
-                    updateById(chart);
-                    throw e;
-                }
-                //数据可视化,Echarts的option代码
-                chart.setGeneratedChartData(contentMap.get("option").toString());
-                //分析结论
-                chart.setAnalysisConclusion(contentMap.get("conclusion").toString());
-                //保存到数据库
-                chart.setState("成功");
-                chart.setExecuteMessage("AI生成图表成功");
-                updateById(chart);
-            });
-        } catch (RejectedExecutionException e) {
-            //e.printStackTrace();
-            log.error("线程池已满,请稍后再试",e);
-            chart.setState("失败");
-            chart.setExecuteMessage("系统异常");
-            updateById(chart);
-        }
-
+        messageProducer.sendMessage(chartId+"");
         return chartId;
     }
 

+ 5 - 0
serve/src/main/resources/application.yaml

@@ -13,6 +13,11 @@ spring:
     database: 0
     host: localhost
     port: 6379
+  rabbitmq:
+    host: localhost
+    port: 5672
+    username: guest
+    password: guest
 server:
   port: 8888
   servlet: