535 lines
24 KiB
Python
535 lines
24 KiB
Python
"""将工具执行结果转换为对话上下文可用的纯文本摘要。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
|
||
def format_read_file_result(result_data: Dict[str, Any]) -> str:
|
||
"""格式化 read_file 工具的输出,兼容读取/搜索/抽取模式。"""
|
||
if not isinstance(result_data, dict):
|
||
return str(result_data)
|
||
if not result_data.get("success"):
|
||
return _format_failure("read_file", result_data)
|
||
|
||
read_type = result_data.get("type", "read")
|
||
truncated_note = "(内容已截断)" if result_data.get("truncated") else ""
|
||
path = result_data.get("path", "未知路径")
|
||
max_chars = result_data.get("max_chars")
|
||
max_note = f"(max_chars={max_chars})" if max_chars else ""
|
||
|
||
if read_type == "read":
|
||
header = (
|
||
f"读取 {path} 行 {result_data.get('line_start')}~{result_data.get('line_end')} "
|
||
f"{max_note}{truncated_note}"
|
||
).strip()
|
||
content = result_data.get("content", "")
|
||
return f"{header}\n```\n{content}\n```"
|
||
|
||
if read_type == "search":
|
||
query = result_data.get("query", "")
|
||
actual = result_data.get("actual_matches", 0)
|
||
returned = result_data.get("returned_matches", 0)
|
||
case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写"
|
||
header = (
|
||
f"在 {path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint}) "
|
||
f"{max_note}{truncated_note}"
|
||
).strip()
|
||
match_texts: List[str] = []
|
||
for idx, match in enumerate(result_data.get("matches", []), 1):
|
||
match_note = "(片段截断)" if match.get("truncated") else ""
|
||
hits = match.get("hits") or []
|
||
hit_text = ", ".join(str(h) for h in hits) if hits else "无"
|
||
label = match.get("id") or f"match_{idx}"
|
||
snippet = match.get("snippet", "")
|
||
match_texts.append(
|
||
f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```"
|
||
)
|
||
if not match_texts:
|
||
match_texts.append("未找到匹配内容。")
|
||
return "\n".join([header] + match_texts)
|
||
|
||
if read_type == "extract":
|
||
segments = result_data.get("segments", [])
|
||
header = f"从 {path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip()
|
||
seg_texts: List[str] = []
|
||
for idx, segment in enumerate(segments, 1):
|
||
seg_note = "(片段截断)" if segment.get("truncated") else ""
|
||
label = segment.get("label") or f"segment_{idx}"
|
||
snippet = segment.get("content", "")
|
||
seg_texts.append(
|
||
f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```"
|
||
)
|
||
if not seg_texts:
|
||
seg_texts.append("未提供可抽取的片段。")
|
||
return "\n".join([header] + seg_texts)
|
||
|
||
return _format_failure("read_file", {"error": "不支持的读取模式"})
|
||
|
||
|
||
def format_tool_result_for_context(function_name: str, result_data: Any, raw_text: str = "") -> str:
|
||
"""根据工具名称输出纯文本摘要,必要时附加关键信息。"""
|
||
if function_name == "read_file" and isinstance(result_data, dict):
|
||
return format_read_file_result(result_data)
|
||
|
||
if function_name == "write_file_diff" and isinstance(result_data, dict):
|
||
return _format_write_file_diff(result_data, raw_text)
|
||
|
||
if not isinstance(result_data, dict):
|
||
return raw_text
|
||
|
||
handler = TOOL_FORMATTERS.get(function_name)
|
||
if handler:
|
||
return handler(result_data)
|
||
|
||
summary = result_data.get("summary") or result_data.get("message")
|
||
error_msg = result_data.get("error")
|
||
parts: List[str] = []
|
||
if summary:
|
||
parts.append(str(summary))
|
||
if error_msg:
|
||
parts.append(f"⚠️ 错误: {error_msg}")
|
||
return "\n".join(parts) if parts else raw_text
|
||
|
||
|
||
def _format_write_file_diff(result_data: Dict[str, Any], raw_text: str) -> str:
|
||
path = result_data.get("path", "目标文件")
|
||
summary = result_data.get("summary") or result_data.get("message")
|
||
completed = result_data.get("completed") or []
|
||
failed_blocks = result_data.get("failed") or []
|
||
success_blocks = result_data.get("blocks") or []
|
||
lines = [f"[文件补丁] {path}"]
|
||
if summary:
|
||
lines.append(summary)
|
||
if completed:
|
||
lines.append(f"✅ 成功块: {', '.join(str(i) for i in completed)}")
|
||
if failed_blocks:
|
||
fail_descriptions = []
|
||
for item in failed_blocks[:3]:
|
||
idx = item.get("index")
|
||
reason = item.get("reason") or item.get("error") or "未说明原因"
|
||
fail_descriptions.append(f"#{idx}: {reason}")
|
||
lines.append("⚠️ 失败块: " + ";".join(fail_descriptions))
|
||
if len(failed_blocks) > 3:
|
||
lines.append(f"(其余 {len(failed_blocks) - 3} 个失败块略)")
|
||
# 通用排查提示:把最常见的坑点一次性说清楚,减少来回沟通成本
|
||
lines.append("🔎 排查提示(常见易错点):")
|
||
lines.append("- 是否把“要新增/要删除/要替换”的每一行都标了 `+` 或 `-`?(漏标会被当成上下文/锚点)")
|
||
lines.append("- 空行也要写成单独一行的 `+`(只有 `+` 和换行),否则空行会消失或被当成上下文导致匹配失败。")
|
||
lines.append("- 若目标文件是空文件:应使用“仅追加”写法(块内只有 `+` 行,不要混入未加前缀的正文)。")
|
||
lines.append("- 若希望在文件中间插入/替换:必须提供足够的上下文行(以空格开头)或删除行(`-`)来锚定位置,不能只贴 `+`。")
|
||
lines.append("- 是否存在空格/Tab/缩进差异、全角半角标点差异、大小写差异?上下文与原文必须字节级一致。")
|
||
lines.append("- 是否是 CRLF(\\r\\n) 与 LF(\\n) 混用导致原文匹配失败?可先用终端查看/统一换行后再补丁。")
|
||
lines.append("- 是否遗漏 `*** Begin Patch`/`*** End Patch` 或在第一个 `@@` 之前写了其它内容?")
|
||
detail_sections: List[str] = []
|
||
for item in failed_blocks:
|
||
idx = item.get("index")
|
||
reason = item.get("reason") or item.get("error") or "未说明原因"
|
||
hint = item.get("hint")
|
||
block_patch = item.get("block_patch") or item.get("patch")
|
||
# 自动判别常见错误形态,便于快速定位问题
|
||
diagnostics = _classify_diff_block_issue(item, result_data)
|
||
if not block_patch:
|
||
old_text = item.get("old_text") or ""
|
||
new_text = item.get("new_text") or ""
|
||
synthetic_lines: List[str] = []
|
||
if old_text:
|
||
synthetic_lines.extend(f"-{line}" for line in old_text.splitlines())
|
||
if new_text:
|
||
synthetic_lines.extend(f"+{line}" for line in new_text.splitlines())
|
||
if synthetic_lines:
|
||
block_patch = "\n".join(synthetic_lines)
|
||
detail_sections.append(f"- #{idx}: {reason}")
|
||
if diagnostics:
|
||
detail_sections.append(f" 错误类型: {diagnostics}")
|
||
if hint:
|
||
detail_sections.append(f" 提示: {hint}")
|
||
if block_patch:
|
||
detail_sections.append("```diff")
|
||
detail_sections.append(block_patch.rstrip("\n"))
|
||
detail_sections.append("```")
|
||
detail_sections.append("")
|
||
if detail_sections and detail_sections[-1] == "":
|
||
detail_sections.pop()
|
||
if detail_sections:
|
||
lines.append("⚠️ 失败块详情:")
|
||
lines.extend(detail_sections)
|
||
|
||
# 对“成功块”做轻量体检:如果检测到潜在格式风险,给出风险提示(不影响 success 判定)
|
||
risk_sections: List[str] = []
|
||
for item in success_blocks:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
status = item.get("status")
|
||
idx = item.get("index")
|
||
if status != "success":
|
||
continue
|
||
diag = _classify_diff_block_issue(item, result_data)
|
||
if diag:
|
||
risk_sections.append(f"- #{idx}: {diag}")
|
||
if risk_sections:
|
||
lines.append("⚠️ 风险提示(补丁虽成功但格式可能有隐患):")
|
||
lines.extend(risk_sections)
|
||
|
||
if result_data.get("success") is False and result_data.get("error"):
|
||
lines.append(f"⚠️ 错误: {result_data.get('error')}")
|
||
formatted = "\n".join(line for line in lines if line)
|
||
return formatted or raw_text
|
||
|
||
|
||
def _format_create_file(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("create_file", result_data)
|
||
return result_data.get("message") or f"已创建空文件: {result_data.get('path', '未知路径')}"
|
||
|
||
|
||
def _classify_diff_block_issue(block: Dict[str, Any], result_data: Dict[str, Any]) -> str:
|
||
"""
|
||
针对 write_file_diff 常见的“离谱/易错”用法做启发式判别,返回简短错误类型说明。
|
||
不改变后端逻辑,只用于提示。
|
||
"""
|
||
patch_text = block.get("block_patch") or block.get("patch") or ""
|
||
lines = patch_text.splitlines()
|
||
plus = sum(1 for ln in lines if ln.startswith("+"))
|
||
minus = sum(1 for ln in lines if ln.startswith("-"))
|
||
context = sum(1 for ln in lines if ln.startswith(" "))
|
||
total = len([ln for ln in lines if ln.strip() != ""])
|
||
|
||
reasons: List[str] = []
|
||
# 1) 完全没加 + / - :最常见的“把目标当上下文”
|
||
if total > 0 and plus == 0 and minus == 0:
|
||
reasons.append("缺少 + / -,整块被当作上下文,无法定位到文件")
|
||
# 2) 全是 + 且没有上下文/删除:解析为“纯追加”,若目标非末尾插入会失败
|
||
if plus > 0 and minus == 0 and context == 0:
|
||
reasons.append("仅包含 + 行,被视为追加块;若想中间插入/替换需提供上下文或 -")
|
||
# 3) 没有上下文或删除行却不是 append_only(多数是漏写空格前缀)
|
||
if block.get("append_only") and (context > 0 or minus > 0):
|
||
reasons.append("块被解析为追加模式,但混入了上下文/删除行,可能写法不一致")
|
||
# 4) 未找到匹配时,提示检查空格/缩进/全角半角/换行差异
|
||
reason_text = (block.get("reason") or "").lower()
|
||
if "未找到匹配" in reason_text:
|
||
reasons.append("上下文未匹配:检查空格/缩进、全角半角、CRLF/LF、大小写是否与原文完全一致")
|
||
# 5) 空行未加 '+' 的典型情形:
|
||
# a) 有空白行但整块没有前缀(此前已由 #1 捕获),仍补充提示
|
||
# b) 有空白行且块中存在 + / -,说明空行漏写前缀,易导致上下文匹配失败
|
||
if any(ln == "" or ln.strip() == "" for ln in lines):
|
||
if plus == 0 and minus == 0:
|
||
reasons.append("空行未写 `+`,被当作上下文,建议空行写成单独一行 `+`")
|
||
else:
|
||
reasons.append("空行未写 `+`(或空格上下文),混入补丁时会被当作上下文,建议空行单独写成 `+`")
|
||
|
||
return ";".join(reasons)
|
||
|
||
|
||
def _format_delete_file(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("delete_file", result_data)
|
||
path = result_data.get("path") or "未知路径"
|
||
action = result_data.get("action") or "deleted"
|
||
return f"已{action}文件: {path}"
|
||
|
||
|
||
def _format_rename_file(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("rename_file", result_data)
|
||
old_path = result_data.get("old_path") or "旧路径未知"
|
||
new_path = result_data.get("new_path") or "新路径未知"
|
||
return f"已重命名: {old_path} -> {new_path}"
|
||
|
||
|
||
def _format_create_folder(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("create_folder", result_data)
|
||
return f"已创建文件夹: {result_data.get('path', '未知路径')}"
|
||
|
||
|
||
def _format_focus_file(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("focus_file", result_data)
|
||
message = result_data.get("message") or "文件已聚焦"
|
||
size = result_data.get("file_size")
|
||
size_note = f"({size} 字符)" if isinstance(size, int) else ""
|
||
focused = result_data.get("focused_files") or []
|
||
focused_note = f"当前聚焦: {', '.join(focused)}" if focused else "当前没有其他聚焦文件"
|
||
return f"{message}{size_note}\n{focused_note}"
|
||
|
||
|
||
def _format_unfocus_file(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("unfocus_file", result_data)
|
||
message = result_data.get("message") or "已取消聚焦"
|
||
remaining = result_data.get("remaining_focused") or []
|
||
remain_note = f"剩余聚焦: {', '.join(remaining)}" if remaining else "当前没有聚焦文件"
|
||
return f"{message}\n{remain_note}"
|
||
|
||
|
||
def _format_terminal_session(result_data: Dict[str, Any]) -> str:
|
||
action = result_data.get("action") or result_data.get("terminal_action") or "未知操作"
|
||
tag = f"terminal_session[{action}]"
|
||
if not result_data.get("success"):
|
||
return _format_failure(tag, result_data)
|
||
if action == "open":
|
||
return (
|
||
f"终端 {result_data.get('session')} 已打开,工作目录 {result_data.get('working_dir')},"
|
||
f"当前活动会话: {result_data.get('session')}(共 {result_data.get('total_sessions')} 个)"
|
||
)
|
||
if action == "close":
|
||
new_active = result_data.get("new_active") or "无"
|
||
remaining = result_data.get("remaining_sessions") or []
|
||
return (
|
||
f"终端 {result_data.get('session')} 已关闭,新的活动会话: {new_active}。"
|
||
f"剩余会话: {', '.join(remaining) if remaining else '无'}"
|
||
)
|
||
if action == "switch":
|
||
previous = result_data.get("previous") or "无"
|
||
current = result_data.get("current") or "未知"
|
||
return f"终端已从 {previous} 切换到 {current}。"
|
||
if action == "list":
|
||
sessions = result_data.get("sessions") or []
|
||
total = result_data.get("total", len(sessions))
|
||
max_allowed = result_data.get("max_allowed")
|
||
active = result_data.get("active") or "无"
|
||
header = f"共有 {total}/{max_allowed} 个终端会话,活动会话: {active}"
|
||
session_lines = []
|
||
for session in sessions:
|
||
name = session.get("session_name") or session.get("name") or "未命名"
|
||
state = "运行中" if session.get("is_running") else "已停止"
|
||
marker = "⭐" if session.get("is_active") else " "
|
||
working_dir = session.get("working_dir") or "未知目录"
|
||
session_lines.append(f"{marker} {name} | {state} | {working_dir}")
|
||
return "\n".join([header] + session_lines) if session_lines else header
|
||
return result_data.get("message") or f"{tag} 操作已完成。"
|
||
|
||
|
||
def _format_terminal_input(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("terminal_input", result_data)
|
||
session = result_data.get("session") or result_data.get("session_name") or "default"
|
||
command = result_data.get("command") or "(命令缺失)"
|
||
status = result_data.get("status") or "completed"
|
||
message = result_data.get("message") or ""
|
||
lines = [
|
||
f"terminal_input: 在 {session} 执行 `{command}`,状态 {status}",
|
||
]
|
||
if message:
|
||
lines.append(message)
|
||
lines.append(_summarize_output_block(result_data.get("output"), result_data.get("truncated")))
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _format_sleep(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("sleep", result_data)
|
||
reason = result_data.get("reason")
|
||
timestamp = result_data.get("timestamp")
|
||
message = result_data.get("message") or "等待完成"
|
||
parts = [message]
|
||
if reason:
|
||
parts.append(f"原因:{reason}")
|
||
if timestamp:
|
||
parts.append(f"时间:{timestamp}")
|
||
return ";".join(parts)
|
||
|
||
|
||
def _format_run_command(result_data: Dict[str, Any]) -> str:
|
||
return _format_command_result("run_command", result_data)
|
||
|
||
|
||
def _format_run_python(result_data: Dict[str, Any]) -> str:
|
||
base = _format_command_result("run_python", result_data)
|
||
code = result_data.get("code")
|
||
if not isinstance(code, str):
|
||
return base
|
||
header = f"run_python: 执行临时代码({len(code)} 字符)"
|
||
return "\n".join([header, base])
|
||
|
||
|
||
def _format_todo_create(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("todo_create", result_data)
|
||
todo = (result_data.get("todo_list") or {}).copy()
|
||
overview = todo.get("overview") or "未命名任务"
|
||
tasks = _summarize_todo_tasks(todo)
|
||
return f"已创建 TODO:{overview}\n{tasks}"
|
||
|
||
|
||
def _format_todo_update_task(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("todo_update_task", result_data)
|
||
message = result_data.get("message") or "任务状态已更新"
|
||
tasks = _summarize_todo_tasks(result_data.get("todo_list"))
|
||
return f"{message}\n{tasks}" if tasks else message
|
||
|
||
|
||
def _format_todo_finish(result_data: Dict[str, Any]) -> str:
|
||
if result_data.get("success"):
|
||
message = result_data.get("message") or "待办列表已结束"
|
||
tasks = _summarize_todo_tasks(result_data.get("todo_list"))
|
||
return f"{message}\n{tasks}" if tasks else message
|
||
if result_data.get("requires_confirmation"):
|
||
remaining = result_data.get("remaining") or []
|
||
remain_note = ", ".join(remaining) if remaining else "未知"
|
||
return f"仍有未完成任务({remain_note}),需要确认是否提前结束。"
|
||
return _format_failure("todo_finish", result_data)
|
||
|
||
|
||
def _format_todo_finish_confirm(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("todo_finish_confirm", result_data)
|
||
message = result_data.get("message") or "已处理 TODO 完结确认"
|
||
todo = result_data.get("todo_list") or {}
|
||
if todo.get("forced_finish"):
|
||
reason = todo.get("forced_reason") or "未提供原因"
|
||
message = f"{message}(强制结束,原因:{reason})"
|
||
return message
|
||
|
||
|
||
def _format_update_memory(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("update_memory", result_data)
|
||
mem_type = result_data.get("memory_type") or "main"
|
||
operation = result_data.get("operation") or "write"
|
||
label = "主记忆" if mem_type == "main" else "任务记忆"
|
||
idx = result_data.get("index")
|
||
count = result_data.get("count")
|
||
if operation == "append":
|
||
suffix = f"(共 {count} 条)" if count is not None else ""
|
||
return f"{label}已追加新条目{suffix}"
|
||
if operation == "replace":
|
||
return f"{label}第 {idx} 条已替换。"
|
||
if operation == "delete":
|
||
suffix = f"(剩余 {count} 条)" if count is not None else ""
|
||
return f"{label}第 {idx} 条已删除{suffix}"
|
||
return f"{label}已更新。"
|
||
|
||
|
||
def _format_create_sub_agent(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("create_sub_agent", result_data)
|
||
agent_id = result_data.get("agent_id")
|
||
task_id = result_data.get("task_id")
|
||
status = result_data.get("status")
|
||
refs = result_data.get("copied_references") or []
|
||
ref_note = f",附带 {len(refs)} 份参考文件" if refs else ""
|
||
deliver_dir = result_data.get("deliverables_dir")
|
||
deliver_note = f",交付目录: {deliver_dir}" if deliver_dir else ""
|
||
return f"子智能体 #{agent_id} 已创建(task_id={task_id},状态 {status}{ref_note}{deliver_note})。"
|
||
|
||
|
||
def _format_wait_sub_agent(result_data: Dict[str, Any]) -> str:
|
||
task_id = result_data.get("task_id")
|
||
agent_id = result_data.get("agent_id")
|
||
status = result_data.get("status")
|
||
if result_data.get("success"):
|
||
copied_path = result_data.get("copied_path") or result_data.get("deliverables_path")
|
||
message = result_data.get("message") or "子智能体任务已完成。"
|
||
deliver_note = f"交付已复制到 {copied_path}" if copied_path else "交付目录已生成"
|
||
return f"子智能体 #{agent_id}/{task_id} 完成:{message}({deliver_note})"
|
||
message = result_data.get("message") or result_data.get("error") or "子智能体任务失败"
|
||
return f"⚠️ 子智能体 #{agent_id}/{task_id} 状态 {status}: {message}"
|
||
|
||
|
||
def _format_close_sub_agent(result_data: Dict[str, Any]) -> str:
|
||
if not result_data.get("success"):
|
||
return _format_failure("close_sub_agent", result_data)
|
||
message = result_data.get("message") or "子智能体已关闭。"
|
||
task_id = result_data.get("task_id")
|
||
status = result_data.get("status")
|
||
status_note = f"(状态 {status})" if status else ""
|
||
return f"{message}{status_note}(task_id={task_id})"
|
||
|
||
|
||
def _format_failure(tag: str, result_data: Dict[str, Any]) -> str:
|
||
error = result_data.get("error") or result_data.get("message") or "未知错误"
|
||
suggestion = result_data.get("suggestion")
|
||
details = result_data.get("details")
|
||
parts = [f"⚠️ {tag} 失败: {error}"]
|
||
if suggestion:
|
||
parts.append(f"建议:{suggestion}")
|
||
elif isinstance(details, str) and details:
|
||
parts.append(f"详情:{details}")
|
||
elif isinstance(details, dict):
|
||
detail_msg = details.get("message") or details.get("error")
|
||
if detail_msg:
|
||
parts.append(f"详情:{detail_msg}")
|
||
return ";".join(parts)
|
||
|
||
|
||
def _summarize_output_block(output: Optional[str], truncated: Optional[bool]) -> str:
|
||
if not output:
|
||
return "无可见输出"
|
||
lines = output.splitlines()
|
||
line_count = len(lines)
|
||
char_count = len(output)
|
||
meta = f"输出 {line_count} 行 / {char_count} 字符"
|
||
if truncated:
|
||
meta += "(已截断)"
|
||
return f"{meta}\n```\n{output}\n```"
|
||
|
||
|
||
def _format_command_result(label: str, result_data: Dict[str, Any]) -> str:
|
||
command = result_data.get("command") or ""
|
||
return_code = result_data.get("return_code")
|
||
success = result_data.get("success")
|
||
status = result_data.get("status")
|
||
output = result_data.get("output")
|
||
truncated = result_data.get("truncated")
|
||
message = result_data.get("message")
|
||
|
||
if success:
|
||
header = f"{label}: `{command}`" if command else label
|
||
if return_code is not None and return_code != "":
|
||
header += f" (return_code={return_code})"
|
||
lines = [header]
|
||
if status and status not in {"completed", "success"}:
|
||
lines.append(f"终端状态: {status}")
|
||
if message:
|
||
lines.append(message)
|
||
lines.append(_summarize_output_block(output, truncated))
|
||
return "\n".join(lines)
|
||
|
||
error_msg = result_data.get("error") or message or "执行失败"
|
||
header = f"⚠️ {label} 失败"
|
||
if command:
|
||
header += f"(命令 `{command}`)"
|
||
lines = [f"{header}: {error_msg}"]
|
||
if return_code not in {None, ""}:
|
||
lines.append(f"返回码: {return_code}")
|
||
if output:
|
||
lines.append(_summarize_output_block(output, truncated))
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _summarize_todo_tasks(todo: Optional[Dict[str, Any]]) -> str:
|
||
if not isinstance(todo, dict):
|
||
return ""
|
||
tasks = todo.get("tasks") or []
|
||
parts = []
|
||
for task in tasks:
|
||
status_icon = "✅" if task.get("status") == "done" else "⬜️"
|
||
parts.append(f"{status_icon} task{task.get('index')}: {task.get('title')}")
|
||
return ";".join(parts)
|
||
|
||
|
||
TOOL_FORMATTERS = {
|
||
"create_file": _format_create_file,
|
||
"delete_file": _format_delete_file,
|
||
"rename_file": _format_rename_file,
|
||
"create_folder": _format_create_folder,
|
||
"focus_file": _format_focus_file,
|
||
"unfocus_file": _format_unfocus_file,
|
||
"terminal_session": _format_terminal_session,
|
||
"terminal_input": _format_terminal_input,
|
||
"sleep": _format_sleep,
|
||
"run_command": _format_run_command,
|
||
"run_python": _format_run_python,
|
||
"todo_create": _format_todo_create,
|
||
"todo_update_task": _format_todo_update_task,
|
||
"todo_finish": _format_todo_finish,
|
||
"todo_finish_confirm": _format_todo_finish_confirm,
|
||
"update_memory": _format_update_memory,
|
||
"create_sub_agent": _format_create_sub_agent,
|
||
"wait_sub_agent": _format_wait_sub_agent,
|
||
"close_sub_agent": _format_close_sub_agent,
|
||
}
|