feat: add aliyun quota fallback

This commit is contained in:
JOJO 2026-03-06 12:31:20 +08:00
parent 4be61fe76e
commit 868640b479
6 changed files with 864 additions and 369 deletions

View File

@ -1,9 +1,32 @@
import os import os
from pathlib import Path
from typing import Optional from typing import Optional
def _env(name: str, default: str = "") -> str: def _env(name: str, default: str = "") -> str:
return os.environ.get(name, default) return os.environ.get(name, default)
def _env_optional(name: str) -> Optional[str]:
value = os.environ.get(name)
if value is None:
# 回退读取 .env支持运行中更新
env_path = Path(__file__).resolve().parents[1] / ".env"
if env_path.exists():
try:
for raw_line in env_path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, val = line.split("=", 1)
if key.strip() == name:
value = val.strip().strip('"').strip("'")
break
except Exception:
value = None
if value is None:
return None
value = value.strip()
return value or None
# 模型上下文窗口(单位: token # 模型上下文窗口(单位: token
CONTEXT_WINDOWS = { CONTEXT_WINDOWS = {
@ -19,6 +42,8 @@ CONTEXT_WINDOWS = {
# 默认Kimi # 默认Kimi
KIMI_BASE = _env("API_BASE_KIMI", _env("AGENT_API_BASE_URL", "https://api.moonshot.cn/v1")) KIMI_BASE = _env("API_BASE_KIMI", _env("AGENT_API_BASE_URL", "https://api.moonshot.cn/v1"))
KIMI_KEY = _env("API_KEY_KIMI", _env("AGENT_API_KEY", "")) KIMI_KEY = _env("API_KEY_KIMI", _env("AGENT_API_KEY", ""))
KIMI_BASE_OFFICIAL = _env_optional("API_BASE_KIMI_OFFICIAL")
KIMI_KEY_OFFICIAL = _env_optional("API_KEY_KIMI_OFFICIAL")
KIMI_FAST_MODEL = _env("MODEL_KIMI_FAST", _env("AGENT_MODEL_ID", "kimi-k2-0905-preview")) KIMI_FAST_MODEL = _env("MODEL_KIMI_FAST", _env("AGENT_MODEL_ID", "kimi-k2-0905-preview"))
KIMI_THINK_MODEL = _env("MODEL_KIMI_THINK", _env("AGENT_THINKING_MODEL_ID", "kimi-k2-thinking")) KIMI_THINK_MODEL = _env("MODEL_KIMI_THINK", _env("AGENT_THINKING_MODEL_ID", "kimi-k2-thinking"))
KIMI_25_MODEL = _env("MODEL_KIMI_25", "kimi-k2.5") KIMI_25_MODEL = _env("MODEL_KIMI_25", "kimi-k2.5")
@ -32,12 +57,16 @@ DEEPSEEK_THINK_MODEL = _env("MODEL_DEEPSEEK_THINK", "deepseek-reasoner")
# Qwen # Qwen
QWEN_BASE = _env("API_BASE_QWEN", "https://dashscope.aliyuncs.com/compatible-mode/v1") QWEN_BASE = _env("API_BASE_QWEN", "https://dashscope.aliyuncs.com/compatible-mode/v1")
QWEN_KEY = _env("API_KEY_QWEN", _env("DASHSCOPE_API_KEY", "")) QWEN_KEY = _env("API_KEY_QWEN", _env("DASHSCOPE_API_KEY", ""))
QWEN_BASE_OFFICIAL = _env_optional("API_BASE_QWEN_OFFICIAL")
QWEN_KEY_OFFICIAL = _env_optional("API_KEY_QWEN_OFFICIAL")
QWEN_MAX_MODEL = _env("MODEL_QWEN_MAX", "qwen3-max") QWEN_MAX_MODEL = _env("MODEL_QWEN_MAX", "qwen3-max")
QWEN_VL_MODEL = _env("MODEL_QWEN_VL", "qwen3.5-plus") QWEN_VL_MODEL = _env("MODEL_QWEN_VL", "qwen3.5-plus")
# MiniMax # MiniMax
MINIMAX_BASE = _env("API_BASE_MINIMAX", "https://api.minimaxi.com/v1") MINIMAX_BASE = _env("API_BASE_MINIMAX", "https://api.minimaxi.com/v1")
MINIMAX_KEY = _env("API_KEY_MINIMAX", "") MINIMAX_KEY = _env("API_KEY_MINIMAX", "")
MINIMAX_BASE_OFFICIAL = _env_optional("API_BASE_MINIMAX_OFFICIAL")
MINIMAX_KEY_OFFICIAL = _env_optional("API_KEY_MINIMAX_OFFICIAL")
MINIMAX_MODEL = _env("MODEL_MINIMAX", "MiniMax-M2.5") MINIMAX_MODEL = _env("MODEL_MINIMAX", "MiniMax-M2.5")
@ -78,7 +107,7 @@ MODEL_PROFILES = {
"model_id": KIMI_25_MODEL, "model_id": KIMI_25_MODEL,
"max_tokens": None, "max_tokens": None,
"context_window": CONTEXT_WINDOWS["kimi-k2.5"], "context_window": CONTEXT_WINDOWS["kimi-k2.5"],
"extra_params": {"thinking": {"type": "enabled"}} "extra_params": {"thinking": {"type": "enabled"}, "enable_thinking": True}
}, },
"supports_thinking": True, "supports_thinking": True,
"fast_only": False, "fast_only": False,
@ -204,6 +233,45 @@ def get_model_profile(key: str) -> dict:
if key not in MODEL_PROFILES: if key not in MODEL_PROFILES:
raise ValueError(f"未知模型 key: {key}") raise ValueError(f"未知模型 key: {key}")
profile = MODEL_PROFILES[key] profile = MODEL_PROFILES[key]
try:
from utils.aliyun_fallback import is_fallback_active
except Exception:
is_fallback_active = None
if is_fallback_active and is_fallback_active(key):
if key == "kimi-k2.5":
kimi_base_official = _env_optional("API_BASE_KIMI_OFFICIAL") or KIMI_BASE_OFFICIAL
kimi_key_official = _env_optional("API_KEY_KIMI_OFFICIAL") or KIMI_KEY_OFFICIAL
if kimi_base_official and kimi_key_official:
profile = dict(profile)
fast = dict(profile.get("fast") or {})
thinking = dict(profile.get("thinking") or fast)
fast.update({"base_url": kimi_base_official, "api_key": kimi_key_official})
thinking.update({"base_url": kimi_base_official, "api_key": kimi_key_official})
profile["fast"] = fast
profile["thinking"] = thinking
elif key == "qwen3-vl-plus":
qwen_base_official = _env_optional("API_BASE_QWEN_OFFICIAL") or QWEN_BASE_OFFICIAL
qwen_key_official = _env_optional("API_KEY_QWEN_OFFICIAL") or QWEN_KEY_OFFICIAL
if qwen_base_official and qwen_key_official:
profile = dict(profile)
fast = dict(profile.get("fast") or {})
thinking = dict(profile.get("thinking") or fast)
fast.update({"base_url": qwen_base_official, "api_key": qwen_key_official})
thinking.update({"base_url": qwen_base_official, "api_key": qwen_key_official})
profile["fast"] = fast
profile["thinking"] = thinking
elif key == "minimax-m2.5":
minimax_base_official = _env_optional("API_BASE_MINIMAX_OFFICIAL") or MINIMAX_BASE_OFFICIAL
minimax_key_official = _env_optional("API_KEY_MINIMAX_OFFICIAL") or MINIMAX_KEY_OFFICIAL
if minimax_base_official and minimax_key_official:
profile = dict(profile)
fast = dict(profile.get("fast") or {})
thinking = dict(profile.get("thinking") or fast)
fast.update({"base_url": minimax_base_official, "api_key": minimax_key_official})
thinking.update({"base_url": minimax_base_official, "api_key": minimax_key_official})
profile["fast"] = fast
profile["thinking"] = thinking
# 基础校验:必须有 fast 段且有 key # 基础校验:必须有 fast 段且有 key
fast = profile.get("fast") or {} fast = profile.get("fast") or {}
if not fast.get("api_key"): if not fast.get("api_key"):

View File

@ -0,0 +1,40 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
# Bind address/port for the mock quota server.
HOST = "0.0.0.0"
PORT = 8899
# Error text mimicking Aliyun's hourly-quota-exceeded response body.
ERROR_MESSAGE = "hour allocated quota exceeded"
class Handler(BaseHTTPRequestHandler):
    """Mock handler that answers every POST with an HTTP 429 quota error."""

    def _send(self, code: int, payload: dict):
        """Serialize *payload* as JSON and write it out with status *code*."""
        data = json.dumps(payload).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def do_POST(self):
        """Drain the request body, then reply with the canned quota error."""
        # Consume request body to avoid broken pipe on clients
        try:
            body_len = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            body_len = 0
        if body_len:
            _ = self.rfile.read(body_len)
        self._send(429, {
            "error": {
                "message": ERROR_MESSAGE,
                "type": "quota_exceeded"
            }
        })

    def log_message(self, format, *args):
        # Deliberately silence per-request stderr logging.
        return
if __name__ == "__main__":
    # Serve the canned 429 quota-exceeded response until the process is killed.
    server = HTTPServer((HOST, PORT), Handler)
    print(f"mock aliyun quota server running on http://{HOST}:{PORT}")
    server.serve_forever()

View File

@ -505,6 +505,11 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac
context = web_terminal.build_context() context = web_terminal.build_context()
messages = web_terminal.build_messages(context, message) messages = web_terminal.build_messages(context, message)
tools = web_terminal.define_tools() tools = web_terminal.define_tools()
try:
profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
web_terminal.apply_model_profile(profile)
except Exception as exc:
debug_log(f"更新模型配置失败: {exc}")
# === 上下文预算与安全校验(避免超出模型上下文) === # === 上下文预算与安全校验(避免超出模型上下文) ===
max_context_tokens = get_model_context_window(getattr(web_terminal, "model_key", None) or "kimi-k2.5") max_context_tokens = get_model_context_window(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
@ -559,6 +564,8 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac
# 设置最大迭代次数API 可覆盖) # 设置最大迭代次数API 可覆盖)
max_iterations = getattr(web_terminal, "max_iterations_override", None) or MAX_ITERATIONS_PER_TASK max_iterations = getattr(web_terminal, "max_iterations_override", None) or MAX_ITERATIONS_PER_TASK
max_api_retries = 4
retry_delay_seconds = 10
pending_append = None # {"path": str, "tool_call_id": str, "buffer": str, ...} pending_append = None # {"path": str, "tool_call_id": str, "buffer": str, ...}
append_probe_buffer = "" append_probe_buffer = ""
@ -1199,6 +1206,25 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac
}) })
maybe_mark_failure_from_message(web_terminal, message) maybe_mark_failure_from_message(web_terminal, message)
async def _wait_retry_delay(delay_seconds: int) -> bool:
    """Sleep out the retry interval while polling for a user stop request.

    Returns True when the user cancelled during the wait (a task_stopped
    event has been sent and the stop flag cleared), False otherwise.
    """
    if delay_seconds <= 0:
        return False
    deadline = time.time() + delay_seconds
    while time.time() < deadline:
        stop_info = get_stop_flag(client_sid, username)
        # Flag may be a dict ({'stop': bool, ...}) or a plain truthy value.
        wants_stop = (
            stop_info.get('stop', False) if isinstance(stop_info, dict) else bool(stop_info)
        )
        if wants_stop:
            sender('task_stopped', {
                'message': '命令执行被用户取消',
                'reason': 'user_stop'
            })
            clear_stop_flag(client_sid, username)
            return True
        await asyncio.sleep(0.2)
    return False
for iteration in range(max_iterations): for iteration in range(max_iterations):
total_iterations += 1 total_iterations += 1
debug_log(f"\n--- 迭代 {iteration + 1}/{max_iterations} 开始 ---") debug_log(f"\n--- 迭代 {iteration + 1}/{max_iterations} 开始 ---")
@ -1293,308 +1319,393 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac
print(f"[API] 第{iteration + 1}次调用 (总工具调用: {total_tool_calls}/{MAX_TOTAL_TOOL_CALLS})") print(f"[API] 第{iteration + 1}次调用 (总工具调用: {total_tool_calls}/{MAX_TOTAL_TOOL_CALLS})")
# 收集流式响应 api_error = None
async for chunk in web_terminal.api_client.chat(messages, tools, stream=True): for api_attempt in range(max_api_retries + 1):
chunk_count += 1 api_error = None
if api_attempt > 0:
full_response = ""
tool_calls = []
current_thinking = ""
detected_tools = {}
last_usage_payload = None
in_thinking = False
thinking_started = False
thinking_ended = False
text_started = False
text_has_content = False
text_streaming = False
text_chunk_index = 0
last_text_chunk_time = None
chunk_count = 0
reasoning_chunks = 0
content_chunks = 0
tool_chunks = 0
append_break_triggered = False
append_result = {"handled": False}
modify_break_triggered = False
modify_result = {"handled": False}
last_finish_reason = None
# 检查停止标志 # 收集流式响应
client_stop_info = get_stop_flag(client_sid, username) async for chunk in web_terminal.api_client.chat(messages, tools, stream=True):
if client_stop_info: chunk_count += 1
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
if stop_requested:
debug_log(f"检测到停止请求,中断流处理")
if pending_append:
append_result = await finalize_pending_append(full_response, False, finish_reason="user_stop")
if pending_modify:
modify_result = await finalize_pending_modify(full_response, False, finish_reason="user_stop")
_cancel_pending_tools(tool_calls)
sender('task_stopped', {
'message': '命令执行被用户取消',
'reason': 'user_stop'
})
clear_stop_flag(client_sid, username)
return
# 先尝试记录 usage有些平台会在最后一个 chunk 里携带 usage 但 choices 为空) # 检查停止标志
usage_info = chunk.get("usage") client_stop_info = get_stop_flag(client_sid, username)
if usage_info: if client_stop_info:
last_usage_payload = usage_info stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
if stop_requested:
debug_log(f"检测到停止请求,中断流处理")
if pending_append:
append_result = await finalize_pending_append(full_response, False, finish_reason="user_stop")
if pending_modify:
modify_result = await finalize_pending_modify(full_response, False, finish_reason="user_stop")
_cancel_pending_tools(tool_calls)
sender('task_stopped', {
'message': '命令执行被用户取消',
'reason': 'user_stop'
})
clear_stop_flag(client_sid, username)
return
if "choices" not in chunk: if isinstance(chunk, dict) and chunk.get("error"):
debug_log(f"Chunk {chunk_count}: 无choices字段") api_error = chunk.get("error")
continue break
if not chunk.get("choices"):
debug_log(f"Chunk {chunk_count}: choices为空列表")
continue
choice = chunk["choices"][0]
if not usage_info and isinstance(choice, dict) and choice.get("usage"):
# 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回)
last_usage_payload = choice.get("usage")
delta = choice.get("delta", {})
finish_reason = choice.get("finish_reason")
if finish_reason:
last_finish_reason = finish_reason
# 处理思考内容(兼容 reasoning_content / reasoning_details # 先尝试记录 usage有些平台会在最后一个 chunk 里携带 usage 但 choices 为空)
reasoning_content = "" usage_info = chunk.get("usage")
if "reasoning_content" in delta: if usage_info:
reasoning_content = delta.get("reasoning_content") or "" last_usage_payload = usage_info
elif "reasoning_details" in delta:
details = delta.get("reasoning_details")
if isinstance(details, list):
parts = []
for item in details:
if isinstance(item, dict):
text = item.get("text")
if text:
parts.append(text)
if parts:
reasoning_content = "".join(parts)
if reasoning_content:
reasoning_chunks += 1
debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符")
if not thinking_started: if "choices" not in chunk:
in_thinking = True debug_log(f"Chunk {chunk_count}: 无choices字段")
thinking_started = True continue
sender('thinking_start', {}) if not chunk.get("choices"):
await asyncio.sleep(0.05) debug_log(f"Chunk {chunk_count}: choices为空列表")
continue
choice = chunk["choices"][0]
if not usage_info and isinstance(choice, dict) and choice.get("usage"):
# 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回)
last_usage_payload = choice.get("usage")
delta = choice.get("delta", {})
finish_reason = choice.get("finish_reason")
if finish_reason:
last_finish_reason = finish_reason
current_thinking += reasoning_content # 处理思考内容(兼容 reasoning_content / reasoning_details
sender('thinking_chunk', {'content': reasoning_content}) reasoning_content = ""
if "reasoning_content" in delta:
reasoning_content = delta.get("reasoning_content") or ""
elif "reasoning_details" in delta:
details = delta.get("reasoning_details")
if isinstance(details, list):
parts = []
for item in details:
if isinstance(item, dict):
text = item.get("text")
if text:
parts.append(text)
if parts:
reasoning_content = "".join(parts)
if reasoning_content:
reasoning_chunks += 1
debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符")
# 处理正常内容 if not thinking_started:
if "content" in delta: in_thinking = True
content = delta["content"] thinking_started = True
if content: sender('thinking_start', {})
content_chunks += 1 await asyncio.sleep(0.05)
debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}")
if in_thinking and not thinking_ended: current_thinking += reasoning_content
in_thinking = False sender('thinking_chunk', {'content': reasoning_content})
thinking_ended = True
sender('thinking_end', {'full_content': current_thinking}) # 处理正常内容
await asyncio.sleep(0.1) if "content" in delta:
content = delta["content"]
if content:
content_chunks += 1
debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}")
if in_thinking and not thinking_ended:
in_thinking = False
thinking_ended = True
sender('thinking_end', {'full_content': current_thinking})
await asyncio.sleep(0.1)
expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None)) expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None))
expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None)) expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None))
if pending_modify: if pending_modify:
if not pending_modify.get("start_seen"): if not pending_modify.get("start_seen"):
probe_buffer = pending_modify.get("probe_buffer", "") + content probe_buffer = pending_modify.get("probe_buffer", "") + content
if len(probe_buffer) > 10000: if len(probe_buffer) > 10000:
probe_buffer = probe_buffer[-10000:] probe_buffer = probe_buffer[-10000:]
marker = pending_modify.get("start_marker") marker = pending_modify.get("start_marker")
marker_index = probe_buffer.find(marker) marker_index = probe_buffer.find(marker)
if marker_index == -1: if marker_index == -1:
pending_modify["probe_buffer"] = probe_buffer pending_modify["probe_buffer"] = probe_buffer
continue
after_marker = marker_index + len(marker)
remainder = probe_buffer[after_marker:]
pending_modify["buffer"] = remainder
pending_modify["raw_buffer"] = marker + remainder
pending_modify["start_seen"] = True
pending_modify["detected_blocks"] = set()
pending_modify["probe_buffer"] = ""
if pending_modify.get("display_id"):
sender('update_action', {
'id': pending_modify["display_id"],
'status': 'running',
'preparing_id': pending_modify.get("tool_call_id"),
'message': f"正在修改 {pending_modify['path']}..."
})
else:
pending_modify["buffer"] += content
pending_modify["raw_buffer"] += content
if pending_modify.get("start_seen"):
block_text = pending_modify["buffer"]
for match in re.finditer(r"\[replace:(\d+)\]", block_text):
try:
block_index = int(match.group(1))
except ValueError:
continue continue
detected_blocks = pending_modify.setdefault("detected_blocks", set()) after_marker = marker_index + len(marker)
if block_index not in detected_blocks: remainder = probe_buffer[after_marker:]
detected_blocks.add(block_index) pending_modify["buffer"] = remainder
if pending_modify.get("display_id"): pending_modify["raw_buffer"] = marker + remainder
sender('update_action', { pending_modify["start_seen"] = True
'id': pending_modify["display_id"], pending_modify["detected_blocks"] = set()
'status': 'running', pending_modify["probe_buffer"] = ""
'preparing_id': pending_modify.get("tool_call_id"), if pending_modify.get("display_id"):
'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..." sender('update_action', {
}) 'id': pending_modify["display_id"],
'status': 'running',
'preparing_id': pending_modify.get("tool_call_id"),
'message': f"正在修改 {pending_modify['path']}..."
})
else:
pending_modify["buffer"] += content
pending_modify["raw_buffer"] += content
if pending_modify.get("start_seen"): if pending_modify.get("start_seen"):
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) block_text = pending_modify["buffer"]
if end_pos != -1: for match in re.finditer(r"\[replace:(\d+)\]", block_text):
pending_modify["end_index"] = end_pos try:
modify_break_triggered = True block_index = int(match.group(1))
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改") except ValueError:
break continue
continue detected_blocks = pending_modify.setdefault("detected_blocks", set())
elif expecting_modify: if block_index not in detected_blocks:
modify_probe_buffer += content detected_blocks.add(block_index)
if len(modify_probe_buffer) > 10000: if pending_modify.get("display_id"):
modify_probe_buffer = modify_probe_buffer[-10000:] sender('update_action', {
'id': pending_modify["display_id"],
'status': 'running',
'preparing_id': pending_modify.get("tool_call_id"),
'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..."
})
marker_match = re.search(r"<<<MODIFY:\s*([\s\S]*?)>>>", modify_probe_buffer) if pending_modify.get("start_seen"):
if marker_match: end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
detected_raw_path = marker_match.group(1) if end_pos != -1:
detected_path = detected_raw_path.strip() pending_modify["end_index"] = end_pos
marker_full = marker_match.group(0) modify_break_triggered = True
after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full) debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
remainder = modify_probe_buffer[after_marker_index:] break
modify_probe_buffer = "" continue
elif expecting_modify:
modify_probe_buffer += content
if len(modify_probe_buffer) > 10000:
modify_probe_buffer = modify_probe_buffer[-10000:]
if not detected_path: marker_match = re.search(r"<<<MODIFY:\s*([\s\S]*?)>>>", modify_probe_buffer)
debug_log("检测到 MODIFY 起始标记但路径为空,忽略。") if marker_match:
continue detected_raw_path = marker_match.group(1)
detected_path = detected_raw_path.strip()
marker_full = marker_match.group(0)
after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full)
remainder = modify_probe_buffer[after_marker_index:]
modify_probe_buffer = ""
pending_modify = { if not detected_path:
"path": detected_path, debug_log("检测到 MODIFY 起始标记但路径为空,忽略。")
"tool_call_id": None, continue
"buffer": remainder,
"raw_buffer": marker_full + remainder,
"start_marker": marker_full,
"end_marker": "<<<END_MODIFY>>>",
"start_seen": True,
"end_index": None,
"display_id": None,
"detected_blocks": set()
}
if hasattr(web_terminal, "pending_modify_request"):
web_terminal.pending_modify_request = {"path": detected_path}
debug_log(f"直接检测到modify起始标记构建修改缓冲: {detected_path}")
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) pending_modify = {
if end_pos != -1: "path": detected_path,
pending_modify["end_index"] = end_pos "tool_call_id": None,
modify_break_triggered = True "buffer": remainder,
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改") "raw_buffer": marker_full + remainder,
break "start_marker": marker_full,
continue "end_marker": "<<<END_MODIFY>>>",
"start_seen": True,
"end_index": None,
"display_id": None,
"detected_blocks": set()
}
if hasattr(web_terminal, "pending_modify_request"):
web_terminal.pending_modify_request = {"path": detected_path}
debug_log(f"直接检测到modify起始标记构建修改缓冲: {detected_path}")
if pending_append: end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
pending_append["buffer"] += content if end_pos != -1:
pending_modify["end_index"] = end_pos
modify_break_triggered = True
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
break
continue
if pending_append.get("content_start") is None: if pending_append:
marker_index = pending_append["buffer"].find(pending_append["start_marker"]) pending_append["buffer"] += content
if marker_index != -1:
pending_append["content_start"] = marker_index + len(pending_append["start_marker"])
debug_log(f"检测到追加起始标识: {pending_append['start_marker']}")
if pending_append.get("content_start") is not None: if pending_append.get("content_start") is None:
end_index = pending_append["buffer"].find( marker_index = pending_append["buffer"].find(pending_append["start_marker"])
pending_append["end_marker"], if marker_index != -1:
pending_append["content_start"] pending_append["content_start"] = marker_index + len(pending_append["start_marker"])
) debug_log(f"检测到追加起始标识: {pending_append['start_marker']}")
if end_index != -1:
pending_append["end_index"] = end_index
append_break_triggered = True
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
break
# 继续累积追加内容 if pending_append.get("content_start") is not None:
continue end_index = pending_append["buffer"].find(
elif expecting_append: pending_append["end_marker"],
append_probe_buffer += content pending_append["content_start"]
# 限制缓冲区大小防止过长 )
if len(append_probe_buffer) > 10000:
append_probe_buffer = append_probe_buffer[-10000:]
marker_match = re.search(r"<<<APPEND:\s*([\s\S]*?)>>>", append_probe_buffer)
if marker_match:
detected_raw_path = marker_match.group(1)
detected_path = detected_raw_path.strip()
if not detected_path:
append_probe_buffer = append_probe_buffer[marker_match.end():]
continue
marker_full = marker_match.group(0)
after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full)
remainder = append_probe_buffer[after_marker_index:]
append_probe_buffer = ""
pending_append = {
"path": detected_path,
"tool_call_id": None,
"buffer": remainder,
"start_marker": marker_full,
"end_marker": "<<<END_APPEND>>>",
"content_start": 0,
"end_index": None,
"display_id": None
}
if hasattr(web_terminal, "pending_append_request"):
web_terminal.pending_append_request = {"path": detected_path}
debug_log(f"直接检测到append起始标记构建追加缓冲: {detected_path}")
# 检查是否立即包含结束标记
if pending_append["buffer"]:
end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"])
if end_index != -1: if end_index != -1:
pending_append["end_index"] = end_index pending_append["end_index"] = end_index
append_break_triggered = True append_break_triggered = True
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件") debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
break break
# 继续累积追加内容
continue continue
elif expecting_append:
append_probe_buffer += content
# 限制缓冲区大小防止过长
if len(append_probe_buffer) > 10000:
append_probe_buffer = append_probe_buffer[-10000:]
if not text_started: marker_match = re.search(r"<<<APPEND:\s*([\s\S]*?)>>>", append_probe_buffer)
text_started = True if marker_match:
text_streaming = True detected_raw_path = marker_match.group(1)
sender('text_start', {}) detected_path = detected_raw_path.strip()
brief_log("模型输出了内容") if not detected_path:
await asyncio.sleep(0.05) append_probe_buffer = append_probe_buffer[marker_match.end():]
continue
marker_full = marker_match.group(0)
after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full)
remainder = append_probe_buffer[after_marker_index:]
append_probe_buffer = ""
pending_append = {
"path": detected_path,
"tool_call_id": None,
"buffer": remainder,
"start_marker": marker_full,
"end_marker": "<<<END_APPEND>>>",
"content_start": 0,
"end_index": None,
"display_id": None
}
if hasattr(web_terminal, "pending_append_request"):
web_terminal.pending_append_request = {"path": detected_path}
debug_log(f"直接检测到append起始标记构建追加缓冲: {detected_path}")
# 检查是否立即包含结束标记
if pending_append["buffer"]:
end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"])
if end_index != -1:
pending_append["end_index"] = end_index
append_break_triggered = True
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
break
continue
if not pending_append: if not text_started:
full_response += content text_started = True
accumulated_response += content text_streaming = True
text_has_content = True sender('text_start', {})
emit_time = time.time() brief_log("模型输出了内容")
elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time await asyncio.sleep(0.05)
last_text_chunk_time = emit_time
text_chunk_index += 1
log_backend_chunk(
conversation_id,
iteration + 1,
text_chunk_index,
elapsed,
len(content),
content[:32]
)
sender('text_chunk', {
'content': content,
'index': text_chunk_index,
'elapsed': elapsed
})
# 收集工具调用 - 实时发送准备状态 if not pending_append:
if "tool_calls" in delta: full_response += content
tool_chunks += 1 accumulated_response += content
for tc in delta["tool_calls"]: text_has_content = True
found = False emit_time = time.time()
for existing in tool_calls: elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time
if existing.get("index") == tc.get("index"): last_text_chunk_time = emit_time
if "function" in tc and "arguments" in tc["function"]: text_chunk_index += 1
arg_chunk = tc["function"]["arguments"] log_backend_chunk(
existing_fn = existing.get("function", {}) conversation_id,
existing_args = existing_fn.get("arguments", "") iteration + 1,
existing_fn["arguments"] = (existing_args or "") + arg_chunk text_chunk_index,
existing["function"] = existing_fn elapsed,
len(content),
content[:32]
)
sender('text_chunk', {
'content': content,
'index': text_chunk_index,
'elapsed': elapsed
})
combined_args = existing_fn.get("arguments", "") # 收集工具调用 - 实时发送准备状态
tool_id = existing.get("id") or tc.get("id") if "tool_calls" in delta:
tool_name = ( tool_chunks += 1
existing_fn.get("name") for tc in delta["tool_calls"]:
or tc.get("function", {}).get("name", "") found = False
) for existing in tool_calls:
intent_value = extract_intent_from_partial(combined_args) if existing.get("index") == tc.get("index"):
if ( if "function" in tc and "arguments" in tc["function"]:
intent_value arg_chunk = tc["function"]["arguments"]
and tool_id existing_fn = existing.get("function", {})
and detected_tool_intent.get(tool_id) != intent_value existing_args = existing_fn.get("arguments", "")
): existing_fn["arguments"] = (existing_args or "") + arg_chunk
existing["function"] = existing_fn
combined_args = existing_fn.get("arguments", "")
tool_id = existing.get("id") or tc.get("id")
tool_name = (
existing_fn.get("name")
or tc.get("function", {}).get("name", "")
)
intent_value = extract_intent_from_partial(combined_args)
if (
intent_value
and tool_id
and detected_tool_intent.get(tool_id) != intent_value
):
detected_tool_intent[tool_id] = intent_value
brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}")
sender('tool_intent', {
'id': tool_id,
'name': tool_name,
'intent': intent_value,
'conversation_id': conversation_id
})
debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
await asyncio.sleep(0.01)
found = True
break
if not found and tc.get("id"):
tool_id = tc["id"]
tool_name = tc.get("function", {}).get("name", "")
arguments_str = tc.get("function", {}).get("arguments", "") or ""
# 新工具检测到,立即发送准备事件
if tool_id not in detected_tools and tool_name:
detected_tools[tool_id] = tool_name
# 尝试提前提取 intent
intent_value = None
if arguments_str:
intent_value = extract_intent_from_partial(arguments_str)
if intent_value:
detected_tool_intent[tool_id] = intent_value
brief_log(f"[intent] 预提取 {tool_name}: {intent_value}")
# 立即发送工具准备中事件
brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}")
sender('tool_preparing', {
'id': tool_id,
'name': tool_name,
'message': f'准备调用 {tool_name}...',
'intent': intent_value,
'conversation_id': conversation_id
})
debug_log(f" 发送工具准备事件: {tool_name}")
await asyncio.sleep(0.1)
tool_calls.append({
"id": tool_id,
"index": tc.get("index"),
"type": "function",
"function": {
"name": tool_name,
"arguments": arguments_str
}
})
# 尝试从增量参数中抽取 intent并单独推送
if tool_id and arguments_str:
intent_value = extract_intent_from_partial(arguments_str)
if intent_value and detected_tool_intent.get(tool_id) != intent_value:
detected_tool_intent[tool_id] = intent_value detected_tool_intent[tool_id] = intent_value
brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}")
sender('tool_intent', { sender('tool_intent', {
'id': tool_id, 'id': tool_id,
'name': tool_name, 'name': tool_name,
@ -1603,89 +1714,87 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac
}) })
debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
found = True debug_log(f" 新工具: {tool_name}")
break
if not found and tc.get("id"): # 检查是否被停止
tool_id = tc["id"] client_stop_info = get_stop_flag(client_sid, username)
tool_name = tc.get("function", {}).get("name", "") if client_stop_info:
arguments_str = tc.get("function", {}).get("arguments", "") or "" stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
if stop_requested:
debug_log("任务在流处理完成后检测到停止状态")
sender('task_stopped', {
'message': '命令执行被用户取消',
'reason': 'user_stop'
})
_cancel_pending_tools(tool_calls)
clear_stop_flag(client_sid, username)
return
# 新工具检测到,立即发送准备事件 # === API响应完成后只计算输出token ===
if tool_id not in detected_tools and tool_name: if last_usage_payload:
detected_tools[tool_id] = tool_name try:
web_terminal.context_manager.apply_usage_statistics(last_usage_payload)
debug_log(
f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, "
f"completion={last_usage_payload.get('completion_tokens', 0)}, "
f"total={last_usage_payload.get('total_tokens', 0)}"
)
except Exception as e:
debug_log(f"Usage统计更新失败: {e}")
else:
debug_log("未获取到usage字段跳过token统计更新")
# 尝试提前提取 intent
intent_value = None
if arguments_str:
intent_value = extract_intent_from_partial(arguments_str)
if intent_value:
detected_tool_intent[tool_id] = intent_value
brief_log(f"[intent] 预提取 {tool_name}: {intent_value}")
# 立即发送工具准备中事件 if api_error:
brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}") error_message = ""
sender('tool_preparing', { error_status = None
'id': tool_id, error_type = None
'name': tool_name, if isinstance(api_error, dict):
'message': f'准备调用 {tool_name}...', error_status = api_error.get("status_code")
'intent': intent_value, error_type = api_error.get("error_type")
'conversation_id': conversation_id error_message = api_error.get("error_message") or api_error.get("error_text") or ""
}) if not error_message:
debug_log(f" 发送工具准备事件: {tool_name}") error_message = "API 请求失败"
await asyncio.sleep(0.1) # 若命中阿里云配额错误,立即写入状态并切换到官方 API
try:
tool_calls.append({ from utils.aliyun_fallback import compute_disabled_until, set_disabled_until
"id": tool_id, disabled_until, reason = compute_disabled_until(error_message)
"index": tc.get("index"), if disabled_until and reason:
"type": "function", set_disabled_until(getattr(web_terminal, "model_key", None) or "kimi-k2.5", disabled_until, reason)
"function": { profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
"name": tool_name, web_terminal.apply_model_profile(profile)
"arguments": arguments_str except Exception as exc:
} debug_log(f"处理阿里云配额回退失败: {exc}")
}) can_retry = (
# 尝试从增量参数中抽取 intent并单独推送 api_attempt < max_api_retries
if tool_id and arguments_str: and not full_response
intent_value = extract_intent_from_partial(arguments_str) and not tool_calls
if intent_value and detected_tool_intent.get(tool_id) != intent_value: and not current_thinking
detected_tool_intent[tool_id] = intent_value and not pending_append
sender('tool_intent', { and not pending_modify
'id': tool_id,
'name': tool_name,
'intent': intent_value,
'conversation_id': conversation_id
})
debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
await asyncio.sleep(0.01)
debug_log(f" 新工具: {tool_name}")
# 检查是否被停止
client_stop_info = get_stop_flag(client_sid, username)
if client_stop_info:
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
if stop_requested:
debug_log("任务在流处理完成后检测到停止状态")
sender('task_stopped', {
'message': '命令执行被用户取消',
'reason': 'user_stop'
})
_cancel_pending_tools(tool_calls)
clear_stop_flag(client_sid, username)
return
# === API响应完成后只计算输出token ===
if last_usage_payload:
try:
web_terminal.context_manager.apply_usage_statistics(last_usage_payload)
debug_log(
f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, "
f"completion={last_usage_payload.get('completion_tokens', 0)}, "
f"total={last_usage_payload.get('total_tokens', 0)}"
) )
except Exception as e: sender('error', {
debug_log(f"Usage统计更新失败: {e}") 'message': error_message,
else: 'status_code': error_status,
debug_log("未获取到usage字段跳过token统计更新") 'error_type': error_type,
'retry': bool(can_retry),
'retry_in': retry_delay_seconds if can_retry else None,
'attempt': api_attempt + 1,
'max_attempts': max_api_retries + 1
})
if can_retry:
try:
profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
web_terminal.apply_model_profile(profile)
except Exception as exc:
debug_log(f"重试前更新模型配置失败: {exc}")
cancelled = await _wait_retry_delay(retry_delay_seconds)
if cancelled:
return
continue
_cancel_pending_tools(tool_calls)
return
break
# 流结束后的处理 # 流结束后的处理
debug_log(f"\n流结束统计:") debug_log(f"\n流结束统计:")

View File

@ -1042,6 +1042,10 @@ export async function initializeLegacySocket(ctx: any) {
if (!msg) { if (!msg) {
return; return;
} }
if (msg.awaitingFirstContent) {
msg.awaitingFirstContent = false;
msg.generatingLabel = '';
}
const action = { const action = {
id: data.id, id: data.id,
type: 'tool', type: 'tool',
@ -1405,7 +1409,10 @@ export async function initializeLegacySocket(ctx: any) {
const msg = data?.message || '发生未知错误'; const msg = data?.message || '发生未知错误';
const code = data?.status_code; const code = data?.status_code;
const errType = data?.error_type; const errType = data?.error_type;
ctx.addSystemMessage(`错误: ${msg}`); const shouldRetry = Boolean(data?.retry);
const retryIn = Number(data?.retry_in) || 5;
const retryAttempt = Number(data?.attempt) || 1;
const retryMax = Number(data?.max_attempts) || retryAttempt;
if (typeof ctx.uiPushToast === 'function') { if (typeof ctx.uiPushToast === 'function') {
ctx.uiPushToast({ ctx.uiPushToast({
title: code ? `API错误 ${code}` : 'API错误', title: code ? `API错误 ${code}` : 'API错误',
@ -1413,8 +1420,35 @@ export async function initializeLegacySocket(ctx: any) {
type: 'error', type: 'error',
duration: 6000 duration: 6000
}); });
if (shouldRetry) {
ctx.uiPushToast({
title: '即将重试',
message: `将在 ${retryIn} 秒后重试(第 ${retryAttempt}/${retryMax} 次)`,
type: 'info',
duration: Math.max(retryIn, 1) * 1000
});
}
}
if (shouldRetry) {
// 错误后保持停止按钮态,用户可手动停止或等待自动重试
ctx.stopRequested = false;
ctx.taskInProgress = true;
ctx.streamingMessage = true;
return;
}
// 最后一次报错:恢复输入状态并清理提示动画
const msgIndex = typeof ctx.currentMessageIndex === 'number' ? ctx.currentMessageIndex : -1;
if (msgIndex >= 0 && Array.isArray(ctx.messages)) {
const currentMessage = ctx.messages[msgIndex];
if (currentMessage && currentMessage.role === 'assistant') {
currentMessage.awaitingFirstContent = false;
currentMessage.generatingLabel = '';
}
}
if (typeof ctx.chatClearThinkingLocks === 'function') {
ctx.chatClearThinkingLocks();
} }
// 仅标记当前流结束,避免状态错乱
ctx.streamingMessage = false; ctx.streamingMessage = false;
ctx.stopRequested = false; ctx.stopRequested = false;
ctx.taskInProgress = false; ctx.taskInProgress = false;

103
utils/aliyun_fallback.py Normal file
View File

@ -0,0 +1,103 @@
import calendar
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple
# Model keys eligible for the quota fallback; other keys are ignored by this module.
FALLBACK_MODELS = {"qwen3-vl-plus", "kimi-k2.5", "minimax-m2.5"}
# Persisted per-model fallback state (JSON), stored under <project root>/data/.
STATE_PATH = Path(__file__).resolve().parents[1] / "data" / "aliyun_fallback_state.json"
def _read_state() -> Dict:
    """Load persisted fallback state, always normalized to ``{"models": {...}}``."""
    fallback_default = {"models": {}}
    if not STATE_PATH.exists():
        return fallback_default
    try:
        state = json.loads(STATE_PATH.read_text(encoding="utf-8"))
    except Exception:
        # Corrupt or unreadable file: behave as if no state was saved.
        return fallback_default
    if not isinstance(state, dict):
        return fallback_default
    if not isinstance(state.get("models"), dict):
        state["models"] = {}
    return state
def _write_state(data: Dict) -> None:
    """Persist the fallback state as pretty-printed UTF-8 JSON, creating data/ if needed."""
    STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    STATE_PATH.write_text(serialized, encoding="utf-8")
def get_disabled_until(model_key: str) -> Optional[float]:
    """Return the UTC timestamp until which *model_key* is disabled, or None."""
    state = _read_state()
    models = state.get("models") or {}
    entry = models.get(model_key) or {}
    raw_ts = entry.get("disabled_until")
    if raw_ts is None:
        return None
    try:
        return float(raw_ts)
    except (TypeError, ValueError):
        # Malformed persisted value: treat as "not disabled".
        return None
def is_fallback_active(model_key: str, now_ts: Optional[float] = None) -> bool:
    """Check whether the fallback window is currently in effect for *model_key*.

    ``now_ts`` (UTC seconds) defaults to the current time; non-fallback models
    always report False.
    """
    if model_key not in FALLBACK_MODELS:
        return False
    if now_ts is None:
        reference = datetime.now(tz=timezone.utc).timestamp()
    else:
        reference = float(now_ts)
    deadline = get_disabled_until(model_key)
    return bool(deadline and deadline > reference)
def set_disabled_until(model_key: str, disabled_until_ts: float, reason: str = "") -> None:
    """Record that *model_key* is quota-disabled until *disabled_until_ts* (UTC seconds).

    No-op for models outside FALLBACK_MODELS.
    """
    if model_key not in FALLBACK_MODELS:
        return
    state = _read_state()
    entry = {
        "disabled_until": float(disabled_until_ts),
        "reason": reason,
        "updated_at": datetime.now(tz=timezone.utc).timestamp(),
    }
    state.setdefault("models", {})[model_key] = entry
    _write_state(state)
def _next_monday_utc8(now: datetime) -> datetime:
# Monday = 0
weekday = now.weekday()
days_ahead = (7 - weekday) % 7
if days_ahead == 0:
days_ahead = 7
target = (now + timedelta(days=days_ahead)).replace(hour=0, minute=0, second=0, microsecond=0)
return target
def _next_month_same_day_utc8(now: datetime) -> datetime:
year = now.year
month = now.month + 1
if month > 12:
month = 1
year += 1
# clamp day to last day of next month
if month == 12:
next_month = datetime(year + 1, 1, 1, tzinfo=now.tzinfo)
else:
next_month = datetime(year, month + 1, 1, tzinfo=now.tzinfo)
last_day = (next_month - timedelta(days=1)).day
day = min(now.day, last_day)
return datetime(year, month, day, 0, 0, 0, tzinfo=now.tzinfo)
def compute_disabled_until(error_text: str) -> Tuple[Optional[float], Optional[str]]:
    """Map an Aliyun quota error message to (disable-until UTC timestamp, reason).

    Recognizes the hourly / weekly / monthly quota messages (English or Chinese);
    returns (None, None) when the text matches none of them.
    """
    if not error_text:
        return None, None
    lowered = str(error_text).lower()
    beijing = timezone(timedelta(hours=8))
    now = datetime.now(tz=beijing)

    def _hit(en_marker: str, zh_marker: str) -> bool:
        return en_marker in lowered or zh_marker in lowered

    if _hit("hour allocated quota exceeded", "每 5 小时请求额度已用完"):
        deadline = now + timedelta(hours=5)
        return deadline.astimezone(timezone.utc).timestamp(), "hour_quota"
    if _hit("week allocated quota exceeded", "每周请求额度已用完"):
        deadline = _next_monday_utc8(now)
        return deadline.astimezone(timezone.utc).timestamp(), "week_quota"
    if _hit("month allocated quota exceeded", "每月请求额度已用完"):
        deadline = _next_month_same_day_utc8(now)
        return deadline.astimezone(timezone.utc).timestamp(), "month_quota"
    return None, None

View File

@ -6,9 +6,12 @@ import json
import asyncio import asyncio
import base64 import base64
import mimetypes import mimetypes
import os
from typing import List, Dict, Optional, AsyncGenerator, Any from typing import List, Dict, Optional, AsyncGenerator, Any
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from pathlib import Path
from typing import Tuple
try: try:
from config import ( from config import (
API_BASE_URL, API_BASE_URL,
@ -78,6 +81,73 @@ class DeepSeekClient:
# 请求体落盘目录 # 请求体落盘目录
self.request_dump_dir = Path(__file__).resolve().parents[1] / "logs" / "api_requests" self.request_dump_dir = Path(__file__).resolve().parents[1] / "logs" / "api_requests"
self.request_dump_dir.mkdir(parents=True, exist_ok=True) self.request_dump_dir.mkdir(parents=True, exist_ok=True)
self.debug_log_path = Path(__file__).resolve().parents[1] / "logs" / "api_debug.log"
def _maybe_mark_aliyun_quota(self, error_text: str) -> None:
    """Detect an Aliyun quota-exhaustion error and, if matched, activate the fallback.

    Persists the disable window via utils.aliyun_fallback, then repoints both
    the fast and thinking API configs (plus the top-level base URL/key) at the
    official endpoint when official credentials are configured.

    NOTE(review): indentation was reconstructed from a diff render — verify
    that the endpoint-switch block is scoped to the quota-detected branch.
    """
    if not error_text or not self.model_key:
        return
    try:
        # Imported lazily so the client still works if the helper module is absent.
        from utils.aliyun_fallback import compute_disabled_until, set_disabled_until
    except Exception:
        return
    disabled_until, reason = compute_disabled_until(error_text)
    if disabled_until and reason:
        set_disabled_until(self.model_key, disabled_until, reason)
        # Switch to the official API immediately (only when configured).
        base_env_key = None
        key_env_key = None
        if self.model_key == "kimi-k2.5":
            base_env_key = "API_BASE_KIMI_OFFICIAL"
            key_env_key = "API_KEY_KIMI_OFFICIAL"
        elif self.model_key == "qwen3-vl-plus":
            base_env_key = "API_BASE_QWEN_OFFICIAL"
            key_env_key = "API_KEY_QWEN_OFFICIAL"
        elif self.model_key == "minimax-m2.5":
            base_env_key = "API_BASE_MINIMAX_OFFICIAL"
            key_env_key = "API_KEY_MINIMAX_OFFICIAL"
        if base_env_key and key_env_key:
            official_base = self._resolve_env_value(base_env_key)
            official_key = self._resolve_env_value(key_env_key)
            if official_base and official_key:
                # Redirect both request profiles and the client-level defaults.
                self.fast_api_config["base_url"] = official_base
                self.fast_api_config["api_key"] = official_key
                self.thinking_api_config["base_url"] = official_base
                self.thinking_api_config["api_key"] = official_key
                self.api_base_url = official_base
                self.api_key = official_key
def _debug_log(self, payload: Dict[str, Any]) -> None:
try:
entry = {
"ts": datetime.now().isoformat(),
**payload
}
self.debug_log_path.parent.mkdir(parents=True, exist_ok=True)
with self.debug_log_path.open("a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
except Exception:
pass
def _resolve_env_value(self, name: str) -> Optional[str]:
value = os.environ.get(name)
if value is None:
env_path = Path(__file__).resolve().parents[1] / ".env"
if env_path.exists():
try:
for raw_line in env_path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, val = line.split("=", 1)
if key.strip() == name:
value = val.strip().strip('"').strip("'")
break
except Exception:
value = None
if value is None:
return None
value = value.strip()
return value or None
def _print(self, message: str, end: str = "\n", flush: bool = False): def _print(self, message: str, end: str = "\n", flush: bool = False):
"""安全的打印函数在Web模式下不输出""" """安全的打印函数在Web模式下不输出"""
@ -568,7 +638,10 @@ class DeepSeekClient:
"error_text": error_text, "error_text": error_text,
"error_type": None, "error_type": None,
"error_message": None, "error_message": None,
"request_dump": str(dump_path) "request_dump": str(dump_path),
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key
} }
try: try:
parsed = json.loads(error_text) parsed = json.loads(error_text)
@ -578,7 +651,20 @@ class DeepSeekClient:
self.last_error_info["error_message"] = err.get("message") self.last_error_info["error_message"] = err.get("message")
except Exception: except Exception:
pass pass
self._print(f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text}") self._maybe_mark_aliyun_quota(error_text)
self._debug_log({
"event": "http_error_stream",
"status_code": response.status_code,
"error_text": error_text,
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key,
"request_dump": str(dump_path)
})
self._print(
f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text} "
f"(base_url={api_config.get('base_url')}, model_id={api_config.get('model_id')})"
)
self._mark_request_error(dump_path, response.status_code, error_text) self._mark_request_error(dump_path, response.status_code, error_text)
yield {"error": self.last_error_info} yield {"error": self.last_error_info}
return return
@ -607,7 +693,10 @@ class DeepSeekClient:
"error_text": error_text, "error_text": error_text,
"error_type": None, "error_type": None,
"error_message": None, "error_message": None,
"request_dump": str(dump_path) "request_dump": str(dump_path),
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key
} }
try: try:
parsed = response.json() parsed = response.json()
@ -617,7 +706,20 @@ class DeepSeekClient:
self.last_error_info["error_message"] = err.get("message") self.last_error_info["error_message"] = err.get("message")
except Exception: except Exception:
pass pass
self._print(f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text}") self._maybe_mark_aliyun_quota(error_text)
self._debug_log({
"event": "http_error",
"status_code": response.status_code,
"error_text": error_text,
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key,
"request_dump": str(dump_path)
})
self._print(
f"{OUTPUT_FORMATS['error']} API请求失败 ({response.status_code}): {error_text} "
f"(base_url={api_config.get('base_url')}, model_id={api_config.get('model_id')})"
)
self._mark_request_error(dump_path, response.status_code, error_text) self._mark_request_error(dump_path, response.status_code, error_text)
yield {"error": self.last_error_info} yield {"error": self.last_error_info}
return return
@ -632,8 +734,21 @@ class DeepSeekClient:
"error_text": "connect_error", "error_text": "connect_error",
"error_type": "connection_error", "error_type": "connection_error",
"error_message": "无法连接到API服务器", "error_message": "无法连接到API服务器",
"request_dump": str(dump_path) "request_dump": str(dump_path),
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key
} }
self._maybe_mark_aliyun_quota(self.last_error_info.get("error_text"))
self._debug_log({
"event": "connect_error",
"status_code": None,
"error_text": "connect_error",
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key,
"request_dump": str(dump_path)
})
self._mark_request_error(dump_path, error_text="connect_error") self._mark_request_error(dump_path, error_text="connect_error")
yield {"error": self.last_error_info} yield {"error": self.last_error_info}
except httpx.TimeoutException: except httpx.TimeoutException:
@ -643,8 +758,21 @@ class DeepSeekClient:
"error_text": "timeout", "error_text": "timeout",
"error_type": "timeout", "error_type": "timeout",
"error_message": "API请求超时", "error_message": "API请求超时",
"request_dump": str(dump_path) "request_dump": str(dump_path),
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key
} }
self._maybe_mark_aliyun_quota(self.last_error_info.get("error_text"))
self._debug_log({
"event": "timeout",
"status_code": None,
"error_text": "timeout",
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key,
"request_dump": str(dump_path)
})
self._mark_request_error(dump_path, error_text="timeout") self._mark_request_error(dump_path, error_text="timeout")
yield {"error": self.last_error_info} yield {"error": self.last_error_info}
except Exception as e: except Exception as e:
@ -654,8 +782,21 @@ class DeepSeekClient:
"error_text": str(e), "error_text": str(e),
"error_type": "exception", "error_type": "exception",
"error_message": str(e), "error_message": str(e),
"request_dump": str(dump_path) "request_dump": str(dump_path),
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key
} }
self._maybe_mark_aliyun_quota(self.last_error_info.get("error_text"))
self._debug_log({
"event": "exception",
"status_code": None,
"error_text": str(e),
"base_url": api_config.get("base_url"),
"model_id": api_config.get("model_id"),
"model_key": self.model_key,
"request_dump": str(dump_path)
})
self._mark_request_error(dump_path, error_text=str(e)) self._mark_request_error(dump_path, error_text=str(e))
yield {"error": self.last_error_info} yield {"error": self.last_error_info}