# agent-Specialization/core/main_terminal_parts/tools_policy.py
#
# 236 lines
# 11 KiB
# Python

import asyncio
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
try:
from config import (
OUTPUT_FORMATS, DATA_DIR, PROMPTS_DIR, NEED_CONFIRMATION,
MAX_TERMINALS, TERMINAL_BUFFER_SIZE, TERMINAL_DISPLAY_SIZE,
MAX_READ_FILE_CHARS, READ_TOOL_DEFAULT_MAX_CHARS,
READ_TOOL_DEFAULT_CONTEXT_BEFORE, READ_TOOL_DEFAULT_CONTEXT_AFTER,
READ_TOOL_MAX_CONTEXT_BEFORE, READ_TOOL_MAX_CONTEXT_AFTER,
READ_TOOL_DEFAULT_MAX_MATCHES, READ_TOOL_MAX_MATCHES,
READ_TOOL_MAX_FILE_SIZE,
TERMINAL_SANDBOX_MOUNT_PATH,
TERMINAL_SANDBOX_MODE,
TERMINAL_SANDBOX_CPUS,
TERMINAL_SANDBOX_MEMORY,
PROJECT_MAX_STORAGE_MB,
CUSTOM_TOOLS_ENABLED,
)
except ImportError:
import sys
project_root = Path(__file__).resolve().parents[2]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import (
OUTPUT_FORMATS, DATA_DIR, PROMPTS_DIR, NEED_CONFIRMATION,
MAX_TERMINALS, TERMINAL_BUFFER_SIZE, TERMINAL_DISPLAY_SIZE,
MAX_READ_FILE_CHARS, READ_TOOL_DEFAULT_MAX_CHARS,
READ_TOOL_DEFAULT_CONTEXT_BEFORE, READ_TOOL_DEFAULT_CONTEXT_AFTER,
READ_TOOL_MAX_CONTEXT_BEFORE, READ_TOOL_MAX_CONTEXT_AFTER,
READ_TOOL_DEFAULT_MAX_MATCHES, READ_TOOL_MAX_MATCHES,
READ_TOOL_MAX_FILE_SIZE,
TERMINAL_SANDBOX_MOUNT_PATH,
TERMINAL_SANDBOX_MODE,
TERMINAL_SANDBOX_CPUS,
TERMINAL_SANDBOX_MEMORY,
PROJECT_MAX_STORAGE_MB,
CUSTOM_TOOLS_ENABLED,
)
from modules.file_manager import FileManager
from modules.search_engine import SearchEngine
from modules.terminal_ops import TerminalOperator
from modules.memory_manager import MemoryManager
from modules.terminal_manager import TerminalManager
from modules.todo_manager import TodoManager
from modules.sub_agent_manager import SubAgentManager
from modules.webpage_extractor import extract_webpage_content, tavily_extract
from modules.ocr_client import OCRClient
from modules.easter_egg_manager import EasterEggManager
from modules.personalization_manager import (
load_personalization_config,
build_personalization_prompt,
)
from modules.skills_manager import (
get_skills_catalog,
build_skills_list,
merge_enabled_skills,
build_skills_prompt,
)
from modules.custom_tool_registry import CustomToolRegistry, build_default_tool_category
from modules.custom_tool_executor import CustomToolExecutor
try:
from config.limits import THINKING_FAST_INTERVAL
except ImportError:
THINKING_FAST_INTERVAL = 10
from modules.container_monitor import collect_stats, inspect_state
from core.tool_config import TOOL_CATEGORIES
from utils.api_client import DeepSeekClient
from utils.context_manager import ContextManager
from utils.tool_result_formatter import format_tool_result_for_context
from utils.logger import setup_logger
from config.model_profiles import (
get_model_profile,
get_model_prompt_replacements,
get_model_context_window,
)
# Module-level logger, created by setup_logger with this module's name.
logger = setup_logger(__name__)
# Module-level flag. It is not referenced by any code visible in this section.
# NOTE(review): presumably consumed elsewhere to skip a length check — confirm.
DISABLE_LENGTH_CHECK = True
class MainTerminalToolsPolicyMixin:
    """Tool-category policy helpers intended to be mixed into a host class.

    The methods read and write attributes this mixin does not create itself
    (for example ``tool_categories_map``, ``tool_category_states``,
    ``admin_forced_category_states``, ``context_manager``, ``data_dir``,
    ``custom_tools_enabled``, ``model_key`` and ``run_mode``); the host class
    is expected to provide them before these methods are called.
    """

    def apply_personalization_preferences(self, config: Optional[Dict[str, Any]] = None):
        """Apply persisted personalization settings that affect runtime behavior.

        Args:
            config: Settings to apply. When falsy, the settings are loaded via
                ``load_personalization_config(self.data_dir)``. A load failure,
                or a result that does not offer ``.get``, is treated as an
                empty configuration.
        """
        try:
            effective_config = config or load_personalization_config(self.data_dir)
        except Exception:
            # Best effort: a broken or missing preferences store must not
            # prevent start-up, so fall back to defaults.
            effective_config = {}
        if not callable(getattr(effective_config, "get", None)):
            # Anything without ``.get`` would crash on the lookups below.
            effective_config = {}

        # Tool-intent switch.
        self.tool_intent_enabled = bool(effective_config.get("tool_intent_enabled"))

        interval = effective_config.get("thinking_interval")
        # ``bool`` is a subclass of ``int``; without the explicit exclusion a
        # stored ``True`` would be accepted as an interval.
        if isinstance(interval, int) and not isinstance(interval, bool) and interval > 0:
            self.thinking_fast_interval = interval
        else:
            self.thinking_fast_interval = THINKING_FAST_INTERVAL

        disabled_categories = []
        raw_disabled = effective_config.get("disabled_tool_categories")
        if isinstance(raw_disabled, list):
            # Keep only string keys that name a known category.
            disabled_categories = [
                key for key in raw_disabled
                if isinstance(key, str) and key in self.tool_categories_map
            ]
        self.default_disabled_tool_categories = disabled_categories

        # Forward the image compression mode to the context manager.
        img_mode = effective_config.get("image_compression")
        if isinstance(img_mode, str):
            self.context_manager.image_compression_mode = img_mode

        # Reset category states to defaults before applying overrides.
        disabled_lookup = set(disabled_categories)
        for key, category in self.tool_categories_map.items():
            self.tool_category_states[key] = (
                False if key in disabled_lookup else category.default_enabled
            )
        self._refresh_disabled_tools()

        # Default model preference (applied first, then the run mode).
        preferred_model = effective_config.get("default_model")
        if isinstance(preferred_model, str) and preferred_model != self.model_key:
            try:
                self.set_model(preferred_model)
            except Exception as exc:
                logger.warning("忽略无效默认模型: %s (%s)", preferred_model, exc)

        preferred_mode = effective_config.get("default_run_mode")
        if isinstance(preferred_mode, str):
            normalized_mode = preferred_mode.strip().lower()
            if normalized_mode in {"fast", "thinking", "deep"} and normalized_mode != self.run_mode:
                try:
                    self.set_run_mode(normalized_mode)
                except ValueError:
                    logger.warning("忽略无效默认运行模式: %s", preferred_mode)

        # Whether the disabled-tool notice is suppressed.
        self.silent_tool_disable = bool(effective_config.get("silent_tool_disable"))

    def set_tool_category_enabled(self, category: str, enabled: bool) -> None:
        """Toggle whether a tool category is enabled.

        Args:
            category: Key of the category in ``self.tool_categories_map``.
            enabled: Desired state; stored as ``bool(enabled)``.

        Raises:
            ValueError: If ``category`` is unknown, or if an admin-forced state
                exists for it and differs from ``enabled``.
        """
        categories = self.tool_categories_map
        if category not in categories:
            raise ValueError(f"未知的工具类别: {category}")
        forced = self.admin_forced_category_states.get(category)
        # Only a real bool counts as a forced state; None means "not forced".
        if isinstance(forced, bool) and forced != enabled:
            raise ValueError("该类别被管理员强制为启用/禁用,无法修改")
        self.tool_category_states[category] = bool(enabled)
        self._refresh_disabled_tools()

    def set_admin_policy(
        self,
        categories: Optional[Dict[str, "ToolCategory"]] = None,
        forced_category_states: Optional[Dict[str, Optional[bool]]] = None,
        disabled_models: Optional[List[str]] = None,
    ) -> None:
        """Apply the admin policy (tool categories, forced switches, disabled models).

        Args:
            categories: Replacement category map. When falsy, the current
                ``self.tool_categories_map`` is kept.
            forced_category_states: Per-category forced state. Only ``bool``
                values are treated as forced by the other methods. Falsy
                input clears all forced states.
            disabled_models: Model keys stored on ``self.admin_disabled_models``.
                Falsy input clears the list.

        The mutable arguments are copied, so later changes made by the caller
        to the objects it passed in do not alter the stored policy.
        """
        if categories:
            self.tool_categories_map = dict(categories)

        # Make sure the custom-tool category exists (only when the feature is on).
        if self.custom_tools_enabled and "custom" not in self.tool_categories_map:
            # Reuse the class of an existing TOOL_CATEGORIES entry to build the
            # new category object.
            category_cls = type(next(iter(TOOL_CATEGORIES.values())))
            self.tool_categories_map["custom"] = category_cls(
                label="自定义工具",
                tools=[],
                default_enabled=True,
                silent_when_disabled=False,
            )

        # Rebuild the enabled-state map, keeping values that already exist.
        # Keys absent from the category map are dropped because the new map is
        # built only from the current categories.
        previous_states = self.tool_category_states
        self.tool_category_states = {
            key: (previous_states[key] if key in previous_states else cat.default_enabled)
            for key, cat in self.tool_categories_map.items()
        }

        self.admin_forced_category_states = (
            dict(forced_category_states) if forced_category_states else {}
        )
        self.admin_disabled_models = list(disabled_models) if disabled_models else []
        self._refresh_disabled_tools()

    def get_tool_settings_snapshot(self) -> List[Dict[str, object]]:
        """Return a snapshot of every tool category's state.

        Returns:
            One dict per category, in ``self.tool_categories_map`` order, with
            the keys ``id``, ``label``, ``enabled``, ``tools``, ``locked`` and
            ``locked_state``. An admin-forced bool overrides the stored state
            in ``enabled``.
        """
        snapshot: List[Dict[str, object]] = []
        for key, category in self.tool_categories_map.items():
            forced = self.admin_forced_category_states.get(key)
            locked = isinstance(forced, bool)
            enabled = forced if locked else self.tool_category_states.get(
                key, category.default_enabled
            )
            snapshot.append({
                "id": key,
                "label": category.label,
                "enabled": enabled,
                "tools": list(category.tools),
                "locked": locked,
                "locked_state": forced if locked else None,
            })
        return snapshot

    def _refresh_disabled_tools(self) -> None:
        """Recompute ``self.disabled_tools`` and ``self.disabled_notice_tools``.

        A category's effective state is its admin-forced bool when present,
        otherwise its stored state, otherwise its ``default_enabled``. Tools of
        disabled categories are collected; those whose category is not marked
        ``silent_when_disabled`` also go into the notice set.
        """
        disabled: Set[str] = set()
        notice: Set[str] = set()
        for key, category in self.tool_categories_map.items():
            state = self.tool_category_states.get(key, category.default_enabled)
            forced = self.admin_forced_category_states.get(key)
            if isinstance(forced, bool):
                state = forced
            if not state:
                disabled.update(category.tools)
                if not getattr(category, "silent_when_disabled", False):
                    notice.update(category.tools)
        self.disabled_tools = disabled
        self.disabled_notice_tools = notice

    def _format_disabled_tool_notice(self) -> Optional[str]:
        """Build the notice text listing disabled tools.

        Returns:
            ``None`` when ``silent_tool_disable`` is truthy or there is nothing
            to report; otherwise a newline-joined block with one line per tool
            name, sorted, between a header and a footer line.
        """
        if getattr(self, "silent_tool_disable", False):
            return None
        if not self.disabled_notice_tools:
            return None
        lines = ["=== 工具可用性提醒 ==="]
        lines.extend(
            f"{tool_name}:已被用户禁用" for tool_name in sorted(self.disabled_notice_tools)
        )
        lines.append("=== 提示结束 ===")
        return "\n".join(lines)