diff --git a/core/main_terminal_parts/tools_definition.py b/core/main_terminal_parts/tools_definition.py index ddb504a..b5de149 100644 --- a/core/main_terminal_parts/tools_definition.py +++ b/core/main_terminal_parts/tools_definition.py @@ -700,7 +700,7 @@ class MainTerminalToolsDefinitionMixin: }, "deliverables_dir": { "type": "string", - "description": "交付文件夹的相对路径(相对于项目根目录)。子智能体会将所有结果文件放在此目录。\n\n留空则使用默认路径:sub_agent_results/agent_{agent_id}\n\n示例:'docs/api'、'reports/performance'、'tests/generated'" + "description": "交付文件夹的相对路径(相对于项目根目录)。子智能体会将所有结果文件放在此目录。\n\n要求:必须是不存在的新目录;若目录不存在会自动创建;若目录已存在(无论是否为空)将报错。\n\n留空则使用默认路径:sub_agent_results/agent_{agent_id}\n\n示例:'docs/api'、'reports/performance'、'tests/generated'" }, "run_in_background": { "type": "boolean", @@ -709,9 +709,14 @@ class MainTerminalToolsDefinitionMixin: "timeout_seconds": { "type": "integer", "description": "超时时间(秒),范围 60-3600。超时后子智能体会被强制终止,已生成的部分结果会保留。默认 600 秒(10分钟)。" + }, + "thinking_mode": { + "type": "string", + "enum": ["fast", "thinking"], + "description": "子智能体思考模式,根据任务复杂度选择:\n\nfast(快速模式)- 适合简单明确的任务:\n- 网络信息搜集和整理\n- 批量文件读取和简单处理\n- 执行已知的命令序列\n- 生成简单的文档或报告\n- 数据格式转换\n\nthinking(思考模式)- 适合复杂任务:\n- 代码架构分析和重构设计\n- 复杂算法实现和优化\n- 多步骤问题诊断和调试\n- 技术方案选型和对比\n- 需要深度推理的代码审查\n\n不填则使用默认模式。" } }), - "required": ["agent_id", "summary", "task", "deliverables_dir"] + "required": ["agent_id", "summary", "task", "deliverables_dir", "thinking_mode"] } } }, diff --git a/core/main_terminal_parts/tools_execution.py b/core/main_terminal_parts/tools_execution.py index 59f3d65..bf74dde 100644 --- a/core/main_terminal_parts/tools_execution.py +++ b/core/main_terminal_parts/tools_execution.py @@ -628,6 +628,7 @@ class MainTerminalToolsExecutionMixin: deliverables_dir=arguments.get("deliverables_dir", ""), run_in_background=arguments.get("run_in_background", False), timeout_seconds=arguments.get("timeout_seconds"), + thinking_mode=arguments.get("thinking_mode"), conversation_id=self.context_manager.current_conversation_id ) diff --git a/easyagent/models.json b/easyagent/models.json index 1afd5c7..a26599c 100644 --- a/easyagent/models.json +++ b/easyagent/models.json @@ -3,9 +3,9 @@ "default_model": "kimi-k2.5", "models": [ { - "url": "https://api.moonshot.cn/v1", + "url": "https://coding.dashscope.aliyuncs.com/v1", "name": "kimi-k2.5", - "apikey": "sk-xW0xjfQM6Mp9ZCWMLlnHiRJcpEOIZPTkXcN0dQ15xpZSuw2y", + "apikey": "sk-sp-0967ae3d7be84839b4c532d1ce72d6f6", "modes": "快速,思考", "multimodal": "图片,视频", "max_output": 32000, diff --git a/easyagent/src/batch/index.js b/easyagent/src/batch/index.js index a14a603..49ad4c2 100644 --- a/easyagent/src/batch/index.js +++ b/easyagent/src/batch/index.js @@ -24,6 +24,7 @@ function parseArgs() { statsFile: null, agentId: null, modelKey: null, + thinkingMode: null, timeout: 600, }; @@ -43,6 +44,8 @@ function parseArgs() { config.agentId = args[++i]; } else if (arg === '--model-key' && i + 1 < args.length) { config.modelKey = args[++i]; + } else if (arg === '--thinking-mode' && i + 1 < args.length) { + config.thinkingMode = String(args[++i] || '').trim().toLowerCase(); } else if (arg === '--timeout' && i + 1 < args.length) { config.timeout = parseInt(args[++i], 10); } @@ -155,9 +158,11 @@ async function main() { runtime_start: Date.now(), runtime_seconds: 0, files_read: 0, + edit_files: 0, searches: 0, web_pages: 0, commands: 0, + api_calls: 0, token_usage: { prompt: 0, completion: 0, total: 0 }, }; @@ -169,6 +174,8 @@ async function main() { try { while (true) { turnCount++; + stats.api_calls += 1; + stats.turn_count = turnCount; // 检查超时 const elapsed = Date.now() - startTime; @@ -210,6 +217,7 @@ async function main() { // 调用 API let assistantMessage = { role: 'assistant', content: '', tool_calls: [] }; + let reasoningBuffer = ''; let currentToolCall = null; let usage = null; @@ -219,11 +227,12 @@ async function main() { modelKey, messages, tools, - thinkingMode: false, + thinkingMode: config.thinkingMode === 'thinking', currentContextTokens: 0, abortSignal: null, })) { - const delta = chunk.choices?.[0]?.delta; + const choice = chunk.choices?.[0]; + const delta = choice?.delta; if (!delta) continue; // 处理内容 @@ -231,6 +240,31 @@ async function main() { assistantMessage.content += delta.content; } + // 处理 reasoning_content(兼容 reasoning_details) + if (delta.reasoning_content || delta.reasoning_details || choice?.reasoning_details) { + let rc = ''; + if (delta.reasoning_content) { + rc = delta.reasoning_content; + } else if (delta.reasoning_details) { + if (Array.isArray(delta.reasoning_details)) { + rc = delta.reasoning_details.map((d) => d.text || '').join(''); + } else if (typeof delta.reasoning_details === 'string') { + rc = delta.reasoning_details; + } else if (delta.reasoning_details && typeof delta.reasoning_details.text === 'string') { + rc = delta.reasoning_details.text; + } + } else if (choice?.reasoning_details) { + if (Array.isArray(choice.reasoning_details)) { + rc = choice.reasoning_details.map((d) => d.text || '').join(''); + } else if (typeof choice.reasoning_details === 'string') { + rc = choice.reasoning_details; + } else if (choice.reasoning_details && typeof choice.reasoning_details.text === 'string') { + rc = choice.reasoning_details.text; + } + } + if (rc) reasoningBuffer += rc; + } + // 处理工具调用 if (delta.tool_calls) { for (const tc of delta.tool_calls) { @@ -276,8 +310,16 @@ async function main() { applyUsage(stats.token_usage, usage); } - // 添加助手消息到历史 - messages.push(assistantMessage); + // 添加助手消息到历史(reasoning_content 放在 content 前) + const finalAssistantMessage = reasoningBuffer + ? { + role: 'assistant', + reasoning_content: reasoningBuffer, + content: assistantMessage.content, + tool_calls: assistantMessage.tool_calls, + } + : assistantMessage; + messages.push(finalAssistantMessage); // 如果没有工具调用,检查是否忘记调用 finish_task if (!assistantMessage.tool_calls || assistantMessage.tool_calls.length === 0) { @@ -292,6 +334,7 @@ async function main() { // 执行工具调用 for (const toolCall of assistantMessage.tool_calls) { const toolName = toolCall.function.name; + stats.tool_calls = (stats.tool_calls || 0) + 1; // 检查是否是 finish_task if (toolName === 'finish_task') { @@ -372,6 +415,7 @@ async function main() { // 更新统计 if (toolName === 'read_file') stats.files_read++; + else if (toolName === 'edit_file') stats.edit_files++; else if (toolName === 'search_workspace') stats.searches++; else if (toolName === 'web_search' || toolName === 'extract_webpage') stats.web_pages++; else if (toolName === 'run_command') stats.commands++; diff --git a/easyagent/src/model/model_profiles.js b/easyagent/src/model/model_profiles.js index f37c452..adc58a4 100644 --- a/easyagent/src/model/model_profiles.js +++ b/easyagent/src/model/model_profiles.js @@ -17,8 +17,8 @@ function buildProfile(model) { supports_thinking: supportsThinking, thinking_params: supportsThinking ? { - fast: { thinking: { type: 'disabled' } }, - thinking: { thinking: { type: 'enabled' } }, + fast: { thinking: { type: 'disabled' }, enable_thinking: false }, + thinking: { thinking: { type: 'enabled' }, enable_thinking: true }, } : { fast: {}, thinking: {} }, }; diff --git a/modules/sub_agent_manager.py b/modules/sub_agent_manager.py index 13546b9..c31c762 100644 --- a/modules/sub_agent_manager.py +++ b/modules/sub_agent_manager.py @@ -62,12 +62,18 @@ class SubAgentManager: conversation_id: Optional[str] = None, run_in_background: bool = False, model_key: Optional[str] = None, + thinking_mode: Optional[str] = None, ) -> Dict: """创建子智能体任务并启动子进程。""" validation_error = self._validate_create_params(agent_id, summary, task, deliverables_dir) if validation_error: return {"success": False, "error": validation_error} + if not thinking_mode: + return {"success": False, "error": "缺少 thinking_mode 参数,必须指定 fast 或 thinking"} + if thinking_mode not in {"fast", "thinking"}: + return {"success": False, "error": "thinking_mode 仅支持 fast 或 thinking"} + if not conversation_id: return {"success": False, "error": "缺少对话ID,无法创建子智能体"} @@ -126,6 +132,8 @@ class SubAgentManager: ] if model_key: cmd.extend(["--model-key", model_key]) + if thinking_mode: + cmd.extend(["--thinking-mode", thinking_mode]) try: process = subprocess.Popen( @@ -147,6 +155,7 @@ class SubAgentManager: "deliverables_dir": str(deliverables_path), "subagent_dir": str(subagent_dir), "timeout_seconds": timeout_seconds, + "thinking_mode": thinking_mode, "created_at": time.time(), "conversation_id": conversation_id, "run_in_background": run_in_background, @@ -231,6 +240,8 @@ class SubAgentManager: return {"success": False, "error": f"终止进程失败: {exc}"} task["status"] = "terminated" + task["updated_at"] = time.time() + task["notified"] = True task["final_result"] = { "success": False, "status": "terminated", @@ -300,6 +311,7 @@ class SubAgentManager: success = output.get("success", False) summary = output.get("summary", "") stats = output.get("stats", {}) + stats_summary = self._build_stats_summary(stats) if output.get("timeout"): status = "timeout" @@ -320,11 +332,24 @@ class SubAgentManager: deliverables_dir = task.get("deliverables_dir") if status == "completed": - system_message = f"✅ 子智能体{agent_id} 任务摘要:{task_summary} 已完成。\n\n{summary}\n\n交付目录:{deliverables_dir}" + system_message = self._compose_sub_agent_message( + prefix=f"✅ 子智能体{agent_id} 任务摘要:{task_summary} 已完成。", + stats_summary=stats_summary, + summary=summary, + deliverables_dir=deliverables_dir, + ) elif status == "timeout": - system_message = f"⏱️ 子智能体{agent_id} 任务摘要:{task_summary} 超时未完成。\n\n{summary}" + system_message = self._compose_sub_agent_message( + prefix=f"⏱️ 子智能体{agent_id} 任务摘要:{task_summary} 超时未完成。", + stats_summary=stats_summary, + summary=summary, + ) else: - system_message = f"❌ 子智能体{agent_id} 任务摘要:{task_summary} 执行失败。\n\n{summary}" + system_message = self._compose_sub_agent_message( + prefix=f"❌ 子智能体{agent_id} 任务摘要:{task_summary} 执行失败。", + stats_summary=stats_summary, + summary=summary, + ) result = { "success": success, @@ -334,6 +359,7 @@ class SubAgentManager: "message": summary, "deliverables_dir": deliverables_dir, "stats": stats, + "stats_summary": stats_summary, "system_message": system_message, } task["final_result"] = result @@ -359,13 +385,29 @@ class SubAgentManager: task["status"] = "timeout" task["updated_at"] = time.time() + stats = {} + stats_file = Path(task.get("stats_file", "")) + if stats_file.exists(): + try: + stats = json.loads(stats_file.read_text(encoding="utf-8")) + except Exception: + stats = {} + stats_summary = self._build_stats_summary(stats) + system_message = self._compose_sub_agent_message( + prefix=f"⏱️ 子智能体{task.get('agent_id')} 任务摘要:{task.get('summary')} 超时未完成。", + stats_summary=stats_summary, + summary="等待超时,子智能体已被终止。", + ) + result = { "success": False, "status": "timeout", "task_id": task_id, "agent_id": task.get("agent_id"), "message": "等待超时,子智能体已被终止。", - "system_message": f"⏱️ 子智能体{task.get('agent_id')} 任务摘要:{task.get('summary')} 超时未完成。", + "stats": stats, + "stats_summary": stats_summary, + "system_message": system_message, } task["final_result"] = result self._save_state() @@ -466,6 +508,7 @@ class SubAgentManager: # 注意事项 +1. **结果传达**:你在运行期间产生的记录与输出不会被直接传递给主智能体。务必把所有需要传达的信息写进 `finish_task` 工具的 `summary` 字段,以及交付目录中的落盘文件里。 1. **不要无限循环**:如果任务无法完成,说明原因并提交报告 2. **不要超出范围**:只操作任务描述中指定的文件/目录 3. **不要等待输入**:你是自主运行的,不会收到用户的进一步指令 @@ -489,6 +532,8 @@ class SubAgentManager: if not str(deliverables_path).startswith(str(self.project_path)): raise ValueError("交付目录必须位于项目目录内") + if deliverables_path.exists(): + raise ValueError("交付目录必须为不存在的新目录") deliverables_path.mkdir(parents=True, exist_ok=True) return deliverables_path def _load_state(self): @@ -666,6 +711,7 @@ class SubAgentManager: stats = json.loads(stats_file.read_text(encoding="utf-8")) except Exception: pass + stats_summary = self._build_stats_summary(stats) results.append({ "agent_id": agent_id, @@ -677,6 +723,7 @@ class SubAgentManager: "updated_at": task.get("updated_at"), "deliverables_dir": task.get("deliverables_dir"), "stats": stats, + "stats_summary": stats_summary, "final_result": task.get("final_result"), }) @@ -859,9 +906,57 @@ class SubAgentManager: return f"{prefix} 状态:{status}。" + (extra if extra else "") + @staticmethod + def _coerce_stat_int(value: Any) -> int: + try: + return max(0, int(value)) + except (TypeError, ValueError): + return 0 + + def _build_stats_summary(self, stats: Optional[Dict[str, Any]]) -> str: + if not isinstance(stats, dict): + stats = {} + api_calls = self._coerce_stat_int( + stats.get("api_calls") + or stats.get("api_call_count") + or stats.get("turn_count") + ) + files_read = self._coerce_stat_int(stats.get("files_read")) + edit_files = self._coerce_stat_int(stats.get("edit_files")) + searches = self._coerce_stat_int(stats.get("searches")) + web_pages = self._coerce_stat_int(stats.get("web_pages")) + commands = self._coerce_stat_int(stats.get("commands")) + lines = [ + f"调用了{api_calls}次", + f"阅读了{files_read}次文件", + f"编辑了{edit_files}次文件", + f"搜索了{searches}次内容", + f"查看了{web_pages}个网页", + f"运行了{commands}个指令", + ] + return "\n".join(lines) + + def _compose_sub_agent_message( + self, + *, + prefix: str, + stats_summary: str, + summary: str, + deliverables_dir: Optional[str] = None, + ) -> str: + parts = [prefix] + if stats_summary: + parts.append(stats_summary) + if summary: + parts.append(summary) + if deliverables_dir: + parts.append(f"交付目录:{deliverables_dir}") + return "\n\n".join(parts) + def get_overview(self, conversation_id: Optional[str] = None) -> List[Dict[str, Any]]: """返回子智能体任务概览,用于前端展示。""" overview: List[Dict[str, Any]] = [] + state_changed = False for task_id, task in self.tasks.items(): if conversation_id and task.get("conversation_id") != conversation_id: continue @@ -894,6 +989,16 @@ class SubAgentManager: if status_result.get("status") in TERMINAL_STATUSES: task["status"] = status_result["status"] task["final_result"] = status_result + state_changed = True + else: + # 进程句柄丢失(重启后常见),尝试直接检查输出文件 + logger.debug("[SubAgentManager] 进程句柄缺失,尝试读取输出文件: %s", task_id) + status_result = self._check_task_status(task) + snapshot["status"] = status_result.get("status", snapshot["status"]) + if status_result.get("status") in TERMINAL_STATUSES: + task["status"] = status_result["status"] + task["final_result"] = status_result + state_changed = True if snapshot["status"] in TERMINAL_STATUSES or snapshot["status"] == "terminated": # 已结束的任务带上最终结果/系统消息,方便前端展示 @@ -904,4 +1009,6 @@ class SubAgentManager: overview.append(snapshot) overview.sort(key=lambda item: item.get("created_at") or 0, reverse=True) + if state_changed: + self._save_state() return overview diff --git a/server/chat_flow_task_main.py b/server/chat_flow_task_main.py index 1888378..ab25f8b 100644 --- a/server/chat_flow_task_main.py +++ b/server/chat_flow_task_main.py @@ -42,6 +42,7 @@ from modules.personalization_manager import ( from modules.upload_security import UploadSecurityError from modules.user_manager import UserWorkspace from modules.usage_tracker import QUOTA_DEFAULTS +from modules.sub_agent_manager import TERMINAL_STATUSES from core.web_terminal import WebTerminal from utils.tool_result_formatter import format_tool_result_for_context from utils.conversation_manager import ConversationManager @@ -137,6 +138,8 @@ async def poll_sub_agent_completion(*, web_terminal, workspace, conversation_id, if not manager: debug_log("[SubAgent] poll_sub_agent_completion: manager 不存在") return + if not hasattr(web_terminal, "_announced_sub_agent_tasks"): + web_terminal._announced_sub_agent_tasks = set() max_wait_time = 3600 # 最多等待1小时 start_wait = time.time() @@ -152,7 +155,6 @@ async def poll_sub_agent_completion(*, web_terminal, workspace, conversation_id, debug_log(f"[SubAgent] 发送事件失败: {event_type}, 错误: {e}") while (time.time() - start_wait) < max_wait_time: - await asyncio.sleep(5) debug_log(f"[SubAgent] 轮询检查...") # 检查停止标志 @@ -172,6 +174,18 @@ async def poll_sub_agent_completion(*, web_terminal, workspace, conversation_id, result_summary = update.get("result_summary") or update.get("message", "") deliverables_dir = update.get("deliverables_dir", "") status = update.get("status") + task_id = update.get("task_id") + task_info = manager.tasks.get(task_id) if task_id else None + task_conv_id = task_info.get("conversation_id") if isinstance(task_info, dict) else None + if task_conv_id and task_conv_id != conversation_id: + debug_log(f"[SubAgent] 跳过非当前对话任务: task={task_id} conv={task_conv_id} current={conversation_id}") + continue + if task_id and task_info is None: + debug_log(f"[SubAgent] 找不到任务详情,跳过: task={task_id}") + continue + if status == "terminated" or (isinstance(task_info, dict) and task_info.get("notified")): + debug_log(f"[SubAgent] 跳过已终止/已通知任务: task={task_id} status={status}") + continue debug_log(f"[SubAgent] 子智能体{agent_id}完成,状态: {status}") @@ -186,36 +200,62 @@ async def poll_sub_agent_completion(*, web_terminal, workspace, conversation_id, debug_log(f"[SubAgent] 消息内容: {user_message[:100]}...") try: + if task_id: + web_terminal._announced_sub_agent_tasks.add(task_id) + if isinstance(task_info, dict): + task_info["notified"] = True + task_info["updated_at"] = time.time() + try: + manager._save_state() + except Exception as exc: + debug_log(f"[SubAgent] 保存通知状态失败: {exc}") sender('user_message', { 'message': user_message, 'conversation_id': conversation_id }) - # 直接在当前事件循环中处理,避免嵌套事件循环 - entry = get_stop_flag(client_sid, username) - if not isinstance(entry, dict): - entry = {'stop': False, 'task': None, 'terminal': None} - entry['stop'] = False - task = asyncio.create_task(handle_task_with_sender( - terminal=web_terminal, - workspace=workspace, - message=user_message, - images=[], - sender=sender, - client_sid=client_sid, - username=username, - videos=[] - )) - entry['task'] = task - entry['terminal'] = web_terminal - set_stop_flag(client_sid, username, entry) - await task - debug_log(f"[SubAgent] process_message_task 调用成功") + # 注册为后台任务,确保刷新后可恢复轮询 + from .tasks import task_manager + workspace_id = getattr(workspace, "workspace_id", None) or "default" + session_data = { + "username": username, + "role": getattr(web_terminal, "user_role", "user"), + "is_api_user": getattr(web_terminal, "user_role", "") == "api", + "workspace_id": workspace_id, + "run_mode": getattr(web_terminal, "run_mode", None), + "thinking_mode": getattr(web_terminal, "thinking_mode", None), + "model_key": getattr(web_terminal, "model_key", None), + } + rec = task_manager.create_chat_task( + username, + workspace_id, + user_message, + [], + conversation_id, + model_key=session_data.get("model_key"), + thinking_mode=session_data.get("thinking_mode"), + run_mode=session_data.get("run_mode"), + session_data=session_data, + ) + debug_log(f"[SubAgent] 已创建后台任务: task_id={rec.task_id}") except Exception as e: - debug_log(f"[SubAgent] process_message_task 失败: {e}") - import traceback - debug_log(f"[SubAgent] 错误堆栈: {traceback.format_exc()}") - finally: - clear_stop_flag(client_sid, username) + debug_log(f"[SubAgent] 创建后台任务失败,回退直接执行: {e}") + try: + task = asyncio.create_task(handle_task_with_sender( + terminal=web_terminal, + workspace=workspace, + message=user_message, + images=[], + sender=sender, + client_sid=client_sid, + username=username, + videos=[] + )) + await task + debug_log(f"[SubAgent] process_message_task 调用成功") + except Exception as inner_exc: + debug_log(f"[SubAgent] process_message_task 失败: {inner_exc}") + import traceback + debug_log(f"[SubAgent] 错误堆栈: {traceback.format_exc()}") return # 只处理第一个完成的子智能体 @@ -234,9 +274,10 @@ async def poll_sub_agent_completion(*, web_terminal, workspace, conversation_id, # 若状态已提前被更新为终态(poll_updates 返回空),补发完成提示 completed_tasks = [ task for task in manager.tasks.values() - if task.get("status") in {"completed", "failed", "timeout", "terminated"} + if task.get("status") in {"completed", "failed", "timeout"} and task.get("run_in_background") and task.get("conversation_id") == conversation_id + and not task.get("notified") ] if completed_tasks: completed_tasks.sort( @@ -264,36 +305,65 @@ async def poll_sub_agent_completion(*, web_terminal, workspace, conversation_id, 交付目录:{deliverables_dir}""" try: + task_id = task.get("task_id") + if task_id: + web_terminal._announced_sub_agent_tasks.add(task_id) + if isinstance(task, dict): + task["notified"] = True + task["updated_at"] = time.time() + try: + manager._save_state() + except Exception as exc: + debug_log(f"[SubAgent] 保存通知状态失败: {exc}") sender('user_message', { 'message': user_message, 'conversation_id': conversation_id }) - entry = get_stop_flag(client_sid, username) - if not isinstance(entry, dict): - entry = {'stop': False, 'task': None, 'terminal': None} - entry['stop'] = False - task_handle = asyncio.create_task(handle_task_with_sender( - terminal=web_terminal, - workspace=workspace, - message=user_message, - images=[], - sender=sender, - client_sid=client_sid, - username=username, - videos=[] - )) - entry['task'] = task_handle - entry['terminal'] = web_terminal - set_stop_flag(client_sid, username, entry) - await task_handle + from .tasks import task_manager + workspace_id = getattr(workspace, "workspace_id", None) or "default" + session_data = { + "username": username, + "role": getattr(web_terminal, "user_role", "user"), + "is_api_user": getattr(web_terminal, "user_role", "") == "api", + "workspace_id": workspace_id, + "run_mode": getattr(web_terminal, "run_mode", None), + "thinking_mode": getattr(web_terminal, "thinking_mode", None), + "model_key": getattr(web_terminal, "model_key", None), + } + rec = task_manager.create_chat_task( + username, + workspace_id, + user_message, + [], + conversation_id, + model_key=session_data.get("model_key"), + thinking_mode=session_data.get("thinking_mode"), + run_mode=session_data.get("run_mode"), + session_data=session_data, + ) + debug_log(f"[SubAgent] 补发通知创建后台任务: task_id={rec.task_id}") except Exception as e: - debug_log(f"[SubAgent] 补发完成提示失败: {e}") - import traceback - debug_log(f"[SubAgent] 错误堆栈: {traceback.format_exc()}") - finally: - clear_stop_flag(client_sid, username) + debug_log(f"[SubAgent] 补发通知创建后台任务失败,回退直接执行: {e}") + try: + task_handle = asyncio.create_task(handle_task_with_sender( + terminal=web_terminal, + workspace=workspace, + message=user_message, + images=[], + sender=sender, + client_sid=client_sid, + username=username, + videos=[] + )) + await task_handle + except Exception as inner_exc: + debug_log(f"[SubAgent] 补发完成提示失败: {inner_exc}") + import traceback + debug_log(f"[SubAgent] 错误堆栈: {traceback.format_exc()}") break + await asyncio.sleep(5) + debug_log("[SubAgent] 后台轮询结束") async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspace, message, images, sender, client_sid, username: str, videos=None): @@ -928,27 +998,37 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac debug_log(f" 累积响应: {len(accumulated_response)} 字符") debug_log(f"{'='*40}\n") - # 检查是否有后台运行的子智能体 + # 检查是否有后台运行的子智能体或待通知的完成任务 manager = getattr(web_terminal, "sub_agent_manager", None) has_running_sub_agents = False if manager: + if not hasattr(web_terminal, "_announced_sub_agent_tasks"): + web_terminal._announced_sub_agent_tasks = set() running_tasks = [ task for task in manager.tasks.values() - if task.get("status") not in {"completed", "failed", "timeout", "terminated"} + if task.get("status") not in TERMINAL_STATUSES.union({"terminated"}) and task.get("run_in_background") and task.get("conversation_id") == conversation_id ] + pending_notice_tasks = [ + task for task in manager.tasks.values() + if task.get("status") in TERMINAL_STATUSES.union({"terminated"}) + and task.get("run_in_background") + and task.get("conversation_id") == conversation_id + and task.get("task_id") not in web_terminal._announced_sub_agent_tasks + ] - if running_tasks: + if running_tasks or pending_notice_tasks: has_running_sub_agents = True - debug_log(f"[SubAgent] 检测到 {len(running_tasks)} 个后台子智能体运行中,通知前端等待") - # 先通知前端:有子智能体在运行,保持等待状态 + notify_tasks = running_tasks + pending_notice_tasks + debug_log(f"[SubAgent] 后台子智能体等待: running={len(running_tasks)} pending_notice={len(pending_notice_tasks)}") + # 先通知前端:有子智能体在运行/待通知,保持等待状态 sender('sub_agent_waiting', { - 'count': len(running_tasks), - 'tasks': [{'agent_id': t.get('agent_id'), 'summary': t.get('summary')} for t in running_tasks] + 'count': len(notify_tasks), + 'tasks': [{'agent_id': t.get('agent_id'), 'summary': t.get('summary')} for t in notify_tasks] }) - # 启动后台任务来轮询子智能体完成 + # 启动后台任务来轮询/补发子智能体完成 def run_poll(): import asyncio loop = asyncio.new_event_loop() diff --git a/server/chat_flow_task_support.py b/server/chat_flow_task_support.py index f176222..7c0c27b 100644 --- a/server/chat_flow_task_support.py +++ b/server/chat_flow_task_support.py @@ -24,6 +24,15 @@ async def process_sub_agent_updates(*, messages: List[Dict], inline: bool = Fals for update in updates: task_id = update.get("task_id") + task_info = manager.tasks.get(task_id) if task_id else None + current_conv_id = getattr(getattr(web_terminal, "context_manager", None), "current_conversation_id", None) + task_conv_id = task_info.get("conversation_id") if isinstance(task_info, dict) else None + if task_conv_id and current_conv_id and task_conv_id != current_conv_id: + debug_log(f"[SubAgent] 跳过非当前对话任务: task={task_id} conv={task_conv_id} current={current_conv_id}") + continue + if task_id and task_info is None: + debug_log(f"[SubAgent] 找不到任务详情,跳过: task={task_id}") + continue # 检查是否已经通知过这个任务 if task_id and task_id in web_terminal._announced_sub_agent_tasks: @@ -36,9 +45,23 @@ async def process_sub_agent_updates(*, messages: List[Dict], inline: bool = Fals debug_log(f"[SubAgent] update task={task_id} inline={inline} msg={message}") + # 记录到对话历史(用于后续 build_messages 转换为 user 消息) + if hasattr(web_terminal, "_record_sub_agent_message"): + try: + web_terminal._record_sub_agent_message(message, task_id, inline=inline) + except Exception as exc: + debug_log(f"[SubAgent] 记录子智能体消息失败: {exc}") + # 标记任务已通知 if task_id: web_terminal._announced_sub_agent_tasks.add(task_id) + if isinstance(task_info, dict): + task_info["notified"] = True + task_info["updated_at"] = time.time() + try: + manager._save_state() + except Exception as exc: + debug_log(f"[SubAgent] 保存通知状态失败: {exc}") debug_log(f"[SubAgent] 计算插入位置") @@ -49,12 +72,13 @@ async def process_sub_agent_updates(*, messages: List[Dict], inline: bool = Fals insert_index = idx + 1 break + # 直接插入 user 消息,确保下一轮调用能看到子智能体完成通知 messages.insert(insert_index, { - "role": "system", + "role": "user", "content": message, "metadata": {"sub_agent_notice": True, "inline": inline, "task_id": task_id} }) - debug_log(f"[SubAgent] 插入系统消息位置: {insert_index}") + debug_log(f"[SubAgent] 插入子智能体通知位置: {insert_index}") sender('system_message', { 'content': message, 'inline': inline diff --git a/server/chat_flow_tool_loop.py b/server/chat_flow_tool_loop.py index 7db104f..bb53d8a 100644 --- a/server/chat_flow_tool_loop.py +++ b/server/chat_flow_tool_loop.py @@ -461,10 +461,11 @@ async def execute_tool_calls(*, web_terminal, tool_calls, sender, messages, clie videos=tool_videos ) debug_log(f"💾 增量保存:工具结果 {function_name}") - system_message = result_data.get("system_message") if isinstance(result_data, dict) else None - if system_message: - web_terminal._record_sub_agent_message(system_message, result_data.get("task_id"), inline=False) - maybe_mark_failure_from_message(web_terminal, system_message) + if function_name != "wait_sub_agent": + system_message = result_data.get("system_message") if isinstance(result_data, dict) else None + if system_message: + web_terminal._record_sub_agent_message(system_message, result_data.get("task_id"), inline=False) + maybe_mark_failure_from_message(web_terminal, system_message) # 添加到消息历史(用于API继续对话) messages.append({ @@ -485,4 +486,3 @@ async def execute_tool_calls(*, web_terminal, tool_calls, sender, messages, clie return {"stopped": False, "last_tool_call_time": last_tool_call_time} - diff --git a/server/conversation.py b/server/conversation.py index 8886087..d801dd2 100644 --- a/server/conversation.py +++ b/server/conversation.py @@ -44,6 +44,7 @@ from modules.personalization_manager import ( from modules.upload_security import UploadSecurityError from modules.user_manager import UserWorkspace from modules.usage_tracker import QUOTA_DEFAULTS +from modules.sub_agent_manager import TERMINAL_STATUSES from core.web_terminal import WebTerminal from utils.tool_result_formatter import format_tool_result_for_context from utils.conversation_manager import ConversationManager @@ -83,6 +84,30 @@ from .conversation_stats import ( conversation_bp = Blueprint('conversation', __name__) +def _terminate_running_sub_agents(terminal: WebTerminal, reason: str = "") -> int: + """切换/新建对话时,强制终止当前对话仍在运行的子智能体,并记录系统消息。""" + manager = getattr(terminal, "sub_agent_manager", None) + if not manager: + return 0 + current_conv_id = getattr(getattr(terminal, "context_manager", None), "current_conversation_id", None) + if not current_conv_id: + return 0 + running_tasks = [ + task for task in manager.tasks.values() + if task.get("status") not in TERMINAL_STATUSES.union({"terminated"}) + and task.get("run_in_background") + and task.get("conversation_id") == current_conv_id + ] + if not running_tasks: + return 0 + stopped_count = 0 + for task in running_tasks: + task_id = task.get("task_id") + manager.terminate_sub_agent(task_id=task_id) + stopped_count += 1 + return stopped_count + + # === 背景生成对话标题(从 app_legacy 拆分) === @conversation_bp.route('/api/conversations', methods=['GET']) @api_login_required @@ -133,6 +158,7 @@ def create_conversation(terminal: WebTerminal, workspace: UserWorkspace, usernam thinking_mode = data.get('thinking_mode') if preserve_mode and 'thinking_mode' in data else None run_mode = data.get('mode') if preserve_mode and 'mode' in data else None + _terminate_running_sub_agents(terminal, reason="用户创建新对话") result = terminal.create_new_conversation(thinking_mode=thinking_mode, run_mode=run_mode) if result["success"]: @@ -207,6 +233,9 @@ def get_conversation_info(terminal: WebTerminal, workspace: UserWorkspace, usern def load_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """加载特定对话""" try: + current_id = getattr(getattr(terminal, "context_manager", None), "current_conversation_id", None) + if current_id and current_id != conversation_id: + _terminate_running_sub_agents(terminal, reason="用户切换对话") result = terminal.load_conversation(conversation_id) if result["success"]: @@ -425,6 +454,61 @@ def list_sub_agents(terminal: WebTerminal, workspace: UserWorkspace, username: s try: conversation_id = terminal.context_manager.current_conversation_id data = manager.get_overview(conversation_id=conversation_id) + debug_log("[SubAgent] /api/sub_agents overview", { + "conversation_id": conversation_id, + "count": len(data), + "tasks": [ + { + "task_id": item.get("task_id"), + "status": item.get("status"), + "run_in_background": item.get("run_in_background"), + "conversation_id": item.get("conversation_id") + } for item in data + ] + }) + if not hasattr(terminal, "_announced_sub_agent_tasks"): + terminal._announced_sub_agent_tasks = set() + announced = terminal._announced_sub_agent_tasks + notified_from_history = set() + try: + history = getattr(terminal.context_manager, "conversation_history", []) or [] + for msg in history: + meta = msg.get("metadata") or {} + task_id = meta.get("task_id") + if meta.get("sub_agent_notice") and task_id: + notified_from_history.add(task_id) + except Exception: + notified_from_history = set() + for item in data: + task_id = item.get("task_id") + raw_task = manager.tasks.get(task_id) if task_id else None + run_in_background = bool(raw_task.get("run_in_background")) if isinstance(raw_task, dict) else False + item["run_in_background"] = run_in_background + status = item.get("status") + notified_flag = bool(raw_task.get("notified")) if isinstance(raw_task, dict) else False + already_notified = ( + (task_id in announced) or + (task_id in notified_from_history) or + notified_flag + ) + notice_pending = ( + run_in_background + and task_id + and not already_notified + and (status in TERMINAL_STATUSES or status == "terminated") + ) + item["notice_pending"] = notice_pending + debug_log("[SubAgent] /api/sub_agents notice_pending computed", { + "conversation_id": conversation_id, + "tasks": [ + { + "task_id": item.get("task_id"), + "status": item.get("status"), + "run_in_background": item.get("run_in_background"), + "notice_pending": item.get("notice_pending") + } for item in data + ] + }) return jsonify({"success": True, "data": data}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 diff --git a/server/tasks.py b/server/tasks.py index 3d8f074..5dc583d 100644 --- a/server/tasks.py +++ b/server/tasks.py @@ -87,6 +87,7 @@ class TaskManager: thinking_mode: Optional[bool] = None, run_mode: Optional[str] = None, max_iterations: Optional[int] = None, + session_data: Optional[Dict[str, Any]] = None, ) -> TaskRecord: if run_mode: normalized = str(run_mode).lower() @@ -100,18 +101,21 @@ class TaskManager: task_id = str(uuid.uuid4()) record = TaskRecord(task_id, username, workspace_id, message, conversation_id, model_key, thinking_mode, run_mode, max_iterations) # 记录当前 session 快照,便于后台线程内使用 - try: - record.session_data = { - "username": session.get("username"), - "role": session.get("role"), - "is_api_user": session.get("is_api_user"), - "workspace_id": workspace_id, - "run_mode": session.get("run_mode"), - "thinking_mode": session.get("thinking_mode"), - "model_key": session.get("model_key"), - } - except Exception: - record.session_data = {} + if session_data is not None: + record.session_data = dict(session_data) + else: + try: + record.session_data = { + "username": session.get("username"), + "role": session.get("role"), + "is_api_user": session.get("is_api_user"), + "workspace_id": workspace_id, + "run_mode": session.get("run_mode"), + "thinking_mode": session.get("thinking_mode"), + "model_key": session.get("model_key"), + } + except Exception: + record.session_data = {} with self._lock: self._tasks[task_id] = record thread = threading.Thread(target=self._run_chat_task, args=(record, images), daemon=True) diff --git a/static/src/app/methods/taskPolling.ts b/static/src/app/methods/taskPolling.ts index 091c53b..c3fab53 100644 --- a/static/src/app/methods/taskPolling.ts +++ b/static/src/app/methods/taskPolling.ts @@ -634,7 +634,11 @@ export const taskPollingMethods = { // 如果已经在流式输出中,不重复恢复 if (this.streamingMessage || this.taskInProgress) { - debugLog('[TaskPolling] 任务已在进行中,跳过恢复'); + debugLog('[TaskPolling] 任务已在进行中,跳过恢复', { + streamingMessage: this.streamingMessage, + taskInProgress: this.taskInProgress, + currentConversationId: this.currentConversationId + }); return; } @@ -642,11 +646,18 @@ export const taskPollingMethods = { const runningTask = await taskStore.loadRunningTask(this.currentConversationId); if (!runningTask) { - debugLog('[TaskPolling] 没有运行中的任务'); + debugLog('[TaskPolling] 没有运行中的任务', { + currentConversationId: this.currentConversationId + }); + await this.restoreSubAgentWaitingState(); return; } - debugLog('[TaskPolling] 发现运行中的任务,开始恢复状态'); + debugLog('[TaskPolling] 发现运行中的任务,开始恢复状态', { + taskId: runningTask?.task_id, + status: runningTask?.status, + conversationId: runningTask?.conversation_id + }); // 检查历史是否已加载 const hasMessages = Array.isArray(this.messages) && this.messages.length > 0; @@ -901,4 +912,70 @@ export const taskPollingMethods = { console.error('[TaskPolling] 恢复任务状态失败:', error); } }, + + /** + * 恢复子智能体等待状态(页面刷新后调用) + */ + async restoreSubAgentWaitingState(retry = 0) { + try { + if (!this.currentConversationId) { + if (retry < 5) { + setTimeout(() => { + this.restoreSubAgentWaitingState(retry + 1); + }, 300); + } + return; + } + + const response = await fetch('/api/sub_agents'); + if (!response.ok) { + debugLog('[TaskPolling] 获取子智能体状态失败'); + return; + } + const result = await response.json(); + if (!result.success) { + debugLog('[TaskPolling] 子智能体状态响应无效'); + return; + } + + const tasks = Array.isArray(result.data) ? result.data : []; + debugLog('[TaskPolling] 子智能体状态响应', { + total: tasks.length, + currentConversationId: this.currentConversationId, + tasks: tasks.map((task: any) => ({ + task_id: task?.task_id, + status: task?.status, + run_in_background: task?.run_in_background, + notice_pending: task?.notice_pending, + conversation_id: task?.conversation_id + })) + }); + const terminalStatuses = new Set(['completed', 'failed', 'timeout', 'terminated']); + const relevant = tasks.filter((task: any) => { + if (task && task.conversation_id && task.conversation_id !== this.currentConversationId) { + return false; + } + return true; + }); + + const running = relevant.filter((task: any) => task?.run_in_background && !terminalStatuses.has(task?.status)); + const pendingNotice = relevant.filter((task: any) => task?.notice_pending); + + if (running.length || pendingNotice.length) { + debugLog('[TaskPolling] 恢复子智能体等待状态', { + running: running.length, + pendingNotice: pendingNotice.length, + runningTasks: running.map((task: any) => ({ task_id: task?.task_id, status: task?.status })), + pendingTasks: pendingNotice.map((task: any) => ({ task_id: task?.task_id, status: task?.status })) + }); + this.waitingForSubAgent = true; + this.taskInProgress = true; + this.streamingMessage = false; + this.stopRequested = false; + this.$forceUpdate(); + } + } catch (error) { + console.error('[TaskPolling] 恢复子智能体等待状态失败:', error); + } + }, }; diff --git a/utils/tool_result_formatter.py b/utils/tool_result_formatter.py index 570dea8..fed4dff 100644 --- a/utils/tool_result_formatter.py +++ b/utils/tool_result_formatter.py @@ -487,6 +487,33 @@ def _format_update_memory(result_data: Dict[str, Any]) -> str: return f"记忆已更新。" +def _format_sub_agent_stats(stats: Optional[Dict[str, Any]]) -> str: + if not isinstance(stats, dict): + return "" + + def _to_int(value: Any) -> int: + try: + return max(0, int(value)) + except (TypeError, ValueError): + return 0 + + api_calls = _to_int(stats.get("api_calls") or stats.get("api_call_count") or stats.get("turn_count")) + files_read = _to_int(stats.get("files_read")) + edit_files = _to_int(stats.get("edit_files")) + searches = _to_int(stats.get("searches")) + web_pages = _to_int(stats.get("web_pages")) + commands = _to_int(stats.get("commands")) + lines = [ + f"调用了{api_calls}次", + f"阅读了{files_read}次文件", + f"编辑了{edit_files}次文件", + f"搜索了{searches}次内容", + f"查看了{web_pages}个网页", + f"运行了{commands}个指令", + ] + return "\n".join(lines) + + def _format_create_sub_agent(result_data: Dict[str, Any]) -> str: if not result_data.get("success"): return _format_failure("create_sub_agent", result_data) @@ -497,20 +524,69 @@ def _format_create_sub_agent(result_data: Dict[str, Any]) -> str: ref_note = f",附带 {len(refs)} 份参考文件" if refs else "" deliver_dir = result_data.get("deliverables_dir") deliver_note = f",交付目录: {deliver_dir}" if deliver_dir else "" - return f"子智能体 #{agent_id} 已创建(task_id={task_id},状态 {status}{ref_note}{deliver_note})。" + header = f"子智能体 #{agent_id} 已创建(task_id={task_id},状态 {status}{ref_note}{deliver_note})。" + stats_text = _format_sub_agent_stats( + result_data.get("stats") or (result_data.get("final_result") or {}).get("stats") + ) + summary = result_data.get("message") or result_data.get("summary") + lines = [header] + if stats_text: + lines.append(stats_text) + if summary and status in {"completed", "failed", "timeout", "terminated"}: + lines.append(str(summary)) + return "\n".join(lines) def _format_wait_sub_agent(result_data: Dict[str, Any]) -> str: task_id = result_data.get("task_id") agent_id = result_data.get("agent_id") status = result_data.get("status") + stats_text = _format_sub_agent_stats(result_data.get("stats")) if result_data.get("success"): copied_path = result_data.get("copied_path") or result_data.get("deliverables_path") message = result_data.get("message") or "子智能体任务已完成。" deliver_note = f"交付已复制到 {copied_path}" if copied_path else "交付目录已生成" - return f"子智能体 #{agent_id}/{task_id} 完成:{message}({deliver_note})" + lines = [f"子智能体 #{agent_id}/{task_id} 完成"] + if stats_text: + lines.append(stats_text) + lines.append(message) + lines.append(deliver_note) + return "\n".join(lines) message = result_data.get("message") or result_data.get("error") or "子智能体任务失败" - return f"⚠️ 子智能体 #{agent_id}/{task_id} 状态 {status}: {message}" + lines = [f"⚠️ 子智能体 #{agent_id}/{task_id} 状态 {status}"] + if stats_text: + lines.append(stats_text) + lines.append(message) + return "\n".join(lines) + + +def _format_get_sub_agent_status(result_data: Dict[str, Any]) -> str: + if not result_data.get("success"): + return _format_failure("get_sub_agent_status", result_data) + results = result_data.get("results") or [] + if not results: + return "未找到子智能体状态。" + blocks = [] + for item in results: + agent_id = item.get("agent_id") + if not item.get("found"): + blocks.append(f"子智能体 #{agent_id} 未找到。") + continue + status = item.get("status") + summary = None + final_result = item.get("final_result") or {} + if isinstance(final_result, dict): + summary = final_result.get("message") or final_result.get("summary") + if not summary: + summary = item.get("summary") or "" + stats_text = _format_sub_agent_stats(item.get("stats")) + lines = [f"子智能体 #{agent_id} 状态: {status}"] + if stats_text: + lines.append(stats_text) + if summary: + lines.append(str(summary)) + blocks.append("\n".join(lines)) + return "\n\n".join(blocks) def _format_close_sub_agent(result_data: Dict[str, Any]) -> str: @@ -694,4 +770,5 @@ TOOL_FORMATTERS = { "create_sub_agent": _format_create_sub_agent, "wait_sub_agent": _format_wait_sub_agent, "close_sub_agent": _format_close_sub_agent, + "get_sub_agent_status": _format_get_sub_agent_status, }