from __future__ import annotations import asyncio import json import time from typing import Any, Dict, Optional from config.model_profiles import get_model_profile from .utils_common import debug_log, brief_log, log_backend_chunk from .chat_flow_runner_helpers import extract_intent_from_partial from .chat_flow_task_support import wait_retry_delay, cancel_pending_tools from .state import get_stop_flag, clear_stop_flag async def run_streaming_attempts(*, web_terminal, messages, tools, sender, client_sid: str, username: str, conversation_id: Optional[str], current_iteration: int, max_api_retries: int, retry_delay_seconds: int, detected_tool_intent: Dict[str, str], full_response: str, tool_calls: list, current_thinking: str, detected_tools: Dict[str, str], last_usage_payload, in_thinking: bool, thinking_started: bool, thinking_ended: bool, text_started: bool, text_has_content: bool, text_streaming: bool, text_chunk_index: int, last_text_chunk_time, chunk_count: int, reasoning_chunks: int, content_chunks: int, tool_chunks: int, last_finish_reason: Optional[str], accumulated_response: str) -> Dict[str, Any]: api_error = None for api_attempt in range(max_api_retries + 1): api_error = None if api_attempt > 0: full_response = "" tool_calls = [] current_thinking = "" detected_tools = {} last_usage_payload = None in_thinking = False thinking_started = False thinking_ended = False text_started = False text_has_content = False text_streaming = False text_chunk_index = 0 last_text_chunk_time = None chunk_count = 0 reasoning_chunks = 0 content_chunks = 0 tool_chunks = 0 last_finish_reason = None # 收集流式响应 async for chunk in web_terminal.api_client.chat(messages, tools, stream=True): chunk_count += 1 # 检查停止标志 client_stop_info = get_stop_flag(client_sid, username) if client_stop_info: stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info if stop_requested: debug_log(f"检测到停止请求,中断流处理") cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages) sender('task_stopped', { 'message': '命令执行被用户取消', 'reason': 'user_stop' }) clear_stop_flag(client_sid, username) return { "stopped": True, "full_response": full_response, "tool_calls": tool_calls, "current_thinking": current_thinking, "detected_tools": detected_tools, "last_usage_payload": last_usage_payload, "in_thinking": in_thinking, "thinking_started": thinking_started, "thinking_ended": thinking_ended, "text_started": text_started, "text_has_content": text_has_content, "text_streaming": text_streaming, "text_chunk_index": text_chunk_index, "last_text_chunk_time": last_text_chunk_time, "chunk_count": chunk_count, "reasoning_chunks": reasoning_chunks, "content_chunks": content_chunks, "tool_chunks": tool_chunks, "last_finish_reason": last_finish_reason, "accumulated_response": accumulated_response, } if isinstance(chunk, dict) and chunk.get("error"): api_error = chunk.get("error") break # 先尝试记录 usage(有些平台会在最后一个 chunk 里携带 usage 但 choices 为空) usage_info = chunk.get("usage") if usage_info: last_usage_payload = usage_info if "choices" not in chunk: debug_log(f"Chunk {chunk_count}: 无choices字段") continue if not chunk.get("choices"): debug_log(f"Chunk {chunk_count}: choices为空列表") continue choice = chunk["choices"][0] if not usage_info and isinstance(choice, dict) and choice.get("usage"): # 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回) last_usage_payload = choice.get("usage") delta = choice.get("delta", {}) finish_reason = choice.get("finish_reason") if finish_reason: last_finish_reason = finish_reason # 处理思考内容(兼容 reasoning_content / reasoning_details) reasoning_content = "" if "reasoning_content" in delta: reasoning_content = delta.get("reasoning_content") or "" elif "reasoning_details" in delta: details = delta.get("reasoning_details") if isinstance(details, list): parts = [] for item in details: if isinstance(item, dict): text = item.get("text") if text: parts.append(text) if parts: reasoning_content = "".join(parts) if reasoning_content: reasoning_chunks += 1 debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符") if not thinking_started: in_thinking = True thinking_started = True sender('thinking_start', {}) await asyncio.sleep(0.05) current_thinking += reasoning_content sender('thinking_chunk', {'content': reasoning_content}) # 处理正常内容 if "content" in delta: content = delta["content"] if content: content_chunks += 1 debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}") if in_thinking and not thinking_ended: in_thinking = False thinking_ended = True sender('thinking_end', {'full_content': current_thinking}) await asyncio.sleep(0.1) if not text_started: text_started = True text_streaming = True sender('text_start', {}) brief_log("模型输出了内容") await asyncio.sleep(0.05) full_response += content accumulated_response += content text_has_content = True emit_time = time.time() elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time last_text_chunk_time = emit_time text_chunk_index += 1 log_backend_chunk( conversation_id, current_iteration, text_chunk_index, elapsed, len(content), content[:32] ) sender('text_chunk', { 'content': content, 'index': text_chunk_index, 'elapsed': elapsed }) # 收集工具调用 - 实时发送准备状态 if "tool_calls" in delta: tool_chunks += 1 for tc in delta["tool_calls"]: found = False for existing in tool_calls: if existing.get("index") == tc.get("index"): if "function" in tc and "arguments" in tc["function"]: arg_chunk = tc["function"]["arguments"] existing_fn = existing.get("function", {}) existing_args = existing_fn.get("arguments", "") existing_fn["arguments"] = (existing_args or "") + (arg_chunk or "") existing["function"] = existing_fn combined_args = existing_fn.get("arguments", "") tool_id = existing.get("id") or tc.get("id") tool_name = ( existing_fn.get("name") or tc.get("function", {}).get("name", "") ) intent_value = extract_intent_from_partial(combined_args) if ( intent_value and tool_id and detected_tool_intent.get(tool_id) != intent_value ): detected_tool_intent[tool_id] = intent_value brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}") sender('tool_intent', { 'id': tool_id, 'name': tool_name, 'intent': intent_value, 'conversation_id': conversation_id }) debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") await asyncio.sleep(0.01) found = True break if not found and tc.get("id"): tool_id = tc["id"] tool_name = tc.get("function", {}).get("name", "") arguments_str = tc.get("function", {}).get("arguments", "") or "" # 新工具检测到,立即发送准备事件 if tool_id not in detected_tools and tool_name: detected_tools[tool_id] = tool_name # 尝试提前提取 intent intent_value = None if arguments_str: intent_value = extract_intent_from_partial(arguments_str) if intent_value: detected_tool_intent[tool_id] = intent_value brief_log(f"[intent] 预提取 {tool_name}: {intent_value}") # 立即发送工具准备中事件 brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}") sender('tool_preparing', { 'id': tool_id, 'name': tool_name, 'message': f'准备调用 {tool_name}...', 'intent': intent_value, 'conversation_id': conversation_id }) debug_log(f" 发送工具准备事件: {tool_name}") await asyncio.sleep(0.1) tool_calls.append({ "id": tool_id, "index": tc.get("index"), "type": "function", "function": { "name": tool_name, "arguments": arguments_str } }) # 尝试从增量参数中抽取 intent,并单独推送 if tool_id and arguments_str: intent_value = extract_intent_from_partial(arguments_str) if intent_value and detected_tool_intent.get(tool_id) != intent_value: detected_tool_intent[tool_id] = intent_value sender('tool_intent', { 'id': tool_id, 'name': tool_name, 'intent': intent_value, 'conversation_id': conversation_id }) debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") await asyncio.sleep(0.01) debug_log(f" 新工具: {tool_name}") # 检查是否被停止 client_stop_info = get_stop_flag(client_sid, username) if client_stop_info: stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info if stop_requested: debug_log("任务在流处理完成后检测到停止状态") sender('task_stopped', { 'message': '命令执行被用户取消', 'reason': 'user_stop' }) cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages) clear_stop_flag(client_sid, username) return { "stopped": True, "full_response": full_response, "tool_calls": tool_calls, "current_thinking": current_thinking, "detected_tools": detected_tools, "last_usage_payload": last_usage_payload, "in_thinking": in_thinking, "thinking_started": thinking_started, "thinking_ended": thinking_ended, "text_started": text_started, "text_has_content": text_has_content, "text_streaming": text_streaming, "text_chunk_index": text_chunk_index, "last_text_chunk_time": last_text_chunk_time, "chunk_count": chunk_count, "reasoning_chunks": reasoning_chunks, "content_chunks": content_chunks, "tool_chunks": tool_chunks, "last_finish_reason": last_finish_reason, "accumulated_response": accumulated_response, } # === API响应完成后只计算输出token === if last_usage_payload: try: web_terminal.context_manager.apply_usage_statistics(last_usage_payload) debug_log( f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, " f"completion={last_usage_payload.get('completion_tokens', 0)}, " f"total={last_usage_payload.get('total_tokens', 0)}" ) except Exception as e: debug_log(f"Usage统计更新失败: {e}") else: debug_log("未获取到usage字段,跳过token统计更新") if api_error: try: debug_log(f"API错误原始数据: {json.dumps(api_error, ensure_ascii=False)}") except Exception: debug_log(f"API错误原始数据(不可序列化): {repr(api_error)}") error_message = "" error_status = None error_type = None error_code = None error_text = "" request_dump = None error_base_url = None error_model_id = None if isinstance(api_error, dict): error_status = api_error.get("status_code") error_type = api_error.get("error_type") or api_error.get("type") error_code = api_error.get("error_code") or api_error.get("code") error_text = api_error.get("error_text") or "" error_message = ( api_error.get("error_message") or api_error.get("message") or error_text or "" ) request_dump = api_error.get("request_dump") error_base_url = api_error.get("base_url") error_model_id = api_error.get("model_id") elif isinstance(api_error, str): error_message = api_error if not error_message: if error_status: error_message = f"API 请求失败(HTTP {error_status})" else: error_message = "API 请求失败" # 若命中阿里云配额错误,立即写入状态并切换到官方 API try: from utils.aliyun_fallback import compute_disabled_until, set_disabled_until disabled_until, reason = compute_disabled_until(error_message) if disabled_until and reason: set_disabled_until(getattr(web_terminal, "model_key", None) or "kimi-k2.5", disabled_until, reason) profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5") web_terminal.apply_model_profile(profile) except Exception as exc: debug_log(f"处理阿里云配额回退失败: {exc}") can_retry = ( api_attempt < max_api_retries and not full_response and not tool_calls and not current_thinking ) sender('error', { 'message': error_message, 'status_code': error_status, 'error_type': error_type, 'error_code': error_code, 'error_text': error_text, 'request_dump': request_dump, 'base_url': error_base_url, 'model_id': error_model_id, 'retry': bool(can_retry), 'retry_in': retry_delay_seconds if can_retry else None, 'attempt': api_attempt + 1, 'max_attempts': max_api_retries + 1 }) if can_retry: try: profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5") web_terminal.apply_model_profile(profile) except Exception as exc: debug_log(f"重试前更新模型配置失败: {exc}") cancelled = await wait_retry_delay(delay_seconds=retry_delay_seconds, client_sid=client_sid, username=username, sender=sender, get_stop_flag=get_stop_flag, clear_stop_flag=clear_stop_flag) if cancelled: return { "stopped": True, "full_response": full_response, "tool_calls": tool_calls, "current_thinking": current_thinking, "detected_tools": detected_tools, "last_usage_payload": last_usage_payload, "in_thinking": in_thinking, "thinking_started": thinking_started, "thinking_ended": thinking_ended, "text_started": text_started, "text_has_content": text_has_content, "text_streaming": text_streaming, "text_chunk_index": text_chunk_index, "last_text_chunk_time": last_text_chunk_time, "chunk_count": chunk_count, "reasoning_chunks": reasoning_chunks, "content_chunks": content_chunks, "tool_chunks": tool_chunks, "last_finish_reason": last_finish_reason, "accumulated_response": accumulated_response, } continue cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages) return { "stopped": True, "full_response": full_response, "tool_calls": tool_calls, "current_thinking": current_thinking, "detected_tools": detected_tools, "last_usage_payload": last_usage_payload, "in_thinking": in_thinking, "thinking_started": thinking_started, "thinking_ended": thinking_ended, "text_started": text_started, "text_has_content": text_has_content, "text_streaming": text_streaming, "text_chunk_index": text_chunk_index, "last_text_chunk_time": last_text_chunk_time, "chunk_count": chunk_count, "reasoning_chunks": reasoning_chunks, "content_chunks": content_chunks, "tool_chunks": tool_chunks, "last_finish_reason": last_finish_reason, "accumulated_response": accumulated_response, } break return { "stopped": False, "full_response": full_response, "tool_calls": tool_calls, "current_thinking": current_thinking, "detected_tools": detected_tools, "last_usage_payload": last_usage_payload, "in_thinking": in_thinking, "thinking_started": thinking_started, "thinking_ended": thinking_ended, "text_started": text_started, "text_has_content": text_has_content, "text_streaming": text_streaming, "text_chunk_index": text_chunk_index, "last_text_chunk_time": last_text_chunk_time, "chunk_count": chunk_count, "reasoning_chunks": reasoning_chunks, "content_chunks": content_chunks, "tool_chunks": tool_chunks, "last_finish_reason": last_finish_reason, "accumulated_response": accumulated_response, }