- Refactor 6000+ line web_server.py into server/ module - Create separate modules: auth, chat, conversation, files, admin, etc. - Keep web_server.py as backward-compatible entry point - Add container running status field in user_container_manager - Improve admin dashboard API with credentials and debug support 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
315 lines
12 KiB
Python
315 lines
12 KiB
Python
"""文件与GUI文件管理相关路由。"""
|
|
from __future__ import annotations
|
|
import os
|
|
import zipfile
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import Dict, Any
|
|
|
|
from flask import Blueprint, jsonify, request, send_file
|
|
from werkzeug.utils import secure_filename
|
|
|
|
from modules.upload_security import UploadSecurityError
|
|
from .auth_helpers import api_login_required, resolve_admin_policy, get_current_user_record
|
|
from .security import rate_limited
|
|
from .context import with_terminal, get_gui_manager, get_upload_guard, build_upload_error_response
|
|
from .utils_common import debug_log
|
|
|
|
files_bp = Blueprint("files", __name__)
|
|
|
|
|
|
def sanitize_filename_preserve_unicode(filename: str) -> str:
|
|
"""在保留中文等字符的同时,移除危险字符和路径成分"""
|
|
import re
|
|
if not filename:
|
|
return ""
|
|
cleaned = filename.strip().replace("\x00", "")
|
|
if not cleaned:
|
|
return ""
|
|
cleaned = cleaned.replace("\\", "/").split("/")[-1]
|
|
cleaned = re.sub(r'[<>:"\\|?*\n\r\t]', "_", cleaned)
|
|
cleaned = cleaned.strip(". ")
|
|
if not cleaned:
|
|
return ""
|
|
return cleaned[:255]
|
|
|
|
@files_bp.route('/api/files')
|
|
@api_login_required
|
|
@with_terminal
|
|
def get_files(terminal, workspace, username):
|
|
policy = resolve_admin_policy(get_current_user_record())
|
|
if policy.get("ui_blocks", {}).get("collapse_workspace") or policy.get("ui_blocks", {}).get("block_file_manager"):
|
|
return jsonify({"success": False, "error": "文件浏览已被管理员禁用"}), 403
|
|
structure = terminal.context_manager.get_project_structure()
|
|
return jsonify(structure)
|
|
|
|
|
|
def _format_entry(entry) -> Dict[str, Any]:
|
|
return {
|
|
"name": entry.name,
|
|
"path": entry.path,
|
|
"type": entry.type,
|
|
"size": entry.size,
|
|
"modified_at": entry.modified_at,
|
|
"extension": entry.extension,
|
|
"is_editable": entry.is_editable,
|
|
}
|
|
|
|
|
|
@files_bp.route('/api/gui/files/entries', methods=['GET'])
|
|
@api_login_required
|
|
@with_terminal
|
|
def gui_list_entries(terminal, workspace, username):
|
|
policy = resolve_admin_policy(get_current_user_record())
|
|
if policy.get("ui_blocks", {}).get("block_file_manager"):
|
|
return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403
|
|
relative_path = request.args.get('path') or ""
|
|
manager = get_gui_manager(workspace)
|
|
try:
|
|
resolved_path, entries = manager.list_directory(relative_path)
|
|
breadcrumb = manager.breadcrumb(resolved_path)
|
|
return jsonify({
|
|
"success": True,
|
|
"data": {
|
|
"path": resolved_path,
|
|
"breadcrumb": breadcrumb,
|
|
"items": [_format_entry(entry) for entry in entries]
|
|
}
|
|
})
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
|
|
@files_bp.route('/api/gui/files/create', methods=['POST'])
|
|
@api_login_required
|
|
@with_terminal
|
|
@rate_limited("gui_file_create", 30, 60, scope="user")
|
|
def gui_create_entry(terminal, workspace, username):
|
|
payload = request.get_json() or {}
|
|
parent = payload.get('path') or ""
|
|
name = payload.get('name') or ""
|
|
entry_type = payload.get('type') or "file"
|
|
policy = resolve_admin_policy(get_current_user_record())
|
|
if policy.get("ui_blocks", {}).get("block_file_manager"):
|
|
return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403
|
|
manager = get_gui_manager(workspace)
|
|
try:
|
|
new_path = manager.create_entry(parent, name, entry_type)
|
|
return jsonify({"success": True, "path": new_path})
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
|
|
@files_bp.route('/api/gui/files/delete', methods=['POST'])
|
|
@api_login_required
|
|
@with_terminal
|
|
@rate_limited("gui_file_delete", 30, 60, scope="user")
|
|
def gui_delete_entries(terminal, workspace, username):
|
|
payload = request.get_json() or {}
|
|
paths = payload.get('paths') or []
|
|
policy = resolve_admin_policy(get_current_user_record())
|
|
if policy.get("ui_blocks", {}).get("block_file_manager"):
|
|
return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403
|
|
manager = get_gui_manager(workspace)
|
|
try:
|
|
result = manager.delete_entries(paths)
|
|
return jsonify({"success": True, "result": result})
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
|
|
@files_bp.route('/api/gui/files/rename', methods=['POST'])
|
|
@api_login_required
|
|
@with_terminal
|
|
@rate_limited("gui_file_rename", 30, 60, scope="user")
|
|
def gui_rename_entry(terminal, workspace, username):
|
|
payload = request.get_json() or {}
|
|
path = payload.get('path')
|
|
new_name = payload.get('new_name')
|
|
if not path or not new_name:
|
|
return jsonify({"success": False, "error": "缺少 path 或 new_name"}), 400
|
|
policy = resolve_admin_policy(get_current_user_record())
|
|
if policy.get("ui_blocks", {}).get("block_file_manager"):
|
|
return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403
|
|
manager = get_gui_manager(workspace)
|
|
try:
|
|
new_path = manager.rename_entry(path, new_name)
|
|
return jsonify({"success": True, "path": new_path})
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
|
|
@files_bp.route('/api/gui/files/copy', methods=['POST'])
|
|
@api_login_required
|
|
@with_terminal
|
|
@rate_limited("gui_file_copy", 40, 120, scope="user")
|
|
def gui_copy_entries(terminal, workspace, username):
|
|
payload = request.get_json() or {}
|
|
paths = payload.get('paths') or []
|
|
target_dir = payload.get('target_dir') or ""
|
|
policy = resolve_admin_policy(get_current_user_record())
|
|
if policy.get("ui_blocks", {}).get("block_file_manager"):
|
|
return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403
|
|
manager = get_gui_manager(workspace)
|
|
try:
|
|
result = manager.copy_entries(paths, target_dir)
|
|
return jsonify({"success": True, "result": result})
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
|
|
@files_bp.route('/api/gui/files/move', methods=['POST'])
|
|
@api_login_required
|
|
@with_terminal
|
|
@rate_limited("gui_file_move", 40, 120, scope="user")
|
|
def gui_move_entries(terminal, workspace, username):
|
|
payload = request.get_json() or {}
|
|
paths = payload.get('paths') or []
|
|
target_dir = payload.get('target_dir') or ""
|
|
manager = get_gui_manager(workspace)
|
|
try:
|
|
result = manager.move_entries(paths, target_dir)
|
|
return jsonify({"success": True, "result": result})
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
|
|
@files_bp.route('/api/gui/files/upload', methods=['POST'])
|
|
@api_login_required
|
|
@with_terminal
|
|
@rate_limited("gui_file_upload", 10, 300, scope="user")
|
|
def gui_upload_entry(terminal, workspace, username):
|
|
policy = resolve_admin_policy(get_current_user_record())
|
|
if policy.get("ui_blocks", {}).get("block_upload"):
|
|
return jsonify({"success": False, "error": "文件上传已被管理员禁用"}), 403
|
|
if 'file' not in request.files:
|
|
return jsonify({"success": False, "error": "未找到文件"}), 400
|
|
file_obj = request.files['file']
|
|
if not file_obj or not file_obj.filename:
|
|
return jsonify({"success": False, "error": "文件名为空"}), 400
|
|
current_dir = request.form.get('path') or ""
|
|
raw_name = request.form.get('filename') or file_obj.filename
|
|
filename = sanitize_filename_preserve_unicode(raw_name) or secure_filename(raw_name)
|
|
if not filename:
|
|
return jsonify({"success": False, "error": "非法文件名"}), 400
|
|
manager = get_gui_manager(workspace)
|
|
try:
|
|
target_path = manager.prepare_upload(current_dir, filename)
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
try:
|
|
relative_path = manager._to_relative(target_path)
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
guard = get_upload_guard(workspace)
|
|
try:
|
|
result = guard.process_upload(
|
|
file_obj,
|
|
target_path,
|
|
username=username,
|
|
source="web_gui",
|
|
original_name=raw_name,
|
|
relative_path=relative_path,
|
|
)
|
|
except UploadSecurityError as exc:
|
|
return build_upload_error_response(exc)
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": f"保存文件失败: {exc}"}), 500
|
|
|
|
metadata = result.get("metadata", {})
|
|
return jsonify({
|
|
"success": True,
|
|
"path": relative_path,
|
|
"filename": target_path.name,
|
|
"scan": metadata.get("scan"),
|
|
"sha256": metadata.get("sha256"),
|
|
"size": metadata.get("size"),
|
|
})
|
|
|
|
|
|
@files_bp.route('/api/gui/files/download', methods=['GET'])
|
|
@api_login_required
|
|
@with_terminal
|
|
def gui_download_entry(terminal, workspace, username):
|
|
path = request.args.get('path')
|
|
if not path:
|
|
return jsonify({"success": False, "error": "缺少 path"}), 400
|
|
manager = get_gui_manager(workspace)
|
|
try:
|
|
target = manager.prepare_download(path)
|
|
if target.is_dir():
|
|
memory_file = BytesIO()
|
|
with zipfile.ZipFile(memory_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
|
|
for root, dirs, files in os.walk(target):
|
|
for file in files:
|
|
full_path = Path(root) / file
|
|
arcname = manager._to_relative(full_path)
|
|
zf.write(full_path, arcname=arcname)
|
|
memory_file.seek(0)
|
|
download_name = f"{target.name}.zip"
|
|
return send_file(memory_file, as_attachment=True, download_name=download_name, mimetype='application/zip')
|
|
return send_file(target, as_attachment=True, download_name=target.name)
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
|
|
@files_bp.route('/api/gui/files/download/batch', methods=['POST'])
|
|
@api_login_required
|
|
@with_terminal
|
|
def gui_download_batch(terminal, workspace, username):
|
|
payload = request.get_json() or {}
|
|
paths = payload.get('paths') or []
|
|
if not paths:
|
|
return jsonify({"success": False, "error": "缺少待下载的路径"}), 400
|
|
manager = get_gui_manager(workspace)
|
|
try:
|
|
memory_file = BytesIO()
|
|
with zipfile.ZipFile(memory_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:
|
|
for rel in paths:
|
|
target = manager.prepare_download(rel)
|
|
arc_base = rel.strip('/') or target.name
|
|
if target.is_dir():
|
|
for root, _, files in os.walk(target):
|
|
for file in files:
|
|
full_path = Path(root) / file
|
|
relative_sub = full_path.relative_to(target)
|
|
arcname = Path(arc_base) / relative_sub
|
|
zf.write(full_path, arcname=str(arcname))
|
|
else:
|
|
zf.write(target, arcname=arc_base)
|
|
memory_file.seek(0)
|
|
download_name = f"selected_{len(paths)}.zip"
|
|
return send_file(memory_file, as_attachment=True, download_name=download_name, mimetype='application/zip')
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
|
|
@files_bp.route('/api/gui/files/text', methods=['GET', 'POST'])
|
|
@api_login_required
|
|
@with_terminal
|
|
def gui_text_entry(terminal, workspace, username):
|
|
manager = get_gui_manager(workspace)
|
|
if request.method == 'GET':
|
|
path = request.args.get('path')
|
|
if not path:
|
|
return jsonify({"success": False, "error": "缺少 path"}), 400
|
|
try:
|
|
content, modified = manager.read_text(path)
|
|
return jsonify({"success": True, "path": path, "content": content, "modified_at": modified})
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
payload = request.get_json() or {}
|
|
path = payload.get('path')
|
|
content = payload.get('content')
|
|
if path is None or content is None:
|
|
return jsonify({"success": False, "error": "缺少 path 或 content"}), 400
|
|
try:
|
|
result = manager.write_text(path, content)
|
|
return jsonify({"success": True, "data": result})
|
|
except Exception as exc:
|
|
return jsonify({"success": False, "error": str(exc)}), 400
|
|
|
|
|
|
__all__ = ["files_bp"]
|