Files
ds2api/internal/sse/stream.go

41 lines
1.1 KiB
Go

package sse
import (
"bufio"
"context"
"io"
)
const (
parsedLineBufferSize = 128
scannerBufferSize = 64 * 1024
maxScannerLineSize = 2 * 1024 * 1024
)
// StartParsedLinePump scans an upstream DeepSeek SSE body and emits normalized
// line parse results. It centralizes scanner setup + current fragment type
// tracking for all streaming adapters.
func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bool, initialType string) (<-chan LineResult, <-chan error) {
out := make(chan LineResult, parsedLineBufferSize)
done := make(chan error, 1)
go func() {
defer close(out)
scanner := bufio.NewScanner(body)
scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize)
currentType := initialType
for scanner.Scan() {
line := append([]byte{}, scanner.Bytes()...)
result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType)
currentType = result.NextType
select {
case out <- result:
case <-ctx.Done():
done <- ctx.Err()
return
}
}
done <- scanner.Err()
}()
return out, done
}