Commit cfa74117 authored by ligaowei's avatar ligaowei

refactor(agent): 重构事件数据构建器和Playwright管理模块

将事件数据构建器从workpanel迁移到agent模块,统一管理事件数据构建
重构Playwright管理器接口位置,优化代码组织结构
移除冗余的workpanel相关代码,简化项目结构
调整ReactStepType枚举处理空格逻辑,增强健壮性
优化EventSplitter的事件分割逻辑,提高准确性
parent 55b91d26
package pangea.hiagent.workpanel.data;
package pangea.hiagent.agent.data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......
package pangea.hiagent.workpanel.data;
package pangea.hiagent.agent.data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -31,3 +31,4 @@ public class ErrorEventDataBuilder {
return data;
}
}
package pangea.hiagent.workpanel.data;
package pangea.hiagent.agent.data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
......@@ -59,3 +59,4 @@ public class MapPoolService {
return mapPool.getStatistics();
}
}
......@@ -7,13 +7,18 @@ import java.util.regex.Pattern;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class EventSplitter {
private final List<String> keywords = Arrays.asList(
"Thought", "Action", "Observation", "Iteration_Decision", "Final_Answer"
"Thought", "Action", "Observation", "Final_Answer"
);
private final Pattern keywordPattern = Pattern.compile(
String.format("(%s):", String.join("|", keywords))
String.format("(?i)(Thought|Action|Observation|Final[ _]Answer):", String.join("|", keywords)), Pattern.CASE_INSENSITIVE
);
private String currentType = null;
......@@ -30,8 +35,12 @@ public class EventSplitter {
// 每收到一个token/字符,调用此方法
public void feedToken(String token) {
buffer.append(token);
// log.debug("当前缓冲区: {}", buffer.toString());
Matcher matcher = keywordPattern.matcher(buffer);
if (matcher.find()) {
while (matcher.find()) {
log.debug("发现新事件关键词: {}", matcher.group(1));
// 发现新事件
if (currentType != null && currentContent.length() > 0) {
// 实时输出已分割事件
......@@ -40,13 +49,32 @@ public class EventSplitter {
// 更新事件类型
currentType = matcher.group(1);
currentContent.setLength(0);
// 移除关键词和冒号
buffer.delete(0, matcher.end());
// 累积匹配位置后的内容
currentContent.append(buffer.substring(matcher.end()));
// 重置buffer为剩余内容
buffer.setLength(0);
buffer.append(currentContent);
// 重新查找
matcher = keywordPattern.matcher(buffer);
}
// 检查是否有部分关键词在buffer末尾
if (buffer.length() > 0) {
// 检查是否可能是关键词的一部分
boolean isPartialKeyword = false;
String bufferStr = buffer.toString();
for (String keyword : keywords) {
if (keyword.startsWith(bufferStr) || bufferStr.startsWith(keyword)) {
isPartialKeyword = true;
break;
}
}
// 累积内容
if (!isPartialKeyword) {
// 不是部分关键词,添加到内容中
currentContent.append(buffer);
buffer.setLength(0);
}
}
}
// 流式结束时,调用此方法输出最后一个事件
public void endStream(ReactCallback tokenConsumer) {
......
......@@ -25,6 +25,6 @@ public enum ReactStepType {
FINAL_ANSWER;
public static ReactStepType fromString(String currentType) {
return ReactStepType.valueOf(currentType.toUpperCase());
return ReactStepType.valueOf(currentType.toUpperCase().replace(" ", "_"));
}
}
\ No newline at end of file
......@@ -13,7 +13,6 @@ import pangea.hiagent.web.dto.ChatRequest;
import pangea.hiagent.model.Agent;
import pangea.hiagent.tool.AgentToolManager;
import pangea.hiagent.web.dto.AgentRequest;
import pangea.hiagent.workpanel.event.EventService;
import jakarta.servlet.http.HttpServletResponse;
/**
......@@ -31,7 +30,6 @@ public class AgentChatService {
private final pangea.hiagent.web.service.AgentService agentService;
public AgentChatService(
EventService eventService,
ErrorHandlerService errorHandlerService,
AgentProcessorFactory agentProcessorFactory,
AgentToolManager agentToolManager,
......
......@@ -145,7 +145,7 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
public void closeEmitter() {
try {
if (emitter != null && !userSseService.isEmitterCompleted(emitter)) {
emitter.complete();
// emitter.complete();
log.debug("SSE连接已关闭");
}
} catch (Exception ex) {
......
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.agent.data.ErrorEventDataBuilder;
import pangea.hiagent.agent.data.MapPoolService;
import pangea.hiagent.agent.data.TokenEventDataBuilder;
import pangea.hiagent.web.dto.WorkPanelEvent;
import pangea.hiagent.workpanel.event.EventService;
import pangea.hiagent.workpanel.data.TokenEventDataBuilder;
import pangea.hiagent.workpanel.data.ErrorEventDataBuilder;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
......@@ -42,18 +43,19 @@ public class UserSseService {
// 心跳任务执行器 - 使用共享线程池以提高资源利用率
private final ScheduledExecutorService heartbeatExecutor;
// SSE超时时间(毫秒)
private static final long SSE_TIMEOUT = 0L; // 0表示不使用默认超时,由心跳机制管理连接
private final EventService eventService;
private final TokenEventDataBuilder tokenEventDataBuilder;
private final ErrorEventDataBuilder errorEventDataBuilder;
public UserSseService(EventService eventService, TokenEventDataBuilder tokenEventDataBuilder,
ErrorEventDataBuilder errorEventDataBuilder) {
this.eventService = eventService;
private final MapPoolService mapPoolService;
// SSE超时时间(毫秒)
private static final long SSE_TIMEOUT = 0L; // 0表示不使用默认超时,由心跳机制管理连接
public UserSseService(TokenEventDataBuilder tokenEventDataBuilder, ErrorEventDataBuilder errorEventDataBuilder, MapPoolService mapPoolService) {
this.tokenEventDataBuilder = tokenEventDataBuilder;
this.errorEventDataBuilder = errorEventDataBuilder;
this.mapPoolService = mapPoolService;
this.heartbeatExecutor = Executors.newScheduledThreadPool(2);
}
......@@ -291,7 +293,7 @@ public class UserSseService {
// 然后关闭SSE连接
try {
if (!isEmitterCompleted(emitter)) {
emitter.complete();
// emitter.complete();
log.debug("SSE连接已关闭");
}
} catch (Exception ex) {
......@@ -596,12 +598,19 @@ public class UserSseService {
}
// 从对象池获取Map,避免频繁创建对象
Map<String, Object> data =event.getMetadata();
Map<String, Object> data = mapPoolService.acquireMap();
// 设置基础属性
data.put("type", event.getType());
data.put("eventType", event.getType());
data.put("timestamp", event.getTimestamp());
data.put("title", event.getTitle());
data.put("content", event.getContent());
data.put("userId", event.getUserId());
if(event.getMetadata() != null) {
data.putAll(event.getMetadata());
}
return data;
}
......
......@@ -15,8 +15,8 @@ import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.util.UriComponentsBuilder;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
import pangea.hiagent.common.utils.JwtUtil;
import pangea.hiagent.tool.playwright.PlaywrightManager;
import pangea.hiagent.websocket.DomSyncHandler;
import java.util.Map;
......
// package pangea.hiagent.tool.aspect;
// import lombok.extern.slf4j.Slf4j;
// import org.aspectj.lang.ProceedingJoinPoint;
// import org.aspectj.lang.annotation.Around;
// import org.aspectj.lang.annotation.Aspect;
// import org.aspectj.lang.reflect.MethodSignature;
// import org.springframework.ai.tool.annotation.Tool;
// import org.springframework.beans.factory.annotation.Autowired;
// import org.springframework.stereotype.Component;
// import pangea.hiagent.workpanel.IWorkPanelDataCollector;
// import java.util.HashMap;
// import java.util.Map;
// /**
// * 工具执行日志记录切面类
// * 自动记录带有@Tool注解的方法执行信息,包括工具名称、方法名、输入参数、输出结果、运行时长等
// */
// @Slf4j
// @Aspect
// @Component
// public class ToolExecutionLoggerAspect {
// @Autowired
// private IWorkPanelDataCollector workPanelDataCollector;
// /**
// * 环绕通知,拦截所有带有@Tool注解的方法
// * @param joinPoint 连接点
// * @return 方法执行结果
// * @throws Throwable 异常
// */
// @Around("@annotation(tool)")
// public Object logToolExecution(ProceedingJoinPoint joinPoint, Tool tool) throws Throwable {
// // 获取方法签名
// MethodSignature signature = (MethodSignature) joinPoint.getSignature();
// String methodName = signature.getName();
// String className = signature.getDeclaringType().getSimpleName();
// String fullMethodName = className + "." + methodName;
// // 获取工具描述
// String toolDescription = tool.description();
// // 获取方法参数
// String[] paramNames = signature.getParameterNames();
// Object[] args = joinPoint.getArgs();
// // 构建输入参数映射
// Map<String, Object> inputParams = new HashMap<>();
// if (paramNames != null && args != null) {
// for (int i = 0; i < paramNames.length; i++) {
// if (i < args.length) {
// inputParams.put(paramNames[i], args[i]);
// }
// }
// }
// // 记录开始时间
// long startTime = System.currentTimeMillis();
// // 记录工具调用开始
// if (workPanelDataCollector != null) {
// try {
// workPanelDataCollector.recordToolCallAction(className, inputParams, null, "pending", null);
// } catch (Exception e) {
// log.warn("记录工具调用开始时发生错误: {}", e.getMessage(), e);
// }
// }
// log.info("开始执行工具方法: {},描述: {}", fullMethodName, toolDescription);
// log.debug("工具方法参数: {}", inputParams);
// try {
// // 执行原方法
// Object result = joinPoint.proceed();
// // 记录结束时间
// long endTime = System.currentTimeMillis();
// long executionTime = endTime - startTime;
// // 记录工具调用完成
// if (workPanelDataCollector != null) {
// try {
// workPanelDataCollector.recordToolCallAction(className, inputParams, result, "success", executionTime);
// } catch (Exception e) {
// log.warn("记录工具调用完成时发生错误: {}", e.getMessage(), e);
// }
// }
// log.info("工具方法执行成功: {},描述: {},耗时: {}ms", fullMethodName, toolDescription, executionTime);
// // 精简日志记录,避免过多的debug级别日志
// if (log.isTraceEnabled()) {
// log.trace("工具方法执行结果类型: {},结果: {}",
// result != null ? result.getClass().getSimpleName() : "null", result);
// }
// return result;
// } catch (Exception e) {
// // 记录结束时间
// long endTime = System.currentTimeMillis();
// long executionTime = endTime - startTime;
// // 记录工具调用错误
// if (workPanelDataCollector != null) {
// try {
// workPanelDataCollector.recordToolCallAction(className, inputParams, e, "error", executionTime);
// } catch (Exception ex) {
// log.warn("记录工具调用错误时发生错误: {}", ex.getMessage(), ex);
// }
// }
// log.error("工具方法执行失败: {},描述: {},耗时: {}ms,错误类型: {}",
// fullMethodName, toolDescription, executionTime, e.getClass().getSimpleName(), e);
// throw e;
// }
// }
// }
\ No newline at end of file
......@@ -18,8 +18,8 @@ import java.util.HashMap;
import java.util.Map;
import pangea.hiagent.tool.BaseTool;
import pangea.hiagent.tool.playwright.PlaywrightManager;
import pangea.hiagent.web.service.ToolConfigService;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
/**
* 海信LBPM流程审批工具类
......
......@@ -18,8 +18,8 @@ import java.util.HashMap;
import java.util.Map;
import pangea.hiagent.tool.BaseTool;
import pangea.hiagent.tool.playwright.PlaywrightManager;
import pangea.hiagent.web.service.ToolConfigService;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
/**
* 海信绩效系统流程审批工具类
......
......@@ -16,8 +16,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.HashMap;
import java.util.Map;
import pangea.hiagent.tool.BaseTool;
import pangea.hiagent.tool.playwright.PlaywrightManager;
import pangea.hiagent.web.service.ToolConfigService;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
/**
* 海信SSO认证工具类
......
......@@ -4,10 +4,11 @@ import com.microsoft.playwright.*;
import com.microsoft.playwright.options.LoadState;
import com.microsoft.playwright.options.WaitUntilState;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.tool.playwright.PlaywrightManager;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
import java.util.Base64;
import java.util.List;
......
......@@ -3,7 +3,6 @@ package pangea.hiagent.tool.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.stereotype.Component;
import pangea.hiagent.workpanel.IWorkPanelDataCollector;
import java.io.BufferedReader;
import java.io.File;
......@@ -21,7 +20,6 @@ import java.util.List;
@Component
public class StorageFileAccessTool {
private final IWorkPanelDataCollector workPanelDataCollector;
// 支持的文件扩展名
private static final List<String> SUPPORTED_EXTENSIONS = Arrays.asList(
......@@ -32,9 +30,6 @@ public class StorageFileAccessTool {
// storage目录路径
private static final String STORAGE_DIR = "backend" + File.separator + "storage";
public StorageFileAccessTool(IWorkPanelDataCollector workPanelDataCollector) {
this.workPanelDataCollector = workPanelDataCollector;
}
/**
* 访问并预览storage目录下的文件
......@@ -88,8 +83,6 @@ public class StorageFileAccessTool {
log.info("成功读取文件: {}", fileName);
String result = "已成功在工作面板中预览文件: " + fileName;
// 发送embed事件到工作面板
workPanelDataCollector.recordEmbed(filePath, mimeType, title, content);
return result;
......
package pangea.hiagent.tool.impl;import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.workpanel.IWorkPanelDataCollector;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.stereotype.Component;
......@@ -18,13 +17,6 @@ import java.net.URLConnection;
@Component
public class WebPageAccessTools {
// 通过构造器注入的方式引入IWorkPanelDataCollector依赖
private final IWorkPanelDataCollector workPanelDataCollector;
public WebPageAccessTools(IWorkPanelDataCollector workPanelDataCollector) {
this.workPanelDataCollector = workPanelDataCollector;
}
/**
* 根据网站名称访问网页并在工作面板中预览
* @param siteName 网站名称(如"百度"、"谷歌"等)
......@@ -84,9 +76,6 @@ public class WebPageAccessTools {
log.info("成功访问网页: {}", url);
String result = "已成功在工作面板中预览网页: " + url;
// 发送embed事件到工作面板
workPanelDataCollector.recordEmbed(url, "text/html", title, webContent);
return result;
} catch (Exception e) {
return handleError(e, "获取网页内容时发生错误");
......
package pangea.hiagent.workpanel.playwright;
package pangea.hiagent.tool.playwright;
import com.microsoft.playwright.Browser;
import com.microsoft.playwright.BrowserContext;
......
package pangea.hiagent.workpanel.playwright;
package pangea.hiagent.tool.playwright;
import com.microsoft.playwright.*;
import lombok.extern.slf4j.Slf4j;
......
......@@ -2,9 +2,10 @@ package pangea.hiagent.websocket;
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.tool.playwright.PlaywrightManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.*;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
import java.util.HashMap;
import java.util.Map;
......
......@@ -3,8 +3,8 @@ package pangea.hiagent.websocket;
import com.microsoft.playwright.*;
import com.microsoft.playwright.options.LoadState;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
import pangea.hiagent.common.utils.AsyncUserContextDecorator;
import pangea.hiagent.tool.playwright.PlaywrightManager;
import java.util.concurrent.ConcurrentMap;
......
......@@ -2,8 +2,9 @@ package pangea.hiagent.websocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.*;
import pangea.hiagent.workpanel.playwright.PlaywrightManager;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.tool.playwright.PlaywrightManager;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......
package pangea.hiagent.workpanel;
import pangea.hiagent.web.dto.WorkPanelEvent;
import java.util.function.Consumer;
import java.util.List;
/**
* 工作面板数据收集器接口
* 用于采集Agent执行过程中的各类数据(思考过程、工具调用等)
*/
public interface IWorkPanelDataCollector {
/**
* 记录思考过程
* @param content 思考内容
* @param thinkingType 思考类型(分析、规划、执行等)
*/
void recordThinking(String content, String thinkingType);
/**
* 记录工具调用Action
* @param toolName 工具名称
* @param input 工具输入参数
* @param output 工具输出结果
* @param status 执行状态(pending/success/failure/error)
* @param executionTime 执行时间(毫秒)
*/
void recordToolCallAction(String toolName, Object input, Object output, String status, Long executionTime);
/**
* 记录日志
* @param message 日志消息
* @param level 日志级别(info/warn/error/debug)
*/
void recordLog(String message, String level);
/**
* 记录embed嵌入事件
* @param url 嵌入资源URL(可选)
* @param type MIME类型
* @param title 嵌入标题
* @param htmlContent HTML内容(可选)
*/
void recordEmbed(String url, String type, String title, String htmlContent);
/**
* 获取所有收集的事件
*/
List<WorkPanelEvent> getEvents();
/**
* 订阅事件(用于实时推送)
* @param consumer 事件处理函数
*/
void subscribe(Consumer<WorkPanelEvent> consumer);
/**
* 清空所有事件
*/
void clear();
/**
* 获取最后一个工具调用事件
*/
WorkPanelEvent getLastToolCall();
/**
* 记录最终答案
* @param content 最终答案内容
*/
void recordFinalAnswer(String content);
/**
* 添加事件到收集器
* 统一的事件添加方法,用于避免重复实现
* @param event 工作面板事件
*/
void addEvent(WorkPanelEvent event);
}
\ No newline at end of file
package pangea.hiagent.workpanel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import pangea.hiagent.web.dto.*;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.model.Agent;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
/**
* 工作面板服务
* 负责处理工作面板相关的状态和事件
*/
@Slf4j
@Service
public class WorkPanelService {
@Autowired
private AgentService agentService;
// 用于跟踪已发送的事件ID,防止重复发送
private final Map<String, Set<String>> sentEventIds = new ConcurrentHashMap<>();
/**
* 获取工作面板当前状态
*/
public WorkPanelStatusDto getWorkPanelStatus(String agentId, String userId) {
try {
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
throw new RuntimeException("Agent不存在");
}
if (!agent.getOwner().equals(userId)) {
throw new RuntimeException("无权访问该Agent");
}
log.info("获取Agent {} 的工作面板状态", agentId);
// 从工作面板收集器中读取数据
IWorkPanelDataCollector collector = null; // 移除了对ReActService的依赖
List<WorkPanelEvent> allEvents = collector != null ? collector.getEvents() : new ArrayList<>();
// 统计不同类型的事件
int totalEvents = allEvents.size();
int successfulCalls = (int) allEvents.stream()
.filter(e -> "tool_result".equals(e.getType()) &&
(e instanceof ToolEvent && "success".equals(((ToolEvent) e).getToolStatus())))
.count();
int failedCalls = (int) allEvents.stream()
.filter(e -> "tool_error".equals(e.getType()) ||
("tool_result".equals(e.getType()) &&
(e instanceof ToolEvent && "failure".equals(((ToolEvent) e).getToolStatus()))))
.count();
List<WorkPanelEvent> thinkingEvents = new ArrayList<>();
List<WorkPanelEvent> toolCallEvents = new ArrayList<>();
List<WorkPanelEvent> logEvents = new ArrayList<>();
for (WorkPanelEvent event : allEvents) {
switch (event.getType()) {
case "thought" -> thinkingEvents.add(event);
case "tool_call", "tool_result", "tool_error" -> toolCallEvents.add(event);
case "log" -> logEvents.add(event);
default -> {
}
}
}
WorkPanelStatusDto status = WorkPanelStatusDto.builder()
.id(agentId + "_workpanel")
.agentId(agentId)
.agentName(agent.getName())
.events(allEvents)
.thinkingEvents(thinkingEvents)
.toolCallEvents(toolCallEvents)
.logEvents(logEvents)
.totalEvents(totalEvents)
.successfulToolCalls(successfulCalls)
.failedToolCalls(failedCalls)
.updateTimestamp(System.currentTimeMillis())
.isProcessing(false)
.build();
return status;
} catch (Exception e) {
log.error("获取工作面板状态失败", e);
throw new RuntimeException("获取工作面板状态失败: " + e.getMessage(), e);
}
}
/**
* 清空工作面板数据
*/
public void clearWorkPanel(String agentId, String userId) {
try {
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
throw new RuntimeException("Agent不存在");
}
if (!agent.getOwner().equals(userId)) {
throw new RuntimeException("无权访问该Agent");
}
log.info("清空Agent {} 的工作面板", agentId);
// 在实际应用中,这里应该从缓存中清除工作面板数据
// 清空已发送事件ID跟踪
sentEventIds.remove(agentId);
} catch (Exception e) {
log.error("清空工作面板失败", e);
throw new RuntimeException("清空工作面板失败: " + e.getMessage(), e);
}
}
/**
* 生成事件唯一标识
*/
public String generateEventId(WorkPanelEvent event) {
if (event == null) {
return "null_event_" + System.currentTimeMillis();
}
StringBuilder sb = new StringBuilder();
sb.append(event.getType()).append("_");
switch (event.getType()) {
case "thought":
if (event instanceof ThoughtEvent) {
ThoughtEvent thoughtEvent = (ThoughtEvent) event;
sb.append(thoughtEvent.getThinkingType() != null ? thoughtEvent.getThinkingType() : "default")
.append("_")
.append(thoughtEvent.getContent() != null ? thoughtEvent.getContent().hashCode() : 0);
} else {
sb.append("default").append("_").append(0);
}
break;
case "tool_call":
case "tool_result":
case "tool_error":
if (event instanceof ToolEvent) {
ToolEvent toolEvent = (ToolEvent) event;
sb.append(toolEvent.getToolName() != null ? toolEvent.getToolName() : "unknown")
.append("_")
.append(toolEvent.getToolAction() != null ? toolEvent.getToolAction() : "unknown")
.append("_")
.append(event.getTimestamp() != null ? event.getTimestamp() : System.currentTimeMillis());
} else {
sb.append("unknown").append("_").append("unknown").append("_").append(System.currentTimeMillis());
}
break;
case "log":
if (event instanceof LogEvent) {
LogEvent logEvent = (LogEvent) event;
sb.append(logEvent.getLogLevel() != null ? logEvent.getLogLevel() : "info")
.append("_")
.append(logEvent.getContent() != null ? logEvent.getContent().hashCode() : 0);
} else {
sb.append("info").append("_").append(0);
}
break;
case "embed":
if (event instanceof EmbedEvent) {
EmbedEvent embedEvent = (EmbedEvent) event;
sb.append(embedEvent.getEmbedTitle() != null ? embedEvent.getEmbedTitle() : "untitled")
.append("_")
.append(embedEvent.getEmbedUrl() != null ? embedEvent.getEmbedUrl().hashCode() : 0);
} else {
sb.append("untitled").append("_").append(0);
}
break;
default:
sb.append(event.getTimestamp() != null ? event.getTimestamp() : System.currentTimeMillis());
break;
}
return sb.toString();
}
/**
* 检查事件是否已发送
*/
public boolean isEventAlreadySent(String agentId, WorkPanelEvent event) {
String eventId = generateEventId(event);
Set<String> agentEventIds = sentEventIds.computeIfAbsent(agentId, k -> new HashSet<>());
return !agentEventIds.add(eventId); // 如果已存在,add返回false,表示已发送
}
}
\ No newline at end of file
package pangea.hiagent.workpanel;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.workpanel.event.EventTypeConverter;
import java.util.HashMap;
import java.util.Map;
/**
* 工作面板工具类
* 提供各种辅助方法用于工作面板事件处理
* 注意:此工具类已优化,移除了与EventTypeConverter重复的功能
*/
@Slf4j
public class WorkPanelUtils {
/**
* 将对象转换为Map(用于工具输入参数)
*/
public static Map<String, Object> convertToMap(Object input) {
if (input == null) {
return new HashMap<>();
}
if (input instanceof Map) {
// 安全地转换Map类型,确保键为String类型
Map<?, ?> rawMap = (Map<?, ?>) input;
Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<?, ?> entry : rawMap.entrySet()) {
// 将键转换为String类型
String key = entry.getKey() != null ? entry.getKey().toString() : "null";
resultMap.put(key, entry.getValue());
}
return resultMap;
}
// 简单对象转换为Map
Map<String, Object> result = new HashMap<>();
result.put("value", input);
return result;
}
/**
* 获取状态文本
* 已委托给EventTypeConverter处理
*/
public static String getStatusText(String status) {
EventTypeConverter converter = new EventTypeConverter();
return converter.getStatusText(status);
}
/**
* 根据状态确定事件类型
* 已委托给EventTypeConverter处理
*/
public static String getEventTypeFromStatus(String status) {
EventTypeConverter converter = new EventTypeConverter();
return converter.getEventTypeFromStatus(status);
}
/**
* 将对象转换为JSON字符串
*
* @param obj 要转换的对象
* @return JSON字符串表示
*/
public static String convertToJsonString(Object obj) {
if (obj == null) {
return "null";
}
try {
// 使用Jackson ObjectMapper进行序列化
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
return mapper.writeValueAsString(obj);
} catch (Exception e) {
// 如果序列化失败,返回对象的toString()表示
return obj.toString();
}
}
}
\ No newline at end of file
package pangea.hiagent.workpanel.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import pangea.hiagent.web.dto.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 事件去重服务
* 统一处理事件去重逻辑,避免在多个地方重复实现
*/
@Slf4j
@Component
public class EventDeduplicationService {
/**
* 最近事件的缓存,用于去重检查
*/
private final Map<String, WorkPanelEvent> recentEventsCache = new ConcurrentHashMap<>();
/**
* 缓存过期时间(毫秒),默认5秒
*/
private static final long CACHE_EXPIRY_TIME = 5000L;
/**
* 最大缓存大小,防止内存溢出
*/
private static final int MAX_CACHE_SIZE = 1000;
/**
* 上次清理缓存的时间
*/
private volatile long lastCleanupTime = 0;
/**
* 清理间隔(毫秒)
*/
private static final long CLEANUP_INTERVAL = 30000L;
/**
* 检查是否为重复事件
*/
public boolean isDuplicateEvent(WorkPanelEvent event) {
if (event == null) {
return false;
}
// 定期清理过期缓存
cleanupExpiredCache();
// 生成事件唯一标识
String eventKey = generateEventKey(event);
WorkPanelEvent cachedEvent = recentEventsCache.get(eventKey);
if (cachedEvent != null) {
// 检查缓存是否过期
long currentTime = System.currentTimeMillis();
if ((currentTime - cachedEvent.getTimestamp()) <= CACHE_EXPIRY_TIME) {
// 检查事件内容是否相同
return isEventContentEqual(event, cachedEvent);
} else {
// 缓存过期,移除
recentEventsCache.remove(eventKey);
}
}
return false;
}
/**
* 更新最近事件缓存
*/
public void updateRecentEventsCache(WorkPanelEvent event) {
if (event == null) {
return;
}
// 控制缓存大小,防止内存溢出
if (recentEventsCache.size() >= MAX_CACHE_SIZE) {
// 移除最老的条目(简单实现,实际可以根据LRU算法优化)
java.util.Iterator<Map.Entry<String, WorkPanelEvent>> iterator = recentEventsCache.entrySet().iterator();
if (iterator.hasNext()) {
iterator.next();
iterator.remove();
}
}
// 添加新条目
String eventKey = generateEventKey(event);
recentEventsCache.put(eventKey, event);
}
/**
* 生成事件唯一标识
*/
private String generateEventKey(WorkPanelEvent event) {
StringBuilder key = new StringBuilder();
key.append(event.getType() != null ? event.getType() : "");
key.append("_");
// 获取工具名称(如果是ToolEvent)
String toolName = "";
if (event instanceof ToolEvent) {
toolName = ((ToolEvent) event).getToolName();
}
key.append(toolName != null ? toolName : "");
key.append("_");
// 获取思考类型(如果是ThoughtEvent)
String thinkingType = "";
if (event instanceof ThoughtEvent) {
thinkingType = ((ThoughtEvent) event).getThinkingType();
}
key.append(thinkingType != null ? thinkingType : "");
// 对于思考事件,添加内容摘要
if ("thought".equals(event.getType()) && event instanceof ThoughtEvent) {
ThoughtEvent thoughtEvent = (ThoughtEvent) event;
if (thoughtEvent.getContent() != null) {
// 取内容的前50个字符作为摘要
String contentSummary = thoughtEvent.getContent().length() > 50 ?
thoughtEvent.getContent().substring(0, 50) : thoughtEvent.getContent();
key.append("_").append(contentSummary.hashCode());
}
}
return key.toString();
}
/**
* 检查两个事件的内容是否相等
*/
private boolean isEventContentEqual(WorkPanelEvent event1, WorkPanelEvent event2) {
if (event1 == event2) {
return true;
}
if (event1 == null || event2 == null) {
return false;
}
// 比较基本字段
if (!java.util.Objects.equals(event1.getType(), event2.getType())) {
return false;
}
// 比较工具名称(如果是ToolEvent)
String toolName1 = "";
String toolName2 = "";
if (event1 instanceof ToolEvent) {
toolName1 = ((ToolEvent) event1).getToolName();
}
if (event2 instanceof ToolEvent) {
toolName2 = ((ToolEvent) event2).getToolName();
}
if (!java.util.Objects.equals(toolName1, toolName2)) {
return false;
}
// 比较思考类型(如果是ThoughtEvent)
String thinkingType1 = "";
String thinkingType2 = "";
if (event1 instanceof ThoughtEvent) {
thinkingType1 = ((ThoughtEvent) event1).getThinkingType();
}
if (event2 instanceof ThoughtEvent) {
thinkingType2 = ((ThoughtEvent) event2).getThinkingType();
}
if (!java.util.Objects.equals(thinkingType1, thinkingType2)) {
return false;
}
// 比较内容字段(根据不同事件类型)
if (event1 instanceof ThoughtEvent && event2 instanceof ThoughtEvent) {
ThoughtEvent thought1 = (ThoughtEvent) event1;
ThoughtEvent thought2 = (ThoughtEvent) event2;
return java.util.Objects.equals(thought1.getContent(), thought2.getContent());
}
// 比较工具输入(确保都是ToolEvent)
if (event1 instanceof ToolEvent && event2 instanceof ToolEvent) {
if (!java.util.Objects.equals(((ToolEvent) event1).getToolInput(),
((ToolEvent) event2).getToolInput())) {
return false;
}
// 比较工具输出
if (!java.util.Objects.equals(((ToolEvent) event1).getToolOutput(),
((ToolEvent) event2).getToolOutput())) {
return false;
}
} else if (event1 instanceof ResultEvent && event2 instanceof ResultEvent) {
// 比较结果内容
String content1 = ((ResultEvent) event1).getContent();
String content2 = ((ResultEvent) event2).getContent();
if (!java.util.Objects.equals(content1, content2)) {
return false;
}
}
return true;
}
/**
* 清理过期缓存
*/
private void cleanupExpiredCache() {
long currentTime = System.currentTimeMillis();
// 检查是否需要执行清理
if (currentTime - lastCleanupTime < CLEANUP_INTERVAL) {
return;
}
// 更新上次清理时间
lastCleanupTime = currentTime;
// 清理过期条目
recentEventsCache.entrySet().removeIf(entry -> {
WorkPanelEvent event = entry.getValue();
return event == null || (currentTime - event.getTimestamp()) > CACHE_EXPIRY_TIME;
});
}
/**
* 清空缓存
*/
public void clearCache() {
recentEventsCache.clear();
}
}
\ No newline at end of file
package pangea.hiagent.workpanel.event;
import org.springframework.stereotype.Component;
/**
* 事件类型转换工具类
* 统一处理事件类型转换逻辑,避免在多个地方重复实现
*/
@Component
public class EventTypeConverter {
/**
* 根据状态确定事件类型
*/
public String getEventTypeFromStatus(String status) {
if (status == null) {
return "tool_result";
}
switch (status.toLowerCase()) {
case "success":
return "tool_result";
case "error":
case "failure":
return "tool_error";
default:
return "tool_result";
}
}
/**
* 获取状态文本
*/
public String getStatusText(String status) {
if (status == null) {
return "未知";
}
switch (status.toLowerCase()) {
case "success": return "成功";
case "pending": return "处理中";
case "error": return "错误";
case "failure": return "失败";
default: return status;
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment