From aee18837e446c7a5a773ee8306d4419d92f78dd0 Mon Sep 17 00:00:00 2001 From: JOJO <1498581755@qq.com> Date: Thu, 20 Nov 2025 17:25:30 +0800 Subject: [PATCH] feat: enhance smart thinking scheduling --- config/limits.py | 2 + prompts/thinking_mode_guidelines.txt | 10 +- static/app.js | 17 +++- static/index.html | 8 +- utils/api_client.py | 35 +++++-- web_server.py | 141 ++++++++++++++++++++++++++- 6 files changed, 188 insertions(+), 25 deletions(-) diff --git a/config/limits.py b/config/limits.py index ff9dd44..c3687c6 100644 --- a/config/limits.py +++ b/config/limits.py @@ -18,6 +18,7 @@ MAX_ITERATIONS_PER_TASK = 100 MAX_CONSECUTIVE_SAME_TOOL = 50 MAX_TOTAL_TOOL_CALLS = 100 TOOL_CALL_COOLDOWN = 0.5 +THINKING_FAST_INTERVAL = 10 # 工具字符/体积限制 MAX_READ_FILE_CHARS = 30000 @@ -49,6 +50,7 @@ __all__ = [ "MAX_CONSECUTIVE_SAME_TOOL", "MAX_TOTAL_TOOL_CALLS", "TOOL_CALL_COOLDOWN", + "THINKING_FAST_INTERVAL", "MAX_READ_FILE_CHARS", "MAX_FOCUS_FILE_CHARS", "MAX_RUN_COMMAND_CHARS", diff --git a/prompts/thinking_mode_guidelines.txt b/prompts/thinking_mode_guidelines.txt index ef21918..94c5964 100644 --- a/prompts/thinking_mode_guidelines.txt +++ b/prompts/thinking_mode_guidelines.txt @@ -1,5 +1,6 @@ -你现在处于「思考模式」。 -思考模式时,第一次请求的模型不是kimi-k2,而是kimi-k2-thinking 一个更善于分析复杂问题,规划复杂流程的模型,在后续请求时,模型会换回kimi-k2 +你现在处于「思考模式」 +思考模式时,第一次请求的模型不是kimi-k2,而是kimi-k2-thinking 一个更善于分析复杂问题,规划复杂流程的模型,在后续请求时,模型会换回kimi-k2。 +并且,在系统监控到工具或写入失败时,会自动再次切换到思考模型,思考模型会更加深入地分析错误的原因,保证任务顺利进行。 请百分百遵循一下原则: @@ -24,4 +25,7 @@ - 若判断无需任何工具或修改,也要明确说明理由。 - 保持语气专业但亲切,让用户清楚你即将采取的行动。 -# 你的思考过程在后文会不可见,所以你需要在用户给出要求时的第一次回答中尽可能在正式输出中详细描述你的规划,来作为后面kimi-k2模型行动时提供参考依据 \ No newline at end of file +# 你的思考过程在后文会不可见,所以你需要在用户给出要求时的第一次回答中尽可能在正式输出中详细描述你的规划,来作为后面kimi-k2模型行动时提供参考依据 + +**⚠️重要提示⚠️** +思考模式下,token会被大量消耗,所以请保持思路连贯,在思考内容中禁止回顾全部的上下文历史,只需紧扣上一步的状态并规划下一步行动。 \ No newline at end of file diff --git a/static/app.js b/static/app.js index e5cfd51..7361314 100644 --- a/static/app.js +++ b/static/app.js @@ -737,7 +737,7 @@ async function bootstrapApp() { }; msg.actions.push(action); - const blockId = `${this.currentMessageIndex}-thinking-${msg.actions.length - 1}`; + const blockId = action.blockId || `thinking-${Date.now()}-${Math.random().toString(36).slice(2)}`; action.blockId = blockId; this.expandedBlocks.add(blockId); // 开始思考时自动锁定滚动到底部 @@ -777,12 +777,17 @@ async function bootstrapApp() { if (lastAction && lastAction.type === 'thinking') { lastAction.streaming = false; lastAction.content = data.full_content; - if (lastAction.blockId) { + const blockId = lastAction.blockId || `thinking-${Date.now()}-${Math.random().toString(36).slice(2)}`; + if (!lastAction.blockId) { + lastAction.blockId = blockId; + } + if (blockId) { setTimeout(() => { - this.expandedBlocks.delete(lastAction.blockId); + this.expandedBlocks.delete(blockId); + this.thinkingScrollLocks.delete(blockId); this.$forceUpdate(); }, 1000); - this.$nextTick(() => this.scrollThinkingToBottom(lastAction.blockId)); + this.$nextTick(() => this.scrollThinkingToBottom(blockId)); } } msg.streamingThinking = ''; @@ -1694,12 +1699,14 @@ async function bootstrapApp() { } if (reasoningText) { + const blockId = `history-thinking-${Date.now()}-${Math.random().toString(36).slice(2)}`; currentAssistantMessage.actions.push({ id: `history-think-${Date.now()}-${Math.random()}`, type: 'thinking', content: reasoningText, streaming: false, - timestamp: Date.now() + timestamp: Date.now(), + blockId }); console.log('添加思考内容:', reasoningText.substring(0, 50) + '...'); } diff --git a/static/index.html b/static/index.html index bf33efa..d4e081a 100644 --- a/static/index.html +++ b/static/index.html @@ -269,8 +269,8 @@
-
+ :class="{ expanded: expandedBlocks.has(action.blockId || `${index}-thinking-${actionIndex}`) }"> +
🧠 @@ -279,8 +279,8 @@
{{ action.content }}
diff --git a/utils/api_client.py b/utils/api_client.py index d41542a..e90b6ac 100644 --- a/utils/api_client.py +++ b/utils/api_client.py @@ -54,6 +54,9 @@ class DeepSeekClient: # 每个任务的独立状态 self.current_task_first_call = True # 当前任务是否是第一次调用 self.current_task_thinking = "" # 当前任务的思考内容 + self.force_thinking_next_call = False # 单次强制思考 + self.skip_thinking_next_call = False # 单次强制快速 + self.last_call_used_thinking = False # 最近一次调用是否使用思考模型 def _print(self, message: str, end: str = "\n", flush: bool = False): """安全的打印函数,在Web模式下不输出""" @@ -127,6 +130,9 @@ class DeepSeekClient: """开始新任务(重置任务级别的状态)""" self.current_task_first_call = True self.current_task_thinking = "" + self.force_thinking_next_call = False + self.skip_thinking_next_call = False + self.last_call_used_thinking = False def _build_headers(self, api_key: str) -> Dict[str, str]: return { @@ -149,11 +155,12 @@ class DeepSeekClient: def get_current_thinking_mode(self) -> bool: """获取当前应该使用的思考模式""" if not self.thinking_mode: - # 快速模式,始终不使用思考 return False - else: - # 思考模式:当前任务的第一次用思考,后续不用 - return self.current_task_first_call + if self.force_thinking_next_call: + return True + if self.skip_thinking_next_call: + return False + return self.current_task_first_call def _validate_json_string(self, json_str: str) -> tuple: """ @@ -249,10 +256,17 @@ class DeepSeekClient: api_config = self._select_api_config(current_thinking_mode) headers = self._build_headers(api_config["api_key"]) - # 如果是思考模式且不是当前任务的第一次,显示提示 - if self.thinking_mode and not self.current_task_first_call: + # 如果当前为快速模式但已有思考内容,提示沿用 + if self.thinking_mode and not current_thinking_mode and self.current_task_thinking: self._print(f"{OUTPUT_FORMATS['info']} [任务内快速模式] 使用本次任务的思考继续处理...") + # 记录本次调用的模式 + self.last_call_used_thinking = current_thinking_mode + if current_thinking_mode and self.force_thinking_next_call: + self.force_thinking_next_call = False + if not current_thinking_mode and self.skip_thinking_next_call: + self.skip_thinking_next_call = False + try: max_tokens = int(DEFAULT_RESPONSE_MAX_TOKENS) if max_tokens <= 0: @@ -416,9 +430,10 @@ class DeepSeekClient: if in_thinking: self._print("\n💭 [思考结束]\n") - # 在思考模式下,如果是当前任务的第一次调用且有思考内容,保存它 - if self.thinking_mode and self.current_task_first_call and current_thinking: + # 记录思考内容并更新调用状态 + if self.last_call_used_thinking and current_thinking: self.current_task_thinking = current_thinking + if self.current_task_first_call: self.current_task_first_call = False # 标记当前任务的第一次调用已完成 # 如果没有工具调用,说明完成了 @@ -610,9 +625,9 @@ class DeepSeekClient: if in_thinking: self._print("\n💭 [思考结束]\n") - # 在思考模式下,如果是当前任务的第一次调用且有思考内容,保存它 - if self.thinking_mode and self.current_task_first_call and thinking_content: + if self.last_call_used_thinking and thinking_content: self.current_task_thinking = thinking_content + if self.current_task_first_call: self.current_task_first_call = False # 如果没有收到任何响应 diff --git a/web_server.py b/web_server.py index 90da384..cc105b0 100644 --- a/web_server.py +++ b/web_server.py @@ -41,7 +41,8 @@ from config import ( DEFAULT_RESPONSE_MAX_TOKENS, DEFAULT_PROJECT_PATH, LOGS_DIR, - AGENT_VERSION + AGENT_VERSION, + THINKING_FAST_INTERVAL ) from modules.user_manager import UserManager, UserWorkspace from modules.gui_file_manager import GuiFileManager @@ -68,6 +69,8 @@ connection_users: Dict[str, str] = {} stop_flags: Dict[str, Dict[str, Any]] = {} DEFAULT_PORT = 8091 +THINKING_FAILURE_KEYWORDS = ["⚠️", "🛑", "失败", "错误", "异常", "终止", "error", "failed", "未完成", "超时", "强制"] + def format_read_file_result(result_data: Dict) -> str: """格式化 read_file 工具的输出,便于在Web端展示。""" @@ -337,6 +340,104 @@ def debug_log(message): timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] f.write(f"[{timestamp}] {message}\n") + +def get_thinking_state(terminal: WebTerminal) -> Dict[str, Any]: + """获取(或初始化)思考调度状态。""" + state = getattr(terminal, "_thinking_state", None) + if not state: + state = {"fast_streak": 0, "force_next": False, "suppress_next": False} + terminal._thinking_state = state + return state + + +def mark_force_thinking(terminal: WebTerminal, reason: str = ""): + """标记下一次API调用必须使用思考模型。""" + if not getattr(terminal, "thinking_mode", False): + return + state = get_thinking_state(terminal) + state["force_next"] = True + if reason: + debug_log(f"[Thinking] 下次强制思考,原因: {reason}") + + +def mark_suppress_thinking(terminal: WebTerminal): + """标记下一次API调用必须跳过思考模型(例如写入窗口)。""" + if not getattr(terminal, "thinking_mode", False): + return + state = get_thinking_state(terminal) + state["suppress_next"] = True + + +def apply_thinking_schedule(terminal: WebTerminal): + """根据当前状态配置API客户端的思考/快速模式。""" + client = terminal.api_client + if not getattr(terminal, "thinking_mode", False): + client.force_thinking_next_call = False + client.skip_thinking_next_call = False + return + state = get_thinking_state(terminal) + awaiting_writes = getattr(terminal, "pending_append_request", None) or getattr(terminal, "pending_modify_request", None) + if awaiting_writes: + client.skip_thinking_next_call = True + state["suppress_next"] = False + debug_log("[Thinking] 检测到写入窗口请求,跳过思考。") + return + if state.get("suppress_next"): + client.skip_thinking_next_call = True + state["suppress_next"] = False + debug_log("[Thinking] 由于写入窗口,下一次跳过思考。") + return + if state.get("force_next"): + client.force_thinking_next_call = True + state["force_next"] = False + state["fast_streak"] = 0 + debug_log("[Thinking] 响应失败,下一次强制思考。") + return + interval = max(0, THINKING_FAST_INTERVAL or 0) + if interval > 0 and state.get("fast_streak", 0) >= interval: + client.force_thinking_next_call = True + state["fast_streak"] = 0 + debug_log(f"[Thinking] 连续快速 {interval} 次,下一次强制思考。") + return + client.force_thinking_next_call = False + client.skip_thinking_next_call = False + + +def update_thinking_after_call(terminal: WebTerminal): + """一次API调用完成后更新快速计数。""" + if not getattr(terminal, "thinking_mode", False): + return + state = get_thinking_state(terminal) + if terminal.api_client.last_call_used_thinking: + state["fast_streak"] = 0 + else: + state["fast_streak"] = state.get("fast_streak", 0) + 1 + debug_log(f"[Thinking] 快速模式计数: {state['fast_streak']}") + + +def maybe_mark_failure_from_message(terminal: WebTerminal, content: Optional[str]): + """根据system消息内容判断是否需要强制思考。""" + if not content: + return + normalized = content.lower() + if any(keyword.lower() in normalized for keyword in THINKING_FAILURE_KEYWORDS): + mark_force_thinking(terminal, reason="system_message") + + +def detect_tool_failure(result_data: Any) -> bool: + """识别工具返回结果是否代表失败。""" + if not isinstance(result_data, dict): + return False + if result_data.get("success") is False: + return True + status = str(result_data.get("status", "")).lower() + if status in {"failed", "error"}: + return True + error_msg = result_data.get("error") + if isinstance(error_msg, str) and error_msg.strip(): + return True + return False + # 终端广播回调函数 def terminal_broadcast(event_type, data): """广播终端事件到所有订阅者""" @@ -1843,6 +1944,10 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client # 如果是思考模式,重置状态 if web_terminal.thinking_mode: web_terminal.api_client.start_new_task() + state = get_thinking_state(web_terminal) + state["fast_streak"] = 0 + state["force_next"] = False + state["suppress_next"] = False # 添加到对话历史 web_terminal.context_manager.add_conversation("user", message) @@ -2470,6 +2575,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client 'content': message, 'inline': inline }) + maybe_mark_failure_from_message(web_terminal, message) for iteration in range(max_iterations): total_iterations += 1 @@ -2481,6 +2587,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client sender('system_message', { 'content': f'⚠️ 已达到最大工具调用次数限制 ({MAX_TOTAL_TOOL_CALLS}),任务结束。' }) + mark_force_thinking(web_terminal, reason="tool_limit") break # === 修改:每次API调用前都计算输入token === @@ -2493,6 +2600,8 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client except Exception as e: debug_log(f"输入token统计失败: {e}") + apply_thinking_schedule(web_terminal) + full_response = "" tool_calls = [] current_thinking = "" @@ -3006,10 +3115,14 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client web_terminal.context_manager.add_conversation("system", follow_prompt) debug_log("已注入追加任务提示") + if append_result["handled"] and append_result.get("forced") and append_result.get("success"): + mark_force_thinking(web_terminal, reason="append_forced_finish") if append_result["handled"] and not append_result.get("success"): sender('system_message', { 'content': f'⚠️ 追加写入失败:{append_result.get("error")}' }) + maybe_mark_failure_from_message(web_terminal, f'⚠️ 追加写入失败:{append_result.get("error")}') + mark_force_thinking(web_terminal, reason="append_failed") if modify_result["handled"]: modify_metadata = modify_result.get("assistant_metadata") @@ -3076,16 +3189,23 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client web_terminal.context_manager.add_conversation("system", follow_prompt) debug_log("已注入修改任务提示") + if modify_result["handled"] and modify_result.get("failed_blocks"): + mark_force_thinking(web_terminal, reason="modify_partial_failure") + if modify_result["handled"] and modify_result.get("forced") and modify_result.get("success"): + mark_force_thinking(web_terminal, reason="modify_forced_finish") if modify_result["handled"] and not modify_result.get("success"): error_message = modify_result.get("summary_message") or modify_result.get("error") or "修改操作未成功,请根据提示重新执行。" sender('system_message', { 'content': f'⚠️ 修改操作存在未完成的内容:{error_message}' }) + maybe_mark_failure_from_message(web_terminal, f'⚠️ 修改操作存在未完成的内容:{error_message}') + mark_force_thinking(web_terminal, reason="modify_failed") - # 保存思考内容(如果这是第一次迭代且有思考) - if web_terminal.thinking_mode and web_terminal.api_client.current_task_first_call: + if web_terminal.api_client.last_call_used_thinking and current_thinking: web_terminal.api_client.current_task_thinking = current_thinking or "" + if web_terminal.api_client.current_task_first_call: web_terminal.api_client.current_task_first_call = False + update_thinking_after_call(web_terminal) # 检测是否有格式错误的工具调用 if not tool_calls and full_response and AUTO_FIX_TOOL_CALL and not append_result["handled"] and not modify_result["handled"]: @@ -3100,6 +3220,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client sender('system_message', { 'content': f'⚠️ 自动修复: {fix_message}' }) + maybe_mark_failure_from_message(web_terminal, f'⚠️ 自动修复: {fix_message}') messages.append({ "role": "user", @@ -3113,6 +3234,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client sender('system_message', { 'content': f'⌘ 工具调用格式错误,自动修复失败。请手动检查并重试。' }) + maybe_mark_failure_from_message(web_terminal, '⌘ 工具调用格式错误,自动修复失败。请手动检查并重试。') break # 构建助手消息(用于API继续对话) @@ -3194,12 +3316,14 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client sender('system_message', { 'content': f'⚠️ 检测到重复调用 {tool_name} 工具 {MAX_CONSECUTIVE_SAME_TOOL} 次,可能存在循环。' }) + maybe_mark_failure_from_message(web_terminal, f'⚠️ 检测到重复调用 {tool_name} 工具 {MAX_CONSECUTIVE_SAME_TOOL} 次,可能存在循环。') if consecutive_same_tool[tool_name] >= MAX_CONSECUTIVE_SAME_TOOL + 2: debug_log(f"终止: 工具 {tool_name} 调用次数过多") sender('system_message', { 'content': f'⌘ 工具 {tool_name} 重复调用过多,任务终止。' }) + maybe_mark_failure_from_message(web_terminal, f'⌘ 工具 {tool_name} 重复调用过多,任务终止。') break else: consecutive_same_tool.clear() @@ -3313,6 +3437,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client result_data = json.loads(tool_result) except: result_data = {'output': tool_result} + tool_failed = detect_tool_failure(result_data) action_status = 'completed' action_message = None @@ -3338,6 +3463,9 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client text_started = False text_streaming = False text_has_content = False + if hasattr(web_terminal, "pending_append_request"): + web_terminal.pending_append_request = {"path": append_path} + mark_suppress_thinking(web_terminal) debug_log(f"append_to_file 等待输出: {append_path}") else: debug_log("append_to_file 返回完成状态") @@ -3366,6 +3494,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client text_started = False text_streaming = False text_has_content = False + mark_suppress_thinking(web_terminal) debug_log(f"modify_file 等待输出: {modify_path}") else: debug_log("modify_file 返回完成状态") @@ -3381,6 +3510,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client 'content': system_msg, 'inline': False }) + maybe_mark_failure_from_message(web_terminal, system_msg) update_payload = { 'id': tool_display_id, @@ -3424,9 +3554,11 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client system_message = result_data.get("system_message") if isinstance(result_data, dict) else None if system_message: web_terminal._record_sub_agent_message(system_message, result_data.get("task_id"), inline=False) + maybe_mark_failure_from_message(web_terminal, system_message) todo_note = result_data.get("system_note") if isinstance(result_data, dict) else None if todo_note: web_terminal.context_manager.add_conversation("system", todo_note) + maybe_mark_failure_from_message(web_terminal, todo_note) # 添加到消息历史(用于API继续对话) messages.append({ @@ -3440,6 +3572,9 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client await process_sub_agent_updates(messages, inline=True, after_tool_call_id=tool_call_id) await asyncio.sleep(0.2) + + if tool_failed: + mark_force_thinking(web_terminal, reason=f"{function_name}_failed") # 标记不再是第一次迭代 is_first_iteration = False