496 lines
16 KiB
Python
496 lines
16 KiB
Python
"""Utilities to proxy FileManager operations into user containers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Dict, Optional, Any, TYPE_CHECKING
|
|
|
|
CONTAINER_FILE_SCRIPT = r"""
|
|
import json
|
|
import sys
|
|
import pathlib
|
|
import shutil
|
|
|
|
def _resolve(root: pathlib.Path, rel: str) -> pathlib.Path:
|
|
base = root.resolve()
|
|
target = (base / rel).resolve()
|
|
if not str(target).startswith(str(base)):
|
|
raise ValueError("路径越界: %s" % rel)
|
|
return target
|
|
|
|
def _read_text(target: pathlib.Path):
|
|
with target.open('r', encoding='utf-8') as fh:
|
|
data = fh.read()
|
|
lines = data.splitlines(keepends=True)
|
|
return data, lines
|
|
|
|
def _ensure_file(target: pathlib.Path):
|
|
if not target.exists():
|
|
return {"success": False, "error": "文件不存在"}
|
|
if not target.is_file():
|
|
return {"success": False, "error": "不是文件"}
|
|
return None
|
|
|
|
def _create_file(root, payload):
|
|
rel = payload.get("path")
|
|
target = _resolve(root, rel)
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
content = payload.get("content") or ""
|
|
with target.open('w', encoding='utf-8') as fh:
|
|
fh.write(content)
|
|
return {"success": True, "path": rel, "size": len(content)}
|
|
|
|
def _delete_file(root, payload):
|
|
rel = payload.get("path")
|
|
target = _resolve(root, rel)
|
|
err = _ensure_file(target)
|
|
if err:
|
|
return err
|
|
target.unlink()
|
|
return {"success": True, "path": rel, "action": "deleted"}
|
|
|
|
def _rename_file(root, payload):
|
|
old_rel = payload.get("old_path")
|
|
new_rel = payload.get("new_path")
|
|
old = _resolve(root, old_rel)
|
|
new = _resolve(root, new_rel)
|
|
if not old.exists():
|
|
return {"success": False, "error": "原文件不存在"}
|
|
if new.exists():
|
|
return {"success": False, "error": "目标文件已存在"}
|
|
new.parent.mkdir(parents=True, exist_ok=True)
|
|
old.rename(new)
|
|
return {
|
|
"success": True,
|
|
"old_path": old_rel,
|
|
"new_path": new_rel,
|
|
"action": "renamed"
|
|
}
|
|
|
|
def _create_folder(root, payload):
|
|
rel = payload.get("path")
|
|
target = _resolve(root, rel)
|
|
if target.exists():
|
|
return {"success": False, "error": "文件夹已存在"}
|
|
target.mkdir(parents=True, exist_ok=True)
|
|
return {"success": True, "path": rel}
|
|
|
|
def _delete_folder(root, payload):
|
|
rel = payload.get("path")
|
|
target = _resolve(root, rel)
|
|
if not target.exists():
|
|
return {"success": False, "error": "文件夹不存在"}
|
|
if not target.is_dir():
|
|
return {"success": False, "error": "不是文件夹"}
|
|
shutil.rmtree(target)
|
|
return {"success": True, "path": rel}
|
|
|
|
def _read_file(root, payload):
|
|
rel = payload.get("path")
|
|
limit = payload.get("size_limit")
|
|
target = _resolve(root, rel)
|
|
err = _ensure_file(target)
|
|
if err:
|
|
return err
|
|
size = target.stat().st_size
|
|
if limit and size > limit:
|
|
return {
|
|
"success": False,
|
|
"error": f"文件太大 ({size} 字节),超过限制"
|
|
}
|
|
with target.open('r', encoding='utf-8') as fh:
|
|
content = fh.read()
|
|
return {"success": True, "path": rel, "content": content, "size": size}
|
|
|
|
def _read_text_segment(root, payload):
|
|
rel = payload.get("path")
|
|
start = payload.get("start_line")
|
|
end = payload.get("end_line")
|
|
limit = payload.get("size_limit")
|
|
target = _resolve(root, rel)
|
|
err = _ensure_file(target)
|
|
if err:
|
|
return err
|
|
size = target.stat().st_size
|
|
if limit and size > limit:
|
|
return {
|
|
"success": False,
|
|
"error": f"文件太大 ({size} 字节),超过限制"
|
|
}
|
|
data, lines = _read_text(target)
|
|
total = len(lines)
|
|
line_start = start if start and start > 0 else 1
|
|
line_end = end if end and end >= line_start else total
|
|
if line_start > total:
|
|
return {"success": False, "error": "起始行超出文件长度"}
|
|
line_end = min(line_end, total)
|
|
snippet = "".join(lines[line_start - 1 : line_end])
|
|
return {
|
|
"success": True,
|
|
"path": rel,
|
|
"content": snippet,
|
|
"size": size,
|
|
"line_start": line_start,
|
|
"line_end": line_end,
|
|
"total_lines": total
|
|
}
|
|
|
|
def _search_text(root, payload):
|
|
rel = payload.get("path")
|
|
target = _resolve(root, rel)
|
|
err = _ensure_file(target)
|
|
if err:
|
|
return err
|
|
data, lines = _read_text(target)
|
|
total = len(lines)
|
|
query = payload.get("query") or ""
|
|
if not query:
|
|
return {"success": False, "error": "缺少搜索关键词"}
|
|
max_matches = payload.get("max_matches") or 10
|
|
before = payload.get("context_before") or 2
|
|
after = payload.get("context_after") or 2
|
|
case_sensitive = bool(payload.get("case_sensitive"))
|
|
query_cmp = query if case_sensitive else query.lower()
|
|
|
|
def contains(text):
|
|
text_cmp = text if case_sensitive else text.lower()
|
|
return query_cmp in text_cmp
|
|
|
|
matches = []
|
|
for idx, line in enumerate(lines, start=1):
|
|
if contains(line):
|
|
win_start = max(1, idx - before)
|
|
win_end = min(total, idx + after)
|
|
if matches and win_start <= matches[-1]["line_end"]:
|
|
matches[-1]["line_end"] = max(matches[-1]["line_end"], win_end)
|
|
matches[-1]["hits"].append(idx)
|
|
else:
|
|
if len(matches) >= max_matches:
|
|
break
|
|
matches.append({
|
|
"line_start": win_start,
|
|
"line_end": win_end,
|
|
"hits": [idx]
|
|
})
|
|
for window in matches:
|
|
snippet_lines = lines[window["line_start"] - 1 : window["line_end"]]
|
|
window["snippet"] = "".join(snippet_lines)
|
|
|
|
return {
|
|
"success": True,
|
|
"path": rel,
|
|
"size": target.stat().st_size,
|
|
"total_lines": total,
|
|
"matches": matches
|
|
}
|
|
|
|
def _extract_segments(root, payload):
|
|
rel = payload.get("path")
|
|
target = _resolve(root, rel)
|
|
err = _ensure_file(target)
|
|
if err:
|
|
return err
|
|
segments = payload.get("segments") or []
|
|
if not segments:
|
|
return {"success": False, "error": "缺少要提取的行区间"}
|
|
_, lines = _read_text(target)
|
|
total = len(lines)
|
|
extracted = []
|
|
for spec in segments:
|
|
start = spec.get("start_line")
|
|
end = spec.get("end_line")
|
|
label = spec.get("label")
|
|
if start is None or end is None:
|
|
return {"success": False, "error": "segments 中缺少 start_line 或 end_line"}
|
|
if start <= 0 or end < start:
|
|
return {"success": False, "error": "行区间不合法"}
|
|
if start > total:
|
|
return {"success": False, "error": f"区间起点 {start} 超出文件行数"}
|
|
end = min(end, total)
|
|
snippet = "".join(lines[start - 1 : end])
|
|
extracted.append({
|
|
"label": label,
|
|
"line_start": start,
|
|
"line_end": end,
|
|
"content": snippet
|
|
})
|
|
return {
|
|
"success": True,
|
|
"path": rel,
|
|
"size": target.stat().st_size,
|
|
"total_lines": total,
|
|
"segments": extracted
|
|
}
|
|
|
|
def _write_file(root, payload):
|
|
rel = payload.get("path")
|
|
content = payload.get("content") or ""
|
|
mode = payload.get("mode") or "w"
|
|
target = _resolve(root, rel)
|
|
target.parent.mkdir(parents=True, exist_ok=True)
|
|
with target.open(mode, encoding='utf-8') as fh:
|
|
fh.write(content)
|
|
return {
|
|
"success": True,
|
|
"path": rel,
|
|
"size": len(content),
|
|
"mode": mode
|
|
}
|
|
|
|
def _apply_modify_blocks(root, payload):
|
|
rel = payload.get("path")
|
|
blocks = payload.get("blocks") or []
|
|
target = _resolve(root, rel)
|
|
err = _ensure_file(target)
|
|
if err:
|
|
return err
|
|
original, _ = _read_text(target)
|
|
current = original
|
|
results = []
|
|
completed = []
|
|
failed = []
|
|
for block in blocks:
|
|
idx = block.get("index")
|
|
old_text = (block.get("old") or "").replace('\r\n', '\n')
|
|
new_text = (block.get("new") or "").replace('\r\n', '\n')
|
|
record = {
|
|
"index": idx,
|
|
"status": "pending",
|
|
"removed_lines": 0,
|
|
"added_lines": 0,
|
|
"reason": None,
|
|
"hint": None
|
|
}
|
|
if old_text is None or new_text is None:
|
|
record.update({
|
|
"status": "error",
|
|
"reason": "缺少 OLD 或 NEW 内容",
|
|
"hint": "请确认补丁是否完整。"
|
|
})
|
|
failed.append({"index": idx, "reason": "缺少 OLD/NEW"})
|
|
results.append(record)
|
|
continue
|
|
if not old_text:
|
|
record.update({
|
|
"status": "error",
|
|
"reason": "OLD 内容不能为空",
|
|
"hint": "请确认要替换的原文是否准确复制。"
|
|
})
|
|
failed.append({"index": idx, "reason": "OLD 为空"})
|
|
results.append(record)
|
|
continue
|
|
pos = current.find(old_text)
|
|
if pos == -1:
|
|
record.update({
|
|
"status": "not_found",
|
|
"reason": "未找到匹配的原文,请确认是否完全复制",
|
|
"hint": "可使用终端或搜索确认原文。"
|
|
})
|
|
failed.append({"index": idx, "reason": "未找到匹配"})
|
|
results.append(record)
|
|
continue
|
|
current = current[:pos] + new_text + current[pos + len(old_text):]
|
|
removed_lines = old_text.count('\n')
|
|
added_lines = new_text.count('\n')
|
|
if old_text and not old_text.endswith('\n'):
|
|
removed_lines += 1
|
|
if new_text and not new_text.endswith('\n'):
|
|
added_lines += 1
|
|
record.update({
|
|
"status": "success",
|
|
"removed_lines": removed_lines,
|
|
"added_lines": added_lines
|
|
})
|
|
completed.append(idx)
|
|
results.append(record)
|
|
write_performed = False
|
|
error = None
|
|
if completed:
|
|
try:
|
|
with target.open('w', encoding='utf-8') as fh:
|
|
fh.write(current)
|
|
write_performed = True
|
|
except Exception as exc:
|
|
error = f"写入文件失败: {exc}"
|
|
try:
|
|
with target.open('w', encoding='utf-8') as fh:
|
|
fh.write(original)
|
|
except Exception:
|
|
pass
|
|
return {
|
|
"success": bool(completed) and not failed and error is None,
|
|
"completed": completed,
|
|
"failed": failed,
|
|
"results": results,
|
|
"write_performed": write_performed,
|
|
"error": error
|
|
}
|
|
|
|
def _edit_lines(root, payload):
|
|
rel = payload.get("path")
|
|
start_line = int(payload.get("start_line") or 1)
|
|
end_line = int(payload.get("end_line") or start_line)
|
|
content = payload.get("content") or ""
|
|
operation = payload.get("operation")
|
|
target = _resolve(root, rel)
|
|
err = _ensure_file(target)
|
|
if err:
|
|
return err
|
|
if start_line < 1:
|
|
return {"success": False, "error": "行号必须从1开始"}
|
|
if end_line < start_line:
|
|
return {"success": False, "error": "结束行号不能小于起始行号"}
|
|
with target.open('r', encoding='utf-8') as fh:
|
|
lines = fh.readlines()
|
|
total = len(lines)
|
|
if start_line > total:
|
|
if operation == "insert":
|
|
lines.extend([''] * (start_line - total - 1))
|
|
lines.append(content if content.endswith('\n') else content + '\n')
|
|
affected = len(content.splitlines() or [''])
|
|
else:
|
|
return {"success": False, "error": f"起始行号 {start_line} 超出文件范围 (共 {total} 行)"}
|
|
else:
|
|
if end_line > total:
|
|
return {"success": False, "error": f"结束行号 {end_line} 超出文件范围 (共 {total} 行)"}
|
|
start_idx = start_line - 1
|
|
end_idx = end_line
|
|
if operation == "replace":
|
|
new_lines = content.split('\n') if '\n' in content else [content]
|
|
formatted = []
|
|
for i, line in enumerate(new_lines):
|
|
if i < len(new_lines) - 1 or (end_idx < len(lines) and lines[end_idx - 1].endswith('\n')):
|
|
formatted.append(line + '\n' if not line.endswith('\n') else line)
|
|
else:
|
|
formatted.append(line)
|
|
lines[start_idx:end_idx] = formatted
|
|
affected = end_line - start_line + 1
|
|
elif operation == "insert":
|
|
new_lines = content.split('\n') if '\n' in content else [content]
|
|
formatted = [line + '\n' if not line.endswith('\n') else line for line in new_lines]
|
|
lines[start_idx:start_idx] = formatted
|
|
affected = len(formatted)
|
|
elif operation == "delete":
|
|
del lines[start_idx:end_idx]
|
|
affected = end_line - start_line + 1
|
|
else:
|
|
return {"success": False, "error": f"未知的操作类型: {operation}"}
|
|
with target.open('w', encoding='utf-8') as fh:
|
|
fh.writelines(lines)
|
|
return {
|
|
"success": True,
|
|
"path": rel,
|
|
"operation": operation,
|
|
"affected_lines": affected
|
|
}
|
|
|
|
HANDLERS = {
|
|
"create_file": _create_file,
|
|
"delete_file": _delete_file,
|
|
"rename_file": _rename_file,
|
|
"create_folder": _create_folder,
|
|
"delete_folder": _delete_folder,
|
|
"read_file": _read_file,
|
|
"read_text_segment": _read_text_segment,
|
|
"search_text": _search_text,
|
|
"extract_segments": _extract_segments,
|
|
"write_file": _write_file,
|
|
"apply_modify_blocks": _apply_modify_blocks,
|
|
"edit_lines_range": _edit_lines,
|
|
}
|
|
|
|
def main():
|
|
raw = sys.stdin.read()
|
|
if not raw:
|
|
raise RuntimeError("空请求")
|
|
request = json.loads(raw)
|
|
root = pathlib.Path(request["root"])
|
|
action = request["action"]
|
|
payload = request.get("payload") or {}
|
|
handler = HANDLERS.get(action)
|
|
if not handler:
|
|
raise RuntimeError(f"未知操作: {action}")
|
|
result = handler(root, payload)
|
|
sys.stdout.write(json.dumps(result, ensure_ascii=False))
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except Exception as exc:
|
|
sys.stdout.write(json.dumps({"success": False, "error": str(exc)}, ensure_ascii=False))
|
|
"""
|
|
|
|
if TYPE_CHECKING:
|
|
from modules.user_container_manager import ContainerHandle
|
|
|
|
|
|
class ContainerFileProxy:
|
|
"""Execute file operations inside a Docker container."""
|
|
|
|
def __init__(self, container_session: "ContainerHandle"):
|
|
self.container_session = container_session
|
|
|
|
def is_available(self) -> bool:
|
|
return bool(
|
|
self.container_session
|
|
and self.container_session.mode == "docker"
|
|
and self.container_session.container_name
|
|
)
|
|
|
|
def update_session(self, session: Optional["ContainerHandle"]):
|
|
self.container_session = session
|
|
|
|
def run(self, action: str, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
if not self.is_available():
|
|
return {"success": False, "error": "容器未就绪,无法执行文件操作"}
|
|
|
|
session = self.container_session
|
|
docker_bin = session.sandbox_bin or shutil.which("docker")
|
|
if not docker_bin:
|
|
return {"success": False, "error": "未找到 Docker 运行时"}
|
|
|
|
request = {
|
|
"action": action,
|
|
"root": session.mount_path or "/workspace",
|
|
"payload": payload,
|
|
}
|
|
|
|
cmd = [docker_bin, "exec", "-i"]
|
|
if session.mount_path:
|
|
cmd.extend(["-w", session.mount_path])
|
|
cmd.append(session.container_name)
|
|
cmd.extend(["python3", "-c", CONTAINER_FILE_SCRIPT])
|
|
|
|
try:
|
|
completed = subprocess.run(
|
|
cmd,
|
|
input=json.dumps(request, ensure_ascii=False),
|
|
text=True,
|
|
capture_output=True,
|
|
check=False,
|
|
timeout=60,
|
|
)
|
|
except (OSError, subprocess.SubprocessError) as exc:
|
|
return {"success": False, "error": f"容器执行失败: {exc}"}
|
|
|
|
if completed.returncode != 0:
|
|
stderr = (completed.stderr or "").strip()
|
|
stdout = (completed.stdout or "").strip()
|
|
message = stderr or stdout or "未知错误"
|
|
return {"success": False, "error": f"容器返回错误: {message}"}
|
|
|
|
output = completed.stdout or ""
|
|
output = output.strip()
|
|
if not output:
|
|
return {"success": False, "error": "容器未返回任何结果"}
|
|
try:
|
|
return json.loads(output)
|
|
except json.JSONDecodeError:
|
|
return {
|
|
"success": False,
|
|
"error": f"容器响应无法解析: {output[:200]}",
|
|
}
|