From 1eaa9461a1b6bb62896e5b7b9ccde7c9752f604b Mon Sep 17 00:00:00 2001 From: JOJO <1498581755@qq.com> Date: Wed, 25 Feb 2026 17:21:27 +0800 Subject: [PATCH] feat: enrich host workspace info --- utils/context_manager.py | 180 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 179 insertions(+), 1 deletion(-) diff --git a/utils/context_manager.py b/utils/context_manager.py index 513efb3..594bd01 100644 --- a/utils/context_manager.py +++ b/utils/context_manager.py @@ -5,6 +5,9 @@ import json import base64 import mimetypes import io +import platform +import shutil +import subprocess from copy import deepcopy from typing import Dict, List, Optional, Any from pathlib import Path @@ -62,6 +65,7 @@ class ContextManager: # 对话元数据与项目快照缓存 self.conversation_metadata: Dict[str, Any] = {} self.project_snapshot: Optional[Dict[str, Any]] = None + self._host_runtime_cache: Optional[Dict[str, str]] = None # 新增:对话持久化管理器 self.conversation_manager = ConversationManager(base_dir=self.data_dir) @@ -78,6 +82,180 @@ class ContextManager: """是否处于宿主机模式且未启用安全保护。""" return (TERMINAL_SANDBOX_MODE or "").lower() == "host" and not LINUX_SAFETY + # =========================================== + # 运行环境信息 + # =========================================== + + def _run_command(self, cmd: List[str], *, timeout: float = 1.5, cwd: Optional[Path] = None) -> str: + """运行命令并返回标准输出 / Run command and return stdout.""" + try: + completed = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + cwd=str(cwd) if cwd else None, + ) + except (OSError, subprocess.TimeoutExpired): + return "" + if completed.returncode != 0: + return "" + return (completed.stdout or "").strip() + + def _read_first_line(self, path: Path) -> str: + """读取文件首行并去除空白 / Read first line and strip.""" + try: + with path.open("r", encoding="utf-8", errors="ignore") as fh: + return fh.readline().strip() + except OSError: + return "" + + def _read_os_release_pretty(self) -> str: + """读取 Linux 发行版信息 / Read Linux distro from os-release.""" + path = Path("/etc/os-release") + if not path.exists(): + return "" + try: + content = path.read_text(encoding="utf-8", errors="ignore") + except OSError: + return "" + for line in content.splitlines(): + if line.startswith("PRETTY_NAME="): + value = line.split("=", 1)[1].strip().strip('"') + return value + return "" + + def _get_os_description(self) -> str: + """获取 OS 描述 / Get OS description.""" + system = platform.system() + if system == "Darwin": + version = platform.mac_ver()[0] or platform.release() + return f"macOS {version}".strip() + if system == "Windows": + release, version, _csd, _ptype = platform.win32_ver() + if release and version and version not in release: + return f"Windows {release} ({version})".strip() + return f"Windows {release or version or platform.release()}".strip() + if system == "Linux": + pretty = self._read_os_release_pretty() + if pretty: + return f"Linux {pretty}".strip() + version = platform.release() or platform.version() + return f"Linux {version}".strip() + version = platform.release() or platform.version() + name = system or "Unknown" + return f"{name} {version}".strip() + + def _parse_wmic_model(self, output: str) -> str: + """解析 WMIC 输出 / Parse WMIC output.""" + if not output: + return "" + lines = [line.strip() for line in output.splitlines() if line.strip()] + for line in lines: + if line.lower() == "model": + continue + return line + return "" + + def _get_device_model(self) -> str: + """获取设备型号 / Get device model.""" + system = platform.system() + if system == "Darwin": + return self._run_command(["sysctl", "-n", "hw.model"]) + if system == "Windows": + model = self._run_command( + ["powershell", "-NoProfile", "-Command", "(Get-CimInstance -ClassName Win32_ComputerSystem).Model"] + ) + if model: + return model + output = self._run_command(["wmic", "computersystem", "get", "model"]) + return self._parse_wmic_model(output) + if system == "Linux": + product = self._read_first_line(Path("/sys/devices/virtual/dmi/id/product_name")) + vendor = self._read_first_line(Path("/sys/devices/virtual/dmi/id/sys_vendor")) + if vendor and product and vendor not in product: + return f"{vendor} {product}".strip() + return product or vendor + return "" + + def _get_python_info(self) -> str: + """获取 Python 版本与可用命令 / Get Python version and commands.""" + version = platform.python_version() + commands: List[str] = [] + if shutil.which("python"): + commands.append("python") + if shutil.which("python3"): + commands.append("python3") + if commands: + return f"{version} ({', '.join(commands)})" + return f"{version} (未在PATH)" + + def _get_node_info(self) -> str: + """获取 Node 版本信息 / Get Node version info.""" + node_cmd = None + if shutil.which("node"): + node_cmd = "node" + elif shutil.which("nodejs"): + node_cmd = "nodejs" + if not node_cmd: + return "nodejs 未安装" + version = self._run_command([node_cmd, "-v"]) + if version: + if node_cmd == "nodejs": + return f"{version} (nodejs)" + return version + return f"{node_cmd} 可用" + + def _get_git_info(self) -> str: + """获取 Git 分支与状态 / Get git branch and status.""" + if not shutil.which("git"): + return "无git环境" + cwd = self.project_path if self.project_path.exists() else None + if not cwd: + return "未初始化" + inside = self._run_command(["git", "rev-parse", "--is-inside-work-tree"], cwd=cwd) + if inside.strip() != "true": + return "未初始化" + branch = self._run_command(["git", "rev-parse", "--abbrev-ref", "HEAD"], cwd=cwd) + if not branch or branch == "HEAD": + sha = self._run_command(["git", "rev-parse", "--short", "HEAD"], cwd=cwd) + branch = f"detached@{sha}" if sha else "detached" + status = self._run_command(["git", "status", "--porcelain"], cwd=cwd) + dirty = bool(status.strip()) + return f"{branch} ({'dirty' if dirty else 'clean'})" + + def _get_host_runtime_cache(self) -> Dict[str, str]: + """获取宿主机固定信息 / Get cached host info.""" + if self._host_runtime_cache: + return self._host_runtime_cache + os_desc = self._get_os_description() or "unknown" + arch = platform.machine() or platform.processor() or "unknown" + model = self._get_device_model() or "unknown" + python_info = self._get_python_info() or "unknown" + node_info = self._get_node_info() or "unknown" + git_info = self._get_git_info() or "unknown" + self._host_runtime_cache = { + "os": os_desc, + "arch": arch, + "model": model, + "python": python_info, + "node": node_info, + "git": git_info, + } + return self._host_runtime_cache + + def _build_host_runtime_environment(self) -> str: + """构建宿主机运行环境提示 / Build host runtime environment text.""" + base = self._get_host_runtime_cache() + lines = [ + "宿主机模式", + f" OS: {base.get('os', 'unknown')} | Arch: {base.get('arch', 'unknown')} | Model: {base.get('model', 'unknown')}", + f" Python: {base.get('python', 'unknown')}", + f" Node: {base.get('node', 'unknown')}", + f" Git: {base.get('git', 'unknown')}", + ] + return "\n".join(lines) + # =========================================== # Token 累计文件工具 # =========================================== @@ -1482,7 +1660,7 @@ class ContextManager: is_host = self._is_host_mode_without_safety() runtime_environment = ( - "宿主机模式" + self._build_host_runtime_environment() if is_host else f"隔离容器中(挂载目录 {self.container_mount_path or '/workspace'}),宿主机路径已隐藏" )