fix: stabilize streaming buffer drain

This commit is contained in:
JOJO 2025-11-29 17:01:28 +08:00
parent 7e2550880d
commit e42a924429

View File

@ -6,20 +6,354 @@ export async function initializeLegacySocket(ctx: any) {
try { try {
console.log('初始化WebSocket连接...'); console.log('初始化WebSocket连接...');
const usePollingOnly = window.location.hostname !== 'localhost' && const socketOptions = {
window.location.hostname !== '127.0.0.1';
const socketOptions = usePollingOnly ? {
transports: ['polling'],
upgrade: false,
autoConnect: false
} : {
transports: ['websocket', 'polling'], transports: ['websocket', 'polling'],
autoConnect: false autoConnect: false
}; };
ctx.socket = createSocketClient('/', socketOptions); ctx.socket = createSocketClient('/', socketOptions);
const STREAMING_CHAR_DELAY = 22;
const STREAMING_FINALIZE_DELAY = 1000;
const STREAMING_DEBUG = true;
const STREAMING_DEBUG_HISTORY_LIMIT = 2000;
const streamingState = {
buffer: [] as string[],
timer: null as number | null,
completionTimer: null as number | null,
apiCompleted: false,
pendingCompleteContent: '' as string,
renderedText: '' as string,
activeMessageIndex: null as number | null,
activeTextAction: null as any
};
const snapshotStreamingState = () => ({
bufferLength: streamingState.buffer.length,
timerActive: streamingState.timer !== null,
completionTimerActive: streamingState.completionTimer !== null,
apiCompleted: streamingState.apiCompleted,
pendingLength: (streamingState.pendingCompleteContent || '').length,
renderedLength: streamingState.renderedText.length,
currentMessageIndex:
typeof ctx?.currentMessageIndex === 'number' ? ctx.currentMessageIndex : null,
messagesLength: Array.isArray(ctx?.messages) ? ctx.messages.length : null,
streamingMessage: !!ctx?.streamingMessage
});
const streamingDebugHistory: Array<any> = [];
const getActiveMessage = () => {
if (streamingState.activeMessageIndex === null) {
return null;
}
const messages = ctx?.messages;
if (!Array.isArray(messages)) {
return null;
}
return messages[streamingState.activeMessageIndex] || null;
};
const ensureActiveMessageBinding = () => {
if (
streamingState.activeMessageIndex === null &&
typeof ctx?.currentMessageIndex === 'number' &&
ctx.currentMessageIndex >= 0
) {
streamingState.activeMessageIndex = ctx.currentMessageIndex;
}
if (
typeof ctx?.currentMessageIndex !== 'number' ||
ctx.currentMessageIndex < 0
) {
if (streamingState.activeMessageIndex !== null) {
ctx.currentMessageIndex = streamingState.activeMessageIndex;
}
}
if (!ctx.streamingMessage) {
ctx.streamingMessage = true;
}
const msg = getActiveMessage();
if (!msg && Array.isArray(ctx?.messages) && ctx.messages.length) {
streamingState.activeMessageIndex = ctx.messages.length - 1;
}
return getActiveMessage();
};
const ensureActiveTextAction = () => {
const msg = getActiveMessage();
if (!msg || !Array.isArray(msg.actions) || !msg.actions.length) {
return null;
}
const known = streamingState.activeTextAction;
if (known && msg.actions.includes(known)) {
return known;
}
for (let i = msg.actions.length - 1; i >= 0; i--) {
const action = msg.actions[i];
if (action && action.type === 'text') {
streamingState.activeTextAction = action;
return action;
}
}
return null;
};
const fallbackAppendToActiveMessage = (text: string) => {
const msg = ensureActiveMessageBinding();
if (!msg) {
return null;
}
if (typeof msg.streamingText !== 'string') {
msg.streamingText = '';
}
msg.streamingText += text;
const action = ensureActiveTextAction();
if (!action) {
return null;
}
if (typeof action.content !== 'string') {
action.content = '';
}
action.content += text;
action.streaming = action.streaming !== false;
streamingState.activeTextAction = action;
ctx.$forceUpdate();
ctx.conditionalScrollToBottom();
renderLatexInRealtime();
return action;
};
const completeActiveTextAction = (fullContent: string) => {
const msg = ensureActiveMessageBinding();
if (!msg) {
return false;
}
const action = ensureActiveTextAction();
if (action) {
action.streaming = false;
action.content = fullContent;
}
msg.streamingText = '';
msg.currentStreamingType = null;
return !!action;
};
const logStreamingDebug = (event: string, detail?: any) => {
if (!STREAMING_DEBUG) {
return;
}
const payload = typeof detail === 'undefined' ? snapshotStreamingState() : detail;
const entry = {
event,
detail: payload,
conversationId: ctx.currentConversationId || null,
timestamp: Date.now()
};
streamingDebugHistory.push(entry);
if (streamingDebugHistory.length > STREAMING_DEBUG_HISTORY_LIMIT) {
streamingDebugHistory.shift();
}
try {
window.__streamingDebugLogs = streamingDebugHistory.slice();
} catch (error) {
// ignore
}
console.log('[streaming-debug]', event, payload);
try {
if (ctx.socket && typeof ctx.socket.emit === 'function') {
ctx.socket.emit('client_stream_debug_log', entry);
}
} catch (error) {
console.warn('上报 streaming debug 日志失败:', error);
}
};
const stopStreamingTimer = () => {
if (streamingState.timer !== null) {
clearTimeout(streamingState.timer);
streamingState.timer = null;
logStreamingDebug('stopStreamingTimer', snapshotStreamingState());
}
};
const stopCompletionTimer = () => {
if (streamingState.completionTimer !== null) {
clearTimeout(streamingState.completionTimer);
streamingState.completionTimer = null;
logStreamingDebug('stopCompletionTimer', snapshotStreamingState());
}
};
const finalizeStreamingText = (options?: { force?: boolean; allowIncomplete?: boolean }) => {
const forceFlush = !!options?.force;
const allowIncomplete = !!options?.allowIncomplete;
logStreamingDebug('finalizeStreamingText:start', { forceFlush, allowIncomplete, snapshot: snapshotStreamingState() });
if (!forceFlush) {
if (streamingState.buffer.length) {
logStreamingDebug('finalizeStreamingText:blocked-buffer', snapshotStreamingState());
return false;
}
if (!allowIncomplete && !streamingState.apiCompleted) {
logStreamingDebug('finalizeStreamingText:blocked-api-incomplete', snapshotStreamingState());
return false;
}
}
stopStreamingTimer();
stopCompletionTimer();
if (forceFlush && streamingState.buffer.length) {
const remainder = streamingState.buffer.join('');
streamingState.buffer.length = 0;
if (remainder) {
applyTextChunk(remainder);
logStreamingDebug('finalizeStreamingText:force-flush-buffer', { flushedChars: remainder.length });
}
}
const pendingText = streamingState.pendingCompleteContent || '';
const renderedText = streamingState.renderedText || '';
let finalText = pendingText || renderedText || '';
let remainderToAppend = '';
if (!pendingText) {
finalText = renderedText;
} else if (!renderedText) {
finalText = pendingText;
remainderToAppend = pendingText;
} else if (pendingText.length < renderedText.length) {
// 后端返回的最终内容比已渲染的还短,避免覆盖已展示的字符
finalText = renderedText;
} else if (pendingText.startsWith(renderedText)) {
finalText = pendingText;
remainderToAppend = pendingText.slice(renderedText.length);
} else {
finalText = pendingText;
}
logStreamingDebug('finalizeStreamingText:resolved-final-text', {
pendingLength: pendingText.length,
renderedLength: renderedText.length,
remainderToAppendLength: remainderToAppend.length,
finalLength: finalText.length
});
if (remainderToAppend) {
applyTextChunk(remainderToAppend);
}
streamingState.pendingCompleteContent = '';
streamingState.apiCompleted = false;
streamingState.renderedText = '';
ctx.chatCompleteTextAction(finalText || '');
completeActiveTextAction(finalText || '');
ctx.$forceUpdate();
ctx.streamingMessage = false;
logStreamingDebug('finalizeStreamingText:complete', snapshotStreamingState());
streamingState.activeMessageIndex = null;
streamingState.activeTextAction = null;
return true;
};
const scheduleFinalizationAfterDrain = () => {
if (streamingState.buffer.length) {
logStreamingDebug('scheduleFinalizationAfterDrain:buffer-not-empty', snapshotStreamingState());
return;
}
if (streamingState.completionTimer !== null) {
logStreamingDebug('scheduleFinalizationAfterDrain:already-scheduled', snapshotStreamingState());
return;
}
logStreamingDebug('scheduleFinalizationAfterDrain:scheduled', snapshotStreamingState());
streamingState.completionTimer = window.setTimeout(() => {
streamingState.completionTimer = null;
logStreamingDebug('scheduleFinalizationAfterDrain:timer-fired', snapshotStreamingState());
finalizeStreamingText({ allowIncomplete: true });
}, STREAMING_FINALIZE_DELAY);
};
const resetStreamingBuffer = () => {
stopStreamingTimer();
stopCompletionTimer();
streamingState.buffer.length = 0;
streamingState.apiCompleted = false;
streamingState.pendingCompleteContent = '';
streamingState.renderedText = '';
streamingState.activeMessageIndex = null;
streamingState.activeTextAction = null;
logStreamingDebug('resetStreamingBuffer', snapshotStreamingState());
};
const applyTextChunk = (text: string) => {
if (!text) {
return null;
}
ensureActiveMessageBinding();
let action = ctx.chatAppendTextChunk(text);
if (action) {
ctx.$forceUpdate();
ctx.conditionalScrollToBottom();
renderLatexInRealtime();
} else {
action = fallbackAppendToActiveMessage(text);
}
streamingState.renderedText += text;
const appended = !!action;
logStreamingDebug('applyTextChunk', {
chunkLength: text.length,
appended,
snapshot: snapshotStreamingState()
});
if (!appended) {
logStreamingDebug('applyTextChunk:missing-target', {
chunkLength: text.length,
currentMessageIndex:
typeof ctx?.currentMessageIndex === 'number' ? ctx.currentMessageIndex : null,
messagesLength: Array.isArray(ctx?.messages) ? ctx.messages.length : null
});
}
return action;
};
const scheduleStreamingFlush = () => {
if (streamingState.timer !== null) {
logStreamingDebug('scheduleStreamingFlush:timer-exists', snapshotStreamingState());
return;
}
if (!streamingState.buffer.length) {
logStreamingDebug('scheduleStreamingFlush:no-buffer', snapshotStreamingState());
return;
}
logStreamingDebug('scheduleStreamingFlush:start', snapshotStreamingState());
const process = () => {
streamingState.timer = null;
logStreamingDebug('scheduleStreamingFlush:tick', snapshotStreamingState());
if (!streamingState.buffer.length) {
logStreamingDebug('scheduleStreamingFlush:buffer-empty', snapshotStreamingState());
return;
}
const piece = streamingState.buffer.shift();
if (piece) {
applyTextChunk(piece);
}
if (streamingState.buffer.length) {
scheduleStreamingFlush();
} else {
logStreamingDebug('scheduleStreamingFlush:buffer-drained', snapshotStreamingState());
scheduleFinalizationAfterDrain();
}
};
streamingState.timer = window.setTimeout(process, STREAMING_CHAR_DELAY);
};
const enqueueStreamingContent = (text: string) => {
if (!text) {
return;
}
stopCompletionTimer();
logStreamingDebug('enqueueStreamingContent', { incomingLength: text.length });
for (const ch of Array.from(text)) {
streamingState.buffer.push(ch);
}
logStreamingDebug('enqueueStreamingContent:buffered', snapshotStreamingState());
scheduleStreamingFlush();
};
const scheduleHistoryReload = (delay = 0) => { const scheduleHistoryReload = (delay = 0) => {
if (!ctx || typeof ctx.fetchAndDisplayHistory !== 'function') { if (!ctx || typeof ctx.fetchAndDisplayHistory !== 'function') {
return; return;
@ -256,8 +590,14 @@ export async function initializeLegacySocket(ctx: any) {
// AI消息开始 // AI消息开始
ctx.socket.on('ai_message_start', () => { ctx.socket.on('ai_message_start', () => {
console.log('AI消息开始'); console.log('AI消息开始');
logStreamingDebug('socket:ai_message_start');
finalizeStreamingText({ force: true });
resetStreamingBuffer();
ctx.cleanupStaleToolActions(); ctx.cleanupStaleToolActions();
ctx.chatStartAssistantMessage(); ctx.chatStartAssistantMessage();
streamingState.activeMessageIndex =
typeof ctx.currentMessageIndex === 'number' ? ctx.currentMessageIndex : null;
streamingState.activeTextAction = null;
ctx.stopRequested = false; ctx.stopRequested = false;
ctx.chatEnableAutoScroll(); ctx.chatEnableAutoScroll();
ctx.scrollToBottom(); ctx.scrollToBottom();
@ -310,25 +650,55 @@ export async function initializeLegacySocket(ctx: any) {
// 文本流开始 // 文本流开始
ctx.socket.on('text_start', () => { ctx.socket.on('text_start', () => {
console.log('文本开始'); console.log('文本开始');
ctx.chatStartTextAction(); logStreamingDebug('socket:text_start');
finalizeStreamingText({ force: true });
resetStreamingBuffer();
const action = ctx.chatStartTextAction();
streamingState.activeMessageIndex =
typeof ctx.currentMessageIndex === 'number' ? ctx.currentMessageIndex : null;
streamingState.activeTextAction = action || ensureActiveTextAction();
ensureActiveMessageBinding();
ctx.$forceUpdate(); ctx.$forceUpdate();
}); });
// 文本内容块 // 文本内容块
ctx.socket.on('text_chunk', (data) => { ctx.socket.on('text_chunk', (data) => {
const action = ctx.chatAppendTextChunk(data.content); logStreamingDebug('socket:text_chunk', {
if (action) { index: data?.index ?? null,
ctx.$forceUpdate(); elapsed: data?.elapsed ?? null,
ctx.conditionalScrollToBottom(); chunkLength: (data?.content || '').length,
renderLatexInRealtime(); snapshot: snapshotStreamingState()
});
try {
ctx.socket.emit('client_chunk_log', {
conversation_id: ctx.currentConversationId,
index: data?.index ?? null,
elapsed: data?.elapsed ?? null,
length: (data?.content || '').length,
ts: Date.now()
});
} catch (error) {
console.warn('上报chunk日志失败:', error);
}
if (data && typeof data.content === 'string' && data.content.length) {
enqueueStreamingContent(data.content);
} }
}); });
// 文本结束 // 文本结束
ctx.socket.on('text_end', (data) => { ctx.socket.on('text_end', (data) => {
console.log('文本结束'); console.log('文本结束');
ctx.chatCompleteTextAction(data.full_content); logStreamingDebug('socket:text_end', {
ctx.$forceUpdate(); finalLength: (data?.full_content || '').length,
snapshot: snapshotStreamingState()
});
streamingState.apiCompleted = true;
streamingState.pendingCompleteContent = data?.full_content || '';
if (!streamingState.buffer.length) {
scheduleFinalizationAfterDrain();
} else {
scheduleStreamingFlush();
}
}); });
// 工具提示事件(可选) // 工具提示事件(可选)