agent-Specialization/scripts/qq_bot/main.py

153 lines
4.5 KiB
Python

"""主程序入口"""
import asyncio
import logging
import signal
import sys
from pathlib import Path
# 添加项目根目录到 Python 路径
project_root = Path(__file__).resolve().parents[2]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from scripts.qq_bot.config import (
LOG_LEVEL,
LOG_FILE,
TARGET_GROUP_ID,
NAPCAT_HOST,
AGENTS_HOST,
)
from scripts.qq_bot.napcat_client import NapCatClient
from scripts.qq_bot.web_api_client import WebAPIClient
from scripts.qq_bot.state_manager import StateManager
from scripts.qq_bot.message_handler import MessageHandler
from scripts.qq_bot.message_poller import MessagePoller
import scripts.qq_bot.config as config
# 配置日志
# 配置日志
logging.basicConfig(
level=logging.DEBUG, # 文件记录 DEBUG
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler(LOG_FILE, encoding="utf-8"),
],
)
# 终端只输出 INFO 及以上
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logging.getLogger().addHandler(console_handler)
logger = logging.getLogger(__name__)
class QQBot:
"""QQ 机器人主类"""
def __init__(self):
self.napcat_client: Optional[NapCatClient] = None
self.web_client: Optional[WebAPIClient] = None
self.state_manager = StateManager()
self.message_handler: Optional[MessageHandler] = None
self.poller: Optional[MessagePoller] = None
self.running = False
async def start(self):
"""启动机器人"""
logger.info("=" * 60)
logger.info("QQ Bot for Agents 启动中...")
logger.info(f"目标群号: {TARGET_GROUP_ID}")
logger.info(f"NapCat: {NAPCAT_HOST}")
logger.info(f"Agents: {AGENTS_HOST}")
logger.info("=" * 60)
try:
# 初始化客户端
self.napcat_client = NapCatClient()
self.web_client = WebAPIClient()
async with self.napcat_client, self.web_client:
# 获取机器人 QQ 号
login_info = await self.napcat_client.get_login_info()
if login_info.get("status") == "ok":
bot_qq = login_info.get("data", {}).get("user_id")
config.BOT_QQ_ID = bot_qq
logger.info(f"机器人 QQ 号: {bot_qq}")
else:
logger.error("无法获取机器人 QQ 号")
return
# 初始化消息处理器
self.message_handler = MessageHandler(
self.napcat_client, self.web_client, self.state_manager
)
# 初始化消息轮询器
self.poller = MessagePoller(
self.napcat_client, self.message_handler
)
# 发送启动消息
await self.napcat_client.send_group_msg(
TARGET_GROUP_ID, "🤖 QQ Bot 已启动,@ 我来使用 Agents 系统"
)
logger.info("机器人已启动,开始轮询消息...")
self.running = True
# 启动轮询
await self.poller.start()
except KeyboardInterrupt:
logger.info("收到中断信号,正在关闭...")
except Exception as e:
logger.error(f"机器人运行异常: {e}", exc_info=True)
finally:
await self.stop()
async def stop(self):
"""停止机器人"""
if not self.running:
return
logger.info("正在停止机器人...")
self.running = False
if self.poller:
self.poller.stop()
if self.napcat_client:
try:
await self.napcat_client.send_group_msg(
TARGET_GROUP_ID, "🤖 QQ Bot 已停止"
)
except Exception:
pass
logger.info("机器人已停止")
async def main():
"""主函数"""
bot = QQBot()
# 注册信号处理
def signal_handler(sig, frame):
logger.info(f"收到信号 {sig},准备退出...")
asyncio.create_task(bot.stop())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
await bot.start()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("程序已退出")