diff --git a/internal/sse/stream.go b/internal/sse/stream.go index 188f28d..8b8aa9b 100644 --- a/internal/sse/stream.go +++ b/internal/sse/stream.go @@ -29,15 +29,29 @@ func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bo eof bool } lineCh := make(chan scanItem, 1) + stopScanner := make(chan struct{}) + defer close(stopScanner) go func() { + sendScanItem := func(item scanItem) bool { + select { + case lineCh <- item: + return true + case <-ctx.Done(): + return false + case <-stopScanner: + return false + } + } + defer close(lineCh) scanner := bufio.NewScanner(body) scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize) for scanner.Scan() { line := append([]byte{}, scanner.Bytes()...) - lineCh <- scanItem{line: line} + if !sendScanItem(scanItem{line: line}) { + return + } } - lineCh <- scanItem{err: scanner.Err(), eof: true} - close(lineCh) + _ = sendScanItem(scanItem{err: scanner.Err(), eof: true}) }() ticker := time.NewTicker(maxFlushWait)