From 4d36afea4c625f70801d0165bb783e50aca52faf Mon Sep 17 00:00:00 2001 From: CJACK Date: Mon, 6 Apr 2026 02:01:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=8E=A5=E7=BB=AD=E6=B5=81?= =?UTF-8?q?=E7=9A=84=E5=A2=9E=E9=87=8Fbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- VERSION | 2 +- .../adapter/claude/stream_runtime_core.go | 16 ++++++--- .../adapter/gemini/handler_stream_runtime.go | 14 ++++++-- .../adapter/openai/chat_stream_runtime.go | 18 +++++++--- .../openai/responses_stream_runtime_core.go | 20 +++++++---- internal/sse/consumer.go | 6 ++-- internal/sse/consumer_test.go | 33 +++++++++++++++++++ internal/sse/dedupe.go | 22 +++++++++++++ internal/sse/dedupe_test.go | 30 +++++++++++++++++ 9 files changed, 140 insertions(+), 21 deletions(-) create mode 100644 internal/sse/consumer_test.go create mode 100644 internal/sse/dedupe.go create mode 100644 internal/sse/dedupe_test.go diff --git a/VERSION b/VERSION index fd2a018..94ff29c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.1.0 +3.1.1 diff --git a/internal/adapter/claude/stream_runtime_core.go b/internal/adapter/claude/stream_runtime_core.go index 241c093..2df215b 100644 --- a/internal/adapter/claude/stream_runtime_core.go +++ b/internal/adapter/claude/stream_runtime_core.go @@ -96,7 +96,11 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse if !s.thinkingEnabled { continue } - s.thinking.WriteString(cleanedText) + trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText) + if trimmed == "" { + continue + } + s.thinking.WriteString(trimmed) s.closeTextBlock() if !s.thinkingBlockOpen { s.thinkingBlockIndex = s.nextBlockIndex @@ -116,13 +120,17 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse "index": s.thinkingBlockIndex, "delta": map[string]any{ "type": "thinking_delta", - "thinking": cleanedText, + "thinking": trimmed, }, }) continue } - s.text.WriteString(cleanedText) + trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText) + if trimmed == "" { + continue + } + s.text.WriteString(trimmed) if s.bufferToolContent { if hasUnclosedCodeFence(s.text.String()) { continue @@ -148,7 +156,7 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse "index": s.textBlockIndex, "delta": map[string]any{ "type": "text_delta", - "text": cleanedText, + "text": trimmed, }, }) } diff --git a/internal/adapter/gemini/handler_stream_runtime.go b/internal/adapter/gemini/handler_stream_runtime.go index 601517a..2cbe348 100644 --- a/internal/adapter/gemini/handler_stream_runtime.go +++ b/internal/adapter/gemini/handler_stream_runtime.go @@ -126,11 +126,19 @@ func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse contentSeen = true if p.Type == "thinking" { if s.thinkingEnabled { - s.thinking.WriteString(cleanedText) + trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText) + if trimmed == "" { + continue + } + s.thinking.WriteString(trimmed) } continue } - s.text.WriteString(cleanedText) + trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText) + if trimmed == "" { + continue + } + s.text.WriteString(trimmed) if s.bufferContent { continue } @@ -140,7 +148,7 @@ func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse "index": 0, "content": map[string]any{ "role": "model", - "parts": []map[string]any{{"text": cleanedText}}, + "parts": []map[string]any{{"text": trimmed}}, }, }, }, diff --git a/internal/adapter/openai/chat_stream_runtime.go b/internal/adapter/openai/chat_stream_runtime.go index fb6943f..6582e09 100644 --- a/internal/adapter/openai/chat_stream_runtime.go +++ b/internal/adapter/openai/chat_stream_runtime.go @@ -221,15 +221,23 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD } if p.Type == "thinking" { if s.thinkingEnabled { - s.thinking.WriteString(cleanedText) - delta["reasoning_content"] = cleanedText + trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText) + if trimmed == "" { + continue + } + s.thinking.WriteString(trimmed) + delta["reasoning_content"] = trimmed } } else { - s.text.WriteString(cleanedText) + trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText) + if trimmed == "" { + continue + } + s.text.WriteString(trimmed) if !s.bufferToolContent { - delta["content"] = cleanedText + delta["content"] = trimmed } else { - events := processToolSieveChunk(&s.toolSieve, cleanedText, s.toolNames) + events := processToolSieveChunk(&s.toolSieve, trimmed, s.toolNames) for _, evt := range events { if len(evt.ToolCallDeltas) > 0 { if !s.emitEarlyToolDeltas { diff --git a/internal/adapter/openai/responses_stream_runtime_core.go b/internal/adapter/openai/responses_stream_runtime_core.go index 55fc7be..5a17dd4 100644 --- a/internal/adapter/openai/responses_stream_runtime_core.go +++ b/internal/adapter/openai/responses_stream_runtime_core.go @@ -205,17 +205,25 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa if !s.thinkingEnabled { continue } - s.thinking.WriteString(cleanedText) - s.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(s.responseID, cleanedText)) + trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText) + if trimmed == "" { + continue + } + s.thinking.WriteString(trimmed) + s.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(s.responseID, trimmed)) continue } - s.text.WriteString(cleanedText) - if !s.bufferToolContent { - s.emitTextDelta(cleanedText) + trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText) + if trimmed == "" { continue } - s.processToolStreamEvents(processToolSieveChunk(&s.sieve, cleanedText, s.toolNames), true) + s.text.WriteString(trimmed) + if !s.bufferToolContent { + s.emitTextDelta(trimmed) + continue + } + s.processToolStreamEvents(processToolSieveChunk(&s.sieve, trimmed, s.toolNames), true) } return streamengine.ParsedDecision{ContentSeen: contentSeen} diff --git a/internal/sse/consumer.go b/internal/sse/consumer.go index eb19f75..49ddd63 100644 --- a/internal/sse/consumer.go +++ b/internal/sse/consumer.go @@ -54,9 +54,11 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co } for _, p := range result.Parts { if p.Type == "thinking" { - thinking.WriteString(p.Text) + trimmed := TrimContinuationOverlap(thinking.String(), p.Text) + thinking.WriteString(trimmed) } else { - text.WriteString(p.Text) + trimmed := TrimContinuationOverlap(text.String(), p.Text) + text.WriteString(trimmed) } } return true diff --git a/internal/sse/consumer_test.go b/internal/sse/consumer_test.go new file mode 100644 index 0000000..e50528d --- /dev/null +++ b/internal/sse/consumer_test.go @@ -0,0 +1,33 @@ +package sse + +import ( + "io" + "net/http" + "strings" + "testing" +) + +func TestCollectStreamDedupesContinueSnapshotReplay(t *testing.T) { + body := strings.Join([]string{ + `data: {"v":{"response":{"fragments":[{"id":2,"type":"THINK","content":"我们","references":[],"stage_id":1}]}}}`, + ``, + `data: {"p":"response/fragments/-1/content","o":"APPEND","v":"被"}`, + ``, + `data: {"v":"问到"}`, + ``, + `data: {"p":"response/status","v":"INCOMPLETE"}`, + ``, + `data: {"v":{"response":{"fragments":[{"id":2,"type":"THINK","content":"我们被问到继续","references":[],"stage_id":1}]}}}`, + ``, + `data: {"v":"分析"}`, + ``, + `data: {"p":"response/status","v":"FINISHED"}`, + ``, + }, "\n") + + resp := &http.Response{Body: io.NopCloser(strings.NewReader(body))} + got := CollectStream(resp, true, true) + if got.Thinking != "我们被问到继续分析" { + t.Fatalf("unexpected thinking after dedupe: %q", got.Thinking) + } +} diff --git a/internal/sse/dedupe.go b/internal/sse/dedupe.go new file mode 100644 index 0000000..143b350 --- /dev/null +++ b/internal/sse/dedupe.go @@ -0,0 +1,22 @@ +package sse + +import "strings" + +// TrimContinuationOverlap removes the already-seen prefix when DeepSeek +// continue rounds resend the full fragment snapshot instead of only the new +// suffix. Non-overlapping chunks are returned unchanged. +func TrimContinuationOverlap(existing, incoming string) string { + if incoming == "" { + return "" + } + if existing == "" { + return incoming + } + if strings.HasPrefix(incoming, existing) { + return incoming[len(existing):] + } + if strings.HasPrefix(existing, incoming) { + return "" + } + return incoming +} diff --git a/internal/sse/dedupe_test.go b/internal/sse/dedupe_test.go new file mode 100644 index 0000000..510a263 --- /dev/null +++ b/internal/sse/dedupe_test.go @@ -0,0 +1,30 @@ +package sse + +import "testing" + +func TestTrimContinuationOverlapReturnsSuffixForSnapshotReplay(t *testing.T) { + existing := "我们被问到:题目" + incoming := "我们被问到:题目继续分析" + got := TrimContinuationOverlap(existing, incoming) + if got != "继续分析" { + t.Fatalf("expected suffix only, got %q", got) + } +} + +func TestTrimContinuationOverlapDropsStaleShorterSnapshot(t *testing.T) { + existing := "我们被问到:题目继续分析" + incoming := "我们被问到:题目" + got := TrimContinuationOverlap(existing, incoming) + if got != "" { + t.Fatalf("expected stale snapshot to be dropped, got %q", got) + } +} + +func TestTrimContinuationOverlapPreservesNormalIncrement(t *testing.T) { + existing := "我们" + incoming := "被" + got := TrimContinuationOverlap(existing, incoming) + if got != "被" { + t.Fatalf("expected normal increment unchanged, got %q", got) + } +}