import asyncio import json from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Set try: from config import ( OUTPUT_FORMATS, DATA_DIR, PROMPTS_DIR, NEED_CONFIRMATION, MAX_TERMINALS, TERMINAL_BUFFER_SIZE, TERMINAL_DISPLAY_SIZE, MAX_READ_FILE_CHARS, READ_TOOL_DEFAULT_MAX_CHARS, READ_TOOL_DEFAULT_CONTEXT_BEFORE, READ_TOOL_DEFAULT_CONTEXT_AFTER, READ_TOOL_MAX_CONTEXT_BEFORE, READ_TOOL_MAX_CONTEXT_AFTER, READ_TOOL_DEFAULT_MAX_MATCHES, READ_TOOL_MAX_MATCHES, READ_TOOL_MAX_FILE_SIZE, TERMINAL_SANDBOX_MOUNT_PATH, TERMINAL_SANDBOX_MODE, TERMINAL_SANDBOX_CPUS, TERMINAL_SANDBOX_MEMORY, PROJECT_MAX_STORAGE_MB, CUSTOM_TOOLS_ENABLED, ) except ImportError: import sys project_root = Path(__file__).resolve().parents[2] if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) from config import ( OUTPUT_FORMATS, DATA_DIR, PROMPTS_DIR, NEED_CONFIRMATION, MAX_TERMINALS, TERMINAL_BUFFER_SIZE, TERMINAL_DISPLAY_SIZE, MAX_READ_FILE_CHARS, READ_TOOL_DEFAULT_MAX_CHARS, READ_TOOL_DEFAULT_CONTEXT_BEFORE, READ_TOOL_DEFAULT_CONTEXT_AFTER, READ_TOOL_MAX_CONTEXT_BEFORE, READ_TOOL_MAX_CONTEXT_AFTER, READ_TOOL_DEFAULT_MAX_MATCHES, READ_TOOL_MAX_MATCHES, READ_TOOL_MAX_FILE_SIZE, TERMINAL_SANDBOX_MOUNT_PATH, TERMINAL_SANDBOX_MODE, TERMINAL_SANDBOX_CPUS, TERMINAL_SANDBOX_MEMORY, PROJECT_MAX_STORAGE_MB, CUSTOM_TOOLS_ENABLED, ) from modules.file_manager import FileManager from modules.search_engine import SearchEngine from modules.terminal_ops import TerminalOperator from modules.memory_manager import MemoryManager from modules.terminal_manager import TerminalManager from modules.todo_manager import TodoManager from modules.sub_agent_manager import SubAgentManager from modules.webpage_extractor import extract_webpage_content, tavily_extract from modules.ocr_client import OCRClient from modules.easter_egg_manager import EasterEggManager from modules.personalization_manager import ( load_personalization_config, build_personalization_prompt, ) from modules.skills_manager import ( get_skills_catalog, build_skills_list, merge_enabled_skills, build_skills_prompt, ) from modules.custom_tool_registry import CustomToolRegistry, build_default_tool_category from modules.custom_tool_executor import CustomToolExecutor try: from config.limits import THINKING_FAST_INTERVAL except ImportError: THINKING_FAST_INTERVAL = 10 from modules.container_monitor import collect_stats, inspect_state from core.tool_config import TOOL_CATEGORIES from utils.api_client import DeepSeekClient from utils.context_manager import ContextManager from utils.tool_result_formatter import format_tool_result_for_context from utils.logger import setup_logger from config.model_profiles import ( get_model_profile, get_model_prompt_replacements, get_model_context_window, ) logger = setup_logger(__name__) DISABLE_LENGTH_CHECK = True class MainTerminalContextMixin: def build_context(self) -> Dict: """构建主终端上下文""" # 读取记忆 memory = self.memory_manager.read_main_memory() # 构建上下文 return self.context_manager.build_main_context(memory) def _tool_calls_followed_by_tools(self, conversation: List[Dict], start_idx: int, tool_calls: List[Dict]) -> bool: """判断指定助手消息的工具调用是否拥有后续的工具响应。""" if not tool_calls: return False expected_ids = [tc.get("id") for tc in tool_calls if tc.get("id")] if not expected_ids: return False matched: Set[str] = set() idx = start_idx + 1 total = len(conversation) while idx < total and len(matched) < len(expected_ids): next_conv = conversation[idx] role = next_conv.get("role") if role == "tool": call_id = next_conv.get("tool_call_id") if call_id in expected_ids: matched.add(call_id) else: break elif role in ("assistant", "user"): break idx += 1 return len(matched) == len(expected_ids) def build_messages(self, context: Dict, user_input: str) -> List[Dict]: """构建消息列表(添加终端内容注入)""" # 加载系统提示(Qwen3.5 使用专用提示) prompt_name = "main_system_qwenvl" if getattr(self, "model_key", "kimi") in {"qwen3-vl-plus", "kimi-k2.5"} else "main_system" system_prompt = self.load_prompt(prompt_name) # 格式化系统提示 container_path = self.container_mount_path or "/workspace" container_cpus = self.container_cpu_limit container_memory = self.container_memory_limit project_storage = self.project_storage_limit model_key = getattr(self, "model_key", "kimi") prompt_replacements = get_model_prompt_replacements(model_key) system_prompt = system_prompt.format( project_path=container_path, container_path=container_path, container_cpus=container_cpus, container_memory=container_memory, project_storage=project_storage, file_tree=context["project_info"]["file_tree"], memory=context["memory"], current_time=datetime.now().strftime("%Y-%m-%d %H"), model_description=prompt_replacements.get("model_description", "") ) messages = [ {"role": "system", "content": system_prompt} ] personalization_config = getattr(self.context_manager, "custom_personalization_config", None) or load_personalization_config(self.data_dir) skills_catalog = get_skills_catalog() enabled_skills = merge_enabled_skills( personalization_config.get("enabled_skills") if isinstance(personalization_config, dict) else None, skills_catalog, personalization_config.get("skills_catalog_snapshot") if isinstance(personalization_config, dict) else None, ) skills_template = self.load_prompt("skills_system").strip() skills_list = build_skills_list(skills_catalog, enabled_skills) skills_prompt = build_skills_prompt(skills_template, skills_list) if skills_prompt: messages.append({"role": "system", "content": skills_prompt}) workspace_system = self.context_manager._build_workspace_system_message(context) if workspace_system: messages.append({"role": "system", "content": workspace_system}) if self.tool_category_states.get("todo", True): todo_prompt = self.load_prompt("todo_guidelines").strip() if todo_prompt: messages.append({"role": "system", "content": todo_prompt}) if self.tool_category_states.get("sub_agent", True): sub_agent_prompt = self.load_prompt("sub_agent_guidelines").strip() if sub_agent_prompt: messages.append({"role": "system", "content": sub_agent_prompt}) if self.deep_thinking_mode: deep_prompt = self.load_prompt("deep_thinking_mode_guidelines").strip() if deep_prompt: deep_prompt = deep_prompt.format( deep_thinking_line=prompt_replacements.get("deep_thinking_line", "") ) messages.append({"role": "system", "content": deep_prompt}) elif self.thinking_mode: thinking_prompt = self.load_prompt("thinking_mode_guidelines").strip() if thinking_prompt: thinking_prompt = thinking_prompt.format( thinking_model_line=prompt_replacements.get("thinking_model_line", "") ) messages.append({"role": "system", "content": thinking_prompt}) # 支持按对话覆盖的个性化配置 personalization_block = build_personalization_prompt(personalization_config, include_header=False) if personalization_block: personalization_template = self.load_prompt("personalization").strip() if personalization_template and "{personalization_block}" in personalization_template: personalization_text = personalization_template.format(personalization_block=personalization_block) elif personalization_template: personalization_text = f"{personalization_template}\n{personalization_block}" else: personalization_text = personalization_block messages.append({"role": "system", "content": personalization_text}) # 支持按对话覆盖的自定义 system prompt(API 用途)。 # 放在最后一个 system 消息位置,确保优先级最高,便于业务场景强约束。 custom_system_prompt = getattr(self.context_manager, "custom_system_prompt", None) if isinstance(custom_system_prompt, str) and custom_system_prompt.strip(): messages.append({"role": "system", "content": custom_system_prompt.strip()}) # 添加对话历史(保留完整结构,包括tool_calls和tool消息) conversation = context["conversation"] for idx, conv in enumerate(conversation): metadata = conv.get("metadata") or {} if conv["role"] == "assistant": # Assistant消息可能包含工具调用 message = { "role": conv["role"], "content": conv["content"] } reasoning = conv.get("reasoning_content") if reasoning: message["reasoning_content"] = reasoning # 如果有工具调用信息,添加到消息中 tool_calls = conv.get("tool_calls") or [] if tool_calls and self._tool_calls_followed_by_tools(conversation, idx, tool_calls): message["tool_calls"] = tool_calls messages.append(message) elif conv["role"] == "tool": # Tool消息需要保留完整结构 images = conv.get("images") or metadata.get("images") or [] videos = conv.get("videos") or metadata.get("videos") or [] content_value = conv.get("content") if isinstance(content_value, list): content_payload = content_value elif images or videos: content_payload = self.context_manager._build_content_with_images(content_value, images, videos) else: content_payload = content_value message = { "role": "tool", "content": content_payload, "tool_call_id": conv.get("tool_call_id", ""), "name": conv.get("name", "") } messages.append(message) elif conv["role"] == "system" and metadata.get("sub_agent_notice"): # 转换为用户消息,让模型能及时响应 messages.append({ "role": "user", "content": conv["content"] }) else: # User 或普通 System 消息 images = conv.get("images") or metadata.get("images") or [] videos = conv.get("videos") or metadata.get("videos") or [] content_payload = ( self.context_manager._build_content_with_images(conv["content"], images, videos) if (images or videos) else conv["content"] ) messages.append({ "role": conv["role"], "content": content_payload }) # 当前用户输入已经在conversation中了,不需要重复添加 todo_message = self.context_manager.render_todo_system_message() if todo_message: messages.append({ "role": "system", "content": todo_message }) disabled_notice = self._format_disabled_tool_notice() if disabled_notice: messages.append({ "role": "system", "content": disabled_notice }) return messages def load_prompt(self, name: str) -> str: """加载提示模板""" prompt_file = Path(PROMPTS_DIR) / f"{name}.txt" if prompt_file.exists(): with open(prompt_file, 'r', encoding='utf-8') as f: return f.read() return "你是一个AI助手。"