"""消息处理器""" import asyncio import logging import re from typing import Optional, Dict, Any, List from .config import TARGET_GROUP_ID, POLL_INTERVAL, POLL_TIMEOUT from .napcat_client import NapCatClient from .web_api_client import WebAPIClient from .state_manager import StateManager from . import config # 导入 config 模块,而不是导入 BOT_QQ_ID logger = logging.getLogger(__name__) class MessageHandler: """消息处理器""" def __init__( self, napcat_client: NapCatClient, web_client: WebAPIClient, state_manager: StateManager, ): self.napcat = napcat_client self.web = web_client self.state = state_manager async def handle_message(self, message: str, user_id: int) -> bool: """处理群消息 Args: message: 消息内容 user_id: 发送者 QQ 号 Returns: 是否处理了该消息 """ logger.debug(f"handle_message 被调用: message={message}, user_id={user_id}") logger.debug(f"BOT_QQ_ID={config.BOT_QQ_ID}") # 检查是否 @ 机器人 is_mentioned = self._is_mentioned(message) logger.debug(f"是否 @ 机器人: {is_mentioned}") if not is_mentioned: logger.debug("未 @ 机器人,忽略消息") return False # 移除 @ 标记 clean_msg = self._clean_message(message) logger.info(f"处理消息: {clean_msg}") # 解析指令 if clean_msg.startswith("/"): logger.info(f"处理指令: {clean_msg}") await self._handle_command(clean_msg) else: # 普通消息 logger.info(f"处理普通消息: {clean_msg}") await self._handle_chat_message(clean_msg) return True def _is_mentioned(self, message: str) -> bool: """检查是否 @ 了机器人""" logger.debug(f"检查是否 @ 机器人: BOT_QQ_ID={config.BOT_QQ_ID}, message={message}") if not config.BOT_QQ_ID: logger.warning("BOT_QQ_ID 未设置") return False # 检查 CQ 码格式的 at pattern = rf"\[CQ:at,qq={config.BOT_QQ_ID}\]" result = bool(re.search(pattern, message)) logger.debug(f"匹配结果: {result}, pattern={pattern}") return result def _clean_message(self, message: str) -> str: """清理消息,移除 @ 标记""" # 移除所有 CQ:at 标记 cleaned = re.sub(r"\[CQ:at,qq=\d+\]", "", message) return cleaned.strip() async def _handle_command(self, command: str): """处理指令""" parts = command.split(maxsplit=1) cmd = parts[0].lower() args = parts[1] if len(parts) > 1 else "" try: if cmd == "/help": await self._cmd_help() elif cmd == "/test": await self._cmd_test() elif cmd == "/new": await self._cmd_new() elif cmd == "/resume": limit = int(args) if args.isdigit() else 10 await self._cmd_resume(limit) elif cmd == "/switch": if not args: await self.napcat.send_group_msg( TARGET_GROUP_ID, "❌ 请提供对话 ID,例如:/switch conv_xxx" ) return await self._cmd_switch(args.strip()) elif cmd == "/stop": await self._cmd_stop() else: await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 未知指令: {cmd}\n发送 /help 查看帮助" ) except Exception as e: logger.error(f"处理指令失败: {e}", exc_info=True) await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 指令执行失败: {str(e)}" ) async def _cmd_help(self): """显示帮助信息""" help_text = """🤖 QQ Bot 使用帮助 📝 基本指令: /help - 显示此帮助信息 /new - 创建新对话 /resume [数量] - 查看最近的对话列表(默认10条) /switch <对话ID> - 切换到指定对话 /stop - 停止当前运行的任务 /test - 测试自定义工具是否可用 💬 发送消息: 直接 @ 我并输入消息即可与 AI 对话 例如:@机器人 今天天气怎么样? 📌 注意事项: • 只能在空闲时发送消息或执行指令(/stop 除外) • 运行期间的消息会被忽略 • 模型可以使用工具执行命令、搜索网页等""" await self.napcat.send_group_msg(TARGET_GROUP_ID, help_text) async def _cmd_test(self): """测试自定义工具""" try: # 发送一个简单的测试消息 message = "请告诉我你有哪些可用的工具?特别是有没有 send_qq_file 工具?" await self._send_message(message) except Exception as e: logger.error(f"测试失败: {e}", exc_info=True) await self.napcat.send_group_msg(TARGET_GROUP_ID, f"❌ 测试失败: {str(e)}") async def _cmd_new(self): """创建新对话""" state = self.state.get_state() if state.is_running(): await self.napcat.send_group_msg( TARGET_GROUP_ID, "❌ 当前有任务正在运行,请等待完成或使用 /stop 停止" ) return result = await self.web.create_conversation(thinking_mode=False, mode="fast") if result.get("success"): conv_id = result.get("conversation_id") state.set_conversation(conv_id) await self.napcat.send_group_msg( TARGET_GROUP_ID, f"✅ 已创建新对话: {conv_id}" ) else: await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 创建对话失败: {result.get('error')}" ) async def _cmd_resume(self, limit: int): """获取最近对话列表""" state = self.state.get_state() if state.is_running(): await self.napcat.send_group_msg( TARGET_GROUP_ID, "❌ 当前有任务正在运行,请等待完成或使用 /stop 停止" ) return result = await self.web.list_conversations(limit=limit) if not result.get("success"): await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 获取对话列表失败: {result.get('error')}" ) return conversations = result.get("data", {}).get("conversations", []) if not conversations: await self.napcat.send_group_msg(TARGET_GROUP_ID, "📋 暂无对话记录") return # 格式化对话列表 lines = [f"📋 最近 {len(conversations)} 条对话:\n"] for conv in conversations: conv_id = conv.get("id", "") title = conv.get("title", "无标题") lines.append(f"• {conv_id}\n {title}") message = "\n".join(lines) await self.napcat.send_group_msg(TARGET_GROUP_ID, message) async def _cmd_switch(self, conversation_id: str): """切换对话""" state = self.state.get_state() if state.is_running(): await self.napcat.send_group_msg( TARGET_GROUP_ID, "❌ 当前有任务正在运行,请等待完成或使用 /stop 停止" ) return result = await self.web.load_conversation(conversation_id) if result.get("success"): state.set_conversation(conversation_id) await self.napcat.send_group_msg( TARGET_GROUP_ID, f"✅ 已切换到对话: {conversation_id}" ) else: await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 切换对话失败: {result.get('error')}" ) async def _cmd_stop(self): """停止当前任务""" state = self.state.get_state() if not state.is_running(): await self.napcat.send_group_msg(TARGET_GROUP_ID, "ℹ️ 当前没有运行中的任务") return if not state.task_id: await self.napcat.send_group_msg(TARGET_GROUP_ID, "❌ 任务 ID 丢失") state.finish_task() return result = await self.web.cancel_task(state.task_id) if result.get("success"): await self.napcat.send_group_msg(TARGET_GROUP_ID, "✅ 已停止任务") state.finish_task() else: await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 停止任务失败: {result.get('error')}" ) async def _handle_chat_message(self, message: str): """处理普通聊天消息""" state = self.state.get_state() # 检查是否空闲 if state.is_running(): logger.info("任务运行中,忽略消息") return # 检查是否有对话 if not state.conversation_id: # 自动创建对话 logger.info("自动创建对话...") result = await self.web.create_conversation(thinking_mode=False, mode="fast") if result.get("success"): conv_id = result.get("conversation_id") state.set_conversation(conv_id) await self.napcat.send_group_msg( TARGET_GROUP_ID, f"✅ 已自动创建对话: {conv_id}" ) else: await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 创建对话失败: {result.get('error')}" ) return # 发送消息 try: logger.info(f"发送消息到对话 {state.conversation_id}: {message}") result = await self.web.send_message(message, state.conversation_id) if not result.get("success"): await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 发送消息失败: {result.get('error')}" ) return task_id = result.get("data", {}).get("task_id") if not task_id: await self.napcat.send_group_msg(TARGET_GROUP_ID, "❌ 未获取到任务 ID") return logger.info(f"任务已创建: {task_id}") state.start_task(task_id) # 开始轮询任务 logger.info(f"开始轮询任务 {task_id}") await self._poll_task(task_id) except Exception as e: logger.error(f"处理消息失败: {e}", exc_info=True) await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 处理消息失败: {str(e)}" ) state.finish_task() async def _poll_task(self, task_id: str): """轮询任务直到完成""" state = self.state.get_state() timeout_counter = 0 max_timeout = POLL_TIMEOUT / POLL_INTERVAL logger.info(f"开始轮询任务 {task_id},间隔 {POLL_INTERVAL}秒") while state.is_running(): try: logger.debug(f"轮询任务 {task_id},offset={state.last_offset}") result = await self.web.poll_task(task_id, state.last_offset) if not result.get("success"): logger.error(f"轮询任务失败: {result}") await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 任务执行失败: {result.get('error')}" ) state.finish_task() break data = result.get("data", {}) status = data.get("status") events = data.get("events", []) next_offset = data.get("next_offset", state.last_offset) logger.debug(f"任务状态: {status}, 事件数: {len(events)}, next_offset: {next_offset}") # 更新偏移量 state.last_offset = next_offset # 处理事件 if events: logger.info(f"处理 {len(events)} 个事件") await self._process_events(events) # 检查任务状态 if status in {"succeeded", "failed", "cancelled"}: logger.info(f"任务完成,状态: {status}") if status == "failed": await self.napcat.send_group_msg( TARGET_GROUP_ID, "❌ 任务执行失败" ) elif status == "cancelled": await self.napcat.send_group_msg( TARGET_GROUP_ID, "ℹ️ 任务已取消" ) state.finish_task() break # 超时检查 timeout_counter += 1 if timeout_counter > max_timeout: logger.warning("任务超时") await self.napcat.send_group_msg( TARGET_GROUP_ID, "⚠️ 任务超时,已自动停止" ) await self.web.cancel_task(task_id) state.finish_task() break # 等待下次轮询 await asyncio.sleep(POLL_INTERVAL) except Exception as e: logger.error(f"轮询任务异常: {e}", exc_info=True) await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 任务执行异常: {str(e)}" ) state.finish_task() break async def _process_events(self, events: List[Dict[str, Any]]): """处理任务事件""" for event in events: event_type = event.get("type") event_data = event.get("data", {}) logger.debug(f"收到事件: type={event_type}, data_keys={list(event_data.keys())}") try: if event_type == "tool_start": # 工具开始执行 tool_name = data.get("name", "") logger.info(f"工具开始: {tool_name}") await self._handle_tool_start(event_data) # 检测到 send_qq_file 工具,记录工具 ID if tool_name == "send_qq_file": tool_id = event_data.get("id", "") logger.info(f"检测到 send_qq_file 工具调用,ID: {tool_id}") # 保存到状态,等待 update_action state = self.state.get_state() if not hasattr(state, 'pending_send_file_tools'): state.pending_send_file_tools = set() state.pending_send_file_tools.add(tool_id) elif event_type == "update_action": # 工具执行完成(包含结果) tool_id = event_data.get("id", "") status = event_data.get("status", "") result = event_data.get("result", {}) # 检查是否是 send_qq_file 工具 state = self.state.get_state() if hasattr(state, 'pending_send_file_tools') and tool_id in state.pending_send_file_tools: logger.info(f"收到 send_qq_file 工具结果,status: {status}") state.pending_send_file_tools.remove(tool_id) # 处理文件发送 if status == "success": await self._handle_send_file_result(result) elif event_type == "text_end": # 模型输出完成 await self._handle_text_end(event_data) except Exception as e: logger.error(f"处理事件失败: {event_type}, {e}", exc_info=True) async def _handle_tool_start(self, data: Dict[str, Any]): """处理工具调用开始""" tool_name = data.get("name", "unknown") arguments = data.get("arguments", {}) # 格式化工具调用信息 lines = [f"[工具] {tool_name}"] # 提取关键参数 if tool_name == "run_command": cmd = arguments.get("command", "") lines.append(f"参数: {cmd}") elif tool_name == "web_search": query = arguments.get("query", "") lines.append(f"参数: {query}") elif tool_name == "read_file": path = arguments.get("path", "") lines.append(f"参数: {path}") elif tool_name == "write_file": path = arguments.get("path", "") lines.append(f"参数: {path}") else: # 其他工具,显示所有参数 if arguments: args_str = ", ".join(f"{k}={v}" for k, v in arguments.items()) lines.append(f"参数: {args_str}") message = "\n".join(lines) await self.napcat.send_group_msg(TARGET_GROUP_ID, message) async def _handle_text_end(self, data: Dict[str, Any]): """处理模型输出完成""" content = data.get("full_content", "") if content: # 发送模型输出 await self.napcat.send_group_msg(TARGET_GROUP_ID, content) async def _handle_send_file_result(self, result: Dict[str, Any]): """处理 send_qq_file 工具结果""" try: # result 可能是字符串或字典 if isinstance(result, str): import json result_data = json.loads(result) else: result_data = result if not result_data.get("success"): error = result_data.get("error", "未知错误") await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 发送文件失败: {error}" ) return file_path = result_data.get("file_path", "") description = result_data.get("description", "") if not file_path: logger.warning("send_qq_file 工具未提供文件路径") return # 转换容器内路径到宿主机路径 # 容器内: /workspace/xxx -> 宿主机: users/qqbot/project/xxx if file_path.startswith("/workspace/"): relative_path = file_path[len("/workspace/"):] # 获取项目根目录 project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) host_path = os.path.join(project_root, "users", "qqbot", "project", relative_path) else: # 如果不是 /workspace 开头,假设是相对路径 project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) host_path = os.path.join(project_root, "users", "qqbot", "project", file_path) logger.info(f"转换文件路径: {file_path} -> {host_path}") # 检查文件是否存在 if not os.path.exists(host_path): logger.error(f"文件不存在: {host_path}") await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 文件不存在: {file_path}" ) return # 发送文件到 QQ 群 await self.napcat.send_file(TARGET_GROUP_ID, host_path, description) logger.info(f"已发送文件: {host_path}") except json.JSONDecodeError as e: logger.error(f"解析工具输出失败: {e}") except Exception as e: logger.error(f"发送文件失败: {e}", exc_info=True) await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 发送文件失败: {str(e)}" ) async def _handle_tool_result(self, data: Dict[str, Any]): """处理工具执行结果""" tool_name = data.get("name", "") # 只处理 send_qq_file 工具 if tool_name != "send_qq_file": return # 解析工具输出 output = data.get("output", "") if not output: logger.warning("send_qq_file 工具无输出") return try: import json result = json.loads(output) if not result.get("success"): error = result.get("error", "未知错误") await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 发送文件失败: {error}" ) return file_path = result.get("file_path", "") description = result.get("description", "") if not file_path: logger.warning("send_qq_file 工具未提供文件路径") return # 转换容器内路径到宿主机路径 # 容器内: /workspace/xxx -> 宿主机: users/qqbot/project/xxx if file_path.startswith("/workspace/"): relative_path = file_path[len("/workspace/"):] # 获取项目根目录 project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) host_path = os.path.join(project_root, "users", "qqbot", "project", relative_path) else: # 如果不是 /workspace 开头,假设是相对路径 project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) host_path = os.path.join(project_root, "users", "qqbot", "project", file_path) logger.info(f"转换文件路径: {file_path} -> {host_path}") # 检查文件是否存在 if not os.path.exists(host_path): logger.error(f"文件不存在: {host_path}") await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 文件不存在: {file_path}" ) return # 发送文件到 QQ 群 await self.napcat.send_file(TARGET_GROUP_ID, host_path, description) logger.info(f"已发送文件: {host_path}") except json.JSONDecodeError as e: logger.error(f"解析工具输出失败: {e}") except Exception as e: logger.error(f"发送文件失败: {e}", exc_info=True) await self.napcat.send_group_msg( TARGET_GROUP_ID, f"❌ 发送文件失败: {str(e)}" )