refactor: split chat flow streaming loop
This commit is contained in:
parent
7f6a8d8511
commit
5e768a9e41
661
server/chat_flow_stream_loop.py
Normal file
661
server/chat_flow_stream_loop.py
Normal file
@ -0,0 +1,661 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import re
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
from config.model_profiles import get_model_profile
|
||||||
|
|
||||||
|
from .utils_common import debug_log, brief_log, log_backend_chunk
|
||||||
|
from .chat_flow_runner_helpers import extract_intent_from_partial
|
||||||
|
from .chat_flow_pending_writes import finalize_pending_append, finalize_pending_modify
|
||||||
|
from .chat_flow_task_support import wait_retry_delay, cancel_pending_tools
|
||||||
|
from .state import get_stop_flag, clear_stop_flag
|
||||||
|
|
||||||
|
|
||||||
|
async def run_streaming_attempts(*, web_terminal, messages, tools, sender, client_sid: str, username: str, conversation_id: Optional[str], current_iteration: int, max_api_retries: int, retry_delay_seconds: int, pending_append, append_probe_buffer: str, pending_modify, modify_probe_buffer: str, detected_tool_intent: Dict[str, str], full_response: str, tool_calls: list, current_thinking: str, detected_tools: Dict[str, str], last_usage_payload, in_thinking: bool, thinking_started: bool, thinking_ended: bool, text_started: bool, text_has_content: bool, text_streaming: bool, text_chunk_index: int, last_text_chunk_time, chunk_count: int, reasoning_chunks: int, content_chunks: int, tool_chunks: int, append_result: Dict[str, Any], modify_result: Dict[str, Any], last_finish_reason: Optional[str], accumulated_response: str) -> Dict[str, Any]:
|
||||||
|
api_error = None
|
||||||
|
for api_attempt in range(max_api_retries + 1):
|
||||||
|
api_error = None
|
||||||
|
if api_attempt > 0:
|
||||||
|
full_response = ""
|
||||||
|
tool_calls = []
|
||||||
|
current_thinking = ""
|
||||||
|
detected_tools = {}
|
||||||
|
last_usage_payload = None
|
||||||
|
in_thinking = False
|
||||||
|
thinking_started = False
|
||||||
|
thinking_ended = False
|
||||||
|
text_started = False
|
||||||
|
text_has_content = False
|
||||||
|
text_streaming = False
|
||||||
|
text_chunk_index = 0
|
||||||
|
last_text_chunk_time = None
|
||||||
|
chunk_count = 0
|
||||||
|
reasoning_chunks = 0
|
||||||
|
content_chunks = 0
|
||||||
|
tool_chunks = 0
|
||||||
|
append_result = {"handled": False}
|
||||||
|
modify_result = {"handled": False}
|
||||||
|
last_finish_reason = None
|
||||||
|
|
||||||
|
append_break_triggered = False
|
||||||
|
modify_break_triggered = False
|
||||||
|
|
||||||
|
# 收集流式响应
|
||||||
|
async for chunk in web_terminal.api_client.chat(messages, tools, stream=True):
|
||||||
|
chunk_count += 1
|
||||||
|
|
||||||
|
# 检查停止标志
|
||||||
|
client_stop_info = get_stop_flag(client_sid, username)
|
||||||
|
if client_stop_info:
|
||||||
|
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
|
||||||
|
if stop_requested:
|
||||||
|
debug_log(f"检测到停止请求,中断流处理")
|
||||||
|
if pending_append:
|
||||||
|
append_result, pending_append, append_probe_buffer = await finalize_pending_append(pending_append=pending_append, append_probe_buffer=append_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log)
|
||||||
|
if pending_modify:
|
||||||
|
modify_result, pending_modify, modify_probe_buffer = await finalize_pending_modify(pending_modify=pending_modify, modify_probe_buffer=modify_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log)
|
||||||
|
cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
|
||||||
|
sender('task_stopped', {
|
||||||
|
'message': '命令执行被用户取消',
|
||||||
|
'reason': 'user_stop'
|
||||||
|
})
|
||||||
|
clear_stop_flag(client_sid, username)
|
||||||
|
return {
|
||||||
|
"stopped": True,
|
||||||
|
"full_response": full_response,
|
||||||
|
"tool_calls": tool_calls,
|
||||||
|
"current_thinking": current_thinking,
|
||||||
|
"detected_tools": detected_tools,
|
||||||
|
"last_usage_payload": last_usage_payload,
|
||||||
|
"in_thinking": in_thinking,
|
||||||
|
"thinking_started": thinking_started,
|
||||||
|
"thinking_ended": thinking_ended,
|
||||||
|
"text_started": text_started,
|
||||||
|
"text_has_content": text_has_content,
|
||||||
|
"text_streaming": text_streaming,
|
||||||
|
"text_chunk_index": text_chunk_index,
|
||||||
|
"last_text_chunk_time": last_text_chunk_time,
|
||||||
|
"chunk_count": chunk_count,
|
||||||
|
"reasoning_chunks": reasoning_chunks,
|
||||||
|
"content_chunks": content_chunks,
|
||||||
|
"tool_chunks": tool_chunks,
|
||||||
|
"append_result": append_result,
|
||||||
|
"modify_result": modify_result,
|
||||||
|
"last_finish_reason": last_finish_reason,
|
||||||
|
"pending_append": pending_append,
|
||||||
|
"append_probe_buffer": append_probe_buffer,
|
||||||
|
"pending_modify": pending_modify,
|
||||||
|
"modify_probe_buffer": modify_probe_buffer,
|
||||||
|
"accumulated_response": accumulated_response,
|
||||||
|
}
|
||||||
|
|
||||||
|
if isinstance(chunk, dict) and chunk.get("error"):
|
||||||
|
api_error = chunk.get("error")
|
||||||
|
break
|
||||||
|
|
||||||
|
# 先尝试记录 usage(有些平台会在最后一个 chunk 里携带 usage 但 choices 为空)
|
||||||
|
usage_info = chunk.get("usage")
|
||||||
|
if usage_info:
|
||||||
|
last_usage_payload = usage_info
|
||||||
|
|
||||||
|
if "choices" not in chunk:
|
||||||
|
debug_log(f"Chunk {chunk_count}: 无choices字段")
|
||||||
|
continue
|
||||||
|
if not chunk.get("choices"):
|
||||||
|
debug_log(f"Chunk {chunk_count}: choices为空列表")
|
||||||
|
continue
|
||||||
|
choice = chunk["choices"][0]
|
||||||
|
if not usage_info and isinstance(choice, dict) and choice.get("usage"):
|
||||||
|
# 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回)
|
||||||
|
last_usage_payload = choice.get("usage")
|
||||||
|
delta = choice.get("delta", {})
|
||||||
|
finish_reason = choice.get("finish_reason")
|
||||||
|
if finish_reason:
|
||||||
|
last_finish_reason = finish_reason
|
||||||
|
|
||||||
|
# 处理思考内容(兼容 reasoning_content / reasoning_details)
|
||||||
|
reasoning_content = ""
|
||||||
|
if "reasoning_content" in delta:
|
||||||
|
reasoning_content = delta.get("reasoning_content") or ""
|
||||||
|
elif "reasoning_details" in delta:
|
||||||
|
details = delta.get("reasoning_details")
|
||||||
|
if isinstance(details, list):
|
||||||
|
parts = []
|
||||||
|
for item in details:
|
||||||
|
if isinstance(item, dict):
|
||||||
|
text = item.get("text")
|
||||||
|
if text:
|
||||||
|
parts.append(text)
|
||||||
|
if parts:
|
||||||
|
reasoning_content = "".join(parts)
|
||||||
|
if reasoning_content:
|
||||||
|
reasoning_chunks += 1
|
||||||
|
debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符")
|
||||||
|
|
||||||
|
if not thinking_started:
|
||||||
|
in_thinking = True
|
||||||
|
thinking_started = True
|
||||||
|
sender('thinking_start', {})
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
|
||||||
|
current_thinking += reasoning_content
|
||||||
|
sender('thinking_chunk', {'content': reasoning_content})
|
||||||
|
|
||||||
|
# 处理正常内容
|
||||||
|
if "content" in delta:
|
||||||
|
content = delta["content"]
|
||||||
|
if content:
|
||||||
|
content_chunks += 1
|
||||||
|
debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}")
|
||||||
|
|
||||||
|
if in_thinking and not thinking_ended:
|
||||||
|
in_thinking = False
|
||||||
|
thinking_ended = True
|
||||||
|
sender('thinking_end', {'full_content': current_thinking})
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None))
|
||||||
|
expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None))
|
||||||
|
|
||||||
|
if pending_modify:
|
||||||
|
if not pending_modify.get("start_seen"):
|
||||||
|
probe_buffer = pending_modify.get("probe_buffer", "") + content
|
||||||
|
if len(probe_buffer) > 10000:
|
||||||
|
probe_buffer = probe_buffer[-10000:]
|
||||||
|
marker = pending_modify.get("start_marker")
|
||||||
|
marker_index = probe_buffer.find(marker)
|
||||||
|
if marker_index == -1:
|
||||||
|
pending_modify["probe_buffer"] = probe_buffer
|
||||||
|
continue
|
||||||
|
after_marker = marker_index + len(marker)
|
||||||
|
remainder = probe_buffer[after_marker:]
|
||||||
|
pending_modify["buffer"] = remainder
|
||||||
|
pending_modify["raw_buffer"] = marker + remainder
|
||||||
|
pending_modify["start_seen"] = True
|
||||||
|
pending_modify["detected_blocks"] = set()
|
||||||
|
pending_modify["probe_buffer"] = ""
|
||||||
|
if pending_modify.get("display_id"):
|
||||||
|
sender('update_action', {
|
||||||
|
'id': pending_modify["display_id"],
|
||||||
|
'status': 'running',
|
||||||
|
'preparing_id': pending_modify.get("tool_call_id"),
|
||||||
|
'message': f"正在修改 {pending_modify['path']}..."
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
pending_modify["buffer"] += content
|
||||||
|
pending_modify["raw_buffer"] += content
|
||||||
|
|
||||||
|
if pending_modify.get("start_seen"):
|
||||||
|
block_text = pending_modify["buffer"]
|
||||||
|
for match in re.finditer(r"\[replace:(\d+)\]", block_text):
|
||||||
|
try:
|
||||||
|
block_index = int(match.group(1))
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
detected_blocks = pending_modify.setdefault("detected_blocks", set())
|
||||||
|
if block_index not in detected_blocks:
|
||||||
|
detected_blocks.add(block_index)
|
||||||
|
if pending_modify.get("display_id"):
|
||||||
|
sender('update_action', {
|
||||||
|
'id': pending_modify["display_id"],
|
||||||
|
'status': 'running',
|
||||||
|
'preparing_id': pending_modify.get("tool_call_id"),
|
||||||
|
'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..."
|
||||||
|
})
|
||||||
|
|
||||||
|
if pending_modify.get("start_seen"):
|
||||||
|
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
|
||||||
|
if end_pos != -1:
|
||||||
|
pending_modify["end_index"] = end_pos
|
||||||
|
modify_break_triggered = True
|
||||||
|
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
|
||||||
|
break
|
||||||
|
continue
|
||||||
|
elif expecting_modify:
|
||||||
|
modify_probe_buffer += content
|
||||||
|
if len(modify_probe_buffer) > 10000:
|
||||||
|
modify_probe_buffer = modify_probe_buffer[-10000:]
|
||||||
|
|
||||||
|
marker_match = re.search(r"<<<MODIFY:\s*([\s\S]*?)>>>", modify_probe_buffer)
|
||||||
|
if marker_match:
|
||||||
|
detected_raw_path = marker_match.group(1)
|
||||||
|
detected_path = detected_raw_path.strip()
|
||||||
|
marker_full = marker_match.group(0)
|
||||||
|
after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full)
|
||||||
|
remainder = modify_probe_buffer[after_marker_index:]
|
||||||
|
modify_probe_buffer = ""
|
||||||
|
|
||||||
|
if not detected_path:
|
||||||
|
debug_log("检测到 MODIFY 起始标记但路径为空,忽略。")
|
||||||
|
continue
|
||||||
|
|
||||||
|
pending_modify = {
|
||||||
|
"path": detected_path,
|
||||||
|
"tool_call_id": None,
|
||||||
|
"buffer": remainder,
|
||||||
|
"raw_buffer": marker_full + remainder,
|
||||||
|
"start_marker": marker_full,
|
||||||
|
"end_marker": "<<<END_MODIFY>>>",
|
||||||
|
"start_seen": True,
|
||||||
|
"end_index": None,
|
||||||
|
"display_id": None,
|
||||||
|
"detected_blocks": set()
|
||||||
|
}
|
||||||
|
if hasattr(web_terminal, "pending_modify_request"):
|
||||||
|
web_terminal.pending_modify_request = {"path": detected_path}
|
||||||
|
debug_log(f"直接检测到modify起始标记,构建修改缓冲: {detected_path}")
|
||||||
|
|
||||||
|
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
|
||||||
|
if end_pos != -1:
|
||||||
|
pending_modify["end_index"] = end_pos
|
||||||
|
modify_break_triggered = True
|
||||||
|
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
|
||||||
|
break
|
||||||
|
continue
|
||||||
|
|
||||||
|
if pending_append:
|
||||||
|
pending_append["buffer"] += content
|
||||||
|
|
||||||
|
if pending_append.get("content_start") is None:
|
||||||
|
marker_index = pending_append["buffer"].find(pending_append["start_marker"])
|
||||||
|
if marker_index != -1:
|
||||||
|
pending_append["content_start"] = marker_index + len(pending_append["start_marker"])
|
||||||
|
debug_log(f"检测到追加起始标识: {pending_append['start_marker']}")
|
||||||
|
|
||||||
|
if pending_append.get("content_start") is not None:
|
||||||
|
end_index = pending_append["buffer"].find(
|
||||||
|
pending_append["end_marker"],
|
||||||
|
pending_append["content_start"]
|
||||||
|
)
|
||||||
|
if end_index != -1:
|
||||||
|
pending_append["end_index"] = end_index
|
||||||
|
append_break_triggered = True
|
||||||
|
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
|
||||||
|
break
|
||||||
|
|
||||||
|
# 继续累积追加内容
|
||||||
|
continue
|
||||||
|
elif expecting_append:
|
||||||
|
append_probe_buffer += content
|
||||||
|
# 限制缓冲区大小防止过长
|
||||||
|
if len(append_probe_buffer) > 10000:
|
||||||
|
append_probe_buffer = append_probe_buffer[-10000:]
|
||||||
|
|
||||||
|
marker_match = re.search(r"<<<APPEND:\s*([\s\S]*?)>>>", append_probe_buffer)
|
||||||
|
if marker_match:
|
||||||
|
detected_raw_path = marker_match.group(1)
|
||||||
|
detected_path = detected_raw_path.strip()
|
||||||
|
if not detected_path:
|
||||||
|
append_probe_buffer = append_probe_buffer[marker_match.end():]
|
||||||
|
continue
|
||||||
|
marker_full = marker_match.group(0)
|
||||||
|
after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full)
|
||||||
|
remainder = append_probe_buffer[after_marker_index:]
|
||||||
|
append_probe_buffer = ""
|
||||||
|
pending_append = {
|
||||||
|
"path": detected_path,
|
||||||
|
"tool_call_id": None,
|
||||||
|
"buffer": remainder,
|
||||||
|
"start_marker": marker_full,
|
||||||
|
"end_marker": "<<<END_APPEND>>>",
|
||||||
|
"content_start": 0,
|
||||||
|
"end_index": None,
|
||||||
|
"display_id": None
|
||||||
|
}
|
||||||
|
if hasattr(web_terminal, "pending_append_request"):
|
||||||
|
web_terminal.pending_append_request = {"path": detected_path}
|
||||||
|
debug_log(f"直接检测到append起始标记,构建追加缓冲: {detected_path}")
|
||||||
|
# 检查是否立即包含结束标记
|
||||||
|
if pending_append["buffer"]:
|
||||||
|
end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"])
|
||||||
|
if end_index != -1:
|
||||||
|
pending_append["end_index"] = end_index
|
||||||
|
append_break_triggered = True
|
||||||
|
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
|
||||||
|
break
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not text_started:
|
||||||
|
text_started = True
|
||||||
|
text_streaming = True
|
||||||
|
sender('text_start', {})
|
||||||
|
brief_log("模型输出了内容")
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
|
||||||
|
if not pending_append:
|
||||||
|
full_response += content
|
||||||
|
accumulated_response += content
|
||||||
|
text_has_content = True
|
||||||
|
emit_time = time.time()
|
||||||
|
elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time
|
||||||
|
last_text_chunk_time = emit_time
|
||||||
|
text_chunk_index += 1
|
||||||
|
log_backend_chunk(
|
||||||
|
conversation_id,
|
||||||
|
current_iteration,
|
||||||
|
text_chunk_index,
|
||||||
|
elapsed,
|
||||||
|
len(content),
|
||||||
|
content[:32]
|
||||||
|
)
|
||||||
|
sender('text_chunk', {
|
||||||
|
'content': content,
|
||||||
|
'index': text_chunk_index,
|
||||||
|
'elapsed': elapsed
|
||||||
|
})
|
||||||
|
|
||||||
|
# 收集工具调用 - 实时发送准备状态
|
||||||
|
if "tool_calls" in delta:
|
||||||
|
tool_chunks += 1
|
||||||
|
for tc in delta["tool_calls"]:
|
||||||
|
found = False
|
||||||
|
for existing in tool_calls:
|
||||||
|
if existing.get("index") == tc.get("index"):
|
||||||
|
if "function" in tc and "arguments" in tc["function"]:
|
||||||
|
arg_chunk = tc["function"]["arguments"]
|
||||||
|
existing_fn = existing.get("function", {})
|
||||||
|
existing_args = existing_fn.get("arguments", "")
|
||||||
|
existing_fn["arguments"] = (existing_args or "") + arg_chunk
|
||||||
|
existing["function"] = existing_fn
|
||||||
|
|
||||||
|
combined_args = existing_fn.get("arguments", "")
|
||||||
|
tool_id = existing.get("id") or tc.get("id")
|
||||||
|
tool_name = (
|
||||||
|
existing_fn.get("name")
|
||||||
|
or tc.get("function", {}).get("name", "")
|
||||||
|
)
|
||||||
|
intent_value = extract_intent_from_partial(combined_args)
|
||||||
|
if (
|
||||||
|
intent_value
|
||||||
|
and tool_id
|
||||||
|
and detected_tool_intent.get(tool_id) != intent_value
|
||||||
|
):
|
||||||
|
detected_tool_intent[tool_id] = intent_value
|
||||||
|
brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}")
|
||||||
|
sender('tool_intent', {
|
||||||
|
'id': tool_id,
|
||||||
|
'name': tool_name,
|
||||||
|
'intent': intent_value,
|
||||||
|
'conversation_id': conversation_id
|
||||||
|
})
|
||||||
|
debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if not found and tc.get("id"):
|
||||||
|
tool_id = tc["id"]
|
||||||
|
tool_name = tc.get("function", {}).get("name", "")
|
||||||
|
arguments_str = tc.get("function", {}).get("arguments", "") or ""
|
||||||
|
|
||||||
|
# 新工具检测到,立即发送准备事件
|
||||||
|
if tool_id not in detected_tools and tool_name:
|
||||||
|
detected_tools[tool_id] = tool_name
|
||||||
|
|
||||||
|
# 尝试提前提取 intent
|
||||||
|
intent_value = None
|
||||||
|
if arguments_str:
|
||||||
|
intent_value = extract_intent_from_partial(arguments_str)
|
||||||
|
if intent_value:
|
||||||
|
detected_tool_intent[tool_id] = intent_value
|
||||||
|
brief_log(f"[intent] 预提取 {tool_name}: {intent_value}")
|
||||||
|
|
||||||
|
# 立即发送工具准备中事件
|
||||||
|
brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}")
|
||||||
|
sender('tool_preparing', {
|
||||||
|
'id': tool_id,
|
||||||
|
'name': tool_name,
|
||||||
|
'message': f'准备调用 {tool_name}...',
|
||||||
|
'intent': intent_value,
|
||||||
|
'conversation_id': conversation_id
|
||||||
|
})
|
||||||
|
debug_log(f" 发送工具准备事件: {tool_name}")
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
tool_calls.append({
|
||||||
|
"id": tool_id,
|
||||||
|
"index": tc.get("index"),
|
||||||
|
"type": "function",
|
||||||
|
"function": {
|
||||||
|
"name": tool_name,
|
||||||
|
"arguments": arguments_str
|
||||||
|
}
|
||||||
|
})
|
||||||
|
# 尝试从增量参数中抽取 intent,并单独推送
|
||||||
|
if tool_id and arguments_str:
|
||||||
|
intent_value = extract_intent_from_partial(arguments_str)
|
||||||
|
if intent_value and detected_tool_intent.get(tool_id) != intent_value:
|
||||||
|
detected_tool_intent[tool_id] = intent_value
|
||||||
|
sender('tool_intent', {
|
||||||
|
'id': tool_id,
|
||||||
|
'name': tool_name,
|
||||||
|
'intent': intent_value,
|
||||||
|
'conversation_id': conversation_id
|
||||||
|
})
|
||||||
|
debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
debug_log(f" 新工具: {tool_name}")
|
||||||
|
|
||||||
|
# 检查是否被停止
|
||||||
|
client_stop_info = get_stop_flag(client_sid, username)
|
||||||
|
if client_stop_info:
|
||||||
|
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
|
||||||
|
if stop_requested:
|
||||||
|
debug_log("任务在流处理完成后检测到停止状态")
|
||||||
|
sender('task_stopped', {
|
||||||
|
'message': '命令执行被用户取消',
|
||||||
|
'reason': 'user_stop'
|
||||||
|
})
|
||||||
|
cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
|
||||||
|
clear_stop_flag(client_sid, username)
|
||||||
|
return {
|
||||||
|
"stopped": True,
|
||||||
|
"full_response": full_response,
|
||||||
|
"tool_calls": tool_calls,
|
||||||
|
"current_thinking": current_thinking,
|
||||||
|
"detected_tools": detected_tools,
|
||||||
|
"last_usage_payload": last_usage_payload,
|
||||||
|
"in_thinking": in_thinking,
|
||||||
|
"thinking_started": thinking_started,
|
||||||
|
"thinking_ended": thinking_ended,
|
||||||
|
"text_started": text_started,
|
||||||
|
"text_has_content": text_has_content,
|
||||||
|
"text_streaming": text_streaming,
|
||||||
|
"text_chunk_index": text_chunk_index,
|
||||||
|
"last_text_chunk_time": last_text_chunk_time,
|
||||||
|
"chunk_count": chunk_count,
|
||||||
|
"reasoning_chunks": reasoning_chunks,
|
||||||
|
"content_chunks": content_chunks,
|
||||||
|
"tool_chunks": tool_chunks,
|
||||||
|
"append_result": append_result,
|
||||||
|
"modify_result": modify_result,
|
||||||
|
"last_finish_reason": last_finish_reason,
|
||||||
|
"pending_append": pending_append,
|
||||||
|
"append_probe_buffer": append_probe_buffer,
|
||||||
|
"pending_modify": pending_modify,
|
||||||
|
"modify_probe_buffer": modify_probe_buffer,
|
||||||
|
"accumulated_response": accumulated_response,
|
||||||
|
}
|
||||||
|
|
||||||
|
# === API响应完成后只计算输出token ===
|
||||||
|
if last_usage_payload:
|
||||||
|
try:
|
||||||
|
web_terminal.context_manager.apply_usage_statistics(last_usage_payload)
|
||||||
|
debug_log(
|
||||||
|
f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, "
|
||||||
|
f"completion={last_usage_payload.get('completion_tokens', 0)}, "
|
||||||
|
f"total={last_usage_payload.get('total_tokens', 0)}"
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
debug_log(f"Usage统计更新失败: {e}")
|
||||||
|
else:
|
||||||
|
debug_log("未获取到usage字段,跳过token统计更新")
|
||||||
|
|
||||||
|
|
||||||
|
if api_error:
|
||||||
|
try:
|
||||||
|
debug_log(f"API错误原始数据: {json.dumps(api_error, ensure_ascii=False)}")
|
||||||
|
except Exception:
|
||||||
|
debug_log(f"API错误原始数据(不可序列化): {repr(api_error)}")
|
||||||
|
error_message = ""
|
||||||
|
error_status = None
|
||||||
|
error_type = None
|
||||||
|
error_code = None
|
||||||
|
error_text = ""
|
||||||
|
request_dump = None
|
||||||
|
error_base_url = None
|
||||||
|
error_model_id = None
|
||||||
|
if isinstance(api_error, dict):
|
||||||
|
error_status = api_error.get("status_code")
|
||||||
|
error_type = api_error.get("error_type") or api_error.get("type")
|
||||||
|
error_code = api_error.get("error_code") or api_error.get("code")
|
||||||
|
error_text = api_error.get("error_text") or ""
|
||||||
|
error_message = (
|
||||||
|
api_error.get("error_message")
|
||||||
|
or api_error.get("message")
|
||||||
|
or error_text
|
||||||
|
or ""
|
||||||
|
)
|
||||||
|
request_dump = api_error.get("request_dump")
|
||||||
|
error_base_url = api_error.get("base_url")
|
||||||
|
error_model_id = api_error.get("model_id")
|
||||||
|
elif isinstance(api_error, str):
|
||||||
|
error_message = api_error
|
||||||
|
if not error_message:
|
||||||
|
if error_status:
|
||||||
|
error_message = f"API 请求失败(HTTP {error_status})"
|
||||||
|
else:
|
||||||
|
error_message = "API 请求失败"
|
||||||
|
# 若命中阿里云配额错误,立即写入状态并切换到官方 API
|
||||||
|
try:
|
||||||
|
from utils.aliyun_fallback import compute_disabled_until, set_disabled_until
|
||||||
|
disabled_until, reason = compute_disabled_until(error_message)
|
||||||
|
if disabled_until and reason:
|
||||||
|
set_disabled_until(getattr(web_terminal, "model_key", None) or "kimi-k2.5", disabled_until, reason)
|
||||||
|
profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
|
||||||
|
web_terminal.apply_model_profile(profile)
|
||||||
|
except Exception as exc:
|
||||||
|
debug_log(f"处理阿里云配额回退失败: {exc}")
|
||||||
|
can_retry = (
|
||||||
|
api_attempt < max_api_retries
|
||||||
|
and not full_response
|
||||||
|
and not tool_calls
|
||||||
|
and not current_thinking
|
||||||
|
and not pending_append
|
||||||
|
and not pending_modify
|
||||||
|
)
|
||||||
|
sender('error', {
|
||||||
|
'message': error_message,
|
||||||
|
'status_code': error_status,
|
||||||
|
'error_type': error_type,
|
||||||
|
'error_code': error_code,
|
||||||
|
'error_text': error_text,
|
||||||
|
'request_dump': request_dump,
|
||||||
|
'base_url': error_base_url,
|
||||||
|
'model_id': error_model_id,
|
||||||
|
'retry': bool(can_retry),
|
||||||
|
'retry_in': retry_delay_seconds if can_retry else None,
|
||||||
|
'attempt': api_attempt + 1,
|
||||||
|
'max_attempts': max_api_retries + 1
|
||||||
|
})
|
||||||
|
if can_retry:
|
||||||
|
try:
|
||||||
|
profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
|
||||||
|
web_terminal.apply_model_profile(profile)
|
||||||
|
except Exception as exc:
|
||||||
|
debug_log(f"重试前更新模型配置失败: {exc}")
|
||||||
|
cancelled = await wait_retry_delay(delay_seconds=retry_delay_seconds, client_sid=client_sid, username=username, sender=sender, get_stop_flag=get_stop_flag, clear_stop_flag=clear_stop_flag)
|
||||||
|
if cancelled:
|
||||||
|
return {
|
||||||
|
"stopped": True,
|
||||||
|
"full_response": full_response,
|
||||||
|
"tool_calls": tool_calls,
|
||||||
|
"current_thinking": current_thinking,
|
||||||
|
"detected_tools": detected_tools,
|
||||||
|
"last_usage_payload": last_usage_payload,
|
||||||
|
"in_thinking": in_thinking,
|
||||||
|
"thinking_started": thinking_started,
|
||||||
|
"thinking_ended": thinking_ended,
|
||||||
|
"text_started": text_started,
|
||||||
|
"text_has_content": text_has_content,
|
||||||
|
"text_streaming": text_streaming,
|
||||||
|
"text_chunk_index": text_chunk_index,
|
||||||
|
"last_text_chunk_time": last_text_chunk_time,
|
||||||
|
"chunk_count": chunk_count,
|
||||||
|
"reasoning_chunks": reasoning_chunks,
|
||||||
|
"content_chunks": content_chunks,
|
||||||
|
"tool_chunks": tool_chunks,
|
||||||
|
"append_result": append_result,
|
||||||
|
"modify_result": modify_result,
|
||||||
|
"last_finish_reason": last_finish_reason,
|
||||||
|
"pending_append": pending_append,
|
||||||
|
"append_probe_buffer": append_probe_buffer,
|
||||||
|
"pending_modify": pending_modify,
|
||||||
|
"modify_probe_buffer": modify_probe_buffer,
|
||||||
|
"accumulated_response": accumulated_response,
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
|
||||||
|
return {
|
||||||
|
"stopped": True,
|
||||||
|
"full_response": full_response,
|
||||||
|
"tool_calls": tool_calls,
|
||||||
|
"current_thinking": current_thinking,
|
||||||
|
"detected_tools": detected_tools,
|
||||||
|
"last_usage_payload": last_usage_payload,
|
||||||
|
"in_thinking": in_thinking,
|
||||||
|
"thinking_started": thinking_started,
|
||||||
|
"thinking_ended": thinking_ended,
|
||||||
|
"text_started": text_started,
|
||||||
|
"text_has_content": text_has_content,
|
||||||
|
"text_streaming": text_streaming,
|
||||||
|
"text_chunk_index": text_chunk_index,
|
||||||
|
"last_text_chunk_time": last_text_chunk_time,
|
||||||
|
"chunk_count": chunk_count,
|
||||||
|
"reasoning_chunks": reasoning_chunks,
|
||||||
|
"content_chunks": content_chunks,
|
||||||
|
"tool_chunks": tool_chunks,
|
||||||
|
"append_result": append_result,
|
||||||
|
"modify_result": modify_result,
|
||||||
|
"last_finish_reason": last_finish_reason,
|
||||||
|
"pending_append": pending_append,
|
||||||
|
"append_probe_buffer": append_probe_buffer,
|
||||||
|
"pending_modify": pending_modify,
|
||||||
|
"modify_probe_buffer": modify_probe_buffer,
|
||||||
|
"accumulated_response": accumulated_response,
|
||||||
|
}
|
||||||
|
break
|
||||||
|
|
||||||
|
return {
|
||||||
|
"stopped": False,
|
||||||
|
"full_response": full_response,
|
||||||
|
"tool_calls": tool_calls,
|
||||||
|
"current_thinking": current_thinking,
|
||||||
|
"detected_tools": detected_tools,
|
||||||
|
"last_usage_payload": last_usage_payload,
|
||||||
|
"in_thinking": in_thinking,
|
||||||
|
"thinking_started": thinking_started,
|
||||||
|
"thinking_ended": thinking_ended,
|
||||||
|
"text_started": text_started,
|
||||||
|
"text_has_content": text_has_content,
|
||||||
|
"text_streaming": text_streaming,
|
||||||
|
"text_chunk_index": text_chunk_index,
|
||||||
|
"last_text_chunk_time": last_text_chunk_time,
|
||||||
|
"chunk_count": chunk_count,
|
||||||
|
"reasoning_chunks": reasoning_chunks,
|
||||||
|
"content_chunks": content_chunks,
|
||||||
|
"tool_chunks": tool_chunks,
|
||||||
|
"append_result": append_result,
|
||||||
|
"modify_result": modify_result,
|
||||||
|
"last_finish_reason": last_finish_reason,
|
||||||
|
"pending_append": pending_append,
|
||||||
|
"append_probe_buffer": append_probe_buffer,
|
||||||
|
"pending_modify": pending_modify,
|
||||||
|
"modify_probe_buffer": modify_probe_buffer,
|
||||||
|
"accumulated_response": accumulated_response,
|
||||||
|
}
|
||||||
@ -124,8 +124,9 @@ from .chat_flow_runtime import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from .chat_flow_pending_writes import finalize_pending_append, finalize_pending_modify
|
from .chat_flow_pending_writes import finalize_pending_append, finalize_pending_modify
|
||||||
from .chat_flow_task_support import process_sub_agent_updates, wait_retry_delay, cancel_pending_tools
|
from .chat_flow_task_support import process_sub_agent_updates
|
||||||
from .chat_flow_tool_loop import execute_tool_calls
|
from .chat_flow_tool_loop import execute_tool_calls
|
||||||
|
from .chat_flow_stream_loop import run_streaming_attempts
|
||||||
|
|
||||||
async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspace, message, images, sender, client_sid, username: str, videos=None):
|
async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspace, message, images, sender, client_sid, username: str, videos=None):
|
||||||
"""处理任务并发送消息 - 集成token统计版本"""
|
"""处理任务并发送消息 - 集成token统计版本"""
|
||||||
@ -295,9 +296,7 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac
|
|||||||
reasoning_chunks = 0
|
reasoning_chunks = 0
|
||||||
content_chunks = 0
|
content_chunks = 0
|
||||||
tool_chunks = 0
|
tool_chunks = 0
|
||||||
append_break_triggered = False
|
|
||||||
append_result = {"handled": False}
|
append_result = {"handled": False}
|
||||||
modify_break_triggered = False
|
|
||||||
modify_result = {"handled": False}
|
modify_result = {"handled": False}
|
||||||
last_finish_reason = None
|
last_finish_reason = None
|
||||||
|
|
||||||
@ -328,511 +327,72 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac
|
|||||||
tool_call_limit_label = MAX_TOTAL_TOOL_CALLS if MAX_TOTAL_TOOL_CALLS is not None else "∞"
|
tool_call_limit_label = MAX_TOTAL_TOOL_CALLS if MAX_TOTAL_TOOL_CALLS is not None else "∞"
|
||||||
print(f"[API] 第{current_iteration}次调用 (总工具调用: {total_tool_calls}/{tool_call_limit_label})")
|
print(f"[API] 第{current_iteration}次调用 (总工具调用: {total_tool_calls}/{tool_call_limit_label})")
|
||||||
|
|
||||||
api_error = None
|
stream_result = await run_streaming_attempts(
|
||||||
for api_attempt in range(max_api_retries + 1):
|
web_terminal=web_terminal,
|
||||||
api_error = None
|
messages=messages,
|
||||||
if api_attempt > 0:
|
tools=tools,
|
||||||
full_response = ""
|
sender=sender,
|
||||||
tool_calls = []
|
client_sid=client_sid,
|
||||||
current_thinking = ""
|
username=username,
|
||||||
detected_tools = {}
|
conversation_id=conversation_id,
|
||||||
last_usage_payload = None
|
current_iteration=current_iteration,
|
||||||
in_thinking = False
|
max_api_retries=max_api_retries,
|
||||||
thinking_started = False
|
retry_delay_seconds=retry_delay_seconds,
|
||||||
thinking_ended = False
|
pending_append=pending_append,
|
||||||
text_started = False
|
append_probe_buffer=append_probe_buffer,
|
||||||
text_has_content = False
|
pending_modify=pending_modify,
|
||||||
text_streaming = False
|
modify_probe_buffer=modify_probe_buffer,
|
||||||
text_chunk_index = 0
|
detected_tool_intent=detected_tool_intent,
|
||||||
last_text_chunk_time = None
|
full_response=full_response,
|
||||||
chunk_count = 0
|
tool_calls=tool_calls,
|
||||||
reasoning_chunks = 0
|
current_thinking=current_thinking,
|
||||||
content_chunks = 0
|
detected_tools=detected_tools,
|
||||||
tool_chunks = 0
|
last_usage_payload=last_usage_payload,
|
||||||
append_break_triggered = False
|
in_thinking=in_thinking,
|
||||||
append_result = {"handled": False}
|
thinking_started=thinking_started,
|
||||||
modify_break_triggered = False
|
thinking_ended=thinking_ended,
|
||||||
modify_result = {"handled": False}
|
text_started=text_started,
|
||||||
last_finish_reason = None
|
text_has_content=text_has_content,
|
||||||
|
text_streaming=text_streaming,
|
||||||
|
text_chunk_index=text_chunk_index,
|
||||||
|
last_text_chunk_time=last_text_chunk_time,
|
||||||
|
chunk_count=chunk_count,
|
||||||
|
reasoning_chunks=reasoning_chunks,
|
||||||
|
content_chunks=content_chunks,
|
||||||
|
tool_chunks=tool_chunks,
|
||||||
|
append_result=append_result,
|
||||||
|
modify_result=modify_result,
|
||||||
|
last_finish_reason=last_finish_reason,
|
||||||
|
accumulated_response=accumulated_response,
|
||||||
|
)
|
||||||
|
if stream_result.get("stopped"):
|
||||||
|
return
|
||||||
|
|
||||||
# 收集流式响应
|
full_response = stream_result["full_response"]
|
||||||
async for chunk in web_terminal.api_client.chat(messages, tools, stream=True):
|
tool_calls = stream_result["tool_calls"]
|
||||||
chunk_count += 1
|
current_thinking = stream_result["current_thinking"]
|
||||||
|
detected_tools = stream_result["detected_tools"]
|
||||||
# 检查停止标志
|
last_usage_payload = stream_result["last_usage_payload"]
|
||||||
client_stop_info = get_stop_flag(client_sid, username)
|
in_thinking = stream_result["in_thinking"]
|
||||||
if client_stop_info:
|
thinking_started = stream_result["thinking_started"]
|
||||||
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
|
thinking_ended = stream_result["thinking_ended"]
|
||||||
if stop_requested:
|
text_started = stream_result["text_started"]
|
||||||
debug_log(f"检测到停止请求,中断流处理")
|
text_has_content = stream_result["text_has_content"]
|
||||||
if pending_append:
|
text_streaming = stream_result["text_streaming"]
|
||||||
append_result, pending_append, append_probe_buffer = await finalize_pending_append(pending_append=pending_append, append_probe_buffer=append_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log)
|
text_chunk_index = stream_result["text_chunk_index"]
|
||||||
if pending_modify:
|
last_text_chunk_time = stream_result["last_text_chunk_time"]
|
||||||
modify_result, pending_modify, modify_probe_buffer = await finalize_pending_modify(pending_modify=pending_modify, modify_probe_buffer=modify_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log)
|
chunk_count = stream_result["chunk_count"]
|
||||||
cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
|
reasoning_chunks = stream_result["reasoning_chunks"]
|
||||||
sender('task_stopped', {
|
content_chunks = stream_result["content_chunks"]
|
||||||
'message': '命令执行被用户取消',
|
tool_chunks = stream_result["tool_chunks"]
|
||||||
'reason': 'user_stop'
|
append_result = stream_result["append_result"]
|
||||||
})
|
modify_result = stream_result["modify_result"]
|
||||||
clear_stop_flag(client_sid, username)
|
last_finish_reason = stream_result["last_finish_reason"]
|
||||||
return
|
pending_append = stream_result["pending_append"]
|
||||||
|
append_probe_buffer = stream_result["append_probe_buffer"]
|
||||||
if isinstance(chunk, dict) and chunk.get("error"):
|
pending_modify = stream_result["pending_modify"]
|
||||||
api_error = chunk.get("error")
|
modify_probe_buffer = stream_result["modify_probe_buffer"]
|
||||||
break
|
accumulated_response = stream_result["accumulated_response"]
|
||||||
|
|
||||||
# 先尝试记录 usage(有些平台会在最后一个 chunk 里携带 usage 但 choices 为空)
|
|
||||||
usage_info = chunk.get("usage")
|
|
||||||
if usage_info:
|
|
||||||
last_usage_payload = usage_info
|
|
||||||
|
|
||||||
if "choices" not in chunk:
|
|
||||||
debug_log(f"Chunk {chunk_count}: 无choices字段")
|
|
||||||
continue
|
|
||||||
if not chunk.get("choices"):
|
|
||||||
debug_log(f"Chunk {chunk_count}: choices为空列表")
|
|
||||||
continue
|
|
||||||
choice = chunk["choices"][0]
|
|
||||||
if not usage_info and isinstance(choice, dict) and choice.get("usage"):
|
|
||||||
# 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回)
|
|
||||||
last_usage_payload = choice.get("usage")
|
|
||||||
delta = choice.get("delta", {})
|
|
||||||
finish_reason = choice.get("finish_reason")
|
|
||||||
if finish_reason:
|
|
||||||
last_finish_reason = finish_reason
|
|
||||||
|
|
||||||
# 处理思考内容(兼容 reasoning_content / reasoning_details)
|
|
||||||
reasoning_content = ""
|
|
||||||
if "reasoning_content" in delta:
|
|
||||||
reasoning_content = delta.get("reasoning_content") or ""
|
|
||||||
elif "reasoning_details" in delta:
|
|
||||||
details = delta.get("reasoning_details")
|
|
||||||
if isinstance(details, list):
|
|
||||||
parts = []
|
|
||||||
for item in details:
|
|
||||||
if isinstance(item, dict):
|
|
||||||
text = item.get("text")
|
|
||||||
if text:
|
|
||||||
parts.append(text)
|
|
||||||
if parts:
|
|
||||||
reasoning_content = "".join(parts)
|
|
||||||
if reasoning_content:
|
|
||||||
reasoning_chunks += 1
|
|
||||||
debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符")
|
|
||||||
|
|
||||||
if not thinking_started:
|
|
||||||
in_thinking = True
|
|
||||||
thinking_started = True
|
|
||||||
sender('thinking_start', {})
|
|
||||||
await asyncio.sleep(0.05)
|
|
||||||
|
|
||||||
current_thinking += reasoning_content
|
|
||||||
sender('thinking_chunk', {'content': reasoning_content})
|
|
||||||
|
|
||||||
# 处理正常内容
|
|
||||||
if "content" in delta:
|
|
||||||
content = delta["content"]
|
|
||||||
if content:
|
|
||||||
content_chunks += 1
|
|
||||||
debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}")
|
|
||||||
|
|
||||||
if in_thinking and not thinking_ended:
|
|
||||||
in_thinking = False
|
|
||||||
thinking_ended = True
|
|
||||||
sender('thinking_end', {'full_content': current_thinking})
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
|
|
||||||
|
|
||||||
expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None))
|
|
||||||
expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None))
|
|
||||||
|
|
||||||
if pending_modify:
|
|
||||||
if not pending_modify.get("start_seen"):
|
|
||||||
probe_buffer = pending_modify.get("probe_buffer", "") + content
|
|
||||||
if len(probe_buffer) > 10000:
|
|
||||||
probe_buffer = probe_buffer[-10000:]
|
|
||||||
marker = pending_modify.get("start_marker")
|
|
||||||
marker_index = probe_buffer.find(marker)
|
|
||||||
if marker_index == -1:
|
|
||||||
pending_modify["probe_buffer"] = probe_buffer
|
|
||||||
continue
|
|
||||||
after_marker = marker_index + len(marker)
|
|
||||||
remainder = probe_buffer[after_marker:]
|
|
||||||
pending_modify["buffer"] = remainder
|
|
||||||
pending_modify["raw_buffer"] = marker + remainder
|
|
||||||
pending_modify["start_seen"] = True
|
|
||||||
pending_modify["detected_blocks"] = set()
|
|
||||||
pending_modify["probe_buffer"] = ""
|
|
||||||
if pending_modify.get("display_id"):
|
|
||||||
sender('update_action', {
|
|
||||||
'id': pending_modify["display_id"],
|
|
||||||
'status': 'running',
|
|
||||||
'preparing_id': pending_modify.get("tool_call_id"),
|
|
||||||
'message': f"正在修改 {pending_modify['path']}..."
|
|
||||||
})
|
|
||||||
else:
|
|
||||||
pending_modify["buffer"] += content
|
|
||||||
pending_modify["raw_buffer"] += content
|
|
||||||
|
|
||||||
if pending_modify.get("start_seen"):
|
|
||||||
block_text = pending_modify["buffer"]
|
|
||||||
for match in re.finditer(r"\[replace:(\d+)\]", block_text):
|
|
||||||
try:
|
|
||||||
block_index = int(match.group(1))
|
|
||||||
except ValueError:
|
|
||||||
continue
|
|
||||||
detected_blocks = pending_modify.setdefault("detected_blocks", set())
|
|
||||||
if block_index not in detected_blocks:
|
|
||||||
detected_blocks.add(block_index)
|
|
||||||
if pending_modify.get("display_id"):
|
|
||||||
sender('update_action', {
|
|
||||||
'id': pending_modify["display_id"],
|
|
||||||
'status': 'running',
|
|
||||||
'preparing_id': pending_modify.get("tool_call_id"),
|
|
||||||
'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..."
|
|
||||||
})
|
|
||||||
|
|
||||||
if pending_modify.get("start_seen"):
|
|
||||||
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
|
|
||||||
if end_pos != -1:
|
|
||||||
pending_modify["end_index"] = end_pos
|
|
||||||
modify_break_triggered = True
|
|
||||||
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
|
|
||||||
break
|
|
||||||
continue
|
|
||||||
elif expecting_modify:
|
|
||||||
modify_probe_buffer += content
|
|
||||||
if len(modify_probe_buffer) > 10000:
|
|
||||||
modify_probe_buffer = modify_probe_buffer[-10000:]
|
|
||||||
|
|
||||||
marker_match = re.search(r"<<<MODIFY:\s*([\s\S]*?)>>>", modify_probe_buffer)
|
|
||||||
if marker_match:
|
|
||||||
detected_raw_path = marker_match.group(1)
|
|
||||||
detected_path = detected_raw_path.strip()
|
|
||||||
marker_full = marker_match.group(0)
|
|
||||||
after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full)
|
|
||||||
remainder = modify_probe_buffer[after_marker_index:]
|
|
||||||
modify_probe_buffer = ""
|
|
||||||
|
|
||||||
if not detected_path:
|
|
||||||
debug_log("检测到 MODIFY 起始标记但路径为空,忽略。")
|
|
||||||
continue
|
|
||||||
|
|
||||||
pending_modify = {
|
|
||||||
"path": detected_path,
|
|
||||||
"tool_call_id": None,
|
|
||||||
"buffer": remainder,
|
|
||||||
"raw_buffer": marker_full + remainder,
|
|
||||||
"start_marker": marker_full,
|
|
||||||
"end_marker": "<<<END_MODIFY>>>",
|
|
||||||
"start_seen": True,
|
|
||||||
"end_index": None,
|
|
||||||
"display_id": None,
|
|
||||||
"detected_blocks": set()
|
|
||||||
}
|
|
||||||
if hasattr(web_terminal, "pending_modify_request"):
|
|
||||||
web_terminal.pending_modify_request = {"path": detected_path}
|
|
||||||
debug_log(f"直接检测到modify起始标记,构建修改缓冲: {detected_path}")
|
|
||||||
|
|
||||||
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
|
|
||||||
if end_pos != -1:
|
|
||||||
pending_modify["end_index"] = end_pos
|
|
||||||
modify_break_triggered = True
|
|
||||||
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
|
|
||||||
break
|
|
||||||
continue
|
|
||||||
|
|
||||||
if pending_append:
|
|
||||||
pending_append["buffer"] += content
|
|
||||||
|
|
||||||
if pending_append.get("content_start") is None:
|
|
||||||
marker_index = pending_append["buffer"].find(pending_append["start_marker"])
|
|
||||||
if marker_index != -1:
|
|
||||||
pending_append["content_start"] = marker_index + len(pending_append["start_marker"])
|
|
||||||
debug_log(f"检测到追加起始标识: {pending_append['start_marker']}")
|
|
||||||
|
|
||||||
if pending_append.get("content_start") is not None:
|
|
||||||
end_index = pending_append["buffer"].find(
|
|
||||||
pending_append["end_marker"],
|
|
||||||
pending_append["content_start"]
|
|
||||||
)
|
|
||||||
if end_index != -1:
|
|
||||||
pending_append["end_index"] = end_index
|
|
||||||
append_break_triggered = True
|
|
||||||
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
|
|
||||||
break
|
|
||||||
|
|
||||||
# 继续累积追加内容
|
|
||||||
continue
|
|
||||||
elif expecting_append:
|
|
||||||
append_probe_buffer += content
|
|
||||||
# 限制缓冲区大小防止过长
|
|
||||||
if len(append_probe_buffer) > 10000:
|
|
||||||
append_probe_buffer = append_probe_buffer[-10000:]
|
|
||||||
|
|
||||||
marker_match = re.search(r"<<<APPEND:\s*([\s\S]*?)>>>", append_probe_buffer)
|
|
||||||
if marker_match:
|
|
||||||
detected_raw_path = marker_match.group(1)
|
|
||||||
detected_path = detected_raw_path.strip()
|
|
||||||
if not detected_path:
|
|
||||||
append_probe_buffer = append_probe_buffer[marker_match.end():]
|
|
||||||
continue
|
|
||||||
marker_full = marker_match.group(0)
|
|
||||||
after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full)
|
|
||||||
remainder = append_probe_buffer[after_marker_index:]
|
|
||||||
append_probe_buffer = ""
|
|
||||||
pending_append = {
|
|
||||||
"path": detected_path,
|
|
||||||
"tool_call_id": None,
|
|
||||||
"buffer": remainder,
|
|
||||||
"start_marker": marker_full,
|
|
||||||
"end_marker": "<<<END_APPEND>>>",
|
|
||||||
"content_start": 0,
|
|
||||||
"end_index": None,
|
|
||||||
"display_id": None
|
|
||||||
}
|
|
||||||
if hasattr(web_terminal, "pending_append_request"):
|
|
||||||
web_terminal.pending_append_request = {"path": detected_path}
|
|
||||||
debug_log(f"直接检测到append起始标记,构建追加缓冲: {detected_path}")
|
|
||||||
# 检查是否立即包含结束标记
|
|
||||||
if pending_append["buffer"]:
|
|
||||||
end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"])
|
|
||||||
if end_index != -1:
|
|
||||||
pending_append["end_index"] = end_index
|
|
||||||
append_break_triggered = True
|
|
||||||
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
|
|
||||||
break
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not text_started:
|
|
||||||
text_started = True
|
|
||||||
text_streaming = True
|
|
||||||
sender('text_start', {})
|
|
||||||
brief_log("模型输出了内容")
|
|
||||||
await asyncio.sleep(0.05)
|
|
||||||
|
|
||||||
if not pending_append:
|
|
||||||
full_response += content
|
|
||||||
accumulated_response += content
|
|
||||||
text_has_content = True
|
|
||||||
emit_time = time.time()
|
|
||||||
elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time
|
|
||||||
last_text_chunk_time = emit_time
|
|
||||||
text_chunk_index += 1
|
|
||||||
log_backend_chunk(
|
|
||||||
conversation_id,
|
|
||||||
current_iteration,
|
|
||||||
text_chunk_index,
|
|
||||||
elapsed,
|
|
||||||
len(content),
|
|
||||||
content[:32]
|
|
||||||
)
|
|
||||||
sender('text_chunk', {
|
|
||||||
'content': content,
|
|
||||||
'index': text_chunk_index,
|
|
||||||
'elapsed': elapsed
|
|
||||||
})
|
|
||||||
|
|
||||||
# 收集工具调用 - 实时发送准备状态
|
|
||||||
if "tool_calls" in delta:
|
|
||||||
tool_chunks += 1
|
|
||||||
for tc in delta["tool_calls"]:
|
|
||||||
found = False
|
|
||||||
for existing in tool_calls:
|
|
||||||
if existing.get("index") == tc.get("index"):
|
|
||||||
if "function" in tc and "arguments" in tc["function"]:
|
|
||||||
arg_chunk = tc["function"]["arguments"]
|
|
||||||
existing_fn = existing.get("function", {})
|
|
||||||
existing_args = existing_fn.get("arguments", "")
|
|
||||||
existing_fn["arguments"] = (existing_args or "") + arg_chunk
|
|
||||||
existing["function"] = existing_fn
|
|
||||||
|
|
||||||
combined_args = existing_fn.get("arguments", "")
|
|
||||||
tool_id = existing.get("id") or tc.get("id")
|
|
||||||
tool_name = (
|
|
||||||
existing_fn.get("name")
|
|
||||||
or tc.get("function", {}).get("name", "")
|
|
||||||
)
|
|
||||||
intent_value = extract_intent_from_partial(combined_args)
|
|
||||||
if (
|
|
||||||
intent_value
|
|
||||||
and tool_id
|
|
||||||
and detected_tool_intent.get(tool_id) != intent_value
|
|
||||||
):
|
|
||||||
detected_tool_intent[tool_id] = intent_value
|
|
||||||
brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}")
|
|
||||||
sender('tool_intent', {
|
|
||||||
'id': tool_id,
|
|
||||||
'name': tool_name,
|
|
||||||
'intent': intent_value,
|
|
||||||
'conversation_id': conversation_id
|
|
||||||
})
|
|
||||||
debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
|
|
||||||
await asyncio.sleep(0.01)
|
|
||||||
found = True
|
|
||||||
break
|
|
||||||
|
|
||||||
if not found and tc.get("id"):
|
|
||||||
tool_id = tc["id"]
|
|
||||||
tool_name = tc.get("function", {}).get("name", "")
|
|
||||||
arguments_str = tc.get("function", {}).get("arguments", "") or ""
|
|
||||||
|
|
||||||
# 新工具检测到,立即发送准备事件
|
|
||||||
if tool_id not in detected_tools and tool_name:
|
|
||||||
detected_tools[tool_id] = tool_name
|
|
||||||
|
|
||||||
# 尝试提前提取 intent
|
|
||||||
intent_value = None
|
|
||||||
if arguments_str:
|
|
||||||
intent_value = extract_intent_from_partial(arguments_str)
|
|
||||||
if intent_value:
|
|
||||||
detected_tool_intent[tool_id] = intent_value
|
|
||||||
brief_log(f"[intent] 预提取 {tool_name}: {intent_value}")
|
|
||||||
|
|
||||||
# 立即发送工具准备中事件
|
|
||||||
brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}")
|
|
||||||
sender('tool_preparing', {
|
|
||||||
'id': tool_id,
|
|
||||||
'name': tool_name,
|
|
||||||
'message': f'准备调用 {tool_name}...',
|
|
||||||
'intent': intent_value,
|
|
||||||
'conversation_id': conversation_id
|
|
||||||
})
|
|
||||||
debug_log(f" 发送工具准备事件: {tool_name}")
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
|
|
||||||
tool_calls.append({
|
|
||||||
"id": tool_id,
|
|
||||||
"index": tc.get("index"),
|
|
||||||
"type": "function",
|
|
||||||
"function": {
|
|
||||||
"name": tool_name,
|
|
||||||
"arguments": arguments_str
|
|
||||||
}
|
|
||||||
})
|
|
||||||
# 尝试从增量参数中抽取 intent,并单独推送
|
|
||||||
if tool_id and arguments_str:
|
|
||||||
intent_value = extract_intent_from_partial(arguments_str)
|
|
||||||
if intent_value and detected_tool_intent.get(tool_id) != intent_value:
|
|
||||||
detected_tool_intent[tool_id] = intent_value
|
|
||||||
sender('tool_intent', {
|
|
||||||
'id': tool_id,
|
|
||||||
'name': tool_name,
|
|
||||||
'intent': intent_value,
|
|
||||||
'conversation_id': conversation_id
|
|
||||||
})
|
|
||||||
debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
|
|
||||||
await asyncio.sleep(0.01)
|
|
||||||
debug_log(f" 新工具: {tool_name}")
|
|
||||||
|
|
||||||
# 检查是否被停止
|
|
||||||
client_stop_info = get_stop_flag(client_sid, username)
|
|
||||||
if client_stop_info:
|
|
||||||
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
|
|
||||||
if stop_requested:
|
|
||||||
debug_log("任务在流处理完成后检测到停止状态")
|
|
||||||
sender('task_stopped', {
|
|
||||||
'message': '命令执行被用户取消',
|
|
||||||
'reason': 'user_stop'
|
|
||||||
})
|
|
||||||
cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
|
|
||||||
clear_stop_flag(client_sid, username)
|
|
||||||
return
|
|
||||||
|
|
||||||
# === API响应完成后只计算输出token ===
|
|
||||||
if last_usage_payload:
|
|
||||||
try:
|
|
||||||
web_terminal.context_manager.apply_usage_statistics(last_usage_payload)
|
|
||||||
debug_log(
|
|
||||||
f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, "
|
|
||||||
f"completion={last_usage_payload.get('completion_tokens', 0)}, "
|
|
||||||
f"total={last_usage_payload.get('total_tokens', 0)}"
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
debug_log(f"Usage统计更新失败: {e}")
|
|
||||||
else:
|
|
||||||
debug_log("未获取到usage字段,跳过token统计更新")
|
|
||||||
|
|
||||||
|
|
||||||
if api_error:
|
|
||||||
try:
|
|
||||||
debug_log(f"API错误原始数据: {json.dumps(api_error, ensure_ascii=False)}")
|
|
||||||
except Exception:
|
|
||||||
debug_log(f"API错误原始数据(不可序列化): {repr(api_error)}")
|
|
||||||
error_message = ""
|
|
||||||
error_status = None
|
|
||||||
error_type = None
|
|
||||||
error_code = None
|
|
||||||
error_text = ""
|
|
||||||
request_dump = None
|
|
||||||
error_base_url = None
|
|
||||||
error_model_id = None
|
|
||||||
if isinstance(api_error, dict):
|
|
||||||
error_status = api_error.get("status_code")
|
|
||||||
error_type = api_error.get("error_type") or api_error.get("type")
|
|
||||||
error_code = api_error.get("error_code") or api_error.get("code")
|
|
||||||
error_text = api_error.get("error_text") or ""
|
|
||||||
error_message = (
|
|
||||||
api_error.get("error_message")
|
|
||||||
or api_error.get("message")
|
|
||||||
or error_text
|
|
||||||
or ""
|
|
||||||
)
|
|
||||||
request_dump = api_error.get("request_dump")
|
|
||||||
error_base_url = api_error.get("base_url")
|
|
||||||
error_model_id = api_error.get("model_id")
|
|
||||||
elif isinstance(api_error, str):
|
|
||||||
error_message = api_error
|
|
||||||
if not error_message:
|
|
||||||
if error_status:
|
|
||||||
error_message = f"API 请求失败(HTTP {error_status})"
|
|
||||||
else:
|
|
||||||
error_message = "API 请求失败"
|
|
||||||
# 若命中阿里云配额错误,立即写入状态并切换到官方 API
|
|
||||||
try:
|
|
||||||
from utils.aliyun_fallback import compute_disabled_until, set_disabled_until
|
|
||||||
disabled_until, reason = compute_disabled_until(error_message)
|
|
||||||
if disabled_until and reason:
|
|
||||||
set_disabled_until(getattr(web_terminal, "model_key", None) or "kimi-k2.5", disabled_until, reason)
|
|
||||||
profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
|
|
||||||
web_terminal.apply_model_profile(profile)
|
|
||||||
except Exception as exc:
|
|
||||||
debug_log(f"处理阿里云配额回退失败: {exc}")
|
|
||||||
can_retry = (
|
|
||||||
api_attempt < max_api_retries
|
|
||||||
and not full_response
|
|
||||||
and not tool_calls
|
|
||||||
and not current_thinking
|
|
||||||
and not pending_append
|
|
||||||
and not pending_modify
|
|
||||||
)
|
|
||||||
sender('error', {
|
|
||||||
'message': error_message,
|
|
||||||
'status_code': error_status,
|
|
||||||
'error_type': error_type,
|
|
||||||
'error_code': error_code,
|
|
||||||
'error_text': error_text,
|
|
||||||
'request_dump': request_dump,
|
|
||||||
'base_url': error_base_url,
|
|
||||||
'model_id': error_model_id,
|
|
||||||
'retry': bool(can_retry),
|
|
||||||
'retry_in': retry_delay_seconds if can_retry else None,
|
|
||||||
'attempt': api_attempt + 1,
|
|
||||||
'max_attempts': max_api_retries + 1
|
|
||||||
})
|
|
||||||
if can_retry:
|
|
||||||
try:
|
|
||||||
profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
|
|
||||||
web_terminal.apply_model_profile(profile)
|
|
||||||
except Exception as exc:
|
|
||||||
debug_log(f"重试前更新模型配置失败: {exc}")
|
|
||||||
cancelled = await wait_retry_delay(delay_seconds=retry_delay_seconds, client_sid=client_sid, username=username, sender=sender, get_stop_flag=get_stop_flag, clear_stop_flag=clear_stop_flag)
|
|
||||||
if cancelled:
|
|
||||||
return
|
|
||||||
continue
|
|
||||||
cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
|
|
||||||
return
|
|
||||||
break
|
|
||||||
|
|
||||||
# 流结束后的处理
|
# 流结束后的处理
|
||||||
debug_log(f"\n流结束统计:")
|
debug_log(f"\n流结束统计:")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user