From 89225c778e49abc3c11ac67390f7a9a7b93fddf1 Mon Sep 17 00:00:00 2001 From: "CJACK." <155826701+CJackHwang@users.noreply.github.com> Date: Wed, 29 Apr 2026 18:58:54 +0800 Subject: [PATCH 1/2] fix(sse): batch tiny stream chunks before emitting --- internal/sse/stream.go | 107 ++++++++++++++++++++++++++++--- internal/sse/stream_edge_test.go | 32 ++++++++- 2 files changed, 129 insertions(+), 10 deletions(-) diff --git a/internal/sse/stream.go b/internal/sse/stream.go index 4aa2d39..188f28d 100644 --- a/internal/sse/stream.go +++ b/internal/sse/stream.go @@ -4,12 +4,15 @@ import ( "bufio" "context" "io" + "time" ) const ( parsedLineBufferSize = 128 scannerBufferSize = 64 * 1024 maxScannerLineSize = 2 * 1024 * 1024 + minFlushChars = 160 + maxFlushWait = 80 * time.Millisecond ) // StartParsedLinePump scans an upstream DeepSeek SSE body and emits normalized @@ -20,21 +23,109 @@ func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bo done := make(chan error, 1) go func() { defer close(out) - scanner := bufio.NewScanner(body) - scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize) + type scanItem struct { + line []byte + err error + eof bool + } + lineCh := make(chan scanItem, 1) + go func() { + scanner := bufio.NewScanner(body) + scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize) + for scanner.Scan() { + line := append([]byte{}, scanner.Bytes()...) + lineCh <- scanItem{line: line} + } + lineCh <- scanItem{err: scanner.Err(), eof: true} + close(lineCh) + }() + + ticker := time.NewTicker(maxFlushWait) + defer ticker.Stop() currentType := initialType - for scanner.Scan() { - line := append([]byte{}, scanner.Bytes()...) - result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType) - currentType = result.NextType + var pending *LineResult + pendingChars := 0 + + sendResult := func(r LineResult) bool { + select { + case out <- r: + return true + case <-ctx.Done(): + done <- ctx.Err() + return false + } + } + + flushPending := func() bool { + if pending == nil { + return true + } + if !sendResult(*pending) { + return false + } + pending = nil + pendingChars = 0 + return true + } + + for { select { - case out <- result: case <-ctx.Done(): done <- ctx.Err() return + case <-ticker.C: + if !flushPending() { + return + } + case item, ok := <-lineCh: + if !ok || item.eof { + if !flushPending() { + return + } + done <- item.err + return + } + line := item.line + result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType) + currentType = result.NextType + + canAccumulate := result.Parsed && !result.Stop && result.ErrorMessage == "" && !result.ContentFilter && result.ResponseMessageID == 0 + if canAccumulate { + lineChars := 0 + for _, p := range result.Parts { + lineChars += len(p.Text) + } + for _, p := range result.ToolDetectionThinkingParts { + lineChars += len(p.Text) + } + if lineChars > 0 { + if pending == nil { + cp := result + pending = &cp + } else { + pending.Parts = append(pending.Parts, result.Parts...) + pending.ToolDetectionThinkingParts = append(pending.ToolDetectionThinkingParts, result.ToolDetectionThinkingParts...) + pending.NextType = result.NextType + } + pendingChars += lineChars + if pendingChars < minFlushChars { + continue + } + if !flushPending() { + return + } + continue + } + } + + if !flushPending() { + return + } + if !sendResult(result) { + return + } } } - done <- scanner.Err() }() return out, done } diff --git a/internal/sse/stream_edge_test.go b/internal/sse/stream_edge_test.go index 5de3a58..7c5ea4c 100644 --- a/internal/sse/stream_edge_test.go +++ b/internal/sse/stream_edge_test.go @@ -38,8 +38,8 @@ func TestStartParsedLinePumpMultipleLines(t *testing.T) { if err := <-done; err != nil { t.Fatalf("unexpected error: %v", err) } - if len(collected) < 3 { - t.Fatalf("expected at least 3 results, got %d", len(collected)) + if len(collected) < 2 { + t.Fatalf("expected at least 2 results, got %d", len(collected)) } // First should be thinking if collected[0].Parts[0].Type != "thinking" { @@ -175,3 +175,31 @@ func TestStartParsedLinePumpThinkingDisabled(t *testing.T) { t.Fatalf("expected at least 1 part, got %d", len(parts)) } } + +func TestStartParsedLinePumpAccumulatesSmallChunks(t *testing.T) { + body := strings.NewReader( + "data: {\"p\":\"response/content\",\"v\":\"h\"}\n" + + "data: {\"p\":\"response/content\",\"v\":\"i\"}\n" + + "data: [DONE]\n", + ) + + results, done := StartParsedLinePump(context.Background(), body, false, "text") + + collected := make([]LineResult, 0) + for r := range results { + collected = append(collected, r) + } + if err := <-done; err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(collected) != 2 { + t.Fatalf("expected 2 results (accumulated content + done), got %d", len(collected)) + } + if len(collected[0].Parts) != 2 { + t.Fatalf("expected 2 accumulated parts, got %d", len(collected[0].Parts)) + } + if !collected[1].Stop { + t.Fatal("expected second result to stop") + } +} From 6d3979a1d65799fb27188f5afee2116a948fb014 Mon Sep 17 00:00:00 2001 From: "CJACK." <155826701+CJackHwang@users.noreply.github.com> Date: Wed, 29 Apr 2026 22:59:22 +0800 Subject: [PATCH 2/2] fix(sse): stop scanner sender when stream context cancels --- internal/sse/stream.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/internal/sse/stream.go b/internal/sse/stream.go index 188f28d..8b8aa9b 100644 --- a/internal/sse/stream.go +++ b/internal/sse/stream.go @@ -29,15 +29,29 @@ func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bo eof bool } lineCh := make(chan scanItem, 1) + stopScanner := make(chan struct{}) + defer close(stopScanner) go func() { + sendScanItem := func(item scanItem) bool { + select { + case lineCh <- item: + return true + case <-ctx.Done(): + return false + case <-stopScanner: + return false + } + } + defer close(lineCh) scanner := bufio.NewScanner(body) scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize) for scanner.Scan() { line := append([]byte{}, scanner.Bytes()...) - lineCh <- scanItem{line: line} + if !sendScanItem(scanItem{line: line}) { + return + } } - lineCh <- scanItem{err: scanner.Err(), eof: true} - close(lineCh) + _ = sendScanItem(scanItem{err: scanner.Err(), eof: true}) }() ticker := time.NewTicker(maxFlushWait)