# agent-Specialization/server/chat_flow_stream_loop.py
# NOTE: non-ASCII text below is intentional (Chinese log/UI messages).
from __future__ import annotations
import asyncio
import json
import time
from typing import Any, Dict, Optional
from config.model_profiles import get_model_profile
from .utils_common import debug_log, brief_log, log_backend_chunk
from .chat_flow_runner_helpers import extract_intent_from_partial
from .chat_flow_task_support import wait_retry_delay, cancel_pending_tools
from .state import get_stop_flag, clear_stop_flag
async def run_streaming_attempts(*, web_terminal, messages, tools, sender, client_sid: str, username: str, conversation_id: Optional[str], current_iteration: int, max_api_retries: int, retry_delay_seconds: int, detected_tool_intent: Dict[str, str], full_response: str, tool_calls: list, current_thinking: str, detected_tools: Dict[str, str], last_usage_payload, in_thinking: bool, thinking_started: bool, thinking_ended: bool, text_started: bool, text_has_content: bool, text_streaming: bool, text_chunk_index: int, last_text_chunk_time, chunk_count: int, reasoning_chunks: int, content_chunks: int, tool_chunks: int, last_finish_reason: Optional[str], accumulated_response: str) -> Dict[str, Any]:
    """Stream one assistant turn from the model API, retrying on early errors.

    Consumes ``web_terminal.api_client.chat(..., stream=True)``, forwarding
    thinking / text / tool-call events to the client via ``sender`` as they
    arrive, and honouring the per-client stop flag on every chunk.  When the
    API reports an error before any partial output was produced, the attempt
    is retried up to ``max_api_retries`` times with ``retry_delay_seconds``
    between attempts.

    Returns:
        A snapshot dict of the full streaming state (same keys on every exit
        path); ``"stopped"`` is True when the user cancelled the task or the
        attempt failed unrecoverably, False on normal completion.
    """

    def _stop_requested() -> bool:
        # The stop flag may be a dict ({'stop': bool, ...}) or a bare truthy value.
        info = get_stop_flag(client_sid, username)
        if not info:
            return False
        return bool(info.get('stop', False)) if isinstance(info, dict) else bool(info)

    def _snapshot(stopped: bool) -> Dict[str, Any]:
        # Single source of truth for the state dict returned on every exit path
        # (previously duplicated five times).  Reads the *current* values of the
        # enclosing locals at call time.
        return {
            "stopped": stopped,
            "full_response": full_response,
            "tool_calls": tool_calls,
            "current_thinking": current_thinking,
            "detected_tools": detected_tools,
            "last_usage_payload": last_usage_payload,
            "in_thinking": in_thinking,
            "thinking_started": thinking_started,
            "thinking_ended": thinking_ended,
            "text_started": text_started,
            "text_has_content": text_has_content,
            "text_streaming": text_streaming,
            "text_chunk_index": text_chunk_index,
            "last_text_chunk_time": last_text_chunk_time,
            "chunk_count": chunk_count,
            "reasoning_chunks": reasoning_chunks,
            "content_chunks": content_chunks,
            "tool_chunks": tool_chunks,
            "last_finish_reason": last_finish_reason,
            "accumulated_response": accumulated_response,
        }

    api_error = None
    for api_attempt in range(max_api_retries + 1):
        api_error = None
        if api_attempt > 0:
            # Reset all per-attempt streaming state before retrying.
            full_response = ""
            tool_calls = []
            current_thinking = ""
            detected_tools = {}
            last_usage_payload = None
            in_thinking = False
            thinking_started = False
            thinking_ended = False
            text_started = False
            text_has_content = False
            text_streaming = False
            text_chunk_index = 0
            last_text_chunk_time = None
            chunk_count = 0
            reasoning_chunks = 0
            content_chunks = 0
            tool_chunks = 0
            last_finish_reason = None

        # Collect the streaming response.
        async for chunk in web_terminal.api_client.chat(messages, tools, stream=True):
            chunk_count += 1

            # Check the stop flag on every chunk so cancellation stays responsive.
            if _stop_requested():
                debug_log(f"检测到停止请求,中断流处理")
                cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
                sender('task_stopped', {
                    'message': '命令执行被用户取消',
                    'reason': 'user_stop'
                })
                clear_stop_flag(client_sid, username)
                return _snapshot(True)

            if isinstance(chunk, dict) and chunk.get("error"):
                api_error = chunk.get("error")
                break

            # Record usage first: some platforms send usage in a final chunk
            # whose "choices" list is empty.
            usage_info = chunk.get("usage")
            if usage_info:
                last_usage_payload = usage_info
            if "choices" not in chunk:
                debug_log(f"Chunk {chunk_count}: 无choices字段")
                continue
            if not chunk.get("choices"):
                debug_log(f"Chunk {chunk_count}: choices为空列表")
                continue
            choice = chunk["choices"][0]
            if not usage_info and isinstance(choice, dict) and choice.get("usage"):
                # Some providers (e.g. certain Kimi/Qwen responses) nest usage inside the choice.
                last_usage_payload = choice.get("usage")
            delta = choice.get("delta", {})
            finish_reason = choice.get("finish_reason")
            if finish_reason:
                last_finish_reason = finish_reason

            # Thinking content (supports both reasoning_content and reasoning_details).
            reasoning_content = ""
            if "reasoning_content" in delta:
                reasoning_content = delta.get("reasoning_content") or ""
            elif "reasoning_details" in delta:
                details = delta.get("reasoning_details")
                if isinstance(details, list):
                    parts = [
                        item.get("text")
                        for item in details
                        if isinstance(item, dict) and item.get("text")
                    ]
                    if parts:
                        reasoning_content = "".join(parts)
            if reasoning_content:
                reasoning_chunks += 1
                debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符")
                if not thinking_started:
                    in_thinking = True
                    thinking_started = True
                    sender('thinking_start', {})
                    await asyncio.sleep(0.05)
                current_thinking += reasoning_content
                sender('thinking_chunk', {'content': reasoning_content})

            # Normal assistant text.
            if "content" in delta:
                content = delta["content"]
                if content:
                    content_chunks += 1
                    debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}")
                    if in_thinking and not thinking_ended:
                        # First real text closes the thinking phase.
                        in_thinking = False
                        thinking_ended = True
                        sender('thinking_end', {'full_content': current_thinking})
                        await asyncio.sleep(0.1)
                    if not text_started:
                        text_started = True
                        text_streaming = True
                        sender('text_start', {})
                        brief_log("模型输出了内容")
                        await asyncio.sleep(0.05)
                    full_response += content
                    accumulated_response += content
                    text_has_content = True
                    emit_time = time.time()
                    elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time
                    last_text_chunk_time = emit_time
                    text_chunk_index += 1
                    log_backend_chunk(
                        conversation_id,
                        current_iteration,
                        text_chunk_index,
                        elapsed,
                        len(content),
                        content[:32]
                    )
                    sender('text_chunk', {
                        'content': content,
                        'index': text_chunk_index,
                        'elapsed': elapsed
                    })

            # Collect tool calls — emit preparing/intent events in real time.
            if "tool_calls" in delta:
                tool_chunks += 1
                for tc in delta["tool_calls"]:
                    found = False
                    for existing in tool_calls:
                        if existing.get("index") == tc.get("index"):
                            if "function" in tc and "arguments" in tc["function"]:
                                # Append the incremental argument fragment to the existing call.
                                arg_chunk = tc["function"]["arguments"]
                                existing_fn = existing.get("function", {})
                                existing_args = existing_fn.get("arguments", "")
                                existing_fn["arguments"] = (existing_args or "") + (arg_chunk or "")
                                existing["function"] = existing_fn
                                combined_args = existing_fn.get("arguments", "")
                                tool_id = existing.get("id") or tc.get("id")
                                tool_name = (
                                    existing_fn.get("name")
                                    or tc.get("function", {}).get("name", "")
                                )
                                intent_value = extract_intent_from_partial(combined_args)
                                if (
                                    intent_value
                                    and tool_id
                                    and detected_tool_intent.get(tool_id) != intent_value
                                ):
                                    detected_tool_intent[tool_id] = intent_value
                                    brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}")
                                    sender('tool_intent', {
                                        'id': tool_id,
                                        'name': tool_name,
                                        'intent': intent_value,
                                        'conversation_id': conversation_id
                                    })
                                    debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
                                    await asyncio.sleep(0.01)
                            found = True
                            break
                    if not found and tc.get("id"):
                        tool_id = tc["id"]
                        tool_name = tc.get("function", {}).get("name", "")
                        arguments_str = tc.get("function", {}).get("arguments", "") or ""
                        # New tool detected: send a preparing event immediately.
                        if tool_id not in detected_tools and tool_name:
                            detected_tools[tool_id] = tool_name
                            # Try to extract the intent early from the first argument fragment.
                            intent_value = None
                            if arguments_str:
                                intent_value = extract_intent_from_partial(arguments_str)
                                if intent_value:
                                    detected_tool_intent[tool_id] = intent_value
                                    brief_log(f"[intent] 预提取 {tool_name}: {intent_value}")
                            brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}")
                            sender('tool_preparing', {
                                'id': tool_id,
                                'name': tool_name,
                                'message': f'准备调用 {tool_name}...',
                                'intent': intent_value,
                                'conversation_id': conversation_id
                            })
                            debug_log(f" 发送工具准备事件: {tool_name}")
                            await asyncio.sleep(0.1)
                        tool_calls.append({
                            "id": tool_id,
                            "index": tc.get("index"),
                            "type": "function",
                            "function": {
                                "name": tool_name,
                                "arguments": arguments_str
                            }
                        })
                        # Extract intent from the incremental arguments and push it separately.
                        if tool_id and arguments_str:
                            intent_value = extract_intent_from_partial(arguments_str)
                            if intent_value and detected_tool_intent.get(tool_id) != intent_value:
                                detected_tool_intent[tool_id] = intent_value
                                sender('tool_intent', {
                                    'id': tool_id,
                                    'name': tool_name,
                                    'intent': intent_value,
                                    'conversation_id': conversation_id
                                })
                                debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
                                await asyncio.sleep(0.01)
                        debug_log(f" 新工具: {tool_name}")

        # Check the stop flag again once the stream has fully drained.
        if _stop_requested():
            debug_log("任务在流处理完成后检测到停止状态")
            sender('task_stopped', {
                'message': '命令执行被用户取消',
                'reason': 'user_stop'
            })
            cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
            clear_stop_flag(client_sid, username)
            return _snapshot(True)

        # === After the API response completes: apply output-token statistics only. ===
        if last_usage_payload:
            try:
                web_terminal.context_manager.apply_usage_statistics(last_usage_payload)
                debug_log(
                    f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, "
                    f"completion={last_usage_payload.get('completion_tokens', 0)}, "
                    f"total={last_usage_payload.get('total_tokens', 0)}"
                )
            except Exception as e:
                debug_log(f"Usage统计更新失败: {e}")
        else:
            debug_log("未获取到usage字段跳过token统计更新")

        if api_error:
            try:
                debug_log(f"API错误原始数据: {json.dumps(api_error, ensure_ascii=False)}")
            except Exception:
                debug_log(f"API错误原始数据(不可序列化): {repr(api_error)}")
            # Normalise the error payload (dict or plain string) into discrete fields.
            error_message = ""
            error_status = None
            error_type = None
            error_code = None
            error_text = ""
            request_dump = None
            error_base_url = None
            error_model_id = None
            if isinstance(api_error, dict):
                error_status = api_error.get("status_code")
                error_type = api_error.get("error_type") or api_error.get("type")
                error_code = api_error.get("error_code") or api_error.get("code")
                error_text = api_error.get("error_text") or ""
                error_message = (
                    api_error.get("error_message")
                    or api_error.get("message")
                    or error_text
                    or ""
                )
                request_dump = api_error.get("request_dump")
                error_base_url = api_error.get("base_url")
                error_model_id = api_error.get("model_id")
            elif isinstance(api_error, str):
                error_message = api_error
            if not error_message:
                if error_status:
                    error_message = f"API 请求失败HTTP {error_status}"
                else:
                    error_message = "API 请求失败"
            # On an Aliyun quota error, persist the disabled-until state and
            # switch to the official API profile.
            try:
                from utils.aliyun_fallback import compute_disabled_until, set_disabled_until
                disabled_until, reason = compute_disabled_until(error_message)
                if disabled_until and reason:
                    set_disabled_until(getattr(web_terminal, "model_key", None) or "kimi-k2.5", disabled_until, reason)
                    profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
                    web_terminal.apply_model_profile(profile)
            except Exception as exc:
                debug_log(f"处理阿里云配额回退失败: {exc}")
            # Retry only when nothing was produced yet (no partial text/tools/thinking).
            can_retry = (
                api_attempt < max_api_retries
                and not full_response
                and not tool_calls
                and not current_thinking
            )
            sender('error', {
                'message': error_message,
                'status_code': error_status,
                'error_type': error_type,
                'error_code': error_code,
                'error_text': error_text,
                'request_dump': request_dump,
                'base_url': error_base_url,
                'model_id': error_model_id,
                'retry': bool(can_retry),
                'retry_in': retry_delay_seconds if can_retry else None,
                'attempt': api_attempt + 1,
                'max_attempts': max_api_retries + 1
            })
            if can_retry:
                try:
                    # Refresh the model profile before retrying (the fallback above
                    # may have changed which endpoint should be used).
                    profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
                    web_terminal.apply_model_profile(profile)
                except Exception as exc:
                    debug_log(f"重试前更新模型配置失败: {exc}")
                cancelled = await wait_retry_delay(delay_seconds=retry_delay_seconds, client_sid=client_sid, username=username, sender=sender, get_stop_flag=get_stop_flag, clear_stop_flag=clear_stop_flag)
                if cancelled:
                    return _snapshot(True)
                continue
            # Unrecoverable error: cancel any half-built tool calls and report
            # the turn as stopped with whatever partial state exists.
            cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
            return _snapshot(True)
        # Stream finished without error — leave the retry loop.
        break
    return _snapshot(False)