agent-Specialization/modules/file_manager.py

1358 lines
51 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# modules/file_manager.py - 文件管理模块(添加行编辑功能)
import os
import shutil
from pathlib import Path
import re
from typing import Optional, Dict, List, Set, Tuple, TYPE_CHECKING
from datetime import datetime
try:
from config import (
MAX_FILE_SIZE,
FORBIDDEN_PATHS,
FORBIDDEN_ROOT_PATHS,
OUTPUT_FORMATS,
READ_TOOL_MAX_FILE_SIZE,
PROJECT_MAX_STORAGE_BYTES,
)
except ImportError: # 兼容全局环境中存在同名包的情况
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parents[1]
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from config import (
MAX_FILE_SIZE,
FORBIDDEN_PATHS,
FORBIDDEN_ROOT_PATHS,
OUTPUT_FORMATS,
READ_TOOL_MAX_FILE_SIZE,
PROJECT_MAX_STORAGE_BYTES,
)
from modules.container_file_proxy import ContainerFileProxy
from utils.logger import setup_logger
if TYPE_CHECKING:
from modules.user_container_manager import ContainerHandle
# 临时禁用长度检查
DISABLE_LENGTH_CHECK = True
logger = setup_logger(__name__)
class FileManager:
def __init__(self, project_path: str, container_session: Optional["ContainerHandle"] = None):
self.project_path = Path(project_path).resolve()
self.container_session: Optional["ContainerHandle"] = None
self._container_proxy: Optional[ContainerFileProxy] = None
self.set_container_session(container_session)
def set_container_session(self, container_session: Optional["ContainerHandle"]):
self.container_session = container_session
if (
container_session
and container_session.mode == "docker"
and container_session.container_name
):
if self._container_proxy is None:
self._container_proxy = ContainerFileProxy(container_session)
else:
self._container_proxy.update_session(container_session)
else:
self._container_proxy = None
def _use_container(self) -> bool:
return self._container_proxy is not None and self._container_proxy.is_available()
def _container_call(self, action: str, payload: Dict) -> Dict:
if not self._use_container():
return {
"success": False,
"error": "容器未就绪,无法执行文件操作"
}
return self._container_proxy.run(action, payload)
def _get_project_size(self) -> int:
"""计算项目目录的总大小(字节),遇到异常时记录并抛出。"""
total = 0
if not self.project_path.exists():
return 0
for path in self.project_path.rglob('*'):
if not path.is_file():
continue
try:
total += path.stat().st_size
except Exception as exc:
logger.error(
"Failed to stat %s while calculating project size: %s",
path,
exc,
exc_info=True,
)
raise
return total
def _validate_path(self, path: str) -> Tuple[bool, str, Path]:
"""
验证路径安全性
Returns:
(是否有效, 错误信息, 完整路径)
"""
original_path = path
project_root = Path(self.project_path).resolve()
if project_root != self.project_path:
self.project_path = project_root
# 不允许绝对路径(除非是在项目内的绝对路径)
if path.startswith('/') or path.startswith('\\') or (len(path) > 1 and path[1] == ':'):
# 如果是绝对路径,检查是否指向项目内
try:
test_path = Path(path).resolve()
test_path.relative_to(project_root)
# 如果成功,说明绝对路径在项目内,转换为相对路径
path = str(test_path.relative_to(project_root))
except ValueError:
if str(original_path).replace("\\", "/").startswith("/workspace"):
return False, "路径必须在项目文件夹内。请检查是否使用的是不带/workspace的相对路径。", None
return False, "路径必须在项目文件夹内", None
# 检查是否包含向上遍历
if ".." in path:
return False, "不允许使用../向上遍历", None
# 构建完整路径
full_path = (project_root / path).resolve()
# 检查是否在项目目录内
try:
full_path.relative_to(project_root)
except ValueError:
return False, "路径必须在项目文件夹内", None
# 检查禁止的路径
path_str = str(full_path)
for forbidden_root in FORBIDDEN_ROOT_PATHS:
if path_str == forbidden_root:
return False, f"禁止访问根目录: {forbidden_root}", None
for forbidden in FORBIDDEN_PATHS:
if path_str.startswith(forbidden + os.sep) or path_str == forbidden:
return False, f"禁止访问系统目录: {forbidden}", None
return True, "", full_path
def _relative_path(self, full_path: Path) -> str:
return str(full_path.relative_to(self.project_path))
def create_file(self, path: str, content: str = "", file_type: str = "txt") -> Dict:
"""创建文件"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
# 添加文件扩展名
if not full_path.suffix:
full_path = full_path.with_suffix(f".{file_type}")
relative_path = self._relative_path(full_path)
try:
if full_path.parent == self.project_path:
return {
"success": False,
"error": "禁止在项目根目录直接创建文件,请先创建或选择合适的子目录。",
"suggestion": "然后必须**重新**再次创建文件。"
}
if self._use_container():
result = self._container_call("create_file", {
"path": relative_path,
"content": ""
})
if result.get("success"):
print(f"{OUTPUT_FORMATS['file']} 创建文件: {relative_path}")
return result
# 创建父目录
full_path.parent.mkdir(parents=True, exist_ok=True)
with open(full_path, 'w', encoding='utf-8') as f:
f.write("")
print(f"{OUTPUT_FORMATS['file']} 创建文件: {relative_path}")
return {
"success": True,
"path": relative_path,
"size": 0
}
except Exception as e:
return {"success": False, "error": str(e)}
def delete_file(self, path: str) -> Dict:
"""删除文件"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if not full_path.exists():
return {"success": False, "error": "文件不存在"}
if not full_path.is_file():
return {"success": False, "error": "不是文件"}
try:
relative_path = self._relative_path(full_path)
if self._use_container():
result = self._container_call("delete_file", {
"path": relative_path
})
if result.get("success"):
print(f"{OUTPUT_FORMATS['file']} 删除文件: {relative_path}")
return result
full_path.unlink()
print(f"{OUTPUT_FORMATS['file']} 删除文件: {relative_path}")
# 删除文件备注(如果存在)
# 这需要通过context_manager处理但file_manager没有直接访问权限
# 所以返回相对路径,让调用者处理备注删除
return {
"success": True,
"path": relative_path,
"action": "deleted"
}
except Exception as e:
return {"success": False, "error": str(e)}
def rename_file(self, old_path: str, new_path: str) -> Dict:
"""重命名文件"""
valid_old, error_old, full_old_path = self._validate_path(old_path)
if not valid_old:
return {"success": False, "error": error_old}
valid_new, error_new, full_new_path = self._validate_path(new_path)
if not valid_new:
return {"success": False, "error": error_new}
if not full_old_path.exists():
return {"success": False, "error": "原文件不存在"}
if full_new_path.exists():
return {"success": False, "error": "目标文件已存在"}
try:
old_relative = self._relative_path(full_old_path)
new_relative = self._relative_path(full_new_path)
if self._use_container():
result = self._container_call("rename_file", {
"old_path": old_relative,
"new_path": new_relative
})
if result.get("success"):
print(f"{OUTPUT_FORMATS['file']} 重命名: {old_relative} -> {new_relative}")
return result
full_old_path.rename(full_new_path)
print(f"{OUTPUT_FORMATS['file']} 重命名: {old_relative} -> {new_relative}")
return {
"success": True,
"old_path": old_relative,
"new_path": new_relative,
"action": "renamed"
}
except Exception as e:
return {"success": False, "error": str(e)}
def create_folder(self, path: str) -> Dict:
"""创建文件夹"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if full_path.exists():
return {"success": False, "error": "文件夹已存在"}
try:
relative_path = self._relative_path(full_path)
if self._use_container():
result = self._container_call("create_folder", {"path": relative_path})
if result.get("success"):
print(f"{OUTPUT_FORMATS['file']} 创建文件夹: {relative_path}")
return result
full_path.mkdir(parents=True, exist_ok=True)
print(f"{OUTPUT_FORMATS['file']} 创建文件夹: {relative_path}")
return {"success": True, "path": relative_path}
except Exception as e:
return {"success": False, "error": str(e)}
def delete_folder(self, path: str) -> Dict:
"""删除文件夹"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if not full_path.exists():
return {"success": False, "error": "文件夹不存在"}
if not full_path.is_dir():
return {"success": False, "error": "不是文件夹"}
try:
relative_path = self._relative_path(full_path)
if self._use_container():
result = self._container_call("delete_folder", {"path": relative_path})
if result.get("success"):
print(f"{OUTPUT_FORMATS['file']} 删除文件夹: {relative_path}")
return result
shutil.rmtree(full_path)
print(f"{OUTPUT_FORMATS['file']} 删除文件夹: {relative_path}")
return {"success": True, "path": relative_path}
except Exception as e:
return {"success": False, "error": str(e)}
def _read_text_lines(
self,
full_path: Path,
*,
size_limit: Optional[int] = None,
encoding: str = "utf-8",
) -> Dict:
"""读取UTF-8文本并返回行列表。"""
try:
file_size = full_path.stat().st_size
except FileNotFoundError:
return {"success": False, "error": "文件不存在"}
if size_limit and file_size > size_limit:
return {
"success": False,
"error": f"文件太大 ({file_size / 1024 / 1024:.2f}MB > {size_limit / 1024 / 1024}MB)"
}
try:
with open(full_path, 'r', encoding=encoding) as f:
lines = f.readlines()
except UnicodeDecodeError:
return {
"success": False,
"error": "文件不是 UTF-8 文本,无法直接读取,请改用 run_python 解析。"
}
except Exception as e:
return {"success": False, "error": f"读取文件失败: {e}"}
content = "".join(lines)
return {
"success": True,
"content": content,
"lines": lines,
"size": file_size
}
def read_file(self, path: str) -> Dict:
"""读取文件内容(兼容旧逻辑,限制为 MAX_FILE_SIZE"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if not full_path.exists():
return {"success": False, "error": "文件不存在"}
if not full_path.is_file():
return {"success": False, "error": "不是文件"}
if self._use_container():
relative_path = self._relative_path(full_path)
result = self._container_call("read_file", {
"path": relative_path,
"size_limit": MAX_FILE_SIZE
})
return result
result = self._read_text_lines(full_path, size_limit=MAX_FILE_SIZE)
if not result["success"]:
return result
relative_path = str(full_path.relative_to(self.project_path))
return {
"success": True,
"path": relative_path,
"content": result["content"],
"size": result["size"]
}
def read_text_segment(
self,
path: str,
*,
start_line: Optional[int] = None,
end_line: Optional[int] = None,
size_limit: Optional[int] = None
) -> Dict:
"""按行范围读取文本片段。"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if not full_path.exists():
return {"success": False, "error": "文件不存在"}
if not full_path.is_file():
return {"success": False, "error": "不是文件"}
if self._use_container():
relative_path = self._relative_path(full_path)
result = self._container_call("read_text_segment", {
"path": relative_path,
"start_line": start_line,
"end_line": end_line,
"size_limit": size_limit or READ_TOOL_MAX_FILE_SIZE
})
return result
if self._use_container():
relative_path = self._relative_path(full_path)
return self._container_call("search_text", {
"path": relative_path,
"query": query,
"max_matches": max_matches,
"context_before": context_before,
"context_after": context_after,
"case_sensitive": case_sensitive,
})
if self._use_container():
relative_path = self._relative_path(full_path)
return self._container_call("extract_segments", {
"path": relative_path,
"segments": segments,
"size_limit": size_limit or READ_TOOL_MAX_FILE_SIZE
})
result = self._read_text_lines(
full_path,
size_limit=size_limit or READ_TOOL_MAX_FILE_SIZE
)
if not result["success"]:
return result
lines = result["lines"]
total_lines = len(lines)
start = start_line if start_line and start_line > 0 else 1
end = end_line if end_line and end_line >= start else total_lines
if start > total_lines:
return {"success": False, "error": "起始行超出文件长度"}
end = min(end, total_lines)
selected_lines = lines[start - 1 : end]
content = "".join(selected_lines)
relative_path = str(full_path.relative_to(self.project_path))
return {
"success": True,
"path": relative_path,
"content": content,
"size": result["size"],
"line_start": start,
"line_end": end,
"total_lines": total_lines
}
def search_text(
self,
path: str,
*,
query: str,
max_matches: int,
context_before: int,
context_after: int,
case_sensitive: bool = False,
size_limit: Optional[int] = None
) -> Dict:
"""在文件中搜索关键词,返回合并后的窗口。"""
if not query:
return {"success": False, "error": "缺少搜索关键词"}
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if not full_path.exists():
return {"success": False, "error": "文件不存在"}
if not full_path.is_file():
return {"success": False, "error": "不是文件"}
result = self._read_text_lines(
full_path,
size_limit=size_limit or READ_TOOL_MAX_FILE_SIZE
)
if not result["success"]:
return result
lines = result["lines"]
total_lines = len(lines)
matches = []
query_text = query if case_sensitive else query.lower()
def contains(haystack: str) -> bool:
target = haystack if case_sensitive else haystack.lower()
return query_text in target
for idx, line in enumerate(lines, start=1):
if contains(line):
window_start = max(1, idx - context_before)
window_end = min(total_lines, idx + context_after)
if matches and window_start <= matches[-1]["line_end"]:
matches[-1]["line_end"] = max(matches[-1]["line_end"], window_end)
matches[-1]["hits"].append(idx)
else:
if len(matches) >= max_matches:
break
matches.append({
"line_start": window_start,
"line_end": window_end,
"hits": [idx]
})
relative_path = str(full_path.relative_to(self.project_path))
for window in matches:
snippet_lines = lines[window["line_start"] - 1 : window["line_end"]]
window["snippet"] = "".join(snippet_lines)
return {
"success": True,
"path": relative_path,
"size": result["size"],
"total_lines": total_lines,
"matches": matches
}
def extract_segments(
self,
path: str,
segments: List[Dict],
*,
size_limit: Optional[int] = None
) -> Dict:
"""根据多个行区间提取内容。"""
if not segments:
return {"success": False, "error": "缺少要提取的行区间"}
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if not full_path.exists():
return {"success": False, "error": "文件不存在"}
if not full_path.is_file():
return {"success": False, "error": "不是文件"}
result = self._read_text_lines(
full_path,
size_limit=size_limit or READ_TOOL_MAX_FILE_SIZE
)
if not result["success"]:
return result
lines = result["lines"]
total_lines = len(lines)
extracted = []
for item in segments:
if not isinstance(item, dict):
return {"success": False, "error": "segments 数组中的每一项都必须是对象"}
start_line = item.get("start_line")
end_line = item.get("end_line")
label = item.get("label")
if start_line is None or end_line is None:
return {"success": False, "error": "所有区间都必须包含 start_line 和 end_line"}
if start_line <= 0 or end_line < start_line:
return {"success": False, "error": "行区间不合法"}
if start_line > total_lines:
return {"success": False, "error": f"区间起点 {start_line} 超出文件行数"}
end_line = min(end_line, total_lines)
snippet = "".join(lines[start_line - 1 : end_line])
extracted.append({
"label": label,
"line_start": start_line,
"line_end": end_line,
"content": snippet
})
relative_path = str(full_path.relative_to(self.project_path))
return {
"success": True,
"path": relative_path,
"size": result["size"],
"total_lines": total_lines,
"segments": extracted
}
def write_file(self, path: str, content: str, mode: str = "w") -> Dict:
"""
写入文件
Args:
path: 文件路径
content: 内容
mode: 写入模式 - "w"(覆盖), "a"(追加)
"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
# === 新增:内容预处理和验证 ===
if content:
# 长度检查
if not DISABLE_LENGTH_CHECK and len(content) > 9999999999: # 100KB限制
return {
"success": False,
"error": f"内容过长({len(content)}字符)超过100KB限制",
"suggestion": "请分块处理或使用部分修改方式"
}
# 检查潜在的JSON格式问题
if content.count('"') % 2 != 0:
print(f"{OUTPUT_FORMATS['warning']} 检测到奇数个引号,可能存在格式问题")
# 检查大量转义字符
if content.count('\\') > len(content) / 20:
print(f"{OUTPUT_FORMATS['warning']} 检测到大量转义字符,建议检查内容格式")
try:
relative_path = self._relative_path(full_path)
current_size = self._get_project_size()
existing_size = full_path.stat().st_size if full_path.exists() else 0
if mode == "a":
projected_total = current_size + len(content)
else:
projected_total = current_size - existing_size + len(content)
if PROJECT_MAX_STORAGE_BYTES and projected_total > PROJECT_MAX_STORAGE_BYTES:
return {
"success": False,
"error": "写入失败:超出项目磁盘配额",
"limit_bytes": PROJECT_MAX_STORAGE_BYTES,
"project_size_bytes": current_size,
"attempt_size_bytes": len(content)
}
if self._use_container():
result = self._container_call("write_file", {
"path": relative_path,
"content": content,
"mode": mode
})
if result.get("success"):
action = "覆盖" if mode == "w" else "追加"
print(f"{OUTPUT_FORMATS['file']} {action}文件: {relative_path}")
return result
# 创建父目录
full_path.parent.mkdir(parents=True, exist_ok=True)
with open(full_path, mode, encoding='utf-8') as f:
f.write(content)
action = "覆盖" if mode == "w" else "追加"
print(f"{OUTPUT_FORMATS['file']} {action}文件: {relative_path}")
return {
"success": True,
"path": relative_path,
"size": len(content),
"mode": mode
}
except Exception as e:
return {"success": False, "error": str(e)}
def append_file(self, path: str, content: str) -> Dict:
"""追加内容到文件"""
return self.write_file(path, content, mode="a")
def _parse_diff_patch(self, patch_text: str) -> Dict:
"""解析统一diff格式的补丁转换为 apply_modify_blocks 所需的块结构。"""
if not patch_text or "*** Begin Patch" not in patch_text or "*** End Patch" not in patch_text:
return {
"success": False,
"error": "补丁格式错误:缺少 *** Begin Patch / *** End Patch 标记。"
}
start = patch_text.find("*** Begin Patch")
end = patch_text.rfind("*** End Patch")
if end <= start:
return {
"success": False,
"error": "补丁格式错误:结束标记位置异常。"
}
body = patch_text[start + len("*** Begin Patch"):end]
lines = body.splitlines(True) # 保留换行符,便于逐字匹配
blocks: List[Dict] = []
current_block: Optional[Dict] = None
auto_index = 1
id_pattern = re.compile(r"\[id:\s*(\d+)\]", re.IGNORECASE)
for raw_line in lines:
stripped = raw_line.strip()
if not stripped and current_block is None:
continue
if stripped.startswith("@@"):
if current_block:
if not current_block["lines"]:
return {
"success": False,
"error": f"补丁块缺少内容:{current_block.get('header', '').strip()}"
}
blocks.append(current_block)
header = stripped
id_match = id_pattern.search(header)
block_id: Optional[int] = None
if id_match:
try:
block_id = int(id_match.group(1))
except ValueError:
return {
"success": False,
"error": f"补丁块编号必须是整数:{header}"
}
current_block = {"id": block_id, "header": header, "lines": []}
continue
if current_block is None:
if stripped:
return {
"success": False,
"error": "补丁格式错误:在检测到第一个 @@ 块之前出现内容。"
}
continue
if raw_line.startswith("\\ No newline at end of file"):
continue
current_block["lines"].append(raw_line)
if current_block:
if not current_block["lines"]:
return {
"success": False,
"error": f"补丁块缺少内容:{current_block.get('header', '').strip()}"
}
blocks.append(current_block)
if not blocks:
return {
"success": False,
"error": "补丁格式错误:未检测到任何 @@ [id:n] 块。"
}
parsed_blocks: List[Dict] = []
used_indices: Set[int] = set()
for block in blocks:
idx = block["id"]
if idx is None:
while auto_index in used_indices:
auto_index += 1
idx = auto_index
auto_index += 1
elif idx in used_indices:
while idx in used_indices:
idx += 1
auto_index = max(auto_index, idx + 1)
used_indices.add(idx)
old_lines: List[str] = []
new_lines: List[str] = []
has_anchor = False
has_content = False
for line in block["lines"]:
if not line:
continue
prefix = line[0]
if prefix == ' ':
has_anchor = True
old_lines.append(line[1:])
new_lines.append(line[1:])
has_content = True
elif prefix == '-':
has_anchor = True
old_lines.append(line[1:])
has_content = True
elif prefix == '+':
new_lines.append(line[1:])
has_content = True
else:
# 容忍空白符或意外格式,直接作为上下文
old_lines.append(line)
new_lines.append(line)
has_anchor = True
has_content = True
if not has_content:
return {
"success": False,
"error": f"补丁块 {idx} 未包含任何 + / - / 上下文行。"
}
append_only = False
if not has_anchor:
append_only = True
old_text = "".join(old_lines)
new_text = "".join(new_lines)
raw_patch = f"{block['header']}\n{''.join(block['lines'])}"
parsed_blocks.append({
"index": idx,
"old": old_text,
"new": new_text,
"append_only": append_only,
"raw_patch": raw_patch
})
return {
"success": True,
"blocks": parsed_blocks
}
def apply_diff_patch(self, path: str, patch_text: str) -> Dict:
"""解析统一diff并写入文件支持多块依次执行。"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
parse_result = self._parse_diff_patch(patch_text)
if not parse_result.get("success"):
return parse_result
blocks = parse_result.get("blocks") or []
if not blocks:
return {
"success": False,
"error": "未检测到有效的补丁块。"
}
relative_path = str(self._relative_path(full_path))
parsed_blocks: List[Dict] = blocks
block_lookup: Dict[int, Dict] = {block["index"]: block for block in parsed_blocks}
def attach_block_context(entries: Optional[List[Dict]]):
if not entries:
return
for entry in entries:
if not isinstance(entry, dict):
continue
idx = entry.get("index")
if idx is None:
continue
block_info = block_lookup.get(idx)
if not block_info:
continue
patch_text = block_info.get("raw_patch")
if patch_text:
entry.setdefault("block_patch", patch_text)
if "old_text" not in entry:
entry["old_text"] = block_info.get("old")
if "new_text" not in entry:
entry["new_text"] = block_info.get("new")
entry.setdefault("append_only", block_info.get("append_only", False))
append_only_blocks = [b for b in parsed_blocks if b.get("append_only")]
modify_blocks = [
{"index": b["index"], "old": b["old"], "new": b["new"]}
for b in parsed_blocks
if not b.get("append_only")
]
apply_result = {"results": []}
completed_indices: List[int] = []
failed_entries: List[Dict] = []
write_error = None
if modify_blocks:
modify_result = self.apply_modify_blocks(path, modify_blocks)
apply_result.update(modify_result)
completed_indices.extend(modify_result.get("completed", []))
failed_entries.extend(modify_result.get("failed", []))
write_error = modify_result.get("error")
else:
apply_result.update({
"success": True,
"completed": [],
"failed": [],
"results": [],
"write_performed": False,
"error": None
})
results_blocks = apply_result.get("results", []).copy()
append_results: List[Dict] = []
append_bytes = 0
append_lines_total = 0
append_success = True
if append_only_blocks:
try:
with open(full_path, 'a', encoding='utf-8') as f:
for block in append_only_blocks:
chunk = block.get("new", "")
if not chunk:
append_results.append({
"index": block["index"],
"status": "failed",
"reason": "追加块为空"
})
failed_entries.append({
"index": block["index"],
"reason": "追加块为空"
})
append_success = False
continue
f.write(chunk)
added_lines = chunk.count('\n')
if chunk and not chunk.endswith('\n'):
added_lines += 1
append_lines_total += added_lines
append_bytes += len(chunk.encode('utf-8'))
append_results.append({
"index": block["index"],
"status": "success",
"removed_lines": 0,
"added_lines": added_lines
})
completed_indices.append(block["index"])
except Exception as e:
append_success = False
write_error = f"追加写入失败: {e}"
append_results.append({
"index": append_only_blocks[-1]["index"],
"status": "failed",
"reason": str(e)
})
failed_entries.append({
"index": append_only_blocks[-1]["index"],
"reason": str(e)
})
attach_block_context(failed_entries)
total_blocks = len(parsed_blocks)
completed_unique = sorted(set(completed_indices))
summary_parts = [
f"{relative_path} 应用 {total_blocks} 个补丁块",
f"成功 {len(completed_unique)}",
f"失败 {len(failed_entries)}"
]
if append_only_blocks:
summary_parts.append(f"追加 {len(append_only_blocks)} 块,写入 {append_lines_total} 行({append_bytes} 字节)")
if write_error:
summary_parts.append(write_error)
summary = "".join(summary_parts)
results_blocks.extend(append_results)
apply_result["results"] = results_blocks
apply_result["blocks"] = results_blocks
apply_result["path"] = relative_path
apply_result["total_blocks"] = total_blocks
apply_result["summary"] = summary
apply_result["message"] = summary
apply_result["completed"] = completed_unique
apply_result["failed"] = failed_entries
apply_result["append_bytes"] = append_bytes
apply_result["append_blocks"] = len(append_only_blocks)
apply_result["success"] = (
append_success
and apply_result.get("success", True)
and not failed_entries
and not write_error
)
apply_result["error"] = write_error
return apply_result
def apply_modify_blocks(self, path: str, blocks: List[Dict]) -> Dict:
"""
应用批量替换块
Args:
path: 目标文件路径
blocks: [{"index": int, "old": str, "new": str}]
"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if not full_path.exists():
return {"success": False, "error": "文件不存在"}
if not full_path.is_file():
return {"success": False, "error": "不是文件"}
try:
relative_path = self._relative_path(full_path)
if self._use_container():
return self._container_call("apply_modify_blocks", {
"path": relative_path,
"blocks": blocks
})
with open(full_path, 'r', encoding='utf-8') as f:
original_content = f.read()
except Exception as e:
return {"success": False, "error": f"读取文件失败: {e}"}
current_content = original_content
results: List[Dict] = []
completed_indices: List[int] = []
failed_details: List[Dict] = []
write_error = None
for block in blocks:
index = block.get("index")
old_text = block.get("old", "")
new_text = block.get("new", "")
block_result = {
"index": index,
"status": "pending",
"removed_lines": 0,
"added_lines": 0,
"reason": None,
"hint": None
}
if old_text is None or new_text is None:
block_result["status"] = "error"
block_result["reason"] = "缺少 OLD 或 NEW 内容"
block_result["hint"] = "请确保粘贴的补丁包含成对的 <<<OLD>>> / <<<NEW>>> 标记。"
failed_details.append({"index": index, "reason": "缺少 OLD/NEW 标记"})
results.append(block_result)
continue
# 统一换行符,避免 CRLF 与 LF 不一致导致匹配失败
old_text = old_text.replace('\r\n', '\n')
new_text = new_text.replace('\r\n', '\n')
if not old_text:
block_result["status"] = "error"
block_result["reason"] = "OLD 内容不能为空"
block_result["hint"] = "请确认要替换的原文是否准确复制;若多次失败,可改用 terminal_snapshot 查证或使用终端命令/Python 小脚本进行精确替换。"
failed_details.append({"index": index, "reason": "OLD 内容为空"})
results.append(block_result)
continue
position = current_content.find(old_text)
if position == -1:
block_result["status"] = "not_found"
block_result["reason"] = "未找到匹配的原文,请确认是否完全复制"
block_result["hint"] = "请先用 terminal_snapshot 或 grep -n 校验原文;若仍失败,可在说明后改用 run_command/python 进行局部修改。"
failed_details.append({"index": index, "reason": "未找到匹配的原文"})
results.append(block_result)
continue
current_content = (
current_content[:position] +
new_text +
current_content[position + len(old_text):]
)
removed_lines = old_text.count('\n')
added_lines = new_text.count('\n')
if old_text and not old_text.endswith('\n'):
removed_lines += 1
if new_text and not new_text.endswith('\n'):
added_lines += 1
block_result.update({
"status": "success",
"removed_lines": removed_lines if old_text else 0,
"added_lines": added_lines if new_text else 0
})
completed_indices.append(index)
results.append(block_result)
write_performed = False
if completed_indices:
try:
with open(full_path, 'w', encoding='utf-8') as f:
f.write(current_content)
write_performed = True
except Exception as e:
write_error = f"写入文件失败: {e}"
# 写入失败时恢复原始内容
try:
with open(full_path, 'w', encoding='utf-8') as f:
f.write(original_content)
except Exception:
pass
success = bool(completed_indices) and not failed_details and write_error is None
return {
"success": success,
"completed": completed_indices,
"failed": failed_details,
"results": results,
"write_performed": write_performed,
"error": write_error
}
def replace_in_file(self, path: str, old_text: str, new_text: str) -> Dict:
"""替换文件中的内容"""
# 先读取文件
result = self.read_file(path)
if not result["success"]:
return result
content = result["content"]
# === 新增:替换操作的安全检查 ===
if old_text and len(old_text) > 9999999999:
return {
"success": False,
"error": "要替换的文本过长,可能导致性能问题",
"suggestion": "请拆分内容或使用 write_file_diff 提交结构化补丁"
}
if new_text and len(new_text) > 9999999999:
return {
"success": False,
"error": "替换的新文本过长,建议分块处理",
"suggestion": "请将大内容分成多个小的替换操作"
}
# 检查是否包含要替换的内容
if old_text and old_text not in content:
return {"success": False, "error": "未找到要替换的内容"}
# 替换内容
if old_text:
new_content = content.replace(old_text, new_text)
count = content.count(old_text)
else:
# 空文件直接写入新内容
new_content = new_text
count = 1
# 写回文件
result = self.write_file(path, new_content)
if result["success"]:
result["replacements"] = count
print(f"{OUTPUT_FORMATS['file']} 替换了 {count} 处内容")
return result
def clear_file(self, path: str) -> Dict:
"""清空文件内容"""
return self.write_file(path, "", mode="w")
def edit_lines_range(self, path: str, start_line: int, end_line: int, content: str, operation: str) -> Dict:
"""
基于行号编辑文件
Args:
path: 文件路径
start_line: 起始行号从1开始
end_line: 结束行号从1开始包含
content: 新内容
operation: 操作类型 - "replace", "insert", "delete"
"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if not full_path.exists():
return {"success": False, "error": "文件不存在"}
if not full_path.is_file():
return {"success": False, "error": "不是文件"}
# 验证行号
if start_line < 1:
return {"success": False, "error": "行号必须从1开始"}
if end_line < start_line:
return {"success": False, "error": "结束行号不能小于起始行号"}
try:
relative_path = self._relative_path(full_path)
if self._use_container():
return self._container_call("edit_lines_range", {
"path": relative_path,
"start_line": start_line,
"end_line": end_line,
"content": content,
"operation": operation
})
# 读取文件内容
with open(full_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
total_lines = len(lines)
# 检查行号范围
if start_line > total_lines:
if operation == "insert":
# 插入操作允许在文件末尾后插入
lines.extend([''] * (start_line - total_lines - 1))
lines.append(content if content.endswith('\n') else content + '\n')
else:
return {"success": False, "error": f"起始行号 {start_line} 超出文件范围 (共 {total_lines} 行)"}
elif end_line > total_lines:
return {"success": False, "error": f"结束行号 {end_line} 超出文件范围 (共 {total_lines} 行)"}
else:
# 执行操作转换为0基索引
start_idx = start_line - 1
end_idx = end_line
if operation == "replace":
# 替换指定行范围
new_lines = content.split('\n') if '\n' in content else [content]
# 确保每行都有换行符,除了最后一行需要检查原文件格式
formatted_lines = []
for i, line in enumerate(new_lines):
if i < len(new_lines) - 1 or (end_idx < len(lines) and lines[end_idx - 1].endswith('\n')):
formatted_lines.append(line + '\n' if not line.endswith('\n') else line)
else:
formatted_lines.append(line)
lines[start_idx:end_idx] = formatted_lines
affected_lines = end_line - start_line + 1
elif operation == "insert":
# 在指定行前插入内容
new_lines = content.split('\n') if '\n' in content else [content]
formatted_lines = [line + '\n' if not line.endswith('\n') else line for line in new_lines]
lines[start_idx:start_idx] = formatted_lines
affected_lines = len(formatted_lines)
elif operation == "delete":
# 删除指定行范围
affected_lines = end_line - start_line + 1
del lines[start_idx:end_idx]
else:
return {"success": False, "error": f"未知的操作类型: {operation}"}
# 写回文件
with open(full_path, 'w', encoding='utf-8') as f:
f.writelines(lines)
relative_path = str(full_path.relative_to(self.project_path))
# 生成操作描述
if operation == "replace":
operation_desc = f"替换第 {start_line}-{end_line}"
elif operation == "insert":
operation_desc = f"在第 {start_line} 行前插入"
elif operation == "delete":
operation_desc = f"删除第 {start_line}-{end_line}"
print(f"{OUTPUT_FORMATS['file']} {operation_desc}: {relative_path}")
return {
"success": True,
"path": relative_path,
"operation": operation,
"start_line": start_line,
"end_line": end_line,
"affected_lines": affected_lines,
"total_lines_after": len(lines),
"description": operation_desc
}
except Exception as e:
return {"success": False, "error": str(e)}
def list_files(self, path: str = "") -> Dict:
"""列出目录内容"""
if path:
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
else:
full_path = self.project_path
if not full_path.exists():
return {"success": False, "error": "目录不存在"}
if not full_path.is_dir():
return {"success": False, "error": "不是目录"}
try:
files = []
folders = []
for item in full_path.iterdir():
if item.name.startswith('.'):
continue
relative_path = str(item.relative_to(self.project_path))
if item.is_file():
files.append({
"name": item.name,
"path": relative_path,
"size": item.stat().st_size,
"modified": datetime.fromtimestamp(item.stat().st_mtime).isoformat()
})
elif item.is_dir():
folders.append({
"name": item.name,
"path": relative_path
})
return {
"success": True,
"path": str(full_path.relative_to(self.project_path)) if path else ".",
"files": sorted(files, key=lambda x: x["name"]),
"folders": sorted(folders, key=lambda x: x["name"])
}
except Exception as e:
return {"success": False, "error": str(e)}
def get_file_info(self, path: str) -> Dict:
"""获取文件信息"""
valid, error, full_path = self._validate_path(path)
if not valid:
return {"success": False, "error": error}
if not full_path.exists():
return {"success": False, "error": "文件不存在"}
try:
stat = full_path.stat()
relative_path = str(full_path.relative_to(self.project_path))
return {
"success": True,
"path": relative_path,
"name": full_path.name,
"type": "file" if full_path.is_file() else "folder",
"size": stat.st_size,
"created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
"extension": full_path.suffix if full_path.is_file() else None
}
except Exception as e:
return {"success": False, "error": str(e)}