diff --git a/internal/compat/go_compat_test.go b/internal/compat/go_compat_test.go
index 7768e4b..5358821 100644
--- a/internal/compat/go_compat_test.go
+++ b/internal/compat/go_compat_test.go
@@ -32,23 +32,36 @@ func TestGoCompatSSEFixtures(t *testing.T) {
 		mustLoadJSON(t, fixturePath, &fixture)
 
 		var expected struct {
-			Parts    []map[string]any `json:"parts"`
-			Finished bool             `json:"finished"`
-			NewType  string           `json:"new_type"`
+			Parts         []map[string]any `json:"parts"`
+			Finished      bool             `json:"finished"`
+			NewType       string           `json:"new_type"`
+			ContentFilter bool             `json:"content_filter"`
+			OutputTokens  int              `json:"output_tokens"`
+			ErrorMessage  string           `json:"error_message"`
 		}
 		mustLoadJSON(t, expectedPath, &expected)
 
-		parts, finished, newType := sse.ParseSSEChunkForContent(fixture.Chunk, fixture.ThinkingEnable, fixture.CurrentType)
-		gotParts := make([]map[string]any, 0, len(parts))
-		for _, p := range parts {
+		raw, err := json.Marshal(fixture.Chunk)
+		if err != nil {
+			t.Fatalf("marshal fixture %s failed: %v", name, err)
+		}
+		res := sse.ParseDeepSeekContentLine(append([]byte("data: "), raw...), fixture.ThinkingEnable, fixture.CurrentType)
+		gotParts := make([]map[string]any, 0, len(res.Parts))
+		for _, p := range res.Parts {
 			gotParts = append(gotParts, map[string]any{
 				"text": p.Text,
 				"type": p.Type,
 			})
 		}
-		if !reflect.DeepEqual(gotParts, expected.Parts) || finished != expected.Finished || newType != expected.NewType {
-			t.Fatalf("fixture %s mismatch:\n got parts=%#v finished=%v newType=%q\nwant parts=%#v finished=%v newType=%q",
-				name, gotParts, finished, newType, expected.Parts, expected.Finished, expected.NewType)
+		if !reflect.DeepEqual(gotParts, expected.Parts) ||
+			res.Stop != expected.Finished ||
+			res.NextType != expected.NewType ||
+			res.ContentFilter != expected.ContentFilter ||
+			res.OutputTokens != expected.OutputTokens ||
+			res.ErrorMessage != expected.ErrorMessage {
+			t.Fatalf("fixture %s mismatch:\n got parts=%#v finished=%v newType=%q contentFilter=%v outputTokens=%d errorMessage=%q\nwant parts=%#v finished=%v newType=%q contentFilter=%v outputTokens=%d errorMessage=%q",
+				name, gotParts, res.Stop, res.NextType, res.ContentFilter, res.OutputTokens, res.ErrorMessage,
+				expected.Parts, expected.Finished, expected.NewType, expected.ContentFilter, expected.OutputTokens, expected.ErrorMessage)
 		}
 	}
 }
diff --git a/internal/js/chat-stream/index.js b/internal/js/chat-stream/index.js
index 0bab665..bf9566d 100644
--- a/internal/js/chat-stream/index.js
+++ b/internal/js/chat-stream/index.js
@@ -6,6 +6,9 @@ const {
 const {
   parseChunkForContent,
   extractContentRecursive,
+  filterLeakedContentFilterParts,
+  hasContentFilterStatus,
+  extractAccumulatedTokenUsage,
   shouldSkipPath,
   stripReferenceMarkers,
 } = require('./sse_parse');
@@ -18,6 +21,7 @@ const {
 } = require('./toolcall_policy');
 const {
   estimateTokens,
+  buildUsage,
 } = require('./token_usage');
 const {
   setCorsHeaders,
@@ -109,6 +113,10 @@ module.exports.__test = {
   boolDefaultTrue,
   filterIncrementalToolCallDeltasByAllowed,
   estimateTokens,
+  buildUsage,
+  filterLeakedContentFilterParts,
+  hasContentFilterStatus,
+  extractAccumulatedTokenUsage,
   isNodeStreamSupportedPath,
   extractPathname,
 };
diff --git a/internal/js/chat-stream/sse_parse.js b/internal/js/chat-stream/sse_parse.js
index 8d9220d..ec4b275 100644
--- a/internal/js/chat-stream/sse_parse.js
+++ b/internal/js/chat-stream/sse_parse.js
@@ -6,15 +6,78 @@ const {
 } = require('../shared/deepseek-constants');
 
 function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers = true) {
-  if (!chunk || typeof chunk !== 'object' || !Object.prototype.hasOwnProperty.call(chunk, 'v')) {
-    return { parts: [], finished: false, newType: currentType };
+  if (!chunk || typeof chunk !== 'object') {
+    return {
+      parsed: false,
+      parts: [],
+      finished: false,
+      contentFilter: false,
+      errorMessage: '',
+      outputTokens: 0,
+      newType: currentType,
+    };
   }
+
+  if (Object.prototype.hasOwnProperty.call(chunk, 'error')) {
+    return {
+      parsed: true,
+      parts: [],
+      finished: true,
+      contentFilter: false,
+      errorMessage: formatErrorMessage(chunk.error),
+      outputTokens: 0,
+      newType: currentType,
+    };
+  }
+
   const pathValue = asString(chunk.p);
+  const outputTokens = extractAccumulatedTokenUsage(chunk);
+
+  if (hasContentFilterStatus(chunk)) {
+    return {
+      parsed: true,
+      parts: [],
+      finished: true,
+      contentFilter: true,
+      errorMessage: '',
+      outputTokens,
+      newType: currentType,
+    };
+  }
+
   if (shouldSkipPath(pathValue)) {
-    return { parts: [], finished: false, newType: currentType };
+    return {
+      parsed: true,
+      parts: [],
+      finished: false,
+      contentFilter: false,
+      errorMessage: '',
+      outputTokens,
+      newType: currentType,
+    };
   }
 
   if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') {
-    return { parts: [], finished: true, newType: currentType };
+    return {
+      parsed: true,
+      parts: [],
+      finished: true,
+      contentFilter: false,
+      errorMessage: '',
+      outputTokens,
+      newType: currentType,
+    };
+  }
+
+  if (!Object.prototype.hasOwnProperty.call(chunk, 'v')) {
+    return {
+      parsed: true,
+      parts: [],
+      finished: false,
+      contentFilter: false,
+      errorMessage: '',
+      outputTokens,
+      newType: currentType,
+    };
   }
   let newType = currentType;
@@ -74,22 +137,54 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc
   const val = chunk.v;
   if (typeof val === 'string') {
     if (val === 'FINISHED' && (!pathValue || pathValue === 'status')) {
-      return { parts: [], finished: true, newType };
+      return {
+        parsed: true,
+        parts: [],
+        finished: true,
+        contentFilter: false,
+        errorMessage: '',
+        outputTokens,
+        newType,
+      };
     }
     const content = asContentString(val, stripReferenceMarkers);
     if (content) {
       parts.push({ text: content, type: partType });
     }
-    return { parts, finished: false, newType };
+    return {
+      parsed: true,
+      parts: filterLeakedContentFilterParts(parts),
+      finished: false,
+      contentFilter: false,
+      errorMessage: '',
+      outputTokens,
+      newType,
+    };
   }
 
   if (Array.isArray(val)) {
     const extracted = extractContentRecursive(val, partType, stripReferenceMarkers);
     if (extracted.finished) {
-      return { parts: [], finished: true, newType };
+      return {
+        parsed: true,
+        parts: [],
+        finished: true,
+        contentFilter: false,
+        errorMessage: '',
+        outputTokens,
+        newType,
+      };
     }
     parts.push(...extracted.parts);
-    return { parts, finished: false, newType };
+    return {
+      parsed: true,
+      parts: filterLeakedContentFilterParts(parts),
+      finished: false,
+      contentFilter: false,
+      errorMessage: '',
+      outputTokens,
+      newType,
+    };
   }
 
   if (val && typeof val === 'object') {
@@ -116,7 +211,15 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc
       }
     }
   }
-  return { parts, finished: false, newType };
+  return {
+    parsed: true,
+    parts: filterLeakedContentFilterParts(parts),
+    finished: false,
+    contentFilter: false,
+    errorMessage: '',
+    outputTokens,
+    newType,
+  };
 }
 
 function extractContentRecursive(items, defaultType, stripReferenceMarkers = true) {
@@ -199,6 +302,151 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru
   return { parts, finished: false };
 }
 
+function filterLeakedContentFilterParts(parts) {
+  if (!Array.isArray(parts) || parts.length === 0) {
+    return parts;
+  }
+  const out = [];
+  for (const p of parts) {
+    if (!p || typeof p !== 'object') {
+      continue;
+    }
+    const { text, stripped } = stripLeakedContentFilterSuffix(p.text);
+    if (stripped && shouldDropCleanedLeakedChunk(text)) {
+      continue;
+    }
+    if (stripped) {
+      out.push({ ...p, text });
+      continue;
+    }
+    out.push(p);
+  }
+  return out;
+}
+
+function stripLeakedContentFilterSuffix(text) {
+  if (typeof text !== 'string' || text === '') {
+    return { text, stripped: false };
+  }
+  const upperText = text.toUpperCase();
+  const idx = upperText.indexOf('CONTENT_FILTER');
+  if (idx < 0) {
+    return { text, stripped: false };
+  }
+  return {
+    text: text.slice(0, idx).replace(/[ \t\r]+$/g, ''),
+    stripped: true,
+  };
+}
+
+function shouldDropCleanedLeakedChunk(cleaned) {
+  if (cleaned === '') {
+    return true;
+  }
+  if (typeof cleaned === 'string' && cleaned.includes('\n')) {
+    return false;
+  }
+  return asString(cleaned).trim() === '';
+}
+
+function hasContentFilterStatus(chunk) {
+  if (!chunk || typeof chunk !== 'object') {
+    return false;
+  }
+  const code = asString(chunk.code);
+  if (code && code.toLowerCase() === 'content_filter') {
+    return true;
+  }
+  return hasContentFilterStatusValue(chunk);
+}
+
+function hasContentFilterStatusValue(v) {
+  if (Array.isArray(v)) {
+    for (const item of v) {
+      if (hasContentFilterStatusValue(item)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  if (!v || typeof v !== 'object') {
+    return false;
+  }
+  const pathValue = asString(v.p);
+  if (pathValue && pathValue.toLowerCase().includes('status')) {
+    if (asString(v.v).toLowerCase() === 'content_filter') {
+      return true;
+    }
+  }
+  if (asString(v.code).toLowerCase() === 'content_filter') {
+    return true;
+  }
+  for (const value of Object.values(v)) {
+    if (hasContentFilterStatusValue(value)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+function extractAccumulatedTokenUsage(chunk) {
+  return findAccumulatedTokenUsage(chunk);
+}
+
+function findAccumulatedTokenUsage(v) {
+  if (Array.isArray(v)) {
+    for (const item of v) {
+      const n = findAccumulatedTokenUsage(item);
+      if (n > 0) {
+        return n;
+      }
+    }
+    return 0;
+  }
+  if (!v || typeof v !== 'object') {
+    return 0;
+  }
+  const pathValue = asString(v.p);
+  if (pathValue && pathValue.toLowerCase().includes('accumulated_token_usage')) {
+    const n = toInt(v.v);
+    if (n > 0) {
+      return n;
+    }
+  }
+  const direct = toInt(v.accumulated_token_usage);
+  if (direct > 0) {
+    return direct;
+  }
+  for (const value of Object.values(v)) {
+    const n = findAccumulatedTokenUsage(value);
+    if (n > 0) {
+      return n;
+    }
+  }
+  return 0;
+}
+
+function toInt(v) {
+  if (typeof v !== 'number' || !Number.isFinite(v)) {
+    return 0;
+  }
+  return Math.trunc(v);
+}
+
+function formatErrorMessage(v) {
+  if (typeof v === 'string') {
+    return v;
+  }
+  if (v == null) {
+    return String(v);
+  }
+  try {
+    return JSON.stringify(v);
+  } catch (_err) {
+    return String(v);
+  }
+}
+
 function shouldSkipPath(pathValue) {
   if (isFragmentStatusPath(pathValue)) {
     return true;
@@ -275,6 +523,9 @@ function asString(v) {
 module.exports = {
   parseChunkForContent,
   extractContentRecursive,
+  filterLeakedContentFilterParts,
+  hasContentFilterStatus,
+  extractAccumulatedTokenUsage,
   shouldSkipPath,
   isFragmentStatusPath,
   isCitation,
diff --git a/internal/js/chat-stream/token_usage.js b/internal/js/chat-stream/token_usage.js
index 57a36fb..0f71c5f 100644
--- a/internal/js/chat-stream/token_usage.js
+++ b/internal/js/chat-stream/token_usage.js
@@ -1,13 +1,15 @@
 'use strict';
 
-function buildUsage(prompt, thinking, output) {
+function buildUsage(prompt, thinking, output, outputTokens = 0) {
   const promptTokens = estimateTokens(prompt);
   const reasoningTokens = estimateTokens(thinking);
   const completionTokens = estimateTokens(output);
+  const overriddenCompletionTokens = Number.isFinite(outputTokens) && outputTokens > 0 ? Math.trunc(outputTokens) : 0;
+  const finalCompletionTokens = overriddenCompletionTokens > 0 ? overriddenCompletionTokens : reasoningTokens + completionTokens;
   return {
     prompt_tokens: promptTokens,
-    completion_tokens: reasoningTokens + completionTokens,
-    total_tokens: promptTokens + reasoningTokens + completionTokens,
+    completion_tokens: finalCompletionTokens,
+    total_tokens: promptTokens + finalCompletionTokens,
     completion_tokens_details: {
       reasoning_tokens: reasoningTokens,
     },
@@ -15,7 +17,7 @@ function buildUsage(prompt, thinking, output) {
 }
 
 function estimateTokens(text) {
-  const t = asString(text);
+  const t = asTokenString(text);
   if (!t) {
     return 0;
   }
@@ -32,17 +34,17 @@ function estimateTokens(text) {
   return n < 1 ? 1 : n;
 }
 
-function asString(v) {
+function asTokenString(v) {
   if (typeof v === 'string') {
-    return v.trim();
+    return v;
   }
   if (Array.isArray(v)) {
-    return asString(v[0]);
+    return asTokenString(v[0]);
   }
   if (v == null) {
     return '';
   }
-  return String(v).trim();
+  return String(v);
 }
 
 module.exports = {
diff --git a/internal/js/chat-stream/vercel_stream.js b/internal/js/chat-stream/vercel_stream.js
index 69b6442..030e456 100644
--- a/internal/js/chat-stream/vercel_stream.js
+++ b/internal/js/chat-stream/vercel_stream.js
@@ -23,7 +23,6 @@ const {
   isAbortError,
   fetchStreamPrepare,
   relayPreparedFailure,
-  safeReadText,
   createLeaseReleaser,
 } = require('./http_internal');
 
@@ -102,8 +101,9 @@ async function handleVercelStream(req, res, rawBody, payload) {
   }
 
   if (!completionRes.ok || !completionRes.body) {
-    const detail = await safeReadText(completionRes);
-    writeOpenAIError(res, 500, detail ? `Failed to get completion: ${detail}` : 'Failed to get completion.');
+    const detail = completionRes.body ? await completionRes.text() : '';
+    const status = completionRes.ok ? 500 : completionRes.status || 500;
+    writeOpenAIError(res, status, detail);
     return;
   }
 
@@ -120,6 +120,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
   let currentType = thinkingEnabled ? 'thinking' : 'text';
   let thinkingText = '';
   let outputText = '';
+  let outputTokens = 0;
   const toolSieveEnabled = toolPolicy.toolSieveEnabled;
   const toolSieveState = createToolSieveState();
   let toolCallsEmitted = false;
@@ -172,7 +173,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
       created,
       model,
      choices: [{ delta: {}, index: 0, finish_reason: reason }],
-      usage: buildUsage(finalPrompt, thinkingText, outputText),
+      usage: buildUsage(finalPrompt, thinkingText, outputText, outputTokens),
    });
    if (!res.writableEnded && !res.destroyed) {
      res.write('data: [DONE]\n\n');
@@ -217,12 +218,22 @@ async function handleVercelStream(req, res, rawBody, payload) {
      } catch (_err) {
        continue;
      }
-      if (chunk.error || chunk.code === 'content_filter') {
+      const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers);
+      if (!parsed.parsed) {
+        continue;
+      }
+      if (parsed.outputTokens > 0) {
+        outputTokens = parsed.outputTokens;
+      }
+      currentType = parsed.newType;
+      if (parsed.errorMessage) {
        await finish('content_filter');
        return;
      }
-      const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers);
-      currentType = parsed.newType;
+      if (parsed.contentFilter) {
+        await finish('stop');
+        return;
+      }
      if (parsed.finished) {
        await finish('stop');
        return;
diff --git a/tests/compat/expected/sse_content_filter_status.json b/tests/compat/expected/sse_content_filter_status.json
new file mode 100644
index 0000000..bf70dfb
--- /dev/null
+++ b/tests/compat/expected/sse_content_filter_status.json
@@ -0,0 +1,8 @@
+{
+  "parts": [],
+  "finished": true,
+  "new_type": "text",
+  "content_filter": true,
+  "output_tokens": 77,
+  "error_message": ""
+}
diff --git a/tests/compat/expected/sse_leaked_content_filter.json b/tests/compat/expected/sse_leaked_content_filter.json
new file mode 100644
index 0000000..36e23a1
--- /dev/null
+++ b/tests/compat/expected/sse_leaked_content_filter.json
@@ -0,0 +1,7 @@
+{
+  "parts": [
+    {"text": "正常输出", "type": "text"}
+  ],
+  "finished": false,
+  "new_type": "text"
+}
diff --git a/tests/compat/expected/token_cases.json b/tests/compat/expected/token_cases.json
index 69694eb..2634721 100644
--- a/tests/compat/expected/token_cases.json
+++ b/tests/compat/expected/token_cases.json
@@ -1,6 +1,8 @@
 {
   "cases": [
     {"name": "ascii_short", "tokens": 1},
+    {"name": "whitespace_only", "tokens": 1},
+    {"name": "newline_only", "tokens": 1},
     {"name": "cjk", "tokens": 3},
     {"name": "mixed", "tokens": 4}
   ]
diff --git a/tests/compat/fixtures/sse_chunks/content_filter_status.json b/tests/compat/fixtures/sse_chunks/content_filter_status.json
new file mode 100644
index 0000000..6b5b051
--- /dev/null
+++ b/tests/compat/fixtures/sse_chunks/content_filter_status.json
@@ -0,0 +1,11 @@
+{
+  "chunk": {
+    "p": "response",
+    "v": [
+      {"p": "status", "v": "CONTENT_FILTER"},
+      {"p": "accumulated_token_usage", "v": 77}
+    ]
+  },
+  "thinking_enabled": false,
+  "current_type": "text"
+}
diff --git a/tests/compat/fixtures/sse_chunks/leaked_content_filter.json b/tests/compat/fixtures/sse_chunks/leaked_content_filter.json
new file mode 100644
index 0000000..1352025
--- /dev/null
+++ b/tests/compat/fixtures/sse_chunks/leaked_content_filter.json
@@ -0,0 +1,8 @@
+{
+  "chunk": {
+    "p": "response/content",
+    "v": "正常输出CONTENT_FILTER你好,这个问题我暂时无法回答"
+  },
+  "thinking_enabled": false,
+  "current_type": "text"
+}
diff --git a/tests/compat/fixtures/token_cases.json b/tests/compat/fixtures/token_cases.json
index 3887356..d2d105b 100644
--- a/tests/compat/fixtures/token_cases.json
+++ b/tests/compat/fixtures/token_cases.json
@@ -1,6 +1,8 @@
 {
   "cases": [
     {"name": "ascii_short", "text": "abcd"},
+    {"name": "whitespace_only", "text": " "},
+    {"name": "newline_only", "text": "\n"},
     {"name": "cjk", "text": "你好世界"},
     {"name": "mixed", "text": "Hello 你好世界"}
   ]
diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js
index 62f595f..e6a13f9 100644
--- a/tests/node/chat-stream.test.js
+++ b/tests/node/chat-stream.test.js
@@ -17,6 +17,8 @@ const {
   normalizePreparedToolNames,
   boolDefaultTrue,
   filterIncrementalToolCallDeltasByAllowed,
+  buildUsage,
+  estimateTokens,
   shouldSkipPath,
   isNodeStreamSupportedPath,
   extractPathname,
@@ -245,6 +247,84 @@ test('parseChunkForContent strips reference markers from fragment content', () =
   assert.deepEqual(parsed.parts, [{ text: '广州天气 多云', type: 'text' }]);
 });
 
+test('parseChunkForContent detects content_filter status and carries output tokens', () => {
+  const chunk = {
+    p: 'response',
+    v: [
+      { p: 'status', v: 'CONTENT_FILTER' },
+      { p: 'accumulated_token_usage', v: 77 },
+    ],
+  };
+  const parsed = parseChunkForContent(chunk, false, 'text');
+  assert.equal(parsed.parsed, true);
+  assert.equal(parsed.finished, true);
+  assert.equal(parsed.contentFilter, true);
+  assert.equal(parsed.outputTokens, 77);
+  assert.deepEqual(parsed.parts, []);
+});
+
+test('parseChunkForContent keeps error branches distinct from content_filter status', () => {
+  const chunk = {
+    error: { message: 'boom' },
+    code: 'content_filter',
+    accumulated_token_usage: 88,
+  };
+  const parsed = parseChunkForContent(chunk, false, 'text');
+  assert.equal(parsed.parsed, true);
+  assert.equal(parsed.finished, true);
+  assert.equal(parsed.contentFilter, false);
+  assert.equal(parsed.errorMessage.length > 0, true);
+  assert.equal(parsed.outputTokens, 0);
+  assert.deepEqual(parsed.parts, []);
+});
+
+test('parseChunkForContent preserves output tokens on FINISHED lines', () => {
+  const parsed = parseChunkForContent(
+    { p: 'response/status', v: 'FINISHED', accumulated_token_usage: 190 },
+    false,
+    'text',
+  );
+  assert.equal(parsed.parsed, true);
+  assert.equal(parsed.finished, true);
+  assert.equal(parsed.contentFilter, false);
+  assert.equal(parsed.outputTokens, 190);
+  assert.deepEqual(parsed.parts, []);
+});
+
+test('parseChunkForContent strips leaked CONTENT_FILTER suffix and preserves line breaks', () => {
+  const leaked = parseChunkForContent(
+    { p: 'response/content', v: '正常输出CONTENT_FILTER你好,这个问题我暂时无法回答' },
+    false,
+    'text',
+  );
+  assert.deepEqual(leaked.parts, [{ text: '正常输出', type: 'text' }]);
+
+  const newlineTail = parseChunkForContent(
+    { p: 'response/content', v: 'line1\nCONTENT_FILTERblocked' },
+    false,
+    'text',
+  );
+  assert.deepEqual(newlineTail.parts, [{ text: 'line1\n', type: 'text' }]);
+
+  const newlineOnly = parseChunkForContent(
+    { p: 'response/content', v: '\nCONTENT_FILTERblocked' },
+    false,
+    'text',
+  );
+  assert.deepEqual(newlineOnly.parts, [{ text: '\n', type: 'text' }]);
+});
+
+test('estimateTokens preserves whitespace-only strings and buildUsage accepts output token overrides', () => {
+  assert.equal(estimateTokens(' '), 1);
+  assert.equal(estimateTokens('\n'), 1);
+
+  const usage = buildUsage('abcd', 'ef', 'gh', 99);
+  assert.equal(usage.prompt_tokens, 1);
+  assert.equal(usage.completion_tokens, 99);
+  assert.equal(usage.total_tokens, 100);
+  assert.equal(usage.completion_tokens_details.reasoning_tokens, 1);
+});
+
 test('shouldSkipPath skips dynamic response/fragments/*/status paths only', () => {
   assert.equal(shouldSkipPath('response/fragments/-16/status'), true);
   assert.equal(shouldSkipPath('response/fragments/8/status'), true);
diff --git a/tests/node/js_compat_test.js b/tests/node/js_compat_test.js
index c4bdcff..2b767ea 100644
--- a/tests/node/js_compat_test.js
+++ b/tests/node/js_compat_test.js
@@ -30,6 +30,9 @@ test('js compat: sse fixtures', () => {
     assert.deepEqual(got.parts, expected.parts, `${name}: parts mismatch`);
     assert.equal(got.finished, expected.finished, `${name}: finished mismatch`);
     assert.equal(got.newType, expected.new_type, `${name}: newType mismatch`);
+    assert.equal(Boolean(got.contentFilter), Boolean(expected.content_filter), `${name}: contentFilter mismatch`);
+    assert.equal(Number(got.outputTokens || 0), Number(expected.output_tokens || 0), `${name}: outputTokens mismatch`);
+    assert.equal(got.errorMessage || '', expected.error_message || '', `${name}: errorMessage mismatch`);
   }
 });