180 lines
5.6 KiB
Python
180 lines
5.6 KiB
Python
"""
|
|
格式化工具
|
|
"""
|
|
import re
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
def format_datetime(dt: datetime, format: str = "full") -> str:
    """Render a datetime as text using one of several named layouts.

    Args:
        dt: The datetime to render.
        format: Layout name. ``"full"`` gives ``YYYY-MM-DD HH:MM:SS``,
            ``"date"`` gives ``YYYY-MM-DD``, ``"time"`` gives ``HH:MM:SS``
            and ``"relative"`` delegates to ``get_relative_time``. Any other
            value falls back to ``dt.isoformat()``.

    Returns:
        The formatted string.
    """
    strftime_layouts = (
        ("full", "%Y-%m-%d %H:%M:%S"),
        ("date", "%Y-%m-%d"),
        ("time", "%H:%M:%S"),
    )
    for layout_name, pattern in strftime_layouts:
        if format == layout_name:
            return dt.strftime(pattern)

    if format == "relative":
        return get_relative_time(dt)

    # Unrecognised layout names fall back to ISO 8601.
    return dt.isoformat()
|
|
|
|
def get_relative_time(dt: datetime) -> str:
    """Describe how long ago ``dt`` was, as a short Chinese phrase.

    The elapsed time is bucketed into "just now" (under a minute), minutes,
    hours, days (under 30), months (30-day units, under 365 days) and years
    (365-day units).

    Args:
        dt: The moment to describe. May be naive or timezone-aware.

    Returns:
        One of ``"刚刚"``, ``"N分钟前"``, ``"N小时前"``, ``"N天前"``,
        ``"N个月前"`` or ``"N年前"``. Moments in the future have a negative
        elapsed time and therefore also yield ``"刚刚"``.
    """
    # Compare like with like: subtracting an aware datetime from a naive
    # "now" raises TypeError, so take "now" in dt's own timezone when it
    # has a usable one.
    if dt.tzinfo is not None and dt.utcoffset() is not None:
        now = datetime.now(dt.tzinfo)
    else:
        now = datetime.now()

    delta = now - dt
    seconds = delta.total_seconds()

    if seconds < 60:
        return "刚刚"
    if seconds < 3600:
        return f"{int(seconds // 60)}分钟前"
    if seconds < 86400:
        return f"{int(seconds // 3600)}小时前"

    days = delta.days
    if days < 30:
        return f"{days}天前"
    if days < 365:
        return f"{days // 30}个月前"
    return f"{days // 365}年前"
|
|
|
|
def format_file_size(size_bytes: int) -> str:
    """Format a byte count as a human-readable size with two decimals.

    The value is divided by 1024 until it drops below 1024 or the largest
    listed unit is passed, at which point it is reported in PB.

    Args:
        size_bytes: Number of bytes. Negative values (e.g. size deltas) are
            scaled by magnitude and keep their sign.

    Returns:
        A string such as ``"1.50 KB"`` or ``"-2.00 KB"``.
    """
    # Scale on the magnitude so negative inputs pick the right unit too;
    # comparing the signed value would leave every negative number in bytes.
    sign = "-" if size_bytes < 0 else ""
    magnitude = float(abs(size_bytes))

    for unit in ("B", "KB", "MB", "GB", "TB"):
        if magnitude < 1024.0:
            return f"{sign}{magnitude:.2f} {unit}"
        magnitude /= 1024.0

    return f"{sign}{magnitude:.2f} PB"
|
|
|
|
def format_percentage(value: float, decimals: int = 1) -> str:
    """Format a number as a percentage string.

    The value is used as-is (it is not multiplied by 100).

    Args:
        value: The number to display.
        decimals: Number of digits after the decimal point.

    Returns:
        The fixed-point rendering of ``value`` followed by ``%``.
    """
    number_text = format(value, f".{decimals}f")
    return number_text + "%"
|
|
|
|
def format_search_results(results: List[Dict[str, Any]]) -> str:
    """Render search results as a numbered plain-text listing.

    Each result contributes a numbered title line, a URL line, a snippet
    line and a trailing empty line. Missing ``title``, ``url`` or
    ``snippet`` keys fall back to placeholder text.

    Args:
        results: Result dictionaries, read via ``title``, ``url`` and
            ``snippet`` keys.

    Returns:
        The listing joined with newlines; an empty string for no results.
    """
    output: List[str] = []

    for position, entry in enumerate(results, start=1):
        title = entry.get('title', '无标题')
        url = entry.get('url', 'N/A')
        snippet = entry.get('snippet', '无摘要')
        output.extend((
            f"{position}. {title}",
            f" URL: {url}",
            f" {snippet}",
            "",
        ))

    return '\n'.join(output)
|
|
|
|
def format_outline_text(outline: Dict[str, Any]) -> str:
    """Render a research outline dictionary as Markdown-style text.

    The output has a top-level heading taken from ``main_topic``, a numbered
    list built from ``research_questions``, and a numbered list built from
    ``sub_topics`` where each entry shows its ``topic``, ``priority`` and,
    on the following line, its ``explain`` text.

    Args:
        outline: Outline data. Missing keys fall back to a default title or
            to empty lists / empty strings.

    Returns:
        The outline as a single newline-joined string.
    """
    heading = f"# {outline.get('main_topic', '研究主题')}"

    question_lines = [
        f"{number}. {question}"
        for number, question in enumerate(outline.get('research_questions', []), 1)
    ]

    subtopic_lines: List[str] = []
    for number, item in enumerate(outline.get('sub_topics', []), 1):
        subtopic_lines.append(
            f"{number}. **{item.get('topic', '')}** ({item.get('priority', '')})"
        )
        subtopic_lines.append(f" {item.get('explain', '')}")

    sections = [
        heading,
        "",
        "## 研究问题",
        *question_lines,
        "",
        "## 子主题",
        *subtopic_lines,
    ]
    return '\n'.join(sections)
|
|
|
|
def clean_markdown(text: str) -> str:
    """Normalise blank lines in Markdown text.

    Three things happen, in order:

    1. Runs of three or more newlines collapse to exactly one blank line.
    2. A heading that directly follows a non-empty line gets a blank line
       inserted before it.
    3. A heading that is directly followed by a non-empty line gets a blank
       line inserted after it.

    List markers (``-``, ``*``, ``1.``) are left untouched.

    Args:
        text: Markdown source.

    Returns:
        The normalised text with leading and trailing whitespace stripped.
    """
    # Collapse excess blank lines.
    text = re.sub(r'\n{3,}', '\n\n', text)

    # Blank line before a heading; the "\n" in the pattern already pins the
    # hashes to the start of a line.
    text = re.sub(r'([^\n])\n(#{1,6} )', r'\1\n\n\2', text)

    # Blank line after a heading. The pattern is anchored to the start of a
    # line (allowing the up-to-three-space indent Markdown permits) so that
    # a "# " occurring mid-line, as in "C# is great", is not mistaken for a
    # heading.
    text = re.sub(
        r'^( {0,3}#{1,6} [^\n]+)\n([^\n])',
        r'\1\n\n\2',
        text,
        flags=re.MULTILINE,
    )

    return text.strip()
|
|
|
|
def truncate_text(text: str, max_length: int, ellipsis: str = "...") -> str:
    """Shorten ``text`` so the result is at most ``max_length`` characters.

    Text that already fits is returned unchanged. Otherwise the text is cut
    so that the cut text plus ``ellipsis`` fits within ``max_length``. If a
    space occurs in the last 20% of the kept text, the cut is moved back to
    that space so a word is not split.

    Args:
        text: The text to shorten.
        max_length: Maximum length of the returned string, ellipsis included.
        ellipsis: Marker appended to truncated text.

    Returns:
        ``text`` itself when it fits; otherwise the shortened text followed
        by ``ellipsis``. When ``max_length`` is too small to hold any text
        next to the ellipsis, a prefix of the ellipsis is returned, and an
        empty string when ``max_length`` is not positive.
    """
    if len(text) <= max_length:
        return text

    if max_length <= 0:
        return ""

    # Reserve room for the ellipsis so the result honours max_length.
    budget = max_length - len(ellipsis)
    if budget <= 0:
        return ellipsis[:max_length]

    truncated = text[:budget]

    # Prefer cutting at a word boundary, but only when that discards less
    # than 20% of the kept text.
    last_space = truncated.rfind(' ')
    if last_space > budget * 0.8:
        truncated = truncated[:last_space]

    return truncated + ellipsis
|
|
|
|
def highlight_keywords(text: str, keywords: List[str]) -> str:
    """Wrap every occurrence of the given keywords in ``**`` markers.

    Matching is case-insensitive and literal (keywords are not treated as
    regular expressions). The matched text keeps its original casing.

    Args:
        text: The text to annotate.
        keywords: Keywords to highlight. Empty strings are ignored.

    Returns:
        ``text`` with each match wrapped as ``**match**``; ``text`` unchanged
        when there is nothing to highlight.
    """
    # Drop empty keywords (an empty pattern matches at every position) and
    # try longer keywords first so "new york" wins over "york".
    terms = sorted(
        dict.fromkeys(keyword for keyword in keywords if keyword),
        key=len,
        reverse=True,
    )
    if not terms:
        return text

    # A single pass over the text prevents a later keyword from matching
    # inside markers inserted for an earlier one.
    pattern = re.compile(
        "|".join(re.escape(term) for term in terms),
        re.IGNORECASE,
    )

    # A function replacement inserts the matched text verbatim: it keeps the
    # original casing and avoids interpreting backslashes in a keyword as
    # replacement-template escapes.
    return pattern.sub(lambda match: f"**{match.group(0)}**", text)
|
|
|
|
def extract_urls(text: str) -> List[str]:
    """Extract the distinct http/https URLs that appear in ``text``.

    Args:
        text: Text to scan.

    Returns:
        The matched URLs without duplicates, in order of first appearance.
    """
    url_pattern = re.compile(
        r'https?://(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b'
        r'(?:[-a-zA-Z0-9()@:%_\+.~#?&/=]*)'
    )

    urls = url_pattern.findall(text)

    # dict.fromkeys dedupes while keeping first-seen order; a set would make
    # the output order vary between runs.
    return list(dict.fromkeys(urls))
|
|
|
|
def format_json_output(data: Any, indent: int = 2) -> str:
    """Serialise ``data`` as pretty-printed JSON text.

    Non-ASCII characters are written as-is, keys are sorted, and any object
    the JSON encoder cannot handle natively is converted with ``str``.

    Args:
        data: The value to serialise.
        indent: Indentation passed through to ``json.dumps``.

    Returns:
        The JSON document as a string.
    """
    import json

    dump_options = {
        "ensure_ascii": False,
        "indent": indent,
        "sort_keys": True,
        # Fallback for values such as datetime that json cannot encode.
        "default": str,
    }
    return json.dumps(data, **dump_options)
|
|
|
|
def create_summary(text: str, max_sentences: int = 3) -> str:
    """Build a naive summary from the first few sentences of ``text``.

    Sentences are found by splitting on runs of sentence-ending punctuation
    (``。``, ``！``, ``？``, ``.``, ``!``, ``?``). The kept sentences are
    re-joined with ``。``, and ``...`` is appended when sentences were
    left out.

    Args:
        text: The text to summarise.
        max_sentences: Maximum number of sentences to keep. Values below
            zero are treated as zero.

    Returns:
        The summary, or an empty string when there is no sentence to keep
        (blank input or a non-positive ``max_sentences``).
    """
    # Simple punctuation-based split; both ASCII and full-width terminators
    # count so that Chinese "！" and "？" end a sentence too.
    fragments = re.split(r'[。！？.!?]+', text)
    sentences = [fragment.strip() for fragment in fragments if fragment.strip()]

    kept = sentences[:max(max_sentences, 0)]
    if not kept:
        # Nothing to summarise: do not emit a lone terminator.
        return ""

    summary = '。'.join(kept) + '。'
    if len(sentences) > len(kept):
        summary += '...'
    return summary
|
|
|
|
def format_status_message(status: str, phase: Optional[str] = None) -> str:
    """Translate a status code into its display message.

    Args:
        status: Status code. Codes without a known message are shown as-is.
        phase: Optional phase description appended after `` - `` when it is
            non-empty.

    Returns:
        The display message, optionally suffixed with the phase.
    """
    labels = {
        "pending": "等待开始",
        "analyzing": "分析问题中",
        "outlining": "制定大纲中",
        "researching": "研究进行中",
        "writing": "撰写报告中",
        "reviewing": "审核内容中",
        "completed": "研究完成",
        "error": "发生错误",
        "cancelled": "已取消",
    }

    label = labels.get(status, status)
    return f"{label} - {phase}" if phase else label