Commit 5f364ac1 authored by ligaowei's avatar ligaowei

refactor(react): 优化DefaultReactExecutor系统提示文本为中文详细说明

- 将系统默认提示文本改为中文,详细描述ReAct框架核心原则与执行流程
- 明确严格串行调用工具链及ReAct迭代闭环的核心规则
- 详细列举工具协同策略和标准化回复格式以提升工具协作效率
- 增加ReAct循环终止及容错机制说明,确保逻辑严谨且易于追踪

fix(service): 优化流式请求完成处理与SSE连接关闭逻辑

- 在CompletionHandlerService中对Emitter完成状态进行判断,避免重复关闭
- StreamRequestService中增加CompletionHandlerService调用保护,防止NoClassDefFoundError异常
- 提供默认发送完成事件和安全完成Emitter的辅助方法,增强稳定性
- UserSseService新增isEmitterCompleted方法,避免冗余事件发送和异常
- 关闭Emitter时增加异常捕获与日志记录,保障连接正确释放

fix(security): 加强SSE端点的权限校验及错误响应处理

- SseAuthorizationFilter中引入AgentService,验证Agent存在与用户访问权限
- 针对未授权、Agent不存在及访问拒绝,发送符合SSE格式的错误事件提示
- 优化全局异常处理GlobalExceptionHandler,对SSE端点访问拒绝情况返回SSE错误数据,避免二次异常
- 细化AccessDeniedException处理逻辑,对响应提交状态进行检查,保证安全输出

feat(tool/hisense): 增强SSO登录工具URL跳转等待逻辑和MFA流程

- 新增等待多个可能成功跳转URL的方法,提升登录成功跳转的容错性
- 增加针对特定URL更灵活的等待匹配方法
- 将等待URL超时延长至60秒,MFA等待时间延长至45秒,提高稳定性
- MFA验证流程改为轮询方式检测页面跳转,增强对超时和异常场景的兼容性
- 登录后结果判断更全面,支持多种登录成功页面URL和状态处理,完善登录状态更新机制

refactor(controller): 移除AgentChatController内重复权限检查

- 注释说明权限校验已由SseAuthorizationFilter完成
- 避免在流式响应开始后抛出异常导致响应状态异常
- 保证流式对话请求的权限安全与流程顺畅
parent 198ca2f1
......@@ -112,8 +112,13 @@ public class CompletionHandlerService {
// 最后才关闭emitter,确保所有操作都完成后再提交响应
try {
unifiedSseService.completeEmitter(emitter, isCompleted);
log.debug("SSE Emitter已关闭");
// 检查emitter是否已经完成,避免重复关闭
if (!unifiedSseService.isEmitterCompleted(emitter)) {
unifiedSseService.completeEmitter(emitter, isCompleted);
log.debug("SSE Emitter已关闭");
} else {
log.debug("SSE Emitter已完成,跳过关闭操作");
}
} catch (Exception e) {
log.error("关闭Emitter时发生错误", e);
}
......
......@@ -130,26 +130,60 @@ public class StreamRequestService {
try {
// 使用CompletionHandlerService处理完成回调
if (completionHandlerService != null) {
completionHandlerService.handleCompletion(emitter, processor, agent, request, userId, fullContent, isCompleted);
// 添加对CompletionHandlerService调用的额外保护,防止NoClassDefFoundError
try {
completionHandlerService.handleCompletion(emitter, processor, agent, request, userId, fullContent, isCompleted);
} catch (NoClassDefFoundError e) {
log.error("CompletionHandlerService依赖类未找到,使用默认处理逻辑: {}", e.getMessage());
// 如果类未找到,使用默认逻辑完成emitter
sendCompletionAndCompleteEmitter();
}
} else {
// 如果completionHandlerService不可用,使用默认处理逻辑
try {
// 发送完成事件
emitter.send("[DONE]");
// 完成 emitter
emitter.complete();
} catch (Exception e) {
log.error("处理完成事件失败", e);
} }
sendCompletionAndCompleteEmitter();
}
} catch (Exception e) {
log.error("处理完成事件失败", e);
// 确保即使出现异常也完成emitter
try {
sendCompletionAndCompleteEmitter();
} catch (Exception ex) {
log.error("在异常处理路径中完成emitter也失败", ex);
// 最终保障:直接完成emitter,避免连接未关闭
try {
emitter.complete();
} catch (Exception finalEx) {
log.error("最终尝试完成emitter也失败", finalEx);
}
}
}
}
/**
* 安全地发送完成事件并完成emitter
* 避免重复完成和异常情况
* 注意:此方法不调用onComplete,避免循环调用
*/
private void sendCompletionAndCompleteEmitter() {
try {
// 发送完成事件
emitter.send("[DONE]");
// 完成 emitter - 直接完成,不再通过CompletionHandlerService重复调用onComplete
try {
emitter.complete();
} catch (Exception ex) {
log.error("完成emitter时发生错误", ex);
}
} catch (Exception e) {
log.error("发送完成事件时发生错误", e);
// 尝试直接完成emitter
try {
emitter.complete();
} catch (Exception ex) {
log.error("完成emitter时也发生错误", ex);
}
}
} }
}
\ No newline at end of file
......@@ -355,7 +355,16 @@ public class UserSseService {
}
if (emitter != null) {
try {
// 检查emitter是否已经完成,避免重复关闭
if (isEmitterCompleted(emitter)) {
log.debug("Emitter已经完成,跳过关闭操作");
return;
}
emitter.complete();
log.debug("Emitter已成功关闭");
} catch (IllegalStateException e) {
log.debug("Emitter已经关闭: {}", e.getMessage());
} catch (Exception e) {
log.warn("完成Emitter时发生异常: {}", e.getMessage());
}
......@@ -377,6 +386,7 @@ public class UserSseService {
// 检查逻辑,仅通过尝试发送ping事件来验证连接状态
try {
// 尝试发送一个空事件来检查连接状态
// 注意:这个方法会实际发送一个事件到客户端,这可能不是理想的方式
emitter.send(SseEmitter.event().name("ping").data("").build());
return true;
} catch (Exception ex) {
......@@ -385,6 +395,35 @@ public class UserSseService {
}
}
/**
* 检查SSE Emitter是否已经完成
* 使用更安全的方式检查完成状态,不发送实际事件
*
* @param emitter 要检查的emitter
* @return 如果已完成返回true,否则返回false
*/
public boolean isEmitterCompleted(SseEmitter emitter) {
if (emitter == null) {
return true; // 认为null emitter是已完成的
}
try {
// 尝试发送一个空事件,如果抛出IllegalStateException则表示已关闭
emitter.send(SseEmitter.event());
return false; // 没有异常说明未完成
} catch (IllegalStateException e) {
// 检查错误消息是否包含完成相关的文本
String message = e.getMessage();
if (message != null && (message.contains("completed") || message.contains("closed"))) {
return true;
}
return false;
} catch (Exception e) {
// 其他异常不认为是完成
return false;
}
}
/**
* 发送SSE事件
* 职责:统一发送SSE事件的基础方法
......
......@@ -14,6 +14,7 @@ import org.springframework.web.method.annotation.MethodArgumentTypeMismatchExcep
import pangea.hiagent.agent.service.ExceptionMonitoringService;
import pangea.hiagent.web.dto.ApiResponse;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.util.stream.Collectors;
import org.springframework.security.authorization.AuthorizationDeniedException;
......@@ -212,29 +213,65 @@ public class GlobalExceptionHandler {
AuthorizationDeniedException e, HttpServletRequest request) {
log.warn("访问被拒绝: {} - URL: {}", e.getMessage(), request.getRequestURL());
// 更全面地检查响应是否已经提交
boolean responseCommitted = false;
// 检查request属性
if (request.getAttribute("jakarta.servlet.error.exception") != null) {
responseCommitted = true;
}
// 检查是否为SSE端点的异常
String requestUri = request.getRequestURI();
boolean isSseEndpoint = requestUri.contains("/api/v1/agent/chat-stream") || requestUri.contains("/api/v1/agent/timeline-events");
// 检查response是否已提交
if (request instanceof org.springframework.web.context.request.NativeWebRequest) {
Object nativeResponse = ((org.springframework.web.context.request.NativeWebRequest) request).getNativeResponse();
if (nativeResponse instanceof jakarta.servlet.http.HttpServletResponse) {
if (((jakarta.servlet.http.HttpServletResponse) nativeResponse).isCommitted()) {
responseCommitted = true;
// 检查响应是否已经提交
if (request instanceof jakarta.servlet.http.HttpServletRequest) {
jakarta.servlet.http.HttpServletRequest httpRequest = (jakarta.servlet.http.HttpServletRequest) request;
// 获取当前的response对象
jakarta.servlet.http.HttpServletResponse response = null;
if (org.springframework.web.context.request.RequestContextHolder.getRequestAttributes() != null) {
Object nativeResponse = ((org.springframework.web.context.request.ServletWebRequest)
org.springframework.web.context.request.RequestContextHolder
.getRequestAttributes()).getNativeResponse();
if (nativeResponse instanceof jakarta.servlet.http.HttpServletResponse) {
response = (jakarta.servlet.http.HttpServletResponse) nativeResponse;
}
}
// 检查响应是否已提交
if (response != null && response.isCommitted()) {
log.warn("响应已提交,无法发送访问拒绝错误: {}", request.getRequestURL());
// 如果是SSE端点且响应已提交,返回空响应避免二次异常
return ResponseEntity.ok().build();
}
}
// 如果响应已提交,记录日志并返回空响应以避免二次异常
if (responseCommitted) {
log.warn("响应已提交,无法发送访问拒绝错误: {}", request.getRequestURL());
// 返回空响应而不是build(),避免潜在的响应提交冲突
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
// 如果是SSE端点,但响应未提交,发送SSE格式的错误响应
if (isSseEndpoint) {
try {
jakarta.servlet.http.HttpServletResponse response = null;
if (org.springframework.web.context.request.RequestContextHolder.getRequestAttributes() != null) {
Object nativeResponse = ((org.springframework.web.context.request.ServletWebRequest)
org.springframework.web.context.request.RequestContextHolder
.getRequestAttributes()).getNativeResponse();
if (nativeResponse instanceof jakarta.servlet.http.HttpServletResponse) {
response = (jakarta.servlet.http.HttpServletResponse) nativeResponse;
}
}
if (response != null) {
response.setStatus(HttpServletResponse.SC_FORBIDDEN);
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
// 发送SSE格式的错误事件
response.getWriter().write("event: error\n");
response.getWriter().write("data: {\"error\": \"访问被拒绝,无权限访问该资源\", \"code\": 403, \"timestamp\": " +
System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
log.debug("已发送SSE 访问拒绝错误响应");
}
return ResponseEntity.ok().build();
} catch (Exception ex) {
log.error("发送SSE访问拒绝错误响应失败", ex);
return ResponseEntity.ok().build();
}
}
ApiResponse.ErrorDetail errorDetail = ApiResponse.ErrorDetail.builder()
......
......@@ -12,6 +12,9 @@ import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.filter.OncePerRequestFilter;
import pangea.hiagent.common.utils.JwtUtil;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.model.Agent;
import pangea.hiagent.common.utils.UserUtils;
import java.io.IOException;
import java.util.Collections;
......@@ -30,11 +33,76 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
private static final String TIMELINE_ENDPOINT = "/api/v1/agent/timeline-events";
private final JwtUtil jwtUtil;
private final AgentService agentService;
public SseAuthorizationFilter(JwtUtil jwtUtil) {
public SseAuthorizationFilter(JwtUtil jwtUtil, AgentService agentService) {
this.jwtUtil = jwtUtil;
this.agentService = agentService;
}
/**
* 发送SSE格式的未授权错误响应
*/
private void sendSseUnauthorizedError(HttpServletResponse response) {
try {
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
// 发送SSE格式的错误事件
response.getWriter().write("event: error\n");
response.getWriter().write("data: {\"error\": \"未授权访问,请先登录\", \"code\": 401, \"timestamp\": " +
System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
log.debug("已发送SSE未授权错误响应");
} catch (IOException e) {
log.error("发送SSE未授权错误响应失败", e);
}
}
/**
* 发送SSE格式的Agent不存在错误响应
*/
private void sendSseAgentNotFoundError(HttpServletResponse response) {
try {
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
// 发送SSE格式的错误事件
response.getWriter().write("event: error\n");
response.getWriter().write("data: {\"error\": \"Agent不存在\", \"code\": 404, \"timestamp\": " +
System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
log.debug("已发送SSE Agent不存在错误响应");
} catch (IOException e) {
log.error("发送SSE Agent不存在错误响应失败", e);
}
}
/**
* 发送SSE格式的访问拒绝错误响应
*/
private void sendSseAccessDeniedError(HttpServletResponse response) {
try {
response.setStatus(HttpServletResponse.SC_FORBIDDEN);
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
// 发送SSE格式的错误事件
response.getWriter().write("event: error\n");
response.getWriter().write("data: {\"error\": \"访问被拒绝,无权限访问该Agent\", \"code\": 403, \"timestamp\": " +
System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
log.debug("已发送SSE 访问拒绝错误响应");
} catch (IOException e) {
log.error("发送SSE 访问拒绝错误响应失败", e);
}
}
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
......@@ -63,6 +131,40 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
new UsernamePasswordAuthenticationToken(userId, null, authorities);
SecurityContextHolder.getContext().setAuthentication(authentication);
log.debug("SSE端点JWT验证成功,用户: {}", userId);
// 如果是chat-stream端点,需要额外验证agent权限
if (isStreamEndpoint) {
// 从请求参数中获取agentId
String agentId = request.getParameter("agentId");
if (agentId != null) {
try {
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
log.warn("SSE端点访问失败:Agent不存在 - AgentId: {}", agentId);
sendSseAgentNotFoundError(response);
return;
}
// 验证用户是否有权限访问该agent
if (!agent.getOwner().equals(userId) && !UserUtils.isAdminUser(userId)) {
log.warn("SSE端点访问失败:用户 {} 无权限访问Agent: {}", userId, agentId);
sendSseAccessDeniedError(response);
return;
}
log.debug("SSE端点Agent权限验证成功,用户: {}, Agent: {}", userId, agentId);
} catch (Exception e) {
log.error("SSE端点Agent权限验证异常: {}", e.getMessage());
sendSseAccessDeniedError(response);
return;
}
} else {
log.warn("SSE端点请求缺少agentId参数");
sendSseAgentNotFoundError(response);
return;
}
}
// 继续执行过滤器链
filterChain.doFilter(request, response);
return;
......@@ -83,27 +185,6 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
filterChain.doFilter(request, response);
}
/**
* 发送SSE格式的未授权错误响应
*/
private void sendSseUnauthorizedError(HttpServletResponse response) {
try {
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
// 发送SSE格式的错误事件
response.getWriter().write("event: error\n");
response.getWriter().write("data: {\"error\": \"未授权访问,请先登录\", \"code\": 401, \"timestamp\": " +
System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
log.debug("已发送SSE未授权错误响应");
} catch (IOException e) {
log.error("发送SSE未授权错误响应失败", e);
}
}
/**
* 从请求头或参数中提取Token
*/
......
......@@ -46,27 +46,18 @@ public class AgentChatController {
HttpServletResponse response) {
log.info("接收到流式对话请求,AgentId: {}", agentId);
// 在主线程中完成权限检查,避免在异步线程中触发Spring Security异常
String userId = UserUtils.getCurrentUserId();
if (userId == null) {
log.warn("用户未认证,无法执行Agent对话");
throw new org.springframework.security.access.AccessDeniedException("用户未认证");
}
// 注意:权限检查已由 SseAuthorizationFilter 在更早的阶段处理
// 此时响应尚未开始流式传输,确保在流式开始前完成所有权限验证
// 这样可以避免在流式传输过程中突然抛出异常导致响应已提交的问题
// 验证Agent存在性和权限
// 仅验证Agent存在性,权限检查由过滤器处理
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
log.warn("Agent不存在: {}", agentId);
throw new IllegalArgumentException("Agent不存在");
}
// 检查权限
if (!agent.getOwner().equals(userId) && !UserUtils.isAdminUser(userId)) {
log.warn("用户 {} 无权限访问Agent: {}", userId, agentId);
throw new org.springframework.security.access.AccessDeniedException("无权限访问该Agent");
}
// 权限验证通过,调用异步处理
// 调用异步处理
return agentChatService.handleChatStream(agentId, chatRequest, response);
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment