"""状态管理器""" import logging from typing import Optional from dataclasses import dataclass, field logger = logging.getLogger(__name__) @dataclass class BotState: """机器人状态""" # 当前对话 ID conversation_id: Optional[str] = None # 当前任务 ID task_id: Optional[str] = None # 任务状态:idle, running status: str = "idle" # 轮询偏移量 last_offset: int = 0 def is_idle(self) -> bool: """是否空闲""" return self.status == "idle" def is_running(self) -> bool: """是否运行中""" return self.status == "running" def start_task(self, task_id: str): """开始任务""" self.task_id = task_id self.status = "running" self.last_offset = 0 logger.info(f"任务开始: {task_id}") def finish_task(self): """完成任务""" logger.info(f"任务完成: {self.task_id}") self.task_id = None self.status = "idle" self.last_offset = 0 def set_conversation(self, conversation_id: str): """设置当前对话""" self.conversation_id = conversation_id logger.info(f"切换到对话: {conversation_id}") class StateManager: """状态管理器(单群模式)""" def __init__(self): self.state = BotState() def get_state(self) -> BotState: """获取当前状态""" return self.state def reset(self): """重置状态""" self.state = BotState() logger.info("状态已重置")