"""子智能体测试服务。 该服务与主智能体完全隔离,监听 8092 端口,模拟 20 秒后完成任务的过程。 后续可替换为真实的多轮子智能体推理逻辑。 """ import json import os import time from pathlib import Path from flask import Flask, jsonify, request BASE_DIR = Path(__file__).resolve().parent TASKS_ROOT = BASE_DIR / "tasks" STATE_FILE = TASKS_ROOT / "tasks_state.json" STUB_DELAY_SECONDS = int(os.environ.get("SUB_AGENT_STUB_DELAY", "20")) TASKS_ROOT.mkdir(parents=True, exist_ok=True) app = Flask(__name__) TASK_CACHE = {} def load_state(): if STATE_FILE.exists(): try: data = json.loads(STATE_FILE.read_text(encoding="utf-8")) TASK_CACHE.update(data) except json.JSONDecodeError: STATE_FILE.unlink(missing_ok=True) def save_state(): STATE_FILE.write_text(json.dumps(TASK_CACHE, ensure_ascii=False, indent=2), encoding="utf-8") def _touch_result_files(deliverables_dir: Path, summary: str, task_description: str) -> str: deliverables_dir.mkdir(parents=True, exist_ok=True) result_md = deliverables_dir / "result.md" if not result_md.exists(): result_md.write_text( f"# 子智能体测试结果\n\n" f"- 摘要:{summary}\n" f"- 任务:{task_description}\n" f"- 说明:当前为测试服务,已自动生成占位结果。\n", encoding="utf-8" ) payload = { "summary": summary, "task": task_description, "note": "stub result generated by sub_agent server", "timestamp": time.time(), } result_json = deliverables_dir / "result.json" result_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") return str(result_json) def _finalize_if_ready(task_id: str) -> dict: record = TASK_CACHE.get(task_id) if not record: return {} if record.get("status") == "completed": return record if time.time() >= record.get("ready_at", 0): deliverables_dir = Path(record["deliverables_dir"]) result_file = _touch_result_files(deliverables_dir, record["summary"], record["task_description"]) record.update( { "success": True, "status": "completed", "message": "测试子智能体已生成占位结果。", "result_file": result_file, "updated_at": time.time(), } ) TASK_CACHE[task_id] = record save_state() return record @app.post("/tasks") def create_task(): data = request.get_json(silent=True) or {} task_id = data.get("task_id") or f"sub_stub_{int(time.time())}" summary = data.get("summary", "未提供摘要") description = data.get("task", "未提供任务说明") deliverables_dir = Path(data.get("deliverables_dir") or (TASKS_ROOT / task_id / "deliverables")) deliverables_dir.mkdir(parents=True, exist_ok=True) record = { "success": True, "task_id": task_id, "status": "running", "message": f"子智能体任务已提交,预计 {STUB_DELAY_SECONDS} 秒后完成。", "deliverables_dir": str(deliverables_dir), "result_file": None, "updated_at": time.time(), "ready_at": time.time() + STUB_DELAY_SECONDS, "summary": summary, "task_description": description, } TASK_CACHE[task_id] = record save_state() return jsonify(record) @app.get("/tasks/") def get_task(task_id: str): record = _finalize_if_ready(task_id) if not record: return jsonify({"success": False, "status": "unknown", "message": "task not found"}), 404 return jsonify(record) def main(): load_state() app.run(host="0.0.0.0", port=8092) if __name__ == "__main__": main()