Commit 8bff979e authored by ligaowei's avatar ligaowei

feat: 重构用户ID获取逻辑并优化ReAct执行流程

重构UserUtils类,提供静态方法支持并优化线程安全
新增EventSplitter组件用于实时分割ReAct事件流
统一所有Controller和Service使用静态方法获取用户ID
移除冗余的SseEventBroadcaster组件,简化事件发送逻辑
更新.gitignore排除数据库文件
parent b230dbdc
......@@ -218,3 +218,5 @@ Thumbs.db
ehthumbs.db
Icon?
*.icon?
backend/data/hiagent_dev_db.trace.db
backend/data/hiagent_dev_db.mv.db
......@@ -3,6 +3,7 @@ package pangea.hiagent.agent.react;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.agent.service.UserSseService;
import pangea.hiagent.workpanel.IWorkPanelDataCollector;
/**
......@@ -15,6 +16,9 @@ public class DefaultReactCallback implements ReactCallback {
@Autowired
private IWorkPanelDataCollector workPanelCollector;
@Autowired
private UserSseService userSseService;
@Override
public void onStep(ReactStep reactStep) {
log.info("ReAct步骤触发: 类型={}, 内容摘要={}",
......@@ -32,7 +36,9 @@ public class DefaultReactCallback implements ReactCallback {
try {
switch (reactStep.getStepType()) {
case THOUGHT:
workPanelCollector.recordThinking(reactStep.getContent(), "thought");
// userSseService.sendWorkPanelEvent(reactStep.getContent(), "thought");
// workPanelCollector.recordThinking(reactStep.getContent(), "thought");
log.info("[WorkPanel] 记录思考步骤: {}",
reactStep.getContent().substring(0, Math.min(100, reactStep.getContent().length())));
break;
......
......@@ -5,7 +5,6 @@ import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.messages.*;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import pangea.hiagent.agent.service.ErrorHandlerService;
......@@ -13,7 +12,6 @@ import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
import pangea.hiagent.memory.MemoryService;
import pangea.hiagent.model.Agent;
import pangea.hiagent.tool.AgentToolManager;
import pangea.hiagent.tool.impl.DateTimeTools;
import pangea.hiagent.common.utils.UserUtils;
import java.util.List;
import java.util.ArrayList;
......@@ -31,19 +29,21 @@ public class DefaultReactExecutor implements ReactExecutor {
private final List<ReactCallback> reactCallbacks = new ArrayList<>();
@Autowired
private DateTimeTools dateTimeTools;
private final EventSplitter eventSplitter;
@Autowired
private MemoryService memoryService;
@Autowired
private ErrorHandlerService errorHandlerService;
private final AgentToolManager agentToolManager;
public DefaultReactExecutor(AgentToolManager agentToolManager) {
public DefaultReactExecutor(EventSplitter eventSplitter, AgentToolManager agentToolManager ,
MemoryService memoryService, ErrorHandlerService errorHandlerService) {
this.eventSplitter = eventSplitter;
this.agentToolManager = agentToolManager;
this.memoryService = memoryService;
this.errorHandlerService = errorHandlerService;
}
@Override
......@@ -56,7 +56,7 @@ public class DefaultReactExecutor implements ReactExecutor {
@Override
public String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent) {
// 调用带用户ID的方法,首先尝试获取当前用户ID
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
return execute(chatClient, userInput, tools, agent, userId);
}
......@@ -117,7 +117,7 @@ public class DefaultReactExecutor implements ReactExecutor {
try {
// 如果没有提供用户ID,则尝试获取当前用户ID
if (userId == null) {
userId = UserUtils.getCurrentUserId();
userId = UserUtils.getCurrentUserIdStatic();
}
String sessionId = memoryService.generateSessionId(agent, userId);
......@@ -142,7 +142,7 @@ public class DefaultReactExecutor implements ReactExecutor {
@Override
public void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent) {
// 调用带用户ID的方法,但首先尝试获取当前用户ID
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
executeStream(chatClient, userInput, tools, tokenConsumer, agent, userId);
}
......@@ -190,9 +190,12 @@ public class DefaultReactExecutor implements ReactExecutor {
if (tokenConsumer != null) {
tokenConsumer.accept(token);
}
eventSplitter.feedToken(token);
}
} catch (Exception e) {
log.error("处理token时发生错误", e);
errorHandlerService.handleReactFlowError(e, tokenConsumer);
}
}
......@@ -217,17 +220,6 @@ public class DefaultReactExecutor implements ReactExecutor {
}
}
/**
* 检查是否已经触发了Final Answer步骤
*
* @param fullResponse 完整响应内容
* @return 如果已经触发了Final Answer则返回true,否则返回false
*/
private boolean hasFinalAnswerBeenTriggered(String fullResponse) {
// 使用正则表达式进行高效的不区分大小写匹配
return fullResponse.matches("(?i).*(Final Answer:|Final_Answer:|最终答案:).*");
}
/**
* 将助手的回复保存到内存中
*
......@@ -336,11 +328,6 @@ public class DefaultReactExecutor implements ReactExecutor {
}
}
// 添加默认的日期时间工具(如果尚未添加)
if (dateTimeTools != null && !tools.contains(dateTimeTools)) {
tools.add(dateTimeTools);
}
return tools;
}
}
\ No newline at end of file
package pangea.hiagent.agent.react;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.stereotype.Component;
@Component
public class EventSplitter {
private final List<String> keywords = Arrays.asList(
"Thought", "Action", "Observation", "Iteration_Decision", "Final_Answer"
);
private final Pattern keywordPattern = Pattern.compile(
String.format("(%s):", String.join("|", keywords))
);
private String currentType = null;
private StringBuilder currentContent = new StringBuilder();
private StringBuilder buffer = new StringBuilder();
private final ReactCallback callback;
private volatile int stepNumber = 0;
public EventSplitter(ReactCallback callback) {
this.callback = callback;
}
// 每收到一个token/字符,调用此方法
public void feedToken(String token) {
buffer.append(token);
Matcher matcher = keywordPattern.matcher(buffer);
if (matcher.find()) {
// 发现新事件
if (currentType != null && currentContent.length() > 0) {
// 实时输出已分割事件
callback.onStep(new ReactStep(stepNumber++, ReactStepType.fromString(currentType), currentContent.toString()));
}
// 更新事件类型
currentType = matcher.group(1);
currentContent.setLength(0);
// 移除关键词和冒号
buffer.delete(0, matcher.end());
}
// 累积内容
currentContent.append(buffer);
buffer.setLength(0);
}
// 流式结束时,调用此方法输出最后一个事件
public void endStream() {
if (currentType != null && currentContent.length() > 0) {
callback.onStep(new ReactStep(stepNumber++, ReactStepType.fromString(currentType), currentContent.toString()));
}
}
}
......@@ -22,5 +22,9 @@ public enum ReactStepType {
/**
* 最终答案步骤:结合工具结果生成最终回答
*/
FINAL_ANSWER
FINAL_ANSWER;
public static ReactStepType fromString(String currentType) {
return ReactStepType.valueOf(currentType.toUpperCase());
}
}
\ No newline at end of file
......@@ -84,11 +84,11 @@ public class AgentChatService {
log.info("开始处理流式对话请求,AgentId: {}, 用户消息: {}", agentId, chatRequest.getMessage());
// 尝试获取当前用户ID,优先从SecurityContext获取,其次从请求中解析JWT
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
// 如果在主线程中未能获取到用户ID,尝试在异步环境中获取
// 如果在主线程中未能获取到用户ID,再次尝试获取(支持异步环境)
if (userId == null) {
userId = UserUtils.getCurrentUserIdInAsync();
userId = UserUtils.getCurrentUserIdStatic();
}
if (userId == null) {
......
package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.web.dto.ToolEvent;
import pangea.hiagent.web.dto.WorkPanelEvent;
import pangea.hiagent.workpanel.event.EventService;
import pangea.hiagent.workpanel.data.TokenEventDataBuilder;
......@@ -587,30 +589,6 @@ public class UserSseService {
}
}
/**
* 发送工作面板事件给指定用户
*
* @param userId 用户ID
* @param event 工作面板事件
*/
public void sendWorkPanelEventToUser(String userId, WorkPanelEvent event) {
log.debug("开始向用户 {} 发送工作面板事件: {}", userId, event.getType());
// 检查连接是否仍然有效
SseEmitter emitter = getSession(userId);
if (emitter != null) {
try {
// 直接向当前 emitter 发送事件
sendWorkPanelEvent(emitter, event);
log.debug("已发送工作面板事件到客户端: {}", event.getType());
} catch (IOException e) {
log.error("发送工作面板事件失败: {}", e.getMessage(), e);
}
} else {
log.debug("连接已失效,跳过发送事件: {}", event.getType());
}
}
/**
* 发送连接成功事件
*
......
......@@ -112,9 +112,9 @@ public class MetaObjectHandlerConfig implements MetaObjectHandler {
*/
private String getCurrentUserIdWithContext() {
try {
// 直接调用UserUtils.getCurrentUserId(),该方法已经包含了所有获取用户ID的方式
// 直接调用UserUtils.getCurrentUserIdStatic(),该方法已经包含了所有获取用户ID的方式
// 并且优先从ThreadLocal获取,支持异步线程
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId != null) {
log.debug("成功获取用户ID: {}", userId);
return userId;
......
......@@ -99,7 +99,7 @@ public class AsyncUserContextDecorator {
// 捕获当前线程的用户上下文
UserContextHolder userContext = captureUserContext();
// 同时捕获当前线程的用户ID(用于ThreadLocal传播)
String currentUserId = UserUtils.getCurrentUserId();
String currentUserId = UserUtils.getCurrentUserIdStatic();
return () -> {
try {
......@@ -107,7 +107,7 @@ public class AsyncUserContextDecorator {
propagateUserContext(userContext);
// 将用户ID设置到ThreadLocal中,增强可靠性
if (currentUserId != null) {
UserUtils.setCurrentUserId(currentUserId);
UserUtils.setCurrentUserIdStatic(currentUserId);
}
// 执行原始任务
......@@ -116,7 +116,7 @@ public class AsyncUserContextDecorator {
// 清理当前线程的用户上下文
clearUserContext();
// 清理ThreadLocal中的用户ID
UserUtils.clearCurrentUserId();
UserUtils.clearCurrentUserIdStatic();
}
};
}
......@@ -131,7 +131,7 @@ public class AsyncUserContextDecorator {
// 捕获当前线程的用户上下文
UserContextHolder userContext = captureUserContext();
// 同时捕获当前线程的用户ID(用于ThreadLocal传播)
String currentUserId = UserUtils.getCurrentUserId();
String currentUserId = UserUtils.getCurrentUserIdStatic();
return () -> {
try {
......@@ -139,7 +139,7 @@ public class AsyncUserContextDecorator {
propagateUserContext(userContext);
// 将用户ID设置到ThreadLocal中,增强可靠性
if (currentUserId != null) {
UserUtils.setCurrentUserId(currentUserId);
UserUtils.setCurrentUserIdStatic(currentUserId);
}
// 执行原始任务
......@@ -148,7 +148,7 @@ public class AsyncUserContextDecorator {
// 清理当前线程的用户上下文
clearUserContext();
// 清理ThreadLocal中的用户ID
UserUtils.clearCurrentUserId();
UserUtils.clearCurrentUserIdStatic();
}
};
}
......
......@@ -59,7 +59,7 @@ public class MemoryService {
* @return 用户ID
*/
private String getCurrentUserId() {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
log.warn("无法通过UserUtils获取当前用户ID");
}
......
......@@ -12,6 +12,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.filter.OncePerRequestFilter;
import pangea.hiagent.common.utils.JwtUtil;
import pangea.hiagent.common.utils.UserUtils;
import java.io.IOException;
import java.util.Collections;
......@@ -28,8 +29,11 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
private final JwtUtil jwtUtil;
public JwtAuthenticationFilter(JwtUtil jwtUtil) {
private final UserUtils userUtils;
public JwtAuthenticationFilter(JwtUtil jwtUtil, UserUtils userUtils) {
this.jwtUtil = jwtUtil;
this.userUtils = userUtils;
}
@Override
......@@ -53,6 +57,8 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
var authorities = Collections.singletonList(new SimpleGrantedAuthority("ROLE_USER"));
var authentication = new UsernamePasswordAuthenticationToken(userId, null, authorities);
SecurityContextHolder.getContext().setAuthentication(authentication);
userUtils.setCurrentUserId(userId);
}
}
}
......
......@@ -2,7 +2,10 @@ package pangea.hiagent.tool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import pangea.hiagent.workpanel.event.SseEventBroadcaster;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.agent.service.UserSseService;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.web.dto.WorkPanelEvent;
import java.util.HashMap;
......@@ -16,7 +19,10 @@ import java.util.Map;
public abstract class BaseTool {
@Autowired
private SseEventBroadcaster sseEventBroadcaster;
private UserSseService userSseService;
@Autowired
private UserUtils userUtils;
/**
* 工具执行包装方法
......@@ -31,8 +37,11 @@ public abstract class BaseTool {
String toolName = this.getClass().getSimpleName();
long startTime = System.currentTimeMillis();
// 在方法开始时获取用户ID,此时线程通常是原始请求线程,能够正确获取
String userId = userUtils.getCurrentUserId();
// 1. 发送工具开始执行事件
sendToolEvent(toolName, methodName, params, null, "执行中", startTime, null);
sendToolEvent(toolName, methodName, params, null, "执行中", startTime, null,null, userId);
T result = null;
String status = "成功";
......@@ -51,7 +60,7 @@ public abstract class BaseTool {
long duration = endTime - startTime;
// 3. 发送工具执行完成事件
sendToolEvent(toolName, methodName, params, result, status, startTime, duration, exception);
sendToolEvent(toolName, methodName, params, result, status, startTime, duration, exception, userId);
}
return result;
......@@ -78,10 +87,11 @@ public abstract class BaseTool {
* @param startTime 开始时间戳
* @param duration 执行耗时(毫秒)
* @param exception 异常信息(可选)
* @param userId 用户ID,从方法开始时传递
*/
private void sendToolEvent(String toolName, String methodName,
Map<String, Object> params, Object result, String status,
Long startTime, Long duration, Exception... exception) {
Long startTime, Long duration, Exception exception, String userId) {
try {
Map<String, Object> eventData = new HashMap<>();
eventData.put("toolName", toolName);
......@@ -92,9 +102,9 @@ public abstract class BaseTool {
eventData.put("startTime", startTime);
eventData.put("duration", duration);
if (exception != null && exception.length > 0 && exception[0] != null) {
eventData.put("error", exception[0].getMessage());
eventData.put("errorType", exception[0].getClass().getSimpleName());
if (exception != null) {
eventData.put("error", exception.getMessage());
eventData.put("errorType", exception.getClass().getSimpleName());
}
WorkPanelEvent event = WorkPanelEvent.builder()
......@@ -102,9 +112,16 @@ public abstract class BaseTool {
.title(toolName + "." + methodName)
.timestamp(System.currentTimeMillis())
.metadata(eventData)
.userId(userId)
.build();
sseEventBroadcaster.broadcastWorkPanelEvent(event);
// 获取用户的SSE发射器
SseEmitter emitter = userSseService.getSession(userId);
if (emitter != null) {
userSseService.sendWorkPanelEvent(emitter, event);
} else {
log.debug("未找到用户 {} 的SSE连接,跳过发送事件", userId);
}
log.debug("已发送工具事件: {}#{}, 状态: {}", toolName, methodName, status);
} catch (Exception e) {
......
......@@ -44,7 +44,7 @@ public class AgentController {
@PostMapping
public ApiResponse<Agent> createAgent(@RequestBody Agent agent) {
try {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
return ApiResponse.error(4001, "用户未认证");
}
......@@ -67,7 +67,7 @@ public class AgentController {
@PostMapping("/with-tools")
public ApiResponse<Agent> createAgentWithTools(@RequestBody AgentWithToolsDTO agentWithToolsDTO) {
try {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
return ApiResponse.error(4001, "用户未认证");
}
......@@ -109,7 +109,7 @@ public class AgentController {
@PreAuthorize("@permissionEvaluator.hasPermission(authentication, #id, 'Agent', 'write')")
@PutMapping("/{id}")
public ApiResponse<Agent> updateAgent(@PathVariable(name = "id") String id, @RequestBody Agent agent) {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
log.warn("用户未认证,无法更新Agent: {}", id);
return ApiResponse.error(4001, "用户未认证");
......@@ -163,7 +163,7 @@ public class AgentController {
@PreAuthorize("@permissionEvaluator.hasPermission(authentication, #id, 'Agent', 'write')")
@PutMapping("/{id}/with-tools")
public ApiResponse<Agent> updateAgentWithTools(@PathVariable(name = "id") String id, @RequestBody AgentWithToolsDTO agentWithToolsDTO) {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
log.warn("用户未认证,无法更新Agent: {}", id);
return ApiResponse.error(4001, "用户未认证");
......@@ -238,7 +238,7 @@ public class AgentController {
@DeleteMapping("/{id}")
public ApiResponse<Void> deleteAgent(@PathVariable(name = "id") String id) {
try {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
log.info("用户 {} 开始删除Agent: {}", userId, id);
agentService.deleteAgent(id);
log.info("用户 {} 成功删除Agent: {}", userId, id);
......@@ -292,7 +292,7 @@ public class AgentController {
@PreAuthorize("isAuthenticated()")
public ApiResponse<java.util.List<Agent>> getUserAgents() {
try {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
return ApiResponse.error(4001, "用户未认证");
}
......
......@@ -40,7 +40,7 @@ public class MemoryController {
@GetMapping("/dialogue")
public ApiResponse<List<Map<String, Object>>> getDialogueMemories() {
try {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
log.warn("用户未认证,无法获取对话记忆列表");
return ApiResponse.error(401, "用户未认证");
......@@ -82,7 +82,7 @@ public class MemoryController {
@GetMapping("/knowledge")
public ApiResponse<List<Map<String, Object>>> getKnowledgeMemories() {
try {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
log.warn("用户未认证,无法获取知识记忆列表");
return ApiResponse.error(401, "用户未认证");
......@@ -110,7 +110,7 @@ public class MemoryController {
@GetMapping("/dialogue/agent/{agentId}")
public ApiResponse<Map<String, Object>> getDialogueMemoryDetail(@PathVariable String agentId) {
try {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
log.warn("用户未认证,无法获取对话记忆详情");
return ApiResponse.error(401, "用户未认证");
......@@ -190,7 +190,7 @@ public class MemoryController {
@DeleteMapping("/dialogue/{sessionId}")
public ApiResponse<Void> clearDialogueMemory(@PathVariable String sessionId) {
try {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
log.warn("用户未认证,无法清空对话记忆");
return ApiResponse.error(401, "用户未认证");
......@@ -223,7 +223,7 @@ public class MemoryController {
@DeleteMapping("/knowledge/{id}")
public ApiResponse<Void> deleteKnowledgeMemory(@PathVariable String id) {
try {
String userId = UserUtils.getCurrentUserId();
String userId = UserUtils.getCurrentUserIdStatic();
if (userId == null) {
log.warn("用户未认证,无法删除知识记忆");
return ApiResponse.error(401, "用户未认证");
......
......@@ -258,7 +258,7 @@ public class TimerController {
* 获取当前认证用户ID
*/
private String getCurrentUserId() {
return UserUtils.getCurrentUserId();
return UserUtils.getCurrentUserIdStatic();
}
/**
......
......@@ -39,7 +39,7 @@ public class ToolController {
* @return 用户ID
*/
private String getCurrentUserId() {
return UserUtils.getCurrentUserId();
return UserUtils.getCurrentUserIdStatic();
}
/**
......
......@@ -38,4 +38,9 @@ public class WorkPanelEvent implements Serializable {
* 元数据
*/
private Map<String, Object> metadata;
/**
* 触发事件的用户ID
*/
private String userId;
}
\ No newline at end of file
......@@ -145,7 +145,7 @@ public class AgentService {
}
// 验证用户权限(确保用户是所有者)
String currentUserId = UserUtils.getCurrentUserId();
String currentUserId = UserUtils.getCurrentUserIdStatic();
if (currentUserId == null) {
log.warn("用户未认证,无法更新Agent: {}", agent.getId());
throw new BusinessException(ErrorCode.UNAUTHORIZED.getCode(), "用户未认证");
......
......@@ -89,7 +89,7 @@ public class WebSocketConnectionManager {
String userId = (String) session.getAttributes().get("userId");
if (userId == null || userId.isEmpty()) {
// 如果没有有效的用户ID,尝试从SecurityContext获取
userId = UserUtils.getCurrentUserId();
userId = UserUtils.getCurrentUserIdStatic();
if (userId == null || userId.isEmpty()) {
// 如果仍然无法获取用户ID,使用默认值
userId = "unknown-user";
......
package pangea.hiagent.workpanel.event;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.agent.service.UserSseService;
import pangea.hiagent.web.dto.ToolEvent;
import pangea.hiagent.web.dto.WorkPanelEvent;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
* SSE事件广播器
* 专门负责广播事件给所有订阅者
*/
@Slf4j
@Component
public class SseEventBroadcaster {
@Autowired
private UserSseService unifiedSseService;
@Autowired
private EventService eventService;
/**
* 广播工作面板事件给所有订阅者
*
* @param event 工作面板事件
*/
public void broadcastWorkPanelEvent(WorkPanelEvent event) {
if (event == null) {
log.warn("广播事件时接收到null事件");
return;
}
try {
// 预构建事件数据,避免重复构建
Map<String, Object> eventData = eventService.buildWorkPanelEventData(event);
try {
// 获取所有emitter并广播
List<SseEmitter> emitters = unifiedSseService.getEmitters();
int successCount = 0;
int failureCount = 0;
// 使用CopyOnWriteArrayList避免并发修改异常
for (SseEmitter emitter : new CopyOnWriteArrayList<>(emitters)) {
try {
// 检查emitter是否仍然有效
if (unifiedSseService.isEmitterValid(emitter)) {
emitter.send(SseEmitter.event().name("message").data(eventData));
successCount++;
} else {
// 移除无效的emitter
log.debug("移除无效的SSE连接");
unifiedSseService.removeEmitter(emitter);
failureCount++;
}
} catch (IOException e) {
log.error("发送事件失败,移除失效连接: {}", e.getMessage());
unifiedSseService.removeEmitter(emitter);
failureCount++;
} catch (IllegalStateException e) {
log.debug("Emitter已关闭,移除连接: {}", e.getMessage());
unifiedSseService.removeEmitter(emitter);
failureCount++;
} catch (Exception e) {
log.error("发送事件时发生未知异常,移除连接: {}", e.getMessage(), e);
unifiedSseService.removeEmitter(emitter);
failureCount++;
}
}
if (failureCount > 0) {
log.warn("事件广播部分失败: 成功={}, 失败={}", successCount, failureCount);
}
// 记录对象池使用统计信息(每100次广播记录一次)
if ((successCount + failureCount) % 100 == 0) {
log.debug("对象池使用统计: {}", eventService.getMapPoolStatistics());
}
} finally {
// 确保eventData被归还到对象池
eventService.releaseMap(eventData);
}
} catch (Exception e) {
String toolName = null;
if (event instanceof ToolEvent) {
toolName = ((ToolEvent) event).getToolName();
}
log.error("广播事件失败: 事件类型={}, 工具={}, 错误信息={}",
event.getType(),
toolName,
e.getMessage(),
e);
}
}
}
\ No newline at end of file
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment