# modules/terminal_ops.py - 终端操作模块(修复Python命令检测) import os import sys import asyncio import subprocess import shutil import re from pathlib import Path from typing import Dict, Optional, Tuple try: from config import ( CODE_EXECUTION_TIMEOUT, TERMINAL_COMMAND_TIMEOUT, FORBIDDEN_COMMANDS, OUTPUT_FORMATS ) except ImportError: project_root = Path(__file__).resolve().parents[1] if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) from config import ( CODE_EXECUTION_TIMEOUT, TERMINAL_COMMAND_TIMEOUT, FORBIDDEN_COMMANDS, OUTPUT_FORMATS ) class TerminalOperator: def __init__(self, project_path: str): self.project_path = Path(project_path).resolve() self.process = None # 自动检测Python命令,并尝试复用预装虚拟环境 self._python_env: Dict[str, str] = {} self.python_cmd = self._detect_python_runtime() print(f"{OUTPUT_FORMATS['info']} 检测到Python命令: {self.python_cmd}") def _detect_python_runtime(self) -> str: """优先选择预装虚拟环境的 Python,其次回退系统可执行文件。""" preferred = self._detect_preinstalled_python() if preferred: self._python_env = self._build_python_env(preferred) return preferred return self._detect_system_python() def _detect_preinstalled_python(self) -> Optional[str]: candidates = [] env_venv = os.environ.get("AGENT_TOOLBOX_VENV") or os.environ.get("VIRTUAL_ENV") if env_venv: candidates.append(env_venv) candidates.append("/opt/agent-venv") seen = set() for raw in candidates: if not raw or raw in seen: continue seen.add(raw) root = Path(raw).expanduser() bin_dir = root / ("Scripts" if sys.platform == "win32" else "bin") for name in ("python3", "python"): cand = bin_dir / name if cand.exists() and os.access(cand, os.X_OK): return str(cand.resolve()) return None def _detect_system_python(self) -> str: if sys.platform == "win32": commands_to_try = ["python", "py", "python3"] else: commands_to_try = ["python3", "python"] for cmd in commands_to_try: if shutil.which(cmd): try: result = subprocess.run( [cmd, "--version"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: output = result.stdout + result.stderr if "Python 3" in output or "Python 2" not in output: return cmd except Exception: continue return "python" if sys.platform == "win32" else "python3" def _build_python_env(self, python_path: str) -> Dict[str, str]: env: Dict[str, str] = {} try: py_path = Path(python_path).resolve() bin_dir = py_path.parent venv_dir = bin_dir.parent env["VIRTUAL_ENV"] = str(venv_dir) current_path = os.environ.get("PATH", "") prefix = str(bin_dir) if current_path.startswith(prefix + os.pathsep) or current_path == prefix: env["PATH"] = current_path else: env["PATH"] = f"{prefix}{os.pathsep}{current_path}" if current_path else prefix except Exception: pass return env def _validate_command(self, command: str) -> Tuple[bool, str]: """验证命令安全性""" # 检查禁止的命令 for forbidden in FORBIDDEN_COMMANDS: if forbidden in command.lower(): return False, f"禁止执行的命令: {forbidden}" # 检查危险的命令模式 dangerous_patterns = [ "sudo", "chmod 777", "rm -rf", "> /dev/", "fork bomb" ] for pattern in dangerous_patterns: if pattern in command.lower(): return False, f"检测到危险命令模式: {pattern}" return True, "" async def run_command( self, command: str, working_dir: str = None, timeout: int = None ) -> Dict: """ 执行终端命令 Args: command: 要执行的命令 working_dir: 工作目录 timeout: 超时时间(秒) Returns: 执行结果字典 """ # 替换 python/python3,确保命中预装虚拟环境 if re.search(r"\bpython3?\b", command): command = re.sub(r"\bpython3?\b", self.python_cmd, command) # 验证命令 valid, error = self._validate_command(command) if not valid: return { "success": False, "error": error, "output": "", "return_code": -1 } # 设置工作目录 if working_dir: work_path = (self.project_path / working_dir).resolve() # 确保工作目录在项目内 try: work_path.relative_to(self.project_path) except ValueError: return { "success": False, "error": "工作目录必须在项目文件夹内", "output": "", "return_code": -1 } else: work_path = self.project_path timeout = timeout or TERMINAL_COMMAND_TIMEOUT print(f"{OUTPUT_FORMATS['terminal']} 执行命令: {command}") print(f"{OUTPUT_FORMATS['info']} 工作目录: {work_path}") try: # 创建进程 env = os.environ.copy() if self._python_env: env.update(self._python_env) process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(work_path), shell=True, env=env ) # 等待执行完成 try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) except asyncio.TimeoutError: process.kill() await process.wait() return { "success": False, "error": f"命令执行超时 ({timeout}秒)", "output": "", "return_code": -1 } # 解码输出 stdout_text = stdout.decode('utf-8', errors='replace') if stdout else "" stderr_text = stderr.decode('utf-8', errors='replace') if stderr else "" output = stdout_text if stderr_text: output += f"\n[错误输出]\n{stderr_text}" success = process.returncode == 0 if success: print(f"{OUTPUT_FORMATS['success']} 命令执行成功") else: print(f"{OUTPUT_FORMATS['error']} 命令执行失败 (返回码: {process.returncode})") return { "success": success, "output": output, "stdout": stdout_text, "stderr": stderr_text, "return_code": process.returncode, "command": command } except Exception as e: return { "success": False, "error": f"执行失败: {str(e)}", "output": "", "return_code": -1 } async def run_python_code( self, code: str, timeout: int = None ) -> Dict: """ 执行Python代码 Args: code: Python代码 timeout: 超时时间(秒) Returns: 执行结果字典 """ timeout = timeout or CODE_EXECUTION_TIMEOUT # 创建临时Python文件 temp_file = self.project_path / ".temp_code.py" try: # 写入代码 with open(temp_file, 'w', encoding='utf-8') as f: f.write(code) print(f"{OUTPUT_FORMATS['code']} 执行Python代码") # 使用检测到的Python命令执行文件 result = await self.run_command( f'{self.python_cmd} "{temp_file}"', timeout=timeout ) # 添加代码到结果 result["code"] = code return result finally: # 清理临时文件 if temp_file.exists(): temp_file.unlink() async def run_python_file( self, file_path: str, args: str = "", timeout: int = None ) -> Dict: """ 执行Python文件 Args: file_path: Python文件路径 args: 命令行参数 timeout: 超时时间(秒) Returns: 执行结果字典 """ # 构建完整路径 full_path = (self.project_path / file_path).resolve() # 验证文件存在 if not full_path.exists(): return { "success": False, "error": "文件不存在", "output": "", "return_code": -1 } # 验证是Python文件 if not full_path.suffix == '.py': return { "success": False, "error": "不是Python文件", "output": "", "return_code": -1 } # 验证文件在项目内 try: full_path.relative_to(self.project_path) except ValueError: return { "success": False, "error": "文件必须在项目文件夹内", "output": "", "return_code": -1 } print(f"{OUTPUT_FORMATS['code']} 执行Python文件: {file_path}") # 使用检测到的Python命令构建命令 command = f'{self.python_cmd} "{full_path}"' if args: command += f" {args}" # 执行命令 return await self.run_command(command, timeout=timeout) async def install_package(self, package: str) -> Dict: """ 安装Python包 Args: package: 包名 Returns: 安装结果 """ print(f"{OUTPUT_FORMATS['terminal']} 安装包: {package}") # 使用检测到的Python命令安装 command = f'{self.python_cmd} -m pip install {package}' result = await self.run_command(command, timeout=120) if result["success"]: print(f"{OUTPUT_FORMATS['success']} 包安装成功: {package}") else: print(f"{OUTPUT_FORMATS['error']} 包安装失败: {package}") return result async def check_environment(self) -> Dict: """检查Python环境""" print(f"{OUTPUT_FORMATS['info']} 检查Python环境...") env_info = { "python_command": self.python_cmd, "python_version": "", "pip_version": "", "installed_packages": [], "working_directory": str(self.project_path) } # 获取Python版本 version_result = await self.run_command( f'{self.python_cmd} --version', timeout=5 ) if version_result["success"]: env_info["python_version"] = version_result["output"].strip() # 获取pip版本 pip_result = await self.run_command( f'{self.python_cmd} -m pip --version', timeout=5 ) if pip_result["success"]: env_info["pip_version"] = pip_result["output"].strip() # 获取已安装的包 packages_result = await self.run_command( f'{self.python_cmd} -m pip list --format=json', timeout=10 ) if packages_result["success"]: try: import json packages = json.loads(packages_result["output"]) env_info["installed_packages"] = [ f"{p['name']}=={p['version']}" for p in packages ] except: pass return { "success": True, "environment": env_info } def kill_process(self): """终止当前运行的进程""" if self.process and self.process.returncode is None: self.process.kill() print(f"{OUTPUT_FORMATS['warning']} 进程已终止")