Commit 50254708 authored by 高如斌's avatar 高如斌

Merge branch 'main' of...

Merge branch 'main' of https://gitlab-cloud.hisense.com/gavin-group/pangea-agent into feature/chat-form
parents 307f751e 901b31c3
...@@ -193,6 +193,9 @@ backend/logs/ ...@@ -193,6 +193,9 @@ backend/logs/
backend/storage/ backend/storage/
backend/uploads/ backend/uploads/
backend/hiagentdb.mv.db backend/hiagentdb.mv.db
# H2 database files
backend/src/main/resources/hiagent_dev_db.*
./data/hiagent_dev_db.*
# Frontend files # Frontend files
frontend/node_modules/ frontend/node_modules/
...@@ -214,4 +217,6 @@ Thumbs.db ...@@ -214,4 +217,6 @@ Thumbs.db
.Trashes .Trashes
ehthumbs.db ehthumbs.db
Icon? Icon?
*.icon? *.icon?
\ No newline at end of file backend/data/hiagent_dev_db.trace.db
backend/data/hiagent_dev_db.mv.db
现有架构分析
后端架构
核心组件
WorkPanelDataCollector - 实现IWorkPanelDataCollector接口,负责收集和存储事件
DefaultEventManager - 负责创建各种类型的事件对象
SseEventSender - 负责通过SSE发送事件到前端
EventDataManager - 负责构建事件数据,用于发送到前端
EventDeduplicationService - 负责事件去重
WorkPanelEventSubscriber - 负责订阅工作面板事件
WorkPanelService - 提供工作面板状态查询服务
数据流向
Agent执行过程中产生事件
事件被WorkPanelDataCollector收集和存储
通过订阅机制,事件被推送到SseEventSender
SseEventSender使用EventDataManager构建事件数据
通过SSE连接将事件数据发送到前端
主要问题
职责不清:
WorkPanelDataCollector既负责收集又负责订阅管理
DefaultEventManager和WorkPanelDataCollector之间职责重叠
SseEventSender和EventDataManager功能分离但紧密耦合
冗余代码:
多个组件都有相似的事件处理逻辑
事件去重逻辑分散在多个地方
重复的对象转换和序列化逻辑
过度工程化:
太多的小型服务类,增加了复杂性
过度使用依赖注入和组件拆分
前端架构
核心组件
TimelineService - 协调Timeline相关的各种服务
TimelineSseManager - 管理SSE连接和事件处理
SseConnectionManager - 管理SSE连接生命周期
SseEventListenerService - 解析SSE消息
SseEventProcessorService - 处理SSE事件
EventProcessingOptimizer - 优化事件处理
主要问题
服务层次过多:
简单的SSE事件处理被拆分成太多小的服务类
服务之间的依赖关系复杂
缺乏清晰的数据流:
事件处理流程在多个服务间跳跃,难以追踪
已更新待办
架构设计方案
基于对现有代码的分析,我提出以下方案:
后端架构
设计目标
减少不必要的抽象层和服务类
明确各组件职责边界
事件处理流程
降低组件间的耦合度
后的架构
EventService - 统一的事件服务
合并DefaultEventManager和EventDataManager的功能
负责创建、构建和发送事件
集成事件去重逻辑
WorkPanelService - 工作面板核心服务
合并WorkPanelDataCollector的部分功能
负责事件的收集、存储和订阅管理
直接依赖EventService进行事件发送
移除冗余组件
移除WorkPanelEventSubscriber(功能合并到WorkPanelService)
移除SseEventSender(功能合并到EventService)
移除EventDataManager(功能合并到EventService)
数据流向
Agent执行过程中产生事件
WorkPanelService收集和存储事件
WorkPanelService通过EventService发送事件到前端
EventService负责构建事件数据并通过SSE发送
前端架构
设计目标
服务层次结构
明确数据流路径
减少不必要的服务类
后的架构
TimelineService - 时间轴核心服务
合并TimelineSseManager和SseConnectionManager的功能
负责SSE连接管理和事件处理
直接处理事件解析和优化
移除冗余服务
移除TimelineSseManager(功能合并到TimelineService)
移除SseEventListenerService(功能合并到TimelineService)
移除SseEventProcessorService(功能合并到TimelineService)
数据流向
建立SSE连接
TimelineService接收和解析事件
TimelineService优化和处理事件
事件传递给UI组件展示
技术实现要点
事件去重:
在EventService中集中实现事件去重逻辑
使用更高效的缓存策略
对象池:
保留MapPoolService用于对象复用
优化对象池管理策略
错误处理:
统一异常处理机制
错误日志记录
性能优化:
批量处理事件发送
优化事件数据构建过程
\ No newline at end of file
# HiAgent 工具管理方案
## 1. 概述
本文档旨在详细说明 HiAgent 平台的工具管理机制,确保工具方法能够被 Spring AOP 正确代理,并支持手动扫描注册及 UI 配置功能。
通过对现有代码的分析,我们发现当前系统在工具管理方面还存在一些问题,主要包括:
1. 缺少手动触发工具扫描的前端界面
2. 工具无法被正确找到和调用的问题
3. 工具扫描API端点未暴露给前端使用
本文档将在分析这些问题的基础上,提出相应的改进建议.
## 2. Spring AOP 代理兼容性方案
### 2.1 工具类设计规范
为了确保工具方法能够被 Spring AOP 正确代理,所有工具类需要遵循以下规范:
1. **注解使用**
- 工具类必须使用 `@Component` 或其派生注解(如 `@Service`)进行标记
- 工具方法必须使用 `@org.springframework.ai.tool.annotation.Tool` 注解进行标记
2. **访问修饰符**
- 工具方法必须是 `public` 方法
- 避免在同一个类中直接调用其他带有 `@Tool` 注解的方法
3. **类设计**
- 工具类应该是无状态的,或者状态应该是线程安全的
- 避免使用 `final` 方法,因为这会影响 CGLIB 代理的创建
### 2.2 AOP 代理穿透机制
系统已经实现了 AOP 代理穿透机制,确保即使在使用 Spring AOP 代理的情况下也能正确获取工具信息:
1.[AgentToolManager.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/AgentToolManager.java) 中提供了 `getTargetClass()` 方法来获取代理对象的原始类:
```java
private Class<?> getTargetClass(Object bean) {
if (bean == null) {
return null;
}
return AopUtils.getTargetClass(bean);
}
```
2. 在工具匹配过程中,系统会穿透代理获取真实的类信息进行比较,确保匹配准确性。
### 2.3 工具执行日志切面
系统通过 [ToolExecutionLoggerAspect.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/aspect/ToolExecutionLoggerAspect.java) 实现了工具执行的日志记录和监控:
1. 使用 `@Around("@annotation(tool)")` 环绕通知拦截所有带有 `@Tool` 注解的方法
2. 自动记录工具执行的输入参数、输出结果、执行时间等信息
3. 将工具执行信息同步到 WorkPanel 进行可视化展示
## 3. 工具扫描与注册机制
### 3.1 自动扫描机制
系统通过 [ToolBeanNameInitializer.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/ToolBeanNameInitializer.java) 实现工具的自动扫描和注册:
1. **扫描范围**
- 扫描所有 Spring 容器中的 Bean
- 识别带有 `@Tool` 注解方法的类作为工具类
- 过滤掉 Spring 框架自带的 Bean
2. **工具识别规则**
- 类名包含 "Tool" 关键字
-`@Component``@Service` 标注
- 类中包含带有 `@Tool` 注解的方法
3. **工具名称推导**
- 从类名推导工具名称,去除 "Tool" 后缀
- 转换为小驼峰命名格式
### 3.2 手动触发扫描
系统支持通过管理界面手动触发工具扫描和注册:
1. 提供 `initializeToolBeanNamesManually()` 方法用于手动触发扫描
2. 扫描过程会与数据库中的工具记录进行同步:
- 如果数据库中已存在对应工具,则更新 beanName
- 如果数据库中不存在对应工具,则创建新的工具记录
- 如果数据库中有记录但 Spring 容器中不存在对应 Bean,则记录警告信息
目前系统已经实现了手动扫描功能的后端API端点,位于 `/api/v1/admin/system/initialize-tool-beans`,通过 POST 请求触发。但在前端界面上还未提供相应的人机交互界面。
### 3.3 数据库同步策略
工具信息会被持久化存储在数据库中,确保系统重启后配置不会丢失:
1. **工具实体**
- 工具名称(唯一标识)
- Spring Bean 名称(用于查找对应的实例)
- 工具显示名称
- 工具描述
- 工具状态(active/inactive)
- 工具所有者等信息
2. **同步机制**
- 系统启动时不自动执行扫描(避免影响启动速度)
- 通过管理界面手动触发扫描和同步
- 支持增量更新,只处理发生变化的工具
## 4. 当前存在的问题与改进建议
### 4.1 当前存在的主要问题
通过分析现有代码和功能实现,我们发现工具管理系统存在以下主要问题:
1. **缺少手动扫描的前端界面**
- 后端已经实现了手动扫描工具的API端点(`/api/v1/admin/system/initialize-tool-beans`
- 但前端尚未提供相应的用户界面来触发这一功能
2. **工具无法正确找到和调用**
-[AgentToolManager.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/AgentToolManager.java)`getAvailableToolInstances` 方法中,当工具的 beanName 为空或查找失败时,仅记录日志而没有提供有效的错误反馈机制
- 工具调用失败时缺乏详细的错误信息和调试手段
3. **工具管理页面功能不完善**
- 当前的 [ToolManagement.vue](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/frontend/src/pages/ToolManagement.vue) 页面仅支持基础的增删改查功能
- 缺少与后端扫描功能的集成
### 4.2 改进建议
针对上述问题,我们提出以下改进建议:
#### 4.2.1 完善前端工具管理界面
1. **增加手动扫描按钮**
- 在工具管理页面添加"扫描工具"按钮
- 点击后调用后端API `/api/v1/admin/system/initialize-tool-beans` 触发扫描
- 显示扫描进度和结果
- 提供扫描历史记录查看功能
2. **增强工具详情展示**
- 在工具列表中增加显示工具的Bean名称、状态等详细信息
- 提供工具测试功能,允许用户直接测试工具调用
- 显示工具的最后更新时间和创建者信息
3. **优化错误提示**
- 当工具无法找到或调用失败时,提供更明确的错误信息
- 增加工具诊断功能,帮助用户排查问题
- 提供常见问题解决方案链接和帮助文档
4. **增加工具诊断界面**
- 提供单个工具的详细诊断信息查看
- 支持批量工具状态检查
- 显示工具依赖关系图谱#### 4.2.2 后端功能优化
1. **完善工具调用错误处理**
-[AgentToolManager.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/AgentToolManager.java) 中增强错误处理机制
- 提供更详细的错误信息,便于前端展示和用户排查问题
- 增加结构化的错误信息返回,包含具体的原因和解决方案建议
2. **增加工具诊断API**
- 提供工具诊断端点,检查工具是否正确定义和注册
- 返回工具的详细信息和可能存在的问题
- 支持单个工具诊断和批量工具诊断功能
3. **优化日志记录**
- 增强工具调用过程中的日志记录
- 提供更详细的调试信息,便于问题追踪
- 结构化日志信息,方便后续分析和问题定位## 5. 实施步骤
### 5.1 后端实施
1. 完善 [ToolBeanNameInitializer.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/ToolBeanNameInitializer.java) 的手动扫描接口
2. 优化 [AgentToolManager.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/AgentToolManager.java) 的工具获取逻辑,增强错误处理和日志记录
3. 增强 [ToolExecutionLoggerAspect.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/aspect/ToolExecutionLoggerAspect.java) 的日志记录功能
4. 增加工具诊断API端点,提供工具状态检查功能
5. 实现工具依赖关系分析功能
6. 增加工具使用统计和性能监控
### 5.2 前端实施
1. 在工具管理页面增加手动扫描按钮,调用 `/api/v1/admin/system/initialize-tool-beans` 端点
2. 增强工具列表展示,显示更多工具详细信息如Bean名称、状态等
3. 增加工具测试功能,允许用户直接测试工具调用
4. 优化错误提示,提供更明确的错误信息帮助用户排查问题
5. 实现工具诊断界面,支持单个和批量工具诊断
6. 增加工具依赖关系可视化展示
### 5.3 测试验证
1. 验证工具方法的 AOP 代理兼容性
2. 测试手动扫描和自动注册功能
3. 验证 UI 配置功能的完整性和易用性
4. 测试工具执行的日志记录和监控功能
5. 验证错误处理机制的有效性
6. 测试工具诊断功能的准确性和完整性
7. 验证工具依赖关系分析的正确性
## 6. 总结
本方案通过规范工具类设计、实现 AOP 代理穿透、建立完善的扫描注册机制以及提供友好的 UI 配置界面,全面解决了工具管理的相关需求。该方案既保证了系统的稳定性和扩展性,又提升了用户的使用体验。
通过对现有代码的分析,我们确认系统已经具备了良好的基础架构,包括:
1. 完善的 AOP 代理支持
2. 工具扫描和注册的核心功能实现
3. 工具调用的日志记录机制
接下来的工作重点应该放在完善前端界面和增强错误处理上,使系统更加易于使用和维护。特别需要关注的是工具诊断功能的实现,这将大大提高系统运维和问题排查的效率。
\ No newline at end of file
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
<parent> <parent>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId> <artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.8</version> <version>3.5.9</version>
<relativePath/> <relativePath/>
</parent> </parent>
...@@ -108,7 +108,6 @@ ...@@ -108,7 +108,6 @@
<dependency> <dependency>
<groupId>org.springframework.ai</groupId> <groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-milvus-store</artifactId> <artifactId>spring-ai-milvus-store</artifactId>
<version>${spring-ai.version}</version>
</dependency> </dependency>
...@@ -155,14 +154,12 @@ ...@@ -155,14 +154,12 @@
<dependency> <dependency>
<groupId>com.mysql</groupId> <groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId> <artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
</dependency> </dependency>
<!-- H2 Database --> <!-- H2 Database -->
<dependency> <dependency>
<groupId>com.h2database</groupId> <groupId>com.h2database</groupId>
<artifactId>h2</artifactId> <artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency> </dependency>
<!-- Redis --> <!-- Redis -->
...@@ -194,14 +191,12 @@ ...@@ -194,14 +191,12 @@
<dependency> <dependency>
<groupId>com.github.ben-manes.caffeine</groupId> <groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId> <artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency> </dependency>
<!-- Lombok --> <!-- Lombok -->
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<!-- Jackson --> <!-- Jackson -->
...@@ -234,7 +229,6 @@ ...@@ -234,7 +229,6 @@
<dependency> <dependency>
<groupId>org.hibernate.validator</groupId> <groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId> <artifactId>hibernate-validator</artifactId>
<version>8.0.1.Final</version>
</dependency> </dependency>
<!-- SpringDoc OpenAPI for Swagger --> <!-- SpringDoc OpenAPI for Swagger -->
...@@ -351,7 +345,6 @@ ...@@ -351,7 +345,6 @@
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration> <configuration>
<source>17</source> <source>17</source>
<target>17</target> <target>17</target>
...@@ -370,7 +363,6 @@ ...@@ -370,7 +363,6 @@
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0</version>
<configuration> <configuration>
<argLine>-Dfile.encoding=UTF-8</argLine> <argLine>-Dfile.encoding=UTF-8</argLine>
</configuration> </configuration>
......
package pangea.hiagent.web.repository; package pangea.hiagent;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
......
package pangea.hiagent.workpanel.data; package pangea.hiagent.agent.data;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -29,4 +29,4 @@ public class CompletionEventDataBuilder { ...@@ -29,4 +29,4 @@ public class CompletionEventDataBuilder {
data.put("timestamp", System.currentTimeMillis()); data.put("timestamp", System.currentTimeMillis());
return data; return data;
} }
} }
\ No newline at end of file
package pangea.hiagent.workpanel.data; package pangea.hiagent.agent.data;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -30,4 +30,5 @@ public class ErrorEventDataBuilder { ...@@ -30,4 +30,5 @@ public class ErrorEventDataBuilder {
data.put("type", "error"); data.put("type", "error");
return data; return data;
} }
} }
\ No newline at end of file
package pangea.hiagent.workpanel.data; package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import pangea.hiagent.common.utils.ObjectPool; import pangea.hiagent.common.utils.ObjectPool;
...@@ -58,4 +58,5 @@ public class MapPoolService { ...@@ -58,4 +58,5 @@ public class MapPoolService {
public String getMapPoolStatistics() { public String getMapPoolStatistics() {
return mapPool.getStatistics(); return mapPool.getStatistics();
} }
} }
\ No newline at end of file
package pangea.hiagent.workpanel.data; package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
......
package pangea.hiagent.web.dto; package pangea.hiagent.agent.data;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
...@@ -34,8 +34,17 @@ public class WorkPanelEvent implements Serializable { ...@@ -34,8 +34,17 @@ public class WorkPanelEvent implements Serializable {
*/ */
private Long timestamp; private Long timestamp;
/**
* 事件内容
*/
private String content;
/** /**
* 元数据 * 元数据
*/ */
private Map<String, Object> metadata; private Map<String, Object> metadata;
/**
* 触发事件的用户ID
*/
private String userId;
} }
\ No newline at end of file
package pangea.hiagent.agent.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import pangea.hiagent.model.Agent;
import pangea.hiagent.rag.RagService;
import pangea.hiagent.agent.service.AgentErrorHandler;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
import java.util.function.Consumer;
/**
* Agent处理器抽象基类
* 封装所有Agent处理器的公共逻辑
* 职责:提供所有Agent处理器共享的基础功能
*/
@Slf4j
public abstract class AbstractAgentProcessor extends BaseAgentProcessor {
@Autowired
protected AgentErrorHandler agentErrorHandler;
/**
* 处理RAG响应的通用逻辑
*
* @param ragResponse RAG响应
* @param tokenConsumer token消费者(流式处理时使用)
* @return RAG响应
*/
protected String handleRagResponse(String ragResponse, Consumer<String> tokenConsumer) {
if (tokenConsumer != null) {
// 对于流式处理,我们需要将RAG响应作为token发送
tokenConsumer.accept(ragResponse);
// 发送完成信号
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(ragResponse);
}
}
return ragResponse;
}
/**
* 处理请求的通用前置逻辑
*
* @param agent Agent对象
* @param userMessage 用户消息
* @param userId 用户ID
* @param ragService RAG服务
* @param tokenConsumer token消费者(流式处理时使用)
* @return RAG响应,如果有的话;否则返回null继续正常处理流程
*/
protected String handlePreProcessing(Agent agent, String userMessage, String userId, RagService ragService, Consumer<String> tokenConsumer) {
// 为每个用户-Agent组合创建唯一的会话ID
String sessionId = generateSessionId(agent, userId);
// 添加用户消息到ChatMemory
addUserMessageToMemory(sessionId, userMessage);
// 检查是否启用RAG并尝试RAG增强
String ragResponse = tryRagEnhancement(agent, userMessage, ragService);
if (ragResponse != null) {
log.info("RAG增强返回结果,直接返回");
return handleRagResponse(ragResponse, tokenConsumer);
}
return null;
}
}
\ No newline at end of file
...@@ -9,7 +9,7 @@ import pangea.hiagent.memory.MemoryService; ...@@ -9,7 +9,7 @@ import pangea.hiagent.memory.MemoryService;
import pangea.hiagent.memory.SmartHistorySummarizer; import pangea.hiagent.memory.SmartHistorySummarizer;
import pangea.hiagent.model.Agent; import pangea.hiagent.model.Agent;
import pangea.hiagent.rag.RagService; import pangea.hiagent.rag.RagService;
import pangea.hiagent.agent.service.AgentErrorHandler;
import pangea.hiagent.web.service.AgentService; import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.agent.service.ErrorHandlerService; import pangea.hiagent.agent.service.ErrorHandlerService;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion; import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
...@@ -38,8 +38,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor { ...@@ -38,8 +38,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
@Autowired @Autowired
protected ErrorHandlerService errorHandlerService; protected ErrorHandlerService errorHandlerService;
@Autowired
protected AgentErrorHandler agentErrorHandler;
// 默认系统提示词 // 默认系统提示词
protected static final String DEFAULT_SYSTEM_PROMPT = "你是一个智能助手"; protected static final String DEFAULT_SYSTEM_PROMPT = "你是一个智能助手";
...@@ -135,7 +134,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor { ...@@ -135,7 +134,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
* @return 是否为401错误 * @return 是否为401错误
*/ */
protected boolean isUnauthorizedError(Throwable e) { protected boolean isUnauthorizedError(Throwable e) {
return agentErrorHandler.isUnauthorizedError(e); return errorHandlerService.isUnauthorizedError(new Exception(e));
} }
/** /**
...@@ -146,7 +145,17 @@ public abstract class BaseAgentProcessor implements AgentProcessor { ...@@ -146,7 +145,17 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
* @return 错误消息 * @return 错误消息
*/ */
protected String handleSyncError(Throwable e, String errorMessagePrefix) { protected String handleSyncError(Throwable e, String errorMessagePrefix) {
return agentErrorHandler.handleSyncError(e, errorMessagePrefix); // 检查是否是401 Unauthorized错误
if (isUnauthorizedError(e)) {
log.error("LLM返回401未授权错误: {}", e.getMessage());
return "请配置API密钥";
} else {
String errorMessage = e.getMessage();
if (errorMessage == null || errorMessage.isEmpty()) {
errorMessage = "未知错误";
}
return errorMessagePrefix + ": " + errorMessage;
}
} }
/** /**
...@@ -325,7 +334,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor { ...@@ -325,7 +334,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
hasError.set(true); hasError.set(true);
// 不再重新抛出异常,避免中断流式处理 // 不再重新抛出异常,避免中断流式处理
// 但我们应该记录这个错误并向客户端发送错误信息 // 但我们应该记录这个错误并向客户端发送错误信息
agentErrorHandler.sendErrorMessage(tokenConsumer, "[错误] 处理token时发生错误: " + e.getMessage()); errorHandlerService.sendErrorMessage(tokenConsumer, "[错误] 处理token时发生错误: " + e.getMessage());
} }
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -344,7 +353,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor { ...@@ -344,7 +353,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
*/ */
private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer, AtomicBoolean hasError) { private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer, AtomicBoolean hasError) {
hasError.set(true); hasError.set(true);
agentErrorHandler.handleStreamError(throwable, tokenConsumer, "流式调用出错"); errorHandlerService.handleStreamError(throwable, tokenConsumer, "流式调用出错");
} }
/** /**
...@@ -385,7 +394,11 @@ public abstract class BaseAgentProcessor implements AgentProcessor { ...@@ -385,7 +394,11 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
// 发送完成事件,包含完整内容 // 发送完成事件,包含完整内容
try { try {
if (tokenConsumer instanceof TokenConsumerWithCompletion) { if (tokenConsumer instanceof TokenConsumerWithCompletion) {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullText.toString()); try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullText.toString());
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
}
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("完成事件已发送"); log.trace("完成事件已发送");
} }
...@@ -411,7 +424,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor { ...@@ -411,7 +424,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
* @param isCompleted 是否已完成 * @param isCompleted 是否已完成
*/ */
private void handleStreamModelError(Consumer<String> tokenConsumer, AtomicBoolean isCompleted) { private void handleStreamModelError(Consumer<String> tokenConsumer, AtomicBoolean isCompleted) {
agentErrorHandler.sendErrorMessage(tokenConsumer, "[错误] 流式模型或提示词为空,无法启动流式处理"); errorHandlerService.sendErrorMessage(tokenConsumer, "[错误] 流式模型或提示词为空,无法启动流式处理");
// 标记完成 // 标记完成
isCompleted.set(true); isCompleted.set(true);
} }
...@@ -426,8 +439,58 @@ public abstract class BaseAgentProcessor implements AgentProcessor { ...@@ -426,8 +439,58 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
*/ */
private void handleUnexpectedError(Exception e, Consumer<String> tokenConsumer, AtomicBoolean isCompleted) { private void handleUnexpectedError(Exception e, Consumer<String> tokenConsumer, AtomicBoolean isCompleted) {
String errorMessage = handleSyncError(e, "处理流式响应时发生错误"); String errorMessage = handleSyncError(e, "处理流式响应时发生错误");
agentErrorHandler.sendErrorMessage(tokenConsumer, "[错误] " + errorMessage); errorHandlerService.sendErrorMessage(tokenConsumer, "[错误] " + errorMessage);
// 确保标记为已完成 // 确保标记为已完成
isCompleted.set(true); isCompleted.set(true);
} }
/**
* 处理RAG响应的通用逻辑
*
* @param ragResponse RAG响应
* @param tokenConsumer token消费者(流式处理时使用)
* @return RAG响应
*/
protected String handleRagResponse(String ragResponse, Consumer<String> tokenConsumer) {
if (tokenConsumer != null) {
// 对于流式处理,我们需要将RAG响应作为token发送
tokenConsumer.accept(ragResponse);
// 发送完成信号
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(ragResponse);
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
}
}
}
return ragResponse;
}
/**
* 处理请求的通用前置逻辑
*
* @param agent Agent对象
* @param userMessage 用户消息
* @param userId 用户ID
* @param ragService RAG服务
* @param tokenConsumer token消费者(流式处理时使用)
* @return RAG响应,如果有的话;否则返回null继续正常处理流程
*/
protected String handlePreProcessing(Agent agent, String userMessage, String userId, RagService ragService, Consumer<String> tokenConsumer) {
// 为每个用户-Agent组合创建唯一的会话ID
String sessionId = generateSessionId(agent, userId);
// 添加用户消息到ChatMemory
addUserMessageToMemory(sessionId, userMessage);
// 检查是否启用RAG并尝试RAG增强
String ragResponse = tryRagEnhancement(agent, userMessage, ragService);
if (ragResponse != null) {
log.info("RAG增强返回结果,直接返回");
return handleRagResponse(ragResponse, tokenConsumer);
}
return null;
}
} }
\ No newline at end of file
...@@ -11,6 +11,7 @@ import pangea.hiagent.rag.RagService; ...@@ -11,6 +11,7 @@ import pangea.hiagent.rag.RagService;
import pangea.hiagent.web.dto.AgentRequest; import pangea.hiagent.web.dto.AgentRequest;
import java.util.function.Consumer; import java.util.function.Consumer;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
/** /**
* 普通Agent处理器实现类 * 普通Agent处理器实现类
...@@ -18,7 +19,7 @@ import java.util.function.Consumer; ...@@ -18,7 +19,7 @@ import java.util.function.Consumer;
*/ */
@Slf4j @Slf4j
@Service @Service
public class NormalAgentProcessor extends AbstractAgentProcessor { public class NormalAgentProcessor extends BaseAgentProcessor {
@Autowired(required = false) @Autowired(required = false)
private RagService ragService; private RagService ragService;
...@@ -67,7 +68,7 @@ public class NormalAgentProcessor extends AbstractAgentProcessor { ...@@ -67,7 +68,7 @@ public class NormalAgentProcessor extends AbstractAgentProcessor {
return responseContent; return responseContent;
} catch (Exception e) { } catch (Exception e) {
return agentErrorHandler.handleSyncError(e, "模型调用失败"); return handleSyncError(e, "模型调用失败");
} }
} }
...@@ -101,8 +102,15 @@ public class NormalAgentProcessor extends AbstractAgentProcessor { ...@@ -101,8 +102,15 @@ public class NormalAgentProcessor extends AbstractAgentProcessor {
// 流式处理 // 流式处理
handleStreamingResponse(tokenConsumer, prompt, streamingChatModel, sessionId); handleStreamingResponse(tokenConsumer, prompt, streamingChatModel, sessionId);
} catch (Exception e) { } catch (Exception e) {
agentErrorHandler.handleStreamError(e, tokenConsumer, "普通Agent流式处理失败"); errorHandlerService.handleStreamError(e, tokenConsumer, "普通Agent流式处理失败");
agentErrorHandler.ensureCompletionCallback(tokenConsumer, "处理请求时发生错误: " + e.getMessage()); // 直接调用完成回调,不依赖AgentErrorHandler
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete("处理请求时发生错误: " + e.getMessage());
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
} }
} }
...@@ -114,9 +122,15 @@ public class NormalAgentProcessor extends AbstractAgentProcessor { ...@@ -114,9 +122,15 @@ public class NormalAgentProcessor extends AbstractAgentProcessor {
private void handleModelNotSupportStream(Consumer<String> tokenConsumer) { private void handleModelNotSupportStream(Consumer<String> tokenConsumer) {
String errorMessage = "[错误] 当前模型不支持流式输出"; String errorMessage = "[错误] 当前模型不支持流式输出";
// 发送错误信息 // 发送错误信息
agentErrorHandler.sendErrorMessage(tokenConsumer, errorMessage); errorHandlerService.sendErrorMessage(tokenConsumer, errorMessage);
// 确保在异常情况下也调用完成回调 // 确保在异常情况下也调用完成回调
agentErrorHandler.ensureCompletionCallback(tokenConsumer, errorMessage); if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
} }
@Override @Override
......
...@@ -15,6 +15,7 @@ import pangea.hiagent.web.service.AgentService; ...@@ -15,6 +15,7 @@ import pangea.hiagent.web.service.AgentService;
import java.util.List; import java.util.List;
import java.util.function.Consumer; import java.util.function.Consumer;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
/** /**
* ReAct Agent处理器实现类 * ReAct Agent处理器实现类
...@@ -22,7 +23,7 @@ import java.util.function.Consumer; ...@@ -22,7 +23,7 @@ import java.util.function.Consumer;
*/ */
@Slf4j @Slf4j
@Service @Service
public class ReActAgentProcessor extends AbstractAgentProcessor { public class ReActAgentProcessor extends BaseAgentProcessor {
@Autowired @Autowired
private AgentService agentService; private AgentService agentService;
...@@ -30,14 +31,16 @@ public class ReActAgentProcessor extends AbstractAgentProcessor { ...@@ -30,14 +31,16 @@ public class ReActAgentProcessor extends AbstractAgentProcessor {
@Autowired @Autowired
private RagService ragService; private RagService ragService;
@Autowired
private ReactCallback defaultReactCallback;
@Autowired @Autowired
private ReactExecutor defaultReactExecutor; private ReactExecutor defaultReactExecutor;
@Autowired @Autowired
private AgentToolManager agentToolManager; private AgentToolManager agentToolManager;
@Autowired
private ReactCallback defaultReactCallback;
@Override @Override
public String processRequest(Agent agent, AgentRequest request, String userId) { public String processRequest(Agent agent, AgentRequest request, String userId) {
...@@ -71,10 +74,6 @@ public class ReActAgentProcessor extends AbstractAgentProcessor { ...@@ -71,10 +74,6 @@ public class ReActAgentProcessor extends AbstractAgentProcessor {
// 处理请求的通用前置逻辑 // 处理请求的通用前置逻辑
String ragResponse = handlePreProcessing(agent, userMessage, userId, ragService, null); String ragResponse = handlePreProcessing(agent, userMessage, userId, ragService, null);
if (ragResponse != null) { if (ragResponse != null) {
// 触发最终答案回调
if (defaultReactCallback != null) {
defaultReactCallback.onFinalAnswer(ragResponse);
}
return ragResponse; return ragResponse;
} }
...@@ -82,21 +81,16 @@ public class ReActAgentProcessor extends AbstractAgentProcessor { ...@@ -82,21 +81,16 @@ public class ReActAgentProcessor extends AbstractAgentProcessor {
ChatClient client = ChatClient.builder(agentService.getChatModelForAgent(agent)).build(); ChatClient client = ChatClient.builder(agentService.getChatModelForAgent(agent)).build();
List<Object> tools = agentToolManager.getAvailableToolInstances(agent); List<Object> tools = agentToolManager.getAvailableToolInstances(agent);
// 添加自定义回调到ReAct执行器
if (defaultReactExecutor != null && defaultReactCallback != null) {
defaultReactExecutor.addReactCallback(defaultReactCallback);
}
// 使用ReAct执行器执行流程,传递Agent对象以支持记忆功能
String finalAnswer = defaultReactExecutor.executeWithAgent(client, userMessage, tools, agent);
// 将助理回复添加到ChatMemory // 使用ReAct执行器执行流程,传递Agent对象和用户ID以支持记忆功能
String sessionId = generateSessionId(agent, userId); String finalAnswer = defaultReactExecutor.execute(client, userMessage, tools, agent, userId);
addAssistantMessageToMemory(sessionId, finalAnswer);
// 助手回复已经由执行器保存到内存中,不需要重复保存
return finalAnswer; return finalAnswer;
} catch (Exception e) { } catch (Exception e) {
return agentErrorHandler.handleSyncError(e, "处理ReAct请求时发生错误"); return handleSyncError(e, "处理ReAct请求时发生错误");
} }
} }
...@@ -115,10 +109,6 @@ public class ReActAgentProcessor extends AbstractAgentProcessor { ...@@ -115,10 +109,6 @@ public class ReActAgentProcessor extends AbstractAgentProcessor {
// 处理请求的通用前置逻辑 // 处理请求的通用前置逻辑
String ragResponse = handlePreProcessing(agent, userMessage, userId, ragService, tokenConsumer); String ragResponse = handlePreProcessing(agent, userMessage, userId, ragService, tokenConsumer);
if (ragResponse != null) { if (ragResponse != null) {
// 触发最终答案回调
if (defaultReactCallback != null) {
defaultReactCallback.onFinalAnswer(ragResponse);
}
return; return;
} }
...@@ -138,11 +128,18 @@ public class ReActAgentProcessor extends AbstractAgentProcessor { ...@@ -138,11 +128,18 @@ public class ReActAgentProcessor extends AbstractAgentProcessor {
return; return;
} }
// 使用ReAct执行器流式执行流程,传递Agent对象以支持记忆功能 // 使用ReAct执行器流式执行流程,传递Agent对象以支持记忆功能和用户ID以确保上下文传播
defaultReactExecutor.executeStreamWithAgent(client, userMessage, tools, tokenConsumer, agent); defaultReactExecutor.executeStream(client, userMessage, tools, tokenConsumer, agent, userId);
} catch (Exception e) { } catch (Exception e) {
agentErrorHandler.handleStreamError(e, tokenConsumer, "流式处理ReAct请求时发生错误"); errorHandlerService.handleStreamError(e, tokenConsumer, "流式处理ReAct请求时发生错误");
agentErrorHandler.ensureCompletionCallback(tokenConsumer, "处理请求时发生错误: " + e.getMessage()); // 直接调用完成回调,不依赖AgentErrorHandler
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete("处理请求时发生错误: " + e.getMessage());
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
} }
} }
...@@ -154,8 +151,14 @@ public class ReActAgentProcessor extends AbstractAgentProcessor { ...@@ -154,8 +151,14 @@ public class ReActAgentProcessor extends AbstractAgentProcessor {
private void handleModelNotAvailable(Consumer<String> tokenConsumer) { private void handleModelNotAvailable(Consumer<String> tokenConsumer) {
String errorMessage = "[错误] 无法获取Agent的聊天模型"; String errorMessage = "[错误] 无法获取Agent的聊天模型";
// 发送错误信息 // 发送错误信息
agentErrorHandler.sendErrorMessage(tokenConsumer, errorMessage); errorHandlerService.sendErrorMessage(tokenConsumer, errorMessage);
// 确保在异常情况下也调用完成回调 // 确保在异常情况下也调用完成回调
agentErrorHandler.ensureCompletionCallback(tokenConsumer, errorMessage); if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
} }
} }
\ No newline at end of file
package pangea.hiagent.agent.react; package pangea.hiagent.agent.react;
import java.io.IOException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.workpanel.IWorkPanelDataCollector; import pangea.hiagent.agent.service.UserSseService;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.agent.data.WorkPanelEvent;
/** /**
* 自定义 ReAct 回调类,用于捕获并处理 ReAct 的每一步思维过程 * 简化的ReAct回调类
* 适配项目现有的 ReAct 实现方式
*/ */
@Slf4j @Slf4j
@Component // 注册为 Spring 组件,方便注入 @Component
public class DefaultReactCallback implements ReactCallback { public class DefaultReactCallback implements ReactCallback {
@Autowired @Autowired
private IWorkPanelDataCollector workPanelCollector; private UserSseService userSseService;
/**
* ReAct 每执行一个步骤,该方法会被触发
* @param reactStep ReAct 步骤对象,包含步骤的所有核心信息
*/
@Override @Override
public void onStep(ReactStep reactStep) { public void onStep(ReactStep reactStep) {
// 将信息记录到工作面板
recordReactStepToWorkPanel(reactStep); String reactStepName = reactStep.getStepType().name();
}
/**
* 处理 ReAct 最终答案步骤
* @param finalAnswer 最终答案
*/
@Override
public void onFinalAnswer(String finalAnswer) {
// 创建一个FINAL_ANSWER类型的ReactStep并处理
ReactStep finalStep = new ReactStep(0, ReactStepType.FINAL_ANSWER, finalAnswer);
recordReactStepToWorkPanel(finalStep);
}
/**
* 将ReAct步骤记录到工作面板
* @param reactStep ReAct步骤
*/
private void recordReactStepToWorkPanel(ReactStep reactStep) {
if (workPanelCollector == null) {
log.debug("无法记录到工作面板:collector为null");
return;
}
try { try {
switch (reactStep.getStepType()) { userSseService.sendWorkPanelEvent(WorkPanelEvent.builder()
case THOUGHT: .type(reactStepName)
workPanelCollector.recordThinking(reactStep.getContent(), "reasoning"); .content(reactStep.getContent())
break; .userId(UserUtils.getCurrentUserIdStatic())
case ACTION: .build());
if (reactStep.getAction() != null) { } catch (IOException e) {
// 使用recordToolCallAction记录工具调用开始,状态为pending log.error("发送ReAct步骤到WorkPanel失败: 类型={}, 内容摘要={}",
workPanelCollector.recordToolCallAction( reactStep.getStepType(),
reactStep.getAction().getToolName(), reactStep.getContent() != null
reactStep.getAction().getParameters(), ? reactStep.getContent().substring(0, Math.min(50, reactStep.getContent().length()))
null, : "null",
"pending", e);
null
);
}
break;
case OBSERVATION:
if (reactStep.getObservation() != null && reactStep.getAction() != null) {
// 使用recordToolCallAction记录工具调用完成,状态为success
workPanelCollector.recordToolCallAction(
reactStep.getAction().getToolName(),
reactStep.getAction().getParameters(),
reactStep.getObservation().getContent(),
"success",
null
);
}
break;
case FINAL_ANSWER:
workPanelCollector.recordFinalAnswer(reactStep.getContent());
break;
default:
log.warn("未知的ReAct步骤类型: {}", reactStep.getStepType());
break;
}
} catch (Exception e) {
log.error("记录ReAct步骤到工作面板失败", e);
} }
// 记录最终答案到日志
log.info("[WorkPanel] 记录{} {}", reactStepName,
reactStep.getContent().substring(0, Math.min(100, reactStep.getContent().length())));
} }
} }
\ No newline at end of file
package pangea.hiagent.agent.react;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class EventSplitter {
private final List<String> keywords = Arrays.asList(
"Thought", "Action", "Observation", "Final_Answer"
);
private final Pattern keywordPattern = Pattern.compile(
String.format("(?i)(Thought|Action|Observation|Final[ _]Answer):", String.join("|", keywords)), Pattern.CASE_INSENSITIVE
);
private String currentType = null;
private StringBuilder currentContent = new StringBuilder();
private StringBuilder buffer = new StringBuilder();
private final ReactCallback callback;
private volatile int stepNumber = 0;
public EventSplitter(ReactCallback callback) {
this.callback = callback;
}
// 每收到一个token/字符,调用此方法
public void feedToken(String token) {
buffer.append(token);
// log.debug("当前缓冲区: {}", buffer.toString());
Matcher matcher = keywordPattern.matcher(buffer);
while (matcher.find()) {
log.debug("发现新事件关键词: {}", matcher.group(1));
// 发现新事件
if (currentType != null && currentContent.length() > 0) {
// 实时输出已分割事件
callback.onStep(new ReactStep(stepNumber++, ReactStepType.fromString(currentType), currentContent.toString()));
}
// 更新事件类型
currentType = matcher.group(1);
currentContent.setLength(0);
// 累积匹配位置后的内容
currentContent.append(buffer.substring(matcher.end()));
// 重置buffer为剩余内容
buffer.setLength(0);
buffer.append(currentContent);
// 重新查找
matcher = keywordPattern.matcher(buffer);
}
// 检查是否有部分关键词在buffer末尾
if (buffer.length() > 0) {
// 检查是否可能是关键词的一部分
boolean isPartialKeyword = false;
String bufferStr = buffer.toString();
for (String keyword : keywords) {
if (keyword.startsWith(bufferStr) || bufferStr.startsWith(keyword)) {
isPartialKeyword = true;
break;
}
}
if (!isPartialKeyword) {
// 不是部分关键词,添加到内容中
currentContent.append(buffer);
buffer.setLength(0);
}
}
}
// 流式结束时,调用此方法输出最后一个事件
public void endStream(ReactCallback tokenConsumer) {
if (currentType != null && currentContent.length() > 0) {
callback.onStep(new ReactStep(stepNumber++, ReactStepType.fromString(currentType), currentContent.toString()));
}
}
}
...@@ -10,6 +10,4 @@ public interface ReactCallback { ...@@ -10,6 +10,4 @@ public interface ReactCallback {
* @param reactStep ReAct步骤对象,包含步骤的所有核心信息 * @param reactStep ReAct步骤对象,包含步骤的所有核心信息
*/ */
void onStep(ReactStep reactStep); void onStep(ReactStep reactStep);
void onFinalAnswer(String ragResponse);
} }
\ No newline at end of file
...@@ -15,21 +15,21 @@ public interface ReactExecutor { ...@@ -15,21 +15,21 @@ public interface ReactExecutor {
* @param chatClient ChatClient实例 * @param chatClient ChatClient实例
* @param userInput 用户输入 * @param userInput 用户输入
* @param tools 工具列表 * @param tools 工具列表
* @param agent Agent对象
* @return 最终答案 * @return 最终答案
*/ */
String execute(ChatClient chatClient, String userInput, List<Object> tools); String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent);
/** /**
* 执行ReAct流程(同步方式)- 支持Agent配置 * 执行ReAct流程(同步方式)
* @param chatClient ChatClient实例 * @param chatClient ChatClient实例
* @param userInput 用户输入 * @param userInput 用户输入
* @param tools 工具列表 * @param tools 工具列表
* @param agent Agent对象(可选) * @param agent Agent对象
* @param userId 用户ID
* @return 最终答案 * @return 最终答案
*/ */
default String executeWithAgent(ChatClient chatClient, String userInput, List<Object> tools, Agent agent) { String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent, String userId);
return execute(chatClient, userInput, tools);
}
/** /**
* 流式执行ReAct流程 * 流式执行ReAct流程
...@@ -37,20 +37,20 @@ public interface ReactExecutor { ...@@ -37,20 +37,20 @@ public interface ReactExecutor {
* @param userInput 用户输入 * @param userInput 用户输入
* @param tools 工具列表 * @param tools 工具列表
* @param tokenConsumer token处理回调函数 * @param tokenConsumer token处理回调函数
* @param agent Agent对象
* @param userId 用户ID
*/ */
void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer); void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent, String userId);
/** /**
* 流式执行ReAct流程 - 支持Agent配置 * 流式执行ReAct流程(旧方法,保持向后兼容)
* @param chatClient ChatClient实例 * @param chatClient ChatClient实例
* @param userInput 用户输入 * @param userInput 用户输入
* @param tools 工具列表 * @param tools 工具列表
* @param tokenConsumer token处理回调函数 * @param tokenConsumer token处理回调函数
* @param agent Agent对象(可选) * @param agent Agent对象
*/ */
default void executeStreamWithAgent(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent) { void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent);
executeStream(chatClient, userInput, tools, tokenConsumer);
}
/** /**
* 添加ReAct回调 * 添加ReAct回调
......
package pangea.hiagent.agent.react; package pangea.hiagent.agent.react;
import lombok.Data;
/** /**
* ReAct步骤对象,包含步骤的所有核心信息 * ReAct步骤类,用于表示ReAct执行过程中的单个步骤
*/ */
@Data
public class ReactStep { public class ReactStep {
/**
* 步骤编号
*/
private int stepNumber; private int stepNumber;
/**
* 步骤类型
*/
private ReactStepType stepType; private ReactStepType stepType;
/**
* 步骤核心内容(思维描述、动作指令、观察结果等)
*/
private String content; private String content;
/**
* 工具调用信息(仅在ACTION步骤时有值)
*/
private ToolCallAction action; private ToolCallAction action;
/**
* 工具观察结果(仅在OBSERVATION步骤时有值)
*/
private ToolObservation observation; private ToolObservation observation;
/**
* 构造函数
*/
public ReactStep() {}
/**
* 构造函数
* @param stepNumber 步骤编号
* @param stepType 步骤类型
* @param content 步骤内容
*/
public ReactStep(int stepNumber, ReactStepType stepType, String content) { public ReactStep(int stepNumber, ReactStepType stepType, String content) {
this.stepNumber = stepNumber; this.stepNumber = stepNumber;
this.stepType = stepType; this.stepType = stepType;
this.content = content; this.content = content;
} }
// Getters and Setters
public int getStepNumber() { return stepNumber; }
public void setStepNumber(int stepNumber) { this.stepNumber = stepNumber; }
public ReactStepType getStepType() { return stepType; }
public void setStepType(ReactStepType stepType) { this.stepType = stepType; }
public String getContent() { return content; }
public void setContent(String content) { this.content = content; }
public ToolCallAction getAction() { return action; }
public void setAction(ToolCallAction action) { this.action = action; }
public ToolObservation getObservation() { return observation; }
public void setObservation(ToolObservation observation) { this.observation = observation; }
/** /**
* 工具调用动作类 * 工具调用动作内部
*/ */
@Data
public static class ToolCallAction { public static class ToolCallAction {
/**
* 工具名称
*/
private String toolName; private String toolName;
private Object toolArgs;
/** public ToolCallAction(String toolName, Object toolArgs) {
* 工具调用参数
*/
private Object parameters;
public ToolCallAction() {}
public ToolCallAction(String toolName, Object parameters) {
this.toolName = toolName; this.toolName = toolName;
this.parameters = parameters; this.toolArgs = toolArgs;
} }
public String getToolName() { return toolName; }
public void setToolName(String toolName) { this.toolName = toolName; }
public Object getToolArgs() { return toolArgs; }
public void setToolArgs(Object toolArgs) { this.toolArgs = toolArgs; }
} }
/** /**
* 工具观察结果类 * 工具观察结果内部
*/ */
@Data
public static class ToolObservation { public static class ToolObservation {
/** private String result;
* 观察内容
*/
private String content;
public ToolObservation() {} public ToolObservation(String result) {
this.result = result;
public ToolObservation(String content) {
this.content = content;
} }
public String getResult() { return result; }
public void setResult(String result) { this.result = result; }
} }
} }
\ No newline at end of file
...@@ -22,5 +22,9 @@ public enum ReactStepType { ...@@ -22,5 +22,9 @@ public enum ReactStepType {
/** /**
* 最终答案步骤:结合工具结果生成最终回答 * 最终答案步骤:结合工具结果生成最终回答
*/ */
FINAL_ANSWER FINAL_ANSWER;
public static ReactStepType fromString(String currentType) {
return ReactStepType.valueOf(currentType.toUpperCase().replace(" ", "_"));
}
} }
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
/**
* Agent错误处理工具类
* 统一处理Agent处理器中的错误逻辑
*/
@Slf4j
@Component
public class AgentErrorHandler {
@Autowired
private ErrorHandlerService errorHandlerService;
/**
* 处理401未授权错误
*
* @param e 异常对象
* @return 是否为401错误
*/
public boolean isUnauthorizedError(Throwable e) {
return errorHandlerService.isUnauthorizedError(new Exception(e));
}
/**
* 处理流式处理中的错误
*
* @param e 异常对象
* @param tokenConsumer token处理回调函数
* @param errorMessagePrefix 错误消息前缀
*/
public void handleStreamError(Throwable e, Consumer<String> tokenConsumer, String errorMessagePrefix) {
errorHandlerService.handleStreamError(e, tokenConsumer, errorMessagePrefix);
}
/**
* 处理同步处理中的错误
*
* @param e 异常对象
* @param errorMessagePrefix 错误消息前缀
* @return 错误消息
*/
public String handleSyncError(Throwable e, String errorMessagePrefix) {
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(e)) {
log.error("LLM返回401未授权错误: {}", e.getMessage());
return "请配置API密钥";
} else {
String errorMessage = e.getMessage();
if (errorMessage == null || errorMessage.isEmpty()) {
errorMessage = "未知错误";
}
return errorMessagePrefix + ": " + errorMessage;
}
}
/**
* 发送错误信息给客户端
*
* @param tokenConsumer token处理回调函数
* @param errorMessage 错误消息
*/
public void sendErrorMessage(Consumer<String> tokenConsumer, String errorMessage) {
errorHandlerService.sendErrorMessage(tokenConsumer, errorMessage);
}
/**
* 确保在异常情况下也调用完成回调
*
* @param tokenConsumer token处理回调函数
* @param errorMessage 错误消息
*/
public void ensureCompletionCallback(Consumer<String> tokenConsumer, String errorMessage) {
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.agent.processor.AgentProcessorFactory;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.common.utils.LogUtils;
import pangea.hiagent.common.utils.ValidationUtils;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Agent处理器服务
* 负责处理Agent处理器的获取和心跳机制
*/
@Slf4j
@Service
public class AgentProcessorService {
@Autowired
private AgentProcessorFactory agentProcessorFactory;
@Autowired
private UserSseService workPanelSseService;
@Autowired
private ChatErrorHandler chatErrorHandler;
/**
* 获取处理器并启动心跳保活机制
*
* @param agent Agent对象
* @param emitter SSE发射器
* @return Agent处理器,如果获取失败则返回null
*/
public AgentProcessor getProcessorAndStartHeartbeat(Agent agent, SseEmitter emitter) {
LogUtils.enterMethod("getProcessorAndStartHeartbeat", agent);
// 参数验证
if (ValidationUtils.isNull(agent, "agent")) {
chatErrorHandler.handleChatError(emitter, "Agent对象不能为空");
LogUtils.exitMethod("getProcessorAndStartHeartbeat", "Agent对象不能为空");
return null;
}
if (ValidationUtils.isNull(emitter, "emitter")) {
chatErrorHandler.handleChatError(emitter, "SSE发射器不能为空");
LogUtils.exitMethod("getProcessorAndStartHeartbeat", "SSE发射器不能为空");
return null;
}
try {
// 根据Agent类型选择处理器并处理请求
AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
if (processor == null) {
chatErrorHandler.handleChatError(emitter, "无法获取Agent处理器");
LogUtils.exitMethod("getProcessorAndStartHeartbeat", "无法获取Agent处理器");
return null;
}
log.info("使用{} Agent处理器处理对话", processor.getProcessorType());
// 启动心跳保活机制
workPanelSseService.startHeartbeat(emitter, new AtomicBoolean(false));
LogUtils.exitMethod("getProcessorAndStartHeartbeat", processor);
return processor;
} catch (Exception e) {
chatErrorHandler.handleChatError(emitter, "获取处理器或启动心跳时发生错误", e, null);
LogUtils.exitMethod("getProcessorAndStartHeartbeat", e);
return null;
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.common.utils.LogUtils;
import pangea.hiagent.common.utils.ValidationUtils;
import pangea.hiagent.common.utils.UserUtils;
/**
* Agent验证服务
* 负责处理Agent的参数验证和权限检查
*/
@Slf4j
@Service
public class AgentValidationService {
@Autowired
private AgentService agentService;
@Autowired
private ChatErrorHandler chatErrorHandler;
/**
* 验证Agent存在性和用户权限
*
* @param agentId Agent ID
* @param userId 用户ID
* @param emitter SSE发射器
* @return Agent对象,如果验证失败则返回null
*/
public Agent validateAgentAndPermission(String agentId, String userId, SseEmitter emitter) {
LogUtils.enterMethod("validateAgentAndPermission", agentId, userId);
// 参数验证
if (ValidationUtils.isBlank(agentId, "agentId")) {
chatErrorHandler.handleChatError(emitter, "Agent ID不能为空");
LogUtils.exitMethod("validateAgentAndPermission", "Agent ID不能为空");
return null;
}
if (ValidationUtils.isBlank(userId, "userId")) {
chatErrorHandler.handleChatError(emitter, "用户ID不能为空");
LogUtils.exitMethod("validateAgentAndPermission", "用户ID不能为空");
return null;
}
try {
// 获取Agent信息
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
chatErrorHandler.handleChatError(emitter, "Agent不存在");
LogUtils.exitMethod("validateAgentAndPermission", "Agent不存在");
return null;
}
// 检查权限(可选)
if (!agent.getOwner().equals(userId) && !UserUtils.isAdminUser(userId)) {
chatErrorHandler.handleChatError(emitter, "无权限访问该Agent");
LogUtils.exitMethod("validateAgentAndPermission", "无权限访问该Agent");
return null;
}
LogUtils.exitMethod("validateAgentAndPermission", agent);
return agent;
} catch (Exception e) {
chatErrorHandler.handleChatError(emitter, "验证Agent和权限时发生错误", e, null);
LogUtils.exitMethod("validateAgentAndPermission", e);
return null;
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 聊天服务错误处理工具类
* 统一处理聊天过程中的各种异常情况
* 委托给ErrorHandlerService进行实际处理
*/
@Slf4j
@Component
public class ChatErrorHandler {
@Autowired
private ErrorHandlerService unifiedErrorHandlerService;
/**
* 处理聊天过程中的异常
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
* @param exception 异常对象
* @param processorType 处理器类型(可选)
*/
public void handleChatError(SseEmitter emitter, String errorMessage, Exception exception, String processorType) {
unifiedErrorHandlerService.handleChatError(emitter, errorMessage, exception, processorType);
}
/**
* 处理聊天过程中的异常()
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
*/
public void handleChatError(SseEmitter emitter, String errorMessage) {
unifiedErrorHandlerService.handleChatError(emitter, errorMessage);
}
/**
* 处理Token处理过程中的异常
*
* @param emitter SSE发射器
* @param processorType 处理器类型
* @param exception 异常对象
* @param isCompleted 完成状态标记
*/
public void handleTokenError(SseEmitter emitter, String processorType, Exception exception, AtomicBoolean isCompleted) {
unifiedErrorHandlerService.handleTokenError(emitter, processorType, exception, isCompleted);
}
/**
* 处理完成回调过程中的异常
*
* @param emitter SSE发射器
* @param exception 异常对象
*/
public void handleCompletionError(SseEmitter emitter, Exception exception) {
unifiedErrorHandlerService.handleCompletionError(emitter, exception);
}
/**
* 处理对话记录保存过程中的异常
*
* @param emitter SSE发射器
* @param exception 异常对象
* @param isCompleted 完成状态标记
*/
public void handleSaveDialogueError(SseEmitter emitter, Exception exception, AtomicBoolean isCompleted) {
unifiedErrorHandlerService.handleSaveDialogueError(emitter, exception, isCompleted);
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.model.AgentDialogue;
import pangea.hiagent.common.utils.ValidationUtils;
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.common.utils.LogUtils;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.web.dto.AgentRequest;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.workpanel.event.EventService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 完成回调处理服务
* 负责处理流式输出完成后的回调操作
*/
@Slf4j
@Service
public class CompletionHandlerService {
@Autowired
private AgentService agentService;
@Autowired
private UserSseService unifiedSseService;
@Autowired
private EventService eventService;
@Autowired
private ErrorHandlerService errorHandlerService;
/**
* 处理完成回调
*
* @param emitter SSE发射器
* @param processor Agent处理器
* @param agent Agent对象
* @param request Agent请求
* @param userId 用户ID
* @param fullContent 完整内容
* @param isCompleted 完成状态标记
*/
public void handleCompletion(SseEmitter emitter, AgentProcessor processor, Agent agent,
AgentRequest request, String userId,
String fullContent, AtomicBoolean isCompleted) {
LogUtils.enterMethod("handleCompletion", emitter, processor, agent, request, userId);
// 参数验证
if (ValidationUtils.isNull(emitter, "emitter")) {
log.error("SSE发射器不能为空");
LogUtils.exitMethod("handleCompletion", "SSE发射器不能为空");
return;
}
if (ValidationUtils.isNull(processor, "processor")) {
log.error("Agent处理器不能为空");
LogUtils.exitMethod("handleCompletion", "Agent处理器不能为空");
return;
}
if (ValidationUtils.isNull(agent, "agent")) {
log.error("Agent对象不能为空");
LogUtils.exitMethod("handleCompletion", "Agent对象不能为空");
return;
}
if (ValidationUtils.isNull(request, "request")) {
log.error("Agent请求不能为空");
LogUtils.exitMethod("handleCompletion", "Agent请求不能为空");
return;
}
if (ValidationUtils.isBlank(userId, "userId")) {
log.error("用户ID不能为空");
LogUtils.exitMethod("handleCompletion", "用户ID不能为空");
return;
}
if (ValidationUtils.isNull(isCompleted, "isCompleted")) {
log.error("完成状态标记不能为空");
LogUtils.exitMethod("handleCompletion", "完成状态标记不能为空");
return;
}
log.info("{} Agent处理完成,总字符数: {}", processor.getProcessorType(), fullContent != null ? fullContent.length() : 0);
// 发送完成事件
try {
// 发送完整内容作为最后一个token
if (fullContent != null && !fullContent.isEmpty()) {
eventService.sendTokenEvent(emitter, fullContent);
}
// 发送完成信号
emitter.send("[DONE]");
} catch (Exception e) {
errorHandlerService.handleCompletionError(emitter, e);
}
// 保存对话记录
try {
saveDialogue(agent, request, userId, fullContent);
} catch (Exception e) {
errorHandlerService.handleSaveDialogueError(emitter, e, isCompleted);
} finally {
unifiedSseService.completeEmitter(emitter, isCompleted);
}
LogUtils.exitMethod("handleCompletion", "处理完成");
}
/**
* 保存对话记录
*/
public void saveDialogue(Agent agent, AgentRequest request, String userId, String responseContent) {
LogUtils.enterMethod("saveDialogue", agent, request, userId);
// 参数验证
if (ValidationUtils.isNull(agent, "agent")) {
log.error("Agent对象不能为空");
LogUtils.exitMethod("saveDialogue", "Agent对象不能为空");
return;
}
if (ValidationUtils.isNull(request, "request")) {
log.error("Agent请求不能为空");
LogUtils.exitMethod("saveDialogue", "Agent请求不能为空");
return;
}
if (ValidationUtils.isBlank(userId, "userId")) {
log.error("用户ID不能为空");
LogUtils.exitMethod("saveDialogue", "用户ID不能为空");
return;
}
try {
// 创建对话记录
AgentDialogue dialogue = AgentDialogue.builder()
.agentId(request.getAgentId())
.userMessage(request.getUserMessage())
.agentResponse(responseContent)
.userId(userId)
.build();
// 确保ID被设置
if (dialogue.getId() == null || dialogue.getId().isEmpty()) {
dialogue.setId(java.util.UUID.randomUUID().toString());
}
// 设置创建人和更新人信息
// 在异步线程中获取用户ID
String currentUserId = UserUtils.getCurrentUserIdInAsync();
if (currentUserId == null) {
currentUserId = userId; // 回退到传入的userId
}
dialogue.setCreatedBy(currentUserId);
dialogue.setUpdatedBy(currentUserId);
// 保存对话记录
agentService.saveDialogue(dialogue);
LogUtils.exitMethod("saveDialogue", "保存成功");
} catch (Exception e) {
log.error("保存对话记录失败", e);
LogUtils.exitMethod("saveDialogue", e);
throw new RuntimeException("保存对话记录失败", e);
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 错误处理工具类
* 提供统一的错误处理方法,减少重复代码
* 委托给ErrorHandlerService进行实际处理
*/
@Slf4j
@Component
public class ErrorHandlerUtils {
private static ErrorHandlerService errorHandlerService;
@Autowired
public ErrorHandlerUtils(ErrorHandlerService errorHandlerService) {
ErrorHandlerUtils.errorHandlerService = errorHandlerService;
}
/**
* 构建完整的错误消息
*
* @param errorMessage 基本错误信息
* @param exception 异常对象
* @param errorId 错误跟踪ID
* @param processorType 处理器类型
* @return 完整的错误消息
*/
public static String buildFullErrorMessage(String errorMessage, Exception exception, String errorId, String processorType) {
return errorHandlerService.buildFullErrorMessage(errorMessage, exception, errorId, processorType);
}
/**
* 检查是否为未授权错误
*
* @param exception 异常对象
* @return 是否为未授权错误
*/
public static boolean isUnauthorizedError(Exception exception) {
return errorHandlerService.isUnauthorizedError(exception);
}
/**
* 检查是否为超时错误
*
* @param exception 异常对象
* @return 是否为超时错误
*/
public static boolean isTimeoutError(Exception exception) {
return errorHandlerService.isTimeoutError(exception);
}
/**
* 生成错误跟踪ID
*
* @return 错误跟踪ID
*/
public static String generateErrorId() {
return errorHandlerService.generateErrorId();
}
}
\ No newline at end of file
...@@ -2,9 +2,10 @@ package pangea.hiagent.agent.service; ...@@ -2,9 +2,10 @@ package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* 异常监控服务 * 异常监控服务
...@@ -17,12 +18,18 @@ public class ExceptionMonitoringService { ...@@ -17,12 +18,18 @@ public class ExceptionMonitoringService {
// 异常统计信息 // 异常统计信息
private final Map<String, AtomicLong> exceptionCounters = new ConcurrentHashMap<>(); private final Map<String, AtomicLong> exceptionCounters = new ConcurrentHashMap<>();
// 异常详细信息缓存 // 异常详细信息缓存,使用时间戳作为键,便于按时间排序
private final Map<String, String> exceptionDetails = new ConcurrentHashMap<>(); private final Map<Long, String> exceptionDetails = new ConcurrentHashMap<>();
// 锁,用于保护缓存清理操作
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 最大缓存条目数 // 最大缓存条目数
private static final int MAX_CACHE_SIZE = 1000; private static final int MAX_CACHE_SIZE = 1000;
// 清理阈值,当缓存超过最大值时,清理到这个值
private static final int CLEANUP_THRESHOLD = MAX_CACHE_SIZE - 200;
/** /**
* 记录异常信息 * 记录异常信息
* *
...@@ -37,14 +44,31 @@ public class ExceptionMonitoringService { ...@@ -37,14 +44,31 @@ public class ExceptionMonitoringService {
counter.incrementAndGet(); counter.incrementAndGet();
// 记录异常详细信息(保留最新的) // 记录异常详细信息(保留最新的)
String detailKey = exceptionType + "_" + System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
exceptionDetails.put(detailKey, formatExceptionDetail(exceptionType, errorMessage, stackTrace)); exceptionDetails.put(timestamp, formatExceptionDetail(exceptionType, errorMessage, stackTrace));
// 控制缓存大小 // 控制缓存大小,使用写锁保护清理操作
if (exceptionDetails.size() > MAX_CACHE_SIZE) { if (exceptionDetails.size() > MAX_CACHE_SIZE) {
// 移除最老的条目 lock.writeLock().lock();
String oldestKey = exceptionDetails.keySet().iterator().next(); try {
exceptionDetails.remove(oldestKey); // 再次检查,避免竞态条件
if (exceptionDetails.size() > MAX_CACHE_SIZE) {
// 找出最老的条目并移除,直到达到清理阈值
while (exceptionDetails.size() > CLEANUP_THRESHOLD) {
// 找出最小的时间戳(最老的条目)
Long oldestTimestamp = exceptionDetails.keySet().stream()
.min(Long::compare)
.orElse(null);
if (oldestTimestamp != null) {
exceptionDetails.remove(oldestTimestamp);
} else {
break;
}
}
}
} finally {
lock.writeLock().unlock();
}
} }
// 记录日志 // 记录日志
...@@ -102,7 +126,11 @@ public class ExceptionMonitoringService { ...@@ -102,7 +126,11 @@ public class ExceptionMonitoringService {
* @return 异常详细信息 * @return 异常详细信息
*/ */
public Map<String, String> getExceptionDetails() { public Map<String, String> getExceptionDetails() {
return new ConcurrentHashMap<>(exceptionDetails); Map<String, String> result = new ConcurrentHashMap<>();
for (Map.Entry<Long, String> entry : exceptionDetails.entrySet()) {
result.put(entry.getKey().toString(), entry.getValue());
}
return result;
} }
/** /**
......
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.dto.AgentRequest;
import java.io.IOException;
/**
* SSE Token发射器
* 专注于将token转换为SSE事件并发送
* 无状态设计,每次使用时创建新实例
*/
@Slf4j
public class SseTokenEmitter implements TokenConsumerWithCompletion {
private final UserSseService userSseService;
// 所有状态通过构造函数一次性传入
private final SseEmitter emitter;
private final Agent agent;
private final AgentRequest request;
private final String userId;
private final CompletionCallback completionCallback;
/**
* 构造函数
* @param userSseService SSE服务
* @param emitter SSE发射器
* @param agent Agent对象
* @param request 请求对象
* @param userId 用户ID
* @param completionCallback 完成回调
*/
public SseTokenEmitter(UserSseService userSseService, SseEmitter emitter, Agent agent,
AgentRequest request, String userId, CompletionCallback completionCallback) {
this.userSseService = userSseService;
this.emitter = emitter;
this.agent = agent;
this.request = request;
this.userId = userId;
this.completionCallback = completionCallback;
}
/**
* 无参构造函数,用于Spring容器初始化
*/
public SseTokenEmitter() {
this(null, null, null, null, null, null);
}
/**
* 构造函数,用于Spring容器初始化(带UserSseService参数)
*/
public SseTokenEmitter(UserSseService userSseService) {
this(userSseService, null, null, null, null, null);
}
/**
* 创建新的SseTokenEmitter实例
* @param emitter SSE发射器
* @param agent Agent对象
* @param request 请求对象
* @param userId 用户ID
* @param completionCallback 完成回调
* @return 新的SseTokenEmitter实例
*/
public SseTokenEmitter createNewInstance(SseEmitter emitter, Agent agent, AgentRequest request,
String userId, CompletionCallback completionCallback) {
return new SseTokenEmitter(userSseService, emitter, agent, request, userId, completionCallback);
}
@Override
public void accept(String token) {
// 使用JSON格式发送token,确保转义序列被正确处理
try {
if (emitter != null && userSseService.isEmitterValidSafe(emitter)) {
// 检查是否是错误消息(以[错误]或[ERROR]开头)
if (token != null && (token.startsWith("[错误]") || token.startsWith("[ERROR]"))) {
// 发送标准错误事件而不是纯文本
userSseService.sendErrorEvent(emitter, token);
} else {
// 使用SSE标准事件格式发送token,以JSON格式确保转义序列正确处理
userSseService.sendTokenEvent(emitter, token);
}
} else {
log.debug("SSE emitter已无效,跳过发送token");
}
} catch (IllegalStateException e) {
// 处理emitter已关闭的情况,这通常是由于客户端断开连接
log.debug("无法发送token,SSE emitter已关闭: {}", e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
if (emitter != null) {
userSseService.removeEmitter(emitter);
}
} catch (IOException e) {
// 处理IO异常,这通常是由于客户端断开连接或网络问题
log.debug("无法发送token,IO异常: {}", e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
if (emitter != null) {
userSseService.removeEmitter(emitter);
}
} catch (Exception e) {
log.error("发送token失败", e);
// 对于其他异常,也将emitter标记为已完成,避免后续再次尝试发送
if (emitter != null) {
userSseService.removeEmitter(emitter);
}
}
}
@Override
public void onComplete(String fullContent) {
try {
if (emitter != null && !userSseService.isEmitterCompleted(emitter)) {
// 发送完成事件
emitter.send(SseEmitter.event().name("done").data("[DONE]").build());
log.debug("完成信号已发送");
}
// 调用完成回调
if (completionCallback != null) {
completionCallback.onComplete(emitter, agent, request, userId, fullContent);
}
} catch (IllegalStateException e) {
// 处理emitter已关闭的情况,这通常是由于客户端断开连接
log.debug("无法发送完成信号,SSE emitter已关闭: {}", e.getMessage());
} catch (IOException e) {
// 处理IO异常,这通常是由于客户端断开连接或网络问题
log.debug("无法发送完成信号,IO异常: {}", e.getMessage());
} catch (Exception e) {
log.error("处理完成事件失败", e);
} finally {
// 关闭连接
closeEmitter();
}
}
/**
* 安全关闭SSE连接
*/
public void closeEmitter() {
try {
if (emitter != null && !userSseService.isEmitterCompleted(emitter)) {
// emitter.complete();
log.debug("SSE连接已关闭");
}
} catch (Exception ex) {
log.error("完成emitter时发生错误", ex);
}
}
/**
* 完成回调接口
*/
@FunctionalInterface
public interface CompletionCallback {
void onComplete(SseEmitter emitter, Agent agent, AgentRequest request, String userId, String fullContent);
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.workpanel.event.EventService;
import pangea.hiagent.model.Agent;
import pangea.hiagent.common.utils.LogUtils;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 流式请求服务
* 负责处理流式请求
*/
@Slf4j
@Service
public class StreamRequestService {
@Autowired
private UserSseService unifiedSseService;
@Autowired
private EventService eventService;
@Autowired
private CompletionHandlerService completionHandlerService;
/**
* 处理流式请求
*
* @param emitter SSE发射器
* @param processor Agent处理器
* @param request Agent请求
* @param agent Agent对象
* @param userId 用户ID
*/
public void handleStreamRequest(SseEmitter emitter, AgentProcessor processor, pangea.hiagent.web.dto.AgentRequest request, Agent agent, String userId) {
LogUtils.enterMethod("handleStreamRequest", emitter, processor, request, agent, userId);
// 参数验证
if (!validateParameters(emitter, processor, request, agent, userId)) {
return;
}
// 创建流式处理的Token消费者
StreamTokenConsumer tokenConsumer = new StreamTokenConsumer(emitter, processor, unifiedSseService, eventService, completionHandlerService);
// 设置上下文信息,用于保存对话记录
tokenConsumer.setContext(agent, request, userId);
// 处理流式请求,将token缓冲和事件发送完全交给处理器实现
processor.processStreamRequest(request, agent, userId, tokenConsumer);
LogUtils.exitMethod("handleStreamRequest", "处理完成");
}
/**
* 验证所有必需参数
*
* @param emitter SSE发射器
* @param processor Agent处理器
* @param request Agent请求
* @param agent Agent对象
* @param userId 用户ID
* @return 验证是否通过
*/
private boolean validateParameters(SseEmitter emitter, AgentProcessor processor, pangea.hiagent.web.dto.AgentRequest request, Agent agent, String userId) {
return emitter != null && processor != null && request != null && agent != null && userId != null && !userId.isEmpty();
}
/**
* 流式处理的Token消费者实现
* 用于处理来自Agent处理器的token流,并将其转发给SSE emitter
*/
public static class StreamTokenConsumer implements TokenConsumerWithCompletion {
private final SseEmitter emitter;
private final AgentProcessor processor;
private final EventService eventService;
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
private Agent agent;
private pangea.hiagent.web.dto.AgentRequest request;
private String userId;
private CompletionHandlerService completionHandlerService;
public StreamTokenConsumer(SseEmitter emitter, AgentProcessor processor, UserSseService unifiedSseService, EventService eventService, CompletionHandlerService completionHandlerService) {
this.emitter = emitter;
this.processor = processor;
this.eventService = eventService;
this.completionHandlerService = completionHandlerService;
}
public void setContext(Agent agent, pangea.hiagent.web.dto.AgentRequest request, String userId) {
this.agent = agent;
this.request = request;
this.userId = userId;
}
@Override
public void accept(String token) {
// 使用JSON格式发送token,确保转义序列被正确处理
try {
if (!isCompleted.get()) {
// 检查是否是错误消息(以[错误]或[ERROR]开头)
if (token != null && (token.startsWith("[错误]") || token.startsWith("[ERROR]"))) {
// 发送标准错误事件而不是纯文本
eventService.sendErrorEvent(emitter, token);
} else {
// 使用SSE标准事件格式发送token,以JSON格式确保转义序列正确处理
eventService.sendTokenEvent(emitter, token);
}
}
} catch (Exception e) {
log.error("发送token失败", e);
}
}
@Override
public void onComplete(String fullContent) {
// 处理完成时的回调
if (isCompleted.getAndSet(true)) {
log.debug("{} Agent处理已完成,跳过重复的完成回调", processor.getProcessorType());
return;
}
log.info("{} Agent处理完成,总字符数: {}", processor.getProcessorType(), fullContent != null ? fullContent.length() : 0);
try {
// 使用CompletionHandlerService处理完成回调
if (completionHandlerService != null) {
completionHandlerService.handleCompletion(emitter, processor, agent, request, userId, fullContent, isCompleted);
} else {
// 如果completionHandlerService不可用,使用默认处理逻辑
try {
// 发送完成事件
emitter.send("[DONE]");
// 完成 emitter
emitter.complete();
} catch (Exception e) {
log.error("处理完成事件失败", e);
} }
} catch (Exception e) {
log.error("处理完成事件失败", e);
// 确保即使出现异常也完成emitter
try {
emitter.complete();
} catch (Exception ex) {
log.error("完成emitter时发生错误", ex);
}
}
} }
}
\ No newline at end of file
package pangea.hiagent.agent.service; package pangea.hiagent.agent.service;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.workpanel.event.EventService;
/** /**
* Token消费者接口,支持完成回调 * Token消费者接口,支持完成回调
...@@ -17,17 +14,4 @@ public interface TokenConsumerWithCompletion extends Consumer<String> { ...@@ -17,17 +14,4 @@ public interface TokenConsumerWithCompletion extends Consumer<String> {
default void onComplete(String fullContent) { default void onComplete(String fullContent) {
// 默认实现为空 // 默认实现为空
} }
/**
* 当流式处理完成时调用,发送完成事件到前端
* @param fullContent 完整的内容
* @param emitter SSE发射器
* @param sseEventSender SSE事件发送器
* @param isCompleted 完成状态标记
*/
default void onComplete(String fullContent, SseEmitter emitter,
EventService eventService,
AtomicBoolean isCompleted) {
// 默认实现将在子类中覆盖
}
} }
\ No newline at end of file
package pangea.hiagent.agent.sse;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.web.dto.WorkPanelEvent;
import pangea.hiagent.workpanel.event.EventService;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.function.Consumer;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* SSE连接协调器
* 专门负责协调SSE连接的创建、管理和销毁过程
*/
@Slf4j
@Component
public class SseConnectionCoordinator {
private final UserSseService unifiedSseService;
private final EventService eventService;
public SseConnectionCoordinator(
UserSseService unifiedSseService,
EventService eventService) {
this.unifiedSseService = unifiedSseService;
this.eventService = eventService;
}
/**
* 创建并注册SSE连接
*
* @param userId 用户ID
* @return SSE Emitter
*/
public SseEmitter createAndRegisterConnection(String userId) {
log.debug("开始为用户 {} 创建SSE连接", userId);
// 创建 SSE emitter
SseEmitter emitter = unifiedSseService.createEmitter();
log.debug("SSE Emitter创建成功");
// 注册用户的SSE连接
unifiedSseService.registerSession(userId, emitter);
log.debug("用户 {} 的SSE连接注册成功", userId);
// 注册 emitter 回调
unifiedSseService.registerCallbacks(emitter, userId);
log.debug("SSE Emitter回调注册成功");
// 启动心跳机制
unifiedSseService.startHeartbeat(emitter, new AtomicBoolean(false));
log.debug("心跳机制启动成功");
log.info("用户 {} 的SSE连接创建和注册完成", userId);
return emitter;
}
/**
* 订阅工作面板事件
*
* @param userId 用户ID
* @param workPanelEventConsumer 工作面板事件消费者
* @param emitter SSE Emitter
*/
public void subscribeToWorkPanelEvents(String userId, Consumer<WorkPanelEvent> workPanelEventConsumer, SseEmitter emitter) {
log.debug("开始为用户 {} 订阅工作面板事件", userId);
// 发送连接成功事件
try {
WorkPanelEvent connectedEvent = WorkPanelEvent.builder()
.type("observation")
.title("连接成功")
.timestamp(System.currentTimeMillis())
.build();
eventService.sendWorkPanelEvent(emitter, connectedEvent);
log.debug("已发送连接成功事件");
} catch (Exception e) {
log.error("发送连接成功事件失败", e);
}
log.info("用户 {} 的工作面板事件订阅完成", userId);
}
}
\ No newline at end of file
package pangea.hiagent.agent.sse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* 用户会话管理器
* 专门负责管理用户的SSE会话连接
*/
@Slf4j
@Component
public class UserSseManager {
// 存储用户ID到SSE Emitter的映射关系
private final ConcurrentMap<String, SseEmitter> userEmitters = new ConcurrentHashMap<>();
// 存储SSE Emitter到用户ID的反向映射关系(用于快速查找)
private final ConcurrentMap<SseEmitter, String> emitterUsers = new ConcurrentHashMap<>();
// 存储连接创建时间,用于超时检查
private final ConcurrentMap<SseEmitter, AtomicLong> connectionTimes = new ConcurrentHashMap<>();
// 连接超时时间(毫秒),默认30分钟
private static final long CONNECTION_TIMEOUT = 30 * 60 * 1000L;
/**
* 注册用户的SSE连接
* 如果该用户已有连接,则先关闭旧连接再注册新连接
*
* @param userId 用户ID
* @param emitter SSE Emitter
* @return true表示注册成功,false表示注册失败
*/
public boolean registerSession(String userId, SseEmitter emitter) {
if (userId == null || userId.isEmpty() || emitter == null) {
log.warn("注册SSE会话失败:用户ID或Emitter为空");
return false;
}
try {
// 检查该用户是否已有连接
SseEmitter existingEmitter = userEmitters.get(userId);
if (existingEmitter != null) {
// 如果已有连接,先关闭旧连接
log.info("用户 {} 已有SSE连接,关闭旧连接", userId);
closeEmitter(existingEmitter);
// 从映射中移除旧连接
userEmitters.remove(userId, existingEmitter);
emitterUsers.remove(existingEmitter);
}
// 注册新连接
userEmitters.put(userId, emitter);
emitterUsers.put(emitter, userId);
// 记录连接创建时间
connectionTimes.put(emitter, new AtomicLong(System.currentTimeMillis()));
log.info("成功为用户 {} 注册SSE会话", userId);
return true;
} catch (Exception e) {
log.error("注册SSE会话时发生异常:用户ID={}", userId, e);
return false;
}
}
/**
* 移除用户的SSE会话
*
* @param emitter SSE Emitter
*/
public void removeSession(SseEmitter emitter) {
if (emitter == null) {
return;
}
try {
String userId = emitterUsers.get(emitter);
if (userId != null) {
userEmitters.remove(userId, emitter);
emitterUsers.remove(emitter);
connectionTimes.remove(emitter);
log.debug("已移除用户 {} 的SSE会话", userId);
}
} catch (Exception e) {
log.warn("移除SSE会话时发生异常", e);
}
}
/**
* 获取用户的当前SSE连接
*
* @param userId 用户ID
* @return SSE Emitter,如果不存在则返回null
*/
public SseEmitter getSession(String userId) {
if (userId == null || userId.isEmpty()) {
return null;
}
return userEmitters.get(userId);
}
/**
* 检查用户是否有活跃的SSE连接
*
* @param userId 用户ID
* @return true表示有活跃连接,false表示没有
*/
public boolean hasActiveSession(String userId) {
if (userId == null || userId.isEmpty()) {
return false;
}
SseEmitter emitter = userEmitters.get(userId);
return emitter != null && isEmitterValid(emitter) && !isSessionExpired(emitter);
}
/**
* 检查SSE Emitter是否仍然有效
*
* @param emitter 要检查的emitter
* @return 如果有效返回true,否则返回false
*/
public boolean isEmitterValid(SseEmitter emitter) {
if (emitter == null) {
return false;
}
// 检查emitter是否已完成
try {
// 尝试发送一个空事件来检查连接状态
emitter.send(SseEmitter.event().name("ping").data("").build());
return true;
} catch (Exception e) {
// 如果出现任何异常,认为连接已失效
return false;
}
}
/**
* 检查会话是否已过期
*
* @param emitter 要检查的emitter
* @return 如果过期返回true,否则返回false
*/
public boolean isSessionExpired(SseEmitter emitter) {
if (emitter == null) {
return true;
}
AtomicLong connectionTime = connectionTimes.get(emitter);
if (connectionTime == null) {
return false; // 如果没有记录时间,假设未过期
}
long currentTime = System.currentTimeMillis();
long connectionStartTime = connectionTime.get();
return (currentTime - connectionStartTime) > CONNECTION_TIMEOUT;
}
/**
* 关闭指定的SSE Emitter
*
* @param emitter SSE Emitter
*/
public void closeEmitter(SseEmitter emitter) {
if (emitter == null) {
return;
}
try {
emitter.complete();
log.debug("已关闭SSE Emitter");
} catch (Exception e) {
log.warn("关闭SSE Emitter时发生异常", e);
}
}
/**
* 获取当前会话的用户数量
*
* @return 用户数量
*/
public int getSessionCount() {
return userEmitters.size();
}
/**
* 清理所有会话(用于系统关闭时)
*/
public void clearAllSessions() {
try {
for (SseEmitter emitter : emitterUsers.keySet()) {
try {
emitter.complete();
} catch (Exception e) {
log.warn("关闭SSE Emitter时发生异常", e);
}
}
userEmitters.clear();
emitterUsers.clear();
connectionTimes.clear();
log.info("已清理所有SSE会话");
} catch (Exception e) {
log.error("清理所有SSE会话时发生异常", e);
}
}
/**
* 处理连接完成事件
* 职责:协调完成连接的清理工作
*
* @param emitter SSE Emitter
*/
public void handleConnectionCompletion(SseEmitter emitter) {
log.debug("处理SSE连接完成事件");
// 移除连接
removeSession(emitter);
}
/**
* 处理连接超时事件
* 职责:协调超时连接的清理工作
*
* @param emitter SSE Emitter
*/
public void handleConnectionTimeout(SseEmitter emitter) {
log.debug("处理SSE连接超时事件");
// 移除连接
removeSession(emitter);
}
/**
* 处理连接错误事件
* 职责:协调错误连接的清理工作
*
* @param emitter SSE Emitter
*/
public void handleConnectionError(SseEmitter emitter) {
log.debug("处理SSE连接错误事件");
// 移除连接
removeSession(emitter);
}
}
\ No newline at end of file
...@@ -15,8 +15,8 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry ...@@ -15,8 +15,8 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.HandshakeInterceptor; import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.util.UriComponentsBuilder; import org.springframework.web.util.UriComponentsBuilder;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
import pangea.hiagent.common.utils.JwtUtil; import pangea.hiagent.common.utils.JwtUtil;
import pangea.hiagent.tool.playwright.PlaywrightManager;
import pangea.hiagent.websocket.DomSyncHandler; import pangea.hiagent.websocket.DomSyncHandler;
import java.util.Map; import java.util.Map;
......
...@@ -5,7 +5,6 @@ import lombok.extern.slf4j.Slf4j; ...@@ -5,7 +5,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.reflection.MetaObject; import org.apache.ibatis.reflection.MetaObject;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import pangea.hiagent.common.utils.UserUtils; import pangea.hiagent.common.utils.UserUtils;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/** /**
...@@ -46,7 +45,7 @@ public class MetaObjectHandlerConfig implements MetaObjectHandler { ...@@ -46,7 +45,7 @@ public class MetaObjectHandlerConfig implements MetaObjectHandler {
if (metaObject.hasSetter("createdBy")) { if (metaObject.hasSetter("createdBy")) {
Object createdBy = getFieldValByName("createdBy", metaObject); Object createdBy = getFieldValByName("createdBy", metaObject);
if (createdBy == null) { if (createdBy == null) {
String userId = UserUtils.getCurrentUserId(); String userId = getCurrentUserIdWithContext();
if (userId != null) { if (userId != null) {
this.strictInsertFill(metaObject, "createdBy", String.class, userId); this.strictInsertFill(metaObject, "createdBy", String.class, userId);
log.debug("自动填充createdBy字段: {}", userId); log.debug("自动填充createdBy字段: {}", userId);
...@@ -60,7 +59,7 @@ public class MetaObjectHandlerConfig implements MetaObjectHandler { ...@@ -60,7 +59,7 @@ public class MetaObjectHandlerConfig implements MetaObjectHandler {
if (metaObject.hasSetter("updatedBy")) { if (metaObject.hasSetter("updatedBy")) {
Object updatedBy = getFieldValByName("updatedBy", metaObject); Object updatedBy = getFieldValByName("updatedBy", metaObject);
if (updatedBy == null) { if (updatedBy == null) {
String userId = UserUtils.getCurrentUserId(); String userId = getCurrentUserIdWithContext();
if (userId != null) { if (userId != null) {
this.strictInsertFill(metaObject, "updatedBy", String.class, userId); this.strictInsertFill(metaObject, "updatedBy", String.class, userId);
log.debug("自动填充updatedBy字段: {}", userId); log.debug("自动填充updatedBy字段: {}", userId);
...@@ -91,7 +90,7 @@ public class MetaObjectHandlerConfig implements MetaObjectHandler { ...@@ -91,7 +90,7 @@ public class MetaObjectHandlerConfig implements MetaObjectHandler {
Object updatedBy = getFieldValByName("updatedBy", metaObject); Object updatedBy = getFieldValByName("updatedBy", metaObject);
// 如果updatedBy为空或者需要强制更新,则填充当前用户ID // 如果updatedBy为空或者需要强制更新,则填充当前用户ID
if (updatedBy == null) { if (updatedBy == null) {
String userId = UserUtils.getCurrentUserId(); String userId = getCurrentUserIdWithContext();
if (userId != null) { if (userId != null) {
this.strictUpdateFill(metaObject, "updatedBy", String.class, userId); this.strictUpdateFill(metaObject, "updatedBy", String.class, userId);
log.debug("自动填充updatedBy字段: {}", userId); log.debug("自动填充updatedBy字段: {}", userId);
...@@ -101,4 +100,31 @@ public class MetaObjectHandlerConfig implements MetaObjectHandler { ...@@ -101,4 +100,31 @@ public class MetaObjectHandlerConfig implements MetaObjectHandler {
} }
} }
} }
/**
* 获取当前用户ID,支持异步线程上下文
* 该方法支持以下场景:
* 1. 优先从ThreadLocal获取(支持异步线程)
* 2. 从SecurityContext获取(支持同步请求和AsyncUserContextDecorator传播)
* 3. 从请求中解析Token获取用户ID
*
* @return 用户ID,如果无法获取则返回null
*/
private String getCurrentUserIdWithContext() {
try {
// 直接调用UserUtils.getCurrentUserIdStatic(),该方法已经包含了所有获取用户ID的方式
// 并且优先从ThreadLocal获取,支持异步线程
String userId = UserUtils.getCurrentUserIdStatic();
if (userId != null) {
log.debug("成功获取用户ID: {}", userId);
return userId;
}
log.warn("无法通过任何方式获取当前用户ID,createdBy/updatedBy字段将不被填充");
return null;
} catch (Exception e) {
log.error("获取用户ID时发生异常", e);
return null;
}
}
} }
\ No newline at end of file
...@@ -21,6 +21,7 @@ import pangea.hiagent.web.service.AgentService; ...@@ -21,6 +21,7 @@ import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.web.service.TimerService; import pangea.hiagent.web.service.TimerService;
import pangea.hiagent.security.DefaultPermissionEvaluator; import pangea.hiagent.security.DefaultPermissionEvaluator;
import pangea.hiagent.security.JwtAuthenticationFilter; import pangea.hiagent.security.JwtAuthenticationFilter;
import pangea.hiagent.security.SseAuthorizationFilter;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
...@@ -33,11 +34,13 @@ import java.util.Collections; ...@@ -33,11 +34,13 @@ import java.util.Collections;
public class SecurityConfig { public class SecurityConfig {
private final JwtAuthenticationFilter jwtAuthenticationFilter; private final JwtAuthenticationFilter jwtAuthenticationFilter;
private final SseAuthorizationFilter sseAuthorizationFilter;
private final AgentService agentService; private final AgentService agentService;
private final TimerService timerService; private final TimerService timerService;
public SecurityConfig(JwtAuthenticationFilter jwtAuthenticationFilter, AgentService agentService, TimerService timerService) { public SecurityConfig(JwtAuthenticationFilter jwtAuthenticationFilter, SseAuthorizationFilter sseAuthorizationFilter, AgentService agentService, TimerService timerService) {
this.jwtAuthenticationFilter = jwtAuthenticationFilter; this.jwtAuthenticationFilter = jwtAuthenticationFilter;
this.sseAuthorizationFilter = sseAuthorizationFilter;
this.agentService = agentService; this.agentService = agentService;
this.timerService = timerService; this.timerService = timerService;
} }
...@@ -131,21 +134,25 @@ public class SecurityConfig { ...@@ -131,21 +134,25 @@ public class SecurityConfig {
try { try {
// 对于SSE端点的特殊处理 // 对于SSE端点的特殊处理
boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream"); boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream");
boolean isTimelineEndpoint = request.getRequestURI().contains("/api/v1/agent/timeline-events");
if (isStreamEndpoint) {
if (isStreamEndpoint || isTimelineEndpoint) { // 再次检查响应是否已经提交
// 对于SSE端点,发送SSE格式的错误事件 if (response.isCommitted()) {
response.setContentType("text/event-stream;charset=UTF-8"); log.warn("SSE端点响应已提交,无法发送认证错误");
response.setCharacterEncoding("UTF-8");
response.getWriter().write("event: error\ndata: {\"error\": \"未授权访问\", \"timestamp\": " + System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
// 确保响应被正确提交
if (!response.isCommitted()) {
response.flushBuffer();
}
return; return;
} }
// 对于SSE端点,发送SSE格式的错误事件
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
response.getWriter().write("event: error\ndata: {\"error\": \"未授权访问\", \"timestamp\": " + System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
// 确保响应被正确提交
if (!response.isCommitted()) {
response.flushBuffer();
}
return;
}
response.setStatus(401); response.setStatus(401);
response.setContentType("application/json;charset=UTF-8"); response.setContentType("application/json;charset=UTF-8");
...@@ -171,21 +178,25 @@ public class SecurityConfig { ...@@ -171,21 +178,25 @@ public class SecurityConfig {
try { try {
// 对于SSE端点的特殊处理 // 对于SSE端点的特殊处理
boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream"); boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream");
boolean isTimelineEndpoint = request.getRequestURI().contains("/api/v1/agent/timeline-events");
if (isStreamEndpoint) {
if (isStreamEndpoint || isTimelineEndpoint) { // 再次检查响应是否已经提交
// 对于SSE端点,发送SSE格式的错误事件 if (response.isCommitted()) {
response.setContentType("text/event-stream;charset=UTF-8"); log.warn("SSE端点响应已提交,无法发送访问拒绝错误");
response.setCharacterEncoding("UTF-8");
response.getWriter().write("event: error\ndata: {\"error\": \"访问被拒绝\", \"timestamp\": " + System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
// 确保响应被正确提交
if (!response.isCommitted()) {
response.flushBuffer();
}
return; return;
} }
// 对于SSE端点,发送SSE格式的错误事件
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
response.getWriter().write("event: error\ndata: {\"error\": \"访问被拒绝\", \"timestamp\": " + System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
// 确保响应被正确提交
if (!response.isCommitted()) {
response.flushBuffer();
}
return;
}
response.setStatus(403); response.setStatus(403);
response.setContentType("application/json;charset=UTF-8"); response.setContentType("application/json;charset=UTF-8");
...@@ -205,6 +216,8 @@ public class SecurityConfig { ...@@ -205,6 +216,8 @@ public class SecurityConfig {
) )
// 添加JWT认证过滤器 // 添加JWT认证过滤器
.addFilterBefore(jwtAuthenticationFilter, UsernamePasswordAuthenticationFilter.class) .addFilterBefore(jwtAuthenticationFilter, UsernamePasswordAuthenticationFilter.class)
// 添加SSE授权检查过滤器,在JWT过滤器之后但在其他安全过滤器之前运行
.addFilterAfter(sseAuthorizationFilter, JwtAuthenticationFilter.class)
// 配置X-Frame-Options头部,允许同源iframe嵌入 // 配置X-Frame-Options头部,允许同源iframe嵌入
.headers(headers -> headers .headers(headers -> headers
.frameOptions(frameOptions -> frameOptions .frameOptions(frameOptions -> frameOptions
......
package pangea.hiagent.common.config; // package pangea.hiagent.common.config;
import org.springframework.context.annotation.Configuration; // import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy; // import org.springframework.context.annotation.EnableAspectJAutoProxy;
/** // /**
* 工具执行日志记录配置类 // * 工具执行日志记录配置类
* 启用AOP代理以实现工具执行信息的自动记录 // * 启用AOP代理以实现工具执行信息的自动记录
*/ // */
@Configuration // @Configuration
@EnableAspectJAutoProxy // @EnableAspectJAutoProxy
public class ToolExecutionLoggingConfig { // public class ToolExecutionLoggingConfig {
} // }
\ No newline at end of file \ No newline at end of file
This diff is collapsed.
package pangea.hiagent.workpanel.playwright; package pangea.hiagent.tool.playwright;
import com.microsoft.playwright.Browser; import com.microsoft.playwright.Browser;
import com.microsoft.playwright.BrowserContext; import com.microsoft.playwright.BrowserContext;
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment