# File location: app/routes/api.py
# File name: api.py
"""
API routes.

Handles research-related API requests.
"""
import os

from flask import Blueprint, request, jsonify, current_app, send_file

from app.services.research_manager import ResearchManager
from app.services.task_manager import task_manager
from app.utils.validators import validate_question, validate_outline_feedback

api_bp = Blueprint('api', __name__)
research_manager = ResearchManager()


@api_bp.route('/research', methods=['POST'])
def create_research():
    """Create a research session and, unless disabled, start it immediately.

    Expects a JSON object with:
        question:   the research question (validated with ``validate_question``).
        auto_start: optional; defaults to True. When truthy the research is
                    started right after the session is created.

    Returns:
        200 with ``session_id``, ``status`` ("started" or "created"),
        ``message`` and ``created_at``;
        400 when the body is not a JSON object or the question is rejected;
        500 on any unexpected failure.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid
        # JSON body, so a bad request is reported as 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        # Validate input. A non-string question is treated like a missing one
        # so that the validator, not an AttributeError, decides the outcome.
        raw_question = data.get('question', '')
        question = raw_question.strip() if isinstance(raw_question, str) else ''
        error = validate_question(question)
        if error:
            return jsonify({"error": error}), 400

        # Create the research session.
        session = research_manager.create_session(question)

        # Automatically start the research (optional).
        if data.get('auto_start', True):
            research_manager.start_research(session.id)
            status, message = "started", "研究已开始"
        else:
            status, message = "created", "研究会话已创建,等待开始"

        return jsonify({
            "session_id": session.id,
            "status": status,
            "message": message,
            "created_at": session.created_at.isoformat()
        })

    except Exception as e:
        current_app.logger.error(f"创建研究失败: {e}")
        return jsonify({"error": str(e)}), 500


# The URL variable must be named ``session_id`` so that Flask passes it to
# the view function as a keyword argument.
@api_bp.route('/research/<session_id>/start', methods=['POST'])
def start_research(session_id):
    """Manually start the research for an existing session.

    Returns:
        200 with whatever ``research_manager.start_research`` returns;
        404 when it raises ``ValueError``;
        500 on any other failure.
    """
    try:
        result = research_manager.start_research(session_id)
        return jsonify(result)
    except ValueError as e:
        return jsonify({"error": str(e)}), 404
    except Exception as e:
        current_app.logger.error(f"开始研究失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/research/<session_id>/status', methods=['GET'])
def get_research_status(session_id):
    """Return the session status together with the session's tasks.

    Returns:
        200 with the status mapping plus a ``tasks`` entry;
        404 when the status mapping contains an ``error`` key;
        500 on any unexpected failure.
    """
    try:
        status = research_manager.get_session_status(session_id)
        if "error" in status:
            return jsonify(status), 404

        # Attach task information.
        status['tasks'] = task_manager.get_session_tasks(session_id)
        return jsonify(status)

    except Exception as e:
        current_app.logger.error(f"获取状态失败: {e}")
        return jsonify({"error": str(e)}), 500
# NOTE: in every '/research/<session_id>/...' rule below the URL variable
# must be named after the view's parameter so Flask can pass it through.
@api_bp.route('/research/<session_id>/outline', methods=['GET'])
def get_research_outline(session_id):
    """Return the research outline of a session.

    Returns:
        200 with ``main_topic``, ``research_questions``, ``sub_topics`` and
        ``version``;
        404 when the session does not exist;
        400 when the session has no outline yet;
        500 on any unexpected failure.
    """
    try:
        session = research_manager.get_session(session_id)
        if not session:
            return jsonify({"error": "Session not found"}), 404

        outline = session.outline
        if not outline:
            return jsonify({"error": "Outline not yet created"}), 400

        sub_topics = [
            {
                "id": st.id,
                "topic": st.topic,
                "explain": st.explain,
                "priority": st.priority,
                "status": st.status
            }
            for st in outline.sub_topics
        ]
        return jsonify({
            "main_topic": outline.main_topic,
            "research_questions": outline.research_questions,
            "sub_topics": sub_topics,
            "version": outline.version
        })

    except Exception as e:
        current_app.logger.error(f"获取大纲失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/research/<session_id>/outline', methods=['PUT'])
def update_research_outline(session_id):
    """Accept user feedback on the research outline.

    Currently the feedback is only validated and acknowledged; the outline
    itself is not modified (see the TODO below).

    Returns:
        200 with an acknowledgement;
        400 when the body is not a JSON object or the feedback is rejected by
        ``validate_outline_feedback``;
        500 on any unexpected failure.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid
        # JSON body, so a bad request is reported as 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        raw_feedback = data.get('feedback', '')
        feedback = raw_feedback.strip() if isinstance(raw_feedback, str) else ''

        error = validate_outline_feedback(feedback)
        if error:
            return jsonify({"error": error}), 400

        # TODO: implement the outline update logic.
        # This requires calling the AI service to revise the outline.

        return jsonify({
            "message": "大纲更新请求已接收",
            "status": "processing"
        })

    except Exception as e:
        current_app.logger.error(f"更新大纲失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/research/<session_id>/cancel', methods=['POST'])
def cancel_research(session_id):
    """Cancel a session's tasks and mark the research as cancelled.

    Returns:
        200 with the manager's result plus ``cancelled_tasks``;
        404 when the manager's result contains an ``error`` key;
        500 on any unexpected failure.
    """
    try:
        # Cancel the tasks first ...
        cancelled_count = task_manager.cancel_session_tasks(session_id)

        # ... then update the session state.
        result = research_manager.cancel_research(session_id)
        if "error" in result:
            return jsonify(result), 404

        result['cancelled_tasks'] = cancelled_count
        return jsonify(result)

    except Exception as e:
        current_app.logger.error(f"取消研究失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/research/<session_id>/report', methods=['GET'])
def get_research_report(session_id):
    """Return the research report as JSON or as a Markdown download.

    Query parameters:
        format: ``markdown`` sends the report as a ``.md`` attachment; any
                other value (default ``json``) returns it inside a JSON body.

    Returns:
        200 with the report; 404 when no report is available;
        500 on any unexpected failure.
    """
    try:
        # Named report_format to avoid shadowing the builtin ``format``.
        report_format = request.args.get('format', 'json')

        report_content = research_manager.get_research_report(session_id)
        if not report_content:
            return jsonify({"error": "Report not available"}), 404

        if report_format != 'markdown':
            # JSON response; the report text itself is Markdown.
            return jsonify({
                "session_id": session_id,
                "report": report_content,
                "format": "markdown"
            })

        # Markdown file download. The path is made absolute so that the
        # existence check, the write and send_file all refer to the same
        # file even when REPORTS_DIR is configured as a relative path.
        reports_dir = os.path.abspath(current_app.config['REPORTS_DIR'])
        report_path = os.path.join(reports_dir, f"{session_id}.md")

        if not os.path.exists(report_path):
            # Create the file on demand; make sure the directory exists.
            os.makedirs(reports_dir, exist_ok=True)
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write(report_content)

        return send_file(
            report_path,
            mimetype='text/markdown',
            as_attachment=True,
            download_name=f"research_report_{session_id}.md"
        )

    except Exception as e:
        current_app.logger.error(f"获取报告失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/research/<session_id>/subtopic/<subtopic_id>', methods=['GET'])
def get_subtopic_detail(session_id, subtopic_id):
    """Return the details of one sub-topic of a session's outline.

    Returns:
        200 with the sub-topic fields, a ``progress`` percentage and
        ``has_report``;
        404 when the session or the sub-topic does not exist;
        400 when the session has no outline;
        500 on any unexpected failure.
    """
    try:
        session = research_manager.get_session(session_id)
        if not session:
            return jsonify({"error": "Session not found"}), 404

        if not session.outline:
            return jsonify({"error": "Outline not created"}), 400

        # Find the matching sub-topic. The URL value is always a string, so
        # compare string forms to also match ids that are stored as ints.
        wanted_id = str(subtopic_id)
        subtopic = next(
            (st for st in session.outline.sub_topics if str(st.id) == wanted_id),
            None
        )
        if subtopic is None:
            return jsonify({"error": "Subtopic not found"}), 404

        # Guard against a zero/unset search budget instead of failing with
        # a ZeroDivisionError.
        max_searches = subtopic.max_searches
        if max_searches:
            progress = subtopic.get_total_searches() / max_searches * 100
        else:
            progress = 0.0

        return jsonify({
            "id": subtopic.id,
            "topic": subtopic.topic,
            "explain": subtopic.explain,
            "priority": subtopic.priority,
            "status": subtopic.status,
            "search_count": subtopic.search_count,
            "max_searches": max_searches,
            "progress": progress,
            "has_report": subtopic.report is not None
        })

    except Exception as e:
        current_app.logger.error(f"获取子主题详情失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/research/sessions', methods=['GET'])
def list_research_sessions():
    """List research sessions with ``limit``/``offset`` paging.

    Query parameters:
        limit:  page size, default 20.
        offset: start position, default 0.

    Returns:
        200 with ``sessions``, ``total``, ``limit`` and ``offset``;
        500 on any unexpected failure.
    """
    try:
        limit = request.args.get('limit', 20, type=int)
        offset = request.args.get('offset', 0, type=int)

        sessions = research_manager.list_sessions(limit=limit, offset=offset)

        # NOTE(review): "total" is the number of sessions in this response,
        # not necessarily the number of sessions overall.
        return jsonify({
            "sessions": sessions,
            "total": len(sessions),
            "limit": limit,
            "offset": offset
        })

    except Exception as e:
        current_app.logger.error(f"列出会话失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/tasks/status', methods=['GET'])
def get_tasks_status():
    """Return task counts per status for the task manager.

    Returns:
        200 with ``total_tasks``, ``status_counts`` and ``sessions_count``;
        500 on any unexpected failure.
    """
    try:
        tasks = task_manager.tasks

        # Known statuses are always reported, even when their count is zero.
        status_counts = {
            'pending': 0,
            'running': 0,
            'completed': 0,
            'failed': 0,
            'cancelled': 0
        }
        for task in tasks.values():
            # .get keeps an unexpected status value from raising KeyError.
            key = task.status.value
            status_counts[key] = status_counts.get(key, 0) + 1

        return jsonify({
            "total_tasks": len(tasks),
            "status_counts": status_counts,
            "sessions_count": len(task_manager.session_tasks)
        })

    except Exception as e:
        current_app.logger.error(f"获取任务状态失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/test/connections', methods=['GET'])
def test_connections():
    """Probe the external services and the task manager (debug mode only).

    Each probe is independent: a failing probe is logged and leaves its
    entry False without aborting the others.

    Returns:
        200 with per-service results and ``all_connected``;
        403 when the app is not running in debug mode.
    """
    if not current_app.debug:
        return jsonify({"error": "Not available in production"}), 403

    # Imported here so the services are only loaded when the endpoint is used.
    from app.services.search_service import SearchService
    from app.services.ai_service import AIService

    results = {
        "deepseek_api": False,
        "tavily_api": False,
        "task_manager": False
    }

    try:
        # Test the DeepSeek API.
        ai_service = AIService()
        test_result = ai_service.analyze_question_type("test question")
        results["deepseek_api"] = bool(test_result)
    except Exception as e:
        current_app.logger.error(f"DeepSeek API测试失败: {e}")

    try:
        # Test the Tavily API.
        search_service = SearchService()
        results["tavily_api"] = search_service.test_connection()
    except Exception as e:
        current_app.logger.error(f"Tavily API测试失败: {e}")

    # Test the task manager.
    try:
        # Submit a trivial test task.
        def test_task():
            return "test"

        task_id = task_manager.submit_task(test_task)
        status = task_manager.get_task_status(task_id)
        results["task_manager"] = status is not None
    except Exception as e:
        current_app.logger.error(f"任务管理器测试失败: {e}")

    return jsonify({
        "connections": results,
        "all_connected": all(results.values())
    })


# Appended to the end of app/routes/api.py

@api_bp.route('/research/<session_id>/debug', methods=['GET'])
def get_debug_logs(session_id):
    """Return the debug logs of a research session.

    Query parameters:
        type:  log type, default ``all`` (all, api_calls, thinking, errors).
        limit: return only the newest N entries, default 100; a value of 0
               or less disables the limit.

    Returns:
        200 with ``session_id``, ``log_type``, ``count`` and ``logs``;
        404 when the session does not exist;
        500 on any unexpected failure.
    """
    try:
        from app.utils.debug_logger import ai_debug_logger

        log_type = request.args.get('type', 'all')
        limit = request.args.get('limit', 100, type=int)

        # Make sure the session exists.
        session = research_manager.get_session(session_id)
        if not session:
            return jsonify({"error": "Session not found"}), 404

        # Fetch the logs.
        logs = ai_debug_logger.get_session_logs(session_id, log_type)

        # Limit the amount returned: keep the newest N entries.
        if limit > 0:
            logs = logs[-limit:]

        return jsonify({
            "session_id": session_id,
            "log_type": log_type,
            "count": len(logs),
            "logs": logs
        })

    except Exception as e:
        current_app.logger.error(f"获取调试日志失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/research/<session_id>/debug/download', methods=['GET'])
def download_debug_logs(session_id):
    """Send the session's debug files as an in-memory ZIP archive.

    Only regular files directly inside the session's debug directory are
    added. If the directory does not exist the archive is empty.

    Returns:
        200 with the ZIP attachment;
        404 when the session does not exist;
        500 on any unexpected failure.
    """
    try:
        from app.utils.debug_logger import ai_debug_logger
        import zipfile
        import io

        # Make sure the session exists.
        session = research_manager.get_session(session_id)
        if not session:
            return jsonify({"error": "Session not found"}), 404

        # Build the ZIP file in memory.
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            # Add all debug files.
            debug_dir = os.path.join(ai_debug_logger.debug_dir, session_id)
            if os.path.exists(debug_dir):
                for filename in os.listdir(debug_dir):
                    file_path = os.path.join(debug_dir, filename)
                    if os.path.isfile(file_path):
                        zip_file.write(file_path, filename)

        # Rewind so send_file streams the archive from its beginning.
        zip_buffer.seek(0)

        return send_file(
            zip_buffer,
            mimetype='application/zip',
            as_attachment=True,
            download_name=f"debug_logs_{session_id}.zip"
        )

    except Exception as e:
        current_app.logger.error(f"下载调试日志失败: {e}")
        return jsonify({"error": str(e)}), 500


@api_bp.route('/debug/enable', methods=['POST'])
def enable_debug_mode():
    """Enable debug logging for a session.

    Expects a JSON object with ``session_id``.

    Returns:
        200 with a confirmation;
        400 when the body is not a JSON object or ``session_id`` is missing;
        500 on any unexpected failure.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid
        # JSON body, so a bad request is reported as 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        session_id = data.get('session_id')
        if not session_id:
            return jsonify({"error": "session_id required"}), 400

        # Set the debug session.
        from app.utils.debug_logger import ai_debug_logger
        ai_debug_logger.set_session(session_id)

        # If the app exposes a socketio instance, hand it to the logger.
        if hasattr(current_app, 'socketio'):
            ai_debug_logger.set_socketio(current_app.socketio)

        return jsonify({
            "status": "enabled",
            "session_id": session_id,
            "message": "调试模式已启用"
        })

    except Exception as e:
        current_app.logger.error(f"启用调试模式失败: {e}")
        return jsonify({"error": str(e)}), 500