Merge pull request #362 from CJackHwang/codex/fix-issue-based-on-feedback

fix(sse): batch tiny stream chunks before emitting
This commit is contained in:
CJACK.
2026-04-29 23:07:03 +08:00
committed by GitHub
2 changed files with 143 additions and 10 deletions

View File

@@ -4,12 +4,15 @@ import (
"bufio"
"context"
"io"
"time"
)
const (
parsedLineBufferSize = 128
scannerBufferSize = 64 * 1024
maxScannerLineSize = 2 * 1024 * 1024
minFlushChars = 160
maxFlushWait = 80 * time.Millisecond
)
// StartParsedLinePump scans an upstream DeepSeek SSE body and emits normalized
@@ -20,21 +23,123 @@ func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bo
done := make(chan error, 1)
go func() {
defer close(out)
scanner := bufio.NewScanner(body)
scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize)
type scanItem struct {
line []byte
err error
eof bool
}
lineCh := make(chan scanItem, 1)
stopScanner := make(chan struct{})
defer close(stopScanner)
go func() {
sendScanItem := func(item scanItem) bool {
select {
case lineCh <- item:
return true
case <-ctx.Done():
return false
case <-stopScanner:
return false
}
}
defer close(lineCh)
scanner := bufio.NewScanner(body)
scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize)
for scanner.Scan() {
line := append([]byte{}, scanner.Bytes()...)
if !sendScanItem(scanItem{line: line}) {
return
}
}
_ = sendScanItem(scanItem{err: scanner.Err(), eof: true})
}()
ticker := time.NewTicker(maxFlushWait)
defer ticker.Stop()
currentType := initialType
for scanner.Scan() {
line := append([]byte{}, scanner.Bytes()...)
result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType)
currentType = result.NextType
var pending *LineResult
pendingChars := 0
sendResult := func(r LineResult) bool {
select {
case out <- r:
return true
case <-ctx.Done():
done <- ctx.Err()
return false
}
}
flushPending := func() bool {
if pending == nil {
return true
}
if !sendResult(*pending) {
return false
}
pending = nil
pendingChars = 0
return true
}
for {
select {
case out <- result:
case <-ctx.Done():
done <- ctx.Err()
return
case <-ticker.C:
if !flushPending() {
return
}
case item, ok := <-lineCh:
if !ok || item.eof {
if !flushPending() {
return
}
done <- item.err
return
}
line := item.line
result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType)
currentType = result.NextType
canAccumulate := result.Parsed && !result.Stop && result.ErrorMessage == "" && !result.ContentFilter && result.ResponseMessageID == 0
if canAccumulate {
lineChars := 0
for _, p := range result.Parts {
lineChars += len(p.Text)
}
for _, p := range result.ToolDetectionThinkingParts {
lineChars += len(p.Text)
}
if lineChars > 0 {
if pending == nil {
cp := result
pending = &cp
} else {
pending.Parts = append(pending.Parts, result.Parts...)
pending.ToolDetectionThinkingParts = append(pending.ToolDetectionThinkingParts, result.ToolDetectionThinkingParts...)
pending.NextType = result.NextType
}
pendingChars += lineChars
if pendingChars < minFlushChars {
continue
}
if !flushPending() {
return
}
continue
}
}
if !flushPending() {
return
}
if !sendResult(result) {
return
}
}
}
done <- scanner.Err()
}()
return out, done
}

View File

@@ -38,8 +38,8 @@ func TestStartParsedLinePumpMultipleLines(t *testing.T) {
if err := <-done; err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(collected) < 3 {
t.Fatalf("expected at least 3 results, got %d", len(collected))
if len(collected) < 2 {
t.Fatalf("expected at least 2 results, got %d", len(collected))
}
// First should be thinking
if collected[0].Parts[0].Type != "thinking" {
@@ -175,3 +175,31 @@ func TestStartParsedLinePumpThinkingDisabled(t *testing.T) {
t.Fatalf("expected at least 1 part, got %d", len(parts))
}
}
func TestStartParsedLinePumpAccumulatesSmallChunks(t *testing.T) {
body := strings.NewReader(
"data: {\"p\":\"response/content\",\"v\":\"h\"}\n" +
"data: {\"p\":\"response/content\",\"v\":\"i\"}\n" +
"data: [DONE]\n",
)
results, done := StartParsedLinePump(context.Background(), body, false, "text")
collected := make([]LineResult, 0)
for r := range results {
collected = append(collected, r)
}
if err := <-done; err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(collected) != 2 {
t.Fatalf("expected 2 results (accumulated content + done), got %d", len(collected))
}
if len(collected[0].Parts) != 2 {
t.Fatalf("expected 2 accumulated parts, got %d", len(collected[0].Parts))
}
if !collected[1].Stop {
t.Fatal("expected second result to stop")
}
}