# agent/modules/persistent_terminal.py
# Date: 2025-11-19 20:47:56 +08:00 - 661 lines, 24 KiB, Python
#
# Viewer note: this file contains Unicode characters that might be confused
# with other characters. If that is intentional, the warning can be ignored.
# modules/persistent_terminal.py - 持久化终端实例(修复版)
import asyncio
import subprocess
import os
import sys
import time
from pathlib import Path
from typing import Optional, Callable, Dict, List
from datetime import datetime
import threading
import queue
from collections import deque
try:
    from config import OUTPUT_FORMATS, TERMINAL_OUTPUT_WAIT, TERMINAL_INPUT_MAX_CHARS
except ImportError:
    # Fallback for when the project root is not on sys.path: add the parent
    # of this file's directory (presumably where config.py lives) and retry.
    # `sys` and `Path` are already imported at the top of the module.
    project_root = Path(__file__).resolve().parents[1]
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))
    from config import OUTPUT_FORMATS, TERMINAL_OUTPUT_WAIT, TERMINAL_INPUT_MAX_CHARS
class PersistentTerminal:
    """A single persistent shell session driven through stdin/stdout pipes.

    ``start()`` launches the shell with stderr merged into stdout and spawns a
    daemon thread that reads that stream line by line. Each line is decoded,
    recorded (``output_buffer``, ``output_history``, ``io_history``), pushed
    onto ``output_queue`` and, when a broadcast callback was given, published
    as a ``terminal_output`` event. ``send_command()`` writes to stdin and can
    wait for the resulting output.
    """

    # Maximum number of entries kept in output_history (per-command capture).
    _OUTPUT_HISTORY_MAX = 2000

    # Case-insensitive substrings suggesting the shell is waiting for input.
    _INTERACTIVE_PATTERNS = (
        "? ",         # question prompt
        ": ",         # input prompt
        "> ",         # command prompt
        "$ ",         # shell prompt
        "# ",         # root prompt
        ">>> ",       # Python prompt
        "... ",       # Python continuation prompt
        "(y/n)",      # confirmation prompt
        "[Y/n]",      # confirmation prompt
        "Password:",  # password prompt
        "password:",  # password prompt
        "Enter",      # input prompt
        "选择",        # Chinese "choose" prompt
        "请输入",      # Chinese "please enter" prompt
    )

    # Prompt endings recognised on a line printed without a trailing newline.
    _PROMPT_SUFFIXES = ('> ', '$ ', '# ', ': ')

    # Message returned by send_command() for each status.
    _STATUS_MESSAGES = {
        "completed": "命令执行完成,已捕获终端输出",
        "no_output": "未捕获任何输出,命令可能未产生可见结果或终端已卡死需要重制",
        "awaiting_input": "命令已发送,终端正在等待进一步输入或进程仍在运行",
        "echo_loop": "检测到终端正在回显输入,命令可能未成功执行",
        "output_with_echo": "命令产生输出,但终端疑似重复回显,请检查是否卡住"
    }

    def __init__(
        self,
        session_name: str,
        working_dir: Optional[str] = None,
        shell_command: Optional[str] = None,
        broadcast_callback: Optional[Callable] = None,
        max_buffer_size: int = 20000,
        display_size: int = 5000
    ):
        """Initialise terminal state; no process is started until ``start()``.

        Args:
            session_name: Session name, included in every result and event.
            working_dir: Working directory for the shell; defaults to the
                current directory.
            shell_command: Shell executable; ``None`` picks one in ``start()``.
            broadcast_callback: ``callback(event_name, payload_dict)`` used to
                publish terminal events (for WebSocket broadcasting).
            max_buffer_size: Maximum number of characters kept in
                ``output_buffer`` before the oldest lines are dropped.
            display_size: Character limit used by ``get_display_output()``.
        """
        self.session_name = session_name
        self.working_dir = Path(working_dir) if working_dir else Path.cwd()
        self.shell_command = shell_command
        self.broadcast = broadcast_callback
        self.max_buffer_size = max_buffer_size
        self.display_size = display_size

        # Process state
        self.process = None
        self.is_running = False
        self.start_time = None

        # Output bookkeeping
        self.output_buffer = []          # decoded output lines, capped by max_buffer_size chars
        self.command_history = []        # one {"command", "timestamp"} dict per send_command()
        self.total_output_size = 0       # characters currently held in output_buffer
        self.truncated_lines = 0         # lines dropped from output_buffer so far
        self.output_history = deque()    # (event_id, timestamp, text) per output line
        self._output_event_counter = 0   # last event_id handed out
        self.last_output_time = None
        self.last_input_time = None
        self.last_input_text = ""
        self.echo_loop_detected = False
        self._consecutive_echo_matches = 0
        self.io_history = deque()        # ('input' | 'output', timestamp, text) in arrival order
        self._io_history_max = 4000

        # Reader thread and the queue it feeds
        self.output_queue = queue.Queue()
        self.reader_thread = None
        self.is_reading = False

        # Status flags
        self.is_interactive = False      # heuristic: shell appears to be waiting for input
        self.last_command = ""
        self.last_activity = time.time()

        self.is_windows = sys.platform == "win32"

    def start(self) -> bool:
        """Launch the shell process and the background reader thread.

        The shell defaults to ``cmd.exe`` on Windows and to ``$SHELL`` (falling
        back to ``/bin/bash``) elsewhere. Pipes are binary and unbuffered on
        both platforms; decoding is done by ``_decode_output``.

        Returns:
            True on success; False if already running or if the launch failed
            (the error is printed).
        """
        if self.is_running:
            return False

        try:
            if self.is_windows:
                self.shell_command = self.shell_command or "cmd.exe"
            else:
                self.shell_command = self.shell_command or os.environ.get('SHELL', '/bin/bash')

            env = os.environ.copy()
            env['PYTHONIOENCODING'] = 'utf-8'
            if self.is_windows:
                # NOTE(review): cmd.exe presumably does not read a CHCP
                # variable; the effective code-page switch is the
                # `chcp 65001` command sent below. Confirm before relying on it.
                env['CHCP'] = '65001'
            else:
                env['TERM'] = 'xterm-256color'
                env['LANG'] = 'en_US.UTF-8'
                env['LC_ALL'] = 'en_US.UTF-8'

            # Identical launch on both platforms: binary, unbuffered pipes
            # with stderr merged into stdout so a single reader sees everything.
            self.process = subprocess.Popen(
                self.shell_command,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                cwd=str(self.working_dir),
                shell=False,
                bufsize=0,
                env=env
            )

            self.is_running = True
            self.start_time = datetime.now()
            self.last_output_time = None
            self.last_input_time = None
            self.last_input_text = ""
            self.echo_loop_detected = False
            self._consecutive_echo_matches = 0

            self.is_reading = True
            self.reader_thread = threading.Thread(target=self._read_output, daemon=True)
            self.reader_thread.start()

            if self.is_windows:
                time.sleep(0.5)  # give cmd.exe time to initialise
                self.send_command("chcp 65001", wait_for_output=False)
                time.sleep(0.5)
                # Clear the screen to hide the code-page command's output.
                self.send_command("cls", wait_for_output=False)
                time.sleep(0.3)
                # Discard the setup output everywhere it was recorded,
                # including the histories that feed snapshots and
                # per-command capture, not just the display buffer.
                self.output_buffer.clear()
                self.output_history.clear()
                self.io_history.clear()
                self.total_output_size = 0

            if self.broadcast:
                self.broadcast('terminal_started', {
                    'session': self.session_name,
                    'working_dir': str(self.working_dir),
                    'shell': self.shell_command,
                    'time': self.start_time.isoformat()
                })

            print(f"{OUTPUT_FORMATS['success']} 终端会话启动: {self.session_name}")
            return True

        except Exception as e:
            print(f"{OUTPUT_FORMATS['error']} 终端启动失败: {e}")
            self.is_running = False
            return False

    def _read_output(self):
        """Reader-thread loop: read, decode and record shell output.

        Runs until ``is_reading`` is cleared, the process exits (which also
        clears ``is_running``), or stdout is found closed.
        """
        while self.is_reading and self.process:
            try:
                line_bytes = self.process.stdout.readline()
                if line_bytes:
                    line = self._decode_output(line_bytes)
                    try:
                        # Record before enqueueing. send_command() consults
                        # output_history right after _wait_for_output()
                        # returns, so a dequeued line must already be there.
                        self._process_output(line)
                    finally:
                        self.output_queue.put(line)
                elif self.process.poll() is not None:
                    # EOF and the process has exited.
                    self.is_running = False
                    break
                else:
                    # EOF while the process is still alive; back off briefly.
                    time.sleep(0.01)
            except Exception as e:
                if self.process.stdout.closed:
                    # Every further read of a closed pipe fails too; stop
                    # instead of spinning and printing warnings forever.
                    break
                # A single failure should not stop the reader.
                print(f"[Terminal] 读取输出警告: {e}")
                time.sleep(0.01)

    def _decode_output(self, data):
        """Decode one chunk of raw output to ``str``.

        ``str`` input is returned unchanged. For ``bytes``, the method tries
        UTF-8 first, then (on Windows) the GBK family, then Latin-1. Latin-1
        maps every byte, so the final ``errors='replace'`` fallback is a
        safety net that is not normally reached. Any other type is converted
        with ``str()``.
        """
        if isinstance(data, str):
            return data

        if isinstance(data, bytes):
            if self.is_windows:
                encodings = ['utf-8', 'gbk', 'gb2312', 'cp936', 'latin-1']
            else:
                encodings = ['utf-8', 'latin-1']
            for encoding in encodings:
                try:
                    return data.decode(encoding)
                except (UnicodeDecodeError, AttributeError):
                    continue
            return data.decode('utf-8', errors='replace')

        return str(data)

    def _process_output(self, output: str):
        """Record one decoded output line and update derived state.

        Appends to ``output_buffer``, ``output_history`` and ``io_history``,
        enforces the buffer and history caps, refreshes activity timestamps,
        updates echo-loop and interactive-prompt detection, and broadcasts a
        ``terminal_output`` event.
        """
        self.output_buffer.append(output)
        self.total_output_size += len(output)
        now = time.time()
        self.last_output_time = now

        self._output_event_counter += 1
        self.output_history.append((self._output_event_counter, now, output))
        self._append_io_event('output', output, timestamp=now)
        if len(self.output_history) > self._OUTPUT_HISTORY_MAX:
            self.output_history.popleft()

        if self.total_output_size > self.max_buffer_size:
            self._truncate_buffer()

        self.last_activity = now

        # Echo-loop detection: the shell printing back exactly the last input
        # line within 2 seconds of it being sent.
        cleaned_output = output.replace('\r', '').strip()
        cleaned_input = self.last_input_text.strip() if self.last_input_text else ""
        if cleaned_output and cleaned_input and cleaned_output == cleaned_input:
            self._consecutive_echo_matches += 1
        else:
            self._consecutive_echo_matches = 0
            if cleaned_output:
                self.echo_loop_detected = False
        if (self._consecutive_echo_matches >= 1 and self.last_input_time
                and now - self.last_input_time <= 2):
            self.echo_loop_detected = True

        self._detect_interactive_prompt(output)

        if self.broadcast:
            self.broadcast('terminal_output', {
                'session': self.session_name,
                'data': output,
                'timestamp': time.time()
            })

    def _truncate_buffer(self):
        """Drop the oldest lines until ``output_buffer`` fits ``max_buffer_size``.

        For every dropped line, the oldest ``output_history`` entry is also
        discarded and ``truncated_lines`` is incremented.
        """
        while self.total_output_size > self.max_buffer_size and self.output_buffer:
            removed = self.output_buffer.pop(0)
            self.total_output_size -= len(removed)
            self.truncated_lines += 1
            if self.output_history:
                self.output_history.popleft()

    def _detect_interactive_prompt(self, output: str):
        """Set ``is_interactive`` when *output* looks like a prompt awaiting input.

        This is a heuristic. It matches a case-insensitive substring from
        ``_INTERACTIVE_PATTERNS``, or a line printed without a newline that
        ends with a prompt character followed by a space.
        """
        self.is_interactive = False

        output_lower = output.lower().strip()
        if any(pattern.lower() in output_lower for pattern in self._INTERACTIVE_PATTERNS):
            self.is_interactive = True
            return

        # Compare against the raw tail. Stripping first would remove the very
        # trailing space every suffix ends with, so nothing could ever match.
        if output and not output.endswith('\n') and output.endswith(self._PROMPT_SUFFIXES):
            self.is_interactive = True

    def _capture_history_marker(self) -> int:
        """Return the id of the newest output event; later events compare greater."""
        return self._output_event_counter

    def _get_output_since_marker(self, marker: int) -> str:
        """Join output recorded after *marker* (all retained output if None)."""
        # Copy first: the reader thread appends to output_history concurrently,
        # and iterating a deque while it is mutated raises RuntimeError. The
        # list() copy runs in C, so in CPython it does not interleave with an
        # append (assumption tied to the GIL).
        history = list(self.output_history)
        if marker is None:
            return ''.join(text for _, _, text in history)
        return ''.join(text for event_id, _, text in history if event_id > marker)

    def _append_io_event(self, event_type: str, data: str, timestamp: Optional[float] = None):
        """Append an ('input' | 'output', timestamp, data) event, capped at ``_io_history_max``."""
        if timestamp is None:
            timestamp = time.time()
        self.io_history.append((event_type, timestamp, data))
        while len(self.io_history) > self._io_history_max:
            self.io_history.popleft()

    def _seconds_since_last_output(self) -> Optional[float]:
        """Seconds since the last output line (rounded to ms), or None if none yet."""
        if self.last_output_time is None:
            return None
        return round(time.time() - self.last_output_time, 3)

    def _drain_output_queue(self) -> None:
        """Discard everything currently waiting in ``output_queue``."""
        while True:
            try:
                self.output_queue.get_nowait()
            except queue.Empty:
                return

    @staticmethod
    def _normalize_timeout(timeout) -> float:
        """Coerce *timeout* to a non-negative number of seconds.

        ``None`` or a non-numeric value falls back to ``TERMINAL_OUTPUT_WAIT``.
        """
        if timeout is None:
            timeout = TERMINAL_OUTPUT_WAIT
        else:
            try:
                timeout = float(timeout)
            except (TypeError, ValueError):
                timeout = TERMINAL_OUTPUT_WAIT
        return 0 if timeout < 0 else timeout

    def _encode_input(self, text: str) -> bytes:
        """Encode stdin text as UTF-8, falling back to lossy GBK (Windows) or UTF-8."""
        try:
            return text.encode('utf-8')
        except UnicodeEncodeError:
            if self.is_windows:
                return text.encode('gbk', errors='replace')
            return text.encode('utf-8', errors='replace')

    def _classify_status(self, has_output: bool) -> str:
        """Map the captured output and detection flags to a send_command() status."""
        if has_output:
            return "output_with_echo" if self.echo_loop_detected else "completed"
        if self.echo_loop_detected:
            return "echo_loop"
        if self.is_interactive:
            return "awaiting_input"
        return "no_output"

    def send_command(self, command: str, wait_for_output: bool = True, timeout: Optional[float] = None) -> Dict:
        """Write *command* (plus a newline if missing) to the shell's stdin.

        Args:
            command: Text to send.
            wait_for_output: When True, wait up to *timeout* seconds and return
                the captured output. Otherwise return immediately with status
                ``"pending"``; output is still recorded and broadcast.
            timeout: Seconds to wait. ``None`` or a non-numeric value uses
                ``TERMINAL_OUTPUT_WAIT``; negative values are clamped to 0.

        Returns:
            A dict with ``success`` and ``session``. On failure it also has
            ``error``. On success it has ``command``, ``output``, ``message``,
            ``status`` and ``truncated``. ``status`` is one of ``completed``,
            ``no_output``, ``awaiting_input``, ``echo_loop``,
            ``output_with_echo`` or ``pending``. Output longer than
            ``TERMINAL_INPUT_MAX_CHARS`` is cut down to its tail.
        """
        if not self.is_running or not self.process:
            return {
                "success": False,
                "error": "终端未运行",
                "session": self.session_name
            }

        try:
            marker = self._capture_history_marker()
            # Drop output queued by earlier commands (e.g. ones sent with
            # wait_for_output=False) so it cannot be reported as this
            # command's result.
            self._drain_output_queue()
            timeout = self._normalize_timeout(timeout)
            command_text = command.rstrip('\n')

            self.command_history.append({
                "command": command_text,
                "timestamp": datetime.now().isoformat()
            })
            self.last_command = command_text
            self.is_interactive = False
            self.last_input_text = command_text
            self.last_input_time = time.time()
            self.echo_loop_detected = False
            self._consecutive_echo_matches = 0
            self._append_io_event('input', command_text + '\n', timestamp=self.last_input_time)

            if self.broadcast:
                self.broadcast('terminal_input', {
                    'session': self.session_name,
                    'data': command_text + '\n',
                    'timestamp': time.time()
                })

            to_send = command if command.endswith('\n') else command + '\n'
            self.process.stdin.write(self._encode_input(to_send))
            self.process.stdin.flush()

            if not wait_for_output:
                return {
                    "success": True,
                    "session": self.session_name,
                    "command": command_text,
                    "output": "",
                    "message": "命令已发送至终端,后续输出将实时流式返回",
                    "status": "pending",
                    "truncated": False
                }

            output = self._wait_for_output(timeout=timeout)
            # Prefer the history view: it includes every line recorded since
            # the marker, in order.
            recent_output = self._get_output_since_marker(marker)
            if recent_output:
                output = recent_output

            output_truncated = len(output) > TERMINAL_INPUT_MAX_CHARS
            if output_truncated:
                output = output[-TERMINAL_INPUT_MAX_CHARS:]

            status = self._classify_status(bool(output.strip()))
            message = self._STATUS_MESSAGES.get(status, "命令执行完成")
            if output_truncated:
                message += f"(输出已截断,保留末尾{TERMINAL_INPUT_MAX_CHARS}字符)"

            return {
                "success": True,
                "session": self.session_name,
                "command": command_text,
                "output": output,
                "message": message,
                "status": status,
                "truncated": output_truncated
            }

        except Exception as e:
            error_msg = f"发送命令失败: {str(e)}"
            print(f"{OUTPUT_FORMATS['error']} {error_msg}")
            return {
                "success": False,
                "error": error_msg,
                "session": self.session_name
            }

    def _wait_for_output(self, timeout: float = 5) -> str:
        """Collect queued output for at most *timeout* seconds.

        A non-positive (or None) timeout returns whatever is queued right now
        without blocking. Otherwise, once output has started, collection
        stops after about 0.3 s of silence or when the deadline passes.
        """
        collected_output = []

        if timeout is None or timeout <= 0:
            while True:
                try:
                    collected_output.append(self.output_queue.get_nowait())
                except queue.Empty:
                    return ''.join(collected_output)

        start_time = time.time()
        last_output_time = start_time
        while time.time() - start_time < timeout:
            try:
                remaining = max(0.05, min(0.5, timeout - (time.time() - start_time)))
                collected_output.append(self.output_queue.get(timeout=remaining))
                last_output_time = time.time()
                # Keep collecting until the stream is briefly idle, but never
                # past the overall deadline. A continuously printing command
                # would otherwise keep this loop alive forever.
                while time.time() - start_time < timeout:
                    try:
                        collected_output.append(self.output_queue.get(timeout=0.1))
                        last_output_time = time.time()
                    except queue.Empty:
                        break
            except queue.Empty:
                if collected_output and time.time() - last_output_time > 0.3:
                    break

        return ''.join(collected_output)

    def get_output(self, last_n_lines: int = 50) -> str:
        """Return the last *last_n_lines* buffered lines joined together.

        Args:
            last_n_lines: Number of trailing lines; ``<= 0`` returns the whole
                buffer.

        Returns:
            The joined output text.
        """
        # Copy so the reader thread cannot change the buffer mid-read.
        buffer = list(self.output_buffer)
        if last_n_lines <= 0:
            return ''.join(buffer)
        return ''.join(buffer[-last_n_lines:])

    def get_display_output(self) -> str:
        """Return ``get_output()`` cut to its last ``display_size`` characters, with a notice."""
        output = self.get_output()
        if len(output) > self.display_size:
            output = output[-self.display_size:]
            output = f"[输出已截断,显示最后{self.display_size}字符]\n{output}"
        return output

    def get_snapshot(self, last_n_lines: int, max_chars: int) -> Dict:
        """Return recent input and output interleaved in arrival order.

        Args:
            last_n_lines: Number of trailing transcript lines (at least 1).
            max_chars: Maximum characters returned; longer text keeps its
                tail. Non-positive values yield an empty ``output``.

        Returns:
            A dict with the transcript under ``output`` plus line counts, the
            truncation flag and the current status flags.
        """
        if last_n_lines <= 0:
            last_n_lines = 1

        combined_lines: List[str] = []
        # Copy first: the reader thread appends to io_history concurrently.
        for event_type, _, data in list(self.io_history):
            if event_type == 'input':
                # Show each input command on its own line.
                combined_lines.append(data.rstrip() if data.strip() else "")
            else:
                # Each output event is one line from the reader thread. The
                # '\n'.join below restores line breaks, so no extra '' is
                # appended for a trailing newline (that double-spaced the
                # transcript). Genuine blank lines are kept.
                segments = data.replace('\r', '').splitlines()
                combined_lines.extend(segments if segments else [''])

        output_text = '\n'.join(combined_lines[-last_n_lines:])

        max_chars = max(max_chars, 0)
        truncated = len(output_text) > max_chars
        if truncated:
            # Explicit start index: output_text[-0:] would return everything.
            output_text = output_text[len(output_text) - max_chars:]

        if output_text:
            lines_returned = output_text.count('\n') + (0 if output_text.endswith('\n') else 1)
        else:
            lines_returned = 0

        return {
            "success": True,
            "session": self.session_name,
            "output": output_text,
            "lines_requested": last_n_lines,
            "lines_returned": lines_returned,
            "truncated": truncated,
            "is_interactive": self.is_interactive,
            "echo_loop_detected": self.echo_loop_detected,
            "seconds_since_last_output": self._seconds_since_last_output(),
            "last_command": self.last_command,
            "buffer_size": self.total_output_size,
            "timestamp": datetime.now().isoformat()
        }

    def get_status(self) -> Dict:
        """Return a summary dict of the session's current state."""
        return {
            "session_name": self.session_name,
            "is_running": self.is_running,
            "working_dir": str(self.working_dir),
            "shell": self.shell_command,
            "start_time": self.start_time.isoformat() if self.start_time else None,
            "is_interactive": self.is_interactive,
            "last_command": self.last_command,
            "command_count": len(self.command_history),
            "buffer_size": self.total_output_size,
            "truncated_lines": self.truncated_lines,
            "last_activity": datetime.fromtimestamp(self.last_activity).isoformat(),
            "uptime_seconds": (datetime.now() - self.start_time).total_seconds() if self.start_time else 0,
            "seconds_since_last_output": self._seconds_since_last_output(),
            "echo_loop_detected": self.echo_loop_detected
        }

    def _close_pipes(self) -> None:
        """Close the process pipes on a best-effort basis.

        stdout is closed only once the reader thread has exited, so a blocked
        read is never raced.
        """
        if not self.process:
            return
        pipes = [self.process.stdin]
        if not (self.reader_thread and self.reader_thread.is_alive()):
            pipes.append(self.process.stdout)
        for pipe in pipes:
            if pipe is None:
                continue
            try:
                pipe.close()
            except OSError:
                # e.g. flushing stdin into a pipe whose reader already exited.
                pass

    def close(self) -> bool:
        """Shut the shell down and stop the reader thread.

        Sends ``exit`` and waits up to 2 s. If the shell is still alive, it
        escalates to ``terminate()`` and then ``kill()``. Afterwards the reader
        thread is joined (up to 1 s), the pipes are closed and a
        ``terminal_closed`` event is broadcast.

        Returns:
            True if the terminal was running and is now closed; False if it
            was not running or an error occurred (the error is printed).
        """
        if not self.is_running:
            return False

        try:
            self.is_reading = False

            if self.process and self.process.poll() is None:
                try:
                    self.process.stdin.write("exit\n".encode('utf-8'))
                    self.process.stdin.flush()
                except (OSError, ValueError):
                    # Broken or already-closed stdin: fall through to wait/kill.
                    pass

                try:
                    self.process.wait(timeout=2)
                except subprocess.TimeoutExpired:
                    self.process.terminate()
                    time.sleep(0.5)
                    if self.process.poll() is None:
                        self.process.kill()
                        try:
                            # Reap the killed process so it is not left unreaped.
                            self.process.wait(timeout=1)
                        except subprocess.TimeoutExpired:
                            pass

            self.is_running = False

            if self.reader_thread and self.reader_thread.is_alive():
                self.reader_thread.join(timeout=1)

            self._close_pipes()

            if self.broadcast:
                self.broadcast('terminal_closed', {
                    'session': self.session_name,
                    'time': datetime.now().isoformat()
                })

            print(f"{OUTPUT_FORMATS['info']} 终端会话关闭: {self.session_name}")
            return True

        except Exception as e:
            print(f"{OUTPUT_FORMATS['error']} 关闭终端失败: {e}")
            return False

    def __del__(self):
        """Best-effort cleanup so a discarded terminal does not leave its shell running."""
        try:
            if getattr(self, 'is_running', False):
                self.close()
        except Exception:
            # Finalizers may run during interpreter shutdown, when globals
            # used by close() may already be gone; never raise from here.
            pass