336 lines
11 KiB
Python
336 lines
11 KiB
Python
# modules/gui_file_manager.py - GUI 文件管理专用服务
|
|
|
|
import os
|
|
import shutil
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
@dataclass
|
|
class FileEntry:
|
|
"""用于前端显示的文件/目录条目"""
|
|
|
|
name: str
|
|
path: str
|
|
type: str # file / directory
|
|
size: int
|
|
modified_at: float
|
|
extension: Optional[str]
|
|
is_editable: bool
|
|
|
|
|
|
class GuiFileManager:
|
|
"""面向 GUI 的文件管理器,实现桌面式操作所需的能力"""
|
|
|
|
EDITABLE_EXTENSIONS = {
|
|
".txt",
|
|
".md",
|
|
".py",
|
|
".js",
|
|
".ts",
|
|
".json",
|
|
".yaml",
|
|
".yml",
|
|
".html",
|
|
".css",
|
|
".scss",
|
|
".less",
|
|
".xml",
|
|
".csv",
|
|
".ini",
|
|
".cfg",
|
|
".toml",
|
|
".sh",
|
|
".bat",
|
|
".java",
|
|
".kt",
|
|
".go",
|
|
".rs",
|
|
".c",
|
|
".cpp",
|
|
".h",
|
|
".hpp",
|
|
".vue",
|
|
".svelte",
|
|
".php",
|
|
".rb",
|
|
".swift",
|
|
".dart",
|
|
".sql",
|
|
}
|
|
|
|
MAX_TEXT_FILE_SIZE = 2 * 1024 * 1024 # 2MB
|
|
|
|
def __init__(self, base_path: str):
|
|
self.base_path = Path(base_path).expanduser().resolve()
|
|
if not self.base_path.exists():
|
|
raise ValueError("Base path does not exist")
|
|
|
|
# -------------------------
|
|
# 路径解析与安全检查
|
|
# -------------------------
|
|
|
|
def _resolve(self, relative: Optional[str]) -> Path:
|
|
relative = (relative or "").strip()
|
|
target = (self.base_path if not relative else (self.base_path / relative)).resolve()
|
|
try:
|
|
target.relative_to(self.base_path)
|
|
except ValueError:
|
|
raise ValueError("路径越界")
|
|
return target
|
|
|
|
def _to_relative(self, absolute: Path) -> str:
|
|
return str(absolute.relative_to(self.base_path)).replace("\\", "/")
|
|
|
|
def _unique_name(self, directory: Path, name: str) -> Path:
|
|
candidate = directory / name
|
|
if not candidate.exists():
|
|
return candidate
|
|
|
|
stem = candidate.stem
|
|
suffix = candidate.suffix
|
|
counter = 1
|
|
while candidate.exists():
|
|
candidate = directory / f"{stem}_copy{counter if counter > 1 else ''}{suffix}"
|
|
counter += 1
|
|
return candidate
|
|
|
|
def _check_editable(self, path: Path) -> bool:
|
|
if not path.is_file():
|
|
return False
|
|
if path.stat().st_size > self.MAX_TEXT_FILE_SIZE:
|
|
return False
|
|
ext = path.suffix.lower()
|
|
if ext in self.EDITABLE_EXTENSIONS:
|
|
return True
|
|
# 无扩展名的文本文件尝试 UTF-8 读取判断
|
|
if not ext:
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as fh:
|
|
fh.read(2048)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
return False
|
|
|
|
# -------------------------
|
|
# 列表与元数据
|
|
# -------------------------
|
|
|
|
def list_directory(self, relative: Optional[str] = None) -> Tuple[str, List[FileEntry]]:
|
|
directory = self._resolve(relative)
|
|
if not directory.exists():
|
|
raise FileNotFoundError("目录不存在")
|
|
if not directory.is_dir():
|
|
raise NotADirectoryError("目标不是目录")
|
|
|
|
entries: List[FileEntry] = []
|
|
for entry in sorted(directory.iterdir(), key=lambda p: (p.is_file(), p.name.lower())):
|
|
stat = entry.stat()
|
|
entries.append(
|
|
FileEntry(
|
|
name=entry.name,
|
|
path=self._to_relative(entry),
|
|
type="directory" if entry.is_dir() else "file",
|
|
size=stat.st_size,
|
|
modified_at=stat.st_mtime,
|
|
extension=entry.suffix.lower() if entry.is_file() else None,
|
|
is_editable=self._check_editable(entry),
|
|
)
|
|
)
|
|
relative_path = "" if directory == self.base_path else self._to_relative(directory)
|
|
return relative_path, entries
|
|
|
|
def breadcrumb(self, relative: Optional[str]) -> List[Dict[str, str]]:
|
|
crumbs: List[Dict[str, str]] = []
|
|
directory = self._resolve(relative)
|
|
try:
|
|
directory.relative_to(self.base_path)
|
|
except ValueError:
|
|
raise ValueError("路径越界")
|
|
|
|
current = directory
|
|
while True:
|
|
relative_path = "" if current == self.base_path else self._to_relative(current)
|
|
crumbs.append({"name": current.name if current != self.base_path else "根目录", "path": relative_path})
|
|
if current == self.base_path:
|
|
break
|
|
current = current.parent
|
|
crumbs.reverse()
|
|
return crumbs
|
|
|
|
# -------------------------
|
|
# 基本操作
|
|
# -------------------------
|
|
|
|
def create_entry(self, parent_relative: Optional[str], name: str, entry_type: str) -> str:
|
|
parent = self._resolve(parent_relative)
|
|
if not parent.exists():
|
|
raise FileNotFoundError("父目录不存在")
|
|
if not parent.is_dir():
|
|
raise NotADirectoryError("父路径不是目录")
|
|
|
|
sanitized = name.strip()
|
|
if not sanitized:
|
|
raise ValueError("名称不能为空")
|
|
|
|
target = parent / sanitized
|
|
if target.exists():
|
|
raise FileExistsError("同名文件或目录已存在")
|
|
|
|
if entry_type == "directory":
|
|
target.mkdir(parents=False, exist_ok=False)
|
|
elif entry_type == "file":
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
target.touch()
|
|
else:
|
|
raise ValueError("不支持的类型")
|
|
|
|
return self._to_relative(target)
|
|
|
|
def delete_entries(self, relative_paths: List[str]) -> Dict[str, str]:
|
|
results: Dict[str, str] = {}
|
|
for rel in relative_paths:
|
|
target = self._resolve(rel)
|
|
if not target.exists():
|
|
results[rel] = "missing"
|
|
continue
|
|
try:
|
|
if target.is_dir():
|
|
shutil.rmtree(target)
|
|
else:
|
|
target.unlink()
|
|
results[rel] = "deleted"
|
|
except Exception as exc:
|
|
results[rel] = f"error: {exc}"
|
|
return results
|
|
|
|
def rename_entry(self, relative_path: str, new_name: str) -> str:
|
|
target = self._resolve(relative_path)
|
|
if not target.exists():
|
|
raise FileNotFoundError("目标不存在")
|
|
|
|
parent = target.parent
|
|
sanitized = new_name.strip()
|
|
if not sanitized:
|
|
raise ValueError("新名称不能为空")
|
|
new_path = parent / sanitized
|
|
if new_path.exists():
|
|
raise FileExistsError("目标名称已存在")
|
|
|
|
target.rename(new_path)
|
|
return self._to_relative(new_path)
|
|
|
|
def copy_entries(self, relative_paths: List[str], destination_relative: str) -> Dict[str, str]:
|
|
destination = self._resolve(destination_relative)
|
|
if not destination.exists() or not destination.is_dir():
|
|
raise NotADirectoryError("目标目录不存在")
|
|
|
|
results: Dict[str, str] = {}
|
|
for rel in relative_paths:
|
|
source = self._resolve(rel)
|
|
if not source.exists():
|
|
results[rel] = "missing"
|
|
continue
|
|
try:
|
|
target = self._unique_name(destination, source.name)
|
|
if source.is_dir():
|
|
shutil.copytree(source, target)
|
|
else:
|
|
shutil.copy2(source, target)
|
|
results[rel] = self._to_relative(target)
|
|
except Exception as exc:
|
|
results[rel] = f"error: {exc}"
|
|
return results
|
|
|
|
def move_entries(self, relative_paths: List[str], destination_relative: str) -> Dict[str, str]:
|
|
destination = self._resolve(destination_relative)
|
|
if not destination.exists() or not destination.is_dir():
|
|
raise NotADirectoryError("目标目录不存在")
|
|
|
|
results: Dict[str, str] = {}
|
|
for rel in relative_paths:
|
|
source = self._resolve(rel)
|
|
if not source.exists():
|
|
results[rel] = "missing"
|
|
continue
|
|
try:
|
|
target = destination / source.name
|
|
if target.exists():
|
|
target = self._unique_name(destination, source.name)
|
|
shutil.move(str(source), str(target))
|
|
results[rel] = self._to_relative(target)
|
|
except Exception as exc:
|
|
results[rel] = f"error: {exc}"
|
|
return results
|
|
|
|
# -------------------------
|
|
# 文本读写
|
|
# -------------------------
|
|
|
|
def read_text(self, relative_path: str) -> Tuple[str, str]:
|
|
target = self._resolve(relative_path)
|
|
if not target.exists():
|
|
raise FileNotFoundError("文件不存在")
|
|
if not target.is_file():
|
|
raise IsADirectoryError("目标是目录")
|
|
|
|
size = target.stat().st_size
|
|
if size > self.MAX_TEXT_FILE_SIZE:
|
|
raise ValueError("文件过大,暂不支持直接编辑")
|
|
|
|
try:
|
|
with open(target, "r", encoding="utf-8") as fh:
|
|
content = fh.read()
|
|
except UnicodeDecodeError as exc:
|
|
raise ValueError(f"文件不是 UTF-8 编码: {exc}") from exc
|
|
|
|
return content, datetime.fromtimestamp(target.stat().st_mtime).isoformat()
|
|
|
|
def write_text(self, relative_path: str, content: str) -> Dict[str, str]:
|
|
target = self._resolve(relative_path)
|
|
if not target.exists():
|
|
raise FileNotFoundError("文件不存在")
|
|
if not target.is_file():
|
|
raise IsADirectoryError("目标是目录")
|
|
|
|
size = len(content.encode("utf-8"))
|
|
if size > self.MAX_TEXT_FILE_SIZE:
|
|
raise ValueError("内容过大,超出限制")
|
|
|
|
with open(target, "w", encoding="utf-8") as fh:
|
|
fh.write(content)
|
|
|
|
stat = target.stat()
|
|
return {
|
|
"path": self._to_relative(target),
|
|
"size": str(stat.st_size),
|
|
"modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
|
|
}
|
|
|
|
# -------------------------
|
|
# 上传与下载
|
|
# -------------------------
|
|
|
|
def prepare_upload(self, destination_relative: Optional[str], filename: str) -> Path:
|
|
destination = self._resolve(destination_relative)
|
|
if not destination.exists():
|
|
destination.mkdir(parents=True, exist_ok=True)
|
|
if not destination.is_dir():
|
|
raise NotADirectoryError("上传目标必须是目录")
|
|
sanitized = filename.strip()
|
|
if not sanitized:
|
|
raise ValueError("文件名不能为空")
|
|
target = destination / sanitized
|
|
target = self._unique_name(destination, target.name) if target.exists() else target
|
|
return target
|
|
|
|
def prepare_download(self, relative_path: str) -> Path:
|
|
target = self._resolve(relative_path)
|
|
if not target.exists():
|
|
raise FileNotFoundError("文件不存在")
|
|
return target
|
|
|