- Refactor 6000+ line web_server.py into server/ module - Create separate modules: auth, chat, conversation, files, admin, etc. - Keep web_server.py as backward-compatible entry point - Add container running status field in user_container_manager - Improve admin dashboard API with credentials and debug support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
104 lines
2.9 KiB
Python
104 lines
2.9 KiB
Python
"""认证与角色相关基础函数,供各模块复用。"""
|
|
from __future__ import annotations
|
|
from functools import wraps
|
|
from typing import Optional, Any, Dict
|
|
from flask import session, redirect, jsonify
|
|
|
|
from modules import admin_policy_manager
|
|
from .utils_common import debug_log
|
|
from . import state
|
|
|
|
|
|
def is_logged_in() -> bool:
    """Report whether the current session carries a ``username`` entry.

    Returns:
        ``True`` when ``session.get('username')`` is anything other than
        ``None`` (an empty string still counts as logged in), else ``False``.
    """
    current_user = session.get('username')
    return current_user is not None
|
|
|
|
|
|
def login_required(view_func):
    """Decorate a page view so that anonymous visitors are sent to ``/login``.

    Args:
        view_func: The view callable to protect.

    Returns:
        A wrapper that calls ``view_func`` with the original arguments when
        ``is_logged_in()`` is true, and otherwise returns
        ``redirect('/login')``.
    """

    @wraps(view_func)
    def guarded_view(*args, **kwargs):
        if is_logged_in():
            return view_func(*args, **kwargs)
        return redirect('/login')

    return guarded_view
|
|
|
|
|
|
def api_login_required(view_func):
    """Decorate an API view so that anonymous callers get a JSON 401.

    Args:
        view_func: The view callable to protect.

    Returns:
        A wrapper that calls ``view_func`` with the original arguments when
        ``is_logged_in()`` is true, and otherwise returns the tuple
        ``(jsonify({"error": "Unauthorized"}), 401)``.
    """

    @wraps(view_func)
    def guarded_view(*args, **kwargs):
        if is_logged_in():
            return view_func(*args, **kwargs)
        body = jsonify({"error": "Unauthorized"})
        return body, 401

    return guarded_view
|
|
|
|
|
|
def get_current_username() -> Optional[str]:
    """Return the ``username`` value stored in the session, or ``None`` if absent."""
    username: Optional[str] = session.get('username')
    return username
|
|
|
|
|
|
def get_current_user_record():
    """Look up the user record for the session's username.

    Returns:
        ``None`` when the session has no (or an empty) username; otherwise
        whatever ``state.user_manager.get_user(username)`` returns.
    """
    username = get_current_username()
    if username:
        return state.user_manager.get_user(username)
    return None
|
|
|
|
|
|
def get_current_user_role(record=None) -> str:
    """Determine the role of the current user.

    The session's ``role`` entry takes precedence whenever it is truthy; the
    user record is only consulted when the session does not provide one.

    Args:
        record: Optional user record exposing a ``role`` attribute. When
            ``None``, the record is fetched via ``get_current_user_record()``.

    Returns:
        The session role, else the record's ``role`` when both the record and
        its role are truthy, else the literal ``'user'``.
    """
    session_role = session.get('role')
    if session_role:
        return session_role

    user = get_current_user_record() if record is None else record
    if user and user.role:
        return user.role
    return 'user'
|
|
|
|
|
|
def is_admin_user(record=None) -> bool:
    """Tell whether the current user's role is ``admin`` (case-insensitive).

    Args:
        record: Optional user record forwarded to ``get_current_user_role``.

    Returns:
        ``True`` only when the resolved role is a string whose lowercase form
        equals ``'admin'``.
    """
    role = get_current_user_role(record)
    if not isinstance(role, str):
        return False
    return role.lower() == 'admin'
|
|
|
|
|
|
def resolve_admin_policy(record=None) -> Dict[str, Any]:
    """Fetch the admin policy that is in effect for the current user.

    Args:
        record: Optional user record. When ``None``, it is fetched via
            ``get_current_user_record()``.

    Returns:
        The result of ``admin_policy_manager.get_effective_policy`` called
        with the user's username (``None`` when there is no record), the
        resolved role, and the record's ``invite_code`` attribute (``None``
        when missing).

    Raises:
        Whatever the second ``get_effective_policy`` call raises; only the
        first failure is caught and logged.
    """
    user = get_current_user_record() if record is None else record
    username = user.username if user else None
    role = get_current_user_role(user)
    invite_code = getattr(user, "invite_code", None)
    lookup_args = (username, role, invite_code)

    try:
        return admin_policy_manager.get_effective_policy(*lookup_args)
    except Exception as exc:
        debug_log(f"[admin_policy] 加载失败: {exc}")
        # NOTE(review): the handler repeats the exact call that just failed,
        # so a persistent error propagates from here. Presumably this is a
        # one-shot retry for transient failures; confirm the intent.
        return admin_policy_manager.get_effective_policy(*lookup_args)
|
|
|
|
|
|
def admin_required(view_func):
    """Decorate a page view so that only admin users may reach it.

    Args:
        view_func: The view callable to protect.

    Returns:
        A wrapper that calls ``view_func`` with the original arguments when a
        user record exists and ``is_admin_user(record)`` is true, and
        otherwise returns ``redirect('/new')``.
    """

    @wraps(view_func)
    def guarded_view(*args, **kwargs):
        user = get_current_user_record()
        if user and is_admin_user(user):
            return view_func(*args, **kwargs)
        return redirect('/new')

    return guarded_view
|
|
|
|
|
|
def admin_api_required(view_func):
    """Decorate an API view so that non-admin callers get a JSON 403.

    Args:
        view_func: The view callable to protect.

    Returns:
        A wrapper that calls ``view_func`` with the original arguments when a
        user record exists and ``is_admin_user(record)`` is true, and
        otherwise returns a ``(json_response, 403)`` tuple whose payload has
        ``success`` set to ``False`` and an ``error`` message.
    """

    @wraps(view_func)
    def guarded_view(*args, **kwargs):
        user = get_current_user_record()
        if user and is_admin_user(user):
            return view_func(*args, **kwargs)
        body = jsonify({"success": False, "error": "需要管理员权限"})
        return body, 403

    return guarded_view
|
|
|
|
# Public API of this module, listed in definition order.
__all__ = [
    # Login state and login-guard decorators.
    "is_logged_in", "login_required", "api_login_required",
    # Current-user accessors.
    "get_current_username", "get_current_user_record", "get_current_user_role",
    # Admin checks, policy resolution and admin-guard decorators.
    "is_admin_user", "resolve_admin_policy",
    "admin_required", "admin_api_required",
]
|