""" JSON解析和修复工具 """ import json import re import logging from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) def parse_json_safely(text: str) -> Dict[str, Any]: """安全解析JSON,带错误修复""" # 首先尝试直接解析 try: return json.loads(text) except json.JSONDecodeError: pass # 尝试修复常见问题 fixed_text = fix_json_common_issues(text) try: return json.loads(fixed_text) except json.JSONDecodeError as e: logger.error(f"JSON解析失败: {e}") logger.debug(f"原始文本: {text[:500]}...") # 尝试更激进的修复 fixed_text = fix_json_aggressive(fixed_text) try: return json.loads(fixed_text) except json.JSONDecodeError: # 最后的尝试:提取JSON部分 json_part = extract_json_from_text(text) if json_part: try: return json.loads(json_part) except: pass # 返回空字典而不是抛出异常 logger.error("无法解析JSON,返回空字典") return {} def fix_json_common_issues(text: str) -> str: """修复常见的JSON问题""" # 移除可能的Markdown代码块标记 text = re.sub(r'^```json\s*', '', text, flags=re.MULTILINE) text = re.sub(r'^```\s*$', '', text, flags=re.MULTILINE) # 移除BOM text = text.lstrip('\ufeff') # 移除控制字符 text = re.sub(r'[\x00-\x1F\x7F]', '', text) # 修复尾随逗号 text = re.sub(r',\s*}', '}', text) text = re.sub(r',\s*]', ']', text) # 修复单引号(JSON只接受双引号) # 但要小心不要替换值中的单引号 text = fix_single_quotes(text) # 修复未加引号的键 text = fix_unquoted_keys(text) # 修复Python的True/False/None text = text.replace('True', 'true') text = text.replace('False', 'false') text = text.replace('None', 'null') # 移除注释 text = remove_json_comments(text) return text.strip() def fix_json_aggressive(text: str) -> str: """更激进的JSON修复""" # 尝试修复断行的字符串 text = re.sub(r'"\s*\n\s*"', '" "', text) # 修复缺失的逗号 # 在 } 或 ] 后面跟着 " 或 { 或 [ 的地方添加逗号 text = re.sub(r'}\s*"', '},\n"', text) text = re.sub(r']\s*"', '],\n"', text) text = re.sub(r'}\s*{', '},\n{', text) text = re.sub(r']\s*\[', '],\n[', text) # 修复缺失的冒号 text = re.sub(r'"([^"]+)"\s*"', r'"\1": "', text) # 确保所有字符串值都被引号包围 # 这个比较复杂,需要小心处理 return text def fix_single_quotes(text: str) -> str: """修复单引号为双引号""" # 使用更智能的方法替换单引号 # 只替换作为字符串边界的单引号 result = [] in_string = False string_char = None i = 0 while i < len(text): char = text[i] if not in_string: if char == "'" and (i == 0 or text[i-1] in ' \n\t:,{['): # 可能是字符串开始 result.append('"') in_string = True string_char = "'" else: result.append(char) else: if char == string_char and (i + 1 >= len(text) or text[i+1] in ' \n\t,}]:'): # 字符串结束 result.append('"') in_string = False string_char = None elif char == '\\' and i + 1 < len(text): # 转义字符 result.append(char) result.append(text[i + 1]) i += 1 else: result.append(char) i += 1 return ''.join(result) def fix_unquoted_keys(text: str) -> str: """修复未加引号的键""" # 匹配形如 key: value 的模式 pattern = r'([,\{\s])([a-zA-Z_][a-zA-Z0-9_]*)\s*:' replacement = r'\1"\2":' return re.sub(pattern, replacement, text) def remove_json_comments(text: str) -> str: """移除JSON中的注释""" # 移除单行注释 // text = re.sub(r'//.*$', '', text, flags=re.MULTILINE) # 移除多行注释 /* */ text = re.sub(r'/\*.*?\*/', '', text, flags=re.DOTALL) return text def extract_json_from_text(text: str) -> Optional[str]: """从文本中提取JSON部分""" # 查找第一个 { 或 [ start_idx = -1 start_char = None for i, char in enumerate(text): if char in '{[': start_idx = i start_char = char break if start_idx == -1: return None # 查找匹配的结束字符 end_char = '}' if start_char == '{' else ']' bracket_count = 0 in_string = False escape = False for i in range(start_idx, len(text)): char = text[i] if escape: escape = False continue if char == '\\': escape = True continue if char == '"' and not escape: in_string = not in_string continue if not in_string: if char == start_char: bracket_count += 1 elif char == end_char: bracket_count -= 1 if bracket_count == 0: return text[start_idx:i+1] return None def validate_json_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> List[str]: """验证JSON是否符合schema""" errors = [] # 简单的schema验证实现 required_fields = schema.get('required', []) properties = schema.get('properties', {}) # 检查必需字段 for field in required_fields: if field not in data: errors.append(f"缺少必需字段: {field}") # 检查字段类型 for field, value in data.items(): if field in properties: expected_type = properties[field].get('type') if expected_type: actual_type = type(value).__name__ type_mapping = { 'string': 'str', 'number': 'float', 'integer': 'int', 'boolean': 'bool', 'array': 'list', 'object': 'dict' } expected_python_type = type_mapping.get(expected_type, expected_type) if actual_type != expected_python_type: # 特殊处理:int可以作为float if not (expected_python_type == 'float' and actual_type == 'int'): errors.append( f"字段 '{field}' 类型错误: " f"期望 {expected_type}, 实际 {actual_type}" ) return errors def merge_json_objects(obj1: Dict[str, Any], obj2: Dict[str, Any], deep: bool = True) -> Dict[str, Any]: """合并两个JSON对象""" result = obj1.copy() for key, value in obj2.items(): if key in result and deep and isinstance(result[key], dict) and isinstance(value, dict): # 深度合并 result[key] = merge_json_objects(result[key], value, deep=True) elif key in result and deep and isinstance(result[key], list) and isinstance(value, list): # 合并列表(去重) result[key] = list(set(result[key] + value)) else: # 直接覆盖 result[key] = value return result def json_to_flat_dict(data: Dict[str, Any], parent_key: str = '', separator: str = '.') -> Dict[str, Any]: """将嵌套的JSON转换为扁平的字典""" items = [] for key, value in data.items(): new_key = f"{parent_key}{separator}{key}" if parent_key else key if isinstance(value, dict): items.extend( json_to_flat_dict(value, new_key, separator).items() ) elif isinstance(value, list): for i, item in enumerate(value): if isinstance(item, dict): items.extend( json_to_flat_dict(item, f"{new_key}[{i}]", separator).items() ) else: items.append((f"{new_key}[{i}]", item)) else: items.append((new_key, value)) return dict(items)