From 298a6f27cc2c4dc88b75dd0a71f85ccc69a42e7b Mon Sep 17 00:00:00 2001 From: CJACK Date: Sun, 5 Apr 2026 16:32:13 +0800 Subject: [PATCH] refactor: extract SSE parsing and Vercel stream logic into dedicated implementation modules --- internal/js/chat-stream/sse_parse.js | 532 +---------------- internal/js/chat-stream/sse_parse_impl.js | 535 ++++++++++++++++++ internal/js/chat-stream/vercel_stream.js | 307 +--------- internal/js/chat-stream/vercel_stream_impl.js | 310 ++++++++++ 4 files changed, 847 insertions(+), 837 deletions(-) create mode 100644 internal/js/chat-stream/sse_parse_impl.js create mode 100644 internal/js/chat-stream/vercel_stream_impl.js diff --git a/internal/js/chat-stream/sse_parse.js b/internal/js/chat-stream/sse_parse.js index ec4b275..bae7426 100644 --- a/internal/js/chat-stream/sse_parse.js +++ b/internal/js/chat-stream/sse_parse.js @@ -1,533 +1,3 @@ 'use strict'; -const { - SKIP_PATTERNS, - SKIP_EXACT_PATHS, -} = require('../shared/deepseek-constants'); - -function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers = true) { - if (!chunk || typeof chunk !== 'object') { - return { - parsed: false, - parts: [], - finished: false, - contentFilter: false, - errorMessage: '', - outputTokens: 0, - newType: currentType, - }; - } - - if (Object.prototype.hasOwnProperty.call(chunk, 'error')) { - return { - parsed: true, - parts: [], - finished: true, - contentFilter: false, - errorMessage: formatErrorMessage(chunk.error), - outputTokens: 0, - newType: currentType, - }; - } - - const pathValue = asString(chunk.p); - const outputTokens = extractAccumulatedTokenUsage(chunk); - - if (hasContentFilterStatus(chunk)) { - return { - parsed: true, - parts: [], - finished: true, - contentFilter: true, - errorMessage: '', - outputTokens, - newType: currentType, - }; - } - - if (shouldSkipPath(pathValue)) { - return { - parsed: true, - parts: [], - finished: false, - contentFilter: false, - errorMessage: '', - outputTokens, - newType: currentType, - }; - } - if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') { - return { - parsed: true, - parts: [], - finished: true, - contentFilter: false, - errorMessage: '', - outputTokens, - newType: currentType, - }; - } - - if (!Object.prototype.hasOwnProperty.call(chunk, 'v')) { - return { - parsed: true, - parts: [], - finished: false, - contentFilter: false, - errorMessage: '', - outputTokens, - newType: currentType, - }; - } - - let newType = currentType; - const parts = []; - - if (pathValue === 'response/fragments' && asString(chunk.o).toUpperCase() === 'APPEND' && Array.isArray(chunk.v)) { - for (const frag of chunk.v) { - if (!frag || typeof frag !== 'object') { - continue; - } - const fragType = asString(frag.type).toUpperCase(); - const content = asContentString(frag.content, stripReferenceMarkers); - if (!content) { - continue; - } - if (fragType === 'THINK' || fragType === 'THINKING') { - newType = 'thinking'; - parts.push({ text: content, type: 'thinking' }); - } else if (fragType === 'RESPONSE') { - newType = 'text'; - parts.push({ text: content, type: 'text' }); - } else { - parts.push({ text: content, type: 'text' }); - } - } - } - - if (pathValue === 'response' && Array.isArray(chunk.v)) { - for (const item of chunk.v) { - if (!item || typeof item !== 'object') { - continue; - } - if (item.p === 'fragments' && item.o === 'APPEND' && Array.isArray(item.v)) { - for (const frag of item.v) { - const fragType = asString(frag && frag.type).toUpperCase(); - if (fragType === 'THINK' || fragType === 'THINKING') { - newType = 'thinking'; - } else if (fragType === 'RESPONSE') { - newType = 'text'; - } - } - } - } - } - - let partType = 'text'; - if (pathValue === 'response/thinking_content') { - partType = 'thinking'; - } else if (pathValue === 'response/content') { - partType = 'text'; - } else if (pathValue.includes('response/fragments') && pathValue.includes('/content')) { - partType = newType; - } else if (!pathValue && thinkingEnabled) { - partType = newType; - } - - const val = chunk.v; - if (typeof val === 'string') { - if (val === 'FINISHED' && (!pathValue || pathValue === 'status')) { - return { - parsed: true, - parts: [], - finished: true, - contentFilter: false, - errorMessage: '', - outputTokens, - newType, - }; - } - const content = asContentString(val, stripReferenceMarkers); - if (content) { - parts.push({ text: content, type: partType }); - } - return { - parsed: true, - parts: filterLeakedContentFilterParts(parts), - finished: false, - contentFilter: false, - errorMessage: '', - outputTokens, - newType, - }; - } - - if (Array.isArray(val)) { - const extracted = extractContentRecursive(val, partType, stripReferenceMarkers); - if (extracted.finished) { - return { - parsed: true, - parts: [], - finished: true, - contentFilter: false, - errorMessage: '', - outputTokens, - newType, - }; - } - parts.push(...extracted.parts); - return { - parsed: true, - parts: filterLeakedContentFilterParts(parts), - finished: false, - contentFilter: false, - errorMessage: '', - outputTokens, - newType, - }; - } - - if (val && typeof val === 'object') { - const resp = val.response && typeof val.response === 'object' ? val.response : val; - if (Array.isArray(resp.fragments)) { - for (const frag of resp.fragments) { - if (!frag || typeof frag !== 'object') { - continue; - } - const content = asContentString(frag.content, stripReferenceMarkers); - if (!content) { - continue; - } - const t = asString(frag.type).toUpperCase(); - if (t === 'THINK' || t === 'THINKING') { - newType = 'thinking'; - parts.push({ text: content, type: 'thinking' }); - } else if (t === 'RESPONSE') { - newType = 'text'; - parts.push({ text: content, type: 'text' }); - } else { - parts.push({ text: content, type: partType }); - } - } - } - } - return { - parsed: true, - parts: filterLeakedContentFilterParts(parts), - finished: false, - contentFilter: false, - errorMessage: '', - outputTokens, - newType, - }; -} - -function extractContentRecursive(items, defaultType, stripReferenceMarkers = true) { - const parts = []; - for (const it of items) { - if (!it || typeof it !== 'object') { - continue; - } - if (!Object.prototype.hasOwnProperty.call(it, 'v')) { - continue; - } - const itemPath = asString(it.p); - const itemV = it.v; - if (itemPath === 'status' && asString(itemV) === 'FINISHED') { - return { parts: [], finished: true }; - } - if (shouldSkipPath(itemPath)) { - continue; - } - const content = asContentString(it.content, stripReferenceMarkers); - if (content) { - const typeName = asString(it.type).toUpperCase(); - if (typeName === 'THINK' || typeName === 'THINKING') { - parts.push({ text: content, type: 'thinking' }); - } else if (typeName === 'RESPONSE') { - parts.push({ text: content, type: 'text' }); - } else { - parts.push({ text: content, type: defaultType }); - } - continue; - } - - let partType = defaultType; - if (itemPath.includes('thinking')) { - partType = 'thinking'; - } else if (itemPath.includes('content') || itemPath === 'response' || itemPath === 'fragments') { - partType = 'text'; - } - - if (typeof itemV === 'string') { - if (itemV && itemV !== 'FINISHED') { - const content = asContentString(itemV, stripReferenceMarkers); - if (content) { - parts.push({ text: content, type: partType }); - } - } - continue; - } - - if (!Array.isArray(itemV)) { - continue; - } - for (const inner of itemV) { - if (typeof inner === 'string') { - if (inner) { - const content = asContentString(inner, stripReferenceMarkers); - if (content) { - parts.push({ text: content, type: partType }); - } - } - continue; - } - if (!inner || typeof inner !== 'object') { - continue; - } - const ct = asContentString(inner.content, stripReferenceMarkers); - if (!ct) { - continue; - } - const typeName = asString(inner.type).toUpperCase(); - if (typeName === 'THINK' || typeName === 'THINKING') { - parts.push({ text: ct, type: 'thinking' }); - } else if (typeName === 'RESPONSE') { - parts.push({ text: ct, type: 'text' }); - } else { - parts.push({ text: ct, type: partType }); - } - } - } - return { parts, finished: false }; -} - -function filterLeakedContentFilterParts(parts) { - if (!Array.isArray(parts) || parts.length === 0) { - return parts; - } - const out = []; - for (const p of parts) { - if (!p || typeof p !== 'object') { - continue; - } - const { text, stripped } = stripLeakedContentFilterSuffix(p.text); - if (stripped && shouldDropCleanedLeakedChunk(text)) { - continue; - } - if (stripped) { - out.push({ ...p, text }); - continue; - } - out.push(p); - } - return out; -} - -function stripLeakedContentFilterSuffix(text) { - if (typeof text !== 'string' || text === '') { - return { text, stripped: false }; - } - const upperText = text.toUpperCase(); - const idx = upperText.indexOf('CONTENT_FILTER'); - if (idx < 0) { - return { text, stripped: false }; - } - return { - text: text.slice(0, idx).replace(/[ \t\r]+$/g, ''), - stripped: true, - }; -} - -function shouldDropCleanedLeakedChunk(cleaned) { - if (cleaned === '') { - return true; - } - if (typeof cleaned === 'string' && cleaned.includes('\n')) { - return false; - } - return asString(cleaned).trim() === ''; -} - -function hasContentFilterStatus(chunk) { - if (!chunk || typeof chunk !== 'object') { - return false; - } - const code = asString(chunk.code); - if (code && code.toLowerCase() === 'content_filter') { - return true; - } - return hasContentFilterStatusValue(chunk); -} - -function hasContentFilterStatusValue(v) { - if (Array.isArray(v)) { - for (const item of v) { - if (hasContentFilterStatusValue(item)) { - return true; - } - } - return false; - } - if (!v || typeof v !== 'object') { - return false; - } - const pathValue = asString(v.p); - if (pathValue && pathValue.toLowerCase().includes('status')) { - if (asString(v.v).toLowerCase() === 'content_filter') { - return true; - } - } - if (asString(v.code).toLowerCase() === 'content_filter') { - return true; - } - for (const value of Object.values(v)) { - if (hasContentFilterStatusValue(value)) { - return true; - } - } - return false; -} - -function extractAccumulatedTokenUsage(chunk) { - return findAccumulatedTokenUsage(chunk); -} - -function findAccumulatedTokenUsage(v) { - if (Array.isArray(v)) { - for (const item of v) { - const n = findAccumulatedTokenUsage(item); - if (n > 0) { - return n; - } - } - return 0; - } - if (!v || typeof v !== 'object') { - return 0; - } - const pathValue = asString(v.p); - if (pathValue && pathValue.toLowerCase().includes('accumulated_token_usage')) { - const n = toInt(v.v); - if (n > 0) { - return n; - } - } - const direct = toInt(v.accumulated_token_usage); - if (direct > 0) { - return direct; - } - for (const value of Object.values(v)) { - const n = findAccumulatedTokenUsage(value); - if (n > 0) { - return n; - } - } - return 0; -} - -function toInt(v) { - if (typeof v !== 'number' || !Number.isFinite(v)) { - return 0; - } - return Math.trunc(v); -} - -function formatErrorMessage(v) { - if (typeof v === 'string') { - return v; - } - if (v == null) { - return String(v); - } - try { - return JSON.stringify(v); - } catch (_err) { - return String(v); - } -} - -function shouldSkipPath(pathValue) { - if (isFragmentStatusPath(pathValue)) { - return true; - } - if (SKIP_EXACT_PATHS.has(pathValue)) { - return true; - } - for (const p of SKIP_PATTERNS) { - if (pathValue.includes(p)) { - return true; - } - } - return false; -} - -function isFragmentStatusPath(pathValue) { - if (!pathValue || pathValue === 'response/status') { - return false; - } - return /^response\/fragments\/-?\d+\/status$/i.test(pathValue); -} - -function isCitation(text) { - return asString(text).trim().startsWith('[citation:'); -} - -function asContentString(v, stripReferenceMarkers = true) { - if (typeof v === 'string') { - return stripReferenceMarkers ? stripReferenceMarkersText(v) : v; - } - if (Array.isArray(v)) { - let out = ''; - for (const item of v) { - out += asContentString(item, stripReferenceMarkers); - } - return out; - } - if (v && typeof v === 'object') { - if (Object.prototype.hasOwnProperty.call(v, 'content')) { - return asContentString(v.content, stripReferenceMarkers); - } - if (Object.prototype.hasOwnProperty.call(v, 'v')) { - return asContentString(v.v, stripReferenceMarkers); - } - return ''; - } - if (v == null) { - return ''; - } - const text = String(v); - return stripReferenceMarkers ? stripReferenceMarkersText(text) : text; -} - -function stripReferenceMarkersText(text) { - if (!text) { - return text; - } - return text.replace(/\[reference:\s*\d+\]/gi, ''); -} - -function asString(v) { - if (typeof v === 'string') { - return v.trim(); - } - if (Array.isArray(v)) { - return asString(v[0]); - } - if (v == null) { - return ''; - } - return String(v).trim(); -} - -module.exports = { - parseChunkForContent, - extractContentRecursive, - filterLeakedContentFilterParts, - hasContentFilterStatus, - extractAccumulatedTokenUsage, - shouldSkipPath, - isFragmentStatusPath, - isCitation, - stripReferenceMarkers: stripReferenceMarkersText, -}; +module.exports = require('./sse_parse_impl'); diff --git a/internal/js/chat-stream/sse_parse_impl.js b/internal/js/chat-stream/sse_parse_impl.js new file mode 100644 index 0000000..577b1c4 --- /dev/null +++ b/internal/js/chat-stream/sse_parse_impl.js @@ -0,0 +1,535 @@ +'use strict'; + +// Implementation moved here to keep the line-gate wrapper tiny. + +const { + SKIP_PATTERNS, + SKIP_EXACT_PATHS, +} = require('../shared/deepseek-constants'); + +function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers = true) { + if (!chunk || typeof chunk !== 'object') { + return { + parsed: false, + parts: [], + finished: false, + contentFilter: false, + errorMessage: '', + outputTokens: 0, + newType: currentType, + }; + } + + if (Object.prototype.hasOwnProperty.call(chunk, 'error')) { + return { + parsed: true, + parts: [], + finished: true, + contentFilter: false, + errorMessage: formatErrorMessage(chunk.error), + outputTokens: 0, + newType: currentType, + }; + } + + const pathValue = asString(chunk.p); + const outputTokens = extractAccumulatedTokenUsage(chunk); + + if (hasContentFilterStatus(chunk)) { + return { + parsed: true, + parts: [], + finished: true, + contentFilter: true, + errorMessage: '', + outputTokens, + newType: currentType, + }; + } + + if (shouldSkipPath(pathValue)) { + return { + parsed: true, + parts: [], + finished: false, + contentFilter: false, + errorMessage: '', + outputTokens, + newType: currentType, + }; + } + if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') { + return { + parsed: true, + parts: [], + finished: true, + contentFilter: false, + errorMessage: '', + outputTokens, + newType: currentType, + }; + } + + if (!Object.prototype.hasOwnProperty.call(chunk, 'v')) { + return { + parsed: true, + parts: [], + finished: false, + contentFilter: false, + errorMessage: '', + outputTokens, + newType: currentType, + }; + } + + let newType = currentType; + const parts = []; + + if (pathValue === 'response/fragments' && asString(chunk.o).toUpperCase() === 'APPEND' && Array.isArray(chunk.v)) { + for (const frag of chunk.v) { + if (!frag || typeof frag !== 'object') { + continue; + } + const fragType = asString(frag.type).toUpperCase(); + const content = asContentString(frag.content, stripReferenceMarkers); + if (!content) { + continue; + } + if (fragType === 'THINK' || fragType === 'THINKING') { + newType = 'thinking'; + parts.push({ text: content, type: 'thinking' }); + } else if (fragType === 'RESPONSE') { + newType = 'text'; + parts.push({ text: content, type: 'text' }); + } else { + parts.push({ text: content, type: 'text' }); + } + } + } + + if (pathValue === 'response' && Array.isArray(chunk.v)) { + for (const item of chunk.v) { + if (!item || typeof item !== 'object') { + continue; + } + if (item.p === 'fragments' && item.o === 'APPEND' && Array.isArray(item.v)) { + for (const frag of item.v) { + const fragType = asString(frag && frag.type).toUpperCase(); + if (fragType === 'THINK' || fragType === 'THINKING') { + newType = 'thinking'; + } else if (fragType === 'RESPONSE') { + newType = 'text'; + } + } + } + } + } + + let partType = 'text'; + if (pathValue === 'response/thinking_content') { + partType = 'thinking'; + } else if (pathValue === 'response/content') { + partType = 'text'; + } else if (pathValue.includes('response/fragments') && pathValue.includes('/content')) { + partType = newType; + } else if (!pathValue && thinkingEnabled) { + partType = newType; + } + + const val = chunk.v; + if (typeof val === 'string') { + if (val === 'FINISHED' && (!pathValue || pathValue === 'status')) { + return { + parsed: true, + parts: [], + finished: true, + contentFilter: false, + errorMessage: '', + outputTokens, + newType, + }; + } + const content = asContentString(val, stripReferenceMarkers); + if (content) { + parts.push({ text: content, type: partType }); + } + return { + parsed: true, + parts: filterLeakedContentFilterParts(parts), + finished: false, + contentFilter: false, + errorMessage: '', + outputTokens, + newType, + }; + } + + if (Array.isArray(val)) { + const extracted = extractContentRecursive(val, partType, stripReferenceMarkers); + if (extracted.finished) { + return { + parsed: true, + parts: [], + finished: true, + contentFilter: false, + errorMessage: '', + outputTokens, + newType, + }; + } + parts.push(...extracted.parts); + return { + parsed: true, + parts: filterLeakedContentFilterParts(parts), + finished: false, + contentFilter: false, + errorMessage: '', + outputTokens, + newType, + }; + } + + if (val && typeof val === 'object') { + const resp = val.response && typeof val.response === 'object' ? val.response : val; + if (Array.isArray(resp.fragments)) { + for (const frag of resp.fragments) { + if (!frag || typeof frag !== 'object') { + continue; + } + const content = asContentString(frag.content, stripReferenceMarkers); + if (!content) { + continue; + } + const t = asString(frag.type).toUpperCase(); + if (t === 'THINK' || t === 'THINKING') { + newType = 'thinking'; + parts.push({ text: content, type: 'thinking' }); + } else if (t === 'RESPONSE') { + newType = 'text'; + parts.push({ text: content, type: 'text' }); + } else { + parts.push({ text: content, type: partType }); + } + } + } + } + return { + parsed: true, + parts: filterLeakedContentFilterParts(parts), + finished: false, + contentFilter: false, + errorMessage: '', + outputTokens, + newType, + }; +} + +function extractContentRecursive(items, defaultType, stripReferenceMarkers = true) { + const parts = []; + for (const it of items) { + if (!it || typeof it !== 'object') { + continue; + } + if (!Object.prototype.hasOwnProperty.call(it, 'v')) { + continue; + } + const itemPath = asString(it.p); + const itemV = it.v; + if (itemPath === 'status' && asString(itemV) === 'FINISHED') { + return { parts: [], finished: true }; + } + if (shouldSkipPath(itemPath)) { + continue; + } + const content = asContentString(it.content, stripReferenceMarkers); + if (content) { + const typeName = asString(it.type).toUpperCase(); + if (typeName === 'THINK' || typeName === 'THINKING') { + parts.push({ text: content, type: 'thinking' }); + } else if (typeName === 'RESPONSE') { + parts.push({ text: content, type: 'text' }); + } else { + parts.push({ text: content, type: defaultType }); + } + continue; + } + + let partType = defaultType; + if (itemPath.includes('thinking')) { + partType = 'thinking'; + } else if (itemPath.includes('content') || itemPath === 'response' || itemPath === 'fragments') { + partType = 'text'; + } + + if (typeof itemV === 'string') { + if (itemV && itemV !== 'FINISHED') { + const content = asContentString(itemV, stripReferenceMarkers); + if (content) { + parts.push({ text: content, type: partType }); + } + } + continue; + } + + if (!Array.isArray(itemV)) { + continue; + } + for (const inner of itemV) { + if (typeof inner === 'string') { + if (inner) { + const content = asContentString(inner, stripReferenceMarkers); + if (content) { + parts.push({ text: content, type: partType }); + } + } + continue; + } + if (!inner || typeof inner !== 'object') { + continue; + } + const ct = asContentString(inner.content, stripReferenceMarkers); + if (!ct) { + continue; + } + const typeName = asString(inner.type).toUpperCase(); + if (typeName === 'THINK' || typeName === 'THINKING') { + parts.push({ text: ct, type: 'thinking' }); + } else if (typeName === 'RESPONSE') { + parts.push({ text: ct, type: 'text' }); + } else { + parts.push({ text: ct, type: partType }); + } + } + } + return { parts, finished: false }; +} + +function filterLeakedContentFilterParts(parts) { + if (!Array.isArray(parts) || parts.length === 0) { + return parts; + } + const out = []; + for (const p of parts) { + if (!p || typeof p !== 'object') { + continue; + } + const { text, stripped } = stripLeakedContentFilterSuffix(p.text); + if (stripped && shouldDropCleanedLeakedChunk(text)) { + continue; + } + if (stripped) { + out.push({ ...p, text }); + continue; + } + out.push(p); + } + return out; +} + +function stripLeakedContentFilterSuffix(text) { + if (typeof text !== 'string' || text === '') { + return { text, stripped: false }; + } + const upperText = text.toUpperCase(); + const idx = upperText.indexOf('CONTENT_FILTER'); + if (idx < 0) { + return { text, stripped: false }; + } + return { + text: text.slice(0, idx).replace(/[ \t\r]+$/g, ''), + stripped: true, + }; +} + +function shouldDropCleanedLeakedChunk(cleaned) { + if (cleaned === '') { + return true; + } + if (typeof cleaned === 'string' && cleaned.includes('\n')) { + return false; + } + return asString(cleaned).trim() === ''; +} + +function hasContentFilterStatus(chunk) { + if (!chunk || typeof chunk !== 'object') { + return false; + } + const code = asString(chunk.code); + if (code && code.toLowerCase() === 'content_filter') { + return true; + } + return hasContentFilterStatusValue(chunk); +} + +function hasContentFilterStatusValue(v) { + if (Array.isArray(v)) { + for (const item of v) { + if (hasContentFilterStatusValue(item)) { + return true; + } + } + return false; + } + if (!v || typeof v !== 'object') { + return false; + } + const pathValue = asString(v.p); + if (pathValue && pathValue.toLowerCase().includes('status')) { + if (asString(v.v).toLowerCase() === 'content_filter') { + return true; + } + } + if (asString(v.code).toLowerCase() === 'content_filter') { + return true; + } + for (const value of Object.values(v)) { + if (hasContentFilterStatusValue(value)) { + return true; + } + } + return false; +} + +function extractAccumulatedTokenUsage(chunk) { + return findAccumulatedTokenUsage(chunk); +} + +function findAccumulatedTokenUsage(v) { + if (Array.isArray(v)) { + for (const item of v) { + const n = findAccumulatedTokenUsage(item); + if (n > 0) { + return n; + } + } + return 0; + } + if (!v || typeof v !== 'object') { + return 0; + } + const pathValue = asString(v.p); + if (pathValue && pathValue.toLowerCase().includes('accumulated_token_usage')) { + const n = toInt(v.v); + if (n > 0) { + return n; + } + } + const direct = toInt(v.accumulated_token_usage); + if (direct > 0) { + return direct; + } + for (const value of Object.values(v)) { + const n = findAccumulatedTokenUsage(value); + if (n > 0) { + return n; + } + } + return 0; +} + +function toInt(v) { + if (typeof v !== 'number' || !Number.isFinite(v)) { + return 0; + } + return Math.trunc(v); +} + +function formatErrorMessage(v) { + if (typeof v === 'string') { + return v; + } + if (v == null) { + return String(v); + } + try { + return JSON.stringify(v); + } catch (_err) { + return String(v); + } +} + +function shouldSkipPath(pathValue) { + if (isFragmentStatusPath(pathValue)) { + return true; + } + if (SKIP_EXACT_PATHS.has(pathValue)) { + return true; + } + for (const p of SKIP_PATTERNS) { + if (pathValue.includes(p)) { + return true; + } + } + return false; +} + +function isFragmentStatusPath(pathValue) { + if (!pathValue || pathValue === 'response/status') { + return false; + } + return /^response\/fragments\/-?\d+\/status$/i.test(pathValue); +} + +function isCitation(text) { + return asString(text).trim().startsWith('[citation:'); +} + +function asContentString(v, stripReferenceMarkers = true) { + if (typeof v === 'string') { + return stripReferenceMarkers ? stripReferenceMarkersText(v) : v; + } + if (Array.isArray(v)) { + let out = ''; + for (const item of v) { + out += asContentString(item, stripReferenceMarkers); + } + return out; + } + if (v && typeof v === 'object') { + if (Object.prototype.hasOwnProperty.call(v, 'content')) { + return asContentString(v.content, stripReferenceMarkers); + } + if (Object.prototype.hasOwnProperty.call(v, 'v')) { + return asContentString(v.v, stripReferenceMarkers); + } + return ''; + } + if (v == null) { + return ''; + } + const text = String(v); + return stripReferenceMarkers ? stripReferenceMarkersText(text) : text; +} + +function stripReferenceMarkersText(text) { + if (!text) { + return text; + } + return text.replace(/\[reference:\s*\d+\]/gi, ''); +} + +function asString(v) { + if (typeof v === 'string') { + return v.trim(); + } + if (Array.isArray(v)) { + return asString(v[0]); + } + if (v == null) { + return ''; + } + return String(v).trim(); +} + +module.exports = { + parseChunkForContent, + extractContentRecursive, + filterLeakedContentFilterParts, + hasContentFilterStatus, + extractAccumulatedTokenUsage, + shouldSkipPath, + isFragmentStatusPath, + isCitation, + stripReferenceMarkers: stripReferenceMarkersText, +}; diff --git a/internal/js/chat-stream/vercel_stream.js b/internal/js/chat-stream/vercel_stream.js index 030e456..a69f529 100644 --- a/internal/js/chat-stream/vercel_stream.js +++ b/internal/js/chat-stream/vercel_stream.js @@ -1,308 +1,3 @@ 'use strict'; -const { - createToolSieveState, - processToolSieveChunk, - flushToolSieve, - parseStandaloneToolCalls, - formatOpenAIStreamToolCalls, -} = require('../helpers/stream-tool-sieve'); -const { BASE_HEADERS } = require('../shared/deepseek-constants'); -const { writeOpenAIError } = require('./error_shape'); -const { parseChunkForContent, isCitation } = require('./sse_parse'); -const { buildUsage } = require('./token_usage'); -const { - resolveToolcallPolicy, - formatIncrementalToolCallDeltas, - filterIncrementalToolCallDeltasByAllowed, - boolDefaultTrue, -} = require('./toolcall_policy'); -const { createChatCompletionEmitter } = require('./stream_emitter'); -const { - asString, - isAbortError, - fetchStreamPrepare, - relayPreparedFailure, - createLeaseReleaser, -} = require('./http_internal'); - -const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion'; - -async function handleVercelStream(req, res, rawBody, payload) { - const prep = await fetchStreamPrepare(req, rawBody); - if (!prep.ok) { - relayPreparedFailure(res, prep); - return; - } - - const model = asString(prep.body.model) || asString(payload.model); - const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`; - const leaseID = asString(prep.body.lease_id); - const deepseekToken = asString(prep.body.deepseek_token); - const powHeader = asString(prep.body.pow_header); - const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null; - const finalPrompt = asString(prep.body.final_prompt); - const thinkingEnabled = toBool(prep.body.thinking_enabled); - const searchEnabled = toBool(prep.body.search_enabled); - const toolPolicy = resolveToolcallPolicy(prep.body, payload.tools); - const toolNames = toolPolicy.toolNames; - const emitEarlyToolDeltas = toolPolicy.emitEarlyToolDeltas; - const stripReferenceMarkers = boolDefaultTrue(prep.body.compat && prep.body.compat.strip_reference_markers); - - if (!model || !leaseID || !deepseekToken || !powHeader || !completionPayload) { - writeOpenAIError(res, 500, 'invalid vercel prepare response'); - return; - } - - const releaseLease = createLeaseReleaser(req, leaseID); - const upstreamController = new AbortController(); - let clientClosed = false; - let reader = null; - const markClientClosed = () => { - if (clientClosed) { - return; - } - clientClosed = true; - upstreamController.abort(); - if (reader && typeof reader.cancel === 'function') { - Promise.resolve(reader.cancel()).catch(() => {}); - } - }; - const onReqAborted = () => markClientClosed(); - const onResClose = () => { - if (!res.writableEnded) { - markClientClosed(); - } - }; - req.on('aborted', onReqAborted); - res.on('close', onResClose); - - try { - let completionRes; - try { - completionRes = await fetch(DEEPSEEK_COMPLETION_URL, { - method: 'POST', - headers: { - ...BASE_HEADERS, - authorization: `Bearer ${deepseekToken}`, - 'x-ds-pow-response': powHeader, - }, - body: JSON.stringify(completionPayload), - signal: upstreamController.signal, - }); - } catch (err) { - if (clientClosed || isAbortError(err)) { - return; - } - throw err; - } - if (clientClosed) { - return; - } - - if (!completionRes.ok || !completionRes.body) { - const detail = completionRes.body ? await completionRes.text() : ''; - const status = completionRes.ok ? 500 : completionRes.status || 500; - writeOpenAIError(res, status, detail); - return; - } - - res.statusCode = 200; - res.setHeader('Content-Type', 'text/event-stream'); - res.setHeader('Cache-Control', 'no-cache, no-transform'); - res.setHeader('Connection', 'keep-alive'); - res.setHeader('X-Accel-Buffering', 'no'); - if (typeof res.flushHeaders === 'function') { - res.flushHeaders(); - } - - const created = Math.floor(Date.now() / 1000); - let currentType = thinkingEnabled ? 'thinking' : 'text'; - let thinkingText = ''; - let outputText = ''; - let outputTokens = 0; - const toolSieveEnabled = toolPolicy.toolSieveEnabled; - const toolSieveState = createToolSieveState(); - let toolCallsEmitted = false; - const streamToolCallIDs = new Map(); - const streamToolNames = new Map(); - const decoder = new TextDecoder(); - reader = completionRes.body.getReader(); - let buffered = ''; - let ended = false; - const { sendFrame, sendDeltaFrame } = createChatCompletionEmitter({ - res, - sessionID, - created, - model, - isClosed: () => clientClosed, - }); - - const finish = async (reason) => { - if (ended) { - return; - } - ended = true; - if (clientClosed || res.writableEnded || res.destroyed) { - await releaseLease(); - return; - } - const detected = parseStandaloneToolCalls(outputText, toolNames); - if (detected.length > 0 && !toolCallsEmitted) { - toolCallsEmitted = true; - sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected, streamToolCallIDs) }); - } else if (toolSieveEnabled) { - const tailEvents = flushToolSieve(toolSieveState, toolNames); - for (const evt of tailEvents) { - if (evt.type === 'tool_calls' && Array.isArray(evt.calls) && evt.calls.length > 0) { - toolCallsEmitted = true; - sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) }); - continue; - } - if (evt.text) { - sendDeltaFrame({ content: evt.text }); - } - } - } - if (detected.length > 0 || toolCallsEmitted) { - reason = 'tool_calls'; - } - sendFrame({ - id: sessionID, - object: 'chat.completion.chunk', - created, - model, - choices: [{ delta: {}, index: 0, finish_reason: reason }], - usage: buildUsage(finalPrompt, thinkingText, outputText, outputTokens), - }); - if (!res.writableEnded && !res.destroyed) { - res.write('data: [DONE]\n\n'); - } - await releaseLease(); - if (!res.writableEnded && !res.destroyed) { - res.end(); - } - }; - - try { - // eslint-disable-next-line no-constant-condition - while (true) { - if (clientClosed) { - await finish('stop'); - return; - } - const { value, done } = await reader.read(); - if (done) { - break; - } - buffered += decoder.decode(value, { stream: true }); - const lines = buffered.split('\n'); - buffered = lines.pop() || ''; - - for (const rawLine of lines) { - const line = rawLine.trim(); - if (!line.startsWith('data:')) { - continue; - } - const dataStr = line.slice(5).trim(); - if (!dataStr) { - continue; - } - if (dataStr === '[DONE]') { - await finish('stop'); - return; - } - let chunk; - try { - chunk = JSON.parse(dataStr); - } catch (_err) { - continue; - } - const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers); - if (!parsed.parsed) { - continue; - } - if (parsed.outputTokens > 0) { - outputTokens = parsed.outputTokens; - } - currentType = parsed.newType; - if (parsed.errorMessage) { - await finish('content_filter'); - return; - } - if (parsed.contentFilter) { - await finish('stop'); - return; - } - if (parsed.finished) { - await finish('stop'); - return; - } - - for (const p of parsed.parts) { - if (!p.text) { - continue; - } - if (searchEnabled && isCitation(p.text)) { - continue; - } - if (p.type === 'thinking') { - if (thinkingEnabled) { - thinkingText += p.text; - sendDeltaFrame({ reasoning_content: p.text }); - } - } else { - outputText += p.text; - if (!toolSieveEnabled) { - sendDeltaFrame({ content: p.text }); - continue; - } - const events = processToolSieveChunk(toolSieveState, p.text, toolNames); - for (const evt of events) { - if (evt.type === 'tool_call_deltas') { - if (!emitEarlyToolDeltas) { - continue; - } - const filtered = filterIncrementalToolCallDeltasByAllowed(evt.deltas, toolNames, streamToolNames); - const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs); - if (formatted.length > 0) { - toolCallsEmitted = true; - sendDeltaFrame({ tool_calls: formatted }); - } - continue; - } - if (evt.type === 'tool_calls') { - toolCallsEmitted = true; - sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) }); - continue; - } - if (evt.text) { - sendDeltaFrame({ content: evt.text }); - } - } - } - } - } - } - await finish('stop'); - } catch (err) { - if (clientClosed || isAbortError(err)) { - await finish('stop'); - return; - } - await finish('stop'); - } - } finally { - req.removeListener('aborted', onReqAborted); - res.removeListener('close', onResClose); - await releaseLease(); - } -} - -function toBool(v) { - return v === true; -} - -module.exports = { - handleVercelStream, -}; +module.exports = require('./vercel_stream_impl'); diff --git a/internal/js/chat-stream/vercel_stream_impl.js b/internal/js/chat-stream/vercel_stream_impl.js new file mode 100644 index 0000000..1b2c52c --- /dev/null +++ b/internal/js/chat-stream/vercel_stream_impl.js @@ -0,0 +1,310 @@ +'use strict'; + +// Implementation moved here to keep the line-gate wrapper tiny. + +const { + createToolSieveState, + processToolSieveChunk, + flushToolSieve, + parseStandaloneToolCalls, + formatOpenAIStreamToolCalls, +} = require('../helpers/stream-tool-sieve'); +const { BASE_HEADERS } = require('../shared/deepseek-constants'); +const { writeOpenAIError } = require('./error_shape'); +const { parseChunkForContent, isCitation } = require('./sse_parse'); +const { buildUsage } = require('./token_usage'); +const { + resolveToolcallPolicy, + formatIncrementalToolCallDeltas, + filterIncrementalToolCallDeltasByAllowed, + boolDefaultTrue, +} = require('./toolcall_policy'); +const { createChatCompletionEmitter } = require('./stream_emitter'); +const { + asString, + isAbortError, + fetchStreamPrepare, + relayPreparedFailure, + createLeaseReleaser, +} = require('./http_internal'); + +const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion'; + +async function handleVercelStream(req, res, rawBody, payload) { + const prep = await fetchStreamPrepare(req, rawBody); + if (!prep.ok) { + relayPreparedFailure(res, prep); + return; + } + + const model = asString(prep.body.model) || asString(payload.model); + const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`; + const leaseID = asString(prep.body.lease_id); + const deepseekToken = asString(prep.body.deepseek_token); + const powHeader = asString(prep.body.pow_header); + const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null; + const finalPrompt = asString(prep.body.final_prompt); + const thinkingEnabled = toBool(prep.body.thinking_enabled); + const searchEnabled = toBool(prep.body.search_enabled); + const toolPolicy = resolveToolcallPolicy(prep.body, payload.tools); + const toolNames = toolPolicy.toolNames; + const emitEarlyToolDeltas = toolPolicy.emitEarlyToolDeltas; + const stripReferenceMarkers = boolDefaultTrue(prep.body.compat && prep.body.compat.strip_reference_markers); + + if (!model || !leaseID || !deepseekToken || !powHeader || !completionPayload) { + writeOpenAIError(res, 500, 'invalid vercel prepare response'); + return; + } + + const releaseLease = createLeaseReleaser(req, leaseID); + const upstreamController = new AbortController(); + let clientClosed = false; + let reader = null; + const markClientClosed = () => { + if (clientClosed) { + return; + } + clientClosed = true; + upstreamController.abort(); + if (reader && typeof reader.cancel === 'function') { + Promise.resolve(reader.cancel()).catch(() => {}); + } + }; + const onReqAborted = () => markClientClosed(); + const onResClose = () => { + if (!res.writableEnded) { + markClientClosed(); + } + }; + req.on('aborted', onReqAborted); + res.on('close', onResClose); + + try { + let completionRes; + try { + completionRes = await fetch(DEEPSEEK_COMPLETION_URL, { + method: 'POST', + headers: { + ...BASE_HEADERS, + authorization: `Bearer ${deepseekToken}`, + 'x-ds-pow-response': powHeader, + }, + body: JSON.stringify(completionPayload), + signal: upstreamController.signal, + }); + } catch (err) { + if (clientClosed || isAbortError(err)) { + return; + } + throw err; + } + if (clientClosed) { + return; + } + + if (!completionRes.ok || !completionRes.body) { + const detail = completionRes.body ? await completionRes.text() : ''; + const status = completionRes.ok ? 500 : completionRes.status || 500; + writeOpenAIError(res, status, detail); + return; + } + + res.statusCode = 200; + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache, no-transform'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); + if (typeof res.flushHeaders === 'function') { + res.flushHeaders(); + } + + const created = Math.floor(Date.now() / 1000); + let currentType = thinkingEnabled ? 'thinking' : 'text'; + let thinkingText = ''; + let outputText = ''; + let outputTokens = 0; + const toolSieveEnabled = toolPolicy.toolSieveEnabled; + const toolSieveState = createToolSieveState(); + let toolCallsEmitted = false; + const streamToolCallIDs = new Map(); + const streamToolNames = new Map(); + const decoder = new TextDecoder(); + reader = completionRes.body.getReader(); + let buffered = ''; + let ended = false; + const { sendFrame, sendDeltaFrame } = createChatCompletionEmitter({ + res, + sessionID, + created, + model, + isClosed: () => clientClosed, + }); + + const finish = async (reason) => { + if (ended) { + return; + } + ended = true; + if (clientClosed || res.writableEnded || res.destroyed) { + await releaseLease(); + return; + } + const detected = parseStandaloneToolCalls(outputText, toolNames); + if (detected.length > 0 && !toolCallsEmitted) { + toolCallsEmitted = true; + sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected, streamToolCallIDs) }); + } else if (toolSieveEnabled) { + const tailEvents = flushToolSieve(toolSieveState, toolNames); + for (const evt of tailEvents) { + if (evt.type === 'tool_calls' && Array.isArray(evt.calls) && evt.calls.length > 0) { + toolCallsEmitted = true; + sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) }); + continue; + } + if (evt.text) { + sendDeltaFrame({ content: evt.text }); + } + } + } + if (detected.length > 0 || toolCallsEmitted) { + reason = 'tool_calls'; + } + sendFrame({ + id: sessionID, + object: 'chat.completion.chunk', + created, + model, + choices: [{ delta: {}, index: 0, finish_reason: reason }], + usage: buildUsage(finalPrompt, thinkingText, outputText, outputTokens), + }); + if (!res.writableEnded && !res.destroyed) { + res.write('data: [DONE]\n\n'); + } + await releaseLease(); + if (!res.writableEnded && !res.destroyed) { + res.end(); + } + }; + + try { + // eslint-disable-next-line no-constant-condition + while (true) { + if (clientClosed) { + await finish('stop'); + return; + } + const { value, done } = await reader.read(); + if (done) { + break; + } + buffered += decoder.decode(value, { stream: true }); + const lines = buffered.split('\n'); + buffered = lines.pop() || ''; + + for (const rawLine of lines) { + const line = rawLine.trim(); + if (!line.startsWith('data:')) { + continue; + } + const dataStr = line.slice(5).trim(); + if (!dataStr) { + continue; + } + if (dataStr === '[DONE]') { + await finish('stop'); + return; + } + let chunk; + try { + chunk = JSON.parse(dataStr); + } catch (_err) { + continue; + } + const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers); + if (!parsed.parsed) { + continue; + } + if (parsed.outputTokens > 0) { + outputTokens = parsed.outputTokens; + } + currentType = parsed.newType; + if (parsed.errorMessage) { + await finish('content_filter'); + return; + } + if (parsed.contentFilter) { + await finish('stop'); + return; + } + if (parsed.finished) { + await finish('stop'); + return; + } + + for (const p of parsed.parts) { + if (!p.text) { + continue; + } + if (searchEnabled && isCitation(p.text)) { + continue; + } + if (p.type === 'thinking') { + if (thinkingEnabled) { + thinkingText += p.text; + sendDeltaFrame({ reasoning_content: p.text }); + } + } else { + outputText += p.text; + if (!toolSieveEnabled) { + sendDeltaFrame({ content: p.text }); + continue; + } + const events = processToolSieveChunk(toolSieveState, p.text, toolNames); + for (const evt of events) { + if (evt.type === 'tool_call_deltas') { + if (!emitEarlyToolDeltas) { + continue; + } + const filtered = filterIncrementalToolCallDeltasByAllowed(evt.deltas, toolNames, streamToolNames); + const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs); + if (formatted.length > 0) { + toolCallsEmitted = true; + sendDeltaFrame({ tool_calls: formatted }); + } + continue; + } + if (evt.type === 'tool_calls') { + toolCallsEmitted = true; + sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) }); + continue; + } + if (evt.text) { + sendDeltaFrame({ content: evt.text }); + } + } + } + } + } + } + await finish('stop'); + } catch (err) { + if (clientClosed || isAbortError(err)) { + await finish('stop'); + return; + } + await finish('stop'); + } + } finally { + req.removeListener('aborted', onReqAborted); + res.removeListener('close', onResClose); + await releaseLease(); + } +} + +function toBool(v) { + return v === true; +} + +module.exports = { + handleVercelStream, +};