修复接续流的增量bug

This commit is contained in:
CJACK
2026-04-06 02:01:41 +08:00
parent a608a4bd95
commit 4d36afea4c
9 changed files with 140 additions and 21 deletions

View File

@@ -96,7 +96,11 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
if !s.thinkingEnabled {
continue
}
s.thinking.WriteString(cleanedText)
trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText)
if trimmed == "" {
continue
}
s.thinking.WriteString(trimmed)
s.closeTextBlock()
if !s.thinkingBlockOpen {
s.thinkingBlockIndex = s.nextBlockIndex
@@ -116,13 +120,17 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
"index": s.thinkingBlockIndex,
"delta": map[string]any{
"type": "thinking_delta",
"thinking": cleanedText,
"thinking": trimmed,
},
})
continue
}
s.text.WriteString(cleanedText)
trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText)
if trimmed == "" {
continue
}
s.text.WriteString(trimmed)
if s.bufferToolContent {
if hasUnclosedCodeFence(s.text.String()) {
continue
@@ -148,7 +156,7 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
"index": s.textBlockIndex,
"delta": map[string]any{
"type": "text_delta",
"text": cleanedText,
"text": trimmed,
},
})
}

View File

@@ -126,11 +126,19 @@ func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
contentSeen = true
if p.Type == "thinking" {
if s.thinkingEnabled {
s.thinking.WriteString(cleanedText)
trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText)
if trimmed == "" {
continue
}
s.thinking.WriteString(trimmed)
}
continue
}
s.text.WriteString(cleanedText)
trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText)
if trimmed == "" {
continue
}
s.text.WriteString(trimmed)
if s.bufferContent {
continue
}
@@ -140,7 +148,7 @@ func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
"index": 0,
"content": map[string]any{
"role": "model",
"parts": []map[string]any{{"text": cleanedText}},
"parts": []map[string]any{{"text": trimmed}},
},
},
},

View File

@@ -221,15 +221,23 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
}
if p.Type == "thinking" {
if s.thinkingEnabled {
s.thinking.WriteString(cleanedText)
delta["reasoning_content"] = cleanedText
trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText)
if trimmed == "" {
continue
}
s.thinking.WriteString(trimmed)
delta["reasoning_content"] = trimmed
}
} else {
s.text.WriteString(cleanedText)
trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText)
if trimmed == "" {
continue
}
s.text.WriteString(trimmed)
if !s.bufferToolContent {
delta["content"] = cleanedText
delta["content"] = trimmed
} else {
events := processToolSieveChunk(&s.toolSieve, cleanedText, s.toolNames)
events := processToolSieveChunk(&s.toolSieve, trimmed, s.toolNames)
for _, evt := range events {
if len(evt.ToolCallDeltas) > 0 {
if !s.emitEarlyToolDeltas {

View File

@@ -205,17 +205,25 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
if !s.thinkingEnabled {
continue
}
s.thinking.WriteString(cleanedText)
s.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(s.responseID, cleanedText))
trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText)
if trimmed == "" {
continue
}
s.thinking.WriteString(trimmed)
s.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(s.responseID, trimmed))
continue
}
s.text.WriteString(cleanedText)
if !s.bufferToolContent {
s.emitTextDelta(cleanedText)
trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText)
if trimmed == "" {
continue
}
s.processToolStreamEvents(processToolSieveChunk(&s.sieve, cleanedText, s.toolNames), true)
s.text.WriteString(trimmed)
if !s.bufferToolContent {
s.emitTextDelta(trimmed)
continue
}
s.processToolStreamEvents(processToolSieveChunk(&s.sieve, trimmed, s.toolNames), true)
}
return streamengine.ParsedDecision{ContentSeen: contentSeen}

View File

@@ -54,9 +54,11 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
}
for _, p := range result.Parts {
if p.Type == "thinking" {
thinking.WriteString(p.Text)
trimmed := TrimContinuationOverlap(thinking.String(), p.Text)
thinking.WriteString(trimmed)
} else {
text.WriteString(p.Text)
trimmed := TrimContinuationOverlap(text.String(), p.Text)
text.WriteString(trimmed)
}
}
return true

View File

@@ -0,0 +1,33 @@
package sse
import (
"io"
"net/http"
"strings"
"testing"
)
func TestCollectStreamDedupesContinueSnapshotReplay(t *testing.T) {
body := strings.Join([]string{
`data: {"v":{"response":{"fragments":[{"id":2,"type":"THINK","content":"我们","references":[],"stage_id":1}]}}}`,
``,
`data: {"p":"response/fragments/-1/content","o":"APPEND","v":"被"}`,
``,
`data: {"v":"问到"}`,
``,
`data: {"p":"response/status","v":"INCOMPLETE"}`,
``,
`data: {"v":{"response":{"fragments":[{"id":2,"type":"THINK","content":"我们被问到继续","references":[],"stage_id":1}]}}}`,
``,
`data: {"v":"分析"}`,
``,
`data: {"p":"response/status","v":"FINISHED"}`,
``,
}, "\n")
resp := &http.Response{Body: io.NopCloser(strings.NewReader(body))}
got := CollectStream(resp, true, true)
if got.Thinking != "我们被问到继续分析" {
t.Fatalf("unexpected thinking after dedupe: %q", got.Thinking)
}
}

22
internal/sse/dedupe.go Normal file
View File

@@ -0,0 +1,22 @@
package sse
import "strings"
// TrimContinuationOverlap removes the already-seen prefix when DeepSeek
// continue rounds resend the full fragment snapshot instead of only the new
// suffix. Non-overlapping chunks are returned unchanged.
func TrimContinuationOverlap(existing, incoming string) string {
if incoming == "" {
return ""
}
if existing == "" {
return incoming
}
if strings.HasPrefix(incoming, existing) {
return incoming[len(existing):]
}
if strings.HasPrefix(existing, incoming) {
return ""
}
return incoming
}

View File

@@ -0,0 +1,30 @@
package sse
import "testing"
func TestTrimContinuationOverlapReturnsSuffixForSnapshotReplay(t *testing.T) {
existing := "我们被问到:题目"
incoming := "我们被问到:题目继续分析"
got := TrimContinuationOverlap(existing, incoming)
if got != "继续分析" {
t.Fatalf("expected suffix only, got %q", got)
}
}
func TestTrimContinuationOverlapDropsStaleShorterSnapshot(t *testing.T) {
existing := "我们被问到:题目继续分析"
incoming := "我们被问到:题目"
got := TrimContinuationOverlap(existing, incoming)
if got != "" {
t.Fatalf("expected stale snapshot to be dropped, got %q", got)
}
}
func TestTrimContinuationOverlapPreservesNormalIncrement(t *testing.T) {
existing := "我们"
incoming := "被"
got := TrimContinuationOverlap(existing, incoming)
if got != "被" {
t.Fatalf("expected normal increment unchanged, got %q", got)
}
}