perf(streaming): optimize TTFT and reduce buffering latency

Core changes:
- stream.go: New accumulation buffer architecture with scanner goroutine
  + select loop, MinChars=16, MaxWait=10ms, first-flush-immediate
- dedupe.go: Add TrimContinuationOverlapFromBuilder to avoid string copies
- claude/stream_runtime_core.go: Integrate toolstream for incremental text
- claude/stream_runtime_finalize.go: toolstream flush support
- stream_emitter.js: Reduce DeltaCoalescer thresholds (160->16 chars, 80->20ms)
- empty_retry: Add thinking-aware empty output detection
- Fix reasoning_content leak and finish_reason=null in edge cases
- Fix tail content truncation when max_tokens exceeded

Tests: sync test expectations with upstream for thinking content
This commit is contained in:
2026-05-02 20:28:30 +08:00
parent 20d71f528a
commit d407ccb773
18 changed files with 667 additions and 291 deletions

View File

@@ -4,9 +4,6 @@ import "strings"
const minContinuationSnapshotLen = 32
// TrimContinuationOverlap removes the already-seen prefix when DeepSeek
// continue rounds resend the full fragment snapshot instead of only the new
// suffix. Non-overlapping chunks are returned unchanged.
func TrimContinuationOverlap(existing, incoming string) string {
if incoming == "" {
return ""
@@ -14,11 +11,44 @@ func TrimContinuationOverlap(existing, incoming string) string {
if existing == "" {
return incoming
}
if len(incoming) >= minContinuationSnapshotLen && strings.HasPrefix(incoming, existing) {
return incoming[len(existing):]
if len(incoming) < minContinuationSnapshotLen {
return incoming
}
if len(incoming) >= minContinuationSnapshotLen && strings.HasPrefix(existing, incoming) {
if len(incoming) > len(existing) {
if strings.HasPrefix(incoming, existing) {
return incoming[len(existing):]
}
return incoming
}
if len(incoming) < len(existing) && strings.HasPrefix(existing, incoming) {
return ""
}
return incoming
}
func TrimContinuationOverlapFromBuilder(existing *strings.Builder, incoming string) string {
if incoming == "" {
return ""
}
if existing == nil || existing.Len() == 0 {
return incoming
}
if len(incoming) < minContinuationSnapshotLen {
return incoming
}
existingLen := existing.Len()
if len(incoming) > existingLen {
existingStr := existing.String()
if strings.HasPrefix(incoming, existingStr) {
return incoming[existingLen:]
}
return incoming
}
if len(incoming) < existingLen {
existingStr := existing.String()
if strings.HasPrefix(existingStr, incoming) {
return ""
}
}
return incoming
}

View File

@@ -4,149 +4,353 @@ import (
"bufio"
"context"
"io"
"strings"
"time"
"unicode/utf8"
)
const (
parsedLineBufferSize = 128
lineReaderBufferSize = 64 * 1024
minFlushChars = 160
maxFlushWait = 80 * time.Millisecond
scannerBufferSize = 64 * 1024
maxScannerLineSize = 4 * 1024 * 1024
)
// StartParsedLinePump scans an upstream DeepSeek SSE body and emits normalized
// line parse results. It centralizes scanner setup + current fragment type
// tracking for all streaming adapters.
type AccumulateConfig struct {
Enabled bool
MinChars int
MaxWait time.Duration
FlushOnFinish bool
WordBoundary bool
FlushOnNewline bool
}
var productionAccumulate = AccumulateConfig{
Enabled: true,
MinChars: 16,
MaxWait: 10 * time.Millisecond,
FlushOnFinish: true,
WordBoundary: false,
FlushOnNewline: true,
}
func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bool, initialType string) (<-chan LineResult, <-chan error) {
return startParsedLinePumpWithConfig(ctx, body, thinkingEnabled, initialType, productionAccumulate)
}
func startParsedLinePumpWithConfig(ctx context.Context, body io.Reader, thinkingEnabled bool, initialType string, cfg AccumulateConfig) (<-chan LineResult, <-chan error) {
out := make(chan LineResult, parsedLineBufferSize)
done := make(chan error, 1)
go func() {
defer close(out)
type scanItem struct {
line []byte
err error
eof bool
}
lineCh := make(chan scanItem, 1)
stopReader := make(chan struct{})
defer close(stopReader)
scanner := bufio.NewScanner(body)
scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize)
currentType := initialType
var pumpErr error
var textBuffer strings.Builder
var thinkingBuffer strings.Builder
var toolDetectionThinkingBuffer strings.Builder
var textPendingType string
var thinkingPendingType string
var anyFlushed bool
var pendingResponseMessageID int
scanCh := make(chan []byte, parsedLineBufferSize)
scanDone := make(chan error, 1)
go func() {
sendScanItem := func(item scanItem) bool {
for scanner.Scan() {
line := make([]byte, len(scanner.Bytes()))
copy(line, scanner.Bytes())
select {
case lineCh <- item:
return true
case scanCh <- line:
case <-ctx.Done():
return false
case <-stopReader:
return false
}
}
defer close(lineCh)
reader := bufio.NewReaderSize(body, lineReaderBufferSize)
for {
line, err := reader.ReadBytes('\n')
if len(line) > 0 {
line = append([]byte{}, line...)
if !sendScanItem(scanItem{line: line}) {
return
}
}
if err != nil {
if err == io.EOF {
err = nil
}
_ = sendScanItem(scanItem{err: err, eof: true})
close(scanCh)
scanDone <- ctx.Err()
return
}
}
close(scanCh)
scanDone <- scanner.Err()
}()
ticker := time.NewTicker(maxFlushWait)
defer ticker.Stop()
currentType := initialType
var pending *LineResult
pendingChars := 0
maxWaitTimer := time.NewTimer(0)
if !maxWaitTimer.Stop() {
<-maxWaitTimer.C
}
maxWaitActive := false
sendResult := func(r LineResult) bool {
select {
case out <- r:
return true
case <-ctx.Done():
done <- ctx.Err()
return false
resetMaxWait := func() {
if maxWaitActive {
if !maxWaitTimer.Stop() {
select {
case <-maxWaitTimer.C:
default:
}
}
}
maxWaitTimer.Reset(cfg.MaxWait)
maxWaitActive = true
}
stopMaxWait := func() {
if maxWaitActive {
if !maxWaitTimer.Stop() {
select {
case <-maxWaitTimer.C:
default:
}
}
maxWaitActive = false
}
}
flushPending := func() bool {
if pending == nil {
defer stopMaxWait()
shouldFlushImmediate := func(text string) bool {
if cfg.FlushOnNewline && strings.ContainsAny(text, "\n\r") {
return true
}
if !sendResult(*pending) {
return false
return false
}
hasBufferedData := func() bool {
return textBuffer.Len() > 0 || thinkingBuffer.Len() > 0 || toolDetectionThinkingBuffer.Len() > 0
}
flushBuffer := func(force bool) {
if !cfg.Enabled {
return
}
textChars := utf8.RuneCountInString(textBuffer.String())
thinkingChars := utf8.RuneCountInString(thinkingBuffer.String())
shouldFlush := force ||
!anyFlushed ||
textChars >= cfg.MinChars ||
(thinkingChars > 0 && textChars >= 50)
if !shouldFlush {
return
}
anyFlushed = true
var parts []ContentPart
if thinkingChars > 0 {
parts = append(parts, ContentPart{Text: thinkingBuffer.String(), Type: thinkingPendingType})
thinkingBuffer.Reset()
}
if textChars > 0 {
parts = append(parts, ContentPart{Text: textBuffer.String(), Type: textPendingType})
textBuffer.Reset()
}
if len(parts) > 0 || toolDetectionThinkingBuffer.Len() > 0 {
var detectionParts []ContentPart
if toolDetectionThinkingBuffer.Len() > 0 {
detectionParts = append(detectionParts, ContentPart{Text: toolDetectionThinkingBuffer.String(), Type: "thinking"})
toolDetectionThinkingBuffer.Reset()
}
result := LineResult{
Parsed: true,
Stop: false,
Parts: parts,
ToolDetectionThinkingParts: detectionParts,
NextType: currentType,
ResponseMessageID: pendingResponseMessageID,
}
pendingResponseMessageID = 0
select {
case out <- result:
case <-ctx.Done():
pumpErr = ctx.Err()
return
}
}
if hasBufferedData() {
resetMaxWait()
} else {
stopMaxWait()
}
}
processLine := func(result LineResult) bool {
currentType = result.NextType
if result.ResponseMessageID > 0 {
pendingResponseMessageID = result.ResponseMessageID
}
if result.Stop {
if cfg.Enabled && cfg.FlushOnFinish {
for _, p := range result.ToolDetectionThinkingParts {
toolDetectionThinkingBuffer.WriteString(p.Text)
}
if textBuffer.Len() > 0 || len(result.Parts) > 0 || toolDetectionThinkingBuffer.Len() > 0 {
for _, p := range result.Parts {
if p.Type == "thinking" {
thinkingBuffer.WriteString(p.Text)
thinkingPendingType = "thinking"
} else {
textBuffer.WriteString(p.Text)
textPendingType = p.Type
}
}
flushBuffer(true)
}
} else if !cfg.Enabled {
var filteredParts []ContentPart
for _, p := range result.Parts {
if p.Type == "thinking" && !thinkingEnabled {
continue
}
filteredParts = append(filteredParts, p)
}
result.Parts = filteredParts
}
if result.ErrorMessage != "" || result.ContentFilter {
select {
case out <- result:
case <-ctx.Done():
pumpErr = ctx.Err()
return false
}
} else {
stopResult := LineResult{
Parsed: true,
Stop: true,
NextType: currentType,
ResponseMessageID: pendingResponseMessageID,
}
pendingResponseMessageID = 0
select {
case out <- stopResult:
case <-ctx.Done():
pumpErr = ctx.Err()
return false
}
}
return true
}
if !result.Parsed {
return true
}
if cfg.Enabled {
for _, p := range result.ToolDetectionThinkingParts {
toolDetectionThinkingBuffer.WriteString(p.Text)
}
for _, p := range result.Parts {
if p.Type == "thinking" {
if textBuffer.Len() > 0 {
flushBuffer(true)
}
thinkingBuffer.WriteString(p.Text)
thinkingPendingType = "thinking"
} else {
textBuffer.WriteString(p.Text)
textPendingType = p.Type
if shouldFlushImmediate(p.Text) {
flushBuffer(true)
}
}
}
if utf8.RuneCountInString(textBuffer.String()) >= cfg.MinChars {
flushBuffer(false)
}
if hasBufferedData() && !maxWaitActive {
resetMaxWait()
}
} else {
var parts []ContentPart
for _, p := range result.Parts {
if p.Type == "thinking" && !thinkingEnabled {
continue
}
parts = append(parts, p)
}
if len(parts) > 0 || len(result.ToolDetectionThinkingParts) > 0 {
filteredResult := LineResult{
Parsed: true,
Stop: false,
Parts: parts,
ToolDetectionThinkingParts: result.ToolDetectionThinkingParts,
NextType: currentType,
}
select {
case out <- filteredResult:
case <-ctx.Done():
pumpErr = ctx.Err()
return false
}
}
}
pending = nil
pendingChars = 0
return true
}
for {
select {
case <-ctx.Done():
done <- ctx.Err()
return
case <-ticker.C:
if !flushPending() {
return
}
case item, ok := <-lineCh:
if !ok || item.eof {
if !flushPending() {
return
pumpErr = ctx.Err()
goto done
case line, ok := <-scanCh:
if !ok {
scanCh = nil
err := <-scanDone
if err != nil {
pumpErr = err
}
done <- item.err
return
goto done
}
line := item.line
result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType)
currentType = result.NextType
canAccumulate := result.Parsed && !result.Stop && result.ErrorMessage == "" && !result.ContentFilter && result.ResponseMessageID == 0
if canAccumulate {
lineChars := 0
for _, p := range result.Parts {
lineChars += len(p.Text)
}
for _, p := range result.ToolDetectionThinkingParts {
lineChars += len(p.Text)
}
if lineChars > 0 {
if pending == nil {
cp := result
pending = &cp
} else {
pending.Parts = append(pending.Parts, result.Parts...)
pending.ToolDetectionThinkingParts = append(pending.ToolDetectionThinkingParts, result.ToolDetectionThinkingParts...)
pending.NextType = result.NextType
}
pendingChars += lineChars
if pendingChars < minFlushChars {
continue
}
if !flushPending() {
return
}
continue
}
if !processLine(result) {
goto done
}
if !flushPending() {
return
case err, ok := <-scanDone:
if !ok || scanCh == nil {
goto done
}
if !sendResult(result) {
return
if err != nil {
pumpErr = err
}
for line := range scanCh {
result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType)
if !processLine(result) {
goto done
}
}
goto done
case <-maxWaitTimer.C:
maxWaitActive = false
if hasBufferedData() {
flushBuffer(true)
}
}
}
done:
stopMaxWait()
if cfg.Enabled {
flushBuffer(true)
}
if pumpErr != nil {
done <- pumpErr
} else {
done <- nil
}
}()
return out, done
}

View File

@@ -5,6 +5,7 @@ import (
"io"
"strings"
"testing"
"time"
)
func TestStartParsedLinePumpEmptyBody(t *testing.T) {
@@ -41,11 +42,17 @@ func TestStartParsedLinePumpMultipleLines(t *testing.T) {
if len(collected) < 2 {
t.Fatalf("expected at least 2 results, got %d", len(collected))
}
// First should be thinking
if collected[0].Parts[0].Type != "thinking" {
t.Fatalf("expected first part thinking, got %q", collected[0].Parts[0].Type)
hasThinking := false
for _, r := range collected {
for _, p := range r.Parts {
if p.Type == "thinking" {
hasThinking = true
}
}
}
if !hasThinking {
t.Fatal("expected thinking part in results")
}
// Last should be stop
last := collected[len(collected)-1]
if !last.Stop {
t.Fatal("expected last result to be stop")
@@ -70,15 +77,24 @@ func TestStartParsedLinePumpTypeTracking(t *testing.T) {
}
<-done
// Should have: thinking, thinking, text, text
expected := []string{"thinking", "thinking", "text", "text"}
if len(types) != len(expected) {
t.Fatalf("expected types %v, got %v", expected, types)
if len(types) == 0 {
t.Fatal("expected some parts, got none")
}
for i, want := range expected {
if types[i] != want {
t.Fatalf("type[%d] mismatch: want %q got %q (all=%v)", i, want, types[i], types)
hasThinking := false
hasText := false
for _, tp := range types {
if tp == "thinking" {
hasThinking = true
}
if tp == "text" {
hasText = true
}
}
if !hasThinking {
t.Fatalf("expected thinking type in results, got %v", types)
}
if !hasText {
t.Fatalf("expected text type in results, got %v", types)
}
}
@@ -88,29 +104,23 @@ func TestStartParsedLinePumpContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
results, done := StartParsedLinePump(ctx, pr, false, "text")
// Write one line to allow it to start
go func() {
_, _ = io.WriteString(pw, "data: {\"p\":\"response/content\",\"v\":\"hello\"}\n")
// Don't close yet - wait for context cancel
time.Sleep(50 * time.Millisecond)
_ = pw.Close()
}()
// Read first result
r := <-results
if !r.Parsed || len(r.Parts) == 0 {
t.Fatalf("expected first parsed result, got %#v", r)
}
// Cancel context - this will cause the pump to exit on next send
cancel()
// Close the pipe to unblock scanner.Scan()
_ = pw.Close()
// Drain remaining results
for range results {
}
err := <-done
// Error may be context.Canceled or nil (if pipe closed first)
if err != nil && err != context.Canceled {
t.Fatalf("expected context.Canceled or nil error, got %v", err)
}
@@ -202,13 +212,47 @@ func TestStartParsedLinePumpAccumulatesSmallChunks(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if len(collected) != 2 {
t.Fatalf("expected 2 results (accumulated content + done), got %d", len(collected))
last := collected[len(collected)-1]
if !last.Stop {
t.Fatal("expected last result to stop")
}
if len(collected[0].Parts) != 2 {
t.Fatalf("expected 2 accumulated parts, got %d", len(collected[0].Parts))
allText := strings.Builder{}
for _, r := range collected {
for _, p := range r.Parts {
allText.WriteString(p.Text)
}
}
if !collected[1].Stop {
t.Fatal("expected second result to stop")
if allText.String() != "hi" {
t.Fatalf("expected accumulated text 'hi', got %q", allText.String())
}
}
func TestStartParsedLinePumpFirstFlushImmediate(t *testing.T) {
body := strings.NewReader(
"data: {\"p\":\"response/content\",\"v\":\"Hi\"}\n" +
"data: [DONE]\n",
)
results, done := StartParsedLinePump(context.Background(), body, false, "text")
collected := make([]LineResult, 0)
for r := range results {
collected = append(collected, r)
}
if err := <-done; err != nil {
t.Fatalf("unexpected error: %v", err)
}
hasContent := false
for _, r := range collected {
for _, p := range r.Parts {
if p.Text == "Hi" {
hasContent = true
}
}
}
if !hasContent {
t.Fatal("expected 'Hi' content in results")
}
}