agent/sub_agent/web_server.py

4338 lines
173 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# web_server.py - Web server (patched build: ensures text_end events are sent correctly, plus stop support)
import asyncio
import json
import os
import sys
import re
from typing import Dict, List, Optional, Callable, Any, Tuple
from flask import Flask, request, jsonify, send_from_directory, session, redirect, send_file, Response
from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect
from flask_cors import CORS
from werkzeug.exceptions import RequestEntityTooLarge
from pathlib import Path
from io import BytesIO
import zipfile
import argparse
from functools import wraps
from datetime import timedelta
import time
from datetime import datetime
import threading
from collections import defaultdict, deque
from werkzeug.utils import secure_filename
from werkzeug.routing import BaseConverter
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.web_terminal import WebTerminal
from core.sub_agent_terminal import SubAgentTerminal
from config import (
OUTPUT_FORMATS,
AUTO_FIX_TOOL_CALL,
AUTO_FIX_MAX_ATTEMPTS,
MAX_ITERATIONS_PER_TASK,
MAX_CONSECUTIVE_SAME_TOOL,
MAX_TOTAL_TOOL_CALLS,
TOOL_CALL_COOLDOWN,
MAX_UPLOAD_SIZE,
DEFAULT_CONVERSATIONS_LIMIT,
MAX_CONVERSATIONS_LIMIT,
CONVERSATIONS_DIR,
DEFAULT_RESPONSE_MAX_TOKENS,
DEFAULT_PROJECT_PATH,
LOGS_DIR,
AGENT_VERSION,
SUB_AGENT_MAX_ACTIVE,
SUB_AGENT_DEFAULT_TIMEOUT,
SUB_AGENT_STATE_FILE,
DATA_DIR,
SUB_AGENT_TASKS_BASE_DIR,
)
from modules.user_manager import UserManager, UserWorkspace
from modules.gui_file_manager import GuiFileManager
# Flask application and Socket.IO server shared by every route and handler below.
app = Flask(__name__, static_folder='static')
app.config['MAX_CONTENT_LENGTH'] = MAX_UPLOAD_SIZE
# The session cookie is signed with SECRET_KEY, so a value that is published in
# the source tree lets anyone forge a logged-in session. Prefer the SECRET_KEY
# environment variable; the historical placeholder is kept only as a fallback
# so existing deployments keep starting unchanged.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or 'your-secret-key-here'
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=12)
# NOTE(review): both CORS and Socket.IO accept every origin here — confirm this
# is intended for a cookie-authenticated application.
CORS(app)
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')
# index.html that _render_sub_agent_index patches for the sub-agent view.
SUB_AGENT_INDEX_FILE = Path(app.static_folder).resolve() / "index.html"
class ConversationIdConverter(BaseConverter):
    """URL converter for conversation ids.

    Matches ``YYYYMMDD_HHMMSS_mmm``-shaped digit groups (8, 6 and 3 digits)
    with an optional ``conv_`` prefix.
    """

    regex = r'(?:conv_)?\d{8}_\d{6}_\d{3}'


# Make the converter available to routes as ``<conv:...>``.
app.url_map.converters.update({'conv': ConversationIdConverter})
# Account store; also provisions per-user workspaces (see get_user_resources).
user_manager = UserManager()
# username -> that user's WebTerminal (see get_user_resources).
user_terminals: Dict[str, WebTerminal] = {}
# NOTE(review): not referenced in the visible part of the file; presumably room name -> member sids.
terminal_rooms: Dict[str, set] = {}
# connection sid -> username (see get_terminal_for_sid).
connection_users: Dict[str, str] = {}
# Stop-request state; _purge_sub_agent_task treats the keys as sids.
stop_flags: Dict[str, Dict[str, Any]] = {}
# NOTE(review): not referenced in the visible part of the file; presumably ids of terminated tasks.
terminated_tasks: set = set()
DEFAULT_PORT = 8092
# Sub-agent task management
# task_id -> in-memory task record.
sub_agent_tasks: Dict[str, Dict[str, Any]] = {}
# task_id -> SubAgentTerminal for that task.
sub_agent_terminals: Dict[str, SubAgentTerminal] = {}
# task_id -> set of connection sids (see _purge_sub_agent_task).
sub_agent_rooms: Dict[str, set] = defaultdict(set)
# connection sid -> task_id (see _purge_sub_agent_task).
sub_agent_connections: Dict[str, str] = {}
# Statuses after which a task is eligible for archival by cleanup_inactive_sub_agent_tasks.
SUB_AGENT_TERMINAL_STATUSES = {"completed", "failed", "timeout"}
# Seconds a task may remain in "stopping" before it is purged.
STOPPING_GRACE_SECONDS = 30
# Seconds a finished task is kept before it is purged.
TERMINAL_ARCHIVE_GRACE_SECONDS = 20
def format_read_file_result(result_data: Dict) -> str:
    """Render a ``read_file`` tool result as display text for the web UI.

    Args:
        result_data: Result payload of the tool. ``type`` selects the layout
            (``"read"``, ``"search"`` or ``"extract"``).

    Returns:
        A header line followed by fenced code snippets. Payloads that are not
        dicts, are unsuccessful, or carry an unknown ``type`` are returned as
        their JSON dump.
    """
    if not isinstance(result_data, dict) or not result_data.get("success"):
        return json.dumps(result_data, ensure_ascii=False)

    read_type = result_data.get("type", "read")
    truncated_note = "(内容已截断)" if result_data.get("truncated") else ""
    path = result_data.get("path", "未知路径")
    max_chars = result_data.get("max_chars")
    max_note = f"(max_chars={max_chars})" if max_chars else ""

    if read_type == "read":
        # Separate the path from the line range; previously they were glued
        # together (e.g. "a.py1~20").
        header = (
            f"读取 {path} 行 {result_data.get('line_start')}~{result_data.get('line_end')} "
            f"{max_note}{truncated_note}"
        ).strip()
        content = result_data.get("content", "")
        return f"{header}\n```\n{content}\n```"

    if read_type == "search":
        query = result_data.get("query", "")
        actual = result_data.get("actual_matches", 0)
        returned = result_data.get("returned_matches", 0)
        case_hint = "区分大小写" if result_data.get("case_sensitive") else "不区分大小写"
        # The parenthesis around the case hint is now closed.
        header = (
            f"{path} 中搜索 \"{query}\",返回 {returned}/{actual} 条结果({case_hint}) "
            f"{max_note}{truncated_note}"
        ).strip()
        match_texts = []
        # ``or []`` tolerates an explicit ``"matches": None``.
        for idx, match in enumerate(result_data.get("matches") or [], 1):
            match_note = "(片段截断)" if match.get("truncated") else ""
            hits = match.get("hits") or []
            hit_text = ", ".join(str(h) for h in hits) if hits else ""
            label = match.get("id") or f"match_{idx}"
            snippet = match.get("snippet", "")
            match_texts.append(
                f"[{label}] 行 {match.get('line_start')}~{match.get('line_end')} 命中行: {hit_text}{match_note}\n```\n{snippet}\n```"
            )
        if not match_texts:
            match_texts.append("未找到匹配内容。")
        return "\n".join([header] + match_texts)

    if read_type == "extract":
        # ``or []`` tolerates an explicit ``"segments": None``.
        segments = result_data.get("segments") or []
        header = f"{path} 抽取 {len(segments)} 个片段 {max_note}{truncated_note}".strip()
        seg_texts = []
        for idx, segment in enumerate(segments, 1):
            seg_note = "(片段截断)" if segment.get("truncated") else ""
            label = segment.get("label") or f"segment_{idx}"
            snippet = segment.get("content", "")
            seg_texts.append(
                f"[{label}] 行 {segment.get('line_start')}~{segment.get('line_end')}{seg_note}\n```\n{snippet}\n```"
            )
        if not seg_texts:
            seg_texts.append("未提供可抽取的片段。")
        return "\n".join([header] + seg_texts)

    return json.dumps(result_data, ensure_ascii=False)
def sanitize_filename_preserve_unicode(filename: str) -> str:
    """Return a safe single-component file name, keeping non-ASCII characters.

    Path components are dropped, characters that are unsafe on common
    filesystems (including ASCII control characters) become ``_``, and
    leading/trailing dots and spaces are removed so the result is neither a
    hidden file nor a traversal token.

    Args:
        filename: Client-supplied file name; may include directory parts.

    Returns:
        The cleaned name, at most 255 UTF-8 bytes long, or ``""`` when nothing
        usable remains.
    """
    max_bytes = 255
    if not filename:
        return ""
    cleaned = filename.strip().replace("\x00", "")
    if not cleaned:
        return ""
    # Keep only the final path component, whichever separator was used.
    cleaned = cleaned.replace("\\", "/").split("/")[-1]
    # Replace reserved punctuation and every ASCII control character.
    cleaned = re.sub(r'[<>:"\\|?*\x00-\x1f\x7f]', "_", cleaned)
    cleaned = cleaned.strip(". ")
    if not cleaned:
        return ""
    # Common filesystems limit a name to 255 *bytes*, not characters, so a
    # long CJK name must be cut on the encoded length. Characters are dropped
    # whole so that no multi-byte sequence is split.
    cleaned = cleaned[:max_bytes]
    while len(cleaned.encode("utf-8", "surrogatepass")) > max_bytes:
        cleaned = cleaned[:-1]
    # Truncation may have exposed a trailing dot or space.
    return cleaned.rstrip(". ")
def format_tool_result_notice(tool_name: str, tool_call_id: Optional[str], content: str) -> str:
    """Build the system-message text that reports a tool's result back into the conversation.

    The first line names the tool (and the call id when given); the second
    part is the stripped output, or a placeholder when there is none.
    """
    call_suffix = f" (tool_call_id={tool_call_id})" if tool_call_id else ""
    title = f"[工具结果] {tool_name}" + call_suffix
    text = (content or "").strip() or "(无附加输出)"
    return "\n".join((title, text))
def _format_relative_path(path: Optional[str], workspace: Optional[str]) -> str:
"""将绝对路径转换为相对 workspace 的表示,默认返回原始路径。"""
if not path:
return ""
try:
target = Path(path).resolve()
if workspace:
base = Path(workspace).resolve()
rel = target.relative_to(base)
rel_text = rel.as_posix()
if not rel_text or rel_text == ".":
return "."
return f"./{rel_text}"
except Exception:
pass
return Path(path).as_posix()
def build_sub_agent_instruction(meta: Dict[str, Any]) -> str:
    """Build the initial instruction message for a sub-agent task.

    Args:
        meta: Task metadata. Reads ``agent_id``, ``task_id``, ``summary``,
            ``task``, ``workspace_dir``, ``references_dir`` and
            ``deliverables_dir``.

    Returns:
        The multi-line instruction text. Directory paths are shown relative to
        the workspace.
    """
    workspace_dir = meta.get("workspace_dir")
    references_dir = meta.get("references_dir")
    deliverables_dir = meta.get("deliverables_dir")
    workspace_label = _format_relative_path(workspace_dir, workspace_dir) or "."
    references_label = _format_relative_path(references_dir, workspace_dir) or "references/"
    deliverables_label = _format_relative_path(deliverables_dir, workspace_dir) or "deliverables/"
    # ``summary``/``task`` may be present but None (or a non-string); joining
    # such a value would raise TypeError, so coerce them to text first.
    summary_text = str(meta.get("summary") or "")
    task_text = str(meta.get("task") or "")
    lines = [
        f"子智能体任务 - 代号 {meta.get('agent_id')} (task_id={meta.get('task_id')})",
        "",
        summary_text,
        task_text,
        "",
        "工作区与目录说明:",
        f"- workspace: {workspace_label}(唯一可写根目录,所有操作只能在此路径内进行)",
        f"- references: {references_label}(系统已自动创建,仅供查阅,不得修改原文件)",
        f"- deliverables: {deliverables_label}(系统已自动创建,用于存放交付文件与 result.md",
        "",
        "交付要求:",
        "- 所有成果放入 deliverables/ 下,保持清晰的目录结构;",
        "- deliverables/ 必须包含 result.md说明完成情况、交付列表、遗留风险和下一步建议",
        "- references/ 只读,如需引用请复制到 workspace/ 再处理;",
        "- 子智能体无法与主智能体实时沟通,如信息不足请在 result.md 中说明。",
        "- 最后必须调用 finish_sub_agent并在 reason 中概括完成情况;若暂未完成,也要说明阻塞原因并继续执行直至可交付。",
        "",
        "提示references/ 与 deliverables/ 目录均已自动创建,如需额外子目录请在 workspace/ 内自行管理。",
        "",
        "请立即分析任务、规划步骤并开始执行,过程中记录关键操作,确保交付即可直接被主智能体使用。",
    ]
    return "\n".join(lines)
def build_finish_tool_reminder(meta: Dict[str, Any]) -> str:
    """Build the nudge sent when a sub-agent went quiet without calling ``finish_sub_agent``.

    The message names the deliverables directory (relative to the workspace)
    and the expected ``result.md`` location inside it.
    """
    deliverables_label = (
        _format_relative_path(meta.get("deliverables_dir"), meta.get("workspace_dir"))
        or "deliverables"
    )
    if deliverables_label in {"", "."}:
        result_path = "./result.md"
    else:
        result_path = deliverables_label.rstrip("/") + "/result.md"
    first_part = (
        f"⚠️ 检测到你已停止输出,但尚未调用 finish_sub_agent。请确认 {deliverables_label} 内的交付文件与 {result_path} 已准备完毕,"
    )
    second_part = (
        "再调用 finish_sub_agent(reason=...) 正式结束任务;如果任务仍未完成,请继续完成剩余步骤并在完成后立即调用 finish_sub_agent。"
    )
    return first_part + second_part
def cleanup_inactive_sub_agent_tasks(force: bool = False):
    """Purge sub-agent tasks that finished, or have been stopping for too long.

    A task in a terminal status is purged once it is older than
    ``TERMINAL_ARCHIVE_GRACE_SECONDS``; a task in ``"stopping"`` once it is
    older than ``STOPPING_GRACE_SECONDS``. With ``force`` the age check is
    skipped.
    """
    current_time = time.time()
    # Iterate over a snapshot because purging mutates sub_agent_tasks.
    for task_id, record in list(sub_agent_tasks.items()):
        state = (record.get("status") or "").lower()
        if state in SUB_AGENT_TERMINAL_STATUSES:
            grace = TERMINAL_ARCHIVE_GRACE_SECONDS
        elif state == "stopping":
            grace = STOPPING_GRACE_SECONDS
        else:
            continue
        last_change = record.get("updated_at") or record.get("created_at") or current_time
        if force or (current_time - last_change) > grace:
            _purge_sub_agent_task(task_id)
def _purge_sub_agent_task(task_id: str):
    """Drop every in-memory trace of a sub-agent task.

    Removes the task record, closes and removes its terminal (best effort),
    and forgets the room membership, connection bindings and stop flags that
    point at the task.
    """
    sub_agent_tasks.pop(task_id, None)

    terminal = sub_agent_terminals.pop(task_id, None)
    if terminal and hasattr(terminal, "close"):
        try:
            terminal.close()
        except Exception:
            # Closing is best effort; a failing terminal must not block cleanup.
            pass

    # Connections that were members of the task's room.
    for member_sid in list(sub_agent_rooms.pop(task_id, set())):
        sub_agent_connections.pop(member_sid, None)

    # Any remaining connection still bound to this task.
    for bound_sid, bound_task in tuple(sub_agent_connections.items()):
        if bound_task == task_id:
            sub_agent_connections.pop(bound_sid, None)

    # NOTE(review): this is a substring test, so a task id that is a prefix of
    # another id would also match that id's keys — confirm the key format.
    for flag_key in tuple(stop_flags):
        if task_id in flag_key:
            stop_flags.pop(flag_key, None)
def get_active_sub_agent_count(conversation_id: Optional[str] = None) -> int:
    """Count sub-agent tasks that are pending or running.

    Inactive tasks are cleaned up first. When ``conversation_id`` is given,
    only tasks whose parent conversation matches it are counted.
    """
    cleanup_inactive_sub_agent_tasks()
    wanted = _normalize_conversation_id(conversation_id)
    return sum(
        1
        for record in sub_agent_tasks.values()
        if record.get("status") in {"pending", "running"}
        and (
            not wanted
            or _normalize_conversation_id(record.get("parent_conversation_id")) == wanted
        )
    )
def find_sub_agent_conversation_file(conv_id: str) -> Optional[Path]:
    """Search the known data directories for ``<conv_id>.json``.

    The sub-agent tasks directory, the data directory and ``./users`` are
    searched recursively, in that order; the first file whose name matches
    exactly is returned, or ``None`` when there is none.
    """
    wanted_name = f"{conv_id}.json"
    search_roots = [
        candidate
        for candidate in (
            Path(SUB_AGENT_TASKS_BASE_DIR).expanduser().resolve(),
            Path(DATA_DIR).expanduser().resolve(),
            Path("users").resolve(),
        )
        if candidate.exists()
    ]
    for search_root in search_roots:
        try:
            hits = list(search_root.rglob(wanted_name))
        except Exception:
            hits = []
        for hit in hits:
            try:
                # Exact-name check: the glob pattern may match more than intended.
                if hit.name == wanted_name:
                    return hit
            except Exception:
                continue
    return None
def build_workspace_tree(root_path: str, max_depth: int = 5) -> Dict[str, Any]:
    """Build a file tree of a working directory for the read-only sub-agent view.

    Args:
        root_path: Directory to scan (``~`` is expanded).
        max_depth: Deepest directory level to descend into; the root is level 0.

    Returns:
        A dict with ``path`` (resolved root), ``tree`` (nested mapping of
        entries), flat ``folders`` and ``files`` lists, and the
        ``total_files`` / ``total_size`` counters. Entries whose name starts
        with ``.`` are skipped. A missing root, or a root that is not a
        directory, yields the empty structure.
    """
    root = Path(root_path).expanduser().resolve()
    structure: Dict[str, Any] = {
        "path": str(root),
        "tree": {},
        "folders": [],
        "files": [],
        "total_files": 0,
        "total_size": 0,
    }
    # A regular file as root used to crash in iterdir(); treat it like a
    # missing directory instead.
    if not root.is_dir():
        return structure

    def scan_directory(path: Path, tree: Dict[str, Any], depth: int = 0) -> None:
        """Record the visible entries of ``path`` into ``tree`` and the flat lists."""
        if depth > max_depth:
            return
        try:
            entries = sorted(
                (p for p in path.iterdir() if not p.name.startswith('.')),
                # Directories first, then case-insensitive by name.
                key=lambda p: (not p.is_dir(), p.name.lower()),
            )
        except OSError:
            # Unreadable or vanished directory: skip it rather than abort the scan.
            return
        for entry in entries:
            relative_path = str(entry.relative_to(root))
            if entry.is_dir():
                structure["folders"].append({
                    "name": entry.name,
                    "path": relative_path,
                })
                node = {
                    "type": "folder",
                    "path": relative_path,
                    "children": {},
                }
                tree[entry.name] = node
                scan_directory(entry, node["children"], depth + 1)
                continue
            try:
                # One stat() call supplies both size and mtime.
                info = entry.stat()
                size = info.st_size
                modified = datetime.fromtimestamp(info.st_mtime).isoformat()
            except (OSError, OverflowError, ValueError):
                size = 0
                modified = ""
            structure["files"].append({
                "name": entry.name,
                "path": relative_path,
                "size": size,
                "modified": modified,
            })
            structure["total_files"] += 1
            structure["total_size"] += size
            tree[entry.name] = {
                "type": "file",
                "path": relative_path,
                "size": size,
                "modified": modified,
            }

    scan_directory(root, structure["tree"])
    return structure
def _load_state_data() -> Dict[str, Any]:
    """Load the persisted sub-agent state file.

    Returns:
        The parsed JSON object, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or does not contain a JSON object.
    """
    state_file = Path(SUB_AGENT_STATE_FILE).expanduser().resolve()
    if not state_file.exists():
        return {}
    try:
        with state_file.open('r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception:
        # Best effort: a corrupt or unreadable state file is treated as empty.
        return {}
    # Callers use ``.get`` on the result, so anything but an object (a list,
    # a string, null, ...) must not leak out.
    return data if isinstance(data, dict) else {}
def load_persisted_task(task_id: str) -> Optional[Dict[str, Any]]:
    """Return the persisted record for ``task_id`` from the state file, if any."""
    persisted = _load_state_data().get("tasks") or {}
    return persisted.get(task_id)
def iter_persisted_tasks():
    """Return a list of every task record stored in the state file."""
    persisted = _load_state_data().get("tasks") or {}
    return [record for record in persisted.values()]
def get_task_record(task_id: str) -> Optional[Dict[str, Any]]:
    """Look a task up in memory first, falling back to the persisted state file."""
    return sub_agent_tasks.get(task_id) or load_persisted_task(task_id)
def broadcast_sub_agent_event(task_id: str, event_type: str, payload: Optional[Dict[str, Any]] = None):
    """Emit an event to the task's Socket.IO room.

    The message always carries ``task_id`` and is sent twice: once under
    ``event_type`` and once under ``event_<event_type>``.
    """
    message = {"task_id": task_id}
    if payload:
        message.update(payload)
    target_room = f"sub_agent_{task_id}"
    for event_name in (event_type, f"event_{event_type}"):
        socketio.emit(event_name, message, room=target_room)
def make_sub_agent_sender(task_id: str):
    """Create the event callback for one sub-agent task.

    The returned function stamps ``task_id`` onto a copy of the event data,
    updates the task record for tool/complete/error events, and then
    broadcasts the event to the task's room.
    """
    def _sender(event_type: str, data: Optional[Dict]):
        # Work on a copy so the caller's dict is never mutated.
        message = dict(data) if data is not None else {}
        message["task_id"] = task_id

        if event_type in {"tool_start", "tool_preparing"}:
            active_tool = message.get("tool_name") or message.get("tool")
            if active_tool:
                update_sub_agent_task(task_id, last_tool=active_tool, status="running")
        if event_type == "task_complete":
            update_sub_agent_task(task_id, last_tool=None)
        if event_type == "error":
            mark_task_failed(task_id, message.get("message") or "子智能体执行失败")

        broadcast_sub_agent_event(task_id, event_type, message)

    return _sender
def _normalize_conversation_id(value: Optional[str]) -> Optional[str]:
if not value:
return None
value = value.strip()
if not value:
return None
return value if value.startswith("conv_") else f"conv_{value}"
def _extract_parent_conversation(info: Optional[Dict[str, Any]]) -> Optional[str]:
    """Try to extract the parent conversation id from a task record.

    Checks ``parent_conversation_id``, then ``conversation_id``, then
    ``service_payload.parent_conversation_id``, and returns the first one that
    normalizes to a non-empty id.
    """
    if not info:
        return None
    nested = info.get("service_payload") or {}
    raw_candidates = (
        info.get("parent_conversation_id"),
        info.get("conversation_id"),
        nested.get("parent_conversation_id"),
    )
    return next(
        (found for found in map(_normalize_conversation_id, raw_candidates) if found),
        None,
    )
def _inject_sub_agent_script(html: str, task_id: str, parent_conv: Optional[str], sub_conv: Optional[str]) -> str:
script = (
"<script>"
"window.APP_VARIANT='sub_agent';"
f"window.SUB_AGENT_TASK_ID={json.dumps(task_id)};"
f"window.SUB_AGENT_CONVERSATION_ID={json.dumps(sub_conv or parent_conv or '')};"
"</script>"
)
return html.replace("</head>", f"{script}</head>", 1)
def _render_sub_agent_index(task_id: str, parent_conv: Optional[str], sub_conv: Optional[str]):
    """Serve ``index.html`` with the sub-agent bootstrap script injected.

    Falls back to serving the static file as-is when the resolved index file
    does not exist.
    """
    if SUB_AGENT_INDEX_FILE.exists():
        page = SUB_AGENT_INDEX_FILE.read_text(encoding="utf-8")
        page = _inject_sub_agent_script(page, task_id, parent_conv, sub_conv)
        return Response(page, mimetype="text/html")
    return send_from_directory(app.static_folder, 'index.html')
def _resolve_task_by_conv(conv_slug: Optional[str], task_label: str):
    """Resolve a URL task label to ``(task_id, parent_conversation_id, sub_conversation_id)``.

    ``task_label`` is either a task id or ``sub_agent<N>``, where ``N`` is an
    agent id. Lookup order: direct id match (memory, then state file), then
    the first matching record in memory, then in the state file, and finally
    the state file's ``conversation_agents`` index. When nothing is found the
    label itself is returned as the task id.
    """
    wanted_conv = _normalize_conversation_id(conv_slug)

    # "sub_agent3" -> agent number 3; any other label leaves it unset.
    agent_number = None
    if task_label.lower().startswith("sub_agent"):
        tail = task_label[len("sub_agent"):]
        if tail.isdigit():
            agent_number = int(tail)

    state = _load_state_data()
    stored = state.get("tasks") or {}
    agents_by_conv = state.get("conversation_agents") or {}

    def _is_match(candidate: Optional[Dict[str, Any]]) -> bool:
        if not candidate:
            return False
        if wanted_conv and _extract_parent_conversation(candidate) != wanted_conv:
            return False
        if agent_number is not None and candidate.get("agent_id") != agent_number:
            return False
        # With no conversation and no agent number, only an exact id matches.
        if not wanted_conv and agent_number is None and candidate.get("task_id") != task_label:
            return False
        return True

    found = sub_agent_tasks.get(task_label) or stored.get(task_label)
    for pool in (sub_agent_tasks, stored):
        if found:
            break
        found = next((c for c in pool.values() if _is_match(c)), None)

    if not found and wanted_conv and agent_number is not None:
        listed_agents = agents_by_conv.get(wanted_conv, [])
        if listed_agents and agent_number in listed_agents:
            found = next(
                (
                    c
                    for c in stored.values()
                    if c.get("agent_id") == agent_number
                    and _extract_parent_conversation(c) == wanted_conv
                ),
                None,
            )

    resolved_task_id = found.get("task_id") if found else task_label
    resolved_parent = _extract_parent_conversation(found) or wanted_conv
    resolved_sub = found.get("sub_conversation_id") if found else None
    return resolved_task_id, resolved_parent, resolved_sub
# Debug log file that debug_log() appends to.
DEBUG_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "debug_stream.log"
# NOTE(review): not referenced in the visible part of the file; presumably the folder name for user uploads.
UPLOAD_FOLDER_NAME = "user_upload"
def is_logged_in() -> bool:
    """Return True when the current session carries a username."""
    current_user = session.get('username')
    return current_user is not None
def login_required(view_func):
    """Decorator for page routes: redirect anonymous visitors to ``/login``."""
    @wraps(view_func)
    def wrapped(*args, **kwargs):
        if is_logged_in():
            return view_func(*args, **kwargs)
        return redirect('/login')

    return wrapped
def api_login_required(view_func):
    """Decorator for API routes: answer anonymous callers with a JSON 401."""
    @wraps(view_func)
    def wrapped(*args, **kwargs):
        if is_logged_in():
            return view_func(*args, **kwargs)
        return jsonify({"error": "Unauthorized"}), 401

    return wrapped
def get_current_username() -> Optional[str]:
    """Return the username stored in the current session, or ``None``."""
    current_user = session.get('username')
    return current_user
def make_terminal_callback(username: str):
    """Create a broadcast function that emits events to the given user's room.

    Emit failures are written to the debug log instead of being raised.
    """
    def _callback(event_type, data):
        try:
            user_room = f"user_{username}"
            socketio.emit(event_type, data, room=user_room)
        except Exception as exc:
            debug_log(f"广播事件失败 ({username}): {event_type} - {exc}")

    return _callback
def attach_user_broadcast(terminal: WebTerminal, username: str):
    """Point the terminal's broadcast hooks at the given user's room.

    Both ``terminal.message_callback`` and, when a terminal manager is
    present, its ``broadcast`` attribute receive the same fresh callback.
    """
    user_callback = make_terminal_callback(username)
    terminal.message_callback = user_callback
    manager = terminal.terminal_manager
    if manager:
        manager.broadcast = user_callback
def get_user_resources(username: Optional[str] = None) -> Tuple[Optional[WebTerminal], Optional[UserWorkspace]]:
    """Return the ``(terminal, workspace)`` pair for a user, creating the terminal on first use.

    ``username`` defaults to the session user. Returns ``(None, None)`` when
    no user can be determined. An existing terminal gets its broadcast hooks
    re-attached; a new one is cached in ``user_terminals``.
    """
    username = username or get_current_username()
    if not username:
        return None, None

    workspace = user_manager.ensure_user_workspace(username)
    cached = user_terminals.get(username)
    if cached:
        attach_user_broadcast(cached, username)
        return cached, workspace

    created = WebTerminal(
        project_path=str(workspace.project_path),
        thinking_mode=session.get('thinking_mode', False),
        message_callback=make_terminal_callback(username),
        data_dir=str(workspace.data_dir)
    )
    if created.terminal_manager:
        created.terminal_manager.broadcast = created.message_callback
    user_terminals[username] = created
    return created, workspace
def with_terminal(func):
    """Decorator injecting ``terminal``, ``workspace`` and ``username`` keyword arguments.

    Responds with a JSON 503 when the user's resources are unavailable.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        username = get_current_username()
        terminal, workspace = get_user_resources(username)
        if not (terminal and workspace):
            return jsonify({"error": "System not initialized"}), 503
        # Injected values take precedence over same-named incoming kwargs.
        injected = dict(kwargs, terminal=terminal, workspace=workspace, username=username)
        return func(*args, **injected)

    return wrapper
def get_terminal_for_sid(sid: str) -> Tuple[Optional[str], Optional[WebTerminal], Optional[UserWorkspace]]:
    """Map a connection sid to ``(username, terminal, workspace)``.

    Returns three ``None`` values when the sid is not bound to a user.
    """
    owner = connection_users.get(sid)
    if not owner:
        return None, None, None
    return (owner, *get_user_resources(owner))
def get_gui_manager(workspace: UserWorkspace) -> GuiFileManager:
    """Build a GUI file manager rooted at the workspace's project path."""
    project_root = str(workspace.project_path)
    return GuiFileManager(project_root)
def ensure_conversation_loaded(terminal: WebTerminal, conversation_id: Optional[str], thinking_mode: bool) -> Tuple[str, bool]:
    """Make sure the terminal has the requested conversation loaded.

    Without an id a new conversation is created. Otherwise the id is given a
    ``conv_`` prefix if missing and loaded unless it is already current.

    Returns:
        ``(conversation_id, created_new)``.

    Raises:
        RuntimeError: when creating or loading the conversation fails.
    """
    if not conversation_id:
        creation = terminal.create_new_conversation(thinking_mode=thinking_mode)
        if not creation.get("success"):
            raise RuntimeError(creation.get("message", "创建对话失败"))
        return creation["conversation_id"], True

    if not conversation_id.startswith('conv_'):
        conversation_id = f"conv_{conversation_id}"
    if terminal.context_manager.current_conversation_id != conversation_id:
        loading = terminal.load_conversation(conversation_id)
        if not loading.get("success"):
            raise RuntimeError(loading.get("message", "对话加载失败"))
    return conversation_id, False
def reset_system_state(terminal: Optional[WebTerminal]):
    """Reset per-task state on the terminal so a new task can start after a stop.

    Restarts the API client's task state, bumps the session counter and resets
    the web-specific streaming/tool attributes when present. Errors are logged
    and never raised.
    """
    if not terminal:
        return
    try:
        # 1. API client state
        if hasattr(terminal, 'api_client') and terminal.api_client:
            debug_log("重置API客户端状态")
            terminal.api_client.start_new_task()

        # 2. Session counter of the main terminal
        if hasattr(terminal, 'current_session_id'):
            terminal.current_session_id += 1
            debug_log(f"重置会话ID为: {terminal.current_session_id}")

        # 3. File-read tracker (only a log line; nothing is cleared here)
        debug_log("清理文件读取跟踪器")

        # 4. Web-specific state attributes
        if hasattr(terminal, 'streamingMessage'):
            terminal.streamingMessage = False
        if hasattr(terminal, 'currentMessageIndex'):
            terminal.currentMessageIndex = -1
        for container_name in ('preparingTools', 'activeTools'):
            if hasattr(terminal, container_name):
                container = getattr(terminal, container_name)
                if hasattr(container, 'clear'):
                    container.clear()

        debug_log("系统状态重置完成")
    except Exception as e:
        debug_log(f"状态重置过程中出现错误: {e}")
        import traceback
        debug_log(f"错误详情: {traceback.format_exc()}")
def debug_log(message):
    """Append a millisecond-timestamped line to the debug log file.

    The log directory is created on demand.
    """
    log_path = DEBUG_LOG_FILE
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open('a', encoding='utf-8') as handle:
        # Trim microseconds down to milliseconds.
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        handle.write(f"[{stamp}] {message}\n")
# Terminal broadcast callback
def terminal_broadcast(event_type, data):
    """Broadcast a terminal event to its subscribers.

    ``todo_updated`` goes to every connected client. Any other event goes to
    the ``terminal_subscribers`` room and, when the data names a session, to
    that session's own room as well. Failures are logged, never raised.
    """
    try:
        if event_type in ('todo_updated',):
            # Global event: no room restriction.
            socketio.emit(event_type, data)
            debug_log(f"全局广播{event_type}: {data}")
            return

        socketio.emit(event_type, data, room='terminal_subscribers')
        if 'session' in data:
            socketio.emit(event_type, data, room=f"terminal_{data['session']}")
        debug_log(f"终端广播: {event_type} - {data}")
    except Exception as e:
        debug_log(f"终端广播错误: {e}")
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Serve the login page (GET) or authenticate JSON credentials (POST)."""
    if request.method == 'GET':
        return redirect('/new') if is_logged_in() else app.send_static_file('login.html')

    credentials = request.get_json() or {}
    email = (credentials.get('email') or '').strip()
    password = credentials.get('password') or ''
    account = user_manager.authenticate(email, password)
    if not account:
        return jsonify({"success": False, "error": "账号或密码错误"}), 401

    session['logged_in'] = True
    session['username'] = account.username
    session['thinking_mode'] = app.config.get('DEFAULT_THINKING_MODE', False)
    session.permanent = True
    user_manager.ensure_user_workspace(account.username)
    return jsonify({"success": True})
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Serve the registration page (GET) or register a user with an invite code (POST).

    A ``ValueError`` from the user manager is answered with 400; any other
    failure with 500.
    """
    if request.method == 'GET':
        if is_logged_in():
            return redirect('/new')
        return app.send_static_file('register.html')

    form = request.get_json() or {}

    def _text(key):
        return (form.get(key) or '').strip()

    username = _text('username')
    email = _text('email')
    password = form.get('password') or ''
    invite_code = _text('invite_code')
    try:
        user_manager.register_user(username, email, password, invite_code)
        return jsonify({"success": True})
    except ValueError as exc:
        status_code = 400
        failure = exc
    except Exception as exc:
        status_code = 500
        failure = exc
    return jsonify({"success": False, "error": str(failure)}), status_code
@app.route('/logout', methods=['POST'])
def logout():
    """Clear the session and drop the user's cached terminal."""
    departing_user = session.get('username')
    session.clear()
    if departing_user:
        user_terminals.pop(departing_user, None)
    return jsonify({"success": True})
@app.route('/')
@login_required
def index():
    """Home page: redirect to ``/new``."""
    destination = '/new'
    return redirect(destination)
@app.route('/new')
@login_required
def new_page():
    """Serve the single-page app for a new conversation."""
    page_name = 'index.html'
    return app.send_static_file(page_name)
@app.route('/<conv:conversation_id>')
@login_required
def conversation_page(conversation_id):
    """Serve the single-page app for a conversation URL.

    The id from the URL is not used here; the same static page is returned.
    """
    page_name = 'index.html'
    return app.send_static_file(page_name)
@app.route('/terminal')
@login_required
def terminal_page():
    """Serve the terminal monitoring page."""
    page_name = 'terminal.html'
    return app.send_static_file(page_name)
@app.route('/file-manager')
@login_required
def gui_file_manager_page():
    """Serve the desktop-style file manager page."""
    pages_dir = Path(app.static_folder) / 'file_manager'
    return send_from_directory(pages_dir, 'index.html')
@app.route('/file-manager/editor')
@login_required
def gui_file_editor_page():
    """Serve the GUI file editor page."""
    pages_dir = Path(app.static_folder) / 'file_manager'
    return send_from_directory(pages_dir, 'editor.html')
@app.route('/file-preview/<path:relative_path>')
@login_required
@with_terminal
def gui_file_preview(relative_path: str, terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Serve a workspace file as ``text/html`` for in-browser preview.

    Only regular files are supported; any failure is answered with a
    plain-text 400.
    """
    manager = get_gui_manager(workspace)
    try:
        target = manager.prepare_download(relative_path)
        if target.is_file():
            return send_from_directory(
                directory=target.parent,
                path=target.name,
                mimetype='text/html'
            )
        return "预览仅支持文件", 400
    except Exception as exc:
        return f"无法预览文件: {exc}", 400
@app.route('/static/<path:filename>')
def static_files(filename):
    """Serve a file from the ``static`` directory."""
    static_dir = 'static'
    return send_from_directory(static_dir, filename)
@app.route('/api/status')
@api_login_required
@with_terminal
def get_status(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return system status as JSON, enriched with terminal and conversation details.

    Adds the terminal list (when a terminal manager exists), the current
    conversation's id/title/timestamps, the project path and the agent
    version. Failures while reading conversation details are printed and
    otherwise ignored.
    """
    status = terminal.get_status()

    # Terminal status
    manager = terminal.terminal_manager
    if manager:
        status['terminals'] = manager.list_terminals()

    # Details of the current conversation
    try:
        context = terminal.context_manager
        current_conv = context.current_conversation_id
        status['conversation'] = status.get('conversation', {})
        conv_info = status['conversation']
        conv_info['current_id'] = current_conv
        if current_conv and not current_conv.startswith('temp_'):
            stored = context.conversation_manager.load_conversation(current_conv)
            if stored:
                conv_info['title'] = stored.get('title', '未知对话')
                conv_info['created_at'] = stored.get('created_at')
                conv_info['updated_at'] = stored.get('updated_at')
    except Exception as e:
        print(f"[Status] 获取当前对话信息失败: {e}")

    status['project_path'] = str(workspace.project_path)
    status['version'] = AGENT_VERSION
    return jsonify(status)
@app.route('/api/files')
@api_login_required
@with_terminal
def get_files(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return the project file tree as JSON."""
    return jsonify(terminal.context_manager.get_project_structure())
# ==========================================
# 新版 GUI 文件管理器 API
# ==========================================
def _format_entry(entry) -> Dict[str, Any]:
return {
"name": entry.name,
"path": entry.path,
"type": entry.type,
"size": entry.size,
"modified_at": entry.modified_at,
"extension": entry.extension,
"is_editable": entry.is_editable,
}
@app.route('/api/gui/files/entries', methods=['GET'])
@api_login_required
@with_terminal
def gui_list_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """List a directory's contents (``path`` query parameter, default root).

    Responds with the resolved path, its breadcrumb and the formatted
    entries, or with a JSON 400 on any failure.
    """
    requested = request.args.get('path') or ""
    manager = get_gui_manager(workspace)
    try:
        resolved_path, entries = manager.list_directory(requested)
        listing = {
            "path": resolved_path,
            "breadcrumb": manager.breadcrumb(resolved_path),
            "items": list(map(_format_entry, entries)),
        }
        return jsonify({"success": True, "data": listing})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/gui/files/create', methods=['POST'])
@api_login_required
@with_terminal
def gui_create_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Create a file or folder from the JSON body (``path``, ``name``, ``type``).

    ``type`` defaults to ``"file"``. Failures are answered with a JSON 400.
    """
    body = request.get_json() or {}
    parent_dir = body.get('path') or ""
    entry_name = body.get('name') or ""
    kind = body.get('type') or "file"
    manager = get_gui_manager(workspace)
    try:
        created_path = manager.create_entry(parent_dir, entry_name, kind)
        return jsonify({"success": True, "path": created_path})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/gui/files/delete', methods=['POST'])
@api_login_required
@with_terminal
def gui_delete_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Delete the entries listed under ``paths`` in the JSON body.

    Failures are answered with a JSON 400.
    """
    body = request.get_json() or {}
    doomed = body.get('paths') or []
    manager = get_gui_manager(workspace)
    try:
        outcome = manager.delete_entries(doomed)
        return jsonify({"success": True, "result": outcome})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/gui/files/rename', methods=['POST'])
@api_login_required
@with_terminal
def gui_rename_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Rename an entry; the JSON body must carry ``path`` and ``new_name``.

    Missing fields and rename failures are answered with a JSON 400.
    """
    body = request.get_json() or {}
    source_path = body.get('path')
    new_name = body.get('new_name')
    if not (source_path and new_name):
        return jsonify({"success": False, "error": "缺少 path 或 new_name"}), 400
    manager = get_gui_manager(workspace)
    try:
        renamed_path = manager.rename_entry(source_path, new_name)
        return jsonify({"success": True, "path": renamed_path})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/gui/files/copy', methods=['POST'])
@api_login_required
@with_terminal
def gui_copy_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Copy the entries in ``paths`` into ``target_dir`` (both from the JSON body).

    Failures are answered with a JSON 400.
    """
    body = request.get_json() or {}
    sources = body.get('paths') or []
    destination = body.get('target_dir') or ""
    manager = get_gui_manager(workspace)
    try:
        outcome = manager.copy_entries(sources, destination)
        return jsonify({"success": True, "result": outcome})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/gui/files/move', methods=['POST'])
@api_login_required
@with_terminal
def gui_move_entries(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Move the entries in ``paths`` into ``target_dir`` (both from the JSON body).

    Failures are answered with a JSON 400.
    """
    body = request.get_json() or {}
    sources = body.get('paths') or []
    destination = body.get('target_dir') or ""
    manager = get_gui_manager(workspace)
    try:
        outcome = manager.move_entries(sources, destination)
        return jsonify({"success": True, "result": outcome})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/gui/files/upload', methods=['POST'])
@api_login_required
@with_terminal
def gui_upload_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Store an uploaded file (multipart field ``file``) in the workspace.

    The form may supply ``path`` (target directory) and ``filename``
    (overrides the uploaded name). The name is sanitized, with
    ``secure_filename`` as fallback. Validation problems are answered with
    400, save failures with 500.
    """
    def _fail(message, status=400):
        return jsonify({"success": False, "error": message}), status

    if 'file' not in request.files:
        return _fail("未找到文件")
    upload = request.files['file']
    if not upload or not upload.filename:
        return _fail("文件名为空")

    target_dir = request.form.get('path') or ""
    requested_name = request.form.get('filename') or upload.filename
    safe_name = sanitize_filename_preserve_unicode(requested_name) or secure_filename(requested_name)
    if not safe_name:
        return _fail("非法文件名")

    manager = get_gui_manager(workspace)
    try:
        destination = manager.prepare_upload(target_dir, safe_name)
    except Exception as exc:
        return _fail(str(exc))
    try:
        upload.save(destination)
    except Exception as exc:
        return _fail(f'保存文件失败: {exc}', 500)
    return jsonify({"success": True, "path": manager._to_relative(destination)})
@app.route('/api/gui/files/download', methods=['GET'])
@api_login_required
@with_terminal
def gui_download_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Download one entry: a file as-is, a directory as an in-memory zip.

    Requires the ``path`` query parameter; failures are answered with a
    JSON 400.
    """
    requested = request.args.get('path')
    if not requested:
        return jsonify({"success": False, "error": "缺少 path"}), 400
    manager = get_gui_manager(workspace)
    try:
        target = manager.prepare_download(requested)
        if not target.is_dir():
            return send_file(target, as_attachment=True, download_name=target.name)

        archive = BytesIO()
        with zipfile.ZipFile(archive, mode='w', compression=zipfile.ZIP_DEFLATED) as bundle:
            for folder, _subdirs, filenames in os.walk(target):
                for filename in filenames:
                    member = Path(folder) / filename
                    bundle.write(member, arcname=manager._to_relative(member))
        # Rewind so send_file streams from the start of the buffer.
        archive.seek(0)
        return send_file(
            archive,
            as_attachment=True,
            download_name=f"{target.name}.zip",
            mimetype='application/zip',
        )
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/gui/files/download/batch', methods=['POST'])
@api_login_required
@with_terminal
def gui_download_batch(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Download several entries (``paths`` in the JSON body) as one in-memory zip.

    Directories are added recursively under their requested path; an empty
    ``paths`` list and any failure are answered with a JSON 400.
    """
    body = request.get_json() or {}
    selected = body.get('paths') or []
    if not selected:
        return jsonify({"success": False, "error": "缺少待下载的路径"}), 400
    manager = get_gui_manager(workspace)
    try:
        archive = BytesIO()
        with zipfile.ZipFile(archive, mode='w', compression=zipfile.ZIP_DEFLATED) as bundle:
            for requested in selected:
                target = manager.prepare_download(requested)
                # Name inside the archive: the requested path without outer slashes.
                archive_root = requested.strip('/') or target.name
                if not target.is_dir():
                    bundle.write(target, arcname=archive_root)
                    continue
                for folder, _, filenames in os.walk(target):
                    for filename in filenames:
                        member = Path(folder) / filename
                        inner_name = Path(archive_root) / member.relative_to(target)
                        bundle.write(member, arcname=str(inner_name))
        archive.seek(0)
        return send_file(
            archive,
            as_attachment=True,
            download_name=f"selected_{len(selected)}.zip",
            mimetype='application/zip',
        )
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/gui/files/text', methods=['GET', 'POST'])
@api_login_required
@with_terminal
def gui_text_entry(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Read (GET) or write (POST) a text file through the GUI file manager.

    GET takes ``path`` from the query string and returns content plus the
    modification value reported by the manager. POST takes JSON ``path`` and
    ``content`` and returns the manager's write result. Errors become JSON 400.
    """
    gui_manager = get_gui_manager(workspace)

    if request.method != 'GET':
        body = request.get_json() or {}
        target = body.get('path')
        text = body.get('content')
        if target is None or text is None:
            return jsonify({"success": False, "error": "缺少 path 或 content"}), 400
        try:
            outcome = gui_manager.write_text(target, text)
            return jsonify({"success": True, "data": outcome})
        except Exception as exc:
            return jsonify({"success": False, "error": str(exc)}), 400

    target = request.args.get('path')
    if not target:
        return jsonify({"success": False, "error": "缺少 path"}), 400
    try:
        text, modified_at = gui_manager.read_text(target)
        return jsonify({
            "success": True,
            "path": target,
            "content": text,
            "modified_at": modified_at,
        })
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/focused')
@api_login_required
@with_terminal
def get_focused_files(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return the terminal's focused files, keyed by path.

    Each value carries the content, its length in characters, and a line count
    computed as the number of newlines plus one.
    """
    return jsonify({
        file_path: {
            "content": text,
            "size": len(text),
            "lines": text.count('\n') + 1,
        }
        for file_path, text in terminal.focused_files.items()
    })
@app.route('/api/todo-list')
@api_login_required
@with_terminal
def get_todo_list(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return the current todo snapshot from the terminal's context manager."""
    snapshot = terminal.context_manager.get_todo_snapshot()
    return jsonify({"success": True, "data": snapshot})
@app.route('/api/upload', methods=['POST'])
@api_login_required
@with_terminal
def upload_file(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Handle a front-end file upload into the fixed upload folder.

    The file name comes from the form field ``filename`` when present,
    otherwise from the uploaded part. An existing file is never overwritten:
    a ``_<n>`` suffix is appended to the stem until a free name is found.
    Responds with the project-relative path, the final name, and the folder.
    """
    def _reject(status, error, message=None):
        body = {"success": False, "error": error}
        if message is not None:
            body["message"] = message
        return jsonify(body), status

    if 'file' not in request.files:
        return _reject(400, "未找到文件", "请求中缺少文件字段")

    incoming = request.files['file']
    name_override = (request.form.get('filename') or '').strip()
    if not incoming or not incoming.filename or not incoming.filename.strip():
        return _reject(400, "文件名为空", "请选择要上传的文件")

    requested_name = name_override or incoming.filename
    # Unicode-preserving sanitizer first, werkzeug's secure_filename as fallback.
    safe_name = sanitize_filename_preserve_unicode(requested_name) or secure_filename(requested_name)
    if not safe_name:
        return _reject(400, "非法文件名", "文件名包含不支持的字符")

    file_manager = getattr(terminal, 'file_manager', None)
    if file_manager is None:
        return _reject(500, "文件管理器未初始化")

    target_folder_relative = UPLOAD_FOLDER_NAME
    folder_ok, folder_error, folder_path = file_manager._validate_path(target_folder_relative)
    if not folder_ok:
        return _reject(400, folder_error)
    try:
        folder_path.mkdir(parents=True, exist_ok=True)
    except Exception as exc:
        return _reject(500, f"创建上传目录失败: {exc}")

    file_ok, file_error, final_path = file_manager._validate_path(
        str(Path(target_folder_relative) / safe_name)
    )
    if not file_ok:
        return _reject(400, file_error)

    # Resolve name collisions: every candidate is re-validated before use.
    stem, suffix = final_path.stem, final_path.suffix
    attempt = 1
    while final_path.exists():
        candidate_relative = str(Path(target_folder_relative) / f"{stem}_{attempt}{suffix}")
        file_ok, file_error, candidate_path = file_manager._validate_path(candidate_relative)
        if not file_ok:
            return _reject(400, file_error)
        final_path = candidate_path
        attempt += 1

    try:
        incoming.save(final_path)
    except Exception as exc:
        return _reject(500, f"保存文件失败: {exc}")

    relative_path = str(final_path.relative_to(workspace.project_path))
    print(f"{OUTPUT_FORMATS['file']} 上传文件: {relative_path}")
    return jsonify({
        "success": True,
        "path": relative_path,
        "filename": final_path.name,
        "folder": target_folder_relative,
    })
@app.errorhandler(RequestEntityTooLarge)
def handle_file_too_large(error):
    """Turn an oversized-upload error into a JSON 413 that states the limit in MB."""
    limit_mb = MAX_UPLOAD_SIZE / (2 ** 20)
    body = {
        "success": False,
        "error": "文件过大",
        "message": f"单个文件大小不可超过 {limit_mb:.1f} MB",
    }
    return jsonify(body), 413
@app.route('/api/download/file')
@api_login_required
@with_terminal
def download_file_api(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Send one file as an attachment after validating its path.

    Returns 400 when ``path`` is missing or rejected by the file manager and
    404 when the validated path is not an existing regular file.
    """
    requested = (request.args.get('path') or '').strip()
    if not requested:
        return jsonify({"success": False, "error": "缺少路径参数"}), 400

    is_valid, reason, resolved = terminal.file_manager._validate_path(requested)
    if not is_valid or resolved is None:
        return jsonify({"success": False, "error": reason or "路径校验失败"}), 400
    if not (resolved.exists() and resolved.is_file()):
        return jsonify({"success": False, "error": "文件不存在"}), 404

    return send_file(resolved, as_attachment=True, download_name=resolved.name)
@app.route('/api/download/folder')
@api_login_required
@with_terminal
def download_folder_api(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Zip a validated folder in memory and send it as ``<folder>.zip``.

    The archive contains an explicit entry for the folder itself and for every
    sub-directory, so empty directories are kept.
    """
    requested = (request.args.get('path') or '').strip()
    if not requested:
        return jsonify({"success": False, "error": "缺少路径参数"}), 400

    is_valid, reason, folder = terminal.file_manager._validate_path(requested)
    if not is_valid or folder is None:
        return jsonify({"success": False, "error": reason or "路径校验失败"}), 400
    if not (folder.exists() and folder.is_dir()):
        return jsonify({"success": False, "error": "文件夹不存在"}), 404

    archive = BytesIO()
    root_name = Path(requested).name or folder.name or "archive"
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
        # Write the top-level directory entry first.
        bundle.write(folder, arcname=root_name + '/')
        for entry in folder.rglob('*'):
            member_name = str(Path(root_name) / entry.relative_to(folder))
            if entry.is_dir():
                member_name += '/'
            bundle.write(entry, arcname=member_name)
    archive.seek(0)
    return send_file(
        archive,
        mimetype='application/zip',
        as_attachment=True,
        download_name=f"{root_name}.zip",
    )
@app.route('/api/tool-settings', methods=['GET', 'POST'])
@api_login_required
@with_terminal
def tool_settings(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Read (GET) or change (POST) which tool categories are enabled.

    POST requires JSON ``category`` and ``enabled``. On success the new
    snapshot is pushed to the user's Socket.IO room and returned. A
    ``ValueError`` from the terminal becomes a JSON 400.
    """
    if request.method == 'GET':
        return jsonify({
            "success": True,
            "categories": terminal.get_tool_settings_snapshot(),
        })

    body = request.get_json() or {}
    category = body.get('category')
    if category is None:
        return jsonify({
            "success": False,
            "error": "缺少类别参数",
            "message": "请求体需要提供 category 字段",
        }), 400
    if 'enabled' not in body:
        return jsonify({
            "success": False,
            "error": "缺少启用状态",
            "message": "请求体需要提供 enabled 字段",
        }), 400

    try:
        # NOTE(review): bool() treats any non-empty string (even "false") as True;
        # confirm that clients always send a JSON boolean.
        terminal.set_tool_category_enabled(category, bool(body['enabled']))
        snapshot = terminal.get_tool_settings_snapshot()
        socketio.emit('tool_settings_updated', {'categories': snapshot}, room=f"user_{username}")
        return jsonify({"success": True, "categories": snapshot})
    except ValueError as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@app.route('/api/terminals')
@api_login_required
@with_terminal
def get_terminals(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return the terminal session listing, or an empty listing without a manager."""
    manager = terminal.terminal_manager
    if not manager:
        return jsonify({"sessions": [], "active": None, "total": 0})
    return jsonify(manager.list_terminals())
@socketio.on('connect')
def handle_connect():
    """Socket.IO ``connect`` handler.

    Guests are recorded with no user and only receive a ``connected`` event.
    Logged-in users join their personal and terminal rooms, have any stale
    stop flag cleared, and receive ``system_ready`` plus the current terminal
    list and one ``terminal_started`` event per known terminal.
    """
    sid = request.sid
    print(f"[WebSocket] 客户端连接: {sid}")
    username = get_current_username()
    if not username:
        connection_users[sid] = None
        emit('connected', {'status': 'guest'})
        return

    emit('connected', {'status': 'Connected to server'})
    connection_users[sid] = username
    # Drop any stop flag left over from a previous session on this sid.
    stop_flags.pop(sid, None)

    user_room = f"user_{username}"
    terminal_room = f"user_{username}_terminal"
    join_room(user_room)
    join_room(terminal_room)
    terminal_rooms.setdefault(sid, set()).update({user_room, terminal_room})

    terminal, workspace = get_user_resources(username)
    if not terminal:
        return
    reset_system_state(terminal)
    emit('system_ready', {
        'project_path': str(workspace.project_path),
        'thinking_mode': terminal.get_thinking_mode_status(),
        'version': AGENT_VERSION,
    }, room=sid)

    manager = terminal.terminal_manager
    if not manager:
        return
    emit('terminal_list_update', {
        'terminals': manager.get_terminal_list(),
        'active': manager.active_terminal,
    }, room=sid)
    if not manager.active_terminal:
        return
    for session_name, session in manager.terminals.items():
        started_at = session.start_time.isoformat() if session.start_time else None
        emit('terminal_started', {
            'session': session_name,
            'working_dir': str(session.working_dir),
            'shell': session.shell_command,
            'time': started_at,
        }, room=sid)
@socketio.on('disconnect')
def handle_disconnect():
    """Socket.IO ``disconnect`` handler: forget the sid and leave all its rooms."""
    sid = request.sid
    print(f"[WebSocket] 客户端断开: {sid}")
    username = connection_users.pop(sid, None)
    stop_flags.pop(sid, None)

    # Leave every terminal room recorded for this connection, then drop the record.
    for joined in list(terminal_rooms.get(sid, [])):
        leave_room(joined)
    terminal_rooms.pop(sid, None)

    if username:
        leave_room(f"user_{username}")
        leave_room(f"user_{username}_terminal")

    task_id = sub_agent_connections.pop(sid, None)
    if not task_id:
        return
    leave_room(f"sub_agent_{task_id}")
    if sid in sub_agent_rooms.get(task_id, set()):
        sub_agent_rooms[task_id].remove(sid)
@socketio.on('stop_task')
def handle_stop_task():
    """Socket.IO ``stop_task`` handler.

    When the sid has a dict entry in ``stop_flags``, cancel its unfinished
    task (if any), raise the ``stop`` flag and reset the attached terminal.
    Otherwise fall back to the legacy boolean flag. Always acknowledges with
    ``stop_requested``.
    """
    sid = request.sid
    print(f"[停止] 收到停止请求: {sid}")

    entry = stop_flags.get(sid)
    if isinstance(entry, dict):
        task_is_running = 'task' in entry and not entry['task'].done()
        if task_is_running:
            debug_log(f"正在取消任务: {sid}")
            entry['task'].cancel()
        entry['stop'] = True
        if entry.get('terminal'):
            reset_system_state(entry['terminal'])
    else:
        # No task reference is tracked: use the old plain boolean flag.
        stop_flags[sid] = True

    emit('stop_requested', {
        'message': '停止请求已接收,正在取消任务...'
    })
@socketio.on('terminal_subscribe')
def handle_terminal_subscribe(data):
    """Socket.IO ``terminal_subscribe`` handler.

    ``data`` may contain ``all`` (subscribe to every terminal event of the
    user) or ``session`` (subscribe to one terminal session). A missing or
    non-dict payload is treated as empty instead of raising.
    """
    payload = data if isinstance(data, dict) else {}
    session_name = payload.get('session')
    subscribe_all = payload.get('all', False)
    username, terminal, _ = get_terminal_for_sid(request.sid)
    if not username or not terminal or not terminal.terminal_manager:
        emit('error', {'message': 'Terminal system not initialized'})
        return
    if request.sid not in terminal_rooms:
        terminal_rooms[request.sid] = set()
    if subscribe_all:
        # Subscribe to all terminal events of this user.
        room_name = f"user_{username}_terminal"
        join_room(room_name)
        terminal_rooms[request.sid].add(room_name)
        print(f"[Terminal] {request.sid} 订阅所有终端事件")
        # Send the current terminal list back to the subscriber.
        emit('terminal_subscribed', {
            'type': 'all',
            'terminals': terminal.terminal_manager.get_terminal_list()
        })
    elif session_name:
        # Subscribe to one specific terminal session.
        room_name = f'user_{username}_terminal_{session_name}'
        join_room(room_name)
        terminal_rooms[request.sid].add(room_name)
        print(f"[Terminal] {request.sid} 订阅终端: {session_name}")
        # Replay recent output of that session (the manager is asked for 100).
        output_result = terminal.terminal_manager.get_terminal_output(session_name, 100)
        if output_result['success']:
            emit('terminal_history', {
                'session': session_name,
                'output': output_result['output']
            })
@socketio.on('terminal_unsubscribe')
def handle_terminal_unsubscribe(data):
    """Socket.IO ``terminal_unsubscribe`` handler: leave one session's room.

    A missing or non-dict payload is treated as empty, so the handler is a
    no-op instead of raising.
    """
    payload = data if isinstance(data, dict) else {}
    session_name = payload.get('session')
    username = connection_users.get(request.sid)
    if session_name:
        # Connections without a known user fall back to the un-prefixed room name.
        room_name = f'user_{username}_terminal_{session_name}' if username else f'terminal_{session_name}'
        leave_room(room_name)
        if request.sid in terminal_rooms:
            terminal_rooms[request.sid].discard(room_name)
        print(f"[Terminal] {request.sid} 取消订阅终端: {session_name}")
@socketio.on('get_terminal_output')
def handle_get_terminal_output(data):
    """Socket.IO ``get_terminal_output`` handler: return a session's output history.

    ``data`` may carry ``session`` and ``lines`` (default 50). A missing or
    non-dict payload is treated as empty instead of raising.
    """
    payload = data if isinstance(data, dict) else {}
    session_name = payload.get('session')
    lines = payload.get('lines', 50)
    username, terminal, _ = get_terminal_for_sid(request.sid)
    if not terminal or not terminal.terminal_manager:
        emit('error', {'message': 'Terminal system not initialized'})
        return
    result = terminal.terminal_manager.get_terminal_output(session_name, lines)
    if result['success']:
        emit('terminal_output_history', {
            'session': session_name,
            'output': result['output'],
            'is_interactive': result.get('is_interactive', False),
            'last_command': result.get('last_command', '')
        })
    else:
        emit('error', {'message': result['error']})
@socketio.on('send_message')
def handle_message(data):
    """Socket.IO ``send_message`` handler: start processing a user message.

    Resolves (or creates) the target conversation, notifies the client about
    it, then runs ``process_message_task`` as a background task whose events
    are emitted to the sending connection only. A missing or non-dict payload
    or a non-string message is reported as an empty message instead of raising.
    """
    username, terminal, workspace = get_terminal_for_sid(request.sid)
    if not terminal:
        emit('error', {'message': 'System not initialized'})
        return
    payload = data if isinstance(data, dict) else {}
    raw_message = payload.get('message')
    message = raw_message.strip() if isinstance(raw_message, str) else ''
    if not message:
        emit('error', {'message': '消息不能为空'})
        return
    print(f"[WebSocket] 收到消息: {message}")
    debug_log(f"\n{'='*80}\n新任务开始: {message}\n{'='*80}")
    requested_conversation_id = payload.get('conversation_id')
    try:
        conversation_id, created_new = ensure_conversation_loaded(terminal, requested_conversation_id, terminal.thinking_mode)
    except RuntimeError as exc:
        emit('error', {'message': str(exc)})
        return
    # The title lookup is best-effort: a load failure falls back to the default title.
    try:
        conv_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id) or {}
    except Exception:
        conv_data = {}
    title = conv_data.get('title', '新对话')
    socketio.emit('conversation_resolved', {
        'conversation_id': conversation_id,
        'title': title,
        'created': created_new
    }, room=request.sid)
    if created_new:
        socketio.emit('conversation_list_update', {
            'action': 'created',
            'conversation_id': conversation_id
        }, room=f"user_{username}")
        socketio.emit('conversation_changed', {
            'conversation_id': conversation_id,
            'title': title
        }, room=request.sid)
    # Capture the sid now: the background task runs outside this request context.
    client_sid = request.sid

    def send_to_client(event_type, data):
        """Emit an event to the originating client only."""
        socketio.emit(event_type, data, room=client_sid)

    socketio.start_background_task(process_message_task, terminal, message, send_to_client, client_sid)
@socketio.on('sub_agent_join')
def handle_sub_agent_join(data):
    """Socket.IO ``sub_agent_join`` handler: attach this connection to a task room.

    Unknown tasks get a ``sub_agent_status`` reply with status ``unknown``;
    otherwise the connection joins the task room and receives the task's
    current status fields.
    """
    task_id = (data or {}).get("task_id")
    task = sub_agent_tasks.get(task_id)
    if not task:
        emit('sub_agent_status', {"task_id": task_id, "status": "unknown", "message": "任务不存在"})
        return

    join_room(f"sub_agent_{task_id}")
    sub_agent_connections[request.sid] = task_id
    sub_agent_rooms[task_id].add(request.sid)

    reported_fields = ("status", "summary", "last_tool", "deliverables_dir")
    emit('sub_agent_status', {
        "task_id": task_id,
        **{field: task.get(field) for field in reported_fields},
    })
@socketio.on('sub_agent_leave')
def handle_sub_agent_leave():
    """Socket.IO ``sub_agent_leave`` handler: detach this connection from its task room."""
    sid = request.sid
    task_id = sub_agent_connections.pop(sid, None)
    if not task_id:
        return
    leave_room(f"sub_agent_{task_id}")
    members = sub_agent_rooms.get(task_id, set())
    if sid in members:
        sub_agent_rooms[task_id].remove(sid)
@socketio.on('sub_agent_message')
def handle_sub_agent_message(data):
    """Socket.IO ``sub_agent_message`` handler: forward a message to a sub-agent.

    Replies with a ``sub_agent_status`` error event when the task id or the
    message is missing, or when forwarding fails. A non-dict payload or a
    null/non-string message is treated as missing instead of raising.
    """
    payload = data if isinstance(data, dict) else {}
    task_id = payload.get("task_id")
    raw_message = payload.get("message")
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not task_id or not message:
        emit('sub_agent_status', {
            "task_id": task_id,
            "status": "error",
            "message": "缺少任务ID或消息内容"
        })
        return
    result = send_message_to_sub_agent(task_id, message)
    if not result.get("success"):
        emit('sub_agent_status', {
            "task_id": task_id,
            "status": "error",
            "message": result.get("error")
        })
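# Conversation management API endpoints for web_server.py.
# NOTE: these routes were meant to be placed after the existing routes and before the @socketio event handlers.
# ==========================================
# Conversation management API endpoints
# ==========================================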
@app.route('/api/conversations', methods=['GET'])
@api_login_required
@with_terminal
def get_conversations(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return one page of the conversation list.

    Query parameters: ``limit`` (default 20, clamped to 1..100) and ``offset``
    (default 0, never negative). Failures are reported as JSON 500.
    """
    try:
        # NOTE(review): config also exports DEFAULT_CONVERSATIONS_LIMIT and
        # MAX_CONVERSATIONS_LIMIT; confirm whether these literals should use them.
        requested_limit = request.args.get('limit', 20, type=int)
        requested_offset = request.args.get('offset', 0, type=int)
        page_size = max(1, min(requested_limit, 100))
        start = max(0, requested_offset)

        listing = terminal.get_conversations_list(limit=page_size, offset=start)
        if not listing["success"]:
            return jsonify({
                "success": False,
                "error": listing.get("error", "Unknown error"),
                "message": listing.get("message", "获取对话列表失败"),
            }), 500
        return jsonify({"success": True, "data": listing["data"]})
    except Exception as e:
        print(f"[API] 获取对话列表错误: {e}")
        return jsonify({
            "success": False,
            "error": str(e),
            "message": "获取对话列表时发生异常",
        }), 500
@app.route('/api/conversations', methods=['POST'])
@api_login_required
@with_terminal
def create_conversation(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Create a new conversation and announce it to the user's room.

    The optional JSON field ``thinking_mode`` defaults to the terminal's
    current mode. Returns the terminal's result with 201 on success, else 500.
    """
    try:
        body = request.get_json() or {}
        mode = body.get('thinking_mode', terminal.thinking_mode)
        outcome = terminal.create_new_conversation(thinking_mode=mode)
        if not outcome["success"]:
            return jsonify(outcome), 500

        user_room = f"user_{username}"
        new_id = outcome["conversation_id"]
        # Tell every client of this user about the new entry, then switch to it.
        socketio.emit('conversation_list_update', {
            'action': 'created',
            'conversation_id': new_id,
        }, room=user_room)
        socketio.emit('conversation_changed', {
            'conversation_id': new_id,
            'title': "新对话",
        }, room=user_room)
        return jsonify(outcome), 201
    except Exception as e:
        print(f"[API] 创建对话错误: {e}")
        return jsonify({
            "success": False,
            "error": str(e),
            "message": "创建对话时发生异常",
        }), 500
@app.route('/api/conversations/<conversation_id>', methods=['GET'])
@api_login_required
@with_terminal
def get_conversation_info(terminal: WebTerminal, workspace: UserWorkspace, username: str, conversation_id):
    """Return summary information for one conversation (no message bodies).

    Responds 404 when the conversation cannot be loaded and 500 on any
    exception, including a stored record that lacks an expected field.
    """
    try:
        record = terminal.context_manager.conversation_manager.load_conversation(conversation_id)
        if not record:
            return jsonify({
                "success": False,
                "error": "Conversation not found",
                "message": f"对话 {conversation_id} 不存在",
            }), 404

        # Only key fields plus a count are returned, to keep the payload small.
        summary = {
            field: record[field]
            for field in ("id", "title", "created_at", "updated_at", "metadata")
        }
        summary["messages_count"] = len(record.get("messages", []))
        return jsonify({"success": True, "data": summary})
    except Exception as e:
        print(f"[API] 获取对话信息错误: {e}")
        return jsonify({
            "success": False,
            "error": str(e),
            "message": "获取对话信息时发生异常",
        }), 500
@app.route('/api/conversations/<conversation_id>/load', methods=['PUT'])
@api_login_required
@with_terminal
def load_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Make ``conversation_id`` the terminal's current conversation.

    On success, emits ``conversation_changed``, ``status_update`` and
    ``conversation_loaded`` to the user's room. On failure the status is 404
    when the result message contains "不存在", otherwise 500.
    """
    try:
        outcome = terminal.load_conversation(conversation_id)
        if not outcome["success"]:
            not_found = "不存在" in outcome.get("message", "")
            return jsonify(outcome), (404 if not_found else 500)

        user_room = f"user_{username}"
        socketio.emit('conversation_changed', {
            'conversation_id': conversation_id,
            'title': outcome.get("title", "未知对话"),
            'messages_count': outcome.get("messages_count", 0),
        }, room=user_room)
        # The current conversation changed, so push a fresh status snapshot.
        socketio.emit('status_update', terminal.get_status(), room=user_room)
        # clear_ui asks the front end to reset its current UI state.
        socketio.emit('conversation_loaded', {
            'conversation_id': conversation_id,
            'clear_ui': True,
        }, room=user_room)
        return jsonify(outcome)
    except Exception as e:
        print(f"[API] 加载对话错误: {e}")
        return jsonify({
            "success": False,
            "error": str(e),
            "message": "加载对话时发生异常",
        }), 500
@app.route('/api/conversations/<conversation_id>', methods=['DELETE'])
@api_login_required
@with_terminal
def delete_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Delete one conversation and broadcast the change to the user's room.

    If the deleted conversation was the current one, a cleared
    ``conversation_changed`` event is also sent. On failure the status is 404
    when the result message contains "不存在", otherwise 500.
    """
    try:
        # Must be checked before deletion, while the current id is still set.
        was_current = terminal.context_manager.current_conversation_id == conversation_id
        outcome = terminal.delete_conversation(conversation_id)
        if not outcome["success"]:
            not_found = "不存在" in outcome.get("message", "")
            return jsonify(outcome), (404 if not_found else 500)

        user_room = f"user_{username}"
        socketio.emit('conversation_list_update', {
            'action': 'deleted',
            'conversation_id': conversation_id,
        }, room=user_room)
        if was_current:
            socketio.emit('conversation_changed', {
                'conversation_id': None,
                'title': None,
                'cleared': True,
            }, room=user_room)
        socketio.emit('status_update', terminal.get_status(), room=user_room)
        return jsonify(outcome)
    except Exception as e:
        print(f"[API] 删除对话错误: {e}")
        return jsonify({
            "success": False,
            "error": str(e),
            "message": "删除对话时发生异常",
        }), 500
@app.route('/api/conversations/search', methods=['GET'])
@api_login_required
@with_terminal
def search_conversations(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Search conversations by the query string ``q``.

    ``limit`` defaults to 20 and is clamped to 1..50. An empty query yields a
    JSON 400; any exception yields a JSON 500.
    """
    try:
        keyword = request.args.get('q', '').strip()
        requested_limit = request.args.get('limit', 20, type=int)
        if not keyword:
            return jsonify({
                "success": False,
                "error": "Missing query parameter",
                "message": "请提供搜索关键词",
            }), 400

        max_hits = max(1, min(requested_limit, 50))
        found = terminal.search_conversations(keyword, max_hits)
        data = {
            "results": found["results"],
            "count": found["count"],
            "query": keyword,
        }
        return jsonify({"success": True, "data": data})
    except Exception as e:
        print(f"[API] 搜索对话错误: {e}")
        return jsonify({
            "success": False,
            "error": str(e),
            "message": "搜索对话时发生异常",
        }), 500
@app.route('/api/conversations/<conversation_id>/messages', methods=['GET'])
@api_login_required
@with_terminal
def get_conversation_messages(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return the message history of one conversation (for debugging or detail views).

    The optional query parameter ``limit`` keeps only the last N messages.
    Zero, negative or non-integer values are ignored and all messages are
    returned. ``total_count`` always reports the untrimmed message count.
    """
    try:
        conversation_data = terminal.context_manager.conversation_manager.load_conversation(conversation_id)
        if conversation_data:
            messages = conversation_data.get("messages", [])
            limit = request.args.get('limit', type=int)
            # Only a positive limit trims: messages[-limit:] with a negative
            # limit would drop the FIRST |limit| messages instead.
            if limit is not None and limit > 0:
                messages = messages[-limit:]
            return jsonify({
                "success": True,
                "data": {
                    "conversation_id": conversation_id,
                    "messages": messages,
                    "total_count": len(conversation_data.get("messages", []))
                }
            })
        else:
            return jsonify({
                "success": False,
                "error": "Conversation not found",
                "message": f"对话 {conversation_id} 不存在"
            }), 404
    except Exception as e:
        print(f"[API] 获取对话消息错误: {e}")
        return jsonify({
            "success": False,
            "error": str(e),
            "message": "获取对话消息时发生异常"
        }), 500
@app.route('/api/conversations/<conversation_id>/compress', methods=['POST'])
@api_login_required
@with_terminal
def compress_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Compress a conversation into a new one and switch to the new one.

    A failed compression returns the context manager's result with 404 (error
    text contains "不存在") or 400. After a successful compression the new
    conversation is loaded; room events are only emitted if that load
    succeeded, but the response reports success either way and carries the
    load result.
    """
    try:
        outcome = terminal.context_manager.compress_conversation(conversation_id)
        if not outcome.get("success"):
            not_found = "不存在" in outcome.get("error", "")
            return jsonify(outcome), (404 if not_found else 400)

        new_conversation_id = outcome["compressed_conversation_id"]
        load_result = terminal.load_conversation(new_conversation_id)
        if load_result.get("success"):
            user_room = f"user_{username}"
            socketio.emit('conversation_list_update', {
                'action': 'compressed',
                'conversation_id': new_conversation_id,
            }, room=user_room)
            socketio.emit('conversation_changed', {
                'conversation_id': new_conversation_id,
                'title': load_result.get('title', '压缩后的对话'),
                'messages_count': load_result.get('messages_count', 0),
            }, room=user_room)
            socketio.emit('conversation_loaded', {
                'conversation_id': new_conversation_id,
                'clear_ui': True,
            }, room=user_room)

        return jsonify({
            "success": True,
            "compressed_conversation_id": new_conversation_id,
            "compressed_types": outcome.get("compressed_types", []),
            "system_message": outcome.get("system_message"),
            "load_result": load_result,
        })
    except Exception as e:
        print(f"[API] 压缩对话错误: {e}")
        return jsonify({
            "success": False,
            "error": str(e),
            "message": "压缩对话时发生异常",
        }), 500
# ==========================================
# 子智能体任务调度核心
# ==========================================
def create_sub_agent_task(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Validate a task request, build its sub-agent terminal and start it.

    Returns a failure dict when a required field is missing or empty, the task
    id is already registered, or the parent conversation already has
    ``SUB_AGENT_MAX_ACTIVE`` active sub-agents. Otherwise registers the
    terminal and task record, schedules the initial instruction as a
    background task, and returns a success dict with the deliverables
    directory and the sub-conversation id.
    """
    mandatory = (
        "task_id",
        "agent_id",
        "summary",
        "task",
        "workspace_dir",
        "references_dir",
        "deliverables_dir",
        "target_project_dir",
        "conversation_storage_dir",
    )
    first_missing = next((name for name in mandatory if not payload.get(name)), None)
    if first_missing is not None:
        return {"success": False, "error": f"缺少必要参数: {first_missing}"}

    cleanup_inactive_sub_agent_tasks()
    task_id = payload["task_id"]
    if task_id in sub_agent_tasks:
        return {"success": False, "error": f"任务 {task_id} 已存在"}

    parent_conversation_id = payload.get("parent_conversation_id")
    if get_active_sub_agent_count(parent_conversation_id) >= SUB_AGENT_MAX_ACTIVE:
        limit_note = f"同一对话最多可同时运行 {SUB_AGENT_MAX_ACTIVE} 个子智能体"
        return {"success": False, "error": f"已存在 {SUB_AGENT_MAX_ACTIVE} 个运行中的子智能体,请稍后再试({limit_note})。"}

    workspace_dir = Path(payload["workspace_dir"]).resolve()
    references_dir = Path(payload["references_dir"]).resolve()
    deliverables_dir = Path(payload["deliverables_dir"]).resolve()

    # Each task gets its own conversation data directory under the storage root.
    storage_root = Path(payload["conversation_storage_dir"]).expanduser().resolve()
    storage_root.mkdir(parents=True, exist_ok=True)
    conversation_data_dir = (storage_root / f"conv_{task_id}").resolve()
    conversation_data_dir.mkdir(parents=True, exist_ok=True)

    metadata = {
        "task_id": task_id,
        "agent_id": payload["agent_id"],
        "summary": payload.get("summary", ""),
        "task": payload.get("task", ""),
        "workspace_dir": str(workspace_dir),
        "references_dir": str(references_dir),
        "deliverables_dir": str(deliverables_dir),
        "target_project_dir": payload.get("target_project_dir"),
    }

    terminal = SubAgentTerminal(
        workspace_dir=str(workspace_dir),
        data_dir=str(conversation_data_dir),
        metadata=metadata,
        message_callback=None,
    )
    terminal.task_id = task_id

    def _finish_hook(result: Dict[str, Any]):
        """Mark the task completed with the reason reported by the terminal."""
        mark_task_completed(task_id, result.get("reason", ""))

    terminal.set_finish_callback(_finish_hook)
    terminal._ensure_conversation()
    sub_conversation_id = terminal.context_manager.current_conversation_id

    sub_agent_terminals[task_id] = terminal
    task_record = dict(metadata)
    task_record.update({
        "status": "running",
        "created_at": time.time(),
        "updated_at": time.time(),
        "timeout_seconds": payload.get("timeout_seconds") or SUB_AGENT_DEFAULT_TIMEOUT,
        "parent_conversation_id": payload.get("parent_conversation_id"),
        "reference_manifest": payload.get("reference_manifest", []),
        "conversation_data_dir": str(conversation_data_dir),
        "sub_conversation_id": sub_conversation_id,
        "last_client_sid": None,
    })
    sub_agent_tasks[task_id] = task_record

    initial_message = build_sub_agent_instruction(metadata)
    sender = make_sub_agent_sender(task_id)
    terminal.context_manager._web_terminal_callback = sender
    # Synthetic sid for this run; stored on the task record as last_client_sid.
    client_sid = f"sub_agent::{task_id}::{int(time.time()*1000)}"
    update_sub_agent_task(task_id, last_client_sid=client_sid)
    socketio.start_background_task(process_message_task, terminal, initial_message, sender, client_sid)

    return {
        "success": True,
        "status": "running",
        "task_id": task_id,
        "message": "子智能体任务已启动",
        "deliverables_dir": str(deliverables_dir),
        "sub_conversation_id": sub_conversation_id,
    }
def send_message_to_sub_agent(task_id: str, message: str) -> Dict[str, Any]:
    """Queue a follow-up message for a sub-agent that has not finished.

    Fails when no terminal is registered for ``task_id`` or the task status is
    ``completed``, ``failed`` or ``timeout``. Otherwise starts a new background
    run under a fresh synthetic client sid.
    """
    terminal = sub_agent_terminals.get(task_id)
    if not terminal:
        return {"success": False, "error": "未找到该子智能体"}

    finished_states = {"completed", "failed", "timeout"}
    current_status = sub_agent_tasks.get(task_id, {}).get("status")
    if current_status in finished_states:
        return {"success": False, "error": "任务已结束,无法继续发送消息"}

    sender = make_sub_agent_sender(task_id)
    client_sid = f"sub_agent::{task_id}::{int(time.time()*1000)}"
    update_sub_agent_task(task_id, last_client_sid=client_sid)
    socketio.start_background_task(process_message_task, terminal, message, sender, client_sid)
    return {"success": True, "message": "消息已发送"}
def stop_sub_agent_task(task_id: str) -> Dict[str, Any]:
    """Ask the sub-agent's current run to stop and mark the task as ``stopping``.

    Fails when the task is unknown or its last client sid has no entry in
    ``stop_flags``. Entries in ``stop_flags`` may be either a dict carrying a
    ``stop`` key or a legacy plain boolean; both forms are handled.
    """
    task = sub_agent_tasks.get(task_id)
    if not task:
        return {"success": False, "error": "未找到任务"}
    client_sid = task.get("last_client_sid")
    if not client_sid or client_sid not in stop_flags:
        return {"success": False, "error": "当前没有可停止的执行"}
    entry = stop_flags[client_sid]
    if isinstance(entry, dict):
        entry["stop"] = True
    else:
        # Legacy boolean flag: item assignment on a bool would raise TypeError.
        stop_flags[client_sid] = True
    update_sub_agent_task(task_id, status="stopping")
    return {"success": True, "message": "已发送停止指令"}
def force_terminate_sub_agent(task_id: str) -> Dict[str, Any]:
    """Forcefully shut down a sub-agent task and purge it from the registries.

    Raises the stop flag of the task's last run (if one is tracked), records
    the task as terminated, resets the terminal on a best-effort basis,
    broadcasts the ``terminated`` status and finally purges the task.
    Returns a failure dict when the task is unknown.
    """
    cleanup_inactive_sub_agent_tasks()
    task = sub_agent_tasks.get(task_id)
    if not task:
        return {"success": False, "error": "任务不存在"}
    client_sid = task.get("last_client_sid")
    if client_sid and client_sid in stop_flags:
        entry = stop_flags[client_sid]
        if isinstance(entry, dict):
            entry["stop"] = True
        else:
            # Legacy boolean flag: item assignment on a bool would raise TypeError.
            stop_flags[client_sid] = True
    terminated_tasks.add(task_id)
    terminal = sub_agent_terminals.get(task_id)
    if terminal:
        try:
            reset_system_state(terminal)
        except Exception as exc:
            # Best-effort reset: termination continues, but the failure is logged.
            debug_log(f"[SubAgent] reset_system_state failed for task {task_id}: {exc}")
    update_sub_agent_task(task_id, status="terminated")
    broadcast_sub_agent_event(task_id, "sub_agent_status", {
        "status": "terminated",
        "message": "子智能体已被手动关闭"
    })
    _purge_sub_agent_task(task_id)
    return {
        "success": True,
        "status": "terminated",
        "message": "子智能体已被手动关闭",
        "system_message": "🛑 子智能体已被手动关闭。"
    }
# ==========================================
# Sub-agent task API (called by the main agent; no login required)
# ==========================================
@app.route('/tasks', methods=['POST'])
def api_create_sub_agent_task():
    """Create and start a sub-agent task from the JSON request body.

    Responds 200 with the creation result on success and 400 otherwise. A
    body that is not a JSON object is treated as empty, so it is rejected
    with the usual missing-parameter error instead of raising.
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # e.g. a JSON array: create_sub_agent_task expects mapping access.
        data = {}
    result = create_sub_agent_task(data)
    status_code = 200 if result.get("success") else 400
    return jsonify(result), status_code
@app.route('/tasks/<task_id>', methods=['GET'])
def api_get_sub_agent_task(task_id: str):
    """Return the status record of a sub-agent task.

    The record comes from ``get_task_record``. When the id is not in the
    in-memory ``sub_agent_tasks`` registry the response is flagged with
    ``archived``. Unknown tasks yield a JSON 404.
    """
    task = get_task_record(task_id)
    archived = task_id not in sub_agent_tasks
    if not task:
        return jsonify({"success": False, "status": "unknown", "message": "任务不存在"}), 404

    final_result = task.get("final_result") or {}
    status = task.get("status") or final_result.get("status") or "unknown"
    message = (
        task.get("error")
        or task.get("completion_reason")
        or final_result.get("message")
        or ""
    )
    payload = {
        "success": True,
        "task_id": task_id,
        "status": status,
        "message": message,
        "deliverables_dir": task.get("deliverables_dir"),
        "workspace_dir": task.get("workspace_dir"),
        "references_dir": task.get("references_dir"),
        "target_project_dir": task.get("target_project_dir"),
        "summary": task.get("summary"),
        "last_tool": task.get("last_tool"),
        "updated_at": task.get("updated_at"),
        "sub_conversation_id": task.get("sub_conversation_id"),
    }
    # A completed task always reports its completion reason (or a default text).
    if task.get("status") == "completed":
        payload["message"] = task.get("completion_reason") or "子智能体已完成任务"
    if archived:
        payload["archived"] = True
    return jsonify(payload)
@app.route('/tasks/<task_id>/messages', methods=['POST'])
def api_send_message_to_sub_agent(task_id: str):
    """Forward the JSON field ``message`` to a running sub-agent.

    Responds 400 when the message is missing or blank, or when forwarding
    fails. A body that is not a JSON object, or a non-string ``message``, is
    treated as a missing message instead of raising.
    """
    data = request.get_json(silent=True) or {}
    raw_message = data.get("message") if isinstance(data, dict) else None
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not message:
        return jsonify({"success": False, "error": "消息内容不能为空"}), 400
    result = send_message_to_sub_agent(task_id, message)
    status_code = 200 if result.get("success") else 400
    return jsonify(result), status_code
@app.route('/tasks/<task_id>/stop', methods=['POST'])
def api_stop_sub_agent(task_id: str):
    """Request a stop of the sub-agent's current run; 200 on success, else 400."""
    outcome = stop_sub_agent_task(task_id)
    if outcome.get("success"):
        return jsonify(outcome), 200
    return jsonify(outcome), 400
@app.route('/tasks/<task_id>/terminate', methods=['POST'])
def api_terminate_sub_agent(task_id: str):
    """Force-terminate a sub-agent task; 200 on success, else 400."""
    outcome = force_terminate_sub_agent(task_id)
    if outcome.get("success"):
        return jsonify(outcome), 200
    return jsonify(outcome), 400
@app.route('/tasks/<task_id>/conversation', methods=['GET'])
def api_get_sub_agent_conversation(task_id: str):
    """Return the messages of the conversation attached to a sub-agent task.

    For tasks still in the in-memory registry, the file under the task's own
    conversation data directory is tried first; otherwise (or if that file
    does not exist) ``find_sub_agent_conversation_file`` is used. A task
    without a conversation id or file returns an empty message list.
    """
    record = get_task_record(task_id)
    archived = task_id not in sub_agent_tasks
    if not record:
        return jsonify({"success": False, "error": "任务不存在"}), 404

    conv_id = record.get("sub_conversation_id") or record.get("conversation_id")
    if not conv_id:
        return jsonify({"success": True, "conversation_id": None, "messages": []})

    conv_file = None
    if not archived:
        live_dir = Path(record.get("conversation_data_dir") or "").expanduser()
        live_file = live_dir / "conversations" / f"{conv_id}.json"
        if live_file.exists():
            conv_file = live_file
    if conv_file is None:
        conv_file = find_sub_agent_conversation_file(conv_id)
    if conv_file is None:
        return jsonify({"success": True, "conversation_id": conv_id, "messages": []})

    try:
        content = json.loads(conv_file.read_text(encoding="utf-8"))
    except Exception as exc:
        return jsonify({"success": False, "error": f"读取对话失败: {exc}"}), 500

    # Anything other than a JSON object is treated as having no messages.
    messages = content.get("messages", []) if isinstance(content, dict) else []
    return jsonify({"success": True, "conversation_id": conv_id, "messages": messages})
@app.route('/sub_agent/conversations/<conv_id>', methods=['GET'])
def api_get_archived_sub_agent_conversation(conv_id: str):
    """Read an archived sub-agent conversation file directly by conversation id.

    Responds with 404 when no existing file is found, 500 when the file
    cannot be read or parsed, and otherwise returns its ``messages`` list.
    """
    normalized = _normalize_conversation_id(conv_id)
    file_path = find_sub_agent_conversation_file(normalized)
    if not (file_path and file_path.exists()):
        return jsonify({"success": False, "error": "对话不存在"}), 404

    try:
        raw_text = file_path.read_text(encoding="utf-8")
        content = json.loads(raw_text)
    except Exception as exc:
        return jsonify({"success": False, "error": f"读取对话失败: {exc}"}), 500

    if isinstance(content, dict):
        messages = content.get("messages", [])
    else:
        messages = []
    response_body = {
        "success": True,
        "conversation_id": normalized,
        "messages": messages
    }
    return jsonify(response_body)
@app.route('/tasks/<task_id>/files', methods=['GET'])
def api_get_sub_agent_files(task_id: str):
    """Return the file tree of a sub-agent's workspace directory (read-only view).

    Responds with 404 for an unknown task, 400 when the task record has no
    ``workspace_dir`` and 500 when building or serializing the tree fails.
    """
    record = get_task_record(task_id)
    if not record:
        return jsonify({"success": False, "error": "任务不存在"}), 404

    workspace_dir = record.get("workspace_dir")
    if not workspace_dir:
        return jsonify({"success": False, "error": "任务未设置工作目录"}), 400

    try:
        response = jsonify({"success": True, "data": build_workspace_tree(workspace_dir)})
    except Exception as exc:
        return jsonify({"success": False, "error": f'构建文件树失败: {exc}'}), 500
    return response
@app.route('/sub_agent/<task_id>')
def serve_sub_agent_page(task_id: str):
    """Serve the sub-agent monitoring page addressed by task id only."""
    resolved = _resolve_task_by_conv(None, task_id)
    actual_id, parent_conv, sub_conv = resolved
    return _render_sub_agent_index(actual_id, parent_conv, sub_conv)
@app.route('/<conv_slug>/sub_agent_<task_label>')
def serve_sub_agent_with_conv(conv_slug: str, task_label: str):
    """Serve the sub-agent page for URLs shaped ``/<conv_slug>/sub_agent_<label>``."""
    # The route strips the "sub_agent_" prefix; restore it unless the label
    # already starts with "sub_agent".
    if task_label.startswith("sub_agent"):
        lookup_label = task_label
    else:
        lookup_label = f"sub_agent_{task_label}"
    task_id, parent_conv, sub_conv = _resolve_task_by_conv(conv_slug, lookup_label)
    return _render_sub_agent_index(task_id, parent_conv, sub_conv)
@app.route('/<conv_slug>+<task_label>')
def serve_sub_agent_plus(conv_slug: str, task_label: str):
    """Serve the sub-agent page for "+"-separated URLs such as ``/conv_slug+sub_agent1``."""
    resolved = _resolve_task_by_conv(conv_slug, task_label)
    task_id, parent_conv, sub_conv = resolved
    return _render_sub_agent_index(task_id, parent_conv, sub_conv)
@app.route('/api/conversations/<conversation_id>/duplicate', methods=['POST'])
@api_login_required
@with_terminal
def duplicate_conversation(conversation_id, terminal: WebTerminal, workspace: UserWorkspace, username: str):
"""Duplicate the given conversation and return the id of the new copy.

Delegates the copy to ``terminal.context_manager.duplicate_conversation``
and then loads the copy through ``terminal.load_conversation``. A failed
duplication is returned as HTTP 404 when its error text contains "不存在"
and as HTTP 400 otherwise; any exception is reported as HTTP 500. The
success response embeds the load result under ``load_result``.
"""
try:
result = terminal.context_manager.duplicate_conversation(conversation_id)
if not result.get("success"):
# NOTE(review): if "error" is present but None, the `in` test raises
# TypeError and the request falls through to the 500 handler below.
status_code = 404 if "不存在" in result.get("error", "") else 400
return jsonify(result), status_code
new_conversation_id = result["duplicate_conversation_id"]
load_result = terminal.load_conversation(new_conversation_id)
# Notify the user's socket room about the new conversation.
# NOTE(review): indentation is not visible in this view; presumably all
# three emits are guarded by the load check -- confirm.
if load_result.get("success"):
socketio.emit('conversation_list_update', {
'action': 'duplicated',
'conversation_id': new_conversation_id
}, room=f"user_{username}")
socketio.emit('conversation_changed', {
'conversation_id': new_conversation_id,
'title': load_result.get('title', '复制的对话'),
'messages_count': load_result.get('messages_count', 0)
}, room=f"user_{username}")
socketio.emit('conversation_loaded', {
'conversation_id': new_conversation_id,
'clear_ui': True
}, room=f"user_{username}")
# The response reports success for the duplication itself; callers must
# inspect load_result to learn whether the copy was also loaded.
response_payload = {
"success": True,
"duplicate_conversation_id": new_conversation_id,
"load_result": load_result
}
return jsonify(response_payload)
except Exception as e:
print(f"[API] 复制对话错误: {e}")
return jsonify({
"success": False,
"error": str(e),
"message": "复制对话时发生异常"
}), 500
@app.route('/api/conversations/statistics', methods=['GET'])
@api_login_required
@with_terminal
def get_conversations_statistics(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return conversation statistics from the terminal's context manager.

    Any exception is logged to stdout and reported as HTTP 500.
    """
    try:
        statistics = terminal.context_manager.get_conversation_statistics()
        success_body = {"success": True, "data": statistics}
        return jsonify(success_body)
    except Exception as exc:
        print(f"[API] 获取对话统计错误: {exc}")
        failure_body = {
            "success": False,
            "error": str(exc),
            "message": "获取对话统计时发生异常"
        }
        return jsonify(failure_body), 500
@app.route('/api/conversations/current', methods=['GET'])
@api_login_required
@with_terminal
def get_current_conversation(terminal: WebTerminal, workspace: UserWorkspace, username: str):
    """Return id, title and message count of the terminal's current conversation.

    A missing id or one starting with ``temp_`` is reported as an empty
    temporary conversation. A real id that cannot be loaded yields HTTP 404;
    exceptions while loading yield HTTP 500.
    """
    conversation_id = terminal.context_manager.current_conversation_id

    # Temporary (not yet persisted) conversations have nothing to look up.
    is_temporary = not conversation_id or conversation_id.startswith('temp_')
    if is_temporary:
        placeholder = {
            "id": conversation_id,
            "title": "新对话",
            "messages_count": 0,
            "is_temporary": True
        }
        return jsonify({"success": True, "data": placeholder})

    # Real conversation id: look up the stored conversation data.
    try:
        stored = terminal.context_manager.conversation_manager.load_conversation(conversation_id)
        if not stored:
            return jsonify({
                "success": False,
                "error": "对话不存在"
            }), 404
        details = {
            "id": conversation_id,
            "title": stored.get("title", "未知对话"),
            "messages_count": len(stored.get("messages", [])),
            "is_temporary": False
        }
        return jsonify({"success": True, "data": details})
    except Exception as exc:
        print(f"[API] 获取当前对话错误: {exc}")
        return jsonify({
            "success": False,
            "error": str(exc)
        }), 500
def process_message_task(terminal: WebTerminal, message: str, sender, client_sid):
    """Run one chat task to completion on a private event loop (blocking).

    Creates a fresh asyncio loop, schedules ``handle_task_with_sender`` on it
    and registers the task and terminal under ``stop_flags[client_sid]`` so
    that a stop request can cancel it.

    Args:
        terminal: Terminal that owns the conversation being processed.
        message: User message to handle.
        sender: Callable ``sender(event_name, payload)`` used to push events
            to the client.
        client_sid: Key identifying the client in ``stop_flags``.

    Outcomes:
        * Cancellation emits ``task_stopped`` and resets the system state.
        * Any other exception triggers a best-effort conversation save and
          emits ``error`` followed by ``task_complete``.
        * In every case the loop is closed and the ``stop_flags`` entry is
          removed.
    """
    loop = None
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Keep a handle on the task so that it can be cancelled later.
        task = loop.create_task(handle_task_with_sender(terminal, message, sender, client_sid))
        entry = stop_flags.get(client_sid)
        if isinstance(entry, dict):
            entry['task'] = task
            entry['terminal'] = terminal
        else:
            # Readers of stop_flags tolerate a bare (non-dict) flag value;
            # upgrade it to the dict form instead of failing on item
            # assignment, preserving whatever stop state it expressed.
            stop_flags[client_sid] = {'stop': bool(entry), 'task': task, 'terminal': terminal}
        task_id = getattr(terminal, "task_id", None)
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            debug_log(f"任务 {client_sid} 被成功取消")
            sender('task_stopped', {
                'message': '任务已停止',
                'reason': 'user_requested'
            })
            reset_system_state(terminal)
        finally:
            if task_id:
                terminated_tasks.discard(task_id)
    except Exception as e:
        # Best effort: make sure the conversation state is not lost.
        try:
            if terminal and terminal.context_manager:
                terminal.context_manager.auto_save_conversation()
                debug_log("错误恢复:对话状态已保存")
        except Exception as save_error:
            debug_log(f"错误恢复:保存对话状态失败: {save_error}")
        print(f"[Task] 错误: {e}")
        debug_log(f"任务处理错误: {e}")
        import traceback
        traceback.print_exc()
        sender('error', {'message': str(e)})
        sender('task_complete', {
            'total_iterations': 0,
            'total_tool_calls': 0,
            'auto_fix_attempts': 0,
            'error': str(e)
        })
    finally:
        if loop is not None:
            try:
                # Finalize async generators left open by the task (e.g. a
                # streaming response abandoned via `break`) before closing.
                loop.run_until_complete(loop.shutdown_asyncgens())
            except Exception as close_error:
                debug_log(f"关闭事件循环前清理异步生成器失败: {close_error}")
            finally:
                loop.close()
        # Drop the task reference so later stop requests do not see it.
        if client_sid in stop_flags and isinstance(stop_flags[client_sid], dict):
            stop_flags.pop(client_sid, None)
def detect_malformed_tool_call(text):
    """Heuristically detect a tool call that was emitted as plain text.

    Args:
        text: Assistant output to inspect. ``None``, empty and non-string
            values are treated as "nothing detected".

    Returns:
        True when ``text`` matches one of the known malformed tool-call
        patterns, or mentions a known tool name together with a ``{``;
        False otherwise.
    """
    if not isinstance(text, str) or not text:
        return False

    # Textual shapes that indicate a tool call leaked into the content.
    patterns = [
        r'执行工具[:]\s*\w+<.*?tool.*?sep.*?>',  # "execute tool: xxx<tool..sep>"
        r'<\|?tool[_▼]?call[_▼]?start\|?>',  # <tool_call_start>
        r'```tool[_\s]?call',  # ```tool_call or ```tool call
        r'{\s*"tool":\s*"[^"]+",\s*"arguments"',  # JSON-style tool call
        r'function_calls?:\s*\[?\s*{',  # function_call: [{
    ]
    if any(re.search(pattern, text, re.IGNORECASE) for pattern in patterns):
        return True

    # A known tool name plus an opening brace is treated as a probable
    # (badly formatted) tool call.
    tool_names = (
        'create_file', 'read_file', 'modify_file', 'delete_file',
        'append_to_file', 'terminal_session', 'terminal_input', 'web_search',
        'extract_webpage', 'save_webpage',
        'run_python', 'run_command', 'focus_file', 'unfocus_file', 'sleep',
    )
    return '{' in text and any(tool in text for tool in tool_names)
async def handle_task_with_sender(terminal: WebTerminal, message, sender, client_sid):
"""处理任务并发送消息子智能体Web终端"""
web_terminal = terminal
finish_called = False
finish_prompt_sent = False
# 如果是思考模式,重置状态
if web_terminal.thinking_mode:
web_terminal.api_client.start_new_task()
# 添加到对话历史
web_terminal.context_manager.add_conversation("user", message)
# === 子智能体版本:跳过额外统计 ===
# 构建上下文和消息用于API调用
context = web_terminal.build_context()
messages = web_terminal.build_messages(context, message)
tools = web_terminal.define_tools()
# 开始新的AI消息
sender('ai_message_start', {})
# 增量保存相关变量
has_saved_thinking = False # 是否已保存思考内容
accumulated_response = "" # 累积的响应内容
is_first_iteration = True # 是否是第一次迭代
# 统计和限制变量
total_iterations = 0
total_tool_calls = 0
consecutive_same_tool = defaultdict(int)
last_tool_name = ""
auto_fix_attempts = 0
last_tool_call_time = 0
# 设置最大迭代次数
max_iterations = MAX_ITERATIONS_PER_TASK
pending_append = None # {"path": str, "tool_call_id": str, "buffer": str, ...}
append_probe_buffer = ""
pending_modify = None # {"path": str, "tool_call_id": str, "buffer": str, ...}
modify_probe_buffer = ""
async def finalize_pending_append(response_text: str, stream_completed: bool, finish_reason: str = None) -> Dict:
"""在流式输出结束后处理追加写入"""
nonlocal pending_append, append_probe_buffer
result = {
"handled": False,
"success": False,
"summary": None,
"summary_message": None,
"tool_content": None,
"tool_call_id": None,
"path": None,
"forced": False,
"error": None,
"assistant_content": response_text,
"lines": 0,
"bytes": 0,
"finish_reason": finish_reason,
"appended_content": "",
"assistant_metadata": None
}
if not pending_append:
return result
state = pending_append
path = state.get("path")
tool_call_id = state.get("tool_call_id")
buffer = state.get("buffer", "")
start_marker = state.get("start_marker")
end_marker = state.get("end_marker")
start_idx = state.get("content_start")
end_idx = state.get("end_index")
display_id = state.get("display_id")
result.update({
"handled": True,
"path": path,
"tool_call_id": tool_call_id,
"display_id": display_id
})
if path is None or tool_call_id is None:
error_msg = "append_to_file 状态不完整缺少路径或ID。"
debug_log(error_msg)
result["error"] = error_msg
result["summary_message"] = error_msg
result["tool_content"] = json.dumps({
"success": False,
"error": error_msg
}, ensure_ascii=False)
if display_id:
sender('update_action', {
'id': display_id,
'status': 'failed',
'preparing_id': tool_call_id,
'message': error_msg
})
pending_append = None
return result
if start_idx is None:
error_msg = f"未检测到格式正确的开始标识 {start_marker}"
debug_log(error_msg)
result["error"] = error_msg
result["summary_message"] = error_msg
result["tool_content"] = json.dumps({
"success": False,
"path": path,
"error": error_msg
}, ensure_ascii=False)
if display_id:
sender('update_action', {
'id': display_id,
'status': 'failed',
'preparing_id': tool_call_id,
'message': error_msg
})
pending_append = None
return result
forced = False
if end_idx is None:
forced = True
# 查找下一个<<<,否则使用整个缓冲结尾
remaining = buffer[start_idx:]
next_marker = remaining.find("<<<", len(end_marker))
if next_marker != -1:
end_idx = start_idx + next_marker
else:
end_idx = len(buffer)
content = buffer[start_idx:end_idx]
if content.startswith('\n'):
content = content[1:]
if not content:
error_msg = "未检测到需要追加的内容,请严格按照<<<APPEND:path>>>...<<<END_APPEND>>>格式输出。"
debug_log(error_msg)
result["error"] = error_msg
result["forced"] = forced
result["tool_content"] = json.dumps({
"success": False,
"path": path,
"error": error_msg
}, ensure_ascii=False)
if display_id:
sender('update_action', {
'id': display_id,
'status': 'failed',
'preparing_id': tool_call_id,
'message': error_msg
})
pending_append = None
return result
assistant_message_lines = []
if start_marker:
assistant_message_lines.append(start_marker)
assistant_message_lines.append(content)
if not forced and end_marker:
assistant_message_lines.append(end_marker)
assistant_message_text = "\n".join(assistant_message_lines)
result["assistant_content"] = assistant_message_text
assistant_metadata = {
"append_payload": {
"path": path,
"tool_call_id": tool_call_id,
"forced": forced,
"has_end_marker": not forced
}
}
result["assistant_metadata"] = assistant_metadata
write_result = web_terminal.file_manager.append_file(path, content)
if write_result.get("success"):
bytes_written = len(content.encode('utf-8'))
line_count = content.count('\n')
if content and not content.endswith('\n'):
line_count += 1
summary = f"已向 {path} 追加 {line_count} 行({bytes_written} 字节)"
if forced:
summary += "。未检测到 <<<END_APPEND>>> 标记,系统已在流结束处完成写入。如内容未完成,请重新调用 append_to_file 并按标准格式补充;如已完成,可继续后续步骤。"
result.update({
"success": True,
"summary": summary,
"summary_message": summary,
"forced": forced,
"lines": line_count,
"bytes": bytes_written,
"appended_content": content,
"tool_content": json.dumps({
"success": True,
"path": path,
"lines": line_count,
"bytes": bytes_written,
"forced": forced,
"message": summary,
"finish_reason": finish_reason
}, ensure_ascii=False)
})
assistant_meta_payload = result["assistant_metadata"]["append_payload"]
assistant_meta_payload["lines"] = line_count
assistant_meta_payload["bytes"] = bytes_written
assistant_meta_payload["success"] = True
summary_payload = {
"success": True,
"path": path,
"lines": line_count,
"bytes": bytes_written,
"forced": forced,
"message": summary
}
if display_id:
sender('update_action', {
'id': display_id,
'status': 'completed',
'result': summary_payload,
'preparing_id': tool_call_id,
'message': summary
})
# 更新聚焦文件内容
if path in web_terminal.focused_files:
refreshed = web_terminal.file_manager.read_file(path)
if refreshed.get("success"):
web_terminal.focused_files[path] = refreshed["content"]
debug_log(f"聚焦文件已刷新: {path}")
debug_log(f"追加写入完成: {summary}")
else:
error_msg = write_result.get("error", "追加写入失败")
result.update({
"error": error_msg,
"summary_message": error_msg,
"forced": forced,
"appended_content": content,
"tool_content": json.dumps({
"success": False,
"path": path,
"error": error_msg,
"finish_reason": finish_reason
}, ensure_ascii=False)
})
debug_log(f"追加写入失败: {error_msg}")
if result["assistant_metadata"]:
assistant_meta_payload = result["assistant_metadata"]["append_payload"]
assistant_meta_payload["lines"] = content.count('\n') + (0 if content.endswith('\n') or not content else 1)
assistant_meta_payload["bytes"] = len(content.encode('utf-8'))
assistant_meta_payload["success"] = False
failure_payload = {
"success": False,
"path": path,
"error": error_msg,
"forced": forced
}
if display_id:
sender('update_action', {
'id': display_id,
'status': 'completed',
'result': failure_payload,
'preparing_id': tool_call_id,
'message': error_msg
})
pending_append = None
append_probe_buffer = ""
if hasattr(web_terminal, "pending_append_request"):
web_terminal.pending_append_request = None
return result
async def finalize_pending_modify(response_text: str, stream_completed: bool, finish_reason: str = None) -> Dict:
"""Apply the buffered modify patch once the model stream has ended.

Parses ``[replace:n] ... [/replace]`` blocks (each carrying an ``<<OLD>>``
and a ``<<NEW>>`` segment closed by ``<<END>>``) out of the pending modify
buffer, applies the well-formed ones through
``web_terminal.file_manager.apply_modify_blocks`` and reports the outcome
via ``sender('update_action', ...)``.

Args:
response_text: Assistant text streamed so far; kept as
``assistant_content`` unless raw patch text is available.
stream_completed: Not read inside this function.
finish_reason: Finish reason echoed back in the result and tool payload.

Returns:
A result dict. ``handled`` is False when no modify was pending;
otherwise it carries per-block ``details``, ``completed_blocks``,
``failed_blocks``, ``summary_message``, ``tool_content`` (JSON string)
and ``assistant_metadata``.
"""
nonlocal pending_modify, modify_probe_buffer
result = {
"handled": False,
"success": False,
"path": None,
"tool_call_id": None,
"display_id": None,
"total_blocks": 0,
"completed_blocks": [],
"failed_blocks": [],
"forced": False,
"details": [],
"error": None,
"assistant_content": response_text,
"assistant_metadata": None,
"tool_content": None,
"summary_message": None,
"finish_reason": finish_reason
}
# Nothing pending: return the untouched "not handled" result.
if not pending_modify:
return result
state = pending_modify
path = state.get("path")
tool_call_id = state.get("tool_call_id")
display_id = state.get("display_id")
start_marker = state.get("start_marker")
end_marker = state.get("end_marker")
buffer = state.get("buffer", "")
raw_buffer = state.get("raw_buffer", "")
end_index = state.get("end_index")
result.update({
"handled": True,
"path": path,
"tool_call_id": tool_call_id,
"display_id": display_id
})
# The start marker was never seen: fail and clear all pending modify state.
if not state.get("start_seen"):
error_msg = "未检测到格式正确的 <<<MODIFY:path>>> 标记。"
debug_log(error_msg)
result["error"] = error_msg
result["summary_message"] = error_msg
result["tool_content"] = json.dumps({
"success": False,
"path": path,
"error": error_msg,
"finish_reason": finish_reason
}, ensure_ascii=False)
if display_id:
sender('update_action', {
'id': display_id,
'status': 'failed',
'preparing_id': tool_call_id,
'message': error_msg
})
if hasattr(web_terminal, "pending_modify_request"):
web_terminal.pending_modify_request = None
pending_modify = None
modify_probe_buffer = ""
return result
# "forced" means no end index was recorded, so the whole buffer is used.
forced = end_index is None
apply_text = buffer if forced else buffer[:end_index]
# NOTE(review): len(start_marker) / len(end_marker) raise TypeError if the
# state carries no markers; presumably both are always set once start_seen
# is true -- confirm against the code that builds this state.
raw_content = raw_buffer if forced else raw_buffer[:len(start_marker) + end_index + len(end_marker)]
if raw_content:
result["assistant_content"] = raw_content
blocks_info = []
block_reports = {}
detected_indices = set()
# Captures the numeric index and the body of each [replace:n]...[/replace].
block_pattern = re.compile(r"\[replace:(\d+)\](.*?)\[/replace\]", re.DOTALL)
structure_warnings: List[str] = []
structure_detail_entries: List[Dict] = []
def record_structure_warning(message: str, hint: Optional[str] = None):
"""Record a structural defect (once per message) for more specific feedback."""
if message in structure_warnings:
return
structure_warnings.append(message)
structure_detail_entries.append({
"index": 0,
"status": "failed",
"reason": message,
"removed_lines": 0,
"added_lines": 0,
"hint": hint or "请严格按照模板输出:[replace:n] + <<OLD>>/<<NEW>> + [/replace],并使用 <<<END_MODIFY>>> 收尾。"
})
def extract_segment(body: str, tag: str):
# Returns (segment, None) on success or (None, error_text) when the
# <<tag>> opener or the following <<END>> is missing. A single newline
# directly after the opener is dropped.
marker = f"<<{tag}>>"
end_tag = "<<END>>"
start_pos = body.find(marker)
if start_pos == -1:
return None, f"缺少 {marker}"
start_pos += len(marker)
if body[start_pos:start_pos+2] == "\r\n":
start_pos += 2
elif body[start_pos:start_pos+1] == "\n":
start_pos += 1
end_pos = body.find(end_tag, start_pos)
if end_pos == -1:
return None, f"缺少 {end_tag}"
segment = body[start_pos:end_pos]
return segment, None
# Collect every replace block; a repeated index keeps only its first block.
for match in block_pattern.finditer(apply_text):
try:
index = int(match.group(1))
except ValueError:
continue
body = match.group(2)
if index in detected_indices:
continue
detected_indices.add(index)
block_reports[index] = {
"index": index,
"status": "pending",
"reason": None,
"removed_lines": 0,
"added_lines": 0,
"hint": None
}
old_content, old_error = extract_segment(body, "OLD")
new_content, new_error = extract_segment(body, "NEW")
if old_error or new_error:
reason = old_error or new_error
block_reports[index]["status"] = "failed"
block_reports[index]["reason"] = reason
blocks_info.append({
"index": index,
"old": old_content,
"new": new_content,
"error": old_error or new_error
})
# No complete block matched: look for partial markup to explain why.
if not blocks_info:
has_replace_start = bool(re.search(r"\[replace:\s*\d+\]", apply_text))
has_replace_end = "[/replace]" in apply_text
has_old_tag = "<<OLD>>" in apply_text
has_new_tag = "<<NEW>>" in apply_text
if has_replace_start and not has_replace_end:
record_structure_warning("检测到 [replace:n] 标记但缺少对应的 [/replace] 结束标记。")
if has_replace_end and not has_replace_start:
record_structure_warning("检测到 [/replace] 结束标记但缺少对应的 [replace:n] 起始标记。")
old_tags = len(re.findall(r"<<OLD>>", apply_text))
completed_old_tags = len(re.findall(r"<<OLD>>[\s\S]*?<<END>>", apply_text))
if old_tags and completed_old_tags < old_tags:
record_structure_warning("检测到 <<OLD>> 段落但未看到对应的 <<END>> 结束标记。")
new_tags = len(re.findall(r"<<NEW>>", apply_text))
completed_new_tags = len(re.findall(r"<<NEW>>[\s\S]*?<<END>>", apply_text))
if new_tags and completed_new_tags < new_tags:
record_structure_warning("检测到 <<NEW>> 段落但未看到对应的 <<END>> 结束标记。")
if (has_replace_start or has_replace_end or has_old_tag or has_new_tag) and not structure_warnings:
record_structure_warning("检测到部分补丁标记,但整体结构不完整,请严格按照模板填写所有标记。")
total_blocks = len(blocks_info)
result["total_blocks"] = total_blocks
if forced:
debug_log("未检测到 <<<END_MODIFY>>>,将在流结束处执行已识别的修改块。")
result["forced"] = True
# Only blocks with both segments and no parse error are sent for application.
blocks_to_apply = [
{"index": block["index"], "old": block["old"], "new": block["new"]}
for block in blocks_info
if block["error"] is None and block["old"] is not None and block["new"] is not None
]
# Mark blocks whose OLD/NEW markup was incomplete as failed.
for block in blocks_info:
if block["error"]:
idx = block["index"]
block_reports[idx]["status"] = "failed"
block_reports[idx]["reason"] = block["error"]
block_reports[idx]["hint"] = "请检查补丁块的 OLD/NEW 标记是否完整,必要时复用 terminal_snapshot 或终端命令重新调整。"
apply_result = {}
if blocks_to_apply:
apply_result = web_terminal.file_manager.apply_modify_blocks(path, blocks_to_apply)
else:
# Nothing to apply: use an empty stand-in with the keys read below.
apply_result = {"success": False, "completed": [], "failed": [], "results": [], "write_performed": False, "error": None}
# Merge the per-block outcomes (keyed by index) into the block reports.
block_result_map = {item["index"]: item for item in apply_result.get("results", [])}
for block in blocks_info:
idx = block["index"]
report = block_reports.get(idx)
if report is None:
continue
if report["status"] == "failed":
continue
block_apply = block_result_map.get(idx)
if not block_apply:
report["status"] = "failed"
report["reason"] = "未执行,可能未找到匹配原文"
report["hint"] = report.get("hint") or "请确认 OLD 文本与文件内容完全一致;若多次失败,可改用终端命令/Python 进行精准替换。"
continue
status = block_apply.get("status")
report["removed_lines"] = block_apply.get("removed_lines", 0)
report["added_lines"] = block_apply.get("added_lines", 0)
if block_apply.get("hint"):
report["hint"] = block_apply.get("hint")
if status == "success":
report["status"] = "completed"
elif status == "not_found":
report["status"] = "failed"
report["reason"] = block_apply.get("reason") or "未找到匹配的原文"
if not report.get("hint"):
report["hint"] = "请使用 terminal_snapshot/grep -n 校验原文,或在说明后改用 run_command/python 精确替换。"
else:
report["status"] = "failed"
report["reason"] = block_apply.get("reason") or "替换失败"
if not report.get("hint"):
report["hint"] = block_apply.get("hint") or "若多次尝试仍失败,可考虑利用终端命令或 Python 小脚本完成此次修改。"
# Every block that did not end up "completed" counts as failed.
completed_blocks = sorted([idx for idx, rep in block_reports.items() if rep["status"] == "completed"])
failed_blocks = sorted([idx for idx, rep in block_reports.items() if rep["status"] != "completed"])
result["completed_blocks"] = completed_blocks
result["failed_blocks"] = failed_blocks
details = sorted(block_reports.values(), key=lambda x: x["index"])
# Structural warnings (index 0) are listed ahead of the per-block details.
if structure_detail_entries:
details = structure_detail_entries + details
result["details"] = details
summary_parts = []
if total_blocks == 0:
summary_parts.append("未检测到有效的修改块,未执行任何修改。")
summary_parts.extend(structure_warnings)
else:
if not completed_blocks and failed_blocks:
summary_parts.append(f"共检测到 {total_blocks} 个修改块,全部未执行。")
elif completed_blocks and not failed_blocks:
summary_parts.append(f"{total_blocks} 个修改块全部完成。")
else:
summary_parts.append(
f"共检测到 {total_blocks} 个修改块,其中成功 {len(completed_blocks)} 个,失败 {len(failed_blocks)} 个。"
)
if forced:
summary_parts.append("未检测到 <<<END_MODIFY>>> 标记,系统已在流结束处执行补丁。")
if apply_result.get("error"):
summary_parts.append(apply_result["error"])
# NOTE(review): this note says <<<OLD>>> while the parser above looks for
# <<OLD>> -- confirm which spelling the model-facing text should use.
matching_note = "提示:补丁匹配基于完整文本,包含注释和空白符,请确保 <<<OLD>>> 段落与文件内容逐字一致。如果修改成功,请忽略,如果失败,请明确原文后再次尝试。"
summary_parts.append(matching_note)
summary_message = " ".join(summary_parts).strip()
result["summary_message"] = summary_message
# Success requires at least one completed block, no failed block and no
# error reported by the apply step.
result["success"] = bool(completed_blocks) and not failed_blocks and apply_result.get("error") is None
tool_payload = {
"success": result["success"],
"path": path,
"total_blocks": total_blocks,
"completed": completed_blocks,
"failed": [
{
"index": rep["index"],
"reason": rep.get("reason"),
"hint": rep.get("hint")
}
for rep in result["details"] if rep["status"] != "completed"
],
"forced": forced,
"message": summary_message,
"finish_reason": finish_reason,
"details": result["details"]
}
if apply_result.get("error"):
tool_payload["error"] = apply_result["error"]
result["tool_content"] = json.dumps(tool_payload, ensure_ascii=False)
result["assistant_metadata"] = {
"modify_payload": {
"path": path,
"total_blocks": total_blocks,
"completed": completed_blocks,
"failed": failed_blocks,
"forced": forced,
"details": result["details"]
}
}
if display_id:
sender('update_action', {
'id': display_id,
'status': 'completed' if result["success"] else 'failed',
'result': tool_payload,
'preparing_id': tool_call_id,
'message': summary_message
})
# Refresh the cached copy of a focused file only after a successful patch.
if path in web_terminal.focused_files and tool_payload.get("success"):
refreshed = web_terminal.file_manager.read_file(path)
if refreshed.get("success"):
web_terminal.focused_files[path] = refreshed["content"]
debug_log(f"聚焦文件已刷新: {path}")
# Clear all pending modify state before returning.
pending_modify = None
modify_probe_buffer = ""
if hasattr(web_terminal, "pending_modify_request"):
web_terminal.pending_modify_request = None
return result
async def process_sub_agent_updates(
    messages: List[Dict],
    inline: bool = False,
    after_tool_call_id: Optional[str] = None
):
    """Poll sub-agent tasks, notify the frontend and splice notices into ``messages``.

    Each update from ``web_terminal.sub_agent_manager.poll_updates()`` that
    carries a ``system_message`` is recorded on the terminal, inserted into
    ``messages`` as a ``system`` entry and pushed to the client as a
    ``system_message`` event.

    Args:
        messages: Message list of the current API conversation; mutated in
            place.
        inline: Flag forwarded to the recorder, the inserted metadata and the
            client event.
        after_tool_call_id: When given and a ``tool`` message with this id
            exists, notices are inserted directly after it, in the order the
            updates were returned. Otherwise they are appended at the end.
    """
    manager = getattr(web_terminal, "sub_agent_manager", None)
    if not manager:
        return
    try:
        updates = manager.poll_updates()
        debug_log(f"[SubAgent] poll inline={inline} updates={len(updates)}")
    except Exception as exc:
        debug_log(f"子智能体状态检查失败: {exc}")
        return

    # Number of notices already placed behind the anchoring tool message;
    # later notices are shifted by this amount so they keep their order
    # instead of each landing directly after the anchor (reverse order).
    inserted_after_anchor = 0
    for update in updates:
        message = update.get("system_message")
        if not message:
            continue
        task_id = update.get("task_id")
        debug_log(f"[SubAgent] update task={task_id} inline={inline} msg={message}")
        web_terminal._record_sub_agent_message(message, task_id, inline=inline)
        debug_log(f"[SubAgent] recorded task={task_id}, 计算插入位置")

        insert_index = len(messages)
        if after_tool_call_id:
            for idx, msg in enumerate(messages):
                if msg.get("role") == "tool" and msg.get("tool_call_id") == after_tool_call_id:
                    insert_index = min(idx + 1 + inserted_after_anchor, len(messages))
                    inserted_after_anchor += 1
                    break
        messages.insert(insert_index, {
            "role": "system",
            "content": message,
            "metadata": {"sub_agent_notice": True, "inline": inline, "task_id": task_id}
        })
        debug_log(f"[SubAgent] 插入系统消息位置: {insert_index}")
        sender('system_message', {
            'content': message,
            'inline': inline
        })
for iteration in range(max_iterations):
total_iterations += 1
debug_log(f"\n--- 迭代 {iteration + 1}/{max_iterations} 开始 ---")
task_id = getattr(web_terminal, "task_id", None)
if task_id and task_id in terminated_tasks:
sender('system_message', {'content': '🛑 子智能体已被手动关闭。'})
break
# 检查是否超过总工具调用限制
if total_tool_calls >= MAX_TOTAL_TOOL_CALLS:
debug_log(f"已达到最大工具调用次数限制 ({MAX_TOTAL_TOOL_CALLS})")
sender('system_message', {
'content': f'⚠️ 已达到最大工具调用次数限制 ({MAX_TOTAL_TOOL_CALLS}),任务结束。'
})
break
full_response = ""
tool_calls = []
current_thinking = ""
detected_tools = {}
# 状态标志
in_thinking = False
thinking_started = False
thinking_ended = False
text_started = False
text_has_content = False
TEXT_BUFFER_MAX_CHARS = 1
TEXT_BUFFER_MAX_INTERVAL = 0.0
TEXT_BUFFER_FLUSH_CHARS = 1
text_chunk_buffer: deque[str] = deque()
text_chunk_buffer_size = 0
last_text_flush_time = time.time()
TEXT_BUFFER_CHAR_DELAY = 0.02
def _drain_text_buffer(force: bool = False) -> bool:
# Emit buffered text to the client as 'text_chunk' events.
# Returns True when at least one event was sent, False otherwise.
# With force=True (or when TEXT_BUFFER_MAX_INTERVAL is 0.0) the whole buffer
# is sent in a single batch; otherwise at most one batch of up to
# TEXT_BUFFER_FLUSH_CHARS characters is sent per call.
nonlocal text_chunk_buffer, text_chunk_buffer_size, last_text_flush_time
if not text_chunk_buffer:
return False
drain_all = force or TEXT_BUFFER_MAX_INTERVAL == 0.0
sent = False
while text_chunk_buffer:
now = time.time()
# Flush when forced, when enough characters are buffered, or when the
# configured interval since the last flush has elapsed.
should_flush = (
force
or text_chunk_buffer_size >= TEXT_BUFFER_MAX_CHARS
or TEXT_BUFFER_MAX_INTERVAL == 0.0
or (TEXT_BUFFER_MAX_INTERVAL > 0 and (now - last_text_flush_time) >= TEXT_BUFFER_MAX_INTERVAL)
)
if not should_flush:
break
batch_size = text_chunk_buffer_size if drain_all else max(1, min(text_chunk_buffer_size, TEXT_BUFFER_FLUSH_CHARS or 1))
pieces: List[str] = []
remaining = batch_size
# Pop chunks until the batch is full; a chunk larger than the remaining
# budget is split and its tail is pushed back to the front of the queue.
while text_chunk_buffer and remaining > 0:
chunk = text_chunk_buffer.popleft()
chunk_len = len(chunk)
if chunk_len <= remaining:
pieces.append(chunk)
remaining -= chunk_len
else:
pieces.append(chunk[:remaining])
text_chunk_buffer.appendleft(chunk[remaining:])
chunk_len = remaining
remaining = 0
# Keep the running size in step with what actually left the buffer.
text_chunk_buffer_size -= chunk_len
if not pieces:
break
sender('text_chunk', {'content': "".join(pieces)})
last_text_flush_time = now
sent = True
if not drain_all:
break
return sent
async def flush_text_buffer(force: bool = False):
# Drain the text buffer; after a non-forced flush that sent something,
# sleep for TEXT_BUFFER_CHAR_DELAY seconds (when positive) before returning.
sent = _drain_text_buffer(force)
if sent and not force and TEXT_BUFFER_CHAR_DELAY > 0:
await asyncio.sleep(TEXT_BUFFER_CHAR_DELAY)
text_streaming = False
# 计数器
chunk_count = 0
reasoning_chunks = 0
content_chunks = 0
tool_chunks = 0
append_break_triggered = False
append_result = {"handled": False}
modify_break_triggered = False
modify_result = {"handled": False}
last_finish_reason = None
# 获取是否显示思考
should_show_thinking = web_terminal.api_client.get_current_thinking_mode()
debug_log(f"思考模式: {should_show_thinking}")
print(f"[API] 第{iteration + 1}次调用 (总工具调用: {total_tool_calls}/{MAX_TOTAL_TOOL_CALLS})")
# 收集流式响应
async for chunk in web_terminal.api_client.chat(messages, tools, stream=True):
chunk_count += 1
# 检查停止标志
client_stop_info = stop_flags.get(client_sid)
if client_stop_info:
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
if stop_requested:
debug_log(f"检测到停止请求,中断流处理")
if pending_append:
append_result = await finalize_pending_append(full_response, False, finish_reason="user_stop")
break
if pending_modify:
modify_result = await finalize_pending_modify(full_response, False, finish_reason="user_stop")
break
if "choices" not in chunk:
debug_log(f"Chunk {chunk_count}: 无choices字段")
continue
choice = chunk["choices"][0]
delta = choice.get("delta", {})
finish_reason = choice.get("finish_reason")
if finish_reason:
last_finish_reason = finish_reason
# 处理思考内容
if "reasoning_content" in delta:
reasoning_content = delta["reasoning_content"]
if reasoning_content:
reasoning_chunks += 1
debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符")
if should_show_thinking:
if not thinking_started:
in_thinking = True
thinking_started = True
sender('thinking_start', {})
await asyncio.sleep(0.05)
current_thinking += reasoning_content
sender('thinking_chunk', {'content': reasoning_content})
# 处理正常内容
if "content" in delta:
content = delta["content"]
if content:
content_chunks += 1
debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}")
# 通过文本内容提前检测工具调用意图
if not detected_tools:
# 检测常见的工具调用模式
tool_patterns = [
(r'(创建|新建|生成).*(文件|file)', 'create_file'),
(r'(读取|查看|打开).*(文件|file)', 'read_file'),
(r'(修改|编辑|更新).*(文件|file)', 'modify_file'),
(r'(删除|移除).*(文件|file)', 'delete_file'),
(r'(搜索|查找|search)', 'web_search'),
(r'(执行|运行).*(Python|python|代码)', 'run_python'),
(r'(执行|运行).*(命令|command)', 'run_command'),
(r'(等待|sleep|延迟)', 'sleep'),
(r'(聚焦|focus).*(文件|file)', 'focus_file'),
(r'(终端|terminal|会话|session)', 'terminal_session'),
]
for pattern, tool_name in tool_patterns:
if re.search(pattern, content, re.IGNORECASE):
early_tool_id = f"early_{tool_name}_{time.time()}"
if early_tool_id not in detected_tools:
sender('tool_hint', {
'id': early_tool_id,
'name': tool_name,
'message': f'检测到可能需要调用 {tool_name}...',
'confidence': 'low'
})
detected_tools[early_tool_id] = tool_name
debug_log(f" ⚡ 提前检测到工具意图: {tool_name}")
break
if in_thinking and not thinking_ended:
in_thinking = False
thinking_ended = True
sender('thinking_end', {'full_content': current_thinking})
await asyncio.sleep(0.1)
# ===== 增量保存:保存思考内容 =====
if current_thinking and not has_saved_thinking and is_first_iteration:
thinking_content = f"<think>\n{current_thinking}\n</think>"
web_terminal.context_manager.add_conversation("assistant", thinking_content)
has_saved_thinking = True
debug_log(f"💾 增量保存:思考内容 ({len(current_thinking)} 字符)")
expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None))
expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None))
if pending_modify:
if not pending_modify.get("start_seen"):
probe_buffer = pending_modify.get("probe_buffer", "") + content
if len(probe_buffer) > 10000:
probe_buffer = probe_buffer[-10000:]
marker = pending_modify.get("start_marker")
marker_index = probe_buffer.find(marker)
if marker_index == -1:
pending_modify["probe_buffer"] = probe_buffer
continue
after_marker = marker_index + len(marker)
remainder = probe_buffer[after_marker:]
pending_modify["buffer"] = remainder
pending_modify["raw_buffer"] = marker + remainder
pending_modify["start_seen"] = True
pending_modify["detected_blocks"] = set()
pending_modify["probe_buffer"] = ""
if pending_modify.get("display_id"):
sender('update_action', {
'id': pending_modify["display_id"],
'status': 'running',
'preparing_id': pending_modify.get("tool_call_id"),
'message': f"正在修改 {pending_modify['path']}..."
})
else:
pending_modify["buffer"] += content
pending_modify["raw_buffer"] += content
if pending_modify.get("start_seen"):
block_text = pending_modify["buffer"]
for match in re.finditer(r"\[replace:(\d+)\]", block_text):
try:
block_index = int(match.group(1))
except ValueError:
continue
detected_blocks = pending_modify.setdefault("detected_blocks", set())
if block_index not in detected_blocks:
detected_blocks.add(block_index)
if pending_modify.get("display_id"):
sender('update_action', {
'id': pending_modify["display_id"],
'status': 'running',
'preparing_id': pending_modify.get("tool_call_id"),
'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..."
})
if pending_modify.get("start_seen"):
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
if end_pos != -1:
pending_modify["end_index"] = end_pos
modify_break_triggered = True
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
break
continue
elif expecting_modify:
modify_probe_buffer += content
if len(modify_probe_buffer) > 10000:
modify_probe_buffer = modify_probe_buffer[-10000:]
marker_match = re.search(r"<<<MODIFY:\s*([\s\S]*?)>>>", modify_probe_buffer)
if marker_match:
detected_raw_path = marker_match.group(1)
detected_path = detected_raw_path.strip()
marker_full = marker_match.group(0)
after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full)
remainder = modify_probe_buffer[after_marker_index:]
modify_probe_buffer = ""
if not detected_path:
debug_log("检测到 MODIFY 起始标记但路径为空,忽略。")
continue
pending_modify = {
"path": detected_path,
"tool_call_id": None,
"buffer": remainder,
"raw_buffer": marker_full + remainder,
"start_marker": marker_full,
"end_marker": "<<<END_MODIFY>>>",
"start_seen": True,
"end_index": None,
"display_id": None,
"detected_blocks": set()
}
if hasattr(web_terminal, "pending_modify_request"):
web_terminal.pending_modify_request = {"path": detected_path}
debug_log(f"直接检测到modify起始标记构建修改缓冲: {detected_path}")
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
if end_pos != -1:
pending_modify["end_index"] = end_pos
modify_break_triggered = True
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
break
continue
if pending_append:
pending_append["buffer"] += content
if pending_append.get("content_start") is None:
marker_index = pending_append["buffer"].find(pending_append["start_marker"])
if marker_index != -1:
pending_append["content_start"] = marker_index + len(pending_append["start_marker"])
debug_log(f"检测到追加起始标识: {pending_append['start_marker']}")
if pending_append.get("content_start") is not None:
end_index = pending_append["buffer"].find(
pending_append["end_marker"],
pending_append["content_start"]
)
if end_index != -1:
pending_append["end_index"] = end_index
append_break_triggered = True
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
break
# 继续累积追加内容
continue
elif expecting_append:
append_probe_buffer += content
# 限制缓冲区大小防止过长
if len(append_probe_buffer) > 10000:
append_probe_buffer = append_probe_buffer[-10000:]
marker_match = re.search(r"<<<APPEND:\s*([\s\S]*?)>>>", append_probe_buffer)
if marker_match:
detected_raw_path = marker_match.group(1)
detected_path = detected_raw_path.strip()
if not detected_path:
append_probe_buffer = append_probe_buffer[marker_match.end():]
continue
marker_full = marker_match.group(0)
after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full)
remainder = append_probe_buffer[after_marker_index:]
append_probe_buffer = ""
pending_append = {
"path": detected_path,
"tool_call_id": None,
"buffer": remainder,
"start_marker": marker_full,
"end_marker": "<<<END_APPEND>>>",
"content_start": 0,
"end_index": None,
"display_id": None
}
if hasattr(web_terminal, "pending_append_request"):
web_terminal.pending_append_request = {"path": detected_path}
debug_log(f"直接检测到append起始标记构建追加缓冲: {detected_path}")
# 检查是否立即包含结束标记
if pending_append["buffer"]:
end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"])
if end_index != -1:
pending_append["end_index"] = end_index
append_break_triggered = True
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
break
continue
if not text_started:
text_started = True
text_streaming = True
sender('text_start', {})
await asyncio.sleep(0.05)
if not pending_append:
full_response += content
accumulated_response += content
text_has_content = True
for ch in content:
text_chunk_buffer.append(ch)
text_chunk_buffer_size += 1
await flush_text_buffer()
# 收集工具调用 - 实时发送准备状态
if "tool_calls" in delta:
tool_chunks += 1
for tc in delta["tool_calls"]:
found = False
for existing in tool_calls:
if existing.get("index") == tc.get("index"):
if "function" in tc and "arguments" in tc["function"]:
existing["function"]["arguments"] += tc["function"]["arguments"]
found = True
break
if not found and tc.get("id"):
tool_id = tc["id"]
tool_name = tc.get("function", {}).get("name", "")
# 新工具检测到,立即发送准备事件
if tool_id not in detected_tools and tool_name:
detected_tools[tool_id] = tool_name
# 立即发送工具准备中事件
sender('tool_preparing', {
'id': tool_id,
'name': tool_name,
'message': f'准备调用 {tool_name}...'
})
debug_log(f" 发送工具准备事件: {tool_name}")
await asyncio.sleep(0.1)
tool_calls.append({
"id": tool_id,
"index": tc.get("index"),
"type": "function",
"function": {
"name": tool_name,
"arguments": tc.get("function", {}).get("arguments", "")
}
})
debug_log(f" 新工具: {tool_name}")
# 检查是否被停止
client_stop_info = stop_flags.get(client_sid)
if client_stop_info:
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
if stop_requested:
debug_log("任务在流处理完成后检测到停止状态")
return
# 流结束后的处理
await flush_text_buffer(force=True)
debug_log(f"\n流结束统计:")
debug_log(f" 总chunks: {chunk_count}")
debug_log(f" 思考chunks: {reasoning_chunks}")
debug_log(f" 内容chunks: {content_chunks}")
debug_log(f" 工具chunks: {tool_chunks}")
debug_log(f" 收集到的思考: {len(current_thinking)} 字符")
debug_log(f" 收集到的正文: {len(full_response)} 字符")
debug_log(f" 收集到的工具: {len(tool_calls)}")
if not append_result["handled"] and pending_append:
append_result = await finalize_pending_append(full_response, True, finish_reason=last_finish_reason)
if not modify_result["handled"] and pending_modify:
modify_result = await finalize_pending_modify(full_response, True, finish_reason=last_finish_reason)
# 结束未完成的流
if in_thinking and not thinking_ended:
sender('thinking_end', {'full_content': current_thinking})
await asyncio.sleep(0.1)
# 保存思考内容
if current_thinking and not has_saved_thinking and is_first_iteration:
thinking_content = f"<think>\n{current_thinking}\n</think>"
web_terminal.context_manager.add_conversation("assistant", thinking_content)
has_saved_thinking = True
debug_log(f"💾 增量保存:延迟思考内容 ({len(current_thinking)} 字符)")
# 确保text_end事件被发送
if text_started and text_has_content and not append_result["handled"] and not modify_result["handled"]:
await flush_text_buffer(force=True)
debug_log(f"发送text_end事件完整内容长度: {len(full_response)}")
sender('text_end', {'full_content': full_response})
await asyncio.sleep(0.1)
text_streaming = False
# ===== 增量保存:保存当前轮次的文本内容 =====
if full_response.strip():
web_terminal.context_manager.add_conversation("assistant", full_response)
debug_log(f"💾 增量保存:文本内容 ({len(full_response)} 字符)")
if append_result["handled"]:
append_metadata = append_result.get("assistant_metadata")
append_content_text = append_result.get("assistant_content")
if append_content_text:
web_terminal.context_manager.add_conversation(
"assistant",
append_content_text,
metadata=append_metadata
)
debug_log("💾 增量保存:追加正文快照")
payload_info = append_metadata.get("append_payload") if append_metadata else {}
sender('append_payload', {
'path': payload_info.get("path") or append_result.get("path"),
'forced': payload_info.get("forced", False),
'lines': payload_info.get("lines"),
'bytes': payload_info.get("bytes"),
'tool_call_id': payload_info.get("tool_call_id") or append_result.get("tool_call_id"),
'success': payload_info.get("success", append_result.get("success", False))
})
if append_result["tool_content"]:
tool_call_id = append_result.get("tool_call_id") or f"append_{int(time.time() * 1000)}"
system_notice = format_tool_result_notice("append_to_file", tool_call_id, append_result["tool_content"])
web_terminal.context_manager.add_conversation("system", system_notice)
append_result["tool_call_id"] = tool_call_id
debug_log("💾 增量保存append_to_file 工具结果system 通知)")
finish_reason = append_result.get("finish_reason")
path_for_prompt = append_result.get("path")
need_follow_prompt = (
finish_reason == "length" or
append_result.get("forced") or
not append_result.get("success")
)
if need_follow_prompt and path_for_prompt:
prompt_lines = [
f"append_to_file 在处理 {path_for_prompt} 时未完成,需要重新发起写入。"
]
if finish_reason == "length":
prompt_lines.append(
"上一次输出达到系统单次输出上限,已写入的内容已保存。"
)
if append_result.get("forced"):
prompt_lines.append(
"收到的内容缺少 <<<END_APPEND>>> 标记,系统依据流式结束位置落盘。"
)
if not append_result.get("success"):
prompt_lines.append("系统未能识别有效的追加标记。")
prompt_lines.append(
"请再次调用 append_to_file 工具获取新的写入窗口,并在工具调用的输出中遵循以下格式:"
)
prompt_lines.append(f"<<<APPEND:{path_for_prompt}>>>")
prompt_lines.append("...填写剩余正文,如内容已完成可留空...")
prompt_lines.append("<<<END_APPEND>>>")
prompt_lines.append("不要在普通回复中粘贴上述标记,必须通过 append_to_file 工具发送。")
follow_prompt = "\n".join(prompt_lines)
messages.append({
"role": "system",
"content": follow_prompt
})
web_terminal.context_manager.add_conversation("system", follow_prompt)
debug_log("已注入追加任务提示")
if append_result["handled"] and not append_result.get("success"):
sender('system_message', {
'content': f'⚠️ 追加写入失败:{append_result.get("error")}'
})
# 重置文本流状态,避免后续错误处理
text_streaming = False
text_started = False
text_has_content = False
full_response = ""
if modify_result["handled"]:
modify_metadata = modify_result.get("assistant_metadata")
modify_content_text = modify_result.get("assistant_content")
if modify_content_text:
web_terminal.context_manager.add_conversation(
"assistant",
modify_content_text,
metadata=modify_metadata
)
debug_log("💾 增量保存:修改正文快照")
payload_info = modify_metadata.get("modify_payload") if modify_metadata else {}
sender('modify_payload', {
'path': payload_info.get("path") or modify_result.get("path"),
'total': payload_info.get("total_blocks") or modify_result.get("total_blocks"),
'completed': payload_info.get("completed") or modify_result.get("completed_blocks"),
'failed': payload_info.get("failed") or modify_result.get("failed_blocks"),
'forced': payload_info.get("forced", modify_result.get("forced", False)),
'success': modify_result.get("success", False)
})
if modify_result["tool_content"]:
tool_call_id = modify_result.get("tool_call_id") or f"modify_{int(time.time() * 1000)}"
system_notice = format_tool_result_notice("modify_file", tool_call_id, modify_result["tool_content"])
web_terminal.context_manager.add_conversation("system", system_notice)
modify_result["tool_call_id"] = tool_call_id
debug_log("💾 增量保存modify_file 工具结果system 通知)")
path_for_prompt = modify_result.get("path")
failed_blocks = modify_result.get("failed_blocks") or []
need_follow_prompt = modify_result.get("forced") or bool(failed_blocks)
if need_follow_prompt and path_for_prompt:
prompt_lines = [
f"modify_file 在处理 {path_for_prompt} 时未完成,需要重新发起补丁。"
]
if modify_result.get("forced"):
prompt_lines.append(
"刚才的内容缺少 <<<END_MODIFY>>> 标记,系统仅应用了已识别的部分。"
)
if failed_blocks:
failed_text = "".join(str(idx) for idx in failed_blocks)
prompt_lines.append(f"以下补丁未成功:第 {failed_text} 处。")
prompt_lines.append(
"请再次调用 modify_file 工具,并在新的工具调用中按以下模板提供完整补丁:"
)
prompt_lines.append(f"<<<MODIFY:{path_for_prompt}>>>")
prompt_lines.append("[replace:序号]")
prompt_lines.append("<<OLD>>")
prompt_lines.append("...原文(必须逐字匹配,包含全部缩进、空格和换行)...")
prompt_lines.append("<<END>>")
prompt_lines.append("<<NEW>>")
prompt_lines.append("...新内容,可留空表示清空,注意保持结构完整...")
prompt_lines.append("<<END>>")
prompt_lines.append("[/replace]")
prompt_lines.append("<<<END_MODIFY>>>")
prompt_lines.append("请勿在普通回复中直接粘贴补丁,必须通过 modify_file 工具发送。")
follow_prompt = "\n".join(prompt_lines)
messages.append({
"role": "system",
"content": follow_prompt
})
web_terminal.context_manager.add_conversation("system", follow_prompt)
debug_log("已注入修改任务提示")
if modify_result["handled"] and not modify_result.get("success"):
error_message = modify_result.get("summary_message") or modify_result.get("error") or "修改操作未成功,请根据提示重新执行。"
sender('system_message', {
'content': f'⚠️ 修改操作存在未完成的内容:{error_message}'
})
text_streaming = False
text_started = False
text_has_content = False
full_response = ""
# 保存思考内容(如果这是第一次迭代且有思考)
if web_terminal.thinking_mode and web_terminal.api_client.current_task_first_call and current_thinking:
web_terminal.api_client.current_task_thinking = current_thinking
web_terminal.api_client.current_task_first_call = False
# 检测是否有格式错误的工具调用
if not tool_calls and full_response and AUTO_FIX_TOOL_CALL and not append_result["handled"] and not modify_result["handled"]:
if detect_malformed_tool_call(full_response):
auto_fix_attempts += 1
if auto_fix_attempts <= AUTO_FIX_MAX_ATTEMPTS:
debug_log(f"检测到格式错误的工具调用,尝试自动修复 (尝试 {auto_fix_attempts}/{AUTO_FIX_MAX_ATTEMPTS})")
fix_message = "你使用了错误的格式输出工具调用。请使用正确的工具调用格式而不是直接输出JSON。根据当前进度继续执行任务。"
sender('system_message', {
'content': f'⚠️ 自动修复: {fix_message}'
})
messages.append({
"role": "user",
"content": fix_message
})
await asyncio.sleep(1)
continue
else:
debug_log(f"自动修复尝试已达上限 ({AUTO_FIX_MAX_ATTEMPTS})")
sender('system_message', {
'content': f'⌘ 工具调用格式错误,自动修复失败。请手动检查并重试。'
})
break
# 构建助手消息用于API继续对话
assistant_content_parts = []
if current_thinking:
assistant_content_parts.append(f"<think>\n{current_thinking}\n</think>")
if full_response:
assistant_content_parts.append(full_response)
elif append_result["handled"] and append_result["assistant_content"]:
assistant_content_parts.append(append_result["assistant_content"])
elif modify_result["handled"] and modify_result.get("assistant_content"):
assistant_content_parts.append(modify_result["assistant_content"])
assistant_content = "\n".join(assistant_content_parts) if assistant_content_parts else ""
# 添加到消息历史用于API继续对话不保存到文件
assistant_message = {
"role": "assistant",
"content": assistant_content,
"tool_calls": tool_calls
}
messages.append(assistant_message)
if append_result["handled"] and append_result.get("tool_content"):
tool_call_id = append_result.get("tool_call_id") or f"append_{int(time.time() * 1000)}"
system_notice = format_tool_result_notice("append_to_file", tool_call_id, append_result["tool_content"])
messages.append({
"role": "system",
"content": system_notice
})
append_result["tool_call_id"] = tool_call_id
debug_log("已将 append_to_file 工具结果以 system 形式追加到对话上下文")
if modify_result["handled"] and modify_result.get("tool_content"):
tool_call_id = modify_result.get("tool_call_id") or f"modify_{int(time.time() * 1000)}"
system_notice = format_tool_result_notice("modify_file", tool_call_id, modify_result["tool_content"])
messages.append({
"role": "system",
"content": system_notice
})
modify_result["tool_call_id"] = tool_call_id
debug_log("已将 modify_file 工具结果以 system 形式追加到对话上下文")
force_continue = append_result["handled"] or modify_result["handled"]
if force_continue:
if append_result["handled"]:
debug_log("append_to_file 已处理,继续下一轮以让模型返回确认回复")
elif modify_result["handled"]:
debug_log("modify_file 已处理,继续下一轮以让模型返回确认回复")
else:
debug_log("补丁处理完成,继续下一轮以获取模型回复")
continue
if not tool_calls:
debug_log("没有工具调用,结束迭代")
if not finish_called and not finish_prompt_sent:
reminder = build_finish_tool_reminder(getattr(web_terminal, "sub_agent_meta", {}))
web_terminal.context_manager.add_conversation("user", reminder)
messages.append({
"role": "user",
"content": reminder
})
sender('system_message', {
'content': reminder
})
finish_prompt_sent = True
await asyncio.sleep(1)
continue
break
# 检查连续相同工具调用
for tc in tool_calls:
tool_name = tc["function"]["name"]
if tool_name == last_tool_name:
consecutive_same_tool[tool_name] += 1
if consecutive_same_tool[tool_name] >= MAX_CONSECUTIVE_SAME_TOOL:
debug_log(f"警告: 连续调用相同工具 {tool_name} 已达 {MAX_CONSECUTIVE_SAME_TOOL}")
sender('system_message', {
'content': f'⚠️ 检测到重复调用 {tool_name} 工具 {MAX_CONSECUTIVE_SAME_TOOL} 次,可能存在循环。'
})
if consecutive_same_tool[tool_name] >= MAX_CONSECUTIVE_SAME_TOOL + 2:
debug_log(f"终止: 工具 {tool_name} 调用次数过多")
sender('system_message', {
'content': f'⌘ 工具 {tool_name} 重复调用过多,任务终止。'
})
break
else:
consecutive_same_tool.clear()
consecutive_same_tool[tool_name] = 1
last_tool_name = tool_name
# 更新统计
total_tool_calls += len(tool_calls)
# 执行每个工具
for tool_call in tool_calls:
# 检查停止标志
client_stop_info = stop_flags.get(client_sid)
if client_stop_info:
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
if stop_requested:
debug_log("在工具调用过程中检测到停止状态")
return
# 工具调用间隔控制
current_time = time.time()
if last_tool_call_time > 0:
elapsed = current_time - last_tool_call_time
if elapsed < TOOL_CALL_COOLDOWN:
await asyncio.sleep(TOOL_CALL_COOLDOWN - elapsed)
last_tool_call_time = time.time()
function_name = tool_call["function"]["name"]
if function_name == "finish_sub_agent":
finish_called = True
arguments_str = tool_call["function"]["arguments"]
tool_call_id = tool_call["id"]
debug_log(f"准备解析JSON工具: {function_name}, 参数长度: {len(arguments_str)}")
debug_log(f"JSON参数前200字符: {arguments_str[:200]}")
debug_log(f"JSON参数后200字符: {arguments_str[-200:]}")
# 使用改进的参数解析方法
if hasattr(web_terminal, 'api_client') and hasattr(web_terminal.api_client, '_safe_tool_arguments_parse'):
success, arguments, error_msg = web_terminal.api_client._safe_tool_arguments_parse(arguments_str, function_name)
if not success:
debug_log(f"安全解析失败: {error_msg}")
sender('error', {'message': f'工具参数解析失败: {error_msg}'})
continue
debug_log(f"使用安全解析成功,参数键: {list(arguments.keys())}")
else:
# 回退到带有基本修复逻辑的解析
try:
arguments = json.loads(arguments_str) if arguments_str.strip() else {}
debug_log(f"直接JSON解析成功参数键: {list(arguments.keys())}")
except json.JSONDecodeError as e:
debug_log(f"原始JSON解析失败: {e}")
# 尝试基本的JSON修复
repaired_str = arguments_str.strip()
repair_attempts = []
# 修复1: 未闭合字符串
if repaired_str.count('"') % 2 == 1:
repaired_str += '"'
repair_attempts.append("添加闭合引号")
# 修复2: 未闭合JSON对象
if repaired_str.startswith('{') and not repaired_str.rstrip().endswith('}'):
repaired_str = repaired_str.rstrip() + '}'
repair_attempts.append("添加闭合括号")
# 修复3: 截断的JSON移除不完整的最后一个键值对
if not repair_attempts: # 如果前面的修复都没用上
last_comma = repaired_str.rfind(',')
if last_comma > 0:
repaired_str = repaired_str[:last_comma] + '}'
repair_attempts.append("移除不完整的键值对")
# 尝试解析修复后的JSON
try:
arguments = json.loads(repaired_str)
debug_log(f"JSON修复成功: {', '.join(repair_attempts)}")
debug_log(f"修复后参数键: {list(arguments.keys())}")
except json.JSONDecodeError as repair_error:
debug_log(f"JSON修复也失败: {repair_error}")
debug_log(f"修复尝试: {repair_attempts}")
debug_log(f"修复后内容前100字符: {repaired_str[:100]}")
sender('error', {'message': f'工具参数解析失败: {e}'})
continue
debug_log(f"执行工具: {function_name} (ID: {tool_call_id})")
# 发送工具开始事件
tool_display_id = f"tool_{iteration}_{function_name}_{time.time()}"
sender('tool_start', {
'id': tool_display_id,
'name': function_name,
'arguments': arguments,
'preparing_id': tool_call_id
})
await asyncio.sleep(0.3)
start_time = time.time()
# 执行工具
tool_result = await web_terminal.handle_tool_call(function_name, arguments)
debug_log(f"工具结果: {tool_result[:200]}...")
execution_time = time.time() - start_time
if execution_time < 1.5:
await asyncio.sleep(1.5 - execution_time)
# 更新工具状态
result_data = {}
try:
result_data = json.loads(tool_result)
except:
result_data = {'output': tool_result}
action_status = 'completed'
action_message = None
awaiting_flag = False
if function_name == "append_to_file":
if result_data.get("success") and result_data.get("awaiting_content"):
append_path = result_data.get("path") or arguments.get("path")
pending_append = {
"path": append_path,
"tool_call_id": tool_call_id,
"buffer": "",
"start_marker": f"<<<APPEND:{append_path}>>>",
"end_marker": "<<<END_APPEND>>>",
"content_start": None,
"end_index": None,
"display_id": tool_display_id
}
append_probe_buffer = ""
awaiting_flag = True
action_status = 'running'
action_message = f"正在向 {append_path} 追加内容..."
text_started = False
text_streaming = False
text_has_content = False
debug_log(f"append_to_file 等待输出: {append_path}")
else:
debug_log("append_to_file 返回完成状态")
elif function_name == "modify_file":
if result_data.get("success") and result_data.get("awaiting_content"):
modify_path = result_data.get("path") or arguments.get("path")
pending_modify = {
"path": modify_path,
"tool_call_id": tool_call_id,
"buffer": "",
"raw_buffer": "",
"start_marker": f"<<<MODIFY:{modify_path}>>>",
"end_marker": "<<<END_MODIFY>>>",
"start_seen": False,
"end_index": None,
"display_id": tool_display_id,
"detected_blocks": set(),
"probe_buffer": ""
}
modify_probe_buffer = ""
if hasattr(web_terminal, "pending_modify_request"):
web_terminal.pending_modify_request = {"path": modify_path}
awaiting_flag = True
action_status = 'running'
action_message = f"正在修改 {modify_path}..."
text_started = False
text_streaming = False
text_has_content = False
debug_log(f"modify_file 等待输出: {modify_path}")
else:
debug_log("modify_file 返回完成状态")
if function_name == "wait_sub_agent":
system_msg = result_data.get("system_message")
if system_msg:
messages.append({
"role": "system",
"content": system_msg
})
sender('system_message', {
'content': system_msg,
'inline': False
})
update_payload = {
'id': tool_display_id,
'status': action_status,
'result': result_data,
'preparing_id': tool_call_id
}
if action_message:
update_payload['message'] = action_message
if awaiting_flag:
update_payload['awaiting_content'] = True
sender('update_action', update_payload)
# 更新UI状态
if function_name in ['focus_file', 'unfocus_file', 'modify_file']:
sender('focused_files_update', web_terminal.get_focused_files_info())
if function_name in ['create_file', 'delete_file', 'rename_file', 'create_folder']:
structure = web_terminal.context_manager.get_project_structure()
sender('file_tree_update', structure)
# ===== 增量保存:立即保存工具结果 =====
try:
result_data = json.loads(tool_result)
if function_name == "read_file":
tool_result_content = format_read_file_result(result_data)
else:
tool_result_content = tool_result
except:
tool_result_content = tool_result
# 立即保存工具结果
web_terminal.context_manager.add_conversation(
"tool",
tool_result_content,
tool_call_id=tool_call_id,
name=function_name
)
debug_log(f"💾 增量保存:工具结果 {function_name}")
system_message = result_data.get("system_message") if isinstance(result_data, dict) else None
if system_message:
web_terminal._record_sub_agent_message(system_message, result_data.get("task_id"), inline=False)
# 添加到消息历史用于API继续对话
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"name": function_name,
"content": tool_result_content
})
if function_name not in ['append_to_file', 'modify_file']:
await process_sub_agent_updates(messages, inline=True, after_tool_call_id=tool_call_id)
await asyncio.sleep(0.2)
# 标记不再是第一次迭代
is_first_iteration = False
# 最终统计
debug_log(f"\n{'='*40}")
debug_log(f"任务完成统计:")
debug_log(f" 总迭代次数: {total_iterations}")
debug_log(f" 总工具调用: {total_tool_calls}")
debug_log(f" 自动修复尝试: {auto_fix_attempts}")
debug_log(f" 累积响应: {len(accumulated_response)} 字符")
debug_log(f"{'='*40}\n")
# 发送完成事件
sender('task_complete', {
'total_iterations': total_iterations,
'total_tool_calls': total_tool_calls,
'auto_fix_attempts': auto_fix_attempts
})
@socketio.on('send_command')
def handle_command(data):
    """Handle a system command sent by the client on the 'send_command' event.

    Args:
        data: Event payload; the command text is read from its 'command' key.
            A leading '/' is optional, and only the first whitespace-separated
            word (case-insensitive) selects the command.

    Emits:
        'error' when no terminal is available for the requesting socket,
        otherwise one 'command_result' event describing the outcome.
        Recognised commands are "clear", "status" and "terminals"; anything
        else, including an empty command, is reported as a failure.
    """
    # Treat a malformed payload (non-dict, or non-string command) as an empty command.
    command = data.get('command', '') if isinstance(data, dict) else ''
    if not isinstance(command, str):
        command = ''

    _, terminal, _ = get_terminal_for_sid(request.sid)
    if not terminal:
        emit('error', {'message': 'System not initialized'})
        return

    if command.startswith('/'):
        command = command[1:]

    parts = command.split(maxsplit=1)
    if not parts:
        # "", "/" or whitespace-only input: split() yields no words, so there
        # is no command name to dispatch on (indexing parts[0] would raise).
        emit('command_result', {
            'command': '',
            'success': False,
            'message': '命令不能为空'
        })
        return
    cmd = parts[0].lower()

    if cmd == "clear":
        terminal.context_manager.conversation_history.clear()
        if terminal.thinking_mode:
            terminal.api_client.start_new_task()
        emit('command_result', {
            'command': cmd,
            'success': True,
            'message': '对话已清除'
        })
    elif cmd == "status":
        status = terminal.get_status()
        # Attach the terminal session listing when a terminal manager exists.
        if terminal.terminal_manager:
            terminal_status = terminal.terminal_manager.list_terminals()
            status['terminals'] = terminal_status
        emit('command_result', {
            'command': cmd,
            'success': True,
            'data': status
        })
    elif cmd == "terminals":
        # List terminal sessions.
        if terminal.terminal_manager:
            result = terminal.terminal_manager.list_terminals()
            emit('command_result', {
                'command': cmd,
                'success': True,
                'data': result
            })
        else:
            emit('command_result', {
                'command': cmd,
                'success': False,
                'message': '终端系统未初始化'
            })
    else:
        emit('command_result', {
            'command': cmd,
            'success': False,
            'message': f'未知命令: {cmd}'
        })
def initialize_system(path: str, thinking_mode: bool = False):
    """Reset the debug log, print the startup banner and store the default mode.

    Args:
        path: Project path; only written to the log and printed here.
        thinking_mode: Stored in ``app.config['DEFAULT_THINKING_MODE']``.
    """
    mode_label = '思考模式' if thinking_mode else '快速模式'
    auto_fix_label = '开启' if AUTO_FIX_TOOL_CALL else '关闭'

    # Truncate (or create) the debug log and write its header.
    DEBUG_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    with DEBUG_LOG_FILE.open('w', encoding='utf-8') as log_file:
        log_file.writelines([
            f"调试日志开始 - {datetime.now()}\n",
            f"项目路径: {path}\n",
            f"思考模式: {mode_label}\n",
            f"自动修复: {auto_fix_label}\n",
            f"最大迭代: {MAX_ITERATIONS_PER_TASK}\n",
            f"最大工具调用: {MAX_TOTAL_TOOL_CALLS}\n",
            "=" * 80 + "\n",
        ])

    run_mode = '思考模式(首次思考,后续快速)' if thinking_mode else '快速模式(无思考)'
    banner = (
        "[Init] 初始化Web系统...",
        f"[Init] 项目路径: {path}",
        f"[Init] 运行模式: {run_mode}",
        f"[Init] 自动修复: {auto_fix_label}",
        f"[Init] 调试日志: {DEBUG_LOG_FILE}",
    )
    for line in banner:
        print(line)

    app.config['DEFAULT_THINKING_MODE'] = thinking_mode
    print(f"{OUTPUT_FORMATS['success']} Web系统初始化完成多用户模式")
def update_sub_agent_task(task_id: str, **changes):
    """Apply field updates to a tracked sub-agent task and stamp the update time.

    Does nothing when ``task_id`` has no (non-empty) entry in
    ``sub_agent_tasks``.

    Args:
        task_id: Key of the task in ``sub_agent_tasks``.
        **changes: Fields to merge into the task record.
    """
    record = sub_agent_tasks.get(task_id)
    if record:
        record.update(changes)
        record["updated_at"] = time.time()
def mark_task_completed(task_id: str, reason: Optional[str] = None):
    """Record a task as completed, broadcast the status, then run cleanup.

    Args:
        task_id: Identifier of the sub-agent task.
        reason: Optional completion reason; a missing value is sent as "".
    """
    final_reason = reason or ""
    update_sub_agent_task(task_id, status="completed", completion_reason=final_reason)
    status_event = {
        "status": "completed",
        "reason": final_reason
    }
    broadcast_sub_agent_event(task_id, "sub_agent_status", status_event)
    cleanup_inactive_sub_agent_tasks()
def mark_task_failed(task_id: str, message: str):
    """Record a task as failed, broadcast the status, then run cleanup.

    Args:
        task_id: Identifier of the sub-agent task.
        message: Error text stored on the task and included in the broadcast.
    """
    update_sub_agent_task(task_id, status="failed", error=message)
    status_event = {
        "status": "failed",
        "message": message
    }
    broadcast_sub_agent_event(task_id, "sub_agent_status", status_event)
    cleanup_inactive_sub_agent_tasks()
def mark_task_timeout(task_id: str):
    """Mark a still-active task as timed out, broadcast it, then run cleanup.

    Unknown tasks and tasks already in a final state ("completed", "failed",
    "timeout") are left untouched.

    Args:
        task_id: Identifier of the sub-agent task.
    """
    final_states = {"completed", "failed", "timeout"}
    record = sub_agent_tasks.get(task_id)
    if not record:
        return
    if record.get("status") in final_states:
        return

    update_sub_agent_task(task_id, status="timeout")
    status_event = {
        "status": "timeout",
        "message": "任务已超时"
    }
    broadcast_sub_agent_event(task_id, "sub_agent_status", status_event)
    cleanup_inactive_sub_agent_tasks()
def start_sub_agent_monitor():
    """Start a daemon thread that times out overdue sub-agent tasks.

    Every 1.5 seconds the thread scans ``sub_agent_tasks`` and calls
    ``mark_task_timeout`` for each task that is not in a final state and whose
    age (measured from ``created_at``) exceeds its ``timeout_seconds``, or
    ``SUB_AGENT_DEFAULT_TIMEOUT`` when that field is missing or falsy.
    """
    final_states = {"completed", "failed", "timeout"}

    def _loop():
        while True:
            now = time.time()
            # Scan a snapshot so changes to sub_agent_tasks during the pass
            # cannot break the iteration.
            for task_id, task in list(sub_agent_tasks.items()):
                try:
                    if task.get("status") in final_states:
                        continue
                    timeout = task.get("timeout_seconds") or SUB_AGENT_DEFAULT_TIMEOUT
                    if now - task.get("created_at", now) > timeout:
                        mark_task_timeout(task_id)
                except Exception as exc:
                    # An uncaught exception would end this thread and silently
                    # stop timeout enforcement for every task; report it and
                    # keep monitoring the remaining tasks.
                    print(f"[SubAgentMonitor] 任务 {task_id} 超时检查异常: {exc}")
            time.sleep(1.5)

    threading.Thread(target=_loop, daemon=True).start()
def run_server(path: str, thinking_mode: bool = False, port: int = DEFAULT_PORT, debug: bool = False):
    """Initialise the system, start the sub-agent monitor and run the server.

    Args:
        path: Project path forwarded to ``initialize_system``.
        thinking_mode: Default thinking mode forwarded to ``initialize_system``.
        port: Port the Socket.IO server listens on.
        debug: Enables debug mode and the reloader together.
    """
    initialize_system(path, thinking_mode)
    start_sub_agent_monitor()

    server_options = {
        'host': '0.0.0.0',
        'port': port,
        'debug': debug,
        'use_reloader': debug,
        'allow_unsafe_werkzeug': True,
    }
    socketio.run(app, **server_options)
def parse_arguments():
    """Parse the command-line options of the web server.

    Returns:
        The ``argparse`` namespace with ``path``, ``port``, ``debug`` and
        ``thinking_mode`` attributes.
    """
    cli = argparse.ArgumentParser(description="AI Agent Web Server")

    # Each entry is (flag, keyword arguments for add_argument), in declaration order.
    option_specs = (
        ("--path", {
            "default": str(Path(DEFAULT_PROJECT_PATH).resolve()),
            "help": "项目工作目录(默认使用 config.DEFAULT_PROJECT_PATH",
        }),
        ("--port", {
            "type": int,
            "default": DEFAULT_PORT,
            "help": f"监听端口(默认 {DEFAULT_PORT}",
        }),
        ("--debug", {
            "action": "store_true",
            "help": "开发模式,启用 Flask/Socket.IO 热重载",
        }),
        ("--thinking-mode", {
            "action": "store_true",
            "help": "启用思考模式(首次请求使用 reasoning",
        }),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    return cli.parse_args()
# Script entry point: parse the CLI options and launch the server with them.
if __name__ == "__main__":
    args = parse_arguments()
    launch_options = {
        "path": args.path,
        "thinking_mode": args.thinking_mode,
        "port": args.port,
        "debug": args.debug,
    }
    run_server(**launch_options)