feat: implement trimContinuationOverlap utility to remove redundant stream prefixes and add associated tests.

This commit is contained in:
CJACK
2026-04-06 02:23:28 +08:00
parent 4d36afea4c
commit 49012a227c
10 changed files with 66878 additions and 23 deletions

View File

@@ -0,0 +1,23 @@
'use strict';
const MIN_CONTINUATION_SNAPSHOT_LEN = 32;
function trimContinuationOverlap(existing, incoming) {
if (!incoming) {
return '';
}
if (!existing) {
return incoming;
}
if (incoming.length >= MIN_CONTINUATION_SNAPSHOT_LEN && incoming.startsWith(existing)) {
return incoming.slice(existing.length);
}
if (incoming.length >= MIN_CONTINUATION_SNAPSHOT_LEN && existing.startsWith(incoming)) {
return '';
}
return incoming;
}
module.exports = {
trimContinuationOverlap,
};

View File

@@ -34,6 +34,9 @@ const {
const {
handleVercelStream,
} = require('./vercel_stream');
const {
trimContinuationOverlap,
} = require('./dedupe');
async function handler(req, res) {
setCorsHeaders(res);
@@ -119,4 +122,5 @@ module.exports.__test = {
extractAccumulatedTokenUsage,
isNodeStreamSupportedPath,
extractPathname,
trimContinuationOverlap,
};

View File

@@ -27,6 +27,9 @@ const {
relayPreparedFailure,
createLeaseReleaser,
} = require('./http_internal');
const {
trimContinuationOverlap,
} = require('./dedupe');
const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion';
@@ -245,21 +248,29 @@ async function handleVercelStream(req, res, rawBody, payload) {
if (!p.text) {
continue;
}
if (searchEnabled && isCitation(p.text)) {
continue;
}
if (p.type === 'thinking') {
if (thinkingEnabled) {
thinkingText += p.text;
sendDeltaFrame({ reasoning_content: p.text });
const trimmed = trimContinuationOverlap(thinkingText, p.text);
if (!trimmed) {
continue;
}
thinkingText += trimmed;
sendDeltaFrame({ reasoning_content: trimmed });
}
} else {
outputText += p.text;
if (!toolSieveEnabled) {
sendDeltaFrame({ content: p.text });
const trimmed = trimContinuationOverlap(outputText, p.text);
if (!trimmed) {
continue;
}
const events = processToolSieveChunk(toolSieveState, p.text, toolNames);
if (searchEnabled && isCitation(trimmed)) {
continue;
}
outputText += trimmed;
if (!toolSieveEnabled) {
sendDeltaFrame({ content: trimmed });
continue;
}
const events = processToolSieveChunk(toolSieveState, trimmed, toolNames);
for (const evt of events) {
if (evt.type === 'tool_call_deltas') {
if (!emitEarlyToolDeltas) {

View File

@@ -8,16 +8,13 @@ import (
)
func TestCollectStreamDedupesContinueSnapshotReplay(t *testing.T) {
prefix := "我们被问到:这是一个很长的续答快照前缀,用来验证去重逻辑不会误伤正常 token。"
body := strings.Join([]string{
`data: {"v":{"response":{"fragments":[{"id":2,"type":"THINK","content":"我们","references":[],"stage_id":1}]}}}`,
``,
`data: {"p":"response/fragments/-1/content","o":"APPEND","v":"被"}`,
``,
`data: {"v":"问到"}`,
`data: {"v":{"response":{"fragments":[{"id":2,"type":"THINK","content":"` + prefix + `","references":[],"stage_id":1}]}}}`,
``,
`data: {"p":"response/status","v":"INCOMPLETE"}`,
``,
`data: {"v":{"response":{"fragments":[{"id":2,"type":"THINK","content":"我们被问到继续","references":[],"stage_id":1}]}}}`,
`data: {"v":{"response":{"fragments":[{"id":2,"type":"THINK","content":"` + prefix + `继续","references":[],"stage_id":1}]}}}`,
``,
`data: {"v":"分析"}`,
``,
@@ -27,7 +24,7 @@ func TestCollectStreamDedupesContinueSnapshotReplay(t *testing.T) {
resp := &http.Response{Body: io.NopCloser(strings.NewReader(body))}
got := CollectStream(resp, true, true)
if got.Thinking != "我们被问到继续分析" {
if got.Thinking != prefix+"继续分析" {
t.Fatalf("unexpected thinking after dedupe: %q", got.Thinking)
}
}

View File

@@ -2,6 +2,8 @@ package sse
import "strings"
const minContinuationSnapshotLen = 32
// TrimContinuationOverlap removes the already-seen prefix when DeepSeek
// continue rounds resend the full fragment snapshot instead of only the new
// suffix. Non-overlapping chunks are returned unchanged.
@@ -12,10 +14,10 @@ func TrimContinuationOverlap(existing, incoming string) string {
if existing == "" {
return incoming
}
if strings.HasPrefix(incoming, existing) {
if len(incoming) >= minContinuationSnapshotLen && strings.HasPrefix(incoming, existing) {
return incoming[len(existing):]
}
if strings.HasPrefix(existing, incoming) {
if len(incoming) >= minContinuationSnapshotLen && strings.HasPrefix(existing, incoming) {
return ""
}
return incoming

View File

@@ -3,8 +3,8 @@ package sse
import "testing"
func TestTrimContinuationOverlapReturnsSuffixForSnapshotReplay(t *testing.T) {
existing := "我们被问到:题目"
incoming := "我们被问到:题目继续分析"
existing := "我们被问到:这是一个很长的续答快照前缀,用来验证去重逻辑不会误伤正常 token。"
incoming := existing + "继续分析"
got := TrimContinuationOverlap(existing, incoming)
if got != "继续分析" {
t.Fatalf("expected suffix only, got %q", got)
@@ -12,8 +12,8 @@ func TestTrimContinuationOverlapReturnsSuffixForSnapshotReplay(t *testing.T) {
}
func TestTrimContinuationOverlapDropsStaleShorterSnapshot(t *testing.T) {
existing := "我们被问到:题目继续分析"
incoming := "我们被问到:题目"
incoming := "我们被问到:这是一个很长的续答快照前缀,用来验证去重逻辑不会误伤正常 token。"
existing := incoming + "继续分析"
got := TrimContinuationOverlap(existing, incoming)
if got != "" {
t.Fatalf("expected stale snapshot to be dropped, got %q", got)
@@ -28,3 +28,12 @@ func TestTrimContinuationOverlapPreservesNormalIncrement(t *testing.T) {
t.Fatalf("expected normal increment unchanged, got %q", got)
}
}
func TestTrimContinuationOverlapKeepsShortPrefixLikeNormalToken(t *testing.T) {
existing := "我们被问到"
incoming := "我们"
got := TrimContinuationOverlap(existing, incoming)
if got != "我们" {
t.Fatalf("expected short token preserved, got %q", got)
}
}

View File

@@ -22,6 +22,7 @@ const {
shouldSkipPath,
isNodeStreamSupportedPath,
extractPathname,
trimContinuationOverlap,
} = handler.__test;
test('chat-stream exposes parser test hooks', () => {
@@ -368,3 +369,10 @@ test('extractPathname strips query only', () => {
assert.equal(extractPathname('/v1/chat/completions?stream=true'), '/v1/chat/completions');
assert.equal(extractPathname('/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=1'), '/v1beta/models/gemini-2.5-flash:streamGenerateContent');
});
test('trimContinuationOverlap preserves short normal tokens and trims long snapshots', () => {
assert.equal(trimContinuationOverlap('我们被问到', '我们'), '我们');
const existing = '我们被问到:这是一个很长的续答快照前缀,用来验证去重逻辑不会误伤正常 token。';
const incoming = `${existing}继续分析`;
assert.equal(trimContinuationOverlap(existing, incoming), '继续分析');
});

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -7,6 +7,7 @@ import { createRequire } from 'node:module';
const require = createRequire(import.meta.url);
const chatStream = require('../../api/chat-stream.js');
const { parseChunkForContent } = chatStream.__test;
const { trimContinuationOverlap } = chatStream.__test;
function parseArgs(argv) {
const out = {
@@ -179,6 +180,8 @@ function parseDeepSeekReplay(raw) {
let currentType = 'thinking';
let sawFinish = false;
let outputText = '';
let thinkingText = '';
let textOutput = '';
let parsedChunks = 0;
for (const evt of events) {
@@ -201,7 +204,15 @@ function parseDeepSeekReplay(raw) {
sawFinish = true;
}
for (const part of parsed.parts) {
outputText += part.text;
if (part.type === 'thinking') {
const trimmed = trimContinuationOverlap(thinkingText, part.text);
thinkingText += trimmed;
outputText += trimmed;
} else {
const trimmed = trimContinuationOverlap(textOutput, part.text);
textOutput += trimmed;
outputText += trimmed;
}
}
}