deepresearch/所有文件/json_parser.py
2025-07-02 15:35:36 +08:00

283 lines
8.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
JSON解析和修复工具
"""
import json
import re
import logging
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
def parse_json_safely(text: str) -> Dict[str, Any]:
    """Parse ``text`` as JSON, applying progressively stronger repairs.

    Attempts, in order:

    1. ``json.loads`` on the text as given.
    2. ``json.loads`` after ``fix_json_common_issues``.
    3. ``json.loads`` after additionally applying ``fix_json_aggressive``.
    4. ``json.loads`` on the span returned by ``extract_json_from_text``
       (run on the *original* text, not the repaired one).

    Args:
        text: The raw text expected to contain JSON.

    Returns:
        The decoded value from the first attempt that succeeds (whatever
        ``json.loads`` yields, which is not necessarily a dict), or an empty
        dict when every attempt fails. Decoding errors are logged rather
        than raised.
    """
    # 1. Strict parse of the text as given.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # 2. Repair the common, low-risk problems and retry.
    fixed_text = fix_json_common_issues(text)
    try:
        return json.loads(fixed_text)
    except json.JSONDecodeError as e:
        logger.error(f"JSON解析失败: {e}")
        logger.debug(f"原始文本: {text[:500]}...")

    # 3. Apply the more invasive heuristics on top of the common fixes.
    fixed_text = fix_json_aggressive(fixed_text)
    try:
        return json.loads(fixed_text)
    except json.JSONDecodeError:
        pass

    # 4. Last resort: pull a bracketed span out of the original text.
    json_part = extract_json_from_text(text)
    if json_part:
        try:
            return json.loads(json_part)
        except (json.JSONDecodeError, RecursionError):
            # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            pass

    # Return an empty dict instead of raising.
    logger.error("无法解析JSON返回空字典")
    return {}
def fix_json_common_issues(text: str) -> str:
    """Repair common, low-risk defects in JSON-like text.

    Steps, in order: strip Markdown code-fence markers, strip a leading BOM,
    remove comments, remove control characters, drop trailing commas,
    convert single-quoted strings to double-quoted, quote bare keys, and
    translate Python's ``True``/``False``/``None`` to JSON literals.

    Args:
        text: JSON-like text that failed strict parsing.

    Returns:
        The repaired text with surrounding whitespace stripped. The result
        is not guaranteed to be valid JSON.
    """
    # Strip Markdown code-fence markers.
    text = re.sub(r'^```json\s*', '', text, flags=re.MULTILINE)
    text = re.sub(r'^```\s*$', '', text, flags=re.MULTILINE)

    # Strip a leading BOM.
    text = text.lstrip('\ufeff')

    # Remove comments BEFORE control characters are stripped: the control
    # character pass deletes newlines, and without them a single ``//``
    # comment would run to the end of the whole document.
    text = remove_json_comments(text)

    # Remove control characters (this includes tab, CR and LF).
    text = re.sub(r'[\x00-\x1F\x7F]', '', text)

    # Drop trailing commas before a closing brace/bracket.
    text = re.sub(r',\s*}', '}', text)
    text = re.sub(r',\s*]', ']', text)

    # JSON only accepts double-quoted strings.
    text = fix_single_quotes(text)

    # Quote bare object keys.
    text = fix_unquoted_keys(text)

    # Translate Python literals, but only as whole words and only outside
    # double-quoted strings, so values such as "None of the above" and
    # identifiers such as ``isTrue`` are left alone.
    python_literals = {'True': 'true', 'False': 'false', 'None': 'null'}

    def _to_json_literal(match):
        word = match.group(1)
        # Group 1 is empty when the match is a string literal: keep it as is.
        return python_literals[word] if word else match.group(0)

    text = re.sub(
        r'"(?:[^"\\]|\\.)*"|\b(True|False|None)\b',
        _to_json_literal,
        text,
    )

    return text.strip()
def fix_json_aggressive(text: str) -> str:
    """Apply heuristic, potentially lossy regex repairs to JSON-like text.

    The substitutions run in a fixed order and are not string-aware, so they
    may also rewrite characters inside string values.

    Args:
        text: JSON-like text that is still invalid after the common fixes.

    Returns:
        The text after all substitutions have been applied.
    """
    # (pattern, replacement) pairs, applied strictly in this order.
    repairs = (
        # Join a string that was broken across lines.
        (r'"\s*\n\s*"', '" "'),
        # Insert a missing comma after } or ] when followed by ", { or [.
        (r'}\s*"', '},\n"'),
        (r']\s*"', '],\n"'),
        (r'}\s*{', '},\n{'),
        (r']\s*\[', '],\n['),
        # Insert a missing colon between two adjacent strings.
        (r'"([^"]+)"\s*"', r'"\1": "'),
    )
    for pattern, replacement in repairs:
        text = re.sub(pattern, replacement, text)
    return text
def fix_single_quotes(text: str) -> str:
    """Convert single-quoted strings to double-quoted JSON strings.

    A single quote opens a string only at the start of the text or right
    after whitespace or one of ``: , { [``; it closes one only at the end of
    the text or right before whitespace or one of ``, } ] :``. Apostrophes
    that do not sit on such a boundary (``'it's'``) stay part of the string.

    Existing double-quoted strings are copied through untouched, so single
    quotes inside them are never rewritten. Inside a converted string, bare
    double quotes are escaped and ``\\'`` is reduced to ``'`` because JSON
    has no such escape.

    Args:
        text: JSON-like text that may use single-quoted strings.

    Returns:
        The text with single-quoted strings rewritten as double-quoted.
    """
    opens_after = ' \n\t:,{['
    closes_before = ' \n\t,}]:'

    out = []
    quote = None  # None outside strings, otherwise the opening quote char
    length = len(text)
    i = 0
    while i < length:
        char = text[i]
        if quote is None:
            if char == '"':
                # Already a JSON string: copy verbatim until it closes.
                quote = '"'
                out.append(char)
            elif char == "'" and (i == 0 or text[i - 1] in opens_after):
                quote = "'"
                out.append('"')
            else:
                out.append(char)
        elif char == '\\' and i + 1 < length:
            # Escape sequence: consume the escaped character as well so it
            # can never terminate the string.
            escaped = text[i + 1]
            if quote == "'" and escaped == "'":
                out.append("'")
            else:
                out.append(char)
                out.append(escaped)
            i += 1
        elif quote == '"':
            out.append(char)
            if char == '"':
                quote = None
        elif char == "'" and (i + 1 >= length or text[i + 1] in closes_before):
            out.append('"')
            quote = None
        elif char == '"':
            # A bare double quote would end the converted string early.
            out.append('\\"')
        else:
            out.append(char)
        i += 1
    return ''.join(out)
def fix_unquoted_keys(text: str) -> str:
    """Wrap bare object keys in double quotes.

    A bare key is an identifier (``[a-zA-Z_][a-zA-Z0-9_]*``) preceded by
    ``,``, ``{`` or whitespace and followed by optional whitespace and a
    colon. Double-quoted strings are skipped as a unit, so ``word:``
    sequences inside string values are left untouched.

    Args:
        text: JSON-like text that may contain unquoted keys.

    Returns:
        The text with bare keys quoted.
    """
    pattern = re.compile(
        r'"(?:[^"\\]|\\.)*"'                          # string literal: keep
        r'|([,\{\s])([a-zA-Z_][a-zA-Z0-9_]*)\s*:',    # key: value
        flags=re.DOTALL,
    )

    def _quote_key(match):
        if match.group(2) is None:
            # Matched a complete string literal; return it unchanged.
            return match.group(0)
        return f'{match.group(1)}"{match.group(2)}":'

    return pattern.sub(_quote_key, text)
def remove_json_comments(text: str) -> str:
    """Remove ``//`` line comments and ``/* */`` block comments.

    String literals (double- or single-quoted) are matched first and kept
    verbatim, so comment markers inside them, such as the ``//`` in a URL,
    are not treated as comments. An unterminated ``/*`` is left in place.

    Args:
        text: JSON-like text that may contain comments.

    Returns:
        The text with comments removed; a line comment's terminating
        newline is kept.
    """
    token = re.compile(
        r'"(?:[^"\\]|\\.)*"'       # double-quoted string (kept)
        r"|'(?:[^'\\]|\\.)*'"      # single-quoted string (kept)
        r'|//[^\n]*'               # line comment, up to the newline
        r'|/\*.*?\*/',             # block comment, may span lines
        flags=re.DOTALL,
    )

    def _drop_comment(match):
        chunk = match.group(0)
        # Only string literals start with a quote; everything else that
        # matched is a comment.
        return chunk if chunk[0] in '"\'' else ''

    return token.sub(_drop_comment, text)
def _find_balanced_end(text: str, start: int) -> Optional[int]:
    """Return the index of the bracket closing the one at ``text[start]``.

    Only brackets of the same kind as the opener are counted, and brackets
    inside double-quoted strings are ignored. Returns None when the opener
    is never closed.
    """
    opener = text[start]
    closer = '}' if opener == '{' else ']'
    depth = 0
    in_string = False
    i = start
    length = len(text)
    while i < length:
        char = text[i]
        if in_string:
            if char == '\\':
                # Skip the escaped character; backslashes only have this
                # meaning inside a string.
                i += 1
            elif char == '"':
                in_string = False
        elif char == '"':
            in_string = True
        elif char == opener:
            depth += 1
        elif char == closer:
            depth -= 1
            if depth == 0:
                return i
        i += 1
    return None


def extract_json_from_text(text: str) -> Optional[str]:
    """Extract the JSON portion embedded in surrounding text.

    Scans for bracket-balanced ``{...}`` / ``[...]`` spans from left to
    right and returns the first one that ``json.loads`` accepts. A balanced
    span that does not parse (for example a bracketed remark in prose ahead
    of the real payload) is skipped, and the search resumes after it.

    If no span parses, the first balanced span is returned anyway so that
    callers can attempt their own repair. The search stops at the first
    unclosed bracket rather than descending into it, which avoids returning
    a nested fragment of a truncated document.

    Args:
        text: Arbitrary text that may contain a JSON object or array.

    Returns:
        The extracted substring, or None when no balanced span is found.
    """
    bracket = re.compile(r'[{\[]')
    first_span = None
    pos = 0
    while True:
        found = bracket.search(text, pos)
        if found is None:
            break
        start = found.start()
        end = _find_balanced_end(text, start)
        if end is None:
            break
        span = text[start:end + 1]
        try:
            json.loads(span)
        except (json.JSONDecodeError, RecursionError):
            if first_span is None:
                first_span = span
            pos = end + 1
            continue
        return span
    return first_span
def validate_json_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Validate ``data`` against a small subset of JSON Schema.

    Only the top-level ``required`` list and each property's ``type`` are
    checked. ``type`` may be a single type name or a list of alternatives.
    An ``int`` value satisfies ``number``; a ``bool`` satisfies only
    ``boolean``. Type names missing from the mapping are compared directly
    against the Python type name of the value.

    Args:
        data: The decoded JSON object to validate.
        schema: Schema dict; the ``required`` and ``properties`` keys are
            read, everything else is ignored.

    Returns:
        A list of error messages, empty when no problem was found.
    """
    # JSON Schema type name -> Python type name, as given by
    # ``type(value).__name__``.
    type_mapping = {
        'string': 'str',
        'number': 'float',
        'integer': 'int',
        'boolean': 'bool',
        'array': 'list',
        'object': 'dict',
        'null': 'NoneType',
    }

    def _matches(expected: str, actual: str) -> bool:
        expected_python_type = type_mapping.get(expected, expected)
        if actual == expected_python_type:
            return True
        # Special case: an int is acceptable where a float is expected.
        return expected_python_type == 'float' and actual == 'int'

    errors = []

    # Required fields.
    for field in schema.get('required', []):
        if field not in data:
            errors.append(f"缺少必需字段: {field}")

    # Field types.
    properties = schema.get('properties', {})
    for field, value in data.items():
        if field not in properties:
            continue
        expected_type = properties[field].get('type')
        if not expected_type:
            continue

        # ``type`` may be one name or a list of names; a list used to crash
        # the dict lookup with "unhashable type".
        if isinstance(expected_type, str):
            allowed = [expected_type]
        elif isinstance(expected_type, (list, tuple)):
            allowed = [name for name in expected_type if isinstance(name, str)]
        else:
            continue
        if not allowed:
            continue

        actual_type = type(value).__name__
        if not any(_matches(name, actual_type) for name in allowed):
            errors.append(
                f"字段 '{field}' 类型错误: "
                f"期望 {expected_type}, 实际 {actual_type}"
            )

    return errors
def _dedupe_preserving_order(items: List[Any]) -> List[Any]:
    """Return ``items`` without duplicates, keeping first occurrences.

    Hashable items are tracked in a set; unhashable ones (dicts, lists) fall
    back to an equality scan.
    """
    seen_hashable = set()
    seen_unhashable = []
    unique = []
    for item in items:
        try:
            if item in seen_hashable:
                continue
            seen_hashable.add(item)
        except TypeError:
            # Unhashable value, e.g. a nested dict or list.
            if item in seen_unhashable:
                continue
            seen_unhashable.append(item)
        unique.append(item)
    return unique


def merge_json_objects(obj1: Dict[str, Any], obj2: Dict[str, Any],
                       deep: bool = True) -> Dict[str, Any]:
    """Merge two JSON objects into a new dict.

    Values from ``obj2`` take precedence. With ``deep`` enabled, dicts
    present under the same key on both sides are merged recursively, and
    lists present on both sides are concatenated with duplicates removed,
    keeping first-occurrence order. List elements may be unhashable, such as
    nested objects.

    Args:
        obj1: Base object; its top-level dict is copied, not mutated.
        obj2: Object whose values override or extend those of ``obj1``.
        deep: When False, every key of ``obj2`` simply replaces the value
            in ``obj1``.

    Returns:
        The merged dict.
    """
    result = obj1.copy()

    for key, value in obj2.items():
        existing = result.get(key)
        if key in result and deep and isinstance(existing, dict) and isinstance(value, dict):
            # Deep merge of nested objects.
            result[key] = merge_json_objects(existing, value, deep=True)
        elif key in result and deep and isinstance(existing, list) and isinstance(value, list):
            # Concatenate and de-duplicate. ``set()`` cannot be used here:
            # it raises on dict/list elements and loses ordering.
            result[key] = _dedupe_preserving_order(existing + value)
        else:
            # Plain overwrite.
            result[key] = value

    return result
def json_to_flat_dict(data: Dict[str, Any], parent_key: str = '',
                      separator: str = '.') -> Dict[str, Any]:
    """Flatten a nested JSON object into a single-level dict.

    Nested dict keys are joined with ``separator``; list elements get an
    ``[index]`` suffix. Dicts found directly inside a list are flattened
    recursively, while any other list element (including a nested list) is
    stored as-is. Empty dicts and empty lists contribute no entries.

    Args:
        data: The object to flatten.
        parent_key: Key prefix applied to every produced key.
        separator: String placed between a parent key and a child key.

    Returns:
        A flat dict mapping path-like keys to leaf values.
    """
    flat = {}
    for name, node in data.items():
        path = f"{parent_key}{separator}{name}" if parent_key else name

        if isinstance(node, dict):
            flat.update(json_to_flat_dict(node, path, separator))
            continue

        if not isinstance(node, list):
            flat[path] = node
            continue

        for index, element in enumerate(node):
            slot = f"{path}[{index}]"
            if isinstance(element, dict):
                flat.update(json_to_flat_dict(element, slot, separator))
            else:
                flat[slot] = element
    return flat