Commit c9493204 authored by ligaowei's avatar ligaowei

修复AgentChatService中的NullPointerException问题

parent e87c7566
......@@ -58,6 +58,9 @@ buildNumber.properties
*.ipr
out/
# trae
.trae/
# NetBeans
nbproject/private/
build/
......
现有架构分析
后端架构
核心组件
WorkPanelDataCollector - 实现IWorkPanelDataCollector接口,负责收集和存储事件
DefaultEventManager - 负责创建各种类型的事件对象
SseEventSender - 负责通过SSE发送事件到前端
EventDataManager - 负责构建事件数据,用于发送到前端
EventDeduplicationService - 负责事件去重
WorkPanelEventSubscriber - 负责订阅工作面板事件
WorkPanelService - 提供工作面板状态查询服务
数据流向
Agent执行过程中产生事件
事件被WorkPanelDataCollector收集和存储
通过订阅机制,事件被推送到SseEventSender
SseEventSender使用EventDataManager构建事件数据
通过SSE连接将事件数据发送到前端
主要问题
职责不清:
WorkPanelDataCollector既负责收集又负责订阅管理
DefaultEventManager和WorkPanelDataCollector之间职责重叠
SseEventSender和EventDataManager功能分离但紧密耦合
冗余代码:
多个组件都有相似的事件处理逻辑
事件去重逻辑分散在多个地方
重复的对象转换和序列化逻辑
过度工程化:
太多的小型服务类,增加了复杂性
过度使用依赖注入和组件拆分
前端架构
核心组件
TimelineService - 协调Timeline相关的各种服务
TimelineSseManager - 管理SSE连接和事件处理
SseConnectionManager - 管理SSE连接生命周期
SseEventListenerService - 解析SSE消息
SseEventProcessorService - 处理SSE事件
EventProcessingOptimizer - 优化事件处理
主要问题
服务层次过多:
简单的SSE事件处理被拆分成太多小的服务类
服务之间的依赖关系复杂
缺乏清晰的数据流:
事件处理流程在多个服务间跳跃,难以追踪
已更新待办
架构设计方案
基于对现有代码的分析,我提出以下方案:
后端架构
设计目标
减少不必要的抽象层和服务类
明确各组件职责边界
事件处理流程
降低组件间的耦合度
后的架构
EventService - 统一的事件服务
合并DefaultEventManager和EventDataManager的功能
负责创建、构建和发送事件
集成事件去重逻辑
WorkPanelService - 工作面板核心服务
合并WorkPanelDataCollector的部分功能
负责事件的收集、存储和订阅管理
直接依赖EventService进行事件发送
移除冗余组件
移除WorkPanelEventSubscriber(功能合并到WorkPanelService)
移除SseEventSender(功能合并到EventService)
移除EventDataManager(功能合并到EventService)
数据流向
Agent执行过程中产生事件
WorkPanelService收集和存储事件
WorkPanelService通过EventService发送事件到前端
EventService负责构建事件数据并通过SSE发送
前端架构
设计目标
服务层次结构
明确数据流路径
减少不必要的服务类
后的架构
TimelineService - 时间轴核心服务
合并TimelineSseManager和SseConnectionManager的功能
负责SSE连接管理和事件处理
直接处理事件解析和优化
移除冗余服务
移除TimelineSseManager(功能合并到TimelineService)
移除SseEventListenerService(功能合并到TimelineService)
移除SseEventProcessorService(功能合并到TimelineService)
数据流向
建立SSE连接
TimelineService接收和解析事件
TimelineService优化和处理事件
事件传递给UI组件展示
技术实现要点
事件去重:
在EventService中集中实现事件去重逻辑
使用更高效的缓存策略
对象池:
保留MapPoolService用于对象复用
优化对象池管理策略
错误处理:
统一异常处理机制
错误日志记录
性能优化:
批量处理事件发送
优化事件数据构建过程
\ No newline at end of file
This diff is collapsed.
......@@ -201,9 +201,9 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
......@@ -322,6 +322,13 @@
<artifactId>cron-utils</artifactId>
<version>9.2.0</version>
</dependency>
<!-- OWASP Java HTML Sanitizer -->
<dependency>
<groupId>com.googlecode.owasp-java-html-sanitizer</groupId>
<artifactId>owasp-java-html-sanitizer</artifactId>
<version>20220608.1</version>
</dependency>
</dependencies>
......
package pangea.hiagent.agent.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import pangea.hiagent.model.Agent;
import pangea.hiagent.rag.RagService;
import pangea.hiagent.agent.service.AgentErrorHandler;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
import java.util.function.Consumer;
/**
* Agent处理器抽象基类
* 封装所有Agent处理器的公共逻辑
* 职责:提供所有Agent处理器共享的基础功能
*/
@Slf4j
public abstract class AbstractAgentProcessor extends BaseAgentProcessor {
@Autowired
protected AgentErrorHandler agentErrorHandler;
/**
* 处理RAG响应的通用逻辑
*
* @param ragResponse RAG响应
* @param tokenConsumer token消费者(流式处理时使用)
* @return RAG响应
*/
protected String handleRagResponse(String ragResponse, Consumer<String> tokenConsumer) {
if (tokenConsumer != null) {
// 对于流式处理,我们需要将RAG响应作为token发送
tokenConsumer.accept(ragResponse);
// 发送完成信号
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(ragResponse);
}
}
return ragResponse;
}
/**
* 处理请求的通用前置逻辑
*
* @param agent Agent对象
* @param userMessage 用户消息
* @param userId 用户ID
* @param ragService RAG服务
* @param tokenConsumer token消费者(流式处理时使用)
* @return RAG响应,如果有的话;否则返回null继续正常处理流程
*/
protected String handlePreProcessing(Agent agent, String userMessage, String userId, RagService ragService, Consumer<String> tokenConsumer) {
// 为每个用户-Agent组合创建唯一的会话ID
String sessionId = generateSessionId(agent, userId);
// 添加用户消息到ChatMemory
addUserMessageToMemory(sessionId, userMessage);
// 检查是否启用RAG并尝试RAG增强
String ragResponse = tryRagEnhancement(agent, userMessage, ragService);
if (ragResponse != null) {
log.info("RAG增强返回结果,直接返回");
return handleRagResponse(ragResponse, tokenConsumer);
}
return null;
}
}
\ No newline at end of file
package pangea.hiagent.agent.processor;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.dto.AgentRequest;
import java.util.function.Consumer;
/**
* 统一的Agent处理器接口
* 为不同类型的Agent提供统一的处理接口
*/
public interface AgentProcessor {
/**
* 同步处理Agent请求
*
* @param agent Agent对象
* @param request 请求对象
* @param userId 用户ID
* @return 处理结果
*/
String processRequest(Agent agent, AgentRequest request, String userId);
/**
* 流式处理Agent请求
*
* @param request 请求对象
* @param agent Agent对象
* @param userId 用户ID
* @param tokenConsumer token处理回调函数
*/
void processStreamRequest(AgentRequest request, Agent agent, String userId, Consumer<String> tokenConsumer);
/**
* 获取处理器类型
*
* @return 处理器类型字符串
*/
String getProcessorType();
}
\ No newline at end of file
package pangea.hiagent.agent.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pangea.hiagent.model.Agent;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.StringJoiner;
/**
* Agent处理器工厂类
* 根据Agent类型创建相应的处理器实例
*/
@Slf4j
@Component
public class AgentProcessorFactory {
@Autowired
private List<AgentProcessor> agentProcessors;
private final Map<String, AgentProcessor> processorCache = new HashMap<>();
/**
* 根据Agent获取对应的处理器
*
* @param agent Agent对象
* @return 对应的处理器实例
*/
public AgentProcessor getProcessor(Agent agent) {
if (agent == null) {
throw new IllegalArgumentException("Agent不能为null");
}
// 生成缓存键
String cacheKey = generateCacheKey(agent);
// 先从缓存中获取
AgentProcessor processor = processorCache.get(cacheKey);
if (processor != null) {
return processor;
}
// 根据Agent配置确定处理器类型
String processorType = determineProcessorType(agent);
// 查找对应的处理器
for (AgentProcessor agentProcessor : agentProcessors) {
if (agentProcessor.getProcessorType().equals(processorType)) {
// 放入缓存
processorCache.put(cacheKey, agentProcessor);
return agentProcessor;
}
}
// 如果没有找到匹配的处理器,抛出异常并提供更多信息以便调试
String errorMsg = buildErrorMessage(processorType);
log.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
/**
* 构建错误信息
*
* @param processorType 处理器类型
* @return 错误信息
*/
private String buildErrorMessage(String processorType) {
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("未找到适合Agent的处理器,类型: ").append(processorType);
errorMsg.append(", 可用处理器数量: ").append(agentProcessors != null ? agentProcessors.size() : 0);
if (agentProcessors != null && !agentProcessors.isEmpty()) {
StringJoiner joiner = new StringJoiner(", ");
for (AgentProcessor processor : agentProcessors) {
joiner.add(processor.getProcessorType());
}
errorMsg.append(", 可用处理器类型: ").append(joiner.toString());
}
return errorMsg.toString();
}
/**
* 根据Agent配置确定处理器类型
*
* @param agent Agent对象
* @return 处理器类型
*/
private String determineProcessorType(Agent agent) {
// 如果启用了ReAct模式,则使用ReAct处理器
if (Boolean.TRUE.equals(agent.getEnableReAct())) {
return "react";
}
// 默认使用普通处理器
return "normal";
}
/**
* 生成缓存键
*
* @param agent Agent对象
* @return 缓存键
*/
private String generateCacheKey(Agent agent) {
return agent.getId() + "_" + (agent.getEnableReAct() != null ? agent.getEnableReAct() : false);
}
/**
* 清除缓存
*/
public void clearCache() {
processorCache.clear();
}
}
\ No newline at end of file
package pangea.hiagent.agent.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.model.StreamingChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import pangea.hiagent.model.Agent;
import pangea.hiagent.rag.RagService;
import pangea.hiagent.web.dto.AgentRequest;
import java.util.function.Consumer;
/**
* 普通Agent处理器实现类
* 职责:处理普通Agent的请求,调用LLM模型生成回复,支持RAG增强
*/
@Slf4j
@Service
public class NormalAgentProcessor extends AbstractAgentProcessor {
@Autowired(required = false)
private RagService ragService;
@Override
public String processRequest(Agent agent, AgentRequest request, String userId) {
log.info("使用普通Agent处理请求");
try {
// 处理请求的通用前置逻辑
String ragResponse = handlePreProcessing(agent, request.getUserMessage(), userId, ragService, null);
if (ragResponse != null) {
return ragResponse;
}
// 获取系统提示词
String systemPrompt = getSystemPrompt(agent);
// 构建Prompt,使用Agent配置的历史记录长度
int historyLength = getHistoryLength(agent);
String sessionId = generateSessionId(agent, userId);
Prompt prompt = buildPrompt(systemPrompt, historyLength, sessionId);
// 根据Agent配置获取对应的ChatModel
ChatModel chatModel = agentService.getChatModelForAgent(agent);
if (chatModel == null) {
log.error("无法获取Agent的聊天模型");
return "[错误] 无法获取Agent的聊天模型,请检查模型配置";
}
// 使用对应模型进行调用
org.springframework.ai.chat.model.ChatResponse chatResponse = chatModel.call(prompt);
// 提取助理回复
String responseContent = "";
if (chatResponse.getResult() != null && chatResponse.getResult().getOutput() != null) {
responseContent = chatResponse.getResult().getOutput().getText();
log.info("模型调用成功,响应内容长度: {}", responseContent.length());
} else {
log.warn("模型返回空响应");
}
// 将助理回复添加到ChatMemory
addAssistantMessageToMemory(sessionId, responseContent);
return responseContent;
} catch (Exception e) {
return agentErrorHandler.handleSyncError(e, "模型调用失败");
}
}
@Override
public void processStreamRequest(AgentRequest request, Agent agent, String userId, Consumer<String> tokenConsumer) {
try {
log.info("使用普通Agent处理流式请求");
// 处理请求的通用前置逻辑
String ragResponse = handlePreProcessing(agent, request.getUserMessage(), userId, ragService, tokenConsumer);
if (ragResponse != null) {
return;
}
// 获取系统提示词
String systemPrompt = getSystemPrompt(agent);
// 构建Prompt,使用Agent配置的历史记录长度
int historyLength = getHistoryLength(agent);
String sessionId = generateSessionId(agent, userId);
Prompt prompt = buildPrompt(systemPrompt, historyLength, sessionId);
// 获取流式模型
StreamingChatModel streamingChatModel = getStreamingChatModel(agent);
if (streamingChatModel == null) {
log.warn("当前模型不支持流式输出");
handleModelNotSupportStream(tokenConsumer);
return;
}
// 流式处理
handleStreamingResponse(tokenConsumer, prompt, streamingChatModel, sessionId);
} catch (Exception e) {
agentErrorHandler.handleStreamError(e, tokenConsumer, "普通Agent流式处理失败");
agentErrorHandler.ensureCompletionCallback(tokenConsumer, "处理请求时发生错误: " + e.getMessage());
}
}
/**
* 处理模型不支持流式输出的情况
*
* @param tokenConsumer token消费者
*/
private void handleModelNotSupportStream(Consumer<String> tokenConsumer) {
String errorMessage = "[错误] 当前模型不支持流式输出";
// 发送错误信息
agentErrorHandler.sendErrorMessage(tokenConsumer, errorMessage);
// 确保在异常情况下也调用完成回调
agentErrorHandler.ensureCompletionCallback(tokenConsumer, errorMessage);
}
@Override
public String getProcessorType() {
return "normal";
}
}
\ No newline at end of file
package pangea.hiagent.agent.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import pangea.hiagent.agent.react.ReactCallback;
import pangea.hiagent.agent.react.ReactExecutor;
import pangea.hiagent.model.Agent;
import pangea.hiagent.rag.RagService;
import pangea.hiagent.tool.AgentToolManager;
import pangea.hiagent.web.dto.AgentRequest;
import pangea.hiagent.web.service.AgentService;
import java.util.List;
import java.util.function.Consumer;
/**
* ReAct Agent处理器实现类
* 职责:处理ReAct模式的Agent请求,集成ReAct执行器,支持工具调用和复杂推理
*/
@Slf4j
@Service
public class ReActAgentProcessor extends AbstractAgentProcessor {
@Autowired
private AgentService agentService;
@Autowired
private RagService ragService;
@Autowired
private ReactCallback defaultReactCallback;
@Autowired
private ReactExecutor defaultReactExecutor;
@Autowired
private AgentToolManager agentToolManager;
@Override
public String processRequest(Agent agent, AgentRequest request, String userId) {
log.info("使用ReAct Agent处理请求");
return processRequestInternal(agent, request.getUserMessage(), userId);
}
@Override
public void processStreamRequest(AgentRequest request, Agent agent, String userId, Consumer<String> tokenConsumer) {
log.info("使用ReAct Agent处理流式请求");
processRequestStreamInternal(agent, request.getUserMessage(), tokenConsumer, userId);
}
@Override
public String getProcessorType() {
return "react";
}
/**
* 处理用户请求的主方法(同步方式)
*
* @param agent Agent对象
* @param userMessage 用户消息
* @param userId 用户ID(可选)
* @return 处理结果
*/
private String processRequestInternal(Agent agent, String userMessage, String userId) {
log.info("开始处理ReAct Agent请求,Agent ID: {}, 用户消息: {}", agent != null ? agent.getId() : "null", userMessage);
try {
// 处理请求的通用前置逻辑
String ragResponse = handlePreProcessing(agent, userMessage, userId, ragService, null);
if (ragResponse != null) {
// 触发最终答案回调
if (defaultReactCallback != null) {
defaultReactCallback.onFinalAnswer(ragResponse);
}
return ragResponse;
}
// 准备执行环境
ChatClient client = ChatClient.builder(agentService.getChatModelForAgent(agent)).build();
List<Object> tools = agentToolManager.getAvailableToolInstances(agent);
// 添加自定义回调到ReAct执行器
if (defaultReactExecutor != null && defaultReactCallback != null) {
defaultReactExecutor.addReactCallback(defaultReactCallback);
}
// 使用ReAct执行器执行流程,传递Agent对象以支持记忆功能
String finalAnswer = defaultReactExecutor.executeWithAgent(client, userMessage, tools, agent);
// 将助理回复添加到ChatMemory
String sessionId = generateSessionId(agent, userId);
addAssistantMessageToMemory(sessionId, finalAnswer);
return finalAnswer;
} catch (Exception e) {
return agentErrorHandler.handleSyncError(e, "处理ReAct请求时发生错误");
}
}
/**
* 处理用户请求的主方法(流式方式)
*
* @param agent Agent对象
* @param userMessage 用户消息
* @param tokenConsumer token消费者
* @param userId 用户ID(可选)
*/
private void processRequestStreamInternal(Agent agent, String userMessage, Consumer<String> tokenConsumer, String userId) {
log.info("开始流式处理ReAct Agent请求,Agent ID: {}, 用户消息: {}", agent != null ? agent.getId() : "null", userMessage);
try {
// 处理请求的通用前置逻辑
String ragResponse = handlePreProcessing(agent, userMessage, userId, ragService, tokenConsumer);
if (ragResponse != null) {
// 触发最终答案回调
if (defaultReactCallback != null) {
defaultReactCallback.onFinalAnswer(ragResponse);
}
return;
}
// 准备执行环境
ChatClient client = ChatClient.builder(agentService.getChatModelForAgent(agent)).build();
List<Object> tools = agentToolManager.getAvailableToolInstances(agent);
// 添加自定义回调到ReAct执行器
if (defaultReactExecutor != null && defaultReactCallback != null) {
defaultReactExecutor.addReactCallback(defaultReactCallback);
}
// 检查模型是否支持流式输出
if (agentService.getChatModelForAgent(agent) == null) {
log.error("无法获取Agent的聊天模型");
handleModelNotAvailable(tokenConsumer);
return;
}
// 使用ReAct执行器流式执行流程,传递Agent对象以支持记忆功能
defaultReactExecutor.executeStreamWithAgent(client, userMessage, tools, tokenConsumer, agent);
} catch (Exception e) {
agentErrorHandler.handleStreamError(e, tokenConsumer, "流式处理ReAct请求时发生错误");
agentErrorHandler.ensureCompletionCallback(tokenConsumer, "处理请求时发生错误: " + e.getMessage());
}
}
/**
* 处理模型不可用的情况
*
* @param tokenConsumer token消费者
*/
private void handleModelNotAvailable(Consumer<String> tokenConsumer) {
String errorMessage = "[错误] 无法获取Agent的聊天模型";
// 发送错误信息
agentErrorHandler.sendErrorMessage(tokenConsumer, errorMessage);
// 确保在异常情况下也调用完成回调
agentErrorHandler.ensureCompletionCallback(tokenConsumer, errorMessage);
}
}
\ No newline at end of file
package pangea.hiagent.react;
package pangea.hiagent.agent.react;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -54,24 +54,30 @@ public class DefaultReactCallback implements ReactCallback {
break;
case ACTION:
if (reactStep.getAction() != null) {
workPanelCollector.recordToolCallStart(
// 使用recordToolCallAction记录工具调用开始,状态为pending
workPanelCollector.recordToolCallAction(
reactStep.getAction().getToolName(),
"execute",
reactStep.getAction().getParameters()
reactStep.getAction().getParameters(),
null,
"pending",
null
);
}
break;
case OBSERVATION:
if (reactStep.getObservation() != null) {
workPanelCollector.recordToolCallComplete(
"Unknown",
if (reactStep.getObservation() != null && reactStep.getAction() != null) {
// 使用recordToolCallAction记录工具调用完成,状态为success
workPanelCollector.recordToolCallAction(
reactStep.getAction().getToolName(),
reactStep.getAction().getParameters(),
reactStep.getObservation().getContent(),
"success"
"success",
null
);
}
break;
case FINAL_ANSWER:
workPanelCollector.recordThinking(reactStep.getContent(), "final_answer");
workPanelCollector.recordFinalAnswer(reactStep.getContent());
break;
default:
log.warn("未知的ReAct步骤类型: {}", reactStep.getStepType());
......
package pangea.hiagent.react;
package pangea.hiagent.agent.react;
/**
* ReAct回调接口,用于捕获ReAct执行的每一步
......
package pangea.hiagent.react;
package pangea.hiagent.agent.react;
import org.springframework.ai.chat.client.ChatClient;
import pangea.hiagent.model.Agent;
......
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.agent.processor.AgentProcessorFactory;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.web.dto.ChatRequest;
import pangea.hiagent.model.Agent;
import pangea.hiagent.sse.WorkPanelSseService;
import pangea.hiagent.tool.AgentToolManager;
import pangea.hiagent.web.dto.AgentRequest;
import pangea.hiagent.workpanel.event.EventService;
import jakarta.servlet.http.HttpServletResponse;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Agent 对话服务
* 职责:协调整个AI对话流程,作为流式处理的统一入口和协调者
*/
@Slf4j
@Service
public class AgentChatService {
private final ChatErrorHandler chatErrorHandler;
private final AgentValidationService agentValidationService;
private final AgentProcessorFactory agentProcessorFactory;
private final StreamRequestService streamRequestService;
private final AgentToolManager agentToolManager;
private final WorkPanelSseService workPanelSseService;
public AgentChatService(
EventService eventService,
ChatErrorHandler chatErrorHandler,
AgentValidationService agentValidationService,
AgentProcessorFactory agentProcessorFactory,
StreamRequestService streamRequestService,
AgentToolManager agentToolManager,
WorkPanelSseService workPanelSseService) {
this.chatErrorHandler = chatErrorHandler;
this.agentValidationService = agentValidationService;
this.agentProcessorFactory = agentProcessorFactory;
this.streamRequestService = streamRequestService;
this.agentToolManager = agentToolManager;
this.workPanelSseService = workPanelSseService;
}
// 专用线程池配置 - 使用静态变量确保线程池在整个应用中是单例的
private static final ExecutorService executorService = new ThreadPoolExecutor(
20,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// /**
// * 处理同步对话请求的统一入口
// * @param agent Agent对象
// * @param request 请求对象
// * @param userId 用户ID
// * @return 处理结果
// */
// public String handleChatSync(Agent agent, AgentRequest request, String userId) {
// log.info("开始处理同步对话请求,AgentId: {}, 用户消息: {}", agent.getId(), request.getUserMessage());
//
// try {
// // 获取处理器
// AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
// if (processor == null) {
// log.error("无法获取Agent处理器");
// return "[错误] 无法获取Agent处理器";
// }
//
// // 处理请求
// return processor.processRequest(agent, request, userId);
// } catch (Exception e) {
// log.error("处理普通Agent请求时发生错误", e);
// return "[错误] 处理请求时发生错误: " + e.getMessage();
// }
// }
/**
* 处理流式对话请求的统一入口
*
* @param agentId Agent ID
* @param chatRequest 对话请求
* @param response HTTP响应
* @return SSE emitter
*/
public SseEmitter handleChatStream(String agentId, ChatRequest chatRequest, HttpServletResponse response) {
log.info("开始处理流式对话请求,AgentId: {}, 用户消息: {}", agentId, chatRequest.getMessage());
// 尝试获取当前用户ID,优先从SecurityContext获取,其次从请求中解析JWT
String userId = UserUtils.getCurrentUserId();
// 如果在主线程中未能获取到用户ID,尝试在异步环境中获取
if (userId == null) {
userId = UserUtils.getCurrentUserIdInAsync();
}
if (userId == null) {
log.error("用户未认证");
SseEmitter emitter = workPanelSseService.createEmitter();
// 检查响应是否已经提交
if (!response.isCommitted()) {
chatErrorHandler.handleChatError(emitter, "用户未认证,请重新登录");
} else {
log.warn("响应已提交,无法发送用户未认证错误信息");
emitter.complete();
}
return emitter;
}
// 创建 SSE emitter
SseEmitter emitter = workPanelSseService.createEmitter();
// 将userId设为final以在Lambda表达式中使用
final String finalUserId = userId;
// 异步处理对话,避免阻塞HTTP连接
executorService.execute(() -> {
try {
processChatRequest(emitter, agentId, chatRequest, finalUserId);
} catch (Exception e) {
log.error("处理聊天请求时发生异常", e);
// 检查响应是否已经提交
if (emitter != null) {
chatErrorHandler.handleChatError(emitter, "处理请求时发生错误", e, null);
} else {
log.warn("响应已提交,无法发送处理请求错误信息");
}
}
});
return emitter;
}
/**
* 处理聊天请求的核心逻辑
*
* @param emitter SSE发射器
* @param agentId Agent ID
* @param chatRequest 聊天请求
* @param userId 用户ID
*/
private void processChatRequest(SseEmitter emitter, String agentId, ChatRequest chatRequest, String userId) {
try {
// 获取Agent信息并进行权限检查
Agent agent = agentValidationService.validateAgentAndPermission(agentId, userId, emitter);
if (agent == null) {
return; // 权限验证失败,直接返回
}
// 获取处理器并启动心跳保活机制
AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
if (processor == null) {
return; // 获取处理器失败,直接返回
}
// 启动心跳机制
workPanelSseService.startHeartbeat(emitter, new java.util.concurrent.atomic.AtomicBoolean(false));
// 转换请求对象
AgentRequest request = chatRequest.toAgentRequest(agentId, agent, agentToolManager);
// 处理流式请求
streamRequestService.handleStreamRequest(emitter, processor, request, agent, userId);
} catch (Exception e) {
chatErrorHandler.handleChatError(emitter, "处理请求时发生错误", e, null);
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
/**
* Agent错误处理工具类
* 统一处理Agent处理器中的错误逻辑
*/
@Slf4j
@Component
public class AgentErrorHandler {
@Autowired
private ErrorHandlerService errorHandlerService;
/**
* 处理401未授权错误
*
* @param e 异常对象
* @return 是否为401错误
*/
public boolean isUnauthorizedError(Throwable e) {
return errorHandlerService.isUnauthorizedError(new Exception(e));
}
/**
* 处理流式处理中的错误
*
* @param e 异常对象
* @param tokenConsumer token处理回调函数
* @param errorMessagePrefix 错误消息前缀
*/
public void handleStreamError(Throwable e, Consumer<String> tokenConsumer, String errorMessagePrefix) {
errorHandlerService.handleStreamError(e, tokenConsumer, errorMessagePrefix);
}
/**
* 处理同步处理中的错误
*
* @param e 异常对象
* @param errorMessagePrefix 错误消息前缀
* @return 错误消息
*/
public String handleSyncError(Throwable e, String errorMessagePrefix) {
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(e)) {
log.error("LLM返回401未授权错误: {}", e.getMessage());
return "请配置API密钥";
} else {
String errorMessage = e.getMessage();
if (errorMessage == null || errorMessage.isEmpty()) {
errorMessage = "未知错误";
}
return errorMessagePrefix + ": " + errorMessage;
}
}
/**
* 发送错误信息给客户端
*
* @param tokenConsumer token处理回调函数
* @param errorMessage 错误消息
*/
public void sendErrorMessage(Consumer<String> tokenConsumer, String errorMessage) {
errorHandlerService.sendErrorMessage(tokenConsumer, errorMessage);
}
/**
* 确保在异常情况下也调用完成回调
*
* @param tokenConsumer token处理回调函数
* @param errorMessage 错误消息
*/
public void ensureCompletionCallback(Consumer<String> tokenConsumer, String errorMessage) {
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.sse.WorkPanelSseService;
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.agent.processor.AgentProcessorFactory;
import pangea.hiagent.common.utils.LogUtils;
import pangea.hiagent.common.utils.ValidationUtils;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Agent处理器服务
* 负责处理Agent处理器的获取和心跳机制
*/
@Slf4j
@Service
public class AgentProcessorService {
@Autowired
private AgentProcessorFactory agentProcessorFactory;
@Autowired
private WorkPanelSseService workPanelSseService;
@Autowired
private ChatErrorHandler chatErrorHandler;
/**
* 获取处理器并启动心跳保活机制
*
* @param agent Agent对象
* @param emitter SSE发射器
* @return Agent处理器,如果获取失败则返回null
*/
public AgentProcessor getProcessorAndStartHeartbeat(Agent agent, SseEmitter emitter) {
LogUtils.enterMethod("getProcessorAndStartHeartbeat", agent);
// 参数验证
if (ValidationUtils.isNull(agent, "agent")) {
chatErrorHandler.handleChatError(emitter, "Agent对象不能为空");
LogUtils.exitMethod("getProcessorAndStartHeartbeat", "Agent对象不能为空");
return null;
}
if (ValidationUtils.isNull(emitter, "emitter")) {
chatErrorHandler.handleChatError(emitter, "SSE发射器不能为空");
LogUtils.exitMethod("getProcessorAndStartHeartbeat", "SSE发射器不能为空");
return null;
}
try {
// 根据Agent类型选择处理器并处理请求
AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
if (processor == null) {
chatErrorHandler.handleChatError(emitter, "无法获取Agent处理器");
LogUtils.exitMethod("getProcessorAndStartHeartbeat", "无法获取Agent处理器");
return null;
}
log.info("使用{} Agent处理器处理对话", processor.getProcessorType());
// 启动心跳保活机制
workPanelSseService.startHeartbeat(emitter, new AtomicBoolean(false));
LogUtils.exitMethod("getProcessorAndStartHeartbeat", processor);
return processor;
} catch (Exception e) {
chatErrorHandler.handleChatError(emitter, "获取处理器或启动心跳时发生错误", e, null);
LogUtils.exitMethod("getProcessorAndStartHeartbeat", e);
return null;
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.common.utils.LogUtils;
import pangea.hiagent.common.utils.ValidationUtils;
import pangea.hiagent.common.utils.UserUtils;
/**
* Agent验证服务
* 负责处理Agent的参数验证和权限检查
*/
@Slf4j
@Service
public class AgentValidationService {
@Autowired
private AgentService agentService;
@Autowired
private ChatErrorHandler chatErrorHandler;
/**
* 验证Agent存在性和用户权限
*
* @param agentId Agent ID
* @param userId 用户ID
* @param emitter SSE发射器
* @return Agent对象,如果验证失败则返回null
*/
public Agent validateAgentAndPermission(String agentId, String userId, SseEmitter emitter) {
LogUtils.enterMethod("validateAgentAndPermission", agentId, userId);
// 参数验证
if (ValidationUtils.isBlank(agentId, "agentId")) {
chatErrorHandler.handleChatError(emitter, "Agent ID不能为空");
LogUtils.exitMethod("validateAgentAndPermission", "Agent ID不能为空");
return null;
}
if (ValidationUtils.isBlank(userId, "userId")) {
chatErrorHandler.handleChatError(emitter, "用户ID不能为空");
LogUtils.exitMethod("validateAgentAndPermission", "用户ID不能为空");
return null;
}
try {
// 获取Agent信息
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
chatErrorHandler.handleChatError(emitter, "Agent不存在");
LogUtils.exitMethod("validateAgentAndPermission", "Agent不存在");
return null;
}
// 检查权限(可选)
if (!agent.getOwner().equals(userId) && !UserUtils.isAdminUser(userId)) {
chatErrorHandler.handleChatError(emitter, "无权限访问该Agent");
LogUtils.exitMethod("validateAgentAndPermission", "无权限访问该Agent");
return null;
}
LogUtils.exitMethod("validateAgentAndPermission", agent);
return agent;
} catch (Exception e) {
chatErrorHandler.handleChatError(emitter, "验证Agent和权限时发生错误", e, null);
LogUtils.exitMethod("validateAgentAndPermission", e);
return null;
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 聊天服务错误处理工具类
* 统一处理聊天过程中的各种异常情况
* 委托给ErrorHandlerService进行实际处理
*/
@Slf4j
@Component
public class ChatErrorHandler {
@Autowired
private ErrorHandlerService unifiedErrorHandlerService;
/**
* 处理聊天过程中的异常
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
* @param exception 异常对象
* @param processorType 处理器类型(可选)
*/
public void handleChatError(SseEmitter emitter, String errorMessage, Exception exception, String processorType) {
unifiedErrorHandlerService.handleChatError(emitter, errorMessage, exception, processorType);
}
/**
* 处理聊天过程中的异常()
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
*/
public void handleChatError(SseEmitter emitter, String errorMessage) {
unifiedErrorHandlerService.handleChatError(emitter, errorMessage);
}
/**
* 处理Token处理过程中的异常
*
* @param emitter SSE发射器
* @param processorType 处理器类型
* @param exception 异常对象
* @param isCompleted 完成状态标记
*/
public void handleTokenError(SseEmitter emitter, String processorType, Exception exception, AtomicBoolean isCompleted) {
unifiedErrorHandlerService.handleTokenError(emitter, processorType, exception, isCompleted);
}
/**
* 处理完成回调过程中的异常
*
* @param emitter SSE发射器
* @param exception 异常对象
*/
public void handleCompletionError(SseEmitter emitter, Exception exception) {
unifiedErrorHandlerService.handleCompletionError(emitter, exception);
}
/**
* 处理对话记录保存过程中的异常
*
* @param emitter SSE发射器
* @param exception 异常对象
* @param isCompleted 完成状态标记
*/
public void handleSaveDialogueError(SseEmitter emitter, Exception exception, AtomicBoolean isCompleted) {
unifiedErrorHandlerService.handleSaveDialogueError(emitter, exception, isCompleted);
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.model.AgentDialogue;
import pangea.hiagent.common.utils.ValidationUtils;
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.common.utils.LogUtils;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.sse.WorkPanelSseService;
import pangea.hiagent.web.dto.AgentRequest;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.workpanel.event.EventService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 完成回调处理服务
* 负责处理流式输出完成后的回调操作
*/
@Slf4j
@Service
public class CompletionHandlerService {
@Autowired
private AgentService agentService;
@Autowired
private WorkPanelSseService unifiedSseService;
@Autowired
private EventService eventService;
@Autowired
private ErrorHandlerService errorHandlerService;
/**
* 处理完成回调
*
* @param emitter SSE发射器
* @param processor Agent处理器
* @param agent Agent对象
* @param request Agent请求
* @param userId 用户ID
* @param fullContent 完整内容
* @param isCompleted 完成状态标记
*/
public void handleCompletion(SseEmitter emitter, AgentProcessor processor, Agent agent,
AgentRequest request, String userId,
String fullContent, AtomicBoolean isCompleted) {
LogUtils.enterMethod("handleCompletion", emitter, processor, agent, request, userId);
// 参数验证
if (ValidationUtils.isNull(emitter, "emitter")) {
log.error("SSE发射器不能为空");
LogUtils.exitMethod("handleCompletion", "SSE发射器不能为空");
return;
}
if (ValidationUtils.isNull(processor, "processor")) {
log.error("Agent处理器不能为空");
LogUtils.exitMethod("handleCompletion", "Agent处理器不能为空");
return;
}
if (ValidationUtils.isNull(agent, "agent")) {
log.error("Agent对象不能为空");
LogUtils.exitMethod("handleCompletion", "Agent对象不能为空");
return;
}
if (ValidationUtils.isNull(request, "request")) {
log.error("Agent请求不能为空");
LogUtils.exitMethod("handleCompletion", "Agent请求不能为空");
return;
}
if (ValidationUtils.isBlank(userId, "userId")) {
log.error("用户ID不能为空");
LogUtils.exitMethod("handleCompletion", "用户ID不能为空");
return;
}
if (ValidationUtils.isNull(isCompleted, "isCompleted")) {
log.error("完成状态标记不能为空");
LogUtils.exitMethod("handleCompletion", "完成状态标记不能为空");
return;
}
log.info("{} Agent处理完成,总字符数: {}", processor.getProcessorType(), fullContent != null ? fullContent.length() : 0);
// 发送完成事件
try {
// 发送完整内容作为最后一个token
if (fullContent != null && !fullContent.isEmpty()) {
eventService.sendTokenEvent(emitter, fullContent);
}
// 发送完成信号
emitter.send("[DONE]");
} catch (Exception e) {
errorHandlerService.handleCompletionError(emitter, e);
}
// 保存对话记录
try {
saveDialogue(agent, request, userId, fullContent);
} catch (Exception e) {
errorHandlerService.handleSaveDialogueError(emitter, e, isCompleted);
} finally {
unifiedSseService.completeEmitter(emitter, isCompleted);
}
LogUtils.exitMethod("handleCompletion", "处理完成");
}
/**
* 保存对话记录
*/
public void saveDialogue(Agent agent, AgentRequest request, String userId, String responseContent) {
LogUtils.enterMethod("saveDialogue", agent, request, userId);
// 参数验证
if (ValidationUtils.isNull(agent, "agent")) {
log.error("Agent对象不能为空");
LogUtils.exitMethod("saveDialogue", "Agent对象不能为空");
return;
}
if (ValidationUtils.isNull(request, "request")) {
log.error("Agent请求不能为空");
LogUtils.exitMethod("saveDialogue", "Agent请求不能为空");
return;
}
if (ValidationUtils.isBlank(userId, "userId")) {
log.error("用户ID不能为空");
LogUtils.exitMethod("saveDialogue", "用户ID不能为空");
return;
}
try {
// 创建对话记录
AgentDialogue dialogue = AgentDialogue.builder()
.agentId(request.getAgentId())
.userMessage(request.getUserMessage())
.agentResponse(responseContent)
.userId(userId)
.build();
// 确保ID被设置
if (dialogue.getId() == null || dialogue.getId().isEmpty()) {
dialogue.setId(java.util.UUID.randomUUID().toString());
}
// 设置创建人和更新人信息
// 在异步线程中获取用户ID
String currentUserId = UserUtils.getCurrentUserIdInAsync();
if (currentUserId == null) {
currentUserId = userId; // 回退到传入的userId
}
dialogue.setCreatedBy(currentUserId);
dialogue.setUpdatedBy(currentUserId);
// 保存对话记录
agentService.saveDialogue(dialogue);
LogUtils.exitMethod("saveDialogue", "保存成功");
} catch (Exception e) {
log.error("保存对话记录失败", e);
LogUtils.exitMethod("saveDialogue", e);
throw new RuntimeException("保存对话记录失败", e);
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 错误处理工具类
* 提供统一的错误处理方法,减少重复代码
* 委托给ErrorHandlerService进行实际处理
*/
@Slf4j
@Component
public class ErrorHandlerUtils {
private static ErrorHandlerService errorHandlerService;
@Autowired
public ErrorHandlerUtils(ErrorHandlerService errorHandlerService) {
ErrorHandlerUtils.errorHandlerService = errorHandlerService;
}
/**
* 构建完整的错误消息
*
* @param errorMessage 基本错误信息
* @param exception 异常对象
* @param errorId 错误跟踪ID
* @param processorType 处理器类型
* @return 完整的错误消息
*/
public static String buildFullErrorMessage(String errorMessage, Exception exception, String errorId, String processorType) {
return errorHandlerService.buildFullErrorMessage(errorMessage, exception, errorId, processorType);
}
/**
* 检查是否为未授权错误
*
* @param exception 异常对象
* @return 是否为未授权错误
*/
public static boolean isUnauthorizedError(Exception exception) {
return errorHandlerService.isUnauthorizedError(exception);
}
/**
* 检查是否为超时错误
*
* @param exception 异常对象
* @return 是否为超时错误
*/
public static boolean isTimeoutError(Exception exception) {
return errorHandlerService.isTimeoutError(exception);
}
/**
* 生成错误跟踪ID
*
* @return 错误跟踪ID
*/
public static String generateErrorId() {
return errorHandlerService.generateErrorId();
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Map;
/**
* 异常监控服务
* 负责监控、统计和报告系统中的异常情况
*/
@Slf4j
@Service
public class ExceptionMonitoringService {
// 异常统计信息
private final Map<String, AtomicLong> exceptionCounters = new ConcurrentHashMap<>();
// 异常详细信息缓存
private final Map<String, String> exceptionDetails = new ConcurrentHashMap<>();
// 最大缓存条目数
private static final int MAX_CACHE_SIZE = 1000;
/**
* 记录异常信息
*
* @param exceptionType 异常类型
* @param errorMessage 错误消息
* @param stackTrace 堆栈跟踪
*/
public void recordException(String exceptionType, String errorMessage, String stackTrace) {
try {
// 更新异常计数器
AtomicLong counter = exceptionCounters.computeIfAbsent(exceptionType, k -> new AtomicLong(0));
counter.incrementAndGet();
// 记录异常详细信息(保留最新的)
String detailKey = exceptionType + "_" + System.currentTimeMillis();
exceptionDetails.put(detailKey, formatExceptionDetail(exceptionType, errorMessage, stackTrace));
// 控制缓存大小
if (exceptionDetails.size() > MAX_CACHE_SIZE) {
// 移除最老的条目
String oldestKey = exceptionDetails.keySet().iterator().next();
exceptionDetails.remove(oldestKey);
}
// 记录日志
log.error("异常监控 - 类型: {}, 消息: {}", exceptionType, errorMessage);
} catch (Exception e) {
log.warn("记录异常信息时发生错误: {}", e.getMessage());
}
}
/**
* 格式化异常详细信息
*
* @param exceptionType 异常类型
* @param errorMessage 错误消息
* @param stackTrace 堆栈跟踪
* @return 格式化后的异常详细信息
*/
private String formatExceptionDetail(String exceptionType, String errorMessage, String stackTrace) {
StringBuilder detail = new StringBuilder();
detail.append("异常类型: ").append(exceptionType).append("\n");
detail.append("错误消息: ").append(errorMessage).append("\n");
detail.append("发生时间: ").append(java.time.Instant.now().toString()).append("\n");
detail.append("堆栈跟踪:\n").append(stackTrace).append("\n");
detail.append("---\n");
return detail.toString();
}
/**
* 获取异常统计信息
*
* @return 异常统计信息
*/
public Map<String, Long> getExceptionStatistics() {
Map<String, Long> statistics = new ConcurrentHashMap<>();
for (Map.Entry<String, AtomicLong> entry : exceptionCounters.entrySet()) {
statistics.put(entry.getKey(), entry.getValue().get());
}
return statistics;
}
/**
* 获取指定类型的异常计数
*
* @param exceptionType 异常类型
* @return 异常计数
*/
public long getExceptionCount(String exceptionType) {
AtomicLong counter = exceptionCounters.get(exceptionType);
return counter != null ? counter.get() : 0;
}
/**
* 获取所有异常详细信息
*
* @return 异常详细信息
*/
public Map<String, String> getExceptionDetails() {
return new ConcurrentHashMap<>(exceptionDetails);
}
/**
* 清空异常统计信息
*/
public void clearStatistics() {
exceptionCounters.clear();
}
/**
* 清空异常详细信息
*/
public void clearDetails() {
exceptionDetails.clear();
}
/**
* 清空所有异常信息
*/
public void clearAll() {
clearStatistics();
clearDetails();
}
}
\ No newline at end of file
package pangea.hiagent.core;
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import pangea.hiagent.model.Agent;
import pangea.hiagent.model.Tool;
import pangea.hiagent.core.AgentToolManager;
import pangea.hiagent.tool.AgentToolManager;
import java.util.List;
......
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.workpanel.event.EventService;
import pangea.hiagent.sse.WorkPanelSseService;
import pangea.hiagent.model.Agent;
import pangea.hiagent.common.utils.LogUtils;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 流式请求服务
* 负责处理流式请求
*/
@Slf4j
@Service
public class StreamRequestService {
@Autowired
private WorkPanelSseService unifiedSseService;
@Autowired
private EventService eventService;
@Autowired
private CompletionHandlerService completionHandlerService;
/**
* 处理流式请求
*
* @param emitter SSE发射器
* @param processor Agent处理器
* @param request Agent请求
* @param agent Agent对象
* @param userId 用户ID
*/
public void handleStreamRequest(SseEmitter emitter, AgentProcessor processor, pangea.hiagent.web.dto.AgentRequest request, Agent agent, String userId) {
LogUtils.enterMethod("handleStreamRequest", emitter, processor, request, agent, userId);
// 参数验证
if (!validateParameters(emitter, processor, request, agent, userId)) {
return;
}
// 创建流式处理的Token消费者
StreamTokenConsumer tokenConsumer = new StreamTokenConsumer(emitter, processor, unifiedSseService, eventService, completionHandlerService);
// 设置上下文信息,用于保存对话记录
tokenConsumer.setContext(agent, request, userId);
// 处理流式请求,将token缓冲和事件发送完全交给处理器实现
processor.processStreamRequest(request, agent, userId, tokenConsumer);
LogUtils.exitMethod("handleStreamRequest", "处理完成");
}
/**
* 验证所有必需参数
*
* @param emitter SSE发射器
* @param processor Agent处理器
* @param request Agent请求
* @param agent Agent对象
* @param userId 用户ID
* @return 验证是否通过
*/
private boolean validateParameters(SseEmitter emitter, AgentProcessor processor, pangea.hiagent.web.dto.AgentRequest request, Agent agent, String userId) {
return emitter != null && processor != null && request != null && agent != null && userId != null && !userId.isEmpty();
}
/**
* 流式处理的Token消费者实现
* 用于处理来自Agent处理器的token流,并将其转发给SSE emitter
*/
public static class StreamTokenConsumer implements TokenConsumerWithCompletion {
private final SseEmitter emitter;
private final AgentProcessor processor;
private final EventService eventService;
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
private Agent agent;
private pangea.hiagent.web.dto.AgentRequest request;
private String userId;
private CompletionHandlerService completionHandlerService;
public StreamTokenConsumer(SseEmitter emitter, AgentProcessor processor, WorkPanelSseService unifiedSseService, EventService eventService, CompletionHandlerService completionHandlerService) {
this.emitter = emitter;
this.processor = processor;
this.eventService = eventService;
this.completionHandlerService = completionHandlerService;
}
public void setContext(Agent agent, pangea.hiagent.web.dto.AgentRequest request, String userId) {
this.agent = agent;
this.request = request;
this.userId = userId;
}
@Override
public void accept(String token) {
// 使用JSON格式发送token,确保转义序列被正确处理
try {
if (!isCompleted.get()) {
// 检查是否是错误消息(以[错误]或[ERROR]开头)
if (token != null && (token.startsWith("[错误]") || token.startsWith("[ERROR]"))) {
// 发送标准错误事件而不是纯文本
eventService.sendErrorEvent(emitter, token);
} else {
// 使用SSE标准事件格式发送token,以JSON格式确保转义序列正确处理
eventService.sendTokenEvent(emitter, token);
}
}
} catch (Exception e) {
log.error("发送token失败", e);
}
}
@Override
public void onComplete(String fullContent) {
// 处理完成时的回调
if (isCompleted.getAndSet(true)) {
log.debug("{} Agent处理已完成,跳过重复的完成回调", processor.getProcessorType());
return;
}
log.info("{} Agent处理完成,总字符数: {}", processor.getProcessorType(), fullContent != null ? fullContent.length() : 0);
try {
// 使用CompletionHandlerService处理完成回调
if (completionHandlerService != null) {
completionHandlerService.handleCompletion(emitter, processor, agent, request, userId, fullContent, isCompleted);
} else {
// 如果completionHandlerService不可用,使用默认处理逻辑
try {
// 发送完成事件
emitter.send("[DONE]");
// 完成 emitter
emitter.complete();
} catch (Exception e) {
log.error("处理完成事件失败", e);
} }
} catch (Exception e) {
log.error("处理完成事件失败", e);
// 确保即使出现异常也完成emitter
try {
emitter.complete();
} catch (Exception ex) {
log.error("完成emitter时发生错误", ex);
}
}
} }
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import java.util.function.Consumer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.workpanel.event.EventService;
/**
* Token消费者接口,支持完成回调
* 用于流式处理完成后执行特定操作
*/
public interface TokenConsumerWithCompletion extends Consumer<String> {
/**
* 当流式处理完成时调用
* @param fullContent 完整的内容
*/
default void onComplete(String fullContent) {
// 默认实现为空
}
/**
* 当流式处理完成时调用,发送完成事件到前端
* @param fullContent 完整的内容
* @param emitter SSE发射器
* @param sseEventSender SSE事件发送器
* @param isCompleted 完成状态标记
*/
default void onComplete(String fullContent, SseEmitter emitter,
EventService eventService,
AtomicBoolean isCompleted) {
// 默认实现将在子类中覆盖
}
}
\ No newline at end of file
......@@ -5,8 +5,9 @@ import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Component;
import pangea.hiagent.model.AuthMode;
import pangea.hiagent.model.User;
import pangea.hiagent.repository.UserRepository;
import pangea.hiagent.utils.JwtUtil;
import pangea.hiagent.common.utils.JwtUtil;
import pangea.hiagent.web.repository.UserRepository;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import java.util.Arrays;
......
......@@ -15,10 +15,10 @@ import pangea.hiagent.model.AuthMode;
import pangea.hiagent.model.OAuth2Account;
import pangea.hiagent.model.OAuth2Provider;
import pangea.hiagent.model.User;
import pangea.hiagent.repository.UserRepository;
import pangea.hiagent.repository.OAuth2AccountRepository;
import pangea.hiagent.repository.OAuth2ProviderRepository;
import pangea.hiagent.utils.JwtUtil;
import pangea.hiagent.common.utils.JwtUtil;
import pangea.hiagent.web.repository.OAuth2AccountRepository;
import pangea.hiagent.web.repository.OAuth2ProviderRepository;
import pangea.hiagent.web.repository.UserRepository;
import java.io.IOException;
import java.util.HashMap;
......@@ -78,7 +78,6 @@ public class OAuth2AuthenticationStrategy implements AuthenticationStrategy {
public String authenticate(Map<String, Object> credentials) {
String authorizationCode = (String) credentials.get("authorizationCode");
String providerName = (String) credentials.get("providerName");
String state = (String) credentials.get("state");
if (authorizationCode == null || authorizationCode.trim().isEmpty()) {
log.warn("OAuth2 认证失败: 授权码为空");
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.cache.CacheManager;
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.client.ChatClient;
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
......@@ -13,7 +15,8 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.util.UriComponentsBuilder;
import pangea.hiagent.utils.JwtUtil;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
import pangea.hiagent.common.utils.JwtUtil;
import pangea.hiagent.websocket.DomSyncHandler;
import java.util.Map;
......@@ -28,12 +31,13 @@ import lombok.extern.slf4j.Slf4j;
public class DomSyncWebSocketConfig implements WebSocketConfigurer {
private final JwtHandshakeInterceptor jwtHandshakeInterceptor;
private final pangea.hiagent.core.PlaywrightManager playwrightManager;
public DomSyncWebSocketConfig(JwtHandshakeInterceptor jwtHandshakeInterceptor,
pangea.hiagent.core.PlaywrightManager playwrightManager) {
@Autowired
@Lazy
private PlaywrightManager playwrightManager;
public DomSyncWebSocketConfig(JwtHandshakeInterceptor jwtHandshakeInterceptor) {
this.jwtHandshakeInterceptor = jwtHandshakeInterceptor;
this.playwrightManager = playwrightManager;
}
// 注入DomSyncHandler,交由Spring管理生命周期
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
......
package pangea.hiagent.common.config;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.reflection.MetaObject;
import org.springframework.stereotype.Component;
import pangea.hiagent.common.utils.UserUtils;
import java.time.LocalDateTime;
/**
* MyBatis Plus自动填充配置类
* 用于自动填充实体类中的公共字段
*/
@Slf4j
@Component
public class MetaObjectHandlerConfig implements MetaObjectHandler {
/**
* 插入时自动填充
* @param metaObject 元对象
*/
@Override
public void insertFill(MetaObject metaObject) {
log.debug("开始执行插入自动填充");
// 自动填充创建时间
if (metaObject.hasSetter("createdAt")) {
Object createdAt = getFieldValByName("createdAt", metaObject);
if (createdAt == null) {
this.strictInsertFill(metaObject, "createdAt", LocalDateTime.class, LocalDateTime.now());
log.debug("自动填充createdAt字段");
}
}
// 自动填充更新时间
if (metaObject.hasSetter("updatedAt")) {
Object updatedAt = getFieldValByName("updatedAt", metaObject);
if (updatedAt == null) {
this.strictInsertFill(metaObject, "updatedAt", LocalDateTime.class, LocalDateTime.now());
log.debug("自动填充updatedAt字段");
}
}
// 自动填充创建人
if (metaObject.hasSetter("createdBy")) {
Object createdBy = getFieldValByName("createdBy", metaObject);
if (createdBy == null) {
String userId = UserUtils.getCurrentUserId();
if (userId != null) {
this.strictInsertFill(metaObject, "createdBy", String.class, userId);
log.debug("自动填充createdBy字段: {}", userId);
} else {
log.warn("无法获取当前用户ID,createdBy字段未填充");
}
}
}
// 自动填充更新人
if (metaObject.hasSetter("updatedBy")) {
Object updatedBy = getFieldValByName("updatedBy", metaObject);
if (updatedBy == null) {
String userId = UserUtils.getCurrentUserId();
if (userId != null) {
this.strictInsertFill(metaObject, "updatedBy", String.class, userId);
log.debug("自动填充updatedBy字段: {}", userId);
} else {
log.warn("无法获取当前用户ID,updatedBy字段未填充");
}
}
}
}
/**
* 更新时自动填充
* @param metaObject 元对象
*/
@Override
public void updateFill(MetaObject metaObject) {
log.debug("开始执行更新自动填充");
// 自动填充更新时间
if (metaObject.hasSetter("updatedAt")) {
// 更新时总是设置updatedAt字段
this.strictUpdateFill(metaObject, "updatedAt", LocalDateTime.class, LocalDateTime.now());
log.debug("自动填充updatedAt字段");
}
// 自动填充更新人
if (metaObject.hasSetter("updatedBy")) {
Object updatedBy = getFieldValByName("updatedBy", metaObject);
// 如果updatedBy为空或者需要强制更新,则填充当前用户ID
if (updatedBy == null) {
String userId = UserUtils.getCurrentUserId();
if (userId != null) {
this.strictUpdateFill(metaObject, "updatedBy", String.class, userId);
log.debug("自动填充updatedBy字段: {}", userId);
} else {
log.warn("无法获取当前用户ID,updatedBy字段未填充");
}
}
}
}
}
\ No newline at end of file
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.quartz.Scheduler;
import org.quartz.spi.JobFactory;
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
......@@ -13,15 +13,16 @@ import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;
import org.springframework.security.web.header.writers.frameoptions.XFrameOptionsHeaderWriter;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.CorsConfigurationSource;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.web.service.TimerService;
import pangea.hiagent.security.DefaultPermissionEvaluator;
import pangea.hiagent.security.JwtAuthenticationFilter;
import pangea.hiagent.service.AgentService;
import pangea.hiagent.service.TimerService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
......@@ -112,6 +113,8 @@ public class SecurityConfig {
).permitAll()
// Agent相关端点 - 需要认证
.requestMatchers("/api/v1/agent/**").authenticated()
// 特别允许timeline-events端点通过认证检查(由JWT过滤器处理)
.requestMatchers("/api/v1/agent/timeline-events").authenticated()
// 工具相关端点 - 需要认证
.requestMatchers("/api/v1/tools/**").authenticated()
// 所有其他请求需要认证
......@@ -120,26 +123,84 @@ public class SecurityConfig {
// 异常处理
.exceptionHandling(exception -> exception
.authenticationEntryPoint((request, response, authException) -> {
// 检查响应是否已经提交
// 更全面地检查响应是否已经提交
if (response.isCommitted()) {
System.err.println("响应已经提交,无法处理认证异常: " + request.getRequestURI());
log.warn("响应已经提交,无法处理认证异常: {}", request.getRequestURI());
return;
}
response.setStatus(401);
response.setContentType("application/json;charset=UTF-8");
response.getWriter().write("{\"code\":401,\"message\":\"未授权访问\",\"timestamp\":" + System.currentTimeMillis() + "}");
try {
// 对于SSE端点的特殊处理
boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream");
boolean isTimelineEndpoint = request.getRequestURI().contains("/api/v1/agent/timeline-events");
if (isStreamEndpoint || isTimelineEndpoint) {
// 对于SSE端点,发送SSE格式的错误事件
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
response.getWriter().write("event: error\ndata: {\"error\": \"未授权访问\", \"timestamp\": " + System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
// 确保响应被正确提交
if (!response.isCommitted()) {
response.flushBuffer();
}
return;
}
response.setStatus(401);
response.setContentType("application/json;charset=UTF-8");
response.getWriter().write("{\"code\":401,\"message\":\"未授权访问\",\"timestamp\":" + System.currentTimeMillis() + "}");
response.getWriter().flush();
// 确保响应被正确提交
if (!response.isCommitted()) {
response.flushBuffer();
}
} catch (IOException e) {
log.error("发送认证错误响应失败: {}", request.getRequestURI(), e);
// 如果在发送错误响应时发生IO异常,确保不会导致未处理的异常
} catch (Exception e) {
log.error("处理认证异常时发生未知错误: {}", request.getRequestURI(), e);
}
})
.accessDeniedHandler((request, response, accessDeniedException) -> {
// 检查响应是否已经提交
// 更全面地检查响应是否已经提交
if (response.isCommitted()) {
System.err.println("响应已经提交,无法处理访问拒绝异常: " + request.getRequestURI());
log.warn("响应已经提交,无法处理访问拒绝异常: {}", request.getRequestURI());
return;
}
response.setStatus(403);
response.setContentType("application/json;charset=UTF-8");
response.getWriter().write("{\"code\":403,\"message\":\"访问被拒绝\",\"timestamp\":" + System.currentTimeMillis() + "}");
try {
// 对于SSE端点的特殊处理
boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream");
boolean isTimelineEndpoint = request.getRequestURI().contains("/api/v1/agent/timeline-events");
if (isStreamEndpoint || isTimelineEndpoint) {
// 对于SSE端点,发送SSE格式的错误事件
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
response.getWriter().write("event: error\ndata: {\"error\": \"访问被拒绝\", \"timestamp\": " + System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
// 确保响应被正确提交
if (!response.isCommitted()) {
response.flushBuffer();
}
return;
}
response.setStatus(403);
response.setContentType("application/json;charset=UTF-8");
response.getWriter().write("{\"code\":403,\"message\":\"访问被拒绝\",\"timestamp\":" + System.currentTimeMillis() + "}");
response.getWriter().flush();
// 确保响应被正确提交
if (!response.isCommitted()) {
response.flushBuffer();
}
} catch (IOException e) {
log.error("发送访问拒绝响应失败: {}", request.getRequestURI(), e);
// 如果在发送错误响应时发生IO异常,确保不会导致未处理的异常
} catch (Exception e) {
log.error("处理访问拒绝异常时发生未知错误: {}", request.getRequestURI(), e);
}
})
)
// 添加JWT认证过滤器
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.core.context.SecurityContextHolder;
......
package pangea.hiagent.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
* SSE相关配置类
* 负责配置SSE事件处理所需的线程池等资源
*/
@Configuration
public class SseConfig {
/**
* 创建共享的心跳任务执行器
* 使用共享线程池以提高资源利用率
*
* @return ScheduledExecutorService
*/
@Bean
public ScheduledExecutorService sharedHeartbeatExecutor() {
// 创建一个固定大小的线程池,用于处理心跳任务
// 线程池大小设置为2,足以处理常规的心跳任务
return Executors.newScheduledThreadPool(2, r -> {
Thread t = new Thread(r, "sse-heartbeat-thread");
t.setDaemon(true); // 设置为守护线程,应用关闭时自动退出
return t;
});
}
}
\ No newline at end of file
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.springframework.ai.embedding.EmbeddingModel;
import org.springframework.ai.openai.OpenAiEmbeddingModel;
......
package pangea.hiagent.config;
package pangea.hiagent.common.config;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
......
package pangea.hiagent.exception;
package pangea.hiagent.common.exception;
/**
* 错误码枚举类
......@@ -12,6 +12,7 @@ public enum ErrorCode {
UNAUTHORIZED(1002, "未授权访问"),
FORBIDDEN(1003, "权限不足"),
NOT_FOUND(1004, "资源不存在"),
PERMISSION_DENIED(1005, "权限拒绝"),
// 用户相关错误 (2000-2999)
USER_NOT_FOUND(2000, "用户不存在"),
......
package pangea.hiagent.exception;
package pangea.hiagent.common.exception;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
......@@ -10,13 +10,16 @@ import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.method.annotation.MethodArgumentTypeMismatchException;
import pangea.hiagent.dto.ApiResponse;
import pangea.hiagent.agent.service.ExceptionMonitoringService;
import pangea.hiagent.web.dto.ApiResponse;
import jakarta.servlet.http.HttpServletRequest;
import java.util.stream.Collectors;
import org.springframework.security.authorization.AuthorizationDeniedException;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 全局异常处理器
* 统一处理系统中的各种异常
......@@ -25,11 +28,21 @@ import org.springframework.security.authorization.AuthorizationDeniedException;
@RestControllerAdvice
public class GlobalExceptionHandler {
@Autowired
private ExceptionMonitoringService exceptionMonitoringService;
/**
* 处理业务异常
*/
@ExceptionHandler(BusinessException.class)
public ResponseEntity<ApiResponse<Void>> handleBusinessException(BusinessException e, HttpServletRequest request) {
// 记录异常到监控服务
exceptionMonitoringService.recordException(
e.getClass().getSimpleName(),
e.getMessage(),
java.util.Arrays.toString(e.getStackTrace())
);
log.warn("业务异常: {} - URL: {}", e.getMessage(), request.getRequestURL());
ApiResponse.ErrorDetail errorDetail = ApiResponse.ErrorDetail.builder()
......@@ -47,6 +60,13 @@ public class GlobalExceptionHandler {
@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<ApiResponse<Void>> handleMethodArgumentNotValidException(
MethodArgumentNotValidException e, HttpServletRequest request) {
// 记录异常到监控服务
exceptionMonitoringService.recordException(
e.getClass().getSimpleName(),
"参数验证异常",
java.util.Arrays.toString(e.getStackTrace())
);
log.warn("参数验证异常: {} - URL: {}", e.getMessage(), request.getRequestURL());
String errorMessage = e.getBindingResult().getFieldErrors().stream()
......@@ -68,6 +88,13 @@ public class GlobalExceptionHandler {
*/
@ExceptionHandler(BindException.class)
public ResponseEntity<ApiResponse<Void>> handleBindException(BindException e, HttpServletRequest request) {
// 记录异常到监控服务
exceptionMonitoringService.recordException(
e.getClass().getSimpleName(),
"绑定异常",
java.util.Arrays.toString(e.getStackTrace())
);
log.warn("绑定异常: {} - URL: {}", e.getMessage(), request.getRequestURL());
String errorMessage = e.getBindingResult().getFieldErrors().stream()
......@@ -90,6 +117,13 @@ public class GlobalExceptionHandler {
@ExceptionHandler(MethodArgumentTypeMismatchException.class)
public ResponseEntity<ApiResponse<Void>> handleMethodArgumentTypeMismatchException(
MethodArgumentTypeMismatchException e, HttpServletRequest request) {
// 记录异常到监控服务
exceptionMonitoringService.recordException(
e.getClass().getSimpleName(),
"参数类型不匹配异常",
java.util.Arrays.toString(e.getStackTrace())
);
log.warn("参数类型不匹配异常: {} - URL: {}", e.getMessage(), request.getRequestURL());
ApiResponse.ErrorDetail errorDetail = ApiResponse.ErrorDetail.builder()
......@@ -108,6 +142,13 @@ public class GlobalExceptionHandler {
@ExceptionHandler(HttpMessageNotReadableException.class)
public ResponseEntity<ApiResponse<Void>> handleHttpMessageNotReadableException(
HttpMessageNotReadableException e, HttpServletRequest request) {
// 记录异常到监控服务
exceptionMonitoringService.recordException(
e.getClass().getSimpleName(),
"HTTP消息不可读异常",
java.util.Arrays.toString(e.getStackTrace())
);
log.warn("HTTP消息不可读异常: {} - URL: {}", e.getMessage(), request.getRequestURL());
ApiResponse.ErrorDetail errorDetail = ApiResponse.ErrorDetail.builder()
......@@ -126,6 +167,13 @@ public class GlobalExceptionHandler {
@ExceptionHandler(org.springframework.security.access.AccessDeniedException.class)
public ResponseEntity<ApiResponse<Void>> handleAccessDeniedException(
org.springframework.security.access.AccessDeniedException e, HttpServletRequest request) {
// 记录异常到监控服务
exceptionMonitoringService.recordException(
e.getClass().getSimpleName(),
"访问被拒绝",
java.util.Arrays.toString(e.getStackTrace())
);
log.warn("访问被拒绝: {} - URL: {}", e.getMessage(), request.getRequestURL());
ApiResponse.ErrorDetail errorDetail = ApiResponse.ErrorDetail.builder()
......@@ -164,14 +212,29 @@ public class GlobalExceptionHandler {
AuthorizationDeniedException e, HttpServletRequest request) {
log.warn("访问被拒绝: {} - URL: {}", e.getMessage(), request.getRequestURL());
// 检查响应是否已经提交
if (request.getAttribute("jakarta.servlet.error.exception") != null ||
(request instanceof org.springframework.web.context.request.NativeWebRequest &&
((org.springframework.web.context.request.NativeWebRequest) request).getNativeResponse() instanceof jakarta.servlet.http.HttpServletResponse &&
((jakarta.servlet.http.HttpServletResponse) ((org.springframework.web.context.request.NativeWebRequest) request).getNativeResponse()).isCommitted())) {
// 更全面地检查响应是否已经提交
boolean responseCommitted = false;
// 检查request属性
if (request.getAttribute("jakarta.servlet.error.exception") != null) {
responseCommitted = true;
}
// 检查response是否已提交
if (request instanceof org.springframework.web.context.request.NativeWebRequest) {
Object nativeResponse = ((org.springframework.web.context.request.NativeWebRequest) request).getNativeResponse();
if (nativeResponse instanceof jakarta.servlet.http.HttpServletResponse) {
if (((jakarta.servlet.http.HttpServletResponse) nativeResponse).isCommitted()) {
responseCommitted = true;
}
}
}
// 如果响应已提交,记录日志并返回空响应以避免二次异常
if (responseCommitted) {
log.warn("响应已提交,无法发送访问拒绝错误: {}", request.getRequestURL());
// 响应已提交,无法发送错误响应
return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
// 返回空响应而不是build(),避免潜在的响应提交冲突
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
}
ApiResponse.ErrorDetail errorDetail = ApiResponse.ErrorDetail.builder()
......@@ -214,10 +277,11 @@ public class GlobalExceptionHandler {
}
} else {
// 非IOException的SSE异常才记录为ERROR
log.error("SSE流式处理异常 - URL: {} - 异常类型: {} - 异常消息: {}",
log.error("SSE流式处理异常 - URL: {} - 异常类型: {} - 异常消息: {} - 用户代理: {}",
request.getRequestURL(),
e.getClass().getSimpleName(),
e.getMessage(),
request.getHeader("User-Agent"),
e);
}
......
package pangea.hiagent.utils;
package pangea.hiagent.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
......
package pangea.hiagent.utils;
package pangea.hiagent.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
......
package pangea.hiagent.utils;
package pangea.hiagent.common.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
......
package pangea.hiagent.utils;
package pangea.hiagent.common.utils;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.security.Keys;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.common.config.JwtProperties;
import org.springframework.stereotype.Component;
import pangea.hiagent.config.JwtProperties;
import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
......
package pangea.hiagent.common.utils;
import lombok.extern.slf4j.Slf4j;
/**
* 日志工具类
* 提供统一的日志记录方法,减少重复代码
*/
@Slf4j
public class LogUtils {
/**
* 记录跟踪日志
*
* @param message 日志消息
* @param params 参数
*/
public static void trace(String message, Object... params) {
if (log.isTraceEnabled()) {
log.trace(message, params);
}
}
/**
* 记录调试日志
*
* @param message 日志消息
* @param params 参数
*/
public static void debug(String message, Object... params) {
if (log.isDebugEnabled()) {
log.debug(message, params);
}
}
/**
* 记录信息日志
*
* @param message 日志消息
* @param params 参数
*/
public static void info(String message, Object... params) {
log.info(message, params);
}
/**
* 记录警告日志
*
* @param message 日志消息
* @param params 参数
*/
public static void warn(String message, Object... params) {
log.warn(message, params);
}
/**
* 记录错误日志
*
* @param message 日志消息
* @param throwable 异常对象
*/
public static void error(String message, Throwable throwable) {
log.error(message, throwable);
}
/**
* 记录错误日志
*
* @param message 日志消息
* @param params 参数
*/
public static void error(String message, Object... params) {
log.error(message, params);
}
/**
* 记录错误日志
*
* @param message 日志消息
* @param throwable 异常对象
* @param params 参数
*/
public static void error(String message, Throwable throwable, Object... params) {
log.error(message, throwable, params);
}
/**
* 记录带前缀的信息日志
*
* @param prefix 前缀
* @param message 日志消息
* @param params 参数
*/
public static void infoWithPrefix(String prefix, String message, Object... params) {
log.info("[{}] {}", prefix, String.format(message, params));
}
/**
* 记录带前缀的错误日志
*
* @param prefix 前缀
* @param message 日志消息
* @param throwable 异常对象
*/
public static void errorWithPrefix(String prefix, String message, Throwable throwable) {
log.error("[{}] {}", prefix, message, throwable);
}
/**
* 记录带前缀的警告日志
*
* @param prefix 前缀
* @param message 日志消息
* @param params 参数
*/
public static void warnWithPrefix(String prefix, String message, Object... params) {
log.warn("[{}] {}", prefix, String.format(message, params));
}
/**
* 记录方法进入日志
*
* @param methodName 方法名
* @param params 参数
*/
public static void enterMethod(String methodName, Object... params) {
if (log.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Entering method: ").append(methodName);
if (params.length > 0) {
sb.append(" with params: ");
for (int i = 0; i < params.length; i++) {
if (i > 0) sb.append(", ");
sb.append(params[i]);
}
}
log.debug(sb.toString());
}
}
/**
* 记录方法退出日志
*
* @param methodName 方法名
* @param result 返回结果
*/
public static void exitMethod(String methodName, Object result) {
if (log.isDebugEnabled()) {
log.debug("Exiting method: {} with result: {}", methodName, result);
}
}
}
\ No newline at end of file
package pangea.hiagent.common.utils;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
/**
* 通用对象池实现
* 用于重用对象实例,减少垃圾回收压力
* 使用LRU策略管理对象池,提高资源利用效率
*
* @param <T> 对象类型
*/
public class ObjectPool<T> {
private final Queue<T> pool = new ConcurrentLinkedQueue<>();
private final Supplier<T> factory;
private final int maxSize;
// 对象创建计数器,用于监控对象池使用情况
private volatile long createdCount = 0;
private volatile long acquiredCount = 0;
private volatile long releasedCount = 0;
/**
* 构造函数
*
* @param factory 对象工厂函数
* @param maxSize 对象池最大大小
*/
public ObjectPool(Supplier<T> factory, int maxSize) {
this.factory = factory;
this.maxSize = maxSize;
}
/**
* 从对象池获取对象实例
*
* @return 对象实例
*/
public T acquire() {
T object = pool.poll();
acquiredCount++;
if (object == null) {
createdCount++;
return factory.get();
}
// 如果对象有清理方法,应该在这里调用
if (object instanceof PoolableObject) {
((PoolableObject) object).reset();
}
return object;
}
/**
* 将对象实例归还到对象池
*
* @param object 对象实例
*/
public void release(T object) {
if (object != null) {
releasedCount++;
// 如果对象有清理方法,应该在这里调用
if (object instanceof PoolableObject) {
((PoolableObject) object).reset();
}
// 只有当对象池未达到最大大小时才归还对象
if (pool.size() < maxSize) {
pool.offer(object);
}
// 如果对象池已满,对象将被垃圾回收
}
}
/**
* 清空对象池
*/
public void clear() {
pool.clear();
}
/**
* 获取对象池当前大小
*
* @return 对象池大小
*/
public int size() {
return pool.size();
}
/**
* 获取对象创建总数
*
* @return 对象创建总数
*/
public long getCreatedCount() {
return createdCount;
}
/**
* 获取对象获取总数
*
* @return 对象获取总数
*/
public long getAcquiredCount() {
return acquiredCount;
}
/**
* 获取对象归还总数
*
* @return 对象归还总数
*/
public long getReleasedCount() {
return releasedCount;
}
/**
* 获取对象池使用统计信息
*
* @return 统计信息字符串
*/
public String getStatistics() {
return String.format("ObjectPool Stats - Created: %d, Acquired: %d, Released: %d, PoolSize: %d",
createdCount, acquiredCount, releasedCount, pool.size());
}
/**
* 可重置的对象接口
* 实现此接口的对象在归还到对象池时会被重置
*/
public interface PoolableObject {
/**
* 重置对象状态
*/
void reset();
}
}
\ No newline at end of file
package pangea.hiagent.utils;
package pangea.hiagent.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.core.Authentication;
......
package pangea.hiagent.utils;
package pangea.hiagent.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.core.Authentication;
......@@ -37,24 +37,30 @@ public class UserUtils {
if (authentication != null && authentication.isAuthenticated() && authentication.getPrincipal() != null) {
Object principal = authentication.getPrincipal();
if (principal instanceof String) {
return (String) principal;
String userId = (String) principal;
log.debug("从SecurityContext获取到用户ID: {}", userId);
return userId;
} else {
// 如果principal不是String类型,尝试获取getName()方法的返回值
log.debug("Authentication principal is not a String: {}", principal.getClass().getName());
try {
return principal.toString();
String userId = principal.toString();
log.debug("将principal转换为字符串获取用户ID: {}", userId);
return userId;
} catch (Exception toStringEx) {
log.warn("无法将principal转换为字符串: {}", toStringEx.getMessage());
}
}
}
// 如果SecurityContext中没有认证信息,尝试从请求中解析JWT令牌
// 如果SecurityContext中没有认证信息,尝试从请求中解析JWT令牌
String userId = getUserIdFromRequest();
if (userId != null) {
log.debug("从请求中解析到用户ID: {}", userId);
return userId;
}
log.debug("未能获取到有效的用户ID");
return null;
} catch (Exception e) {
log.error("获取当前用户ID时发生异常", e);
......@@ -70,14 +76,14 @@ public class UserUtils {
public static String getCurrentUserIdInAsync() {
try {
log.debug("在异步线程中尝试获取用户ID");
// 直接从请求中解析JWT令牌获取用户ID
String userId = getUserIdFromRequest();
if (userId != null) {
log.debug("在异步线程中成功获取用户ID: {}", userId);
return userId;
}
log.debug("在异步线程中未能获取到有效的用户ID");
return null;
} catch (Exception e) {
......@@ -160,4 +166,16 @@ public class UserUtils {
public static boolean isAuthenticated() {
return getCurrentUserId() != null;
}
/**
* 检查用户是否是管理员
* @param userId 用户ID
* @return true表示是管理员,false表示不是管理员
*/
public static boolean isAdminUser(String userId) {
// 这里可以根据实际需求实现管理员检查逻辑
// 例如查询数据库或检查特殊用户ID
// 当前实现保留原有逻辑,但可以通过配置或数据库来管理管理员用户
return "admin".equals(userId) || "user-001".equals(userId);
}
}
\ No newline at end of file
package pangea.hiagent.common.utils;
import lombok.extern.slf4j.Slf4j;
/**
* 参数验证工具类
* 提供常用的参数验证方法,减少重复代码
*/
@Slf4j
public class ValidationUtils {
/**
* 检查对象是否为null
*
* @param obj 待检查的对象
* @param paramName 参数名称
* @return 如果对象为null返回true,否则返回false
*/
public static boolean isNull(Object obj, String paramName) {
if (obj == null) {
log.warn("{}参数不能为空", paramName);
return true;
}
return false;
}
/**
* 检查字符串是否为空
*
* @param str 待检查的字符串
* @param paramName 参数名称
* @return 如果字符串为空返回true,否则返回false
*/
public static boolean isEmpty(String str, String paramName) {
if (str == null || str.isEmpty()) {
log.warn("{}参数不能为空", paramName);
return true;
}
return false;
}
/**
* 检查字符串是否为空白
*
* @param str 待检查的字符串
* @param paramName 参数名称
* @return 如果字符串为空白返回true,否则返回false
*/
public static boolean isBlank(String str, String paramName) {
if (str == null || str.trim().isEmpty()) {
log.warn("{}参数不能为空", paramName);
return true;
}
return false;
}
/**
* 检查集合是否为空
*
* @param collection 待检查的集合
* @param paramName 参数名称
* @return 如果集合为空返回true,否则返回false
*/
public static boolean isEmpty(java.util.Collection<?> collection, String paramName) {
if (collection == null || collection.isEmpty()) {
log.warn("{}参数不能为空", paramName);
return true;
}
return false;
}
/**
* 检查数组是否为空
*
* @param array 待检查的数组
* @param paramName 参数名称
* @return 如果数组为空返回true,否则返回false
*/
public static boolean isEmpty(Object[] array, String paramName) {
if (array == null || array.length == 0) {
log.warn("{}参数不能为空", paramName);
return true;
}
return false;
}
/**
* 检查数字是否小于等于0
*
* @param number 待检查的数字
* @param paramName 参数名称
* @return 如果数字小于等于0返回true,否则返回false
*/
public static boolean isNotPositive(Number number, String paramName) {
if (number == null || number.doubleValue() <= 0) {
log.warn("{}参数必须大于0", paramName);
return true;
}
return false;
}
/**
* 检查字符串长度是否超过指定长度
*
* @param str 待检查的字符串
* @param maxLength 最大长度
* @param paramName 参数名称
* @return 如果字符串长度超过指定长度返回true,否则返回false
*/
public static boolean isLengthExceeds(String str, int maxLength, String paramName) {
if (str != null && str.length() > maxLength) {
log.warn("{}参数长度不能超过{}个字符", paramName, maxLength);
return true;
}
return false;
}
/**
* 检查对象是否为null,如果为null则抛出IllegalArgumentException异常
*
* @param obj 待检查的对象
* @param paramName 参数名称
* @throws IllegalArgumentException 如果对象为null
*/
public static void checkNotNull(Object obj, String paramName) {
if (isNull(obj, paramName)) {
throw new IllegalArgumentException(paramName + "不能为null");
}
}
/**
* 检查字符串是否为空,如果为空则抛出IllegalArgumentException异常
*
* @param str 待检查的字符串
* @param paramName 参数名称
* @throws IllegalArgumentException 如果字符串为空
*/
public static void checkNotEmpty(String str, String paramName) {
if (isEmpty(str, paramName)) {
throw new IllegalArgumentException(paramName + "不能为空");
}
}
/**
* 检查字符串是否为空白,如果为空白则抛出IllegalArgumentException异常
*
* @param str 待检查的字符串
* @param paramName 参数名称
* @throws IllegalArgumentException 如果字符串为空白
*/
public static void checkNotBlank(String str, String paramName) {
if (isBlank(str, paramName)) {
throw new IllegalArgumentException(paramName + "不能为空");
}
}
/**
* 检查集合是否为空,如果为空则抛出IllegalArgumentException异常
*
* @param collection 待检查的集合
* @param paramName 参数名称
* @throws IllegalArgumentException 如果集合为空
*/
public static void checkNotEmpty(java.util.Collection<?> collection, String paramName) {
if (isEmpty(collection, paramName)) {
throw new IllegalArgumentException(paramName + "不能为空");
}
}
/**
* 检查数组是否为空,如果为空则抛出IllegalArgumentException异常
*
* @param array 待检查的数组
* @param paramName 参数名称
* @throws IllegalArgumentException 如果数组为空
*/
public static void checkNotEmpty(Object[] array, String paramName) {
if (isEmpty(array, paramName)) {
throw new IllegalArgumentException(paramName + "不能为空");
}
}
/**
* 检查数字是否小于等于0,如果是则抛出IllegalArgumentException异常
*
* @param number 待检查的数字
* @param paramName 参数名称
* @throws IllegalArgumentException 如果数字小于等于0
*/
public static void checkPositive(Number number, String paramName) {
if (isNotPositive(number, paramName)) {
throw new IllegalArgumentException(paramName + "必须大于0");
}
}
/**
* 检查字符串长度是否超过指定长度,如果是则抛出IllegalArgumentException异常
*
* @param str 待检查的字符串
* @param maxLength 最大长度
* @param paramName 参数名称
* @throws IllegalArgumentException 如果字符串长度超过指定长度
*/
public static void checkLength(String str, int maxLength, String paramName) {
if (isLengthExceeds(str, maxLength, paramName)) {
throw new IllegalArgumentException(paramName + "长度不能超过" + maxLength + "个字符");
}
}
}
\ No newline at end of file
package pangea.hiagent.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import pangea.hiagent.dto.ApiResponse;
import pangea.hiagent.model.LlmConfig;
import pangea.hiagent.repository.LlmConfigRepository;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import java.util.List;
@Slf4j
@RestController
@RequestMapping("/api/v1/test")
public class TestController {
private final LlmConfigRepository llmConfigRepository;
public TestController(LlmConfigRepository llmConfigRepository) {
this.llmConfigRepository = llmConfigRepository;
}
@GetMapping("/llm-configs")
public ApiResponse<List<LlmConfig>> getAllLlmConfigs() {
try {
List<LlmConfig> configs = llmConfigRepository.selectList(null);
log.info("查询到 {} 条LLM配置", configs.size());
for (LlmConfig config : configs) {
log.info("配置: ID={}, 名称={}, 模型名={}, 提供商={}, 启用状态={}",
config.getId(), config.getName(), config.getModelName(),
config.getProvider(), config.getEnabled());
}
return ApiResponse.success(configs);
} catch (Exception e) {
log.error("查询LLM配置失败", e);
return ApiResponse.error(5001, "查询LLM配置失败: " + e.getMessage());
}
}
@GetMapping("/hisense-config")
public ApiResponse<LlmConfig> getHisenseConfig() {
try {
LambdaQueryWrapper<LlmConfig> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(LlmConfig::getModelName, "hisense-default");
wrapper.eq(LlmConfig::getEnabled, true);
LlmConfig config = llmConfigRepository.selectOne(wrapper);
if (config != null) {
log.info("找到海信配置: ID={}, 名称={}, 模型名={}, 提供商={}, 启用状态={}",
config.getId(), config.getName(), config.getModelName(),
config.getProvider(), config.getEnabled());
return ApiResponse.success(config);
} else {
log.warn("未找到启用的海信配置");
// 尝试查找所有海信配置
LambdaQueryWrapper<LlmConfig> allWrapper = new LambdaQueryWrapper<>();
allWrapper.eq(LlmConfig::getModelName, "hisense-default");
List<LlmConfig> allConfigs = llmConfigRepository.selectList(allWrapper);
log.info("找到 {} 条海信配置", allConfigs.size());
for (LlmConfig c : allConfigs) {
log.info("海信配置详情: ID={}, 名称={}, 模型名={}, 提供商={}, 启用状态={}",
c.getId(), c.getName(), c.getModelName(),
c.getProvider(), c.getEnabled());
}
return ApiResponse.error(4001, "未找到启用的海信配置");
}
} catch (Exception e) {
log.error("查询海信配置失败", e);
return ApiResponse.error(5001, "查询海信配置失败: " + e.getMessage());
}
}
}
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment