diff --git a/static/src/composables/useLegacySocket.ts b/static/src/composables/useLegacySocket.ts index 3d8a33a..995119f 100644 --- a/static/src/composables/useLegacySocket.ts +++ b/static/src/composables/useLegacySocket.ts @@ -6,20 +6,354 @@ export async function initializeLegacySocket(ctx: any) { try { console.log('初始化WebSocket连接...'); - const usePollingOnly = window.location.hostname !== 'localhost' && - window.location.hostname !== '127.0.0.1'; - - const socketOptions = usePollingOnly ? { - transports: ['polling'], - upgrade: false, - autoConnect: false - } : { + const socketOptions = { transports: ['websocket', 'polling'], autoConnect: false }; ctx.socket = createSocketClient('/', socketOptions); + const STREAMING_CHAR_DELAY = 22; + const STREAMING_FINALIZE_DELAY = 1000; + const STREAMING_DEBUG = true; + const STREAMING_DEBUG_HISTORY_LIMIT = 2000; + const streamingState = { + buffer: [] as string[], + timer: null as number | null, + completionTimer: null as number | null, + apiCompleted: false, + pendingCompleteContent: '' as string, + renderedText: '' as string, + activeMessageIndex: null as number | null, + activeTextAction: null as any + }; + + const snapshotStreamingState = () => ({ + bufferLength: streamingState.buffer.length, + timerActive: streamingState.timer !== null, + completionTimerActive: streamingState.completionTimer !== null, + apiCompleted: streamingState.apiCompleted, + pendingLength: (streamingState.pendingCompleteContent || '').length, + renderedLength: streamingState.renderedText.length, + currentMessageIndex: + typeof ctx?.currentMessageIndex === 'number' ? ctx.currentMessageIndex : null, + messagesLength: Array.isArray(ctx?.messages) ? ctx.messages.length : null, + streamingMessage: !!ctx?.streamingMessage + }); + + const streamingDebugHistory: Array = []; + + const getActiveMessage = () => { + if (streamingState.activeMessageIndex === null) { + return null; + } + const messages = ctx?.messages; + if (!Array.isArray(messages)) { + return null; + } + return messages[streamingState.activeMessageIndex] || null; + }; + + const ensureActiveMessageBinding = () => { + if ( + streamingState.activeMessageIndex === null && + typeof ctx?.currentMessageIndex === 'number' && + ctx.currentMessageIndex >= 0 + ) { + streamingState.activeMessageIndex = ctx.currentMessageIndex; + } + if ( + typeof ctx?.currentMessageIndex !== 'number' || + ctx.currentMessageIndex < 0 + ) { + if (streamingState.activeMessageIndex !== null) { + ctx.currentMessageIndex = streamingState.activeMessageIndex; + } + } + if (!ctx.streamingMessage) { + ctx.streamingMessage = true; + } + const msg = getActiveMessage(); + if (!msg && Array.isArray(ctx?.messages) && ctx.messages.length) { + streamingState.activeMessageIndex = ctx.messages.length - 1; + } + return getActiveMessage(); + }; + + const ensureActiveTextAction = () => { + const msg = getActiveMessage(); + if (!msg || !Array.isArray(msg.actions) || !msg.actions.length) { + return null; + } + const known = streamingState.activeTextAction; + if (known && msg.actions.includes(known)) { + return known; + } + for (let i = msg.actions.length - 1; i >= 0; i--) { + const action = msg.actions[i]; + if (action && action.type === 'text') { + streamingState.activeTextAction = action; + return action; + } + } + return null; + }; + + const fallbackAppendToActiveMessage = (text: string) => { + const msg = ensureActiveMessageBinding(); + if (!msg) { + return null; + } + if (typeof msg.streamingText !== 'string') { + msg.streamingText = ''; + } + msg.streamingText += text; + const action = ensureActiveTextAction(); + if (!action) { + return null; + } + if (typeof action.content !== 'string') { + action.content = ''; + } + action.content += text; + action.streaming = action.streaming !== false; + streamingState.activeTextAction = action; + ctx.$forceUpdate(); + ctx.conditionalScrollToBottom(); + renderLatexInRealtime(); + return action; + }; + + const completeActiveTextAction = (fullContent: string) => { + const msg = ensureActiveMessageBinding(); + if (!msg) { + return false; + } + const action = ensureActiveTextAction(); + if (action) { + action.streaming = false; + action.content = fullContent; + } + msg.streamingText = ''; + msg.currentStreamingType = null; + return !!action; + }; + + const logStreamingDebug = (event: string, detail?: any) => { + if (!STREAMING_DEBUG) { + return; + } + const payload = typeof detail === 'undefined' ? snapshotStreamingState() : detail; + const entry = { + event, + detail: payload, + conversationId: ctx.currentConversationId || null, + timestamp: Date.now() + }; + streamingDebugHistory.push(entry); + if (streamingDebugHistory.length > STREAMING_DEBUG_HISTORY_LIMIT) { + streamingDebugHistory.shift(); + } + try { + window.__streamingDebugLogs = streamingDebugHistory.slice(); + } catch (error) { + // ignore + } + console.log('[streaming-debug]', event, payload); + try { + if (ctx.socket && typeof ctx.socket.emit === 'function') { + ctx.socket.emit('client_stream_debug_log', entry); + } + } catch (error) { + console.warn('上报 streaming debug 日志失败:', error); + } + }; + + const stopStreamingTimer = () => { + if (streamingState.timer !== null) { + clearTimeout(streamingState.timer); + streamingState.timer = null; + logStreamingDebug('stopStreamingTimer', snapshotStreamingState()); + } + }; + + const stopCompletionTimer = () => { + if (streamingState.completionTimer !== null) { + clearTimeout(streamingState.completionTimer); + streamingState.completionTimer = null; + logStreamingDebug('stopCompletionTimer', snapshotStreamingState()); + } + }; + + const finalizeStreamingText = (options?: { force?: boolean; allowIncomplete?: boolean }) => { + const forceFlush = !!options?.force; + const allowIncomplete = !!options?.allowIncomplete; + logStreamingDebug('finalizeStreamingText:start', { forceFlush, allowIncomplete, snapshot: snapshotStreamingState() }); + if (!forceFlush) { + if (streamingState.buffer.length) { + logStreamingDebug('finalizeStreamingText:blocked-buffer', snapshotStreamingState()); + return false; + } + if (!allowIncomplete && !streamingState.apiCompleted) { + logStreamingDebug('finalizeStreamingText:blocked-api-incomplete', snapshotStreamingState()); + return false; + } + } + stopStreamingTimer(); + stopCompletionTimer(); + if (forceFlush && streamingState.buffer.length) { + const remainder = streamingState.buffer.join(''); + streamingState.buffer.length = 0; + if (remainder) { + applyTextChunk(remainder); + logStreamingDebug('finalizeStreamingText:force-flush-buffer', { flushedChars: remainder.length }); + } + } + const pendingText = streamingState.pendingCompleteContent || ''; + const renderedText = streamingState.renderedText || ''; + let finalText = pendingText || renderedText || ''; + let remainderToAppend = ''; + if (!pendingText) { + finalText = renderedText; + } else if (!renderedText) { + finalText = pendingText; + remainderToAppend = pendingText; + } else if (pendingText.length < renderedText.length) { + // 后端返回的最终内容比已渲染的还短,避免覆盖已展示的字符 + finalText = renderedText; + } else if (pendingText.startsWith(renderedText)) { + finalText = pendingText; + remainderToAppend = pendingText.slice(renderedText.length); + } else { + finalText = pendingText; + } + logStreamingDebug('finalizeStreamingText:resolved-final-text', { + pendingLength: pendingText.length, + renderedLength: renderedText.length, + remainderToAppendLength: remainderToAppend.length, + finalLength: finalText.length + }); + if (remainderToAppend) { + applyTextChunk(remainderToAppend); + } + streamingState.pendingCompleteContent = ''; + streamingState.apiCompleted = false; + streamingState.renderedText = ''; + ctx.chatCompleteTextAction(finalText || ''); + completeActiveTextAction(finalText || ''); + ctx.$forceUpdate(); + ctx.streamingMessage = false; + logStreamingDebug('finalizeStreamingText:complete', snapshotStreamingState()); + streamingState.activeMessageIndex = null; + streamingState.activeTextAction = null; + return true; + }; + + const scheduleFinalizationAfterDrain = () => { + if (streamingState.buffer.length) { + logStreamingDebug('scheduleFinalizationAfterDrain:buffer-not-empty', snapshotStreamingState()); + return; + } + if (streamingState.completionTimer !== null) { + logStreamingDebug('scheduleFinalizationAfterDrain:already-scheduled', snapshotStreamingState()); + return; + } + logStreamingDebug('scheduleFinalizationAfterDrain:scheduled', snapshotStreamingState()); + streamingState.completionTimer = window.setTimeout(() => { + streamingState.completionTimer = null; + logStreamingDebug('scheduleFinalizationAfterDrain:timer-fired', snapshotStreamingState()); + finalizeStreamingText({ allowIncomplete: true }); + }, STREAMING_FINALIZE_DELAY); + }; + + const resetStreamingBuffer = () => { + stopStreamingTimer(); + stopCompletionTimer(); + streamingState.buffer.length = 0; + streamingState.apiCompleted = false; + streamingState.pendingCompleteContent = ''; + streamingState.renderedText = ''; + streamingState.activeMessageIndex = null; + streamingState.activeTextAction = null; + logStreamingDebug('resetStreamingBuffer', snapshotStreamingState()); + }; + + const applyTextChunk = (text: string) => { + if (!text) { + return null; + } + ensureActiveMessageBinding(); + let action = ctx.chatAppendTextChunk(text); + if (action) { + ctx.$forceUpdate(); + ctx.conditionalScrollToBottom(); + renderLatexInRealtime(); + } else { + action = fallbackAppendToActiveMessage(text); + } + streamingState.renderedText += text; + const appended = !!action; + logStreamingDebug('applyTextChunk', { + chunkLength: text.length, + appended, + snapshot: snapshotStreamingState() + }); + if (!appended) { + logStreamingDebug('applyTextChunk:missing-target', { + chunkLength: text.length, + currentMessageIndex: + typeof ctx?.currentMessageIndex === 'number' ? ctx.currentMessageIndex : null, + messagesLength: Array.isArray(ctx?.messages) ? ctx.messages.length : null + }); + } + return action; + }; + + const scheduleStreamingFlush = () => { + if (streamingState.timer !== null) { + logStreamingDebug('scheduleStreamingFlush:timer-exists', snapshotStreamingState()); + return; + } + if (!streamingState.buffer.length) { + logStreamingDebug('scheduleStreamingFlush:no-buffer', snapshotStreamingState()); + return; + } + logStreamingDebug('scheduleStreamingFlush:start', snapshotStreamingState()); + const process = () => { + streamingState.timer = null; + logStreamingDebug('scheduleStreamingFlush:tick', snapshotStreamingState()); + if (!streamingState.buffer.length) { + logStreamingDebug('scheduleStreamingFlush:buffer-empty', snapshotStreamingState()); + return; + } + const piece = streamingState.buffer.shift(); + if (piece) { + applyTextChunk(piece); + } + if (streamingState.buffer.length) { + scheduleStreamingFlush(); + } else { + logStreamingDebug('scheduleStreamingFlush:buffer-drained', snapshotStreamingState()); + scheduleFinalizationAfterDrain(); + } + }; + streamingState.timer = window.setTimeout(process, STREAMING_CHAR_DELAY); + }; + + const enqueueStreamingContent = (text: string) => { + if (!text) { + return; + } + stopCompletionTimer(); + logStreamingDebug('enqueueStreamingContent', { incomingLength: text.length }); + for (const ch of Array.from(text)) { + streamingState.buffer.push(ch); + } + logStreamingDebug('enqueueStreamingContent:buffered', snapshotStreamingState()); + scheduleStreamingFlush(); + }; + + const scheduleHistoryReload = (delay = 0) => { if (!ctx || typeof ctx.fetchAndDisplayHistory !== 'function') { return; @@ -256,8 +590,14 @@ export async function initializeLegacySocket(ctx: any) { // AI消息开始 ctx.socket.on('ai_message_start', () => { console.log('AI消息开始'); + logStreamingDebug('socket:ai_message_start'); + finalizeStreamingText({ force: true }); + resetStreamingBuffer(); ctx.cleanupStaleToolActions(); ctx.chatStartAssistantMessage(); + streamingState.activeMessageIndex = + typeof ctx.currentMessageIndex === 'number' ? ctx.currentMessageIndex : null; + streamingState.activeTextAction = null; ctx.stopRequested = false; ctx.chatEnableAutoScroll(); ctx.scrollToBottom(); @@ -310,25 +650,55 @@ export async function initializeLegacySocket(ctx: any) { // 文本流开始 ctx.socket.on('text_start', () => { console.log('文本开始'); - ctx.chatStartTextAction(); + logStreamingDebug('socket:text_start'); + finalizeStreamingText({ force: true }); + resetStreamingBuffer(); + const action = ctx.chatStartTextAction(); + streamingState.activeMessageIndex = + typeof ctx.currentMessageIndex === 'number' ? ctx.currentMessageIndex : null; + streamingState.activeTextAction = action || ensureActiveTextAction(); + ensureActiveMessageBinding(); ctx.$forceUpdate(); }); // 文本内容块 ctx.socket.on('text_chunk', (data) => { - const action = ctx.chatAppendTextChunk(data.content); - if (action) { - ctx.$forceUpdate(); - ctx.conditionalScrollToBottom(); - renderLatexInRealtime(); + logStreamingDebug('socket:text_chunk', { + index: data?.index ?? null, + elapsed: data?.elapsed ?? null, + chunkLength: (data?.content || '').length, + snapshot: snapshotStreamingState() + }); + try { + ctx.socket.emit('client_chunk_log', { + conversation_id: ctx.currentConversationId, + index: data?.index ?? null, + elapsed: data?.elapsed ?? null, + length: (data?.content || '').length, + ts: Date.now() + }); + } catch (error) { + console.warn('上报chunk日志失败:', error); + } + if (data && typeof data.content === 'string' && data.content.length) { + enqueueStreamingContent(data.content); } }); // 文本结束 ctx.socket.on('text_end', (data) => { console.log('文本结束'); - ctx.chatCompleteTextAction(data.full_content); - ctx.$forceUpdate(); + logStreamingDebug('socket:text_end', { + finalLength: (data?.full_content || '').length, + snapshot: snapshotStreamingState() + }); + streamingState.apiCompleted = true; + streamingState.pendingCompleteContent = data?.full_content || ''; + if (!streamingState.buffer.length) { + scheduleFinalizationAfterDrain(); + } else { + scheduleStreamingFlush(); + } }); // 工具提示事件(可选)