package space.anyi.BI.mq; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import space.anyi.BI.entity.Chart; import space.anyi.BI.entity.deepseek.DeepSeekHttpRequestData; import space.anyi.BI.entity.deepseek.DeepSeekHttpResponseData; import space.anyi.BI.exception.SystemException; import space.anyi.BI.service.ChartService; import space.anyi.BI.util.DeepSeekAIUtil; import javax.annotation.Resource; import java.io.IOException; import java.util.Map; /** * @ProjectName: serve * @FileName: MessageConsumer * @Author: 杨逸 * @Data:2025/10/9 20:22 * @Description: 信息消费者 */ @Component public class MessageConsumer { private final static Logger log = LoggerFactory.getLogger(MessageConsumer.class); @Resource private ChartService chartService; @Resource private ObjectMapper objectMapper; /** * @param message 信息 * @param channel 通道 * @param deliverTag 消息标签 * @description: * @author: 杨逸 * @data:2025/10/09 20:29:40 * @since 1.0.0 */ //接收信息 @RabbitListener(queues = {RabbitMQMessageConstant.QUEUE_NAME},ackMode = "MANUAL") public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag) { //消费信息的代码 //通过信息获取处理任务 Chart chart = chartService.getById(Long.parseLong(message)); chart.setState("生成中"); chartService.updateById(chart); StringBuilder prompt = new StringBuilder("原始数据:\n"); prompt.append(chart.getChartData()); prompt.append("分析目标:\n"); prompt.append(chart.getAnalysisTarget()); prompt.append("\n.使用").append(chart.getChartType()).append("进行可视化分析.\n"); //配置prompt向AI发送请求 DeepSeekHttpRequestData requestData = DeepSeekAIUtil.createDefaultRequestData(prompt.toString()); try { DeepSeekHttpResponseData responseData = DeepSeekAIUtil.doChat(requestData); //解析AI返回的数据 String content = responseData.getChoices().get(0).getMessage().getContent(); log.info("AI返回的数据为:{}", content); //AI生成的指定内容 Map contentMap = objectMapper.readValue(content, Map.class); //数据可视化,Echarts的option代码 chart.setGeneratedChartData(contentMap.get("option").toString()); //分析结论 chart.setAnalysisConclusion(contentMap.get("conclusion").toString()); //保存到数据库 chart.setState("成功"); chart.setExecuteMessage("AI生成图表成功"); chartService.updateById(chart); //确认信息消费成功 channel.basicAck(deliverTag,false); //todo:考虑后续添加出现异常的重试机制 } catch (JsonProcessingException e) { chart.setState("失败"); chart.setExecuteMessage("AI生成图表失败"); chartService.updateById(chart); throw new SystemException(500, "AI生成图表失败"); }catch (SystemException e){ chart.setState("失败"); chart.setExecuteMessage("AI生成图表失败"); chartService.updateById(chart); throw e; } catch (IOException e) { e.printStackTrace(); } } }