agent-Specialization/sub_agent/modules/terminal_ops.py

423 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# modules/terminal_ops.py - 终端操作模块修复Python命令检测
import os
import sys
import asyncio
import subprocess
import shutil
import re
from pathlib import Path
from typing import Dict, Optional, Tuple
try:
from config import (
CODE_EXECUTION_TIMEOUT,
TERMINAL_COMMAND_TIMEOUT,
FORBIDDEN_COMMANDS,
OUTPUT_FORMATS
)
except ImportError:
project_root = Path(__file__).resolve().parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import (
CODE_EXECUTION_TIMEOUT,
TERMINAL_COMMAND_TIMEOUT,
FORBIDDEN_COMMANDS,
OUTPUT_FORMATS
)
class TerminalOperator:
def __init__(self, project_path: str):
self.project_path = Path(project_path).resolve()
self.process = None
# 自动检测Python命令并尝试复用预装虚拟环境
self._python_env: Dict[str, str] = {}
self.python_cmd = self._detect_python_runtime()
print(f"{OUTPUT_FORMATS['info']} 检测到Python命令: {self.python_cmd}")
def _detect_python_runtime(self) -> str:
"""优先选择预装虚拟环境的 Python其次回退系统可执行文件。"""
preferred = self._detect_preinstalled_python()
if preferred:
self._python_env = self._build_python_env(preferred)
return preferred
return self._detect_system_python()
def _detect_preinstalled_python(self) -> Optional[str]:
candidates = []
env_venv = os.environ.get("AGENT_TOOLBOX_VENV") or os.environ.get("VIRTUAL_ENV")
if env_venv:
candidates.append(env_venv)
candidates.append("/opt/agent-venv")
seen = set()
for raw in candidates:
if not raw or raw in seen:
continue
seen.add(raw)
root = Path(raw).expanduser()
bin_dir = root / ("Scripts" if sys.platform == "win32" else "bin")
for name in ("python3", "python"):
cand = bin_dir / name
if cand.exists() and os.access(cand, os.X_OK):
return str(cand.resolve())
return None
def _detect_system_python(self) -> str:
if sys.platform == "win32":
commands_to_try = ["python", "py", "python3"]
else:
commands_to_try = ["python3", "python"]
for cmd in commands_to_try:
if shutil.which(cmd):
try:
result = subprocess.run(
[cmd, "--version"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
output = result.stdout + result.stderr
if "Python 3" in output or "Python 2" not in output:
return cmd
except Exception:
continue
return "python" if sys.platform == "win32" else "python3"
def _build_python_env(self, python_path: str) -> Dict[str, str]:
env: Dict[str, str] = {}
try:
py_path = Path(python_path).resolve()
bin_dir = py_path.parent
venv_dir = bin_dir.parent
env["VIRTUAL_ENV"] = str(venv_dir)
current_path = os.environ.get("PATH", "")
prefix = str(bin_dir)
if current_path.startswith(prefix + os.pathsep) or current_path == prefix:
env["PATH"] = current_path
else:
env["PATH"] = f"{prefix}{os.pathsep}{current_path}" if current_path else prefix
except Exception:
pass
return env
def _validate_command(self, command: str) -> Tuple[bool, str]:
"""验证命令安全性"""
# 检查禁止的命令
for forbidden in FORBIDDEN_COMMANDS:
if forbidden in command.lower():
return False, f"禁止执行的命令: {forbidden}"
# 检查危险的命令模式
dangerous_patterns = [
"sudo",
"chmod 777",
"rm -rf",
"> /dev/",
"fork bomb"
]
for pattern in dangerous_patterns:
if pattern in command.lower():
return False, f"检测到危险命令模式: {pattern}"
return True, ""
async def run_command(
self,
command: str,
working_dir: str = None,
timeout: int = None
) -> Dict:
"""
执行终端命令
Args:
command: 要执行的命令
working_dir: 工作目录
timeout: 超时时间(秒)
Returns:
执行结果字典
"""
# 替换 python/python3确保命中预装虚拟环境
if re.search(r"\bpython3?\b", command):
command = re.sub(r"\bpython3?\b", self.python_cmd, command)
# 验证命令
valid, error = self._validate_command(command)
if not valid:
return {
"success": False,
"error": error,
"output": "",
"return_code": -1
}
# 设置工作目录
if working_dir:
work_path = (self.project_path / working_dir).resolve()
# 确保工作目录在项目内
try:
work_path.relative_to(self.project_path)
except ValueError:
return {
"success": False,
"error": "工作目录必须在项目文件夹内",
"output": "",
"return_code": -1
}
else:
work_path = self.project_path
timeout = timeout or TERMINAL_COMMAND_TIMEOUT
print(f"{OUTPUT_FORMATS['terminal']} 执行命令: {command}")
print(f"{OUTPUT_FORMATS['info']} 工作目录: {work_path}")
try:
# 创建进程
env = os.environ.copy()
if self._python_env:
env.update(self._python_env)
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(work_path),
shell=True,
env=env
)
# 等待执行完成
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=timeout
)
except asyncio.TimeoutError:
process.kill()
await process.wait()
return {
"success": False,
"error": f"命令执行超时 ({timeout}秒)",
"output": "",
"return_code": -1
}
# 解码输出
stdout_text = stdout.decode('utf-8', errors='replace') if stdout else ""
stderr_text = stderr.decode('utf-8', errors='replace') if stderr else ""
output = stdout_text
if stderr_text:
output += f"\n[错误输出]\n{stderr_text}"
success = process.returncode == 0
if success:
print(f"{OUTPUT_FORMATS['success']} 命令执行成功")
else:
print(f"{OUTPUT_FORMATS['error']} 命令执行失败 (返回码: {process.returncode})")
return {
"success": success,
"output": output,
"stdout": stdout_text,
"stderr": stderr_text,
"return_code": process.returncode,
"command": command
}
except Exception as e:
return {
"success": False,
"error": f"执行失败: {str(e)}",
"output": "",
"return_code": -1
}
async def run_python_code(
self,
code: str,
timeout: int = None
) -> Dict:
"""
执行Python代码
Args:
code: Python代码
timeout: 超时时间(秒)
Returns:
执行结果字典
"""
timeout = timeout or CODE_EXECUTION_TIMEOUT
# 创建临时Python文件
temp_file = self.project_path / ".temp_code.py"
try:
# 写入代码
with open(temp_file, 'w', encoding='utf-8') as f:
f.write(code)
print(f"{OUTPUT_FORMATS['code']} 执行Python代码")
# 使用检测到的Python命令执行文件
result = await self.run_command(
f'{self.python_cmd} "{temp_file}"',
timeout=timeout
)
# 添加代码到结果
result["code"] = code
return result
finally:
# 清理临时文件
if temp_file.exists():
temp_file.unlink()
async def run_python_file(
self,
file_path: str,
args: str = "",
timeout: int = None
) -> Dict:
"""
执行Python文件
Args:
file_path: Python文件路径
args: 命令行参数
timeout: 超时时间(秒)
Returns:
执行结果字典
"""
# 构建完整路径
full_path = (self.project_path / file_path).resolve()
# 验证文件存在
if not full_path.exists():
return {
"success": False,
"error": "文件不存在",
"output": "",
"return_code": -1
}
# 验证是Python文件
if not full_path.suffix == '.py':
return {
"success": False,
"error": "不是Python文件",
"output": "",
"return_code": -1
}
# 验证文件在项目内
try:
full_path.relative_to(self.project_path)
except ValueError:
return {
"success": False,
"error": "文件必须在项目文件夹内",
"output": "",
"return_code": -1
}
print(f"{OUTPUT_FORMATS['code']} 执行Python文件: {file_path}")
# 使用检测到的Python命令构建命令
command = f'{self.python_cmd} "{full_path}"'
if args:
command += f" {args}"
# 执行命令
return await self.run_command(command, timeout=timeout)
async def install_package(self, package: str) -> Dict:
"""
安装Python包
Args:
package: 包名
Returns:
安装结果
"""
print(f"{OUTPUT_FORMATS['terminal']} 安装包: {package}")
# 使用检测到的Python命令安装
command = f'{self.python_cmd} -m pip install {package}'
result = await self.run_command(command, timeout=120)
if result["success"]:
print(f"{OUTPUT_FORMATS['success']} 包安装成功: {package}")
else:
print(f"{OUTPUT_FORMATS['error']} 包安装失败: {package}")
return result
async def check_environment(self) -> Dict:
"""检查Python环境"""
print(f"{OUTPUT_FORMATS['info']} 检查Python环境...")
env_info = {
"python_command": self.python_cmd,
"python_version": "",
"pip_version": "",
"installed_packages": [],
"working_directory": str(self.project_path)
}
# 获取Python版本
version_result = await self.run_command(
f'{self.python_cmd} --version',
timeout=5
)
if version_result["success"]:
env_info["python_version"] = version_result["output"].strip()
# 获取pip版本
pip_result = await self.run_command(
f'{self.python_cmd} -m pip --version',
timeout=5
)
if pip_result["success"]:
env_info["pip_version"] = pip_result["output"].strip()
# 获取已安装的包
packages_result = await self.run_command(
f'{self.python_cmd} -m pip list --format=json',
timeout=10
)
if packages_result["success"]:
try:
import json
packages = json.loads(packages_result["output"])
env_info["installed_packages"] = [
f"{p['name']}=={p['version']}" for p in packages
]
except:
pass
return {
"success": True,
"environment": env_info
}
def kill_process(self):
"""终止当前运行的进程"""
if self.process and self.process.returncode is None:
self.process.kill()
print(f"{OUTPUT_FORMATS['warning']} 进程已终止")