agent-Specialization/utils/tool_result_formatter.py

573 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""将工具执行结果转换为对话上下文可用的纯文本摘要。"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
def format_read_file_result(result_data: Dict[str, Any]) -> str:
"""格式化 read_file 工具的输出,兼容读取/搜索/抽取模式。"""
if not isinstance(result_data, dict):
return str(result_data)
if not result_data.get("success"):
return _format_failure("read_file", result_data)
read_type = result_data.get("type", "read")
truncated_note = "(内容已截断)" if result_data.get("truncated") else ""
path = result_data.get("path", "未知路径")
max_chars = result_data.get("max_chars")
max_note = f"(max_chars={max_chars})" if max_chars else ""
if read_type == "read":
header = (
f"读取 {path}{result_data.get('line_start')}~{result_data.get('line_end')} "
f"{max_note}{truncated_note}"
).strip()
content = result_data.get("content", "")
return f"{header}\n```\n{content}\n```"
if read_type == "search":
query = result_data.get("query", "")
actual = result_data.get("actual_matches", 0)
returned = result_data.get("returned_matches", 0)
case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写"
header = (
f"{path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint} "
f"{max_note}{truncated_note}"
).strip()
match_texts: List[str] = []
for idx, match in enumerate(result_data.get("matches", []), 1):
match_note = "(片段截断)" if match.get("truncated") else ""
hits = match.get("hits") or []
hit_text = ", ".join(str(h) for h in hits) if hits else ""
label = match.get("id") or f"match_{idx}"
snippet = match.get("snippet", "")
match_texts.append(
f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```"
)
if not match_texts:
match_texts.append("未找到匹配内容。")
return "\n".join([header] + match_texts)
if read_type == "extract":
segments = result_data.get("segments", [])
header = f"{path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip()
seg_texts: List[str] = []
for idx, segment in enumerate(segments, 1):
seg_note = "(片段截断)" if segment.get("truncated") else ""
label = segment.get("label") or f"segment_{idx}"
snippet = segment.get("content", "")
seg_texts.append(
f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```"
)
if not seg_texts:
seg_texts.append("未提供可抽取的片段。")
return "\n".join([header] + seg_texts)
return _format_failure("read_file", {"error": "不支持的读取模式"})
def format_tool_result_for_context(function_name: str, result_data: Any, raw_text: str = "") -> str:
"""根据工具名称输出纯文本摘要,必要时附加关键信息。"""
if function_name == "read_file" and isinstance(result_data, dict):
return format_read_file_result(result_data)
if function_name == "write_file_diff" and isinstance(result_data, dict):
return _format_write_file_diff(result_data, raw_text)
if not isinstance(result_data, dict):
return raw_text
handler = TOOL_FORMATTERS.get(function_name)
if handler:
return handler(result_data)
summary = result_data.get("summary") or result_data.get("message")
error_msg = result_data.get("error")
parts: List[str] = []
if summary:
parts.append(str(summary))
if error_msg:
parts.append(f"⚠️ 错误: {error_msg}")
return "\n".join(parts) if parts else raw_text
def _format_write_file_diff(result_data: Dict[str, Any], raw_text: str) -> str:
path = result_data.get("path", "目标文件")
summary = result_data.get("summary") or result_data.get("message")
completed = result_data.get("completed") or []
failed_blocks = result_data.get("failed") or []
success_blocks = result_data.get("blocks") or []
lines = [f"[文件补丁] {path}"]
if summary:
lines.append(summary)
if completed:
lines.append(f"✅ 成功块: {', '.join(str(i) for i in completed)}")
if failed_blocks:
fail_descriptions = []
for item in failed_blocks[:3]:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
fail_descriptions.append(f"#{idx}: {reason}")
lines.append("⚠️ 失败块: " + "".join(fail_descriptions))
if len(failed_blocks) > 3:
lines.append(f"(其余 {len(failed_blocks) - 3} 个失败块略)")
# 通用排查提示:把最常见的坑点一次性说清楚,减少来回沟通成本
lines.append("🔎 排查提示(常见易错点):")
lines.append("- 是否把“要新增/要删除/要替换”的每一行都标了 `+` 或 `-`?(漏标会被当成上下文/锚点)")
lines.append("- 空行也要写成单独一行的 `+`(只有 `+` 和换行),否则空行会消失或被当成上下文导致匹配失败。")
lines.append("- 若目标文件是空文件:应使用“仅追加”写法(块内只有 `+` 行,不要混入未加前缀的正文)。")
lines.append("- 若希望在文件中间插入/替换:必须提供足够的上下文行(以空格开头)或删除行(`-`)来锚定位置,不能只贴 `+`。")
lines.append("- 是否存在空格/Tab/缩进差异、全角半角标点差异、大小写差异?上下文与原文必须字节级一致。")
lines.append("- 是否是 CRLF(\\r\\n) 与 LF(\\n) 混用导致原文匹配失败?可先用终端查看/统一换行后再补丁。")
lines.append("- 是否遗漏 `*** Begin Patch`/`*** End Patch` 或在第一个 `@@` 之前写了其它内容?")
detail_sections: List[str] = []
for item in failed_blocks:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
hint = item.get("hint")
block_patch = item.get("block_patch") or item.get("patch")
# 自动判别常见错误形态,便于快速定位问题
diagnostics = _classify_diff_block_issue(item, result_data)
if not block_patch:
old_text = item.get("old_text") or ""
new_text = item.get("new_text") or ""
synthetic_lines: List[str] = []
if old_text:
synthetic_lines.extend(f"-{line}" for line in old_text.splitlines())
if new_text:
synthetic_lines.extend(f"+{line}" for line in new_text.splitlines())
if synthetic_lines:
block_patch = "\n".join(synthetic_lines)
detail_sections.append(f"- #{idx}: {reason}")
if diagnostics:
detail_sections.append(f" 错误类型: {diagnostics}")
if hint:
detail_sections.append(f" 提示: {hint}")
if block_patch:
detail_sections.append("```diff")
detail_sections.append(block_patch.rstrip("\n"))
detail_sections.append("```")
detail_sections.append("")
if detail_sections and detail_sections[-1] == "":
detail_sections.pop()
if detail_sections:
lines.append("⚠️ 失败块详情:")
lines.extend(detail_sections)
# 对“成功块”做轻量体检:如果检测到潜在格式风险,给出风险提示(不影响 success 判定)
risk_sections: List[str] = []
for item in success_blocks:
if not isinstance(item, dict):
continue
status = item.get("status")
idx = item.get("index")
if status != "success":
continue
diag = _classify_diff_block_issue(item, result_data)
if diag:
risk_sections.append(f"- #{idx}: {diag}")
if risk_sections:
lines.append("⚠️ 风险提示(补丁虽成功但格式可能有隐患):")
lines.extend(risk_sections)
if result_data.get("success") is False and result_data.get("error"):
lines.append(f"⚠️ 错误: {result_data.get('error')}")
formatted = "\n".join(line for line in lines if line)
return formatted or raw_text
def _format_create_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_file", result_data)
return result_data.get("message") or f"已创建空文件: {result_data.get('path', '未知路径')}"
def _classify_diff_block_issue(block: Dict[str, Any], result_data: Dict[str, Any]) -> str:
"""
针对 write_file_diff 常见的“离谱/易错”用法做启发式判别,返回简短错误类型说明。
不改变后端逻辑,只用于提示。
"""
patch_text = block.get("block_patch") or block.get("patch") or ""
lines = patch_text.splitlines()
plus = sum(1 for ln in lines if ln.startswith("+"))
minus = sum(1 for ln in lines if ln.startswith("-"))
context = sum(1 for ln in lines if ln.startswith(" "))
total = len([ln for ln in lines if ln.strip() != ""])
reasons: List[str] = []
# 1) 完全没加 + / - :最常见的“把目标当上下文”
if total > 0 and plus == 0 and minus == 0:
reasons.append("缺少 + / -,整块被当作上下文,无法定位到文件")
# 2) 全是 + 且没有上下文/删除:解析为“纯追加”,若目标非末尾插入会失败
if plus > 0 and minus == 0 and context == 0:
reasons.append("仅包含 + 行,被视为追加块;若想中间插入/替换需提供上下文或 -")
# 3) 没有上下文或删除行却不是 append_only多数是漏写空格前缀
if block.get("append_only") and (context > 0 or minus > 0):
reasons.append("块被解析为追加模式,但混入了上下文/删除行,可能写法不一致")
# 4) 未找到匹配时,提示检查空格/缩进/全角半角/换行差异
reason_text = (block.get("reason") or "").lower()
if "未找到匹配" in reason_text:
reasons.append("上下文未匹配:检查空格/缩进、全角半角、CRLF/LF、大小写是否与原文完全一致")
# 5) 空行未加 '+' 的典型情形:
# a) 有空白行但整块没有前缀(此前已由 #1 捕获),仍补充提示
# b) 有空白行且块中存在 + / -,说明空行漏写前缀,易导致上下文匹配失败
if any(ln == "" or ln.strip() == "" for ln in lines):
if plus == 0 and minus == 0:
reasons.append("空行未写 `+`,被当作上下文,建议空行写成单独一行 `+`")
else:
reasons.append("空行未写 `+`(或空格上下文),混入补丁时会被当作上下文,建议空行单独写成 `+`")
return "".join(reasons)
def _format_delete_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("delete_file", result_data)
path = result_data.get("path") or "未知路径"
action = result_data.get("action") or "deleted"
return f"{action}文件: {path}"
def _format_rename_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("rename_file", result_data)
old_path = result_data.get("old_path") or "旧路径未知"
new_path = result_data.get("new_path") or "新路径未知"
return f"已重命名: {old_path} -> {new_path}"
def _format_create_folder(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_folder", result_data)
return f"已创建文件夹: {result_data.get('path', '未知路径')}"
def _format_focus_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("focus_file", result_data)
message = result_data.get("message") or "文件已聚焦"
size = result_data.get("file_size")
size_note = f"{size} 字符)" if isinstance(size, int) else ""
focused = result_data.get("focused_files") or []
focused_note = f"当前聚焦: {', '.join(focused)}" if focused else "当前没有其他聚焦文件"
return f"{message}{size_note}\n{focused_note}"
def _format_unfocus_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("unfocus_file", result_data)
message = result_data.get("message") or "已取消聚焦"
remaining = result_data.get("remaining_focused") or []
remain_note = f"剩余聚焦: {', '.join(remaining)}" if remaining else "当前没有聚焦文件"
return f"{message}\n{remain_note}"
def _format_terminal_session(result_data: Dict[str, Any]) -> str:
action = result_data.get("action") or result_data.get("terminal_action") or "未知操作"
tag = f"terminal_session[{action}]"
if not result_data.get("success"):
return _format_failure(tag, result_data)
if action == "open":
return (
f"终端 {result_data.get('session')} 已打开,工作目录 {result_data.get('working_dir')}"
f"当前活动会话: {result_data.get('session')}(共 {result_data.get('total_sessions')} 个)"
)
if action == "close":
new_active = result_data.get("new_active") or ""
remaining = result_data.get("remaining_sessions") or []
return (
f"终端 {result_data.get('session')} 已关闭,新的活动会话: {new_active}"
f"剩余会话: {', '.join(remaining) if remaining else ''}"
)
if action == "switch":
previous = result_data.get("previous") or ""
current = result_data.get("current") or "未知"
return f"终端已从 {previous} 切换到 {current}"
if action == "list":
sessions = result_data.get("sessions") or []
total = result_data.get("total", len(sessions))
max_allowed = result_data.get("max_allowed")
active = result_data.get("active") or ""
header = f"共有 {total}/{max_allowed} 个终端会话,活动会话: {active}"
session_lines = []
for session in sessions:
name = session.get("session_name") or session.get("name") or "未命名"
state = "运行中" if session.get("is_running") else "已停止"
marker = "" if session.get("is_active") else " "
working_dir = session.get("working_dir") or "未知目录"
session_lines.append(f"{marker} {name} | {state} | {working_dir}")
return "\n".join([header] + session_lines) if session_lines else header
return result_data.get("message") or f"{tag} 操作已完成。"
def _plain_command_output(result_data: Dict[str, Any]) -> str:
"""生成纯文本输出,按需要加状态前缀。"""
output = result_data.get("output") or ""
status = (result_data.get("status") or "").lower()
timeout = result_data.get("timeout")
return_code = result_data.get("return_code")
truncated = result_data.get("truncated")
error = result_data.get("error")
message = result_data.get("message")
prefixes = []
if status in {"timeout"} and timeout:
prefixes.append(f"[timeout after {int(timeout)}s]")
elif status in {"timeout"}:
prefixes.append("[timeout]")
elif status in {"killed"}:
prefixes.append("[killed]")
elif status in {"awaiting_input"}:
prefixes.append("[awaiting_input]")
elif status in {"no_output"} and not output:
prefixes.append("[no_output]")
elif status in {"error"} and return_code is not None:
prefixes.append(f"[error rc={return_code}]")
elif status in {"error"}:
prefixes.append("[error]")
if truncated:
prefixes.append("[truncated]")
# 如果执行失败且没有输出,优先显示错误信息
if not result_data.get("success") and not output:
err_text = error or message
if err_text:
prefix_text = "".join(prefixes) if prefixes else "[error]"
return f"{prefix_text} {err_text}" if prefix_text else err_text
# 没有错误文本则仍走后面的输出逻辑(可能显示 no_output
prefix_text = "".join(prefixes)
if prefix_text and output:
return f"{prefix_text}\n{output}"
if prefix_text:
return prefix_text
if not output:
return "[no_output]"
return output
def _format_terminal_input(result_data: Dict[str, Any]) -> str:
return _plain_command_output(result_data)
def _format_sleep(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("sleep", result_data)
reason = result_data.get("reason")
timestamp = result_data.get("timestamp")
message = result_data.get("message") or "等待完成"
parts = [message]
if reason:
parts.append(f"原因:{reason}")
if timestamp:
parts.append(f"时间:{timestamp}")
return "".join(parts)
def _format_run_command(result_data: Dict[str, Any]) -> str:
text = _plain_command_output(result_data)
if (result_data.get("status") or "").lower() == "timeout":
suggestion = "建议在持久终端中直接运行该命令terminal_session + terminal_input或缩短命令执行时间。"
text = f"{text}\n{suggestion}" if text else suggestion
return text
def _format_run_python(result_data: Dict[str, Any]) -> str:
text = _plain_command_output(result_data)
if (result_data.get("status") or "").lower() == "timeout":
suggestion = "建议将代码保存为脚本后在持久终端中执行terminal_session + terminal_input或拆分/优化代码以缩短运行时间。"
text = f"{text}\n{suggestion}" if text else suggestion
return text
def _format_todo_create(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("todo_create", result_data)
todo = (result_data.get("todo_list") or {}).copy()
overview = todo.get("overview") or "未命名任务"
tasks = _summarize_todo_tasks(todo)
return f"已创建 TODO{overview}\n{tasks}"
def _format_todo_update_task(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("todo_update_task", result_data)
message = result_data.get("message") or "任务状态已更新"
tasks = _summarize_todo_tasks(result_data.get("todo_list"))
return f"{message}\n{tasks}" if tasks else message
def _format_todo_finish(result_data: Dict[str, Any]) -> str:
if result_data.get("success"):
message = result_data.get("message") or "待办列表已结束"
tasks = _summarize_todo_tasks(result_data.get("todo_list"))
return f"{message}\n{tasks}" if tasks else message
if result_data.get("requires_confirmation"):
remaining = result_data.get("remaining") or []
remain_note = ", ".join(remaining) if remaining else "未知"
return f"仍有未完成任务({remain_note}),需要确认是否提前结束。"
return _format_failure("todo_finish", result_data)
def _format_todo_finish_confirm(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("todo_finish_confirm", result_data)
message = result_data.get("message") or "已处理 TODO 完结确认"
todo = result_data.get("todo_list") or {}
if todo.get("forced_finish"):
reason = todo.get("forced_reason") or "未提供原因"
message = f"{message}(强制结束,原因:{reason}"
return message
def _format_update_memory(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("update_memory", result_data)
mem_type = result_data.get("memory_type") or "main"
operation = result_data.get("operation") or "write"
label = "主记忆" if mem_type == "main" else "任务记忆"
idx = result_data.get("index")
count = result_data.get("count")
if operation == "append":
suffix = f"(共 {count} 条)" if count is not None else ""
return f"{label}已追加新条目{suffix}"
if operation == "replace":
return f"{label}{idx} 条已替换。"
if operation == "delete":
suffix = f"(剩余 {count} 条)" if count is not None else ""
return f"{label}{idx} 条已删除{suffix}"
return f"{label}已更新。"
def _format_create_sub_agent(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_sub_agent", result_data)
agent_id = result_data.get("agent_id")
task_id = result_data.get("task_id")
status = result_data.get("status")
refs = result_data.get("copied_references") or []
ref_note = f",附带 {len(refs)} 份参考文件" if refs else ""
deliver_dir = result_data.get("deliverables_dir")
deliver_note = f",交付目录: {deliver_dir}" if deliver_dir else ""
return f"子智能体 #{agent_id} 已创建task_id={task_id},状态 {status}{ref_note}{deliver_note})。"
def _format_wait_sub_agent(result_data: Dict[str, Any]) -> str:
task_id = result_data.get("task_id")
agent_id = result_data.get("agent_id")
status = result_data.get("status")
if result_data.get("success"):
copied_path = result_data.get("copied_path") or result_data.get("deliverables_path")
message = result_data.get("message") or "子智能体任务已完成。"
deliver_note = f"交付已复制到 {copied_path}" if copied_path else "交付目录已生成"
return f"子智能体 #{agent_id}/{task_id} 完成:{message}{deliver_note}"
message = result_data.get("message") or result_data.get("error") or "子智能体任务失败"
return f"⚠️ 子智能体 #{agent_id}/{task_id} 状态 {status}: {message}"
def _format_close_sub_agent(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("close_sub_agent", result_data)
message = result_data.get("message") or "子智能体已关闭。"
task_id = result_data.get("task_id")
status = result_data.get("status")
status_note = f"(状态 {status}" if status else ""
return f"{message}{status_note}task_id={task_id}"
def _format_failure(tag: str, result_data: Dict[str, Any]) -> str:
error = result_data.get("error") or result_data.get("message") or "未知错误"
suggestion = result_data.get("suggestion")
details = result_data.get("details")
parts = [f"⚠️ {tag} 失败: {error}"]
if suggestion:
parts.append(f"建议:{suggestion}")
elif isinstance(details, str) and details:
parts.append(f"详情:{details}")
elif isinstance(details, dict):
detail_msg = details.get("message") or details.get("error")
if detail_msg:
parts.append(f"详情:{detail_msg}")
return "".join(parts)
def _summarize_output_block(output: Optional[str], truncated: Optional[bool]) -> str:
if not output:
return "无可见输出"
lines = output.splitlines()
line_count = len(lines)
char_count = len(output)
meta = f"输出 {line_count} 行 / {char_count} 字符"
if truncated:
meta += "(已截断)"
return f"{meta}\n```\n{output}\n```"
def _format_command_result(label: str, result_data: Dict[str, Any]) -> str:
command = result_data.get("command") or ""
return_code = result_data.get("return_code")
success = result_data.get("success")
status = result_data.get("status")
output = result_data.get("output")
truncated = result_data.get("truncated")
message = result_data.get("message")
if success:
header = f"{label}: `{command}`" if command else label
if return_code is not None and return_code != "":
header += f" (return_code={return_code})"
lines = [header]
if status and status not in {"completed", "success"}:
lines.append(f"终端状态: {status}")
if message:
lines.append(message)
lines.append(_summarize_output_block(output, truncated))
return "\n".join(lines)
error_msg = result_data.get("error") or message or "执行失败"
header = f"⚠️ {label} 失败"
if command:
header += f"(命令 `{command}`"
lines = [f"{header}: {error_msg}"]
if return_code not in {None, ""}:
lines.append(f"返回码: {return_code}")
if output:
lines.append(_summarize_output_block(output, truncated))
return "\n".join(lines)
def _summarize_todo_tasks(todo: Optional[Dict[str, Any]]) -> str:
if not isinstance(todo, dict):
return ""
tasks = todo.get("tasks") or []
parts = []
for task in tasks:
status_icon = "" if task.get("status") == "done" else "⬜️"
parts.append(f"{status_icon} task{task.get('index')}: {task.get('title')}")
return "".join(parts)
TOOL_FORMATTERS = {
"create_file": _format_create_file,
"delete_file": _format_delete_file,
"rename_file": _format_rename_file,
"create_folder": _format_create_folder,
"focus_file": _format_focus_file,
"unfocus_file": _format_unfocus_file,
"terminal_session": _format_terminal_session,
"terminal_input": _format_terminal_input,
"sleep": _format_sleep,
"run_command": _format_run_command,
"run_python": _format_run_python,
"todo_create": _format_todo_create,
"todo_update_task": _format_todo_update_task,
"todo_finish": _format_todo_finish,
"todo_finish_confirm": _format_todo_finish_confirm,
"update_memory": _format_update_memory,
"create_sub_agent": _format_create_sub_agent,
"wait_sub_agent": _format_wait_sub_agent,
"close_sub_agent": _format_close_sub_agent,
}