diff --git a/web_server.py b/web_server.py index c0f5ff6..0d9e8c6 100644 --- a/web_server.py +++ b/web_server.py @@ -5,6 +5,7 @@ import json import os import sys import re +import threading from typing import Dict, List, Optional, Callable, Any, Tuple from flask import Flask, request, jsonify, send_from_directory, session, redirect, send_file, abort from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect @@ -174,6 +175,11 @@ FAILED_LOGIN_LOCK_SECONDS = 300 SOCKET_TOKEN_TTL_SECONDS = 45 PROJECT_STORAGE_CACHE: Dict[str, Dict[str, Any]] = {} PROJECT_STORAGE_CACHE_TTL_SECONDS = float(os.environ.get("PROJECT_STORAGE_CACHE_TTL", "30")) +USER_IDLE_TIMEOUT_SECONDS = int(os.environ.get("USER_IDLE_TIMEOUT_SECONDS", "900")) +LAST_ACTIVE_FILE = Path(LOGS_DIR).expanduser().resolve() / "last_active.json" +_last_active_lock = threading.Lock() +_last_active_cache: Dict[str, float] = {} +_idle_reaper_started = False def sanitize_filename_preserve_unicode(filename: str) -> str: @@ -199,6 +205,85 @@ def sanitize_filename_preserve_unicode(filename: str) -> str: return cleaned[:255] +def _load_last_active_cache(): + """从持久化文件加载最近活跃时间,失败时保持空缓存。""" + try: + LAST_ACTIVE_FILE.parent.mkdir(parents=True, exist_ok=True) + if not LAST_ACTIVE_FILE.exists(): + return + data = json.loads(LAST_ACTIVE_FILE.read_text(encoding="utf-8")) + if isinstance(data, dict): + for user, ts in data.items(): + try: + _last_active_cache[user] = float(ts) + except (TypeError, ValueError): + continue + except Exception: + # 读取失败时忽略,避免影响启动 + pass + + +def _persist_last_active_cache(): + """原子写入最近活跃时间缓存。""" + try: + tmp = LAST_ACTIVE_FILE.with_suffix(".tmp") + tmp.write_text(json.dumps(_last_active_cache, ensure_ascii=False, indent=2), encoding="utf-8") + tmp.replace(LAST_ACTIVE_FILE) + except Exception: + # 写入失败不影响主流程,记录即可 + debug_log("[IdleReaper] 写入 last_active 文件失败") + + +def record_user_activity(username: Optional[str], ts: Optional[float] = None): + """记录用户最近活跃时间,刷新容器 handle 并持久化。""" + if not username: + return + now = ts or time.time() + with _last_active_lock: + _last_active_cache[username] = now + _persist_last_active_cache() + handle = container_manager.get_handle(username) + if handle: + handle.touch() + + +def get_last_active_ts(username: str, fallback: Optional[float] = None) -> Optional[float]: + """获取用户最近活跃时间,优先缓存,其次回退值。""" + with _last_active_lock: + cached = _last_active_cache.get(username) + if cached is not None: + return float(cached) + return float(fallback) if fallback is not None else None + + +def idle_reaper_loop(): + """后台轮询:长时间无消息则回收用户容器。""" + while True: + try: + now = time.time() + handle_map = container_manager.list_containers() + for username, handle in list(handle_map.items()): + last_ts = get_last_active_ts(username, handle.get("last_active")) + if not last_ts: + continue + if now - last_ts >= USER_IDLE_TIMEOUT_SECONDS: + debug_log(f"[IdleReaper] 回收容器: {username} (idle {int(now - last_ts)}s)") + container_manager.release_container(username, reason="idle_timeout") + time.sleep(60) + except Exception as exc: + debug_log(f"[IdleReaper] 后台循环异常: {exc}") + time.sleep(60) + + +def start_background_jobs(): + """启动一次性的后台任务(容器空闲回收)。""" + global _idle_reaper_started + if _idle_reaper_started: + return + _idle_reaper_started = True + _load_last_active_cache() + socketio.start_background_task(idle_reaper_loop) + def cache_monitor_snapshot(execution_id: Optional[str], stage: str, snapshot: Optional[Dict[str, Any]]): """缓存工具执行前/后的文件快照。""" if not execution_id or not snapshot or not snapshot.get('content'): @@ -970,6 +1055,7 @@ def login(): except RuntimeError as exc: session.clear() return jsonify({"success": False, "error": str(exc), "code": "resource_busy"}), 503 + record_user_activity(record.username) get_csrf_token(force_new=True) return jsonify({"success": True}) @@ -2156,6 +2242,7 @@ def handle_message(data): print(f"[WebSocket] 收到消息: {message}") debug_log(f"\n{'='*80}\n新任务开始: {message}\n{'='*80}") + record_user_activity(username) requested_conversation_id = data.get('conversation_id') try: @@ -4504,6 +4591,7 @@ def handle_command(data): if not terminal: emit('error', {'message': 'System not initialized'}) return + record_user_activity(username) if command.startswith('/'): command = command[1:] @@ -4773,7 +4861,8 @@ def collect_user_snapshots(handle_map: Dict[str, Dict[str, Any]]) -> Dict[str, A normalized_role = (record.role or "user").lower() role_counter[normalized_role] += 1 handle = handle_map.get(username) - last_active = handle.get("last_active") if handle else None + handle_last = handle.get("last_active") if handle else None + last_active = get_last_active_ts(username, handle_last) idle_seconds = max(0.0, now - last_active) if last_active else None items.append({ "username": username, @@ -4837,7 +4926,7 @@ def collect_container_snapshots(handle_map: Dict[str, Dict[str, Any]]) -> Dict[s state = status.get("state") or {} if status.get("mode") == "docker": docker_count += 1 - last_active = handle.get("last_active") + last_active = get_last_active_ts(username, handle.get("last_active")) idle_seconds = max(0.0, now - last_active) if last_active else None entry = { "username": username, @@ -5060,6 +5149,7 @@ def initialize_system(path: str, thinking_mode: bool = False): def run_server(path: str, thinking_mode: bool = False, port: int = DEFAULT_PORT, debug: bool = False): """运行Web服务器""" initialize_system(path, thinking_mode) + start_background_jobs() socketio.run( app, host='0.0.0.0',