agent-Specialization/server/chat_flow_stream_loop.py
JOJO 5ab3acef9c feat: 添加 terminal-guide 和 sub-agent-guide skills
- 新增 terminal-guide skill: 持久化终端使用指南
- 新增 sub-agent-guide skill: 子智能体使用指南
- 优化终端工具定义和执行逻辑
- 更新系统提示词以引用新 skills
- 添加 utils/__init__.py 模块初始化文件

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-16 21:17:02 +08:00

662 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import json
import time
import re
from typing import Any, Dict, Optional
from config.model_profiles import get_model_profile
from .utils_common import debug_log, brief_log, log_backend_chunk
from .chat_flow_runner_helpers import extract_intent_from_partial
from .chat_flow_pending_writes import finalize_pending_append, finalize_pending_modify
from .chat_flow_task_support import wait_retry_delay, cancel_pending_tools
from .state import get_stop_flag, clear_stop_flag
# Run one model turn as a streamed chat request, retrying on API errors.
#
# Chunks are read from web_terminal.api_client.chat(messages, tools, stream=True)
# and translated into UI events emitted through `sender(event_name, payload)`:
# thinking_start/thinking_chunk/thinking_end, text_start/text_chunk,
# tool_preparing/tool_intent, update_action, error and task_stopped.
#
# All parameters are keyword-only. Apart from the request inputs (web_terminal,
# messages, tools, sender, ids, retry settings) they are the caller's running
# stream state (buffers, flags, counters); they are rebound locally and handed
# back in the returned dict. `detected_tool_intent` is updated in place and is
# NOT part of the returned dict.
#
# Returns a dict whose "stopped" key is True when the user stopped the task,
# when the retry wait was cancelled, or when an API error could not be retried,
# and False otherwise; the remaining keys carry the current state values.
#
# NOTE(review): this copy of the file has lost all leading indentation, so
# block nesting cannot be read from the text. The comments below describe what
# each statement does and flag the places where nesting is ambiguous.
async def run_streaming_attempts(*, web_terminal, messages, tools, sender, client_sid: str, username: str, conversation_id: Optional[str], current_iteration: int, max_api_retries: int, retry_delay_seconds: int, pending_append, append_probe_buffer: str, pending_modify, modify_probe_buffer: str, detected_tool_intent: Dict[str, str], full_response: str, tool_calls: list, current_thinking: str, detected_tools: Dict[str, str], last_usage_payload, in_thinking: bool, thinking_started: bool, thinking_ended: bool, text_started: bool, text_has_content: bool, text_streaming: bool, text_chunk_index: int, last_text_chunk_time, chunk_count: int, reasoning_chunks: int, content_chunks: int, tool_chunks: int, append_result: Dict[str, Any], modify_result: Dict[str, Any], last_finish_reason: Optional[str], accumulated_response: str) -> Dict[str, Any]:
api_error = None
# One pass per API attempt: the first try plus up to max_api_retries retries.
for api_attempt in range(max_api_retries + 1):
api_error = None
if api_attempt > 0:
# Reset the stream state before a retry. pending_append, pending_modify,
# both probe buffers and accumulated_response are not touched here.
full_response = ""
tool_calls = []
current_thinking = ""
detected_tools = {}
last_usage_payload = None
in_thinking = False
thinking_started = False
thinking_ended = False
text_started = False
text_has_content = False
text_streaming = False
text_chunk_index = 0
last_text_chunk_time = None
chunk_count = 0
reasoning_chunks = 0
content_chunks = 0
tool_chunks = 0
append_result = {"handled": False}
modify_result = {"handled": False}
last_finish_reason = None
# These two flags are set to True where an end marker is found below; they
# are never read in the visible code and are not returned.
append_break_triggered = False
modify_break_triggered = False
# Collect the streaming response
async for chunk in web_terminal.api_client.chat(messages, tools, stream=True):
chunk_count += 1
# Check the stop flag
client_stop_info = get_stop_flag(client_sid, username)
if client_stop_info:
# The flag is either a dict carrying a 'stop' entry or a plain truthy value.
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
if stop_requested:
debug_log(f"检测到停止请求,中断流处理")
# Finalize any in-progress append/modify with stream_completed=False and
# finish_reason="user_stop" before reporting the stop.
if pending_append:
append_result, pending_append, append_probe_buffer = await finalize_pending_append(pending_append=pending_append, append_probe_buffer=append_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log)
if pending_modify:
modify_result, pending_modify, modify_probe_buffer = await finalize_pending_modify(pending_modify=pending_modify, modify_probe_buffer=modify_probe_buffer, response_text=full_response, stream_completed=False, finish_reason="user_stop", web_terminal=web_terminal, sender=sender, debug_log=debug_log)
cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
sender('task_stopped', {
'message': '命令执行被用户取消',
'reason': 'user_stop'
})
clear_stop_flag(client_sid, username)
return {
"stopped": True,
"full_response": full_response,
"tool_calls": tool_calls,
"current_thinking": current_thinking,
"detected_tools": detected_tools,
"last_usage_payload": last_usage_payload,
"in_thinking": in_thinking,
"thinking_started": thinking_started,
"thinking_ended": thinking_ended,
"text_started": text_started,
"text_has_content": text_has_content,
"text_streaming": text_streaming,
"text_chunk_index": text_chunk_index,
"last_text_chunk_time": last_text_chunk_time,
"chunk_count": chunk_count,
"reasoning_chunks": reasoning_chunks,
"content_chunks": content_chunks,
"tool_chunks": tool_chunks,
"append_result": append_result,
"modify_result": modify_result,
"last_finish_reason": last_finish_reason,
"pending_append": pending_append,
"append_probe_buffer": append_probe_buffer,
"pending_modify": pending_modify,
"modify_probe_buffer": modify_probe_buffer,
"accumulated_response": accumulated_response,
}
# An error chunk ends the stream; the error is handled after the loop.
if isinstance(chunk, dict) and chunk.get("error"):
api_error = chunk.get("error")
break
# Record usage first (some platforms carry usage in the final chunk while choices is empty)
# NOTE(review): the error check above guards with isinstance(chunk, dict) but
# this .get() does not; a non-dict chunk would raise AttributeError here.
usage_info = chunk.get("usage")
if usage_info:
last_usage_payload = usage_info
if "choices" not in chunk:
debug_log(f"Chunk {chunk_count}: 无choices字段")
continue
if not chunk.get("choices"):
debug_log(f"Chunk {chunk_count}: choices为空列表")
continue
# Only the first choice of each chunk is processed.
choice = chunk["choices"][0]
if not usage_info and isinstance(choice, dict) and choice.get("usage"):
# Compatibility: some providers put usage inside the choice (e.g. some Kimi/Qwen responses)
last_usage_payload = choice.get("usage")
delta = choice.get("delta", {})
finish_reason = choice.get("finish_reason")
if finish_reason:
last_finish_reason = finish_reason
# Handle thinking content (compatible with reasoning_content / reasoning_details)
reasoning_content = ""
if "reasoning_content" in delta:
reasoning_content = delta.get("reasoning_content") or ""
elif "reasoning_details" in delta:
# reasoning_details is used only when it is a list; the truthy "text" of
# each dict item is concatenated.
details = delta.get("reasoning_details")
if isinstance(details, list):
parts = []
for item in details:
if isinstance(item, dict):
text = item.get("text")
if text:
parts.append(text)
if parts:
reasoning_content = "".join(parts)
if reasoning_content:
reasoning_chunks += 1
debug_log(f" 思考内容 #{reasoning_chunks}: {len(reasoning_content)} 字符")
# Emit thinking_start once, on the first reasoning fragment.
if not thinking_started:
in_thinking = True
thinking_started = True
sender('thinking_start', {})
# Short pause after the start event; presumably gives the event time to
# go out before the first chunk - TODO confirm.
await asyncio.sleep(0.05)
current_thinking += reasoning_content
sender('thinking_chunk', {'content': reasoning_content})
# Handle normal content
if "content" in delta:
content = delta["content"]
if content:
content_chunks += 1
debug_log(f" 正式内容 #{content_chunks}: {repr(content[:100] if content else 'None')}")
# First answer text after reasoning closes the thinking phase.
if in_thinking and not thinking_ended:
in_thinking = False
thinking_ended = True
sender('thinking_end', {'full_content': current_thinking})
await asyncio.sleep(0.1)
# "Expecting" means a pending record exists here or web_terminal carries
# a truthy pending_*_request attribute.
expecting_modify = bool(pending_modify) or bool(getattr(web_terminal, "pending_modify_request", None))
expecting_append = bool(pending_append) or bool(getattr(web_terminal, "pending_append_request", None))
if pending_modify:
if not pending_modify.get("start_seen"):
# Still looking for the start marker: probe a buffer capped to its
# last 10000 characters.
probe_buffer = pending_modify.get("probe_buffer", "") + content
if len(probe_buffer) > 10000:
probe_buffer = probe_buffer[-10000:]
# NOTE(review): if "start_marker" is missing, marker is None and
# str.find(None) raises TypeError - confirm callers always set it.
marker = pending_modify.get("start_marker")
marker_index = probe_buffer.find(marker)
if marker_index == -1:
pending_modify["probe_buffer"] = probe_buffer
continue
# Marker found: keep only the text after it as the modify payload.
after_marker = marker_index + len(marker)
remainder = probe_buffer[after_marker:]
pending_modify["buffer"] = remainder
pending_modify["raw_buffer"] = marker + remainder
pending_modify["start_seen"] = True
pending_modify["detected_blocks"] = set()
pending_modify["probe_buffer"] = ""
if pending_modify.get("display_id"):
sender('update_action', {
'id': pending_modify["display_id"],
'status': 'running',
'preparing_id': pending_modify.get("tool_call_id"),
'message': f"正在修改 {pending_modify['path']}..."
})
else:
pending_modify["buffer"] += content
pending_modify["raw_buffer"] += content
if pending_modify.get("start_seen"):
# Scan the whole buffer for [replace:N] tags and report each block
# index the first time it is seen.
block_text = pending_modify["buffer"]
for match in re.finditer(r"\[replace:(\d+)\]", block_text):
try:
block_index = int(match.group(1))
except ValueError:
continue
detected_blocks = pending_modify.setdefault("detected_blocks", set())
if block_index not in detected_blocks:
detected_blocks.add(block_index)
if pending_modify.get("display_id"):
sender('update_action', {
'id': pending_modify["display_id"],
'status': 'running',
'preparing_id': pending_modify.get("tool_call_id"),
'message': f"正在对 {pending_modify['path']} 进行第 {block_index} 处修改..."
})
if pending_modify.get("start_seen"):
# The end marker stops reading the stream; its offset in the buffer is
# recorded in end_index.
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
if end_pos != -1:
pending_modify["end_index"] = end_pos
modify_break_triggered = True
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
break
continue
elif expecting_modify:
# No pending record yet: watch the probe buffer (capped to its last
# 10000 characters) for a <<<MODIFY: path>>> start marker.
modify_probe_buffer += content
if len(modify_probe_buffer) > 10000:
modify_probe_buffer = modify_probe_buffer[-10000:]
marker_match = re.search(r"<<<MODIFY:\s*([\s\S]*?)>>>", modify_probe_buffer)
if marker_match:
detected_raw_path = marker_match.group(1)
detected_path = detected_raw_path.strip()
marker_full = marker_match.group(0)
after_marker_index = modify_probe_buffer.find(marker_full) + len(marker_full)
remainder = modify_probe_buffer[after_marker_index:]
modify_probe_buffer = ""
# NOTE(review): with an empty path the probe buffer has already been
# cleared, so the text after the marker is dropped; the APPEND branch
# below keeps the text after the marker in the same situation.
if not detected_path:
debug_log("检测到 MODIFY 起始标记但路径为空,忽略。")
continue
pending_modify = {
"path": detected_path,
"tool_call_id": None,
"buffer": remainder,
"raw_buffer": marker_full + remainder,
"start_marker": marker_full,
"end_marker": "<<<END_MODIFY>>>",
"start_seen": True,
"end_index": None,
"display_id": None,
"detected_blocks": set()
}
# Mirror the request onto web_terminal only if it has the attribute.
if hasattr(web_terminal, "pending_modify_request"):
web_terminal.pending_modify_request = {"path": detected_path}
debug_log(f"直接检测到modify起始标记构建修改缓冲: {detected_path}")
end_pos = pending_modify["buffer"].find(pending_modify["end_marker"])
if end_pos != -1:
pending_modify["end_index"] = end_pos
modify_break_triggered = True
debug_log("检测到<<<END_MODIFY>>>,即将终止流式输出并应用修改")
break
# NOTE(review): without indentation it cannot be told whether this
# `continue` runs only when a marker matched or for every chunk of this
# branch - confirm against the original file.
continue
if pending_append:
pending_append["buffer"] += content
# Locate the start marker once, then search for the end marker from the
# start of the content.
if pending_append.get("content_start") is None:
marker_index = pending_append["buffer"].find(pending_append["start_marker"])
if marker_index != -1:
pending_append["content_start"] = marker_index + len(pending_append["start_marker"])
debug_log(f"检测到追加起始标识: {pending_append['start_marker']}")
if pending_append.get("content_start") is not None:
end_index = pending_append["buffer"].find(
pending_append["end_marker"],
pending_append["content_start"]
)
if end_index != -1:
pending_append["end_index"] = end_index
append_break_triggered = True
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
break
# Keep accumulating append content
continue
elif expecting_append:
append_probe_buffer += content
# Cap the buffer size so it cannot grow too long
if len(append_probe_buffer) > 10000:
append_probe_buffer = append_probe_buffer[-10000:]
marker_match = re.search(r"<<<APPEND:\s*([\s\S]*?)>>>", append_probe_buffer)
if marker_match:
detected_raw_path = marker_match.group(1)
detected_path = detected_raw_path.strip()
# Empty path: skip past this marker and keep probing the remaining text.
if not detected_path:
append_probe_buffer = append_probe_buffer[marker_match.end():]
continue
marker_full = marker_match.group(0)
after_marker_index = append_probe_buffer.find(marker_full) + len(marker_full)
remainder = append_probe_buffer[after_marker_index:]
append_probe_buffer = ""
# content_start is 0 because the buffer holds only the text after the marker.
pending_append = {
"path": detected_path,
"tool_call_id": None,
"buffer": remainder,
"start_marker": marker_full,
"end_marker": "<<<END_APPEND>>>",
"content_start": 0,
"end_index": None,
"display_id": None
}
if hasattr(web_terminal, "pending_append_request"):
web_terminal.pending_append_request = {"path": detected_path}
debug_log(f"直接检测到append起始标记构建追加缓冲: {detected_path}")
# Check whether the end marker is already present
if pending_append["buffer"]:
end_index = pending_append["buffer"].find(pending_append["end_marker"], pending_append["content_start"])
if end_index != -1:
pending_append["end_index"] = end_index
append_break_triggered = True
debug_log("检测到<<<END_APPEND>>>,即将终止流式输出并写入文件")
break
# NOTE(review): same nesting ambiguity as the `continue` that ends the
# MODIFY probe branch above.
continue
# Plain answer text: announce the text stream once, then forward the chunk.
if not text_started:
text_started = True
text_streaming = True
sender('text_start', {})
brief_log("模型输出了内容")
await asyncio.sleep(0.05)
# NOTE(review): if the `continue` that ends the `if pending_append:` body
# above is unconditional, pending_append is always falsy here - confirm.
if not pending_append:
full_response += content
accumulated_response += content
text_has_content = True
# elapsed is the time since the previous text chunk (0.0 for the first one).
emit_time = time.time()
elapsed = 0.0 if last_text_chunk_time is None else emit_time - last_text_chunk_time
last_text_chunk_time = emit_time
text_chunk_index += 1
log_backend_chunk(
conversation_id,
current_iteration,
text_chunk_index,
elapsed,
len(content),
content[:32]
)
sender('text_chunk', {
'content': content,
'index': text_chunk_index,
'elapsed': elapsed
})
# Collect tool calls - send the preparing status in real time
if "tool_calls" in delta:
tool_chunks += 1
for tc in delta["tool_calls"]:
found = False
# Deltas are matched to an already collected call by their "index".
for existing in tool_calls:
if existing.get("index") == tc.get("index"):
if "function" in tc and "arguments" in tc["function"]:
# Append the argument fragment to what was collected so far.
arg_chunk = tc["function"]["arguments"]
existing_fn = existing.get("function", {})
existing_args = existing_fn.get("arguments", "")
existing_fn["arguments"] = (existing_args or "") + (arg_chunk or "")
existing["function"] = existing_fn
combined_args = existing_fn.get("arguments", "")
tool_id = existing.get("id") or tc.get("id")
tool_name = (
existing_fn.get("name")
or tc.get("function", {}).get("name", "")
)
# Re-extract the intent from the partial arguments and emit it only
# when it differs from the value already stored for this tool id.
intent_value = extract_intent_from_partial(combined_args)
if (
intent_value
and tool_id
and detected_tool_intent.get(tool_id) != intent_value
):
detected_tool_intent[tool_id] = intent_value
brief_log(f"[intent] 增量提取 {tool_name}: {intent_value}")
sender('tool_intent', {
'id': tool_id,
'name': tool_name,
'intent': intent_value,
'conversation_id': conversation_id
})
debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
await asyncio.sleep(0.01)
found = True
break
# A delta with no matching index is registered as a new call only if it
# carries an id.
if not found and tc.get("id"):
tool_id = tc["id"]
tool_name = tc.get("function", {}).get("name", "")
arguments_str = tc.get("function", {}).get("arguments", "") or ""
# New tool detected: send the preparing event immediately
if tool_id not in detected_tools and tool_name:
detected_tools[tool_id] = tool_name
# Try to extract the intent early
intent_value = None
if arguments_str:
intent_value = extract_intent_from_partial(arguments_str)
if intent_value:
detected_tool_intent[tool_id] = intent_value
brief_log(f"[intent] 预提取 {tool_name}: {intent_value}")
# Send the tool-preparing event immediately
brief_log(f"[tool] 准备调用 {tool_name} (id={tool_id}) intent={intent_value or '-'}")
sender('tool_preparing', {
'id': tool_id,
'name': tool_name,
'message': f'准备调用 {tool_name}...',
'intent': intent_value,
'conversation_id': conversation_id
})
debug_log(f" 发送工具准备事件: {tool_name}")
await asyncio.sleep(0.1)
tool_calls.append({
"id": tool_id,
"index": tc.get("index"),
"type": "function",
"function": {
"name": tool_name,
"arguments": arguments_str
}
})
# Try to extract the intent from the incremental arguments and push it separately
if tool_id and arguments_str:
intent_value = extract_intent_from_partial(arguments_str)
if intent_value and detected_tool_intent.get(tool_id) != intent_value:
detected_tool_intent[tool_id] = intent_value
sender('tool_intent', {
'id': tool_id,
'name': tool_name,
'intent': intent_value,
'conversation_id': conversation_id
})
debug_log(f" 发送工具意图: {tool_name} -> {intent_value}")
await asyncio.sleep(0.01)
debug_log(f" 新工具: {tool_name}")
# Check whether a stop was requested
# The log message below says this check runs after stream processing has
# finished. NOTE(review): unlike the stop path at the top of the chunk loop,
# this path does not call finalize_pending_append / finalize_pending_modify.
client_stop_info = get_stop_flag(client_sid, username)
if client_stop_info:
stop_requested = client_stop_info.get('stop', False) if isinstance(client_stop_info, dict) else client_stop_info
if stop_requested:
debug_log("任务在流处理完成后检测到停止状态")
sender('task_stopped', {
'message': '命令执行被用户取消',
'reason': 'user_stop'
})
cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
clear_stop_flag(client_sid, username)
return {
"stopped": True,
"full_response": full_response,
"tool_calls": tool_calls,
"current_thinking": current_thinking,
"detected_tools": detected_tools,
"last_usage_payload": last_usage_payload,
"in_thinking": in_thinking,
"thinking_started": thinking_started,
"thinking_ended": thinking_ended,
"text_started": text_started,
"text_has_content": text_has_content,
"text_streaming": text_streaming,
"text_chunk_index": text_chunk_index,
"last_text_chunk_time": last_text_chunk_time,
"chunk_count": chunk_count,
"reasoning_chunks": reasoning_chunks,
"content_chunks": content_chunks,
"tool_chunks": tool_chunks,
"append_result": append_result,
"modify_result": modify_result,
"last_finish_reason": last_finish_reason,
"pending_append": pending_append,
"append_probe_buffer": append_probe_buffer,
"pending_modify": pending_modify,
"modify_probe_buffer": modify_probe_buffer,
"accumulated_response": accumulated_response,
}
# === After the API response completes, only count output tokens ===
# The last usage payload seen is handed to the context manager; a failure
# there is logged and does not abort the turn.
if last_usage_payload:
try:
web_terminal.context_manager.apply_usage_statistics(last_usage_payload)
debug_log(
f"Usage统计: prompt={last_usage_payload.get('prompt_tokens', 0)}, "
f"completion={last_usage_payload.get('completion_tokens', 0)}, "
f"total={last_usage_payload.get('total_tokens', 0)}"
)
except Exception as e:
debug_log(f"Usage统计更新失败: {e}")
else:
debug_log("未获取到usage字段跳过token统计更新")
if api_error:
# Log the raw error, falling back to repr() when it is not JSON-serializable.
try:
debug_log(f"API错误原始数据: {json.dumps(api_error, ensure_ascii=False)}")
except Exception:
debug_log(f"API错误原始数据(不可序列化): {repr(api_error)}")
# Normalize the error (dict or plain string) into individual fields.
error_message = ""
error_status = None
error_type = None
error_code = None
error_text = ""
request_dump = None
error_base_url = None
error_model_id = None
if isinstance(api_error, dict):
error_status = api_error.get("status_code")
error_type = api_error.get("error_type") or api_error.get("type")
error_code = api_error.get("error_code") or api_error.get("code")
error_text = api_error.get("error_text") or ""
error_message = (
api_error.get("error_message")
or api_error.get("message")
or error_text
or ""
)
request_dump = api_error.get("request_dump")
error_base_url = api_error.get("base_url")
error_model_id = api_error.get("model_id")
elif isinstance(api_error, str):
error_message = api_error
# Fall back to a generic message when the error carried none.
if not error_message:
if error_status:
error_message = f"API 请求失败HTTP {error_status}"
else:
error_message = "API 请求失败"
# If an Aliyun quota error is hit, record the state immediately and switch to the official API
# The helper is imported inside the try, so an import failure is also just
# logged. "kimi-k2.5" is the model key used when web_terminal has none.
try:
from utils.aliyun_fallback import compute_disabled_until, set_disabled_until
disabled_until, reason = compute_disabled_until(error_message)
if disabled_until and reason:
set_disabled_until(getattr(web_terminal, "model_key", None) or "kimi-k2.5", disabled_until, reason)
profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
web_terminal.apply_model_profile(profile)
except Exception as exc:
debug_log(f"处理阿里云配额回退失败: {exc}")
# Retry only while attempts remain and this attempt produced no text, tool
# calls or thinking and no append/modify is pending.
can_retry = (
api_attempt < max_api_retries
and not full_response
and not tool_calls
and not current_thinking
and not pending_append
and not pending_modify
)
# The error event is always sent; it tells the client whether and when a
# retry will follow.
sender('error', {
'message': error_message,
'status_code': error_status,
'error_type': error_type,
'error_code': error_code,
'error_text': error_text,
'request_dump': request_dump,
'base_url': error_base_url,
'model_id': error_model_id,
'retry': bool(can_retry),
'retry_in': retry_delay_seconds if can_retry else None,
'attempt': api_attempt + 1,
'max_attempts': max_api_retries + 1
})
if can_retry:
# Re-apply the model profile before retrying; failures are only logged.
try:
profile = get_model_profile(getattr(web_terminal, "model_key", None) or "kimi-k2.5")
web_terminal.apply_model_profile(profile)
except Exception as exc:
debug_log(f"重试前更新模型配置失败: {exc}")
# A truthy result from the wait is treated as a cancellation and ends the
# turn as stopped.
cancelled = await wait_retry_delay(delay_seconds=retry_delay_seconds, client_sid=client_sid, username=username, sender=sender, get_stop_flag=get_stop_flag, clear_stop_flag=clear_stop_flag)
if cancelled:
return {
"stopped": True,
"full_response": full_response,
"tool_calls": tool_calls,
"current_thinking": current_thinking,
"detected_tools": detected_tools,
"last_usage_payload": last_usage_payload,
"in_thinking": in_thinking,
"thinking_started": thinking_started,
"thinking_ended": thinking_ended,
"text_started": text_started,
"text_has_content": text_has_content,
"text_streaming": text_streaming,
"text_chunk_index": text_chunk_index,
"last_text_chunk_time": last_text_chunk_time,
"chunk_count": chunk_count,
"reasoning_chunks": reasoning_chunks,
"content_chunks": content_chunks,
"tool_chunks": tool_chunks,
"append_result": append_result,
"modify_result": modify_result,
"last_finish_reason": last_finish_reason,
"pending_append": pending_append,
"append_probe_buffer": append_probe_buffer,
"pending_modify": pending_modify,
"modify_probe_buffer": modify_probe_buffer,
"accumulated_response": accumulated_response,
}
continue
# Presumably still inside the api_error branch: the error cannot be retried,
# so pending tool calls are cancelled and the turn is reported as stopped.
cancel_pending_tools(tool_calls_list=tool_calls, sender=sender, messages=messages)
return {
"stopped": True,
"full_response": full_response,
"tool_calls": tool_calls,
"current_thinking": current_thinking,
"detected_tools": detected_tools,
"last_usage_payload": last_usage_payload,
"in_thinking": in_thinking,
"thinking_started": thinking_started,
"thinking_ended": thinking_ended,
"text_started": text_started,
"text_has_content": text_has_content,
"text_streaming": text_streaming,
"text_chunk_index": text_chunk_index,
"last_text_chunk_time": last_text_chunk_time,
"chunk_count": chunk_count,
"reasoning_chunks": reasoning_chunks,
"content_chunks": content_chunks,
"tool_chunks": tool_chunks,
"append_result": append_result,
"modify_result": modify_result,
"last_finish_reason": last_finish_reason,
"pending_append": pending_append,
"append_probe_buffer": append_probe_buffer,
"pending_modify": pending_modify,
"modify_probe_buffer": modify_probe_buffer,
"accumulated_response": accumulated_response,
}
# Presumably reached when the attempt finished without an API error: leave
# the retry loop.
break
# Normal completion: return the collected state with "stopped" False.
return {
"stopped": False,
"full_response": full_response,
"tool_calls": tool_calls,
"current_thinking": current_thinking,
"detected_tools": detected_tools,
"last_usage_payload": last_usage_payload,
"in_thinking": in_thinking,
"thinking_started": thinking_started,
"thinking_ended": thinking_ended,
"text_started": text_started,
"text_has_content": text_has_content,
"text_streaming": text_streaming,
"text_chunk_index": text_chunk_index,
"last_text_chunk_time": last_text_chunk_time,
"chunk_count": chunk_count,
"reasoning_chunks": reasoning_chunks,
"content_chunks": content_chunks,
"tool_chunks": tool_chunks,
"append_result": append_result,
"modify_result": modify_result,
"last_finish_reason": last_finish_reason,
"pending_append": pending_append,
"append_probe_buffer": append_probe_buffer,
"pending_modify": pending_modify,
"modify_probe_buffer": modify_probe_buffer,
"accumulated_response": accumulated_response,
}