'use strict'; // Implementation moved here to keep the line-gate wrapper tiny. const { createToolSieveState, processToolSieveChunk, flushToolSieve, parseStandaloneToolCalls, formatOpenAIStreamToolCalls, } = require('../helpers/stream-tool-sieve'); const { BASE_HEADERS } = require('../shared/deepseek-constants'); const { writeOpenAIError, openAIErrorType } = require('./error_shape'); const { parseChunkForContent, isCitation } = require('./sse_parse'); const { buildUsage } = require('./token_usage'); const { resolveToolcallPolicy, formatIncrementalToolCallDeltas, filterIncrementalToolCallDeltasByAllowed, resetStreamToolCallState, } = require('./toolcall_policy'); const { createChatCompletionEmitter, createDeltaCoalescer } = require('./stream_emitter'); const { asString, isAbortError, fetchStreamPrepare, fetchStreamPow, relayPreparedFailure, createLeaseReleaser, } = require('./http_internal'); const { trimContinuationOverlap, } = require('./dedupe'); const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion'; const DEEPSEEK_CONTINUE_URL = 'https://chat.deepseek.com/api/v0/chat/continue'; const EMPTY_OUTPUT_RETRY_SUFFIX = 'Previous reply had no visible output. Please regenerate the visible final answer or tool call now.'; const EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS = 1; const AUTO_CONTINUE_MAX_ROUNDS = 8; async function handleVercelStream(req, res, rawBody, payload) { const prep = await fetchStreamPrepare(req, rawBody); if (!prep.ok) { relayPreparedFailure(res, prep); return; } const model = asString(prep.body.model) || asString(payload.model); const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`; const leaseID = asString(prep.body.lease_id); const deepseekToken = asString(prep.body.deepseek_token); const initialPowHeader = asString(prep.body.pow_header); const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null; const finalPrompt = asString(prep.body.final_prompt); const thinkingEnabled = toBool(prep.body.thinking_enabled); const searchEnabled = toBool(prep.body.search_enabled); const toolPolicy = resolveToolcallPolicy(prep.body, payload.tools); const toolNames = toolPolicy.toolNames; const emitEarlyToolDeltas = toolPolicy.emitEarlyToolDeltas; const stripReferenceMarkers = true; if (!model || !leaseID || !deepseekToken || !initialPowHeader || !completionPayload) { writeOpenAIError(res, 500, 'invalid vercel prepare response'); return; } const releaseLease = createLeaseReleaser(req, leaseID); const upstreamController = new AbortController(); let clientClosed = false; let reader = null; const markClientClosed = () => { if (clientClosed) { return; } clientClosed = true; upstreamController.abort(); if (reader && typeof reader.cancel === 'function') { Promise.resolve(reader.cancel()).catch(() => {}); } }; const onReqAborted = () => markClientClosed(); const onResClose = () => { if (!res.writableEnded) { markClientClosed(); } }; req.on('aborted', onReqAborted); res.on('close', onResClose); try { let currentPowHeader = initialPowHeader; const refreshPowHeader = async (roundType) => { try { const pow = await fetchStreamPow(req, leaseID); const nextPowHeader = asString(pow.body && pow.body.pow_header); if (pow.ok && nextPowHeader) { currentPowHeader = nextPowHeader; return currentPowHeader; } console.warn('[vercel_stream_pow] refresh failed, reusing previous PoW', { round_type: roundType, status: pow.status || 0, }); } catch (err) { if (clientClosed || isAbortError(err)) { return ''; } console.warn('[vercel_stream_pow] refresh failed, reusing previous PoW', { round_type: roundType, error: err, }); } return currentPowHeader; }; const fetchDeepSeekStream = async (url, bodyPayload, powHeader) => { try { return await fetch(url, { method: 'POST', headers: { ...BASE_HEADERS, authorization: `Bearer ${deepseekToken}`, 'x-ds-pow-response': powHeader, }, body: JSON.stringify(bodyPayload), signal: upstreamController.signal, }); } catch (err) { if (clientClosed || isAbortError(err)) { return null; } throw err; } }; const fetchCompletion = (bodyPayload) => fetchDeepSeekStream(DEEPSEEK_COMPLETION_URL, bodyPayload, currentPowHeader); const fetchContinue = async (messageID) => { const powHeader = await refreshPowHeader('continue'); if (!powHeader) { return null; } return fetchDeepSeekStream(DEEPSEEK_CONTINUE_URL, { chat_session_id: sessionID, message_id: messageID, fallback_to_resume: true, }, powHeader); }; let completionRes = await fetchCompletion(completionPayload); if (completionRes === null) { return; } if (clientClosed) { return; } if (!completionRes.ok || !completionRes.body) { const detail = completionRes.body ? await completionRes.text() : ''; const status = completionRes.ok ? 500 : completionRes.status || 500; writeOpenAIError(res, status, detail); return; } res.statusCode = 200; res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache, no-transform'); res.setHeader('Connection', 'keep-alive'); res.setHeader('X-Accel-Buffering', 'no'); if (typeof res.flushHeaders === 'function') { res.flushHeaders(); } const created = Math.floor(Date.now() / 1000); let currentType = thinkingEnabled ? 'thinking' : 'text'; let thinkingText = ''; let outputText = ''; let usagePrompt = finalPrompt; const toolSieveEnabled = toolPolicy.toolSieveEnabled; const toolSieveState = createToolSieveState(); let toolCallsEmitted = false; let toolCallsDoneEmitted = false; const streamToolCallIDs = new Map(); const streamToolNames = new Map(); const decoder = new TextDecoder(); let buffered = ''; let ended = false; const { sendFrame, sendDeltaFrame } = createChatCompletionEmitter({ res, sessionID, created, model, isClosed: () => clientClosed, }); const deltaCoalescer = createDeltaCoalescer({ sendDeltaFrame }); const finish = async (reason, options = {}) => { if (ended) { return true; } if (clientClosed || res.writableEnded || res.destroyed) { ended = true; await releaseLease(); return true; } deltaCoalescer.flush(); const detected = parseStandaloneToolCalls(outputText, toolNames); if (detected.length > 0 && !toolCallsDoneEmitted) { toolCallsEmitted = true; toolCallsDoneEmitted = true; sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected, streamToolCallIDs, payload.tools) }); } else if (toolSieveEnabled) { const tailEvents = flushToolSieve(toolSieveState, toolNames); for (const evt of tailEvents) { if (evt.type === 'tool_calls' && Array.isArray(evt.calls) && evt.calls.length > 0) { deltaCoalescer.flush(); toolCallsEmitted = true; toolCallsDoneEmitted = true; sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs, payload.tools) }); resetStreamToolCallState(streamToolCallIDs, streamToolNames); continue; } if (evt.text) { deltaCoalescer.append('content', evt.text); } } deltaCoalescer.flush(); } if (detected.length > 0 || toolCallsEmitted) { reason = 'tool_calls'; } if (detected.length === 0 && !toolCallsEmitted && outputText.trim() === '') { if (options.deferEmpty && reason !== 'content_filter') { return false; } ended = true; const detail = upstreamEmptyOutputDetail(reason === 'content_filter', outputText, thinkingText); sendFailedChunk(res, detail.status, detail.message, detail.code); await releaseLease(); if (!res.writableEnded && !res.destroyed) { res.end(); } return true; } ended = true; sendFrame({ id: sessionID, object: 'chat.completion.chunk', created, model, choices: [{ delta: {}, index: 0, finish_reason: reason }], usage: buildUsage(usagePrompt, thinkingText, outputText), }); if (!res.writableEnded && !res.destroyed) { res.write('data: [DONE]\n\n'); } await releaseLease(); if (!res.writableEnded && !res.destroyed) { res.end(); } return true; }; const processStream = async (initialResponse, allowDeferEmpty) => { let currentResponse = initialResponse; let continueState = createContinueState(sessionID); let continueRounds = 0; // eslint-disable-next-line no-constant-condition while (true) { reader = currentResponse.body.getReader(); buffered = ''; let streamEnded = false; try { // eslint-disable-next-line no-constant-condition while (true) { if (clientClosed) { await finish('stop'); return { terminal: true, retryable: false }; } const { value, done } = await reader.read(); if (done) { break; } buffered += decoder.decode(value, { stream: true }); const lines = buffered.split('\n'); buffered = lines.pop() || ''; for (const rawLine of lines) { const line = rawLine.trim(); if (!line.startsWith('data:')) { continue; } const dataStr = line.slice(5).trim(); if (!dataStr) { continue; } if (dataStr === '[DONE]') { streamEnded = true; break; } let chunk; try { chunk = JSON.parse(dataStr); } catch (_err) { continue; } observeContinueState(continueState, chunk); const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers); if (!parsed.parsed) { continue; } currentType = parsed.newType; if (parsed.errorMessage) { return { terminal: await finish('content_filter'), retryable: false }; } if (parsed.contentFilter) { return { terminal: await finish(outputText.trim() === '' ? 'content_filter' : 'stop'), retryable: false }; } if (parsed.finished) { streamEnded = true; break; } for (const p of parsed.parts) { if (!p.text) { continue; } if (p.type === 'thinking') { if (thinkingEnabled) { const trimmed = trimContinuationOverlap(thinkingText, p.text); if (!trimmed) { continue; } thinkingText += trimmed; deltaCoalescer.append('reasoning_content', trimmed); } } else { const trimmed = trimContinuationOverlap(outputText, p.text); if (!trimmed) { continue; } if (searchEnabled && isCitation(trimmed)) { continue; } outputText += trimmed; if (!toolSieveEnabled) { deltaCoalescer.append('content', trimmed); continue; } const events = processToolSieveChunk(toolSieveState, trimmed, toolNames); for (const evt of events) { if (evt.type === 'tool_call_deltas') { if (!emitEarlyToolDeltas) { continue; } const filtered = filterIncrementalToolCallDeltasByAllowed(evt.deltas, toolNames, streamToolNames); const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs); if (formatted.length > 0) { toolCallsEmitted = true; deltaCoalescer.flush(); sendDeltaFrame({ tool_calls: formatted }); } continue; } if (evt.type === 'tool_calls') { toolCallsEmitted = true; toolCallsDoneEmitted = true; deltaCoalescer.flush(); sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs, payload.tools) }); resetStreamToolCallState(streamToolCallIDs, streamToolNames); continue; } if (evt.text) { deltaCoalescer.append('content', evt.text); } } } } if (streamEnded) { break; } } if (streamEnded) { break; } } } catch (err) { if (clientClosed || isAbortError(err)) { await finish('stop'); return { terminal: true, retryable: false }; } await finish('stop'); return { terminal: true, retryable: false }; } if (shouldAutoContinue(continueState) && continueRounds < AUTO_CONTINUE_MAX_ROUNDS) { continueRounds += 1; const nextRes = await fetchContinue(continueState.responseMessageID); if (nextRes === null) { return { terminal: true, retryable: false }; } if (!nextRes.ok || !nextRes.body) { return { terminal: await finish('stop'), retryable: false }; } continueState = prepareContinueStateForNextRound(continueState); currentResponse = nextRes; continue; } break; } const terminal = await finish('stop', { deferEmpty: allowDeferEmpty }); return { terminal, retryable: !terminal && allowDeferEmpty, responseMessageID: continueState.responseMessageID }; }; let retryAttempts = 0; // eslint-disable-next-line no-constant-condition while (true) { const processed = await processStream(completionRes, retryAttempts < EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS); if (processed.terminal) { return; } if (!processed.retryable || retryAttempts >= EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS) { await finish('stop'); return; } retryAttempts += 1; console.info('[openai_empty_retry] attempting synthetic retry', { surface: 'chat.completions', stream: true, retry_attempt: retryAttempts, parent_message_id: processed.responseMessageID || 0, }); usagePrompt = usagePromptWithEmptyOutputRetry(finalPrompt, retryAttempts); const retryPowHeader = await refreshPowHeader('retry'); if (!retryPowHeader) { return; } completionRes = await fetchDeepSeekStream( DEEPSEEK_COMPLETION_URL, clonePayloadForEmptyOutputRetry(completionPayload, processed.responseMessageID), retryPowHeader, ); if (completionRes === null) { return; } if (!completionRes.ok || !completionRes.body) { await finish('stop'); return; } } } finally { req.removeListener('aborted', onReqAborted); res.removeListener('close', onResClose); await releaseLease(); } } function toBool(v) { return v === true; } function clonePayloadForEmptyOutputRetry(payload, parentMessageID) { const clone = { ...(payload || {}), prompt: appendEmptyOutputRetrySuffix(asString(payload && payload.prompt)), }; if (parentMessageID && parentMessageID > 0) { clone.parent_message_id = parentMessageID; } return clone; } function appendEmptyOutputRetrySuffix(prompt) { const base = asString(prompt).trimEnd(); if (!base) { return EMPTY_OUTPUT_RETRY_SUFFIX; } return `${base}\n\n${EMPTY_OUTPUT_RETRY_SUFFIX}`; } function usagePromptWithEmptyOutputRetry(originalPrompt, attempts) { if (!attempts || attempts <= 0) { return originalPrompt; } const parts = [originalPrompt]; let next = originalPrompt; for (let i = 0; i < attempts; i += 1) { next = appendEmptyOutputRetrySuffix(next); parts.push(next); } return parts.join('\n'); } function createContinueState(sessionID) { return { sessionID: asString(sessionID), responseMessageID: 0, lastStatus: '', finished: false, }; } function prepareContinueStateForNextRound(state) { return { ...state, lastStatus: '', finished: false, }; } function observeContinueState(state, chunk) { if (!state || !chunk || typeof chunk !== 'object') { return; } const topID = numberValue(chunk.response_message_id); if (topID > 0) { state.responseMessageID = topID; } observeContinueDirectPatch(state, chunk.p, chunk.v); if (chunk.p === 'response') { observeContinueBatchPatches(state, 'response', chunk.v); } else { observeContinueBatchPatches(state, '', chunk.v); } const response = chunk.v && typeof chunk.v === 'object' ? chunk.v.response : null; observeContinueResponseObject(state, response); const messageResponse = chunk.message && typeof chunk.message === 'object' && chunk.message.response; observeContinueResponseObject(state, messageResponse); } function observeContinueDirectPatch(state, path, value) { if (!state) { return; } switch (asString(path).trim().replace(/^\/+|\/+$/g, '')) { case 'response/status': case 'status': case 'response/quasi_status': case 'quasi_status': setContinueStatus(state, asString(value)); break; case 'response/auto_continue': case 'auto_continue': if (value === true) { state.lastStatus = 'AUTO_CONTINUE'; } break; default: break; } } function observeContinueResponseObject(state, response) { if (!state || !response || typeof response !== 'object') { return; } const id = numberValue(response.message_id); if (id > 0) { state.responseMessageID = id; } setContinueStatus(state, asString(response.status)); if (response.auto_continue === true) { state.lastStatus = 'AUTO_CONTINUE'; } } function observeContinueBatchPatches(state, parentPath, raw) { if (!state || !Array.isArray(raw)) { return; } for (const patch of raw) { if (!patch || typeof patch !== 'object') { continue; } const path = asString(patch.p).trim(); if (!path) { continue; } let fullPath = path; const parent = asString(parentPath).trim().replace(/^\/+|\/+$/g, ''); if (parent && !path.includes('/')) { fullPath = `${parent}/${path}`; } switch (fullPath.replace(/^\/+|\/+$/g, '')) { case 'response/status': case 'status': case 'response/quasi_status': case 'quasi_status': setContinueStatus(state, asString(patch.v)); break; case 'response/auto_continue': case 'auto_continue': if (patch.v === true) { state.lastStatus = 'AUTO_CONTINUE'; } break; default: break; } } } function setContinueStatus(state, status) { const normalized = asString(status).trim(); if (!normalized) { return; } state.lastStatus = normalized; if (['FINISHED', 'CONTENT_FILTER'].includes(normalized.toUpperCase())) { state.finished = true; } } function shouldAutoContinue(state) { if (!state || state.finished || !state.sessionID || state.responseMessageID <= 0) { return false; } return ['INCOMPLETE', 'AUTO_CONTINUE'].includes(asString(state.lastStatus).trim().toUpperCase()); } function numberValue(v) { if (typeof v === 'number' && Number.isFinite(v)) { return Math.trunc(v); } const parsed = Number.parseInt(asString(v), 10); return Number.isFinite(parsed) ? parsed : 0; } function upstreamEmptyOutputDetail(contentFilter, _text, thinking) { if (contentFilter) { return { status: 400, message: 'Upstream content filtered the response and returned no output.', code: 'content_filter', }; } if (thinking !== '') { return { status: 429, message: 'Upstream account hit a rate limit and returned reasoning without visible output.', code: 'upstream_empty_output', }; } return { status: 429, message: 'Upstream account hit a rate limit and returned empty output.', code: 'upstream_empty_output', }; } function sendFailedChunk(res, status, message, code) { res.write(`data: ${JSON.stringify({ status_code: status, error: { message, type: openAIErrorType(status), code, param: null, }, })}\n\n`); if (!res.writableEnded && !res.destroyed) { res.write('data: [DONE]\n\n'); } if (typeof res.flush === 'function') { res.flush(); } } module.exports = { handleVercelStream, };