feat: improve streaming buffering and tooling

This commit is contained in:
JOJO 2025-11-14 22:17:55 +08:00
parent caaee38967
commit 2e42a970e7
5 changed files with 480 additions and 26 deletions

View File

@ -17,3 +17,9 @@ __all__ = [
"TAVILY_API_KEY",
"DEFAULT_RESPONSE_MAX_TOKENS",
]
'''
API_BASE_URL = "https://api.moonshot.cn/v1",
API_KEY = "sk-REDACTED",  # SECURITY: a live-looking API key was committed here — rotate it immediately and load it from an environment variable instead
MODEL_ID = "kimi-k2-0905-preview"
'''

View File

@ -0,0 +1,66 @@
{
"id": "conv_tool_system_fixture",
"title": "工具结果以 system 回传示例",
"created_at": "2025-11-14T21:08:00",
"updated_at": "2025-11-14T21:08:00",
"messages": [
{
"role": "user",
"content": "我正在测试添加文件工具,请在这个文件中随便添加一些内容,不需要阅读\n/users/jojo/desktop/agents/正在修复中/agent_kimi_源码备份_标记有bug版本/project/test_add_file.txt\n"
},
{
"role": "assistant",
"content": "我来帮你在文件中添加一些内容。"
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "append_to_file:0",
"index": 0,
"type": "function",
"function": {
"name": "append_to_file",
"arguments": "{\"path\": \"/users/jojo/desktop/agents/正在修复中/agent_kimi_源码备份_标记有bug版本/project/test_add_file.txt\", \"reason\": \"测试添加文件工具,添加一些随机内容\"}"
}
}
]
},
{
"role": "tool",
"content": "{\"success\": false, \"error\": \"路径必须在项目文件夹内\"}",
"tool_call_id": "append_to_file:0",
"name": "append_to_file"
},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "append_to_file:1",
"index": 0,
"type": "function",
"function": {
"name": "append_to_file",
"arguments": "{\"path\": \"test_add_file.txt\", \"reason\": \"测试添加文件工具,添加一些随机内容\"}"
}
}
]
},
{
"role": "tool",
"content": "{\"success\": true, \"awaiting_content\": true, \"path\": \"test_add_file.txt\", \"message\": \"\\n请按照以下格式输出需要追加到文件的完整内容禁止输出任何解释性文字\\n<<<APPEND:test_add_file.txt>>>\\n(在此行之后紧接着写入要追加的全部内容,可包含多行代码)\\n<<<END_APPEND>>>\\n若追加完成后仍需继续写入请再次调用 append_to_file。\"}",
"tool_call_id": "append_to_file:1",
"name": "append_to_file"
},
{
"role": "assistant",
"content": "<<<APPEND:test_add_file.txt>>>\n这是一个测试文件用于测试添加文件工具的功能。\n这里添加了一些随机的内容包括\n- 普通文本内容\n- 列表项\n- 时间戳2025-10-09 16:13:00\n\n测试完成文件添加功能正常工作。\n\n<<<END_APPEND>>>"
},
{
"role": "system",
"content": "[工具结果] append_to_file (tool_call_id=append_to_file:1)\n{\"success\": true, \"path\": \"test_add_file.txt\", \"lines\": 7, \"bytes\": 236, \"forced\": false, \"message\": \"已向 test_add_file.txt 追加 7 行236 字节)\", \"finish_reason\": null, \"appended_content\": \"这是一个测试文件,用于测试添加文件工具的功能。\\n这里添加了一些随机的内容包括\\n- 普通文本内容\\n- 列表项\\n- 时间戳2025-10-09 16:13:00\\n\\n测试完成文件添加功能正常工作。\\n\"}"
}
]
}

View File

@ -0,0 +1,196 @@
#!/usr/bin/env python3
"""
使用现有文件追加/修改对话上下文对不同模型服务发起一次 Chat Completions 请求
用于复现单次工具调用对应多个 tool 消息在不同 API 上的兼容性差异
"""
import argparse
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Tuple
import httpx
DEFAULT_CONVERSATION = Path("data/conversations/conv_20251009_161243_189.json")
DEFAULT_OUTPUT_DIR = Path("logs/api_experiment")
def convert_messages(raw_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Normalize archived messages into OpenAI Chat Completions format.

    Messages lacking a role are dropped. Tool messages keep their
    ``tool_call_id`` (and ``name`` when present); assistant-side
    ``tool_calls`` payloads are forwarded unchanged.
    """
    normalized: List[Dict[str, Any]] = []
    for raw in raw_messages:
        role = raw.get("role")
        if not role:
            continue
        item: Dict[str, Any] = {
            "role": role,
            # Coerce missing/None content to an empty string for API safety.
            "content": raw.get("content", "") or "",
        }
        if role == "tool":
            item["tool_call_id"] = raw.get("tool_call_id")
            name = raw.get("name")
            if name:
                item["name"] = name
        tool_calls = raw.get("tool_calls")
        if tool_calls:
            item["tool_calls"] = tool_calls
        normalized.append(item)
    return normalized
def load_conversation_messages(path: Path) -> List[Dict[str, Any]]:
    """Read a conversation archive and return its converted messages list.

    Raises ValueError when the file has no list-typed ``messages`` field.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    messages = payload.get("messages")
    if isinstance(messages, list):
        return convert_messages(messages)
    raise ValueError(f"{path} 中缺少 messages 数据")
def minimal_tool_definitions() -> List[Dict[str, Any]]:
    """Return the minimal tool-definition set covering the append/modify flows."""

    def _tool(name: str, description: str, properties: Dict[str, Any],
              required: List[str]) -> Dict[str, Any]:
        # Shared scaffold for one OpenAI function-tool declaration.
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": properties,
                    "required": required,
                },
            },
        }

    append_tool = _tool(
        "append_to_file",
        (
            "准备向文件追加大段内容。调用后系统会发放 <<<APPEND:path>>>…<<<END_APPEND>>> "
            "格式的写入窗口AI 必须在窗口内一次性输出需要追加的全部内容。"
        ),
        {
            "path": {"type": "string", "description": "目标文件的相对路径"},
            "reason": {"type": "string", "description": "为什么需要追加(可选)"},
        },
        ["path"],
    )
    modify_tool = _tool(
        "modify_file",
        (
            "准备替换文件中的指定内容。模型必须在 <<<MODIFY:path>>>…<<<END_MODIFY>>> "
            "结构内输出若干 [replace:n] 补丁块,每块包含 <<OLD>> 原文 和 <<NEW>> 新内容。"
        ),
        {
            "path": {"type": "string", "description": "目标文件的相对路径"},
        },
        ["path"],
    )
    return [append_tool, modify_tool]
def send_request(
    api_base: str,
    api_key: str,
    model_id: str,
    messages: List[Dict[str, Any]],
    tools: List[Dict[str, Any]],
    timeout: float = 60.0
) -> Tuple[int, Dict[str, Any], str]:
    """POST one non-streaming chat-completions request.

    Returns ``(status_code, parsed_json_or_empty_dict, raw_text)`` so callers
    can inspect both the structured and the verbatim response.
    """
    endpoint = api_base.rstrip("/") + "/chat/completions"
    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    request_body = {
        "model": model_id,
        "messages": messages,
        "tools": tools,
        "tool_choice": "auto",
        "stream": False
    }
    with httpx.Client(timeout=timeout) as client:
        response = client.post(endpoint, json=request_body, headers=request_headers)
        raw_text = response.text
        try:
            parsed = response.json()
        except ValueError:
            # Non-JSON bodies (e.g. HTML error pages) degrade to an empty dict.
            parsed = {}
    return response.status_code, parsed, raw_text
def dump_result(
    output_dir: Path,
    label: str,
    payload: Dict[str, Any],
    status_code: int,
    json_body: Dict[str, Any],
    raw_text: str
) -> Path:
    """Persist one experiment record as pretty-printed JSON; return its path.

    The filename embeds the label plus a second-resolution timestamp so
    repeated runs do not overwrite each other.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = output_dir / f"{label}_{stamp}.json"
    record = {
        "label": label,
        "status_code": status_code,
        "request_payload": payload,
        "response_json": json_body,
        "response_text": raw_text,
    }
    serialized = json.dumps(record, ensure_ascii=False, indent=2)
    target.write_text(serialized, encoding="utf-8")
    return target
def main() -> None:
    """CLI entry: load the archived conversation, send one request, persist the result."""
    parser = argparse.ArgumentParser(description="对比不同 API 对工具消息结构的兼容性。")
    parser.add_argument("--conversation-file", type=Path, default=DEFAULT_CONVERSATION,
                        help="使用的对话存档 JSON 文件路径")
    parser.add_argument("--api-base", required=True, help="API 基础地址,如 https://api.example.com/v1")
    parser.add_argument("--api-key", required=True, help="API Key")
    parser.add_argument("--model-id", required=True, help="模型 ID")
    parser.add_argument("--label", required=True, help="本次实验标签,用于输出文件命名")
    parser.add_argument("--output-dir", type=Path, default=DEFAULT_OUTPUT_DIR,
                        help="实验结果输出目录")
    parser.add_argument("--timeout", type=float, default=60.0, help="HTTP 请求超时时间(秒)")
    args = parser.parse_args()

    chat_messages = load_conversation_messages(args.conversation_file)
    tool_defs = minimal_tool_definitions()
    # Mirror of the request body, logged alongside the response for later analysis.
    request_payload = {
        "model": args.model_id,
        "messages": chat_messages,
        "tools": tool_defs,
        "tool_choice": "auto",
        "stream": False
    }
    print(f"📨 发送消息数: {len(chat_messages)},工具定义数: {len(tool_defs)}")
    print(f"➡️ 目标: {args.api_base} / {args.model_id} (label={args.label})")

    status_code, json_body, raw_text = send_request(
        args.api_base,
        args.api_key,
        args.model_id,
        chat_messages,
        tool_defs,
        timeout=args.timeout,
    )
    saved_to = dump_result(
        args.output_dir,
        args.label,
        request_payload,
        status_code,
        json_body,
        raw_text,
    )
    print(f"✅ HTTP {status_code},结果已保存: {saved_to}")
    if status_code >= 400:
        print("⚠️ 响应出现错误,请查看 response_json/response_text 获取详细信息。")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""
对指定模型服务发起一次流式请求并记录每个 data chunk 的输出长度与时间间隔
Usage:
python3 scripts/stream_chunk_probe.py \
--api-base https://api.moonshot.cn/v1 \
--api-key sk-xxx \
--model-id kimi-k2-0905-preview \
--prompt "帮我写一个Python脚本..."
"""
import argparse
import asyncio
import json
import time
from pathlib import Path
from typing import Any, Dict, List
import httpx
def build_messages(prompt: str) -> List[Dict[str, str]]:
    """Compose the minimal two-message conversation used by the probe."""
    return [
        {
            "role": "system",
            "content": "你是一个友好的中文助手。请在回答时输出足够长的内容以便观察流式分片。",
        },
        {"role": "user", "content": prompt},
    ]
async def stream_once(api_base: str, api_key: str, model_id: str, prompt: str, timeout: float, max_chunks: int = 0) -> None:
    """Stream one chat-completions request and print per-chunk size/gap stats.

    Each SSE ``data:`` chunk is timestamped; Δ is the gap since the previous
    chunk (or since the request start for the first one). Stops at ``[DONE]``
    or after ``max_chunks`` chunks when that limit is non-zero.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model_id,
        "messages": build_messages(prompt),
        "stream": True
    }
    url = api_base.rstrip("/") + "/chat/completions"
    print(f"➡️ 发起流式请求: {url} ({model_id})")
    start_time = time.time()
    last_time = start_time
    chunk_index = 0
    # NOTE(review): despite the name, this counts characters of content text,
    # not model tokens.
    total_tokens = 0
    async with httpx.AsyncClient(timeout=timeout) as client:
        async with client.stream("POST", url, json=payload, headers=headers) as response:
            print(f"HTTP {response.status_code}")
            async for raw_line in response.aiter_lines():
                if not raw_line:
                    continue
                # Only SSE data lines carry payloads; ignore everything else.
                if not raw_line.startswith("data:"):
                    continue
                data_part = raw_line[5:].strip()
                if data_part == "[DONE]":
                    break
                chunk_index += 1
                # Stamp arrival time before parsing so Δ measures network gaps,
                # not JSON-decoding cost.
                now = time.time()
                delta = now - last_time
                last_time = now
                try:
                    data = json.loads(data_part)
                except json.JSONDecodeError:
                    print(f"[{chunk_index:03d}] Δ{delta:.3f}s | 非JSON: {data_part[:80]}")
                    continue
                delta_obj = data.get("choices", [{}])[0].get("delta", {})
                text_piece = delta_obj.get("content") or ""
                total_tokens += len(text_piece)
                reasoning = delta_obj.get("reasoning_content")
                has_tool = bool(delta_obj.get("tool_calls"))
                # Build a compact one-line summary of what this chunk carried.
                summary = []
                if text_piece:
                    summary.append(f"text {len(text_piece)} chars")
                if reasoning:
                    summary.append(f"think {len(reasoning)} chars")
                if has_tool:
                    summary.append("tool_calls")
                if not summary:
                    summary.append("no-content")
                summary_text = ", ".join(summary)
                print(f"[{chunk_index:03d}] Δ{delta:.3f}s | {summary_text}")
                if max_chunks and chunk_index >= max_chunks:
                    print(f"⚠️ 已达到 max_chunks={max_chunks},提前停止流式读取。")
                    break
    # Total time spans request start to the last received chunk.
    total_time = last_time - start_time
    print(f"✅ 流结束,共 {chunk_index} 个 chunk用时 {total_time:.2f}s累计正文字符 {total_tokens}")
def main() -> None:
    """Parse CLI options and run a single streaming probe."""
    parser = argparse.ArgumentParser(description="采集流式输出 chunk 间隔。")
    parser.add_argument("--api-base", required=True, help="API 基础地址,例如 https://api.moonshot.cn/v1")
    parser.add_argument("--api-key", required=True, help="API Key")
    parser.add_argument("--model-id", required=True, help="模型 ID")
    parser.add_argument("--prompt", default="请用中文详细说明流式输出测试,输出足够多的文字。", help="测试用 prompt")
    parser.add_argument("--timeout", type=float, default=120.0, help="HTTP 超时时间(秒)")
    parser.add_argument("--max-chunks", type=int, default=0, help="可选,限制最多采集的 chunk 数")
    opts = parser.parse_args()

    probe = stream_once(
        opts.api_base,
        opts.api_key,
        opts.model_id,
        opts.prompt,
        opts.timeout,
        max_chunks=opts.max_chunks,
    )
    asyncio.run(probe)


if __name__ == "__main__":
    main()

View File

@ -18,7 +18,7 @@ from functools import wraps
from datetime import timedelta
import time
from datetime import datetime
from collections import defaultdict
from collections import defaultdict, deque
from werkzeug.utils import secure_filename
from werkzeug.routing import BaseConverter
@ -150,6 +150,17 @@ def sanitize_filename_preserve_unicode(filename: str) -> str:
# Windows/Unix 通用文件名长度安全上限
return cleaned[:255]
def format_tool_result_notice(tool_name: str, tool_call_id: Optional[str], content: str) -> str:
    """Render a tool execution result as a system-message text block.

    The header names the tool (plus its call id when available); the body is
    the stripped result text, or a placeholder when the result is empty.
    """
    id_suffix = f" (tool_call_id={tool_call_id})" if tool_call_id else ""
    body_text = (content or "").strip() or "(无附加输出)"
    return f"[工具结果] {tool_name}{id_suffix}\n{body_text}"
# 创建调试日志文件
DEBUG_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "debug_stream.log"
UPLOAD_FOLDER_NAME = "user_upload"
@ -2341,6 +2352,64 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client
thinking_ended = False
text_started = False
text_has_content = False
TEXT_BUFFER_MAX_CHARS = 1
TEXT_BUFFER_MAX_INTERVAL = 0.0
TEXT_BUFFER_FLUSH_CHARS = 1
text_chunk_buffer: deque[str] = deque()
text_chunk_buffer_size = 0
last_text_flush_time = time.time()
TEXT_BUFFER_CHAR_DELAY = 0.02
def _drain_text_buffer(force: bool = False) -> bool:
    """Flush buffered text chunks to the client in policy-sized batches.

    Returns True if at least one 'text_chunk' event was sent. Reads and
    updates closure state from the enclosing handler: text_chunk_buffer,
    text_chunk_buffer_size and last_text_flush_time, plus the TEXT_BUFFER_*
    tuning constants and the sender callback.
    """
    nonlocal text_chunk_buffer, text_chunk_buffer_size, last_text_flush_time
    if not text_chunk_buffer:
        return False
    # A zero max-interval degenerates the policy to "send everything now".
    drain_all = force or TEXT_BUFFER_MAX_INTERVAL == 0.0
    sent = False
    while text_chunk_buffer:
        now = time.time()
        # Flush when forced, when the buffer is full, or when the configured
        # interval since the previous flush has elapsed.
        should_flush = (
            force
            or text_chunk_buffer_size >= TEXT_BUFFER_MAX_CHARS
            or TEXT_BUFFER_MAX_INTERVAL == 0.0
            or (TEXT_BUFFER_MAX_INTERVAL > 0 and (now - last_text_flush_time) >= TEXT_BUFFER_MAX_INTERVAL)
        )
        if not should_flush:
            break
        # Batch size: everything when draining, otherwise at most
        # TEXT_BUFFER_FLUSH_CHARS characters (minimum one).
        batch_size = text_chunk_buffer_size if drain_all else max(1, min(text_chunk_buffer_size, TEXT_BUFFER_FLUSH_CHARS or 1))
        pieces: List[str] = []
        remaining = batch_size
        while text_chunk_buffer and remaining > 0:
            chunk = text_chunk_buffer.popleft()
            chunk_len = len(chunk)
            if chunk_len <= remaining:
                pieces.append(chunk)
                remaining -= chunk_len
            else:
                # Split an oversized chunk: send the head, push the tail back
                # onto the deque for the next batch.
                pieces.append(chunk[:remaining])
                text_chunk_buffer.appendleft(chunk[remaining:])
                chunk_len = remaining
                remaining = 0
            text_chunk_buffer_size -= chunk_len
        if not pieces:
            break
        sender('text_chunk', {'content': "".join(pieces)})
        last_text_flush_time = now
        sent = True
        # Non-draining mode emits exactly one batch per call.
        if not drain_all:
            break
    return sent
async def flush_text_buffer(force: bool = False):
    """Async wrapper around _drain_text_buffer that paces non-forced flushes."""
    sent = _drain_text_buffer(force)
    # Short sleep smooths the client-visible typing cadence; skipped when
    # forcing a final drain.
    if sent and not force and TEXT_BUFFER_CHAR_DELAY > 0:
        await asyncio.sleep(TEXT_BUFFER_CHAR_DELAY)
text_streaming = False
# 计数器
@ -2625,7 +2694,10 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client
full_response += content
accumulated_response += content
text_has_content = True
sender('text_chunk', {'content': content})
for ch in content:
text_chunk_buffer.append(ch)
text_chunk_buffer_size += 1
await flush_text_buffer()
# 收集工具调用 - 实时发送准备状态
if "tool_calls" in delta:
@ -2698,6 +2770,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client
debug_log(f"输出token统计失败: {e}")
# 流结束后的处理
await flush_text_buffer(force=True)
debug_log(f"\n流结束统计:")
debug_log(f" 总chunks: {chunk_count}")
debug_log(f" 思考chunks: {reasoning_chunks}")
@ -2726,6 +2799,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client
# 确保text_end事件被发送
if text_started and text_has_content and not append_result["handled"] and not modify_result["handled"]:
await flush_text_buffer(force=True)
debug_log(f"发送text_end事件完整内容长度: {len(full_response)}")
sender('text_end', {'full_content': full_response})
await asyncio.sleep(0.1)
@ -2759,14 +2833,10 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client
if append_result["tool_content"]:
tool_call_id = append_result.get("tool_call_id") or f"append_{int(time.time() * 1000)}"
web_terminal.context_manager.add_conversation(
"tool",
append_result["tool_content"],
tool_call_id=tool_call_id,
name="append_to_file"
)
system_notice = format_tool_result_notice("append_to_file", tool_call_id, append_result["tool_content"])
web_terminal.context_manager.add_conversation("system", system_notice)
append_result["tool_call_id"] = tool_call_id
debug_log("💾 增量保存append_to_file 工具结果")
debug_log("💾 增量保存append_to_file 工具结果system 通知)")
finish_reason = append_result.get("finish_reason")
path_for_prompt = append_result.get("path")
@ -2839,14 +2909,10 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client
if modify_result["tool_content"]:
tool_call_id = modify_result.get("tool_call_id") or f"modify_{int(time.time() * 1000)}"
web_terminal.context_manager.add_conversation(
"tool",
modify_result["tool_content"],
tool_call_id=tool_call_id,
name="modify_file"
)
system_notice = format_tool_result_notice("modify_file", tool_call_id, modify_result["tool_content"])
web_terminal.context_manager.add_conversation("system", system_notice)
modify_result["tool_call_id"] = tool_call_id
debug_log("💾 增量保存modify_file 工具结果")
debug_log("💾 增量保存modify_file 工具结果system 通知)")
path_for_prompt = modify_result.get("path")
failed_blocks = modify_result.get("failed_blocks") or []
@ -2955,22 +3021,22 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client
if append_result["handled"] and append_result.get("tool_content"):
tool_call_id = append_result.get("tool_call_id") or f"append_{int(time.time() * 1000)}"
system_notice = format_tool_result_notice("append_to_file", tool_call_id, append_result["tool_content"])
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"name": "append_to_file",
"content": append_result["tool_content"]
"role": "system",
"content": system_notice
})
debug_log("已将 append_to_file 工具结果追加到对话上下文")
append_result["tool_call_id"] = tool_call_id
debug_log("已将 append_to_file 工具结果以 system 形式追加到对话上下文")
if modify_result["handled"] and modify_result.get("tool_content"):
tool_call_id = modify_result.get("tool_call_id") or f"modify_{int(time.time() * 1000)}"
system_notice = format_tool_result_notice("modify_file", tool_call_id, modify_result["tool_content"])
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"name": "modify_file",
"content": modify_result["tool_content"]
"role": "system",
"content": system_notice
})
debug_log("已将 modify_file 工具结果追加到对话上下文")
modify_result["tool_call_id"] = tool_call_id
debug_log("已将 modify_file 工具结果以 system 形式追加到对话上下文")
force_continue = append_result["handled"] or modify_result["handled"]
if force_continue: