@app.route('/api/usage', methods=['GET'])
@api_login_required
def get_usage_stats():
    """Return the model/search call statistics of the current user.

    Returns:
        A JSON response ``{"success": True, "data": <tracker stats>}``, or a
        404 JSON error when no usage tracker is available for the user.
    """
    username = get_current_username()
    tracker = get_or_create_usage_tracker(username)
    if not tracker:
        return jsonify({"success": False, "error": "未找到用户"}), 404
    return jsonify({
        "success": True,
        "data": tracker.get_stats()
    })


@app.route('/api/thinking-mode', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("thinking_mode_toggle", 15, 60, scope="user")
def update_thinking_mode(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Switch the terminal's run mode (fast / thinking / deep).

    The JSON body may carry either ``mode`` (one of ``"fast"``,
    ``"thinking"``, ``"deep"``) or the legacy boolean ``thinking_mode``.
    When neither is usable the terminal's current run mode is re-applied.

    Args:
        terminal: Terminal bound to the current user (injected by decorator).
        workspace: Workspace of the current user (unused here).
        username: Name used to address the user's Socket.IO room.

    Returns:
        A JSON response with the resulting ``thinking_mode`` and ``mode``;
        400 when the body is not a JSON object or a ``ValueError`` is raised
        while switching, 500 for any other failure.
    """
    # Parse outside the catch-all ``try`` (as the personalization and
    # tool-settings handlers do) so that Flask's own 400/415 responses for a
    # malformed body are not converted into a 500 by the handler below.
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({
            "success": False,
            "error": "请求体必须是 JSON 对象",
            "message": "切换思考模式时发生异常"
        }), 400

    try:
        requested_mode = data.get('mode')
        # Tuple membership (not a set) so that an unhashable value such as a
        # list simply fails the test instead of raising TypeError.
        if requested_mode in ("fast", "thinking", "deep"):
            target_mode = requested_mode
        elif 'thinking_mode' in data:
            # Legacy boolean switch.
            target_mode = "thinking" if bool(data.get('thinking_mode')) else "fast"
        else:
            target_mode = terminal.run_mode

        terminal.set_run_mode(target_mode)
        if terminal.thinking_mode:
            terminal.api_client.start_new_task(force_deep=terminal.deep_thinking_mode)
        else:
            terminal.api_client.start_new_task()

        session['thinking_mode'] = terminal.thinking_mode
        session['run_mode'] = terminal.run_mode

        # Update the metadata of the current conversation (best effort: a
        # failure here is logged and does not fail the request).
        ctx = terminal.context_manager
        if ctx.current_conversation_id:
            try:
                ctx.conversation_manager.save_conversation(
                    conversation_id=ctx.current_conversation_id,
                    messages=ctx.conversation_history,
                    project_path=str(ctx.project_path),
                    todo_list=ctx.todo_list,
                    thinking_mode=terminal.thinking_mode,
                    run_mode=terminal.run_mode,
                    model_key=getattr(terminal, "model_key", None),
                    # Same field update_model passes, so both handlers save
                    # the same set of metadata.
                    has_images=getattr(ctx, "has_images", False)
                )
            except Exception as exc:
                print(f"[API] 保存思考模式到对话失败: {exc}")

        status = terminal.get_status()
        socketio.emit('status_update', status, room=f"user_{username}")

        return jsonify({
            "success": True,
            "data": {
                "thinking_mode": terminal.thinking_mode,
                "mode": terminal.run_mode
            }
        })
    except Exception as exc:
        print(f"[API] 切换思考模式失败: {exc}")
        code = 400 if isinstance(exc, ValueError) else 500
        return jsonify({
            "success": False,
            "error": str(exc),
            "message": "切换思考模式时发生异常"
        }), code
@app.route('/api/model', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("model_switch", 10, 60, scope="user")
def update_model(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Switch the base model (fast/thinking model combination).

    Expects a JSON object with ``model_key``.

    Returns:
        A JSON response with the resulting ``model_key``, ``run_mode`` and
        ``thinking_mode``; 400 when the body is not a JSON object,
        ``model_key`` is missing, or a ``ValueError`` is raised while
        switching; 403 when the admin policy lists the model as disabled;
        500 for any other failure.
    """
    # Parse outside the catch-all ``try`` so Flask's own 400/415 responses
    # for a malformed body are not turned into a 500 below.
    data = request.get_json() or {}
    if not isinstance(data, dict):
        return jsonify({"success": False, "error": "请求体必须是 JSON 对象"}), 400

    try:
        model_key = data.get("model_key")
        if not model_key:
            return jsonify({"success": False, "error": "缺少 model_key"}), 400

        # Reject models the administrator has disabled.
        policy = resolve_admin_policy(get_current_user_record())
        disabled_models = set(policy.get("disabled_models") or [])
        if model_key in disabled_models:
            return jsonify({
                "success": False,
                "error": "该模型已被管理员禁用",
                "message": "被管理员强制禁用"
            }), 403

        terminal.set_model(model_key)
        # For a fast-only model the run mode may have been forced to fast,
        # so mirror all three values from the terminal into the session.
        session["model_key"] = terminal.model_key
        session["run_mode"] = terminal.run_mode
        session["thinking_mode"] = terminal.thinking_mode

        # Update the metadata of the current conversation (best effort).
        ctx = terminal.context_manager
        if ctx.current_conversation_id:
            try:
                ctx.conversation_manager.save_conversation(
                    conversation_id=ctx.current_conversation_id,
                    messages=ctx.conversation_history,
                    project_path=str(ctx.project_path),
                    todo_list=ctx.todo_list,
                    thinking_mode=terminal.thinking_mode,
                    run_mode=terminal.run_mode,
                    model_key=terminal.model_key,
                    has_images=getattr(ctx, "has_images", False)
                )
            except Exception as exc:
                print(f"[API] 保存模型到对话失败: {exc}")

        status = terminal.get_status()
        socketio.emit('status_update', status, room=f"user_{username}")

        return jsonify({
            "success": True,
            "data": {
                "model_key": terminal.model_key,
                "run_mode": terminal.run_mode,
                "thinking_mode": terminal.thinking_mode
            }
        })
    except Exception as exc:
        print(f"[API] 切换模型失败: {exc}")
        code = 400 if isinstance(exc, ValueError) else 500
        return jsonify({"success": False, "error": str(exc), "message": str(exc)}), code


@app.route('/api/personalization', methods=['GET'])
@api_login_required
@with_terminal
def get_personalization_settings(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return the user's personalization configuration.

    Returns:
        A JSON response with the stored configuration, the tool-category
        snapshot and the thinking-interval default/range; 403 when the admin
        policy blocks the personal space; 500 on any other failure.
    """
    try:
        policy = resolve_admin_policy(get_current_user_record())
        if policy.get("ui_blocks", {}).get("block_personal_space"):
            return jsonify({"success": False, "error": "个人空间已被管理员禁用"}), 403

        data = load_personalization_config(workspace.data_dir)
        return jsonify({
            "success": True,
            "data": data,
            "tool_categories": terminal.get_tool_settings_snapshot(),
            "thinking_interval_default": THINKING_FAST_INTERVAL,
            "thinking_interval_range": {
                "min": THINKING_INTERVAL_MIN,
                "max": THINKING_INTERVAL_MAX
            }
        })
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 500


@app.route('/api/personalization', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("personalization_update", 20, 300, scope="user")
def update_personalization_settings(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Persist the personalization configuration and apply it to the terminal.

    Saving the configuration is the only step that can fail the request;
    applying it to the live terminal, syncing conversation metadata and
    broadcasting the status are best effort and only logged on failure.

    Returns:
        A JSON response with the saved configuration; 403 when the admin
        policy blocks the personal space; 400 on ``ValueError``; 500 on any
        other failure.
    """
    payload = request.get_json() or {}
    try:
        policy = resolve_admin_policy(get_current_user_record())
        if policy.get("ui_blocks", {}).get("block_personal_space"):
            return jsonify({"success": False, "error": "个人空间已被管理员禁用"}), 403

        config = save_personalization_config(workspace.data_dir, payload)

        try:
            terminal.apply_personalization_preferences(config)
            session['run_mode'] = terminal.run_mode
            session['thinking_mode'] = terminal.thinking_mode

            ctx = getattr(terminal, 'context_manager', None)
            if ctx and getattr(ctx, 'current_conversation_id', None):
                try:
                    ctx.conversation_manager.save_conversation(
                        conversation_id=ctx.current_conversation_id,
                        messages=ctx.conversation_history,
                        project_path=str(ctx.project_path),
                        todo_list=ctx.todo_list,
                        thinking_mode=terminal.thinking_mode,
                        run_mode=terminal.run_mode,
                        # Same fields the model / thinking-mode handlers
                        # pass, so every handler saves the same metadata.
                        model_key=getattr(terminal, "model_key", None),
                        has_images=getattr(ctx, "has_images", False)
                    )
                except Exception as meta_exc:
                    debug_log(f"应用个性化偏好失败: 同步对话元数据异常 {meta_exc}")

            try:
                status = terminal.get_status()
                socketio.emit('status_update', status, room=f"user_{username}")
            except Exception as status_exc:
                debug_log(f"广播个性化状态失败: {status_exc}")
        except Exception as exc:
            # The configuration is already saved; do not fail the request.
            debug_log(f"应用个性化偏好失败: {exc}")

        return jsonify({
            "success": True,
            "data": config,
            "tool_categories": terminal.get_tool_settings_snapshot(),
            "thinking_interval_default": THINKING_FAST_INTERVAL,
            "thinking_interval_range": {
                "min": THINKING_INTERVAL_MIN,
                "max": THINKING_INTERVAL_MAX
            }
        })
    except ValueError as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 500


@app.route('/api/memory', methods=['GET'])
@api_login_required
@with_terminal
def api_memory_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return the main/task memory entries for the virtual monitor to load.

    The ``type`` query parameter selects ``main`` (default) or ``task``;
    any other value yields a 400 response.
    """
    memory_type = request.args.get('type', 'main')
    if memory_type not in ('main', 'task'):
        return jsonify({"success": False, "error": "type 必须是 main 或 task"}), 400
    try:
        # NOTE(review): reaches into a private method of the memory manager.
        entries = terminal.memory_manager._read_entries(memory_type)  # type: ignore
        return jsonify({"success": True, "type": memory_type, "entries": entries})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 500


@app.route('/api/gui/monitor_snapshot', methods=['GET'])
@api_login_required
def get_monitor_snapshot_api():
    """Return a cached monitor snapshot for an execution.

    The execution id is read from the ``executionId``, ``execution_id`` or
    ``id`` query parameter (first non-empty wins). ``stage`` is lower-cased
    and falls back to ``before`` unless it is ``before`` or ``after``.

    Returns:
        400 when no execution id is given, 404 when no snapshot is cached,
        otherwise a JSON response with ``snapshot`` and ``stage``.
    """
    execution_id = (
        request.args.get('executionId')
        or request.args.get('execution_id')
        or request.args.get('id')
    )
    if not execution_id:
        return jsonify({
            'success': False,
            'error': '缺少 executionId 参数'
        }), 400

    stage = (request.args.get('stage') or 'before').lower()
    if stage not in {'before', 'after'}:
        stage = 'before'

    snapshot = get_cached_monitor_snapshot(execution_id, stage)
    if not snapshot:
        return jsonify({
            'success': False,
            'error': '未找到对应快照'
        }), 404

    return jsonify({
        'success': True,
        'snapshot': snapshot,
        'stage': stage
    })


@app.route('/api/focused')
@api_login_required
@with_terminal
def get_focused_files(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Deprecated focus feature; returns an empty JSON object for compatibility."""
    return jsonify({})


@app.route('/api/todo-list')
@api_login_required
@with_terminal
def get_todo_list(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return the current todo-list snapshot of the terminal's context."""
    todo_snapshot = terminal.context_manager.get_todo_snapshot()
    return jsonify({
        "success": True,
        "data": todo_snapshot
    })


@app.route('/api/upload', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("legacy_upload", 20, 300, scope="user")
def upload_file(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Handle a file upload from the front end.

    The file is taken from the multipart field ``file``; an optional form
    field ``filename`` overrides the client-supplied name. The file is
    stored under ``UPLOAD_FOLDER_NAME``; on a name collision ``_1``, ``_2``,
    ... is appended to the stem. The actual write goes through the upload
    guard returned by ``get_upload_guard``.

    Returns:
        A JSON response with the stored relative path, final file name and
        the guard's ``scan`` / ``sha256`` / ``size`` metadata; 403 when
        uploads are blocked by the admin policy; 400 for missing or invalid
        input; 500 when the folder or file cannot be written.
    """
    policy = resolve_admin_policy(get_current_user_record())
    if policy.get("ui_blocks", {}).get("block_upload"):
        return jsonify({
            "success": False,
            "error": "文件上传已被管理员禁用",
            "message": "被管理员禁用上传"
        }), 403

    if 'file' not in request.files:
        return jsonify({
            "success": False,
            "error": "未找到文件",
            "message": "请求中缺少文件字段"
        }), 400

    uploaded_file = request.files['file']
    original_name = (request.form.get('filename') or '').strip()

    if not uploaded_file or not uploaded_file.filename or not uploaded_file.filename.strip():
        return jsonify({
            "success": False,
            "error": "文件名为空",
            "message": "请选择要上传的文件"
        }), 400

    # Prefer the Unicode-preserving sanitizer; fall back to secure_filename
    # when it yields nothing.
    raw_name = original_name or uploaded_file.filename
    filename = sanitize_filename_preserve_unicode(raw_name)
    if not filename:
        filename = secure_filename(raw_name)
    if not filename:
        return jsonify({
            "success": False,
            "error": "非法文件名",
            "message": "文件名包含不支持的字符"
        }), 400

    file_manager = getattr(terminal, 'file_manager', None)
    if file_manager is None:
        return jsonify({
            "success": False,
            "error": "文件管理器未初始化"
        }), 500

    target_folder_relative = UPLOAD_FOLDER_NAME
    valid_folder, folder_error, folder_path = file_manager._validate_path(target_folder_relative)
    if not valid_folder:
        return jsonify({
            "success": False,
            "error": folder_error
        }), 400

    try:
        folder_path.mkdir(parents=True, exist_ok=True)
    except Exception as exc:
        return jsonify({
            "success": False,
            "error": f"创建上传目录失败: {exc}"
        }), 500

    target_relative = str(Path(target_folder_relative) / filename)
    valid_file, file_error, target_full_path = file_manager._validate_path(target_relative)
    if not valid_file:
        return jsonify({
            "success": False,
            "error": file_error
        }), 400

    # Resolve name collisions: keep the original stem/suffix and try
    # "<stem>_1<suffix>", "<stem>_2<suffix>", ... until a free name is found.
    final_path = target_full_path
    stem = final_path.stem
    suffix = final_path.suffix
    counter = 1
    while final_path.exists():
        candidate_name = f"{stem}_{counter}{suffix}"
        target_relative = str(Path(target_folder_relative) / candidate_name)
        valid_file, file_error, candidate_path = file_manager._validate_path(target_relative)
        if not valid_file:
            return jsonify({
                "success": False,
                "error": file_error
            }), 400
        final_path = candidate_path
        counter += 1

    try:
        relative_path = str(final_path.relative_to(workspace.project_path))
    except Exception as exc:
        return jsonify({
            "success": False,
            "error": f"路径解析失败: {exc}"
        }), 400

    guard = get_upload_guard(workspace)
    try:
        result = guard.process_upload(
            uploaded_file,
            final_path,
            username=username,
            source="legacy_upload",
            original_name=raw_name,
            relative_path=relative_path,
        )
    except UploadSecurityError as exc:
        return build_upload_error_response(exc)
    except Exception as exc:
        return jsonify({
            "success": False,
            "error": f"保存文件失败: {exc}"
        }), 500

    metadata = result.get("metadata", {})
    print(f"{OUTPUT_FORMATS['file']} 上传文件: {relative_path}")

    return jsonify({
        "success": True,
        "path": relative_path,
        "filename": final_path.name,
        "folder": target_folder_relative,
        "scan": metadata.get("scan"),
        "sha256": metadata.get("sha256"),
        "size": metadata.get("size"),
    })


@app.errorhandler(RequestEntityTooLarge)
def handle_file_too_large(error):
    """Global handler for oversized uploads; answers with a 413 JSON error.

    The limit shown in the message is ``MAX_UPLOAD_SIZE`` converted to MB.
    """
    size_mb = MAX_UPLOAD_SIZE / (1024 * 1024)
    return jsonify({
        "success": False,
        "error": "文件过大",
        "message": f"单个文件大小不可超过 {size_mb:.1f} MB"
    }), 413


@app.route('/api/download/file')
@api_login_required
@with_terminal
def download_file_api(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Download a single file identified by the ``path`` query parameter.

    Returns:
        The file as an attachment; 400 when ``path`` is missing or fails the
        file manager's path validation; 404 when it is not an existing file.
    """
    path = (request.args.get('path') or '').strip()
    if not path:
        return jsonify({"success": False, "error": "缺少路径参数"}), 400

    valid, error, full_path = terminal.file_manager._validate_path(path)
    if not valid or full_path is None:
        return jsonify({"success": False, "error": error or "路径校验失败"}), 400
    if not full_path.exists() or not full_path.is_file():
        return jsonify({"success": False, "error": "文件不存在"}), 404

    return send_file(
        full_path,
        as_attachment=True,
        download_name=full_path.name
    )


@app.route('/api/download/folder')
@api_login_required
@with_terminal
def download_folder_api(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Zip a folder and send it as a download.

    The folder is identified by the ``path`` query parameter. The archive is
    built in an in-memory buffer and named ``<folder_name>.zip``.

    Returns:
        The zip archive as an attachment; 400 when ``path`` is missing or
        fails path validation; 404 when it is not an existing directory.
    """
    path = (request.args.get('path') or '').strip()
    if not path:
        return jsonify({"success": False, "error": "缺少路径参数"}), 400

    valid, error, full_path = terminal.file_manager._validate_path(path)
    if not valid or full_path is None:
        return jsonify({"success": False, "error": error or "路径校验失败"}), 400
    if not full_path.exists() or not full_path.is_dir():
        return jsonify({"success": False, "error": "文件夹不存在"}), 404

    # NOTE(review): the whole archive is held in memory; very large folders
    # may need a temp-file or streaming approach.
    buffer = BytesIO()
    folder_name = Path(path).name or full_path.name or "archive"

    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zip_buffer:
        # Make sure the directory itself is part of the archive.
        zip_buffer.write(full_path, arcname=folder_name + '/')

        # NOTE(review): only the top-level path is validated; entries found
        # by rglob (including symlinks) are archived as-is. Confirm whether
        # symlinked entries should be skipped.
        for item in full_path.rglob('*'):
            relative_name = Path(folder_name) / item.relative_to(full_path)
            if item.is_dir():
                # Trailing slash marks a directory entry.
                zip_buffer.write(item, arcname=str(relative_name) + '/')
            else:
                zip_buffer.write(item, arcname=str(relative_name))

    buffer.seek(0)
    return send_file(
        buffer,
        mimetype='application/zip',
        as_attachment=True,
        download_name=f"{folder_name}.zip"
    )


@app.route('/api/tool-settings', methods=['GET', 'POST'])
@api_login_required
@with_terminal
def tool_settings(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Get (GET) or update (POST) the enabled state of a tool category.

    POST expects a JSON object with ``category`` and ``enabled``.

    Returns:
        The tool-settings snapshot under ``categories``; 400 when the body
        is not a JSON object, a field is missing, or a ``ValueError`` is
        raised while applying the change; 403 when the admin policy blocks
        toggling or forces the category to the opposite state.
    """
    if request.method == 'GET':
        snapshot = terminal.get_tool_settings_snapshot()
        return jsonify({
            "success": True,
            "categories": snapshot
        })

    data = request.get_json() or {}
    if not isinstance(data, dict):
        # A JSON array/scalar would otherwise fail on ``.get`` with a 500.
        return jsonify({
            "success": False,
            "error": "请求体必须是 JSON 对象",
            "message": "请求体需要提供 category 字段"
        }), 400

    category = data.get('category')
    if category is None:
        return jsonify({
            "success": False,
            "error": "缺少类别参数",
            "message": "请求体需要提供 category 字段"
        }), 400

    if 'enabled' not in data:
        return jsonify({
            "success": False,
            "error": "缺少启用状态",
            "message": "请求体需要提供 enabled 字段"
        }), 400

    try:
        policy = resolve_admin_policy(get_current_user_record())
        if policy.get("ui_blocks", {}).get("block_tool_toggle"):
            return jsonify({
                "success": False,
                "error": "工具开关已被管理员禁用",
                "message": "被管理员强制禁用"
            }), 403

        enabled = bool(data['enabled'])
        # A boolean entry here pins the category; only a request matching
        # the pinned state is allowed through.
        forced = getattr(terminal, "admin_forced_category_states", {}) or {}
        if isinstance(forced.get(category), bool) and forced[category] != enabled:
            return jsonify({
                "success": False,
                "error": "该工具类别已被管理员强制为启用/禁用,无法修改",
                "message": "被管理员强制启用/禁用"
            }), 403

        terminal.set_tool_category_enabled(category, enabled)
        snapshot = terminal.get_tool_settings_snapshot()
        socketio.emit('tool_settings_updated', {
            'categories': snapshot
        }, room=f"user_{username}")
        return jsonify({
            "success": True,
            "categories": snapshot
        })
    except ValueError as exc:
        return jsonify({
            "success": False,
            "error": str(exc)
        }), 400


@app.route('/api/terminals')
@api_login_required
@with_terminal
def get_terminals(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return the list of terminal sessions.

    Returns:
        The terminal manager's listing as JSON, or an empty listing when the
        terminal has no terminal manager; 403 when the admin policy blocks
        the realtime terminal.
    """
    policy = resolve_admin_policy(get_current_user_record())
    if policy.get("ui_blocks", {}).get("block_realtime_terminal"):
        return jsonify({"success": False, "error": "实时终端已被管理员禁用"}), 403

    if terminal.terminal_manager:
        result = terminal.terminal_manager.list_terminals()
        return jsonify(result)
    return jsonify({"sessions": [], "active": None, "total": 0})


@app.route('/api/socket-token', methods=['GET'])
@api_login_required
def issue_socket_token():
    """Issue a WebSocket token for use during the handshake.

    Any token still pending for the same user is discarded first, so at
    most one pending token per user exists after this call. The token is
    stored with its expiry time and the first 128 characters of the
    request's User-Agent as a fingerprint.
    """
    username = get_current_username()
    prune_socket_tokens()
    now = time.time()

    # Collect first, then remove, so the dict is not mutated while iterating.
    stale_tokens = [
        value for value, meta in pending_socket_tokens.items()
        if meta.get("username") == username
    ]
    for value in stale_tokens:
        pending_socket_tokens.pop(value, None)

    token_value = secrets.token_urlsafe(32)
    pending_socket_tokens[token_value] = {
        "username": username,
        "expires_at": now + SOCKET_TOKEN_TTL_SECONDS,
        "fingerprint": (request.headers.get('User-Agent') or '')[:128],
    }
    return jsonify({
        "success": True,
        "token": token_value,
        "expires_in": SOCKET_TOKEN_TTL_SECONDS
    })