fix: isolate sub agent limits

This commit is contained in:
JOJO 2025-11-15 16:33:25 +08:00
parent 42433c3062
commit 1c4a7ba003
2 changed files with 72 additions and 8 deletions

View File

@ -74,10 +74,10 @@ class SubAgentManager:
"error": f"该对话已使用过编号 {agent_id},请更换新的子智能体代号。"
}
if self._active_task_count() >= SUB_AGENT_MAX_ACTIVE:
if self._active_task_count(conversation_id) >= SUB_AGENT_MAX_ACTIVE:
return {
"success": False,
"error": f"已有 {SUB_AGENT_MAX_ACTIVE}子智能体在运行,请稍后再试。",
"error": f"该对话已存在 {SUB_AGENT_MAX_ACTIVE}运行中的子智能体,请稍后再试。",
}
task_id = self._generate_task_id(agent_id)
@ -245,8 +245,17 @@ class SubAgentManager:
suffix = uuid.uuid4().hex[:6]
return f"sub_{agent_id}_{int(time.time())}_{suffix}"
def _active_task_count(self) -> int:
return len([t for t in self.tasks.values() if t.get("status") in {"pending", "running"}])
def _active_task_count(self, conversation_id: Optional[str] = None) -> int:
active = [
t for t in self.tasks.values()
if t.get("status") in {"pending", "running"}
]
if conversation_id:
active = [
t for t in active
if t.get("conversation_id") == conversation_id
]
return len(active)
def _copy_reference_files(self, references: List[str], dest_dir: Path) -> Tuple[List[str], List[str]]:
copied = []
@ -497,6 +506,9 @@ class SubAgentManager:
"""返回子智能体任务概览,用于前端展示。"""
overview: List[Dict[str, Any]] = []
for task_id, task in self.tasks.items():
status = task.get("status")
if status not in {"pending", "running"}:
continue
if conversation_id and task.get("conversation_id") != conversation_id:
continue
snapshot = {

View File

@ -82,6 +82,8 @@ sub_agent_tasks: Dict[str, Dict[str, Any]] = {}
# Terminal object per sub-agent task, keyed by task id.
sub_agent_terminals: Dict[str, SubAgentTerminal] = {}
# Task id -> set of sids currently in that task's room
# (presumably socket session ids -- TODO confirm against the socket handlers).
sub_agent_rooms: Dict[str, set] = defaultdict(set)
# Sid -> id of the task that sid is currently attached to.
sub_agent_connections: Dict[str, str] = {}
# Statuses for which cleanup_inactive_sub_agent_tasks() purges the task immediately.
SUB_AGENT_TERMINAL_STATUSES = {"completed", "failed", "timeout"}
# How long (seconds) a task may stay in "stopping" before cleanup purges it.
STOPPING_GRACE_SECONDS = 30
def format_read_file_result(result_data: Dict) -> str:
"""格式化 read_file 工具的输出便于在Web端展示。"""
@ -244,8 +246,52 @@ def build_finish_tool_reminder(meta: Dict[str, Any]) -> str:
)
def get_active_sub_agent_count() -> int:
    """Return the number of registered sub-agent tasks that are pending or running."""
    live_statuses = {"pending", "running"}
    return sum(
        1
        for entry in sub_agent_tasks.values()
        if entry.get("status") in live_statuses
    )
def cleanup_inactive_sub_agent_tasks(force: bool = False):
    """Remove finished or long-stopping sub agents so they stop taking up a slot.

    Tasks whose status (case-insensitive) is in ``SUB_AGENT_TERMINAL_STATUSES``
    are always purged. Tasks in ``"stopping"`` are purged once their
    ``updated_at`` (falling back to ``created_at``) is more than
    ``STOPPING_GRACE_SECONDS`` old.

    Args:
        force: When true, purge ``"stopping"`` tasks regardless of their age.
            Has no effect on tasks in any other status.
    """
    current_time = time.time()
    # Iterate over a snapshot: purging removes entries from sub_agent_tasks.
    for ident, record in tuple(sub_agent_tasks.items()):
        state = (record.get("status") or "").lower()
        if state in SUB_AGENT_TERMINAL_STATUSES:
            _purge_sub_agent_task(ident)
        elif state == "stopping":
            # A task without any timestamp is treated as touched just now.
            last_touch = (
                record.get("updated_at")
                or record.get("created_at")
                or current_time
            )
            expired = force or (current_time - last_touch) > STOPPING_GRACE_SECONDS
            if expired:
                _purge_sub_agent_task(ident)
def _purge_sub_agent_task(task_id: str):
    """Remove the local records and connections associated with ``task_id``.

    In order, this:
      1. drops the entry from ``sub_agent_tasks``;
      2. forgets the task's terminal and, if it exposes ``close``, closes it
         (best effort: a failing ``close()`` is logged, never raised);
      3. forgets the task's room and the connection entry of every sid in it;
      4. removes any remaining connection entries that point at the task;
      5. removes every ``stop_flags`` key that contains ``task_id``.

    Args:
        task_id: Identifier of the sub-agent task to purge.
    """
    # Function-scope import so this block does not depend on the module's imports.
    import logging

    sub_agent_tasks.pop(task_id, None)

    terminal = sub_agent_terminals.pop(task_id, None)
    if terminal and hasattr(terminal, "close"):
        try:
            terminal.close()
        except Exception:
            # Cleanup must continue even if the terminal refuses to close,
            # but record the failure instead of hiding it completely.
            logging.getLogger(__name__).warning(
                "Failed to close terminal for sub agent task %s",
                task_id,
                exc_info=True,
            )

    # Sids that were in the task's room lose their connection entry,
    # whichever task that entry currently points to.
    room_sids = sub_agent_rooms.pop(task_id, set())
    for sid in list(room_sids):
        sub_agent_connections.pop(sid, None)

    # Also catch sids attached to this task that were not in the room set.
    for sid, current in list(sub_agent_connections.items()):
        if current == task_id:
            sub_agent_connections.pop(sid, None)

    # Stop-flag keys are matched by containment, not equality.
    for sid in list(stop_flags.keys()):
        if task_id in sid:
            stop_flags.pop(sid, None)
def get_active_sub_agent_count(conversation_id: Optional[str] = None) -> int:
    """Count pending/running sub-agent tasks, optionally for one conversation.

    Inactive tasks are cleaned up first via ``cleanup_inactive_sub_agent_tasks()``.

    Args:
        conversation_id: Parent conversation to restrict the count to. It is
            passed through ``_normalize_conversation_id``; when the normalized
            value is falsy, tasks of every conversation are counted.

    Returns:
        The number of tasks whose status is ``"pending"`` or ``"running"`` and,
        when a conversation filter applies, whose normalized
        ``"parent_conversation_id"`` equals the normalized ``conversation_id``.
    """
    cleanup_inactive_sub_agent_tasks()
    wanted = _normalize_conversation_id(conversation_id)
    live_statuses = {"pending", "running"}
    return sum(
        1
        for entry in sub_agent_tasks.values()
        if entry.get("status") in live_statuses
        and not (
            wanted
            and _normalize_conversation_id(entry.get("parent_conversation_id")) != wanted
        )
    )
def find_sub_agent_conversation_file(conv_id: str) -> Optional[Path]:
@ -1984,12 +2030,15 @@ def create_sub_agent_task(payload: Dict[str, Any]) -> Dict[str, Any]:
if not payload.get(field):
return {"success": False, "error": f"缺少必要参数: {field}"}
cleanup_inactive_sub_agent_tasks()
task_id = payload["task_id"]
if task_id in sub_agent_tasks:
return {"success": False, "error": f"任务 {task_id} 已存在"}
if get_active_sub_agent_count() >= SUB_AGENT_MAX_ACTIVE:
return {"success": False, "error": f"已存在 {SUB_AGENT_MAX_ACTIVE} 个运行中的子智能体,请稍后再试"}
parent_conv = payload.get("parent_conversation_id")
if get_active_sub_agent_count(parent_conv) >= SUB_AGENT_MAX_ACTIVE:
limit_note = f"同一对话最多可同时运行 {SUB_AGENT_MAX_ACTIVE} 个子智能体"
return {"success": False, "error": f"已存在 {SUB_AGENT_MAX_ACTIVE} 个运行中的子智能体,请稍后再试({limit_note})。"}
workspace_dir = Path(payload["workspace_dir"]).resolve()
references_dir = Path(payload["references_dir"]).resolve()
@ -4153,6 +4202,7 @@ def mark_task_completed(task_id: str, reason: Optional[str] = None):
"status": "completed",
"reason": reason or ""
})
cleanup_inactive_sub_agent_tasks()
def mark_task_failed(task_id: str, message: str):
@ -4161,6 +4211,7 @@ def mark_task_failed(task_id: str, message: str):
"status": "failed",
"message": message
})
cleanup_inactive_sub_agent_tasks()
def mark_task_timeout(task_id: str):
@ -4172,6 +4223,7 @@ def mark_task_timeout(task_id: str):
"status": "timeout",
"message": "任务已超时"
})
cleanup_inactive_sub_agent_tasks()
def start_sub_agent_monitor():