Commit cadf09e0 authored by ligaowei's avatar ligaowei

refactor(agent): 优化错误处理逻辑及回调机制

- 替换 BaseAgentProcessor 中的 agentErrorHandler 为 errorHandlerService,统一错误处理调用
- 增强 handleSyncError,实现401 Unauthorized错误的特殊提示“请配置API密钥”
- 流式处理异常时调用 errorHandlerService 处理错误消息,避免抛出异常中断处理流程
- 在 NormalAgentProcessor 和 ReActAgentProcessor 中,异常时调用 TokenConsumerWithCompletion 的 onComplete 方法通知完成状态
- 确保错误发送后均触发完成回调,避免流式调用卡死或状态不一致
- 清理无用依赖,确保错误处理逻辑集中管理并规范调用
parent 6642b159
# SSE 心跳保活机制改进方案
## 问题描述
之前对话返回信息过长时,会因为流式响应超时(60秒无消息)而显示"[错误] 流式输出超时,请重试",导致SSE连接被关闭。
## 解决方案
### 前端改进 (ChatArea.vue)
#### 1. 改进超时检测机制
- **原来**: 简单的60秒全局超时,无任何数据到达就关闭
- **现在**: 使用心跳保活机制,定期检查是否收到心跳
```typescript
// 关键参数
const HEARTBEAT_TIMEOUT = 60000; // 60秒无心跳则为超时
const HEARTBEAT_CHECK_INTERVAL = 5000; // 每5秒检查一次
let lastHeartbeatTime = Date.now(); // 记录最后一次心跳时间
```
#### 2. 新增心跳事件处理
`processSSELine` 函数中新增 heartbeat case:
```typescript
case "heartbeat":
// 收到心跳事件,重置超时计时器
resetStreamTimeout();
// 心跳事件本身不处理,只用于保活连接
console.debug("[心跳] 收到心跳事件,连接保活");
return false;
```
#### 3. 改进的超时判断逻辑
```typescript
const resetStreamTimeout = () => {
clearStreamTimeout();
lastHeartbeatTime = Date.now(); // 更新最后心跳时间
streamTimeoutTimer = setTimeout(() => {
if (!isStreamComplete) {
// 检查是否在指定时间内收到过心跳或数据
const timeSinceLastHeartbeat = Date.now() - lastHeartbeatTime;
if (timeSinceLastHeartbeat >= HEARTBEAT_TIMEOUT) {
// 真正的超时,关闭连接
isStreamComplete = true;
reader.cancel();
// ... 显示超时错误
} else {
// 还没超时,继续检查
resetStreamTimeout();
}
}
}, HEARTBEAT_CHECK_INTERVAL);
};
```
**工作原理**
1. 每当收到token、心跳或其他数据时,重置超时计时器并更新`lastHeartbeatTime`
2. 每5秒检查一次是否超时
3. 只有当最后一次心跳/数据距现在超过60秒时,才真正认为超时并关闭连接
4. 否则继续检查,保持连接活跃
---
### 后端改进 (UserSseService.java)
#### 1. 调整心跳发送频率
- **原来**: 每30秒发送一次心跳
- **现在**: 每20秒发送一次心跳
```java
}, 20, 20, TimeUnit.SECONDS); // 每20秒发送一次心跳,确保前端60秒超时前至少收到2次心跳
```
**原因**: 确保在前端60秒超时前,至少能收到2次心跳,增加可靠性
#### 2. 增强心跳日志
```java
long heartbeatTimestamp = System.currentTimeMillis();
emitter.send(SseEmitter.event().name("heartbeat").data(heartbeatTimestamp));
log.debug("[心跳] 成功发送心跳事件,时间戳: {}", heartbeatTimestamp);
```
#### 3. 心跳机制的完整生命周期
- **启动**: 创建连接时调用 `startHeartbeat()`
- **运行**: 每20秒检查一次连接有效性,如果有效则发送心跳
- **停止**: 在连接完成/超时/错误时自动取消心跳任务
```java
// 注册回调,在连接完成时取消心跳任务
emitter.onCompletion(() -> {
if (heartbeatTask != null && !heartbeatTask.isCancelled()) {
heartbeatTask.cancel(true);
log.debug("SSE连接完成,心跳任务已取消");
}
});
// 类似的处理: onTimeout(), onError()
```
---
## 工作流程
### 正常情况(消息持续到达)
```
时间轴: 0s ─── 10s ─── 20s ─── 30s ─── 40s ─── 50s ─── 60s
│ │ │
token token token
│ │ │
重置超时 重置超时 重置超时
(60s) (60s) (60s)
```
连接保持活跃,不会超时。
### 有心跳但消息间隔长(解决长时间处理问题)
```
时间轴: 0s ─── 10s ─── 20s ─── 30s ─── 40s ─── 50s ─── 60s ─── 70s ─── 80s
token 心跳 心跳 心跳 token
│ │ │ │ │
重置超时 重置超时 重置超时 重置超时 重置超时
(60s) (60s) (60s) (60s) (60s)
```
心跳每20秒发送一次,保持连接活跃,即使消息处理需要很长时间。
### 真正超时的情况(心跳也断开)
```
时间轴: 0s ─── 20s ─── 40s ─── 60s ─── 70s(超时)
token 心跳 心跳 [无更多心跳]
│ │ │
重置超时 重置超时 重置超时
(60s) (60s) (60s)
超过60秒无响应,关闭连接
```
当网络真的中断或服务器崩溃时,经过60秒无任何响应,客户端才会超时并提示用户。
---
## 关键时间参数
| 参数 | 值 | 说明 |
|------|-----|------|
| 心跳间隔(后端)| 20秒 | 后端定期向客户端发送心跳 |
| 前端超时时间 | 60秒 | 前端在60秒内无心跳/数据则超时 |
| 检查间隔(前端)| 5秒 | 前端每5秒检查一次是否超时 |
| SSE连接超时(后端)| 120秒 | Spring框架层面的连接超时 |
**设计原理**: 心跳间隔 (20s) < 前端超时时间 (60s) / 2,保证前端超时前至少收到2次心跳。
---
## 对话结束和错误处理
### 对话正常结束
1. 后端发送 `complete` 事件
2. 前端收到 `complete` 事件,调用 `clearStreamTimeout()`
3. 流式处理完成,关闭所有计时器和监听
### 发生错误时
1. 后端发送 `error` 事件
2. 前端收到 `error` 事件,调用 `clearStreamTimeout()`
3. 关闭连接和心跳检查,显示错误信息
### 心跳中断且超时
1. 前端在60秒内未收到任何心跳/数据
2. 前端认定连接超时,取消读取并显示错误
3. 用户可以点击重试按钮重新发送消息
---
## 调试
### 后端日志
```
[心跳] 成功发送心跳事件,时间戳: 1640000000000
```
### 前端日志
```
[心跳] 收到心跳事件,连接保活
[SSE完成事件] {type: "complete", ...}
```
### 超时测试
1. 故意让后端处理延迟超过60秒的请求
2. 观察是否能收到心跳事件
3. 连接应该保持活跃,不会因为消息间隔长而断开
4. 直到对话完成或心跳真的中断,才会关闭连接
---
## 总结
这个改进方案通过引入心跳保活机制,解决了以下问题:
✅ 长时间处理的对话不会因为超时而意外断开
✅ 心跳中断才会真正关闭连接(而不是任意时间无消息就关闭)
✅ 流式响应自然结束或错误发生时,及时清理资源
✅ 系统更加稳定可靠,特别是对于复杂AI任务处理
# HiAgent 工具管理方案
## 1. 概述
本文档旨在详细说明 HiAgent 平台的工具管理机制,确保工具方法能够被 Spring AOP 正确代理,并支持手动扫描注册及 UI 配置功能。
通过对现有代码的分析,我们发现当前系统在工具管理方面还存在一些问题,主要包括:
1. 缺少手动触发工具扫描的前端界面
2. 工具无法被正确找到和调用的问题
3. 工具扫描API端点未暴露给前端使用
本文档将在分析这些问题的基础上,提出相应的改进建议.
## 2. Spring AOP 代理兼容性方案
### 2.1 工具类设计规范
为了确保工具方法能够被 Spring AOP 正确代理,所有工具类需要遵循以下规范:
1. **注解使用**
- 工具类必须使用 `@Component` 或其派生注解(如 `@Service`)进行标记
- 工具方法必须使用 `@org.springframework.ai.tool.annotation.Tool` 注解进行标记
2. **访问修饰符**
- 工具方法必须是 `public` 方法
- 避免在同一个类中直接调用其他带有 `@Tool` 注解的方法
3. **类设计**
- 工具类应该是无状态的,或者状态应该是线程安全的
- 避免使用 `final` 方法,因为这会影响 CGLIB 代理的创建
### 2.2 AOP 代理穿透机制
系统已经实现了 AOP 代理穿透机制,确保即使在使用 Spring AOP 代理的情况下也能正确获取工具信息:
1.[AgentToolManager.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/AgentToolManager.java) 中提供了 `getTargetClass()` 方法来获取代理对象的原始类:
```java
private Class<?> getTargetClass(Object bean) {
if (bean == null) {
return null;
}
return AopUtils.getTargetClass(bean);
}
```
2. 在工具匹配过程中,系统会穿透代理获取真实的类信息进行比较,确保匹配准确性。
### 2.3 工具执行日志切面
系统通过 [ToolExecutionLoggerAspect.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/aspect/ToolExecutionLoggerAspect.java) 实现了工具执行的日志记录和监控:
1. 使用 `@Around("@annotation(tool)")` 环绕通知拦截所有带有 `@Tool` 注解的方法
2. 自动记录工具执行的输入参数、输出结果、执行时间等信息
3. 将工具执行信息同步到 WorkPanel 进行可视化展示
## 3. 工具扫描与注册机制
### 3.1 自动扫描机制
系统通过 [ToolBeanNameInitializer.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/ToolBeanNameInitializer.java) 实现工具的自动扫描和注册:
1. **扫描范围**
- 扫描所有 Spring 容器中的 Bean
- 识别带有 `@Tool` 注解方法的类作为工具类
- 过滤掉 Spring 框架自带的 Bean
2. **工具识别规则**
- 类名包含 "Tool" 关键字
-`@Component``@Service` 标注
- 类中包含带有 `@Tool` 注解的方法
3. **工具名称推导**
- 从类名推导工具名称,去除 "Tool" 后缀
- 转换为小驼峰命名格式
### 3.2 手动触发扫描
系统支持通过管理界面手动触发工具扫描和注册:
1. 提供 `initializeToolBeanNamesManually()` 方法用于手动触发扫描
2. 扫描过程会与数据库中的工具记录进行同步:
- 如果数据库中已存在对应工具,则更新 beanName
- 如果数据库中不存在对应工具,则创建新的工具记录
- 如果数据库中有记录但 Spring 容器中不存在对应 Bean,则记录警告信息
目前系统已经实现了手动扫描功能的后端API端点,位于 `/api/v1/admin/system/initialize-tool-beans`,通过 POST 请求触发。但在前端界面上还未提供相应的人机交互界面。
### 3.3 数据库同步策略
工具信息会被持久化存储在数据库中,确保系统重启后配置不会丢失:
1. **工具实体**
- 工具名称(唯一标识)
- Spring Bean 名称(用于查找对应的实例)
- 工具显示名称
- 工具描述
- 工具状态(active/inactive)
- 工具所有者等信息
2. **同步机制**
- 系统启动时不自动执行扫描(避免影响启动速度)
- 通过管理界面手动触发扫描和同步
- 支持增量更新,只处理发生变化的工具
## 4. 当前存在的问题与改进建议
### 4.1 当前存在的主要问题
通过分析现有代码和功能实现,我们发现工具管理系统存在以下主要问题:
1. **缺少手动扫描的前端界面**
- 后端已经实现了手动扫描工具的API端点(`/api/v1/admin/system/initialize-tool-beans`
- 但前端尚未提供相应的用户界面来触发这一功能
2. **工具无法正确找到和调用**
-[AgentToolManager.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/AgentToolManager.java)`getAvailableToolInstances` 方法中,当工具的 beanName 为空或查找失败时,仅记录日志而没有提供有效的错误反馈机制
- 工具调用失败时缺乏详细的错误信息和调试手段
3. **工具管理页面功能不完善**
- 当前的 [ToolManagement.vue](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/frontend/src/pages/ToolManagement.vue) 页面仅支持基础的增删改查功能
- 缺少与后端扫描功能的集成
### 4.2 改进建议
针对上述问题,我们提出以下改进建议:
#### 4.2.1 完善前端工具管理界面
1. **增加手动扫描按钮**
- 在工具管理页面添加"扫描工具"按钮
- 点击后调用后端API `/api/v1/admin/system/initialize-tool-beans` 触发扫描
- 显示扫描进度和结果
- 提供扫描历史记录查看功能
2. **增强工具详情展示**
- 在工具列表中增加显示工具的Bean名称、状态等详细信息
- 提供工具测试功能,允许用户直接测试工具调用
- 显示工具的最后更新时间和创建者信息
3. **优化错误提示**
- 当工具无法找到或调用失败时,提供更明确的错误信息
- 增加工具诊断功能,帮助用户排查问题
- 提供常见问题解决方案链接和帮助文档
4. **增加工具诊断界面**
- 提供单个工具的详细诊断信息查看
- 支持批量工具状态检查
- 显示工具依赖关系图谱#### 4.2.2 后端功能优化
1. **完善工具调用错误处理**
-[AgentToolManager.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/AgentToolManager.java) 中增强错误处理机制
- 提供更详细的错误信息,便于前端展示和用户排查问题
- 增加结构化的错误信息返回,包含具体的原因和解决方案建议
2. **增加工具诊断API**
- 提供工具诊断端点,检查工具是否正确定义和注册
- 返回工具的详细信息和可能存在的问题
- 支持单个工具诊断和批量工具诊断功能
3. **优化日志记录**
- 增强工具调用过程中的日志记录
- 提供更详细的调试信息,便于问题追踪
- 结构化日志信息,方便后续分析和问题定位## 5. 实施步骤
### 5.1 后端实施
1. 完善 [ToolBeanNameInitializer.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/ToolBeanNameInitializer.java) 的手动扫描接口
2. 优化 [AgentToolManager.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/AgentToolManager.java) 的工具获取逻辑,增强错误处理和日志记录
3. 增强 [ToolExecutionLoggerAspect.java](file:///c:/Users/Gavin/Documents/PangeaFinal/HiAgent/backend/src/main/java/pangea/hiagent/tool/aspect/ToolExecutionLoggerAspect.java) 的日志记录功能
4. 增加工具诊断API端点,提供工具状态检查功能
5. 实现工具依赖关系分析功能
6. 增加工具使用统计和性能监控
### 5.2 前端实施
1. 在工具管理页面增加手动扫描按钮,调用 `/api/v1/admin/system/initialize-tool-beans` 端点
2. 增强工具列表展示,显示更多工具详细信息如Bean名称、状态等
3. 增加工具测试功能,允许用户直接测试工具调用
4. 优化错误提示,提供更明确的错误信息帮助用户排查问题
5. 实现工具诊断界面,支持单个和批量工具诊断
6. 增加工具依赖关系可视化展示
### 5.3 测试验证
1. 验证工具方法的 AOP 代理兼容性
2. 测试手动扫描和自动注册功能
3. 验证 UI 配置功能的完整性和易用性
4. 测试工具执行的日志记录和监控功能
5. 验证错误处理机制的有效性
6. 测试工具诊断功能的准确性和完整性
7. 验证工具依赖关系分析的正确性
## 6. 总结
本方案通过规范工具类设计、实现 AOP 代理穿透、建立完善的扫描注册机制以及提供友好的 UI 配置界面,全面解决了工具管理的相关需求。该方案既保证了系统的稳定性和扩展性,又提升了用户的使用体验。
通过对现有代码的分析,我们确认系统已经具备了良好的基础架构,包括:
1. 完善的 AOP 代理支持
2. 工具扫描和注册的核心功能实现
3. 工具调用的日志记录机制
接下来的工作重点应该放在完善前端界面和增强错误处理上,使系统更加易于使用和维护。特别需要关注的是工具诊断功能的实现,这将大大提高系统运维和问题排查的效率。
\ No newline at end of file
This diff is collapsed.
......@@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.8</version>
<version>3.5.9</version>
<relativePath/>
</parent>
......@@ -108,7 +108,6 @@
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-milvus-store</artifactId>
<version>${spring-ai.version}</version>
</dependency>
......@@ -155,14 +154,12 @@
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
</dependency>
<!-- H2 Database -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>
<!-- Redis -->
......@@ -194,14 +191,12 @@
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<!-- Jackson -->
......@@ -234,7 +229,6 @@
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
<version>8.0.1.Final</version>
</dependency>
<!-- SpringDoc OpenAPI for Swagger -->
......@@ -351,7 +345,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>17</source>
<target>17</target>
......@@ -370,7 +363,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<argLine>-Dfile.encoding=UTF-8</argLine>
</configuration>
......
......@@ -9,7 +9,7 @@ import pangea.hiagent.memory.MemoryService;
import pangea.hiagent.memory.SmartHistorySummarizer;
import pangea.hiagent.model.Agent;
import pangea.hiagent.rag.RagService;
import pangea.hiagent.agent.service.AgentErrorHandler;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.agent.service.ErrorHandlerService;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
......@@ -38,8 +38,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
@Autowired
protected ErrorHandlerService errorHandlerService;
@Autowired
protected AgentErrorHandler agentErrorHandler;
// 默认系统提示词
protected static final String DEFAULT_SYSTEM_PROMPT = "你是一个智能助手";
......@@ -135,7 +134,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
* @return 是否为401错误
*/
protected boolean isUnauthorizedError(Throwable e) {
return agentErrorHandler.isUnauthorizedError(e);
return errorHandlerService.isUnauthorizedError(new Exception(e));
}
/**
......@@ -146,7 +145,17 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
* @return 错误消息
*/
protected String handleSyncError(Throwable e, String errorMessagePrefix) {
return agentErrorHandler.handleSyncError(e, errorMessagePrefix);
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(e)) {
log.error("LLM返回401未授权错误: {}", e.getMessage());
return "请配置API密钥";
} else {
String errorMessage = e.getMessage();
if (errorMessage == null || errorMessage.isEmpty()) {
errorMessage = "未知错误";
}
return errorMessagePrefix + ": " + errorMessage;
}
}
/**
......@@ -325,7 +334,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
hasError.set(true);
// 不再重新抛出异常,避免中断流式处理
// 但我们应该记录这个错误并向客户端发送错误信息
agentErrorHandler.sendErrorMessage(tokenConsumer, "[错误] 处理token时发生错误: " + e.getMessage());
errorHandlerService.sendErrorMessage(tokenConsumer, "[错误] 处理token时发生错误: " + e.getMessage());
}
}
} catch (Exception e) {
......@@ -344,7 +353,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
*/
private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer, AtomicBoolean hasError) {
hasError.set(true);
agentErrorHandler.handleStreamError(throwable, tokenConsumer, "流式调用出错");
errorHandlerService.handleStreamError(throwable, tokenConsumer, "流式调用出错");
}
/**
......@@ -415,7 +424,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
* @param isCompleted 是否已完成
*/
private void handleStreamModelError(Consumer<String> tokenConsumer, AtomicBoolean isCompleted) {
agentErrorHandler.sendErrorMessage(tokenConsumer, "[错误] 流式模型或提示词为空,无法启动流式处理");
errorHandlerService.sendErrorMessage(tokenConsumer, "[错误] 流式模型或提示词为空,无法启动流式处理");
// 标记完成
isCompleted.set(true);
}
......@@ -430,7 +439,7 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
*/
private void handleUnexpectedError(Exception e, Consumer<String> tokenConsumer, AtomicBoolean isCompleted) {
String errorMessage = handleSyncError(e, "处理流式响应时发生错误");
agentErrorHandler.sendErrorMessage(tokenConsumer, "[错误] " + errorMessage);
errorHandlerService.sendErrorMessage(tokenConsumer, "[错误] " + errorMessage);
// 确保标记为已完成
isCompleted.set(true);
}
......
......@@ -11,6 +11,7 @@ import pangea.hiagent.rag.RagService;
import pangea.hiagent.web.dto.AgentRequest;
import java.util.function.Consumer;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
/**
* 普通Agent处理器实现类
......@@ -67,7 +68,7 @@ public class NormalAgentProcessor extends BaseAgentProcessor {
return responseContent;
} catch (Exception e) {
return agentErrorHandler.handleSyncError(e, "模型调用失败");
return handleSyncError(e, "模型调用失败");
}
}
......@@ -101,8 +102,15 @@ public class NormalAgentProcessor extends BaseAgentProcessor {
// 流式处理
handleStreamingResponse(tokenConsumer, prompt, streamingChatModel, sessionId);
} catch (Exception e) {
agentErrorHandler.handleStreamError(e, tokenConsumer, "普通Agent流式处理失败");
agentErrorHandler.ensureCompletionCallback(tokenConsumer, "处理请求时发生错误: " + e.getMessage());
errorHandlerService.handleStreamError(e, tokenConsumer, "普通Agent流式处理失败");
// 直接调用完成回调,不依赖AgentErrorHandler
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete("处理请求时发生错误: " + e.getMessage());
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
}
}
......@@ -114,9 +122,15 @@ public class NormalAgentProcessor extends BaseAgentProcessor {
private void handleModelNotSupportStream(Consumer<String> tokenConsumer) {
String errorMessage = "[错误] 当前模型不支持流式输出";
// 发送错误信息
agentErrorHandler.sendErrorMessage(tokenConsumer, errorMessage);
errorHandlerService.sendErrorMessage(tokenConsumer, errorMessage);
// 确保在异常情况下也调用完成回调
agentErrorHandler.ensureCompletionCallback(tokenConsumer, errorMessage);
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
}
@Override
......
......@@ -15,6 +15,7 @@ import pangea.hiagent.web.service.AgentService;
import java.util.List;
import java.util.function.Consumer;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
/**
* ReAct Agent处理器实现类
......@@ -94,7 +95,7 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
return finalAnswer;
} catch (Exception e) {
return agentErrorHandler.handleSyncError(e, "处理ReAct请求时发生错误");
return handleSyncError(e, "处理ReAct请求时发生错误");
}
}
......@@ -139,8 +140,15 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
// 使用ReAct执行器流式执行流程,传递Agent对象以支持记忆功能和用户ID以确保上下文传播
defaultReactExecutor.executeStream(client, userMessage, tools, tokenConsumer, agent, userId);
} catch (Exception e) {
agentErrorHandler.handleStreamError(e, tokenConsumer, "流式处理ReAct请求时发生错误");
agentErrorHandler.ensureCompletionCallback(tokenConsumer, "处理请求时发生错误: " + e.getMessage());
errorHandlerService.handleStreamError(e, tokenConsumer, "流式处理ReAct请求时发生错误");
// 直接调用完成回调,不依赖AgentErrorHandler
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete("处理请求时发生错误: " + e.getMessage());
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
}
}
......@@ -152,8 +160,14 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
private void handleModelNotAvailable(Consumer<String> tokenConsumer) {
String errorMessage = "[错误] 无法获取Agent的聊天模型";
// 发送错误信息
agentErrorHandler.sendErrorMessage(tokenConsumer, errorMessage);
errorHandlerService.sendErrorMessage(tokenConsumer, errorMessage);
// 确保在异常情况下也调用完成回调
agentErrorHandler.ensureCompletionCallback(tokenConsumer, errorMessage);
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.react;
import lombok.Data;
/**
* ReAct步骤对象,包含步骤的所有核心信息
* ReAct步骤类,用于表示ReAct执行过程中的单个步骤
*/
@Data
public class ReactStep {
/**
* 步骤编号
*/
private int stepNumber;
/**
* 步骤类型
*/
private ReactStepType stepType;
/**
* 步骤核心内容(思维描述、动作指令、观察结果等)
*/
private String content;
/**
* 工具调用信息(仅在ACTION步骤时有值)
*/
private ToolCallAction action;
/**
* 工具观察结果(仅在OBSERVATION步骤时有值)
*/
private ToolObservation observation;
/**
* 构造函数
*/
public ReactStep() {}
/**
* 构造函数
* @param stepNumber 步骤编号
* @param stepType 步骤类型
* @param content 步骤内容
*/
public ReactStep(int stepNumber, ReactStepType stepType, String content) {
this.stepNumber = stepNumber;
this.stepType = stepType;
this.content = content;
}
// Getters and Setters
public int getStepNumber() { return stepNumber; }
public void setStepNumber(int stepNumber) { this.stepNumber = stepNumber; }
public ReactStepType getStepType() { return stepType; }
public void setStepType(ReactStepType stepType) { this.stepType = stepType; }
public String getContent() { return content; }
public void setContent(String content) { this.content = content; }
public ToolCallAction getAction() { return action; }
public void setAction(ToolCallAction action) { this.action = action; }
public ToolObservation getObservation() { return observation; }
public void setObservation(ToolObservation observation) { this.observation = observation; }
/**
* 工具调用动作类
* 工具调用动作内部
*/
@Data
public static class ToolCallAction {
/**
* 工具名称
*/
private String toolName;
private Object toolArgs;
/**
* 工具调用参数
*/
private Object parameters;
public ToolCallAction() {}
public ToolCallAction(String toolName, Object parameters) {
public ToolCallAction(String toolName, Object toolArgs) {
this.toolName = toolName;
this.parameters = parameters;
this.toolArgs = toolArgs;
}
public String getToolName() { return toolName; }
public void setToolName(String toolName) { this.toolName = toolName; }
public Object getToolArgs() { return toolArgs; }
public void setToolArgs(Object toolArgs) { this.toolArgs = toolArgs; }
// 根据DefaultReactCallback.java中的使用情况添加getParameters方法
public Object getParameters() { return toolArgs; }
}
/**
* 工具观察结果类
* 工具观察结果内部
*/
@Data
public static class ToolObservation {
/**
* 观察内容
*/
private String content;
public ToolObservation() {}
private String result;
public ToolObservation(String content) {
this.content = content;
public ToolObservation(String result) {
this.result = result;
}
public String getResult() { return result; }
public void setResult(String result) { this.result = result; }
// 根据DefaultReactCallback.java中的使用情况添加getContent方法
public String getContent() { return result; }
}
}
\ No newline at end of file
package pangea.hiagent.agent.react;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.workpanel.IWorkPanelDataCollector;
@Slf4j
@Component
public class TokenTextSegmenter {
@Autowired
private IWorkPanelDataCollector workPanelCollector;
// 定义分段标识(按出现优先级排序)
private static final String[] SEGMENT_MARKERS = {
"Thought:",
"Action:",
"Observation:",
"Final_Answer:"
};
// 当前缓存的输入字符
private StringBuilder currentInputBuffer;
public TokenTextSegmenter() {
currentInputBuffer = new StringBuilder();
}
/**
* 逐字输入字符并处理分段
*
* @param inputChar 单个输入字符
* @return 当分割出有效文本段时返回该段内容,否则返回null
*/
public synchronized void inputChar(String inputChar) {
// 输入验证
if (inputChar == null) {
return;
}
// 将字符加入缓存
currentInputBuffer.append(inputChar);
String currentBufferStr = currentInputBuffer.toString();
log.info("【输入字符】: {}", currentBufferStr);
// 检查当前缓冲区中是否出现任何SEGMENT_MARKERS字段
for (String marker : SEGMENT_MARKERS) {
int markerIndex = currentBufferStr.indexOf(marker);
if (markerIndex != -1) {
// 找到SEGMENT_MARKERS字段,截取该字段之前的文本进行输出
String contentBeforeMarker = currentBufferStr.substring(0, markerIndex);
// 输出截取的内容
outputSegment(marker, contentBeforeMarker);
// 重置缓冲区,保留标识符及之后的内容
currentInputBuffer = new StringBuilder(currentBufferStr.substring(markerIndex));
log.info("【识别到分段标识】: {}", marker);
break; // 找到第一个标识后就处理并退出,避免重复处理
}
}
// 如果没有找到SEGMENT_MARKERS字段,则不输出,等待更多输入
}
/**
* 文本输入结束时,处理最后一个分段
*
* @return 最后一个分段的内容,无则返回null
*/
public synchronized void finishInput() {
// 如果缓冲区还有内容,输出全部剩余内容
if (currentInputBuffer.length() > 0) {
String remainingContent = currentInputBuffer.toString();
// 输出剩余的全部内容,使用一个通用标记或保持原格式
outputSegment("Final_Content:", remainingContent);
// 清空缓冲区
currentInputBuffer.setLength(0);
}
}
/**
* 重置分段状态
*
* @param newStartIndex 新的起始索引
*/
private void resetSegmentState(int newStartIndex) {
// 保留未处理的部分,用于下一个分段
String remainingStr = currentInputBuffer.substring(newStartIndex);
currentInputBuffer = new StringBuilder(remainingStr);
}
/**
* 输出分段内容(封装输出逻辑)
*
* @param marker 分段标识
* @param content 分段内容
* @return 格式化后的分段内容
*/
private void outputSegment(String marker, String content) {
log.info("【分段内容】{}: {}", marker, content);
// 根据实际需求处理事件,这里可能需要创建适当的事件对象而不是传入null
// workPanelCollector.addEvent(null); // 临时注释掉可能引发问题的调用
// 或者创建一个适当的事件对象
// 例如:workPanelCollector.addEvent(new WorkPanelEvent(marker, content));
}
}
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
/**
* Agent错误处理工具类
* 统一处理Agent处理器中的错误逻辑
*/
@Slf4j
@Component
public class AgentErrorHandler {
@Autowired
private ErrorHandlerService errorHandlerService;
/**
* 处理401未授权错误
*
* @param e 异常对象
* @return 是否为401错误
*/
public boolean isUnauthorizedError(Throwable e) {
return errorHandlerService.isUnauthorizedError(new Exception(e));
}
/**
* 处理流式处理中的错误
*
* @param e 异常对象
* @param tokenConsumer token处理回调函数
* @param errorMessagePrefix 错误消息前缀
*/
public void handleStreamError(Throwable e, Consumer<String> tokenConsumer, String errorMessagePrefix) {
errorHandlerService.handleStreamError(e, tokenConsumer, errorMessagePrefix);
}
/**
* 处理同步处理中的错误
*
* @param e 异常对象
* @param errorMessagePrefix 错误消息前缀
* @return 错误消息
*/
public String handleSyncError(Throwable e, String errorMessagePrefix) {
// 检查是否是401 Unauthorized错误
if (isUnauthorizedError(e)) {
log.error("LLM返回401未授权错误: {}", e.getMessage());
return "请配置API密钥";
} else {
String errorMessage = e.getMessage();
if (errorMessage == null || errorMessage.isEmpty()) {
errorMessage = "未知错误";
}
return errorMessagePrefix + ": " + errorMessage;
}
}
/**
* 发送错误信息给客户端
*
* @param tokenConsumer token处理回调函数
* @param errorMessage 错误消息
*/
public void sendErrorMessage(Consumer<String> tokenConsumer, String errorMessage) {
errorHandlerService.sendErrorMessage(tokenConsumer, errorMessage);
}
/**
* 确保在异常情况下也调用完成回调
*
* @param tokenConsumer token处理回调函数
* @param errorMessage 错误消息
*/
public void ensureCompletionCallback(Consumer<String> tokenConsumer, String errorMessage) {
if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(errorMessage);
} catch (NoClassDefFoundError e) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", e.getMessage());
} catch (Exception ex) {
log.error("调用onComplete时发生错误: {}", ex.getMessage(), ex);
}
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.agent.processor.AgentProcessorFactory;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.common.utils.LogUtils;
import pangea.hiagent.common.utils.ValidationUtils;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Agent处理器服务
* 负责处理Agent处理器的获取和心跳机制
*/
@Slf4j
@Service
public class AgentProcessorService {
@Autowired
private AgentProcessorFactory agentProcessorFactory;
@Autowired
private UserSseService workPanelSseService;
@Autowired
private ChatErrorHandler chatErrorHandler;
/**
* 获取处理器并启动心跳保活机制
*
* @param agent Agent对象
* @param emitter SSE发射器
* @return Agent处理器,如果获取失败则返回null
*/
public AgentProcessor getProcessorAndStartHeartbeat(Agent agent, SseEmitter emitter) {
LogUtils.enterMethod("getProcessorAndStartHeartbeat", agent);
// 参数验证
if (ValidationUtils.isNull(agent, "agent")) {
chatErrorHandler.handleChatError(emitter, "Agent对象不能为空");
LogUtils.exitMethod("getProcessorAndStartHeartbeat", "Agent对象不能为空");
return null;
}
if (ValidationUtils.isNull(emitter, "emitter")) {
chatErrorHandler.handleChatError(emitter, "SSE发射器不能为空");
LogUtils.exitMethod("getProcessorAndStartHeartbeat", "SSE发射器不能为空");
return null;
}
try {
// 根据Agent类型选择处理器并处理请求
AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
if (processor == null) {
chatErrorHandler.handleChatError(emitter, "无法获取Agent处理器");
LogUtils.exitMethod("getProcessorAndStartHeartbeat", "无法获取Agent处理器");
return null;
}
log.info("使用{} Agent处理器处理对话", processor.getProcessorType());
// 启动心跳保活机制
workPanelSseService.startHeartbeat(emitter, new AtomicBoolean(false));
LogUtils.exitMethod("getProcessorAndStartHeartbeat", processor);
return processor;
} catch (Exception e) {
chatErrorHandler.handleChatError(emitter, "获取处理器或启动心跳时发生错误", e, null);
LogUtils.exitMethod("getProcessorAndStartHeartbeat", e);
return null;
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.common.utils.LogUtils;
import pangea.hiagent.common.utils.ValidationUtils;
import pangea.hiagent.common.utils.UserUtils;
/**
* Agent验证服务
* 负责处理Agent的参数验证和权限检查
*/
@Slf4j
@Service
public class AgentValidationService {
@Autowired
private AgentService agentService;
@Autowired
private ChatErrorHandler chatErrorHandler;
/**
* 验证Agent存在性和用户权限
*
* @param agentId Agent ID
* @param userId 用户ID
* @param emitter SSE发射器
* @return Agent对象,如果验证失败则返回null
*/
public Agent validateAgentAndPermission(String agentId, String userId, SseEmitter emitter) {
LogUtils.enterMethod("validateAgentAndPermission", agentId, userId);
// 参数验证
if (ValidationUtils.isBlank(agentId, "agentId")) {
chatErrorHandler.handleChatError(emitter, "Agent ID不能为空");
LogUtils.exitMethod("validateAgentAndPermission", "Agent ID不能为空");
return null;
}
if (ValidationUtils.isBlank(userId, "userId")) {
chatErrorHandler.handleChatError(emitter, "用户ID不能为空");
LogUtils.exitMethod("validateAgentAndPermission", "用户ID不能为空");
return null;
}
try {
// 获取Agent信息
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
chatErrorHandler.handleChatError(emitter, "Agent不存在");
LogUtils.exitMethod("validateAgentAndPermission", "Agent不存在");
return null;
}
// 检查权限(可选)
if (!agent.getOwner().equals(userId) && !UserUtils.isAdminUser(userId)) {
chatErrorHandler.handleChatError(emitter, "无权限访问该Agent");
LogUtils.exitMethod("validateAgentAndPermission", "无权限访问该Agent");
return null;
}
LogUtils.exitMethod("validateAgentAndPermission", agent);
return agent;
} catch (Exception e) {
chatErrorHandler.handleChatError(emitter, "验证Agent和权限时发生错误", e, null);
LogUtils.exitMethod("validateAgentAndPermission", e);
return null;
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 聊天服务错误处理工具类
* 统一处理聊天过程中的各种异常情况
* 委托给ErrorHandlerService进行实际处理
*/
@Slf4j
@Component
public class ChatErrorHandler {
@Autowired
private ErrorHandlerService unifiedErrorHandlerService;
/**
* 处理聊天过程中的异常
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
* @param exception 异常对象
* @param processorType 处理器类型(可选)
*/
public void handleChatError(SseEmitter emitter, String errorMessage, Exception exception, String processorType) {
unifiedErrorHandlerService.handleChatError(emitter, errorMessage, exception, processorType);
}
/**
* 处理聊天过程中的异常()
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
*/
public void handleChatError(SseEmitter emitter, String errorMessage) {
unifiedErrorHandlerService.handleChatError(emitter, errorMessage);
}
/**
* 处理Token处理过程中的异常
*
* @param emitter SSE发射器
* @param processorType 处理器类型
* @param exception 异常对象
* @param isCompleted 完成状态标记
*/
public void handleTokenError(SseEmitter emitter, String processorType, Exception exception, AtomicBoolean isCompleted) {
unifiedErrorHandlerService.handleTokenError(emitter, processorType, exception, isCompleted);
}
/**
* 处理完成回调过程中的异常
*
* @param emitter SSE发射器
* @param exception 异常对象
*/
public void handleCompletionError(SseEmitter emitter, Exception exception) {
unifiedErrorHandlerService.handleCompletionError(emitter, exception);
}
/**
* 处理对话记录保存过程中的异常
*
* @param emitter SSE发射器
* @param exception 异常对象
* @param isCompleted 完成状态标记
*/
public void handleSaveDialogueError(SseEmitter emitter, Exception exception, AtomicBoolean isCompleted) {
unifiedErrorHandlerService.handleSaveDialogueError(emitter, exception, isCompleted);
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.model.AgentDialogue;
// import pangea.hiagent.common.utils.ValidationUtils; // 移除这个依赖
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.common.utils.LogUtils;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.web.dto.AgentRequest;
import pangea.hiagent.web.service.AgentService;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 完成回调处理服务
* 负责处理流式输出完成后的回调操作
*/
@Slf4j
@Service
public class CompletionHandlerService {
@Autowired
private AgentService agentService;
@Autowired
private UserSseService unifiedSseService;
@Autowired
private ErrorHandlerService errorHandlerService;
/**
* 处理完成回调
*
* @param emitter SSE发射器
* @param processor Agent处理器
* @param agent Agent对象
* @param request Agent请求
* @param userId 用户ID
* @param fullContent 完整内容
* @param isCompleted 完成状态标记
*/
public void handleCompletion(SseEmitter emitter, AgentProcessor processor, Agent agent,
AgentRequest request, String userId,
String fullContent, AtomicBoolean isCompleted) {
LogUtils.enterMethod("handleCompletion", emitter, processor, agent, request, userId);
// 参数验证 - 内联验证逻辑,避免依赖ValidationUtils
if (emitter == null) {
log.error("SSE发射器不能为空");
LogUtils.exitMethod("handleCompletion", "SSE发射器不能为空");
return;
}
if (processor == null) {
log.error("Agent处理器不能为空");
LogUtils.exitMethod("handleCompletion", "Agent处理器不能为空");
return;
}
if (agent == null) {
log.error("Agent对象不能为空");
LogUtils.exitMethod("handleCompletion", "Agent对象不能为空");
return;
}
if (request == null) {
log.error("Agent请求不能为空");
LogUtils.exitMethod("handleCompletion", "Agent请求不能为空");
return;
}
if (userId == null || userId.trim().isEmpty()) {
log.error("用户ID不能为空");
LogUtils.exitMethod("handleCompletion", "用户ID不能为空");
return;
}
if (isCompleted == null) {
log.error("完成状态标记不能为空");
LogUtils.exitMethod("handleCompletion", "完成状态标记不能为空");
return;
}
log.info("{} Agent处理完成,总字符数: {}", processor.getProcessorType(), fullContent != null ? fullContent.length() : 0);
// 严格按照正确的SSE连接关闭顺序:
// 1. 先完成所有业务处理:保存对话记录等
// 2. 发送完成信号:发送[DONE]信号
// 3. 关闭SSE连接:调用emitter.complete()方法完成连接
// 4. 取消心跳任务:清理相关的ScheduledFuture心跳任务
// 5. 移除连接映射:从连接管理器(userEmitters、emitterUsers、emitters)中移除连接映射
// 1. 完成所有业务处理:保存对话记录
Exception completionException = null;
try {
saveDialogue(agent, request, userId, fullContent);
log.info("对话记录保存成功");
} catch (Exception e) {
log.error("保存对话记录失败", e);
// 记录异常但不中断流程
completionException = e;
}
log.debug("业务处理完成,准备发送完成信号");
// 2. 发送完成信号:发送[DONE]信号
try {
// 检查emitter是否已经完成,避免向已完成的连接发送数据
if (!unifiedSseService.isEmitterCompleted(emitter)) {
// 发送完成信号
emitter.send(SseEmitter.event().name("done").data("[DONE]").build());
log.debug("完成信号已发送");
} else {
log.debug("SSE emitter已完成,跳过发送完成信号");
}
} catch (Exception e) {
log.error("发送完成信号失败", e);
completionException = e;
}
log.debug("完成信号发送完毕,准备关闭SSE连接");
// 3. 关闭SSE连接:调用emitter.complete()方法完成连接
try {
// 检查emitter是否已经完成,避免重复关闭
if (!unifiedSseService.isEmitterCompleted(emitter)) {
unifiedSseService.completeEmitter(emitter, isCompleted);
log.debug("SSE Emitter已关闭");
} else {
log.debug("SSE Emitter已完成,跳过关闭操作");
}
} catch (Exception e) {
log.error("关闭Emitter时发生错误", e);
}
LogUtils.exitMethod("handleCompletion", "处理完成");
}
/**
* 保存对话记录
*/
public void saveDialogue(Agent agent, AgentRequest request, String userId, String responseContent) {
LogUtils.enterMethod("saveDialogue", agent, request, userId);
// 参数验证 - 内联验证逻辑,避免依赖ValidationUtils
if (agent == null) {
log.error("Agent对象不能为空");
LogUtils.exitMethod("saveDialogue", "Agent对象不能为空");
return;
}
if (request == null) {
log.error("Agent请求不能为空");
LogUtils.exitMethod("saveDialogue", "Agent请求不能为空");
return;
}
if (userId == null || userId.trim().isEmpty()) {
log.error("用户ID不能为空");
LogUtils.exitMethod("saveDialogue", "用户ID不能为空");
return;
}
try {
// 创建对话记录
AgentDialogue dialogue = AgentDialogue.builder()
.agentId(request.getAgentId())
.userMessage(request.getUserMessage())
.agentResponse(responseContent)
.userId(userId)
.build();
// 确保ID被设置
if (dialogue.getId() == null || dialogue.getId().isEmpty()) {
dialogue.setId(java.util.UUID.randomUUID().toString());
}
// 设置创建人和更新人信息
// 在异步线程中获取用户ID
String currentUserId = UserUtils.getCurrentUserIdInAsync();
if (currentUserId == null) {
currentUserId = userId; // 回退到传入的userId
}
dialogue.setCreatedBy(currentUserId);
dialogue.setUpdatedBy(currentUserId);
// 保存对话记录
agentService.saveDialogue(dialogue);
LogUtils.exitMethod("saveDialogue", "保存成功");
} catch (Exception e) {
log.error("保存对话记录失败", e);
LogUtils.exitMethod("saveDialogue", e);
throw new RuntimeException("保存对话记录失败", e);
}
}
}
\ No newline at end of file
......@@ -5,8 +5,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.workpanel.event.EventService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
......@@ -18,9 +16,6 @@ import java.util.function.Consumer;
@Service
public class ErrorHandlerService {
@Autowired
private EventService eventService;
@Autowired
private ExceptionMonitoringService exceptionMonitoringService;
......@@ -137,7 +132,7 @@ public class ErrorHandlerService {
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, processorType);
eventService.sendErrorEvent(emitter, fullErrorMessage);
userSseService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
......@@ -167,7 +162,7 @@ public class ErrorHandlerService {
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String fullErrorMessage = buildFullErrorMessage(errorMessage, null, errorId, null);
eventService.sendErrorEvent(emitter, fullErrorMessage);
userSseService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
......@@ -209,7 +204,7 @@ public class ErrorHandlerService {
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String errorMessage = "处理响应时发生错误";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, processorType);
eventService.sendErrorEvent(emitter, fullErrorMessage);
userSseService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
......@@ -247,7 +242,7 @@ public class ErrorHandlerService {
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String errorMessage = "发送完成事件失败,请联系技术支持";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, "完成回调");
eventService.sendErrorEvent(emitter, fullErrorMessage);
userSseService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
......@@ -362,7 +357,7 @@ public class ErrorHandlerService {
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String errorMessage = "保存对话记录失败,请联系技术支持";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, "对话记录");
eventService.sendErrorEvent(emitter, fullErrorMessage);
userSseService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
......
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 错误处理工具类
* 提供统一的错误处理方法,减少重复代码
* 委托给ErrorHandlerService进行实际处理
*/
@Slf4j
@Component
public class ErrorHandlerUtils {
private static ErrorHandlerService errorHandlerService;
public ErrorHandlerUtils(ErrorHandlerService errorHandlerService) {
ErrorHandlerUtils.errorHandlerService = errorHandlerService;
}
/**
* 构建完整的错误消息
*
* @param errorMessage 基本错误信息
* @param exception 异常对象
* @param errorId 错误跟踪ID
* @param processorType 处理器类型
* @return 完整的错误消息
*/
public static String buildFullErrorMessage(String errorMessage, Exception exception, String errorId, String processorType) {
return errorHandlerService.buildFullErrorMessage(errorMessage, exception, errorId, processorType);
}
/**
* 检查是否为未授权错误
*
* @param exception 异常对象
* @return 是否为未授权错误
*/
public static boolean isUnauthorizedError(Exception exception) {
return errorHandlerService.isUnauthorizedError(exception);
}
/**
* 检查是否为超时错误
*
* @param exception 异常对象
* @return 是否为超时错误
*/
public static boolean isTimeoutError(Exception exception) {
return errorHandlerService.isTimeoutError(exception);
}
/**
* 生成错误跟踪ID
*
* @return 错误跟踪ID
*/
public static String generateErrorId() {
return errorHandlerService.generateErrorId();
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.dto.AgentRequest;
/**
* SSE Token发射器
* 专注于将token转换为SSE事件并发送
*/
@Slf4j
@Component
public class SseTokenEmitter implements TokenConsumerWithCompletion {
private final UserSseService userSseService;
// 当前处理的emitter
private SseEmitter emitter;
// 上下文信息
private Agent agent;
private AgentRequest request;
private String userId;
// 完成回调
private CompletionCallback completionCallback;
public SseTokenEmitter(UserSseService userSseService) {
this.userSseService = userSseService;
}
/**
* 设置当前使用的SSE发射器
*/
public void setEmitter(SseEmitter emitter) {
this.emitter = emitter;
}
/**
* 设置上下文信息
*/
public void setContext(Agent agent, AgentRequest request, String userId) {
this.agent = agent;
this.request = request;
this.userId = userId;
}
/**
* 设置完成回调
*/
public void setCompletionCallback(CompletionCallback completionCallback) {
this.completionCallback = completionCallback;
}
@Override
public void accept(String token) {
// 使用JSON格式发送token,确保转义序列被正确处理
try {
if (emitter != null && userSseService.isEmitterValidSafe(emitter)) {
// 检查是否是错误消息(以[错误]或[ERROR]开头)
if (token != null && (token.startsWith("[错误]") || token.startsWith("[ERROR]"))) {
// 发送标准错误事件而不是纯文本
userSseService.sendErrorEvent(emitter, token);
} else {
// 使用SSE标准事件格式发送token,以JSON格式确保转义序列正确处理
userSseService.sendTokenEvent(emitter, token);
}
} else {
log.debug("SSE emitter已无效,跳过发送token");
}
} catch (Exception e) {
log.error("发送token失败", e);
}
}
@Override
public void onComplete(String fullContent) {
try {
if (emitter != null && !userSseService.isEmitterCompleted(emitter)) {
// 发送完成事件
emitter.send(SseEmitter.event().name("done").data("[DONE]").build());
log.debug("完成信号已发送");
}
// 调用完成回调
if (completionCallback != null) {
completionCallback.onComplete(emitter, agent, request, userId, fullContent);
}
} catch (Exception e) {
log.error("处理完成事件失败", e);
} finally {
// 关闭连接
closeEmitter();
}
}
/**
* 安全关闭SSE连接
*/
public void closeEmitter() {
try {
if (emitter != null && !userSseService.isEmitterCompleted(emitter)) {
emitter.complete();
log.debug("SSE连接已关闭");
}
} catch (Exception ex) {
log.error("完成emitter时发生错误", ex);
}
}
/**
* 完成回调接口
*/
@FunctionalInterface
public interface CompletionCallback {
void onComplete(SseEmitter emitter, Agent agent, AgentRequest request, String userId, String fullContent);
}
}
\ No newline at end of file
package pangea.hiagent.agent.sse;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.web.dto.WorkPanelEvent;
import pangea.hiagent.workpanel.event.EventService;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.function.Consumer;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* SSE连接协调器
* 专门负责协调SSE连接的创建、管理和销毁过程
*/
@Slf4j
@Component
public class SseConnectionCoordinator {
private final UserSseService unifiedSseService;
private final EventService eventService;
public SseConnectionCoordinator(
UserSseService unifiedSseService,
EventService eventService) {
this.unifiedSseService = unifiedSseService;
this.eventService = eventService;
}
/**
* 创建并注册SSE连接
*
* @param userId 用户ID
* @return SSE Emitter
*/
public SseEmitter createAndRegisterConnection(String userId) {
log.debug("开始为用户 {} 创建SSE连接", userId);
// 创建 SSE emitter
SseEmitter emitter = unifiedSseService.createEmitter();
log.debug("SSE Emitter创建成功");
// 注册用户的SSE连接
unifiedSseService.registerSession(userId, emitter);
log.debug("用户 {} 的SSE连接注册成功", userId);
// 注册 emitter 回调
unifiedSseService.registerCallbacks(emitter, userId);
log.debug("SSE Emitter回调注册成功");
// 启动心跳机制
unifiedSseService.startHeartbeat(emitter, new AtomicBoolean(false));
log.debug("心跳机制启动成功");
log.info("用户 {} 的SSE连接创建和注册完成", userId);
return emitter;
}
/**
* 订阅工作面板事件
*
* @param userId 用户ID
* @param workPanelEventConsumer 工作面板事件消费者
* @param emitter SSE Emitter
*/
public void subscribeToWorkPanelEvents(String userId, Consumer<WorkPanelEvent> workPanelEventConsumer, SseEmitter emitter) {
log.debug("开始为用户 {} 订阅工作面板事件", userId);
// 发送连接成功事件
try {
WorkPanelEvent connectedEvent = WorkPanelEvent.builder()
.type("observation")
.title("连接成功")
.timestamp(System.currentTimeMillis())
.build();
eventService.sendWorkPanelEvent(emitter, connectedEvent);
log.debug("已发送连接成功事件");
} catch (Exception e) {
log.error("发送连接成功事件失败", e);
}
log.info("用户 {} 的工作面板事件订阅完成", userId);
}
}
\ No newline at end of file
package pangea.hiagent.agent.sse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* 用户会话管理器
* 专门负责管理用户的SSE会话连接
*/
@Slf4j
@Component
public class UserSseManager {
// 存储用户ID到SSE Emitter的映射关系
private final ConcurrentMap<String, SseEmitter> userEmitters = new ConcurrentHashMap<>();
// 存储SSE Emitter到用户ID的反向映射关系(用于快速查找)
private final ConcurrentMap<SseEmitter, String> emitterUsers = new ConcurrentHashMap<>();
// 存储连接创建时间,用于超时检查
private final ConcurrentMap<SseEmitter, AtomicLong> connectionTimes = new ConcurrentHashMap<>();
// 连接超时时间(毫秒),默认30分钟
private static final long CONNECTION_TIMEOUT = 30 * 60 * 1000L;
/**
* 注册用户的SSE连接
* 如果该用户已有连接,则先关闭旧连接再注册新连接
*
* @param userId 用户ID
* @param emitter SSE Emitter
* @return true表示注册成功,false表示注册失败
*/
public boolean registerSession(String userId, SseEmitter emitter) {
if (userId == null || userId.isEmpty() || emitter == null) {
log.warn("注册SSE会话失败:用户ID或Emitter为空");
return false;
}
try {
// 检查该用户是否已有连接
SseEmitter existingEmitter = userEmitters.get(userId);
if (existingEmitter != null) {
// 如果已有连接,先关闭旧连接
log.info("用户 {} 已有SSE连接,关闭旧连接", userId);
closeEmitter(existingEmitter);
// 从映射中移除旧连接
userEmitters.remove(userId, existingEmitter);
emitterUsers.remove(existingEmitter);
}
// 注册新连接
userEmitters.put(userId, emitter);
emitterUsers.put(emitter, userId);
// 记录连接创建时间
connectionTimes.put(emitter, new AtomicLong(System.currentTimeMillis()));
log.info("成功为用户 {} 注册SSE会话", userId);
return true;
} catch (Exception e) {
log.error("注册SSE会话时发生异常:用户ID={}", userId, e);
return false;
}
}
/**
* 移除用户的SSE会话
*
* @param emitter SSE Emitter
*/
public void removeSession(SseEmitter emitter) {
if (emitter == null) {
return;
}
try {
String userId = emitterUsers.get(emitter);
if (userId != null) {
userEmitters.remove(userId, emitter);
emitterUsers.remove(emitter);
connectionTimes.remove(emitter);
log.debug("已移除用户 {} 的SSE会话", userId);
}
} catch (Exception e) {
log.warn("移除SSE会话时发生异常", e);
}
}
/**
* 获取用户的当前SSE连接
*
* @param userId 用户ID
* @return SSE Emitter,如果不存在则返回null
*/
public SseEmitter getSession(String userId) {
if (userId == null || userId.isEmpty()) {
return null;
}
return userEmitters.get(userId);
}
/**
* 检查用户是否有活跃的SSE连接
*
* @param userId 用户ID
* @return true表示有活跃连接,false表示没有
*/
public boolean hasActiveSession(String userId) {
if (userId == null || userId.isEmpty()) {
return false;
}
SseEmitter emitter = userEmitters.get(userId);
return emitter != null && isEmitterValid(emitter) && !isSessionExpired(emitter);
}
/**
* 检查SSE Emitter是否仍然有效
*
* @param emitter 要检查的emitter
* @return 如果有效返回true,否则返回false
*/
public boolean isEmitterValid(SseEmitter emitter) {
if (emitter == null) {
return false;
}
// 检查emitter是否已完成
try {
// 尝试发送一个空事件来检查连接状态
emitter.send(SseEmitter.event().name("ping").data("").build());
return true;
} catch (Exception e) {
// 如果出现任何异常,认为连接已失效
return false;
}
}
/**
* 检查会话是否已过期
*
* @param emitter 要检查的emitter
* @return 如果过期返回true,否则返回false
*/
public boolean isSessionExpired(SseEmitter emitter) {
if (emitter == null) {
return true;
}
AtomicLong connectionTime = connectionTimes.get(emitter);
if (connectionTime == null) {
return false; // 如果没有记录时间,假设未过期
}
long currentTime = System.currentTimeMillis();
long connectionStartTime = connectionTime.get();
return (currentTime - connectionStartTime) > CONNECTION_TIMEOUT;
}
/**
* 关闭指定的SSE Emitter
*
* @param emitter SSE Emitter
*/
public void closeEmitter(SseEmitter emitter) {
if (emitter == null) {
return;
}
try {
emitter.complete();
log.debug("已关闭SSE Emitter");
} catch (Exception e) {
log.warn("关闭SSE Emitter时发生异常", e);
}
}
/**
* 获取当前会话的用户数量
*
* @return 用户数量
*/
public int getSessionCount() {
return userEmitters.size();
}
/**
* 清理所有会话(用于系统关闭时)
*/
public void clearAllSessions() {
try {
for (SseEmitter emitter : emitterUsers.keySet()) {
try {
emitter.complete();
} catch (Exception e) {
log.warn("关闭SSE Emitter时发生异常", e);
}
}
userEmitters.clear();
emitterUsers.clear();
connectionTimes.clear();
log.info("已清理所有SSE会话");
} catch (Exception e) {
log.error("清理所有SSE会话时发生异常", e);
}
}
/**
* 处理连接完成事件
* 职责:协调完成连接的清理工作
*
* @param emitter SSE Emitter
*/
public void handleConnectionCompletion(SseEmitter emitter) {
log.debug("处理SSE连接完成事件");
// 移除连接
removeSession(emitter);
}
/**
* 处理连接超时事件
* 职责:协调超时连接的清理工作
*
* @param emitter SSE Emitter
*/
public void handleConnectionTimeout(SseEmitter emitter) {
log.debug("处理SSE连接超时事件");
// 移除连接
removeSession(emitter);
}
/**
* 处理连接错误事件
* 职责:协调错误连接的清理工作
*
* @param emitter SSE Emitter
*/
public void handleConnectionError(SseEmitter emitter) {
log.debug("处理SSE连接错误事件");
// 移除连接
removeSession(emitter);
}
}
\ No newline at end of file
......@@ -5,9 +5,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.reflection.MetaObject;
import org.springframework.stereotype.Component;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.common.utils.UserContextPropagationUtil;
import pangea.hiagent.common.utils.AsyncUserContextDecorator;
import java.time.LocalDateTime;
/**
......
......@@ -206,10 +206,10 @@ public class SecurityConfig {
}
})
)
// 添加SSE授权检查过滤器,在所有其他过滤器之前运行,提前拒绝未认证的SSE请求
.addFilterBefore(sseAuthorizationFilter, UsernamePasswordAuthenticationFilter.class)
// 添加JWT认证过滤器
.addFilterBefore(jwtAuthenticationFilter, UsernamePasswordAuthenticationFilter.class)
// 添加SSE授权检查过滤器,在JWT过滤器之后但在其他安全过滤器之前运行
.addFilterAfter(sseAuthorizationFilter, JwtAuthenticationFilter.class)
// 配置X-Frame-Options头部,允许同源iframe嵌入
.headers(headers -> headers
.frameOptions(frameOptions -> frameOptions
......
......@@ -218,22 +218,20 @@ public class GlobalExceptionHandler {
boolean isSseEndpoint = requestUri.contains("/api/v1/agent/chat-stream") || requestUri.contains("/api/v1/agent/timeline-events");
// 检查响应是否已经提交
if (request instanceof jakarta.servlet.http.HttpServletRequest) {
jakarta.servlet.http.HttpServletRequest httpRequest = (jakarta.servlet.http.HttpServletRequest) request;
// 获取当前的response对象
jakarta.servlet.http.HttpServletResponse response = null;
jakarta.servlet.http.HttpServletResponse httpResponse = null;
if (org.springframework.web.context.request.RequestContextHolder.getRequestAttributes() != null) {
Object nativeResponse = ((org.springframework.web.context.request.ServletWebRequest)
org.springframework.web.context.request.RequestContextHolder
.getRequestAttributes()).getNativeResponse();
if (nativeResponse instanceof jakarta.servlet.http.HttpServletResponse) {
response = (jakarta.servlet.http.HttpServletResponse) nativeResponse;
Object requestAttributes = org.springframework.web.context.request.RequestContextHolder
.getRequestAttributes();
if (requestAttributes instanceof org.springframework.web.context.request.ServletRequestAttributes) {
org.springframework.web.context.request.ServletRequestAttributes servletRequestAttributes =
(org.springframework.web.context.request.ServletRequestAttributes) requestAttributes;
if (servletRequestAttributes.getResponse() instanceof jakarta.servlet.http.HttpServletResponse) {
httpResponse = (jakarta.servlet.http.HttpServletResponse) servletRequestAttributes.getResponse();
}
}
// 检查响应是否已提交
if (response != null && response.isCommitted()) {
if (httpResponse != null && httpResponse.isCommitted()) {
log.warn("响应已提交,无法发送访问拒绝错误: {}", request.getRequestURL());
// 如果是SSE端点且响应已提交,返回空响应避免二次异常
return ResponseEntity.ok().build();
......@@ -243,26 +241,29 @@ public class GlobalExceptionHandler {
// 如果是SSE端点,但响应未提交,发送SSE格式的错误响应
if (isSseEndpoint) {
try {
jakarta.servlet.http.HttpServletResponse response = null;
jakarta.servlet.http.HttpServletResponse sseResponse = null;
if (org.springframework.web.context.request.RequestContextHolder.getRequestAttributes() != null) {
Object nativeResponse = ((org.springframework.web.context.request.ServletWebRequest)
org.springframework.web.context.request.RequestContextHolder
.getRequestAttributes()).getNativeResponse();
if (nativeResponse instanceof jakarta.servlet.http.HttpServletResponse) {
response = (jakarta.servlet.http.HttpServletResponse) nativeResponse;
Object requestAttributes = org.springframework.web.context.request.RequestContextHolder
.getRequestAttributes();
if (requestAttributes instanceof org.springframework.web.context.request.ServletRequestAttributes) {
org.springframework.web.context.request.ServletRequestAttributes servletRequestAttributes =
(org.springframework.web.context.request.ServletRequestAttributes) requestAttributes;
if (servletRequestAttributes.getResponse() instanceof jakarta.servlet.http.HttpServletResponse) {
sseResponse = (jakarta.servlet.http.HttpServletResponse) servletRequestAttributes.getResponse();
}
}
}
if (response != null) {
response.setStatus(HttpServletResponse.SC_FORBIDDEN);
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
if (sseResponse != null) {
sseResponse.setStatus(HttpServletResponse.SC_FORBIDDEN);
sseResponse.setContentType("text/event-stream;charset=UTF-8");
sseResponse.setCharacterEncoding("UTF-8");
// 发送SSE格式的错误事件
response.getWriter().write("event: error\n");
response.getWriter().write("data: {\"error\": \"访问被拒绝,无权限访问该资源\", \"code\": 403, \"timestamp\": " +
sseResponse.getWriter().write("event: error\n");
sseResponse.getWriter().write("data: {\"error\": \"访问被拒绝,无权限访问该资源\", \"code\": 403, \"timestamp\": " +
System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
sseResponse.getWriter().flush();
log.debug("已发送SSE 访问拒绝错误响应");
}
......@@ -279,9 +280,9 @@ public class GlobalExceptionHandler {
.details("您没有权限执行此操作")
.build();
ApiResponse<Void> response = ApiResponse.error(ErrorCode.FORBIDDEN.getCode(),
ApiResponse<Void> finalResponse = ApiResponse.error(ErrorCode.FORBIDDEN.getCode(),
ErrorCode.FORBIDDEN.getMessage(), errorDetail);
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(response);
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(finalResponse);
}
/**
......
package pangea.hiagent.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.concurrent.Callable;
/**
......@@ -13,6 +17,79 @@ import java.util.concurrent.Callable;
@Component
public class AsyncUserContextDecorator {
/**
* 用户上下文持有者类,用于在异步线程间传递认证信息
*/
public static class UserContextHolder implements Serializable {
private static final long serialVersionUID = 1L;
private final Authentication authentication;
public UserContextHolder(Authentication authentication) {
this.authentication = authentication;
}
public Authentication getAuthentication() {
return authentication;
}
}
/**
* 捕获当前线程的用户上下文
* @return 用户上下文持有者对象
*/
public static UserContextHolder captureUserContext() {
try {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication != null) {
log.debug("捕获到当前线程的用户认证信息: {}", authentication.getPrincipal());
return new UserContextHolder(authentication);
} else {
log.debug("当前线程无用户认证信息");
return null;
}
} catch (Exception e) {
log.error("捕获用户上下文时发生异常", e);
return null;
}
}
/**
* 将用户上下文传播到当前线程
* @param userContextHolder 用户上下文持有者对象
*/
public static void propagateUserContext(UserContextHolder userContextHolder) {
try {
if (userContextHolder != null) {
Authentication authentication = userContextHolder.getAuthentication();
if (authentication != null) {
SecurityContext context = SecurityContextHolder.createEmptyContext();
context.setAuthentication(authentication);
SecurityContextHolder.setContext(context);
log.debug("已将用户认证信息传播到当前线程: {}", authentication.getPrincipal());
} else {
log.debug("用户上下文持有者中的认证信息为空");
}
} else {
log.debug("用户上下文持有者为空");
}
} catch (Exception e) {
log.error("传播用户上下文时发生异常", e);
}
}
/**
* 清理当前线程的用户上下文
*/
public static void clearUserContext() {
try {
SecurityContextHolder.clearContext();
log.debug("已清理当前线程的用户上下文");
} catch (Exception e) {
log.error("清理用户上下文时发生异常", e);
}
}
/**
* 包装Runnable任务,自动传播用户上下文
* @param runnable 原始任务
......@@ -20,18 +97,18 @@ public class AsyncUserContextDecorator {
*/
public static Runnable wrapWithContext(Runnable runnable) {
// 捕获当前线程的用户上下文
UserContextPropagationUtil.UserContextHolder userContext = UserContextPropagationUtil.captureUserContext();
UserContextHolder userContext = captureUserContext();
return () -> {
try {
// 在异步线程中传播用户上下文
UserContextPropagationUtil.propagateUserContext(userContext);
propagateUserContext(userContext);
// 执行原始任务
runnable.run();
} finally {
// 清理当前线程的用户上下文
UserContextPropagationUtil.clearUserContext();
clearUserContext();
}
};
}
......@@ -44,18 +121,18 @@ public class AsyncUserContextDecorator {
*/
public static <V> Callable<V> wrapWithContext(Callable<V> callable) {
// 捕获当前线程的用户上下文
UserContextPropagationUtil.UserContextHolder userContext = UserContextPropagationUtil.captureUserContext();
UserContextHolder userContext = captureUserContext();
return () -> {
try {
// 在异步线程中传播用户上下文
UserContextPropagationUtil.propagateUserContext(userContext);
propagateUserContext(userContext);
// 执行原始任务
return callable.call();
} finally {
// 清理当前线程的用户上下文
UserContextPropagationUtil.clearUserContext();
clearUserContext();
}
};
}
......
package pangea.hiagent.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 异步用户上下文使用示例
* 展示如何在异步任务中正确获取用户认证信息
*/
@Slf4j
@Component
public class AsyncUserContextUsageExample {
// 示例线程池
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* 方式一:使用SecurityContextHolder的InheritableThreadLocal策略(推荐)
* 适用于父子线程关系明确的场景
*/
public void executeTaskWithInheritableThreadLocal() {
// 在主线程中获取用户ID(正常情况下可以获取到)
String userId = UserUtils.getCurrentUserId();
log.info("主线程中获取到用户ID: {}", userId);
// 提交异步任务,由于使用了InheritableThreadLocal策略,子线程可以继承父线程的SecurityContext
CompletableFuture.runAsync(() -> {
// 在异步线程中获取用户ID
String asyncUserId = UserUtils.getCurrentUserId();
log.info("异步线程中获取到用户ID: {}", asyncUserId);
// 执行业务逻辑
performBusinessLogic(asyncUserId);
}, executorService);
}
/**
* 方式二:使用UserContextPropagationUtil手动传播用户上下文
* 适用于复杂的异步场景或需要更精确控制的场景
*/
public void executeTaskWithManualPropagation() {
// 在主线程中获取用户ID
String userId = UserUtils.getCurrentUserId();
log.info("主线程中获取到用户ID: {}", userId);
// 提交异步任务,手动传播用户上下文
CompletableFuture.runAsync(AsyncUserContextDecorator.wrapWithContext(() -> {
// 在异步线程中获取用户ID
String asyncUserId = UserUtils.getCurrentUserId();
log.info("异步线程中获取到用户ID: {}", asyncUserId);
// 执行业务逻辑
performBusinessLogic(asyncUserId);
}), executorService);
}
/**
* 方式三:使用专门的异步环境获取方法
* 适用于无法通过线程上下文传播获取用户信息的场景
*/
public void executeTaskWithDirectTokenParsing() {
// 在主线程中获取用户ID
String userId = UserUtils.getCurrentUserId();
log.info("主线程中获取到用户ID: {}", userId);
// 提交异步任务,直接解析请求中的token获取用户ID
CompletableFuture.runAsync(() -> {
// 在异步线程中通过直接解析token获取用户ID
String asyncUserId = UserUtils.getCurrentUserIdInAsync();
log.info("异步线程中通过直接解析token获取到用户ID: {}", asyncUserId);
// 执行业务逻辑
performBusinessLogic(asyncUserId);
}, executorService);
}
/**
* 执行业务逻辑示例
* @param userId 用户ID
*/
private void performBusinessLogic(String userId) {
if (userId != null) {
log.info("为用户 {} 执行业务逻辑", userId);
// 这里执行具体的业务逻辑
} else {
log.warn("未获取到用户ID,执行匿名用户逻辑");
// 这里执行匿名用户的业务逻辑
}
}
/**
* 清理资源
*/
public void shutdown() {
executorService.shutdown();
}
}
\ No newline at end of file
package pangea.hiagent.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import java.io.Serializable;
/**
* 用户上下文传播工具类
* 用于在异步线程间传播用户认证信息
*/
@Slf4j
@Component
public class UserContextPropagationUtil {
/**
* 用户上下文持有者类,用于在异步线程间传递认证信息
*/
public static class UserContextHolder implements Serializable {
private static final long serialVersionUID = 1L;
private final Authentication authentication;
public UserContextHolder(Authentication authentication) {
this.authentication = authentication;
}
public Authentication getAuthentication() {
return authentication;
}
}
/**
* 捕获当前线程的用户上下文
* @return 用户上下文持有者对象
*/
public static UserContextHolder captureUserContext() {
try {
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication != null) {
log.debug("捕获到当前线程的用户认证信息: {}", authentication.getPrincipal());
return new UserContextHolder(authentication);
} else {
log.debug("当前线程无用户认证信息");
return null;
}
} catch (Exception e) {
log.error("捕获用户上下文时发生异常", e);
return null;
}
}
/**
* 将用户上下文传播到当前线程
* @param userContextHolder 用户上下文持有者对象
*/
public static void propagateUserContext(UserContextHolder userContextHolder) {
try {
if (userContextHolder != null) {
Authentication authentication = userContextHolder.getAuthentication();
if (authentication != null) {
SecurityContext context = SecurityContextHolder.createEmptyContext();
context.setAuthentication(authentication);
SecurityContextHolder.setContext(context);
log.debug("已将用户认证信息传播到当前线程: {}", authentication.getPrincipal());
} else {
log.debug("用户上下文持有者中的认证信息为空");
}
} else {
log.debug("用户上下文持有者为空");
}
} catch (Exception e) {
log.error("传播用户上下文时发生异常", e);
}
}
/**
* 清理当前线程的用户上下文
*/
public static void clearUserContext() {
try {
SecurityContextHolder.clearContext();
log.debug("已清理当前线程的用户上下文");
} catch (Exception e) {
log.error("清理用户上下文时发生异常", e);
}
}
}
\ No newline at end of file
......@@ -115,22 +115,21 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
if (isStreamEndpoint || isTimelineEndpoint) {
log.debug("SSE端点授权检查: {} {}", request.getMethod(), requestUri);
// 尝试从请求中提取并验证JWT token
String token = extractTokenFromRequest(request);
// 检查响应是否已经提交,避免后续错误处理异常
if (response.isCommitted()) {
log.warn("响应已提交,无法处理SSE端点授权检查");
return;
}
// 从SecurityContext获取当前认证用户
String userId = null;
var authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication != null && authentication.isAuthenticated() && !"anonymousUser".equals(authentication.getPrincipal())) {
userId = authentication.getName();
}
if (StringUtils.hasText(token)) {
log.debug("提取到JWT token,进行验证");
try {
// 验证token是否有效
if (jwtUtil.validateToken(token)) {
String userId = jwtUtil.getUserIdFromToken(token);
if (userId != null) {
// 创建认证对象
List<SimpleGrantedAuthority> authorities = Collections.singletonList(new SimpleGrantedAuthority("ROLE_USER"));
UsernamePasswordAuthenticationToken authentication =
new UsernamePasswordAuthenticationToken(userId, null, authorities);
SecurityContextHolder.getContext().setAuthentication(authentication);
log.debug("SSE端点JWT验证成功,用户: {}", userId);
log.debug("SSE端点已认证,用户: {}", userId);
// 如果是chat-stream端点,需要额外验证agent权限
if (isStreamEndpoint) {
......@@ -168,18 +167,13 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
// 继续执行过滤器链
filterChain.doFilter(request, response);
return;
}
}
} catch (Exception e) {
log.warn("SSE端点JWT验证失败: {}", e.getMessage());
}
}
// token无效或不存在,拒绝连接
} else {
// 用户未认证,拒绝连接
log.warn("SSE端点未认证访问,拒绝连接: {} {}", request.getMethod(), requestUri);
sendSseUnauthorizedError(response);
return;
}
}
// 继续执行过滤器链(非SSE端点)
filterChain.doFilter(request, response);
......
......@@ -23,10 +23,6 @@ import java.util.*;
@Component
public class EmailTools {
private Integer defaultPop3Port = 995;
private String defaultAttachmentPath = "attachments";
private Boolean pop3SslEnable = true;
private String pop3SocketFactoryClass = "javax.net.ssl.SSLSocketFactory";
......
......@@ -9,18 +9,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.stringtemplate.v4.compiler.CodeGenerator.primary_return;
import jakarta.annotation.PreDestroy;
import java.io.File;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import pangea.hiagent.web.service.ToolConfigService;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
......@@ -119,44 +111,6 @@ public class HisenseSsoLoginTool {
return playwrightManager.getUserContext(getUserName());
}
// 存储目录路径
private static final String STORAGE_DIR = "storage";
/**
* 延迟初始化浏览器实例引用和共享上下文
*/
// private void initializeIfNeeded() {
// if (browser == null || sharedContext == null) {
// try {
// log.info("正在初始化海信SSO认证工具的Playwright...");
// // 从Playwright管理器获取共享的浏览器实例
// this.browser = playwrightManager.getBrowser();
// // 初始化共享上下文
// this.sharedContext = browser.newContext();
// log.info("海信SSO认证工具的Playwright初始化成功");
// } catch (Exception e) {
// log.error("海信SSO认证工具的Playwright初始化失败: ", e);
// }
// }
// }
// 移除@PostConstruct注解以避免在启动时初始化
/*
* @PostConstruct
* public void initialize() {
* try {
* log.info("正在初始化海信SSO认证工具的Playwright...");
* // 从Playwright管理器获取共享的浏览器实例
* this.browser = playwrightManager.getBrowser();
* // 初始化共享上下文
* this.sharedContext = browser.newContext();
* log.info("海信SSO认证工具的Playwright初始化成功");
* } catch (Exception e) {
* log.error("海信SSO认证工具的Playwright初始化失败: ", e);
* }
* }
*/
/**
* 销毁Playwright资源
*/
......@@ -1011,31 +965,4 @@ public class HisenseSsoLoginTool {
throw new RuntimeException("SSO登录失败: " + e.getMessage(), e);
}
}
/**
* 截图并保存到存储目录
*
* @param page 当前页面对象
* @param fileName 文件名前缀
*/
private void takeScreenshotAndSave(Page page, String fileName) {
try {
// 确保存储目录存在
File storageDir = new File(STORAGE_DIR);
if (!storageDir.exists()) {
storageDir.mkdirs();
}
// 生成带时间戳的文件名
String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
String fullFileName = String.format("%s_%s.png", fileName, timestamp);
String filePath = Paths.get(STORAGE_DIR, fullFileName).toString();
// 截图并保存
page.screenshot(new Page.ScreenshotOptions().setPath(Paths.get(filePath)));
log.info("截图已保存至: {}", filePath);
} catch (Exception e) {
log.error("截图保存失败: {}", e.getMessage(), e);
}
}
}
......@@ -5,11 +5,7 @@ import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.agent.service.AgentChatService;
import pangea.hiagent.agent.service.AgentValidationService;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.dto.ChatRequest;
import pangea.hiagent.web.service.AgentService;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
......@@ -24,11 +20,9 @@ import jakarta.validation.constraints.NotBlank;
public class AgentChatController {
private final AgentChatService agentChatService;
private final AgentService agentService;
public AgentChatController(AgentChatService agentChatService, AgentService agentService) {
public AgentChatController(AgentChatService agentChatService) {
this.agentChatService = agentChatService;
this.agentService = agentService;
}
/**
......
......@@ -16,7 +16,6 @@ import pangea.hiagent.model.Tool;
import pangea.hiagent.web.repository.AgentDialogueRepository;
import pangea.hiagent.web.repository.AgentRepository;
import pangea.hiagent.web.repository.LlmConfigRepository;
import pangea.hiagent.web.service.AgentToolRelationService;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.llm.LlmModelFactory;
......
......@@ -319,7 +319,7 @@ public class TimerService {
// 只有当JSON不是空对象且不为空字符串时才进行解析
if (!"{}".equals(cleanParamsJson) && !cleanParamsJson.isEmpty()
&& !"\"\"".equals(cleanParamsJson)) {
params = objectMapper.readValue(cleanParamsJson, HashMap.class);
params = objectMapper.readValue(cleanParamsJson, new com.fasterxml.jackson.core.type.TypeReference<Map<String, Object>>() {});
}
} catch (Exception e) {
log.error("解析参数JSON失败: {}", timerConfig.getParamsJson(), e);
......
......@@ -6,7 +6,7 @@ import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.workpanel.event.EventDeduplicationService;
import pangea.hiagent.workpanel.event.EventService;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.agent.service.UserSseService;
import pangea.hiagent.web.dto.LogEvent;
import pangea.hiagent.web.dto.ResultEvent;
import pangea.hiagent.web.dto.ThoughtEvent;
......@@ -384,7 +384,7 @@ public class WorkPanelDataCollector implements IWorkPanelDataCollector {
// 通过EventService发送事件到所有SSE连接
for (SseEmitter emitter : unifiedSseService.getEmitters()) {
try {
eventService.sendWorkPanelEvent(emitter, event);
unifiedSseService.sendWorkPanelEvent(emitter, event);
} catch (Exception e) {
log.debug("通过EventService发送事件失败: {}", e.getMessage());
}
......
package pangea.hiagent.workpanel.event;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.agent.service.UserSseService;
import pangea.hiagent.web.dto.ToolEvent;
import pangea.hiagent.web.dto.WorkPanelEvent;
......
......@@ -10,7 +10,7 @@ spring:
# 开发环境JPA配置
jpa:
hibernate:
ddl-auto: none # 开发环境:使用SQL脚本初始化表结构,避免初始化冲突
ddl-auto: create # 开发环境:启动时创建表结构,关闭时删除表结构,实现重新初始化
show-sql: true
properties:
hibernate:
......@@ -21,7 +21,7 @@ spring:
init:
schema-locations: classpath:schema.sql
data-locations: classpath:data.sql
mode: embedded # 总是执行创建表和数据脚本、但不会重複插入(MERGE控制)
mode: always # 总是执行创建表和数据脚本,实现重新初始化
# 开启H2控制台
h2:
......
package pangea.hiagent.service;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cache.CacheManager;
import org.springframework.beans.factory.annotation.Autowired;
import static org.junit.jupiter.api.Assertions.*;
import pangea.hiagent.web.service.ToolConfigService;
import pangea.hiagent.model.ToolConfig;
@SpringBootTest
public class ToolConfigServiceCacheTest {
@Autowired
private ToolConfigService toolConfigService;
@Autowired
private CacheManager cacheManager;
@Test
public void testToolConfigCacheFunctionality() {
String toolName = "HisenseSsoLoginTool";
String usernameParam = "ssoUsername";
String passwordParam = "ssoPassword";
String testUsername = "testUser123";
String testPassword = "testPassword123";
// 清理可能存在的旧数据
try {
toolConfigService.saveParamValue(toolName, usernameParam, testUsername);
toolConfigService.saveParamValue(toolName, passwordParam, testPassword);
} catch (Exception e) {
System.out.println("清理数据时出现异常,可能是首次运行: " + e.getMessage());
}
// 第一次获取参数值(这将触发缓存)
String firstUsername = toolConfigService.getParamValue(toolName, usernameParam);
String firstPassword = toolConfigService.getParamValue(toolName, passwordParam);
// 验证获取到的值
assertNotNull(firstUsername, "用户名不应为null");
assertNotNull(firstPassword, "密码不应为null");
// 更新参数值
String updatedUsername = testUsername + "_updated";
String updatedPassword = testPassword + "_updated";
toolConfigService.saveParamValue(toolName, usernameParam, updatedUsername);
toolConfigService.saveParamValue(toolName, passwordParam, updatedPassword);
// 再次获取参数值(应该返回更新后的值,因为缓存已被清除)
String secondUsername = toolConfigService.getParamValue(toolName, usernameParam);
String secondPassword = toolConfigService.getParamValue(toolName, passwordParam);
// 验证更新后的值
assertEquals(updatedUsername, secondUsername, "用户名应该被更新");
assertEquals(updatedPassword, secondPassword, "密码应该被更新");
System.out.println("ToolConfigService缓存功能测试通过!");
}
}
\ No newline at end of file
......@@ -25,27 +25,24 @@
<el-icon><ChatDotRound /></el-icon>
<span>智能对话</span>
</el-menu-item>
<el-menu-item index="/new-chat">
<el-icon><Plus /></el-icon>
<span>新聊天</span>
<el-menu-item index="/timer">
<el-icon><Timer /></el-icon>
<span>定时器管理</span>
</el-menu-item>
<el-sub-menu index="agent-management">
<template #title>
<el-icon><Avatar /></el-icon>
<el-icon><Document /></el-icon>
<span>Agent管理</span>
</template>
<el-menu-item index="/agent">
<el-icon><Setting /></el-icon>
<el-icon><Avatar /></el-icon>
<span>Agent管理</span>
</el-menu-item>
<el-menu-item index="/tools">
<el-icon><Tools /></el-icon>
<span>工具管理</span>
</el-menu-item>
<el-menu-item index="/timer">
<el-icon><Timer /></el-icon>
<span>定时器管理</span>
</el-menu-item>
<el-menu-item index="/documents">
<el-icon><Document /></el-icon>
<span>知识库</span>
......@@ -59,8 +56,12 @@
<el-sub-menu index="system-management">
<template #title>
<el-icon><Setting /></el-icon>
<span>系统配置</span>
<span>系统管理</span>
</template>
<el-menu-item index="/system">
<el-icon><Cpu /></el-icon>
<span>系统配置</span>
</el-menu-item>
<el-menu-item index="/llm-config">
<el-icon><Cpu /></el-icon>
<span>LLM配置</span>
......@@ -73,6 +74,10 @@
<el-icon><Monitor /></el-icon>
<span>DOM同步</span>
</el-menu-item>
<el-menu-item index="/new-chat">
<el-icon><Plus /></el-icon>
<span>新聊天</span>
</el-menu-item>
</el-sub-menu>
</el-menu>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment