# modules/persistent_terminal.py - Persistent terminal instance (fixed version)

import asyncio
import subprocess
import os
import sys
import time
import signal
from pathlib import Path
from typing import Optional, Callable, Dict, List, Tuple
from datetime import datetime
import threading
import queue
from collections import deque
import shutil
import uuid

try:
    from config import (
        OUTPUT_FORMATS,
        TERMINAL_OUTPUT_WAIT,
        TERMINAL_INPUT_MAX_CHARS,
        TERMINAL_SANDBOX_MODE,
        TERMINAL_SANDBOX_IMAGE,
        TERMINAL_SANDBOX_MOUNT_PATH,
        TERMINAL_SANDBOX_SHELL,
        TERMINAL_SANDBOX_NETWORK,
        TERMINAL_SANDBOX_CPUS,
        TERMINAL_SANDBOX_MEMORY,
        TERMINAL_SANDBOX_BINDS,
        TERMINAL_SANDBOX_BIN,
        TERMINAL_SANDBOX_NAME_PREFIX,
        TERMINAL_SANDBOX_ENV,
        TERMINAL_SANDBOX_REQUIRE,
    )
except ImportError:
    # Fallback for running the module directly: put the project root on
    # sys.path and retry the import.
    import sys
    from pathlib import Path
    project_root = Path(__file__).resolve().parents[1]
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))
    from config import (
        OUTPUT_FORMATS,
        TERMINAL_OUTPUT_WAIT,
        TERMINAL_INPUT_MAX_CHARS,
        TERMINAL_SANDBOX_MODE,
        TERMINAL_SANDBOX_IMAGE,
        TERMINAL_SANDBOX_MOUNT_PATH,
        TERMINAL_SANDBOX_SHELL,
        TERMINAL_SANDBOX_NETWORK,
        TERMINAL_SANDBOX_CPUS,
        TERMINAL_SANDBOX_MEMORY,
        TERMINAL_SANDBOX_BINDS,
        TERMINAL_SANDBOX_BIN,
        TERMINAL_SANDBOX_NAME_PREFIX,
        TERMINAL_SANDBOX_ENV,
        TERMINAL_SANDBOX_REQUIRE,
    )


class PersistentTerminal:
    """A single long-lived terminal session backed by a child process.

    The child is either a shell on the host or a shell inside a Docker
    container. Its combined stdout/stderr is read line by line on a
    background thread, buffered, and optionally forwarded to a broadcast
    callback.
    """

    def __init__(
        self,
        session_name: str,
        working_dir: str = None,
        shell_command: str = None,
        broadcast_callback: Callable = None,
        max_buffer_size: int = 20000,
        display_size: int = 5000,
        project_path: Optional[str] = None,
        sandbox_mode: Optional[str] = None,
        sandbox_options: Optional[Dict] = None,
    ):
        """Initialise the terminal state; no process is started here.

        Args:
            session_name: Name of the session.
            working_dir: Working directory (defaults to the current directory).
            shell_command: Shell to launch on the host (None = auto-select).
            broadcast_callback: Called as ``callback(event_name, payload)``
                for terminal events (used for WebSocket forwarding).
            max_buffer_size: Maximum number of characters kept in the
                output buffer before old lines are dropped.
            display_size: Character limit applied by ``get_display_output``.
            project_path: Project root used to derive the in-container
                working directory (defaults to ``working_dir``).
            sandbox_mode: ``"docker"`` to run in a container; otherwise the
                configured default or ``"host"``.
            sandbox_options: Overrides merged over the configured sandbox
                defaults.
        """
        self.session_name = session_name
        self.working_dir = Path(working_dir).resolve() if working_dir else Path.cwd()
        self.project_path = Path(project_path).resolve() if project_path else self.working_dir
        self.host_shell_command = shell_command
        self.shell_command = shell_command
        self.broadcast = broadcast_callback
        self.max_buffer_size = max_buffer_size
        self.display_size = display_size

        # Process state
        self.process = None
        self.is_running = False
        self.start_time = None

        # Output buffering
        self.output_buffer = []
        self.command_history = []
        self.total_output_size = 0
        self.truncated_lines = 0
        self.output_history = deque()
        self._output_event_counter = 0
        self.last_output_time = None
        self.last_input_time = None
        self.last_input_text = ""
        self.echo_loop_detected = False
        self._consecutive_echo_matches = 0
        self.io_history = deque()
        self._io_history_max = 4000

        # Reader thread and queue
        self.output_queue = queue.Queue()
        self.reader_thread = None
        self.is_reading = False

        # Status flags
        self.is_interactive = False  # whether the terminal appears to wait for input
        self.last_command = ""
        self.last_activity = time.time()

        # Platform-specific settings
        self.is_windows = sys.platform == "win32"

        sandbox_defaults = {
            "image": TERMINAL_SANDBOX_IMAGE,
            "mount_path": TERMINAL_SANDBOX_MOUNT_PATH,
            "shell": TERMINAL_SANDBOX_SHELL,
            "network": TERMINAL_SANDBOX_NETWORK,
            "cpus": TERMINAL_SANDBOX_CPUS,
            "memory": TERMINAL_SANDBOX_MEMORY,
            "binds": list(TERMINAL_SANDBOX_BINDS),
            "bin": TERMINAL_SANDBOX_BIN,
            "name_prefix": TERMINAL_SANDBOX_NAME_PREFIX,
            "env": dict(TERMINAL_SANDBOX_ENV),
            "require": TERMINAL_SANDBOX_REQUIRE,
        }
        if sandbox_options:
            for key, value in sandbox_options.items():
                # Copy mutable overrides so the caller's objects are not shared.
                if key == "binds" and isinstance(value, list):
                    sandbox_defaults[key] = list(value)
                elif key == "env" and isinstance(value, dict):
                    sandbox_defaults[key] = dict(value)
                else:
                    sandbox_defaults[key] = value

        self.sandbox_mode = (sandbox_mode or TERMINAL_SANDBOX_MODE or "host").lower()
        self.sandbox_options = sandbox_defaults
        self.sandbox_required = bool(self.sandbox_options.get("require"))
        self.sandbox_container_name = None
        self.execution_mode = "host"
        self.using_container = False
        self._sandbox_bin_path = None
        self._owns_container = False

    def start(self) -> bool:
        """Start the terminal process (container sandbox supported).

        Returns:
            True when the session was started; False if it is already
            running or the start failed.
        """
        if self.is_running:
            return False

        try:
            process = None
            if self.sandbox_mode == "docker":
                try:
                    process = self._start_docker_terminal()
                except Exception as exc:
                    message = f"容器终端启动失败: {exc}"
                    if self.sandbox_required:
                        print(f"{OUTPUT_FORMATS['error']} {message}")
                        return False
                    print(f"{OUTPUT_FORMATS['warning']} {message},回退到宿主机终端。")
                    process = None

            # Either host mode was requested or the container start fell through.
            if process is None:
                process = self._start_host_terminal()

            if not process:
                return False

            self.process = process
            self.is_running = True
            self.execution_mode = "docker" if self.using_container else "host"
            self.start_time = datetime.now()
            self.last_output_time = None
            self.last_input_time = None
            self.last_input_text = ""
            self.echo_loop_detected = False
            self._consecutive_echo_matches = 0

            # Start the output reader thread
            self.is_reading = True
            self.reader_thread = threading.Thread(target=self._read_output)
            self.reader_thread.daemon = True
            self.reader_thread.start()

            # Host Windows initialisation: switch the code page to UTF-8,
            # clear the screen, then discard what those commands printed.
            if self.is_windows and not self.using_container:
                time.sleep(0.5)
                self.send_command("chcp 65001", timeout=1)
                time.sleep(0.5)
                self.send_command("cls", timeout=1)
                time.sleep(0.3)
                self.output_buffer.clear()
                self.total_output_size = 0

            # Broadcast the terminal-started event
            if self.broadcast:
                self.broadcast('terminal_started', {
                    'session': self.session_name,
                    'working_dir': str(self.working_dir),
                    'shell': self.shell_command,
                    'mode': self.execution_mode,
                    'time': self.start_time.isoformat()
                })

            mode_label = "容器" if self.using_container else "宿主机"
            print(f"{OUTPUT_FORMATS['success']} 终端会话启动({mode_label}): {self.session_name}")
            return True

        except Exception as e:
            print(f"{OUTPUT_FORMATS['error']} 终端启动失败: {e}")
            self.is_running = False
            # Do not leave a half-started child or reader loop behind.
            self.is_reading = False
            if self.process is not None and self.process.poll() is None:
                try:
                    self.process.kill()
                except OSError:
                    pass
            if self.using_container:
                self._stop_sandbox_container(force=True)
            return False

    def _start_host_terminal(self):
        """Launch a shell on the host.

        Returns:
            The ``subprocess.Popen`` object, or None when the shell
            executable cannot be found.
        """
        self.using_container = False
        self._owns_container = False
        self.is_windows = sys.platform == "win32"

        shell_cmd = self.host_shell_command
        if self.is_windows:
            shell_cmd = shell_cmd or "cmd.exe"
        else:
            shell_cmd = shell_cmd or os.environ.get('SHELL', '/bin/bash')
        self.shell_command = shell_cmd

        env = os.environ.copy()
        env['PYTHONIOENCODING'] = 'utf-8'
        if self.is_windows:
            env['CHCP'] = '65001'
        else:
            env['TERM'] = 'xterm-256color'
            env['LANG'] = 'en_US.UTF-8'
            env['LC_ALL'] = 'en_US.UTF-8'

        try:
            # Unbuffered byte pipes; stderr is merged into stdout.
            return subprocess.Popen(
                shell_cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                cwd=str(self.working_dir),
                shell=False,
                bufsize=0,
                env=env
            )
        except FileNotFoundError:
            print(f"{OUTPUT_FORMATS['error']} 无法找到终端程序: {shell_cmd}")
            return None

    def _start_docker_terminal(self):
        """Start a new container terminal or attach to an existing container.

        Returns:
            The ``subprocess.Popen`` object, or None when the container
            runtime is missing and the sandbox is not required.

        Raises:
            RuntimeError: The runtime is missing while the sandbox is
                required, or the container cannot be used.
        """
        docker_bin = self.sandbox_options.get("bin") or "docker"
        docker_path = shutil.which(docker_bin)
        if not docker_path:
            message = f"未找到容器运行时: {docker_bin}"
            if self.sandbox_required:
                raise RuntimeError(message)
            print(f"{OUTPUT_FORMATS['warning']} {message}")
            return None

        self._sandbox_bin_path = docker_path
        target_container = self.sandbox_options.get("container_name")
        if target_container:
            return self._start_existing_container_terminal(docker_path, target_container)
        return self._start_new_container_terminal(docker_path)

    def _start_existing_container_terminal(self, docker_path: str, container_name: str):
        """Attach to an already running container via ``docker exec``.

        Raises:
            RuntimeError: The target container is not running.
        """
        if not self._ensure_container_alive(docker_path, container_name):
            raise RuntimeError(f"目标容器未运行: {container_name}")

        mount_path = self.sandbox_options.get("mount_path") or "/workspace"
        container_workdir = self._resolve_container_workdir(mount_path)
        shell_path = self.sandbox_options.get("shell") or "/bin/bash"

        cmd = [
            docker_path,
            "exec",
            "-i",
        ]
        if container_workdir:
            cmd += ["-w", container_workdir]

        envs = {
            "PYTHONIOENCODING": "utf-8",
            "TERM": "xterm-256color",
        }
        for key, value in (self.sandbox_options.get("env") or {}).items():
            if value is not None:
                envs[key] = value
        for key, value in envs.items():
            cmd += ["-e", f"{key}={value}"]

        cmd.append(container_name)
        cmd.append(shell_path)
        if shell_path.endswith("sh"):
            cmd.append("-i")

        env = os.environ.copy()
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=0,
            env=env
        )

        self.sandbox_container_name = container_name
        self.shell_command = f"{shell_path} (attach:{container_name})"
        self.using_container = True
        self.is_windows = False
        # The container was not created here, so it must not be killed on close.
        self._owns_container = False
        return process

    def _start_new_container_terminal(self, docker_path: str):
        """Start a fresh container terminal with ``docker run --rm -i``.

        Returns:
            The ``subprocess.Popen`` object, or None when the runtime
            cannot be executed and the sandbox is not required.

        Raises:
            RuntimeError: No image is configured, or the runtime cannot be
                executed while the sandbox is required.
        """
        image = self.sandbox_options.get("image")
        if not image:
            raise RuntimeError("TERMINAL_SANDBOX_IMAGE 未配置")

        mount_path = self.sandbox_options.get("mount_path") or "/workspace"
        working_dir = str(self.working_dir)
        if not self.working_dir.exists():
            self.working_dir.mkdir(parents=True, exist_ok=True)

        container_name = f"{self.sandbox_options.get('name_prefix', 'agent-term')}-{uuid.uuid4().hex[:10]}"
        cmd = [
            docker_path,
            "run",
            "--rm",
            "-i",
            "--name",
            container_name,
            "-w",
            mount_path,
            "-v",
            f"{working_dir}:{mount_path}",
        ]

        network = self.sandbox_options.get("network")
        if network:
            cmd += ["--network", network]
        # Popen only accepts str/bytes/path arguments, so numeric limits
        # coming from configuration are converted explicitly.
        cpus = self.sandbox_options.get("cpus")
        if cpus:
            cmd += ["--cpus", str(cpus)]
        memory = self.sandbox_options.get("memory")
        if memory:
            cmd += ["--memory", str(memory)]

        for bind in self.sandbox_options.get("binds") or []:
            bind = bind.strip()
            if bind:
                cmd += ["-v", bind]

        envs = {
            "PYTHONIOENCODING": "utf-8",
            "TERM": "xterm-256color",
        }
        for key, value in (self.sandbox_options.get("env") or {}).items():
            if value is not None:
                envs[key] = value
        for key, value in envs.items():
            cmd += ["-e", f"{key}={value}"]

        cmd.append(image)
        shell_path = self.sandbox_options.get("shell") or "/bin/bash"
        if shell_path:
            cmd.append(shell_path)
            if shell_path.endswith("sh"):
                cmd.append("-i")

        env = os.environ.copy()
        try:
            process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                bufsize=0,
                env=env
            )
        except FileNotFoundError:
            message = f"无法执行容器运行时: {docker_path}"
            if self.sandbox_required:
                raise RuntimeError(message)
            print(f"{OUTPUT_FORMATS['warning']} {message}")
            return None

        self.sandbox_container_name = container_name
        self.shell_command = f"{shell_path} (sandbox:{image})"
        self.using_container = True
        self.is_windows = False
        # Created here, so close() is responsible for killing it.
        self._owns_container = True
        return process

    def _resolve_container_workdir(self, mount_path: str) -> str:
        """Derive the working directory path inside the container.

        Maps ``working_dir`` relative to ``project_path`` onto
        ``mount_path``; falls back to ``mount_path`` when the working
        directory is not inside the project path.
        """
        mount_path = (mount_path or "/workspace").rstrip("/") or "/workspace"
        try:
            relative = self.working_dir.relative_to(self.project_path)
            if str(relative) == ".":
                return mount_path
            return f"{mount_path}/{relative.as_posix()}"
        except Exception:
            return mount_path

    def _ensure_container_alive(self, docker_path: str, container_name: str) -> bool:
        """Return True when ``docker inspect`` reports the container as running."""
        try:
            result = subprocess.run(
                [
                    docker_path,
                    "inspect",
                    "-f",
                    "{{.State.Running}}",
                    container_name,
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                text=True,
                timeout=3,
                check=False,
            )
        except (OSError, subprocess.SubprocessError):
            return False
        return result.returncode == 0 and result.stdout.strip().lower() == "true"

    def _read_output(self):
        """Reader-thread body: read output lines until stopped or the process ends."""
        while self.is_reading and self.process:
            try:
                # Always bytes, because the pipes were not opened with text=True.
                line_bytes = self.process.stdout.readline()
                if line_bytes:
                    line = self._decode_output(line_bytes)
                    self.output_queue.put(line)
                    self._process_output(line)
                elif self.process.poll() is not None:
                    # The process has exited.
                    self.is_running = False
                    break
                else:
                    # Nothing to read yet; sleep briefly.
                    time.sleep(0.01)
            except Exception as e:
                # A single failure must not stop the reader.
                print(f"[Terminal] 读取输出警告: {e}")
                time.sleep(0.01)
                continue

    def _decode_output(self, data):
        """Decode process output to ``str`` without raising.

        ``str`` input is returned unchanged, ``bytes`` are decoded with the
        first encoding that succeeds, and anything else goes through ``str()``.
        """
        if isinstance(data, str):
            return data

        if isinstance(data, bytes):
            # Windows consoles commonly emit GBK-family encodings.
            if self.is_windows:
                encodings = ['utf-8', 'gbk', 'gb2312', 'cp936', 'latin-1']
            else:
                encodings = ['utf-8', 'latin-1']

            for encoding in encodings:
                try:
                    return data.decode(encoding)
                except (UnicodeDecodeError, AttributeError):
                    continue

            # Defensive fallback: replace undecodable bytes.
            return data.decode('utf-8', errors='replace')

        return str(data)

    def _process_output(self, output: str):
        """Record one chunk of output and update the derived state.

        Lines containing known bash job-control noise are dropped. The rest
        is appended to the buffers, checked against the last input for echo
        detection, scanned for interactive prompts and broadcast.
        """
        now = time.time()
        noisy_markers = (
            "bash: cannot set terminal process group",
            "bash: no job control in this shell",
        )
        for line in output.splitlines(keepends=True):
            if any(noise in line for noise in noisy_markers):
                continue
            self.output_buffer.append(line)
            self.total_output_size += len(line)
            now = time.time()
            self.last_output_time = now

            # Record the output event
            self._output_event_counter += 1
            self.output_history.append((self._output_event_counter, now, line))
            self._append_io_event('output', line, timestamp=now)

            # Bound the output history length
            if len(self.output_history) > 2000:
                self.output_history.popleft()

            # Truncate the buffer when it exceeds its size limit
            if self.total_output_size > self.max_buffer_size:
                self._truncate_buffer()

        # Update the activity time
        self.last_activity = now

        # Detect the terminal merely echoing the last input back
        cleaned_output = output.replace('\r', '').strip()
        cleaned_input = self.last_input_text.strip() if self.last_input_text else ""
        if cleaned_output and cleaned_input and cleaned_output == cleaned_input:
            self._consecutive_echo_matches += 1
        else:
            self._consecutive_echo_matches = 0
            if cleaned_output:
                self.echo_loop_detected = False

        if self._consecutive_echo_matches >= 1 and self.last_input_time:
            # Only an echo within two seconds of the input counts.
            if now - self.last_input_time <= 2:
                self.echo_loop_detected = True

        # Detect interactive prompts
        self._detect_interactive_prompt(output)

        # Broadcast the output
        if self.broadcast:
            self.broadcast('terminal_output', {
                'session': self.session_name,
                'data': output,
                'timestamp': time.time()
            })

    def _truncate_buffer(self):
        """Drop the oldest buffered lines until the buffer fits its size limit."""
        while self.total_output_size > self.max_buffer_size and self.output_buffer:
            removed = self.output_buffer.pop(0)
            self.total_output_size -= len(removed)
            self.truncated_lines += 1
            if self.output_history:
                self.output_history.popleft()

    def _detect_interactive_prompt(self, output: str):
        """Set ``is_interactive`` when the output looks like a prompt for input."""
        self.is_interactive = False

        # Common interactive prompt patterns
        interactive_patterns = [
            "? ",         # question prompt
            ": ",         # input prompt
            "> ",         # command prompt
            "$ ",         # shell prompt
            "# ",         # root prompt
            ">>> ",       # Python prompt
            "... ",       # Python continuation
            "(y/n)",      # confirmation prompt
            "[Y/n]",      # confirmation prompt
            "Password:",  # password prompt
            "password:",  # password prompt
            "Enter",      # input prompt
            "选择",        # Chinese "choose"
            "请输入",       # Chinese "please enter"
        ]

        output_lower = output.lower().strip()
        for pattern in interactive_patterns:
            if pattern.lower() in output_lower:
                self.is_interactive = True
                return

        # Output without a trailing newline that ends in a prompt suffix is
        # also treated as interactive. The suffixes end in a space, so the
        # raw (unstripped) text must be inspected: stripping first removes
        # exactly the character being looked for.
        prompt_suffixes = ('> ', '$ ', '# ', ': ')
        if output and not output.endswith('\n') and output.endswith(prompt_suffixes):
            self.is_interactive = True

    def _capture_history_marker(self) -> int:
        """Return the current output event counter, used as a position marker."""
        return self._output_event_counter

    def _get_output_since_marker(self, marker: int) -> str:
        """Return the retained output recorded after ``marker`` (all of it if None)."""
        if marker is None:
            return ''.join(item[2] for item in self.output_history)
        return ''.join(item[2] for item in self.output_history if item[0] > marker)

    def _append_io_event(self, event_type: str, data: str, timestamp: Optional[float] = None):
        """Record a terminal input/output event, keeping the history bounded."""
        if timestamp is None:
            timestamp = time.time()
        self.io_history.append((event_type, timestamp, data))
        while len(self.io_history) > self._io_history_max:
            self.io_history.popleft()

    def _seconds_since_last_output(self) -> Optional[float]:
        """Return seconds since the last output (3 decimals), or None if none yet."""
        if not self.last_output_time:
            return None
        return round(time.time() - self.last_output_time, 3)

    def send_command(
        self,
        command: str,
        timeout: float = None,
        timeout_cutoff: float = None,
        enforce_full_timeout: bool = False,
        sentinel: str = None,
    ) -> Dict:
        """Send a command to the terminal and collect its output.

        Args:
            command: Command text to execute.
            timeout: Maximum seconds to wait for output (may exceed the
                real timeout, to wait for trailing output).
            timeout_cutoff: Elapsed time at or above this value is reported
                as a timeout; defaults to ``timeout``.
            enforce_full_timeout: If True, do not return early on idle
                (unless the sentinel is captured).
            sentinel: If given, seeing this marker in the output means the
                command finished; it is removed from the output.

        Returns:
            A result dict. On a processed command it contains ``success``,
            ``session``, ``command``, ``output``, ``message``, ``status``,
            ``truncated``, ``elapsed_ms`` and ``timeout``; on failure it
            contains ``success``, ``error`` and ``session``.
        """
        if not self.is_running or not self.process:
            return {
                "success": False,
                "error": "终端未运行,请先打开终端会话。",
                "session": self.session_name
            }

        try:
            # Drain leftovers so the previous command's output does not interfere.
            try:
                while True:
                    self.output_queue.get_nowait()
            except queue.Empty:
                pass

            marker = self._capture_history_marker()

            if timeout is None:
                timeout = TERMINAL_OUTPUT_WAIT
            else:
                try:
                    timeout = float(timeout)
                except (TypeError, ValueError):
                    timeout = TERMINAL_OUTPUT_WAIT
            if timeout < 0:
                timeout = 0

            start_time = time.time()
            command_text = command.rstrip('\n')

            # Record the command
            self.command_history.append({
                "command": command_text,
                "timestamp": datetime.now().isoformat()
            })
            self.last_command = command_text
            self.is_interactive = False
            self.last_input_text = command_text
            self.last_input_time = time.time()
            self.echo_loop_detected = False
            self._consecutive_echo_matches = 0
            self._append_io_event('input', command_text + '\n', timestamp=self.last_input_time)

            # Broadcast the input event
            if self.broadcast:
                self.broadcast('terminal_input', {
                    'session': self.session_name,
                    'data': command_text + '\n',
                    'timestamp': time.time()
                })

            # Make sure the command ends with a newline
            to_send = command if command.endswith('\n') else command + '\n'

            # Encode as UTF-8; on failure fall back to GBK on Windows,
            # otherwise to UTF-8 with replacement.
            try:
                command_bytes = to_send.encode('utf-8')
            except UnicodeEncodeError:
                if self.is_windows:
                    command_bytes = to_send.encode('gbk', errors='replace')
                else:
                    command_bytes = to_send.encode('utf-8', errors='replace')

            try:
                self.process.stdin.write(command_bytes)
                self.process.stdin.flush()
            except Exception:
                return {
                    "success": False,
                    "error": "终端已不可用或输入失败,请重新打开终端会话。",
                    "session": self.session_name
                }

            # Wait for output
            output, timed_out, marker_seen = self._wait_for_output(
                timeout=timeout,
                timeout_cutoff=timeout_cutoff,
                enforce_full_timeout=enforce_full_timeout,
                sentinel=sentinel,
                command_echo=command_text,
            )

            # Prefer the recorded history since the marker over the queue data.
            recent_output = self._get_output_since_marker(marker)
            if recent_output:
                if sentinel:
                    recent_output = recent_output.replace(sentinel, "")
                output = recent_output

            output = self._clean_output(output, command_text, sentinel)

            output_truncated = False
            if len(output) > TERMINAL_INPUT_MAX_CHARS:
                output = output[-TERMINAL_INPUT_MAX_CHARS:]
                output_truncated = True

            output_clean = output.strip()
            has_output = bool(output_clean)

            # A captured sentinel does not alter the status: a run that is
            # neither timed out nor flagged is already "completed".
            status = "completed"
            if timed_out:
                status = "timeout"
            elif not has_output:
                if self.echo_loop_detected:
                    status = "echo_loop"
                elif self.is_interactive:
                    status = "awaiting_input"
                else:
                    status = "no_output"
            else:
                if self.echo_loop_detected:
                    status = "output_with_echo"

            message_map = {
                "completed": "命令执行完成",
                "no_output": "未捕获输出,命令可能未产生结果",
                "awaiting_input": "命令已发送,终端等待进一步输入或仍在运行",
                "echo_loop": "检测到终端正在回显输入,命令可能未成功执行",
                "output_with_echo": "命令产生输出,但终端疑似重复回显",
                "timeout": f"命令超时({int(timeout)}秒)"
            }
            message = message_map.get(status, "命令执行完成")
            if output_truncated:
                message += f"(输出已截断,保留末尾{TERMINAL_INPUT_MAX_CHARS}字符)"

            elapsed_ms = int((time.time() - start_time) * 1000)
            return {
                "success": status in {"completed", "output_with_echo"},
                "session": self.session_name,
                "command": command_text,
                "output": output,
                "message": message,
                "status": status,
                "truncated": output_truncated,
                "elapsed_ms": elapsed_ms,
                "timeout": timeout_cutoff or timeout
            }

        except Exception as e:
            error_msg = f"发送命令失败: {str(e)}"
            print(f"{OUTPUT_FORMATS['error']} {error_msg}")
            return {
                "success": False,
                "error": error_msg,
                "session": self.session_name
            }

    @staticmethod
    def _strip_sentinel(
        chunk: str,
        sentinel: Optional[str],
        command_echo: Optional[str] = None,
    ) -> Tuple[str, bool]:
        """Remove ``sentinel`` from ``chunk``.

        Returns:
            ``(chunk_without_sentinel, completed)``. ``completed`` is True
            only when the sentinel was present and the chunk is not the
            echo of the command itself, so a marker inside the echoed
            command is not mistaken for the completion signal.
        """
        if not sentinel or sentinel not in chunk:
            return chunk, False
        is_echo = bool(command_echo) and command_echo in chunk
        return chunk.replace(sentinel, ""), not is_echo

    def _wait_for_output(
        self,
        timeout: float = 5,
        timeout_cutoff: Optional[float] = None,
        enforce_full_timeout: bool = False,
        sentinel: Optional[str] = None,
        command_echo: Optional[str] = None,
    ) -> Tuple[str, bool, bool]:
        """Wait for and collect output.

        - With a ``sentinel``, return as soon as it is captured (chunks
          already queued are still drained first).
        - With ``enforce_full_timeout=True`` there is no early return on
          idle; otherwise a 1.5 second pause after some output returns early.
        - ``timed_out`` is judged against ``timeout_cutoff`` (``timeout``
          when not given).

        Returns:
            ``(output, timed_out, marker_seen)``.
        """
        collected_output = []
        start_time = time.time()
        last_output_time = start_time
        output_seen = False
        marker_seen = False

        if timeout is None or timeout <= 0:
            timeout = 0

        if timeout == 0:
            # Non-blocking mode: take whatever is queued and return at once.
            try:
                while True:
                    output, seen = self._strip_sentinel(self.output_queue.get_nowait(), sentinel)
                    marker_seen = marker_seen or seen
                    if output:
                        collected_output.append(output)
            except queue.Empty:
                return ''.join(collected_output), False, marker_seen

        end_time = start_time + timeout
        cutoff = timeout_cutoff if timeout_cutoff is not None else timeout
        # Early return on idle applies only when not forced to wait and no
        # sentinel is in use.
        idle_threshold = None if enforce_full_timeout or sentinel else 1.5

        while True:
            now = time.time()
            if now >= end_time:
                break
            remaining = max(0.05, min(0.5, end_time - now))
            try:
                output, seen = self._strip_sentinel(
                    self.output_queue.get(timeout=remaining), sentinel, command_echo
                )
                marker_seen = marker_seen or seen
                if output:
                    collected_output.append(output)
                    last_output_time = time.time()
                    output_seen = True

                # Collect the rest of the current batch, bounded by the
                # deadline so a chatty process cannot loop forever.
                while time.time() < end_time:
                    try:
                        extra, seen = self._strip_sentinel(
                            self.output_queue.get(timeout=0.01), sentinel, command_echo
                        )
                        marker_seen = marker_seen or seen
                        if extra:
                            collected_output.append(extra)
                            last_output_time = time.time()
                            output_seen = True
                    except queue.Empty:
                        break
            except queue.Empty:
                pass

            if marker_seen:
                # The end marker was captured; return immediately.
                break

            if idle_threshold and output_seen and (time.time() - last_output_time) > idle_threshold:
                break

        elapsed = time.time() - start_time
        timed_out = bool(cutoff and cutoff > 0 and elapsed >= cutoff)
        return ''.join(collected_output), timed_out, marker_seen

    @staticmethod
    def _clean_output(output: str, command_text: str, sentinel: Optional[str]) -> str:
        """Strip the wrapped command echo and the completion marker.

        Lines containing the sentinel are dropped, a leading
        ``user@host...$ `` / ``# `` prompt is removed, a first line equal
        to the command is dropped, and lines that contain both
        ``timeout -k`` and ``sh -c`` are dropped.
        """
        if not output:
            return output

        lines = output.splitlines()
        cleaned = []
        for idx, line in enumerate(lines):
            # Drop marker lines
            if sentinel and sentinel in line:
                continue

            # Strip a prompt: the token must occur within the first 40
            # characters and be preceded by an "@".
            for token in ("# ", "$ "):
                pos = line.find(token)
                if 0 <= pos <= 40 and "@" in line[:pos]:
                    line = line[pos + len(token):]
                    break

            # Drop the echo of the wrapped command
            if idx == 0 and command_text:
                if line.strip() == command_text.strip():
                    continue

            # Also ignore wrapper lines containing timeout / sh -c
            if "timeout -k" in line and "sh -c" in line:
                continue

            cleaned.append(line)

        # Keep the trailing newline consistent with the original output
        out = "\n".join(cleaned)
        if output.endswith("\n") and cleaned:
            out += "\n"
        return out

    def get_output(self, last_n_lines: int = 50) -> str:
        """Return buffered terminal output.

        Args:
            last_n_lines: Number of trailing buffer entries to return; a
                value <= 0 returns the whole buffer.

        Returns:
            The concatenated output text.
        """
        if last_n_lines <= 0:
            return ''.join(self.output_buffer)
        # A tail slice gives the same result as walking the buffer backwards,
        # without the quadratic cost of repeated front insertion.
        return ''.join(self.output_buffer[-last_n_lines:])

    def get_display_output(self) -> str:
        """Return the default output tail, cut to ``display_size`` characters."""
        output = self.get_output()
        if len(output) > self.display_size:
            # Keep the last display_size characters
            output = output[-self.display_size:]
            output = f"[输出已截断,显示最后{self.display_size}字符]\n{output}"
        return output

    def get_snapshot(self, last_n_lines: Optional[int], max_chars: Optional[int]) -> Dict:
        """Return a terminal snapshot with inputs and outputs in order.

        Args:
            last_n_lines: Number of trailing lines to include; None or a
                value <= 0 includes all lines.
            max_chars: If positive, keep only the last ``max_chars``
                characters of the selected text.

        Returns:
            A dict with the rendered text plus line counts and status fields.
        """
        include_all_lines = last_n_lines is None or last_n_lines <= 0
        if not include_all_lines:
            last_n_lines = max(1, last_n_lines)

        segments: List[str] = []
        reset = "\x1b[0m"
        command_color = "\x1b[1;32m"

        for event_type, _, data in self.io_history:
            if event_type == 'input':
                # Render input commands with an ANSI colour, matching the live terminal.
                input_text = data.rstrip('\n')
                decorated = f"{command_color}➜ {input_text}{reset}" if input_text.strip() else f"{command_color}➜{reset}"
                if not decorated.endswith('\n'):
                    decorated += '\n'
                segments.append(decorated)
            else:
                cleaned = data.replace('\r', '')
                segments.append(cleaned)

        full_text = ''.join(segments)
        if full_text:
            line_chunks = full_text.splitlines(keepends=True)
            total_lines = len(line_chunks)
            if include_all_lines:
                selected_lines = line_chunks
                lines_requested = total_lines
            else:
                count = min(last_n_lines, total_lines)
                selected_lines = line_chunks[-count:] if count else []
                lines_requested = last_n_lines
            output_text = ''.join(selected_lines)
        else:
            output_text = ''
            lines_requested = 0
            total_lines = 0

        truncated = False
        if max_chars is not None and max_chars > 0 and len(output_text) > max_chars:
            output_text = output_text[-max_chars:]
            truncated = True

        # Count lines; an unterminated last line still counts as one.
        if output_text:
            lines_returned = output_text.count('\n') + (0 if output_text.endswith('\n') else 1)
        else:
            lines_returned = 0

        if include_all_lines:
            lines_requested = total_lines

        return {
            "success": True,
            "session": self.session_name,
            "output": output_text,
            "lines_requested": lines_requested,
            "lines_returned": lines_returned,
            "truncated": truncated,
            "is_interactive": self.is_interactive,
            "echo_loop_detected": self.echo_loop_detected,
            "seconds_since_last_output": self._seconds_since_last_output(),
            "last_command": self.last_command,
            "buffer_size": self.total_output_size,
            "timestamp": datetime.now().isoformat()
        }

    def get_status(self) -> Dict:
        """Return a dict describing the current terminal state."""
        return {
            "session_name": self.session_name,
            "is_running": self.is_running,
            "working_dir": str(self.working_dir),
            "shell": self.shell_command,
            "execution_mode": self.execution_mode,
            "start_time": self.start_time.isoformat() if self.start_time else None,
            "is_interactive": self.is_interactive,
            "last_command": self.last_command,
            "command_count": len(self.command_history),
            "buffer_size": self.total_output_size,
            "truncated_lines": self.truncated_lines,
            "last_activity": datetime.fromtimestamp(self.last_activity).isoformat(),
            "uptime_seconds": (datetime.now() - self.start_time).total_seconds() if self.start_time else 0,
            "seconds_since_last_output": self._seconds_since_last_output(),
            "echo_loop_detected": self.echo_loop_detected
        }

    def close(self) -> bool:
        """Close the terminal.

        Sends ``exit``, waits up to two seconds, then terminates and, if
        still alive, kills the process.

        Returns:
            True when the session was closed; False if it was not running
            or closing failed.
        """
        if not self.is_running:
            return False

        try:
            # Stop the reader thread
            self.is_reading = False

            if self.process and self.process.poll() is None:
                # Ask the shell to exit; best effort, the pipe may already
                # be broken. Narrower than a bare except so that
                # KeyboardInterrupt/SystemExit still propagate.
                exit_cmd = "exit\n"
                try:
                    self.process.stdin.write(exit_cmd.encode('utf-8'))
                    self.process.stdin.flush()
                except Exception:
                    pass

                # Wait for the process to end
                try:
                    self.process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    # Force termination
                    self.process.terminate()
                    time.sleep(0.5)
                    if self.process.poll() is None:
                        self.process.kill()

            self.is_running = False

            # Wait for the reader thread to finish
            if self.reader_thread and self.reader_thread.is_alive():
                self.reader_thread.join(timeout=1)

            # Broadcast the terminal-closed event
            if self.broadcast:
                self.broadcast('terminal_closed', {
                    'session': self.session_name,
                    'time': datetime.now().isoformat()
                })

            if self.using_container:
                self._stop_sandbox_container()
            self.using_container = False
            self.execution_mode = "host"

            print(f"{OUTPUT_FORMATS['info']} 终端会话关闭: {self.session_name}")
            return True

        except Exception as e:
            print(f"{OUTPUT_FORMATS['error']} 关闭终端失败: {e}")
            return False

    def __del__(self):
        """Destructor: make sure a still-running process is closed."""
        if hasattr(self, 'is_running') and self.is_running:
            self.close()

    def _stop_sandbox_container(self, force: bool = False):
        """Kill the sandbox container if this instance created it.

        Args:
            force: When True, print a warning if the kill command fails.
        """
        if not self._owns_container or not self.sandbox_container_name or not self._sandbox_bin_path:
            return
        try:
            subprocess.run(
                [self._sandbox_bin_path, "kill", self.sandbox_container_name],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=3,
                check=False
            )
        except Exception:
            if force:
                print(f"{OUTPUT_FORMATS['warning']} 强制终止容器 {self.sandbox_container_name} 失败,可能已退出。")
        finally:
            self.sandbox_container_name = None
            self._owns_container = False