deepresearch/app/utils/formatters.py
2025-07-02 15:35:36 +08:00

180 lines
5.6 KiB
Python

"""
格式化工具
"""
import re
from datetime import datetime
from typing import List, Dict, Any, Optional
def format_datetime(dt: datetime, format: str = "full") -> str:
    """Render a datetime as text in one of several named styles.

    Args:
        dt: The datetime to render.
        format: Style name. ``"full"`` gives ``YYYY-MM-DD HH:MM:SS``,
            ``"date"`` gives ``YYYY-MM-DD``, ``"time"`` gives ``HH:MM:SS``
            and ``"relative"`` delegates to ``get_relative_time``. Any
            other value falls back to ``dt.isoformat()``.

    Returns:
        The formatted string.
    """
    strftime_patterns = {
        "full": "%Y-%m-%d %H:%M:%S",
        "date": "%Y-%m-%d",
        "time": "%H:%M:%S",
    }
    if format in strftime_patterns:
        return dt.strftime(strftime_patterns[format])
    if format == "relative":
        return get_relative_time(dt)
    # Unknown style names fall back to ISO 8601.
    return dt.isoformat()
def get_relative_time(dt: datetime) -> str:
    """Describe how long ago ``dt`` was, as a short Chinese phrase.

    Buckets, from smallest to largest: under a minute ("刚刚"), minutes,
    hours, days (under 30), months (30-day units, under 365 days) and
    years (365-day units). Timestamps in the future also yield "刚刚"
    because their elapsed time is negative.

    Args:
        dt: The moment to describe. May be naive or timezone-aware.

    Returns:
        The relative-time phrase.
    """
    # Take "now" with the same tzinfo as dt: a naive dt gets a naive local
    # now, an aware dt gets an aware now. Subtracting a naive datetime from
    # an aware one (or vice versa) raises TypeError.
    now = datetime.now(tz=dt.tzinfo)
    delta = now - dt
    seconds = delta.total_seconds()

    if seconds < 60:
        return "刚刚"
    if seconds < 3600:
        return f"{int(seconds // 60)}分钟前"
    if seconds < 86400:
        return f"{int(seconds // 3600)}小时前"

    days = delta.days
    if days < 30:
        return f"{days}天前"
    if days < 365:
        return f"{days // 30}个月前"
    return f"{days // 365}年前"
def format_file_size(size_bytes: int) -> str:
    """Render a byte count with a binary (1024-based) unit suffix.

    The value is divided by 1024 until it drops below 1024 or the unit
    list is exhausted; anything beyond terabytes is reported in PB.

    Args:
        size_bytes: Size in bytes.

    Returns:
        The size with two decimals and a unit, e.g. ``"1.50 KB"``.
    """
    value = size_bytes
    label = 'PB'  # used when the value never drops below 1024
    for candidate in ('B', 'KB', 'MB', 'GB', 'TB'):
        if value < 1024.0:
            label = candidate
            break
        value = value / 1024.0
    return f"{value:.2f} {label}"
def format_percentage(value: float, decimals: int = 1) -> str:
    """Render a number with a fixed number of decimals and a ``%`` sign.

    The value is used as-is; it is not multiplied by 100.

    Args:
        value: The percentage value.
        decimals: Number of digits after the decimal point.

    Returns:
        The formatted percentage, e.g. ``"12.3%"``.
    """
    spec = f".{decimals}f"
    return format(value, spec) + "%"
def format_search_results(results: List[Dict[str, Any]]) -> str:
    """Render search results as a numbered plain-text listing.

    Each result contributes a title line, a URL line, a snippet line and
    a trailing blank line. Missing ``title``/``url``/``snippet`` keys are
    replaced by placeholder text.

    Args:
        results: Result dicts read via ``title``, ``url`` and ``snippet``.

    Returns:
        The listing joined with newlines; an empty string for no results.
    """
    chunks = []
    for position, item in enumerate(results, start=1):
        title = item.get('title', '无标题')
        url = item.get('url', 'N/A')
        snippet = item.get('snippet', '无摘要')
        chunks.extend((
            f"{position}. {title}",
            f" URL: {url}",
            f" {snippet}",
            "",
        ))
    return '\n'.join(chunks)
def format_outline_text(outline: Dict[str, Any]) -> str:
    """Render a research outline dict as Markdown-style text.

    The output has a title line, a numbered list of research questions
    and a numbered list of sub-topics, each followed by its explanation.

    Args:
        outline: Dict read via ``main_topic``, ``research_questions`` and
            ``sub_topics``; each sub-topic is read via ``topic``,
            ``priority`` and ``explain``. Missing keys fall back to
            defaults.

    Returns:
        The outline text joined with newlines.
    """
    heading = outline.get('main_topic', '研究主题')
    questions = outline.get('research_questions', [])
    subtopics = outline.get('sub_topics', [])

    parts = [f"# {heading}", "", "## 研究问题"]
    parts.extend(
        f"{number}. {question}"
        for number, question in enumerate(questions, 1)
    )

    parts += ["", "## 子主题"]
    for number, entry in enumerate(subtopics, 1):
        topic = entry.get('topic', '')
        priority = entry.get('priority', '')
        parts.append(f"{number}. **{topic}** ({priority})")
        parts.append(f" {entry.get('explain', '')}")

    return '\n'.join(parts)
def clean_markdown(text: str) -> str:
    """Normalise blank lines in Markdown text.

    Runs of three or more newlines are collapsed to a single blank line,
    and every ATX heading (1-6 ``#`` characters followed by a space at the
    start of a line) is separated from adjacent text by a blank line.
    List markers are left untouched.

    Args:
        text: The Markdown source.

    Returns:
        The cleaned text with leading and trailing whitespace stripped.
    """
    # Collapse excessive blank lines.
    text = re.sub(r'\n{3,}', '\n\n', text)
    # Insert a blank line before a heading that directly follows text.
    text = re.sub(r'([^\n])\n(#{1,6} )', r'\1\n\n\2', text)
    # Insert a blank line after a heading. The pattern is anchored to the
    # start of a line so that a "# " in the middle of ordinary text
    # (e.g. "Use C# here") is not mistaken for a heading.
    text = re.sub(
        r'^(#{1,6} [^\n]+)\n([^\n])',
        r'\1\n\n\2',
        text,
        flags=re.MULTILINE,
    )
    return text.strip()
def truncate_text(text: str, max_length: int, ellipsis: str = "...") -> str:
    """Shorten text to at most ``max_length`` characters plus an ellipsis.

    Text that already fits is returned unchanged. Otherwise the text is
    cut at ``max_length``; if a space occurs in the last 20% of the cut
    portion, the cut is moved back to that space so a word is not split.

    Args:
        text: The text to shorten.
        max_length: Maximum number of characters kept before the ellipsis.
        ellipsis: Suffix appended to truncated text.

    Returns:
        The original text, or the shortened text followed by ``ellipsis``.
    """
    if len(text) <= max_length:
        return text

    head = text[:max_length]
    boundary = head.rfind(' ')
    # Only back up to the space when it costs less than 20% of the budget.
    kept = head[:boundary] if boundary > max_length * 0.8 else head
    return kept + ellipsis
def highlight_keywords(text: str, keywords: List[str]) -> str:
    """Wrap every case-insensitive keyword occurrence in ``**`` markers.

    All keywords are matched in a single pass, longest first, so a keyword
    contained in another (or in already-inserted markers) is never
    highlighted twice. The matched text keeps its original casing.

    Args:
        text: The text to annotate.
        keywords: Literal strings to highlight. Empty strings are ignored.

    Returns:
        The text with each match surrounded by ``**``.
    """
    # Drop empty keywords (an empty pattern matches between every character)
    # and duplicates, then try longer keywords first at each position.
    terms = sorted(
        dict.fromkeys(keyword for keyword in keywords if keyword),
        key=len,
        reverse=True,
    )
    if not terms:
        return text

    pattern = re.compile(
        "|".join(re.escape(term) for term in terms),
        re.IGNORECASE,
    )
    # A callable replacement avoids interpreting backslashes in the keyword
    # as regex template escapes and preserves the text's own casing.
    return pattern.sub(lambda match: f"**{match.group(0)}**", text)
def extract_urls(text: str) -> List[str]:
    """Extract the distinct http/https URLs found in a piece of text.

    Args:
        text: The text to scan.

    Returns:
        Each URL once, in order of first appearance.
    """
    url_pattern = re.compile(
        r'https?://(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b'
        r'(?:[-a-zA-Z0-9()@:%_\+.~#?&/=]*)'
    )
    urls = url_pattern.findall(text)
    # Deduplicate through a dict rather than a set: dicts keep insertion
    # order, so the result is deterministic instead of varying between runs.
    return list(dict.fromkeys(urls))
def format_json_output(data: Any, indent: int = 2) -> str:
    """Serialise data to a human-readable JSON string.

    Non-ASCII characters are emitted as-is, keys are sorted, and objects
    the JSON encoder cannot handle natively are converted with ``str``.

    Args:
        data: The value to serialise.
        indent: Indentation width passed to ``json.dumps``.

    Returns:
        The JSON text.
    """
    import json

    options = {
        "ensure_ascii": False,
        "indent": indent,
        "sort_keys": True,
        "default": str,  # fallback for datetime and other special objects
    }
    return json.dumps(data, **options)
def create_summary(text: str, max_sentences: int = 3) -> str:
    """Build a summary from the leading sentences of a text.

    Sentences are delimited by runs of ``。！？.!?``. The summary is the
    original text from the first sentence through the end of the
    ``max_sentences``-th one, so terminators and spacing are preserved
    rather than the sentences being fused together. ``...`` is appended
    when further sentences were left out.

    Note that a ``.`` inside a number or URL is also treated as a sentence
    terminator; the splitting is deliberately simple.

    Args:
        text: The text to summarise.
        max_sentences: Maximum number of sentences to keep.

    Returns:
        The summary, or an empty string when the text has no sentences.
    """
    # Group 1 is the sentence body, group 2 the terminators that close it.
    sentence_pattern = r'([^。！？.!?]+)([。！？.!?]*)'
    sentences = [
        match
        for match in re.finditer(sentence_pattern, text)
        if match.group(1).strip()  # skip whitespace-only fragments
    ]

    kept = sentences[:max_sentences]
    if kept:
        # Slice the source text so punctuation and spacing stay intact.
        summary = text[kept[0].start():kept[-1].end()].strip()
    else:
        summary = ''

    if len(sentences) > max_sentences:
        summary += '...'
    return summary
def format_status_message(status: str, phase: Optional[str] = None) -> str:
    """Translate a status code into its display message.

    Unknown status codes are shown verbatim. When a non-empty ``phase``
    is given it is appended after a `` - `` separator.

    Args:
        status: Status code such as ``"pending"`` or ``"completed"``.
        phase: Optional phase description to append.

    Returns:
        The display message.
    """
    labels = {
        "pending": "等待开始",
        "analyzing": "分析问题中",
        "outlining": "制定大纲中",
        "researching": "研究进行中",
        "writing": "撰写报告中",
        "reviewing": "审核内容中",
        "completed": "研究完成",
        "error": "发生错误",
        "cancelled": "已取消"
    }
    label = labels.get(status, status)
    return f"{label} - {phase}" if phase else label