# modules/terminal_ops.py - 终端操作模块(修复Python命令检测) import os import sys import asyncio import subprocess import shutil from pathlib import Path from typing import Dict, Optional, Tuple, TYPE_CHECKING try: from config import ( CODE_EXECUTION_TIMEOUT, TERMINAL_COMMAND_TIMEOUT, FORBIDDEN_COMMANDS, OUTPUT_FORMATS, MAX_RUN_COMMAND_CHARS, TOOLBOX_TERMINAL_IDLE_SECONDS, ) except ImportError: project_root = Path(__file__).resolve().parents[1] if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) from config import ( CODE_EXECUTION_TIMEOUT, TERMINAL_COMMAND_TIMEOUT, FORBIDDEN_COMMANDS, OUTPUT_FORMATS, MAX_RUN_COMMAND_CHARS, TOOLBOX_TERMINAL_IDLE_SECONDS, ) from modules.toolbox_container import ToolboxContainer if TYPE_CHECKING: from modules.user_container_manager import ContainerHandle class TerminalOperator: def __init__(self, project_path: str, container_session: Optional["ContainerHandle"] = None): self.project_path = Path(project_path).resolve() self.process = None # 自动检测Python命令 self.python_cmd = self._detect_python_command() print(f"{OUTPUT_FORMATS['info']} 检测到Python命令: {self.python_cmd}") self._toolbox: Optional[ToolboxContainer] = None self.container_session: Optional["ContainerHandle"] = container_session def _reset_toolbox(self): """强制关闭并重建工具终端,保证每次命令/脚本运行独立环境。""" if self._toolbox: try: self._toolbox.shutdown() except Exception: pass self._toolbox = None def _detect_python_command(self) -> str: """ 自动检测可用的Python命令 Returns: 可用的Python命令(python、python3、py) """ # 按优先级尝试不同的Python命令 commands_to_try = [] if sys.platform == "win32": # Windows优先顺序 commands_to_try = ["python", "py", "python3"] else: # Unix-like系统优先顺序 commands_to_try = ["python3", "python"] # 检测哪个命令可用 for cmd in commands_to_try: if shutil.which(cmd): try: # 验证是否真的可以运行 result = subprocess.run( [cmd, "--version"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: # 检查版本是否为Python 3 output = result.stdout + result.stderr if "Python 3" in output or "Python 2" not in output: return cmd except: continue # 如果都没找到,根据平台返回默认值 return "python" if sys.platform == "win32" else "python3" def _get_toolbox(self) -> ToolboxContainer: if self._toolbox is None: self._toolbox = ToolboxContainer( project_path=str(self.project_path), idle_timeout=TOOLBOX_TERMINAL_IDLE_SECONDS, container_session=self.container_session, ) return self._toolbox def set_container_session(self, session: Optional["ContainerHandle"]): if session is self.container_session: return self.container_session = session if self._toolbox: self._toolbox.set_container_session(session) def _validate_command(self, command: str) -> Tuple[bool, str]: """验证命令安全性""" # 检查禁止的命令 for forbidden in FORBIDDEN_COMMANDS: if forbidden in command.lower(): return False, f"禁止执行的命令: {forbidden}" # 检查危险的命令模式 dangerous_patterns = [ "sudo", "chmod 777", "rm -rf", "> /dev/", "fork bomb" ] for pattern in dangerous_patterns: if pattern in command.lower(): return False, f"检测到危险命令模式: {pattern}" return True, "" async def run_command( self, command: str, working_dir: str = None, timeout: int = None ) -> Dict: """ 执行终端命令 Args: command: 要执行的命令 working_dir: 工作目录 timeout: 超时时间(秒) Returns: 执行结果字典 """ # 每次执行前重置工具容器,防止上一条命令的输出/状态干扰 self._reset_toolbox() # 替换命令中的python3为实际可用的命令 if "python3" in command and self.python_cmd != "python3": command = command.replace("python3", self.python_cmd) elif "python" in command and "python3" not in command and self.python_cmd == "python3": # 如果命令中有python(但不是python3),而系统使用python3 command = command.replace("python", self.python_cmd) # 验证命令 valid, error = self._validate_command(command) if not valid: return { "success": False, "error": error, "output": "", "return_code": -1 } # 设置工作目录 try: work_path = self._resolve_work_path(working_dir) except ValueError: return { "success": False, "error": "工作目录必须在项目文件夹内", "output": "", "return_code": -1 } timeout = timeout or TERMINAL_COMMAND_TIMEOUT print(f"{OUTPUT_FORMATS['terminal']} 执行命令: {command}") print(f"{OUTPUT_FORMATS['info']} 工作目录: {work_path}") try: toolbox_raw = await self._get_toolbox().run(command, work_path, timeout=timeout) result_payload = self._format_toolbox_output(toolbox_raw) except Exception as exc: print(f"{OUTPUT_FORMATS['warning']} 工具容器执行失败,回退到本地子进程: {exc}") result_payload = await self._run_command_subprocess(command, work_path, timeout) # 字符数检查 if result_payload.get("success") and "output" in result_payload: char_count = len(result_payload["output"]) if char_count > MAX_RUN_COMMAND_CHARS: return { "success": False, "error": f"结果内容过大,有{char_count}字符,请使用限制字符数的获取内容方式,根据程度选择10k以内的数", "char_count": char_count, "limit": MAX_RUN_COMMAND_CHARS, "command": command } return result_payload def _resolve_work_path(self, working_dir: Optional[str]) -> Path: if working_dir: work_path = (self.project_path / working_dir).resolve() work_path.relative_to(self.project_path) return work_path return self.project_path def _format_toolbox_output(self, payload: Dict) -> Dict: success = bool(payload.get("success")) output_text = payload.get("output", "") or "" result = { "success": success, "output": output_text, "return_code": 0 if success else -1 } if not success: result["error"] = payload.get("error") or payload.get("message", "命令执行失败") if "message" in payload: result["message"] = payload["message"] if "status" in payload: result["status"] = payload["status"] if "truncated" in payload: result["truncated"] = payload["truncated"] return result async def _run_command_subprocess(self, command: str, work_path: Path, timeout: int) -> Dict: try: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(work_path), shell=True ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) except asyncio.TimeoutError: process.kill() await process.wait() return { "success": False, "error": f"命令执行超时 ({timeout}秒)", "output": "", "return_code": -1 } stdout_text = stdout.decode('utf-8', errors='replace') if stdout else "" stderr_text = stderr.decode('utf-8', errors='replace') if stderr else "" success = process.returncode == 0 if success: print(f"{OUTPUT_FORMATS['success']} 命令执行成功") else: print(f"{OUTPUT_FORMATS['error']} 命令执行失败 (返回码: {process.returncode})") output_parts = [] if stdout_text: output_parts.append(stdout_text) if stderr_text: output_parts.append(f"[stderr]\n{stderr_text}") combined_output = "\n".join(output_parts) truncated = False if MAX_RUN_COMMAND_CHARS and len(combined_output) > MAX_RUN_COMMAND_CHARS: truncated = True combined_output = combined_output[-MAX_RUN_COMMAND_CHARS:] response = { "success": success, "command": command, "output": combined_output, "return_code": process.returncode, "truncated": truncated } if stderr_text: response["stderr"] = stderr_text return response except Exception as exc: return { "success": False, "error": f"执行失败: {str(exc)}", "output": "", "return_code": -1 } async def run_python_code( self, code: str, timeout: int = None ) -> Dict: """ 执行Python代码 Args: code: Python代码 timeout: 超时时间(秒) Returns: 执行结果字典 """ timeout = timeout or CODE_EXECUTION_TIMEOUT # 强制重置工具容器,避免上一段代码仍在运行时输出混入 self._reset_toolbox() # 创建临时Python文件 temp_file = self.project_path / ".temp_code.py" try: # 写入代码 with open(temp_file, 'w', encoding='utf-8') as f: f.write(code) print(f"{OUTPUT_FORMATS['code']} 执行Python代码") try: relative_temp = temp_file.relative_to(self.project_path).as_posix() except ValueError: relative_temp = temp_file.as_posix() # 使用检测到的Python命令执行文件(相对路径可兼容容器挂载路径) result = await self.run_command( f'{self.python_cmd} "{relative_temp}"', timeout=timeout ) # 添加代码到结果 result["code"] = code return result finally: # 清理临时文件 if temp_file.exists(): temp_file.unlink() async def run_python_file( self, file_path: str, args: str = "", timeout: int = None ) -> Dict: """ 执行Python文件 Args: file_path: Python文件路径 args: 命令行参数 timeout: 超时时间(秒) Returns: 执行结果字典 """ # 构建完整路径 full_path = (self.project_path / file_path).resolve() # 验证文件存在 if not full_path.exists(): return { "success": False, "error": "文件不存在", "output": "", "return_code": -1 } # 验证是Python文件 if not full_path.suffix == '.py': return { "success": False, "error": "不是Python文件", "output": "", "return_code": -1 } # 验证文件在项目内 try: full_path.relative_to(self.project_path) except ValueError: return { "success": False, "error": "文件必须在项目文件夹内", "output": "", "return_code": -1 } print(f"{OUTPUT_FORMATS['code']} 执行Python文件: {file_path}") try: relative_path = full_path.relative_to(self.project_path).as_posix() except ValueError: relative_path = full_path.as_posix() # 使用检测到的Python命令构建命令 command = f'{self.python_cmd} "{relative_path}"' if args: command += f" {args}" # 执行命令 return await self.run_command(command, timeout=timeout) async def install_package(self, package: str) -> Dict: """ 安装Python包 Args: package: 包名 Returns: 安装结果 """ print(f"{OUTPUT_FORMATS['terminal']} 安装包: {package}") # 使用检测到的Python命令安装 command = f'{self.python_cmd} -m pip install {package}' result = await self.run_command(command, timeout=120) if result["success"]: print(f"{OUTPUT_FORMATS['success']} 包安装成功: {package}") else: print(f"{OUTPUT_FORMATS['error']} 包安装失败: {package}") return result async def check_environment(self) -> Dict: """检查Python环境""" print(f"{OUTPUT_FORMATS['info']} 检查Python环境...") env_info = { "python_command": self.python_cmd, "python_version": "", "pip_version": "", "installed_packages": [], "working_directory": str(self.project_path) } # 获取Python版本 version_result = await self.run_command( f'{self.python_cmd} --version', timeout=5 ) if version_result["success"]: env_info["python_version"] = version_result["output"].strip() # 获取pip版本 pip_result = await self.run_command( f'{self.python_cmd} -m pip --version', timeout=5 ) if pip_result["success"]: env_info["pip_version"] = pip_result["output"].strip() # 获取已安装的包 packages_result = await self.run_command( f'{self.python_cmd} -m pip list --format=json', timeout=10 ) if packages_result["success"]: try: import json packages = json.loads(packages_result["output"]) env_info["installed_packages"] = [ f"{p['name']}=={p['version']}" for p in packages ] except: pass return { "success": True, "environment": env_info } def kill_process(self): """终止当前运行的进程""" if self.process and self.process.returncode is None: self.process.kill() print(f"{OUTPUT_FORMATS['warning']} 进程已终止")