Commit c6b7dbf6 authored by youxiaoji's avatar youxiaoji

* [主分支合并]

parent 72ec5880
......@@ -8,14 +8,20 @@ import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import pangea.hiagent.agent.service.ErrorHandlerService;
import pangea.hiagent.agent.service.SseTokenEmitter;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
import pangea.hiagent.agent.service.UserSseService;
import pangea.hiagent.memory.MemoryService;
import pangea.hiagent.model.Agent;
import pangea.hiagent.model.UserToken;
import pangea.hiagent.tool.AgentToolManager;
import pangea.hiagent.tool.impl.DateTimeTools;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.web.service.UserTokenService;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
......@@ -126,9 +132,14 @@ public class DefaultReactExecutor implements ReactExecutor {
@Autowired
private ErrorHandlerService errorHandlerService;
@Autowired
private UserTokenService userTokenService;
private final AgentToolManager agentToolManager;
@Autowired
private UserSseService userSseService;
public DefaultReactExecutor(AgentToolManager agentToolManager) {
this.agentToolManager = agentToolManager;
}
......@@ -252,17 +263,21 @@ public class DefaultReactExecutor implements ReactExecutor {
try {
// triggerThinkStep("开始处理用户请求: " + userInput);
SseTokenEmitter sseTokenEmitter = (SseTokenEmitter)tokenConsumer;
String emitterId = sseTokenEmitter.getEmitterId();
Prompt prompt = buildPromptWithHistory(DEFAULT_SYSTEM_PROMPT, userInput, agent, userId);
UserToken userToken = userTokenService.getUserToken(sseTokenEmitter.getUserId(),"pangea");
chatClient.prompt(prompt)
.tools(agentTools.toArray())
.toolContext(Map.of("emitterId",emitterId))
.stream()
.chatResponse()
.subscribe(
chatResponse -> handleTokenResponse(chatResponse, tokenConsumer, fullResponse),
throwable -> handleStreamError(throwable, tokenConsumer),
() -> handleStreamCompletion(tokenConsumer, fullResponse, agent, userId)
throwable -> handleStreamError(throwable, tokenConsumer,emitterId),
() -> handleStreamCompletion(tokenConsumer, fullResponse, agent, userId,emitterId)
);
} catch (Exception e) {
......@@ -310,7 +325,7 @@ public class DefaultReactExecutor implements ReactExecutor {
* @param agent 智能体对象
* @param userId 用户ID
*/
private void handleStreamCompletion(Consumer<String> tokenConsumer, StringBuilder fullResponse, Agent agent, String userId) {
private void handleStreamCompletion(Consumer<String> tokenConsumer, StringBuilder fullResponse, Agent agent, String userId,String emitterId) {
try {
log.info("流式处理完成");
......@@ -321,7 +336,8 @@ public class DefaultReactExecutor implements ReactExecutor {
}
saveAssistantResponseToMemory(agent, responseStr, userId);
log.info("complete, remove emitterId {}",emitterId);
userSseService.removeEmitter(emitterId);
sendCompletionEvent(tokenConsumer, responseStr);
} catch (Exception e) {
log.error("处理流式完成回调时发生错误", e);
......@@ -401,7 +417,9 @@ public class DefaultReactExecutor implements ReactExecutor {
* @param throwable 异常对象
* @param tokenConsumer token消费者
*/
private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer) {
private void handleStreamError(Throwable throwable, Consumer<String> tokenConsumer,String emitterId) {
log.info("error,remove emitterId:{}", emitterId);
userSseService.removeEmitter(emitterId);
errorHandlerService.handleStreamError(throwable, tokenConsumer, "ReAct流式处理");
}
......
......@@ -16,6 +16,8 @@ import pangea.hiagent.web.dto.AgentRequest;
import pangea.hiagent.workpanel.event.EventService;
import jakarta.servlet.http.HttpServletResponse;
import java.util.UUID;
/**
* Agent 对话服务
* 职责:协调整个AI对话流程,作为流式处理的统一入口和协调者
......@@ -128,9 +130,11 @@ public class AgentChatService {
// 创建 SSE emitter
SseEmitter emitter = userSseSerivce.createEmitter();
String emitterId = UUID.randomUUID().toString();
log.info("emitterId: {}", emitterId);
userSseSerivce.registerEmitter(emitterId, emitter);
// 异步处理对话,避免阻塞HTTP连接
processChatStreamAsync(emitter, agent, chatRequest, userId);
processChatStreamAsync(emitter, agent, chatRequest, userId,emitterId);
return emitter;
}
......@@ -139,9 +143,9 @@ public class AgentChatService {
* 异步处理流式对话
*/
@Async
private void processChatStreamAsync(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId) {
private void processChatStreamAsync(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId,String emitterId) {
try {
processChatRequest(emitter, agent, chatRequest, userId);
processChatRequest(emitter, agent, chatRequest, userId,emitterId);
} catch (Exception e) {
log.error("处理聊天请求时发生异常", e);
try {
......@@ -166,7 +170,7 @@ public class AgentChatService {
* @param chatRequest 聊天请求
* @param userId 用户ID
*/
private void processChatRequest(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId) {
private void processChatRequest(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId,String emitterId) {
try {
// 参数验证
if (!validateParameters(emitter, agent, chatRequest, userId)) {
......@@ -192,6 +196,8 @@ public class AgentChatService {
// 设置完成回调
sseTokenEmitter.setCompletionCallback(this::handleCompletion);
sseTokenEmitter.setEmitterId(emitterId);
// 处理流式请求
processor.processStreamRequest(request, agent, userId, sseTokenEmitter);
......
......@@ -24,6 +24,7 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
private Agent agent;
private AgentRequest request;
private String userId;
private String emitterId;
// 完成回调
private CompletionCallback completionCallback;
......@@ -47,6 +48,7 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
this.request = request;
this.userId = userId;
}
/**
* 设置完成回调
......@@ -54,7 +56,12 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
public void setCompletionCallback(CompletionCallback completionCallback) {
this.completionCallback = completionCallback;
}
public void setEmitterId(String emitterId) {
this.emitterId = emitterId;
}
public String getEmitterId() {
return emitterId;
}
@Override
public void accept(String token) {
// 使用JSON格式发送token,确保转义序列被正确处理
......@@ -118,4 +125,8 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
public interface CompletionCallback {
void onComplete(SseEmitter emitter, Agent agent, AgentRequest request, String userId, String fullContent);
}
public String getUserId() {
return userId;
}
}
\ No newline at end of file
......@@ -810,4 +810,17 @@ public class UserSseService {
Thread.currentThread().interrupt();
}
}
public void registerEmitter(String id,SseEmitter emitter) {
this.userEmitters.put(id, emitter);
}
public SseEmitter getEmitter(String id) {
return userEmitters.get(id);
}
public boolean removeEmitter(String id) {
userEmitters.remove(id);
return true;
}
}
\ No newline at end of file
package pangea.hiagent.common.utils;
public class Contants {
public static final String LOCATOR_SCHEMA = "{\n" +
" \"$schema\": \"http://json-schema.org/draft-07/schema#\",\n" +
" \"type\": \"array\",\n" +
" \"items\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
" \"field_name\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"locator\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"label_tag\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"attributes\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
" \"type\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"maxlength\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"class\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"name\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"value\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"autocomplete\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"placeholder\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"readonly\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"id\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"droptreeids\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"vetitle\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"contenteditable\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"style\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"tipstext\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"fylx\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" },\n" +
" \"additionalProperties\": false,\n" +
" \"required\": [\n" +
" \"class\",\n" +
" \"value\"\n" +
" ]\n" +
" }\n" +
" },\n" +
" \"additionalProperties\": false,\n" +
" \"required\": [\n" +
" \"field_name\",\n" +
" \"locator\",\n" +
" \"attributes\"\n" +
" ]\n" +
" }\n" +
"}";
}
package pangea.hiagent.common.utils;
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicLong;
public class HybridUniqueLongGenerator {
private static final SecureRandom random = new SecureRandom();
private static final AtomicLong counter = new AtomicLong(0);
public static long generateUnique13DigitNumber() {
long timestamp = System.currentTimeMillis();
long count = counter.incrementAndGet();
// 使用时间戳的前10位 + 计数器的后3位
long timestampPart = (timestamp / 1000) * 1000;
long counterPart = count % 1000;
return timestampPart + counterPart;
}
// 更随机的版本,但仍保证唯一
public static synchronized long generateRandomUnique() {
long timestamp = System.currentTimeMillis();
// 在时间戳基础上加上一个小的随机偏移
int randomOffset = random.nextInt(100);
long result = timestamp * 100 + randomOffset;
// 确保是13位
while (result >= 10000000000000L) {
result /= 10;
}
while (result < 1000000000000L) {
result *= 10;
result += random.nextInt(10);
}
return result;
}
}
\ No newline at end of file
package pangea.hiagent.common.utils;
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicLong;
public class InputCodeGenerator {
private static final String CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
private static final SecureRandom random = new SecureRandom();
private static final AtomicLong sequence = new AtomicLong(0);
public static String generateUniqueInputCode(String prefix) {
// 当前时间戳(毫秒)
long timestamp = System.currentTimeMillis();
// 序列号
long seq = sequence.incrementAndGet();
// 组合时间戳和序列号
long combined = (timestamp << 10) | (seq & 0x3FF); // 取序列号后10位
// 转为36进制
String code = Long.toString(Math.abs(combined), 36).toUpperCase();
// 确保8位长度
if (code.length() > 8) {
code = code.substring(code.length() - 8);
} else if (code.length() < 8) {
// 前面补随机字符
StringBuilder sb = new StringBuilder();
for (int i = code.length(); i < 8; i++) {
sb.append(CHARS.charAt(random.nextInt(CHARS.length())));
}
code = sb.toString() + code;
}
return prefix + code;
}
}
......@@ -17,7 +17,7 @@ import org.springframework.ai.chat.model.ToolContext;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.agent.sse.UserSseService;
import pangea.hiagent.agent.service.UserSseService;
import pangea.hiagent.common.utils.Contants;
import pangea.hiagent.common.utils.HybridUniqueLongGenerator;
import pangea.hiagent.common.utils.InputCodeGenerator;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment