from __future__ import annotations

import asyncio
import json
import re
import time
from typing import Any, Dict, Optional

from config.model_profiles import get_model_profile

from .chat_flow_pending_writes import finalize_pending_append, finalize_pending_modify
from .chat_flow_runner_helpers import extract_intent_from_partial
from .chat_flow_task_support import wait_retry_delay, cancel_pending_tools
from .state import get_stop_flag, clear_stop_flag
from .utils_common import debug_log, brief_log, log_backend_chunk

# NOTE(review): the marker syntax below is reconstructed. The previous patterns
# were the literal text "<<>>" with no capture group, although the code reads
# ``match.group(1)`` as the target path (an IndexError on the first match).
# Confirm these strings against the prompt templates that instruct the model.
_MODIFY_START_RE = re.compile(r"<<<MODIFY:\s*([\s\S]+?)>>>")
_MODIFY_END_MARKER = "<<<END_MODIFY>>>"
_APPEND_START_RE = re.compile(r"<<<APPEND:\s*([\s\S]+?)>>>")
_APPEND_END_MARKER = "<<<END_APPEND>>>"

# Matches the "[replace:N]" block headers inside a modify payload.
_REPLACE_BLOCK_RE = re.compile(r"\[replace:(\d+)\]")

# Probe buffers only keep this many trailing characters while waiting for a marker.
_PROBE_BUFFER_LIMIT = 10000

# Model key used when ``web_terminal`` does not expose a ``model_key``.
_DEFAULT_MODEL_KEY = "kimi-k2.5"


def _stop_requested(client_sid: str, username: str) -> bool:
    """Return True when the stop flag for this client/user is set.

    The flag may be stored either as a dict carrying a ``'stop'`` entry or as
    a bare truthy value; both shapes are accepted.
    """
    info = get_stop_flag(client_sid, username)
    if not info:
        return False
    if isinstance(info, dict):
        return bool(info.get('stop', False))
    return bool(info)


def _current_model_key(web_terminal) -> str:
    """Return ``web_terminal.model_key`` or the default key when unset/empty."""
    return getattr(web_terminal, "model_key", None) or _DEFAULT_MODEL_KEY


def _extract_reasoning_text(delta: Dict[str, Any]) -> str:
    """Return the reasoning text carried by a stream delta, or ``""``.

    Supports both the ``reasoning_content`` string field and the
    ``reasoning_details`` list-of-dicts field (each dict's ``text`` is joined).
    ``reasoning_content`` wins when both keys are present.
    """
    if "reasoning_content" in delta:
        return delta.get("reasoning_content") or ""
    if "reasoning_details" in delta:
        details = delta.get("reasoning_details")
        if isinstance(details, list):
            texts = [item.get("text") for item in details if isinstance(item, dict)]
            return "".join(text for text in texts if text)
    return ""


def _describe_api_error(api_error: Any) -> Dict[str, Any]:
    """Normalise an error payload from the API client into event fields.

    Accepts a dict (fields are read with their alternative key names) or a
    plain string (used as the message). A generic message is synthesised when
    none is available. The returned keys match the leading keys of the
    ``'error'`` event emitted to the client.
    """
    info: Dict[str, Any] = {
        'message': "",
        'status_code': None,
        'error_type': None,
        'error_code': None,
        'error_text': "",
        'request_dump': None,
        'base_url': None,
        'model_id': None,
    }
    if isinstance(api_error, dict):
        error_text = api_error.get("error_text") or ""
        info['message'] = (
            api_error.get("error_message")
            or api_error.get("message")
            or error_text
            or ""
        )
        info['status_code'] = api_error.get("status_code")
        info['error_type'] = api_error.get("error_type") or api_error.get("type")
        info['error_code'] = api_error.get("error_code") or api_error.get("code")
        info['error_text'] = error_text
        info['request_dump'] = api_error.get("request_dump")
        info['base_url'] = api_error.get("base_url")
        info['model_id'] = api_error.get("model_id")
    elif isinstance(api_error, str):
        info['message'] = api_error

    if not info['message']:
        status = info['status_code']
        info['message'] = f"API 请求失败(HTTP {status})" if status else "API 请求失败"
    return info


async def run_streaming_attempts(
    *,
    web_terminal,
    messages,
    tools,
    sender,
    client_sid: str,
    username: str,
    conversation_id: Optional[str],
    current_iteration: int,
    max_api_retries: int,
    retry_delay_seconds: int,
    pending_append,
    append_probe_buffer: str,
    pending_modify,
    modify_probe_buffer: str,
    detected_tool_intent: Dict[str, str],
    full_response: str,
    tool_calls: list,
    current_thinking: str,
    detected_tools: Dict[str, str],
    last_usage_payload,
    in_thinking: bool,
    thinking_started: bool,
    thinking_ended: bool,
    text_started: bool,
    text_has_content: bool,
    text_streaming: bool,
    text_chunk_index: int,
    last_text_chunk_time,
    chunk_count: int,
    reasoning_chunks: int,
    content_chunks: int,
    tool_chunks: int,
    append_result: Dict[str, Any],
    modify_result: Dict[str, Any],
    last_finish_reason: Optional[str],
    accumulated_response: str,
) -> Dict[str, Any]:
    """Stream one model response, retrying on API errors, and return the state.

    Each attempt iterates ``web_terminal.api_client.chat(messages, tools,
    stream=True)`` and, per chunk:

    * checks the user's stop flag (finalising pending file writes and
      cancelling pending tools when set);
    * records usage payloads and the finish reason;
    * forwards reasoning text as ``thinking_*`` events;
    * routes content either into a pending modify/append buffer (breaking out
      of the stream once the end marker is seen) or to the client as
      ``text_chunk`` events;
    * accumulates tool-call deltas, emitting ``tool_preparing`` and
      ``tool_intent`` events.

    An error chunk aborts the attempt. It is retried (after
    ``retry_delay_seconds``) only when nothing was produced yet and attempts
    remain; otherwise pending tools are cancelled and the call returns.

    All streaming state is passed in by keyword and handed back in the result
    dict, so the caller can carry it across calls. ``detected_tool_intent`` is
    updated in place and is not part of the result.

    Returns:
        A dict with ``"stopped"`` (True when the user stopped the task, the
        retry wait was cancelled, or a non-retryable API error occurred;
        False on normal completion) plus the current value of every state
        parameter from ``full_response`` through ``accumulated_response``,
        including the pending append/modify dicts and their probe buffers.
    """

    def _result(stopped: bool) -> Dict[str, Any]:
        # Single source of truth for the result shape; reads the enclosing
        # variables at call time, so it always reflects the latest state.
        return {
            "stopped": stopped,
            "full_response": full_response,
            "tool_calls": tool_calls,
            "current_thinking": current_thinking,
            "detected_tools": detected_tools,
            "last_usage_payload": last_usage_payload,
            "in_thinking": in_thinking,
            "thinking_started": thinking_started,
            "thinking_ended": thinking_ended,
            "text_started": text_started,
            "text_has_content": text_has_content,
            "text_streaming": text_streaming,
            "text_chunk_index": text_chunk_index,
            "last_text_chunk_time": last_text_chunk_time,
            "chunk_count": chunk_count,
            "reasoning_chunks": reasoning_chunks,
            "content_chunks": content_chunks,
            "tool_chunks": tool_chunks,
            "append_result": append_result,
            "modify_result": modify_result,
            "last_finish_reason": last_finish_reason,
            "pending_append": pending_append,
            "append_probe_buffer": append_probe_buffer,
            "pending_modify": pending_modify,
            "modify_probe_buffer": modify_probe_buffer,
            "accumulated_response": accumulated_response,
        }

    api_error = None
    for api_attempt in range(max_api_retries + 1):
        api_error = None
        if api_attempt > 0:
            # A retry starts from a clean slate. The pending append/modify
            # state is left alone: a retry is only allowed when both are empty.
            full_response = ""
            tool_calls = []
            current_thinking = ""
            detected_tools = {}
            last_usage_payload = None
            in_thinking = False
            thinking_started = False
            thinking_ended = False
            text_started = False
            text_has_content = False
            text_streaming = False
            text_chunk_index = 0
            last_text_chunk_time = None
            chunk_count = 0
            reasoning_chunks = 0
            content_chunks = 0
            tool_chunks = 0
            append_result = {"handled": False}
            modify_result = {"handled": False}
            last_finish_reason = None

        # Collect the streamed response.
        async for chunk in web_terminal.api_client.chat(messages, tools, stream=True):
            chunk_count += 1

            # Honour a stop request before processing the chunk.
            if _stop_requested(client_sid, username):
                debug_log("检测到停止请求,中断流处理")
                if pending_append:
                    append_result, pending_append, append_probe_buffer = await finalize_pending_append(
                        pending_append=pending_append,
                        append_probe_buffer=append_probe_buffer,
                        response_text=full_response,
                        stream_completed=False,
                        finish_reason="user_stop",
                        web_terminal=web_terminal,
                        sender=sender,
                        debug_log=debug_log,
                    )
                if pending_modify:
                    modify_result, pending_modify, modify_probe_buffer = await finalize_pending_modify(
                        pending_modify=pending_modify,
                        modify_probe_buffer=modify_probe_buffer,
                        response_text=full_response,
                        stream_completed=False,
                        finish_reason="user_stop",
                        web_terminal=web_terminal,
                        sender=sender,
                        debug_log=debug_log,
                    )
                cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
                sender('task_stopped', {
                    'message': '命令执行被用户取消',
                    'reason': 'user_stop'
                })
                clear_stop_flag(client_sid, username)
                return _result(True)

            if isinstance(chunk, dict) and chunk.get("error"):
                api_error = chunk.get("error")
                break

            # Record usage first: some platforms attach it to a final chunk
            # whose "choices" is empty.
            usage_info = chunk.get("usage")
            if usage_info:
                last_usage_payload = usage_info

            if "choices" not in chunk:
                debug_log(f"Chunk {chunk_count}: 无choices字段")
                continue
            if not chunk.get("choices"):
                debug_log(f"Chunk {chunk_count}: choices为空列表")
                continue

            choice = chunk["choices"][0]
            if not usage_info and isinstance(choice, dict) and choice.get("usage"):
                # Some vendors (e.g. certain Kimi/Qwen responses) put usage
                # inside the choice instead of on the chunk.
                last_usage_payload = choice.get("usage")

            # `or {}` also covers an explicit `"delta": null`, which would
            # otherwise break the membership tests below.
            delta = choice.get("delta") or {}
            finish_reason = choice.get("finish_reason")
            if finish_reason:
                last_finish_reason = finish_reason

            # Reasoning ("thinking") text.
            reasoning_content = _extract_reasoning_text(delta)
            if reasoning_content:
                reasoning_chunks += 1
                debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符")

                if not thinking_started:
                    in_thinking = True
                    thinking_started = True
                    sender('thinking_start', {})
                    await asyncio.sleep(0.05)

                current_thinking += reasoning_content
                sender('thinking_chunk', {'content': reasoning_content})

            # Regular content.
            content = delta.get("content")
            if content:
                content_chunks += 1
                debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}")

                # The first content chunk closes an open thinking phase.
                if in_thinking and not thinking_ended:
                    in_thinking = False
                    thinking_ended = True
                    sender('thinking_end', {'full_content': current_thinking})
                    await asyncio.sleep(0.1)

                expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None))
                expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None))

                if pending_modify:
                    if not pending_modify.get("start_seen"):
                        # Still looking for the start marker; keep only the
                        # tail of the probe buffer.
                        probe_buffer = pending_modify.get("probe_buffer", "") + content
                        if len(probe_buffer) > _PROBE_BUFFER_LIMIT:
                            probe_buffer = probe_buffer[-_PROBE_BUFFER_LIMIT:]
                        marker = pending_modify.get("start_marker")
                        marker_index = probe_buffer.find(marker)
                        if marker_index == -1:
                            pending_modify["probe_buffer"] = probe_buffer
                            continue
                        remainder = probe_buffer[marker_index + len(marker):]
                        pending_modify["buffer"] = remainder
                        pending_modify["raw_buffer"] = marker + remainder
                        pending_modify["start_seen"] = True
                        pending_modify["detected_blocks"] = set()
                        pending_modify["probe_buffer"] = ""
                        if pending_modify.get("display_id"):
                            sender('update_action', {
                                'id': pending_modify["display_id"],
                                'status': 'running',
                                'preparing_id': pending_modify.get("tool_call_id"),
                                'message': f"正在修改 {pending_modify['path']}..."
                            })
                    else:
                        pending_modify["buffer"] += content
                        pending_modify["raw_buffer"] += content

                    if pending_modify.get("start_seen"):
                        # Report each "[replace:N]" block the first time it appears.
                        for match in _REPLACE_BLOCK_RE.finditer(pending_modify["buffer"]):
                            try:
                                block_index = int(match.group(1))
                            except ValueError:
                                continue
                            detected_blocks = pending_modify.setdefault("detected_blocks", set())
                            if block_index in detected_blocks:
                                continue
                            detected_blocks.add(block_index)
                            if pending_modify.get("display_id"):
                                sender('update_action', {
                                    'id': pending_modify["display_id"],
                                    'status': 'running',
                                    'preparing_id': pending_modify.get("tool_call_id"),
                                    'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..."
                                })

                        end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
                        if end_pos != -1:
                            pending_modify["end_index"] = end_pos
                            debug_log(f"检测到{pending_modify['end_marker']},即将终止流式输出并应用修改")
                            break
                    continue
                elif expecting_modify:
                    modify_probe_buffer += content
                    if len(modify_probe_buffer) > _PROBE_BUFFER_LIMIT:
                        modify_probe_buffer = modify_probe_buffer[-_PROBE_BUFFER_LIMIT:]

                    marker_match = _MODIFY_START_RE.search(modify_probe_buffer)
                    if marker_match:
                        detected_path = marker_match.group(1).strip()
                        marker_full = marker_match.group(0)
                        remainder = modify_probe_buffer[marker_match.end():]
                        modify_probe_buffer = ""
                        if not detected_path:
                            debug_log("检测到 MODIFY 起始标记但路径为空,忽略。")
                            continue
                        pending_modify = {
                            "path": detected_path,
                            "tool_call_id": None,
                            "buffer": remainder,
                            "raw_buffer": marker_full + remainder,
                            "start_marker": marker_full,
                            "end_marker": _MODIFY_END_MARKER,
                            "start_seen": True,
                            "end_index": None,
                            "display_id": None,
                            "detected_blocks": set()
                        }
                        if hasattr(web_terminal, "pending_modify_request"):
                            web_terminal.pending_modify_request = {"path": detected_path}
                        debug_log(f"直接检测到modify起始标记,构建修改缓冲: {detected_path}")

                        end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
                        if end_pos != -1:
                            pending_modify["end_index"] = end_pos
                            debug_log(f"检测到{pending_modify['end_marker']},即将终止流式输出并应用修改")
                            break
                        continue
                    # No marker yet: the content is also streamed as text below.

                if pending_append:
                    pending_append["buffer"] += content

                    if pending_append.get("content_start") is None:
                        marker_index = pending_append["buffer"].find(pending_append["start_marker"])
                        if marker_index != -1:
                            pending_append["content_start"] = marker_index + len(pending_append["start_marker"])
                            debug_log(f"检测到追加起始标识: {pending_append['start_marker']}")

                    if pending_append.get("content_start") is not None:
                        end_index = pending_append["buffer"].find(
                            pending_append["end_marker"],
                            pending_append["content_start"]
                        )
                        if end_index != -1:
                            pending_append["end_index"] = end_index
                            debug_log(f"检测到{pending_append['end_marker']},即将终止流式输出并写入文件")
                            break

                    # Keep accumulating the append payload.
                    continue
                elif expecting_append:
                    append_probe_buffer += content
                    # Cap the buffer so it cannot grow without bound.
                    if len(append_probe_buffer) > _PROBE_BUFFER_LIMIT:
                        append_probe_buffer = append_probe_buffer[-_PROBE_BUFFER_LIMIT:]

                    marker_match = _APPEND_START_RE.search(append_probe_buffer)
                    if marker_match:
                        detected_path = marker_match.group(1).strip()
                        if not detected_path:
                            append_probe_buffer = append_probe_buffer[marker_match.end():]
                            continue
                        marker_full = marker_match.group(0)
                        remainder = append_probe_buffer[marker_match.end():]
                        append_probe_buffer = ""
                        pending_append = {
                            "path": detected_path,
                            "tool_call_id": None,
                            "buffer": remainder,
                            "start_marker": marker_full,
                            "end_marker": _APPEND_END_MARKER,
                            "content_start": 0,
                            "end_index": None,
                            "display_id": None
                        }
                        if hasattr(web_terminal, "pending_append_request"):
                            web_terminal.pending_append_request = {"path": detected_path}
                        debug_log(f"直接检测到append起始标记,构建追加缓冲: {detected_path}")
                        # The end marker may already be in the same chunk.
                        if pending_append["buffer"]:
                            end_index = pending_append["buffer"].find(
                                pending_append["end_marker"],
                                pending_append["content_start"]
                            )
                            if end_index != -1:
                                pending_append["end_index"] = end_index
                                debug_log(f"检测到{pending_append['end_marker']},即将终止流式输出并写入文件")
                                break
                        continue
                    # No marker yet: the content is also streamed as text below.

                # Plain text output. Every branch that holds a pending
                # append/modify has already continued or broken out above.
                if not text_started:
                    text_started = True
                    text_streaming = True
                    sender('text_start', {})
                    brief_log("模型输出了内容")
                    await asyncio.sleep(0.05)

                full_response += content
                accumulated_response += content
                text_has_content = True
                emit_time = time.time()
                elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time
                last_text_chunk_time = emit_time
                text_chunk_index += 1
                log_backend_chunk(
                    conversation_id,
                    current_iteration,
                    text_chunk_index,
                    elapsed,
                    len(content),
                    content[:32]
                )
                sender('text_chunk', {
                    'content': content,
                    'index': text_chunk_index,
                    'elapsed': elapsed
                })

            # Tool calls: accumulate deltas and announce them as they appear.
            if "tool_calls" in delta:
                tool_chunks += 1
                # `or []` / `or {}` / `or ""` below tolerate explicit nulls in
                # the delta instead of raising TypeError.
                for tc in delta["tool_calls"] or []:
                    fn_delta = tc.get("function") or {}
                    found = False
                    for existing in tool_calls:
                        if existing.get("index") != tc.get("index"):
                            continue
                        if "arguments" in fn_delta:
                            arg_chunk = fn_delta["arguments"] or ""
                            existing_fn = existing.get("function", {})
                            existing_fn["arguments"] = (existing_fn.get("arguments", "") or "") + arg_chunk
                            existing["function"] = existing_fn

                            combined_args = existing_fn.get("arguments", "")
                            tool_id = existing.get("id") or tc.get("id")
                            tool_name = existing_fn.get("name") or fn_delta.get("name", "")
                            intent_value = extract_intent_from_partial(combined_args)
                            if (
                                intent_value
                                and tool_id
                                and detected_tool_intent.get(tool_id) != intent_value
                            ):
                                detected_tool_intent[tool_id] = intent_value
                                brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}")
                                sender('tool_intent', {
                                    'id': tool_id,
                                    'name': tool_name,
                                    'intent': intent_value,
                                    'conversation_id': conversation_id
                                })
                                debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
                                await asyncio.sleep(0.01)
                        found = True
                        break

                    if not found and tc.get("id"):
                        tool_id = tc["id"]
                        tool_name = fn_delta.get("name", "")
                        arguments_str = fn_delta.get("arguments", "") or ""
                        # Parse the intent once; it is used by both events below.
                        intent_value = extract_intent_from_partial(arguments_str) if arguments_str else None

                        # A newly seen tool: send the "preparing" event right away.
                        if tool_id not in detected_tools and tool_name:
                            detected_tools[tool_id] = tool_name
                            if intent_value:
                                detected_tool_intent[tool_id] = intent_value
                                brief_log(f"[intent] 预提取 {tool_name}: {intent_value}")

                            brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}")
                            sender('tool_preparing', {
                                'id': tool_id,
                                'name': tool_name,
                                'message': f'准备调用 {tool_name}...',
                                'intent': intent_value,
                                'conversation_id': conversation_id
                            })
                            debug_log(f" 发送工具准备事件: {tool_name}")
                            await asyncio.sleep(0.1)

                        tool_calls.append({
                            "id": tool_id,
                            "index": tc.get("index"),
                            "type": "function",
                            "function": {
                                "name": tool_name,
                                "arguments": arguments_str
                            }
                        })
                        # Push the intent separately unless the "preparing"
                        # event above already carried the same value.
                        if intent_value and detected_tool_intent.get(tool_id) != intent_value:
                            detected_tool_intent[tool_id] = intent_value
                            sender('tool_intent', {
                                'id': tool_id,
                                'name': tool_name,
                                'intent': intent_value,
                                'conversation_id': conversation_id
                            })
                            debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
                            await asyncio.sleep(0.01)
                        debug_log(f" 新工具: {tool_name}")

        # The stream ended (or was broken out of): check the stop flag again.
        if _stop_requested(client_sid, username):
            debug_log("任务在流处理完成后检测到停止状态")
            sender('task_stopped', {
                'message': '命令执行被用户取消',
                'reason': 'user_stop'
            })
            cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
            clear_stop_flag(client_sid, username)
            return _result(True)

        # Apply token usage once the API response is complete.
        if last_usage_payload:
            try:
                web_terminal.context_manager.apply_usage_statistics(last_usage_payload)
                debug_log(
                    f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, "
                    f"completion={last_usage_payload.get('completion_tokens', 0)}, "
                    f"total={last_usage_payload.get('total_tokens', 0)}"
                )
            except Exception as e:
                # Best effort: a statistics failure must not abort the response.
                debug_log(f"Usage统计更新失败: {e}")
        else:
            debug_log("未获取到usage字段,跳过token统计更新")

        if not api_error:
            # The attempt completed without an API error.
            break

        try:
            debug_log(f"API错误原始数据: {json.dumps(api_error, ensure_ascii=False)}")
        except Exception:
            debug_log(f"API错误原始数据(不可序列化): {repr(api_error)}")

        error_info = _describe_api_error(api_error)

        # On an Aliyun quota error, persist the disabled state and switch the
        # model profile immediately.
        try:
            from utils.aliyun_fallback import compute_disabled_until, set_disabled_until
            disabled_until, reason = compute_disabled_until(error_info['message'])
            if disabled_until and reason:
                set_disabled_until(_current_model_key(web_terminal), disabled_until, reason)
                profile = get_model_profile(_current_model_key(web_terminal))
                web_terminal.apply_model_profile(profile)
        except Exception as exc:
            debug_log(f"处理阿里云配额回退失败: {exc}")

        # Retry only while attempts remain and nothing has been produced yet,
        # so output already sent to the client is never duplicated.
        can_retry = (
            api_attempt < max_api_retries
            and not full_response
            and not tool_calls
            and not current_thinking
            and not pending_append
            and not pending_modify
        )
        error_info.update({
            'retry': bool(can_retry),
            'retry_in': retry_delay_seconds if can_retry else None,
            'attempt': api_attempt + 1,
            'max_attempts': max_api_retries + 1
        })
        sender('error', error_info)

        if not can_retry:
            cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
            return _result(True)

        try:
            profile = get_model_profile(_current_model_key(web_terminal))
            web_terminal.apply_model_profile(profile)
        except Exception as exc:
            debug_log(f"重试前更新模型配置失败: {exc}")
        cancelled = await wait_retry_delay(
            delay_seconds=retry_delay_seconds,
            client_sid=client_sid,
            username=username,
            sender=sender,
            get_stop_flag=get_stop_flag,
            clear_stop_flag=clear_stop_flag,
        )
        if cancelled:
            return _result(True)

    return _result(False)