agent-Specialization/server/files.py
JOJO d6fb59e1d8 refactor: split web_server into modular architecture
- Refactor 6000+ line web_server.py into server/ module
- Create separate modules: auth, chat, conversation, files, admin, etc.
- Keep web_server.py as backward-compatible entry point
- Add container running status field in user_container_manager
- Improve admin dashboard API with credentials and debug support

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-22 09:21:53 +08:00

315 lines
12 KiB
Python

"""文件与GUI文件管理相关路由。"""
from __future__ import annotations
import os
import zipfile
from io import BytesIO
from pathlib import Path
from typing import Dict, Any
from flask import Blueprint, jsonify, request, send_file
from werkzeug.utils import secure_filename
from modules.upload_security import UploadSecurityError
from .auth_helpers import api_login_required, resolve_admin_policy, get_current_user_record
from .security import rate_limited
from .context import with_terminal, get_gui_manager, get_upload_guard, build_upload_error_response
from .utils_common import debug_log
# Blueprint on which every route in this module is registered.
files_bp = Blueprint("files", __name__)
def sanitize_filename_preserve_unicode(filename: str) -> str:
    """Return a safe base file name while keeping non-ASCII characters.

    Path components are discarded (both ``/`` and ``\\`` count as
    separators), NUL bytes are removed, characters that are reserved on
    common file systems are replaced with ``_``, and leading/trailing dots
    and spaces are stripped.

    The result is limited to 255 characters *and* 255 UTF-8 bytes, because
    many file systems limit a single name to 255 bytes and multi-byte
    characters (e.g. CJK) would otherwise overflow that limit.

    Args:
        filename: The user-supplied name; may be empty or ``None``.

    Returns:
        The sanitized name, or ``""`` when nothing usable remains.
    """
    import re

    if not filename:
        return ""
    cleaned = filename.strip().replace("\x00", "")
    if not cleaned:
        return ""
    # Normalize Windows separators, then keep only the final path component.
    cleaned = cleaned.replace("\\", "/").split("/")[-1]
    cleaned = re.sub(r'[<>:"\\|?*\n\r\t]', "_", cleaned)
    cleaned = cleaned.strip(". ")
    if not cleaned:
        return ""

    # Keep whole characters only, so a multi-byte character is never split.
    max_len = 255
    remaining_bytes = max_len
    kept = []
    for char in cleaned[:max_len]:
        remaining_bytes -= len(char.encode("utf-8", "surrogatepass"))
        if remaining_bytes < 0:
            break
        kept.append(char)

    # Truncation may expose a dot or space at the end; strip it again.
    # The first character is never a dot/space, so the result stays non-empty.
    return "".join(kept).rstrip(". ")
@files_bp.route('/api/files')
@api_login_required
@with_terminal
def get_files(terminal, workspace, username):
    """Return the terminal's project structure as JSON.

    Responds with HTTP 403 when the admin policy sets either the
    ``collapse_workspace`` or the ``block_file_manager`` UI block.
    """
    ui_blocks = resolve_admin_policy(get_current_user_record()).get("ui_blocks", {})
    browsing_disabled = ui_blocks.get("collapse_workspace") or ui_blocks.get("block_file_manager")
    if browsing_disabled:
        return jsonify({"success": False, "error": "文件浏览已被管理员禁用"}), 403
    return jsonify(terminal.context_manager.get_project_structure())
def _format_entry(entry) -> Dict[str, Any]:
    """Convert a directory entry object into a JSON-serializable dict.

    Each listed attribute of ``entry`` is copied under a key of the same
    name, in the order given below.
    """
    exported_fields = (
        "name",
        "path",
        "type",
        "size",
        "modified_at",
        "extension",
        "is_editable",
    )
    return {field: getattr(entry, field) for field in exported_fields}
@files_bp.route('/api/gui/files/entries', methods=['GET'])
@api_login_required
@with_terminal
def gui_list_entries(terminal, workspace, username):
    """List the entries of a workspace directory for the GUI file manager.

    The directory comes from the ``path`` query argument (empty string when
    absent). Responds with HTTP 403 when the admin policy sets
    ``block_file_manager`` and with HTTP 400 when the manager raises.
    """
    ui_blocks = resolve_admin_policy(get_current_user_record()).get("ui_blocks", {})
    if ui_blocks.get("block_file_manager"):
        return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403

    requested_dir = request.args.get('path') or ""
    gui_manager = get_gui_manager(workspace)
    try:
        current_dir, children = gui_manager.list_directory(requested_dir)
        listing = {
            "path": current_dir,
            "breadcrumb": gui_manager.breadcrumb(current_dir),
            "items": [_format_entry(child) for child in children],
        }
        return jsonify({"success": True, "data": listing})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@files_bp.route('/api/gui/files/create', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("gui_file_create", 30, 60, scope="user")
def gui_create_entry(terminal, workspace, username):
    """Create a new entry through the GUI file manager.

    Reads ``path`` (parent directory), ``name`` and ``type`` (defaults to
    ``"file"``) from the JSON body. Responds with HTTP 403 when the admin
    policy sets ``block_file_manager`` and with HTTP 400 when the manager
    raises.
    """
    body = request.get_json() or {}
    parent_dir = body.get('path') or ""
    entry_name = body.get('name') or ""
    entry_kind = body.get('type') or "file"

    ui_blocks = resolve_admin_policy(get_current_user_record()).get("ui_blocks", {})
    if ui_blocks.get("block_file_manager"):
        return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403

    gui_manager = get_gui_manager(workspace)
    try:
        created_path = gui_manager.create_entry(parent_dir, entry_name, entry_kind)
        return jsonify({"success": True, "path": created_path})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@files_bp.route('/api/gui/files/delete', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("gui_file_delete", 30, 60, scope="user")
def gui_delete_entries(terminal, workspace, username):
    """Delete the entries listed under ``paths`` in the JSON body.

    Responds with HTTP 403 when the admin policy sets
    ``block_file_manager`` and with HTTP 400 when the manager raises.
    """
    body = request.get_json() or {}
    doomed_paths = body.get('paths') or []

    ui_blocks = resolve_admin_policy(get_current_user_record()).get("ui_blocks", {})
    if ui_blocks.get("block_file_manager"):
        return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403

    gui_manager = get_gui_manager(workspace)
    try:
        outcome = gui_manager.delete_entries(doomed_paths)
        return jsonify({"success": True, "result": outcome})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@files_bp.route('/api/gui/files/rename', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("gui_file_rename", 30, 60, scope="user")
def gui_rename_entry(terminal, workspace, username):
    """Rename one entry; the JSON body must carry ``path`` and ``new_name``.

    Responds with HTTP 400 when either field is missing or the manager
    raises, and with HTTP 403 when the admin policy sets
    ``block_file_manager``.
    """
    body = request.get_json() or {}
    source_path = body.get('path')
    replacement_name = body.get('new_name')
    if not (source_path and replacement_name):
        return jsonify({"success": False, "error": "缺少 path 或 new_name"}), 400

    ui_blocks = resolve_admin_policy(get_current_user_record()).get("ui_blocks", {})
    if ui_blocks.get("block_file_manager"):
        return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403

    gui_manager = get_gui_manager(workspace)
    try:
        renamed_path = gui_manager.rename_entry(source_path, replacement_name)
        return jsonify({"success": True, "path": renamed_path})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@files_bp.route('/api/gui/files/copy', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("gui_file_copy", 40, 120, scope="user")
def gui_copy_entries(terminal, workspace, username):
    """Copy the entries listed under ``paths`` into ``target_dir``.

    Both values come from the JSON body. Responds with HTTP 403 when the
    admin policy sets ``block_file_manager`` and with HTTP 400 when the
    manager raises.
    """
    body = request.get_json() or {}
    source_paths = body.get('paths') or []
    destination_dir = body.get('target_dir') or ""

    ui_blocks = resolve_admin_policy(get_current_user_record()).get("ui_blocks", {})
    if ui_blocks.get("block_file_manager"):
        return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403

    gui_manager = get_gui_manager(workspace)
    try:
        outcome = gui_manager.copy_entries(source_paths, destination_dir)
        return jsonify({"success": True, "result": outcome})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@files_bp.route('/api/gui/files/move', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("gui_file_move", 40, 120, scope="user")
def gui_move_entries(terminal, workspace, username):
    """Move the entries listed under ``paths`` into ``target_dir``.

    Both values come from the JSON body. Responds with HTTP 403 when the
    admin policy sets ``block_file_manager`` and with HTTP 400 when the
    manager raises.
    """
    payload = request.get_json() or {}
    paths = payload.get('paths') or []
    target_dir = payload.get('target_dir') or ""
    # Enforce the same admin policy as the other mutating file-manager
    # routes (create/delete/rename/copy); without it a blocked user could
    # still relocate files through this endpoint.
    policy = resolve_admin_policy(get_current_user_record())
    if policy.get("ui_blocks", {}).get("block_file_manager"):
        return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403
    manager = get_gui_manager(workspace)
    try:
        result = manager.move_entries(paths, target_dir)
        return jsonify({"success": True, "result": result})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@files_bp.route('/api/gui/files/upload', methods=['POST'])
@api_login_required
@with_terminal
@rate_limited("gui_file_upload", 10, 300, scope="user")
def gui_upload_entry(terminal, workspace, username):
    """Accept a multipart upload and store it through the upload guard.

    The file is read from the ``file`` part; the form fields ``path``
    (target directory) and ``filename`` (overrides the uploaded name) are
    optional. Responds with HTTP 403 when the admin policy sets
    ``block_upload``, HTTP 400 for a missing/invalid file or a rejected
    target, and HTTP 500 when saving fails for a reason other than
    ``UploadSecurityError``.
    """
    ui_blocks = resolve_admin_policy(get_current_user_record()).get("ui_blocks", {})
    if ui_blocks.get("block_upload"):
        return jsonify({"success": False, "error": "文件上传已被管理员禁用"}), 403

    if 'file' not in request.files:
        return jsonify({"success": False, "error": "未找到文件"}), 400
    uploaded = request.files['file']
    if not (uploaded and uploaded.filename):
        return jsonify({"success": False, "error": "文件名为空"}), 400

    upload_dir = request.form.get('path') or ""
    original_name = request.form.get('filename') or uploaded.filename
    # Prefer the Unicode-preserving sanitizer; fall back to werkzeug's
    # secure_filename only when it yields nothing.
    safe_name = sanitize_filename_preserve_unicode(original_name) or secure_filename(original_name)
    if not safe_name:
        return jsonify({"success": False, "error": "非法文件名"}), 400

    gui_manager = get_gui_manager(workspace)
    try:
        destination = gui_manager.prepare_upload(upload_dir, safe_name)
        destination_rel = gui_manager._to_relative(destination)
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400

    upload_guard = get_upload_guard(workspace)
    try:
        outcome = upload_guard.process_upload(
            uploaded,
            destination,
            username=username,
            source="web_gui",
            original_name=original_name,
            relative_path=destination_rel,
        )
    except UploadSecurityError as exc:
        return build_upload_error_response(exc)
    except Exception as exc:
        return jsonify({"success": False, "error": f"保存文件失败: {exc}"}), 500

    meta = outcome.get("metadata", {})
    return jsonify({
        "success": True,
        "path": destination_rel,
        "filename": destination.name,
        "scan": meta.get("scan"),
        "sha256": meta.get("sha256"),
        "size": meta.get("size"),
    })
@files_bp.route('/api/gui/files/download', methods=['GET'])
@api_login_required
@with_terminal
def gui_download_entry(terminal, workspace, username):
    """Send one workspace entry as an attachment.

    A regular file is sent as-is. A directory is zipped in memory (every
    file found by ``os.walk``) and sent as ``<dirname>.zip``. Responds with
    HTTP 400 when ``path`` is missing or any step raises.

    NOTE(review): unlike the listing and mutating routes in this module, no
    admin policy check is performed here — confirm this is intentional.
    """
    requested = request.args.get('path')
    if not requested:
        return jsonify({"success": False, "error": "缺少 path"}), 400

    gui_manager = get_gui_manager(workspace)
    try:
        target = gui_manager.prepare_download(requested)
        if not target.is_dir():
            return send_file(target, as_attachment=True, download_name=target.name)

        archive_buffer = BytesIO()
        with zipfile.ZipFile(archive_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
            for dirpath, _subdirs, filenames in os.walk(target):
                for filename in filenames:
                    member = Path(dirpath) / filename
                    # Archive names are whatever the manager reports as the
                    # member's relative path.
                    member_name = gui_manager._to_relative(member)
                    archive.write(member, arcname=member_name)
        # Rewind so send_file streams the archive from its first byte.
        archive_buffer.seek(0)
        zip_name = f"{target.name}.zip"
        return send_file(archive_buffer, as_attachment=True, download_name=zip_name, mimetype='application/zip')
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@files_bp.route('/api/gui/files/download/batch', methods=['POST'])
@api_login_required
@with_terminal
def gui_download_batch(terminal, workspace, username):
    """Bundle several workspace entries into one in-memory zip download.

    ``paths`` comes from the JSON body. Each entry is stored under its
    requested path with surrounding slashes stripped (or the target's own
    name when that is empty); directories are walked and their files stored
    beneath that prefix. The archive is sent as ``selected_<count>.zip``.
    Responds with HTTP 400 when ``paths`` is empty or any step raises.
    """
    body = request.get_json() or {}
    requested_paths = body.get('paths') or []
    if not requested_paths:
        return jsonify({"success": False, "error": "缺少待下载的路径"}), 400

    gui_manager = get_gui_manager(workspace)
    try:
        archive_buffer = BytesIO()
        with zipfile.ZipFile(archive_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
            for requested in requested_paths:
                target = gui_manager.prepare_download(requested)
                archive_root = requested.strip('/') or target.name
                if not target.is_dir():
                    archive.write(target, arcname=archive_root)
                    continue
                for dirpath, _subdirs, filenames in os.walk(target):
                    for filename in filenames:
                        member = Path(dirpath) / filename
                        inner_path = member.relative_to(target)
                        archive.write(member, arcname=str(Path(archive_root) / inner_path))
        # Rewind so send_file streams the archive from its first byte.
        archive_buffer.seek(0)
        zip_name = f"selected_{len(requested_paths)}.zip"
        return send_file(archive_buffer, as_attachment=True, download_name=zip_name, mimetype='application/zip')
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
@files_bp.route('/api/gui/files/text', methods=['GET', 'POST'])
@api_login_required
@with_terminal
def gui_text_entry(terminal, workspace, username):
    """Read (GET) or write (POST) the text content of a workspace file.

    GET takes ``path`` from the query string and returns the content plus
    its modification marker. POST takes ``path`` and ``content`` from the
    JSON body and returns the manager's write result. Responds with HTTP
    400 when a required field is missing or the manager raises.

    NOTE(review): unlike the listing and mutating routes in this module, no
    admin policy check is performed here — confirm this is intentional.
    """
    gui_manager = get_gui_manager(workspace)

    if request.method == 'GET':
        requested = request.args.get('path')
        if not requested:
            return jsonify({"success": False, "error": "缺少 path"}), 400
        try:
            text, modified_at = gui_manager.read_text(requested)
            return jsonify({"success": True, "path": requested, "content": text, "modified_at": modified_at})
        except Exception as exc:
            return jsonify({"success": False, "error": str(exc)}), 400

    body = request.get_json() or {}
    destination = body.get('path')
    new_text = body.get('content')
    # Only None counts as missing, so an empty string is accepted for both.
    if destination is None or new_text is None:
        return jsonify({"success": False, "error": "缺少 path 或 content"}), 400
    try:
        outcome = gui_manager.write_text(destination, new_text)
        return jsonify({"success": True, "data": outcome})
    except Exception as exc:
        return jsonify({"success": False, "error": str(exc)}), 400
# Only the blueprint is declared as this module's public API.
__all__ = ["files_bp"]