agent-Specialization/scripts/qq_bot/napcat_client.py

143 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""NapCat API 客户端"""
import asyncio
import logging
from typing import Optional, Dict, Any, List
from pathlib import Path
import aiohttp
from .config import NAPCAT_HOST, NAPCAT_TOKEN, IMAGE_EXTENSIONS
logger = logging.getLogger(__name__)
class NapCatClient:
"""NapCat OneBot11 HTTP API 客户端"""
def __init__(self, host: str = NAPCAT_HOST, token: str = NAPCAT_TOKEN):
self.host = host.rstrip("/")
self.token = token
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def _request(self, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""发送 HTTP 请求到 NapCat"""
if not self.session:
raise RuntimeError("Client not initialized. Use 'async with' context manager.")
url = f"{self.host}/{endpoint}"
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
}
logger.debug(f"请求 NapCat API: {endpoint}, data={data}")
try:
async with self.session.post(url, json=data or {}, headers=headers) as resp:
result = await resp.json()
logger.debug(f"NapCat API 响应: {result}")
if result.get("status") != "ok":
logger.warning(f"NapCat API 返回非 ok 状态: {result}")
return result
except Exception as e:
logger.error(f"NapCat API 请求失败: {e}")
raise
async def get_login_info(self) -> Dict[str, Any]:
"""获取登录账号信息"""
return await self._request("get_login_info")
async def send_group_msg(self, group_id: int, message: str) -> Dict[str, Any]:
"""发送群消息(纯文本)"""
return await self._request("send_group_msg", {
"group_id": group_id,
"message": message,
})
async def send_group_msg_with_segments(
self, group_id: int, segments: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""发送群消息(消息段数组)"""
return await self._request("send_group_msg", {
"group_id": group_id,
"message": segments,
})
async def send_file(self, group_id: int, file_path: str, description: str = "") -> Dict[str, Any]:
"""发送文件到群
Args:
group_id: 群号
file_path: 文件绝对路径
description: 文件描述
"""
import shutil
import os
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
# 复制文件到 NapCat 能访问的目录
# 根据你的测试NapCat 在容器内,需要复制到 ~/napcat-config/
napcat_dir = Path.home() / "napcat-config"
napcat_dir.mkdir(parents=True, exist_ok=True)
dest_file = napcat_dir / path.name
shutil.copy2(file_path, dest_file)
logger.info(f"已复制文件到 NapCat 目录: {file_path} -> {dest_file}")
# 使用 NapCat 容器内的路径
# 宿主机的 ~/napcat-config/ 映射到容器内的 /app/napcat/config/
napcat_file_path = f"/app/napcat/config/{path.name}"
# 判断是否为图片
is_image = path.suffix.lower() in IMAGE_EXTENSIONS
if is_image:
# 发送图片消息
segments = []
if description:
segments.append({"type": "text", "data": {"text": description}})
segments.append({"type": "image", "data": {"file": f"file://{napcat_file_path}"}})
return await self.send_group_msg_with_segments(group_id, segments)
else:
# 发送普通文件
segments = []
if description:
segments.append({"type": "text", "data": {"text": description}})
segments.append({
"type": "file",
"data": {
"file": f"file://{napcat_file_path}",
"name": path.name
}
})
return await self.send_group_msg_with_segments(group_id, segments)
async def get_group_msg_history(
self, group_id: int, message_seq: int = 0, count: int = 20
) -> Dict[str, Any]:
"""获取群消息历史
Args:
group_id: 群号
message_seq: 起始消息序号0表示从最新消息开始
count: 获取消息数量
"""
data = {
"group_id": group_id,
"message_seq": message_seq,
"count": count
}
return await self._request("get_group_msg_history", data)