agent-Specialization/modules/persistent_terminal.py

975 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# modules/persistent_terminal.py - 持久化终端实例(修复版)
import asyncio
import subprocess
import os
import sys
import time
from pathlib import Path
from typing import Optional, Callable, Dict, List
from datetime import datetime
import threading
import queue
from collections import deque
import shutil
import uuid
try:
from config import (
OUTPUT_FORMATS,
TERMINAL_OUTPUT_WAIT,
TERMINAL_INPUT_MAX_CHARS,
TERMINAL_SANDBOX_MODE,
TERMINAL_SANDBOX_IMAGE,
TERMINAL_SANDBOX_MOUNT_PATH,
TERMINAL_SANDBOX_SHELL,
TERMINAL_SANDBOX_NETWORK,
TERMINAL_SANDBOX_CPUS,
TERMINAL_SANDBOX_MEMORY,
TERMINAL_SANDBOX_BINDS,
TERMINAL_SANDBOX_BIN,
TERMINAL_SANDBOX_NAME_PREFIX,
TERMINAL_SANDBOX_ENV,
TERMINAL_SANDBOX_REQUIRE,
)
except ImportError:
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import (
OUTPUT_FORMATS,
TERMINAL_OUTPUT_WAIT,
TERMINAL_INPUT_MAX_CHARS,
TERMINAL_SANDBOX_MODE,
TERMINAL_SANDBOX_IMAGE,
TERMINAL_SANDBOX_MOUNT_PATH,
TERMINAL_SANDBOX_SHELL,
TERMINAL_SANDBOX_NETWORK,
TERMINAL_SANDBOX_CPUS,
TERMINAL_SANDBOX_MEMORY,
TERMINAL_SANDBOX_BINDS,
TERMINAL_SANDBOX_BIN,
TERMINAL_SANDBOX_NAME_PREFIX,
TERMINAL_SANDBOX_ENV,
TERMINAL_SANDBOX_REQUIRE,
)
class PersistentTerminal:
"""单个持久化终端实例"""
def __init__(
self,
session_name: str,
working_dir: str = None,
shell_command: str = None,
broadcast_callback: Callable = None,
max_buffer_size: int = 20000,
display_size: int = 5000,
project_path: Optional[str] = None,
sandbox_mode: Optional[str] = None,
sandbox_options: Optional[Dict] = None,
):
"""
初始化持久化终端
Args:
session_name: 会话名称
working_dir: 工作目录
shell_command: shell命令None则自动选择
broadcast_callback: 广播回调函数用于WebSocket
max_buffer_size: 最大缓冲区大小
display_size: 显示大小限制
"""
self.session_name = session_name
self.working_dir = Path(working_dir).resolve() if working_dir else Path.cwd()
self.project_path = Path(project_path).resolve() if project_path else self.working_dir
self.host_shell_command = shell_command
self.shell_command = shell_command
self.broadcast = broadcast_callback
self.max_buffer_size = max_buffer_size
self.display_size = display_size
# 进程相关
self.process = None
self.is_running = False
self.start_time = None
# 输出缓冲
self.output_buffer = []
self.command_history = []
self.total_output_size = 0
self.truncated_lines = 0
self.output_history = deque()
self._output_event_counter = 0
self.last_output_time = None
self.last_input_time = None
self.last_input_text = ""
self.echo_loop_detected = False
self._consecutive_echo_matches = 0
self.io_history = deque()
self._io_history_max = 4000
# 线程和队列
self.output_queue = queue.Queue()
self.reader_thread = None
self.is_reading = False
# 状态标志
self.is_interactive = False # 是否在等待输入
self.last_command = ""
self.last_activity = time.time()
# 系统特定设置
self.is_windows = sys.platform == "win32"
sandbox_defaults = {
"image": TERMINAL_SANDBOX_IMAGE,
"mount_path": TERMINAL_SANDBOX_MOUNT_PATH,
"shell": TERMINAL_SANDBOX_SHELL,
"network": TERMINAL_SANDBOX_NETWORK,
"cpus": TERMINAL_SANDBOX_CPUS,
"memory": TERMINAL_SANDBOX_MEMORY,
"binds": list(TERMINAL_SANDBOX_BINDS),
"bin": TERMINAL_SANDBOX_BIN,
"name_prefix": TERMINAL_SANDBOX_NAME_PREFIX,
"env": dict(TERMINAL_SANDBOX_ENV),
"require": TERMINAL_SANDBOX_REQUIRE,
}
if sandbox_options:
for key, value in sandbox_options.items():
if key == "binds" and isinstance(value, list):
sandbox_defaults[key] = list(value)
elif key == "env" and isinstance(value, dict):
sandbox_defaults[key] = dict(value)
else:
sandbox_defaults[key] = value
self.sandbox_mode = (sandbox_mode or TERMINAL_SANDBOX_MODE or "host").lower()
self.sandbox_options = sandbox_defaults
self.sandbox_required = bool(self.sandbox_options.get("require"))
self.sandbox_container_name = None
self.execution_mode = "host"
self.using_container = False
self._sandbox_bin_path = None
self._owns_container = False
def start(self) -> bool:
"""启动终端进程(支持容器沙箱)"""
if self.is_running:
return False
try:
process = None
selected_mode = self.sandbox_mode
if selected_mode == "docker":
try:
process = self._start_docker_terminal()
except Exception as exc:
message = f"容器终端启动失败: {exc}"
if self.sandbox_required:
print(f"{OUTPUT_FORMATS['error']} {message}")
return False
print(f"{OUTPUT_FORMATS['warning']} {message},回退到宿主机终端。")
process = None
selected_mode = "host"
if process is None:
process = self._start_host_terminal()
selected_mode = "host"
if not process:
return False
self.process = process
self.is_running = True
self.execution_mode = "docker" if self.using_container else "host"
self.start_time = datetime.now()
self.last_output_time = None
self.last_input_time = None
self.last_input_text = ""
self.echo_loop_detected = False
self._consecutive_echo_matches = 0
# 启动输出读取线程
self.is_reading = True
self.reader_thread = threading.Thread(target=self._read_output)
self.reader_thread.daemon = True
self.reader_thread.start()
# 宿主机Windows初始化
if self.is_windows and not self.using_container:
time.sleep(0.5)
self.send_command("chcp 65001", wait_for_output=False)
time.sleep(0.5)
self.send_command("cls", wait_for_output=False)
time.sleep(0.3)
self.output_buffer.clear()
self.total_output_size = 0
# 广播终端启动事件
if self.broadcast:
self.broadcast('terminal_started', {
'session': self.session_name,
'working_dir': str(self.working_dir),
'shell': self.shell_command,
'mode': self.execution_mode,
'time': self.start_time.isoformat()
})
mode_label = "容器" if self.using_container else "宿主机"
print(f"{OUTPUT_FORMATS['success']} 终端会话启动({mode_label}): {self.session_name}")
return True
except Exception as e:
print(f"{OUTPUT_FORMATS['error']} 终端启动失败: {e}")
self.is_running = False
if self.using_container:
self._stop_sandbox_container(force=True)
return False
def _start_host_terminal(self):
"""启动宿主机终端"""
self.using_container = False
self._owns_container = False
self.is_windows = sys.platform == "win32"
shell_cmd = self.host_shell_command
if self.is_windows:
shell_cmd = shell_cmd or "cmd.exe"
else:
shell_cmd = shell_cmd or os.environ.get('SHELL', '/bin/bash')
self.shell_command = shell_cmd
env = os.environ.copy()
env['PYTHONIOENCODING'] = 'utf-8'
try:
if self.is_windows:
env['CHCP'] = '65001'
process = subprocess.Popen(
shell_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=str(self.working_dir),
shell=False,
bufsize=0,
env=env
)
else:
env['TERM'] = 'xterm-256color'
env['LANG'] = 'en_US.UTF-8'
env['LC_ALL'] = 'en_US.UTF-8'
process = subprocess.Popen(
shell_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=str(self.working_dir),
shell=False,
bufsize=0,
env=env
)
return process
except FileNotFoundError:
print(f"{OUTPUT_FORMATS['error']} 无法找到终端程序: {shell_cmd}")
return None
def _start_docker_terminal(self):
"""启动或连接容器化终端。"""
docker_bin = self.sandbox_options.get("bin") or "docker"
docker_path = shutil.which(docker_bin)
if not docker_path:
message = f"未找到容器运行时: {docker_bin}"
if self.sandbox_required:
raise RuntimeError(message)
print(f"{OUTPUT_FORMATS['warning']} {message}")
return None
self._sandbox_bin_path = docker_path
target_container = self.sandbox_options.get("container_name")
if target_container:
return self._start_existing_container_terminal(docker_path, target_container)
return self._start_new_container_terminal(docker_path)
def _start_existing_container_terminal(self, docker_path: str, container_name: str):
"""通过 docker exec 连接到已有容器。"""
if not self._ensure_container_alive(docker_path, container_name):
raise RuntimeError(f"目标容器未运行: {container_name}")
mount_path = self.sandbox_options.get("mount_path") or "/workspace"
container_workdir = self._resolve_container_workdir(mount_path)
shell_path = self.sandbox_options.get("shell") or "/bin/bash"
cmd = [
docker_path,
"exec",
"-i",
]
if container_workdir:
cmd += ["-w", container_workdir]
envs = {
"PYTHONIOENCODING": "utf-8",
"TERM": "xterm-256color",
}
for key, value in (self.sandbox_options.get("env") or {}).items():
if value is not None:
envs[key] = value
for key, value in envs.items():
cmd += ["-e", f"{key}={value}"]
cmd.append(container_name)
cmd.append(shell_path)
if shell_path.endswith("sh"):
cmd.append("-i")
env = os.environ.copy()
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0,
env=env
)
self.sandbox_container_name = container_name
self.shell_command = f"{shell_path} (attach:{container_name})"
self.using_container = True
self.is_windows = False
self._owns_container = False
return process
def _start_new_container_terminal(self, docker_path: str):
"""启动全新的容器终端。"""
image = self.sandbox_options.get("image")
if not image:
raise RuntimeError("TERMINAL_SANDBOX_IMAGE 未配置")
mount_path = self.sandbox_options.get("mount_path") or "/workspace"
working_dir = str(self.working_dir)
if not self.working_dir.exists():
self.working_dir.mkdir(parents=True, exist_ok=True)
container_name = f"{self.sandbox_options.get('name_prefix', 'agent-term')}-{uuid.uuid4().hex[:10]}"
cmd = [
docker_path,
"run",
"--rm",
"-i",
"--name",
container_name,
"-w",
mount_path,
"-v",
f"{working_dir}:{mount_path}",
]
network = self.sandbox_options.get("network")
if network:
cmd += ["--network", network]
cpus = self.sandbox_options.get("cpus")
if cpus:
cmd += ["--cpus", cpus]
memory = self.sandbox_options.get("memory")
if memory:
cmd += ["--memory", memory]
for bind in self.sandbox_options.get("binds", []):
bind = bind.strip()
if bind:
cmd += ["-v", bind]
envs = {
"PYTHONIOENCODING": "utf-8",
"TERM": "xterm-256color",
}
for key, value in (self.sandbox_options.get("env") or {}).items():
if value is not None:
envs[key] = value
for key, value in envs.items():
cmd += ["-e", f"{key}={value}"]
cmd.append(image)
shell_path = self.sandbox_options.get("shell") or "/bin/bash"
if shell_path:
cmd.append(shell_path)
if shell_path.endswith("sh"):
cmd.append("-i")
env = os.environ.copy()
try:
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0,
env=env
)
except FileNotFoundError:
message = f"无法执行容器运行时: {docker_path}"
if self.sandbox_required:
raise RuntimeError(message)
print(f"{OUTPUT_FORMATS['warning']} {message}")
return None
self.sandbox_container_name = container_name
self.shell_command = f"{shell_path} (sandbox:{image})"
self.using_container = True
self.is_windows = False
self._owns_container = True
return process
def _resolve_container_workdir(self, mount_path: str) -> str:
"""推导容器内工作目录路径。"""
mount_path = (mount_path or "/workspace").rstrip("/") or "/workspace"
try:
relative = self.working_dir.relative_to(self.project_path)
if str(relative) == ".":
return mount_path
return f"{mount_path}/{relative.as_posix()}"
except Exception:
return mount_path
def _ensure_container_alive(self, docker_path: str, container_name: str) -> bool:
"""确认目标容器正在运行。"""
try:
result = subprocess.run(
[
docker_path,
"inspect",
"-f",
"{{.State.Running}}",
container_name,
],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
timeout=3,
check=False,
)
except (OSError, subprocess.SubprocessError):
return False
return result.returncode == 0 and result.stdout.strip().lower() == "true"
def _read_output(self):
"""后台线程:持续读取输出(修复版,正确处理编码)"""
while self.is_reading and self.process:
try:
# 始终读取字节因为我们没有使用text=True
line_bytes = self.process.stdout.readline()
if line_bytes:
# 解码字节到字符串
line = self._decode_output(line_bytes)
# 处理输出
self.output_queue.put(line)
self._process_output(line)
elif self.process.poll() is not None:
# 进程已结束
self.is_running = False
break
else:
# 没有输出,短暂休眠
time.sleep(0.01)
except Exception as e:
# 不要因为单个错误而停止
print(f"[Terminal] 读取输出警告: {e}")
time.sleep(0.01)
continue
def _decode_output(self, data):
"""安全地解码输出"""
# 如果已经是字符串,直接返回
if isinstance(data, str):
return data
# 如果是字节,尝试解码
if isinstance(data, bytes):
# Windows系统尝试的编码顺序
if self.is_windows:
encodings = ['utf-8', 'gbk', 'gb2312', 'cp936', 'latin-1']
else:
encodings = ['utf-8', 'latin-1']
for encoding in encodings:
try:
return data.decode(encoding)
except (UnicodeDecodeError, AttributeError):
continue
# 如果所有编码都失败,使用替换模式
return data.decode('utf-8', errors='replace')
# 其他类型,转换为字符串
return str(data)
def _process_output(self, output: str):
"""处理输出行"""
# 添加到缓冲区
self.output_buffer.append(output)
self.total_output_size += len(output)
now = time.time()
self.last_output_time = now
# 记录输出事件
self._output_event_counter += 1
self.output_history.append((self._output_event_counter, now, output))
self._append_io_event('output', output, timestamp=now)
# 控制输出历史长度
if len(self.output_history) > 2000:
self.output_history.popleft()
# 检查是否需要截断
if self.total_output_size > self.max_buffer_size:
self._truncate_buffer()
# 更新活动时间
self.last_activity = now
# 检测命令回显死循环
cleaned_output = output.replace('\r', '').strip()
cleaned_input = self.last_input_text.strip() if self.last_input_text else ""
if cleaned_output and cleaned_input and cleaned_output == cleaned_input:
self._consecutive_echo_matches += 1
else:
self._consecutive_echo_matches = 0
if cleaned_output:
self.echo_loop_detected = False
if self._consecutive_echo_matches >= 1 and self.last_input_time:
if now - self.last_input_time <= 2:
self.echo_loop_detected = True
# 检测交互式提示
self._detect_interactive_prompt(output)
# 广播输出
if self.broadcast:
self.broadcast('terminal_output', {
'session': self.session_name,
'data': output,
'timestamp': time.time()
})
def _truncate_buffer(self):
"""截断缓冲区以保持在限制内"""
# 保留最后的N个字符
while self.total_output_size > self.max_buffer_size and self.output_buffer:
removed = self.output_buffer.pop(0)
self.total_output_size -= len(removed)
self.truncated_lines += 1
if self.output_history:
self.output_history.popleft()
def _detect_interactive_prompt(self, output: str):
"""检测是否在等待交互输入"""
self.is_interactive = False
# 常见的交互提示模式
interactive_patterns = [
"? ", # 问题提示
": ", # 输入提示
"> ", # 命令提示
"$ ", # shell提示
"# ", # root提示
">>> ", # Python提示
"... ", # Python续行
"(y/n)", # 确认提示
"[Y/n]", # 确认提示
"Password:", # 密码提示
"password:", # 密码提示
"Enter", # 输入提示
"选择", # 中文选择
"请输入", # 中文输入
]
output_lower = output.lower().strip()
for pattern in interactive_patterns:
if pattern.lower() in output_lower:
self.is_interactive = True
return
# 如果输出以常见提示符结尾且没有换行,也认为是交互式
if output and not output.endswith('\n'):
last_chars = output.strip()[-3:]
if last_chars in ['> ', '$ ', '# ', ': ']:
self.is_interactive = True
def _capture_history_marker(self) -> int:
return self._output_event_counter
def _get_output_since_marker(self, marker: int) -> str:
if marker is None:
return ''.join(item[2] for item in self.output_history)
return ''.join(item[2] for item in self.output_history if item[0] > marker)
def _append_io_event(self, event_type: str, data: str, timestamp: Optional[float] = None):
"""记录终端输入输出事件"""
if timestamp is None:
timestamp = time.time()
self.io_history.append((event_type, timestamp, data))
while len(self.io_history) > self._io_history_max:
self.io_history.popleft()
def _seconds_since_last_output(self) -> Optional[float]:
if not self.last_output_time:
return None
return round(time.time() - self.last_output_time, 3)
def send_command(self, command: str, wait_for_output: bool = True, timeout: float = None) -> Dict:
"""发送命令到终端(统一编码处理)"""
if not self.is_running or not self.process:
return {
"success": False,
"error": "终端未运行",
"session": self.session_name
}
try:
marker = self._capture_history_marker()
if timeout is None:
timeout = TERMINAL_OUTPUT_WAIT
else:
try:
timeout = float(timeout)
except (TypeError, ValueError):
timeout = TERMINAL_OUTPUT_WAIT
if timeout < 0:
timeout = 0
start_time = time.time()
command_text = command.rstrip('\n')
# 记录命令
self.command_history.append({
"command": command_text,
"timestamp": datetime.now().isoformat()
})
self.last_command = command_text
self.is_interactive = False
self.last_input_text = command_text
self.last_input_time = time.time()
self.echo_loop_detected = False
self._consecutive_echo_matches = 0
self._append_io_event('input', command_text + '\n', timestamp=self.last_input_time)
# 广播输入事件
if self.broadcast:
self.broadcast('terminal_input', {
'session': self.session_name,
'data': command_text + '\n',
'timestamp': time.time()
})
# 确保命令有换行符
to_send = command if command.endswith('\n') else command + '\n'
# 发送命令统一使用UTF-8编码
try:
# 首先尝试UTF-8
command_bytes = to_send.encode('utf-8')
except UnicodeEncodeError:
# 如果UTF-8失败Windows系统尝试GBK
if self.is_windows:
command_bytes = to_send.encode('gbk', errors='replace')
else:
command_bytes = to_send.encode('utf-8', errors='replace')
self.process.stdin.write(command_bytes)
self.process.stdin.flush()
# 如果需要等待输出
if wait_for_output:
output = self._wait_for_output(timeout=timeout)
recent_output = self._get_output_since_marker(marker)
if recent_output:
output = recent_output
output_truncated = False
if len(output) > TERMINAL_INPUT_MAX_CHARS:
output = output[-TERMINAL_INPUT_MAX_CHARS:]
output_truncated = True
output_clean = output.strip()
has_output = bool(output_clean)
status = "completed"
if not has_output:
if self.echo_loop_detected:
status = "echo_loop"
elif self.is_interactive:
status = "awaiting_input"
else:
status = "no_output"
else:
if self.echo_loop_detected:
status = "output_with_echo"
message_map = {
"completed": "命令执行完成,已捕获终端输出",
"no_output": "未捕获任何输出,命令可能未产生可见结果或终端已卡死需要重制",
"awaiting_input": "命令已发送,终端正在等待进一步输入或进程仍在运行",
"echo_loop": "检测到终端正在回显输入,命令可能未成功执行",
"output_with_echo": "命令产生输出,但终端疑似重复回显,请检查是否卡住"
}
message = message_map.get(status, "命令执行完成")
if output_truncated:
message += f"(输出已截断,保留末尾{TERMINAL_INPUT_MAX_CHARS}字符)"
return {
"success": True,
"session": self.session_name,
"command": command_text,
"output": output,
"message": message,
"status": status,
"truncated": output_truncated
}
else:
return {
"success": True,
"session": self.session_name,
"command": command_text,
"output": "",
"message": "命令已发送至终端,后续输出将实时流式返回",
"status": "pending",
"truncated": False
}
except Exception as e:
error_msg = f"发送命令失败: {str(e)}"
print(f"{OUTPUT_FORMATS['error']} {error_msg}")
return {
"success": False,
"error": error_msg,
"session": self.session_name
}
def _wait_for_output(self, timeout: float = 5) -> str:
"""等待并收集输出"""
collected_output = []
start_time = time.time()
last_output_time = time.time()
if timeout is None or timeout <= 0:
timeout = 0
if timeout == 0:
try:
while True:
output = self.output_queue.get_nowait()
collected_output.append(output)
except queue.Empty:
return ''.join(collected_output)
while time.time() - start_time < timeout:
try:
remaining = max(0.05, min(0.5, timeout - (time.time() - start_time)))
output = self.output_queue.get(timeout=remaining)
collected_output.append(output)
last_output_time = time.time()
# 快速收集剩余输出,直到短暂空闲
while True:
try:
output = self.output_queue.get(timeout=0.1)
collected_output.append(output)
last_output_time = time.time()
except queue.Empty:
break
except queue.Empty:
if collected_output and time.time() - last_output_time > 0.3:
break
if timeout == 0:
break
return ''.join(collected_output)
def get_output(self, last_n_lines: int = 50) -> str:
"""
获取终端输出
Args:
last_n_lines: 获取最后N行
Returns:
输出内容
"""
if last_n_lines <= 0:
return ''.join(self.output_buffer)
# 获取最后N行
lines = []
for line in reversed(self.output_buffer):
lines.insert(0, line)
if len(lines) >= last_n_lines:
break
return ''.join(lines)
def get_display_output(self) -> str:
"""获取用于显示的输出截断到display_size"""
output = self.get_output()
if len(output) > self.display_size:
# 保留最后的display_size字符
output = output[-self.display_size:]
output = f"[输出已截断,显示最后{self.display_size}字符]\n{output}"
return output
def get_snapshot(self, last_n_lines: Optional[int], max_chars: Optional[int]) -> Dict:
"""获取终端快照,包含按顺序排列的输入/输出"""
include_all_lines = last_n_lines is None or last_n_lines <= 0
if not include_all_lines:
last_n_lines = max(1, last_n_lines)
segments: List[str] = []
reset = "\x1b[0m"
command_color = "\x1b[1;32m"
for event_type, _, data in self.io_history:
if event_type == 'input':
# 显示输入命令并加上ANSI颜色保持与实时终端一致
input_text = data.rstrip('\n')
decorated = f"{command_color}{input_text}{reset}" if input_text.strip() else f"{command_color}{reset}"
if not decorated.endswith('\n'):
decorated += '\n'
segments.append(decorated)
else:
cleaned = data.replace('\r', '')
segments.append(cleaned)
full_text = ''.join(segments)
if full_text:
line_chunks = full_text.splitlines(keepends=True)
total_lines = len(line_chunks)
if include_all_lines:
selected_lines = line_chunks
lines_requested = total_lines
else:
count = min(last_n_lines, total_lines)
selected_lines = line_chunks[-count:] if count else []
lines_requested = last_n_lines
output_text = ''.join(selected_lines)
else:
output_text = ''
lines_requested = 0
total_lines = 0
truncated = False
if max_chars is not None and max_chars > 0 and len(output_text) > max_chars:
output_text = output_text[-max_chars:]
truncated = True
# 统计行数
if output_text:
lines_returned = output_text.count('\n') + (0 if output_text.endswith('\n') else 1)
else:
lines_returned = 0
if include_all_lines:
lines_requested = total_lines
return {
"success": True,
"session": self.session_name,
"output": output_text,
"lines_requested": lines_requested,
"lines_returned": lines_returned,
"truncated": truncated,
"is_interactive": self.is_interactive,
"echo_loop_detected": self.echo_loop_detected,
"seconds_since_last_output": self._seconds_since_last_output(),
"last_command": self.last_command,
"buffer_size": self.total_output_size,
"timestamp": datetime.now().isoformat()
}
def get_status(self) -> Dict:
"""获取终端状态"""
return {
"session_name": self.session_name,
"is_running": self.is_running,
"working_dir": str(self.working_dir),
"shell": self.shell_command,
"execution_mode": self.execution_mode,
"start_time": self.start_time.isoformat() if self.start_time else None,
"is_interactive": self.is_interactive,
"last_command": self.last_command,
"command_count": len(self.command_history),
"buffer_size": self.total_output_size,
"truncated_lines": self.truncated_lines,
"last_activity": datetime.fromtimestamp(self.last_activity).isoformat(),
"uptime_seconds": (datetime.now() - self.start_time).total_seconds() if self.start_time else 0,
"seconds_since_last_output": self._seconds_since_last_output(),
"echo_loop_detected": self.echo_loop_detected
}
def close(self) -> bool:
"""关闭终端"""
if not self.is_running:
return False
try:
# 停止读取线程
self.is_reading = False
# 发送退出命令
if self.process and self.process.poll() is None:
exit_cmd = "exit\n"
try:
self.process.stdin.write(exit_cmd.encode('utf-8'))
self.process.stdin.flush()
except:
pass
# 等待进程结束
try:
self.process.wait(timeout=2)
except subprocess.TimeoutExpired:
# 强制终止
self.process.terminate()
time.sleep(0.5)
if self.process.poll() is None:
self.process.kill()
self.is_running = False
# 等待读取线程结束
if self.reader_thread and self.reader_thread.is_alive():
self.reader_thread.join(timeout=1)
# 广播终端关闭事件
if self.broadcast:
self.broadcast('terminal_closed', {
'session': self.session_name,
'time': datetime.now().isoformat()
})
if self.using_container:
self._stop_sandbox_container()
self.using_container = False
self.execution_mode = "host"
print(f"{OUTPUT_FORMATS['info']} 终端会话关闭: {self.session_name}")
return True
except Exception as e:
print(f"{OUTPUT_FORMATS['error']} 关闭终端失败: {e}")
return False
def __del__(self):
"""析构函数,确保进程被关闭"""
if hasattr(self, 'is_running') and self.is_running:
self.close()
def _stop_sandbox_container(self, force: bool = False):
"""确保容器终端被停止"""
if not self._owns_container or not self.sandbox_container_name or not self._sandbox_bin_path:
return
try:
subprocess.run(
[self._sandbox_bin_path, "kill", self.sandbox_container_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=3,
check=False
)
except Exception:
if force:
print(f"{OUTPUT_FORMATS['warning']} 强制终止容器 {self.sandbox_container_name} 失败,可能已退出。")
finally:
self.sandbox_container_name = None
self._owns_container = False