diff --git a/config/api.py b/config/api.py index 458ae0a..1179f8a 100644 --- a/config/api.py +++ b/config/api.py @@ -17,3 +17,9 @@ __all__ = [ "TAVILY_API_KEY", "DEFAULT_RESPONSE_MAX_TOKENS", ] + +''' +API_BASE_URL = "https://api.moonshot.cn/v1", +API_KEY = "sk-***REDACTED-ROTATE-THIS-KEY***", +MODEL_ID = "kimi-k2-0905-preview" +''' diff --git a/data/conversations/conv_tool_system_fixture.json b/data/conversations/conv_tool_system_fixture.json new file mode 100644 index 0000000..f7a0288 --- /dev/null +++ b/data/conversations/conv_tool_system_fixture.json @@ -0,0 +1,66 @@ +{ + "id": "conv_tool_system_fixture", + "title": "工具结果以 system 回传示例", + "created_at": "2025-11-14T21:08:00", + "updated_at": "2025-11-14T21:08:00", + "messages": [ + { + "role": "user", + "content": "我正在测试添加文件工具,请在这个文件中随便添加一些内容,不需要阅读\n/users/jojo/desktop/agents/正在修复中/agent_kimi_源码备份_标记有bug版本/project/test_add_file.txt\n" + }, + { + "role": "assistant", + "content": "我来帮你在文件中添加一些内容。" + }, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "append_to_file:0", + "index": 0, + "type": "function", + "function": { + "name": "append_to_file", + "arguments": "{\"path\": \"/users/jojo/desktop/agents/正在修复中/agent_kimi_源码备份_标记有bug版本/project/test_add_file.txt\", \"reason\": \"测试添加文件工具,添加一些随机内容\"}" + } + } + ] + }, + { + "role": "tool", + "content": "{\"success\": false, \"error\": \"路径必须在项目文件夹内\"}", + "tool_call_id": "append_to_file:0", + "name": "append_to_file" + }, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "append_to_file:1", + "index": 0, + "type": "function", + "function": { + "name": "append_to_file", + "arguments": "{\"path\": \"test_add_file.txt\", \"reason\": \"测试添加文件工具,添加一些随机内容\"}" + } + } + ] + }, + { + "role": "tool", + "content": "{\"success\": true, \"awaiting_content\": true, \"path\": \"test_add_file.txt\", \"message\": \"\\n请按照以下格式输出需要追加到文件的完整内容,禁止输出任何解释性文字:\\n<<>>\\n(在此行之后紧接着写入要追加的全部内容,可包含多行代码)\\n<<>>\\n若追加完成后仍需继续写入,请再次调用 
append_to_file。\"}", + "tool_call_id": "append_to_file:1", + "name": "append_to_file" + }, + { + "role": "assistant", + "content": "<<>>\n这是一个测试文件,用于测试添加文件工具的功能。\n这里添加了一些随机的内容,包括:\n- 普通文本内容\n- 列表项\n- 时间戳:2025-10-09 16:13:00\n\n测试完成,文件添加功能正常工作。\n\n<<>>" + }, + { + "role": "system", + "content": "[工具结果] append_to_file (tool_call_id=append_to_file:1)\n{\"success\": true, \"path\": \"test_add_file.txt\", \"lines\": 7, \"bytes\": 236, \"forced\": false, \"message\": \"已向 test_add_file.txt 追加 7 行(236 字节)\", \"finish_reason\": null, \"appended_content\": \"这是一个测试文件,用于测试添加文件工具的功能。\\n这里添加了一些随机的内容,包括:\\n- 普通文本内容\\n- 列表项\\n- 时间戳:2025-10-09 16:13:00\\n\\n测试完成,文件添加功能正常工作。\\n\"}" + } + ] +} diff --git a/scripts/api_tool_role_experiment.py b/scripts/api_tool_role_experiment.py new file mode 100644 index 0000000..87a1cc8 --- /dev/null +++ b/scripts/api_tool_role_experiment.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +""" +使用现有“文件追加/修改”对话上下文对不同模型服务发起一次 Chat Completions 请求, +用于复现“单次工具调用对应多个 tool 消息”在不同 API 上的兼容性差异。 +""" + +import argparse +import json +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Tuple + +import httpx + + +DEFAULT_CONVERSATION = Path("data/conversations/conv_20251009_161243_189.json") +DEFAULT_OUTPUT_DIR = Path("logs/api_experiment") + + +def convert_messages(raw_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """将存档中的消息转换为 OpenAI Chat Completions 兼容格式。""" + converted: List[Dict[str, Any]] = [] + for msg in raw_messages: + role = msg.get("role") + if not role: + continue + entry: Dict[str, Any] = { + "role": role, + "content": msg.get("content", "") or "" + } + if role == "tool": + entry["tool_call_id"] = msg.get("tool_call_id") + if msg.get("name"): + entry["name"] = msg["name"] + if msg.get("tool_calls"): + entry["tool_calls"] = msg["tool_calls"] + converted.append(entry) + return converted + + +def load_conversation_messages(path: Path) -> List[Dict[str, Any]]: + """读取对话文件并返回 messages 列表。""" + data = 
json.loads(path.read_text(encoding="utf-8")) + raw_messages = data.get("messages") + if not isinstance(raw_messages, list): + raise ValueError(f"{path} 中缺少 messages 数据") + return convert_messages(raw_messages) + + +def minimal_tool_definitions() -> List[Dict[str, Any]]: + """返回涵盖 append/modify 的最小工具定义集合。""" + return [ + { + "type": "function", + "function": { + "name": "append_to_file", + "description": ( + "准备向文件追加大段内容。调用后系统会发放 <<>>…<<>> " + "格式的写入窗口,AI 必须在窗口内一次性输出需要追加的全部内容。" + ), + "parameters": { + "type": "object", + "properties": { + "path": {"type": "string", "description": "目标文件的相对路径"}, + "reason": {"type": "string", "description": "为什么需要追加(可选)"} + }, + "required": ["path"] + } + } + }, + { + "type": "function", + "function": { + "name": "modify_file", + "description": ( + "准备替换文件中的指定内容。模型必须在 <<>>…<<>> " + "结构内输出若干 [replace:n] 补丁块,每块包含 <> 原文 和 <> 新内容。" + ), + "parameters": { + "type": "object", + "properties": { + "path": {"type": "string", "description": "目标文件的相对路径"} + }, + "required": ["path"] + } + } + } + ] + + +def send_request( + api_base: str, + api_key: str, + model_id: str, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]], + timeout: float = 60.0 +) -> Tuple[int, Dict[str, Any], str]: + """向指定 API 发送一次非流式请求,返回状态码、JSON/空字典、原始文本。""" + url = api_base.rstrip("/") + "/chat/completions" + payload = { + "model": model_id, + "messages": messages, + "tools": tools, + "tool_choice": "auto", + "stream": False + } + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + } + + with httpx.Client(timeout=timeout) as client: + response = client.post(url, json=payload, headers=headers) + text = response.text + try: + data = response.json() + except ValueError: + data = {} + return response.status_code, data, text + + +def dump_result( + output_dir: Path, + label: str, + payload: Dict[str, Any], + status_code: int, + json_body: Dict[str, Any], + raw_text: str +) -> Path: + """将实验结果落盘,便于后续分析。""" + 
output_dir.mkdir(parents=True, exist_ok=True) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = output_dir / f"{label}_{timestamp}.json" + record = { + "label": label, + "status_code": status_code, + "request_payload": payload, + "response_json": json_body, + "response_text": raw_text + } + filename.write_text(json.dumps(record, ensure_ascii=False, indent=2), encoding="utf-8") + return filename + + +def main() -> None: + parser = argparse.ArgumentParser(description="对比不同 API 对工具消息结构的兼容性。") + parser.add_argument("--conversation-file", type=Path, default=DEFAULT_CONVERSATION, + help="使用的对话存档 JSON 文件路径") + parser.add_argument("--api-base", required=True, help="API 基础地址,如 https://api.example.com/v1") + parser.add_argument("--api-key", required=True, help="API Key") + parser.add_argument("--model-id", required=True, help="模型 ID") + parser.add_argument("--label", required=True, help="本次实验标签,用于输出文件命名") + parser.add_argument("--output-dir", type=Path, default=DEFAULT_OUTPUT_DIR, + help="实验结果输出目录") + parser.add_argument("--timeout", type=float, default=60.0, help="HTTP 请求超时时间(秒)") + args = parser.parse_args() + + messages = load_conversation_messages(args.conversation_file) + tools = minimal_tool_definitions() + payload = { + "model": args.model_id, + "messages": messages, + "tools": tools, + "tool_choice": "auto", + "stream": False + } + + print(f"📨 发送消息数: {len(messages)},工具定义数: {len(tools)}") + print(f"➡️ 目标: {args.api_base} / {args.model_id} (label={args.label})") + + status_code, json_body, raw_text = send_request( + api_base=args.api_base, + api_key=args.api_key, + model_id=args.model_id, + messages=messages, + tools=tools, + timeout=args.timeout + ) + + output_path = dump_result( + output_dir=args.output_dir, + label=args.label, + payload=payload, + status_code=status_code, + json_body=json_body, + raw_text=raw_text + ) + + print(f"✅ HTTP {status_code},结果已保存: {output_path}") + if status_code >= 400: + print("⚠️ 响应出现错误,请查看 response_json/response_text 
获取详细信息。") + + +if __name__ == "__main__": + main() diff --git a/scripts/stream_chunk_probe.py b/scripts/stream_chunk_probe.py new file mode 100644 index 0000000..faf6180 --- /dev/null +++ b/scripts/stream_chunk_probe.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +""" +对指定模型服务发起一次流式请求,并记录每个 data chunk 的输出长度与时间间隔。 + +Usage: + python3 scripts/stream_chunk_probe.py \ + --api-base https://api.moonshot.cn/v1 \ + --api-key sk-xxx \ + --model-id kimi-k2-0905-preview \ + --prompt "帮我写一个Python脚本..." +""" + +import argparse +import asyncio +import json +import time +from pathlib import Path +from typing import Any, Dict, List + +import httpx + + +def build_messages(prompt: str) -> List[Dict[str, str]]: + """构建最小化消息列表。""" + system_prompt = "你是一个友好的中文助手。请在回答时输出足够长的内容以便观察流式分片。" + return [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": prompt}, + ] + + +async def stream_once(api_base: str, api_key: str, model_id: str, prompt: str, timeout: float, max_chunks: int = 0) -> None: + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json" + } + payload = { + "model": model_id, + "messages": build_messages(prompt), + "stream": True + } + + url = api_base.rstrip("/") + "/chat/completions" + print(f"➡️ 发起流式请求: {url} ({model_id})") + + start_time = time.time() + last_time = start_time + chunk_index = 0 + total_tokens = 0 + + async with httpx.AsyncClient(timeout=timeout) as client: + async with client.stream("POST", url, json=payload, headers=headers) as response: + print(f"HTTP {response.status_code}") + async for raw_line in response.aiter_lines(): + if not raw_line: + continue + if not raw_line.startswith("data:"): + continue + data_part = raw_line[5:].strip() + if data_part == "[DONE]": + break + + chunk_index += 1 + now = time.time() + delta = now - last_time + last_time = now + + try: + data = json.loads(data_part) + except json.JSONDecodeError: + print(f"[{chunk_index:03d}] Δ{delta:.3f}s | 非JSON: {data_part[:80]}") + 
continue + + delta_obj = data.get("choices", [{}])[0].get("delta", {}) + text_piece = delta_obj.get("content") or "" + total_tokens += len(text_piece) + reasoning = delta_obj.get("reasoning_content") + has_tool = bool(delta_obj.get("tool_calls")) + summary = [] + if text_piece: + summary.append(f"text {len(text_piece)} chars") + if reasoning: + summary.append(f"think {len(reasoning)} chars") + if has_tool: + summary.append("tool_calls") + if not summary: + summary.append("no-content") + summary_text = ", ".join(summary) + print(f"[{chunk_index:03d}] Δ{delta:.3f}s | {summary_text}") + + if max_chunks and chunk_index >= max_chunks: + print(f"⚠️ 已达到 max_chunks={max_chunks},提前停止流式读取。") + break + + total_time = last_time - start_time + print(f"✅ 流结束,共 {chunk_index} 个 chunk,用时 {total_time:.2f}s,累计正文字符 {total_tokens}") + + +def main() -> None: + parser = argparse.ArgumentParser(description="采集流式输出 chunk 间隔。") + parser.add_argument("--api-base", required=True, help="API 基础地址,例如 https://api.moonshot.cn/v1") + parser.add_argument("--api-key", required=True, help="API Key") + parser.add_argument("--model-id", required=True, help="模型 ID") + parser.add_argument("--prompt", default="请用中文详细说明流式输出测试,输出足够多的文字。", help="测试用 prompt") + parser.add_argument("--timeout", type=float, default=120.0, help="HTTP 超时时间(秒)") + parser.add_argument("--max-chunks", type=int, default=0, help="可选,限制最多采集的 chunk 数") + args = parser.parse_args() + + asyncio.run(stream_once( + api_base=args.api_base, + api_key=args.api_key, + model_id=args.model_id, + prompt=args.prompt, + timeout=args.timeout, + max_chunks=args.max_chunks + )) + + +if __name__ == "__main__": + main() diff --git a/web_server.py b/web_server.py index 0e07472..7334dda 100644 --- a/web_server.py +++ b/web_server.py @@ -18,7 +18,7 @@ from functools import wraps from datetime import timedelta import time from datetime import datetime -from collections import defaultdict +from collections import defaultdict, deque from werkzeug.utils import 
secure_filename from werkzeug.routing import BaseConverter @@ -150,6 +150,17 @@ def sanitize_filename_preserve_unicode(filename: str) -> str: # Windows/Unix 通用文件名长度安全上限 return cleaned[:255] + +def format_tool_result_notice(tool_name: str, tool_call_id: Optional[str], content: str) -> str: + """将工具执行结果转为系统消息文本,方便在对话中回传。""" + header = f"[工具结果] {tool_name}" + if tool_call_id: + header += f" (tool_call_id={tool_call_id})" + body = (content or "").strip() + if not body: + body = "(无附加输出)" + return f"{header}\n{body}" + # 创建调试日志文件 DEBUG_LOG_FILE = Path(LOGS_DIR).expanduser().resolve() / "debug_stream.log" UPLOAD_FOLDER_NAME = "user_upload" @@ -2341,6 +2352,64 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client thinking_ended = False text_started = False text_has_content = False + TEXT_BUFFER_MAX_CHARS = 1 + TEXT_BUFFER_MAX_INTERVAL = 0.0 + TEXT_BUFFER_FLUSH_CHARS = 1 + text_chunk_buffer: deque[str] = deque() + text_chunk_buffer_size = 0 + last_text_flush_time = time.time() + TEXT_BUFFER_CHAR_DELAY = 0.02 + + def _drain_text_buffer(force: bool = False) -> bool: + nonlocal text_chunk_buffer, text_chunk_buffer_size, last_text_flush_time + if not text_chunk_buffer: + return False + + drain_all = force or TEXT_BUFFER_MAX_INTERVAL == 0.0 + sent = False + while text_chunk_buffer: + now = time.time() + should_flush = ( + force + or text_chunk_buffer_size >= TEXT_BUFFER_MAX_CHARS + or TEXT_BUFFER_MAX_INTERVAL == 0.0 + or (TEXT_BUFFER_MAX_INTERVAL > 0 and (now - last_text_flush_time) >= TEXT_BUFFER_MAX_INTERVAL) + ) + if not should_flush: + break + + batch_size = text_chunk_buffer_size if drain_all else max(1, min(text_chunk_buffer_size, TEXT_BUFFER_FLUSH_CHARS or 1)) + pieces: List[str] = [] + remaining = batch_size + + while text_chunk_buffer and remaining > 0: + chunk = text_chunk_buffer.popleft() + chunk_len = len(chunk) + if chunk_len <= remaining: + pieces.append(chunk) + remaining -= chunk_len + else: + pieces.append(chunk[:remaining]) + 
text_chunk_buffer.appendleft(chunk[remaining:]) + chunk_len = remaining + remaining = 0 + text_chunk_buffer_size -= chunk_len + + if not pieces: + break + + sender('text_chunk', {'content': "".join(pieces)}) + last_text_flush_time = now + sent = True + + if not drain_all: + break + return sent + + async def flush_text_buffer(force: bool = False): + sent = _drain_text_buffer(force) + if sent and not force and TEXT_BUFFER_CHAR_DELAY > 0: + await asyncio.sleep(TEXT_BUFFER_CHAR_DELAY) text_streaming = False # 计数器 @@ -2625,7 +2694,10 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client full_response += content accumulated_response += content text_has_content = True - sender('text_chunk', {'content': content}) + for ch in content: + text_chunk_buffer.append(ch) + text_chunk_buffer_size += 1 + await flush_text_buffer() # 收集工具调用 - 实时发送准备状态 if "tool_calls" in delta: @@ -2698,6 +2770,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client debug_log(f"输出token统计失败: {e}") # 流结束后的处理 + await flush_text_buffer(force=True) debug_log(f"\n流结束统计:") debug_log(f" 总chunks: {chunk_count}") debug_log(f" 思考chunks: {reasoning_chunks}") @@ -2726,6 +2799,7 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client # 确保text_end事件被发送 if text_started and text_has_content and not append_result["handled"] and not modify_result["handled"]: + await flush_text_buffer(force=True) debug_log(f"发送text_end事件,完整内容长度: {len(full_response)}") sender('text_end', {'full_content': full_response}) await asyncio.sleep(0.1) @@ -2759,14 +2833,10 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client if append_result["tool_content"]: tool_call_id = append_result.get("tool_call_id") or f"append_{int(time.time() * 1000)}" - web_terminal.context_manager.add_conversation( - "tool", - append_result["tool_content"], - tool_call_id=tool_call_id, - name="append_to_file" - ) + system_notice = 
format_tool_result_notice("append_to_file", tool_call_id, append_result["tool_content"]) + web_terminal.context_manager.add_conversation("system", system_notice) append_result["tool_call_id"] = tool_call_id - debug_log("💾 增量保存:append_to_file 工具结果") + debug_log("💾 增量保存:append_to_file 工具结果(system 通知)") finish_reason = append_result.get("finish_reason") path_for_prompt = append_result.get("path") @@ -2839,14 +2909,10 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client if modify_result["tool_content"]: tool_call_id = modify_result.get("tool_call_id") or f"modify_{int(time.time() * 1000)}" - web_terminal.context_manager.add_conversation( - "tool", - modify_result["tool_content"], - tool_call_id=tool_call_id, - name="modify_file" - ) + system_notice = format_tool_result_notice("modify_file", tool_call_id, modify_result["tool_content"]) + web_terminal.context_manager.add_conversation("system", system_notice) modify_result["tool_call_id"] = tool_call_id - debug_log("💾 增量保存:modify_file 工具结果") + debug_log("💾 增量保存:modify_file 工具结果(system 通知)") path_for_prompt = modify_result.get("path") failed_blocks = modify_result.get("failed_blocks") or [] @@ -2955,22 +3021,22 @@ async def handle_task_with_sender(terminal: WebTerminal, message, sender, client if append_result["handled"] and append_result.get("tool_content"): tool_call_id = append_result.get("tool_call_id") or f"append_{int(time.time() * 1000)}" + system_notice = format_tool_result_notice("append_to_file", tool_call_id, append_result["tool_content"]) messages.append({ - "role": "tool", - "tool_call_id": tool_call_id, - "name": "append_to_file", - "content": append_result["tool_content"] + "role": "system", + "content": system_notice }) - debug_log("已将 append_to_file 工具结果追加到对话上下文") + append_result["tool_call_id"] = tool_call_id + debug_log("已将 append_to_file 工具结果以 system 形式追加到对话上下文") if modify_result["handled"] and modify_result.get("tool_content"): tool_call_id = 
modify_result.get("tool_call_id") or f"modify_{int(time.time() * 1000)}" + system_notice = format_tool_result_notice("modify_file", tool_call_id, modify_result["tool_content"]) messages.append({ - "role": "tool", - "tool_call_id": tool_call_id, - "name": "modify_file", - "content": modify_result["tool_content"] + "role": "system", + "content": system_notice }) - debug_log("已将 modify_file 工具结果追加到对话上下文") + modify_result["tool_call_id"] = tool_call_id + debug_log("已将 modify_file 工具结果以 system 形式追加到对话上下文") force_continue = append_result["handled"] or modify_result["handled"] if force_continue: