agent/core/main_terminal.py

2396 lines
110 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# core/main_terminal.py - 主终端(集成对话持久化)
import asyncio
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set
from datetime import datetime
try:
from config import (
OUTPUT_FORMATS, DATA_DIR, PROMPTS_DIR, NEED_CONFIRMATION,
MAX_TERMINALS, TERMINAL_BUFFER_SIZE, TERMINAL_DISPLAY_SIZE,
MAX_READ_FILE_CHARS, READ_TOOL_DEFAULT_MAX_CHARS,
READ_TOOL_DEFAULT_CONTEXT_BEFORE, READ_TOOL_DEFAULT_CONTEXT_AFTER,
READ_TOOL_MAX_CONTEXT_BEFORE, READ_TOOL_MAX_CONTEXT_AFTER,
READ_TOOL_DEFAULT_MAX_MATCHES, READ_TOOL_MAX_MATCHES,
READ_TOOL_MAX_FILE_SIZE, MAX_FOCUS_FILE_CHARS
)
except ImportError:
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import (
OUTPUT_FORMATS, DATA_DIR, PROMPTS_DIR, NEED_CONFIRMATION,
MAX_TERMINALS, TERMINAL_BUFFER_SIZE, TERMINAL_DISPLAY_SIZE,
MAX_READ_FILE_CHARS, READ_TOOL_DEFAULT_MAX_CHARS,
READ_TOOL_DEFAULT_CONTEXT_BEFORE, READ_TOOL_DEFAULT_CONTEXT_AFTER,
READ_TOOL_MAX_CONTEXT_BEFORE, READ_TOOL_MAX_CONTEXT_AFTER,
READ_TOOL_DEFAULT_MAX_MATCHES, READ_TOOL_MAX_MATCHES,
READ_TOOL_MAX_FILE_SIZE, MAX_FOCUS_FILE_CHARS
)
from modules.file_manager import FileManager
from modules.search_engine import SearchEngine
from modules.terminal_ops import TerminalOperator
from modules.memory_manager import MemoryManager
from modules.terminal_manager import TerminalManager
from modules.todo_manager import TodoManager
from modules.sub_agent_manager import SubAgentManager
from modules.webpage_extractor import extract_webpage_content, tavily_extract
from modules.ocr_client import OCRClient
from modules.easter_egg_manager import EasterEggManager
from modules.personalization_manager import (
load_personalization_config,
build_personalization_prompt,
)
from core.tool_config import TOOL_CATEGORIES
from utils.api_client import DeepSeekClient
from utils.context_manager import ContextManager
from utils.logger import setup_logger
logger = setup_logger(__name__)
# 临时禁用长度检查
DISABLE_LENGTH_CHECK = True
class MainTerminal:
def __init__(
self,
project_path: str,
thinking_mode: bool = False,
data_dir: Optional[str] = None,
):
self.project_path = project_path
self.thinking_mode = thinking_mode # False=快速模式, True=思考模式
self.data_dir = Path(data_dir).expanduser().resolve() if data_dir else Path(DATA_DIR).resolve()
# 初始化组件
self.api_client = DeepSeekClient(thinking_mode=thinking_mode)
self.context_manager = ContextManager(project_path, data_dir=str(self.data_dir))
self.context_manager.main_terminal = self
self.memory_manager = MemoryManager(data_dir=str(self.data_dir))
self.file_manager = FileManager(project_path)
self.search_engine = SearchEngine()
self.terminal_ops = TerminalOperator(project_path)
self.ocr_client = OCRClient(project_path, self.file_manager)
# 新增:终端管理器
self.terminal_manager = TerminalManager(
project_path=project_path,
max_terminals=MAX_TERMINALS,
terminal_buffer_size=TERMINAL_BUFFER_SIZE,
terminal_display_size=TERMINAL_DISPLAY_SIZE,
broadcast_callback=None # CLI模式不需要广播
)
self.todo_manager = TodoManager(self.context_manager)
self.sub_agent_manager = SubAgentManager(
project_path=self.project_path,
data_dir=str(self.data_dir)
)
self.easter_egg_manager = EasterEggManager()
self._announced_sub_agent_tasks = set()
# 聚焦文件管理
self.focused_files = {} # {path: content} 存储聚焦的文件内容
self.current_session_id = 0 # 用于标识不同的任务会话
# 新增:追加内容状态
self.pending_append_request = None # {"path": str}
self.pending_modify_request = None # {"path": str}
# 工具启用状态
self.tool_category_states = {
key: category.default_enabled
for key, category in TOOL_CATEGORIES.items()
}
self.disabled_tools = set()
self.disabled_notice_tools = set()
self._refresh_disabled_tools()
# 新增:自动开始新对话
self._ensure_conversation()
# 命令映射
self.commands = {
"help": self.show_help,
"exit": self.exit_system,
"status": self.show_status,
"memory": self.manage_memory,
"clear": self.clear_conversation,
"history": self.show_history,
"files": self.show_files,
"mode": self.toggle_mode,
"focused": self.show_focused_files,
"terminals": self.show_terminals,
# 新增:对话管理命令
"conversations": self.show_conversations,
"load": self.load_conversation_command,
"new": self.new_conversation_command,
"save": self.save_conversation_command
}
#self.context_manager._web_terminal_callback = message_callback
#self.context_manager._focused_files = self.focused_files # 引用传递
def _ensure_conversation(self):
"""确保CLI模式下存在可用的对话ID"""
if self.context_manager.current_conversation_id:
return
latest_list = self.context_manager.get_conversation_list(limit=1, offset=0)
conversations = latest_list.get("conversations", []) if latest_list else []
if conversations:
latest = conversations[0]
conv_id = latest.get("id")
if conv_id and self.context_manager.load_conversation_by_id(conv_id):
print(f"{OUTPUT_FORMATS['info']} 已加载最近对话: {conv_id}")
return
conversation_id = self.context_manager.start_new_conversation(
project_path=self.project_path,
thinking_mode=self.thinking_mode
)
print(f"{OUTPUT_FORMATS['info']} 新建对话: {conversation_id}")
@staticmethod
def _clamp_int(value, default, min_value=None, max_value=None):
"""将输入转换为整数并限制范围。"""
if value is None:
return default
try:
num = int(value)
except (TypeError, ValueError):
return default
if min_value is not None:
num = max(min_value, num)
if max_value is not None:
num = min(max_value, num)
return num
@staticmethod
def _parse_optional_line(value, field_name: str):
"""解析可选的行号参数。"""
if value is None:
return None, None
try:
number = int(value)
except (TypeError, ValueError):
return None, f"{field_name} 必须是整数"
if number < 1:
return None, f"{field_name} 必须大于等于1"
return number, None
@staticmethod
def _truncate_text_block(text: str, max_chars: int):
"""对单段文本应用字符限制。"""
if max_chars and len(text) > max_chars:
return text[:max_chars], True, max_chars
return text, False, len(text)
@staticmethod
def _limit_text_chunks(chunks: List[Dict], text_key: str, max_chars: int):
"""对多个文本片段应用全局字符限制。"""
if max_chars is None or max_chars <= 0:
return chunks, False, sum(len(chunk.get(text_key, "") or "") for chunk in chunks)
remaining = max_chars
limited_chunks: List[Dict] = []
truncated = False
consumed = 0
for chunk in chunks:
snippet = chunk.get(text_key, "") or ""
snippet_len = len(snippet)
chunk_copy = dict(chunk)
if remaining <= 0:
truncated = True
break
if snippet_len > remaining:
chunk_copy[text_key] = snippet[:remaining]
chunk_copy["truncated"] = True
consumed += remaining
limited_chunks.append(chunk_copy)
truncated = True
remaining = 0
break
limited_chunks.append(chunk_copy)
consumed += snippet_len
remaining -= snippet_len
return limited_chunks, truncated, consumed
def _record_sub_agent_message(self, message: Optional[str], task_id: Optional[str] = None, inline: bool = False):
"""以 system 消息记录子智能体状态。"""
if not message:
return
if task_id and task_id in self._announced_sub_agent_tasks:
return
if task_id:
self._announced_sub_agent_tasks.add(task_id)
logger.info(
"[SubAgent] record message | task=%s | inline=%s | content=%s",
task_id,
inline,
message.replace("\n", "\\n")[:200],
)
metadata = {"sub_agent_notice": True, "inline": inline}
if task_id:
metadata["task_id"] = task_id
self.context_manager.add_conversation("system", message, metadata=metadata)
print(f"{OUTPUT_FORMATS['info']} {message}")
def _handle_read_tool(self, arguments: Dict) -> Dict:
"""集中处理 read_file 工具的三种模式。"""
file_path = arguments.get("path")
if not file_path:
return {"success": False, "error": "缺少文件路径参数"}
read_type = (arguments.get("type") or "read").lower()
if read_type not in {"read", "search", "extract"}:
return {"success": False, "error": f"未知的读取类型: {read_type}"}
max_chars = self._clamp_int(
arguments.get("max_chars"),
READ_TOOL_DEFAULT_MAX_CHARS,
1,
MAX_READ_FILE_CHARS
)
base_result = {
"success": True,
"type": read_type,
"path": None,
"encoding": "utf-8",
"max_chars": max_chars,
"truncated": False
}
if read_type == "read":
start_line, error = self._parse_optional_line(arguments.get("start_line"), "start_line")
if error:
return {"success": False, "error": error}
end_line_val = arguments.get("end_line")
end_line = None
if end_line_val is not None:
end_line, error = self._parse_optional_line(end_line_val, "end_line")
if error:
return {"success": False, "error": error}
if start_line and end_line < start_line:
return {"success": False, "error": "end_line 必须大于等于 start_line"}
read_result = self.file_manager.read_text_segment(
file_path,
start_line=start_line,
end_line=end_line,
size_limit=READ_TOOL_MAX_FILE_SIZE
)
if not read_result.get("success"):
return read_result
content, truncated, char_count = self._truncate_text_block(read_result["content"], max_chars)
base_result.update({
"path": read_result["path"],
"content": content,
"line_start": read_result["line_start"],
"line_end": read_result["line_end"],
"total_lines": read_result["total_lines"],
"file_size": read_result["size"],
"char_count": char_count,
"message": f"已读取 {read_result['path']} 的内容(行 {read_result['line_start']}~{read_result['line_end']}"
})
base_result["truncated"] = truncated
self.context_manager.load_file(read_result["path"])
return base_result
if read_type == "search":
query = arguments.get("query")
if not query:
return {"success": False, "error": "搜索模式需要提供 query 参数"}
max_matches = self._clamp_int(
arguments.get("max_matches"),
READ_TOOL_DEFAULT_MAX_MATCHES,
1,
READ_TOOL_MAX_MATCHES
)
context_before = self._clamp_int(
arguments.get("context_before"),
READ_TOOL_DEFAULT_CONTEXT_BEFORE,
0,
READ_TOOL_MAX_CONTEXT_BEFORE
)
context_after = self._clamp_int(
arguments.get("context_after"),
READ_TOOL_DEFAULT_CONTEXT_AFTER,
0,
READ_TOOL_MAX_CONTEXT_AFTER
)
case_sensitive = bool(arguments.get("case_sensitive"))
search_result = self.file_manager.search_text(
file_path,
query=query,
max_matches=max_matches,
context_before=context_before,
context_after=context_after,
case_sensitive=case_sensitive,
size_limit=READ_TOOL_MAX_FILE_SIZE
)
if not search_result.get("success"):
return search_result
matches = search_result["matches"]
limited_matches, truncated, char_count = self._limit_text_chunks(matches, "snippet", max_chars)
base_result.update({
"path": search_result["path"],
"file_size": search_result["size"],
"query": query,
"max_matches": max_matches,
"actual_matches": len(matches),
"returned_matches": len(limited_matches),
"context_before": context_before,
"context_after": context_after,
"case_sensitive": case_sensitive,
"matches": limited_matches,
"char_count": char_count,
"message": f"{search_result['path']} 中搜索 \"{query}\",返回 {len(limited_matches)} 条结果"
})
base_result["truncated"] = truncated
return base_result
# extract
segments = arguments.get("segments")
if not isinstance(segments, list) or not segments:
return {"success": False, "error": "extract 模式需要提供 segments 数组"}
extract_result = self.file_manager.extract_segments(
file_path,
segments=segments,
size_limit=READ_TOOL_MAX_FILE_SIZE
)
if not extract_result.get("success"):
return extract_result
limited_segments, truncated, char_count = self._limit_text_chunks(
extract_result["segments"],
"content",
max_chars
)
base_result.update({
"path": extract_result["path"],
"segments": limited_segments,
"file_size": extract_result["size"],
"total_lines": extract_result["total_lines"],
"segment_count": len(limited_segments),
"char_count": char_count,
"message": f"已从 {extract_result['path']} 抽取 {len(limited_segments)} 个片段"
})
base_result["truncated"] = truncated
self.context_manager.load_file(extract_result["path"])
return base_result
def set_tool_category_enabled(self, category: str, enabled: bool) -> None:
"""设置工具类别的启用状态 / Toggle tool category enablement."""
if category not in TOOL_CATEGORIES:
raise ValueError(f"未知的工具类别: {category}")
self.tool_category_states[category] = bool(enabled)
self._refresh_disabled_tools()
def get_tool_settings_snapshot(self) -> List[Dict[str, object]]:
"""获取工具类别状态快照 / Return tool category states snapshot."""
snapshot: List[Dict[str, object]] = []
for key, category in TOOL_CATEGORIES.items():
snapshot.append({
"id": key,
"label": category.label,
"enabled": self.tool_category_states.get(key, category.default_enabled),
"tools": list(category.tools),
})
return snapshot
def _refresh_disabled_tools(self) -> None:
"""刷新禁用工具列表 / Refresh disabled tool set."""
disabled = set()
notice = set()
for key, enabled in self.tool_category_states.items():
if not enabled:
category = TOOL_CATEGORIES[key]
disabled.update(category.tools)
if not getattr(category, "silent_when_disabled", False):
notice.update(category.tools)
self.disabled_tools = disabled
self.disabled_notice_tools = notice
def _format_disabled_tool_notice(self) -> Optional[str]:
"""生成禁用工具提示信息 / Format disabled tool notice."""
if not self.disabled_notice_tools:
return None
lines = ["=== 工具可用性提醒 ==="]
for tool_name in sorted(self.disabled_notice_tools):
lines.append(f"{tool_name}:已被用户禁用")
lines.append("=== 提示结束 ===")
return "\n".join(lines)
async def run(self):
"""运行主终端循环"""
print(f"\n{OUTPUT_FORMATS['info']} 主终端已启动")
print(f"{OUTPUT_FORMATS['info']} 当前对话: {self.context_manager.current_conversation_id}")
while True:
try:
# 获取用户输入(使用人的表情)
user_input = input("\n👤 > ").strip()
if not user_input:
continue
# 处理命令(命令不记录到对话历史)
if user_input.startswith('/'):
await self.handle_command(user_input[1:])
elif user_input.lower() in ['exit', 'quit', 'q']:
# 用户可能忘记加斜杠
print(f"{OUTPUT_FORMATS['info']} 提示: 使用 /exit 退出系统")
continue
elif user_input.lower() == 'help':
print(f"{OUTPUT_FORMATS['info']} 提示: 使用 /help 查看帮助")
continue
else:
# 确保有活动对话
self._ensure_conversation()
# 只有非命令的输入才记录到对话历史
self.context_manager.add_conversation("user", user_input)
# 新增:开始新的任务会话
self.current_session_id += 1
# AI回复前空一行并显示机器人图标
print("\n🤖 >", end=" ")
await self.handle_task(user_input)
# 回复后自动空一行在handle_task完成后
except KeyboardInterrupt:
print(f"\n{OUTPUT_FORMATS['warning']} 使用 /exit 退出系统")
continue
except Exception as e:
logger.error(f"主终端错误: {e}", exc_info=True)
print(f"{OUTPUT_FORMATS['error']} 发生错误: {e}")
# 错误后仍然尝试自动保存
try:
self.context_manager.auto_save_conversation()
except:
pass
async def handle_command(self, command: str):
"""处理系统命令"""
parts = command.split(maxsplit=1)
cmd = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
if cmd in self.commands:
await self.commands[cmd](args)
else:
print(f"{OUTPUT_FORMATS['error']} 未知命令: {cmd}")
await self.show_help()
async def handle_task(self, user_input: str):
"""处理用户任务(完全修复版:彻底解决对话记录重复问题)"""
try:
# 如果是思考模式,每个新任务重置状态
# 注意:这里重置的是当前任务的第一次调用标志,确保新用户请求重新思考
if self.thinking_mode:
self.api_client.start_new_task()
# 新增:开始新的任务会话
self.current_session_id += 1
# 构建上下文
context = self.build_context()
# 构建消息
messages = self.build_messages(context, user_input)
# 定义可用工具
tools = self.define_tools()
# 用于收集本次任务的所有信息(关键:不立即保存到对话历史)
collected_tool_calls = []
collected_tool_results = []
final_response = ""
final_thinking = ""
# 工具处理器:只执行工具,收集信息,绝不保存到对话历史
async def tool_handler(tool_name: str, arguments: Dict) -> str:
# 执行工具调用
result = await self.handle_tool_call(tool_name, arguments)
# 生成工具调用ID
tool_call_id = f"call_{datetime.now().timestamp()}_{tool_name}"
# 收集工具调用信息(不保存)
tool_call_info = {
"id": tool_call_id,
"type": "function",
"function": {
"name": tool_name,
"arguments": json.dumps(arguments, ensure_ascii=False)
}
}
collected_tool_calls.append(tool_call_info)
# 处理工具结果用于保存
result_data = {}
try:
result_data = json.loads(result)
if tool_name == "read_file" and result_data.get("success"):
file_content = result_data.get("content", "")
tool_result_content = f"文件内容:\n```\n{file_content}\n```\n大小: {result_data.get('size')} 字节"
else:
tool_result_content = result
except:
tool_result_content = result
# 收集工具结果(不保存)
collected_tool_results.append({
"tool_call_id": tool_call_id,
"name": tool_name,
"content": tool_result_content,
"system_message": result_data.get("system_message") if isinstance(result_data, dict) else None,
"task_id": result_data.get("task_id") if isinstance(result_data, dict) else None
})
return result
# 调用带工具的API模型自己决定是否使用工具
response = await self.api_client.chat_with_tools(
messages=messages,
tools=tools,
tool_handler=tool_handler
)
# 保存响应内容
final_response = response
# 获取思考内容(如果有的话)
if self.api_client.current_task_thinking:
final_thinking = self.api_client.current_task_thinking
# ===== 统一保存到对话历史(关键修复) =====
# 1. 构建完整的assistant消息内容
assistant_content_parts = []
# 添加思考内容
if final_thinking:
assistant_content_parts.append(f"<think>\n{final_thinking}\n</think>")
# 添加回复内容
if final_response:
assistant_content_parts.append(final_response)
# 合并内容
assistant_content = "\n".join(assistant_content_parts) if assistant_content_parts else "已完成操作。"
# 2. 保存assistant消息包含tool_calls但不包含结果
self.context_manager.add_conversation(
"assistant",
assistant_content,
collected_tool_calls if collected_tool_calls else None
)
# 3. 保存独立的tool消息
for tool_result in collected_tool_results:
self.context_manager.add_conversation(
"tool",
tool_result["content"],
tool_call_id=tool_result["tool_call_id"],
name=tool_result["name"]
)
system_message = tool_result.get("system_message")
if system_message:
self._record_sub_agent_message(system_message, tool_result.get("task_id"), inline=False)
# 补充TODO完成提示放在tool消息之后保证格式正确
todo_note = None
try:
parsed = json.loads(tool_result["content"])
todo_note = parsed.get("system_note")
except Exception:
todo_note = None
if todo_note:
self.context_manager.add_conversation("system", todo_note)
# 4. 在终端显示执行信息(不保存到历史)
if collected_tool_calls:
tool_names = [tc['function']['name'] for tc in collected_tool_calls]
for tool_name in tool_names:
if tool_name == "create_file":
print(f"{OUTPUT_FORMATS['file']} 创建文件")
elif tool_name == "read_file":
print(f"{OUTPUT_FORMATS['file']} 读取文件")
elif tool_name == "ocr_image":
print(f"{OUTPUT_FORMATS['file']} 图片OCR")
elif tool_name == "modify_file":
print(f"{OUTPUT_FORMATS['file']} 修改文件")
elif tool_name == "delete_file":
print(f"{OUTPUT_FORMATS['file']} 删除文件")
elif tool_name == "terminal_session":
print(f"{OUTPUT_FORMATS['session']} 终端会话操作")
elif tool_name == "terminal_input":
print(f"{OUTPUT_FORMATS['terminal']} 执行终端命令")
elif tool_name == "web_search":
print(f"{OUTPUT_FORMATS['search']} 网络搜索")
elif tool_name == "run_python":
print(f"{OUTPUT_FORMATS['code']} 执行Python代码")
elif tool_name == "run_command":
print(f"{OUTPUT_FORMATS['terminal']} 执行系统命令")
elif tool_name == "update_memory":
print(f"{OUTPUT_FORMATS['memory']} 更新记忆")
elif tool_name == "focus_file":
print(f"🔍 聚焦文件")
elif tool_name == "unfocus_file":
print(f"❌ 取消聚焦")
elif tool_name == "sleep":
print(f"{OUTPUT_FORMATS['info']} 等待操作")
else:
print(f"{OUTPUT_FORMATS['action']} 执行: {tool_name}")
if len(tool_names) > 1:
print(f"{OUTPUT_FORMATS['info']} 共执行 {len(tool_names)} 个操作")
except Exception as e:
logger.error(f"任务处理错误: {e}", exc_info=True)
print(f"{OUTPUT_FORMATS['error']} 任务处理失败: {e}")
# 错误时也尝试自动保存
try:
self.context_manager.auto_save_conversation()
except:
pass
async def show_conversations(self, args: str = ""):
"""显示对话列表"""
try:
limit = 10 # 默认显示最近10个对话
if args:
try:
limit = int(args)
limit = max(1, min(limit, 50)) # 限制在1-50之间
except ValueError:
print(f"{OUTPUT_FORMATS['warning']} 无效数量使用默认值10")
limit = 10
conversations = self.context_manager.get_conversation_list(limit=limit)
if not conversations["conversations"]:
print(f"{OUTPUT_FORMATS['info']} 暂无对话记录")
return
print(f"\n📚 最近 {len(conversations['conversations'])} 个对话:")
print("="*70)
for i, conv in enumerate(conversations["conversations"], 1):
# 状态图标
status_icon = "🟢" if conv["status"] == "active" else "📦" if conv["status"] == "archived" else ""
# 当前对话标记
current_mark = " [当前]" if conv["id"] == self.context_manager.current_conversation_id else ""
# 思考模式标记
mode_mark = "💭" if conv["thinking_mode"] else ""
print(f"{i:2d}. {status_icon} {conv['id'][:16]}...{current_mark}")
print(f" {mode_mark} {conv['title'][:50]}{'...' if len(conv['title']) > 50 else ''}")
print(f" 📅 {conv['updated_at'][:19]} | 💬 {conv['total_messages']} 条消息 | 🔧 {conv['total_tools']} 个工具")
print(f" 📁 {conv['project_path']}")
print()
print(f"总计: {conversations['total']} 个对话")
if conversations["has_more"]:
print(f"使用 /conversations {limit + 10} 查看更多")
except Exception as e:
print(f"{OUTPUT_FORMATS['error']} 获取对话列表失败: {e}")
async def load_conversation_command(self, args: str):
"""加载指定对话"""
if not args:
print(f"{OUTPUT_FORMATS['error']} 请指定对话ID")
print("使用方法: /load <对话ID>")
await self.show_conversations("5") # 显示最近5个对话作为提示
return
conversation_id = args.strip()
try:
success = self.context_manager.load_conversation_by_id(conversation_id)
if success:
print(f"{OUTPUT_FORMATS['success']} 对话已加载: {conversation_id}")
print(f"{OUTPUT_FORMATS['info']} 消息数量: {len(self.context_manager.conversation_history)}")
# 如果是思考模式,重置状态(下次任务会重新思考)
if self.thinking_mode:
self.api_client.start_new_task()
self.current_session_id += 1
else:
print(f"{OUTPUT_FORMATS['error']} 对话加载失败")
except Exception as e:
print(f"{OUTPUT_FORMATS['error']} 加载对话异常: {e}")
async def new_conversation_command(self, args: str = ""):
"""创建新对话"""
try:
conversation_id = self.context_manager.start_new_conversation(
project_path=self.project_path,
thinking_mode=self.thinking_mode
)
print(f"{OUTPUT_FORMATS['success']} 已创建新对话: {conversation_id}")
# 重置相关状态
if self.thinking_mode:
self.api_client.start_new_task()
self.current_session_id += 1
except Exception as e:
print(f"{OUTPUT_FORMATS['error']} 创建新对话失败: {e}")
async def save_conversation_command(self, args: str = ""):
"""手动保存当前对话"""
try:
success = self.context_manager.save_current_conversation()
if success:
print(f"{OUTPUT_FORMATS['success']} 对话已保存")
else:
print(f"{OUTPUT_FORMATS['error']} 对话保存失败")
except Exception as e:
print(f"{OUTPUT_FORMATS['error']} 保存对话异常: {e}")
# ===== 修改现有命令,集成对话管理 =====
async def clear_conversation(self, args: str = ""):
"""清除对话记录(修改版:创建新对话而不是清空)"""
if input("确认创建新对话? 当前对话将被保存 (y/n): ").lower() == 'y':
try:
# 保存当前对话
if self.context_manager.current_conversation_id:
self.context_manager.save_current_conversation()
# 创建新对话
await self.new_conversation_command()
print(f"{OUTPUT_FORMATS['success']} 已开始新对话")
except Exception as e:
print(f"{OUTPUT_FORMATS['error']} 创建新对话失败: {e}")
async def show_status(self, args: str = ""):
"""显示系统状态"""
# 上下文状态
context_status = self.context_manager.check_context_size()
# 记忆状态
memory_stats = self.memory_manager.get_memory_stats()
# 文件结构
structure = self.context_manager.get_project_structure()
# 聚焦文件状态
focused_size = sum(len(content) for content in self.focused_files.values())
# 终端会话状态
terminal_status = self.terminal_manager.list_terminals()
# 思考模式状态
thinking_status = '思考模式' if self.thinking_mode else '快速模式'
if self.thinking_mode:
thinking_status += f" ({'等待新任务' if self.api_client.current_task_first_call else '任务进行中'})"
# 新增:对话统计
conversation_stats = self.context_manager.get_conversation_statistics()
status_text = f"""
📊 系统状态:
项目路径: {self.project_path}
运行模式: {thinking_status}
当前对话: {self.context_manager.current_conversation_id or ''}
上下文使用: {context_status['usage_percent']:.1f}%
当前消息: {len(self.context_manager.conversation_history)}
聚焦文件: {len(self.focused_files)}/3 个 ({focused_size/1024:.1f}KB)
终端会话: {terminal_status['total']}/{terminal_status['max_allowed']}
当前会话ID: {self.current_session_id}
项目文件: {structure['total_files']}
项目大小: {structure['total_size'] / 1024 / 1024:.2f} MB
对话总数: {conversation_stats.get('total_conversations', 0)}
历史消息: {conversation_stats.get('total_messages', 0)}
工具调用: {conversation_stats.get('total_tools', 0)}
主记忆: {memory_stats['main_memory']['lines']}
任务记忆: {memory_stats['task_memory']['lines']}
"""
print(status_text)
async def save_state(self):
"""保存状态"""
try:
# 保存对话历史(使用新的持久化系统)
self.context_manager.save_current_conversation()
# 保存文件备注
self.context_manager.save_annotations()
print(f"{OUTPUT_FORMATS['success']} 状态已保存")
except Exception as e:
print(f"{OUTPUT_FORMATS['error']} 状态保存失败: {e}")
async def show_help(self, args: str = ""):
"""显示帮助信息"""
# 根据当前模式显示不同的帮助信息
mode_info = ""
if self.thinking_mode:
mode_info = "\n💡 思考模式:\n - 每个新任务首次调用深度思考\n - 同一任务后续调用快速响应\n - 每个新任务都会重新思考"
else:
mode_info = "\n⚡ 快速模式:\n - 不进行思考,直接响应\n - 适合简单任务和快速交互"
help_text = f"""
📚 可用命令:
/help - 显示此帮助信息
/exit - 退出系统
/status - 显示系统状态
/memory - 管理记忆
/clear - 创建新对话
/history - 显示对话历史
/files - 显示项目文件
/focused - 显示聚焦文件
/terminals - 显示终端会话
/mode - 切换运行模式
🗂️ 对话管理:
/conversations [数量] - 显示对话列表
/load <对话ID> - 加载指定对话
/new - 创建新对话
/save - 手动保存当前对话
💡 使用提示:
- 直接输入任务描述,系统会自动判断是否需要执行
- 使用 Ctrl+C 可以中断当前操作
- 重要操作会要求确认
- 所有对话都会自动保存,不用担心丢失
🔍 文件聚焦功能:
- 系统可以聚焦最多3个文件实现"边看边改"
- 聚焦的文件内容会持续显示在上下文中
- 适合需要频繁查看和修改的文件
📺 持久化终端:
- 可以打开最多3个终端会话
- 终端保持运行状态,支持交互式程序
- 使用 terminal_session 和 terminal_input 工具控制{mode_info}
"""
print(help_text)
# ===== 保持原有的其他方法不变,只需要小修改 =====
def define_tools(self) -> List[Dict]:
"""定义可用工具(添加确认工具)"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
tools = [
{
"type": "function",
"function": {
"name": "sleep",
"description": "等待指定的秒数。用于等待长时间操作完成,如安装包、编译、服务启动等。当终端或进程需要时间完成操作时使用。",
"parameters": {
"type": "object",
"properties": {
"seconds": {
"type": "number",
"description": "等待的秒数可以是小数如2.5秒。建议范围0.5-30秒"
},
"reason": {
"type": "string",
"description": "等待的原因说明(可选)"
}
},
"required": ["seconds"]
}
}
},
{
"type": "function",
"function": {
"name": "create_file",
"description": "创建新文件(仅创建空文件,正文请使用 append_to_file 追加)",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"},
"file_type": {"type": "string", "enum": ["txt", "py", "md"], "description": "文件类型"},
"annotation": {"type": "string", "description": "文件备注"}
},
"required": ["path", "file_type", "annotation"]
}
}
},
{
"type": "function",
"function": {
"name": "read_file",
"description": "读取/搜索/抽取 UTF-8 文本文件内容。通过 type 参数选择 read阅读、search搜索、extract具体行段支持限制返回字符数。若文件非 UTF-8 或过大,请改用 run_python。",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"},
"type": {
"type": "string",
"enum": ["read", "search", "extract"],
"description": "读取模式read=阅读、search=搜索、extract=按行抽取"
},
"max_chars": {
"type": "integer",
"description": "返回内容的最大字符数,默认与 config 一致"
},
"start_line": {
"type": "integer",
"description": "[read] 可选的起始行号1开始"
},
"end_line": {
"type": "integer",
"description": "[read] 可选的结束行号(>=start_line"
},
"query": {
"type": "string",
"description": "[search] 搜索关键词"
},
"max_matches": {
"type": "integer",
"description": "[search] 最多返回多少条命中默认5最大50"
},
"context_before": {
"type": "integer",
"description": "[search] 命中行向上追加的行数默认1最大3"
},
"context_after": {
"type": "integer",
"description": "[search] 命中行向下追加的行数默认1最大5"
},
"case_sensitive": {
"type": "boolean",
"description": "[search] 是否区分大小写,默认 false"
},
"segments": {
"type": "array",
"description": "[extract] 需要抽取的行区间",
"items": {
"type": "object",
"properties": {
"label": {
"type": "string",
"description": "该片段的标签(可选)"
},
"start_line": {
"type": "integer",
"description": "起始行号(>=1"
},
"end_line": {
"type": "integer",
"description": "结束行号(>=start_line"
}
},
"required": ["start_line", "end_line"]
},
"minItems": 1
}
},
"required": ["path", "type"]
}
}
},
{
"type": "function",
"function": {
"name": "ocr_image",
"description": "使用 DeepSeek-OCR 读取图片中的文字或根据提示生成描述,仅支持本地图片路径。",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "项目内的图片路径"},
"prompt": {"type": "string", "description": "传递给 OCR 模型的提示词,如“请识别图片中的文字”,必须使用中文提示词。"}
},
"required": ["path", "prompt"]
}
}
},
{
"type": "function",
"function": {
"name": "delete_file",
"description": "删除文件",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "rename_file",
"description": "重命名文件",
"parameters": {
"type": "object",
"properties": {
"old_path": {"type": "string", "description": "原文件路径"},
"new_path": {"type": "string", "description": "新文件路径"}
},
"required": ["old_path", "new_path"]
}
}
},
{
"type": "function",
"function": {
"name": "modify_file",
"description": "准备替换文件中的指定内容。调用后系统会发放写入窗口,请严格按照模板输出<<<MODIFY:path>>>…<<<END_MODIFY>>>结构。",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "目标文件路径"}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "append_to_file",
"description": "准备向文件追加大段内容。调用后必须按照系统指令输出<<<APPEND:path>>>...<<<END_APPEND>>>格式的正文,禁止夹带解释性文字。",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "目标文件路径"}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "create_folder",
"description": "创建文件夹",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件夹路径"}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "focus_file",
"description": "聚焦 UTF-8 文本文件,将完整内容持续注入上下文。适合频繁查看/修改的核心文件;超过字符限制或非 UTF-8 时会拒绝。",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "unfocus_file",
"description": "取消聚焦文件,从上下文中移除",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "terminal_session",
"description": "管理持久化终端会话,可打开、关闭、列出或切换终端。请在授权工作区内执行命令,禁止启动需要完整 TTY 的程序python REPL、vim、top 等)。",
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["open", "close", "list", "switch"],
"description": "操作类型open-打开新终端close-关闭终端list-列出所有终端switch-切换活动终端"
},
"session_name": {
"type": "string",
"description": "终端会话名称open、close、switch时需要"
},
"working_dir": {
"type": "string",
"description": "工作目录相对于项目路径open时可选"
}
},
"required": ["action"]
}
}
},
{
"type": "function",
"function": {
"name": "terminal_input",
"description": "向活动终端发送命令或输入。禁止启动会占用终端界面的程序python/node/nano/vim 等);如遇卡死请结合 terminal_snapshot 并使用 terminal_reset 恢复。",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "要执行的命令或发送的输入"
},
"session_name": {
"type": "string",
"description": "目标终端会话名称(可选,默认使用活动终端)"
},
"wait_for_output": {
"type": "boolean",
"description": "是否等待输出默认true"
},
"timeout": {
"type": "number",
"description": "等待输出的最长秒数,默认使用配置项 TERMINAL_OUTPUT_WAIT"
}
},
"required": ["command"]
}
}
},
{
"type": "function",
"function": {
"name": "terminal_snapshot",
"description": "获取指定终端最近的输出快照用于判断当前状态。默认返回末尾的50行可通过参数调整。",
"parameters": {
"type": "object",
"properties": {
"session_name": {
"type": "string",
"description": "目标终端会话名称(可选,默认活动终端)"
},
"lines": {
"type": "integer",
"description": "返回的最大行数(可选)"
},
"max_chars": {
"type": "integer",
"description": "返回的最大字符数(可选)"
}
}
}
}
},
{
"type": "function",
"function": {
"name": "terminal_reset",
"description": "重置指定终端:关闭当前进程并重新创建同名会话,用于从卡死或非法状态中恢复。请在总结中说明重置原因。",
"parameters": {
"type": "object",
"properties": {
"session_name": {
"type": "string",
"description": "目标终端会话名称(可选,默认活动终端)"
}
}
}
}
},
{
"type": "function",
"function": {
"name": "web_search",
"description": f"当现有资料不足时搜索外部信息(当前时间 {current_time})。调用前说明目的,精准撰写 query并合理设置时间/主题参数;避免重复或无意义的搜索。",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询内容(不要包含日期或时间范围)"
},
"max_results": {
"type": "integer",
"description": "最大结果数,可选"
},
"topic": {
"type": "string",
"description": "搜索主题可选值general默认/news/finance"
},
"time_range": {
"type": "string",
"description": "相对时间范围,可选 day/week/month/year支持缩写 d/w/m/y与 days 和 start_date/end_date 互斥"
},
"days": {
"type": "integer",
"description": "最近 N 天,仅当 topic=news 时可用;与 time_range、start_date/end_date 互斥"
},
"start_date": {
"type": "string",
"description": "开始日期YYYY-MM-DD必须与 end_date 同时提供,与 time_range、days 互斥"
},
"end_date": {
"type": "string",
"description": "结束日期YYYY-MM-DD必须与 start_date 同时提供,与 time_range、days 互斥"
},
"country": {
"type": "string",
"description": "国家过滤,仅 topic=general 可用,使用英文小写国名"
}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "extract_webpage",
"description": "在 web_search 结果不够详细时提取网页正文。调用前说明用途,注意提取内容会消耗大量 token超过80000字符将被拒绝。",
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string", "description": "要提取内容的网页URL"}
},
"required": ["url"]
}
}
},
{
"type": "function",
"function": {
"name": "save_webpage",
"description": "提取网页内容并保存为纯文本文件,适合需要长期留存的长文档。请提供网址与目标路径(含 .txt 后缀),落地后请通过终端命令查看。",
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string", "description": "要保存的网页URL"},
"target_path": {"type": "string", "description": "保存位置,包含文件名,相对于项目根目录"}
},
"required": ["url", "target_path"]
}
}
},
{
"type": "function",
"function": {
"name": "run_python",
"description": "执行一次性 Python 脚本,可用于处理二进制或非 UTF-8 文件(如 Excel、Word、PDF、图片或进行数据分析与验证。请在脚本内显式读取文件并输出结果避免长时间阻塞。",
"parameters": {
"type": "object",
"properties": {
"code": {"type": "string", "description": "Python代码"}
},
"required": ["code"]
}
}
},
{
"type": "function",
"function": {
"name": "run_command",
"description": "执行一次性终端命令适合查看文件信息file/ls/stat/iconv 等)、转换编码或调用 CLI 工具。禁止启动交互式程序;对已聚焦文件仅允许使用 grep -n 等定位命令。输出超过10000字符将被拒绝可先限制返回体量。",
"parameters": {
"type": "object",
"properties": {
"command": {"type": "string", "description": "终端命令"}
},
"required": ["command"]
}
}
},
{
"type": "function",
"function": {
"name": "update_memory",
"description": "更新记忆文件",
"parameters": {
"type": "object",
"properties": {
"memory_type": {"type": "string", "enum": ["main", "task"], "description": "记忆类型"},
"content": {"type": "string", "description": "要添加的内容"},
"operation": {"type": "string", "enum": ["append", "replace"], "description": "操作类型"}
},
"required": ["memory_type", "content", "operation"]
}
}
},
{
"type": "function",
"function": {
"name": "todo_create",
"description": "创建待办列表,将多步骤任务拆解为最多 8 条可执行项。概述请控制在 50 字以内,直接说明清单目标;任务列表只写 2~4 条明确步骤。",
"parameters": {
"type": "object",
"properties": {
"overview": {"type": "string", "description": "一句话概述待办清单要完成的目标50 字以内。"},
"tasks": {
"type": "array",
"description": "任务列表,建议 2~4 条,每条写清“动词+对象+目标”。",
"items": {
"type": "object",
"properties": {
"title": {"type": "string", "description": "单个任务描述,写成可执行的步骤"}
},
"required": ["title"]
},
"minItems": 1,
"maxItems": 8
}
},
"required": ["overview", "tasks"]
}
}
},
{
"type": "function",
"function": {
"name": "todo_update_task",
"description": "勾选或取消指定任务。在调整任务顺序或内容前,请先向用户说明最新理解或变更。",
"parameters": {
"type": "object",
"properties": {
"task_index": {"type": "integer", "description": "任务序号1-8"},
"completed": {"type": "boolean", "description": "true=打勾false=取消"}
},
"required": ["task_index", "completed"]
}
}
},
{
"type": "function",
"function": {
"name": "todo_finish",
"description": "尝试结束待办列表,需同步汇报每项任务结果。若仍有未完事项,请注明原因与后续建议。",
"parameters": {
"type": "object",
"properties": {
"reason": {"type": "string", "description": "可选说明"}
}
}
}
},
{
"type": "function",
"function": {
"name": "todo_finish_confirm",
"description": "在任务未完成时确认是否提前结束。若确认结束,请说明后续交付建议或遗留风险。",
"parameters": {
"type": "object",
"properties": {
"confirm": {"type": "boolean", "description": "true=确认结束false=继续执行"},
"reason": {"type": "string", "description": "确认结束时的说明"}
},
"required": ["confirm"]
}
}
},
{
"type": "function",
"function": {
"name": "close_sub_agent",
"description": "强制关闭指定子智能体,适用于长时间无响应、超时或卡死的任务。使用前请确认必要的日志/文件已保留,操作会立即终止该任务。",
"parameters": {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "子智能体任务ID"},
"agent_id": {"type": "integer", "description": "子智能体编号1~5若缺少 task_id 可用"}
}
}
}
},
{
"type": "function",
"function": {
"name": "create_sub_agent",
"description": "创建新的子智能体任务。适合大规模信息搜集、网页提取与多文档总结等会占用大量上下文的工作需要提供任务摘要、详细要求、交付目录以及参考文件。注意同一时间最多运行5个子智能体。",
"parameters": {
"type": "object",
"properties": {
"agent_id": {"type": "integer", "description": "子智能体代号1~5"},
"summary": {"type": "string", "description": "任务摘要,简要说明目标"},
"task": {"type": "string", "description": "任务详细要求"},
"target_dir": {"type": "string", "description": "项目下用于接收交付的相对目录"},
"reference_files": {
"type": "array",
"description": "提供给子智能体的参考文件列表相对路径禁止在summary和task中直接告知子智能体引用图片的路径必须使用本参数提供",
"items": {"type": "string"},
"maxItems": 10
},
"timeout_seconds": {"type": "integer", "description": "子智能体最大运行秒数:单/双次搜索建议180秒多轮搜索整理建议300秒深度调研或长篇分析可设600秒"}
},
"required": ["agent_id", "summary", "task", "target_dir"]
}
}
},
{
"type": "function",
"function": {
"name": "wait_sub_agent",
"description": "等待指定子智能体任务结束(或超时)。任务完成后会返回交付目录,并将结果复制到指定的项目文件夹。调用时 `timeout_seconds` 应不少于对应子智能体的 `timeout_seconds`,否则可能提前终止等待。",
"parameters": {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "子智能体任务ID"},
"agent_id": {"type": "integer", "description": "子智能体代号(可选,用于缺省 task_id 的情况)"},
"timeout_seconds": {"type": "integer", "description": "本次等待的超时时长(秒)"}
},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "trigger_easter_egg",
"description": "触发隐藏彩蛋,用于展示非功能性特效。需指定 effect 参数,例如 flood灌水或 snake贪吃蛇",
"parameters": {
"type": "object",
"properties": {
"effect": {
"type": "string",
"description": "彩蛋标识,目前支持 flood灌水与 snake贪吃蛇"
}
},
"required": ["effect"]
}
}
}
]
if self.disabled_tools:
tools = [
tool for tool in tools
if tool.get("function", {}).get("name") not in self.disabled_tools
]
return tools
async def handle_tool_call(self, tool_name: str, arguments: Dict) -> str:
"""处理工具调用(添加参数预检查和改进错误处理)"""
# 导入字符限制配置
from config import (
MAX_READ_FILE_CHARS, MAX_FOCUS_FILE_CHARS,
MAX_RUN_COMMAND_CHARS, MAX_EXTRACT_WEBPAGE_CHARS
)
# 检查是否需要确认
if tool_name in NEED_CONFIRMATION:
if not await self.confirm_action(tool_name, arguments):
return json.dumps({"success": False, "error": "用户取消操作"})
# === 新增:预检查参数大小和格式 ===
try:
# 检查参数总大小
arguments_str = json.dumps(arguments, ensure_ascii=False)
if len(arguments_str) > 50000: # 50KB限制
return json.dumps({
"success": False,
"error": f"参数过大({len(arguments_str)}字符)超过50KB限制",
"suggestion": "请分块处理或减少参数内容"
}, ensure_ascii=False)
# 针对特定工具的内容检查
if tool_name in ["modify_file", "create_file"] and "content" in arguments:
content = arguments.get("content", "")
if not DISABLE_LENGTH_CHECK and len(content) > 9999999999: # 30KB内容限制
return json.dumps({
"success": False,
"error": f"文件内容过长({len(content)}字符),建议分块处理",
"suggestion": "请拆分内容或使用 modify_file 工具输出结构化补丁"
}, ensure_ascii=False)
# 检查内容中的特殊字符
if '\\' in content and content.count('\\') > len(content) / 10:
print(f"{OUTPUT_FORMATS['warning']} 检测到大量转义字符,可能存在格式问题")
except Exception as e:
return json.dumps({
"success": False,
"error": f"参数预检查失败: {str(e)}"
}, ensure_ascii=False)
try:
if tool_name == "read_file":
result = self._handle_read_tool(arguments)
elif tool_name == "ocr_image":
path = arguments.get("path")
prompt = arguments.get("prompt")
if not path:
return json.dumps({"success": False, "error": "缺少 path 参数", "warnings": []}, ensure_ascii=False)
result = self.ocr_client.ocr_image(path=path, prompt=prompt or "")
# 终端会话管理工具
elif tool_name == "terminal_session":
action = arguments["action"]
if action == "open":
result = self.terminal_manager.open_terminal(
session_name=arguments.get("session_name", "default"),
working_dir=arguments.get("working_dir"),
make_active=True
)
if result["success"]:
print(f"{OUTPUT_FORMATS['session']} 终端会话已打开: {arguments.get('session_name', 'default')}")
elif action == "close":
result = self.terminal_manager.close_terminal(
session_name=arguments.get("session_name", "default")
)
if result["success"]:
print(f"{OUTPUT_FORMATS['session']} 终端会话已关闭: {arguments.get('session_name', 'default')}")
elif action == "list":
result = self.terminal_manager.list_terminals()
elif action == "switch":
result = self.terminal_manager.switch_terminal(
session_name=arguments.get("session_name", "default")
)
if result["success"]:
print(f"{OUTPUT_FORMATS['session']} 切换到终端: {arguments.get('session_name', 'default')}")
else:
result = {"success": False, "error": f"未知操作: {action}"}
# 终端输入工具
elif tool_name == "terminal_input":
result = self.terminal_manager.send_to_terminal(
command=arguments["command"],
session_name=arguments.get("session_name"),
wait_for_output=arguments.get("wait_for_output", True),
timeout=arguments.get("timeout")
)
if result["success"]:
print(f"{OUTPUT_FORMATS['terminal']} 执行命令: {arguments['command']}")
elif tool_name == "terminal_snapshot":
result = self.terminal_manager.get_terminal_snapshot(
session_name=arguments.get("session_name"),
lines=arguments.get("lines"),
max_chars=arguments.get("max_chars")
)
elif tool_name == "terminal_reset":
result = self.terminal_manager.reset_terminal(
session_name=arguments.get("session_name")
)
if result["success"]:
print(f"{OUTPUT_FORMATS['session']} 终端会话已重置: {result['session']}")
# sleep工具
elif tool_name == "sleep":
seconds = arguments.get("seconds", 1)
reason = arguments.get("reason", "等待操作完成")
# 限制最大等待时间
max_sleep = 600 # 最多等待60秒
if seconds > max_sleep:
result = {
"success": False,
"error": f"等待时间过长,最多允许 {max_sleep}",
"suggestion": f"建议分多次等待或减少等待时间"
}
else:
# 确保秒数为正数
if seconds <= 0:
result = {
"success": False,
"error": "等待时间必须大于0"
}
else:
print(f"{OUTPUT_FORMATS['info']} 等待 {seconds} 秒: {reason}")
# 执行等待
import asyncio
await asyncio.sleep(seconds)
result = {
"success": True,
"message": f"已等待 {seconds}",
"reason": reason,
"timestamp": datetime.now().isoformat()
}
print(f"{OUTPUT_FORMATS['success']} 等待完成")
elif tool_name == "create_file":
result = self.file_manager.create_file(
path=arguments["path"],
file_type=arguments["file_type"]
)
# 添加备注
if result["success"] and arguments.get("annotation"):
self.context_manager.update_annotation(
result["path"],
arguments["annotation"]
)
if result.get("success"):
result["message"] = (
f"已创建空文件: {result['path']}。请使用 append_to_file "
"追加正文内容,或使用 modify_file 进行小范围替换。"
)
elif tool_name == "delete_file":
result = self.file_manager.delete_file(arguments["path"])
# 如果删除成功,同时删除备注和聚焦
if result.get("success") and result.get("action") == "deleted":
deleted_path = result.get("path")
# 删除备注
if deleted_path in self.context_manager.file_annotations:
del self.context_manager.file_annotations[deleted_path]
self.context_manager.save_annotations()
print(f"🧹 已删除文件备注: {deleted_path}")
# 删除聚焦
if deleted_path in self.focused_files:
del self.focused_files[deleted_path]
print(f"🔍 已取消文件聚焦: {deleted_path}")
elif tool_name == "rename_file":
result = self.file_manager.rename_file(
arguments["old_path"],
arguments["new_path"]
)
# 如果重命名成功更新备注和聚焦的key
if result.get("success") and result.get("action") == "renamed":
old_path = result.get("old_path")
new_path = result.get("new_path")
# 更新备注
if old_path in self.context_manager.file_annotations:
annotation = self.context_manager.file_annotations[old_path]
del self.context_manager.file_annotations[old_path]
self.context_manager.file_annotations[new_path] = annotation
self.context_manager.save_annotations()
print(f"📝 已更新文件备注: {old_path} -> {new_path}")
# 更新聚焦
if old_path in self.focused_files:
content = self.focused_files[old_path]
del self.focused_files[old_path]
self.focused_files[new_path] = content
print(f"🔍 已更新文件聚焦: {old_path} -> {new_path}")
elif tool_name == "modify_file":
path = arguments.get("path")
if not path:
result = {"success": False, "error": "缺少必要参数: path"}
else:
if self.pending_append_request:
active_path = self.pending_append_request.get("path")
result = {
"success": False,
"error": f"当前仍有 append_to_file 任务未完成: {active_path}",
"suggestion": "请先完成追加,再继续执行 modify_file。"
}
else:
valid, error, full_path = self.file_manager._validate_path(path)
if not valid:
result = {"success": False, "error": error}
else:
relative_path = str(full_path.relative_to(self.project_path))
self.pending_modify_request = {"path": relative_path}
instructions = (
f"\n请按照以下格式输出需要替换的全部内容,标记需独立成行,缩进必须和原文完全一致(包括首行缩进和整体缩进,任何一个字符不匹配都会导致失败):\n"
f"<<<MODIFY:{relative_path}>>>\n"
"[replace:1]\n"
"<<OLD>>\n"
"(第一处需要修改的原文内容,必须逐字匹配(包含所有缩进和换行))\n"
"<<END>>\n"
"<<NEW>>\n"
"(第一处需要修改的新内容,可留空表示清空)\n"
"<<END>>\n"
"[/replace]\n"
"[replace:2]\n"
"<<OLD>>\n"
"(第二处需要修改的原文内容,必须逐字匹配,包含所有缩进和换行)\n"
"<<END>>\n"
"<<NEW>>\n"
"(第二处需要修改的新内容,可留空表示清空)\n"
"<<END>>\n"
"[/replace]\n"
"...如需更多修改,请递增序号继续添加 [replace:n] 块。\n"
"<<<END_MODIFY>>>\n"
"⚠️ 注意:每个 replace 块必须完整闭合,并且 OLD/NEW 内容必须与原始代码逐字匹配(包含所有缩进和换行)。\n"
"<<<MODIFY:{relative_path}>>>为起始标记,由**三个**<和>组成的闭合标记组成内容为“MODIFY:”+文件的相对位置,整个标记中禁止有任何的换行,空格,必须完全匹配\n"
"<<<END_MODIFY>>>为结束标记,同样由**三个**<和>组成的闭合标记组成内容为“END_MODIFY”整个标记中禁止有任何的换行空格必须完全匹配"
)
result = {
"success": True,
"awaiting_content": True,
"path": relative_path,
"message": instructions
}
elif tool_name == "append_to_file":
path = arguments.get("path")
if not path:
result = {"success": False, "error": "缺少必要参数: path"}
else:
valid, error, full_path = self.file_manager._validate_path(path)
if not valid:
result = {"success": False, "error": error}
else:
relative_path = str(full_path.relative_to(self.project_path))
self.pending_append_request = {"path": relative_path}
instructions = (
f"\n请按照以下格式输出需要追加到文件的完整内容,禁止输出任何解释性文字:\n"
f"<<<APPEND:{relative_path}>>>\n"
"(在此行之后紧接着写入要追加的全部内容,可包含多行代码)\n"
"<<<END_APPEND>>>\n"
"⚠️ 注意:<<<APPEND>>> 与 <<<END_APPEND>>> 必须成对出现,内容之间不能包含解释或额外标记。\n"
)
result = {
"success": True,
"awaiting_content": True,
"path": relative_path,
"message": instructions
}
elif tool_name == "create_folder":
result = self.file_manager.create_folder(arguments["path"])
elif tool_name == "focus_file":
path = arguments["path"]
# 检查是否已经聚焦
if path in self.focused_files:
result = {"success": False, "error": f"文件已经处于聚焦状态: {path}"}
else:
# 检查聚焦文件数量限制
if len(self.focused_files) >= 3:
result = {
"success": False,
"error": f"已达到最大聚焦文件数量(3个),当前聚焦: {list(self.focused_files.keys())}",
"suggestion": "请先使用 unfocus_file 取消部分文件的聚焦"
}
else:
# 读取文件内容
read_result = self.file_manager.read_file(path)
if read_result["success"]:
# 字符数检查
char_count = len(read_result["content"])
if char_count > MAX_FOCUS_FILE_CHARS:
result = {
"success": False,
"error": f"文件过大,有{char_count}字符请使用run_command限制字符数返回",
"char_count": char_count,
"limit": MAX_FOCUS_FILE_CHARS
}
else:
self.focused_files[path] = read_result["content"]
result = {
"success": True,
"message": f"文件已聚焦: {path}",
"focused_files": list(self.focused_files.keys()),
"file_size": len(read_result["content"])
}
print(f"🔍 文件已聚焦: {path} ({len(read_result['content'])} 字节)")
else:
result = read_result
elif tool_name == "unfocus_file":
path = arguments["path"]
if path in self.focused_files:
del self.focused_files[path]
result = {
"success": True,
"message": f"已取消文件聚焦: {path}",
"remaining_focused": list(self.focused_files.keys())
}
print(f"✖️ 已取消文件聚焦: {path}")
else:
result = {"success": False, "error": f"文件未处于聚焦状态: {path}"}
elif tool_name == "web_search":
search_response = await self.search_engine.search_with_summary(
query=arguments["query"],
max_results=arguments.get("max_results"),
topic=arguments.get("topic"),
time_range=arguments.get("time_range"),
days=arguments.get("days"),
start_date=arguments.get("start_date"),
end_date=arguments.get("end_date"),
country=arguments.get("country")
)
if search_response["success"]:
result = {
"success": True,
"summary": search_response["summary"],
"filters": search_response.get("filters", {}),
"query": search_response.get("query"),
"results": search_response.get("results", []),
"total_results": search_response.get("total_results", 0)
}
else:
result = {
"success": False,
"error": search_response.get("error", "搜索失败"),
"filters": search_response.get("filters", {}),
"query": search_response.get("query"),
"results": search_response.get("results", []),
"total_results": search_response.get("total_results", 0)
}
elif tool_name == "extract_webpage":
url = arguments["url"]
try:
# 从config获取API密钥
from config import TAVILY_API_KEY
full_content, _ = await extract_webpage_content(
urls=url,
api_key=TAVILY_API_KEY,
extract_depth="basic",
max_urls=1
)
# 字符数检查
char_count = len(full_content)
if char_count > MAX_EXTRACT_WEBPAGE_CHARS:
result = {
"success": False,
"error": f"网页提取返回了过长的{char_count}字符请不要提取这个网页可以使用网页保存功能然后使用read工具查找或查看网页",
"char_count": char_count,
"limit": MAX_EXTRACT_WEBPAGE_CHARS,
"url": url
}
else:
result = {
"success": True,
"url": url,
"content": full_content
}
except Exception as e:
result = {
"success": False,
"error": f"网页提取失败: {str(e)}",
"url": url
}
elif tool_name == "save_webpage":
url = arguments["url"]
target_path = arguments["target_path"]
try:
from config import TAVILY_API_KEY
except ImportError:
TAVILY_API_KEY = None
if not TAVILY_API_KEY or TAVILY_API_KEY == "your-tavily-api-key":
result = {
"success": False,
"error": "Tavily API密钥未配置无法保存网页",
"url": url,
"path": target_path
}
else:
try:
extract_result = await tavily_extract(
urls=url,
api_key=TAVILY_API_KEY,
extract_depth="basic",
max_urls=1
)
if not extract_result or "error" in extract_result:
error_message = extract_result.get("error", "提取失败,未返回任何内容") if isinstance(extract_result, dict) else "提取失败"
result = {
"success": False,
"error": error_message,
"url": url,
"path": target_path
}
else:
results_list = extract_result.get("results", []) if isinstance(extract_result, dict) else []
primary_result = None
for item in results_list:
if item.get("raw_content"):
primary_result = item
break
if primary_result is None and results_list:
primary_result = results_list[0]
if not primary_result:
failed_list = extract_result.get("failed_results", []) if isinstance(extract_result, dict) else []
result = {
"success": False,
"error": "提取成功结果为空,无法保存",
"url": url,
"path": target_path,
"failed": failed_list
}
else:
content_to_save = primary_result.get("raw_content") or primary_result.get("content") or ""
if not content_to_save:
result = {
"success": False,
"error": "网页内容为空,未写入文件",
"url": url,
"path": target_path
}
else:
write_result = self.file_manager.write_file(target_path, content_to_save, mode="w")
if not write_result.get("success"):
result = {
"success": False,
"error": write_result.get("error", "写入文件失败"),
"url": url,
"path": target_path
}
else:
char_count = len(content_to_save)
byte_size = len(content_to_save.encode("utf-8"))
result = {
"success": True,
"url": url,
"path": write_result.get("path", target_path),
"char_count": char_count,
"byte_size": byte_size,
"message": f"网页内容已以纯文本保存到 {write_result.get('path', target_path)},请使用终端命令查看(文件建议为 .txt"
}
if isinstance(extract_result, dict) and extract_result.get("failed_results"):
result["warnings"] = extract_result["failed_results"]
except Exception as e:
result = {
"success": False,
"error": f"网页保存失败: {str(e)}",
"url": url,
"path": target_path
}
elif tool_name == "run_python":
result = await self.terminal_ops.run_python_code(arguments["code"])
elif tool_name == "run_command":
result = await self.terminal_ops.run_command(arguments["command"])
# 字符数检查
if result.get("success") and "output" in result:
char_count = len(result["output"])
if char_count > MAX_RUN_COMMAND_CHARS:
result = {
"success": False,
"error": f"结果内容过大,有{char_count}字符请使用限制字符数的获取内容方式根据程度选择10k以内的数",
"char_count": char_count,
"limit": MAX_RUN_COMMAND_CHARS,
"command": arguments["command"]
}
elif tool_name == "update_memory":
memory_type = arguments["memory_type"]
content = arguments["content"]
operation = arguments["operation"]
if memory_type == "main":
if operation == "append":
success = self.memory_manager.append_main_memory(content)
else:
success = self.memory_manager.write_main_memory(content)
else:
if operation == "append":
success = self.memory_manager.append_task_memory(content)
else:
success = self.memory_manager.write_task_memory(content)
result = {"success": success}
elif tool_name == "todo_create":
result = self.todo_manager.create_todo_list(
overview=arguments.get("overview", ""),
tasks=arguments.get("tasks", [])
)
elif tool_name == "todo_update_task":
result = self.todo_manager.update_task_status(
task_index=arguments.get("task_index"),
completed=arguments.get("completed", True)
)
elif tool_name == "todo_finish":
result = self.todo_manager.finish_todo(
reason=arguments.get("reason")
)
elif tool_name == "todo_finish_confirm":
result = self.todo_manager.confirm_finish(
confirm=arguments.get("confirm", False),
reason=arguments.get("reason")
)
elif tool_name == "create_sub_agent":
result = self.sub_agent_manager.create_sub_agent(
agent_id=arguments.get("agent_id"),
summary=arguments.get("summary", ""),
task=arguments.get("task", ""),
target_dir=arguments.get("target_dir", ""),
reference_files=arguments.get("reference_files", []),
timeout_seconds=arguments.get("timeout_seconds"),
conversation_id=self.context_manager.current_conversation_id
)
elif tool_name == "wait_sub_agent":
wait_timeout = arguments.get("timeout_seconds")
if not wait_timeout:
task_ref = self.sub_agent_manager.lookup_task(
task_id=arguments.get("task_id"),
agent_id=arguments.get("agent_id")
)
if task_ref:
wait_timeout = task_ref.get("timeout_seconds")
result = self.sub_agent_manager.wait_for_completion(
task_id=arguments.get("task_id"),
agent_id=arguments.get("agent_id"),
timeout_seconds=wait_timeout
)
elif tool_name == "close_sub_agent":
result = self.sub_agent_manager.terminate_sub_agent(
task_id=arguments.get("task_id"),
agent_id=arguments.get("agent_id")
)
elif tool_name == "trigger_easter_egg":
result = self.easter_egg_manager.trigger_effect(arguments.get("effect"))
else:
result = {"success": False, "error": f"未知工具: {tool_name}"}
except Exception as e:
logger.error(f"工具执行失败: {tool_name} - {e}")
result = {"success": False, "error": f"工具执行异常: {str(e)}"}
return json.dumps(result, ensure_ascii=False)
async def confirm_action(self, action: str, arguments: Dict) -> bool:
"""确认危险操作"""
print(f"\n{OUTPUT_FORMATS['confirm']} 需要确认的操作:")
print(f" 操作: {action}")
print(f" 参数: {json.dumps(arguments, ensure_ascii=False, indent=2)}")
response = input("\n是否继续? (y/n): ").strip().lower()
return response == 'y'
def build_context(self) -> Dict:
"""构建主终端上下文"""
# 读取记忆
memory = self.memory_manager.read_main_memory()
# 构建上下文
return self.context_manager.build_main_context(memory)
def _tool_calls_followed_by_tools(self, conversation: List[Dict], start_idx: int, tool_calls: List[Dict]) -> bool:
"""判断指定助手消息的工具调用是否拥有后续的工具响应。"""
if not tool_calls:
return False
expected_ids = [tc.get("id") for tc in tool_calls if tc.get("id")]
if not expected_ids:
return False
matched: Set[str] = set()
idx = start_idx + 1
total = len(conversation)
while idx < total and len(matched) < len(expected_ids):
next_conv = conversation[idx]
role = next_conv.get("role")
if role == "tool":
call_id = next_conv.get("tool_call_id")
if call_id in expected_ids:
matched.add(call_id)
else:
break
elif role in ("assistant", "user"):
break
idx += 1
return len(matched) == len(expected_ids)
def build_messages(self, context: Dict, user_input: str) -> List[Dict]:
"""构建消息列表(添加终端内容注入)"""
# 加载系统提示
system_prompt = self.load_prompt("main_system")
# 格式化系统提示
system_prompt = system_prompt.format(
project_path=self.project_path,
file_tree=context["project_info"]["file_tree"],
memory=context["memory"],
current_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
)
messages = [
{"role": "system", "content": system_prompt}
]
if self.tool_category_states.get("todo", True):
todo_prompt = self.load_prompt("todo_guidelines").strip()
if todo_prompt:
messages.append({"role": "system", "content": todo_prompt})
if self.tool_category_states.get("sub_agent", True):
sub_agent_prompt = self.load_prompt("sub_agent_guidelines").strip()
if sub_agent_prompt:
messages.append({"role": "system", "content": sub_agent_prompt})
if self.thinking_mode:
thinking_prompt = self.load_prompt("thinking_mode_guidelines").strip()
if thinking_prompt:
messages.append({"role": "system", "content": thinking_prompt})
personalization_config = load_personalization_config(self.data_dir)
personalization_block = build_personalization_prompt(personalization_config, include_header=False)
if personalization_block:
personalization_template = self.load_prompt("personalization").strip()
if personalization_template and "{personalization_block}" in personalization_template:
personalization_text = personalization_template.format(personalization_block=personalization_block)
elif personalization_template:
personalization_text = f"{personalization_template}\n{personalization_block}"
else:
personalization_text = personalization_block
messages.append({"role": "system", "content": personalization_text})
# 添加对话历史保留完整结构包括tool_calls和tool消息
conversation = context["conversation"]
for idx, conv in enumerate(conversation):
metadata = conv.get("metadata") or {}
if conv["role"] == "assistant":
# Assistant消息可能包含工具调用
message = {
"role": conv["role"],
"content": conv["content"]
}
# 如果有工具调用信息,添加到消息中
tool_calls = conv.get("tool_calls") or []
if tool_calls and self._tool_calls_followed_by_tools(conversation, idx, tool_calls):
message["tool_calls"] = tool_calls
messages.append(message)
elif conv["role"] == "tool":
# Tool消息需要保留完整结构
message = {
"role": "tool",
"content": conv["content"],
"tool_call_id": conv.get("tool_call_id", ""),
"name": conv.get("name", "")
}
messages.append(message)
elif conv["role"] == "system" and metadata.get("sub_agent_notice"):
# 转换为用户消息,让模型能及时响应
messages.append({
"role": "user",
"content": conv["content"]
})
else:
# User 或普通 System 消息
messages.append({
"role": conv["role"],
"content": conv["content"]
})
# 当前用户输入已经在conversation中了不需要重复添加
todo_message = self.context_manager.render_todo_system_message()
if todo_message:
messages.append({
"role": "system",
"content": todo_message
})
# 在最后注入聚焦文件内容作为系统消息
if self.focused_files:
focused_content = "\n\n=== 🔍 正在聚焦的文件 ===\n"
focused_content += f"(共 {len(self.focused_files)} 个文件处于聚焦状态)\n"
for path, content in self.focused_files.items():
size_kb = len(content) / 1024
focused_content += f"\n--- 文件: {path} ({size_kb:.1f}KB) ---\n"
focused_content += f"```\n{content}\n```\n"
focused_content += "\n=== 聚焦文件结束 ===\n"
focused_content += "提示:以上文件正在被聚焦,你可以直接看到完整内容并进行修改,禁止再次读取。"
messages.append({
"role": "system",
"content": focused_content
})
disabled_notice = self._format_disabled_tool_notice()
if disabled_notice:
messages.append({
"role": "system",
"content": disabled_notice
})
return messages
def load_prompt(self, name: str) -> str:
"""加载提示模板"""
prompt_file = Path(PROMPTS_DIR) / f"{name}.txt"
if prompt_file.exists():
with open(prompt_file, 'r', encoding='utf-8') as f:
return f.read()
return "你是一个AI助手。"
async def show_focused_files(self, args: str = ""):
"""显示当前聚焦的文件"""
if not self.focused_files:
print(f"{OUTPUT_FORMATS['info']} 当前没有聚焦的文件")
else:
print(f"\n🔍 聚焦文件列表 ({len(self.focused_files)}/3):")
print("="*50)
for path, content in self.focused_files.items():
size_kb = len(content) / 1024
lines = content.count('\n') + 1
print(f" 📄 {path}")
print(f" 大小: {size_kb:.1f}KB | 行数: {lines}")
print("="*50)
async def show_terminals(self, args: str = ""):
"""显示终端会话列表"""
result = self.terminal_manager.list_terminals()
if result["total"] == 0:
print(f"{OUTPUT_FORMATS['info']} 当前没有活动的终端会话")
else:
print(f"\n📺 终端会话列表 ({result['total']}/{result['max_allowed']}):")
print("="*50)
for session in result["sessions"]:
status_icon = "🟢" if session["is_running"] else "🔴"
active_mark = " [活动]" if session["is_active"] else ""
print(f" {status_icon} {session['session_name']}{active_mark}")
print(f" 工作目录: {session['working_dir']}")
print(f" Shell: {session['shell']}")
print(f" 运行时间: {session['uptime_seconds']:.1f}")
if session["is_interactive"]:
print(f" ⚠️ 等待输入")
print("="*50)
async def exit_system(self, args: str = ""):
"""退出系统"""
print(f"{OUTPUT_FORMATS['info']} 正在退出...")
# 关闭所有终端会话
self.terminal_manager.close_all()
# 保存状态
await self.save_state()
exit(0)
async def manage_memory(self, args: str = ""):
"""管理记忆"""
if not args:
print("""
🧠 记忆管理:
/memory show [main|task] - 显示记忆内容
/memory edit [main|task] - 编辑记忆
/memory clear task - 清空任务记忆
/memory merge - 合并任务记忆到主记忆
/memory backup [main|task]- 备份记忆
""")
return
parts = args.split()
action = parts[0] if parts else ""
target = parts[1] if len(parts) > 1 else "main"
if action == "show":
if target == "main":
content = self.memory_manager.read_main_memory()
else:
content = self.memory_manager.read_task_memory()
print(f"\n{'='*50}")
print(content)
print('='*50)
elif action == "clear" and target == "task":
if input("确认清空任务记忆? (y/n): ").lower() == 'y':
self.memory_manager.clear_task_memory()
elif action == "merge":
self.memory_manager.merge_memories()
elif action == "backup":
path = self.memory_manager.backup_memory(target)
if path:
print(f"备份保存到: {path}")
async def show_history(self, args: str = ""):
"""显示对话历史"""
history = self.context_manager.conversation_history[-2000:] # 最近2000条
print("\n📜 对话历史:")
print("="*50)
for conv in history:
timestamp = conv.get("timestamp", "")
if conv["role"] == "user":
role = "👤 用户"
elif conv["role"] == "assistant":
role = "🤖 助手"
elif conv["role"] == "tool":
role = f"🔧 工具[{conv.get('name', 'unknown')}]"
else:
role = conv["role"]
content = conv["content"][:100] + "..." if len(conv["content"]) > 100 else conv["content"]
print(f"\n[{timestamp[:19]}] {role}:")
print(content)
# 如果是助手消息且有工具调用,显示工具信息
if conv["role"] == "assistant" and "tool_calls" in conv and conv["tool_calls"]:
tools = [tc["function"]["name"] for tc in conv["tool_calls"]]
print(f" 🔗 调用工具: {', '.join(tools)}")
print("="*50)
async def show_files(self, args: str = ""):
"""显示项目文件"""
structure = self.context_manager.get_project_structure()
print(f"\n📁 项目文件结构:")
print(self.context_manager._build_file_tree(structure))
print(f"\n总计: {structure['total_files']} 个文件, {structure['total_size'] / 1024 / 1024:.2f} MB")
async def toggle_mode(self, args: str = ""):
"""切换运行模式(简化版)"""
if self.thinking_mode:
# 当前是思考模式,切换到快速模式
self.thinking_mode = False
self.api_client.thinking_mode = False
print(f"{OUTPUT_FORMATS['info']} 已切换到: 快速模式(不思考)")
else:
# 当前是快速模式,切换到思考模式
self.thinking_mode = True
self.api_client.thinking_mode = True
self.api_client.start_new_task()
print(f"{OUTPUT_FORMATS['info']} 已切换到: 思考模式(智能思考)")