Commit ccbc32db authored by ligaowei's avatar ligaowei

fix(server): 优化SSE连接管理和异常安全处理

- 在BaseAgentProcessor和DefaultReactExecutor中添加对NoClassDefFoundError的捕获,避免完成回调异常导致中断
- AgentChatService中增强Agent存在性检查和连接关闭逻辑,防止重复关闭和未认证错误发送失败
- CompletionHandlerService重构完成事件处理流程,移除无用依赖并严格按顺序发送完成信号和关闭连接
- ErrorHandlerService和EventService统一检查SSE连接完成状态,防止向已完成连接发送事件导致异常
- StreamRequestService增加连接有效性确认,安全发送token和完成事件,保证异常路径安全关闭emitter
- UserSseService改进emitter状态检测,添加安全方法判断连接有效性,完善心跳和关闭回调,防止重复关闭
- AgentChatController调整Agent验证逻辑,避免安全异常,改由服务层处理Agent存在校验
- ChatArea组件优化Agent选择与消息列表渲染,支持本地保存选中Agent并加载历史消息
- 修复前端样式和交互细节,统一事件绑定格式,提升用户体验和代码整洁度
parent 5f364ac1
......@@ -385,7 +385,11 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
// 发送完成事件,包含完整内容
try {
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullText.toString());
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullText.toString());
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
}
if (log.isTraceEnabled()) {
log.trace("完成事件已发送");
}
......@@ -444,7 +448,11 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
tokenConsumer.accept(ragResponse);
// 发送完成信号
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(ragResponse);
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(ragResponse);
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
}
}
}
return ragResponse;
......
......@@ -390,7 +390,11 @@ public class DefaultReactExecutor implements ReactExecutor {
try {
String errorId = errorHandlerService.generateErrorId();
String fullErrorMessage = errorHandlerService.buildFullErrorMessage("处理完成时发生错误", e, errorId, "ReAct");
((TokenConsumerWithCompletion) tokenConsumer).onComplete("[" + errorId + "] " + fullErrorMessage);
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete("[" + errorId + "] " + fullErrorMessage);
} catch (NoClassDefFoundError ex) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", ex.getMessage());
}
} catch (Exception ex) {
log.error("调用onComplete时发生错误", ex);
}
......@@ -594,7 +598,21 @@ public class DefaultReactExecutor implements ReactExecutor {
}
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullResponse);
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullResponse);
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
// 如果类未找到,至少发送一个空消息以确保流的完整性
if (tokenConsumer != null) {
try {
tokenConsumer.accept("");
} catch (Exception ex) {
log.error("发送空消息也失败", ex);
}
}
} catch (Exception e) {
log.error("调用onComplete时发生错误", e);
}
} else if (tokenConsumer != null) {
tokenConsumer.accept("");
}
......
......@@ -118,7 +118,28 @@ public class AgentChatService {
chatErrorHandler.handleChatError(emitter, "用户未认证,请重新登录");
} else {
log.warn("响应已提交,无法发送用户未认证错误信息");
emitter.complete();
// 检查emitter是否已经完成,避免重复关闭
if (!workPanelSseService.isEmitterCompleted(emitter)) {
emitter.complete();
}
}
return emitter;
}
// 验证Agent是否存在
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
log.warn("Agent不存在: {}", agentId);
SseEmitter emitter = workPanelSseService.createEmitter();
// 检查响应是否已经提交
if (!response.isCommitted()) {
chatErrorHandler.handleChatError(emitter, "Agent不存在");
} else {
log.warn("响应已提交,无法发送Agent不存在错误信息");
// 检查emitter是否已经完成,避免重复关闭
if (!workPanelSseService.isEmitterCompleted(emitter)) {
emitter.complete();
}
}
return emitter;
}
......@@ -126,21 +147,26 @@ public class AgentChatService {
// 创建 SSE emitter
SseEmitter emitter = workPanelSseService.createEmitter();
// 将userId设为final以在Lambda表达式中使用
// 将userId和agent设为final以在Lambda表达式中使用
final String finalUserId = userId;
final Agent finalAgent = agent;
// 异步处理对话,避免阻塞HTTP连接
// 使用用户上下文装饰器来确保在异步线程中也能获取到用户信息
executorService.execute(AsyncUserContextDecorator.wrapWithContext(() -> {
try {
processChatRequest(emitter, agentId, chatRequest, finalUserId);
processChatRequest(emitter, finalAgent, chatRequest, finalUserId);
} catch (Exception e) {
log.error("处理聊天请求时发生异常", e);
// 检查响应是否已经提交
if (emitter != null) {
chatErrorHandler.handleChatError(emitter, "处理请求时发生错误", e, null);
} else {
log.warn("响应已提交,无法发送处理请求错误信息");
try {
// 检查响应是否已经提交
if (emitter != null && !workPanelSseService.isEmitterCompleted(emitter)) {
chatErrorHandler.handleChatError(emitter, "处理请求时发生错误", e, null);
} else {
log.warn("响应已提交或emitter已完成,无法发送处理请求错误信息");
}
} catch (Exception handlerException) {
log.error("处理错误信息时发生异常", handlerException);
}
}
}));
......@@ -153,34 +179,22 @@ public class AgentChatService {
* 注意:权限验证已在主线程中完成,此正仅执行业务逻辑不进行权限检查
*
* @param emitter SSE发射器
* @param agentId Agent ID
* @param agent Agent对象
* @param chatRequest 聊天请求
* @param userId 用户ID
*/
private void processChatRequest(SseEmitter emitter, String agentId, ChatRequest chatRequest, String userId) {
private void processChatRequest(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId) {
try {
// 直接从 agentService 获取Agent,不需验证权限(权限检查已在主线程中完成)
// 使用 agentService.getAgent() 要比 validateAgentAndPermission 安全,因为前者不会在异步线程中访问SecurityContext
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
log.error("Agent不存在: {}", agentId);
chatErrorHandler.handleChatError(emitter, "Agent不存在");
return;
}
// 获取处理器
AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
if (processor == null) {
log.error("无法获取Agent处理器,Agent: {}", agentId);
log.error("无法获取Agent处理器,Agent: {}", agent.getId());
chatErrorHandler.handleChatError(emitter, "无法获取Agent处理器");
return;
}
// 启动心跳机制
workPanelSseService.startHeartbeat(emitter, new java.util.concurrent.atomic.AtomicBoolean(false));
// 转换请求对象
AgentRequest request = chatRequest.toAgentRequest(agentId, agent, agentToolManager);
AgentRequest request = chatRequest.toAgentRequest(agent.getId(), agent, agentToolManager);
// 处理流式请求
streamRequestService.handleStreamRequest(emitter, processor, request, agent, userId);
......
......@@ -78,6 +78,8 @@ public class AgentErrorHandler {
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
......
......@@ -6,7 +6,7 @@ import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.model.AgentDialogue;
import pangea.hiagent.common.utils.ValidationUtils;
// import pangea.hiagent.common.utils.ValidationUtils; // 移除这个依赖
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.common.utils.LogUtils;
......@@ -47,38 +47,38 @@ public class CompletionHandlerService {
AgentRequest request, String userId,
String fullContent, AtomicBoolean isCompleted) {
LogUtils.enterMethod("handleCompletion", emitter, processor, agent, request, userId);
// 参数验证
if (ValidationUtils.isNull(emitter, "emitter")) {
// 参数验证 - 内联验证逻辑,避免依赖ValidationUtils
if (emitter == null) {
log.error("SSE发射器不能为空");
LogUtils.exitMethod("handleCompletion", "SSE发射器不能为空");
return;
}
if (ValidationUtils.isNull(processor, "processor")) {
if (processor == null) {
log.error("Agent处理器不能为空");
LogUtils.exitMethod("handleCompletion", "Agent处理器不能为空");
return;
}
if (ValidationUtils.isNull(agent, "agent")) {
if (agent == null) {
log.error("Agent对象不能为空");
LogUtils.exitMethod("handleCompletion", "Agent对象不能为空");
return;
}
if (ValidationUtils.isNull(request, "request")) {
if (request == null) {
log.error("Agent请求不能为空");
LogUtils.exitMethod("handleCompletion", "Agent请求不能为空");
return;
}
if (ValidationUtils.isBlank(userId, "userId")) {
if (userId == null || userId.trim().isEmpty()) {
log.error("用户ID不能为空");
LogUtils.exitMethod("handleCompletion", "用户ID不能为空");
return;
}
if (ValidationUtils.isNull(isCompleted, "isCompleted")) {
if (isCompleted == null) {
log.error("完成状态标记不能为空");
LogUtils.exitMethod("handleCompletion", "完成状态标记不能为空");
return;
......@@ -86,31 +86,44 @@ public class CompletionHandlerService {
log.info("{} Agent处理完成,总字符数: {}", processor.getProcessorType(), fullContent != null ? fullContent.length() : 0);
// 发送完成事件
// 严格按照正确的SSE连接关闭顺序:
// 1. 先完成所有业务处理:保存对话记录等
// 2. 发送完成信号:发送[DONE]信号
// 3. 关闭SSE连接:调用emitter.complete()方法完成连接
// 4. 取消心跳任务:清理相关的ScheduledFuture心跳任务
// 5. 移除连接映射:从连接管理器(userEmitters、emitterUsers、emitters)中移除连接映射
// 1. 完成所有业务处理:保存对话记录
Exception completionException = null;
try {
// 发送完整内容作为最后一个token
// if (fullContent != null && !fullContent.isEmpty()) {
// eventService.sendTokenEvent(emitter, fullContent);
// }
// 发送完成信号
emitter.send("[DONE]");
saveDialogue(agent, request, userId, fullContent);
log.info("对话记录保存成功");
} catch (Exception e) {
log.error("发送完成信号失败", e);
log.error("保存对话记录失败", e);
// 记录异常但不中断流程
completionException = e;
}
// 保存对话记录
log.debug("业务处理完成,准备发送完成信号");
// 2. 发送完成信号:发送[DONE]信号
try {
saveDialogue(agent, request, userId, fullContent);
log.info("对话记录保存成功");
// 检查emitter是否已经完成,避免向已完成的连接发送数据
if (!unifiedSseService.isEmitterCompleted(emitter)) {
// 发送完成信号
emitter.send(SseEmitter.event().name("done").data("[DONE]").build());
log.debug("完成信号已发送");
} else {
log.debug("SSE emitter已完成,跳过发送完成信号");
}
} catch (Exception e) {
log.error("保存对话记录失败", e);
// 记录异常但不中断流程,继续关闭emitter
log.error("发送完成信号失败", e);
completionException = e;
}
// 最后才关闭emitter,确保所有操作都完成后再提交响应
log.debug("完成信号发送完毕,准备关闭SSE连接");
// 3. 关闭SSE连接:调用emitter.complete()方法完成连接
try {
// 检查emitter是否已经完成,避免重复关闭
if (!unifiedSseService.isEmitterCompleted(emitter)) {
......@@ -131,20 +144,20 @@ public class CompletionHandlerService {
*/
public void saveDialogue(Agent agent, AgentRequest request, String userId, String responseContent) {
LogUtils.enterMethod("saveDialogue", agent, request, userId);
// 参数验证
if (ValidationUtils.isNull(agent, "agent")) {
// 参数验证 - 内联验证逻辑,避免依赖ValidationUtils
if (agent == null) {
log.error("Agent对象不能为空");
LogUtils.exitMethod("saveDialogue", "Agent对象不能为空");
return;
}
if (ValidationUtils.isNull(request, "request")) {
if (request == null) {
log.error("Agent请求不能为空");
LogUtils.exitMethod("saveDialogue", "Agent请求不能为空");
return;
}
if (ValidationUtils.isBlank(userId, "userId")) {
if (userId == null || userId.trim().isEmpty()) {
log.error("用户ID不能为空");
LogUtils.exitMethod("saveDialogue", "用户ID不能为空");
return;
......
......@@ -4,6 +4,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.workpanel.event.EventService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
......@@ -22,6 +24,9 @@ public class ErrorHandlerService {
@Autowired
private ExceptionMonitoringService exceptionMonitoringService;
@Autowired
private UserSseService userSseService;
/**
* 生成错误跟踪ID
*
......@@ -129,8 +134,13 @@ public class ErrorHandlerService {
errorMessage, exception);
try {
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, processorType);
eventService.sendErrorEvent(emitter, fullErrorMessage);
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, processorType);
eventService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
} catch (Exception sendErrorEx) {
log.error("[{}] 发送错误信息失败", errorId, sendErrorEx);
}
......@@ -154,8 +164,13 @@ public class ErrorHandlerService {
log.error("[{}] 处理聊天请求时发生错误: {}", errorId, errorMessage);
try {
String fullErrorMessage = buildFullErrorMessage(errorMessage, null, errorId, null);
eventService.sendErrorEvent(emitter, fullErrorMessage);
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String fullErrorMessage = buildFullErrorMessage(errorMessage, null, errorId, null);
eventService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
} catch (Exception sendErrorEx) {
log.error("[{}] 发送错误信息失败", errorId, sendErrorEx);
}
......@@ -190,9 +205,14 @@ public class ErrorHandlerService {
log.error("[{}] {}处理token时发生错误", errorId, processorType, exception);
if (!isCompleted.getAndSet(true)) {
try {
String errorMessage = "处理响应时发生错误";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, processorType);
eventService.sendErrorEvent(emitter, fullErrorMessage);
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String errorMessage = "处理响应时发生错误";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, processorType);
eventService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
} catch (Exception ignored) {
if (log.isDebugEnabled()) {
log.debug("[{}] 无法发送错误信息", errorId);
......@@ -223,9 +243,14 @@ public class ErrorHandlerService {
log.error("[{}] 发送完成事件失败", errorId, exception);
try {
String errorMessage = "发送完成事件失败,请联系技术支持";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, "完成回调");
eventService.sendErrorEvent(emitter, fullErrorMessage);
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String errorMessage = "发送完成事件失败,请联系技术支持";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, "完成回调");
eventService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
} catch (Exception sendErrorEx) {
log.error("[{}] 发送错误信息失败", errorId, sendErrorEx);
}
......@@ -333,9 +358,14 @@ public class ErrorHandlerService {
if (!isCompleted.getAndSet(true)) {
try {
String errorMessage = "保存对话记录失败,请联系技术支持";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, "对话记录");
eventService.sendErrorEvent(emitter, fullErrorMessage);
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String errorMessage = "保存对话记录失败,请联系技术支持";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, "对话记录");
eventService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
} catch (Exception sendErrorEx) {
log.error("[{}] 发送错误信息失败", errorId, sendErrorEx);
}
......
......@@ -84,12 +84,14 @@ public class StreamRequestService {
private pangea.hiagent.web.dto.AgentRequest request;
private String userId;
private CompletionHandlerService completionHandlerService;
private UserSseService userSseService;
public StreamTokenConsumer(SseEmitter emitter, AgentProcessor processor, UserSseService unifiedSseService, EventService eventService, CompletionHandlerService completionHandlerService) {
this.emitter = emitter;
this.processor = processor;
this.eventService = eventService;
this.completionHandlerService = completionHandlerService;
this.userSseService = unifiedSseService;
}
public void setContext(Agent agent, pangea.hiagent.web.dto.AgentRequest request, String userId) {
......@@ -97,19 +99,31 @@ public class StreamRequestService {
this.request = request;
this.userId = userId;
}
private boolean isEmitterValid() {
if (userSseService != null && emitter != null) {
return userSseService.isEmitterValidSafe(emitter);
}
return false;
}
@Override
public void accept(String token) {
// 使用JSON格式发送token,确保转义序列被正确处理
try {
if (!isCompleted.get()) {
// 检查是否是错误消息(以[错误]或[ERROR]开头)
if (token != null && (token.startsWith("[错误]") || token.startsWith("[ERROR]"))) {
// 发送标准错误事件而不是纯文本
eventService.sendErrorEvent(emitter, token);
// 检查emitter是否仍然有效
if (isEmitterValid()) {
// 检查是否是错误消息(以[错误]或[ERROR]开头)
if (token != null && (token.startsWith("[错误]") || token.startsWith("[ERROR]"))) {
// 发送标准错误事件而不是纯文本
eventService.sendErrorEvent(emitter, token);
} else {
// 使用SSE标准事件格式发送token,以JSON格式确保转义序列正确处理
eventService.sendTokenEvent(emitter, token);
}
} else {
// 使用SSE标准事件格式发送token,以JSON格式确保转义序列正确处理
eventService.sendTokenEvent(emitter, token);
log.debug("SSE emitter已无效,跳过发送token");
}
}
} catch (Exception e) {
......@@ -128,6 +142,12 @@ public class StreamRequestService {
log.info("{} Agent处理完成,总字符数: {}", processor.getProcessorType(), fullContent != null ? fullContent.length() : 0);
try {
// 检查emitter是否仍然有效,避免在连接已关闭时尝试处理完成事件
if (userSseService != null && userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE emitter已完成,跳过完成处理");
return;
}
// 使用CompletionHandlerService处理完成回调
if (completionHandlerService != null) {
// 添加对CompletionHandlerService调用的额外保护,防止NoClassDefFoundError
......@@ -151,7 +171,9 @@ public class StreamRequestService {
log.error("在异常处理路径中完成emitter也失败", ex);
// 最终保障:直接完成emitter,避免连接未关闭
try {
emitter.complete();
if (!userSseService.isEmitterCompleted(emitter)) {
emitter.complete();
}
} catch (Exception finalEx) {
log.error("最终尝试完成emitter也失败", finalEx);
}
......@@ -161,17 +183,32 @@ public class StreamRequestService {
/**
* 安全地发送完成事件并完成emitter
* 严格按照正确的关闭顺序:
* 1. 完成所有业务处理(此处无业务处理)
* 2. 发送完成信号
* 3. 关闭SSE连接
* 避免重复完成和异常情况
* 注意:此方法不调用onComplete,避免循环调用
*/
private void sendCompletionAndCompleteEmitter() {
try {
// 检查emitter是否已经完成,避免向已完成的连接发送数据
if (userSseService != null && userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE emitter已完成,跳过发送完成事件");
return;
}
// 发送完成事件
emitter.send("[DONE]");
emitter.send(SseEmitter.event().name("done").data("[DONE]").build());
log.debug("完成信号已发送,准备关闭SSE连接");
// 完成 emitter - 直接完成,不再通过CompletionHandlerService重复调用onComplete
try {
emitter.complete();
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
emitter.complete();
log.debug("SSE连接已关闭");
}
} catch (Exception ex) {
log.error("完成emitter时发生错误", ex);
}
......@@ -180,7 +217,10 @@ public class StreamRequestService {
// 尝试直接完成emitter
try {
emitter.complete();
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
emitter.complete();
log.debug("异常路径下SSE连接已关闭");
}
} catch (Exception ex) {
log.error("完成emitter时也发生错误", ex);
}
......
......@@ -83,6 +83,8 @@ public class UserSseService {
SseEmitter emitter = new SseEmitter(SSE_TIMEOUT);
registerCallbacks(emitter);
emitters.add(emitter);
// 启动心跳机制,确保新创建的连接有心跳
startHeartbeat(emitter, new AtomicBoolean(false));
return emitter;
}
......@@ -148,6 +150,10 @@ public class UserSseService {
}
try {
// 按照正确的SSE连接关闭顺序:
// 4. 取消心跳任务:清理相关的ScheduledFuture心跳任务(已在回调中处理)
// 5. 移除连接映射:从连接管理器(userEmitters、emitterUsers、emitters)中移除连接映射
// 从映射表中移除连接
String userId = emitterUsers.remove(emitter);
if (userId != null) {
......@@ -172,6 +178,10 @@ public class UserSseService {
}
try {
// 按照正确的SSE连接关闭顺序:
// 4. 取消心跳任务:清理相关的ScheduledFuture心跳任务(已在回调中处理)
// 5. 移除连接映射:从连接管理器(userEmitters、emitterUsers、emitters)中移除连接映射
// 从映射表中移除连接
String userId = emitterUsers.remove(emitter);
if (userId != null) {
......@@ -196,6 +206,10 @@ public class UserSseService {
}
try {
// 按照正确的SSE连接关闭顺序:
// 4. 取消心跳任务:清理相关的ScheduledFuture心跳任务(已在回调中处理)
// 5. 移除连接映射:从连接管理器(userEmitters、emitterUsers、emitters)中移除连接映射
// 从映射表中移除连接
String userId = emitterUsers.remove(emitter);
if (userId != null) {
......@@ -288,19 +302,25 @@ public class UserSseService {
public void registerCallbacks(SseEmitter emitter) {
emitter.onCompletion(() -> {
log.debug("【注册回调函数】SSE连接完成");
// 按照正确的关闭顺序,连接完成时已经完成关闭,只需移除连接映射
removeEmitter(emitter);
});
emitter.onError((Throwable t) -> {
log.error("SSE连接发生错误: {}", t.getMessage(), t);
// 错误发生时,先移除连接映射
removeEmitter(emitter);
});
emitter.onTimeout(() -> {
log.warn("SSE连接超时");
try {
emitter.complete();
// 检查emitter是否已经完成,避免重复关闭
if (!isEmitterCompleted(emitter)) {
emitter.complete();
}
} catch (Exception e) {
log.error("关闭SSE连接时发生异常: {}", e.getMessage(), e);
}
// 超时时也移除连接映射
removeEmitter(emitter);
});
}
......@@ -322,7 +342,10 @@ public class UserSseService {
emitter.onTimeout(() -> {
log.warn("SSE连接超时");
try {
emitter.complete();
// 检查emitter是否已经完成,避免重复关闭
if (!isEmitterCompleted(emitter)) {
emitter.complete();
}
} catch (Exception e) {
log.error("关闭SSE连接失败", e);
}
......@@ -395,6 +418,22 @@ public class UserSseService {
}
}
/**
* 安全检查SSE Emitter是否仍然有效(不发送实际事件)
* 职责:提供非侵入性的连接有效性检查
*
* @param emitter 要检查的emitter
* @return 如果有效返回true,否则返回false
*/
public boolean isEmitterValidSafe(SseEmitter emitter) {
if (emitter == null) {
return false;
}
// 检查是否已经完成,而不发送任何事件
return !isEmitterCompleted(emitter);
}
/**
* 检查SSE Emitter是否已经完成
* 使用更安全的方式检查完成状态,不发送实际事件
......@@ -407,20 +446,28 @@ public class UserSseService {
return true; // 认为null emitter是已完成的
}
// 使用反射检查SseEmitter的完成状态
try {
// 尝试发送一个空事件,如果抛出IllegalStateException则表示已关闭
emitter.send(SseEmitter.event());
return false; // 没有异常说明未完成
} catch (IllegalStateException e) {
// 检查错误消息是否包含完成相关的文本
String message = e.getMessage();
if (message != null && (message.contains("completed") || message.contains("closed"))) {
java.lang.reflect.Field completedField = SseEmitter.class.getDeclaredField("completed");
completedField.setAccessible(true);
boolean completed = completedField.getBoolean(emitter);
return completed;
} catch (Exception e) {
// 如果反射失败,尝试通过发送事件检测
try {
emitter.send(SseEmitter.event());
return false; // 没有异常说明未完成
} catch (IllegalStateException ex) {
// 检查错误消息是否包含完成相关的文本
String message = ex.getMessage();
if (message != null && (message.contains("completed") || message.contains("closed"))) {
return true;
}
return true; // IllegalStateException通常表示连接已关闭
} catch (Exception ex) {
// 其他异常通常也表示连接已不可用
return true;
}
return false;
} catch (Exception e) {
// 其他异常不认为是完成
return false;
}
}
......@@ -468,11 +515,11 @@ public class UserSseService {
log.debug("[心跳] 成功发送心跳事件,时间戳: {}", heartbeatTimestamp);
} catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况
log.debug("无法发送心跳事件,emitter已关闭: {}", e.getMessage());
log.debug("无法发送心跳事件,emitter已关闭或完成: {}", e.getMessage());
// 不重新抛出异常,避免影响主流程
} catch (Exception e) {
log.warn("发送心跳事件失败: {}", e.getMessage());
throw e;
// 不重新抛出异常,避免心跳任务中断
}
}
......@@ -504,15 +551,15 @@ public class UserSseService {
} else {
log.warn("构建事件数据失败,无法发送事件: 类型={}", event.getType());
}
} catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况
log.debug("无法发送工作面板事件,emitter已关闭或完成: {}", e.getMessage());
// 不重新抛出异常,避免影响主流程
} catch (Exception e) {
// 记录详细错误信息,但不中断主流程
log.error("发送工作面板事件失败: 类型={}, 错误={}", event.getType(), e.getMessage(), e);
// 如果是连接已关闭的异常,重新抛出以便上层处理
if (e instanceof IllegalStateException && e.getMessage() != null &&
e.getMessage().contains("Emitter is already completed")) {
throw e;
}
// 其他异常不重新抛出,避免影响主流程
}
}
......
......@@ -51,11 +51,7 @@ public class AgentChatController {
// 这样可以避免在流式传输过程中突然抛出异常导致响应已提交的问题
// 仅验证Agent存在性,权限检查由过滤器处理
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
log.warn("Agent不存在: {}", agentId);
throw new IllegalArgumentException("Agent不存在");
}
// 为避免安全异常,这里不直接调用agentService.getAgent(),而是让agentChatService处理
// 调用异步处理
return agentChatService.handleChatStream(agentId, chatRequest, response);
......
......@@ -249,6 +249,16 @@ public class EventService {
}
try {
// 检查emitter是否已经完成,避免向已完成的连接发送数据
java.lang.reflect.Field completedField = SseEmitter.class.getDeclaredField("completed");
completedField.setAccessible(true);
boolean isCompleted = completedField.getBoolean(emitter);
if (isCompleted) {
log.debug("SSE emitter已完成,跳过发送工作面板事件");
return;
}
// 构建事件数据
Map<String, Object> data = buildWorkPanelEventData(event);
......@@ -263,15 +273,17 @@ public class EventService {
} else {
log.warn("构建事件数据失败,无法发送事件: 类型={}", event.getType());
}
} catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况
log.debug("无法发送工作面板事件,emitter已关闭或完成: {}", e.getMessage());
// 不重新抛出异常,避免影响主流程
} catch (Exception e) {
// 记录详细错误信息,但不中断主流程
log.error("发送工作面板事件失败: 类型={}, 错误={}", event.getType(), e.getMessage(), e);
// 如果是连接已关闭的异常,重新抛出以便上层处理
if (e instanceof IllegalStateException && e.getMessage() != null &&
e.getMessage().contains("Emitter is already completed")) {
throw e;
if (!(e instanceof java.lang.reflect.InaccessibleObjectException) && !(e instanceof java.lang.NoSuchFieldException)) {
log.error("发送工作面板事件失败: 类型={}, 错误={}", event.getType(), e.getMessage(), e);
}
// 其他异常不重新抛出,避免影响主流程
}
}
......@@ -311,6 +323,16 @@ public class EventService {
}
try {
// 检查emitter是否已经完成,避免向已完成的连接发送数据
java.lang.reflect.Field completedField = SseEmitter.class.getDeclaredField("completed");
completedField.setAccessible(true);
boolean isCompleted = completedField.getBoolean(emitter);
if (isCompleted) {
log.debug("SSE emitter已完成,跳过发送错误事件");
return;
}
// 构建错误事件数据
Map<String, Object> data = errorEventDataBuilder.createErrorEventData(errorMessage);
......@@ -325,15 +347,17 @@ public class EventService {
} else {
log.warn("构建错误事件数据失败,无法发送事件");
}
} catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况
log.debug("无法发送错误事件,emitter已关闭或完成: {}", e.getMessage());
// 不重新抛出异常,避免影响主流程
} catch (Exception e) {
// 记录详细错误信息,但不中断主流程
log.error("发送错误事件失败: 错误信息={}, 错误={}", errorMessage, e.getMessage(), e);
// 如果是连接已关闭的异常,重新抛出以便上层处理
if (e instanceof IllegalStateException && e.getMessage() != null &&
e.getMessage().contains("Emitter is already completed")) {
throw e;
if (!(e instanceof java.lang.reflect.InaccessibleObjectException) && !(e instanceof java.lang.NoSuchFieldException)) {
log.error("发送错误事件失败: 错误信息={}, 错误={}", errorMessage, e.getMessage(), e);
}
// 其他异常不重新抛出,避免影响主流程
}
}
......@@ -351,6 +375,16 @@ public class EventService {
}
try {
// 检查emitter是否已经完成,避免向已完成的连接发送数据
java.lang.reflect.Field completedField = SseEmitter.class.getDeclaredField("completed");
completedField.setAccessible(true);
boolean isCompleted = completedField.getBoolean(emitter);
if (isCompleted) {
log.debug("SSE emitter已完成,跳过发送token事件");
return;
}
// 构建token事件数据
Map<String, Object> data = tokenEventDataBuilder.createOptimizedTokenEventData(token);
......@@ -364,15 +398,17 @@ public class EventService {
} else {
log.warn("构建token事件数据失败,无法发送事件");
}
} catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况
log.debug("无法发送token事件,emitter已关闭或完成: {}", e.getMessage());
// 不重新抛出异常,避免影响主流程
} catch (Exception e) {
// 记录详细错误信息,但不中断主流程
log.error("发送token事件失败: token长度={}, 错误={}", token != null ? token.length() : 0, e.getMessage(), e);
// 如果是连接已关闭的异常,重新抛出以便上层处理
if (e instanceof IllegalStateException && e.getMessage() != null &&
e.getMessage().contains("Emitter is already completed")) {
throw e;
if (!(e instanceof java.lang.reflect.InaccessibleObjectException) && !(e instanceof java.lang.NoSuchFieldException)) {
log.error("发送token事件失败: token长度={}, 错误={}", token != null ? token.length() : 0, e.getMessage(), e);
}
// 其他异常不重新抛出,避免影响主流程
}
}
......
......@@ -2,27 +2,13 @@
<div class="chat-area">
<!-- 顶部Agent选择和操作栏 -->
<div class="chat-header">
<el-select
v-model="selectedAgent"
@change="handleAgentChange"
placeholder="选择智能体"
class="agent-select"
>
<el-option
v-for="agent in agents"
:key="agent.id"
:label="agent.name"
:value="agent.id"
>
<el-select v-model="selectedAgent" @change="handleAgentChange" placeholder="选择智能体" class="agent-select">
<el-option v-for="agent in agents" :key="agent.id" :label="agent.name" :value="agent.id">
<span>{{ agent.name }} (ID: {{ agent.id }})</span>
</el-option>
</el-select>
<el-tooltip content="清空对话">
<el-button
@click="clearMessages"
:disabled="messages.length === 0"
circle
>
<el-button @click="clearMessages" :disabled="messages.length === 0" circle>
<span>🗑️</span>
</el-button>
</el-tooltip>
......@@ -39,18 +25,10 @@
<div>📚 RAG能力 - 知识库增强</div>
</div>
</div>
<message-item
v-for="(msg, index) in messages"
:key="index"
:content="msg.content"
:is-user="msg.isUser"
:agent-name="getAgentName(msg.agentId)"
:timestamp="msg.timestamp"
:is-streaming="msg.isStreaming && index === messages.length - 1"
:is-markdown="!msg.isUser"
:has-error="msg.hasError"
@retry="handleRetry(index)"
/>
<message-item v-for="(msg, index) in messages" :key="index" :content="msg.content" :is-user="msg.isUser"
:agent-name="getAgentName(msg.agentId)" :timestamp="msg.timestamp"
:is-streaming="msg.isStreaming && index === messages.length - 1" :is-markdown="!msg.isUser"
:has-error="msg.hasError" @retry="handleRetry(index)" />
<div v-if="isLoading" class="loading-indicator">
<el-skeleton :rows="2" animated />
</div>
......@@ -58,24 +36,13 @@
<!-- 输入框区域 -->
<div class="chat-input-area">
<el-input
v-model="inputMessage"
type="textarea"
:rows="3"
placeholder="输入消息... (支持 Shift+Enter 换行,Ctrl+Enter 发送)"
@keydown.ctrl.enter="sendMessage"
@keydown.meta.enter="sendMessage"
:disabled="!selectedAgent || isLoading"
class="input-container"
/>
<el-input v-model="inputMessage" type="textarea" :rows="3" placeholder="输入消息... (支持 Shift+Enter 换行,Ctrl+Enter 发送)"
@keydown.ctrl.enter="sendMessage" @keydown.meta.enter="sendMessage" :disabled="!selectedAgent || isLoading"
class="input-container" />
<div class="input-footer">
<span class="input-tips">Ctrl/⌘ + Enter 发送</span>
<el-button
type="primary"
@click="sendMessage"
:loading="isLoading"
:disabled="!selectedAgent || !inputMessage.trim() || isLoading"
>
<el-button type="primary" @click="sendMessage" :loading="isLoading"
:disabled="!selectedAgent || !inputMessage.trim() || isLoading">
发送
</el-button>
</div>
......@@ -130,7 +97,12 @@ const loadAgents = async () => {
agents.value = [];
}
console.log("[Agent列表加载] 获取到的Agent列表:", agents.value);
if (agents.value.length > 0 && !selectedAgent.value) {
// 尝试从localStorage恢复选中的智能体ID
const savedAgentId = localStorage.getItem('selectedAgentId');
if (savedAgentId && agents.value.some(agent => agent.id === savedAgentId)) {
selectedAgent.value = savedAgentId;
} else if (agents.value.length > 0 && !selectedAgent.value) {
selectedAgent.value = agents.value[0].id;
}
} catch (error) {
......@@ -148,8 +120,17 @@ const getAgentName = (agentId?: string): string => {
};
// 处理Agent切换
const handleAgentChange = () => {
clearMessages();
const handleAgentChange = async () => {
// 清空当前消息
messages.value = [];
// 保存选中的智能体到localStorage
if (selectedAgent.value) {
localStorage.setItem('selectedAgentId', selectedAgent.value);
}
// 加载新选中智能体的历史消息
await loadHistoryMessagesInternal(selectedAgent.value);
};
// 清空消息
......@@ -539,9 +520,8 @@ const processSSELine = async (
if (errorMsg.includes("请配置API密钥")) {
messages.value[aiMessageIndex].content = "[错误] 请配置API密钥";
} else {
messages.value[aiMessageIndex].content = `[错误] ${
errorMsg || "未知错误"
}`;
messages.value[aiMessageIndex].content = `[错误] ${errorMsg || "未知错误"
}`;
}
messages.value[aiMessageIndex].hasError = true;
isLoading.value = false;
......@@ -892,7 +872,14 @@ onMounted(async () => {
await loadAgents();
// 等待下一个tick确保agents加载完成后再加载历史消息
await nextTick();
loadHistoryMessages();
// 优先使用路由参数中的agentId,如果没有则使用localStorage中保存的或默认选中的
const routeAgentId = route.query.agentId as string;
if (routeAgentId) {
await loadHistoryMessagesInternal(routeAgentId);
} else {
loadHistoryMessages();
}
});
// 暴露方法给父组件使用
......@@ -908,7 +895,8 @@ defineExpose({
height: 100%;
background-color: var(--bg-primary);
border-right: 1px solid var(--border-color);
min-height: 0; /* 允许容器收缩 */
min-height: 0;
/* 允许容器收缩 */
}
.chat-header {
......@@ -933,7 +921,8 @@ defineExpose({
display: flex;
flex-direction: column;
gap: var(--spacing-3);
min-height: 0; /* 允许容器收缩 */
min-height: 0;
/* 允许容器收缩 */
}
.empty-state {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment