refactor: extract SSE parsing and Vercel stream logic into dedicated implementation modules

This commit is contained in:
CJACK
2026-04-05 16:32:13 +08:00
parent 1d80f644d4
commit 298a6f27cc
4 changed files with 847 additions and 837 deletions

View File

@@ -1,533 +1,3 @@
'use strict';
const {
SKIP_PATTERNS,
SKIP_EXACT_PATHS,
} = require('../shared/deepseek-constants');
function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers = true) {
if (!chunk || typeof chunk !== 'object') {
return {
parsed: false,
parts: [],
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens: 0,
newType: currentType,
};
}
if (Object.prototype.hasOwnProperty.call(chunk, 'error')) {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: formatErrorMessage(chunk.error),
outputTokens: 0,
newType: currentType,
};
}
const pathValue = asString(chunk.p);
const outputTokens = extractAccumulatedTokenUsage(chunk);
if (hasContentFilterStatus(chunk)) {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: true,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
if (shouldSkipPath(pathValue)) {
return {
parsed: true,
parts: [],
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
if (!Object.prototype.hasOwnProperty.call(chunk, 'v')) {
return {
parsed: true,
parts: [],
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
let newType = currentType;
const parts = [];
if (pathValue === 'response/fragments' && asString(chunk.o).toUpperCase() === 'APPEND' && Array.isArray(chunk.v)) {
for (const frag of chunk.v) {
if (!frag || typeof frag !== 'object') {
continue;
}
const fragType = asString(frag.type).toUpperCase();
const content = asContentString(frag.content, stripReferenceMarkers);
if (!content) {
continue;
}
if (fragType === 'THINK' || fragType === 'THINKING') {
newType = 'thinking';
parts.push({ text: content, type: 'thinking' });
} else if (fragType === 'RESPONSE') {
newType = 'text';
parts.push({ text: content, type: 'text' });
} else {
parts.push({ text: content, type: 'text' });
}
}
}
if (pathValue === 'response' && Array.isArray(chunk.v)) {
for (const item of chunk.v) {
if (!item || typeof item !== 'object') {
continue;
}
if (item.p === 'fragments' && item.o === 'APPEND' && Array.isArray(item.v)) {
for (const frag of item.v) {
const fragType = asString(frag && frag.type).toUpperCase();
if (fragType === 'THINK' || fragType === 'THINKING') {
newType = 'thinking';
} else if (fragType === 'RESPONSE') {
newType = 'text';
}
}
}
}
}
let partType = 'text';
if (pathValue === 'response/thinking_content') {
partType = 'thinking';
} else if (pathValue === 'response/content') {
partType = 'text';
} else if (pathValue.includes('response/fragments') && pathValue.includes('/content')) {
partType = newType;
} else if (!pathValue && thinkingEnabled) {
partType = newType;
}
const val = chunk.v;
if (typeof val === 'string') {
if (val === 'FINISHED' && (!pathValue || pathValue === 'status')) {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
const content = asContentString(val, stripReferenceMarkers);
if (content) {
parts.push({ text: content, type: partType });
}
return {
parsed: true,
parts: filterLeakedContentFilterParts(parts),
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
if (Array.isArray(val)) {
const extracted = extractContentRecursive(val, partType, stripReferenceMarkers);
if (extracted.finished) {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
parts.push(...extracted.parts);
return {
parsed: true,
parts: filterLeakedContentFilterParts(parts),
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
if (val && typeof val === 'object') {
const resp = val.response && typeof val.response === 'object' ? val.response : val;
if (Array.isArray(resp.fragments)) {
for (const frag of resp.fragments) {
if (!frag || typeof frag !== 'object') {
continue;
}
const content = asContentString(frag.content, stripReferenceMarkers);
if (!content) {
continue;
}
const t = asString(frag.type).toUpperCase();
if (t === 'THINK' || t === 'THINKING') {
newType = 'thinking';
parts.push({ text: content, type: 'thinking' });
} else if (t === 'RESPONSE') {
newType = 'text';
parts.push({ text: content, type: 'text' });
} else {
parts.push({ text: content, type: partType });
}
}
}
}
return {
parsed: true,
parts: filterLeakedContentFilterParts(parts),
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
function extractContentRecursive(items, defaultType, stripReferenceMarkers = true) {
const parts = [];
for (const it of items) {
if (!it || typeof it !== 'object') {
continue;
}
if (!Object.prototype.hasOwnProperty.call(it, 'v')) {
continue;
}
const itemPath = asString(it.p);
const itemV = it.v;
if (itemPath === 'status' && asString(itemV) === 'FINISHED') {
return { parts: [], finished: true };
}
if (shouldSkipPath(itemPath)) {
continue;
}
const content = asContentString(it.content, stripReferenceMarkers);
if (content) {
const typeName = asString(it.type).toUpperCase();
if (typeName === 'THINK' || typeName === 'THINKING') {
parts.push({ text: content, type: 'thinking' });
} else if (typeName === 'RESPONSE') {
parts.push({ text: content, type: 'text' });
} else {
parts.push({ text: content, type: defaultType });
}
continue;
}
let partType = defaultType;
if (itemPath.includes('thinking')) {
partType = 'thinking';
} else if (itemPath.includes('content') || itemPath === 'response' || itemPath === 'fragments') {
partType = 'text';
}
if (typeof itemV === 'string') {
if (itemV && itemV !== 'FINISHED') {
const content = asContentString(itemV, stripReferenceMarkers);
if (content) {
parts.push({ text: content, type: partType });
}
}
continue;
}
if (!Array.isArray(itemV)) {
continue;
}
for (const inner of itemV) {
if (typeof inner === 'string') {
if (inner) {
const content = asContentString(inner, stripReferenceMarkers);
if (content) {
parts.push({ text: content, type: partType });
}
}
continue;
}
if (!inner || typeof inner !== 'object') {
continue;
}
const ct = asContentString(inner.content, stripReferenceMarkers);
if (!ct) {
continue;
}
const typeName = asString(inner.type).toUpperCase();
if (typeName === 'THINK' || typeName === 'THINKING') {
parts.push({ text: ct, type: 'thinking' });
} else if (typeName === 'RESPONSE') {
parts.push({ text: ct, type: 'text' });
} else {
parts.push({ text: ct, type: partType });
}
}
}
return { parts, finished: false };
}
function filterLeakedContentFilterParts(parts) {
if (!Array.isArray(parts) || parts.length === 0) {
return parts;
}
const out = [];
for (const p of parts) {
if (!p || typeof p !== 'object') {
continue;
}
const { text, stripped } = stripLeakedContentFilterSuffix(p.text);
if (stripped && shouldDropCleanedLeakedChunk(text)) {
continue;
}
if (stripped) {
out.push({ ...p, text });
continue;
}
out.push(p);
}
return out;
}
function stripLeakedContentFilterSuffix(text) {
if (typeof text !== 'string' || text === '') {
return { text, stripped: false };
}
const upperText = text.toUpperCase();
const idx = upperText.indexOf('CONTENT_FILTER');
if (idx < 0) {
return { text, stripped: false };
}
return {
text: text.slice(0, idx).replace(/[ \t\r]+$/g, ''),
stripped: true,
};
}
function shouldDropCleanedLeakedChunk(cleaned) {
if (cleaned === '') {
return true;
}
if (typeof cleaned === 'string' && cleaned.includes('\n')) {
return false;
}
return asString(cleaned).trim() === '';
}
function hasContentFilterStatus(chunk) {
if (!chunk || typeof chunk !== 'object') {
return false;
}
const code = asString(chunk.code);
if (code && code.toLowerCase() === 'content_filter') {
return true;
}
return hasContentFilterStatusValue(chunk);
}
function hasContentFilterStatusValue(v) {
if (Array.isArray(v)) {
for (const item of v) {
if (hasContentFilterStatusValue(item)) {
return true;
}
}
return false;
}
if (!v || typeof v !== 'object') {
return false;
}
const pathValue = asString(v.p);
if (pathValue && pathValue.toLowerCase().includes('status')) {
if (asString(v.v).toLowerCase() === 'content_filter') {
return true;
}
}
if (asString(v.code).toLowerCase() === 'content_filter') {
return true;
}
for (const value of Object.values(v)) {
if (hasContentFilterStatusValue(value)) {
return true;
}
}
return false;
}
function extractAccumulatedTokenUsage(chunk) {
return findAccumulatedTokenUsage(chunk);
}
function findAccumulatedTokenUsage(v) {
if (Array.isArray(v)) {
for (const item of v) {
const n = findAccumulatedTokenUsage(item);
if (n > 0) {
return n;
}
}
return 0;
}
if (!v || typeof v !== 'object') {
return 0;
}
const pathValue = asString(v.p);
if (pathValue && pathValue.toLowerCase().includes('accumulated_token_usage')) {
const n = toInt(v.v);
if (n > 0) {
return n;
}
}
const direct = toInt(v.accumulated_token_usage);
if (direct > 0) {
return direct;
}
for (const value of Object.values(v)) {
const n = findAccumulatedTokenUsage(value);
if (n > 0) {
return n;
}
}
return 0;
}
function toInt(v) {
if (typeof v !== 'number' || !Number.isFinite(v)) {
return 0;
}
return Math.trunc(v);
}
function formatErrorMessage(v) {
if (typeof v === 'string') {
return v;
}
if (v == null) {
return String(v);
}
try {
return JSON.stringify(v);
} catch (_err) {
return String(v);
}
}
function shouldSkipPath(pathValue) {
if (isFragmentStatusPath(pathValue)) {
return true;
}
if (SKIP_EXACT_PATHS.has(pathValue)) {
return true;
}
for (const p of SKIP_PATTERNS) {
if (pathValue.includes(p)) {
return true;
}
}
return false;
}
function isFragmentStatusPath(pathValue) {
if (!pathValue || pathValue === 'response/status') {
return false;
}
return /^response\/fragments\/-?\d+\/status$/i.test(pathValue);
}
function isCitation(text) {
return asString(text).trim().startsWith('[citation:');
}
function asContentString(v, stripReferenceMarkers = true) {
if (typeof v === 'string') {
return stripReferenceMarkers ? stripReferenceMarkersText(v) : v;
}
if (Array.isArray(v)) {
let out = '';
for (const item of v) {
out += asContentString(item, stripReferenceMarkers);
}
return out;
}
if (v && typeof v === 'object') {
if (Object.prototype.hasOwnProperty.call(v, 'content')) {
return asContentString(v.content, stripReferenceMarkers);
}
if (Object.prototype.hasOwnProperty.call(v, 'v')) {
return asContentString(v.v, stripReferenceMarkers);
}
return '';
}
if (v == null) {
return '';
}
const text = String(v);
return stripReferenceMarkers ? stripReferenceMarkersText(text) : text;
}
function stripReferenceMarkersText(text) {
if (!text) {
return text;
}
return text.replace(/\[reference:\s*\d+\]/gi, '');
}
function asString(v) {
if (typeof v === 'string') {
return v.trim();
}
if (Array.isArray(v)) {
return asString(v[0]);
}
if (v == null) {
return '';
}
return String(v).trim();
}
module.exports = {
parseChunkForContent,
extractContentRecursive,
filterLeakedContentFilterParts,
hasContentFilterStatus,
extractAccumulatedTokenUsage,
shouldSkipPath,
isFragmentStatusPath,
isCitation,
stripReferenceMarkers: stripReferenceMarkersText,
};
module.exports = require('./sse_parse_impl');

View File

@@ -0,0 +1,535 @@
'use strict';
// Implementation moved here to keep the line-gate wrapper tiny.
const {
SKIP_PATTERNS,
SKIP_EXACT_PATHS,
} = require('../shared/deepseek-constants');
function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers = true) {
if (!chunk || typeof chunk !== 'object') {
return {
parsed: false,
parts: [],
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens: 0,
newType: currentType,
};
}
if (Object.prototype.hasOwnProperty.call(chunk, 'error')) {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: formatErrorMessage(chunk.error),
outputTokens: 0,
newType: currentType,
};
}
const pathValue = asString(chunk.p);
const outputTokens = extractAccumulatedTokenUsage(chunk);
if (hasContentFilterStatus(chunk)) {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: true,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
if (shouldSkipPath(pathValue)) {
return {
parsed: true,
parts: [],
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
if (!Object.prototype.hasOwnProperty.call(chunk, 'v')) {
return {
parsed: true,
parts: [],
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
let newType = currentType;
const parts = [];
if (pathValue === 'response/fragments' && asString(chunk.o).toUpperCase() === 'APPEND' && Array.isArray(chunk.v)) {
for (const frag of chunk.v) {
if (!frag || typeof frag !== 'object') {
continue;
}
const fragType = asString(frag.type).toUpperCase();
const content = asContentString(frag.content, stripReferenceMarkers);
if (!content) {
continue;
}
if (fragType === 'THINK' || fragType === 'THINKING') {
newType = 'thinking';
parts.push({ text: content, type: 'thinking' });
} else if (fragType === 'RESPONSE') {
newType = 'text';
parts.push({ text: content, type: 'text' });
} else {
parts.push({ text: content, type: 'text' });
}
}
}
if (pathValue === 'response' && Array.isArray(chunk.v)) {
for (const item of chunk.v) {
if (!item || typeof item !== 'object') {
continue;
}
if (item.p === 'fragments' && item.o === 'APPEND' && Array.isArray(item.v)) {
for (const frag of item.v) {
const fragType = asString(frag && frag.type).toUpperCase();
if (fragType === 'THINK' || fragType === 'THINKING') {
newType = 'thinking';
} else if (fragType === 'RESPONSE') {
newType = 'text';
}
}
}
}
}
let partType = 'text';
if (pathValue === 'response/thinking_content') {
partType = 'thinking';
} else if (pathValue === 'response/content') {
partType = 'text';
} else if (pathValue.includes('response/fragments') && pathValue.includes('/content')) {
partType = newType;
} else if (!pathValue && thinkingEnabled) {
partType = newType;
}
const val = chunk.v;
if (typeof val === 'string') {
if (val === 'FINISHED' && (!pathValue || pathValue === 'status')) {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
const content = asContentString(val, stripReferenceMarkers);
if (content) {
parts.push({ text: content, type: partType });
}
return {
parsed: true,
parts: filterLeakedContentFilterParts(parts),
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
if (Array.isArray(val)) {
const extracted = extractContentRecursive(val, partType, stripReferenceMarkers);
if (extracted.finished) {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
parts.push(...extracted.parts);
return {
parsed: true,
parts: filterLeakedContentFilterParts(parts),
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
if (val && typeof val === 'object') {
const resp = val.response && typeof val.response === 'object' ? val.response : val;
if (Array.isArray(resp.fragments)) {
for (const frag of resp.fragments) {
if (!frag || typeof frag !== 'object') {
continue;
}
const content = asContentString(frag.content, stripReferenceMarkers);
if (!content) {
continue;
}
const t = asString(frag.type).toUpperCase();
if (t === 'THINK' || t === 'THINKING') {
newType = 'thinking';
parts.push({ text: content, type: 'thinking' });
} else if (t === 'RESPONSE') {
newType = 'text';
parts.push({ text: content, type: 'text' });
} else {
parts.push({ text: content, type: partType });
}
}
}
}
return {
parsed: true,
parts: filterLeakedContentFilterParts(parts),
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
function extractContentRecursive(items, defaultType, stripReferenceMarkers = true) {
const parts = [];
for (const it of items) {
if (!it || typeof it !== 'object') {
continue;
}
if (!Object.prototype.hasOwnProperty.call(it, 'v')) {
continue;
}
const itemPath = asString(it.p);
const itemV = it.v;
if (itemPath === 'status' && asString(itemV) === 'FINISHED') {
return { parts: [], finished: true };
}
if (shouldSkipPath(itemPath)) {
continue;
}
const content = asContentString(it.content, stripReferenceMarkers);
if (content) {
const typeName = asString(it.type).toUpperCase();
if (typeName === 'THINK' || typeName === 'THINKING') {
parts.push({ text: content, type: 'thinking' });
} else if (typeName === 'RESPONSE') {
parts.push({ text: content, type: 'text' });
} else {
parts.push({ text: content, type: defaultType });
}
continue;
}
let partType = defaultType;
if (itemPath.includes('thinking')) {
partType = 'thinking';
} else if (itemPath.includes('content') || itemPath === 'response' || itemPath === 'fragments') {
partType = 'text';
}
if (typeof itemV === 'string') {
if (itemV && itemV !== 'FINISHED') {
const content = asContentString(itemV, stripReferenceMarkers);
if (content) {
parts.push({ text: content, type: partType });
}
}
continue;
}
if (!Array.isArray(itemV)) {
continue;
}
for (const inner of itemV) {
if (typeof inner === 'string') {
if (inner) {
const content = asContentString(inner, stripReferenceMarkers);
if (content) {
parts.push({ text: content, type: partType });
}
}
continue;
}
if (!inner || typeof inner !== 'object') {
continue;
}
const ct = asContentString(inner.content, stripReferenceMarkers);
if (!ct) {
continue;
}
const typeName = asString(inner.type).toUpperCase();
if (typeName === 'THINK' || typeName === 'THINKING') {
parts.push({ text: ct, type: 'thinking' });
} else if (typeName === 'RESPONSE') {
parts.push({ text: ct, type: 'text' });
} else {
parts.push({ text: ct, type: partType });
}
}
}
return { parts, finished: false };
}
function filterLeakedContentFilterParts(parts) {
if (!Array.isArray(parts) || parts.length === 0) {
return parts;
}
const out = [];
for (const p of parts) {
if (!p || typeof p !== 'object') {
continue;
}
const { text, stripped } = stripLeakedContentFilterSuffix(p.text);
if (stripped && shouldDropCleanedLeakedChunk(text)) {
continue;
}
if (stripped) {
out.push({ ...p, text });
continue;
}
out.push(p);
}
return out;
}
function stripLeakedContentFilterSuffix(text) {
if (typeof text !== 'string' || text === '') {
return { text, stripped: false };
}
const upperText = text.toUpperCase();
const idx = upperText.indexOf('CONTENT_FILTER');
if (idx < 0) {
return { text, stripped: false };
}
return {
text: text.slice(0, idx).replace(/[ \t\r]+$/g, ''),
stripped: true,
};
}
function shouldDropCleanedLeakedChunk(cleaned) {
if (cleaned === '') {
return true;
}
if (typeof cleaned === 'string' && cleaned.includes('\n')) {
return false;
}
return asString(cleaned).trim() === '';
}
function hasContentFilterStatus(chunk) {
if (!chunk || typeof chunk !== 'object') {
return false;
}
const code = asString(chunk.code);
if (code && code.toLowerCase() === 'content_filter') {
return true;
}
return hasContentFilterStatusValue(chunk);
}
function hasContentFilterStatusValue(v) {
if (Array.isArray(v)) {
for (const item of v) {
if (hasContentFilterStatusValue(item)) {
return true;
}
}
return false;
}
if (!v || typeof v !== 'object') {
return false;
}
const pathValue = asString(v.p);
if (pathValue && pathValue.toLowerCase().includes('status')) {
if (asString(v.v).toLowerCase() === 'content_filter') {
return true;
}
}
if (asString(v.code).toLowerCase() === 'content_filter') {
return true;
}
for (const value of Object.values(v)) {
if (hasContentFilterStatusValue(value)) {
return true;
}
}
return false;
}
function extractAccumulatedTokenUsage(chunk) {
return findAccumulatedTokenUsage(chunk);
}
function findAccumulatedTokenUsage(v) {
if (Array.isArray(v)) {
for (const item of v) {
const n = findAccumulatedTokenUsage(item);
if (n > 0) {
return n;
}
}
return 0;
}
if (!v || typeof v !== 'object') {
return 0;
}
const pathValue = asString(v.p);
if (pathValue && pathValue.toLowerCase().includes('accumulated_token_usage')) {
const n = toInt(v.v);
if (n > 0) {
return n;
}
}
const direct = toInt(v.accumulated_token_usage);
if (direct > 0) {
return direct;
}
for (const value of Object.values(v)) {
const n = findAccumulatedTokenUsage(value);
if (n > 0) {
return n;
}
}
return 0;
}
function toInt(v) {
if (typeof v !== 'number' || !Number.isFinite(v)) {
return 0;
}
return Math.trunc(v);
}
function formatErrorMessage(v) {
if (typeof v === 'string') {
return v;
}
if (v == null) {
return String(v);
}
try {
return JSON.stringify(v);
} catch (_err) {
return String(v);
}
}
function shouldSkipPath(pathValue) {
if (isFragmentStatusPath(pathValue)) {
return true;
}
if (SKIP_EXACT_PATHS.has(pathValue)) {
return true;
}
for (const p of SKIP_PATTERNS) {
if (pathValue.includes(p)) {
return true;
}
}
return false;
}
function isFragmentStatusPath(pathValue) {
if (!pathValue || pathValue === 'response/status') {
return false;
}
return /^response\/fragments\/-?\d+\/status$/i.test(pathValue);
}
function isCitation(text) {
return asString(text).trim().startsWith('[citation:');
}
function asContentString(v, stripReferenceMarkers = true) {
if (typeof v === 'string') {
return stripReferenceMarkers ? stripReferenceMarkersText(v) : v;
}
if (Array.isArray(v)) {
let out = '';
for (const item of v) {
out += asContentString(item, stripReferenceMarkers);
}
return out;
}
if (v && typeof v === 'object') {
if (Object.prototype.hasOwnProperty.call(v, 'content')) {
return asContentString(v.content, stripReferenceMarkers);
}
if (Object.prototype.hasOwnProperty.call(v, 'v')) {
return asContentString(v.v, stripReferenceMarkers);
}
return '';
}
if (v == null) {
return '';
}
const text = String(v);
return stripReferenceMarkers ? stripReferenceMarkersText(text) : text;
}
function stripReferenceMarkersText(text) {
if (!text) {
return text;
}
return text.replace(/\[reference:\s*\d+\]/gi, '');
}
function asString(v) {
if (typeof v === 'string') {
return v.trim();
}
if (Array.isArray(v)) {
return asString(v[0]);
}
if (v == null) {
return '';
}
return String(v).trim();
}
module.exports = {
parseChunkForContent,
extractContentRecursive,
filterLeakedContentFilterParts,
hasContentFilterStatus,
extractAccumulatedTokenUsage,
shouldSkipPath,
isFragmentStatusPath,
isCitation,
stripReferenceMarkers: stripReferenceMarkersText,
};

View File

@@ -1,308 +1,3 @@
'use strict';
const {
createToolSieveState,
processToolSieveChunk,
flushToolSieve,
parseStandaloneToolCalls,
formatOpenAIStreamToolCalls,
} = require('../helpers/stream-tool-sieve');
const { BASE_HEADERS } = require('../shared/deepseek-constants');
const { writeOpenAIError } = require('./error_shape');
const { parseChunkForContent, isCitation } = require('./sse_parse');
const { buildUsage } = require('./token_usage');
const {
resolveToolcallPolicy,
formatIncrementalToolCallDeltas,
filterIncrementalToolCallDeltasByAllowed,
boolDefaultTrue,
} = require('./toolcall_policy');
const { createChatCompletionEmitter } = require('./stream_emitter');
const {
asString,
isAbortError,
fetchStreamPrepare,
relayPreparedFailure,
createLeaseReleaser,
} = require('./http_internal');
const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion';
async function handleVercelStream(req, res, rawBody, payload) {
const prep = await fetchStreamPrepare(req, rawBody);
if (!prep.ok) {
relayPreparedFailure(res, prep);
return;
}
const model = asString(prep.body.model) || asString(payload.model);
const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`;
const leaseID = asString(prep.body.lease_id);
const deepseekToken = asString(prep.body.deepseek_token);
const powHeader = asString(prep.body.pow_header);
const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null;
const finalPrompt = asString(prep.body.final_prompt);
const thinkingEnabled = toBool(prep.body.thinking_enabled);
const searchEnabled = toBool(prep.body.search_enabled);
const toolPolicy = resolveToolcallPolicy(prep.body, payload.tools);
const toolNames = toolPolicy.toolNames;
const emitEarlyToolDeltas = toolPolicy.emitEarlyToolDeltas;
const stripReferenceMarkers = boolDefaultTrue(prep.body.compat && prep.body.compat.strip_reference_markers);
if (!model || !leaseID || !deepseekToken || !powHeader || !completionPayload) {
writeOpenAIError(res, 500, 'invalid vercel prepare response');
return;
}
const releaseLease = createLeaseReleaser(req, leaseID);
const upstreamController = new AbortController();
let clientClosed = false;
let reader = null;
const markClientClosed = () => {
if (clientClosed) {
return;
}
clientClosed = true;
upstreamController.abort();
if (reader && typeof reader.cancel === 'function') {
Promise.resolve(reader.cancel()).catch(() => {});
}
};
const onReqAborted = () => markClientClosed();
const onResClose = () => {
if (!res.writableEnded) {
markClientClosed();
}
};
req.on('aborted', onReqAborted);
res.on('close', onResClose);
try {
let completionRes;
try {
completionRes = await fetch(DEEPSEEK_COMPLETION_URL, {
method: 'POST',
headers: {
...BASE_HEADERS,
authorization: `Bearer ${deepseekToken}`,
'x-ds-pow-response': powHeader,
},
body: JSON.stringify(completionPayload),
signal: upstreamController.signal,
});
} catch (err) {
if (clientClosed || isAbortError(err)) {
return;
}
throw err;
}
if (clientClosed) {
return;
}
if (!completionRes.ok || !completionRes.body) {
const detail = completionRes.body ? await completionRes.text() : '';
const status = completionRes.ok ? 500 : completionRes.status || 500;
writeOpenAIError(res, status, detail);
return;
}
res.statusCode = 200;
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
if (typeof res.flushHeaders === 'function') {
res.flushHeaders();
}
const created = Math.floor(Date.now() / 1000);
let currentType = thinkingEnabled ? 'thinking' : 'text';
let thinkingText = '';
let outputText = '';
let outputTokens = 0;
const toolSieveEnabled = toolPolicy.toolSieveEnabled;
const toolSieveState = createToolSieveState();
let toolCallsEmitted = false;
const streamToolCallIDs = new Map();
const streamToolNames = new Map();
const decoder = new TextDecoder();
reader = completionRes.body.getReader();
let buffered = '';
let ended = false;
const { sendFrame, sendDeltaFrame } = createChatCompletionEmitter({
res,
sessionID,
created,
model,
isClosed: () => clientClosed,
});
const finish = async (reason) => {
if (ended) {
return;
}
ended = true;
if (clientClosed || res.writableEnded || res.destroyed) {
await releaseLease();
return;
}
const detected = parseStandaloneToolCalls(outputText, toolNames);
if (detected.length > 0 && !toolCallsEmitted) {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected, streamToolCallIDs) });
} else if (toolSieveEnabled) {
const tailEvents = flushToolSieve(toolSieveState, toolNames);
for (const evt of tailEvents) {
if (evt.type === 'tool_calls' && Array.isArray(evt.calls) && evt.calls.length > 0) {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
continue;
}
if (evt.text) {
sendDeltaFrame({ content: evt.text });
}
}
}
if (detected.length > 0 || toolCallsEmitted) {
reason = 'tool_calls';
}
sendFrame({
id: sessionID,
object: 'chat.completion.chunk',
created,
model,
choices: [{ delta: {}, index: 0, finish_reason: reason }],
usage: buildUsage(finalPrompt, thinkingText, outputText, outputTokens),
});
if (!res.writableEnded && !res.destroyed) {
res.write('data: [DONE]\n\n');
}
await releaseLease();
if (!res.writableEnded && !res.destroyed) {
res.end();
}
};
try {
// eslint-disable-next-line no-constant-condition
while (true) {
if (clientClosed) {
await finish('stop');
return;
}
const { value, done } = await reader.read();
if (done) {
break;
}
buffered += decoder.decode(value, { stream: true });
const lines = buffered.split('\n');
buffered = lines.pop() || '';
for (const rawLine of lines) {
const line = rawLine.trim();
if (!line.startsWith('data:')) {
continue;
}
const dataStr = line.slice(5).trim();
if (!dataStr) {
continue;
}
if (dataStr === '[DONE]') {
await finish('stop');
return;
}
let chunk;
try {
chunk = JSON.parse(dataStr);
} catch (_err) {
continue;
}
const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers);
if (!parsed.parsed) {
continue;
}
if (parsed.outputTokens > 0) {
outputTokens = parsed.outputTokens;
}
currentType = parsed.newType;
if (parsed.errorMessage) {
await finish('content_filter');
return;
}
if (parsed.contentFilter) {
await finish('stop');
return;
}
if (parsed.finished) {
await finish('stop');
return;
}
for (const p of parsed.parts) {
if (!p.text) {
continue;
}
if (searchEnabled && isCitation(p.text)) {
continue;
}
if (p.type === 'thinking') {
if (thinkingEnabled) {
thinkingText += p.text;
sendDeltaFrame({ reasoning_content: p.text });
}
} else {
outputText += p.text;
if (!toolSieveEnabled) {
sendDeltaFrame({ content: p.text });
continue;
}
const events = processToolSieveChunk(toolSieveState, p.text, toolNames);
for (const evt of events) {
if (evt.type === 'tool_call_deltas') {
if (!emitEarlyToolDeltas) {
continue;
}
const filtered = filterIncrementalToolCallDeltasByAllowed(evt.deltas, toolNames, streamToolNames);
const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs);
if (formatted.length > 0) {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatted });
}
continue;
}
if (evt.type === 'tool_calls') {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
continue;
}
if (evt.text) {
sendDeltaFrame({ content: evt.text });
}
}
}
}
}
}
await finish('stop');
} catch (err) {
if (clientClosed || isAbortError(err)) {
await finish('stop');
return;
}
await finish('stop');
}
} finally {
req.removeListener('aborted', onReqAborted);
res.removeListener('close', onResClose);
await releaseLease();
}
}
function toBool(v) {
return v === true;
}
module.exports = {
handleVercelStream,
};
module.exports = require('./vercel_stream_impl');

View File

@@ -0,0 +1,310 @@
'use strict';
// Implementation moved here to keep the line-gate wrapper tiny.
const {
createToolSieveState,
processToolSieveChunk,
flushToolSieve,
parseStandaloneToolCalls,
formatOpenAIStreamToolCalls,
} = require('../helpers/stream-tool-sieve');
const { BASE_HEADERS } = require('../shared/deepseek-constants');
const { writeOpenAIError } = require('./error_shape');
const { parseChunkForContent, isCitation } = require('./sse_parse');
const { buildUsage } = require('./token_usage');
const {
resolveToolcallPolicy,
formatIncrementalToolCallDeltas,
filterIncrementalToolCallDeltasByAllowed,
boolDefaultTrue,
} = require('./toolcall_policy');
const { createChatCompletionEmitter } = require('./stream_emitter');
const {
asString,
isAbortError,
fetchStreamPrepare,
relayPreparedFailure,
createLeaseReleaser,
} = require('./http_internal');
const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion';
async function handleVercelStream(req, res, rawBody, payload) {
const prep = await fetchStreamPrepare(req, rawBody);
if (!prep.ok) {
relayPreparedFailure(res, prep);
return;
}
const model = asString(prep.body.model) || asString(payload.model);
const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`;
const leaseID = asString(prep.body.lease_id);
const deepseekToken = asString(prep.body.deepseek_token);
const powHeader = asString(prep.body.pow_header);
const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null;
const finalPrompt = asString(prep.body.final_prompt);
const thinkingEnabled = toBool(prep.body.thinking_enabled);
const searchEnabled = toBool(prep.body.search_enabled);
const toolPolicy = resolveToolcallPolicy(prep.body, payload.tools);
const toolNames = toolPolicy.toolNames;
const emitEarlyToolDeltas = toolPolicy.emitEarlyToolDeltas;
const stripReferenceMarkers = boolDefaultTrue(prep.body.compat && prep.body.compat.strip_reference_markers);
if (!model || !leaseID || !deepseekToken || !powHeader || !completionPayload) {
writeOpenAIError(res, 500, 'invalid vercel prepare response');
return;
}
const releaseLease = createLeaseReleaser(req, leaseID);
const upstreamController = new AbortController();
let clientClosed = false;
let reader = null;
const markClientClosed = () => {
if (clientClosed) {
return;
}
clientClosed = true;
upstreamController.abort();
if (reader && typeof reader.cancel === 'function') {
Promise.resolve(reader.cancel()).catch(() => {});
}
};
const onReqAborted = () => markClientClosed();
const onResClose = () => {
if (!res.writableEnded) {
markClientClosed();
}
};
req.on('aborted', onReqAborted);
res.on('close', onResClose);
try {
let completionRes;
try {
completionRes = await fetch(DEEPSEEK_COMPLETION_URL, {
method: 'POST',
headers: {
...BASE_HEADERS,
authorization: `Bearer ${deepseekToken}`,
'x-ds-pow-response': powHeader,
},
body: JSON.stringify(completionPayload),
signal: upstreamController.signal,
});
} catch (err) {
if (clientClosed || isAbortError(err)) {
return;
}
throw err;
}
if (clientClosed) {
return;
}
if (!completionRes.ok || !completionRes.body) {
const detail = completionRes.body ? await completionRes.text() : '';
const status = completionRes.ok ? 500 : completionRes.status || 500;
writeOpenAIError(res, status, detail);
return;
}
res.statusCode = 200;
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
if (typeof res.flushHeaders === 'function') {
res.flushHeaders();
}
const created = Math.floor(Date.now() / 1000);
let currentType = thinkingEnabled ? 'thinking' : 'text';
let thinkingText = '';
let outputText = '';
let outputTokens = 0;
const toolSieveEnabled = toolPolicy.toolSieveEnabled;
const toolSieveState = createToolSieveState();
let toolCallsEmitted = false;
const streamToolCallIDs = new Map();
const streamToolNames = new Map();
const decoder = new TextDecoder();
reader = completionRes.body.getReader();
let buffered = '';
let ended = false;
const { sendFrame, sendDeltaFrame } = createChatCompletionEmitter({
res,
sessionID,
created,
model,
isClosed: () => clientClosed,
});
const finish = async (reason) => {
if (ended) {
return;
}
ended = true;
if (clientClosed || res.writableEnded || res.destroyed) {
await releaseLease();
return;
}
const detected = parseStandaloneToolCalls(outputText, toolNames);
if (detected.length > 0 && !toolCallsEmitted) {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected, streamToolCallIDs) });
} else if (toolSieveEnabled) {
const tailEvents = flushToolSieve(toolSieveState, toolNames);
for (const evt of tailEvents) {
if (evt.type === 'tool_calls' && Array.isArray(evt.calls) && evt.calls.length > 0) {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
continue;
}
if (evt.text) {
sendDeltaFrame({ content: evt.text });
}
}
}
if (detected.length > 0 || toolCallsEmitted) {
reason = 'tool_calls';
}
sendFrame({
id: sessionID,
object: 'chat.completion.chunk',
created,
model,
choices: [{ delta: {}, index: 0, finish_reason: reason }],
usage: buildUsage(finalPrompt, thinkingText, outputText, outputTokens),
});
if (!res.writableEnded && !res.destroyed) {
res.write('data: [DONE]\n\n');
}
await releaseLease();
if (!res.writableEnded && !res.destroyed) {
res.end();
}
};
try {
// eslint-disable-next-line no-constant-condition
while (true) {
if (clientClosed) {
await finish('stop');
return;
}
const { value, done } = await reader.read();
if (done) {
break;
}
buffered += decoder.decode(value, { stream: true });
const lines = buffered.split('\n');
buffered = lines.pop() || '';
for (const rawLine of lines) {
const line = rawLine.trim();
if (!line.startsWith('data:')) {
continue;
}
const dataStr = line.slice(5).trim();
if (!dataStr) {
continue;
}
if (dataStr === '[DONE]') {
await finish('stop');
return;
}
let chunk;
try {
chunk = JSON.parse(dataStr);
} catch (_err) {
continue;
}
const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers);
if (!parsed.parsed) {
continue;
}
if (parsed.outputTokens > 0) {
outputTokens = parsed.outputTokens;
}
currentType = parsed.newType;
if (parsed.errorMessage) {
await finish('content_filter');
return;
}
if (parsed.contentFilter) {
await finish('stop');
return;
}
if (parsed.finished) {
await finish('stop');
return;
}
for (const p of parsed.parts) {
if (!p.text) {
continue;
}
if (searchEnabled && isCitation(p.text)) {
continue;
}
if (p.type === 'thinking') {
if (thinkingEnabled) {
thinkingText += p.text;
sendDeltaFrame({ reasoning_content: p.text });
}
} else {
outputText += p.text;
if (!toolSieveEnabled) {
sendDeltaFrame({ content: p.text });
continue;
}
const events = processToolSieveChunk(toolSieveState, p.text, toolNames);
for (const evt of events) {
if (evt.type === 'tool_call_deltas') {
if (!emitEarlyToolDeltas) {
continue;
}
const filtered = filterIncrementalToolCallDeltasByAllowed(evt.deltas, toolNames, streamToolNames);
const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs);
if (formatted.length > 0) {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatted });
}
continue;
}
if (evt.type === 'tool_calls') {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
continue;
}
if (evt.text) {
sendDeltaFrame({ content: evt.text });
}
}
}
}
}
}
await finish('stop');
} catch (err) {
if (clientClosed || isAbortError(err)) {
await finish('stop');
return;
}
await finish('stop');
}
} finally {
req.removeListener('aborted', onReqAborted);
res.removeListener('close', onResClose);
await releaseLease();
}
}
function toBool(v) {
return v === true;
}
module.exports = {
handleVercelStream,
};