| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package space.anyi.BI.mq;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.rabbitmq.client.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
- import space.anyi.BI.entity.Chart;
- import space.anyi.BI.entity.deepseek.DeepSeekHttpRequestData;
- import space.anyi.BI.entity.deepseek.DeepSeekHttpResponseData;
- import space.anyi.BI.exception.SystemException;
- import space.anyi.BI.service.ChartService;
- import space.anyi.BI.util.DeepSeekAIUtil;
- import javax.annotation.Resource;
- import java.io.IOException;
- import java.util.Map;
- /**
- * @ProjectName: serve
- * @FileName: MessageConsumer
- * @Author: 杨逸
- * @Data:2025/10/9 20:22
- * @Description: 信息消费者
- */
- @Component
- public class MessageConsumer {
- private final static Logger log = LoggerFactory.getLogger(MessageConsumer.class);
- @Resource
- private ChartService chartService;
- @Resource
- private ObjectMapper objectMapper;
- /**
- * @param message 信息
- * @param channel 通道
- * @param deliverTag 消息标签
- * @description:
- * @author: 杨逸
- * @data:2025/10/09 20:29:40
- * @since 1.0.0
- */
- //接收信息
- @RabbitListener(queues = {RabbitMQMessageConstant.QUEUE_NAME},ackMode = "MANUAL")
- public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag) {
- //消费信息的代码
- //通过信息获取处理任务
- Chart chart = chartService.getById(Long.parseLong(message));
- chart.setState("生成中");
- chartService.updateById(chart);
- StringBuilder prompt = new StringBuilder("原始数据:\n");
- prompt.append(chart.getChartData());
- prompt.append("分析目标:\n");
- prompt.append(chart.getAnalysisTarget());
- prompt.append("\n.使用").append(chart.getChartType()).append("进行可视化分析.\n");
- //配置prompt向AI发送请求
- DeepSeekHttpRequestData requestData = DeepSeekAIUtil.createDefaultRequestData(prompt.toString());
- try {
- DeepSeekHttpResponseData responseData = DeepSeekAIUtil.doChat(requestData);
- //解析AI返回的数据
- String content = responseData.getChoices().get(0).getMessage().getContent();
- log.info("AI返回的数据为:{}", content);
- //AI生成的指定内容
- Map contentMap = objectMapper.readValue(content, Map.class);
- //数据可视化,Echarts的option代码
- chart.setGeneratedChartData(contentMap.get("option").toString());
- //分析结论
- chart.setAnalysisConclusion(contentMap.get("conclusion").toString());
- //保存到数据库
- chart.setState("成功");
- chart.setExecuteMessage("AI生成图表成功");
- chartService.updateById(chart);
- //确认信息消费成功
- channel.basicAck(deliverTag,false);
- //todo:考虑后续添加出现异常的重试机制
- } catch (JsonProcessingException e) {
- chart.setState("失败");
- chart.setExecuteMessage("AI生成图表失败");
- chartService.updateById(chart);
- throw new SystemException(500, "AI生成图表失败");
- }catch (SystemException e){
- chart.setState("失败");
- chart.setExecuteMessage("AI生成图表失败");
- chartService.updateById(chart);
- throw e;
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
|