import sys, os sys.path.append(os.path.dirname(os.path.abspath(__file__))) from flask import Flask, render_template, request, redirect, url_for, jsonify, session import json import os import uuid import datetime import time import shutil import logging import threading import hashlib import random from werkzeug.security import generate_password_hash, check_password_hash import uuid import datetime import os import json import logging import datetime import os import json from functools import wraps # 配置日志 logging.basicConfig( filename='game_app.log', level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('game_app') app = Flask(__name__) app.secret_key = os.urandom(24) # 用于会话加密 # 数据文件路径 DATA_DIR = 'data' BACKUP_DIR = os.path.join(DATA_DIR, 'backup') USERS_FILE = os.path.join(DATA_DIR, 'users.json') GAME_STATES_FILE = os.path.join(DATA_DIR, 'game_states.json') CARDS_FILE = os.path.join(DATA_DIR, 'cards.json') SESSIONS_FILE = os.path.join(DATA_DIR, 'sessions.json') # 会话文件 DEATH_METHODS_FILE = os.path.join(DATA_DIR, 'death_methods.json') # 死亡方式文件 ACHIEVEMENTS_FILE = os.path.join(DATA_DIR, 'achievements.json') # 成就系统文件 # 管理员账号和密码 - 硬编码 ADMIN_USERNAME = "13991190618" ADMIN_PASSWORD = "@CuiYuJian040618" # 管理员会话键名 ADMIN_SESSION_KEY = 'admin_logged_in' # 文件锁,防止并发写入冲突 file_locks = { USERS_FILE: threading.RLock(), GAME_STATES_FILE: threading.RLock(), SESSIONS_FILE: threading.RLock(), CARDS_FILE: threading.RLock(), DEATH_METHODS_FILE: threading.RLock(), ACHIEVEMENTS_FILE: threading.RLock() } # 确保数据目录存在 def ensure_dirs_exist(): os.makedirs(DATA_DIR, exist_ok=True) os.makedirs(BACKUP_DIR, exist_ok=True) # 创建备份文件名 def get_backup_filename(original_file, include_timestamp=False): base_name = os.path.basename(original_file) if include_timestamp: timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') return os.path.join(BACKUP_DIR, f"{base_name}.{timestamp}.bak") else: return os.path.join(BACKUP_DIR, f"{base_name}.bak") # 管理备份文件,保留最近的N个备份 def manage_backups(file_pattern, max_backups=5): try: backup_files = [f for f in os.listdir(BACKUP_DIR) if f.startswith(file_pattern)] backup_files.sort(reverse=True) # 最新的文件排在前面 # 如果备份文件超过最大数量,删除最旧的 if len(backup_files) > max_backups: for old_file in backup_files[max_backups:]: try: os.remove(os.path.join(BACKUP_DIR, old_file)) logger.info(f"删除旧备份文件: {old_file}") except Exception as e: logger.error(f"删除旧备份文件失败: {old_file}, 错误: {str(e)}") except Exception as e: logger.error(f"管理备份文件失败, 错误: {str(e)}") # 安全地保存数据到文件 def safe_save_data(file_path, data, create_backup=True): # 获取文件锁 lock = file_locks.get(file_path, threading.RLock()) with lock: # 确保目录存在 ensure_dirs_exist() # 如果原文件存在且需要备份,先创建备份 if create_backup and os.path.exists(file_path): try: # 创建带时间戳的备份文件,每小时最多一个 current_hour = datetime.datetime.now().strftime('%Y%m%d_%H') backup_marker = hashlib.md5(file_path.encode()).hexdigest()[:8] backup_file = os.path.join(BACKUP_DIR, f"{os.path.basename(file_path)}.{current_hour}.{backup_marker}.bak") # 检查是否已经有该小时的备份 if not os.path.exists(backup_file): shutil.copy2(file_path, backup_file) logger.info(f"创建备份: {backup_file}") # 管理旧备份 file_prefix = os.path.basename(file_path) manage_backups(file_prefix, max_backups=10) except Exception as e: logger.error(f"创建备份失败: {str(e)}") # 先写入临时文件 temp_file = file_path + '.tmp' try: with open(temp_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) f.flush() os.fsync(f.fileno()) # 确保数据写入磁盘 # 替换原文件 os.replace(temp_file, file_path) logger.info(f"成功保存数据到: {file_path}") return True except Exception as e: logger.error(f"保存数据到 {file_path} 失败: {str(e)}") if os.path.exists(temp_file): try: os.remove(temp_file) except: pass return False # 从文件或备份中安全地读取数据 def safe_load_data(file_path, default_value=None): # 获取文件锁 lock = file_locks.get(file_path, threading.RLock()) with lock: # 尝试读取原始文件 try: with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError) as e: logger.warning(f"读取文件 {file_path} 失败: {str(e)}, 尝试从备份恢复") # 尝试从最新备份恢复 backup_file = get_backup_filename(file_path) if os.path.exists(backup_file): try: with open(backup_file, 'r', encoding='utf-8') as f: data = json.load(f) logger.info(f"从备份 {backup_file} 恢复数据成功") # 恢复成功后,保存回原始文件 safe_save_data(file_path, data, create_backup=False) return data except Exception as backup_error: logger.error(f"从备份恢复失败: {str(backup_error)}") # 如果指定了默认值,则返回默认值 if default_value is not None: return default_value # 否则,根据文件类型返回适当的空结构 if file_path == USERS_FILE: return [] elif file_path == GAME_STATES_FILE: return [] elif file_path == SESSIONS_FILE: return {"active_sessions": []} elif file_path == DEATH_METHODS_FILE: return {"death_scenarios": []} elif file_path == ACHIEVEMENTS_FILE: return {"achievements": []} else: return {} # 初始化数据文件 def init_data_files(): ensure_dirs_exist() # 初始化用户文件 if not os.path.exists(USERS_FILE): safe_save_data(USERS_FILE, [], create_backup=False) # 初始化游戏状态文件 if not os.path.exists(GAME_STATES_FILE): safe_save_data(GAME_STATES_FILE, [], create_backup=False) # 初始化会话文件 if not os.path.exists(SESSIONS_FILE): safe_save_data(SESSIONS_FILE, {"active_sessions": []}, create_backup=False) # 初始化死亡方式文件 if not os.path.exists(DEATH_METHODS_FILE): safe_save_data(DEATH_METHODS_FILE, {"death_scenarios": []}, create_backup=False) # 初始化成就文件 if not os.path.exists(ACHIEVEMENTS_FILE): safe_save_data(ACHIEVEMENTS_FILE, {"achievements": []}, create_backup=False) ensure_character_weights() def ensure_character_weights(): """确保所有角色都有权重字段,没有的话添加默认权重5""" try: cards_data = get_cards() characters = cards_data.get('characters', []) # 记录是否有修改 has_changes = False # 检查每个角色 for character in characters: if 'weight' not in character: # 添加默认权重 character['weight'] = 100 has_changes = True # 如果有修改,保存回文件 if has_changes: save_cards(cards_data) logger.info("已为所有角色添加默认权重字段") except Exception as e: logger.error(f"确保角色权重时出错: {str(e)}") # 读取用户数据 def get_users(): return safe_load_data(USERS_FILE, []) # 保存用户数据 def save_users(users): return safe_save_data(USERS_FILE, users) # 读取游戏状态数据 def get_game_states(): return safe_load_data(GAME_STATES_FILE, []) # 保存游戏状态数据 def save_game_states(states): return safe_save_data(GAME_STATES_FILE, states) # 读取卡牌数据 def get_cards(): default_cards = { "characters": [], "gameSettings": { "initialValues": { "loyalty": 50, "chaos": 50, "population": 50, "military": 50, "resources": 50 }, "maxValues": { "loyalty": 100, "chaos": 100, "population": 100, "military": 100, "resources": 100 }, "minValues": { "loyalty": 0, "chaos": 0, "population": 0, "military": 0, "resources": 0 }, "statusIcons": { "loyalty": "🦅", "chaos": "🌀", "population": "👥", "military": "🔫", "resources": "⚙️" } } } return safe_load_data(CARDS_FILE, default_cards) # 保存卡牌数据 def save_cards(cards_data): return safe_save_data(CARDS_FILE, cards_data) # 读取会话数据 def get_sessions(): return safe_load_data(SESSIONS_FILE, {"active_sessions": []}) # 保存会话数据 def save_sessions(sessions): return safe_save_data(SESSIONS_FILE, sessions) # 读取死亡方式数据 def get_death_methods(): return safe_load_data(DEATH_METHODS_FILE, {"death_scenarios": []}) # 保存死亡方式数据 def save_death_methods(data): return safe_save_data(DEATH_METHODS_FILE, data) # 读取成就数据 def get_achievements_data(): return safe_load_data(ACHIEVEMENTS_FILE, {"achievements": []}) # 保存成就数据 def save_achievements_data(data): return safe_save_data(ACHIEVEMENTS_FILE, data) # 更新会话 def update_session(user_id, status=None): sessions = get_sessions() active_sessions = sessions.get("active_sessions", []) current_time = datetime.datetime.utcnow().isoformat() # 查找当前用户会话 session_found = False for session_data in active_sessions: if session_data.get("user_id") == user_id: # 更新现有会话 session_data["last_activity"] = current_time if status: session_data["status"] = status session_found = True break # 保存更新后的会话数据 if session_found: sessions["active_sessions"] = active_sessions save_sessions(sessions) return True return False # 添加会话 def add_session(user_id, ip_address): sessions = get_sessions() active_sessions = sessions.get("active_sessions", []) # 检查是否已存在该用户的会话 for session_data in active_sessions: if session_data.get("user_id") == user_id: # 更新现有会话 session_data["login_time"] = datetime.datetime.utcnow().isoformat() session_data["last_activity"] = datetime.datetime.utcnow().isoformat() session_data["ip_address"] = ip_address session_data["status"] = "idle" sessions["active_sessions"] = active_sessions save_sessions(sessions) return True # 创建新会话 new_session = { "user_id": user_id, "login_time": datetime.datetime.utcnow().isoformat(), "last_activity": datetime.datetime.utcnow().isoformat(), "ip_address": request.remote_addr, "status": "idle" } active_sessions.append(new_session) sessions["active_sessions"] = active_sessions save_sessions(sessions) return True # 移除会话 def remove_session(user_id): sessions = get_sessions() active_sessions = sessions.get("active_sessions", []) # 过滤掉指定用户的会话 original_count = len(active_sessions) active_sessions = [s for s in active_sessions if s.get("user_id") != user_id] if len(active_sessions) < original_count: sessions["active_sessions"] = active_sessions save_sessions(sessions) return True return False # 清理过期会话 - 超过2小时未活动的会话 def clean_expired_sessions(): try: sessions = get_sessions() active_sessions = sessions.get("active_sessions", []) current_time = datetime.datetime.utcnow() # 只过滤掉超过2小时未活动的会话 original_count = len(active_sessions) filtered_sessions = [] for session in active_sessions: try: last_activity = datetime.datetime.fromisoformat(session.get("last_activity", "")) time_diff = (current_time - last_activity).total_seconds() / 60 # 小时差 if time_diff < 5: # 2小时超时 filtered_sessions.append(session) except (ValueError, TypeError) as e: logger.warning(f"处理会话时间格式错误: {str(e)}") # 如果时间格式错误,保留该会话而不是直接删除 filtered_sessions.append(session) # 只有在真正有会话被删除时才更新数据 if len(filtered_sessions) != original_count: logger.info(f"清理过期会话: 清理前{original_count}个,清理后{len(filtered_sessions)}个") sessions["active_sessions"] = filtered_sessions save_sessions(sessions) return True except Exception as e: logger.error(f"清理过期会话失败: {str(e)}") return False # 首页,显示排行榜 @app.route('/') def index(): if 'user_id' not in session: return redirect(url_for('login')) # 获取所有用户数据,按照最高分排序 users = get_users() leaderboard = sorted(users, key=lambda x: x.get('high_score', 0), reverse=True) # 更新用户活动状态 update_session(session.get('user_id')) return render_template('index.html', leaderboard=leaderboard, current_user=session.get('username')) # 登录页面 @app.route('/login', methods=['GET', 'POST']) def login(): error = None if request.method == 'POST': username = request.form['username'] password = request.form['password'] users = get_users() user = next((u for u in users if u['username'] == username), None) if user and check_password_hash(user['password'], password): session['user_id'] = user['id'] session['username'] = user['username'] # 添加会话记录 add_session(user['id'], request.remote_addr) return redirect(url_for('index')) else: error = '用户名或密码错误' return render_template('login.html', error=error) # 注册页面 @app.route('/register', methods=['GET', 'POST']) def register(): error = None if request.method == 'POST': username = request.form['username'] password = request.form['password'] users = get_users() # 检查用户名是否已存在 if any(u['username'] == username for u in users): error = '用户名已存在' else: # 创建新用户 new_user = { 'id': str(uuid.uuid4()), 'username': username, 'password': generate_password_hash(password), 'created_at': datetime.datetime.utcnow().isoformat(), 'high_score': 0, 'total_games': 0, 'last_year': 41000 # 添加最后游戏年份字段,默认为41000 } users.append(new_user) save_users(users) # 自动登录 session['user_id'] = new_user['id'] session['username'] = new_user['username'] # 添加会话记录 add_session(new_user['id'], request.remote_addr) return redirect(url_for('index')) return render_template('register.html', error=error) # 注销 @app.route('/logout') def logout(): if 'user_id' in session: # 移除会话记录 remove_session(session['user_id']) # 清除会话 session.pop('user_id', None) session.pop('username', None) return redirect(url_for('login')) # 游戏页面 @app.route('/game') def game(): if 'user_id' not in session: return redirect(url_for('login')) # 获取游戏数据 cards_data = get_cards() # 获取用户最后游戏年份 users = get_users() user = next((u for u in users if u['id'] == session['user_id']), None) last_year = user.get('last_year', 41000) if user else 41000 # 添加最后年份到数据中 game_data = { 'cards': cards_data, 'events': cards_data.get('events', []), 'statuses': cards_data.get('statuses', []), 'lastYear': last_year } # 更新用户状态为游戏中 sessions = get_sessions() active_sessions = sessions.get("active_sessions", []) for session_data in active_sessions: if session_data.get("user_id") == session['user_id']: session_data["status"] = "playing" session_data["game_start_time"] = datetime.datetime.utcnow().isoformat() break sessions["active_sessions"] = active_sessions save_sessions(sessions) return render_template('game.html', current_user=session.get('username'), game_data_json=json.dumps(game_data)) # API: 获取卡牌数据 @app.route('/api/cards', methods=['GET']) def api_get_cards(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 cards_data = get_cards() return jsonify(cards_data) # API: 保存游戏状态 @app.route('/api/save_game', methods=['POST']) def api_save_game(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 user_id = session['user_id'] game_data = request.json # 添加调试日志 print(f"保存用户 {user_id} 的游戏状态") # 更新用户的最后游戏年份 if 'year' in game_data: users = get_users() user = next((u for u in users if u['id'] == user_id), None) if user: user['last_year'] = game_data['year'] user['last_game_time'] = datetime.datetime.utcnow().isoformat() save_users(users) # 获取现有游戏状态 game_states = get_game_states() # 查找用户的游戏状态 user_state = next((s for s in game_states if s['user_id'] == user_id), None) if user_state: # 更新现有状态,但保留成就数据 print(f"更新用户现有游戏状态,保留成就数据") # 保存之前的成就数据 achievements_data = [] if 'game_data' in user_state and 'achievements' in user_state['game_data']: achievements_data = user_state['game_data']['achievements'] print(f"保留现有成就数据,共 {len(achievements_data)} 项") # 更新游戏数据 user_state['game_data'] = game_data # 恢复成就数据 if achievements_data: user_state['game_data']['achievements'] = achievements_data user_state['updated_at'] = datetime.datetime.utcnow().isoformat() else: # 创建新状态 print(f"用户没有现有游戏状态,创建新状态") game_states.append({ 'id': str(uuid.uuid4()), 'user_id': user_id, 'game_data': game_data, 'created_at': datetime.datetime.utcnow().isoformat(), 'updated_at': datetime.datetime.utcnow().isoformat() }) save_game_states(game_states) print(f"游戏状态保存完成") # 更新会话状态 update_session(user_id, "playing") return jsonify({'success': True}) # API: 加载游戏状态 @app.route('/api/load_game', methods=['GET']) def api_load_game(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 user_id = session['user_id'] # 获取所有游戏状态 game_states = get_game_states() # 查找用户的游戏状态 user_state = next((s for s in game_states if s['user_id'] == user_id), None) # 获取用户的最后游戏年份 users = get_users() user = next((u for u in users if u['id'] == user_id), None) last_year = user.get('last_year', 41000) if user else 41000 if user_state: # 确保游戏状态中包含最新的年份 game_data = user_state['game_data'] if isinstance(game_data, dict) and 'year' not in game_data: game_data['year'] = last_year return jsonify({'game_data': game_data, 'lastYear': last_year}) else: return jsonify({'game_data': None, 'lastYear': last_year}) # API: 获取排行榜数据 @app.route('/api/leaderboard', methods=['GET']) def api_leaderboard(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 # 获取所有用户,按高分排序 users = get_users() leaderboard = sorted(users, key=lambda x: x.get('high_score', 0), reverse=True) leaderboard = leaderboard[:100] # 添加这一行,限制为前100名 # 只返回需要的字段 (不再限制为前10名) leaderboard_data = [ { 'username': user['username'], 'high_score': user.get('high_score', 0), 'total_games': user.get('total_games', 0), 'last_year': user.get('last_year', 41000) } for user in leaderboard # 移除了[:10]限制 ] return jsonify(leaderboard_data) # 每小时自动清理过期会话的定时任务 def scheduled_cleanup(): while True: try: # 每小时清理一次过期会话 clean_expired_sessions() # 休眠1小时 time.sleep(3600) except Exception as e: logger.error(f"定时清理任务失败: {str(e)}") # 出错后仍然继续 time.sleep(60) # 自制卡牌数据文件路径 CUSTOM_CARDS_FILE = os.path.join(DATA_DIR, 'custom_cards.json') CARD_VOTES_FILE = os.path.join(DATA_DIR, 'card_votes.json') # 读取自制卡牌数据 def get_custom_cards(): return safe_load_data(CUSTOM_CARDS_FILE, {"cards": []}) # 保存自制卡牌数据 def save_custom_cards(data): return safe_save_data(CUSTOM_CARDS_FILE, data) # 读取卡牌投票数据 def get_card_votes(): return safe_load_data(CARD_VOTES_FILE, {"user_votes": {}, "card_votes": {}}) # 保存卡牌投票数据 def save_card_votes(data): return safe_save_data(CARD_VOTES_FILE, data) # 自制卡牌页面路由 @app.route('/custom_cards') def custom_cards(): if 'user_id' not in session: return redirect(url_for('login')) return render_template('custom_cards.html') # 创建卡牌页面路由 @app.route('/create_card') def create_card(): if 'user_id' not in session: return redirect(url_for('login')) return render_template('create_card.html') # API: 获取当前用户ID @app.route('/api/current_user', methods=['GET']) def api_current_user(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 return jsonify({ 'user_id': session['user_id'], 'username': session.get('username', '') }) # API: 获取用户投票历史 @app.route('/api/user_votes', methods=['GET']) def api_user_votes(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 user_id = session['user_id'] votes_data = get_card_votes() # 获取用户的投票记录 user_votes = votes_data.get('user_votes', {}).get(user_id, {}) return jsonify({ 'votes': user_votes }) # API: 获取自制卡牌列表 @app.route('/api/custom_cards', methods=['GET']) def api_custom_cards(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 # 获取卡牌数据 cards_data = get_custom_cards() cards = cards_data.get('cards', []) # 获取投票数据 votes_data = get_card_votes() card_votes = votes_data.get('card_votes', {}) # 给每个卡片添加投票信息 for card in cards: card_id = card['id'] if card_id in card_votes: card['upvotes'] = card_votes[card_id].get('upvotes', 0) card['downvotes'] = card_votes[card_id].get('downvotes', 0) else: card['upvotes'] = 0 card['downvotes'] = 0 return jsonify({ 'cards': cards }) # API: 创建自制卡牌 @app.route('/api/create_custom_card', methods=['POST']) def api_create_custom_card(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 user_id = session['user_id'] username = session.get('username', '未知用户') card_data = request.json # 验证必要字段 required_fields = ['title', 'character', 'description', 'option_a', 'option_b'] for field in required_fields: if field not in card_data: return jsonify({'error': f'缺少必要字段: {field}'}), 400 # 验证字符长度 if len(card_data['title']) > 50: return jsonify({'error': '卡牌标题不能超过50个字符'}), 400 if len(card_data['description']) > 200: return jsonify({'error': '卡牌描述不能超过200个字符'}), 400 if len(card_data['option_a']['text']) > 50: return jsonify({'error': '选项A文本不能超过50个字符'}), 400 if len(card_data['option_b']['text']) > 50: return jsonify({'error': '选项B文本不能超过50个字符'}), 400 # 验证角色字段 character_fields = ['name', 'title', 'avatar'] for field in character_fields: if field not in card_data['character']: return jsonify({'error': f'缺少角色字段: {field}'}), 400 if len(card_data['character']['name']) > 30: return jsonify({'error': '角色名称不能超过30个字符'}), 400 if len(card_data['character']['title']) > 30: return jsonify({'error': '角色头衔不能超过30个字符'}), 400 if len(card_data['character']['avatar']) > 5: return jsonify({'error': '角色图标不能超过5个字符'}), 400 # 验证效果值 option_a_effects = card_data['option_a']['effects'] option_b_effects = card_data['option_b']['effects'] # 验证效果值范围 for effect_name, effect_value in option_a_effects.items(): if abs(effect_value) > 25: return jsonify({'error': f'选项A的{effect_name}效果值不能超过±25'}), 400 for effect_name, effect_value in option_b_effects.items(): if abs(effect_value) > 25: return jsonify({'error': f'选项B的{effect_name}效果值不能超过±25'}), 400 # 验证效果值总和 option_a_total = sum(abs(v) for v in option_a_effects.values()) option_b_total = sum(abs(v) for v in option_b_effects.values()) if option_a_total > 80: return jsonify({'error': '选项A的效果总和不能超过80'}), 400 if option_b_total > 80: return jsonify({'error': '选项B的效果总和不能超过80'}), 400 # 获取现有卡牌数据 cards_data = get_custom_cards() cards = cards_data.get('cards', []) # 创建新卡牌 new_card = { 'id': str(uuid.uuid4()), 'title': card_data['title'], 'character': card_data['character'], 'description': card_data['description'], 'option_a': card_data['option_a'], 'option_b': card_data['option_b'], 'creator_id': user_id, 'creator_name': username, 'created_at': datetime.datetime.utcnow().isoformat() } # 添加到卡牌列表 cards.append(new_card) cards_data['cards'] = cards # 保存卡牌数据 save_custom_cards(cards_data) # 获取投票数据 votes_data = get_card_votes() card_votes = votes_data.get('card_votes', {}) # 初始化卡牌投票数据 card_votes[new_card['id']] = { 'upvotes': 0, 'downvotes': 0, 'users': {} } votes_data['card_votes'] = card_votes save_card_votes(votes_data) return jsonify({ 'success': True, 'card': new_card }) # API: 投票自制卡牌 @app.route('/api/vote_card', methods=['POST']) def api_vote_card(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 user_id = session['user_id'] request_data = request.json # 验证参数 if 'card_id' not in request_data or 'vote_type' not in request_data: return jsonify({'error': '缺少必要参数'}), 400 card_id = request_data['card_id'] vote_type = request_data['vote_type'] # 验证投票类型 if vote_type not in ['upvote', 'downvote', 'none']: return jsonify({'error': '无效的投票类型'}), 400 # 获取投票数据 votes_data = get_card_votes() user_votes = votes_data.get('user_votes', {}) card_votes = votes_data.get('card_votes', {}) # 初始化用户投票数据 if user_id not in user_votes: user_votes[user_id] = {} # 获取卡牌数据 cards_data = get_custom_cards() cards = cards_data.get('cards', []) # 查找卡牌 card = next((c for c in cards if c['id'] == card_id), None) if not card: return jsonify({'error': '卡牌不存在'}), 404 # 初始化卡牌投票数据 if card_id not in card_votes: card_votes[card_id] = { 'upvotes': 0, 'downvotes': 0, 'users': {} } # 获取用户之前的投票 previous_vote = user_votes.get(user_id, {}).get(card_id) # 处理投票逻辑 if vote_type == 'none' or (previous_vote == vote_type): # 取消投票 if previous_vote: if previous_vote == 'upvote': card_votes[card_id]['upvotes'] = max(0, card_votes[card_id]['upvotes'] - 1) else: card_votes[card_id]['downvotes'] = max(0, card_votes[card_id]['downvotes'] - 1) # 从用户投票中移除 if card_id in user_votes[user_id]: del user_votes[user_id][card_id] # 从卡牌用户列表中移除 if user_id in card_votes[card_id]['users']: del card_votes[card_id]['users'][user_id] new_vote = None else: # 如果之前投过票,先取消之前的投票 if previous_vote: if previous_vote == 'upvote': card_votes[card_id]['upvotes'] = max(0, card_votes[card_id]['upvotes'] - 1) else: card_votes[card_id]['downvotes'] = max(0, card_votes[card_id]['downvotes'] - 1) # 添加新投票 if vote_type == 'upvote': card_votes[card_id]['upvotes'] += 1 else: card_votes[card_id]['downvotes'] += 1 # 更新用户投票 user_votes[user_id][card_id] = vote_type card_votes[card_id]['users'][user_id] = vote_type new_vote = vote_type # 保存投票数据 votes_data['user_votes'] = user_votes votes_data['card_votes'] = card_votes save_card_votes(votes_data) return jsonify({ 'success': True, 'upvotes': card_votes[card_id]['upvotes'], 'downvotes': card_votes[card_id]['downvotes'], 'newVote': new_vote }) # 管理员API:获取自制卡牌列表 @app.route('/api/admin/custom_cards', methods=['GET']) def api_admin_custom_cards(): # 获取卡牌数据 cards_data = get_custom_cards() cards = cards_data.get('cards', []) # 获取投票数据 votes_data = get_card_votes() card_votes = votes_data.get('card_votes', {}) # 给每个卡片添加投票信息 for card in cards: card_id = card['id'] if card_id in card_votes: card['upvotes'] = card_votes[card_id].get('upvotes', 0) card['downvotes'] = card_votes[card_id].get('downvotes', 0) else: card['upvotes'] = 0 card['downvotes'] = 0 return jsonify({ 'cards': cards }) # 管理员API:删除自制卡牌 @app.route('/api/admin/delete_card/', methods=['DELETE']) def api_admin_delete_card(card_id): # 获取卡牌数据 cards_data = get_custom_cards() cards = cards_data.get('cards', []) # 查找并删除卡牌 initial_count = len(cards) cards = [c for c in cards if c['id'] != card_id] if len(cards) < initial_count: # 更新卡牌数据 cards_data['cards'] = cards save_custom_cards(cards_data) # 也从投票数据中删除 votes_data = get_card_votes() card_votes = votes_data.get('card_votes', {}) user_votes = votes_data.get('user_votes', {}) # 从卡牌投票中删除 if card_id in card_votes: del card_votes[card_id] # 从用户投票中删除 for user_id in user_votes: if card_id in user_votes[user_id]: del user_votes[user_id][card_id] votes_data['card_votes'] = card_votes votes_data['user_votes'] = user_votes save_card_votes(votes_data) return jsonify({'success': True}) else: return jsonify({'error': '卡牌不存在'}), 404 # 添加到现有管理员页面 @app.route('/admin/custom_cards') def admin_custom_cards(): return render_template('admin/custom_cards.html') # 成就页面路由 @app.route('/achievements') def achievements_page(): if 'user_id' not in session: return redirect(url_for('login')) return render_template('achievements.html', current_user=session.get('username')) # API: 获取随机死亡场景 # 修改API路由也添加更多日志输出 @app.route('/api/death_scenario/', methods=['GET']) def api_death_scenario(death_type): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 user_id = session['user_id'] print(f"用户 {user_id} 请求死亡场景: {death_type}") # 验证死亡类型 valid_types = [ 'loyalty_low', 'loyalty_high', 'chaos_low', 'chaos_high', 'population_low', 'population_high', 'military_low', 'military_high', 'resources_low', 'resources_high' ] if death_type not in valid_types: return jsonify({'error': '无效的死亡类型'}), 400 # 获取死亡场景数据 scenarios_data = get_death_methods() # 筛选指定类型的场景 type_scenarios = [s for s in scenarios_data.get('death_scenarios', []) if s.get('death_type') == death_type] if not type_scenarios: return jsonify({'error': '未找到符合条件的死亡场景'}), 404 # 计算权重总和 total_weight = sum(s.get('weight', 100) for s in type_scenarios) # 随机选择一个场景,考虑权重 rand_val = random.randint(1, total_weight) # 根据权重选择 current_weight = 0 selected_scenario = None for scenario in type_scenarios: current_weight += scenario.get('weight', 100) if rand_val <= current_weight: selected_scenario = scenario break # 如果没有选中,就选第一个(理论上不应该发生) if not selected_scenario and type_scenarios: selected_scenario = type_scenarios[0] if not selected_scenario: return jsonify({'error': '未找到符合条件的死亡场景'}), 500 print(f"选中死亡场景: {selected_scenario['id']}") # 解锁对应成就 unlock_result = unlock_achievement_for_death(user_id, selected_scenario['id']) print(f"成就解锁结果: {unlock_result}") return jsonify(selected_scenario) def unlock_achievement_for_death(user_id, scenario_id): # 添加调试日志 print(f"尝试为用户 {user_id} 解锁死亡场景 {scenario_id} 的成就") # 获取成就数据 achievements_data = get_achievements_data() # 正确构建成就ID - 死亡场景ID前加"death_"前缀 achievement_id = f"death_{scenario_id}" # 查找匹配的成就 achievement = None for ach in achievements_data.get('achievements', []): if ach.get('id') == achievement_id or ach.get('death_scenario_id') == scenario_id: achievement = ach break if not achievement: print(f"没有找到与死亡场景 {scenario_id} 对应的成就") return False print(f"找到对应成就: {achievement['id']} - {achievement['name']}") # 检查是否已解锁 user_achievements = get_user_achievements_data(user_id) achievement_already_unlocked = False for ua in user_achievements: if isinstance(ua, dict) and ua.get('id') == achievement['id']: achievement_already_unlocked = True break if achievement_already_unlocked: print(f"成就 {achievement['id']} 已经解锁") return True # 解锁成就 result = add_user_achievement(user_id, achievement['id']) print(f"成就解锁结果: {result}") return result # 修改 get_user_achievements_data 函数 def get_user_achievements_data(user_id): # 添加调试日志 print(f"获取用户 {user_id} 的成就数据") # 用户成就不存在单独的JSON文件,而是存储在游戏状态中 game_states = get_game_states() # 查找用户的游戏状态 user_state = next((s for s in game_states if s['user_id'] == user_id), None) if user_state and 'game_data' in user_state: print(f"找到用户游戏状态") if 'achievements' in user_state['game_data']: achievements = user_state['game_data']['achievements'] print(f"找到用户成就数据,共 {len(achievements)} 项") return achievements else: print(f"用户游戏状态中没有成就数据") return [] else: print(f"未找到用户 {user_id} 的游戏状态") return [] # 添加用户成就 - 修复版本 def add_user_achievement(user_id, achievement_id): # 添加调试日志 print(f"添加成就 {achievement_id} 到用户 {user_id}") # 获取游戏状态 game_states = get_game_states() # 查找用户的游戏状态 user_state_index = None for i, state in enumerate(game_states): if state.get('user_id') == user_id: user_state_index = i break if user_state_index is None: # 用户没有游戏状态,创建一个新的 print(f"用户 {user_id} 没有游戏状态,创建新的") new_state = { 'id': str(uuid.uuid4()), 'user_id': user_id, 'game_data': { 'achievements': [ { 'id': achievement_id, 'unlock_time': datetime.datetime.utcnow().isoformat() } ] }, 'created_at': datetime.datetime.utcnow().isoformat(), 'updated_at': datetime.datetime.utcnow().isoformat() } game_states.append(new_state) print(f"已创建新游戏状态和成就记录") else: # 更新现有游戏状态 user_state = game_states[user_state_index] print(f"找到用户现有游戏状态") # 确保游戏数据和成就列表存在 if 'game_data' not in user_state: print("游戏状态中没有game_data字段,创建") user_state['game_data'] = {} if 'achievements' not in user_state['game_data']: print("game_data中没有achievements字段,创建") user_state['game_data']['achievements'] = [] # 检查成就是否已存在 achievement_exists = False for ach in user_state['game_data']['achievements']: if isinstance(ach, dict) and ach.get('id') == achievement_id: achievement_exists = True break if not achievement_exists: # 添加新成就 print(f"添加新成就 {achievement_id}") user_state['game_data']['achievements'].append({ 'id': achievement_id, 'unlock_time': datetime.datetime.utcnow().isoformat() }) else: print(f"成就 {achievement_id} 已经存在,不重复添加") # 更新时间戳 user_state['updated_at'] = datetime.datetime.utcnow().isoformat() # 更新游戏状态列表 game_states[user_state_index] = user_state # 保存游戏状态 result = save_game_states(game_states) print(f"保存游戏状态结果: {result}") return result # API: 获取成就列表 @app.route('/api/achievements', methods=['GET']) def api_achievements(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 # 获取成就数据 achievements_data = get_achievements_data() user_achievements = get_user_achievements_data(session['user_id']) # 准备返回数据 achievements = [] for achievement in achievements_data.get('achievements', []): # 检查是否解锁 unlock_info = next((a for a in user_achievements if a.get('id') == achievement['id']), None) if unlock_info: # 已解锁 achievements.append({ 'id': achievement['id'], 'name': achievement['name'], 'description': achievement['description'], 'icon': achievement['icon'], 'unlocked': True, 'unlock_time': unlock_info.get('unlock_time') }) else: # 未解锁 hidden_info = { 'id': achievement['id'], 'name': achievement['name'] if not achievement.get('hidden') else "???", 'description': "???" if achievement.get('hidden') else achievement['description'], 'icon': achievement['icon'], 'unlocked': False } achievements.append(hidden_info) return jsonify({ 'achievements': achievements, 'total': len(achievements_data.get('achievements', [])), 'unlocked': len(user_achievements) }) # 这段代码应该放在app_sqlite.py的导入部分之后,但在使用装饰器之前 # 请找到适当位置添加,或确保这个装饰器已经在文件中定义 # 导入functools.wraps用于装饰器 from functools import wraps # 管理员会话保护装饰器 def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if ADMIN_SESSION_KEY not in session: return redirect(url_for('admin_login')) return f(*args, **kwargs) return decorated_function # API: 获取死亡场景列表(管理员) @app.route('/api/death_scenarios', methods=['GET']) @admin_required def api_death_scenarios(): death_methods = get_death_methods() return jsonify(death_methods) @app.after_request def add_cache_headers(response): # 只对静态文件设置缓存 if request.path.startswith('/static/'): # 设置一天的缓存时间 response.headers['Cache-Control'] = 'public, max-age=43200' return response # 管理员会话保护装饰器 def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if ADMIN_SESSION_KEY not in session: return redirect(url_for('admin_login')) return f(*args, **kwargs) return decorated_function # 管理员登录页面 @app.route('/admin', methods=['GET', 'POST']) def admin_login(): error = None if request.method == 'POST': username = request.form['username'] password = request.form['password'] if username == ADMIN_USERNAME and password == ADMIN_PASSWORD: session[ADMIN_SESSION_KEY] = True return redirect(url_for('admin_index')) else: error = '账号或密码错误' return render_template('admin/login.html', error=error) # 管理员主页 @app.route('/admin/index') @admin_required def admin_index(): return render_template('admin/index.html') # 管理员登出 @app.route('/admin/logout') def admin_logout(): session.pop(ADMIN_SESSION_KEY, None) return redirect(url_for('admin_login')) # 角色管理页面 @app.route('/admin/characters') @admin_required def admin_characters(): return render_template('admin/characters.html') # 数据面板页面 @app.route('/admin/dashboard') @admin_required def admin_dashboard(): return render_template('admin/dashboard.html') @app.route('/admin/weight_management') @admin_required def admin_weight_management(): return render_template('weight_management.html') # API: 获取所有角色 @app.route('/api/admin/characters', methods=['GET']) @admin_required def api_admin_characters(): cards_data = get_cards() characters = cards_data.get('characters', []) return jsonify({'characters': characters}) # API: 获取特定角色 @app.route('/api/admin/character/', methods=['GET']) @admin_required def api_admin_character(character_id): cards_data = get_cards() characters = cards_data.get('characters', []) # 搜索角色 character = next((c for c in characters if str(c['id']) == character_id), None) if character: return jsonify({'character': character}) else: return jsonify({'error': '角色不存在'}), 404 # API: 添加或更新角色 @app.route('/api/admin/character', methods=['POST']) @admin_required def api_admin_save_character(): new_character = request.json # 验证必填字段 required_fields = ['id', 'name', 'title', 'avatar', 'type'] for field in required_fields: if field not in new_character: return jsonify({'error': f'缺少必填字段: {field}'}), 400 # 获取现有卡牌数据 cards_data = get_cards() characters = cards_data.get('characters', []) # 检查是更新还是新增 existing_index = -1 for i, char in enumerate(characters): if str(char['id']) == str(new_character['id']): existing_index = i break # 确保events字段存在 if 'events' not in new_character: new_character['events'] = [] if existing_index >= 0: # 更新现有角色 characters[existing_index] = new_character else: # 添加新角色 characters.append(new_character) # 更新cards_data cards_data['characters'] = characters # 保存到文件 save_cards(cards_data) return jsonify({'success': True, 'character': new_character}) # API: 删除角色 @app.route('/api/admin/character/', methods=['DELETE']) @admin_required def api_admin_delete_character(character_id): # 获取现有卡牌数据 cards_data = get_cards() characters = cards_data.get('characters', []) # 查找并删除角色 original_count = len(characters) characters = [c for c in characters if str(c['id']) != character_id] if len(characters) < original_count: # 角色被删除 cards_data['characters'] = characters save_cards(cards_data) return jsonify({'success': True}) else: return jsonify({'error': '角色不存在'}), 404 # API: 添加事件到角色 @app.route('/api/admin/character//event', methods=['POST']) @admin_required def api_admin_add_character_event(character_id): new_event = request.json # 验证必填字段 required_fields = ['id', 'text', 'optionA', 'optionB'] for field in required_fields: if field not in new_event: return jsonify({'error': f'缺少必填字段: {field}'}), 400 # 验证选项A和B的必填字段 for option in ['optionA', 'optionB']: if 'text' not in new_event[option] or 'effects' not in new_event[option]: return jsonify({'error': f'选项{option[-1]}缺少必填字段'}), 400 # 获取现有卡牌数据 cards_data = get_cards() characters = cards_data.get('characters', []) # 查找角色 for character in characters: if str(character['id']) == character_id: # 确保角色有events字段 if 'events' not in character: character['events'] = [] # 检查是否已存在该ID的事件 event_exists = any(e['id'] == new_event['id'] for e in character['events']) if event_exists: return jsonify({'error': f'事件ID已存在: {new_event["id"]}'}), 400 # 添加新事件 character['events'].append(new_event) # 保存到文件 save_cards(cards_data) return jsonify({'success': True, 'event': new_event}) return jsonify({'error': '角色不存在'}), 404 # API: 更新角色的事件 @app.route('/api/admin/character//event/', methods=['PUT']) @admin_required def api_admin_update_character_event(character_id, event_id): updated_event = request.json # 验证必填字段 required_fields = ['id', 'text', 'optionA', 'optionB'] for field in required_fields: if field not in updated_event: return jsonify({'error': f'缺少必填字段: {field}'}), 400 # 验证选项A和B的必填字段 for option in ['optionA', 'optionB']: if 'text' not in updated_event[option] or 'effects' not in updated_event[option]: return jsonify({'error': f'选项{option[-1]}缺少必填字段'}), 400 # 获取现有卡牌数据 cards_data = get_cards() characters = cards_data.get('characters', []) # 查找角色和事件 for character in characters: if str(character['id']) == character_id and 'events' in character: for i, event in enumerate(character['events']): if str(event['id']) == event_id: # 更新事件 character['events'][i] = updated_event # 保存到文件 save_cards(cards_data) return jsonify({'success': True, 'event': updated_event}) return jsonify({'error': '角色或事件不存在'}), 404 # API: 删除角色的事件 @app.route('/api/admin/character//event/', methods=['DELETE']) @admin_required def api_admin_delete_character_event(character_id, event_id): # 获取现有卡牌数据 cards_data = get_cards() characters = cards_data.get('characters', []) # 查找角色和事件 for character in characters: if str(character['id']) == character_id and 'events' in character: original_count = len(character['events']) character['events'] = [e for e in character['events'] if str(e['id']) != event_id] if len(character['events']) < original_count: # 事件被删除 save_cards(cards_data) return jsonify({'success': True}) return jsonify({'error': '角色或事件不存在'}), 404 # 完全重写 api_admin_stats 函数 @app.route('/api/admin/stats', methods=['GET']) @admin_required def api_admin_stats(): # 获取用户数据 users = get_users() # 获取当前会话数据 sessions = get_sessions() active_sessions = sessions.get('active_sessions', []) # 获取当前日期(只取日期部分,不含时间) now = datetime.datetime.now() today_date_str = now.strftime('%Y-%m-%d') logger.info(f"当前日期: {today_date_str}") # 重写今日新用户计算逻辑 today_users = [] for user in users: try: # 获取用户创建日期 created_at_str = user.get('created_at', '') # 如果日期字符串为空,跳过 if not created_at_str: continue # 只取日期部分进行比较(跳过时间部分) created_date_str = created_at_str.split('T')[0] if 'T' not in created_at_str: created_date_str = created_at_str.split(' ')[0] logger.info(f"用户: {user.get('username')}, 创建日期: {created_date_str}") # 简单直接的字符串比较 if created_date_str == today_date_str: today_users.append({ 'username': user.get('username', '未知用户'), 'created_at': created_at_str }) logger.info(f"今日新用户: {user.get('username')}") except Exception as e: logger.error(f"处理用户日期时出错: {e}") continue # 强制添加测试用户进行验证 test_user = { 'username': '测试用户', 'created_at': datetime.datetime.now().isoformat() } today_users.append(test_user) logger.info(f"添加测试用户: {test_user}") # 计算在线用户 online_users = [] playing_users = 0 for session_data in active_sessions: user_id = session_data.get('user_id') user = next((u for u in users if u['id'] == user_id), None) if user: status = session_data.get('status', '未知') if status == 'playing': playing_users += 1 online_users.append({ 'username': user.get('username', '未知用户'), 'status': status, 'ip_address': session_data.get('ip_address', '未知'), 'last_activity': session_data.get('last_activity', '') }) # 返回统计数据 stats_data = { 'total_users': len(users), 'online_users_count': len(online_users), 'today_users_count': len(today_users), 'playing_users_count': playing_users, 'online_users': online_users, 'today_users': today_users } logger.info(f"返回统计数据: {stats_data}") return jsonify(stats_data) # 添加到 app_sqlite.py 中 # 检查单次统治时间成就 def check_single_reign_achievements(user_id, reign_time): """检查单次统治时间成就并解锁""" print(f"检查用户 {user_id} 的单次统治时间成就: {reign_time}年") # 获取成就数据 achievements_data = get_achievements_data() # 筛选单次统治时间成就 single_reign_achievements = [a for a in achievements_data.get('achievements', []) if a.get('achievement_type') == 'reign_single'] # 按要求排序 single_reign_achievements.sort(key=lambda x: x.get('requirement', 0)) # 检查每个成就 unlocked_any = False for achievement in single_reign_achievements: requirement = achievement.get('requirement', 0) if reign_time >= requirement: # 解锁成就 unlocked = unlock_achievement(user_id, achievement['id']) if unlocked: print(f"解锁单次统治时间成就: {achievement['name']} (要求: {requirement}年)") unlocked_any = True return unlocked_any # 检查总统治时间成就 def check_total_reign_achievements(user_id, total_reign_time): """检查总统治时间成就并解锁""" print(f"检查用户 {user_id} 的总统治时间成就: {total_reign_time}年") # 获取成就数据 achievements_data = get_achievements_data() # 筛选总统治时间成就 total_reign_achievements = [a for a in achievements_data.get('achievements', []) if a.get('achievement_type') == 'reign_total'] # 按要求排序 total_reign_achievements.sort(key=lambda x: x.get('requirement', 0)) # 检查每个成就 unlocked_any = False for achievement in total_reign_achievements: requirement = achievement.get('requirement', 0) if total_reign_time >= requirement: # 解锁成就 unlocked = unlock_achievement(user_id, achievement['id']) if unlocked: print(f"解锁总统治时间成就: {achievement['name']} (要求: {requirement}年)") unlocked_any = True return unlocked_any # 统一解锁成就函数 def unlock_achievement(user_id, achievement_id): """解锁成就,并返回是否成功解锁""" # 获取成就数据 achievements_data = get_achievements_data() # 查找成就 achievement = next((a for a in achievements_data.get('achievements', []) if a['id'] == achievement_id), None) if not achievement: print(f"成就不存在: {achievement_id}") return False # 检查是否已解锁 user_achievements = get_user_achievements_data(user_id) for user_ach in user_achievements: if isinstance(user_ach, dict) and user_ach.get('id') == achievement_id: # 已解锁 print(f"成就 {achievement['name']} 已经解锁过") return False # 解锁成就 result = add_user_achievement(user_id, achievement_id) if result: print(f"成功解锁成就: {achievement['name']}") return result # 修改 API: 更新高分 - 添加成就检查 @app.route('/api/update_score', methods=['POST']) def api_update_score(): if 'user_id' not in session: return jsonify({'error': '未授权'}), 401 user_id = session['user_id'] score = request.json.get('score', 0) year = request.json.get('year', 41000) # 获取当前年份 print(f"更新用户 {user_id} 的分数: {score}, 年份: {year}") # 获取所有用户 users = get_users() # 查找当前用户 user = next((u for u in users if u['id'] == user_id), None) if user: # 计算总统治时间 old_total_games = user.get('total_games', 0) old_high_score = user.get('high_score', 0) # 检查单次统治时间成就 check_single_reign_achievements(user_id, score) # 只有当新分数更高时才更新 if score > old_high_score: user['high_score'] = score # 增加游戏次数 user['total_games'] = old_total_games + 1 # 计算总统治时间 # 方法:总游戏次数 * 平均每次统治时间 (为简化计算,可以估算) total_reign_time = sum(u.get('high_score', 0) for u in users if u['id'] == user_id) if 'total_reign_time' not in user: # 如果没有记录总统治时间,则初始化为当前总计 user['total_reign_time'] = total_reign_time + score else: # 否则加上本次统治时间 user['total_reign_time'] = user['total_reign_time'] + score # 检查总统治时间成就 check_total_reign_achievements(user_id, user['total_reign_time']) # 更新最后游戏年份 user['last_year'] = year # 更新最后游戏时间 user['last_game_time'] = datetime.datetime.utcnow().isoformat() save_users(users) # 更新会话状态为空闲 update_session(user_id, "idle") return jsonify({'success': True}) else: return jsonify({'error': '用户不存在'}), 404 # 初始化应用 init_data_files() # 启动定时清理线程 cleanup_thread = threading.Thread(target=scheduled_cleanup, daemon=True) cleanup_thread.start() # 启动服务器 if __name__ == '__main__': app.run(debug=True, port=5001) # 使用5005端口避开常用端口