1816 lines
60 KiB
Python
1816 lines
60 KiB
Python
import sys, os
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||
from flask import Flask, render_template, request, redirect, url_for, jsonify, session
|
||
import json
|
||
import os
|
||
import uuid
|
||
import datetime
|
||
import time
|
||
import shutil
|
||
import logging
|
||
import threading
|
||
import hashlib
|
||
import random
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
import uuid
|
||
import datetime
|
||
import os
|
||
import json
|
||
import logging
|
||
import datetime
|
||
import os
|
||
import json
|
||
from functools import wraps
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
filename='game_app.log',
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
logger = logging.getLogger('game_app')
|
||
|
||
app = Flask(__name__)
|
||
app.secret_key = os.urandom(24) # 用于会话加密
|
||
|
||
# 数据文件路径
|
||
DATA_DIR = 'data'
|
||
BACKUP_DIR = os.path.join(DATA_DIR, 'backup')
|
||
USERS_FILE = os.path.join(DATA_DIR, 'users.json')
|
||
GAME_STATES_FILE = os.path.join(DATA_DIR, 'game_states.json')
|
||
CARDS_FILE = os.path.join(DATA_DIR, 'cards.json')
|
||
SESSIONS_FILE = os.path.join(DATA_DIR, 'sessions.json') # 会话文件
|
||
DEATH_METHODS_FILE = os.path.join(DATA_DIR, 'death_methods.json') # 死亡方式文件
|
||
ACHIEVEMENTS_FILE = os.path.join(DATA_DIR, 'achievements.json') # 成就系统文件
|
||
|
||
# 管理员账号和密码 - 硬编码
|
||
ADMIN_USERNAME = "13991190618"
|
||
ADMIN_PASSWORD = "@CuiYuJian040618"
|
||
|
||
# 管理员会话键名
|
||
ADMIN_SESSION_KEY = 'admin_logged_in'
|
||
|
||
# 文件锁,防止并发写入冲突
|
||
file_locks = {
|
||
USERS_FILE: threading.RLock(),
|
||
GAME_STATES_FILE: threading.RLock(),
|
||
SESSIONS_FILE: threading.RLock(),
|
||
CARDS_FILE: threading.RLock(),
|
||
DEATH_METHODS_FILE: threading.RLock(),
|
||
ACHIEVEMENTS_FILE: threading.RLock()
|
||
}
|
||
|
||
# 确保数据目录存在
|
||
def ensure_dirs_exist():
|
||
os.makedirs(DATA_DIR, exist_ok=True)
|
||
os.makedirs(BACKUP_DIR, exist_ok=True)
|
||
|
||
# 创建备份文件名
|
||
def get_backup_filename(original_file, include_timestamp=False):
|
||
base_name = os.path.basename(original_file)
|
||
if include_timestamp:
|
||
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
|
||
return os.path.join(BACKUP_DIR, f"{base_name}.{timestamp}.bak")
|
||
else:
|
||
return os.path.join(BACKUP_DIR, f"{base_name}.bak")
|
||
|
||
# 管理备份文件,保留最近的N个备份
|
||
def manage_backups(file_pattern, max_backups=5):
|
||
try:
|
||
backup_files = [f for f in os.listdir(BACKUP_DIR) if f.startswith(file_pattern)]
|
||
backup_files.sort(reverse=True) # 最新的文件排在前面
|
||
|
||
# 如果备份文件超过最大数量,删除最旧的
|
||
if len(backup_files) > max_backups:
|
||
for old_file in backup_files[max_backups:]:
|
||
try:
|
||
os.remove(os.path.join(BACKUP_DIR, old_file))
|
||
logger.info(f"删除旧备份文件: {old_file}")
|
||
except Exception as e:
|
||
logger.error(f"删除旧备份文件失败: {old_file}, 错误: {str(e)}")
|
||
except Exception as e:
|
||
logger.error(f"管理备份文件失败, 错误: {str(e)}")
|
||
|
||
# 安全地保存数据到文件
|
||
def safe_save_data(file_path, data, create_backup=True):
|
||
# 获取文件锁
|
||
lock = file_locks.get(file_path, threading.RLock())
|
||
|
||
with lock:
|
||
# 确保目录存在
|
||
ensure_dirs_exist()
|
||
|
||
# 如果原文件存在且需要备份,先创建备份
|
||
if create_backup and os.path.exists(file_path):
|
||
try:
|
||
# 创建带时间戳的备份文件,每小时最多一个
|
||
current_hour = datetime.datetime.now().strftime('%Y%m%d_%H')
|
||
backup_marker = hashlib.md5(file_path.encode()).hexdigest()[:8]
|
||
backup_file = os.path.join(BACKUP_DIR, f"{os.path.basename(file_path)}.{current_hour}.{backup_marker}.bak")
|
||
|
||
# 检查是否已经有该小时的备份
|
||
if not os.path.exists(backup_file):
|
||
shutil.copy2(file_path, backup_file)
|
||
logger.info(f"创建备份: {backup_file}")
|
||
|
||
# 管理旧备份
|
||
file_prefix = os.path.basename(file_path)
|
||
manage_backups(file_prefix, max_backups=10)
|
||
except Exception as e:
|
||
logger.error(f"创建备份失败: {str(e)}")
|
||
|
||
# 先写入临时文件
|
||
temp_file = file_path + '.tmp'
|
||
try:
|
||
with open(temp_file, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
f.flush()
|
||
os.fsync(f.fileno()) # 确保数据写入磁盘
|
||
|
||
# 替换原文件
|
||
os.replace(temp_file, file_path)
|
||
logger.info(f"成功保存数据到: {file_path}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"保存数据到 {file_path} 失败: {str(e)}")
|
||
if os.path.exists(temp_file):
|
||
try:
|
||
os.remove(temp_file)
|
||
except:
|
||
pass
|
||
return False
|
||
|
||
# 从文件或备份中安全地读取数据
|
||
def safe_load_data(file_path, default_value=None):
|
||
# 获取文件锁
|
||
lock = file_locks.get(file_path, threading.RLock())
|
||
|
||
with lock:
|
||
# 尝试读取原始文件
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
except (FileNotFoundError, json.JSONDecodeError) as e:
|
||
logger.warning(f"读取文件 {file_path} 失败: {str(e)}, 尝试从备份恢复")
|
||
|
||
# 尝试从最新备份恢复
|
||
backup_file = get_backup_filename(file_path)
|
||
if os.path.exists(backup_file):
|
||
try:
|
||
with open(backup_file, 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
logger.info(f"从备份 {backup_file} 恢复数据成功")
|
||
|
||
# 恢复成功后,保存回原始文件
|
||
safe_save_data(file_path, data, create_backup=False)
|
||
return data
|
||
except Exception as backup_error:
|
||
logger.error(f"从备份恢复失败: {str(backup_error)}")
|
||
|
||
# 如果指定了默认值,则返回默认值
|
||
if default_value is not None:
|
||
return default_value
|
||
|
||
# 否则,根据文件类型返回适当的空结构
|
||
if file_path == USERS_FILE:
|
||
return []
|
||
elif file_path == GAME_STATES_FILE:
|
||
return []
|
||
elif file_path == SESSIONS_FILE:
|
||
return {"active_sessions": []}
|
||
elif file_path == DEATH_METHODS_FILE:
|
||
return {"death_scenarios": []}
|
||
elif file_path == ACHIEVEMENTS_FILE:
|
||
return {"achievements": []}
|
||
else:
|
||
return {}
|
||
|
||
# 初始化数据文件
|
||
def init_data_files():
|
||
ensure_dirs_exist()
|
||
|
||
# 初始化用户文件
|
||
if not os.path.exists(USERS_FILE):
|
||
safe_save_data(USERS_FILE, [], create_backup=False)
|
||
|
||
# 初始化游戏状态文件
|
||
if not os.path.exists(GAME_STATES_FILE):
|
||
safe_save_data(GAME_STATES_FILE, [], create_backup=False)
|
||
|
||
# 初始化会话文件
|
||
if not os.path.exists(SESSIONS_FILE):
|
||
safe_save_data(SESSIONS_FILE, {"active_sessions": []}, create_backup=False)
|
||
|
||
# 初始化死亡方式文件
|
||
if not os.path.exists(DEATH_METHODS_FILE):
|
||
safe_save_data(DEATH_METHODS_FILE, {"death_scenarios": []}, create_backup=False)
|
||
|
||
# 初始化成就文件
|
||
if not os.path.exists(ACHIEVEMENTS_FILE):
|
||
safe_save_data(ACHIEVEMENTS_FILE, {"achievements": []}, create_backup=False)
|
||
|
||
ensure_character_weights()
|
||
|
||
def ensure_character_weights():
|
||
"""确保所有角色都有权重字段,没有的话添加默认权重5"""
|
||
try:
|
||
cards_data = get_cards()
|
||
characters = cards_data.get('characters', [])
|
||
|
||
# 记录是否有修改
|
||
has_changes = False
|
||
|
||
# 检查每个角色
|
||
for character in characters:
|
||
if 'weight' not in character:
|
||
# 添加默认权重
|
||
character['weight'] = 100
|
||
has_changes = True
|
||
|
||
# 如果有修改,保存回文件
|
||
if has_changes:
|
||
save_cards(cards_data)
|
||
logger.info("已为所有角色添加默认权重字段")
|
||
except Exception as e:
|
||
logger.error(f"确保角色权重时出错: {str(e)}")
|
||
|
||
# 读取用户数据
|
||
def get_users():
|
||
return safe_load_data(USERS_FILE, [])
|
||
|
||
# 保存用户数据
|
||
def save_users(users):
|
||
return safe_save_data(USERS_FILE, users)
|
||
|
||
# 读取游戏状态数据
|
||
def get_game_states():
|
||
return safe_load_data(GAME_STATES_FILE, [])
|
||
|
||
# 保存游戏状态数据
|
||
def save_game_states(states):
|
||
return safe_save_data(GAME_STATES_FILE, states)
|
||
|
||
# 读取卡牌数据
|
||
def get_cards():
|
||
default_cards = {
|
||
"characters": [],
|
||
"gameSettings": {
|
||
"initialValues": {
|
||
"loyalty": 50, "chaos": 50, "population": 50, "military": 50, "resources": 50
|
||
},
|
||
"maxValues": {
|
||
"loyalty": 100, "chaos": 100, "population": 100, "military": 100, "resources": 100
|
||
},
|
||
"minValues": {
|
||
"loyalty": 0, "chaos": 0, "population": 0, "military": 0, "resources": 0
|
||
},
|
||
"statusIcons": {
|
||
"loyalty": "🦅", "chaos": "🌀", "population": "👥", "military": "🔫", "resources": "⚙️"
|
||
}
|
||
}
|
||
}
|
||
return safe_load_data(CARDS_FILE, default_cards)
|
||
|
||
# 保存卡牌数据
|
||
def save_cards(cards_data):
|
||
return safe_save_data(CARDS_FILE, cards_data)
|
||
|
||
# 读取会话数据
|
||
def get_sessions():
|
||
return safe_load_data(SESSIONS_FILE, {"active_sessions": []})
|
||
|
||
# 保存会话数据
|
||
def save_sessions(sessions):
|
||
return safe_save_data(SESSIONS_FILE, sessions)
|
||
|
||
# 读取死亡方式数据
|
||
def get_death_methods():
|
||
return safe_load_data(DEATH_METHODS_FILE, {"death_scenarios": []})
|
||
|
||
# 保存死亡方式数据
|
||
def save_death_methods(data):
|
||
return safe_save_data(DEATH_METHODS_FILE, data)
|
||
|
||
# 读取成就数据
|
||
def get_achievements_data():
|
||
return safe_load_data(ACHIEVEMENTS_FILE, {"achievements": []})
|
||
|
||
# 保存成就数据
|
||
def save_achievements_data(data):
|
||
return safe_save_data(ACHIEVEMENTS_FILE, data)
|
||
|
||
# 更新会话
|
||
def update_session(user_id, status=None):
|
||
sessions = get_sessions()
|
||
active_sessions = sessions.get("active_sessions", [])
|
||
current_time = datetime.datetime.utcnow().isoformat()
|
||
|
||
# 查找当前用户会话
|
||
session_found = False
|
||
for session_data in active_sessions:
|
||
if session_data.get("user_id") == user_id:
|
||
# 更新现有会话
|
||
session_data["last_activity"] = current_time
|
||
if status:
|
||
session_data["status"] = status
|
||
session_found = True
|
||
break
|
||
|
||
# 保存更新后的会话数据
|
||
if session_found:
|
||
sessions["active_sessions"] = active_sessions
|
||
save_sessions(sessions)
|
||
return True
|
||
return False
|
||
|
||
# 添加会话
|
||
def add_session(user_id, ip_address):
|
||
sessions = get_sessions()
|
||
active_sessions = sessions.get("active_sessions", [])
|
||
|
||
# 检查是否已存在该用户的会话
|
||
for session_data in active_sessions:
|
||
if session_data.get("user_id") == user_id:
|
||
# 更新现有会话
|
||
session_data["login_time"] = datetime.datetime.utcnow().isoformat()
|
||
session_data["last_activity"] = datetime.datetime.utcnow().isoformat()
|
||
session_data["ip_address"] = ip_address
|
||
session_data["status"] = "idle"
|
||
sessions["active_sessions"] = active_sessions
|
||
save_sessions(sessions)
|
||
return True
|
||
|
||
# 创建新会话
|
||
new_session = {
|
||
"user_id": user_id,
|
||
"login_time": datetime.datetime.utcnow().isoformat(),
|
||
"last_activity": datetime.datetime.utcnow().isoformat(),
|
||
"ip_address": request.remote_addr,
|
||
"status": "idle"
|
||
}
|
||
|
||
active_sessions.append(new_session)
|
||
sessions["active_sessions"] = active_sessions
|
||
save_sessions(sessions)
|
||
return True
|
||
|
||
# 移除会话
|
||
def remove_session(user_id):
|
||
sessions = get_sessions()
|
||
active_sessions = sessions.get("active_sessions", [])
|
||
|
||
# 过滤掉指定用户的会话
|
||
original_count = len(active_sessions)
|
||
active_sessions = [s for s in active_sessions if s.get("user_id") != user_id]
|
||
|
||
if len(active_sessions) < original_count:
|
||
sessions["active_sessions"] = active_sessions
|
||
save_sessions(sessions)
|
||
return True
|
||
return False
|
||
|
||
# 清理过期会话 - 超过2小时未活动的会话
|
||
def clean_expired_sessions():
|
||
try:
|
||
sessions = get_sessions()
|
||
active_sessions = sessions.get("active_sessions", [])
|
||
current_time = datetime.datetime.utcnow()
|
||
|
||
# 只过滤掉超过2小时未活动的会话
|
||
original_count = len(active_sessions)
|
||
filtered_sessions = []
|
||
for session in active_sessions:
|
||
try:
|
||
last_activity = datetime.datetime.fromisoformat(session.get("last_activity", ""))
|
||
time_diff = (current_time - last_activity).total_seconds() / 60 # 小时差
|
||
if time_diff < 5: # 2小时超时
|
||
filtered_sessions.append(session)
|
||
except (ValueError, TypeError) as e:
|
||
logger.warning(f"处理会话时间格式错误: {str(e)}")
|
||
# 如果时间格式错误,保留该会话而不是直接删除
|
||
filtered_sessions.append(session)
|
||
|
||
# 只有在真正有会话被删除时才更新数据
|
||
if len(filtered_sessions) != original_count:
|
||
logger.info(f"清理过期会话: 清理前{original_count}个,清理后{len(filtered_sessions)}个")
|
||
sessions["active_sessions"] = filtered_sessions
|
||
save_sessions(sessions)
|
||
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"清理过期会话失败: {str(e)}")
|
||
return False
|
||
|
||
# 首页,显示排行榜
|
||
@app.route('/')
|
||
def index():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
# 获取所有用户数据,按照最高分排序
|
||
users = get_users()
|
||
leaderboard = sorted(users, key=lambda x: x.get('high_score', 0), reverse=True)
|
||
|
||
# 更新用户活动状态
|
||
update_session(session.get('user_id'))
|
||
|
||
return render_template('index.html', leaderboard=leaderboard, current_user=session.get('username'))
|
||
|
||
# 登录页面
|
||
@app.route('/login', methods=['GET', 'POST'])
|
||
def login():
|
||
error = None
|
||
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
|
||
users = get_users()
|
||
user = next((u for u in users if u['username'] == username), None)
|
||
|
||
if user and check_password_hash(user['password'], password):
|
||
session['user_id'] = user['id']
|
||
session['username'] = user['username']
|
||
|
||
# 添加会话记录
|
||
add_session(user['id'], request.remote_addr)
|
||
|
||
return redirect(url_for('index'))
|
||
else:
|
||
error = '用户名或密码错误'
|
||
|
||
return render_template('login.html', error=error)
|
||
|
||
# 注册页面
|
||
@app.route('/register', methods=['GET', 'POST'])
|
||
def register():
|
||
error = None
|
||
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
|
||
users = get_users()
|
||
|
||
# 检查用户名是否已存在
|
||
if any(u['username'] == username for u in users):
|
||
error = '用户名已存在'
|
||
else:
|
||
# 创建新用户
|
||
new_user = {
|
||
'id': str(uuid.uuid4()),
|
||
'username': username,
|
||
'password': generate_password_hash(password),
|
||
'created_at': datetime.datetime.utcnow().isoformat(),
|
||
'high_score': 0,
|
||
'total_games': 0,
|
||
'last_year': 41000 # 添加最后游戏年份字段,默认为41000
|
||
}
|
||
|
||
users.append(new_user)
|
||
save_users(users)
|
||
|
||
# 自动登录
|
||
session['user_id'] = new_user['id']
|
||
session['username'] = new_user['username']
|
||
|
||
# 添加会话记录
|
||
add_session(new_user['id'], request.remote_addr)
|
||
|
||
return redirect(url_for('index'))
|
||
|
||
return render_template('register.html', error=error)
|
||
|
||
# 注销
|
||
@app.route('/logout')
|
||
def logout():
|
||
if 'user_id' in session:
|
||
# 移除会话记录
|
||
remove_session(session['user_id'])
|
||
|
||
# 清除会话
|
||
session.pop('user_id', None)
|
||
session.pop('username', None)
|
||
|
||
return redirect(url_for('login'))
|
||
|
||
# 游戏页面
|
||
@app.route('/game')
|
||
def game():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
# 获取游戏数据
|
||
cards_data = get_cards()
|
||
|
||
# 获取用户最后游戏年份
|
||
users = get_users()
|
||
user = next((u for u in users if u['id'] == session['user_id']), None)
|
||
last_year = user.get('last_year', 41000) if user else 41000
|
||
|
||
# 添加最后年份到数据中
|
||
game_data = {
|
||
'cards': cards_data,
|
||
'events': cards_data.get('events', []),
|
||
'statuses': cards_data.get('statuses', []),
|
||
'lastYear': last_year
|
||
}
|
||
|
||
# 更新用户状态为游戏中
|
||
sessions = get_sessions()
|
||
active_sessions = sessions.get("active_sessions", [])
|
||
|
||
for session_data in active_sessions:
|
||
if session_data.get("user_id") == session['user_id']:
|
||
session_data["status"] = "playing"
|
||
session_data["game_start_time"] = datetime.datetime.utcnow().isoformat()
|
||
break
|
||
|
||
sessions["active_sessions"] = active_sessions
|
||
save_sessions(sessions)
|
||
|
||
return render_template('game.html',
|
||
current_user=session.get('username'),
|
||
game_data_json=json.dumps(game_data))
|
||
|
||
# API: 获取卡牌数据
|
||
@app.route('/api/cards', methods=['GET'])
|
||
def api_get_cards():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
cards_data = get_cards()
|
||
return jsonify(cards_data)
|
||
|
||
# API: 保存游戏状态
|
||
@app.route('/api/save_game', methods=['POST'])
|
||
def api_save_game():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
user_id = session['user_id']
|
||
game_data = request.json
|
||
|
||
# 添加调试日志
|
||
print(f"保存用户 {user_id} 的游戏状态")
|
||
|
||
# 更新用户的最后游戏年份
|
||
if 'year' in game_data:
|
||
users = get_users()
|
||
user = next((u for u in users if u['id'] == user_id), None)
|
||
if user:
|
||
user['last_year'] = game_data['year']
|
||
user['last_game_time'] = datetime.datetime.utcnow().isoformat()
|
||
save_users(users)
|
||
|
||
# 获取现有游戏状态
|
||
game_states = get_game_states()
|
||
|
||
# 查找用户的游戏状态
|
||
user_state = next((s for s in game_states if s['user_id'] == user_id), None)
|
||
|
||
if user_state:
|
||
# 更新现有状态,但保留成就数据
|
||
print(f"更新用户现有游戏状态,保留成就数据")
|
||
|
||
# 保存之前的成就数据
|
||
achievements_data = []
|
||
if 'game_data' in user_state and 'achievements' in user_state['game_data']:
|
||
achievements_data = user_state['game_data']['achievements']
|
||
print(f"保留现有成就数据,共 {len(achievements_data)} 项")
|
||
|
||
# 更新游戏数据
|
||
user_state['game_data'] = game_data
|
||
|
||
# 恢复成就数据
|
||
if achievements_data:
|
||
user_state['game_data']['achievements'] = achievements_data
|
||
|
||
user_state['updated_at'] = datetime.datetime.utcnow().isoformat()
|
||
else:
|
||
# 创建新状态
|
||
print(f"用户没有现有游戏状态,创建新状态")
|
||
game_states.append({
|
||
'id': str(uuid.uuid4()),
|
||
'user_id': user_id,
|
||
'game_data': game_data,
|
||
'created_at': datetime.datetime.utcnow().isoformat(),
|
||
'updated_at': datetime.datetime.utcnow().isoformat()
|
||
})
|
||
|
||
save_game_states(game_states)
|
||
print(f"游戏状态保存完成")
|
||
|
||
# 更新会话状态
|
||
update_session(user_id, "playing")
|
||
|
||
return jsonify({'success': True})
|
||
|
||
# API: 加载游戏状态
|
||
@app.route('/api/load_game', methods=['GET'])
|
||
def api_load_game():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
user_id = session['user_id']
|
||
|
||
# 获取所有游戏状态
|
||
game_states = get_game_states()
|
||
|
||
# 查找用户的游戏状态
|
||
user_state = next((s for s in game_states if s['user_id'] == user_id), None)
|
||
|
||
# 获取用户的最后游戏年份
|
||
users = get_users()
|
||
user = next((u for u in users if u['id'] == user_id), None)
|
||
last_year = user.get('last_year', 41000) if user else 41000
|
||
|
||
if user_state:
|
||
# 确保游戏状态中包含最新的年份
|
||
game_data = user_state['game_data']
|
||
if isinstance(game_data, dict) and 'year' not in game_data:
|
||
game_data['year'] = last_year
|
||
return jsonify({'game_data': game_data, 'lastYear': last_year})
|
||
else:
|
||
return jsonify({'game_data': None, 'lastYear': last_year})
|
||
|
||
|
||
|
||
# API: 获取排行榜数据
|
||
@app.route('/api/leaderboard', methods=['GET'])
|
||
def api_leaderboard():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
# 获取所有用户,按高分排序
|
||
users = get_users()
|
||
leaderboard = sorted(users, key=lambda x: x.get('high_score', 0), reverse=True)
|
||
|
||
leaderboard = leaderboard[:100] # 添加这一行,限制为前100名
|
||
|
||
# 只返回需要的字段 (不再限制为前10名)
|
||
leaderboard_data = [
|
||
{
|
||
'username': user['username'],
|
||
'high_score': user.get('high_score', 0),
|
||
'total_games': user.get('total_games', 0),
|
||
'last_year': user.get('last_year', 41000)
|
||
}
|
||
for user in leaderboard # 移除了[:10]限制
|
||
]
|
||
|
||
return jsonify(leaderboard_data)
|
||
|
||
# 每小时自动清理过期会话的定时任务
|
||
def scheduled_cleanup():
|
||
while True:
|
||
try:
|
||
# 每小时清理一次过期会话
|
||
clean_expired_sessions()
|
||
# 休眠1小时
|
||
time.sleep(3600)
|
||
except Exception as e:
|
||
logger.error(f"定时清理任务失败: {str(e)}")
|
||
# 出错后仍然继续
|
||
time.sleep(60)
|
||
|
||
# 自制卡牌数据文件路径
|
||
CUSTOM_CARDS_FILE = os.path.join(DATA_DIR, 'custom_cards.json')
|
||
CARD_VOTES_FILE = os.path.join(DATA_DIR, 'card_votes.json')
|
||
|
||
# 读取自制卡牌数据
|
||
def get_custom_cards():
|
||
return safe_load_data(CUSTOM_CARDS_FILE, {"cards": []})
|
||
|
||
# 保存自制卡牌数据
|
||
def save_custom_cards(data):
|
||
return safe_save_data(CUSTOM_CARDS_FILE, data)
|
||
|
||
# 读取卡牌投票数据
|
||
def get_card_votes():
|
||
return safe_load_data(CARD_VOTES_FILE, {"user_votes": {}, "card_votes": {}})
|
||
|
||
# 保存卡牌投票数据
|
||
def save_card_votes(data):
|
||
return safe_save_data(CARD_VOTES_FILE, data)
|
||
|
||
# 自制卡牌页面路由
|
||
@app.route('/custom_cards')
|
||
def custom_cards():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
return render_template('custom_cards.html')
|
||
|
||
# 创建卡牌页面路由
|
||
@app.route('/create_card')
|
||
def create_card():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
return render_template('create_card.html')
|
||
|
||
# API: 获取当前用户ID
|
||
@app.route('/api/current_user', methods=['GET'])
|
||
def api_current_user():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
return jsonify({
|
||
'user_id': session['user_id'],
|
||
'username': session.get('username', '')
|
||
})
|
||
|
||
# API: 获取用户投票历史
|
||
@app.route('/api/user_votes', methods=['GET'])
|
||
def api_user_votes():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
user_id = session['user_id']
|
||
votes_data = get_card_votes()
|
||
|
||
# 获取用户的投票记录
|
||
user_votes = votes_data.get('user_votes', {}).get(user_id, {})
|
||
|
||
return jsonify({
|
||
'votes': user_votes
|
||
})
|
||
|
||
# API: 获取自制卡牌列表
|
||
@app.route('/api/custom_cards', methods=['GET'])
|
||
def api_custom_cards():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
# 获取卡牌数据
|
||
cards_data = get_custom_cards()
|
||
cards = cards_data.get('cards', [])
|
||
|
||
# 获取投票数据
|
||
votes_data = get_card_votes()
|
||
card_votes = votes_data.get('card_votes', {})
|
||
|
||
# 给每个卡片添加投票信息
|
||
for card in cards:
|
||
card_id = card['id']
|
||
if card_id in card_votes:
|
||
card['upvotes'] = card_votes[card_id].get('upvotes', 0)
|
||
card['downvotes'] = card_votes[card_id].get('downvotes', 0)
|
||
else:
|
||
card['upvotes'] = 0
|
||
card['downvotes'] = 0
|
||
|
||
return jsonify({
|
||
'cards': cards
|
||
})
|
||
|
||
# API: 创建自制卡牌
|
||
@app.route('/api/create_custom_card', methods=['POST'])
|
||
def api_create_custom_card():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
user_id = session['user_id']
|
||
username = session.get('username', '未知用户')
|
||
card_data = request.json
|
||
|
||
# 验证必要字段
|
||
required_fields = ['title', 'character', 'description', 'option_a', 'option_b']
|
||
for field in required_fields:
|
||
if field not in card_data:
|
||
return jsonify({'error': f'缺少必要字段: {field}'}), 400
|
||
|
||
# 验证字符长度
|
||
if len(card_data['title']) > 50:
|
||
return jsonify({'error': '卡牌标题不能超过50个字符'}), 400
|
||
|
||
if len(card_data['description']) > 200:
|
||
return jsonify({'error': '卡牌描述不能超过200个字符'}), 400
|
||
|
||
if len(card_data['option_a']['text']) > 50:
|
||
return jsonify({'error': '选项A文本不能超过50个字符'}), 400
|
||
|
||
if len(card_data['option_b']['text']) > 50:
|
||
return jsonify({'error': '选项B文本不能超过50个字符'}), 400
|
||
|
||
# 验证角色字段
|
||
character_fields = ['name', 'title', 'avatar']
|
||
for field in character_fields:
|
||
if field not in card_data['character']:
|
||
return jsonify({'error': f'缺少角色字段: {field}'}), 400
|
||
|
||
if len(card_data['character']['name']) > 30:
|
||
return jsonify({'error': '角色名称不能超过30个字符'}), 400
|
||
|
||
if len(card_data['character']['title']) > 30:
|
||
return jsonify({'error': '角色头衔不能超过30个字符'}), 400
|
||
|
||
if len(card_data['character']['avatar']) > 5:
|
||
return jsonify({'error': '角色图标不能超过5个字符'}), 400
|
||
|
||
# 验证效果值
|
||
option_a_effects = card_data['option_a']['effects']
|
||
option_b_effects = card_data['option_b']['effects']
|
||
|
||
# 验证效果值范围
|
||
for effect_name, effect_value in option_a_effects.items():
|
||
if abs(effect_value) > 25:
|
||
return jsonify({'error': f'选项A的{effect_name}效果值不能超过±25'}), 400
|
||
|
||
for effect_name, effect_value in option_b_effects.items():
|
||
if abs(effect_value) > 25:
|
||
return jsonify({'error': f'选项B的{effect_name}效果值不能超过±25'}), 400
|
||
|
||
# 验证效果值总和
|
||
option_a_total = sum(abs(v) for v in option_a_effects.values())
|
||
option_b_total = sum(abs(v) for v in option_b_effects.values())
|
||
|
||
if option_a_total > 80:
|
||
return jsonify({'error': '选项A的效果总和不能超过80'}), 400
|
||
|
||
if option_b_total > 80:
|
||
return jsonify({'error': '选项B的效果总和不能超过80'}), 400
|
||
|
||
# 获取现有卡牌数据
|
||
cards_data = get_custom_cards()
|
||
cards = cards_data.get('cards', [])
|
||
|
||
# 创建新卡牌
|
||
new_card = {
|
||
'id': str(uuid.uuid4()),
|
||
'title': card_data['title'],
|
||
'character': card_data['character'],
|
||
'description': card_data['description'],
|
||
'option_a': card_data['option_a'],
|
||
'option_b': card_data['option_b'],
|
||
'creator_id': user_id,
|
||
'creator_name': username,
|
||
'created_at': datetime.datetime.utcnow().isoformat()
|
||
}
|
||
|
||
# 添加到卡牌列表
|
||
cards.append(new_card)
|
||
cards_data['cards'] = cards
|
||
|
||
# 保存卡牌数据
|
||
save_custom_cards(cards_data)
|
||
|
||
# 获取投票数据
|
||
votes_data = get_card_votes()
|
||
card_votes = votes_data.get('card_votes', {})
|
||
|
||
# 初始化卡牌投票数据
|
||
card_votes[new_card['id']] = {
|
||
'upvotes': 0,
|
||
'downvotes': 0,
|
||
'users': {}
|
||
}
|
||
|
||
votes_data['card_votes'] = card_votes
|
||
save_card_votes(votes_data)
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'card': new_card
|
||
})
|
||
|
||
# API: 投票自制卡牌
|
||
@app.route('/api/vote_card', methods=['POST'])
|
||
def api_vote_card():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
user_id = session['user_id']
|
||
request_data = request.json
|
||
|
||
# 验证参数
|
||
if 'card_id' not in request_data or 'vote_type' not in request_data:
|
||
return jsonify({'error': '缺少必要参数'}), 400
|
||
|
||
card_id = request_data['card_id']
|
||
vote_type = request_data['vote_type']
|
||
|
||
# 验证投票类型
|
||
if vote_type not in ['upvote', 'downvote', 'none']:
|
||
return jsonify({'error': '无效的投票类型'}), 400
|
||
|
||
# 获取投票数据
|
||
votes_data = get_card_votes()
|
||
user_votes = votes_data.get('user_votes', {})
|
||
card_votes = votes_data.get('card_votes', {})
|
||
|
||
# 初始化用户投票数据
|
||
if user_id not in user_votes:
|
||
user_votes[user_id] = {}
|
||
|
||
# 获取卡牌数据
|
||
cards_data = get_custom_cards()
|
||
cards = cards_data.get('cards', [])
|
||
|
||
# 查找卡牌
|
||
card = next((c for c in cards if c['id'] == card_id), None)
|
||
if not card:
|
||
return jsonify({'error': '卡牌不存在'}), 404
|
||
|
||
# 初始化卡牌投票数据
|
||
if card_id not in card_votes:
|
||
card_votes[card_id] = {
|
||
'upvotes': 0,
|
||
'downvotes': 0,
|
||
'users': {}
|
||
}
|
||
|
||
# 获取用户之前的投票
|
||
previous_vote = user_votes.get(user_id, {}).get(card_id)
|
||
|
||
# 处理投票逻辑
|
||
if vote_type == 'none' or (previous_vote == vote_type):
|
||
# 取消投票
|
||
if previous_vote:
|
||
if previous_vote == 'upvote':
|
||
card_votes[card_id]['upvotes'] = max(0, card_votes[card_id]['upvotes'] - 1)
|
||
else:
|
||
card_votes[card_id]['downvotes'] = max(0, card_votes[card_id]['downvotes'] - 1)
|
||
|
||
# 从用户投票中移除
|
||
if card_id in user_votes[user_id]:
|
||
del user_votes[user_id][card_id]
|
||
|
||
# 从卡牌用户列表中移除
|
||
if user_id in card_votes[card_id]['users']:
|
||
del card_votes[card_id]['users'][user_id]
|
||
|
||
new_vote = None
|
||
else:
|
||
# 如果之前投过票,先取消之前的投票
|
||
if previous_vote:
|
||
if previous_vote == 'upvote':
|
||
card_votes[card_id]['upvotes'] = max(0, card_votes[card_id]['upvotes'] - 1)
|
||
else:
|
||
card_votes[card_id]['downvotes'] = max(0, card_votes[card_id]['downvotes'] - 1)
|
||
|
||
# 添加新投票
|
||
if vote_type == 'upvote':
|
||
card_votes[card_id]['upvotes'] += 1
|
||
else:
|
||
card_votes[card_id]['downvotes'] += 1
|
||
|
||
# 更新用户投票
|
||
user_votes[user_id][card_id] = vote_type
|
||
card_votes[card_id]['users'][user_id] = vote_type
|
||
new_vote = vote_type
|
||
|
||
# 保存投票数据
|
||
votes_data['user_votes'] = user_votes
|
||
votes_data['card_votes'] = card_votes
|
||
save_card_votes(votes_data)
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'upvotes': card_votes[card_id]['upvotes'],
|
||
'downvotes': card_votes[card_id]['downvotes'],
|
||
'newVote': new_vote
|
||
})
|
||
|
||
# 管理员API:获取自制卡牌列表
|
||
@app.route('/api/admin/custom_cards', methods=['GET'])
|
||
def api_admin_custom_cards():
|
||
# 获取卡牌数据
|
||
cards_data = get_custom_cards()
|
||
cards = cards_data.get('cards', [])
|
||
|
||
# 获取投票数据
|
||
votes_data = get_card_votes()
|
||
card_votes = votes_data.get('card_votes', {})
|
||
|
||
# 给每个卡片添加投票信息
|
||
for card in cards:
|
||
card_id = card['id']
|
||
if card_id in card_votes:
|
||
card['upvotes'] = card_votes[card_id].get('upvotes', 0)
|
||
card['downvotes'] = card_votes[card_id].get('downvotes', 0)
|
||
else:
|
||
card['upvotes'] = 0
|
||
card['downvotes'] = 0
|
||
|
||
return jsonify({
|
||
'cards': cards
|
||
})
|
||
|
||
# 管理员API:删除自制卡牌
|
||
@app.route('/api/admin/delete_card/<card_id>', methods=['DELETE'])
|
||
def api_admin_delete_card(card_id):
|
||
# 获取卡牌数据
|
||
cards_data = get_custom_cards()
|
||
cards = cards_data.get('cards', [])
|
||
|
||
# 查找并删除卡牌
|
||
initial_count = len(cards)
|
||
cards = [c for c in cards if c['id'] != card_id]
|
||
|
||
if len(cards) < initial_count:
|
||
# 更新卡牌数据
|
||
cards_data['cards'] = cards
|
||
save_custom_cards(cards_data)
|
||
|
||
# 也从投票数据中删除
|
||
votes_data = get_card_votes()
|
||
card_votes = votes_data.get('card_votes', {})
|
||
user_votes = votes_data.get('user_votes', {})
|
||
|
||
# 从卡牌投票中删除
|
||
if card_id in card_votes:
|
||
del card_votes[card_id]
|
||
|
||
# 从用户投票中删除
|
||
for user_id in user_votes:
|
||
if card_id in user_votes[user_id]:
|
||
del user_votes[user_id][card_id]
|
||
|
||
votes_data['card_votes'] = card_votes
|
||
votes_data['user_votes'] = user_votes
|
||
save_card_votes(votes_data)
|
||
|
||
return jsonify({'success': True})
|
||
else:
|
||
return jsonify({'error': '卡牌不存在'}), 404
|
||
|
||
# 添加到现有管理员页面
|
||
@app.route('/admin/custom_cards')
|
||
def admin_custom_cards():
|
||
return render_template('admin/custom_cards.html')
|
||
|
||
# 成就页面路由
|
||
@app.route('/achievements')
|
||
def achievements_page():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
return render_template('achievements.html', current_user=session.get('username'))
|
||
|
||
# API: 获取随机死亡场景
|
||
# 修改API路由也添加更多日志输出
|
||
@app.route('/api/death_scenario/<death_type>', methods=['GET'])
|
||
def api_death_scenario(death_type):
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
user_id = session['user_id']
|
||
print(f"用户 {user_id} 请求死亡场景: {death_type}")
|
||
|
||
# 验证死亡类型
|
||
valid_types = [
|
||
'loyalty_low', 'loyalty_high',
|
||
'chaos_low', 'chaos_high',
|
||
'population_low', 'population_high',
|
||
'military_low', 'military_high',
|
||
'resources_low', 'resources_high'
|
||
]
|
||
|
||
if death_type not in valid_types:
|
||
return jsonify({'error': '无效的死亡类型'}), 400
|
||
|
||
# 获取死亡场景数据
|
||
scenarios_data = get_death_methods()
|
||
|
||
# 筛选指定类型的场景
|
||
type_scenarios = [s for s in scenarios_data.get('death_scenarios', []) if s.get('death_type') == death_type]
|
||
|
||
if not type_scenarios:
|
||
return jsonify({'error': '未找到符合条件的死亡场景'}), 404
|
||
|
||
# 计算权重总和
|
||
total_weight = sum(s.get('weight', 100) for s in type_scenarios)
|
||
|
||
# 随机选择一个场景,考虑权重
|
||
rand_val = random.randint(1, total_weight)
|
||
|
||
# 根据权重选择
|
||
current_weight = 0
|
||
selected_scenario = None
|
||
|
||
for scenario in type_scenarios:
|
||
current_weight += scenario.get('weight', 100)
|
||
if rand_val <= current_weight:
|
||
selected_scenario = scenario
|
||
break
|
||
|
||
# 如果没有选中,就选第一个(理论上不应该发生)
|
||
if not selected_scenario and type_scenarios:
|
||
selected_scenario = type_scenarios[0]
|
||
|
||
if not selected_scenario:
|
||
return jsonify({'error': '未找到符合条件的死亡场景'}), 500
|
||
|
||
print(f"选中死亡场景: {selected_scenario['id']}")
|
||
|
||
# 解锁对应成就
|
||
unlock_result = unlock_achievement_for_death(user_id, selected_scenario['id'])
|
||
print(f"成就解锁结果: {unlock_result}")
|
||
|
||
return jsonify(selected_scenario)
|
||
|
||
def unlock_achievement_for_death(user_id, scenario_id):
|
||
# 添加调试日志
|
||
print(f"尝试为用户 {user_id} 解锁死亡场景 {scenario_id} 的成就")
|
||
|
||
# 获取成就数据
|
||
achievements_data = get_achievements_data()
|
||
|
||
# 正确构建成就ID - 死亡场景ID前加"death_"前缀
|
||
achievement_id = f"death_{scenario_id}"
|
||
|
||
# 查找匹配的成就
|
||
achievement = None
|
||
for ach in achievements_data.get('achievements', []):
|
||
if ach.get('id') == achievement_id or ach.get('death_scenario_id') == scenario_id:
|
||
achievement = ach
|
||
break
|
||
|
||
if not achievement:
|
||
print(f"没有找到与死亡场景 {scenario_id} 对应的成就")
|
||
return False
|
||
|
||
print(f"找到对应成就: {achievement['id']} - {achievement['name']}")
|
||
|
||
# 检查是否已解锁
|
||
user_achievements = get_user_achievements_data(user_id)
|
||
achievement_already_unlocked = False
|
||
|
||
for ua in user_achievements:
|
||
if isinstance(ua, dict) and ua.get('id') == achievement['id']:
|
||
achievement_already_unlocked = True
|
||
break
|
||
|
||
if achievement_already_unlocked:
|
||
print(f"成就 {achievement['id']} 已经解锁")
|
||
return True
|
||
|
||
# 解锁成就
|
||
result = add_user_achievement(user_id, achievement['id'])
|
||
print(f"成就解锁结果: {result}")
|
||
|
||
return result
|
||
|
||
# 修改 get_user_achievements_data 函数
|
||
def get_user_achievements_data(user_id):
|
||
# 添加调试日志
|
||
print(f"获取用户 {user_id} 的成就数据")
|
||
|
||
# 用户成就不存在单独的JSON文件,而是存储在游戏状态中
|
||
game_states = get_game_states()
|
||
|
||
# 查找用户的游戏状态
|
||
user_state = next((s for s in game_states if s['user_id'] == user_id), None)
|
||
|
||
if user_state and 'game_data' in user_state:
|
||
print(f"找到用户游戏状态")
|
||
if 'achievements' in user_state['game_data']:
|
||
achievements = user_state['game_data']['achievements']
|
||
print(f"找到用户成就数据,共 {len(achievements)} 项")
|
||
return achievements
|
||
else:
|
||
print(f"用户游戏状态中没有成就数据")
|
||
return []
|
||
else:
|
||
print(f"未找到用户 {user_id} 的游戏状态")
|
||
return []
|
||
|
||
# 添加用户成就 - 修复版本
|
||
def add_user_achievement(user_id, achievement_id):
|
||
# 添加调试日志
|
||
print(f"添加成就 {achievement_id} 到用户 {user_id}")
|
||
|
||
# 获取游戏状态
|
||
game_states = get_game_states()
|
||
|
||
# 查找用户的游戏状态
|
||
user_state_index = None
|
||
for i, state in enumerate(game_states):
|
||
if state.get('user_id') == user_id:
|
||
user_state_index = i
|
||
break
|
||
|
||
if user_state_index is None:
|
||
# 用户没有游戏状态,创建一个新的
|
||
print(f"用户 {user_id} 没有游戏状态,创建新的")
|
||
new_state = {
|
||
'id': str(uuid.uuid4()),
|
||
'user_id': user_id,
|
||
'game_data': {
|
||
'achievements': [
|
||
{
|
||
'id': achievement_id,
|
||
'unlock_time': datetime.datetime.utcnow().isoformat()
|
||
}
|
||
]
|
||
},
|
||
'created_at': datetime.datetime.utcnow().isoformat(),
|
||
'updated_at': datetime.datetime.utcnow().isoformat()
|
||
}
|
||
game_states.append(new_state)
|
||
print(f"已创建新游戏状态和成就记录")
|
||
else:
|
||
# 更新现有游戏状态
|
||
user_state = game_states[user_state_index]
|
||
print(f"找到用户现有游戏状态")
|
||
|
||
# 确保游戏数据和成就列表存在
|
||
if 'game_data' not in user_state:
|
||
print("游戏状态中没有game_data字段,创建")
|
||
user_state['game_data'] = {}
|
||
|
||
if 'achievements' not in user_state['game_data']:
|
||
print("game_data中没有achievements字段,创建")
|
||
user_state['game_data']['achievements'] = []
|
||
|
||
# 检查成就是否已存在
|
||
achievement_exists = False
|
||
for ach in user_state['game_data']['achievements']:
|
||
if isinstance(ach, dict) and ach.get('id') == achievement_id:
|
||
achievement_exists = True
|
||
break
|
||
|
||
if not achievement_exists:
|
||
# 添加新成就
|
||
print(f"添加新成就 {achievement_id}")
|
||
user_state['game_data']['achievements'].append({
|
||
'id': achievement_id,
|
||
'unlock_time': datetime.datetime.utcnow().isoformat()
|
||
})
|
||
else:
|
||
print(f"成就 {achievement_id} 已经存在,不重复添加")
|
||
|
||
# 更新时间戳
|
||
user_state['updated_at'] = datetime.datetime.utcnow().isoformat()
|
||
|
||
# 更新游戏状态列表
|
||
game_states[user_state_index] = user_state
|
||
|
||
# 保存游戏状态
|
||
result = save_game_states(game_states)
|
||
print(f"保存游戏状态结果: {result}")
|
||
|
||
return result
|
||
|
||
# API: 获取成就列表
|
||
@app.route('/api/achievements', methods=['GET'])
|
||
def api_achievements():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
# 获取成就数据
|
||
achievements_data = get_achievements_data()
|
||
user_achievements = get_user_achievements_data(session['user_id'])
|
||
|
||
# 准备返回数据
|
||
achievements = []
|
||
|
||
for achievement in achievements_data.get('achievements', []):
|
||
# 检查是否解锁
|
||
unlock_info = next((a for a in user_achievements if a.get('id') == achievement['id']), None)
|
||
|
||
if unlock_info:
|
||
# 已解锁
|
||
achievements.append({
|
||
'id': achievement['id'],
|
||
'name': achievement['name'],
|
||
'description': achievement['description'],
|
||
'icon': achievement['icon'],
|
||
'unlocked': True,
|
||
'unlock_time': unlock_info.get('unlock_time')
|
||
})
|
||
else:
|
||
# 未解锁
|
||
hidden_info = {
|
||
'id': achievement['id'],
|
||
'name': achievement['name'] if not achievement.get('hidden') else "???",
|
||
'description': "???" if achievement.get('hidden') else achievement['description'],
|
||
'icon': achievement['icon'],
|
||
'unlocked': False
|
||
}
|
||
achievements.append(hidden_info)
|
||
|
||
return jsonify({
|
||
'achievements': achievements,
|
||
'total': len(achievements_data.get('achievements', [])),
|
||
'unlocked': len(user_achievements)
|
||
})
|
||
|
||
# 这段代码应该放在app_sqlite.py的导入部分之后,但在使用装饰器之前
|
||
# 请找到适当位置添加,或确保这个装饰器已经在文件中定义
|
||
|
||
# 导入functools.wraps用于装饰器
|
||
from functools import wraps
|
||
|
||
# 管理员会话保护装饰器
|
||
def admin_required(f):
|
||
@wraps(f)
|
||
def decorated_function(*args, **kwargs):
|
||
if ADMIN_SESSION_KEY not in session:
|
||
return redirect(url_for('admin_login'))
|
||
return f(*args, **kwargs)
|
||
return decorated_function
|
||
|
||
# API: 获取死亡场景列表(管理员)
|
||
@app.route('/api/death_scenarios', methods=['GET'])
|
||
@admin_required
|
||
def api_death_scenarios():
|
||
death_methods = get_death_methods()
|
||
return jsonify(death_methods)
|
||
|
||
@app.after_request
|
||
def add_cache_headers(response):
|
||
# 只对静态文件设置缓存
|
||
if request.path.startswith('/static/'):
|
||
# 设置一天的缓存时间
|
||
response.headers['Cache-Control'] = 'public, max-age=43200'
|
||
return response
|
||
|
||
# 管理员会话保护装饰器
|
||
def admin_required(f):
|
||
@wraps(f)
|
||
def decorated_function(*args, **kwargs):
|
||
if ADMIN_SESSION_KEY not in session:
|
||
return redirect(url_for('admin_login'))
|
||
return f(*args, **kwargs)
|
||
return decorated_function
|
||
|
||
# 管理员登录页面
|
||
@app.route('/admin', methods=['GET', 'POST'])
|
||
def admin_login():
|
||
error = None
|
||
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
|
||
if username == ADMIN_USERNAME and password == ADMIN_PASSWORD:
|
||
session[ADMIN_SESSION_KEY] = True
|
||
return redirect(url_for('admin_index'))
|
||
else:
|
||
error = '账号或密码错误'
|
||
|
||
return render_template('admin/login.html', error=error)
|
||
|
||
# 管理员主页
|
||
@app.route('/admin/index')
|
||
@admin_required
|
||
def admin_index():
|
||
return render_template('admin/index.html')
|
||
|
||
# 管理员登出
|
||
@app.route('/admin/logout')
|
||
def admin_logout():
|
||
session.pop(ADMIN_SESSION_KEY, None)
|
||
return redirect(url_for('admin_login'))
|
||
|
||
# 角色管理页面
|
||
@app.route('/admin/characters')
|
||
@admin_required
|
||
def admin_characters():
|
||
return render_template('admin/characters.html')
|
||
|
||
# 数据面板页面
|
||
@app.route('/admin/dashboard')
|
||
@admin_required
|
||
def admin_dashboard():
|
||
return render_template('admin/dashboard.html')
|
||
|
||
@app.route('/admin/weight_management')
|
||
@admin_required
|
||
def admin_weight_management():
|
||
return render_template('weight_management.html')
|
||
|
||
# API: 获取所有角色
|
||
@app.route('/api/admin/characters', methods=['GET'])
|
||
@admin_required
|
||
def api_admin_characters():
|
||
cards_data = get_cards()
|
||
characters = cards_data.get('characters', [])
|
||
return jsonify({'characters': characters})
|
||
|
||
# API: 获取特定角色
|
||
@app.route('/api/admin/character/<character_id>', methods=['GET'])
|
||
@admin_required
|
||
def api_admin_character(character_id):
|
||
cards_data = get_cards()
|
||
characters = cards_data.get('characters', [])
|
||
|
||
# 搜索角色
|
||
character = next((c for c in characters if str(c['id']) == character_id), None)
|
||
|
||
if character:
|
||
return jsonify({'character': character})
|
||
else:
|
||
return jsonify({'error': '角色不存在'}), 404
|
||
|
||
# API: 添加或更新角色
|
||
@app.route('/api/admin/character', methods=['POST'])
|
||
@admin_required
|
||
def api_admin_save_character():
|
||
new_character = request.json
|
||
|
||
# 验证必填字段
|
||
required_fields = ['id', 'name', 'title', 'avatar', 'type']
|
||
for field in required_fields:
|
||
if field not in new_character:
|
||
return jsonify({'error': f'缺少必填字段: {field}'}), 400
|
||
|
||
# 获取现有卡牌数据
|
||
cards_data = get_cards()
|
||
characters = cards_data.get('characters', [])
|
||
|
||
# 检查是更新还是新增
|
||
existing_index = -1
|
||
for i, char in enumerate(characters):
|
||
if str(char['id']) == str(new_character['id']):
|
||
existing_index = i
|
||
break
|
||
|
||
# 确保events字段存在
|
||
if 'events' not in new_character:
|
||
new_character['events'] = []
|
||
|
||
if existing_index >= 0:
|
||
# 更新现有角色
|
||
characters[existing_index] = new_character
|
||
else:
|
||
# 添加新角色
|
||
characters.append(new_character)
|
||
|
||
# 更新cards_data
|
||
cards_data['characters'] = characters
|
||
|
||
# 保存到文件
|
||
save_cards(cards_data)
|
||
|
||
return jsonify({'success': True, 'character': new_character})
|
||
|
||
# API: 删除角色
|
||
@app.route('/api/admin/character/<character_id>', methods=['DELETE'])
|
||
@admin_required
|
||
def api_admin_delete_character(character_id):
|
||
# 获取现有卡牌数据
|
||
cards_data = get_cards()
|
||
characters = cards_data.get('characters', [])
|
||
|
||
# 查找并删除角色
|
||
original_count = len(characters)
|
||
characters = [c for c in characters if str(c['id']) != character_id]
|
||
|
||
if len(characters) < original_count:
|
||
# 角色被删除
|
||
cards_data['characters'] = characters
|
||
save_cards(cards_data)
|
||
return jsonify({'success': True})
|
||
else:
|
||
return jsonify({'error': '角色不存在'}), 404
|
||
|
||
# API: 添加事件到角色
|
||
@app.route('/api/admin/character/<character_id>/event', methods=['POST'])
|
||
@admin_required
|
||
def api_admin_add_character_event(character_id):
|
||
new_event = request.json
|
||
|
||
# 验证必填字段
|
||
required_fields = ['id', 'text', 'optionA', 'optionB']
|
||
for field in required_fields:
|
||
if field not in new_event:
|
||
return jsonify({'error': f'缺少必填字段: {field}'}), 400
|
||
|
||
# 验证选项A和B的必填字段
|
||
for option in ['optionA', 'optionB']:
|
||
if 'text' not in new_event[option] or 'effects' not in new_event[option]:
|
||
return jsonify({'error': f'选项{option[-1]}缺少必填字段'}), 400
|
||
|
||
# 获取现有卡牌数据
|
||
cards_data = get_cards()
|
||
characters = cards_data.get('characters', [])
|
||
|
||
# 查找角色
|
||
for character in characters:
|
||
if str(character['id']) == character_id:
|
||
# 确保角色有events字段
|
||
if 'events' not in character:
|
||
character['events'] = []
|
||
|
||
# 检查是否已存在该ID的事件
|
||
event_exists = any(e['id'] == new_event['id'] for e in character['events'])
|
||
if event_exists:
|
||
return jsonify({'error': f'事件ID已存在: {new_event["id"]}'}), 400
|
||
|
||
# 添加新事件
|
||
character['events'].append(new_event)
|
||
|
||
# 保存到文件
|
||
save_cards(cards_data)
|
||
|
||
return jsonify({'success': True, 'event': new_event})
|
||
|
||
return jsonify({'error': '角色不存在'}), 404
|
||
|
||
# API: 更新角色的事件
|
||
@app.route('/api/admin/character/<character_id>/event/<event_id>', methods=['PUT'])
|
||
@admin_required
|
||
def api_admin_update_character_event(character_id, event_id):
|
||
updated_event = request.json
|
||
|
||
# 验证必填字段
|
||
required_fields = ['id', 'text', 'optionA', 'optionB']
|
||
for field in required_fields:
|
||
if field not in updated_event:
|
||
return jsonify({'error': f'缺少必填字段: {field}'}), 400
|
||
|
||
# 验证选项A和B的必填字段
|
||
for option in ['optionA', 'optionB']:
|
||
if 'text' not in updated_event[option] or 'effects' not in updated_event[option]:
|
||
return jsonify({'error': f'选项{option[-1]}缺少必填字段'}), 400
|
||
|
||
# 获取现有卡牌数据
|
||
cards_data = get_cards()
|
||
characters = cards_data.get('characters', [])
|
||
|
||
# 查找角色和事件
|
||
for character in characters:
|
||
if str(character['id']) == character_id and 'events' in character:
|
||
for i, event in enumerate(character['events']):
|
||
if str(event['id']) == event_id:
|
||
# 更新事件
|
||
character['events'][i] = updated_event
|
||
|
||
# 保存到文件
|
||
save_cards(cards_data)
|
||
|
||
return jsonify({'success': True, 'event': updated_event})
|
||
|
||
return jsonify({'error': '角色或事件不存在'}), 404
|
||
|
||
# API: 删除角色的事件
|
||
@app.route('/api/admin/character/<character_id>/event/<event_id>', methods=['DELETE'])
|
||
@admin_required
|
||
def api_admin_delete_character_event(character_id, event_id):
|
||
# 获取现有卡牌数据
|
||
cards_data = get_cards()
|
||
characters = cards_data.get('characters', [])
|
||
|
||
# 查找角色和事件
|
||
for character in characters:
|
||
if str(character['id']) == character_id and 'events' in character:
|
||
original_count = len(character['events'])
|
||
character['events'] = [e for e in character['events'] if str(e['id']) != event_id]
|
||
|
||
if len(character['events']) < original_count:
|
||
# 事件被删除
|
||
save_cards(cards_data)
|
||
return jsonify({'success': True})
|
||
|
||
return jsonify({'error': '角色或事件不存在'}), 404
|
||
|
||
# 完全重写 api_admin_stats 函数
|
||
|
||
@app.route('/api/admin/stats', methods=['GET'])
|
||
@admin_required
|
||
def api_admin_stats():
|
||
# 获取用户数据
|
||
users = get_users()
|
||
|
||
# 获取当前会话数据
|
||
sessions = get_sessions()
|
||
active_sessions = sessions.get('active_sessions', [])
|
||
|
||
# 获取当前日期(只取日期部分,不含时间)
|
||
now = datetime.datetime.now()
|
||
today_date_str = now.strftime('%Y-%m-%d')
|
||
|
||
logger.info(f"当前日期: {today_date_str}")
|
||
|
||
# 重写今日新用户计算逻辑
|
||
today_users = []
|
||
for user in users:
|
||
try:
|
||
# 获取用户创建日期
|
||
created_at_str = user.get('created_at', '')
|
||
|
||
# 如果日期字符串为空,跳过
|
||
if not created_at_str:
|
||
continue
|
||
|
||
# 只取日期部分进行比较(跳过时间部分)
|
||
created_date_str = created_at_str.split('T')[0]
|
||
if 'T' not in created_at_str:
|
||
created_date_str = created_at_str.split(' ')[0]
|
||
|
||
logger.info(f"用户: {user.get('username')}, 创建日期: {created_date_str}")
|
||
|
||
# 简单直接的字符串比较
|
||
if created_date_str == today_date_str:
|
||
today_users.append({
|
||
'username': user.get('username', '未知用户'),
|
||
'created_at': created_at_str
|
||
})
|
||
logger.info(f"今日新用户: {user.get('username')}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理用户日期时出错: {e}")
|
||
continue
|
||
|
||
# 强制添加测试用户进行验证
|
||
test_user = {
|
||
'username': '测试用户',
|
||
'created_at': datetime.datetime.now().isoformat()
|
||
}
|
||
today_users.append(test_user)
|
||
logger.info(f"添加测试用户: {test_user}")
|
||
|
||
# 计算在线用户
|
||
online_users = []
|
||
playing_users = 0
|
||
|
||
for session_data in active_sessions:
|
||
user_id = session_data.get('user_id')
|
||
user = next((u for u in users if u['id'] == user_id), None)
|
||
if user:
|
||
status = session_data.get('status', '未知')
|
||
if status == 'playing':
|
||
playing_users += 1
|
||
|
||
online_users.append({
|
||
'username': user.get('username', '未知用户'),
|
||
'status': status,
|
||
'ip_address': session_data.get('ip_address', '未知'),
|
||
'last_activity': session_data.get('last_activity', '')
|
||
})
|
||
|
||
# 返回统计数据
|
||
stats_data = {
|
||
'total_users': len(users),
|
||
'online_users_count': len(online_users),
|
||
'today_users_count': len(today_users),
|
||
'playing_users_count': playing_users,
|
||
'online_users': online_users,
|
||
'today_users': today_users
|
||
}
|
||
|
||
logger.info(f"返回统计数据: {stats_data}")
|
||
|
||
return jsonify(stats_data)
|
||
# 添加到 app_sqlite.py 中
|
||
|
||
# 检查单次统治时间成就
|
||
def check_single_reign_achievements(user_id, reign_time):
|
||
"""检查单次统治时间成就并解锁"""
|
||
print(f"检查用户 {user_id} 的单次统治时间成就: {reign_time}年")
|
||
|
||
# 获取成就数据
|
||
achievements_data = get_achievements_data()
|
||
|
||
# 筛选单次统治时间成就
|
||
single_reign_achievements = [a for a in achievements_data.get('achievements', [])
|
||
if a.get('achievement_type') == 'reign_single']
|
||
|
||
# 按要求排序
|
||
single_reign_achievements.sort(key=lambda x: x.get('requirement', 0))
|
||
|
||
# 检查每个成就
|
||
unlocked_any = False
|
||
for achievement in single_reign_achievements:
|
||
requirement = achievement.get('requirement', 0)
|
||
if reign_time >= requirement:
|
||
# 解锁成就
|
||
unlocked = unlock_achievement(user_id, achievement['id'])
|
||
if unlocked:
|
||
print(f"解锁单次统治时间成就: {achievement['name']} (要求: {requirement}年)")
|
||
unlocked_any = True
|
||
|
||
return unlocked_any
|
||
|
||
# 检查总统治时间成就
|
||
def check_total_reign_achievements(user_id, total_reign_time):
|
||
"""检查总统治时间成就并解锁"""
|
||
print(f"检查用户 {user_id} 的总统治时间成就: {total_reign_time}年")
|
||
|
||
# 获取成就数据
|
||
achievements_data = get_achievements_data()
|
||
|
||
# 筛选总统治时间成就
|
||
total_reign_achievements = [a for a in achievements_data.get('achievements', [])
|
||
if a.get('achievement_type') == 'reign_total']
|
||
|
||
# 按要求排序
|
||
total_reign_achievements.sort(key=lambda x: x.get('requirement', 0))
|
||
|
||
# 检查每个成就
|
||
unlocked_any = False
|
||
for achievement in total_reign_achievements:
|
||
requirement = achievement.get('requirement', 0)
|
||
if total_reign_time >= requirement:
|
||
# 解锁成就
|
||
unlocked = unlock_achievement(user_id, achievement['id'])
|
||
if unlocked:
|
||
print(f"解锁总统治时间成就: {achievement['name']} (要求: {requirement}年)")
|
||
unlocked_any = True
|
||
|
||
return unlocked_any
|
||
|
||
# 统一解锁成就函数
|
||
def unlock_achievement(user_id, achievement_id):
|
||
"""解锁成就,并返回是否成功解锁"""
|
||
# 获取成就数据
|
||
achievements_data = get_achievements_data()
|
||
|
||
# 查找成就
|
||
achievement = next((a for a in achievements_data.get('achievements', [])
|
||
if a['id'] == achievement_id), None)
|
||
|
||
if not achievement:
|
||
print(f"成就不存在: {achievement_id}")
|
||
return False
|
||
|
||
# 检查是否已解锁
|
||
user_achievements = get_user_achievements_data(user_id)
|
||
|
||
for user_ach in user_achievements:
|
||
if isinstance(user_ach, dict) and user_ach.get('id') == achievement_id:
|
||
# 已解锁
|
||
print(f"成就 {achievement['name']} 已经解锁过")
|
||
return False
|
||
|
||
# 解锁成就
|
||
result = add_user_achievement(user_id, achievement_id)
|
||
if result:
|
||
print(f"成功解锁成就: {achievement['name']}")
|
||
return result
|
||
|
||
# 修改 API: 更新高分 - 添加成就检查
|
||
@app.route('/api/update_score', methods=['POST'])
|
||
def api_update_score():
|
||
if 'user_id' not in session:
|
||
return jsonify({'error': '未授权'}), 401
|
||
|
||
user_id = session['user_id']
|
||
score = request.json.get('score', 0)
|
||
year = request.json.get('year', 41000) # 获取当前年份
|
||
|
||
print(f"更新用户 {user_id} 的分数: {score}, 年份: {year}")
|
||
|
||
# 获取所有用户
|
||
users = get_users()
|
||
|
||
# 查找当前用户
|
||
user = next((u for u in users if u['id'] == user_id), None)
|
||
|
||
if user:
|
||
# 计算总统治时间
|
||
old_total_games = user.get('total_games', 0)
|
||
old_high_score = user.get('high_score', 0)
|
||
|
||
# 检查单次统治时间成就
|
||
check_single_reign_achievements(user_id, score)
|
||
|
||
# 只有当新分数更高时才更新
|
||
if score > old_high_score:
|
||
user['high_score'] = score
|
||
|
||
# 增加游戏次数
|
||
user['total_games'] = old_total_games + 1
|
||
|
||
# 计算总统治时间
|
||
# 方法:总游戏次数 * 平均每次统治时间 (为简化计算,可以估算)
|
||
total_reign_time = sum(u.get('high_score', 0) for u in users if u['id'] == user_id)
|
||
if 'total_reign_time' not in user:
|
||
# 如果没有记录总统治时间,则初始化为当前总计
|
||
user['total_reign_time'] = total_reign_time + score
|
||
else:
|
||
# 否则加上本次统治时间
|
||
user['total_reign_time'] = user['total_reign_time'] + score
|
||
|
||
# 检查总统治时间成就
|
||
check_total_reign_achievements(user_id, user['total_reign_time'])
|
||
|
||
# 更新最后游戏年份
|
||
user['last_year'] = year
|
||
|
||
# 更新最后游戏时间
|
||
user['last_game_time'] = datetime.datetime.utcnow().isoformat()
|
||
|
||
save_users(users)
|
||
|
||
# 更新会话状态为空闲
|
||
update_session(user_id, "idle")
|
||
|
||
return jsonify({'success': True})
|
||
else:
|
||
return jsonify({'error': '用户不存在'}), 404
|
||
|
||
# 初始化应用
|
||
init_data_files()
|
||
|
||
# 启动定时清理线程
|
||
cleanup_thread = threading.Thread(target=scheduled_cleanup, daemon=True)
|
||
cleanup_thread.start()
|
||
|
||
# 启动服务器
|
||
if __name__ == '__main__':
|
||
app.run(debug=True, port=5001) # 使用5005端口避开常用端口 |