From 1c4a7ba003fb82f87a9007f02276c07fc6064973 Mon Sep 17 00:00:00 2001
From: JOJO <1498581755@qq.com>
Date: Sat, 15 Nov 2025 16:33:25 +0800
Subject: [PATCH] fix: isolate sub agent limits

---
 modules/sub_agent_manager.py | 20 +++++++++---
 sub_agent/web_server.py      | 60 +++++++++++++++++++++++++++++++++---
 2 files changed, 72 insertions(+), 8 deletions(-)

diff --git a/modules/sub_agent_manager.py b/modules/sub_agent_manager.py
index 057eb88..85cb1ba 100644
--- a/modules/sub_agent_manager.py
+++ b/modules/sub_agent_manager.py
@@ -74,10 +74,10 @@ class SubAgentManager:
                 "error": f"该对话已使用过编号 {agent_id},请更换新的子智能体代号。"
             }
 
-        if self._active_task_count() >= SUB_AGENT_MAX_ACTIVE:
+        if self._active_task_count(conversation_id) >= SUB_AGENT_MAX_ACTIVE:
             return {
                 "success": False,
-                "error": f"已有 {SUB_AGENT_MAX_ACTIVE} 个子智能体在运行,请稍后再试。",
+                "error": f"该对话已存在 {SUB_AGENT_MAX_ACTIVE} 个运行中的子智能体,请稍后再试。",
             }
 
         task_id = self._generate_task_id(agent_id)
@@ -245,8 +245,17 @@ class SubAgentManager:
         suffix = uuid.uuid4().hex[:6]
         return f"sub_{agent_id}_{int(time.time())}_{suffix}"
 
-    def _active_task_count(self) -> int:
-        return len([t for t in self.tasks.values() if t.get("status") in {"pending", "running"}])
+    def _active_task_count(self, conversation_id: Optional[str] = None) -> int:
+        active = [
+            t for t in self.tasks.values()
+            if t.get("status") in {"pending", "running"}
+        ]
+        if conversation_id:
+            active = [
+                t for t in active
+                if t.get("conversation_id") == conversation_id
+            ]
+        return len(active)
 
     def _copy_reference_files(self, references: List[str], dest_dir: Path) -> Tuple[List[str], List[str]]:
         copied = []
@@ -497,6 +506,9 @@ class SubAgentManager:
         """返回子智能体任务概览,用于前端展示。"""
         overview: List[Dict[str, Any]] = []
         for task_id, task in self.tasks.items():
+            status = task.get("status")
+            if status not in {"pending", "running"}:
+                continue
             if conversation_id and task.get("conversation_id") != conversation_id:
                 continue
             snapshot = {
diff --git a/sub_agent/web_server.py b/sub_agent/web_server.py
index 7f71aea..57b22d8 100644
--- a/sub_agent/web_server.py
+++ b/sub_agent/web_server.py
@@ -82,6 +82,8 @@ sub_agent_tasks: Dict[str, Dict[str, Any]] = {}
 sub_agent_terminals: Dict[str, SubAgentTerminal] = {}
 sub_agent_rooms: Dict[str, set] = defaultdict(set)
 sub_agent_connections: Dict[str, str] = {}
+SUB_AGENT_TERMINAL_STATUSES = {"completed", "failed", "timeout"}
+STOPPING_GRACE_SECONDS = 30
 
 def format_read_file_result(result_data: Dict) -> str:
     """格式化 read_file 工具的输出,便于在Web端展示。"""
@@ -244,8 +246,52 @@ def build_finish_tool_reminder(meta: Dict[str, Any]) -> str:
     )
 
 
-def get_active_sub_agent_count() -> int:
-    return len([task for task in sub_agent_tasks.values() if task.get("status") in {"pending", "running"}])
+def cleanup_inactive_sub_agent_tasks(force: bool = False):
+    """移除已结束或长期停止中的子智能体,避免占用名额。"""
+    now = time.time()
+    for task_id, task in list(sub_agent_tasks.items()):
+        status = (task.get("status") or "").lower()
+        if status in SUB_AGENT_TERMINAL_STATUSES:
+            _purge_sub_agent_task(task_id)
+            continue
+        if status == "stopping":
+            updated_at = task.get("updated_at") or task.get("created_at") or now
+            if force or (now - updated_at) > STOPPING_GRACE_SECONDS:
+                _purge_sub_agent_task(task_id)
+
+
+def _purge_sub_agent_task(task_id: str):
+    """移除本地记录与相关连接。"""
+    sub_agent_tasks.pop(task_id, None)
+    terminal = sub_agent_terminals.pop(task_id, None)
+    if terminal and hasattr(terminal, "close"):
+        try:
+            terminal.close()
+        except Exception:
+            pass
+    room_sids = sub_agent_rooms.pop(task_id, set())
+    for sid in list(room_sids):
+        sub_agent_connections.pop(sid, None)
+    for sid, current in list(sub_agent_connections.items()):
+        if current == task_id:
+            sub_agent_connections.pop(sid, None)
+    for sid in list(stop_flags.keys()):
+        if task_id in sid:
+            stop_flags.pop(sid, None)
+
+
+def get_active_sub_agent_count(conversation_id: Optional[str] = None) -> int:
+    cleanup_inactive_sub_agent_tasks()
+    normalized = _normalize_conversation_id(conversation_id)
+    count = 0
+    for task in sub_agent_tasks.values():
+        if task.get("status") not in {"pending", "running"}:
+            continue
+        if normalized:
+            if _normalize_conversation_id(task.get("parent_conversation_id")) != normalized:
+                continue
+        count += 1
+    return count
 
 
 def find_sub_agent_conversation_file(conv_id: str) -> Optional[Path]:
@@ -1984,12 +2030,15 @@ def create_sub_agent_task(payload: Dict[str, Any]) -> Dict[str, Any]:
         if not payload.get(field):
             return {"success": False, "error": f"缺少必要参数: {field}"}
 
+    cleanup_inactive_sub_agent_tasks()
     task_id = payload["task_id"]
     if task_id in sub_agent_tasks:
         return {"success": False, "error": f"任务 {task_id} 已存在"}
 
-    if get_active_sub_agent_count() >= SUB_AGENT_MAX_ACTIVE:
-        return {"success": False, "error": f"已存在 {SUB_AGENT_MAX_ACTIVE} 个运行中的子智能体,请稍后再试"}
+    parent_conv = payload.get("parent_conversation_id")
+    if get_active_sub_agent_count(parent_conv) >= SUB_AGENT_MAX_ACTIVE:
+        limit_note = f"同一对话最多可同时运行 {SUB_AGENT_MAX_ACTIVE} 个子智能体"
+        return {"success": False, "error": f"已存在 {SUB_AGENT_MAX_ACTIVE} 个运行中的子智能体,请稍后再试({limit_note})。"}
 
     workspace_dir = Path(payload["workspace_dir"]).resolve()
     references_dir = Path(payload["references_dir"]).resolve()
@@ -4153,6 +4202,7 @@ def mark_task_completed(task_id: str, reason: Optional[str] = None):
         "status": "completed",
         "reason": reason or ""
     })
+    cleanup_inactive_sub_agent_tasks()
 
 
 def mark_task_failed(task_id: str, message: str):
@@ -4161,6 +4211,7 @@ def mark_task_failed(task_id: str, message: str):
         "status": "failed",
         "message": message
     })
+    cleanup_inactive_sub_agent_tasks()
 
 
 def mark_task_timeout(task_id: str):
@@ -4172,6 +4223,7 @@ def mark_task_timeout(task_id: str):
         "status": "timeout",
         "message": "任务已超时"
     })
+    cleanup_inactive_sub_agent_tasks()
 
 
 def start_sub_agent_monitor():