from __future__ import annotations from pathlib import Path import time import mimetypes import logging import json from flask import Blueprint, jsonify, send_from_directory, request, current_app, redirect, abort, session from werkzeug.security import check_password_hash from .auth_helpers import login_required, admin_required, admin_api_required, api_login_required, get_current_user_record, resolve_admin_policy from .security import rate_limited from .utils_common import debug_log from . import state # 使用动态 state,确保与入口实例保持一致 from .state import custom_tool_registry, user_manager, container_manager, PROJECT_MAX_STORAGE_MB from .conversation import ( build_admin_dashboard_snapshot as _build_dashboard_rich, compute_workspace_storage, collect_user_token_statistics, collect_upload_events, summarize_upload_events, ) from modules import admin_policy_manager, balance_client from collections import Counter from config import ADMIN_SECONDARY_PASSWORD_HASH, ADMIN_SECONDARY_PASSWORD, ADMIN_SECONDARY_TTL_SECONDS # CSRF 豁免:二级密码相关接口不需要 CSRF try: state.CSRF_EXEMPT_PATHS.update({ "/api/admin/secondary/status", "/api/admin/secondary/verify", }) except Exception: pass admin_bp = Blueprint('admin', __name__) # ------------------ 二级密码校验 ------------------ # def _is_secondary_verified() -> bool: ts = session.get("admin_secondary_verified_at") if not ts: return False try: return (time.time() - float(ts)) < ADMIN_SECONDARY_TTL_SECONDS except Exception: return False def _secondary_required(): if _is_secondary_verified(): return None return jsonify({"success": False, "error": "需要二级密码", "code": "secondary_required"}), 403 @admin_bp.route('/admin/monitor') @login_required @admin_required def admin_monitor_page(): """管理员监控页面入口""" return send_from_directory(str(Path(current_app.static_folder)/"admin_dashboard"), 'index.html') @admin_bp.route('/admin/policy') @login_required @admin_required def admin_policy_page(): """管理员策略配置页面""" return send_from_directory(Path(current_app.static_folder) / 'admin_policy', 'index.html') @admin_bp.route('/admin/custom-tools') @login_required @admin_required def admin_custom_tools_page(): """自定义工具管理页面""" return send_from_directory(str(Path(current_app.static_folder)/"custom_tools"), 'index.html') @admin_bp.route('/admin/api') @login_required @admin_required def admin_api_page(): """API 管理页面入口。""" return send_from_directory(str(Path(current_app.static_folder)/"admin_api"), 'index.html') @admin_bp.route('/api/admin/secondary/status', methods=['GET']) @login_required @admin_required def admin_secondary_status(): return jsonify({"success": True, "verified": _is_secondary_verified(), "ttl": ADMIN_SECONDARY_TTL_SECONDS}) @admin_bp.route('/api/admin/secondary/verify', methods=['POST']) @login_required @admin_required def admin_secondary_verify(): payload = request.get_json() or {} password = str(payload.get("password") or "").strip() if not password: return jsonify({"success": False, "error": "请输入二级密码"}), 400 try: if ADMIN_SECONDARY_PASSWORD_HASH: ok = check_password_hash(ADMIN_SECONDARY_PASSWORD_HASH, password) else: ok = password == (ADMIN_SECONDARY_PASSWORD or "") except Exception: ok = False if not ok: return jsonify({"success": False, "error": "二级密码错误"}), 401 session["admin_secondary_verified_at"] = time.time() return jsonify({"success": True, "ttl": ADMIN_SECONDARY_TTL_SECONDS}) @admin_bp.route('/api/admin/balance', methods=['GET']) @login_required @admin_required def admin_balance_api(): """查询第三方账户余额(Kimi/DeepSeek/Qwen)。""" guard = _secondary_required() if guard: return guard data = balance_client.fetch_all_balances() return jsonify({"success": True, "data": data}) @admin_bp.route('/admin/assets/') @login_required @admin_required def admin_asset_file(filename: str): return send_from_directory(str(Path(current_app.static_folder)/"admin_dashboard"), filename) @admin_bp.route('/user_upload/') @login_required def serve_user_upload(filename: str): """ 直接向前端暴露当前登录用户的上传目录文件,用于 等场景。 - 仅登录用户可访问 - 路径穿越校验:目标必须位于用户自己的 uploads_dir 内 """ user = get_current_user_record() if not user: return redirect('/login') workspace = user_manager.ensure_user_workspace(user.username) uploads_dir = workspace.uploads_dir.resolve() target = (uploads_dir / filename).resolve() try: target.relative_to(uploads_dir) except ValueError: abort(403) if not target.exists() or not target.is_file(): abort(404) return send_from_directory(str(uploads_dir), str(target.relative_to(uploads_dir))) @admin_bp.route('/workspace/') @login_required def serve_workspace_file(filename: str): """ 暴露当前登录用户项目目录下的文件(主要用于图片展示)。 - 仅登录用户可访问自己的项目文件 - 路径穿越校验:目标必须位于用户自己的 project_path 内 - 非图片直接拒绝,避免误暴露其他文件 """ user = get_current_user_record() if not user: return redirect('/login') workspace = user_manager.ensure_user_workspace(user.username) project_root = workspace.project_path.resolve() target = (project_root / filename).resolve() try: target.relative_to(project_root) except ValueError: abort(403) if not target.exists() or not target.is_file(): abort(404) mime_type, _ = mimetypes.guess_type(str(target)) if not mime_type or not mime_type.startswith("image/"): abort(415) return send_from_directory(str(target.parent), target.name) @admin_bp.route('/static/') def static_files(filename): """提供静态文件""" if filename.startswith('admin_dashboard'): abort(404) return send_from_directory('static', filename) @admin_bp.route('/api/admin/dashboard') @api_login_required @admin_api_required def admin_dashboard_snapshot_api(): guard = _secondary_required() if guard: return guard try: # 若当前管理员没有容器句柄,主动确保容器存在,避免面板始终显示“宿主机模式” try: record = get_current_user_record() uname = record.username if record else None if uname: handles = state.container_manager.list_containers() if uname not in handles: state.container_manager.ensure_container(uname, str(state.user_manager.ensure_user_workspace(uname).project_path)) except Exception as ensure_exc: logging.getLogger(__name__).warning("ensure_container for admin failed: %s", ensure_exc) snapshot = build_admin_dashboard_snapshot() # 双通道输出:标准日志 + 调试文件 try: logging.getLogger(__name__).info( "[admin_dashboard] snapshot=%s", json.dumps(snapshot, ensure_ascii=False)[:2000], ) except Exception: pass try: debug_log(f"[admin_dashboard] {json.dumps(snapshot, ensure_ascii=False)[:2000]}") except Exception: pass return jsonify({"success": True, "data": snapshot}) except Exception as exc: logging.exception("Failed to build admin dashboard") return jsonify({"success": False, "error": str(exc)}), 500 def build_admin_dashboard_snapshot(): """ 复用对话模块的完整统计逻辑,返回带用量/Token/存储/上传等数据的仪表盘快照。 """ try: return _build_dashboard_rich() except Exception as exc: # 兜底:若高级统计失败,仍返回最小信息,避免前端 500 logging.getLogger(__name__).warning("rich dashboard snapshot failed: %s", exc) handle_map = state.container_manager.list_containers() return { "generated_at": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), "overview": { "totals": { "users": len(state.user_manager.list_users() or {}), "active_users": len(handle_map), "containers_active": len(handle_map), "containers_max": getattr(state.container_manager, "max_containers", len(handle_map)), "available_container_slots": None, } }, "containers": [ {"username": u, "status": h} for u, h in handle_map.items() ], } def build_api_admin_dashboard_snapshot(): """构建 API 用户专用的监控快照。""" handle_map = container_manager.list_containers() api_users = state.api_user_manager.list_users() usage_map = state.api_user_manager.list_usage() items = [] token_totals = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} storage_total_bytes = 0 quarantine_total_bytes = 0 total_requests = 0 def _find_handle(name: str): for key in handle_map.keys(): if key.startswith(name): try: return container_manager.get_container_status(key, include_stats=True) except Exception: return None return None for username, record in api_users.items(): try: ws = state.api_user_manager.ensure_workspace(username) except Exception: continue storage = compute_workspace_storage(ws) tokens = collect_user_token_statistics(ws) usage = usage_map.get(username) or {} storage_total_bytes += storage.get("total_bytes", 0) or 0 quarantine_total_bytes += storage.get("quarantine_bytes", 0) or 0 for k in token_totals: token_totals[k] += tokens.get(k, 0) or 0 handle = _find_handle(username) total_requests += int(usage.get("total", 0) or 0) items.append({ "username": username, "note": getattr(record, "note", "") if record else "", "created_at": getattr(record, "created_at", ""), "token_sha256": getattr(record, "token_sha256", ""), "tokens": tokens, "storage": storage, "usage": { "total_requests": int(usage.get("total", 0)), "endpoints": usage.get("endpoints") or {}, "last_request_at": usage.get("last_request_at"), }, "container": handle, }) api_handles = { key: val for key, val in handle_map.items() if key.split("::")[0] in api_users } containers_active = len(api_handles) max_containers = getattr(container_manager, "max_containers", None) or len(handle_map) available_slots = None if max_containers: available_slots = max(0, max_containers - len(handle_map)) upload_events = collect_upload_events() uploads_summary = summarize_upload_events(upload_events, quarantine_total_bytes) overview = { "generated_at": time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), "totals": { "users": len(api_users), "containers_active": containers_active, "containers_max": max_containers, "available_container_slots": available_slots, }, "token_totals": token_totals, "usage_totals": { "requests": total_requests }, "storage": { "total_bytes": storage_total_bytes, "project_max_mb": PROJECT_MAX_STORAGE_MB, }, "uploads": uploads_summary.get("stats") or {}, } return { "generated_at": overview["generated_at"], "overview": overview, "users": items, "containers": api_handles, "uploads": uploads_summary, } @admin_bp.route('/api/admin/policy', methods=['GET', 'POST']) @api_login_required @admin_api_required def admin_policy_api(): guard = _secondary_required() if guard: return guard if request.method == 'GET': try: data = admin_policy_manager.load_policy() defaults = admin_policy_manager.describe_defaults() return jsonify({"success": True, "data": data, "defaults": defaults}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 # POST 更新 payload = request.get_json() or {} target_type = payload.get("target_type") target_value = payload.get("target_value") or "" config = payload.get("config") or {} try: saved = admin_policy_manager.save_scope_policy(target_type, target_value, config) return jsonify({"success": True, "data": saved}) except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 @admin_bp.route('/api/admin/custom-tools', methods=['GET', 'POST', 'DELETE']) @api_login_required @admin_api_required def admin_custom_tools_api(): """自定义工具管理(仅全局管理员)。""" guard = _secondary_required() if guard: return guard try: if request.method == 'GET': return jsonify({"success": True, "data": custom_tool_registry.list_tools()}) if request.method == 'POST': payload = request.get_json() or {} saved = custom_tool_registry.upsert_tool(payload) return jsonify({"success": True, "data": saved}) # DELETE tool_id = request.args.get("id") or (request.get_json() or {}).get("id") if not tool_id: return jsonify({"success": False, "error": "缺少 id"}), 400 removed = custom_tool_registry.delete_tool(tool_id) if removed: return jsonify({"success": True, "data": {"deleted": tool_id}}) return jsonify({"success": False, "error": "未找到该工具"}), 404 except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 400 except Exception as exc: logging.exception("custom-tools API error") return jsonify({"success": False, "error": str(exc)}), 500 @admin_bp.route('/api/admin/custom-tools/file', methods=['GET', 'POST']) @api_login_required @admin_api_required def admin_custom_tools_file_api(): tool_id = request.args.get("id") or (request.get_json() or {}).get("id") name = request.args.get("name") or (request.get_json() or {}).get("name") guard = _secondary_required() if guard: return guard if not tool_id or not name: return jsonify({"success": False, "error": "缺少 id 或 name"}), 400 tool_dir = Path(custom_tool_registry.root) / tool_id if not tool_dir.exists(): return jsonify({"success": False, "error": "工具不存在"}), 404 target = tool_dir / name if request.method == 'GET': if not target.exists(): return jsonify({"success": False, "error": "文件不存在"}), 404 try: return target.read_text(encoding="utf-8") except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 # POST 保存文件 payload = request.get_json() or {} content = payload.get("content") try: target.write_text(content or "", encoding="utf-8") return jsonify({"success": True}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 @admin_bp.route('/api/admin/custom-tools/reload', methods=['POST']) @api_login_required @admin_api_required def admin_custom_tools_reload_api(): guard = _secondary_required() if guard: return guard try: custom_tool_registry.reload() return jsonify({"success": True}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 # ------------------ API 用户与监控 ------------------ # @admin_bp.route('/api/admin/api-dashboard', methods=['GET']) @api_login_required @admin_api_required def admin_api_dashboard(): guard = _secondary_required() if guard: return guard try: snapshot = build_api_admin_dashboard_snapshot() return jsonify({"success": True, "data": snapshot}) except Exception as exc: logging.exception("Failed to build api dashboard") return jsonify({"success": False, "error": str(exc)}), 500 @admin_bp.route('/api/admin/api-users', methods=['GET', 'POST']) @api_login_required @admin_api_required def admin_api_users(): guard = _secondary_required() if guard: return guard if request.method == 'GET': users = state.api_user_manager.list_users() usage = state.api_user_manager.list_usage() items = [] for username, record in users.items(): meta = usage.get(username) or {} items.append({ "username": username, "note": getattr(record, "note", ""), "created_at": getattr(record, "created_at", ""), "token_sha256": getattr(record, "token_sha256", ""), "last_request_at": meta.get("last_request_at"), "total_requests": meta.get("total", 0), "endpoints": meta.get("endpoints") or {}, }) return jsonify({"success": True, "data": items}) payload = request.get_json() or {} username = (payload.get("username") or "").strip().lower() note = (payload.get("note") or "").strip() if not username: return jsonify({"success": False, "error": "用户名不能为空"}), 400 try: record, token = state.api_user_manager.create_user(username, note=note) return jsonify({ "success": True, "data": { "username": record.username, "token": token, "created_at": record.created_at, "note": record.note, } }) except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 400 except Exception as exc: logging.exception("create api user failed") return jsonify({"success": False, "error": str(exc)}), 500 @admin_bp.route('/api/admin/api-users/', methods=['DELETE']) @api_login_required @admin_api_required def admin_api_users_delete(username: str): guard = _secondary_required() if guard: return guard if not username: return jsonify({"success": False, "error": "缺少用户名"}), 400 try: ok = state.api_user_manager.delete_user(username) if not ok: return jsonify({"success": False, "error": "用户不存在"}), 404 return jsonify({"success": True}) except Exception as exc: logging.exception("delete api user failed") return jsonify({"success": False, "error": str(exc)}), 500 @admin_bp.route('/api/admin/api-users//token', methods=['GET']) @api_login_required @admin_api_required def admin_api_users_token(username: str): guard = _secondary_required() if guard: return guard try: # 尝试读取;若不存在或解密失败,自动重新签发 token = None try: token = state.api_user_manager.get_plain_token(username) except Exception: pass if not token: _, token = state.api_user_manager.issue_token(username) return jsonify({"success": True, "data": {"token": token}}) except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 404 except Exception as exc: logging.exception("get api user token failed") return jsonify({"success": False, "error": str(exc)}), 500 @admin_bp.route('/api/effective-policy', methods=['GET']) @api_login_required def effective_policy_api(): record = get_current_user_record() policy = resolve_admin_policy(record) return jsonify({"success": True, "data": policy})