From 7890926c3de5740c8c2c6bc692f9ec1ed42ca637 Mon Sep 17 00:00:00 2001 From: JOJO <1498581755@qq.com> Date: Fri, 30 Jan 2026 15:36:44 +0800 Subject: [PATCH] fix: improve cancellation flow and api error tracing --- core/main_terminal.py | 2 +- modules/terminal_ops.py | 83 ++++++++----- server/_conversation_segment.py | 15 +++ server/chat_flow.py | 87 ++++++++++++-- server/socket_handlers.py | 26 +++-- server/state.py | 34 +++++- static/src/app.ts | 11 +- static/src/composables/useLegacySocket.ts | 13 ++- utils/api_client.py | 135 +++++++++++++++++++++- utils/tool_result_formatter.py | 23 +++- 10 files changed, 370 insertions(+), 59 deletions(-) diff --git a/core/main_terminal.py b/core/main_terminal.py index a3f5a18..59f19d7 100644 --- a/core/main_terminal.py +++ b/core/main_terminal.py @@ -2421,7 +2421,7 @@ class MainTerminal: project_storage=project_storage, file_tree=context["project_info"]["file_tree"], memory=context["memory"], - current_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + current_time=datetime.now().strftime("%Y-%m-%d"), model_description=prompt_replacements.get("model_description", "") ) diff --git a/modules/terminal_ops.py b/modules/terminal_ops.py index 25c7076..c0ac1da 100644 --- a/modules/terminal_ops.py +++ b/modules/terminal_ops.py @@ -323,33 +323,52 @@ class TerminalOperator: start_ts = time.time() # 优先在绑定的容器或活动终端的容器内执行,保证与实时终端环境一致 - if self.container_session or session_override: - result_payload = await self._run_command_subprocess( - command, - work_path, - timeout, - session_override=session_override - ) - else: - # 若未绑定用户容器,则使用工具箱容器(与终端相同镜像/预装包) - toolbox = self._get_toolbox() - payload = await toolbox.run(command, work_path, timeout) - result_payload = self._format_toolbox_output(payload) - # 追加耗时信息以对齐接口 - result_payload["elapsed_ms"] = int((time.time() - start_ts) * 1000) - result_payload["timeout"] = timeout - # 字符数检查(与主流程一致) - if result_payload.get("success") and "output" in result_payload: - char_count = len(result_payload["output"]) - if char_count > MAX_RUN_COMMAND_CHARS: - return { - "success": False, - "error": f"结果内容过大,有{char_count}字符,请使用限制字符数的获取内容方式,根据程度选择10k以内的数", - "char_count": char_count, - "limit": MAX_RUN_COMMAND_CHARS, - "command": command - } - return result_payload + try: + if self.container_session or session_override: + result_payload = await self._run_command_subprocess( + command, + work_path, + timeout, + session_override=session_override + ) + else: + # 若未绑定用户容器,则使用工具箱容器(与终端相同镜像/预装包) + toolbox = self._get_toolbox() + try: + payload = await toolbox.run(command, work_path, timeout) + except asyncio.CancelledError: + # 任务被取消时强制关闭工具箱终端,避免后台命令继续运行 + try: + toolbox.shutdown() + except Exception: + pass + raise + result_payload = self._format_toolbox_output(payload) + # 追加耗时信息以对齐接口 + result_payload["elapsed_ms"] = int((time.time() - start_ts) * 1000) + result_payload["timeout"] = timeout + # 字符数检查(与主流程一致) + if result_payload.get("success") and "output" in result_payload: + char_count = len(result_payload["output"]) + if char_count > MAX_RUN_COMMAND_CHARS: + return { + "success": False, + "error": f"结果内容过大,有{char_count}字符,请使用限制字符数的获取内容方式,根据程度选择10k以内的数", + "char_count": char_count, + "limit": MAX_RUN_COMMAND_CHARS, + "command": command + } + return result_payload + except asyncio.CancelledError: + return { + "success": False, + "message": "命令执行被用户取消", + "output": "", + "status": "cancelled", + "return_code": -1, + "timeout": timeout, + "elapsed_ms": int((time.time() - start_ts) * 1000) + } # 改为一次性子进程执行,确保等待到超时或命令结束 result_payload = result_payload if result_payload is not None else await self._run_command_subprocess( @@ -509,6 +528,16 @@ class TerminalOperator: except Exception: process.kill() await process.wait() + except asyncio.CancelledError: + # 用户主动停止任务或会话断开,立即终止子进程 + try: + os.killpg(process.pid, signal.SIGKILL) + except Exception: + try: + process.kill() + except Exception: + pass + raise # 确保读取协程结束 await asyncio.gather(stdout_task, stderr_task, return_exceptions=True) diff --git a/server/_conversation_segment.py b/server/_conversation_segment.py index 75faf1d..1e57663 100644 --- a/server/_conversation_segment.py +++ b/server/_conversation_segment.py @@ -1407,6 +1407,21 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac # 收集流式响应 async for chunk in web_terminal.api_client.chat(messages, tools, stream=True): + if isinstance(chunk, dict) and chunk.get("error"): + err = chunk["error"] + status_code = err.get("status_code") + err_msg = err.get("error_message") or err.get("error_text") or "API请求失败" + err_type = err.get("error_type") + sender('error', { + 'message': err_msg, + 'status_code': status_code, + 'error_type': err_type, + 'request_dump': err.get("request_dump") + }) + # 标记任务结束,避免继续处理 + last_finish_reason = "api_error" + break + chunk_count += 1 # 检查停止标志 diff --git a/server/chat_flow.py b/server/chat_flow.py index c8d3909..cca96e6 100644 --- a/server/chat_flow.py +++ b/server/chat_flow.py @@ -158,6 +158,9 @@ from .state import ( terminal_rooms, connection_users, stop_flags, + get_stop_flag, + set_stop_flag, + clear_stop_flag, ) from .extensions import socketio @@ -386,13 +389,13 @@ def process_message_task(terminal: WebTerminal, message: str, images, sender, cl # 创建可取消的任务 task = loop.create_task(handle_task_with_sender(terminal, workspace, message, images, sender, client_sid, username)) - entry = stop_flags.get(client_sid) + entry = get_stop_flag(client_sid, username) if not isinstance(entry, dict): entry = {'stop': False, 'task': None, 'terminal': None} - stop_flags[client_sid] = entry entry['stop'] = False entry['task'] = task entry['terminal'] = terminal + set_stop_flag(client_sid, username, entry) try: loop.run_until_complete(task) @@ -430,7 +433,7 @@ def process_message_task(terminal: WebTerminal, message: str, images, sender, cl finally: # 清理任务引用 - stop_flags.pop(client_sid, None) + clear_stop_flag(client_sid, username) def detect_malformed_tool_call(text): """检测文本中是否包含格式错误的工具调用""" @@ -1199,6 +1202,32 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac modify_break_triggered = False modify_result = {"handled": False} last_finish_reason = None + + def _cancel_pending_tools(tool_calls_list): + """为尚未返回结果的工具生成取消结果,防止缺失 tool_call_id 造成后续 400。""" + if not tool_calls_list: + return + for tc in tool_calls_list: + tc_id = tc.get("id") + func_name = tc.get("function", {}).get("name") + sender('update_action', { + 'preparing_id': tc_id, + 'status': 'cancelled', + 'result': { + "success": False, + "status": "cancelled", + "message": "命令执行被用户取消", + "tool": func_name + } + }) + if tc_id: + messages.append({ + "role": "tool", + "tool_call_id": tc_id, + "name": func_name, + "content": "命令执行被用户取消", + "metadata": {"status": "cancelled"} + }) thinking_expected = web_terminal.api_client.get_current_thinking_mode() debug_log(f"思考模式: {thinking_expected}") @@ -1231,17 +1260,22 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac chunk_count += 1 # 检查停止标志 - client_stop_info = stop_flags.get(client_sid) + client_stop_info = get_stop_flag(client_sid, username) if client_stop_info: stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info if stop_requested: debug_log(f"检测到停止请求,中断流处理") if pending_append: append_result = await finalize_pending_append(full_response, False, finish_reason="user_stop") - break if pending_modify: modify_result = await finalize_pending_modify(full_response, False, finish_reason="user_stop") - break + _cancel_pending_tools(tool_calls) + sender('task_stopped', { + 'message': '命令执行被用户取消', + 'reason': 'user_stop' + }) + clear_stop_flag(client_sid, username) + return # 先尝试记录 usage(有些平台会在最后一个 chunk 里携带 usage 但 choices 为空) usage_info = chunk.get("usage") @@ -1576,11 +1610,17 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac debug_log(f" 新工具: {tool_name}") # 检查是否被停止 - client_stop_info = stop_flags.get(client_sid) + client_stop_info = get_stop_flag(client_sid, username) if client_stop_info: stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info if stop_requested: debug_log("任务在流处理完成后检测到停止状态") + sender('task_stopped', { + 'message': '命令执行被用户取消', + 'reason': 'user_stop' + }) + _cancel_pending_tools(tool_calls) + clear_stop_flag(client_sid, username) return # === API响应完成后只计算输出token === @@ -1920,13 +1960,40 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac # 执行每个工具 for tool_call in tool_calls: # 检查停止标志 - client_stop_info = stop_flags.get(client_sid) + client_stop_info = get_stop_flag(client_sid, username) if client_stop_info: stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info if stop_requested: - debug_log("在工具调用过程中检测到停止状态") + debug_log("在工具调用过程中检测到停止状态") + tool_call_id = tool_call.get("id") + function_name = tool_call.get("function", {}).get("name") + # 通知前端该工具已被取消,避免界面卡住 + sender('update_action', { + 'preparing_id': tool_call_id, + 'status': 'cancelled', + 'result': { + "success": False, + "status": "cancelled", + "message": "命令执行被用户取消", + "tool": function_name + } + }) + # 在消息列表中记录取消结果,防止重新加载时仍显示运行中 + if tool_call_id: + messages.append({ + "role": "tool", + "tool_call_id": tool_call_id, + "name": function_name, + "content": "命令执行被用户取消", + "metadata": {"status": "cancelled"} + }) + sender('task_stopped', { + 'message': '命令执行被用户取消', + 'reason': 'user_stop' + }) + clear_stop_flag(client_sid, username) return - + # 工具调用间隔控制 current_time = time.time() if last_tool_call_time > 0: diff --git a/server/socket_handlers.py b/server/socket_handlers.py index 6e50b1a..810f0c8 100644 --- a/server/socket_handlers.py +++ b/server/socket_handlers.py @@ -13,7 +13,7 @@ from .context import ( get_user_resources, ) from .utils_common import debug_log, log_frontend_chunk, log_streaming_debug_entry -from .state import connection_users, stop_flags, terminal_rooms, pending_socket_tokens, user_manager +from .state import connection_users, stop_flags, terminal_rooms, pending_socket_tokens, user_manager, get_stop_flag, set_stop_flag, clear_stop_flag from .usage import record_user_activity from .chat_flow import start_chat_task from .security import consume_socket_token, prune_socket_tokens @@ -35,6 +35,10 @@ def handle_connect(auth): # 清理可能存在的停止标志和状态 stop_flags.pop(request.sid, None) + # 将旧的 username 级别任务映射到新的 sid,便于重新停止 + user_entry = get_stop_flag(None, username) + if user_entry: + set_stop_flag(request.sid, username, user_entry) join_room(f"user_{username}") join_room(f"user_{username}_terminal") @@ -72,7 +76,7 @@ def handle_disconnect(): """客户端断开""" print(f"[WebSocket] 客户端断开: {request.sid}") username = connection_users.pop(request.sid, None) - task_info = stop_flags.get(request.sid) + task_info = get_stop_flag(request.sid, username) if isinstance(task_info, dict): task_info['stop'] = True pending_task = task_info.get('task') @@ -84,7 +88,7 @@ def handle_disconnect(): reset_system_state(terminal) # 清理停止标志 - stop_flags.pop(request.sid, None) + clear_stop_flag(request.sid, None) # 从所有房间移除 for room in list(terminal_rooms.get(request.sid, [])): @@ -100,19 +104,19 @@ def handle_disconnect(): def handle_stop_task(): """处理停止任务请求""" print(f"[停止] 收到停止请求: {request.sid}") - - task_info = stop_flags.get(request.sid) + username = connection_users.get(request.sid) + task_info = get_stop_flag(request.sid, username) if not isinstance(task_info, dict): task_info = {'stop': False, 'task': None, 'terminal': None} - stop_flags[request.sid] = task_info - - if task_info.get('task') and not task_info['task'].done(): - debug_log(f"正在取消任务: {request.sid}") - task_info['task'].cancel() - + # 标记停止并尝试取消任务 task_info['stop'] = True + pending_task = task_info.get('task') + if pending_task and not pending_task.done(): + debug_log(f"正在取消任务: {request.sid}") + pending_task.cancel() if task_info.get('terminal'): reset_system_state(task_info['terminal']) + set_stop_flag(request.sid, username, task_info) emit('stop_requested', { 'message': '停止请求已接收,正在取消任务...' diff --git a/server/state.py b/server/state.py index dae3c33..e22898f 100644 --- a/server/state.py +++ b/server/state.py @@ -110,6 +110,10 @@ __all__ = [ "PROJECT_MAX_STORAGE_MB", "RECENT_UPLOAD_EVENT_LIMIT", "RECENT_UPLOAD_FEED_LIMIT", + "make_stop_keys", + "get_stop_flag", + "set_stop_flag", + "clear_stop_flag", "get_last_active_ts", ] @@ -141,5 +145,33 @@ def get_last_active_ts(username: str, fallback: Optional[float] = None) -> Optio if fallback_val is not None and fallback_val > cached_val: _last_active_cache[username] = fallback_val return fallback_val - + return cached_val + + +# ====== 停止标志辅助 ====== +def make_stop_keys(client_sid: Optional[str] = None, username: Optional[str] = None): + keys = [] + if client_sid: + keys.append(client_sid) + if username: + keys.append(f"user:{username}") + return keys + + +def set_stop_flag(client_sid: Optional[str], username: Optional[str], entry: Dict[str, Any]): + for k in make_stop_keys(client_sid, username): + stop_flags[k] = entry + + +def get_stop_flag(client_sid: Optional[str], username: Optional[str]) -> Optional[Dict[str, Any]]: + for k in make_stop_keys(client_sid, username): + val = stop_flags.get(k) + if val: + return val + return None + + +def clear_stop_flag(client_sid: Optional[str], username: Optional[str]): + for k in make_stop_keys(client_sid, username): + stop_flags.pop(k, None) diff --git a/static/src/app.ts b/static/src/app.ts index d85ada0..42c7e77 100644 --- a/static/src/app.ts +++ b/static/src/app.ts @@ -1983,7 +1983,7 @@ const appOptions = { arguments_obj = {}; } - currentAssistantMessage.actions.push({ + const action = { id: `history-tool-${toolCall.id || Date.now()}-${tcIndex}`, type: 'tool', tool: { @@ -1996,7 +1996,14 @@ const appOptions = { result: null }, timestamp: Date.now() - }); + }; + // 如果是历史加载的动作且状态仍为进行中,标记为 stale,避免刷新后按钮卡死 + if (['preparing', 'running', 'awaiting_content'].includes(action.tool.status)) { + action.tool.status = 'stale'; + action.tool.awaiting_content = false; + action.streaming = false; + } + currentAssistantMessage.actions.push(action); debugLog('添加工具调用:', toolCall.function.name); }); } diff --git a/static/src/composables/useLegacySocket.ts b/static/src/composables/useLegacySocket.ts index 1d5b399..c9c57b8 100644 --- a/static/src/composables/useLegacySocket.ts +++ b/static/src/composables/useLegacySocket.ts @@ -1377,7 +1377,18 @@ export async function initializeLegacySocket(ctx: any) { // 错误处理 ctx.socket.on('error', (data) => { - ctx.addSystemMessage(`错误: ${data.message}`); + const msg = data?.message || '发生未知错误'; + const code = data?.status_code; + const errType = data?.error_type; + ctx.addSystemMessage(`错误: ${msg}`); + if (typeof ctx.uiPushToast === 'function') { + ctx.uiPushToast({ + title: code ? `API错误 ${code}` : 'API错误', + message: errType ? `${errType}: ${msg}` : msg, + type: 'error', + duration: 6000 + }); + } // 仅标记当前流结束,避免状态错乱 ctx.streamingMessage = false; ctx.stopRequested = false; diff --git a/utils/api_client.py b/utils/api_client.py index 1004914..9b873ee 100644 --- a/utils/api_client.py +++ b/utils/api_client.py @@ -4,7 +4,9 @@ import httpx import json import asyncio -from typing import List, Dict, Optional, AsyncGenerator +from typing import List, Dict, Optional, AsyncGenerator, Any +from pathlib import Path +from datetime import datetime try: from config import ( API_BASE_URL, @@ -64,6 +66,11 @@ class DeepSeekClient: self.force_thinking_next_call = False # 单次强制思考 self.skip_thinking_next_call = False # 单次强制快速 self.last_call_used_thinking = False # 最近一次调用是否使用思考模型 + # 最近一次API错误详情 + self.last_error_info: Optional[Dict[str, Any]] = None + # 请求体落盘目录 + self.request_dump_dir = Path(__file__).resolve().parents[1] / "logs" / "api_requests" + self.request_dump_dir.mkdir(parents=True, exist_ok=True) def _print(self, message: str, end: str = "\n", flush: bool = False): """安全的打印函数,在Web模式下不输出""" @@ -166,6 +173,46 @@ class DeepSeekClient: "model_id": config.get("model_id") or fallback["model_id"] } + def _dump_request_payload(self, payload: Dict, api_config: Dict, headers: Dict) -> Path: + """ + 将本次请求的payload、headers、配置落盘,便于排查400等错误。 + 返回写入的文件路径。 + """ + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") + filename = f"req_{timestamp}.json" + path = self.request_dump_dir / filename + try: + headers_sanitized = {} + for k, v in headers.items(): + headers_sanitized[k] = "***" if k.lower() == "authorization" else v + data = { + "timestamp": datetime.now().isoformat(), + "api_config": {k: api_config.get(k) for k in ["base_url", "model_id"]}, + "headers": headers_sanitized, + "payload": payload + } + path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + except Exception as exc: + self._print(f"{OUTPUT_FORMATS['warning']} 请求体落盘失败: {exc}") + return path + + def _mark_request_error(self, dump_path: Path, status_code: int = None, error_text: str = None): + """ + 在已有请求文件中追加错误标记,便于快速定位。 + """ + if not dump_path or not dump_path.exists(): + return + try: + data = json.loads(dump_path.read_text(encoding="utf-8")) + data["error"] = { + "status_code": status_code, + "message": error_text, + "marked_at": datetime.now().isoformat() + } + dump_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + except Exception as exc: + self._print(f"{OUTPUT_FORMATS['warning']} 标记请求错误失败: {exc}") + def apply_profile(self, profile: Dict): """ @@ -354,6 +401,9 @@ class DeepSeekClient: if tools: payload["tools"] = tools payload["tool_choice"] = "auto" + + # 将本次请求落盘,便于出错时快速定位 + dump_path = self._dump_request_payload(payload, api_config, headers) try: async with httpx.AsyncClient(http2=True, timeout=300) as client: @@ -366,8 +416,26 @@ class DeepSeekClient: ) as response: # 检查响应状态 if response.status_code != 200: - error_text = await response.aread() + error_bytes = await response.aread() + error_text = error_bytes.decode('utf-8', errors='ignore') if hasattr(error_bytes, 'decode') else str(error_bytes) + self.last_error_info = { + "status_code": response.status_code, + "error_text": error_text, + "error_type": None, + "error_message": None, + "request_dump": str(dump_path) + } + try: + parsed = json.loads(error_text) + err = parsed.get("error") if isinstance(parsed, dict) else {} + if isinstance(err, dict): + self.last_error_info["error_type"] = err.get("type") + self.last_error_info["error_message"] = err.get("message") + except Exception: + pass self._print(f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text}") + self._mark_request_error(dump_path, response.status_code, error_text) + yield {"error": self.last_error_info} return async for line in response.aiter_lines(): @@ -389,16 +457,62 @@ class DeepSeekClient: ) if response.status_code != 200: error_text = response.text + self.last_error_info = { + "status_code": response.status_code, + "error_text": error_text, + "error_type": None, + "error_message": None, + "request_dump": str(dump_path) + } + try: + parsed = response.json() + err = parsed.get("error") if isinstance(parsed, dict) else {} + if isinstance(err, dict): + self.last_error_info["error_type"] = err.get("type") + self.last_error_info["error_message"] = err.get("message") + except Exception: + pass self._print(f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text}") + self._mark_request_error(dump_path, response.status_code, error_text) + yield {"error": self.last_error_info} return + # 成功则清空错误状态 + self.last_error_info = None yield response.json() except httpx.ConnectError: self._print(f"{OUTPUT_FORMATS['error']} 无法连接到API服务器,请检查网络连接") + self.last_error_info = { + "status_code": None, + "error_text": "connect_error", + "error_type": "connection_error", + "error_message": "无法连接到API服务器", + "request_dump": str(dump_path) + } + self._mark_request_error(dump_path, error_text="connect_error") + yield {"error": self.last_error_info} except httpx.TimeoutException: self._print(f"{OUTPUT_FORMATS['error']} API请求超时") + self.last_error_info = { + "status_code": None, + "error_text": "timeout", + "error_type": "timeout", + "error_message": "API请求超时", + "request_dump": str(dump_path) + } + self._mark_request_error(dump_path, error_text="timeout") + yield {"error": self.last_error_info} except Exception as e: self._print(f"{OUTPUT_FORMATS['error']} API调用异常: {e}") + self.last_error_info = { + "status_code": None, + "error_text": str(e), + "error_type": "exception", + "error_message": str(e), + "request_dump": str(dump_path) + } + self._mark_request_error(dump_path, error_text=str(e)) + yield {"error": self.last_error_info} async def chat_with_tools( self, @@ -438,6 +552,15 @@ class DeepSeekClient: thinking_printed = False async for chunk in self.chat(messages, tools, stream=True): + if chunk.get("error"): + # 直接返回错误,让上层处理 + err = chunk["error"] + self.last_error_info = err + err_msg = err.get("error_message") or err.get("error_text") or "API调用失败" + status = err.get("status_code") + self._print(f"{OUTPUT_FORMATS['error']} 模型API错误{f'({status})' if status is not None else ''}: {err_msg}") + return "" + if "choices" not in chunk: continue @@ -687,6 +810,14 @@ class DeepSeekClient: try: async for chunk in self.chat(messages, tools=None, stream=True): + if chunk.get("error"): + err = chunk["error"] + self.last_error_info = err + err_msg = err.get("error_message") or err.get("error_text") or "API调用失败" + status = err.get("status_code") + self._print(f"{OUTPUT_FORMATS['error']} 模型API错误{f'({status})' if status is not None else ''}: {err_msg}") + return "", "" + if "choices" not in chunk: continue diff --git a/utils/tool_result_formatter.py b/utils/tool_result_formatter.py index d1768b7..f9ef7d0 100644 --- a/utils/tool_result_formatter.py +++ b/utils/tool_result_formatter.py @@ -334,10 +334,25 @@ def _plain_command_output(result_data: Dict[str, Any]) -> str: message = result_data.get("message") prefixes = [] - if status in {"timeout"} and timeout: - prefixes.append(f"[timeout after {int(timeout)}s]") - elif status in {"timeout"}: - prefixes.append("[timeout]") + if status in {"timeout"}: + appended = False + # 1) 优先使用数值型 timeout + if isinstance(timeout, (int, float)) and timeout > 0: + prefixes.append(f"[timeout after {int(timeout)}s]") + appended = True + # 2) 字符串数字 + elif isinstance(timeout, str) and timeout.strip().isdigit(): + prefixes.append(f"[timeout after {int(timeout.strip())}s]") + appended = True + # 3) 未设置超时(never),用 elapsed_ms 近似 + elif (isinstance(timeout, str) and timeout.lower() == "never") or result_data.get("never_timeout"): + elapsed_ms = result_data.get("elapsed_ms") + if isinstance(elapsed_ms, (int, float)) and elapsed_ms > 0: + secs = max(1, int(round(elapsed_ms / 1000))) + prefixes.append(f"[timeout after ~{secs}s]") + appended = True + if not appended: + prefixes.append("[timeout]") elif status in {"killed"}: prefixes.append("[killed]") elif status in {"awaiting_input"}: