agent-Specialization/scripts/qq_bot/message_poller.py

166 lines
5.5 KiB
Python

"""消息轮询器 - 定期获取群消息"""
import asyncio
import logging
from typing import Set, Optional
from .config import TARGET_GROUP_ID, POLL_INTERVAL
from .napcat_client import NapCatClient
from .message_handler import MessageHandler
logger = logging.getLogger(__name__)
class MessagePoller:
"""消息轮询器 - 使用 get_group_msg_history 获取消息"""
def __init__(
self,
napcat_client: NapCatClient,
message_handler: MessageHandler,
poll_interval: float = POLL_INTERVAL,
fetch_count: int = 20,
):
self.napcat = napcat_client
self.handler = message_handler
self.poll_interval = poll_interval
self.fetch_count = fetch_count
self.running = False
# 已处理的消息 ID 集合
self.processed_ids: Set[int] = set()
# 机器人自己的 QQ 号
self.bot_qq_id: Optional[int] = None
async def start(self):
"""开始轮询"""
self.running = True
logger.info("消息轮询器已启动")
# 获取机器人 QQ 号
login_info = await self.napcat.get_login_info()
if login_info.get("status") == "ok":
self.bot_qq_id = login_info.get("data", {}).get("user_id")
logger.info(f"机器人 QQ 号: {self.bot_qq_id}")
# 设置到 config 模块
from . import config
config.BOT_QQ_ID = self.bot_qq_id
logger.info(f"已设置 config.BOT_QQ_ID = {self.bot_qq_id}")
# 首次轮询,标记所有历史消息为已处理(避免重复处理旧消息)
logger.info("首次轮询,标记历史消息...")
try:
result = await self.napcat.get_group_msg_history(
TARGET_GROUP_ID, message_seq=0, count=self.fetch_count
)
if result.get("status") == "ok":
messages = result.get("data", {}).get("messages", [])
for msg in messages:
msg_id = msg.get("message_id")
if msg_id:
self.processed_ids.add(msg_id)
logger.info(f"已标记 {len(self.processed_ids)} 条历史消息")
except Exception as e:
logger.error(f"首次轮询失败: {e}")
logger.info("开始监听新消息...")
while self.running:
try:
await self._poll_once()
except Exception as e:
logger.error(f"轮询异常: {e}", exc_info=True)
await asyncio.sleep(self.poll_interval)
def stop(self):
"""停止轮询"""
self.running = False
logger.info("消息轮询器已停止")
async def _poll_once(self):
"""执行一次轮询"""
try:
logger.debug(f"开始轮询群 {TARGET_GROUP_ID} 的消息...")
# 获取群消息历史(从最新消息开始)
result = await self.napcat.get_group_msg_history(
TARGET_GROUP_ID, message_seq=0, count=self.fetch_count
)
logger.debug(f"轮询结果: {result}")
if result.get("status") != "ok":
logger.warning(f"获取消息历史失败: {result}")
return
data = result.get("data", {})
messages = data.get("messages", [])
logger.debug(f"获取到 {len(messages)} 条消息")
if not messages:
return
# 消息按时间倒序(最新的在前),需要反转
messages.reverse()
# 处理每条消息
for msg in messages:
await self._process_message(msg)
except Exception as e:
logger.error(f"轮询消息失败: {e}", exc_info=True)
async def _process_message(self, msg: dict):
"""处理单条消息"""
try:
message_id = msg.get("message_id")
logger.debug(f"处理消息 ID: {message_id}")
if not message_id:
logger.debug("消息 ID 为空,跳过")
return
# 跳过已处理的消息
if message_id in self.processed_ids:
logger.debug(f"消息 {message_id} 已处理,跳过")
return
# 记录消息 ID
self.processed_ids.add(message_id)
logger.debug(f"已处理消息数: {len(self.processed_ids)}")
# 限制集合大小
if len(self.processed_ids) > 1000:
# 保留最近的 500 条
self.processed_ids = set(list(self.processed_ids)[-500:])
# 获取发送者信息
sender = msg.get("sender", {})
user_id = sender.get("user_id")
logger.debug(f"发送者 ID: {user_id}, 机器人 ID: {self.bot_qq_id}")
# 跳过机器人自己的消息
if self.bot_qq_id and user_id == self.bot_qq_id:
logger.debug("跳过机器人自己的消息")
return
# 解析消息内容
raw_message = msg.get("raw_message", "")
if not raw_message:
logger.debug("消息内容为空,跳过")
return
logger.info(f"收到新消息 [{user_id}]: {raw_message}")
# 处理消息
handled = await self.handler.handle_message(raw_message, user_id)
logger.info(f"消息处理结果: {handled}")
except Exception as e:
logger.error(f"处理消息失败: {e}", exc_info=True)