From eccd8c957b5f22eb58429d2b3d21dc5077757e92 Mon Sep 17 00:00:00 2001 From: CJACK Date: Sat, 2 May 2026 21:34:36 +0800 Subject: [PATCH] fix: prevent continuation replay overlap by trimming redundant text from thinking and response streams --- .../httpapi/claude/handler_stream_test.go | 26 ++++++++++++++ .../httpapi/claude/stream_runtime_core.go | 34 ++++++++++++------- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/internal/httpapi/claude/handler_stream_test.go b/internal/httpapi/claude/handler_stream_test.go index 731c3a9..5b596a8 100644 --- a/internal/httpapi/claude/handler_stream_test.go +++ b/internal/httpapi/claude/handler_stream_test.go @@ -111,6 +111,32 @@ func TestHandleClaudeStreamRealtimeTextIncrementsWithEventHeaders(t *testing.T) } } +func TestHandleClaudeStreamRealtimeTrimsContinuationReplay(t *testing.T) { + h := &Handler{} + prefix := strings.Repeat("A", 40) + resp := makeClaudeSSEHTTPResponse( + `data: {"p":"response/content","v":"`+prefix+`"}`, + `data: {"p":"response/content","v":"`+prefix+` tail"}`, + `data: [DONE]`, + ) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/anthropic/v1/messages", nil) + + h.handleClaudeStreamRealtime(rec, req, resp, "claude-sonnet-4-5", []any{map[string]any{"role": "user", "content": "hi"}}, false, false, nil, nil) + + frames := parseClaudeFrames(t, rec.Body.String()) + combined := strings.Builder{} + for _, f := range findClaudeFrames(frames, "content_block_delta") { + delta, _ := f.Payload["delta"].(map[string]any) + if delta["type"] == "text_delta" { + combined.WriteString(asString(delta["text"])) + } + } + if got, want := combined.String(), prefix+" tail"; got != want { + t.Fatalf("unexpected combined text: got %q want %q body=%s", got, want, rec.Body.String()) + } +} + func TestHandleClaudeStreamRealtimeThinkingDelta(t *testing.T) { h := &Handler{} resp := makeClaudeSSEHTTPResponse( diff --git a/internal/httpapi/claude/stream_runtime_core.go b/internal/httpapi/claude/stream_runtime_core.go index 415a448..c093a08 100644 --- a/internal/httpapi/claude/stream_runtime_core.go +++ b/internal/httpapi/claude/stream_runtime_core.go @@ -99,7 +99,21 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse } } for _, p := range parsed.Parts { - cleanedText := cleanVisibleOutput(p.Text, s.stripReferenceMarkers) + var rawTrimmed string + if p.Type == "thinking" { + rawTrimmed = sse.TrimContinuationOverlapFromBuilder(&s.rawThinking, p.Text) + } else { + rawTrimmed = sse.TrimContinuationOverlapFromBuilder(&s.rawText, p.Text) + } + if rawTrimmed == "" { + continue + } + if p.Type == "thinking" { + s.rawThinking.WriteString(rawTrimmed) + } else { + s.rawText.WriteString(rawTrimmed) + } + cleanedText := cleanVisibleOutput(rawTrimmed, s.stripReferenceMarkers) if cleanedText == "" { continue } @@ -109,14 +123,14 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse contentSeen = true if p.Type == "thinking" { - s.rawThinking.WriteString(p.Text) if !s.thinkingEnabled { continue } - if cleanedText == "" { + trimmed := sse.TrimContinuationOverlapFromBuilder(&s.thinking, cleanedText) + if trimmed == "" { continue } - s.thinking.WriteString(cleanedText) + s.thinking.WriteString(trimmed) s.closeTextBlock() if !s.thinkingBlockOpen { s.thinkingBlockIndex = s.nextBlockIndex @@ -136,21 +150,15 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse "index": s.thinkingBlockIndex, "delta": map[string]any{ "type": "thinking_delta", - "thinking": cleanedText, + "thinking": trimmed, }, }) continue } - s.rawText.WriteString(p.Text) - if cleanedText != "" { - s.text.WriteString(cleanedText) - } + s.text.WriteString(cleanedText) if !s.bufferToolContent { - if cleanedText == "" { - continue - } s.closeThinkingBlock() if !s.textBlockOpen { s.textBlockIndex = s.nextBlockIndex @@ -176,7 +184,7 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse continue } - events := toolstream.ProcessChunk(&s.sieve, p.Text, s.toolNames) + events := toolstream.ProcessChunk(&s.sieve, rawTrimmed, s.toolNames) for _, evt := range events { if len(evt.ToolCalls) > 0 { s.closeTextBlock()