279 lines
9.5 KiB
Python
279 lines
9.5 KiB
Python
"""上传隔离、扫描与审计工具。"""
|
||
|
||
from __future__ import annotations

import hashlib
import json
import mimetypes
import os
import shutil
import subprocess
import time
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, TYPE_CHECKING

from config import (
    MAX_UPLOAD_SIZE,
    UPLOAD_ALLOWED_EXTENSIONS,
    UPLOAD_CLAMAV_ARGS,
    UPLOAD_CLAMAV_BIN,
    UPLOAD_CLAMAV_ENABLED,
    UPLOAD_CLAMAV_TIMEOUT_SECONDS,
    UPLOAD_SCAN_LOG_SUBDIR,
)
from utils.logger import setup_logger

if TYPE_CHECKING:
    from modules.user_manager import UserWorkspace
    from werkzeug.datastructures import FileStorage
|
||
|
||
|
||
class UploadSecurityError(Exception):
    """Business exception raised when an upload is rejected or its scan fails.

    Attributes:
        code: Short machine-readable reason identifier; defaults to
            ``"upload_error"`` when the raiser does not supply one.
    """

    def __init__(self, message: str, code: str = "upload_error"):
        """Store the reason code and pass the human-readable message to ``Exception``."""
        self.code = code
        super().__init__(message)
|
||
|
||
|
||
@dataclass
class StageRecord:
    """Snapshot of an uploaded file while it sits in the staging (quarantine) area."""

    # Location of the staged copy on disk.
    path: Path
    # File name recorded for the upload.
    filename: str
    # Size of the staged file in bytes.
    size: int
    # Hex-encoded SHA-256 digest of the staged file.
    sha256: str
    # MIME type associated with the upload, or None when unknown.
    mime: Optional[str]
|
||
|
||
|
||
@dataclass
class ScanReport:
    """Outcome of a virus scan for one staged file."""

    # Result label; "passed" and "skipped" are the two values treated as clean.
    status: str
    # Name of the scanning engine that produced the report.
    engine: str
    # Detected signature name, when one was reported.
    signature: Optional[str] = None
    # Human-readable detail about the outcome.
    message: Optional[str] = None
    # Scan duration in milliseconds, when measured.
    duration_ms: Optional[int] = None

    @property
    def is_clean(self) -> bool:
        """Return True when the status is ``"passed"`` or ``"skipped"``."""
        current = self.status
        return current == "passed" or current == "skipped"
|
||
|
||
|
||
class UploadQuarantineManager:
    """Stage uploads in a quarantine directory, scan them, and write audit logs.

    An upload is first saved under the workspace's quarantine directory,
    checked against the size and extension policy, scanned with ClamAV when
    scanning is enabled, and only then moved to its target path. Accepted
    uploads and ``UploadSecurityError`` rejections are written to the logger
    obtained from ``setup_logger``.
    """

    def __init__(self, workspace: "UserWorkspace", *, logger_name: Optional[str] = None):
        """Bind the manager to a workspace and load the scanner configuration.

        Args:
            workspace: Object providing ``quarantine_dir`` and ``username``.
            logger_name: Optional logger name; defaults to
                ``upload_guard.<username>``.
        """
        self.workspace = workspace
        self.quarantine_dir = Path(workspace.quarantine_dir).resolve()
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)
        log_rel = Path(UPLOAD_SCAN_LOG_SUBDIR) / f"{workspace.username}.log"
        self.logger = setup_logger(
            logger_name or f"upload_guard.{workspace.username}",
            str(log_rel),
        )
        self.allowed_extensions = tuple(UPLOAD_ALLOWED_EXTENSIONS)
        self.clamav_enabled = bool(UPLOAD_CLAMAV_ENABLED)
        self.clamav_bin = self._resolve_clamav_bin(UPLOAD_CLAMAV_BIN)
        self.clamav_args = list(UPLOAD_CLAMAV_ARGS)
        # Never allow a timeout below one second.
        self.clamav_timeout = int(max(1, UPLOAD_CLAMAV_TIMEOUT_SECONDS))

    def process_upload(
        self,
        file_obj: "FileStorage",
        target_path: Path,
        *,
        username: str,
        source: str,
        original_name: str,
        relative_path: str,
    ) -> Dict[str, Any]:
        """Stage the file, enforce policy, scan it, and move it to ``target_path``.

        Args:
            file_obj: Uploaded file object; its ``save`` method is used to
                write the staged copy.
            target_path: Destination of the file once it is accepted.
            username: Recorded in the audit metadata.
            source: Recorded in the audit metadata.
            original_name: Client-supplied file name; used for the staged
                suffix and the MIME guess.
            relative_path: Recorded in the audit metadata as
                ``target_relative``.

        Returns:
            A dict with ``final_path`` (the promoted ``Path``) and
            ``metadata`` (the audit metadata dict).

        Raises:
            UploadSecurityError: If the size or extension policy rejects the
                file, the scanner cannot be run, or the scan is not clean.
        """
        stage = self._stage_file(file_obj, original_name)
        metadata: Dict[str, Any] = {
            "upload_id": uuid.uuid4().hex,
            "username": username,
            "source": source,
            "original_name": original_name,
            "target_path": str(target_path),
            "target_relative": relative_path,
            "staged_path": str(stage.path),
            "size": stage.size,
            "sha256": stage.sha256,
            "mime": stage.mime,
        }
        scan_report: Optional[ScanReport] = None
        promoted_path: Optional[Path] = None

        try:
            self._enforce_size(stage.size)
            self._enforce_extension(target_path.name)
            scan_report = self._scan(stage.path)
            metadata["scan"] = asdict(scan_report)
            if not scan_report.is_clean:
                metadata["scan_failure_reason"] = scan_report.message or scan_report.signature
                raise UploadSecurityError("安全审核未通过", code="scan_failed")
            promoted_path = self._promote(stage.path, target_path)
            metadata["final_path"] = str(promoted_path)
            self._log_event(True, metadata)
            return {
                "final_path": promoted_path,
                "metadata": metadata,
            }
        except UploadSecurityError as exc:
            metadata["error"] = {
                "code": exc.code,
                "message": str(exc),
            }
            if scan_report is not None:
                metadata.setdefault("scan", asdict(scan_report))
            self._log_event(False, metadata)
            raise
        finally:
            # After a successful promote the staged file has already been
            # moved; otherwise remove it so rejected uploads do not pile up.
            if stage.path.exists():
                try:
                    stage.path.unlink()
                except OSError:
                    pass

    # ------------------------------------------------------------------
    # Internal implementation
    # ------------------------------------------------------------------
    def _stage_file(self, file_obj: "FileStorage", original_name: str) -> "StageRecord":
        """Save the upload under a unique name in quarantine and fingerprint it.

        The staged name is ``<unix time>_<uuid hex><suffix>``, where the
        suffix is taken from at most the last two suffixes of
        ``original_name``. If saving, stat-ing or hashing fails, the staged
        file is removed before the exception propagates.

        Returns:
            A ``StageRecord`` with the staged path, the original name, the
            size, the SHA-256 digest and the MIME type guessed from the name.
        """
        name = original_name or ""
        suffix = "".join(Path(name).suffixes[-2:]) or Path(name).suffix
        unique_name = f"{int(time.time())}_{uuid.uuid4().hex}{suffix}"
        staged_path = self.quarantine_dir / unique_name

        staged_ok = False
        try:
            file_obj.save(staged_path)
            size = staged_path.stat().st_size
            sha256 = self._hash_file(staged_path)
            staged_ok = True
        finally:
            if not staged_ok:
                # Do not leave a partial or unverified file behind in quarantine.
                try:
                    staged_path.unlink()
                except OSError:
                    pass

        mime, _ = mimetypes.guess_type(name)
        return StageRecord(staged_path, original_name, size, sha256, mime)

    def _enforce_size(self, size: int):
        """Raise ``UploadSecurityError`` (``size_exceeded``) if ``size`` exceeds ``MAX_UPLOAD_SIZE``."""
        if size > MAX_UPLOAD_SIZE:
            raise UploadSecurityError(
                f"文件大小 {size} 超过上限 {MAX_UPLOAD_SIZE} 字节",
                code="size_exceeded",
            )

    def _enforce_extension(self, filename: str):
        """Reject file names that do not end with an allowed extension.

        Matching is case-insensitive on both the file name and the configured
        patterns. An empty allow-list disables the check.

        Raises:
            UploadSecurityError: With code ``extension_forbidden`` when no
                pattern matches.
        """
        if not self.allowed_extensions:
            return
        lowered = (filename or "").lower()
        # Lowercase the patterns too, so an entry such as ".PDF" still matches.
        patterns = tuple(pattern.lower() for pattern in self.allowed_extensions)
        if lowered.endswith(patterns):
            return
        raise UploadSecurityError("文件类型不在允许列表中", code="extension_forbidden")

    def _scan(self, staged_path: Path) -> "ScanReport":
        """Run the configured ClamAV command on ``staged_path``.

        Returns:
            A ``ScanReport`` with status ``skipped`` (scanning disabled),
            ``passed`` (exit code 0), ``failed`` (exit code 1) or ``error``
            (timeout or any other exit code).

        Raises:
            UploadSecurityError: With code ``scanner_missing`` when no scanner
                binary was resolved or the process could not be started.
        """
        if not self.clamav_enabled:
            return ScanReport(status="skipped", engine="clamdscan", message="已跳过病毒扫描")
        if not self.clamav_bin:
            raise UploadSecurityError("未找到 ClamAV 扫描器,请检查配置", code="scanner_missing")

        command = [self.clamav_bin] + self.clamav_args + [str(staged_path)]
        # Monotonic clock: the duration must not be affected by wall-clock adjustments.
        start = time.monotonic()

        def elapsed_ms() -> int:
            return int((time.monotonic() - start) * 1000)

        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=self.clamav_timeout,
                check=False,
            )
        except subprocess.TimeoutExpired:
            return ScanReport(
                status="error",
                engine="clamdscan",
                message="扫描超时",
                duration_ms=elapsed_ms(),
            )
        except OSError as exc:
            # Covers a missing binary as well as other launch failures
            # (e.g. not executable), so the rejection is still audited.
            raise UploadSecurityError("ClamAV 扫描器不可用", code="scanner_missing") from exc

        duration_ms = elapsed_ms()
        stdout = (result.stdout or "").strip()
        stderr = (result.stderr or "").strip()

        if result.returncode == 0:
            return ScanReport(status="passed", engine="clamdscan", duration_ms=duration_ms)

        if result.returncode == 1:
            signature = self._extract_signature(stdout) or self._extract_signature(stderr)
            message = signature or "检测到可疑内容"
            return ScanReport(
                status="failed",
                engine="clamdscan",
                signature=signature,
                message=message,
                duration_ms=duration_ms,
            )

        message = stderr or stdout or f"扫描失败(退出码 {result.returncode})"
        return ScanReport(
            status="error",
            engine="clamdscan",
            message=message,
            duration_ms=duration_ms,
        )

    def _promote(self, staged_path: Path, target_path: Path) -> Path:
        """Move the staged file to ``target_path``, creating parent directories, and return it."""
        target_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(staged_path), str(target_path))
        return target_path

    def _hash_file(self, path: Path) -> str:
        """Return the hex SHA-256 digest of the file, read in 1 MiB chunks."""
        sha256 = hashlib.sha256()
        with path.open("rb") as fh:
            for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                sha256.update(chunk)
        return sha256.hexdigest()

    def _extract_signature(self, text: str) -> Optional[str]:
        """Return the signature name from the first ``<path>: <name> FOUND`` line.

        Returns:
            The text between the last colon and the trailing ``FOUND``, or
            ``None`` when no line carries a non-empty signature.
        """
        if not text:
            return None
        for raw_line in text.splitlines():
            line = raw_line.strip()
            if not line or "FOUND" not in line.upper():
                continue
            # Split on the LAST colon: the scanned path itself may contain
            # colons (drive letters, user-supplied suffixes).
            tail = line.rsplit(":", 1)[-1].strip()
            if tail.upper().endswith("FOUND"):
                signature = tail[: -len("FOUND")].strip()
                if signature:
                    return signature
        return None

    def _resolve_clamav_bin(self, candidate: str) -> Optional[str]:
        """Resolve the scanner binary to a usable path.

        Returns:
            An absolute ``candidate`` if it exists, the ``shutil.which``
            result for a bare command name, or ``None`` when the value is
            empty or cannot be resolved.
        """
        candidate = (candidate or "").strip()
        if not candidate:
            return None
        if os.path.isabs(candidate):
            return candidate if Path(candidate).exists() else None
        return shutil.which(candidate) or None

    def _log_event(self, accepted: bool, payload: Dict[str, Any]):
        """Write one ``UPLOAD_AUDIT`` JSON line containing ``payload``.

        The entry carries a UTC ``timestamp`` and the ``accepted`` flag; keys
        in ``payload`` take precedence on collision. If the entry cannot be
        serialised or logged, a warning with the raw entry is logged instead.
        """
        # Aware "now" converted back to naive keeps the timestamp format the
        # log has always used while avoiding the deprecated utcnow().
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        entry = {
            "timestamp": now_utc.isoformat(),
            "accepted": accepted,
            **payload,
        }
        try:
            self.logger.info("UPLOAD_AUDIT %s", json.dumps(entry, ensure_ascii=False))
        except Exception:
            self.logger.warning("无法写入上传审计日志:%s", entry)
|
||
|
||
|
||
# Public names exported by ``from <module> import *``.
__all__ = ["UploadQuarantineManager", "UploadSecurityError"]
|