from __future__ import annotations import json, time from datetime import datetime from typing import Dict, Any, Optional from pathlib import Path from io import BytesIO import zipfile import os from flask import Blueprint, jsonify, request, session, send_file from werkzeug.utils import secure_filename from werkzeug.exceptions import RequestEntityTooLarge import secrets from config import THINKING_FAST_INTERVAL, MAX_UPLOAD_SIZE, OUTPUT_FORMATS from modules.personalization_manager import ( load_personalization_config, save_personalization_config, THINKING_INTERVAL_MIN, THINKING_INTERVAL_MAX, ) from modules.upload_security import UploadSecurityError from modules.user_manager import UserWorkspace from core.web_terminal import WebTerminal from .auth_helpers import api_login_required, resolve_admin_policy, get_current_user_record, get_current_username from .context import with_terminal, get_gui_manager, get_upload_guard, build_upload_error_response, ensure_conversation_loaded, get_or_create_usage_tracker from .security import rate_limited, prune_socket_tokens from .utils_common import debug_log from .state import PROJECT_MAX_STORAGE_MB, THINKING_FAILURE_KEYWORDS, pending_socket_tokens, SOCKET_TOKEN_TTL_SECONDS from .extensions import socketio from .monitor import get_cached_monitor_snapshot from .files import sanitize_filename_preserve_unicode UPLOAD_FOLDER_NAME = "user_upload" chat_bp = Blueprint('chat', __name__) @chat_bp.route('/api/thinking-mode', methods=['POST']) @api_login_required @with_terminal @rate_limited("thinking_mode_toggle", 15, 60, scope="user") def update_thinking_mode(terminal: WebTerminal, workspace: UserWorkspace, username: str): """切换思考模式""" try: data = request.get_json() or {} requested_mode = data.get('mode') if requested_mode in {"fast", "thinking", "deep"}: target_mode = requested_mode elif 'thinking_mode' in data: target_mode = "thinking" if bool(data.get('thinking_mode')) else "fast" else: target_mode = terminal.run_mode terminal.set_run_mode(target_mode) if terminal.thinking_mode: terminal.api_client.start_new_task(force_deep=terminal.deep_thinking_mode) else: terminal.api_client.start_new_task() session['thinking_mode'] = terminal.thinking_mode session['run_mode'] = terminal.run_mode # 更新当前对话的元数据 ctx = terminal.context_manager if ctx.current_conversation_id: try: ctx.conversation_manager.save_conversation( conversation_id=ctx.current_conversation_id, messages=ctx.conversation_history, project_path=str(ctx.project_path), todo_list=ctx.todo_list, thinking_mode=terminal.thinking_mode, run_mode=terminal.run_mode, model_key=getattr(terminal, "model_key", None), has_images=getattr(ctx, "has_images", False), has_videos=getattr(ctx, "has_videos", False) ) except Exception as exc: print(f"[API] 保存思考模式到对话失败: {exc}") status = terminal.get_status() socketio.emit('status_update', status, room=f"user_{username}") return jsonify({ "success": True, "data": { "thinking_mode": terminal.thinking_mode, "mode": terminal.run_mode } }) except Exception as exc: print(f"[API] 切换思考模式失败: {exc}") code = 400 if isinstance(exc, ValueError) else 500 return jsonify({ "success": False, "error": str(exc), "message": "切换思考模式时发生异常" }), code @chat_bp.route('/api/model', methods=['POST']) @api_login_required @with_terminal @rate_limited("model_switch", 10, 60, scope="user") def update_model(terminal: WebTerminal, workspace: UserWorkspace, username: str): """切换基础模型(快速/思考模型组合)。""" try: data = request.get_json() or {} model_key = data.get("model_key") if not model_key: return jsonify({"success": False, "error": "缺少 model_key"}), 400 # 管理员禁用模型校验 policy = resolve_admin_policy(get_current_user_record()) disabled_models = set(policy.get("disabled_models") or []) if model_key in disabled_models: return jsonify({ "success": False, "error": "该模型已被管理员禁用", "message": "被管理员强制禁用" }), 403 terminal.set_model(model_key) # fast-only 时 run_mode 可能被强制为 fast session["model_key"] = terminal.model_key session["run_mode"] = terminal.run_mode session["thinking_mode"] = terminal.thinking_mode # 更新当前对话元数据 ctx = terminal.context_manager if ctx.current_conversation_id: try: ctx.conversation_manager.save_conversation( conversation_id=ctx.current_conversation_id, messages=ctx.conversation_history, project_path=str(ctx.project_path), todo_list=ctx.todo_list, thinking_mode=terminal.thinking_mode, run_mode=terminal.run_mode, model_key=terminal.model_key, has_images=getattr(ctx, "has_images", False), has_videos=getattr(ctx, "has_videos", False) ) except Exception as exc: print(f"[API] 保存模型到对话失败: {exc}") status = terminal.get_status() socketio.emit('status_update', status, room=f"user_{username}") return jsonify({ "success": True, "data": { "model_key": terminal.model_key, "run_mode": terminal.run_mode, "thinking_mode": terminal.thinking_mode } }) except Exception as exc: print(f"[API] 切换模型失败: {exc}") code = 400 if isinstance(exc, ValueError) else 500 return jsonify({"success": False, "error": str(exc), "message": str(exc)}), code @chat_bp.route('/api/personalization', methods=['GET']) @api_login_required @with_terminal def get_personalization_settings(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取个性化配置""" try: policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_personal_space"): return jsonify({"success": False, "error": "个人空间已被管理员禁用"}), 403 data = load_personalization_config(workspace.data_dir) return jsonify({ "success": True, "data": data, "tool_categories": terminal.get_tool_settings_snapshot(), "thinking_interval_default": THINKING_FAST_INTERVAL, "thinking_interval_range": { "min": THINKING_INTERVAL_MIN, "max": THINKING_INTERVAL_MAX } }) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 @chat_bp.route('/api/personalization', methods=['POST']) @api_login_required @with_terminal @rate_limited("personalization_update", 20, 300, scope="user") def update_personalization_settings(terminal: WebTerminal, workspace: UserWorkspace, username: str): """更新个性化配置""" payload = request.get_json() or {} try: policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_personal_space"): return jsonify({"success": False, "error": "个人空间已被管理员禁用"}), 403 config = save_personalization_config(workspace.data_dir, payload) try: terminal.apply_personalization_preferences(config) session['run_mode'] = terminal.run_mode session['thinking_mode'] = terminal.thinking_mode ctx = getattr(terminal, 'context_manager', None) if ctx and getattr(ctx, 'current_conversation_id', None): try: ctx.conversation_manager.save_conversation( conversation_id=ctx.current_conversation_id, messages=ctx.conversation_history, project_path=str(ctx.project_path), todo_list=ctx.todo_list, thinking_mode=terminal.thinking_mode, run_mode=terminal.run_mode ) except Exception as meta_exc: debug_log(f"应用个性化偏好失败: 同步对话元数据异常 {meta_exc}") try: status = terminal.get_status() socketio.emit('status_update', status, room=f"user_{username}") except Exception as status_exc: debug_log(f"广播个性化状态失败: {status_exc}") except Exception as exc: debug_log(f"应用个性化偏好失败: {exc}") return jsonify({ "success": True, "data": config, "tool_categories": terminal.get_tool_settings_snapshot(), "thinking_interval_default": THINKING_FAST_INTERVAL, "thinking_interval_range": { "min": THINKING_INTERVAL_MIN, "max": THINKING_INTERVAL_MAX } }) except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 @chat_bp.route('/api/memory', methods=['GET']) @api_login_required @with_terminal def api_memory_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str): """返回主/任务记忆条目列表,供虚拟显示器加载""" memory_type = request.args.get('type', 'main') if memory_type not in ('main', 'task'): return jsonify({"success": False, "error": "type 必须是 main 或 task"}), 400 try: entries = terminal.memory_manager._read_entries(memory_type) # type: ignore return jsonify({"success": True, "type": memory_type, "entries": entries}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 @chat_bp.route('/api/gui/monitor_snapshot', methods=['GET']) @api_login_required def get_monitor_snapshot_api(): execution_id = request.args.get('executionId') or request.args.get('execution_id') or request.args.get('id') if not execution_id: return jsonify({ 'success': False, 'error': '缺少 executionId 参数' }), 400 stage = (request.args.get('stage') or 'before').lower() if stage not in {'before', 'after'}: stage = 'before' snapshot = get_cached_monitor_snapshot(execution_id, stage) if not snapshot: return jsonify({ 'success': False, 'error': '未找到对应快照' }), 404 return jsonify({ 'success': True, 'snapshot': snapshot, 'stage': stage }) @chat_bp.route('/api/focused') @api_login_required @with_terminal def get_focused_files(terminal: WebTerminal, workspace: UserWorkspace, username: str): """聚焦功能已废弃,返回空列表保持接口兼容。""" return jsonify({}) @chat_bp.route('/api/todo-list') @api_login_required @with_terminal def get_todo_list(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取当前待办列表""" todo_snapshot = terminal.context_manager.get_todo_snapshot() return jsonify({ "success": True, "data": todo_snapshot }) @chat_bp.route('/api/upload', methods=['POST']) @api_login_required @with_terminal @rate_limited("legacy_upload", 20, 300, scope="user") def upload_file(terminal: WebTerminal, workspace: UserWorkspace, username: str): """处理前端文件上传请求""" policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_upload"): return jsonify({ "success": False, "error": "文件上传已被管理员禁用", "message": "被管理员禁用上传" }), 403 if 'file' not in request.files: return jsonify({ "success": False, "error": "未找到文件", "message": "请求中缺少文件字段" }), 400 uploaded_file = request.files['file'] original_name = (request.form.get('filename') or '').strip() if not uploaded_file or not uploaded_file.filename or uploaded_file.filename.strip() == '': return jsonify({ "success": False, "error": "文件名为空", "message": "请选择要上传的文件" }), 400 raw_name = original_name or uploaded_file.filename filename = sanitize_filename_preserve_unicode(raw_name) if not filename: filename = secure_filename(raw_name) if not filename: return jsonify({ "success": False, "error": "非法文件名", "message": "文件名包含不支持的字符" }), 400 file_manager = getattr(terminal, 'file_manager', None) if file_manager is None: return jsonify({ "success": False, "error": "文件管理器未初始化" }), 500 target_folder_relative = UPLOAD_FOLDER_NAME valid_folder, folder_error, folder_path = file_manager._validate_path(target_folder_relative) if not valid_folder: return jsonify({ "success": False, "error": folder_error }), 400 try: folder_path.mkdir(parents=True, exist_ok=True) except Exception as exc: return jsonify({ "success": False, "error": f"创建上传目录失败: {exc}" }), 500 target_relative = str(Path(target_folder_relative) / filename) valid_file, file_error, target_full_path = file_manager._validate_path(target_relative) if not valid_file: return jsonify({ "success": False, "error": file_error }), 400 final_path = target_full_path if final_path.exists(): stem = final_path.stem suffix = final_path.suffix counter = 1 while final_path.exists(): candidate_name = f"{stem}_{counter}{suffix}" target_relative = str(Path(target_folder_relative) / candidate_name) valid_file, file_error, candidate_path = file_manager._validate_path(target_relative) if not valid_file: return jsonify({ "success": False, "error": file_error }), 400 final_path = candidate_path counter += 1 try: relative_path = str(final_path.relative_to(workspace.project_path)) except Exception as exc: return jsonify({ "success": False, "error": f"路径解析失败: {exc}" }), 400 guard = get_upload_guard(workspace) try: result = guard.process_upload( uploaded_file, final_path, username=username, source="legacy_upload", original_name=raw_name, relative_path=relative_path, ) except UploadSecurityError as exc: return build_upload_error_response(exc) except Exception as exc: return jsonify({ "success": False, "error": f"保存文件失败: {exc}" }), 500 metadata = result.get("metadata", {}) print(f"{OUTPUT_FORMATS['file']} 上传文件: {relative_path}") return jsonify({ "success": True, "path": relative_path, "filename": final_path.name, "folder": target_folder_relative, "scan": metadata.get("scan"), "sha256": metadata.get("sha256"), "size": metadata.get("size"), }) @chat_bp.errorhandler(RequestEntityTooLarge) def handle_file_too_large(error): """全局捕获上传超大小""" size_mb = MAX_UPLOAD_SIZE / (1024 * 1024) return jsonify({ "success": False, "error": "文件过大", "message": f"单个文件大小不可超过 {size_mb:.1f} MB" }), 413 @chat_bp.route('/api/download/file') @api_login_required @with_terminal def download_file_api(terminal: WebTerminal, workspace: UserWorkspace, username: str): """下载单个文件""" path = (request.args.get('path') or '').strip() if not path: return jsonify({"success": False, "error": "缺少路径参数"}), 400 valid, error, full_path = terminal.file_manager._validate_path(path) if not valid or full_path is None: return jsonify({"success": False, "error": error or "路径校验失败"}), 400 if not full_path.exists() or not full_path.is_file(): return jsonify({"success": False, "error": "文件不存在"}), 404 return send_file( full_path, as_attachment=True, download_name=full_path.name ) @chat_bp.route('/api/download/folder') @api_login_required @with_terminal def download_folder_api(terminal: WebTerminal, workspace: UserWorkspace, username: str): """打包并下载文件夹""" path = (request.args.get('path') or '').strip() if not path: return jsonify({"success": False, "error": "缺少路径参数"}), 400 valid, error, full_path = terminal.file_manager._validate_path(path) if not valid or full_path is None: return jsonify({"success": False, "error": error or "路径校验失败"}), 400 if not full_path.exists() or not full_path.is_dir(): return jsonify({"success": False, "error": "文件夹不存在"}), 404 buffer = BytesIO() folder_name = Path(path).name or full_path.name or "archive" with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zip_buffer: # 确保目录本身被包含 zip_buffer.write(full_path, arcname=folder_name + '/') for item in full_path.rglob('*'): relative_name = Path(folder_name) / item.relative_to(full_path) if item.is_dir(): zip_buffer.write(item, arcname=str(relative_name) + '/') else: zip_buffer.write(item, arcname=str(relative_name)) buffer.seek(0) return send_file( buffer, mimetype='application/zip', as_attachment=True, download_name=f"{folder_name}.zip" ) @chat_bp.route('/api/tool-settings', methods=['GET', 'POST']) @api_login_required @with_terminal def tool_settings(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取或更新工具启用状态""" if request.method == 'GET': snapshot = terminal.get_tool_settings_snapshot() return jsonify({ "success": True, "categories": snapshot }) data = request.get_json() or {} category = data.get('category') if category is None: return jsonify({ "success": False, "error": "缺少类别参数", "message": "请求体需要提供 category 字段" }), 400 if 'enabled' not in data: return jsonify({ "success": False, "error": "缺少启用状态", "message": "请求体需要提供 enabled 字段" }), 400 try: policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_tool_toggle"): return jsonify({ "success": False, "error": "工具开关已被管理员禁用", "message": "被管理员强制禁用" }), 403 enabled = bool(data['enabled']) forced = getattr(terminal, "admin_forced_category_states", {}) or {} if isinstance(forced.get(category), bool) and forced[category] != enabled: return jsonify({ "success": False, "error": "该工具类别已被管理员强制为启用/禁用,无法修改", "message": "被管理员强制启用/禁用" }), 403 terminal.set_tool_category_enabled(category, enabled) snapshot = terminal.get_tool_settings_snapshot() socketio.emit('tool_settings_updated', { 'categories': snapshot }, room=f"user_{username}") return jsonify({ "success": True, "categories": snapshot }) except ValueError as exc: return jsonify({ "success": False, "error": str(exc) }), 400 @chat_bp.route('/api/terminals') @api_login_required @with_terminal def get_terminals(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取终端会话列表""" policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_realtime_terminal"): return jsonify({"success": False, "error": "实时终端已被管理员禁用"}), 403 if terminal.terminal_manager: result = terminal.terminal_manager.list_terminals() return jsonify(result) else: return jsonify({"sessions": [], "active": None, "total": 0}) @chat_bp.route('/api/socket-token', methods=['GET']) @api_login_required def issue_socket_token(): """生成一次性 WebSocket token,供握手阶段使用。""" username = get_current_username() prune_socket_tokens() now = time.time() for token_value, meta in list(pending_socket_tokens.items()): if meta.get("username") == username: pending_socket_tokens.pop(token_value, None) token_value = secrets.token_urlsafe(32) pending_socket_tokens[token_value] = { "username": username, "expires_at": now + SOCKET_TOKEN_TTL_SECONDS, "fingerprint": (request.headers.get('User-Agent') or '')[:128], } return jsonify({ "success": True, "token": token_value, "expires_in": SOCKET_TOKEN_TTL_SECONDS })