"""将工具执行结果转换为对话上下文可用的纯文本摘要。""" from __future__ import annotations from typing import Any, Dict, List, Optional, Tuple def format_read_file_result(result_data: Dict[str, Any]) -> str: """格式化 read_file 工具的输出,兼容读取/搜索/抽取模式。""" if not isinstance(result_data, dict): return str(result_data) if not result_data.get("success"): return _format_failure("read_file", result_data) read_type = result_data.get("type", "read") truncated_note = "(内容已截断)" if result_data.get("truncated") else "" path = result_data.get("path", "未知路径") max_chars = result_data.get("max_chars") max_note = f"(max_chars={max_chars})" if max_chars else "" if read_type == "read": header = ( f"读取 {path} 行 {result_data.get('line_start')}~{result_data.get('line_end')} " f"{max_note}{truncated_note}" ).strip() content = result_data.get("content", "") return f"{header}\n```\n{content}\n```" if read_type == "search": query = result_data.get("query", "") actual = result_data.get("actual_matches", 0) returned = result_data.get("returned_matches", 0) case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写" header = ( f"在 {path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint}) " f"{max_note}{truncated_note}" ).strip() match_texts: List[str] = [] for idx, match in enumerate(result_data.get("matches", []), 1): match_note = "(片段截断)" if match.get("truncated") else "" hits = match.get("hits") or [] hit_text = ", ".join(str(h) for h in hits) if hits else "无" label = match.get("id") or f"match_{idx}" snippet = match.get("snippet", "") match_texts.append( f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```" ) if not match_texts: match_texts.append("未找到匹配内容。") return "\n".join([header] + match_texts) if read_type == "extract": segments = result_data.get("segments", []) header = f"从 {path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip() seg_texts: List[str] = [] for idx, segment in enumerate(segments, 1): seg_note = "(片段截断)" if segment.get("truncated") else "" label = segment.get("label") or f"segment_{idx}" snippet = segment.get("content", "") seg_texts.append( f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```" ) if not seg_texts: seg_texts.append("未提供可抽取的片段。") return "\n".join([header] + seg_texts) return _format_failure("read_file", {"error": "不支持的读取模式"}) def format_tool_result_for_context(function_name: str, result_data: Any, raw_text: str = "") -> str: """根据工具名称输出纯文本摘要,必要时附加关键信息。""" if function_name == "read_file" and isinstance(result_data, dict): return format_read_file_result(result_data) if function_name == "write_file_diff" and isinstance(result_data, dict): return _format_write_file_diff(result_data, raw_text) if not isinstance(result_data, dict): return raw_text handler = TOOL_FORMATTERS.get(function_name) if handler: return handler(result_data) summary = result_data.get("summary") or result_data.get("message") error_msg = result_data.get("error") parts: List[str] = [] if summary: parts.append(str(summary)) if error_msg: parts.append(f"⚠️ 错误: {error_msg}") return "\n".join(parts) if parts else raw_text def _format_write_file_diff(result_data: Dict[str, Any], raw_text: str) -> str: path = result_data.get("path", "目标文件") summary = result_data.get("summary") or result_data.get("message") completed = result_data.get("completed") or [] failed_blocks = result_data.get("failed") or [] success_blocks = result_data.get("blocks") or [] lines = [f"[文件补丁] {path}"] if summary: lines.append(summary) if completed: lines.append(f"✅ 成功块: {', '.join(str(i) for i in completed)}") if failed_blocks: fail_descriptions = [] for item in failed_blocks[:3]: idx = item.get("index") reason = item.get("reason") or item.get("error") or "未说明原因" fail_descriptions.append(f"#{idx}: {reason}") lines.append("⚠️ 失败块: " + ";".join(fail_descriptions)) if len(failed_blocks) > 3: lines.append(f"(其余 {len(failed_blocks) - 3} 个失败块略)") # 通用排查提示:把最常见的坑点一次性说清楚,减少来回沟通成本 lines.append("🔎 排查提示(常见易错点):") lines.append("- 是否把“要新增/要删除/要替换”的每一行都标了 `+` 或 `-`?(漏标会被当成上下文/锚点)") lines.append("- 空行也要写成单独一行的 `+`(只有 `+` 和换行),否则空行会消失或被当成上下文导致匹配失败。") lines.append("- 若目标文件是空文件:应使用“仅追加”写法(块内只有 `+` 行,不要混入未加前缀的正文)。") lines.append("- 若希望在文件中间插入/替换:必须提供足够的上下文行(以空格开头)或删除行(`-`)来锚定位置,不能只贴 `+`。") lines.append("- 是否存在空格/Tab/缩进差异、全角半角标点差异、大小写差异?上下文与原文必须字节级一致。") lines.append("- 是否是 CRLF(\\r\\n) 与 LF(\\n) 混用导致原文匹配失败?可先用终端查看/统一换行后再补丁。") lines.append("- 是否遗漏 `*** Begin Patch`/`*** End Patch` 或在第一个 `@@` 之前写了其它内容?") detail_sections: List[str] = [] for item in failed_blocks: idx = item.get("index") reason = item.get("reason") or item.get("error") or "未说明原因" hint = item.get("hint") block_patch = item.get("block_patch") or item.get("patch") # 自动判别常见错误形态,便于快速定位问题 diagnostics = _classify_diff_block_issue(item, result_data) if not block_patch: old_text = item.get("old_text") or "" new_text = item.get("new_text") or "" synthetic_lines: List[str] = [] if old_text: synthetic_lines.extend(f"-{line}" for line in old_text.splitlines()) if new_text: synthetic_lines.extend(f"+{line}" for line in new_text.splitlines()) if synthetic_lines: block_patch = "\n".join(synthetic_lines) detail_sections.append(f"- #{idx}: {reason}") if diagnostics: detail_sections.append(f" 错误类型: {diagnostics}") if hint: detail_sections.append(f" 提示: {hint}") if block_patch: detail_sections.append("```diff") detail_sections.append(block_patch.rstrip("\n")) detail_sections.append("```") detail_sections.append("") if detail_sections and detail_sections[-1] == "": detail_sections.pop() if detail_sections: lines.append("⚠️ 失败块详情:") lines.extend(detail_sections) # 对“成功块”做轻量体检:如果检测到潜在格式风险,给出风险提示(不影响 success 判定) risk_sections: List[str] = [] for item in success_blocks: if not isinstance(item, dict): continue status = item.get("status") idx = item.get("index") if status != "success": continue diag = _classify_diff_block_issue(item, result_data) if diag: risk_sections.append(f"- #{idx}: {diag}") if risk_sections: lines.append("⚠️ 风险提示(补丁虽成功但格式可能有隐患):") lines.extend(risk_sections) if result_data.get("success") is False and result_data.get("error"): lines.append(f"⚠️ 错误: {result_data.get('error')}") formatted = "\n".join(line for line in lines if line) return formatted or raw_text def _format_create_file(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("create_file", result_data) return result_data.get("message") or f"已创建空文件: {result_data.get('path', '未知路径')}" def _classify_diff_block_issue(block: Dict[str, Any], result_data: Dict[str, Any]) -> str: """ 针对 write_file_diff 常见的“离谱/易错”用法做启发式判别,返回简短错误类型说明。 不改变后端逻辑,只用于提示。 """ patch_text = block.get("block_patch") or block.get("patch") or "" lines = patch_text.splitlines() plus = sum(1 for ln in lines if ln.startswith("+")) minus = sum(1 for ln in lines if ln.startswith("-")) context = sum(1 for ln in lines if ln.startswith(" ")) total = len([ln for ln in lines if ln.strip() != ""]) reasons: List[str] = [] # 1) 完全没加 + / - :最常见的“把目标当上下文” if total > 0 and plus == 0 and minus == 0: reasons.append("缺少 + / -,整块被当作上下文,无法定位到文件") # 2) 全是 + 且没有上下文/删除:解析为“纯追加”,若目标非末尾插入会失败 if plus > 0 and minus == 0 and context == 0: reasons.append("仅包含 + 行,被视为追加块;若想中间插入/替换需提供上下文或 -") # 3) 没有上下文或删除行却不是 append_only(多数是漏写空格前缀) if block.get("append_only") and (context > 0 or minus > 0): reasons.append("块被解析为追加模式,但混入了上下文/删除行,可能写法不一致") # 4) 未找到匹配时,提示检查空格/缩进/全角半角/换行差异 reason_text = (block.get("reason") or "").lower() if "未找到匹配" in reason_text: reasons.append("上下文未匹配:检查空格/缩进、全角半角、CRLF/LF、大小写是否与原文完全一致") # 5) 空行未加 '+' 的典型情形: # a) 有空白行但整块没有前缀(此前已由 #1 捕获),仍补充提示 # b) 有空白行且块中存在 + / -,说明空行漏写前缀,易导致上下文匹配失败 if any(ln == "" or ln.strip() == "" for ln in lines): if plus == 0 and minus == 0: reasons.append("空行未写 `+`,被当作上下文,建议空行写成单独一行 `+`") else: reasons.append("空行未写 `+`(或空格上下文),混入补丁时会被当作上下文,建议空行单独写成 `+`") return ";".join(reasons) def _format_delete_file(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("delete_file", result_data) path = result_data.get("path") or "未知路径" action = result_data.get("action") or "deleted" return f"已{action}文件: {path}" def _format_rename_file(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("rename_file", result_data) old_path = result_data.get("old_path") or "旧路径未知" new_path = result_data.get("new_path") or "新路径未知" return f"已重命名: {old_path} -> {new_path}" def _format_create_folder(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("create_folder", result_data) return f"已创建文件夹: {result_data.get('path', '未知路径')}" def _format_focus_file(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("focus_file", result_data) message = result_data.get("message") or "文件已聚焦" size = result_data.get("file_size") size_note = f"({size} 字符)" if isinstance(size, int) else "" focused = result_data.get("focused_files") or [] focused_note = f"当前聚焦: {', '.join(focused)}" if focused else "当前没有其他聚焦文件" return f"{message}{size_note}\n{focused_note}" def _format_unfocus_file(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("unfocus_file", result_data) message = result_data.get("message") or "已取消聚焦" remaining = result_data.get("remaining_focused") or [] remain_note = f"剩余聚焦: {', '.join(remaining)}" if remaining else "当前没有聚焦文件" return f"{message}\n{remain_note}" def _format_terminal_session(result_data: Dict[str, Any]) -> str: action = result_data.get("action") or result_data.get("terminal_action") or "未知操作" tag = f"terminal_session[{action}]" if not result_data.get("success"): return _format_failure(tag, result_data) if action == "open": return ( f"终端 {result_data.get('session')} 已打开,工作目录 {result_data.get('working_dir')}," f"当前活动会话: {result_data.get('session')}(共 {result_data.get('total_sessions')} 个)" ) if action == "close": new_active = result_data.get("new_active") or "无" remaining = result_data.get("remaining_sessions") or [] return ( f"终端 {result_data.get('session')} 已关闭,新的活动会话: {new_active}。" f"剩余会话: {', '.join(remaining) if remaining else '无'}" ) if action == "switch": previous = result_data.get("previous") or "无" current = result_data.get("current") or "未知" return f"终端已从 {previous} 切换到 {current}。" if action == "list": sessions = result_data.get("sessions") or [] total = result_data.get("total", len(sessions)) max_allowed = result_data.get("max_allowed") active = result_data.get("active") or "无" header = f"共有 {total}/{max_allowed} 个终端会话,活动会话: {active}" session_lines = [] for session in sessions: name = session.get("session_name") or session.get("name") or "未命名" state = "运行中" if session.get("is_running") else "已停止" marker = "⭐" if session.get("is_active") else " " working_dir = session.get("working_dir") or "未知目录" session_lines.append(f"{marker} {name} | {state} | {working_dir}") return "\n".join([header] + session_lines) if session_lines else header return result_data.get("message") or f"{tag} 操作已完成。" def _plain_command_output(result_data: Dict[str, Any]) -> str: """生成纯文本输出,按需要加状态前缀。""" output = result_data.get("output") or "" status = (result_data.get("status") or "").lower() timeout = result_data.get("timeout") return_code = result_data.get("return_code") truncated = result_data.get("truncated") error = result_data.get("error") message = result_data.get("message") prefixes = [] if status in {"timeout"} and timeout: prefixes.append(f"[timeout after {int(timeout)}s]") elif status in {"timeout"}: prefixes.append("[timeout]") elif status in {"killed"}: prefixes.append("[killed]") elif status in {"awaiting_input"}: prefixes.append("[awaiting_input]") elif status in {"no_output"} and not output: prefixes.append("[no_output]") elif status in {"error"} and return_code is not None: prefixes.append(f"[error rc={return_code}]") elif status in {"error"}: prefixes.append("[error]") if truncated: prefixes.append("[truncated]") # 如果执行失败且没有输出,优先显示错误信息 if not result_data.get("success") and not output: err_text = error or message if err_text: prefix_text = "".join(prefixes) if prefixes else "[error]" return f"{prefix_text} {err_text}" if prefix_text else err_text # 没有错误文本则仍走后面的输出逻辑(可能显示 no_output) prefix_text = "".join(prefixes) if prefix_text and output: return f"{prefix_text}\n{output}" if prefix_text: return prefix_text if not output: return "[no_output]" return output def _format_terminal_input(result_data: Dict[str, Any]) -> str: return _plain_command_output(result_data) def _format_sleep(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("sleep", result_data) reason = result_data.get("reason") timestamp = result_data.get("timestamp") message = result_data.get("message") or "等待完成" parts = [message] if reason: parts.append(f"原因:{reason}") if timestamp: parts.append(f"时间:{timestamp}") return ";".join(parts) def _format_run_command(result_data: Dict[str, Any]) -> str: text = _plain_command_output(result_data) if (result_data.get("status") or "").lower() == "timeout": suggestion = "建议:在持久终端中直接运行该命令(terminal_session + terminal_input),或缩短命令执行时间。" text = f"{text}\n{suggestion}" if text else suggestion return text def _format_run_python(result_data: Dict[str, Any]) -> str: text = _plain_command_output(result_data) if (result_data.get("status") or "").lower() == "timeout": suggestion = "建议:将代码保存为脚本后,在持久终端中执行(terminal_session + terminal_input),或拆分/优化代码以缩短运行时间。" text = f"{text}\n{suggestion}" if text else suggestion return text def _format_todo_create(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("todo_create", result_data) todo = (result_data.get("todo_list") or {}).copy() overview = todo.get("overview") or "未命名任务" total = len(todo.get("tasks") or []) return f"已创建 TODO:{overview}(共 {total} 项)" def _format_todo_update_task(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("todo_update_task", result_data) message = result_data.get("message") or "任务状态已更新" todo = result_data.get("todo_list") or {} tasks = todo.get("tasks") or [] total = len(tasks) done = sum(1 for t in tasks if t.get("status") == "done") progress_note = f"进度 {done}/{total}" if total else "" return f"{message};{progress_note}".strip(";") def _format_todo_finish(result_data: Dict[str, Any]) -> str: if result_data.get("success"): todo = result_data.get("todo_list") or {} tasks = todo.get("tasks") or [] overview = todo.get("overview") or "未命名任务" total = len(tasks) done = sum(1 for t in tasks if t.get("status") == "done") status_note = "正常结束" if done == total else "已结束" lines = [ f"TODO 已结束({status_note}),进度 {done}/{total}", f"概述:{overview}", ] if tasks: lines.append(_summarize_todo_tasks(todo)) return "\n".join(lines) if result_data.get("requires_confirmation"): remaining = result_data.get("remaining") or [] remain_note = ", ".join(remaining) if remaining else "未知" return f"仍有未完成任务({remain_note}),需要确认是否提前结束。" return _format_failure("todo_finish", result_data) def _format_todo_finish_confirm(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("todo_finish_confirm", result_data) message = result_data.get("message") or "已处理 TODO 完结确认" todo = result_data.get("todo_list") or {} overview = todo.get("overview") or "未命名任务" tasks = todo.get("tasks") or [] total = len(tasks) done = sum(1 for t in tasks if t.get("status") == "done") forced = todo.get("forced_finish") reason = todo.get("forced_reason") status_note = "强制结束" if forced else "已结束" lines = [ f"{message}({status_note},进度 {done}/{total})", f"概述:{overview}", ] if forced and reason: lines.append(f"强制结束原因:{reason}") if tasks: lines.append(_summarize_todo_tasks(todo)) return "\n".join(lines) def _format_update_memory(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("update_memory", result_data) mem_type = result_data.get("memory_type") or "main" operation = result_data.get("operation") or "write" label = "主记忆" if mem_type == "main" else "任务记忆" idx = result_data.get("index") count = result_data.get("count") if operation == "append": suffix = f"(共 {count} 条)" if count is not None else "" return f"{label}已追加新条目{suffix}" if operation == "replace": return f"{label}第 {idx} 条已替换。" if operation == "delete": suffix = f"(剩余 {count} 条)" if count is not None else "" return f"{label}第 {idx} 条已删除{suffix}" return f"{label}已更新。" def _format_create_sub_agent(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("create_sub_agent", result_data) agent_id = result_data.get("agent_id") task_id = result_data.get("task_id") status = result_data.get("status") refs = result_data.get("copied_references") or [] ref_note = f",附带 {len(refs)} 份参考文件" if refs else "" deliver_dir = result_data.get("deliverables_dir") deliver_note = f",交付目录: {deliver_dir}" if deliver_dir else "" return f"子智能体 #{agent_id} 已创建(task_id={task_id},状态 {status}{ref_note}{deliver_note})。" def _format_wait_sub_agent(result_data: Dict[str, Any]) -> str: task_id = result_data.get("task_id") agent_id = result_data.get("agent_id") status = result_data.get("status") if result_data.get("success"): copied_path = result_data.get("copied_path") or result_data.get("deliverables_path") message = result_data.get("message") or "子智能体任务已完成。" deliver_note = f"交付已复制到 {copied_path}" if copied_path else "交付目录已生成" return f"子智能体 #{agent_id}/{task_id} 完成:{message}({deliver_note})" message = result_data.get("message") or result_data.get("error") or "子智能体任务失败" return f"⚠️ 子智能体 #{agent_id}/{task_id} 状态 {status}: {message}" def _format_close_sub_agent(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("close_sub_agent", result_data) message = result_data.get("message") or "子智能体已关闭。" task_id = result_data.get("task_id") status = result_data.get("status") status_note = f"(状态 {status})" if status else "" return f"{message}{status_note}(task_id={task_id})" def _format_failure(tag: str, result_data: Dict[str, Any]) -> str: error = result_data.get("error") or result_data.get("message") or "未知错误" suggestion = result_data.get("suggestion") details = result_data.get("details") parts = [f"⚠️ {tag} 失败: {error}"] if suggestion: parts.append(f"建议:{suggestion}") elif isinstance(details, str) and details: parts.append(f"详情:{details}") elif isinstance(details, dict): detail_msg = details.get("message") or details.get("error") if detail_msg: parts.append(f"详情:{detail_msg}") return ";".join(parts) def _summarize_output_block(output: Optional[str], truncated: Optional[bool]) -> str: if not output: return "无可见输出" lines = output.splitlines() line_count = len(lines) char_count = len(output) meta = f"输出 {line_count} 行 / {char_count} 字符" if truncated: meta += "(已截断)" return f"{meta}\n```\n{output}\n```" def _preview_text(text: str, limit: int) -> Tuple[str, bool]: """返回截断预览及是否截断标记。""" if text is None: return "", False if len(text) <= limit: return text, False return text[:limit], True def _format_terminal_snapshot(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("terminal_snapshot", result_data) session = result_data.get("session") or "default" requested = result_data.get("line_limit") requested_note = f"{requested} 行" if requested is not None else "全部" actual = result_data.get("lines_returned") actual_note = f"{actual} 行" if actual is not None else "未知行数" truncated = result_data.get("truncated") trunc_note = "已截断" if truncated else "未截断" output = result_data.get("output") or "" header = f"会话 {session}:请求 {requested_note},实际返回 {actual_note}({trunc_note})。" body = _summarize_output_block(output, truncated) return f"{header}\n{body}" def _format_extract_webpage(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("extract_webpage", result_data) url = result_data.get("url") or "目标网页" content = result_data.get("content") or "" length = len(content) preview, truncated = _preview_text(content, 800) header = f"提取完成:{url},长度 {length} 字符。" if not content: return f"{header} 内容为空。" note = "(截断预览)" if truncated else "(未截断)" return "\n".join([f"{header}{note}", "```", preview, "```"]) def _format_vlm_analyze(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("vlm_analyze", result_data) content = result_data.get("content") or "" length = len(content) preview, truncated = _preview_text(content, 800) note = "(截断预览)" if truncated else "(未截断)" header = f"VLM 解析完成,长度 {length} 字符{note}" if not content: return f"{header};未返回可识别文本。" return "\n".join([header, "```", preview, "```"]) # 兼容旧名 def _format_ocr_image(result_data: Dict[str, Any]) -> str: return _format_vlm_analyze(result_data) def _format_trigger_easter_egg(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("trigger_easter_egg", result_data) effect = (result_data.get("effect") or "").lower() duration = result_data.get("duration_seconds") or result_data.get("duration") if effect == "flood": return "大水即将淹没屏幕!预计持续 45 秒,不过这期间你和用户还可以继续对话。" if effect == "snake": dur_text = f"{duration} 秒" if duration else "约 200 秒" return f"发光贪吃蛇来访,约 {dur_text} 后或吃满 20 个苹果离场;动画不挡操作。" message = result_data.get("message") if message: return message return f"已触发彩蛋:{effect or '未知效果'}。" def _format_command_result(label: str, result_data: Dict[str, Any]) -> str: command = result_data.get("command") or "" return_code = result_data.get("return_code") success = result_data.get("success") status = result_data.get("status") output = result_data.get("output") truncated = result_data.get("truncated") message = result_data.get("message") if success: header = f"{label}: `{command}`" if command else label if return_code is not None and return_code != "": header += f" (return_code={return_code})" lines = [header] if status and status not in {"completed", "success"}: lines.append(f"终端状态: {status}") if message: lines.append(message) lines.append(_summarize_output_block(output, truncated)) return "\n".join(lines) error_msg = result_data.get("error") or message or "执行失败" header = f"⚠️ {label} 失败" if command: header += f"(命令 `{command}`)" lines = [f"{header}: {error_msg}"] if return_code not in {None, ""}: lines.append(f"返回码: {return_code}") if output: lines.append(_summarize_output_block(output, truncated)) return "\n".join(lines) def _summarize_todo_tasks(todo: Optional[Dict[str, Any]]) -> str: if not isinstance(todo, dict): return "" tasks = todo.get("tasks") or [] parts = [] for task in tasks: status_icon = "✅" if task.get("status") == "done" else "⬜️" parts.append(f"{status_icon} task{task.get('index')}: {task.get('title')}") return ";".join(parts) TOOL_FORMATTERS = { "create_file": _format_create_file, "delete_file": _format_delete_file, "rename_file": _format_rename_file, "create_folder": _format_create_folder, "focus_file": _format_focus_file, "unfocus_file": _format_unfocus_file, "terminal_snapshot": _format_terminal_snapshot, "terminal_session": _format_terminal_session, "terminal_input": _format_terminal_input, "sleep": _format_sleep, "run_command": _format_run_command, "run_python": _format_run_python, "extract_webpage": _format_extract_webpage, "vlm_analyze": _format_vlm_analyze, "ocr_image": _format_ocr_image, "trigger_easter_egg": _format_trigger_easter_egg, "todo_create": _format_todo_create, "todo_update_task": _format_todo_update_task, "todo_finish": _format_todo_finish, "todo_finish_confirm": _format_todo_finish_confirm, "update_memory": _format_update_memory, "create_sub_agent": _format_create_sub_agent, "wait_sub_agent": _format_wait_sub_agent, "close_sub_agent": _format_close_sub_agent, }