"""Sub-agent task management.

Dispatches tasks from the main agent to the external sub-agent service,
tracks them in a JSON state file, and copies finished deliverables back
into the project directory.
"""

import json
import shutil
import time
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import httpx

from config import (
    OUTPUT_FORMATS,
    SUB_AGENT_DEFAULT_TIMEOUT,
    SUB_AGENT_MAX_ACTIVE,
    SUB_AGENT_PROJECT_RESULTS_DIR,
    SUB_AGENT_SERVICE_BASE_URL,
    SUB_AGENT_STATE_FILE,
    SUB_AGENT_STATUS_POLL_INTERVAL,
    SUB_AGENT_TASKS_BASE_DIR,
)
from utils.logger import setup_logger

logger = setup_logger(__name__)

# Statuses after which a task record is finalized and no longer polled.
TERMINAL_STATUSES = {"completed", "failed", "timeout"}
# Statuses that count against SUB_AGENT_MAX_ACTIVE and are matched by agent_id.
ACTIVE_STATUSES = {"pending", "running"}


class SubAgentManager:
    """Schedule tasks between the main agent and the sub-agent service.

    Each task gets its own folder under ``SUB_AGENT_TASKS_BASE_DIR`` with
    ``references``, ``deliverables`` and ``workspace`` sub-folders. Task
    records are kept in ``self.tasks`` and persisted to ``SUB_AGENT_STATE_FILE``.
    """

    def __init__(self, project_path: str, data_dir: str):
        self.project_path = Path(project_path).resolve()
        self.data_dir = Path(data_dir).resolve()
        self.base_dir = Path(SUB_AGENT_TASKS_BASE_DIR).resolve()
        self.results_dir = Path(SUB_AGENT_PROJECT_RESULTS_DIR).resolve()
        self.state_file = Path(SUB_AGENT_STATE_FILE).resolve()

        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.results_dir.mkdir(parents=True, exist_ok=True)
        self.state_file.parent.mkdir(parents=True, exist_ok=True)

        # task_id -> task record (plain dicts so they serialize to JSON directly).
        self.tasks: Dict[str, Dict] = {}
        self._load_state()

    # ------------------------------------------------------------------
    # Public methods
    # ------------------------------------------------------------------

    def create_sub_agent(
        self,
        *,
        agent_id: int,
        summary: str,
        task: str,
        target_dir: str,
        reference_files: Optional[List[str]] = None,
        timeout_seconds: Optional[int] = None,
    ) -> Dict:
        """Create a sub-agent task and start it on the remote service.

        Args:
            agent_id: Sub-agent number in ``1..SUB_AGENT_MAX_ACTIVE``
                (anything ``int()`` accepts; stored as ``int``).
            summary: Short task summary used in system messages.
            task: Full task description sent to the service.
            target_dir: Project-relative folder that will receive the
                deliverables (blank -> ``sub_agent_results``).
            reference_files: Project-relative files copied into the task's
                ``references`` folder before the service is called.
            timeout_seconds: Task timeout; defaults to ``SUB_AGENT_DEFAULT_TIMEOUT``.

        Returns:
            On success a dict with ``success=True``, ``task_id``, ``status``,
            ``message``, ``deliverables_dir`` and ``copied_references``;
            otherwise ``{"success": False, "error": ...}``. Once the task
            folder has been created, every failure path removes it again.
        """
        reference_files = reference_files or []
        validation_error = self._validate_create_params(agent_id, summary, task, target_dir)
        if validation_error:
            return {"success": False, "error": validation_error}
        # Validation guarantees int() succeeds; storing a real int keeps
        # lookups by agent_id (_select_task) consistent whatever the input type.
        agent_id = int(agent_id)

        if self._active_task_count() >= SUB_AGENT_MAX_ACTIVE:
            return {
                "success": False,
                "error": f"已有 {SUB_AGENT_MAX_ACTIVE} 个子智能体在运行,请稍后再试。",
            }

        task_id = self._generate_task_id(agent_id)
        task_root = self.base_dir / task_id
        references_dir = task_root / "references"
        deliverables_dir = task_root / "deliverables"
        workspace_dir = task_root / "workspace"
        for path in (task_root, references_dir, deliverables_dir, workspace_dir):
            path.mkdir(parents=True, exist_ok=True)

        copied_refs, copy_errors = self._copy_reference_files(reference_files, references_dir)
        if copy_errors:
            self._cleanup_task_folder(task_root)
            return {"success": False, "error": "; ".join(copy_errors)}

        try:
            target_project_dir = self._ensure_project_subdir(target_dir)
        except ValueError as exc:
            self._cleanup_task_folder(task_root)
            return {"success": False, "error": str(exc)}

        timeout_seconds = timeout_seconds or SUB_AGENT_DEFAULT_TIMEOUT
        payload = {
            "task_id": task_id,
            "agent_id": agent_id,
            "summary": summary,
            "task": task,
            "target_project_dir": str(target_project_dir),
            "workspace_dir": str(workspace_dir),
            "references_dir": str(references_dir),
            "deliverables_dir": str(deliverables_dir),
            "timeout_seconds": timeout_seconds,
        }

        service_response = self._call_service("POST", "/tasks", payload, timeout_seconds + 5)
        if not service_response.get("success"):
            self._cleanup_task_folder(task_root)
            return {
                "success": False,
                "error": service_response.get("error", "子智能体服务调用失败"),
                "details": service_response,
            }

        status = service_response.get("status", "pending")
        task_record = {
            "task_id": task_id,
            "agent_id": agent_id,
            "summary": summary,
            "task": task,
            "status": status,
            "target_project_dir": str(target_project_dir),
            "references_dir": str(references_dir),
            "deliverables_dir": str(deliverables_dir),
            "workspace_dir": str(workspace_dir),
            "copied_references": copied_refs,
            "timeout_seconds": timeout_seconds,
            "service_payload": payload,
            "created_at": time.time(),
        }
        self.tasks[task_id] = task_record
        self._save_state()

        message = f"子智能体{agent_id} 已创建,任务ID: {task_id},当前状态:{status}"
        print(f"{OUTPUT_FORMATS['info']} {message}")

        return {
            "success": True,
            "task_id": task_id,
            "agent_id": agent_id,
            "status": status,
            "message": message,
            "deliverables_dir": str(deliverables_dir),
            "copied_references": copied_refs,
        }

    def wait_for_completion(
        self,
        *,
        task_id: Optional[str] = None,
        agent_id: Optional[int] = None,
        timeout_seconds: Optional[int] = None,
    ) -> Dict:
        """Block until the selected task reaches a terminal status or the wait times out.

        The task is selected by ``task_id`` or, failing that, by the newest
        active task of ``agent_id``. The service is polled every
        ``SUB_AGENT_STATUS_POLL_INTERVAL`` seconds. Returns the finalized
        result dict (see ``_finalize_task``).

        NOTE: when the local wait runs out, the task is finalized as
        ``"timeout"`` even if the remote service is still running it.
        """
        task = self._select_task(task_id, agent_id)
        if not task:
            return {"success": False, "error": "未找到对应的子智能体任务"}

        if task.get("status") in TERMINAL_STATUSES and task.get("final_result"):
            return task["final_result"]

        timeout_seconds = timeout_seconds or task.get("timeout_seconds") or SUB_AGENT_DEFAULT_TIMEOUT
        deadline = time.time() + timeout_seconds
        last_payload: Optional[Dict] = None
        status: Optional[str] = None

        while time.time() < deadline:
            last_payload = self._call_service("GET", f"/tasks/{task['task_id']}", timeout=15)
            status = last_payload.get("status")
            if status in TERMINAL_STATUSES:
                break
            # Transient service errors and non-terminal states both just wait;
            # never sleep past the deadline.
            remaining = deadline - time.time()
            time.sleep(max(0.0, min(SUB_AGENT_STATUS_POLL_INTERVAL, remaining)))
        else:
            status = "timeout"
            last_payload = {"success": False, "status": status, "message": "等待超时"}

        finalize_result = self._finalize_task(task, last_payload, status)
        self._save_state()
        return finalize_result

    def poll_updates(self) -> List[Dict]:
        """Poll every non-terminal task once and return results of newly finished ones."""
        updates: List[Dict] = []
        pending_tasks = [
            task for task in self.tasks.values()
            if task.get("status") not in TERMINAL_STATUSES
        ]
        logger.debug(f"[SubAgentManager] 待检查任务: {len(pending_tasks)}")
        if not pending_tasks:
            return updates

        state_changed = False
        for task in pending_tasks:
            payload = self._call_service("GET", f"/tasks/{task['task_id']}", timeout=10)
            status = payload.get("status")
            logger.debug(f"[SubAgentManager] 任务 {task['task_id']} 服务状态: {status}")
            if status not in TERMINAL_STATUSES:
                continue
            updates.append(self._finalize_task(task, payload, status))
            state_changed = True

        if state_changed:
            self._save_state()
        return updates

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _load_state(self):
        """Load persisted task records; start empty if the file is missing or unreadable."""
        self.tasks = {}
        if not self.state_file.exists():
            return
        try:
            data = json.loads(self.state_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning("子智能体状态文件损坏,已忽略。")
            return
        tasks = data.get("tasks", {}) if isinstance(data, dict) else None
        if isinstance(tasks, dict):
            self.tasks = tasks
        else:
            logger.warning("子智能体状态文件损坏,已忽略。")

    def _save_state(self):
        """Persist task records by writing a temp file and then replacing the state file."""
        payload = {"tasks": self.tasks}
        tmp_path = self.state_file.with_name(self.state_file.name + ".tmp")
        tmp_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        # Path.replace (os.replace) renames atomically on POSIX, so a crash
        # mid-write cannot leave a truncated state file behind.
        tmp_path.replace(self.state_file)

    def _generate_task_id(self, agent_id: int) -> str:
        """Build a unique id: ``sub_<agent_id>_<unix seconds>_<6 hex chars>``."""
        suffix = uuid.uuid4().hex[:6]
        return f"sub_{agent_id}_{int(time.time())}_{suffix}"

    def _active_task_count(self) -> int:
        """Number of tasks currently pending or running."""
        return sum(1 for t in self.tasks.values() if t.get("status") in ACTIVE_STATUSES)

    def _is_within_project(self, path: Path) -> bool:
        """True if the already-resolved *path* is the project root or lies beneath it."""
        # relative_to compares path components, so a sibling like "/proj_x"
        # is not mistaken for being inside "/proj" (unlike a str prefix check).
        try:
            path.relative_to(self.project_path)
        except ValueError:
            return False
        return True

    def _copy_reference_files(self, references: List[str], dest_dir: Path) -> Tuple[List[str], List[str]]:
        """Copy project files into *dest_dir*, mirroring their project-relative layout.

        Returns ``(copied, errors)``: the input paths that were copied, and a
        human-readable message for each path that could not be copied.
        Blank entries are skipped.
        """
        copied = []
        errors = []
        for rel_path in references:
            rel_path = rel_path.strip()
            if not rel_path:
                continue
            try:
                source = self._resolve_project_file(rel_path)
            except ValueError as exc:
                errors.append(str(exc))
                continue
            if not source.exists():
                errors.append(f"参考文件不存在: {rel_path}")
                continue
            # Derive the destination from the resolved source, not the raw
            # string: an absolute path or "../" segment would otherwise land
            # outside dest_dir (or even on the source file itself).
            target_path = dest_dir / source.relative_to(self.project_path)
            target_path.parent.mkdir(parents=True, exist_ok=True)
            try:
                shutil.copy2(source, target_path)
                copied.append(rel_path)
            except Exception as exc:
                errors.append(f"复制 {rel_path} 失败: {exc}")
        return copied, errors

    def _ensure_project_subdir(self, relative_dir: str) -> Path:
        """Resolve (and create) a folder inside the project.

        Blank input maps to ``sub_agent_results``.

        Raises:
            ValueError: if the folder would lie outside the project directory.
        """
        relative_dir = relative_dir.strip() if relative_dir else ""
        if not relative_dir:
            relative_dir = "sub_agent_results"
        target = (self.project_path / relative_dir).resolve()
        if not self._is_within_project(target):
            raise ValueError("指定文件夹必须位于项目目录内")
        target.mkdir(parents=True, exist_ok=True)
        return target

    def _resolve_project_file(self, relative_path: str) -> Path:
        """Resolve a project-relative file path.

        Raises:
            ValueError: if the path escapes the project directory.
        """
        relative_path = relative_path.strip()
        candidate = (self.project_path / relative_path).resolve()
        if not self._is_within_project(candidate):
            raise ValueError(f"非法的参考文件路径: {relative_path}")
        return candidate

    def _select_task(self, task_id: Optional[str], agent_id: Optional[int]) -> Optional[Dict]:
        """Find a task by id, or the most recently created active task of *agent_id*."""
        if task_id:
            return self.tasks.get(task_id)
        if agent_id is None:
            return None
        candidates = [
            task for task in self.tasks.values()
            if task.get("agent_id") == agent_id and task.get("status") in ACTIVE_STATUSES
        ]
        if not candidates:
            return None
        # max() returns the first maximal element, matching a stable descending sort.
        return max(candidates, key=lambda item: item.get("created_at", 0))

    @staticmethod
    def _ensure_dict(data) -> Dict:
        """Wrap a non-object JSON response so callers can always use ``.get``."""
        if isinstance(data, dict):
            return data
        return {"success": False, "error": "子智能体服务返回格式错误", "raw": data}

    def _call_service(self, method: str, path: str, payload: Optional[Dict] = None, timeout: Optional[int] = None) -> Dict:
        """Send a request to the sub-agent service and always return a dict.

        Connection errors, HTTP error statuses and malformed JSON are turned
        into ``{"success": False, "error": ...}`` instead of raising.
        """
        url = f"{SUB_AGENT_SERVICE_BASE_URL.rstrip('/')}{path}"
        try:
            with httpx.Client(timeout=timeout or 10) as client:
                if method.upper() == "POST":
                    response = client.post(url, json=payload or {})
                else:
                    response = client.get(url)
                response.raise_for_status()
                return self._ensure_dict(response.json())
        except httpx.RequestError as exc:
            logger.error(f"子智能体服务请求失败: {exc}")
            return {"success": False, "error": f"无法连接子智能体服务: {exc}"}
        except httpx.HTTPStatusError as exc:
            logger.error(f"子智能体服务返回错误: {exc}")
            try:
                body = exc.response.json()
            except ValueError:
                return {"success": False, "error": f"服务端错误: {exc.response.text}"}
            result = self._ensure_dict(body)
            # Error bodies may omit "success"; default it to False.
            result.setdefault("success", False)
            return result
        except ValueError:
            # json.JSONDecodeError (and UnicodeDecodeError) subclass ValueError.
            return {"success": False, "error": "子智能体服务返回格式错误"}

    def _fail_task(self, task: Dict, error: str, extra_message: Optional[str] = None) -> Dict:
        """Mark *task* as failed, store the failure result on it and return it."""
        detail = error if extra_message is None else extra_message
        result = {
            "success": False,
            "task_id": task["task_id"],
            "agent_id": task["agent_id"],
            "status": "failed",
            "error": error,
            "system_message": self._build_system_message(task, "failed", None, detail),
        }
        task["status"] = "failed"
        task["final_result"] = result
        return result

    def _finalize_task(self, task: Dict, service_payload: Dict, status: str) -> Dict:
        """Record the terminal *status* on *task* and build its result dict.

        For ``"completed"`` tasks the deliverables directory must exist and
        contain ``result.md``; it is then copied into the task's target
        project folder. Any failure along the way marks the task ``"failed"``.
        An already-finalized task returns its stored result unchanged.
        """
        existing_result = task.get("final_result")
        if existing_result and task.get("status") in TERMINAL_STATUSES:
            return existing_result

        task["status"] = status
        task["updated_at"] = time.time()
        message = service_payload.get("message") or service_payload.get("error") or ""
        logger.debug(f"[SubAgentManager] finalize task={task['task_id']} status={status}")

        if status != "completed":
            result = {
                "success": False,
                "task_id": task["task_id"],
                "agent_id": task["agent_id"],
                "status": status,
                "message": message or f"子智能体状态:{status}",
                "details": service_payload,
                "system_message": self._build_system_message(task, status, None, message),
            }
            task["final_result"] = result
            return result

        # Guard against an empty path: Path("") is the CWD, which always "exists".
        deliverables_raw = service_payload.get("deliverables_dir") or task.get("deliverables_dir")
        deliverables_dir = Path(deliverables_raw) if deliverables_raw else None
        if deliverables_dir is None or not deliverables_dir.exists():
            return self._fail_task(task, f"未找到交付目录: {deliverables_dir}")

        result_md = deliverables_dir / "result.md"
        if not result_md.exists():
            return self._fail_task(task, "交付目录缺少 result.md,无法完成任务。", "交付目录缺少 result.md")

        try:
            copied_path = self._copy_deliverables_to_project(task, deliverables_dir)
        except OSError as exc:
            # Status is already "completed" in memory; without this the task
            # would look terminal to poll_updates yet never get a final_result.
            logger.error(f"复制交付文件失败: {exc}")
            return self._fail_task(task, f"复制交付文件失败: {exc}")
        task["copied_path"] = str(copied_path)

        result = {
            "success": True,
            "task_id": task["task_id"],
            "agent_id": task["agent_id"],
            "status": status,
            "message": message or "子智能体已完成任务。",
            "deliverables_path": str(deliverables_dir),
            "copied_path": str(copied_path),
            "system_message": self._build_system_message(task, status, copied_path, message),
            "details": service_payload,
        }
        task["final_result"] = result
        return result

    def _copy_deliverables_to_project(self, task: Dict, source_dir: Path) -> Path:
        """Copy deliverables into ``<target_project_dir>/<task_id>_deliverables``, replacing any previous copy."""
        target_dir = Path(task["target_project_dir"])
        target_dir.mkdir(parents=True, exist_ok=True)
        dest_dir = target_dir / f"{task['task_id']}_deliverables"
        if dest_dir.exists():
            shutil.rmtree(dest_dir)
        shutil.copytree(source_dir, dest_dir)
        return dest_dir

    def _cleanup_task_folder(self, task_root: Path):
        """Best-effort removal of a task folder."""
        if task_root.exists():
            shutil.rmtree(task_root, ignore_errors=True)

    def _validate_create_params(self, agent_id: Optional[int], summary: str, task: str, target_dir: str) -> Optional[str]:
        """Return an error message for invalid creation parameters, or None if they are valid."""
        if agent_id is None:
            return "子智能体代号不能为空"
        try:
            agent_id = int(agent_id)
        except (TypeError, ValueError):
            # TypeError: non-numeric types such as lists or dicts.
            return "子智能体代号必须是整数"
        if not (1 <= agent_id <= SUB_AGENT_MAX_ACTIVE):
            return f"子智能体代号必须在 1~{SUB_AGENT_MAX_ACTIVE} 范围内"
        if not summary or not summary.strip():
            return "任务摘要不能为空"
        if not task or not task.strip():
            return "任务详情不能为空"
        if target_dir is None:
            return "指定文件夹不能为空"
        return None

    def _build_system_message(
        self,
        task: Dict,
        status: str,
        copied_path: Optional[Path],
        extra_message: Optional[str] = None,
    ) -> str:
        """Build the status line reported back to the main agent."""
        prefix = f"子智能体{task['agent_id']} 任务摘要:{task['summary']}"
        extra = (extra_message or "").strip()

        if status == "completed" and copied_path:
            msg = f"{prefix} 已完成,成果已复制到 {copied_path}。"
            if extra:
                msg += f" ({extra})"
            return msg

        if status == "timeout":
            return f"{prefix} 超时未完成。" + (f" {extra}" if extra else "")

        if status == "failed":
            return f"{prefix} 执行失败:" + (extra if extra else "请检查交付目录或任务状态。")

        return f"{prefix} 状态:{status}。" + (extra if extra else "")