From 868640b479b06d680981c043b8e7342a44b81b50 Mon Sep 17 00:00:00 2001 From: JOJO <1498581755@qq.com> Date: Fri, 6 Mar 2026 12:31:20 +0800 Subject: [PATCH] feat: add aliyun quota fallback --- config/model_profiles.py | 70 +- scripts/mock_aliyun_quota_server.py | 40 ++ server/chat_flow.py | 827 ++++++++++++---------- static/src/composables/useLegacySocket.ts | 38 +- utils/aliyun_fallback.py | 103 +++ utils/api_client.py | 155 +++- 6 files changed, 864 insertions(+), 369 deletions(-) create mode 100644 scripts/mock_aliyun_quota_server.py create mode 100644 utils/aliyun_fallback.py diff --git a/config/model_profiles.py b/config/model_profiles.py index 109cb15..b894efa 100644 --- a/config/model_profiles.py +++ b/config/model_profiles.py @@ -1,9 +1,32 @@ import os +from pathlib import Path from typing import Optional def _env(name: str, default: str = "") -> str: return os.environ.get(name, default) +def _env_optional(name: str) -> Optional[str]: + value = os.environ.get(name) + if value is None: + # 回退读取 .env(支持运行中更新) + env_path = Path(__file__).resolve().parents[1] / ".env" + if env_path.exists(): + try: + for raw_line in env_path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, val = line.split("=", 1) + if key.strip() == name: + value = val.strip().strip('"').strip("'") + break + except Exception: + value = None + if value is None: + return None + value = value.strip() + return value or None + # 模型上下文窗口(单位: token) CONTEXT_WINDOWS = { @@ -19,6 +42,8 @@ CONTEXT_WINDOWS = { # 默认(Kimi) KIMI_BASE = _env("API_BASE_KIMI", _env("AGENT_API_BASE_URL", "https://api.moonshot.cn/v1")) KIMI_KEY = _env("API_KEY_KIMI", _env("AGENT_API_KEY", "")) +KIMI_BASE_OFFICIAL = _env_optional("API_BASE_KIMI_OFFICIAL") +KIMI_KEY_OFFICIAL = _env_optional("API_KEY_KIMI_OFFICIAL") KIMI_FAST_MODEL = _env("MODEL_KIMI_FAST", _env("AGENT_MODEL_ID", "kimi-k2-0905-preview")) KIMI_THINK_MODEL = _env("MODEL_KIMI_THINK", _env("AGENT_THINKING_MODEL_ID", "kimi-k2-thinking")) KIMI_25_MODEL = _env("MODEL_KIMI_25", "kimi-k2.5") @@ -32,12 +57,16 @@ DEEPSEEK_THINK_MODEL = _env("MODEL_DEEPSEEK_THINK", "deepseek-reasoner") # Qwen QWEN_BASE = _env("API_BASE_QWEN", "https://dashscope.aliyuncs.com/compatible-mode/v1") QWEN_KEY = _env("API_KEY_QWEN", _env("DASHSCOPE_API_KEY", "")) +QWEN_BASE_OFFICIAL = _env_optional("API_BASE_QWEN_OFFICIAL") +QWEN_KEY_OFFICIAL = _env_optional("API_KEY_QWEN_OFFICIAL") QWEN_MAX_MODEL = _env("MODEL_QWEN_MAX", "qwen3-max") QWEN_VL_MODEL = _env("MODEL_QWEN_VL", "qwen3.5-plus") # MiniMax MINIMAX_BASE = _env("API_BASE_MINIMAX", "https://api.minimaxi.com/v1") MINIMAX_KEY = _env("API_KEY_MINIMAX", "") +MINIMAX_BASE_OFFICIAL = _env_optional("API_BASE_MINIMAX_OFFICIAL") +MINIMAX_KEY_OFFICIAL = _env_optional("API_KEY_MINIMAX_OFFICIAL") MINIMAX_MODEL = _env("MODEL_MINIMAX", "MiniMax-M2.5") @@ -78,7 +107,7 @@ MODEL_PROFILES = { "model_id": KIMI_25_MODEL, "max_tokens": None, "context_window": CONTEXT_WINDOWS["kimi-k2.5"], - "extra_params": {"thinking": {"type": "enabled"}} + "extra_params": {"thinking": {"type": "enabled"}, "enable_thinking": True} }, "supports_thinking": True, "fast_only": False, @@ -204,6 +233,45 @@ def get_model_profile(key: str) -> dict: if key not in MODEL_PROFILES: raise ValueError(f"未知模型 key: {key}") profile = MODEL_PROFILES[key] + try: + from utils.aliyun_fallback import is_fallback_active + except Exception: + is_fallback_active = None + + if is_fallback_active and is_fallback_active(key): + if key == "kimi-k2.5": + kimi_base_official = _env_optional("API_BASE_KIMI_OFFICIAL") or KIMI_BASE_OFFICIAL + kimi_key_official = _env_optional("API_KEY_KIMI_OFFICIAL") or KIMI_KEY_OFFICIAL + if kimi_base_official and kimi_key_official: + profile = dict(profile) + fast = dict(profile.get("fast") or {}) + thinking = dict(profile.get("thinking") or fast) + fast.update({"base_url": kimi_base_official, "api_key": kimi_key_official}) + thinking.update({"base_url": kimi_base_official, "api_key": kimi_key_official}) + profile["fast"] = fast + profile["thinking"] = thinking + elif key == "qwen3-vl-plus": + qwen_base_official = _env_optional("API_BASE_QWEN_OFFICIAL") or QWEN_BASE_OFFICIAL + qwen_key_official = _env_optional("API_KEY_QWEN_OFFICIAL") or QWEN_KEY_OFFICIAL + if qwen_base_official and qwen_key_official: + profile = dict(profile) + fast = dict(profile.get("fast") or {}) + thinking = dict(profile.get("thinking") or fast) + fast.update({"base_url": qwen_base_official, "api_key": qwen_key_official}) + thinking.update({"base_url": qwen_base_official, "api_key": qwen_key_official}) + profile["fast"] = fast + profile["thinking"] = thinking + elif key == "minimax-m2.5": + minimax_base_official = _env_optional("API_BASE_MINIMAX_OFFICIAL") or MINIMAX_BASE_OFFICIAL + minimax_key_official = _env_optional("API_KEY_MINIMAX_OFFICIAL") or MINIMAX_KEY_OFFICIAL + if minimax_base_official and minimax_key_official: + profile = dict(profile) + fast = dict(profile.get("fast") or {}) + thinking = dict(profile.get("thinking") or fast) + fast.update({"base_url": minimax_base_official, "api_key": minimax_key_official}) + thinking.update({"base_url": minimax_base_official, "api_key": minimax_key_official}) + profile["fast"] = fast + profile["thinking"] = thinking # 基础校验:必须有 fast 段且有 key fast = profile.get("fast") or {} if not fast.get("api_key"): diff --git a/scripts/mock_aliyun_quota_server.py b/scripts/mock_aliyun_quota_server.py new file mode 100644 index 0000000..e0ba4f4 --- /dev/null +++ b/scripts/mock_aliyun_quota_server.py @@ -0,0 +1,40 @@ +from http.server import BaseHTTPRequestHandler, HTTPServer +import json + +HOST = "0.0.0.0" +PORT = 8899 + +ERROR_MESSAGE = "hour allocated quota exceeded" + +class Handler(BaseHTTPRequestHandler): + def _send(self, code: int, payload: dict): + body = json.dumps(payload).encode("utf-8") + self.send_response(code) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_POST(self): + # Consume request body to avoid broken pipe on clients + try: + length = int(self.headers.get("Content-Length", "0")) + except ValueError: + length = 0 + if length: + _ = self.rfile.read(length) + payload = { + "error": { + "message": ERROR_MESSAGE, + "type": "quota_exceeded" + } + } + self._send(429, payload) + + def log_message(self, format, *args): + return + +if __name__ == "__main__": + server = HTTPServer((HOST, PORT), Handler) + print(f"mock aliyun quota server running on http://{HOST}:{PORT}") + server.serve_forever() diff --git a/server/chat_flow.py b/server/chat_flow.py index 2c5e4c1..0b26fb3 100644 --- a/server/chat_flow.py +++ b/server/chat_flow.py @@ -505,6 +505,11 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac context = web_terminal.build_context() messages = web_terminal.build_messages(context, message) tools = web_terminal.define_tools() + try: + profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5") + web_terminal.apply_model_profile(profile) + except Exception as exc: + debug_log(f"更新模型配置失败: {exc}") # === 上下文预算与安全校验(避免超出模型上下文) === max_context_tokens = get_model_context_window(getattr(web_terminal, "model_key", None) or "kimi-k2.5") @@ -559,6 +564,8 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac # 设置最大迭代次数(API 可覆盖) max_iterations = getattr(web_terminal, "max_iterations_override", None) or MAX_ITERATIONS_PER_TASK + max_api_retries = 4 + retry_delay_seconds = 10 pending_append = None # {"path": str, "tool_call_id": str, "buffer": str, ...} append_probe_buffer = "" @@ -1198,6 +1205,25 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac 'inline': inline }) maybe_mark_failure_from_message(web_terminal, message) + + async def _wait_retry_delay(delay_seconds: int) -> bool: + """等待重试间隔,同时检查是否收到停止请求。""" + if delay_seconds <= 0: + return False + deadline = time.time() + delay_seconds + while time.time() < deadline: + client_stop_info = get_stop_flag(client_sid, username) + if client_stop_info: + stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info + if stop_requested: + sender('task_stopped', { + 'message': '命令执行被用户取消', + 'reason': 'user_stop' + }) + clear_stop_flag(client_sid, username) + return True + await asyncio.sleep(0.2) + return False for iteration in range(max_iterations): total_iterations += 1 @@ -1293,308 +1319,393 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac print(f"[API] 第{iteration + 1}次调用 (总工具调用: {total_tool_calls}/{MAX_TOTAL_TOOL_CALLS})") - # 收集流式响应 - async for chunk in web_terminal.api_client.chat(messages, tools, stream=True): - chunk_count += 1 - - # 检查停止标志 - client_stop_info = get_stop_flag(client_sid, username) - if client_stop_info: - stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info - if stop_requested: - debug_log(f"检测到停止请求,中断流处理") - if pending_append: - append_result = await finalize_pending_append(full_response, False, finish_reason="user_stop") - if pending_modify: - modify_result = await finalize_pending_modify(full_response, False, finish_reason="user_stop") - _cancel_pending_tools(tool_calls) - sender('task_stopped', { - 'message': '命令执行被用户取消', - 'reason': 'user_stop' - }) - clear_stop_flag(client_sid, username) - return - - # 先尝试记录 usage(有些平台会在最后一个 chunk 里携带 usage 但 choices 为空) - usage_info = chunk.get("usage") - if usage_info: - last_usage_payload = usage_info + api_error = None + for api_attempt in range(max_api_retries + 1): + api_error = None + if api_attempt > 0: + full_response = "" + tool_calls = [] + current_thinking = "" + detected_tools = {} + last_usage_payload = None + in_thinking = False + thinking_started = False + thinking_ended = False + text_started = False + text_has_content = False + text_streaming = False + text_chunk_index = 0 + last_text_chunk_time = None + chunk_count = 0 + reasoning_chunks = 0 + content_chunks = 0 + tool_chunks = 0 + append_break_triggered = False + append_result = {"handled": False} + modify_break_triggered = False + modify_result = {"handled": False} + last_finish_reason = None - if "choices" not in chunk: - debug_log(f"Chunk {chunk_count}: 无choices字段") - continue - if not chunk.get("choices"): - debug_log(f"Chunk {chunk_count}: choices为空列表") - continue - choice = chunk["choices"][0] - if not usage_info and isinstance(choice, dict) and choice.get("usage"): - # 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回) - last_usage_payload = choice.get("usage") - delta = choice.get("delta", {}) - finish_reason = choice.get("finish_reason") - if finish_reason: - last_finish_reason = finish_reason + # 收集流式响应 + async for chunk in web_terminal.api_client.chat(messages, tools, stream=True): + chunk_count += 1 - # 处理思考内容(兼容 reasoning_content / reasoning_details) - reasoning_content = "" - if "reasoning_content" in delta: - reasoning_content = delta.get("reasoning_content") or "" - elif "reasoning_details" in delta: - details = delta.get("reasoning_details") - if isinstance(details, list): - parts = [] - for item in details: - if isinstance(item, dict): - text = item.get("text") - if text: - parts.append(text) - if parts: - reasoning_content = "".join(parts) - if reasoning_content: - reasoning_chunks += 1 - debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符") - - if not thinking_started: - in_thinking = True - thinking_started = True - sender('thinking_start', {}) - await asyncio.sleep(0.05) - - current_thinking += reasoning_content - sender('thinking_chunk', {'content': reasoning_content}) + # 检查停止标志 + client_stop_info = get_stop_flag(client_sid, username) + if client_stop_info: + stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info + if stop_requested: + debug_log(f"检测到停止请求,中断流处理") + if pending_append: + append_result = await finalize_pending_append(full_response, False, finish_reason="user_stop") + if pending_modify: + modify_result = await finalize_pending_modify(full_response, False, finish_reason="user_stop") + _cancel_pending_tools(tool_calls) + sender('task_stopped', { + 'message': '命令执行被用户取消', + 'reason': 'user_stop' + }) + clear_stop_flag(client_sid, username) + return - # 处理正常内容 - if "content" in delta: - content = delta["content"] - if content: - content_chunks += 1 - debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}") + if isinstance(chunk, dict) and chunk.get("error"): + api_error = chunk.get("error") + break + + # 先尝试记录 usage(有些平台会在最后一个 chunk 里携带 usage 但 choices 为空) + usage_info = chunk.get("usage") + if usage_info: + last_usage_payload = usage_info + + if "choices" not in chunk: + debug_log(f"Chunk {chunk_count}: 无choices字段") + continue + if not chunk.get("choices"): + debug_log(f"Chunk {chunk_count}: choices为空列表") + continue + choice = chunk["choices"][0] + if not usage_info and isinstance(choice, dict) and choice.get("usage"): + # 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回) + last_usage_payload = choice.get("usage") + delta = choice.get("delta", {}) + finish_reason = choice.get("finish_reason") + if finish_reason: + last_finish_reason = finish_reason + + # 处理思考内容(兼容 reasoning_content / reasoning_details) + reasoning_content = "" + if "reasoning_content" in delta: + reasoning_content = delta.get("reasoning_content") or "" + elif "reasoning_details" in delta: + details = delta.get("reasoning_details") + if isinstance(details, list): + parts = [] + for item in details: + if isinstance(item, dict): + text = item.get("text") + if text: + parts.append(text) + if parts: + reasoning_content = "".join(parts) + if reasoning_content: + reasoning_chunks += 1 + debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符") + + if not thinking_started: + in_thinking = True + thinking_started = True + sender('thinking_start', {}) + await asyncio.sleep(0.05) + + current_thinking += reasoning_content + sender('thinking_chunk', {'content': reasoning_content}) + + # 处理正常内容 + if "content" in delta: + content = delta["content"] + if content: + content_chunks += 1 + debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}") - if in_thinking and not thinking_ended: - in_thinking = False - thinking_ended = True - sender('thinking_end', {'full_content': current_thinking}) - await asyncio.sleep(0.1) + if in_thinking and not thinking_ended: + in_thinking = False + thinking_ended = True + sender('thinking_end', {'full_content': current_thinking}) + await asyncio.sleep(0.1) - expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None)) - expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None)) + expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None)) + expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None)) - if pending_modify: - if not pending_modify.get("start_seen"): - probe_buffer = pending_modify.get("probe_buffer", "") + content - if len(probe_buffer) > 10000: - probe_buffer = probe_buffer[-10000:] - marker = pending_modify.get("start_marker") - marker_index = probe_buffer.find(marker) - if marker_index == -1: - pending_modify["probe_buffer"] = probe_buffer - continue - after_marker = marker_index + len(marker) - remainder = probe_buffer[after_marker:] - pending_modify["buffer"] = remainder - pending_modify["raw_buffer"] = marker + remainder - pending_modify["start_seen"] = True - pending_modify["detected_blocks"] = set() - pending_modify["probe_buffer"] = "" - if pending_modify.get("display_id"): - sender('update_action', { - 'id': pending_modify["display_id"], - 'status': 'running', - 'preparing_id': pending_modify.get("tool_call_id"), - 'message': f"正在修改 {pending_modify['path']}..." - }) - else: - pending_modify["buffer"] += content - pending_modify["raw_buffer"] += content - - if pending_modify.get("start_seen"): - block_text = pending_modify["buffer"] - for match in re.finditer(r"\[replace:(\d+)\]", block_text): - try: - block_index = int(match.group(1)) - except ValueError: + if pending_modify: + if not pending_modify.get("start_seen"): + probe_buffer = pending_modify.get("probe_buffer", "") + content + if len(probe_buffer) > 10000: + probe_buffer = probe_buffer[-10000:] + marker = pending_modify.get("start_marker") + marker_index = probe_buffer.find(marker) + if marker_index == -1: + pending_modify["probe_buffer"] = probe_buffer continue - detected_blocks = pending_modify.setdefault("detected_blocks", set()) - if block_index not in detected_blocks: - detected_blocks.add(block_index) - if pending_modify.get("display_id"): - sender('update_action', { - 'id': pending_modify["display_id"], - 'status': 'running', - 'preparing_id': pending_modify.get("tool_call_id"), - 'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..." - }) + after_marker = marker_index + len(marker) + remainder = probe_buffer[after_marker:] + pending_modify["buffer"] = remainder + pending_modify["raw_buffer"] = marker + remainder + pending_modify["start_seen"] = True + pending_modify["detected_blocks"] = set() + pending_modify["probe_buffer"] = "" + if pending_modify.get("display_id"): + sender('update_action', { + 'id': pending_modify["display_id"], + 'status': 'running', + 'preparing_id': pending_modify.get("tool_call_id"), + 'message': f"正在修改 {pending_modify['path']}..." + }) + else: + pending_modify["buffer"] += content + pending_modify["raw_buffer"] += content - if pending_modify.get("start_seen"): - end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) - if end_pos != -1: - pending_modify["end_index"] = end_pos - modify_break_triggered = True - debug_log("检测到<<>>,即将终止流式输出并应用修改") - break - continue - elif expecting_modify: - modify_probe_buffer += content - if len(modify_probe_buffer) > 10000: - modify_probe_buffer = modify_probe_buffer[-10000:] + if pending_modify.get("start_seen"): + block_text = pending_modify["buffer"] + for match in re.finditer(r"\[replace:(\d+)\]", block_text): + try: + block_index = int(match.group(1)) + except ValueError: + continue + detected_blocks = pending_modify.setdefault("detected_blocks", set()) + if block_index not in detected_blocks: + detected_blocks.add(block_index) + if pending_modify.get("display_id"): + sender('update_action', { + 'id': pending_modify["display_id"], + 'status': 'running', + 'preparing_id': pending_modify.get("tool_call_id"), + 'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..." + }) - marker_match = re.search(r"<<>>", modify_probe_buffer) - if marker_match: - detected_raw_path = marker_match.group(1) - detected_path = detected_raw_path.strip() - marker_full = marker_match.group(0) - after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full) - remainder = modify_probe_buffer[after_marker_index:] - modify_probe_buffer = "" + if pending_modify.get("start_seen"): + end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) + if end_pos != -1: + pending_modify["end_index"] = end_pos + modify_break_triggered = True + debug_log("检测到<<>>,即将终止流式输出并应用修改") + break + continue + elif expecting_modify: + modify_probe_buffer += content + if len(modify_probe_buffer) > 10000: + modify_probe_buffer = modify_probe_buffer[-10000:] + + marker_match = re.search(r"<<>>", modify_probe_buffer) + if marker_match: + detected_raw_path = marker_match.group(1) + detected_path = detected_raw_path.strip() + marker_full = marker_match.group(0) + after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full) + remainder = modify_probe_buffer[after_marker_index:] + modify_probe_buffer = "" - if not detected_path: - debug_log("检测到 MODIFY 起始标记但路径为空,忽略。") - continue + if not detected_path: + debug_log("检测到 MODIFY 起始标记但路径为空,忽略。") + continue - pending_modify = { - "path": detected_path, - "tool_call_id": None, - "buffer": remainder, - "raw_buffer": marker_full + remainder, - "start_marker": marker_full, - "end_marker": "<<>>", - "start_seen": True, - "end_index": None, - "display_id": None, - "detected_blocks": set() - } - if hasattr(web_terminal, "pending_modify_request"): - web_terminal.pending_modify_request = {"path": detected_path} - debug_log(f"直接检测到modify起始标记,构建修改缓冲: {detected_path}") + pending_modify = { + "path": detected_path, + "tool_call_id": None, + "buffer": remainder, + "raw_buffer": marker_full + remainder, + "start_marker": marker_full, + "end_marker": "<<>>", + "start_seen": True, + "end_index": None, + "display_id": None, + "detected_blocks": set() + } + if hasattr(web_terminal, "pending_modify_request"): + web_terminal.pending_modify_request = {"path": detected_path} + debug_log(f"直接检测到modify起始标记,构建修改缓冲: {detected_path}") - end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) - if end_pos != -1: - pending_modify["end_index"] = end_pos - modify_break_triggered = True - debug_log("检测到<<>>,即将终止流式输出并应用修改") - break - continue + end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) + if end_pos != -1: + pending_modify["end_index"] = end_pos + modify_break_triggered = True + debug_log("检测到<<>>,即将终止流式输出并应用修改") + break + continue - if pending_append: - pending_append["buffer"] += content + if pending_append: + pending_append["buffer"] += content - if pending_append.get("content_start") is None: - marker_index = pending_append["buffer"].find(pending_append["start_marker"]) - if marker_index != -1: - pending_append["content_start"] = marker_index + len(pending_append["start_marker"]) - debug_log(f"检测到追加起始标识: {pending_append['start_marker']}") + if pending_append.get("content_start") is None: + marker_index = pending_append["buffer"].find(pending_append["start_marker"]) + if marker_index != -1: + pending_append["content_start"] = marker_index + len(pending_append["start_marker"]) + debug_log(f"检测到追加起始标识: {pending_append['start_marker']}") - if pending_append.get("content_start") is not None: - end_index = pending_append["buffer"].find( - pending_append["end_marker"], - pending_append["content_start"] - ) - if end_index != -1: - pending_append["end_index"] = end_index - append_break_triggered = True - debug_log("检测到<<>>,即将终止流式输出并写入文件") - break - - # 继续累积追加内容 - continue - elif expecting_append: - append_probe_buffer += content - # 限制缓冲区大小防止过长 - if len(append_probe_buffer) > 10000: - append_probe_buffer = append_probe_buffer[-10000:] - - marker_match = re.search(r"<<>>", append_probe_buffer) - if marker_match: - detected_raw_path = marker_match.group(1) - detected_path = detected_raw_path.strip() - if not detected_path: - append_probe_buffer = append_probe_buffer[marker_match.end():] - continue - marker_full = marker_match.group(0) - after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full) - remainder = append_probe_buffer[after_marker_index:] - append_probe_buffer = "" - pending_append = { - "path": detected_path, - "tool_call_id": None, - "buffer": remainder, - "start_marker": marker_full, - "end_marker": "<<>>", - "content_start": 0, - "end_index": None, - "display_id": None - } - if hasattr(web_terminal, "pending_append_request"): - web_terminal.pending_append_request = {"path": detected_path} - debug_log(f"直接检测到append起始标记,构建追加缓冲: {detected_path}") - # 检查是否立即包含结束标记 - if pending_append["buffer"]: - end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"]) + if pending_append.get("content_start") is not None: + end_index = pending_append["buffer"].find( + pending_append["end_marker"], + pending_append["content_start"] + ) if end_index != -1: pending_append["end_index"] = end_index append_break_triggered = True debug_log("检测到<<>>,即将终止流式输出并写入文件") break + + # 继续累积追加内容 continue - - if not text_started: - text_started = True - text_streaming = True - sender('text_start', {}) - brief_log("模型输出了内容") - await asyncio.sleep(0.05) - - if not pending_append: - full_response += content - accumulated_response += content - text_has_content = True - emit_time = time.time() - elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time - last_text_chunk_time = emit_time - text_chunk_index += 1 - log_backend_chunk( - conversation_id, - iteration + 1, - text_chunk_index, - elapsed, - len(content), - content[:32] - ) - sender('text_chunk', { - 'content': content, - 'index': text_chunk_index, - 'elapsed': elapsed - }) - - # 收集工具调用 - 实时发送准备状态 - if "tool_calls" in delta: - tool_chunks += 1 - for tc in delta["tool_calls"]: - found = False - for existing in tool_calls: - if existing.get("index") == tc.get("index"): - if "function" in tc and "arguments" in tc["function"]: - arg_chunk = tc["function"]["arguments"] - existing_fn = existing.get("function", {}) - existing_args = existing_fn.get("arguments", "") - existing_fn["arguments"] = (existing_args or "") + arg_chunk - existing["function"] = existing_fn + elif expecting_append: + append_probe_buffer += content + # 限制缓冲区大小防止过长 + if len(append_probe_buffer) > 10000: + append_probe_buffer = append_probe_buffer[-10000:] - combined_args = existing_fn.get("arguments", "") - tool_id = existing.get("id") or tc.get("id") - tool_name = ( - existing_fn.get("name") - or tc.get("function", {}).get("name", "") - ) - intent_value = extract_intent_from_partial(combined_args) - if ( - intent_value - and tool_id - and detected_tool_intent.get(tool_id) != intent_value - ): + marker_match = re.search(r"<<>>", append_probe_buffer) + if marker_match: + detected_raw_path = marker_match.group(1) + detected_path = detected_raw_path.strip() + if not detected_path: + append_probe_buffer = append_probe_buffer[marker_match.end():] + continue + marker_full = marker_match.group(0) + after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full) + remainder = append_probe_buffer[after_marker_index:] + append_probe_buffer = "" + pending_append = { + "path": detected_path, + "tool_call_id": None, + "buffer": remainder, + "start_marker": marker_full, + "end_marker": "<<>>", + "content_start": 0, + "end_index": None, + "display_id": None + } + if hasattr(web_terminal, "pending_append_request"): + web_terminal.pending_append_request = {"path": detected_path} + debug_log(f"直接检测到append起始标记,构建追加缓冲: {detected_path}") + # 检查是否立即包含结束标记 + if pending_append["buffer"]: + end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"]) + if end_index != -1: + pending_append["end_index"] = end_index + append_break_triggered = True + debug_log("检测到<<>>,即将终止流式输出并写入文件") + break + continue + + if not text_started: + text_started = True + text_streaming = True + sender('text_start', {}) + brief_log("模型输出了内容") + await asyncio.sleep(0.05) + + if not pending_append: + full_response += content + accumulated_response += content + text_has_content = True + emit_time = time.time() + elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time + last_text_chunk_time = emit_time + text_chunk_index += 1 + log_backend_chunk( + conversation_id, + iteration + 1, + text_chunk_index, + elapsed, + len(content), + content[:32] + ) + sender('text_chunk', { + 'content': content, + 'index': text_chunk_index, + 'elapsed': elapsed + }) + + # 收集工具调用 - 实时发送准备状态 + if "tool_calls" in delta: + tool_chunks += 1 + for tc in delta["tool_calls"]: + found = False + for existing in tool_calls: + if existing.get("index") == tc.get("index"): + if "function" in tc and "arguments" in tc["function"]: + arg_chunk = tc["function"]["arguments"] + existing_fn = existing.get("function", {}) + existing_args = existing_fn.get("arguments", "") + existing_fn["arguments"] = (existing_args or "") + arg_chunk + existing["function"] = existing_fn + + combined_args = existing_fn.get("arguments", "") + tool_id = existing.get("id") or tc.get("id") + tool_name = ( + existing_fn.get("name") + or tc.get("function", {}).get("name", "") + ) + intent_value = extract_intent_from_partial(combined_args) + if ( + intent_value + and tool_id + and detected_tool_intent.get(tool_id) != intent_value + ): + detected_tool_intent[tool_id] = intent_value + brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}") + sender('tool_intent', { + 'id': tool_id, + 'name': tool_name, + 'intent': intent_value, + 'conversation_id': conversation_id + }) + debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") + await asyncio.sleep(0.01) + found = True + break + + if not found and tc.get("id"): + tool_id = tc["id"] + tool_name = tc.get("function", {}).get("name", "") + arguments_str = tc.get("function", {}).get("arguments", "") or "" + + # 新工具检测到,立即发送准备事件 + if tool_id not in detected_tools and tool_name: + detected_tools[tool_id] = tool_name + + # 尝试提前提取 intent + intent_value = None + if arguments_str: + intent_value = extract_intent_from_partial(arguments_str) + if intent_value: + detected_tool_intent[tool_id] = intent_value + brief_log(f"[intent] 预提取 {tool_name}: {intent_value}") + + # 立即发送工具准备中事件 + brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}") + sender('tool_preparing', { + 'id': tool_id, + 'name': tool_name, + 'message': f'准备调用 {tool_name}...', + 'intent': intent_value, + 'conversation_id': conversation_id + }) + debug_log(f" 发送工具准备事件: {tool_name}") + await asyncio.sleep(0.1) + + tool_calls.append({ + "id": tool_id, + "index": tc.get("index"), + "type": "function", + "function": { + "name": tool_name, + "arguments": arguments_str + } + }) + # 尝试从增量参数中抽取 intent,并单独推送 + if tool_id and arguments_str: + intent_value = extract_intent_from_partial(arguments_str) + if intent_value and detected_tool_intent.get(tool_id) != intent_value: detected_tool_intent[tool_id] = intent_value - brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}") sender('tool_intent', { 'id': tool_id, 'name': tool_name, @@ -1603,90 +1714,88 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac }) debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") await asyncio.sleep(0.01) - found = True - break - - if not found and tc.get("id"): - tool_id = tc["id"] - tool_name = tc.get("function", {}).get("name", "") - arguments_str = tc.get("function", {}).get("arguments", "") or "" - - # 新工具检测到,立即发送准备事件 - if tool_id not in detected_tools and tool_name: - detected_tools[tool_id] = tool_name - - # 尝试提前提取 intent - intent_value = None - if arguments_str: - intent_value = extract_intent_from_partial(arguments_str) - if intent_value: - detected_tool_intent[tool_id] = intent_value - brief_log(f"[intent] 预提取 {tool_name}: {intent_value}") - - # 立即发送工具准备中事件 - brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}") - sender('tool_preparing', { - 'id': tool_id, - 'name': tool_name, - 'message': f'准备调用 {tool_name}...', - 'intent': intent_value, - 'conversation_id': conversation_id - }) - debug_log(f" 发送工具准备事件: {tool_name}") - await asyncio.sleep(0.1) - - tool_calls.append({ - "id": tool_id, - "index": tc.get("index"), - "type": "function", - "function": { - "name": tool_name, - "arguments": arguments_str - } - }) - # 尝试从增量参数中抽取 intent,并单独推送 - if tool_id and arguments_str: - intent_value = extract_intent_from_partial(arguments_str) - if intent_value and detected_tool_intent.get(tool_id) != intent_value: - detected_tool_intent[tool_id] = intent_value - sender('tool_intent', { - 'id': tool_id, - 'name': tool_name, - 'intent': intent_value, - 'conversation_id': conversation_id - }) - debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") - await asyncio.sleep(0.01) - debug_log(f" 新工具: {tool_name}") + debug_log(f" 新工具: {tool_name}") - # 检查是否被停止 - client_stop_info = get_stop_flag(client_sid, username) - if client_stop_info: - stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info - if stop_requested: - debug_log("任务在流处理完成后检测到停止状态") - sender('task_stopped', { - 'message': '命令执行被用户取消', - 'reason': 'user_stop' - }) - _cancel_pending_tools(tool_calls) - clear_stop_flag(client_sid, username) - return + # 检查是否被停止 + client_stop_info = get_stop_flag(client_sid, username) + if client_stop_info: + stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info + if stop_requested: + debug_log("任务在流处理完成后检测到停止状态") + sender('task_stopped', { + 'message': '命令执行被用户取消', + 'reason': 'user_stop' + }) + _cancel_pending_tools(tool_calls) + clear_stop_flag(client_sid, username) + return - # === API响应完成后只计算输出token === - if last_usage_payload: - try: - web_terminal.context_manager.apply_usage_statistics(last_usage_payload) - debug_log( - f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, " - f"completion={last_usage_payload.get('completion_tokens', 0)}, " - f"total={last_usage_payload.get('total_tokens', 0)}" + # === API响应完成后只计算输出token === + if last_usage_payload: + try: + web_terminal.context_manager.apply_usage_statistics(last_usage_payload) + debug_log( + f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, " + f"completion={last_usage_payload.get('completion_tokens', 0)}, " + f"total={last_usage_payload.get('total_tokens', 0)}" + ) + except Exception as e: + debug_log(f"Usage统计更新失败: {e}") + else: + debug_log("未获取到usage字段,跳过token统计更新") + + + if api_error: + error_message = "" + error_status = None + error_type = None + if isinstance(api_error, dict): + error_status = api_error.get("status_code") + error_type = api_error.get("error_type") + error_message = api_error.get("error_message") or api_error.get("error_text") or "" + if not error_message: + error_message = "API 请求失败" + # 若命中阿里云配额错误,立即写入状态并切换到官方 API + try: + from utils.aliyun_fallback import compute_disabled_until, set_disabled_until + disabled_until, reason = compute_disabled_until(error_message) + if disabled_until and reason: + set_disabled_until(getattr(web_terminal, "model_key", None) or "kimi-k2.5", disabled_until, reason) + profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5") + web_terminal.apply_model_profile(profile) + except Exception as exc: + debug_log(f"处理阿里云配额回退失败: {exc}") + can_retry = ( + api_attempt < max_api_retries + and not full_response + and not tool_calls + and not current_thinking + and not pending_append + and not pending_modify ) - except Exception as e: - debug_log(f"Usage统计更新失败: {e}") - else: - debug_log("未获取到usage字段,跳过token统计更新") - + sender('error', { + 'message': error_message, + 'status_code': error_status, + 'error_type': error_type, + 'retry': bool(can_retry), + 'retry_in': retry_delay_seconds if can_retry else None, + 'attempt': api_attempt + 1, + 'max_attempts': max_api_retries + 1 + }) + if can_retry: + try: + profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5") + web_terminal.apply_model_profile(profile) + except Exception as exc: + debug_log(f"重试前更新模型配置失败: {exc}") + cancelled = await _wait_retry_delay(retry_delay_seconds) + if cancelled: + return + continue + _cancel_pending_tools(tool_calls) + return + break + # 流结束后的处理 debug_log(f"\n流结束统计:") debug_log(f" 总chunks: {chunk_count}") diff --git a/static/src/composables/useLegacySocket.ts b/static/src/composables/useLegacySocket.ts index b8be6db..b646354 100644 --- a/static/src/composables/useLegacySocket.ts +++ b/static/src/composables/useLegacySocket.ts @@ -1042,6 +1042,10 @@ export async function initializeLegacySocket(ctx: any) { if (!msg) { return; } + if (msg.awaitingFirstContent) { + msg.awaitingFirstContent = false; + msg.generatingLabel = ''; + } const action = { id: data.id, type: 'tool', @@ -1405,7 +1409,10 @@ export async function initializeLegacySocket(ctx: any) { const msg = data?.message || '发生未知错误'; const code = data?.status_code; const errType = data?.error_type; - ctx.addSystemMessage(`错误: ${msg}`); + const shouldRetry = Boolean(data?.retry); + const retryIn = Number(data?.retry_in) || 5; + const retryAttempt = Number(data?.attempt) || 1; + const retryMax = Number(data?.max_attempts) || retryAttempt; if (typeof ctx.uiPushToast === 'function') { ctx.uiPushToast({ title: code ? `API错误 ${code}` : 'API错误', @@ -1413,8 +1420,35 @@ export async function initializeLegacySocket(ctx: any) { type: 'error', duration: 6000 }); + if (shouldRetry) { + ctx.uiPushToast({ + title: '即将重试', + message: `将在 ${retryIn} 秒后重试(第 ${retryAttempt}/${retryMax} 次)`, + type: 'info', + duration: Math.max(retryIn, 1) * 1000 + }); + } + } + if (shouldRetry) { + // 错误后保持停止按钮态,用户可手动停止或等待自动重试 + ctx.stopRequested = false; + ctx.taskInProgress = true; + ctx.streamingMessage = true; + return; + } + + // 最后一次报错:恢复输入状态并清理提示动画 + const msgIndex = typeof ctx.currentMessageIndex === 'number' ? ctx.currentMessageIndex : -1; + if (msgIndex >= 0 && Array.isArray(ctx.messages)) { + const currentMessage = ctx.messages[msgIndex]; + if (currentMessage && currentMessage.role === 'assistant') { + currentMessage.awaitingFirstContent = false; + currentMessage.generatingLabel = ''; + } + } + if (typeof ctx.chatClearThinkingLocks === 'function') { + ctx.chatClearThinkingLocks(); } - // 仅标记当前流结束,避免状态错乱 ctx.streamingMessage = false; ctx.stopRequested = false; ctx.taskInProgress = false; diff --git a/utils/aliyun_fallback.py b/utils/aliyun_fallback.py new file mode 100644 index 0000000..40f9a9d --- /dev/null +++ b/utils/aliyun_fallback.py @@ -0,0 +1,103 @@ +import json +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Dict, Optional, Tuple + + +FALLBACK_MODELS = {"qwen3-vl-plus", "kimi-k2.5", "minimax-m2.5"} +STATE_PATH = Path(__file__).resolve().parents[1] / "data" / "aliyun_fallback_state.json" + + +def _read_state() -> Dict: + if not STATE_PATH.exists(): + return {"models": {}} + try: + data = json.loads(STATE_PATH.read_text(encoding="utf-8")) + except Exception: + return {"models": {}} + if not isinstance(data, dict): + return {"models": {}} + if "models" not in data or not isinstance(data["models"], dict): + data["models"] = {} + return data + + +def _write_state(data: Dict) -> None: + STATE_PATH.parent.mkdir(parents=True, exist_ok=True) + STATE_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + + +def get_disabled_until(model_key: str) -> Optional[float]: + data = _read_state() + entry = (data.get("models") or {}).get(model_key) or {} + ts = entry.get("disabled_until") + try: + return float(ts) if ts is not None else None + except (TypeError, ValueError): + return None + + +def is_fallback_active(model_key: str, now_ts: Optional[float] = None) -> bool: + if model_key not in FALLBACK_MODELS: + return False + now_ts = float(now_ts) if now_ts is not None else datetime.now(tz=timezone.utc).timestamp() + disabled_until = get_disabled_until(model_key) + return bool(disabled_until and disabled_until > now_ts) + + +def set_disabled_until(model_key: str, disabled_until_ts: float, reason: str = "") -> None: + if model_key not in FALLBACK_MODELS: + return + data = _read_state() + models = data.setdefault("models", {}) + models[model_key] = { + "disabled_until": float(disabled_until_ts), + "reason": reason, + "updated_at": datetime.now(tz=timezone.utc).timestamp(), + } + _write_state(data) + + +def _next_monday_utc8(now: datetime) -> datetime: + # Monday = 0 + weekday = now.weekday() + days_ahead = (7 - weekday) % 7 + if days_ahead == 0: + days_ahead = 7 + target = (now + timedelta(days=days_ahead)).replace(hour=0, minute=0, second=0, microsecond=0) + return target + + +def _next_month_same_day_utc8(now: datetime) -> datetime: + year = now.year + month = now.month + 1 + if month > 12: + month = 1 + year += 1 + # clamp day to last day of next month + if month == 12: + next_month = datetime(year + 1, 1, 1, tzinfo=now.tzinfo) + else: + next_month = datetime(year, month + 1, 1, tzinfo=now.tzinfo) + last_day = (next_month - timedelta(days=1)).day + day = min(now.day, last_day) + return datetime(year, month, day, 0, 0, 0, tzinfo=now.tzinfo) + + +def compute_disabled_until(error_text: str) -> Tuple[Optional[float], Optional[str]]: + if not error_text: + return None, None + text = str(error_text).lower() + tz8 = timezone(timedelta(hours=8)) + now = datetime.now(tz=tz8) + + if "hour allocated quota exceeded" in text or "每 5 小时请求额度已用完" in text: + until = now + timedelta(hours=5) + return until.astimezone(timezone.utc).timestamp(), "hour_quota" + if "week allocated quota exceeded" in text or "每周请求额度已用完" in text: + until = _next_monday_utc8(now) + return until.astimezone(timezone.utc).timestamp(), "week_quota" + if "month allocated quota exceeded" in text or "每月请求额度已用完" in text: + until = _next_month_same_day_utc8(now) + return until.astimezone(timezone.utc).timestamp(), "month_quota" + return None, None diff --git a/utils/api_client.py b/utils/api_client.py index f9a15c4..0ef2f86 100644 --- a/utils/api_client.py +++ b/utils/api_client.py @@ -6,9 +6,12 @@ import json import asyncio import base64 import mimetypes +import os from typing import List, Dict, Optional, AsyncGenerator, Any from pathlib import Path from datetime import datetime +from pathlib import Path +from typing import Tuple try: from config import ( API_BASE_URL, @@ -78,6 +81,73 @@ class DeepSeekClient: # 请求体落盘目录 self.request_dump_dir = Path(__file__).resolve().parents[1] / "logs" / "api_requests" self.request_dump_dir.mkdir(parents=True, exist_ok=True) + self.debug_log_path = Path(__file__).resolve().parents[1] / "logs" / "api_debug.log" + + def _maybe_mark_aliyun_quota(self, error_text: str) -> None: + if not error_text or not self.model_key: + return + try: + from utils.aliyun_fallback import compute_disabled_until, set_disabled_until + except Exception: + return + disabled_until, reason = compute_disabled_until(error_text) + if disabled_until and reason: + set_disabled_until(self.model_key, disabled_until, reason) + # 立即切换到官方 API(仅在有配置时) + base_env_key = None + key_env_key = None + if self.model_key == "kimi-k2.5": + base_env_key = "API_BASE_KIMI_OFFICIAL" + key_env_key = "API_KEY_KIMI_OFFICIAL" + elif self.model_key == "qwen3-vl-plus": + base_env_key = "API_BASE_QWEN_OFFICIAL" + key_env_key = "API_KEY_QWEN_OFFICIAL" + elif self.model_key == "minimax-m2.5": + base_env_key = "API_BASE_MINIMAX_OFFICIAL" + key_env_key = "API_KEY_MINIMAX_OFFICIAL" + if base_env_key and key_env_key: + official_base = self._resolve_env_value(base_env_key) + official_key = self._resolve_env_value(key_env_key) + if official_base and official_key: + self.fast_api_config["base_url"] = official_base + self.fast_api_config["api_key"] = official_key + self.thinking_api_config["base_url"] = official_base + self.thinking_api_config["api_key"] = official_key + self.api_base_url = official_base + self.api_key = official_key + + def _debug_log(self, payload: Dict[str, Any]) -> None: + try: + entry = { + "ts": datetime.now().isoformat(), + **payload + } + self.debug_log_path.parent.mkdir(parents=True, exist_ok=True) + with self.debug_log_path.open("a", encoding="utf-8") as f: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + except Exception: + pass + + def _resolve_env_value(self, name: str) -> Optional[str]: + value = os.environ.get(name) + if value is None: + env_path = Path(__file__).resolve().parents[1] / ".env" + if env_path.exists(): + try: + for raw_line in env_path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, val = line.split("=", 1) + if key.strip() == name: + value = val.strip().strip('"').strip("'") + break + except Exception: + value = None + if value is None: + return None + value = value.strip() + return value or None def _print(self, message: str, end: str = "\n", flush: bool = False): """安全的打印函数,在Web模式下不输出""" @@ -568,7 +638,10 @@ class DeepSeekClient: "error_text": error_text, "error_type": None, "error_message": None, - "request_dump": str(dump_path) + "request_dump": str(dump_path), + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key } try: parsed = json.loads(error_text) @@ -578,7 +651,20 @@ class DeepSeekClient: self.last_error_info["error_message"] = err.get("message") except Exception: pass - self._print(f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text}") + self._maybe_mark_aliyun_quota(error_text) + self._debug_log({ + "event": "http_error_stream", + "status_code": response.status_code, + "error_text": error_text, + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key, + "request_dump": str(dump_path) + }) + self._print( + f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text} " + f"(base_url={api_config.get('base_url')}, model_id={api_config.get('model_id')})" + ) self._mark_request_error(dump_path, response.status_code, error_text) yield {"error": self.last_error_info} return @@ -607,7 +693,10 @@ class DeepSeekClient: "error_text": error_text, "error_type": None, "error_message": None, - "request_dump": str(dump_path) + "request_dump": str(dump_path), + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key } try: parsed = response.json() @@ -617,7 +706,20 @@ class DeepSeekClient: self.last_error_info["error_message"] = err.get("message") except Exception: pass - self._print(f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text}") + self._maybe_mark_aliyun_quota(error_text) + self._debug_log({ + "event": "http_error", + "status_code": response.status_code, + "error_text": error_text, + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key, + "request_dump": str(dump_path) + }) + self._print( + f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text} " + f"(base_url={api_config.get('base_url')}, model_id={api_config.get('model_id')})" + ) self._mark_request_error(dump_path, response.status_code, error_text) yield {"error": self.last_error_info} return @@ -632,8 +734,21 @@ class DeepSeekClient: "error_text": "connect_error", "error_type": "connection_error", "error_message": "无法连接到API服务器", - "request_dump": str(dump_path) + "request_dump": str(dump_path), + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key } + self._maybe_mark_aliyun_quota(self.last_error_info.get("error_text")) + self._debug_log({ + "event": "connect_error", + "status_code": None, + "error_text": "connect_error", + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key, + "request_dump": str(dump_path) + }) self._mark_request_error(dump_path, error_text="connect_error") yield {"error": self.last_error_info} except httpx.TimeoutException: @@ -643,8 +758,21 @@ class DeepSeekClient: "error_text": "timeout", "error_type": "timeout", "error_message": "API请求超时", - "request_dump": str(dump_path) + "request_dump": str(dump_path), + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key } + self._maybe_mark_aliyun_quota(self.last_error_info.get("error_text")) + self._debug_log({ + "event": "timeout", + "status_code": None, + "error_text": "timeout", + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key, + "request_dump": str(dump_path) + }) self._mark_request_error(dump_path, error_text="timeout") yield {"error": self.last_error_info} except Exception as e: @@ -654,8 +782,21 @@ class DeepSeekClient: "error_text": str(e), "error_type": "exception", "error_message": str(e), - "request_dump": str(dump_path) + "request_dump": str(dump_path), + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key } + self._maybe_mark_aliyun_quota(self.last_error_info.get("error_text")) + self._debug_log({ + "event": "exception", + "status_code": None, + "error_text": str(e), + "base_url": api_config.get("base_url"), + "model_id": api_config.get("model_id"), + "model_key": self.model_key, + "request_dump": str(dump_path) + }) self._mark_request_error(dump_path, error_text=str(e)) yield {"error": self.last_error_info}