agent-Specialization/server/auth_helpers.py
JOJO d6fb59e1d8 refactor: split web_server into modular architecture
- Refactor 6000+ line web_server.py into server/ module
- Create separate modules: auth, chat, conversation, files, admin, etc.
- Keep web_server.py as backward-compatible entry point
- Add container running status field in user_container_manager
- Improve admin dashboard API with credentials and debug support

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-22 09:21:53 +08:00

104 lines
2.9 KiB
Python

"""认证与角色相关基础函数,供各模块复用。"""
from __future__ import annotations
from functools import wraps
from typing import Optional, Any, Dict
from flask import session, redirect, jsonify
from modules import admin_policy_manager
from .utils_common import debug_log
from . import state
def is_logged_in() -> bool:
return session.get('username') is not None
def login_required(view_func):
@wraps(view_func)
def wrapped(*args, **kwargs):
if not is_logged_in():
return redirect('/login')
return view_func(*args, **kwargs)
return wrapped
def api_login_required(view_func):
@wraps(view_func)
def wrapped(*args, **kwargs):
if not is_logged_in():
return jsonify({"error": "Unauthorized"}), 401
return view_func(*args, **kwargs)
return wrapped
def get_current_username() -> Optional[str]:
return session.get('username')
def get_current_user_record():
username = get_current_username()
if not username:
return None
return state.user_manager.get_user(username)
def get_current_user_role(record=None) -> str:
role = session.get('role')
if role:
return role
if record is None:
record = get_current_user_record()
return (record.role if record and record.role else 'user')
def is_admin_user(record=None) -> bool:
role = get_current_user_role(record)
return isinstance(role, str) and role.lower() == 'admin'
def resolve_admin_policy(record=None) -> Dict[str, Any]:
"""获取当前用户生效的管理员策略。"""
if record is None:
record = get_current_user_record()
username = record.username if record else None
role = get_current_user_role(record)
invite_code = getattr(record, "invite_code", None)
try:
return admin_policy_manager.get_effective_policy(username, role, invite_code)
except Exception as exc:
debug_log(f"[admin_policy] 加载失败: {exc}")
return admin_policy_manager.get_effective_policy(username, role, invite_code)
def admin_required(view_func):
@wraps(view_func)
def wrapped(*args, **kwargs):
record = get_current_user_record()
if not record or not is_admin_user(record):
return redirect('/new')
return view_func(*args, **kwargs)
return wrapped
def admin_api_required(view_func):
@wraps(view_func)
def wrapped(*args, **kwargs):
record = get_current_user_record()
if not record or not is_admin_user(record):
return jsonify({"success": False, "error": "需要管理员权限"}), 403
return view_func(*args, **kwargs)
return wrapped
__all__ = [
"is_logged_in",
"login_required",
"api_login_required",
"get_current_username",
"get_current_user_record",
"get_current_user_role",
"is_admin_user",
"resolve_admin_policy",
"admin_required",
"admin_api_required",
]