Commit 12089728 authored by youxiaoji's avatar youxiaoji

* [区分智能体,非访客预约智能体暂不添加toolcontext]

parent 3f92e64d
...@@ -34,74 +34,75 @@ public class DefaultReactExecutor implements ReactExecutor { ...@@ -34,74 +34,75 @@ public class DefaultReactExecutor implements ReactExecutor {
private final UserSseService userSseService; private final UserSseService userSseService;
@Value("${hiagent.react.system-prompt}") @Value("${hiagent.react.system-prompt}")
private String defaultSystemPrompt; private String defaultSystemPrompt;
private final List<ReactCallback> reactCallbacks = new ArrayList<>(); private final List<ReactCallback> reactCallbacks = new ArrayList<>();
private final EventSplitter eventSplitter; private final EventSplitter eventSplitter;
private MemoryService memoryService; private MemoryService memoryService;
private ErrorHandlerService errorHandlerService; private ErrorHandlerService errorHandlerService;
private final AgentToolManager agentToolManager; private final AgentToolManager agentToolManager;
private final UserTokenService userTokenService; private final UserTokenService userTokenService;
public DefaultReactExecutor(EventSplitter eventSplitter, AgentToolManager agentToolManager ,
public DefaultReactExecutor(EventSplitter eventSplitter, AgentToolManager agentToolManager,
MemoryService memoryService, ErrorHandlerService errorHandlerService, UserSseService userSseService, UserTokenService userTokenService) { MemoryService memoryService, ErrorHandlerService errorHandlerService, UserSseService userSseService, UserTokenService userTokenService) {
this.eventSplitter = eventSplitter; this.eventSplitter = eventSplitter;
this.agentToolManager = agentToolManager; this.agentToolManager = agentToolManager;
this.memoryService = memoryService; this.memoryService = memoryService;
this.errorHandlerService = errorHandlerService; this.errorHandlerService = errorHandlerService;
this.userSseService = userSseService; this.userSseService = userSseService;
this.userTokenService = userTokenService; this.userTokenService = userTokenService;
} }
@Override @Override
public void addReactCallback(ReactCallback callback) { public void addReactCallback(ReactCallback callback) {
if (callback != null) { if (callback != null) {
reactCallbacks.add(callback); reactCallbacks.add(callback);
} }
} }
@Override @Override
public String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent) { public String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent) {
// 调用带用户ID的方法,首先尝试获取当前用户ID // 调用带用户ID的方法,首先尝试获取当前用户ID
String userId = UserUtils.getCurrentUserIdStatic(); String userId = UserUtils.getCurrentUserIdStatic();
return execute(chatClient, userInput, tools, agent, userId); return execute(chatClient, userInput, tools, agent, userId);
} }
@Override @Override
public String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent, String userId) { public String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent, String userId) {
log.info("开始执行ReAct流程,用户输入: {}", userInput); log.info("开始执行ReAct流程,用户输入: {}", userInput);
List<Object> agentTools = getAgentTools(agent); List<Object> agentTools = getAgentTools(agent);
try { try {
Prompt prompt = buildPromptWithHistory(defaultSystemPrompt, userInput, agent, userId); Prompt prompt = buildPromptWithHistory(defaultSystemPrompt, userInput, agent, userId);
ChatResponse response = chatClient.prompt(prompt) ChatResponse response = chatClient.prompt(prompt)
.tools(agentTools.toArray()) .tools(agentTools.toArray())
.call() .call()
.chatResponse(); .chatResponse();
String responseText = response.getResult().getOutput().getText(); String responseText = response.getResult().getOutput().getText();
log.info("最终答案: {}", responseText); log.info("最终答案: {}", responseText);
// 保存助手回复到内存,使用提供的用户ID // 保存助手回复到内存,使用提供的用户ID
saveAssistantResponseToMemory(agent, responseText, userId); saveAssistantResponseToMemory(agent, responseText, userId);
return responseText; return responseText;
} catch (Exception e) { } catch (Exception e) {
log.error("执行ReAct流程时发生错误", e); log.error("执行ReAct流程时发生错误", e);
return handleReActError(e); return handleReActError(e);
} }
} }
/** /**
* 处理ReAct执行过程中发生的错误 * 处理ReAct执行过程中发生的错误
* *
* @param e 发生的异常 * @param e 发生的异常
* @return 错误处理结果 * @return 错误处理结果
*/ */
...@@ -109,21 +110,21 @@ public class DefaultReactExecutor implements ReactExecutor { ...@@ -109,21 +110,21 @@ public class DefaultReactExecutor implements ReactExecutor {
log.error("ReAct执行过程中发生错误", e); log.error("ReAct执行过程中发生错误", e);
return errorHandlerService.handleSyncError(e, "处理ReAct请求时发生错误"); return errorHandlerService.handleSyncError(e, "处理ReAct请求时发生错误");
} }
/** /**
* 构建带有历史记录的提示词 * 构建带有历史记录的提示词
* *
* @param systemPrompt 系统提示词 * @param systemPrompt 系统提示词
* @param userInput 用户输入 * @param userInput 用户输入
* @param agent 智能体对象 * @param agent 智能体对象
* @param userId 用户ID(可选,如果为null则自动获取) * @param userId 用户ID(可选,如果为null则自动获取)
* @return 构建好的提示词对象 * @return 构建好的提示词对象
*/ */
private Prompt buildPromptWithHistory(String systemPrompt, String userInput, Agent agent, String userId) { private Prompt buildPromptWithHistory(String systemPrompt, String userInput, Agent agent, String userId) {
List<org.springframework.ai.chat.messages.Message> messages = new ArrayList<>(); List<org.springframework.ai.chat.messages.Message> messages = new ArrayList<>();
messages.add(new SystemMessage(systemPrompt)); messages.add(new SystemMessage(systemPrompt));
if (agent != null) { if (agent != null) {
try { try {
// 如果没有提供用户ID,则尝试获取当前用户ID // 如果没有提供用户ID,则尝试获取当前用户ID
...@@ -131,80 +132,93 @@ public class DefaultReactExecutor implements ReactExecutor { ...@@ -131,80 +132,93 @@ public class DefaultReactExecutor implements ReactExecutor {
userId = UserUtils.getCurrentUserIdStatic(); userId = UserUtils.getCurrentUserIdStatic();
} }
String sessionId = memoryService.generateSessionId(agent, userId); String sessionId = memoryService.generateSessionId(agent, userId);
int historyLength = agent.getHistoryLength() != null ? agent.getHistoryLength() : 10; int historyLength = agent.getHistoryLength() != null ? agent.getHistoryLength() : 10;
List<org.springframework.ai.chat.messages.Message> historyMessages = List<org.springframework.ai.chat.messages.Message> historyMessages =
memoryService.getHistoryMessages(sessionId, historyLength); memoryService.getHistoryMessages(sessionId, historyLength);
messages.addAll(historyMessages); messages.addAll(historyMessages);
memoryService.addUserMessageToMemory(sessionId, userInput); memoryService.addUserMessageToMemory(sessionId, userInput);
} catch (Exception e) { } catch (Exception e) {
log.warn("获取历史对话记录时发生错误: {}", e.getMessage()); log.warn("获取历史对话记录时发生错误: {}", e.getMessage());
} }
} }
messages.add(new UserMessage(userInput)); messages.add(new UserMessage(userInput));
return new Prompt(messages); return new Prompt(messages);
} }
@Override @Override
public void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent) { public void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent) {
// 调用带用户ID的方法,但首先尝试获取当前用户ID // 调用带用户ID的方法,但首先尝试获取当前用户ID
String userId = UserUtils.getCurrentUserIdStatic(); String userId = UserUtils.getCurrentUserIdStatic();
executeStream(chatClient, userInput, tools, tokenConsumer, agent, userId); executeStream(chatClient, userInput, tools, tokenConsumer, agent, userId);
} }
@Override @Override
public void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent, String userId) { public void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent, String userId) {
log.info("使用stream()方法处理ReAct流程,支持真正的流式输出"); log.info("使用stream()方法处理ReAct流程,支持真正的流式输出");
List<Object> agentTools = getAgentTools(agent); List<Object> agentTools = getAgentTools(agent);
StringBuilder fullResponse = new StringBuilder(); StringBuilder fullResponse = new StringBuilder();
try { try {
Prompt prompt = buildPromptWithHistory(defaultSystemPrompt, userInput, agent, userId); Prompt prompt = buildPromptWithHistory(defaultSystemPrompt, userInput, agent, userId);
SseTokenEmitter sseTokenEmitter = (SseTokenEmitter)tokenConsumer; SseTokenEmitter sseTokenEmitter = (SseTokenEmitter) tokenConsumer;
String emitterId = sseTokenEmitter.getEmitterId(); String emitterId = sseTokenEmitter.getEmitterId();
chatClient.prompt(prompt) if (agent.getId().contains("agent-8")) {
.tools(agentTools.toArray()) chatClient.prompt(prompt)
.toolContext(Map.of("emitterId", emitterId, "userId", sseTokenEmitter.getUserId())) .tools(agentTools.toArray())
.stream() .toolContext(Map.of("emitterId", emitterId, "userId", sseTokenEmitter.getUserId()))
.chatResponse() .stream()
.subscribe( .chatResponse()
chatResponse -> handleTokenResponse(chatResponse, tokenConsumer, fullResponse), .subscribe(
throwable -> handleStreamError(throwable, tokenConsumer,emitterId), chatResponse -> handleTokenResponse(chatResponse, tokenConsumer, fullResponse),
() -> handleStreamCompletion(tokenConsumer, fullResponse, agent, userId,emitterId) throwable -> handleStreamError(throwable, tokenConsumer, emitterId),
); () -> handleStreamCompletion(tokenConsumer, fullResponse, agent, userId, emitterId)
);
} catch (Exception e) { } else {
chatClient.prompt(prompt)
.tools(agentTools.toArray())
.stream()
.chatResponse()
.subscribe(
chatResponse -> handleTokenResponse(chatResponse, tokenConsumer, fullResponse),
throwable -> handleStreamError(throwable, tokenConsumer, emitterId),
() -> handleStreamCompletion(tokenConsumer, fullResponse, agent, userId, emitterId)
);
}
} catch (Exception e) {
log.error("流式执行ReAct流程时发生错误", e); log.error("流式执行ReAct流程时发生错误", e);
errorHandlerService.handleReactFlowError(e, tokenConsumer); errorHandlerService.handleReactFlowError(e, tokenConsumer);
} }
} }
/** /**
* 处理流式响应中的单个token * 处理流式响应中的单个token
* *
* @param chatResponse 聊天响应对象 * @param chatResponse 聊天响应对象
* @param tokenConsumer token消费者 * @param tokenConsumer token消费者
* @param fullResponse 完整响应构建器 * @param fullResponse 完整响应构建器
*/ */
private void handleTokenResponse(org.springframework.ai.chat.model.ChatResponse chatResponse, Consumer<String> tokenConsumer, StringBuilder fullResponse) { private void handleTokenResponse(org.springframework.ai.chat.model.ChatResponse chatResponse, Consumer<String> tokenConsumer, StringBuilder fullResponse) {
try { try {
String token = chatResponse.getResult().getOutput().getText(); String token = chatResponse.getResult().getOutput().getText();
if (isValidToken(token)) { if (isValidToken(token)) {
fullResponse.append(token); fullResponse.append(token);
if (tokenConsumer != null) { if (tokenConsumer != null) {
tokenConsumer.accept(token); tokenConsumer.accept(token);
} }
eventSplitter.feedToken(token); eventSplitter.feedToken(token);
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -212,22 +226,22 @@ public class DefaultReactExecutor implements ReactExecutor { ...@@ -212,22 +226,22 @@ public class DefaultReactExecutor implements ReactExecutor {
errorHandlerService.handleReactFlowError(e, tokenConsumer); errorHandlerService.handleReactFlowError(e, tokenConsumer);
} }
} }
/** /**
* 处理流式响应完成事件 * 处理流式响应完成事件
* *
* @param tokenConsumer token消费者 * @param tokenConsumer token消费者
* @param fullResponse 完整响应内容 * @param fullResponse 完整响应内容
* @param agent 智能体对象 * @param agent 智能体对象
* @param userId 用户ID * @param userId 用户ID
*/ */
private void handleStreamCompletion(Consumer<String> tokenConsumer, StringBuilder fullResponse, Agent agent, String userId, String emitterId) { private void handleStreamCompletion(Consumer<String> tokenConsumer, StringBuilder fullResponse, Agent agent, String userId, String emitterId) {
try { try {
log.info("流式处理完成"); log.info("流式处理完成");
String responseStr = fullResponse.toString(); String responseStr = fullResponse.toString();
saveAssistantResponseToMemory(agent, responseStr, userId); saveAssistantResponseToMemory(agent, responseStr, userId);
log.info("complete, remove emitterId {}",emitterId); log.info("complete, remove emitterId {}", emitterId);
userSseService.removeEmitter(emitterId); userSseService.removeEmitter(emitterId);
sendCompletionEvent(tokenConsumer, responseStr); sendCompletionEvent(tokenConsumer, responseStr);
} catch (Exception e) { } catch (Exception e) {
...@@ -235,13 +249,13 @@ public class DefaultReactExecutor implements ReactExecutor { ...@@ -235,13 +249,13 @@ public class DefaultReactExecutor implements ReactExecutor {
handleCompletionError(tokenConsumer, e); handleCompletionError(tokenConsumer, e);
} }
} }
/** /**
* 将助手的回复保存到内存中 * 将助手的回复保存到内存中
* *
* @param agent 智能体对象 * @param agent 智能体对象
* @param response 助手的回复内容 * @param response 助手的回复内容
* @param userId 用户ID * @param userId 用户ID
*/ */
private void saveAssistantResponseToMemory(Agent agent, String response, String userId) { private void saveAssistantResponseToMemory(Agent agent, String response, String userId) {
if (agent != null) { if (agent != null) {
...@@ -253,12 +267,12 @@ public class DefaultReactExecutor implements ReactExecutor { ...@@ -253,12 +267,12 @@ public class DefaultReactExecutor implements ReactExecutor {
} }
} }
} }
/** /**
* 处理完成事件时发生的错误 * 处理完成事件时发生的错误
* *
* @param tokenConsumer token消费者 * @param tokenConsumer token消费者
* @param e 发生的异常 * @param e 发生的异常
*/ */
private void handleCompletionError(Consumer<String> tokenConsumer, Exception e) { private void handleCompletionError(Consumer<String> tokenConsumer, Exception e) {
if (tokenConsumer instanceof TokenConsumerWithCompletion) { if (tokenConsumer instanceof TokenConsumerWithCompletion) {
...@@ -273,40 +287,40 @@ public class DefaultReactExecutor implements ReactExecutor { ...@@ -273,40 +287,40 @@ public class DefaultReactExecutor implements ReactExecutor {
} }
} }
} }
/** /**
* 验证token是否有效 * 验证token是否有效
* *
* @param token 待验证的token * @param token 待验证的token
* @return 如果token有效则返回true,否则返回false * @return 如果token有效则返回true,否则返回false
*/ */
private boolean isValidToken(String token) { private boolean isValidToken(String token) {
return token != null && !token.isEmpty(); return token != null && !token.isEmpty();
} }
/** /**
* 处理流式响应中的错误 * 处理流式响应中的错误
* *
* @param throwable 异常对象 * @param throwable 异常对象
* @param tokenConsumer token消费者 * @param tokenConsumer token消费者
*/ */
private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer,String emitterId) { private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer, String emitterId) {
log.info("error,remove emitterId:{}", emitterId); log.info("error,remove emitterId:{}", emitterId);
userSseService.removeEmitter(emitterId); userSseService.removeEmitter(emitterId);
errorHandlerService.handleStreamError(throwable, tokenConsumer, "ReAct流式处理"); errorHandlerService.handleStreamError(throwable, tokenConsumer, "ReAct流式处理");
} }
/** /**
* 发送完成事件 * 发送完成事件
* *
* @param tokenConsumer token消费者 * @param tokenConsumer token消费者
* @param fullResponse 完整响应内容 * @param fullResponse 完整响应内容
*/ */
private void sendCompletionEvent(Consumer<String> tokenConsumer, String fullResponse) { private void sendCompletionEvent(Consumer<String> tokenConsumer, String fullResponse) {
if (fullResponse == null) { if (fullResponse == null) {
fullResponse = ""; fullResponse = "";
} }
if (tokenConsumer instanceof TokenConsumerWithCompletion) { if (tokenConsumer instanceof TokenConsumerWithCompletion) {
try { try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullResponse); ((TokenConsumerWithCompletion) tokenConsumer).onComplete(fullResponse);
...@@ -327,16 +341,16 @@ public class DefaultReactExecutor implements ReactExecutor { ...@@ -327,16 +341,16 @@ public class DefaultReactExecutor implements ReactExecutor {
tokenConsumer.accept(""); tokenConsumer.accept("");
} }
} }
/** /**
* 获取智能体工具 * 获取智能体工具
* *
* @param agent 智能体对象 * @param agent 智能体对象
* @return 智能体可用的工具列表 * @return 智能体可用的工具列表
*/ */
private List<Object> getAgentTools(Agent agent) { private List<Object> getAgentTools(Agent agent) {
List<Object> tools = new ArrayList<>(); List<Object> tools = new ArrayList<>();
if (agent != null) { if (agent != null) {
try { try {
tools = agentToolManager.getAvailableToolInstances(agent); tools = agentToolManager.getAvailableToolInstances(agent);
...@@ -345,7 +359,7 @@ public class DefaultReactExecutor implements ReactExecutor { ...@@ -345,7 +359,7 @@ public class DefaultReactExecutor implements ReactExecutor {
// 发生异常时,tools 保持为空列表 // 发生异常时,tools 保持为空列表
} }
} }
return tools; return tools;
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment