661 lines
24 KiB
Python
661 lines
24 KiB
Python
# modules/persistent_terminal.py - 持久化终端实例(修复版)
|
||
|
||
import asyncio
|
||
import subprocess
|
||
import os
|
||
import sys
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Optional, Callable, Dict, List
|
||
from datetime import datetime
|
||
import threading
|
||
import queue
|
||
from collections import deque
|
||
try:
|
||
from config import OUTPUT_FORMATS, TERMINAL_OUTPUT_WAIT, TERMINAL_INPUT_MAX_CHARS
|
||
except ImportError:
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).resolve().parents[1]
|
||
if str(project_root) not in sys.path:
|
||
sys.path.insert(0, str(project_root))
|
||
from config import OUTPUT_FORMATS, TERMINAL_OUTPUT_WAIT, TERMINAL_INPUT_MAX_CHARS
|
||
|
||
class PersistentTerminal:
|
||
"""单个持久化终端实例"""
|
||
|
||
def __init__(
|
||
self,
|
||
session_name: str,
|
||
working_dir: str = None,
|
||
shell_command: str = None,
|
||
broadcast_callback: Callable = None,
|
||
max_buffer_size: int = 20000,
|
||
display_size: int = 5000
|
||
):
|
||
"""
|
||
初始化持久化终端
|
||
|
||
Args:
|
||
session_name: 会话名称
|
||
working_dir: 工作目录
|
||
shell_command: shell命令(None则自动选择)
|
||
broadcast_callback: 广播回调函数(用于WebSocket)
|
||
max_buffer_size: 最大缓冲区大小
|
||
display_size: 显示大小限制
|
||
"""
|
||
self.session_name = session_name
|
||
self.working_dir = Path(working_dir) if working_dir else Path.cwd()
|
||
self.shell_command = shell_command
|
||
self.broadcast = broadcast_callback
|
||
self.max_buffer_size = max_buffer_size
|
||
self.display_size = display_size
|
||
|
||
# 进程相关
|
||
self.process = None
|
||
self.is_running = False
|
||
self.start_time = None
|
||
|
||
# 输出缓冲
|
||
self.output_buffer = []
|
||
self.command_history = []
|
||
self.total_output_size = 0
|
||
self.truncated_lines = 0
|
||
self.output_history = deque()
|
||
self._output_event_counter = 0
|
||
self.last_output_time = None
|
||
self.last_input_time = None
|
||
self.last_input_text = ""
|
||
self.echo_loop_detected = False
|
||
self._consecutive_echo_matches = 0
|
||
self.io_history = deque()
|
||
self._io_history_max = 4000
|
||
|
||
# 线程和队列
|
||
self.output_queue = queue.Queue()
|
||
self.reader_thread = None
|
||
self.is_reading = False
|
||
|
||
# 状态标志
|
||
self.is_interactive = False # 是否在等待输入
|
||
self.last_command = ""
|
||
self.last_activity = time.time()
|
||
|
||
# 系统特定设置
|
||
self.is_windows = sys.platform == "win32"
|
||
|
||
def start(self) -> bool:
|
||
"""启动终端进程(统一处理编码)"""
|
||
if self.is_running:
|
||
return False
|
||
|
||
try:
|
||
# 确定使用的shell
|
||
if self.is_windows:
|
||
# Windows下使用CMD
|
||
self.shell_command = self.shell_command or "cmd.exe"
|
||
else:
|
||
# Unix系统
|
||
self.shell_command = self.shell_command or os.environ.get('SHELL', '/bin/bash')
|
||
|
||
# 设置环境变量
|
||
env = os.environ.copy()
|
||
env['PYTHONIOENCODING'] = 'utf-8'
|
||
|
||
if self.is_windows:
|
||
# Windows特殊设置
|
||
env['CHCP'] = '65001' # UTF-8代码页
|
||
|
||
# Windows统一不使用text模式,手动处理编码
|
||
self.process = subprocess.Popen(
|
||
self.shell_command,
|
||
stdin=subprocess.PIPE,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
cwd=str(self.working_dir),
|
||
shell=False,
|
||
bufsize=0, # 无缓冲
|
||
env=env
|
||
)
|
||
else:
|
||
# Unix系统
|
||
env['TERM'] = 'xterm-256color'
|
||
env['LANG'] = 'en_US.UTF-8'
|
||
env['LC_ALL'] = 'en_US.UTF-8'
|
||
|
||
# Unix也不使用text模式,统一处理
|
||
self.process = subprocess.Popen(
|
||
self.shell_command,
|
||
stdin=subprocess.PIPE,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
cwd=str(self.working_dir),
|
||
shell=False,
|
||
bufsize=0,
|
||
env=env
|
||
)
|
||
|
||
self.is_running = True
|
||
self.start_time = datetime.now()
|
||
self.last_output_time = None
|
||
self.last_input_time = None
|
||
self.last_input_text = ""
|
||
self.echo_loop_detected = False
|
||
self._consecutive_echo_matches = 0
|
||
|
||
# 启动输出读取线程
|
||
self.is_reading = True
|
||
self.reader_thread = threading.Thread(target=self._read_output)
|
||
self.reader_thread.daemon = True
|
||
self.reader_thread.start()
|
||
|
||
# 如果是Windows,设置代码页
|
||
if self.is_windows:
|
||
time.sleep(0.5) # 等待终端初始化
|
||
self.send_command("chcp 65001", wait_for_output=False)
|
||
time.sleep(0.5)
|
||
# 清屏以去除代码页设置的输出
|
||
self.send_command("cls", wait_for_output=False)
|
||
time.sleep(0.3)
|
||
self.output_buffer.clear() # 清除初始化输出
|
||
self.total_output_size = 0
|
||
|
||
# 广播终端启动事件
|
||
if self.broadcast:
|
||
self.broadcast('terminal_started', {
|
||
'session': self.session_name,
|
||
'working_dir': str(self.working_dir),
|
||
'shell': self.shell_command,
|
||
'time': self.start_time.isoformat()
|
||
})
|
||
|
||
print(f"{OUTPUT_FORMATS['success']} 终端会话启动: {self.session_name}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"{OUTPUT_FORMATS['error']} 终端启动失败: {e}")
|
||
self.is_running = False
|
||
return False
|
||
|
||
def _read_output(self):
|
||
"""后台线程:持续读取输出(修复版,正确处理编码)"""
|
||
while self.is_reading and self.process:
|
||
try:
|
||
# 始终读取字节(因为我们没有使用text=True)
|
||
line_bytes = self.process.stdout.readline()
|
||
|
||
if line_bytes:
|
||
# 解码字节到字符串
|
||
line = self._decode_output(line_bytes)
|
||
|
||
# 处理输出
|
||
self.output_queue.put(line)
|
||
self._process_output(line)
|
||
|
||
elif self.process.poll() is not None:
|
||
# 进程已结束
|
||
self.is_running = False
|
||
break
|
||
else:
|
||
# 没有输出,短暂休眠
|
||
time.sleep(0.01)
|
||
|
||
except Exception as e:
|
||
# 不要因为单个错误而停止
|
||
print(f"[Terminal] 读取输出警告: {e}")
|
||
time.sleep(0.01)
|
||
continue
|
||
|
||
def _decode_output(self, data):
|
||
"""安全地解码输出"""
|
||
# 如果已经是字符串,直接返回
|
||
if isinstance(data, str):
|
||
return data
|
||
|
||
# 如果是字节,尝试解码
|
||
if isinstance(data, bytes):
|
||
# Windows系统尝试的编码顺序
|
||
if self.is_windows:
|
||
encodings = ['utf-8', 'gbk', 'gb2312', 'cp936', 'latin-1']
|
||
else:
|
||
encodings = ['utf-8', 'latin-1']
|
||
|
||
for encoding in encodings:
|
||
try:
|
||
return data.decode(encoding)
|
||
except (UnicodeDecodeError, AttributeError):
|
||
continue
|
||
|
||
# 如果所有编码都失败,使用替换模式
|
||
return data.decode('utf-8', errors='replace')
|
||
|
||
# 其他类型,转换为字符串
|
||
return str(data)
|
||
|
||
def _process_output(self, output: str):
|
||
"""处理输出行"""
|
||
# 添加到缓冲区
|
||
self.output_buffer.append(output)
|
||
self.total_output_size += len(output)
|
||
now = time.time()
|
||
self.last_output_time = now
|
||
|
||
# 记录输出事件
|
||
self._output_event_counter += 1
|
||
self.output_history.append((self._output_event_counter, now, output))
|
||
self._append_io_event('output', output, timestamp=now)
|
||
|
||
# 控制输出历史长度
|
||
if len(self.output_history) > 2000:
|
||
self.output_history.popleft()
|
||
|
||
# 检查是否需要截断
|
||
if self.total_output_size > self.max_buffer_size:
|
||
self._truncate_buffer()
|
||
|
||
# 更新活动时间
|
||
self.last_activity = now
|
||
|
||
# 检测命令回显死循环
|
||
cleaned_output = output.replace('\r', '').strip()
|
||
cleaned_input = self.last_input_text.strip() if self.last_input_text else ""
|
||
if cleaned_output and cleaned_input and cleaned_output == cleaned_input:
|
||
self._consecutive_echo_matches += 1
|
||
else:
|
||
self._consecutive_echo_matches = 0
|
||
if cleaned_output:
|
||
self.echo_loop_detected = False
|
||
if self._consecutive_echo_matches >= 1 and self.last_input_time:
|
||
if now - self.last_input_time <= 2:
|
||
self.echo_loop_detected = True
|
||
|
||
# 检测交互式提示
|
||
self._detect_interactive_prompt(output)
|
||
|
||
# 广播输出
|
||
if self.broadcast:
|
||
self.broadcast('terminal_output', {
|
||
'session': self.session_name,
|
||
'data': output,
|
||
'timestamp': time.time()
|
||
})
|
||
|
||
def _truncate_buffer(self):
|
||
"""截断缓冲区以保持在限制内"""
|
||
# 保留最后的N个字符
|
||
while self.total_output_size > self.max_buffer_size and self.output_buffer:
|
||
removed = self.output_buffer.pop(0)
|
||
self.total_output_size -= len(removed)
|
||
self.truncated_lines += 1
|
||
if self.output_history:
|
||
self.output_history.popleft()
|
||
|
||
def _detect_interactive_prompt(self, output: str):
|
||
"""检测是否在等待交互输入"""
|
||
self.is_interactive = False
|
||
# 常见的交互提示模式
|
||
interactive_patterns = [
|
||
"? ", # 问题提示
|
||
": ", # 输入提示
|
||
"> ", # 命令提示
|
||
"$ ", # shell提示
|
||
"# ", # root提示
|
||
">>> ", # Python提示
|
||
"... ", # Python续行
|
||
"(y/n)", # 确认提示
|
||
"[Y/n]", # 确认提示
|
||
"Password:", # 密码提示
|
||
"password:", # 密码提示
|
||
"Enter", # 输入提示
|
||
"选择", # 中文选择
|
||
"请输入", # 中文输入
|
||
]
|
||
|
||
output_lower = output.lower().strip()
|
||
for pattern in interactive_patterns:
|
||
if pattern.lower() in output_lower:
|
||
self.is_interactive = True
|
||
return
|
||
|
||
# 如果输出以常见提示符结尾且没有换行,也认为是交互式
|
||
if output and not output.endswith('\n'):
|
||
last_chars = output.strip()[-3:]
|
||
if last_chars in ['> ', '$ ', '# ', ': ']:
|
||
self.is_interactive = True
|
||
|
||
def _capture_history_marker(self) -> int:
|
||
return self._output_event_counter
|
||
|
||
def _get_output_since_marker(self, marker: int) -> str:
|
||
if marker is None:
|
||
return ''.join(item[2] for item in self.output_history)
|
||
return ''.join(item[2] for item in self.output_history if item[0] > marker)
|
||
|
||
def _append_io_event(self, event_type: str, data: str, timestamp: Optional[float] = None):
|
||
"""记录终端输入输出事件"""
|
||
if timestamp is None:
|
||
timestamp = time.time()
|
||
self.io_history.append((event_type, timestamp, data))
|
||
while len(self.io_history) > self._io_history_max:
|
||
self.io_history.popleft()
|
||
|
||
def _seconds_since_last_output(self) -> Optional[float]:
|
||
if not self.last_output_time:
|
||
return None
|
||
return round(time.time() - self.last_output_time, 3)
|
||
|
||
def send_command(self, command: str, wait_for_output: bool = True, timeout: float = None) -> Dict:
|
||
"""发送命令到终端(统一编码处理)"""
|
||
if not self.is_running or not self.process:
|
||
return {
|
||
"success": False,
|
||
"error": "终端未运行",
|
||
"session": self.session_name
|
||
}
|
||
|
||
try:
|
||
marker = self._capture_history_marker()
|
||
if timeout is None:
|
||
timeout = TERMINAL_OUTPUT_WAIT
|
||
else:
|
||
try:
|
||
timeout = float(timeout)
|
||
except (TypeError, ValueError):
|
||
timeout = TERMINAL_OUTPUT_WAIT
|
||
if timeout < 0:
|
||
timeout = 0
|
||
start_time = time.time()
|
||
command_text = command.rstrip('\n')
|
||
# 记录命令
|
||
self.command_history.append({
|
||
"command": command_text,
|
||
"timestamp": datetime.now().isoformat()
|
||
})
|
||
self.last_command = command_text
|
||
self.is_interactive = False
|
||
self.last_input_text = command_text
|
||
self.last_input_time = time.time()
|
||
self.echo_loop_detected = False
|
||
self._consecutive_echo_matches = 0
|
||
self._append_io_event('input', command_text + '\n', timestamp=self.last_input_time)
|
||
|
||
# 广播输入事件
|
||
if self.broadcast:
|
||
self.broadcast('terminal_input', {
|
||
'session': self.session_name,
|
||
'data': command_text + '\n',
|
||
'timestamp': time.time()
|
||
})
|
||
|
||
# 确保命令有换行符
|
||
to_send = command if command.endswith('\n') else command + '\n'
|
||
|
||
# 发送命令(统一使用UTF-8编码)
|
||
try:
|
||
# 首先尝试UTF-8
|
||
command_bytes = to_send.encode('utf-8')
|
||
except UnicodeEncodeError:
|
||
# 如果UTF-8失败,Windows系统尝试GBK
|
||
if self.is_windows:
|
||
command_bytes = to_send.encode('gbk', errors='replace')
|
||
else:
|
||
command_bytes = to_send.encode('utf-8', errors='replace')
|
||
|
||
self.process.stdin.write(command_bytes)
|
||
self.process.stdin.flush()
|
||
|
||
# 如果需要等待输出
|
||
if wait_for_output:
|
||
output = self._wait_for_output(timeout=timeout)
|
||
recent_output = self._get_output_since_marker(marker)
|
||
if recent_output:
|
||
output = recent_output
|
||
output_truncated = False
|
||
if len(output) > TERMINAL_INPUT_MAX_CHARS:
|
||
output = output[-TERMINAL_INPUT_MAX_CHARS:]
|
||
output_truncated = True
|
||
output_clean = output.strip()
|
||
has_output = bool(output_clean)
|
||
status = "completed"
|
||
if not has_output:
|
||
if self.echo_loop_detected:
|
||
status = "echo_loop"
|
||
elif self.is_interactive:
|
||
status = "awaiting_input"
|
||
else:
|
||
status = "no_output"
|
||
else:
|
||
if self.echo_loop_detected:
|
||
status = "output_with_echo"
|
||
message_map = {
|
||
"completed": "命令执行完成,已捕获终端输出",
|
||
"no_output": "未捕获任何输出,命令可能未产生可见结果或终端已卡死需要重制",
|
||
"awaiting_input": "命令已发送,终端正在等待进一步输入或进程仍在运行",
|
||
"echo_loop": "检测到终端正在回显输入,命令可能未成功执行",
|
||
"output_with_echo": "命令产生输出,但终端疑似重复回显,请检查是否卡住"
|
||
}
|
||
message = message_map.get(status, "命令执行完成")
|
||
if output_truncated:
|
||
message += f"(输出已截断,保留末尾{TERMINAL_INPUT_MAX_CHARS}字符)"
|
||
return {
|
||
"success": True,
|
||
"session": self.session_name,
|
||
"command": command_text,
|
||
"output": output,
|
||
"message": message,
|
||
"status": status,
|
||
"truncated": output_truncated
|
||
}
|
||
else:
|
||
return {
|
||
"success": True,
|
||
"session": self.session_name,
|
||
"command": command_text,
|
||
"output": "",
|
||
"message": "命令已发送至终端,后续输出将实时流式返回",
|
||
"status": "pending",
|
||
"truncated": False
|
||
}
|
||
|
||
except Exception as e:
|
||
error_msg = f"发送命令失败: {str(e)}"
|
||
print(f"{OUTPUT_FORMATS['error']} {error_msg}")
|
||
return {
|
||
"success": False,
|
||
"error": error_msg,
|
||
"session": self.session_name
|
||
}
|
||
|
||
def _wait_for_output(self, timeout: float = 5) -> str:
|
||
"""等待并收集输出"""
|
||
collected_output = []
|
||
start_time = time.time()
|
||
last_output_time = time.time()
|
||
|
||
if timeout is None or timeout <= 0:
|
||
timeout = 0
|
||
|
||
if timeout == 0:
|
||
try:
|
||
while True:
|
||
output = self.output_queue.get_nowait()
|
||
collected_output.append(output)
|
||
except queue.Empty:
|
||
return ''.join(collected_output)
|
||
|
||
while time.time() - start_time < timeout:
|
||
try:
|
||
remaining = max(0.05, min(0.5, timeout - (time.time() - start_time)))
|
||
output = self.output_queue.get(timeout=remaining)
|
||
collected_output.append(output)
|
||
last_output_time = time.time()
|
||
# 快速收集剩余输出,直到短暂空闲
|
||
while True:
|
||
try:
|
||
output = self.output_queue.get(timeout=0.1)
|
||
collected_output.append(output)
|
||
last_output_time = time.time()
|
||
except queue.Empty:
|
||
break
|
||
except queue.Empty:
|
||
if collected_output and time.time() - last_output_time > 0.3:
|
||
break
|
||
if timeout == 0:
|
||
break
|
||
|
||
return ''.join(collected_output)
|
||
|
||
def get_output(self, last_n_lines: int = 50) -> str:
|
||
"""
|
||
获取终端输出
|
||
|
||
Args:
|
||
last_n_lines: 获取最后N行
|
||
|
||
Returns:
|
||
输出内容
|
||
"""
|
||
if last_n_lines <= 0:
|
||
return ''.join(self.output_buffer)
|
||
|
||
# 获取最后N行
|
||
lines = []
|
||
for line in reversed(self.output_buffer):
|
||
lines.insert(0, line)
|
||
if len(lines) >= last_n_lines:
|
||
break
|
||
|
||
return ''.join(lines)
|
||
|
||
def get_display_output(self) -> str:
|
||
"""获取用于显示的输出(截断到display_size)"""
|
||
output = self.get_output()
|
||
if len(output) > self.display_size:
|
||
# 保留最后的display_size字符
|
||
output = output[-self.display_size:]
|
||
output = f"[输出已截断,显示最后{self.display_size}字符]\n{output}"
|
||
return output
|
||
|
||
def get_snapshot(self, last_n_lines: int, max_chars: int) -> Dict:
|
||
"""获取终端快照,包含按顺序排列的输入/输出"""
|
||
if last_n_lines <= 0:
|
||
last_n_lines = 1
|
||
|
||
combined_lines: List[str] = []
|
||
for event_type, _, data in self.io_history:
|
||
if event_type == 'input':
|
||
# 显示输入命令,保持与终端监控一致
|
||
combined_lines.append(f"➜ {data.rstrip()}" if data.strip() else "➜")
|
||
else:
|
||
cleaned = data.replace('\r', '')
|
||
# 按行拆分输出,保留空行
|
||
segments = cleaned.splitlines()
|
||
if cleaned.endswith('\n'):
|
||
segments.append('')
|
||
combined_lines.extend(segments if segments else [''])
|
||
|
||
if combined_lines:
|
||
selected_lines = combined_lines[-last_n_lines:]
|
||
output_text = '\n'.join(selected_lines)
|
||
else:
|
||
selected_lines = []
|
||
output_text = ''
|
||
|
||
truncated = False
|
||
if len(output_text) > max_chars:
|
||
output_text = output_text[-max_chars:]
|
||
truncated = True
|
||
|
||
# 统计行数
|
||
if output_text:
|
||
lines_returned = output_text.count('\n') + (0 if output_text.endswith('\n') else 1)
|
||
else:
|
||
lines_returned = 0
|
||
|
||
return {
|
||
"success": True,
|
||
"session": self.session_name,
|
||
"output": output_text,
|
||
"lines_requested": last_n_lines,
|
||
"lines_returned": lines_returned,
|
||
"truncated": truncated,
|
||
"is_interactive": self.is_interactive,
|
||
"echo_loop_detected": self.echo_loop_detected,
|
||
"seconds_since_last_output": self._seconds_since_last_output(),
|
||
"last_command": self.last_command,
|
||
"buffer_size": self.total_output_size,
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
def get_status(self) -> Dict:
|
||
"""获取终端状态"""
|
||
return {
|
||
"session_name": self.session_name,
|
||
"is_running": self.is_running,
|
||
"working_dir": str(self.working_dir),
|
||
"shell": self.shell_command,
|
||
"start_time": self.start_time.isoformat() if self.start_time else None,
|
||
"is_interactive": self.is_interactive,
|
||
"last_command": self.last_command,
|
||
"command_count": len(self.command_history),
|
||
"buffer_size": self.total_output_size,
|
||
"truncated_lines": self.truncated_lines,
|
||
"last_activity": datetime.fromtimestamp(self.last_activity).isoformat(),
|
||
"uptime_seconds": (datetime.now() - self.start_time).total_seconds() if self.start_time else 0,
|
||
"seconds_since_last_output": self._seconds_since_last_output(),
|
||
"echo_loop_detected": self.echo_loop_detected
|
||
}
|
||
|
||
def close(self) -> bool:
|
||
"""关闭终端"""
|
||
if not self.is_running:
|
||
return False
|
||
|
||
try:
|
||
# 停止读取线程
|
||
self.is_reading = False
|
||
|
||
# 发送退出命令
|
||
if self.process and self.process.poll() is None:
|
||
exit_cmd = "exit\n"
|
||
try:
|
||
self.process.stdin.write(exit_cmd.encode('utf-8'))
|
||
self.process.stdin.flush()
|
||
except:
|
||
pass
|
||
|
||
# 等待进程结束
|
||
try:
|
||
self.process.wait(timeout=2)
|
||
except subprocess.TimeoutExpired:
|
||
# 强制终止
|
||
self.process.terminate()
|
||
time.sleep(0.5)
|
||
if self.process.poll() is None:
|
||
self.process.kill()
|
||
|
||
self.is_running = False
|
||
|
||
# 等待读取线程结束
|
||
if self.reader_thread and self.reader_thread.is_alive():
|
||
self.reader_thread.join(timeout=1)
|
||
|
||
# 广播终端关闭事件
|
||
if self.broadcast:
|
||
self.broadcast('terminal_closed', {
|
||
'session': self.session_name,
|
||
'time': datetime.now().isoformat()
|
||
})
|
||
|
||
print(f"{OUTPUT_FORMATS['info']} 终端会话关闭: {self.session_name}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"{OUTPUT_FORMATS['error']} 关闭终端失败: {e}")
|
||
return False
|
||
|
||
def __del__(self):
|
||
"""析构函数,确保进程被关闭"""
|
||
if hasattr(self, 'is_running') and self.is_running:
|
||
self.close()
|