agent-Specialization/modules/sub_agent_manager.py

908 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""子智能体任务管理(子进程模式)。"""
import json
import subprocess
import time
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from config import (
OUTPUT_FORMATS,
SUB_AGENT_DEFAULT_TIMEOUT,
SUB_AGENT_MAX_ACTIVE,
SUB_AGENT_STATE_FILE,
SUB_AGENT_STATUS_POLL_INTERVAL,
SUB_AGENT_TASKS_BASE_DIR,
)
from utils.logger import setup_logger
import logging
# 静音子智能体日志(交由前端提示/brief_log处理
logger = setup_logger(__name__)
logger.setLevel(logging.CRITICAL)
logger.disabled = True
logger.propagate = False
for h in list(logger.handlers):
logger.removeHandler(h)
TERMINAL_STATUSES = {"completed", "failed", "timeout"}
class SubAgentManager:
"""负责主智能体与子智能体的任务调度(子进程模式)。"""
def __init__(self, project_path: str, data_dir: str):
self.project_path = Path(project_path).resolve()
self.data_dir = Path(data_dir).resolve()
self.base_dir = Path(SUB_AGENT_TASKS_BASE_DIR).resolve()
self.state_file = Path(SUB_AGENT_STATE_FILE).resolve()
# easyagent批处理入口
self.easyagent_batch = Path(__file__).parent.parent / "easyagent" / "src" / "batch" / "index.js"
self.base_dir.mkdir(parents=True, exist_ok=True)
self.state_file.parent.mkdir(parents=True, exist_ok=True)
self.tasks: Dict[str, Dict] = {}
self.conversation_agents: Dict[str, List[int]] = {}
self.processes: Dict[str, subprocess.Popen] = {} # task_id -> Popen对象
self._load_state()
# ------------------------------------------------------------------
# 公共方法
# ------------------------------------------------------------------
def create_sub_agent(
self,
*,
agent_id: int,
summary: str,
task: str,
deliverables_dir: str,
timeout_seconds: Optional[int] = None,
conversation_id: Optional[str] = None,
run_in_background: bool = False,
model_key: Optional[str] = None,
) -> Dict:
"""创建子智能体任务并启动子进程。"""
validation_error = self._validate_create_params(agent_id, summary, task, deliverables_dir)
if validation_error:
return {"success": False, "error": validation_error}
if not conversation_id:
return {"success": False, "error": "缺少对话ID无法创建子智能体"}
if not self._ensure_agent_slot_available(conversation_id, agent_id):
return {
"success": False,
"error": f"该对话已使用过编号 {agent_id},请更换新的子智能体代号。"
}
if self._active_task_count(conversation_id) >= SUB_AGENT_MAX_ACTIVE:
return {
"success": False,
"error": f"该对话已存在 {SUB_AGENT_MAX_ACTIVE} 个运行中的子智能体,请稍后再试。",
}
task_id = self._generate_task_id(agent_id)
task_root = self.base_dir / task_id
task_root.mkdir(parents=True, exist_ok=True)
# 解析deliverables_dir相对于project_path
try:
deliverables_path = self._resolve_deliverables_dir(deliverables_dir)
except ValueError as exc:
return {"success": False, "error": str(exc)}
# 创建.subagent目录用于存储对话历史
subagent_dir = deliverables_path / ".subagent"
subagent_dir.mkdir(parents=True, exist_ok=True)
# 准备文件路径
task_file = task_root / "task.txt"
system_prompt_file = task_root / "system_prompt.txt"
output_file = task_root / "output.json"
stats_file = task_root / "stats.json"
# 构建用户消息
user_message = self._build_user_message(agent_id, summary, task, deliverables_path, timeout_seconds)
task_file.write_text(user_message, encoding="utf-8")
# 构建系统提示
system_prompt = self._build_system_prompt()
system_prompt_file.write_text(system_prompt, encoding="utf-8")
# 启动子进程
timeout_seconds = timeout_seconds or SUB_AGENT_DEFAULT_TIMEOUT
cmd = [
"node",
str(self.easyagent_batch),
"--workspace", str(self.project_path),
"--task-file", str(task_file),
"--system-prompt-file", str(system_prompt_file),
"--output-file", str(output_file),
"--stats-file", str(stats_file),
"--agent-id", str(agent_id),
"--timeout", str(timeout_seconds),
]
if model_key:
cmd.extend(["--model-key", model_key])
try:
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(self.project_path),
)
except Exception as exc:
return {"success": False, "error": f"启动子智能体失败: {exc}"}
# 记录任务
task_record = {
"task_id": task_id,
"agent_id": agent_id,
"summary": summary,
"task": task,
"status": "running",
"deliverables_dir": str(deliverables_path),
"subagent_dir": str(subagent_dir),
"timeout_seconds": timeout_seconds,
"created_at": time.time(),
"conversation_id": conversation_id,
"run_in_background": run_in_background,
"task_root": str(task_root),
"output_file": str(output_file),
"stats_file": str(stats_file),
"pid": process.pid,
}
self.tasks[task_id] = task_record
self.processes[task_id] = process
self._mark_agent_id_used(conversation_id, agent_id)
self._save_state()
message = f"子智能体{agent_id} 已创建任务ID: {task_id}PID: {process.pid}"
print(f"{OUTPUT_FORMATS['info']} {message}")
return {
"success": True,
"task_id": task_id,
"agent_id": agent_id,
"status": "running",
"message": message,
"deliverables_dir": str(deliverables_path),
"run_in_background": run_in_background,
}
def wait_for_completion(
self,
*,
task_id: Optional[str] = None,
agent_id: Optional[int] = None,
timeout_seconds: Optional[int] = None,
) -> Dict:
"""阻塞等待子智能体完成或超时。"""
task = self._select_task(task_id, agent_id)
if not task:
return {"success": False, "error": "未找到对应的子智能体任务"}
if task.get("status") in TERMINAL_STATUSES or task.get("status") == "terminated":
if task.get("final_result"):
return task["final_result"]
return {"success": False, "status": task.get("status"), "message": "子智能体已结束。"}
timeout_seconds = timeout_seconds or task.get("timeout_seconds") or SUB_AGENT_DEFAULT_TIMEOUT
deadline = time.time() + timeout_seconds
while time.time() < deadline:
# 检查进程状态
status_result = self._check_task_status(task)
if status_result["status"] in TERMINAL_STATUSES:
return status_result
time.sleep(SUB_AGENT_STATUS_POLL_INTERVAL)
# 超时
return self._handle_timeout(task)
def terminate_sub_agent(
self,
*,
task_id: Optional[str] = None,
agent_id: Optional[int] = None,
) -> Dict:
"""强制关闭指定子智能体。"""
task = self._select_task(task_id, agent_id)
if not task:
return {"success": False, "error": "未找到对应的子智能体任务"}
task_id = task["task_id"]
process = self.processes.get(task_id)
if process and process.poll() is None:
# 进程还在运行,终止它
try:
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
except Exception as exc:
return {"success": False, "error": f"终止进程失败: {exc}"}
task["status"] = "terminated"
task["final_result"] = {
"success": False,
"status": "terminated",
"task_id": task_id,
"agent_id": task.get("agent_id"),
"message": "子智能体已被强制关闭。",
}
self._save_state()
return {
"success": True,
"task_id": task_id,
"message": "子智能体已被强制关闭。",
"system_message": f"🛑 子智能体{task.get('agent_id')} 已被手动关闭。",
}
# ------------------------------------------------------------------
# 内部工具方法
# ------------------------------------------------------------------
def _check_task_status(self, task: Dict) -> Dict:
"""检查任务状态,如果完成则解析输出。"""
task_id = task["task_id"]
process = self.processes.get(task_id)
# 检查进程是否结束
if process:
returncode = process.poll()
if returncode is None:
# 进程还在运行
return {"status": "running", "task_id": task_id}
# 进程已结束,读取输出文件
output_file = Path(task.get("output_file", ""))
if not output_file.exists():
# 输出文件不存在,可能是异常退出
task["status"] = "failed"
task["updated_at"] = time.time()
result = {
"success": False,
"status": "failed",
"task_id": task_id,
"agent_id": task.get("agent_id"),
"message": "子智能体异常退出,未生成输出文件。",
"system_message": f"❌ 子智能体{task.get('agent_id')} 异常退出。",
}
task["final_result"] = result
return result
# 解析输出
try:
output = json.loads(output_file.read_text(encoding="utf-8"))
except Exception as exc:
task["status"] = "failed"
task["updated_at"] = time.time()
result = {
"success": False,
"status": "failed",
"task_id": task_id,
"agent_id": task.get("agent_id"),
"message": f"输出文件解析失败: {exc}",
"system_message": f"❌ 子智能体{task.get('agent_id')} 输出解析失败。",
}
task["final_result"] = result
return result
# 根据输出更新任务状态
success = output.get("success", False)
summary = output.get("summary", "")
stats = output.get("stats", {})
if output.get("timeout"):
status = "timeout"
elif output.get("max_turns_exceeded"):
status = "failed"
summary = f"任务执行超过最大轮次限制。{summary}"
elif success:
status = "completed"
else:
status = "failed"
task["status"] = status
task["updated_at"] = time.time()
# 构建系统消息
agent_id = task.get("agent_id")
task_summary = task.get("summary")
deliverables_dir = task.get("deliverables_dir")
if status == "completed":
system_message = f"✅ 子智能体{agent_id} 任务摘要:{task_summary} 已完成。\n\n{summary}\n\n交付目录:{deliverables_dir}"
elif status == "timeout":
system_message = f"⏱️ 子智能体{agent_id} 任务摘要:{task_summary} 超时未完成。\n\n{summary}"
else:
system_message = f"❌ 子智能体{agent_id} 任务摘要:{task_summary} 执行失败。\n\n{summary}"
result = {
"success": success,
"status": status,
"task_id": task_id,
"agent_id": agent_id,
"message": summary,
"deliverables_dir": deliverables_dir,
"stats": stats,
"system_message": system_message,
}
task["final_result"] = result
return result
def _handle_timeout(self, task: Dict) -> Dict:
"""处理任务超时。"""
task_id = task["task_id"]
process = self.processes.get(task_id)
# 终止进程
if process and process.poll() is None:
try:
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
except Exception:
pass
task["status"] = "timeout"
task["updated_at"] = time.time()
result = {
"success": False,
"status": "timeout",
"task_id": task_id,
"agent_id": task.get("agent_id"),
"message": "等待超时,子智能体已被终止。",
"system_message": f"⏱️ 子智能体{task.get('agent_id')} 任务摘要:{task.get('summary')} 超时未完成。",
}
task["final_result"] = result
self._save_state()
return result
def _build_user_message(
self,
agent_id: int,
summary: str,
task: str,
deliverables_path: Path,
timeout_seconds: Optional[int],
) -> str:
"""构建发送给子智能体的用户消息。"""
timeout_seconds = timeout_seconds or SUB_AGENT_DEFAULT_TIMEOUT
return f"""你是子智能体 #{agent_id},负责完成以下任务:
**任务摘要**{summary}
**任务详情**
{task}
**交付目录**{deliverables_path}
请将所有生成的文件保存到此目录。对话历史会自动保存到 {deliverables_path}/.subagent/ 目录。
**超时时间**{timeout_seconds}
完成任务后,请调用 finish_task 工具提交完成报告。"""
def _build_system_prompt(self) -> str:
"""构建子智能体的系统提示。"""
import platform
from datetime import datetime
system_info = f"{platform.system()} {platform.release()}"
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return f"""你是一个专注的子智能体,负责独立完成分配的任务。
# 身份定位
你是主智能体创建的子智能体,拥有完整的工具能力(读写文件、执行命令、搜索网页等)。你的职责是专注完成分配的单一任务,不要偏离任务目标。
# 工作流程
1. **理解任务**:仔细阅读任务描述,明确目标和要求
2. **制定计划**:规划完成任务的步骤
3. **执行任务**:使用工具完成各个步骤
4. **生成交付**:将所有结果文件放到指定的交付目录
5. **提交报告**:使用 finish_task 工具提交完成报告并退出
# 工作原则
## 专注性
- 只完成分配的任务,不要做额外的工作
- 不要尝试与用户对话或询问问题
- 遇到问题时,在能力范围内解决或在报告中说明
## 独立性
- 你与主智能体共享工作区,可以访问所有文件
- 你的工作范围应该与其他子智能体不重叠
- 不要修改任务描述之外的文件
## 效率性
- 直接开始工作,不要过度解释
- 合理使用工具,避免重复操作
- 注意超时限制,在时间内完成核心工作
## 完整性
- 确保交付目录中的文件完整可用
- 生成的文档要清晰、格式正确
- 代码要包含必要的注释和说明
# 交付要求
所有结果文件必须放在指定的交付目录中,包括:
- 主要成果文件(文档、代码、报告等)
- 支持文件(数据、配置、示例等)
- 不要在交付目录外创建文件
# 完成任务
任务完成后,必须调用 finish_task 工具:
- success: 是否成功完成
- summary: 完成摘要(说明做了什么、生成了什么)
调用 finish_task 后,你会立即退出,无法继续工作。
# 工具使用
你拥有以下工具能力:
- read_file: 读取文件内容
- write_file / edit_file: 创建或修改文件
- search_workspace: 搜索文件和代码
- run_command: 执行终端命令
- web_search / extract_webpage: 搜索和提取网页内容
- finish_task: 完成任务并退出(必须调用)
# 注意事项
1. **不要无限循环**:如果任务无法完成,说明原因并提交报告
2. **不要超出范围**:只操作任务描述中指定的文件/目录
3. **不要等待输入**:你是自主运行的,不会收到用户的进一步指令
4. **注意时间限制**:超时会被强制终止,优先完成核心工作
# 当前环境
- 工作区路径: {self.project_path}
- 系统: {system_info}
- 当前时间: {current_time}
现在开始执行任务。"""
def _resolve_deliverables_dir(self, relative_dir: str) -> Path:
"""解析交付目录相对于project_path"""
relative_dir = relative_dir.strip() if relative_dir else ""
if not relative_dir:
raise ValueError("交付目录不能为空,必须指定")
deliverables_path = (self.project_path / relative_dir).resolve()
if not str(deliverables_path).startswith(str(self.project_path)):
raise ValueError("交付目录必须位于项目目录内")
deliverables_path.mkdir(parents=True, exist_ok=True)
return deliverables_path
def _load_state(self):
if self.state_file.exists():
try:
data = json.loads(self.state_file.read_text(encoding="utf-8"))
self.tasks = data.get("tasks", {})
self.conversation_agents = data.get("conversation_agents", {})
except json.JSONDecodeError:
logger.warning("子智能体状态文件损坏,已忽略。")
self.tasks = {}
self.conversation_agents = {}
else:
self.tasks = {}
self.conversation_agents = {}
if self.tasks:
migrated = False
for task in self.tasks.values():
if task.get("parent_conversation_id"):
continue
candidate = task.get("conversation_id") or (task.get("service_payload") or {}).get("parent_conversation_id")
if candidate:
task["parent_conversation_id"] = candidate
migrated = True
if migrated:
self._save_state()
def _save_state(self):
payload = {
"tasks": self.tasks,
"conversation_agents": self.conversation_agents
}
self.state_file.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def _generate_task_id(self, agent_id: int) -> str:
suffix = uuid.uuid4().hex[:6]
return f"sub_{agent_id}_{int(time.time())}_{suffix}"
def _active_task_count(self, conversation_id: Optional[str] = None) -> int:
active = [
t for t in self.tasks.values()
if t.get("status") in {"pending", "running"}
]
if conversation_id:
active = [
t for t in active
if t.get("conversation_id") == conversation_id
]
return len(active)
def _copy_reference_files(self, references: List[str], dest_dir: Path) -> Tuple[List[str], List[str]]:
copied = []
errors = []
for rel_path in references:
rel_path = rel_path.strip()
if not rel_path:
continue
try:
source = self._resolve_project_file(rel_path)
except ValueError as exc:
errors.append(str(exc))
continue
if not source.exists():
errors.append(f"参考文件不存在: {rel_path}")
continue
target_path = dest_dir / rel_path
target_path.parent.mkdir(parents=True, exist_ok=True)
try:
shutil.copy2(source, target_path)
copied.append(rel_path)
except Exception as exc:
errors.append(f"复制 {rel_path} 失败: {exc}")
return copied, errors
def _ensure_project_subdir(self, relative_dir: str) -> Path:
relative_dir = relative_dir.strip() if relative_dir else ""
if not relative_dir:
relative_dir = "sub_agent_results"
target = (self.project_path / relative_dir).resolve()
if not str(target).startswith(str(self.project_path)):
raise ValueError("指定文件夹必须位于项目目录内")
target.mkdir(parents=True, exist_ok=True)
return target
def _resolve_project_file(self, relative_path: str) -> Path:
relative_path = relative_path.strip()
candidate = (self.project_path / relative_path).resolve()
if not str(candidate).startswith(str(self.project_path)):
raise ValueError(f"非法的参考文件路径: {relative_path}")
return candidate
def _select_task(self, task_id: Optional[str], agent_id: Optional[int]) -> Optional[Dict]:
if task_id:
return self.tasks.get(task_id)
if agent_id is None:
return None
# 返回最新的匹配任务
candidates = [
task for task in self.tasks.values()
if task.get("agent_id") == agent_id and task.get("status") in {"pending", "running"}
]
if candidates:
candidates.sort(key=lambda item: item.get("created_at", 0), reverse=True)
return candidates[0]
return None
def lookup_task(self, *, task_id: Optional[str] = None, agent_id: Optional[int] = None) -> Optional[Dict]:
"""只读查询任务信息,供 wait_sub_agent 自动调整超时时间。"""
task = self._select_task(task_id, agent_id)
if not task:
return None
return {
"task_id": task.get("task_id"),
"agent_id": task.get("agent_id"),
"status": task.get("status"),
"timeout_seconds": task.get("timeout_seconds"),
"conversation_id": task.get("conversation_id"),
}
def poll_updates(self) -> List[Dict]:
"""检查运行中的子智能体任务,返回新完成的结果。"""
updates: List[Dict] = []
pending_tasks = [
task for task in self.tasks.values()
if task.get("status") not in TERMINAL_STATUSES.union({"terminated"})
]
logger.debug(f"[SubAgentManager] 待检查任务: {len(pending_tasks)}")
if not pending_tasks:
return updates
state_changed = False
for task in pending_tasks:
result = self._check_task_status(task)
if result["status"] in TERMINAL_STATUSES:
updates.append(result)
state_changed = True
if state_changed:
self._save_state()
return updates
def get_sub_agent_status(
self,
*,
agent_ids: Optional[List[int]] = None,
) -> Dict:
"""获取指定子智能体的详细状态。"""
if not agent_ids:
return {"success": False, "error": "必须指定至少一个agent_id"}
results = []
for agent_id in agent_ids:
task = self._select_task(None, agent_id)
if not task:
results.append({
"agent_id": agent_id,
"found": False,
"error": "未找到对应的子智能体任务"
})
continue
# 如果任务还在运行,检查最新状态
if task.get("status") not in TERMINAL_STATUSES.union({"terminated"}):
self._check_task_status(task)
# 读取统计信息
stats = {}
stats_file = Path(task.get("stats_file", ""))
if stats_file.exists():
try:
stats = json.loads(stats_file.read_text(encoding="utf-8"))
except Exception:
pass
results.append({
"agent_id": agent_id,
"found": True,
"task_id": task["task_id"],
"status": task["status"],
"summary": task.get("summary"),
"created_at": task.get("created_at"),
"updated_at": task.get("updated_at"),
"deliverables_dir": task.get("deliverables_dir"),
"stats": stats,
"final_result": task.get("final_result"),
})
return {
"success": True,
"results": results,
}
def _call_service(self, method: str, path: str, payload: Optional[Dict] = None, timeout: Optional[int] = None) -> Dict:
url = f"{SUB_AGENT_SERVICE_BASE_URL.rstrip('/')}{path}"
try:
with httpx.Client(timeout=timeout or 10) as client:
if method.upper() == "POST":
response = client.post(url, json=payload or {})
else:
response = client.get(url)
response.raise_for_status()
return response.json()
except httpx.RequestError as exc:
logger.error(f"子智能体服务请求失败: {exc}")
return {"success": False, "error": f"无法连接子智能体服务: {exc}"}
except httpx.HTTPStatusError as exc:
logger.error(f"子智能体服务返回错误: {exc}")
try:
return exc.response.json()
except Exception:
return {"success": False, "error": f"服务端错误: {exc.response.text}"}
except json.JSONDecodeError:
return {"success": False, "error": "子智能体服务返回格式错误"}
def _finalize_task(self, task: Dict, service_payload: Dict, status: str) -> Dict:
existing_result = task.get("final_result")
if existing_result and task.get("status") in TERMINAL_STATUSES.union({"terminated"}):
return existing_result
task["status"] = status
task["updated_at"] = time.time()
message = service_payload.get("message") or service_payload.get("error") or ""
deliverables_dir = Path(service_payload.get("deliverables_dir") or task.get("deliverables_dir", ""))
logger.debug(f"[SubAgentManager] finalize task={task['task_id']} status={status}")
if status == "terminated":
system_message = service_payload.get("system_message") or "🛑 子智能体已被手动关闭。"
result = {
"success": False,
"task_id": task["task_id"],
"agent_id": task["agent_id"],
"status": "terminated",
"message": message or "子智能体已被手动关闭。",
"details": service_payload,
"sub_conversation_id": task.get("sub_conversation_id"),
"system_message": system_message,
}
task["final_result"] = result
return result
if status != "completed":
result = {
"success": False,
"task_id": task["task_id"],
"agent_id": task["agent_id"],
"status": status,
"message": message or f"子智能体状态:{status}",
"details": service_payload,
"sub_conversation_id": task.get("sub_conversation_id"),
"system_message": self._build_system_message(task, status, None, message),
}
task["final_result"] = result
return result
if not deliverables_dir.exists():
result = {
"success": False,
"task_id": task["task_id"],
"agent_id": task["agent_id"],
"status": "failed",
"error": f"未找到交付目录: {deliverables_dir}",
"system_message": self._build_system_message(task, "failed", None, f"未找到交付目录: {deliverables_dir}"),
}
task["status"] = "failed"
task["final_result"] = result
return result
result_md = deliverables_dir / "result.md"
if not result_md.exists():
result = {
"success": False,
"task_id": task["task_id"],
"agent_id": task["agent_id"],
"status": "failed",
"error": "交付目录缺少 result.md无法完成任务。",
"system_message": self._build_system_message(task, "failed", None, "交付目录缺少 result.md"),
}
task["status"] = "failed"
task["final_result"] = result
return result
copied_path = self._copy_deliverables_to_project(task, deliverables_dir)
task["copied_path"] = str(copied_path)
system_message = self._build_system_message(task, status, copied_path, message)
result = {
"success": True,
"task_id": task["task_id"],
"agent_id": task["agent_id"],
"status": status,
"message": message or "子智能体已完成任务。",
"deliverables_path": str(deliverables_dir),
"copied_path": str(copied_path),
"sub_conversation_id": task.get("sub_conversation_id"),
"system_message": system_message,
"details": service_payload,
}
task["final_result"] = result
return result
def _copy_deliverables_to_project(self, task: Dict, source_dir: Path) -> Path:
"""将交付文件复制到项目目录下的指定文件夹。"""
target_dir = Path(task["target_project_dir"])
target_dir.mkdir(parents=True, exist_ok=True)
dest_dir = target_dir / f"{task['task_id']}_deliverables"
if dest_dir.exists():
shutil.rmtree(dest_dir)
shutil.copytree(source_dir, dest_dir)
return dest_dir
def _cleanup_task_folder(self, task_root: Path):
if task_root.exists():
shutil.rmtree(task_root, ignore_errors=True)
def _ensure_agent_slot_available(self, conversation_id: str, agent_id: int) -> bool:
used = self.conversation_agents.setdefault(conversation_id, [])
return agent_id not in used
def _mark_agent_id_used(self, conversation_id: str, agent_id: int):
used = self.conversation_agents.setdefault(conversation_id, [])
if agent_id not in used:
used.append(agent_id)
def _validate_create_params(self, agent_id: Optional[int], summary: str, task: str, target_dir: str) -> Optional[str]:
if agent_id is None:
return "子智能体代号不能为空"
try:
agent_id = int(agent_id)
except ValueError:
return "子智能体代号必须是整数"
if agent_id <= 0:
return "子智能体代号必须为正整数"
if not summary or not summary.strip():
return "任务摘要不能为空"
if not task or not task.strip():
return "任务详情不能为空"
if target_dir is None:
return "指定文件夹不能为空"
return None
def _build_system_message(
self,
task: Dict,
status: str,
copied_path: Optional[Path],
extra_message: Optional[str] = None,
) -> str:
prefix = f"子智能体{task['agent_id']} 任务摘要:{task['summary']}"
extra = (extra_message or "").strip()
if status == "completed" and copied_path:
msg = f"{prefix} 已完成,成果已复制到 {copied_path}"
if extra:
msg += f" ({extra})"
return msg
if status == "timeout":
return f"{prefix} 超时未完成。" + (f" {extra}" if extra else "")
if status == "failed":
return f"{prefix} 执行失败:" + (extra if extra else "请检查交付目录或任务状态。")
return f"{prefix} 状态:{status}" + (extra if extra else "")
def get_overview(self, conversation_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""返回子智能体任务概览,用于前端展示。"""
overview: List[Dict[str, Any]] = []
for task_id, task in self.tasks.items():
if conversation_id and task.get("conversation_id") != conversation_id:
continue
snapshot = {
"task_id": task_id,
"agent_id": task.get("agent_id"),
"summary": task.get("summary"),
"status": task.get("status"),
"created_at": task.get("created_at"),
"updated_at": task.get("updated_at"),
"target_dir": task.get("target_project_dir"),
"last_tool": task.get("last_tool"),
"deliverables_dir": task.get("deliverables_dir"),
"copied_path": task.get("copied_path"),
"conversation_id": task.get("conversation_id"),
"sub_conversation_id": task.get("sub_conversation_id"),
}
# 运行中的任务检查进程状态
if snapshot["status"] not in TERMINAL_STATUSES and snapshot["status"] != "terminated":
# 检查进程是否还在运行
process = self.processes.get(task_id)
if process:
poll_result = process.poll()
if poll_result is not None:
# 进程已结束,检查输出
status_result = self._check_task_status(task)
snapshot["status"] = status_result.get("status", "failed")
if status_result.get("status") in TERMINAL_STATUSES:
task["status"] = status_result["status"]
task["final_result"] = status_result
if snapshot["status"] in TERMINAL_STATUSES or snapshot["status"] == "terminated":
# 已结束的任务带上最终结果/系统消息,方便前端展示
final_result = task.get("final_result") or {}
snapshot["final_message"] = final_result.get("system_message") or final_result.get("message")
snapshot["success"] = final_result.get("success")
overview.append(snapshot)
overview.sort(key=lambda item: item.get("created_at") or 0, reverse=True)
return overview