agent-Specialization/server/chat_flow.py

238 lines
8.0 KiB
Python

from __future__ import annotations
import sys
import os
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import asyncio
import json
import time
import re
import zipfile
from collections import defaultdict, Counter, deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple
from flask import Blueprint, request, jsonify, session
from werkzeug.utils import secure_filename
from config import (
OUTPUT_FORMATS,
AUTO_FIX_TOOL_CALL,
AUTO_FIX_MAX_ATTEMPTS,
MAX_ITERATIONS_PER_TASK,
MAX_CONSECUTIVE_SAME_TOOL,
MAX_TOTAL_TOOL_CALLS,
TOOL_CALL_COOLDOWN,
MAX_UPLOAD_SIZE,
DEFAULT_CONVERSATIONS_LIMIT,
MAX_CONVERSATIONS_LIMIT,
CONVERSATIONS_DIR,
DEFAULT_RESPONSE_MAX_TOKENS,
DEFAULT_PROJECT_PATH,
LOGS_DIR,
AGENT_VERSION,
THINKING_FAST_INTERVAL,
PROJECT_MAX_STORAGE_MB,
PROJECT_MAX_STORAGE_BYTES,
UPLOAD_SCAN_LOG_SUBDIR,
)
from modules.personalization_manager import (
load_personalization_config,
save_personalization_config,
THINKING_INTERVAL_MIN,
THINKING_INTERVAL_MAX,
)
from modules.upload_security import UploadSecurityError
from modules.user_manager import UserWorkspace
from modules.usage_tracker import QUOTA_DEFAULTS
from core.web_terminal import WebTerminal
from utils.tool_result_formatter import format_tool_result_for_context
from utils.conversation_manager import ConversationManager
from config.model_profiles import get_model_context_window, get_model_profile
from .auth_helpers import api_login_required, resolve_admin_policy, get_current_user_record, get_current_username
from .context import with_terminal, get_gui_manager, get_upload_guard, build_upload_error_response, ensure_conversation_loaded, reset_system_state, get_user_resources, get_or_create_usage_tracker
from .utils_common import (
build_review_lines,
debug_log,
log_backend_chunk,
log_frontend_chunk,
log_streaming_debug_entry,
brief_log,
DEBUG_LOG_FILE,
CHUNK_BACKEND_LOG_FILE,
CHUNK_FRONTEND_LOG_FILE,
STREAMING_DEBUG_LOG_FILE,
)
from .security import rate_limited, format_tool_result_notice, compact_web_search_result, consume_socket_token, prune_socket_tokens, validate_csrf_request, requires_csrf_protection, get_csrf_token
from .monitor import cache_monitor_snapshot, get_cached_monitor_snapshot
from .extensions import socketio
from .state import (
MONITOR_FILE_TOOLS,
MONITOR_MEMORY_TOOLS,
MONITOR_SNAPSHOT_CHAR_LIMIT,
MONITOR_MEMORY_ENTRY_LIMIT,
RATE_LIMIT_BUCKETS,
FAILURE_TRACKERS,
pending_socket_tokens,
usage_trackers,
MONITOR_SNAPSHOT_CACHE,
MONITOR_SNAPSHOT_CACHE_LIMIT,
PROJECT_STORAGE_CACHE,
PROJECT_STORAGE_CACHE_TTL_SECONDS,
RECENT_UPLOAD_EVENT_LIMIT,
RECENT_UPLOAD_FEED_LIMIT,
THINKING_FAILURE_KEYWORDS,
TITLE_PROMPT_PATH,
get_last_active_ts,
user_manager,
container_manager,
custom_tool_registry,
user_terminals,
terminal_rooms,
connection_users,
stop_flags,
get_stop_flag,
set_stop_flag,
clear_stop_flag,
)
from .chat_flow_helpers import (
detect_malformed_tool_call as _detect_malformed_tool_call,
detect_tool_failure,
get_thinking_state,
mark_force_thinking as _mark_force_thinking,
mark_suppress_thinking,
apply_thinking_schedule as _apply_thinking_schedule,
update_thinking_after_call as _update_thinking_after_call,
maybe_mark_failure_from_message as _maybe_mark_failure_from_message,
generate_conversation_title_background as _generate_conversation_title_background,
)
from .chat_flow_runner import handle_task_with_sender
# Flask blueprint for this module, created under the name 'conversation'.
conversation_bp = Blueprint('conversation', __name__)
def generate_conversation_title_background(web_terminal: WebTerminal, conversation_id: str, user_message: str, username: str):
    """Generate a conversation title in the background, update the index and push it to the frontend.

    Thin wrapper: the caller's arguments are forwarded unchanged to the shared
    helper, together with this module's Socket.IO instance, the title prompt
    path and the debug logger.

    Returns:
        Whatever the underlying helper returns.
    """
    helper_kwargs = {
        "web_terminal": web_terminal,
        "conversation_id": conversation_id,
        "user_message": user_message,
        "username": username,
        # Module-level collaborators injected on behalf of the caller.
        "socketio_instance": socketio,
        "title_prompt_path": TITLE_PROMPT_PATH,
        "debug_logger": debug_log,
    }
    return _generate_conversation_title_background(**helper_kwargs)
def mark_force_thinking(terminal: WebTerminal, reason: str = ""):
    """Forward to the shared ``mark_force_thinking`` helper with this module's debug logger.

    Args:
        terminal: Terminal passed through to the helper.
        reason: Optional free-text reason passed through to the helper.

    Returns:
        Whatever the underlying helper returns.
    """
    outcome = _mark_force_thinking(
        terminal,
        reason=reason,
        debug_logger=debug_log,
    )
    return outcome
def apply_thinking_schedule(terminal: WebTerminal):
    """Forward to the shared ``apply_thinking_schedule`` helper.

    ``THINKING_FAST_INTERVAL`` is supplied as the default interval and
    ``debug_log`` as the debug logger.

    Returns:
        Whatever the underlying helper returns.
    """
    outcome = _apply_thinking_schedule(
        terminal,
        default_interval=THINKING_FAST_INTERVAL,
        debug_logger=debug_log,
    )
    return outcome
def update_thinking_after_call(terminal: WebTerminal):
    """Forward to the shared ``update_thinking_after_call`` helper with this module's debug logger.

    Returns:
        Whatever the underlying helper returns.
    """
    outcome = _update_thinking_after_call(
        terminal,
        debug_logger=debug_log,
    )
    return outcome
def maybe_mark_failure_from_message(terminal: WebTerminal, content: Optional[str]):
    """Forward to the shared ``maybe_mark_failure_from_message`` helper.

    ``THINKING_FAILURE_KEYWORDS`` is supplied as the failure keywords and
    ``debug_log`` as the debug logger.

    Args:
        terminal: Terminal passed through to the helper.
        content: Message text to inspect; may be ``None``.

    Returns:
        Whatever the underlying helper returns.
    """
    injected = {
        "failure_keywords": THINKING_FAILURE_KEYWORDS,
        "debug_logger": debug_log,
    }
    return _maybe_mark_failure_from_message(terminal, content, **injected)
def detect_malformed_tool_call(text):
    """Forward ``text`` to the shared ``detect_malformed_tool_call`` helper and return its result."""
    verdict = _detect_malformed_tool_call(text)
    return verdict
def process_message_task(terminal: WebTerminal, message: str, images, sender, client_sid, workspace: UserWorkspace, username: str, videos=None):
    """Process one chat message task in the background on a private asyncio event loop.

    A new event loop is created, ``handle_task_with_sender`` is scheduled on it
    as a task, and the task is published in the stop-flag entry for
    ``(client_sid, username)`` before the loop is run to completion.

    Args:
        terminal: Terminal handed to ``handle_task_with_sender`` and stored in
            the stop-flag entry.
        message: User message forwarded to the task.
        images: Image payload forwarded to the task.
        sender: Callable invoked as ``sender(event_name, payload)`` to emit
            ``task_stopped``, ``error`` and ``task_complete`` events.
        client_sid: Client session id; with ``username`` it keys the stop flag.
        workspace: User workspace forwarded to the task.
        username: Name of the user owning the task.
        videos: Optional video payload; a falsy value is replaced by ``[]``.

    Returns:
        None. Any ``Exception`` is caught and reported through ``sender``
        rather than propagated.
    """
    videos = videos or []
    # Bound up front so the cleanup in ``finally`` can tell whether a loop was
    # ever created.
    loop = None
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        # Create a cancellable task; it is published in the stop-flag entry below.
        task = loop.create_task(
            handle_task_with_sender(terminal, workspace, message, images, sender, client_sid, username, videos)
        )

        entry = get_stop_flag(client_sid, username)
        if not isinstance(entry, dict):
            entry = {'stop': False, 'task': None, 'terminal': None}
        entry['stop'] = False
        entry['task'] = task
        entry['terminal'] = terminal
        set_stop_flag(client_sid, username, entry)

        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            debug_log(f"任务 {client_sid} 被成功取消")
            sender('task_stopped', {
                'message': '任务已停止',
                'reason': 'user_requested'
            })
            # Reset terminal state after a stop.
            reset_system_state(terminal)
    except Exception as e:
        # On error, make sure the conversation state is not lost.
        try:
            if terminal and terminal.context_manager:
                # Best-effort save of the current conversation state.
                terminal.context_manager.auto_save_conversation()
                debug_log("错误恢复:对话状态已保存")
        except Exception as save_error:
            debug_log(f"错误恢复:保存对话状态失败: {save_error}")

        # Report the failure on stdout, in the debug log and to the client.
        print(f"[Task] 错误: {e}")
        debug_log(f"任务处理错误: {e}")
        import traceback
        traceback.print_exc()
        sender('error', {
            'message': str(e),
            'conversation_id': getattr(getattr(terminal, "context_manager", None), "current_conversation_id", None),
            'task_id': getattr(terminal, "task_id", None) or client_sid,
            'client_sid': client_sid
        })
        sender('task_complete', {
            'total_iterations': 0,
            'total_tool_calls': 0,
            'auto_fix_attempts': 0,
            'error': str(e)
        })
    finally:
        try:
            # Drop the task reference held in the stop-flag entry.
            clear_stop_flag(client_sid, username)
        finally:
            # Close the loop on every path (success, cancellation, failure);
            # previously it was left open whenever the task raised.
            if loop is not None:
                loop.close()
# === Unified public entry points ===
def start_chat_task(terminal, message: str, images: Any, sender, client_sid: str, workspace, username: str, videos: Any = None):
    """Start a chat task as a Socket.IO background task; called from Socket event handlers.

    All arguments are handed, in order, to ``process_message_task``.

    Returns:
        Whatever ``socketio.start_background_task`` returns.
    """
    task_args = (
        terminal,
        message,
        images,
        sender,
        client_sid,
        workspace,
        username,
        videos,
    )
    return socketio.start_background_task(process_message_task, *task_args)
def run_chat_task_sync(terminal, message: str, images: Any, sender, client_sid: str, workspace, username: str, videos: Any = None):
    """Run a chat task synchronously in the calling thread (for tests / CLI use).

    Returns:
        Whatever ``process_message_task`` returns.
    """
    outcome = process_message_task(
        terminal,
        message,
        images,
        sender,
        client_sid,
        workspace,
        username,
        videos,
    )
    return outcome