agent-Specialization/modules/terminal_ops.py

462 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# modules/terminal_ops.py - Terminal operations module (with fixed Python command detection)
import os
import sys
import asyncio
import subprocess
import shutil
from pathlib import Path
from typing import Dict, Optional, Tuple, TYPE_CHECKING
try:
from config import (
CODE_EXECUTION_TIMEOUT,
TERMINAL_COMMAND_TIMEOUT,
FORBIDDEN_COMMANDS,
OUTPUT_FORMATS,
MAX_RUN_COMMAND_CHARS,
TOOLBOX_TERMINAL_IDLE_SECONDS,
)
except ImportError:
project_root = Path(__file__).resolve().parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import (
CODE_EXECUTION_TIMEOUT,
TERMINAL_COMMAND_TIMEOUT,
FORBIDDEN_COMMANDS,
OUTPUT_FORMATS,
MAX_RUN_COMMAND_CHARS,
TOOLBOX_TERMINAL_IDLE_SECONDS,
)
from modules.toolbox_container import ToolboxContainer
if TYPE_CHECKING:
from modules.user_container_manager import ContainerHandle
class TerminalOperator:
def __init__(self, project_path: str, container_session: Optional["ContainerHandle"] = None):
    """Bind the operator to a project directory and probe for a Python command.

    Args:
        project_path: Project root; stored as a resolved absolute ``Path``.
        container_session: Optional container handle kept on the instance
            for the toolbox container.
    """
    root = Path(project_path).resolve()
    self.project_path = root
    self.process = None

    # Auto-detect which Python command is available.
    detected = self._detect_python_command()
    self.python_cmd = detected
    print(f"{OUTPUT_FORMATS['info']} 检测到Python命令: {detected}")

    # No toolbox yet; it starts out unset.
    self._toolbox: Optional[ToolboxContainer] = None
    self.container_session: Optional["ContainerHandle"] = container_session
def _detect_python_command(self) -> str:
    """Find a Python command that is installed and actually runs.

    Candidates are tried in a platform-specific order (``python``, ``py``,
    ``python3`` on Windows; ``python3``, ``python`` elsewhere).  A candidate
    is accepted when it is found on ``PATH``, ``<cmd> --version`` exits
    with status 0, and the reported version is not Python 2.

    Returns:
        The first acceptable command, or the platform default (``python``
        on Windows, ``python3`` otherwise) when no candidate qualifies.
    """
    on_windows = sys.platform == "win32"
    candidates = ("python", "py", "python3") if on_windows else ("python3", "python")

    for cmd in candidates:
        if not shutil.which(cmd):
            continue
        try:
            # Being on PATH is not enough; verify the command really runs.
            probe = subprocess.run(
                [cmd, "--version"],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError, ValueError):
            # Launch failure, timeout, or undecodable output: try the next
            # candidate.  Unlike a bare ``except``, this lets
            # KeyboardInterrupt and SystemExit propagate.
            continue
        if probe.returncode != 0:
            continue
        # The version banner may arrive on either stream, so check both.
        banner = probe.stdout + probe.stderr
        if "Python 3" in banner or "Python 2" not in banner:
            return cmd

    # Nothing usable was found: fall back to the platform default.
    return "python" if on_windows else "python3"
def _get_toolbox(self) -> ToolboxContainer:
    """Return the cached ``ToolboxContainer``, creating it on first use."""
    toolbox = self._toolbox
    if toolbox is not None:
        return toolbox

    # First call: build the container from the current project path and
    # container session, then cache it on the instance.
    toolbox = ToolboxContainer(
        project_path=str(self.project_path),
        idle_timeout=TOOLBOX_TERMINAL_IDLE_SECONDS,
        container_session=self.container_session,
    )
    self._toolbox = toolbox
    return toolbox
def set_container_session(self, session: Optional["ContainerHandle"]):
    """Replace the stored container session and forward it to the toolbox.

    Nothing happens when ``session`` is the very object already stored.
    The toolbox is only notified when one exists (and is truthy).
    """
    unchanged = session is self.container_session
    if not unchanged:
        self.container_session = session
        toolbox = self._toolbox
        if toolbox:
            toolbox.set_container_session(session)
def _validate_command(self, command: str) -> Tuple[bool, str]:
    """Screen a shell command against the deny-lists before it is run.

    Matching is a substring search on the lower-cased command, first
    against ``FORBIDDEN_COMMANDS`` and then against a built-in list of
    dangerous patterns.  Every pattern is tested against the command as
    written and against a copy whose whitespace runs are collapsed to
    single spaces, so extra spaces or tabs (``rm   -rf``) cannot be used
    to slip past a pattern such as ``"rm -rf"``.

    Args:
        command: The command line about to be executed.

    Returns:
        ``(True, "")`` when nothing matched, otherwise ``(False, reason)``.
    """
    dangerous_patterns = (
        "sudo",
        "chmod 777",
        "rm -rf",
        "> /dev/",
        "fork bomb",
    )

    lowered = command.lower()
    collapsed = " ".join(lowered.split())

    def _matches(pattern: str) -> bool:
        return pattern in lowered or pattern in collapsed

    # NOTE(review): plain substring matching is coarse; for example "sudo"
    # also matches "pseudo" and "> /dev/" also matches "> /dev/null".

    # Forbidden commands from configuration.
    for forbidden in FORBIDDEN_COMMANDS:
        if _matches(forbidden):
            return False, f"禁止执行的命令: {forbidden}"

    # Built-in dangerous command patterns.
    for pattern in dangerous_patterns:
        if _matches(pattern):
            return False, f"检测到危险命令模式: {pattern}"

    return True, ""
async def run_command(
    self,
    command: str,
    working_dir: Optional[str] = None,
    timeout: Optional[int] = None
) -> Dict:
    """Run a shell command for the project and return a result dict.

    The command is first rewritten so that bare ``python``/``python3``
    invocations use the detected interpreter, then validated.  It is run
    through the toolbox container; if that call raises, it is run as a
    local subprocess instead.

    Args:
        command: Command line to execute.
        working_dir: Directory relative to the project root; the project
            root itself when omitted.
        timeout: Timeout in seconds; ``TERMINAL_COMMAND_TIMEOUT`` when falsy.

    Returns:
        A dict with ``success``, ``output`` and ``return_code``; failures
        also carry ``error``.  A successful result whose output exceeds
        ``MAX_RUN_COMMAND_CHARS`` is turned into a failure that reports
        ``char_count``, ``limit`` and ``command`` instead of the output.
    """
    # Point python/python3 invocations at the interpreter that exists here.
    command = self._substitute_python_command(command)

    # Validate the command.
    valid, error = self._validate_command(command)
    if not valid:
        return {
            "success": False,
            "error": error,
            "output": "",
            "return_code": -1
        }

    # Resolve the working directory; it must stay inside the project.
    try:
        work_path = self._resolve_work_path(working_dir)
    except ValueError:
        return {
            "success": False,
            "error": "工作目录必须在项目文件夹内",
            "output": "",
            "return_code": -1
        }

    timeout = timeout or TERMINAL_COMMAND_TIMEOUT
    print(f"{OUTPUT_FORMATS['terminal']} 执行命令: {command}")
    print(f"{OUTPUT_FORMATS['info']} 工作目录: {work_path}")

    try:
        toolbox_raw = await self._get_toolbox().run(command, work_path, timeout=timeout)
        result_payload = self._format_toolbox_output(toolbox_raw)
    except Exception as exc:
        # Deliberate best-effort: any toolbox failure falls back to a
        # local subprocess.
        print(f"{OUTPUT_FORMATS['warning']} 工具容器执行失败,回退到本地子进程: {exc}")
        result_payload = await self._run_command_subprocess(command, work_path, timeout)

    # Character-count check on successful output.
    if result_payload.get("success") and "output" in result_payload:
        char_count = len(result_payload["output"])
        if char_count > MAX_RUN_COMMAND_CHARS:
            return {
                "success": False,
                "error": f"结果内容过大,有{char_count}字符请使用限制字符数的获取内容方式根据程度选择10k以内的数",
                # Same keys as every other failure result of this method.
                "output": "",
                "return_code": -1,
                "char_count": char_count,
                "limit": MAX_RUN_COMMAND_CHARS,
                "command": command
            }
    return result_payload


def _substitute_python_command(self, command: str) -> str:
    """Rewrite standalone ``python``/``python3`` words to ``self.python_cmd``.

    Only whole command words are rewritten.  Occurrences that are part of
    a longer name or a path (``python_notes.txt``, ``ipython``,
    ``python3.11``, ``/usr/bin/python3``) are left untouched, so file
    names and explicit interpreter paths are not corrupted.

    * When the detected command is not ``python3``, standalone ``python3``
      becomes the detected command.
    * When the detected command is ``python3``, standalone ``python``
      becomes ``python3``.
    """
    import re  # local import: only this helper needs it

    target = self.python_cmd
    # A match must not touch word characters, dots, dashes or path
    # separators on either side.
    if target != "python3":
        pattern = r"(?<![\w./\\-])python3(?![\w.-])"
    else:
        pattern = r"(?<![\w./\\-])python(?![\w.-])"
    # A callable replacement keeps ``target`` literal (no escape handling).
    return re.sub(pattern, lambda _match: target, command)
def _resolve_work_path(self, working_dir: Optional[str]) -> Path:
    """Turn an optional project-relative directory into an absolute path.

    A falsy ``working_dir`` yields the project root.  Otherwise the joined,
    resolved path is returned.

    Raises:
        ValueError: If the resolved path is not inside the project root.
    """
    root = self.project_path
    if not working_dir:
        return root
    candidate = root.joinpath(working_dir).resolve()
    # relative_to() is used purely as a containment check: it raises
    # ValueError when ``candidate`` is not under ``root``.
    candidate.relative_to(root)
    return candidate
def _format_toolbox_output(self, payload: Dict) -> Dict:
    """Convert a raw toolbox payload into this module's result-dict shape.

    Args:
        payload: Mapping returned by the toolbox; the keys read here are
            ``success``, ``output``, ``error``, ``message``, ``status`` and
            ``truncated``.

    Returns:
        A dict with ``success`` (bool), ``output`` (never ``None``) and
        ``return_code`` (0 on success, -1 otherwise).  Failures always
        carry a non-empty ``error``: the payload's ``error``, else its
        ``message``, else a generic text.  ``message``, ``status`` and
        ``truncated`` are copied through when present.
    """
    success = bool(payload.get("success"))
    result = {
        "success": success,
        "output": payload.get("output", "") or "",
        "return_code": 0 if success else -1,
    }
    if not success:
        # Chain with ``or`` so a present-but-empty message does not end up
        # as an empty error text.
        result["error"] = (
            payload.get("error")
            or payload.get("message")
            or "命令执行失败"
        )
    # Optional fields are passed through unchanged.
    for key in ("message", "status", "truncated"):
        if key in payload:
            result[key] = payload[key]
    return result
async def _run_command_subprocess(self, command: str, work_path: Path, timeout: int) -> Dict:
    """Run ``command`` through the local shell and collect its output.

    While the command runs, the process is stored in ``self.process`` so
    that ``kill_process`` has something to act on; the attribute is reset
    to ``None`` afterwards.

    Args:
        command: Shell command line.
        work_path: Directory the command is started in.
        timeout: Seconds to wait before the process is killed.

    Returns:
        A result dict.  On completion it holds ``success`` (exit status is
        0), ``command``, ``output`` (stdout, followed by a ``[stderr]``
        section when stderr is non-empty), ``return_code``, ``truncated``
        and, when stderr is non-empty, ``stderr``.  On timeout or any
        exception it is a failure dict with ``error``.
    """
    process = None
    try:
        # create_subprocess_shell always goes through the shell, so no
        # explicit shell= argument is needed.
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(work_path),
        )
        self.process = process

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            # Kill and reap the process so it does not linger.
            process.kill()
            await process.wait()
            return {
                "success": False,
                "error": f"命令执行超时 ({timeout}秒)",
                "output": "",
                "return_code": -1
            }

        stdout_text = stdout.decode('utf-8', errors='replace') if stdout else ""
        stderr_text = stderr.decode('utf-8', errors='replace') if stderr else ""

        success = process.returncode == 0
        if success:
            print(f"{OUTPUT_FORMATS['success']} 命令执行成功")
        else:
            print(f"{OUTPUT_FORMATS['error']} 命令执行失败 (返回码: {process.returncode})")

        output_parts = []
        if stdout_text:
            output_parts.append(stdout_text)
        if stderr_text:
            output_parts.append(f"[stderr]\n{stderr_text}")
        combined_output = "\n".join(output_parts)

        truncated = False
        if MAX_RUN_COMMAND_CHARS and len(combined_output) > MAX_RUN_COMMAND_CHARS:
            truncated = True
            # Keep the tail of the output.
            combined_output = combined_output[-MAX_RUN_COMMAND_CHARS:]

        response = {
            "success": success,
            "command": command,
            "output": combined_output,
            "return_code": process.returncode,
            "truncated": truncated
        }
        if stderr_text:
            response["stderr"] = stderr_text
        return response
    except Exception as exc:
        return {
            "success": False,
            "error": f"执行失败: {str(exc)}",
            "output": "",
            "return_code": -1
        }
    finally:
        # Stop tracking this process, unless another call has replaced it.
        if process is not None and self.process is process:
            self.process = None
async def run_python_code(
    self,
    code: str,
    timeout: int = None
) -> Dict:
    """Execute a Python snippet via a scratch file in the project root.

    The snippet is written to ``.temp_code.py``, run through
    ``run_command`` with the detected Python command, and the scratch file
    is removed afterwards whether or not the run succeeded.

    Args:
        code: Python source to execute.
        timeout: Timeout in seconds; ``CODE_EXECUTION_TIMEOUT`` when falsy.

    Returns:
        The ``run_command`` result dict with the source added under ``code``.
    """
    effective_timeout = timeout or CODE_EXECUTION_TIMEOUT
    script = self.project_path / ".temp_code.py"
    try:
        # Write the snippet to the scratch file.
        script.write_text(code, encoding='utf-8')
        print(f"{OUTPUT_FORMATS['code']} 执行Python代码")

        # Run the file with the detected Python command.
        outcome = await self.run_command(
            f'{self.python_cmd} "{script}"',
            timeout=effective_timeout
        )
        outcome["code"] = code
        return outcome
    finally:
        # Always clean up the scratch file.
        if script.exists():
            script.unlink()
async def run_python_file(
    self,
    file_path: str,
    args: str = "",
    timeout: Optional[int] = None
) -> Dict:
    """Run a Python file that lives inside the project.

    The path is checked in this order: it must resolve inside the project
    root, it must exist, and it must end in ``.py``.  Containment is
    checked first so that the error for a path outside the project does
    not depend on whether that path exists.

    Args:
        file_path: Path of the file, relative to the project root.
        args: Command-line arguments appended verbatim to the command.
        timeout: Timeout in seconds, passed on to ``run_command``.

    Returns:
        The ``run_command`` result, or a failure dict when a check fails.
    """
    def _failure(message: str) -> Dict:
        return {
            "success": False,
            "error": message,
            "output": "",
            "return_code": -1
        }

    # Build the full path.
    root = self.project_path
    full_path = (root / file_path).resolve()

    # The file must be inside the project.
    try:
        full_path.relative_to(root)
    except ValueError:
        return _failure("文件必须在项目文件夹内")

    # The file must exist.
    if not full_path.exists():
        return _failure("文件不存在")

    # The file must be a Python file.
    if full_path.suffix != '.py':
        return _failure("不是Python文件")

    print(f"{OUTPUT_FORMATS['code']} 执行Python文件: {file_path}")

    # Build the command with the detected Python command.
    # NOTE(review): ``args`` is appended unquoted and is interpreted by the
    # shell; it only passes through run_command's own checks.
    command = f'{self.python_cmd} "{full_path}"'
    if args:
        command += f" {args}"

    return await self.run_command(command, timeout=timeout)
async def install_package(self, package: str) -> Dict:
    """Install one or more Python packages with ``pip``.

    ``package`` is split on whitespace into requirement arguments.  Any
    argument containing characters outside a small safe set is wrapped in
    double quotes, so version specifiers such as ``requests>=2.0`` reach
    pip intact instead of being parsed by the shell as a redirection.
    Arguments containing ``"``, a backtick or ``$`` are rejected because
    those stay active inside double quotes.

    Args:
        package: Requirement(s) to install, for example ``"requests"`` or
            ``"requests>=2.0 numpy"``.  One pair of surrounding quotes per
            argument is tolerated and removed.

    Returns:
        The ``run_command`` result, or a failure dict when ``package`` is
        empty or contains rejected characters.
    """
    import re  # local import: only this method needs it

    print(f"{OUTPUT_FORMATS['terminal']} 安装包: {package}")

    def _rejected(reason: str) -> Dict:
        print(f"{OUTPUT_FORMATS['error']} 包安装失败: {package}")
        return {
            "success": False,
            "error": reason,
            "output": "",
            "return_code": -1
        }

    # Characters that need no quoting in either a POSIX shell or cmd.exe.
    shell_safe = re.compile(r"[A-Za-z0-9_.+=:/@,%-]+")

    arguments = []
    for raw in package.split():
        # Tolerate callers that already wrapped a requirement in quotes.
        if len(raw) >= 2 and raw[0] == raw[-1] and raw[0] in "'\"":
            raw = raw[1:-1]
        if not raw or any(ch in raw for ch in '"`$'):
            return _rejected(f"无效的包名: {package}")
        arguments.append(raw if shell_safe.fullmatch(raw) else f'"{raw}"')

    if not arguments:
        return _rejected("包名不能为空")

    # Install with the detected Python command.
    command = f'{self.python_cmd} -m pip install {" ".join(arguments)}'
    result = await self.run_command(command, timeout=120)

    if result["success"]:
        print(f"{OUTPUT_FORMATS['success']} 包安装成功: {package}")
    else:
        print(f"{OUTPUT_FORMATS['error']} 包安装失败: {package}")
    return result
async def check_environment(self) -> Dict:
    """Collect information about the Python environment.

    Runs ``--version``, ``-m pip --version`` and
    ``-m pip list --format=json`` with the detected Python command.  Each
    field is filled only when its command succeeds.

    Returns:
        ``{"success": True, "environment": info}`` where ``info`` holds
        ``python_command``, ``python_version``, ``pip_version``,
        ``installed_packages`` (``name==version`` strings) and
        ``working_directory``.
    """
    import json  # local import: only this method needs it

    print(f"{OUTPUT_FORMATS['info']} 检查Python环境...")
    env_info = {
        "python_command": self.python_cmd,
        "python_version": "",
        "pip_version": "",
        "installed_packages": [],
        "working_directory": str(self.project_path)
    }

    # Python version.
    version_result = await self.run_command(
        f'{self.python_cmd} --version',
        timeout=5
    )
    if version_result["success"]:
        env_info["python_version"] = version_result["output"].strip()

    # pip version.
    pip_result = await self.run_command(
        f'{self.python_cmd} -m pip --version',
        timeout=5
    )
    if pip_result["success"]:
        env_info["pip_version"] = pip_result["output"].strip()

    # Installed packages.
    packages_result = await self.run_command(
        f'{self.python_cmd} -m pip list --format=json',
        timeout=10
    )
    if packages_result["success"]:
        raw = packages_result["output"]
        decoder = json.JSONDecoder()
        packages = None
        # The output may carry extra text around the JSON (for example a
        # trailing "[stderr]" section), so take the first "[" from which a
        # JSON list can be decoded rather than parsing the whole string.
        start = raw.find("[")
        while start != -1:
            try:
                candidate, _end = decoder.raw_decode(raw, start)
            except ValueError:
                candidate = None
            if isinstance(candidate, list):
                packages = candidate
                break
            start = raw.find("[", start + 1)

        if packages is not None:
            try:
                env_info["installed_packages"] = [
                    f"{p['name']}=={p['version']}" for p in packages
                ]
            except (KeyError, TypeError):
                # Best effort: unexpected entry shape leaves the list empty.
                pass

    return {
        "success": True,
        "environment": env_info
    }
def kill_process(self):
    """Kill the tracked process if there is one and it has not exited."""
    proc = self.process
    # Nothing tracked, or it already has a return code: nothing to do.
    if not proc or proc.returncode is not None:
        return
    proc.kill()
    print(f"{OUTPUT_FORMATS['warning']} 进程已终止")