@app.route('/api/conversations', methods=['GET']) @api_login_required @with_terminal def get_conversations(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话列表""" try: # 获取查询参数 limit = request.args.get('limit', 20, type=int) offset = request.args.get('offset', 0, type=int) # 限制参数范围 limit = max(1, min(limit, 100)) # 限制在1-100之间 offset = max(0, offset) result = terminal.get_conversations_list(limit=limit, offset=offset) if result["success"]: return jsonify({ "success": True, "data": result["data"] }) else: return jsonify({ "success": False, "error": result.get("error", "Unknown error"), "message": result.get("message", "获取对话列表失败") }), 500 except Exception as e: print(f"[API] 获取对话列表错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话列表时发生异常" }), 500 @app.route('/api/conversations', methods=['POST']) @api_login_required @with_terminal def create_conversation(terminal: WebTerminal, workspace: UserWorkspace, username: str): """创建新对话""" try: data = request.get_json() or {} # 前端现在期望“新建对话”回到用户配置的默认模型/模式, # 只有当客户端显式要求保留当前模式时才使用传入值。 preserve_mode = bool(data.get('preserve_mode')) thinking_mode = data.get('thinking_mode') if preserve_mode and 'thinking_mode' in data else None run_mode = data.get('mode') if preserve_mode and 'mode' in data else None result = terminal.create_new_conversation(thinking_mode=thinking_mode, run_mode=run_mode) if result["success"]: session['run_mode'] = terminal.run_mode session['thinking_mode'] = terminal.thinking_mode # 广播对话列表更新事件 socketio.emit('conversation_list_update', { 'action': 'created', 'conversation_id': result["conversation_id"] }, room=f"user_{username}") # 广播当前对话切换事件 socketio.emit('conversation_changed', { 'conversation_id': result["conversation_id"], 'title': "新对话" }, room=f"user_{username}") return jsonify(result), 201 else: return jsonify(result), 500 except Exception as e: print(f"[API] 创建对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "创建对话时发生异常" }), 500 @app.route('/api/conversations/', methods=['GET']) @api_login_required @with_terminal def get_conversation_info(terminal: WebTerminal, workspace: UserWorkspace, username: str, conversation_id): """获取特定对话信息""" try: # 通过ConversationManager直接获取对话数据 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if conversation_data: # 提取关键信息,不返回完整消息内容(避免数据量过大) info = { "id": conversation_data["id"], "title": conversation_data["title"], "created_at": conversation_data["created_at"], "updated_at": conversation_data["updated_at"], "metadata": conversation_data["metadata"], "messages_count": len(conversation_data.get("messages", [])) } return jsonify({ "success": True, "data": info }) else: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 except Exception as e: print(f"[API] 获取对话信息错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话信息时发生异常" }), 500 @app.route('/api/conversations//load', methods=['PUT']) @api_login_required @with_terminal def load_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """加载特定对话""" try: result = terminal.load_conversation(conversation_id) if result["success"]: # 广播对话切换事件 socketio.emit('conversation_changed', { 'conversation_id': conversation_id, 'title': result.get("title", "未知对话"), 'messages_count': result.get("messages_count", 0) }, room=f"user_{username}") # 广播系统状态更新(因为当前对话改变了) status = terminal.get_status() socketio.emit('status_update', status, room=f"user_{username}") # 清理和重置相关UI状态 socketio.emit('conversation_loaded', { 'conversation_id': conversation_id, 'clear_ui': True # 提示前端清理当前UI状态 }, room=f"user_{username}") return jsonify(result) else: return jsonify(result), 404 if "不存在" in result.get("message", "") else 500 except Exception as e: print(f"[API] 加载对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "加载对话时发生异常" }), 500 @app.route('/api/conversations/', methods=['DELETE']) @api_login_required @with_terminal def delete_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """删除特定对话""" try: # 检查是否是当前对话 is_current = (terminal.context_manager.current_conversation_id == conversation_id) result = terminal.delete_conversation(conversation_id) if result["success"]: # 广播对话列表更新事件 socketio.emit('conversation_list_update', { 'action': 'deleted', 'conversation_id': conversation_id }, room=f"user_{username}") # 如果删除的是当前对话,广播对话清空事件 if is_current: socketio.emit('conversation_changed', { 'conversation_id': None, 'title': None, 'cleared': True }, room=f"user_{username}") # 更新系统状态 status = terminal.get_status() socketio.emit('status_update', status, room=f"user_{username}") return jsonify(result) else: return jsonify(result), 404 if "不存在" in result.get("message", "") else 500 except Exception as e: print(f"[API] 删除对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "删除对话时发生异常" }), 500 @app.route('/api/conversations/search', methods=['GET']) @api_login_required @with_terminal def search_conversations(terminal: WebTerminal, workspace: UserWorkspace, username: str): """搜索对话""" try: query = request.args.get('q', '').strip() limit = request.args.get('limit', 20, type=int) if not query: return jsonify({ "success": False, "error": "Missing query parameter", "message": "请提供搜索关键词" }), 400 # 限制参数范围 limit = max(1, min(limit, 50)) result = terminal.search_conversations(query, limit) return jsonify({ "success": True, "data": { "results": result["results"], "count": result["count"], "query": query } }) except Exception as e: print(f"[API] 搜索对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "搜索对话时发生异常" }), 500 @app.route('/api/conversations//messages', methods=['GET']) @api_login_required @with_terminal def get_conversation_messages(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话的消息历史(可选功能,用于调试或详细查看)""" try: # 获取完整对话数据 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if conversation_data: messages = conversation_data.get("messages", []) # 可选:限制消息数量,避免返回过多数据 limit = request.args.get('limit', type=int) if limit: messages = messages[-limit:] # 获取最后N条消息 return jsonify({ "success": True, "data": { "conversation_id": conversation_id, "messages": messages, "total_count": len(conversation_data.get("messages", [])) } }) else: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 except Exception as e: print(f"[API] 获取对话消息错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话消息时发生异常" }), 500 @app.route('/api/conversations//compress', methods=['POST']) @api_login_required @with_terminal def compress_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """压缩指定对话的大体积消息,生成压缩版新对话""" try: policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_compress_conversation"): return jsonify({"success": False, "error": "压缩对话已被管理员禁用"}), 403 normalized_id = conversation_id if conversation_id.startswith('conv_') else f"conv_{conversation_id}" result = terminal.context_manager.compress_conversation(normalized_id) if not result.get("success"): status_code = 404 if "不存在" in result.get("error", "") else 400 return jsonify(result), status_code new_conversation_id = result["compressed_conversation_id"] load_result = terminal.load_conversation(new_conversation_id) if load_result.get("success"): socketio.emit('conversation_list_update', { 'action': 'compressed', 'conversation_id': new_conversation_id }, room=f"user_{username}") socketio.emit('conversation_changed', { 'conversation_id': new_conversation_id, 'title': load_result.get('title', '压缩后的对话'), 'messages_count': load_result.get('messages_count', 0) }, room=f"user_{username}") socketio.emit('conversation_loaded', { 'conversation_id': new_conversation_id, 'clear_ui': True }, room=f"user_{username}") response_payload = { "success": True, "compressed_conversation_id": new_conversation_id, "compressed_types": result.get("compressed_types", []), "system_message": result.get("system_message"), "load_result": load_result } return jsonify(response_payload) except Exception as e: print(f"[API] 压缩对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "压缩对话时发生异常" }), 500 @app.route('/api/sub_agents', methods=['GET']) @api_login_required @with_terminal def list_sub_agents(terminal: WebTerminal, workspace: UserWorkspace, username: str): """返回当前对话的子智能体任务列表。""" manager = getattr(terminal, "sub_agent_manager", None) if not manager: return jsonify({"success": True, "data": []}) try: conversation_id = terminal.context_manager.current_conversation_id data = manager.get_overview(conversation_id=conversation_id) return jsonify({"success": True, "data": data}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 @app.route('/api/conversations//duplicate', methods=['POST']) @api_login_required @with_terminal def duplicate_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """复制指定对话,生成新的对话副本""" try: result = terminal.context_manager.duplicate_conversation(conversation_id) if not result.get("success"): status_code = 404 if "不存在" in result.get("error", "") else 400 return jsonify(result), status_code new_conversation_id = result["duplicate_conversation_id"] load_result = terminal.load_conversation(new_conversation_id) if load_result.get("success"): socketio.emit('conversation_list_update', { 'action': 'duplicated', 'conversation_id': new_conversation_id }, room=f"user_{username}") socketio.emit('conversation_changed', { 'conversation_id': new_conversation_id, 'title': load_result.get('title', '复制的对话'), 'messages_count': load_result.get('messages_count', 0) }, room=f"user_{username}") socketio.emit('conversation_loaded', { 'conversation_id': new_conversation_id, 'clear_ui': True }, room=f"user_{username}") response_payload = { "success": True, "duplicate_conversation_id": new_conversation_id, "load_result": load_result } return jsonify(response_payload) except Exception as e: print(f"[API] 复制对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "复制对话时发生异常" }), 500 @app.route('/api/conversations//review_preview', methods=['GET']) @api_login_required @with_terminal def review_conversation_preview(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """生成对话回顾预览(不落盘,只返回前若干行文本)""" policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_conversation_review"): return jsonify({"success": False, "error": "对话引用已被管理员禁用"}), 403 try: current_id = terminal.context_manager.current_conversation_id if conversation_id == current_id: return jsonify({ "success": False, "message": "无法引用当前对话" }), 400 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if not conversation_data: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 limit = request.args.get('limit', default=20, type=int) or 20 lines = build_review_lines(conversation_data.get("messages", []), limit=limit) return jsonify({ "success": True, "data": { "preview": lines, "count": len(lines) } }) except Exception as e: print(f"[API] 对话回顾预览错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "生成预览时发生异常" }), 500 @app.route('/api/conversations//review', methods=['POST']) @api_login_required @with_terminal def review_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """生成完整对话回顾 Markdown 文件""" policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_conversation_review"): return jsonify({"success": False, "error": "对话引用已被管理员禁用"}), 403 try: current_id = terminal.context_manager.current_conversation_id if conversation_id == current_id: return jsonify({ "success": False, "message": "无法引用当前对话" }), 400 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if not conversation_data: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 messages = conversation_data.get("messages", []) lines = build_review_lines(messages) content = "\n".join(lines) + "\n" char_count = len(content) uploads_dir = workspace.uploads_dir / "review" uploads_dir.mkdir(parents=True, exist_ok=True) title = conversation_data.get("title") or "untitled" safe_title = _sanitize_filename_component(title) timestamp = datetime.now().strftime("%Y%m%d%H%M%S") filename = f"review_{safe_title}_{timestamp}.md" target = uploads_dir / filename target.write_text(content, encoding='utf-8') return jsonify({ "success": True, "data": { "path": f"user_upload/review/{filename}", "char_count": char_count } }) except Exception as e: print(f"[API] 对话回顾生成错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "生成对话回顾时发生异常" }), 500 @app.route('/api/conversations/statistics', methods=['GET']) @api_login_required @with_terminal def get_conversations_statistics(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话统计信息""" try: stats = terminal.context_manager.get_conversation_statistics() return jsonify({ "success": True, "data": stats }) except Exception as e: print(f"[API] 获取对话统计错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话统计时发生异常" }), 500 @app.route('/api/conversations/current', methods=['GET']) @api_login_required @with_terminal def get_current_conversation(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取当前对话信息""" current_id = terminal.context_manager.current_conversation_id # 如果是临时ID,返回空的对话信息 if not current_id or current_id.startswith('temp_'): return jsonify({ "success": True, "data": { "id": current_id, "title": "新对话", "messages_count": 0, "is_temporary": True } }) # 如果是真实的对话ID,查找对话数据 try: conversation_data = terminal.context_manager.conversation_manager.load_conversation(current_id) if conversation_data: return jsonify({ "success": True, "data": { "id": current_id, "title": conversation_data.get("title", "未知对话"), "messages_count": len(conversation_data.get("messages", [])), "is_temporary": False } }) else: return jsonify({ "success": False, "error": "对话不存在" }), 404 except Exception as e: print(f"[API] 获取当前对话错误: {e}") return jsonify({ "success": False, "error": str(e) }), 500 def process_message_task(terminal: WebTerminal, message: str, images, sender, client_sid, workspace: UserWorkspace, username: str): """在后台处理消息任务""" try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # 创建可取消的任务 task = loop.create_task(handle_task_with_sender(terminal, workspace, message, images, sender, client_sid, username)) entry = stop_flags.get(client_sid) if not isinstance(entry, dict): entry = {'stop': False, 'task': None, 'terminal': None} stop_flags[client_sid] = entry entry['stop'] = False entry['task'] = task entry['terminal'] = terminal try: loop.run_until_complete(task) except asyncio.CancelledError: debug_log(f"任务 {client_sid} 被成功取消") sender('task_stopped', { 'message': '任务已停止', 'reason': 'user_requested' }) reset_system_state(terminal) loop.close() except Exception as e: # 【新增】错误时确保对话状态不丢失 try: if terminal and terminal.context_manager: # 尝试保存当前对话状态 terminal.context_manager.auto_save_conversation() debug_log("错误恢复:对话状态已保存") except Exception as save_error: debug_log(f"错误恢复:保存对话状态失败: {save_error}") # 原有的错误处理逻辑 print(f"[Task] 错误: {e}") debug_log(f"任务处理错误: {e}") import traceback traceback.print_exc() sender('error', {'message': str(e)}) sender('task_complete', { 'total_iterations': 0, 'total_tool_calls': 0, 'auto_fix_attempts': 0, 'error': str(e) }) finally: # 清理任务引用 stop_flags.pop(client_sid, None) def detect_malformed_tool_call(text): """检测文本中是否包含格式错误的工具调用""" # 检测多种可能的工具调用格式 patterns = [ r'执行工具[::]\s*\w+<.*?tool.*?sep.*?>', # 执行工具: xxx<|tool▼sep|> r'<\|?tool[_▼]?call[_▼]?start\|?>', # <|tool_call_start|> r'```tool[_\s]?call', # ```tool_call 或 ```tool call r'{\s*"tool":\s*"[^"]+",\s*"arguments"', # JSON格式的工具调用 r'function_calls?:\s*\[?\s*{', # function_call: [{ ] for pattern in patterns: if re.search(pattern, text, re.IGNORECASE): return True # 检测特定的工具名称后跟JSON tool_names = ['create_file', 'read_file', 'write_file', 'edit_file', 'delete_file', 'terminal_session', 'terminal_input', 'web_search', 'extract_webpage', 'save_webpage', 'run_python', 'run_command', 'sleep'] for tool in tool_names: if tool in text and '{' in text: # 可能是工具调用但格式错误 return True return False async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspace, message, images, sender, client_sid, username: str): """处理任务并发送消息 - 集成token统计版本""" web_terminal = terminal conversation_id = getattr(web_terminal.context_manager, "current_conversation_id", None) # 如果是思考模式,重置状态 if web_terminal.thinking_mode: web_terminal.api_client.start_new_task(force_deep=web_terminal.deep_thinking_mode) state = get_thinking_state(web_terminal) state["fast_streak"] = 0 state["force_next"] = False state["suppress_next"] = False # 添加到对话历史 history_len_before = len(getattr(web_terminal.context_manager, "conversation_history", []) or []) is_first_user_message = history_len_before == 0 web_terminal.context_manager.add_conversation("user", message, images=images) if is_first_user_message and getattr(web_terminal, "context_manager", None): try: personal_config = load_personalization_config(workspace.data_dir) except Exception: personal_config = {} auto_title_enabled = personal_config.get("auto_generate_title", True) if auto_title_enabled: conv_id = getattr(web_terminal.context_manager, "current_conversation_id", None) socketio.start_background_task( generate_conversation_title_background, web_terminal, conv_id, message, username ) # === 移除:不在这里计算输入token,改为在每次API调用前计算 === # 构建上下文和消息(用于API调用) context = web_terminal.build_context() messages = web_terminal.build_messages(context, message) tools = web_terminal.define_tools() # 开始新的AI消息 sender('ai_message_start', {}) # 增量保存相关变量 accumulated_response = "" # 累积的响应内容 is_first_iteration = True # 是否是第一次迭代 # 统计和限制变量 total_iterations = 0 total_tool_calls = 0 consecutive_same_tool = defaultdict(int) last_tool_name = "" auto_fix_attempts = 0 last_tool_call_time = 0 detected_tool_intent: Dict[str, str] = {} # 设置最大迭代次数 max_iterations = MAX_ITERATIONS_PER_TASK pending_append = None # {"path": str, "tool_call_id": str, "buffer": str, ...} append_probe_buffer = "" pending_modify = None # {"path": str, "tool_call_id": str, "buffer": str, ...} modify_probe_buffer = "" def extract_intent_from_partial(arg_str: str) -> Optional[str]: """从不完整的JSON字符串中粗略提取 intent 字段,容错用于流式阶段。""" if not arg_str or "intent" not in arg_str: return None import re # 匹配 "intent": "xxx" 形式,允许前面有换行或空格;宽松匹配未闭合的引号 match = re.search(r'"intent"\s*:\s*"([^"]{0,128})', arg_str, re.IGNORECASE | re.DOTALL) if match: return match.group(1) return None def resolve_monitor_path(args: Dict[str, Any], fallback: Optional[str] = None) -> Optional[str]: candidates = [ args.get('path'), args.get('target_path'), args.get('file_path'), args.get('destination_path'), fallback ] for candidate in candidates: if isinstance(candidate, str): trimmed = candidate.strip() if trimmed: return trimmed return None def resolve_monitor_memory(entries: Any) -> Optional[List[str]]: if isinstance(entries, list): return [str(item) for item in entries][:MONITOR_MEMORY_ENTRY_LIMIT] return None def capture_monitor_snapshot(path: Optional[str]) -> Optional[Dict[str, Any]]: if not path: return None try: read_result = web_terminal.file_manager.read_file(path) except Exception as exc: debug_log(f"[MonitorSnapshot] 读取文件失败: {path} ({exc})") return None if not isinstance(read_result, dict) or not read_result.get('success'): return None content = read_result.get('content') if not isinstance(content, str): content = '' if len(content) > MONITOR_SNAPSHOT_CHAR_LIMIT: content = content[:MONITOR_SNAPSHOT_CHAR_LIMIT] return { 'path': read_result.get('path') or path, 'content': content } async def finalize_pending_append(response_text: str, stream_completed: bool, finish_reason: str = None) -> Dict: """在流式输出结束后处理追加写入""" nonlocal pending_append, append_probe_buffer result = { "handled": False, "success": False, "summary": None, "summary_message": None, "tool_content": None, "tool_call_id": None, "path": None, "forced": False, "error": None, "assistant_content": response_text, "lines": 0, "bytes": 0, "finish_reason": finish_reason, "appended_content": "", "assistant_metadata": None } if not pending_append: return result state = pending_append path = state.get("path") tool_call_id = state.get("tool_call_id") buffer = state.get("buffer", "") start_marker = state.get("start_marker") end_marker = state.get("end_marker") start_idx = state.get("content_start") end_idx = state.get("end_index") display_id = state.get("display_id") result.update({ "handled": True, "path": path, "tool_call_id": tool_call_id, "display_id": display_id }) if path is None or tool_call_id is None: error_msg = "append_to_file 状态不完整,缺少路径或ID。" debug_log(error_msg) result["error"] = error_msg result["summary_message"] = error_msg result["tool_content"] = json.dumps({ "success": False, "error": error_msg }, ensure_ascii=False) if display_id: sender('update_action', { 'id': display_id, 'status': 'failed', 'preparing_id': tool_call_id, 'message': error_msg }) pending_append = None return result if start_idx is None: error_msg = f"未检测到格式正确的开始标识 {start_marker}。" debug_log(error_msg) result["error"] = error_msg result["summary_message"] = error_msg result["tool_content"] = json.dumps({ "success": False, "path": path, "error": error_msg }, ensure_ascii=False) if display_id: sender('update_action', { 'id': display_id, 'status': 'failed', 'preparing_id': tool_call_id, 'message': error_msg }) pending_append = None return result forced = False if end_idx is None: forced = True # 查找下一个<<<,否则使用整个缓冲结尾 remaining = buffer[start_idx:] next_marker = remaining.find("<<<", len(end_marker)) if next_marker != -1: end_idx = start_idx + next_marker else: end_idx = len(buffer) content = buffer[start_idx:end_idx] if content.startswith('\n'): content = content[1:] if not content: error_msg = "未检测到需要追加的内容,请严格按照<<>>...<<>>格式输出。" debug_log(error_msg) result["error"] = error_msg result["forced"] = forced result["tool_content"] = json.dumps({ "success": False, "path": path, "error": error_msg }, ensure_ascii=False) if display_id: sender('update_action', { 'id': display_id, 'status': 'failed', 'preparing_id': tool_call_id, 'message': error_msg }) pending_append = None return result assistant_message_lines = [] if start_marker: assistant_message_lines.append(start_marker) assistant_message_lines.append(content) if not forced and end_marker: assistant_message_lines.append(end_marker) assistant_message_text = "\n".join(assistant_message_lines) result["assistant_content"] = assistant_message_text assistant_metadata = { "append_payload": { "path": path, "tool_call_id": tool_call_id, "forced": forced, "has_end_marker": not forced } } result["assistant_metadata"] = assistant_metadata write_result = web_terminal.file_manager.append_file(path, content) if write_result.get("success"): bytes_written = len(content.encode('utf-8')) line_count = content.count('\n') if content and not content.endswith('\n'): line_count += 1 summary = f"已向 {path} 追加 {line_count} 行({bytes_written} 字节)" if forced: summary += "。未检测到 <<>> 标记,系统已在流结束处完成写入。如内容未完成,请重新调用 append_to_file 并按标准格式补充;如已完成,可继续后续步骤。" result.update({ "success": True, "summary": summary, "summary_message": summary, "forced": forced, "lines": line_count, "bytes": bytes_written, "appended_content": content, "tool_content": json.dumps({ "success": True, "path": path, "lines": line_count, "bytes": bytes_written, "forced": forced, "message": summary, "finish_reason": finish_reason }, ensure_ascii=False) }) assistant_meta_payload = result["assistant_metadata"]["append_payload"] assistant_meta_payload["lines"] = line_count assistant_meta_payload["bytes"] = bytes_written assistant_meta_payload["success"] = True summary_payload = { "success": True, "path": path, "lines": line_count, "bytes": bytes_written, "forced": forced, "message": summary } if display_id: sender('update_action', { 'id': display_id, 'status': 'completed', 'result': summary_payload, 'preparing_id': tool_call_id, 'message': summary }) debug_log(f"追加写入完成: {summary}") else: error_msg = write_result.get("error", "追加写入失败") result.update({ "error": error_msg, "summary_message": error_msg, "forced": forced, "appended_content": content, "tool_content": json.dumps({ "success": False, "path": path, "error": error_msg, "finish_reason": finish_reason }, ensure_ascii=False) }) debug_log(f"追加写入失败: {error_msg}") if result["assistant_metadata"]: assistant_meta_payload = result["assistant_metadata"]["append_payload"] assistant_meta_payload["lines"] = content.count('\n') + (0 if content.endswith('\n') or not content else 1) assistant_meta_payload["bytes"] = len(content.encode('utf-8')) assistant_meta_payload["success"] = False failure_payload = { "success": False, "path": path, "error": error_msg, "forced": forced } if display_id: sender('update_action', { 'id': display_id, 'status': 'completed', 'result': failure_payload, 'preparing_id': tool_call_id, 'message': error_msg }) pending_append = None append_probe_buffer = "" if hasattr(web_terminal, "pending_append_request"): web_terminal.pending_append_request = None return result async def finalize_pending_modify(response_text: str, stream_completed: bool, finish_reason: str = None) -> Dict: """在流式输出结束后处理修改写入""" nonlocal pending_modify, modify_probe_buffer result = { "handled": False, "success": False, "path": None, "tool_call_id": None, "display_id": None, "total_blocks": 0, "completed_blocks": [], "failed_blocks": [], "forced": False, "details": [], "error": None, "assistant_content": response_text, "assistant_metadata": None, "tool_content": None, "summary_message": None, "finish_reason": finish_reason } if not pending_modify: return result state = pending_modify path = state.get("path") tool_call_id = state.get("tool_call_id") display_id = state.get("display_id") start_marker = state.get("start_marker") end_marker = state.get("end_marker") buffer = state.get("buffer", "") raw_buffer = state.get("raw_buffer", "") end_index = state.get("end_index") result.update({ "handled": True, "path": path, "tool_call_id": tool_call_id, "display_id": display_id }) if not state.get("start_seen"): error_msg = "未检测到格式正确的 <<>> 标记。" debug_log(error_msg) result["error"] = error_msg result["summary_message"] = error_msg result["tool_content"] = json.dumps({ "success": False, "path": path, "error": error_msg, "finish_reason": finish_reason }, ensure_ascii=False) if display_id: sender('update_action', { 'id': display_id, 'status': 'failed', 'preparing_id': tool_call_id, 'message': error_msg }) if hasattr(web_terminal, "pending_modify_request"): web_terminal.pending_modify_request = None pending_modify = None modify_probe_buffer = "" return result forced = end_index is None apply_text = buffer if forced else buffer[:end_index] raw_content = raw_buffer if forced else raw_buffer[:len(start_marker) + end_index + len(end_marker)] if raw_content: result["assistant_content"] = raw_content blocks_info = [] block_reports = {} detected_indices = set() block_pattern = re.compile(r"\[replace:(\d+)\](.*?)\[/replace\]", re.DOTALL) structure_warnings: List[str] = [] structure_detail_entries: List[Dict] = [] def record_structure_warning(message: str, hint: Optional[str] = None): """记录结构性缺陷,便于给出更具体的反馈。""" if message in structure_warnings: return structure_warnings.append(message) structure_detail_entries.append({ "index": 0, "status": "failed", "reason": message, "removed_lines": 0, "added_lines": 0, "hint": hint or "请严格按照模板输出:[replace:n] + <>/<> + [/replace],并使用 <<>> 收尾。" }) def extract_segment(body: str, tag: str): marker = f"<<{tag}>>" end_tag = "<>" start_pos = body.find(marker) if start_pos == -1: return None, f"缺少 {marker}" start_pos += len(marker) if body[start_pos:start_pos+2] == "\r\n": start_pos += 2 elif body[start_pos:start_pos+1] == "\n": start_pos += 1 end_pos = body.find(end_tag, start_pos) if end_pos == -1: return None, f"缺少 {end_tag}" segment = body[start_pos:end_pos] return segment, None for match in block_pattern.finditer(apply_text): try: index = int(match.group(1)) except ValueError: continue body = match.group(2) if index in detected_indices: continue detected_indices.add(index) block_reports[index] = { "index": index, "status": "pending", "reason": None, "removed_lines": 0, "added_lines": 0, "hint": None } old_content, old_error = extract_segment(body, "OLD") new_content, new_error = extract_segment(body, "NEW") if old_error or new_error: reason = old_error or new_error block_reports[index]["status"] = "failed" block_reports[index]["reason"] = reason blocks_info.append({ "index": index, "old": old_content, "new": new_content, "error": old_error or new_error }) if not blocks_info: has_replace_start = bool(re.search(r"\[replace:\s*\d+\]", apply_text)) has_replace_end = "[/replace]" in apply_text has_old_tag = "<>" in apply_text has_new_tag = "<>" in apply_text if has_replace_start and not has_replace_end: record_structure_warning("检测到 [replace:n] 标记但缺少对应的 [/replace] 结束标记。") if has_replace_end and not has_replace_start: record_structure_warning("检测到 [/replace] 结束标记但缺少对应的 [replace:n] 起始标记。") old_tags = len(re.findall(r"<>", apply_text)) completed_old_tags = len(re.findall(r"<>[\s\S]*?<>", apply_text)) if old_tags and completed_old_tags < old_tags: record_structure_warning("检测到 <> 段落但未看到对应的 <> 结束标记。") new_tags = len(re.findall(r"<>", apply_text)) completed_new_tags = len(re.findall(r"<>[\s\S]*?<>", apply_text)) if new_tags and completed_new_tags < new_tags: record_structure_warning("检测到 <> 段落但未看到对应的 <> 结束标记。") if (has_replace_start or has_replace_end or has_old_tag or has_new_tag) and not structure_warnings: record_structure_warning("检测到部分补丁标记,但整体结构不完整,请严格按照模板填写所有标记。") total_blocks = len(blocks_info) result["total_blocks"] = total_blocks if forced: debug_log("未检测到 <<>>,将在流结束处执行已识别的修改块。") result["forced"] = True blocks_to_apply = [ {"index": block["index"], "old": block["old"], "new": block["new"]} for block in blocks_info if block["error"] is None and block["old"] is not None and block["new"] is not None ] # 记录格式残缺的块 for block in blocks_info: if block["error"]: idx = block["index"] block_reports[idx]["status"] = "failed" block_reports[idx]["reason"] = block["error"] block_reports[idx]["hint"] = "请检查补丁块的 OLD/NEW 标记是否完整,必要时复用 terminal_snapshot 或终端命令重新调整。" apply_result = {} if blocks_to_apply: apply_result = web_terminal.file_manager.apply_modify_blocks(path, blocks_to_apply) else: apply_result = {"success": False, "completed": [], "failed": [], "results": [], "write_performed": False, "error": None} block_result_map = {item["index"]: item for item in apply_result.get("results", [])} for block in blocks_info: idx = block["index"] report = block_reports.get(idx) if report is None: continue if report["status"] == "failed": continue block_apply = block_result_map.get(idx) if not block_apply: report["status"] = "failed" report["reason"] = "未执行,可能未找到匹配原文" report["hint"] = report.get("hint") or "请确认 OLD 文本与文件内容完全一致;若多次失败,可改用终端命令/Python 进行精准替换。" continue status = block_apply.get("status") report["removed_lines"] = block_apply.get("removed_lines", 0) report["added_lines"] = block_apply.get("added_lines", 0) if block_apply.get("hint"): report["hint"] = block_apply.get("hint") if status == "success": report["status"] = "completed" elif status == "not_found": report["status"] = "failed" report["reason"] = block_apply.get("reason") or "未找到匹配的原文" if not report.get("hint"): report["hint"] = "请使用 terminal_snapshot/grep -n 校验原文,或在说明后改用 run_command/python 精确替换。" else: report["status"] = "failed" report["reason"] = block_apply.get("reason") or "替换失败" if not report.get("hint"): report["hint"] = block_apply.get("hint") or "若多次尝试仍失败,可考虑利用终端命令或 Python 小脚本完成此次修改。" completed_blocks = sorted([idx for idx, rep in block_reports.items() if rep["status"] == "completed"]) failed_blocks = sorted([idx for idx, rep in block_reports.items() if rep["status"] != "completed"]) result["completed_blocks"] = completed_blocks result["failed_blocks"] = failed_blocks details = sorted(block_reports.values(), key=lambda x: x["index"]) if structure_detail_entries: details = structure_detail_entries + details result["details"] = details summary_parts = [] if total_blocks == 0: summary_parts.append("未检测到有效的修改块,未执行任何修改。") summary_parts.extend(structure_warnings) else: if not completed_blocks and failed_blocks: summary_parts.append(f"共检测到 {total_blocks} 个修改块,全部未执行。") elif completed_blocks and not failed_blocks: summary_parts.append(f"共 {total_blocks} 个修改块全部完成。") else: summary_parts.append( f"共检测到 {total_blocks} 个修改块,其中成功 {len(completed_blocks)} 个,失败 {len(failed_blocks)} 个。" ) if forced: summary_parts.append("未检测到 <<>> 标记,系统已在流结束处执行补丁。") if apply_result.get("error"): summary_parts.append(apply_result["error"]) matching_note = "提示:补丁匹配基于完整文本,包含注释和空白符,请确保 <<>> 段落与文件内容逐字一致。如果修改成功,请忽略,如果失败,请明确原文后再次尝试。" summary_parts.append(matching_note) summary_message = " ".join(summary_parts).strip() result["summary_message"] = summary_message result["success"] = bool(completed_blocks) and not failed_blocks and apply_result.get("error") is None tool_payload = { "success": result["success"], "path": path, "total_blocks": total_blocks, "completed": completed_blocks, "failed": [ { "index": rep["index"], "reason": rep.get("reason"), "hint": rep.get("hint") } for rep in result["details"] if rep["status"] != "completed" ], "forced": forced, "message": summary_message, "finish_reason": finish_reason, "details": result["details"] } if apply_result.get("error"): tool_payload["error"] = apply_result["error"] result["tool_content"] = json.dumps(tool_payload, ensure_ascii=False) result["assistant_metadata"] = { "modify_payload": { "path": path, "total_blocks": total_blocks, "completed": completed_blocks, "failed": failed_blocks, "forced": forced, "details": result["details"] } } if display_id: sender('update_action', { 'id': display_id, 'status': 'completed' if result["success"] else 'failed', 'result': tool_payload, 'preparing_id': tool_call_id, 'message': summary_message }) pending_modify = None modify_probe_buffer = "" if hasattr(web_terminal, "pending_modify_request"): web_terminal.pending_modify_request = None return result async def process_sub_agent_updates( messages: List[Dict], inline: bool = False, after_tool_call_id: Optional[str] = None ): """轮询子智能体任务并通知前端,并把结果插入当前对话上下文。""" manager = getattr(web_terminal, "sub_agent_manager", None) if not manager: return try: updates = manager.poll_updates() debug_log(f"[SubAgent] poll inline={inline} updates={len(updates)}") except Exception as exc: debug_log(f"子智能体状态检查失败: {exc}") return for update in updates: message = update.get("system_message") if not message: continue task_id = update.get("task_id") debug_log(f"[SubAgent] update task={task_id} inline={inline} msg={message}") web_terminal._record_sub_agent_message(message, task_id, inline=inline) debug_log(f"[SubAgent] recorded task={task_id}, 计算插入位置") insert_index = len(messages) if after_tool_call_id: for idx, msg in enumerate(messages): if msg.get("role") == "tool" and msg.get("tool_call_id") == after_tool_call_id: insert_index = idx + 1 break messages.insert(insert_index, { "role": "system", "content": message, "metadata": {"sub_agent_notice": True, "inline": inline, "task_id": task_id} }) debug_log(f"[SubAgent] 插入系统消息位置: {insert_index}") sender('system_message', { 'content': message, 'inline': inline }) maybe_mark_failure_from_message(web_terminal, message) for iteration in range(max_iterations): total_iterations += 1 debug_log(f"\n--- 迭代 {iteration + 1}/{max_iterations} 开始 ---") # 检查是否超过总工具调用限制 if total_tool_calls >= MAX_TOTAL_TOOL_CALLS: debug_log(f"已达到最大工具调用次数限制 ({MAX_TOTAL_TOOL_CALLS})") sender('system_message', { 'content': f'⚠️ 已达到最大工具调用次数限制 ({MAX_TOTAL_TOOL_CALLS}),任务结束。' }) mark_force_thinking(web_terminal, reason="tool_limit") break apply_thinking_schedule(web_terminal) full_response = "" tool_calls = [] current_thinking = "" detected_tools = {} last_usage_payload = None # 状态标志 in_thinking = False thinking_started = False thinking_ended = False text_started = False text_has_content = False text_streaming = False text_chunk_index = 0 last_text_chunk_time: Optional[float] = None # 计数器 chunk_count = 0 reasoning_chunks = 0 content_chunks = 0 tool_chunks = 0 append_break_triggered = False append_result = {"handled": False} modify_break_triggered = False modify_result = {"handled": False} last_finish_reason = None thinking_expected = web_terminal.api_client.get_current_thinking_mode() debug_log(f"思考模式: {thinking_expected}") quota_allowed = True quota_info = {} if hasattr(web_terminal, "record_model_call"): quota_allowed, quota_info = web_terminal.record_model_call(bool(thinking_expected)) if not quota_allowed: quota_type = 'thinking' if thinking_expected else 'fast' socketio.emit('quota_notice', { 'type': quota_type, 'reset_at': quota_info.get('reset_at'), 'limit': quota_info.get('limit'), 'count': quota_info.get('count') }, room=f"user_{getattr(web_terminal, 'username', '')}") sender('quota_exceeded', { 'type': quota_type, 'reset_at': quota_info.get('reset_at') }) sender('error', { 'message': "配额已达到上限,暂时无法继续调用模型。", 'quota': quota_info }) return print(f"[API] 第{iteration + 1}次调用 (总工具调用: {total_tool_calls}/{MAX_TOTAL_TOOL_CALLS})") # 收集流式响应 async for chunk in web_terminal.api_client.chat(messages, tools, stream=True): chunk_count += 1 # 检查停止标志 client_stop_info = stop_flags.get(client_sid) if client_stop_info: stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info if stop_requested: debug_log(f"检测到停止请求,中断流处理") if pending_append: append_result = await finalize_pending_append(full_response, False, finish_reason="user_stop") break if pending_modify: modify_result = await finalize_pending_modify(full_response, False, finish_reason="user_stop") break # 先尝试记录 usage(有些平台会在最后一个 chunk 里携带 usage 但 choices 为空) usage_info = chunk.get("usage") if usage_info: last_usage_payload = usage_info if "choices" not in chunk: debug_log(f"Chunk {chunk_count}: 无choices字段") continue if not chunk.get("choices"): debug_log(f"Chunk {chunk_count}: choices为空列表") continue choice = chunk["choices"][0] if not usage_info and isinstance(choice, dict) and choice.get("usage"): # 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回) last_usage_payload = choice.get("usage") delta = choice.get("delta", {}) finish_reason = choice.get("finish_reason") if finish_reason: last_finish_reason = finish_reason # 处理思考内容 if "reasoning_content" in delta: reasoning_content = delta["reasoning_content"] if reasoning_content: reasoning_chunks += 1 debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符") if not thinking_started: in_thinking = True thinking_started = True sender('thinking_start', {}) await asyncio.sleep(0.05) current_thinking += reasoning_content sender('thinking_chunk', {'content': reasoning_content}) # 处理正常内容 if "content" in delta: content = delta["content"] if content: content_chunks += 1 debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}") if in_thinking and not thinking_ended: in_thinking = False thinking_ended = True sender('thinking_end', {'full_content': current_thinking}) await asyncio.sleep(0.1) expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None)) expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None)) if pending_modify: if not pending_modify.get("start_seen"): probe_buffer = pending_modify.get("probe_buffer", "") + content if len(probe_buffer) > 10000: probe_buffer = probe_buffer[-10000:] marker = pending_modify.get("start_marker") marker_index = probe_buffer.find(marker) if marker_index == -1: pending_modify["probe_buffer"] = probe_buffer continue after_marker = marker_index + len(marker) remainder = probe_buffer[after_marker:] pending_modify["buffer"] = remainder pending_modify["raw_buffer"] = marker + remainder pending_modify["start_seen"] = True pending_modify["detected_blocks"] = set() pending_modify["probe_buffer"] = "" if pending_modify.get("display_id"): sender('update_action', { 'id': pending_modify["display_id"], 'status': 'running', 'preparing_id': pending_modify.get("tool_call_id"), 'message': f"正在修改 {pending_modify['path']}..." }) else: pending_modify["buffer"] += content pending_modify["raw_buffer"] += content if pending_modify.get("start_seen"): block_text = pending_modify["buffer"] for match in re.finditer(r"\[replace:(\d+)\]", block_text): try: block_index = int(match.group(1)) except ValueError: continue detected_blocks = pending_modify.setdefault("detected_blocks", set()) if block_index not in detected_blocks: detected_blocks.add(block_index) if pending_modify.get("display_id"): sender('update_action', { 'id': pending_modify["display_id"], 'status': 'running', 'preparing_id': pending_modify.get("tool_call_id"), 'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..." }) if pending_modify.get("start_seen"): end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) if end_pos != -1: pending_modify["end_index"] = end_pos modify_break_triggered = True debug_log("检测到<<>>,即将终止流式输出并应用修改") break continue elif expecting_modify: modify_probe_buffer += content if len(modify_probe_buffer) > 10000: modify_probe_buffer = modify_probe_buffer[-10000:] marker_match = re.search(r"<<>>", modify_probe_buffer) if marker_match: detected_raw_path = marker_match.group(1) detected_path = detected_raw_path.strip() marker_full = marker_match.group(0) after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full) remainder = modify_probe_buffer[after_marker_index:] modify_probe_buffer = "" if not detected_path: debug_log("检测到 MODIFY 起始标记但路径为空,忽略。") continue pending_modify = { "path": detected_path, "tool_call_id": None, "buffer": remainder, "raw_buffer": marker_full + remainder, "start_marker": marker_full, "end_marker": "<<>>", "start_seen": True, "end_index": None, "display_id": None, "detected_blocks": set() } if hasattr(web_terminal, "pending_modify_request"): web_terminal.pending_modify_request = {"path": detected_path} debug_log(f"直接检测到modify起始标记,构建修改缓冲: {detected_path}") end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) if end_pos != -1: pending_modify["end_index"] = end_pos modify_break_triggered = True debug_log("检测到<<>>,即将终止流式输出并应用修改") break continue if pending_append: pending_append["buffer"] += content if pending_append.get("content_start") is None: marker_index = pending_append["buffer"].find(pending_append["start_marker"]) if marker_index != -1: pending_append["content_start"] = marker_index + len(pending_append["start_marker"]) debug_log(f"检测到追加起始标识: {pending_append['start_marker']}") if pending_append.get("content_start") is not None: end_index = pending_append["buffer"].find( pending_append["end_marker"], pending_append["content_start"] ) if end_index != -1: pending_append["end_index"] = end_index append_break_triggered = True debug_log("检测到<<>>,即将终止流式输出并写入文件") break # 继续累积追加内容 continue elif expecting_append: append_probe_buffer += content # 限制缓冲区大小防止过长 if len(append_probe_buffer) > 10000: append_probe_buffer = append_probe_buffer[-10000:] marker_match = re.search(r"<<>>", append_probe_buffer) if marker_match: detected_raw_path = marker_match.group(1) detected_path = detected_raw_path.strip() if not detected_path: append_probe_buffer = append_probe_buffer[marker_match.end():] continue marker_full = marker_match.group(0) after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full) remainder = append_probe_buffer[after_marker_index:] append_probe_buffer = "" pending_append = { "path": detected_path, "tool_call_id": None, "buffer": remainder, "start_marker": marker_full, "end_marker": "<<>>", "content_start": 0, "end_index": None, "display_id": None } if hasattr(web_terminal, "pending_append_request"): web_terminal.pending_append_request = {"path": detected_path} debug_log(f"直接检测到append起始标记,构建追加缓冲: {detected_path}") # 检查是否立即包含结束标记 if pending_append["buffer"]: end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"]) if end_index != -1: pending_append["end_index"] = end_index append_break_triggered = True debug_log("检测到<<>>,即将终止流式输出并写入文件") break continue if not text_started: text_started = True text_streaming = True sender('text_start', {}) brief_log("模型输出了内容") await asyncio.sleep(0.05) if not pending_append: full_response += content accumulated_response += content text_has_content = True emit_time = time.time() elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time last_text_chunk_time = emit_time text_chunk_index += 1 log_backend_chunk( conversation_id, iteration + 1, text_chunk_index, elapsed, len(content), content[:32] ) sender('text_chunk', { 'content': content, 'index': text_chunk_index, 'elapsed': elapsed }) # 收集工具调用 - 实时发送准备状态 if "tool_calls" in delta: tool_chunks += 1 for tc in delta["tool_calls"]: found = False for existing in tool_calls: if existing.get("index") == tc.get("index"): if "function" in tc and "arguments" in tc["function"]: arg_chunk = tc["function"]["arguments"] existing_fn = existing.get("function", {}) existing_args = existing_fn.get("arguments", "") existing_fn["arguments"] = (existing_args or "") + arg_chunk existing["function"] = existing_fn combined_args = existing_fn.get("arguments", "") tool_id = existing.get("id") or tc.get("id") tool_name = ( existing_fn.get("name") or tc.get("function", {}).get("name", "") ) intent_value = extract_intent_from_partial(combined_args) if ( intent_value and tool_id and detected_tool_intent.get(tool_id) != intent_value ): detected_tool_intent[tool_id] = intent_value brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}") sender('tool_intent', { 'id': tool_id, 'name': tool_name, 'intent': intent_value, 'conversation_id': conversation_id }) debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") await asyncio.sleep(0.01) found = True break if not found and tc.get("id"): tool_id = tc["id"] tool_name = tc.get("function", {}).get("name", "") arguments_str = tc.get("function", {}).get("arguments", "") or "" # 新工具检测到,立即发送准备事件 if tool_id not in detected_tools and tool_name: detected_tools[tool_id] = tool_name # 尝试提前提取 intent intent_value = None if arguments_str: intent_value = extract_intent_from_partial(arguments_str) if intent_value: detected_tool_intent[tool_id] = intent_value brief_log(f"[intent] 预提取 {tool_name}: {intent_value}") # 立即发送工具准备中事件 brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}") sender('tool_preparing', { 'id': tool_id, 'name': tool_name, 'message': f'准备调用 {tool_name}...', 'intent': intent_value, 'conversation_id': conversation_id }) debug_log(f" 发送工具准备事件: {tool_name}") await asyncio.sleep(0.1) tool_calls.append({ "id": tool_id, "index": tc.get("index"), "type": "function", "function": { "name": tool_name, "arguments": arguments_str } }) # 尝试从增量参数中抽取 intent,并单独推送 if tool_id and arguments_str: intent_value = extract_intent_from_partial(arguments_str) if intent_value and detected_tool_intent.get(tool_id) != intent_value: detected_tool_intent[tool_id] = intent_value sender('tool_intent', { 'id': tool_id, 'name': tool_name, 'intent': intent_value, 'conversation_id': conversation_id }) debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") await asyncio.sleep(0.01) debug_log(f" 新工具: {tool_name}") # 检查是否被停止 client_stop_info = stop_flags.get(client_sid) if client_stop_info: stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info if stop_requested: debug_log("任务在流处理完成后检测到停止状态") return # === API响应完成后只计算输出token === if last_usage_payload: try: web_terminal.context_manager.apply_usage_statistics(last_usage_payload) debug_log( f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, " f"completion={last_usage_payload.get('completion_tokens', 0)}, " f"total={last_usage_payload.get('total_tokens', 0)}" ) except Exception as e: debug_log(f"Usage统计更新失败: {e}") else: debug_log("未获取到usage字段,跳过token统计更新") # 流结束后的处理 debug_log(f"\n流结束统计:") debug_log(f" 总chunks: {chunk_count}") debug_log(f" 思考chunks: {reasoning_chunks}") debug_log(f" 内容chunks: {content_chunks}") debug_log(f" 工具chunks: {tool_chunks}") debug_log(f" 收集到的思考: {len(current_thinking)} 字符") debug_log(f" 收集到的正文: {len(full_response)} 字符") debug_log(f" 收集到的工具: {len(tool_calls)} 个") if not append_result["handled"] and pending_append: append_result = await finalize_pending_append(full_response, True, finish_reason=last_finish_reason) if not modify_result["handled"] and pending_modify: modify_result = await finalize_pending_modify(full_response, True, finish_reason=last_finish_reason) # 结束未完成的流 if in_thinking and not thinking_ended: sender('thinking_end', {'full_content': current_thinking}) await asyncio.sleep(0.1) # 确保text_end事件被发送 if text_started and text_has_content and not append_result["handled"] and not modify_result["handled"]: debug_log(f"发送text_end事件,完整内容长度: {len(full_response)}") sender('text_end', {'full_content': full_response}) await asyncio.sleep(0.1) text_streaming = False if full_response.strip(): debug_log(f"流式文本内容长度: {len(full_response)} 字符") if append_result["handled"]: append_metadata = append_result.get("assistant_metadata") append_content_text = append_result.get("assistant_content") if append_content_text: web_terminal.context_manager.add_conversation( "assistant", append_content_text, metadata=append_metadata ) debug_log("💾 增量保存:追加正文快照") payload_info = append_metadata.get("append_payload") if append_metadata else {} sender('append_payload', { 'path': payload_info.get("path") or append_result.get("path"), 'forced': payload_info.get("forced", False), 'lines': payload_info.get("lines"), 'bytes': payload_info.get("bytes"), 'tool_call_id': payload_info.get("tool_call_id") or append_result.get("tool_call_id"), 'success': payload_info.get("success", append_result.get("success", False)), 'conversation_id': conversation_id }) if append_result["tool_content"]: tool_call_id = append_result.get("tool_call_id") or f"append_{int(time.time() * 1000)}" system_notice = format_tool_result_notice("append_to_file", tool_call_id, append_result["tool_content"]) web_terminal.context_manager.add_conversation("system", system_notice) append_result["tool_call_id"] = tool_call_id debug_log("💾 增量保存:append_to_file 工具结果(system 通知)") finish_reason = append_result.get("finish_reason") path_for_prompt = append_result.get("path") need_follow_prompt = ( finish_reason == "length" or append_result.get("forced") or not append_result.get("success") ) if need_follow_prompt and path_for_prompt: prompt_lines = [ f"append_to_file 在处理 {path_for_prompt} 时未完成,需要重新发起写入。" ] if finish_reason == "length": prompt_lines.append( "上一次输出达到系统单次输出上限,已写入的内容已保存。" ) if append_result.get("forced"): prompt_lines.append( "收到的内容缺少 <<>> 标记,系统依据流式结束位置落盘。" ) if not append_result.get("success"): prompt_lines.append("系统未能识别有效的追加标记。") prompt_lines.append( "请再次调用 append_to_file 工具获取新的写入窗口,并在工具调用的输出中遵循以下格式:" ) prompt_lines.append(f"<<>>") prompt_lines.append("...填写剩余正文,如内容已完成可留空...") prompt_lines.append("<<>>") prompt_lines.append("不要在普通回复中粘贴上述标记,必须通过 append_to_file 工具发送。") follow_prompt = "\n".join(prompt_lines) messages.append({ "role": "system", "content": follow_prompt }) web_terminal.context_manager.add_conversation("system", follow_prompt) debug_log("已注入追加任务提示") if append_result["handled"] and append_result.get("forced") and append_result.get("success"): mark_force_thinking(web_terminal, reason="append_forced_finish") if append_result["handled"] and not append_result.get("success"): sender('system_message', { 'content': f'⚠️ 追加写入失败:{append_result.get("error")}' }) maybe_mark_failure_from_message(web_terminal, f'⚠️ 追加写入失败:{append_result.get("error")}') mark_force_thinking(web_terminal, reason="append_failed") if modify_result["handled"]: modify_metadata = modify_result.get("assistant_metadata") modify_content_text = modify_result.get("assistant_content") if modify_content_text: web_terminal.context_manager.add_conversation( "assistant", modify_content_text, metadata=modify_metadata ) debug_log("💾 增量保存:修改正文快照") payload_info = modify_metadata.get("modify_payload") if modify_metadata else {} sender('modify_payload', { 'path': payload_info.get("path") or modify_result.get("path"), 'total': payload_info.get("total_blocks") or modify_result.get("total_blocks"), 'completed': payload_info.get("completed") or modify_result.get("completed_blocks"), 'failed': payload_info.get("failed") or modify_result.get("failed_blocks"), 'forced': payload_info.get("forced", modify_result.get("forced", False)), 'success': modify_result.get("success", False), 'conversation_id': conversation_id }) if modify_result["tool_content"]: tool_call_id = modify_result.get("tool_call_id") or f"modify_{int(time.time() * 1000)}" system_notice = format_tool_result_notice("modify_file", tool_call_id, modify_result["tool_content"]) web_terminal.context_manager.add_conversation("system", system_notice) modify_result["tool_call_id"] = tool_call_id debug_log("💾 增量保存:modify_file 工具结果(system 通知)") path_for_prompt = modify_result.get("path") failed_blocks = modify_result.get("failed_blocks") or [] need_follow_prompt = modify_result.get("forced") or bool(failed_blocks) if need_follow_prompt and path_for_prompt: prompt_lines = [ f"modify_file 在处理 {path_for_prompt} 时未完成,需要重新发起补丁。" ] if modify_result.get("forced"): prompt_lines.append( "刚才的内容缺少 <<>> 标记,系统仅应用了已识别的部分。" ) if failed_blocks: failed_text = "、".join(str(idx) for idx in failed_blocks) prompt_lines.append(f"以下补丁未成功:第 {failed_text} 处。") prompt_lines.append( "请再次调用 modify_file 工具,并在新的工具调用中按以下模板提供完整补丁:" ) prompt_lines.append(f"<<>>") prompt_lines.append("[replace:序号]") prompt_lines.append("<>") prompt_lines.append("...原文(必须逐字匹配,包含全部缩进、空格和换行)...") prompt_lines.append("<>") prompt_lines.append("<>") prompt_lines.append("...新内容,可留空表示清空,注意保持结构完整...") prompt_lines.append("<>") prompt_lines.append("[/replace]") prompt_lines.append("<<>>") prompt_lines.append("请勿在普通回复中直接粘贴补丁,必须通过 modify_file 工具发送。") follow_prompt = "\n".join(prompt_lines) messages.append({ "role": "system", "content": follow_prompt }) web_terminal.context_manager.add_conversation("system", follow_prompt) debug_log("已注入修改任务提示") if modify_result["handled"] and modify_result.get("failed_blocks"): mark_force_thinking(web_terminal, reason="modify_partial_failure") if modify_result["handled"] and modify_result.get("forced") and modify_result.get("success"): mark_force_thinking(web_terminal, reason="modify_forced_finish") if modify_result["handled"] and not modify_result.get("success"): error_message = modify_result.get("summary_message") or modify_result.get("error") or "修改操作未成功,请根据提示重新执行。" sender('system_message', { 'content': f'⚠️ 修改操作存在未完成的内容:{error_message}' }) maybe_mark_failure_from_message(web_terminal, f'⚠️ 修改操作存在未完成的内容:{error_message}') mark_force_thinking(web_terminal, reason="modify_failed") if web_terminal.api_client.last_call_used_thinking and current_thinking: web_terminal.api_client.current_task_thinking = current_thinking or "" if web_terminal.api_client.current_task_first_call: web_terminal.api_client.current_task_first_call = False update_thinking_after_call(web_terminal) # 检测是否有格式错误的工具调用 if not tool_calls and full_response and AUTO_FIX_TOOL_CALL and not append_result["handled"] and not modify_result["handled"]: if detect_malformed_tool_call(full_response): auto_fix_attempts += 1 if auto_fix_attempts <= AUTO_FIX_MAX_ATTEMPTS: debug_log(f"检测到格式错误的工具调用,尝试自动修复 (尝试 {auto_fix_attempts}/{AUTO_FIX_MAX_ATTEMPTS})") fix_message = "你使用了错误的格式输出工具调用。请使用正确的工具调用格式而不是直接输出JSON。根据当前进度继续执行任务。" sender('system_message', { 'content': f'⚠️ 自动修复: {fix_message}' }) maybe_mark_failure_from_message(web_terminal, f'⚠️ 自动修复: {fix_message}') messages.append({ "role": "user", "content": fix_message }) await asyncio.sleep(1) continue else: debug_log(f"自动修复尝试已达上限 ({AUTO_FIX_MAX_ATTEMPTS})") sender('system_message', { 'content': f'⌘ 工具调用格式错误,自动修复失败。请手动检查并重试。' }) maybe_mark_failure_from_message(web_terminal, '⌘ 工具调用格式错误,自动修复失败。请手动检查并重试。') break # 构建助手消息(用于API继续对话) assistant_content_parts = [] if full_response: assistant_content_parts.append(full_response) elif append_result["handled"] and append_result["assistant_content"]: assistant_content_parts.append(append_result["assistant_content"]) elif modify_result["handled"] and modify_result.get("assistant_content"): assistant_content_parts.append(modify_result["assistant_content"]) assistant_content = "\n".join(assistant_content_parts) if assistant_content_parts else "" # 添加到消息历史(用于API继续对话,不保存到文件) assistant_message = { "role": "assistant", "content": assistant_content, "tool_calls": tool_calls } if current_thinking: assistant_message["reasoning_content"] = current_thinking messages.append(assistant_message) if assistant_content or current_thinking or tool_calls: web_terminal.context_manager.add_conversation( "assistant", assistant_content, tool_calls=tool_calls if tool_calls else None, reasoning_content=current_thinking or None ) # 为下一轮迭代重置流状态标志,但保留 full_response 供上面保存使用 text_streaming = False text_started = False text_has_content = False full_response = "" if append_result["handled"] and append_result.get("tool_content"): tool_call_id = append_result.get("tool_call_id") or f"append_{int(time.time() * 1000)}" system_notice = format_tool_result_notice("append_to_file", tool_call_id, append_result["tool_content"]) messages.append({ "role": "system", "content": system_notice }) append_result["tool_call_id"] = tool_call_id debug_log("已将 append_to_file 工具结果以 system 形式追加到对话上下文") if modify_result["handled"] and modify_result.get("tool_content"): tool_call_id = modify_result.get("tool_call_id") or f"modify_{int(time.time() * 1000)}" system_notice = format_tool_result_notice("modify_file", tool_call_id, modify_result["tool_content"]) messages.append({ "role": "system", "content": system_notice }) modify_result["tool_call_id"] = tool_call_id debug_log("已将 modify_file 工具结果以 system 形式追加到对话上下文") force_continue = append_result["handled"] or modify_result["handled"] if force_continue: if append_result["handled"]: debug_log("append_to_file 已处理,继续下一轮以让模型返回确认回复") elif modify_result["handled"]: debug_log("modify_file 已处理,继续下一轮以让模型返回确认回复") else: debug_log("补丁处理完成,继续下一轮以获取模型回复") continue if not tool_calls: debug_log("没有工具调用,结束迭代") break # 检查连续相同工具调用 for tc in tool_calls: tool_name = tc["function"]["name"] if tool_name == last_tool_name: consecutive_same_tool[tool_name] += 1 if consecutive_same_tool[tool_name] >= MAX_CONSECUTIVE_SAME_TOOL: debug_log(f"警告: 连续调用相同工具 {tool_name} 已达 {MAX_CONSECUTIVE_SAME_TOOL} 次") sender('system_message', { 'content': f'⚠️ 检测到重复调用 {tool_name} 工具 {MAX_CONSECUTIVE_SAME_TOOL} 次,可能存在循环。' }) maybe_mark_failure_from_message(web_terminal, f'⚠️ 检测到重复调用 {tool_name} 工具 {MAX_CONSECUTIVE_SAME_TOOL} 次,可能存在循环。') if consecutive_same_tool[tool_name] >= MAX_CONSECUTIVE_SAME_TOOL + 2: debug_log(f"终止: 工具 {tool_name} 调用次数过多") sender('system_message', { 'content': f'⌘ 工具 {tool_name} 重复调用过多,任务终止。' }) maybe_mark_failure_from_message(web_terminal, f'⌘ 工具 {tool_name} 重复调用过多,任务终止。') break else: consecutive_same_tool.clear() consecutive_same_tool[tool_name] = 1 last_tool_name = tool_name # 更新统计 total_tool_calls += len(tool_calls) # 执行每个工具 for tool_call in tool_calls: # 检查停止标志 client_stop_info = stop_flags.get(client_sid) if client_stop_info: stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info if stop_requested: debug_log("在工具调用过程中检测到停止状态") return # 工具调用间隔控制 current_time = time.time() if last_tool_call_time > 0: elapsed = current_time - last_tool_call_time if elapsed < TOOL_CALL_COOLDOWN: await asyncio.sleep(TOOL_CALL_COOLDOWN - elapsed) last_tool_call_time = time.time() function_name = tool_call["function"]["name"] arguments_str = tool_call["function"]["arguments"] tool_call_id = tool_call["id"] debug_log(f"准备解析JSON,工具: {function_name}, 参数长度: {len(arguments_str)}") debug_log(f"JSON参数前200字符: {arguments_str[:200]}") debug_log(f"JSON参数后200字符: {arguments_str[-200:]}") # 使用改进的参数解析方法 if hasattr(web_terminal, 'api_client') and hasattr(web_terminal.api_client, '_safe_tool_arguments_parse'): success, arguments, error_msg = web_terminal.api_client._safe_tool_arguments_parse(arguments_str, function_name) if not success: debug_log(f"安全解析失败: {error_msg}") error_text = f'工具参数解析失败: {error_msg}' error_payload = { "success": False, "error": error_text, "error_type": "parameter_format_error", "tool_name": function_name, "tool_call_id": tool_call_id, "message": error_text } sender('error', {'message': error_text}) sender('update_action', { 'preparing_id': tool_call_id, 'status': 'completed', 'result': error_payload, 'message': error_text }) error_content = json.dumps(error_payload, ensure_ascii=False) web_terminal.context_manager.add_conversation( "tool", error_content, tool_call_id=tool_call_id, name=function_name ) messages.append({ "role": "tool", "tool_call_id": tool_call_id, "name": function_name, "content": error_content }) continue debug_log(f"使用安全解析成功,参数键: {list(arguments.keys())}") else: # 回退到带有基本修复逻辑的解析 try: arguments = json.loads(arguments_str) if arguments_str.strip() else {} debug_log(f"直接JSON解析成功,参数键: {list(arguments.keys())}") except json.JSONDecodeError as e: debug_log(f"原始JSON解析失败: {e}") # 尝试基本的JSON修复 repaired_str = arguments_str.strip() repair_attempts = [] # 修复1: 未闭合字符串 if repaired_str.count('"') % 2 == 1: repaired_str += '"' repair_attempts.append("添加闭合引号") # 修复2: 未闭合JSON对象 if repaired_str.startswith('{') and not repaired_str.rstrip().endswith('}'): repaired_str = repaired_str.rstrip() + '}' repair_attempts.append("添加闭合括号") # 修复3: 截断的JSON(移除不完整的最后一个键值对) if not repair_attempts: # 如果前面的修复都没用上 last_comma = repaired_str.rfind(',') if last_comma > 0: repaired_str = repaired_str[:last_comma] + '}' repair_attempts.append("移除不完整的键值对") # 尝试解析修复后的JSON try: arguments = json.loads(repaired_str) debug_log(f"JSON修复成功: {', '.join(repair_attempts)}") debug_log(f"修复后参数键: {list(arguments.keys())}") except json.JSONDecodeError as repair_error: debug_log(f"JSON修复也失败: {repair_error}") debug_log(f"修复尝试: {repair_attempts}") debug_log(f"修复后内容前100字符: {repaired_str[:100]}") error_text = f'工具参数解析失败: {e}' error_payload = { "success": False, "error": error_text, "error_type": "parameter_format_error", "tool_name": function_name, "tool_call_id": tool_call_id, "message": error_text } sender('error', {'message': error_text}) sender('update_action', { 'preparing_id': tool_call_id, 'status': 'completed', 'result': error_payload, 'message': error_text }) error_content = json.dumps(error_payload, ensure_ascii=False) web_terminal.context_manager.add_conversation( "tool", error_content, tool_call_id=tool_call_id, name=function_name ) messages.append({ "role": "tool", "tool_call_id": tool_call_id, "name": function_name, "content": error_content }) continue debug_log(f"执行工具: {function_name} (ID: {tool_call_id})") # 发送工具开始事件 tool_display_id = f"tool_{iteration}_{function_name}_{time.time()}" monitor_snapshot = None snapshot_path = None memory_snapshot_type = None if function_name in MONITOR_FILE_TOOLS: snapshot_path = resolve_monitor_path(arguments) monitor_snapshot = capture_monitor_snapshot(snapshot_path) if monitor_snapshot: cache_monitor_snapshot(tool_display_id, 'before', monitor_snapshot) elif function_name in MONITOR_MEMORY_TOOLS: memory_snapshot_type = (arguments.get('memory_type') or 'main').lower() before_entries = None try: before_entries = resolve_monitor_memory(web_terminal.memory_manager._read_entries(memory_snapshot_type)) except Exception as exc: debug_log(f"[MonitorSnapshot] 读取记忆失败: {memory_snapshot_type} ({exc})") if before_entries is not None: monitor_snapshot = { 'memory_type': memory_snapshot_type, 'entries': before_entries } cache_monitor_snapshot(tool_display_id, 'before', monitor_snapshot) sender('tool_start', { 'id': tool_display_id, 'name': function_name, 'arguments': arguments, 'preparing_id': tool_call_id, 'monitor_snapshot': monitor_snapshot, 'conversation_id': conversation_id }) brief_log(f"调用了工具: {function_name}") await asyncio.sleep(0.3) start_time = time.time() # 执行工具 tool_result = await web_terminal.handle_tool_call(function_name, arguments) debug_log(f"工具结果: {tool_result[:200]}...") execution_time = time.time() - start_time if execution_time < 1.5: await asyncio.sleep(1.5 - execution_time) # 更新工具状态 result_data = {} try: result_data = json.loads(tool_result) except: result_data = {'output': tool_result} tool_failed = detect_tool_failure(result_data) action_status = 'completed' action_message = None awaiting_flag = False if function_name in {"write_file", "edit_file"}: diff_path = result_data.get("path") or arguments.get("file_path") summary = result_data.get("summary") or result_data.get("message") if summary: action_message = summary debug_log(f"{function_name} 执行完成: {summary or '无摘要'}") if function_name == "wait_sub_agent": system_msg = result_data.get("system_message") if system_msg: messages.append({ "role": "system", "content": system_msg }) sender('system_message', { 'content': system_msg, 'inline': False }) maybe_mark_failure_from_message(web_terminal, system_msg) monitor_snapshot_after = None if function_name in MONITOR_FILE_TOOLS: result_path = None if isinstance(result_data, dict): result_path = resolve_monitor_path(result_data) if not result_path: candidate_path = result_data.get('path') if isinstance(candidate_path, str) and candidate_path.strip(): result_path = candidate_path.strip() if not result_path: result_path = resolve_monitor_path(arguments, snapshot_path) or snapshot_path monitor_snapshot_after = capture_monitor_snapshot(result_path) elif function_name in MONITOR_MEMORY_TOOLS: memory_after_type = str( arguments.get('memory_type') or (isinstance(result_data, dict) and result_data.get('memory_type')) or memory_snapshot_type or 'main' ).lower() after_entries = None try: after_entries = resolve_monitor_memory(web_terminal.memory_manager._read_entries(memory_after_type)) except Exception as exc: debug_log(f"[MonitorSnapshot] 读取记忆失败(after): {memory_after_type} ({exc})") if after_entries is not None: monitor_snapshot_after = { 'memory_type': memory_after_type, 'entries': after_entries } update_payload = { 'id': tool_display_id, 'status': action_status, 'result': result_data, 'preparing_id': tool_call_id, 'conversation_id': conversation_id } if action_message: update_payload['message'] = action_message if awaiting_flag: update_payload['awaiting_content'] = True if monitor_snapshot_after: update_payload['monitor_snapshot_after'] = monitor_snapshot_after cache_monitor_snapshot(tool_display_id, 'after', monitor_snapshot_after) sender('update_action', update_payload) if function_name in ['create_file', 'delete_file', 'rename_file', 'create_folder']: structure = web_terminal.context_manager.get_project_structure() sender('file_tree_update', structure) # ===== 增量保存:立即保存工具结果 ===== metadata_payload = None if isinstance(result_data, dict): # 特殊处理 web_search:保留可供前端渲染的精简结构,以便历史记录复现搜索结果 if function_name == "web_search": try: tool_result_content = json.dumps(compact_web_search_result(result_data), ensure_ascii=False) except Exception: tool_result_content = tool_result else: tool_result_content = format_tool_result_for_context(function_name, result_data, tool_result) metadata_payload = {"tool_payload": result_data} else: tool_result_content = tool_result # 立即保存工具结果 web_terminal.context_manager.add_conversation( "tool", tool_result_content, tool_call_id=tool_call_id, name=function_name, metadata=metadata_payload ) debug_log(f"💾 增量保存:工具结果 {function_name}") system_message = result_data.get("system_message") if isinstance(result_data, dict) else None if system_message: web_terminal._record_sub_agent_message(system_message, result_data.get("task_id"), inline=False) maybe_mark_failure_from_message(web_terminal, system_message) # 添加到消息历史(用于API继续对话) messages.append({ "role": "tool", "tool_call_id": tool_call_id, "name": function_name, "content": tool_result_content }) # 收集图片注入请求,延后统一追加 if ( function_name == "view_image" and getattr(web_terminal, "pending_image_view", None) and not tool_failed and (isinstance(result_data, dict) and result_data.get("success") is not False) ): inj = web_terminal.pending_image_view web_terminal.pending_image_view = None if inj and inj.get("path"): image_injections.append(inj["path"]) if function_name not in {'write_file', 'edit_file'}: await process_sub_agent_updates(messages, inline=True, after_tool_call_id=tool_call_id) await asyncio.sleep(0.2) if tool_failed: mark_force_thinking(web_terminal, reason=f"{function_name}_failed") # 标记不再是第一次迭代 is_first_iteration = False # 统一附加图片消息,保证所有 tool 响应先完成 if image_injections: for img_path in image_injections: injected_text = "这是一条系统控制发送的信息,并非用户主动发送,目的是返回你需要查看的图片。" web_terminal.context_manager.add_conversation( "user", injected_text, images=[img_path], metadata={"system_injected_image": True} ) content_payload = web_terminal.context_manager._build_content_with_images( injected_text, [img_path] ) messages.append({ "role": "user", "content": content_payload, "metadata": {"system_injected_image": True} }) sender('system_message', { 'content': f'系统已按模型请求插入图片: {img_path}' }) # 最终统计 debug_log(f"\n{'='*40}") debug_log(f"任务完成统计:") debug_log(f" 总迭代次数: {total_iterations}") debug_log(f" 总工具调用: {total_tool_calls}") debug_log(f" 自动修复尝试: {auto_fix_attempts}") debug_log(f" 累积响应: {len(accumulated_response)} 字符") debug_log(f"{'='*40}\n") # 发送完成事件 sender('task_complete', { 'total_iterations': total_iterations, 'total_tool_calls': total_tool_calls, 'auto_fix_attempts': auto_fix_attempts }) @socketio.on('send_command') def handle_command(data): """处理系统命令""" command = data.get('command', '') username, terminal, _ = get_terminal_for_sid(request.sid) if not terminal: emit('error', {'message': 'System not initialized'}) return record_user_activity(username) if command.startswith('/'): command = command[1:] parts = command.split(maxsplit=1) cmd = parts[0].lower() if cmd == "clear": terminal.context_manager.conversation_history.clear() if terminal.thinking_mode: terminal.api_client.start_new_task(force_deep=terminal.deep_thinking_mode) emit('command_result', { 'command': cmd, 'success': True, 'message': '对话已清除' }) elif cmd == "status": status = terminal.get_status() # 添加终端状态 if terminal.terminal_manager: terminal_status = terminal.terminal_manager.list_terminals() status['terminals'] = terminal_status emit('command_result', { 'command': cmd, 'success': True, 'data': status }) elif cmd == "terminals": # 列出终端会话 if terminal.terminal_manager: result = terminal.terminal_manager.list_terminals() emit('command_result', { 'command': cmd, 'success': True, 'data': result }) else: emit('command_result', { 'command': cmd, 'success': False, 'message': '终端系统未初始化' }) else: emit('command_result', { 'command': cmd, 'success': False, 'message': f'未知命令: {cmd}' }) @app.route('/api/conversations//token-statistics', methods=['GET']) @api_login_required @with_terminal def get_conversation_token_statistics(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取特定对话的token统计""" try: stats = terminal.context_manager.get_conversation_token_statistics(conversation_id) if stats: return jsonify({ "success": True, "data": stats }) else: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 except Exception as e: print(f"[API] 获取token统计错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取token统计时发生异常" }), 500 @app.route('/api/conversations//tokens', methods=['GET']) @api_login_required @with_terminal def get_conversation_tokens(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话的当前完整上下文token数(包含所有动态内容)""" try: current_tokens = terminal.context_manager.get_current_context_tokens(conversation_id) return jsonify({ "success": True, "data": { "total_tokens": current_tokens } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 def calculate_directory_size(root: Path) -> int: if not root.exists(): return 0 total = 0 stack = [root] while stack: current = stack.pop() try: with os.scandir(current) as iterator: for entry in iterator: try: if entry.is_symlink(): continue if entry.is_file(follow_symlinks=False): total += entry.stat(follow_symlinks=False).st_size elif entry.is_dir(follow_symlinks=False): stack.append(Path(entry.path)) except (OSError, FileNotFoundError, PermissionError): continue except (NotADirectoryError, FileNotFoundError, PermissionError, OSError): continue return total def iso_datetime_from_epoch(epoch: Optional[float]) -> Optional[str]: if not epoch: return None try: return datetime.utcfromtimestamp(epoch).replace(microsecond=0).isoformat() + "Z" except (ValueError, OSError): return None def compute_workspace_storage(workspace: UserWorkspace) -> Dict[str, Any]: project_bytes = calculate_directory_size(workspace.project_path) data_bytes = calculate_directory_size(workspace.data_dir) logs_bytes = calculate_directory_size(workspace.logs_dir) quarantine_bytes = calculate_directory_size(workspace.quarantine_dir) uploads_bytes = calculate_directory_size(workspace.uploads_dir) backups_bytes = calculate_directory_size(workspace.data_dir / "backups") usage_percent = None if PROJECT_MAX_STORAGE_BYTES: usage_percent = round(project_bytes / PROJECT_MAX_STORAGE_BYTES * 100, 2) if project_bytes else 0.0 status = "ok" if usage_percent is not None: if usage_percent >= 95: status = "critical" elif usage_percent >= 80: status = "warning" return { "project_bytes": project_bytes, "data_bytes": data_bytes, "logs_bytes": logs_bytes, "quarantine_bytes": quarantine_bytes, "uploads_bytes": uploads_bytes, "backups_bytes": backups_bytes, "total_bytes": project_bytes + data_bytes + logs_bytes + quarantine_bytes, "limit_bytes": PROJECT_MAX_STORAGE_BYTES, "usage_percent": usage_percent, "status": status, } def collect_usage_snapshot(username: str, workspace: UserWorkspace, role: Optional[str]) -> Dict[str, Any]: tracker = get_or_create_usage_tracker(username, workspace) stats = tracker.get_stats() quotas = stats.get("quotas") or {} windows = stats.get("windows") or {} snapshot: Dict[str, Any] = {} for metric in ("fast", "thinking", "search"): window_meta = windows.get(metric) or {} quota_meta = quotas.get(metric) or {} default_limit = QUOTA_DEFAULTS.get("default", {}).get(metric, {}).get("limit", 0) snapshot[metric] = { "count": int(window_meta.get("count", 0) or 0), "window_start": window_meta.get("window_start"), "reset_at": window_meta.get("reset_at") or quota_meta.get("reset_at"), "limit": quota_meta.get("limit", default_limit), } snapshot["role"] = role or quotas.get("role") or "user" return snapshot def _read_token_totals_file(workspace: UserWorkspace) -> Dict[str, int]: path = workspace.data_dir / "token_totals.json" if not path.exists(): return {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} try: with open(path, 'r', encoding='utf-8') as fh: payload = json.load(fh) or {} input_tokens = int(payload.get("input_tokens") or payload.get("total_input_tokens") or 0) output_tokens = int(payload.get("output_tokens") or payload.get("total_output_tokens") or 0) total_tokens = int(payload.get("total_tokens") or (input_tokens + output_tokens)) return { "input_tokens": input_tokens, "output_tokens": output_tokens, "total_tokens": total_tokens, } except (OSError, json.JSONDecodeError, ValueError) as exc: print(f"[admin] 解析 token_totals.json 失败 ({workspace.username}): {exc}") return {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} def _collect_conversation_token_totals(workspace: UserWorkspace) -> Dict[str, int]: try: manager = ConversationManager(base_dir=workspace.data_dir) stats = manager.get_statistics() or {} token_stats = stats.get("token_statistics") or {} input_tokens = int(token_stats.get("total_input_tokens") or 0) output_tokens = int(token_stats.get("total_output_tokens") or 0) total_tokens = int(token_stats.get("total_tokens") or (input_tokens + output_tokens)) return { "input_tokens": input_tokens, "output_tokens": output_tokens, "total_tokens": total_tokens, } except Exception as exc: print(f"[admin] 读取 legacy token 统计失败 ({workspace.username}): {exc}") return {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} def collect_user_token_statistics(workspace: UserWorkspace) -> Dict[str, int]: """汇总单个用户在所有对话中的token累计数据。""" file_totals = _read_token_totals_file(workspace) legacy_totals = _collect_conversation_token_totals(workspace) return { "input_tokens": max(file_totals["input_tokens"], legacy_totals["input_tokens"]), "output_tokens": max(file_totals["output_tokens"], legacy_totals["output_tokens"]), "total_tokens": max(file_totals["total_tokens"], legacy_totals["total_tokens"]), } def compute_usage_leaders(users: List[Dict[str, Any]], metric: str, top_n: int = 5) -> List[Dict[str, Any]]: ranked = sorted( ( { "username": entry["username"], "count": entry.get("usage", {}).get(metric, {}).get("count", 0), "limit": entry.get("usage", {}).get(metric, {}).get("limit"), } for entry in users ), key=lambda item: item["count"], reverse=True, ) return [row for row in ranked[:top_n] if row["count"]] def collect_user_snapshots(handle_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]: user_map = user_manager.list_users() items: List[Dict[str, Any]] = [] role_counter: Counter = Counter() usage_totals = {"fast": 0, "thinking": 0, "search": 0} token_totals = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} storage_total_bytes = 0 quarantine_total_bytes = 0 now = time.time() for username, record in user_map.items(): workspace = user_manager.ensure_user_workspace(username) storage = compute_workspace_storage(workspace) usage = collect_usage_snapshot(username, workspace, record.role) tokens = collect_user_token_statistics(workspace) storage_total_bytes += storage["total_bytes"] quarantine_total_bytes += storage["quarantine_bytes"] for metric in usage_totals: usage_totals[metric] += usage.get(metric, {}).get("count", 0) for key in token_totals: token_totals[key] += tokens.get(key, 0) normalized_role = (record.role or "user").lower() role_counter[normalized_role] += 1 handle = handle_map.get(username) handle_last = handle.get("last_active") if handle else None last_active = get_last_active_ts(username, handle_last) idle_seconds = max(0.0, now - last_active) if last_active else None items.append({ "username": username, "email": record.email, "role": record.role or "user", "created_at": record.created_at, "invite_code": record.invite_code, "storage": storage, "usage": usage, "tokens": tokens, "workspace": { "project_path": str(workspace.project_path), "data_dir": str(workspace.data_dir), "logs_dir": str(workspace.logs_dir), "uploads_dir": str(workspace.uploads_dir), }, "status": { "online": handle is not None, "container_mode": handle.get("mode") if handle else None, "last_active": iso_datetime_from_epoch(last_active), "idle_seconds": idle_seconds, }, }) items.sort(key=lambda entry: entry["username"]) return { "items": items, "roles": dict(role_counter), "usage_totals": usage_totals, "token_totals": token_totals, "storage_total_bytes": storage_total_bytes, "quarantine_total_bytes": quarantine_total_bytes, "active_users": sum(1 for entry in items if entry["status"]["online"]), "total_users": len(items), } def collect_container_snapshots(handle_map: Dict[str, Dict[str, Any]]) -> Dict[str, Any]: items: List[Dict[str, Any]] = [] cpu_values: List[float] = [] mem_percent_values: List[float] = [] total_mem_used = 0 total_mem_limit = 0 total_net_rx = 0 total_net_tx = 0 docker_count = 0 failure_count = 0 now = time.time() for username, handle in sorted(handle_map.items()): try: status = container_manager.get_container_status(username) except Exception as exc: status = { "username": username, "mode": handle.get("mode"), "error": str(exc), "workspace_path": handle.get("workspace_path"), } stats = status.get("stats") or {} state = status.get("state") or {} if status.get("mode") == "docker": docker_count += 1 last_active = get_last_active_ts(username, handle.get("last_active")) idle_seconds = max(0.0, now - last_active) if last_active else None entry = { "username": username, "mode": status.get("mode", handle.get("mode")), "workspace_path": status.get("workspace_path") or handle.get("workspace_path"), "container_name": status.get("container_name") or handle.get("container_name"), "created_at": iso_datetime_from_epoch(status.get("created_at") or handle.get("created_at")), "last_active": iso_datetime_from_epoch(status.get("last_active") or last_active), "idle_seconds": idle_seconds, "stats": stats, "state": state, "error": status.get("error"), } if entry["error"] or (state and not state.get("running", True)): failure_count += 1 mem_info = stats.get("memory") or {} net_info = stats.get("net_io") or {} cpu_val = stats.get("cpu_percent") mem_percent = mem_info.get("percent") mem_used = mem_info.get("used_bytes") mem_limit = mem_info.get("limit_bytes") rx_bytes = net_info.get("rx_bytes") tx_bytes = net_info.get("tx_bytes") if isinstance(cpu_val, (int, float)): cpu_values.append(cpu_val) if isinstance(mem_percent, (int, float)): mem_percent_values.append(mem_percent) if isinstance(mem_used, (int, float)): total_mem_used += mem_used if isinstance(mem_limit, (int, float)): total_mem_limit += mem_limit if isinstance(rx_bytes, (int, float)): total_net_rx += rx_bytes if isinstance(tx_bytes, (int, float)): total_net_tx += tx_bytes items.append(entry) active_total = len(handle_map) summary = { "active": active_total, "docker": docker_count, "host": active_total - docker_count, "issues": failure_count, "max_containers": container_manager.max_containers, "available_slots": max(0, container_manager.max_containers - active_total) if container_manager.max_containers > 0 else None, "avg_cpu_percent": round(sum(cpu_values) / len(cpu_values), 2) if cpu_values else None, "avg_mem_percent": round(sum(mem_percent_values) / len(mem_percent_values), 2) if mem_percent_values else None, "total_mem_used_bytes": total_mem_used, "total_mem_limit_bytes": total_mem_limit, "net_rx_bytes": total_net_rx, "net_tx_bytes": total_net_tx, } return {"items": items, "summary": summary} def parse_upload_line(line: str) -> Optional[Dict[str, Any]]: marker = "UPLOAD_AUDIT " idx = line.find(marker) if idx == -1: return None payload = line[idx + len(marker):].strip() try: data = json.loads(payload) except json.JSONDecodeError: return None timestamp_value = data.get("timestamp") timestamp_dt = None if isinstance(timestamp_value, str): try: timestamp_dt = datetime.fromisoformat(timestamp_value) except ValueError: timestamp_dt = None data["_dt"] = timestamp_dt return data def collect_upload_events(limit: int = RECENT_UPLOAD_EVENT_LIMIT) -> List[Dict[str, Any]]: base_dir = (Path(LOGS_DIR).expanduser().resolve() / UPLOAD_SCAN_LOG_SUBDIR).resolve() events: List[Dict[str, Any]] = [] if not base_dir.exists(): return [] for log_file in sorted(base_dir.glob('*.log')): buffer: deque = deque(maxlen=limit) try: with open(log_file, 'r', encoding='utf-8') as fh: for line in fh: if 'UPLOAD_AUDIT' not in line: continue buffer.append(line.strip()) except OSError: continue for raw in buffer: event = parse_upload_line(raw) if event: events.append(event) events.sort(key=lambda item: item.get('_dt') or datetime.min, reverse=True) return events[:limit] def summarize_upload_events(events: List[Dict[str, Any]], quarantine_total_bytes: int) -> Dict[str, Any]: now = datetime.utcnow() cutoff = now - timedelta(hours=24) last_24h = [evt for evt in events if evt.get('_dt') and evt['_dt'] >= cutoff] accepted_24h = sum(1 for evt in last_24h if evt.get('accepted')) blocked_24h = len(last_24h) - accepted_24h skipped_24h = sum(1 for evt in last_24h if ((evt.get('scan') or {}).get('status') == 'skipped')) source_counter = Counter((evt.get('source') or 'unknown') for evt in events) sanitized_events: List[Dict[str, Any]] = [] for evt in events[:RECENT_UPLOAD_FEED_LIMIT]: sanitized_events.append({k: v for k, v in evt.items() if k != '_dt'}) return { "stats": { "total_tracked": len(events), "last_24h": len(last_24h), "accepted_last_24h": accepted_24h, "blocked_last_24h": blocked_24h, "skipped_scan_last_24h": skipped_24h, "quarantine_bytes": quarantine_total_bytes, }, "recent_events": sanitized_events, "sources": [{"source": src, "count": count} for src, count in source_counter.most_common()], } def summarize_invite_codes(codes: List[Dict[str, Any]]) -> Dict[str, int]: active = consumed = unlimited = 0 for code in codes: remaining = code.get('remaining') if remaining is None: unlimited += 1 elif remaining > 0: active += 1 else: consumed += 1 return { "total": len(codes), "active": active, "consumed": consumed, "unlimited": unlimited, } def build_admin_dashboard_snapshot() -> Dict[str, Any]: handle_map = container_manager.list_containers() user_data = collect_user_snapshots(handle_map) container_data = collect_container_snapshots(handle_map) invite_codes = user_manager.list_invite_codes() upload_events = collect_upload_events() uploads_summary = summarize_upload_events(upload_events, user_data['quarantine_total_bytes']) overview = { "generated_at": datetime.utcnow().replace(microsecond=0).isoformat() + "Z", "totals": { "users": user_data['total_users'], "active_users": user_data['active_users'], "containers_active": container_data['summary']['active'], "containers_max": container_data['summary']['max_containers'], "available_container_slots": container_data['summary']['available_slots'], }, "roles": user_data['roles'], "usage_totals": user_data['usage_totals'], "token_totals": user_data['token_totals'], "usage_leaders": { metric: compute_usage_leaders(user_data['items'], metric) for metric in ("fast", "thinking", "search") }, "storage": { "total_bytes": user_data['storage_total_bytes'], "per_user_limit_bytes": PROJECT_MAX_STORAGE_BYTES, "project_max_mb": PROJECT_MAX_STORAGE_MB, "warning_users": [ { "username": entry['username'], "usage_percent": entry['storage']['usage_percent'], "status": entry['storage']['status'], } for entry in user_data['items'] if entry['storage']['status'] != 'ok' ], }, "containers": container_data['summary'], "invites": summarize_invite_codes(invite_codes), "uploads": uploads_summary['stats'], } return { "generated_at": overview['generated_at'], "overview": overview, "users": user_data['items'], "containers": container_data['items'], "invites": { "summary": summarize_invite_codes(invite_codes), "codes": invite_codes, }, "uploads": uploads_summary, } def initialize_system(path: str, thinking_mode: bool = False): """初始化系统(多用户版本仅负责写日志和配置)""" # 清空或创建调试日志 DEBUG_LOG_FILE.parent.mkdir(parents=True, exist_ok=True) with DEBUG_LOG_FILE.open('w', encoding='utf-8') as f: f.write(f"调试日志开始 - {datetime.now()}\n") f.write(f"项目路径: {path}\n") f.write(f"思考模式: {'思考模式' if thinking_mode else '快速模式'}\n") f.write(f"自动修复: {'开启' if AUTO_FIX_TOOL_CALL else '关闭'}\n") f.write(f"最大迭代: {MAX_ITERATIONS_PER_TASK}\n") f.write(f"最大工具调用: {MAX_TOTAL_TOOL_CALLS}\n") f.write("="*80 + "\n") print(f"[Init] 初始化Web系统...") print(f"[Init] 项目路径: {path}") print(f"[Init] 运行模式: {'思考模式(首次思考,后续快速)' if thinking_mode else '快速模式(无思考)'}") print(f"[Init] 自动修复: {'开启' if AUTO_FIX_TOOL_CALL else '关闭'}") print(f"[Init] 调试日志: {DEBUG_LOG_FILE}") app.config['DEFAULT_THINKING_MODE'] = thinking_mode app.config['DEFAULT_RUN_MODE'] = "thinking" if thinking_mode else "fast" print(f"{OUTPUT_FORMATS['success']} Web系统初始化完成(多用户模式)") def run_server(path: str, thinking_mode: bool = False, port: int = DEFAULT_PORT, debug: bool = False): """运行Web服务器""" initialize_system(path, thinking_mode) start_background_jobs() socketio.run( app, host='0.0.0.0', port=port, debug=debug, use_reloader=debug, allow_unsafe_werkzeug=True ) def parse_arguments(): parser = argparse.ArgumentParser(description="AI Agent Web Server") parser.add_argument( "--path", default=str(Path(DEFAULT_PROJECT_PATH).resolve()), help="项目工作目录(默认使用 config.DEFAULT_PROJECT_PATH)" ) parser.add_argument( "--port", type=int, default=DEFAULT_PORT, help=f"监听端口(默认 {DEFAULT_PORT})" ) parser.add_argument( "--debug", action="store_true", help="开发模式,启用 Flask/Socket.IO 热重载" ) parser.add_argument( "--thinking-mode", action="store_true", help="启用思考模式(首次请求使用 reasoning)" ) return parser.parse_args() if __name__ == "__main__": args = parse_arguments() run_server( path=args.path, thinking_mode=args.thinking_mode, port=args.port, debug=args.debug )