# modules/file_manager.py - 文件管理模块(添加行编辑功能) import os import shutil from pathlib import Path from typing import Optional, Dict, List, Tuple, TYPE_CHECKING from datetime import datetime try: from config import ( MAX_FILE_SIZE, FORBIDDEN_PATHS, FORBIDDEN_ROOT_PATHS, OUTPUT_FORMATS, READ_TOOL_MAX_FILE_SIZE, PROJECT_MAX_STORAGE_BYTES, ) except ImportError: # 兼容全局环境中存在同名包的情况 import sys from pathlib import Path project_root = Path(__file__).resolve().parents[1] if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) from config import ( MAX_FILE_SIZE, FORBIDDEN_PATHS, FORBIDDEN_ROOT_PATHS, OUTPUT_FORMATS, READ_TOOL_MAX_FILE_SIZE, PROJECT_MAX_STORAGE_BYTES, ) from modules.container_file_proxy import ContainerFileProxy from utils.logger import setup_logger if TYPE_CHECKING: from modules.user_container_manager import ContainerHandle # 临时禁用长度检查 DISABLE_LENGTH_CHECK = True logger = setup_logger(__name__) class FileManager: def __init__(self, project_path: str, container_session: Optional["ContainerHandle"] = None): self.project_path = Path(project_path).resolve() self.container_session: Optional["ContainerHandle"] = None self._container_proxy: Optional[ContainerFileProxy] = None self.set_container_session(container_session) def set_container_session(self, container_session: Optional["ContainerHandle"]): self.container_session = container_session if ( container_session and container_session.mode == "docker" and container_session.container_name ): if self._container_proxy is None: self._container_proxy = ContainerFileProxy(container_session) else: self._container_proxy.update_session(container_session) else: self._container_proxy = None def _use_container(self) -> bool: return self._container_proxy is not None and self._container_proxy.is_available() def _container_call(self, action: str, payload: Dict) -> Dict: if not self._use_container(): return { "success": False, "error": "容器未就绪,无法执行文件操作" } return self._container_proxy.run(action, payload) def _get_project_size(self) -> int: """计算项目目录的总大小(字节),遇到异常时记录并抛出。""" total = 0 if not self.project_path.exists(): return 0 for path in self.project_path.rglob('*'): if not path.is_file(): continue try: total += path.stat().st_size except Exception as exc: logger.error( "Failed to stat %s while calculating project size: %s", path, exc, exc_info=True, ) raise return total def _validate_path(self, path: str) -> Tuple[bool, str, Path]: """ 验证路径安全性 Returns: (是否有效, 错误信息, 完整路径) """ project_root = Path(self.project_path).resolve() if project_root != self.project_path: self.project_path = project_root # 不允许绝对路径(除非是在项目内的绝对路径) if path.startswith('/') or path.startswith('\\') or (len(path) > 1 and path[1] == ':'): # 如果是绝对路径,检查是否指向项目内 try: test_path = Path(path).resolve() test_path.relative_to(project_root) # 如果成功,说明绝对路径在项目内,转换为相对路径 path = str(test_path.relative_to(project_root)) except ValueError: return False, "路径必须在项目文件夹内", None # 检查是否包含向上遍历 if ".." in path: return False, "不允许使用../向上遍历", None # 构建完整路径 full_path = (project_root / path).resolve() # 检查是否在项目目录内 try: full_path.relative_to(project_root) except ValueError: return False, "路径必须在项目文件夹内", None # 检查禁止的路径 path_str = str(full_path) for forbidden_root in FORBIDDEN_ROOT_PATHS: if path_str == forbidden_root: return False, f"禁止访问根目录: {forbidden_root}", None for forbidden in FORBIDDEN_PATHS: if path_str.startswith(forbidden + os.sep) or path_str == forbidden: return False, f"禁止访问系统目录: {forbidden}", None return True, "", full_path def _relative_path(self, full_path: Path) -> str: return str(full_path.relative_to(self.project_path)) def create_file(self, path: str, content: str = "", file_type: str = "txt") -> Dict: """创建文件""" valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} # 添加文件扩展名 if not full_path.suffix: full_path = full_path.with_suffix(f".{file_type}") relative_path = self._relative_path(full_path) try: if full_path.parent == self.project_path: return { "success": False, "error": "禁止在项目根目录直接创建文件,请先创建或选择子目录。", "suggestion": "创建文件所属文件夹,在其中创建新文件。" } if self._use_container(): result = self._container_call("create_file", { "path": relative_path, "content": "" }) if result.get("success"): print(f"{OUTPUT_FORMATS['file']} 创建文件: {relative_path}") return result # 创建父目录 full_path.parent.mkdir(parents=True, exist_ok=True) with open(full_path, 'w', encoding='utf-8') as f: f.write("") print(f"{OUTPUT_FORMATS['file']} 创建文件: {relative_path}") return { "success": True, "path": relative_path, "size": 0 } except Exception as e: return {"success": False, "error": str(e)} def delete_file(self, path: str) -> Dict: """删除文件""" valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if not full_path.exists(): return {"success": False, "error": "文件不存在"} if not full_path.is_file(): return {"success": False, "error": "不是文件"} try: relative_path = self._relative_path(full_path) if self._use_container(): result = self._container_call("delete_file", { "path": relative_path }) if result.get("success"): print(f"{OUTPUT_FORMATS['file']} 删除文件: {relative_path}") return result full_path.unlink() print(f"{OUTPUT_FORMATS['file']} 删除文件: {relative_path}") # 删除文件备注(如果存在) # 这需要通过context_manager处理,但file_manager没有直接访问权限 # 所以返回相对路径,让调用者处理备注删除 return { "success": True, "path": relative_path, "action": "deleted" } except Exception as e: return {"success": False, "error": str(e)} def rename_file(self, old_path: str, new_path: str) -> Dict: """重命名文件""" valid_old, error_old, full_old_path = self._validate_path(old_path) if not valid_old: return {"success": False, "error": error_old} valid_new, error_new, full_new_path = self._validate_path(new_path) if not valid_new: return {"success": False, "error": error_new} if not full_old_path.exists(): return {"success": False, "error": "原文件不存在"} if full_new_path.exists(): return {"success": False, "error": "目标文件已存在"} try: old_relative = self._relative_path(full_old_path) new_relative = self._relative_path(full_new_path) if self._use_container(): result = self._container_call("rename_file", { "old_path": old_relative, "new_path": new_relative }) if result.get("success"): print(f"{OUTPUT_FORMATS['file']} 重命名: {old_relative} -> {new_relative}") return result full_old_path.rename(full_new_path) print(f"{OUTPUT_FORMATS['file']} 重命名: {old_relative} -> {new_relative}") return { "success": True, "old_path": old_relative, "new_path": new_relative, "action": "renamed" } except Exception as e: return {"success": False, "error": str(e)} def create_folder(self, path: str) -> Dict: """创建文件夹""" valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if full_path.exists(): return {"success": False, "error": "文件夹已存在"} try: relative_path = self._relative_path(full_path) if self._use_container(): result = self._container_call("create_folder", {"path": relative_path}) if result.get("success"): print(f"{OUTPUT_FORMATS['file']} 创建文件夹: {relative_path}") return result full_path.mkdir(parents=True, exist_ok=True) print(f"{OUTPUT_FORMATS['file']} 创建文件夹: {relative_path}") return {"success": True, "path": relative_path} except Exception as e: return {"success": False, "error": str(e)} def delete_folder(self, path: str) -> Dict: """删除文件夹""" valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if not full_path.exists(): return {"success": False, "error": "文件夹不存在"} if not full_path.is_dir(): return {"success": False, "error": "不是文件夹"} try: relative_path = self._relative_path(full_path) if self._use_container(): result = self._container_call("delete_folder", {"path": relative_path}) if result.get("success"): print(f"{OUTPUT_FORMATS['file']} 删除文件夹: {relative_path}") return result shutil.rmtree(full_path) print(f"{OUTPUT_FORMATS['file']} 删除文件夹: {relative_path}") return {"success": True, "path": relative_path} except Exception as e: return {"success": False, "error": str(e)} def _read_text_lines( self, full_path: Path, *, size_limit: Optional[int] = None, encoding: str = "utf-8", ) -> Dict: """读取UTF-8文本并返回行列表。""" try: file_size = full_path.stat().st_size except FileNotFoundError: return {"success": False, "error": "文件不存在"} if size_limit and file_size > size_limit: return { "success": False, "error": f"文件太大 ({file_size / 1024 / 1024:.2f}MB > {size_limit / 1024 / 1024}MB)" } try: with open(full_path, 'r', encoding=encoding) as f: lines = f.readlines() except UnicodeDecodeError: return { "success": False, "error": "文件不是 UTF-8 文本,无法直接读取,请改用 run_python 解析。" } except Exception as e: return {"success": False, "error": f"读取文件失败: {e}"} content = "".join(lines) return { "success": True, "content": content, "lines": lines, "size": file_size } def read_file(self, path: str) -> Dict: """读取文件内容(兼容旧逻辑,限制为 MAX_FILE_SIZE)。""" valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if not full_path.exists(): return {"success": False, "error": "文件不存在"} if not full_path.is_file(): return {"success": False, "error": "不是文件"} if self._use_container(): relative_path = self._relative_path(full_path) result = self._container_call("read_file", { "path": relative_path, "size_limit": MAX_FILE_SIZE }) return result result = self._read_text_lines(full_path, size_limit=MAX_FILE_SIZE) if not result["success"]: return result relative_path = str(full_path.relative_to(self.project_path)) return { "success": True, "path": relative_path, "content": result["content"], "size": result["size"] } def read_text_segment( self, path: str, *, start_line: Optional[int] = None, end_line: Optional[int] = None, size_limit: Optional[int] = None ) -> Dict: """按行范围读取文本片段。""" valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if not full_path.exists(): return {"success": False, "error": "文件不存在"} if not full_path.is_file(): return {"success": False, "error": "不是文件"} if self._use_container(): relative_path = self._relative_path(full_path) result = self._container_call("read_text_segment", { "path": relative_path, "start_line": start_line, "end_line": end_line, "size_limit": size_limit or READ_TOOL_MAX_FILE_SIZE }) return result if self._use_container(): relative_path = self._relative_path(full_path) return self._container_call("search_text", { "path": relative_path, "query": query, "max_matches": max_matches, "context_before": context_before, "context_after": context_after, "case_sensitive": case_sensitive, }) if self._use_container(): relative_path = self._relative_path(full_path) return self._container_call("extract_segments", { "path": relative_path, "segments": segments, "size_limit": size_limit or READ_TOOL_MAX_FILE_SIZE }) result = self._read_text_lines( full_path, size_limit=size_limit or READ_TOOL_MAX_FILE_SIZE ) if not result["success"]: return result lines = result["lines"] total_lines = len(lines) start = start_line if start_line and start_line > 0 else 1 end = end_line if end_line and end_line >= start else total_lines if start > total_lines: return {"success": False, "error": "起始行超出文件长度"} end = min(end, total_lines) selected_lines = lines[start - 1 : end] content = "".join(selected_lines) relative_path = str(full_path.relative_to(self.project_path)) return { "success": True, "path": relative_path, "content": content, "size": result["size"], "line_start": start, "line_end": end, "total_lines": total_lines } def search_text( self, path: str, *, query: str, max_matches: int, context_before: int, context_after: int, case_sensitive: bool = False, size_limit: Optional[int] = None ) -> Dict: """在文件中搜索关键词,返回合并后的窗口。""" if not query: return {"success": False, "error": "缺少搜索关键词"} valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if not full_path.exists(): return {"success": False, "error": "文件不存在"} if not full_path.is_file(): return {"success": False, "error": "不是文件"} result = self._read_text_lines( full_path, size_limit=size_limit or READ_TOOL_MAX_FILE_SIZE ) if not result["success"]: return result lines = result["lines"] total_lines = len(lines) matches = [] query_text = query if case_sensitive else query.lower() def contains(haystack: str) -> bool: target = haystack if case_sensitive else haystack.lower() return query_text in target for idx, line in enumerate(lines, start=1): if contains(line): window_start = max(1, idx - context_before) window_end = min(total_lines, idx + context_after) if matches and window_start <= matches[-1]["line_end"]: matches[-1]["line_end"] = max(matches[-1]["line_end"], window_end) matches[-1]["hits"].append(idx) else: if len(matches) >= max_matches: break matches.append({ "line_start": window_start, "line_end": window_end, "hits": [idx] }) relative_path = str(full_path.relative_to(self.project_path)) for window in matches: snippet_lines = lines[window["line_start"] - 1 : window["line_end"]] window["snippet"] = "".join(snippet_lines) return { "success": True, "path": relative_path, "size": result["size"], "total_lines": total_lines, "matches": matches } def extract_segments( self, path: str, segments: List[Dict], *, size_limit: Optional[int] = None ) -> Dict: """根据多个行区间提取内容。""" if not segments: return {"success": False, "error": "缺少要提取的行区间"} valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if not full_path.exists(): return {"success": False, "error": "文件不存在"} if not full_path.is_file(): return {"success": False, "error": "不是文件"} result = self._read_text_lines( full_path, size_limit=size_limit or READ_TOOL_MAX_FILE_SIZE ) if not result["success"]: return result lines = result["lines"] total_lines = len(lines) extracted = [] for item in segments: if not isinstance(item, dict): return {"success": False, "error": "segments 数组中的每一项都必须是对象"} start_line = item.get("start_line") end_line = item.get("end_line") label = item.get("label") if start_line is None or end_line is None: return {"success": False, "error": "所有区间都必须包含 start_line 和 end_line"} if start_line <= 0 or end_line < start_line: return {"success": False, "error": "行区间不合法"} if start_line > total_lines: return {"success": False, "error": f"区间起点 {start_line} 超出文件行数"} end_line = min(end_line, total_lines) snippet = "".join(lines[start_line - 1 : end_line]) extracted.append({ "label": label, "line_start": start_line, "line_end": end_line, "content": snippet }) relative_path = str(full_path.relative_to(self.project_path)) return { "success": True, "path": relative_path, "size": result["size"], "total_lines": total_lines, "segments": extracted } def write_file(self, path: str, content: str, mode: str = "w") -> Dict: """ 写入文件 Args: path: 文件路径 content: 内容 mode: 写入模式 - "w"(覆盖), "a"(追加) """ valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} # === 新增:内容预处理和验证 === if content: # 长度检查 if not DISABLE_LENGTH_CHECK and len(content) > 9999999999: # 100KB限制 return { "success": False, "error": f"内容过长({len(content)}字符),超过100KB限制", "suggestion": "请分块处理或使用部分修改方式" } # 检查潜在的JSON格式问题 if content.count('"') % 2 != 0: print(f"{OUTPUT_FORMATS['warning']} 检测到奇数个引号,可能存在格式问题") # 检查大量转义字符 if content.count('\\') > len(content) / 20: print(f"{OUTPUT_FORMATS['warning']} 检测到大量转义字符,建议检查内容格式") try: relative_path = self._relative_path(full_path) current_size = self._get_project_size() existing_size = full_path.stat().st_size if full_path.exists() else 0 if mode == "a": projected_total = current_size + len(content) else: projected_total = current_size - existing_size + len(content) if PROJECT_MAX_STORAGE_BYTES and projected_total > PROJECT_MAX_STORAGE_BYTES: return { "success": False, "error": "写入失败:超出项目磁盘配额", "limit_bytes": PROJECT_MAX_STORAGE_BYTES, "project_size_bytes": current_size, "attempt_size_bytes": len(content) } if self._use_container(): result = self._container_call("write_file", { "path": relative_path, "content": content, "mode": mode }) if result.get("success"): action = "覆盖" if mode == "w" else "追加" print(f"{OUTPUT_FORMATS['file']} {action}文件: {relative_path}") return result # 创建父目录 full_path.parent.mkdir(parents=True, exist_ok=True) with open(full_path, mode, encoding='utf-8') as f: f.write(content) action = "覆盖" if mode == "w" else "追加" print(f"{OUTPUT_FORMATS['file']} {action}文件: {relative_path}") return { "success": True, "path": relative_path, "size": len(content), "mode": mode } except Exception as e: return {"success": False, "error": str(e)} def append_file(self, path: str, content: str) -> Dict: """追加内容到文件""" return self.write_file(path, content, mode="a") def apply_modify_blocks(self, path: str, blocks: List[Dict]) -> Dict: """ 应用批量替换块 Args: path: 目标文件路径 blocks: [{"index": int, "old": str, "new": str}] """ valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if not full_path.exists(): return {"success": False, "error": "文件不存在"} if not full_path.is_file(): return {"success": False, "error": "不是文件"} try: relative_path = self._relative_path(full_path) if self._use_container(): return self._container_call("apply_modify_blocks", { "path": relative_path, "blocks": blocks }) with open(full_path, 'r', encoding='utf-8') as f: original_content = f.read() except Exception as e: return {"success": False, "error": f"读取文件失败: {e}"} current_content = original_content results: List[Dict] = [] completed_indices: List[int] = [] failed_details: List[Dict] = [] write_error = None for block in blocks: index = block.get("index") old_text = block.get("old", "") new_text = block.get("new", "") block_result = { "index": index, "status": "pending", "removed_lines": 0, "added_lines": 0, "reason": None, "hint": None } if old_text is None or new_text is None: block_result["status"] = "error" block_result["reason"] = "缺少 OLD 或 NEW 内容" block_result["hint"] = "请确保粘贴的补丁包含成对的 <<>> / <<>> 标记。" failed_details.append({"index": index, "reason": "缺少 OLD/NEW 标记"}) results.append(block_result) continue # 统一换行符,避免 CRLF 与 LF 不一致导致匹配失败 old_text = old_text.replace('\r\n', '\n') new_text = new_text.replace('\r\n', '\n') if not old_text: block_result["status"] = "error" block_result["reason"] = "OLD 内容不能为空" block_result["hint"] = "请确认要替换的原文是否准确复制;若多次失败,可改用 terminal_snapshot 查证或使用终端命令/Python 小脚本进行精确替换。" failed_details.append({"index": index, "reason": "OLD 内容为空"}) results.append(block_result) continue position = current_content.find(old_text) if position == -1: block_result["status"] = "not_found" block_result["reason"] = "未找到匹配的原文,请确认是否完全复制" block_result["hint"] = "请先用 terminal_snapshot 或 grep -n 校验原文;若仍失败,可在说明后改用 run_command/python 进行局部修改。" failed_details.append({"index": index, "reason": "未找到匹配的原文"}) results.append(block_result) continue current_content = ( current_content[:position] + new_text + current_content[position + len(old_text):] ) removed_lines = old_text.count('\n') added_lines = new_text.count('\n') if old_text and not old_text.endswith('\n'): removed_lines += 1 if new_text and not new_text.endswith('\n'): added_lines += 1 block_result.update({ "status": "success", "removed_lines": removed_lines if old_text else 0, "added_lines": added_lines if new_text else 0 }) completed_indices.append(index) results.append(block_result) write_performed = False if completed_indices: try: with open(full_path, 'w', encoding='utf-8') as f: f.write(current_content) write_performed = True except Exception as e: write_error = f"写入文件失败: {e}" # 写入失败时恢复原始内容 try: with open(full_path, 'w', encoding='utf-8') as f: f.write(original_content) except Exception: pass success = bool(completed_indices) and not failed_details and write_error is None return { "success": success, "completed": completed_indices, "failed": failed_details, "results": results, "write_performed": write_performed, "error": write_error } def replace_in_file(self, path: str, old_text: str, new_text: str) -> Dict: """替换文件中的内容""" # 先读取文件 result = self.read_file(path) if not result["success"]: return result content = result["content"] # === 新增:替换操作的安全检查 === if old_text and len(old_text) > 9999999999: return { "success": False, "error": "要替换的文本过长,可能导致性能问题", "suggestion": "请拆分内容或使用 modify_file 提交结构化补丁" } if new_text and len(new_text) > 9999999999: return { "success": False, "error": "替换的新文本过长,建议分块处理", "suggestion": "请将大内容分成多个小的替换操作" } # 检查是否包含要替换的内容 if old_text and old_text not in content: return {"success": False, "error": "未找到要替换的内容"} # 替换内容 if old_text: new_content = content.replace(old_text, new_text) count = content.count(old_text) else: # 空文件直接写入新内容 new_content = new_text count = 1 # 写回文件 result = self.write_file(path, new_content) if result["success"]: result["replacements"] = count print(f"{OUTPUT_FORMATS['file']} 替换了 {count} 处内容") return result def clear_file(self, path: str) -> Dict: """清空文件内容""" return self.write_file(path, "", mode="w") def edit_lines_range(self, path: str, start_line: int, end_line: int, content: str, operation: str) -> Dict: """ 基于行号编辑文件 Args: path: 文件路径 start_line: 起始行号(从1开始) end_line: 结束行号(从1开始,包含) content: 新内容 operation: 操作类型 - "replace", "insert", "delete" """ valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if not full_path.exists(): return {"success": False, "error": "文件不存在"} if not full_path.is_file(): return {"success": False, "error": "不是文件"} # 验证行号 if start_line < 1: return {"success": False, "error": "行号必须从1开始"} if end_line < start_line: return {"success": False, "error": "结束行号不能小于起始行号"} try: relative_path = self._relative_path(full_path) if self._use_container(): return self._container_call("edit_lines_range", { "path": relative_path, "start_line": start_line, "end_line": end_line, "content": content, "operation": operation }) # 读取文件内容 with open(full_path, 'r', encoding='utf-8') as f: lines = f.readlines() total_lines = len(lines) # 检查行号范围 if start_line > total_lines: if operation == "insert": # 插入操作允许在文件末尾后插入 lines.extend([''] * (start_line - total_lines - 1)) lines.append(content if content.endswith('\n') else content + '\n') else: return {"success": False, "error": f"起始行号 {start_line} 超出文件范围 (共 {total_lines} 行)"} elif end_line > total_lines: return {"success": False, "error": f"结束行号 {end_line} 超出文件范围 (共 {total_lines} 行)"} else: # 执行操作(转换为0基索引) start_idx = start_line - 1 end_idx = end_line if operation == "replace": # 替换指定行范围 new_lines = content.split('\n') if '\n' in content else [content] # 确保每行都有换行符,除了最后一行需要检查原文件格式 formatted_lines = [] for i, line in enumerate(new_lines): if i < len(new_lines) - 1 or (end_idx < len(lines) and lines[end_idx - 1].endswith('\n')): formatted_lines.append(line + '\n' if not line.endswith('\n') else line) else: formatted_lines.append(line) lines[start_idx:end_idx] = formatted_lines affected_lines = end_line - start_line + 1 elif operation == "insert": # 在指定行前插入内容 new_lines = content.split('\n') if '\n' in content else [content] formatted_lines = [line + '\n' if not line.endswith('\n') else line for line in new_lines] lines[start_idx:start_idx] = formatted_lines affected_lines = len(formatted_lines) elif operation == "delete": # 删除指定行范围 affected_lines = end_line - start_line + 1 del lines[start_idx:end_idx] else: return {"success": False, "error": f"未知的操作类型: {operation}"} # 写回文件 with open(full_path, 'w', encoding='utf-8') as f: f.writelines(lines) relative_path = str(full_path.relative_to(self.project_path)) # 生成操作描述 if operation == "replace": operation_desc = f"替换第 {start_line}-{end_line} 行" elif operation == "insert": operation_desc = f"在第 {start_line} 行前插入" elif operation == "delete": operation_desc = f"删除第 {start_line}-{end_line} 行" print(f"{OUTPUT_FORMATS['file']} {operation_desc}: {relative_path}") return { "success": True, "path": relative_path, "operation": operation, "start_line": start_line, "end_line": end_line, "affected_lines": affected_lines, "total_lines_after": len(lines), "description": operation_desc } except Exception as e: return {"success": False, "error": str(e)} def list_files(self, path: str = "") -> Dict: """列出目录内容""" if path: valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} else: full_path = self.project_path if not full_path.exists(): return {"success": False, "error": "目录不存在"} if not full_path.is_dir(): return {"success": False, "error": "不是目录"} try: files = [] folders = [] for item in full_path.iterdir(): if item.name.startswith('.'): continue relative_path = str(item.relative_to(self.project_path)) if item.is_file(): files.append({ "name": item.name, "path": relative_path, "size": item.stat().st_size, "modified": datetime.fromtimestamp(item.stat().st_mtime).isoformat() }) elif item.is_dir(): folders.append({ "name": item.name, "path": relative_path }) return { "success": True, "path": str(full_path.relative_to(self.project_path)) if path else ".", "files": sorted(files, key=lambda x: x["name"]), "folders": sorted(folders, key=lambda x: x["name"]) } except Exception as e: return {"success": False, "error": str(e)} def get_file_info(self, path: str) -> Dict: """获取文件信息""" valid, error, full_path = self._validate_path(path) if not valid: return {"success": False, "error": error} if not full_path.exists(): return {"success": False, "error": "文件不存在"} try: stat = full_path.stat() relative_path = str(full_path.relative_to(self.project_path)) return { "success": True, "path": relative_path, "name": full_path.name, "type": "file" if full_path.is_file() else "folder", "size": stat.st_size, "created": datetime.fromtimestamp(stat.st_ctime).isoformat(), "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), "extension": full_path.suffix if full_path.is_file() else None } except Exception as e: return {"success": False, "error": str(e)}