"""文件与GUI文件管理相关路由。""" from __future__ import annotations import os import zipfile from io import BytesIO from pathlib import Path from typing import Dict, Any from flask import Blueprint, jsonify, request, send_file from werkzeug.utils import secure_filename from modules.upload_security import UploadSecurityError from .auth_helpers import api_login_required, resolve_admin_policy, get_current_user_record from .security import rate_limited from .context import with_terminal, get_gui_manager, get_upload_guard, build_upload_error_response from .utils_common import debug_log files_bp = Blueprint("files", __name__) def sanitize_filename_preserve_unicode(filename: str) -> str: """在保留中文等字符的同时,移除危险字符和路径成分""" import re if not filename: return "" cleaned = filename.strip().replace("\x00", "") if not cleaned: return "" cleaned = cleaned.replace("\\", "/").split("/")[-1] cleaned = re.sub(r'[<>:"\\|?*\n\r\t]', "_", cleaned) cleaned = cleaned.strip(". ") if not cleaned: return "" return cleaned[:255] @files_bp.route('/api/files') @api_login_required @with_terminal def get_files(terminal, workspace, username): policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("collapse_workspace") or policy.get("ui_blocks", {}).get("block_file_manager"): return jsonify({"success": False, "error": "文件浏览已被管理员禁用"}), 403 structure = terminal.context_manager.get_project_structure() return jsonify(structure) def _format_entry(entry) -> Dict[str, Any]: return { "name": entry.name, "path": entry.path, "type": entry.type, "size": entry.size, "modified_at": entry.modified_at, "extension": entry.extension, "is_editable": entry.is_editable, } @files_bp.route('/api/gui/files/entries', methods=['GET']) @api_login_required @with_terminal def gui_list_entries(terminal, workspace, username): policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_file_manager"): return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403 relative_path = request.args.get('path') or "" manager = get_gui_manager(workspace) try: resolved_path, entries = manager.list_directory(relative_path) breadcrumb = manager.breadcrumb(resolved_path) return jsonify({ "success": True, "data": { "path": resolved_path, "breadcrumb": breadcrumb, "items": [_format_entry(entry) for entry in entries] } }) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @files_bp.route('/api/gui/files/create', methods=['POST']) @api_login_required @with_terminal @rate_limited("gui_file_create", 30, 60, scope="user") def gui_create_entry(terminal, workspace, username): payload = request.get_json() or {} parent = payload.get('path') or "" name = payload.get('name') or "" entry_type = payload.get('type') or "file" policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_file_manager"): return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403 manager = get_gui_manager(workspace) try: new_path = manager.create_entry(parent, name, entry_type) return jsonify({"success": True, "path": new_path}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @files_bp.route('/api/gui/files/delete', methods=['POST']) @api_login_required @with_terminal @rate_limited("gui_file_delete", 30, 60, scope="user") def gui_delete_entries(terminal, workspace, username): payload = request.get_json() or {} paths = payload.get('paths') or [] policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_file_manager"): return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403 manager = get_gui_manager(workspace) try: result = manager.delete_entries(paths) return jsonify({"success": True, "result": result}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @files_bp.route('/api/gui/files/rename', methods=['POST']) @api_login_required @with_terminal @rate_limited("gui_file_rename", 30, 60, scope="user") def gui_rename_entry(terminal, workspace, username): payload = request.get_json() or {} path = payload.get('path') new_name = payload.get('new_name') if not path or not new_name: return jsonify({"success": False, "error": "缺少 path 或 new_name"}), 400 policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_file_manager"): return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403 manager = get_gui_manager(workspace) try: new_path = manager.rename_entry(path, new_name) return jsonify({"success": True, "path": new_path}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @files_bp.route('/api/gui/files/copy', methods=['POST']) @api_login_required @with_terminal @rate_limited("gui_file_copy", 40, 120, scope="user") def gui_copy_entries(terminal, workspace, username): payload = request.get_json() or {} paths = payload.get('paths') or [] target_dir = payload.get('target_dir') or "" policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_file_manager"): return jsonify({"success": False, "error": "文件管理已被管理员禁用"}), 403 manager = get_gui_manager(workspace) try: result = manager.copy_entries(paths, target_dir) return jsonify({"success": True, "result": result}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @files_bp.route('/api/gui/files/move', methods=['POST']) @api_login_required @with_terminal @rate_limited("gui_file_move", 40, 120, scope="user") def gui_move_entries(terminal, workspace, username): payload = request.get_json() or {} paths = payload.get('paths') or [] target_dir = payload.get('target_dir') or "" manager = get_gui_manager(workspace) try: result = manager.move_entries(paths, target_dir) return jsonify({"success": True, "result": result}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @files_bp.route('/api/gui/files/upload', methods=['POST']) @api_login_required @with_terminal @rate_limited("gui_file_upload", 10, 300, scope="user") def gui_upload_entry(terminal, workspace, username): policy = resolve_admin_policy(get_current_user_record()) if policy.get("ui_blocks", {}).get("block_upload"): return jsonify({"success": False, "error": "文件上传已被管理员禁用"}), 403 if 'file' not in request.files: return jsonify({"success": False, "error": "未找到文件"}), 400 file_obj = request.files['file'] if not file_obj or not file_obj.filename: return jsonify({"success": False, "error": "文件名为空"}), 400 current_dir = request.form.get('path') or "" raw_name = request.form.get('filename') or file_obj.filename filename = sanitize_filename_preserve_unicode(raw_name) or secure_filename(raw_name) if not filename: return jsonify({"success": False, "error": "非法文件名"}), 400 manager = get_gui_manager(workspace) try: target_path = manager.prepare_upload(current_dir, filename) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 try: relative_path = manager._to_relative(target_path) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 guard = get_upload_guard(workspace) try: result = guard.process_upload( file_obj, target_path, username=username, source="web_gui", original_name=raw_name, relative_path=relative_path, ) except UploadSecurityError as exc: return build_upload_error_response(exc) except Exception as exc: return jsonify({"success": False, "error": f"保存文件失败: {exc}"}), 500 metadata = result.get("metadata", {}) return jsonify({ "success": True, "path": relative_path, "filename": target_path.name, "scan": metadata.get("scan"), "sha256": metadata.get("sha256"), "size": metadata.get("size"), }) @files_bp.route('/api/gui/files/download', methods=['GET']) @api_login_required @with_terminal def gui_download_entry(terminal, workspace, username): path = request.args.get('path') if not path: return jsonify({"success": False, "error": "缺少 path"}), 400 manager = get_gui_manager(workspace) try: target = manager.prepare_download(path) if target.is_dir(): memory_file = BytesIO() with zipfile.ZipFile(memory_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zf: for root, dirs, files in os.walk(target): for file in files: full_path = Path(root) / file arcname = manager._to_relative(full_path) zf.write(full_path, arcname=arcname) memory_file.seek(0) download_name = f"{target.name}.zip" return send_file(memory_file, as_attachment=True, download_name=download_name, mimetype='application/zip') return send_file(target, as_attachment=True, download_name=target.name) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @files_bp.route('/api/gui/files/download/batch', methods=['POST']) @api_login_required @with_terminal def gui_download_batch(terminal, workspace, username): payload = request.get_json() or {} paths = payload.get('paths') or [] if not paths: return jsonify({"success": False, "error": "缺少待下载的路径"}), 400 manager = get_gui_manager(workspace) try: memory_file = BytesIO() with zipfile.ZipFile(memory_file, mode='w', compression=zipfile.ZIP_DEFLATED) as zf: for rel in paths: target = manager.prepare_download(rel) arc_base = rel.strip('/') or target.name if target.is_dir(): for root, _, files in os.walk(target): for file in files: full_path = Path(root) / file relative_sub = full_path.relative_to(target) arcname = Path(arc_base) / relative_sub zf.write(full_path, arcname=str(arcname)) else: zf.write(target, arcname=arc_base) memory_file.seek(0) download_name = f"selected_{len(paths)}.zip" return send_file(memory_file, as_attachment=True, download_name=download_name, mimetype='application/zip') except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 @files_bp.route('/api/gui/files/text', methods=['GET', 'POST']) @api_login_required @with_terminal def gui_text_entry(terminal, workspace, username): manager = get_gui_manager(workspace) if request.method == 'GET': path = request.args.get('path') if not path: return jsonify({"success": False, "error": "缺少 path"}), 400 try: content, modified = manager.read_text(path) return jsonify({"success": True, "path": path, "content": content, "modified_at": modified}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 payload = request.get_json() or {} path = payload.get('path') content = payload.get('content') if path is None or content is None: return jsonify({"success": False, "error": "缺少 path 或 content"}), 400 try: result = manager.write_text(path, content) return jsonify({"success": True, "data": result}) except Exception as exc: return jsonify({"success": False, "error": str(exc)}), 400 __all__ = ["files_bp"]