agent-Specialization/core/main_terminal_parts/tools.py

1745 lines
90 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
try:
from config import (
OUTPUT_FORMATS, DATA_DIR, PROMPTS_DIR, NEED_CONFIRMATION,
MAX_TERMINALS, TERMINAL_BUFFER_SIZE, TERMINAL_DISPLAY_SIZE,
MAX_READ_FILE_CHARS, READ_TOOL_DEFAULT_MAX_CHARS,
READ_TOOL_DEFAULT_CONTEXT_BEFORE, READ_TOOL_DEFAULT_CONTEXT_AFTER,
READ_TOOL_MAX_CONTEXT_BEFORE, READ_TOOL_MAX_CONTEXT_AFTER,
READ_TOOL_DEFAULT_MAX_MATCHES, READ_TOOL_MAX_MATCHES,
READ_TOOL_MAX_FILE_SIZE,
TERMINAL_SANDBOX_MOUNT_PATH,
TERMINAL_SANDBOX_MODE,
TERMINAL_SANDBOX_CPUS,
TERMINAL_SANDBOX_MEMORY,
PROJECT_MAX_STORAGE_MB,
CUSTOM_TOOLS_ENABLED,
)
except ImportError:
import sys
project_root = Path(__file__).resolve().parents[2]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import (
OUTPUT_FORMATS, DATA_DIR, PROMPTS_DIR, NEED_CONFIRMATION,
MAX_TERMINALS, TERMINAL_BUFFER_SIZE, TERMINAL_DISPLAY_SIZE,
MAX_READ_FILE_CHARS, READ_TOOL_DEFAULT_MAX_CHARS,
READ_TOOL_DEFAULT_CONTEXT_BEFORE, READ_TOOL_DEFAULT_CONTEXT_AFTER,
READ_TOOL_MAX_CONTEXT_BEFORE, READ_TOOL_MAX_CONTEXT_AFTER,
READ_TOOL_DEFAULT_MAX_MATCHES, READ_TOOL_MAX_MATCHES,
READ_TOOL_MAX_FILE_SIZE,
TERMINAL_SANDBOX_MOUNT_PATH,
TERMINAL_SANDBOX_MODE,
TERMINAL_SANDBOX_CPUS,
TERMINAL_SANDBOX_MEMORY,
PROJECT_MAX_STORAGE_MB,
CUSTOM_TOOLS_ENABLED,
)
from modules.file_manager import FileManager
from modules.search_engine import SearchEngine
from modules.terminal_ops import TerminalOperator
from modules.memory_manager import MemoryManager
from modules.terminal_manager import TerminalManager
from modules.todo_manager import TodoManager
from modules.sub_agent_manager import SubAgentManager
from modules.webpage_extractor import extract_webpage_content, tavily_extract
from modules.ocr_client import OCRClient
from modules.easter_egg_manager import EasterEggManager
from modules.personalization_manager import (
load_personalization_config,
build_personalization_prompt,
)
from modules.skills_manager import (
get_skills_catalog,
build_skills_list,
merge_enabled_skills,
build_skills_prompt,
)
from modules.custom_tool_registry import CustomToolRegistry, build_default_tool_category
from modules.custom_tool_executor import CustomToolExecutor
try:
from config.limits import THINKING_FAST_INTERVAL
except ImportError:
THINKING_FAST_INTERVAL = 10
from modules.container_monitor import collect_stats, inspect_state
from core.tool_config import TOOL_CATEGORIES
from utils.api_client import DeepSeekClient
from utils.context_manager import ContextManager
from utils.tool_result_formatter import format_tool_result_for_context
from utils.logger import setup_logger
from config.model_profiles import (
get_model_profile,
get_model_prompt_replacements,
get_model_context_window,
)
logger = setup_logger(__name__)
DISABLE_LENGTH_CHECK = True
class MainTerminalToolsMixin:
def _clamp_int(value, default, min_value=None, max_value=None):
"""将输入转换为整数并限制范围。"""
if value is None:
return default
try:
num = int(value)
except (TypeError, ValueError):
return default
if min_value is not None:
num = max(min_value, num)
if max_value is not None:
num = min(max_value, num)
return num
def _parse_optional_line(value, field_name: str):
"""解析可选的行号参数。"""
if value is None:
return None, None
try:
number = int(value)
except (TypeError, ValueError):
return None, f"{field_name} 必须是整数"
if number < 1:
return None, f"{field_name} 必须大于等于1"
return number, None
def _truncate_text_block(text: str, max_chars: int):
"""对单段文本应用字符限制。"""
if max_chars and len(text) > max_chars:
return text[:max_chars], True, max_chars
return text, False, len(text)
def _limit_text_chunks(chunks: List[Dict], text_key: str, max_chars: int):
"""对多个文本片段应用全局字符限制。"""
if max_chars is None or max_chars <= 0:
return chunks, False, sum(len(chunk.get(text_key, "") or "") for chunk in chunks)
remaining = max_chars
limited_chunks: List[Dict] = []
truncated = False
consumed = 0
for chunk in chunks:
snippet = chunk.get(text_key, "") or ""
snippet_len = len(snippet)
chunk_copy = dict(chunk)
if remaining <= 0:
truncated = True
break
if snippet_len > remaining:
chunk_copy[text_key] = snippet[:remaining]
chunk_copy["truncated"] = True
consumed += remaining
limited_chunks.append(chunk_copy)
truncated = True
remaining = 0
break
limited_chunks.append(chunk_copy)
consumed += snippet_len
remaining -= snippet_len
return limited_chunks, truncated, consumed
def _record_sub_agent_message(self, message: Optional[str], task_id: Optional[str] = None, inline: bool = False):
"""以 system 消息记录子智能体状态。"""
if not message:
return
if task_id and task_id in self._announced_sub_agent_tasks:
return
if task_id:
self._announced_sub_agent_tasks.add(task_id)
logger.info(
"[SubAgent] record message | task=%s | inline=%s | content=%s",
task_id,
inline,
message.replace("\n", "\\n")[:200],
)
metadata = {"sub_agent_notice": True, "inline": inline}
if task_id:
metadata["task_id"] = task_id
self.context_manager.add_conversation("system", message, metadata=metadata)
print(f"{OUTPUT_FORMATS['info']} {message}")
def apply_personalization_preferences(self, config: Optional[Dict[str, Any]] = None):
"""Apply persisted personalization settings that affect runtime behavior."""
try:
effective_config = config or load_personalization_config(self.data_dir)
except Exception:
effective_config = {}
# 工具意图开关
self.tool_intent_enabled = bool(effective_config.get("tool_intent_enabled"))
interval = effective_config.get("thinking_interval")
if isinstance(interval, int) and interval > 0:
self.thinking_fast_interval = interval
else:
self.thinking_fast_interval = THINKING_FAST_INTERVAL
disabled_categories = []
raw_disabled = effective_config.get("disabled_tool_categories")
if isinstance(raw_disabled, list):
disabled_categories = [
key for key in raw_disabled
if isinstance(key, str) and key in self.tool_categories_map
]
self.default_disabled_tool_categories = disabled_categories
# 图片压缩模式传递给上下文
img_mode = effective_config.get("image_compression")
if isinstance(img_mode, str):
self.context_manager.image_compression_mode = img_mode
# Reset category states to defaults before applying overrides
for key, category in self.tool_categories_map.items():
self.tool_category_states[key] = False if key in disabled_categories else category.default_enabled
self._refresh_disabled_tools()
# 默认模型偏好(优先应用,再处理运行模式)
preferred_model = effective_config.get("default_model")
if isinstance(preferred_model, str) and preferred_model != self.model_key:
try:
self.set_model(preferred_model)
except Exception as exc:
logger.warning("忽略无效默认模型: %s (%s)", preferred_model, exc)
preferred_mode = effective_config.get("default_run_mode")
if isinstance(preferred_mode, str):
normalized_mode = preferred_mode.strip().lower()
if normalized_mode in {"fast", "thinking", "deep"} and normalized_mode != self.run_mode:
try:
self.set_run_mode(normalized_mode)
except ValueError:
logger.warning("忽略无效默认运行模式: %s", preferred_mode)
# 静默禁用工具提示
self.silent_tool_disable = bool(effective_config.get("silent_tool_disable"))
def _handle_read_tool(self, arguments: Dict) -> Dict:
"""集中处理 read_file 工具的三种模式。"""
file_path = arguments.get("path")
if not file_path:
return {"success": False, "error": "缺少文件路径参数"}
read_type = (arguments.get("type") or "read").lower()
if read_type not in {"read", "search", "extract"}:
return {"success": False, "error": f"未知的读取类型: {read_type}"}
max_chars = self._clamp_int(
arguments.get("max_chars"),
READ_TOOL_DEFAULT_MAX_CHARS,
1,
MAX_READ_FILE_CHARS
)
base_result = {
"success": True,
"type": read_type,
"path": None,
"encoding": "utf-8",
"max_chars": max_chars,
"truncated": False
}
if read_type == "read":
start_line, error = self._parse_optional_line(arguments.get("start_line"), "start_line")
if error:
return {"success": False, "error": error}
end_line_val = arguments.get("end_line")
end_line = None
if end_line_val is not None:
end_line, error = self._parse_optional_line(end_line_val, "end_line")
if error:
return {"success": False, "error": error}
if start_line and end_line < start_line:
return {"success": False, "error": "end_line 必须大于等于 start_line"}
read_result = self.file_manager.read_text_segment(
file_path,
start_line=start_line,
end_line=end_line,
size_limit=READ_TOOL_MAX_FILE_SIZE
)
if not read_result.get("success"):
return read_result
content, truncated, char_count = self._truncate_text_block(read_result["content"], max_chars)
base_result.update({
"path": read_result["path"],
"content": content,
"line_start": read_result["line_start"],
"line_end": read_result["line_end"],
"total_lines": read_result["total_lines"],
"file_size": read_result["size"],
"char_count": char_count,
"message": f"已读取 {read_result['path']} 的内容(行 {read_result['line_start']}~{read_result['line_end']}"
})
base_result["truncated"] = truncated
self.context_manager.load_file(read_result["path"])
return base_result
if read_type == "search":
query = arguments.get("query")
if not query:
return {"success": False, "error": "搜索模式需要提供 query 参数"}
max_matches = self._clamp_int(
arguments.get("max_matches"),
READ_TOOL_DEFAULT_MAX_MATCHES,
1,
READ_TOOL_MAX_MATCHES
)
context_before = self._clamp_int(
arguments.get("context_before"),
READ_TOOL_DEFAULT_CONTEXT_BEFORE,
0,
READ_TOOL_MAX_CONTEXT_BEFORE
)
context_after = self._clamp_int(
arguments.get("context_after"),
READ_TOOL_DEFAULT_CONTEXT_AFTER,
0,
READ_TOOL_MAX_CONTEXT_AFTER
)
case_sensitive = bool(arguments.get("case_sensitive"))
search_result = self.file_manager.search_text(
file_path,
query=query,
max_matches=max_matches,
context_before=context_before,
context_after=context_after,
case_sensitive=case_sensitive,
size_limit=READ_TOOL_MAX_FILE_SIZE
)
if not search_result.get("success"):
return search_result
matches = search_result["matches"]
limited_matches, truncated, char_count = self._limit_text_chunks(matches, "snippet", max_chars)
base_result.update({
"path": search_result["path"],
"file_size": search_result["size"],
"query": query,
"max_matches": max_matches,
"actual_matches": len(matches),
"returned_matches": len(limited_matches),
"context_before": context_before,
"context_after": context_after,
"case_sensitive": case_sensitive,
"matches": limited_matches,
"char_count": char_count,
"message": f"{search_result['path']} 中搜索 \"{query}\",返回 {len(limited_matches)} 条结果"
})
base_result["truncated"] = truncated
return base_result
# extract
segments = arguments.get("segments")
if not isinstance(segments, list) or not segments:
return {"success": False, "error": "extract 模式需要提供 segments 数组"}
extract_result = self.file_manager.extract_segments(
file_path,
segments=segments,
size_limit=READ_TOOL_MAX_FILE_SIZE
)
if not extract_result.get("success"):
return extract_result
limited_segments, truncated, char_count = self._limit_text_chunks(
extract_result["segments"],
"content",
max_chars
)
base_result.update({
"path": extract_result["path"],
"segments": limited_segments,
"file_size": extract_result["size"],
"total_lines": extract_result["total_lines"],
"segment_count": len(limited_segments),
"char_count": char_count,
"message": f"已从 {extract_result['path']} 抽取 {len(limited_segments)} 个片段"
})
base_result["truncated"] = truncated
self.context_manager.load_file(extract_result["path"])
return base_result
def set_tool_category_enabled(self, category: str, enabled: bool) -> None:
"""设置工具类别的启用状态 / Toggle tool category enablement."""
categories = self.tool_categories_map
if category not in categories:
raise ValueError(f"未知的工具类别: {category}")
forced = self.admin_forced_category_states.get(category)
if isinstance(forced, bool) and forced != enabled:
raise ValueError("该类别被管理员强制为启用/禁用,无法修改")
self.tool_category_states[category] = bool(enabled)
self._refresh_disabled_tools()
def set_admin_policy(
self,
categories: Optional[Dict[str, "ToolCategory"]] = None,
forced_category_states: Optional[Dict[str, Optional[bool]]] = None,
disabled_models: Optional[List[str]] = None,
) -> None:
"""应用管理员策略(工具分类、强制开关、模型禁用)。"""
if categories:
self.tool_categories_map = dict(categories)
# 保证自定义工具分类存在(仅当功能启用)
if self.custom_tools_enabled and "custom" not in self.tool_categories_map:
self.tool_categories_map["custom"] = type(next(iter(TOOL_CATEGORIES.values())))(
label="自定义工具",
tools=[],
default_enabled=True,
silent_when_disabled=False,
)
# 重新构建启用状态映射,保留已有值
new_states: Dict[str, bool] = {}
for key, cat in self.tool_categories_map.items():
if key in self.tool_category_states:
new_states[key] = self.tool_category_states[key]
else:
new_states[key] = cat.default_enabled
self.tool_category_states = new_states
# 清理已被移除的类别
for removed in list(self.tool_category_states.keys()):
if removed not in self.tool_categories_map:
self.tool_category_states.pop(removed, None)
self.admin_forced_category_states = forced_category_states or {}
self.admin_disabled_models = disabled_models or []
self._refresh_disabled_tools()
def get_tool_settings_snapshot(self) -> List[Dict[str, object]]:
"""获取工具类别状态快照 / Return tool category states snapshot."""
snapshot: List[Dict[str, object]] = []
categories = self.tool_categories_map
for key, category in categories.items():
forced = self.admin_forced_category_states.get(key)
enabled = self.tool_category_states.get(key, category.default_enabled)
if isinstance(forced, bool):
enabled = forced
snapshot.append({
"id": key,
"label": category.label,
"enabled": enabled,
"tools": list(category.tools),
"locked": isinstance(forced, bool),
"locked_state": forced if isinstance(forced, bool) else None,
})
return snapshot
def _refresh_disabled_tools(self) -> None:
"""刷新禁用工具列表 / Refresh disabled tool set."""
disabled: Set[str] = set()
notice: Set[str] = set()
categories = self.tool_categories_map
for key, category in categories.items():
state = self.tool_category_states.get(key, category.default_enabled)
forced = self.admin_forced_category_states.get(key)
if isinstance(forced, bool):
state = forced
if not state:
disabled.update(category.tools)
if not getattr(category, "silent_when_disabled", False):
notice.update(category.tools)
self.disabled_tools = disabled
self.disabled_notice_tools = notice
def _format_disabled_tool_notice(self) -> Optional[str]:
"""生成禁用工具提示信息 / Format disabled tool notice."""
if getattr(self, "silent_tool_disable", False):
return None
if not self.disabled_notice_tools:
return None
lines = ["=== 工具可用性提醒 ==="]
for tool_name in sorted(self.disabled_notice_tools):
lines.append(f"{tool_name}:已被用户禁用")
lines.append("=== 提示结束 ===")
return "\n".join(lines)
def _inject_intent(self, properties: Dict[str, Any]) -> Dict[str, Any]:
"""在工具参数中注入 intent简短意图说明仅当开关启用时。
字段含义要求模型用不超过15个中文字符对即将执行的动作做简要说明供前端展示。
"""
if not self.tool_intent_enabled:
return properties
if not isinstance(properties, dict):
return properties
intent_field = {
"intent": {
"type": "string",
"description": "用不超过15个字向用户说明你要做什么例如等待下载完成/创建日志文件"
}
}
# 将 intent 放在最前面以提高模型关注度
return {**intent_field, **properties}
def _apply_intent_to_tools(self, tools: List[Dict]) -> List[Dict]:
"""遍历工具列表,为缺少 intent 的工具补充字段(开关启用时生效)。"""
if not self.tool_intent_enabled:
return tools
intent_field = {
"intent": {
"type": "string",
"description": "用不超过15个字向用户说明你要做什么例如等待下载完成/创建日志文件/搜索最新新闻"
}
}
for tool in tools:
func = tool.get("function") or {}
params = func.get("parameters") or {}
if not isinstance(params, dict):
continue
if params.get("type") != "object":
continue
props = params.get("properties")
if not isinstance(props, dict):
continue
# 补充 intent 属性
if "intent" not in props:
params["properties"] = {**intent_field, **props}
# 将 intent 加入必填
required_list = params.get("required")
if isinstance(required_list, list):
if "intent" not in required_list:
required_list.insert(0, "intent")
params["required"] = required_list
else:
params["required"] = ["intent"]
return tools
def _build_custom_tools(self) -> List[Dict]:
if not (self.custom_tools_enabled and getattr(self, "user_role", "user") == "admin"):
return []
try:
definitions = self.custom_tool_registry.reload()
except Exception:
definitions = self.custom_tool_registry.list_tools()
if not definitions:
# 更新分类为空列表,避免旧缓存
if "custom" in self.tool_categories_map:
self.tool_categories_map["custom"].tools = []
return []
tools: List[Dict] = []
tool_ids: List[str] = []
for item in definitions:
tool_id = item.get("id")
if not tool_id:
continue
if item.get("invalid_id"):
# 跳过不合法的工具 ID避免供应商严格校验时报错
continue
tool_ids.append(tool_id)
params = item.get("parameters") or {"type": "object", "properties": {}}
if isinstance(params, dict) and params.get("type") != "object":
params = {"type": "object", "properties": {}}
required = item.get("required")
if isinstance(required, list):
params = dict(params)
params["required"] = required
tools.append({
"type": "function",
"function": {
"name": tool_id,
"description": item.get("description") or f"自定义工具: {tool_id}",
"parameters": params
}
})
# 覆盖 custom 分类的工具列表
if "custom" in self.tool_categories_map:
self.tool_categories_map["custom"].tools = tool_ids
return tools
def define_tools(self) -> List[Dict]:
"""定义可用工具(添加确认工具)"""
current_time = datetime.now().strftime("%Y-%m-%d %H")
tools = [
{
"type": "function",
"function": {
"name": "sleep",
"description": "等待指定的秒数,用于短暂延迟/节奏控制(例如让终端产生更多输出、或在两次快照之间留出间隔)。命令是否完成必须用 terminal_snapshot 确认;需要强制超时终止请使用 run_command。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"seconds": {
"type": "number",
"description": "等待的秒数可以是小数如0.2秒。建议范围0.1-10秒"
},
"reason": {
"type": "string",
"description": "等待的原因说明(可选)"
}
}),
"required": ["seconds"]
}
}
},
{
"type": "function",
"function": {
"name": "create_file",
"description": "创建新文件(仅创建空文件,正文请使用 write_file 或 edit_file 写入/替换)",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"path": {"type": "string", "description": "文件路径"},
"file_type": {"type": "string", "enum": ["txt", "py", "md"], "description": "文件类型"},
"annotation": {"type": "string", "description": "文件备注"}
}),
"required": ["path", "file_type", "annotation"]
}
}
},
{
"type": "function",
"function": {
"name": "write_file",
"description": "将内容写入本地文件系统append 为 False 时覆盖原文件True 时追加到末尾。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"file_path": {
"type": "string",
"description": "要写入的相对路径"
},
"content": {
"type": "string",
"description": "要写入文件的内容"
},
"append": {
"type": "boolean",
"description": "是否追加到文件而不是覆盖它",
"default": False
}
}),
"required": ["file_path", "content"]
}
}
},
{
"type": "function",
"function": {
"name": "read_file",
"description": "读取/搜索/抽取 UTF-8 文本文件内容。通过 type 参数选择 read阅读、search搜索、extract具体行段支持限制返回字符数。若文件非 UTF-8 或过大,请改用 run_python。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"path": {"type": "string", "description": "文件路径"},
"type": {
"type": "string",
"enum": ["read", "search", "extract"],
"description": "读取模式read=阅读、search=搜索、extract=按行抽取"
},
"max_chars": {
"type": "integer",
"description": "返回内容的最大字符数,默认与 config 一致"
},
"start_line": {
"type": "integer",
"description": "[read] 可选的起始行号1开始"
},
"end_line": {
"type": "integer",
"description": "[read] 可选的结束行号(>=start_line"
},
"query": {
"type": "string",
"description": "[search] 搜索关键词"
},
"max_matches": {
"type": "integer",
"description": "[search] 最多返回多少条命中默认5最大50"
},
"context_before": {
"type": "integer",
"description": "[search] 命中行向上追加的行数默认1最大3"
},
"context_after": {
"type": "integer",
"description": "[search] 命中行向下追加的行数默认1最大5"
},
"case_sensitive": {
"type": "boolean",
"description": "[search] 是否区分大小写,默认 false"
},
"segments": {
"type": "array",
"description": "[extract] 需要抽取的行区间",
"items": {
"type": "object",
"properties": {
"label": {
"type": "string",
"description": "该片段的标签(可选)"
},
"start_line": {
"type": "integer",
"description": "起始行号(>=1"
},
"end_line": {
"type": "integer",
"description": "结束行号(>=start_line"
}
},
"required": ["start_line", "end_line"]
},
"minItems": 1
}
}),
"required": ["path", "type"]
}
}
},
{
"type": "function",
"function": {
"name": "edit_file",
"description": "在文件中执行精确的字符串替换;建议先使用 read_file 获取最新内容以确保精确匹配。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"file_path": {
"type": "string",
"description": "要修改文件的相对路径"
},
"old_string": {
"type": "string",
"description": "要替换的文本(需与文件内容精确匹配,保留缩进)"
},
"new_string": {
"type": "string",
"description": "用于替换的新文本(必须不同于 old_string"
}
}),
"required": ["file_path", "old_string", "new_string"]
}
}
},
{
"type": "function",
"function": {
"name": "vlm_analyze",
"description": "使用大参数视觉语言模型Qwen3.5)理解图片:文字、物体、布局、表格等,仅支持本地路径。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"path": {"type": "string", "description": "项目内的图片相对路径"},
"prompt": {"type": "string", "description": "传递给 VLM 的中文提示词,如“请总结这张图的内容”“表格的总金额是多少”“图中是什么车?”。"}
}),
"required": ["path", "prompt"]
}
}
},
{
"type": "function",
"function": {
"name": "delete_file",
"description": "删除文件",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"path": {"type": "string", "description": "文件路径"}
}),
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "rename_file",
"description": "重命名文件",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"old_path": {"type": "string", "description": "原文件路径"},
"new_path": {"type": "string", "description": "新文件路径"}
}),
"required": ["old_path", "new_path"]
}
}
},
{
"type": "function",
"function": {
"name": "create_folder",
"description": "创建文件夹",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"path": {"type": "string", "description": "文件夹路径"}
}),
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "terminal_session",
"description": "管理持久化终端会话,可打开、关闭、列出或切换终端。请在授权工作区内执行命令,禁止启动需要完整 TTY 的程序python REPL、vim、top 等)。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"action": {
"type": "string",
"enum": ["open", "close", "list", "reset"],
"description": "操作类型open-打开新终端close-关闭终端list-列出所有终端reset-重置终端"
},
"session_name": {
"type": "string",
"description": "终端会话名称open、close、reset时需要"
},
"working_dir": {
"type": "string",
"description": "工作目录相对于项目路径open时可选"
}
}),
"required": ["action"]
}
}
},
{
"type": "function",
"function": {
"name": "terminal_input",
"description": "向指定终端发送命令或输入。禁止启动会占用终端界面的程序python/node/nano/vim 等);如遇卡死请结合 terminal_snapshot 并使用 terminal_session 的 reset 恢复。timeout 必填传入数字最大300表示本次等待输出的时长不会封装命令、不会强杀进程在等待窗口内若检测到命令已完成会提前返回否则在超时后返回已产生的输出并保持命令继续运行。需要强制超时终止请使用 run_command。\n若不确定上一条命令是否结束,先用 terminal_snapshot 确认后再继续输入。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"command": {
"type": "string",
"description": "要执行的命令或发送的输入"
},
"session_name": {
"type": "string",
"description": "目标终端会话名称(必填)"
},
"timeout": {
"type": "number",
"description": "等待输出的最长秒数必填最大300不会封装命令、不会中断进程"
}
}),
"required": ["command", "timeout", "session_name"]
}
}
},
{
"type": "function",
"function": {
"name": "terminal_snapshot",
"description": "获取指定终端最近的输出快照用于判断当前状态。默认返回末尾的50行可通过参数调整。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"session_name": {
"type": "string",
"description": "目标终端会话名称(可选,默认活动终端)"
},
"lines": {
"type": "integer",
"description": "返回的最大行数(可选)"
},
"max_chars": {
"type": "integer",
"description": "返回的最大字符数(可选)"
}
})
}
}
},
{
"type": "function",
"function": {
"name": "web_search",
"description": f"当现有资料不足时搜索外部信息(当前时间 {current_time})。调用前说明目的,精准撰写 query并合理设置时间/主题参数;避免重复或无意义的搜索。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"query": {
"type": "string",
"description": "搜索查询内容(不要包含日期或时间范围)"
},
"max_results": {
"type": "integer",
"description": "最大结果数,可选"
},
"topic": {
"type": "string",
"description": "搜索主题可选值general默认/news/finance"
},
"time_range": {
"type": "string",
"description": "相对时间范围,可选 day/week/month/year支持缩写 d/w/m/y与 days 和 start_date/end_date 互斥"
},
"days": {
"type": "integer",
"description": "最近 N 天,仅当 topic=news 时可用;与 time_range、start_date/end_date 互斥"
},
"start_date": {
"type": "string",
"description": "开始日期YYYY-MM-DD必须与 end_date 同时提供,与 time_range、days 互斥"
},
"end_date": {
"type": "string",
"description": "结束日期YYYY-MM-DD必须与 start_date 同时提供,与 time_range、days 互斥"
},
"country": {
"type": "string",
"description": "国家过滤,仅 topic=general 可用,使用英文小写国名"
},
"include_domains": {
"type": "array",
"description": "仅包含这些域名可选最多300个",
"items": {
"type": "string"
}
}
}),
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "extract_webpage",
"description": "在 web_search 结果不够详细时提取网页正文。调用前说明用途,注意提取内容会消耗大量 token超过80000字符将被拒绝。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"url": {"type": "string", "description": "要提取内容的网页URL"}
}),
"required": ["url"]
}
}
},
{
"type": "function",
"function": {
"name": "save_webpage",
"description": "提取网页内容并保存为纯文本文件,适合需要长期留存的长文档。请提供网址与目标路径(含 .txt 后缀),落地后请通过终端命令查看。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"url": {"type": "string", "description": "要保存的网页URL"},
"target_path": {"type": "string", "description": "保存位置,包含文件名,相对于项目根目录"}
}),
"required": ["url", "target_path"]
}
}
},
{
"type": "function",
"function": {
"name": "run_python",
"description": "执行一次性 Python 脚本,可用于处理二进制或非 UTF-8 文件(如 Excel、Word、PDF、图片或进行数据分析与验证。必须提供 timeout最长60秒一旦超时脚本会被打断且无法继续执行需要重新运行并返回已捕获输出。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"code": {"type": "string", "description": "Python代码"},
"timeout": {
"type": "number",
"description": "超时时长必填最大60"
}
}),
"required": ["code", "timeout"]
}
}
},
{
"type": "function",
"function": {
"name": "run_command",
"description": "执行一次性终端命令适合查看文件信息file/ls/stat/iconv 等)、转换编码或调用 CLI 工具。禁止启动交互式程序;对已聚焦文件仅允许使用 grep -n 等定位命令。必须提供 timeout最长30秒一旦超时命令**一定会被打断**且无法继续执行需要重新运行并返回已捕获输出输出超过10000字符将被截断或拒绝。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"command": {"type": "string", "description": "终端命令"},
"timeout": {
"type": "number",
"description": "超时时长必填最大30"
}
}),
"required": ["command", "timeout"]
}
}
},
{
"type": "function",
"function": {
"name": "update_memory",
"description": "按条目更新记忆列表自动编号。append 追加新条目replace 用序号替换delete 用序号删除。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"memory_type": {"type": "string", "enum": ["main", "task"], "description": "记忆类型"},
"content": {"type": "string", "description": "条目内容。append/replace 时必填"},
"operation": {"type": "string", "enum": ["append", "replace", "delete"], "description": "操作类型"},
"index": {"type": "integer", "description": "要替换/删除的序号从1开始"}
}),
"required": ["memory_type", "operation"]
}
}
},
{
"type": "function",
"function": {
"name": "todo_create",
"description": "创建待办列表,最多 8 条任务;若已有列表将被覆盖。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"overview": {"type": "string", "description": "一句话概述待办清单要完成的目标50 字以内。"},
"tasks": {
"type": "array",
"description": "任务列表1~8 条,每条写清“动词+对象+目标”。",
"items": {
"type": "object",
"properties": {
"title": {"type": "string", "description": "单个任务描述,写成可执行的步骤"}
},
"required": ["title"]
},
"minItems": 1,
"maxItems": 8
}
}),
"required": ["overview", "tasks"]
}
}
},
{
"type": "function",
"function": {
"name": "todo_update_task",
"description": "批量勾选或取消任务(支持单个或多个任务);全部勾选时提示所有任务已完成。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"task_index": {"type": "integer", "description": "任务序号1-8兼容旧参数"},
"task_indices": {
"type": "array",
"items": {"type": "integer"},
"minItems": 1,
"maxItems": 8,
"description": "要更新的任务序号列表1-8可一次勾选多个"
},
"completed": {"type": "boolean", "description": "true=打勾false=取消"}
}),
"required": ["completed"]
}
}
},
{
"type": "function",
"function": {
"name": "close_sub_agent",
"description": "强制关闭指定子智能体,适用于长时间无响应、超时或卡死的任务。使用前请确认必要的日志/文件已保留,操作会立即终止该任务。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"task_id": {"type": "string", "description": "子智能体任务ID"},
"agent_id": {"type": "integer", "description": "子智能体编号1~5若缺少 task_id 可用"}
})
}
}
},
{
"type": "function",
"function": {
"name": "create_sub_agent",
"description": "创建新的子智能体任务。适合大规模信息搜集、网页提取与多文档总结等会占用大量上下文的工作需要提供任务摘要、详细要求、交付目录以及参考文件。注意同一时间最多运行5个子智能体。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"agent_id": {"type": "integer", "description": "子智能体代号1~5"},
"summary": {"type": "string", "description": "任务摘要,简要说明目标"},
"task": {"type": "string", "description": "任务详细要求"},
"target_dir": {"type": "string", "description": "项目下用于接收交付的相对目录"},
"reference_files": {
"type": "array",
"description": "提供给子智能体的参考文件列表相对路径禁止在summary和task中直接告知子智能体引用图片的路径必须使用本参数提供",
"items": {"type": "string"},
"maxItems": 10
},
"timeout_seconds": {"type": "integer", "description": "子智能体最大运行秒数:单/双次搜索建议180秒多轮搜索整理建议300秒深度调研或长篇分析可设600秒"}
}),
"required": ["agent_id", "summary", "task", "target_dir"]
}
}
},
{
"type": "function",
"function": {
"name": "wait_sub_agent",
"description": "等待指定子智能体任务结束(或超时)。任务完成后会返回交付目录,并将结果复制到指定的项目文件夹。调用时 `timeout_seconds` 应不少于对应子智能体的 `timeout_seconds`,否则可能提前终止等待。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"task_id": {"type": "string", "description": "子智能体任务ID"},
"agent_id": {"type": "integer", "description": "子智能体代号(可选,用于缺省 task_id 的情况)"},
"timeout_seconds": {"type": "integer", "description": "本次等待的超时时长(秒)"}
}),
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "trigger_easter_egg",
"description": "触发隐藏彩蛋,用于展示非功能性特效。需指定 effect 参数,例如 flood灌水或 snake贪吃蛇",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"effect": {
"type": "string",
"description": "彩蛋标识,目前支持 flood灌水与 snake贪吃蛇"
}
}),
"required": ["effect"]
}
}
}
]
# 视觉模型Qwen3.5 / Kimi-k2.5)自带多模态能力,不再暴露 vlm_analyze改为 view_image / view_video
if getattr(self, "model_key", None) in {"qwen3-vl-plus", "kimi-k2.5"}:
tools = [
tool for tool in tools
if (tool.get("function") or {}).get("name") != "vlm_analyze"
]
tools.append({
"type": "function",
"function": {
"name": "view_image",
"description": "将指定本地图片附加到工具结果中tool 消息携带 image_url便于模型主动查看图片内容。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"path": {
"type": "string",
"description": "项目内的图片相对路径(不要以 /workspace 开头);宿主机模式可用绝对路径。支持 png/jpg/webp/gif/bmp/svg。"
}
}),
"required": ["path"]
}
}
})
tools.append({
"type": "function",
"function": {
"name": "view_video",
"description": "将指定本地视频附加到工具结果中tool 消息携带 video_url便于模型查看视频内容。",
"parameters": {
"type": "object",
"properties": self._inject_intent({
"path": {
"type": "string",
"description": "项目内的视频相对路径(不要以 /workspace 开头);宿主机模式可用绝对路径。支持 mp4/mov/mkv/avi/webm。"
}
}),
"required": ["path"]
}
}
})
# 附加自定义工具(仅管理员可见)
custom_tools = self._build_custom_tools()
if custom_tools:
tools.extend(custom_tools)
if self.disabled_tools:
tools = [
tool for tool in tools
if tool.get("function", {}).get("name") not in self.disabled_tools
]
return self._apply_intent_to_tools(tools)
async def handle_tool_call(self, tool_name: str, arguments: Dict) -> str:
"""处理工具调用(添加参数预检查和改进错误处理)"""
# 导入字符限制配置
from config import (
MAX_READ_FILE_CHARS,
MAX_RUN_COMMAND_CHARS, MAX_EXTRACT_WEBPAGE_CHARS
)
# 检查是否需要确认
if tool_name in NEED_CONFIRMATION:
if not await self.confirm_action(tool_name, arguments):
return json.dumps({"success": False, "error": "用户取消操作"})
# === 新增:预检查参数大小和格式 ===
try:
# 检查参数总大小
arguments_str = json.dumps(arguments, ensure_ascii=False)
if len(arguments_str) > 200000: # 200KB限制
return json.dumps({
"success": False,
"error": f"参数过大({len(arguments_str)}字符)超过200KB限制",
"suggestion": "请分块处理或减少参数内容"
}, ensure_ascii=False)
# 针对特定工具的内容检查
if tool_name == "write_file":
content = arguments.get("content", "")
length_limit = 200000
if not DISABLE_LENGTH_CHECK and len(content) > length_limit:
return json.dumps({
"success": False,
"error": f"文件内容过长({len(content)}字符),超过{length_limit}字符限制",
"suggestion": "请分块写入,或设置 append=true 多次写入"
}, ensure_ascii=False)
if '\\' in content and content.count('\\') > len(content) / 10:
print(f"{OUTPUT_FORMATS['warning']} 检测到大量转义字符,可能存在格式问题")
except Exception as e:
return json.dumps({
"success": False,
"error": f"参数预检查失败: {str(e)}"
}, ensure_ascii=False)
# 自定义工具预解析(仅管理员)
custom_tool = None
if self.custom_tools_enabled and getattr(self, "user_role", "user") == "admin":
try:
self.custom_tool_registry.reload()
except Exception:
pass
custom_tool = self.custom_tool_registry.get_tool(tool_name)
try:
if custom_tool:
result = await self.custom_tool_executor.run(tool_name, arguments)
elif tool_name == "read_file":
result = self._handle_read_tool(arguments)
elif tool_name in {"vlm_analyze", "ocr_image"}:
path = arguments.get("path")
prompt = arguments.get("prompt")
if not path:
return json.dumps({"success": False, "error": "缺少 path 参数", "warnings": []}, ensure_ascii=False)
result = self.ocr_client.vlm_analyze(path=path, prompt=prompt or "")
elif tool_name == "view_image":
path = (arguments.get("path") or "").strip()
if not path:
return json.dumps({"success": False, "error": "path 不能为空"}, ensure_ascii=False)
host_unrestricted = self._is_host_mode()
if path.startswith("/workspace"):
if host_unrestricted:
path = path.split("/workspace", 1)[1].lstrip("/")
else:
return json.dumps({"success": False, "error": "非法路径,超出项目根目录,请使用不带/workspace的相对路径"}, ensure_ascii=False)
if host_unrestricted and (Path(path).is_absolute() or (len(path) > 1 and path[1] == ":")):
abs_path = Path(path).expanduser().resolve()
else:
abs_path = (Path(self.context_manager.project_path) / path).resolve()
if not host_unrestricted:
try:
abs_path.relative_to(Path(self.context_manager.project_path).resolve())
except Exception:
return json.dumps({"success": False, "error": "非法路径,超出项目根目录,请使用不带/workspace的相对路径"}, ensure_ascii=False)
if not abs_path.exists() or not abs_path.is_file():
return json.dumps({"success": False, "error": f"图片不存在: {path}"}, ensure_ascii=False)
if abs_path.stat().st_size > 10 * 1024 * 1024:
return json.dumps({"success": False, "error": "图片过大,需 <= 10MB"}, ensure_ascii=False)
allowed_ext = {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".svg"}
if abs_path.suffix.lower() not in allowed_ext:
return json.dumps({"success": False, "error": f"不支持的图片格式: {abs_path.suffix}"}, ensure_ascii=False)
# 记录待附加图片,供上层将图片附加到工具结果
self.pending_image_view = {
"path": str(path)
}
result = {"success": True, "message": "图片已附加到工具结果中,将随 tool 返回。", "path": path}
elif tool_name == "view_video":
path = (arguments.get("path") or "").strip()
if not path:
return json.dumps({"success": False, "error": "path 不能为空"}, ensure_ascii=False)
host_unrestricted = self._is_host_mode()
if path.startswith("/workspace"):
if host_unrestricted:
path = path.split("/workspace", 1)[1].lstrip("/")
else:
return json.dumps({"success": False, "error": "非法路径,超出项目根目录,请使用相对路径"}, ensure_ascii=False)
if host_unrestricted and (Path(path).is_absolute() or (len(path) > 1 and path[1] == ":")):
abs_path = Path(path).expanduser().resolve()
else:
abs_path = (Path(self.context_manager.project_path) / path).resolve()
if not host_unrestricted:
try:
abs_path.relative_to(Path(self.context_manager.project_path).resolve())
except Exception:
return json.dumps({"success": False, "error": "非法路径,超出项目根目录,请使用相对路径"}, ensure_ascii=False)
if not abs_path.exists() or not abs_path.is_file():
return json.dumps({"success": False, "error": f"视频不存在: {path}"}, ensure_ascii=False)
allowed_ext = {".mp4", ".mov", ".mkv", ".avi", ".webm"}
if abs_path.suffix.lower() not in allowed_ext:
return json.dumps({"success": False, "error": f"不支持的视频格式: {abs_path.suffix}"}, ensure_ascii=False)
if abs_path.stat().st_size > 50 * 1024 * 1024:
return json.dumps({"success": False, "error": "视频过大,需 <= 50MB"}, ensure_ascii=False)
self.pending_video_view = {"path": str(path)}
result = {
"success": True,
"message": "视频已附加到工具结果中,将随 tool 返回。",
"path": path
}
# 终端会话管理工具
elif tool_name == "terminal_session":
action = arguments["action"]
if action == "open":
result = self.terminal_manager.open_terminal(
session_name=arguments.get("session_name", "default"),
working_dir=arguments.get("working_dir"),
make_active=True
)
if result["success"]:
print(f"{OUTPUT_FORMATS['session']} 终端会话已打开: {arguments.get('session_name', 'default')}")
elif action == "close":
result = self.terminal_manager.close_terminal(
session_name=arguments.get("session_name", "default")
)
if result["success"]:
print(f"{OUTPUT_FORMATS['session']} 终端会话已关闭: {arguments.get('session_name', 'default')}")
elif action == "list":
result = self.terminal_manager.list_terminals()
elif action == "reset":
result = self.terminal_manager.reset_terminal(
session_name=arguments.get("session_name")
)
if result["success"]:
print(f"{OUTPUT_FORMATS['session']} 终端会话已重置: {result['session']}")
else:
result = {"success": False, "error": f"未知操作: {action}"}
result["action"] = action
# 终端输入工具
elif tool_name == "terminal_input":
result = self.terminal_manager.send_to_terminal(
command=arguments["command"],
session_name=arguments.get("session_name"),
timeout=arguments.get("timeout")
)
if result["success"]:
print(f"{OUTPUT_FORMATS['terminal']} 执行命令: {arguments['command']}")
elif tool_name == "terminal_snapshot":
result = self.terminal_manager.get_terminal_snapshot(
session_name=arguments.get("session_name"),
lines=arguments.get("lines"),
max_chars=arguments.get("max_chars")
)
# sleep工具
elif tool_name == "sleep":
seconds = arguments.get("seconds", 1)
reason = arguments.get("reason", "等待操作完成")
# 限制最大等待时间
max_sleep = 600 # 最多等待60秒
if seconds > max_sleep:
result = {
"success": False,
"error": f"等待时间过长,最多允许 {max_sleep}",
"suggestion": f"建议分多次等待或减少等待时间"
}
else:
# 确保秒数为正数
if seconds <= 0:
result = {
"success": False,
"error": "等待时间必须大于0"
}
else:
print(f"{OUTPUT_FORMATS['info']} 等待 {seconds} 秒: {reason}")
# 执行等待
import asyncio
await asyncio.sleep(seconds)
result = {
"success": True,
"message": f"已等待 {seconds}",
"reason": reason,
"timestamp": datetime.now().isoformat()
}
print(f"{OUTPUT_FORMATS['success']} 等待完成")
elif tool_name == "create_file":
result = self.file_manager.create_file(
path=arguments["path"],
file_type=arguments["file_type"]
)
# 添加备注
if result["success"] and arguments.get("annotation"):
self.context_manager.update_annotation(
result["path"],
arguments["annotation"]
)
if result.get("success"):
result["message"] = (
f"已创建空文件: {result['path']}。请使用 write_file 写入内容,或使用 edit_file 进行替换。"
)
elif tool_name == "delete_file":
result = self.file_manager.delete_file(arguments["path"])
# 如果删除成功,同时删除备注
if result.get("success") and result.get("action") == "deleted":
deleted_path = result.get("path")
# 删除备注
if deleted_path in self.context_manager.file_annotations:
del self.context_manager.file_annotations[deleted_path]
self.context_manager.save_annotations()
print(f"🧹 已删除文件备注: {deleted_path}")
elif tool_name == "rename_file":
result = self.file_manager.rename_file(
arguments["old_path"],
arguments["new_path"]
)
# 如果重命名成功更新备注和聚焦的key
# 如果重命名成功,更新备注
if result.get("success") and result.get("action") == "renamed":
old_path = result.get("old_path")
new_path = result.get("new_path")
# 更新备注
if old_path in self.context_manager.file_annotations:
annotation = self.context_manager.file_annotations[old_path]
del self.context_manager.file_annotations[old_path]
self.context_manager.file_annotations[new_path] = annotation
self.context_manager.save_annotations()
print(f"📝 已更新文件备注: {old_path} -> {new_path}")
elif tool_name == "write_file":
path = arguments.get("file_path")
content = arguments.get("content", "")
append_flag = bool(arguments.get("append", False))
if not path:
result = {"success": False, "error": "缺少必要参数: file_path"}
else:
mode = "a" if append_flag else "w"
result = self.file_manager.write_file(path, content, mode=mode)
elif tool_name == "edit_file":
path = arguments.get("file_path")
old_text = arguments.get("old_string")
new_text = arguments.get("new_string")
if not path:
result = {"success": False, "error": "缺少必要参数: file_path"}
elif old_text is None or new_text is None:
result = {"success": False, "error": "缺少必要参数: old_string/new_string"}
elif old_text == new_text:
result = {"success": False, "error": "old_string 与 new_string 相同,无法执行替换"}
elif not old_text:
result = {"success": False, "error": "old_string 不能为空,请从 read_file 内容中精确复制"}
else:
result = self.file_manager.replace_in_file(path, old_text, new_text)
elif tool_name == "create_folder":
result = self.file_manager.create_folder(arguments["path"])
elif tool_name == "web_search":
allowed, quota_info = self.record_search_call()
if not allowed:
return json.dumps({
"success": False,
"error": f"搜索配额已用尽,将在 {quota_info.get('reset_at')} 重置。请向用户说明情况并提供替代方案。",
"quota": quota_info
}, ensure_ascii=False)
search_response = await self.search_engine.search_with_summary(
query=arguments["query"],
max_results=arguments.get("max_results"),
topic=arguments.get("topic"),
time_range=arguments.get("time_range"),
days=arguments.get("days"),
start_date=arguments.get("start_date"),
end_date=arguments.get("end_date"),
country=arguments.get("country"),
include_domains=arguments.get("include_domains")
)
if search_response["success"]:
result = {
"success": True,
"summary": search_response["summary"],
"filters": search_response.get("filters", {}),
"query": search_response.get("query"),
"results": search_response.get("results", []),
"total_results": search_response.get("total_results", 0)
}
else:
result = {
"success": False,
"error": search_response.get("error", "搜索失败"),
"filters": search_response.get("filters", {}),
"query": search_response.get("query"),
"results": search_response.get("results", []),
"total_results": search_response.get("total_results", 0)
}
elif tool_name == "extract_webpage":
url = arguments["url"]
try:
# 从config获取API密钥
from config import TAVILY_API_KEY
full_content, _ = await extract_webpage_content(
urls=url,
api_key=TAVILY_API_KEY,
extract_depth="basic",
max_urls=1
)
# 字符数检查
char_count = len(full_content)
if char_count > MAX_EXTRACT_WEBPAGE_CHARS:
result = {
"success": False,
"error": f"网页提取返回了过长的{char_count}字符请不要提取这个网页可以使用网页保存功能然后使用read工具查找或查看网页",
"char_count": char_count,
"limit": MAX_EXTRACT_WEBPAGE_CHARS,
"url": url
}
else:
result = {
"success": True,
"url": url,
"content": full_content
}
except Exception as e:
result = {
"success": False,
"error": f"网页提取失败: {str(e)}",
"url": url
}
elif tool_name == "save_webpage":
url = arguments["url"]
target_path = arguments["target_path"]
try:
from config import TAVILY_API_KEY
except ImportError:
TAVILY_API_KEY = None
if not TAVILY_API_KEY or TAVILY_API_KEY == "your-tavily-api-key":
result = {
"success": False,
"error": "Tavily API密钥未配置无法保存网页",
"url": url,
"path": target_path
}
else:
try:
extract_result = await tavily_extract(
urls=url,
api_key=TAVILY_API_KEY,
extract_depth="basic",
max_urls=1
)
if not extract_result or "error" in extract_result:
error_message = extract_result.get("error", "提取失败,未返回任何内容") if isinstance(extract_result, dict) else "提取失败"
result = {
"success": False,
"error": error_message,
"url": url,
"path": target_path
}
else:
results_list = extract_result.get("results", []) if isinstance(extract_result, dict) else []
primary_result = None
for item in results_list:
if item.get("raw_content"):
primary_result = item
break
if primary_result is None and results_list:
primary_result = results_list[0]
if not primary_result:
failed_list = extract_result.get("failed_results", []) if isinstance(extract_result, dict) else []
result = {
"success": False,
"error": "提取成功结果为空,无法保存",
"url": url,
"path": target_path,
"failed": failed_list
}
else:
content_to_save = primary_result.get("raw_content") or primary_result.get("content") or ""
if not content_to_save:
result = {
"success": False,
"error": "网页内容为空,未写入文件",
"url": url,
"path": target_path
}
else:
write_result = self.file_manager.write_file(target_path, content_to_save, mode="w")
if not write_result.get("success"):
result = {
"success": False,
"error": write_result.get("error", "写入文件失败"),
"url": url,
"path": target_path
}
else:
char_count = len(content_to_save)
byte_size = len(content_to_save.encode("utf-8"))
result = {
"success": True,
"url": url,
"path": write_result.get("path", target_path),
"char_count": char_count,
"byte_size": byte_size,
"message": f"网页内容已以纯文本保存到 {write_result.get('path', target_path)},可用 read_file 的 search/extract 查看,必要时再用终端命令。"
}
if isinstance(extract_result, dict) and extract_result.get("failed_results"):
result["warnings"] = extract_result["failed_results"]
except Exception as e:
result = {
"success": False,
"error": f"网页保存失败: {str(e)}",
"url": url,
"path": target_path
}
elif tool_name == "run_python":
result = await self.terminal_ops.run_python_code(
arguments["code"],
timeout=arguments.get("timeout")
)
elif tool_name == "run_command":
result = await self.terminal_ops.run_command(
arguments["command"],
timeout=arguments.get("timeout")
)
# 字符数检查
if result.get("success") and "output" in result:
char_count = len(result["output"])
if char_count > MAX_RUN_COMMAND_CHARS:
result = {
"success": False,
"error": f"结果内容过大,有{char_count}字符请使用限制字符数的获取内容方式根据程度选择10k以内的数",
"char_count": char_count,
"limit": MAX_RUN_COMMAND_CHARS,
"command": arguments["command"]
}
elif tool_name == "update_memory":
memory_type = arguments["memory_type"]
operation = arguments["operation"]
content = arguments.get("content")
index = arguments.get("index")
# 参数校验
if operation == "append" and (not content or not str(content).strip()):
result = {"success": False, "error": "append 操作需要 content"}
elif operation == "replace" and (index is None or index <= 0 or not content or not str(content).strip()):
result = {"success": False, "error": "replace 操作需要有效的 index 和 content"}
elif operation == "delete" and (index is None or index <= 0):
result = {"success": False, "error": "delete 操作需要有效的 index"}
else:
result = self.memory_manager.update_entries(
memory_type=memory_type,
operation=operation,
content=content,
index=index
)
elif tool_name == "todo_create":
result = self.todo_manager.create_todo_list(
overview=arguments.get("overview", ""),
tasks=arguments.get("tasks", [])
)
elif tool_name == "todo_update_task":
task_indices = arguments.get("task_indices")
if task_indices is None:
task_indices = arguments.get("task_index")
result = self.todo_manager.update_task_status(
task_indices=task_indices,
completed=arguments.get("completed", True)
)
elif tool_name == "create_sub_agent":
result = self.sub_agent_manager.create_sub_agent(
agent_id=arguments.get("agent_id"),
summary=arguments.get("summary", ""),
task=arguments.get("task", ""),
target_dir=arguments.get("target_dir", ""),
reference_files=arguments.get("reference_files", []),
timeout_seconds=arguments.get("timeout_seconds"),
conversation_id=self.context_manager.current_conversation_id
)
elif tool_name == "wait_sub_agent":
wait_timeout = arguments.get("timeout_seconds")
if not wait_timeout:
task_ref = self.sub_agent_manager.lookup_task(
task_id=arguments.get("task_id"),
agent_id=arguments.get("agent_id")
)
if task_ref:
wait_timeout = task_ref.get("timeout_seconds")
result = self.sub_agent_manager.wait_for_completion(
task_id=arguments.get("task_id"),
agent_id=arguments.get("agent_id"),
timeout_seconds=wait_timeout
)
elif tool_name == "close_sub_agent":
result = self.sub_agent_manager.terminate_sub_agent(
task_id=arguments.get("task_id"),
agent_id=arguments.get("agent_id")
)
elif tool_name == "trigger_easter_egg":
result = self.easter_egg_manager.trigger_effect(arguments.get("effect"))
else:
result = {"success": False, "error": f"未知工具: {tool_name}"}
except Exception as e:
logger.error(f"工具执行失败: {tool_name} - {e}")
result = {"success": False, "error": f"工具执行异常: {str(e)}"}
return json.dumps(result, ensure_ascii=False)
async def confirm_action(self, action: str, arguments: Dict) -> bool:
"""确认危险操作"""
print(f"\n{OUTPUT_FORMATS['confirm']} 需要确认的操作:")
print(f" 操作: {action}")
print(f" 参数: {json.dumps(arguments, ensure_ascii=False, indent=2)}")
response = input("\n是否继续? (y/n): ").strip().lower()
return response == 'y'