MessageConsumer.java 3.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package space.anyi.BI.mq;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.rabbitmq.client.Channel;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.amqp.support.AmqpHeaders;
  9. import org.springframework.messaging.handler.annotation.Header;
  10. import org.springframework.stereotype.Component;
  11. import space.anyi.BI.entity.Chart;
  12. import space.anyi.BI.entity.deepseek.DeepSeekHttpRequestData;
  13. import space.anyi.BI.entity.deepseek.DeepSeekHttpResponseData;
  14. import space.anyi.BI.exception.SystemException;
  15. import space.anyi.BI.service.ChartService;
  16. import space.anyi.BI.util.DeepSeekAIUtil;
  17. import javax.annotation.Resource;
  18. import java.io.IOException;
  19. import java.util.Map;
  20. /**
  21. * @ProjectName: serve
  22. * @FileName: MessageConsumer
  23. * @Author: 杨逸
  24. * @Data:2025/10/9 20:22
  25. * @Description: 信息消费者
  26. */
  27. @Component
  28. public class MessageConsumer {
  29. private final static Logger log = LoggerFactory.getLogger(MessageConsumer.class);
  30. @Resource
  31. private ChartService chartService;
  32. @Resource
  33. private ObjectMapper objectMapper;
  34. /**
  35. * @param message 信息
  36. * @param channel 通道
  37. * @param deliverTag 消息标签
  38. * @description:
  39. * @author: 杨逸
  40. * @data:2025/10/09 20:29:40
  41. * @since 1.0.0
  42. */
  43. //接收信息
  44. @RabbitListener(queues = {RabbitMQMessageConstant.QUEUE_NAME},ackMode = "MANUAL")
  45. public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag) {
  46. //消费信息的代码
  47. //通过信息获取处理任务
  48. Chart chart = chartService.getById(Long.parseLong(message));
  49. chart.setState("生成中");
  50. chartService.updateById(chart);
  51. StringBuilder prompt = new StringBuilder("原始数据:\n");
  52. prompt.append(chart.getChartData());
  53. prompt.append("分析目标:\n");
  54. prompt.append(chart.getAnalysisTarget());
  55. prompt.append("\n.使用").append(chart.getChartType()).append("进行可视化分析.\n");
  56. //配置prompt向AI发送请求
  57. DeepSeekHttpRequestData requestData = DeepSeekAIUtil.createDefaultRequestData(prompt.toString());
  58. try {
  59. DeepSeekHttpResponseData responseData = DeepSeekAIUtil.doChat(requestData);
  60. //解析AI返回的数据
  61. String content = responseData.getChoices().get(0).getMessage().getContent();
  62. log.info("AI返回的数据为:{}", content);
  63. //AI生成的指定内容
  64. Map contentMap = objectMapper.readValue(content, Map.class);
  65. //数据可视化,Echarts的option代码
  66. chart.setGeneratedChartData(contentMap.get("option").toString());
  67. //分析结论
  68. chart.setAnalysisConclusion(contentMap.get("conclusion").toString());
  69. //保存到数据库
  70. chart.setState("成功");
  71. chart.setExecuteMessage("AI生成图表成功");
  72. chartService.updateById(chart);
  73. //确认信息消费成功
  74. channel.basicAck(deliverTag,false);
  75. //todo:考虑后续添加出现异常的重试机制
  76. } catch (JsonProcessingException e) {
  77. chart.setState("失败");
  78. chart.setExecuteMessage("AI生成图表失败");
  79. chartService.updateById(chart);
  80. throw new SystemException(500, "AI生成图表失败");
  81. }catch (SystemException e){
  82. chart.setState("失败");
  83. chart.setExecuteMessage("AI生成图表失败");
  84. chartService.updateById(chart);
  85. throw e;
  86. } catch (IOException e) {
  87. e.printStackTrace();
  88. }
  89. }
  90. }