# web_server.py - Web服务器(修复版 - 确保text_end事件正确发送 + 停止功能) import asyncio import json import os import sys import re from typing import Dict, List, Optional, Callable, Any, Tuple from flask import Flask, request, jsonify, send_from_directory, session, redirect, send_file, Response from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect from flask_cors import CORS from werkzeug.exceptions import RequestEntityTooLarge from pathlib import Path from io import BytesIO import zipfile import argparse from functools import wraps from datetime import timedelta import time from datetime import datetime import threading from collections import defaultdict, deque from werkzeug.utils import secure_filename from werkzeug.routing import BaseConverter # 添加项目根目录到Python路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from core.web_terminal import WebTerminal from core.sub_agent_terminal import SubAgentTerminal from config import ( OUTPUT_FORMATS, AUTO_FIX_TOOL_CALL, AUTO_FIX_MAX_ATTEMPTS, MAX_ITERATIONS_PER_TASK, MAX_CONSECUTIVE_SAME_TOOL, MAX_TOTAL_TOOL_CALLS, TOOL_CALL_COOLDOWN, MAX_UPLOAD_SIZE, DEFAULT_CONVERSATIONS_LIMIT, MAX_CONVERSATIONS_LIMIT, CONVERSATIONS_DIR, DEFAULT_RESPONSE_MAX_TOKENS, DEFAULT_PROJECT_PATH, LOGS_DIR, AGENT_VERSION, SUB_AGENT_MAX_ACTIVE, SUB_AGENT_DEFAULT_TIMEOUT, SUB_AGENT_STATE_FILE, DATA_DIR, SUB_AGENT_TASKS_BASE_DIR, ) from modules.user_manager import UserManager, UserWorkspace from modules.gui_file_manager import GuiFileManager app = Flask(__name__, static_folder='static') app.config['MAX_CONTENT_LENGTH'] = MAX_UPLOAD_SIZE app.config['SECRET_KEY'] = 'your-secret-key-here' app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=12) CORS(app) socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading') SUB_AGENT_INDEX_FILE = Path(app.static_folder).resolve() / "index.html" class ConversationIdConverter(BaseConverter): regex = r'(?:conv_)?\d{8}_\d{6}_\d{3}' app.url_map.converters['conv'] = ConversationIdConverter user_manager = UserManager() user_terminals: Dict[str, WebTerminal] = {} terminal_rooms: Dict[str, set] = {} connection_users: Dict[str, str] = {} stop_flags: Dict[str, Dict[str, Any]] = {} terminated_tasks: set = set() DEFAULT_PORT = 8092 # 子智能体任务管理 sub_agent_tasks: Dict[str, Dict[str, Any]] = {} sub_agent_terminals: Dict[str, SubAgentTerminal] = {} sub_agent_rooms: Dict[str, set] = defaultdict(set) sub_agent_connections: Dict[str, str] = {} SUB_AGENT_TERMINAL_STATUSES = {"completed", "failed", "timeout"} STOPPING_GRACE_SECONDS = 30 TERMINAL_ARCHIVE_GRACE_SECONDS = 20 def format_read_file_result(result_data: Dict) -> str: """格式化 read_file 工具的输出,便于在Web端展示。""" if not isinstance(result_data, dict): return json.dumps(result_data, ensure_ascii=False) if not result_data.get("success"): return json.dumps(result_data, ensure_ascii=False) read_type = result_data.get("type", "read") truncated_note = "(内容已截断)" if result_data.get("truncated") else "" path = result_data.get("path", "未知路径") max_chars = result_data.get("max_chars") max_note = f"(max_chars={max_chars})" if max_chars else "" if read_type == "read": header = f"读取 {path} 行 {result_data.get('line_start')}~{result_data.get('line_end')} {max_note}{truncated_note}".strip() content = result_data.get("content", "") return f"{header}\n```\n{content}\n```" if read_type == "search": query = result_data.get("query", "") actual = result_data.get("actual_matches", 0) returned = result_data.get("returned_matches", 0) case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写" header = ( f"在 {path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint}) " f"{max_note}{truncated_note}" ).strip() match_texts = [] for idx, match in enumerate(result_data.get("matches", []), 1): match_note = "(片段截断)" if match.get("truncated") else "" hits = match.get("hits") or [] hit_text = ", ".join(str(h) for h in hits) if hits else "无" label = match.get("id") or f"match_{idx}" snippet = match.get("snippet", "") match_texts.append( f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```" ) if not match_texts: match_texts.append("未找到匹配内容。") return "\n".join([header] + match_texts) if read_type == "extract": segments = result_data.get("segments", []) header = f"从 {path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip() seg_texts = [] for idx, segment in enumerate(segments, 1): seg_note = "(片段截断)" if segment.get("truncated") else "" label = segment.get("label") or f"segment_{idx}" snippet = segment.get("content", "") seg_texts.append( f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```" ) if not seg_texts: seg_texts.append("未提供可抽取的片段。") return "\n".join([header] + seg_texts) return json.dumps(result_data, ensure_ascii=False) def sanitize_filename_preserve_unicode(filename: str) -> str: """在保留中文等字符的同时,移除危险字符和路径成分""" if not filename: return "" cleaned = filename.strip().replace("\x00", "") if not cleaned: return "" # 去除路径成分 cleaned = cleaned.replace("\\", "/").split("/")[-1] # 替换不安全符号 cleaned = re.sub(r'[<>:"\\|?*\n\r\t]', "_", cleaned) # 去掉前后的点避免隐藏文件/穿越 cleaned = cleaned.strip(". ") if not cleaned: return "" # Windows/Unix 通用文件名长度安全上限 return cleaned[:255] def format_tool_result_notice(tool_name: str, tool_call_id: Optional[str], content: str) -> str: """将工具执行结果转为系统消息文本,方便在对话中回传。""" header = f"[工具结果] {tool_name}" if tool_call_id: header += f" (tool_call_id={tool_call_id})" body = (content or "").strip() if not body: body = "(无附加输出)" return f"{header}\n{body}" def _format_relative_path(path: Optional[str], workspace: Optional[str]) -> str: """将绝对路径转换为相对 workspace 的表示,默认返回原始路径。""" if not path: return "" try: target = Path(path).resolve() if workspace: base = Path(workspace).resolve() rel = target.relative_to(base) rel_text = rel.as_posix() if not rel_text or rel_text == ".": return "." return f"./{rel_text}" except Exception: pass return Path(path).as_posix() def build_sub_agent_instruction(meta: Dict[str, Any]) -> str: """根据任务元数据构建初始指令消息。""" workspace_dir = meta.get("workspace_dir") references_dir = meta.get("references_dir") deliverables_dir = meta.get("deliverables_dir") workspace_label = _format_relative_path(workspace_dir, workspace_dir) or "." references_label = _format_relative_path(references_dir, workspace_dir) or "references/" deliverables_label = _format_relative_path(deliverables_dir, workspace_dir) or "deliverables/" lines = [ f"子智能体任务 - 代号 {meta.get('agent_id')} (task_id={meta.get('task_id')})", "", meta.get("summary", ""), meta.get("task", ""), "", "工作区与目录说明:", f"- workspace: {workspace_label}(唯一可写根目录,所有操作只能在此路径内进行)", f"- references: {references_label}(系统已自动创建,仅供查阅,不得修改原文件)", f"- deliverables: {deliverables_label}(系统已自动创建,用于存放交付文件与 result.md)", "", "交付要求:", "- 所有成果放入 deliverables/ 下,保持清晰的目录结构;", "- deliverables/ 必须包含 result.md,说明完成情况、交付列表、遗留风险和下一步建议;", "- references/ 只读,如需引用请复制到 workspace/ 再处理;", "- 子智能体无法与主智能体实时沟通,如信息不足请在 result.md 中说明。", "- 最后必须调用 finish_sub_agent,并在 reason 中概括完成情况;若暂未完成,也要说明阻塞原因并继续执行直至可交付。", "", "提示:references/ 与 deliverables/ 目录均已自动创建,如需额外子目录请在 workspace/ 内自行管理。", "", "请立即分析任务、规划步骤并开始执行,过程中记录关键操作,确保交付即可直接被主智能体使用。", ] return "\n".join(lines) def build_finish_tool_reminder(meta: Dict[str, Any]) -> str: """构建提醒子智能体调用结束工具的提示语。""" workspace_dir = meta.get("workspace_dir") deliverables_dir = meta.get("deliverables_dir") deliverables_label = _format_relative_path(deliverables_dir, workspace_dir) or "deliverables" if deliverables_label in {"", "."}: result_path = "./result.md" else: normalized = deliverables_label.rstrip("/") result_path = f"{normalized}/result.md" return ( f"⚠️ 检测到你已停止输出,但尚未调用 finish_sub_agent。请确认 {deliverables_label} 内的交付文件与 {result_path} 已准备完毕," "再调用 finish_sub_agent(reason=...) 正式结束任务;如果任务仍未完成,请继续完成剩余步骤并在完成后立即调用 finish_sub_agent。" ) def cleanup_inactive_sub_agent_tasks(force: bool = False): """移除已结束或长期停止中的子智能体,避免占用名额。""" now = time.time() for task_id, task in list(sub_agent_tasks.items()): status = (task.get("status") or "").lower() if status in SUB_AGENT_TERMINAL_STATUSES: updated_at = task.get("updated_at") or task.get("created_at") or now if force or (now - updated_at) > TERMINAL_ARCHIVE_GRACE_SECONDS: _purge_sub_agent_task(task_id) continue if status == "stopping": updated_at = task.get("updated_at") or task.get("created_at") or now if force or (now - updated_at) > STOPPING_GRACE_SECONDS: _purge_sub_agent_task(task_id) def _purge_sub_agent_task(task_id: str): """移除本地记录与相关连接。""" sub_agent_tasks.pop(task_id, None) terminal = sub_agent_terminals.pop(task_id, None) if terminal and hasattr(terminal, "close"): try: terminal.close() except Exception: pass room_sids = sub_agent_rooms.pop(task_id, set()) for sid in list(room_sids): sub_agent_connections.pop(sid, None) for sid, current in list(sub_agent_connections.items()): if current == task_id: sub_agent_connections.pop(sid, None) for sid in list(stop_flags.keys()): if task_id in sid: stop_flags.pop(sid, None) def get_active_sub_agent_count(conversation_id: Optional[str] = None) -> int: cleanup_inactive_sub_agent_tasks() normalized = _normalize_conversation_id(conversation_id) count = 0 for task in sub_agent_tasks.values(): if task.get("status") not in {"pending", "running"}: continue if normalized: if _normalize_conversation_id(task.get("parent_conversation_id")) != normalized: continue count += 1 return count def find_sub_agent_conversation_file(conv_id: str) -> Optional[Path]: """在已知目录中搜索子智能体对话文件。""" possible_dirs = [] tasks_root = Path(SUB_AGENT_TASKS_BASE_DIR).expanduser().resolve() if tasks_root.exists(): possible_dirs.append(tasks_root) data_root = Path(DATA_DIR).expanduser().resolve() if data_root.exists(): possible_dirs.append(data_root) users_root = Path("users").resolve() if users_root.exists(): possible_dirs.append(users_root) for base in possible_dirs: try: matches = list(base.rglob(f"{conv_id}.json")) except Exception: matches = [] for match in matches: try: if match.name == f"{conv_id}.json": return match except Exception: continue return None def build_workspace_tree(root_path: str, max_depth: int = 5) -> Dict[str, Any]: """构建工作目录的文件树,用于子智能体只读视图。""" root = Path(root_path).expanduser().resolve() if not root.exists(): return { "path": str(root), "tree": {}, "folders": [], "files": [], "total_files": 0, "total_size": 0 } structure = { "path": str(root), "tree": {}, "folders": [], "files": [], "total_files": 0, "total_size": 0 } def scan_directory(path: Path, tree: Dict[str, Any], depth: int = 0): if depth > max_depth: return try: entries = sorted( [p for p in path.iterdir() if not p.name.startswith('.')], key=lambda p: (not p.is_dir(), p.name.lower()) ) except PermissionError: return for entry in entries: relative_path = str(entry.relative_to(root)) if entry.is_dir(): structure["folders"].append({ "name": entry.name, "path": relative_path }) tree[entry.name] = { "type": "folder", "path": relative_path, "children": {} } scan_directory(entry, tree[entry.name]["children"], depth + 1) else: try: size = entry.stat().st_size modified = datetime.fromtimestamp(entry.stat().st_mtime).isoformat() except OSError: size = 0 modified = "" structure["files"].append({ "name": entry.name, "path": relative_path, "size": size, "modified": modified }) structure["total_files"] += 1 structure["total_size"] += size tree[entry.name] = { "type": "file", "path": relative_path, "size": size, "modified": modified } scan_directory(root, structure["tree"]) return structure def _load_state_data() -> Dict[str, Any]: state_file = Path(SUB_AGENT_STATE_FILE).expanduser().resolve() if not state_file.exists(): return {} try: with state_file.open('r', encoding='utf-8') as f: return json.load(f) except Exception: return {} def load_persisted_task(task_id: str) -> Optional[Dict[str, Any]]: data = _load_state_data() tasks = data.get("tasks") or {} return tasks.get(task_id) def iter_persisted_tasks(): data = _load_state_data() tasks = data.get("tasks") or {} return list(tasks.values()) def get_task_record(task_id: str) -> Optional[Dict[str, Any]]: task = sub_agent_tasks.get(task_id) if task: return task return load_persisted_task(task_id) def broadcast_sub_agent_event(task_id: str, event_type: str, payload: Optional[Dict[str, Any]] = None): room = f"sub_agent_{task_id}" data = {"task_id": task_id} if payload: data.update(payload) socketio.emit(event_type, data, room=room) socketio.emit(f"event_{event_type}", data, room=room) def make_sub_agent_sender(task_id: str): def _sender(event_type: str, data: Optional[Dict]): if data is None: data = {} data = dict(data) data["task_id"] = task_id if event_type in {"tool_start", "tool_preparing"}: tool_name = data.get("tool_name") or data.get("tool") if tool_name: update_sub_agent_task(task_id, last_tool=tool_name, status="running") if event_type == "task_complete": update_sub_agent_task(task_id, last_tool=None) if event_type == "error": mark_task_failed(task_id, data.get("message") or "子智能体执行失败") broadcast_sub_agent_event(task_id, event_type, data) return _sender def _normalize_conversation_id(value: Optional[str]) -> Optional[str]: if not value: return None value = value.strip() if not value: return None return value if value.startswith("conv_") else f"conv_{value}" def _extract_parent_conversation(info: Optional[Dict[str, Any]]) -> Optional[str]: """尝试从任务记录中提取父对话ID。""" if not info: return None service_payload = info.get("service_payload") or {} candidates = [ info.get("parent_conversation_id"), info.get("conversation_id"), service_payload.get("parent_conversation_id"), ] for candidate in candidates: normalized = _normalize_conversation_id(candidate) if normalized: return normalized return None def _inject_sub_agent_script(html: str, task_id: str, parent_conv: Optional[str], sub_conv: Optional[str]) -> str: script = ( "" ) return html.replace("", f"{script}", 1) def _render_sub_agent_index(task_id: str, parent_conv: Optional[str], sub_conv: Optional[str]): if not SUB_AGENT_INDEX_FILE.exists(): return send_from_directory(app.static_folder, 'index.html') html = SUB_AGENT_INDEX_FILE.read_text(encoding="utf-8") patched = _inject_sub_agent_script(html, task_id, parent_conv, sub_conv) return Response(patched, mimetype="text/html") def _resolve_task_by_conv(conv_slug: Optional[str], task_label: str): normalized_conv = _normalize_conversation_id(conv_slug) agent_id = None if task_label.lower().startswith("sub_agent"): suffix = task_label[len("sub_agent"):] if suffix.isdigit(): agent_id = int(suffix) state_data = _load_state_data() stored_tasks = state_data.get("tasks") or {} conv_map = state_data.get("conversation_agents") or {} def _matches(info: Optional[Dict[str, Any]]) -> bool: if not info: return False parent_conv = _extract_parent_conversation(info) if normalized_conv and parent_conv != normalized_conv: return False if agent_id is not None and info.get("agent_id") != agent_id: return False if not normalized_conv and agent_id is None and info.get("task_id") != task_label: return False return True record = sub_agent_tasks.get(task_label) or stored_tasks.get(task_label) if not record: record = next((info for info in sub_agent_tasks.values() if _matches(info)), None) if not record: record = next((info for info in stored_tasks.values() if _matches(info)), None) if not record and normalized_conv and agent_id is not None: agent_list = conv_map.get(normalized_conv, []) if agent_list and agent_id in agent_list: for info in stored_tasks.values(): if info.get("agent_id") == agent_id and _extract_parent_conversation(info) == normalized_conv: record = info break actual_task_id = record.get("task_id") if record else task_label parent_conv = _extract_parent_conversation(record) or normalized_conv sub_conv = record.get("sub_conversation_id") if record else None return actual_task_id, parent_conv, sub_conv # 创建调试日志文件 DEBUG_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "debug_stream.log" UPLOAD_FOLDER_NAME = "user_upload" def is_logged_in() -> bool: return session.get('username') is not None def login_required(view_func): @wraps(view_func) def wrapped(*args, **kwargs): if not is_logged_in(): return redirect('/login') return view_func(*args, **kwargs) return wrapped def api_login_required(view_func): @wraps(view_func) def wrapped(*args, **kwargs): if not is_logged_in(): return jsonify({"error": "Unauthorized"}), 401 return view_func(*args, **kwargs) return wrapped def get_current_username() -> Optional[str]: return session.get('username') def make_terminal_callback(username: str): """生成面向指定用户的广播函数""" def _callback(event_type, data): try: socketio.emit(event_type, data, room=f"user_{username}") except Exception as exc: debug_log(f"广播事件失败 ({username}): {event_type} - {exc}") return _callback def attach_user_broadcast(terminal: WebTerminal, username: str): """确保终端的广播函数指向当前用户的房间""" callback = make_terminal_callback(username) terminal.message_callback = callback if terminal.terminal_manager: terminal.terminal_manager.broadcast = callback def get_user_resources(username: Optional[str] = None) -> Tuple[Optional[WebTerminal], Optional[UserWorkspace]]: username = (username or get_current_username()) if not username: return None, None workspace = user_manager.ensure_user_workspace(username) terminal = user_terminals.get(username) if not terminal: thinking_mode = session.get('thinking_mode', False) terminal = WebTerminal( project_path=str(workspace.project_path), thinking_mode=thinking_mode, message_callback=make_terminal_callback(username), data_dir=str(workspace.data_dir) ) if terminal.terminal_manager: terminal.terminal_manager.broadcast = terminal.message_callback user_terminals[username] = terminal else: attach_user_broadcast(terminal, username) return terminal, workspace def with_terminal(func): """注入用户专属终端和工作区""" @wraps(func) def wrapper(*args, **kwargs): username = get_current_username() terminal, workspace = get_user_resources(username) if not terminal or not workspace: return jsonify({"error": "System not initialized"}), 503 kwargs.update({ 'terminal': terminal, 'workspace': workspace, 'username': username }) return func(*args, **kwargs) return wrapper def get_terminal_for_sid(sid: str) -> Tuple[Optional[str], Optional[WebTerminal], Optional[UserWorkspace]]: username = connection_users.get(sid) if not username: return None, None, None terminal, workspace = get_user_resources(username) return username, terminal, workspace def get_gui_manager(workspace: UserWorkspace) -> GuiFileManager: """构建 GUI 文件管理器""" return GuiFileManager(str(workspace.project_path)) def ensure_conversation_loaded(terminal: WebTerminal, conversation_id: Optional[str], thinking_mode: bool) -> Tuple[str, bool]: """确保终端加载指定对话,若无则创建新的""" created_new = False if not conversation_id: result = terminal.create_new_conversation(thinking_mode=thinking_mode) if not result.get("success"): raise RuntimeError(result.get("message", "创建对话失败")) conversation_id = result["conversation_id"] created_new = True else: conversation_id = conversation_id if conversation_id.startswith('conv_') else f"conv_{conversation_id}" current_id = terminal.context_manager.current_conversation_id if current_id != conversation_id: load_result = terminal.load_conversation(conversation_id) if not load_result.get("success"): raise RuntimeError(load_result.get("message", "对话加载失败")) return conversation_id, created_new def reset_system_state(terminal: Optional[WebTerminal]): """完整重置系统状态,确保停止后能正常开始新任务""" if not terminal: return try: # 1. 重置API客户端状态 if hasattr(terminal, 'api_client') and terminal.api_client: debug_log("重置API客户端状态") terminal.api_client.start_new_task() # 重置思考模式状态 # 2. 重置主终端会话状态 if hasattr(terminal, 'current_session_id'): terminal.current_session_id += 1 # 开始新会话 debug_log(f"重置会话ID为: {terminal.current_session_id}") # 3. 清理读取文件跟踪器 debug_log("清理文件读取跟踪器") # 4. 重置Web特有的状态属性 web_attrs = ['streamingMessage', 'currentMessageIndex', 'preparingTools', 'activeTools'] for attr in web_attrs: if hasattr(terminal, attr): if attr in ['streamingMessage']: setattr(terminal, attr, False) elif attr in ['currentMessageIndex']: setattr(terminal, attr, -1) elif attr in ['preparingTools', 'activeTools'] and hasattr(getattr(terminal, attr), 'clear'): getattr(terminal, attr).clear() debug_log("系统状态重置完成") except Exception as e: debug_log(f"状态重置过程中出现错误: {e}") import traceback debug_log(f"错误详情: {traceback.format_exc()}") def debug_log(message): """写入调试日志""" DEBUG_LOG_FILE.parent.mkdir(parents=True, exist_ok=True) with DEBUG_LOG_FILE.open('a', encoding='utf-8') as f: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] f.write(f"[{timestamp}] {message}\n") # 终端广播回调函数 def terminal_broadcast(event_type, data): """广播终端事件到所有订阅者""" try: # 对于全局事件,发送给所有连接的客户端 if event_type in ('todo_updated',): socketio.emit(event_type, data) # 全局广播,不限制房间 debug_log(f"全局广播{event_type}: {data}") else: # 其他终端事件发送到终端订阅者房间 socketio.emit(event_type, data, room='terminal_subscribers') # 如果是特定会话的事件,也发送到该会话的专属房间 if 'session' in data: session_room = f"terminal_{data['session']}" socketio.emit(event_type, data, room=session_room) debug_log(f"终端广播: {event_type} - {data}") except Exception as e: debug_log(f"终端广播错误: {e}") @app.route('/login', methods=['GET', 'POST']) def login(): """登录页面与认证""" if request.method == 'GET': if is_logged_in(): return redirect('/new') return app.send_static_file('login.html') data = request.get_json() or {} email = (data.get('email') or '').strip() password = data.get('password') or '' record = user_manager.authenticate(email, password) if not record: return jsonify({"success": False, "error": "账号或密码错误"}), 401 session['logged_in'] = True session['username'] = record.username session['thinking_mode'] = app.config.get('DEFAULT_THINKING_MODE', False) session.permanent = True user_manager.ensure_user_workspace(record.username) return jsonify({"success": True}) @app.route('/register', methods=['GET', 'POST']) def register(): """注册新用户(需要邀请码)""" if request.method == 'GET': if is_logged_in(): return redirect('/new') return app.send_static_file('register.html') data = request.get_json() or {} username = (data.get('username') or '').strip() email = (data.get('email') or '').strip() password = data.get('password') or '' invite_code = (data.get('invite_code') or '').strip() try: user_manager.register_user(username, email, password, invite_code) return jsonify({"success": True}) except ValueError as exc: return jsonify({"success": False, "error": str(exc)}), 400 except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 500 @app.route('/logout', methods=['POST']) def logout(): """退出登录""" username = session.get('username') session.clear() if username and username in user_terminals: user_terminals.pop(username, None) return jsonify({"success": True}) @app.route('/') @login_required def index(): """主页 -> 重定向到 /new""" return redirect('/new') @app.route('/new') @login_required def new_page(): return app.send_static_file('index.html') @app.route('/') @login_required def conversation_page(conversation_id): return app.send_static_file('index.html') @app.route('/terminal') @login_required def terminal_page(): """终端监控页面""" return app.send_static_file('terminal.html') @app.route('/file-manager') @login_required def gui_file_manager_page(): """桌面式文件管理器页面""" return send_from_directory(Path(app.static_folder) / 'file_manager', 'index.html') @app.route('/file-manager/editor') @login_required def gui_file_editor_page(): """GUI 文件编辑器页面""" return send_from_directory(Path(app.static_folder) / 'file_manager', 'editor.html') @app.route('/file-preview/') @login_required @with_terminal def gui_file_preview(relative_path: str, terminal: WebTerminal, workspace: UserWorkspace, username: str): manager = get_gui_manager(workspace) try: target = manager.prepare_download(relative_path) if not target.is_file(): return "预览仅支持文件", 400 return send_from_directory( directory=target.parent, path=target.name, mimetype='text/html' ) except Exception as exc: return f"无法预览文件: {exc}", 400 @app.route('/static/') def static_files(filename): """提供静态文件""" return send_from_directory('static', filename) @app.route('/api/status') @api_login_required @with_terminal def get_status(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取系统状态(增强版:包含对话信息)""" status = terminal.get_status() # 添加终端状态信息 if terminal.terminal_manager: terminal_status = terminal.terminal_manager.list_terminals() status['terminals'] = terminal_status # 【新增】添加当前对话的详细信息 try: current_conv = terminal.context_manager.current_conversation_id status['conversation'] = status.get('conversation', {}) status['conversation']['current_id'] = current_conv if current_conv and not current_conv.startswith('temp_'): current_conv_data = terminal.context_manager.conversation_manager.load_conversation(current_conv) if current_conv_data: status['conversation']['title'] = current_conv_data.get('title', '未知对话') status['conversation']['created_at'] = current_conv_data.get('created_at') status['conversation']['updated_at'] = current_conv_data.get('updated_at') except Exception as e: print(f"[Status] 获取当前对话信息失败: {e}") status['project_path'] = str(workspace.project_path) status['version'] = AGENT_VERSION return jsonify(status) @app.route('/api/files') @api_login_required @with_terminal def get_files(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取文件树""" structure = terminal.context_manager.get_project_structure() return jsonify(structure) # ========================================== # 新版 GUI 文件管理器 API # ========================================== def _format_entry(entry) -> Dict[str, Any]: return { "name": entry.name, "path": entry.path, "type": entry.type, "size": entry.size, "modified_at": entry.modified_at, "extension": entry.extension, "is_editable": entry.is_editable, } @app.route('/api/gui/files/entries', methods=['GET']) @api_login_required @with_terminal def gui_list_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str): """列出指定目录内容""" relative_path = request.args.get('path') or "" manager = get_gui_manager(workspace) try: resolved_path, entries = manager.list_directory(relative_path) breadcrumb = manager.breadcrumb(resolved_path) return jsonify({ "success": True, "data": { "path": resolved_path, "breadcrumb": breadcrumb, "items": [_format_entry(entry) for entry in entries] } }) except Exception as exc: return jsonify({ "success": False, "error": str(exc) }), 400 @app.route('/api/gui/files/create', methods=['POST']) @api_login_required @with_terminal def gui_create_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str): payload = request.get_json() or {} parent = payload.get('path') or "" name = payload.get('name') or "" entry_type = payload.get('type') or "file" manager = get_gui_manager(workspace) try: new_path = manager.create_entry(parent, name, entry_type) return jsonify({ "success": True, "path": new_path }) except Exception as exc: return jsonify({ "success": False, "error": str(exc) }), 400 @app.route('/api/gui/files/delete', methods=['POST']) @api_login_required @with_terminal def gui_delete_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str): payload = request.get_json() or {} paths = payload.get('paths') or [] manager = get_gui_manager(workspace) try: result = manager.delete_entries(paths) return jsonify({ "success": True, "result": result }) except Exception as exc: return jsonify({ "success": False, "error": str(exc) }), 400 @app.route('/api/gui/files/rename', methods=['POST']) @api_login_required @with_terminal def gui_rename_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str): payload = request.get_json() or {} path = payload.get('path') new_name = payload.get('new_name') if not path or not new_name: return jsonify({"success": False, "error": "缺少 path 或 new_name"}), 400 manager = get_gui_manager(workspace) try: new_path = manager.rename_entry(path, new_name) return jsonify({ "success": True, "path": new_path }) except Exception as exc: return jsonify({ "success": False, "error": str(exc) }), 400 @app.route('/api/gui/files/copy', methods=['POST']) @api_login_required @with_terminal def gui_copy_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str): payload = request.get_json() or {} paths = payload.get('paths') or [] target_dir = payload.get('target_dir') or "" manager = get_gui_manager(workspace) try: result = manager.copy_entries(paths, target_dir) return jsonify({ "success": True, "result": result }) except Exception as exc: return jsonify({ "success": False, "error": str(exc) }), 400 @app.route('/api/gui/files/move', methods=['POST']) @api_login_required @with_terminal def gui_move_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str): payload = request.get_json() or {} paths = payload.get('paths') or [] target_dir = payload.get('target_dir') or "" manager = get_gui_manager(workspace) try: result = manager.move_entries(paths, target_dir) return jsonify({ "success": True, "result": result }) except Exception as exc: return jsonify({ "success": False, "error": str(exc) }), 400 @app.route('/api/gui/files/upload', methods=['POST']) @api_login_required @with_terminal def gui_upload_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str): if 'file' not in request.files: return jsonify({"success": False, "error": "未找到文件"}), 400 file_obj = request.files['file'] if not file_obj or not file_obj.filename: return jsonify({"success": False, "error": "文件名为空"}), 400 current_dir = request.form.get('path') or "" raw_name = request.form.get('filename') or file_obj.filename filename = sanitize_filename_preserve_unicode(raw_name) or secure_filename(raw_name) if not filename: return jsonify({"success": False, "error": "非法文件名"}), 400 manager = get_gui_manager(workspace) try: target_path = manager.prepare_upload(current_dir, filename) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 try: file_obj.save(target_path) except Exception as exc: return jsonify({"success": False, "error": f'保存文件失败: {exc}'}), 500 return jsonify({ "success": True, "path": manager._to_relative(target_path) }) @app.route('/api/gui/files/download', methods=['GET']) @api_login_required @with_terminal def gui_download_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str): path = request.args.get('path') if not path: return jsonify({"success": False, "error": "缺少 path"}), 400 manager = get_gui_manager(workspace) try: target = manager.prepare_download(path) if target.is_dir(): memory_file = BytesIO() with zipfile.ZipFile(memory_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zf: for root, dirs, files in os.walk(target): for file in files: full_path = Path(root) / file arcname = manager._to_relative(full_path) zf.write(full_path, arcname=arcname) memory_file.seek(0) download_name = f"{target.name}.zip" return send_file(memory_file, as_attachment=True, download_name=download_name, mimetype='application/zip') return send_file(target, as_attachment=True, download_name=target.name) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @app.route('/api/gui/files/download/batch', methods=['POST']) @api_login_required @with_terminal def gui_download_batch(terminal: WebTerminal, workspace: UserWorkspace, username: str): payload = request.get_json() or {} paths = payload.get('paths') or [] if not paths: return jsonify({"success": False, "error": "缺少待下载的路径"}), 400 manager = get_gui_manager(workspace) try: memory_file = BytesIO() with zipfile.ZipFile(memory_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zf: for rel in paths: target = manager.prepare_download(rel) arc_base = rel.strip('/') or target.name if target.is_dir(): for root, _, files in os.walk(target): for file in files: full_path = Path(root) / file relative_sub = full_path.relative_to(target) arcname = Path(arc_base) / relative_sub zf.write(full_path, arcname=str(arcname)) else: zf.write(target, arcname=arc_base) memory_file.seek(0) download_name = f"selected_{len(paths)}.zip" return send_file(memory_file, as_attachment=True, download_name=download_name, mimetype='application/zip') except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @app.route('/api/gui/files/text', methods=['GET', 'POST']) @api_login_required @with_terminal def gui_text_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str): manager = get_gui_manager(workspace) if request.method == 'GET': path = request.args.get('path') if not path: return jsonify({"success": False, "error": "缺少 path"}), 400 try: content, modified = manager.read_text(path) return jsonify({ "success": True, "path": path, "content": content, "modified_at": modified }) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 payload = request.get_json() or {} path = payload.get('path') content = payload.get('content') if path is None or content is None: return jsonify({"success": False, "error": "缺少 path 或 content"}), 400 try: result = manager.write_text(path, content) return jsonify({"success": True, "data": result}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @app.route('/api/focused') @api_login_required @with_terminal def get_focused_files(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取聚焦文件""" focused = {} for path, content in terminal.focused_files.items(): focused[path] = { "content": content, "size": len(content), "lines": content.count('\n') + 1 } return jsonify(focused) @app.route('/api/todo-list') @api_login_required @with_terminal def get_todo_list(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取当前待办列表""" todo_snapshot = terminal.context_manager.get_todo_snapshot() return jsonify({ "success": True, "data": todo_snapshot }) @app.route('/api/upload', methods=['POST']) @api_login_required @with_terminal def upload_file(terminal: WebTerminal, workspace: UserWorkspace, username: str): """处理前端文件上传请求""" if 'file' not in request.files: return jsonify({ "success": False, "error": "未找到文件", "message": "请求中缺少文件字段" }), 400 uploaded_file = request.files['file'] original_name = (request.form.get('filename') or '').strip() if not uploaded_file or not uploaded_file.filename or uploaded_file.filename.strip() == '': return jsonify({ "success": False, "error": "文件名为空", "message": "请选择要上传的文件" }), 400 raw_name = original_name or uploaded_file.filename filename = sanitize_filename_preserve_unicode(raw_name) if not filename: filename = secure_filename(raw_name) if not filename: return jsonify({ "success": False, "error": "非法文件名", "message": "文件名包含不支持的字符" }), 400 file_manager = getattr(terminal, 'file_manager', None) if file_manager is None: return jsonify({ "success": False, "error": "文件管理器未初始化" }), 500 target_folder_relative = UPLOAD_FOLDER_NAME valid_folder, folder_error, folder_path = file_manager._validate_path(target_folder_relative) if not valid_folder: return jsonify({ "success": False, "error": folder_error }), 400 try: folder_path.mkdir(parents=True, exist_ok=True) except Exception as exc: return jsonify({ "success": False, "error": f"创建上传目录失败: {exc}" }), 500 target_relative = str(Path(target_folder_relative) / filename) valid_file, file_error, target_full_path = file_manager._validate_path(target_relative) if not valid_file: return jsonify({ "success": False, "error": file_error }), 400 final_path = target_full_path if final_path.exists(): stem = final_path.stem suffix = final_path.suffix counter = 1 while final_path.exists(): candidate_name = f"{stem}_{counter}{suffix}" target_relative = str(Path(target_folder_relative) / candidate_name) valid_file, file_error, candidate_path = file_manager._validate_path(target_relative) if not valid_file: return jsonify({ "success": False, "error": file_error }), 400 final_path = candidate_path counter += 1 try: uploaded_file.save(final_path) except Exception as exc: return jsonify({ "success": False, "error": f"保存文件失败: {exc}" }), 500 relative_path = str(final_path.relative_to(workspace.project_path)) print(f"{OUTPUT_FORMATS['file']} 上传文件: {relative_path}") return jsonify({ "success": True, "path": relative_path, "filename": final_path.name, "folder": target_folder_relative }) @app.errorhandler(RequestEntityTooLarge) def handle_file_too_large(error): """全局捕获上传超大小""" size_mb = MAX_UPLOAD_SIZE / (1024 * 1024) return jsonify({ "success": False, "error": "文件过大", "message": f"单个文件大小不可超过 {size_mb:.1f} MB" }), 413 @app.route('/api/download/file') @api_login_required @with_terminal def download_file_api(terminal: WebTerminal, workspace: UserWorkspace, username: str): """下载单个文件""" path = (request.args.get('path') or '').strip() if not path: return jsonify({"success": False, "error": "缺少路径参数"}), 400 valid, error, full_path = terminal.file_manager._validate_path(path) if not valid or full_path is None: return jsonify({"success": False, "error": error or "路径校验失败"}), 400 if not full_path.exists() or not full_path.is_file(): return jsonify({"success": False, "error": "文件不存在"}), 404 return send_file( full_path, as_attachment=True, download_name=full_path.name ) @app.route('/api/download/folder') @api_login_required @with_terminal def download_folder_api(terminal: WebTerminal, workspace: UserWorkspace, username: str): """打包并下载文件夹""" path = (request.args.get('path') or '').strip() if not path: return jsonify({"success": False, "error": "缺少路径参数"}), 400 valid, error, full_path = terminal.file_manager._validate_path(path) if not valid or full_path is None: return jsonify({"success": False, "error": error or "路径校验失败"}), 400 if not full_path.exists() or not full_path.is_dir(): return jsonify({"success": False, "error": "文件夹不存在"}), 404 buffer = BytesIO() folder_name = Path(path).name or full_path.name or "archive" with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zip_buffer: # 确保目录本身被包含 zip_buffer.write(full_path, arcname=folder_name + '/') for item in full_path.rglob('*'): relative_name = Path(folder_name) / item.relative_to(full_path) if item.is_dir(): zip_buffer.write(item, arcname=str(relative_name) + '/') else: zip_buffer.write(item, arcname=str(relative_name)) buffer.seek(0) return send_file( buffer, mimetype='application/zip', as_attachment=True, download_name=f"{folder_name}.zip" ) @app.route('/api/tool-settings', methods=['GET', 'POST']) @api_login_required @with_terminal def tool_settings(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取或更新工具启用状态""" if request.method == 'GET': snapshot = terminal.get_tool_settings_snapshot() return jsonify({ "success": True, "categories": snapshot }) data = request.get_json() or {} category = data.get('category') if category is None: return jsonify({ "success": False, "error": "缺少类别参数", "message": "请求体需要提供 category 字段" }), 400 if 'enabled' not in data: return jsonify({ "success": False, "error": "缺少启用状态", "message": "请求体需要提供 enabled 字段" }), 400 try: enabled = bool(data['enabled']) terminal.set_tool_category_enabled(category, enabled) snapshot = terminal.get_tool_settings_snapshot() socketio.emit('tool_settings_updated', { 'categories': snapshot }, room=f"user_{username}") return jsonify({ "success": True, "categories": snapshot }) except ValueError as exc: return jsonify({ "success": False, "error": str(exc) }), 400 @app.route('/api/terminals') @api_login_required @with_terminal def get_terminals(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取终端会话列表""" if terminal.terminal_manager: result = terminal.terminal_manager.list_terminals() return jsonify(result) else: return jsonify({"sessions": [], "active": None, "total": 0}) @socketio.on('connect') def handle_connect(): """客户端连接""" print(f"[WebSocket] 客户端连接: {request.sid}") username = get_current_username() if not username: connection_users[request.sid] = None emit('connected', {'status': 'guest'}) return emit('connected', {'status': 'Connected to server'}) connection_users[request.sid] = username # 清理可能存在的停止标志和状态 stop_flags.pop(request.sid, None) join_room(f"user_{username}") join_room(f"user_{username}_terminal") if request.sid not in terminal_rooms: terminal_rooms[request.sid] = set() terminal_rooms[request.sid].update({f"user_{username}", f"user_{username}_terminal"}) terminal, workspace = get_user_resources(username) if terminal: reset_system_state(terminal) emit('system_ready', { 'project_path': str(workspace.project_path), 'thinking_mode': terminal.get_thinking_mode_status(), 'version': AGENT_VERSION }, room=request.sid) if terminal.terminal_manager: terminals = terminal.terminal_manager.get_terminal_list() emit('terminal_list_update', { 'terminals': terminals, 'active': terminal.terminal_manager.active_terminal }, room=request.sid) if terminal.terminal_manager.active_terminal: for name, term in terminal.terminal_manager.terminals.items(): emit('terminal_started', { 'session': name, 'working_dir': str(term.working_dir), 'shell': term.shell_command, 'time': term.start_time.isoformat() if term.start_time else None }, room=request.sid) @socketio.on('disconnect') def handle_disconnect(): """客户端断开""" print(f"[WebSocket] 客户端断开: {request.sid}") username = connection_users.pop(request.sid, None) # 清理停止标志 stop_flags.pop(request.sid, None) # 从所有房间移除 for room in list(terminal_rooms.get(request.sid, [])): leave_room(room) if request.sid in terminal_rooms: del terminal_rooms[request.sid] if username: leave_room(f"user_{username}") leave_room(f"user_{username}_terminal") task_id = sub_agent_connections.pop(request.sid, None) if task_id: room = f"sub_agent_{task_id}" leave_room(room) if request.sid in sub_agent_rooms.get(task_id, set()): sub_agent_rooms[task_id].remove(request.sid) @socketio.on('stop_task') def handle_stop_task(): """处理停止任务请求""" print(f"[停止] 收到停止请求: {request.sid}") # 检查是否有正在运行的任务 if request.sid in stop_flags and isinstance(stop_flags[request.sid], dict): # 获取任务引用并取消 task_info = stop_flags[request.sid] if 'task' in task_info and not task_info['task'].done(): debug_log(f"正在取消任务: {request.sid}") task_info['task'].cancel() # 设置停止标志 task_info['stop'] = True if task_info.get('terminal'): reset_system_state(task_info['terminal']) else: # 如果没有任务引用,使用旧的布尔标志 stop_flags[request.sid] = True emit('stop_requested', { 'message': '停止请求已接收,正在取消任务...' }) @socketio.on('terminal_subscribe') def handle_terminal_subscribe(data): """订阅终端事件""" session_name = data.get('session') subscribe_all = data.get('all', False) username, terminal, _ = get_terminal_for_sid(request.sid) if not username or not terminal or not terminal.terminal_manager: emit('error', {'message': 'Terminal system not initialized'}) return if request.sid not in terminal_rooms: terminal_rooms[request.sid] = set() if subscribe_all: # 订阅所有终端事件 room_name = f"user_{username}_terminal" join_room(room_name) terminal_rooms[request.sid].add(room_name) print(f"[Terminal] {request.sid} 订阅所有终端事件") # 发送当前终端状态 emit('terminal_subscribed', { 'type': 'all', 'terminals': terminal.terminal_manager.get_terminal_list() }) elif session_name: # 订阅特定终端会话 room_name = f'user_{username}_terminal_{session_name}' join_room(room_name) terminal_rooms[request.sid].add(room_name) print(f"[Terminal] {request.sid} 订阅终端: {session_name}") # 发送该终端的当前输出 output_result = terminal.terminal_manager.get_terminal_output(session_name, 100) if output_result['success']: emit('terminal_history', { 'session': session_name, 'output': output_result['output'] }) @socketio.on('terminal_unsubscribe') def handle_terminal_unsubscribe(data): """取消订阅终端事件""" session_name = data.get('session') username = connection_users.get(request.sid) if session_name: room_name = f'user_{username}_terminal_{session_name}' if username else f'terminal_{session_name}' leave_room(room_name) if request.sid in terminal_rooms: terminal_rooms[request.sid].discard(room_name) print(f"[Terminal] {request.sid} 取消订阅终端: {session_name}") @socketio.on('get_terminal_output') def handle_get_terminal_output(data): """获取终端输出历史""" session_name = data.get('session') lines = data.get('lines', 50) username, terminal, _ = get_terminal_for_sid(request.sid) if not terminal or not terminal.terminal_manager: emit('error', {'message': 'Terminal system not initialized'}) return result = terminal.terminal_manager.get_terminal_output(session_name, lines) if result['success']: emit('terminal_output_history', { 'session': session_name, 'output': result['output'], 'is_interactive': result.get('is_interactive', False), 'last_command': result.get('last_command', '') }) else: emit('error', {'message': result['error']}) @socketio.on('send_message') def handle_message(data): """处理用户消息""" username, terminal, workspace = get_terminal_for_sid(request.sid) if not terminal: emit('error', {'message': 'System not initialized'}) return message = (data.get('message') or '').strip() if not message: emit('error', {'message': '消息不能为空'}) return print(f"[WebSocket] 收到消息: {message}") debug_log(f"\n{'='*80}\n新任务开始: {message}\n{'='*80}") requested_conversation_id = data.get('conversation_id') try: conversation_id, created_new = ensure_conversation_loaded(terminal, requested_conversation_id, terminal.thinking_mode) except RuntimeError as exc: emit('error', {'message': str(exc)}) return try: conv_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) or {} except Exception: conv_data = {} title = conv_data.get('title', '新对话') socketio.emit('conversation_resolved', { 'conversation_id': conversation_id, 'title': title, 'created': created_new }, room=request.sid) if created_new: socketio.emit('conversation_list_update', { 'action': 'created', 'conversation_id': conversation_id }, room=f"user_{username}") socketio.emit('conversation_changed', { 'conversation_id': conversation_id, 'title': title }, room=request.sid) client_sid = request.sid def send_to_client(event_type, data): """发送消息到客户端""" socketio.emit(event_type, data, room=client_sid) # 传递客户端ID socketio.start_background_task(process_message_task, terminal, message, send_to_client, client_sid) @socketio.on('sub_agent_join') def handle_sub_agent_join(data): task_id = (data or {}).get("task_id") task = sub_agent_tasks.get(task_id) if not task: emit('sub_agent_status', {"task_id": task_id, "status": "unknown", "message": "任务不存在"}) return room = f"sub_agent_{task_id}" join_room(room) sub_agent_connections[request.sid] = task_id sub_agent_rooms[task_id].add(request.sid) emit('sub_agent_status', { "task_id": task_id, "status": task.get("status"), "summary": task.get("summary"), "last_tool": task.get("last_tool"), "deliverables_dir": task.get("deliverables_dir"), }) @socketio.on('sub_agent_leave') def handle_sub_agent_leave(): task_id = sub_agent_connections.pop(request.sid, None) if not task_id: return room = f"sub_agent_{task_id}" leave_room(room) if request.sid in sub_agent_rooms.get(task_id, set()): sub_agent_rooms[task_id].remove(request.sid) @socketio.on('sub_agent_message') def handle_sub_agent_message(data): task_id = (data or {}).get("task_id") message = (data or {}).get("message", "").strip() if not task_id or not message: emit('sub_agent_status', { "task_id": task_id, "status": "error", "message": "缺少任务ID或消息内容" }) return result = send_message_to_sub_agent(task_id, message) if not result.get("success"): emit('sub_agent_status', { "task_id": task_id, "status": "error", "message": result.get("error") }) # 在 web_server.py 中添加以下对话管理API接口 # 添加在现有路由之后,@socketio 事件处理之前 # ========================================== # 对话管理API接口 # ========================================== @app.route('/api/conversations', methods=['GET']) @api_login_required @with_terminal def get_conversations(terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话列表""" try: # 获取查询参数 limit = request.args.get('limit', 20, type=int) offset = request.args.get('offset', 0, type=int) # 限制参数范围 limit = max(1, min(limit, 100)) # 限制在1-100之间 offset = max(0, offset) result = terminal.get_conversations_list(limit=limit, offset=offset) if result["success"]: return jsonify({ "success": True, "data": result["data"] }) else: return jsonify({ "success": False, "error": result.get("error", "Unknown error"), "message": result.get("message", "获取对话列表失败") }), 500 except Exception as e: print(f"[API] 获取对话列表错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话列表时发生异常" }), 500 @app.route('/api/conversations', methods=['POST']) @api_login_required @with_terminal def create_conversation(terminal: WebTerminal, workspace: UserWorkspace, username: str): """创建新对话""" try: data = request.get_json() or {} thinking_mode = data.get('thinking_mode', terminal.thinking_mode) result = terminal.create_new_conversation(thinking_mode=thinking_mode) if result["success"]: # 广播对话列表更新事件 socketio.emit('conversation_list_update', { 'action': 'created', 'conversation_id': result["conversation_id"] }, room=f"user_{username}") # 广播当前对话切换事件 socketio.emit('conversation_changed', { 'conversation_id': result["conversation_id"], 'title': "新对话" }, room=f"user_{username}") return jsonify(result), 201 else: return jsonify(result), 500 except Exception as e: print(f"[API] 创建对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "创建对话时发生异常" }), 500 @app.route('/api/conversations/', methods=['GET']) @api_login_required @with_terminal def get_conversation_info(terminal: WebTerminal, workspace: UserWorkspace, username: str, conversation_id): """获取特定对话信息""" try: # 通过ConversationManager直接获取对话数据 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if conversation_data: # 提取关键信息,不返回完整消息内容(避免数据量过大) info = { "id": conversation_data["id"], "title": conversation_data["title"], "created_at": conversation_data["created_at"], "updated_at": conversation_data["updated_at"], "metadata": conversation_data["metadata"], "messages_count": len(conversation_data.get("messages", [])) } return jsonify({ "success": True, "data": info }) else: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 except Exception as e: print(f"[API] 获取对话信息错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话信息时发生异常" }), 500 @app.route('/api/conversations//load', methods=['PUT']) @api_login_required @with_terminal def load_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """加载特定对话""" try: result = terminal.load_conversation(conversation_id) if result["success"]: # 广播对话切换事件 socketio.emit('conversation_changed', { 'conversation_id': conversation_id, 'title': result.get("title", "未知对话"), 'messages_count': result.get("messages_count", 0) }, room=f"user_{username}") # 广播系统状态更新(因为当前对话改变了) status = terminal.get_status() socketio.emit('status_update', status, room=f"user_{username}") # 清理和重置相关UI状态 socketio.emit('conversation_loaded', { 'conversation_id': conversation_id, 'clear_ui': True # 提示前端清理当前UI状态 }, room=f"user_{username}") return jsonify(result) else: return jsonify(result), 404 if "不存在" in result.get("message", "") else 500 except Exception as e: print(f"[API] 加载对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "加载对话时发生异常" }), 500 @app.route('/api/conversations/', methods=['DELETE']) @api_login_required @with_terminal def delete_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """删除特定对话""" try: # 检查是否是当前对话 is_current = (terminal.context_manager.current_conversation_id == conversation_id) result = terminal.delete_conversation(conversation_id) if result["success"]: # 广播对话列表更新事件 socketio.emit('conversation_list_update', { 'action': 'deleted', 'conversation_id': conversation_id }, room=f"user_{username}") # 如果删除的是当前对话,广播对话清空事件 if is_current: socketio.emit('conversation_changed', { 'conversation_id': None, 'title': None, 'cleared': True }, room=f"user_{username}") # 更新系统状态 status = terminal.get_status() socketio.emit('status_update', status, room=f"user_{username}") return jsonify(result) else: return jsonify(result), 404 if "不存在" in result.get("message", "") else 500 except Exception as e: print(f"[API] 删除对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "删除对话时发生异常" }), 500 @app.route('/api/conversations/search', methods=['GET']) @api_login_required @with_terminal def search_conversations(terminal: WebTerminal, workspace: UserWorkspace, username: str): """搜索对话""" try: query = request.args.get('q', '').strip() limit = request.args.get('limit', 20, type=int) if not query: return jsonify({ "success": False, "error": "Missing query parameter", "message": "请提供搜索关键词" }), 400 # 限制参数范围 limit = max(1, min(limit, 50)) result = terminal.search_conversations(query, limit) return jsonify({ "success": True, "data": { "results": result["results"], "count": result["count"], "query": query } }) except Exception as e: print(f"[API] 搜索对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "搜索对话时发生异常" }), 500 @app.route('/api/conversations//messages', methods=['GET']) @api_login_required @with_terminal def get_conversation_messages(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """获取对话的消息历史(可选功能,用于调试或详细查看)""" try: # 获取完整对话数据 conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) if conversation_data: messages = conversation_data.get("messages", []) # 可选:限制消息数量,避免返回过多数据 limit = request.args.get('limit', type=int) if limit: messages = messages[-limit:] # 获取最后N条消息 return jsonify({ "success": True, "data": { "conversation_id": conversation_id, "messages": messages, "total_count": len(conversation_data.get("messages", [])) } }) else: return jsonify({ "success": False, "error": "Conversation not found", "message": f"对话 {conversation_id} 不存在" }), 404 except Exception as e: print(f"[API] 获取对话消息错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "获取对话消息时发生异常" }), 500 @app.route('/api/conversations//compress', methods=['POST']) @api_login_required @with_terminal def compress_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str): """压缩指定对话的大体积消息,生成压缩版新对话""" try: result = terminal.context_manager.compress_conversation(conversation_id) if not result.get("success"): status_code = 404 if "不存在" in result.get("error", "") else 400 return jsonify(result), status_code new_conversation_id = result["compressed_conversation_id"] load_result = terminal.load_conversation(new_conversation_id) if load_result.get("success"): socketio.emit('conversation_list_update', { 'action': 'compressed', 'conversation_id': new_conversation_id }, room=f"user_{username}") socketio.emit('conversation_changed', { 'conversation_id': new_conversation_id, 'title': load_result.get('title', '压缩后的对话'), 'messages_count': load_result.get('messages_count', 0) }, room=f"user_{username}") socketio.emit('conversation_loaded', { 'conversation_id': new_conversation_id, 'clear_ui': True }, room=f"user_{username}") response_payload = { "success": True, "compressed_conversation_id": new_conversation_id, "compressed_types": result.get("compressed_types", []), "system_message": result.get("system_message"), "load_result": load_result } return jsonify(response_payload) except Exception as e: print(f"[API] 压缩对话错误: {e}") return jsonify({ "success": False, "error": str(e), "message": "压缩对话时发生异常" }), 500 # ========================================== # 子智能体任务调度核心 # ========================================== def create_sub_agent_task(payload: Dict[str, Any]) -> Dict[str, Any]: required_fields = [ "task_id", "agent_id", "summary", "task", "workspace_dir", "references_dir", "deliverables_dir", "target_project_dir", "conversation_storage_dir", ] for field in required_fields: if not payload.get(field): return {"success": False, "error": f"缺少必要参数: {field}"} cleanup_inactive_sub_agent_tasks() task_id = payload["task_id"] if task_id in sub_agent_tasks: return {"success": False, "error": f"任务 {task_id} 已存在"} parent_conv = payload.get("parent_conversation_id") if get_active_sub_agent_count(parent_conv) >= SUB_AGENT_MAX_ACTIVE: limit_note = f"同一对话最多可同时运行 {SUB_AGENT_MAX_ACTIVE} 个子智能体" return {"success": False, "error": f"已存在 {SUB_AGENT_MAX_ACTIVE} 个运行中的子智能体,请稍后再试({limit_note})。"} workspace_dir = Path(payload["workspace_dir"]).resolve() references_dir = Path(payload["references_dir"]).resolve() deliverables_dir = Path(payload["deliverables_dir"]).resolve() conversation_storage_root = Path(payload["conversation_storage_dir"]).expanduser().resolve() conversation_storage_root.mkdir(parents=True, exist_ok=True) conversation_data_dir = (conversation_storage_root / f"conv_{task_id}").resolve() conversation_data_dir.mkdir(parents=True, exist_ok=True) metadata = { "task_id": task_id, "agent_id": payload["agent_id"], "summary": payload.get("summary", ""), "task": payload.get("task", ""), "workspace_dir": str(workspace_dir), "references_dir": str(references_dir), "deliverables_dir": str(deliverables_dir), "target_project_dir": payload.get("target_project_dir"), } terminal = SubAgentTerminal( workspace_dir=str(workspace_dir), data_dir=str(conversation_data_dir), metadata=metadata, message_callback=None, ) terminal.task_id = task_id def _finish_hook(result: Dict[str, Any]): reason = result.get("reason", "") mark_task_completed(task_id, reason) terminal.set_finish_callback(_finish_hook) terminal._ensure_conversation() sub_conversation_id = terminal.context_manager.current_conversation_id sub_agent_terminals[task_id] = terminal sub_agent_tasks[task_id] = { **metadata, "status": "running", "created_at": time.time(), "updated_at": time.time(), "timeout_seconds": payload.get("timeout_seconds") or SUB_AGENT_DEFAULT_TIMEOUT, "parent_conversation_id": payload.get("parent_conversation_id"), "reference_manifest": payload.get("reference_manifest", []), "conversation_data_dir": str(conversation_data_dir), "sub_conversation_id": sub_conversation_id, "last_client_sid": None, } initial_message = build_sub_agent_instruction(metadata) sender = make_sub_agent_sender(task_id) terminal.context_manager._web_terminal_callback = sender client_sid = f"sub_agent::{task_id}::{int(time.time()*1000)}" update_sub_agent_task(task_id, last_client_sid=client_sid) socketio.start_background_task(process_message_task, terminal, initial_message, sender, client_sid) return { "success": True, "status": "running", "task_id": task_id, "message": "子智能体任务已启动", "deliverables_dir": str(deliverables_dir), "sub_conversation_id": sub_conversation_id, } def send_message_to_sub_agent(task_id: str, message: str) -> Dict[str, Any]: terminal = sub_agent_terminals.get(task_id) if not terminal: return {"success": False, "error": "未找到该子智能体"} if sub_agent_tasks.get(task_id, {}).get("status") in {"completed", "failed", "timeout"}: return {"success": False, "error": "任务已结束,无法继续发送消息"} sender = make_sub_agent_sender(task_id) client_sid = f"sub_agent::{task_id}::{int(time.time()*1000)}" update_sub_agent_task(task_id, last_client_sid=client_sid) socketio.start_background_task(process_message_task, terminal, message, sender, client_sid) return {"success": True, "message": "消息已发送"} def stop_sub_agent_task(task_id: str) -> Dict[str, Any]: task = sub_agent_tasks.get(task_id) if not task: return {"success": False, "error": "未找到任务"} client_sid = task.get("last_client_sid") if not client_sid or client_sid not in stop_flags: return {"success": False, "error": "当前没有可停止的执行"} stop_flags[client_sid]["stop"] = True update_sub_agent_task(task_id, status="stopping") return {"success": True, "message": "已发送停止指令"} def force_terminate_sub_agent(task_id: str) -> Dict[str, Any]: cleanup_inactive_sub_agent_tasks() task = sub_agent_tasks.get(task_id) if not task: return {"success": False, "error": "任务不存在"} client_sid = task.get("last_client_sid") if client_sid and client_sid in stop_flags: stop_flags[client_sid]["stop"] = True terminated_tasks.add(task_id) terminal = sub_agent_terminals.get(task_id) if terminal: try: reset_system_state(terminal) except Exception: pass update_sub_agent_task(task_id, status="terminated") broadcast_sub_agent_event(task_id, "sub_agent_status", { "status": "terminated", "message": "子智能体已被手动关闭" }) _purge_sub_agent_task(task_id) return { "success": True, "status": "terminated", "message": "子智能体已被手动关闭", "system_message": "🛑 子智能体已被手动关闭。" } # ========================================== # 子智能体任务API(主智能体调用,免登录) # ========================================== @app.route('/tasks', methods=['POST']) def api_create_sub_agent_task(): data = request.get_json(silent=True) or {} result = create_sub_agent_task(data) status_code = 200 if result.get("success") else 400 return jsonify(result), status_code @app.route('/tasks/', methods=['GET']) def api_get_sub_agent_task(task_id: str): task = get_task_record(task_id) from_store = task_id not in sub_agent_tasks if not task: return jsonify({"success": False, "status": "unknown", "message": "任务不存在"}), 404 payload = { "success": True, "task_id": task_id, "status": task.get("status") or (task.get("final_result") or {}).get("status") or "unknown", "message": task.get("error") or task.get("completion_reason") or (task.get("final_result") or {}).get("message") or "", "deliverables_dir": task.get("deliverables_dir"), "workspace_dir": task.get("workspace_dir"), "references_dir": task.get("references_dir"), "target_project_dir": task.get("target_project_dir"), "summary": task.get("summary"), "last_tool": task.get("last_tool"), "updated_at": task.get("updated_at"), "sub_conversation_id": task.get("sub_conversation_id"), } if task.get("status") == "completed": payload["message"] = task.get("completion_reason") or "子智能体已完成任务" if from_store: payload["archived"] = True return jsonify(payload) @app.route('/tasks//messages', methods=['POST']) def api_send_message_to_sub_agent(task_id: str): data = request.get_json(silent=True) or {} message = (data.get("message") or "").strip() if not message: return jsonify({"success": False, "error": "消息内容不能为空"}), 400 result = send_message_to_sub_agent(task_id, message) status_code = 200 if result.get("success") else 400 return jsonify(result), status_code @app.route('/tasks//stop', methods=['POST']) def api_stop_sub_agent(task_id: str): result = stop_sub_agent_task(task_id) status_code = 200 if result.get("success") else 400 return jsonify(result), status_code @app.route('/tasks//terminate', methods=['POST']) def api_terminate_sub_agent(task_id: str): result = force_terminate_sub_agent(task_id) status_code = 200 if result.get("success") else 400 return jsonify(result), status_code @app.route('/tasks//conversation', methods=['GET']) def api_get_sub_agent_conversation(task_id: str): info = get_task_record(task_id) from_store = task_id not in sub_agent_tasks if not info: return jsonify({"success": False, "error": "任务不存在"}), 404 conv_id = info.get("sub_conversation_id") or info.get("conversation_id") if not conv_id: return jsonify({"success": True, "conversation_id": None, "messages": []}) conv_file = None if not from_store: data_dir = Path(info.get("conversation_data_dir") or "").expanduser() candidate = data_dir / "conversations" / f"{conv_id}.json" if candidate.exists(): conv_file = candidate if conv_file is None: conv_file = find_sub_agent_conversation_file(conv_id) if conv_file is None: return jsonify({"success": True, "conversation_id": conv_id, "messages": []}) try: content = json.loads(conv_file.read_text(encoding="utf-8")) except Exception as exc: return jsonify({"success": False, "error": f"读取对话失败: {exc}"}), 500 messages = content.get("messages", []) if isinstance(content, dict) else [] return jsonify({ "success": True, "conversation_id": conv_id, "messages": messages }) @app.route('/sub_agent/conversations/', methods=['GET']) def api_get_archived_sub_agent_conversation(conv_id: str): """按conversation_id直接读取历史子智能体对话文件。""" normalized = _normalize_conversation_id(conv_id) file_path = find_sub_agent_conversation_file(normalized) if not file_path or not file_path.exists(): return jsonify({"success": False, "error": "对话不存在"}), 404 try: content = json.loads(file_path.read_text(encoding="utf-8")) except Exception as exc: return jsonify({"success": False, "error": f"读取对话失败: {exc}"}), 500 messages = content.get("messages", []) if isinstance(content, dict) else [] return jsonify({ "success": True, "conversation_id": normalized, "messages": messages }) @app.route('/tasks//files', methods=['GET']) def api_get_sub_agent_files(task_id: str): """返回子智能体工作目录的文件树,只读展示使用。""" task = get_task_record(task_id) if not task: return jsonify({"success": False, "error": "任务不存在"}), 404 workspace_dir = task.get("workspace_dir") if not workspace_dir: return jsonify({"success": False, "error": "任务未设置工作目录"}), 400 try: tree = build_workspace_tree(workspace_dir) return jsonify({"success": True, "data": tree}) except Exception as exc: return jsonify({"success": False, "error": f'构建文件树失败: {exc}'}), 500 @app.route('/sub_agent/') def serve_sub_agent_page(task_id: str): """子智能体监控页面(仅任务ID)。""" actual_id, parent_conv, sub_conv = _resolve_task_by_conv(None, task_id) return _render_sub_agent_index(actual_id, parent_conv, sub_conv) @app.route('//sub_agent_') def serve_sub_agent_with_conv(conv_slug: str, task_label: str): """带对话ID的子智能体页面 /conv_slug/sub_agent_