Commit 0bd8e66f authored by ligaowei's avatar ligaowei

fix(SSE): 增强SSE连接状态检查和错误处理

添加响应提交检查防止重复操作,优化emitter状态管理
修复JWT认证过滤器中的代码格式问题
重构SSE端点授权检查逻辑
简化application.yml中的配置说明
parent 8bff979e
...@@ -126,7 +126,7 @@ public class AgentChatService { ...@@ -126,7 +126,7 @@ public class AgentChatService {
} }
// 创建 SSE emitter // 创建 SSE emitter
SseEmitter emitter = userSseService.createEmitter(); SseEmitter emitter = userSseService.createAndRegisterConnection(userId);
// 异步处理对话,避免阻塞HTTP连接 // 异步处理对话,避免阻塞HTTP连接
processChatStreamAsync(emitter, agent, chatRequest, userId); processChatStreamAsync(emitter, agent, chatRequest, userId);
...@@ -140,6 +140,11 @@ public class AgentChatService { ...@@ -140,6 +140,11 @@ public class AgentChatService {
@Async @Async
private void processChatStreamAsync(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId) { private void processChatStreamAsync(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId) {
try { try {
// 首先检查连接状态
if (emitter != null && userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE连接已关闭,跳过异步处理");
return;
}
processChatRequest(emitter, agent, chatRequest, userId); processChatRequest(emitter, agent, chatRequest, userId);
} catch (Exception e) { } catch (Exception e) {
log.error("处理聊天请求时发生异常", e); log.error("处理聊天请求时发生异常", e);
...@@ -167,11 +172,25 @@ public class AgentChatService { ...@@ -167,11 +172,25 @@ public class AgentChatService {
return; return;
} }
// 获取处理器前检查连接状态
if (userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE连接已关闭,跳过获取处理器");
return;
}
// 获取处理器 // 获取处理器
AgentProcessor processor = agentProcessorFactory.getProcessor(agent); AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
if (processor == null) { if (processor == null) {
log.error("无法获取Agent处理器,Agent: {}", agent.getId()); log.error("无法获取Agent处理器,Agent: {}", agent.getId());
if (!userSseService.isEmitterCompleted(emitter)) {
errorHandlerService.handleChatError(emitter, "无法获取Agent处理器"); errorHandlerService.handleChatError(emitter, "无法获取Agent处理器");
}
return;
}
// 处理请求前检查连接状态
if (userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE连接已关闭,跳过处理请求");
return; return;
} }
...@@ -181,8 +200,12 @@ public class AgentChatService { ...@@ -181,8 +200,12 @@ public class AgentChatService {
// 创建新的SseTokenEmitter实例 // 创建新的SseTokenEmitter实例
SseTokenEmitter tokenEmitter = new SseTokenEmitter(userSseService, emitter, agent, request, userId, this::handleCompletion); SseTokenEmitter tokenEmitter = new SseTokenEmitter(userSseService, emitter, agent, request, userId, this::handleCompletion);
// 处理流式请求 // 处理流式请求前再次检查连接状态
if (!userSseService.isEmitterCompleted(emitter)) {
processor.processStreamRequest(request, agent, userId, tokenEmitter); processor.processStreamRequest(request, agent, userId, tokenEmitter);
} else {
log.debug("SSE连接已关闭,跳过流式处理");
}
} catch (Exception e) { } catch (Exception e) {
log.error("处理聊天请求时发生异常", e); log.error("处理聊天请求时发生异常", e);
errorHandlerService.handleChatError(emitter, "处理请求时发生错误", e, null); errorHandlerService.handleChatError(emitter, "处理请求时发生错误", e, null);
...@@ -202,14 +225,8 @@ public class AgentChatService { ...@@ -202,14 +225,8 @@ public class AgentChatService {
String fullContent) { String fullContent) {
log.info("Agent处理完成,总字符数: {}", fullContent != null ? fullContent.length() : 0); log.info("Agent处理完成,总字符数: {}", fullContent != null ? fullContent.length() : 0);
// 保存对话记录 // 保存对话记录 - 安全操作,不抛出异常
try {
saveDialogue(agent, request, userId, fullContent); saveDialogue(agent, request, userId, fullContent);
log.info("对话记录保存成功");
} catch (Exception e) {
log.error("保存对话记录失败", e);
// 记录异常但不中断流程
}
} }
/** /**
...@@ -233,9 +250,10 @@ public class AgentChatService { ...@@ -233,9 +250,10 @@ public class AgentChatService {
// 保存对话记录 // 保存对话记录
agentService.saveDialogue(dialogue); agentService.saveDialogue(dialogue);
log.info("对话记录保存成功");
} catch (Exception e) { } catch (Exception e) {
log.error("保存对话记录失败", e); log.error("保存对话记录失败", e);
throw new RuntimeException("保存对话记录失败", e); // 仅记录日志,不抛出异常
} }
} }
......
...@@ -7,6 +7,8 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; ...@@ -7,6 +7,8 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent; import pangea.hiagent.model.Agent;
import pangea.hiagent.web.dto.AgentRequest; import pangea.hiagent.web.dto.AgentRequest;
import java.io.IOException;
/** /**
* SSE Token发射器 * SSE Token发射器
* 专注于将token转换为SSE事件并发送 * 专注于将token转换为SSE事件并发送
...@@ -87,8 +89,26 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion { ...@@ -87,8 +89,26 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
} else { } else {
log.debug("SSE emitter已无效,跳过发送token"); log.debug("SSE emitter已无效,跳过发送token");
} }
} catch (IllegalStateException e) {
// 处理emitter已关闭的情况,这通常是由于客户端断开连接
log.debug("无法发送token,SSE emitter已关闭: {}", e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
if (emitter != null) {
userSseService.removeEmitter(emitter);
}
} catch (IOException e) {
// 处理IO异常,这通常是由于客户端断开连接或网络问题
log.debug("无法发送token,IO异常: {}", e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
if (emitter != null) {
userSseService.removeEmitter(emitter);
}
} catch (Exception e) { } catch (Exception e) {
log.error("发送token失败", e); log.error("发送token失败", e);
// 对于其他异常,也将emitter标记为已完成,避免后续再次尝试发送
if (emitter != null) {
userSseService.removeEmitter(emitter);
}
} }
} }
...@@ -105,6 +125,12 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion { ...@@ -105,6 +125,12 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
if (completionCallback != null) { if (completionCallback != null) {
completionCallback.onComplete(emitter, agent, request, userId, fullContent); completionCallback.onComplete(emitter, agent, request, userId, fullContent);
} }
} catch (IllegalStateException e) {
// 处理emitter已关闭的情况,这通常是由于客户端断开连接
log.debug("无法发送完成信号,SSE emitter已关闭: {}", e.getMessage());
} catch (IOException e) {
// 处理IO异常,这通常是由于客户端断开连接或网络问题
log.debug("无法发送完成信号,IO异常: {}", e.getMessage());
} catch (Exception e) { } catch (Exception e) {
log.error("处理完成事件失败", e); log.error("处理完成事件失败", e);
} finally { } finally {
......
package pangea.hiagent.agent.service; package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.web.dto.ToolEvent;
import pangea.hiagent.web.dto.WorkPanelEvent; import pangea.hiagent.web.dto.WorkPanelEvent;
import pangea.hiagent.workpanel.event.EventService; import pangea.hiagent.workpanel.event.EventService;
import pangea.hiagent.workpanel.data.TokenEventDataBuilder; import pangea.hiagent.workpanel.data.TokenEventDataBuilder;
import pangea.hiagent.workpanel.data.ErrorEventDataBuilder; import pangea.hiagent.workpanel.data.ErrorEventDataBuilder;
import java.util.Set;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
...@@ -37,6 +36,9 @@ public class UserSseService { ...@@ -37,6 +36,9 @@ public class UserSseService {
// 存储SSE Emitter到用户ID的反向映射关系(用于快速查找) // 存储SSE Emitter到用户ID的反向映射关系(用于快速查找)
private final ConcurrentMap<SseEmitter, String> emitterUsers = new ConcurrentHashMap<>(); private final ConcurrentMap<SseEmitter, String> emitterUsers = new ConcurrentHashMap<>();
// 存储已完成的emitter集合,用于快速检查状态
private final Set<SseEmitter> completedEmitters = ConcurrentHashMap.newKeySet();
// 心跳任务执行器 - 使用共享线程池以提高资源利用率 // 心跳任务执行器 - 使用共享线程池以提高资源利用率
private final ScheduledExecutorService heartbeatExecutor; private final ScheduledExecutorService heartbeatExecutor;
...@@ -91,7 +93,6 @@ public class UserSseService { ...@@ -91,7 +93,6 @@ public class UserSseService {
*/ */
public SseEmitter createEmitter() { public SseEmitter createEmitter() {
SseEmitter emitter = new SseEmitter(SSE_TIMEOUT); SseEmitter emitter = new SseEmitter(SSE_TIMEOUT);
registerCallbacks(emitter);
emitters.add(emitter); emitters.add(emitter);
// 启动心跳机制,确保新创建的连接有心跳 // 启动心跳机制,确保新创建的连接有心跳
startHeartbeat(emitter, new AtomicBoolean(false)); startHeartbeat(emitter, new AtomicBoolean(false));
...@@ -161,6 +162,9 @@ public class UserSseService { ...@@ -161,6 +162,9 @@ public class UserSseService {
} }
try { try {
// 添加到已完成集合
completedEmitters.add(emitter);
// 检查emitter是否已经完成,避免重复关闭 // 检查emitter是否已经完成,避免重复关闭
if (!isEmitterCompleted(emitter)) { if (!isEmitterCompleted(emitter)) {
try { try {
...@@ -216,10 +220,13 @@ public class UserSseService { ...@@ -216,10 +220,13 @@ public class UserSseService {
* @param emitter SSE发射器 * @param emitter SSE发射器
*/ */
public void removeEmitter(SseEmitter emitter) { public void removeEmitter(SseEmitter emitter) {
if (emitter != null && emitters.remove(emitter)) { if (emitter != null) {
completedEmitters.add(emitter); // 添加到已完成集合
if (emitters.remove(emitter)) {
log.debug("已移除SSE Emitter,剩余连接数: {}", emitters.size()); log.debug("已移除SSE Emitter,剩余连接数: {}", emitters.size());
} }
} }
}
/** /**
* 启动心跳机制 * 启动心跳机制
...@@ -307,30 +314,6 @@ public class UserSseService { ...@@ -307,30 +314,6 @@ public class UserSseService {
log.error("心跳任务执行异常: {}", e.getMessage(), e); log.error("心跳任务执行异常: {}", e.getMessage(), e);
} }
}, 20, 20, TimeUnit.SECONDS); // 每20秒发送一次心跳,确保前端60秒超时前至少收到2次心跳 }, 20, 20, TimeUnit.SECONDS); // 每20秒发送一次心跳,确保前端60秒超时前至少收到2次心跳
// 注册回调,在连接完成时取消心跳任务
emitter.onCompletion(() -> {
if (heartbeatTaskRef[0] != null && !heartbeatTaskRef[0].isCancelled()) {
heartbeatTaskRef[0].cancel(true);
log.debug("SSE连接完成,心跳任务已取消");
}
});
// 注册回调,在连接超时时取消心跳任务
emitter.onTimeout(() -> {
if (heartbeatTaskRef[0] != null && !heartbeatTaskRef[0].isCancelled()) {
heartbeatTaskRef[0].cancel(true);
log.debug("SSE连接超时,心跳任务已取消");
}
});
// 注册回调,在连接错误时取消心跳任务
emitter.onError(throwable -> {
if (heartbeatTaskRef[0] != null && !heartbeatTaskRef[0].isCancelled()) {
heartbeatTaskRef[0].cancel(true);
log.debug("SSE连接错误,心跳任务已取消");
}
});
} }
/** /**
...@@ -399,11 +382,14 @@ public class UserSseService { ...@@ -399,11 +382,14 @@ public class UserSseService {
} }
emitter.complete(); emitter.complete();
completedEmitters.add(emitter); // 添加到已完成集合
log.debug("Emitter已成功关闭"); log.debug("Emitter已成功关闭");
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
log.debug("Emitter已经关闭: {}", e.getMessage()); log.debug("Emitter已经关闭: {}", e.getMessage());
completedEmitters.add(emitter); // 添加到已完成集合
} catch (Exception e) { } catch (Exception e) {
log.warn("完成Emitter时发生异常: {}", e.getMessage()); log.warn("完成Emitter时发生异常: {}", e.getMessage());
completedEmitters.add(emitter); // 添加到已完成集合
} }
} }
} }
...@@ -450,12 +436,30 @@ public class UserSseService { ...@@ -450,12 +436,30 @@ public class UserSseService {
} }
// 检查是否已经完成,而不发送任何事件 // 检查是否已经完成,而不发送任何事件
return !isEmitterCompleted(emitter); if (isEmitterCompleted(emitter)) {
return false;
}
// 检查emitter是否在活动列表中
if (!emitters.contains(emitter)) {
// 如果不在活动列表中,将其标记为已完成
completedEmitters.add(emitter);
return false;
}
// 检查是否有用户ID关联
if (emitterUsers.get(emitter) == null) {
// 如果没有用户ID关联,可能是一个孤立的连接
log.debug("SSE emitter没有关联的用户ID,可能是孤立连接");
// 不立即标记为已完成,因为可能是新创建的连接
}
return true;
} }
/** /**
* 检查SSE Emitter是否已经完成 * 检查SSE Emitter是否已经完成
* 使用更安全的方式检查完成状态,不发送实际事件 * 使用高效的方式检查完成状态,优先检查本地集合
* *
* @param emitter 要检查的emitter * @param emitter 要检查的emitter
* @return 如果已完成返回true,否则返回false * @return 如果已完成返回true,否则返回false
...@@ -465,18 +469,20 @@ public class UserSseService { ...@@ -465,18 +469,20 @@ public class UserSseService {
return true; // 认为null emitter是已完成的 return true; // 认为null emitter是已完成的
} }
// 通过尝试发送空事件来检查emitter状态 // 首先检查本地集合,避免网络操作
// 这是一种更可靠的方式,不会依赖于内部实现细节 if (completedEmitters.contains(emitter)) {
try {
emitter.send(SseEmitter.event());
return false; // 没有异常说明未完成
} catch (IllegalStateException ex) {
// IllegalStateException通常表示连接已关闭或完成
return true; return true;
} catch (Exception ex) { }
// 其他异常通常也表示连接已不可用
// 检查emitter是否在活动列表中
if (!emitters.contains(emitter)) {
completedEmitters.add(emitter); // 添加到已完成集合
return true; return true;
} }
// 尝试使用安全的方式检查emitter状态,避免发送实际事件
// 注意:这个方法不会捕获所有连接问题,但能有效避免UT005023异常
return false;
} }
/** /**
...@@ -504,11 +510,21 @@ public class UserSseService { ...@@ -504,11 +510,21 @@ public class UserSseService {
emitter.send(SseEmitter.event().name(eventName).data(data)); emitter.send(SseEmitter.event().name(eventName).data(data));
return true; return true;
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况 // 处理 emitter 已关闭的情况,这通常是由于客户端断开连接
log.debug("无法发送{}事件,emitter已关闭: {}", eventName, e.getMessage()); log.debug("无法发送{}事件,emitter已关闭: {}", eventName, e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
removeEmitter(emitter);
return false;
} catch (IOException e) {
// 处理IO异常,这通常是由于客户端断开连接或网络问题
log.debug("无法发送{}事件,IO异常: {}", eventName, e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
removeEmitter(emitter);
return false; return false;
} catch (Exception e) { } catch (Exception e) {
log.error("发送{}事件失败: {}", eventName, e.getMessage(), e); log.error("发送{}事件失败: {}", eventName, e.getMessage(), e);
// 将emitter标记为已完成,避免后续再次尝试发送
removeEmitter(emitter);
return false; return false;
} }
} }
......
...@@ -137,6 +137,11 @@ public class SecurityConfig { ...@@ -137,6 +137,11 @@ public class SecurityConfig {
boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream"); boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream");
if (isStreamEndpoint) { if (isStreamEndpoint) {
// 再次检查响应是否已经提交
if (response.isCommitted()) {
log.warn("SSE端点响应已提交,无法发送认证错误");
return;
}
// 对于SSE端点,发送SSE格式的错误事件 // 对于SSE端点,发送SSE格式的错误事件
response.setContentType("text/event-stream;charset=UTF-8"); response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8"); response.setCharacterEncoding("UTF-8");
...@@ -176,6 +181,11 @@ public class SecurityConfig { ...@@ -176,6 +181,11 @@ public class SecurityConfig {
boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream"); boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream");
if (isStreamEndpoint) { if (isStreamEndpoint) {
// 再次检查响应是否已经提交
if (response.isCommitted()) {
log.warn("SSE端点响应已提交,无法发送访问拒绝错误");
return;
}
// 对于SSE端点,发送SSE格式的错误事件 // 对于SSE端点,发送SSE格式的错误事件
response.setContentType("text/event-stream;charset=UTF-8"); response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8"); response.setCharacterEncoding("UTF-8");
......
...@@ -208,51 +208,49 @@ public class GlobalExceptionHandler { ...@@ -208,51 +208,49 @@ public class GlobalExceptionHandler {
/** /**
* 处理授权拒绝异常 * 处理授权拒绝异常
*/ */
@ExceptionHandler(AuthorizationDeniedException.class) /**
public ResponseEntity<ApiResponse<Void>> handleAuthorizationDeniedException( * 获取当前响应对象
AuthorizationDeniedException e, HttpServletRequest request) { */
log.warn("访问被拒绝: {} - URL: {}", e.getMessage(), request.getRequestURL()); private jakarta.servlet.http.HttpServletResponse getCurrentResponse() {
// 检查是否为SSE端点的异常
String requestUri = request.getRequestURI();
boolean isSseEndpoint = requestUri.contains("/api/v1/agent/chat-stream") || requestUri.contains("/api/v1/agent/timeline-events");
// 检查响应是否已经提交
jakarta.servlet.http.HttpServletResponse httpResponse = null;
if (org.springframework.web.context.request.RequestContextHolder.getRequestAttributes() != null) { if (org.springframework.web.context.request.RequestContextHolder.getRequestAttributes() != null) {
Object requestAttributes = org.springframework.web.context.request.RequestContextHolder Object requestAttributes = org.springframework.web.context.request.RequestContextHolder
.getRequestAttributes(); .getRequestAttributes();
if (requestAttributes instanceof org.springframework.web.context.request.ServletRequestAttributes) { if (requestAttributes instanceof org.springframework.web.context.request.ServletRequestAttributes) {
org.springframework.web.context.request.ServletRequestAttributes servletRequestAttributes = org.springframework.web.context.request.ServletRequestAttributes servletRequestAttributes =
(org.springframework.web.context.request.ServletRequestAttributes) requestAttributes; (org.springframework.web.context.request.ServletRequestAttributes) requestAttributes;
if (servletRequestAttributes.getResponse() instanceof jakarta.servlet.http.HttpServletResponse) { return servletRequestAttributes.getResponse();
httpResponse = (jakarta.servlet.http.HttpServletResponse) servletRequestAttributes.getResponse(); }
} }
return null;
} }
/**
* 检查响应是否已提交
*/
private boolean isResponseCommitted() {
jakarta.servlet.http.HttpServletResponse httpResponse = getCurrentResponse();
return httpResponse != null && httpResponse.isCommitted();
}
@ExceptionHandler(AuthorizationDeniedException.class)
public ResponseEntity<ApiResponse<Void>> handleAuthorizationDeniedException(
AuthorizationDeniedException e, HttpServletRequest request) {
log.warn("访问被拒绝: {} - URL: {}", e.getMessage(), request.getRequestURL());
// 检查是否为SSE端点的异常
String requestUri = request.getRequestURI();
boolean isSseEndpoint = requestUri.contains("/api/v1/agent/chat-stream") || requestUri.contains("/api/v1/agent/timeline-events");
// 检查响应是否已提交 // 检查响应是否已提交
if (httpResponse != null && httpResponse.isCommitted()) { if (isResponseCommitted()) {
log.warn("响应已提交,无法发送访问拒绝错误: {}", request.getRequestURL()); log.warn("响应已提交,无法发送访问拒绝错误: {}", request.getRequestURL());
// 如果是SSE端点且响应已提交,返回空响应避免二次异常
return ResponseEntity.ok().build(); return ResponseEntity.ok().build();
} }
}
// 如果是SSE端点,但响应未提交,发送SSE格式的错误响应 // 如果是SSE端点,但响应未提交,发送SSE格式的错误响应
if (isSseEndpoint) { if (isSseEndpoint) {
try { try {
jakarta.servlet.http.HttpServletResponse sseResponse = null; jakarta.servlet.http.HttpServletResponse sseResponse = getCurrentResponse();
if (org.springframework.web.context.request.RequestContextHolder.getRequestAttributes() != null) {
Object requestAttributes = org.springframework.web.context.request.RequestContextHolder
.getRequestAttributes();
if (requestAttributes instanceof org.springframework.web.context.request.ServletRequestAttributes) {
org.springframework.web.context.request.ServletRequestAttributes servletRequestAttributes =
(org.springframework.web.context.request.ServletRequestAttributes) requestAttributes;
if (servletRequestAttributes.getResponse() instanceof jakarta.servlet.http.HttpServletResponse) {
sseResponse = (jakarta.servlet.http.HttpServletResponse) servletRequestAttributes.getResponse();
}
}
}
if (sseResponse != null) { if (sseResponse != null) {
sseResponse.setStatus(HttpServletResponse.SC_FORBIDDEN); sseResponse.setStatus(HttpServletResponse.SC_FORBIDDEN);
......
...@@ -35,6 +35,12 @@ public class SseAuthorizationFilter extends OncePerRequestFilter { ...@@ -35,6 +35,12 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
* 发送SSE格式的错误响应 * 发送SSE格式的错误响应
*/ */
private void sendSseError(HttpServletResponse response, int status, String errorMessage) { private void sendSseError(HttpServletResponse response, int status, String errorMessage) {
// 检查响应是否已经提交,避免后续错误处理异常
if (response.isCommitted()) {
log.warn("响应已提交,无法发送SSE错误响应: {} - {}", status, errorMessage);
return;
}
try { try {
response.setStatus(status); response.setStatus(status);
response.setContentType("text/event-stream;charset=UTF-8"); response.setContentType("text/event-stream;charset=UTF-8");
...@@ -66,7 +72,7 @@ public class SseAuthorizationFilter extends OncePerRequestFilter { ...@@ -66,7 +72,7 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
// 检查响应是否已经提交,避免后续错误处理异常 // 检查响应是否已经提交,避免后续错误处理异常
if (response.isCommitted()) { if (response.isCommitted()) {
log.warn("响应已提交,无法处理SSE端点授权检查"); log.warn("响应已提交,无法处理SSE端点授权检查,终止过滤器链");
return; return;
} }
...@@ -108,6 +114,12 @@ public class SseAuthorizationFilter extends OncePerRequestFilter { ...@@ -108,6 +114,12 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
} }
} }
// 再次检查响应是否已经提交,避免后续过滤器尝试修改已提交的响应
if (response.isCommitted()) {
log.warn("响应已提交,跳过继续执行过滤器链");
return;
}
// 继续执行过滤器链 // 继续执行过滤器链
filterChain.doFilter(request, response); filterChain.doFilter(request, response);
return; return;
......
...@@ -241,32 +241,18 @@ hiagent: ...@@ -241,32 +241,18 @@ hiagent:
5. Complex Queries: Use multiple tools for complex tasks; avoid single-tool reliance. 5. Complex Queries: Use multiple tools for complex tasks; avoid single-tool reliance.
=== REACT PROCESS === === REACT PROCESS ===
Cyclic process for every query, execute in order until complete: Cyclic process for every query, execute in order until complete:
- Step 1 - THOUGHT: Analyze the query, break into sub-tasks, select relevant tools with alternatives, define execution sequence.
Step 1 - THOUGHT: Analyze the query, break into sub-tasks, select relevant tools with alternatives for fault tolerance, define execution sequence. - Step 2 - ACTION: EXECUTE TOOLS DIRECTLY, NEVER JUST DESCRIBE THEM. Call specific tools in planned order, execute multiple if needed, use alternatives if a tool fails.
- Step 3 - OBSERVATION: Analyze all tool results, extract key insights, check completeness. If results are complete → Proceed to Final Answer; if incomplete → Return to Thought.
Step 2 - ACTION: EXECUTE TOOLS DIRECTLY, NEVER JUST DESCRIBE THEM - Step 4 - FINAL ANSWER: Synthesize tool results into a clear, complete answer. Explain tool synergy if helpful. Keep it conversational.
- Call specific tools in planned order, not just list them
- Execute multiple tools consecutively if needed
- If a tool fails, immediately use an alternative
Step 3 - OBSERVATION: Analyze all tool results, extract key insights, check completeness against user needs, identify gaps.
Step 4 - ITERATION DECISION:
- ✅ TERMINATE: If results are complete → Proceed to Step 5
- ♻️ RESTART: If results are incomplete → Return to Step 1
Step 5 - FINAL ANSWER: Synthesize tool results into a clear, complete answer. Explain tool synergy if helpful. Keep it conversational.
=== RESPONSE FORMAT === === RESPONSE FORMAT ===
Strictly follow this structure: Strictly follow this structure:
1. Thought: Problem analysis, tool selection, execution sequence 1. Thought: Problem analysis, tool selection, execution sequence
2. Action: Actual tool calls (not descriptions) 2. Action: Actual tool calls (not descriptions)
3. Observation: Key results summary 3. Observation: Key results summary, decision (terminate/restart)
4. Iteration_Decision: Terminate or restart 4. Final_Answer: Result-based answer
5. Final_Answer: Result-based answer
=== HARD RULES === === HARD RULES ===
- Execute tools first, never just describe them - Execute tools first, never just describe them
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment