# modules/persistent_terminal.py - Persistent terminal instance (fixed version)

import asyncio
import subprocess
import os
import sys
import time
from pathlib import Path
from typing import Optional, Callable, Dict, List
from datetime import datetime
import threading
import queue
from collections import deque

try:
    from config import OUTPUT_FORMATS, TERMINAL_OUTPUT_WAIT, TERMINAL_INPUT_MAX_CHARS
except ImportError:
    # Fallback: make the project root importable, then retry the import.
    import sys
    from pathlib import Path
    project_root = Path(__file__).resolve().parents[1]
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))
    from config import OUTPUT_FORMATS, TERMINAL_OUTPUT_WAIT, TERMINAL_INPUT_MAX_CHARS


class PersistentTerminal:
    """A single long-lived shell process driven through stdin/stdout pipes.

    A background reader thread decodes the shell's output line by line,
    stores it in bounded buffers, and optionally forwards it to a broadcast
    callback. Commands are written to the shell's stdin by `send_command`.
    """

    # Substrings that mark a line as "waiting for user input".
    # They are compared case-insensitively against the stripped output line.
    _INTERACTIVE_PATTERNS = (
        "? ",         # question prompt
        ": ",         # input prompt
        "> ",         # command prompt
        "$ ",         # shell prompt
        "# ",         # root prompt
        ">>> ",       # Python prompt
        "... ",       # Python continuation prompt
        "(y/n)",      # confirmation prompt
        "[Y/n]",      # confirmation prompt
        "Password:",  # password prompt
        "password:",  # password prompt
        "Enter",      # input prompt
        "选择",        # Chinese "choose"
        "请输入",      # Chinese "please enter"
    )

    # Prompt endings checked on output that has no trailing newline.
    _PROMPT_SUFFIXES = ('> ', '$ ', '# ', ': ')

    # Maximum number of entries kept in `output_history`.
    _OUTPUT_HISTORY_MAX = 2000

    def __init__(
        self,
        session_name: str,
        working_dir: Optional[str] = None,
        shell_command: Optional[str] = None,
        broadcast_callback: Optional[Callable] = None,
        max_buffer_size: int = 20000,
        display_size: int = 5000
    ):
        """Initialise the terminal state without starting a process.

        Args:
            session_name: Name of this session.
            working_dir: Working directory; the current directory when None.
            shell_command: Shell executable; chosen automatically when None.
            broadcast_callback: Called as ``callback(event_name, payload)``
                for terminal events (used for WebSocket broadcasting).
            max_buffer_size: Maximum number of characters kept in the
                output buffer.
            display_size: Maximum number of characters returned by
                `get_display_output`.
        """
        self.session_name = session_name
        self.working_dir = Path(working_dir) if working_dir else Path.cwd()
        self.shell_command = shell_command
        self.broadcast = broadcast_callback
        self.max_buffer_size = max_buffer_size
        self.display_size = display_size

        # Process state
        self.process = None
        self.is_running = False
        self.start_time = None

        # Output buffering
        self.output_buffer = []
        self.command_history = []
        self.total_output_size = 0
        self.truncated_lines = 0
        self.output_history = deque()
        self._output_event_counter = 0
        self.last_output_time = None
        self.last_input_time = None
        self.last_input_text = ""
        self.echo_loop_detected = False
        self._consecutive_echo_matches = 0
        self.io_history = deque()
        self._io_history_max = 4000

        # Thread and queue
        self.output_queue = queue.Queue()
        self.reader_thread = None
        self.is_reading = False

        # Status flags
        self.is_interactive = False  # whether the terminal seems to await input
        self.last_command = ""
        self.last_activity = time.time()

        # Platform-specific settings
        self.is_windows = sys.platform == "win32"

    def start(self) -> bool:
        """Start the shell process and the output reader thread.

        Pipes are opened in binary mode on every platform; decoding is done
        by `_decode_output`.

        Returns:
            True when the terminal was started, False when it was already
            running or start-up failed.
        """
        if self.is_running:
            return False

        try:
            # Pick the shell to run.
            if self.is_windows:
                self.shell_command = self.shell_command or "cmd.exe"
            else:
                self.shell_command = self.shell_command or os.environ.get('SHELL', '/bin/bash')

            # Build the child environment.
            env = os.environ.copy()
            env['PYTHONIOENCODING'] = 'utf-8'
            if self.is_windows:
                env['CHCP'] = '65001'  # UTF-8 code page
            else:
                env['TERM'] = 'xterm-256color'
                env['LANG'] = 'en_US.UTF-8'
                env['LC_ALL'] = 'en_US.UTF-8'

            # Text mode is deliberately not used; bytes are decoded manually.
            self.process = subprocess.Popen(
                self.shell_command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                cwd=str(self.working_dir),
                shell=False,
                bufsize=0,  # unbuffered
                env=env
            )

            self.is_running = True
            self.start_time = datetime.now()
            self.last_output_time = None
            self.last_input_time = None
            self.last_input_text = ""
            self.echo_loop_detected = False
            self._consecutive_echo_matches = 0

            # Start the output reader thread.
            self.is_reading = True
            self.reader_thread = threading.Thread(target=self._read_output, daemon=True)
            self.reader_thread.start()

            # On Windows, switch the console code page to UTF-8.
            if self.is_windows:
                time.sleep(0.5)  # give the terminal time to initialise
                self.send_command("chcp 65001", wait_for_output=False)
                time.sleep(0.5)
                # Clear the screen to drop the code-page output.
                self.send_command("cls", wait_for_output=False)
                time.sleep(0.3)
                self.output_buffer.clear()  # discard initialisation output
                self.total_output_size = 0

            # Broadcast the terminal-started event.
            if self.broadcast:
                self.broadcast('terminal_started', {
                    'session': self.session_name,
                    'working_dir': str(self.working_dir),
                    'shell': self.shell_command,
                    'time': self.start_time.isoformat()
                })

            print(f"{OUTPUT_FORMATS['success']} 终端会话启动: {self.session_name}")
            return True

        except Exception as e:
            print(f"{OUTPUT_FORMATS['error']} 终端启动失败: {e}")
            self._abort_startup()
            return False

    def _abort_startup(self) -> None:
        """Undo a partially completed `start` so no process or thread leaks."""
        self.is_running = False
        self.is_reading = False
        try:
            self._force_stop_process()
            if self.reader_thread and self.reader_thread.is_alive():
                self.reader_thread.join(timeout=1)
            self._close_pipes()
        except (OSError, RuntimeError):
            # Best-effort cleanup on an error path; nothing more can be done.
            pass

    def _force_stop_process(self) -> None:
        """Terminate the child process, escalate to kill, and reap it."""
        proc = self.process
        if proc is None or proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=0.5)
        except subprocess.TimeoutExpired:
            proc.kill()
            try:
                # Reap the killed child so it does not linger as a zombie.
                proc.wait(timeout=1)
            except subprocess.TimeoutExpired:
                pass

    def _close_pipes(self) -> None:
        """Close the child's pipes once they are no longer in use."""
        proc = self.process
        if proc is None:
            return
        streams = [proc.stdin]
        # stdout is only closed when the reader thread has finished, so a
        # descriptor is never closed underneath a blocked read.
        if not (self.reader_thread and self.reader_thread.is_alive()):
            streams.append(proc.stdout)
        for stream in streams:
            if stream is None:
                continue
            try:
                stream.close()
            except OSError:
                pass

    def _read_output(self):
        """Reader-thread body: read, decode and record output until stopped."""
        while self.is_reading and self.process:
            try:
                # Always bytes, because the pipes are not in text mode.
                line_bytes = self.process.stdout.readline()

                if line_bytes:
                    line = self._decode_output(line_bytes)
                    self.output_queue.put(line)
                    self._process_output(line)
                elif self.process.poll() is not None:
                    # The process has exited.
                    self.is_running = False
                    break
                else:
                    # No output yet; sleep briefly.
                    time.sleep(0.01)

            except Exception as e:
                # A single failure must not stop the reader.
                print(f"[Terminal] 读取输出警告: {e}")
                time.sleep(0.01)
                continue

    def _decode_output(self, data):
        """Decode process output to ``str`` without raising.

        Args:
            data: ``bytes`` from the pipe; ``str`` is returned unchanged and
                any other type is converted with ``str()``.

        Returns:
            The decoded text.
        """
        if isinstance(data, str):
            return data

        if isinstance(data, bytes):
            # Encodings are tried in order; Windows consoles may emit GBK.
            if self.is_windows:
                encodings = ['utf-8', 'gbk', 'gb2312', 'cp936', 'latin-1']
            else:
                encodings = ['utf-8', 'latin-1']

            for encoding in encodings:
                try:
                    return data.decode(encoding)
                except (UnicodeDecodeError, AttributeError):
                    continue

            # Last resort if every encoding failed.
            return data.decode('utf-8', errors='replace')

        return str(data)

    def _process_output(self, output: str):
        """Record one chunk of output and update the derived state.

        Appends to the buffers, trims them to their limits, updates the
        echo-loop and interactive-prompt flags, and broadcasts the chunk.

        Args:
            output: Decoded output text (one line as read by the reader).
        """
        # Add to the buffer.
        self.output_buffer.append(output)
        self.total_output_size += len(output)
        now = time.time()
        self.last_output_time = now

        # Record the output event.
        self._output_event_counter += 1
        self.output_history.append((self._output_event_counter, now, output))
        self._append_io_event('output', output, timestamp=now)

        # Bound the output history length.
        if len(self.output_history) > self._OUTPUT_HISTORY_MAX:
            self.output_history.popleft()

        # Trim the buffer when it exceeds its size limit.
        if self.total_output_size > self.max_buffer_size:
            self._truncate_buffer()

        self.last_activity = now

        # Detect the terminal echoing the last input back.
        cleaned_output = output.replace('\r', '').strip()
        cleaned_input = self.last_input_text.strip() if self.last_input_text else ""
        if cleaned_output and cleaned_input and cleaned_output == cleaned_input:
            self._consecutive_echo_matches += 1
        else:
            self._consecutive_echo_matches = 0
            if cleaned_output:
                self.echo_loop_detected = False

        if self._consecutive_echo_matches >= 1 and self.last_input_time:
            # Only an echo within 2 seconds of the input counts.
            if now - self.last_input_time <= 2:
                self.echo_loop_detected = True

        # Detect interactive prompts.
        self._detect_interactive_prompt(output)

        # Broadcast the output.
        if self.broadcast:
            self.broadcast('terminal_output', {
                'session': self.session_name,
                'data': output,
                'timestamp': time.time()
            })

    def _truncate_buffer(self):
        """Drop the oldest buffered chunks until the size limit is met.

        One entry is also removed from `output_history` for every chunk
        dropped, as long as the history is not empty.
        """
        drop_count = 0
        remaining_size = self.total_output_size
        buffer_len = len(self.output_buffer)

        # Work out how many leading chunks must go, then delete them in one
        # step instead of repeatedly popping from the front of the list.
        while remaining_size > self.max_buffer_size and drop_count < buffer_len:
            remaining_size -= len(self.output_buffer[drop_count])
            drop_count += 1

        if not drop_count:
            return

        del self.output_buffer[:drop_count]
        self.total_output_size = remaining_size
        self.truncated_lines += drop_count

        for _ in range(min(drop_count, len(self.output_history))):
            self.output_history.popleft()

    def _detect_interactive_prompt(self, output: str):
        """Set `is_interactive` when the output looks like an input prompt.

        Args:
            output: The output chunk to inspect.
        """
        self.is_interactive = False

        normalized = output.lower().strip()
        if any(pattern.lower() in normalized for pattern in self._INTERACTIVE_PATTERNS):
            self.is_interactive = True
            return

        # Output without a trailing newline that ends in a prompt suffix is
        # also treated as interactive. The raw text is checked because the
        # suffixes end in a space, which stripping would remove.
        if output and not output.endswith('\n'):
            if output.endswith(self._PROMPT_SUFFIXES):
                self.is_interactive = True

    def _capture_history_marker(self) -> int:
        """Return the sequence number of the latest recorded output event."""
        return self._output_event_counter

    def _get_output_since_marker(self, marker: int) -> str:
        """Return the output recorded after `marker`.

        Args:
            marker: Value from `_capture_history_marker`; None selects the
                whole retained history.

        Returns:
            The concatenated output text.
        """
        # Iterate a snapshot: the reader thread mutates the deque, and
        # iterating it directly can raise RuntimeError.
        history = list(self.output_history)
        if marker is None:
            return ''.join(text for _, _, text in history)
        return ''.join(text for seq, _, text in history if seq > marker)

    def _append_io_event(self, event_type: str, data: str, timestamp: Optional[float] = None):
        """Record an input or output event in the bounded I/O history.

        Args:
            event_type: ``'input'`` or ``'output'``.
            data: The text of the event.
            timestamp: Event time; the current time when None.
        """
        if timestamp is None:
            timestamp = time.time()
        self.io_history.append((event_type, timestamp, data))
        while len(self.io_history) > self._io_history_max:
            self.io_history.popleft()

    def _seconds_since_last_output(self) -> Optional[float]:
        """Return seconds since the last output, or None if there was none."""
        if not self.last_output_time:
            return None
        return round(time.time() - self.last_output_time, 3)

    def _drain_output_queue(self) -> str:
        """Remove everything currently queued and return it as one string."""
        drained = []
        while True:
            try:
                drained.append(self.output_queue.get_nowait())
            except queue.Empty:
                break
        return ''.join(drained)

    def send_command(self, command: str, wait_for_output: bool = True, timeout: float = None) -> Dict:
        """Send a command to the terminal.

        Args:
            command: Command text; a trailing newline is added if missing.
            wait_for_output: When True, wait for output and return it.
            timeout: Maximum seconds to wait; `TERMINAL_OUTPUT_WAIT` when
                None or not a number, and 0 when negative.

        Returns:
            A dict with ``success`` and ``session``. On success it also has
            ``command``, ``output``, ``message``, ``status`` and
            ``truncated``; on failure it has ``error``.
        """
        if not self.is_running or not self.process:
            return {
                "success": False,
                "error": "终端未运行",
                "session": self.session_name
            }

        try:
            # Discard output left over from earlier commands, so the wait
            # below cannot return early with stale text.
            self._drain_output_queue()
            marker = self._capture_history_marker()

            if timeout is None:
                timeout = TERMINAL_OUTPUT_WAIT
            else:
                try:
                    timeout = float(timeout)
                except (TypeError, ValueError):
                    timeout = TERMINAL_OUTPUT_WAIT
            if timeout < 0:
                timeout = 0

            command_text = command.rstrip('\n')

            # Record the command.
            self.command_history.append({
                "command": command_text,
                "timestamp": datetime.now().isoformat()
            })
            self.last_command = command_text
            self.is_interactive = False
            self.last_input_text = command_text
            self.last_input_time = time.time()
            self.echo_loop_detected = False
            self._consecutive_echo_matches = 0
            self._append_io_event('input', command_text + '\n', timestamp=self.last_input_time)

            # Broadcast the input event.
            if self.broadcast:
                self.broadcast('terminal_input', {
                    'session': self.session_name,
                    'data': command_text + '\n',
                    'timestamp': time.time()
                })

            # Make sure the command ends with a newline.
            to_send = command if command.endswith('\n') else command + '\n'

            # Encode as UTF-8, with a lossy fallback for unencodable text.
            try:
                command_bytes = to_send.encode('utf-8')
            except UnicodeEncodeError:
                if self.is_windows:
                    command_bytes = to_send.encode('gbk', errors='replace')
                else:
                    command_bytes = to_send.encode('utf-8', errors='replace')

            self.process.stdin.write(command_bytes)
            self.process.stdin.flush()

            if not wait_for_output:
                return {
                    "success": True,
                    "session": self.session_name,
                    "command": command_text,
                    "output": "",
                    "message": "命令已发送至终端,后续输出将实时流式返回",
                    "status": "pending",
                    "truncated": False
                }

            output = self._wait_for_output(timeout=timeout)
            # Prefer the history-based view of what arrived after the marker.
            recent_output = self._get_output_since_marker(marker)
            if recent_output:
                output = recent_output

            output_truncated = False
            if len(output) > TERMINAL_INPUT_MAX_CHARS:
                output = output[-TERMINAL_INPUT_MAX_CHARS:]
                output_truncated = True

            has_output = bool(output.strip())
            status = "completed"
            if not has_output:
                if self.echo_loop_detected:
                    status = "echo_loop"
                elif self.is_interactive:
                    status = "awaiting_input"
                else:
                    status = "no_output"
            elif self.echo_loop_detected:
                status = "output_with_echo"

            message_map = {
                "completed": "命令执行完成,已捕获终端输出",
                "no_output": "未捕获任何输出,命令可能未产生可见结果或终端已卡死需要重制",
                "awaiting_input": "命令已发送,终端正在等待进一步输入或进程仍在运行",
                "echo_loop": "检测到终端正在回显输入,命令可能未成功执行",
                "output_with_echo": "命令产生输出,但终端疑似重复回显,请检查是否卡住"
            }
            message = message_map.get(status, "命令执行完成")
            if output_truncated:
                message += f"(输出已截断,保留末尾{TERMINAL_INPUT_MAX_CHARS}字符)"

            return {
                "success": True,
                "session": self.session_name,
                "command": command_text,
                "output": output,
                "message": message,
                "status": status,
                "truncated": output_truncated
            }

        except Exception as e:
            error_msg = f"发送命令失败: {str(e)}"
            print(f"{OUTPUT_FORMATS['error']} {error_msg}")
            return {
                "success": False,
                "error": error_msg,
                "session": self.session_name
            }

    def _wait_for_output(self, timeout: float = 5) -> str:
        """Wait for queued output and return it as one string.

        Waiting ends at the timeout, or earlier once some output has been
        collected and the queue has then stayed empty for a short while.

        Args:
            timeout: Maximum seconds to wait; None or a value <= 0 returns
                whatever is already queued without waiting.

        Returns:
            The collected output text.
        """
        if timeout is None or timeout <= 0:
            return self._drain_output_queue()

        collected_output = []
        deadline = time.monotonic() + timeout
        last_output_time = time.monotonic()

        while time.monotonic() < deadline:
            remaining = max(0.05, min(0.5, deadline - time.monotonic()))
            try:
                collected_output.append(self.output_queue.get(timeout=remaining))
            except queue.Empty:
                if collected_output and time.monotonic() - last_output_time > 0.3:
                    break
                continue
            last_output_time = time.monotonic()

            # Collect the rest of the burst until a short idle gap. The
            # deadline is enforced here too, so continuous output cannot
            # keep this loop running past the timeout.
            while time.monotonic() < deadline:
                try:
                    collected_output.append(self.output_queue.get(timeout=0.1))
                    last_output_time = time.monotonic()
                except queue.Empty:
                    break

        return ''.join(collected_output)

    def get_output(self, last_n_lines: int = 50) -> str:
        """Return buffered terminal output.

        Args:
            last_n_lines: Number of most recent buffered chunks to return;
                a value <= 0 returns the whole buffer.

        Returns:
            The concatenated output text.
        """
        if last_n_lines <= 0:
            return ''.join(self.output_buffer)
        return ''.join(self.output_buffer[-last_n_lines:])

    def get_display_output(self) -> str:
        """Return output for display, cut to the last `display_size` chars."""
        output = self.get_output()
        if len(output) > self.display_size:
            # Keep the tail and prepend a truncation notice.
            output = output[-self.display_size:]
            output = f"[输出已截断,显示最后{self.display_size}字符]\n{output}"
        return output

    def get_snapshot(self, last_n_lines: Optional[int], max_chars: Optional[int]) -> Dict:
        """Return a snapshot of the recorded input and output, in order.

        Args:
            last_n_lines: Number of trailing lines to include; None or a
                value <= 0 includes every line.
            max_chars: When positive, keep only the last `max_chars`
                characters of the selected text.

        Returns:
            A dict with the snapshot text, line counts, the truncation flag
            and the current terminal status fields.
        """
        include_all_lines = last_n_lines is None or last_n_lines <= 0
        if not include_all_lines:
            last_n_lines = max(1, last_n_lines)

        segments: List[str] = []
        reset = "\x1b[0m"
        command_color = "\x1b[1;32m"

        # Iterate a snapshot: the reader thread mutates the deque, and
        # iterating it directly can raise RuntimeError.
        for event_type, _, data in list(self.io_history):
            if event_type == 'input':
                # Show input with an ANSI-coloured arrow, like the live view.
                input_text = data.rstrip('\n')
                if input_text.strip():
                    decorated = f"{command_color}➜ {input_text}{reset}"
                else:
                    decorated = f"{command_color}➜{reset}"
                segments.append(decorated + '\n')
            else:
                segments.append(data.replace('\r', ''))

        full_text = ''.join(segments)
        if full_text:
            line_chunks = full_text.splitlines(keepends=True)
            total_lines = len(line_chunks)
            if include_all_lines:
                selected_lines = line_chunks
                lines_requested = total_lines
            else:
                count = min(last_n_lines, total_lines)
                selected_lines = line_chunks[-count:] if count else []
                lines_requested = last_n_lines
            output_text = ''.join(selected_lines)
        else:
            output_text = ''
            lines_requested = 0
            total_lines = 0

        truncated = False
        if max_chars is not None and max_chars > 0 and len(output_text) > max_chars:
            output_text = output_text[-max_chars:]
            truncated = True

        # Count the returned lines; a final line without newline counts too.
        if output_text:
            lines_returned = output_text.count('\n') + (0 if output_text.endswith('\n') else 1)
        else:
            lines_returned = 0

        if include_all_lines:
            lines_requested = total_lines

        return {
            "success": True,
            "session": self.session_name,
            "output": output_text,
            "lines_requested": lines_requested,
            "lines_returned": lines_returned,
            "truncated": truncated,
            "is_interactive": self.is_interactive,
            "echo_loop_detected": self.echo_loop_detected,
            "seconds_since_last_output": self._seconds_since_last_output(),
            "last_command": self.last_command,
            "buffer_size": self.total_output_size,
            "timestamp": datetime.now().isoformat()
        }

    def get_status(self) -> Dict:
        """Return a dict describing the current terminal state."""
        return {
            "session_name": self.session_name,
            "is_running": self.is_running,
            "working_dir": str(self.working_dir),
            "shell": self.shell_command,
            "start_time": self.start_time.isoformat() if self.start_time else None,
            "is_interactive": self.is_interactive,
            "last_command": self.last_command,
            "command_count": len(self.command_history),
            "buffer_size": self.total_output_size,
            "truncated_lines": self.truncated_lines,
            "last_activity": datetime.fromtimestamp(self.last_activity).isoformat(),
            "uptime_seconds": (datetime.now() - self.start_time).total_seconds() if self.start_time else 0,
            "seconds_since_last_output": self._seconds_since_last_output(),
            "echo_loop_detected": self.echo_loop_detected
        }

    def close(self) -> bool:
        """Shut the terminal down.

        Sends ``exit`` to the shell, waits briefly, then terminates and
        kills the process if it is still alive. The reader thread is joined
        and the pipes are closed.

        Returns:
            True when the terminal was closed, False when it was not
            running or closing failed.
        """
        if not self.is_running:
            return False

        try:
            # Stop the reader thread loop.
            self.is_reading = False

            if self.process and self.process.poll() is None:
                # Ask the shell to exit; the pipe may already be broken or
                # closed, which is fine because termination follows anyway.
                try:
                    self.process.stdin.write("exit\n".encode('utf-8'))
                    self.process.stdin.flush()
                except (OSError, ValueError):
                    pass

                # Wait for the process to end, then force it.
                try:
                    self.process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    self._force_stop_process()

            self.is_running = False

            # Wait for the reader thread to finish.
            if self.reader_thread and self.reader_thread.is_alive():
                self.reader_thread.join(timeout=1)

            self._close_pipes()

            # Broadcast the terminal-closed event.
            if self.broadcast:
                self.broadcast('terminal_closed', {
                    'session': self.session_name,
                    'time': datetime.now().isoformat()
                })

            print(f"{OUTPUT_FORMATS['info']} 终端会话关闭: {self.session_name}")
            return True

        except Exception as e:
            print(f"{OUTPUT_FORMATS['error']} 关闭终端失败: {e}")
            return False

    def __del__(self):
        """Finalizer: make sure a still-running process is closed."""
        if getattr(self, 'is_running', False):
            try:
                self.close()
            except Exception:
                # A finalizer must never raise, e.g. during interpreter
                # shutdown when module globals may already be gone.
                pass