diff --git a/server/chat_flow_stream_loop.py b/server/chat_flow_stream_loop.py new file mode 100644 index 0000000..9e41c5b --- /dev/null +++ b/server/chat_flow_stream_loop.py @@ -0,0 +1,661 @@ +from __future__ import annotations + +import asyncio +import json +import time +import re +from typing import Any, Dict, Optional + +from config.model_profiles import get_model_profile + +from .utils_common import debug_log, brief_log, log_backend_chunk +from .chat_flow_runner_helpers import extract_intent_from_partial +from .chat_flow_pending_writes import finalize_pending_append, finalize_pending_modify +from .chat_flow_task_support import wait_retry_delay, cancel_pending_tools +from .state import get_stop_flag, clear_stop_flag + + +async def run_streaming_attempts(*, web_terminal, messages, tools, sender, client_sid: str, username: str, conversation_id: Optional[str], current_iteration: int, max_api_retries: int, retry_delay_seconds: int, pending_append, append_probe_buffer: str, pending_modify, modify_probe_buffer: str, detected_tool_intent: Dict[str, str], full_response: str, tool_calls: list, current_thinking: str, detected_tools: Dict[str, str], last_usage_payload, in_thinking: bool, thinking_started: bool, thinking_ended: bool, text_started: bool, text_has_content: bool, text_streaming: bool, text_chunk_index: int, last_text_chunk_time, chunk_count: int, reasoning_chunks: int, content_chunks: int, tool_chunks: int, append_result: Dict[str, Any], modify_result: Dict[str, Any], last_finish_reason: Optional[str], accumulated_response: str) -> Dict[str, Any]: + api_error = None + for api_attempt in range(max_api_retries + 1): + api_error = None + if api_attempt > 0: + full_response = "" + tool_calls = [] + current_thinking = "" + detected_tools = {} + last_usage_payload = None + in_thinking = False + thinking_started = False + thinking_ended = False + text_started = False + text_has_content = False + text_streaming = False + text_chunk_index = 0 + last_text_chunk_time = None + chunk_count = 0 + reasoning_chunks = 0 + content_chunks = 0 + tool_chunks = 0 + append_result = {"handled": False} + modify_result = {"handled": False} + last_finish_reason = None + + append_break_triggered = False + modify_break_triggered = False + + # 收集流式响应 + async for chunk in web_terminal.api_client.chat(messages, tools, stream=True): + chunk_count += 1 + + # 检查停止标志 + client_stop_info = get_stop_flag(client_sid, username) + if client_stop_info: + stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info + if stop_requested: + debug_log(f"检测到停止请求,中断流处理") + if pending_append: + append_result, pending_append, append_probe_buffer = await finalize_pending_append(pending_append=pending_append, append_probe_buffer=append_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log) + if pending_modify: + modify_result, pending_modify, modify_probe_buffer = await finalize_pending_modify(pending_modify=pending_modify, modify_probe_buffer=modify_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log) + cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages) + sender('task_stopped', { + 'message': '命令执行被用户取消', + 'reason': 'user_stop' + }) + clear_stop_flag(client_sid, username) + return { + "stopped": True, + "full_response": full_response, + "tool_calls": tool_calls, + "current_thinking": current_thinking, + "detected_tools": detected_tools, + "last_usage_payload": last_usage_payload, + "in_thinking": in_thinking, + "thinking_started": thinking_started, + "thinking_ended": thinking_ended, + "text_started": text_started, + "text_has_content": text_has_content, + "text_streaming": text_streaming, + "text_chunk_index": text_chunk_index, + "last_text_chunk_time": last_text_chunk_time, + "chunk_count": chunk_count, + "reasoning_chunks": reasoning_chunks, + "content_chunks": content_chunks, + "tool_chunks": tool_chunks, + "append_result": append_result, + "modify_result": modify_result, + "last_finish_reason": last_finish_reason, + "pending_append": pending_append, + "append_probe_buffer": append_probe_buffer, + "pending_modify": pending_modify, + "modify_probe_buffer": modify_probe_buffer, + "accumulated_response": accumulated_response, + } + + if isinstance(chunk, dict) and chunk.get("error"): + api_error = chunk.get("error") + break + + # 先尝试记录 usage(有些平台会在最后一个 chunk 里携带 usage 但 choices 为空) + usage_info = chunk.get("usage") + if usage_info: + last_usage_payload = usage_info + + if "choices" not in chunk: + debug_log(f"Chunk {chunk_count}: 无choices字段") + continue + if not chunk.get("choices"): + debug_log(f"Chunk {chunk_count}: choices为空列表") + continue + choice = chunk["choices"][0] + if not usage_info and isinstance(choice, dict) and choice.get("usage"): + # 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回) + last_usage_payload = choice.get("usage") + delta = choice.get("delta", {}) + finish_reason = choice.get("finish_reason") + if finish_reason: + last_finish_reason = finish_reason + + # 处理思考内容(兼容 reasoning_content / reasoning_details) + reasoning_content = "" + if "reasoning_content" in delta: + reasoning_content = delta.get("reasoning_content") or "" + elif "reasoning_details" in delta: + details = delta.get("reasoning_details") + if isinstance(details, list): + parts = [] + for item in details: + if isinstance(item, dict): + text = item.get("text") + if text: + parts.append(text) + if parts: + reasoning_content = "".join(parts) + if reasoning_content: + reasoning_chunks += 1 + debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符") + + if not thinking_started: + in_thinking = True + thinking_started = True + sender('thinking_start', {}) + await asyncio.sleep(0.05) + + current_thinking += reasoning_content + sender('thinking_chunk', {'content': reasoning_content}) + + # 处理正常内容 + if "content" in delta: + content = delta["content"] + if content: + content_chunks += 1 + debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}") + + if in_thinking and not thinking_ended: + in_thinking = False + thinking_ended = True + sender('thinking_end', {'full_content': current_thinking}) + await asyncio.sleep(0.1) + + + expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None)) + expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None)) + + if pending_modify: + if not pending_modify.get("start_seen"): + probe_buffer = pending_modify.get("probe_buffer", "") + content + if len(probe_buffer) > 10000: + probe_buffer = probe_buffer[-10000:] + marker = pending_modify.get("start_marker") + marker_index = probe_buffer.find(marker) + if marker_index == -1: + pending_modify["probe_buffer"] = probe_buffer + continue + after_marker = marker_index + len(marker) + remainder = probe_buffer[after_marker:] + pending_modify["buffer"] = remainder + pending_modify["raw_buffer"] = marker + remainder + pending_modify["start_seen"] = True + pending_modify["detected_blocks"] = set() + pending_modify["probe_buffer"] = "" + if pending_modify.get("display_id"): + sender('update_action', { + 'id': pending_modify["display_id"], + 'status': 'running', + 'preparing_id': pending_modify.get("tool_call_id"), + 'message': f"正在修改 {pending_modify['path']}..." + }) + else: + pending_modify["buffer"] += content + pending_modify["raw_buffer"] += content + + if pending_modify.get("start_seen"): + block_text = pending_modify["buffer"] + for match in re.finditer(r"\[replace:(\d+)\]", block_text): + try: + block_index = int(match.group(1)) + except ValueError: + continue + detected_blocks = pending_modify.setdefault("detected_blocks", set()) + if block_index not in detected_blocks: + detected_blocks.add(block_index) + if pending_modify.get("display_id"): + sender('update_action', { + 'id': pending_modify["display_id"], + 'status': 'running', + 'preparing_id': pending_modify.get("tool_call_id"), + 'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..." + }) + + if pending_modify.get("start_seen"): + end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) + if end_pos != -1: + pending_modify["end_index"] = end_pos + modify_break_triggered = True + debug_log("检测到<<>>,即将终止流式输出并应用修改") + break + continue + elif expecting_modify: + modify_probe_buffer += content + if len(modify_probe_buffer) > 10000: + modify_probe_buffer = modify_probe_buffer[-10000:] + + marker_match = re.search(r"<<>>", modify_probe_buffer) + if marker_match: + detected_raw_path = marker_match.group(1) + detected_path = detected_raw_path.strip() + marker_full = marker_match.group(0) + after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full) + remainder = modify_probe_buffer[after_marker_index:] + modify_probe_buffer = "" + + if not detected_path: + debug_log("检测到 MODIFY 起始标记但路径为空,忽略。") + continue + + pending_modify = { + "path": detected_path, + "tool_call_id": None, + "buffer": remainder, + "raw_buffer": marker_full + remainder, + "start_marker": marker_full, + "end_marker": "<<>>", + "start_seen": True, + "end_index": None, + "display_id": None, + "detected_blocks": set() + } + if hasattr(web_terminal, "pending_modify_request"): + web_terminal.pending_modify_request = {"path": detected_path} + debug_log(f"直接检测到modify起始标记,构建修改缓冲: {detected_path}") + + end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) + if end_pos != -1: + pending_modify["end_index"] = end_pos + modify_break_triggered = True + debug_log("检测到<<>>,即将终止流式输出并应用修改") + break + continue + + if pending_append: + pending_append["buffer"] += content + + if pending_append.get("content_start") is None: + marker_index = pending_append["buffer"].find(pending_append["start_marker"]) + if marker_index != -1: + pending_append["content_start"] = marker_index + len(pending_append["start_marker"]) + debug_log(f"检测到追加起始标识: {pending_append['start_marker']}") + + if pending_append.get("content_start") is not None: + end_index = pending_append["buffer"].find( + pending_append["end_marker"], + pending_append["content_start"] + ) + if end_index != -1: + pending_append["end_index"] = end_index + append_break_triggered = True + debug_log("检测到<<>>,即将终止流式输出并写入文件") + break + + # 继续累积追加内容 + continue + elif expecting_append: + append_probe_buffer += content + # 限制缓冲区大小防止过长 + if len(append_probe_buffer) > 10000: + append_probe_buffer = append_probe_buffer[-10000:] + + marker_match = re.search(r"<<>>", append_probe_buffer) + if marker_match: + detected_raw_path = marker_match.group(1) + detected_path = detected_raw_path.strip() + if not detected_path: + append_probe_buffer = append_probe_buffer[marker_match.end():] + continue + marker_full = marker_match.group(0) + after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full) + remainder = append_probe_buffer[after_marker_index:] + append_probe_buffer = "" + pending_append = { + "path": detected_path, + "tool_call_id": None, + "buffer": remainder, + "start_marker": marker_full, + "end_marker": "<<>>", + "content_start": 0, + "end_index": None, + "display_id": None + } + if hasattr(web_terminal, "pending_append_request"): + web_terminal.pending_append_request = {"path": detected_path} + debug_log(f"直接检测到append起始标记,构建追加缓冲: {detected_path}") + # 检查是否立即包含结束标记 + if pending_append["buffer"]: + end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"]) + if end_index != -1: + pending_append["end_index"] = end_index + append_break_triggered = True + debug_log("检测到<<>>,即将终止流式输出并写入文件") + break + continue + + if not text_started: + text_started = True + text_streaming = True + sender('text_start', {}) + brief_log("模型输出了内容") + await asyncio.sleep(0.05) + + if not pending_append: + full_response += content + accumulated_response += content + text_has_content = True + emit_time = time.time() + elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time + last_text_chunk_time = emit_time + text_chunk_index += 1 + log_backend_chunk( + conversation_id, + current_iteration, + text_chunk_index, + elapsed, + len(content), + content[:32] + ) + sender('text_chunk', { + 'content': content, + 'index': text_chunk_index, + 'elapsed': elapsed + }) + + # 收集工具调用 - 实时发送准备状态 + if "tool_calls" in delta: + tool_chunks += 1 + for tc in delta["tool_calls"]: + found = False + for existing in tool_calls: + if existing.get("index") == tc.get("index"): + if "function" in tc and "arguments" in tc["function"]: + arg_chunk = tc["function"]["arguments"] + existing_fn = existing.get("function", {}) + existing_args = existing_fn.get("arguments", "") + existing_fn["arguments"] = (existing_args or "") + arg_chunk + existing["function"] = existing_fn + + combined_args = existing_fn.get("arguments", "") + tool_id = existing.get("id") or tc.get("id") + tool_name = ( + existing_fn.get("name") + or tc.get("function", {}).get("name", "") + ) + intent_value = extract_intent_from_partial(combined_args) + if ( + intent_value + and tool_id + and detected_tool_intent.get(tool_id) != intent_value + ): + detected_tool_intent[tool_id] = intent_value + brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}") + sender('tool_intent', { + 'id': tool_id, + 'name': tool_name, + 'intent': intent_value, + 'conversation_id': conversation_id + }) + debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") + await asyncio.sleep(0.01) + found = True + break + + if not found and tc.get("id"): + tool_id = tc["id"] + tool_name = tc.get("function", {}).get("name", "") + arguments_str = tc.get("function", {}).get("arguments", "") or "" + + # 新工具检测到,立即发送准备事件 + if tool_id not in detected_tools and tool_name: + detected_tools[tool_id] = tool_name + + # 尝试提前提取 intent + intent_value = None + if arguments_str: + intent_value = extract_intent_from_partial(arguments_str) + if intent_value: + detected_tool_intent[tool_id] = intent_value + brief_log(f"[intent] 预提取 {tool_name}: {intent_value}") + + # 立即发送工具准备中事件 + brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}") + sender('tool_preparing', { + 'id': tool_id, + 'name': tool_name, + 'message': f'准备调用 {tool_name}...', + 'intent': intent_value, + 'conversation_id': conversation_id + }) + debug_log(f" 发送工具准备事件: {tool_name}") + await asyncio.sleep(0.1) + + tool_calls.append({ + "id": tool_id, + "index": tc.get("index"), + "type": "function", + "function": { + "name": tool_name, + "arguments": arguments_str + } + }) + # 尝试从增量参数中抽取 intent,并单独推送 + if tool_id and arguments_str: + intent_value = extract_intent_from_partial(arguments_str) + if intent_value and detected_tool_intent.get(tool_id) != intent_value: + detected_tool_intent[tool_id] = intent_value + sender('tool_intent', { + 'id': tool_id, + 'name': tool_name, + 'intent': intent_value, + 'conversation_id': conversation_id + }) + debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") + await asyncio.sleep(0.01) + debug_log(f" 新工具: {tool_name}") + + # 检查是否被停止 + client_stop_info = get_stop_flag(client_sid, username) + if client_stop_info: + stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info + if stop_requested: + debug_log("任务在流处理完成后检测到停止状态") + sender('task_stopped', { + 'message': '命令执行被用户取消', + 'reason': 'user_stop' + }) + cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages) + clear_stop_flag(client_sid, username) + return { + "stopped": True, + "full_response": full_response, + "tool_calls": tool_calls, + "current_thinking": current_thinking, + "detected_tools": detected_tools, + "last_usage_payload": last_usage_payload, + "in_thinking": in_thinking, + "thinking_started": thinking_started, + "thinking_ended": thinking_ended, + "text_started": text_started, + "text_has_content": text_has_content, + "text_streaming": text_streaming, + "text_chunk_index": text_chunk_index, + "last_text_chunk_time": last_text_chunk_time, + "chunk_count": chunk_count, + "reasoning_chunks": reasoning_chunks, + "content_chunks": content_chunks, + "tool_chunks": tool_chunks, + "append_result": append_result, + "modify_result": modify_result, + "last_finish_reason": last_finish_reason, + "pending_append": pending_append, + "append_probe_buffer": append_probe_buffer, + "pending_modify": pending_modify, + "modify_probe_buffer": modify_probe_buffer, + "accumulated_response": accumulated_response, + } + + # === API响应完成后只计算输出token === + if last_usage_payload: + try: + web_terminal.context_manager.apply_usage_statistics(last_usage_payload) + debug_log( + f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, " + f"completion={last_usage_payload.get('completion_tokens', 0)}, " + f"total={last_usage_payload.get('total_tokens', 0)}" + ) + except Exception as e: + debug_log(f"Usage统计更新失败: {e}") + else: + debug_log("未获取到usage字段,跳过token统计更新") + + + if api_error: + try: + debug_log(f"API错误原始数据: {json.dumps(api_error, ensure_ascii=False)}") + except Exception: + debug_log(f"API错误原始数据(不可序列化): {repr(api_error)}") + error_message = "" + error_status = None + error_type = None + error_code = None + error_text = "" + request_dump = None + error_base_url = None + error_model_id = None + if isinstance(api_error, dict): + error_status = api_error.get("status_code") + error_type = api_error.get("error_type") or api_error.get("type") + error_code = api_error.get("error_code") or api_error.get("code") + error_text = api_error.get("error_text") or "" + error_message = ( + api_error.get("error_message") + or api_error.get("message") + or error_text + or "" + ) + request_dump = api_error.get("request_dump") + error_base_url = api_error.get("base_url") + error_model_id = api_error.get("model_id") + elif isinstance(api_error, str): + error_message = api_error + if not error_message: + if error_status: + error_message = f"API 请求失败(HTTP {error_status})" + else: + error_message = "API 请求失败" + # 若命中阿里云配额错误,立即写入状态并切换到官方 API + try: + from utils.aliyun_fallback import compute_disabled_until, set_disabled_until + disabled_until, reason = compute_disabled_until(error_message) + if disabled_until and reason: + set_disabled_until(getattr(web_terminal, "model_key", None) or "kimi-k2.5", disabled_until, reason) + profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5") + web_terminal.apply_model_profile(profile) + except Exception as exc: + debug_log(f"处理阿里云配额回退失败: {exc}") + can_retry = ( + api_attempt < max_api_retries + and not full_response + and not tool_calls + and not current_thinking + and not pending_append + and not pending_modify + ) + sender('error', { + 'message': error_message, + 'status_code': error_status, + 'error_type': error_type, + 'error_code': error_code, + 'error_text': error_text, + 'request_dump': request_dump, + 'base_url': error_base_url, + 'model_id': error_model_id, + 'retry': bool(can_retry), + 'retry_in': retry_delay_seconds if can_retry else None, + 'attempt': api_attempt + 1, + 'max_attempts': max_api_retries + 1 + }) + if can_retry: + try: + profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5") + web_terminal.apply_model_profile(profile) + except Exception as exc: + debug_log(f"重试前更新模型配置失败: {exc}") + cancelled = await wait_retry_delay(delay_seconds=retry_delay_seconds, client_sid=client_sid, username=username, sender=sender, get_stop_flag=get_stop_flag, clear_stop_flag=clear_stop_flag) + if cancelled: + return { + "stopped": True, + "full_response": full_response, + "tool_calls": tool_calls, + "current_thinking": current_thinking, + "detected_tools": detected_tools, + "last_usage_payload": last_usage_payload, + "in_thinking": in_thinking, + "thinking_started": thinking_started, + "thinking_ended": thinking_ended, + "text_started": text_started, + "text_has_content": text_has_content, + "text_streaming": text_streaming, + "text_chunk_index": text_chunk_index, + "last_text_chunk_time": last_text_chunk_time, + "chunk_count": chunk_count, + "reasoning_chunks": reasoning_chunks, + "content_chunks": content_chunks, + "tool_chunks": tool_chunks, + "append_result": append_result, + "modify_result": modify_result, + "last_finish_reason": last_finish_reason, + "pending_append": pending_append, + "append_probe_buffer": append_probe_buffer, + "pending_modify": pending_modify, + "modify_probe_buffer": modify_probe_buffer, + "accumulated_response": accumulated_response, + } + continue + cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages) + return { + "stopped": True, + "full_response": full_response, + "tool_calls": tool_calls, + "current_thinking": current_thinking, + "detected_tools": detected_tools, + "last_usage_payload": last_usage_payload, + "in_thinking": in_thinking, + "thinking_started": thinking_started, + "thinking_ended": thinking_ended, + "text_started": text_started, + "text_has_content": text_has_content, + "text_streaming": text_streaming, + "text_chunk_index": text_chunk_index, + "last_text_chunk_time": last_text_chunk_time, + "chunk_count": chunk_count, + "reasoning_chunks": reasoning_chunks, + "content_chunks": content_chunks, + "tool_chunks": tool_chunks, + "append_result": append_result, + "modify_result": modify_result, + "last_finish_reason": last_finish_reason, + "pending_append": pending_append, + "append_probe_buffer": append_probe_buffer, + "pending_modify": pending_modify, + "modify_probe_buffer": modify_probe_buffer, + "accumulated_response": accumulated_response, + } + break + + return { + "stopped": False, + "full_response": full_response, + "tool_calls": tool_calls, + "current_thinking": current_thinking, + "detected_tools": detected_tools, + "last_usage_payload": last_usage_payload, + "in_thinking": in_thinking, + "thinking_started": thinking_started, + "thinking_ended": thinking_ended, + "text_started": text_started, + "text_has_content": text_has_content, + "text_streaming": text_streaming, + "text_chunk_index": text_chunk_index, + "last_text_chunk_time": last_text_chunk_time, + "chunk_count": chunk_count, + "reasoning_chunks": reasoning_chunks, + "content_chunks": content_chunks, + "tool_chunks": tool_chunks, + "append_result": append_result, + "modify_result": modify_result, + "last_finish_reason": last_finish_reason, + "pending_append": pending_append, + "append_probe_buffer": append_probe_buffer, + "pending_modify": pending_modify, + "modify_probe_buffer": modify_probe_buffer, + "accumulated_response": accumulated_response, + } diff --git a/server/chat_flow_task_main.py b/server/chat_flow_task_main.py index c57a3d4..8a7f94b 100644 --- a/server/chat_flow_task_main.py +++ b/server/chat_flow_task_main.py @@ -124,8 +124,9 @@ from .chat_flow_runtime import ( ) from .chat_flow_pending_writes import finalize_pending_append, finalize_pending_modify -from .chat_flow_task_support import process_sub_agent_updates, wait_retry_delay, cancel_pending_tools +from .chat_flow_task_support import process_sub_agent_updates from .chat_flow_tool_loop import execute_tool_calls +from .chat_flow_stream_loop import run_streaming_attempts async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspace, message, images, sender, client_sid, username: str, videos=None): """处理任务并发送消息 - 集成token统计版本""" @@ -295,9 +296,7 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac reasoning_chunks = 0 content_chunks = 0 tool_chunks = 0 - append_break_triggered = False append_result = {"handled": False} - modify_break_triggered = False modify_result = {"handled": False} last_finish_reason = None @@ -328,511 +327,72 @@ async def handle_task_with_sender(terminal: WebTerminal, workspace: UserWorkspac tool_call_limit_label = MAX_TOTAL_TOOL_CALLS if MAX_TOTAL_TOOL_CALLS is not None else "∞" print(f"[API] 第{current_iteration}次调用 (总工具调用: {total_tool_calls}/{tool_call_limit_label})") - api_error = None - for api_attempt in range(max_api_retries + 1): - api_error = None - if api_attempt > 0: - full_response = "" - tool_calls = [] - current_thinking = "" - detected_tools = {} - last_usage_payload = None - in_thinking = False - thinking_started = False - thinking_ended = False - text_started = False - text_has_content = False - text_streaming = False - text_chunk_index = 0 - last_text_chunk_time = None - chunk_count = 0 - reasoning_chunks = 0 - content_chunks = 0 - tool_chunks = 0 - append_break_triggered = False - append_result = {"handled": False} - modify_break_triggered = False - modify_result = {"handled": False} - last_finish_reason = None + stream_result = await run_streaming_attempts( + web_terminal=web_terminal, + messages=messages, + tools=tools, + sender=sender, + client_sid=client_sid, + username=username, + conversation_id=conversation_id, + current_iteration=current_iteration, + max_api_retries=max_api_retries, + retry_delay_seconds=retry_delay_seconds, + pending_append=pending_append, + append_probe_buffer=append_probe_buffer, + pending_modify=pending_modify, + modify_probe_buffer=modify_probe_buffer, + detected_tool_intent=detected_tool_intent, + full_response=full_response, + tool_calls=tool_calls, + current_thinking=current_thinking, + detected_tools=detected_tools, + last_usage_payload=last_usage_payload, + in_thinking=in_thinking, + thinking_started=thinking_started, + thinking_ended=thinking_ended, + text_started=text_started, + text_has_content=text_has_content, + text_streaming=text_streaming, + text_chunk_index=text_chunk_index, + last_text_chunk_time=last_text_chunk_time, + chunk_count=chunk_count, + reasoning_chunks=reasoning_chunks, + content_chunks=content_chunks, + tool_chunks=tool_chunks, + append_result=append_result, + modify_result=modify_result, + last_finish_reason=last_finish_reason, + accumulated_response=accumulated_response, + ) + if stream_result.get("stopped"): + return - # 收集流式响应 - async for chunk in web_terminal.api_client.chat(messages, tools, stream=True): - chunk_count += 1 - - # 检查停止标志 - client_stop_info = get_stop_flag(client_sid, username) - if client_stop_info: - stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info - if stop_requested: - debug_log(f"检测到停止请求,中断流处理") - if pending_append: - append_result, pending_append, append_probe_buffer = await finalize_pending_append(pending_append=pending_append, append_probe_buffer=append_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log) - if pending_modify: - modify_result, pending_modify, modify_probe_buffer = await finalize_pending_modify(pending_modify=pending_modify, modify_probe_buffer=modify_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log) - cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages) - sender('task_stopped', { - 'message': '命令执行被用户取消', - 'reason': 'user_stop' - }) - clear_stop_flag(client_sid, username) - return - - if isinstance(chunk, dict) and chunk.get("error"): - api_error = chunk.get("error") - break - - # 先尝试记录 usage(有些平台会在最后一个 chunk 里携带 usage 但 choices 为空) - usage_info = chunk.get("usage") - if usage_info: - last_usage_payload = usage_info - - if "choices" not in chunk: - debug_log(f"Chunk {chunk_count}: 无choices字段") - continue - if not chunk.get("choices"): - debug_log(f"Chunk {chunk_count}: choices为空列表") - continue - choice = chunk["choices"][0] - if not usage_info and isinstance(choice, dict) and choice.get("usage"): - # 兼容部分供应商将 usage 放在 choice 内的格式(例如部分 Kimi/Qwen 返回) - last_usage_payload = choice.get("usage") - delta = choice.get("delta", {}) - finish_reason = choice.get("finish_reason") - if finish_reason: - last_finish_reason = finish_reason - - # 处理思考内容(兼容 reasoning_content / reasoning_details) - reasoning_content = "" - if "reasoning_content" in delta: - reasoning_content = delta.get("reasoning_content") or "" - elif "reasoning_details" in delta: - details = delta.get("reasoning_details") - if isinstance(details, list): - parts = [] - for item in details: - if isinstance(item, dict): - text = item.get("text") - if text: - parts.append(text) - if parts: - reasoning_content = "".join(parts) - if reasoning_content: - reasoning_chunks += 1 - debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符") - - if not thinking_started: - in_thinking = True - thinking_started = True - sender('thinking_start', {}) - await asyncio.sleep(0.05) - - current_thinking += reasoning_content - sender('thinking_chunk', {'content': reasoning_content}) - - # 处理正常内容 - if "content" in delta: - content = delta["content"] - if content: - content_chunks += 1 - debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}") - - if in_thinking and not thinking_ended: - in_thinking = False - thinking_ended = True - sender('thinking_end', {'full_content': current_thinking}) - await asyncio.sleep(0.1) - - - expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None)) - expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None)) - - if pending_modify: - if not pending_modify.get("start_seen"): - probe_buffer = pending_modify.get("probe_buffer", "") + content - if len(probe_buffer) > 10000: - probe_buffer = probe_buffer[-10000:] - marker = pending_modify.get("start_marker") - marker_index = probe_buffer.find(marker) - if marker_index == -1: - pending_modify["probe_buffer"] = probe_buffer - continue - after_marker = marker_index + len(marker) - remainder = probe_buffer[after_marker:] - pending_modify["buffer"] = remainder - pending_modify["raw_buffer"] = marker + remainder - pending_modify["start_seen"] = True - pending_modify["detected_blocks"] = set() - pending_modify["probe_buffer"] = "" - if pending_modify.get("display_id"): - sender('update_action', { - 'id': pending_modify["display_id"], - 'status': 'running', - 'preparing_id': pending_modify.get("tool_call_id"), - 'message': f"正在修改 {pending_modify['path']}..." - }) - else: - pending_modify["buffer"] += content - pending_modify["raw_buffer"] += content - - if pending_modify.get("start_seen"): - block_text = pending_modify["buffer"] - for match in re.finditer(r"\[replace:(\d+)\]", block_text): - try: - block_index = int(match.group(1)) - except ValueError: - continue - detected_blocks = pending_modify.setdefault("detected_blocks", set()) - if block_index not in detected_blocks: - detected_blocks.add(block_index) - if pending_modify.get("display_id"): - sender('update_action', { - 'id': pending_modify["display_id"], - 'status': 'running', - 'preparing_id': pending_modify.get("tool_call_id"), - 'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..." - }) - - if pending_modify.get("start_seen"): - end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) - if end_pos != -1: - pending_modify["end_index"] = end_pos - modify_break_triggered = True - debug_log("检测到<<>>,即将终止流式输出并应用修改") - break - continue - elif expecting_modify: - modify_probe_buffer += content - if len(modify_probe_buffer) > 10000: - modify_probe_buffer = modify_probe_buffer[-10000:] - - marker_match = re.search(r"<<>>", modify_probe_buffer) - if marker_match: - detected_raw_path = marker_match.group(1) - detected_path = detected_raw_path.strip() - marker_full = marker_match.group(0) - after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full) - remainder = modify_probe_buffer[after_marker_index:] - modify_probe_buffer = "" - - if not detected_path: - debug_log("检测到 MODIFY 起始标记但路径为空,忽略。") - continue - - pending_modify = { - "path": detected_path, - "tool_call_id": None, - "buffer": remainder, - "raw_buffer": marker_full + remainder, - "start_marker": marker_full, - "end_marker": "<<>>", - "start_seen": True, - "end_index": None, - "display_id": None, - "detected_blocks": set() - } - if hasattr(web_terminal, "pending_modify_request"): - web_terminal.pending_modify_request = {"path": detected_path} - debug_log(f"直接检测到modify起始标记,构建修改缓冲: {detected_path}") - - end_pos = pending_modify["buffer"].find(pending_modify["end_marker"]) - if end_pos != -1: - pending_modify["end_index"] = end_pos - modify_break_triggered = True - debug_log("检测到<<>>,即将终止流式输出并应用修改") - break - continue - - if pending_append: - pending_append["buffer"] += content - - if pending_append.get("content_start") is None: - marker_index = pending_append["buffer"].find(pending_append["start_marker"]) - if marker_index != -1: - pending_append["content_start"] = marker_index + len(pending_append["start_marker"]) - debug_log(f"检测到追加起始标识: {pending_append['start_marker']}") - - if pending_append.get("content_start") is not None: - end_index = pending_append["buffer"].find( - pending_append["end_marker"], - pending_append["content_start"] - ) - if end_index != -1: - pending_append["end_index"] = end_index - append_break_triggered = True - debug_log("检测到<<>>,即将终止流式输出并写入文件") - break - - # 继续累积追加内容 - continue - elif expecting_append: - append_probe_buffer += content - # 限制缓冲区大小防止过长 - if len(append_probe_buffer) > 10000: - append_probe_buffer = append_probe_buffer[-10000:] - - marker_match = re.search(r"<<>>", append_probe_buffer) - if marker_match: - detected_raw_path = marker_match.group(1) - detected_path = detected_raw_path.strip() - if not detected_path: - append_probe_buffer = append_probe_buffer[marker_match.end():] - continue - marker_full = marker_match.group(0) - after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full) - remainder = append_probe_buffer[after_marker_index:] - append_probe_buffer = "" - pending_append = { - "path": detected_path, - "tool_call_id": None, - "buffer": remainder, - "start_marker": marker_full, - "end_marker": "<<>>", - "content_start": 0, - "end_index": None, - "display_id": None - } - if hasattr(web_terminal, "pending_append_request"): - web_terminal.pending_append_request = {"path": detected_path} - debug_log(f"直接检测到append起始标记,构建追加缓冲: {detected_path}") - # 检查是否立即包含结束标记 - if pending_append["buffer"]: - end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"]) - if end_index != -1: - pending_append["end_index"] = end_index - append_break_triggered = True - debug_log("检测到<<>>,即将终止流式输出并写入文件") - break - continue - - if not text_started: - text_started = True - text_streaming = True - sender('text_start', {}) - brief_log("模型输出了内容") - await asyncio.sleep(0.05) - - if not pending_append: - full_response += content - accumulated_response += content - text_has_content = True - emit_time = time.time() - elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time - last_text_chunk_time = emit_time - text_chunk_index += 1 - log_backend_chunk( - conversation_id, - current_iteration, - text_chunk_index, - elapsed, - len(content), - content[:32] - ) - sender('text_chunk', { - 'content': content, - 'index': text_chunk_index, - 'elapsed': elapsed - }) - - # 收集工具调用 - 实时发送准备状态 - if "tool_calls" in delta: - tool_chunks += 1 - for tc in delta["tool_calls"]: - found = False - for existing in tool_calls: - if existing.get("index") == tc.get("index"): - if "function" in tc and "arguments" in tc["function"]: - arg_chunk = tc["function"]["arguments"] - existing_fn = existing.get("function", {}) - existing_args = existing_fn.get("arguments", "") - existing_fn["arguments"] = (existing_args or "") + arg_chunk - existing["function"] = existing_fn - - combined_args = existing_fn.get("arguments", "") - tool_id = existing.get("id") or tc.get("id") - tool_name = ( - existing_fn.get("name") - or tc.get("function", {}).get("name", "") - ) - intent_value = extract_intent_from_partial(combined_args) - if ( - intent_value - and tool_id - and detected_tool_intent.get(tool_id) != intent_value - ): - detected_tool_intent[tool_id] = intent_value - brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}") - sender('tool_intent', { - 'id': tool_id, - 'name': tool_name, - 'intent': intent_value, - 'conversation_id': conversation_id - }) - debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") - await asyncio.sleep(0.01) - found = True - break - - if not found and tc.get("id"): - tool_id = tc["id"] - tool_name = tc.get("function", {}).get("name", "") - arguments_str = tc.get("function", {}).get("arguments", "") or "" - - # 新工具检测到,立即发送准备事件 - if tool_id not in detected_tools and tool_name: - detected_tools[tool_id] = tool_name - - # 尝试提前提取 intent - intent_value = None - if arguments_str: - intent_value = extract_intent_from_partial(arguments_str) - if intent_value: - detected_tool_intent[tool_id] = intent_value - brief_log(f"[intent] 预提取 {tool_name}: {intent_value}") - - # 立即发送工具准备中事件 - brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}") - sender('tool_preparing', { - 'id': tool_id, - 'name': tool_name, - 'message': f'准备调用 {tool_name}...', - 'intent': intent_value, - 'conversation_id': conversation_id - }) - debug_log(f" 发送工具准备事件: {tool_name}") - await asyncio.sleep(0.1) - - tool_calls.append({ - "id": tool_id, - "index": tc.get("index"), - "type": "function", - "function": { - "name": tool_name, - "arguments": arguments_str - } - }) - # 尝试从增量参数中抽取 intent,并单独推送 - if tool_id and arguments_str: - intent_value = extract_intent_from_partial(arguments_str) - if intent_value and detected_tool_intent.get(tool_id) != intent_value: - detected_tool_intent[tool_id] = intent_value - sender('tool_intent', { - 'id': tool_id, - 'name': tool_name, - 'intent': intent_value, - 'conversation_id': conversation_id - }) - debug_log(f" 发送工具意图: {tool_name} -> {intent_value}") - await asyncio.sleep(0.01) - debug_log(f" 新工具: {tool_name}") - - # 检查是否被停止 - client_stop_info = get_stop_flag(client_sid, username) - if client_stop_info: - stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info - if stop_requested: - debug_log("任务在流处理完成后检测到停止状态") - sender('task_stopped', { - 'message': '命令执行被用户取消', - 'reason': 'user_stop' - }) - cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages) - clear_stop_flag(client_sid, username) - return - - # === API响应完成后只计算输出token === - if last_usage_payload: - try: - web_terminal.context_manager.apply_usage_statistics(last_usage_payload) - debug_log( - f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, " - f"completion={last_usage_payload.get('completion_tokens', 0)}, " - f"total={last_usage_payload.get('total_tokens', 0)}" - ) - except Exception as e: - debug_log(f"Usage统计更新失败: {e}") - else: - debug_log("未获取到usage字段,跳过token统计更新") - - - if api_error: - try: - debug_log(f"API错误原始数据: {json.dumps(api_error, ensure_ascii=False)}") - except Exception: - debug_log(f"API错误原始数据(不可序列化): {repr(api_error)}") - error_message = "" - error_status = None - error_type = None - error_code = None - error_text = "" - request_dump = None - error_base_url = None - error_model_id = None - if isinstance(api_error, dict): - error_status = api_error.get("status_code") - error_type = api_error.get("error_type") or api_error.get("type") - error_code = api_error.get("error_code") or api_error.get("code") - error_text = api_error.get("error_text") or "" - error_message = ( - api_error.get("error_message") - or api_error.get("message") - or error_text - or "" - ) - request_dump = api_error.get("request_dump") - error_base_url = api_error.get("base_url") - error_model_id = api_error.get("model_id") - elif isinstance(api_error, str): - error_message = api_error - if not error_message: - if error_status: - error_message = f"API 请求失败(HTTP {error_status})" - else: - error_message = "API 请求失败" - # 若命中阿里云配额错误,立即写入状态并切换到官方 API - try: - from utils.aliyun_fallback import compute_disabled_until, set_disabled_until - disabled_until, reason = compute_disabled_until(error_message) - if disabled_until and reason: - set_disabled_until(getattr(web_terminal, "model_key", None) or "kimi-k2.5", disabled_until, reason) - profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5") - web_terminal.apply_model_profile(profile) - except Exception as exc: - debug_log(f"处理阿里云配额回退失败: {exc}") - can_retry = ( - api_attempt < max_api_retries - and not full_response - and not tool_calls - and not current_thinking - and not pending_append - and not pending_modify - ) - sender('error', { - 'message': error_message, - 'status_code': error_status, - 'error_type': error_type, - 'error_code': error_code, - 'error_text': error_text, - 'request_dump': request_dump, - 'base_url': error_base_url, - 'model_id': error_model_id, - 'retry': bool(can_retry), - 'retry_in': retry_delay_seconds if can_retry else None, - 'attempt': api_attempt + 1, - 'max_attempts': max_api_retries + 1 - }) - if can_retry: - try: - profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5") - web_terminal.apply_model_profile(profile) - except Exception as exc: - debug_log(f"重试前更新模型配置失败: {exc}") - cancelled = await wait_retry_delay(delay_seconds=retry_delay_seconds, client_sid=client_sid, username=username, sender=sender, get_stop_flag=get_stop_flag, clear_stop_flag=clear_stop_flag) - if cancelled: - return - continue - cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages) - return - break + full_response = stream_result["full_response"] + tool_calls = stream_result["tool_calls"] + current_thinking = stream_result["current_thinking"] + detected_tools = stream_result["detected_tools"] + last_usage_payload = stream_result["last_usage_payload"] + in_thinking = stream_result["in_thinking"] + thinking_started = stream_result["thinking_started"] + thinking_ended = stream_result["thinking_ended"] + text_started = stream_result["text_started"] + text_has_content = stream_result["text_has_content"] + text_streaming = stream_result["text_streaming"] + text_chunk_index = stream_result["text_chunk_index"] + last_text_chunk_time = stream_result["last_text_chunk_time"] + chunk_count = stream_result["chunk_count"] + reasoning_chunks = stream_result["reasoning_chunks"] + content_chunks = stream_result["content_chunks"] + tool_chunks = stream_result["tool_chunks"] + append_result = stream_result["append_result"] + modify_result = stream_result["modify_result"] + last_finish_reason = stream_result["last_finish_reason"] + pending_append = stream_result["pending_append"] + append_probe_buffer = stream_result["append_probe_buffer"] + pending_modify = stream_result["pending_modify"] + modify_probe_buffer = stream_result["modify_probe_buffer"] + accumulated_response = stream_result["accumulated_response"] # 流结束后的处理 debug_log(f"\n流结束统计:")