feat: reclaim idle user containers

This commit is contained in:
JOJO 2025-12-16 23:15:21 +08:00
parent 385c8154ea
commit dcafcdd206

View File

@ -5,6 +5,7 @@ import json
import os import os
import sys import sys
import re import re
import threading
from typing import Dict, List, Optional, Callable, Any, Tuple from typing import Dict, List, Optional, Callable, Any, Tuple
from flask import Flask, request, jsonify, send_from_directory, session, redirect, send_file, abort from flask import Flask, request, jsonify, send_from_directory, session, redirect, send_file, abort
from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect
@ -174,6 +175,11 @@ FAILED_LOGIN_LOCK_SECONDS = 300
SOCKET_TOKEN_TTL_SECONDS = 45 SOCKET_TOKEN_TTL_SECONDS = 45
PROJECT_STORAGE_CACHE: Dict[str, Dict[str, Any]] = {} PROJECT_STORAGE_CACHE: Dict[str, Dict[str, Any]] = {}
PROJECT_STORAGE_CACHE_TTL_SECONDS = float(os.environ.get("PROJECT_STORAGE_CACHE_TTL", "30")) PROJECT_STORAGE_CACHE_TTL_SECONDS = float(os.environ.get("PROJECT_STORAGE_CACHE_TTL", "30"))
USER_IDLE_TIMEOUT_SECONDS = int(os.environ.get("USER_IDLE_TIMEOUT_SECONDS", "900"))
LAST_ACTIVE_FILE = Path(LOGS_DIR).expanduser().resolve() / "last_active.json"
_last_active_lock = threading.Lock()
_last_active_cache: Dict[str, float] = {}
_idle_reaper_started = False
def sanitize_filename_preserve_unicode(filename: str) -> str: def sanitize_filename_preserve_unicode(filename: str) -> str:
@ -199,6 +205,85 @@ def sanitize_filename_preserve_unicode(filename: str) -> str:
return cleaned[:255] return cleaned[:255]
def _load_last_active_cache():
"""从持久化文件加载最近活跃时间,失败时保持空缓存。"""
try:
LAST_ACTIVE_FILE.parent.mkdir(parents=True, exist_ok=True)
if not LAST_ACTIVE_FILE.exists():
return
data = json.loads(LAST_ACTIVE_FILE.read_text(encoding="utf-8"))
if isinstance(data, dict):
for user, ts in data.items():
try:
_last_active_cache[user] = float(ts)
except (TypeError, ValueError):
continue
except Exception:
# 读取失败时忽略,避免影响启动
pass
def _persist_last_active_cache():
"""原子写入最近活跃时间缓存。"""
try:
tmp = LAST_ACTIVE_FILE.with_suffix(".tmp")
tmp.write_text(json.dumps(_last_active_cache, ensure_ascii=False, indent=2), encoding="utf-8")
tmp.replace(LAST_ACTIVE_FILE)
except Exception:
# 写入失败不影响主流程,记录即可
debug_log("[IdleReaper] 写入 last_active 文件失败")
def record_user_activity(username: Optional[str], ts: Optional[float] = None):
"""记录用户最近活跃时间,刷新容器 handle 并持久化。"""
if not username:
return
now = ts or time.time()
with _last_active_lock:
_last_active_cache[username] = now
_persist_last_active_cache()
handle = container_manager.get_handle(username)
if handle:
handle.touch()
def get_last_active_ts(username: str, fallback: Optional[float] = None) -> Optional[float]:
"""获取用户最近活跃时间,优先缓存,其次回退值。"""
with _last_active_lock:
cached = _last_active_cache.get(username)
if cached is not None:
return float(cached)
return float(fallback) if fallback is not None else None
def idle_reaper_loop():
"""后台轮询:长时间无消息则回收用户容器。"""
while True:
try:
now = time.time()
handle_map = container_manager.list_containers()
for username, handle in list(handle_map.items()):
last_ts = get_last_active_ts(username, handle.get("last_active"))
if not last_ts:
continue
if now - last_ts >= USER_IDLE_TIMEOUT_SECONDS:
debug_log(f"[IdleReaper] 回收容器: {username} (idle {int(now - last_ts)}s)")
container_manager.release_container(username, reason="idle_timeout")
time.sleep(60)
except Exception as exc:
debug_log(f"[IdleReaper] 后台循环异常: {exc}")
time.sleep(60)
def start_background_jobs():
"""启动一次性的后台任务(容器空闲回收)。"""
global _idle_reaper_started
if _idle_reaper_started:
return
_idle_reaper_started = True
_load_last_active_cache()
socketio.start_background_task(idle_reaper_loop)
def cache_monitor_snapshot(execution_id: Optional[str], stage: str, snapshot: Optional[Dict[str, Any]]): def cache_monitor_snapshot(execution_id: Optional[str], stage: str, snapshot: Optional[Dict[str, Any]]):
"""缓存工具执行前/后的文件快照。""" """缓存工具执行前/后的文件快照。"""
if not execution_id or not snapshot or not snapshot.get('content'): if not execution_id or not snapshot or not snapshot.get('content'):
@ -970,6 +1055,7 @@ def login():
except RuntimeError as exc: except RuntimeError as exc:
session.clear() session.clear()
return jsonify({"success": False, "error": str(exc), "code": "resource_busy"}), 503 return jsonify({"success": False, "error": str(exc), "code": "resource_busy"}), 503
record_user_activity(record.username)
get_csrf_token(force_new=True) get_csrf_token(force_new=True)
return jsonify({"success": True}) return jsonify({"success": True})
@ -2156,6 +2242,7 @@ def handle_message(data):
print(f"[WebSocket] 收到消息: {message}") print(f"[WebSocket] 收到消息: {message}")
debug_log(f"\n{'='*80}\n新任务开始: {message}\n{'='*80}") debug_log(f"\n{'='*80}\n新任务开始: {message}\n{'='*80}")
record_user_activity(username)
requested_conversation_id = data.get('conversation_id') requested_conversation_id = data.get('conversation_id')
try: try:
@ -4504,6 +4591,7 @@ def handle_command(data):
if not terminal: if not terminal:
emit('error', {'message': 'System not initialized'}) emit('error', {'message': 'System not initialized'})
return return
record_user_activity(username)
if command.startswith('/'): if command.startswith('/'):
command = command[1:] command = command[1:]
@ -4773,7 +4861,8 @@ def collect_user_snapshots(handle_map: Dict[str, Dict[str, Any]]) -> Dict[str, A
normalized_role = (record.role or "user").lower() normalized_role = (record.role or "user").lower()
role_counter[normalized_role] += 1 role_counter[normalized_role] += 1
handle = handle_map.get(username) handle = handle_map.get(username)
last_active = handle.get("last_active") if handle else None handle_last = handle.get("last_active") if handle else None
last_active = get_last_active_ts(username, handle_last)
idle_seconds = max(0.0, now - last_active) if last_active else None idle_seconds = max(0.0, now - last_active) if last_active else None
items.append({ items.append({
"username": username, "username": username,
@ -4837,7 +4926,7 @@ def collect_container_snapshots(handle_map: Dict[str, Dict[str, Any]]) -> Dict[s
state = status.get("state") or {} state = status.get("state") or {}
if status.get("mode") == "docker": if status.get("mode") == "docker":
docker_count += 1 docker_count += 1
last_active = handle.get("last_active") last_active = get_last_active_ts(username, handle.get("last_active"))
idle_seconds = max(0.0, now - last_active) if last_active else None idle_seconds = max(0.0, now - last_active) if last_active else None
entry = { entry = {
"username": username, "username": username,
@ -5060,6 +5149,7 @@ def initialize_system(path: str, thinking_mode: bool = False):
def run_server(path: str, thinking_mode: bool = False, port: int = DEFAULT_PORT, debug: bool = False): def run_server(path: str, thinking_mode: bool = False, port: int = DEFAULT_PORT, debug: bool = False):
"""运行Web服务器""" """运行Web服务器"""
initialize_system(path, thinking_mode) initialize_system(path, thinking_mode)
start_background_jobs()
socketio.run( socketio.run(
app, app,
host='0.0.0.0', host='0.0.0.0',