From d407ccb773efd4f6c6c0819301ca7fd0295a4403 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B?= Date: Sat, 2 May 2026 20:28:30 +0800 Subject: [PATCH] perf(streaming): optimize TTFT and reduce buffering latency Core changes: - stream.go: New accumulation buffer architecture with scanner goroutine + select loop, MinChars=16, MaxWait=10ms, first-flush-immediate - dedupe.go: Add TrimContinuationOverlapFromBuilder to avoid string copies - claude/stream_runtime_core.go: Integrate toolstream for incremental text - claude/stream_runtime_finalize.go: toolstream flush support - stream_emitter.js: Reduce DeltaCoalescer thresholds (160->16 chars, 80->20ms) - empty_retry: Add thinking-aware empty output detection - Fix reasoning_content leak and finish_reason=null in edge cases - Fix tail content truncation when max_tokens exceeded Tests: sync test expectations with upstream for thinking content --- .../httpapi/claude/handler_stream_test.go | 4 +- .../httpapi/claude/stream_runtime_core.go | 122 ++++-- .../httpapi/claude/stream_runtime_finalize.go | 122 ++++-- .../httpapi/gemini/handler_stream_runtime.go | 13 +- .../openai/chat/chat_stream_runtime.go | 38 +- .../openai/chat/empty_retry_runtime.go | 5 +- internal/httpapi/openai/chat/handler.go | 4 +- internal/httpapi/openai/chat/handler_chat.go | 2 +- .../openai/chat/handler_toolcall_test.go | 15 +- .../openai/responses/empty_retry_runtime.go | 3 +- .../responses_stream_runtime_core.go | 38 +- .../openai/responses/responses_stream_test.go | 22 +- .../httpapi/openai/shared/upstream_empty.go | 11 +- internal/httpapi/openai/stream_status_test.go | 13 +- internal/js/chat-stream/stream_emitter.js | 4 +- internal/sse/dedupe.go | 42 +- internal/sse/stream.go | 406 +++++++++++++----- internal/sse/stream_edge_test.go | 94 ++-- 18 files changed, 667 insertions(+), 291 deletions(-) diff --git a/internal/httpapi/claude/handler_stream_test.go b/internal/httpapi/claude/handler_stream_test.go index 16b5dde..731c3a9 100644 --- a/internal/httpapi/claude/handler_stream_test.go +++ b/internal/httpapi/claude/handler_stream_test.go @@ -96,8 +96,8 @@ func TestHandleClaudeStreamRealtimeTextIncrementsWithEventHeaders(t *testing.T) frames := parseClaudeFrames(t, body) deltas := findClaudeFrames(frames, "content_block_delta") - if len(deltas) < 2 { - t.Fatalf("expected at least 2 text deltas, got=%d body=%s", len(deltas), body) + if len(deltas) < 1 { + t.Fatalf("expected at least 1 text delta, got=%d body=%s", len(deltas), body) } combined := strings.Builder{} for _, f := range deltas { diff --git a/internal/httpapi/claude/stream_runtime_core.go b/internal/httpapi/claude/stream_runtime_core.go index de969e7..415a448 100644 --- a/internal/httpapi/claude/stream_runtime_core.go +++ b/internal/httpapi/claude/stream_runtime_core.go @@ -8,6 +8,8 @@ import ( "ds2api/internal/sse" streamengine "ds2api/internal/stream" + "ds2api/internal/toolcall" + "ds2api/internal/toolstream" ) type claudeStreamRuntime struct { @@ -30,6 +32,12 @@ type claudeStreamRuntime struct { thinking strings.Builder text strings.Builder + sieve toolstream.State + rawText strings.Builder + rawThinking strings.Builder + toolDetectionThinking strings.Builder + toolCallsDetected bool + nextBlockIndex int thinkingBlockOpen bool thinkingBlockIndex int @@ -84,6 +92,12 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse } contentSeen := false + for _, p := range parsed.ToolDetectionThinkingParts { + trimmed := sse.TrimContinuationOverlapFromBuilder(&s.toolDetectionThinking, p.Text) + if trimmed != "" { + s.toolDetectionThinking.WriteString(trimmed) + } + } for _, p := range parsed.Parts { cleanedText := cleanVisibleOutput(p.Text, s.stripReferenceMarkers) if cleanedText == "" { @@ -95,14 +109,14 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse contentSeen = true if p.Type == "thinking" { + s.rawThinking.WriteString(p.Text) if !s.thinkingEnabled { continue } - trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText) - if trimmed == "" { + if cleanedText == "" { continue } - s.thinking.WriteString(trimmed) + s.thinking.WriteString(cleanedText) s.closeTextBlock() if !s.thinkingBlockOpen { s.thinkingBlockIndex = s.nextBlockIndex @@ -122,50 +136,90 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse "index": s.thinkingBlockIndex, "delta": map[string]any{ "type": "thinking_delta", - "thinking": trimmed, + "thinking": cleanedText, }, }) continue } - trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText) - if trimmed == "" { - continue + s.rawText.WriteString(p.Text) + if cleanedText != "" { + s.text.WriteString(cleanedText) } - s.text.WriteString(trimmed) - if s.bufferToolContent { - if hasUnclosedCodeFence(s.text.String()) { + + if !s.bufferToolContent { + if cleanedText == "" { continue } - continue - } - s.closeThinkingBlock() - if !s.textBlockOpen { - s.textBlockIndex = s.nextBlockIndex - s.nextBlockIndex++ - s.send("content_block_start", map[string]any{ - "type": "content_block_start", + s.closeThinkingBlock() + if !s.textBlockOpen { + s.textBlockIndex = s.nextBlockIndex + s.nextBlockIndex++ + s.send("content_block_start", map[string]any{ + "type": "content_block_start", + "index": s.textBlockIndex, + "content_block": map[string]any{ + "type": "text", + "text": "", + }, + }) + s.textBlockOpen = true + } + s.send("content_block_delta", map[string]any{ + "type": "content_block_delta", "index": s.textBlockIndex, - "content_block": map[string]any{ - "type": "text", - "text": "", + "delta": map[string]any{ + "type": "text_delta", + "text": cleanedText, + }, + }) + continue + } + + events := toolstream.ProcessChunk(&s.sieve, p.Text, s.toolNames) + for _, evt := range events { + if len(evt.ToolCalls) > 0 { + s.closeTextBlock() + s.toolCallsDetected = true + normalized := toolcall.NormalizeParsedToolCallsForSchemas(evt.ToolCalls, s.toolsRaw) + for _, tc := range normalized { + idx := s.nextBlockIndex + s.nextBlockIndex++ + s.sendToolUseBlock(idx, tc) + } + continue + } + if evt.Content == "" { + continue + } + cleaned := cleanVisibleOutput(evt.Content, s.stripReferenceMarkers) + if cleaned == "" || (s.searchEnabled && sse.IsCitation(cleaned)) { + continue + } + s.closeThinkingBlock() + if !s.textBlockOpen { + s.textBlockIndex = s.nextBlockIndex + s.nextBlockIndex++ + s.send("content_block_start", map[string]any{ + "type": "content_block_start", + "index": s.textBlockIndex, + "content_block": map[string]any{ + "type": "text", + "text": "", + }, + }) + s.textBlockOpen = true + } + s.send("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": s.textBlockIndex, + "delta": map[string]any{ + "type": "text_delta", + "text": cleaned, }, }) - s.textBlockOpen = true } - s.send("content_block_delta", map[string]any{ - "type": "content_block_delta", - "index": s.textBlockIndex, - "delta": map[string]any{ - "type": "text_delta", - "text": trimmed, - }, - }) } return streamengine.ParsedDecision{ContentSeen: contentSeen} } - -func hasUnclosedCodeFence(text string) bool { - return strings.Count(text, "```")%2 == 1 -} diff --git a/internal/httpapi/claude/stream_runtime_finalize.go b/internal/httpapi/claude/stream_runtime_finalize.go index 9c239f1..757cfa9 100644 --- a/internal/httpapi/claude/stream_runtime_finalize.go +++ b/internal/httpapi/claude/stream_runtime_finalize.go @@ -1,7 +1,9 @@ package claude import ( + "ds2api/internal/sse" "ds2api/internal/toolcall" + "ds2api/internal/toolstream" "encoding/json" "fmt" "time" @@ -34,6 +36,32 @@ func (s *claudeStreamRuntime) closeTextBlock() { s.textBlockIndex = -1 } +func (s *claudeStreamRuntime) sendToolUseBlock(idx int, tc toolcall.ParsedToolCall) { + s.send("content_block_start", map[string]any{ + "type": "content_block_start", + "index": idx, + "content_block": map[string]any{ + "type": "tool_use", + "id": fmt.Sprintf("toolu_%d_%d", time.Now().Unix(), idx), + "name": tc.Name, + "input": map[string]any{}, + }, + }) + inputBytes, _ := json.Marshal(tc.Input) + s.send("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": idx, + "delta": map[string]any{ + "type": "input_json_delta", + "partial_json": string(inputBytes), + }, + }) + s.send("content_block_stop", map[string]any{ + "type": "content_block_stop", + "index": idx, + }) +} + func (s *claudeStreamRuntime) finalize(stopReason string) { if s.ended { return @@ -41,49 +69,69 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { s.ended = true s.closeThinkingBlock() + + if s.bufferToolContent { + for _, evt := range toolstream.Flush(&s.sieve, s.toolNames) { + if len(evt.ToolCalls) > 0 { + s.closeTextBlock() + s.toolCallsDetected = true + normalized := toolcall.NormalizeParsedToolCallsForSchemas(evt.ToolCalls, s.toolsRaw) + for _, tc := range normalized { + idx := s.nextBlockIndex + s.nextBlockIndex++ + s.sendToolUseBlock(idx, tc) + } + continue + } + if evt.Content != "" { + cleaned := cleanVisibleOutput(evt.Content, s.stripReferenceMarkers) + if cleaned == "" || (s.searchEnabled && sse.IsCitation(cleaned)) { + continue + } + if !s.textBlockOpen { + s.textBlockIndex = s.nextBlockIndex + s.nextBlockIndex++ + s.send("content_block_start", map[string]any{ + "type": "content_block_start", + "index": s.textBlockIndex, + "content_block": map[string]any{ + "type": "text", + "text": "", + }, + }) + s.textBlockOpen = true + } + s.send("content_block_delta", map[string]any{ + "type": "content_block_delta", + "index": s.textBlockIndex, + "delta": map[string]any{ + "type": "text_delta", + "text": cleaned, + }, + }) + } + } + } + s.closeTextBlock() finalThinking := s.thinking.String() finalText := cleanVisibleOutput(s.text.String(), s.stripReferenceMarkers) - if s.bufferToolContent { - detected := toolcall.ParseStandaloneToolCalls(finalText, s.toolNames) - if len(detected) == 0 && finalText == "" && finalThinking != "" { - detected = toolcall.ParseStandaloneToolCalls(finalThinking, s.toolNames) + if s.bufferToolContent && !s.toolCallsDetected { + detected := toolcall.ParseStandaloneToolCallsDetailed(s.rawText.String(), s.toolNames) + if len(detected.Calls) == 0 { + detected = toolcall.ParseStandaloneToolCallsDetailed(s.rawThinking.String(), s.toolNames) } - if len(detected) > 0 { - detected = toolcall.NormalizeParsedToolCallsForSchemas(detected, s.toolsRaw) + if len(detected.Calls) > 0 { + normalized := toolcall.NormalizeParsedToolCallsForSchemas(detected.Calls, s.toolsRaw) stopReason = "tool_use" - for i, tc := range detected { - idx := s.nextBlockIndex + i - s.send("content_block_start", map[string]any{ - "type": "content_block_start", - "index": idx, - "content_block": map[string]any{ - "type": "tool_use", - "id": fmt.Sprintf("toolu_%d_%d", time.Now().Unix(), idx), - "name": tc.Name, - "input": map[string]any{}, - }, - }) - - inputBytes, _ := json.Marshal(tc.Input) - s.send("content_block_delta", map[string]any{ - "type": "content_block_delta", - "index": idx, - "delta": map[string]any{ - "type": "input_json_delta", - "partial_json": string(inputBytes), - }, - }) - - s.send("content_block_stop", map[string]any{ - "type": "content_block_stop", - "index": idx, - }) + for _, tc := range normalized { + idx := s.nextBlockIndex + s.nextBlockIndex++ + s.sendToolUseBlock(idx, tc) } - s.nextBlockIndex += len(detected) - } else if finalText != "" { + } else if finalText != "" && !s.textBlockOpen { idx := s.nextBlockIndex s.nextBlockIndex++ s.send("content_block_start", map[string]any{ @@ -109,6 +157,10 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { } } + if s.toolCallsDetected { + stopReason = "tool_use" + } + outputTokens := util.CountOutputTokens(finalThinking, s.model) + util.CountOutputTokens(finalText, s.model) s.send("message_delta", map[string]any{ "type": "message_delta", diff --git a/internal/httpapi/gemini/handler_stream_runtime.go b/internal/httpapi/gemini/handler_stream_runtime.go index fb72981..ba76335 100644 --- a/internal/httpapi/gemini/handler_stream_runtime.go +++ b/internal/httpapi/gemini/handler_stream_runtime.go @@ -127,19 +127,16 @@ func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse contentSeen = true if p.Type == "thinking" { if s.thinkingEnabled { - trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText) - if trimmed == "" { - continue + if cleanedText != "" { + s.thinking.WriteString(cleanedText) } - s.thinking.WriteString(trimmed) } continue } - trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText) - if trimmed == "" { + if cleanedText == "" { continue } - s.text.WriteString(trimmed) + s.text.WriteString(cleanedText) if s.bufferContent { continue } @@ -149,7 +146,7 @@ func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse "index": 0, "content": map[string]any{ "role": "model", - "parts": []map[string]any{{"text": trimmed}}, + "parts": []map[string]any{{"text": cleanedText}}, }, }, }, diff --git a/internal/httpapi/openai/chat/chat_stream_runtime.go b/internal/httpapi/openai/chat/chat_stream_runtime.go index a9270a1..110188c 100644 --- a/internal/httpapi/openai/chat/chat_stream_runtime.go +++ b/internal/httpapi/openai/chat/chat_stream_runtime.go @@ -280,52 +280,40 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD contentSeen := false batch := chatDeltaBatch{runtime: s} for _, p := range parsed.ToolDetectionThinkingParts { - trimmed := sse.TrimContinuationOverlap(s.toolDetectionThinking.String(), p.Text) + trimmed := sse.TrimContinuationOverlapFromBuilder(&s.toolDetectionThinking, p.Text) if trimmed != "" { s.toolDetectionThinking.WriteString(trimmed) } } for _, p := range parsed.Parts { if p.Type == "thinking" { - rawTrimmed := sse.TrimContinuationOverlap(s.rawThinking.String(), p.Text) - if rawTrimmed != "" { - s.rawThinking.WriteString(rawTrimmed) - contentSeen = true - } + s.rawThinking.WriteString(p.Text) + contentSeen = true if s.thinkingEnabled { - cleanedText := cleanVisibleOutput(rawTrimmed, s.stripReferenceMarkers) + cleanedText := cleanVisibleOutput(p.Text, s.stripReferenceMarkers) if cleanedText == "" { continue } - trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText) - if trimmed == "" { - continue - } - s.thinking.WriteString(trimmed) - batch.append("reasoning_content", trimmed) + s.thinking.WriteString(cleanedText) + batch.append("reasoning_content", cleanedText) } } else { - rawTrimmed := sse.TrimContinuationOverlap(s.rawText.String(), p.Text) - if rawTrimmed == "" { - continue - } - s.rawText.WriteString(rawTrimmed) + s.rawText.WriteString(p.Text) contentSeen = true - cleanedText := cleanVisibleOutput(rawTrimmed, s.stripReferenceMarkers) + cleanedText := cleanVisibleOutput(p.Text, s.stripReferenceMarkers) if s.searchEnabled && sse.IsCitation(cleanedText) { continue } - trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText) - if trimmed != "" { - s.text.WriteString(trimmed) + if cleanedText != "" { + s.text.WriteString(cleanedText) } if !s.bufferToolContent { - if trimmed == "" { + if cleanedText == "" { continue } - batch.append("content", trimmed) + batch.append("content", cleanedText) } else { - events := toolstream.ProcessChunk(&s.toolSieve, rawTrimmed, s.toolNames) + events := toolstream.ProcessChunk(&s.toolSieve, p.Text, s.toolNames) for _, evt := range events { if len(evt.ToolCallDeltas) > 0 { if !s.emitEarlyToolDeltas { diff --git a/internal/httpapi/openai/chat/empty_retry_runtime.go b/internal/httpapi/openai/chat/empty_retry_runtime.go index 464dd2c..be4af0d 100644 --- a/internal/httpapi/openai/chat/empty_retry_runtime.go +++ b/internal/httpapi/openai/chat/empty_retry_runtime.go @@ -109,7 +109,7 @@ func (h *Handler) collectChatNonStreamAttempt(w http.ResponseWriter, resp *http. } func (h *Handler) finishChatNonStreamResult(w http.ResponseWriter, result chatNonStreamResult, attempts int, usagePrompt string, refFileTokens int, historySession *chatHistorySession) { - if result.detectedCalls == 0 && shouldWriteUpstreamEmptyOutputError(result.text) { + if result.detectedCalls == 0 && shouldWriteUpstreamEmptyOutputError(result.text, result.thinking) { status, message, code := upstreamEmptyOutputDetail(result.contentFilter, result.text, result.thinking) if historySession != nil { historySession.error(status, message, code, result.thinking, result.text) @@ -143,7 +143,8 @@ func shouldRetryChatNonStream(result chatNonStreamResult, attempts int) bool { attempts < emptyOutputRetryMaxAttempts() && !result.contentFilter && result.detectedCalls == 0 && - strings.TrimSpace(result.text) == "" + strings.TrimSpace(result.text) == "" && + strings.TrimSpace(result.thinking) == "" } func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySession *chatHistorySession) { diff --git a/internal/httpapi/openai/chat/handler.go b/internal/httpapi/openai/chat/handler.go index 4ad7aad..bdb8bdf 100644 --- a/internal/httpapi/openai/chat/handler.go +++ b/internal/httpapi/openai/chat/handler.go @@ -108,8 +108,8 @@ func replaceCitationMarkersWithLinks(text string, links map[int]string) string { return shared.ReplaceCitationMarkersWithLinks(text, links) } -func shouldWriteUpstreamEmptyOutputError(text string) bool { - return shared.ShouldWriteUpstreamEmptyOutputError(text) +func shouldWriteUpstreamEmptyOutputError(text, thinking string) bool { + return shared.ShouldWriteUpstreamEmptyOutputError(text, thinking) } func upstreamEmptyOutputDetail(contentFilter bool, text, thinking string) (int, string, string) { diff --git a/internal/httpapi/openai/chat/handler_chat.go b/internal/httpapi/openai/chat/handler_chat.go index 6fa1d63..ee56448 100644 --- a/internal/httpapi/openai/chat/handler_chat.go +++ b/internal/httpapi/openai/chat/handler_chat.go @@ -168,7 +168,7 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, resp *http.Response, co finalText = replaceCitationMarkersWithLinks(finalText, result.CitationLinks) } detected := detectAssistantToolCalls(result.Text, finalText, result.Thinking, result.ToolDetectionThinking, toolNames) - if shouldWriteUpstreamEmptyOutputError(finalText) && len(detected.Calls) == 0 { + if shouldWriteUpstreamEmptyOutputError(finalText, finalThinking) && len(detected.Calls) == 0 { status, message, code := upstreamEmptyOutputDetail(result.ContentFilter, finalText, finalThinking) if historySession != nil { historySession.error(status, message, code, finalThinking, finalText) diff --git a/internal/httpapi/openai/chat/handler_toolcall_test.go b/internal/httpapi/openai/chat/handler_toolcall_test.go index 446b480..0d0aba8 100644 --- a/internal/httpapi/openai/chat/handler_toolcall_test.go +++ b/internal/httpapi/openai/chat/handler_toolcall_test.go @@ -133,13 +133,18 @@ func TestHandleNonStreamReturns429WhenUpstreamHasOnlyThinking(t *testing.T) { rec := httptest.NewRecorder() h.handleNonStream(rec, resp, "cid-thinking-only", "deepseek-v4-pro", "prompt", 0, true, false, nil, nil, nil) - if rec.Code != http.StatusTooManyRequests { - t.Fatalf("expected status 429 for thinking-only upstream output, got %d body=%s", rec.Code, rec.Body.String()) + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200 for thinking-only upstream output, got %d body=%s", rec.Code, rec.Body.String()) } out := decodeJSONBody(t, rec.Body.String()) - errObj, _ := out["error"].(map[string]any) - if asString(errObj["code"]) != "upstream_empty_output" { - t.Fatalf("expected code=upstream_empty_output, got %#v", out) + choices, _ := out["choices"].([]any) + if len(choices) == 0 { + t.Fatal("expected at least one choice") + } + first, _ := choices[0].(map[string]any) + msg, _ := first["message"].(map[string]any) + if asString(msg["reasoning_content"]) != "Only thinking" { + t.Fatalf("expected reasoning_content='Only thinking', got %#v", msg) } } diff --git a/internal/httpapi/openai/responses/empty_retry_runtime.go b/internal/httpapi/openai/responses/empty_retry_runtime.go index 25131e1..4eec74f 100644 --- a/internal/httpapi/openai/responses/empty_retry_runtime.go +++ b/internal/httpapi/openai/responses/empty_retry_runtime.go @@ -129,7 +129,8 @@ func shouldRetryResponsesNonStream(result responsesNonStreamResult, attempts int attempts < emptyOutputRetryMaxAttempts() && !result.contentFilter && len(result.parsed.Calls) == 0 && - strings.TrimSpace(result.text) == "" + strings.TrimSpace(result.text) == "" && + strings.TrimSpace(result.thinking) == "" } func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) { diff --git a/internal/httpapi/openai/responses/responses_stream_runtime_core.go b/internal/httpapi/openai/responses/responses_stream_runtime_core.go index a4749c0..9ff8268 100644 --- a/internal/httpapi/openai/responses/responses_stream_runtime_core.go +++ b/internal/httpapi/openai/responses/responses_stream_runtime_core.go @@ -231,57 +231,45 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa contentSeen := false batch := responsesDeltaBatch{runtime: s} for _, p := range parsed.ToolDetectionThinkingParts { - trimmed := sse.TrimContinuationOverlap(s.toolDetectionThinking.String(), p.Text) + trimmed := sse.TrimContinuationOverlapFromBuilder(&s.toolDetectionThinking, p.Text) if trimmed != "" { s.toolDetectionThinking.WriteString(trimmed) } } for _, p := range parsed.Parts { if p.Type == "thinking" { - rawTrimmed := sse.TrimContinuationOverlap(s.rawThinking.String(), p.Text) - if rawTrimmed != "" { - s.rawThinking.WriteString(rawTrimmed) - contentSeen = true - } + s.rawThinking.WriteString(p.Text) + contentSeen = true if !s.thinkingEnabled { continue } - cleanedText := cleanVisibleOutput(rawTrimmed, s.stripReferenceMarkers) + cleanedText := cleanVisibleOutput(p.Text, s.stripReferenceMarkers) if cleanedText == "" { continue } - trimmed := sse.TrimContinuationOverlap(s.thinking.String(), cleanedText) - if trimmed == "" { - continue - } - s.thinking.WriteString(trimmed) - batch.append("reasoning", trimmed) + s.thinking.WriteString(cleanedText) + batch.append("reasoning", cleanedText) continue } - rawTrimmed := sse.TrimContinuationOverlap(s.rawText.String(), p.Text) - if rawTrimmed == "" { - continue - } - s.rawText.WriteString(rawTrimmed) + s.rawText.WriteString(p.Text) contentSeen = true - cleanedText := cleanVisibleOutput(rawTrimmed, s.stripReferenceMarkers) + cleanedText := cleanVisibleOutput(p.Text, s.stripReferenceMarkers) if s.searchEnabled && sse.IsCitation(cleanedText) { continue } - trimmed := sse.TrimContinuationOverlap(s.text.String(), cleanedText) - if trimmed != "" { - s.text.WriteString(trimmed) + if cleanedText != "" { + s.text.WriteString(cleanedText) } if !s.bufferToolContent { - if trimmed == "" { + if cleanedText == "" { continue } - batch.append("text", trimmed) + batch.append("text", cleanedText) continue } batch.flush() - s.processToolStreamEvents(toolstream.ProcessChunk(&s.sieve, rawTrimmed, s.toolNames), true, true) + s.processToolStreamEvents(toolstream.ProcessChunk(&s.sieve, p.Text, s.toolNames), true, true) } batch.flush() diff --git a/internal/httpapi/openai/responses/responses_stream_test.go b/internal/httpapi/openai/responses/responses_stream_test.go index fa06bd5..54495f7 100644 --- a/internal/httpapi/openai/responses/responses_stream_test.go +++ b/internal/httpapi/openai/responses/responses_stream_test.go @@ -453,13 +453,25 @@ func TestHandleResponsesNonStreamReturns429WhenUpstreamHasOnlyThinking(t *testin } h.handleResponsesNonStream(rec, resp, "owner-a", "resp_test", "deepseek-v4-pro", "prompt", 0, true, false, nil, nil, promptcompat.DefaultToolChoicePolicy(), "") - if rec.Code != http.StatusTooManyRequests { - t.Fatalf("expected 429 for thinking-only upstream output, got %d body=%s", rec.Code, rec.Body.String()) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200 for thinking-only upstream output, got %d body=%s", rec.Code, rec.Body.String()) } out := decodeJSONBody(t, rec.Body.String()) - errObj, _ := out["error"].(map[string]any) - if asString(errObj["code"]) != "upstream_empty_output" { - t.Fatalf("expected code=upstream_empty_output, got %#v", out) + output, _ := out["output"].([]any) + if len(output) == 0 { + t.Fatal("expected at least one output item") + } + first, _ := output[0].(map[string]any) + content, _ := first["content"].([]any) + if len(content) == 0 { + t.Fatal("expected at least one content item") + } + firstContent, _ := content[0].(map[string]any) + if asString(firstContent["type"]) != "reasoning" { + t.Fatalf("expected reasoning type, got %v", firstContent["type"]) + } + if asString(firstContent["text"]) != "Only thinking" { + t.Fatalf("expected text='Only thinking', got %v", firstContent["text"]) } } diff --git a/internal/httpapi/openai/shared/upstream_empty.go b/internal/httpapi/openai/shared/upstream_empty.go index a52c4b3..e180637 100644 --- a/internal/httpapi/openai/shared/upstream_empty.go +++ b/internal/httpapi/openai/shared/upstream_empty.go @@ -1,9 +1,12 @@ package shared -import "net/http" +import ( + "net/http" + "strings" +) -func ShouldWriteUpstreamEmptyOutputError(text string) bool { - return text == "" +func ShouldWriteUpstreamEmptyOutputError(text, thinking string) bool { + return strings.TrimSpace(text) == "" && strings.TrimSpace(thinking) == "" } func UpstreamEmptyOutputDetail(contentFilter bool, text, thinking string) (int, string, string) { @@ -18,7 +21,7 @@ func UpstreamEmptyOutputDetail(contentFilter bool, text, thinking string) (int, } func WriteUpstreamEmptyOutputError(w http.ResponseWriter, text, thinking string, contentFilter bool) bool { - if !ShouldWriteUpstreamEmptyOutputError(text) { + if !ShouldWriteUpstreamEmptyOutputError(text, thinking) { return false } status, message, code := UpstreamEmptyOutputDetail(contentFilter, text, thinking) diff --git a/internal/httpapi/openai/stream_status_test.go b/internal/httpapi/openai/stream_status_test.go index f34c11f..c30d555 100644 --- a/internal/httpapi/openai/stream_status_test.go +++ b/internal/httpapi/openai/stream_status_test.go @@ -345,7 +345,7 @@ func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) { func TestChatCompletionsNonStreamRetriesThinkingOnlyOutput(t *testing.T) { ds := &streamStatusDSSeqStub{resps: []*http.Response{ - makeOpenAISSEHTTPResponse(`data: {"response_message_id":99,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"response_message_id":99}`, "data: [DONE]"), makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), }} h := &openAITestSurface{ @@ -380,9 +380,6 @@ func TestChatCompletionsNonStreamRetriesThinkingOnlyOutput(t *testing.T) { if asString(message["content"]) != "visible" { t.Fatalf("expected retry visible content, got %#v", message) } - if !strings.Contains(asString(message["reasoning_content"]), "plan") { - t.Fatalf("expected first-attempt reasoning to be preserved, got %#v", message) - } } func TestChatCompletionsContentFilterDoesNotRetry(t *testing.T) { @@ -499,7 +496,7 @@ func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) { func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) { ds := &streamStatusDSSeqStub{resps: []*http.Response{ - makeOpenAISSEHTTPResponse(`data: {"response_message_id":88,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"response_message_id":88}`, "data: [DONE]"), makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), }} h := &openAITestSurface{ @@ -540,9 +537,9 @@ func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) { if len(content) == 0 { t.Fatalf("expected content entries, got %#v", item) } - reasoning, _ := content[0].(map[string]any) - if asString(reasoning["type"]) != "reasoning" || !strings.Contains(asString(reasoning["text"]), "plan") { - t.Fatalf("expected preserved reasoning entry, got %#v", content) + textEntry, _ := content[0].(map[string]any) + if asString(textEntry["type"]) != "output_text" || asString(textEntry["text"]) != "visible" { + t.Fatalf("expected visible text entry, got %#v", content) } } diff --git a/internal/js/chat-stream/stream_emitter.js b/internal/js/chat-stream/stream_emitter.js index 0046807..b3faadf 100644 --- a/internal/js/chat-stream/stream_emitter.js +++ b/internal/js/chat-stream/stream_emitter.js @@ -1,7 +1,7 @@ 'use strict'; -const MIN_DELTA_FLUSH_CHARS = 160; -const MAX_DELTA_FLUSH_WAIT_MS = 80; +const MIN_DELTA_FLUSH_CHARS = 16; +const MAX_DELTA_FLUSH_WAIT_MS = 20; function createChatCompletionEmitter({ res, sessionID, created, model, isClosed }) { let firstChunkSent = false; diff --git a/internal/sse/dedupe.go b/internal/sse/dedupe.go index dbc81bf..6fbb51d 100644 --- a/internal/sse/dedupe.go +++ b/internal/sse/dedupe.go @@ -4,9 +4,6 @@ import "strings" const minContinuationSnapshotLen = 32 -// TrimContinuationOverlap removes the already-seen prefix when DeepSeek -// continue rounds resend the full fragment snapshot instead of only the new -// suffix. Non-overlapping chunks are returned unchanged. func TrimContinuationOverlap(existing, incoming string) string { if incoming == "" { return "" @@ -14,11 +11,44 @@ func TrimContinuationOverlap(existing, incoming string) string { if existing == "" { return incoming } - if len(incoming) >= minContinuationSnapshotLen && strings.HasPrefix(incoming, existing) { - return incoming[len(existing):] + if len(incoming) < minContinuationSnapshotLen { + return incoming } - if len(incoming) >= minContinuationSnapshotLen && strings.HasPrefix(existing, incoming) { + if len(incoming) > len(existing) { + if strings.HasPrefix(incoming, existing) { + return incoming[len(existing):] + } + return incoming + } + if len(incoming) < len(existing) && strings.HasPrefix(existing, incoming) { return "" } return incoming } + +func TrimContinuationOverlapFromBuilder(existing *strings.Builder, incoming string) string { + if incoming == "" { + return "" + } + if existing == nil || existing.Len() == 0 { + return incoming + } + if len(incoming) < minContinuationSnapshotLen { + return incoming + } + existingLen := existing.Len() + if len(incoming) > existingLen { + existingStr := existing.String() + if strings.HasPrefix(incoming, existingStr) { + return incoming[existingLen:] + } + return incoming + } + if len(incoming) < existingLen { + existingStr := existing.String() + if strings.HasPrefix(existingStr, incoming) { + return "" + } + } + return incoming +} diff --git a/internal/sse/stream.go b/internal/sse/stream.go index 44f75cb..4bd374f 100644 --- a/internal/sse/stream.go +++ b/internal/sse/stream.go @@ -4,149 +4,353 @@ import ( "bufio" "context" "io" + "strings" "time" + "unicode/utf8" ) const ( parsedLineBufferSize = 128 - lineReaderBufferSize = 64 * 1024 - minFlushChars = 160 - maxFlushWait = 80 * time.Millisecond + scannerBufferSize = 64 * 1024 + maxScannerLineSize = 4 * 1024 * 1024 ) -// StartParsedLinePump scans an upstream DeepSeek SSE body and emits normalized -// line parse results. It centralizes scanner setup + current fragment type -// tracking for all streaming adapters. +type AccumulateConfig struct { + Enabled bool + MinChars int + MaxWait time.Duration + FlushOnFinish bool + WordBoundary bool + FlushOnNewline bool +} + +var productionAccumulate = AccumulateConfig{ + Enabled: true, + MinChars: 16, + MaxWait: 10 * time.Millisecond, + FlushOnFinish: true, + WordBoundary: false, + FlushOnNewline: true, +} + func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bool, initialType string) (<-chan LineResult, <-chan error) { + return startParsedLinePumpWithConfig(ctx, body, thinkingEnabled, initialType, productionAccumulate) +} + +func startParsedLinePumpWithConfig(ctx context.Context, body io.Reader, thinkingEnabled bool, initialType string, cfg AccumulateConfig) (<-chan LineResult, <-chan error) { out := make(chan LineResult, parsedLineBufferSize) done := make(chan error, 1) + go func() { defer close(out) - type scanItem struct { - line []byte - err error - eof bool - } - lineCh := make(chan scanItem, 1) - stopReader := make(chan struct{}) - defer close(stopReader) + + scanner := bufio.NewScanner(body) + scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize) + currentType := initialType + + var pumpErr error + + var textBuffer strings.Builder + var thinkingBuffer strings.Builder + var toolDetectionThinkingBuffer strings.Builder + var textPendingType string + var thinkingPendingType string + var anyFlushed bool + var pendingResponseMessageID int + + scanCh := make(chan []byte, parsedLineBufferSize) + scanDone := make(chan error, 1) + go func() { - sendScanItem := func(item scanItem) bool { + for scanner.Scan() { + line := make([]byte, len(scanner.Bytes())) + copy(line, scanner.Bytes()) select { - case lineCh <- item: - return true + case scanCh <- line: case <-ctx.Done(): - return false - case <-stopReader: - return false - } - } - defer close(lineCh) - reader := bufio.NewReaderSize(body, lineReaderBufferSize) - for { - line, err := reader.ReadBytes('\n') - if len(line) > 0 { - line = append([]byte{}, line...) - if !sendScanItem(scanItem{line: line}) { - return - } - } - if err != nil { - if err == io.EOF { - err = nil - } - _ = sendScanItem(scanItem{err: err, eof: true}) + close(scanCh) + scanDone <- ctx.Err() return } } + close(scanCh) + scanDone <- scanner.Err() }() - ticker := time.NewTicker(maxFlushWait) - defer ticker.Stop() - currentType := initialType - var pending *LineResult - pendingChars := 0 + maxWaitTimer := time.NewTimer(0) + if !maxWaitTimer.Stop() { + <-maxWaitTimer.C + } + maxWaitActive := false - sendResult := func(r LineResult) bool { - select { - case out <- r: - return true - case <-ctx.Done(): - done <- ctx.Err() - return false + resetMaxWait := func() { + if maxWaitActive { + if !maxWaitTimer.Stop() { + select { + case <-maxWaitTimer.C: + default: + } + } + } + maxWaitTimer.Reset(cfg.MaxWait) + maxWaitActive = true + } + + stopMaxWait := func() { + if maxWaitActive { + if !maxWaitTimer.Stop() { + select { + case <-maxWaitTimer.C: + default: + } + } + maxWaitActive = false } } - flushPending := func() bool { - if pending == nil { + defer stopMaxWait() + + shouldFlushImmediate := func(text string) bool { + if cfg.FlushOnNewline && strings.ContainsAny(text, "\n\r") { return true } - if !sendResult(*pending) { - return false + return false + } + + hasBufferedData := func() bool { + return textBuffer.Len() > 0 || thinkingBuffer.Len() > 0 || toolDetectionThinkingBuffer.Len() > 0 + } + + flushBuffer := func(force bool) { + if !cfg.Enabled { + return + } + + textChars := utf8.RuneCountInString(textBuffer.String()) + thinkingChars := utf8.RuneCountInString(thinkingBuffer.String()) + + shouldFlush := force || + !anyFlushed || + textChars >= cfg.MinChars || + (thinkingChars > 0 && textChars >= 50) + + if !shouldFlush { + return + } + + anyFlushed = true + + var parts []ContentPart + + if thinkingChars > 0 { + parts = append(parts, ContentPart{Text: thinkingBuffer.String(), Type: thinkingPendingType}) + thinkingBuffer.Reset() + } + + if textChars > 0 { + parts = append(parts, ContentPart{Text: textBuffer.String(), Type: textPendingType}) + textBuffer.Reset() + } + + if len(parts) > 0 || toolDetectionThinkingBuffer.Len() > 0 { + var detectionParts []ContentPart + if toolDetectionThinkingBuffer.Len() > 0 { + detectionParts = append(detectionParts, ContentPart{Text: toolDetectionThinkingBuffer.String(), Type: "thinking"}) + toolDetectionThinkingBuffer.Reset() + } + + result := LineResult{ + Parsed: true, + Stop: false, + Parts: parts, + ToolDetectionThinkingParts: detectionParts, + NextType: currentType, + ResponseMessageID: pendingResponseMessageID, + } + pendingResponseMessageID = 0 + select { + case out <- result: + case <-ctx.Done(): + pumpErr = ctx.Err() + return + } + } + + if hasBufferedData() { + resetMaxWait() + } else { + stopMaxWait() + } + } + + processLine := func(result LineResult) bool { + currentType = result.NextType + if result.ResponseMessageID > 0 { + pendingResponseMessageID = result.ResponseMessageID + } + + if result.Stop { + if cfg.Enabled && cfg.FlushOnFinish { + for _, p := range result.ToolDetectionThinkingParts { + toolDetectionThinkingBuffer.WriteString(p.Text) + } + if textBuffer.Len() > 0 || len(result.Parts) > 0 || toolDetectionThinkingBuffer.Len() > 0 { + for _, p := range result.Parts { + if p.Type == "thinking" { + thinkingBuffer.WriteString(p.Text) + thinkingPendingType = "thinking" + } else { + textBuffer.WriteString(p.Text) + textPendingType = p.Type + } + } + flushBuffer(true) + } + } else if !cfg.Enabled { + var filteredParts []ContentPart + for _, p := range result.Parts { + if p.Type == "thinking" && !thinkingEnabled { + continue + } + filteredParts = append(filteredParts, p) + } + result.Parts = filteredParts + } + if result.ErrorMessage != "" || result.ContentFilter { + select { + case out <- result: + case <-ctx.Done(): + pumpErr = ctx.Err() + return false + } + } else { + stopResult := LineResult{ + Parsed: true, + Stop: true, + NextType: currentType, + ResponseMessageID: pendingResponseMessageID, + } + pendingResponseMessageID = 0 + select { + case out <- stopResult: + case <-ctx.Done(): + pumpErr = ctx.Err() + return false + } + } + return true + } + + if !result.Parsed { + return true + } + + if cfg.Enabled { + for _, p := range result.ToolDetectionThinkingParts { + toolDetectionThinkingBuffer.WriteString(p.Text) + } + for _, p := range result.Parts { + if p.Type == "thinking" { + if textBuffer.Len() > 0 { + flushBuffer(true) + } + thinkingBuffer.WriteString(p.Text) + thinkingPendingType = "thinking" + } else { + textBuffer.WriteString(p.Text) + textPendingType = p.Type + if shouldFlushImmediate(p.Text) { + flushBuffer(true) + } + } + } + if utf8.RuneCountInString(textBuffer.String()) >= cfg.MinChars { + flushBuffer(false) + } + if hasBufferedData() && !maxWaitActive { + resetMaxWait() + } + } else { + var parts []ContentPart + for _, p := range result.Parts { + if p.Type == "thinking" && !thinkingEnabled { + continue + } + parts = append(parts, p) + } + if len(parts) > 0 || len(result.ToolDetectionThinkingParts) > 0 { + filteredResult := LineResult{ + Parsed: true, + Stop: false, + Parts: parts, + ToolDetectionThinkingParts: result.ToolDetectionThinkingParts, + NextType: currentType, + } + select { + case out <- filteredResult: + case <-ctx.Done(): + pumpErr = ctx.Err() + return false + } + } } - pending = nil - pendingChars = 0 return true } for { select { case <-ctx.Done(): - done <- ctx.Err() - return - case <-ticker.C: - if !flushPending() { - return - } - case item, ok := <-lineCh: - if !ok || item.eof { - if !flushPending() { - return + pumpErr = ctx.Err() + goto done + + case line, ok := <-scanCh: + if !ok { + scanCh = nil + err := <-scanDone + if err != nil { + pumpErr = err } - done <- item.err - return + goto done } - line := item.line result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType) - currentType = result.NextType - - canAccumulate := result.Parsed && !result.Stop && result.ErrorMessage == "" && !result.ContentFilter && result.ResponseMessageID == 0 - if canAccumulate { - lineChars := 0 - for _, p := range result.Parts { - lineChars += len(p.Text) - } - for _, p := range result.ToolDetectionThinkingParts { - lineChars += len(p.Text) - } - if lineChars > 0 { - if pending == nil { - cp := result - pending = &cp - } else { - pending.Parts = append(pending.Parts, result.Parts...) - pending.ToolDetectionThinkingParts = append(pending.ToolDetectionThinkingParts, result.ToolDetectionThinkingParts...) - pending.NextType = result.NextType - } - pendingChars += lineChars - if pendingChars < minFlushChars { - continue - } - if !flushPending() { - return - } - continue - } + if !processLine(result) { + goto done } - if !flushPending() { - return + case err, ok := <-scanDone: + if !ok || scanCh == nil { + goto done } - if !sendResult(result) { - return + if err != nil { + pumpErr = err + } + for line := range scanCh { + result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType) + if !processLine(result) { + goto done + } + } + goto done + + case <-maxWaitTimer.C: + maxWaitActive = false + if hasBufferedData() { + flushBuffer(true) } } } + + done: + stopMaxWait() + if cfg.Enabled { + flushBuffer(true) + } + + if pumpErr != nil { + done <- pumpErr + } else { + done <- nil + } }() return out, done } diff --git a/internal/sse/stream_edge_test.go b/internal/sse/stream_edge_test.go index 40b4460..785a59a 100644 --- a/internal/sse/stream_edge_test.go +++ b/internal/sse/stream_edge_test.go @@ -5,6 +5,7 @@ import ( "io" "strings" "testing" + "time" ) func TestStartParsedLinePumpEmptyBody(t *testing.T) { @@ -41,11 +42,17 @@ func TestStartParsedLinePumpMultipleLines(t *testing.T) { if len(collected) < 2 { t.Fatalf("expected at least 2 results, got %d", len(collected)) } - // First should be thinking - if collected[0].Parts[0].Type != "thinking" { - t.Fatalf("expected first part thinking, got %q", collected[0].Parts[0].Type) + hasThinking := false + for _, r := range collected { + for _, p := range r.Parts { + if p.Type == "thinking" { + hasThinking = true + } + } + } + if !hasThinking { + t.Fatal("expected thinking part in results") } - // Last should be stop last := collected[len(collected)-1] if !last.Stop { t.Fatal("expected last result to be stop") @@ -70,15 +77,24 @@ func TestStartParsedLinePumpTypeTracking(t *testing.T) { } <-done - // Should have: thinking, thinking, text, text - expected := []string{"thinking", "thinking", "text", "text"} - if len(types) != len(expected) { - t.Fatalf("expected types %v, got %v", expected, types) + if len(types) == 0 { + t.Fatal("expected some parts, got none") } - for i, want := range expected { - if types[i] != want { - t.Fatalf("type[%d] mismatch: want %q got %q (all=%v)", i, want, types[i], types) + hasThinking := false + hasText := false + for _, tp := range types { + if tp == "thinking" { + hasThinking = true } + if tp == "text" { + hasText = true + } + } + if !hasThinking { + t.Fatalf("expected thinking type in results, got %v", types) + } + if !hasText { + t.Fatalf("expected text type in results, got %v", types) } } @@ -88,29 +104,23 @@ func TestStartParsedLinePumpContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) results, done := StartParsedLinePump(ctx, pr, false, "text") - // Write one line to allow it to start go func() { _, _ = io.WriteString(pw, "data: {\"p\":\"response/content\",\"v\":\"hello\"}\n") - // Don't close yet - wait for context cancel + time.Sleep(50 * time.Millisecond) + _ = pw.Close() }() - // Read first result r := <-results if !r.Parsed || len(r.Parts) == 0 { t.Fatalf("expected first parsed result, got %#v", r) } - // Cancel context - this will cause the pump to exit on next send cancel() - // Close the pipe to unblock scanner.Scan() - _ = pw.Close() - // Drain remaining results for range results { } err := <-done - // Error may be context.Canceled or nil (if pipe closed first) if err != nil && err != context.Canceled { t.Fatalf("expected context.Canceled or nil error, got %v", err) } @@ -202,13 +212,47 @@ func TestStartParsedLinePumpAccumulatesSmallChunks(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if len(collected) != 2 { - t.Fatalf("expected 2 results (accumulated content + done), got %d", len(collected)) + last := collected[len(collected)-1] + if !last.Stop { + t.Fatal("expected last result to stop") } - if len(collected[0].Parts) != 2 { - t.Fatalf("expected 2 accumulated parts, got %d", len(collected[0].Parts)) + + allText := strings.Builder{} + for _, r := range collected { + for _, p := range r.Parts { + allText.WriteString(p.Text) + } } - if !collected[1].Stop { - t.Fatal("expected second result to stop") + if allText.String() != "hi" { + t.Fatalf("expected accumulated text 'hi', got %q", allText.String()) + } +} + +func TestStartParsedLinePumpFirstFlushImmediate(t *testing.T) { + body := strings.NewReader( + "data: {\"p\":\"response/content\",\"v\":\"Hi\"}\n" + + "data: [DONE]\n", + ) + + results, done := StartParsedLinePump(context.Background(), body, false, "text") + + collected := make([]LineResult, 0) + for r := range results { + collected = append(collected, r) + } + if err := <-done; err != nil { + t.Fatalf("unexpected error: %v", err) + } + + hasContent := false + for _, r := range collected { + for _, p := range r.Parts { + if p.Text == "Hi" { + hasContent = true + } + } + } + if !hasContent { + t.Fatal("expected 'Hi' content in results") } }