fix: compact tool outputs and file tree context

This commit is contained in:
JOJO 2025-12-02 09:33:03 +08:00
parent 51bb0f1033
commit 8cc3a24abf
4 changed files with 519 additions and 244 deletions

View File

@ -62,6 +62,7 @@ from modules.container_monitor import collect_stats, inspect_state
from core.tool_config import TOOL_CATEGORIES
from utils.api_client import DeepSeekClient
from utils.context_manager import ContextManager
from utils.tool_result_formatter import format_tool_result_for_context
from utils.logger import setup_logger
if TYPE_CHECKING:
@ -131,10 +132,6 @@ class MainTerminal:
self.focused_files = {} # {path: content} 存储聚焦的文件内容
self.current_session_id = 0 # 用于标识不同的任务会话
# 新增:追加内容状态
self.pending_append_request = None # {"path": str}
self.pending_modify_request = None # {"path": str}
# 工具启用状态
self.tool_category_states = {
key: category.default_enabled
@ -665,16 +662,12 @@ class MainTerminal:
collected_tool_calls.append(tool_call_info)
# 处理工具结果用于保存
result_data = {}
try:
result_data = json.loads(result)
if tool_name == "read_file" and result_data.get("success"):
file_content = result_data.get("content", "")
tool_result_content = f"文件内容:\n```\n{file_content}\n```\n大小: {result_data.get('size')} 字节"
else:
tool_result_content = result
except:
tool_result_content = result
parsed = json.loads(result)
result_data = parsed if isinstance(parsed, dict) else {}
except Exception:
result_data = {}
tool_result_content = format_tool_result_for_context(tool_name, result_data, result)
# 收集工具结果(不保存)
collected_tool_results.append({
@ -682,7 +675,8 @@ class MainTerminal:
"name": tool_name,
"content": tool_result_content,
"system_message": result_data.get("system_message") if isinstance(result_data, dict) else None,
"task_id": result_data.get("task_id") if isinstance(result_data, dict) else None
"task_id": result_data.get("task_id") if isinstance(result_data, dict) else None,
"raw_result_data": result_data if result_data else None,
})
return result
@ -726,12 +720,8 @@ class MainTerminal:
if system_message:
self._record_sub_agent_message(system_message, tool_result.get("task_id"), inline=False)
# 补充TODO完成提示放在tool消息之后保证格式正确
todo_note = None
try:
parsed = json.loads(tool_result["content"])
todo_note = parsed.get("system_note")
except Exception:
todo_note = None
raw_payload = tool_result.get("raw_result_data") or {}
todo_note = raw_payload.get("system_note")
if todo_note:
self.context_manager.add_conversation("system", todo_note)
@ -746,8 +736,8 @@ class MainTerminal:
print(f"{OUTPUT_FORMATS['file']} 读取文件")
elif tool_name == "ocr_image":
print(f"{OUTPUT_FORMATS['file']} 图片OCR")
elif tool_name == "modify_file":
print(f"{OUTPUT_FORMATS['file']} 修改文件")
elif tool_name == "write_file_diff":
print(f"{OUTPUT_FORMATS['file']} 应用补丁")
elif tool_name == "delete_file":
print(f"{OUTPUT_FORMATS['file']} 删除文件")
elif tool_name == "terminal_session":
@ -1084,7 +1074,7 @@ class MainTerminal:
"type": "function",
"function": {
"name": "create_file",
"description": "创建新文件(仅创建空文件,正文请使用 append_to_file 追加",
"description": "创建新文件(仅创建空文件,正文请使用 write_file_diff 提交补丁",
"parameters": {
"type": "object",
"properties": {
@ -1217,28 +1207,18 @@ class MainTerminal:
{
"type": "function",
"function": {
"name": "modify_file",
"description": "准备替换文件中的指定内容。调用后系统会发放写入窗口,请严格按照模板输出<<<MODIFY:path>>>…<<<END_MODIFY>>>结构",
"name": "write_file_diff",
"description": "使用统一 diff@@ 块、- / + 行)直接写入文件,可一次性完成追加/替换/删除。每个块以 @@ [id:数字] 开头,块内只能包含上下文行(空格开头)、- 原文行、+ 新内容行,请务必包含足够的上下文以确保定位准确",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "目标文件路径"}
"path": {"type": "string", "description": "目标文件路径(相对项目根目录)。"},
"patch": {
"type": "string",
"description": "完整补丁文本,格式类似 unified diff必须用 *** Begin Patch / *** End Patch 包裹,并用 @@ [id:数字] 划分多个修改块。示例:*** Begin Patch\\n@@ [id:1]\\n def main():\\n- def greet(self):\\n- return \"hi\"\\n+ def greet(self, name: str) -> str:\\n+ message = f\"Hello, {name}!\"\\n+ return message\\n@@ [id:2]\\n+\\n+if __name__ == \"__main__\":\\n+ print(\"hello world\")\\n*** End Patch"
}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "append_to_file",
"description": "准备向文件追加大段内容。调用后必须按照系统指令输出<<<APPEND:path>>>...<<<END_APPEND>>>格式的正文,禁止夹带解释性文字。",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "目标文件路径"}
},
"required": ["path"]
"required": ["path", "patch"]
}
}
},
@ -1671,13 +1651,13 @@ class MainTerminal:
}, ensure_ascii=False)
# 针对特定工具的内容检查
if tool_name in ["modify_file", "create_file"] and "content" in arguments:
if tool_name == "create_file" and "content" in arguments:
content = arguments.get("content", "")
if not DISABLE_LENGTH_CHECK and len(content) > 9999999999: # 30KB内容限制
return json.dumps({
"success": False,
"error": f"文件内容过长({len(content)}字符),建议分块处理",
"suggestion": "请拆分内容或使用 modify_file 工具输出结构化补丁"
"suggestion": "请拆分内容或使用 write_file_diff 工具输出结构化补丁"
}, ensure_ascii=False)
# 检查内容中的特殊字符
@ -1732,6 +1712,7 @@ class MainTerminal:
else:
result = {"success": False, "error": f"未知操作: {action}"}
result["action"] = action
# 终端输入工具
elif tool_name == "terminal_input":
@ -1807,8 +1788,7 @@ class MainTerminal:
)
if result.get("success"):
result["message"] = (
f"已创建空文件: {result['path']}。请使用 append_to_file "
"追加正文内容,或使用 modify_file 进行小范围替换。"
f"已创建空文件: {result['path']}。请使用 write_file_diff 提交补丁写入正文。"
)
elif tool_name == "delete_file":
@ -1849,81 +1829,22 @@ class MainTerminal:
self.focused_files[new_path] = content
print(f"🔍 已更新文件聚焦: {old_path} -> {new_path}")
elif tool_name == "modify_file":
elif tool_name == "write_file_diff":
path = arguments.get("path")
if not path:
result = {"success": False, "error": "缺少必要参数: path"}
patch_text = arguments.get("patch")
if not path or not patch_text:
result = {"success": False, "error": "缺少必要参数: path/patch"}
else:
if self.pending_append_request:
active_path = self.pending_append_request.get("path")
length_limit = 30000
if not DISABLE_LENGTH_CHECK and len(patch_text) > length_limit:
result = {
"success": False,
"error": f"当前仍有 append_to_file 任务未完成: {active_path}",
"suggestion": "先完成追加,再继续执行 modify_file"
"error": f"补丁内容过长({len(patch_text)}字符),超过{length_limit}字符上限",
"suggestion": "拆分补丁后多次调用 write_file_diff"
}
else:
valid, error, full_path = self.file_manager._validate_path(path)
if not valid:
result = {"success": False, "error": error}
else:
relative_path = str(full_path.relative_to(self.project_path))
self.pending_modify_request = {"path": relative_path}
instructions = (
f"\n请按照以下格式输出需要替换的全部内容,标记需独立成行,缩进必须和原文完全一致(包括首行缩进和整体缩进,任何一个字符不匹配都会导致失败):\n"
f"<<<MODIFY:{relative_path}>>>\n"
"[replace:1]\n"
"<<OLD>>\n"
"(第一处需要修改的原文内容,必须逐字匹配(包含所有缩进和换行))\n"
"<<END>>\n"
"<<NEW>>\n"
"(第一处需要修改的新内容,可留空表示清空)\n"
"<<END>>\n"
"[/replace]\n"
"[replace:2]\n"
"<<OLD>>\n"
"(第二处需要修改的原文内容,必须逐字匹配,包含所有缩进和换行)\n"
"<<END>>\n"
"<<NEW>>\n"
"(第二处需要修改的新内容,可留空表示清空)\n"
"<<END>>\n"
"[/replace]\n"
"...如需更多修改,请递增序号继续添加 [replace:n] 块。\n"
"<<<END_MODIFY>>>\n"
"⚠️ 注意:每个 replace 块必须完整闭合,并且 OLD/NEW 内容必须与原始代码逐字匹配(包含所有缩进和换行)。\n"
"<<<MODIFY:{relative_path}>>>为起始标记,由**三个**<和>组成的闭合标记组成内容为“MODIFY:”+文件的相对位置,整个标记中禁止有任何的换行,空格,必须完全匹配\n"
"<<<END_MODIFY>>>为结束标记,同样由**三个**<和>组成的闭合标记组成内容为“END_MODIFY”整个标记中禁止有任何的换行空格必须完全匹配"
)
result = {
"success": True,
"awaiting_content": True,
"path": relative_path,
"message": instructions
}
elif tool_name == "append_to_file":
path = arguments.get("path")
if not path:
result = {"success": False, "error": "缺少必要参数: path"}
else:
valid, error, full_path = self.file_manager._validate_path(path)
if not valid:
result = {"success": False, "error": error}
else:
relative_path = str(full_path.relative_to(self.project_path))
self.pending_append_request = {"path": relative_path}
instructions = (
f"\n请按照以下格式输出需要追加到文件的完整内容,禁止输出任何解释性文字:\n"
f"<<<APPEND:{relative_path}>>>\n"
"(在此行之后紧接着写入要追加的全部内容,可包含多行代码)\n"
"<<<END_APPEND>>>\n"
"⚠️ 注意:<<<APPEND>>> 与 <<<END_APPEND>>> 必须成对出现,内容之间不能包含解释或额外标记。\n"
)
result = {
"success": True,
"awaiting_content": True,
"path": relative_path,
"message": instructions
}
diff_result = self.file_manager.apply_diff_patch(path, patch_text)
result = diff_result
elif tool_name == "create_folder":
result = self.file_manager.create_folder(arguments["path"])
@ -2181,7 +2102,11 @@ class MainTerminal:
else:
success = self.memory_manager.write_task_memory(content)
result = {"success": success}
result = {
"success": success,
"memory_type": memory_type,
"operation": operation
}
elif tool_name == "todo_create":
result = self.todo_manager.create_todo_list(

View File

@ -920,7 +920,20 @@ class ContextManager:
root_label = f"{container_root} (映射自 {project_name})"
lines.append(f"📁 {root_label}/")
def build_tree_recursive(tree_dict: Dict, prefix: str = ""):
ROOT_FOLDER_CHILD_LIMIT = 20
def count_descendants(item: Dict) -> int:
"""计算某个文件夹下(含多层)所有子项数量。"""
if item.get("type") != "folder":
return 0
children = item.get("children") or {}
total = len(children)
for child in children.values():
if child.get("type") == "folder":
total += count_descendants(child)
return total
def build_tree_recursive(tree_dict: Dict, prefix: str = "", depth: int = 0):
"""递归构建树形结构"""
if not tree_dict:
return
@ -953,8 +966,17 @@ class ContextManager:
lines.append(f"{prefix}{current_connector}📁 {name}/")
# 递归处理子项目
if info.get("children"):
build_tree_recursive(info["children"], next_prefix)
children = info.get("children") or {}
if depth == 0:
total_entries = count_descendants(info)
else:
total_entries = None
if depth == 0 and total_entries is not None and total_entries > ROOT_FOLDER_CHILD_LIMIT:
lines.append(
f"{next_prefix}… (该目录包含 {total_entries} 项,已省略以控制 prompt 体积)"
)
elif children:
build_tree_recursive(children, next_prefix, depth + 1)
else:
# 文件
icon = self._get_file_icon(name)

View File

@ -0,0 +1,453 @@
"""将工具执行结果转换为对话上下文可用的纯文本摘要。"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
def format_read_file_result(result_data: Dict[str, Any]) -> str:
"""格式化 read_file 工具的输出,兼容读取/搜索/抽取模式。"""
if not isinstance(result_data, dict):
return str(result_data)
if not result_data.get("success"):
return _format_failure("read_file", result_data)
read_type = result_data.get("type", "read")
truncated_note = "(内容已截断)" if result_data.get("truncated") else ""
path = result_data.get("path", "未知路径")
max_chars = result_data.get("max_chars")
max_note = f"(max_chars={max_chars})" if max_chars else ""
if read_type == "read":
header = (
f"读取 {path}{result_data.get('line_start')}~{result_data.get('line_end')} "
f"{max_note}{truncated_note}"
).strip()
content = result_data.get("content", "")
return f"{header}\n```\n{content}\n```"
if read_type == "search":
query = result_data.get("query", "")
actual = result_data.get("actual_matches", 0)
returned = result_data.get("returned_matches", 0)
case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写"
header = (
f"{path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint} "
f"{max_note}{truncated_note}"
).strip()
match_texts: List[str] = []
for idx, match in enumerate(result_data.get("matches", []), 1):
match_note = "(片段截断)" if match.get("truncated") else ""
hits = match.get("hits") or []
hit_text = ", ".join(str(h) for h in hits) if hits else ""
label = match.get("id") or f"match_{idx}"
snippet = match.get("snippet", "")
match_texts.append(
f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```"
)
if not match_texts:
match_texts.append("未找到匹配内容。")
return "\n".join([header] + match_texts)
if read_type == "extract":
segments = result_data.get("segments", [])
header = f"{path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip()
seg_texts: List[str] = []
for idx, segment in enumerate(segments, 1):
seg_note = "(片段截断)" if segment.get("truncated") else ""
label = segment.get("label") or f"segment_{idx}"
snippet = segment.get("content", "")
seg_texts.append(
f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```"
)
if not seg_texts:
seg_texts.append("未提供可抽取的片段。")
return "\n".join([header] + seg_texts)
return _format_failure("read_file", {"error": "不支持的读取模式"})
def format_tool_result_for_context(function_name: str, result_data: Any, raw_text: str = "") -> str:
"""根据工具名称输出纯文本摘要,必要时附加关键信息。"""
if function_name == "read_file" and isinstance(result_data, dict):
return format_read_file_result(result_data)
if function_name == "write_file_diff" and isinstance(result_data, dict):
return _format_write_file_diff(result_data, raw_text)
if not isinstance(result_data, dict):
return raw_text
handler = TOOL_FORMATTERS.get(function_name)
if handler:
return handler(result_data)
summary = result_data.get("summary") or result_data.get("message")
error_msg = result_data.get("error")
parts: List[str] = []
if summary:
parts.append(str(summary))
if error_msg:
parts.append(f"⚠️ 错误: {error_msg}")
return "\n".join(parts) if parts else raw_text
def _format_write_file_diff(result_data: Dict[str, Any], raw_text: str) -> str:
path = result_data.get("path", "目标文件")
summary = result_data.get("summary") or result_data.get("message")
completed = result_data.get("completed") or []
failed_blocks = result_data.get("failed") or []
lines = [f"[文件补丁] {path}"]
if summary:
lines.append(summary)
if completed:
lines.append(f"✅ 成功块: {', '.join(str(i) for i in completed)}")
if failed_blocks:
fail_descriptions = []
for item in failed_blocks[:3]:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
fail_descriptions.append(f"#{idx}: {reason}")
lines.append("⚠️ 失败块: " + "".join(fail_descriptions))
if len(failed_blocks) > 3:
lines.append(f"(其余 {len(failed_blocks) - 3} 个失败块略)")
detail_sections: List[str] = []
for item in failed_blocks:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
block_patch = item.get("block_patch") or item.get("patch")
if not block_patch:
old_text = item.get("old_text") or ""
new_text = item.get("new_text") or ""
synthetic_lines: List[str] = []
if old_text:
synthetic_lines.extend(f"-{line}" for line in old_text.splitlines())
if new_text:
synthetic_lines.extend(f"+{line}" for line in new_text.splitlines())
if synthetic_lines:
block_patch = "\n".join(synthetic_lines)
detail_sections.append(f"- #{idx}: {reason}")
if block_patch:
detail_sections.append("```diff")
detail_sections.append(block_patch.rstrip("\n"))
detail_sections.append("```")
detail_sections.append("")
if detail_sections and detail_sections[-1] == "":
detail_sections.pop()
if detail_sections:
lines.append("⚠️ 失败块详情:")
lines.extend(detail_sections)
if result_data.get("success") is False and result_data.get("error"):
lines.append(f"⚠️ 错误: {result_data.get('error')}")
formatted = "\n".join(line for line in lines if line)
return formatted or raw_text
def _format_create_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_file", result_data)
return result_data.get("message") or f"已创建空文件: {result_data.get('path', '未知路径')}"
def _format_delete_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("delete_file", result_data)
path = result_data.get("path") or "未知路径"
action = result_data.get("action") or "deleted"
return f"{action}文件: {path}"
def _format_rename_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("rename_file", result_data)
old_path = result_data.get("old_path") or "旧路径未知"
new_path = result_data.get("new_path") or "新路径未知"
return f"已重命名: {old_path} -> {new_path}"
def _format_create_folder(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_folder", result_data)
return f"已创建文件夹: {result_data.get('path', '未知路径')}"
def _format_focus_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("focus_file", result_data)
message = result_data.get("message") or "文件已聚焦"
size = result_data.get("file_size")
size_note = f"{size} 字符)" if isinstance(size, int) else ""
focused = result_data.get("focused_files") or []
focused_note = f"当前聚焦: {', '.join(focused)}" if focused else "当前没有其他聚焦文件"
return f"{message}{size_note}\n{focused_note}"
def _format_unfocus_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("unfocus_file", result_data)
message = result_data.get("message") or "已取消聚焦"
remaining = result_data.get("remaining_focused") or []
remain_note = f"剩余聚焦: {', '.join(remaining)}" if remaining else "当前没有聚焦文件"
return f"{message}\n{remain_note}"
def _format_terminal_session(result_data: Dict[str, Any]) -> str:
action = result_data.get("action") or result_data.get("terminal_action") or "未知操作"
tag = f"terminal_session[{action}]"
if not result_data.get("success"):
return _format_failure(tag, result_data)
if action == "open":
return (
f"终端 {result_data.get('session')} 已打开,工作目录 {result_data.get('working_dir')}"
f"当前活动会话: {result_data.get('session')}(共 {result_data.get('total_sessions')} 个)"
)
if action == "close":
new_active = result_data.get("new_active") or ""
remaining = result_data.get("remaining_sessions") or []
return (
f"终端 {result_data.get('session')} 已关闭,新的活动会话: {new_active}"
f"剩余会话: {', '.join(remaining) if remaining else ''}"
)
if action == "switch":
previous = result_data.get("previous") or ""
current = result_data.get("current") or "未知"
return f"终端已从 {previous} 切换到 {current}"
if action == "list":
sessions = result_data.get("sessions") or []
total = result_data.get("total", len(sessions))
max_allowed = result_data.get("max_allowed")
active = result_data.get("active") or ""
header = f"共有 {total}/{max_allowed} 个终端会话,活动会话: {active}"
session_lines = []
for session in sessions:
name = session.get("session_name") or session.get("name") or "未命名"
state = "运行中" if session.get("is_running") else "已停止"
marker = "" if session.get("is_active") else " "
working_dir = session.get("working_dir") or "未知目录"
session_lines.append(f"{marker} {name} | {state} | {working_dir}")
return "\n".join([header] + session_lines) if session_lines else header
return result_data.get("message") or f"{tag} 操作已完成。"
def _format_terminal_input(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("terminal_input", result_data)
session = result_data.get("session") or result_data.get("session_name") or "default"
command = result_data.get("command") or "(命令缺失)"
status = result_data.get("status") or "completed"
message = result_data.get("message") or ""
lines = [
f"terminal_input: 在 {session} 执行 `{command}`,状态 {status}",
]
if message:
lines.append(message)
lines.append(_summarize_output_block(result_data.get("output"), result_data.get("truncated")))
return "\n".join(lines)
def _format_sleep(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("sleep", result_data)
reason = result_data.get("reason")
timestamp = result_data.get("timestamp")
message = result_data.get("message") or "等待完成"
parts = [message]
if reason:
parts.append(f"原因:{reason}")
if timestamp:
parts.append(f"时间:{timestamp}")
return "".join(parts)
def _format_run_command(result_data: Dict[str, Any]) -> str:
return _format_command_result("run_command", result_data)
def _format_run_python(result_data: Dict[str, Any]) -> str:
base = _format_command_result("run_python", result_data)
code = result_data.get("code")
if not isinstance(code, str):
return base
header = f"run_python: 执行临时代码({len(code)} 字符)"
return "\n".join([header, base])
def _format_todo_create(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("todo_create", result_data)
todo = (result_data.get("todo_list") or {}).copy()
overview = todo.get("overview") or "未命名任务"
tasks = _summarize_todo_tasks(todo)
return f"已创建 TODO{overview}\n{tasks}"
def _format_todo_update_task(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("todo_update_task", result_data)
message = result_data.get("message") or "任务状态已更新"
tasks = _summarize_todo_tasks(result_data.get("todo_list"))
return f"{message}\n{tasks}" if tasks else message
def _format_todo_finish(result_data: Dict[str, Any]) -> str:
if result_data.get("success"):
message = result_data.get("message") or "待办列表已结束"
tasks = _summarize_todo_tasks(result_data.get("todo_list"))
return f"{message}\n{tasks}" if tasks else message
if result_data.get("requires_confirmation"):
remaining = result_data.get("remaining") or []
remain_note = ", ".join(remaining) if remaining else "未知"
return f"仍有未完成任务({remain_note}),需要确认是否提前结束。"
return _format_failure("todo_finish", result_data)
def _format_todo_finish_confirm(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("todo_finish_confirm", result_data)
message = result_data.get("message") or "已处理 TODO 完结确认"
todo = result_data.get("todo_list") or {}
if todo.get("forced_finish"):
reason = todo.get("forced_reason") or "未提供原因"
message = f"{message}(强制结束,原因:{reason}"
return message
def _format_update_memory(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("update_memory", result_data)
mem_type = result_data.get("memory_type") or "main"
operation = result_data.get("operation") or "write"
verb = "追加" if operation == "append" else "覆盖"
label = "主记忆" if mem_type == "main" else "任务记忆"
return f"{label}{verb}完成。"
def _format_create_sub_agent(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_sub_agent", result_data)
agent_id = result_data.get("agent_id")
task_id = result_data.get("task_id")
status = result_data.get("status")
refs = result_data.get("copied_references") or []
ref_note = f",附带 {len(refs)} 份参考文件" if refs else ""
deliver_dir = result_data.get("deliverables_dir")
deliver_note = f",交付目录: {deliver_dir}" if deliver_dir else ""
return f"子智能体 #{agent_id} 已创建task_id={task_id},状态 {status}{ref_note}{deliver_note})。"
def _format_wait_sub_agent(result_data: Dict[str, Any]) -> str:
task_id = result_data.get("task_id")
agent_id = result_data.get("agent_id")
status = result_data.get("status")
if result_data.get("success"):
copied_path = result_data.get("copied_path") or result_data.get("deliverables_path")
message = result_data.get("message") or "子智能体任务已完成。"
deliver_note = f"交付已复制到 {copied_path}" if copied_path else "交付目录已生成"
return f"子智能体 #{agent_id}/{task_id} 完成:{message}{deliver_note}"
message = result_data.get("message") or result_data.get("error") or "子智能体任务失败"
return f"⚠️ 子智能体 #{agent_id}/{task_id} 状态 {status}: {message}"
def _format_close_sub_agent(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("close_sub_agent", result_data)
message = result_data.get("message") or "子智能体已关闭。"
task_id = result_data.get("task_id")
status = result_data.get("status")
status_note = f"(状态 {status}" if status else ""
return f"{message}{status_note}task_id={task_id}"
def _format_failure(tag: str, result_data: Dict[str, Any]) -> str:
error = result_data.get("error") or result_data.get("message") or "未知错误"
suggestion = result_data.get("suggestion")
details = result_data.get("details")
parts = [f"⚠️ {tag} 失败: {error}"]
if suggestion:
parts.append(f"建议:{suggestion}")
elif isinstance(details, str) and details:
parts.append(f"详情:{details}")
elif isinstance(details, dict):
detail_msg = details.get("message") or details.get("error")
if detail_msg:
parts.append(f"详情:{detail_msg}")
return "".join(parts)
def _summarize_output_block(output: Optional[str], truncated: Optional[bool]) -> str:
if not output:
return "无可见输出"
lines = output.splitlines()
line_count = len(lines)
char_count = len(output)
meta = f"输出 {line_count} 行 / {char_count} 字符"
if truncated:
meta += "(已截断)"
return f"{meta}\n```\n{output}\n```"
def _format_command_result(label: str, result_data: Dict[str, Any]) -> str:
command = result_data.get("command") or ""
return_code = result_data.get("return_code")
success = result_data.get("success")
status = result_data.get("status")
output = result_data.get("output")
truncated = result_data.get("truncated")
message = result_data.get("message")
if success:
header = f"{label}: `{command}`" if command else label
if return_code is not None and return_code != "":
header += f" (return_code={return_code})"
lines = [header]
if status and status not in {"completed", "success"}:
lines.append(f"终端状态: {status}")
if message:
lines.append(message)
lines.append(_summarize_output_block(output, truncated))
return "\n".join(lines)
error_msg = result_data.get("error") or message or "执行失败"
header = f"⚠️ {label} 失败"
if command:
header += f"(命令 `{command}`"
lines = [f"{header}: {error_msg}"]
if return_code not in {None, ""}:
lines.append(f"返回码: {return_code}")
if output:
lines.append(_summarize_output_block(output, truncated))
return "\n".join(lines)
def _summarize_todo_tasks(todo: Optional[Dict[str, Any]]) -> str:
if not isinstance(todo, dict):
return ""
tasks = todo.get("tasks") or []
parts = []
for task in tasks:
status_icon = "" if task.get("status") == "done" else "⬜️"
parts.append(f"{status_icon} task{task.get('index')}: {task.get('title')}")
return "".join(parts)
TOOL_FORMATTERS = {
"create_file": _format_create_file,
"delete_file": _format_delete_file,
"rename_file": _format_rename_file,
"create_folder": _format_create_folder,
"focus_file": _format_focus_file,
"unfocus_file": _format_unfocus_file,
"terminal_session": _format_terminal_session,
"terminal_input": _format_terminal_input,
"sleep": _format_sleep,
"run_command": _format_run_command,
"run_python": _format_run_python,
"todo_create": _format_todo_create,
"todo_update_task": _format_todo_update_task,
"todo_finish": _format_todo_finish,
"todo_finish_confirm": _format_todo_finish_confirm,
"update_memory": _format_update_memory,
"create_sub_agent": _format_create_sub_agent,
"wait_sub_agent": _format_wait_sub_agent,
"close_sub_agent": _format_close_sub_agent,
}

View File

@ -60,6 +60,7 @@ from modules.personalization_manager import (
)
from modules.user_container_manager import UserContainerManager
from modules.usage_tracker import UsageTracker
from utils.tool_result_formatter import format_tool_result_for_context
app = Flask(__name__, static_folder='static')
app.config['MAX_CONTENT_LENGTH'] = MAX_UPLOAD_SIZE
@ -123,65 +124,6 @@ FAILED_LOGIN_LOCK_SECONDS = 300
SOCKET_TOKEN_TTL_SECONDS = 45
def format_read_file_result(result_data: Dict) -> str:
"""格式化 read_file 工具的输出便于在Web端展示。"""
if not isinstance(result_data, dict):
return json.dumps(result_data, ensure_ascii=False)
if not result_data.get("success"):
return json.dumps(result_data, ensure_ascii=False)
read_type = result_data.get("type", "read")
truncated_note = "(内容已截断)" if result_data.get("truncated") else ""
path = result_data.get("path", "未知路径")
max_chars = result_data.get("max_chars")
max_note = f"(max_chars={max_chars})" if max_chars else ""
if read_type == "read":
header = f"读取 {path}{result_data.get('line_start')}~{result_data.get('line_end')} {max_note}{truncated_note}".strip()
content = result_data.get("content", "")
return f"{header}\n```\n{content}\n```"
if read_type == "search":
query = result_data.get("query", "")
actual = result_data.get("actual_matches", 0)
returned = result_data.get("returned_matches", 0)
case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写"
header = (
f"{path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint} "
f"{max_note}{truncated_note}"
).strip()
match_texts = []
for idx, match in enumerate(result_data.get("matches", []), 1):
match_note = "(片段截断)" if match.get("truncated") else ""
hits = match.get("hits") or []
hit_text = ", ".join(str(h) for h in hits) if hits else ""
label = match.get("id") or f"match_{idx}"
snippet = match.get("snippet", "")
match_texts.append(
f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```"
)
if not match_texts:
match_texts.append("未找到匹配内容。")
return "\n".join([header] + match_texts)
if read_type == "extract":
segments = result_data.get("segments", [])
header = f"{path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip()
seg_texts = []
for idx, segment in enumerate(segments, 1):
seg_note = "(片段截断)" if segment.get("truncated") else ""
label = segment.get("label") or f"segment_{idx}"
snippet = segment.get("content", "")
seg_texts.append(
f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```"
)
if not seg_texts:
seg_texts.append("未提供可抽取的片段。")
return "\n".join([header] + seg_texts)
return json.dumps(result_data, ensure_ascii=False)
def sanitize_filename_preserve_unicode(filename: str) -> str:
"""在保留中文等字符的同时,移除危险字符和路径成分"""
if not filename:
@ -358,73 +300,6 @@ def format_tool_result_notice(tool_name: str, tool_call_id: Optional[str], conte
body = "(无附加输出)"
return f"{header}\n{body}"
def format_tool_result_for_context(function_name: str, result_data: Any, raw_text: str) -> str:
"""将工具结果转成适合写入对话的简洁文本。"""
if function_name == "read_file" and isinstance(result_data, dict):
return format_read_file_result(result_data)
if function_name == "write_file_diff" and isinstance(result_data, dict):
path = result_data.get("path", "目标文件")
summary = result_data.get("summary") or result_data.get("message")
completed = result_data.get("completed") or []
failed_blocks = result_data.get("failed") or []
lines = [f"[文件补丁] {path}"]
if summary:
lines.append(summary)
if completed:
lines.append(f"✅ 成功块: {', '.join(str(i) for i in completed)}")
if failed_blocks:
fail_descriptions = []
for item in failed_blocks[:3]:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
fail_descriptions.append(f"#{idx}: {reason}")
lines.append("⚠️ 失败块: " + "".join(fail_descriptions))
if len(failed_blocks) > 3:
lines.append(f"(其余 {len(failed_blocks) - 3} 个失败块略)")
detail_sections: List[str] = []
for item in failed_blocks:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
block_patch = item.get("block_patch") or item.get("patch")
if not block_patch:
old_text = item.get("old_text") or ""
new_text = item.get("new_text") or ""
synthetic_lines: List[str] = []
if old_text:
synthetic_lines.extend(f"-{line}" for line in old_text.splitlines())
if new_text:
synthetic_lines.extend(f"+{line}" for line in new_text.splitlines())
if synthetic_lines:
block_patch = "\n".join(synthetic_lines)
detail_sections.append(f"- #{idx}: {reason}")
if block_patch:
detail_sections.append("```diff")
detail_sections.append(block_patch.rstrip("\n"))
detail_sections.append("```")
detail_sections.append("")
if detail_sections and detail_sections[-1] == "":
detail_sections.pop()
if detail_sections:
lines.append("⚠️ 失败块详情:")
lines.extend(detail_sections)
if result_data.get("success") is False and result_data.get("error"):
lines.append(f"⚠️ 错误: {result_data.get('error')}")
formatted = "\n".join(line for line in lines if line)
return formatted or raw_text
if isinstance(result_data, dict):
parts = []
summary = result_data.get("summary") or result_data.get("message")
if summary:
parts.append(str(summary))
error_msg = result_data.get("error")
if error_msg:
parts.append(f"⚠️ 错误: {error_msg}")
if parts:
return "\n".join(parts)
return raw_text
# 创建调试日志文件
DEBUG_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "debug_stream.log"
CHUNK_BACKEND_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "chunk_backend.log"