agent-Specialization/modules/terminal_ops.py

613 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# modules/terminal_ops.py - 终端操作模块修复Python命令检测
import os
import sys
import asyncio
import subprocess
import shutil
import time
import signal
from pathlib import Path
from typing import Dict, Optional, Tuple, TYPE_CHECKING
try:
from config import (
CODE_EXECUTION_TIMEOUT,
TERMINAL_COMMAND_TIMEOUT,
FORBIDDEN_COMMANDS,
OUTPUT_FORMATS,
MAX_RUN_COMMAND_CHARS,
TOOLBOX_TERMINAL_IDLE_SECONDS,
)
except ImportError:
project_root = Path(__file__).resolve().parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import (
CODE_EXECUTION_TIMEOUT,
TERMINAL_COMMAND_TIMEOUT,
FORBIDDEN_COMMANDS,
OUTPUT_FORMATS,
MAX_RUN_COMMAND_CHARS,
TOOLBOX_TERMINAL_IDLE_SECONDS,
)
from modules.toolbox_container import ToolboxContainer
if TYPE_CHECKING:
from modules.user_container_manager import ContainerHandle
class TerminalOperator:
def __init__(self, project_path: str, container_session: Optional["ContainerHandle"] = None):
self.project_path = Path(project_path).resolve()
self.process = None
# 自动检测Python命令
self.python_cmd = self._detect_python_command()
print(f"{OUTPUT_FORMATS['info']} 检测到Python命令: {self.python_cmd}")
self._toolbox: Optional[ToolboxContainer] = None
self.container_session: Optional["ContainerHandle"] = container_session
def _reset_toolbox(self):
"""强制关闭并重建工具终端,保证每次命令/脚本运行独立环境。"""
if self._toolbox:
try:
self._toolbox.shutdown()
except Exception:
pass
self._toolbox = None
def _detect_python_command(self) -> str:
"""
自动检测可用的Python命令
Returns:
可用的Python命令python、python3、py
"""
# 按优先级尝试不同的Python命令
commands_to_try = []
if sys.platform == "win32":
# Windows优先顺序
commands_to_try = ["python", "py", "python3"]
else:
# Unix-like系统优先顺序
commands_to_try = ["python3", "python"]
# 检测哪个命令可用
for cmd in commands_to_try:
if shutil.which(cmd):
try:
# 验证是否真的可以运行
result = subprocess.run(
[cmd, "--version"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
# 检查版本是否为Python 3
output = result.stdout + result.stderr
if "Python 3" in output or "Python 2" not in output:
return cmd
except:
continue
# 如果都没找到,根据平台返回默认值
return "python" if sys.platform == "win32" else "python3"
def _get_toolbox(self) -> ToolboxContainer:
if self._toolbox is None:
self._toolbox = ToolboxContainer(
project_path=str(self.project_path),
idle_timeout=TOOLBOX_TERMINAL_IDLE_SECONDS,
container_session=self.container_session,
)
return self._toolbox
def set_container_session(self, session: Optional["ContainerHandle"]):
if session is self.container_session:
return
self.container_session = session
if self._toolbox:
self._toolbox.set_container_session(session)
def _validate_command(self, command: str) -> Tuple[bool, str]:
"""验证命令安全性"""
# 检查禁止的命令
for forbidden in FORBIDDEN_COMMANDS:
if forbidden in command.lower():
return False, f"禁止执行的命令: {forbidden}"
# 检查危险的命令模式
dangerous_patterns = [
"sudo",
"chmod 777",
"rm -rf",
"> /dev/",
"fork bomb"
]
for pattern in dangerous_patterns:
if pattern in command.lower():
return False, f"检测到危险命令模式: {pattern}"
return True, ""
@staticmethod
def _clamp_timeout(requested: Optional[int], default: int, max_limit: int) -> int:
"""对timeout进行默认化与上限夹紧。"""
if not requested or requested <= 0:
return default
return min(int(requested), max_limit)
async def run_command(
self,
command: str,
working_dir: str = None,
timeout: int = None
) -> Dict:
"""
执行终端命令
Args:
command: 要执行的命令
working_dir: 工作目录
timeout: 超时时间(秒)
Returns:
执行结果字典
"""
if timeout is None or timeout <= 0:
return {
"success": False,
"error": "timeout 参数必填且需大于0",
"status": "error",
"output": "timeout 参数缺失",
"return_code": -1
}
# 每次执行前重置工具容器(保持隔离),但下面改用一次性子进程执行,仍保留重置以兼容后续逻辑
self._reset_toolbox()
# 替换命令中的python3为实际可用的命令
if "python3" in command and self.python_cmd != "python3":
command = command.replace("python3", self.python_cmd)
elif "python" in command and "python3" not in command and self.python_cmd == "python3":
# 如果命令中有python但不是python3而系统使用python3
command = command.replace("python", self.python_cmd)
# 验证命令
valid, error = self._validate_command(command)
if not valid:
return {
"success": False,
"error": error,
"output": "",
"return_code": -1
}
# 设置工作目录
try:
work_path = self._resolve_work_path(working_dir)
except ValueError:
return {
"success": False,
"error": "工作目录必须在项目文件夹内",
"output": "",
"return_code": -1
}
# 默认10s上限30s
timeout = self._clamp_timeout(timeout, default=10, max_limit=TERMINAL_COMMAND_TIMEOUT)
print(f"{OUTPUT_FORMATS['terminal']} 执行命令: {command}")
print(f"{OUTPUT_FORMATS['info']} 工作目录: {work_path}")
start_ts = time.time()
# 改为一次性子进程执行,确保等待到超时或命令结束
result_payload = await self._run_command_subprocess(command, work_path, timeout)
# 字符数检查
if result_payload.get("success") and "output" in result_payload:
char_count = len(result_payload["output"])
if char_count > MAX_RUN_COMMAND_CHARS:
return {
"success": False,
"error": f"结果内容过大,有{char_count}字符请使用限制字符数的获取内容方式根据程度选择10k以内的数",
"char_count": char_count,
"limit": MAX_RUN_COMMAND_CHARS,
"command": command
}
result_payload.setdefault("status", "completed" if result_payload.get("success") else "error")
result_payload["timeout"] = timeout
result_payload["elapsed_ms"] = int((time.time() - start_ts) * 1000)
return result_payload
def _resolve_work_path(self, working_dir: Optional[str]) -> Path:
if working_dir:
work_path = (self.project_path / working_dir).resolve()
work_path.relative_to(self.project_path)
return work_path
return self.project_path
def _format_toolbox_output(self, payload: Dict) -> Dict:
success = bool(payload.get("success"))
output_text = payload.get("output", "") or ""
# 去掉常见的交互式shell警告
noisy_lines = (
"bash: cannot set terminal process group",
"bash: no job control in this shell",
)
filtered = []
for line in output_text.splitlines():
if any(noise in line for noise in noisy_lines):
continue
filtered.append(line)
output_text = "\n".join(filtered)
result = {
"success": success,
"output": output_text,
"return_code": 0 if success else -1
}
if not success:
result["error"] = payload.get("error") or payload.get("message", "命令执行失败")
if "message" in payload:
result["message"] = payload["message"]
if "status" in payload:
result["status"] = payload["status"]
if "truncated" in payload:
result["truncated"] = payload["truncated"]
return result
async def _run_command_subprocess(self, command: str, work_path: Path, timeout: int) -> Dict:
start_ts = time.time()
try:
process = None
exec_cmd = None
use_shell = True
# 如果存在容器会话且模式为docker则在容器内执行
if self.container_session and getattr(self.container_session, "mode", None) == "docker":
container_name = getattr(self.container_session, "container_name", None)
mount_path = getattr(self.container_session, "mount_path", "/workspace") or "/workspace"
docker_bin = shutil.which("docker") or "docker"
try:
relative = work_path.relative_to(self.project_path).as_posix()
except ValueError:
relative = ""
container_workdir = mount_path.rstrip("/")
if relative:
container_workdir = f"{container_workdir}/{relative}"
exec_cmd = [
docker_bin,
"exec",
"-w",
container_workdir,
container_name,
"/bin/bash",
"-lc",
command,
]
use_shell = False
# 统一环境,确保 Python 输出无缓冲
env = os.environ.copy()
env.setdefault("PYTHONUNBUFFERED", "1")
if use_shell:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(work_path),
shell=True,
env=env,
start_new_session=True,
)
else:
process = await asyncio.create_subprocess_exec(
*exec_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
start_new_session=True,
)
stdout_buf: list[bytes] = []
stderr_buf: list[bytes] = []
async def _read_stream(stream, collector):
try:
async for chunk in stream:
if chunk:
collector.append(chunk)
except asyncio.CancelledError:
pass
except Exception:
pass
stdout_task = asyncio.create_task(_read_stream(process.stdout, stdout_buf))
stderr_task = asyncio.create_task(_read_stream(process.stderr, stderr_buf))
timed_out = False
try:
await asyncio.wait_for(process.wait(), timeout=timeout)
except asyncio.TimeoutError:
timed_out = True
try:
os.killpg(process.pid, signal.SIGINT)
except Exception:
pass
try:
await asyncio.wait_for(process.wait(), timeout=2)
except asyncio.TimeoutError:
try:
os.killpg(process.pid, signal.SIGKILL)
except Exception:
process.kill()
await process.wait()
# 确保读取协程结束
await asyncio.gather(stdout_task, stderr_task, return_exceptions=True)
# 兜底再读一次,防止剩余缓冲未被读取
try:
remaining_out = await process.stdout.read()
if remaining_out:
stdout_buf.append(remaining_out)
except Exception:
pass
try:
remaining_err = await process.stderr.read()
if remaining_err:
stderr_buf.append(remaining_err)
except Exception:
pass
stdout = b"".join(stdout_buf)
stderr = b"".join(stderr_buf)
stdout_text = stdout.decode('utf-8', errors='replace') if stdout else ""
stderr_text = stderr.decode('utf-8', errors='replace') if stderr else ""
success = (process.returncode == 0) and not timed_out
status = "completed" if success else ("timeout" if timed_out else "error")
output_parts = []
if stdout_text:
output_parts.append(stdout_text)
if stderr_text:
output_parts.append(stderr_text)
combined_output = "\n".join(output_parts)
truncated = False
if MAX_RUN_COMMAND_CHARS and len(combined_output) > MAX_RUN_COMMAND_CHARS:
truncated = True
combined_output = combined_output[-MAX_RUN_COMMAND_CHARS:]
response = {
"success": success,
"status": status,
"command": command,
"output": combined_output,
"return_code": process.returncode,
"truncated": truncated,
"timeout": timeout,
"elapsed_ms": int((time.time() - start_ts) * 1000)
}
if not success and timed_out:
response["message"] = f"命令执行超时 ({timeout}秒)"
elif not success and process.returncode is not None:
response["message"] = f"命令执行失败 (返回码: {process.returncode})"
if stderr_text:
response["stderr"] = stderr_text
return response
except Exception as exc:
return {
"success": False,
"status": "error",
"error": f"执行失败: {str(exc)}",
"output": "",
"return_code": -1,
"timeout": timeout,
"elapsed_ms": int((time.time() - start_ts) * 1000)
}
async def run_python_code(
self,
code: str,
timeout: int = None
) -> Dict:
"""
执行Python代码
Args:
code: Python代码
timeout: 超时时间(秒)
Returns:
执行结果字典
"""
if timeout is None or timeout <= 0:
return {
"success": False,
"error": "timeout 参数必填且需大于0",
"status": "error",
"output": "timeout 参数缺失",
"return_code": -1
}
timeout = self._clamp_timeout(timeout, default=timeout, max_limit=CODE_EXECUTION_TIMEOUT)
# 强制重置工具容器,避免上一段代码仍在运行时输出混入
self._reset_toolbox()
# 创建临时Python文件
temp_file = self.project_path / ".temp_code.py"
try:
# 写入代码
with open(temp_file, 'w', encoding='utf-8') as f:
f.write(code)
print(f"{OUTPUT_FORMATS['code']} 执行Python代码")
try:
relative_temp = temp_file.relative_to(self.project_path).as_posix()
except ValueError:
relative_temp = temp_file.as_posix()
# 使用检测到的Python命令执行文件相对路径可兼容容器挂载路径
result = await self.run_command(
f'{self.python_cmd} -u "{relative_temp}"',
timeout=timeout
)
# 添加代码到结果
result["code"] = code
return result
finally:
# 清理临时文件
if temp_file.exists():
temp_file.unlink()
async def run_python_file(
self,
file_path: str,
args: str = "",
timeout: int = None
) -> Dict:
"""
执行Python文件
Args:
file_path: Python文件路径
args: 命令行参数
timeout: 超时时间(秒)
Returns:
执行结果字典
"""
# 构建完整路径
full_path = (self.project_path / file_path).resolve()
# 验证文件存在
if not full_path.exists():
return {
"success": False,
"error": "文件不存在",
"output": "",
"return_code": -1
}
# 验证是Python文件
if not full_path.suffix == '.py':
return {
"success": False,
"error": "不是Python文件",
"output": "",
"return_code": -1
}
# 验证文件在项目内
try:
full_path.relative_to(self.project_path)
except ValueError:
return {
"success": False,
"error": "文件必须在项目文件夹内",
"output": "",
"return_code": -1
}
print(f"{OUTPUT_FORMATS['code']} 执行Python文件: {file_path}")
try:
relative_path = full_path.relative_to(self.project_path).as_posix()
except ValueError:
relative_path = full_path.as_posix()
# 使用检测到的Python命令构建命令
command = f'{self.python_cmd} "{relative_path}"'
if args:
command += f" {args}"
# 执行命令
return await self.run_command(command, timeout=timeout)
async def install_package(self, package: str) -> Dict:
"""
安装Python包
Args:
package: 包名
Returns:
安装结果
"""
print(f"{OUTPUT_FORMATS['terminal']} 安装包: {package}")
# 使用检测到的Python命令安装
command = f'{self.python_cmd} -m pip install {package}'
result = await self.run_command(command, timeout=120)
if result["success"]:
print(f"{OUTPUT_FORMATS['success']} 包安装成功: {package}")
else:
print(f"{OUTPUT_FORMATS['error']} 包安装失败: {package}")
return result
async def check_environment(self) -> Dict:
"""检查Python环境"""
print(f"{OUTPUT_FORMATS['info']} 检查Python环境...")
env_info = {
"python_command": self.python_cmd,
"python_version": "",
"pip_version": "",
"installed_packages": [],
"working_directory": str(self.project_path)
}
# 获取Python版本
version_result = await self.run_command(
f'{self.python_cmd} --version',
timeout=5
)
if version_result["success"]:
env_info["python_version"] = version_result["output"].strip()
# 获取pip版本
pip_result = await self.run_command(
f'{self.python_cmd} -m pip --version',
timeout=5
)
if pip_result["success"]:
env_info["pip_version"] = pip_result["output"].strip()
# 获取已安装的包
packages_result = await self.run_command(
f'{self.python_cmd} -m pip list --format=json',
timeout=10
)
if packages_result["success"]:
try:
import json
packages = json.loads(packages_result["output"])
env_info["installed_packages"] = [
f"{p['name']}=={p['version']}" for p in packages
]
except:
pass
return {
"success": True,
"environment": env_info
}
def kill_process(self):
"""终止当前运行的进程"""
if self.process and self.process.returncode is None:
self.process.kill()
print(f"{OUTPUT_FORMATS['warning']} 进程已终止")