diff --git a/docs/DEPLOY.en.md b/docs/DEPLOY.en.md
index acb43eb..de52b4c 100644
--- a/docs/DEPLOY.en.md
+++ b/docs/DEPLOY.en.md
@@ -301,6 +301,7 @@ Vercel Go Runtime applies platform-level response buffering, so this project use
 
 - `api/chat-stream.js` falls back to Go entry (`?__go=1`) for non-stream requests only
 - Streaming requests (including requests with `tools`) stay on the Node path and use Go-aligned tool-call anti-leak handling
+- The Node stream path also mirrors Go finalization semantics: empty visible output returns the same shaped error SSE, and empty `content_filter` returns a `content_filter` error
 - WebUI non-stream test calls `?__go=1` directly to avoid Node hop timeout on long requests
 
 #### Function Duration
diff --git a/docs/DEPLOY.md b/docs/DEPLOY.md
index f726df7..7509cb3 100644
--- a/docs/DEPLOY.md
+++ b/docs/DEPLOY.md
@@ -311,6 +311,7 @@ api/index.go api/chat-stream.js
 
 - `api/chat-stream.js` 仅对非流式请求回退到 Go 入口(`?__go=1`)
 - 流式请求(包括带 `tools`)走 Node 路径,并执行与 Go 对齐的 tool-call 防泄漏处理
+- Node 流式路径同时对齐 Go 的终结态语义:空可见输出会返回同形状错误 SSE,空 `content_filter` 会返回 `content_filter` 错误
 - WebUI 的"非流式测试"直接请求 `?__go=1`,避免 Node 中转造成长请求超时
 
 #### 函数时长
diff --git a/internal/js/chat-stream/vercel_stream_impl.js b/internal/js/chat-stream/vercel_stream_impl.js
index b28ecb0..553af69 100644
--- a/internal/js/chat-stream/vercel_stream_impl.js
+++ b/internal/js/chat-stream/vercel_stream_impl.js
@@ -10,7 +10,7 @@ const {
   formatOpenAIStreamToolCalls,
 } = require('../helpers/stream-tool-sieve');
 const { BASE_HEADERS } = require('../shared/deepseek-constants');
-const { writeOpenAIError } = require('./error_shape');
+const { writeOpenAIError, openAIErrorType } = require('./error_shape');
 const { parseChunkForContent, isCitation } = require('./sse_parse');
 const { buildUsage } = require('./token_usage');
 const {
@@ -129,6 +129,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
   const toolSieveEnabled = toolPolicy.toolSieveEnabled;
   const toolSieveState = createToolSieveState();
   let toolCallsEmitted = false;
+  let toolCallsDoneEmitted = false;
   const streamToolCallIDs = new Map();
   const streamToolNames = new Map();
   const decoder = new TextDecoder();
@@ -153,14 +154,16 @@ async function handleVercelStream(req, res, rawBody, payload) {
       return;
     }
     const detected = parseStandaloneToolCalls(outputText, toolNames);
-    if (detected.length > 0 && !toolCallsEmitted) {
+    if (detected.length > 0 && !toolCallsDoneEmitted) {
       toolCallsEmitted = true;
+      toolCallsDoneEmitted = true;
       sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected, streamToolCallIDs) });
     } else if (toolSieveEnabled) {
       const tailEvents = flushToolSieve(toolSieveState, toolNames);
       for (const evt of tailEvents) {
         if (evt.type === 'tool_calls' && Array.isArray(evt.calls) && evt.calls.length > 0) {
           toolCallsEmitted = true;
+          toolCallsDoneEmitted = true;
           sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
           resetStreamToolCallState(streamToolCallIDs, streamToolNames);
           continue;
@@ -173,6 +176,15 @@ async function handleVercelStream(req, res, rawBody, payload) {
     if (detected.length > 0 || toolCallsEmitted) {
       reason = 'tool_calls';
     }
+    if (detected.length === 0 && !toolCallsEmitted && outputText.trim() === '') {
+      const detail = upstreamEmptyOutputDetail(reason === 'content_filter', outputText, thinkingText);
+      sendFailedChunk(res, detail.status, detail.message, detail.code);
+      await releaseLease();
+      if (!res.writableEnded && !res.destroyed) {
+        res.end();
+      }
+      return;
+    }
     sendFrame({
       id: sessionID,
       object: 'chat.completion.chunk',
@@ -234,7 +246,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
       return;
     }
     if (parsed.contentFilter) {
-      await finish('stop');
+      await finish(outputText.trim() === '' ? 'content_filter' : 'stop');
       return;
     }
     if (parsed.finished) {
@@ -284,6 +296,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
     }
     if (evt.type === 'tool_calls') {
       toolCallsEmitted = true;
+      toolCallsDoneEmitted = true;
       sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
       resetStreamToolCallState(streamToolCallIDs, streamToolNames);
       continue;
@@ -315,6 +328,46 @@ function toBool(v) {
   return v === true;
 }
 
+function upstreamEmptyOutputDetail(contentFilter, _text, thinking) {
+  if (contentFilter) {
+    return {
+      status: 400,
+      message: 'Upstream content filtered the response and returned no output.',
+      code: 'content_filter',
+    };
+  }
+  if (thinking !== '') {
+    return {
+      status: 429,
+      message: 'Upstream account hit a rate limit and returned reasoning without visible output.',
+      code: 'upstream_empty_output',
+    };
+  }
+  return {
+    status: 429,
+    message: 'Upstream account hit a rate limit and returned empty output.',
+    code: 'upstream_empty_output',
+  };
+}
+
+function sendFailedChunk(res, status, message, code) {
+  res.write(`data: ${JSON.stringify({
+    status_code: status,
+    error: {
+      message,
+      type: openAIErrorType(status),
+      code,
+      param: null,
+    },
+  })}\n\n`);
+  if (!res.writableEnded && !res.destroyed) {
+    res.write('data: [DONE]\n\n');
+  }
+  if (typeof res.flush === 'function') {
+    res.flush();
+  }
+}
+
 module.exports = {
   handleVercelStream,
 };
diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js
index 167c5c7..50e94ee 100644
--- a/tests/node/chat-stream.test.js
+++ b/tests/node/chat-stream.test.js
@@ -2,8 +2,10 @@
 
 const test = require('node:test');
 const assert = require('node:assert/strict');
+const { EventEmitter } = require('node:events');
 
 const handler = require('../../api/chat-stream.js');
+const { handleVercelStream } = require('../../internal/js/chat-stream/vercel_stream.js');
 const {
   createToolSieveState,
   processToolSieveChunk,
@@ -41,11 +43,158 @@ function createMockResponse() {
   };
 }
 
+class MockStreamRequest extends EventEmitter {
+  constructor() {
+    super();
+    this.url = '/v1/chat/completions';
+    this.headers = { host: 'example.test', 'content-type': 'application/json' };
+  }
+}
+
+class MockStreamResponse extends EventEmitter {
+  constructor() {
+    super();
+    this.headers = new Map();
+    this.statusCode = 0;
+    this.chunks = [];
+    this.writableEnded = false;
+    this.destroyed = false;
+  }
+
+  setHeader(key, value) {
+    this.headers.set(String(key).toLowerCase(), value);
+  }
+
+  getHeader(key) {
+    return this.headers.get(String(key).toLowerCase());
+  }
+
+  write(chunk) {
+    this.chunks.push(Buffer.isBuffer(chunk) ? chunk.toString('utf8') : String(chunk));
+    return true;
+  }
+
+  end(chunk) {
+    if (chunk) {
+      this.write(chunk);
+    }
+    this.writableEnded = true;
+  }
+
+  flushHeaders() {}
+
+  flush() {}
+
+  bodyText() {
+    return this.chunks.join('');
+  }
+}
+
+function jsonResponse(body, status = 200) {
+  return new Response(JSON.stringify(body), {
+    status,
+    headers: { 'content-type': 'application/json' },
+  });
+}
+
+function sseResponse(lines) {
+  const encoder = new TextEncoder();
+  return new Response(new ReadableStream({
+    start(controller) {
+      for (const line of lines) {
+        controller.enqueue(encoder.encode(line));
+      }
+      controller.close();
+    },
+  }), {
+    status: 200,
+    headers: { 'content-type': 'text/event-stream' },
+  });
+}
+
+function parseSSEDataFrames(body) {
+  return body
+    .split('\n\n')
+    .map((frame) => frame.trim())
+    .filter((frame) => frame.startsWith('data:'))
+    .map((frame) => frame.slice(5).trim());
+}
+
+async function runMockVercelStream(upstreamLines, prepareOverrides = {}) {
+  const originalFetch = global.fetch;
+  const fetchURLs = [];
+  const prepareBody = {
+    session_id: 'chatcmpl-test',
+    lease_id: 'lease-test',
+    model: 'gpt-test',
+    final_prompt: 'hello',
+    thinking_enabled: false,
+    search_enabled: false,
+    compat: { strip_reference_markers: true },
+    tool_names: [],
+    deepseek_token: 'deepseek-token',
+    pow_header: 'pow-header',
+    payload: { prompt: 'hello' },
+    ...prepareOverrides,
+  };
+  global.fetch = async (url) => {
+    const textURL = String(url);
+    fetchURLs.push(textURL);
+    if (textURL.includes('__stream_prepare=1')) {
+      return jsonResponse(prepareBody);
+    }
+    if (textURL.includes('__stream_release=1')) {
+      return jsonResponse({ success: true });
+    }
+    return sseResponse(upstreamLines);
+  };
+  try {
+    const req = new MockStreamRequest();
+    const res = new MockStreamResponse();
+    const payload = { model: 'gpt-test', stream: true };
+    await handleVercelStream(req, res, Buffer.from(JSON.stringify(payload)), payload);
+    return { res, frames: parseSSEDataFrames(res.bodyText()), fetchURLs };
+  } finally {
+    global.fetch = originalFetch;
+  }
+}
+
 test('chat-stream exposes parser test hooks', () => {
   assert.equal(typeof parseChunkForContent, 'function');
   assert.equal(typeof resolveToolcallPolicy, 'function');
 });
 
+test('vercel stream emits Go-parity empty-output failure on DONE', async () => {
+  const { frames } = await runMockVercelStream(['data: [DONE]\n\n']);
+  assert.equal(frames.length, 2);
+  const failed = JSON.parse(frames[0]);
+  assert.equal(failed.status_code, 429);
+  assert.equal(failed.error.type, 'rate_limit_error');
+  assert.equal(failed.error.code, 'upstream_empty_output');
+  assert.equal(frames[1], '[DONE]');
+});
+
+test('vercel stream emits content_filter failure when upstream filters empty output', async () => {
+  const { frames } = await runMockVercelStream(['data: {"code":"content_filter"}\n\n']);
+  assert.equal(frames.length, 2);
+  const failed = JSON.parse(frames[0]);
+  assert.equal(failed.status_code, 400);
+  assert.equal(failed.error.type, 'invalid_request_error');
+  assert.equal(failed.error.code, 'content_filter');
+  assert.equal(frames[1], '[DONE]');
+});
+
+test('vercel stream keeps stop finish when content_filter arrives after visible text', async () => {
+  const { frames } = await runMockVercelStream([
+    'data: {"p":"response/content","v":"hello"}\n\n',
+    'data: {"code":"content_filter"}\n\n',
+  ]);
+  const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
+  assert.equal(parsed[0].choices[0].delta.content, 'hello');
+  assert.equal(parsed[1].choices[0].finish_reason, 'stop');
+  assert.equal(parsed[1].usage.completion_tokens, 1);
+});
+
 test('resolveToolcallPolicy defaults to feature-match + early emit when prepare flags missing', () => {
   const policy = resolveToolcallPolicy(
     {},
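Reviewer note (illustrative, not part of the patch): a minimal sketch of how an OpenAI-compatible SSE client might surface the failure frame that `sendFailedChunk` writes in the Go-parity empty-output case. The frame fields (`status_code` plus an error object with `message`/`type`/`code`/`param`) and the trailing `data: [DONE]` come from the patch; the 429 → `rate_limit_error` mapping is assumed from the new test assertions on `openAIErrorType`, and the `readErrorFrame` helper and example wire line below are hypothetical.

// Example frames on the wire for the empty-output case (no reasoning text):
//   data: {"status_code":429,"error":{"message":"Upstream account hit a rate limit and returned empty output.","type":"rate_limit_error","code":"upstream_empty_output","param":null}}
//   data: [DONE]

// Hypothetical client-side helper: classify one parsed SSE "data:" payload.
function readErrorFrame(frameText) {
  if (frameText === '[DONE]') return { done: true };
  const frame = JSON.parse(frameText);
  if (frame.status_code && frame.error) {
    // Shape written by sendFailedChunk above.
    return {
      failed: true,
      status: frame.status_code,   // 429 or 400
      code: frame.error.code,      // 'upstream_empty_output' or 'content_filter'
      message: frame.error.message,
    };
  }
  return { chunk: frame };         // normal chat.completion.chunk frame
}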