agent-Specialization/utils/tool_result_formatter.py

454 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""将工具执行结果转换为对话上下文可用的纯文本摘要。"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
def format_read_file_result(result_data: Dict[str, Any]) -> str:
    """Format the output of the ``read_file`` tool as plain text.

    The result shape is selected by ``result_data["type"]``: ``"read"``
    (the default), ``"search"`` or ``"extract"``.

    Args:
        result_data: Result payload of the tool. A non-dict value is simply
            converted with ``str()``.

    Returns:
        A text summary in which file content and snippets are wrapped in
        fenced blocks. Unsuccessful results and unknown read types are
        rendered through ``_format_failure``.
    """
    if not isinstance(result_data, dict):
        return str(result_data)
    if not result_data.get("success"):
        return _format_failure("read_file", result_data)

    read_type = result_data.get("type", "read")
    truncated_note = "(内容已截断)" if result_data.get("truncated") else ""
    path = result_data.get("path", "未知路径")
    max_chars = result_data.get("max_chars")
    max_note = f"(max_chars={max_chars})" if max_chars else ""

    if read_type == "read":
        # Separate the path from the line range; without the separator a
        # path such as "a.py" followed by line 1 reads as "a.py1".
        header = (
            f"读取 {path} 行 {result_data.get('line_start')}~{result_data.get('line_end')} "
            f"{max_note}{truncated_note}"
        ).strip()
        content = result_data.get("content", "")
        return f"{header}\n```\n{content}\n```"

    if read_type == "search":
        query = result_data.get("query", "")
        actual = result_data.get("actual_matches", 0)
        returned = result_data.get("returned_matches", 0)
        case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写"
        # The case hint is enclosed in a balanced pair of brackets.
        header = (
            f'{path} 中搜索 "{query}",返回 {returned}/{actual} 条结果({case_hint}) '
            f"{max_note}{truncated_note}"
        ).strip()
        match_texts: List[str] = []
        # ``or []`` tolerates an explicit ``None`` value for "matches".
        for idx, match in enumerate(result_data.get("matches") or [], 1):
            match_note = "(片段截断)" if match.get("truncated") else ""
            hits = match.get("hits") or []
            hit_text = ", ".join(str(h) for h in hits)
            label = match.get("id") or f"match_{idx}"
            snippet = match.get("snippet", "")
            match_texts.append(
                f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} "
                f"命中行: {hit_text}{match_note}\n```\n{snippet}\n```"
            )
        if not match_texts:
            match_texts.append("未找到匹配内容。")
        return "\n".join([header] + match_texts)

    if read_type == "extract":
        # ``or []`` tolerates an explicit ``None`` value for "segments".
        segments = result_data.get("segments") or []
        header = f"{path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip()
        seg_texts: List[str] = []
        for idx, segment in enumerate(segments, 1):
            seg_note = "(片段截断)" if segment.get("truncated") else ""
            label = segment.get("label") or f"segment_{idx}"
            snippet = segment.get("content", "")
            seg_texts.append(
                f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}"
                f"\n```\n{snippet}\n```"
            )
        if not seg_texts:
            seg_texts.append("未提供可抽取的片段。")
        return "\n".join([header] + seg_texts)

    return _format_failure("read_file", {"error": "不支持的读取模式"})
def format_tool_result_for_context(function_name: str, result_data: Any, raw_text: str = "") -> str:
    """Turn a tool result into a plain-text summary for the conversation context.

    Args:
        function_name: Name of the tool that produced the result.
        result_data: Parsed result; only dicts are formatted.
        raw_text: Fallback text returned when nothing can be summarised.

    Returns:
        The formatted summary, or ``raw_text`` when ``result_data`` is not a
        dict or carries neither a summary/message nor an error.
    """
    # Anything that is not a dict cannot be summarised; hand back the raw text.
    if not isinstance(result_data, dict):
        return raw_text

    # These two tools have dedicated formatters outside the dispatch table.
    if function_name == "read_file":
        return format_read_file_result(result_data)
    if function_name == "write_file_diff":
        return _format_write_file_diff(result_data, raw_text)

    formatter = TOOL_FORMATTERS.get(function_name)
    if formatter:
        return formatter(result_data)

    # Generic fallback: headline plus error, whichever of them is present.
    headline = result_data.get("summary") or result_data.get("message")
    failure = result_data.get("error")
    pieces: List[str] = []
    if headline:
        pieces.append(str(headline))
    if failure:
        pieces.append(f"⚠️ 错误: {failure}")
    if not pieces:
        return raw_text
    return "\n".join(pieces)
def _format_write_file_diff(result_data: Dict[str, Any], raw_text: str) -> str:
path = result_data.get("path", "目标文件")
summary = result_data.get("summary") or result_data.get("message")
completed = result_data.get("completed") or []
failed_blocks = result_data.get("failed") or []
lines = [f"[文件补丁] {path}"]
if summary:
lines.append(summary)
if completed:
lines.append(f"✅ 成功块: {', '.join(str(i) for i in completed)}")
if failed_blocks:
fail_descriptions = []
for item in failed_blocks[:3]:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
fail_descriptions.append(f"#{idx}: {reason}")
lines.append("⚠️ 失败块: " + "".join(fail_descriptions))
if len(failed_blocks) > 3:
lines.append(f"(其余 {len(failed_blocks) - 3} 个失败块略)")
detail_sections: List[str] = []
for item in failed_blocks:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
block_patch = item.get("block_patch") or item.get("patch")
if not block_patch:
old_text = item.get("old_text") or ""
new_text = item.get("new_text") or ""
synthetic_lines: List[str] = []
if old_text:
synthetic_lines.extend(f"-{line}" for line in old_text.splitlines())
if new_text:
synthetic_lines.extend(f"+{line}" for line in new_text.splitlines())
if synthetic_lines:
block_patch = "\n".join(synthetic_lines)
detail_sections.append(f"- #{idx}: {reason}")
if block_patch:
detail_sections.append("```diff")
detail_sections.append(block_patch.rstrip("\n"))
detail_sections.append("```")
detail_sections.append("")
if detail_sections and detail_sections[-1] == "":
detail_sections.pop()
if detail_sections:
lines.append("⚠️ 失败块详情:")
lines.extend(detail_sections)
if result_data.get("success") is False and result_data.get("error"):
lines.append(f"⚠️ 错误: {result_data.get('error')}")
formatted = "\n".join(line for line in lines if line)
return formatted or raw_text
def _format_create_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_file", result_data)
return result_data.get("message") or f"已创建空文件: {result_data.get('path', '未知路径')}"
def _format_delete_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("delete_file", result_data)
path = result_data.get("path") or "未知路径"
action = result_data.get("action") or "deleted"
return f"{action}文件: {path}"
def _format_rename_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("rename_file", result_data)
old_path = result_data.get("old_path") or "旧路径未知"
new_path = result_data.get("new_path") or "新路径未知"
return f"已重命名: {old_path} -> {new_path}"
def _format_create_folder(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_folder", result_data)
return f"已创建文件夹: {result_data.get('path', '未知路径')}"
def _format_focus_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("focus_file", result_data)
message = result_data.get("message") or "文件已聚焦"
size = result_data.get("file_size")
size_note = f"{size} 字符)" if isinstance(size, int) else ""
focused = result_data.get("focused_files") or []
focused_note = f"当前聚焦: {', '.join(focused)}" if focused else "当前没有其他聚焦文件"
return f"{message}{size_note}\n{focused_note}"
def _format_unfocus_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("unfocus_file", result_data)
message = result_data.get("message") or "已取消聚焦"
remaining = result_data.get("remaining_focused") or []
remain_note = f"剩余聚焦: {', '.join(remaining)}" if remaining else "当前没有聚焦文件"
return f"{message}\n{remain_note}"
def _format_terminal_session(result_data: Dict[str, Any]) -> str:
action = result_data.get("action") or result_data.get("terminal_action") or "未知操作"
tag = f"terminal_session[{action}]"
if not result_data.get("success"):
return _format_failure(tag, result_data)
if action == "open":
return (
f"终端 {result_data.get('session')} 已打开,工作目录 {result_data.get('working_dir')}"
f"当前活动会话: {result_data.get('session')}(共 {result_data.get('total_sessions')} 个)"
)
if action == "close":
new_active = result_data.get("new_active") or ""
remaining = result_data.get("remaining_sessions") or []
return (
f"终端 {result_data.get('session')} 已关闭,新的活动会话: {new_active}"
f"剩余会话: {', '.join(remaining) if remaining else ''}"
)
if action == "switch":
previous = result_data.get("previous") or ""
current = result_data.get("current") or "未知"
return f"终端已从 {previous} 切换到 {current}"
if action == "list":
sessions = result_data.get("sessions") or []
total = result_data.get("total", len(sessions))
max_allowed = result_data.get("max_allowed")
active = result_data.get("active") or ""
header = f"共有 {total}/{max_allowed} 个终端会话,活动会话: {active}"
session_lines = []
for session in sessions:
name = session.get("session_name") or session.get("name") or "未命名"
state = "运行中" if session.get("is_running") else "已停止"
marker = "" if session.get("is_active") else " "
working_dir = session.get("working_dir") or "未知目录"
session_lines.append(f"{marker} {name} | {state} | {working_dir}")
return "\n".join([header] + session_lines) if session_lines else header
return result_data.get("message") or f"{tag} 操作已完成。"
def _format_terminal_input(result_data: Dict[str, Any]) -> str:
    """Describe the outcome of ``terminal_input``.

    The report names the session, the command and its status, then the
    optional message, then the output block produced by
    ``_summarize_output_block``. Failures go through ``_format_failure``.
    """
    if not result_data.get("success"):
        return _format_failure("terminal_input", result_data)

    session_name = result_data.get("session") or result_data.get("session_name") or "default"
    command_text = result_data.get("command") or "(命令缺失)"
    state = result_data.get("status") or "completed"

    report = [f"terminal_input: 在 {session_name} 执行 `{command_text}`,状态 {state}"]
    note = result_data.get("message") or ""
    if note:
        report.append(note)
    output_block = _summarize_output_block(
        result_data.get("output"),
        result_data.get("truncated"),
    )
    report.append(output_block)
    return "\n".join(report)
def _format_sleep(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("sleep", result_data)
reason = result_data.get("reason")
timestamp = result_data.get("timestamp")
message = result_data.get("message") or "等待完成"
parts = [message]
if reason:
parts.append(f"原因:{reason}")
if timestamp:
parts.append(f"时间:{timestamp}")
return "".join(parts)
def _format_run_command(result_data: Dict[str, Any]) -> str:
    """Format a ``run_command`` result via the shared command formatter."""
    tool_label = "run_command"
    return _format_command_result(tool_label, result_data)
def _format_run_python(result_data: Dict[str, Any]) -> str:
    """Format a ``run_python`` result.

    Uses the shared command formatter and, when the result carries the
    executed code as a string, prepends a header with its length.
    """
    body = _format_command_result("run_python", result_data)
    source = result_data.get("code")
    if isinstance(source, str):
        return f"run_python: 执行临时代码({len(source)} 字符)\n{body}"
    return body
def _format_todo_create(result_data: Dict[str, Any]) -> str:
    """Describe the outcome of ``todo_create``.

    Returns:
        A header naming the TODO overview, followed on the next line by the
        task summary from ``_summarize_todo_tasks`` when there is one.
        Failures go through ``_format_failure``.
    """
    if not result_data.get("success"):
        return _format_failure("todo_create", result_data)
    todo = result_data.get("todo_list")
    # The list is only read, so no copy is needed; a non-dict is treated as empty.
    if not isinstance(todo, dict):
        todo = {}
    overview = todo.get("overview") or "未命名任务"
    tasks = _summarize_todo_tasks(todo)
    # The colon separates the fixed label from the overview text.
    header = f"已创建 TODO:{overview}"
    # No trailing newline when there are no tasks to list.
    return f"{header}\n{tasks}" if tasks else header
def _format_todo_update_task(result_data: Dict[str, Any]) -> str:
    """Describe the outcome of ``todo_update_task``.

    Returns the message, plus the task summary on a second line when the
    summary is non-empty. Failures go through ``_format_failure``.
    """
    if not result_data.get("success"):
        return _format_failure("todo_update_task", result_data)
    headline = result_data.get("message") or "任务状态已更新"
    task_summary = _summarize_todo_tasks(result_data.get("todo_list"))
    if not task_summary:
        return headline
    return f"{headline}\n{task_summary}"
def _format_todo_finish(result_data: Dict[str, Any]) -> str:
if result_data.get("success"):
message = result_data.get("message") or "待办列表已结束"
tasks = _summarize_todo_tasks(result_data.get("todo_list"))
return f"{message}\n{tasks}" if tasks else message
if result_data.get("requires_confirmation"):
remaining = result_data.get("remaining") or []
remain_note = ", ".join(remaining) if remaining else "未知"
return f"仍有未完成任务({remain_note}),需要确认是否提前结束。"
return _format_failure("todo_finish", result_data)
def _format_todo_finish_confirm(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("todo_finish_confirm", result_data)
message = result_data.get("message") or "已处理 TODO 完结确认"
todo = result_data.get("todo_list") or {}
if todo.get("forced_finish"):
reason = todo.get("forced_reason") or "未提供原因"
message = f"{message}(强制结束,原因:{reason}"
return message
def _format_update_memory(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("update_memory", result_data)
mem_type = result_data.get("memory_type") or "main"
operation = result_data.get("operation") or "write"
verb = "追加" if operation == "append" else "覆盖"
label = "主记忆" if mem_type == "main" else "任务记忆"
return f"{label}{verb}完成。"
def _format_create_sub_agent(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_sub_agent", result_data)
agent_id = result_data.get("agent_id")
task_id = result_data.get("task_id")
status = result_data.get("status")
refs = result_data.get("copied_references") or []
ref_note = f",附带 {len(refs)} 份参考文件" if refs else ""
deliver_dir = result_data.get("deliverables_dir")
deliver_note = f",交付目录: {deliver_dir}" if deliver_dir else ""
return f"子智能体 #{agent_id} 已创建task_id={task_id},状态 {status}{ref_note}{deliver_note})。"
def _format_wait_sub_agent(result_data: Dict[str, Any]) -> str:
task_id = result_data.get("task_id")
agent_id = result_data.get("agent_id")
status = result_data.get("status")
if result_data.get("success"):
copied_path = result_data.get("copied_path") or result_data.get("deliverables_path")
message = result_data.get("message") or "子智能体任务已完成。"
deliver_note = f"交付已复制到 {copied_path}" if copied_path else "交付目录已生成"
return f"子智能体 #{agent_id}/{task_id} 完成:{message}{deliver_note}"
message = result_data.get("message") or result_data.get("error") or "子智能体任务失败"
return f"⚠️ 子智能体 #{agent_id}/{task_id} 状态 {status}: {message}"
def _format_close_sub_agent(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("close_sub_agent", result_data)
message = result_data.get("message") or "子智能体已关闭。"
task_id = result_data.get("task_id")
status = result_data.get("status")
status_note = f"(状态 {status}" if status else ""
return f"{message}{status_note}task_id={task_id}"
def _format_failure(tag: str, result_data: Dict[str, Any]) -> str:
error = result_data.get("error") or result_data.get("message") or "未知错误"
suggestion = result_data.get("suggestion")
details = result_data.get("details")
parts = [f"⚠️ {tag} 失败: {error}"]
if suggestion:
parts.append(f"建议:{suggestion}")
elif isinstance(details, str) and details:
parts.append(f"详情:{details}")
elif isinstance(details, dict):
detail_msg = details.get("message") or details.get("error")
if detail_msg:
parts.append(f"详情:{detail_msg}")
return "".join(parts)
def _summarize_output_block(output: Optional[str], truncated: Optional[bool]) -> str:
if not output:
return "无可见输出"
lines = output.splitlines()
line_count = len(lines)
char_count = len(output)
meta = f"输出 {line_count} 行 / {char_count} 字符"
if truncated:
meta += "(已截断)"
return f"{meta}\n```\n{output}\n```"
def _format_command_result(label: str, result_data: Dict[str, Any]) -> str:
command = result_data.get("command") or ""
return_code = result_data.get("return_code")
success = result_data.get("success")
status = result_data.get("status")
output = result_data.get("output")
truncated = result_data.get("truncated")
message = result_data.get("message")
if success:
header = f"{label}: `{command}`" if command else label
if return_code is not None and return_code != "":
header += f" (return_code={return_code})"
lines = [header]
if status and status not in {"completed", "success"}:
lines.append(f"终端状态: {status}")
if message:
lines.append(message)
lines.append(_summarize_output_block(output, truncated))
return "\n".join(lines)
error_msg = result_data.get("error") or message or "执行失败"
header = f"⚠️ {label} 失败"
if command:
header += f"(命令 `{command}`"
lines = [f"{header}: {error_msg}"]
if return_code not in {None, ""}:
lines.append(f"返回码: {return_code}")
if output:
lines.append(_summarize_output_block(output, truncated))
return "\n".join(lines)
def _summarize_todo_tasks(todo: Optional[Dict[str, Any]]) -> str:
if not isinstance(todo, dict):
return ""
tasks = todo.get("tasks") or []
parts = []
for task in tasks:
status_icon = "" if task.get("status") == "done" else "⬜️"
parts.append(f"{status_icon} task{task.get('index')}: {task.get('title')}")
return "".join(parts)
# Dispatch table mapping a tool name to the function that formats its result
# dict. format_tool_result_for_context looks tools up here; read_file and
# write_file_diff are not listed because that function handles them itself
# before consulting this table.
TOOL_FORMATTERS = {
"create_file": _format_create_file,
"delete_file": _format_delete_file,
"rename_file": _format_rename_file,
"create_folder": _format_create_folder,
"focus_file": _format_focus_file,
"unfocus_file": _format_unfocus_file,
"terminal_session": _format_terminal_session,
"terminal_input": _format_terminal_input,
"sleep": _format_sleep,
"run_command": _format_run_command,
"run_python": _format_run_python,
"todo_create": _format_todo_create,
"todo_update_task": _format_todo_update_task,
"todo_finish": _format_todo_finish,
"todo_finish_confirm": _format_todo_finish_confirm,
"update_memory": _format_update_memory,
"create_sub_agent": _format_create_sub_agent,
"wait_sub_agent": _format_wait_sub_agent,
"close_sub_agent": _format_close_sub_agent,
}