166 lines
5.5 KiB
Python
166 lines
5.5 KiB
Python
"""消息轮询器 - 定期获取群消息"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Set, Optional
|
|
|
|
from .config import TARGET_GROUP_ID, POLL_INTERVAL
|
|
from .napcat_client import NapCatClient
|
|
from .message_handler import MessageHandler
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MessagePoller:
|
|
"""消息轮询器 - 使用 get_group_msg_history 获取消息"""
|
|
|
|
def __init__(
|
|
self,
|
|
napcat_client: NapCatClient,
|
|
message_handler: MessageHandler,
|
|
poll_interval: float = POLL_INTERVAL,
|
|
fetch_count: int = 20,
|
|
):
|
|
self.napcat = napcat_client
|
|
self.handler = message_handler
|
|
self.poll_interval = poll_interval
|
|
self.fetch_count = fetch_count
|
|
self.running = False
|
|
|
|
# 已处理的消息 ID 集合
|
|
self.processed_ids: Set[int] = set()
|
|
|
|
# 机器人自己的 QQ 号
|
|
self.bot_qq_id: Optional[int] = None
|
|
|
|
async def start(self):
|
|
"""开始轮询"""
|
|
self.running = True
|
|
logger.info("消息轮询器已启动")
|
|
|
|
# 获取机器人 QQ 号
|
|
login_info = await self.napcat.get_login_info()
|
|
if login_info.get("status") == "ok":
|
|
self.bot_qq_id = login_info.get("data", {}).get("user_id")
|
|
logger.info(f"机器人 QQ 号: {self.bot_qq_id}")
|
|
|
|
# 设置到 config 模块
|
|
from . import config
|
|
config.BOT_QQ_ID = self.bot_qq_id
|
|
logger.info(f"已设置 config.BOT_QQ_ID = {self.bot_qq_id}")
|
|
|
|
# 首次轮询,标记所有历史消息为已处理(避免重复处理旧消息)
|
|
logger.info("首次轮询,标记历史消息...")
|
|
try:
|
|
result = await self.napcat.get_group_msg_history(
|
|
TARGET_GROUP_ID, message_seq=0, count=self.fetch_count
|
|
)
|
|
if result.get("status") == "ok":
|
|
messages = result.get("data", {}).get("messages", [])
|
|
for msg in messages:
|
|
msg_id = msg.get("message_id")
|
|
if msg_id:
|
|
self.processed_ids.add(msg_id)
|
|
logger.info(f"已标记 {len(self.processed_ids)} 条历史消息")
|
|
except Exception as e:
|
|
logger.error(f"首次轮询失败: {e}")
|
|
|
|
logger.info("开始监听新消息...")
|
|
|
|
while self.running:
|
|
try:
|
|
await self._poll_once()
|
|
except Exception as e:
|
|
logger.error(f"轮询异常: {e}", exc_info=True)
|
|
|
|
await asyncio.sleep(self.poll_interval)
|
|
|
|
def stop(self):
|
|
"""停止轮询"""
|
|
self.running = False
|
|
logger.info("消息轮询器已停止")
|
|
|
|
async def _poll_once(self):
|
|
"""执行一次轮询"""
|
|
try:
|
|
logger.debug(f"开始轮询群 {TARGET_GROUP_ID} 的消息...")
|
|
|
|
# 获取群消息历史(从最新消息开始)
|
|
result = await self.napcat.get_group_msg_history(
|
|
TARGET_GROUP_ID, message_seq=0, count=self.fetch_count
|
|
)
|
|
|
|
logger.debug(f"轮询结果: {result}")
|
|
|
|
if result.get("status") != "ok":
|
|
logger.warning(f"获取消息历史失败: {result}")
|
|
return
|
|
|
|
data = result.get("data", {})
|
|
messages = data.get("messages", [])
|
|
|
|
logger.debug(f"获取到 {len(messages)} 条消息")
|
|
|
|
if not messages:
|
|
return
|
|
|
|
# 消息按时间倒序(最新的在前),需要反转
|
|
messages.reverse()
|
|
|
|
# 处理每条消息
|
|
for msg in messages:
|
|
await self._process_message(msg)
|
|
|
|
except Exception as e:
|
|
logger.error(f"轮询消息失败: {e}", exc_info=True)
|
|
|
|
async def _process_message(self, msg: dict):
|
|
"""处理单条消息"""
|
|
try:
|
|
message_id = msg.get("message_id")
|
|
logger.debug(f"处理消息 ID: {message_id}")
|
|
|
|
if not message_id:
|
|
logger.debug("消息 ID 为空,跳过")
|
|
return
|
|
|
|
# 跳过已处理的消息
|
|
if message_id in self.processed_ids:
|
|
logger.debug(f"消息 {message_id} 已处理,跳过")
|
|
return
|
|
|
|
# 记录消息 ID
|
|
self.processed_ids.add(message_id)
|
|
logger.debug(f"已处理消息数: {len(self.processed_ids)}")
|
|
|
|
# 限制集合大小
|
|
if len(self.processed_ids) > 1000:
|
|
# 保留最近的 500 条
|
|
self.processed_ids = set(list(self.processed_ids)[-500:])
|
|
|
|
# 获取发送者信息
|
|
sender = msg.get("sender", {})
|
|
user_id = sender.get("user_id")
|
|
|
|
logger.debug(f"发送者 ID: {user_id}, 机器人 ID: {self.bot_qq_id}")
|
|
|
|
# 跳过机器人自己的消息
|
|
if self.bot_qq_id and user_id == self.bot_qq_id:
|
|
logger.debug("跳过机器人自己的消息")
|
|
return
|
|
|
|
# 解析消息内容
|
|
raw_message = msg.get("raw_message", "")
|
|
if not raw_message:
|
|
logger.debug("消息内容为空,跳过")
|
|
return
|
|
|
|
logger.info(f"收到新消息 [{user_id}]: {raw_message}")
|
|
|
|
# 处理消息
|
|
handled = await self.handler.handle_message(raw_message, user_id)
|
|
logger.info(f"消息处理结果: {handled}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"处理消息失败: {e}", exc_info=True)
|