diff --git a/internal/httpapi/claude/handler_stream_test.go b/internal/httpapi/claude/handler_stream_test.go index 5b596a8..7cd68a7 100644 --- a/internal/httpapi/claude/handler_stream_test.go +++ b/internal/httpapi/claude/handler_stream_test.go @@ -28,6 +28,18 @@ func makeClaudeSSEHTTPResponse(lines ...string) *http.Response { } } +func makeClaudeContentLine(t *testing.T, text string) string { + t.Helper() + line, err := json.Marshal(map[string]any{ + "p": "response/content", + "v": text, + }) + if err != nil { + t.Fatalf("marshal content line failed: %v", err) + } + return "data: " + string(line) +} + func parseClaudeFrames(t *testing.T, body string) []claudeFrame { t.Helper() chunks := strings.Split(body, "\n\n") @@ -71,6 +83,17 @@ func findClaudeFrames(frames []claudeFrame, event string) []claudeFrame { return out } +func collectClaudeTextDeltas(frames []claudeFrame) string { + var combined strings.Builder + for _, f := range findClaudeFrames(frames, "content_block_delta") { + delta, _ := f.Payload["delta"].(map[string]any) + if delta["type"] == "text_delta" { + combined.WriteString(asString(delta["text"])) + } + } + return combined.String() +} + func TestHandleClaudeStreamRealtimeTextIncrementsWithEventHeaders(t *testing.T) { h := &Handler{} resp := makeClaudeSSEHTTPResponse( @@ -111,6 +134,26 @@ func TestHandleClaudeStreamRealtimeTextIncrementsWithEventHeaders(t *testing.T) } } +func TestHandleClaudeStreamRealtimeToolBufferedPlainTextDoesNotRepeatFinalText(t *testing.T) { + h := &Handler{} + want := "明白\n\nBash\nIN\npwd\nOUT\nok" + resp := makeClaudeSSEHTTPResponse( + makeClaudeContentLine(t, "明"), + makeClaudeContentLine(t, "白\n\nBash\nIN\npwd\n"), + makeClaudeContentLine(t, "OUT\nok"), + `data: [DONE]`, + ) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/anthropic/v1/messages", nil) + + h.handleClaudeStreamRealtime(rec, req, resp, "claude-sonnet-4-5", []any{map[string]any{"role": "user", "content": "use tool"}}, false, false, []string{"Bash"}, nil) + + frames := parseClaudeFrames(t, rec.Body.String()) + if got := collectClaudeTextDeltas(frames); got != want { + t.Fatalf("unexpected combined text: got %q want %q body=%s", got, want, rec.Body.String()) + } +} + func TestHandleClaudeStreamRealtimeTrimsContinuationReplay(t *testing.T) { h := &Handler{} prefix := strings.Repeat("A", 40) diff --git a/internal/httpapi/claude/stream_runtime_core.go b/internal/httpapi/claude/stream_runtime_core.go index c093a08..df73599 100644 --- a/internal/httpapi/claude/stream_runtime_core.go +++ b/internal/httpapi/claude/stream_runtime_core.go @@ -43,6 +43,7 @@ type claudeStreamRuntime struct { thinkingBlockIndex int textBlockOpen bool textBlockIndex int + textEmitted bool ended bool upstreamErr string } @@ -181,6 +182,7 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse "text": cleanedText, }, }) + s.textEmitted = true continue } @@ -226,6 +228,7 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse "text": cleaned, }, }) + s.textEmitted = true } } diff --git a/internal/httpapi/claude/stream_runtime_finalize.go b/internal/httpapi/claude/stream_runtime_finalize.go index 28e276f..89e7c9f 100644 --- a/internal/httpapi/claude/stream_runtime_finalize.go +++ b/internal/httpapi/claude/stream_runtime_finalize.go @@ -109,6 +109,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { "text": cleaned, }, }) + s.textEmitted = true } } } @@ -141,7 +142,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { s.nextBlockIndex++ s.sendToolUseBlock(idx, tc) } - } else if finalText != "" && !s.textBlockOpen { + } else if finalText != "" && !s.textEmitted { idx := s.nextBlockIndex s.nextBlockIndex++ s.send("content_block_start", map[string]any{ @@ -160,6 +161,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { "text": finalText, }, }) + s.textEmitted = true s.send("content_block_stop", map[string]any{ "type": "content_block_stop", "index": idx, diff --git a/internal/sse/stream.go b/internal/sse/stream.go index 4bd374f..977ed5a 100644 --- a/internal/sse/stream.go +++ b/internal/sse/stream.go @@ -11,8 +11,7 @@ import ( const ( parsedLineBufferSize = 128 - scannerBufferSize = 64 * 1024 - maxScannerLineSize = 4 * 1024 * 1024 + lineReaderBufferSize = 64 * 1024 ) type AccumulateConfig struct { @@ -44,8 +43,7 @@ func startParsedLinePumpWithConfig(ctx context.Context, body io.Reader, thinking go func() { defer close(out) - scanner := bufio.NewScanner(body) - scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize) + reader := bufio.NewReaderSize(body, lineReaderBufferSize) currentType := initialType var pumpErr error @@ -62,19 +60,27 @@ func startParsedLinePumpWithConfig(ctx context.Context, body io.Reader, thinking scanDone := make(chan error, 1) go func() { - for scanner.Scan() { - line := make([]byte, len(scanner.Bytes())) - copy(line, scanner.Bytes()) - select { - case scanCh <- line: - case <-ctx.Done(): + for { + line, err := reader.ReadBytes('\n') + if len(line) > 0 { + copied := append([]byte(nil), line...) + select { + case scanCh <- copied: + case <-ctx.Done(): + close(scanCh) + scanDone <- ctx.Err() + return + } + } + if err != nil { close(scanCh) - scanDone <- ctx.Err() + if err == io.EOF { + err = nil + } + scanDone <- err return } } - close(scanCh) - scanDone <- scanner.Err() }() maxWaitTimer := time.NewTimer(0) diff --git a/internal/sse/stream_test.go b/internal/sse/stream_test.go index d6addb7..ccad248 100644 --- a/internal/sse/stream_test.go +++ b/internal/sse/stream_test.go @@ -43,7 +43,7 @@ func TestStartParsedLinePumpParsesAndStops(t *testing.T) { } func TestStartParsedLinePumpHandlesLongSingleSSELine(t *testing.T) { - payload := strings.Repeat("x", 2*1024*1024+4096) + payload := strings.Repeat("x", 5*1024*1024+4096) results, done := StartParsedLinePump(context.Background(), strings.NewReader(makeLargeContentSSEBody(t, payload)), false, "text") var got strings.Builder