diff --git a/core/main_terminal.py b/core/main_terminal.py index 41f2cda..a3c6de0 100644 --- a/core/main_terminal.py +++ b/core/main_terminal.py @@ -62,6 +62,7 @@ from modules.container_monitor import collect_stats, inspect_state from core.tool_config import TOOL_CATEGORIES from utils.api_client import DeepSeekClient from utils.context_manager import ContextManager +from utils.tool_result_formatter import format_tool_result_for_context from utils.logger import setup_logger if TYPE_CHECKING: @@ -131,10 +132,6 @@ class MainTerminal: self.focused_files = {} # {path: content} 存储聚焦的文件内容 self.current_session_id = 0 # 用于标识不同的任务会话 - # 新增:追加内容状态 - self.pending_append_request = None # {"path": str} - self.pending_modify_request = None # {"path": str} - # 工具启用状态 self.tool_category_states = { key: category.default_enabled @@ -665,16 +662,12 @@ class MainTerminal: collected_tool_calls.append(tool_call_info) # 处理工具结果用于保存 - result_data = {} try: - result_data = json.loads(result) - if tool_name == "read_file" and result_data.get("success"): - file_content = result_data.get("content", "") - tool_result_content = f"文件内容:\n```\n{file_content}\n```\n大小: {result_data.get('size')} 字节" - else: - tool_result_content = result - except: - tool_result_content = result + parsed = json.loads(result) + result_data = parsed if isinstance(parsed, dict) else {} + except Exception: + result_data = {} + tool_result_content = format_tool_result_for_context(tool_name, result_data, result) # 收集工具结果(不保存) collected_tool_results.append({ @@ -682,7 +675,8 @@ class MainTerminal: "name": tool_name, "content": tool_result_content, "system_message": result_data.get("system_message") if isinstance(result_data, dict) else None, - "task_id": result_data.get("task_id") if isinstance(result_data, dict) else None + "task_id": result_data.get("task_id") if isinstance(result_data, dict) else None, + "raw_result_data": result_data if result_data else None, }) return result @@ -726,12 +720,8 @@ class MainTerminal: if system_message: self._record_sub_agent_message(system_message, tool_result.get("task_id"), inline=False) # 补充TODO完成提示,放在tool消息之后保证格式正确 - todo_note = None - try: - parsed = json.loads(tool_result["content"]) - todo_note = parsed.get("system_note") - except Exception: - todo_note = None + raw_payload = tool_result.get("raw_result_data") or {} + todo_note = raw_payload.get("system_note") if todo_note: self.context_manager.add_conversation("system", todo_note) @@ -746,8 +736,8 @@ class MainTerminal: print(f"{OUTPUT_FORMATS['file']} 读取文件") elif tool_name == "ocr_image": print(f"{OUTPUT_FORMATS['file']} 图片OCR") - elif tool_name == "modify_file": - print(f"{OUTPUT_FORMATS['file']} 修改文件") + elif tool_name == "write_file_diff": + print(f"{OUTPUT_FORMATS['file']} 应用补丁") elif tool_name == "delete_file": print(f"{OUTPUT_FORMATS['file']} 删除文件") elif tool_name == "terminal_session": @@ -1084,7 +1074,7 @@ class MainTerminal: "type": "function", "function": { "name": "create_file", - "description": "创建新文件(仅创建空文件,正文请使用 append_to_file 追加)", + "description": "创建新文件(仅创建空文件,正文请使用 write_file_diff 提交补丁)", "parameters": { "type": "object", "properties": { @@ -1217,28 +1207,18 @@ class MainTerminal: { "type": "function", "function": { - "name": "modify_file", - "description": "准备替换文件中的指定内容。调用后系统会发放写入窗口,请严格按照模板输出<<>>…<<>>结构。", + "name": "write_file_diff", + "description": "使用统一 diff(@@ 块、- / + 行)直接写入文件,可一次性完成追加/替换/删除。每个块以 @@ [id:数字] 开头,块内只能包含上下文行(空格开头)、- 原文行、+ 新内容行,请务必包含足够的上下文以确保定位准确。", "parameters": { "type": "object", "properties": { - "path": {"type": "string", "description": "目标文件路径"} + "path": {"type": "string", "description": "目标文件路径(相对项目根目录)。"}, + "patch": { + "type": "string", + "description": "完整补丁文本,格式类似 unified diff,必须用 *** Begin Patch / *** End Patch 包裹,并用 @@ [id:数字] 划分多个修改块。示例:*** Begin Patch\\n@@ [id:1]\\n def main():\\n- def greet(self):\\n- return \"hi\"\\n+ def greet(self, name: str) -> str:\\n+ message = f\"Hello, {name}!\"\\n+ return message\\n@@ [id:2]\\n+\\n+if __name__ == \"__main__\":\\n+ print(\"hello world\")\\n*** End Patch" + } }, - "required": ["path"] - } - } - }, - { - "type": "function", - "function": { - "name": "append_to_file", - "description": "准备向文件追加大段内容。调用后必须按照系统指令输出<<>>...<<>>格式的正文,禁止夹带解释性文字。", - "parameters": { - "type": "object", - "properties": { - "path": {"type": "string", "description": "目标文件路径"} - }, - "required": ["path"] + "required": ["path", "patch"] } } }, @@ -1671,13 +1651,13 @@ class MainTerminal: }, ensure_ascii=False) # 针对特定工具的内容检查 - if tool_name in ["modify_file", "create_file"] and "content" in arguments: + if tool_name == "create_file" and "content" in arguments: content = arguments.get("content", "") if not DISABLE_LENGTH_CHECK and len(content) > 9999999999: # 30KB内容限制 return json.dumps({ "success": False, "error": f"文件内容过长({len(content)}字符),建议分块处理", - "suggestion": "请拆分内容或使用 modify_file 工具输出结构化补丁" + "suggestion": "请拆分内容或使用 write_file_diff 工具输出结构化补丁" }, ensure_ascii=False) # 检查内容中的特殊字符 @@ -1732,6 +1712,7 @@ class MainTerminal: else: result = {"success": False, "error": f"未知操作: {action}"} + result["action"] = action # 终端输入工具 elif tool_name == "terminal_input": @@ -1807,8 +1788,7 @@ class MainTerminal: ) if result.get("success"): result["message"] = ( - f"已创建空文件: {result['path']}。请使用 append_to_file " - "追加正文内容,或使用 modify_file 进行小范围替换。" + f"已创建空文件: {result['path']}。请使用 write_file_diff 提交补丁写入正文。" ) elif tool_name == "delete_file": @@ -1849,81 +1829,22 @@ class MainTerminal: self.focused_files[new_path] = content print(f"🔍 已更新文件聚焦: {old_path} -> {new_path}") - elif tool_name == "modify_file": + elif tool_name == "write_file_diff": path = arguments.get("path") - if not path: - result = {"success": False, "error": "缺少必要参数: path"} + patch_text = arguments.get("patch") + if not path or not patch_text: + result = {"success": False, "error": "缺少必要参数: path/patch"} else: - if self.pending_append_request: - active_path = self.pending_append_request.get("path") + length_limit = 30000 + if not DISABLE_LENGTH_CHECK and len(patch_text) > length_limit: result = { "success": False, - "error": f"当前仍有 append_to_file 任务未完成: {active_path}", - "suggestion": "请先完成追加,再继续执行 modify_file。" + "error": f"补丁内容过长({len(patch_text)}字符),超过{length_limit}字符上限", + "suggestion": "请拆分补丁后多次调用 write_file_diff。" } else: - valid, error, full_path = self.file_manager._validate_path(path) - if not valid: - result = {"success": False, "error": error} - else: - relative_path = str(full_path.relative_to(self.project_path)) - self.pending_modify_request = {"path": relative_path} - instructions = ( - f"\n请按照以下格式输出需要替换的全部内容,标记需独立成行,缩进必须和原文完全一致(包括首行缩进和整体缩进,任何一个字符不匹配都会导致失败):\n" - f"<<>>\n" - "[replace:1]\n" - "<>\n" - "(第一处需要修改的原文内容,必须逐字匹配(包含所有缩进和换行))\n" - "<>\n" - "<>\n" - "(第一处需要修改的新内容,可留空表示清空)\n" - "<>\n" - "[/replace]\n" - "[replace:2]\n" - "<>\n" - "(第二处需要修改的原文内容,必须逐字匹配,包含所有缩进和换行)\n" - "<>\n" - "<>\n" - "(第二处需要修改的新内容,可留空表示清空)\n" - "<>\n" - "[/replace]\n" - "...如需更多修改,请递增序号继续添加 [replace:n] 块。\n" - "<<>>\n" - "⚠️ 注意:每个 replace 块必须完整闭合,并且 OLD/NEW 内容必须与原始代码逐字匹配(包含所有缩进和换行)。\n" - "<<>>为起始标记,由**三个**<和>组成的闭合标记组成,内容为“MODIFY:”+文件的相对位置,整个标记中禁止有任何的换行,空格,必须完全匹配\n" - "<<>>为结束标记,同样由**三个**<和>组成的闭合标记组成,内容为“END_MODIFY”,整个标记中禁止有任何的换行,空格,必须完全匹配" - ) - result = { - "success": True, - "awaiting_content": True, - "path": relative_path, - "message": instructions - } - - elif tool_name == "append_to_file": - path = arguments.get("path") - if not path: - result = {"success": False, "error": "缺少必要参数: path"} - else: - valid, error, full_path = self.file_manager._validate_path(path) - if not valid: - result = {"success": False, "error": error} - else: - relative_path = str(full_path.relative_to(self.project_path)) - self.pending_append_request = {"path": relative_path} - instructions = ( - f"\n请按照以下格式输出需要追加到文件的完整内容,禁止输出任何解释性文字:\n" - f"<<>>\n" - "(在此行之后紧接着写入要追加的全部内容,可包含多行代码)\n" - "<<>>\n" - "⚠️ 注意:<<>> 与 <<>> 必须成对出现,内容之间不能包含解释或额外标记。\n" - ) - result = { - "success": True, - "awaiting_content": True, - "path": relative_path, - "message": instructions - } + diff_result = self.file_manager.apply_diff_patch(path, patch_text) + result = diff_result elif tool_name == "create_folder": result = self.file_manager.create_folder(arguments["path"]) @@ -2181,7 +2102,11 @@ class MainTerminal: else: success = self.memory_manager.write_task_memory(content) - result = {"success": success} + result = { + "success": success, + "memory_type": memory_type, + "operation": operation + } elif tool_name == "todo_create": result = self.todo_manager.create_todo_list( diff --git a/utils/context_manager.py b/utils/context_manager.py index 55bfd10..dc586b0 100644 --- a/utils/context_manager.py +++ b/utils/context_manager.py @@ -920,7 +920,20 @@ class ContextManager: root_label = f"{container_root} (映射自 {project_name})" lines.append(f"📁 {root_label}/") - def build_tree_recursive(tree_dict: Dict, prefix: str = ""): + ROOT_FOLDER_CHILD_LIMIT = 20 + + def count_descendants(item: Dict) -> int: + """计算某个文件夹下(含多层)所有子项数量。""" + if item.get("type") != "folder": + return 0 + children = item.get("children") or {} + total = len(children) + for child in children.values(): + if child.get("type") == "folder": + total += count_descendants(child) + return total + + def build_tree_recursive(tree_dict: Dict, prefix: str = "", depth: int = 0): """递归构建树形结构""" if not tree_dict: return @@ -953,8 +966,17 @@ class ContextManager: lines.append(f"{prefix}{current_connector}📁 {name}/") # 递归处理子项目 - if info.get("children"): - build_tree_recursive(info["children"], next_prefix) + children = info.get("children") or {} + if depth == 0: + total_entries = count_descendants(info) + else: + total_entries = None + if depth == 0 and total_entries is not None and total_entries > ROOT_FOLDER_CHILD_LIMIT: + lines.append( + f"{next_prefix}… (该目录包含 {total_entries} 项,已省略以控制 prompt 体积)" + ) + elif children: + build_tree_recursive(children, next_prefix, depth + 1) else: # 文件 icon = self._get_file_icon(name) diff --git a/utils/tool_result_formatter.py b/utils/tool_result_formatter.py new file mode 100644 index 0000000..cf54116 --- /dev/null +++ b/utils/tool_result_formatter.py @@ -0,0 +1,453 @@ +"""将工具执行结果转换为对话上下文可用的纯文本摘要。""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + + +def format_read_file_result(result_data: Dict[str, Any]) -> str: + """格式化 read_file 工具的输出,兼容读取/搜索/抽取模式。""" + if not isinstance(result_data, dict): + return str(result_data) + if not result_data.get("success"): + return _format_failure("read_file", result_data) + + read_type = result_data.get("type", "read") + truncated_note = "(内容已截断)" if result_data.get("truncated") else "" + path = result_data.get("path", "未知路径") + max_chars = result_data.get("max_chars") + max_note = f"(max_chars={max_chars})" if max_chars else "" + + if read_type == "read": + header = ( + f"读取 {path} 行 {result_data.get('line_start')}~{result_data.get('line_end')} " + f"{max_note}{truncated_note}" + ).strip() + content = result_data.get("content", "") + return f"{header}\n```\n{content}\n```" + + if read_type == "search": + query = result_data.get("query", "") + actual = result_data.get("actual_matches", 0) + returned = result_data.get("returned_matches", 0) + case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写" + header = ( + f"在 {path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint}) " + f"{max_note}{truncated_note}" + ).strip() + match_texts: List[str] = [] + for idx, match in enumerate(result_data.get("matches", []), 1): + match_note = "(片段截断)" if match.get("truncated") else "" + hits = match.get("hits") or [] + hit_text = ", ".join(str(h) for h in hits) if hits else "无" + label = match.get("id") or f"match_{idx}" + snippet = match.get("snippet", "") + match_texts.append( + f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```" + ) + if not match_texts: + match_texts.append("未找到匹配内容。") + return "\n".join([header] + match_texts) + + if read_type == "extract": + segments = result_data.get("segments", []) + header = f"从 {path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip() + seg_texts: List[str] = [] + for idx, segment in enumerate(segments, 1): + seg_note = "(片段截断)" if segment.get("truncated") else "" + label = segment.get("label") or f"segment_{idx}" + snippet = segment.get("content", "") + seg_texts.append( + f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```" + ) + if not seg_texts: + seg_texts.append("未提供可抽取的片段。") + return "\n".join([header] + seg_texts) + + return _format_failure("read_file", {"error": "不支持的读取模式"}) + + +def format_tool_result_for_context(function_name: str, result_data: Any, raw_text: str = "") -> str: + """根据工具名称输出纯文本摘要,必要时附加关键信息。""" + if function_name == "read_file" and isinstance(result_data, dict): + return format_read_file_result(result_data) + + if function_name == "write_file_diff" and isinstance(result_data, dict): + return _format_write_file_diff(result_data, raw_text) + + if not isinstance(result_data, dict): + return raw_text + + handler = TOOL_FORMATTERS.get(function_name) + if handler: + return handler(result_data) + + summary = result_data.get("summary") or result_data.get("message") + error_msg = result_data.get("error") + parts: List[str] = [] + if summary: + parts.append(str(summary)) + if error_msg: + parts.append(f"⚠️ 错误: {error_msg}") + return "\n".join(parts) if parts else raw_text + + +def _format_write_file_diff(result_data: Dict[str, Any], raw_text: str) -> str: + path = result_data.get("path", "目标文件") + summary = result_data.get("summary") or result_data.get("message") + completed = result_data.get("completed") or [] + failed_blocks = result_data.get("failed") or [] + lines = [f"[文件补丁] {path}"] + if summary: + lines.append(summary) + if completed: + lines.append(f"✅ 成功块: {', '.join(str(i) for i in completed)}") + if failed_blocks: + fail_descriptions = [] + for item in failed_blocks[:3]: + idx = item.get("index") + reason = item.get("reason") or item.get("error") or "未说明原因" + fail_descriptions.append(f"#{idx}: {reason}") + lines.append("⚠️ 失败块: " + ";".join(fail_descriptions)) + if len(failed_blocks) > 3: + lines.append(f"(其余 {len(failed_blocks) - 3} 个失败块略)") + detail_sections: List[str] = [] + for item in failed_blocks: + idx = item.get("index") + reason = item.get("reason") or item.get("error") or "未说明原因" + block_patch = item.get("block_patch") or item.get("patch") + if not block_patch: + old_text = item.get("old_text") or "" + new_text = item.get("new_text") or "" + synthetic_lines: List[str] = [] + if old_text: + synthetic_lines.extend(f"-{line}" for line in old_text.splitlines()) + if new_text: + synthetic_lines.extend(f"+{line}" for line in new_text.splitlines()) + if synthetic_lines: + block_patch = "\n".join(synthetic_lines) + detail_sections.append(f"- #{idx}: {reason}") + if block_patch: + detail_sections.append("```diff") + detail_sections.append(block_patch.rstrip("\n")) + detail_sections.append("```") + detail_sections.append("") + if detail_sections and detail_sections[-1] == "": + detail_sections.pop() + if detail_sections: + lines.append("⚠️ 失败块详情:") + lines.extend(detail_sections) + if result_data.get("success") is False and result_data.get("error"): + lines.append(f"⚠️ 错误: {result_data.get('error')}") + formatted = "\n".join(line for line in lines if line) + return formatted or raw_text + + +def _format_create_file(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("create_file", result_data) + return result_data.get("message") or f"已创建空文件: {result_data.get('path', '未知路径')}" + + +def _format_delete_file(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("delete_file", result_data) + path = result_data.get("path") or "未知路径" + action = result_data.get("action") or "deleted" + return f"已{action}文件: {path}" + + +def _format_rename_file(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("rename_file", result_data) + old_path = result_data.get("old_path") or "旧路径未知" + new_path = result_data.get("new_path") or "新路径未知" + return f"已重命名: {old_path} -> {new_path}" + + +def _format_create_folder(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("create_folder", result_data) + return f"已创建文件夹: {result_data.get('path', '未知路径')}" + + +def _format_focus_file(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("focus_file", result_data) + message = result_data.get("message") or "文件已聚焦" + size = result_data.get("file_size") + size_note = f"({size} 字符)" if isinstance(size, int) else "" + focused = result_data.get("focused_files") or [] + focused_note = f"当前聚焦: {', '.join(focused)}" if focused else "当前没有其他聚焦文件" + return f"{message}{size_note}\n{focused_note}" + + +def _format_unfocus_file(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("unfocus_file", result_data) + message = result_data.get("message") or "已取消聚焦" + remaining = result_data.get("remaining_focused") or [] + remain_note = f"剩余聚焦: {', '.join(remaining)}" if remaining else "当前没有聚焦文件" + return f"{message}\n{remain_note}" + + +def _format_terminal_session(result_data: Dict[str, Any]) -> str: + action = result_data.get("action") or result_data.get("terminal_action") or "未知操作" + tag = f"terminal_session[{action}]" + if not result_data.get("success"): + return _format_failure(tag, result_data) + if action == "open": + return ( + f"终端 {result_data.get('session')} 已打开,工作目录 {result_data.get('working_dir')}," + f"当前活动会话: {result_data.get('session')}(共 {result_data.get('total_sessions')} 个)" + ) + if action == "close": + new_active = result_data.get("new_active") or "无" + remaining = result_data.get("remaining_sessions") or [] + return ( + f"终端 {result_data.get('session')} 已关闭,新的活动会话: {new_active}。" + f"剩余会话: {', '.join(remaining) if remaining else '无'}" + ) + if action == "switch": + previous = result_data.get("previous") or "无" + current = result_data.get("current") or "未知" + return f"终端已从 {previous} 切换到 {current}。" + if action == "list": + sessions = result_data.get("sessions") or [] + total = result_data.get("total", len(sessions)) + max_allowed = result_data.get("max_allowed") + active = result_data.get("active") or "无" + header = f"共有 {total}/{max_allowed} 个终端会话,活动会话: {active}" + session_lines = [] + for session in sessions: + name = session.get("session_name") or session.get("name") or "未命名" + state = "运行中" if session.get("is_running") else "已停止" + marker = "⭐" if session.get("is_active") else " " + working_dir = session.get("working_dir") or "未知目录" + session_lines.append(f"{marker} {name} | {state} | {working_dir}") + return "\n".join([header] + session_lines) if session_lines else header + return result_data.get("message") or f"{tag} 操作已完成。" + + +def _format_terminal_input(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("terminal_input", result_data) + session = result_data.get("session") or result_data.get("session_name") or "default" + command = result_data.get("command") or "(命令缺失)" + status = result_data.get("status") or "completed" + message = result_data.get("message") or "" + lines = [ + f"terminal_input: 在 {session} 执行 `{command}`,状态 {status}", + ] + if message: + lines.append(message) + lines.append(_summarize_output_block(result_data.get("output"), result_data.get("truncated"))) + return "\n".join(lines) + + +def _format_sleep(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("sleep", result_data) + reason = result_data.get("reason") + timestamp = result_data.get("timestamp") + message = result_data.get("message") or "等待完成" + parts = [message] + if reason: + parts.append(f"原因:{reason}") + if timestamp: + parts.append(f"时间:{timestamp}") + return ";".join(parts) + + +def _format_run_command(result_data: Dict[str, Any]) -> str: + return _format_command_result("run_command", result_data) + + +def _format_run_python(result_data: Dict[str, Any]) -> str: + base = _format_command_result("run_python", result_data) + code = result_data.get("code") + if not isinstance(code, str): + return base + header = f"run_python: 执行临时代码({len(code)} 字符)" + return "\n".join([header, base]) + + +def _format_todo_create(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("todo_create", result_data) + todo = (result_data.get("todo_list") or {}).copy() + overview = todo.get("overview") or "未命名任务" + tasks = _summarize_todo_tasks(todo) + return f"已创建 TODO:{overview}\n{tasks}" + + +def _format_todo_update_task(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("todo_update_task", result_data) + message = result_data.get("message") or "任务状态已更新" + tasks = _summarize_todo_tasks(result_data.get("todo_list")) + return f"{message}\n{tasks}" if tasks else message + + +def _format_todo_finish(result_data: Dict[str, Any]) -> str: + if result_data.get("success"): + message = result_data.get("message") or "待办列表已结束" + tasks = _summarize_todo_tasks(result_data.get("todo_list")) + return f"{message}\n{tasks}" if tasks else message + if result_data.get("requires_confirmation"): + remaining = result_data.get("remaining") or [] + remain_note = ", ".join(remaining) if remaining else "未知" + return f"仍有未完成任务({remain_note}),需要确认是否提前结束。" + return _format_failure("todo_finish", result_data) + + +def _format_todo_finish_confirm(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("todo_finish_confirm", result_data) + message = result_data.get("message") or "已处理 TODO 完结确认" + todo = result_data.get("todo_list") or {} + if todo.get("forced_finish"): + reason = todo.get("forced_reason") or "未提供原因" + message = f"{message}(强制结束,原因:{reason})" + return message + + +def _format_update_memory(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("update_memory", result_data) + mem_type = result_data.get("memory_type") or "main" + operation = result_data.get("operation") or "write" + verb = "追加" if operation == "append" else "覆盖" + label = "主记忆" if mem_type == "main" else "任务记忆" + return f"{label}已{verb}完成。" + + +def _format_create_sub_agent(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("create_sub_agent", result_data) + agent_id = result_data.get("agent_id") + task_id = result_data.get("task_id") + status = result_data.get("status") + refs = result_data.get("copied_references") or [] + ref_note = f",附带 {len(refs)} 份参考文件" if refs else "" + deliver_dir = result_data.get("deliverables_dir") + deliver_note = f",交付目录: {deliver_dir}" if deliver_dir else "" + return f"子智能体 #{agent_id} 已创建(task_id={task_id},状态 {status}{ref_note}{deliver_note})。" + + +def _format_wait_sub_agent(result_data: Dict[str, Any]) -> str: + task_id = result_data.get("task_id") + agent_id = result_data.get("agent_id") + status = result_data.get("status") + if result_data.get("success"): + copied_path = result_data.get("copied_path") or result_data.get("deliverables_path") + message = result_data.get("message") or "子智能体任务已完成。" + deliver_note = f"交付已复制到 {copied_path}" if copied_path else "交付目录已生成" + return f"子智能体 #{agent_id}/{task_id} 完成:{message}({deliver_note})" + message = result_data.get("message") or result_data.get("error") or "子智能体任务失败" + return f"⚠️ 子智能体 #{agent_id}/{task_id} 状态 {status}: {message}" + + +def _format_close_sub_agent(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("close_sub_agent", result_data) + message = result_data.get("message") or "子智能体已关闭。" + task_id = result_data.get("task_id") + status = result_data.get("status") + status_note = f"(状态 {status})" if status else "" + return f"{message}{status_note}(task_id={task_id})" + + +def _format_failure(tag: str, result_data: Dict[str, Any]) -> str: + error = result_data.get("error") or result_data.get("message") or "未知错误" + suggestion = result_data.get("suggestion") + details = result_data.get("details") + parts = [f"⚠️ {tag} 失败: {error}"] + if suggestion: + parts.append(f"建议:{suggestion}") + elif isinstance(details, str) and details: + parts.append(f"详情:{details}") + elif isinstance(details, dict): + detail_msg = details.get("message") or details.get("error") + if detail_msg: + parts.append(f"详情:{detail_msg}") + return ";".join(parts) + + +def _summarize_output_block(output: Optional[str], truncated: Optional[bool]) -> str: + if not output: + return "无可见输出" + lines = output.splitlines() + line_count = len(lines) + char_count = len(output) + meta = f"输出 {line_count} 行 / {char_count} 字符" + if truncated: + meta += "(已截断)" + return f"{meta}\n```\n{output}\n```" + + +def _format_command_result(label: str, result_data: Dict[str, Any]) -> str: + command = result_data.get("command") or "" + return_code = result_data.get("return_code") + success = result_data.get("success") + status = result_data.get("status") + output = result_data.get("output") + truncated = result_data.get("truncated") + message = result_data.get("message") + + if success: + header = f"{label}: `{command}`" if command else label + if return_code is not None and return_code != "": + header += f" (return_code={return_code})" + lines = [header] + if status and status not in {"completed", "success"}: + lines.append(f"终端状态: {status}") + if message: + lines.append(message) + lines.append(_summarize_output_block(output, truncated)) + return "\n".join(lines) + + error_msg = result_data.get("error") or message or "执行失败" + header = f"⚠️ {label} 失败" + if command: + header += f"(命令 `{command}`)" + lines = [f"{header}: {error_msg}"] + if return_code not in {None, ""}: + lines.append(f"返回码: {return_code}") + if output: + lines.append(_summarize_output_block(output, truncated)) + return "\n".join(lines) + + +def _summarize_todo_tasks(todo: Optional[Dict[str, Any]]) -> str: + if not isinstance(todo, dict): + return "" + tasks = todo.get("tasks") or [] + parts = [] + for task in tasks: + status_icon = "✅" if task.get("status") == "done" else "⬜️" + parts.append(f"{status_icon} task{task.get('index')}: {task.get('title')}") + return ";".join(parts) + + +TOOL_FORMATTERS = { + "create_file": _format_create_file, + "delete_file": _format_delete_file, + "rename_file": _format_rename_file, + "create_folder": _format_create_folder, + "focus_file": _format_focus_file, + "unfocus_file": _format_unfocus_file, + "terminal_session": _format_terminal_session, + "terminal_input": _format_terminal_input, + "sleep": _format_sleep, + "run_command": _format_run_command, + "run_python": _format_run_python, + "todo_create": _format_todo_create, + "todo_update_task": _format_todo_update_task, + "todo_finish": _format_todo_finish, + "todo_finish_confirm": _format_todo_finish_confirm, + "update_memory": _format_update_memory, + "create_sub_agent": _format_create_sub_agent, + "wait_sub_agent": _format_wait_sub_agent, + "close_sub_agent": _format_close_sub_agent, +} diff --git a/web_server.py b/web_server.py index 7992be5..8638a16 100644 --- a/web_server.py +++ b/web_server.py @@ -60,6 +60,7 @@ from modules.personalization_manager import ( ) from modules.user_container_manager import UserContainerManager from modules.usage_tracker import UsageTracker +from utils.tool_result_formatter import format_tool_result_for_context app = Flask(__name__, static_folder='static') app.config['MAX_CONTENT_LENGTH'] = MAX_UPLOAD_SIZE @@ -123,65 +124,6 @@ FAILED_LOGIN_LOCK_SECONDS = 300 SOCKET_TOKEN_TTL_SECONDS = 45 -def format_read_file_result(result_data: Dict) -> str: - """格式化 read_file 工具的输出,便于在Web端展示。""" - if not isinstance(result_data, dict): - return json.dumps(result_data, ensure_ascii=False) - if not result_data.get("success"): - return json.dumps(result_data, ensure_ascii=False) - - read_type = result_data.get("type", "read") - truncated_note = "(内容已截断)" if result_data.get("truncated") else "" - path = result_data.get("path", "未知路径") - max_chars = result_data.get("max_chars") - max_note = f"(max_chars={max_chars})" if max_chars else "" - - if read_type == "read": - header = f"读取 {path} 行 {result_data.get('line_start')}~{result_data.get('line_end')} {max_note}{truncated_note}".strip() - content = result_data.get("content", "") - return f"{header}\n```\n{content}\n```" - - if read_type == "search": - query = result_data.get("query", "") - actual = result_data.get("actual_matches", 0) - returned = result_data.get("returned_matches", 0) - case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写" - header = ( - f"在 {path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint}) " - f"{max_note}{truncated_note}" - ).strip() - match_texts = [] - for idx, match in enumerate(result_data.get("matches", []), 1): - match_note = "(片段截断)" if match.get("truncated") else "" - hits = match.get("hits") or [] - hit_text = ", ".join(str(h) for h in hits) if hits else "无" - label = match.get("id") or f"match_{idx}" - snippet = match.get("snippet", "") - match_texts.append( - f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```" - ) - if not match_texts: - match_texts.append("未找到匹配内容。") - return "\n".join([header] + match_texts) - - if read_type == "extract": - segments = result_data.get("segments", []) - header = f"从 {path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip() - seg_texts = [] - for idx, segment in enumerate(segments, 1): - seg_note = "(片段截断)" if segment.get("truncated") else "" - label = segment.get("label") or f"segment_{idx}" - snippet = segment.get("content", "") - seg_texts.append( - f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```" - ) - if not seg_texts: - seg_texts.append("未提供可抽取的片段。") - return "\n".join([header] + seg_texts) - - return json.dumps(result_data, ensure_ascii=False) - - def sanitize_filename_preserve_unicode(filename: str) -> str: """在保留中文等字符的同时,移除危险字符和路径成分""" if not filename: @@ -358,73 +300,6 @@ def format_tool_result_notice(tool_name: str, tool_call_id: Optional[str], conte body = "(无附加输出)" return f"{header}\n{body}" -def format_tool_result_for_context(function_name: str, result_data: Any, raw_text: str) -> str: - """将工具结果转成适合写入对话的简洁文本。""" - if function_name == "read_file" and isinstance(result_data, dict): - return format_read_file_result(result_data) - - if function_name == "write_file_diff" and isinstance(result_data, dict): - path = result_data.get("path", "目标文件") - summary = result_data.get("summary") or result_data.get("message") - completed = result_data.get("completed") or [] - failed_blocks = result_data.get("failed") or [] - lines = [f"[文件补丁] {path}"] - if summary: - lines.append(summary) - if completed: - lines.append(f"✅ 成功块: {', '.join(str(i) for i in completed)}") - if failed_blocks: - fail_descriptions = [] - for item in failed_blocks[:3]: - idx = item.get("index") - reason = item.get("reason") or item.get("error") or "未说明原因" - fail_descriptions.append(f"#{idx}: {reason}") - lines.append("⚠️ 失败块: " + ";".join(fail_descriptions)) - if len(failed_blocks) > 3: - lines.append(f"(其余 {len(failed_blocks) - 3} 个失败块略)") - detail_sections: List[str] = [] - for item in failed_blocks: - idx = item.get("index") - reason = item.get("reason") or item.get("error") or "未说明原因" - block_patch = item.get("block_patch") or item.get("patch") - if not block_patch: - old_text = item.get("old_text") or "" - new_text = item.get("new_text") or "" - synthetic_lines: List[str] = [] - if old_text: - synthetic_lines.extend(f"-{line}" for line in old_text.splitlines()) - if new_text: - synthetic_lines.extend(f"+{line}" for line in new_text.splitlines()) - if synthetic_lines: - block_patch = "\n".join(synthetic_lines) - detail_sections.append(f"- #{idx}: {reason}") - if block_patch: - detail_sections.append("```diff") - detail_sections.append(block_patch.rstrip("\n")) - detail_sections.append("```") - detail_sections.append("") - if detail_sections and detail_sections[-1] == "": - detail_sections.pop() - if detail_sections: - lines.append("⚠️ 失败块详情:") - lines.extend(detail_sections) - if result_data.get("success") is False and result_data.get("error"): - lines.append(f"⚠️ 错误: {result_data.get('error')}") - formatted = "\n".join(line for line in lines if line) - return formatted or raw_text - - if isinstance(result_data, dict): - parts = [] - summary = result_data.get("summary") or result_data.get("message") - if summary: - parts.append(str(summary)) - error_msg = result_data.get("error") - if error_msg: - parts.append(f"⚠️ 错误: {error_msg}") - if parts: - return "\n".join(parts) - return raw_text - # 创建调试日志文件 DEBUG_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "debug_stream.log" CHUNK_BACKEND_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "chunk_backend.log"