Warhummer/app_sqlite.py
2025-06-25 09:35:26 +08:00

1816 lines
60 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys, os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from flask import Flask, render_template, request, redirect, url_for, jsonify, session
import json
import os
import uuid
import datetime
import time
import shutil
import logging
import threading
import hashlib
import random
from werkzeug.security import generate_password_hash, check_password_hash
import uuid
import datetime
import os
import json
import logging
import datetime
import os
import json
from functools import wraps
# 配置日志
logging.basicConfig(
filename='game_app.log',
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('game_app')
app = Flask(__name__)
app.secret_key = os.urandom(24) # 用于会话加密
# 数据文件路径
DATA_DIR = 'data'
BACKUP_DIR = os.path.join(DATA_DIR, 'backup')
USERS_FILE = os.path.join(DATA_DIR, 'users.json')
GAME_STATES_FILE = os.path.join(DATA_DIR, 'game_states.json')
CARDS_FILE = os.path.join(DATA_DIR, 'cards.json')
SESSIONS_FILE = os.path.join(DATA_DIR, 'sessions.json') # 会话文件
DEATH_METHODS_FILE = os.path.join(DATA_DIR, 'death_methods.json') # 死亡方式文件
ACHIEVEMENTS_FILE = os.path.join(DATA_DIR, 'achievements.json') # 成就系统文件
# 管理员账号和密码 - 硬编码
ADMIN_USERNAME = "13991190618"
ADMIN_PASSWORD = "@CuiYuJian040618"
# 管理员会话键名
ADMIN_SESSION_KEY = 'admin_logged_in'
# 文件锁,防止并发写入冲突
file_locks = {
USERS_FILE: threading.RLock(),
GAME_STATES_FILE: threading.RLock(),
SESSIONS_FILE: threading.RLock(),
CARDS_FILE: threading.RLock(),
DEATH_METHODS_FILE: threading.RLock(),
ACHIEVEMENTS_FILE: threading.RLock()
}
# 确保数据目录存在
def ensure_dirs_exist():
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(BACKUP_DIR, exist_ok=True)
# 创建备份文件名
def get_backup_filename(original_file, include_timestamp=False):
base_name = os.path.basename(original_file)
if include_timestamp:
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
return os.path.join(BACKUP_DIR, f"{base_name}.{timestamp}.bak")
else:
return os.path.join(BACKUP_DIR, f"{base_name}.bak")
# 管理备份文件保留最近的N个备份
def manage_backups(file_pattern, max_backups=5):
try:
backup_files = [f for f in os.listdir(BACKUP_DIR) if f.startswith(file_pattern)]
backup_files.sort(reverse=True) # 最新的文件排在前面
# 如果备份文件超过最大数量,删除最旧的
if len(backup_files) > max_backups:
for old_file in backup_files[max_backups:]:
try:
os.remove(os.path.join(BACKUP_DIR, old_file))
logger.info(f"删除旧备份文件: {old_file}")
except Exception as e:
logger.error(f"删除旧备份文件失败: {old_file}, 错误: {str(e)}")
except Exception as e:
logger.error(f"管理备份文件失败, 错误: {str(e)}")
# 安全地保存数据到文件
def safe_save_data(file_path, data, create_backup=True):
# 获取文件锁
lock = file_locks.get(file_path, threading.RLock())
with lock:
# 确保目录存在
ensure_dirs_exist()
# 如果原文件存在且需要备份,先创建备份
if create_backup and os.path.exists(file_path):
try:
# 创建带时间戳的备份文件,每小时最多一个
current_hour = datetime.datetime.now().strftime('%Y%m%d_%H')
backup_marker = hashlib.md5(file_path.encode()).hexdigest()[:8]
backup_file = os.path.join(BACKUP_DIR, f"{os.path.basename(file_path)}.{current_hour}.{backup_marker}.bak")
# 检查是否已经有该小时的备份
if not os.path.exists(backup_file):
shutil.copy2(file_path, backup_file)
logger.info(f"创建备份: {backup_file}")
# 管理旧备份
file_prefix = os.path.basename(file_path)
manage_backups(file_prefix, max_backups=10)
except Exception as e:
logger.error(f"创建备份失败: {str(e)}")
# 先写入临时文件
temp_file = file_path + '.tmp'
try:
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
f.flush()
os.fsync(f.fileno()) # 确保数据写入磁盘
# 替换原文件
os.replace(temp_file, file_path)
logger.info(f"成功保存数据到: {file_path}")
return True
except Exception as e:
logger.error(f"保存数据到 {file_path} 失败: {str(e)}")
if os.path.exists(temp_file):
try:
os.remove(temp_file)
except:
pass
return False
# 从文件或备份中安全地读取数据
def safe_load_data(file_path, default_value=None):
# 获取文件锁
lock = file_locks.get(file_path, threading.RLock())
with lock:
# 尝试读取原始文件
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError) as e:
logger.warning(f"读取文件 {file_path} 失败: {str(e)}, 尝试从备份恢复")
# 尝试从最新备份恢复
backup_file = get_backup_filename(file_path)
if os.path.exists(backup_file):
try:
with open(backup_file, 'r', encoding='utf-8') as f:
data = json.load(f)
logger.info(f"从备份 {backup_file} 恢复数据成功")
# 恢复成功后,保存回原始文件
safe_save_data(file_path, data, create_backup=False)
return data
except Exception as backup_error:
logger.error(f"从备份恢复失败: {str(backup_error)}")
# 如果指定了默认值,则返回默认值
if default_value is not None:
return default_value
# 否则,根据文件类型返回适当的空结构
if file_path == USERS_FILE:
return []
elif file_path == GAME_STATES_FILE:
return []
elif file_path == SESSIONS_FILE:
return {"active_sessions": []}
elif file_path == DEATH_METHODS_FILE:
return {"death_scenarios": []}
elif file_path == ACHIEVEMENTS_FILE:
return {"achievements": []}
else:
return {}
# 初始化数据文件
def init_data_files():
ensure_dirs_exist()
# 初始化用户文件
if not os.path.exists(USERS_FILE):
safe_save_data(USERS_FILE, [], create_backup=False)
# 初始化游戏状态文件
if not os.path.exists(GAME_STATES_FILE):
safe_save_data(GAME_STATES_FILE, [], create_backup=False)
# 初始化会话文件
if not os.path.exists(SESSIONS_FILE):
safe_save_data(SESSIONS_FILE, {"active_sessions": []}, create_backup=False)
# 初始化死亡方式文件
if not os.path.exists(DEATH_METHODS_FILE):
safe_save_data(DEATH_METHODS_FILE, {"death_scenarios": []}, create_backup=False)
# 初始化成就文件
if not os.path.exists(ACHIEVEMENTS_FILE):
safe_save_data(ACHIEVEMENTS_FILE, {"achievements": []}, create_backup=False)
ensure_character_weights()
def ensure_character_weights():
"""确保所有角色都有权重字段没有的话添加默认权重5"""
try:
cards_data = get_cards()
characters = cards_data.get('characters', [])
# 记录是否有修改
has_changes = False
# 检查每个角色
for character in characters:
if 'weight' not in character:
# 添加默认权重
character['weight'] = 100
has_changes = True
# 如果有修改,保存回文件
if has_changes:
save_cards(cards_data)
logger.info("已为所有角色添加默认权重字段")
except Exception as e:
logger.error(f"确保角色权重时出错: {str(e)}")
# 读取用户数据
def get_users():
return safe_load_data(USERS_FILE, [])
# 保存用户数据
def save_users(users):
return safe_save_data(USERS_FILE, users)
# 读取游戏状态数据
def get_game_states():
return safe_load_data(GAME_STATES_FILE, [])
# 保存游戏状态数据
def save_game_states(states):
return safe_save_data(GAME_STATES_FILE, states)
# 读取卡牌数据
def get_cards():
default_cards = {
"characters": [],
"gameSettings": {
"initialValues": {
"loyalty": 50, "chaos": 50, "population": 50, "military": 50, "resources": 50
},
"maxValues": {
"loyalty": 100, "chaos": 100, "population": 100, "military": 100, "resources": 100
},
"minValues": {
"loyalty": 0, "chaos": 0, "population": 0, "military": 0, "resources": 0
},
"statusIcons": {
"loyalty": "🦅", "chaos": "🌀", "population": "👥", "military": "🔫", "resources": "⚙️"
}
}
}
return safe_load_data(CARDS_FILE, default_cards)
# 保存卡牌数据
def save_cards(cards_data):
return safe_save_data(CARDS_FILE, cards_data)
# 读取会话数据
def get_sessions():
return safe_load_data(SESSIONS_FILE, {"active_sessions": []})
# 保存会话数据
def save_sessions(sessions):
return safe_save_data(SESSIONS_FILE, sessions)
# 读取死亡方式数据
def get_death_methods():
return safe_load_data(DEATH_METHODS_FILE, {"death_scenarios": []})
# 保存死亡方式数据
def save_death_methods(data):
return safe_save_data(DEATH_METHODS_FILE, data)
# 读取成就数据
def get_achievements_data():
return safe_load_data(ACHIEVEMENTS_FILE, {"achievements": []})
# 保存成就数据
def save_achievements_data(data):
return safe_save_data(ACHIEVEMENTS_FILE, data)
# 更新会话
def update_session(user_id, status=None):
sessions = get_sessions()
active_sessions = sessions.get("active_sessions", [])
current_time = datetime.datetime.utcnow().isoformat()
# 查找当前用户会话
session_found = False
for session_data in active_sessions:
if session_data.get("user_id") == user_id:
# 更新现有会话
session_data["last_activity"] = current_time
if status:
session_data["status"] = status
session_found = True
break
# 保存更新后的会话数据
if session_found:
sessions["active_sessions"] = active_sessions
save_sessions(sessions)
return True
return False
# 添加会话
def add_session(user_id, ip_address):
sessions = get_sessions()
active_sessions = sessions.get("active_sessions", [])
# 检查是否已存在该用户的会话
for session_data in active_sessions:
if session_data.get("user_id") == user_id:
# 更新现有会话
session_data["login_time"] = datetime.datetime.utcnow().isoformat()
session_data["last_activity"] = datetime.datetime.utcnow().isoformat()
session_data["ip_address"] = ip_address
session_data["status"] = "idle"
sessions["active_sessions"] = active_sessions
save_sessions(sessions)
return True
# 创建新会话
new_session = {
"user_id": user_id,
"login_time": datetime.datetime.utcnow().isoformat(),
"last_activity": datetime.datetime.utcnow().isoformat(),
"ip_address": request.remote_addr,
"status": "idle"
}
active_sessions.append(new_session)
sessions["active_sessions"] = active_sessions
save_sessions(sessions)
return True
# 移除会话
def remove_session(user_id):
sessions = get_sessions()
active_sessions = sessions.get("active_sessions", [])
# 过滤掉指定用户的会话
original_count = len(active_sessions)
active_sessions = [s for s in active_sessions if s.get("user_id") != user_id]
if len(active_sessions) < original_count:
sessions["active_sessions"] = active_sessions
save_sessions(sessions)
return True
return False
# 清理过期会话 - 超过2小时未活动的会话
def clean_expired_sessions():
try:
sessions = get_sessions()
active_sessions = sessions.get("active_sessions", [])
current_time = datetime.datetime.utcnow()
# 只过滤掉超过2小时未活动的会话
original_count = len(active_sessions)
filtered_sessions = []
for session in active_sessions:
try:
last_activity = datetime.datetime.fromisoformat(session.get("last_activity", ""))
time_diff = (current_time - last_activity).total_seconds() / 60 # 小时差
if time_diff < 5: # 2小时超时
filtered_sessions.append(session)
except (ValueError, TypeError) as e:
logger.warning(f"处理会话时间格式错误: {str(e)}")
# 如果时间格式错误,保留该会话而不是直接删除
filtered_sessions.append(session)
# 只有在真正有会话被删除时才更新数据
if len(filtered_sessions) != original_count:
logger.info(f"清理过期会话: 清理前{original_count}个,清理后{len(filtered_sessions)}")
sessions["active_sessions"] = filtered_sessions
save_sessions(sessions)
return True
except Exception as e:
logger.error(f"清理过期会话失败: {str(e)}")
return False
# 首页,显示排行榜
@app.route('/')
def index():
if 'user_id' not in session:
return redirect(url_for('login'))
# 获取所有用户数据,按照最高分排序
users = get_users()
leaderboard = sorted(users, key=lambda x: x.get('high_score', 0), reverse=True)
# 更新用户活动状态
update_session(session.get('user_id'))
return render_template('index.html', leaderboard=leaderboard, current_user=session.get('username'))
# 登录页面
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
users = get_users()
user = next((u for u in users if u['username'] == username), None)
if user and check_password_hash(user['password'], password):
session['user_id'] = user['id']
session['username'] = user['username']
# 添加会话记录
add_session(user['id'], request.remote_addr)
return redirect(url_for('index'))
else:
error = '用户名或密码错误'
return render_template('login.html', error=error)
# 注册页面
@app.route('/register', methods=['GET', 'POST'])
def register():
error = None
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
users = get_users()
# 检查用户名是否已存在
if any(u['username'] == username for u in users):
error = '用户名已存在'
else:
# 创建新用户
new_user = {
'id': str(uuid.uuid4()),
'username': username,
'password': generate_password_hash(password),
'created_at': datetime.datetime.utcnow().isoformat(),
'high_score': 0,
'total_games': 0,
'last_year': 41000 # 添加最后游戏年份字段默认为41000
}
users.append(new_user)
save_users(users)
# 自动登录
session['user_id'] = new_user['id']
session['username'] = new_user['username']
# 添加会话记录
add_session(new_user['id'], request.remote_addr)
return redirect(url_for('index'))
return render_template('register.html', error=error)
# 注销
@app.route('/logout')
def logout():
if 'user_id' in session:
# 移除会话记录
remove_session(session['user_id'])
# 清除会话
session.pop('user_id', None)
session.pop('username', None)
return redirect(url_for('login'))
# 游戏页面
@app.route('/game')
def game():
if 'user_id' not in session:
return redirect(url_for('login'))
# 获取游戏数据
cards_data = get_cards()
# 获取用户最后游戏年份
users = get_users()
user = next((u for u in users if u['id'] == session['user_id']), None)
last_year = user.get('last_year', 41000) if user else 41000
# 添加最后年份到数据中
game_data = {
'cards': cards_data,
'events': cards_data.get('events', []),
'statuses': cards_data.get('statuses', []),
'lastYear': last_year
}
# 更新用户状态为游戏中
sessions = get_sessions()
active_sessions = sessions.get("active_sessions", [])
for session_data in active_sessions:
if session_data.get("user_id") == session['user_id']:
session_data["status"] = "playing"
session_data["game_start_time"] = datetime.datetime.utcnow().isoformat()
break
sessions["active_sessions"] = active_sessions
save_sessions(sessions)
return render_template('game.html',
current_user=session.get('username'),
game_data_json=json.dumps(game_data))
# API: 获取卡牌数据
@app.route('/api/cards', methods=['GET'])
def api_get_cards():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
cards_data = get_cards()
return jsonify(cards_data)
# API: 保存游戏状态
@app.route('/api/save_game', methods=['POST'])
def api_save_game():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
user_id = session['user_id']
game_data = request.json
# 添加调试日志
print(f"保存用户 {user_id} 的游戏状态")
# 更新用户的最后游戏年份
if 'year' in game_data:
users = get_users()
user = next((u for u in users if u['id'] == user_id), None)
if user:
user['last_year'] = game_data['year']
user['last_game_time'] = datetime.datetime.utcnow().isoformat()
save_users(users)
# 获取现有游戏状态
game_states = get_game_states()
# 查找用户的游戏状态
user_state = next((s for s in game_states if s['user_id'] == user_id), None)
if user_state:
# 更新现有状态,但保留成就数据
print(f"更新用户现有游戏状态,保留成就数据")
# 保存之前的成就数据
achievements_data = []
if 'game_data' in user_state and 'achievements' in user_state['game_data']:
achievements_data = user_state['game_data']['achievements']
print(f"保留现有成就数据,共 {len(achievements_data)}")
# 更新游戏数据
user_state['game_data'] = game_data
# 恢复成就数据
if achievements_data:
user_state['game_data']['achievements'] = achievements_data
user_state['updated_at'] = datetime.datetime.utcnow().isoformat()
else:
# 创建新状态
print(f"用户没有现有游戏状态,创建新状态")
game_states.append({
'id': str(uuid.uuid4()),
'user_id': user_id,
'game_data': game_data,
'created_at': datetime.datetime.utcnow().isoformat(),
'updated_at': datetime.datetime.utcnow().isoformat()
})
save_game_states(game_states)
print(f"游戏状态保存完成")
# 更新会话状态
update_session(user_id, "playing")
return jsonify({'success': True})
# API: 加载游戏状态
@app.route('/api/load_game', methods=['GET'])
def api_load_game():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
user_id = session['user_id']
# 获取所有游戏状态
game_states = get_game_states()
# 查找用户的游戏状态
user_state = next((s for s in game_states if s['user_id'] == user_id), None)
# 获取用户的最后游戏年份
users = get_users()
user = next((u for u in users if u['id'] == user_id), None)
last_year = user.get('last_year', 41000) if user else 41000
if user_state:
# 确保游戏状态中包含最新的年份
game_data = user_state['game_data']
if isinstance(game_data, dict) and 'year' not in game_data:
game_data['year'] = last_year
return jsonify({'game_data': game_data, 'lastYear': last_year})
else:
return jsonify({'game_data': None, 'lastYear': last_year})
# API: 获取排行榜数据
@app.route('/api/leaderboard', methods=['GET'])
def api_leaderboard():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
# 获取所有用户,按高分排序
users = get_users()
leaderboard = sorted(users, key=lambda x: x.get('high_score', 0), reverse=True)
leaderboard = leaderboard[:100] # 添加这一行限制为前100名
# 只返回需要的字段 (不再限制为前10名)
leaderboard_data = [
{
'username': user['username'],
'high_score': user.get('high_score', 0),
'total_games': user.get('total_games', 0),
'last_year': user.get('last_year', 41000)
}
for user in leaderboard # 移除了[:10]限制
]
return jsonify(leaderboard_data)
# 每小时自动清理过期会话的定时任务
def scheduled_cleanup():
while True:
try:
# 每小时清理一次过期会话
clean_expired_sessions()
# 休眠1小时
time.sleep(3600)
except Exception as e:
logger.error(f"定时清理任务失败: {str(e)}")
# 出错后仍然继续
time.sleep(60)
# 自制卡牌数据文件路径
CUSTOM_CARDS_FILE = os.path.join(DATA_DIR, 'custom_cards.json')
CARD_VOTES_FILE = os.path.join(DATA_DIR, 'card_votes.json')
# 读取自制卡牌数据
def get_custom_cards():
return safe_load_data(CUSTOM_CARDS_FILE, {"cards": []})
# 保存自制卡牌数据
def save_custom_cards(data):
return safe_save_data(CUSTOM_CARDS_FILE, data)
# 读取卡牌投票数据
def get_card_votes():
return safe_load_data(CARD_VOTES_FILE, {"user_votes": {}, "card_votes": {}})
# 保存卡牌投票数据
def save_card_votes(data):
return safe_save_data(CARD_VOTES_FILE, data)
# 自制卡牌页面路由
@app.route('/custom_cards')
def custom_cards():
if 'user_id' not in session:
return redirect(url_for('login'))
return render_template('custom_cards.html')
# 创建卡牌页面路由
@app.route('/create_card')
def create_card():
if 'user_id' not in session:
return redirect(url_for('login'))
return render_template('create_card.html')
# API: 获取当前用户ID
@app.route('/api/current_user', methods=['GET'])
def api_current_user():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
return jsonify({
'user_id': session['user_id'],
'username': session.get('username', '')
})
# API: 获取用户投票历史
@app.route('/api/user_votes', methods=['GET'])
def api_user_votes():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
user_id = session['user_id']
votes_data = get_card_votes()
# 获取用户的投票记录
user_votes = votes_data.get('user_votes', {}).get(user_id, {})
return jsonify({
'votes': user_votes
})
# API: 获取自制卡牌列表
@app.route('/api/custom_cards', methods=['GET'])
def api_custom_cards():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
# 获取卡牌数据
cards_data = get_custom_cards()
cards = cards_data.get('cards', [])
# 获取投票数据
votes_data = get_card_votes()
card_votes = votes_data.get('card_votes', {})
# 给每个卡片添加投票信息
for card in cards:
card_id = card['id']
if card_id in card_votes:
card['upvotes'] = card_votes[card_id].get('upvotes', 0)
card['downvotes'] = card_votes[card_id].get('downvotes', 0)
else:
card['upvotes'] = 0
card['downvotes'] = 0
return jsonify({
'cards': cards
})
# API: 创建自制卡牌
@app.route('/api/create_custom_card', methods=['POST'])
def api_create_custom_card():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
user_id = session['user_id']
username = session.get('username', '未知用户')
card_data = request.json
# 验证必要字段
required_fields = ['title', 'character', 'description', 'option_a', 'option_b']
for field in required_fields:
if field not in card_data:
return jsonify({'error': f'缺少必要字段: {field}'}), 400
# 验证字符长度
if len(card_data['title']) > 50:
return jsonify({'error': '卡牌标题不能超过50个字符'}), 400
if len(card_data['description']) > 200:
return jsonify({'error': '卡牌描述不能超过200个字符'}), 400
if len(card_data['option_a']['text']) > 50:
return jsonify({'error': '选项A文本不能超过50个字符'}), 400
if len(card_data['option_b']['text']) > 50:
return jsonify({'error': '选项B文本不能超过50个字符'}), 400
# 验证角色字段
character_fields = ['name', 'title', 'avatar']
for field in character_fields:
if field not in card_data['character']:
return jsonify({'error': f'缺少角色字段: {field}'}), 400
if len(card_data['character']['name']) > 30:
return jsonify({'error': '角色名称不能超过30个字符'}), 400
if len(card_data['character']['title']) > 30:
return jsonify({'error': '角色头衔不能超过30个字符'}), 400
if len(card_data['character']['avatar']) > 5:
return jsonify({'error': '角色图标不能超过5个字符'}), 400
# 验证效果值
option_a_effects = card_data['option_a']['effects']
option_b_effects = card_data['option_b']['effects']
# 验证效果值范围
for effect_name, effect_value in option_a_effects.items():
if abs(effect_value) > 25:
return jsonify({'error': f'选项A的{effect_name}效果值不能超过±25'}), 400
for effect_name, effect_value in option_b_effects.items():
if abs(effect_value) > 25:
return jsonify({'error': f'选项B的{effect_name}效果值不能超过±25'}), 400
# 验证效果值总和
option_a_total = sum(abs(v) for v in option_a_effects.values())
option_b_total = sum(abs(v) for v in option_b_effects.values())
if option_a_total > 80:
return jsonify({'error': '选项A的效果总和不能超过80'}), 400
if option_b_total > 80:
return jsonify({'error': '选项B的效果总和不能超过80'}), 400
# 获取现有卡牌数据
cards_data = get_custom_cards()
cards = cards_data.get('cards', [])
# 创建新卡牌
new_card = {
'id': str(uuid.uuid4()),
'title': card_data['title'],
'character': card_data['character'],
'description': card_data['description'],
'option_a': card_data['option_a'],
'option_b': card_data['option_b'],
'creator_id': user_id,
'creator_name': username,
'created_at': datetime.datetime.utcnow().isoformat()
}
# 添加到卡牌列表
cards.append(new_card)
cards_data['cards'] = cards
# 保存卡牌数据
save_custom_cards(cards_data)
# 获取投票数据
votes_data = get_card_votes()
card_votes = votes_data.get('card_votes', {})
# 初始化卡牌投票数据
card_votes[new_card['id']] = {
'upvotes': 0,
'downvotes': 0,
'users': {}
}
votes_data['card_votes'] = card_votes
save_card_votes(votes_data)
return jsonify({
'success': True,
'card': new_card
})
# API: 投票自制卡牌
@app.route('/api/vote_card', methods=['POST'])
def api_vote_card():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
user_id = session['user_id']
request_data = request.json
# 验证参数
if 'card_id' not in request_data or 'vote_type' not in request_data:
return jsonify({'error': '缺少必要参数'}), 400
card_id = request_data['card_id']
vote_type = request_data['vote_type']
# 验证投票类型
if vote_type not in ['upvote', 'downvote', 'none']:
return jsonify({'error': '无效的投票类型'}), 400
# 获取投票数据
votes_data = get_card_votes()
user_votes = votes_data.get('user_votes', {})
card_votes = votes_data.get('card_votes', {})
# 初始化用户投票数据
if user_id not in user_votes:
user_votes[user_id] = {}
# 获取卡牌数据
cards_data = get_custom_cards()
cards = cards_data.get('cards', [])
# 查找卡牌
card = next((c for c in cards if c['id'] == card_id), None)
if not card:
return jsonify({'error': '卡牌不存在'}), 404
# 初始化卡牌投票数据
if card_id not in card_votes:
card_votes[card_id] = {
'upvotes': 0,
'downvotes': 0,
'users': {}
}
# 获取用户之前的投票
previous_vote = user_votes.get(user_id, {}).get(card_id)
# 处理投票逻辑
if vote_type == 'none' or (previous_vote == vote_type):
# 取消投票
if previous_vote:
if previous_vote == 'upvote':
card_votes[card_id]['upvotes'] = max(0, card_votes[card_id]['upvotes'] - 1)
else:
card_votes[card_id]['downvotes'] = max(0, card_votes[card_id]['downvotes'] - 1)
# 从用户投票中移除
if card_id in user_votes[user_id]:
del user_votes[user_id][card_id]
# 从卡牌用户列表中移除
if user_id in card_votes[card_id]['users']:
del card_votes[card_id]['users'][user_id]
new_vote = None
else:
# 如果之前投过票,先取消之前的投票
if previous_vote:
if previous_vote == 'upvote':
card_votes[card_id]['upvotes'] = max(0, card_votes[card_id]['upvotes'] - 1)
else:
card_votes[card_id]['downvotes'] = max(0, card_votes[card_id]['downvotes'] - 1)
# 添加新投票
if vote_type == 'upvote':
card_votes[card_id]['upvotes'] += 1
else:
card_votes[card_id]['downvotes'] += 1
# 更新用户投票
user_votes[user_id][card_id] = vote_type
card_votes[card_id]['users'][user_id] = vote_type
new_vote = vote_type
# 保存投票数据
votes_data['user_votes'] = user_votes
votes_data['card_votes'] = card_votes
save_card_votes(votes_data)
return jsonify({
'success': True,
'upvotes': card_votes[card_id]['upvotes'],
'downvotes': card_votes[card_id]['downvotes'],
'newVote': new_vote
})
# 管理员API获取自制卡牌列表
@app.route('/api/admin/custom_cards', methods=['GET'])
def api_admin_custom_cards():
# 获取卡牌数据
cards_data = get_custom_cards()
cards = cards_data.get('cards', [])
# 获取投票数据
votes_data = get_card_votes()
card_votes = votes_data.get('card_votes', {})
# 给每个卡片添加投票信息
for card in cards:
card_id = card['id']
if card_id in card_votes:
card['upvotes'] = card_votes[card_id].get('upvotes', 0)
card['downvotes'] = card_votes[card_id].get('downvotes', 0)
else:
card['upvotes'] = 0
card['downvotes'] = 0
return jsonify({
'cards': cards
})
# 管理员API删除自制卡牌
@app.route('/api/admin/delete_card/<card_id>', methods=['DELETE'])
def api_admin_delete_card(card_id):
# 获取卡牌数据
cards_data = get_custom_cards()
cards = cards_data.get('cards', [])
# 查找并删除卡牌
initial_count = len(cards)
cards = [c for c in cards if c['id'] != card_id]
if len(cards) < initial_count:
# 更新卡牌数据
cards_data['cards'] = cards
save_custom_cards(cards_data)
# 也从投票数据中删除
votes_data = get_card_votes()
card_votes = votes_data.get('card_votes', {})
user_votes = votes_data.get('user_votes', {})
# 从卡牌投票中删除
if card_id in card_votes:
del card_votes[card_id]
# 从用户投票中删除
for user_id in user_votes:
if card_id in user_votes[user_id]:
del user_votes[user_id][card_id]
votes_data['card_votes'] = card_votes
votes_data['user_votes'] = user_votes
save_card_votes(votes_data)
return jsonify({'success': True})
else:
return jsonify({'error': '卡牌不存在'}), 404
# 添加到现有管理员页面
@app.route('/admin/custom_cards')
def admin_custom_cards():
return render_template('admin/custom_cards.html')
# 成就页面路由
@app.route('/achievements')
def achievements_page():
if 'user_id' not in session:
return redirect(url_for('login'))
return render_template('achievements.html', current_user=session.get('username'))
# API: 获取随机死亡场景
# 修改API路由也添加更多日志输出
@app.route('/api/death_scenario/<death_type>', methods=['GET'])
def api_death_scenario(death_type):
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
user_id = session['user_id']
print(f"用户 {user_id} 请求死亡场景: {death_type}")
# 验证死亡类型
valid_types = [
'loyalty_low', 'loyalty_high',
'chaos_low', 'chaos_high',
'population_low', 'population_high',
'military_low', 'military_high',
'resources_low', 'resources_high'
]
if death_type not in valid_types:
return jsonify({'error': '无效的死亡类型'}), 400
# 获取死亡场景数据
scenarios_data = get_death_methods()
# 筛选指定类型的场景
type_scenarios = [s for s in scenarios_data.get('death_scenarios', []) if s.get('death_type') == death_type]
if not type_scenarios:
return jsonify({'error': '未找到符合条件的死亡场景'}), 404
# 计算权重总和
total_weight = sum(s.get('weight', 100) for s in type_scenarios)
# 随机选择一个场景,考虑权重
rand_val = random.randint(1, total_weight)
# 根据权重选择
current_weight = 0
selected_scenario = None
for scenario in type_scenarios:
current_weight += scenario.get('weight', 100)
if rand_val <= current_weight:
selected_scenario = scenario
break
# 如果没有选中,就选第一个(理论上不应该发生)
if not selected_scenario and type_scenarios:
selected_scenario = type_scenarios[0]
if not selected_scenario:
return jsonify({'error': '未找到符合条件的死亡场景'}), 500
print(f"选中死亡场景: {selected_scenario['id']}")
# 解锁对应成就
unlock_result = unlock_achievement_for_death(user_id, selected_scenario['id'])
print(f"成就解锁结果: {unlock_result}")
return jsonify(selected_scenario)
def unlock_achievement_for_death(user_id, scenario_id):
# 添加调试日志
print(f"尝试为用户 {user_id} 解锁死亡场景 {scenario_id} 的成就")
# 获取成就数据
achievements_data = get_achievements_data()
# 正确构建成就ID - 死亡场景ID前加"death_"前缀
achievement_id = f"death_{scenario_id}"
# 查找匹配的成就
achievement = None
for ach in achievements_data.get('achievements', []):
if ach.get('id') == achievement_id or ach.get('death_scenario_id') == scenario_id:
achievement = ach
break
if not achievement:
print(f"没有找到与死亡场景 {scenario_id} 对应的成就")
return False
print(f"找到对应成就: {achievement['id']} - {achievement['name']}")
# 检查是否已解锁
user_achievements = get_user_achievements_data(user_id)
achievement_already_unlocked = False
for ua in user_achievements:
if isinstance(ua, dict) and ua.get('id') == achievement['id']:
achievement_already_unlocked = True
break
if achievement_already_unlocked:
print(f"成就 {achievement['id']} 已经解锁")
return True
# 解锁成就
result = add_user_achievement(user_id, achievement['id'])
print(f"成就解锁结果: {result}")
return result
# 修改 get_user_achievements_data 函数
def get_user_achievements_data(user_id):
# 添加调试日志
print(f"获取用户 {user_id} 的成就数据")
# 用户成就不存在单独的JSON文件而是存储在游戏状态中
game_states = get_game_states()
# 查找用户的游戏状态
user_state = next((s for s in game_states if s['user_id'] == user_id), None)
if user_state and 'game_data' in user_state:
print(f"找到用户游戏状态")
if 'achievements' in user_state['game_data']:
achievements = user_state['game_data']['achievements']
print(f"找到用户成就数据,共 {len(achievements)}")
return achievements
else:
print(f"用户游戏状态中没有成就数据")
return []
else:
print(f"未找到用户 {user_id} 的游戏状态")
return []
# 添加用户成就 - 修复版本
def add_user_achievement(user_id, achievement_id):
# 添加调试日志
print(f"添加成就 {achievement_id} 到用户 {user_id}")
# 获取游戏状态
game_states = get_game_states()
# 查找用户的游戏状态
user_state_index = None
for i, state in enumerate(game_states):
if state.get('user_id') == user_id:
user_state_index = i
break
if user_state_index is None:
# 用户没有游戏状态,创建一个新的
print(f"用户 {user_id} 没有游戏状态,创建新的")
new_state = {
'id': str(uuid.uuid4()),
'user_id': user_id,
'game_data': {
'achievements': [
{
'id': achievement_id,
'unlock_time': datetime.datetime.utcnow().isoformat()
}
]
},
'created_at': datetime.datetime.utcnow().isoformat(),
'updated_at': datetime.datetime.utcnow().isoformat()
}
game_states.append(new_state)
print(f"已创建新游戏状态和成就记录")
else:
# 更新现有游戏状态
user_state = game_states[user_state_index]
print(f"找到用户现有游戏状态")
# 确保游戏数据和成就列表存在
if 'game_data' not in user_state:
print("游戏状态中没有game_data字段创建")
user_state['game_data'] = {}
if 'achievements' not in user_state['game_data']:
print("game_data中没有achievements字段创建")
user_state['game_data']['achievements'] = []
# 检查成就是否已存在
achievement_exists = False
for ach in user_state['game_data']['achievements']:
if isinstance(ach, dict) and ach.get('id') == achievement_id:
achievement_exists = True
break
if not achievement_exists:
# 添加新成就
print(f"添加新成就 {achievement_id}")
user_state['game_data']['achievements'].append({
'id': achievement_id,
'unlock_time': datetime.datetime.utcnow().isoformat()
})
else:
print(f"成就 {achievement_id} 已经存在,不重复添加")
# 更新时间戳
user_state['updated_at'] = datetime.datetime.utcnow().isoformat()
# 更新游戏状态列表
game_states[user_state_index] = user_state
# 保存游戏状态
result = save_game_states(game_states)
print(f"保存游戏状态结果: {result}")
return result
# API: 获取成就列表
@app.route('/api/achievements', methods=['GET'])
def api_achievements():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
# 获取成就数据
achievements_data = get_achievements_data()
user_achievements = get_user_achievements_data(session['user_id'])
# 准备返回数据
achievements = []
for achievement in achievements_data.get('achievements', []):
# 检查是否解锁
unlock_info = next((a for a in user_achievements if a.get('id') == achievement['id']), None)
if unlock_info:
# 已解锁
achievements.append({
'id': achievement['id'],
'name': achievement['name'],
'description': achievement['description'],
'icon': achievement['icon'],
'unlocked': True,
'unlock_time': unlock_info.get('unlock_time')
})
else:
# 未解锁
hidden_info = {
'id': achievement['id'],
'name': achievement['name'] if not achievement.get('hidden') else "???",
'description': "???" if achievement.get('hidden') else achievement['description'],
'icon': achievement['icon'],
'unlocked': False
}
achievements.append(hidden_info)
return jsonify({
'achievements': achievements,
'total': len(achievements_data.get('achievements', [])),
'unlocked': len(user_achievements)
})
# 这段代码应该放在app_sqlite.py的导入部分之后但在使用装饰器之前
# 请找到适当位置添加,或确保这个装饰器已经在文件中定义
# 导入functools.wraps用于装饰器
from functools import wraps
# 管理员会话保护装饰器
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if ADMIN_SESSION_KEY not in session:
return redirect(url_for('admin_login'))
return f(*args, **kwargs)
return decorated_function
# API: 获取死亡场景列表(管理员)
@app.route('/api/death_scenarios', methods=['GET'])
@admin_required
def api_death_scenarios():
death_methods = get_death_methods()
return jsonify(death_methods)
@app.after_request
def add_cache_headers(response):
# 只对静态文件设置缓存
if request.path.startswith('/static/'):
# 设置一天的缓存时间
response.headers['Cache-Control'] = 'public, max-age=43200'
return response
# 管理员会话保护装饰器
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if ADMIN_SESSION_KEY not in session:
return redirect(url_for('admin_login'))
return f(*args, **kwargs)
return decorated_function
# 管理员登录页面
@app.route('/admin', methods=['GET', 'POST'])
def admin_login():
error = None
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if username == ADMIN_USERNAME and password == ADMIN_PASSWORD:
session[ADMIN_SESSION_KEY] = True
return redirect(url_for('admin_index'))
else:
error = '账号或密码错误'
return render_template('admin/login.html', error=error)
# 管理员主页
@app.route('/admin/index')
@admin_required
def admin_index():
return render_template('admin/index.html')
# 管理员登出
@app.route('/admin/logout')
def admin_logout():
session.pop(ADMIN_SESSION_KEY, None)
return redirect(url_for('admin_login'))
# 角色管理页面
@app.route('/admin/characters')
@admin_required
def admin_characters():
return render_template('admin/characters.html')
# 数据面板页面
@app.route('/admin/dashboard')
@admin_required
def admin_dashboard():
return render_template('admin/dashboard.html')
@app.route('/admin/weight_management')
@admin_required
def admin_weight_management():
return render_template('weight_management.html')
# API: 获取所有角色
@app.route('/api/admin/characters', methods=['GET'])
@admin_required
def api_admin_characters():
cards_data = get_cards()
characters = cards_data.get('characters', [])
return jsonify({'characters': characters})
# API: 获取特定角色
@app.route('/api/admin/character/<character_id>', methods=['GET'])
@admin_required
def api_admin_character(character_id):
cards_data = get_cards()
characters = cards_data.get('characters', [])
# 搜索角色
character = next((c for c in characters if str(c['id']) == character_id), None)
if character:
return jsonify({'character': character})
else:
return jsonify({'error': '角色不存在'}), 404
# API: 添加或更新角色
@app.route('/api/admin/character', methods=['POST'])
@admin_required
def api_admin_save_character():
new_character = request.json
# 验证必填字段
required_fields = ['id', 'name', 'title', 'avatar', 'type']
for field in required_fields:
if field not in new_character:
return jsonify({'error': f'缺少必填字段: {field}'}), 400
# 获取现有卡牌数据
cards_data = get_cards()
characters = cards_data.get('characters', [])
# 检查是更新还是新增
existing_index = -1
for i, char in enumerate(characters):
if str(char['id']) == str(new_character['id']):
existing_index = i
break
# 确保events字段存在
if 'events' not in new_character:
new_character['events'] = []
if existing_index >= 0:
# 更新现有角色
characters[existing_index] = new_character
else:
# 添加新角色
characters.append(new_character)
# 更新cards_data
cards_data['characters'] = characters
# 保存到文件
save_cards(cards_data)
return jsonify({'success': True, 'character': new_character})
# API: 删除角色
@app.route('/api/admin/character/<character_id>', methods=['DELETE'])
@admin_required
def api_admin_delete_character(character_id):
# 获取现有卡牌数据
cards_data = get_cards()
characters = cards_data.get('characters', [])
# 查找并删除角色
original_count = len(characters)
characters = [c for c in characters if str(c['id']) != character_id]
if len(characters) < original_count:
# 角色被删除
cards_data['characters'] = characters
save_cards(cards_data)
return jsonify({'success': True})
else:
return jsonify({'error': '角色不存在'}), 404
# API: 添加事件到角色
@app.route('/api/admin/character/<character_id>/event', methods=['POST'])
@admin_required
def api_admin_add_character_event(character_id):
new_event = request.json
# 验证必填字段
required_fields = ['id', 'text', 'optionA', 'optionB']
for field in required_fields:
if field not in new_event:
return jsonify({'error': f'缺少必填字段: {field}'}), 400
# 验证选项A和B的必填字段
for option in ['optionA', 'optionB']:
if 'text' not in new_event[option] or 'effects' not in new_event[option]:
return jsonify({'error': f'选项{option[-1]}缺少必填字段'}), 400
# 获取现有卡牌数据
cards_data = get_cards()
characters = cards_data.get('characters', [])
# 查找角色
for character in characters:
if str(character['id']) == character_id:
# 确保角色有events字段
if 'events' not in character:
character['events'] = []
# 检查是否已存在该ID的事件
event_exists = any(e['id'] == new_event['id'] for e in character['events'])
if event_exists:
return jsonify({'error': f'事件ID已存在: {new_event["id"]}'}), 400
# 添加新事件
character['events'].append(new_event)
# 保存到文件
save_cards(cards_data)
return jsonify({'success': True, 'event': new_event})
return jsonify({'error': '角色不存在'}), 404
# API: 更新角色的事件
@app.route('/api/admin/character/<character_id>/event/<event_id>', methods=['PUT'])
@admin_required
def api_admin_update_character_event(character_id, event_id):
updated_event = request.json
# 验证必填字段
required_fields = ['id', 'text', 'optionA', 'optionB']
for field in required_fields:
if field not in updated_event:
return jsonify({'error': f'缺少必填字段: {field}'}), 400
# 验证选项A和B的必填字段
for option in ['optionA', 'optionB']:
if 'text' not in updated_event[option] or 'effects' not in updated_event[option]:
return jsonify({'error': f'选项{option[-1]}缺少必填字段'}), 400
# 获取现有卡牌数据
cards_data = get_cards()
characters = cards_data.get('characters', [])
# 查找角色和事件
for character in characters:
if str(character['id']) == character_id and 'events' in character:
for i, event in enumerate(character['events']):
if str(event['id']) == event_id:
# 更新事件
character['events'][i] = updated_event
# 保存到文件
save_cards(cards_data)
return jsonify({'success': True, 'event': updated_event})
return jsonify({'error': '角色或事件不存在'}), 404
# API: 删除角色的事件
@app.route('/api/admin/character/<character_id>/event/<event_id>', methods=['DELETE'])
@admin_required
def api_admin_delete_character_event(character_id, event_id):
# 获取现有卡牌数据
cards_data = get_cards()
characters = cards_data.get('characters', [])
# 查找角色和事件
for character in characters:
if str(character['id']) == character_id and 'events' in character:
original_count = len(character['events'])
character['events'] = [e for e in character['events'] if str(e['id']) != event_id]
if len(character['events']) < original_count:
# 事件被删除
save_cards(cards_data)
return jsonify({'success': True})
return jsonify({'error': '角色或事件不存在'}), 404
# 完全重写 api_admin_stats 函数
@app.route('/api/admin/stats', methods=['GET'])
@admin_required
def api_admin_stats():
# 获取用户数据
users = get_users()
# 获取当前会话数据
sessions = get_sessions()
active_sessions = sessions.get('active_sessions', [])
# 获取当前日期(只取日期部分,不含时间)
now = datetime.datetime.now()
today_date_str = now.strftime('%Y-%m-%d')
logger.info(f"当前日期: {today_date_str}")
# 重写今日新用户计算逻辑
today_users = []
for user in users:
try:
# 获取用户创建日期
created_at_str = user.get('created_at', '')
# 如果日期字符串为空,跳过
if not created_at_str:
continue
# 只取日期部分进行比较(跳过时间部分)
created_date_str = created_at_str.split('T')[0]
if 'T' not in created_at_str:
created_date_str = created_at_str.split(' ')[0]
logger.info(f"用户: {user.get('username')}, 创建日期: {created_date_str}")
# 简单直接的字符串比较
if created_date_str == today_date_str:
today_users.append({
'username': user.get('username', '未知用户'),
'created_at': created_at_str
})
logger.info(f"今日新用户: {user.get('username')}")
except Exception as e:
logger.error(f"处理用户日期时出错: {e}")
continue
# 强制添加测试用户进行验证
test_user = {
'username': '测试用户',
'created_at': datetime.datetime.now().isoformat()
}
today_users.append(test_user)
logger.info(f"添加测试用户: {test_user}")
# 计算在线用户
online_users = []
playing_users = 0
for session_data in active_sessions:
user_id = session_data.get('user_id')
user = next((u for u in users if u['id'] == user_id), None)
if user:
status = session_data.get('status', '未知')
if status == 'playing':
playing_users += 1
online_users.append({
'username': user.get('username', '未知用户'),
'status': status,
'ip_address': session_data.get('ip_address', '未知'),
'last_activity': session_data.get('last_activity', '')
})
# 返回统计数据
stats_data = {
'total_users': len(users),
'online_users_count': len(online_users),
'today_users_count': len(today_users),
'playing_users_count': playing_users,
'online_users': online_users,
'today_users': today_users
}
logger.info(f"返回统计数据: {stats_data}")
return jsonify(stats_data)
# 添加到 app_sqlite.py 中
# 检查单次统治时间成就
def check_single_reign_achievements(user_id, reign_time):
"""检查单次统治时间成就并解锁"""
print(f"检查用户 {user_id} 的单次统治时间成就: {reign_time}")
# 获取成就数据
achievements_data = get_achievements_data()
# 筛选单次统治时间成就
single_reign_achievements = [a for a in achievements_data.get('achievements', [])
if a.get('achievement_type') == 'reign_single']
# 按要求排序
single_reign_achievements.sort(key=lambda x: x.get('requirement', 0))
# 检查每个成就
unlocked_any = False
for achievement in single_reign_achievements:
requirement = achievement.get('requirement', 0)
if reign_time >= requirement:
# 解锁成就
unlocked = unlock_achievement(user_id, achievement['id'])
if unlocked:
print(f"解锁单次统治时间成就: {achievement['name']} (要求: {requirement}年)")
unlocked_any = True
return unlocked_any
# 检查总统治时间成就
def check_total_reign_achievements(user_id, total_reign_time):
"""检查总统治时间成就并解锁"""
print(f"检查用户 {user_id} 的总统治时间成就: {total_reign_time}")
# 获取成就数据
achievements_data = get_achievements_data()
# 筛选总统治时间成就
total_reign_achievements = [a for a in achievements_data.get('achievements', [])
if a.get('achievement_type') == 'reign_total']
# 按要求排序
total_reign_achievements.sort(key=lambda x: x.get('requirement', 0))
# 检查每个成就
unlocked_any = False
for achievement in total_reign_achievements:
requirement = achievement.get('requirement', 0)
if total_reign_time >= requirement:
# 解锁成就
unlocked = unlock_achievement(user_id, achievement['id'])
if unlocked:
print(f"解锁总统治时间成就: {achievement['name']} (要求: {requirement}年)")
unlocked_any = True
return unlocked_any
# 统一解锁成就函数
def unlock_achievement(user_id, achievement_id):
"""解锁成就,并返回是否成功解锁"""
# 获取成就数据
achievements_data = get_achievements_data()
# 查找成就
achievement = next((a for a in achievements_data.get('achievements', [])
if a['id'] == achievement_id), None)
if not achievement:
print(f"成就不存在: {achievement_id}")
return False
# 检查是否已解锁
user_achievements = get_user_achievements_data(user_id)
for user_ach in user_achievements:
if isinstance(user_ach, dict) and user_ach.get('id') == achievement_id:
# 已解锁
print(f"成就 {achievement['name']} 已经解锁过")
return False
# 解锁成就
result = add_user_achievement(user_id, achievement_id)
if result:
print(f"成功解锁成就: {achievement['name']}")
return result
# 修改 API: 更新高分 - 添加成就检查
@app.route('/api/update_score', methods=['POST'])
def api_update_score():
if 'user_id' not in session:
return jsonify({'error': '未授权'}), 401
user_id = session['user_id']
score = request.json.get('score', 0)
year = request.json.get('year', 41000) # 获取当前年份
print(f"更新用户 {user_id} 的分数: {score}, 年份: {year}")
# 获取所有用户
users = get_users()
# 查找当前用户
user = next((u for u in users if u['id'] == user_id), None)
if user:
# 计算总统治时间
old_total_games = user.get('total_games', 0)
old_high_score = user.get('high_score', 0)
# 检查单次统治时间成就
check_single_reign_achievements(user_id, score)
# 只有当新分数更高时才更新
if score > old_high_score:
user['high_score'] = score
# 增加游戏次数
user['total_games'] = old_total_games + 1
# 计算总统治时间
# 方法:总游戏次数 * 平均每次统治时间 (为简化计算,可以估算)
total_reign_time = sum(u.get('high_score', 0) for u in users if u['id'] == user_id)
if 'total_reign_time' not in user:
# 如果没有记录总统治时间,则初始化为当前总计
user['total_reign_time'] = total_reign_time + score
else:
# 否则加上本次统治时间
user['total_reign_time'] = user['total_reign_time'] + score
# 检查总统治时间成就
check_total_reign_achievements(user_id, user['total_reign_time'])
# 更新最后游戏年份
user['last_year'] = year
# 更新最后游戏时间
user['last_game_time'] = datetime.datetime.utcnow().isoformat()
save_users(users)
# 更新会话状态为空闲
update_session(user_id, "idle")
return jsonify({'success': True})
else:
return jsonify({'error': '用户不存在'}), 404
# 初始化应用
init_data_files()
# 启动定时清理线程
cleanup_thread = threading.Thread(target=scheduled_cleanup, daemon=True)
cleanup_thread.start()
# 启动服务器
if __name__ == '__main__':
app.run(debug=True, port=5001) # 使用5005端口避开常用端口