1358 lines
51 KiB
Python
1358 lines
51 KiB
Python
# modules/file_manager.py - 文件管理模块(添加行编辑功能)
|
||
|
||
import os
|
||
import shutil
|
||
from pathlib import Path
|
||
import re
|
||
from typing import Optional, Dict, List, Set, Tuple, TYPE_CHECKING
|
||
from datetime import datetime
|
||
try:
|
||
from config import (
|
||
MAX_FILE_SIZE,
|
||
FORBIDDEN_PATHS,
|
||
FORBIDDEN_ROOT_PATHS,
|
||
OUTPUT_FORMATS,
|
||
READ_TOOL_MAX_FILE_SIZE,
|
||
PROJECT_MAX_STORAGE_BYTES,
|
||
)
|
||
except ImportError: # 兼容全局环境中存在同名包的情况
|
||
import sys
|
||
from pathlib import Path
|
||
project_root = Path(__file__).resolve().parents[1]
|
||
if str(project_root) not in sys.path:
|
||
sys.path.insert(0, str(project_root))
|
||
from config import (
|
||
MAX_FILE_SIZE,
|
||
FORBIDDEN_PATHS,
|
||
FORBIDDEN_ROOT_PATHS,
|
||
OUTPUT_FORMATS,
|
||
READ_TOOL_MAX_FILE_SIZE,
|
||
PROJECT_MAX_STORAGE_BYTES,
|
||
)
|
||
from modules.container_file_proxy import ContainerFileProxy
|
||
from utils.logger import setup_logger
|
||
|
||
if TYPE_CHECKING:
|
||
from modules.user_container_manager import ContainerHandle
|
||
|
||
# 临时禁用长度检查
|
||
DISABLE_LENGTH_CHECK = True
|
||
|
||
logger = setup_logger(__name__)
|
||
class FileManager:
|
||
def __init__(self, project_path: str, container_session: Optional["ContainerHandle"] = None):
|
||
self.project_path = Path(project_path).resolve()
|
||
self.container_session: Optional["ContainerHandle"] = None
|
||
self._container_proxy: Optional[ContainerFileProxy] = None
|
||
self.set_container_session(container_session)
|
||
|
||
def set_container_session(self, container_session: Optional["ContainerHandle"]):
|
||
self.container_session = container_session
|
||
if (
|
||
container_session
|
||
and container_session.mode == "docker"
|
||
and container_session.container_name
|
||
):
|
||
if self._container_proxy is None:
|
||
self._container_proxy = ContainerFileProxy(container_session)
|
||
else:
|
||
self._container_proxy.update_session(container_session)
|
||
else:
|
||
self._container_proxy = None
|
||
|
||
def _use_container(self) -> bool:
|
||
return self._container_proxy is not None and self._container_proxy.is_available()
|
||
|
||
def _container_call(self, action: str, payload: Dict) -> Dict:
|
||
if not self._use_container():
|
||
return {
|
||
"success": False,
|
||
"error": "容器未就绪,无法执行文件操作"
|
||
}
|
||
return self._container_proxy.run(action, payload)
|
||
|
||
def _get_project_size(self) -> int:
|
||
"""计算项目目录的总大小(字节),遇到异常时记录并抛出。"""
|
||
total = 0
|
||
if not self.project_path.exists():
|
||
return 0
|
||
|
||
for path in self.project_path.rglob('*'):
|
||
if not path.is_file():
|
||
continue
|
||
try:
|
||
total += path.stat().st_size
|
||
except Exception as exc:
|
||
logger.error(
|
||
"Failed to stat %s while calculating project size: %s",
|
||
path,
|
||
exc,
|
||
exc_info=True,
|
||
)
|
||
raise
|
||
return total
|
||
|
||
def _validate_path(self, path: str) -> Tuple[bool, str, Path]:
|
||
"""
|
||
验证路径安全性
|
||
|
||
Returns:
|
||
(是否有效, 错误信息, 完整路径)
|
||
"""
|
||
original_path = path
|
||
project_root = Path(self.project_path).resolve()
|
||
if project_root != self.project_path:
|
||
self.project_path = project_root
|
||
|
||
# 不允许绝对路径(除非是在项目内的绝对路径)
|
||
if path.startswith('/') or path.startswith('\\') or (len(path) > 1 and path[1] == ':'):
|
||
# 如果是绝对路径,检查是否指向项目内
|
||
try:
|
||
test_path = Path(path).resolve()
|
||
test_path.relative_to(project_root)
|
||
# 如果成功,说明绝对路径在项目内,转换为相对路径
|
||
path = str(test_path.relative_to(project_root))
|
||
except ValueError:
|
||
if str(original_path).replace("\\", "/").startswith("/workspace"):
|
||
return False, "路径必须在项目文件夹内。请检查是否使用的是不带/workspace的相对路径。", None
|
||
return False, "路径必须在项目文件夹内", None
|
||
|
||
# 检查是否包含向上遍历
|
||
if ".." in path:
|
||
return False, "不允许使用../向上遍历", None
|
||
|
||
# 构建完整路径
|
||
full_path = (project_root / path).resolve()
|
||
|
||
# 检查是否在项目目录内
|
||
try:
|
||
full_path.relative_to(project_root)
|
||
except ValueError:
|
||
return False, "路径必须在项目文件夹内", None
|
||
|
||
# 检查禁止的路径
|
||
path_str = str(full_path)
|
||
|
||
for forbidden_root in FORBIDDEN_ROOT_PATHS:
|
||
if path_str == forbidden_root:
|
||
return False, f"禁止访问根目录: {forbidden_root}", None
|
||
|
||
for forbidden in FORBIDDEN_PATHS:
|
||
if path_str.startswith(forbidden + os.sep) or path_str == forbidden:
|
||
return False, f"禁止访问系统目录: {forbidden}", None
|
||
|
||
return True, "", full_path
|
||
|
||
def _relative_path(self, full_path: Path) -> str:
|
||
return str(full_path.relative_to(self.project_path))
|
||
|
||
def create_file(self, path: str, content: str = "", file_type: str = "txt") -> Dict:
|
||
"""创建文件"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
# 添加文件扩展名
|
||
if not full_path.suffix:
|
||
full_path = full_path.with_suffix(f".{file_type}")
|
||
relative_path = self._relative_path(full_path)
|
||
|
||
try:
|
||
if full_path.parent == self.project_path:
|
||
return {
|
||
"success": False,
|
||
"error": "禁止在项目根目录直接创建文件,请先创建或选择合适的子目录。",
|
||
"suggestion": "然后必须**重新**再次创建文件。"
|
||
}
|
||
if self._use_container():
|
||
result = self._container_call("create_file", {
|
||
"path": relative_path,
|
||
"content": ""
|
||
})
|
||
if result.get("success"):
|
||
print(f"{OUTPUT_FORMATS['file']} 创建文件: {relative_path}")
|
||
return result
|
||
|
||
# 创建父目录
|
||
full_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
with open(full_path, 'w', encoding='utf-8') as f:
|
||
f.write("")
|
||
|
||
print(f"{OUTPUT_FORMATS['file']} 创建文件: {relative_path}")
|
||
|
||
return {
|
||
"success": True,
|
||
"path": relative_path,
|
||
"size": 0
|
||
}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def delete_file(self, path: str) -> Dict:
|
||
"""删除文件"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "文件不存在"}
|
||
|
||
if not full_path.is_file():
|
||
return {"success": False, "error": "不是文件"}
|
||
|
||
try:
|
||
relative_path = self._relative_path(full_path)
|
||
if self._use_container():
|
||
result = self._container_call("delete_file", {
|
||
"path": relative_path
|
||
})
|
||
if result.get("success"):
|
||
print(f"{OUTPUT_FORMATS['file']} 删除文件: {relative_path}")
|
||
return result
|
||
|
||
full_path.unlink()
|
||
print(f"{OUTPUT_FORMATS['file']} 删除文件: {relative_path}")
|
||
|
||
# 删除文件备注(如果存在)
|
||
# 这需要通过context_manager处理,但file_manager没有直接访问权限
|
||
# 所以返回相对路径,让调用者处理备注删除
|
||
|
||
return {
|
||
"success": True,
|
||
"path": relative_path,
|
||
"action": "deleted"
|
||
}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def rename_file(self, old_path: str, new_path: str) -> Dict:
|
||
"""重命名文件"""
|
||
valid_old, error_old, full_old_path = self._validate_path(old_path)
|
||
if not valid_old:
|
||
return {"success": False, "error": error_old}
|
||
|
||
valid_new, error_new, full_new_path = self._validate_path(new_path)
|
||
if not valid_new:
|
||
return {"success": False, "error": error_new}
|
||
|
||
if not full_old_path.exists():
|
||
return {"success": False, "error": "原文件不存在"}
|
||
|
||
if full_new_path.exists():
|
||
return {"success": False, "error": "目标文件已存在"}
|
||
|
||
try:
|
||
old_relative = self._relative_path(full_old_path)
|
||
new_relative = self._relative_path(full_new_path)
|
||
if self._use_container():
|
||
result = self._container_call("rename_file", {
|
||
"old_path": old_relative,
|
||
"new_path": new_relative
|
||
})
|
||
if result.get("success"):
|
||
print(f"{OUTPUT_FORMATS['file']} 重命名: {old_relative} -> {new_relative}")
|
||
return result
|
||
|
||
full_old_path.rename(full_new_path)
|
||
|
||
print(f"{OUTPUT_FORMATS['file']} 重命名: {old_relative} -> {new_relative}")
|
||
|
||
return {
|
||
"success": True,
|
||
"old_path": old_relative,
|
||
"new_path": new_relative,
|
||
"action": "renamed"
|
||
}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def create_folder(self, path: str) -> Dict:
|
||
"""创建文件夹"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if full_path.exists():
|
||
return {"success": False, "error": "文件夹已存在"}
|
||
|
||
try:
|
||
relative_path = self._relative_path(full_path)
|
||
if self._use_container():
|
||
result = self._container_call("create_folder", {"path": relative_path})
|
||
if result.get("success"):
|
||
print(f"{OUTPUT_FORMATS['file']} 创建文件夹: {relative_path}")
|
||
return result
|
||
|
||
full_path.mkdir(parents=True, exist_ok=True)
|
||
print(f"{OUTPUT_FORMATS['file']} 创建文件夹: {relative_path}")
|
||
|
||
return {"success": True, "path": relative_path}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def delete_folder(self, path: str) -> Dict:
|
||
"""删除文件夹"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "文件夹不存在"}
|
||
|
||
if not full_path.is_dir():
|
||
return {"success": False, "error": "不是文件夹"}
|
||
|
||
try:
|
||
relative_path = self._relative_path(full_path)
|
||
if self._use_container():
|
||
result = self._container_call("delete_folder", {"path": relative_path})
|
||
if result.get("success"):
|
||
print(f"{OUTPUT_FORMATS['file']} 删除文件夹: {relative_path}")
|
||
return result
|
||
|
||
shutil.rmtree(full_path)
|
||
print(f"{OUTPUT_FORMATS['file']} 删除文件夹: {relative_path}")
|
||
|
||
return {"success": True, "path": relative_path}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def _read_text_lines(
|
||
self,
|
||
full_path: Path,
|
||
*,
|
||
size_limit: Optional[int] = None,
|
||
encoding: str = "utf-8",
|
||
) -> Dict:
|
||
"""读取UTF-8文本并返回行列表。"""
|
||
try:
|
||
file_size = full_path.stat().st_size
|
||
except FileNotFoundError:
|
||
return {"success": False, "error": "文件不存在"}
|
||
|
||
if size_limit and file_size > size_limit:
|
||
return {
|
||
"success": False,
|
||
"error": f"文件太大 ({file_size / 1024 / 1024:.2f}MB > {size_limit / 1024 / 1024}MB)"
|
||
}
|
||
|
||
try:
|
||
with open(full_path, 'r', encoding=encoding) as f:
|
||
lines = f.readlines()
|
||
except UnicodeDecodeError:
|
||
return {
|
||
"success": False,
|
||
"error": "文件不是 UTF-8 文本,无法直接读取,请改用 run_python 解析。"
|
||
}
|
||
except Exception as e:
|
||
return {"success": False, "error": f"读取文件失败: {e}"}
|
||
|
||
content = "".join(lines)
|
||
return {
|
||
"success": True,
|
||
"content": content,
|
||
"lines": lines,
|
||
"size": file_size
|
||
}
|
||
|
||
def read_file(self, path: str) -> Dict:
|
||
"""读取文件内容(兼容旧逻辑,限制为 MAX_FILE_SIZE)。"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "文件不存在"}
|
||
|
||
if not full_path.is_file():
|
||
return {"success": False, "error": "不是文件"}
|
||
|
||
if self._use_container():
|
||
relative_path = self._relative_path(full_path)
|
||
result = self._container_call("read_file", {
|
||
"path": relative_path,
|
||
"size_limit": MAX_FILE_SIZE
|
||
})
|
||
return result
|
||
|
||
result = self._read_text_lines(full_path, size_limit=MAX_FILE_SIZE)
|
||
if not result["success"]:
|
||
return result
|
||
|
||
relative_path = str(full_path.relative_to(self.project_path))
|
||
return {
|
||
"success": True,
|
||
"path": relative_path,
|
||
"content": result["content"],
|
||
"size": result["size"]
|
||
}
|
||
|
||
def read_text_segment(
|
||
self,
|
||
path: str,
|
||
*,
|
||
start_line: Optional[int] = None,
|
||
end_line: Optional[int] = None,
|
||
size_limit: Optional[int] = None
|
||
) -> Dict:
|
||
"""按行范围读取文本片段。"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "文件不存在"}
|
||
|
||
if not full_path.is_file():
|
||
return {"success": False, "error": "不是文件"}
|
||
|
||
if self._use_container():
|
||
relative_path = self._relative_path(full_path)
|
||
result = self._container_call("read_text_segment", {
|
||
"path": relative_path,
|
||
"start_line": start_line,
|
||
"end_line": end_line,
|
||
"size_limit": size_limit or READ_TOOL_MAX_FILE_SIZE
|
||
})
|
||
return result
|
||
|
||
if self._use_container():
|
||
relative_path = self._relative_path(full_path)
|
||
return self._container_call("search_text", {
|
||
"path": relative_path,
|
||
"query": query,
|
||
"max_matches": max_matches,
|
||
"context_before": context_before,
|
||
"context_after": context_after,
|
||
"case_sensitive": case_sensitive,
|
||
})
|
||
|
||
if self._use_container():
|
||
relative_path = self._relative_path(full_path)
|
||
return self._container_call("extract_segments", {
|
||
"path": relative_path,
|
||
"segments": segments,
|
||
"size_limit": size_limit or READ_TOOL_MAX_FILE_SIZE
|
||
})
|
||
|
||
result = self._read_text_lines(
|
||
full_path,
|
||
size_limit=size_limit or READ_TOOL_MAX_FILE_SIZE
|
||
)
|
||
if not result["success"]:
|
||
return result
|
||
|
||
lines = result["lines"]
|
||
total_lines = len(lines)
|
||
start = start_line if start_line and start_line > 0 else 1
|
||
end = end_line if end_line and end_line >= start else total_lines
|
||
if start > total_lines:
|
||
return {"success": False, "error": "起始行超出文件长度"}
|
||
end = min(end, total_lines)
|
||
|
||
selected_lines = lines[start - 1 : end]
|
||
content = "".join(selected_lines)
|
||
|
||
relative_path = str(full_path.relative_to(self.project_path))
|
||
return {
|
||
"success": True,
|
||
"path": relative_path,
|
||
"content": content,
|
||
"size": result["size"],
|
||
"line_start": start,
|
||
"line_end": end,
|
||
"total_lines": total_lines
|
||
}
|
||
|
||
def search_text(
|
||
self,
|
||
path: str,
|
||
*,
|
||
query: str,
|
||
max_matches: int,
|
||
context_before: int,
|
||
context_after: int,
|
||
case_sensitive: bool = False,
|
||
size_limit: Optional[int] = None
|
||
) -> Dict:
|
||
"""在文件中搜索关键词,返回合并后的窗口。"""
|
||
if not query:
|
||
return {"success": False, "error": "缺少搜索关键词"}
|
||
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "文件不存在"}
|
||
|
||
if not full_path.is_file():
|
||
return {"success": False, "error": "不是文件"}
|
||
|
||
result = self._read_text_lines(
|
||
full_path,
|
||
size_limit=size_limit or READ_TOOL_MAX_FILE_SIZE
|
||
)
|
||
if not result["success"]:
|
||
return result
|
||
|
||
lines = result["lines"]
|
||
total_lines = len(lines)
|
||
matches = []
|
||
query_text = query if case_sensitive else query.lower()
|
||
|
||
def contains(haystack: str) -> bool:
|
||
target = haystack if case_sensitive else haystack.lower()
|
||
return query_text in target
|
||
|
||
for idx, line in enumerate(lines, start=1):
|
||
if contains(line):
|
||
window_start = max(1, idx - context_before)
|
||
window_end = min(total_lines, idx + context_after)
|
||
|
||
if matches and window_start <= matches[-1]["line_end"]:
|
||
matches[-1]["line_end"] = max(matches[-1]["line_end"], window_end)
|
||
matches[-1]["hits"].append(idx)
|
||
else:
|
||
if len(matches) >= max_matches:
|
||
break
|
||
matches.append({
|
||
"line_start": window_start,
|
||
"line_end": window_end,
|
||
"hits": [idx]
|
||
})
|
||
|
||
relative_path = str(full_path.relative_to(self.project_path))
|
||
for window in matches:
|
||
snippet_lines = lines[window["line_start"] - 1 : window["line_end"]]
|
||
window["snippet"] = "".join(snippet_lines)
|
||
|
||
return {
|
||
"success": True,
|
||
"path": relative_path,
|
||
"size": result["size"],
|
||
"total_lines": total_lines,
|
||
"matches": matches
|
||
}
|
||
|
||
def extract_segments(
|
||
self,
|
||
path: str,
|
||
segments: List[Dict],
|
||
*,
|
||
size_limit: Optional[int] = None
|
||
) -> Dict:
|
||
"""根据多个行区间提取内容。"""
|
||
if not segments:
|
||
return {"success": False, "error": "缺少要提取的行区间"}
|
||
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "文件不存在"}
|
||
|
||
if not full_path.is_file():
|
||
return {"success": False, "error": "不是文件"}
|
||
|
||
result = self._read_text_lines(
|
||
full_path,
|
||
size_limit=size_limit or READ_TOOL_MAX_FILE_SIZE
|
||
)
|
||
if not result["success"]:
|
||
return result
|
||
|
||
lines = result["lines"]
|
||
total_lines = len(lines)
|
||
extracted = []
|
||
|
||
for item in segments:
|
||
if not isinstance(item, dict):
|
||
return {"success": False, "error": "segments 数组中的每一项都必须是对象"}
|
||
start_line = item.get("start_line")
|
||
end_line = item.get("end_line")
|
||
label = item.get("label")
|
||
if start_line is None or end_line is None:
|
||
return {"success": False, "error": "所有区间都必须包含 start_line 和 end_line"}
|
||
if start_line <= 0 or end_line < start_line:
|
||
return {"success": False, "error": "行区间不合法"}
|
||
if start_line > total_lines:
|
||
return {"success": False, "error": f"区间起点 {start_line} 超出文件行数"}
|
||
end_line = min(end_line, total_lines)
|
||
snippet = "".join(lines[start_line - 1 : end_line])
|
||
extracted.append({
|
||
"label": label,
|
||
"line_start": start_line,
|
||
"line_end": end_line,
|
||
"content": snippet
|
||
})
|
||
|
||
relative_path = str(full_path.relative_to(self.project_path))
|
||
return {
|
||
"success": True,
|
||
"path": relative_path,
|
||
"size": result["size"],
|
||
"total_lines": total_lines,
|
||
"segments": extracted
|
||
}
|
||
|
||
def write_file(self, path: str, content: str, mode: str = "w") -> Dict:
|
||
"""
|
||
写入文件
|
||
|
||
Args:
|
||
path: 文件路径
|
||
content: 内容
|
||
mode: 写入模式 - "w"(覆盖), "a"(追加)
|
||
"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
# === 新增:内容预处理和验证 ===
|
||
if content:
|
||
# 长度检查
|
||
if not DISABLE_LENGTH_CHECK and len(content) > 9999999999: # 100KB限制
|
||
return {
|
||
"success": False,
|
||
"error": f"内容过长({len(content)}字符),超过100KB限制",
|
||
"suggestion": "请分块处理或使用部分修改方式"
|
||
}
|
||
|
||
# 检查潜在的JSON格式问题
|
||
if content.count('"') % 2 != 0:
|
||
print(f"{OUTPUT_FORMATS['warning']} 检测到奇数个引号,可能存在格式问题")
|
||
|
||
# 检查大量转义字符
|
||
if content.count('\\') > len(content) / 20:
|
||
print(f"{OUTPUT_FORMATS['warning']} 检测到大量转义字符,建议检查内容格式")
|
||
|
||
try:
|
||
relative_path = self._relative_path(full_path)
|
||
current_size = self._get_project_size()
|
||
existing_size = full_path.stat().st_size if full_path.exists() else 0
|
||
if mode == "a":
|
||
projected_total = current_size + len(content)
|
||
else:
|
||
projected_total = current_size - existing_size + len(content)
|
||
if PROJECT_MAX_STORAGE_BYTES and projected_total > PROJECT_MAX_STORAGE_BYTES:
|
||
return {
|
||
"success": False,
|
||
"error": "写入失败:超出项目磁盘配额",
|
||
"limit_bytes": PROJECT_MAX_STORAGE_BYTES,
|
||
"project_size_bytes": current_size,
|
||
"attempt_size_bytes": len(content)
|
||
}
|
||
if self._use_container():
|
||
result = self._container_call("write_file", {
|
||
"path": relative_path,
|
||
"content": content,
|
||
"mode": mode
|
||
})
|
||
if result.get("success"):
|
||
action = "覆盖" if mode == "w" else "追加"
|
||
print(f"{OUTPUT_FORMATS['file']} {action}文件: {relative_path}")
|
||
return result
|
||
|
||
# 创建父目录
|
||
full_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
with open(full_path, mode, encoding='utf-8') as f:
|
||
f.write(content)
|
||
|
||
action = "覆盖" if mode == "w" else "追加"
|
||
print(f"{OUTPUT_FORMATS['file']} {action}文件: {relative_path}")
|
||
|
||
return {
|
||
"success": True,
|
||
"path": relative_path,
|
||
"size": len(content),
|
||
"mode": mode
|
||
}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def append_file(self, path: str, content: str) -> Dict:
|
||
"""追加内容到文件"""
|
||
return self.write_file(path, content, mode="a")
|
||
|
||
def _parse_diff_patch(self, patch_text: str) -> Dict:
|
||
"""解析统一diff格式的补丁,转换为 apply_modify_blocks 所需的块结构。"""
|
||
if not patch_text or "*** Begin Patch" not in patch_text or "*** End Patch" not in patch_text:
|
||
return {
|
||
"success": False,
|
||
"error": "补丁格式错误:缺少 *** Begin Patch / *** End Patch 标记。"
|
||
}
|
||
|
||
start = patch_text.find("*** Begin Patch")
|
||
end = patch_text.rfind("*** End Patch")
|
||
if end <= start:
|
||
return {
|
||
"success": False,
|
||
"error": "补丁格式错误:结束标记位置异常。"
|
||
}
|
||
|
||
body = patch_text[start + len("*** Begin Patch"):end]
|
||
lines = body.splitlines(True) # 保留换行符,便于逐字匹配
|
||
|
||
blocks: List[Dict] = []
|
||
current_block: Optional[Dict] = None
|
||
auto_index = 1
|
||
|
||
id_pattern = re.compile(r"\[id:\s*(\d+)\]", re.IGNORECASE)
|
||
|
||
for raw_line in lines:
|
||
stripped = raw_line.strip()
|
||
if not stripped and current_block is None:
|
||
continue
|
||
|
||
if stripped.startswith("@@"):
|
||
if current_block:
|
||
if not current_block["lines"]:
|
||
return {
|
||
"success": False,
|
||
"error": f"补丁块缺少内容:{current_block.get('header', '').strip()}"
|
||
}
|
||
blocks.append(current_block)
|
||
|
||
header = stripped
|
||
id_match = id_pattern.search(header)
|
||
block_id: Optional[int] = None
|
||
if id_match:
|
||
try:
|
||
block_id = int(id_match.group(1))
|
||
except ValueError:
|
||
return {
|
||
"success": False,
|
||
"error": f"补丁块编号必须是整数:{header}"
|
||
}
|
||
|
||
current_block = {"id": block_id, "header": header, "lines": []}
|
||
continue
|
||
|
||
if current_block is None:
|
||
if stripped:
|
||
return {
|
||
"success": False,
|
||
"error": "补丁格式错误:在检测到第一个 @@ 块之前出现内容。"
|
||
}
|
||
continue
|
||
|
||
if raw_line.startswith("\\ No newline at end of file"):
|
||
continue
|
||
|
||
current_block["lines"].append(raw_line)
|
||
|
||
if current_block:
|
||
if not current_block["lines"]:
|
||
return {
|
||
"success": False,
|
||
"error": f"补丁块缺少内容:{current_block.get('header', '').strip()}"
|
||
}
|
||
blocks.append(current_block)
|
||
|
||
if not blocks:
|
||
return {
|
||
"success": False,
|
||
"error": "补丁格式错误:未检测到任何 @@ [id:n] 块。"
|
||
}
|
||
|
||
parsed_blocks: List[Dict] = []
|
||
used_indices: Set[int] = set()
|
||
|
||
for block in blocks:
|
||
idx = block["id"]
|
||
if idx is None:
|
||
while auto_index in used_indices:
|
||
auto_index += 1
|
||
idx = auto_index
|
||
auto_index += 1
|
||
elif idx in used_indices:
|
||
while idx in used_indices:
|
||
idx += 1
|
||
auto_index = max(auto_index, idx + 1)
|
||
|
||
used_indices.add(idx)
|
||
|
||
old_lines: List[str] = []
|
||
new_lines: List[str] = []
|
||
has_anchor = False
|
||
has_content = False
|
||
|
||
for line in block["lines"]:
|
||
if not line:
|
||
continue
|
||
prefix = line[0]
|
||
if prefix == ' ':
|
||
has_anchor = True
|
||
old_lines.append(line[1:])
|
||
new_lines.append(line[1:])
|
||
has_content = True
|
||
elif prefix == '-':
|
||
has_anchor = True
|
||
old_lines.append(line[1:])
|
||
has_content = True
|
||
elif prefix == '+':
|
||
new_lines.append(line[1:])
|
||
has_content = True
|
||
else:
|
||
# 容忍空白符或意外格式,直接作为上下文
|
||
old_lines.append(line)
|
||
new_lines.append(line)
|
||
has_anchor = True
|
||
has_content = True
|
||
|
||
if not has_content:
|
||
return {
|
||
"success": False,
|
||
"error": f"补丁块 {idx} 未包含任何 + / - / 上下文行。"
|
||
}
|
||
|
||
append_only = False
|
||
if not has_anchor:
|
||
append_only = True
|
||
|
||
old_text = "".join(old_lines)
|
||
new_text = "".join(new_lines)
|
||
raw_patch = f"{block['header']}\n{''.join(block['lines'])}"
|
||
|
||
parsed_blocks.append({
|
||
"index": idx,
|
||
"old": old_text,
|
||
"new": new_text,
|
||
"append_only": append_only,
|
||
"raw_patch": raw_patch
|
||
})
|
||
|
||
return {
|
||
"success": True,
|
||
"blocks": parsed_blocks
|
||
}
|
||
|
||
def apply_diff_patch(self, path: str, patch_text: str) -> Dict:
|
||
"""解析统一diff并写入文件,支持多块依次执行。"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
parse_result = self._parse_diff_patch(patch_text)
|
||
if not parse_result.get("success"):
|
||
return parse_result
|
||
|
||
blocks = parse_result.get("blocks") or []
|
||
if not blocks:
|
||
return {
|
||
"success": False,
|
||
"error": "未检测到有效的补丁块。"
|
||
}
|
||
|
||
relative_path = str(self._relative_path(full_path))
|
||
|
||
parsed_blocks: List[Dict] = blocks
|
||
block_lookup: Dict[int, Dict] = {block["index"]: block for block in parsed_blocks}
|
||
|
||
def attach_block_context(entries: Optional[List[Dict]]):
|
||
if not entries:
|
||
return
|
||
for entry in entries:
|
||
if not isinstance(entry, dict):
|
||
continue
|
||
idx = entry.get("index")
|
||
if idx is None:
|
||
continue
|
||
block_info = block_lookup.get(idx)
|
||
if not block_info:
|
||
continue
|
||
patch_text = block_info.get("raw_patch")
|
||
if patch_text:
|
||
entry.setdefault("block_patch", patch_text)
|
||
if "old_text" not in entry:
|
||
entry["old_text"] = block_info.get("old")
|
||
if "new_text" not in entry:
|
||
entry["new_text"] = block_info.get("new")
|
||
entry.setdefault("append_only", block_info.get("append_only", False))
|
||
|
||
append_only_blocks = [b for b in parsed_blocks if b.get("append_only")]
|
||
modify_blocks = [
|
||
{"index": b["index"], "old": b["old"], "new": b["new"]}
|
||
for b in parsed_blocks
|
||
if not b.get("append_only")
|
||
]
|
||
|
||
apply_result = {"results": []}
|
||
completed_indices: List[int] = []
|
||
failed_entries: List[Dict] = []
|
||
write_error = None
|
||
|
||
if modify_blocks:
|
||
modify_result = self.apply_modify_blocks(path, modify_blocks)
|
||
apply_result.update(modify_result)
|
||
completed_indices.extend(modify_result.get("completed", []))
|
||
failed_entries.extend(modify_result.get("failed", []))
|
||
write_error = modify_result.get("error")
|
||
else:
|
||
apply_result.update({
|
||
"success": True,
|
||
"completed": [],
|
||
"failed": [],
|
||
"results": [],
|
||
"write_performed": False,
|
||
"error": None
|
||
})
|
||
|
||
results_blocks = apply_result.get("results", []).copy()
|
||
append_results: List[Dict] = []
|
||
append_bytes = 0
|
||
append_lines_total = 0
|
||
append_success = True
|
||
|
||
if append_only_blocks:
|
||
try:
|
||
with open(full_path, 'a', encoding='utf-8') as f:
|
||
for block in append_only_blocks:
|
||
chunk = block.get("new", "")
|
||
if not chunk:
|
||
append_results.append({
|
||
"index": block["index"],
|
||
"status": "failed",
|
||
"reason": "追加块为空"
|
||
})
|
||
failed_entries.append({
|
||
"index": block["index"],
|
||
"reason": "追加块为空"
|
||
})
|
||
append_success = False
|
||
continue
|
||
f.write(chunk)
|
||
added_lines = chunk.count('\n')
|
||
if chunk and not chunk.endswith('\n'):
|
||
added_lines += 1
|
||
append_lines_total += added_lines
|
||
append_bytes += len(chunk.encode('utf-8'))
|
||
append_results.append({
|
||
"index": block["index"],
|
||
"status": "success",
|
||
"removed_lines": 0,
|
||
"added_lines": added_lines
|
||
})
|
||
completed_indices.append(block["index"])
|
||
except Exception as e:
|
||
append_success = False
|
||
write_error = f"追加写入失败: {e}"
|
||
append_results.append({
|
||
"index": append_only_blocks[-1]["index"],
|
||
"status": "failed",
|
||
"reason": str(e)
|
||
})
|
||
failed_entries.append({
|
||
"index": append_only_blocks[-1]["index"],
|
||
"reason": str(e)
|
||
})
|
||
|
||
attach_block_context(failed_entries)
|
||
|
||
total_blocks = len(parsed_blocks)
|
||
completed_unique = sorted(set(completed_indices))
|
||
|
||
summary_parts = [
|
||
f"向 {relative_path} 应用 {total_blocks} 个补丁块",
|
||
f"成功 {len(completed_unique)} 个",
|
||
f"失败 {len(failed_entries)} 个"
|
||
]
|
||
if append_only_blocks:
|
||
summary_parts.append(f"追加 {len(append_only_blocks)} 块,写入 {append_lines_total} 行({append_bytes} 字节)")
|
||
if write_error:
|
||
summary_parts.append(write_error)
|
||
summary = ",".join(summary_parts)
|
||
|
||
results_blocks.extend(append_results)
|
||
|
||
apply_result["results"] = results_blocks
|
||
apply_result["blocks"] = results_blocks
|
||
apply_result["path"] = relative_path
|
||
apply_result["total_blocks"] = total_blocks
|
||
apply_result["summary"] = summary
|
||
apply_result["message"] = summary
|
||
apply_result["completed"] = completed_unique
|
||
apply_result["failed"] = failed_entries
|
||
apply_result["append_bytes"] = append_bytes
|
||
apply_result["append_blocks"] = len(append_only_blocks)
|
||
apply_result["success"] = (
|
||
append_success
|
||
and apply_result.get("success", True)
|
||
and not failed_entries
|
||
and not write_error
|
||
)
|
||
apply_result["error"] = write_error
|
||
return apply_result
|
||
|
||
def apply_modify_blocks(self, path: str, blocks: List[Dict]) -> Dict:
|
||
"""
|
||
应用批量替换块
|
||
|
||
Args:
|
||
path: 目标文件路径
|
||
blocks: [{"index": int, "old": str, "new": str}]
|
||
"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "文件不存在"}
|
||
|
||
if not full_path.is_file():
|
||
return {"success": False, "error": "不是文件"}
|
||
|
||
try:
|
||
relative_path = self._relative_path(full_path)
|
||
if self._use_container():
|
||
return self._container_call("apply_modify_blocks", {
|
||
"path": relative_path,
|
||
"blocks": blocks
|
||
})
|
||
|
||
with open(full_path, 'r', encoding='utf-8') as f:
|
||
original_content = f.read()
|
||
except Exception as e:
|
||
return {"success": False, "error": f"读取文件失败: {e}"}
|
||
|
||
current_content = original_content
|
||
results: List[Dict] = []
|
||
completed_indices: List[int] = []
|
||
failed_details: List[Dict] = []
|
||
write_error = None
|
||
|
||
for block in blocks:
|
||
index = block.get("index")
|
||
old_text = block.get("old", "")
|
||
new_text = block.get("new", "")
|
||
|
||
block_result = {
|
||
"index": index,
|
||
"status": "pending",
|
||
"removed_lines": 0,
|
||
"added_lines": 0,
|
||
"reason": None,
|
||
"hint": None
|
||
}
|
||
|
||
if old_text is None or new_text is None:
|
||
block_result["status"] = "error"
|
||
block_result["reason"] = "缺少 OLD 或 NEW 内容"
|
||
block_result["hint"] = "请确保粘贴的补丁包含成对的 <<<OLD>>> / <<<NEW>>> 标记。"
|
||
failed_details.append({"index": index, "reason": "缺少 OLD/NEW 标记"})
|
||
results.append(block_result)
|
||
continue
|
||
|
||
# 统一换行符,避免 CRLF 与 LF 不一致导致匹配失败
|
||
old_text = old_text.replace('\r\n', '\n')
|
||
new_text = new_text.replace('\r\n', '\n')
|
||
|
||
if not old_text:
|
||
block_result["status"] = "error"
|
||
block_result["reason"] = "OLD 内容不能为空"
|
||
block_result["hint"] = "请确认要替换的原文是否准确复制;若多次失败,可改用 terminal_snapshot 查证或使用终端命令/Python 小脚本进行精确替换。"
|
||
failed_details.append({"index": index, "reason": "OLD 内容为空"})
|
||
results.append(block_result)
|
||
continue
|
||
|
||
position = current_content.find(old_text)
|
||
if position == -1:
|
||
block_result["status"] = "not_found"
|
||
block_result["reason"] = "未找到匹配的原文,请确认是否完全复制"
|
||
block_result["hint"] = "请先用 terminal_snapshot 或 grep -n 校验原文;若仍失败,可在说明后改用 run_command/python 进行局部修改。"
|
||
failed_details.append({"index": index, "reason": "未找到匹配的原文"})
|
||
results.append(block_result)
|
||
continue
|
||
|
||
current_content = (
|
||
current_content[:position] +
|
||
new_text +
|
||
current_content[position + len(old_text):]
|
||
)
|
||
|
||
removed_lines = old_text.count('\n')
|
||
added_lines = new_text.count('\n')
|
||
if old_text and not old_text.endswith('\n'):
|
||
removed_lines += 1
|
||
if new_text and not new_text.endswith('\n'):
|
||
added_lines += 1
|
||
|
||
block_result.update({
|
||
"status": "success",
|
||
"removed_lines": removed_lines if old_text else 0,
|
||
"added_lines": added_lines if new_text else 0
|
||
})
|
||
completed_indices.append(index)
|
||
results.append(block_result)
|
||
|
||
write_performed = False
|
||
if completed_indices:
|
||
try:
|
||
with open(full_path, 'w', encoding='utf-8') as f:
|
||
f.write(current_content)
|
||
write_performed = True
|
||
except Exception as e:
|
||
write_error = f"写入文件失败: {e}"
|
||
# 写入失败时恢复原始内容
|
||
try:
|
||
with open(full_path, 'w', encoding='utf-8') as f:
|
||
f.write(original_content)
|
||
except Exception:
|
||
pass
|
||
|
||
success = bool(completed_indices) and not failed_details and write_error is None
|
||
|
||
return {
|
||
"success": success,
|
||
"completed": completed_indices,
|
||
"failed": failed_details,
|
||
"results": results,
|
||
"write_performed": write_performed,
|
||
"error": write_error
|
||
}
|
||
|
||
def replace_in_file(self, path: str, old_text: str, new_text: str) -> Dict:
|
||
"""替换文件中的内容"""
|
||
# 先读取文件
|
||
result = self.read_file(path)
|
||
if not result["success"]:
|
||
return result
|
||
|
||
content = result["content"]
|
||
|
||
# === 新增:替换操作的安全检查 ===
|
||
if old_text and len(old_text) > 9999999999:
|
||
return {
|
||
"success": False,
|
||
"error": "要替换的文本过长,可能导致性能问题",
|
||
"suggestion": "请拆分内容或使用 write_file_diff 提交结构化补丁"
|
||
}
|
||
|
||
if new_text and len(new_text) > 9999999999:
|
||
return {
|
||
"success": False,
|
||
"error": "替换的新文本过长,建议分块处理",
|
||
"suggestion": "请将大内容分成多个小的替换操作"
|
||
}
|
||
|
||
# 检查是否包含要替换的内容
|
||
if old_text and old_text not in content:
|
||
return {"success": False, "error": "未找到要替换的内容"}
|
||
|
||
# 替换内容
|
||
if old_text:
|
||
new_content = content.replace(old_text, new_text)
|
||
count = content.count(old_text)
|
||
else:
|
||
# 空文件直接写入新内容
|
||
new_content = new_text
|
||
count = 1
|
||
|
||
# 写回文件
|
||
result = self.write_file(path, new_content)
|
||
if result["success"]:
|
||
result["replacements"] = count
|
||
print(f"{OUTPUT_FORMATS['file']} 替换了 {count} 处内容")
|
||
|
||
return result
|
||
|
||
def clear_file(self, path: str) -> Dict:
|
||
"""清空文件内容"""
|
||
return self.write_file(path, "", mode="w")
|
||
|
||
def edit_lines_range(self, path: str, start_line: int, end_line: int, content: str, operation: str) -> Dict:
|
||
"""
|
||
基于行号编辑文件
|
||
|
||
Args:
|
||
path: 文件路径
|
||
start_line: 起始行号(从1开始)
|
||
end_line: 结束行号(从1开始,包含)
|
||
content: 新内容
|
||
operation: 操作类型 - "replace", "insert", "delete"
|
||
"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "文件不存在"}
|
||
|
||
if not full_path.is_file():
|
||
return {"success": False, "error": "不是文件"}
|
||
|
||
# 验证行号
|
||
if start_line < 1:
|
||
return {"success": False, "error": "行号必须从1开始"}
|
||
|
||
if end_line < start_line:
|
||
return {"success": False, "error": "结束行号不能小于起始行号"}
|
||
|
||
try:
|
||
relative_path = self._relative_path(full_path)
|
||
if self._use_container():
|
||
return self._container_call("edit_lines_range", {
|
||
"path": relative_path,
|
||
"start_line": start_line,
|
||
"end_line": end_line,
|
||
"content": content,
|
||
"operation": operation
|
||
})
|
||
|
||
# 读取文件内容
|
||
with open(full_path, 'r', encoding='utf-8') as f:
|
||
lines = f.readlines()
|
||
|
||
total_lines = len(lines)
|
||
|
||
# 检查行号范围
|
||
if start_line > total_lines:
|
||
if operation == "insert":
|
||
# 插入操作允许在文件末尾后插入
|
||
lines.extend([''] * (start_line - total_lines - 1))
|
||
lines.append(content if content.endswith('\n') else content + '\n')
|
||
else:
|
||
return {"success": False, "error": f"起始行号 {start_line} 超出文件范围 (共 {total_lines} 行)"}
|
||
elif end_line > total_lines:
|
||
return {"success": False, "error": f"结束行号 {end_line} 超出文件范围 (共 {total_lines} 行)"}
|
||
else:
|
||
# 执行操作(转换为0基索引)
|
||
start_idx = start_line - 1
|
||
end_idx = end_line
|
||
|
||
if operation == "replace":
|
||
# 替换指定行范围
|
||
new_lines = content.split('\n') if '\n' in content else [content]
|
||
# 确保每行都有换行符,除了最后一行需要检查原文件格式
|
||
formatted_lines = []
|
||
for i, line in enumerate(new_lines):
|
||
if i < len(new_lines) - 1 or (end_idx < len(lines) and lines[end_idx - 1].endswith('\n')):
|
||
formatted_lines.append(line + '\n' if not line.endswith('\n') else line)
|
||
else:
|
||
formatted_lines.append(line)
|
||
|
||
lines[start_idx:end_idx] = formatted_lines
|
||
affected_lines = end_line - start_line + 1
|
||
|
||
elif operation == "insert":
|
||
# 在指定行前插入内容
|
||
new_lines = content.split('\n') if '\n' in content else [content]
|
||
formatted_lines = [line + '\n' if not line.endswith('\n') else line for line in new_lines]
|
||
lines[start_idx:start_idx] = formatted_lines
|
||
affected_lines = len(formatted_lines)
|
||
|
||
elif operation == "delete":
|
||
# 删除指定行范围
|
||
affected_lines = end_line - start_line + 1
|
||
del lines[start_idx:end_idx]
|
||
|
||
else:
|
||
return {"success": False, "error": f"未知的操作类型: {operation}"}
|
||
|
||
# 写回文件
|
||
with open(full_path, 'w', encoding='utf-8') as f:
|
||
f.writelines(lines)
|
||
|
||
relative_path = str(full_path.relative_to(self.project_path))
|
||
|
||
# 生成操作描述
|
||
if operation == "replace":
|
||
operation_desc = f"替换第 {start_line}-{end_line} 行"
|
||
elif operation == "insert":
|
||
operation_desc = f"在第 {start_line} 行前插入"
|
||
elif operation == "delete":
|
||
operation_desc = f"删除第 {start_line}-{end_line} 行"
|
||
|
||
print(f"{OUTPUT_FORMATS['file']} {operation_desc}: {relative_path}")
|
||
|
||
return {
|
||
"success": True,
|
||
"path": relative_path,
|
||
"operation": operation,
|
||
"start_line": start_line,
|
||
"end_line": end_line,
|
||
"affected_lines": affected_lines,
|
||
"total_lines_after": len(lines),
|
||
"description": operation_desc
|
||
}
|
||
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def list_files(self, path: str = "") -> Dict:
|
||
"""列出目录内容"""
|
||
if path:
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
else:
|
||
full_path = self.project_path
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "目录不存在"}
|
||
|
||
if not full_path.is_dir():
|
||
return {"success": False, "error": "不是目录"}
|
||
|
||
try:
|
||
files = []
|
||
folders = []
|
||
|
||
for item in full_path.iterdir():
|
||
if item.name.startswith('.'):
|
||
continue
|
||
|
||
relative_path = str(item.relative_to(self.project_path))
|
||
|
||
if item.is_file():
|
||
files.append({
|
||
"name": item.name,
|
||
"path": relative_path,
|
||
"size": item.stat().st_size,
|
||
"modified": datetime.fromtimestamp(item.stat().st_mtime).isoformat()
|
||
})
|
||
elif item.is_dir():
|
||
folders.append({
|
||
"name": item.name,
|
||
"path": relative_path
|
||
})
|
||
|
||
return {
|
||
"success": True,
|
||
"path": str(full_path.relative_to(self.project_path)) if path else ".",
|
||
"files": sorted(files, key=lambda x: x["name"]),
|
||
"folders": sorted(folders, key=lambda x: x["name"])
|
||
}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
def get_file_info(self, path: str) -> Dict:
|
||
"""获取文件信息"""
|
||
valid, error, full_path = self._validate_path(path)
|
||
if not valid:
|
||
return {"success": False, "error": error}
|
||
|
||
if not full_path.exists():
|
||
return {"success": False, "error": "文件不存在"}
|
||
|
||
try:
|
||
stat = full_path.stat()
|
||
relative_path = str(full_path.relative_to(self.project_path))
|
||
|
||
return {
|
||
"success": True,
|
||
"path": relative_path,
|
||
"name": full_path.name,
|
||
"type": "file" if full_path.is_file() else "folder",
|
||
"size": stat.st_size,
|
||
"created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
|
||
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
|
||
"extension": full_path.suffix if full_path.is_file() else None
|
||
}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|