agent-Specialization/utils/conversation_manager.py

937 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# utils/conversation_manager.py - Conversation persistence manager with integrated token statistics
import json
import os
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
try:
from config import DATA_DIR
except ImportError:
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import DATA_DIR
@dataclass
class ConversationMetadata:
    """Summary record for one stored conversation, as mirrored into the index."""
    # Conversation identifier and display title.
    id: str
    title: str
    # Creation / last-update timestamps, stored as strings.
    created_at: str
    updated_at: str
    # Project location: absolute form and, when available, a relative form.
    project_path: Optional[str]
    project_relative_path: Optional[str]
    thinking_mode: bool
    # Counters for messages and tool invocations.
    total_messages: int
    total_tools: int
    run_mode: str = "fast"
    model_key: Optional[str] = None
    has_images: bool = False
    status: str = "active" # active, archived, error
class ConversationManager:
"""对话持久化管理器"""
def __init__(self, base_dir: Optional[str] = None):
self.base_dir = Path(base_dir).expanduser().resolve() if base_dir else Path(DATA_DIR).resolve()
self.conversations_dir = self.base_dir / "conversations"
self.index_file = self.conversations_dir / "index.json"
self.current_conversation_id: Optional[str] = None
self.workspace_root = Path(__file__).resolve().parents[1]
self._ensure_directories()
self._index_verified = False
self._load_index(ensure_integrity=True)
def _ensure_directories(self):
"""确保必要的目录存在"""
self.base_dir.mkdir(parents=True, exist_ok=True)
self.conversations_dir.mkdir(parents=True, exist_ok=True)
# 如果索引文件不存在,创建空索引
if not self.index_file.exists():
self._save_index({})
def _iter_conversation_files(self):
"""遍历对话文件(排除索引文件)"""
for path in self.conversations_dir.glob("*.json"):
if path == self.index_file:
continue
yield path
def _rebuild_index_from_files(self) -> Dict:
"""从现有对话文件重建索引"""
rebuilt_index: Dict[str, Dict] = {}
for file_path in self._iter_conversation_files():
try:
with open(file_path, "r", encoding="utf-8") as f:
raw = f.read().strip()
if not raw:
continue
data = json.loads(raw)
except Exception as exc:
print(f"⚠️ 重建索引时跳过 {file_path.name}: {exc}")
continue
conv_id = data.get("id") or file_path.stem
metadata = data.get("metadata", {}) or {}
rebuilt_index[conv_id] = {
"title": data.get("title") or "未命名对话",
"created_at": data.get("created_at"),
"updated_at": data.get("updated_at"),
"project_path": metadata.get("project_path"),
"project_relative_path": metadata.get("project_relative_path"),
"thinking_mode": metadata.get("thinking_mode", False),
"run_mode": metadata.get("run_mode") or ("thinking" if metadata.get("thinking_mode") else "fast"),
"model_key": metadata.get("model_key"),
"has_images": metadata.get("has_images", False),
"total_messages": metadata.get("total_messages", 0),
"total_tools": metadata.get("total_tools", 0),
"status": metadata.get("status", "active"),
}
if rebuilt_index:
print(f"🔄 已从对话文件重建索引,共 {len(rebuilt_index)} 条记录")
return rebuilt_index
def _index_missing_conversations(self, index: Dict) -> bool:
"""检查索引是否缺失本地对话文件"""
index_ids = set(index.keys())
for file_path in self._iter_conversation_files():
conv_id = file_path.stem
if conv_id and conv_id not in index_ids:
print(f"🔍 对话 {conv_id} 未出现在索引中,将重建索引。")
return True
return False
def _load_index(self, ensure_integrity: bool = False) -> Dict:
"""加载对话索引,可选地在缺失时自动重建"""
try:
index: Dict = {}
if self.index_file.exists():
with open(self.index_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
if content:
index = json.loads(content)
if index:
if ensure_integrity and not self._index_verified:
if self._index_missing_conversations(index):
rebuilt = self._rebuild_index_from_files()
if rebuilt:
self._save_index(rebuilt)
index = rebuilt
self._index_verified = True
return index
# 索引为空但对话文件仍然存在时尝试重建
rebuilt = self._rebuild_index_from_files()
if rebuilt:
self._save_index(rebuilt)
if ensure_integrity:
self._index_verified = True
return rebuilt
return {}
# 索引缺失但存在对话文件时重建
rebuilt = self._rebuild_index_from_files()
if rebuilt:
self._save_index(rebuilt)
if ensure_integrity:
self._index_verified = True
return rebuilt
return index
except (json.JSONDecodeError, Exception) as e:
print(f"⚠️ 加载对话索引失败,将尝试重建: {e}")
backup_path = self.index_file.with_name(
f"{self.index_file.stem}_corrupt_{int(time.time())}{self.index_file.suffix}"
)
try:
if self.index_file.exists():
self.index_file.replace(backup_path)
print(f"🗄️ 已备份损坏的索引文件到: {backup_path.name}")
except Exception as backup_exc:
print(f"⚠️ 备份损坏索引文件失败: {backup_exc}")
rebuilt = self._rebuild_index_from_files()
if rebuilt:
self._save_index(rebuilt)
if ensure_integrity:
self._index_verified = True
return rebuilt
return {}
def _save_index(self, index: Dict):
"""保存对话索引"""
temp_file = self.index_file.with_suffix(self.index_file.suffix + ".tmp")
try:
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(index, f, ensure_ascii=False, indent=2)
os.replace(temp_file, self.index_file)
except Exception as e:
try:
if temp_file.exists():
temp_file.unlink()
except Exception:
pass
print(f"⌘ 保存对话索引失败: {e}")
def _generate_conversation_id(self) -> str:
"""生成唯一的对话ID"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# 添加毫秒确保唯一性
ms = int(time.time() * 1000) % 1000
return f"conv_{timestamp}_{ms:03d}"
def _get_conversation_file_path(self, conversation_id: str) -> Path:
"""获取对话文件路径"""
return self.conversations_dir / f"{conversation_id}.json"
def _extract_title_from_messages(self, messages: List[Dict]) -> str:
"""从消息中提取标题"""
# 找到第一个用户消息作为标题
for msg in messages:
if msg.get("role") == "user":
content = msg.get("content", "").strip()
if content:
# 取前50个字符作为标题
title = content[:50]
if len(content) > 50:
title += "..."
return title
return "新对话"
def _count_tools_in_messages(self, messages: List[Dict]) -> int:
"""统计消息中的工具调用数量"""
tool_count = 0
for msg in messages:
if msg.get("role") == "assistant" and "tool_calls" in msg:
tool_calls = msg.get("tool_calls", [])
tool_count += len(tool_calls) if isinstance(tool_calls, list) else 0
elif msg.get("role") == "tool":
tool_count += 1
return tool_count
def _prepare_project_path_metadata(self, project_path: Optional[str]) -> Dict[str, Optional[str]]:
"""
将项目路径规范化为绝对/相对形式,便于在不同机器间迁移
"""
normalized = {
"project_path": None,
"project_relative_path": None
}
if not project_path:
return normalized
try:
absolute_path = Path(project_path).expanduser().resolve()
normalized["project_path"] = str(absolute_path)
try:
relative_path = absolute_path.relative_to(self.workspace_root)
normalized["project_relative_path"] = relative_path.as_posix()
except ValueError:
normalized["project_relative_path"] = None
except Exception:
# 回退为原始字符串,至少不会阻止对话保存
normalized["project_path"] = str(project_path)
normalized["project_relative_path"] = None
return normalized
def _initialize_token_statistics(self) -> Dict:
"""初始化Token统计结构"""
now = datetime.now().isoformat()
return {
"total_input_tokens": 0,
"total_output_tokens": 0,
"total_tokens": 0,
"current_context_tokens": 0,
"updated_at": now
}
def _validate_token_statistics(self, data: Dict) -> Dict:
"""验证并修复Token统计数据"""
token_stats = data.get("token_statistics", {})
# 确保必要字段存在
defaults = self._initialize_token_statistics()
for key, default_value in defaults.items():
if key not in token_stats:
token_stats[key] = default_value
# 确保数值类型正确
try:
token_stats["total_input_tokens"] = int(token_stats.get("total_input_tokens", 0))
token_stats["total_output_tokens"] = int(token_stats.get("total_output_tokens", 0))
token_stats["total_tokens"] = int(token_stats.get("total_tokens", 0))
token_stats["current_context_tokens"] = int(token_stats.get("current_context_tokens", 0))
except (ValueError, TypeError):
print("⚠️ Token统计数据损坏重置为0")
token_stats = defaults
data["token_statistics"] = token_stats
return data
def create_conversation(
self,
project_path: str,
thinking_mode: bool = False,
run_mode: str = "fast",
initial_messages: List[Dict] = None,
model_key: Optional[str] = None,
has_images: bool = False
) -> str:
"""
创建新对话
Args:
project_path: 项目路径
thinking_mode: 思考模式
initial_messages: 初始消息列表
Returns:
conversation_id: 对话ID
"""
conversation_id = self._generate_conversation_id()
messages = initial_messages or []
# 创建对话数据
path_metadata = self._prepare_project_path_metadata(project_path)
normalized_mode = run_mode if run_mode in {"fast", "thinking", "deep"} else ("thinking" if thinking_mode else "fast")
conversation_data = {
"id": conversation_id,
"title": self._extract_title_from_messages(messages),
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"messages": messages,
"todo_list": None,
"metadata": {
"project_path": path_metadata["project_path"],
"project_relative_path": path_metadata["project_relative_path"],
"thinking_mode": thinking_mode,
"run_mode": normalized_mode,
"model_key": model_key,
"has_images": has_images,
"total_messages": len(messages),
"total_tools": self._count_tools_in_messages(messages),
"status": "active"
},
"token_statistics": self._initialize_token_statistics() # 新增
}
# 保存对话文件
self._save_conversation_file(conversation_id, conversation_data)
# 更新索引
self._update_index(conversation_id, conversation_data)
self.current_conversation_id = conversation_id
print(f"📝 创建新对话: {conversation_id} - {conversation_data['title']}")
return conversation_id
def update_conversation_title(self, conversation_id: str, title: str) -> bool:
"""更新对话标题并刷新索引。"""
if not conversation_id or not title:
return False
try:
data = self.load_conversation(conversation_id)
if not data:
return False
data["title"] = title
data["updated_at"] = datetime.now().isoformat()
meta = data.get("metadata", {}) or {}
meta["title_locked"] = True
data["metadata"] = meta
self._save_conversation_file(conversation_id, data)
self._update_index(conversation_id, data)
if self.current_conversation_id == conversation_id:
self.current_conversation_title = title
return True
except Exception as exc:
print(f"⚠️ 更新对话标题失败 {conversation_id}: {exc}")
return False
def _save_conversation_file(self, conversation_id: str, data: Dict):
"""保存对话文件"""
file_path = self._get_conversation_file_path(conversation_id)
temp_file = file_path.with_suffix(file_path.suffix + ".tmp")
try:
# 确保Token统计数据有效
data = self._validate_token_statistics(data)
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(temp_file, file_path)
except Exception as e:
try:
if temp_file.exists():
temp_file.unlink()
except Exception:
pass
print(f"⌘ 保存对话文件失败 {conversation_id}: {e}")
def _update_index(self, conversation_id: str, conversation_data: Dict):
    """Insert or refresh the index entry for one conversation.

    The entry is a summary of ``conversation_data`` (title, timestamps and
    selected metadata). Errors, including missing required keys, are
    reported on stdout and leave the index untouched.
    """
    try:
        index = self._load_index()
        title = conversation_data["title"]
        created_at = conversation_data["created_at"]
        updated_at = conversation_data["updated_at"]
        meta = conversation_data["metadata"]
        project_path = meta["project_path"]
        thinking_mode = meta["thinking_mode"]
        record = ConversationMetadata(
            id=conversation_id,
            title=title,
            created_at=created_at,
            updated_at=updated_at,
            project_path=project_path,
            project_relative_path=meta.get("project_relative_path"),
            thinking_mode=thinking_mode,
            # Older data has no run_mode; derive it from thinking_mode.
            run_mode=meta.get("run_mode", "thinking" if thinking_mode else "fast"),
            total_messages=meta["total_messages"],
            total_tools=meta["total_tools"],
            status=meta.get("status", "active"),
            model_key=meta.get("model_key"),
            has_images=meta.get("has_images", False),
        )
        index[conversation_id] = {
            "title": record.title,
            "created_at": record.created_at,
            "updated_at": record.updated_at,
            "project_path": record.project_path,
            "project_relative_path": record.project_relative_path,
            "thinking_mode": record.thinking_mode,
            "run_mode": record.run_mode,
            "model_key": record.model_key,
            "has_images": record.has_images,
            "total_messages": record.total_messages,
            "total_tools": record.total_tools,
            "status": record.status
        }
        self._save_index(index)
    except Exception as e:
        print(f"⌘ 更新对话索引失败: {e}")
def save_conversation(
self,
conversation_id: str,
messages: List[Dict],
project_path: str = None,
thinking_mode: bool = None,
run_mode: Optional[str] = None,
todo_list: Optional[Dict] = None,
model_key: Optional[str] = None,
has_images: Optional[bool] = None
) -> bool:
"""
保存对话(更新现有对话)
Args:
conversation_id: 对话ID
messages: 消息列表
project_path: 项目路径
thinking_mode: 思考模式
Returns:
bool: 保存是否成功
"""
try:
# 加载现有对话数据
existing_data = self.load_conversation(conversation_id)
if not existing_data:
print(f"⚠️ 对话 {conversation_id} 不存在,无法更新")
return False
# 更新数据
existing_data["messages"] = messages
existing_data["updated_at"] = datetime.now().isoformat()
# 更新标题(如未锁定自动标题时,仍按首条消息回退)
title_locked = existing_data.get("metadata", {}).get("title_locked", False)
if not title_locked:
new_title = self._extract_title_from_messages(messages)
if new_title != "新对话":
existing_data["title"] = new_title
# 更新元数据
if project_path is not None:
path_metadata = self._prepare_project_path_metadata(project_path)
existing_data["metadata"]["project_path"] = path_metadata["project_path"]
existing_data["metadata"]["project_relative_path"] = path_metadata["project_relative_path"]
else:
existing_data["metadata"].setdefault("project_relative_path", None)
if thinking_mode is not None:
existing_data["metadata"]["thinking_mode"] = thinking_mode
if run_mode:
normalized_mode = run_mode if run_mode in {"fast", "thinking", "deep"} else (
"thinking" if existing_data["metadata"].get("thinking_mode") else "fast"
)
existing_data["metadata"]["run_mode"] = normalized_mode
elif "run_mode" not in existing_data["metadata"]:
existing_data["metadata"]["run_mode"] = "thinking" if existing_data["metadata"].get("thinking_mode") else "fast"
# 推断最新使用的模型(优先参数,其次倒序扫描助手消息)
inferred_model = None
if model_key is None:
for msg in reversed(messages):
if msg.get("role") != "assistant":
continue
msg_meta = msg.get("metadata") or {}
mk = msg_meta.get("model_key")
if mk:
inferred_model = mk
break
target_model = model_key if model_key is not None else inferred_model
if target_model is not None:
existing_data["metadata"]["model_key"] = target_model
elif "model_key" not in existing_data["metadata"]:
existing_data["metadata"]["model_key"] = None
if has_images is not None:
existing_data["metadata"]["has_images"] = bool(has_images)
elif "has_images" not in existing_data["metadata"]:
existing_data["metadata"]["has_images"] = False
existing_data["metadata"]["total_messages"] = len(messages)
existing_data["metadata"]["total_tools"] = self._count_tools_in_messages(messages)
# 更新待办列表
existing_data["todo_list"] = todo_list
# 确保Token统计结构存在向后兼容
if "token_statistics" not in existing_data:
existing_data["token_statistics"] = self._initialize_token_statistics()
else:
existing_data["token_statistics"]["updated_at"] = datetime.now().isoformat()
# 保存文件
self._save_conversation_file(conversation_id, existing_data)
# 更新索引
self._update_index(conversation_id, existing_data)
return True
except Exception as e:
print(f"⌘ 保存对话失败 {conversation_id}: {e}")
return False
def load_conversation(self, conversation_id: str) -> Optional[Dict]:
"""
加载对话数据
Args:
conversation_id: 对话ID
Returns:
Dict: 对话数据如果不存在返回None
"""
try:
file_path = self._get_conversation_file_path(conversation_id)
if not file_path.exists():
return None
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
if not content:
return None
data = json.loads(content)
metadata = data.get("metadata", {})
if "project_relative_path" not in metadata:
metadata["project_relative_path"] = None
self._save_conversation_file(conversation_id, data)
print(f"🔧 为对话 {conversation_id} 添加相对路径字段")
# 向后兼容确保Token统计结构存在
if "token_statistics" not in data:
data["token_statistics"] = self._initialize_token_statistics()
# 自动保存修复后的数据
self._save_conversation_file(conversation_id, data)
print(f"🔧 为对话 {conversation_id} 添加Token统计结构")
else:
# 验证现有Token统计数据
data = self._validate_token_statistics(data)
if "run_mode" not in metadata:
metadata["run_mode"] = "thinking" if metadata.get("thinking_mode") else "fast"
self._save_conversation_file(conversation_id, data)
print(f"🔧 为对话 {conversation_id} 添加运行模式字段")
# 回填缺失的模型字段:从最近的助手消息元数据推断
if metadata.get("model_key") is None:
inferred_model = None
for msg in reversed(data.get("messages") or []):
if msg.get("role") != "assistant":
continue
mk = (msg.get("metadata") or {}).get("model_key")
if mk:
inferred_model = mk
break
if inferred_model is not None:
metadata["model_key"] = inferred_model
self._save_conversation_file(conversation_id, data)
print(f"🔧 为对话 {conversation_id} 回填模型字段: {inferred_model}")
return data
except (json.JSONDecodeError, Exception) as e:
print(f"⌘ 加载对话失败 {conversation_id}: {e}")
return None
def update_token_statistics(
self,
conversation_id: str,
input_tokens: int,
output_tokens: int,
total_tokens: int
) -> bool:
"""
更新对话的Token统计
Args:
conversation_id: 对话ID
input_tokens: 输入Token数量
output_tokens: 输出Token数量
total_tokens: 本次请求的总Token数量prompt+completion
Returns:
bool: 更新是否成功
"""
try:
conversation_data = self.load_conversation(conversation_id)
if not conversation_data:
print(f"⚠️ 无法找到对话 {conversation_id}跳过Token统计")
return False
# 确保Token统计结构存在
if "token_statistics" not in conversation_data:
conversation_data["token_statistics"] = self._initialize_token_statistics()
# 更新统计数据
token_stats = conversation_data["token_statistics"]
token_stats["total_input_tokens"] = token_stats.get("total_input_tokens", 0) + input_tokens
token_stats["total_output_tokens"] = token_stats.get("total_output_tokens", 0) + output_tokens
token_stats["total_tokens"] = token_stats.get("total_tokens", 0) + total_tokens
token_stats["current_context_tokens"] = total_tokens
token_stats["updated_at"] = datetime.now().isoformat()
# 保存更新
self._save_conversation_file(conversation_id, conversation_data)
print(f"📊 Token统计已更新: +{input_tokens}输入, +{output_tokens}输出 "
f"(总计: {token_stats['total_input_tokens']}输入, {token_stats['total_output_tokens']}输出)")
return True
except Exception as e:
print(f"⌘ 更新Token统计失败 {conversation_id}: {e}")
return False
def get_token_statistics(self, conversation_id: str) -> Optional[Dict]:
"""
获取对话的Token统计
Args:
conversation_id: 对话ID
Returns:
Dict: Token统计数据
"""
try:
conversation_data = self.load_conversation(conversation_id)
if not conversation_data:
return None
validated = self._validate_token_statistics(conversation_data)
token_stats = validated.get("token_statistics", {})
result = {
"total_input_tokens": token_stats.get("total_input_tokens", 0),
"total_output_tokens": token_stats.get("total_output_tokens", 0),
"total_tokens": token_stats.get("total_tokens", 0),
"current_context_tokens": token_stats.get("current_context_tokens", 0),
"updated_at": token_stats.get("updated_at"),
"conversation_id": conversation_id
}
return result
except Exception as e:
print(f"⌘ 获取Token统计失败 {conversation_id}: {e}")
return None
def get_current_context_tokens(self, conversation_id: str) -> int:
"""获取最近一次请求的上下文token"""
stats = self.get_token_statistics(conversation_id)
if not stats:
return 0
return stats.get("current_context_tokens", 0)
def get_conversation_list(self, limit: int = 50, offset: int = 0) -> Dict:
"""
获取对话列表
Args:
limit: 限制数量
offset: 偏移量
Returns:
Dict: 包含对话列表和统计信息
"""
try:
index = self._load_index()
# 按更新时间倒序排列
sorted_conversations = sorted(
index.items(),
key=lambda x: x[1].get("updated_at", ""),
reverse=True
)
# 分页
total = len(sorted_conversations)
conversations = sorted_conversations[offset:offset+limit]
# 格式化结果
result = []
for conv_id, metadata in conversations:
result.append({
"id": conv_id,
"title": metadata.get("title", "未命名对话"),
"created_at": metadata.get("created_at"),
"updated_at": metadata.get("updated_at"),
"project_path": metadata.get("project_path"),
"project_relative_path": metadata.get("project_relative_path"),
"thinking_mode": metadata.get("thinking_mode", False),
"total_messages": metadata.get("total_messages", 0),
"total_tools": metadata.get("total_tools", 0),
"status": metadata.get("status", "active")
})
return {
"conversations": result,
"total": total,
"limit": limit,
"offset": offset,
"has_more": offset + limit < total
}
except Exception as e:
print(f"⌘ 获取对话列表失败: {e}")
return {
"conversations": [],
"total": 0,
"limit": limit,
"offset": offset,
"has_more": False
}
def delete_conversation(self, conversation_id: str) -> bool:
"""
删除对话
Args:
conversation_id: 对话ID
Returns:
bool: 删除是否成功
"""
try:
# 删除对话文件
file_path = self._get_conversation_file_path(conversation_id)
if file_path.exists():
file_path.unlink()
# 从索引中删除
index = self._load_index()
if conversation_id in index:
del index[conversation_id]
self._save_index(index)
# 如果删除的是当前对话清除当前对话ID
if self.current_conversation_id == conversation_id:
self.current_conversation_id = None
print(f"🗑️ 已删除对话: {conversation_id}")
return True
except Exception as e:
print(f"⌘ 删除对话失败 {conversation_id}: {e}")
return False
def archive_conversation(self, conversation_id: str) -> bool:
"""
归档对话(标记为已归档,不删除)
Args:
conversation_id: 对话ID
Returns:
bool: 归档是否成功
"""
try:
# 更新对话状态
conversation_data = self.load_conversation(conversation_id)
if not conversation_data:
return False
conversation_data["metadata"]["status"] = "archived"
conversation_data["updated_at"] = datetime.now().isoformat()
# 保存更新
self._save_conversation_file(conversation_id, conversation_data)
self._update_index(conversation_id, conversation_data)
print(f"📦 已归档对话: {conversation_id}")
return True
except Exception as e:
print(f"⌘ 归档对话失败 {conversation_id}: {e}")
return False
def search_conversations(self, query: str, limit: int = 20) -> List[Dict]:
"""
搜索对话
Args:
query: 搜索关键词
limit: 限制数量
Returns:
List[Dict]: 匹配的对话列表
"""
try:
index = self._load_index()
results = []
query_lower = query.lower()
for conv_id, metadata in index.items():
# 搜索标题
title = metadata.get("title", "").lower()
if query_lower in title:
score = 100 # 标题匹配权重最高
results.append((score, {
"id": conv_id,
"title": metadata.get("title"),
"created_at": metadata.get("created_at"),
"updated_at": metadata.get("updated_at"),
"project_path": metadata.get("project_path"),
"match_type": "title"
}))
continue
# 搜索项目路径
project_path = metadata.get("project_path", "").lower()
if query_lower in project_path:
results.append((50, {
"id": conv_id,
"title": metadata.get("title"),
"created_at": metadata.get("created_at"),
"updated_at": metadata.get("updated_at"),
"project_path": metadata.get("project_path"),
"match_type": "project_path"
}))
# 按分数排序
results.sort(key=lambda x: x[0], reverse=True)
# 返回前N个结果
return [result[1] for result in results[:limit]]
except Exception as e:
print(f"⌘ 搜索对话失败: {e}")
return []
def cleanup_old_conversations(self, days: int = 30) -> int:
"""
清理旧对话(可选功能)
Args:
days: 保留天数
Returns:
int: 清理的对话数量
"""
try:
from datetime import datetime, timedelta
cutoff_date = datetime.now() - timedelta(days=days)
cutoff_iso = cutoff_date.isoformat()
index = self._load_index()
to_delete = []
for conv_id, metadata in index.items():
updated_at = metadata.get("updated_at", "")
if updated_at < cutoff_iso and metadata.get("status") != "archived":
to_delete.append(conv_id)
deleted_count = 0
for conv_id in to_delete:
if self.delete_conversation(conv_id):
deleted_count += 1
if deleted_count > 0:
print(f"🧹 清理了 {deleted_count} 个旧对话")
return deleted_count
except Exception as e:
print(f"⌘ 清理旧对话失败: {e}")
return 0
def get_statistics(self) -> Dict:
"""
获取对话统计信息
Returns:
Dict: 统计信息
"""
try:
index = self._load_index()
total_conversations = len(index)
total_messages = sum(meta.get("total_messages", 0) for meta in index.values())
total_tools = sum(meta.get("total_tools", 0) for meta in index.values())
# 按状态分类
status_count = {}
for metadata in index.values():
status = metadata.get("status", "active")
status_count[status] = status_count.get(status, 0) + 1
# 按思考模式分类
thinking_mode_count = {
"thinking": sum(1 for meta in index.values() if meta.get("thinking_mode")),
"fast": sum(1 for meta in index.values() if not meta.get("thinking_mode"))
}
# 新增Token统计汇总
total_input_tokens = 0
total_output_tokens = 0
token_stats_count = 0
for conv_id in index.keys():
token_stats = self.get_token_statistics(conv_id)
if token_stats:
total_input_tokens += token_stats.get("total_input_tokens", 0)
total_output_tokens += token_stats.get("total_output_tokens", 0)
token_stats_count += 1
return {
"total_conversations": total_conversations,
"total_messages": total_messages,
"total_tools": total_tools,
"status_distribution": status_count,
"thinking_mode_distribution": thinking_mode_count,
"token_statistics": {
"total_input_tokens": total_input_tokens,
"total_output_tokens": total_output_tokens,
"total_tokens": total_input_tokens + total_output_tokens,
"conversations_with_stats": token_stats_count
}
}
except Exception as e:
print(f"⌘ 获取统计信息失败: {e}")
return {}
def get_current_conversation_id(self) -> Optional[str]:
"""获取当前对话ID"""
return self.current_conversation_id
def set_current_conversation_id(self, conversation_id: str):
"""设置当前对话ID"""
self.current_conversation_id = conversation_id