from __future__ import annotations import sys, os PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) if PROJECT_ROOT not in sys.path: sys.path.insert(0, PROJECT_ROOT) import asyncio, json, time, re, os from datetime import datetime, timedelta from pathlib import Path from collections import defaultdict, Counter, deque from typing import Dict, Any, Optional, List, Tuple from flask import Blueprint, request, jsonify, session from werkzeug.utils import secure_filename import zipfile from config import ( OUTPUT_FORMATS, AUTO_FIX_TOOL_CALL, AUTO_FIX_MAX_ATTEMPTS, MAX_ITERATIONS_PER_TASK, MAX_CONSECUTIVE_SAME_TOOL, MAX_TOTAL_TOOL_CALLS, TOOL_CALL_COOLDOWN, MAX_UPLOAD_SIZE, DEFAULT_CONVERSATIONS_LIMIT, MAX_CONVERSATIONS_LIMIT, CONVERSATIONS_DIR, DEFAULT_RESPONSE_MAX_TOKENS, DEFAULT_PROJECT_PATH, LOGS_DIR, AGENT_VERSION, THINKING_FAST_INTERVAL, PROJECT_MAX_STORAGE_MB, PROJECT_MAX_STORAGE_BYTES, UPLOAD_SCAN_LOG_SUBDIR, ) from modules.personalization_manager import ( load_personalization_config, save_personalization_config, THINKING_INTERVAL_MIN, THINKING_INTERVAL_MAX, ) from modules.upload_security import UploadSecurityError from modules.user_manager import UserWorkspace from modules.usage_tracker import QUOTA_DEFAULTS from core.web_terminal import WebTerminal from utils.tool_result_formatter import format_tool_result_for_context from utils.conversation_manager import ConversationManager from utils.api_client import DeepSeekClient from .auth_helpers import api_login_required, resolve_admin_policy, get_current_user_record, get_current_username from .context import with_terminal, get_gui_manager, get_upload_guard, build_upload_error_response, ensure_conversation_loaded, reset_system_state, get_user_resources, get_or_create_usage_tracker from .utils_common import ( build_review_lines, debug_log, _sanitize_filename_component, log_backend_chunk, log_frontend_chunk, log_streaming_debug_entry, brief_log, DEBUG_LOG_FILE, CHUNK_BACKEND_LOG_FILE, CHUNK_FRONTEND_LOG_FILE, STREAMING_DEBUG_LOG_FILE, ) from .extensions import socketio from .state import ( RECENT_UPLOAD_EVENT_LIMIT, RECENT_UPLOAD_FEED_LIMIT, user_manager, container_manager, get_last_active_ts, ) from .conversation_stats import ( build_admin_dashboard_snapshot, compute_workspace_storage, collect_user_token_statistics, collect_upload_events, summarize_upload_events, ) conversation_bp = Blueprint('conversation', __name__) # === 背景生成对话标题(从 app_legacy 拆分) === @conversation_bp.route('/api/conversations', methods=['GET']) @api_login_required @with_terminal def get_conversations(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话列表""" try: # 获取查询参数 limit = request.args.get('limit', 20, type=int) offset = request.args.get('offset', 0, type=int) # 限制参数范围 limit = max(1, min(limit, 10000)) # 限制在1-10000之间 offset = max(0, offset) result = terminal.get_conversations_list(limit=limit, offset=offset) if result["success"]: return jsonify({ "success": True, "data": result["data"] }) else: return jsonify({ "success": False, "error": result.get("error", "Unknown error"), "message": result.get("message", "获取对话列表失败") }), 500 except Exception as e: print(f"[API] 获取对话列表错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话列表时发生异常" }), 500 @conversation_bp.route('/api/conversations', methods=['POST']) @api_login_required @with_terminal def create_conversation(terminal: WebTerminal, workspace: UserWorkspace, username: str): """创建新对话""" try: data = request.get_json() or {} # 前端现在期望“新建对话”回到用户配置的默认模型/模式, # 只有当客户端显式要求保留当前模式时才使用传入值。 preserve_mode = bool(data.get('preserve_mode')) thinking_mode = data.get('thinking_mode') if preserve_mode and 'thinking_mode' in data else None run_mode = data.get('mode') if preserve_mode and 'mode' in data else None result = terminal.create_new_conversation(thinking_mode=thinking_mode, run_mode=run_mode) if result["success"]: session['run_mode'] = terminal.run_mode session['thinking_mode'] = terminal.thinking_mode # 广播对话列表更新事件 socketio.emit('conversation_list_update', { 'action': 'created', 'conversation_id': result["conversation_id"] }, room=f"user_{username}") # 广播当前对话切换事件 socketio.emit('conversation_changed', { 'conversation_id': result["conversation_id"], 'title': "新对话" }, room=f"user_{username}") return jsonify(result), 201 else: return jsonify(result), 500 except Exception as e: print(f"[API] 创建对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "创建对话时发生异常" }), 500 @conversation_bp.route('/api/conversations/', methods=['GET']) @api_login_required @with_terminal def get_conversation_info(terminal: WebTerminal, workspace: UserWorkspace, username: str, conversation_id): """获取特定对话信息""" try: # 通过ConversationManager直接获取对话数据 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if conversation_data: # 提取关键信息,不返回完整消息内容(避免数据量过大) info = { "id": conversation_data["id"], "title": conversation_data["title"], "created_at": conversation_data["created_at"], "updated_at": conversation_data["updated_at"], "metadata": conversation_data["metadata"], "messages_count": len(conversation_data.get("messages", [])) } return jsonify({ "success": True, "data": info }) else: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 except Exception as e: print(f"[API] 获取对话信息错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话信息时发生异常" }), 500 @conversation_bp.route('/api/conversations//load', methods=['PUT']) @api_login_required @with_terminal def load_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """加载特定对话""" try: result = terminal.load_conversation(conversation_id) if result["success"]: # 广播对话切换事件 socketio.emit('conversation_changed', { 'conversation_id': conversation_id, 'title': result.get("title", "未知对话"), 'messages_count': result.get("messages_count", 0) }, room=f"user_{username}") # 广播系统状态更新(因为当前对话改变了) status = terminal.get_status() socketio.emit('status_update', status, room=f"user_{username}") # 清理和重置相关UI状态 socketio.emit('conversation_loaded', { 'conversation_id': conversation_id, 'clear_ui': True # 提示前端清理当前UI状态 }, room=f"user_{username}") return jsonify(result) else: return jsonify(result), 404 if "不存在" in result.get("message", "") else 500 except Exception as e: print(f"[API] 加载对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "加载对话时发生异常" }), 500 @conversation_bp.route('/api/conversations/', methods=['DELETE']) @api_login_required @with_terminal def delete_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """删除特定对话""" try: # 检查是否是当前对话 is_current = (terminal.context_manager.current_conversation_id == conversation_id) result = terminal.delete_conversation(conversation_id) if result["success"]: # 广播对话列表更新事件 socketio.emit('conversation_list_update', { 'action': 'deleted', 'conversation_id': conversation_id }, room=f"user_{username}") # 如果删除的是当前对话,广播对话清空事件 if is_current: socketio.emit('conversation_changed', { 'conversation_id': None, 'title': None, 'cleared': True }, room=f"user_{username}") # 更新系统状态 status = terminal.get_status() socketio.emit('status_update', status, room=f"user_{username}") return jsonify(result) else: return jsonify(result), 404 if "不存在" in result.get("message", "") else 500 except Exception as e: print(f"[API] 删除对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "删除对话时发生异常" }), 500 @conversation_bp.route('/api/conversations/search', methods=['GET']) @api_login_required @with_terminal def search_conversations(terminal: WebTerminal, workspace: UserWorkspace, username: str): """搜索对话""" try: query = request.args.get('q', '').strip() limit = request.args.get('limit', 20, type=int) if not query: return jsonify({ "success": False, "error": "Missing query parameter", "message": "请提供搜索关键词" }), 400 # 限制参数范围 limit = max(1, min(limit, 50)) result = terminal.search_conversations(query, limit) return jsonify({ "success": True, "data": { "results": result["results"], "count": result["count"], "query": query } }) except Exception as e: print(f"[API] 搜索对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "搜索对话时发生异常" }), 500 @conversation_bp.route('/api/conversations//messages', methods=['GET']) @api_login_required @with_terminal def get_conversation_messages(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话的消息历史(可选功能,用于调试或详细查看)""" try: # 获取完整对话数据 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if conversation_data: messages = conversation_data.get("messages", []) # 可选:限制消息数量,避免返回过多数据 limit = request.args.get('limit', type=int) if limit: messages = messages[-limit:] # 获取最后N条消息 return jsonify({ "success": True, "data": { "conversation_id": conversation_id, "messages": messages, "total_count": len(conversation_data.get("messages", [])) } }) else: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 except Exception as e: print(f"[API] 获取对话消息错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话消息时发生异常" }), 500 @conversation_bp.route('/api/conversations//compress', methods=['POST']) @api_login_required @with_terminal def compress_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """压缩指定对话的大体积消息,生成压缩版新对话""" try: policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_compress_conversation"): return jsonify({"success": False, "error": "压缩对话已被管理员禁用"}), 403 normalized_id = conversation_id if conversation_id.startswith('conv_') else f"conv_{conversation_id}" result = terminal.context_manager.compress_conversation(normalized_id) if not result.get("success"): status_code = 404 if "不存在" in result.get("error", "") else 400 return jsonify(result), status_code new_conversation_id = result["compressed_conversation_id"] load_result = terminal.load_conversation(new_conversation_id) if load_result.get("success"): socketio.emit('conversation_list_update', { 'action': 'compressed', 'conversation_id': new_conversation_id }, room=f"user_{username}") socketio.emit('conversation_changed', { 'conversation_id': new_conversation_id, 'title': load_result.get('title', '压缩后的对话'), 'messages_count': load_result.get('messages_count', 0) }, room=f"user_{username}") socketio.emit('conversation_loaded', { 'conversation_id': new_conversation_id, 'clear_ui': True }, room=f"user_{username}") response_payload = { "success": True, "compressed_conversation_id": new_conversation_id, "compressed_types": result.get("compressed_types", []), "system_message": result.get("system_message"), "load_result": load_result } return jsonify(response_payload) except Exception as e: print(f"[API] 压缩对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "压缩对话时发生异常" }), 500 @conversation_bp.route('/api/sub_agents', methods=['GET']) @api_login_required @with_terminal def list_sub_agents(terminal: WebTerminal, workspace: UserWorkspace, username: str): """返回当前对话的子智能体任务列表。""" manager = getattr(terminal, "sub_agent_manager", None) if not manager: return jsonify({"success": True, "data": []}) try: conversation_id = terminal.context_manager.current_conversation_id data = manager.get_overview(conversation_id=conversation_id) return jsonify({"success": True, "data": data}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 @conversation_bp.route('/api/conversations//duplicate', methods=['POST']) @api_login_required @with_terminal def duplicate_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """复制指定对话,生成新的对话副本""" try: result = terminal.context_manager.duplicate_conversation(conversation_id) if not result.get("success"): status_code = 404 if "不存在" in result.get("error", "") else 400 return jsonify(result), status_code new_conversation_id = result["duplicate_conversation_id"] load_result = terminal.load_conversation(new_conversation_id) if load_result.get("success"): socketio.emit('conversation_list_update', { 'action': 'duplicated', 'conversation_id': new_conversation_id }, room=f"user_{username}") socketio.emit('conversation_changed', { 'conversation_id': new_conversation_id, 'title': load_result.get('title', '复制的对话'), 'messages_count': load_result.get('messages_count', 0) }, room=f"user_{username}") socketio.emit('conversation_loaded', { 'conversation_id': new_conversation_id, 'clear_ui': True }, room=f"user_{username}") response_payload = { "success": True, "duplicate_conversation_id": new_conversation_id, "load_result": load_result } return jsonify(response_payload) except Exception as e: print(f"[API] 复制对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "复制对话时发生异常" }), 500 @conversation_bp.route('/api/conversations//review_preview', methods=['GET']) @api_login_required @with_terminal def review_conversation_preview(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """生成对话回顾预览(不落盘,只返回前若干行文本)""" policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_conversation_review"): return jsonify({"success": False, "error": "对话引用已被管理员禁用"}), 403 try: current_id = terminal.context_manager.current_conversation_id if conversation_id == current_id: return jsonify({ "success": False, "message": "无法引用当前对话" }), 400 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if not conversation_data: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 limit = request.args.get('limit', default=20, type=int) or 20 lines = build_review_lines(conversation_data.get("messages", []), limit=limit) return jsonify({ "success": True, "data": { "preview": lines, "count": len(lines) } }) except Exception as e: print(f"[API] 对话回顾预览错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "生成预览时发生异常" }), 500 @conversation_bp.route('/api/conversations//review', methods=['POST']) @api_login_required @with_terminal def review_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """生成完整对话回顾 Markdown 文件""" policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_conversation_review"): return jsonify({"success": False, "error": "对话引用已被管理员禁用"}), 403 try: current_id = terminal.context_manager.current_conversation_id if conversation_id == current_id: return jsonify({ "success": False, "message": "无法引用当前对话" }), 400 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if not conversation_data: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 messages = conversation_data.get("messages", []) lines = build_review_lines(messages) content = "\n".join(lines) + "\n" char_count = len(content) uploads_dir = workspace.uploads_dir / "review" uploads_dir.mkdir(parents=True, exist_ok=True) title = conversation_data.get("title") or "untitled" safe_title = _sanitize_filename_component(title) timestamp = datetime.now().strftime("%Y%m%d%H%M%S") filename = f"review_{safe_title}_{timestamp}.md" target = uploads_dir / filename target.write_text(content, encoding='utf-8') return jsonify({ "success": True, "data": { "path": f"user_upload/review/{filename}", "char_count": char_count } }) except Exception as e: print(f"[API] 对话回顾生成错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "生成对话回顾时发生异常" }), 500 @conversation_bp.route('/api/conversations/statistics', methods=['GET']) @api_login_required @with_terminal def get_conversations_statistics(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话统计信息""" try: stats = terminal.context_manager.get_conversation_statistics() return jsonify({ "success": True, "data": stats }) except Exception as e: print(f"[API] 获取对话统计错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话统计时发生异常" }), 500 @conversation_bp.route('/api/conversations/current', methods=['GET']) @api_login_required @with_terminal def get_current_conversation(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取当前对话信息""" current_id = terminal.context_manager.current_conversation_id # 如果是临时ID,返回空的对话信息 if not current_id or current_id.startswith('temp_'): return jsonify({ "success": True, "data": { "id": current_id, "title": "新对话", "messages_count": 0, "is_temporary": True } }) # 如果是真实的对话ID,查找对话数据 try: conversation_data = terminal.context_manager.conversation_manager.load_conversation(current_id) if conversation_data: return jsonify({ "success": True, "data": { "id": current_id, "title": conversation_data.get("title", "未知对话"), "messages_count": len(conversation_data.get("messages", [])), "is_temporary": False } }) else: return jsonify({ "success": False, "error": "对话不存在" }), 404 except Exception as e: print(f"[API] 获取当前对话错误: {e}") return jsonify({ "success": False, "error": str(e) }), 500 @socketio.on('send_command') def handle_command(data): """处理系统命令""" command = data.get('command', '') username, terminal, _ = get_terminal_for_sid(request.sid) if not terminal: emit('error', {'message': 'System not initialized'}) return record_user_activity(username) if command.startswith('/'): command = command[1:] parts = command.split(maxsplit=1) cmd = parts[0].lower() if cmd == "clear": terminal.context_manager.conversation_history.clear() if terminal.thinking_mode: terminal.api_client.start_new_task(force_deep=terminal.deep_thinking_mode) emit('command_result', { 'command': cmd, 'success': True, 'message': '对话已清除' }) elif cmd == "status": status = terminal.get_status() # 添加终端状态 if terminal.terminal_manager: terminal_status = terminal.terminal_manager.list_terminals() status['terminals'] = terminal_status emit('command_result', { 'command': cmd, 'success': True, 'data': status }) elif cmd == "terminals": # 列出终端会话 if terminal.terminal_manager: result = terminal.terminal_manager.list_terminals() emit('command_result', { 'command': cmd, 'success': True, 'data': result }) else: emit('command_result', { 'command': cmd, 'success': False, 'message': '终端系统未初始化' }) else: emit('command_result', { 'command': cmd, 'success': False, 'message': f'未知命令: {cmd}' }) @conversation_bp.route('/api/conversations//token-statistics', methods=['GET']) @api_login_required @with_terminal def get_conversation_token_statistics(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取特定对话的token统计""" try: stats = terminal.context_manager.get_conversation_token_statistics(conversation_id) if stats: return jsonify({ "success": True, "data": stats }) else: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 except Exception as e: print(f"[API] 获取token统计错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取token统计时发生异常" }), 500 @conversation_bp.route('/api/conversations//tokens', methods=['GET']) @api_login_required @with_terminal def get_conversation_tokens(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话的当前完整上下文token数(包含所有动态内容)""" try: current_tokens = terminal.context_manager.get_current_context_tokens(conversation_id) return jsonify({ "success": True, "data": { "total_tokens": current_tokens } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500