agent-Specialization/utils/tool_result_formatter.py
JOJO 5ab3acef9c feat: 添加 terminal-guide 和 sub-agent-guide skills
- 新增 terminal-guide skill: 持久化终端使用指南
- 新增 sub-agent-guide skill: 子智能体使用指南
- 优化终端工具定义和执行逻辑
- 更新系统提示词以引用新 skills
- 添加 utils/__init__.py 模块初始化文件

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-16 21:17:02 +08:00

796 lines
35 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""将工具执行结果转换为对话上下文可用的纯文本摘要。"""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple
def format_read_file_result(result_data: Dict[str, Any]) -> str:
"""格式化 read_file 工具的输出,兼容读取/搜索/抽取模式。"""
if not isinstance(result_data, dict):
return str(result_data)
if not result_data.get("success"):
return _format_failure("read_file", result_data)
read_type = result_data.get("type", "read")
truncated_note = "(内容已截断)" if result_data.get("truncated") else ""
path = result_data.get("path", "未知路径")
max_chars = result_data.get("max_chars")
max_note = f"(max_chars={max_chars})" if max_chars else ""
if read_type == "read":
header = (
f"读取 {path}{result_data.get('line_start')}~{result_data.get('line_end')} "
f"{max_note}{truncated_note}"
).strip()
content = result_data.get("content", "")
return f"{header}\n```\n{content}\n```"
if read_type == "search":
query = result_data.get("query", "")
actual = result_data.get("actual_matches", 0)
returned = result_data.get("returned_matches", 0)
case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写"
header = (
f"{path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint} "
f"{max_note}{truncated_note}"
).strip()
match_texts: List[str] = []
for idx, match in enumerate(result_data.get("matches", []), 1):
match_note = "(片段截断)" if match.get("truncated") else ""
hits = match.get("hits") or []
hit_text = ", ".join(str(h) for h in hits) if hits else ""
label = match.get("id") or f"match_{idx}"
snippet = match.get("snippet", "")
match_texts.append(
f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```"
)
if not match_texts:
match_texts.append("未找到匹配内容。")
return "\n".join([header] + match_texts)
if read_type == "extract":
segments = result_data.get("segments", [])
header = f"{path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip()
seg_texts: List[str] = []
for idx, segment in enumerate(segments, 1):
seg_note = "(片段截断)" if segment.get("truncated") else ""
label = segment.get("label") or f"segment_{idx}"
snippet = segment.get("content", "")
seg_texts.append(
f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```"
)
if not seg_texts:
seg_texts.append("未提供可抽取的片段。")
return "\n".join([header] + seg_texts)
return _format_failure("read_file", {"error": "不支持的读取模式"})
def format_tool_result_for_context(function_name: str, result_data: Any, raw_text: str = "") -> str:
"""根据工具名称输出纯文本摘要,必要时附加关键信息。"""
# 自定义工具统一格式
if isinstance(result_data, dict) and result_data.get("custom_tool"):
tool_id = result_data.get("tool_id") or function_name
success = result_data.get("success")
status = "成功" if success else "失败"
message = result_data.get("message") or result_data.get("summary") or ""
output = result_data.get("output") or ""
parts = [f"[自定义工具] {tool_id} 执行{status}"]
if message:
parts.append(str(message))
if output:
parts.append(str(output))
return "\n".join(parts).strip() or raw_text
if function_name == "read_file" and isinstance(result_data, dict):
return format_read_file_result(result_data)
if function_name == "write_file_diff" and isinstance(result_data, dict):
return _format_write_file_diff(result_data, raw_text)
if not isinstance(result_data, dict):
return raw_text
handler = TOOL_FORMATTERS.get(function_name)
if handler:
return handler(result_data)
summary = result_data.get("summary") or result_data.get("message")
error_msg = result_data.get("error")
parts: List[str] = []
if summary:
parts.append(str(summary))
if error_msg:
parts.append(f"⚠️ 错误: {error_msg}")
return "\n".join(parts) if parts else raw_text
def _format_write_file_diff(result_data: Dict[str, Any], raw_text: str) -> str:
path = result_data.get("path", "目标文件")
summary = result_data.get("summary") or result_data.get("message")
completed = result_data.get("completed") or []
failed_blocks = result_data.get("failed") or []
success_blocks = result_data.get("blocks") or []
lines = [f"[文件补丁] {path}"]
if summary:
lines.append(summary)
if completed:
lines.append(f"✅ 成功块: {', '.join(str(i) for i in completed)}")
if failed_blocks:
fail_descriptions = []
for item in failed_blocks[:3]:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
fail_descriptions.append(f"#{idx}: {reason}")
lines.append("⚠️ 失败块: " + "".join(fail_descriptions))
if len(failed_blocks) > 3:
lines.append(f"(其余 {len(failed_blocks) - 3} 个失败块略)")
# 通用排查提示:把最常见的坑点一次性说清楚,减少来回沟通成本
lines.append("🔎 排查提示(常见易错点):")
lines.append("- 是否把“要新增/要删除/要替换”的每一行都标了 `+` 或 `-`?(漏标会被当成上下文/锚点)")
lines.append("- 空行也要写成单独一行的 `+`(只有 `+` 和换行),否则空行会消失或被当成上下文导致匹配失败。")
lines.append("- 若目标文件是空文件:应使用“仅追加”写法(块内只有 `+` 行,不要混入未加前缀的正文)。")
lines.append("- 若希望在文件中间插入/替换:必须提供足够的上下文行(以空格开头)或删除行(`-`)来锚定位置,不能只贴 `+`。")
lines.append("- 是否存在空格/Tab/缩进差异、全角半角标点差异、大小写差异?上下文与原文必须字节级一致。")
lines.append("- 是否是 CRLF(\\r\\n) 与 LF(\\n) 混用导致原文匹配失败?可先用终端查看/统一换行后再补丁。")
lines.append("- 是否遗漏 `*** Begin Patch`/`*** End Patch` 或在第一个 `@@` 之前写了其它内容?")
detail_sections: List[str] = []
for item in failed_blocks:
idx = item.get("index")
reason = item.get("reason") or item.get("error") or "未说明原因"
hint = item.get("hint")
block_patch = item.get("block_patch") or item.get("patch")
# 自动判别常见错误形态,便于快速定位问题
diagnostics = _classify_diff_block_issue(item, result_data)
if not block_patch:
old_text = item.get("old_text") or ""
new_text = item.get("new_text") or ""
synthetic_lines: List[str] = []
if old_text:
synthetic_lines.extend(f"-{line}" for line in old_text.splitlines())
if new_text:
synthetic_lines.extend(f"+{line}" for line in new_text.splitlines())
if synthetic_lines:
block_patch = "\n".join(synthetic_lines)
detail_sections.append(f"- #{idx}: {reason}")
if diagnostics:
detail_sections.append(f" 错误类型: {diagnostics}")
if hint:
detail_sections.append(f" 提示: {hint}")
if block_patch:
detail_sections.append("```diff")
detail_sections.append(block_patch.rstrip("\n"))
detail_sections.append("```")
detail_sections.append("")
if detail_sections and detail_sections[-1] == "":
detail_sections.pop()
if detail_sections:
lines.append("⚠️ 失败块详情:")
lines.extend(detail_sections)
# 对“成功块”做轻量体检:如果检测到潜在格式风险,给出风险提示(不影响 success 判定)
risk_sections: List[str] = []
for item in success_blocks:
if not isinstance(item, dict):
continue
status = item.get("status")
idx = item.get("index")
if status != "success":
continue
diag = _classify_diff_block_issue(item, result_data)
if diag:
risk_sections.append(f"- #{idx}: {diag}")
if risk_sections:
lines.append("⚠️ 风险提示(补丁虽成功但格式可能有隐患):")
lines.extend(risk_sections)
if result_data.get("success") is False and result_data.get("error"):
lines.append(f"⚠️ 错误: {result_data.get('error')}")
formatted = "\n".join(line for line in lines if line)
return formatted or raw_text
def _format_create_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_file", result_data)
return result_data.get("message") or f"已创建空文件: {result_data.get('path', '未知路径')}"
def _format_write_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("write_file", result_data)
path = result_data.get("path") or "未知路径"
mode = str(result_data.get("mode") or "w")
action = "追加" if mode == "a" else "覆盖"
size = result_data.get("size")
message = result_data.get("message")
parts = [f"{action}写入: {path}"]
if size is not None:
parts.append(f"{size} 字节")
if message:
parts.append(str(message))
return "".join(parts)
def _format_edit_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("edit_file", result_data)
path = result_data.get("path") or "目标文件"
count = result_data.get("replacements")
msg = result_data.get("message")
replaced_note = f"替换 {count}" if isinstance(count, int) else "已完成替换"
parts = [f"{replaced_note}: {path}"]
if msg:
parts.append(str(msg))
return "".join(parts)
def _classify_diff_block_issue(block: Dict[str, Any], result_data: Dict[str, Any]) -> str:
"""
针对 write_file_diff 常见的“离谱/易错”用法做启发式判别,返回简短错误类型说明。
不改变后端逻辑,只用于提示。
"""
patch_text = block.get("block_patch") or block.get("patch") or ""
lines = patch_text.splitlines()
plus = sum(1 for ln in lines if ln.startswith("+"))
minus = sum(1 for ln in lines if ln.startswith("-"))
context = sum(1 for ln in lines if ln.startswith(" "))
total = len([ln for ln in lines if ln.strip() != ""])
reasons: List[str] = []
# 1) 完全没加 + / - :最常见的“把目标当上下文”
if total > 0 and plus == 0 and minus == 0:
reasons.append("缺少 + / -,整块被当作上下文,无法定位到文件")
# 2) 全是 + 且没有上下文/删除:解析为“纯追加”,若目标非末尾插入会失败
if plus > 0 and minus == 0 and context == 0:
reasons.append("仅包含 + 行,被视为追加块;若想中间插入/替换需提供上下文或 -")
# 3) 没有上下文或删除行却不是 append_only多数是漏写空格前缀
if block.get("append_only") and (context > 0 or minus > 0):
reasons.append("块被解析为追加模式,但混入了上下文/删除行,可能写法不一致")
# 4) 未找到匹配时,提示检查空格/缩进/全角半角/换行差异
reason_text = (block.get("reason") or "").lower()
if "未找到匹配" in reason_text:
reasons.append("上下文未匹配:检查空格/缩进、全角半角、CRLF/LF、大小写是否与原文完全一致")
# 5) 空行未加 '+' 的典型情形:
# a) 有空白行但整块没有前缀(此前已由 #1 捕获),仍补充提示
# b) 有空白行且块中存在 + / -,说明空行漏写前缀,易导致上下文匹配失败
if any(ln == "" or ln.strip() == "" for ln in lines):
if plus == 0 and minus == 0:
reasons.append("空行未写 `+`,被当作上下文,建议空行写成单独一行 `+`")
else:
reasons.append("空行未写 `+`(或空格上下文),混入补丁时会被当作上下文,建议空行单独写成 `+`")
return "".join(reasons)
def _format_delete_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("delete_file", result_data)
path = result_data.get("path") or "未知路径"
action = result_data.get("action") or "deleted"
return f"{action}文件: {path}"
def _format_rename_file(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("rename_file", result_data)
old_path = result_data.get("old_path") or "旧路径未知"
new_path = result_data.get("new_path") or "新路径未知"
return f"已重命名: {old_path} -> {new_path}"
def _format_create_folder(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_folder", result_data)
return f"已创建文件夹: {result_data.get('path', '未知路径')}"
def _format_terminal_session(result_data: Dict[str, Any]) -> str:
action = result_data.get("action") or result_data.get("terminal_action") or "未知操作"
tag = f"terminal_session[{action}]"
if not result_data.get("success"):
return _format_failure(tag, result_data)
if action == "open":
return (
f"终端 {result_data.get('session')} 已打开,工作目录 {result_data.get('working_dir')}"
f"当前活动会话: {result_data.get('session')}(共 {result_data.get('total_sessions')} 个)"
)
if action == "close":
new_active = result_data.get("new_active") or ""
remaining = result_data.get("remaining_sessions") or []
return (
f"终端 {result_data.get('session')} 已关闭,新的活动会话: {new_active}"
f"剩余会话: {', '.join(remaining) if remaining else ''}"
)
if action == "reset":
session = result_data.get("session") or "未知会话"
working_dir = result_data.get("working_dir") or "未知目录"
return f"终端 {session} 已重置,工作目录 {working_dir}"
if action == "list":
sessions = result_data.get("sessions") or []
total = result_data.get("total", len(sessions))
max_allowed = result_data.get("max_allowed")
active = result_data.get("active") or ""
header = f"共有 {total}/{max_allowed} 个终端会话,活动会话: {active}"
session_lines = []
for session in sessions:
name = session.get("session_name") or session.get("name") or "未命名"
state = "运行中" if session.get("is_running") else "已停止"
marker = "" if session.get("is_active") else " "
working_dir = session.get("working_dir") or "未知目录"
session_lines.append(f"{marker} {name} | {state} | {working_dir}")
return "\n".join([header] + session_lines) if session_lines else header
return result_data.get("message") or f"{tag} 操作已完成。"
def _plain_command_output(result_data: Dict[str, Any]) -> str:
"""生成纯文本输出,按需要加状态前缀。"""
output = result_data.get("output") or ""
status = (result_data.get("status") or "").lower()
timeout = result_data.get("timeout")
if timeout is None:
timeout = result_data.get("output_wait")
return_code = result_data.get("return_code")
truncated = result_data.get("truncated")
error = result_data.get("error")
message = result_data.get("message")
prefixes = []
partial_output_note = None
partial_no_output_note = None
if status in {"timeout"}:
never_timeout = (
(isinstance(timeout, str) and timeout.lower() == "never")
or result_data.get("never_timeout")
)
if never_timeout:
elapsed_ms = result_data.get("elapsed_ms")
secs = None
if isinstance(elapsed_ms, (int, float)) and elapsed_ms > 0:
secs = max(1, int(round(elapsed_ms / 1000)))
if secs:
prefixes.append(f"[partial_output ~{secs}s]")
partial_output_note = f"已返回约{secs}秒内输出,命令可能仍在运行"
partial_no_output_note = f"已等待约{secs}秒未捕获输出,命令可能仍在运行"
else:
prefixes.append("[partial_output]")
partial_output_note = "已返回当前输出,命令可能仍在运行"
partial_no_output_note = "未捕获输出,命令可能仍在运行"
else:
timeout_seconds = None
if isinstance(timeout, (int, float)) and timeout > 0:
timeout_seconds = int(timeout)
elif isinstance(timeout, str):
stripped = timeout.strip()
if stripped.isdigit():
timeout_seconds = int(stripped)
else:
digits = "".join(ch for ch in stripped if ch.isdigit())
if digits:
timeout_seconds = int(digits)
if timeout_seconds and timeout_seconds >= 60:
prefixes.append(f"[partial_output ~{timeout_seconds}s]")
partial_output_note = f"已等待约{timeout_seconds}秒,命令可能仍在运行"
partial_no_output_note = f"已等待约{timeout_seconds}秒未捕获输出,命令可能仍在运行"
elif status in {"killed"}:
prefixes.append("[killed]")
elif status in {"awaiting_input"}:
prefixes.append("[awaiting_input]")
elif status in {"no_output"} and not output:
prefixes.append("[no_output]")
elif status in {"error"} and return_code is not None:
prefixes.append(f"[error rc={return_code}]")
elif status in {"error"}:
prefixes.append("[error]")
if truncated:
prefixes.append("[truncated]")
# 如果执行失败且没有输出,优先显示错误信息
if not result_data.get("success") and not output:
err_text = partial_no_output_note or error or message
if err_text:
prefix_text = "".join(prefixes) if prefixes else "[error]"
return f"{prefix_text} {err_text}" if prefix_text else err_text
# 没有错误文本则仍走后面的输出逻辑(可能显示 no_output
prefix_text = "".join(prefixes)
if prefix_text and output:
if partial_output_note:
prefix_text = f"{prefix_text} {partial_output_note}"
return f"{prefix_text}\n{output}"
if prefix_text:
if partial_no_output_note:
prefix_text = f"{prefix_text} {partial_no_output_note}"
return prefix_text
if not output:
return "[no_output]"
return output
def _format_terminal_input(result_data: Dict[str, Any]) -> str:
text = _plain_command_output(result_data)
timeout_hint = result_data.get("timeout_hint")
if timeout_hint == "suggest_adjust_timeout":
suggestion = "请根据指令和输出结果判断是否需要提高等待输出时长或修改指令输入"
return f"{text}\n{suggestion}" if text else suggestion
if timeout_hint == "suggest_never_timeout":
suggestion = "请考虑设置 timeout 为 never让终端持续执行该命令注意 在该命令彻底执行完成前该终端会被占用,处于不可输入的状态)"
return f"{text}\n{suggestion}" if text else suggestion
return text
def _format_sleep(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("sleep", result_data)
reason = result_data.get("reason")
timestamp = result_data.get("timestamp")
message = result_data.get("message") or "等待完成"
parts = [message]
if reason:
parts.append(f"原因:{reason}")
if timestamp:
parts.append(f"时间:{timestamp}")
return "".join(parts)
def _format_run_command(result_data: Dict[str, Any]) -> str:
text = _plain_command_output(result_data)
if (result_data.get("status") or "").lower() == "timeout":
suggestion = "建议在持久终端中直接运行该命令terminal_session + terminal_input或缩短命令执行时间。"
text = f"{text}\n{suggestion}" if text else suggestion
return text
def _format_run_python(result_data: Dict[str, Any]) -> str:
text = _plain_command_output(result_data)
if (result_data.get("status") or "").lower() == "timeout":
suggestion = "建议将代码保存为脚本后在持久终端中执行terminal_session + terminal_input或拆分/优化代码以缩短运行时间。"
text = f"{text}\n{suggestion}" if text else suggestion
return text
def _format_todo_create(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("todo_create", result_data)
todo = (result_data.get("todo_list") or {}).copy()
overview = todo.get("overview") or "未命名任务"
total = len(todo.get("tasks") or [])
return f"已创建 TODO{overview}(共 {total} 项)"
def _format_todo_update_task(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("todo_update_task", result_data)
message = result_data.get("message") or "任务状态已更新"
todo = result_data.get("todo_list") or {}
tasks = todo.get("tasks") or []
total = len(tasks)
done = sum(1 for t in tasks if t.get("status") == "done")
progress_note = f"进度 {done}/{total}" if total else ""
return f"{message}{progress_note}".strip("")
def _format_update_memory(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("update_memory", result_data)
operation = result_data.get("operation") or "write"
idx = result_data.get("index")
count = result_data.get("count")
if operation == "append":
suffix = f"(共 {count} 条)" if count is not None else ""
return f"记忆已追加新条目{suffix}"
if operation == "replace":
return f"记忆第 {idx} 条已替换。"
if operation == "delete":
suffix = f"(剩余 {count} 条)" if count is not None else ""
return f"记忆第 {idx} 条已删除{suffix}"
return f"记忆已更新。"
def _format_sub_agent_stats(stats: Optional[Dict[str, Any]]) -> str:
if not isinstance(stats, dict):
return ""
def _to_int(value: Any) -> int:
try:
return max(0, int(value))
except (TypeError, ValueError):
return 0
api_calls = _to_int(stats.get("api_calls") or stats.get("api_call_count") or stats.get("turn_count"))
files_read = _to_int(stats.get("files_read"))
edit_files = _to_int(stats.get("edit_files"))
searches = _to_int(stats.get("searches"))
web_pages = _to_int(stats.get("web_pages"))
commands = _to_int(stats.get("commands"))
lines = [
f"调用了{api_calls}",
f"阅读了{files_read}次文件",
f"编辑了{edit_files}次文件",
f"搜索了{searches}次内容",
f"查看了{web_pages}个网页",
f"运行了{commands}个指令",
]
return "\n".join(lines)
def _format_create_sub_agent(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("create_sub_agent", result_data)
agent_id = result_data.get("agent_id")
task_id = result_data.get("task_id")
status = result_data.get("status")
refs = result_data.get("copied_references") or []
ref_note = f",附带 {len(refs)} 份参考文件" if refs else ""
deliver_dir = result_data.get("deliverables_dir")
deliver_note = f",交付目录: {deliver_dir}" if deliver_dir else ""
header = f"子智能体 #{agent_id} 已创建task_id={task_id},状态 {status}{ref_note}{deliver_note})。"
stats_text = _format_sub_agent_stats(
result_data.get("stats") or (result_data.get("final_result") or {}).get("stats")
)
summary = result_data.get("message") or result_data.get("summary")
elapsed_seconds = result_data.get("runtime_seconds")
if elapsed_seconds is None:
elapsed_seconds = result_data.get("elapsed_seconds")
lines = [header]
if stats_text:
lines.append(stats_text)
if status == "completed" and isinstance(elapsed_seconds, (int, float)):
lines.append(f"运行了{int(round(elapsed_seconds))}")
if summary and status in {"completed", "failed", "timeout", "terminated"}:
lines.append(str(summary))
return "\n".join(lines)
def _format_wait_sub_agent(result_data: Dict[str, Any]) -> str:
task_id = result_data.get("task_id")
agent_id = result_data.get("agent_id")
status = result_data.get("status")
stats_value = result_data.get("stats")
if not isinstance(stats_value, dict) and status == "timeout":
stats_value = {}
stats_text = _format_sub_agent_stats(stats_value)
elapsed_seconds = result_data.get("runtime_seconds")
if elapsed_seconds is None:
elapsed_seconds = result_data.get("elapsed_seconds")
if result_data.get("success"):
copied_path = result_data.get("copied_path") or result_data.get("deliverables_path")
message = result_data.get("message") or "子智能体任务已完成。"
deliver_note = f"交付已复制到 {copied_path}" if copied_path else "交付目录已生成"
lines = [f"子智能体 #{agent_id}/{task_id} 完成"]
if stats_text:
lines.append(stats_text)
if isinstance(elapsed_seconds, (int, float)):
lines.append(f"运行了{int(round(elapsed_seconds))}")
lines.append(message)
lines.append(deliver_note)
return "\n".join(lines)
message = result_data.get("message") or result_data.get("error") or "子智能体任务失败"
lines = [f"⚠️ 子智能体 #{agent_id}/{task_id} 状态 {status}"]
if stats_text:
lines.append(stats_text)
lines.append(message)
return "\n".join(lines)
def _format_get_sub_agent_status(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("get_sub_agent_status", result_data)
results = result_data.get("results") or []
if not results:
return "未找到子智能体状态。"
blocks = []
for item in results:
agent_id = item.get("agent_id")
if not item.get("found"):
blocks.append(f"子智能体 #{agent_id} 未找到。")
continue
status = item.get("status")
summary = None
final_result = item.get("final_result") or {}
elapsed_seconds = None
if isinstance(final_result, dict):
summary = final_result.get("message") or final_result.get("summary")
elapsed_seconds = final_result.get("runtime_seconds")
if elapsed_seconds is None:
elapsed_seconds = final_result.get("elapsed_seconds")
if not summary:
summary = item.get("summary") or ""
stats_text = _format_sub_agent_stats(item.get("stats"))
lines = [f"子智能体 #{agent_id} 状态: {status}"]
if stats_text:
lines.append(stats_text)
if status == "completed" and isinstance(elapsed_seconds, (int, float)):
lines.append(f"运行了{int(round(elapsed_seconds))}")
if summary:
lines.append(str(summary))
blocks.append("\n".join(lines))
return "\n\n".join(blocks)
def _format_close_sub_agent(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("close_sub_agent", result_data)
message = result_data.get("message") or "子智能体已关闭。"
task_id = result_data.get("task_id")
status = result_data.get("status")
status_note = f"(状态 {status}" if status else ""
return f"{message}{status_note}task_id={task_id}"
def _format_failure(tag: str, result_data: Dict[str, Any]) -> str:
error = result_data.get("error") or result_data.get("message") or "未知错误"
suggestion = result_data.get("suggestion")
details = result_data.get("details")
parts = [f"⚠️ {tag} 失败: {error}"]
if suggestion:
parts.append(f"建议:{suggestion}")
elif isinstance(details, str) and details:
parts.append(f"详情:{details}")
elif isinstance(details, dict):
detail_msg = details.get("message") or details.get("error")
if detail_msg:
parts.append(f"详情:{detail_msg}")
return "".join(parts)
def _summarize_output_block(output: Optional[str], truncated: Optional[bool]) -> str:
if not output:
return "无可见输出"
lines = output.splitlines()
line_count = len(lines)
char_count = len(output)
meta = f"输出 {line_count} 行 / {char_count} 字符"
if truncated:
meta += "(已截断)"
return f"{meta}\n```\n{output}\n```"
def _preview_text(text: str, limit: int) -> Tuple[str, bool]:
"""返回截断预览及是否截断标记。"""
if text is None:
return "", False
if len(text) <= limit:
return text, False
return text[:limit], True
def _format_terminal_snapshot(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("terminal_snapshot", result_data)
session = result_data.get("session") or "default"
requested = result_data.get("line_limit")
requested_note = f"{requested}" if requested is not None else "全部"
actual = result_data.get("lines_returned")
actual_note = f"{actual}" if actual is not None else "未知行数"
truncated = result_data.get("truncated")
trunc_note = "已截断" if truncated else "未截断"
output = result_data.get("output") or ""
header = f"会话 {session}:请求 {requested_note},实际返回 {actual_note}{trunc_note})。"
body = _summarize_output_block(output, truncated)
return f"{header}\n{body}"
def _format_extract_webpage(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("extract_webpage", result_data)
url = result_data.get("url") or "目标网页"
content = result_data.get("content") or ""
length = len(content)
truncated_flag = result_data.get("truncated") or False
header = f"提取完成:{url},长度 {length} 字符。"
if not content:
return f"{header} 内容为空。"
# 为模型保留完整正文,避免 800 字预览导致上下文缺失
note_parts = []
if truncated_flag:
note_parts.append("原始内容已被上游截断")
note = f"{''.join(note_parts)}" if note_parts else ""
return "\n".join([f"{header}{note}", "```", content, "```"])
def _format_vlm_analyze(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("vlm_analyze", result_data)
content = result_data.get("content") or ""
length = len(content)
preview, truncated = _preview_text(content, 800)
note = "(截断预览)" if truncated else "(未截断)"
header = f"VLM 解析完成,长度 {length} 字符{note}"
if not content:
return f"{header};未返回可识别文本。"
return "\n".join([header, "```", preview, "```"])
# 兼容旧名
def _format_ocr_image(result_data: Dict[str, Any]) -> str:
return _format_vlm_analyze(result_data)
def _format_trigger_easter_egg(result_data: Dict[str, Any]) -> str:
if not result_data.get("success"):
return _format_failure("trigger_easter_egg", result_data)
effect = (result_data.get("effect") or "").lower()
duration = result_data.get("duration_seconds") or result_data.get("duration")
if effect == "flood":
return "大水即将淹没屏幕!预计持续 45 秒,不过这期间你和用户还可以继续对话。"
if effect == "snake":
dur_text = f"{duration}" if duration else "约 200 秒"
return f"发光贪吃蛇来访,约 {dur_text} 后或吃满 20 个苹果离场;动画不挡操作。"
message = result_data.get("message")
if message:
return message
return f"已触发彩蛋:{effect or '未知效果'}"
def _format_command_result(label: str, result_data: Dict[str, Any]) -> str:
command = result_data.get("command") or ""
return_code = result_data.get("return_code")
success = result_data.get("success")
status = result_data.get("status")
output = result_data.get("output")
truncated = result_data.get("truncated")
message = result_data.get("message")
if success:
header = f"{label}: `{command}`" if command else label
if return_code is not None and return_code != "":
header += f" (return_code={return_code})"
lines = [header]
if status and status not in {"completed", "success"}:
lines.append(f"终端状态: {status}")
if message:
lines.append(message)
lines.append(_summarize_output_block(output, truncated))
return "\n".join(lines)
error_msg = result_data.get("error") or message or "执行失败"
header = f"⚠️ {label} 失败"
if command:
header += f"(命令 `{command}`"
lines = [f"{header}: {error_msg}"]
if return_code not in {None, ""}:
lines.append(f"返回码: {return_code}")
if output:
lines.append(_summarize_output_block(output, truncated))
return "\n".join(lines)
def _summarize_todo_tasks(todo: Optional[Dict[str, Any]]) -> str:
if not isinstance(todo, dict):
return ""
tasks = todo.get("tasks") or []
parts = []
for task in tasks:
status_icon = "" if task.get("status") == "done" else "⬜️"
parts.append(f"{status_icon} task{task.get('index')}: {task.get('title')}")
return "".join(parts)
TOOL_FORMATTERS = {
"create_file": _format_create_file,
"write_file": _format_write_file,
"edit_file": _format_edit_file,
"delete_file": _format_delete_file,
"rename_file": _format_rename_file,
"create_folder": _format_create_folder,
"terminal_snapshot": _format_terminal_snapshot,
"terminal_session": _format_terminal_session,
"terminal_input": _format_terminal_input,
"sleep": _format_sleep,
"run_command": _format_run_command,
"run_python": _format_run_python,
"extract_webpage": _format_extract_webpage,
"vlm_analyze": _format_vlm_analyze,
"ocr_image": _format_ocr_image,
"trigger_easter_egg": _format_trigger_easter_egg,
"todo_create": _format_todo_create,
"todo_update_task": _format_todo_update_task,
"update_memory": _format_update_memory,
"create_sub_agent": _format_create_sub_agent,
"wait_sub_agent": _format_wait_sub_agent,
"close_sub_agent": _format_close_sub_agent,
"get_sub_agent_status": _format_get_sub_agent_status,
}