agent-Specialization/modules/user_container_manager.py

325 lines
12 KiB
Python

"""Per-user Docker container manager for main agent."""
from __future__ import annotations
import hashlib
import json
import re
import shutil
import subprocess
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Optional
from config import (
MAX_ACTIVE_USER_CONTAINERS,
OUTPUT_FORMATS,
TERMINAL_SANDBOX_BIN,
TERMINAL_SANDBOX_BINDS,
TERMINAL_SANDBOX_CPUS,
TERMINAL_SANDBOX_ENV,
TERMINAL_SANDBOX_IMAGE,
TERMINAL_SANDBOX_MEMORY,
TERMINAL_SANDBOX_MODE,
TERMINAL_SANDBOX_MOUNT_PATH,
TERMINAL_SANDBOX_NAME_PREFIX,
TERMINAL_SANDBOX_NETWORK,
TERMINAL_SANDBOX_REQUIRE,
LOGS_DIR,
)
from modules.container_monitor import collect_stats, inspect_state
@dataclass
class ContainerHandle:
    """Bookkeeping record for one user's execution environment.

    Within this module handles are created with ``mode`` set to either
    ``"docker"`` (a started container) or ``"host"`` (no container).
    """

    username: str
    mode: str
    # Host-side directory of the user's workspace.
    workspace_path: str
    # Path at which the workspace is visible to commands (container path in
    # docker mode; equal to ``workspace_path`` for host handles).
    mount_path: str
    container_name: Optional[str] = None
    container_id: Optional[str] = None
    # Runtime executable used for this container; not exported by to_dict().
    sandbox_bin: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)

    def touch(self):
        """Record that the handle was just used."""
        now = time.time()
        self.last_active = now

    def to_dict(self) -> Dict:
        """Return a plain-dict snapshot of the handle, omitting ``sandbox_bin``."""
        exported = (
            "username",
            "mode",
            "workspace_path",
            "mount_path",
            "container_name",
            "container_id",
            "created_at",
            "last_active",
        )
        return {key: getattr(self, key) for key in exported}
class UserContainerManager:
    """Create and track long-lived containers for each logged-in user.

    Handles are keyed by normalized username (see ``_normalize_username``).
    In ``docker`` sandbox mode a detached container is started with the
    user's workspace bind-mounted at ``mount_path``; in any other mode, or
    when the runtime cannot be used and the sandbox is not required, a
    ``host`` handle is registered instead. Reads and writes of the handle
    registry (``self._containers``) happen while holding ``self._lock``.
    """

    # Upper bound in seconds for a ``<runtime> rm -f`` call. Removal can run
    # while ``self._lock`` is held, so a wedged daemon must not block forever.
    _KILL_TIMEOUT = 30

    def __init__(
        self,
        sandbox_mode: Optional[str] = None,
        max_containers: int = MAX_ACTIVE_USER_CONTAINERS,
    ):
        """Snapshot sandbox settings from ``config`` and prepare the stats log.

        Args:
            sandbox_mode: Overrides ``TERMINAL_SANDBOX_MODE``; lower-cased,
                defaulting to ``"host"`` when neither is set.
            max_containers: Registry size limit; ``<= 0`` means unlimited.
        """
        self.sandbox_mode = (sandbox_mode or TERMINAL_SANDBOX_MODE or "host").lower()
        self.max_containers = max_containers
        self.image = TERMINAL_SANDBOX_IMAGE
        self.mount_path = TERMINAL_SANDBOX_MOUNT_PATH or "/workspace"
        self.network = TERMINAL_SANDBOX_NETWORK
        self.cpus = TERMINAL_SANDBOX_CPUS
        self.memory = TERMINAL_SANDBOX_MEMORY
        self.binds = list(TERMINAL_SANDBOX_BINDS)
        self.sandbox_bin = TERMINAL_SANDBOX_BIN or "docker"
        self.name_prefix = TERMINAL_SANDBOX_NAME_PREFIX or "agent-user"
        self.require = bool(TERMINAL_SANDBOX_REQUIRE)
        self.extra_env = dict(TERMINAL_SANDBOX_ENV)
        self._containers: Dict[str, ContainerHandle] = {}
        self._lock = threading.Lock()
        self._stats_log_path = Path(LOGS_DIR).expanduser().resolve() / "container_stats.log"
        self._stats_log_path.parent.mkdir(parents=True, exist_ok=True)
        # Only create the file; an unconditional touch() would bump the mtime.
        if not self._stats_log_path.exists():
            self._stats_log_path.touch()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def ensure_container(self, username: str, workspace_path: str) -> ContainerHandle:
        """Return the registered handle for *username*, creating one if needed.

        Args:
            username: Login name; normalized (stripped, lower-cased) first.
            workspace_path: Host directory for the user; created if missing.

        Returns:
            The registered ``ContainerHandle`` (docker or host mode).

        Raises:
            RuntimeError: The container quota is exhausted, no image is
                configured, or the sandbox is required and could not start.
        """
        username = self._normalize_username(username)
        workspace = str(Path(workspace_path).expanduser().resolve())
        Path(workspace).mkdir(parents=True, exist_ok=True)
        with self._lock:
            handle = self._containers.get(username)
            if handle:
                if handle.mode == "docker" and not self._is_container_running(handle):
                    # The container is gone or stopped: drop the stale record,
                    # clean up, and fall through to start a fresh one.
                    self._containers.pop(username, None)
                    self._kill_container(handle.container_name, handle.sandbox_bin)
                    handle = None
                else:
                    # NOTE(review): for a live docker handle this only updates
                    # the record; the container keeps the bind mount it was
                    # started with. Confirm callers never switch workspaces.
                    handle.workspace_path = workspace
                    handle.touch()
                    return handle
            if not self._has_capacity(username):
                raise RuntimeError("资源繁忙:容器配额已用尽,请稍候再试。")
            # Start-up runs under the lock so concurrent callers cannot
            # exceed the quota between the capacity check and registration.
            handle = self._create_handle(username, workspace)
            self._containers[username] = handle
            return handle

    def release_container(self, username: str, reason: str = "logout"):
        """Unregister *username* and remove its container, if it has one.

        Args:
            username: Login name; normalized before lookup.
            reason: Free-form label included in the release message.
        """
        username = self._normalize_username(username)
        with self._lock:
            handle = self._containers.pop(username, None)
        if not handle:
            return
        if handle.mode == "docker" and handle.container_name:
            self._kill_container(handle.container_name, handle.sandbox_bin)
            print(f"{OUTPUT_FORMATS['info']} 容器已释放: {handle.container_name} ({reason})")

    def has_capacity(self, username: Optional[str] = None) -> bool:
        """Return True if *username* already has a handle or a new one fits.

        With no username, report whether any new handle fits. A
        ``max_containers`` of ``<= 0`` means unlimited.
        """
        username = self._normalize_username(username) if username else None
        with self._lock:
            if username and username in self._containers:
                return True
            if self.max_containers <= 0:
                return True
            return len(self._containers) < self.max_containers

    def get_handle(self, username: str) -> Optional[ContainerHandle]:
        """Return the handle for *username* (refreshing its activity) or None."""
        username = self._normalize_username(username)
        with self._lock:
            handle = self._containers.get(username)
            if handle:
                handle.touch()
            return handle

    def list_containers(self) -> Dict[str, Dict]:
        """Return ``{username: handle.to_dict()}`` for every registered handle."""
        with self._lock:
            return {user: handle.to_dict() for user, handle in self._containers.items()}

    def get_container_status(self, username: str, include_stats: bool = True) -> Dict:
        """Describe the handle for *username*.

        Unknown users get ``{"username": ..., "mode": "host"}``. For docker
        handles with *include_stats*, the results of ``collect_stats`` and
        ``inspect_state`` are added under ``"stats"``/``"state"`` when
        truthy, and the stats are also appended to the stats log.
        """
        username = self._normalize_username(username)
        with self._lock:
            handle = self._containers.get(username)
        if not handle:
            return {"username": username, "mode": "host"}
        info = {
            "username": username,
            "mode": handle.mode,
            "workspace_path": handle.workspace_path,
            "mount_path": handle.mount_path,
            "container_name": handle.container_name,
            "created_at": handle.created_at,
            "last_active": handle.last_active,
        }
        if handle.mode == "docker" and include_stats:
            stats = collect_stats(handle.container_name, handle.sandbox_bin)
            state = inspect_state(handle.container_name, handle.sandbox_bin)
            if stats:
                info["stats"] = stats
                self._log_stats(username, stats)
            if state:
                info["state"] = state
        return info

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _has_capacity(self, username: str) -> bool:
        """Quota check used while ``self._lock`` is already held."""
        if self.max_containers <= 0:
            return True
        existing = 1 if username in self._containers else 0
        return (len(self._containers) - existing) < self.max_containers

    def _create_handle(self, username: str, workspace: str) -> ContainerHandle:
        """Start a container for *username*, or fall back to a host handle.

        Raises:
            RuntimeError: No image is configured, or the runtime is missing or
                fails to start the container while ``self.require`` is set.
        """
        if self.sandbox_mode != "docker":
            return self._host_handle(username, workspace)
        docker_path = shutil.which(self.sandbox_bin or "docker")
        if not docker_path:
            message = f"未找到容器运行时 {self.sandbox_bin}"
            if self.require:
                raise RuntimeError(message)
            print(f"{OUTPUT_FORMATS['warning']} {message},回退到宿主机执行。")
            return self._host_handle(username, workspace)
        if not self.image:
            raise RuntimeError("TERMINAL_SANDBOX_IMAGE 未配置,无法启动容器。")
        container_name = self._build_container_name(username)
        # Remove any leftover container under this name. Names are unique per
        # username, so this cannot hit another user's container.
        self._kill_container(container_name, docker_path)
        cmd = self._build_run_command(docker_path, container_name, workspace)
        try:
            result = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                errors="replace",
                check=False,
            )
        except OSError as exc:
            message = str(exc) or "容器启动失败"
        else:
            if result.returncode == 0:
                container_id = result.stdout.strip() or None
                print(f"{OUTPUT_FORMATS['success']} 启动用户容器: {container_name} ({username})")
                return ContainerHandle(
                    username=username,
                    mode="docker",
                    workspace_path=workspace,
                    mount_path=self.mount_path,
                    container_name=container_name,
                    container_id=container_id,
                    sandbox_bin=docker_path,
                )
            message = result.stderr.strip() or result.stdout.strip() or "容器启动失败"
        # A failed run may still have created the container; remove it so a
        # fallback or a raise does not leave an orphan behind.
        self._kill_container(container_name, docker_path)
        if self.require:
            raise RuntimeError(message)
        print(f"{OUTPUT_FORMATS['warning']} {message},回退到宿主机。")
        return self._host_handle(username, workspace)

    def _build_run_command(self, docker_path: str, container_name: str, workspace: str) -> list:
        """Assemble the ``run -d`` argv for a user container."""
        cmd = [
            docker_path,
            "run",
            "-d",
            "--name",
            container_name,
            "-w",
            self.mount_path,
            "-v",
            f"{workspace}:{self.mount_path}",
        ]
        if self.network:
            cmd += ["--network", self.network]
        if self.cpus:
            cmd += ["--cpus", str(self.cpus)]
        if self.memory:
            cmd += ["--memory", str(self.memory)]
        for bind in self.binds:
            chunk = bind.strip()
            if chunk:
                cmd += ["-v", chunk]
        envs = {
            "PYTHONIOENCODING": "utf-8",
            "TERM": "xterm-256color",
        }
        # Configured variables override the defaults; None values are skipped.
        envs.update({k: v for k, v in self.extra_env.items() if v is not None})
        for key, value in envs.items():
            cmd += ["-e", f"{key}={value}"]
        cmd.append(self.image)
        # Idle foreground process keeps the detached container running.
        cmd += ["tail", "-f", "/dev/null"]
        return cmd

    def _host_handle(self, username: str, workspace: str) -> ContainerHandle:
        """Build a container-less handle whose mount path is the workspace itself."""
        return ContainerHandle(
            username=username,
            mode="host",
            workspace_path=workspace,
            mount_path=workspace,
        )

    def _kill_container(self, container_name: Optional[str], docker_bin: Optional[str]):
        """Force-remove *container_name* (best effort; failures only warn)."""
        if not container_name or not docker_bin:
            return
        try:
            subprocess.run(
                [docker_bin, "rm", "-f", container_name],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=self._KILL_TIMEOUT,
                check=False,
            )
        except (OSError, subprocess.SubprocessError) as exc:
            # Like _is_container_running: an unusable runtime binary or a hung
            # daemon must not propagate out of the ensure/release paths.
            print(f"{OUTPUT_FORMATS['warning']} 清理容器失败: {container_name} ({exc})")

    def _is_container_running(self, handle: ContainerHandle) -> bool:
        """Return whether *handle*'s container reports ``State.Running``.

        Non-docker or incomplete handles count as running; inspection
        errors or timeouts count as not running.
        """
        if handle.mode != "docker" or not handle.container_name or not handle.sandbox_bin:
            return True
        try:
            result = subprocess.run(
                [
                    handle.sandbox_bin,
                    "inspect",
                    "-f",
                    "{{.State.Running}}",
                    handle.container_name,
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                timeout=3,
                check=False,
            )
        except (OSError, subprocess.SubprocessError):
            return False
        return result.returncode == 0 and result.stdout.strip().lower() == "true"

    def _build_container_name(self, username: str) -> str:
        """Derive a container name that is unique per username.

        Usernames made only of ``[a-z0-9-]`` (no leading/trailing dash) map
        to ``<prefix>-<username>`` unchanged. Any other username is slugified
        lossily, so a short SHA-1 digest of it is appended after ``_``; plain
        slugs never contain ``_``, so the two forms cannot collide.
        """
        lowered = username.lower()
        slug = re.sub(r"[^a-z0-9\-]", "-", lowered).strip("-")
        if slug and slug == lowered:
            return f"{self.name_prefix}-{slug}"
        digest = hashlib.sha1(lowered.encode("utf-8")).hexdigest()[:10]
        return f"{self.name_prefix}-{slug or 'user'}_{digest}"

    def _log_stats(self, username: str, stats: Dict):
        """Append one JSON line describing *stats* to the stats log (best effort)."""
        record = {
            "username": username,
            "timestamp": time.time(),
            "stats": stats,
        }
        try:
            line = json.dumps(record, ensure_ascii=False)
            with self._stats_log_path.open('a', encoding='utf-8') as fh:
                fh.write(line + "\n")
        except (OSError, TypeError, ValueError):
            # Diagnostics only: an unwritable log or a non-JSON-serializable
            # payload must not break status queries.
            pass

    @staticmethod
    def _normalize_username(username: Optional[str]) -> str:
        """Strip and lower-case *username*; None becomes an empty string."""
        return (username or "").strip().lower()