refactor: replace bufio.Scanner with bufio.Reader for SSE stream parsing and track emitted text to prevent redundant output blocks

This commit is contained in:
CJACK
2026-05-02 23:50:35 +08:00
parent dc5bffdf89
commit a901250de7
5 changed files with 69 additions and 15 deletions

View File

@@ -28,6 +28,18 @@ func makeClaudeSSEHTTPResponse(lines ...string) *http.Response {
}
}
// makeClaudeContentLine renders a single SSE "data: " line for tests.
//
// The payload is a JSON object with "p" fixed to "response/content" and "v"
// set to text. The test is aborted via t.Fatalf if marshalling fails.
func makeClaudeContentLine(t *testing.T, text string) string {
	t.Helper()

	payload := map[string]any{
		"p": "response/content",
		"v": text,
	}
	encoded, err := json.Marshal(payload)
	if err != nil {
		t.Fatalf("marshal content line failed: %v", err)
	}

	const prefix = "data: "
	return prefix + string(encoded)
}
func parseClaudeFrames(t *testing.T, body string) []claudeFrame {
t.Helper()
chunks := strings.Split(body, "\n\n")
@@ -71,6 +83,17 @@ func findClaudeFrames(frames []claudeFrame, event string) []claudeFrame {
return out
}
// collectClaudeTextDeltas concatenates, in frame order, the "text" value of
// every "content_block_delta" frame whose delta has type "text_delta".
// Frames whose "delta" payload is missing or not a map[string]any contribute
// nothing.
func collectClaudeTextDeltas(frames []claudeFrame) string {
	var text strings.Builder
	for _, frame := range findClaudeFrames(frames, "content_block_delta") {
		// A failed assertion leaves delta nil; indexing a nil map yields nil,
		// which never equals "text_delta", so such frames are skipped below.
		delta, _ := frame.Payload["delta"].(map[string]any)
		if delta["type"] != "text_delta" {
			continue
		}
		text.WriteString(asString(delta["text"]))
	}
	return text.String()
}
func TestHandleClaudeStreamRealtimeTextIncrementsWithEventHeaders(t *testing.T) {
h := &Handler{}
resp := makeClaudeSSEHTTPResponse(
@@ -111,6 +134,26 @@ func TestHandleClaudeStreamRealtimeTextIncrementsWithEventHeaders(t *testing.T)
}
}
func TestHandleClaudeStreamRealtimeToolBufferedPlainTextDoesNotRepeatFinalText(t *testing.T) {
h := &Handler{}
want := "明白\n\nBash\nIN\npwd\nOUT\nok"
resp := makeClaudeSSEHTTPResponse(
makeClaudeContentLine(t, "明"),
makeClaudeContentLine(t, "白\n\nBash\nIN\npwd\n"),
makeClaudeContentLine(t, "OUT\nok"),
`data: [DONE]`,
)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/anthropic/v1/messages", nil)
h.handleClaudeStreamRealtime(rec, req, resp, "claude-sonnet-4-5", []any{map[string]any{"role": "user", "content": "use tool"}}, false, false, []string{"Bash"}, nil)
frames := parseClaudeFrames(t, rec.Body.String())
if got := collectClaudeTextDeltas(frames); got != want {
t.Fatalf("unexpected combined text: got %q want %q body=%s", got, want, rec.Body.String())
}
}
func TestHandleClaudeStreamRealtimeTrimsContinuationReplay(t *testing.T) {
h := &Handler{}
prefix := strings.Repeat("A", 40)

View File

@@ -43,6 +43,7 @@ type claudeStreamRuntime struct {
thinkingBlockIndex int
textBlockOpen bool
textBlockIndex int
textEmitted bool
ended bool
upstreamErr string
}
@@ -181,6 +182,7 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
"text": cleanedText,
},
})
s.textEmitted = true
continue
}
@@ -226,6 +228,7 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
"text": cleaned,
},
})
s.textEmitted = true
}
}

View File

@@ -109,6 +109,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) {
"text": cleaned,
},
})
s.textEmitted = true
}
}
}
@@ -141,7 +142,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) {
s.nextBlockIndex++
s.sendToolUseBlock(idx, tc)
}
} else if finalText != "" && !s.textBlockOpen {
} else if finalText != "" && !s.textEmitted {
idx := s.nextBlockIndex
s.nextBlockIndex++
s.send("content_block_start", map[string]any{
@@ -160,6 +161,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) {
"text": finalText,
},
})
s.textEmitted = true
s.send("content_block_stop", map[string]any{
"type": "content_block_stop",
"index": idx,

View File

@@ -11,8 +11,7 @@ import (
// Buffer sizing for the parsed-line pump.
const (
// NOTE(review): presumably the capacity of the parsed-line output channel;
// the make(chan ...) site is not visible here — confirm.
parsedLineBufferSize = 128
// Initial capacity of the byte slice handed to bufio.Scanner.Buffer.
scannerBufferSize = 64 * 1024
// Maximum token (line) size passed to bufio.Scanner.Buffer.
maxScannerLineSize = 4 * 1024 * 1024
// Buffer size given to bufio.NewReaderSize when wrapping the stream body.
lineReaderBufferSize = 64 * 1024
)
type AccumulateConfig struct {
@@ -44,8 +43,7 @@ func startParsedLinePumpWithConfig(ctx context.Context, body io.Reader, thinking
go func() {
defer close(out)
scanner := bufio.NewScanner(body)
scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize)
reader := bufio.NewReaderSize(body, lineReaderBufferSize)
currentType := initialType
var pumpErr error
@@ -62,19 +60,27 @@ func startParsedLinePumpWithConfig(ctx context.Context, body io.Reader, thinking
scanDone := make(chan error, 1)
go func() {
for scanner.Scan() {
line := make([]byte, len(scanner.Bytes()))
copy(line, scanner.Bytes())
select {
case scanCh <- line:
case <-ctx.Done():
for {
line, err := reader.ReadBytes('\n')
if len(line) > 0 {
copied := append([]byte(nil), line...)
select {
case scanCh <- copied:
case <-ctx.Done():
close(scanCh)
scanDone <- ctx.Err()
return
}
}
if err != nil {
close(scanCh)
scanDone <- ctx.Err()
if err == io.EOF {
err = nil
}
scanDone <- err
return
}
}
close(scanCh)
scanDone <- scanner.Err()
}()
maxWaitTimer := time.NewTimer(0)

View File

@@ -43,7 +43,7 @@ func TestStartParsedLinePumpParsesAndStops(t *testing.T) {
}
func TestStartParsedLinePumpHandlesLongSingleSSELine(t *testing.T) {
payload := strings.Repeat("x", 2*1024*1024+4096)
payload := strings.Repeat("x", 5*1024*1024+4096)
results, done := StartParsedLinePump(context.Background(), strings.NewReader(makeLargeContentSSEBody(t, payload)), false, "text")
var got strings.Builder