fix: coalesce small stream deltas to prevent character swallowing; add read-tool cache guard

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
CJACK
2026-05-01 13:53:27 +08:00
parent 92e321fe2c
commit 2671298439
11 changed files with 349 additions and 54 deletions

View File

@@ -53,6 +53,32 @@ type chatStreamRuntime struct {
finalErrorCode string
}
type chatDeltaBatch struct {
runtime *chatStreamRuntime
field string
text strings.Builder
}
func (b *chatDeltaBatch) append(field, text string) {
if text == "" {
return
}
if b.field != "" && b.field != field {
b.flush()
}
b.field = field
b.text.WriteString(text)
}
func (b *chatDeltaBatch) flush() {
if b.field == "" || b.text.Len() == 0 {
return
}
b.runtime.sendDelta(map[string]any{b.field: b.text.String()})
b.field = ""
b.text.Reset()
}
func newChatStreamRuntime(
w http.ResponseWriter,
rc *http.ResponseController,
@@ -164,42 +190,22 @@ func (s *chatStreamRuntime) finalize(finishReason string, deferEmptyOutput bool)
detected := detectAssistantToolCalls(s.rawText.String(), finalText, s.rawThinking.String(), finalToolDetectionThinking, s.toolNames)
if len(detected.Calls) > 0 && !s.toolCallsDoneEmitted {
finishReason = "tool_calls"
delta := map[string]any{
s.sendDelta(map[string]any{
"tool_calls": formatFinalStreamToolCallsWithStableIDs(detected.Calls, s.streamToolCallIDs, s.toolsRaw),
}
if !s.firstChunkSent {
delta["role"] = "assistant"
s.firstChunkSent = true
}
s.sendChunk(openaifmt.BuildChatStreamChunk(
s.completionID,
s.created,
s.model,
[]map[string]any{openaifmt.BuildChatStreamDeltaChoice(0, delta)},
nil,
))
})
s.toolCallsEmitted = true
s.toolCallsDoneEmitted = true
} else if s.bufferToolContent {
batch := chatDeltaBatch{runtime: s}
for _, evt := range toolstream.Flush(&s.toolSieve, s.toolNames) {
if len(evt.ToolCalls) > 0 {
batch.flush()
finishReason = "tool_calls"
s.toolCallsEmitted = true
s.toolCallsDoneEmitted = true
tcDelta := map[string]any{
s.sendDelta(map[string]any{
"tool_calls": formatFinalStreamToolCallsWithStableIDs(evt.ToolCalls, s.streamToolCallIDs, s.toolsRaw),
}
if !s.firstChunkSent {
tcDelta["role"] = "assistant"
s.firstChunkSent = true
}
s.sendChunk(openaifmt.BuildChatStreamChunk(
s.completionID,
s.created,
s.model,
[]map[string]any{openaifmt.BuildChatStreamDeltaChoice(0, tcDelta)},
nil,
))
})
s.resetStreamToolCallState()
}
if evt.Content == "" {
@@ -209,21 +215,9 @@ func (s *chatStreamRuntime) finalize(finishReason string, deferEmptyOutput bool)
if cleaned == "" || (s.searchEnabled && sse.IsCitation(cleaned)) {
continue
}
delta := map[string]any{
"content": cleaned,
}
if !s.firstChunkSent {
delta["role"] = "assistant"
s.firstChunkSent = true
}
s.sendChunk(openaifmt.BuildChatStreamChunk(
s.completionID,
s.created,
s.model,
[]map[string]any{openaifmt.BuildChatStreamDeltaChoice(0, delta)},
nil,
))
batch.append("content", cleaned)
}
batch.flush()
}
if len(detected.Calls) > 0 || s.toolCallsEmitted {
@@ -275,6 +269,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
}
contentSeen := false
batch := chatDeltaBatch{runtime: s}
for _, p := range parsed.ToolDetectionThinkingParts {
trimmed := sse.TrimContinuationOverlap(s.toolDetectionThinking.String(), p.Text)
if trimmed != "" {
@@ -298,7 +293,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
continue
}
s.thinking.WriteString(trimmed)
s.sendDelta(map[string]any{"reasoning_content": trimmed})
batch.append("reasoning_content", trimmed)
}
} else {
rawTrimmed := sse.TrimContinuationOverlap(s.rawText.String(), p.Text)
@@ -319,7 +314,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
if trimmed == "" {
continue
}
s.sendDelta(map[string]any{"content": trimmed})
batch.append("content", trimmed)
} else {
events := toolstream.ProcessChunk(&s.toolSieve, rawTrimmed, s.toolNames)
for _, evt := range events {
@@ -335,6 +330,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
if len(formatted) == 0 {
continue
}
batch.flush()
tcDelta := map[string]any{
"tool_calls": formatted,
}
@@ -343,6 +339,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
continue
}
if len(evt.ToolCalls) > 0 {
batch.flush()
s.toolCallsEmitted = true
s.toolCallsDoneEmitted = true
tcDelta := map[string]any{
@@ -357,14 +354,12 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
if cleaned == "" || (s.searchEnabled && sse.IsCitation(cleaned)) {
continue
}
contentDelta := map[string]any{
"content": cleaned,
}
s.sendDelta(contentDelta)
batch.append("content", cleaned)
}
}
}
}
}
batch.flush()
return streamengine.ParsedDecision{ContentSeen: contentSeen}
}

View File

@@ -309,6 +309,49 @@ func TestHandleStreamEmitsSingleChoiceFramesForMultipleParsedParts(t *testing.T)
}
}
func TestHandleStreamCoalescesSmallContentDeltas(t *testing.T) {
h := &Handler{}
lines := make([]string, 0, 101)
for i := 0; i < 100; i++ {
b, _ := json.Marshal(map[string]any{
"p": "response/content",
"v": "字",
})
lines = append(lines, "data: "+string(b))
}
lines = append(lines, "data: [DONE]")
resp := makeSSEHTTPResponse(lines...)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
h.handleStream(rec, req, resp, "cid-coalesce", "deepseek-v4-flash", "prompt", 0, false, false, nil, nil, nil)
frames, done := parseSSEDataFrames(t, rec.Body.String())
if !done {
t.Fatalf("expected [DONE], body=%s", rec.Body.String())
}
var content strings.Builder
contentDeltaFrames := 0
for _, frame := range frames {
choices, _ := frame["choices"].([]any)
if len(choices) != 1 {
t.Fatalf("expected exactly one choice per stream frame, got %d frame=%#v body=%s", len(choices), frame, rec.Body.String())
}
choice, _ := choices[0].(map[string]any)
delta, _ := choice["delta"].(map[string]any)
if c, ok := delta["content"].(string); ok {
contentDeltaFrames++
content.WriteString(c)
}
}
if got, want := content.String(), strings.Repeat("字", 100); got != want {
t.Fatalf("coalesced stream content mismatch: got %q want %q body=%s", got, want, rec.Body.String())
}
if contentDeltaFrames >= 100 {
t.Fatalf("expected coalescing to reduce 100 tiny content frames, got %d body=%s", contentDeltaFrames, rec.Body.String())
}
}
func TestHandleStreamIncompleteCapturedToolJSONFlushesAsTextOnFinalize(t *testing.T) {
h := &Handler{}
resp := makeSSEHTTPResponse(

View File

@@ -0,0 +1,39 @@
package responses
import (
"strings"
openaifmt "ds2api/internal/format/openai"
)
type responsesDeltaBatch struct {
runtime *responsesStreamRuntime
kind string
text strings.Builder
}
func (b *responsesDeltaBatch) append(kind, text string) {
if text == "" {
return
}
if b.kind != "" && b.kind != kind {
b.flush()
}
b.kind = kind
b.text.WriteString(text)
}
func (b *responsesDeltaBatch) flush() {
if b.kind == "" || b.text.Len() == 0 {
return
}
text := b.text.String()
switch b.kind {
case "reasoning":
b.runtime.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(b.runtime.responseID, text))
case "text":
b.runtime.emitTextDelta(text)
}
b.kind = ""
b.text.Reset()
}

View File

@@ -222,6 +222,7 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
}
contentSeen := false
batch := responsesDeltaBatch{runtime: s}
for _, p := range parsed.ToolDetectionThinkingParts {
trimmed := sse.TrimContinuationOverlap(s.toolDetectionThinking.String(), p.Text)
if trimmed != "" {
@@ -247,7 +248,7 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
continue
}
s.thinking.WriteString(trimmed)
s.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(s.responseID, trimmed))
batch.append("reasoning", trimmed)
continue
}
@@ -269,11 +270,13 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
if trimmed == "" {
continue
}
s.emitTextDelta(trimmed)
batch.append("text", trimmed)
continue
}
batch.flush()
s.processToolStreamEvents(toolstream.ProcessChunk(&s.sieve, rawTrimmed, s.toolNames), true, true)
}
batch.flush()
return streamengine.ParsedDecision{ContentSeen: contentSeen}
}

View File

@@ -109,6 +109,48 @@ func TestHandleResponsesStreamOutputTextDeltaCarriesItemIndexes(t *testing.T) {
}
}
func TestHandleResponsesStreamCoalescesSmallOutputTextDeltas(t *testing.T) {
h := &Handler{}
req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil)
rec := httptest.NewRecorder()
var streamBody strings.Builder
for i := 0; i < 100; i++ {
b, _ := json.Marshal(map[string]any{
"p": "response/content",
"v": "字",
})
streamBody.WriteString("data: ")
streamBody.WriteString(string(b))
streamBody.WriteString("\n")
}
streamBody.WriteString("data: [DONE]\n")
resp := &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader(streamBody.String())),
}
h.handleResponsesStream(rec, req, resp, "owner-a", "resp_coalesce", "deepseek-v4-flash", "prompt", 0, false, false, nil, nil, promptcompat.DefaultToolChoicePolicy(), "")
payloads := extractSSEEventPayloads(rec.Body.String(), "response.output_text.delta")
if len(payloads) == 0 {
t.Fatalf("expected response.output_text.delta payloads, body=%s", rec.Body.String())
}
var content strings.Builder
for _, payload := range payloads {
content.WriteString(asString(payload["delta"]))
}
if got, want := content.String(), strings.Repeat("字", 100); got != want {
t.Fatalf("coalesced response content mismatch: got %q want %q body=%s", got, want, rec.Body.String())
}
if len(payloads) >= 100 {
t.Fatalf("expected coalescing to reduce 100 tiny text deltas, got %d body=%s", len(payloads), rec.Body.String())
}
if !strings.Contains(rec.Body.String(), "event: response.completed") {
t.Fatalf("expected completed event, body=%s", rec.Body.String())
}
}
func TestHandleResponsesStreamEmitsDistinctToolCallIDsAcrossSeparateToolBlocks(t *testing.T) {
h := &Handler{}
req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil)