# modules/terminal_ops.py - Terminal operations module (with Python command auto-detection)
"""Run shell commands and Python code inside a project directory.

Commands are executed either on the host (through the system shell) or, when a
docker-mode container session is attached, through ``docker exec``.
"""

import asyncio
import json
import os
import re
import shutil
import signal
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING

try:
    from config import (
        CODE_EXECUTION_TIMEOUT,
        TERMINAL_COMMAND_TIMEOUT,
        FORBIDDEN_COMMANDS,
        OUTPUT_FORMATS,
        MAX_RUN_COMMAND_CHARS,
        TOOLBOX_TERMINAL_IDLE_SECONDS,
    )
except ImportError:
    # Imported from outside the project root: add the root to sys.path and retry.
    project_root = Path(__file__).resolve().parents[1]
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))
    from config import (
        CODE_EXECUTION_TIMEOUT,
        TERMINAL_COMMAND_TIMEOUT,
        FORBIDDEN_COMMANDS,
        OUTPUT_FORMATS,
        MAX_RUN_COMMAND_CHARS,
        TOOLBOX_TERMINAL_IDLE_SECONDS,
    )

from modules.toolbox_container import ToolboxContainer

if TYPE_CHECKING:
    from modules.user_container_manager import ContainerHandle

# Seconds a timed-out command gets to react to SIGINT before it is force-killed.
_INTERRUPT_GRACE_SECONDS = 2
# Seconds the output readers get to reach EOF once the command itself has exited.
_STREAM_DRAIN_GRACE_SECONDS = 2
# Size of each read from the child's stdout/stderr pipes.
_READ_CHUNK_BYTES = 65536

# Match ``python3`` / ``python`` only as a stand-alone command word: not part of
# a path (``/usr/bin/python3``), a versioned name (``python3.11``), a package
# name (``python-dateutil``) or a longer identifier (``my_python_tool``).
_PYTHON3_TOKEN = re.compile(r"(?<![\w./\\-])python3(?![\w.-])")
_PYTHON_TOKEN = re.compile(r"(?<![\w./\\-])python(?![\w.-])")


class TerminalOperator:
    """Execute terminal commands and Python code scoped to one project folder.

    Attributes:
        project_path: Resolved absolute path of the project directory.
        process: The asyncio subprocess currently being awaited, or ``None``.
        python_cmd: Python launcher detected on the host at construction time.
        container_session: Optional container handle; when its ``mode`` is
            ``"docker"`` commands run through ``docker exec``.
    """

    def __init__(self, project_path: str, container_session: Optional["ContainerHandle"] = None):
        """Resolve the project path and detect the host's Python command.

        Args:
            project_path: Directory that all commands are confined to.
            container_session: Optional container handle to execute in.
        """
        self.project_path = Path(project_path).resolve()
        self.process = None
        # Auto-detect which Python launcher is usable on this machine.
        self.python_cmd = self._detect_python_command()
        print(f"{OUTPUT_FORMATS['info']} 检测到Python命令: {self.python_cmd}")
        self._toolbox: Optional[ToolboxContainer] = None
        self.container_session: Optional["ContainerHandle"] = container_session

    def _reset_toolbox(self):
        """Shut down and drop the toolbox terminal so each run starts clean."""
        if self._toolbox:
            try:
                self._toolbox.shutdown()
            except Exception:
                # Deliberately best-effort: a failing shutdown must not block
                # the command that is about to run.
                pass
        self._toolbox = None

    def _detect_python_command(self) -> str:
        """Return the first working Python 3 launcher found on PATH.

        Returns:
            One of ``python``, ``python3`` or ``py``. When none can be
            verified, the platform default is returned (``python`` on
            Windows, ``python3`` elsewhere).
        """
        if sys.platform == "win32":
            commands_to_try = ["python", "py", "python3"]
        else:
            commands_to_try = ["python3", "python"]

        for cmd in commands_to_try:
            if not shutil.which(cmd):
                continue
            try:
                # Being on PATH is not enough; make sure it actually runs.
                result = subprocess.run(
                    [cmd, "--version"],
                    capture_output=True,
                    text=True,
                    timeout=5,
                )
            except (OSError, subprocess.SubprocessError, ValueError):
                continue
            if result.returncode != 0:
                continue
            # Older interpreters print the version on stderr, so check both.
            output = result.stdout + result.stderr
            if "Python 3" in output or "Python 2" not in output:
                return cmd

        return "python" if sys.platform == "win32" else "python3"

    def _get_toolbox(self) -> ToolboxContainer:
        """Return the toolbox container, creating it on first use."""
        if self._toolbox is None:
            self._toolbox = ToolboxContainer(
                project_path=str(self.project_path),
                idle_timeout=TOOLBOX_TERMINAL_IDLE_SECONDS,
                container_session=self.container_session,
            )
        return self._toolbox

    def set_container_session(self, session: Optional["ContainerHandle"]):
        """Attach a new container session and forward it to the toolbox."""
        if session is self.container_session:
            return
        self.container_session = session
        if self._toolbox:
            self._toolbox.set_container_session(session)

    def _validate_command(self, command: str) -> Tuple[bool, str]:
        """Check a command against the forbidden list and dangerous patterns.

        Matching is a case-insensitive substring test.

        Returns:
            ``(True, "")`` when the command is allowed, otherwise
            ``(False, reason)``.
        """
        lowered = command.lower()

        for forbidden in FORBIDDEN_COMMANDS:
            if forbidden in lowered:
                return False, f"禁止执行的命令: {forbidden}"

        dangerous_patterns = [
            "sudo",
            "chmod 777",
            "rm -rf",
            "> /dev/",
            "fork bomb",
        ]
        for pattern in dangerous_patterns:
            if pattern in lowered:
                return False, f"检测到危险命令模式: {pattern}"

        return True, ""

    @staticmethod
    def _clamp_timeout(requested: Optional[int], default: int, max_limit: int) -> int:
        """Return ``default`` for a missing/non-positive timeout, else cap it at ``max_limit``."""
        if not requested or requested <= 0:
            return default
        return min(int(requested), max_limit)

    @staticmethod
    def _error_result(error: str, output: str = "") -> Dict:
        """Build the failure dictionary returned when a command is never run."""
        return {
            "success": False,
            "error": error,
            "status": "error",
            "output": output,
            "return_code": -1,
        }

    @staticmethod
    def _rewrite_python_command(command: str, python_cmd: str) -> str:
        """Swap stand-alone ``python``/``python3`` words for ``python_cmd``.

        When the detected launcher is not ``python3``, ``python3`` words are
        rewritten to it; when it is ``python3``, bare ``python`` words are
        upgraded. Paths, versioned names and package names are left alone.
        """
        pattern = _PYTHON3_TOKEN if python_cmd != "python3" else _PYTHON_TOKEN
        # A callable replacement keeps python_cmd literal (no escape handling).
        return pattern.sub(lambda _match: python_cmd, command)

    async def run_command(
        self,
        command: str,
        working_dir: Optional[str] = None,
        timeout: Optional[int] = None,
    ) -> Dict:
        """Run a terminal command inside the project directory.

        Args:
            command: Command line to execute.
            working_dir: Directory relative to the project root; must stay
                inside the project.
            timeout: Required, positive number of seconds; capped at
                ``TERMINAL_COMMAND_TIMEOUT``.

        Returns:
            Result dictionary with at least ``success``, ``status``,
            ``output`` and ``return_code`` (``-1`` when nothing was run).
        """
        if timeout is None or timeout <= 0:
            return self._error_result("timeout 参数必填且需大于0", output="timeout 参数缺失")

        # The command runs in a one-shot subprocess, but the toolbox is still
        # reset first so any earlier toolbox terminal is shut down.
        self._reset_toolbox()

        # NOTE(review): python_cmd is detected on the host; in docker mode the
        # container may provide a different launcher - confirm.
        command = self._rewrite_python_command(command, self.python_cmd)

        valid, error = self._validate_command(command)
        if not valid:
            return self._error_result(error)

        try:
            work_path = self._resolve_work_path(working_dir)
        except ValueError:
            return self._error_result("工作目录必须在项目文件夹内")

        timeout = self._clamp_timeout(timeout, default=10, max_limit=TERMINAL_COMMAND_TIMEOUT)

        print(f"{OUTPUT_FORMATS['terminal']} 执行命令: {command}")
        print(f"{OUTPUT_FORMATS['info']} 工作目录: {work_path}")

        start_ts = time.monotonic()
        result_payload = await self._run_command_subprocess(command, work_path, timeout)

        # Size guard. A falsy limit means "no limit", matching the truncation
        # rule in _run_command_subprocess.
        if MAX_RUN_COMMAND_CHARS and result_payload.get("success") and "output" in result_payload:
            char_count = len(result_payload["output"])
            if char_count > MAX_RUN_COMMAND_CHARS:
                return {
                    "success": False,
                    "status": "error",
                    "error": f"结果内容过大,有{char_count}字符,请使用限制字符数的获取内容方式,根据程度选择10k以内的数",
                    "char_count": char_count,
                    "limit": MAX_RUN_COMMAND_CHARS,
                    "command": command,
                }

        result_payload.setdefault("status", "completed" if result_payload.get("success") else "error")
        result_payload["timeout"] = timeout
        result_payload["elapsed_ms"] = int((time.monotonic() - start_ts) * 1000)
        return result_payload

    def _resolve_work_path(self, working_dir: Optional[str]) -> Path:
        """Resolve ``working_dir`` against the project root.

        Raises:
            ValueError: If the resolved path is outside the project.
        """
        if not working_dir:
            return self.project_path
        work_path = (self.project_path / working_dir).resolve()
        # relative_to raises ValueError when work_path escapes the project.
        work_path.relative_to(self.project_path)
        return work_path

    def _format_toolbox_output(self, payload: Dict) -> Dict:
        """Normalise a toolbox payload into the standard result dictionary.

        Known interactive-shell warnings from bash are removed from the output.
        """
        success = bool(payload.get("success"))
        output_text = payload.get("output", "") or ""

        noisy_lines = (
            "bash: cannot set terminal process group",
            "bash: no job control in this shell",
        )
        output_text = "\n".join(
            line
            for line in output_text.splitlines()
            if not any(noise in line for noise in noisy_lines)
        )

        result = {
            "success": success,
            "output": output_text,
            "return_code": 0 if success else -1,
        }
        if not success:
            result["error"] = payload.get("error") or payload.get("message", "命令执行失败")
        for key in ("message", "status", "truncated"):
            if key in payload:
                result[key] = payload[key]
        return result

    def _build_docker_exec(self, command: str, work_path: Path) -> Optional[List[str]]:
        """Return the ``docker exec`` argv for docker-mode sessions, else ``None``."""
        session = self.container_session
        if not session or getattr(session, "mode", None) != "docker":
            return None

        container_name = getattr(session, "container_name", None)
        mount_path = getattr(session, "mount_path", "/workspace") or "/workspace"
        docker_bin = shutil.which("docker") or "docker"

        try:
            relative = work_path.relative_to(self.project_path).as_posix()
        except ValueError:
            relative = ""
        container_workdir = mount_path.rstrip("/")
        # as_posix() of an empty relative path is ".", which is truthy; this
        # mirrors the original path construction exactly.
        if relative:
            container_workdir = f"{container_workdir}/{relative}"

        return [
            docker_bin,
            "exec",
            "-w",
            container_workdir,
            container_name,
            "/bin/bash",
            "-lc",
            command,
        ]

    @staticmethod
    async def _collect_stream(stream, collector: List[bytes]) -> None:
        """Append everything read from ``stream`` to ``collector`` until EOF.

        Reads fixed-size chunks rather than lines, so a single very long line
        cannot exceed the StreamReader line limit and abort the capture.
        """
        while True:
            chunk = await stream.read(_READ_CHUNK_BYTES)
            if not chunk:
                return
            collector.append(chunk)

    @staticmethod
    def _force_kill(process) -> None:
        """Kill the process group if possible, otherwise just the process."""
        try:
            # The child was started with start_new_session=True, so its pid
            # is also its process-group id.
            os.killpg(process.pid, signal.SIGKILL)
            return
        except (AttributeError, OSError):
            # killpg/SIGKILL are unavailable on Windows, or the group is gone.
            pass
        try:
            process.kill()
        except ProcessLookupError:
            pass  # already exited

    async def _stop_process(self, process) -> None:
        """Interrupt a timed-out process, escalating to a kill after a grace period."""
        try:
            os.killpg(process.pid, signal.SIGINT)
        except (AttributeError, OSError):
            pass
        try:
            await asyncio.wait_for(process.wait(), timeout=_INTERRUPT_GRACE_SECONDS)
        except asyncio.TimeoutError:
            self._force_kill(process)
            await process.wait()

    async def _run_command_subprocess(self, command: str, work_path: Path, timeout: int) -> Dict:
        """Run ``command`` in a one-shot subprocess and collect its output.

        Args:
            command: Command line (already validated by the caller).
            work_path: Host directory to run in; mapped under the container
                mount path in docker mode.
            timeout: Seconds to wait before the process is interrupted.

        Returns:
            Result dictionary; ``status`` is ``completed``, ``timeout`` or
            ``error``. Exceptions are reported as an ``error`` result.
        """
        start_ts = time.monotonic()
        process = None
        reader_tasks: List["asyncio.Task"] = []
        try:
            exec_cmd = self._build_docker_exec(command, work_path)

            # Shared environment; make sure Python output is unbuffered.
            env = os.environ.copy()
            env.setdefault("PYTHONUNBUFFERED", "1")

            if exec_cmd is None:
                process = await asyncio.create_subprocess_shell(
                    command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=str(work_path),
                    env=env,
                    start_new_session=True,
                )
            else:
                process = await asyncio.create_subprocess_exec(
                    *exec_cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    start_new_session=True,
                )
            # Expose the running process so kill_process() can reach it.
            self.process = process

            stdout_buf: List[bytes] = []
            stderr_buf: List[bytes] = []
            reader_tasks = [
                asyncio.create_task(self._collect_stream(process.stdout, stdout_buf)),
                asyncio.create_task(self._collect_stream(process.stderr, stderr_buf)),
            ]

            timed_out = False
            try:
                await asyncio.wait_for(process.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                timed_out = True
                await self._stop_process(process)

            # Give the readers a bounded time to reach EOF. A background child
            # that inherited the pipes could otherwise keep them open long
            # after the command itself exited, defeating the timeout.
            _, pending = await asyncio.wait(reader_tasks, timeout=_STREAM_DRAIN_GRACE_SECONDS)
            for task in pending:
                task.cancel()
            # Retrieve results/exceptions so nothing is reported as unhandled;
            # whatever was collected so far is kept.
            await asyncio.gather(*reader_tasks, return_exceptions=True)

            stdout = b"".join(stdout_buf)
            stderr = b"".join(stderr_buf)
            stdout_text = stdout.decode('utf-8', errors='replace') if stdout else ""
            stderr_text = stderr.decode('utf-8', errors='replace') if stderr else ""

            success = (process.returncode == 0) and not timed_out
            status = "completed" if success else ("timeout" if timed_out else "error")

            combined_output = "\n".join(part for part in (stdout_text, stderr_text) if part)

            truncated = False
            if MAX_RUN_COMMAND_CHARS and len(combined_output) > MAX_RUN_COMMAND_CHARS:
                # Keep the tail of the output.
                truncated = True
                combined_output = combined_output[-MAX_RUN_COMMAND_CHARS:]

            response = {
                "success": success,
                "status": status,
                "command": command,
                "output": combined_output,
                "return_code": process.returncode,
                "truncated": truncated,
                "timeout": timeout,
                "elapsed_ms": int((time.monotonic() - start_ts) * 1000),
            }
            if not success and timed_out:
                response["message"] = f"命令执行超时 ({timeout}秒)"
            elif not success and process.returncode is not None:
                response["message"] = f"命令执行失败 (返回码: {process.returncode})"
            if stderr_text:
                response["stderr"] = stderr_text
            return response
        except Exception as exc:
            return {
                "success": False,
                "status": "error",
                "error": f"执行失败: {str(exc)}",
                "output": "",
                "return_code": -1,
                "timeout": timeout,
                "elapsed_ms": int((time.monotonic() - start_ts) * 1000),
            }
        finally:
            # On error or cancellation, do not leave readers or the child behind.
            for task in reader_tasks:
                if not task.done():
                    task.cancel()
            if process is not None and process.returncode is None:
                self._force_kill(process)
            if self.process is process:
                self.process = None

    async def run_python_code(
        self,
        code: str,
        timeout: Optional[int] = None,
    ) -> Dict:
        """Write ``code`` to a temporary file in the project and execute it.

        Args:
            code: Python source to run.
            timeout: Required, positive number of seconds; capped at
                ``CODE_EXECUTION_TIMEOUT``.

        Returns:
            The ``run_command`` result with the source added under ``code``.
        """
        if timeout is None or timeout <= 0:
            return self._error_result("timeout 参数必填且需大于0", output="timeout 参数缺失")

        # NOTE(review): run_command clamps again with TERMINAL_COMMAND_TIMEOUT,
        # so the effective cap is the smaller of the two limits - confirm intent.
        timeout = self._clamp_timeout(timeout, default=timeout, max_limit=CODE_EXECUTION_TIMEOUT)

        # Reset the toolbox so output of an earlier run cannot leak into this one.
        self._reset_toolbox()

        # NOTE(review): fixed file name; overlapping calls on one project would
        # overwrite each other's code - confirm calls are serialised.
        temp_file = self.project_path / ".temp_code.py"

        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                f.write(code)

            print(f"{OUTPUT_FORMATS['code']} 执行Python代码")

            try:
                relative_temp = temp_file.relative_to(self.project_path).as_posix()
            except ValueError:
                relative_temp = temp_file.as_posix()

            # A relative path also works under the container mount path.
            result = await self.run_command(
                f'{self.python_cmd} -u "{relative_temp}"',
                timeout=timeout,
            )

            result["code"] = code
            return result
        finally:
            try:
                temp_file.unlink()
            except FileNotFoundError:
                pass

    async def run_python_file(
        self,
        file_path: str,
        args: str = "",
        timeout: Optional[int] = None,
    ) -> Dict:
        """Execute a ``.py`` file located inside the project.

        Args:
            file_path: Path relative to the project root.
            args: Command-line arguments appended verbatim to the command.
            timeout: Seconds, forwarded to ``run_command`` (which requires it).

        Returns:
            The ``run_command`` result, or an error result when the path is
            outside the project, missing, or not a ``.py`` file.
        """
        full_path = (self.project_path / file_path).resolve()

        # Check containment first so that nothing is revealed about files
        # outside the project.
        try:
            relative_path = full_path.relative_to(self.project_path).as_posix()
        except ValueError:
            return self._error_result("文件必须在项目文件夹内")

        if not full_path.exists():
            return self._error_result("文件不存在")

        if full_path.suffix != '.py':
            return self._error_result("不是Python文件")

        print(f"{OUTPUT_FORMATS['code']} 执行Python文件: {file_path}")

        command = f'{self.python_cmd} "{relative_path}"'
        if args:
            command += f" {args}"

        return await self.run_command(command, timeout=timeout)

    async def install_package(self, package: str) -> Dict:
        """Install a Python package with pip.

        Args:
            package: Package specifier passed to ``pip install``.

        Returns:
            The ``run_command`` result.
        """
        print(f"{OUTPUT_FORMATS['terminal']} 安装包: {package}")

        command = f'{self.python_cmd} -m pip install {package}'

        # NOTE(review): run_command caps this at TERMINAL_COMMAND_TIMEOUT, so
        # the 120s requested here may not be what is actually applied.
        result = await self.run_command(command, timeout=120)

        if result["success"]:
            print(f"{OUTPUT_FORMATS['success']} 包安装成功: {package}")
        else:
            print(f"{OUTPUT_FORMATS['error']} 包安装失败: {package}")

        return result

    @staticmethod
    def _stdout_only(result: Dict) -> str:
        """Return a result's output with the trailing stderr text removed.

        ``output`` is stdout followed by stderr; strict parsers such as JSON
        need the stdout part alone. Falls back to the full output when
        stripping would leave nothing.
        """
        output = result.get("output", "") or ""
        stderr_text = result.get("stderr") or ""
        if stderr_text and output.endswith(stderr_text):
            stdout_text = output[: -len(stderr_text)]
            if stdout_text.strip():
                return stdout_text
        return output

    async def check_environment(self) -> Dict:
        """Collect the Python version, pip version and installed packages.

        Returns:
            ``{"success": True, "environment": {...}}``; fields that could not
            be determined keep their empty defaults.
        """
        print(f"{OUTPUT_FORMATS['info']} 检查Python环境...")

        env_info = {
            "python_command": self.python_cmd,
            "python_version": "",
            "pip_version": "",
            "installed_packages": [],
            "working_directory": str(self.project_path),
        }

        version_result = await self.run_command(
            f'{self.python_cmd} --version',
            timeout=5,
        )
        if version_result["success"]:
            env_info["python_version"] = version_result["output"].strip()

        pip_result = await self.run_command(
            f'{self.python_cmd} -m pip --version',
            timeout=5,
        )
        if pip_result["success"]:
            env_info["pip_version"] = pip_result["output"].strip()

        packages_result = await self.run_command(
            f'{self.python_cmd} -m pip list --format=json',
            timeout=10,
        )
        if packages_result["success"]:
            try:
                # pip may print notices on stderr; parse stdout alone.
                packages = json.loads(self._stdout_only(packages_result))
                env_info["installed_packages"] = [
                    f"{p['name']}=={p['version']}" for p in packages
                ]
            except (ValueError, KeyError, TypeError):
                # Unparseable listing: leave the package list empty.
                pass

        return {
            "success": True,
            "environment": env_info,
        }

    def kill_process(self):
        """Kill the command that is currently running, if any."""
        if self.process and self.process.returncode is None:
            self._force_kill(self.process)
            print(f"{OUTPUT_FORMATS['warning']} 进程已终止")