align vercel stream finalization with go

This commit is contained in:
CJACK
2026-04-26 08:29:23 +08:00
parent 7bff2c1bab
commit 40c61949e8
4 changed files with 207 additions and 3 deletions

View File

@@ -301,6 +301,7 @@ Vercel Go Runtime applies platform-level response buffering, so this project use
- `api/chat-stream.js` falls back to Go entry (`?__go=1`) for non-stream requests only
- Streaming requests (including requests with `tools`) stay on the Node path and use Go-aligned tool-call anti-leak handling
- The Node stream path also mirrors Go finalization semantics: empty visible output returns the same shaped error SSE, and empty `content_filter` returns a `content_filter` error
- WebUI non-stream test calls `?__go=1` directly to avoid Node hop timeout on long requests
#### Function Duration

View File

@@ -311,6 +311,7 @@ api/index.go api/chat-stream.js
- `api/chat-stream.js` 仅对非流式请求回退到 Go 入口(`?__go=1`)
- 流式请求(包括带 `tools`)走 Node 路径,并执行与 Go 对齐的 tool-call 防泄漏处理
- Node 流式路径同时对齐 Go 的终结态语义:空可见输出会返回同形状错误 SSE,空 `content_filter` 会返回 `content_filter` 错误
- WebUI 的"非流式测试"直接请求 `?__go=1`,避免 Node 中转造成长请求超时
#### 函数时长

View File

@@ -10,7 +10,7 @@ const {
formatOpenAIStreamToolCalls,
} = require('../helpers/stream-tool-sieve');
const { BASE_HEADERS } = require('../shared/deepseek-constants');
const { writeOpenAIError } = require('./error_shape');
const { writeOpenAIError, openAIErrorType } = require('./error_shape');
const { parseChunkForContent, isCitation } = require('./sse_parse');
const { buildUsage } = require('./token_usage');
const {
@@ -129,6 +129,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
const toolSieveEnabled = toolPolicy.toolSieveEnabled;
const toolSieveState = createToolSieveState();
let toolCallsEmitted = false;
let toolCallsDoneEmitted = false;
const streamToolCallIDs = new Map();
const streamToolNames = new Map();
const decoder = new TextDecoder();
@@ -153,14 +154,16 @@ async function handleVercelStream(req, res, rawBody, payload) {
return;
}
const detected = parseStandaloneToolCalls(outputText, toolNames);
if (detected.length > 0 && !toolCallsEmitted) {
if (detected.length > 0 && !toolCallsDoneEmitted) {
toolCallsEmitted = true;
toolCallsDoneEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected, streamToolCallIDs) });
} else if (toolSieveEnabled) {
const tailEvents = flushToolSieve(toolSieveState, toolNames);
for (const evt of tailEvents) {
if (evt.type === 'tool_calls' && Array.isArray(evt.calls) && evt.calls.length > 0) {
toolCallsEmitted = true;
toolCallsDoneEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
resetStreamToolCallState(streamToolCallIDs, streamToolNames);
continue;
@@ -173,6 +176,15 @@ async function handleVercelStream(req, res, rawBody, payload) {
if (detected.length > 0 || toolCallsEmitted) {
reason = 'tool_calls';
}
if (detected.length === 0 && !toolCallsEmitted && outputText.trim() === '') {
const detail = upstreamEmptyOutputDetail(reason === 'content_filter', outputText, thinkingText);
sendFailedChunk(res, detail.status, detail.message, detail.code);
await releaseLease();
if (!res.writableEnded && !res.destroyed) {
res.end();
}
return;
}
sendFrame({
id: sessionID,
object: 'chat.completion.chunk',
@@ -234,7 +246,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
return;
}
if (parsed.contentFilter) {
await finish('stop');
await finish(outputText.trim() === '' ? 'content_filter' : 'stop');
return;
}
if (parsed.finished) {
@@ -284,6 +296,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
}
if (evt.type === 'tool_calls') {
toolCallsEmitted = true;
toolCallsDoneEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
resetStreamToolCallState(streamToolCallIDs, streamToolNames);
continue;
@@ -315,6 +328,46 @@ function toBool(v) {
return v === true;
}
/**
 * Build the Go-parity error detail for an upstream turn that finished with
 * no visible output.
 *
 * @param {boolean} contentFilter - true when upstream flagged a content filter.
 * @param {string} _text - visible output text (unused; kept for call-site parity).
 * @param {string} thinking - accumulated reasoning text, may be empty.
 * @returns {{status: number, message: string, code: string}} error detail.
 */
function upstreamEmptyOutputDetail(contentFilter, _text, thinking) {
  if (contentFilter) {
    return {
      status: 400,
      message: 'Upstream content filtered the response and returned no output.',
      code: 'content_filter',
    };
  }
  // Both rate-limit shapes share status/code; only the message distinguishes
  // "reasoning but no visible text" from "nothing at all".
  const message = thinking !== ''
    ? 'Upstream account hit a rate limit and returned reasoning without visible output.'
    : 'Upstream account hit a rate limit and returned empty output.';
  return {
    status: 429,
    message,
    code: 'upstream_empty_output',
  };
}
/**
 * Write an OpenAI-shaped failure frame to the SSE stream, then the terminal
 * `[DONE]` marker, flushing when the response object supports it.
 *
 * @param {object} res - active SSE response (Node ServerResponse-like).
 * @param {number} status - HTTP-style status carried inside the frame.
 * @param {string} message - human-readable error message.
 * @param {string} code - machine-readable error code.
 */
function sendFailedChunk(res, status, message, code) {
  const frame = {
    status_code: status,
    error: {
      message,
      type: openAIErrorType(status),
      code,
      param: null,
    },
  };
  res.write(`data: ${JSON.stringify(frame)}\n\n`);
  const stillWritable = !res.writableEnded && !res.destroyed;
  if (stillWritable) {
    res.write('data: [DONE]\n\n');
  }
  if (typeof res.flush === 'function') {
    res.flush();
  }
}
// Public surface of the Vercel stream handler module.
module.exports = {
  handleVercelStream,
};

View File

@@ -2,8 +2,10 @@
const test = require('node:test');
const assert = require('node:assert/strict');
const { EventEmitter } = require('node:events');
const handler = require('../../api/chat-stream.js');
const { handleVercelStream } = require('../../internal/js/chat-stream/vercel_stream.js');
const {
createToolSieveState,
processToolSieveChunk,
@@ -41,11 +43,158 @@ function createMockResponse() {
};
}
/**
 * Minimal IncomingMessage stand-in for driving handleVercelStream in tests.
 * Carries only the fields the handler reads: a request URL and a headers bag.
 */
class MockStreamRequest extends EventEmitter {
  constructor() {
    super();
    this.url = '/v1/chat/completions';
    this.headers = {
      host: 'example.test',
      'content-type': 'application/json',
    };
  }
}
/**
 * Minimal ServerResponse stand-in that records headers and written chunks so
 * tests can inspect the SSE body that handleVercelStream produced.
 */
class MockStreamResponse extends EventEmitter {
  constructor() {
    super();
    this.headers = new Map();
    this.statusCode = 0;
    this.chunks = [];
    this.writableEnded = false;
    this.destroyed = false;
  }
  // Header names are normalized to lowercase, mirroring Node's behavior.
  setHeader(key, value) {
    this.headers.set(String(key).toLowerCase(), value);
  }
  getHeader(key) {
    return this.headers.get(String(key).toLowerCase());
  }
  // Captures every chunk as a UTF-8 string; always reports a drained stream.
  write(chunk) {
    const text = Buffer.isBuffer(chunk) ? chunk.toString('utf8') : String(chunk);
    this.chunks.push(text);
    return true;
  }
  end(chunk) {
    if (chunk) {
      this.write(chunk);
    }
    this.writableEnded = true;
  }
  flushHeaders() {}
  flush() {}
  // Full response body as a single string.
  bodyText() {
    return this.chunks.join('');
  }
}
/**
 * Wrap a plain object as a JSON fetch Response for mocked endpoints.
 *
 * @param {object} body - payload to serialize.
 * @param {number} [status=200] - HTTP status code.
 * @returns {Response} JSON-bodied response.
 */
function jsonResponse(body, status = 200) {
  const init = {
    status,
    headers: { 'content-type': 'application/json' },
  };
  return new Response(JSON.stringify(body), init);
}
/**
 * Build a streaming text/event-stream Response that replays the given SSE
 * frames in order and then closes, mimicking the upstream completion stream.
 *
 * @param {string[]} lines - raw SSE frames to enqueue.
 * @returns {Response} streaming response.
 */
function sseResponse(lines) {
  const encoder = new TextEncoder();
  const stream = new ReadableStream({
    start(controller) {
      lines.forEach((line) => controller.enqueue(encoder.encode(line)));
      controller.close();
    },
  });
  return new Response(stream, {
    status: 200,
    headers: { 'content-type': 'text/event-stream' },
  });
}
/**
 * Extract the payload of every `data:` frame from a raw SSE body.
 *
 * @param {string} body - concatenated SSE text.
 * @returns {string[]} trimmed frame payloads, `data:` prefix removed.
 */
function parseSSEDataFrames(body) {
  const payloads = [];
  for (const raw of body.split('\n\n')) {
    const frame = raw.trim();
    if (frame.startsWith('data:')) {
      payloads.push(frame.slice(5).trim());
    }
  }
  return payloads;
}
/**
 * Drive handleVercelStream end to end against a mocked global fetch: the
 * prepare/release hops return JSON stubs and the completion call replays the
 * supplied SSE frames. The original global fetch is always restored.
 *
 * @param {string[]} upstreamLines - SSE frames the mocked upstream yields.
 * @param {object} [prepareOverrides] - fields merged over the prepare body.
 * @returns {Promise<{res: MockStreamResponse, frames: string[], fetchURLs: string[]}>}
 */
async function runMockVercelStream(upstreamLines, prepareOverrides = {}) {
  const savedFetch = global.fetch;
  const fetchURLs = [];
  const prepareBody = {
    session_id: 'chatcmpl-test',
    lease_id: 'lease-test',
    model: 'gpt-test',
    final_prompt: 'hello',
    thinking_enabled: false,
    search_enabled: false,
    compat: { strip_reference_markers: true },
    tool_names: [],
    deepseek_token: 'deepseek-token',
    pow_header: 'pow-header',
    payload: { prompt: 'hello' },
    ...prepareOverrides,
  };
  // Route the three fetch hops by URL marker; everything else is upstream SSE.
  global.fetch = async (url) => {
    const target = String(url);
    fetchURLs.push(target);
    if (target.includes('__stream_prepare=1')) {
      return jsonResponse(prepareBody);
    }
    if (target.includes('__stream_release=1')) {
      return jsonResponse({ success: true });
    }
    return sseResponse(upstreamLines);
  };
  try {
    const payload = { model: 'gpt-test', stream: true };
    const rawBody = Buffer.from(JSON.stringify(payload));
    const req = new MockStreamRequest();
    const res = new MockStreamResponse();
    await handleVercelStream(req, res, rawBody, payload);
    return { res, frames: parseSSEDataFrames(res.bodyText()), fetchURLs };
  } finally {
    global.fetch = savedFetch;
  }
}
// Smoke check: white-box parser/policy hooks are exported as functions.
test('chat-stream exposes parser test hooks', () => {
  for (const hook of [parseChunkForContent, resolveToolcallPolicy]) {
    assert.equal(typeof hook, 'function');
  }
});
// An upstream that only sends [DONE] yields the Go-parity 429 failure frame.
test('vercel stream emits Go-parity empty-output failure on DONE', async () => {
  const { frames } = await runMockVercelStream(['data: [DONE]\n\n']);
  assert.equal(frames.length, 2);
  const [first, last] = frames;
  const failed = JSON.parse(first);
  assert.equal(failed.status_code, 429);
  assert.equal(failed.error.type, 'rate_limit_error');
  assert.equal(failed.error.code, 'upstream_empty_output');
  assert.equal(last, '[DONE]');
});
// A content_filter signal with no visible text yields a 400 content_filter frame.
test('vercel stream emits content_filter failure when upstream filters empty output', async () => {
  const { frames } = await runMockVercelStream(['data: {"code":"content_filter"}\n\n']);
  assert.equal(frames.length, 2);
  const failure = JSON.parse(frames[0]);
  assert.equal(failure.status_code, 400);
  assert.equal(failure.error.type, 'invalid_request_error');
  assert.equal(failure.error.code, 'content_filter');
  assert.equal(frames[1], '[DONE]');
});
// content_filter after visible text keeps the normal "stop" finish reason.
test('vercel stream keeps stop finish when content_filter arrives after visible text', async () => {
  const upstream = [
    'data: {"p":"response/content","v":"hello"}\n\n',
    'data: {"code":"content_filter"}\n\n',
  ];
  const { frames } = await runMockVercelStream(upstream);
  const chunks = [];
  for (const frame of frames) {
    if (frame !== '[DONE]') {
      chunks.push(JSON.parse(frame));
    }
  }
  assert.equal(chunks[0].choices[0].delta.content, 'hello');
  assert.equal(chunks[1].choices[0].finish_reason, 'stop');
  assert.equal(chunks[1].usage.completion_tokens, 1);
});
test('resolveToolcallPolicy defaults to feature-match + early emit when prepare flags missing', () => {
const policy = resolveToolcallPolicy(
{},