Commit cff82ce8 authored by ligaowei's avatar ligaowei

feat: 重构ReAct执行流程并优化事件处理

refactor(react): 合并EventSplitter到DefaultReactCallback简化流程
feat(timeline): 添加params字段支持工具参数展示
fix(WorkArea): 设置el-tabs的lazy属性为false
perf(useContentExpansion): 优化内容展开状态管理
style(EmailTools): 更新邮件工具参数描述和默认值
docs: 更新ReAct系统提示词和onComplete分析报告
parent 901b31c3
package pangea.hiagent.agent.processor;
import pangea.hiagent.agent.service.SseTokenEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.dto.AgentRequest;
......@@ -29,7 +30,7 @@ public interface AgentProcessor {
* @param userId 用户ID
* @param tokenConsumer token处理回调函数
*/
void processStreamRequest(AgentRequest request, Agent agent, String userId, Consumer<String> tokenConsumer);
void processStreamRequest(AgentRequest request, Agent agent, String userId, SseTokenEmitter tokenConsumer);
/**
* 获取处理器类型
......
......@@ -392,6 +392,9 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
}
// 发送完成事件,包含完整内容
// 符合onComplete设计原则:在所有通讯操作最终完成后执行
// 触发条件:流式响应处理完成,所有token都已处理完毕
// 通讯流程位置:处理流程的最终阶段,确保客户端收到完整的响应内容
try {
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
......@@ -455,14 +458,6 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
if (tokenConsumer != null) {
// 对于流式处理,我们需要将RAG响应作为token发送
tokenConsumer.accept(ragResponse);
// 发送完成信号
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(ragResponse);
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
}
}
}
return ragResponse;
}
......
......@@ -11,6 +11,8 @@ import pangea.hiagent.rag.RagService;
import pangea.hiagent.web.dto.AgentRequest;
import java.util.function.Consumer;
import pangea.hiagent.agent.service.SseTokenEmitter;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
/**
......@@ -73,7 +75,7 @@ public class NormalAgentProcessor extends BaseAgentProcessor {
}
@Override
public void processStreamRequest(AgentRequest request, Agent agent, String userId, Consumer<String> tokenConsumer) {
public void processStreamRequest(AgentRequest request, Agent agent, String userId, SseTokenEmitter tokenConsumer) {
try {
log.info("使用普通Agent处理流式请求");
......@@ -104,6 +106,9 @@ public class NormalAgentProcessor extends BaseAgentProcessor {
} catch (Exception e) {
errorHandlerService.handleStreamError(e, tokenConsumer, "普通Agent流式处理失败");
// 直接调用完成回调,不依赖AgentErrorHandler
// 符合onComplete设计原则:在通讯操作失败后执行的最终操作
// 触发条件:处理请求时发生异常
// 通讯流程位置:异常处理流程的最终阶段,确保客户端收到完整的错误信息
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete("处理请求时发生错误: " + e.getMessage());
......@@ -124,6 +129,9 @@ public class NormalAgentProcessor extends BaseAgentProcessor {
// 发送错误信息
errorHandlerService.sendErrorMessage(tokenConsumer, errorMessage);
// 确保在异常情况下也调用完成回调
// 符合onComplete设计原则:在通讯操作失败后执行的最终操作
// 触发条件:当前模型不支持流式输出
// 通讯流程位置:错误处理流程的最终阶段,确保客户端收到完整的错误信息
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
......
......@@ -15,6 +15,8 @@ import pangea.hiagent.web.service.AgentService;
import java.util.List;
import java.util.function.Consumer;
import pangea.hiagent.agent.service.SseTokenEmitter;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
/**
......@@ -49,7 +51,7 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
}
@Override
public void processStreamRequest(AgentRequest request, Agent agent, String userId, Consumer<String> tokenConsumer) {
public void processStreamRequest(AgentRequest request, Agent agent, String userId, SseTokenEmitter tokenConsumer) {
log.info("使用ReAct Agent处理流式请求");
processRequestStreamInternal(agent, request.getUserMessage(), tokenConsumer, userId);
}
......@@ -102,7 +104,7 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
* @param tokenConsumer token消费者
* @param userId 用户ID(可选)
*/
private void processRequestStreamInternal(Agent agent, String userMessage, Consumer<String> tokenConsumer, String userId) {
private void processRequestStreamInternal(Agent agent, String userMessage, SseTokenEmitter tokenConsumer, String userId) {
log.info("开始流式处理ReAct Agent请求,Agent ID: {}, 用户消息: {}", agent != null ? agent.getId() : "null", userMessage);
try {
......@@ -133,6 +135,9 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
} catch (Exception e) {
errorHandlerService.handleStreamError(e, tokenConsumer, "流式处理ReAct请求时发生错误");
// 直接调用完成回调,不依赖AgentErrorHandler
// 符合onComplete设计原则:在通讯操作失败后执行的最终操作
// 触发条件:处理ReAct请求时发生异常
// 通讯流程位置:异常处理流程的最终阶段,确保客户端收到完整的错误信息
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete("处理请求时发生错误: " + e.getMessage());
......@@ -153,6 +158,9 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
// 发送错误信息
errorHandlerService.sendErrorMessage(tokenConsumer, errorMessage);
// 确保在异常情况下也调用完成回调
// 符合onComplete设计原则:在通讯操作失败后执行的最终操作
// 触发条件:无法获取Agent的聊天模型
// 通讯流程位置:错误处理流程的最终阶段,确保客户端收到完整的错误信息
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
......
package pangea.hiagent.agent.react;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -19,8 +23,20 @@ public class DefaultReactCallback implements ReactCallback {
@Autowired
private UserSseService userSseService;
@Override
public void onStep(ReactStep reactStep) {
// EventSplitter functionality integrated directly
private final List<String> keywords = Arrays.asList(
"Thought", "Action", "Observation", "Final_Answer");
private final Pattern keywordPattern = Pattern.compile(
String.format("(?i)(Thought|Action|Observation|Final[ _]Answer):", String.join("|", keywords)),
Pattern.CASE_INSENSITIVE);
private String currentType = null;
private StringBuilder currentContent = new StringBuilder();
private StringBuilder buffer = new StringBuilder();
private volatile int stepNumber = 0;
private void onStep(ReactStep reactStep) {
String reactStepName = reactStep.getStepType().name();
......@@ -44,4 +60,57 @@ public class DefaultReactCallback implements ReactCallback {
reactStep.getContent().substring(0, Math.min(100, reactStep.getContent().length())));
}
// 每收到一个token/字符,调用此方法
@Override
public void onToken(String token) {
buffer.append(token);
// log.debug("当前缓冲区: {}", buffer.toString());
Matcher matcher = keywordPattern.matcher(buffer);
while (matcher.find()) {
log.debug("发现新事件关键词: {}", matcher.group(1));
// 发现新事件
if (currentType != null && currentContent.length() > 0) {
// 实时输出已分割事件
onStep(new ReactStep(stepNumber++, ReactStepType.fromString(currentType), currentContent.toString()));
}
// 更新事件类型
currentType = matcher.group(1);
currentContent.setLength(0);
// 累积匹配位置后的内容
currentContent.append(buffer.substring(matcher.end()));
// 重置buffer为剩余内容
buffer.setLength(0);
buffer.append(currentContent);
// 重新查找
matcher = keywordPattern.matcher(buffer);
}
// 检查是否有部分关键词在buffer末尾
if (buffer.length() > 0) {
// 检查是否可能是关键词的一部分
boolean isPartialKeyword = false;
String bufferStr = buffer.toString();
for (String keyword : keywords) {
if (keyword.startsWith(bufferStr) || bufferStr.startsWith(keyword)) {
isPartialKeyword = true;
break;
}
}
if (!isPartialKeyword) {
// 不是部分关键词,添加到内容中
currentContent.append(buffer);
buffer.setLength(0);
}
}
}
// 流式结束时,调用此方法输出最后一个事件
@Override
public void endStream() {
if (currentType != null && currentContent.length() > 0) {
onStep(new ReactStep(stepNumber++, ReactStepType.fromString(currentType), currentContent.toString()));
}
}
}
\ No newline at end of file
......@@ -8,6 +8,7 @@ import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import pangea.hiagent.agent.service.ErrorHandlerService;
import pangea.hiagent.agent.service.SseTokenEmitter;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
import pangea.hiagent.memory.MemoryService;
import pangea.hiagent.model.Agent;
......@@ -23,71 +24,70 @@ import java.util.function.Consumer;
@Slf4j
@Service
public class DefaultReactExecutor implements ReactExecutor {
@Value("${hiagent.react.system-prompt}")
private String defaultSystemPrompt;
private final List<ReactCallback> reactCallbacks = new ArrayList<>();
private final EventSplitter eventSplitter;
private final ReactCallback reactCallback;
private MemoryService memoryService;
private ErrorHandlerService errorHandlerService;
private final AgentToolManager agentToolManager;
public DefaultReactExecutor(EventSplitter eventSplitter, AgentToolManager agentToolManager ,
MemoryService memoryService, ErrorHandlerService errorHandlerService) {
this.eventSplitter = eventSplitter;
public DefaultReactExecutor(ReactCallback reactCallback, AgentToolManager agentToolManager,
MemoryService memoryService, ErrorHandlerService errorHandlerService) {
this.reactCallback = reactCallback;
this.agentToolManager = agentToolManager;
this.memoryService = memoryService;
this.memoryService = memoryService;
this.errorHandlerService = errorHandlerService;
}
@Override
public void addReactCallback(ReactCallback callback) {
if (callback != null) {
reactCallbacks.add(callback);
}
}
@Override
public String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent) {
// 调用带用户ID的方法,首先尝试获取当前用户ID
String userId = UserUtils.getCurrentUserIdStatic();
return execute(chatClient, userInput, tools, agent, userId);
}
@Override
public String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent, String userId) {
log.info("开始执行ReAct流程,用户输入: {}", userInput);
List<Object> agentTools = getAgentTools(agent);
try {
Prompt prompt = buildPromptWithHistory(defaultSystemPrompt, userInput, agent, userId);
ChatResponse response = chatClient.prompt(prompt)
.tools(agentTools.toArray())
.call()
.chatResponse();
String responseText = response.getResult().getOutput().getText();
log.info("最终答案: {}", responseText);
// 保存助手回复到内存,使用提供的用户ID
saveAssistantResponseToMemory(agent, responseText, userId);
return responseText;
} catch (Exception e) {
log.error("执行ReAct流程时发生错误", e);
return handleReActError(e);
}
}
/**
* 处理ReAct执行过程中发生的错误
*
......@@ -98,21 +98,21 @@ public class DefaultReactExecutor implements ReactExecutor {
log.error("ReAct执行过程中发生错误", e);
return errorHandlerService.handleSyncError(e, "处理ReAct请求时发生错误");
}
/**
* 构建带有历史记录的提示词
*
* @param systemPrompt 系统提示词
* @param userInput 用户输入
* @param agent 智能体对象
* @param userId 用户ID(可选,如果为null则自动获取)
* @param userInput 用户输入
* @param agent 智能体对象
* @param userId 用户ID(可选,如果为null则自动获取)
* @return 构建好的提示词对象
*/
private Prompt buildPromptWithHistory(String systemPrompt, String userInput, Agent agent, String userId) {
List<org.springframework.ai.chat.messages.Message> messages = new ArrayList<>();
messages.add(new SystemMessage(systemPrompt));
if (agent != null) {
try {
// 如果没有提供用户ID,则尝试获取当前用户ID
......@@ -120,112 +120,120 @@ public class DefaultReactExecutor implements ReactExecutor {
userId = UserUtils.getCurrentUserIdStatic();
}
String sessionId = memoryService.generateSessionId(agent, userId);
int historyLength = agent.getHistoryLength() != null ? agent.getHistoryLength() : 10;
List<org.springframework.ai.chat.messages.Message> historyMessages =
memoryService.getHistoryMessages(sessionId, historyLength);
List<org.springframework.ai.chat.messages.Message> historyMessages = memoryService
.getHistoryMessages(sessionId, historyLength);
messages.addAll(historyMessages);
memoryService.addUserMessageToMemory(sessionId, userInput);
} catch (Exception e) {
log.warn("获取历史对话记录时发生错误: {}", e.getMessage());
}
}
messages.add(new UserMessage(userInput));
return new Prompt(messages);
}
@Override
public void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent) {
public void executeStream(ChatClient chatClient, String userInput, List<Object> tools,
SseTokenEmitter tokenConsumer, Agent agent) {
// 调用带用户ID的方法,但首先尝试获取当前用户ID
String userId = UserUtils.getCurrentUserIdStatic();
executeStream(chatClient, userInput, tools, tokenConsumer, agent, userId);
}
@Override
public void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent, String userId) {
public void executeStream(ChatClient chatClient, String userInput, List<Object> tools,
SseTokenEmitter tokenConsumer, Agent agent, String userId) {
log.info("使用stream()方法处理ReAct流程,支持真正的流式输出");
List<Object> agentTools = getAgentTools(agent);
StringBuilder fullResponse = new StringBuilder();
try {
Prompt prompt = buildPromptWithHistory(defaultSystemPrompt, userInput, agent, userId);
chatClient.prompt(prompt)
.tools(agentTools.toArray())
.stream()
.chatResponse()
.subscribe(
chatResponse -> handleTokenResponse(chatResponse, tokenConsumer, fullResponse),
throwable -> handleStreamError(throwable, tokenConsumer),
() -> handleStreamCompletion(tokenConsumer, fullResponse, agent, userId)
);
} catch (Exception e) {
chatResponse -> handleTokenResponse(chatResponse, tokenConsumer, fullResponse),
throwable -> handleStreamError(throwable, tokenConsumer),
() -> handleStreamCompletion(tokenConsumer, fullResponse, agent, userId));
} catch (Exception e) {
log.error("流式执行ReAct流程时发生错误", e);
errorHandlerService.handleReactFlowError(e, tokenConsumer);
}
}
/**
* 处理流式响应中的单个token
*
* @param chatResponse 聊天响应对象
* @param chatResponse 聊天响应对象
* @param tokenConsumer token消费者
* @param fullResponse 完整响应构建器
* @param fullResponse 完整响应构建器
*/
private void handleTokenResponse(org.springframework.ai.chat.model.ChatResponse chatResponse, Consumer<String> tokenConsumer, StringBuilder fullResponse) {
private void handleTokenResponse(org.springframework.ai.chat.model.ChatResponse chatResponse,
SseTokenEmitter tokenConsumer, StringBuilder fullResponse) {
try {
String token = chatResponse.getResult().getOutput().getText();
if (isValidToken(token)) {
fullResponse.append(token);
if (tokenConsumer != null) {
tokenConsumer.accept(token);
}
eventSplitter.feedToken(token);
reactCallback.onToken(token);
}
} catch (Exception e) {
log.error("处理token时发生错误", e);
errorHandlerService.handleReactFlowError(e, tokenConsumer);
}
}
/**
* 处理流式响应完成事件
*
* @param tokenConsumer token消费者
* @param fullResponse 完整响应内容
* @param agent 智能体对象
* @param userId 用户ID
* @param fullResponse 完整响应内容
* @param agent 智能体对象
* @param userId 用户ID
*/
private void handleStreamCompletion(Consumer<String> tokenConsumer, StringBuilder fullResponse, Agent agent, String userId) {
private void handleStreamCompletion(SseTokenEmitter tokenConsumer, StringBuilder fullResponse, Agent agent,
String userId) {
try {
log.info("流式处理完成");
reactCallback.endStream();
String responseStr = fullResponse.toString();
saveAssistantResponseToMemory(agent, responseStr, userId);
sendCompletionEvent(tokenConsumer, responseStr);
// 发送完成事件,触发对话历史保存到数据库
tokenConsumer.onComplete(responseStr);
} catch (Exception e) {
log.error("处理流式完成回调时发生错误", e);
handleCompletionError(tokenConsumer, e);
}
}
/**
* 将助手的回复保存到内存中
*
* @param agent 智能体对象
* @param agent 智能体对象
* @param response 助手的回复内容
* @param userId 用户ID
* @param userId 用户ID
*/
private void saveAssistantResponseToMemory(Agent agent, String response, String userId) {
if (agent != null) {
......@@ -237,18 +245,21 @@ public class DefaultReactExecutor implements ReactExecutor {
}
}
}
/**
* 处理完成事件时发生的错误
*
* @param tokenConsumer token消费者
* @param e 发生的异常
* @param e 发生的异常
*/
private void handleCompletionError(Consumer<String> tokenConsumer, Exception e) {
private void handleCompletionError(SseTokenEmitter tokenConsumer, Exception e) {
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
String errorId = errorHandlerService.generateErrorId();
String fullErrorMessage = errorHandlerService.buildFullErrorMessage("处理完成时发生错误", e, errorId, "ReAct");
// 符合onComplete设计原则:在通讯操作失败后执行的最终操作
// 触发条件:处理完成事件时发生异常
// 通讯流程位置:错误处理流程的最终阶段,确保客户端收到完整的错误信息
((TokenConsumerWithCompletion) tokenConsumer).onComplete("[" + errorId + "] " + fullErrorMessage);
} catch (NoClassDefFoundError ex) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", ex.getMessage());
......@@ -257,7 +268,7 @@ public class DefaultReactExecutor implements ReactExecutor {
}
}
}
/**
* 验证token是否有效
*
......@@ -267,49 +278,53 @@ public class DefaultReactExecutor implements ReactExecutor {
private boolean isValidToken(String token) {
return token != null && !token.isEmpty();
}
/**
* 处理流式响应中的错误
*
* @param throwable 异常对象
* @param throwable 异常对象
* @param tokenConsumer token消费者
*/
private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer) {
private void handleStreamError(Throwable throwable, SseTokenEmitter tokenConsumer) {
errorHandlerService.handleStreamError(throwable, tokenConsumer, "ReAct流式处理");
}
/**
* 发送完成事件
*
* @param tokenConsumer token消费者
* @param fullResponse 完整响应内容
*/
private void sendCompletionEvent(Consumer<String> tokenConsumer, String fullResponse) {
if (fullResponse == null) {
fullResponse = "";
}
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullResponse);
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
// 如果类未找到,至少发送一个空消息以确保流的完整性
if (tokenConsumer != null) {
try {
tokenConsumer.accept("");
} catch (Exception ex) {
log.error("发送空消息也失败", ex);
}
}
} catch (Exception e) {
log.error("调用onComplete时发生错误", e);
}
} else if (tokenConsumer != null) {
tokenConsumer.accept("");
}
}
// /**
// * 发送完成事件
// *
// * @param tokenConsumer token消费者
// * @param fullResponse 完整响应内容
// */
// private void sendCompletionEvent(SseTokenEmitter tokenConsumer, String
// fullResponse) {
// if (fullResponse == null) {
// fullResponse = "";
// }
// if (tokenConsumer instanceof TokenConsumerWithCompletion) {
// try {
// // 符合onComplete设计原则:在所有通讯操作最终完成后执行
// // 触发条件:ReAct执行流程完成,需要发送完整响应
// // 通讯流程位置:执行流程的最终阶段,确保客户端收到完整的响应内容
// ((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullResponse);
// } catch (NoClassDefFoundError e) {
// log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
// // 如果类未找到,至少发送一个空消息以确保流的完整性
// if (tokenConsumer != null) {
// try {
// tokenConsumer.accept("");
// } catch (Exception ex) {
// log.error("发送空消息也失败", ex);
// }
// }
// } catch (Exception e) {
// log.error("调用onComplete时发生错误", e);
// }
// } else if (tokenConsumer != null) {
// tokenConsumer.accept("");
// }
// }
/**
* 获取智能体工具
*
......@@ -318,7 +333,7 @@ public class DefaultReactExecutor implements ReactExecutor {
*/
private List<Object> getAgentTools(Agent agent) {
List<Object> tools = new ArrayList<>();
if (agent != null) {
try {
tools = agentToolManager.getAvailableToolInstances(agent);
......@@ -327,7 +342,7 @@ public class DefaultReactExecutor implements ReactExecutor {
// 发生异常时,tools 保持为空列表
}
}
return tools;
}
}
\ No newline at end of file
package pangea.hiagent.agent.react;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class EventSplitter {
private final List<String> keywords = Arrays.asList(
"Thought", "Action", "Observation", "Final_Answer"
);
private final Pattern keywordPattern = Pattern.compile(
String.format("(?i)(Thought|Action|Observation|Final[ _]Answer):", String.join("|", keywords)), Pattern.CASE_INSENSITIVE
);
private String currentType = null;
private StringBuilder currentContent = new StringBuilder();
private StringBuilder buffer = new StringBuilder();
private final ReactCallback callback;
private volatile int stepNumber = 0;
public EventSplitter(ReactCallback callback) {
this.callback = callback;
}
// 每收到一个token/字符,调用此方法
public void feedToken(String token) {
buffer.append(token);
// log.debug("当前缓冲区: {}", buffer.toString());
Matcher matcher = keywordPattern.matcher(buffer);
while (matcher.find()) {
log.debug("发现新事件关键词: {}", matcher.group(1));
// 发现新事件
if (currentType != null && currentContent.length() > 0) {
// 实时输出已分割事件
callback.onStep(new ReactStep(stepNumber++, ReactStepType.fromString(currentType), currentContent.toString()));
}
// 更新事件类型
currentType = matcher.group(1);
currentContent.setLength(0);
// 累积匹配位置后的内容
currentContent.append(buffer.substring(matcher.end()));
// 重置buffer为剩余内容
buffer.setLength(0);
buffer.append(currentContent);
// 重新查找
matcher = keywordPattern.matcher(buffer);
}
// 检查是否有部分关键词在buffer末尾
if (buffer.length() > 0) {
// 检查是否可能是关键词的一部分
boolean isPartialKeyword = false;
String bufferStr = buffer.toString();
for (String keyword : keywords) {
if (keyword.startsWith(bufferStr) || bufferStr.startsWith(keyword)) {
isPartialKeyword = true;
break;
}
}
if (!isPartialKeyword) {
// 不是部分关键词,添加到内容中
currentContent.append(buffer);
buffer.setLength(0);
}
}
}
// 流式结束时,调用此方法输出最后一个事件
public void endStream(ReactCallback tokenConsumer) {
if (currentType != null && currentContent.length() > 0) {
callback.onStep(new ReactStep(stepNumber++, ReactStepType.fromString(currentType), currentContent.toString()));
}
}
}
......@@ -7,7 +7,12 @@ public interface ReactCallback {
/**
* ReAct每执行一个步骤,该方法会被触发
* @param reactStep ReAct步骤对象,包含步骤的所有核心信息
* @param token ReAct步骤对象,包含步骤的所有核心信息
*/
void onStep(ReactStep reactStep);
void onToken(String token);
/**
* 流式结束时,调用此方法输出最后一个事件
*/
void endStream();
}
\ No newline at end of file
package pangea.hiagent.agent.react;
import org.springframework.ai.chat.client.ChatClient;
import pangea.hiagent.agent.service.SseTokenEmitter;
import pangea.hiagent.model.Agent;
import java.util.List;
import java.util.function.Consumer;
......@@ -40,7 +42,7 @@ public interface ReactExecutor {
* @param agent Agent对象
* @param userId 用户ID
*/
void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent, String userId);
void executeStream(ChatClient chatClient, String userInput, List<Object> tools, SseTokenEmitter tokenConsumer, Agent agent, String userId);
/**
* 流式执行ReAct流程(旧方法,保持向后兼容)
......@@ -50,7 +52,7 @@ public interface ReactExecutor {
* @param tokenConsumer token处理回调函数
* @param agent Agent对象
*/
void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent);
void executeStream(ChatClient chatClient, String userInput, List<Object> tools, SseTokenEmitter tokenConsumer, Agent agent);
/**
* 添加ReAct回调
......
......@@ -145,7 +145,7 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
public void closeEmitter() {
try {
if (emitter != null && !userSseService.isEmitterCompleted(emitter)) {
// emitter.complete();
emitter.complete();
log.debug("SSE连接已关闭");
}
} catch (Exception ex) {
......
......@@ -600,7 +600,7 @@ public class UserSseService {
Map<String, Object> data = mapPoolService.acquireMap();
// 设置基础属性
data.put("eventType", event.getType());
data.put("type", event.getType());
data.put("timestamp", event.getTimestamp());
data.put("title", event.getTitle());
data.put("content", event.getContent());
......
......@@ -30,16 +30,16 @@ public class EmailTools {
// 邮件请求参数类
@JsonClassDescription("邮件操作请求参数")
public record EmailRequest(
@JsonProperty(required = true, value = "host")
@JsonPropertyDescription("POP3服务器地址")
@JsonProperty(value = "host")
@JsonPropertyDescription("POP3服务器地址,默认pop3.hisense.com")
String host,
@JsonProperty(value = "port")
@JsonPropertyDescription("POP3服务器端口,默认995")
Integer port,
@JsonProperty(required = true, value = "username")
@JsonPropertyDescription("邮箱用户名")
@JsonProperty(value = "username")
@JsonPropertyDescription("邮箱用户名,默认是ligaowei")
String username,
@JsonProperty(required = true, value = "password")
......@@ -58,6 +58,11 @@ public class EmailTools {
public Integer port() {
return port != null ? port : 995;
}
// 默认主机为配置值
public String host() {
return host != null ? host : "pop3.hisense.com";
}
}
// 邮件基本信息响应类
......
......@@ -11,33 +11,37 @@ import org.springframework.http.ResponseEntity;
// 天气API响应数据结构
class WeatherApiResponse {
public String message;
public int status;
public String date;
public String time;
public CityInfo cityInfo;
public WeatherData data;
public Result[] results;
static class CityInfo {
public String city;
public String citykey;
public String parent;
public String updateTime;
static class Result {
public Location location;
public Now now;
public String last_update;
}
static class WeatherData {
public String shidu; // 湿度
public String wendu; // 温度
public String pm25;
public String quality;
public Forecast[] forecast;
static class Forecast {
public String date;
public String high;
public String low;
public String type; // 天气状况
}
static class Location {
public String id;
public String name;
public String country;
public String path;
public String timezone;
public String timezone_offset;
}
static class Now {
public String text; // 天气现象文字
public String code; // 天气现象代码
public String temperature; // 温度
public String feels_like; // 体感温度
public String pressure; // 气压
public String humidity; // 相对湿度
public String visibility; // 能见度
public String wind_direction; // 风向文字
public String wind_direction_degree; // 风向角度
public String wind_speed; // 风速
public String wind_scale; // 风力等级
public String clouds; // 云量
public String dew_point; // 露点温度
}
}
......@@ -55,43 +59,81 @@ public class WeatherFunction {
this.restTemplate = restTemplate;
}
@JsonClassDescription("获取指定城市的天气信息")
// API密钥常量
private static final String API_KEY = "SNCqjZiDAIPXdhE_O";
@JsonClassDescription("获取指定位置的天气信息")
public record Request(
@JsonProperty(required = true, value = "city")
@JsonPropertyDescription("城市名称")
String city
@JsonProperty(required = true, value = "location")
@JsonPropertyDescription("位置名称")
String location,
@JsonProperty(required = false, value = "language")
@JsonPropertyDescription("语言,默认值:zh-Hans")
String language,
@JsonProperty(required = false, value = "unit")
@JsonPropertyDescription("单位,默认值:c")
String unit
) {}
@JsonClassDescription("天气信息响应")
public record Response(
@JsonPropertyDescription("位置名称") String locationName,
@JsonPropertyDescription("温度") String temperature,
@JsonPropertyDescription("体感温度") String feelsLike,
@JsonPropertyDescription("湿度") String humidity,
@JsonPropertyDescription("天气状况") String condition
@JsonPropertyDescription("天气状况") String condition,
@JsonPropertyDescription("风向") String windDirection,
@JsonPropertyDescription("风力等级") String windScale,
@JsonPropertyDescription("气压") String pressure,
@JsonPropertyDescription("能见度") String visibility,
@JsonPropertyDescription("数据更新时间") String lastUpdate
) {}
@Tool(description = "获取指定城市的天气信息")
@Tool(description = "获取指定位置的天气信息")
public Response getWeather(Request request) {
log.debug("查询城市天气信息: {}", request.city);
log.debug("查询天气信息,位置: {}, 语言: {}, 单位: {}", request.location, request.language, request.unit);
try {
// 注意:这里使用固定的城市代码(天津)进行演示,实际应用中需要根据城市名称查找对应的城市代码
String url = "http://t.weather.sojson.com/api/weather/city/101030100";
// 构建API请求URL
String baseUrl = "https://api.seniverse.com/v3/weather/now.json";
// 设置默认参数
String language = request.language != null ? request.language : "zh-Hans";
String unit = request.unit != null ? request.unit : "c";
// 构建完整URL
String url = String.format("%s?key=%s&location=%s&language=%s&unit=%s",
baseUrl, API_KEY, request.location, language, unit);
ResponseEntity<WeatherApiResponse> responseEntity = restTemplate.getForEntity(url, WeatherApiResponse.class);
if (responseEntity.getStatusCode().is2xxSuccessful() && responseEntity.getBody() != null) {
WeatherApiResponse apiResponse = responseEntity.getBody();
if ("success".equals(apiResponse.message) && apiResponse.data != null) {
String temperature = apiResponse.data.wendu + "°C";
String humidity = apiResponse.data.shidu;
String condition = apiResponse.data.forecast != null && apiResponse.data.forecast.length > 0 ?
apiResponse.data.forecast[0].type : "未知";
if (apiResponse.results != null && apiResponse.results.length > 0) {
WeatherApiResponse.Result result = apiResponse.results[0];
WeatherApiResponse.Now now = result.now;
// 构建响应数据
Response response = new Response(
result.location.name,
now.temperature + "°" + ("c".equals(unit) ? "C" : "F"),
now.feels_like + "°" + ("c".equals(unit) ? "C" : "F"),
now.humidity + "%",
now.text,
now.wind_direction,
now.wind_scale + "级",
now.pressure + "mb",
now.visibility + "km",
result.last_update
);
Response response = new Response(temperature, humidity, condition);
log.debug("天气查询结果: 温度={}, 湿度={}, 天气状况={}", response.temperature, response.humidity, response.condition);
log.debug("天气查询结果: {}", response);
return response;
} else {
log.error("天气API返回错误信息: {}", apiResponse.message);
log.error("天气API返回结果为空");
}
} else {
log.error("天气API调用失败,HTTP状态码: {}", responseEntity.getStatusCode());
......@@ -101,8 +143,19 @@ public class WeatherFunction {
}
// 如果API调用失败,返回默认值
Response response = new Response("22°C", "65%", "晴天");
log.debug("天气查询结果(默认值): 温度={}, 湿度={}, 天气状况={}", response.temperature, response.humidity, response.condition);
Response response = new Response(
request.location,
"",
"",
"",
"",
"",
"",
"",
"",
""
);
log.debug("天气查询结果(默认值): {}", response);
return response;
}
}
\ No newline at end of file
......@@ -231,37 +231,31 @@ hiagent:
# ReAct配置
react:
system-prompt: >
You are a Spring AI tool orchestration assistant. Your TOP PRIORITY: ALWAYS CALL TOOLS FIRST, answer EXCLUSIVELY based on tool results.
=== CORE RULES ===
1. Tool-First Mandate: For any non-trivial query, EXECUTE RELEVANT TOOLS, never just describe them. Only use internal knowledge for simple common sense.
2. Result-Based Answers: All conclusions must come directly from tool execution results. Never fabricate data.
3. Multi-Tool Support: Call multiple tools in sequence where one tool's output feeds into the next.
4. Iterative Loop: If results are incomplete, re-analyze, adjust tools, and repeat until satisfactory.
5. Complex Queries: Use multiple tools for complex tasks; avoid single-tool reliance.
=== REACT PROCESS ===
Cyclic process for every query, execute in order until complete:
- Step 1 - THOUGHT: Analyze the query, break into sub-tasks, select relevant tools with alternatives, define execution sequence.
- Step 2 - ACTION: EXECUTE TOOLS DIRECTLY, NEVER JUST DESCRIBE THEM. Call specific tools in planned order, execute multiple if needed, use alternatives if a tool fails.
- Step 3 - OBSERVATION: Analyze all tool results, extract key insights, check completeness. If results are complete → Proceed to Final Answer; if incomplete → Return to Thought.
- Step 4 - FINAL ANSWER: Synthesize tool results into a clear, complete answer. Explain tool synergy if helpful. Keep it conversational.
=== RESPONSE FORMAT ===
Strictly follow this structure:
1. Thought: Problem analysis, tool selection, execution sequence
2. Action: Actual tool calls (not descriptions)
3. Observation: Key results summary, decision (terminate/restart)
4. Final_Answer: Result-based answer
=== HARD RULES ===
- Execute tools first, never just describe them
- Only use tool results for answers
- Use multiple tools for complex queries
- Support serial tool chaining
- Iterate until results are complete
- Follow Spring AI framework rules
### 角色
你是专业ReAct智能体,基于Spring AI框架执行任务,具备Thought→Action→Observation→Final_Answer的闭环能力。
### 核心规则
1. Thought:分析用户需求,判断是否需要调用工具,明确工具调用的目的和参数。
2. Action:仅调用已授权的工具,严格遵循工具入参格式,单次可调用单/多工具。
3. Observation:接收工具返回结果,校验数据有效性,无结果则从Thought继续重试,最多重试3次。
4. Final_Answer:基于Thought+Action+Observation,输出最终精准结果,不冗余、不臆造。
### 关键约束
✅ 无需工具时直接回答,不执行无效Thought/Action
✅ 工具调用失败时,简要说明原因并尝试最优替代方案
✅ 结果仅基于工具返回+自身知识库,拒绝编造信息
✅ 输出语言与用户提问一致,简洁专业,符合场景要求
### 执行流程
用户问题 → Thought(是否需调用工具) → Action(调用工具) → Observation(结果校验) → (重复以上步骤,最多3次) → Final_Answer(最终答案)
### 输出格式
Thought: [分析用户需求]
Action: [工具名称](参数1=值1, 参数2=值2)
Observation: [工具返回结果]
(重复以上步骤,最多3次)
Final_Answer: [最终结果]
# Milvus Lite配置
milvus:
data-dir: ./milvus_data
......
......@@ -18,7 +18,7 @@ MERGE INTO agent (id, name, description, status, default_model, owner, system_pr
('agent-3', '数据分析员', '专业的数据分析AI助手', 'active', 'deepseek-default', 'user-001', '你是一个数据分析专家,擅长处理和分析各种数据。', 0, 15, 1, 'data-analysis-kb', 5, 0.8, 50, 0, 0, '', '', 1),
('agent-4', '内容创作助手', '帮助撰写各类文案的AI助手', 'active', 'hisense-default', 'user-001', '你是一个创意写作专家,能够帮助用户创作各种类型的文案。', 0, 15, 1, 'content-creation-kb', 5, 0.8, 50, 0, 0, '', '', 1),
('agent-5', '学习导师', '个性化学习指导AI助手', 'active', 'hisense-default', 'user-001', '你是一个教育专家,能够根据用户需求提供个性化的学习建议。', 1, 15, 1, 'learning-mentor-kb', 5, 0.8, 50, 0, 0, '', '', 1),
('agent-6', '海信流程审批助手', '专业的海信业务流程审批AI助手,支持SSO登录和各种审批操作', 'active', 'hisense-default', 'user-001', '你是一个海信业务流程审批助手,可以帮助用户处理海信SSO登录和各类审批操作,包括请假审批、自驾车审批、调休审批等。', 1, 15, 0, '', 5, 0.8, 50, 0, 0, '', '', 1);
('agent-6', '海信流程审批助手', '专业的海信业务流程审批AI助手,支持SSO登录和各种审批操作', 'active', 'hisense-default', 'user-001', '你是一个海信业务流程审批助手,可以帮助用户处理海信SSO登录和各类审批操作,包括请假审批、自驾车审批、调休审批等。', 1, 3, 0, '', 5, 0.8, 50, 0, 0, '', '', 1);
-- 插入默认工具数据 (必须在agent_tool_relation之前插入)
MERGE INTO tool (id, name, display_name, description, category, status, bean_name, owner, timeout, http_method, parameters, return_type, return_schema, implementation, api_endpoint, headers, auth_type, auth_config) VALUES
......
......@@ -57,12 +57,29 @@ import MessageItem from "./MessageItem.vue";
import request from "@/utils/request";
import { useRoute } from "vue-router";
import type { TimelineEvent } from '../types/timeline';
import type { Message, Agent, SSEData, SSEProcessingContext } from '../types/chat';
// 接收从父组件传递的添加事件到时间轴的方法
const props = defineProps<{
addEventToTimeline?: (event: TimelineEvent) => void;
}>();
// ===== 状态定义 =====
const selectedAgent = ref<string>("");
const agents = ref<Agent[]>([]);
const messages = ref<Message[]>([]);
const inputMessage = ref("");
const isLoading = ref(false);
const messagesContainer = ref<HTMLElement>();
// 获取当前路由
const route = useRoute();
// 全局维护SSE流超时计时器引用,确保能够正确清除
let streamTimeoutTimer: ReturnType<typeof setTimeout> | null = null;
// ===== 工具函数 =====
// 生成唯一事件ID
const generateEventId = (): string => {
return `event-${Date.now()}-${Math.floor(Math.random() * 1000)}`;
......@@ -70,41 +87,164 @@ const generateEventId = (): string => {
// 添加事件到时间轴
const addEventToTimeline = (event: TimelineEvent) => {
if (props.addEventToTimeline) {
props.addEventToTimeline(event);
} else {
console.warn('[ChatArea] addEventToTimeline prop is not provided');
props.addEventToTimeline?.(event);
};
// 创建时间轴事件的通用函数
const createTimelineEvent = (type: string, title: string, content: string, data?: any): TimelineEvent => {
// 构建元数据
const metadata: Record<string, any> = data?.metadata || {};
// 根据事件类型添加特定元数据
if (data) {
if (data.toolName) metadata["工具"] = data.toolName;
if (data.toolAction) metadata["操作"] = data.toolAction;
if (data.toolInput) {
try {
metadata["输入"] = JSON.stringify(data.toolInput).substring(0, 100);
} catch (e) {
metadata["输入"] = String(data.toolInput).substring(0, 100);
}
}
if (data.toolOutput) metadata["输出"] = String(data.toolOutput).substring(0, 100);
if (data.toolStatus) metadata["状态"] = data.toolStatus;
if (data.executionTime) metadata["耗时"] = `${data.executionTime}ms`;
if (data.embedUrl) metadata["URL"] = data.embedUrl;
if (data.embedType) metadata["类型"] = data.embedType;
if (data.actionName) metadata["操作名称"] = data.actionName;
if (data.actionParams) {
try {
metadata["操作参数"] = JSON.stringify(data.actionParams).substring(0, 100);
} catch (e) {
metadata["操作参数"] = String(data.actionParams).substring(0, 100);
}
}
if (data.observationType) metadata["观察类型"] = data.observationType;
}
return {
id: generateEventId(),
type,
title,
content,
metadata: Object.keys(metadata).length > 0 ? metadata : undefined,
toolName: data?.toolName,
toolAction: data?.toolAction,
toolInput: data?.toolInput,
params: data?.params,
toolOutput: data?.toolOutput,
toolStatus: data?.toolStatus,
executionTime: data?.executionTime,
embedUrl: data?.embedUrl,
embedType: data?.embedType,
embedTitle: data?.embedTitle,
embedHtmlContent: data?.embedHtmlContent,
timestamp: data?.timestamp || Date.now(),
};
};
interface Message {
content: string;
isUser: boolean;
agentId?: string;
timestamp: number;
isStreaming: boolean;
hasError?: boolean;
originalMessage?: string;
}
// 统一的超时计时器清理函数
const clearStreamTimeout = () => {
if (streamTimeoutTimer) {
clearTimeout(streamTimeoutTimer);
streamTimeoutTimer = null;
}
};
interface Agent {
id: string;
name: string;
[key: string]: any;
}
// 统一的错误处理函数
const handleError = (error: any, context: string): string => {
console.error(`[${context}] 错误详情:`, error);
let errorMessage = "未知错误";
if (error instanceof Error) {
errorMessage = error.message;
} else if (typeof error === "string") {
errorMessage = error;
} else if (error && typeof error === "object") {
if (error.message) {
errorMessage = error.message;
} else if (error.error) {
errorMessage = error.error;
} else {
errorMessage = JSON.stringify(error);
}
}
// 检查是否是API密钥错误的特殊提示
if (errorMessage.includes("请配置API密钥")) {
return "[错误] 请配置API密钥";
}
return `[错误] ${errorMessage}`;
};
const selectedAgent = ref<string>("");
const agents = ref<Agent[]>([]);
const messages = ref<Message[]>([]);
const inputMessage = ref("");
const isLoading = ref(false);
const messagesContainer = ref<HTMLElement>();
// 防抖函数
const debounce = (func: Function, wait: number) => {
let timeout: ReturnType<typeof setTimeout>;
return function executedFunction(...args: any[]) {
const later = () => {
clearTimeout(timeout);
func(...args);
};
clearTimeout(timeout);
timeout = setTimeout(later, wait);
};
};
// 获取当前路由
const route = useRoute();
// 自动滚动到底部(使用防抖优化性能)
const scrollToBottom = debounce(async () => {
await nextTick();
if (messagesContainer.value) {
messagesContainer.value.scrollTop = messagesContainer.value.scrollHeight;
}
}, 100);
// 全局维护SSE流超时计时器引用,确保能够正确清除
let streamTimeoutTimer: ReturnType<typeof setTimeout> | null = null;
// 【修复工具函数】处理转义序列,将\n、\t等转义形式还原为实际字符
const unescapeString = (str: string): string => {
if (!str) return str;
try {
// 处理转义序列:将转义形式还原为实际字符
let unescaped = str
.replace(/\\n/g, "\n") // \n 转换为换行符
.replace(/\\r/g, "\r") // \r 转换为回车符
.replace(/\\t/g, "\t") // \t 转换为制表符
.replace(/\\b/g, "\b") // \b 转换为退格符
.replace(/\\f/g, "\f") // \f 转换为换页符
.replace(/\\\\/g, "\\"); // \\ 转换为单个反斜杠
return unescaped;
} catch (e) {
console.warn("转义序列处理失败:", e);
return str;
}
};
// ===== 事件处理相关 =====
// 事件标题映射表
const eventTitleMap: Record<string, string | ((data: any) => string)> = {
tool_call: (data: any) => data.toolName || "工具调用",
embed: (data: any) => data.embedTitle || "嵌入内容",
action: (data: any) => data.actionName || "执行操作",
observation: "观察结果",
log: "系统日志",
result: "执行结果",
thought: (data: any) => data.thinkingType === "final_answer" ? "最终答案" : "思考过程",
complete: "对话完成",
error: "对话错误"
};
// 处理时间轴事件的通用函数
const handleTimelineEvent = (eventType: string, data: any, content: string) => {
const title = eventTitleMap[eventType]
? typeof eventTitleMap[eventType] === "function"
? eventTitleMap[eventType](data)
: eventTitleMap[eventType]
: data.title || eventType;
const timelineEvent = createTimelineEvent(eventType, title, content, data);
addEventToTimeline(timelineEvent);
};
// 获取Agent列表
const loadAgents = async () => {
......@@ -288,46 +428,20 @@ const loadHistoryMessagesInternal = async (agentId: string) => {
messages.value = [];
}
} catch (error: any) {
console.error("[历史消息加载] 加载历史对话记录失败", error);
messages.value = []; // 出错时清空消息列表
// 记录详细的错误信息便于调试
if (error.response) {
console.error(
"[历史消息加载] HTTP响应错误 - 状态码:",
error.response.status
);
console.error("[历史消息加载] HTTP响应错误 - 数据:", error.response.data);
console.error(
"[历史消息加载] HTTP响应错误 - 请求URL:",
error.config?.url
);
} else if (error.request) {
console.error("[历史消息加载] 网络请求错误 - 没有收到响应");
console.error("[历史消息加载] 请求配置:", error.request);
} else {
console.error("[历史消息加载] 错误信息:", error.message);
console.error("[历史消息加载] 错误堆栈:", error.stack);
const errorMsg = handleError(error, "历史消息加载");
messages.value = []; // 出错时清空消息列表
ElMessage.error("加载历史消息失败");
}
}
};
// 加载历史对话记录
const loadHistoryMessages = async () => {
// 首先尝试从路由参数获取agentId
let agentId = route.query.agentId as string;
// 如果路由参数中没有agentId,则使用下拉框中选中的值
if (!agentId || typeof agentId !== "string" || agentId.trim() === "") {
agentId = selectedAgent.value;
}
// 如果仍然没有有效的agentId,则不加载历史消息
if (!agentId || typeof agentId !== "string" || agentId.trim() === "") {
let agentId = route.query.agentId as string || selectedAgent.value;
if (!agentId?.trim()) {
console.log("[历史消息加载] 没有指定有效的Agent ID,跳过加载历史记录");
return;
}
await loadHistoryMessagesInternal(agentId);
};
......@@ -351,62 +465,12 @@ const handleRetry = async (index: number) => {
await sendMessage();
};
// 统一的超时计时器清理函数
const clearStreamTimeout = () => {
if (streamTimeoutTimer) {
clearTimeout(streamTimeoutTimer);
streamTimeoutTimer = null;
}
};
// 防抖函数
const debounce = (func: Function, wait: number) => {
let timeout: ReturnType<typeof setTimeout>;
return function executedFunction(...args: any[]) {
const later = () => {
clearTimeout(timeout);
func(...args);
};
clearTimeout(timeout);
timeout = setTimeout(later, wait);
};
};
// 自动滚动到底部(使用防抖优化性能)
const scrollToBottom = debounce(async () => {
await nextTick();
if (messagesContainer.value) {
messagesContainer.value.scrollTop = messagesContainer.value.scrollHeight;
}
}, 100);
// 【修复工具函数】处理转义序列,将\n、\t等转义形式还原为实际字符
const unescapeString = (str: string): string => {
if (!str) return str;
try {
// 处理转义序列:将转义形式还原为实际字符
let unescaped = str
.replace(/\\n/g, "\n") // \n 转换为换行符
.replace(/\\r/g, "\r") // \r 转换为回车符
.replace(/\\t/g, "\t") // \t 转换为制表符
.replace(/\\b/g, "\b") // \b 转换为退格符
.replace(/\\f/g, "\f") // \f 转换为换页符
.replace(/\\\\/g, "\\"); // \\ 转换为单个反斜杠
return unescaped;
} catch (e) {
console.warn("转义序列处理失败:", e);
return str;
}
};
// 处理SSE数据行的通用函数
const processSSELine = async (
line: string,
accumulatedContentRef: { value: string },
hasFinalAnswerRef: { value: boolean },
currentEventRef: { value: string },
aiMessageIndex: number,
resetStreamTimeout: () => void
{ accumulatedContentRef, hasFinalAnswerRef, currentEventRef, aiMessageIndex, resetStreamTimeout }: SSEProcessingContext
) => {
if (!line.trim()) return false;
......@@ -422,33 +486,16 @@ const processSSELine = async (
return false;
}
// 尝试解析为JSON,如果失败则作为纯文本处理
let data;
try {
data = JSON.parse(dataStr);
} catch (e) {
// 不是JSON格式,将其作为token事件处理(用于兼容旧协议)
data = {
type: "token",
token: dataStr,
};
}
// 修复:检查是否是JSON格式的API密钥错误消息
if (typeof data === "object" && data !== null) {
// 检查是否包含API密钥错误信息
const errorMessage =
data.message || data.error || data.token || data.content || "";
if (errorMessage.includes("请配置API密钥")) {
// 处理API密钥错误
clearStreamTimeout();
messages.value[aiMessageIndex].isStreaming = false;
messages.value[aiMessageIndex].content = "[错误] 请配置API密钥";
messages.value[aiMessageIndex].hasError = true;
isLoading.value = false;
console.error("[SSE解析错误] 接收到API密钥错误消息:", dataStr);
return true; // 返回true表示流已完成
}
}
let data: SSEData;
try {
data = JSON.parse(dataStr);
} catch (e) {
// 不是JSON格式,将其作为token事件处理(用于兼容旧协议)
data = {
type: "token",
token: dataStr,
};
}
// 特殊处理流结束标记
if (dataStr.trim() === "[DONE]") {
......@@ -459,13 +506,10 @@ const processSSELine = async (
return true; // 返回true表示流已完成
}
// 检查是否是纯文本错误消息
// 修复:正确处理API密钥错误,检查是否为JSON格式的错误消息
if (
(dataStr.startsWith("[错误]") || dataStr.includes("请配置API密钥")) &&
!dataStr.trim().startsWith("{")
) {
// 处理纯文本错误消息,包括API密钥错误
// 检查是否是错误消息
const isError = dataStr.startsWith("[错误]") || dataStr.includes("请配置API密钥");
if (isError) {
// 处理错误消息
clearStreamTimeout();
messages.value[aiMessageIndex].isStreaming = false;
messages.value[aiMessageIndex].content = dataStr.startsWith("[错误]")
......@@ -477,30 +521,14 @@ const processSSELine = async (
return true; // 返回true表示流已完成
}
// 新增:处理JSON格式但包含API密钥错误的情况
if (typeof data === "object" && data !== null) {
const jsonData = data;
// 检查各种可能包含错误信息的字段
const errorMsg =
jsonData.message ||
jsonData.error ||
jsonData.token ||
jsonData.content ||
"";
if (errorMsg.includes("请配置API密钥")) {
// 处理API密钥错误
clearStreamTimeout();
messages.value[aiMessageIndex].isStreaming = false;
messages.value[aiMessageIndex].content = "[错误] 请配置API密钥";
messages.value[aiMessageIndex].hasError = true;
isLoading.value = false;
console.error("[SSE解析错误] 接收到API密钥错误消息:", dataStr);
return true; // 返回true表示流已完成
}
// 修复:当event是"message"时,使用data.type作为事件类型,否则使用currentEventRef.value
let eventType = currentEventRef.value;
if (eventType === "message") {
eventType = data.type;
} else if (!eventType) {
eventType = data.type;
}
const eventType = currentEventRef.value || data.type;
// 根据事件类型处理数据
switch (eventType) {
case "heartbeat":
......@@ -513,7 +541,7 @@ const processSSELine = async (
case "token":
// 重置超时计时器,接收到token说明连接还活跃
resetStreamTimeout();
// 修复:对接收到的token进行反转义处理,保换行符和格式化符号正常显示
// 修复:对接收到的token进行反转义处理,保换行符和格式化符号正常显示
const processedToken = unescapeString(data.token || "");
accumulatedContentRef.value += processedToken;
messages.value[aiMessageIndex].content = accumulatedContentRef.value;
......@@ -530,14 +558,7 @@ const processSSELine = async (
isLoading.value = false;
// 添加完成事件到时间轴
const completeEvent: TimelineEvent = {
id: generateEventId(),
type: "complete",
title: "对话完成",
content: "智能体已完成回答",
timestamp: Date.now(),
};
addEventToTimeline(completeEvent);
handleTimelineEvent(eventType, data, "智能体已完成回答");
return true; // 返回true表示流已完成
......@@ -546,7 +567,7 @@ const processSSELine = async (
clearStreamTimeout();
messages.value[aiMessageIndex].isStreaming = false;
// 检查是否是API密钥错误的特殊提示
const errorMsg =
const errorMsg =
data.message || data.error || data.token || data.content || "";
if (errorMsg.includes("请配置API密钥")) {
messages.value[aiMessageIndex].content = "[错误] 请配置API密钥";
......@@ -560,30 +581,18 @@ const processSSELine = async (
console.error("[SSE错误事件]", data);
// 添加错误事件到时间轴
const errorEvent: TimelineEvent = {
id: generateEventId(),
type: "error",
title: "对话错误",
content: errorMsg || "未知错误",
timestamp: Date.now(),
};
addEventToTimeline(errorEvent);
handleTimelineEvent(eventType, data, errorMsg || "未知错误");
return true; // 返回true表示流已完成
case "thinking":
case "thought":
// 收到思考事件,清除超时计时器
clearStreamTimeout();
// 处理思考事件,将其发送到时间轴面板
const thoughtEvent: TimelineEvent = {
id: generateEventId(),
type: "thought",
title:
data.thinkingType === "final_answer" ? "最终答案" : "思考过程",
content: data.content,
timestamp: data.timestamp || Date.now(),
};
// 调用添加事件到时间轴的方法
addEventToTimeline(thoughtEvent);
handleTimelineEvent(eventType, data, data.content);
// 记录思考事件便于调试
console.log("[SSE思考事件]", data);
// 如果是最终答案,也应该显示在主要对话框中
// 修复:确保最终答案只添加一次,避免重复显示
......@@ -599,97 +608,61 @@ const processSSELine = async (
messages.value[aiMessageIndex].isStreaming = false;
isLoading.value = false;
await scrollToBottom();
return true; // 返回true表示流已完成
}
// 对于非最终答案的思考过程,不添加到主对话框中
break;
case "tool_call":
case "embed":
// 处理工具调用和嵌入事件,将其发送到时间轴面板
// 构建事件标题
let title = data.title || "事件";
if (eventType === "tool_call" && data.toolName) {
title = `调用工具: ${data.toolName}`;
} else if (eventType === "embed" && data.embedTitle) {
title = data.embedTitle;
}
// 处理嵌入事件,将其发送到时间轴面板
handleTimelineEvent(eventType, data, data.content);
// 构建元数据
const metadata: Record<string, any> = data.metadata || {};
if (eventType === "tool_call") {
if (data.toolName) metadata["工具"] = data.toolName;
if (data.toolAction) metadata["操作"] = data.toolAction;
if (data.toolInput) {
try {
metadata["输入"] = JSON.stringify(data.toolInput).substring(0, 100);
} catch (e) {
metadata["输入"] = String(data.toolInput).substring(0, 100);
}
}
if (data.toolOutput) metadata["输出"] = String(data.toolOutput).substring(0, 100);
if (data.toolStatus) metadata["状态"] = data.toolStatus;
if (data.executionTime) metadata["耗时"] = `${data.executionTime}ms`;
} else if (eventType === "embed") {
if (data.embedUrl) metadata["URL"] = data.embedUrl;
if (data.embedType) metadata["类型"] = data.embedType;
// 对于embed事件,还需要触发embed-event事件
if (data.embedUrl) {
window.dispatchEvent(new CustomEvent("embed-event", {
detail: {
url: data.embedUrl,
type: data.embedType,
title: data.embedTitle,
htmlContent: data.embedHtmlContent,
},
}));
}
break;
// 构建时间轴事件
const timelineEvent: TimelineEvent = {
id: generateEventId(),
type: eventType,
title: title,
content: data.content,
metadata: Object.keys(metadata).length > 0 ? metadata : undefined,
toolName: data.toolName,
toolAction: data.toolAction,
toolInput: data.toolInput,
toolOutput: data.toolOutput,
toolStatus: data.toolStatus,
executionTime: data.executionTime,
embedUrl: data.embedUrl,
embedType: data.embedType,
embedTitle: data.embedTitle,
embedHtmlContent: data.embedHtmlContent,
timestamp: data.timestamp || Date.now(),
};
// 调用添加事件到时间轴的方法
addEventToTimeline(timelineEvent);
// 以下事件类型使用统一的时间轴事件处理
case "tool_call":
case "action":
case "observation":
case "log":
case "result":
handleTimelineEvent(eventType, data, data.content);
break;
// 对于embed事件,还需要触发embed-event事件
if (eventType === "embed" && data.embedUrl) {
window.dispatchEvent(
new CustomEvent("embed-event", {
detail: {
url: data.embedUrl,
type: data.embedType,
title: data.embedTitle,
htmlContent: data.embedHtmlContent,
},
})
);
}
default:
// 处理未知事件类型
handleTimelineEvent(eventType, data, data.content || "");
break;
}
// 重置当前事件类型
currentEventRef.value = "";
} catch (err) {
console.error(
"[SSE解析错误] 解析SSE数据失败,重置超时计时器:",
err,
"原始行:",
line
);
// 收到任何消息,都要重置超时计时器
resetStreamTimeout();
// 根据错误类型,决定是否继续处理或中断流
if (line.includes('"type":"error"') || line.includes('"error"')) {
// 错误消息,继续处理
return false;
}
}
console.error(
"[SSE解析错误] 解析SSE数据失败,重置超时计时器:",
err,
"原始行:",
line
);
// 收到任何消息,都要重置超时计时器
resetStreamTimeout();
// 根据错误类型,决定是否继续处理或中断流
if (line.includes('"type":"error"') || line.includes('"error"')) {
// 错误消息,继续处理
return false;
}
}
return false;
}
return false;
......@@ -785,8 +758,7 @@ const sendMessage = async () => {
ElMessage.warning("流式输出超时,您可以点击重试按钮重新发送消息");
// 显示重试按钮
messages.value[aiMessageIndex].content =
"[错误] 流式输出超时,请重试";
messages.value[aiMessageIndex].content = "[错误] 流式输出超时,请重试";
messages.value[aiMessageIndex].hasError = true;
} else {
// 如果还没超时,继续检查
......@@ -798,66 +770,40 @@ const sendMessage = async () => {
resetStreamTimeout();
while (true) {
if (isStreamComplete) break; // 如果已收到complete事件,停止读取
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
// 处理SSE流的通用函数
const processLines = async (lines: string[]) => {
for (const line of lines) {
const isComplete = await processSSELine(
line,
accumulatedContentRef,
hasFinalAnswerRef,
currentEventRef,
aiMessageIndex,
resetStreamTimeout
{ accumulatedContentRef, hasFinalAnswerRef, currentEventRef, aiMessageIndex, resetStreamTimeout }
);
if (isComplete) {
isStreamComplete = true;
}
// 在接收到第一个token时,立即隐藏Skeleton加载动画
if (
!hasReceivedFirstToken &&
(line.includes('"type":"token"') || currentEventRef.value === "token")
) {
if (!hasReceivedFirstToken && (line.includes('"type":"token"') || currentEventRef.value === "token")) {
isLoading.value = false;
hasReceivedFirstToken = true;
}
}
};
while (true) {
if (isStreamComplete) break; // 如果已收到complete事件,停止读取
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
await processLines(lines);
}
// 处理残余的数据
if (buffer && !isStreamComplete) {
const lines = buffer.split("\n");
for (const line of lines) {
const isComplete = await processSSELine(
line,
accumulatedContentRef,
hasFinalAnswerRef,
currentEventRef,
aiMessageIndex,
resetStreamTimeout
);
if (isComplete) {
isStreamComplete = true;
}
// 在接收到第一个token时,立即隐藏Skeleton加载动画
if (
!hasReceivedFirstToken &&
(line.includes('"type":"token"') || currentEventRef.value === "token")
) {
isLoading.value = false;
hasReceivedFirstToken = true;
}
}
await processLines(buffer.split("\n"));
}
// 修复:仅有在没有最终答案且没有错误的情况下才更新消息内容
......@@ -868,7 +814,6 @@ const sendMessage = async () => {
// 确保最终状态正确
messages.value[aiMessageIndex].isStreaming = false;
// 设置isLoading为false,结束加载状态
isLoading.value = false;
// 清除超时计时器
......@@ -880,27 +825,9 @@ const sendMessage = async () => {
// 清除超时计时器
clearStreamTimeout();
// 判断是否是网络错误
let errorMessage = "[错误] ";
// 检查是否是API密钥错误的特殊提示
if (
error.message &&
(error.message.includes("请配置API密钥") ||
error.message.includes("[错误] 请配置API密钥"))
) {
errorMessage = "[错误] 请配置API密钥";
} else if (error instanceof TypeError) {
errorMessage += "网络获取失败,请检查你的网络连接";
} else if (error.name === "AbortError") {
errorMessage += "请求已取消";
} else if (error.message && error.message.includes("处理超时")) {
errorMessage = "[错误] 服务器处理超时,请稍后重试";
} else if (error.message) {
errorMessage += error.message;
} else {
errorMessage += "一个未知错误发生";
}
// 使用统一的错误处理函数
const errorMessage = handleError(error, "发送消息");
messages.value[aiMessageIndex].content = errorMessage;
messages.value[aiMessageIndex].isStreaming = false;
messages.value[aiMessageIndex].hasError = true;
......@@ -912,14 +839,8 @@ onMounted(async () => {
await loadAgents();
// 等待下一个tick确保agents加载完成后再加载历史消息
await nextTick();
// 优先使用路由参数中的agentId,如果没有则使用localStorage中保存的或默认选中的
const routeAgentId = route.query.agentId as string;
if (routeAgentId) {
await loadHistoryMessagesInternal(routeAgentId);
} else {
loadHistoryMessages();
}
// 加载历史消息
await loadHistoryMessages();
});
// 暴露方法给父组件使用
......
<template>
<div class="timeline-manager">
<!-- 过滤和搜索面板 -->
<div class="timeline-filter-panel">
<div class="filter-row">
<el-input
v-model="searchQuery"
placeholder="搜索事件..."
clearable
size="small"
prefix-icon="Search"
/>
<el-select
v-model="selectedEventTypes"
placeholder="事件类型"
multiple
size="small"
class="filter-select"
>
<el-option
v-for="(label, type) in eventTypeLabels"
:key="type"
:label="label"
:value="type"
/>
</el-select>
</div>
<div class="filter-row">
<el-button
type="primary"
size="small"
@click="applyFilters"
>
应用过滤
</el-button>
<el-button
size="small"
@click="resetFilters"
>
重置
</el-button>
<el-button
size="small"
@click="exportEvents"
>
导出事件
</el-button>
<el-dropdown @command="handleExport">
<el-button size="small">
导出格式 <el-icon class="el-icon--right"><arrow-down /></el-icon>
</el-button>
<template #dropdown>
<el-dropdown-menu>
<el-dropdown-item command="json">JSON</el-dropdown-item>
<el-dropdown-item command="csv">CSV</el-dropdown-item>
</el-dropdown-menu>
</template>
</el-dropdown>
</div>
</div>
<!-- 时间轴面板 -->
<TimelinePanel
:events="filteredEvents"
:events="events"
:getEventTypeLabel="getEventTypeLabel"
:formatTime="formatTime"
:getExpandedState="getExpandedState"
......@@ -95,6 +36,9 @@ const activeFilters = ref({
eventTypes: [] as string[]
});
// 事件展开状态管理 - 使用事件ID或时间戳作为唯一标识
const expandedStates = ref<Record<string, boolean>>({});
// 持久化配置
const persistenceEnabled = ref(true);
const STORAGE_KEY = 'timeline_events';
......@@ -141,16 +85,14 @@ const formatTime = (timestamp: number): string => {
return `${hours}:${minutes}:${seconds}`;
};
// 获取事件的展开状态(这里简化实现,实际可以根据需要扩展)
const getExpandedState = (index: number): boolean => {
// 简化实现,实际可以根据需要扩展
return false;
// 获取事件的展开状态
const getExpandedState = (key: string | number): boolean => {
return expandedStates.value[key] || false;
};
// 切换事件详细信息的展开状态(这里简化实现,实际可以根据需要扩展)
const toggleExpand = (index: number): void => {
// 简化实现,实际可以根据需要扩展
console.log('切换展开状态:', index);
// 切换事件详细信息的展开状态
const toggleExpand = (key: string | number): void => {
expandedStates.value[key] = !expandedStates.value[key];
};
// 过滤后的事件列表
......
......@@ -39,6 +39,10 @@
<pre>{{ JSON.stringify((event as any).toolInput, null, 2) }}</pre>
</el-descriptions-item>
<el-descriptions-item label="工具参数" :span="2" v-if="(event as any).params">
<pre>{{ JSON.stringify((event as any).params, null, 2) }}</pre>
</el-descriptions-item>
<el-descriptions-item label="工具输出" :span="2" v-if="(event as any).toolOutput">
<pre>{{ JSON.stringify((event as any).toolOutput, null, 2) }}</pre>
</el-descriptions-item>
......@@ -54,6 +58,22 @@
<el-descriptions-item label="嵌入类型" v-if="(event as any).embedType">{{ (event as any).embedType }}</el-descriptions-item>
<el-descriptions-item label="嵌入标题" v-if="(event as any).embedTitle">{{ (event as any).embedTitle }}</el-descriptions-item>
</template>
<!-- Message消息特有字段 -->
<template v-if="(event as any).methodName || (event as any).startTime">
<el-descriptions-item label="方法名称" v-if="(event as any).methodName">{{ (event as any).methodName }}</el-descriptions-item>
<el-descriptions-item label="开始时间" v-if="(event as any).startTime">
<span>{{ formatTime((event as any).startTime) }}</span>
<span class="full-time">{{ new Date((event as any).startTime).toISOString() }}</span>
</el-descriptions-item>
</template>
<!-- 完整原始数据 -->
<el-descriptions-item label="完整数据" :span="2">
<div class="metadata-section">
<pre>{{ JSON.stringify(event, null, 2) }}</pre>
</div>
</el-descriptions-item>
</el-descriptions>
</div>
......
<template>
<div class="timeline-panel">
<div class="timeline-header">
<h3>执行过程</h3>
<el-button text @click="props.onClearTimeline" :disabled="!props.events || props.events.length === 0">清除</el-button>
<el-button text size="small" @click="props.onClearTimeline" :disabled="!props.events || props.events.length === 0">清除</el-button>
</div>
<div class="timeline-container" ref="timelineContainer">
......@@ -55,18 +54,18 @@
class="tool-details"
>
<!-- 展开/折叠按钮 -->
<div class="detail-toggle" @click.stop="props.toggleExpand(displayedEvents.length - 1 - index)">
<span class="toggle-text">{{ props.getExpandedState(displayedEvents.length - 1 - index) ? '收起详情' : '查看详情' }}</span>
<span class="toggle-icon">{{ props.getExpandedState(displayedEvents.length - 1 - index) ? '▲' : '▼' }}</span>
<div class="detail-toggle" @click.stop="props.toggleExpand(event.id || event.timestamp)">
<span class="toggle-text">{{ props.getExpandedState(event.id || event.timestamp) ? '收起详情' : '查看详情' }}</span>
<span class="toggle-icon">{{ props.getExpandedState(event.id || event.timestamp) ? '▲' : '▼' }}</span>
</div>
<!-- 详细信息内容 -->
<div v-show="getExpandedState(displayedEvents.length - 1 - index)" class="detail-content">
<div v-show="getExpandedState(event.id || event.timestamp)" class="detail-content">
<!-- 输入参数段 -->
<ToolDataSection
v-if="props.hasValidToolInput(event)"
title="输入参数"
:data="event.toolInput"
:data="event.toolInput || event.params"
type="input"
/>
......@@ -74,16 +73,10 @@
<ToolDataSection
v-if="props.hasValidToolOutput(event)"
title="输出结果"
:data="event.toolOutput"
:data="event.toolOutput || event.result"
type="output"
/>
</div>
</div>
<div v-if="event.metadata" class="event-metadata">
<div v-for="(value, key) in event.metadata" :key="key" class="metadata-item">
<span class="metadata-key">{{ key }}:</span>
<span class="metadata-value">{{ String(value).substring(0, 100) }}</span>
</div>
</div>
</div>
</div>
......@@ -122,8 +115,8 @@ const props = defineProps<{
events: TimelineEvent[]
getEventTypeLabel: (type: string) => string
formatTime: (timestamp: number) => string
getExpandedState: (index: number) => boolean
toggleExpand: (index: number) => void
getExpandedState: (key: string | number) => boolean
toggleExpand: (key: string | number) => void
isToolEventType: (type: string) => boolean
hasValidToolInput: (event: TimelineEvent) => boolean
hasValidToolOutput: (event: TimelineEvent) => boolean
......@@ -203,20 +196,13 @@ watch(() => props.events, () => {
.timeline-header {
display: flex;
align-items: center;
justify-content: space-between;
padding: var(--spacing-4);
border-bottom: 1px solid var(--border-color);
background-color: var(--bg-secondary);
justify-content: flex-end;
padding: var(--spacing-2);
border-bottom: none;
background-color: transparent;
flex-shrink: 0;
}
.timeline-header h3 {
margin: 0;
font-size: var(--font-size-lg);
font-weight: var(--font-weight-semibold);
color: var(--text-primary);
}
.timeline-container {
flex: 1;
overflow-y: auto;
......
<template>
<div class="work-area">
<el-tabs v-model="activeTab" class="work-tabs">
<el-tabs v-model="activeTab" class="work-tabs" :lazy="false">
<el-tab-pane label="📋 时间轴" name="timeline">
<timeline-container ref="timelineContainerRef" />
</el-tab-pane>
......
// 内容展开管理hook
import { nextTick, ref, type Ref } from 'vue'
import { nextTick, ref } from 'vue'
import type { TimelineEvent } from '../types/timeline'
export function useContentExpansion(props: {
events: TimelineEvent[]
}) {
// 内容展开状态管理 - 使用WeakMap提高性能
const contentExpandedStates = new WeakMap<HTMLElement, boolean>()
// 内容展开状态管理 - 使用普通对象存储展开状态,键是事件的唯一标识
const contentExpandedStates = ref<Record<string, boolean>>({})
const contentLineCounts = ref<Record<string, number>>({})
const contentElements = new Map<string, HTMLElement>()
// 事件ID到时间戳的映射,用于快速查找
const eventIdToTimestamp = ref<Record<string, number>>({})
// 更新事件ID映射
const updateEventIdMapping = () => {
props.events.forEach(event => {
if (event.id) {
eventIdToTimestamp.value[event.id] = event.timestamp
}
})
}
// 获取内容展开状态
const getContentExpandedState = (timestamp: number): boolean => {
const key = timestamp.toString()
const element = contentElements.get(key)
return element ? (contentExpandedStates.get(element) || false) : false
return contentExpandedStates.value[key] || false
}
// 注册内容元素引用
......@@ -34,10 +21,6 @@ export function useContentExpansion(props: {
if (el) {
const key = timestamp.toString()
contentElements.set(key, el)
// 初始化展开状态为false
if (!contentExpandedStates.has(el)) {
contentExpandedStates.set(el, false)
}
// 更新行数计算
updateLineCountForElement(timestamp)
}
......@@ -59,11 +42,7 @@ export function useContentExpansion(props: {
// 切换内容展开状态
const toggleContentExpand = (timestamp: number) => {
const key = timestamp.toString()
const element = contentElements.get(key)
if (element) {
const currentState = contentExpandedStates.get(element) || false
contentExpandedStates.set(element, !currentState)
}
contentExpandedStates.value[key] = !contentExpandedStates.value[key]
}
// 检查是否应该显示切换按钮
......@@ -99,8 +78,6 @@ export function useContentExpansion(props: {
// 更新内容行数计数
const updateLineCounts = () => {
nextTick(() => {
updateEventIdMapping()
props.events.forEach((event) => {
if ('content' in event && event.content) {
const key = event.timestamp.toString()
......
......@@ -2,7 +2,7 @@
<div class="chat-page">
<!-- 左侧对话区 -->
<div class="left-panel">
<chat-area ref="chatArea" />
<chat-area ref="chatArea" :add-event-to-timeline="addEventToTimeline" />
</div>
<!-- 中间分割线 -->
......@@ -37,6 +37,13 @@ watch(() => route.query.agentId, (newAgentId) => {
}
}, { immediate: true })
// 添加事件到时间轴
const addEventToTimeline = (event: any) => {
if (workArea.value && typeof workArea.value.addEvent === 'function') {
workArea.value.addEvent(event)
}
}
// 开始拖动分割线
const startResize = (e: MouseEvent) => {
isResizing.value = true
......
......@@ -37,7 +37,7 @@ const addToolCallEvent = () => {
if (timelinePanelRef.value) {
timelinePanelRef.value.addEvent({
type: 'tool_call',
title: '调用工具: 搜索引擎',
title: '搜索引擎',
content: '正在调用搜索引擎查找相关信息',
toolName: '搜索引擎',
toolAction: 'search',
......
......@@ -415,6 +415,9 @@ export function handleBinaryMessage(
}
if (onComplete) {
// 符合onComplete设计原则:在所有通讯操作最终完成后执行
// 触发条件:所有二进制消息分片已接收并处理完成
// 通讯流程位置:数据处理流程的最终阶段
addLog(`📤 调用onComplete回调,数据类型: ${typeof decodedData}`, 'debug');
onComplete(decodedData, header.encoding);
}
......
// 聊天消息类型定义
export interface Message {
content: string;
isUser: boolean;
agentId?: string;
timestamp: number;
isStreaming: boolean;
hasError?: boolean;
originalMessage?: string;
}
// 智能体类型定义
export interface Agent {
id: string;
name: string;
[key: string]: any;
}
// SSE事件数据类型定义
export interface SSEData {
type: string;
token?: string;
content?: string;
thinkingType?: string;
toolName?: string;
toolAction?: string;
toolInput?: any;
toolOutput?: any;
toolStatus?: string;
executionTime?: number;
embedUrl?: string;
embedType?: string;
embedTitle?: string;
embedHtmlContent?: string;
metadata?: Record<string, any>;
[key: string]: any;
}
// SSE行处理上下文类型
export interface SSEProcessingContext {
accumulatedContentRef: { value: string };
hasFinalAnswerRef: { value: boolean };
currentEventRef: { value: string };
aiMessageIndex: number;
resetStreamTimeout: () => void;
}
......@@ -16,6 +16,7 @@ export interface ToolCallEvent extends BaseTimelineEvent {
toolName: string;
toolAction?: string;
toolInput?: any;
params?: any;
toolStatus: string;
}
......
......@@ -15,7 +15,7 @@ export function isToolEventType(type: string): boolean {
* @returns 工具输入是否有效
*/
export function hasValidToolInput(event: any): boolean {
return event.type === 'tool_call' && event.toolInput !== null && event.toolInput !== undefined;
return event.type === 'tool_call' && (event.toolInput !== null && event.toolInput !== undefined || event.params !== null && event.params !== undefined);
}
/**
......@@ -24,7 +24,9 @@ export function hasValidToolInput(event: any): boolean {
* @returns 工具输出是否有效
*/
export function hasValidToolOutput(event: any): boolean {
return event.type === 'tool_result' && event.toolOutput !== null && event.toolOutput !== undefined;
return (event.type === 'tool_result' || event.type === 'tool_call') &&
(event.toolOutput !== null && event.toolOutput !== undefined ||
event.result !== null && event.result !== undefined);
}
/**
......
# onComplete函数调用分析报告
## 1. 项目概述
本报告对项目中所有调用`onComplete`函数的位置进行了全面检查,包括但不限于已注释掉的代码块。分析了每个调用实例的上下文环境、触发条件以及在通讯流程中的具体位置,评估了是否严格遵循"在所有通讯操作最终完成后执行"的设计原则。
## 2. onComplete函数定义
```java
// TokenConsumerWithCompletion.java:14
default void onComplete(String fullContent) {
// 默认实现为空
}
```
**设计原则**`onComplete`函数应在所有通讯操作最终完成后执行,无论是正常完成还是异常终止。
## 3. 调用位置清单
| 序号 | 文件路径 | 行号 | 上下文方法 | 触发条件 | 调用方式 |
|------|----------|------|------------|----------|----------|
| 1 | BaseAgentProcessor.java | 461 | handleRagResponse | tokenConsumer不为null | ((TokenConsumerWithCompletion) tokenConsumer).onComplete(ragResponse) |
| 2 | BaseAgentProcessor.java | 398 | handleStreamComplete | 流式响应处理完成 | ((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullText.toString()) |
| 3 | DefaultReactExecutor.java | 252 | handleCompletionError | 处理完成事件时发生异常 | ((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage) |
| 4 | DefaultReactExecutor.java | 294 | sendCompletionEvent | 需要发送完成事件 | ((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullResponse) |
| 5 | ReActAgentProcessor.java | 138 | processStream | 处理ReAct请求时发生异常 | ((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage) |
| 6 | ReActAgentProcessor.java | 158 | handleModelNotAvailable | 无法获取Agent的聊天模型 | ((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage) |
| 7 | NormalAgentProcessor.java | 109 | processStream | 普通Agent流式处理失败 | ((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage) |
| 8 | NormalAgentProcessor.java | 129 | handleModelNotSupportStream | 当前模型不支持流式输出 | ((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage) |
| 9 | binaryMessageHandler.ts | 419 | handleBinaryMessage | 二进制消息处理完成 | onComplete(decodedData, header.encoding) |
## 4. 合规性评估
### 4.1 合规调用(6个)
**调用2**:BaseAgentProcessor.java - 第398行
- **上下文**:处理流式响应完成的方法
- **触发条件**:流式响应处理完成后
- **位置**:在所有token都处理完毕后
- **合规性**:✅ 符合设计原则,在所有通讯操作最终完成后执行
**调用3**:DefaultReactExecutor.java - 第252行
- **上下文**:处理完成事件时发生的错误
- **触发条件**:在处理完成事件时发生异常
- **位置**:在错误处理流程中
- **合规性**:✅ 符合设计原则,在通讯操作失败后执行的最终操作
**调用4**:DefaultReactExecutor.java - 第294行
- **上下文**:发送完成事件的方法
- **触发条件**:当需要发送完成事件时
- **位置**:在ReAct执行流程的最终阶段
- **合规性**:✅ 符合设计原则,在所有通讯操作最终完成后执行
**调用5**:ReActAgentProcessor.java - 第138行
- **上下文**:处理ReAct请求时的异常处理
- **触发条件**:当处理ReAct请求时发生异常
- **位置**:在异常处理流程中
- **合规性**:✅ 符合设计原则,在通讯操作失败后执行的最终操作
**调用6**:ReActAgentProcessor.java - 第158行
- **上下文**:处理模型不可用的情况
- **触发条件**:当无法获取Agent的聊天模型时
- **位置**:在错误处理流程中
- **合规性**:✅ 符合设计原则,在通讯操作失败后执行的最终操作
**调用7**:NormalAgentProcessor.java - 第109行
- **上下文**:处理普通Agent流式请求时的异常处理
- **触发条件**:当流式处理失败时
- **位置**:在异常处理流程中
- **合规性**:✅ 符合设计原则,在通讯操作失败后执行的最终操作
**调用8**:NormalAgentProcessor.java - 第129行
- **上下文**:处理模型不支持流式输出的情况
- **触发条件**:当模型不支持流式输出时
- **位置**:在错误处理流程中
- **合规性**:✅ 符合设计原则,在通讯操作失败后执行的最终操作
**调用9**:binaryMessageHandler.ts - 第419行
- **上下文**:处理二进制消息的方法
- **触发条件**:当所有二进制消息分片已接收并处理完成时
- **位置**:在数据处理流程的最终阶段
- **合规性**:✅ 符合设计原则,在所有通讯操作最终完成后执行
### 4.2 不合规调用(1个)
**调用1**:BaseAgentProcessor.java - 第461行
```java
protected String handleRagResponse(String ragResponse, Consumer<String> tokenConsumer) {
if (tokenConsumer != null) {
// 对于流式处理,我们需要将RAG响应作为token发送
tokenConsumer.accept(ragResponse);
// 发送完成信号
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(ragResponse);
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
}
}
}
return ragResponse;
}
```
- **上下文**:处理RAG响应的方法
- **触发条件**:当tokenConsumer不为null时
- **位置**:在accept调用之后立即调用onComplete
- **问题**:❌ 提前调用 - RAG响应只是整个处理流程的一部分,而不是所有通讯操作的最终完成
- **风险**:如果后续还有其他通讯操作,客户端会误认为整个处理流程已完成
## 5. 问题分析
### 5.1 主要问题
1. **提前调用**:BaseAgentProcessor.java第461行,在RAG响应处理后立即调用onComplete,而RAG响应只是整个处理流程的一部分,不是所有通讯操作的最终完成。
2. **重复调用风险**:如果一个请求同时触发RAG响应和其他处理流程,可能会导致onComplete被调用多次,违反了"单一完成信号"的设计原则。
### 5.2 影响评估
- **客户端困惑**:客户端可能会在整个处理流程完成前收到完成信号,导致处理不完整。
- **资源泄漏**:如果客户端基于onComplete信号释放资源,可能会导致正在进行的通讯操作失败。
- **数据一致性问题**:如果后续处理流程产生新的数据,客户端将无法获取这些数据。
## 6. 改进建议
### 6.1 立即修复项
1. **移除BaseAgentProcessor.java第461行的onComplete调用**
- 理由:这是一个提前调用,违反了设计原则
- 影响:确保onComplete只在所有通讯操作最终完成后执行
- 修改方案:删除该调用行及其相关的条件判断
### 6.2 长期改进项
1. **统一onComplete调用策略**
- 确保onComplete只在以下两种情况下调用:
- 所有正常处理流程完成后
- 处理流程发生异常时
2. **添加文档注释**
- 为每个onComplete调用添加详细的文档注释,说明其在通讯流程中的位置和触发条件
3. **单元测试**
- 为onComplete调用添加单元测试,确保其在正确的时间点被调用
4. **代码审查**
- 在代码审查过程中,特别关注onComplete调用的时机是否符合设计原则
## 7. 结论
项目中大部分onComplete调用都符合"在所有通讯操作最终完成后执行"的设计原则,但存在一处明显的提前调用问题。建议立即修复BaseAgentProcessor.java第461行的onComplete调用,并采取长期改进措施以确保onComplete函数的正确使用。
修复后,所有onComplete调用将严格遵循设计原则,确保客户端能够正确接收完成信号,提高系统的可靠性和一致性。
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment