Warhummer/db_adapter.py
2025-06-25 09:35:26 +08:00

725 lines
23 KiB
Python

import sqlite3
import json
import logging
import os
import threading
import datetime
import uuid
import random
# 配置日志
logging.basicConfig(
filename='db_adapter.log',
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('db_adapter')
# 数据库文件路径
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "warhammer.db")
# 线程本地存储,确保每个线程使用独立的数据库连接
_thread_local = threading.local()
def get_db_connection():
"""获取数据库连接(每个线程一个)"""
if not hasattr(_thread_local, 'connection'):
_thread_local.connection = sqlite3.connect(DB_PATH)
# 启用外键约束
_thread_local.connection.execute("PRAGMA foreign_keys = ON")
# 配置连接返回行为字典格式
_thread_local.connection.row_factory = sqlite3.Row
return _thread_local.connection
def close_db_connection():
"""关闭当前线程的数据库连接"""
if hasattr(_thread_local, 'connection'):
_thread_local.connection.close()
delattr(_thread_local, 'connection')
# 用户数据操作
def get_users():
"""获取所有用户数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT * FROM users")
users = [dict(row) for row in cursor.fetchall()]
return users
except Exception as e:
logger.error(f"获取用户数据失败: {str(e)}")
return []
def save_users(users):
"""保存用户数据(批量更新)"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
for user in users:
cursor.execute(
"INSERT OR REPLACE INTO users (id, username, password, created_at, high_score, total_games, last_year, last_game_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(
user['id'],
user['username'],
user['password'],
user.get('created_at', datetime.datetime.now().isoformat()),
user.get('high_score', 0),
user.get('total_games', 0),
user.get('last_year', 41000),
user.get('last_game_time', None)
)
)
# 提交事务
conn.commit()
return True
except Exception as e:
# 回滚事务
conn.rollback()
logger.error(f"保存用户数据失败: {str(e)}")
return False
# 游戏状态操作
def get_game_states():
"""获取所有游戏状态数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT * FROM game_states")
rows = cursor.fetchall()
game_states = []
for row in rows:
state = dict(row)
# 将JSON字符串转换为字典
state['game_data'] = json.loads(state['game_data'])
game_states.append(state)
return game_states
except Exception as e:
logger.error(f"获取游戏状态数据失败: {str(e)}")
return []
def save_game_states(states):
"""保存游戏状态数据(批量更新)"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
for state in states:
# 将游戏数据转换为JSON字符串
game_data_json = json.dumps(state['game_data'], ensure_ascii=False)
cursor.execute(
"INSERT OR REPLACE INTO game_states (id, user_id, game_data, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
(
state['id'],
state['user_id'],
game_data_json,
state.get('created_at', datetime.datetime.now().isoformat()),
state.get('updated_at', datetime.datetime.now().isoformat())
)
)
# 提交事务
conn.commit()
return True
except Exception as e:
# 回滚事务
conn.rollback()
logger.error(f"保存游戏状态数据失败: {str(e)}")
return False
# 卡牌数据操作
def get_cards():
"""获取卡牌基础数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT cards_data FROM game_cards WHERE id = 1")
row = cursor.fetchone()
if row:
return json.loads(row['cards_data'])
else:
logger.warning("未找到卡牌数据,返回空字典")
return {}
except Exception as e:
logger.error(f"获取卡牌数据失败: {str(e)}")
return {}
# 自定义卡牌操作
def get_custom_cards():
"""获取自定义卡牌数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT c.*, vs.upvotes, vs.downvotes
FROM custom_cards c
LEFT JOIN card_vote_stats vs ON c.id = vs.card_id
""")
rows = cursor.fetchall()
cards = []
for row in rows:
card = {
'id': row['id'],
'title': row['title'],
'character': json.loads(row['character_data']),
'description': row['description'],
'option_a': json.loads(row['option_a']),
'option_b': json.loads(row['option_b']),
'creator_id': row['creator_id'],
'creator_name': row['creator_name'],
'created_at': row['created_at'],
'upvotes': row['upvotes'] or 0,
'downvotes': row['downvotes'] or 0
}
cards.append(card)
return {'cards': cards}
except Exception as e:
logger.error(f"获取自定义卡牌数据失败: {str(e)}")
return {'cards': []}
def save_custom_cards(data):
"""保存自定义卡牌数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
# 清空现有数据(可选,根据需要修改)
# cursor.execute("DELETE FROM custom_cards")
# 插入新数据
cards = data.get('cards', [])
for card in cards:
cursor.execute(
"""INSERT OR REPLACE INTO custom_cards
(id, title, character_data, description, option_a, option_b, creator_id, creator_name, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
card['id'],
card['title'],
json.dumps(card['character'], ensure_ascii=False),
card['description'],
json.dumps(card['option_a'], ensure_ascii=False),
json.dumps(card['option_b'], ensure_ascii=False),
card['creator_id'],
card['creator_name'],
card.get('created_at', datetime.datetime.now().isoformat())
)
)
# 确保有对应的投票统计记录
cursor.execute(
"INSERT OR IGNORE INTO card_vote_stats (card_id, upvotes, downvotes) VALUES (?, ?, ?)",
(card['id'], card.get('upvotes', 0), card.get('downvotes', 0))
)
# 提交事务
conn.commit()
return True
except Exception as e:
# 回滚事务
conn.rollback()
logger.error(f"保存自定义卡牌数据失败: {str(e)}")
return False
# 卡牌投票操作
def get_card_votes():
"""获取卡牌投票数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 获取用户投票
cursor.execute("SELECT user_id, card_id, vote_type FROM card_votes")
vote_rows = cursor.fetchall()
user_votes = {}
for row in vote_rows:
user_id = row['user_id']
if user_id not in user_votes:
user_votes[user_id] = {}
user_votes[user_id][row['card_id']] = row['vote_type']
# 获取投票统计
cursor.execute("SELECT card_id, upvotes, downvotes FROM card_vote_stats")
stat_rows = cursor.fetchall()
card_votes = {}
for row in stat_rows:
card_id = row['card_id']
card_votes[card_id] = {
'upvotes': row['upvotes'] or 0,
'downvotes': row['downvotes'] or 0,
'users': {} # 这部分可能需要额外填充,取决于原来的数据结构
}
return {'user_votes': user_votes, 'card_votes': card_votes}
except Exception as e:
logger.error(f"获取卡牌投票数据失败: {str(e)}")
return {'user_votes': {}, 'card_votes': {}}
def save_card_votes(data):
"""保存卡牌投票数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
# 更新用户投票
user_votes = data.get('user_votes', {})
for user_id, votes in user_votes.items():
for card_id, vote_type in votes.items():
cursor.execute(
"""INSERT OR REPLACE INTO card_votes
(card_id, user_id, vote_type, created_at)
VALUES (?, ?, ?, ?)""",
(
card_id,
user_id,
vote_type,
datetime.datetime.now().isoformat()
)
)
# 更新投票统计
card_votes = data.get('card_votes', {})
for card_id, votes in card_votes.items():
cursor.execute(
"INSERT OR REPLACE INTO card_vote_stats (card_id, upvotes, downvotes) VALUES (?, ?, ?)",
(
card_id,
votes.get('upvotes', 0),
votes.get('downvotes', 0)
)
)
# 提交事务
conn.commit()
return True
except Exception as e:
# 回滚事务
conn.rollback()
logger.error(f"保存卡牌投票数据失败: {str(e)}")
return False
# 会话操作
def get_sessions():
"""获取会话数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT * FROM sessions")
rows = cursor.fetchall()
active_sessions = []
for row in rows:
session = {
'user_id': row['user_id'],
'login_time': row['login_time'],
'last_activity': row['last_activity'],
'ip_address': row['ip_address'],
'status': row['status']
}
active_sessions.append(session)
return {'active_sessions': active_sessions}
except Exception as e:
logger.error(f"获取会话数据失败: {str(e)}")
return {'active_sessions': []}
def save_sessions(data):
"""保存会话数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
# 清空旧会话
cursor.execute("DELETE FROM sessions")
# 插入新会话
active_sessions = data.get('active_sessions', [])
for session in active_sessions:
# 生成会话ID
session_id = str(uuid.uuid4())
cursor.execute(
"""INSERT INTO sessions
(id, user_id, login_time, last_activity, ip_address, status)
VALUES (?, ?, ?, ?, ?, ?)""",
(
session_id,
session['user_id'],
session.get('login_time', datetime.datetime.now().isoformat()),
session.get('last_activity', datetime.datetime.now().isoformat()),
session.get('ip_address', ''),
session.get('status', 'idle')
)
)
# 提交事务
conn.commit()
return True
except Exception as e:
# 回滚事务
conn.rollback()
logger.error(f"保存会话数据失败: {str(e)}")
return False
# 投票卡牌的实用函数
def vote_card(card_id, user_id, vote_type):
"""对卡牌进行投票或取消投票"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
# 检查是否已存在投票
cursor.execute("SELECT vote_type FROM card_votes WHERE card_id = ? AND user_id = ?", (card_id, user_id))
existing_vote = cursor.fetchone()
# 检查卡牌投票统计
cursor.execute("SELECT upvotes, downvotes FROM card_vote_stats WHERE card_id = ?", (card_id,))
vote_stats = cursor.fetchone()
upvotes = 0
downvotes = 0
if vote_stats:
upvotes = vote_stats['upvotes'] or 0
downvotes = vote_stats['downvotes'] or 0
new_vote = None
if vote_type == 'none' or (existing_vote and existing_vote['vote_type'] == vote_type):
# 取消投票
if existing_vote:
if existing_vote['vote_type'] == 'upvote':
upvotes = max(0, upvotes - 1)
else:
downvotes = max(0, downvotes - 1)
# 删除投票记录
cursor.execute("DELETE FROM card_votes WHERE card_id = ? AND user_id = ?", (card_id, user_id))
else:
# 如果之前投过票,先取消之前的投票
if existing_vote:
if existing_vote['vote_type'] == 'upvote':
upvotes = max(0, upvotes - 1)
else:
downvotes = max(0, downvotes - 1)
# 添加新投票
if vote_type == 'upvote':
upvotes += 1
new_vote = 'upvote'
else:
downvotes += 1
new_vote = 'downvote'
# 更新或添加投票记录
cursor.execute(
"""INSERT OR REPLACE INTO card_votes
(card_id, user_id, vote_type, created_at)
VALUES (?, ?, ?, ?)""",
(card_id, user_id, vote_type, datetime.datetime.now().isoformat())
)
# 更新投票统计
cursor.execute(
"""INSERT OR REPLACE INTO card_vote_stats
(card_id, upvotes, downvotes)
VALUES (?, ?, ?)""",
(card_id, upvotes, downvotes)
)
# 提交事务
conn.commit()
return {
'success': True,
'upvotes': upvotes,
'downvotes': downvotes,
'newVote': new_vote
}
except Exception as e:
# 回滚事务
conn.rollback()
logger.error(f"投票失败: {str(e)}")
return {
'success': False,
'error': str(e)
}
# 死亡场景操作
def get_death_scenarios(death_type=None):
"""获取死亡场景数据,可选按死亡类型筛选"""
conn = get_db_connection()
cursor = conn.cursor()
try:
if death_type:
cursor.execute("SELECT * FROM death_scenarios WHERE death_type = ?", (death_type,))
else:
cursor.execute("SELECT * FROM death_scenarios")
rows = cursor.fetchall()
scenarios = []
for row in rows:
scenario = {
'id': row['id'],
'death_type': row['death_type'],
'name': row['name'],
'weight': row['weight'] or 100,
'first_card': json.loads(row['first_card']),
'second_card': json.loads(row['second_card'])
}
scenarios.append(scenario)
return scenarios
except Exception as e:
logger.error(f"获取死亡场景数据失败: {str(e)}")
return []
def get_random_death_scenario(death_type):
"""根据死亡类型获取随机死亡场景,考虑权重"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 获取指定类型的所有死亡场景
cursor.execute("SELECT id, weight FROM death_scenarios WHERE death_type = ?", (death_type,))
scenarios = cursor.fetchall()
if not scenarios:
return None
# 计算权重总和
total_weight = sum(s['weight'] or 100 for s in scenarios)
# 随机选择一个场景,考虑权重
rand_val = random.randint(1, total_weight)
# 根据权重选择
current_weight = 0
selected_id = None
for scenario in scenarios:
current_weight += scenario['weight'] or 100
if rand_val <= current_weight:
selected_id = scenario['id']
break
# 获取选中的场景详情
if selected_id:
cursor.execute("SELECT * FROM death_scenarios WHERE id = ?", (selected_id,))
row = cursor.fetchone()
if row:
return {
'id': row['id'],
'death_type': row['death_type'],
'name': row['name'],
'weight': row['weight'] or 100,
'first_card': json.loads(row['first_card']),
'second_card': json.loads(row['second_card'])
}
return None
except Exception as e:
logger.error(f"获取随机死亡场景失败: {str(e)}")
return None
def save_death_scenarios(scenarios):
"""保存死亡场景数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
for scenario in scenarios:
cursor.execute(
"""INSERT OR REPLACE INTO death_scenarios
(id, death_type, name, weight, first_card, second_card)
VALUES (?, ?, ?, ?, ?, ?)""",
(
scenario['id'],
scenario['death_type'],
scenario['name'],
scenario.get('weight', 100),
json.dumps(scenario['first_card'], ensure_ascii=False),
json.dumps(scenario['second_card'], ensure_ascii=False)
)
)
# 提交事务
conn.commit()
return True
except Exception as e:
# 回滚事务
conn.rollback()
logger.error(f"保存死亡场景数据失败: {str(e)}")
return False
# 成就系统操作
def get_achievements():
"""获取所有成就数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT * FROM achievements")
rows = cursor.fetchall()
achievements = []
for row in rows:
achievement = {
'id': row['id'],
'name': row['name'],
'description': row['description'],
'icon': row['icon'],
'death_scenario_id': row['death_scenario_id'],
'hidden': row['hidden']
}
achievements.append(achievement)
return achievements
except Exception as e:
logger.error(f"获取成就数据失败: {str(e)}")
return []
def get_user_achievements(user_id):
"""获取用户已解锁的成就"""
conn = get_db_connection()
cursor = conn.cursor()
try:
cursor.execute("""
SELECT a.*, ua.unlock_time
FROM achievements a
JOIN user_achievements ua ON a.id = ua.achievement_id
WHERE ua.user_id = ?
""", (user_id,))
rows = cursor.fetchall()
unlocked = []
for row in rows:
achievement = {
'id': row['id'],
'name': row['name'],
'description': row['description'],
'icon': row['icon'],
'death_scenario_id': row['death_scenario_id'],
'unlock_time': row['unlock_time']
}
unlocked.append(achievement)
return unlocked
except Exception as e:
logger.error(f"获取用户成就数据失败: {str(e)}")
return []
def unlock_achievement(user_id, achievement_id):
"""解锁用户成就"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 检查成就是否存在
cursor.execute("SELECT id FROM achievements WHERE id = ?", (achievement_id,))
if not cursor.fetchone():
logger.error(f"成就不存在: {achievement_id}")
return False
# 检查用户是否已解锁该成就
cursor.execute(
"SELECT * FROM user_achievements WHERE user_id = ? AND achievement_id = ?",
(user_id, achievement_id)
)
if cursor.fetchone():
# 已解锁,无需重复操作
return True
# 解锁新成就
cursor.execute(
"""INSERT INTO user_achievements (user_id, achievement_id, unlock_time)
VALUES (?, ?, ?)""",
(
user_id,
achievement_id,
datetime.datetime.now().isoformat()
)
)
conn.commit()
return True
except Exception as e:
conn.rollback()
logger.error(f"解锁成就失败: {str(e)}")
return False
def save_achievements(achievements):
"""保存成就数据"""
conn = get_db_connection()
cursor = conn.cursor()
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
for achievement in achievements:
cursor.execute(
"""INSERT OR REPLACE INTO achievements
(id, name, description, icon, death_scenario_id, hidden)
VALUES (?, ?, ?, ?, ?, ?)""",
(
achievement['id'],
achievement['name'],
achievement['description'],
achievement['icon'],
achievement.get('death_scenario_id'),
achievement.get('hidden', 0)
)
)
# 提交事务
conn.commit()
return True
except Exception as e:
# 回滚事务
conn.rollback()
logger.error(f"保存成就数据失败: {str(e)}")
return False