# utils/context_manager.py - Context manager (integrates conversation persistence and token statistics)

import os
import json
from copy import deepcopy
from typing import Dict, List, Optional, Any
from pathlib import Path
from datetime import datetime

try:
    from config import (
        MAX_CONTEXT_SIZE,
        DATA_DIR,
        PROMPTS_DIR,
        TERMINAL_SANDBOX_MOUNT_PATH,
        TERMINAL_SANDBOX_CPUS,
        TERMINAL_SANDBOX_MEMORY,
        PROJECT_MAX_STORAGE_MB,
    )
except ImportError:
    import sys
    from pathlib import Path
    project_root = Path(__file__).resolve().parents[1]
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))
    from config import (
        MAX_CONTEXT_SIZE,
        DATA_DIR,
        PROMPTS_DIR,
        TERMINAL_SANDBOX_MOUNT_PATH,
        TERMINAL_SANDBOX_CPUS,
        TERMINAL_SANDBOX_MEMORY,
        PROJECT_MAX_STORAGE_MB,
    )

from utils.conversation_manager import ConversationManager


class ContextManager:
    def __init__(self, project_path: str, data_dir: Optional[str] = None):
        self.project_path = Path(project_path).resolve()
        self.initial_project_path = self.project_path
        self.container_mount_path = TERMINAL_SANDBOX_MOUNT_PATH or "/workspace"
        self.container_cpu_limit = TERMINAL_SANDBOX_CPUS or "未限制"
        self.container_memory_limit = TERMINAL_SANDBOX_MEMORY or "未限制"
        self.project_storage_limit = f"{PROJECT_MAX_STORAGE_MB}MB" if PROJECT_MAX_STORAGE_MB else "未限制"
        self.workspace_root = Path(__file__).resolve().parents[1]
        self.data_dir = Path(data_dir).expanduser().resolve() if data_dir else Path(DATA_DIR).resolve()
        self.temp_files = {}  # Temporarily loaded file contents
        self.file_annotations = {}  # File annotations
        self.conversation_history = []  # Current conversation history (in memory)
        self.todo_list: Optional[Dict[str, Any]] = None

        # New: conversation persistence manager
        self.conversation_manager = ConversationManager(base_dir=self.data_dir)
        self.current_conversation_id: Optional[str] = None
        self.auto_save_enabled = True
        self.main_terminal = None  # Backfilled by the host terminal after init; used for tool-definition access

        # Callback registered by the web terminal (used to broadcast events)
        self._web_terminal_callback = None
        self._focused_files = {}

        self.load_annotations()

    # ===========================================
    # Token totals file helpers
    # ===========================================

    def _token_totals_path(self) -> Path:
        return self.data_dir / "token_totals.json"

    def _load_token_totals(self) -> Dict[str, Any]:
        path = self._token_totals_path()
        if not path.exists():
            return {
                "input_tokens": 0,
                "output_tokens": 0,
                "total_tokens": 0,
                "updated_at": None,
            }
        try:
            with open(path, 'r', encoding='utf-8') as fh:
                payload = json.load(fh) or {}
            return {
                "input_tokens": int(payload.get("input_tokens") or payload.get("total_input_tokens") or 0),
                "output_tokens": int(payload.get("output_tokens") or payload.get("total_output_tokens") or 0),
                "total_tokens": int(payload.get("total_tokens") or 0),
                "updated_at": payload.get("updated_at"),
            }
        except (OSError, json.JSONDecodeError, ValueError) as exc:
            print(f"[TokenStats] 读取累计Token失败: {exc}")
            return {
                "input_tokens": 0,
                "output_tokens": 0,
                "total_tokens": 0,
                "updated_at": None,
            }

    def _save_token_totals(self, data: Dict[str, Any]):
        path = self._token_totals_path()
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as fh:
            json.dump(data, fh, ensure_ascii=False, indent=2)

    def _increment_workspace_token_totals(self, input_tokens: int, output_tokens: int, total_tokens: int):
        if input_tokens <= 0 and output_tokens <= 0 and total_tokens <= 0:
            return
        snapshot = self._load_token_totals()
        snapshot["input_tokens"] = snapshot.get("input_tokens", 0) + max(0, int(input_tokens))
        snapshot["output_tokens"] = snapshot.get("output_tokens", 0) + max(0, int(output_tokens))
        snapshot["total_tokens"] = snapshot.get("total_tokens", 0) + max(0, int(total_tokens))
        snapshot["updated_at"] = datetime.now().isoformat()
        self._save_token_totals(snapshot)

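    # Illustrative sketch of the token_totals.json file that the helpers above
    # read and write (field names follow _load_token_totals; the values here
    # are invented):
    #
    #   {
    #     "input_tokens": 12345,
    #     "output_tokens": 6789,
    #     "total_tokens": 19134,
    #     "updated_at": "2025-01-01T12:00:00"
    #   }
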
    def set_web_terminal_callback(self, callback):
        """Set the web-terminal callback used to broadcast events"""
        self._web_terminal_callback = callback

    def set_focused_files(self, focused_files: Dict):
        """Set the focused-files info used for token accounting"""
        self._focused_files = focused_files

    def load_annotations(self):
        """Load file annotations"""
        annotations_file = self.data_dir / "file_annotations.json"
        if annotations_file.exists():
            try:
                with open(annotations_file, 'r', encoding='utf-8') as f:
                    content = f.read()
                    if content.strip():
                        self.file_annotations = json.loads(content)
                    else:
                        self.file_annotations = {}
            except (json.JSONDecodeError, KeyError):
                print(f"⚠️ [警告] 文件备注格式错误,将重新初始化")
                self.file_annotations = {}
                self.save_annotations()

    def save_annotations(self):
        """Save file annotations"""
        annotations_file = self.data_dir / "file_annotations.json"
        with open(annotations_file, 'w', encoding='utf-8') as f:
            json.dump(self.file_annotations, f, ensure_ascii=False, indent=2)

    def _resolve_project_path_from_metadata(self, metadata: Dict[str, Any]) -> Path:
        """
        Resolve the project path from conversation metadata, preferring a relative path for portability
        """
        candidates = []

        relative_path = metadata.get("project_relative_path")
        if isinstance(relative_path, str) and relative_path.strip():
            rel_path_obj = Path(relative_path.strip())
            if rel_path_obj.is_absolute():
                candidates.append(rel_path_obj)
            else:
                candidates.append((self.workspace_root / rel_path_obj).resolve())

        stored_path = metadata.get("project_path")
        if isinstance(stored_path, str) and stored_path.strip():
            try:
                candidates.append(Path(stored_path.strip()).expanduser())
            except Exception:
                pass

        for candidate in candidates:
            try:
                if candidate.exists():
                    return candidate
            except Exception:
                continue

        # Finally fall back to the path given at startup
        return self.initial_project_path

    # ===========================================
    # TODO list management
    # ===========================================

    def get_todo_snapshot(self) -> Optional[Dict[str, Any]]:
        if not self.todo_list:
            return None
        snapshot = deepcopy(self.todo_list)
        snapshot["all_done"] = all(
            task.get("status") == "done" for task in snapshot.get("tasks", [])
        )
        snapshot["instruction"] = self._build_todo_instruction(snapshot)
        return snapshot

    def _build_todo_instruction(self, todo: Dict[str, Any]) -> str:
        status = todo.get("status", "active")
        all_done = all(task.get("status") == "done" for task in todo.get("tasks", []))
        if status == "closed":
            return "任务已结束,请在总结中说明未完成的事项。"
        if status == "completed" or all_done:
            return "所有任务已完成,可以结束任务并向用户汇报"
        return "请在确认完成某项任务后再勾选,然后继续下一步"

    def set_todo_list(self, todo_data: Optional[Dict[str, Any]]):
        if todo_data is not None:
            self.todo_list = deepcopy(todo_data)
        else:
            self.todo_list = None
        self.auto_save_conversation(force=True)
        self.broadcast_todo_update()

    def broadcast_todo_update(self):
        if not self._web_terminal_callback:
            return
        try:
            self._web_terminal_callback('todo_updated', {
                "conversation_id": self.current_conversation_id,
                "todo_list": self.get_todo_snapshot()
            })
        except Exception as e:
            print(f"[Debug] 广播todo更新失败: {e}")

    def render_todo_system_message(self) -> Optional[str]:
        snapshot = self.get_todo_snapshot()
        if not snapshot:
            return None

        lines = ["=== TODO_LIST ===", f"任务概述:{snapshot.get('overview', '')}"]
        for task in snapshot.get("tasks", []):
            status_icon = "✅已完成" if task.get("status") == "done" else "❌未完成"
            lines.append(f"task{task.get('index')}:{task.get('title')} [{status_icon}]")
        lines.append(snapshot.get("instruction", "请在确认完成某项任务后再勾选,然后继续下一步"))
        return "\n".join(lines)

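    # Illustrative sketch of the text render_todo_system_message() produces
    # (layout follows the code above; the task titles are invented):
    #
    #   === TODO_LIST ===
    #   任务概述:...
    #   task1:... [✅已完成]
    #   task2:... [❌未完成]
    #   请在确认完成某项任务后再勾选,然后继续下一步
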
    # ===========================================
    # New: token statistics methods
    # ===========================================

    def apply_usage_statistics(self, usage: Dict[str, Any]) -> bool:
        """
        Update token statistics from the usage field returned by the model
        """
        try:
            prompt_tokens = int(usage.get("prompt_tokens") or 0)
            completion_tokens = int(usage.get("completion_tokens") or 0)
            total_tokens = int(usage.get("total_tokens") or (prompt_tokens + completion_tokens))
        except (TypeError, ValueError):
            prompt_tokens = completion_tokens = total_tokens = 0

        try:
            self._increment_workspace_token_totals(prompt_tokens, completion_tokens, total_tokens)
        except Exception as exc:
            print(f"[TokenStats] 无法写入累计Token: {exc}")

        if not self.current_conversation_id:
            print("⚠️ 没有当前对话ID,跳过usage统计更新")
            return False

        try:
            success = self.conversation_manager.update_token_statistics(
                self.current_conversation_id,
                prompt_tokens,
                completion_tokens,
                total_tokens
            )

            if success:
                self.safe_broadcast_token_update()

            return success
        except Exception as e:
            print(f"更新usage统计失败: {e}")
            return False

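    # Minimal usage sketch for apply_usage_statistics(), assuming an
    # OpenAI-style completion response; `response` is hypothetical:
    #
    #   ctx.apply_usage_statistics({
    #       "prompt_tokens": response["usage"]["prompt_tokens"],
    #       "completion_tokens": response["usage"]["completion_tokens"],
    #       "total_tokens": response["usage"]["total_tokens"],
    #   })
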
    def get_conversation_token_statistics(self, conversation_id: Optional[str] = None) -> Optional[Dict]:
        """
        Get the token statistics of the specified conversation

        Args:
            conversation_id: conversation ID; defaults to the current conversation

        Returns:
            Dict: token statistics
        """
        target_id = conversation_id or self.current_conversation_id
        if not target_id:
            return None

        return self.conversation_manager.get_token_statistics(target_id)

    def get_current_context_tokens(self, conversation_id: Optional[str] = None) -> int:
        """
        Get the context token count of the most recent request
        """
        stats = self.get_conversation_token_statistics(conversation_id)
        if not stats:
            return 0
        return stats.get("current_context_tokens", 0)

    # ===========================================
    # New: conversation persistence methods
    # ===========================================

    def start_new_conversation(self, project_path: Optional[str] = None, thinking_mode: bool = False, run_mode: Optional[str] = None) -> str:
        """
        Start a new conversation

        Args:
            project_path: project path; defaults to the current project path
            thinking_mode: thinking mode

        Returns:
            str: the new conversation ID
        """
        if project_path is None:
            project_path = str(self.project_path)

        # Save the current conversation (if any)
        if self.current_conversation_id and self.conversation_history:
            self.save_current_conversation()

        # Create the new conversation
        conversation_id = self.conversation_manager.create_conversation(
            project_path=project_path,
            thinking_mode=thinking_mode,
            run_mode=run_mode or ("thinking" if thinking_mode else "fast"),
            initial_messages=[]
        )

        # Reset the current state
        self.current_conversation_id = conversation_id
        self.conversation_history = []
        self.todo_list = None

        print(f"📝 开始新对话: {conversation_id}")
        return conversation_id

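    # Typical lifecycle sketch (assuming a ContextManager instance `ctx`;
    # the message content is invented):
    #
    #   conv_id = ctx.start_new_conversation(thinking_mode=False)
    #   ctx.add_conversation(role="user", content="...")
    #   ctx.save_current_conversation()
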
    def load_conversation_by_id(self, conversation_id: str) -> bool:
        """
        Load the specified conversation

        Args:
            conversation_id: conversation ID

        Returns:
            bool: whether loading succeeded
        """
        # Save the current conversation first
        if self.current_conversation_id and self.conversation_history:
            self.save_current_conversation()

        # Load the requested conversation
        conversation_data = self.conversation_manager.load_conversation(conversation_id)
        if not conversation_data:
            print(f"⌘ 对话 {conversation_id} 不存在")
            return False

        # Update the current state
        self.current_conversation_id = conversation_id
        self.conversation_history = conversation_data.get("messages", [])
        todo_data = conversation_data.get("todo_list")
        self.todo_list = deepcopy(todo_data) if todo_data else None

        # Update the project path (if the conversation recorded one)
        metadata = conversation_data.get("metadata", {})
        resolved_project_path = self._resolve_project_path_from_metadata(metadata)

        stored_path = metadata.get("project_path")
        stored_path_obj = None
        if isinstance(stored_path, str) and stored_path.strip():
            try:
                stored_path_obj = Path(stored_path.strip()).expanduser().resolve()
            except Exception:
                stored_path_obj = None

        if stored_path_obj and stored_path_obj != resolved_project_path:
            print(f"⚠️ 对话记录中的项目路径不可用,已回退至: {resolved_project_path}")

        self.project_path = resolved_project_path

        run_mode = metadata.get("run_mode")
        if self.main_terminal:
            try:
                if run_mode:
                    self.main_terminal.set_run_mode(run_mode)
                elif metadata.get("thinking_mode"):
                    self.main_terminal.set_run_mode("thinking")
                else:
                    self.main_terminal.set_run_mode("fast")
            except Exception:
                pass

        print(f"📖 加载对话: {conversation_id} - {conversation_data.get('title', '未知标题')}")
        print(f"📊 包含 {len(self.conversation_history)} 条消息")

        return True

    def save_current_conversation(self) -> bool:
        """
        Save the current conversation

        Returns:
            bool: whether saving succeeded
        """
        if not self.current_conversation_id:
            print("⚠️ 没有当前对话ID,无法保存")
            return False

        if not self.auto_save_enabled:
            return False

        try:
            run_mode = getattr(self.main_terminal, "run_mode", None) if hasattr(self, "main_terminal") else None
            success = self.conversation_manager.save_conversation(
                conversation_id=self.current_conversation_id,
                messages=self.conversation_history,
                project_path=str(self.project_path),
                todo_list=self.todo_list,
                thinking_mode=getattr(self.main_terminal, "thinking_mode", None) if hasattr(self, "main_terminal") else None,
                run_mode=run_mode
            )

            if success:
                print(f"💾 对话已自动保存: {self.current_conversation_id}")
            else:
                print(f"⌘ 对话保存失败: {self.current_conversation_id}")

            return success
        except Exception as e:
            print(f"⌘ 保存对话异常: {e}")
            return False

    def auto_save_conversation(self, force: bool = False):
        """Auto-save the conversation (silent mode, less logging)"""
        if not self.auto_save_enabled or not self.current_conversation_id:
            return
        if not force and not self.conversation_history:
            return
        try:
            run_mode = getattr(self.main_terminal, "run_mode", None) if hasattr(self, "main_terminal") else None
            self.conversation_manager.save_conversation(
                conversation_id=self.current_conversation_id,
                messages=self.conversation_history,
                project_path=str(self.project_path),
                todo_list=self.todo_list,
                thinking_mode=getattr(self.main_terminal, "thinking_mode", None) if hasattr(self, "main_terminal") else None,
                run_mode=run_mode
            )
            # Silent save: no log output
        except Exception as e:
            print(f"⌘ 自动保存异常: {e}")

    def get_conversation_list(self, limit: int = 50, offset: int = 0) -> Dict:
        """Get the conversation list"""
        return self.conversation_manager.get_conversation_list(limit=limit, offset=offset)

    def delete_conversation_by_id(self, conversation_id: str) -> bool:
        """Delete the specified conversation"""
        # If it is the current conversation, clear the in-memory state
        if self.current_conversation_id == conversation_id:
            self.current_conversation_id = None
            self.conversation_history = []
            self.todo_list = None
        elif self.current_conversation_id and self.conversation_history:
            try:
                conversation_data = self.conversation_manager.load_conversation(self.current_conversation_id)
                if not conversation_data:
                    self.current_conversation_id = None
                    self.conversation_history = []
                    self.todo_list = None
                else:
                    todo_data = conversation_data.get("todo_list")
                    self.todo_list = deepcopy(todo_data) if todo_data else None
            except Exception as exc:
                print(f"⌘ 刷新待办列表失败: {exc}")
                self.todo_list = None

        return self.conversation_manager.delete_conversation(conversation_id)

    def search_conversations(self, query: str, limit: int = 20) -> List[Dict]:
        """Search conversations"""
        return self.conversation_manager.search_conversations(query, limit)

    def get_conversation_statistics(self) -> Dict:
        """Get conversation statistics"""
        return self.conversation_manager.get_statistics()

    def compress_conversation(self, conversation_id: str) -> Dict:
        """Compress the bulky messages in the given conversation into a new conversation"""
        conversation_data = self.conversation_manager.load_conversation(conversation_id)
        if not conversation_data:
            return {
                "success": False,
                "error": f"对话不存在: {conversation_id}"
            }

        original_messages = conversation_data.get("messages", []) or []
        if not original_messages:
            return {
                "success": False,
                "error": "当前对话没有可压缩的内容"
            }

        append_placeholder = "由于当前对话记录已经被压缩,此处的输出的内容不可见,如需查看内容,请直接查看创建的文件"
        extract_placeholder = "由于当前对话记录已经被压缩,此处的网页提取的内容不可见,如需查看内容,请再次提取或查看保存的文件(如果当时进行了保存)"

        compressed_messages: List[Dict] = []
        compressed_types = set()

        for message in original_messages:
            new_msg = deepcopy(message)
            role = new_msg.get("role")

            if role == "assistant":
                metadata = new_msg.get("metadata") or {}
                content = new_msg.get("content") or ""

                if ("<<<APPEND" in content) or metadata.get("append_payload"):
                    new_msg["content"] = append_placeholder
                    compressed_types.add("write_file_diff")
                elif ("<<<MODIFY" in content) or metadata.get("modify_payload"):
                    new_msg["content"] = append_placeholder
                    compressed_types.add("write_file_diff")

            elif role == "tool":
                tool_name = new_msg.get("name")
                raw_content = new_msg.get("content")
                if tool_name == "read_file":
                    new_msg["content"] = append_placeholder
                    compressed_types.add("read_file")
                elif tool_name == "write_file_diff":
                    new_msg["content"] = append_placeholder
                    compressed_types.add("write_file_diff")
                else:
                    payload = None

                    if isinstance(raw_content, dict):
                        payload = deepcopy(raw_content)
                    elif isinstance(raw_content, str):
                        try:
                            payload = json.loads(raw_content)
                        except Exception:
                            payload = None

                    if isinstance(payload, dict):
                        updated = False
                        if tool_name == "extract_webpage" and payload.get("content"):
                            payload["content"] = extract_placeholder
                            compressed_types.add("extract_webpage")
                            updated = True

                        if updated:
                            new_msg["content"] = json.dumps(payload, ensure_ascii=False)
                    else:
                        if tool_name == "extract_webpage":
                            new_msg["content"] = extract_placeholder
                            compressed_types.add("extract_webpage")

            compressed_messages.append(new_msg)

        if not compressed_types:
            return {
                "success": False,
                "error": "未找到需要压缩的记录"
            }

        type_labels = {
            "write_file_diff": "文件补丁输出",
            "extract_webpage": "网页提取内容",
            "read_file": "文件读取内容"
        }

        ordered_types = [type_labels[t] for t in ["write_file_diff", "extract_webpage", "read_file"] if t in compressed_types]
        summary_text = "系统提示:对话已压缩,之前的对话记录中的以下内容被替换为占位文本:" + "、".join(ordered_types) + "。其他内容不受影响,如需查看原文,请查看对应文件或重新执行相关工具。"

        system_message = {
            "role": "system",
            "content": summary_text,
            "timestamp": datetime.now().isoformat(),
            "metadata": {
                "compression": {
                    "source_conversation_id": conversation_id,
                    "types": sorted(list(compressed_types))
                }
            }
        }
        compressed_messages.append(system_message)

        metadata = conversation_data.get("metadata", {})
        resolved_project_path = self._resolve_project_path_from_metadata(metadata)
        project_path = str(resolved_project_path)
        thinking_mode = metadata.get("thinking_mode", False)
        run_mode = metadata.get("run_mode") or ("thinking" if thinking_mode else "fast")

        compressed_conversation_id = self.conversation_manager.create_conversation(
            project_path=project_path,
            thinking_mode=thinking_mode,
            run_mode=run_mode,
            initial_messages=compressed_messages
        )

        return {
            "success": True,
            "compressed_conversation_id": compressed_conversation_id,
            "compressed_types": sorted(list(compressed_types)),
            "system_message": summary_text
        }

    def duplicate_conversation(self, conversation_id: str) -> Dict:
        """Duplicate a conversation, creating a new copy"""
        conversation_data = self.conversation_manager.load_conversation(conversation_id)
        if not conversation_data:
            return {
                "success": False,
                "error": f"对话不存在: {conversation_id}"
            }

        original_messages = deepcopy(conversation_data.get("messages", []) or [])

        metadata = conversation_data.get("metadata", {})
        resolved_project_path = self._resolve_project_path_from_metadata(metadata)
        project_path = str(resolved_project_path)
        thinking_mode = metadata.get("thinking_mode", False)
        run_mode = metadata.get("run_mode") or ("thinking" if thinking_mode else "fast")

        duplicate_conversation_id = self.conversation_manager.create_conversation(
            project_path=project_path,
            thinking_mode=thinking_mode,
            run_mode=run_mode,
            initial_messages=original_messages
        )

        token_stats = conversation_data.get("token_statistics")
        if token_stats:
            new_data = self.conversation_manager.load_conversation(duplicate_conversation_id)
            if new_data:
                new_data["token_statistics"] = deepcopy(token_stats)
                new_metadata = new_data.get("metadata", {})
                new_metadata["total_messages"] = metadata.get("total_messages", len(original_messages))
                new_metadata["total_tools"] = metadata.get("total_tools", 0)
                new_metadata["status"] = metadata.get("status", "active")
                new_data["metadata"] = new_metadata
                new_data["updated_at"] = datetime.now().isoformat()
                self.conversation_manager._save_conversation_file(duplicate_conversation_id, new_data)
                self.conversation_manager._update_index(duplicate_conversation_id, new_data)

        return {
            "success": True,
            "duplicate_conversation_id": duplicate_conversation_id
        }

    # ===========================================
    # Existing methods updated to integrate auto-save and token statistics
    # ===========================================

    def safe_broadcast_token_update(self):
        """Safely broadcast a token update (only broadcasts cumulative statistics; nothing is recalculated)"""
        try:
            print(f"[Debug] 尝试广播token更新")

            # Check that a callback is registered
            if not hasattr(self, '_web_terminal_callback'):
                print(f"[Debug] 没有_web_terminal_callback属性")
                return

            if not self._web_terminal_callback:
                print(f"[Debug] _web_terminal_callback为None")
                return

            if not self.current_conversation_id:
                print(f"[Debug] 没有当前对话ID")
                return

            print(f"[Debug] 广播token统计,对话ID: {self.current_conversation_id}")

            # Only read the existing cumulative token statistics; do not recalculate
            cumulative_stats = self.get_conversation_token_statistics()

            # Prepare the broadcast payload
            broadcast_data = {
                'conversation_id': self.current_conversation_id,
                'cumulative_input_tokens': cumulative_stats.get("total_input_tokens", 0) if cumulative_stats else 0,
                'cumulative_output_tokens': cumulative_stats.get("total_output_tokens", 0) if cumulative_stats else 0,
                'cumulative_total_tokens': cumulative_stats.get("total_tokens", 0) if cumulative_stats else 0,
                'current_context_tokens': cumulative_stats.get("current_context_tokens", 0) if cumulative_stats else 0,
                'updated_at': datetime.now().isoformat()
            }

            print(f"[Debug] Token统计: 累计输入={broadcast_data['cumulative_input_tokens']}, 累计输出={broadcast_data['cumulative_output_tokens']}")

            # Broadcast to the frontend
            self._web_terminal_callback('token_update', broadcast_data)

            print(f"[Debug] token更新已广播")

        except Exception as e:
            print(f"[Debug] 广播token更新失败: {e}")
            import traceback
            traceback.print_exc()

    def add_conversation(
        self,
        role: str,
        content: str,
        tool_calls: Optional[List[Dict]] = None,
        tool_call_id: Optional[str] = None,
        name: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        reasoning_content: Optional[str] = None
    ):
        """Add a conversation record (improved: integrates auto-save + smarter token statistics)"""
        timestamp = datetime.now().isoformat()
        if role == "assistant":
            message = {
                "role": role,
                "reasoning_content": reasoning_content if reasoning_content is not None else "",
                "content": content or "",
                "timestamp": timestamp
            }
        else:
            message = {
                "role": role,
                "content": content,
                "timestamp": timestamp
            }

        if metadata:
            message["metadata"] = metadata

        # If this is an assistant message with tool calls, store the full format
        if role == "assistant" and tool_calls:
            # Ensure the tool-call format is complete
            formatted_tool_calls = []
            for tc in tool_calls:
                # If it is the simplified format, fill in the missing fields
                if "function" in tc and not tc.get("id"):
                    formatted_tc = {
                        "id": f"call_{datetime.now().timestamp()}_{tc['function'].get('name', 'unknown')}",
                        "type": "function",
                        "function": tc["function"]
                    }
                else:
                    formatted_tc = tc
                formatted_tool_calls.append(formatted_tc)
            message["tool_calls"] = formatted_tool_calls

        # If this is a tool message, store the required fields
        if role == "tool":
            if tool_call_id:
                message["tool_call_id"] = tool_call_id
            if name:
                message["name"] = name

        self.conversation_history.append(message)

        # Auto-save
        self.auto_save_conversation()

        print(f"[Debug] 添加{role}消息后广播token更新")
        self.safe_broadcast_token_update()

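    # Sketch of recording an assistant message that carries a tool call and
    # its result (shapes follow add_conversation above and add_tool_result
    # below; all values are invented):
    #
    #   ctx.add_conversation(
    #       role="assistant",
    #       content="",
    #       tool_calls=[{"function": {"name": "read_file", "arguments": "{...}"}}],
    #   )
    #   ctx.add_tool_result(tool_call_id="call_...", function_name="read_file", result="...")
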
    def add_tool_result(self, tool_call_id: str, function_name: str, result: str):
        """Add a tool-call result (kept for backward compatibility)"""
        self.add_conversation(
            role="tool",
            content=result,
            tool_call_id=tool_call_id,
            name=function_name
        )

    # ===========================================
    # Deprecated save/load methods, kept for compatibility
    # ===========================================

    def save_conversation(self):
        """Save conversation history (deprecated; use the new persistence system)"""
        print("⚠️ save_conversation() 已废弃,使用新的持久化系统")
        return self.save_current_conversation()

    def load_conversation(self):
        """Load conversation history (deprecated; use the new persistence system)"""
        print("⚠️ load_conversation() 已废弃,使用 load_conversation_by_id()")
        # Compatibility: try to load the most recent conversation
        conversations = self.get_conversation_list(limit=1)
        if conversations["conversations"]:
            latest_conv = conversations["conversations"][0]
            return self.load_conversation_by_id(latest_conv["id"])
        return False

    # ===========================================
    # The remaining original methods are unchanged
    # ===========================================

    def get_project_structure(self) -> Dict:
        """Get the project file structure"""
        structure = {
            "path": str(self.project_path),
            "files": [],
            "folders": [],
            "total_files": 0,
            "total_size": 0,
            "tree": {}  # New: tree-structure data
        }

        # Track the files that actually exist
        existing_files = set()

        def scan_directory(path: Path, level: int = 0, max_level: int = 5, parent_tree: Optional[Dict] = None):
            if level > max_level:
                return

            if parent_tree is None:
                parent_tree = structure["tree"]

            try:
                # Get the directory contents, sorted (folders first, then files)
                items = sorted(path.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower()))

                for item in items:
                    if item.name.startswith('.'):
                        continue

                    relative_path = str(item.relative_to(self.project_path))

                    if item.is_file():
                        existing_files.add(relative_path)  # Record the existing file
                        file_info = {
                            "name": item.name,
                            "path": relative_path,
                            "size": item.stat().st_size,
                            "modified": datetime.fromtimestamp(item.stat().st_mtime).isoformat(),
                            "annotation": self.file_annotations.get(relative_path, "")
                        }
                        structure["files"].append(file_info)
                        structure["total_files"] += 1
                        structure["total_size"] += file_info["size"]

                        # Add to the tree structure
                        parent_tree[item.name] = {
                            "type": "file",
                            "path": relative_path,
                            "size": file_info["size"],
                            "annotation": file_info["annotation"]
                        }

                    elif item.is_dir():
                        folder_info = {
                            "name": item.name,
                            "path": relative_path
                        }
                        structure["folders"].append(folder_info)

                        # Create the folder node
                        parent_tree[item.name] = {
                            "type": "folder",
                            "path": relative_path,
                            "children": {}
                        }

                        # Recursively scan the subdirectory
                        scan_directory(item, level + 1, max_level, parent_tree[item.name]["children"])
            except PermissionError:
                pass

        scan_directory(self.project_path)

        # Clean up annotations for files that no longer exist
        invalid_annotations = []
        for annotation_path in self.file_annotations.keys():
            if annotation_path not in existing_files:
                invalid_annotations.append(annotation_path)

        if invalid_annotations:
            for path in invalid_annotations:
                del self.file_annotations[path]
                print(f"🧹 清理无效备注: {path}")
            self.save_annotations()

        return structure

    def load_file(self, file_path: str) -> bool:
        """Load a file into the temporary context"""
        full_path = self.project_path / file_path

        if not full_path.exists():
            return False

        if not full_path.is_file():
            return False

        try:
            with open(full_path, 'r', encoding='utf-8') as f:
                content = f.read()
            self.temp_files[file_path] = content
            return True
        except Exception:
            return False

    def unload_file(self, file_path: str) -> bool:
        """Remove a file from the temporary context"""
        if file_path in self.temp_files:
            del self.temp_files[file_path]
            return True
        return False

    def update_annotation(self, file_path: str, annotation: str):
        """Update a file annotation"""
        self.file_annotations[file_path] = annotation
        self.save_annotations()

    def load_prompt(self, prompt_name: str) -> str:
        """Load a prompt template"""
        prompt_file = Path(PROMPTS_DIR) / f"{prompt_name}.txt"
        if prompt_file.exists():
            with open(prompt_file, 'r', encoding='utf-8') as f:
                return f.read()
        return ""

    def build_main_context(self, memory_content: str) -> Dict:
        """Build the main terminal context"""
        structure = self.get_project_structure()

        context = {
            "project_info": {
                "path": str(self.project_path),
                "file_tree": self._build_file_tree(structure),
                "file_annotations": self.file_annotations,
                "statistics": {
                    "total_files": structure["total_files"],
                    "total_size": f"{structure['total_size'] / 1024 / 1024:.2f}MB"
                }
            },
            "memory": memory_content,
            "conversation": self.conversation_history,
            "todo_list": self.get_todo_snapshot()
        }

        return context

    def build_task_context(
        self,
        task_info: Dict,
        main_memory: str,
        task_memory: str,
        execution_results: Optional[List[Dict]] = None
    ) -> Dict:
        """Build the sub-task context"""
        structure = self.get_project_structure()

        context = {
            "task_info": task_info,
            "project_info": {
                "path": str(self.project_path),
                "file_tree": self._build_file_tree(structure),
                "file_annotations": self.file_annotations
            },
            "memory": {
                "main_memory": main_memory,
                "task_memory": task_memory
            },
            "temp_files": self.temp_files,
            "execution_results": execution_results or [],
            "conversation": {
                "main": self.conversation_history[-10:],  # Last 10 main-conversation messages
                "sub": []  # Sub-task conversation
            }
        }

        return context

    def _build_file_tree(self, structure: Dict) -> str:
        """Build the file-tree string (fixed version: renders the tree structure correctly)"""
        if not structure.get("tree"):
            return f"📁 {structure['path']}/\n(空项目)"

        lines = []
        project_name = Path(structure['path']).name
        container_root = (self.container_mount_path or "").strip() or "/workspace"
        container_root = container_root.rstrip("/") or "/"
        if container_root == "/":
            root_label = f"/ (项目根)"
        elif container_root.endswith(project_name):
            root_label = f"{container_root} (项目根)"
        else:
            root_label = f"{container_root} (映射自 {project_name})"
        lines.append(f"📁 {root_label}/")

        ROOT_FOLDER_CHILD_LIMIT = 20

        def count_descendants(item: Dict) -> int:
            """Count every entry (across all levels) below a folder."""
            if item.get("type") != "folder":
                return 0
            children = item.get("children") or {}
            total = len(children)
            for child in children.values():
                if child.get("type") == "folder":
                    total += count_descendants(child)
            return total

        def build_tree_recursive(tree_dict: Dict, prefix: str = "", depth: int = 0):
            """Recursively build the tree structure"""
            if not tree_dict:
                return

            # Sort entries by type and name: folders first, then files, each group by name
            items = list(tree_dict.items())
            folders = [(name, info) for name, info in items if info["type"] == "folder"]
            files = [(name, info) for name, info in items if info["type"] == "file"]

            # Sort
            folders.sort(key=lambda x: x[0].lower())
            files.sort(key=lambda x: x[0].lower())

            # Merge the lists
            sorted_items = folders + files

            for i, (name, info) in enumerate(sorted_items):
                is_last = (i == len(sorted_items) - 1)

                # Choose the connector
                if is_last:
                    current_connector = "└── "
                    next_prefix = prefix + "    "
                else:
                    current_connector = "├── "
                    next_prefix = prefix + "│   "

                if info["type"] == "folder":
                    # Folder
                    lines.append(f"{prefix}{current_connector}📁 {name}/")

                    # Recurse into the children
                    children = info.get("children") or {}
                    if depth == 0:
                        total_entries = count_descendants(info)
                    else:
                        total_entries = None
                    if depth == 0 and total_entries is not None and total_entries > ROOT_FOLDER_CHILD_LIMIT:
                        lines.append(
                            f"{next_prefix}… (该目录包含 {total_entries} 项,已省略以控制 prompt 体积)"
                        )
                    elif children:
                        build_tree_recursive(children, next_prefix, depth + 1)
                else:
                    # File
                    icon = self._get_file_icon(name)
                    size_info = self._format_file_size(info['size'])

                    # Build the file line
                    file_line = f"{prefix}{current_connector}{icon} {name}"

                    # Add size info (simplified)
                    if info['size'] > 1024:  # Only show sizes for files larger than 1KB
                        file_line += f" {size_info}"

                    # Add the annotation
                    if info.get('annotation'):
                        file_line += f" # {info['annotation']}"

                    lines.append(file_line)

        # Build the tree
        build_tree_recursive(structure["tree"])

        # Add statistics
        lines.append("")
        lines.append(f"📊 统计: {structure['total_files']} 个文件, {structure['total_size']/1024/1024:.2f}MB")

        return "\n".join(lines)

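    # Illustrative output of _build_file_tree() (file names invented):
    #
    #   📁 /workspace (项目根)/
    #   ├── 📁 src/
    #   │   └── 🐍 main.py (2.1KB)
    #   └── 📝 README.md
    #
    #   📊 统计: 2 个文件, 0.00MB
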
    def _format_file_size(self, size_bytes: int) -> str:
        """Format a file size"""
        if size_bytes < 1024:
            return f"({size_bytes}B)"
        elif size_bytes < 1024 * 1024:
            return f"({size_bytes/1024:.1f}KB)"
        else:
            return f"({size_bytes/1024/1024:.1f}MB)"

    def _get_file_icon(self, filename: str) -> str:
        """Return an appropriate icon for the file type"""
        ext = filename.split('.')[-1].lower() if '.' in filename else ''

        icon_map = {
            'py': '🐍',  # Python
            'js': '📜',  # JavaScript
            'ts': '📘',  # TypeScript
            'jsx': '⚛️',  # React JSX
            'tsx': '⚛️',  # React TSX
            'java': '☕',  # Java
            'cpp': '⚙️',  # C++
            'c': '⚙️',  # C
            'h': '📎',  # Header files
            'cs': '💷',  # C#
            'go': '🐹',  # Go
            'rs': '🦀',  # Rust
            'rb': '💎',  # Ruby
            'php': '🐘',  # PHP
            'swift': '🦉',  # Swift
            'kt': '🟣',  # Kotlin
            'md': '📝',  # Markdown
            'txt': '📄',  # Text
            'json': '📊',  # JSON
            'yaml': '📋',  # YAML
            'yml': '📋',  # YAML
            'toml': '📋',  # TOML
            'xml': '📰',  # XML
            'html': '🌐',  # HTML
            'css': '🎨',  # CSS
            'scss': '🎨',  # SCSS
            'less': '🎨',  # LESS
            'sql': '🗃️',  # SQL
            'db': '🗄️',  # Database
            'sh': '💻',  # Shell script
            'bash': '💻',  # Bash script
            'bat': '💻',  # Batch file
            'ps1': '💻',  # PowerShell
            'env': '🔧',  # Environment
            'gitignore': '🚫',  # Gitignore
            'dockerfile': '🐳',  # Docker
            'png': '🖼️',  # Image
            'jpg': '🖼️',  # Image
            'jpeg': '🖼️',  # Image
            'gif': '🖼️',  # Image
            'svg': '🖼️',  # Image
            'ico': '🖼️',  # Icon
            'mp4': '🎬',  # Video
            'mp3': '🎵',  # Audio
            'wav': '🎵',  # Audio
            'pdf': '📕',  # PDF
            'doc': '📘',  # Word
            'docx': '📘',  # Word
            'xls': '📗',  # Excel
            'xlsx': '📗',  # Excel
            'ppt': '📙',  # PowerPoint
            'pptx': '📙',  # PowerPoint
            'zip': '📦',  # Archive
            'rar': '📦',  # Archive
            'tar': '📦',  # Archive
            'gz': '📦',  # Archive
            'log': '📋',  # Log file
            'lock': '🔒',  # Lock file
        }

        return icon_map.get(ext, '📄')  # Default file icon

    def check_context_size(self) -> Dict:
        """Check the context size"""
        sizes = {
            "temp_files": sum(len(content) for content in self.temp_files.values()),
            "conversation": sum(len(json.dumps(msg, ensure_ascii=False)) for msg in self.conversation_history),
            "total": 0
        }
        sizes["total"] = sum(sizes.values())

        return {
            "sizes": sizes,
            "is_overflow": sizes["total"] > MAX_CONTEXT_SIZE,
            "usage_percent": (sizes["total"] / MAX_CONTEXT_SIZE) * 100
        }

    def build_messages(self, context: Dict, user_input: str) -> List[Dict]:
        """Build the message list (with terminal-content injection)"""
        # Load the system prompt
        system_prompt = self.load_prompt("main_system")

        # Format the system prompt
        container_path = self.container_mount_path or "/workspace"
        container_cpus = self.container_cpu_limit
        container_memory = self.container_memory_limit
        project_storage = self.project_storage_limit
        system_prompt = system_prompt.format(
            project_path=container_path,
            container_path=container_path,
            container_cpus=container_cpus,
            container_memory=container_memory,
            project_storage=project_storage,
            file_tree=context["project_info"]["file_tree"],
            memory=context["memory"],
            current_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        )

        messages = [
            {"role": "system", "content": system_prompt}
        ]

        # Add the conversation history
        for conv in context["conversation"]:
            if conv["role"] == "assistant":
                message = {
                    "role": conv["role"],
                    "content": conv["content"]
                }
                reasoning = conv.get("reasoning_content")
                if reasoning:
                    message["reasoning_content"] = reasoning
                if "tool_calls" in conv and conv["tool_calls"]:
                    message["tool_calls"] = conv["tool_calls"]
                messages.append(message)
            elif conv["role"] == "tool":
                message = {
                    "role": "tool",
                    "content": conv["content"],
                    "tool_call_id": conv.get("tool_call_id", ""),
                    "name": conv.get("name", "")
                }
                messages.append(message)
            else:
                messages.append({
                    "role": conv["role"],
                    "content": conv["content"]
                })

        # Add focused-file contents
        if self._focused_files:
            focused_content = "\n\n=== 🔍 正在聚焦的文件 ===\n"
            focused_content += f"(共 {len(self._focused_files)} 个文件处于聚焦状态)\n"

            for path, content in self._focused_files.items():
                size_kb = len(content) / 1024
                focused_content += f"\n--- 文件: {path} ({size_kb:.1f}KB) ---\n"
                focused_content += f"```\n{content}\n```\n"

            focused_content += "\n=== 聚焦文件结束 ===\n"
            messages.append({
                "role": "system",
                "content": focused_content
            })

        # Add terminal content (if any)
        # This would need to be passed in as a parameter or fetched here

        return messages

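# Minimal end-to-end usage sketch (illustrative; the project path and the
# message contents are made up):
#
#   from utils.context_manager import ContextManager
#
#   ctx = ContextManager("/path/to/project")
#   ctx.start_new_conversation()
#   ctx.add_conversation(role="user", content="...")
#   context = ctx.build_main_context(memory_content="")
#   messages = ctx.build_messages(context, user_input="...")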