agent-Specialization/modules/terminal_ops.py

779 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# modules/terminal_ops.py - Terminal operations module (fixes Python command detection)
import os
import sys
import asyncio
import subprocess
import shutil
import time
import signal
import re
from pathlib import Path
from typing import Dict, Optional, Tuple, TYPE_CHECKING
from types import SimpleNamespace
try:
from config import (
CODE_EXECUTION_TIMEOUT,
TERMINAL_COMMAND_TIMEOUT,
FORBIDDEN_COMMANDS,
OUTPUT_FORMATS,
MAX_RUN_COMMAND_CHARS,
TOOLBOX_TERMINAL_IDLE_SECONDS,
)
except ImportError:
project_root = Path(__file__).resolve().parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import (
CODE_EXECUTION_TIMEOUT,
TERMINAL_COMMAND_TIMEOUT,
FORBIDDEN_COMMANDS,
OUTPUT_FORMATS,
MAX_RUN_COMMAND_CHARS,
TOOLBOX_TERMINAL_IDLE_SECONDS,
)
from modules.toolbox_container import ToolboxContainer
if TYPE_CHECKING:
from modules.user_container_manager import ContainerHandle
from modules.terminal_manager import TerminalManager
class TerminalOperator:
def __init__(self, project_path: str, container_session: Optional["ContainerHandle"] = None):
    """Bind the operator to a project directory and probe the Python runtime.

    Args:
        project_path: Project root; stored as a resolved absolute path.
        container_session: Optional container handle that commands should
            be executed in.
    """
    # Plain state first; the interpreter probing below does not read it.
    self.process = None
    self._toolbox: Optional[ToolboxContainer] = None
    self.container_session: Optional["ContainerHandle"] = container_session
    # Kept so CLI flows can reuse the container of the live terminal.
    self._terminal_manager: Optional["TerminalManager"] = None

    resolved_root = Path(project_path).resolve()
    self.project_path = resolved_root

    # Must exist before detection runs: _detect_python_runtime() overwrites
    # it with the virtualenv variables when a preinstalled venv is found.
    self._python_env: Dict[str, str] = {}
    detected_python = self._detect_python_runtime()
    self.python_cmd = detected_python

    # Interpreter used inside Docker containers; defaults to the
    # preinstalled venv and can be overridden via CONTAINER_PYTHON_CMD.
    self.container_python_cmd = os.environ.get(
        "CONTAINER_PYTHON_CMD",
        "/opt/agent-venv/bin/python3",
    )
    print(f"{OUTPUT_FORMATS['info']} 检测到Python命令: {self.python_cmd}")
def _reset_toolbox(self):
"""强制关闭并重建工具终端,保证每次命令/脚本运行独立环境。"""
if self._toolbox:
try:
self._toolbox.shutdown()
except Exception:
pass
self._toolbox = None
def _detect_python_runtime(self) -> str:
"""
自动检测可用的 Python 命令,优先使用预装虚拟环境。
1) 优先查找 AGENT_TOOLBOX_VENV /opt/agent-venv / 当前进程 VIRTUAL_ENV。
2) 找不到预装虚拟环境时,回退到系统可执行文件。
"""
preferred = self._detect_preinstalled_python()
if preferred:
# 记录虚拟环境相关环境变量,供宿主机子进程复用
self._python_env = self._build_python_env(preferred)
return preferred
return self._detect_system_python()
@staticmethod
def _derive_pip_from_python(python_path: str) -> str:
"""
根据 python 可执行文件推导匹配的 pip可避免“python 在 venv、pip 却指向系统”。
若同目录下找不到 pip3/pip则回退为 `<python> -m pip`。
"""
try:
bin_dir = Path(python_path).resolve().parent
for name in ("pip3", "pip"):
cand = bin_dir / name
if cand.exists() and os.access(cand, os.X_OK):
return str(cand)
except Exception:
pass
return f"{python_path} -m pip"
def attach_terminal_manager(self, manager: Optional["TerminalManager"]):
    """Store the TerminalManager supplied by the hosting terminal.

    The reference is consulted later to find the active terminal's
    container so that one-off commands can share it.
    """
    self._terminal_manager = manager
def _resolve_active_container_session(self) -> Optional[SimpleNamespace]:
"""
若已存在活动终端且在容器内运行,返回一个临时的容器句柄,
以便 run_command/run_python 复用同一个容器环境。
"""
manager = self._terminal_manager
if not manager:
return None
active_name = getattr(manager, "active_terminal", None)
if not active_name:
return None
terminal = manager.terminals.get(active_name) if getattr(manager, "terminals", None) else None
if not terminal or not getattr(terminal, "using_container", False):
return None
container_name = getattr(terminal, "sandbox_container_name", None)
if not container_name:
return None
try:
mount_path = (terminal.sandbox_options.get("mount_path") or "/workspace").rstrip("/") or "/workspace"
except Exception:
mount_path = "/workspace"
return SimpleNamespace(mode="docker", container_name=container_name, mount_path=mount_path)
def _will_use_container(self, session_override: Optional["ContainerHandle"]) -> bool:
"""根据会话/回退策略判断此次执行是否在容器中进行。"""
if session_override:
return getattr(session_override, "mode", None) == "docker"
if self.container_session:
return getattr(self.container_session, "mode", None) == "docker"
# 未绑定容器会话时会使用工具箱容器(同样是 Docker
return True
def _detect_preinstalled_python(self) -> Optional[str]:
"""尝试定位预装虚拟环境的 python 可执行文件。"""
candidates = []
# 环境变量优先Dockerfile 已设置 AGENT_TOOLBOX_VENV=/opt/agent-venv
env_venv = os.environ.get("AGENT_TOOLBOX_VENV")
if env_venv:
candidates.append(env_venv)
# 默认预装路径
candidates.append("/opt/agent-venv")
# 当前进程若已激活虚拟环境,也纳入候选
current_venv = os.environ.get("VIRTUAL_ENV")
if current_venv:
candidates.append(current_venv)
# 去重并依次检查
seen = set()
for raw in candidates:
if not raw or raw in seen:
continue
seen.add(raw)
root = Path(raw).expanduser()
bin_dir = root / ("Scripts" if sys.platform == "win32" else "bin")
for name in ("python3", "python"):
candidate = bin_dir / name
if candidate.exists() and os.access(candidate, os.X_OK):
return str(candidate.resolve())
return None
def _detect_system_python(self) -> str:
"""在系统 PATH 中探测可用的 Python3 可执行文件。"""
commands_to_try = []
if sys.platform == "win32":
commands_to_try = ["python", "py", "python3"]
else:
commands_to_try = ["python3", "python"]
for cmd in commands_to_try:
if shutil.which(cmd):
try:
result = subprocess.run(
[cmd, "--version"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
output = result.stdout + result.stderr
if "Python 3" in output or "Python 2" not in output:
return cmd
except Exception:
continue
return "python" if sys.platform == "win32" else "python3"
def _build_python_env(self, python_path: str) -> Dict[str, str]:
"""构造与预装虚拟环境匹配的环境变量(仅作用于宿主机子进程)。"""
env: Dict[str, str] = {}
try:
py_path = Path(python_path).resolve()
bin_dir = py_path.parent
venv_dir = bin_dir.parent
env["VIRTUAL_ENV"] = str(venv_dir)
current_path = os.environ.get("PATH", "")
# 避免重复添加
prefix = str(bin_dir)
if current_path.startswith(prefix + os.pathsep) or current_path == prefix:
env["PATH"] = current_path
else:
env["PATH"] = f"{prefix}{os.pathsep}{current_path}" if current_path else prefix
except Exception:
pass
return env
def _get_toolbox(self) -> ToolboxContainer:
    """Return the cached toolbox container, creating it on first use."""
    if self._toolbox is not None:
        return self._toolbox
    created = ToolboxContainer(
        project_path=str(self.project_path),
        idle_timeout=TOOLBOX_TERMINAL_IDLE_SECONDS,
        container_session=self.container_session,
    )
    self._toolbox = created
    return self._toolbox
def set_container_session(self, session: Optional["ContainerHandle"]):
    """Rebind the container session and forward it to a live toolbox.

    Passing the very same object that is already bound is a no-op.
    """
    if self.container_session is not session:
        self.container_session = session
        toolbox = self._toolbox
        if toolbox:
            toolbox.set_container_session(session)
def _validate_command(self, command: str) -> Tuple[bool, str]:
    """Check a command against the forbidden list and risky patterns.

    Matching is a plain substring test on the lower-cased command.

    Returns:
        ``(True, "")`` when nothing matched, otherwise ``(False, reason)``
        naming the first entry that matched.
    """
    lowered = command.lower()

    blocked = next((item for item in FORBIDDEN_COMMANDS if item in lowered), None)
    if blocked is not None:
        return False, f"禁止执行的命令: {blocked}"

    risky_fragments = (
        "sudo",
        "chmod 777",
        "rm -rf",
        "> /dev/",
        "fork bomb",
    )
    risky = next((frag for frag in risky_fragments if frag in lowered), None)
    if risky is not None:
        return False, f"检测到危险命令模式: {risky}"

    return True, ""
@staticmethod
def _clamp_timeout(requested: Optional[int], default: int, max_limit: int) -> int:
"""对timeout进行默认化与上限夹紧。"""
if not requested or requested <= 0:
return default
return min(int(requested), max_limit)
async def run_command(
    self,
    command: str,
    working_dir: str = None,
    timeout: int = None
) -> Dict:
    """Run a shell command in the bound/active container or the toolbox.

    Stand-alone ``python``/``python3`` and ``pip``/``pip3`` command words are
    redirected to the interpreter of the target environment before the
    command is validated and executed.

    Args:
        command: Shell command line to execute.
        working_dir: Directory relative to the project root; must stay
            inside the project.
        timeout: Required, positive number of seconds; capped at
            ``TERMINAL_COMMAND_TIMEOUT``.

    Returns:
        A result dict. It always carries ``success``; executed commands also
        carry ``output``, ``return_code``, ``status``, ``timeout`` and
        ``elapsed_ms``. Successful output longer than
        ``MAX_RUN_COMMAND_CHARS`` is replaced by an error dict.
    """
    if timeout is None or timeout <= 0:
        return {
            "success": False,
            "error": "timeout 参数必填且需大于0",
            "status": "error",
            "output": "timeout 参数缺失",
            "return_code": -1
        }

    # Drop any toolbox left over from a previous run so executions stay isolated.
    self._reset_toolbox()

    # Without a bound session, try to reuse the active terminal's container
    # so the command sees the same environment as the interactive terminal.
    session_override = None
    if not self.container_session:
        session_override = self._resolve_active_container_session()

    execution_in_container = self._will_use_container(session_override)
    python_rewrite = self.container_python_cmd if execution_in_container else self.python_cmd
    pip_rewrite = self._derive_pip_from_python(python_rewrite)

    def _swap_interpreter(match):
        """Map one matched command word to its environment-specific form."""
        word = match.group(0)
        if word.startswith("python"):
            return python_rewrite
        # `python -m pip` already runs pip of the rewritten interpreter;
        # replacing the module name would produce `python -m <path>`.
        if re.search(r"(?:^|\s)-m\s+$", match.string[:match.start()]):
            return word
        return pip_rewrite

    # Single pass over the ORIGINAL text, so inserted replacements are never
    # re-scanned. The look-arounds skip paths (/usr/bin/python3), versioned
    # names (python3.11) and hyphenated names (python-dateutil, pip-tools).
    # Using a callable also keeps backslashes in Windows paths literal.
    command = re.sub(
        r"(?<![\w./-])(?:python3?|pip3?)(?![\w./-])",
        _swap_interpreter,
        command,
    )

    valid, error = self._validate_command(command)
    if not valid:
        return {
            "success": False,
            "error": error,
            "output": "",
            "return_code": -1
        }

    try:
        work_path = self._resolve_work_path(working_dir)
    except ValueError:
        return {
            "success": False,
            "error": "工作目录必须在项目文件夹内",
            "output": "",
            "return_code": -1
        }

    # timeout is known to be positive here, so this only applies the cap.
    timeout = self._clamp_timeout(timeout, default=10, max_limit=TERMINAL_COMMAND_TIMEOUT)

    print(f"{OUTPUT_FORMATS['terminal']} 执行命令: {command}")
    print(f"{OUTPUT_FORMATS['info']} 工作目录: {work_path}")

    start_ts = time.time()
    if self.container_session or session_override:
        # Bound session or active-terminal container: one-shot subprocess.
        result_payload = await self._run_command_subprocess(
            command,
            work_path,
            timeout,
            session_override=session_override
        )
    else:
        # No user container bound: run inside the toolbox container.
        toolbox = self._get_toolbox()
        payload = await toolbox.run(command, work_path, timeout)
        result_payload = self._format_toolbox_output(payload)

    result_payload["elapsed_ms"] = int((time.time() - start_ts) * 1000)
    result_payload["timeout"] = timeout
    # Toolbox payloads may omit a status; fill it in without overriding one.
    result_payload.setdefault("status", "completed" if result_payload.get("success") else "error")

    # Refuse to hand back oversized output from a successful command.
    if result_payload.get("success") and "output" in result_payload:
        char_count = len(result_payload["output"])
        if char_count > MAX_RUN_COMMAND_CHARS:
            return {
                "success": False,
                "error": f"结果内容过大,有{char_count}字符请使用限制字符数的获取内容方式根据程度选择10k以内的数",
                "char_count": char_count,
                "limit": MAX_RUN_COMMAND_CHARS,
                "command": command
            }
    return result_payload
def _resolve_work_path(self, working_dir: Optional[str]) -> Path:
if working_dir:
work_path = (self.project_path / working_dir).resolve()
work_path.relative_to(self.project_path)
return work_path
return self.project_path
def _format_toolbox_output(self, payload: Dict) -> Dict:
success = bool(payload.get("success"))
output_text = payload.get("output", "") or ""
# 去掉常见的交互式shell警告
noisy_lines = (
"bash: cannot set terminal process group",
"bash: no job control in this shell",
)
filtered = []
for line in output_text.splitlines():
if any(noise in line for noise in noisy_lines):
continue
filtered.append(line)
output_text = "\n".join(filtered)
result = {
"success": success,
"output": output_text,
"return_code": 0 if success else -1
}
if not success:
result["error"] = payload.get("error") or payload.get("message", "命令执行失败")
if "message" in payload:
result["message"] = payload["message"]
if "status" in payload:
result["status"] = payload["status"]
if "truncated" in payload:
result["truncated"] = payload["truncated"]
return result
async def _run_command_subprocess(
    self,
    command: str,
    work_path: Path,
    timeout: int,
    session_override: Optional["ContainerHandle"] = None
) -> Dict:
    """Run *command* as a one-shot subprocess and collect its output.

    With a docker-mode session (the override or the bound one) the command
    is wrapped in ``docker exec … /bin/bash -lc``; otherwise it is run
    through the host shell in *work_path*.

    Args:
        command: Shell command line to run.
        work_path: Host working directory; mapped below the session's mount
            path for container runs.
        timeout: Seconds to wait before the process group is interrupted.
        session_override: Container handle that takes precedence over
            ``self.container_session``.

    Returns:
        A result dict with ``success``, ``status`` (``completed``,
        ``timeout`` or ``error``), ``output`` (stdout then stderr, keeping
        only the last ``MAX_RUN_COMMAND_CHARS`` characters), ``return_code``,
        ``truncated``, ``timeout`` and ``elapsed_ms``. Any exception is
        reported as an error dict instead of being raised.
    """
    start_ts = time.time()
    try:
        process = None
        exec_cmd = None
        use_shell = True
        session = session_override or self.container_session

        # A docker-mode session means the command runs inside that container.
        if session and getattr(session, "mode", None) == "docker":
            container_name = getattr(session, "container_name", None)
            mount_path = getattr(session, "mount_path", "/workspace") or "/workspace"
            docker_bin = shutil.which("docker") or "docker"
            try:
                relative = work_path.relative_to(self.project_path).as_posix()
            except ValueError:
                relative = ""
            container_workdir = mount_path.rstrip("/")
            if relative:
                container_workdir = f"{container_workdir}/{relative}"
            exec_cmd = [
                docker_bin,
                "exec",
                "-e",
                "PATH=/opt/agent-venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
                "-e",
                "VIRTUAL_ENV=/opt/agent-venv",
                "-w",
                container_workdir,
                container_name,
                "/bin/bash",
                "-lc",
                command,
            ]
            use_shell = False

        # Shared environment for the child; unbuffered so Python output is
        # not held back in the interpreter.
        env = os.environ.copy()
        env.setdefault("PYTHONUNBUFFERED", "1")
        if self._python_env:
            env.update(self._python_env)

        if use_shell:
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(work_path),
                shell=True,
                env=env,
                start_new_session=True,
            )
        else:
            process = await asyncio.create_subprocess_exec(
                *exec_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env,
                start_new_session=True,
            )

        stdout_buf: list[bytes] = []
        stderr_buf: list[bytes] = []

        async def _read_stream(stream, collector):
            """Drain *stream* into *collector* until EOF."""
            # Fixed-size reads instead of line iteration: iterating a
            # StreamReader goes through readline(), which raises ValueError
            # and discards buffered data once a single line exceeds the
            # reader's limit (64 KiB by default). That used to stop the
            # drain silently and could stall the child on a full pipe.
            try:
                while True:
                    chunk = await stream.read(65536)
                    if not chunk:
                        break
                    collector.append(chunk)
            except asyncio.CancelledError:
                pass
            except Exception:
                pass

        stdout_task = asyncio.create_task(_read_stream(process.stdout, stdout_buf))
        stderr_task = asyncio.create_task(_read_stream(process.stderr, stderr_buf))

        timed_out = False
        try:
            await asyncio.wait_for(process.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            timed_out = True
            # Interrupt the whole process group first, then escalate.
            try:
                os.killpg(process.pid, signal.SIGINT)
            except Exception:
                pass
            try:
                await asyncio.wait_for(process.wait(), timeout=2)
            except asyncio.TimeoutError:
                try:
                    os.killpg(process.pid, signal.SIGKILL)
                except Exception:
                    process.kill()
                await process.wait()

        # Make sure both reader coroutines have finished.
        await asyncio.gather(stdout_task, stderr_task, return_exceptions=True)

        # Safety net: pick up anything still buffered after the readers ended.
        try:
            remaining_out = await process.stdout.read()
            if remaining_out:
                stdout_buf.append(remaining_out)
        except Exception:
            pass
        try:
            remaining_err = await process.stderr.read()
            if remaining_err:
                stderr_buf.append(remaining_err)
        except Exception:
            pass

        stdout = b"".join(stdout_buf)
        stderr = b"".join(stderr_buf)
        stdout_text = stdout.decode('utf-8', errors='replace') if stdout else ""
        stderr_text = stderr.decode('utf-8', errors='replace') if stderr else ""

        success = (process.returncode == 0) and not timed_out
        status = "completed" if success else ("timeout" if timed_out else "error")

        output_parts = []
        if stdout_text:
            output_parts.append(stdout_text)
        if stderr_text:
            output_parts.append(stderr_text)
        combined_output = "\n".join(output_parts)

        # Keep the tail of the output when it exceeds the configured limit.
        truncated = False
        if MAX_RUN_COMMAND_CHARS and len(combined_output) > MAX_RUN_COMMAND_CHARS:
            truncated = True
            combined_output = combined_output[-MAX_RUN_COMMAND_CHARS:]

        response = {
            "success": success,
            "status": status,
            "command": command,
            "output": combined_output,
            "return_code": process.returncode,
            "truncated": truncated,
            "timeout": timeout,
            "elapsed_ms": int((time.time() - start_ts) * 1000)
        }
        if not success and timed_out:
            response["message"] = f"命令执行超时 ({timeout}秒)"
        elif not success and process.returncode is not None:
            response["message"] = f"命令执行失败 (返回码: {process.returncode})"
        if stderr_text:
            response["stderr"] = stderr_text
        return response
    except Exception as exc:
        return {
            "success": False,
            "status": "error",
            "error": f"执行失败: {str(exc)}",
            "output": "",
            "return_code": -1,
            "timeout": timeout,
            "elapsed_ms": int((time.time() - start_ts) * 1000)
        }
async def run_python_code(
    self,
    code: str,
    timeout: int = None
) -> Dict:
    """Execute a Python snippet through a temporary file in the project.

    Args:
        code: Python source to run.
        timeout: Required, positive number of seconds; capped at
            ``CODE_EXECUTION_TIMEOUT``.

    Returns:
        The ``run_command`` result with the source added under ``code``, or
        an error dict when *timeout* is missing or not positive.
    """
    timeout_missing = timeout is None or timeout <= 0
    if timeout_missing:
        return {
            "success": False,
            "error": "timeout 参数必填且需大于0",
            "status": "error",
            "output": "timeout 参数缺失",
            "return_code": -1
        }

    timeout = self._clamp_timeout(timeout, default=timeout, max_limit=CODE_EXECUTION_TIMEOUT)

    # Force a fresh toolbox so output of an earlier run cannot leak in.
    self._reset_toolbox()

    script_path = self.project_path / ".temp_code.py"
    try:
        script_path.write_text(code, encoding='utf-8')
        print(f"{OUTPUT_FORMATS['code']} 执行Python代码")

        try:
            script_ref = script_path.relative_to(self.project_path).as_posix()
        except ValueError:
            script_ref = script_path.as_posix()

        # "python3" is a placeholder that run_command maps to the
        # interpreter of the actual execution environment.
        outcome = await self.run_command(
            f'python3 -u "{script_ref}"',
            timeout=timeout
        )
        outcome["code"] = code
        return outcome
    finally:
        # Always remove the temporary script again.
        if script_path.exists():
            script_path.unlink()
async def run_python_file(
    self,
    file_path: str,
    args: str = "",
    timeout: int = None
) -> Dict:
    """Run an existing ``.py`` file from inside the project.

    Args:
        file_path: Path of the script, relative to the project root.
        args: Extra command-line text appended verbatim after the script.
        timeout: Seconds allowed for the run (validated by ``run_command``).

    Returns:
        The ``run_command`` result, or an error dict when the file is
        missing, is not a ``.py`` file, or lies outside the project.
    """

    def _rejected(reason: str) -> Dict:
        """Build the uniform error result for a failed precondition."""
        return {
            "success": False,
            "error": reason,
            "output": "",
            "return_code": -1
        }

    script = (self.project_path / file_path).resolve()

    if not script.exists():
        return _rejected("文件不存在")
    if script.suffix != '.py':
        return _rejected("不是Python文件")
    try:
        inside_project = script.relative_to(self.project_path)
    except ValueError:
        return _rejected("文件必须在项目文件夹内")

    print(f"{OUTPUT_FORMATS['code']} 执行Python文件: {file_path}")

    # "python3" is a placeholder that run_command maps to the interpreter
    # of the actual execution environment.
    pieces = [f'python3 "{inside_project.as_posix()}"']
    if args:
        pieces.append(f" {args}")
    command = "".join(pieces)

    return await self.run_command(command, timeout=timeout)
async def install_package(self, package: str) -> Dict:
    """Install one or more Python packages with pip.

    The requirement text is split into shell words and every word is quoted
    before it is placed on the command line. Without that, a specifier such
    as ``requests>=2.0`` is parsed by the shell as an output redirection and
    any shell metacharacter in the text would be executed.

    Args:
        package: Requirement text, e.g. ``"requests"``, ``"requests>=2.0"``
            or several space-separated requirements/options.

    Returns:
        The ``run_command`` result, or an error dict when the text is empty
        or cannot be split into words.
    """
    import shlex  # local import: only this method needs it

    print(f"{OUTPUT_FORMATS['terminal']} 安装包: {package}")

    result = None
    try:
        requirements = shlex.split(package or "")
    except ValueError as exc:
        # Unbalanced quotes and similar input problems.
        requirements = []
        result = {
            "success": False,
            "error": f"包名解析失败: {exc}",
            "output": "",
            "return_code": -1
        }

    if result is None and not requirements:
        result = {
            "success": False,
            "error": "包名不能为空",
            "output": "",
            "return_code": -1
        }

    if result is None:
        # shlex.quote leaves plain names untouched and uses POSIX quoting
        # for everything else.
        quoted = " ".join(shlex.quote(item) for item in requirements)
        # "python3" is a placeholder that run_command maps to the
        # interpreter of the actual execution environment.
        command = f'python3 -m pip install {quoted}'
        result = await self.run_command(command, timeout=120)

    if result["success"]:
        print(f"{OUTPUT_FORMATS['success']} 包安装成功: {package}")
    else:
        print(f"{OUTPUT_FORMATS['error']} 包安装失败: {package}")
    return result
async def check_environment(self) -> Dict:
    """Collect basic facts about the Python environment commands run in.

    Runs ``python3 --version``, ``python3 -m pip --version`` and
    ``python3 -m pip list --format=json`` through ``run_command`` and keeps
    whatever succeeded.

    Returns:
        ``{"success": True, "environment": {...}}`` where the environment
        holds ``python_command``, ``python_version``, ``pip_version``,
        ``installed_packages`` (``name==version`` strings) and
        ``working_directory``. Fields of failed probes stay empty.
    """
    import json  # local import: only this method needs it

    print(f"{OUTPUT_FORMATS['info']} 检查Python环境...")
    env_info = {
        "python_command": self.python_cmd,
        "python_version": "",
        "pip_version": "",
        "installed_packages": [],
        "working_directory": str(self.project_path)
    }

    # Interpreter version.
    version_result = await self.run_command(
        'python3 --version',
        timeout=5
    )
    if version_result["success"]:
        env_info["python_version"] = version_result["output"].strip()

    # pip version.
    pip_result = await self.run_command(
        'python3 -m pip --version',
        timeout=5
    )
    if pip_result["success"]:
        env_info["pip_version"] = pip_result["output"].strip()

    # Installed packages.
    packages_result = await self.run_command(
        'python3 -m pip list --format=json',
        timeout=10
    )
    if packages_result["success"]:
        try:
            packages = json.loads(packages_result["output"])
            env_info["installed_packages"] = [
                f"{p['name']}=={p['version']}" for p in packages
            ]
        except (ValueError, KeyError, TypeError):
            # Output that is not the expected JSON list of name/version
            # objects leaves the package list empty. Only parsing problems
            # are ignored, unlike a bare `except`, which would also swallow
            # KeyboardInterrupt/SystemExit.
            pass

    return {
        "success": True,
        "environment": env_info
    }
def kill_process(self):
    """Kill the tracked process if there is one and it is still running."""
    proc = self.process
    if not proc or proc.returncode is not None:
        return
    proc.kill()
    print(f"{OUTPUT_FORMATS['warning']} 进程已终止")