Commit 5538d955 authored by ligaowei's avatar ligaowei

refactor(SSE): 简化事件发送逻辑并移除冗余代码

重构 UserSseService 的事件发送逻辑,将 sendWorkPanelEvent 方法改为直接接收事件对象
移除 DefaultReactCallback 中冗余的工作面板记录逻辑,统一通过 SSE 发送事件
优化事件数据构建过程,减少不必要的检查和重复代码
parent 0bd8e66f
...@@ -13,9 +13,6 @@ import pangea.hiagent.workpanel.IWorkPanelDataCollector; ...@@ -13,9 +13,6 @@ import pangea.hiagent.workpanel.IWorkPanelDataCollector;
@Component @Component
public class DefaultReactCallback implements ReactCallback { public class DefaultReactCallback implements ReactCallback {
@Autowired
private IWorkPanelDataCollector workPanelCollector;
@Autowired @Autowired
private UserSseService userSseService; private UserSseService userSseService;
...@@ -25,98 +22,18 @@ public class DefaultReactCallback implements ReactCallback { ...@@ -25,98 +22,18 @@ public class DefaultReactCallback implements ReactCallback {
reactStep.getStepType(), reactStep.getStepType(),
reactStep.getContent() != null ? reactStep.getContent() != null ?
reactStep.getContent().substring(0, Math.min(50, reactStep.getContent().length())) : "null"); reactStep.getContent().substring(0, Math.min(50, reactStep.getContent().length())) : "null");
recordReactStepToWorkPanel(reactStep);
}
private void recordReactStepToWorkPanel(ReactStep reactStep) {
if (workPanelCollector == null) {
return;
}
try { String reactStepName = reactStep.getStepType().name();
switch (reactStep.getStepType()) {
case THOUGHT:
// userSseService.sendWorkPanelEvent(reactStep.getContent(), "thought"); userSseService.sendWorkPanelEvent(WorkPanelEvent.builder()
// workPanelCollector.recordThinking(reactStep.getContent(), "thought"); .eventType(reactStepName)
log.info("[WorkPanel] 记录思考步骤: {}", .content(reactStep.getContent())
reactStep.getContent().substring(0, Math.min(100, reactStep.getContent().length()))); .userId(reactStep.getUserId())
break; .build());
case ACTION:
if (reactStep.getAction() != null) {
// 记录工具调用动作
String toolName = reactStep.getAction().getToolName();
Object parameters = reactStep.getAction().getToolArgs();
// 记录工具调用,初始状态为pending
workPanelCollector.recordToolCallAction(
toolName,
parameters,
null, // 结果为空
"pending", // 状态为pending
null // 错误信息为空
);
// 同时记录工具调用信息到日志
log.info("[WorkPanel] 记录工具调用: 工具={} 参数={}", toolName, parameters);
} else {
// 如果没有具体的工具信息,记录为一般动作
workPanelCollector.recordThinking(reactStep.getContent(), "action");
log.info("[WorkPanel] 记录动作步骤: {}",
reactStep.getContent().substring(0, Math.min(100, reactStep.getContent().length())));
}
break;
case OBSERVATION:
if (reactStep.getObservation() != null) {
// 检查是否有对应的动作信息
if (reactStep.getAction() != null) {
// 使用动作信息更新工具调用结果
workPanelCollector.recordToolCallAction(
reactStep.getAction().getToolName(),
reactStep.getAction().getToolArgs(),
reactStep.getObservation().getResult(),
"success", // 状态为success
null // 无错误信息
);
log.info("[WorkPanel] 更新工具调用结果: 工具={} 结果摘要={}",
reactStep.getAction().getToolName(),
reactStep.getObservation().getResult().substring(0, Math.min(50, reactStep.getObservation().getResult().length())));
} else {
// 如果没有动作信息,记录为观察结果
workPanelCollector.recordThinking(reactStep.getContent(), "observation");
log.info("[WorkPanel] 记录观察步骤: {}",
reactStep.getContent().substring(0, Math.min(100, reactStep.getContent().length())));
}
}
break;
case FINAL_ANSWER:
workPanelCollector.recordFinalAnswer(reactStep.getContent());
// 记录最终答案到日志 // 记录最终答案到日志
log.info("[WorkPanel] 记录最终答案: {}", log.info("[WorkPanel] 记录{} {}", reactStepName,
reactStep.getContent().substring(0, Math.min(100, reactStep.getContent().length()))); reactStep.getContent().substring(0, Math.min(100, reactStep.getContent().length())));
break;
default:
log.warn("未知的ReAct步骤类型: {}", reactStep.getStepType());
break;
}
} catch (Exception e) {
log.error("记录ReAct步骤到工作面板失败", e);
// 即使发生异常,也尝试记录错误信息到工作面板
try {
if (reactStep != null && reactStep.getAction() != null) {
workPanelCollector.recordToolCallAction(
reactStep.getAction().getToolName(),
reactStep.getAction().getToolArgs(),
"记录失败: " + e.getMessage(),
"error",
System.currentTimeMillis() // 使用当前时间戳作为执行时间
);
}
} catch (Exception ex) {
log.error("记录错误信息到工作面板也失败", ex);
}
}
} }
} }
\ No newline at end of file
...@@ -5,13 +5,13 @@ import pangea.hiagent.web.dto.WorkPanelEvent; ...@@ -5,13 +5,13 @@ import pangea.hiagent.web.dto.WorkPanelEvent;
import pangea.hiagent.workpanel.event.EventService; import pangea.hiagent.workpanel.event.EventService;
import pangea.hiagent.workpanel.data.TokenEventDataBuilder; import pangea.hiagent.workpanel.data.TokenEventDataBuilder;
import pangea.hiagent.workpanel.data.ErrorEventDataBuilder; import pangea.hiagent.workpanel.data.ErrorEventDataBuilder;
import java.util.Set;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -436,25 +436,7 @@ public class UserSseService { ...@@ -436,25 +436,7 @@ public class UserSseService {
} }
// 检查是否已经完成,而不发送任何事件 // 检查是否已经完成,而不发送任何事件
if (isEmitterCompleted(emitter)) { return !isEmitterCompleted(emitter);
return false;
}
// 检查emitter是否在活动列表中
if (!emitters.contains(emitter)) {
// 如果不在活动列表中,将其标记为已完成
completedEmitters.add(emitter);
return false;
}
// 检查是否有用户ID关联
if (emitterUsers.get(emitter) == null) {
// 如果没有用户ID关联,可能是一个孤立的连接
log.debug("SSE emitter没有关联的用户ID,可能是孤立连接");
// 不立即标记为已完成,因为可能是新创建的连接
}
return true;
} }
/** /**
...@@ -510,21 +492,11 @@ public class UserSseService { ...@@ -510,21 +492,11 @@ public class UserSseService {
emitter.send(SseEmitter.event().name(eventName).data(data)); emitter.send(SseEmitter.event().name(eventName).data(data));
return true; return true;
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况,这通常是由于客户端断开连接 // 处理 emitter 已关闭的情况
log.debug("无法发送{}事件,emitter已关闭: {}", eventName, e.getMessage()); log.debug("无法发送{}事件,emitter已关闭: {}", eventName, e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
removeEmitter(emitter);
return false;
} catch (IOException e) {
// 处理IO异常,这通常是由于客户端断开连接或网络问题
log.debug("无法发送{}事件,IO异常: {}", eventName, e.getMessage());
// 将emitter标记为已完成,避免后续再次尝试发送
removeEmitter(emitter);
return false; return false;
} catch (Exception e) { } catch (Exception e) {
log.error("发送{}事件失败: {}", eventName, e.getMessage(), e); log.error("发送{}事件失败: {}", eventName, e.getMessage(), e);
// 将emitter标记为已完成,避免后续再次尝试发送
removeEmitter(emitter);
return false; return false;
} }
} }
...@@ -572,7 +544,7 @@ public class UserSseService { ...@@ -572,7 +544,7 @@ public class UserSseService {
* @param event 工作面板事件 * @param event 工作面板事件
* @throws IOException IO异常 * @throws IOException IO异常
*/ */
public void sendWorkPanelEvent(SseEmitter emitter, WorkPanelEvent event) throws IOException { public void sendWorkPanelEvent(WorkPanelEvent event) throws IOException {
if (event == null) { if (event == null) {
log.warn("工作面板事件为空,无法发送事件"); log.warn("工作面板事件为空,无法发送事件");
return; return;
...@@ -580,12 +552,19 @@ public class UserSseService { ...@@ -580,12 +552,19 @@ public class UserSseService {
try { try {
// 构建事件数据 // 构建事件数据
Map<String, Object> data = eventService.buildWorkPanelEventData(event); Map<String, Object> data = buildWorkPanelEventData(event);
if (data != null) { if (data != null) {
log.debug("准备发送工作面板事件: 类型={}, 事件内容={}", event.getType(), event); log.debug("准备发送工作面板事件: 类型={}, 事件内容={}", event.getType(), event);
log.debug("事件数据: {}", data); log.debug("事件数据: {}", data);
SseEmitter emitter = getSession(event.getUserId());
if (emitter == null) {
log.debug("未找到用户 {} 的SSE连接,跳过发送事件", event.getUserId());
return;
}
// 发送事件 // 发送事件
emitter.send(SseEmitter.event().name("message").data(data)); emitter.send(SseEmitter.event().name("message").data(data));
...@@ -605,6 +584,28 @@ public class UserSseService { ...@@ -605,6 +584,28 @@ public class UserSseService {
} }
} }
/**
* 构建工作面板事件数据
*
* @param event 工作面板事件
* @return 事件数据
*/
private Map<String, Object> buildWorkPanelEventData(WorkPanelEvent event) {
if (event == null) {
return null;
}
// 从对象池获取Map,避免频繁创建对象
Map<String, Object> data =event.getMetadata();
// 设置基础属性
data.put("type", event.getType());
data.put("eventType", event.getType());
data.put("timestamp", event.getTimestamp());
return data;
}
/** /**
* 发送连接成功事件 * 发送连接成功事件
* *
...@@ -623,7 +624,7 @@ public class UserSseService { ...@@ -623,7 +624,7 @@ public class UserSseService {
.timestamp(System.currentTimeMillis()) .timestamp(System.currentTimeMillis())
.build(); .build();
Map<String, Object> data = eventService.buildWorkPanelEventData(connectedEvent); Map<String, Object> data = buildWorkPanelEventData(connectedEvent);
boolean success = safeSendEvent(emitter, "message", data); boolean success = safeSendEvent(emitter, "message", data);
if (success) { if (success) {
......
...@@ -116,12 +116,8 @@ public abstract class BaseTool { ...@@ -116,12 +116,8 @@ public abstract class BaseTool {
.build(); .build();
// 获取用户的SSE发射器 // 获取用户的SSE发射器
SseEmitter emitter = userSseService.getSession(userId); userSseService.sendWorkPanelEvent(event, userId);
if (emitter != null) {
userSseService.sendWorkPanelEvent(emitter, event);
} else {
log.debug("未找到用户 {} 的SSE连接,跳过发送事件", userId);
}
log.debug("已发送工具事件: {}#{}, 状态: {}", toolName, methodName, status); log.debug("已发送工具事件: {}#{}, 状态: {}", toolName, methodName, status);
} catch (Exception e) { } catch (Exception e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment