Commit b35d4689 authored by ligaowei's avatar ligaowei

添加对LLM 401错误的处理,当接收到401 Unauthorized错误时提示用户配置API密钥

parent 9e67a5c4
package pangea.hiagent.agent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.model.StreamingChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.dto.AgentRequest;
import pangea.hiagent.dto.WorkPanelEvent;
import pangea.hiagent.model.Agent;
import pangea.hiagent.model.AgentDialogue;
import pangea.hiagent.service.AgentService;
import pangea.hiagent.agent.ReActService;
import pangea.hiagent.workpanel.SseEventManager;
import pangea.hiagent.memory.SmartHistorySummarizer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
/**
* Agent 对话服务
* 负责处理与 Agent 的对话逻辑
*/
@Slf4j
@Service
public class AgentChatService {
@Autowired
private AgentService agentService;
@Autowired
private ReActService reActService;
@Autowired
private ChatMemory chatMemory;
@Autowired
private SmartHistorySummarizer smartHistorySummarizer;
private static final String DEFAULT_SYSTEM_PROMPT = "你是一个智能助手";
/**
* 处理 ReAct Agent 请求
*/
public String handleReActAgentRequest(Agent agent, AgentRequest request, String userId) {
log.info("使用ReAct Agent处理请求");
// 使用ReAct Agent处理请求,传递userId以支持记忆功能
String responseContent = reActService.processRequestWithUserId(agent, request.getUserMessage(), userId);
// 保存对话记录并返回结果
return responseContent;
}
/**
* 处理普通 Agent 请求
*/
public String handleNormalAgentRequest(Agent agent, AgentRequest request, String userId) {
log.info("使用普通Agent处理请求");
// 为每个用户-Agent组合创建唯一的会话ID
String sessionId = userId + "_" + agent.getId();
// 使用ChatMemory管理对话历史
// 添加用户消息到ChatMemory
UserMessage userMessage = new UserMessage(request.getUserMessage());
chatMemory.add(sessionId, Collections.singletonList(userMessage));
// 获取系统提示词
String systemPrompt = agent.getPromptTemplate() != null ? agent.getPromptTemplate() : DEFAULT_SYSTEM_PROMPT;
// 构建Prompt,使用Agent配置的历史记录长度
int historyLength = agent.getHistoryLength() != null ? agent.getHistoryLength() : 10;
Prompt prompt = buildPrompt(systemPrompt, historyLength, sessionId);
log.info("构建提示词完成");
// 根据Agent配置获取对应的ChatModel
ChatModel chatModel = agentService.getChatModelForAgent(agent);
log.info("获取ChatModel成功");
// 使用对应模型进行调用
log.info("开始调用模型");
try {
org.springframework.ai.chat.model.ChatResponse chatResponse = chatModel.call(prompt);
log.info("模型调用完成");
// 提取助理回复
String responseContent = chatResponse.getResult().getOutput().getText();
log.info("模型调用成功,响应内容长度: {}", responseContent.length());
// 将助理回复添加到ChatMemory
AssistantMessage assistantMessage = new AssistantMessage(responseContent);
chatMemory.add(sessionId, Collections.singletonList(assistantMessage));
return responseContent;
} catch (Exception e) {
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(e)) {
log.error("LLM返回401未授权错误: {}", e.getMessage());
throw new RuntimeException(" 请配置API密钥");
} else {
log.error("模型调用失败", e);
throw new RuntimeException("模型调用失败: " + e.getMessage());
}
}
}
/**
* 处理 ReAct Agent 流式请求
*/
public void processReActAgentStream(AgentRequest request, Agent agent, String userId,
Consumer<String> tokenConsumer) {
log.info("使用ReAct Agent处理流式请求");
// 使用ReAct Agent流式处理请求,传递userId以支持记忆功能
reActService.processRequestStreamWithUserId(agent, request.getUserMessage(), tokenConsumer, userId);
}
/**
* 处理普通 Agent 流式请求
*/
public void processNormalAgentStream(AgentRequest request, Agent agent, String userId,
Consumer<String> tokenConsumer) {
try {
log.info("使用普通Agent处理流式请求");
// 为每个用户-Agent组合创建唯一的会话ID
String sessionId = userId + "_" + agent.getId();
// 准备对话上下文
prepareChatContext(request, sessionId);
// 获取系统提示词
String systemPrompt = agent.getPromptTemplate() != null ? agent.getPromptTemplate() : DEFAULT_SYSTEM_PROMPT;
// 构建Prompt,使用Agent配置的历史记录长度
int historyLength = agent.getHistoryLength() != null ? agent.getHistoryLength() : 10;
Prompt prompt = buildPrompt(systemPrompt, historyLength, sessionId);
// 获取流式模型
StreamingChatModel streamingChatModel = getStreamingChatModel(agent);
if (streamingChatModel == null) {
log.warn("当前模型不支持流式输出");
return;
}
// 流式处理
handleStreamingResponse(tokenConsumer, prompt, streamingChatModel, sessionId);
} catch (Exception e) {
log.error("普通Agent流式处理失败", e);
}
}
/**
* 准备对话上下文
*/
private void prepareChatContext(AgentRequest request, String sessionId) {
UserMessage userMessage = new UserMessage(request.getUserMessage());
chatMemory.add(sessionId, Collections.singletonList(userMessage));
}
/**
* 构建Prompt
*/
private Prompt buildPrompt(String systemPrompt, int historyLength, String sessionId) {
List<org.springframework.ai.chat.messages.Message> historyMessages = chatMemory.get(sessionId, historyLength);
// 使用智能历史摘要器优化历史消息
List<org.springframework.ai.chat.messages.Message> summarizedMessages =
smartHistorySummarizer.summarize(historyMessages, historyLength);
List<org.springframework.ai.chat.messages.Message> messages = new ArrayList<>();
messages.add(new org.springframework.ai.chat.messages.SystemMessage(systemPrompt));
messages.addAll(summarizedMessages);
return new Prompt(messages);
}
/**
* 获取流式模型
*/
private StreamingChatModel getStreamingChatModel(Agent agent) {
try {
ChatModel chatModel = agentService.getChatModelForAgent(agent);
if (!(chatModel instanceof StreamingChatModel)) {
log.warn("模型不支持流式输出: {}", chatModel.getClass().getName());
return null;
}
return (StreamingChatModel) chatModel;
} catch (Exception e) {
log.error("获取流式模型失败", e);
return null;
}
}
/**
* 处理流式响应
*/
private void handleStreamingResponse(Consumer<String> tokenConsumer, Prompt prompt,
StreamingChatModel streamingChatModel, String sessionId) {
StringBuilder fullText = new StringBuilder();
streamingChatModel.stream(prompt)
.subscribe(chatResponse -> {
try {
String token = chatResponse.getResult().getOutput().getText();
if (token != null && !token.isEmpty()) {
fullText.append(token);
tokenConsumer.accept(token);
}
} catch (Exception e) {
log.error("处理token时发生错误", e);
}
}, throwable -> {
log.error("流式调用出错", throwable);
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(throwable)) {
log.error("LLM返回401未授权错误: {}", throwable.getMessage());
if (tokenConsumer != null) {
tokenConsumer.accept(" 请配置API密钥");
}
}
}, () -> {
try {
// 添加助理回复到ChatMemory
AssistantMessage assistantMessage = new AssistantMessage(fullText.toString());
chatMemory.add(sessionId, Collections.singletonList(assistantMessage));
// 发送完成事件,包含完整内容
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullText.toString());
}
log.info("流式处理完成,总字符数: {}", fullText.length());
} catch (Exception e) {
log.error("保存对话记录失败", e);
}
});
}
/**
* 判断异常是否为401未授权错误
* @param e 异常对象
* @return 是否为401错误
*/
private boolean isUnauthorizedError(Throwable e) {
if (e == null) {
return false;
}
// 检查异常消息中是否包含401 Unauthorized
String message = e.getMessage();
if (message != null && (message.contains("401 Unauthorized") || message.contains("Unauthorized"))) {
return true;
}
// 递归检查cause
return isUnauthorizedError(e.getCause());
}
/**
* 创建token事件数据
*/
public java.util.Map<String, Object> createTokenEventData(String token, String fullText) {
java.util.Map<String, Object> data = new java.util.HashMap<>();
data.put("token", token);
data.put("fullText", fullText);
data.put("isDone", false);
return data;
}
/**
* 创建优化的token事件数据(用于提高传输效率、支持打字机效果)
*
* @param token 当前接收到的token
* @return 优化后的数据
*/
public java.util.Map<String, Object> createOptimizedTokenEventData(String token) {
java.util.Map<String, Object> data = new java.util.HashMap<>();
// 验证token是否有效
if (token == null || token.isEmpty()) {
return data;
}
// 添加token内容
data.put("token", token);
// 为前端打字机效果添加元数据
data.put("tokenLength", token.length());
// 只在DEBUG级别记录日志,避免生产环境性能下降
if (log.isDebugEnabled()) {
if (token.length() > 50) {
log.debug("token事件已发送: length={}", token.length());
} else {
log.debug("token事件已发送: content={}", token);
}
}
// 不再每次都发送完整的fullText,减少数据传输量
// 只在完成事件时发送完整内容
return data;
}
/**
* 创建批量token事件数据
* 优化版本:根据内容类型动态调整批量大小,提高传输效率
*
* @param tokens 批量token字符串
* @return 批量处理后的数据
*/
public java.util.Map<String, Object> createBatchTokenEventData(String tokens) {
java.util.Map<String, Object> data = new java.util.HashMap<>();
// 验证tokens是否有效
if (tokens == null || tokens.isEmpty()) {
return data;
}
// 添加批量token内容
data.put("token", tokens);
// 为前端打字机效果添加元数据
data.put("tokenLength", tokens.length());
data.put("isBatch", true); // 标记这是批量数据
// 只在DEBUG级别记录日志
if (log.isDebugEnabled()) {
log.debug("批量token事件已发送: length={}", tokens.length());
}
return data;
}
/**
* 保存对话记录
*/
public void saveDialogue(Agent agent, AgentRequest request, String userId, String responseContent) {
try {
// 创建对话记录
AgentDialogue dialogue = AgentDialogue.builder()
.agentId(request.getAgentId())
.userMessage(request.getUserMessage())
.agentResponse(responseContent)
.userId(userId)
.build();
// 保存对话记录
agentService.saveDialogue(dialogue);
} catch (Exception e) {
log.error("保存对话记录失败", e);
throw new RuntimeException("保存对话记录失败", e);
}
}
/**
* 保存对话记录并发送完成事件
*/
public void saveDialogueAndSendCompleteEvent(Agent agent, AgentRequest request, String userId,
String responseContent, org.springframework.web.servlet.mvc.method.annotation.SseEmitter emitter,
java.util.concurrent.atomic.AtomicBoolean isCompleted, AgentService agentService, SseEventManager sseEventManager) throws java.io.IOException {
try {
// 添加调试日志
if (responseContent == null || responseContent.isEmpty()) {
log.warn("saveDialogueAndSendCompleteEvent接收到空的responseContent!");
} else {
log.info("saveDialogueAndSendCompleteEvent接收到的responseContent长度: {}, 内容预览: {}",
responseContent.length() > 100 ? responseContent.substring(0, 100) + "..." : responseContent);
}
// 创建并保存对话记录
AgentDialogue dialogue = AgentDialogue.builder()
.agentId(request.getAgentId())
.userMessage(request.getUserMessage())
.agentResponse(responseContent)
.userId(userId)
.build();
agentService.saveDialogue(dialogue);
// 发送完成事件
java.util.Map<String, Object> data = new java.util.HashMap<>();
data.put("fullText", responseContent);
data.put("dialogueId", dialogue.getId());
data.put("isDone", true);
// 发送完成事件到前端
sseEventManager.sendEvent(emitter, "complete", data, isCompleted);
} catch (Exception e) {
log.error("保存对话记录或发送完成事件失败", e);
// 检查是否是客户端断开连接导致的异常
if (e instanceof java.io.IOException || (e.getCause() instanceof java.io.IOException)) {
log.debug("客户端连接已断开,忽略异常");
isCompleted.set(true);
}
// 检查是否是响应已提交导致的异常
else if (e.getMessage() != null && e.getMessage().contains("response has already been committed")) {
log.debug("响应已提交,忽略异常");
isCompleted.set(true);
}
// 其他异常情况下尝试发送错误信息
else {
log.warn("发送错误信息失败", e);
}
}
}
/**
* 处理 ReAct Agent 流式请求(带 SSE)
*/
public void processReActAgentStreamWithSse(AgentRequest request, Agent agent, String userId,
SseEmitter emitter, SseEventManager sseEventManager, ReActService reActService,
SaveDialogueAndSendCompleteEventFunction saveDialogueAndSendCompleteEvent) {
// 如果传入的reActService为null,则使用自动注入的实例
if (reActService == null) {
reActService = this.reActService;
}
// 用于累积token的缓冲区
StringBuilder tokenBuffer = new StringBuilder();
// 批量大小阈值 - 适合中文流式输出,提高传输效率
int BATCH_SIZE = 50; // 增加批量大小到50个字符,减少SSE事件频率
// 时间间隔阈值(毫秒)- 保证响应及时性
long FLUSH_INTERVAL = 200; // 200ms刷新间隔,平衡效率与响应速度
// 上次刷新时间
java.util.concurrent.atomic.AtomicLong lastFlushTime = new java.util.concurrent.atomic.AtomicLong(System.currentTimeMillis());
// 是否已完成标记
java.util.concurrent.atomic.AtomicBoolean isCompleted = new java.util.concurrent.atomic.AtomicBoolean(false);
// 完整内容 - 使用对象包装以支持线程安全的最终赋值
final java.util.concurrent.atomic.AtomicReference<String> fullTextRef = new java.util.concurrent.atomic.AtomicReference<>("");
try {
log.info("使用ReAct Agent处理流式请求,批量大小: {}, 刷新间隔: {}ms", BATCH_SIZE, FLUSH_INTERVAL);
// 启动心跳保活机制
sseEventManager.startHeartbeat(emitter, new java.util.concurrent.atomic.AtomicBoolean(false));
// 订阅工作面板事件,并通过SSE推送到前端
reActService.setWorkPanelEventSubscriber(event -> {
try {
if (isCompleted.get()) {
if (log.isTraceEnabled()) {
log.trace("连接已完成,跳过推送工作面板事件");
}
return;
}
// 将工作面板事件转换为SSE事件推送
sseEventManager.sendWorkPanelEvent(emitter, (WorkPanelEvent) event, isCompleted);
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("推送工作面板事件失败: {}", e.getMessage());
}
}
});
// 使用ReAct Agent流式处理请求
processReActAgentStream(request, agent, userId, new TokenConsumerWithCompletion() {
@Override
public void accept(String token) {
try {
if (isCompleted.get()) {
if (log.isTraceEnabled()) {
log.trace("连接已完成,跳过处理token");
}
return;
}
// 验证token是否有效
if (token == null || token.isEmpty()) {
log.debug("ReAct Agent接收到空的token,跳过处理");
return;
}
// 将token添加到完整内容中(原子引用安全更新)
String currentFullText = fullTextRef.get();
fullTextRef.set(currentFullText + token);
// 将token添加到缓冲区
tokenBuffer.append(token);
// 检查是否需要刷新缓冲区
// 优化逻辑:根据内容类型和长度动态调整刷新策略
long currentTime = System.currentTimeMillis();
boolean shouldFlush = false;
// 如果缓冲区达到批量大小阈值,立即刷新
if (tokenBuffer.length() >= BATCH_SIZE) {
shouldFlush = true;
}
// 如果距离上次刷新时间超过间隔阈值,也需要刷新
else if ((currentTime - lastFlushTime.get()) >= FLUSH_INTERVAL) {
shouldFlush = true;
}
// 对于较短的内容,即使未达到阈值也适时刷新以提高响应速度
else if (tokenBuffer.length() > 0 && fullTextRef.get().length() < 100 && (currentTime - lastFlushTime.get()) >= FLUSH_INTERVAL / 2) {
shouldFlush = true;
}
if (shouldFlush && tokenBuffer.length() > 0) {
// 发送批量token事件
String batchTokens = tokenBuffer.toString();
java.util.Map<String, Object> data = createBatchTokenEventData(batchTokens);
try {
sseEventManager.sendEvent(emitter, "token", data, isCompleted);
// 清空缓冲区
tokenBuffer.setLength(0);
// 更新上次刷新时间
lastFlushTime.set(currentTime);
// 添加性能监控日志
if (log.isDebugEnabled()) {
log.debug("发送批量token事件,大小: {}字符, 总内容长度: {}字符",
batchTokens.length(), fullTextRef.get().length());
}
} catch (Exception e) {
if (isCompleted.get()) {
if (log.isTraceEnabled()) {
log.trace("ReAct Agent连接已完成,无法发送SSE token事件: {}", e.getMessage());
}
} else {
log.debug("ReAct Agent发送SSE token事件失败: {}", e.getMessage());
}
}
}
} catch (Exception e) {
log.error("ReAct Agent处理token时发生错误", e);
if (!isCompleted.getAndSet(true)) {
try {
sseEventManager.sendError(emitter, "处理响应时发生错误: " + e.getMessage());
} catch (Exception ignored) {
log.debug("无法发送错误信息");
}
}
}
}
@Override
public void onComplete(String fullContent) {
// ReAct处理完成时的回调 - 防止onComplete重复执行
if (!isCompleted.getAndSet(true)) {
log.info("ReAct Agent处理完成,总字符数: {}", fullContent != null ? fullContent.length() : 0);
// 刷新剩余的缓冲区内容
if (tokenBuffer.length() > 0) {
try {
String remainingTokens = tokenBuffer.toString();
java.util.Map<String, Object> data = createBatchTokenEventData(remainingTokens);
sseEventManager.sendEvent(emitter, "token", data, isCompleted);
tokenBuffer.setLength(0);
if (log.isDebugEnabled()) {
log.debug("刷新剩余缓冲区内容,大小: {}字符", remainingTokens.length());
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("ReAct Agent发送剩余SSE token事件失败: {}", e.getMessage());
}
}
}
// 保存完整内容到原子引用
fullTextRef.set(fullContent != null ? fullContent : "");
// 保存对话记录并发送完成事件到前端
try {
saveDialogueAndSendCompleteEvent.execute(agent, request, userId, fullTextRef.get(), emitter, isCompleted, sseEventManager);
} catch (Exception e) {
log.error("保存对话记录或发送完成事件失败: {}", e.getMessage());
try {
sseEventManager.sendError(emitter, "保存对话记录失败: " + e.getMessage());
} catch (Exception ignored) {
if (log.isDebugEnabled()) {
log.debug("无法发送错误信息");
}
}
}
}
}
});
// 不需要额外的等待和重复检查,onComplete回调会处理所有完成逻辑
// 这里只需要等待足够的时间让异步的onComplete回调执行完成
try {
// 通过轮询检查是否已完成,最多等待5秒
long maxWaitTime = 5000;
long startTime = System.currentTimeMillis();
while (!isCompleted.get() && (System.currentTimeMillis() - startTime) < maxWaitTime) {
Thread.sleep(100); // 每100ms检查一次
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 如果在超时后仍未完成,发送超时错误(仅作为最后手段)
if (!isCompleted.get()) {
log.warn("ReAct Agent流式处理超时,isCompleted未被设置为true");
try {
sseEventManager.sendError(emitter, "处理超时:未能及时完成处理");
} catch (Exception ignored) {
if (log.isDebugEnabled()) {
log.debug("无法发送超时错误信息");
}
}
}
// 关闭连接
sseEventManager.completeEmitter(emitter, isCompleted);
} catch (Exception e) {
log.error("ReAct Agent流式处理失败: {}", e.getMessage(), e);
// 检查是否是客户端断开连接导致的异常
if (e instanceof java.io.IOException || (e.getCause() instanceof java.io.IOException)) {
if (log.isDebugEnabled()) {
log.debug("客户端连接已断开");
}
} else {
// 尝试发送错误信息
if (!isCompleted.getAndSet(true)) {
try {
sseEventManager.sendError(emitter, "处理请求时发生错误: " + e.getMessage());
} catch (Exception sendError) {
if (log.isDebugEnabled()) {
log.debug("无法发送错误信息", sendError);
}
}
}
}
sseEventManager.completeEmitter(emitter, isCompleted);
}
}
/**
* 处理普通 Agent 流式请求(带 SSE)
*/
public void processNormalAgentStreamWithSse(AgentRequest request, Agent agent, String userId,
SseEmitter emitter, SseEventManager sseEventManager,
CreateTokenEventDataFunction createTokenEventData,
SaveDialogueAndSendCompleteEventFunction saveDialogueAndSendCompleteEvent) {
// 是否已完成标记
java.util.concurrent.atomic.AtomicBoolean isCompleted = new java.util.concurrent.atomic.AtomicBoolean(false);
try {
log.info("使用普通Agent处理流式请求");
// 启动心跳保活机制
sseEventManager.startHeartbeat(emitter, new java.util.concurrent.atomic.AtomicBoolean(false));
// 用于累积token的缓冲区
StringBuilder tokenBuffer = new StringBuilder();
// 批量大小阈值 - 适合中文流式输出,提高传输效率
int BATCH_SIZE = 50; // 增加批量大小到50个字符,减少SSE事件频率
// 时间间隔阈值(毫秒)- 保证响应及时性
long FLUSH_INTERVAL = 200; // 200ms刷新间隔,平衡效率与响应速度
// 上次刷新时间
java.util.concurrent.atomic.AtomicLong lastFlushTime = new java.util.concurrent.atomic.AtomicLong(System.currentTimeMillis());
// 完整内容 - 使用对象包装以支持线程安全的最终赋值
final java.util.concurrent.atomic.AtomicReference<String> fullTextRef = new java.util.concurrent.atomic.AtomicReference<>("");
// 使用普通Agent流式处理请求
processNormalAgentStream(request, agent, userId, new TokenConsumerWithCompletion() {
@Override
public void accept(String token) {
try {
if (isCompleted.get()) {
if (log.isTraceEnabled()) {
log.trace("连接已完成,跳过处理token");
}
return;
}
// 验证token是否有效
if (token == null || token.isEmpty()) {
return;
}
// 将token添加到完整内容中
String currentFullText = fullTextRef.get();
fullTextRef.set(currentFullText + token);
// 将token添加到缓冲区
tokenBuffer.append(token);
// 检查是否需要刷新缓冲区
long currentTime = System.currentTimeMillis();
boolean shouldFlush = false;
// 如果缓冲区达到批量大小阈值,立即刷新
if (tokenBuffer.length() >= BATCH_SIZE) {
shouldFlush = true;
}
// 如果距离上次刷新时间超过间隔阈值,也需要刷新
else if ((currentTime - lastFlushTime.get()) >= FLUSH_INTERVAL) {
shouldFlush = true;
}
if (shouldFlush && tokenBuffer.length() > 0) {
// 发送批量token事件
String batchTokens = tokenBuffer.toString();
java.util.Map<String, Object> data = createBatchTokenEventData(batchTokens);
try {
sseEventManager.sendEvent(emitter, "token", data, isCompleted);
// 清空缓冲区
tokenBuffer.setLength(0);
// 更新上次刷新时间
lastFlushTime.set(currentTime);
if (log.isDebugEnabled()) {
log.debug("发送批量token事件,大小: {}字符, 总内容长度: {}字符",
batchTokens.length(), fullTextRef.get().length());
}
} catch (Exception e) {
if (isCompleted.get()) {
if (log.isTraceEnabled()) {
log.trace("普通 Agent连接已完成,无法发送token事件: {}", e.getMessage());
}
} else {
if (log.isDebugEnabled()) {
log.debug("普通 Agent发送token事件失败: {}", e.getMessage());
}
}
}
}
} catch (Exception e) {
log.error("普通 Agent处理token时发生错误", e);
if (!isCompleted.getAndSet(true)) {
try {
sseEventManager.sendError(emitter, "处理响应时发生错误: " + e.getMessage());
} catch (Exception ignored) {
if (log.isDebugEnabled()) {
log.debug("无法发送错误信息");
}
}
}
}
}
@Override
public void onComplete(String fullContent) {
// 普通Agent处理完成时的回调
if (isCompleted.get()) {
return; // 已完成,防止重复处理
}
log.info("普通Agent处理完成,总字符数: {}", fullContent != null ? fullContent.length() : 0);
// 刷新剩余的缓冲区内容
if (tokenBuffer.length() > 0) {
try {
String remainingTokens = tokenBuffer.toString();
java.util.Map<String, Object> data = createBatchTokenEventData(remainingTokens);
sseEventManager.sendEvent(emitter, "token", data, isCompleted);
tokenBuffer.setLength(0);
if (log.isDebugEnabled()) {
log.debug("刷新剩余缓冲区内容,大小: {}字符", remainingTokens.length());
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("普通Agent发送剩余token事件失败: {}", e.getMessage());
}
}
}
// 保存完整内容到原子引用
fullTextRef.set(fullContent != null ? fullContent : "");
// 发送完成事件(只在这里发送一次)
try {
// 一定要向前端发送完整的fullText
saveDialogueAndSendCompleteEvent.execute(agent, request, userId, fullTextRef.get(), emitter, isCompleted, sseEventManager);
} catch (Exception e) {
log.error("保存对话记录或发送完成事件失败: {}", e.getMessage());
if (!isCompleted.getAndSet(true)) {
try {
sseEventManager.sendError(emitter, "保存对话记录失败: " + e.getMessage());
} catch (Exception ignored) {
if (log.isDebugEnabled()) {
log.debug("无法发送错误信息");
}
}
}
}
}
});
} catch (Exception e) {
log.error("普通Agent流式处理失败", e);
// 发送错误信息给客户端
if (!isCompleted.getAndSet(true)) {
try {
sseEventManager.sendError(emitter, "处理请求时发生错误: " + e.getMessage());
} catch (Exception sendError) {
if (log.isDebugEnabled()) {
log.debug("发送错误信息失败", sendError);
}
}
}
}
}
/**
* 创建token事件数据函数式接口
*/
@FunctionalInterface
public interface CreateTokenEventDataFunction {
java.util.Map<String, Object> execute(String token, String fullText);
}
/**
* 保存对话记录并发送完成事件函数式接口
*/
@FunctionalInterface
public interface SaveDialogueAndSendCompleteEventFunction {
void execute(Agent agent, AgentRequest request, String userId,
String responseContent, SseEmitter emitter, java.util.concurrent.atomic.AtomicBoolean isCompleted, SseEventManager sseEventManager) throws java.io.IOException;
}
/**
* Token消费者接口,支持完成回调
*/
public interface TokenConsumerWithCompletion extends java.util.function.Consumer<String> {
/**
* 当流式处理完成时调用
* @param fullContent 完整的内容
*/
default void onComplete(String fullContent) {
// 默认实现为空
}
}
}
\ No newline at end of file
package pangea.hiagent.agent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.model.StreamingChatModel;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import pangea.hiagent.model.Agent;
import pangea.hiagent.service.AgentService;
import pangea.hiagent.dto.WorkPanelEvent;
import pangea.hiagent.rag.RagService;
import pangea.hiagent.tool.DefaultReactExecutor;
import pangea.hiagent.tool.ReactCallback;
import pangea.hiagent.tool.ReactExecutor;
import pangea.hiagent.memory.MemoryService;
import pangea.hiagent.workpanel.IWorkPanelDataCollector;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* 基于Spring AI ChatClient的ReAct Service类
* 负责实现ReAct Agent的核心逻辑,使用Spring AI的标准工具调用机制
* 集成工作面板数据收集器,捕获思考、行动、观察、结束等步骤
*/
@Slf4j
@Service
public class ReActService {
@Autowired
private AgentService agentService;
@Autowired
private RagService ragService;
// 注入所有带@Component注解的工具类
@Autowired
@Lazy
private List<Object> allTools;
@Autowired
private IWorkPanelDataCollector workPanelCollector;
@Autowired
private MemoryService memoryService;
@Autowired
private ReactCallback defaultReactCallback;
@Autowired
private ReactExecutor defaultReactExecutor;
/**
* 根据工具名称筛选工具实例
* @param toolNames 工具名称集合
* @return 筛选后的工具实例列表
*/
private List<Object> filterToolsByNames(Set<String> toolNames) {
if (toolNames == null || toolNames.isEmpty()) {
return allTools;
}
return allTools.stream()
.filter(tool -> {
// 获取工具类名(不含包名)
String className = tool.getClass().getSimpleName();
// 检查类名是否匹配
return toolNames.contains(className) ||
toolNames.stream().anyMatch(name ->
className.toLowerCase().contains(name.toLowerCase()));
})
.collect(Collectors.toList());
}
/**
* 处理用户请求的主方法(同步方式)
*
* @param agent Agent对象
* @param userMessage 用户消息
* @return 处理结果
*/
public String processRequest(Agent agent, String userMessage) {
return processRequestWithUserId(agent, userMessage, null);
}
/**
* 处理用户请求的主方法(同步方式)- 支持显式传递userId
*
* @param agent Agent对象
* @param userMessage 用户消息
* @param userId 用户ID(可选)
* @return 处理结果
*/
public String processRequestWithUserId(Agent agent, String userMessage, String userId) {
log.info("开始处理ReAct Agent请求,Agent ID: {}, 用户消息: {}", agent.getId(), userMessage);
try {
// 为每个用户-Agent组合创建唯一的会话ID
String sessionId = memoryService.generateSessionId(agent, userId);
// 添加用户消息到ChatMemory
memoryService.addUserMessageToMemory(sessionId, userMessage);
// 检查是否启用RAG并尝试RAG增强
String ragResponse = tryRagEnhancement(agent, userMessage);
if (ragResponse != null) {
// 触发最终答案回调
if (defaultReactCallback != null) {
defaultReactCallback.onFinalAnswer(ragResponse);
}
return ragResponse;
}
// 准备执行环境
ChatClient client = prepareChatClient(agent);
List<Object> tools = prepareTools(agent);
// 添加自定义回调到ReAct执行器
addReactCallbackIfNeeded();
// 使用ReAct执行器执行流程,传递Agent对象以支持记忆功能
String finalAnswer = executeReactProcess(client, userMessage, tools, agent);
// 将助理回复添加到ChatMemory
memoryService.addAssistantMessageToMemory(sessionId, finalAnswer);
return finalAnswer;
} catch (Exception e) {
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(e)) {
log.error("LLM返回401未授权错误: {}", e.getMessage());
return " 请配置API密钥";
} else {
log.error("处理ReAct请求时发生错误", e);
return "处理请求时发生错误: " + e.getMessage();
}
}
}
/**
* 准备ChatClient
* @param agent Agent对象
* @return ChatClient实例
*/
private ChatClient prepareChatClient(Agent agent) {
// 根据Agent配置获取对应的ChatModel
ChatModel chatModel = agentService.getChatModelForAgent(agent);
log.info("获取ChatModel成功: {}", chatModel.getClass().getName());
// 使用获取的ChatModel构建ChatClient
return ChatClient.builder(chatModel).build();
}
/**
* 准备工具列表
* @param agent Agent对象
* @return 工具列表
*/
private List<Object> prepareTools(Agent agent) {
// 获取Agent配置的工具名称集合
Set<String> toolNames = agent.getToolNameSet();
// 根据工具名称筛选工具实例
return filterToolsByNames(toolNames);
}
/**
* 添加ReAct回调(如果需要)
*/
private void addReactCallbackIfNeeded() {
if (defaultReactExecutor != null && defaultReactCallback != null) {
defaultReactExecutor.addReactCallback(defaultReactCallback);
}
}
/**
* 执行ReAct流程
* @param client ChatClient实例
* @param userMessage 用户消息
* @param tools 工具列表
* @param agent Agent对象
* @return 最终答案
*/
private String executeReactProcess(ChatClient client, String userMessage, List<Object> tools, Agent agent) {
if (defaultReactExecutor instanceof DefaultReactExecutor) {
return ((DefaultReactExecutor) defaultReactExecutor).executeWithAgent(client, userMessage, tools, agent);
} else {
return defaultReactExecutor.execute(client, userMessage, tools);
}
}
/**
* 尝试RAG增强
* @param agent Agent对象
* @param userMessage 用户消息
* @return RAG增强结果,如果没有启用或没有结果则返回null
*/
private String tryRagEnhancement(Agent agent, String userMessage) {
if (Boolean.TRUE.equals(agent.getEnableRag())) {
log.info("Agent启用RAG功能,尝试RAG增强");
try {
String ragResponse = ragService.ragQa(agent, userMessage);
// 如果RAG有结果,直接返回
if (ragResponse != null && !ragResponse.isEmpty()) {
log.info("RAG增强返回结果");
return saveRagResponseToMemory(agent, ragResponse);
}
} catch (Exception e) {
log.error("RAG增强过程中发生错误", e);
// RAG失败不应该影响正常的ReAct流程,继续执行
}
}
return null;
}
/**
* 保存响应到ChatMemory
* @param agent Agent对象
* @param response 响应内容
* @return 响应内容
*/
private String saveResponseToMemory(Agent agent, String response) {
try {
String sessionId = memoryService.generateSessionId(agent);
// 将响应添加到ChatMemory
memoryService.addAssistantMessageToMemory(sessionId, response);
return response;
} catch (Exception e) {
log.error("保存响应到内存时发生错误", e);
return response; // 即使保存失败也返回响应
}
}
/**
* 保存RAG响应到ChatMemory (已废弃,使用saveResponseToMemory替代)
* @param agent Agent对象
* @param ragResponse RAG响应
* @return RAG响应
*/
@Deprecated
private String saveRagResponseToMemory(Agent agent, String ragResponse) {
return saveResponseToMemory(agent, ragResponse);
}
/**
* 处理流式错误
*/
private void handleStreamError(AtomicBoolean isCompleted, Consumer<String> tokenConsumer, String errorMessage) {
// 确保只处理一次错误
if (!isCompleted.getAndSet(true) && tokenConsumer != null) {
try {
// 记录详细错误日志
log.error("流式处理错误: {}", errorMessage);
// 同时将错误信息记录到工作面板
if (workPanelCollector != null) {
try {
workPanelCollector.recordLog("流式处理错误: " + errorMessage, "error");
} catch (Exception e) {
log.debug("记录错误到工作面板失败: {}", e.getMessage());
}
}
// 发送错误信息给客户端
tokenConsumer.accept("[ERROR] " + errorMessage);
} catch (Exception e) {
log.error("发送错误消息时发生异常", e);
}
}
}
/**
* 获取流式模型
*/
private StreamingChatModel getStreamingChatModel(Agent agent) {
try {
ChatModel chatModel = agentService.getChatModelForAgent(agent);
if (!(chatModel instanceof StreamingChatModel)) {
log.warn("模型不支持流式输出: {}", chatModel.getClass().getName());
return null;
}
return (StreamingChatModel) chatModel;
} catch (Exception e) {
log.error("获取流式模型失败: {}", e.getMessage(), e);
return null;
}
}
/**
* 流式处理ReAct Agent请求
*
* 优化后的实现采用更直接的流式处理方式,确保与普通Agent流式处理保持一致的行为
* 核心优化点:
* 1. 简化了Consumer包装逻辑,减少不必要的复杂性
* 2. 统一了onComplete回调机制,确保前端能正确接收到完整内容
* 3. 增强了错误处理机制,提供更清晰的错误信息
* 4. 支持对话历史记忆功能
*
* @param agent Agent对象
* @param userMessage 用户消息
* @param tokenConsumer token处理回调函数(前端的TokenConsumerWithCompletion实现)
*/
public void processRequestStream(Agent agent, String userMessage, Consumer<String> tokenConsumer) {
processRequestStreamWithUserId(agent, userMessage, tokenConsumer, null);
}
/**
* 流式处理ReAct Agent请求 - 支持显式传递userId
*
* @param agent Agent对象
* @param userMessage 用户消息
* @param tokenConsumer token处理回调函数(前端的TokenConsumerWithCompletion实现)
* @param userId 用户ID(可选)
*/
public void processRequestStreamWithUserId(Agent agent, String userMessage, Consumer<String> tokenConsumer, String userId) {
AtomicBoolean isCompleted = new AtomicBoolean(false);
try {
log.info("开始流式处理ReAct Agent请求,Agent ID: {}, 用户消息: {}", agent.getId(), userMessage);
// 检查用户消息是否为空
if (userMessage == null || userMessage.trim().isEmpty()) {
String errorMsg = "用户消息不能为空";
log.error(errorMsg);
handleStreamError(isCompleted, tokenConsumer, errorMsg);
return;
}
// 为每个用户-Agent组合创建唯一的会话ID
String sessionId = memoryService.generateSessionId(agent, userId);
// 添加用户消息到ChatMemory
memoryService.addUserMessageToMemory(sessionId, userMessage);
// 初始化工作面板
initializeWorkPanel(agent);
// 获取流式模型
StreamingChatModel streamingChatModel = getStreamingChatModel(agent);
if (streamingChatModel == null) {
String errorMsg = "当前模型不支持流式输出,请检查Agent配置";
log.error(errorMsg);
handleStreamError(isCompleted, tokenConsumer, errorMsg);
return;
}
// 准备执行环境
ChatClient client = ChatClient.builder((ChatModel) streamingChatModel).build();
List<Object> tools = prepareTools(agent);
// 添加自定义回调到ReAct执行器
addReactCallbackIfNeeded();
// 直接传递tokenConsumer给ReAct执行器,简化处理逻辑
// ReAct执行器会负责处理token和onComplete回调
// 传递Agent对象以支持记忆功能
executeReactStreamProcess(client, userMessage, tools, tokenConsumer, agent);
log.debug("流式执行完成");
} catch (Exception e) {
String errorMsg = "流式处理ReAct请求时发生错误: " + e.getMessage();
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(e)) {
log.error("LLM返回401未授权错误: {}", e.getMessage());
errorMsg = " 请配置API密钥";
} else {
log.error(errorMsg, e);
}
handleStreamError(isCompleted, tokenConsumer, errorMsg);
}
}
/**
* 执行ReAct流式流程
* @param client ChatClient实例
* @param userMessage 用户消息
* @param tools 工具列表
* @param tokenConsumer token处理回调函数
* @param agent Agent对象
*/
private void executeReactStreamProcess(ChatClient client, String userMessage, List<Object> tools, Consumer<String> tokenConsumer, Agent agent) {
if (defaultReactExecutor instanceof DefaultReactExecutor) {
((DefaultReactExecutor) defaultReactExecutor).executeStreamWithAgent(
client, userMessage, tools, tokenConsumer, agent);
} else {
defaultReactExecutor.executeStream(client, userMessage, tools, tokenConsumer);
}
}
/**
* 初始化工作面板
* @param agent Agent对象
*/
private void initializeWorkPanel(Agent agent) {
if (workPanelCollector != null) {
try {
workPanelCollector.clear();
workPanelCollector.recordLog("开始处理Agent请求: " + agent.getName(), "info");
} catch (Exception e) {
log.error("初始化工作面板时发生错误", e);
}
}
}
/**
* 为流式处理设置工作面板数据收集器
*/
public void setWorkPanelEventSubscriber(Consumer<WorkPanelEvent> consumer) {
// 订阅工作面板事件,用于实时推送
if (workPanelCollector != null && consumer != null) {
workPanelCollector.subscribe(consumer);
}
}
/**
* 获取工作面板数据收集器
*/
public IWorkPanelDataCollector getWorkPanelCollector() {
return workPanelCollector;
}
/**
* 判断异常是否为401未授权错误
* @param e 异常对象
* @return 是否为401错误
*/
private boolean isUnauthorizedError(Throwable e) {
if (e == null) {
return false;
}
// 检查异常消息中是否包含401 Unauthorized
String message = e.getMessage();
if (message != null && (message.contains("401 Unauthorized") || message.contains("Unauthorized"))) {
return true;
}
// 递归检查cause
return isUnauthorizedError(e.getCause());
}
}
\ No newline at end of file
package pangea.hiagent.controller;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import pangea.hiagent.dto.ApiResponse;
import pangea.hiagent.dto.PageData;
import pangea.hiagent.model.LlmConfig;
import pangea.hiagent.service.LlmConfigService;
import pangea.hiagent.llm.LlmModelFactory;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.chat.prompt.PromptTemplate;
import java.util.List;
import java.util.stream.Collectors;
/**
* LLM配置API控制器
*/
@Slf4j
@RestController
@RequestMapping("/api/v1/llm-config")
public class LlmConfigController {
private final LlmConfigService llmConfigService;
// 注入LlmModelFactory用于测试配置
private final LlmModelFactory llmModelFactory;
public LlmConfigController(LlmConfigService llmConfigService, LlmModelFactory llmModelFactory) {
this.llmConfigService = llmConfigService;
this.llmModelFactory = llmModelFactory;
}
/**
* 创建LLM配置
*/
@PostMapping
public ApiResponse<LlmConfig> createLlmConfig(@RequestBody LlmConfig config) {
try {
LlmConfig created = llmConfigService.createLlmConfig(config);
return ApiResponse.success(created, "创建LLM配置成功");
} catch (Exception e) {
log.error("创建LLM配置失败", e);
return ApiResponse.error(5001, "创建LLM配置失败: " + e.getMessage());
}
}
/**
* 更新LLM配置
*/
@PutMapping("/{id}")
public ApiResponse<LlmConfig> updateLlmConfig(@PathVariable String id, @RequestBody LlmConfig config) {
try {
config.setId(id);
LlmConfig updated = llmConfigService.updateLlmConfig(config);
return ApiResponse.success(updated, "更新LLM配置成功");
} catch (Exception e) {
log.error("更新LLM配置失败", e);
return ApiResponse.error(5001, "更新LLM配置失败: " + e.getMessage());
}
}
/**
* 删除LLM配置
*/
@DeleteMapping("/{id}")
public ApiResponse<Void> deleteLlmConfig(@PathVariable String id) {
try {
llmConfigService.deleteLlmConfig(id);
return ApiResponse.success(null, "删除LLM配置成功");
} catch (Exception e) {
log.error("删除LLM配置失败", e);
return ApiResponse.error(5001, "删除LLM配置失败: " + e.getMessage());
}
}
/**
* 获取LLM配置详情
*/
@GetMapping("/{id}")
public ApiResponse<LlmConfig> getLlmConfig(@PathVariable String id) {
try {
LlmConfig config = llmConfigService.getLlmConfig(id);
if (config == null) {
return ApiResponse.error(4001, "LLM配置不存在");
}
return ApiResponse.success(config);
} catch (Exception e) {
log.error("获取LLM配置详情失败", e);
return ApiResponse.error(5001, "获取LLM配置详情失败: " + e.getMessage());
}
}
/**
* 分页获取LLM配置列表
*/
@GetMapping("/list")
public ApiResponse<PageData<LlmConfig>> listLlmConfigs(
@RequestParam(defaultValue = "1") Long current,
@RequestParam(defaultValue = "10") Long size,
@RequestParam(required = false) String name,
@RequestParam(required = false) String provider) {
try {
IPage<LlmConfig> page = llmConfigService.pageLlmConfigs(current, size, name, provider);
return ApiResponse.success(PageData.from(page));
} catch (Exception e) {
log.error("获取LLM配置列表失败", e);
return ApiResponse.error(5001, "获取LLM配置列表失败: " + e.getMessage());
}
}
/**
* 获取启用的LLM配置列表
*/
@GetMapping("/enabled")
public ApiResponse<PageData<LlmConfig>> getEnabledLlmConfigs() {
try {
// 获取所有启用的配置
var configs = llmConfigService.getEnabledLlmConfigs();
// 转换为分页数据格式(这里简化处理,实际应该分页)
PageData<LlmConfig> pageData = new PageData<>();
pageData.setRecords(configs);
pageData.setTotal((long) configs.size());
return ApiResponse.success(pageData);
} catch (Exception e) {
log.error("获取启用的LLM配置列表失败", e);
return ApiResponse.error(5001, "获取启用的LLM配置列表失败: " + e.getMessage());
}
}
/**
* 测试LLM配置是否有效
*/
@PostMapping("/{id}/test")
public ApiResponse<String> testLlmConfig(@PathVariable String id) {
try {
log.info("开始测试LLM配置,ID: {}", id);
// 获取LLM配置
LlmConfig config = llmConfigService.getLlmConfig(id);
if (config == null) {
log.warn("LLM配置不存在,ID: {}", id);
return ApiResponse.error(4001, "LLM配置不存在");
}
log.info("获取到LLM配置: {}", config);
if (!config.getEnabled()) {
log.warn("LLM配置未启用,ID: {}", id);
return ApiResponse.error(4001, "LLM配置未启用");
}
// 使用LlmModelFactory创建ChatModel实例进行测试
log.info("开始创建ChatModel实例");
ChatModel chatModel = llmModelFactory.createChatModel(config);
log.info("成功创建ChatModel实例");
// 创建简单的测试提示词
PromptTemplate promptTemplate = new PromptTemplate("请用中文回答:你好世界");
Prompt prompt = promptTemplate.create();
// 尝试调用模型
log.info("开始调用模型");
org.springframework.ai.chat.model.ChatResponse response = chatModel.call(prompt);
String result = response.getResult().getOutput().toString();
log.info("模型调用成功,结果: {}", result);
return ApiResponse.success("测试成功:" + result, "LLM配置测试成功");
} catch (Exception e) {
log.error("测试LLM配置失败", e);
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(e)) {
log.error("LLM返回401未授权错误: {}", e.getMessage());
return ApiResponse.error(5001, " 请配置API密钥");
} else {
return ApiResponse.error(5001, "测试LLM配置失败: " + e.getMessage());
}
}
}
/**
* 获取所有可用的LLM提供商名称
*/
@GetMapping("/providers")
public ApiResponse<List<String>> getAvailableProviders() {
try {
List<String> providers = llmModelFactory.getModelAdapterManager().getAdapters().keySet().stream()
.map(String::toUpperCase)
.sorted()
.collect(Collectors.toList());
return ApiResponse.success(providers);
} catch (Exception e) {
log.error("获取LLM提供商列表失败", e);
return ApiResponse.error(5001, "获取LLM提供商列表失败: " + e.getMessage());
}
}
/**
* 判断异常是否为401未授权错误
* @param e 异常对象
* @return 是否为401错误
*/
private boolean isUnauthorizedError(Throwable e) {
if (e == null) {
return false;
}
// 检查异常消息中是否包含401 Unauthorized
String message = e.getMessage();
if (message != null && (message.contains("401 Unauthorized") || message.contains("Unauthorized"))) {
return true;
}
// 递归检查cause
return isUnauthorizedError(e.getCause());
}
}
\ No newline at end of file
package pangea.hiagent.tool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.messages.*;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.context.annotation.Lazy;
import pangea.hiagent.workpanel.IWorkPanelDataCollector;
import pangea.hiagent.agent.AgentChatService;
import pangea.hiagent.memory.MemoryService;
import pangea.hiagent.model.Agent;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
* 重构后的默认ReAct执行器实现 - 支持真正的流式输出和完整的ReAct流程可观测性
*
* 核心设计:
* 1. 使用真正的流式处理机制,逐步返回结果给客户端
* 2. 提供完整的ReAct流程监控和事件通知
* 3. 实现异步处理以提高性能
* 4. 完善的异常处理和资源管理
* 5. 支持对话历史记忆功能
*/
@Slf4j
@Service
public class DefaultReactExecutor implements ReactExecutor {
private final List<ReactCallback> reactCallbacks = new ArrayList<>();
private final AtomicInteger stepCounter = new AtomicInteger(0);
@Autowired
@Lazy
private IWorkPanelDataCollector workPanelCollector;
@Autowired
private MemoryService memoryService;
/**
* 添加ReAct回调
* @param callback ReAct回调
*/
@Override
public void addReactCallback(ReactCallback callback) {
if (callback != null) {
reactCallbacks.add(callback);
}
}
/**
* 执行ReAct流程(同步方式)
* @param chatClient ChatClient实例
* @param userInput 用户输入
* @param tools 工具列表
* @return 最终答案
*/
@Override
public String execute(ChatClient chatClient, String userInput, List<Object> tools) {
return executeWithAgent(chatClient, userInput, tools, null);
}
/**
* 执行ReAct流程(同步方式)- 支持Agent配置
* @param chatClient ChatClient实例
* @param userInput 用户输入
* @param tools 工具列表
* @param agent Agent对象(可选)
* @return 最终答案
*/
public String executeWithAgent(ChatClient chatClient, String userInput, List<Object> tools, Agent agent) {
log.info("开始执行ReAct流程,用户输入: {}", userInput);
// 重置步骤计数器
stepCounter.set(0);
try {
// 触发思考步骤
triggerThinkStep("开始处理用户请求: " + userInput);
// 构建系统提示词
String systemPrompt = "You are a helpful AI assistant that can use tools to answer questions. " +
"Use the available tools when needed to gather information. " +
"Think step by step and show your reasoning process. " +
"When using tools, clearly indicate your thoughts, actions, and observations.";
// 构建Prompt,包含历史对话记录
Prompt prompt = buildPromptWithHistory(systemPrompt, userInput, agent);
// 使用call()获取完整的LLM响应
// 这会阻塞直到完成整个ReAct循环(思考→行動→观察→...→最终答案)
log.info("使用call()方法处理ReAct流程,确保完整的工具调用循环");
ChatResponse response = chatClient.prompt(prompt)
.tools(tools.toArray())
.call()
.chatResponse();
// 获取响应文本
String responseText = response.getResult().getOutput().getText();
// 触发观察步骤
triggerObservationStep(responseText);
// 返回最终结果
log.info("最终答案: {}", responseText);
// 触发最终答案步骤
triggerFinalAnswerStep(responseText);
return responseText;
} catch (Exception e) {
log.error("执行ReAct流程时发生错误", e);
return "处理请求时发生错误: " + e.getMessage();
}
}
/**
* 构建包含历史对话记录的Prompt
* @param systemPrompt 系统提示词
* @param userInput 用户输入
* @param agent Agent对象(可选)
* @return 构建的Prompt
*/
private Prompt buildPromptWithHistory(String systemPrompt, String userInput, Agent agent) {
List<org.springframework.ai.chat.messages.Message> messages = new ArrayList<>();
// 添加系统消息
messages.add(new SystemMessage(systemPrompt));
// 如果提供了Agent,添加历史对话记录
if (agent != null) {
try {
// 生成会话ID
String sessionId = memoryService.generateSessionId(agent);
// 获取历史记录长度配置,默认为10
int historyLength = agent.getHistoryLength() != null ? agent.getHistoryLength() : 10;
// 获取历史消息
List<org.springframework.ai.chat.messages.Message> historyMessages =
memoryService.getHistoryMessages(sessionId, historyLength);
// 添加历史消息到Prompt
messages.addAll(historyMessages);
} catch (Exception e) {
log.warn("获取历史对话记录时发生错误: {}", e.getMessage());
}
}
// 添加当前用户消息
messages.add(new UserMessage(userInput));
return new Prompt(messages);
}
/**
* 流式执行ReAct流程 - 使用真正的流式处理机制
*
* 核心改进点:
* 1. 使用stream()方法实现真正的流式处理
* 2. 逐步返回结果给客户端,实现实时数据传输
* 3. 完整的ReAct流程监控
* 4. 增强的错误处理机制
* 5. 异步处理提高性能
* 6. 正确捕获和记录工具调用事件
* 7. 支持对话历史记忆功能
*
* @param chatClient ChatClient实例
* @param userInput 用户输入
* @param tools 工具列表
* @param tokenConsumer token处理回调函数
*/
@Override
public void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer) {
executeStreamWithAgent(chatClient, userInput, tools, tokenConsumer, null);
}
/**
* 流式执行ReAct流程 - 使用真正的流式处理机制(支持Agent配置)
*
* @param chatClient ChatClient实例
* @param userInput 用户输入
* @param tools 工具列表
* @param tokenConsumer token处理回调函数
* @param agent Agent对象(可选)
*/
public void executeStreamWithAgent(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent) {
log.info("使用stream()方法处理ReAct流程,支持真正的流式输出");
// 重置步骤计数器
stepCounter.set(0);
// 使用StringBuilder累积完整响应
StringBuilder fullResponse = new StringBuilder();
try {
// 触发思考步骤
triggerThinkStep("开始处理用户请求: " + userInput);
// 构建系统提示词
String systemPrompt = "You are a helpful AI assistant that can use tools to answer questions. " +
"Use the available tools when needed to gather information. " +
"Think step by step and show your reasoning process. " +
"When using tools, clearly indicate your thoughts, actions, and observations.";
// 构建Prompt,包含历史对话记录
Prompt prompt = buildPromptWithHistory(systemPrompt, userInput, agent);
// 订阅流式响应
chatClient.prompt(prompt)
.tools(tools.toArray())
.stream()
.chatResponse()
.subscribe(
chatResponse -> {
try {
// 获取token
String token = chatResponse.getResult().getOutput().getText();
// 验证token是否有效
if (isValidToken(token)) {
// 累积完整响应
fullResponse.append(token);
// 分析token内容,识别工具调用和结果
analyzeAndRecordToolEvents(token, fullResponse.toString());
// 实时发送token给客户端
if (tokenConsumer != null) {
tokenConsumer.accept(token);
}
// 记录思考过程
processTokenForSteps(token);
}
} catch (Exception e) {
log.error("处理token时发生错误", e);
}
},
throwable -> {
log.error("流式处理出错", throwable);
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(throwable)) {
log.error("LLM返回401未授权错误: {}", throwable.getMessage());
sendErrorToConsumer(tokenConsumer, " 请配置API密钥");
} else {
recordStreamError(throwable.getMessage());
sendErrorToConsumer(tokenConsumer, throwable.getMessage());
}
},
() -> {
log.info("流式处理完成");
// 触发最终答案步骤
triggerFinalAnswerStep(fullResponse.toString());
// 发送完成事件,包含完整内容
sendCompletionEvent(tokenConsumer, fullResponse.toString());
}
);
} catch (Exception e) {
log.error("流式执行ReAct流程时发生错误", e);
recordStreamError(e.getMessage());
sendErrorToConsumer(tokenConsumer, e.getMessage());
}
}
/**
* 检查token是否有效
* @param token token字符串
* @return 是否有效
*/
private boolean isValidToken(String token) {
return token != null && !token.isEmpty();
}
/**
* 处理token以识别不同的步骤类型
* @param token token字符串
*/
private void processTokenForSteps(String token) {
if (token.contains("Thought:")) {
triggerThinkStep(token);
} else if (token.contains("Action:")) {
// 提取工具名称和参数
String toolName = extractToolName(token);
Object toolArgs = extractToolArgs(token);
triggerActionStep(toolName != null ? toolName : "unknown tool", token, toolArgs);
} else if (token.contains("Observation:")) {
triggerObservationStep(token);
}
}
/**
* 向消费者发送错误信息
* @param tokenConsumer token消费者
* @param errorMessage 错误信息
*/
private void sendErrorToConsumer(Consumer<String> tokenConsumer, String errorMessage) {
if (tokenConsumer != null) {
tokenConsumer.accept("[ERROR] " + errorMessage);
}
}
/**
* 发送完成事件
* @param tokenConsumer token消费者
* @param fullResponse 完整响应
*/
private void sendCompletionEvent(Consumer<String> tokenConsumer, String fullResponse) {
if (tokenConsumer instanceof AgentChatService.TokenConsumerWithCompletion) {
log.debug("调用onComplete,内容长度: {}", fullResponse.length());
((AgentChatService.TokenConsumerWithCompletion) tokenConsumer).onComplete(fullResponse);
} else if (tokenConsumer != null) {
log.warn("tokenConsumer不是TokenConsumerWithCompletion实例");
tokenConsumer.accept("");
}
}
/**
* 分析token内容,识别工具调用和结果
* 这个方法通过分析响应中的特殊标记来识别工具调用
*/
private void analyzeAndRecordToolEvents(String token, String fullResponse) {
if (!isValidToken(token) || workPanelCollector == null) {
return;
}
try {
// 检查工具调用的标记
// 通常格式为: "Tool: [工具名称]" 或 "Calling [工具名称]" 或类似的模式
if (isToolCall(token)) {
String toolName = extractToolName(token);
if (isValidToolName(toolName)) {
// 记录工具调用开始
workPanelCollector.recordToolCallStart(toolName, "execute", extractToolArgs(token));
}
}
// 检查工具结果的标记
else if (isToolResult(token)) {
String toolName = extractToolName(fullResponse);
String result = extractToolResult(token);
if (isValidToolName(toolName)) {
// 记录工具调用完成
workPanelCollector.recordToolCallComplete(toolName, result, "success");
}
}
// 检查错误标记
else if (isError(token)) {
String toolName = extractToolName(fullResponse);
if (isValidToolName(toolName)) {
// 记录工具调用错误
workPanelCollector.recordToolCallError(toolName, token);
}
}
} catch (Exception e) {
log.debug("分析工具调用事件时发生错误: {}", e.getMessage());
}
}
/**
* 检查是否为工具调用
* @param token token字符串
* @return 是否为工具调用
*/
private boolean isToolCall(String token) {
return token.contains("Tool:") || token.contains("Calling") ||
token.contains("tool:") || token.contains("calling");
}
/**
* 检查是否为工具结果
* @param token token字符串
* @return 是否为工具结果
*/
private boolean isToolResult(String token) {
return token.contains("Result:") || token.contains("result:") ||
token.contains("Output:") || token.contains("output:");
}
/**
* 检查是否为错误信息
* @param token token字符串
* @return 是否为错误信息
*/
private boolean isError(String token) {
return token.contains("Error:") || token.contains("error:") ||
token.contains("Exception:") || token.contains("exception:");
}
/**
* 检查工具名称是否有效
* @param toolName 工具名称
* @return 是否有效
*/
private boolean isValidToolName(String toolName) {
return toolName != null && !toolName.isEmpty();
}
/**
* 从token中提取工具名称
*/
private String extractToolName(String text) {
if (text == null) return null;
// 尝试从常见的工具调用格式中提取工具名称
String[] patterns = {
"Tool: (\\w+)",
"tool: (\\w+)",
"Calling (\\w+)",
"calling (\\w+)",
"Use (\\w+)",
"use (\\w+)",
"Tool\\((\\w+)\\)",
"tool\\((\\w+)\\)"
};
for (String pattern : patterns) {
java.util.regex.Pattern p = java.util.regex.Pattern.compile(pattern);
java.util.regex.Matcher m = p.matcher(text);
if (m.find()) {
return m.group(1);
}
}
return null;
}
/**
* 从token中提取工具参数
*/
private Object extractToolArgs(String text) {
if (text == null) return null;
// 简单的参数提取逻辑
// 可以根据具体的工具调用格式进行更复杂的解析
java.util.Map<String, Object> args = new java.util.HashMap<>();
// 尝试提取括号内的内容作为参数
java.util.regex.Pattern pattern = java.util.regex.Pattern.compile("\\(([^)]*)\\)");
java.util.regex.Matcher matcher = pattern.matcher(text);
if (matcher.find()) {
args.put("params", matcher.group(1));
return args;
}
return args.isEmpty() ? null : args;
}
/**
* 从token中提取工具结果
*/
private String extractToolResult(String token) {
if (token == null) return "";
// 简单的结果提取逻辑
// 提取冒号后的内容
int colonIndex = token.lastIndexOf(':');
if (colonIndex >= 0 && colonIndex < token.length() - 1) {
return token.substring(colonIndex + 1).trim();
}
return token.trim();
}
/**
* 记录流式处理错误到工作面板
*/
private void recordStreamError(String errorMessage) {
if (workPanelCollector != null) {
try {
workPanelCollector.recordLog("流式处理错误: " + errorMessage, "error");
} catch (Exception e) {
log.debug("记录流式处理错误到工作面板失败: {}", e.getMessage());
}
}
}
/**
* 触发思考步骤
* @param content 思考内容
*/
private void triggerThinkStep(String content) {
int stepNumber = stepCounter.incrementAndGet();
ReactStep reactStep = new ReactStep(stepNumber, ReactStepType.THOUGHT, content);
notifyCallbacks(reactStep);
}
/**
* 触发行动步骤
* @param toolName 工具名称
* @param toolAction 工具行动
* @param toolArgs 工具参数
*/
private void triggerActionStep(String toolName, String toolAction, Object toolArgs) {
int stepNumber = stepCounter.incrementAndGet();
ReactStep reactStep = new ReactStep(stepNumber, ReactStepType.ACTION, "执行工具: " + toolName);
ReactStep.ToolCallAction toolActionObj = new ReactStep.ToolCallAction(toolName, toolArgs);
reactStep.setAction(toolActionObj);
notifyCallbacks(reactStep);
}
/**
* 触发观察步骤
* @param observation 观察内容
*/
private void triggerObservationStep(String observation) {
int stepNumber = stepCounter.incrementAndGet();
ReactStep reactStep = new ReactStep(stepNumber, ReactStepType.OBSERVATION, observation);
ReactStep.ToolObservation toolObservation = new ReactStep.ToolObservation(observation);
reactStep.setObservation(toolObservation);
notifyCallbacks(reactStep);
}
/**
* 触发最终答案步骤
* @param finalAnswer 最终答案
*/
private void triggerFinalAnswerStep(String finalAnswer) {
int stepNumber = stepCounter.incrementAndGet();
ReactStep reactStep = new ReactStep(stepNumber, ReactStepType.FINAL_ANSWER, finalAnswer);
notifyCallbacks(reactStep);
}
/**
* 通知所有回调
* @param reactStep ReAct步骤
*/
private void notifyCallbacks(ReactStep reactStep) {
for (ReactCallback callback : reactCallbacks) {
try {
callback.onStep(reactStep);
} catch (Exception e) {
log.error("执行ReAct回调时发生错误", e);
}
}
}
/**
* 判断异常是否为401未授权错误
* @param e 异常对象
* @return 是否为401错误
*/
private boolean isUnauthorizedError(Throwable e) {
if (e == null) {
return false;
}
// 检查异常消息中是否包含401 Unauthorized
String message = e.getMessage();
if (message != null && (message.contains("401 Unauthorized") || message.contains("Unauthorized"))) {
return true;
}
// 递归检查cause
return isUnauthorizedError(e.getCause());
}
}
\ No newline at end of file
<template>
<div class="chat-area">
<!-- 顶部Agent选择和操作栏 -->
<div class="chat-header">
<div class="agent-selector">
<el-select v-model="selectedAgent" @change="handleAgentChange" placeholder="选择智能体" class="agent-select">
<el-option v-for="agent in agents" :key="agent.id" :label="agent.name" :value="agent.id" />
</el-select>
</div>
<div class="header-actions">
<el-tooltip content="清空对话">
<el-button @click="clearMessages" :disabled="messages.length === 0" circle>
<span>🗑️</span>
</el-button>
</el-tooltip>
</div>
</div>
<!-- 消息区域 -->
<div class="messages-container" ref="messagesContainer">
<div class="empty-state" v-if="messages.length === 0">
<div class="empty-icon">💬</div>
<div class="empty-text">选择一个智能体开始对话</div>
<div class="empty-tips">
<div>✨ 智能对话 - 支持流式输出</div>
<div>🤖 ReAct模式 - 可查看思考过程</div>
<div>📚 RAG能力 - 知识库增强</div>
</div>
</div>
<message-item
v-for="(msg, index) in messages"
:key="index"
:content="msg.content"
:is-user="msg.isUser"
:agent-name="getAgentName(msg.agentId)"
:timestamp="msg.timestamp"
:is-streaming="msg.isStreaming && index === messages.length - 1"
:is-markdown="!msg.isUser"
/>
<div v-if="isLoading" class="loading-indicator">
<el-skeleton :rows="3" animated />
</div>
</div>
<!-- 输入框区域 -->
<div class="chat-input-area">
<div class="input-container">
<el-input
v-model="inputMessage"
type="textarea"
:rows="3"
placeholder="输入消息... (支持 Shift+Enter 换行,Ctrl+Enter 发送)"
@keydown.ctrl.enter="sendMessage"
@keydown.meta.enter="sendMessage"
:disabled="!selectedAgent || isLoading"
/>
</div>
<div class="input-footer">
<div class="input-tips">
<span>Ctrl/⌘ + Enter 发送</span>
</div>
<el-button
type="primary"
@click="sendMessage"
:loading="isLoading"
:disabled="!selectedAgent || !inputMessage.trim() || isLoading"
>
发送
</el-button>
</div>
</div>
</div>
</template>
<script setup lang="ts">
import { ref, nextTick, onMounted } from 'vue'
import { ElMessage, ElMessageBox } from 'element-plus'
import MessageItem from './MessageItem.vue'
import request from '@/utils/request'
interface Message {
content: string
isUser: boolean
agentId?: string
timestamp: number
isStreaming: boolean
}
interface Agent {
id: string
name: string
[key: string]: any
}
const selectedAgent = ref<string>('')
const agents = ref<Agent[]>([])
const messages = ref<Message[]>([])
const inputMessage = ref('')
const isLoading = ref(false)
const messagesContainer = ref<HTMLElement>()
// 获取Agent列表
const loadAgents = async () => {
try {
const res = await request.get('/agent')
agents.value = res.data.data || []
if (agents.value.length > 0) {
selectedAgent.value = agents.value[0].id
}
} catch (error) {
console.error('获取Agent列表失败:', error)
ElMessage.error('获取Agent列表失败')
}
}
// 获取Agent名称
const getAgentName = (agentId?: string): string => {
if (!agentId) return 'Assistant'
const agent = agents.value.find(a => a.id === agentId)
return agent?.name || 'Assistant'
}
// 处理Agent切换
const handleAgentChange = () => {
clearMessages()
}
// 清空消息
const clearMessages = () => {
ElMessageBox.confirm('确认清空所有对话吗?此操作无法撤销。', '提示', {
confirmButtonText: '清空',
cancelButtonText: '取消',
type: 'warning'
}).then(() => {
messages.value = []
ElMessage.success('对话已清空')
}).catch(() => {
// 用户取消
})
}
// 自动滚动到底部
const scrollToBottom = async () => {
await nextTick()
if (messagesContainer.value) {
messagesContainer.value.scrollTop = messagesContainer.value.scrollHeight
}
}
// 处理SSE数据行的通用函数
const processSSELine = async (line: string, accumulatedContentRef: { value: string }, hasFinalAnswerRef: { value: boolean }, currentEventRef: { value: string }, aiMessageIndex: number) => {
if (!line.trim()) return false
if (line.startsWith('event:')) {
currentEventRef.value = line.slice(6).trim()
return false
} else if (line.startsWith('data:')) {
try {
const dataStr = line.startsWith('data: ') ? line.slice(6) : line.slice(5)
// 修复:检查dataStr是否为空或只包含空白字符
if (!dataStr.trim()) {
return false
}
const data = JSON.parse(dataStr)
const eventType = currentEventRef.value || data.type
// 根据事件类型处理数据
if (eventType === 'token') {
accumulatedContentRef.value += data.token || ''
messages.value[aiMessageIndex].content = accumulatedContentRef.value
await scrollToBottom()
} else if (eventType === 'complete') {
messages.value[aiMessageIndex].isStreaming = false
return true // 返回true表示流已完成
} else if (eventType === 'error') {
messages.value[aiMessageIndex].isStreaming = false
// 检查是否是API密钥错误的特殊提示
if (data.message && data.message.includes('请配置API密钥')) {
messages.value[aiMessageIndex].content = '[错误] 请配置API密钥'
} else {
messages.value[aiMessageIndex].content = `[错误] ${data.message || data.error}`
}
isLoading.value = false
return true
} else if (eventType === 'thinking') {
// 处理思考事件,将其发送到时间轴面板
const event = {
type: 'thought',
title: data.thinkingType === 'final_answer' ? '最终答案' : '思考过程',
content: data.content,
timestamp: data.timestamp
}
// 通过事件总线将事件发送到时间轴
window.dispatchEvent(new CustomEvent('timeline-event', { detail: event }))
// 如果是最终答案,也应该显示在主要对话框中
// 修复:确保最终答案只添加一次,避免重复显示
if (data.thinkingType === 'final_answer' && !hasFinalAnswerRef.value) {
hasFinalAnswerRef.value = true
// 修复:这里不应该再添加到accumulatedContentRef.value,因为这会在流结束后再次设置导致重复
// accumulatedContentRef.value += data.content || ''
// 直接设置消息内容为最终答案
messages.value[aiMessageIndex].content = data.content || ''
await scrollToBottom()
}
// 对于非最终答案的思考过程,不添加到主对话框中
} else if (eventType === 'tool_call' || eventType === 'embed') {
// 处理工具调用和嵌入事件,将其发送到时间轴面板
// 添加详细的调试日志,便于问题诊断
console.log(`[ChatArea] 接收${eventType}事件,原始数据:`, {
hasToolName: 'toolName' in data,
toolName: data.toolName,
hasToolInput: 'toolInput' in data,
toolInput: data.toolInput,
hasToolOutput: 'toolOutput' in data,
toolOutput: data.toolOutput,
hasContent: 'content' in data,
content: data.content,
rawData: data
});
// 构建事件标题
let title = data.title || '事件'
if (eventType === 'tool_call' && data.toolName) {
title = `调用工具: ${data.toolName}`
} else if (eventType === 'embed' && data.embedTitle) {
title = data.embedTitle
}
// 构建元数据
const metadata: Record<string, any> = data.metadata || {}
if (eventType === 'tool_call') {
if (data.toolName) metadata['工具'] = data.toolName
if (data.toolAction) metadata['操作'] = data.toolAction
if (data.toolInput) metadata['输入'] = JSON.stringify(data.toolInput).substring(0, 100)
if (data.toolOutput) metadata['输出'] = String(data.toolOutput).substring(0, 100)
if (data.toolStatus) metadata['状态'] = data.toolStatus
if (data.executionTime) metadata['耗时'] = `${data.executionTime}ms`
} else if (eventType === 'embed') {
if (data.embedUrl) metadata['URL'] = data.embedUrl
if (data.embedType) metadata['类型'] = data.embedType
}
const event = {
type: eventType,
title: title,
content: data.content,
metadata: Object.keys(metadata).length > 0 ? metadata : undefined,
toolName: data.toolName,
toolAction: data.toolAction,
toolInput: data.toolInput,
toolOutput: data.toolOutput,
toolStatus: data.toolStatus,
executionTime: data.executionTime,
embedUrl: data.embedUrl,
embedType: data.embedType,
embedTitle: data.embedTitle,
embedHtmlContent: data.embedHtmlContent,
timestamp: data.timestamp || Date.now()
}
// 添加日志记录事件构建完成
console.log(`[ChatArea] ${eventType}事件构建完成,事件对象:`, {
type: event.type,
title: event.title,
hasToolName: !!event.toolName,
hasToolInput: 'toolInput' in event,
hasToolOutput: 'toolOutput' in event,
hasContent: !!event.content,
event: event
});
// 通过事件总线将事件发送到时间轴
window.dispatchEvent(new CustomEvent('timeline-event', { detail: event }))
// 对于embed事件,还需要触发embed-event事件
if (eventType === 'embed' && data.embedUrl) {
window.dispatchEvent(new CustomEvent('embed-event', {
detail: {
url: data.embedUrl,
type: data.embedType,
title: data.embedTitle,
htmlContent: data.embedHtmlContent
}
}))
}
}
// 重置当前事件类型
currentEventRef.value = ''
} catch (err) {
console.error('解析SSE数据失败:', err, '原始行:', line)
}
return false
}
return false
}
// 发送消息
const sendMessage = async () => {
if (!selectedAgent.value || !inputMessage.value.trim()) {
return
}
const userMessage = inputMessage.value.trim()
inputMessage.value = ''
// 添加用户消息
messages.value.push({
content: userMessage,
isUser: true,
agentId: selectedAgent.value,
timestamp: Date.now(),
isStreaming: false
})
await scrollToBottom()
// 添加AI消息容器(流式接收)
const aiMessageIndex = messages.value.length
messages.value.push({
content: '',
isUser: false,
agentId: selectedAgent.value,
timestamp: Date.now(),
isStreaming: true
})
isLoading.value = true
let hasReceivedFirstToken = false // 标记是否接收到第一个token
try {
// 使用自定义的流式处理
const token = localStorage.getItem('token')
const abortController = new AbortController()
// 调用后端的流式API
const response = await fetch(
`/api/v1/agent/chat-stream?agentId=${selectedAgent.value}`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token || ''}`
},
body: JSON.stringify({ message: userMessage }),
signal: abortController.signal
}
)
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`)
}
const reader = response.body?.getReader()
if (!reader) {
throw new Error('无法读取响应体')
}
const accumulatedContentRef = { value: '' }
const hasFinalAnswerRef = { value: false }
const currentEventRef = { value: '' }
const decoder = new TextDecoder()
let buffer = ''
let isStreamComplete = false // 标记流是否已完成
while (true) {
if (isStreamComplete) break // 如果已收到complete事件,停止读取
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() || ''
for (const line of lines) {
const isComplete = await processSSELine(line, accumulatedContentRef, hasFinalAnswerRef, currentEventRef, aiMessageIndex)
if (isComplete) {
isStreamComplete = true
}
// 在接收到第一个token时,立即隐藏Skeleton加载动画
if (!hasReceivedFirstToken && (line.includes('"type":"token"') || currentEventRef.value === 'token')) {
isLoading.value = false
hasReceivedFirstToken = true
}
}
}
// 处理残余的数据
if (buffer && !isStreamComplete) {
const lines = buffer.split('\n')
for (const line of lines) {
const isComplete = await processSSELine(line, accumulatedContentRef, hasFinalAnswerRef, currentEventRef, aiMessageIndex)
if (isComplete) {
isStreamComplete = true
}
// 在接收到第一个token时,立即隐藏Skeleton加载动画
if (!hasReceivedFirstToken && (line.includes('"type":"token"') || currentEventRef.value === 'token')) {
isLoading.value = false
hasReceivedFirstToken = true
}
}
}
// 修复:只有在没有最终答案的情况下才更新消息内容
// 如果已经有最终答案,就不需要再更新消息内容了,避免重复显示
if (!hasFinalAnswerRef.value) {
messages.value[aiMessageIndex].content = accumulatedContentRef.value
}
// 确保最终状态正确
messages.value[aiMessageIndex].isStreaming = false
// 只在未接收到token的情况下设置isLoading为false(比如error或直接complete)
if (!hasReceivedFirstToken) {
isLoading.value = false
}
} catch (error: any) {
console.error('发送消息失败:', error)
// 判断是否是网络错误
let errorMessage = '[错误] '
// 检查是否是API密钥错误的特殊提示
if (error.message && error.message.includes('请配置API密钥')) {
errorMessage = '[错误] 请配置API密钥'
} else if (error instanceof TypeError) {
errorMessage += '网络获取失败,请检查你的网络连接'
} else if (error.name === 'AbortError') {
errorMessage += '请求已取消'
} else if (error.message) {
errorMessage += error.message
} else {
errorMessage += '一个未知错误发生'
}
messages.value[aiMessageIndex].content = errorMessage
messages.value[aiMessageIndex].isStreaming = false
isLoading.value = false
}
}
onMounted(() => {
loadAgents()
})
</script>
<style scoped>
.chat-area {
display: flex;
flex-direction: column;
height: 100%;
background-color: var(--bg-primary);
border-right: 1px solid var(--border-color);
min-height: 0; /* 允许容器收缩 */
}
.chat-header {
display: flex;
align-items: center;
justify-content: space-between;
padding: var(--spacing-4);
border-bottom: 1px solid var(--border-color);
background-color: var(--bg-secondary);
flex-shrink: 0;
}
.agent-selector {
flex: 1;
}
.agent-select {
width: 100%;
}
.header-actions {
display: flex;
gap: var(--spacing-2);
margin-left: var(--spacing-3);
}
.messages-container {
flex: 1;
overflow-y: auto;
padding: var(--spacing-4);
display: flex;
flex-direction: column;
gap: var(--spacing-3);
min-height: 0; /* 允许容器收缩 */
}
.empty-state {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100%;
color: var(--text-tertiary);
text-align: center;
}
.empty-icon {
font-size: 48px;
margin-bottom: var(--spacing-4);
}
.empty-text {
font-size: var(--font-size-lg);
font-weight: var(--font-weight-semibold);
color: var(--text-secondary);
margin-bottom: var(--spacing-3);
}
.empty-tips {
display: flex;
flex-direction: column;
gap: var(--spacing-2);
font-size: var(--font-size-sm);
color: var(--text-tertiary);
}
.loading-indicator {
padding: var(--spacing-2);
}
.chat-input-area {
padding: var(--spacing-4);
border-top: 1px solid var(--border-color);
background-color: var(--bg-secondary);
flex-shrink: 0;
}
.input-container {
margin-bottom: var(--spacing-3);
}
.el-input {
border-radius: var(--border-radius-lg);
}
.input-footer {
display: flex;
align-items: center;
justify-content: space-between;
}
.input-tips {
font-size: var(--font-size-xs);
color: var(--text-tertiary);
}
/* 消息容器滚动样式 */
.messages-container::-webkit-scrollbar {
width: 6px;
}
.messages-container::-webkit-scrollbar-track {
background: transparent;
}
.messages-container::-webkit-scrollbar-thumb {
background: var(--gray-300);
border-radius: 3px;
}
.messages-container::-webkit-scrollbar-thumb:hover {
background: var(--gray-400);
}
/* 响应式设计 */
@media (max-width: 768px) {
.chat-header {
padding: var(--spacing-3);
}
.messages-container {
padding: var(--spacing-3);
gap: var(--spacing-2);
}
.chat-input-area {
padding: var(--spacing-3);
}
.header-actions {
margin-left: var(--spacing-2);
}
}
</style>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment