agent-Specialization/server/utils_common.py
JOJO d6fb59e1d8 refactor: split web_server into modular architecture
- Refactor 6000+ line web_server.py into server/ module
- Create separate modules: auth, chat, conversation, files, admin, etc.
- Keep web_server.py as backward-compatible entry point
- Add container running status field in user_container_manager
- Improve admin dashboard API with credentials and debug support

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-22 09:21:53 +08:00

196 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""通用工具:文本处理、日志写入等(由原 web_server.py 拆分)。"""
from __future__ import annotations
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
from config import LOGS_DIR
# 文件名安全处理
def _sanitize_filename_component(text: str) -> str:
safe = (text or "untitled").strip()
safe = __import__('re').sub(r'[\\/:*?"<>|]+', '_', safe)
return safe or "untitled"
def build_review_lines(messages, limit=None):
"""
将对话消息序列拍平成简化文本。
保留 user / assistant / system 以及 assistant 内的 tool 调用与 tool 消息。
limit 为正整数时,最多返回该数量的行(用于预览)。
"""
lines: List[str] = []
def append_line(text: str):
lines.append(text.rstrip())
def extract_text(content):
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
parts.append(item.get("text") or "")
elif isinstance(item, str):
parts.append(item)
return "".join(parts)
if isinstance(content, dict):
return content.get("text") or ""
return ""
def append_tool_call(name, args):
try:
args_text = json.dumps(args, ensure_ascii=False)
except Exception:
args_text = str(args)
append_line(f"tool_call{name} {args_text}")
for msg in messages or []:
role = msg.get("role")
base_content_raw = msg.get("content") if isinstance(msg.get("content"), (str, list, dict)) else msg.get("text") or ""
base_content = extract_text(base_content_raw)
if role in ("user", "assistant", "system"):
append_line(f"{role}{base_content}")
if role == "tool":
append_line(f"tool{extract_text(base_content_raw)}")
if role == "assistant":
actions = msg.get("actions") or []
for action in actions:
if action.get("type") != "tool":
continue
tool = action.get("tool") or {}
name = tool.get("name") or "tool"
args = tool.get("arguments")
if args is None:
args = tool.get("argumentSnapshot")
try:
args_text = json.dumps(args, ensure_ascii=False)
except Exception:
args_text = str(args)
append_line(f"tool_call{name} {args_text}")
tool_content = tool.get("content")
if tool_content is None:
if isinstance(tool.get("result"), str):
tool_content = tool.get("result")
elif tool.get("result") is not None:
try:
tool_content = json.dumps(tool.get("result"), ensure_ascii=False)
except Exception:
tool_content = str(tool.get("result"))
elif tool.get("message"):
tool_content = tool.get("message")
else:
tool_content = ""
append_line(f"tool{tool_content}")
if isinstance(limit, int) and limit > 0 and len(lines) >= limit:
return lines[:limit]
tool_calls = msg.get("tool_calls") or []
for tc in tool_calls:
fn = tc.get("function") or {}
name = fn.get("name") or "tool"
args_raw = fn.get("arguments")
try:
args_obj = json.loads(args_raw) if isinstance(args_raw, str) else args_raw
except Exception:
args_obj = args_raw
append_tool_call(name, args_obj)
if isinstance(limit, int) and limit > 0 and len(lines) >= limit:
return lines[:limit]
if isinstance(base_content_raw, list):
for item in base_content_raw:
if isinstance(item, dict) and item.get("type") == "tool_call":
fn = item.get("function") or {}
name = fn.get("name") or "tool"
args_raw = fn.get("arguments")
try:
args_obj = json.loads(args_raw) if isinstance(args_raw, str) else args_raw
except Exception:
args_obj = args_raw
append_tool_call(name, args_obj)
if isinstance(limit, int) and limit > 0 and len(lines) >= limit:
return lines[:limit]
if isinstance(limit, int) and limit > 0 and len(lines) >= limit:
return lines[:limit]
return lines if limit is None else lines[:limit]
# 日志输出
_ORIGINAL_PRINT = print
ENABLE_VERBOSE_CONSOLE = True
def brief_log(message: str):
"""始终输出的简要日志(模型输出/工具调用等关键事件)"""
try:
_ORIGINAL_PRINT(message)
except Exception:
pass
DEBUG_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "debug_stream.log"
CHUNK_BACKEND_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "chunk_backend.log"
CHUNK_FRONTEND_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "chunk_frontend.log"
STREAMING_DEBUG_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "streaming_debug.log"
def _write_log(file_path: Path, message: str) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with file_path.open('a', encoding='utf-8') as f:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
f.write(f"[{timestamp}] {message}\n")
def debug_log(message):
"""写入调试日志"""
_write_log(DEBUG_LOG_FILE, message)
def log_backend_chunk(conversation_id: str, iteration: int, chunk_index: int, elapsed: float, char_len: int, content_preview: str):
preview = content_preview.replace('\n', '\\n')
_write_log(
CHUNK_BACKEND_LOG_FILE,
f"conv={conversation_id or 'unknown'} iter={iteration} chunk={chunk_index} elapsed={elapsed:.3f}s len={char_len} preview={preview}"
)
def log_frontend_chunk(conversation_id: str, chunk_index: int, elapsed: float, char_len: int, client_ts: float):
_write_log(
CHUNK_FRONTEND_LOG_FILE,
f"conv={conversation_id or 'unknown'} chunk={chunk_index} elapsed={elapsed:.3f}s len={char_len} client_ts={client_ts}"
)
def log_streaming_debug_entry(data: Dict[str, Any]):
try:
serialized = json.dumps(data, ensure_ascii=False)
except Exception:
serialized = str(data)
_write_log(STREAMING_DEBUG_LOG_FILE, serialized)
__all__ = [
"_sanitize_filename_component",
"build_review_lines",
"brief_log",
"debug_log",
"log_backend_chunk",
"log_frontend_chunk",
"log_streaming_debug_entry",
"DEBUG_LOG_FILE",
"CHUNK_BACKEND_LOG_FILE",
"CHUNK_FRONTEND_LOG_FILE",
"STREAMING_DEBUG_LOG_FILE",
]