Commit 0306580c authored by ligaowei's avatar ligaowei

refactor(agent): 移除未使用的回调方法和冗余代码

重构ReAct相关组件,移除未使用的onFinalAnswer回调方法和冗余的getParameters/getContent方法。优化SseTokenEmitter为无状态设计,通过构造函数一次性传入所有状态。简化JwtAuthenticationFilter和DefaultPermissionEvaluator的权限检查逻辑。改进ErrorHandlerService的错误处理代码复用性。调整代码结构以提高可维护性。
parent 40bd44a9
......@@ -31,14 +31,16 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
@Autowired
private RagService ragService;
@Autowired
private ReactCallback defaultReactCallback;
@Autowired
private ReactExecutor defaultReactExecutor;
@Autowired
private AgentToolManager agentToolManager;
@Autowired
private ReactCallback defaultReactCallback;
@Override
public String processRequest(Agent agent, AgentRequest request, String userId) {
......@@ -72,10 +74,6 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
// 处理请求的通用前置逻辑
String ragResponse = handlePreProcessing(agent, userMessage, userId, ragService, null);
if (ragResponse != null) {
// 触发最终答案回调
if (defaultReactCallback != null) {
defaultReactCallback.onFinalAnswer(ragResponse);
}
return ragResponse;
}
......@@ -83,10 +81,7 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
ChatClient client = ChatClient.builder(agentService.getChatModelForAgent(agent)).build();
List<Object> tools = agentToolManager.getAvailableToolInstances(agent);
// 添加自定义回调到ReAct执行器
if (defaultReactExecutor != null && defaultReactCallback != null) {
defaultReactExecutor.addReactCallback(defaultReactCallback);
}
// 使用ReAct执行器执行流程,传递Agent对象和用户ID以支持记忆功能
String finalAnswer = defaultReactExecutor.execute(client, userMessage, tools, agent, userId);
......@@ -114,10 +109,6 @@ public class ReActAgentProcessor extends BaseAgentProcessor {
// 处理请求的通用前置逻辑
String ragResponse = handlePreProcessing(agent, userMessage, userId, ragService, tokenConsumer);
if (ragResponse != null) {
// 触发最终答案回调
if (defaultReactCallback != null) {
defaultReactCallback.onFinalAnswer(ragResponse);
}
return;
}
......
......@@ -24,12 +24,6 @@ public class DefaultReactCallback implements ReactCallback {
recordReactStepToWorkPanel(reactStep);
}
@Override
public void onFinalAnswer(String finalAnswer) {
ReactStep finalStep = new ReactStep(0, ReactStepType.FINAL_ANSWER, finalAnswer);
recordReactStepToWorkPanel(finalStep);
}
private void recordReactStepToWorkPanel(ReactStep reactStep) {
if (workPanelCollector == null) {
return;
......@@ -46,7 +40,7 @@ public class DefaultReactCallback implements ReactCallback {
if (reactStep.getAction() != null) {
// 记录工具调用动作
String toolName = reactStep.getAction().getToolName();
Object parameters = reactStep.getAction().getParameters();
Object parameters = reactStep.getAction().getToolArgs();
// 记录工具调用,初始状态为pending
workPanelCollector.recordToolCallAction(
......@@ -73,15 +67,15 @@ public class DefaultReactCallback implements ReactCallback {
// 使用动作信息更新工具调用结果
workPanelCollector.recordToolCallAction(
reactStep.getAction().getToolName(),
reactStep.getAction().getParameters(),
reactStep.getObservation().getContent(),
reactStep.getAction().getToolArgs(),
reactStep.getObservation().getResult(),
"success", // 状态为success
null // 无错误信息
);
log.info("[WorkPanel] 更新工具调用结果: 工具={} 结果摘要={}",
reactStep.getAction().getToolName(),
reactStep.getObservation().getContent().substring(0, Math.min(50, reactStep.getObservation().getContent().length())));
reactStep.getObservation().getResult().substring(0, Math.min(50, reactStep.getObservation().getResult().length())));
} else {
// 如果没有动作信息,记录为观察结果
workPanelCollector.recordThinking(reactStep.getContent(), "observation");
......@@ -108,7 +102,7 @@ public class DefaultReactCallback implements ReactCallback {
if (reactStep != null && reactStep.getAction() != null) {
workPanelCollector.recordToolCallAction(
reactStep.getAction().getToolName(),
reactStep.getAction().getParameters(),
reactStep.getAction().getToolArgs(),
"记录失败: " + e.getMessage(),
"error",
System.currentTimeMillis() // 使用当前时间戳作为执行时间
......
......@@ -6,6 +6,7 @@ import org.springframework.ai.chat.messages.*;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import pangea.hiagent.agent.service.ErrorHandlerService;
import pangea.hiagent.agent.service.TokenConsumerWithCompletion;
......@@ -16,7 +17,6 @@ import pangea.hiagent.tool.impl.DateTimeTools;
import pangea.hiagent.common.utils.UserUtils;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
/**
......@@ -26,97 +26,10 @@ import java.util.function.Consumer;
@Service
public class DefaultReactExecutor implements ReactExecutor {
private static final String DEFAULT_SYSTEM_PROMPT =
"You are a powerful professional AI assistant powered by the enhanced ReAct (Reasoning + Acting) iterative framework, specialized for Spring AI tool orchestration. Your core mission is to solve complex, multi-step user queries with high accuracy by following the upgraded rules. The TOP PRIORITY principle is: ALWAYS CALL TOOLS FIRST, and answer questions EXCLUSIVELY based on tool execution results. You have full authority to intelligently select, combine, and serially invoke multiple tools, and iterate reasoning until a complete and satisfactory answer is obtained.\n\n" +
"=== CORE UPGRADED RULE - NON-NEGOTIABLE (Tool-First Priority Highlighted) ===\n\n" +
"1. Tool-First Mandate: For any query that requires factual verification, data calculation, information extraction, content analysis, or scenario-based processing, YOU MUST CALL RELEVANT TOOLS FIRST. Never answer directly relying on internal knowledge without tool invocation, except for extremely simple common-sense questions (e.g., \"What is 1+1?\").\n" +
"2. Answer Based on Tool Results Only: All conclusions, data, and insights in the final answer must be strictly derived from the real execution results of Spring AI tools. Never fabricate any data, assumptions, or inferences that are not supported by tool outputs.\n" +
"3. Serial Multi-Tool Invocation Supported: You can invoke multiple tools in serial order in one Action phase. By default, the output of the previous tool is the directly valid input of the next tool (first-class support for tool chaining).\n" +
"4. Iterative ReAct Closed-Loop: The ReAct thinking process is a cyclic loop. After each Observation phase, you can return to the Thought phase to re-analyze, reselect tools, and re-execute until the answer is complete/satisfactory.\n" +
"5. Mandatory Tool Synergy: Complex queries must use multi-tool combinations. A single tool can only solve simple problems; never rely on a single tool for complex tasks.\n" +
"6. Strict Compliance with Spring AI Mechanism: All tool calls are executed automatically by the Spring AI framework. You only need to make optimal tool selection and sequence planning.\n\n\n" +
"=== ENHANCED TOOL SYNERGY & ORCHESTRATION STRATEGY ===\n\n" +
"You have access to a full set of specialized Spring AI tools and must create value through intelligent tool collocation, with tool-first logic throughout:\n\n" +
"- Serial Chaining (Highest Priority): The output of one tool directly feeds into the input of another, forming a closed tool call chain (e.g., File Reader → Text Processor → Calculator → File Writer → Chart Generator).\n\n" +
"- Parallel Combination: Call multiple independent tools simultaneously to collect multi-dimensional data, then merge results for comprehensive analysis.\n\n" +
"- Preprocessing & Postprocessing: Use formatting tools to clean raw data before core tool execution; use conversion tools to optimize result presentation afterward.\n\n" +
"- Layered Enrichment: Combine extraction, analysis, and calculation tools to gain in-depth insights instead of superficial data.\n\n" +
"- Priority Matching: Select lightweight tools first for simple sub-tasks; use heavyweight tools only for complex ones (resource efficiency).\n\n" +
"- Fault Tolerance Fallback: If a selected tool is unavailable/returns invalid results, immediately invoke an alternative tool with the same function to re-execute the sub-task.\n\n\n" +
"=== Typical High-Value Tool Synergy Examples ===\n\n" +
"1. Web Content Extractor → Text Parser & Cleaner → NLP Analyzer → Statistical Calculator → Result Formatter → File Saver\n\n" +
"2. Current DateTime Tool → Date Formatter → Data Filter → Time Series Analyzer → Visualization Tool\n\n" +
"3. Document Reader → Table Extractor → Data Validator → Formula Calculator → Report Generator\n\n" +
"4. Input Parameter Parser → Multiple Business Tools (Serial) → Result Aggregator → Answer Polisher\n\n\n" +
"=== UPGRADED ITERATIVE ReAct THINKING PROCESS (Tool-First Oriented) ===\n\n" +
"This is a cyclic, repeatable process for EVERY query, with tool-first logic as the core. Execute in order and loop infinitely until the answer meets completeness requirements.\n\n" +
"▶ Cycle Trigger Rule: After Step 4 (Observation), if results are incomplete/insufficient/need optimization → Return to Step 1 (Thought) to re-analyze and re-execute.\n\n" +
"▶ Cycle Termination Rule: After Step 4 (Observation), if results are complete/accurate/satisfactory → Enter Step 5 (Final Answer) directly.\n\n\n" +
"Step 1 - THOUGHT (Tool-First Iterative Reasoning & Planning): Deeply analyze the user's core query and current context with tool-first logic\n" +
" - Break down the main problem into hierarchical sub-tasks (primary → secondary → fine-grained).\n" +
" - Tool-First Matching: For each sub-task, FIRST identify relevant tools (never consider direct answering first). Mark alternative tools for fault tolerance.\n" +
" - Confirm Tool Synergy Feasibility: Judge serial/parallel combination of multi-tools and define the exact invocation sequence.\n" +
" - Iterative Scenario Adjustment: Re-analyze the gap between current tool results and expected answers, adjust tool selection/sequence.\n" +
" - Verify Preconditions: Ensure input format and parameter validity for tool invocation are met.\n\n\n" +
"Step 2 - ACTION (Multi-Tool Serial/Parallel Execution): Execute the planned tool chain with clear purpose, adhering to tool-first principle\n" +
" - Call tools in the pre-defined serial/parallel order based on Thought phase analysis.\n" +
" - Support multiple consecutive tool calls in one Action phase (serial chain) for Spring AI, no limit on the number of tools.\n" +
" - Wait for ALL tool execution results (serial: one by one / parallel: all at once) before proceeding; never jump early.\n" +
" - Fault Tolerance Execution: If a tool returns invalid/empty results, immediately invoke the pre-marked alternative tool and re-execute the sub-task.\n\n\n" +
"Step 3 - OBSERVATION (Tool Result-Centric Analysis & Validation): Comprehensively interpret all tool execution results\n" +
" - Examine data/results from each tool in detail, cross-verify accuracy, completeness, and logical consistency.\n" +
" - Extract key information, patterns, and insights EXCLUSIVELY from combined tool results.\n" +
" - Judge Completion Status: Confirm if current results cover all sub-tasks and meet the user's core needs.\n" +
" - Identify Gaps: Mark missing information/unsolved sub-tasks that require further tool invocation.\n" +
" - Evaluate Tool Synergy Effect: Confirm if the tool chain provides deeper insights than single-tool usage.\n\n\n" +
"Step 4 - ITERATION DECISION: Critical judgment for ReAct cycle\n" +
" - ✅ TERMINATE CYCLE: If observation results are complete, accurate, sufficient, and fully meet the user's query → Proceed to Step 5.\n\n" +
" ♻️ RESTART CYCLE: If observation results are incomplete/insufficient/have missing information → Return to Step 1.\n\n\n" +
"Step 5 - FINAL ANSWER (Tool Result-Synthesized Response): Generate the ultimate answer based solely on tool results\n" +
" - Synthesize all valid tool results (from iterative cycles) into a coherent, logical, and complete answer.\n" +
" - Present information in clear, easy-to-understand natural language, distinguishing key insights from basic information.\n" +
" - Explicitly explain tool synergy logic (e.g., \"Tool A processed raw data for Tool B, enabling accurate calculation by Tool C\").\n" +
" - Provide actionable conclusions, recommendations, or follow-up suggestions based on integrated tool results.\n" +
" - Keep the answer conversational and business-oriented; remove redundant technical tool details.\n\n\n" +
"=== STANDARDIZED RESPONSE FORMAT ===\n\n" +
"Strictly follow this fixed structure for all responses to ensure correct parsing by Spring AI:\n\n\n" +
"1. Thought: Detailed explanation of problem analysis, sub-task breakdown, tool-first selection strategy, and invocation sequence\n" +
" - Identified Sub-Problems: List all primary/secondary sub-tasks clearly.\n" +
" - Tool-First Matching: Tools assigned to each sub-task + alternative tools (if any).\n" +
" - Execution Sequence: Exact serial/parallel order of multi-tool invocation and its optimality.\n" +
" - Iteration Note: If re-analyzing (loop), explain gaps in previous results and tool selection adjustments.\n\n\n" +
"2. Action: Clear description of all tool calls in this phase (serial number + tool name + core purpose)\n" +
" - Tool_Call: 1.[Tool Name] → Purpose: [Exact business objective and core value]\n" +
" - Tool_Call: 2.[Tool Name] → Purpose: [Complement the previous tool, use its output as input]\n" +
" - Tool_Call: N.[Tool Name] → Purpose: [Final enrichment/validation/formatting of the result chain]\n" +
" - (Fallback) If Tool X Unavailable: Use [Alternative Tool Name] → Purpose: [Same objective as Tool X]\n\n\n" +
"3. Observation: Comprehensive interpretation of all tool execution results\n" +
" - Results from each individual tool (key data, no redundant details).\n" +
" - Logical relationship between multiple tool results (how they connect and complement).\n" +
" - Core patterns/insights from the tool chain.\n" +
" - Completion Status: Whether results cover all sub-tasks and missing information (if any).\n\n\n" +
"4. Iteration_Decision: Explicit single choice\n" +
" - Option 1: Terminate Cycle → Proceed to Final Answer (complete results)\n" +
" - Option 2: Restart Cycle → Re-enter Thought phase (incomplete results)\n\n\n" +
"5. Final_Answer: Polished, complete, and user-friendly natural language solution\n" +
" - Direct answer to the original query, with core conclusions first.\n" +
" - Highlight key insights from tool synergy/iterative reasoning.\n" +
" - Provide actionable follow-up suggestions.\n" +
" - Conversational tone; no technical jargon about tools/frameworks.\n\n\n\n" +
"=== CRITICAL HARD RULES (Tool-First as Core) ===\n\n" +
"1. Tool-First is Non-Negotiable: For non-trivial queries, call tools first. Never answer directly with internal knowledge unless it's extremely simple common sense.\n" +
"2. Tool Results are the Sole Basis: All answers must rely on real Spring AI tool execution results. Never fabricate data/results.\n" +
"3. Mandatory Multi-Tool Synergy: Complex queries must use tool combinations. Never rely on a single tool for complex tasks.\n" +
"4. Full Support for Serial Invocation: One Action phase can call N tools in sequence, with prior output as next input.\n" +
"5. Iterative ReAct is Mandatory: Never stop at one-time execution; loop until the answer is complete and satisfactory.\n" +
"6. Explicit Tool Strategy: All tool selection, sequence planning, and fallback options must be clearly stated in Thought.\n" +
"7. Unavailable Tool Handling: Immediately use an alternative tool if the selected one is unavailable; do not suspend execution.\n" +
"8. User Experience Priority: The Final Answer must be conversational and business-focused, hiding technical tool details.\n" +
"9. Spring AI Compliance: All tool calls follow the framework's automatic execution rules; no custom execution logic.";
@Value("${hiagent.react.system-prompt}")
private String defaultSystemPrompt;
private final List<ReactCallback> reactCallbacks = new ArrayList<>();
private final AtomicInteger stepCounter = new AtomicInteger(0);
@Autowired
private DateTimeTools dateTimeTools;
......@@ -151,14 +64,10 @@ public class DefaultReactExecutor implements ReactExecutor {
public String execute(ChatClient chatClient, String userInput, List<Object> tools, Agent agent, String userId) {
log.info("开始执行ReAct流程,用户输入: {}", userInput);
stepCounter.set(0);
List<Object> agentTools = getAgentTools(agent);
try {
// triggerThinkStep("开始处理用户请求: " + userInput);
Prompt prompt = buildPromptWithHistory(DEFAULT_SYSTEM_PROMPT, userInput, agent, userId);
Prompt prompt = buildPromptWithHistory(defaultSystemPrompt, userInput, agent, userId);
ChatResponse response = chatClient.prompt(prompt)
.tools(agentTools.toArray())
......@@ -167,12 +76,8 @@ public class DefaultReactExecutor implements ReactExecutor {
String responseText = response.getResult().getOutput().getText();
// triggerObservationStep(responseText);
log.info("最终答案: {}", responseText);
// triggerFinalAnswerStep(responseText);
// 保存助手回复到内存,使用提供的用户ID
saveAssistantResponseToMemory(agent, responseText, userId);
......@@ -190,6 +95,7 @@ public class DefaultReactExecutor implements ReactExecutor {
* @return 错误处理结果
*/
private String handleReActError(Exception e) {
log.error("ReAct执行过程中发生错误", e);
return errorHandlerService.handleSyncError(e, "处理ReAct请求时发生错误");
}
......@@ -244,16 +150,12 @@ public class DefaultReactExecutor implements ReactExecutor {
public void executeStream(ChatClient chatClient, String userInput, List<Object> tools, Consumer<String> tokenConsumer, Agent agent, String userId) {
log.info("使用stream()方法处理ReAct流程,支持真正的流式输出");
stepCounter.set(0);
List<Object> agentTools = getAgentTools(agent);
StringBuilder fullResponse = new StringBuilder();
try {
// triggerThinkStep("开始处理用户请求: " + userInput);
Prompt prompt = buildPromptWithHistory(DEFAULT_SYSTEM_PROMPT, userInput, agent, userId);
Prompt prompt = buildPromptWithHistory(defaultSystemPrompt, userInput, agent, userId);
chatClient.prompt(prompt)
.tools(agentTools.toArray())
......@@ -285,17 +187,9 @@ public class DefaultReactExecutor implements ReactExecutor {
if (isValidToken(token)) {
fullResponse.append(token);
// analyzeAndRecordToolEvents(token, fullResponse.toString());
if (tokenConsumer != null) {
tokenConsumer.accept(token);
}
// tokenTextSegmenter.inputChar(token);
// tokenTextSegmenter.finishInput();
// 改进:在流式处理过程中实时解析关键词
// processTokenForStepsWithFullResponse(token, fullResponse.toString());
}
} catch (Exception e) {
log.error("处理token时发生错误", e);
......@@ -314,14 +208,8 @@ public class DefaultReactExecutor implements ReactExecutor {
try {
log.info("流式处理完成");
// 检查是否已经处理了Final Answer,如果没有,则将整个响应作为最终答案
String responseStr = fullResponse.toString();
if (!hasFinalAnswerBeenTriggered(responseStr)) {
// triggerFinalAnswerStep(responseStr);
}
saveAssistantResponseToMemory(agent, responseStr, userId);
sendCompletionEvent(tokenConsumer, responseStr);
} catch (Exception e) {
log.error("处理流式完成回调时发生错误", e);
......@@ -336,13 +224,8 @@ public class DefaultReactExecutor implements ReactExecutor {
* @return 如果已经触发了Final Answer则返回true,否则返回false
*/
private boolean hasFinalAnswerBeenTriggered(String fullResponse) {
String[] finalAnswerPatterns = {"Final Answer:", "final answer:", "FINAL ANSWER:", "Final_Answer:", "final_answer:", "FINAL_ANSWER:", "最终答案:"};
for (String pattern : finalAnswerPatterns) {
if (fullResponse.toLowerCase().contains(pattern.toLowerCase())) {
return true;
}
}
return false;
// 使用正则表达式进行高效的不区分大小写匹配
return fullResponse.matches("(?i).*(Final Answer:|Final_Answer:|最终答案:).*");
}
/**
......@@ -374,11 +257,9 @@ public class DefaultReactExecutor implements ReactExecutor {
try {
String errorId = errorHandlerService.generateErrorId();
String fullErrorMessage = errorHandlerService.buildFullErrorMessage("处理完成时发生错误", e, errorId, "ReAct");
try {
((TokenConsumerWithCompletion) tokenConsumer).onComplete("[" + errorId + "] " + fullErrorMessage);
} catch (NoClassDefFoundError ex) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", ex.getMessage());
}
((TokenConsumerWithCompletion) tokenConsumer).onComplete("[" + errorId + "] " + fullErrorMessage);
} catch (NoClassDefFoundError ex) {
log.error("TokenConsumerWithCompletion依赖类未找到,跳过完成回调: {}", ex.getMessage());
} catch (Exception ex) {
log.error("调用onComplete时发生错误", ex);
}
......@@ -444,25 +325,22 @@ public class DefaultReactExecutor implements ReactExecutor {
* @return 智能体可用的工具列表
*/
private List<Object> getAgentTools(Agent agent) {
if (agent == null) {
List<Object> defaultTools = new ArrayList<>();
defaultTools.add(dateTimeTools);
return defaultTools;
}
List<Object> tools = new ArrayList<>();
try {
List<Object> tools = agentToolManager.getAvailableToolInstances(agent);
if (dateTimeTools != null && !tools.contains(dateTimeTools)) {
tools.add(dateTimeTools);
if (agent != null) {
try {
tools = agentToolManager.getAvailableToolInstances(agent);
} catch (Exception e) {
log.error("获取工具实例时发生错误: {}", e.getMessage());
// 发生异常时,tools 保持为空列表
}
return tools;
} catch (Exception e) {
log.error("获取工具实例时发生错误: {}", e.getMessage());
List<Object> fallbackTools = new ArrayList<>();
fallbackTools.add(dateTimeTools);
return fallbackTools;
}
// 添加默认的日期时间工具(如果尚未添加)
if (dateTimeTools != null && !tools.contains(dateTimeTools)) {
tools.add(dateTimeTools);
}
return tools;
}
}
\ No newline at end of file
......@@ -10,6 +10,4 @@ public interface ReactCallback {
* @param reactStep ReAct步骤对象,包含步骤的所有核心信息
*/
void onStep(ReactStep reactStep);
void onFinalAnswer(String ragResponse);
}
\ No newline at end of file
......@@ -49,9 +49,6 @@ public class ReactStep {
public Object getToolArgs() { return toolArgs; }
public void setToolArgs(Object toolArgs) { this.toolArgs = toolArgs; }
// 根据DefaultReactCallback.java中的使用情况添加getParameters方法
public Object getParameters() { return toolArgs; }
}
/**
......@@ -66,8 +63,5 @@ public class ReactStep {
public String getResult() { return result; }
public void setResult(String result) { this.result = result; }
// 根据DefaultReactCallback.java中的使用情况添加getContent方法
public String getContent() { return result; }
}
}
\ No newline at end of file
......@@ -27,58 +27,60 @@ public class AgentChatService {
private final ErrorHandlerService errorHandlerService;
private final AgentProcessorFactory agentProcessorFactory;
private final AgentToolManager agentToolManager;
private final UserSseService userSseSerivce;
private final UserSseService userSseService;
private final pangea.hiagent.web.service.AgentService agentService;
private final SseTokenEmitter sseTokenEmitter;
public AgentChatService(
EventService eventService,
ErrorHandlerService errorHandlerService,
AgentProcessorFactory agentProcessorFactory,
AgentToolManager agentToolManager,
UserSseService workPanelSseService,
UserSseService userSseService,
pangea.hiagent.web.service.AgentService agentService,
SseTokenEmitter sseTokenEmitter) {
this.errorHandlerService = errorHandlerService;
this.agentProcessorFactory = agentProcessorFactory;
this.agentToolManager = agentToolManager;
this.userSseSerivce = workPanelSseService;
this.userSseService = userSseService;
this.agentService = agentService;
this.sseTokenEmitter = sseTokenEmitter;
}
// /**
// * 处理同步对话请求的统一入口
// * @param agent Agent对象
// * @param request 请求对象
// * @param userId 用户ID
// * @return 处理结果
// */
// public String handleChatSync(Agent agent, AgentRequest request, String userId) {
// log.info("开始处理同步对话请求,AgentId: {}, 用户消息: {}", agent.getId(), request.getUserMessage());
//
// try {
// // 获取处理器
// AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
// if (processor == null) {
// log.error("无法获取Agent处理器");
// return "[错误] 无法获取Agent处理器";
// }
//
// // 处理请求
// return processor.processRequest(agent, request, userId);
// } catch (Exception e) {
// log.error("处理普通Agent请求时发生错误", e);
// return "[错误] 处理请求时发生错误: " + e.getMessage();
// }
// * 处理同步对话请求的统一入口
// * @param agent Agent对象
// * @param request 请求对象
// * @param userId 用户ID
// * @return 处理结果
// */
// public String handleChatSync(Agent agent, AgentRequest request, String
// userId) {
// log.info("开始处理同步对话请求,AgentId: {}, 用户消息: {}", agent.getId(),
// request.getUserMessage());
//
// try {
// // 获取处理器
// AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
// if (processor == null) {
// log.error("无法获取Agent处理器");
// return "[错误] 无法获取Agent处理器";
// }
//
// // 处理请求
// return processor.processRequest(agent, request, userId);
// } catch (Exception e) {
// log.error("处理普通Agent请求时发生错误", e);
// return "[错误] 处理请求时发生错误: " + e.getMessage();
// }
// }
/**
* 处理流式对话请求的统一入口
*
* @param agentId Agent ID
* @param agentId Agent ID
* @param chatRequest 对话请求
* @param response HTTP响应
* @param response HTTP响应
* @return SSE emitter
*/
public SseEmitter handleChatStream(String agentId, ChatRequest chatRequest, HttpServletResponse response) {
......@@ -86,22 +88,22 @@ public class AgentChatService {
// 尝试获取当前用户ID,优先从SecurityContext获取,其次从请求中解析JWT
String userId = UserUtils.getCurrentUserId();
// 如果在主线程中未能获取到用户ID,尝试在异步环境中获取
if (userId == null) {
userId = UserUtils.getCurrentUserIdInAsync();
}
if (userId == null) {
log.error("用户未认证");
SseEmitter emitter = userSseSerivce.createEmitter();
SseEmitter emitter = userSseService.createEmitter();
// 检查响应是否已经提交
if (!response.isCommitted()) {
errorHandlerService.handleChatError(emitter, "用户未认证,请重新登录");
} else {
log.warn("响应已提交,无法发送用户未认证错误信息");
// 检查emitter是否已经完成,避免重复关闭
if (!userSseSerivce.isEmitterCompleted(emitter)) {
if (!userSseService.isEmitterCompleted(emitter)) {
emitter.complete();
}
}
......@@ -112,14 +114,14 @@ public class AgentChatService {
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
log.warn("Agent不存在: {}", agentId);
SseEmitter emitter = userSseSerivce.createEmitter();
SseEmitter emitter = userSseService.createEmitter();
// 检查响应是否已经提交
if (!response.isCommitted()) {
errorHandlerService.handleChatError(emitter, "Agent不存在");
} else {
log.warn("响应已提交,无法发送Agent不存在错误信息");
// 检查emitter是否已经完成,避免重复关闭
if (!userSseSerivce.isEmitterCompleted(emitter)) {
if (!userSseService.isEmitterCompleted(emitter)) {
emitter.complete();
}
}
......@@ -127,14 +129,14 @@ public class AgentChatService {
}
// 创建 SSE emitter
SseEmitter emitter = userSseSerivce.createEmitter();
SseEmitter emitter = userSseService.createEmitter();
// 异步处理对话,避免阻塞HTTP连接
processChatStreamAsync(emitter, agent, chatRequest, userId);
return emitter;
}
/**
* 异步处理流式对话
*/
......@@ -144,15 +146,10 @@ public class AgentChatService {
processChatRequest(emitter, agent, chatRequest, userId);
} catch (Exception e) {
log.error("处理聊天请求时发生异常", e);
try {
// 检查响应是否已经提交
if (emitter != null && !userSseSerivce.isEmitterCompleted(emitter)) {
errorHandlerService.handleChatError(emitter, "处理请求时发生错误", e, null);
} else {
log.warn("响应已提交或emitter已完成,无法发送处理请求错误信息");
}
} catch (Exception handlerException) {
log.error("处理错误信息时发生异常", handlerException);
// 检查响应是否已经提交
if (emitter != null && !userSseService.isEmitterCompleted(emitter)) {
errorHandlerService.handleChatError(emitter, "处理请求时发生错误", e, null);
}
}
}
......@@ -161,10 +158,10 @@ public class AgentChatService {
* 处理聊天请求的核心逻辑
* 注意:权限验证已在主线程中完成,此正仅执行业务逻辑不进行权限检查
*
* @param emitter SSE发射器
* @param agent Agent对象
* @param emitter SSE发射器
* @param agent Agent对象
* @param chatRequest 聊天请求
* @param userId 用户ID
* @param userId 用户ID
*/
private void processChatRequest(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId) {
try {
......@@ -172,7 +169,7 @@ public class AgentChatService {
if (!validateParameters(emitter, agent, chatRequest, userId)) {
return;
}
// 获取处理器
AgentProcessor processor = agentProcessorFactory.getProcessor(agent);
if (processor == null) {
......@@ -180,39 +177,34 @@ public class AgentChatService {
errorHandlerService.handleChatError(emitter, "无法获取Agent处理器");
return;
}
// 转换请求对象
AgentRequest request = chatRequest.toAgentRequest(agent.getId(), agent, agentToolManager);
// 设置SSE发射器到token发射器
sseTokenEmitter.setEmitter(emitter);
// 设置上下文信息
sseTokenEmitter.setContext(agent, request, userId);
// 设置完成回调
sseTokenEmitter.setCompletionCallback(this::handleCompletion);
// 创建新的SseTokenEmitter实例
SseTokenEmitter tokenEmitter = sseTokenEmitter.createNewInstance(emitter, agent, request, userId, this::handleCompletion);
// 处理流式请求
processor.processStreamRequest(request, agent, userId, sseTokenEmitter);
processor.processStreamRequest(request, agent, userId, tokenEmitter);
} catch (Exception e) {
log.error("处理聊天请求时发生异常", e);
errorHandlerService.handleChatError(emitter, "处理请求时发生错误", e, null);
}
}
/**
* 处理完成回调
*
* @param emitter SSE发射器
* @param agent Agent对象
* @param request Agent请求
* @param userId 用户ID
* @param emitter SSE发射器
* @param agent Agent对象
* @param request Agent请求
* @param userId 用户ID
* @param fullContent 完整内容
*/
private void handleCompletion(SseEmitter emitter, Agent agent, AgentRequest request, String userId, String fullContent) {
private void handleCompletion(SseEmitter emitter, Agent agent, AgentRequest request, String userId,
String fullContent) {
log.info("Agent处理完成,总字符数: {}", fullContent != null ? fullContent.length() : 0);
// 保存对话记录
try {
saveDialogue(agent, request, userId, fullContent);
......@@ -222,7 +214,7 @@ public class AgentChatService {
// 记录异常但不中断流程
}
}
/**
* 保存对话记录
*/
......@@ -232,16 +224,16 @@ public class AgentChatService {
log.error("保存对话记录失败:参数无效");
return;
}
try {
// 创建对话记录
pangea.hiagent.model.AgentDialogue dialogue = pangea.hiagent.model.AgentDialogue.builder()
.agentId(request.getAgentId())
.userMessage(request.getUserMessage())
.agentResponse(responseContent)
.userId(userId)
.build();
.agentId(request.getAgentId())
.userMessage(request.getUserMessage())
.agentResponse(responseContent)
.userId(userId)
.build();
// 保存对话记录
agentService.saveDialogue(dialogue);
} catch (Exception e) {
......@@ -249,14 +241,14 @@ public class AgentChatService {
throw new RuntimeException("保存对话记录失败", e);
}
}
/**
* 验证所有必需参数
*
* @param emitter SSE发射器
* @param agent Agent对象
* @param emitter SSE发射器
* @param agent Agent对象
* @param chatRequest 聊天请求
* @param userId 用户ID
* @param userId 用户ID
* @return 验证是否通过
*/
private boolean validateParameters(SseEmitter emitter, Agent agent, ChatRequest chatRequest, String userId) {
......
......@@ -102,7 +102,7 @@ public class ErrorHandlerService {
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
* @param exception 异常对象
* @param exception 异常对象(可选)
* @param processorType 处理器类型(可选)
*/
public void handleChatError(SseEmitter emitter, String errorMessage, Exception exception, String processorType) {
......@@ -142,44 +142,25 @@ public class ErrorHandlerService {
}
/**
* 处理聊天过程中的异常()
* 处理聊天过程中的异常(简化版
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
*/
public void handleChatError(SseEmitter emitter, String errorMessage) {
// 参数验证
if (errorMessage == null || errorMessage.isEmpty()) {
errorMessage = "未知错误";
}
// 生成错误跟踪ID
String errorId = generateErrorId();
log.error("[{}] 处理聊天请求时发生错误: {}", errorId, errorMessage);
try {
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String fullErrorMessage = buildFullErrorMessage(errorMessage, null, errorId, null);
userSseService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
} catch (Exception sendErrorEx) {
log.error("[{}] 发送错误信息失败", errorId, sendErrorEx);
}
handleChatError(emitter, errorMessage, null, null);
}
/**
* 处理Token处理过程中的异常
* 处理带完成状态标记的异常
*
* @param emitter SSE发射器
* @param errorMessage 错误信息
* @param processorType 处理器类型
* @param exception 异常对象
* @param isCompleted 完成状态标记
*/
public void handleTokenError(SseEmitter emitter, String processorType, Exception exception, AtomicBoolean isCompleted) {
private void handleErrorWithCompletion(SseEmitter emitter, String errorMessage, String processorType, Exception exception, AtomicBoolean isCompleted) {
// 参数验证
if (processorType == null || processorType.isEmpty()) {
processorType = "未知处理器";
......@@ -192,17 +173,17 @@ public class ErrorHandlerService {
if (exception != null) {
exceptionMonitoringService.recordException(
exception.getClass().getSimpleName(),
"处理token时发生错误",
errorMessage,
java.util.Arrays.toString(exception.getStackTrace())
);
}
log.error("[{}] {}处理token时发生错误", errorId, processorType, exception);
log.error("[{}] {}: {}", errorId, processorType, errorMessage, exception);
if (!isCompleted.getAndSet(true)) {
try {
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String errorMessage = "处理响应时发生错误";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, processorType);
userSseService.sendErrorEvent(emitter, fullErrorMessage);
} else {
......@@ -216,6 +197,18 @@ public class ErrorHandlerService {
}
}
/**
* 处理Token处理过程中的异常
*
* @param emitter SSE发射器
* @param processorType 处理器类型
* @param exception 异常对象
* @param isCompleted 完成状态标记
*/
public void handleTokenError(SseEmitter emitter, String processorType, Exception exception, AtomicBoolean isCompleted) {
handleErrorWithCompletion(emitter, "处理token时发生错误", processorType, exception, isCompleted);
}
/**
* 处理完成回调过程中的异常
*
......@@ -252,15 +245,13 @@ public class ErrorHandlerService {
}
/**
* 处理流式处理中的错误
* 处理基于Consumer的流式错误
*
* @param e 异常对象
* @param tokenConsumer token处理回调函数
* @param errorMessagePrefix 错误消息前缀
* @param errorMessage 完整错误消息
*/
public void handleStreamError(Throwable e, Consumer<String> tokenConsumer, String errorMessagePrefix) {
String errorMessage = errorMessagePrefix + ": " + e.getMessage();
private void handleConsumerError(Throwable e, Consumer<String> tokenConsumer, String errorMessage) {
// 记录异常到监控服务
exceptionMonitoringService.recordException(
e.getClass().getSimpleName(),
......@@ -268,12 +259,24 @@ public class ErrorHandlerService {
java.util.Arrays.toString(e.getStackTrace())
);
log.error("流式处理错误: {}", errorMessage, e);
log.error(errorMessage, e);
if (tokenConsumer != null) {
tokenConsumer.accept("[ERROR] " + errorMessage);
}
}
/**
* 处理流式处理中的错误
*
* @param e 异常对象
* @param tokenConsumer token处理回调函数
* @param errorMessagePrefix 错误消息前缀
*/
public void handleStreamError(Throwable e, Consumer<String> tokenConsumer, String errorMessagePrefix) {
String errorMessage = errorMessagePrefix + ": " + e.getMessage();
handleConsumerError(e, tokenConsumer, errorMessage);
}
/**
* 发送错误信息给客户端
*
......@@ -294,18 +297,7 @@ public class ErrorHandlerService {
*/
public void handleReactFlowError(Exception e, Consumer<String> tokenConsumer) {
String errorMessage = "处理ReAct流程时发生错误: " + e.getMessage();
// 记录异常到监控服务
exceptionMonitoringService.recordException(
e.getClass().getSimpleName(),
errorMessage,
java.util.Arrays.toString(e.getStackTrace())
);
log.error("ReAct流程错误: {}", errorMessage, e);
if (tokenConsumer != null) {
tokenConsumer.accept("[ERROR] " + errorMessage);
}
handleConsumerError(e, tokenConsumer, errorMessage);
}
/**
......@@ -337,33 +329,6 @@ public class ErrorHandlerService {
* @param isCompleted 完成状态标记
*/
public void handleSaveDialogueError(SseEmitter emitter, Exception exception, AtomicBoolean isCompleted) {
// 生成错误跟踪ID
String errorId = generateErrorId();
// 记录异常到监控服务
if (exception != null) {
exceptionMonitoringService.recordException(
exception.getClass().getSimpleName(),
"保存对话记录失败",
java.util.Arrays.toString(exception.getStackTrace())
);
}
log.error("[{}] 保存对话记录失败", errorId, exception);
if (!isCompleted.getAndSet(true)) {
try {
// 检查emitter是否已经完成,避免向已完成的连接发送错误信息
if (userSseService != null && !userSseService.isEmitterCompleted(emitter)) {
String errorMessage = "保存对话记录失败,请联系技术支持";
String fullErrorMessage = buildFullErrorMessage(errorMessage, exception, errorId, "对话记录");
userSseService.sendErrorEvent(emitter, fullErrorMessage);
} else {
log.debug("[{}] SSE emitter已完成,跳过发送错误信息", errorId);
}
} catch (Exception sendErrorEx) {
log.error("[{}] 发送错误信息失败", errorId, sendErrorEx);
}
}
handleErrorWithCompletion(emitter, "保存对话记录失败", "对话记录", exception, isCompleted);
}
}
\ No newline at end of file
......@@ -2,9 +2,10 @@ package pangea.hiagent.agent.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 异常监控服务
......@@ -17,12 +18,18 @@ public class ExceptionMonitoringService {
// 异常统计信息
private final Map<String, AtomicLong> exceptionCounters = new ConcurrentHashMap<>();
// 异常详细信息缓存
private final Map<String, String> exceptionDetails = new ConcurrentHashMap<>();
// 异常详细信息缓存,使用时间戳作为键,便于按时间排序
private final Map<Long, String> exceptionDetails = new ConcurrentHashMap<>();
// 锁,用于保护缓存清理操作
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 最大缓存条目数
private static final int MAX_CACHE_SIZE = 1000;
// 清理阈值,当缓存超过最大值时,清理到这个值
private static final int CLEANUP_THRESHOLD = MAX_CACHE_SIZE - 200;
/**
* 记录异常信息
*
......@@ -37,14 +44,31 @@ public class ExceptionMonitoringService {
counter.incrementAndGet();
// 记录异常详细信息(保留最新的)
String detailKey = exceptionType + "_" + System.currentTimeMillis();
exceptionDetails.put(detailKey, formatExceptionDetail(exceptionType, errorMessage, stackTrace));
long timestamp = System.currentTimeMillis();
exceptionDetails.put(timestamp, formatExceptionDetail(exceptionType, errorMessage, stackTrace));
// 控制缓存大小
// 控制缓存大小,使用写锁保护清理操作
if (exceptionDetails.size() > MAX_CACHE_SIZE) {
// 移除最老的条目
String oldestKey = exceptionDetails.keySet().iterator().next();
exceptionDetails.remove(oldestKey);
lock.writeLock().lock();
try {
// 再次检查,避免竞态条件
if (exceptionDetails.size() > MAX_CACHE_SIZE) {
// 找出最老的条目并移除,直到达到清理阈值
while (exceptionDetails.size() > CLEANUP_THRESHOLD) {
// 找出最小的时间戳(最老的条目)
Long oldestTimestamp = exceptionDetails.keySet().stream()
.min(Long::compare)
.orElse(null);
if (oldestTimestamp != null) {
exceptionDetails.remove(oldestTimestamp);
} else {
break;
}
}
}
} finally {
lock.writeLock().unlock();
}
}
// 记录日志
......@@ -102,7 +126,11 @@ public class ExceptionMonitoringService {
* @return 异常详细信息
*/
public Map<String, String> getExceptionDetails() {
return new ConcurrentHashMap<>(exceptionDetails);
Map<String, String> result = new ConcurrentHashMap<>();
for (Map.Entry<Long, String> entry : exceptionDetails.entrySet()) {
result.put(entry.getKey().toString(), entry.getValue());
}
return result;
}
/**
......
......@@ -10,6 +10,7 @@ import pangea.hiagent.web.dto.AgentRequest;
/**
* SSE Token发射器
* 专注于将token转换为SSE事件并发送
* 无状态设计,每次使用时创建新实例
*/
@Slf4j
@Component
......@@ -17,42 +18,51 @@ public class SseTokenEmitter implements TokenConsumerWithCompletion {
private final UserSseService userSseService;
// 当前处理的emitter
private SseEmitter emitter;
// 上下文信息
private Agent agent;
private AgentRequest request;
private String userId;
// 完成回调
private CompletionCallback completionCallback;
public SseTokenEmitter(UserSseService userSseService) {
this.userSseService = userSseService;
}
// 所有状态通过构造函数一次性传入
private final SseEmitter emitter;
private final Agent agent;
private final AgentRequest request;
private final String userId;
private final CompletionCallback completionCallback;
/**
* 设置当前使用的SSE发射器
* 构造函数
* @param userSseService SSE服务
* @param emitter SSE发射器
* @param agent Agent对象
* @param request 请求对象
* @param userId 用户ID
* @param completionCallback 完成回调
*/
public void setEmitter(SseEmitter emitter) {
public SseTokenEmitter(UserSseService userSseService, SseEmitter emitter, Agent agent,
AgentRequest request, String userId, CompletionCallback completionCallback) {
this.userSseService = userSseService;
this.emitter = emitter;
this.agent = agent;
this.request = request;
this.userId = userId;
this.completionCallback = completionCallback;
}
/**
* 设置上下文信息
* 无参构造函数,用于Spring容器初始化
*/
public void setContext(Agent agent, AgentRequest request, String userId) {
this.agent = agent;
this.request = request;
this.userId = userId;
public SseTokenEmitter(UserSseService userSseService) {
this(userSseService, null, null, null, null, null);
}
/**
* 设置完成回调
* 创建新的SseTokenEmitter实例
* @param emitter SSE发射器
* @param agent Agent对象
* @param request 请求对象
* @param userId 用户ID
* @param completionCallback 完成回调
* @return 新的SseTokenEmitter实例
*/
public void setCompletionCallback(CompletionCallback completionCallback) {
this.completionCallback = completionCallback;
public SseTokenEmitter createNewInstance(SseEmitter emitter, Agent agent, AgentRequest request,
String userId, CompletionCallback completionCallback) {
return new SseTokenEmitter(userSseService, emitter, agent, request, userId, completionCallback);
}
@Override
......
package pangea.hiagent.agent.service;
import java.util.function.Consumer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import pangea.hiagent.workpanel.event.EventService;
/**
* Token消费者接口,支持完成回调
......@@ -17,17 +14,4 @@ public interface TokenConsumerWithCompletion extends Consumer<String> {
default void onComplete(String fullContent) {
// 默认实现为空
}
/**
* 当流式处理完成时调用,发送完成事件到前端
* @param fullContent 完整的内容
* @param emitter SSE发射器
* @param sseEventSender SSE事件发送器
* @param isCompleted 完成状态标记
*/
default void onComplete(String fullContent, SseEmitter emitter,
EventService eventService,
AtomicBoolean isCompleted) {
// 默认实现将在子类中覆盖
}
}
\ No newline at end of file
......@@ -25,33 +25,34 @@ import java.util.concurrent.ScheduledFuture;
@Slf4j
@Service
public class UserSseService {
// 存储所有活动的 emitter
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
// 存储用户ID到SSE Emitter的映射关系
private final ConcurrentMap<String, SseEmitter> userEmitters = new ConcurrentHashMap<>();
// 存储SSE Emitter到用户ID的反向映射关系(用于快速查找)
private final ConcurrentMap<SseEmitter, String> emitterUsers = new ConcurrentHashMap<>();
// 心跳任务执行器 - 使用共享线程池以提高资源利用率
private final ScheduledExecutorService heartbeatExecutor;
// SSE超时时间(毫秒)
private static final long SSE_TIMEOUT = 0L; // 0表示不使用默认超时,由心跳机制管理连接
private final EventService eventService;
private final TokenEventDataBuilder tokenEventDataBuilder;
private final ErrorEventDataBuilder errorEventDataBuilder;
public UserSseService(EventService eventService, TokenEventDataBuilder tokenEventDataBuilder, ErrorEventDataBuilder errorEventDataBuilder) {
public UserSseService(EventService eventService, TokenEventDataBuilder tokenEventDataBuilder,
ErrorEventDataBuilder errorEventDataBuilder) {
this.eventService = eventService;
this.tokenEventDataBuilder = tokenEventDataBuilder;
this.errorEventDataBuilder = errorEventDataBuilder;
this.heartbeatExecutor = Executors.newScheduledThreadPool(2);
}
/**
* 创建并注册SSE连接
*
......@@ -60,27 +61,27 @@ public class UserSseService {
*/
public SseEmitter createAndRegisterConnection(String userId) {
log.debug("开始为用户 {} 创建SSE连接", userId);
// 创建 SSE emitter
SseEmitter emitter = createEmitter();
log.debug("SSE Emitter创建成功");
// 注册用户的SSE连接
registerSession(userId, emitter);
log.debug("用户 {} 的SSE连接注册成功", userId);
// 注册 emitter 回调
registerCallbacks(emitter, userId);
log.debug("SSE Emitter回调注册成功");
// 启动心跳机制
startHeartbeat(emitter, new AtomicBoolean(false));
log.debug("心跳机制启动成功");
log.info("用户 {} 的SSE连接创建和注册完成", userId);
return emitter;
}
/**
* 创建SSE发射器
*
......@@ -94,12 +95,12 @@ public class UserSseService {
startHeartbeat(emitter, new AtomicBoolean(false));
return emitter;
}
/**
* 注册用户的SSE连接
* 如果该用户已有连接,则先关闭旧连接再注册新连接
*
* @param userId 用户ID
* @param userId 用户ID
* @param emitter SSE Emitter
* @return true表示注册成功,false表示注册失败
*/
......@@ -108,7 +109,7 @@ public class UserSseService {
log.warn("注册SSE会话失败:用户ID或Emitter为空");
return false;
}
try {
// 检查该用户是否已有连接
SseEmitter existingEmitter = userEmitters.get(userId);
......@@ -123,11 +124,11 @@ public class UserSseService {
userEmitters.remove(userId);
emitterUsers.remove(existingEmitter);
}
// 注册新连接
userEmitters.put(userId, emitter);
emitterUsers.put(emitter, userId);
log.info("用户 {} 的SSE连接注册成功", userId);
return true;
} catch (Exception e) {
......@@ -135,7 +136,7 @@ public class UserSseService {
return false;
}
}
/**
* 获取用户的SSE连接
*
......@@ -145,22 +146,19 @@ public class UserSseService {
public SseEmitter getSession(String userId) {
return userEmitters.get(userId);
}
/**
* 处理连接完成事件
* 通用连接关闭处理方法
*
* @param emitter SSE Emitter
* @param emitter SSE Emitter
* @param connectionType 连接类型(用于日志)
*/
public void handleConnectionCompletion(SseEmitter emitter) {
private void handleConnectionClose(SseEmitter emitter, String connectionType) {
if (emitter == null) {
return;
}
try {
// 按照正确的SSE连接关闭顺序:
// 4. 取消心跳任务:清理相关的ScheduledFuture心跳任务(已在回调中处理)
// 5. 移除连接映射:从连接管理器(userEmitters、emitterUsers、emitters)中移除连接映射
// 检查emitter是否已经完成,避免重复关闭
if (!isEmitterCompleted(emitter)) {
try {
......@@ -169,94 +167,47 @@ public class UserSseService {
log.debug("完成emitter时发生异常(可能是由于已关闭): {}", e.getMessage());
}
}
// 从映射表中移除连接
String userId = emitterUsers.remove(emitter);
if (userId != null) {
userEmitters.remove(userId);
}
emitters.remove(emitter);
log.debug("SSE连接完成,用户: {}", userId);
log.debug("SSE连接{},用户: {}", connectionType, userId);
} catch (Exception e) {
log.error("处理SSE连接完成事件时发生异常", e);
log.error("处理SSE连接{}事件时发生异常", connectionType, e);
}
}
/**
* 处理连接完成事件
*
* @param emitter SSE Emitter
*/
public void handleConnectionCompletion(SseEmitter emitter) {
handleConnectionClose(emitter, "完成");
}
/**
* 处理连接超时事件
*
* @param emitter SSE Emitter
*/
public void handleConnectionTimeout(SseEmitter emitter) {
if (emitter == null) {
return;
}
try {
// 按照正确的SSE连接关闭顺序:
// 4. 取消心跳任务:清理相关的ScheduledFuture心跳任务(已在回调中处理)
// 5. 移除连接映射:从连接管理器(userEmitters、emitterUsers、emitters)中移除连接映射
// 检查emitter是否已经完成,避免重复关闭
if (!isEmitterCompleted(emitter)) {
try {
emitter.complete();
} catch (Exception e) {
log.debug("完成emitter时发生异常(可能是由于已关闭): {}", e.getMessage());
}
}
// 从映射表中移除连接
String userId = emitterUsers.remove(emitter);
if (userId != null) {
userEmitters.remove(userId);
}
emitters.remove(emitter);
log.debug("SSE连接超时,用户: {}", userId);
} catch (Exception e) {
log.error("处理SSE连接超时事件时发生异常", e);
}
handleConnectionClose(emitter, "超时");
}
/**
* 处理连接错误事件
*
* @param emitter SSE Emitter
*/
public void handleConnectionError(SseEmitter emitter) {
if (emitter == null) {
return;
}
try {
// 按照正确的SSE连接关闭顺序:
// 4. 取消心跳任务:清理相关的ScheduledFuture心跳任务(已在回调中处理)
// 5. 移除连接映射:从连接管理器(userEmitters、emitterUsers、emitters)中移除连接映射
// 检查emitter是否已经完成,避免重复关闭
if (!isEmitterCompleted(emitter)) {
try {
emitter.complete();
} catch (Exception e) {
log.debug("完成emitter时发生异常(可能是由于已关闭): {}", e.getMessage());
}
}
// 从映射表中移除连接
String userId = emitterUsers.remove(emitter);
if (userId != null) {
userEmitters.remove(userId);
}
emitters.remove(emitter);
log.debug("SSE连接错误,用户: {}", userId);
} catch (Exception e) {
log.error("处理SSE连接错误事件时发生异常", e);
}
handleConnectionClose(emitter, "错误");
}
/**
* 移除SSE发射器
*
......@@ -267,11 +218,11 @@ public class UserSseService {
log.debug("已移除SSE Emitter,剩余连接数: {}", emitters.size());
}
}
/**
* 启动心跳机制
*
* @param emitter SSE发射器
* @param emitter SSE发射器
* @param isCompleted 是否已完成
*/
public void startHeartbeat(SseEmitter emitter, AtomicBoolean isCompleted) {
......@@ -279,13 +230,13 @@ public class UserSseService {
log.warn("SSE发射器为空,无法启动心跳机制");
return;
}
// 用于追踪心跳失败次数
AtomicInteger consecutiveFailures = new AtomicInteger(0);
// 使用数组包装ScheduledFuture以解决Lambda中的变量访问问题
final ScheduledFuture<?>[] heartbeatTaskRef = new ScheduledFuture<?>[1];
// 创建心跳任务并保存ScheduledFuture引用
heartbeatTaskRef[0] = heartbeatExecutor.scheduleAtFixedRate(() -> {
try {
......@@ -297,37 +248,37 @@ public class UserSseService {
}
return;
}
// 发送心跳事件
boolean heartbeatSuccess = sendHeartbeat(emitter);
if (heartbeatSuccess) {
// 如果心跳成功,重置失败计数
consecutiveFailures.set(0);
log.debug("心跳发送成功,连续失败次数重置为0");
// 心跳成功后,连接保持活动状态,不需要额外操作,因为SSE_TIMEOUT为0
} else {
// 心跳失败,增加失败计数
int currentFailures = consecutiveFailures.incrementAndGet();
log.debug("心跳连续失败次数: {}", currentFailures);
// 如果心跳连续失败达到阈值,启动延迟关闭
if (currentFailures >= 2) { // 连续2次失败后,启动30秒延迟关闭
log.warn("心跳连续失败{}次,启动30秒延迟关闭机制", currentFailures);
// 调度一个延迟任务来关闭连接
heartbeatExecutor.schedule(() -> {
if (!isCompleted.get() && !isEmitterCompleted(emitter)) {
log.info("30秒延迟到期,主动关闭SSE连接");
// 首先取消心跳任务
if (heartbeatTaskRef[0] != null && !heartbeatTaskRef[0].isCancelled()) {
heartbeatTaskRef[0].cancel(true);
log.debug("心跳任务已取消");
}
// 然后关闭SSE连接
try {
if (!isEmitterCompleted(emitter)) {
......@@ -341,7 +292,7 @@ public class UserSseService {
log.debug("SSE连接已完成或已关闭,跳过延迟关闭");
}
}, 30, TimeUnit.SECONDS);
// 立即取消当前心跳任务
if (heartbeatTaskRef[0] != null && !heartbeatTaskRef[0].isCancelled()) {
heartbeatTaskRef[0].cancel(true);
......@@ -349,12 +300,12 @@ public class UserSseService {
}
}
}
} catch (Exception e) {
log.error("心跳任务执行异常: {}", e.getMessage(), e);
}
}, 20, 20, TimeUnit.SECONDS); // 每20秒发送一次心跳,确保前端60秒超时前至少收到2次心跳
// 注册回调,在连接完成时取消心跳任务
emitter.onCompletion(() -> {
if (heartbeatTaskRef[0] != null && !heartbeatTaskRef[0].isCancelled()) {
......@@ -362,7 +313,7 @@ public class UserSseService {
log.debug("SSE连接完成,心跳任务已取消");
}
});
// 注册回调,在连接超时时取消心跳任务
emitter.onTimeout(() -> {
if (heartbeatTaskRef[0] != null && !heartbeatTaskRef[0].isCancelled()) {
......@@ -370,7 +321,7 @@ public class UserSseService {
log.debug("SSE连接超时,心跳任务已取消");
}
});
// 注册回调,在连接错误时取消心跳任务
emitter.onError(throwable -> {
if (heartbeatTaskRef[0] != null && !heartbeatTaskRef[0].isCancelled()) {
......@@ -379,23 +330,36 @@ public class UserSseService {
}
});
}
/**
* 注册回调函数
*
* @param emitter SSE发射器
* @param userId 用户ID(可选,用于完整的连接管理)
*/
public void registerCallbacks(SseEmitter emitter) {
public void registerCallbacks(SseEmitter emitter, String... userId) {
boolean hasUserId = userId != null && userId.length > 0 && userId[0] != null;
emitter.onCompletion(() -> {
log.debug("【注册回调函数】SSE连接完成");
// 按照正确的关闭顺序,连接完成时已经完成关闭,只需移除连接映射
removeEmitter(emitter);
if (hasUserId) {
handleConnectionCompletion(emitter);
} else {
removeEmitter(emitter);
}
});
emitter.onError((Throwable t) -> {
log.debug("SSE连接发生错误: {}", t.getMessage());
// 错误发生时,先移除连接映射
removeEmitter(emitter);
String errorMessage = t.getMessage();
String errorType = t.getClass().getSimpleName();
log.debug("SSE连接发生错误 - 类型: {}, 消息: {}", errorType, errorMessage);
if (hasUserId) {
handleConnectionError(emitter);
} else {
removeEmitter(emitter);
}
});
emitter.onTimeout(() -> {
log.warn("SSE连接超时");
try {
......@@ -406,56 +370,18 @@ public class UserSseService {
} catch (Exception e) {
log.debug("关闭SSE连接时发生异常(可能是由于已关闭): {}", e.getMessage());
}
// 超时时也移除连接映射
removeEmitter(emitter);
});
}
/**
* 注册Emitter回调函数
* 职责:注册所有必要的回调处理函数
*
* @param emitter SSE Emitter
* @param userId 用户ID
*/
public void registerCallbacks(SseEmitter emitter, String userId) {
emitter.onCompletion(() -> {
log.debug("【注册Emitter回调函数】SSE连接完成");
// 通知用户连接管理器连接已完成
handleConnectionCompletion(emitter);
});
emitter.onTimeout(() -> {
log.warn("SSE连接超时");
try {
// 检查emitter是否已经完成,避免重复关闭
if (!isEmitterCompleted(emitter)) {
emitter.complete();
}
} catch (Exception e) {
log.debug("关闭SSE连接失败(可能是由于已关闭): {}", e.getMessage());
if (hasUserId) {
handleConnectionTimeout(emitter);
} else {
removeEmitter(emitter);
}
// 通知用户连接管理器连接已超时
handleConnectionTimeout(emitter);
});
emitter.onError(throwable -> {
// 记录详细的错误信息,包括异常类型和消息
String errorMessage = throwable.getMessage();
String errorType = throwable.getClass().getSimpleName();
log.error("SSE连接错误 - 类型: {}, 消息: {}", errorType, errorMessage, throwable);
// 通知用户连接管理器连接发生错误
handleConnectionError(emitter);
});
// 注册 emitter 到管理器
registerCallbacks(emitter);
}
/**
* 完成SSE发射器
*
* @param emitter SSE发射器
* @param emitter SSE发射器
* @param isCompleted 是否已完成
*/
public void completeEmitter(SseEmitter emitter, AtomicBoolean isCompleted) {
......@@ -469,7 +395,7 @@ public class UserSseService {
log.debug("Emitter已经完成,跳过关闭操作");
return;
}
emitter.complete();
log.debug("Emitter已成功关闭");
} catch (IllegalStateException e) {
......@@ -479,7 +405,7 @@ public class UserSseService {
}
}
}
/**
* 检查SSE Emitter是否仍然有效
* 职责:提供轻量级的连接有效性检查
......@@ -491,12 +417,12 @@ public class UserSseService {
if (emitter == null) {
return false;
}
// 首先检查是否已经完成,避免不必要的事件发送
if (isEmitterCompleted(emitter)) {
return false;
}
// 检查逻辑,仅通过尝试发送ping事件来验证连接状态
try {
// 尝试发送一个空事件来检查连接状态
......@@ -508,7 +434,7 @@ public class UserSseService {
return false;
}
}
/**
* 安全检查SSE Emitter是否仍然有效(不发送实际事件)
* 职责:提供非侵入性的连接有效性检查
......@@ -520,11 +446,11 @@ public class UserSseService {
if (emitter == null) {
return false;
}
// 检查是否已经完成,而不发送任何事件
return !isEmitterCompleted(emitter);
}
/**
* 检查SSE Emitter是否已经完成
* 使用更安全的方式检查完成状态,不发送实际事件
......@@ -536,57 +462,68 @@ public class UserSseService {
if (emitter == null) {
return true; // 认为null emitter是已完成的
}
// 使用反射检查SseEmitter的完成状态
// 通过尝试发送空事件来检查emitter状态
// 这是一种更可靠的方式,不会依赖于内部实现细节
try {
java.lang.reflect.Field completedField = SseEmitter.class.getDeclaredField("completed");
completedField.setAccessible(true);
boolean completed = completedField.getBoolean(emitter);
return completed;
} catch (Exception e) {
// 如果反射失败,尝试通过发送事件检测
try {
emitter.send(SseEmitter.event());
return false; // 没有异常说明未完成
} catch (IllegalStateException ex) {
// 检查错误消息是否包含完成相关的文本
String message = ex.getMessage();
if (message != null && (message.contains("completed") || message.contains("closed"))) {
return true;
}
return true; // IllegalStateException通常表示连接已关闭
} catch (Exception ex) {
// 其他异常通常也表示连接已不可用
return true;
}
emitter.send(SseEmitter.event());
return false; // 没有异常说明未完成
} catch (IllegalStateException ex) {
// IllegalStateException通常表示连接已关闭或完成
return true;
} catch (Exception ex) {
// 其他异常通常也表示连接已不可用
return true;
}
}
/**
* 发送SSE事件
* 职责:统一发送SSE事件的基础方法
* 安全发送SSE事件,处理所有异常情况
*
* @param emitter SSE发射器
* @param emitter SSE发射器
* @param eventName 事件名称
* @param data 事件数据
* @throws IOException IO异常
* @param data 事件数据
* @return 是否发送成功
*/
public void sendEvent(SseEmitter emitter, String eventName, Object data) throws IOException {
private boolean safeSendEvent(SseEmitter emitter, String eventName, Object data) {
// 参数验证
if (emitter == null || eventName == null || eventName.isEmpty() || data == null) {
log.warn("参数验证失败,无法发送事件");
return;
return false;
}
// 检查emitter是否已经完成
if (isEmitterCompleted(emitter)) {
log.debug("SSE emitter已完成,跳过发送{}事件", eventName);
return false;
}
try {
emitter.send(SseEmitter.event().name(eventName).data(data));
return true;
} catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况
log.debug("无法发送事件,emitter已关闭: {}", e.getMessage());
// 不重新抛出异常,避免影响主流程
log.debug("无法发送{}事件,emitter已关闭: {}", eventName, e.getMessage());
return false;
} catch (Exception e) {
log.error("发送{}事件失败: {}", eventName, e.getMessage(), e);
return false;
}
}
/**
* 发送SSE事件
* 职责:统一发送SSE事件的基础方法
*
* @param emitter SSE发射器
* @param eventName 事件名称
* @param data 事件数据
* @throws IOException IO异常
*/
public void sendEvent(SseEmitter emitter, String eventName, Object data) throws IOException {
safeSendEvent(emitter, eventName, data);
}
/**
* 发送心跳事件
*
......@@ -598,34 +535,23 @@ public class UserSseService {
log.warn("SSE发射器为空,无法发送心跳事件");
return false;
}
// 检查emitter是否已经完成,避免向已完成的连接发送心跳
if (isEmitterCompleted(emitter)) {
log.debug("SSE发射器已完成,跳过发送心跳事件");
return false;
}
try {
// 发送心跳事件
long heartbeatTimestamp = System.currentTimeMillis();
emitter.send(SseEmitter.event().name("heartbeat").data(heartbeatTimestamp));
// 发送心跳事件
long heartbeatTimestamp = System.currentTimeMillis();
boolean success = safeSendEvent(emitter, "heartbeat", heartbeatTimestamp);
if (success) {
log.debug("[心跳] 成功发送心跳事件,时间戳: {}", heartbeatTimestamp);
return true;
} catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况
log.debug("无法发送心跳事件,emitter已关闭或完成: {}", e.getMessage());
return false;
} catch (Exception e) {
log.warn("发送心跳事件失败: {}", e.getMessage());
return false;
}
return success;
}
/**
* 发送工作面板事件给指定的SSE连接
*
* @param emitter SSE发射器
* @param event 工作面板事件
* @param event 工作面板事件
* @throws IOException IO异常
*/
public void sendWorkPanelEvent(SseEmitter emitter, WorkPanelEvent event) throws IOException {
......@@ -637,14 +563,14 @@ public class UserSseService {
try {
// 构建事件数据
Map<String, Object> data = eventService.buildWorkPanelEventData(event);
if (data != null) {
log.debug("准备发送工作面板事件: 类型={}, 事件内容={}", event.getType(), event);
log.debug("事件数据: {}", data);
// 发送事件
emitter.send(SseEmitter.event().name("message").data(data));
log.debug("工作面板事件发送成功: 类型={}", event.getType());
} else {
log.warn("构建事件数据失败,无法发送事件: 类型={}", event.getType());
......@@ -656,20 +582,20 @@ public class UserSseService {
} catch (Exception e) {
// 记录详细错误信息,但不中断主流程
log.error("发送工作面板事件失败: 类型={}, 错误={}", event.getType(), e.getMessage(), e);
// 其他异常不重新抛出,避免影响主流程
}
}
/**
* 发送工作面板事件给指定用户
*
* @param userId 用户ID
* @param event 工作面板事件
* @param event 工作面板事件
*/
public void sendWorkPanelEventToUser(String userId, WorkPanelEvent event) {
log.debug("开始向用户 {} 发送工作面板事件: {}", userId, event.getType());
// 检查连接是否仍然有效
SseEmitter emitter = getSession(userId);
if (emitter != null) {
......@@ -684,7 +610,7 @@ public class UserSseService {
log.debug("连接已失效,跳过发送事件: {}", event.getType());
}
}
/**
* 发送连接成功事件
*
......@@ -696,29 +622,26 @@ public class UserSseService {
log.warn("SSE发射器为空,无法发送连接成功事件");
return;
}
try {
WorkPanelEvent connectedEvent = WorkPanelEvent.builder()
WorkPanelEvent connectedEvent = WorkPanelEvent.builder()
.type("observation")
.title("连接成功")
.timestamp(System.currentTimeMillis())
.build();
Map<String, Object> data = eventService.buildWorkPanelEventData(connectedEvent);
emitter.send(SseEmitter.event().name("message").data(data));
Map<String, Object> data = eventService.buildWorkPanelEventData(connectedEvent);
boolean success = safeSendEvent(emitter, "message", data);
if (success) {
log.debug("已发送连接成功事件");
} catch (IOException e) {
log.error("发送连接成功事件失败", e);
throw e;
}
}
/**
* 发送Token事件
*
* @param emitter SSE发射器
* @param token Token内容
* @param token Token内容
* @throws IOException IO异常
*/
public void sendTokenEvent(SseEmitter emitter, String token) throws IOException {
......@@ -726,34 +649,26 @@ public class UserSseService {
log.warn("SSE发射器或Token为空,无法发送Token事件");
return;
}
try {
// 检查emitter是否已经完成
if (!isEmitterCompleted(emitter)) {
// 构建token事件数据
Map<String, Object> data = tokenEventDataBuilder.createOptimizedTokenEventData(token);
if (data != null) {
// 发送事件
emitter.send(SseEmitter.event().name("token").data(data));
} else {
log.warn("构建token事件数据失败,无法发送事件");
}
// 构建token事件数据
Map<String, Object> data = tokenEventDataBuilder.createOptimizedTokenEventData(token);
if (data != null) {
// 发送事件
safeSendEvent(emitter, "token", data);
} else {
log.debug("SSE emitter已完成,跳过发送token事件");
log.warn("构建token事件数据失败,无法发送事件");
}
} catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况
log.debug("无法发送token事件,emitter已关闭或完成: {}", e.getMessage());
} catch (Exception e) {
log.error("发送token事件失败: token长度={}, 错误={}", token.length(), e.getMessage(), e);
}
}
/**
* 发送错误事件
*
* @param emitter SSE发射器
* @param emitter SSE发射器
* @param errorMessage 错误信息
* @throws IOException IO异常
*/
......@@ -762,30 +677,22 @@ public class UserSseService {
log.warn("SSE发射器或错误信息为空,无法发送错误事件");
return;
}
try {
// 检查emitter是否已经完成
if (!isEmitterCompleted(emitter)) {
// 构建错误事件数据
Map<String, Object> data = errorEventDataBuilder.createErrorEventData(errorMessage);
if (data != null) {
// 发送事件
emitter.send(SseEmitter.event().name("error").data(data));
} else {
log.warn("构建错误事件数据失败,无法发送事件");
}
// 构建错误事件数据
Map<String, Object> data = errorEventDataBuilder.createErrorEventData(errorMessage);
if (data != null) {
// 发送事件
safeSendEvent(emitter, "error", data);
} else {
log.debug("SSE emitter已完成,跳过发送错误事件");
log.warn("构建错误事件数据失败,无法发送事件");
}
} catch (IllegalStateException e) {
// 处理 emitter 已关闭的情况
log.debug("无法发送错误事件,emitter已关闭或完成: {}", e.getMessage());
} catch (Exception e) {
log.error("发送错误事件失败: 错误信息={}, 错误={}", errorMessage, e.getMessage(), e);
}
}
/**
* 获取所有活动的emitters
*
......@@ -794,7 +701,7 @@ public class UserSseService {
public List<SseEmitter> getEmitters() {
return new ArrayList<>(emitters);
}
/**
* 销毁资源
*/
......
......@@ -4,11 +4,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.security.access.PermissionEvaluator;
import org.springframework.security.core.Authentication;
import org.springframework.stereotype.Component;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.web.service.TimerService;
import pangea.hiagent.model.Agent;
import pangea.hiagent.model.TimerConfig;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.web.service.TimerService;
import java.io.Serializable;
......@@ -20,6 +19,9 @@ import java.io.Serializable;
@Component("permissionEvaluator")
public class DefaultPermissionEvaluator implements PermissionEvaluator {
private static final String AGENT_TYPE = "Agent";
private static final String TIMER_CONFIG_TYPE = "TimerConfig";
private final AgentService agentService;
private final TimerService timerService;
......@@ -37,33 +39,21 @@ public class DefaultPermissionEvaluator implements PermissionEvaluator {
return false;
}
Object principal = authentication.getPrincipal();
if (principal == null) {
return false;
}
String userId = principal.toString();
String userId = authentication.getPrincipal().toString();
String perm = (String) permission;
try {
// 处理Agent访问权限
if (targetDomainObject instanceof Agent) {
Agent agent = (Agent) targetDomainObject;
return checkAgentAccess(userId, agent, perm);
return checkAgentAccess(userId, (Agent) targetDomainObject, perm);
}
// 处理TimerConfig访问权限
else if (targetDomainObject instanceof TimerConfig) {
TimerConfig timer = (TimerConfig) targetDomainObject;
return checkTimerAccess(userId, timer, perm);
}
// 处理基于ID的资源访问
else if (targetDomainObject instanceof String) {
// 这种情况在hasPermission(Authentication, Serializable, String, Object)方法中处理
return false;
return checkTimerAccess(userId, (TimerConfig) targetDomainObject, perm);
}
} catch (Exception e) {
log.error("权限检查过程中发生异常: userId={}, targetDomainObject={}, permission={}", userId, targetDomainObject, permission, e);
return false;
log.error("权限检查异常: userId={}, target={}, permission={}, error={}",
userId, targetDomainObject.getClass().getSimpleName(), perm, e.getMessage());
}
return false;
......@@ -75,36 +65,23 @@ public class DefaultPermissionEvaluator implements PermissionEvaluator {
return false;
}
Object principal = authentication.getPrincipal();
if (principal == null) {
return false;
}
String userId = principal.toString();
String userId = authentication.getPrincipal().toString();
String perm = (String) permission;
try {
// 处理基于ID的权限检查
if ("Agent".equals(targetType)) {
if (AGENT_TYPE.equals(targetType)) {
Agent agent = agentService.getAgent(targetId.toString());
if (agent == null) {
log.warn("未找到ID为 {} 的Agent", targetId);
return false;
}
return checkAgentAccess(userId, agent, perm);
return agent != null && checkAgentAccess(userId, agent, perm);
}
// 处理TimerConfig资源的权限检查
else if ("TimerConfig".equals(targetType)) {
else if (TIMER_CONFIG_TYPE.equals(targetType)) {
TimerConfig timer = timerService.getTimerById(targetId.toString());
if (timer == null) {
log.warn("未找到ID为 {} 的TimerConfig", targetId);
return false;
}
return checkTimerAccess(userId, timer, perm);
return timer != null && checkTimerAccess(userId, timer, perm);
}
} catch (Exception e) {
log.error("基于ID的权限检查过程中发生异常: userId={}, targetId={}, targetType={}, permission={}", userId, targetId, targetType, permission, e);
return false;
log.error("基于ID的权限检查异常: userId={}, targetId={}, targetType={}, permission={}, error={}",
userId, targetId, targetType, perm, e.getMessage());
}
return false;
......@@ -119,24 +96,17 @@ public class DefaultPermissionEvaluator implements PermissionEvaluator {
return true;
}
// 检查Agent所有者
// 所有者可以访问
if (agent.getOwner().equals(userId)) {
return true;
}
// 根据权限类型进行检查
switch (permission.toLowerCase()) {
case "read":
// 所有用户都可以读取公开的Agent(如果有此概念)
return false; // 暂时不支持公开Agent
case "write":
case "delete":
case "execute":
// 只有所有者可以写入、删除或执行Agent
return agent.getOwner().equals(userId);
default:
return false;
}
// 根据权限类型进行检查(目前只支持所有者访问)
String permissionLower = permission.toLowerCase();
return switch (permissionLower) {
case "read", "write", "delete", "execute" -> agent.getOwner().equals(userId);
default -> false;
};
}
/**
......@@ -148,32 +118,24 @@ public class DefaultPermissionEvaluator implements PermissionEvaluator {
return true;
}
// 检查定时器创建者
// 创建者可以访问
if (timer.getCreatedBy() != null && timer.getCreatedBy().equals(userId)) {
return true;
}
// 根据权限类型进行检查
switch (permission.toLowerCase()) {
case "read":
// 所有用户都可以读取公开的定时器(如果有此概念)
return false; // 暂时不支持公开定时器
case "write":
case "delete":
// 只有创建者可以修改或删除定时器
return timer.getCreatedBy() != null && timer.getCreatedBy().equals(userId);
default:
return false;
}
// 根据权限类型进行检查(目前只支持创建者访问)
String permissionLower = permission.toLowerCase();
return switch (permissionLower) {
case "read", "write", "delete" -> timer.getCreatedBy() != null && timer.getCreatedBy().equals(userId);
default -> false;
};
}
/**
* 检查是否为管理员用户
*/
private boolean isAdminUser(String userId) {
// 这里可以根据实际需求实现管理员检查逻辑
// 例如查询数据库或检查特殊用户ID
// 当前实现保留原有逻辑,但可以通过配置或数据库来管理管理员用户
// 管理员用户检查,可扩展为从配置或数据库读取
return "admin".equals(userId) || "user-001".equals(userId);
}
}
\ No newline at end of file
......@@ -5,18 +5,16 @@ import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import pangea.hiagent.common.utils.JwtUtil;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.filter.OncePerRequestFilter;
import pangea.hiagent.common.utils.JwtUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* JWT认证过滤器
......@@ -26,6 +24,8 @@ import java.util.List;
@Component
public class JwtAuthenticationFilter extends OncePerRequestFilter {
private static final String BEARER_PREFIX = "Bearer ";
private final JwtUtil jwtUtil;
public JwtAuthenticationFilter(JwtUtil jwtUtil) {
......@@ -35,19 +35,8 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
boolean isStreamEndpoint = request.getRequestURI().contains("/api/v1/agent/chat-stream");
boolean isTimelineEndpoint = request.getRequestURI().contains("/api/v1/agent/timeline-events");
if (isStreamEndpoint) {
log.info("处理Agent流式对话请求: {} {}", request.getMethod(), request.getRequestURI());
}
if (isTimelineEndpoint) {
log.info("处理时间轴事件订阅请求: {} {}", request.getMethod(), request.getRequestURI());
}
// 对于OPTIONS请求,直接放行
if ("OPTIONS".equalsIgnoreCase(request.getMethod())) {
log.debug("OPTIONS请求,直接放行");
filterChain.doFilter(request, response);
return;
}
......@@ -55,66 +44,25 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
try {
String token = extractTokenFromRequest(request);
log.debug("JWT过滤器处理请求: {} {},提取到token: {}", request.getMethod(), request.getRequestURI(), token);
if (StringUtils.hasText(token)) {
log.debug("开始JWT验证,token长度: {}", token.length());
// 验证token是否有效
boolean isValid = jwtUtil.validateToken(token);
log.debug("JWT验证结果: {}", isValid);
if (isValid) {
if (jwtUtil.validateToken(token)) {
String userId = jwtUtil.getUserIdFromToken(token);
log.debug("JWT验证通过,用户ID: {}", userId);
if (userId != null) {
// 创建认证对象,添加基本权限
List<SimpleGrantedAuthority> authorities = Collections.singletonList(new SimpleGrantedAuthority("ROLE_USER"));
UsernamePasswordAuthenticationToken authentication =
new UsernamePasswordAuthenticationToken(userId, null, authorities);
var authorities = Collections.singletonList(new SimpleGrantedAuthority("ROLE_USER"));
var authentication = new UsernamePasswordAuthenticationToken(userId, null, authorities);
SecurityContextHolder.getContext().setAuthentication(authentication);
log.debug("已设置SecurityContext中的认证信息,用户ID: {}, 权限: {}", userId, authentication.getAuthorities());
} else {
log.warn("从token中提取的用户ID为空");
}
} else {
log.warn("JWT验证失败,token可能已过期或无效");
// 检查token是否过期
boolean isExpired = jwtUtil.isTokenExpired(token);
log.warn("Token过期状态: {}", isExpired);
}
} else {
log.debug("未找到有效的token");
// 记录请求信息以便调试
log.debug("请求URL: {}", request.getRequestURL());
log.debug("请求方法: {}", request.getMethod());
log.debug("Authorization头: {}", request.getHeader("Authorization"));
log.debug("token参数: {}", request.getParameter("token"));
}
} catch (Exception e) {
log.error("JWT认证处理异常", e);
log.error("JWT认证处理异常: {}", e.getMessage());
// 不在此处发送错误响应,让Spring Security的ExceptionTranslationFilter处理
// 这样可以避免响应被提前提交
}
// 特别处理流式端点的权限问题
if (isStreamEndpoint || isTimelineEndpoint) {
// 检查是否已认证
if (SecurityContextHolder.getContext().getAuthentication() == null) {
log.warn("流式端点未认证访问: {} {}", request.getMethod(), request.getRequestURI());
// 对于SSE端点,如果未认证,我们不立即返回错误,而是让后续处理决定
// 因为客户端可能会在重新连接时带上token
}
// 对于SSE端点,直接执行过滤器链,不进行额外的响应检查
filterChain.doFilter(request, response);
log.debug("JwtAuthenticationFilter处理完成(SSE端点): {} {}", request.getMethod(), request.getRequestURI());
return;
}
// 继续执行过滤器链,让Spring Security的其他过滤器处理认证和授权
// 这样可以让ExceptionTranslationFilter和AuthorizationFilter正确处理认证失败和权限拒绝
// 继续执行过滤器链
filterChain.doFilter(request, response);
log.debug("JwtAuthenticationFilter处理完成: {} {}", request.getMethod(), request.getRequestURI());
}
/**
......@@ -124,23 +72,11 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
private String extractTokenFromRequest(HttpServletRequest request) {
// 首先尝试从请求头中提取Token
String authHeader = request.getHeader("Authorization");
log.debug("从请求头中提取Authorization: {}", authHeader);
if (StringUtils.hasText(authHeader) && authHeader.startsWith("Bearer ")) {
String token = authHeader.substring(7);
log.debug("从Authorization头中提取到token");
return token;
if (StringUtils.hasText(authHeader) && authHeader.startsWith(BEARER_PREFIX)) {
return authHeader.substring(BEARER_PREFIX.length());
}
// 如果请求头中没有Token,则尝试从URL参数中提取
// 这对于SSE连接特别有用,因为浏览器在自动重连时可能不会发送Authorization头
String tokenParam = request.getParameter("token");
log.debug("从URL参数中提取token参数: {}", tokenParam);
if (StringUtils.hasText(tokenParam)) {
log.debug("从URL参数中提取到token");
return tokenParam;
}
log.debug("未找到有效的token");
return null;
return request.getParameter("token");
}
}
\ No newline at end of file
......@@ -5,20 +5,13 @@ import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.filter.OncePerRequestFilter;
import pangea.hiagent.common.utils.JwtUtil;
import pangea.hiagent.web.service.AgentService;
import pangea.hiagent.model.Agent;
import pangea.hiagent.common.utils.UserUtils;
import pangea.hiagent.web.service.AgentService;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* SSE流式端点授权检查过滤器
......@@ -32,74 +25,30 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
private static final String STREAM_ENDPOINT = "/api/v1/agent/chat-stream";
private static final String TIMELINE_ENDPOINT = "/api/v1/agent/timeline-events";
private final JwtUtil jwtUtil;
private final AgentService agentService;
public SseAuthorizationFilter(JwtUtil jwtUtil, AgentService agentService) {
this.jwtUtil = jwtUtil;
public SseAuthorizationFilter(AgentService agentService) {
this.agentService = agentService;
}
/**
* 发送SSE格式的未授权错误响应
* 发送SSE格式的错误响应
*/
private void sendSseUnauthorizedError(HttpServletResponse response) {
private void sendSseError(HttpServletResponse response, int status, String errorMessage) {
try {
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
response.setStatus(status);
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
// 发送SSE格式的错误事件
response.getWriter().write("event: error\n");
response.getWriter().write("data: {\"error\": \"未授权访问,请先登录\", \"code\": 401, \"timestamp\": " +
response.getWriter().write("data: {\"error\": \"" + errorMessage + "\", \"code\": " + status + ", \"timestamp\": " +
System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
log.debug("已发送SSE未授权错误响应");
log.debug("已发送SSE错误响应: {} - {}", status, errorMessage);
} catch (IOException e) {
log.error("发送SSE未授权错误响应失败", e);
}
}
/**
* 发送SSE格式的Agent不存在错误响应
*/
private void sendSseAgentNotFoundError(HttpServletResponse response) {
try {
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
// 发送SSE格式的错误事件
response.getWriter().write("event: error\n");
response.getWriter().write("data: {\"error\": \"Agent不存在\", \"code\": 404, \"timestamp\": " +
System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
log.debug("已发送SSE Agent不存在错误响应");
} catch (IOException e) {
log.error("发送SSE Agent不存在错误响应失败", e);
}
}
/**
* 发送SSE格式的访问拒绝错误响应
*/
private void sendSseAccessDeniedError(HttpServletResponse response) {
try {
response.setStatus(HttpServletResponse.SC_FORBIDDEN);
response.setContentType("text/event-stream;charset=UTF-8");
response.setCharacterEncoding("UTF-8");
// 发送SSE格式的错误事件
response.getWriter().write("event: error\n");
response.getWriter().write("data: {\"error\": \"访问被拒绝,无权限访问该Agent\", \"code\": 403, \"timestamp\": " +
System.currentTimeMillis() + "}\n\n");
response.getWriter().flush();
log.debug("已发送SSE 访问拒绝错误响应");
} catch (IOException e) {
log.error("发送SSE 访问拒绝错误响应失败", e);
log.error("发送SSE错误响应失败: {}", e.getMessage());
}
}
......@@ -122,44 +71,39 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
}
// 从SecurityContext获取当前认证用户
String userId = null;
var authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication != null && authentication.isAuthenticated() && !"anonymousUser".equals(authentication.getPrincipal())) {
userId = authentication.getName();
}
String userId = getCurrentUserId();
if (userId != null) {
log.debug("SSE端点已认证,用户: {}", userId);
// 如果是chat-stream端点,需要额外验证agent权限
if (isStreamEndpoint) {
// 从请求参数中获取agentId
String agentId = request.getParameter("agentId");
if (agentId != null) {
try {
Agent agent = agentService.getAgent(agentId);
if (agent == null) {
log.warn("SSE端点访问失败:Agent不存在 - AgentId: {}", agentId);
sendSseAgentNotFoundError(response);
sendSseError(response, HttpServletResponse.SC_NOT_FOUND, "Agent不存在");
return;
}
// 验证用户是否有权限访问该agent
if (!agent.getOwner().equals(userId) && !UserUtils.isAdminUser(userId)) {
if (!agent.getOwner().equals(userId) && !isAdminUser(userId)) {
log.warn("SSE端点访问失败:用户 {} 无权限访问Agent: {}", userId, agentId);
sendSseAccessDeniedError(response);
sendSseError(response, HttpServletResponse.SC_FORBIDDEN, "访问被拒绝,无权限访问该Agent");
return;
}
log.debug("SSE端点Agent权限验证成功,用户: {}, Agent: {}", userId, agentId);
} catch (Exception e) {
log.error("SSE端点Agent权限验证异常: {}", e.getMessage());
sendSseAccessDeniedError(response);
sendSseError(response, HttpServletResponse.SC_FORBIDDEN, "访问被拒绝");
return;
}
} else {
log.warn("SSE端点请求缺少agentId参数");
sendSseAgentNotFoundError(response);
sendSseError(response, HttpServletResponse.SC_NOT_FOUND, "Agent不存在");
return;
}
}
......@@ -170,7 +114,7 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
} else {
// 用户未认证,拒绝连接
log.warn("SSE端点未认证访问,拒绝连接: {} {}", request.getMethod(), requestUri);
sendSseUnauthorizedError(response);
sendSseError(response, HttpServletResponse.SC_UNAUTHORIZED, "未授权访问,请先登录");
return;
}
}
......@@ -180,35 +124,31 @@ public class SseAuthorizationFilter extends OncePerRequestFilter {
}
/**
* 从请求头或参数中提取Token
* 从SecurityContext获取当前认证用户ID
*/
private String extractTokenFromRequest(HttpServletRequest request) {
// 首先尝试从请求头中提取Token
String authHeader = request.getHeader("Authorization");
if (StringUtils.hasText(authHeader) && authHeader.startsWith("Bearer ")) {
return authHeader.substring(7);
}
// 如果请求头中没有Token,则尝试从URL参数中提取
String tokenParam = request.getParameter("token");
if (StringUtils.hasText(tokenParam)) {
return tokenParam;
private String getCurrentUserId() {
var authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication != null && authentication.isAuthenticated() && !"anonymousUser".equals(authentication.getPrincipal())) {
return authentication.getName();
}
return null;
}
/**
* 检查是否为管理员用户
*/
private boolean isAdminUser(String userId) {
// 与DefaultPermissionEvaluator保持一致的管理员检查逻辑
return "admin".equals(userId) || "user-001".equals(userId);
}
/**
* 确定此过滤器是否应处理给定请求
* 只处理SSE流式端点
*/
@Override
protected boolean shouldNotFilter(HttpServletRequest request) throws ServletException {
protected boolean shouldNotFilter(HttpServletRequest request) {
String requestUri = request.getRequestURI();
boolean isStreamEndpoint = requestUri.contains(STREAM_ENDPOINT);
boolean isTimelineEndpoint = requestUri.contains(TIMELINE_ENDPOINT);
// 如果不是SSE端点,跳过此过滤器
return !(isStreamEndpoint || isTimelineEndpoint);
return !(requestUri.contains(STREAM_ENDPOINT) || requestUri.contains(TIMELINE_ENDPOINT));
}
}
......@@ -228,6 +228,130 @@ hiagent:
top-k: 5
score-threshold: 0.8
# ReAct配置
react:
system-prompt: >
You are a powerful professional AI assistant powered by the enhanced ReAct (Reasoning + Acting) iterative framework, specialized for Spring AI tool orchestration. Your core mission is to solve complex, multi-step user queries with high accuracy by following the upgraded rules. The TOP PRIORITY principle is: ALWAYS CALL TOOLS FIRST, and answer questions EXCLUSIVELY based on tool execution results. You have full authority to intelligently select, combine, and serially invoke multiple tools, and iterate reasoning until a complete and satisfactory answer is obtained.
=== CORE UPGRADED RULE - NON-NEGOTIABLE (Tool-First Priority Highlighted) ===
1. Tool-First Mandate: For any query that requires factual verification, data calculation, information extraction, content analysis, or scenario-based processing, YOU MUST CALL RELEVANT TOOLS FIRST. Never answer directly relying on internal knowledge without tool invocation, except for extremely simple common-sense questions (e.g., "What is 1+1?").
2. Answer Based on Tool Results Only: All conclusions, data, and insights in the final answer must be strictly derived from the real execution results of Spring AI tools. Never fabricate any data, assumptions, or inferences that are not supported by tool outputs.
3. Serial Multi-Tool Invocation Supported: You can invoke multiple tools in serial order in one Action phase. By default, the output of the previous tool is the directly valid input of the next tool (first-class support for tool chaining).
4. Iterative ReAct Closed-Loop: The ReAct thinking process is a cyclic loop. After each Observation phase, you can return to the Thought phase to re-analyze, reselect tools, and re-execute until the answer is complete/satisfactory.
5. Mandatory Tool Synergy: Complex queries must use multi-tool combinations. A single tool can only solve simple problems; never rely on a single tool for complex tasks.
6. Strict Compliance with Spring AI Mechanism: All tool calls are executed automatically by the Spring AI framework. You only need to make optimal tool selection and sequence planning.
=== ENHANCED TOOL SYNERGY & ORCHESTRATION STRATEGY ===
You have access to a full set of specialized Spring AI tools and must create value through intelligent tool collocation, with tool-first logic throughout:
- Serial Chaining (Highest Priority): The output of one tool directly feeds into the input of another, forming a closed tool call chain (e.g., File Reader → Text Processor → Calculator → File Writer → Chart Generator).
- Parallel Combination: Call multiple independent tools simultaneously to collect multi-dimensional data, then merge results for comprehensive analysis.
- Preprocessing & Postprocessing: Use formatting tools to clean raw data before core tool execution; use conversion tools to optimize result presentation afterward.
- Layered Enrichment: Combine extraction, analysis, and calculation tools to gain in-depth insights instead of superficial data.
- Priority Matching: Select lightweight tools first for simple sub-tasks; use heavyweight tools only for complex ones (resource efficiency).
- Fault Tolerance Fallback: If a selected tool is unavailable/returns invalid results, immediately invoke an alternative tool with the same function to re-execute the sub-task.
=== Typical High-Value Tool Synergy Examples ===
1. Web Content Extractor → Text Parser & Cleaner → NLP Analyzer → Statistical Calculator → Result Formatter → File Saver
2. Current DateTime Tool → Date Formatter → Data Filter → Time Series Analyzer → Visualization Tool
3. Document Reader → Table Extractor → Data Validator → Formula Calculator → Report Generator
4. Input Parameter Parser → Multiple Business Tools (Serial) → Result Aggregator → Answer Polisher
=== UPGRADED ITERATIVE ReAct THINKING PROCESS (Tool-First Oriented) ===
This is a cyclic, repeatable process for EVERY query, with tool-first logic as the core. Execute in order and loop infinitely until the answer meets completeness requirements.
▶ Cycle Trigger Rule: After Step 4 (Observation), if results are incomplete/insufficient/need optimization → Return to Step 1 (Thought) to re-analyze and re-execute.
▶ Cycle Termination Rule: After Step 4 (Observation), if results are complete/accurate/satisfactory → Enter Step 5 (Final Answer) directly.
Step 1 - THOUGHT (Tool-First Iterative Reasoning & Planning): Deeply analyze the user's core query and current context with tool-first logic
- Break down the main problem into hierarchical sub-tasks (primary → secondary → fine-grained).
- Tool-First Matching: For each sub-task, FIRST identify relevant tools (never consider direct answering first). Mark alternative tools for fault tolerance.
- Confirm Tool Synergy Feasibility: Judge serial/parallel combination of multi-tools and define the exact invocation sequence.
- Iterative Scenario Adjustment: Re-analyze the gap between current tool results and expected answers, adjust tool selection/sequence.
- Verify Preconditions: Ensure input format and parameter validity for tool invocation are met.
Step 2 - ACTION (Multi-Tool Serial/Parallel Execution): Execute the planned tool chain with clear purpose, adhering to tool-first principle
- Call tools in the pre-defined serial/parallel order based on Thought phase analysis.
- Support multiple consecutive tool calls in one Action phase (serial chain) for Spring AI, no limit on the number of tools.
- Wait for ALL tool execution results (serial: one by one / parallel: all at once) before proceeding; never jump early.
- Fault Tolerance Execution: If a tool returns invalid/empty results, immediately invoke the pre-marked alternative tool and re-execute the sub-task.
Step 3 - OBSERVATION (Tool Result-Centric Analysis & Validation): Comprehensively interpret all tool execution results
- Examine data/results from each tool in detail, cross-verify accuracy, completeness, and logical consistency.
- Extract key information, patterns, and insights EXCLUSIVELY from combined tool results.
- Judge Completion Status: Confirm if current results cover all sub-tasks and meet the user's core needs.
- Identify Gaps: Mark missing information/unsolved sub-tasks that require further tool invocation.
- Evaluate Tool Synergy Effect: Confirm if the tool chain provides deeper insights than single-tool usage.
Step 4 - ITERATION DECISION: Critical judgment for ReAct cycle
- ✅ TERMINATE CYCLE: If observation results are complete, accurate, sufficient, and fully meet the user's query → Proceed to Step 5.
- ♻️ RESTART CYCLE: If observation results are incomplete/insufficient/have missing information → Return to Step 1.
Step 5 - FINAL ANSWER (Tool Result-Synthesized Response): Generate the ultimate answer based solely on tool results
- Synthesize all valid tool results (from iterative cycles) into a coherent, logical, and complete answer.
- Present information in clear, easy-to-understand natural language, distinguishing key insights from basic information.
- Explicitly explain tool synergy logic (e.g., "Tool A processed raw data for Tool B, enabling accurate calculation by Tool C").
- Provide actionable conclusions, recommendations, or follow-up suggestions based on integrated tool results.
- Keep the answer conversational and business-oriented; remove redundant technical tool details.
=== STANDARDIZED RESPONSE FORMAT ===
Strictly follow this fixed structure for all responses to ensure correct parsing by Spring AI:
1. Thought: Detailed explanation of problem analysis, sub-task breakdown, tool-first selection strategy, and invocation sequence
- Identified Sub-Problems: List all primary/secondary sub-tasks clearly.
- Tool-First Matching: Tools assigned to each sub-task + alternative tools (if any).
- Execution Sequence: Exact serial/parallel order of multi-tool invocation and its optimality.
- Iteration Note: If re-analyzing (loop), explain gaps in previous results and tool selection adjustments.
2. Action: Clear description of all tool calls in this phase (serial number + tool name + core purpose)
- Tool_Call: 1.[Tool Name] → Purpose: [Exact business objective and core value]
- Tool_Call: 2.[Tool Name] → Purpose: [Complement the previous tool, use its output as input]
- Tool_Call: N.[Tool Name] → Purpose: [Final enrichment/validation/formatting of the result chain]
- (Fallback) If Tool X Unavailable: Use [Alternative Tool Name] → Purpose: [Same objective as Tool X]
3. Observation: Comprehensive interpretation of all tool execution results
- Results from each individual tool (key data, no redundant details).
- Logical relationship between multiple tool results (how they connect and complement).
- Core patterns/insights from the tool chain.
- Completion Status: Whether results cover all sub-tasks and missing information (if any).
4. Iteration_Decision: Explicit single choice
- Option 1: Terminate Cycle → Proceed to Final Answer (complete results)
- Option 2: Restart Cycle → Re-enter Thought phase (incomplete results)
5. Final_Answer: Polished, complete, and user-friendly natural language solution
- Direct answer to the original query, with core conclusions first.
- Highlight key insights from tool synergy/iterative reasoning.
- Provide actionable follow-up suggestions.
- Conversational tone; no technical jargon about tools/frameworks.
=== CRITICAL HARD RULES (Tool-First as Core) ===
1. Tool-First is Non-Negotiable: For non-trivial queries, call tools first. Never answer directly with internal knowledge unless it's extremely simple common sense.
2. Tool Results are the Sole Basis: All answers must rely on real Spring AI tool execution results. Never fabricate data/results.
3. Mandatory Multi-Tool Synergy: Complex queries must use tool combinations. Never rely on a single tool for complex tasks.
4. Full Support for Serial Invocation: One Action phase can call N tools in sequence, with prior output as next input.
5. Iterative ReAct is Mandatory: Never stop at one-time execution; loop until the answer is complete and satisfactory.
6. Explicit Tool Strategy: All tool selection, sequence planning, and fallback options must be clearly stated in Thought.
7. Unavailable Tool Handling: Immediately use an alternative tool if the selected one is unavailable; do not suspend execution.
8. User Experience Priority: The Final Answer must be conversational and business-focused, hiding technical tool details.
9. Spring AI Compliance: All tool calls follow the framework's automatic execution rules; no custom execution logic.
# Milvus Lite配置
milvus:
data-dir: ./milvus_data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment