Commit ef0d62e2 authored by youxiaoji's avatar youxiaoji

* [主分支合并2]

parent 02fbf9da
......@@ -5,16 +5,23 @@ import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.messages.*;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import pangea.hiagent.agent.service.ErrorHandlerService;
import pangea.hiagent.agent.service.SseTokenEmitter;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
import pangea.hiagent.agent.service.UserSseService;
import pangea.hiagent.memory.MemoryService;
import pangea.hiagent.model.Agent;
import pangea.hiagent.model.UserToken;
import pangea.hiagent.tool.AgentToolManager;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.web.service.UserTokenService;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.function.Consumer;
/**
......@@ -23,7 +30,8 @@ import java.util.function.Consumer;
@Slf4j
@Service
public class DefaultReactExecutor implements ReactExecutor {
private final UserSseService userSseService;
@Value("${hiagent.react.system-prompt}")
private String defaultSystemPrompt;
......@@ -37,13 +45,16 @@ public class DefaultReactExecutor implements ReactExecutor {
private ErrorHandlerService errorHandlerService;
private final AgentToolManager agentToolManager;
private final UserTokenService userTokenService;
public DefaultReactExecutor(EventSplitter eventSplitter, AgentToolManager agentToolManager ,
MemoryService memoryService, ErrorHandlerService errorHandlerService) {
MemoryService memoryService, ErrorHandlerService errorHandlerService, UserSseService userSseService, UserTokenService userTokenService) {
this.eventSplitter = eventSplitter;
this.agentToolManager = agentToolManager;
this.memoryService = memoryService;
this.errorHandlerService = errorHandlerService;
this.userSseService = userSseService;
this.userTokenService = userTokenService;
}
@Override
......@@ -156,15 +167,19 @@ public class DefaultReactExecutor implements ReactExecutor {
try {
Prompt prompt = buildPromptWithHistory(defaultSystemPrompt, userInput, agent, userId);
SseTokenEmitter sseTokenEmitter = (SseTokenEmitter)tokenConsumer;
UserToken userToken = userTokenService.getUserToken(sseTokenEmitter.getUserId(),"pangea");
String emitterId = sseTokenEmitter.getEmitterId();
chatClient.prompt(prompt)
.tools(agentTools.toArray())
.toolContext(Map.of("emitterId",emitterId))
.stream()
.chatResponse()
.subscribe(
chatResponse -> handleTokenResponse(chatResponse, tokenConsumer, fullResponse),
throwable -> handleStreamError(throwable, tokenConsumer),
() -> handleStreamCompletion(tokenConsumer, fullResponse, agent, userId)
throwable -> handleStreamError(throwable, tokenConsumer,emitterId),
() -> handleStreamCompletion(tokenConsumer, fullResponse, agent, userId,emitterId)
);
} catch (Exception e) {
......@@ -207,12 +222,14 @@ public class DefaultReactExecutor implements ReactExecutor {
* @param agent 智能体对象
* @param userId 用户ID
*/
private void handleStreamCompletion(Consumer<String> tokenConsumer, StringBuilder fullResponse, Agent agent, String userId) {
private void handleStreamCompletion(Consumer<String> tokenConsumer, StringBuilder fullResponse, Agent agent, String userId, String emitterId) {
try {
log.info("流式处理完成");
String responseStr = fullResponse.toString();
saveAssistantResponseToMemory(agent, responseStr, userId);
log.info("complete, remove emitterId {}",emitterId);
userSseService.removeEmitter(emitterId);
sendCompletionEvent(tokenConsumer, responseStr);
} catch (Exception e) {
log.error("处理流式完成回调时发生错误", e);
......@@ -274,7 +291,9 @@ public class DefaultReactExecutor implements ReactExecutor {
* @param throwable 异常对象
* @param tokenConsumer token消费者
*/
private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer) {
private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer,String emitterId) {
log.info("error,remove emitterId:{}", emitterId);
userSseService.removeEmitter(emitterId);
errorHandlerService.handleStreamError(throwable, tokenConsumer, "ReAct流式处理");
}
......
......@@ -15,6 +15,8 @@ import pangea.hiagent.tool.AgentToolManager;
import pangea.hiagent.web.dto.AgentRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.util.UUID;
/**
* Agent 对话服务
* 职责:协调整个AI对话流程,作为流式处理的统一入口和协调者
......@@ -125,9 +127,11 @@ public class AgentChatService {
// 创建 SSE emitter
SseEmitter emitter = userSseService.createAndRegisterConnection(userId);
String emitterId = UUID.randomUUID().toString();
log.info("emitterId: {}", emitterId);
userSseService.registerEmitter(emitterId, emitter);
// 异步处理对话,避免阻塞HTTP连接
processChatStreamAsync(emitter, agent, chatRequest, userId);
processChatStreamAsync(emitter, agent, chatRequest, userId,emitterId);
return emitter;
}
......@@ -136,14 +140,14 @@ public class AgentChatService {
* 异步处理流式对话
*/
@Async
private void processChatStreamAsync(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId) {
private void processChatStreamAsync(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId,String emitterId) {
try {
// 首先检查连接状态
if (emitter != null && userSseService.isEmitterCompleted(emitter)) {
log.debug("SSE连接已关闭,跳过异步处理");
return;
}
processChatRequest(emitter, agent, chatRequest, userId);
processChatRequest(emitter, agent, chatRequest, userId,emitterId);
} catch (Exception e) {
log.error("处理聊天请求时发生异常", e);
......@@ -163,7 +167,7 @@ public class AgentChatService {
* @param chatRequest 聊天请求
* @param userId 用户ID
*/
private void processChatRequest(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId) {
private void processChatRequest(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId,String emitterId) {
try {
// 参数验证
if (!validateParameters(emitter, agent, chatRequest, userId)) {
......@@ -197,7 +201,7 @@ public class AgentChatService {
// 创建新的SseTokenEmitter实例
SseTokenEmitter tokenEmitter = new SseTokenEmitter(userSseService, emitter, agent, request, userId, this::handleCompletion);
tokenEmitter.setEmitterId(emitterId);
// 处理流式请求前再次检查连接状态
if (!userSseService.isEmitterCompleted(emitter)) {
processor.processStreamRequest(request, agent, userId, tokenEmitter);
......
......@@ -25,6 +25,7 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
private final AgentRequest request;
private final String userId;
private final CompletionCallback completionCallback;
private String emitterId;
/**
* 构造函数
......@@ -160,4 +161,13 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
public interface CompletionCallback {
void onComplete(SseEmitter emitter, Agent agent, AgentRequest request, String userId, String fullContent);
}
public void setEmitterId(String emitterId) {
this.emitterId = emitterId;
}
public String getEmitterId() {
return emitterId;
}
public String getUserId() {
return userId;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment