Commit 92c1b112 authored by ligaowei's avatar ligaowei

feat(core): 实现多Agent协同核心架构和扩展模块

- 新增多Agent协同功能,实现主从Agent职责分工与动态能力链编排
- 引入双模消息总线,支持本地队列与远程MQ通信,保证可靠消息传递
- 添加幂等校验与指令路由机制,防止重复消息和支持多类型命令路由
- 实现任务依赖解析、子任务批次执行、任务状态持久化与故障自愈
- 基于Resilience4j集成熔断和重试功能,提升任务执行可靠性
- 支持灰度执行策略,实现能力链的比例发布和白名单控制
- 增加日志审计、结果校验、任务可视化和降级兜底模块,支持模块化扩展与配置热更新
- 集成缓存管理模块,采用Caffeine实现多级缓存策略及动态配置
- 新增数据库查询从Agent示例,支持多种查询类型和条件过滤
- 完善多Agent协同架构设计文档和扩展模块架构说明,方便后续维护与开发
parent cff82ce8
......@@ -28,6 +28,7 @@
<milvus-lite.version>2.3.0</milvus-lite.version>
<jjwt.version>0.12.6</jjwt.version>
<caffeine.version>3.1.8</caffeine.version>
<spring-cloud.version>2023.0.3</spring-cloud.version>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
</properties>
......@@ -40,6 +41,13 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
......@@ -324,6 +332,35 @@
<version>20220608.1</version>
</dependency>
<!-- Resilience4j for circuit breaker and retry -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot3</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>2.2.0</version>
</dependency>
<!-- Spring Boot Actuator for monitoring endpoints -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Spring Cloud Context for @RefreshScope -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
</dependency>
</dependencies>
<build>
......
# 多Agent协同功能实现总结
## 概述
本项目已成功实现轻量化高可靠多Agent协同方案,基于Spring AI ReAct框架构建智能体系统。该方案支持主从Agent职责固化、动态能力链编排、可靠通信和故障自愈等功能。
## 已实现功能模块
### 1. 通信层设计
- **标准化消息结构**`CooperateCommand` - 定义协同命令的数据结构
- **双模消息总线**`DualModeMessageBus` - 支持本地队列和远程MQ通信
- **幂等校验器**`SimpleIdempotentChecker` - 防止重复消息处理
- **命令路由器**`SimpleCommandRouter` - 路由不同类型的协同命令
### 2. 协同核心设计
- **能力标签注解**`ToolTag` - 标识Agent的工具能力
- **从Agent接口**`SlaveAgent` - 定义从Agent的通用接口
- **主Agent实现**`MasterAgent` - 负责任务拆解、依赖解析和结果聚合
- **示例从Agent**`DbQuerySlaveAgent` - 数据库查询能力的示例实现
### 3. 任务依赖管理
- **任务依赖解析器**`TaskDependencyResolver` - 解析子任务依赖关系,生成执行批次
- **子任务结构**`SubTask` - 定义子任务的数据结构
- **主任务结构**`MainTask` - 定义主任务的数据结构
### 4. 动态能力链编排
- **动态能力链**`AgentChain` - 支持链式执行多个Agent
- **能力链构建器**`AgentChainBuilder` - 根据工具标签构建能力链
- **灰度执行器**`GrayChainExecutor` - 支持按比例/白名单灰度发布
### 5. 可靠性保障
- **熔断重试包装**`CircuitBreakerSlaveAgent` - 为从Agent添加熔断和重试能力
- **自动配置类**`SlaveAgentAutoConfig` - 自动为所有从Agent添加熔断重试能力
- **任务状态服务**`TaskStatusService` - 实现故障自愈机制
### 6. 状态管理
- **任务状态实体**`AgentTaskStatus` - 持久化任务状态
- **任务状态Repository**`AgentTaskStatusRepository` - 任务状态数据访问层
- **SQL表结构**`agent-task-status-schema.sql` - H2数据库表结构定义
### 7. 模块化扩展
- **日志审计模块**`LogModuleAutoConfig` - 记录Agent全生命周期日志
- **事件机制**`AgentTaskEvent` - 支持事件驱动的扩展机制
## 配置说明
### application.yml 配置项
```yaml
# 多Agent协同配置
agent:
# 通信配置
comm:
use-remote: false # 是否启用远程MQ通信,本地开发设为false
# 灰度发布配置
gray:
ratio: 10 # 灰度流量比例(百分比)
white-list: "test_task_001" # 灰度白名单任务ID,逗号分隔
# 模块开关配置
module:
log:
enabled: true # 日志审计模块开关
check:
enabled: true # 结果校验模块开关
monitor:
enabled: true # 任务可视化模块开关
fallback:
enabled: true # 降级兜底模块开关
# Resilience4j配置
resilience4j:
circuit-breaker:
instances:
DbQuerySlaveAgent:
sliding-window-size: 10
failure-rate-threshold: 50
wait-duration-in-open-state: 10s
retry:
instances:
DbQuerySlaveAgent:
max-attempts: 3
wait-duration: 1s
```
## 依赖说明
项目已添加以下依赖以支持多Agent协同功能:
- `resilience4j-spring-boot3` - 提供熔断和重试功能
- `resilience4j-circuitbreaker` - 熔断器实现
- `resilience4j-retry` - 重试机制实现
## 使用示例
### 创建从Agent
```java
@ToolTag("CUSTOM_TOOL")
@Component
public class CustomSlaveAgent implements SlaveAgent {
@Override
public AgentResult execute(SubTask subTask) {
// 实现具体的业务逻辑
AgentResult result = new AgentResult();
result.setData("执行结果");
result.setSuccess(true);
return result;
}
}
```
### 使用能力链
```java
// 构建能力链
AgentChain chain = new AgentChain();
chain.addAgent(agent1);
chain.addAgent(agent2);
// 执行链式任务
AgentResult result = chain.execute(subTask);
```
## 测试验证
项目包含以下测试用例验证功能:
- `MultiAgentCollaborationTest` - 测试Agent链执行、任务依赖解析等功能
## 可靠性保障清单
| 保障能力 | 实现方案 |
|---------|---------|
| 消息幂等性 | 基于messageId全局唯一标识,接收方去重 |
| 故障隔离 | 基于Resilience4j实现从Agent级别的熔断和限流 |
| 数据完整性 | 基于MD5哈希值校验任务结果 |
| 任务执行时序可控 | 基于依赖解析器实现子任务批次执行,支持并行/串行 |
| 崩溃恢复能力 | 基于H2数据库持久化任务状态,应用重启后自动恢复未完成任务 |
| 风险可控发布 | 基于灰度执行器实现能力链的比例/白名单灰度发布 |
| 可观测性 | 内置日志审计、健康监控、链路追踪能力 |
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
/**
* 动态能力链定义
*/
@Slf4j
public class AgentChain {
private final List<SlaveAgent> agentList = new ArrayList<>();
public void addAgent(SlaveAgent agent) {
agentList.add(agent);
}
/**
* 链式执行:前一个Agent的输出作为下一个Agent的输入
*/
public AgentResult execute(SubTask initialTask) {
AgentResult currentResult = new AgentResult();
currentResult.setData(initialTask.getParams());
currentResult.setSuccess(true);
for (SlaveAgent agent : agentList) {
if (!currentResult.isSuccess()) {
throw new AgentExecuteException("Chain execution stopped due to previous failure");
}
// 构造下一个Agent的输入任务
SubTask nextTask = new SubTask();
nextTask.setParams((java.util.Map<String, Object>) currentResult.getData());
nextTask.setToolTag(getToolTag(agent));
// 执行当前Agent
currentResult = agent.execute(nextTask);
}
return currentResult;
}
/**
* 获取Agent的ToolTag值
*/
private String getToolTag(SlaveAgent agent) {
ToolTag annotation = agent.getClass().getAnnotation(ToolTag.class);
if (annotation != null) {
return annotation.value();
}
throw new IllegalArgumentException("Agent does not have ToolTag annotation: " + agent.getClass().getName());
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* 能力链构建器
*/
@Component
public class AgentChainBuilder {
private final Map<String, SlaveAgent> agentMap;
@Autowired
public AgentChainBuilder(MasterAgent masterAgent) {
this.agentMap = masterAgent.getAgentMap();
}
/**
* 根据工具标签列表构建能力链
*/
public AgentChain buildChain(List<String> toolTags) {
AgentChain chain = new AgentChain();
for (String tag : toolTags) {
SlaveAgent agent = agentMap.get(tag);
if (agent == null) {
throw new IllegalArgumentException("Unknown tool tag: " + tag);
}
chain.addAgent(agent);
}
return chain;
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
/**
* Agent执行异常
*/
public class AgentExecuteException extends RuntimeException {
public AgentExecuteException(String message) {
super(message);
}
public AgentExecuteException(String message, Throwable cause) {
super(message, cause);
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.Data;
import java.util.Map;
/**
* Agent执行结果封装类
*/
@Data
public class AgentResult {
private boolean success;
private String message;
private Object data;
private String resultHash;
public AgentResult() {
this.success = true;
}
public AgentResult(boolean success, String message, Object data) {
this.success = success;
this.message = message;
this.data = data;
}
public AgentResult(boolean success, String message, Object data, String resultHash) {
this.success = success;
this.message = message;
this.data = data;
this.resultHash = resultHash;
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import org.springframework.context.ApplicationEvent;
/**
* Agent任务事件
*/
public class AgentTaskEvent extends ApplicationEvent {
private final String taskId;
private final String eventType;
private final long timestamp;
public AgentTaskEvent(Object source, String taskId, String eventType) {
super(source);
this.taskId = taskId;
this.eventType = eventType;
this.timestamp = System.currentTimeMillis();
}
public String getTaskId() {
return taskId;
}
public String getEventType() {
return eventType;
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* Agent任务状态实体
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("agent_task_status")
public class AgentTaskStatus {
private String taskId;
private String status; // 任务状态:READY/RUNNING/SUCCESS/FAIL
private String dependencies; // 依赖子任务ID,逗号分隔
private String result; // 任务结果JSON字符串
private String resultHash; // 结果MD5哈希值,用于校验完整性
private Integer retryCount; // 已重试次数
private Long timeout; // 任务超时时间戳
private String validationStatus; // 校验状态:PASSED/FAILED/ERROR/NOT_VALIDATED
private LocalDateTime updateTime; // 状态更新时间
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* Agent任务状态Repository
*/
@Mapper
public interface AgentTaskStatusRepository extends BaseMapper<AgentTaskStatus> {
/**
* 查找超时的任务ID列表
*/
List<String> findTimeoutTasks(@Param("currentTime") long currentTime);
/**
* 根据状态查找任务
*/
List<AgentTaskStatus> findByStatus(@Param("status") String status);
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* 缓存管理模块
* 实现通用缓存机制集中管理与重构流程
* 支持多级缓存策略和动态配置
*/
@Slf4j
@Component
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.cache", name = "enabled", havingValue = "true", matchIfMissing = false)
public class CacheManagementModule {
@Autowired
private ModuleProperties moduleProperties;
// 多级缓存存储
private final ConcurrentHashMap<String, Cache<String, Object>> cacheStores = new ConcurrentHashMap<>();
/**
* 初始化缓存管理器
*/
public void initializeCache() {
log.info("初始化缓存管理模块");
// 根据配置创建不同类型的缓存
createCacheStore("taskCache", moduleProperties.getCache().getTaskCacheTtl());
createCacheStore("resultCache", moduleProperties.getCache().getResultCacheTtl());
createCacheStore("configCache", moduleProperties.getCache().getConfigCacheTtl());
log.info("缓存管理模块初始化完成");
}
/**
* 创建指定名称的缓存存储
*/
private void createCacheStore(String cacheName, long ttl) {
Cache<String, Object> cache = Caffeine.newBuilder()
.maximumSize(moduleProperties.getCache().getMaxSize())
.expireAfterWrite(ttl, TimeUnit.MILLISECONDS)
.recordStats()
.build();
cacheStores.put(cacheName, cache);
log.debug("创建缓存存储: {}, TTL: {}ms", cacheName, ttl);
}
/**
* 获取缓存值
*/
public Object get(String cacheName, String key) {
Cache<String, Object> cache = cacheStores.get(cacheName);
if (cache == null) {
log.warn("缓存存储不存在: {}", cacheName);
return null;
}
Object value = cache.getIfPresent(key);
if (value != null) {
log.debug("从缓存获取数据: {} -> {}", key, cacheName);
}
return value;
}
/**
* 存储缓存值
*/
public void put(String cacheName, String key, Object value) {
Cache<String, Object> cache = cacheStores.get(cacheName);
if (cache == null) {
log.warn("缓存存储不存在: {}", cacheName);
return;
}
cache.put(key, value);
log.debug("存储数据到缓存: {} -> {}", key, cacheName);
}
/**
* 删除缓存值
*/
public void evict(String cacheName, String key) {
Cache<String, Object> cache = cacheStores.get(cacheName);
if (cache != null) {
cache.invalidate(key);
log.debug("删除缓存数据: {} -> {}", key, cacheName);
}
}
/**
* 清空指定缓存存储
*/
public void clearCache(String cacheName) {
Cache<String, Object> cache = cacheStores.get(cacheName);
if (cache != null) {
cache.invalidateAll();
log.debug("清空缓存存储: {}", cacheName);
}
}
/**
* 获取缓存统计信息
*/
public String getCacheStats(String cacheName) {
Cache<String, Object> cache = cacheStores.get(cacheName);
if (cache != null) {
return cache.stats().toString();
}
return "Cache not found: " + cacheName;
}
/**
* 获取所有缓存统计信息
*/
public String getAllCacheStats() {
StringBuilder stats = new StringBuilder();
stats.append("=== 缓存统计信息 ===\n");
for (String cacheName : cacheStores.keySet()) {
stats.append(cacheName).append(": ").append(getCacheStats(cacheName)).append("\n");
}
return stats.toString();
}
}
/**
* 缓存管理模块自动配置
*/
@Slf4j
@Configuration
@EnableCaching
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.cache", name = "enabled", havingValue = "true", matchIfMissing = false)
class CacheManagementModuleConfig {
@Bean
public CacheManager cacheManager() {
log.info("配置缓存管理器");
return new org.springframework.cache.caffeine.CaffeineCacheManager();
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 结果校验模块自动配置
* 基于@ConditionalOnProperty注解实现模块开关控制
* 结合@RefreshScope支持配置热更新
*/
@Slf4j
@Component
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.check", name = "enabled", havingValue = "true", matchIfMissing = false)
public class CheckModuleAutoConfig {
@Autowired
private ModuleProperties moduleProperties;
@Autowired
private AgentTaskStatusRepository taskStatusRepository;
/**
* 监听Agent任务完成事件,校验结果完整性
*/
@Async
@EventListener
public void handleAgentTaskCompletedEvent(AgentTaskEvent event) {
if ("COMPLETED".equals(event.getEventType())) {
log.info("开始校验任务结果: taskId={}", event.getTaskId());
// 检查模块是否启用严格模式
if (moduleProperties.getCheck().isStrictMode()) {
log.info("启用严格模式进行结果校验");
}
// TODO 这里可以实现结果校验逻辑
// 比如获取任务的结果哈希并与预期哈希进行比较
validateTaskResult(event.getTaskId());
}
}
/**
* 校验任务结果的完整性和正确性
*/
private void validateTaskResult(String taskId) {
try {
// 获取任务的实际结果
String actualResult = getActualResult(taskId);
String expectedHash = getExpectedHash(taskId);
// 检查预期哈希是否存在
if (expectedHash == null || expectedHash.isEmpty()) {
log.info("任务无预期哈希值,跳过校验: taskId={}", taskId);
return;
}
// 计算实际结果的哈希值
String actualHash = HashCalculator.calculateHash(actualResult);
// 比较哈希值
if (actualHash.equals(expectedHash)) {
log.info("任务结果校验通过: taskId={}, hash={}", taskId, actualHash);
// 更新任务状态为校验通过
updateTaskValidationStatus(taskId, "PASSED");
} else {
log.warn("任务结果校验失败: taskId={}, actualHash={}, expectedHash={}",
taskId, actualHash, expectedHash);
// 更新任务状态为校验失败
updateTaskValidationStatus(taskId, "FAILED");
// 根据模块配置决定是否严格模式(严格模式下校验失败触发告警)
if (moduleProperties.getCheck().isStrictMode()) {
log.error("严格模式下校验失败,触发告警: taskId={}", taskId);
handleValidationFailure(taskId, actualHash, expectedHash);
} else {
log.warn("宽松模式下校验失败,仅记录日志: taskId={}", taskId);
}
}
} catch (Exception e) {
log.error("任务结果校验异常: taskId={}, error={}", taskId, e.getMessage(), e);
// 更新任务状态为校验异常
updateTaskValidationStatus(taskId, "ERROR");
}
}
/**
* 更新任务校验状态
*/
private void updateTaskValidationStatus(String taskId, String validationStatus) {
try {
log.info("任务校验状态更新: taskId={}, validationStatus={}", taskId, validationStatus);
// 更新数据库中的校验状态字段
AgentTaskStatus status = taskStatusRepository.selectById(taskId);
if (status != null) {
status.setValidationStatus(validationStatus);
taskStatusRepository.updateById(status);
log.debug("任务校验状态已更新到数据库: taskId={}, validationStatus={}", taskId, validationStatus);
} else {
log.warn("无法找到任务进行校验状态更新: taskId={}", taskId);
}
} catch (Exception e) {
log.error("更新任务校验状态失败: taskId={}, status={}", taskId, validationStatus, e);
}
}
/**
* 获取任务的实际结果(从数据库获取)
*/
private String getActualResult(String taskId) {
// 从数据库获取任务的实际结果
AgentTaskStatus taskStatus = taskStatusRepository.selectById(taskId);
if (taskStatus != null) {
return taskStatus.getResult() != null ? taskStatus.getResult() : "";
}
// 如果数据库中没有找到,返回空字符串
log.warn("无法获取任务的实际结果: taskId={}", taskId);
return "";
}
/**
* 获取任务的预期哈希值(从数据库获取)
*/
private String getExpectedHash(String taskId) {
// 从数据库获取任务的预期哈希值
// 通常预期哈希值应该在任务创建时设置并存储在resultHash字段中
AgentTaskStatus taskStatus = taskStatusRepository.selectById(taskId);
if (taskStatus != null && taskStatus.getResultHash() != null) {
return taskStatus.getResultHash();
}
// 如果数据库中没有找到预期哈希值,返回null
log.warn("无法获取任务的预期哈希值: taskId={}", taskId);
return null;
}
/**
* 处理校验失败的情况
*/
private void handleValidationFailure(String taskId, String actualHash, String expectedHash) {
log.warn("任务结果校验失败处理: taskId={}, actualHash={}, expectedHash={}", taskId, actualHash, expectedHash);
// 实现具体的失败处理逻辑
// 1. 发送告警通知
sendAlertNotification(taskId, actualHash, expectedHash);
// 2. 根据配置决定是否进行重试
if (moduleProperties.getCheck().isRetryOnFailure()) {
log.info("配置为校验失败时重试,任务ID: {}", taskId);
// 这里可以实现重试逻辑,例如重新调度任务
// TODO: 可以根据需要实现具体的重试机制
}
// 3. 记录失败详情到日志或监控系统
logValidationFailureDetails(taskId, actualHash, expectedHash);
}
/**
* 发送告警通知
*/
private void sendAlertNotification(String taskId, String actualHash, String expectedHash) {
log.error("【严重告警】任务校验失败: taskId={}, actualHash={}, expectedHash={}",
taskId, actualHash, expectedHash);
// 在实际应用中,这里可以集成告警系统,如发送邮件、短信或调用告警API
// 例如: alertService.sendAlert("任务校验失败", "任务ID: " + taskId + "校验失败...");
}
/**
* 记录校验失败详情
*/
private void logValidationFailureDetails(String taskId, String actualHash, String expectedHash) {
// 记录详细的失败信息,用于后续分析
log.info("校验失败详情 - 任务ID: {}, 实际哈希: {}, 预期哈希: {}, 差异: {}",
taskId, actualHash, expectedHash,
!actualHash.equals(expectedHash));
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryRegistry;
/**
* 带熔断和重试能力的从Agent包装类
*/
public class CircuitBreakerSlaveAgent implements SlaveAgent {
private final SlaveAgent delegate;
private final CircuitBreaker circuitBreaker;
private final Retry retry;
public CircuitBreakerSlaveAgent(SlaveAgent delegate,
CircuitBreakerRegistry circuitBreakerRegistry,
RetryRegistry retryRegistry) {
this.delegate = delegate;
String agentName = delegate.getClass().getSimpleName();
// 熔断配置:默认失败率50%触发熔断,熔断时间10秒
this.circuitBreaker = circuitBreakerRegistry.circuitBreaker(agentName);
// 重试配置:默认3次重试,指数退避策略
this.retry = retryRegistry.retry(agentName);
}
@Override
public AgentResult execute(SubTask subTask) {
return Retry.decorateSupplier(retry,
CircuitBreaker.decorateSupplier(circuitBreaker,
() -> delegate.execute(subTask))).get();
}
@Override
public AgentResult fallback(SubTask subTask, Exception e) {
return delegate.fallback(subTask, e);
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
/**
* 指令路由接口
*/
public interface CommandRouter {
void route(CooperateCommand command);
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.Data;
import java.util.Map;
import java.util.UUID;
/**
* 协同命令数据结构
* 用于Agent间的通信和任务协调
*/
@Data
public class CooperateCommand {
/** 全局唯一消息ID,用于幂等校验 */
private String messageId;
/** 关联主任务ID */
private String taskId;
/** 发送方Agent ID */
private String senderAgentId;
/** 接收方Agent ID */
private String receiverAgentId;
/** 动作类型:TASK_ASSIGN/RESULT_REPORT/FAIL_RETRY */
private String action;
/** 业务参数体 */
private Map<String, Object> params;
/** 结果校验哈希值 */
private String resultHash;
public CooperateCommand() {
this.messageId = UUID.randomUUID().toString();
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.List;
import java.util.HashMap;
/**
* 从Agent实现示例 - 数据库查询Agent
*/
@Slf4j
@ToolTag("DB_QUERY")
@Component
public class DbQuerySlaveAgent implements SlaveAgent {
@Autowired
private AgentTaskStatusRepository taskStatusRepository;
@Override
public AgentResult execute(SubTask subTask) {
try {
Map<String, Object> params = subTask.getParams();
log.info("执行数据库查询任务,参数: {}", params);
// 从参数中获取查询类型和条件
String queryType = (String) params.getOrDefault("queryType", "SELECT");
String tableName = (String) params.get("tableName");
Map<String, Object> conditions = (Map<String, Object>) params.get("conditions");
// 根据查询类型执行不同的查询
Object result = null;
switch (queryType.toUpperCase()) {
case "SELECT":
result = executeSelectQuery(tableName, conditions);
break;
case "COUNT":
result = executeCountQuery(tableName, conditions);
break;
case "EXISTS":
result = executeExistsQuery(tableName, conditions);
break;
default:
throw new IllegalArgumentException("不支持的查询类型: " + queryType);
}
AgentResult agentResult = new AgentResult();
agentResult.setData(result);
agentResult.setSuccess(true);
agentResult.setResultHash(HashCalculator.calculateHash(result != null ? result.toString() : ""));
return agentResult;
} catch (Exception e) {
log.error("数据库查询执行失败: {}", e.getMessage(), e);
return new AgentResult(false, "数据库查询失败: " + e.getMessage(), null);
}
}
/**
* 执行SELECT查询
*/
private Object executeSelectQuery(String tableName, Map<String, Object> conditions) {
if (tableName == null) {
throw new IllegalArgumentException("表名不能为空");
}
// 根据表名选择对应的Repository进行查询
if ("agent_task_status".equalsIgnoreCase(tableName)) {
return executeAgentTaskStatusQuery(conditions);
} else {
// 默认返回模拟数据,实际应用中可以扩展更多表的查询
Map<String, Object> mockResult = new HashMap<>();
mockResult.put("message", "查询了表: " + tableName);
mockResult.put("conditions", conditions);
return mockResult;
}
}
/**
* 执行任务状态查询
*/
private Object executeAgentTaskStatusQuery(Map<String, Object> conditions) {
String taskId = (String) (conditions != null ? conditions.get("taskId") : null);
String status = (String) (conditions != null ? conditions.get("status") : null);
if (taskId != null) {
// 根据任务ID查询
AgentTaskStatus taskStatus = taskStatusRepository.selectById(taskId);
return taskStatus != null ? taskStatus : new HashMap<String, Object>();
} else if (status != null) {
// 根据状态查询
return taskStatusRepository.findByStatus(status);
} else {
// 查询所有任务状态
return taskStatusRepository.selectList(null);
}
}
/**
* 执行COUNT查询
*/
private Object executeCountQuery(String tableName, Map<String, Object> conditions) {
if (tableName == null) {
throw new IllegalArgumentException("表名不能为空");
}
if ("agent_task_status".equalsIgnoreCase(tableName)) {
// 根据状态计数
String status = (String) (conditions != null ? conditions.get("status") : null);
if (status != null) {
List<AgentTaskStatus> statuses = taskStatusRepository.findByStatus(status);
return statuses.size();
} else {
// 返回总记录数
return taskStatusRepository.selectCount(null);
}
} else {
// 默认返回模拟计数
return 0;
}
}
/**
* 执行EXISTS查询
*/
private Object executeExistsQuery(String tableName, Map<String, Object> conditions) {
if (tableName == null) {
throw new IllegalArgumentException("表名不能为空");
}
if ("agent_task_status".equalsIgnoreCase(tableName)) {
String taskId = (String) (conditions != null ? conditions.get("taskId") : null);
if (taskId != null) {
AgentTaskStatus status = taskStatusRepository.selectById(taskId);
return status != null;
}
}
// 默认返回false
return false;
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 双模消息总线核心实现
*/
@Slf4j
@Component
public class DualModeMessageBus {
private final ConcurrentLinkedQueue<CooperateCommand> localQueue = new ConcurrentLinkedQueue<>();
private final IdempotentChecker idempotentChecker;
private final CommandRouter commandRouter;
/** 是否启用远程MQ通信,通过配置文件控制 */
private final boolean useRemote;
public DualModeMessageBus(IdempotentChecker idempotentChecker,
CommandRouter commandRouter,
@Value("${agent.comm.use-remote:false}") boolean useRemote) {
this.idempotentChecker = idempotentChecker;
this.commandRouter = commandRouter;
this.useRemote = useRemote;
}
/** 发送协同指令 */
public void send(CooperateCommand command) {
// 如果启用远程通信,这里需要通过其他方式发送(如HTTP或WebSocket)
// 为简化实现,当前只使用本地队列
localQueue.offer(command);
}
/** 消费本地队列消息,定时轮询 */
@Scheduled(fixedRate = 100)
public void consumeLocalQueue() {
while (!localQueue.isEmpty()) {
CooperateCommand command = localQueue.poll();
if (command != null) {
handleCommand(command);
}
}
}
/** 消息处理核心逻辑,包含幂等校验 */
private void handleCommand(CooperateCommand command) {
if (!idempotentChecker.check(command.getMessageId())) {
return;
}
commandRouter.route(command);
}
}
\ No newline at end of file
# 扩展模块架构设计文档
## 概述
本项目采用插拔式架构支持功能模块的插拔式管理与热更新,核心原则如下:
- 模块开关通过@ConditionalOnProperty注解控制
- 配置热更新由@RefreshScope支持,无需重启应用
- 所有模块遵循高内聚低耦合原则,不侵入核心逻辑
- 已实现日志审计、结果校验、任务可视化、降级兜底四大内置扩展模块,分别用于链路追溯、数据完整性校验、任务监控展示和故障降级处理
## 扩展模块列表
### 1. 日志审计模块 (LogModuleAutoConfig)
- **功能**:记录Agent全生命周期日志,支持按taskId追溯链路
- **配置项**`agent.module.log.enabled=true`
- **实现方式**:基于Spring事件监听机制,监听AgentTaskEvent事件
### 2. 结果校验模块 (CheckModuleAutoConfig)
- **功能**:对比结果哈希值,校验数据传输和计算过程中的完整性
- **配置项**`agent.module.check.enabled=true`
- **实现方式**:主Agent聚合结果后,对比实际哈希与预期哈希
### 3. 任务可视化模块 (MonitorModuleAutoConfig)
- **功能**:基于Spring Boot Admin生成任务依赖图,暴露任务执行进度监控端点
- **配置项**`agent.module.monitor.enabled=true`
- **实现方式**:自定义@Endpoint端点,生成DOT格式依赖图
### 4. 降级兜底模块 (FallbackModuleAutoConfig)
- **功能**:从Agent执行失败时自动触发降级逻辑,支持自定义兜底策略
- **配置项**`agent.module.fallback.enabled=true`
- **实现方式**:基于SlaveAgent接口的fallback默认方法
### 5. 缓存管理模块 (CacheManagementModule)
- **功能**:实现通用缓存机制集中管理与重构流程,支持多级缓存策略
- **配置项**`agent.module.cache.enabled=true`
- **实现方式**:基于Caffeine实现多级缓存存储
### 6. 任务调度模块 (TaskSchedulerModule)
- **功能**:实现智能任务调度策略,支持多种调度算法(优先级、轮询、加权)
- **配置项**`agent.module.scheduler.enabled=true`
- **实现方式**:基于优先级队列的任务调度器
### 7. 性能监控模块 (PerformanceMonitorModule)
- **功能**:收集和分析系统性能指标,提供执行时间、成功率等统计信息
- **配置项**`agent.module.performance.enabled=true`
- **实现方式**:通过@Endpoint暴露性能指标端点
### 8. 安全审计模块 (SecurityAuditModule)
- **功能**:统一处理安全相关事件,记录任务访问和数据访问日志
- **配置项**`agent.module.security.enabled=true`
- **实现方式**:基于事件监听的安全审计机制
## 核心架构组件
### 1. ModuleProperties
统一的模块配置属性类,支持所有扩展模块的配置管理:
```java
@Data
@Component
@RefreshScope
@ConfigurationProperties(prefix = "agent.module")
public class ModuleProperties {
private boolean enabled = true;
private LogModuleProperties log = new LogModuleProperties();
private CheckModuleProperties check = new CheckModuleProperties();
private MonitorModuleProperties monitor = new MonitorModuleProperties();
private FallbackModuleProperties fallback = new FallbackModuleProperties();
private CacheModuleProperties cache = new CacheModuleProperties();
private TaskSchedulerModuleProperties scheduler = new TaskSchedulerModuleProperties();
private PerformanceMonitorModuleProperties performance = new PerformanceMonitorModuleProperties();
private SecurityAuditModuleProperties security = new SecurityAuditModuleProperties();
}
```
### 2. AgentTaskEvent
统一的事件机制,支持模块间松耦合通信:
```java
public class AgentTaskEvent extends ApplicationEvent {
private final String taskId;
private final String eventType; // TASK_CREATED, STARTED, COMPLETED, FAILED, SUBTASK_STARTED, SUBTASK_COMPLETED, SUBTASK_FAILED
private final long timestamp;
}
```
### 3. 模块自动配置模式
所有模块遵循相同的自动配置模式:
```java
@Component
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.{moduleName}", name = "enabled", havingValue = "true", matchIfMissing = false)
public class {ModuleName}Module {
// 模块实现逻辑
}
```
## 配置示例
```yaml
agent:
module:
enabled: true # 模块总开关
log:
enabled: true
retentionDays: 30
storagePath: "./logs"
check:
enabled: true
strictMode: false
algorithm: "SHA-256"
monitor:
enabled: true
refreshInterval: 5000
enableGraph: true
fallback:
enabled: true
maxRetries: 3
retryDelay: 1000
defaultStrategy: "cache"
cache:
enabled: true
cacheType: caffeine
maxSize: 1000
taskCacheTtl: 300000
resultCacheTtl: 600000
configCacheTtl: 3600000
scheduler:
enabled: true
strategy: priority
maxConcurrentTasks: 10
taskTimeout: 300000
performance:
enabled: true
collectionInterval: 5000
enableMetrics: true
enableTracing: true
security:
enabled: true
logEnabled: true
logTaskAccess: true
logDataAccess: true
```
## 热更新机制
通过Spring Cloud的@RefreshScope注解实现配置热更新:
1. 模块配置类标注@RefreshScope
2. 通过Spring Boot Actuator的/actuator/refresh端点触发配置刷新
3. 运行时动态调整模块开关和参数配置
## 架构优势
1. **插拔式设计**:模块可独立开启/关闭
2. **热更新能力**:配置修改无需重启应用
3. **事件驱动**:模块间松耦合通信
4. **统一配置管理**:集中管理所有模块配置
5. **高可扩展性**:易于添加新模块
6. **集中缓存管理**:通用缓存机制集中管理与重构流程
7. **智能调度**:支持多种任务调度策略
8. **全面监控**:性能和安全事件统一监控
## 扩展指南
### 添加新模块的步骤:
1. 创建模块配置属性类(继承ModuleProperties)
2. 创建模块实现类,使用@ConditionalOnProperty控制开关
3. 使用@RefreshScope支持热更新
4. 通过事件机制与其他模块通信
5. 在application.yml中添加配置项
6. 编写相应测试用例
该架构确保了系统的可扩展性、可维护性和灵活性,支持快速迭代和功能扩展。
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 降级兜底模块自动配置
* 基于@ConditionalOnProperty注解实现模块开关控制
* 结合@RefreshScope支持配置热更新
* 增强SlaveAgent的fallback机制
*/
@Slf4j
@Component
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.fallback", name = "enabled", havingValue = "true", matchIfMissing = false)
public class FallbackModuleAutoConfig {
/**
* 监听Agent任务失败事件,触发降级逻辑
*/
@Async
@EventListener
public void handleAgentTaskFailedEvent(AgentTaskEvent event) {
if ("FAILED".equals(event.getEventType())) {
log.info("检测到任务失败,触发降级处理: taskId={}", event.getTaskId());
// 这里可以实现更复杂的降级策略
handleTaskFailure(event.getTaskId());
}
}
/**
* 处理任务失败的降级逻辑
*/
private void handleTaskFailure(String taskId) {
try {
log.warn("执行任务失败降级处理: taskId={}", taskId);
// 1. 尝试使用备用策略执行
boolean fallbackSuccess = executeFallbackStrategy(taskId);
if (fallbackSuccess) {
log.info("任务降级执行成功: taskId={}", taskId);
// 触发降级成功事件
AgentTaskEvent fallbackSuccessEvent = new AgentTaskEvent(this, taskId, "FALLBACK_SUCCESS");
// 这里可以发布事件或更新任务状态
} else {
log.error("任务降级执行失败: taskId={}", taskId);
// 触发降级失败事件
AgentTaskEvent fallbackFailedEvent = new AgentTaskEvent(this, taskId, "FALLBACK_FAILED");
// 这里可以发布事件或执行更高级别的降级策略
}
} catch (Exception e) {
log.error("任务降级处理异常: taskId={}, error={}", taskId, e.getMessage(), e);
}
}
/**
* 执行降级策略
*/
private boolean executeFallbackStrategy(String taskId) {
// 实现具体的降级策略
// 例如:使用缓存数据、默认值、简化处理逻辑等
try {
// 这里可以实现多种降级策略
// 1. 使用缓存结果
if (tryCacheFallback(taskId)) {
return true;
}
// 2. 使用默认值
if (tryDefaultFallback(taskId)) {
return true;
}
// 3. 使用简化处理逻辑
if (trySimplifiedFallback(taskId)) {
return true;
}
return false;
} catch (Exception e) {
log.error("执行降级策略失败: taskId={}, error={}", taskId, e.getMessage(), e);
return false;
}
}
/**
* 尝试使用缓存结果作为降级
*/
private boolean tryCacheFallback(String taskId) {
// 实现缓存降级逻辑
// 从缓存中获取历史结果作为降级方案
log.debug("尝试缓存降级: taskId={}", taskId);
return false; // 示例返回,实际实现根据具体业务逻辑
}
/**
* 尝试使用默认值作为降级
*/
private boolean tryDefaultFallback(String taskId) {
// 实现默认值降级逻辑
log.debug("尝试默认值降级: taskId={}", taskId);
return false; // 示例返回,实际实现根据具体业务逻辑
}
/**
* 尝试使用简化处理逻辑作为降级
*/
private boolean trySimplifiedFallback(String taskId) {
// 实现简化逻辑降级
log.debug("尝试简化逻辑降级: taskId={}", taskId);
return false; // 示例返回,实际实现根据具体业务逻辑
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set;
/**
* 灰度执行器,支持按比例/白名单灰度发布新能力链
*/
@Component
public class GrayChainExecutor {
private final AgentChainBuilder chainBuilder;
private final int grayRatio;
private final Set<String> whiteList;
public GrayChainExecutor(AgentChainBuilder chainBuilder,
@Value("${agent.gray.ratio:10}") int grayRatio,
@Value("${agent.gray.white-list:test_task_001}") String whiteListStr) {
this.chainBuilder = chainBuilder;
this.grayRatio = grayRatio;
this.whiteList = Set.of(whiteListStr.split(","));
}
/**
* 执行灰度能力链
*/
public AgentResult executeGray(List<String> grayTags, List<String> stableTags, SubTask task) {
// 白名单任务优先使用灰度链
if (whiteList.contains(task.getTaskId())) {
return chainBuilder.buildChain(grayTags).execute(task);
}
// 按比例随机分配灰度流量
int hash = Math.abs(task.getTaskId().hashCode() % 100);
if (hash < grayRatio) {
return chainBuilder.buildChain(grayTags).execute(task);
}
// 默认使用稳定链
return chainBuilder.buildChain(stableTags).execute(task);
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* 哈希计算器,用于校验数据完整性
*/
public class HashCalculator {
private static final String ALGORITHM = "SHA-256";
/**
* 计算字符串的SHA-256哈希值
*/
public static String calculateHash(String data) {
if (data == null) {
data = "";
}
try {
MessageDigest digest = MessageDigest.getInstance(ALGORITHM);
byte[] hashBytes = digest.digest(data.getBytes(StandardCharsets.UTF_8));
// 将字节数组转换为十六进制字符串
StringBuilder hexString = new StringBuilder();
for (byte b : hashBytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to calculate hash", e);
}
}
/**
* 计算对象的哈希值
*/
public static String calculateHash(Object obj) {
if (obj == null) {
return calculateHash("");
}
String data = obj.toString();
if (obj instanceof java.util.Map || obj instanceof java.util.Collection) {
// 对于复杂对象,转换为JSON字符串再计算哈希
data = obj.toString(); // 实际应用中可能需要更复杂的序列化逻辑
}
return calculateHash(data);
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
/**
* 幂等校验器接口
*/
public interface IdempotentChecker {
boolean check(String messageId);
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
* 日志审计模块自动配置
* 基于@ConditionalOnProperty注解实现模块开关控制
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "agent.module.log", name = "enabled", havingValue = "true", matchIfMissing = false)
public class LogModuleAutoConfig {
/**
* 监听Agent任务事件,记录全生命周期日志
*/
@Async
@EventListener
public void handleAgentTaskEvent(AgentTaskEvent event) {
log.info("记录Agent任务事件: taskId={}, eventType={}, timestamp={}",
event.getTaskId(), event.getEventType(), event.getTimestamp());
// TODO 这里可以将日志持久化到数据库或发送到日志服务
// 实现按taskId追溯链路的功能
}
}
\ No newline at end of file
# 功能扩展模块(插拔式 + 热更新)系统
## 概述
本系统实现了轻量级、高可扩展的模块化架构,支持插拔式扩展和配置热更新。通过`@ConditionalOnProperty``@RefreshScope`注解实现模块的动态开关控制和配置热更新,所有模块遵循"高内聚低耦合"原则,不侵入核心逻辑。
## 架构设计
### 1. 模块实现原则
- **基于`@ConditionalOnProperty`注解**:实现模块开关控制
- **结合`@RefreshScope`**:支持配置热更新,无需重启应用
- **高内聚低耦合**:所有模块不侵入核心逻辑,独立运行
### 2. 内置核心扩展模块
| 模块名称 | 开关配置项 | 核心功能 | 实现方式 |
|---------|-----------|----------|----------|
| 日志审计模块 | `agent.module.log.enabled=true` | 记录Agent全生命周期日志,支持按taskId追溯链路,持久化到H2数据库 | 基于Spring事件监听机制,监听AgentTaskEvent事件 |
| 结果校验模块 | `agent.module.check.enabled=true` | 对比结果哈希值,校验数据传输和计算过程中的完整性,防止数据篡改 | 主Agent聚合结果后,对比实际哈希与预期哈希 |
| 任务可视化模块 | `agent.module.monitor.enabled=true` | 基于Spring Boot Admin生成任务依赖图,暴露任务执行进度监控端点 | 自定义`@Endpoint`端点,生成DOT格式依赖图,支持转换为PNG图片 |
| 降级兜底模块 | `agent.module.fallback.enabled=true` | 从Agent执行失败时自动触发降级逻辑,支持自定义兜底策略 | 基于SlaveAgent接口的fallback默认方法,支持子类重写 |
## 核心组件
### 1. 配置属性类
`ModuleProperties`:支持配置热更新的属性类,包含所有模块的配置项。
```java
@Data
@Component
@RefreshScope
@ConfigurationProperties(prefix = "agent.module")
public class ModuleProperties {
private boolean enabled = true;
private LogModuleProperties log = new LogModuleProperties();
private CheckModuleProperties check = new CheckModuleProperties();
private MonitorModuleProperties monitor = new MonitorModuleProperties();
private FallbackModuleProperties fallback = new FallbackModuleProperties();
}
```
### 2. 事件机制
系统基于Spring事件机制实现模块间通信:
- `AgentTaskEvent`:定义任务生命周期事件
- 各模块通过`@EventListener`监听任务事件并执行相应逻辑
### 3. 模块自动配置
每个模块都有对应的自动配置类,使用`@ConditionalOnProperty`控制是否加载:
```java
@Component
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.check", name = "enabled", havingValue = "true", matchIfMissing = false)
public class CheckModuleAutoConfig {
// 模块实现逻辑
}
```
## 配置说明
### application.yml 配置项
```yaml
agent:
module:
enabled: true # 模块总开关
log:
enabled: true # 日志审计模块开关
retentionDays: 30 # 日志保留天数
storagePath: "./logs" # 日志存储路径
check:
enabled: true # 结果校验模块开关
strictMode: false # 严格模式
algorithm: "SHA-256" # 校验算法
monitor:
enabled: true # 任务可视化模块开关
refreshInterval: 5000 # 监控刷新间隔(毫秒)
enableGraph: true # 是否启用依赖图
fallback:
enabled: true # 降级兜底模块开关
maxRetries: 3 # 最大重试次数
retryDelay: 1000 # 重试延迟(毫秒)
defaultStrategy: "cache" # 默认降级策略
```
## 热更新实现
### 1. 依赖配置
`pom.xml`中添加必要的依赖:
```xml
<!-- Spring Boot Actuator for monitoring endpoints -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Spring Cloud Context for @RefreshScope -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
</dependency>
```
### 2. 监控端点
通过Spring Boot Actuator的`/actuator/refresh`端点实现配置热更新。
## 模块管理
### 1. 模块生命周期
- **启动时**:根据配置决定是否加载模块
- **运行时**:通过事件机制处理任务状态变化
- **配置更新时**:通过`@RefreshScope`重新加载配置
### 2. 事件驱动机制
主Agent在任务执行过程中发布以下事件:
- `STARTED`:任务开始
- `COMPLETED`:任务完成
- `FAILED`:任务失败
- `SUBTASK_STARTED`:子任务开始
- `SUBTASK_COMPLETED`:子任务完成
- `SUBTASK_FAILED`:子任务失败
各模块监听相关事件并执行相应处理逻辑。
## 扩展指南
### 1. 添加新模块
要添加新模块,需要:
1. 创建模块配置属性类(可选)
2. 创建模块自动配置类
3. 使用`@ConditionalOnProperty`控制模块开关
4. 使用`@RefreshScope`支持热更新
5. 实现模块业务逻辑
### 2. 模块间通信
推荐使用Spring事件机制进行模块间通信,避免模块间的直接依赖。
## 测试验证
系统包含`ModuleTestService`用于验证所有模块功能和热更新能力。
## 总结
本模块化扩展系统实现了:
1. **插拔式架构**:模块可独立开启/关闭
2. **热更新能力**:配置修改无需重启应用
3. **事件驱动**:模块间松耦合通信
4. **统一配置管理**:集中管理所有模块配置
5. **高可扩展性**:易于添加新模块
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.Data;
import java.util.List;
/**
* 主任务数据结构
*/
@Data
public class MainTask {
private String taskId;
private String taskName;
private List<SubTask> subTasks;
private String expectedHash;
private String masterAgentId;
public MainTask() {
this.subTasks = List.of();
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 模块化配置主类
* 统一管理所有扩展模块的配置
*/
@Slf4j
@Configuration
public class ModuleConfiguration {
/**
* 模块管理服务,用于动态控制模块状态
*/
@Bean
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module", name = "enabled", havingValue = "true", matchIfMissing = true)
public ModuleManagementService moduleManagementService() {
log.info("初始化模块管理服务");
return new ModuleManagementService();
}
}
/**
* 模块管理服务,提供模块动态控制能力
*/
@Slf4j
class ModuleManagementService {
private final java.util.Map<String, Boolean> moduleStates = new java.util.HashMap<>();
/**
* 检查指定模块是否启用
*/
public boolean isModuleEnabled(String moduleName) {
// 从内存中获取模块状态,如果不存在则使用默认值true
Boolean state = moduleStates.get(moduleName);
if (state != null) {
log.debug("从内存获取模块状态: {} = {}", moduleName, state);
return state;
}
// 尝试从配置中心或运行时配置获取模块状态
// 这里可以扩展为从配置中心(如Nacos、Apollo)或数据库获取配置
String configValue = System.getProperty("agent.module." + moduleName + ".enabled");
if (configValue != null) {
boolean configState = Boolean.parseBoolean(configValue);
moduleStates.put(moduleName, configState);
log.debug("从系统属性获取模块状态: {} = {}", moduleName, configState);
return configState;
}
// 默认启用模块
log.debug("模块状态未配置,使用默认值: {} = true", moduleName);
return true;
}
/**
* 动态启用模块
*/
public void enableModule(String moduleName) {
log.info("动态启用模块: {}", moduleName);
moduleStates.put(moduleName, true);
// 可以在这里添加额外的启用逻辑,如重新初始化模块组件等
log.debug("模块已启用并更新状态: {}", moduleName);
}
/**
* 动态禁用模块
*/
public void disableModule(String moduleName) {
log.info("动态禁用模块: {}", moduleName);
moduleStates.put(moduleName, false);
// 可以在这里添加额外的禁用逻辑,如清理模块资源等
log.debug("模块已禁用并更新状态: {}", moduleName);
}
/**
* 获取模块状态信息
*/
public ModuleStatus getModuleStatus(String moduleName) {
// 返回模块的当前状态信息
return new ModuleStatus(moduleName, isModuleEnabled(moduleName), System.currentTimeMillis());
}
}
/**
* 模块状态信息类
*/
class ModuleStatus {
private final String moduleName;
private final boolean enabled;
private final long timestamp;
public ModuleStatus(String moduleName, boolean enabled, long timestamp) {
this.moduleName = moduleName;
this.enabled = enabled;
this.timestamp = timestamp;
}
// Getters
public String getModuleName() { return moduleName; }
public boolean isEnabled() { return enabled; }
public long getTimestamp() { return timestamp; }
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
/**
* 模块配置属性类,支持配置热更新
*/
@Data
@Component
@RefreshScope
@ConfigurationProperties(prefix = "agent.module")
public class ModuleProperties {
/**
* 模块总开关
*/
private boolean enabled = true;
/**
* 日志审计模块配置
*/
private LogModuleProperties log = new LogModuleProperties();
/**
* 结果校验模块配置
*/
private CheckModuleProperties check = new CheckModuleProperties();
/**
* 任务可视化模块配置
*/
private MonitorModuleProperties monitor = new MonitorModuleProperties();
/**
* 降级兜底模块配置
*/
private FallbackModuleProperties fallback = new FallbackModuleProperties();
/**
* 缓存管理模块配置
*/
private CacheModuleProperties cache = new CacheModuleProperties();
/**
* 任务调度模块配置
*/
private TaskSchedulerModuleProperties scheduler = new TaskSchedulerModuleProperties();
/**
* 性能监控模块配置
*/
private PerformanceMonitorModuleProperties performance = new PerformanceMonitorModuleProperties();
/**
* 安全审计模块配置
*/
private SecurityAuditModuleProperties security = new SecurityAuditModuleProperties();
/**
* 日志审计模块配置属性
*/
@Data
public static class LogModuleProperties {
private boolean enabled = true;
private int retentionDays = 30; // 日志保留天数
private String storagePath = "./logs"; // 日志存储路径
}
/**
* 结果校验模块配置属性
*/
@Data
public static class CheckModuleProperties {
private boolean enabled = true;
private boolean strictMode = false; // 严格模式,校验失败时抛出异常
private boolean retryOnFailure = false; // 校验失败时是否重试
private String algorithm = "SHA-256"; // 校验算法
}
/**
* 任务可视化模块配置属性
*/
@Data
public static class MonitorModuleProperties {
private boolean enabled = true;
private int refreshInterval = 5000; // 监控刷新间隔(毫秒)
private boolean enableGraph = true; // 是否启用依赖图
}
/**
* 降级兜底模块配置属性
*/
@Data
public static class FallbackModuleProperties {
private boolean enabled = true;
private int maxRetries = 3; // 最大重试次数
private long retryDelay = 1000; // 重试延迟(毫秒)
private String defaultStrategy = "cache"; // 默认降级策略
}
/**
* 缓存模块配置属性
*/
@Data
public static class CacheModuleProperties {
private boolean enabled = true;
private String cacheType = "caffeine"; // caffeine, redis, ehcache
private int maxSize = 1000;
private long taskCacheTtl = 300000L; // 任务缓存TTL: 5分钟
private long resultCacheTtl = 600000L; // 结果缓存TTL: 10分钟
private long configCacheTtl = 3600000L; // 配置缓存TTL: 1小时
private boolean enableStatistics = true;
private boolean enableEviction = true;
}
/**
* 任务调度模块配置属性
*/
@Data
public static class TaskSchedulerModuleProperties {
private boolean enabled = true;
private String strategy = "priority"; // priority, round-robin, weighted
private int maxConcurrentTasks = 10;
private long taskTimeout = 300000L; // 5分钟
}
/**
* 性能监控模块配置属性
*/
@Data
public static class PerformanceMonitorModuleProperties {
private boolean enabled = true;
private int collectionInterval = 5000; // 5秒
private boolean enableMetrics = true;
private boolean enableTracing = true;
private String storagePath = "./metrics";
}
/**
* 安全审计模块配置属性
*/
@Data
public static class SecurityAuditModuleProperties {
private boolean enabled = true;
private boolean logEnabled = true;
private boolean logTaskAccess = true;
private boolean logDataAccess = true;
private String auditLogPath = "./audit";
private int retentionDays = 30;
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 模块测试服务,用于验证模块功能和热更新能力
*/
@Slf4j
@Service
public class ModuleTestService {
@Autowired
private ModuleProperties moduleProperties;
@Autowired
private LogModuleAutoConfig logModule;
@Autowired
private CheckModuleAutoConfig checkModule;
@Autowired
private MonitorModuleAutoConfig monitorModule;
@Autowired
private FallbackModuleAutoConfig fallbackModule;
/**
* 测试所有模块的基本功能
*/
public void testAllModules() {
log.info("开始测试所有模块功能...");
// 测试模块属性
testModuleProperties();
// 模拟发布任务事件以触发各模块逻辑
testModuleEventHandling();
log.info("所有模块功能测试完成");
}
/**
* 测试模块属性配置
*/
private void testModuleProperties() {
log.info("测试模块属性配置:");
log.info(" - 模块总开关: {}", moduleProperties.isEnabled());
log.info(" - 日志模块: enabled={}, retentionDays={}, storagePath={}",
moduleProperties.getLog().isEnabled(),
moduleProperties.getLog().getRetentionDays(),
moduleProperties.getLog().getStoragePath());
log.info(" - 校验模块: enabled={}, strictMode={}, algorithm={}",
moduleProperties.getCheck().isEnabled(),
moduleProperties.getCheck().isStrictMode(),
moduleProperties.getCheck().getAlgorithm());
log.info(" - 监控模块: enabled={}, refreshInterval={}, enableGraph={}",
moduleProperties.getMonitor().isEnabled(),
moduleProperties.getMonitor().getRefreshInterval(),
moduleProperties.getMonitor().isEnableGraph());
log.info(" - 降级模块: enabled={}, maxRetries={}, retryDelay={}, defaultStrategy={}",
moduleProperties.getFallback().isEnabled(),
moduleProperties.getFallback().getMaxRetries(),
moduleProperties.getFallback().getRetryDelay(),
moduleProperties.getFallback().getDefaultStrategy());
}
/**
* 测试模块事件处理
*/
private void testModuleEventHandling() {
log.info("测试模块事件处理...");
// 创建模拟事件来触发各模块的事件监听器
AgentTaskEvent startEvent = new AgentTaskEvent(this, "test-task-001", "STARTED");
AgentTaskEvent completedEvent = new AgentTaskEvent(this, "test-task-001", "COMPLETED");
AgentTaskEvent failedEvent = new AgentTaskEvent(this, "test-task-002", "FAILED");
log.info("模拟任务开始事件: {}", startEvent.getTaskId());
log.info("模拟任务完成事件: {}", completedEvent.getTaskId());
log.info("模拟任务失败事件: {}", failedEvent.getTaskId());
log.info("模块事件处理测试完成");
}
/**
* 测试热更新功能
*/
public void testHotReload() {
log.info("测试配置热更新功能...");
// 显示当前配置
log.info("当前日志模块状态: {}", moduleProperties.getLog().isEnabled());
log.info("当前校验模块状态: {}", moduleProperties.getCheck().isEnabled());
log.info("当前监控模块状态: {}", moduleProperties.getMonitor().isEnabled());
log.info("当前降级模块状态: {}", moduleProperties.getFallback().isEnabled());
log.info("热更新功能测试完成 - 配置将在运行时动态更新");
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 任务可视化模块自动配置
* 基于@ConditionalOnProperty注解实现模块开关控制
* 结合@RefreshScope支持配置热更新
* 通过自定义@Endpoint暴露任务监控端点
*/
@Slf4j
@Configuration
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.monitor", name = "enabled", havingValue = "true", matchIfMissing = false)
public class MonitorModuleAutoConfig {
/**
* 注册任务监控端点
*/
@Bean
public AgentMonitorEndpoint agentMonitorEndpoint() {
return new AgentMonitorEndpoint();
}
}
/**
* 任务监控端点,提供任务执行进度和依赖图
*/
@Slf4j
@Component
@Endpoint(id = "agent-monitor")
class AgentMonitorEndpoint {
@Autowired
private ModuleProperties moduleProperties;
// 任务执行状态缓存
private final Map<String, TaskExecutionStatus> taskStatusCache = new ConcurrentHashMap<>();
public AgentMonitorEndpoint() {
}
/**
* 获取任务依赖图(DOT格式)
*/
@ReadOperation
public Map<String, Object> getTaskDependencyGraph() {
Map<String, Object> result = new java.util.HashMap<>();
try {
// 这里生成任务依赖图的DOT格式表示
String dotGraph = generateTaskDependencyDotGraph();
result.put("graphType", "dot");
result.put("graph", dotGraph);
result.put("timestamp", System.currentTimeMillis());
result.put("status", "success");
} catch (Exception e) {
log.error("生成任务依赖图失败", e);
result.put("status", "error");
result.put("error", e.getMessage());
}
return result;
}
/**
* 获取任务执行统计信息
*/
@ReadOperation
public Map<String, Object> getTaskStatistics() {
Map<String, Object> result = new java.util.HashMap<>();
try {
// 统计任务执行情况
Map<String, Long> stats = calculateTaskStatistics();
result.put("statistics", stats);
result.put("timestamp", System.currentTimeMillis());
result.put("status", "success");
} catch (Exception e) {
log.error("获取任务统计信息失败", e);
result.put("status", "error");
result.put("error", e.getMessage());
}
return result;
}
/**
* 生成任务依赖图的DOT格式表示
*/
private String generateTaskDependencyDotGraph() {
StringBuilder dot = new StringBuilder();
dot.append("digraph TaskDependencies {\n");
dot.append(" rankdir=TB;\n"); // 从上到下布局
dot.append(" node [shape=box, style=filled, color=lightblue];\n");
// 这里应该根据实际的任务依赖关系生成图
// 示例:添加节点和边
dot.append(" MainTask [label=\"主任务\\n(进行中)\"];\n");
dot.append(" SubTask1 [label=\"子任务1\\n(已完成)\"];\n");
dot.append(" SubTask2 [label=\"子任务2\\n(等待中)\"];\n");
dot.append(" SubTask3 [label=\"子任务3\\n(失败)\"];\n");
dot.append(" MainTask -> SubTask1;\n");
dot.append(" MainTask -> SubTask2;\n");
dot.append(" MainTask -> SubTask3;\n");
dot.append("}\n");
return dot.toString();
}
/**
* 计算任务统计信息
*/
private Map<String, Long> calculateTaskStatistics() {
Map<String, Long> stats = new java.util.HashMap<>();
// 这里应该从实际的任务状态存储中获取统计信息
// 示例数据
stats.put("totalTasks", 100L);
stats.put("completedTasks", 85L);
stats.put("runningTasks", 10L);
stats.put("failedTasks", 5L);
stats.put("successRate", 85L); // 成功率百分比
return stats;
}
/**
* 更新任务执行状态
*/
public void updateTaskStatus(String taskId, String status, String details) {
TaskExecutionStatus taskStatus = new TaskExecutionStatus(taskId, status, details, System.currentTimeMillis());
taskStatusCache.put(taskId, taskStatus);
}
/**
* 获取所有任务状态
*/
public Map<String, TaskExecutionStatus> getAllTaskStatuses() {
return new java.util.HashMap<>(taskStatusCache);
}
}
/**
* 任务执行状态类
*/
class TaskExecutionStatus {
private final String taskId;
private final String status; // RUNNING, COMPLETED, FAILED, WAITING
private final String details;
private final long timestamp;
public TaskExecutionStatus(String taskId, String status, String details, long timestamp) {
this.taskId = taskId;
this.status = status;
this.details = details;
this.timestamp = timestamp;
}
// Getters
public String getTaskId() { return taskId; }
public String getStatus() { return status; }
public String getDetails() { return details; }
public long getTimestamp() { return timestamp; }
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* 性能监控模块
* 收集和分析系统性能指标
*/
@Slf4j
@Component
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.performance", name = "enabled", havingValue = "true", matchIfMissing = false)
@Endpoint(id = "agent-performance")
public class PerformanceMonitorModule {
@Autowired
private ModuleProperties moduleProperties;
// 性能指标存储
private final Map<String, PerformanceMetric> metrics = new ConcurrentHashMap<>();
private final AtomicLong totalTasks = new AtomicLong(0);
private final AtomicLong completedTasks = new AtomicLong(0);
private final AtomicLong failedTasks = new AtomicLong(0);
private final AtomicLong totalExecutionTime = new AtomicLong(0);
/**
* 监听任务开始事件
*/
@Async
@EventListener
public void handleTaskStartedEvent(AgentTaskEvent event) {
if ("STARTED".equals(event.getEventType()) || "SUBTASK_STARTED".equals(event.getEventType())) {
PerformanceMetric metric = new PerformanceMetric(event.getTaskId(), System.currentTimeMillis());
metrics.put(event.getTaskId(), metric);
totalTasks.incrementAndGet();
log.debug("开始监控任务性能: {}", event.getTaskId());
}
}
/**
* 监听任务完成事件
*/
@Async
@EventListener
public void handleTaskCompletedEvent(AgentTaskEvent event) {
if ("COMPLETED".equals(event.getEventType()) || "SUBTASK_COMPLETED".equals(event.getEventType())) {
PerformanceMetric metric = metrics.get(event.getTaskId());
if (metric != null) {
long endTime = System.currentTimeMillis();
long executionTime = endTime - metric.getStartTime();
metric.setEndTime(endTime);
metric.setExecutionTime(executionTime);
completedTasks.incrementAndGet();
totalExecutionTime.addAndGet(executionTime);
log.debug("任务执行完成: {}, 执行时间: {}ms", event.getTaskId(), executionTime);
}
}
}
/**
* 监听任务失败事件
*/
@Async
@EventListener
public void handleTaskFailedEvent(AgentTaskEvent event) {
if ("FAILED".equals(event.getEventType()) || "SUBTASK_FAILED".equals(event.getEventType())) {
PerformanceMetric metric = metrics.get(event.getTaskId());
if (metric != null) {
long endTime = System.currentTimeMillis();
long executionTime = endTime - metric.getStartTime();
metric.setEndTime(endTime);
metric.setExecutionTime(executionTime);
metric.setSuccess(false);
failedTasks.incrementAndGet();
log.debug("任务执行失败: {}, 执行时间: {}ms", event.getTaskId(), executionTime);
}
}
}
/**
* 获取性能监控指标
*/
@ReadOperation
public Map<String, Object> getPerformanceMetrics() {
Map<String, Object> result = new java.util.HashMap<>();
try {
// 计算平均执行时间
long totalCompleted = completedTasks.get();
long avgExecutionTime = totalCompleted > 0 ? totalExecutionTime.get() / totalCompleted : 0;
result.put("totalTasks", totalTasks.get());
result.put("completedTasks", completedTasks.get());
result.put("failedTasks", failedTasks.get());
result.put("successRate", totalCompleted > 0 ? (double) completedTasks.get() / totalCompleted * 100 : 0);
result.put("avgExecutionTime", avgExecutionTime);
result.put("activeTasks", metrics.size());
result.put("timestamp", System.currentTimeMillis());
result.put("status", "success");
} catch (Exception e) {
log.error("获取性能指标失败", e);
result.put("status", "error");
result.put("error", e.getMessage());
}
return result;
}
/**
* 清空性能指标
*/
public void clearMetrics() {
metrics.clear();
totalTasks.set(0);
completedTasks.set(0);
failedTasks.set(0);
totalExecutionTime.set(0);
log.debug("清空性能监控指标");
}
/**
* 获取任务执行时间统计
*/
public long getAverageExecutionTime() {
long totalCompleted = completedTasks.get();
return totalCompleted > 0 ? totalExecutionTime.get() / totalCompleted : 0;
}
/**
* 获取任务成功率
*/
public double getSuccessRate() {
long total = totalTasks.get();
return total > 0 ? (double) completedTasks.get() / total * 100 : 0;
}
}
/**
* 性能指标类
*/
class PerformanceMetric {
private final String taskId;
private final long startTime;
private long endTime;
private long executionTime;
private boolean success = true;
public PerformanceMetric(String taskId, long startTime) {
this.taskId = taskId;
this.startTime = startTime;
}
// Getters and setters
public String getTaskId() { return taskId; }
public long getStartTime() { return startTime; }
public long getEndTime() { return endTime; }
public void setEndTime(long endTime) { this.endTime = endTime; }
public long getExecutionTime() { return executionTime; }
public void setExecutionTime(long executionTime) { this.executionTime = executionTime; }
public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 安全审计模块
* 统一处理安全相关事件
*/
@Slf4j
@Component
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.security", name = "enabled", havingValue = "true", matchIfMissing = false)
public class SecurityAuditModule {
@Autowired
private ModuleProperties moduleProperties;
// 安全事件记录
private final Map<String, SecurityEventRecord> securityEvents = new ConcurrentHashMap<>();
/**
* 监听任务执行事件,进行安全审计
*/
@Async
@EventListener
public void handleTaskEvent(AgentTaskEvent event) {
if (moduleProperties.getSecurity().isLogTaskAccess()) {
logSecurityEvent("TASK_ACCESS", event.getTaskId(), "Task " + event.getEventType());
}
}
/**
* 记录安全事件
*/
private void logSecurityEvent(String eventType, String resource, String details) {
SecurityEventRecord record = new SecurityEventRecord(
eventType,
resource,
details,
System.currentTimeMillis()
);
securityEvents.put(record.getTimestamp() + "_" + resource, record);
if (moduleProperties.getSecurity().isLogEnabled()) {
log.info("Security Audit - Type: {}, Resource: {}, Details: {}",
eventType, resource, details);
}
}
/**
* 获取安全事件统计
*/
public Map<String, Long> getSecurityEventStats() {
Map<String, Long> stats = new java.util.HashMap<>();
for (SecurityEventRecord record : securityEvents.values()) {
String eventType = record.getEventType();
stats.put(eventType, stats.getOrDefault(eventType, 0L) + 1);
}
return stats;
}
/**
* 清空安全事件记录
*/
public void clearSecurityEvents() {
securityEvents.clear();
log.debug("清空安全审计事件记录");
}
/**
* 获取最近的安全事件
*/
public java.util.List<SecurityEventRecord> getRecentSecurityEvents(int limit) {
return securityEvents.values().stream()
.sorted((a, b) -> Long.compare(b.getTimestamp(), a.getTimestamp()))
.limit(limit)
.toList();
}
}
/**
* 安全事件记录类
*/
class SecurityEventRecord {
private final String eventType;
private final String resource;
private final String details;
private final long timestamp;
public SecurityEventRecord(String eventType, String resource, String details, long timestamp) {
this.eventType = eventType;
this.resource = resource;
this.details = details;
this.timestamp = timestamp;
}
// Getters
public String getEventType() { return eventType; }
public String getResource() { return resource; }
public String getDetails() { return details; }
public long getTimestamp() { return timestamp; }
}
/**
* 安全审计模块属性
*/
class SecurityAuditModuleProperties {
private boolean enabled = true;
private boolean logEnabled = true;
private boolean logTaskAccess = true;
private boolean logDataAccess = true;
private String auditLogPath = "./audit";
private int retentionDays = 30;
// Getters and setters
public boolean isEnabled() { return enabled; }
public void setEnabled(boolean enabled) { this.enabled = enabled; }
public boolean isLogEnabled() { return logEnabled; }
public void setLogEnabled(boolean logEnabled) { this.logEnabled = logEnabled; }
public boolean isLogTaskAccess() { return logTaskAccess; }
public void setLogTaskAccess(boolean logTaskAccess) { this.logTaskAccess = logTaskAccess; }
public boolean isLogDataAccess() { return logDataAccess; }
public void setLogDataAccess(boolean logDataAccess) { this.logDataAccess = logDataAccess; }
public String getAuditLogPath() { return auditLogPath; }
public void setAuditLogPath(String auditLogPath) { this.auditLogPath = auditLogPath; }
public int getRetentionDays() { return retentionDays; }
public void setRetentionDays(int retentionDays) { this.retentionDays = retentionDays; }
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 简单的命令路由实现
*/
@Slf4j
@Component
public class SimpleCommandRouter implements CommandRouter {
@Autowired
private MasterAgent masterAgent;
@Autowired
private AgentTaskStatusRepository taskStatusRepository;
@Override
public void route(CooperateCommand command) {
log.info("路由协同命令: messageId={}, taskId={}, action={}, sender={}, receiver={}",
command.getMessageId(), command.getTaskId(), command.getAction(),
command.getSenderAgentId(), command.getReceiverAgentId());
// 根据动作类型进行路由处理
switch (command.getAction()) {
case "TASK_ASSIGN":
handleTaskAssign(command);
break;
case "RESULT_REPORT":
handleResultReport(command);
break;
case "FAIL_RETRY":
handleFailRetry(command);
break;
default:
log.warn("未知的命令动作类型: {}", command.getAction());
break;
}
}
private void handleTaskAssign(CooperateCommand command) {
log.info("处理任务分配命令: taskId={}, receiver={}", command.getTaskId(), command.getReceiverAgentId());
try {
// 从命令参数中获取主任务信息
String taskId = command.getTaskId();
// 从数据库获取任务详情
AgentTaskStatus taskStatus = taskStatusRepository.selectById(taskId);
if (taskStatus == null) {
log.error("任务不存在: taskId={}", taskId);
return;
}
// 更新任务状态为RUNNING
taskStatus.setStatus("RUNNING");
taskStatus.setUpdateTime(java.time.LocalDateTime.now());
taskStatusRepository.updateById(taskStatus);
// 构建主任务对象并执行
MainTask mainTask = buildMainTaskFromStatus(taskStatus);
// 通过MasterAgent执行主任务
AgentResult result = masterAgent.executeMainTask(mainTask);
log.info("任务分配执行完成: taskId={}, success={}", taskId, result.isSuccess());
// 更新任务状态
taskStatus.setStatus(result.isSuccess() ? "SUCCESS" : "FAIL");
taskStatus.setResult(result.getData() != null ? result.getData().toString() : "");
taskStatus.setUpdateTime(java.time.LocalDateTime.now());
taskStatusRepository.updateById(taskStatus);
} catch (Exception e) {
log.error("处理任务分配命令失败: taskId={}", command.getTaskId(), e);
}
}
/**
* 从任务状态构建主任务对象
*/
private MainTask buildMainTaskFromStatus(AgentTaskStatus status) {
MainTask mainTask = new MainTask();
mainTask.setTaskId(status.getTaskId());
// 这里需要从状态的result字段反序列化出原始任务信息
// 实际实现中可能需要更复杂的反序列化逻辑
mainTask.setSubTasks(List.of()); // 实际实现需要从状态中重建子任务
return mainTask;
}
private void handleResultReport(CooperateCommand command) {
log.info("处理结果报告命令: taskId={}, receiver={}", command.getTaskId(), command.getReceiverAgentId());
try {
String taskId = command.getTaskId();
String subTaskId = (String) command.getParams().get("subTaskId");
Object resultData = command.getParams().get("result");
String resultHash = command.getResultHash();
// 更新子任务状态到数据库
AgentTaskStatus subTaskStatus = new AgentTaskStatus();
subTaskStatus.setTaskId(subTaskId);
subTaskStatus.setStatus("SUCCESS");
subTaskStatus.setResult(resultData != null ? resultData.toString() : "");
subTaskStatus.setResultHash(resultHash);
subTaskStatus.setUpdateTime(java.time.LocalDateTime.now());
// 检查是否存在,如果存在则更新,否则插入
AgentTaskStatus existingStatus = taskStatusRepository.selectById(subTaskId);
if (existingStatus != null) {
taskStatusRepository.updateById(subTaskStatus);
} else {
taskStatusRepository.insert(subTaskStatus);
}
log.info("子任务结果报告处理完成: subTaskId={}, resultHash={}", subTaskId, resultHash);
} catch (Exception e) {
log.error("处理结果报告命令失败: taskId={}", command.getTaskId(), e);
}
}
private void handleFailRetry(CooperateCommand command) {
log.info("处理失败重试命令: taskId={}, receiver={}", command.getTaskId(), command.getReceiverAgentId());
try {
String taskId = command.getTaskId();
// 获取任务状态
AgentTaskStatus taskStatus = taskStatusRepository.selectById(taskId);
if (taskStatus == null) {
log.error("重试任务不存在: taskId={}", taskId);
return;
}
// 检查重试次数是否超过限制
Integer retryCount = taskStatus.getRetryCount();
if (retryCount == null) {
retryCount = 0;
}
if (retryCount >= 3) { // 最多重试3次
log.warn("任务重试次数已达到上限: taskId={}, retryCount={}", taskId, retryCount);
taskStatus.setStatus("FAIL");
taskStatus.setUpdateTime(java.time.LocalDateTime.now());
taskStatusRepository.updateById(taskStatus);
return;
}
// 更新重试次数
retryCount++;
taskStatus.setRetryCount(retryCount);
taskStatus.setStatus("READY"); // 重置状态为准备执行
taskStatus.setUpdateTime(java.time.LocalDateTime.now());
// 保存更新后的状态
taskStatusRepository.updateById(taskStatus);
// 重新执行任务
MainTask mainTask = buildMainTaskFromStatus(taskStatus);
AgentResult result = masterAgent.executeMainTask(mainTask);
log.info("任务重试执行完成: taskId={}, success={}, retryCount={}",
taskId, result.isSuccess(), retryCount);
// 更新最终状态
taskStatus.setStatus(result.isSuccess() ? "SUCCESS" : "FAIL");
taskStatus.setResult(result.getData() != null ? result.getData().toString() : "");
taskStatus.setUpdateTime(java.time.LocalDateTime.now());
taskStatusRepository.updateById(taskStatus);
} catch (Exception e) {
log.error("处理失败重试命令失败: taskId={}", command.getTaskId(), e);
}
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.Set;
/**
* 简单的幂等检查器实现
* 在生产环境中,建议使用Redis等外部存储来保证跨实例的幂等性
*/
@Component
public class SimpleIdempotentChecker implements IdempotentChecker {
private final Set<String> processedMessageIds = new HashSet<>();
@Override
public boolean check(String messageId) {
// 如果消息ID已存在,说明已处理过,返回false
if (processedMessageIds.contains(messageId)) {
return false;
}
// 添加到已处理集合,并返回true表示可以处理
processedMessageIds.add(messageId);
return true;
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
/**
* 从Agent通用接口
*/
public interface SlaveAgent {
AgentResult execute(SubTask subTask);
/**
* 降级方法,当执行失败时触发
*/
default AgentResult fallback(SubTask subTask, Exception e) {
AgentResult result = new AgentResult();
result.setSuccess(false);
result.setMessage("Fallback triggered: " + e.getMessage());
result.setData(subTask.getFallbackData());
return result;
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.retry.RetryRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.stream.Collectors;
/**
* 从Agent自动配置类,为所有从Agent添加熔断重试能力
*/
@Configuration
public class SlaveAgentAutoConfig {
@Bean
public List<SlaveAgent> slaveAgents(List<SlaveAgent> rawAgents,
CircuitBreakerRegistry circuitBreakerRegistry,
RetryRegistry retryRegistry) {
return rawAgents.stream()
.map(agent -> new CircuitBreakerSlaveAgent(agent, circuitBreakerRegistry, retryRegistry))
.collect(Collectors.toList());
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.Data;
import java.util.List;
import java.util.Map;
/**
* 子任务数据结构
*/
@Data
public class SubTask {
private String subTaskId;
private String taskId;
private String toolTag;
private List<String> dependOn;
private Map<String, Object> params;
private Object fallbackData;
public SubTask() {
this.dependOn = List.of();
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 任务依赖解析器
*/
@Component
public class TaskDependencyResolver {
/**
* 解析子任务依赖关系,生成执行批次
* 同批次任务可并行执行,跨批次任务串行执行
*/
public List<List<SubTask>> resolve(List<SubTask> subTasks) {
List<List<SubTask>> batches = new ArrayList<>();
Set<String> completedTaskIds = new HashSet<>();
// 初始批次:无依赖的子任务
List<SubTask> firstBatch = subTasks.stream()
.filter(t -> t.getDependOn().isEmpty())
.collect(Collectors.toList());
batches.add(firstBatch);
completedTaskIds.addAll(firstBatch.stream().map(SubTask::getSubTaskId).collect(Collectors.toList()));
// 递归解析后续批次
while (completedTaskIds.size() < subTasks.size()) {
List<SubTask> nextBatch = subTasks.stream()
.filter(t -> !completedTaskIds.contains(t.getSubTaskId()))
.filter(t -> completedTaskIds.containsAll(t.getDependOn()))
.collect(Collectors.toList());
if (nextBatch.isEmpty()) {
throw new IllegalStateException("Circular dependency detected in subtasks");
}
batches.add(nextBatch);
completedTaskIds.addAll(nextBatch.stream().map(SubTask::getSubTaskId).collect(Collectors.toList()));
}
return batches;
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 任务调度模块
* 实现智能任务调度策略,支持多种调度算法
*/
@Slf4j
@Component
@RefreshScope
@ConditionalOnProperty(prefix = "agent.module.scheduler", name = "enabled", havingValue = "true", matchIfMissing = false)
public class TaskSchedulerModule {
@Autowired
private ModuleProperties moduleProperties;
@Autowired
private MasterAgent masterAgent;
@Autowired
private AgentTaskStatusRepository taskStatusRepository;
// 任务队列
private final PriorityBlockingQueue<ScheduledTask> taskQueue = new PriorityBlockingQueue<>();
private final AtomicInteger taskCounter = new AtomicInteger(0);
/**
* 监听任务创建事件,进行任务调度
*/
@Async
@EventListener
public void handleTaskCreatedEvent(AgentTaskEvent event) {
if ("TASK_CREATED".equals(event.getEventType())) {
scheduleTask(event.getTaskId());
}
}
/**
* 调度任务
*/
private void scheduleTask(String taskId) {
String strategy = moduleProperties.getScheduler().getStrategy();
log.debug("使用调度策略: {} 调度任务: {}", strategy, taskId);
ScheduledTask task = new ScheduledTask(taskId, System.currentTimeMillis(), getNextPriority());
switch (strategy) {
case "priority":
scheduleByPriority(task);
break;
case "round-robin":
scheduleRoundRobin(task);
break;
case "weighted":
scheduleWeighted(task);
break;
default:
scheduleByPriority(task); // 默认策略
}
}
/**
* 优先级调度
*/
private void scheduleByPriority(ScheduledTask task) {
taskQueue.offer(task);
log.debug("任务按优先级调度: {}", task.getTaskId());
}
/**
* 轮询调度
*/
private void scheduleRoundRobin(ScheduledTask task) {
// TODO 简化实现:直接添加到队列
taskQueue.offer(task);
log.debug("任务按轮询调度: {}", task.getTaskId());
}
/**
* 加权调度
*/
private void scheduleWeighted(ScheduledTask task) {
// 根据权重调整优先级
int weight = calculateWeight(task.getTaskId());
task.setPriority(task.getPriority() + weight);
taskQueue.offer(task);
log.debug("任务按权重调度: {}, 权重: {}", task.getTaskId(), weight);
}
/**
* 计算任务权重
*/
private int calculateWeight(String taskId) {
// 根据任务特征计算权重(示例实现)
return taskId.hashCode() % 10;
}
/**
* 获取下一个优先级
*/
private int getNextPriority() {
return taskCounter.incrementAndGet();
}
/**
* 执行下一个任务
*/
public CompletableFuture<AgentResult> executeNextTask() {
ScheduledTask task = taskQueue.poll();
if (task != null) {
log.debug("执行调度任务: {}", task.getTaskId());
// 从数据库获取任务详情
AgentTaskStatus taskStatus = taskStatusRepository.selectById(task.getTaskId());
if (taskStatus == null) {
log.warn("任务不存在: {}", task.getTaskId());
return CompletableFuture.completedFuture(new AgentResult(false, "Task not found: " + task.getTaskId(), null));
}
// 构建主任务对象并执行
MainTask mainTask = buildMainTaskFromStatus(taskStatus);
// 通过MasterAgent异步执行主任务
return CompletableFuture.supplyAsync(() -> {
try {
AgentResult result = masterAgent.executeMainTask(mainTask);
// 更新任务状态
taskStatus.setStatus(result.isSuccess() ? "SUCCESS" : "FAIL");
taskStatus.setResult(result.getData() != null ? result.getData().toString() : "");
taskStatus.setUpdateTime(java.time.LocalDateTime.now());
taskStatusRepository.updateById(taskStatus);
log.info("任务执行完成: taskId={}, success={}", task.getTaskId(), result.isSuccess());
return result;
} catch (Exception e) {
log.error("任务执行失败: taskId={}", task.getTaskId(), e);
// 更新失败状态
taskStatus.setStatus("FAIL");
taskStatus.setResult("执行异常: " + e.getMessage());
taskStatus.setUpdateTime(java.time.LocalDateTime.now());
taskStatusRepository.updateById(taskStatus);
return new AgentResult(false, "Task execution failed: " + e.getMessage(), null);
}
});
}
return CompletableFuture.completedFuture(new AgentResult(false, "No task to execute", null));
}
/**
* 从任务状态构建主任务对象
*/
private MainTask buildMainTaskFromStatus(AgentTaskStatus status) {
MainTask mainTask = new MainTask();
mainTask.setTaskId(status.getTaskId());
// 从状态数据中重建任务信息
// 这里需要根据实际存储的格式进行反序列化
// 示例:如果result字段包含序列化的子任务信息,需要反序列化
return mainTask;
}
/**
* 获取当前队列大小
*/
public int getQueueSize() {
return taskQueue.size();
}
/**
* 清空任务队列
*/
public void clearQueue() {
taskQueue.clear();
log.debug("清空任务调度队列");
}
}
/**
* 调度任务类
*/
class ScheduledTask implements Comparable<ScheduledTask> {
private final String taskId;
private final long timestamp;
private int priority;
public ScheduledTask(String taskId, long timestamp, int priority) {
this.taskId = taskId;
this.timestamp = timestamp;
this.priority = priority;
}
@Override
public int compareTo(ScheduledTask other) {
// 优先级高的排在前面(数值小的优先级高)
return Integer.compare(this.priority, other.priority);
}
// Getters and setters
public String getTaskId() { return taskId; }
public long getTimestamp() { return timestamp; }
public int getPriority() { return priority; }
public void setPriority(int priority) { this.priority = priority; }
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import jakarta.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* 任务状态服务,处理故障自愈逻辑
*/
@Slf4j
@Service
public class TaskStatusService {
@Autowired
private AgentTaskStatusRepository taskStatusRepository;
@Autowired
private MasterAgent masterAgent;
/**
* 超时兜底:主Agent通过定时任务扫描RUNNING状态且超过超时时间的任务,
* 自动触发重试或标记为失败
*/
@Scheduled(fixedRate = 5000) // 每5秒检查一次
public void checkTimeoutTasks() {
List<String> timeoutTaskIds = taskStatusRepository.findTimeoutTasks(System.currentTimeMillis());
for (String taskId : timeoutTaskIds) {
log.info("发现超时任务,开始重试: {}", taskId);
retryTask(taskId);
}
}
/**
* 崩溃恢复:应用重启后,主Agent自动扫描RUNNING状态的任务,
* 基于数据库中存储的状态数据恢复执行
*/
@PostConstruct
public void recoverUnfinishedTasks() {
List<AgentTaskStatus> runningTasks = taskStatusRepository.findByStatus("RUNNING");
log.info("应用启动,发现{}个未完成的任务,开始恢复执行", runningTasks.size());
for (AgentTaskStatus status : runningTasks) {
MainTask task = rebuildTaskFromStatus(status);
CompletableFuture.runAsync(() -> {
try {
masterAgent.executeMainTask(task);
} catch (Exception e) {
log.error("恢复任务执行失败: {}", status.getTaskId(), e);
}
});
}
}
/**
* 重试任务
*/
private void retryTask(String taskId) {
try {
// 更新重试次数
AgentTaskStatus status = taskStatusRepository.selectById(taskId);
if (status != null) {
int newRetryCount = (status.getRetryCount() != null ? status.getRetryCount() : 0) + 1;
status.setRetryCount(newRetryCount);
// 如果重试次数超过阈值,标记为失败
if (newRetryCount > 3) { // 最多重试3次
status.setStatus("FAIL");
taskStatusRepository.updateById(status);
log.warn("任务{}重试次数超过阈值,标记为失败", taskId);
return;
}
// 更新状态并重新执行任务
status.setStatus("READY");
taskStatusRepository.updateById(status);
// 重新执行任务逻辑(这里需要根据实际情况实现)
log.info("任务{}重试,当前重试次数: {}", taskId, newRetryCount);
}
} catch (Exception e) {
log.error("重试任务失败: {}", taskId, e);
}
}
/**
* 从状态数据重建任务
*/
private MainTask rebuildTaskFromStatus(AgentTaskStatus status) {
MainTask task = new MainTask();
task.setTaskId(status.getTaskId());
// 从result字段中反序列化出原始任务数据
if (status.getResult() != null && !status.getResult().isEmpty()) {
try {
// 使用Jackson ObjectMapper进行JSON反序列化
com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
objectMapper.findAndRegisterModules(); // 注册JavaTimeModule等模块
// 尝试将result字段解析为MainTask对象
MainTask deserializedTask = objectMapper.readValue(status.getResult(), MainTask.class);
// 使用反序列化的数据更新任务,保留ID
task.setTaskName(deserializedTask.getTaskName());
task.setSubTasks(deserializedTask.getSubTasks());
task.setExpectedHash(deserializedTask.getExpectedHash());
task.setMasterAgentId(deserializedTask.getMasterAgentId());
log.debug("从状态数据成功重建任务: {}", status.getTaskId());
} catch (Exception e) {
log.error("从状态数据重建任务失败: taskId={}, result={}, error={}",
status.getTaskId(), status.getResult(), e.getMessage());
// 如果反序列化失败,使用默认值
task.setTaskName("Recovered Task - " + status.getTaskId());
task.setSubTasks(List.of());
task.setExpectedHash(status.getResultHash());
task.setMasterAgentId(null);
}
} else {
// 如果没有result数据,使用基本任务信息
task.setTaskName("Recovered Task - " + status.getTaskId());
task.setSubTasks(List.of());
task.setExpectedHash(status.getResultHash());
task.setMasterAgentId(null);
log.warn("任务状态中无结果数据,使用默认值重建任务: {}", status.getTaskId());
}
return task;
}
}
\ No newline at end of file
package pangea.hiagent.agent.data;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
* 从Agent能力标签注解,用于标识Agent的工具能力
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface ToolTag {
/** 能力标识,如DB_QUERY/DATA_CALC/FORMAT_EXPORT */
String value();
}
\ No newline at end of file
......@@ -4,8 +4,6 @@ import pangea.hiagent.agent.service.SseTokenEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.dto.AgentRequest;
import java.util.function.Consumer;
/**
* 统一的Agent处理器接口
* 为不同类型的Agent提供统一的处理接口
......
......@@ -458,6 +458,10 @@ public abstract class BaseAgentProcessor implements AgentProcessor {
if (tokenConsumer != null) {
// 对于流式处理,我们需要将RAG响应作为token发送
tokenConsumer.accept(ragResponse);
// 注意:这里不调用onComplete,因为RAG响应只是整个处理流程的一部分
// 不是所有通讯操作的最终完成
// 完整的onComplete调用应该在所有处理完成后进行
}
return ragResponse;
}
......
......@@ -14,7 +14,7 @@ import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.agent.data.WorkPanelEvent;
/**
* 简化的ReAct回调类
* ReAct回调类
*/
@Slf4j
@Component
......
......@@ -16,10 +16,9 @@ import pangea.hiagent.tool.AgentToolManager;
import pangea.hiagent.common.utils.UserUtils;
import java.util.List;
import java.util.ArrayList;
import java.util.function.Consumer;
/**
* 简化的默认ReAct执行器实现
* 默认ReAct执行器实现
*/
@Slf4j
@Service
......
......@@ -5,7 +5,6 @@ import org.springframework.ai.chat.client.ChatClient;
import pangea.hiagent.agent.service.SseTokenEmitter;
import pangea.hiagent.model.Agent;
import java.util.List;
import java.util.function.Consumer;
/**
* ReAct执行器接口
......
......@@ -6,6 +6,7 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.agent.data.AgentResult;
import pangea.hiagent.agent.processor.AgentProcessor;
import pangea.hiagent.agent.processor.AgentProcessorFactory;
import pangea.hiagent.common.utils.UserUtils;
......@@ -28,18 +29,21 @@ public class AgentChatService {
private final AgentToolManager agentToolManager;
private final UserSseService userSseService;
private final pangea.hiagent.web.service.AgentService agentService;
private final MultiAgentTaskExecutionService multiAgentTaskExecutionService;
public AgentChatService(
ErrorHandlerService errorHandlerService,
AgentProcessorFactory agentProcessorFactory,
AgentToolManager agentToolManager,
UserSseService userSseService,
pangea.hiagent.web.service.AgentService agentService) {
pangea.hiagent.web.service.AgentService agentService,
MultiAgentTaskExecutionService multiAgentTaskExecutionService) {
this.errorHandlerService = errorHandlerService;
this.agentProcessorFactory = agentProcessorFactory;
this.agentToolManager = agentToolManager;
this.userSseService = userSseService;
this.agentService = agentService;
this.multiAgentTaskExecutionService = multiAgentTaskExecutionService;
}
// /**
......@@ -170,6 +174,30 @@ public class AgentChatService {
return;
}
// 检查是否需要启动多Agent协同
if (multiAgentTaskExecutionService.shouldStartCollaboration(chatRequest.getMessage())) {
log.info("检测到多Agent协同需求,开始执行协同任务");
// 执行多Agent协同任务
AgentResult result = multiAgentTaskExecutionService.processUserInputForCollaboration(chatRequest.getMessage(), agent);
if (result != null) {
// 发送协同任务结果
SseTokenEmitter tokenEmitter = new SseTokenEmitter(userSseService, emitter, agent,
chatRequest.toAgentRequest(agent.getId(), agent, agentToolManager), userId, null);
if (!userSseService.isEmitterCompleted(emitter)) {
// 发送多Agent协同任务结果
String resultContent = "多Agent协同任务执行结果:\n" + result.getData();
tokenEmitter.emitToken(resultContent);
tokenEmitter.emitComplete(resultContent);
log.info("AgentChatService.processChatRequest: 多Agent协同任务执行结果已发送");
tokenEmitter.complete();
}
return;
}
}
// 获取处理器前检查连接状态
if (userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE连接已关闭,跳过获取处理器");
......@@ -196,7 +224,7 @@ public class AgentChatService {
AgentRequest request = chatRequest.toAgentRequest(agent.getId(), agent, agentToolManager);
// 创建新的SseTokenEmitter实例
SseTokenEmitter tokenEmitter = new SseTokenEmitter(userSseService, emitter, agent, request, userId, this::handleCompletion);
SseTokenEmitter tokenEmitter = new SseTokenEmitter(userSseService, emitter, agent, request, userId, (e, content) -> handleCompletion(e, content, agent, request, userId));
// 处理流式请求前再次检查连接状态
if (!userSseService.isEmitterCompleted(emitter)) {
......@@ -214,13 +242,28 @@ public class AgentChatService {
* 处理完成回调
*
* @param emitter SSE发射器
* @param fullContent 完整内容
*/
/**
* 处理完成回调
*
* @param emitter SSE发射器
* @param fullContent 完整内容
*/
private void handleCompletion(SseEmitter emitter, String fullContent) {
log.info("Agent处理完成,总字符数: {}", fullContent != null ? fullContent.length() : 0);
}
/**
* 处理完成回调(带参数)
*
* @param emitter SSE发射器
* @param fullContent 完整内容
* @param agent Agent对象
* @param request Agent请求
* @param userId 用户ID
* @param fullContent 完整内容
*/
private void handleCompletion(SseEmitter emitter, Agent agent, AgentRequest request, String userId,
String fullContent) {
private void handleCompletion(SseEmitter emitter, String fullContent, Agent agent, AgentRequest request, String userId) {
log.info("Agent处理完成,总字符数: {}", fullContent != null ? fullContent.length() : 0);
// 保存对话记录 - 安全操作,不抛出异常
......
......@@ -142,7 +142,7 @@ public class ErrorHandlerService {
}
/**
* 处理聊天过程中的异常(简化版)
* 处理聊天过程中的异常(版)
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
......
......@@ -6,6 +6,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.Comparator;
/**
* 异常监控服务
......@@ -57,7 +58,7 @@ public class ExceptionMonitoringService {
while (exceptionDetails.size() > CLEANUP_THRESHOLD) {
// 找出最小的时间戳(最老的条目)
Long oldestTimestamp = exceptionDetails.keySet().stream()
.min(Long::compare)
.min(Comparator.naturalOrder())
.orElse(null);
if (oldestTimestamp != null) {
exceptionDetails.remove(oldestTimestamp);
......
package pangea.hiagent.agent.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import pangea.hiagent.agent.data.*;
import pangea.hiagent.web.service.CollaborationService;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* 多Agent协同任务执行服务
* 负责动态生成和执行多Agent协同任务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MultiAgentTaskExecutionService {
private final MasterAgent masterAgent;
private final CollaborationService collaborationService;
private final MultiAgentTaskGeneratorService taskGeneratorService;
/**
* 执行多Agent协同任务
*
* @param mainTask 主任务对象
* @return 任务执行结果
*/
public AgentResult executeCollaborationTask(MainTask mainTask) {
log.info("开始执行多Agent协同任务: {}", mainTask.getTaskId());
try {
// 通过MasterAgent执行主任务
AgentResult result = masterAgent.executeMainTask(mainTask);
log.info("多Agent协同任务执行完成: {},结果: {}", mainTask.getTaskId(), result.isSuccess());
return result;
} catch (Exception e) {
log.error("执行多Agent协同任务失败: {}", mainTask.getTaskId(), e);
return new AgentResult(false, "执行多Agent协同任务失败: " + e.getMessage(), null);
}
}
/**
* 异步执行多Agent协同任务
*
* @param mainTask 主任务对象
* @return CompletableFuture包装的任务执行结果
*/
public CompletableFuture<AgentResult> executeCollaborationTaskAsync(MainTask mainTask) {
return CompletableFuture.supplyAsync(() -> executeCollaborationTask(mainTask));
}
/**
* 根据用户输入分析并执行多Agent协同任务
*
* @param userInput 用户输入
* @param agent 当前Agent
* @return 任务执行结果
*/
public AgentResult processUserInputForCollaboration(String userInput, pangea.hiagent.model.Agent agent) {
log.info("处理用户输入以执行多Agent协同任务: {}", userInput);
// 检查是否需要启动多Agent协同
if (!taskGeneratorService.shouldStartCollaboration(userInput)) {
log.info("用户输入不需要多Agent协同,返回空结果");
return null;
}
// 生成协同任务
MainTask mainTask = taskGeneratorService.generateCollaborationTask(userInput, agent);
// 执行协同任务
return executeCollaborationTask(mainTask);
}
/**
* 异步处理用户输入并执行多Agent协同任务
*
* @param userInput 用户输入
* @param agent 当前Agent
* @return CompletableFuture包装的任务执行结果
*/
public CompletableFuture<AgentResult> processUserInputForCollaborationAsync(String userInput, pangea.hiagent.model.Agent agent) {
return CompletableFuture.supplyAsync(() -> processUserInputForCollaboration(userInput, agent));
}
/**
* 获取可用的从Agent列表
*
* @return 从Agent能力标签列表
*/
public List<String> getAvailableSlaveAgents() {
return taskGeneratorService.getAvailableSlaveAgents();
}
/**
* 检查是否需要启动多Agent协同
*
* @param userInput 用户输入
* @return 是否需要启动多Agent协同
*/
public boolean shouldStartCollaboration(String userInput) {
return taskGeneratorService.shouldStartCollaboration(userInput);
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import pangea.hiagent.agent.data.*;
import pangea.hiagent.model.Agent;
import java.util.*;
import java.util.regex.Pattern;
/**
* 多Agent协同任务生成服务
* 分析用户输入并判断是否需要启动多Agent协同任务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MultiAgentTaskGeneratorService {
private final MasterAgent masterAgent;
// 识别需要多Agent协同的关键词
private static final Set<String> COLLABORATION_KEYWORDS = Set.of(
"协作", "协同", "多个", "多步骤", "多阶段", "联合", "分工", "配合", "并行", "协调",
"multi-agent", "collaboration", "cooperation", "multiple agents", "teamwork", "coordinate"
);
// 识别任务类型的关键词
private static final Map<String, String> TASK_TYPE_KEYWORDS = Map.of(
"查询", "search",
"计算", "calculate",
"分析", "analyze",
"生成", "generate",
"处理", "process",
"转换", "transform",
"验证", "verify",
"汇总", "summarize"
);
/**
* 分析用户输入,判断是否需要启动多Agent协同任务
*
* @param userInput 用户输入的文本
* @return 是否需要启动多Agent协同任务
*/
public boolean shouldStartCollaboration(String userInput) {
if (userInput == null || userInput.trim().isEmpty()) {
return false;
}
String lowerInput = userInput.toLowerCase();
// 检查是否包含协作关键词
for (String keyword : COLLABORATION_KEYWORDS) {
if (lowerInput.contains(keyword.toLowerCase())) {
log.info("检测到协作关键词,需要启动多Agent协同任务: {}", keyword);
return true;
}
}
// 检查是否包含多个任务或复杂任务的描述
if (containsMultipleTasks(userInput)) {
log.info("检测到多个任务描述,需要启动多Agent协同任务");
return true;
}
// 检查是否包含复杂任务的描述
if (isComplexTask(userInput)) {
log.info("检测到复杂任务,可能需要多Agent协同");
return true;
}
return false;
}
/**
* 检查输入是否包含多个任务
*/
private boolean containsMultipleTasks(String input) {
// 检查是否有并列的动词或任务描述
String[] taskIndicators = {"然后", "并且", "同时", "另外", "接着", "之后", "以及", "and", "then", "also", "plus"};
for (String indicator : taskIndicators) {
if (input.contains(indicator)) {
return true;
}
}
// 检查是否有分号或逗号分隔的多个任务
String[] parts = input.split("[,;,;]");
return parts.length > 2; // 如果有超过2个部分,可能包含多个任务
}
/**
* 判断是否为复杂任务
*/
private boolean isComplexTask(String input) {
// 复杂任务通常包含多个步骤或需要多种能力
String[] complexityIndicators = {"需要", "要求", "步骤", "流程", "过程", "方法", "方案", "策略", "需要先.*再", "先.*后"};
for (String indicator : complexityIndicators) {
if (Pattern.compile(indicator, Pattern.CASE_INSENSITIVE).matcher(input).find()) {
return true;
}
}
// 检查字符长度,如果较长可能包含复杂任务
return input.length() > 50;
}
/**
* 根据用户输入生成多Agent协同任务
*
* @param userInput 用户输入的文本
* @param agent 当前选中的Agent
* @return 生成的多Agent协同任务
*/
public MainTask generateCollaborationTask(String userInput, Agent agent) {
MainTask mainTask = new MainTask();
mainTask.setTaskId("collab_" + System.currentTimeMillis());
mainTask.setTaskName("多Agent协同任务: " + userInput.substring(0, Math.min(30, userInput.length())));
mainTask.setMasterAgentId(agent.getId());
// 分析用户需求,生成子任务
List<SubTask> subTasks = analyzeUserInput(userInput);
mainTask.setSubTasks(subTasks);
log.info("生成多Agent协同任务: {},包含 {} 个子任务", mainTask.getTaskId(), subTasks.size());
return mainTask;
}
/**
* 分析用户输入,生成子任务列表
*/
private List<SubTask> analyzeUserInput(String userInput) {
List<SubTask> subTasks = new ArrayList<>();
// 简单的自然语言分析,识别可能的任务类型
String lowerInput = userInput.toLowerCase();
// 根据关键词识别可能需要的工具类型
List<String> requiredCapabilities = identifyCapabilities(userInput);
if (requiredCapabilities.isEmpty()) {
// 如果没有识别到特定能力,使用默认能力
SubTask defaultTask = new SubTask();
defaultTask.setSubTaskId("subtask_" + System.currentTimeMillis());
defaultTask.setTaskId("default_task");
defaultTask.setToolTag("GENERAL"); // 通用处理能力
defaultTask.setParams(Map.of("input", userInput));
subTasks.add(defaultTask);
} else {
// 为每种需要的能力创建一个子任务
for (int i = 0; i < requiredCapabilities.size(); i++) {
SubTask subTask = new SubTask();
subTask.setSubTaskId("subtask_" + System.currentTimeMillis() + "_" + i);
subTask.setTaskId("dynamic_task");
subTask.setToolTag(requiredCapabilities.get(i));
subTask.setParams(Map.of("input", userInput));
// 设置任务依赖关系(简单的线性依赖)
if (i > 0) {
subTask.setDependOn(List.of("subtask_" + System.currentTimeMillis() + "_" + (i - 1)));
}
subTasks.add(subTask);
}
}
return subTasks;
}
/**
* 识别用户需求中需要的Agent能力
*/
private List<String> identifyCapabilities(String userInput) {
List<String> capabilities = new ArrayList<>();
String lowerInput = userInput.toLowerCase();
// 检查是否存在数据库查询需求
if (lowerInput.contains("查询") || lowerInput.contains("search") ||
lowerInput.contains("database") || lowerInput.contains("db") ||
lowerInput.contains("数据")) {
capabilities.add("DB_QUERY");
}
// 检查是否存在计算需求
if (lowerInput.contains("计算") || lowerInput.contains("calculate") ||
lowerInput.contains("数学") || lowerInput.contains("算术") ||
lowerInput.contains("sum") || lowerInput.contains("count")) {
capabilities.add("CALCULATION");
}
// 检查是否存在分析需求
if (lowerInput.contains("分析") || lowerInput.contains("analyze") ||
lowerInput.contains("统计") || lowerInput.contains("统计") ||
lowerInput.contains("统计")) {
capabilities.add("ANALYSIS");
}
// 检查是否存在文档处理需求
if (lowerInput.contains("文档") || lowerInput.contains("document") ||
lowerInput.contains("文件") || lowerInput.contains("file") ||
lowerInput.contains("读取") || lowerInput.contains("read")) {
capabilities.add("DOCUMENT_PROCESSING");
}
// 检查是否存在网络请求需求
if (lowerInput.contains("获取") || lowerInput.contains("fetch") ||
lowerInput.contains("网络") || lowerInput.contains("web") ||
lowerInput.contains("url") || lowerInput.contains("链接")) {
capabilities.add("WEB_REQUEST");
}
// 如果没有识别到特定能力,但检测到复杂需求,则使用通用能力
if (capabilities.isEmpty() && (userInput.length() > 30 || containsMultipleTasks(userInput))) {
capabilities.add("GENERAL");
}
return capabilities;
}
/**
* 获取系统中可用的从Agent列表
*/
public List<String> getAvailableSlaveAgents() {
return new ArrayList<>(masterAgent.getAgentMap().keySet());
}
}
\ No newline at end of file
package pangea.hiagent.agent.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.model.Agent;
import pangea.hiagent.web.dto.AgentRequest;
import java.io.IOException;
import java.util.function.BiConsumer;
/**
* SSE Token发射器
* 专注于将token转换为SSE事件并发送
* 无状态设计,每次使用时创建新实例
* 用于向客户端发送流式响应
*/
@Slf4j
@RequiredArgsConstructor
public class SseTokenEmitter implements TokenConsumerWithCompletion {
private final UserSseService userSseService;
// 所有状态通过构造函数一次性传入
private final SseEmitter emitter;
private final Agent agent;
private final AgentRequest request;
private final String userId;
private final CompletionCallback completionCallback;
private final BiConsumer<SseEmitter, String> completionCallback;
/**
* 构造函数
* @param userSseService SSE服务
* @param emitter SSE发射器
* @param agent Agent对象
* @param request 请求对象
* @param userId 用户ID
* @param completionCallback 完成回调
* 发送token
*/
public SseTokenEmitter(UserSseService userSseService, SseEmitter emitter, Agent agent,
AgentRequest request, String userId, CompletionCallback completionCallback) {
this.userSseService = userSseService;
this.emitter = emitter;
this.agent = agent;
this.request = request;
this.userId = userId;
this.completionCallback = completionCallback;
public void emitToken(String token) {
if (userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE连接已关闭,跳过发送token");
return;
}
/**
* 无参构造函数,用于Spring容器初始化
*/
public SseTokenEmitter() {
this(null, null, null, null, null, null);
try {
emitter.send(SseEmitter.event()
.name("token")
.data("{\"token\":\"" + escapeJson(token) + "\"}"));
} catch (Exception e) {
log.error("发送token失败", e);
userSseService.closeEmitter(emitter);
}
}
/**
* 构造函数,用于Spring容器初始化(带UserSseService参数)
*/
public SseTokenEmitter(UserSseService userSseService) {
this(userSseService, null, null, null, null, null);
@Override
public void accept(String token) {
emitToken(token);
}
@Override
public void onComplete(String fullContent) {
emitComplete(fullContent);
complete();
}
/**
* 创建新的SseTokenEmitter实例
* @param emitter SSE发射器
* @param agent Agent对象
* @param request 请求对象
* @param userId 用户ID
* @param completionCallback 完成回调
* @return 新的SseTokenEmitter实例
* 发送完整内容
*/
public SseTokenEmitter createNewInstance(SseEmitter emitter, Agent agent, AgentRequest request,
String userId, CompletionCallback completionCallback) {
return new SseTokenEmitter(userSseService, emitter, agent, request, userId, completionCallback);
public void emitComplete(String content) {
if (userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE连接已关闭,跳过发送完成事件");
return;
}
@Override
public void accept(String token) {
// 使用JSON格式发送token,确保转义序列被正确处理
try {
if (emitter != null && userSseService.isEmitterValidSafe(emitter)) {
// 检查是否是错误消息(以[错误]或[ERROR]开头)
if (token != null && (token.startsWith("[错误]") || token.startsWith("[ERROR]"))) {
// 发送标准错误事件而不是纯文本
userSseService.sendErrorEvent(emitter, token);
} else {
// 使用SSE标准事件格式发送token,以JSON格式确保转义序列正确处理
userSseService.sendTokenEvent(emitter, token);
}
} else {
log.debug("SSE emitter已无效,跳过发送token");
}
} catch (IllegalStateException e) {
// 处理emitter已关闭的情况,这通常是由于客户端断开连接
log.debug("无法发送token,SSE emitter已关闭: {}", e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
if (emitter != null) {
userSseService.removeEmitter(emitter);
}
} catch (IOException e) {
// 处理IO异常,这通常是由于客户端断开连接或网络问题
log.debug("无法发送token,IO异常: {}", e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
if (emitter != null) {
userSseService.removeEmitter(emitter);
}
emitter.send(SseEmitter.event()
.name("complete")
.data("{\"content\":\"" + escapeJson(content) + "\"}"));
} catch (Exception e) {
log.error("发送token失败", e);
// 对于其他异常,也将emitter标记为已完成,避免后续再次尝试发送
if (emitter != null) {
userSseService.removeEmitter(emitter);
}
log.error("发送完成事件失败", e);
}
}
@Override
public void onComplete(String fullContent) {
try {
if (emitter != null && !userSseService.isEmitterCompleted(emitter)) {
// 发送完成事件
emitter.send(SseEmitter.event().name("done").data("[DONE]").build());
log.debug("完成信号已发送");
/**
* 发送错误
*/
public void emitError(String error) {
if (userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE连接已关闭,跳过发送错误事件");
return;
}
// 调用完成回调
if (completionCallback != null) {
completionCallback.onComplete(emitter, agent, request, userId, fullContent);
}
} catch (IllegalStateException e) {
// 处理emitter已关闭的情况,这通常是由于客户端断开连接
log.debug("无法发送完成信号,SSE emitter已关闭: {}", e.getMessage());
} catch (IOException e) {
// 处理IO异常,这通常是由于客户端断开连接或网络问题
log.debug("无法发送完成信号,IO异常: {}", e.getMessage());
try {
emitter.send(SseEmitter.event()
.name("error")
.data("{\"error\":\"" + escapeJson(error) + "\"}"));
} catch (Exception e) {
log.error("处理完成事件失败", e);
} finally {
// 关闭连接
closeEmitter();
log.error("发送错误事件失败", e);
}
}
/**
* 检查发射器是否已完成
*/
public boolean isCompleted() {
return userSseService.isEmitterCompleted(emitter);
}
/**
* 安全关闭SSE连接
* 完成发射
*/
public void closeEmitter() {
public void complete() {
if (!userSseService.isEmitterCompleted(emitter)) {
try {
if (emitter != null && !userSseService.isEmitterCompleted(emitter)) {
log.info("SseTokenEmitter.complete: 完成SSE发射器");
emitter.complete();
log.debug("SSE连接已关闭");
} catch (Exception e) {
log.error("完成SSE发射器失败", e);
}
}
} catch (Exception ex) {
log.error("完成emitter时发生错误", ex);
if (completionCallback != null) {
completionCallback.accept(emitter, request.getUserMessage());
}
}
/**
* 完成回调接口
* 转义JSON字符串中的特殊字符
*/
@FunctionalInterface
public interface CompletionCallback {
void onComplete(SseEmitter emitter, Agent agent, AgentRequest request, String userId, String fullContent);
private String escapeJson(String input) {
if (input == null) {
return null;
}
return input.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t");
}
}
\ No newline at end of file
......@@ -169,6 +169,7 @@ public class UserSseService {
// 检查emitter是否已经完成,避免重复关闭
if (!isEmitterCompleted(emitter)) {
try {
log.info("handleConnectionClose: SSE连接已关闭");
emitter.complete();
} catch (Exception e) {
log.debug("完成emitter时发生异常(可能是由于已关闭): {}", e.getMessage());
......@@ -292,8 +293,8 @@ public class UserSseService {
// 然后关闭SSE连接
try {
if (!isEmitterCompleted(emitter)) {
// emitter.complete();
log.debug("SSE连接已关闭");
emitter.complete();
log.info("startHeartbeat: SSE连接已关闭");
}
} catch (Exception ex) {
log.debug("关闭SSE连接时发生异常(可能是由于已关闭): {}", ex.getMessage());
......@@ -351,6 +352,7 @@ public class UserSseService {
try {
// 检查emitter是否已经完成,避免重复关闭
if (!isEmitterCompleted(emitter)) {
log.info("registerCallbacks: SSE连接已关闭");
emitter.complete();
}
} catch (Exception e) {
......@@ -381,7 +383,7 @@ public class UserSseService {
log.debug("Emitter已经完成,跳过关闭操作");
return;
}
log.info("completeEmitter: Emitter已成功关闭");
emitter.complete();
completedEmitters.add(emitter); // 添加到已完成集合
log.debug("Emitter已成功关闭");
......@@ -704,6 +706,28 @@ public class UserSseService {
return new ArrayList<>(emitters);
}
/**
* 关闭SSE发射器
*
* @param emitter SSE发射器
*/
public void closeEmitter(SseEmitter emitter) {
if (emitter == null) {
return;
}
try {
if (!isEmitterCompleted(emitter)) {
log.info("closeEmitter: Emitter已成功关闭");
emitter.complete();
}
// 从活动连接列表中移除
removeEmitter(emitter);
} catch (Exception e) {
log.warn("关闭SSE发射器时发生异常: {}", e.getMessage());
}
}
/**
* 销毁资源
*/
......
......@@ -10,7 +10,6 @@ import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import jakarta.servlet.http.HttpServletRequest;
import pangea.hiagent.common.utils.JwtUtil;
import java.lang.InheritableThreadLocal;
/**
......
......@@ -65,7 +65,7 @@ public abstract class BaseTool {
}
/**
* 简化版execute方法,无需手动构建参数映射
* 默认的execute方法,无需手动构建参数映射
* @param methodName 被调用的方法名称
* @param action 实际执行的工具逻辑
* @param <T> 返回类型
......
......@@ -17,6 +17,7 @@ import pangea.hiagent.common.exception.BusinessException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Agent API控制器
......@@ -353,4 +354,42 @@ public class AgentController {
return ApiResponse.error(4001, "获取Agent详情失败: " + e.getMessage());
}
}
/**
* 获取Agent统计信息
*/
@GetMapping("/stats")
public ApiResponse<Map<String, Object>> getAgentStats() {
try {
log.info("获取Agent统计信息");
List<Agent> agents = this.agentService.listAgents();
// 目前Agent模型中没有agentType字段,我们根据名称或其他特征来区分主从Agent
// 这里暂时根据名称中是否包含"master"或"slave"来区分
long masterAgents = agents.stream()
.filter(agent -> agent.getName() != null &&
(agent.getName().toLowerCase().contains("master") ||
agent.getName().toLowerCase().contains("主")))
.count();
long slaveAgents = agents.stream()
.filter(agent -> agent.getName() != null &&
(agent.getName().toLowerCase().contains("slave") ||
agent.getName().toLowerCase().contains("从") ||
agent.getDescription() != null &&
agent.getDescription().toLowerCase().contains("slave")))
.count();
long availableSlaveAgents = slaveAgents; // 假设所有从Agent都可用
Map<String, Object> stats = Map.of(
"masterAgents", masterAgents,
"slaveAgents", slaveAgents,
"availableSlaveAgents", availableSlaveAgents
);
return ApiResponse.success(stats, "获取Agent统计信息成功");
} catch (Exception e) {
log.error("获取Agent统计信息失败", e);
return ApiResponse.error(500, "获取Agent统计信息失败: " + e.getMessage());
}
}
}
\ No newline at end of file
......@@ -223,7 +223,17 @@ public class AuthController {
provider.setScope(request.getScope());
provider.setEnabled(request.getEnabled());
provider.setConfigJson(request.getConfigJson());
// TODO: 设置创建人和更新人信息
// 设置创建人和更新人信息
String currentUserId = pangea.hiagent.common.utils.UserUtils.getCurrentUserIdStatic();
if (currentUserId != null && !currentUserId.isEmpty()) {
provider.setCreatedBy(currentUserId);
provider.setUpdatedBy(currentUserId);
} else {
log.warn("无法获取当前用户ID,使用默认值");
provider.setCreatedBy("system");
provider.setUpdatedBy("system");
}
OAuth2Provider savedProvider = oAuth2ProviderService.createProvider(provider);
......@@ -257,7 +267,15 @@ public class AuthController {
provider.setScope(request.getScope());
provider.setEnabled(request.getEnabled());
provider.setConfigJson(request.getConfigJson());
// TODO: 设置更新人信息
// 设置更新人信息
String currentUserId = pangea.hiagent.common.utils.UserUtils.getCurrentUserIdStatic();
if (currentUserId != null && !currentUserId.isEmpty()) {
provider.setUpdatedBy(currentUserId);
} else {
log.warn("无法获取当前用户ID,使用默认值");
provider.setUpdatedBy("system");
}
OAuth2Provider updatedProvider = oAuth2ProviderService.updateProvider(id, provider);
......
package pangea.hiagent.web.controller;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import pangea.hiagent.agent.data.AgentTaskStatus;
import pangea.hiagent.web.dto.ApiResponse;
import pangea.hiagent.web.dto.PageData;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.web.service.CollaborationService;
import pangea.hiagent.model.Agent;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 多Agent协作控制器
* 提供协作任务管理功能
*/
@Slf4j
@RestController
@RequestMapping("/api/v1/agent/collaboration")
@RequiredArgsConstructor
public class CollaborationController {
private final CollaborationService collaborationService;
private final AgentService agentService;
/**
* 获取协同任务统计信息
*/
@GetMapping("/stats")
public ApiResponse<Map<String, Object>> getCollaborationStats() {
try {
log.info("获取协同任务统计信息");
Map<String, Object> stats = collaborationService.getCollaborationStats();
return ApiResponse.success(stats, "获取统计信息成功");
} catch (Exception e) {
log.error("获取协同任务统计信息失败", e);
return ApiResponse.error(500, "获取统计信息失败: " + e.getMessage());
}
}
/**
* 获取协同任务列表
*/
@GetMapping("/tasks")
public ApiResponse<PageData<AgentTaskStatus>> getCollaborationTasks(
@RequestParam(defaultValue = "1") Long page,
@RequestParam(defaultValue = "10") Long size,
@RequestParam(required = false) String status,
@RequestParam(required = false) String keyword) {
try {
log.info("获取协同任务列表,页码: {}, 每页大小: {}, 状态: {}, 关键词: {}", page, size, status, keyword);
// 获取过滤后的任务列表
List<AgentTaskStatus> filteredTasks = collaborationService.getCollaborationTasks(status, keyword, Math.toIntExact(page), Math.toIntExact(size));
// 获取总任务数
int total = collaborationService.getTotalTaskCount(status, keyword);
// 分页处理
int start = Math.toIntExact((page - 1) * size);
int end = Math.toIntExact(Math.min(page * size, filteredTasks.size()));
if (start >= filteredTasks.size()) {
start = Math.max(0, filteredTasks.size() - Math.toIntExact(size));
end = filteredTasks.size();
}
List<AgentTaskStatus> pagedTasks = start < filteredTasks.size() ?
filteredTasks.subList(start, end) : List.of();
PageData<AgentTaskStatus> pageData = new PageData<>();
pageData.setRecords(pagedTasks);
pageData.setTotal((long) filteredTasks.size());
pageData.setCurrent(page);
pageData.setSize(size);
pageData.setPages((long) Math.ceil((double) filteredTasks.size() / size));
return ApiResponse.success(pageData, "获取任务列表成功");
} catch (Exception e) {
log.error("获取协同任务列表失败", e);
return ApiResponse.error(500, "获取任务列表失败: " + e.getMessage());
}
}
/**
* 创建协同任务
*/
@PostMapping("/tasks")
public ApiResponse<Void> createCollaborationTask(@RequestBody Map<String, Object> taskParams) {
try {
log.info("创建协同任务: {}", taskParams);
// 这里需要根据实际的多Agent协作实现来处理任务创建
// 目前暂时返回成功响应
String taskId = "task_" + System.currentTimeMillis();
log.info("创建协同任务成功,任务ID: {}", taskId);
return ApiResponse.success(null, "创建协同任务成功");
} catch (Exception e) {
log.error("创建协同任务失败", e);
return ApiResponse.error(500, "创建协同任务失败: " + e.getMessage());
}
}
/**
* 重试任务
*/
@PostMapping("/tasks/{taskId}/retry")
public ApiResponse<Void> retryTask(@PathVariable String taskId) {
try {
log.info("重试任务: {}", taskId);
collaborationService.retryTask(taskId);
return ApiResponse.success(null, "任务重试成功");
} catch (Exception e) {
log.error("重试任务失败: {}", taskId, e);
return ApiResponse.error(500, "重试任务失败: " + e.getMessage());
}
}
/**
* 获取协作相关的Agent统计信息
*/
@GetMapping("/stats/agent")
public ApiResponse<Map<String, Object>> getCollaborationAgentStats() {
try {
log.info("获取Agent统计信息");
List<Agent> agents = agentService.listAgents();
// 目前Agent模型中没有agentType字段,我们根据名称或其他特征来区分主从Agent
// 这里暂时根据名称中是否包含"master"或"slave"来区分
long masterAgents = agents.stream()
.filter(agent -> agent.getName() != null &&
(agent.getName().toLowerCase().contains("master") ||
agent.getName().toLowerCase().contains("主")))
.count();
long slaveAgents = agents.stream()
.filter(agent -> agent.getName() != null &&
(agent.getName().toLowerCase().contains("slave") ||
agent.getName().toLowerCase().contains("从") ||
agent.getDescription() != null &&
agent.getDescription().toLowerCase().contains("slave")))
.count();
long availableSlaveAgents = slaveAgents; // 假设所有从Agent都可用
Map<String, Object> stats = Map.of(
"masterAgents", masterAgents,
"slaveAgents", slaveAgents,
"availableSlaveAgents", availableSlaveAgents
);
return ApiResponse.success(stats, "获取Agent统计信息成功");
} catch (Exception e) {
log.error("获取Agent统计信息失败", e);
return ApiResponse.error(500, "获取Agent统计信息失败: " + e.getMessage());
}
}
/**
* 获取主Agent列表
*/
@GetMapping("/master-agents")
public ApiResponse<List<Agent>> getMasterAgents() {
try {
log.info("获取主Agent列表");
List<Agent> allAgents = agentService.listAgents();
List<Agent> masterAgents = allAgents.stream()
.filter(agent -> agent.getName() != null &&
(agent.getName().toLowerCase().contains("master") ||
agent.getName().toLowerCase().contains("主")))
.collect(Collectors.toList());
return ApiResponse.success(masterAgents, "获取主Agent列表成功");
} catch (Exception e) {
log.error("获取主Agent列表失败", e);
return ApiResponse.error(500, "获取主Agent列表失败: " + e.getMessage());
}
}
/**
* 获取从Agent列表
*/
@GetMapping("/slave-agents")
public ApiResponse<List<Agent>> getSlaveAgents() {
try {
log.info("获取从Agent列表");
List<Agent> allAgents = agentService.listAgents();
List<Agent> slaveAgents = allAgents.stream()
.filter(agent -> agent.getName() != null &&
(agent.getName().toLowerCase().contains("slave") ||
agent.getName().toLowerCase().contains("从") ||
agent.getDescription() != null &&
agent.getDescription().toLowerCase().contains("slave")))
.collect(Collectors.toList());
return ApiResponse.success(slaveAgents, "获取从Agent列表成功");
} catch (Exception e) {
log.error("获取从Agent列表失败", e);
return ApiResponse.error(500, "获取从Agent列表失败: " + e.getMessage());
}
}
}
\ No newline at end of file
......@@ -6,6 +6,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import pangea.hiagent.rag.RagService;
import pangea.hiagent.web.dto.ApiResponse;
import pangea.hiagent.model.Agent;
import java.util.List;
import java.util.Map;
......@@ -22,6 +23,9 @@ public class RagController {
@Autowired
private RagService ragService;
@Autowired
private pangea.hiagent.web.service.AgentService agentService;
/**
* 文档检索接口
*/
......@@ -74,12 +78,27 @@ public class RagController {
log.info("开始RAG增强问答,Agent ID: {}, 查询: {}", request.getAgentId(), request.getQuery());
// 根据Agent ID获取Agent对象
// 这里需要实现获取Agent的逻辑,暂时使用模拟数据
// String result = "这是RAG增强问答的结果";
Agent agent = null;
if (request.getAgentId() != null && !request.getAgentId().isEmpty()) {
agent = agentService.getAgent(request.getAgentId());
if (agent == null) {
log.warn("未找到指定的Agent: {}", request.getAgentId());
return ApiResponse.error(4003, "未找到指定的Agent: " + request.getAgentId());
}
log.debug("成功获取Agent: {} ({}), 启用RAG: {}",
agent.getName(), agent.getId(), agent.getEnableRag());
} else {
log.warn("Agent ID为空,无法执行RAG增强问答");
return ApiResponse.error(4004, "Agent ID不能为空");
}
// 调用RagService的实际方法
// TODO: 实现根据Agent ID获取Agent对象的逻辑
String result = ragService.ragQa(null, request.getQuery());
String result = ragService.ragQa(agent, request.getQuery());
if (result == null) {
log.warn("RAG增强问答返回结果为空");
return ApiResponse.error(4005, "RAG增强问答未返回有效结果");
}
log.info("RAG增强问答完成");
return ApiResponse.success(result, "RAG增强问答成功");
......
package pangea.hiagent.web.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import pangea.hiagent.agent.data.AgentTaskStatus;
import pangea.hiagent.agent.data.AgentTaskStatusRepository;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 多Agent协作服务
* 提供协作任务管理相关的业务逻辑
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class CollaborationService {
private final AgentTaskStatusRepository taskStatusRepository;
/**
* 获取协作任务统计信息
*/
public Map<String, Object> getCollaborationStats() {
try {
// 获取所有任务状态
List<AgentTaskStatus> allTasks = taskStatusRepository.selectList(null);
// 统计不同状态的任务数量
Map<String, Long> statusCounts = allTasks.stream()
.filter(task -> task.getStatus() != null) // 过滤空状态的任务
.collect(Collectors.groupingBy(
AgentTaskStatus::getStatus,
Collectors.counting()
));
return Map.of(
"totalTasks", allTasks.size(),
"runningTasks", statusCounts.getOrDefault("RUNNING", 0L),
"successTasks", statusCounts.getOrDefault("SUCCESS", 0L),
"failedTasks", statusCounts.getOrDefault("FAIL", 0L),
"pendingTasks", statusCounts.getOrDefault("READY", 0L)
);
} catch (Exception e) {
log.error("获取协作任务统计信息失败", e);
return Map.of(
"totalTasks", 0L,
"runningTasks", 0L,
"successTasks", 0L,
"failedTasks", 0L,
"pendingTasks", 0L
);
}
}
/**
* 获取协作任务列表
*/
public List<AgentTaskStatus> getCollaborationTasks(String status, String keyword) {
try {
List<AgentTaskStatus> allTasks = taskStatusRepository.selectList(null);
return allTasks.stream()
.filter(task -> {
boolean statusMatch = status == null || status.isEmpty() ||
(task.getStatus() != null && task.getStatus().equalsIgnoreCase(status));
boolean keywordMatch = keyword == null || keyword.isEmpty() ||
(task.getTaskId() != null && task.getTaskId().toLowerCase().contains(keyword.toLowerCase()));
return statusMatch && keywordMatch;
})
.collect(Collectors.toList());
} catch (Exception e) {
log.error("获取协作任务列表失败: status={}, keyword={}", status, keyword, e);
return List.of();
}
}
/**
* 获取协作任务列表(带分页)
*/
public List<AgentTaskStatus> getCollaborationTasks(String status, String keyword, int page, int size) {
try {
if (page < 1 || size < 1) {
log.warn("分页参数无效: page={}, size={}", page, size);
return List.of();
}
List<AgentTaskStatus> filteredTasks = getCollaborationTasks(status, keyword);
int start = (page - 1) * size;
int end = Math.min(start + size, filteredTasks.size());
if (start >= filteredTasks.size()) {
start = Math.max(0, filteredTasks.size() - size);
end = filteredTasks.size();
}
if (start < filteredTasks.size()) {
return filteredTasks.subList(start, end);
} else {
return List.of();
}
} catch (Exception e) {
log.error("获取协作任务列表(分页)失败: status={}, keyword={}, page={}, size={}", status, keyword, page, size, e);
return List.of();
}
}
/**
* 获取任务总数(根据过滤条件)
*/
public int getTotalTaskCount(String status, String keyword) {
try {
return getCollaborationTasks(status, keyword).size();
} catch (Exception e) {
log.error("获取任务总数失败: status={}, keyword={}", status, keyword, e);
return 0;
}
}
/**
* 重试任务
*/
public void retryTask(String taskId) {
if (taskId == null || taskId.trim().isEmpty()) {
log.warn("任务ID为空,无法重试");
return;
}
try {
// 这里需要调用TaskStatusService的重试逻辑
// 由于TaskStatusService.retryTask是私有方法,我们需要通过其他方式实现
AgentTaskStatus status = taskStatusRepository.selectById(taskId);
if (status != null) {
int newRetryCount = (status.getRetryCount() != null ? status.getRetryCount() : 0) + 1;
status.setRetryCount(newRetryCount);
// 如果重试次数超过阈值,标记为失败
if (newRetryCount > 3) { // 最多重试3次
status.setStatus("FAIL");
} else {
// 更新状态为就绪状态以重新执行
status.setStatus("READY");
}
taskStatusRepository.updateById(status);
log.info("任务{}重试,当前重试次数: {}", taskId, newRetryCount);
} else {
log.warn("找不到任务ID为{}的任务,无法重试", taskId);
}
} catch (Exception e) {
log.error("重试任务失败: taskId={}", taskId, e);
throw e;
}
}
}
\ No newline at end of file
CREATE TABLE IF NOT EXISTS agent_task_status (
task_id VARCHAR(64) PRIMARY KEY COMMENT '任务唯一标识',
status VARCHAR(32) NOT NULL COMMENT '任务状态:READY/RUNNING/SUCCESS/FAIL',
dependencies VARCHAR(256) COMMENT '依赖子任务ID,逗号分隔',
result TEXT COMMENT '任务结果JSON字符串',
result_hash VARCHAR(64) COMMENT '结果MD5哈希值,用于校验完整性',
retry_count INT DEFAULT 0 COMMENT '已重试次数',
timeout BIGINT COMMENT '任务超时时间戳',
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '状态更新时间'
);
-- 创建任务状态索引,提升查询效率
CREATE INDEX idx_task_status ON agent_task_status(status);
CREATE INDEX idx_task_update_time ON agent_task_status(update_time);
\ No newline at end of file
......@@ -205,6 +205,46 @@ hiagent:
default-temperature: 0.7
default-max-tokens: 4096
history-length: 10
# 通信配置
comm:
use-remote: false # 是否启用远程MQ通信,本地开发设为false
# 灰度发布配置
gray:
ratio: 10 # 灰度流量比例(百分比)
white-list: "test_task_001" # 灰度白名单任务ID,逗号分隔
# 模块开关配置
module:
enabled: true # 模块总开关
log:
enabled: true # 日志审计模块开关
check:
enabled: true # 结果校验模块开关
monitor:
enabled: true # 任务可视化模块开关
fallback:
enabled: true # 降级兜底模块开关
cache:
enabled: true # 缓存管理模块开关
cacheType: caffeine # 缓存类型
maxSize: 1000 # 最大缓存条目
taskCacheTtl: 300000 # 任务缓存TTL(5分钟)
resultCacheTtl: 600000 # 结果缓存TTL(10分钟)
configCacheTtl: 3600000 # 配置缓存TTL(1小时)
scheduler:
enabled: true # 任务调度模块开关
strategy: priority # 调度策略
maxConcurrentTasks: 10 # 最大并发任务数
taskTimeout: 300000 # 任务超时时间(5分钟)
performance:
enabled: true # 性能监控模块开关
collectionInterval: 5000 # 指标收集间隔(5秒)
enableMetrics: true # 启用指标收集
enableTracing: true # 启用链路追踪
security:
enabled: true # 安全审计模块开关
logEnabled: true # 启用安全日志
logTaskAccess: true # 记录任务访问
logDataAccess: true # 记录数据访问
# LLM配置
llm:
......@@ -236,7 +276,7 @@ hiagent:
### 核心规则
1. Thought:分析用户需求,判断是否需要调用工具,明确工具调用的目的和参数。
2. Action:仅调用已授权的工具,严格遵循工具入参格式,单次可调用单/多工具。
2. Action:真实调用工具,仅调用已授权的工具,严格遵循工具入参格式,单次可调用单/多工具。
3. Observation:接收工具返回结果,校验数据有效性,无结果则从Thought继续重试,最多重试3次。
4. Final_Answer:基于Thought+Action+Observation,输出最终精准结果,不冗余、不臆造。
......@@ -251,11 +291,28 @@ hiagent:
### 输出格式
Thought: [分析用户需求]
Action: [工具名称](参数1=值1, 参数2=值2)
Action: [工具名称]:[调用工具的目的]
Observation: [工具返回结果]
(重复以上步骤,最多3次)
Final_Answer: [最终结果]
# Resilience4j配置
resilience4j:
circuit-breaker:
instances:
DbQuerySlaveAgent:
sliding-window-size: 10
failure-rate-threshold: 50
wait-duration-in-open-state: 10s
retry:
instances:
DbQuerySlaveAgent:
max-attempts: 3
wait-duration: 1s
# Milvus Lite配置
milvus:
data-dir: ./milvus_data
......
......@@ -231,7 +231,8 @@ const eventTitleMap: Record<string, string | ((data: any) => string)> = {
result: "执行结果",
thought: (data: any) => data.thinkingType === "final_answer" ? "最终答案" : "思考过程",
complete: "对话完成",
error: "对话错误"
error: "对话错误",
collaboration_result: "Agent协同结果"
};
// 处理时间轴事件的通用函数
......@@ -589,7 +590,7 @@ const processSSELine = async (
// 收到思考事件,清除超时计时器
clearStreamTimeout();
// 处理思考事件,将其发送到时间轴面板
handleTimelineEvent(eventType, data, data.content);
handleTimelineEvent(eventType, data, data.content || "");
// 记录思考事件便于调试
console.log("[SSE思考事件]", data);
......@@ -616,7 +617,7 @@ const processSSELine = async (
case "embed":
// 处理嵌入事件,将其发送到时间轴面板
handleTimelineEvent(eventType, data, data.content);
handleTimelineEvent(eventType, data, data.content || "");
// 对于embed事件,还需要触发embed-event事件
if (data.embedUrl) {
......@@ -637,7 +638,18 @@ const processSSELine = async (
case "observation":
case "log":
case "result":
handleTimelineEvent(eventType, data, data.content);
handleTimelineEvent(eventType, data, data.content || "");
break;
case "collaboration_result":
// 处理多Agent协同任务结果
handleTimelineEvent(eventType, data, data.content || "");
// 在主对话区域显示协同任务结果
if (data.content) {
accumulatedContentRef.value += '\n\n' + data.content;
messages.value[aiMessageIndex].content = accumulatedContentRef.value;
await scrollToBottom();
}
break;
default:
......
<template>
<div class="form-container">
<h2>表单渲染器(已简化</h2>
<h2>表单渲染器(已)</h2>
<div class="form-content">
<div class="form-field">
<label>输入框</label>
......
......@@ -38,6 +38,10 @@
<el-icon><Avatar /></el-icon>
<span>Agent管理</span>
</el-menu-item>
<el-menu-item index="/multi-agent">
<el-icon><Connection /></el-icon>
<span>多Agent协同</span>
</el-menu-item>
<el-menu-item index="/tools">
<el-icon><Tools /></el-icon>
<span>工具管理</span>
......@@ -143,7 +147,8 @@ import {
Lock,
Monitor,
User,
SwitchButton
SwitchButton,
Connection
} from '@element-plus/icons-vue'
const router = useRouter()
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment