From 2671298439ad94fc465360a612ba1555f677a887 Mon Sep 17 00:00:00 2001 From: CJACK Date: Fri, 1 May 2026 13:53:27 +0800 Subject: [PATCH] fix: coalesce small stream deltas to prevent character swallowing; add read-tool cache guard Co-Authored-By: Claude Opus 4.7 --- docs/prompt-compatibility.md | 1 + .../openai/chat/chat_stream_runtime.go | 87 +++++++++---------- .../openai/chat/handler_toolcall_test.go | 43 +++++++++ .../responses/responses_stream_delta_batch.go | 39 +++++++++ .../responses_stream_runtime_core.go | 7 +- .../openai/responses/responses_stream_test.go | 42 +++++++++ internal/js/chat-stream/stream_emitter.js | 59 +++++++++++++ internal/js/chat-stream/vercel_stream_impl.js | 18 ++-- internal/promptcompat/prompt_build_test.go | 52 +++++++++++ internal/promptcompat/tool_prompt.go | 24 +++++ tests/node/chat-stream.test.js | 31 +++++++ 11 files changed, 349 insertions(+), 54 deletions(-) create mode 100644 internal/httpapi/openai/responses/responses_stream_delta_batch.go diff --git a/docs/prompt-compatibility.md b/docs/prompt-compatibility.md index 31e3927..43f2089 100644 --- a/docs/prompt-compatibility.md +++ b/docs/prompt-compatibility.md @@ -160,6 +160,7 @@ OpenAI Chat / Responses 在标准化后、current input file 之前,会默认 工具 schema 的权威来源始终是**当前请求实际携带的 schema**,而不是同名工具在其他 runtime(Claude Code / OpenCode / Codex 等)里的默认印象。兼容层现在会同时兼容 OpenAI 风格 `function.parameters`、直接工具对象上的 `parameters` / `input_schema`、以及 camelCase 的 `inputSchema` / `schema`,并在最终输出阶段按这份请求内 schema 决定是保留 array/object,还是仅对明确声明为 `string` 的路径做字符串化。该规则同样适用于 Claude 的流式收尾和 Vercel Node 流式 tool-call formatter,避免不同 runtime 因 schema shape 差异而出现同名工具参数类型漂移。 正例中的工具名只会来自当前请求实际声明的工具;如果当前请求没有足够的已知工具形态,就省略对应的单工具、多工具或嵌套示例,避免把不可用工具名写进 prompt。 对执行类工具,脚本内容必须进入执行参数本身:`Bash` / `execute_command` 使用 `command`,`exec_command` 使用 `cmd`;不要把脚本示范成 `path` / `content` 文件写入参数。 +如果当前请求声明了 `Read` / `read_file` 这类读取工具,兼容层会额外注入一条 read-tool cache guard:当读取结果只表示“文件未变更 / 已在历史中 / 请引用先前上下文 / 没有正文内容”时,模型必须把它视为内容不可用,不能反复调用同一个无正文读取;应改为请求完整正文读取能力,或向用户说明需要重新提供文件内容。这个约束只缓解客户端缓存返回空内容导致的死循环,DS2API 不会也无法凭空恢复客户端本地文件正文。 OpenAI 路径实现: [internal/promptcompat/tool_prompt.go](../internal/promptcompat/tool_prompt.go) diff --git a/internal/httpapi/openai/chat/chat_stream_runtime.go b/internal/httpapi/openai/chat/chat_stream_runtime.go index 5cc0548..17ff0d5 100644 --- a/internal/httpapi/openai/chat/chat_stream_runtime.go +++ b/internal/httpapi/openai/chat/chat_stream_runtime.go @@ -53,6 +53,32 @@ type chatStreamRuntime struct { finalErrorCode string } +type chatDeltaBatch struct { + runtime *chatStreamRuntime + field string + text strings.Builder +} + +func (b *chatDeltaBatch) append(field, text string) { + if text == "" { + return + } + if b.field != "" && b.field != field { + b.flush() + } + b.field = field + b.text.WriteString(text) +} + +func (b *chatDeltaBatch) flush() { + if b.field == "" || b.text.Len() == 0 { + return + } + b.runtime.sendDelta(map[string]any{b.field: b.text.String()}) + b.field = "" + b.text.Reset() +} + func newChatStreamRuntime( w http.ResponseWriter, rc *http.ResponseController, @@ -164,42 +190,22 @@ func (s *chatStreamRuntime) finalize(finishReason string, deferEmptyOutput bool) detected := detectAssistantToolCalls(s.rawText.String(), finalText, s.rawThinking.String(), finalToolDetectionThinking, s.toolNames) if len(detected.Calls) > 0 && !s.toolCallsDoneEmitted { finishReason = "tool_calls" - delta := map[string]any{ + s.sendDelta(map[string]any{ "tool_calls": formatFinalStreamToolCallsWithStableIDs(detected.Calls, s.streamToolCallIDs, s.toolsRaw), - } - if !s.firstChunkSent { - delta["role"] = "assistant" - s.firstChunkSent = true - } - s.sendChunk(openaifmt.BuildChatStreamChunk( - s.completionID, - s.created, - s.model, - []map[string]any{openaifmt.BuildChatStreamDeltaChoice(0, delta)}, - nil, - )) + }) s.toolCallsEmitted = true s.toolCallsDoneEmitted = true } else if s.bufferToolContent { + batch := chatDeltaBatch{runtime: s} for _, evt := range toolstream.Flush(&s.toolSieve, s.toolNames) { if len(evt.ToolCalls) > 0 { + batch.flush() finishReason = "tool_calls" s.toolCallsEmitted = true s.toolCallsDoneEmitted = true - tcDelta := map[string]any{ + s.sendDelta(map[string]any{ "tool_calls": formatFinalStreamToolCallsWithStableIDs(evt.ToolCalls, s.streamToolCallIDs, s.toolsRaw), - } - if !s.firstChunkSent { - tcDelta["role"] = "assistant" - s.firstChunkSent = true - } - s.sendChunk(openaifmt.BuildChatStreamChunk( - s.completionID, - s.created, - s.model, - []map[string]any{openaifmt.BuildChatStreamDeltaChoice(0, tcDelta)}, - nil, - )) + }) s.resetStreamToolCallState() } if evt.Content == "" { @@ -209,21 +215,9 @@ func (s *chatStreamRuntime) finalize(finishReason string, deferEmptyOutput bool) if cleaned == "" || (s.searchEnabled && sse.IsCitation(cleaned)) { continue } - delta := map[string]any{ - "content": cleaned, - } - if !s.firstChunkSent { - delta["role"] = "assistant" - s.firstChunkSent = true - } - s.sendChunk(openaifmt.BuildChatStreamChunk( - s.completionID, - s.created, - s.model, - []map[string]any{openaifmt.BuildChatStreamDeltaChoice(0, delta)}, - nil, - )) + batch.append("content", cleaned) } + batch.flush() } if len(detected.Calls) > 0 || s.toolCallsEmitted { @@ -275,6 +269,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD } contentSeen := false + batch := chatDeltaBatch{runtime: s} for _, p := range parsed.ToolDetectionThinkingParts { trimmed := sse.TrimContinuationOverlap(s.toolDetectionThinking.String(), p.Text) if trimmed != "" { @@ -298,7 +293,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD continue } s.thinking.WriteString(trimmed) - s.sendDelta(map[string]any{"reasoning_content": trimmed}) + batch.append("reasoning_content", trimmed) } } else { rawTrimmed := sse.TrimContinuationOverlap(s.rawText.String(), p.Text) @@ -319,7 +314,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD if trimmed == "" { continue } - s.sendDelta(map[string]any{"content": trimmed}) + batch.append("content", trimmed) } else { events := toolstream.ProcessChunk(&s.toolSieve, rawTrimmed, s.toolNames) for _, evt := range events { @@ -335,6 +330,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD if len(formatted) == 0 { continue } + batch.flush() tcDelta := map[string]any{ "tool_calls": formatted, } @@ -343,6 +339,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD continue } if len(evt.ToolCalls) > 0 { + batch.flush() s.toolCallsEmitted = true s.toolCallsDoneEmitted = true tcDelta := map[string]any{ @@ -357,14 +354,12 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD if cleaned == "" || (s.searchEnabled && sse.IsCitation(cleaned)) { continue } - contentDelta := map[string]any{ - "content": cleaned, - } - s.sendDelta(contentDelta) + batch.append("content", cleaned) } } } } } + batch.flush() return streamengine.ParsedDecision{ContentSeen: contentSeen} } diff --git a/internal/httpapi/openai/chat/handler_toolcall_test.go b/internal/httpapi/openai/chat/handler_toolcall_test.go index 5cb9a54..446b480 100644 --- a/internal/httpapi/openai/chat/handler_toolcall_test.go +++ b/internal/httpapi/openai/chat/handler_toolcall_test.go @@ -309,6 +309,49 @@ func TestHandleStreamEmitsSingleChoiceFramesForMultipleParsedParts(t *testing.T) } } +func TestHandleStreamCoalescesSmallContentDeltas(t *testing.T) { + h := &Handler{} + lines := make([]string, 0, 101) + for i := 0; i < 100; i++ { + b, _ := json.Marshal(map[string]any{ + "p": "response/content", + "v": "字", + }) + lines = append(lines, "data: "+string(b)) + } + lines = append(lines, "data: [DONE]") + resp := makeSSEHTTPResponse(lines...) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + + h.handleStream(rec, req, resp, "cid-coalesce", "deepseek-v4-flash", "prompt", 0, false, false, nil, nil, nil) + + frames, done := parseSSEDataFrames(t, rec.Body.String()) + if !done { + t.Fatalf("expected [DONE], body=%s", rec.Body.String()) + } + var content strings.Builder + contentDeltaFrames := 0 + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + if len(choices) != 1 { + t.Fatalf("expected exactly one choice per stream frame, got %d frame=%#v body=%s", len(choices), frame, rec.Body.String()) + } + choice, _ := choices[0].(map[string]any) + delta, _ := choice["delta"].(map[string]any) + if c, ok := delta["content"].(string); ok { + contentDeltaFrames++ + content.WriteString(c) + } + } + if got, want := content.String(), strings.Repeat("字", 100); got != want { + t.Fatalf("coalesced stream content mismatch: got %q want %q body=%s", got, want, rec.Body.String()) + } + if contentDeltaFrames >= 100 { + t.Fatalf("expected coalescing to reduce 100 tiny content frames, got %d body=%s", contentDeltaFrames, rec.Body.String()) + } +} + func TestHandleStreamIncompleteCapturedToolJSONFlushesAsTextOnFinalize(t *testing.T) { h := &Handler{} resp := makeSSEHTTPResponse( diff --git a/internal/httpapi/openai/responses/responses_stream_delta_batch.go b/internal/httpapi/openai/responses/responses_stream_delta_batch.go new file mode 100644 index 0000000..84c8c7f --- /dev/null +++ b/internal/httpapi/openai/responses/responses_stream_delta_batch.go @@ -0,0 +1,39 @@ +package responses + +import ( + "strings" + + openaifmt "ds2api/internal/format/openai" +) + +type responsesDeltaBatch struct { + runtime *responsesStreamRuntime + kind string + text strings.Builder +} + +func (b *responsesDeltaBatch) append(kind, text string) { + if text == "" { + return + } + if b.kind != "" && b.kind != kind { + b.flush() + } + b.kind = kind + b.text.WriteString(text) +} + +func (b *responsesDeltaBatch) flush() { + if b.kind == "" || b.text.Len() == 0 { + return + } + text := b.text.String() + switch b.kind { + case "reasoning": + b.runtime.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(b.runtime.responseID, text)) + case "text": + b.runtime.emitTextDelta(text) + } + b.kind = "" + b.text.Reset() +} diff --git a/internal/httpapi/openai/responses/responses_stream_runtime_core.go b/internal/httpapi/openai/responses/responses_stream_runtime_core.go index 4444408..7184c3f 100644 --- a/internal/httpapi/openai/responses/responses_stream_runtime_core.go +++ b/internal/httpapi/openai/responses/responses_stream_runtime_core.go @@ -222,6 +222,7 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa } contentSeen := false + batch := responsesDeltaBatch{runtime: s} for _, p := range parsed.ToolDetectionThinkingParts { trimmed := sse.TrimContinuationOverlap(s.toolDetectionThinking.String(), p.Text) if trimmed != "" { @@ -247,7 +248,7 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa continue } s.thinking.WriteString(trimmed) - s.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(s.responseID, trimmed)) + batch.append("reasoning", trimmed) continue } @@ -269,11 +270,13 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa if trimmed == "" { continue } - s.emitTextDelta(trimmed) + batch.append("text", trimmed) continue } + batch.flush() s.processToolStreamEvents(toolstream.ProcessChunk(&s.sieve, rawTrimmed, s.toolNames), true, true) } + batch.flush() return streamengine.ParsedDecision{ContentSeen: contentSeen} } diff --git a/internal/httpapi/openai/responses/responses_stream_test.go b/internal/httpapi/openai/responses/responses_stream_test.go index 34a7677..fa06bd5 100644 --- a/internal/httpapi/openai/responses/responses_stream_test.go +++ b/internal/httpapi/openai/responses/responses_stream_test.go @@ -109,6 +109,48 @@ func TestHandleResponsesStreamOutputTextDeltaCarriesItemIndexes(t *testing.T) { } } +func TestHandleResponsesStreamCoalescesSmallOutputTextDeltas(t *testing.T) { + h := &Handler{} + req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil) + rec := httptest.NewRecorder() + + var streamBody strings.Builder + for i := 0; i < 100; i++ { + b, _ := json.Marshal(map[string]any{ + "p": "response/content", + "v": "字", + }) + streamBody.WriteString("data: ") + streamBody.WriteString(string(b)) + streamBody.WriteString("\n") + } + streamBody.WriteString("data: [DONE]\n") + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(streamBody.String())), + } + + h.handleResponsesStream(rec, req, resp, "owner-a", "resp_coalesce", "deepseek-v4-flash", "prompt", 0, false, false, nil, nil, promptcompat.DefaultToolChoicePolicy(), "") + + payloads := extractSSEEventPayloads(rec.Body.String(), "response.output_text.delta") + if len(payloads) == 0 { + t.Fatalf("expected response.output_text.delta payloads, body=%s", rec.Body.String()) + } + var content strings.Builder + for _, payload := range payloads { + content.WriteString(asString(payload["delta"])) + } + if got, want := content.String(), strings.Repeat("字", 100); got != want { + t.Fatalf("coalesced response content mismatch: got %q want %q body=%s", got, want, rec.Body.String()) + } + if len(payloads) >= 100 { + t.Fatalf("expected coalescing to reduce 100 tiny text deltas, got %d body=%s", len(payloads), rec.Body.String()) + } + if !strings.Contains(rec.Body.String(), "event: response.completed") { + t.Fatalf("expected completed event, body=%s", rec.Body.String()) + } +} + func TestHandleResponsesStreamEmitsDistinctToolCallIDsAcrossSeparateToolBlocks(t *testing.T) { h := &Handler{} req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil) diff --git a/internal/js/chat-stream/stream_emitter.js b/internal/js/chat-stream/stream_emitter.js index 442c24e..0046807 100644 --- a/internal/js/chat-stream/stream_emitter.js +++ b/internal/js/chat-stream/stream_emitter.js @@ -1,5 +1,8 @@ 'use strict'; +const MIN_DELTA_FLUSH_CHARS = 160; +const MAX_DELTA_FLUSH_WAIT_MS = 80; + function createChatCompletionEmitter({ res, sessionID, created, model, isClosed }) { let firstChunkSent = false; @@ -34,6 +37,62 @@ function createChatCompletionEmitter({ res, sessionID, created, model, isClosed }; } +function createDeltaCoalescer({ sendDeltaFrame, minFlushChars = MIN_DELTA_FLUSH_CHARS, maxFlushWaitMS = MAX_DELTA_FLUSH_WAIT_MS }) { + let pendingField = ''; + let pendingText = ''; + let flushTimer = null; + + const clearFlushTimer = () => { + if (flushTimer) { + clearTimeout(flushTimer); + flushTimer = null; + } + }; + + const flush = () => { + clearFlushTimer(); + if (!pendingField || !pendingText) { + return; + } + const delta = { [pendingField]: pendingText }; + pendingField = ''; + pendingText = ''; + sendDeltaFrame(delta); + }; + + const scheduleFlush = () => { + if (flushTimer || maxFlushWaitMS <= 0) { + return; + } + flushTimer = setTimeout(flush, maxFlushWaitMS); + if (typeof flushTimer.unref === 'function') { + flushTimer.unref(); + } + }; + + const append = (field, text) => { + if (!field || !text) { + return; + } + if (pendingField && pendingField !== field) { + flush(); + } + pendingField = field; + pendingText += text; + if ([...pendingText].length >= minFlushChars) { + flush(); + return; + } + scheduleFlush(); + }; + + return { + append, + flush, + }; +} + module.exports = { createChatCompletionEmitter, + createDeltaCoalescer, }; diff --git a/internal/js/chat-stream/vercel_stream_impl.js b/internal/js/chat-stream/vercel_stream_impl.js index 3f34fb6..f6c642d 100644 --- a/internal/js/chat-stream/vercel_stream_impl.js +++ b/internal/js/chat-stream/vercel_stream_impl.js @@ -20,7 +20,7 @@ const { boolDefaultTrue, resetStreamToolCallState, } = require('./toolcall_policy'); -const { createChatCompletionEmitter } = require('./stream_emitter'); +const { createChatCompletionEmitter, createDeltaCoalescer } = require('./stream_emitter'); const { asString, isAbortError, @@ -191,6 +191,7 @@ async function handleVercelStream(req, res, rawBody, payload) { model, isClosed: () => clientClosed, }); + const deltaCoalescer = createDeltaCoalescer({ sendDeltaFrame }); const finish = async (reason, options = {}) => { if (ended) { @@ -201,6 +202,7 @@ async function handleVercelStream(req, res, rawBody, payload) { await releaseLease(); return true; } + deltaCoalescer.flush(); const detected = parseStandaloneToolCalls(outputText, toolNames); if (detected.length > 0 && !toolCallsDoneEmitted) { toolCallsEmitted = true; @@ -210,6 +212,7 @@ async function handleVercelStream(req, res, rawBody, payload) { const tailEvents = flushToolSieve(toolSieveState, toolNames); for (const evt of tailEvents) { if (evt.type === 'tool_calls' && Array.isArray(evt.calls) && evt.calls.length > 0) { + deltaCoalescer.flush(); toolCallsEmitted = true; toolCallsDoneEmitted = true; sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs, payload.tools) }); @@ -217,9 +220,10 @@ async function handleVercelStream(req, res, rawBody, payload) { continue; } if (evt.text) { - sendDeltaFrame({ content: evt.text }); + deltaCoalescer.append('content', evt.text); } } + deltaCoalescer.flush(); } if (detected.length > 0 || toolCallsEmitted) { reason = 'tool_calls'; @@ -327,7 +331,7 @@ async function handleVercelStream(req, res, rawBody, payload) { continue; } thinkingText += trimmed; - sendDeltaFrame({ reasoning_content: trimmed }); + deltaCoalescer.append('reasoning_content', trimmed); } } else { const trimmed = trimContinuationOverlap(outputText, p.text); @@ -339,7 +343,7 @@ async function handleVercelStream(req, res, rawBody, payload) { } outputText += trimmed; if (!toolSieveEnabled) { - sendDeltaFrame({ content: trimmed }); + deltaCoalescer.append('content', trimmed); continue; } const events = processToolSieveChunk(toolSieveState, trimmed, toolNames); @@ -352,19 +356,21 @@ async function handleVercelStream(req, res, rawBody, payload) { const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs); if (formatted.length > 0) { toolCallsEmitted = true; - sendDeltaFrame({ tool_calls: formatted }); + deltaCoalescer.flush(); + sendDeltaFrame({ tool_calls: formatted }); } continue; } if (evt.type === 'tool_calls') { toolCallsEmitted = true; toolCallsDoneEmitted = true; + deltaCoalescer.flush(); sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs, payload.tools) }); resetStreamToolCallState(streamToolCallIDs, streamToolNames); continue; } if (evt.text) { - sendDeltaFrame({ content: evt.text }); + deltaCoalescer.append('content', evt.text); } } } diff --git a/internal/promptcompat/prompt_build_test.go b/internal/promptcompat/prompt_build_test.go index b649fea..28da8e0 100644 --- a/internal/promptcompat/prompt_build_test.go +++ b/internal/promptcompat/prompt_build_test.go @@ -88,6 +88,58 @@ func TestBuildOpenAIFinalPrompt_VercelPreparePathKeepsFinalAnswerInstruction(t * } } +func TestBuildOpenAIFinalPromptReadLikeToolIncludesCacheGuard(t *testing.T) { + messages := []any{ + map[string]any{"role": "user", "content": "请读取文件"}, + } + tools := []any{ + map[string]any{ + "type": "function", + "function": map[string]any{ + "name": "read_file", + "description": "Read a file", + "parameters": map[string]any{ + "type": "object", + }, + }, + }, + } + + finalPrompt, _ := buildOpenAIFinalPrompt(messages, tools, "", false) + if !strings.Contains(finalPrompt, "Read-tool cache guard") { + t.Fatalf("read-like tool prompt missing cache guard: %q", finalPrompt) + } + if !strings.Contains(finalPrompt, "provides no file body") { + t.Fatalf("read-like tool prompt missing no-body handling: %q", finalPrompt) + } + if !strings.Contains(finalPrompt, "Do not repeatedly call the same read request") { + t.Fatalf("read-like tool prompt missing loop guard: %q", finalPrompt) + } +} + +func TestBuildOpenAIFinalPromptNonReadToolOmitsCacheGuard(t *testing.T) { + messages := []any{ + map[string]any{"role": "user", "content": "搜索一下"}, + } + tools := []any{ + map[string]any{ + "type": "function", + "function": map[string]any{ + "name": "search", + "description": "Search docs", + "parameters": map[string]any{ + "type": "object", + }, + }, + }, + } + + finalPrompt, _ := buildOpenAIFinalPrompt(messages, tools, "", false) + if strings.Contains(finalPrompt, "Read-tool cache guard") { + t.Fatalf("non-read tool prompt should not include read cache guard: %q", finalPrompt) + } +} + func TestBuildOpenAIFinalPromptWithThinkingKeepsPromptUnchanged(t *testing.T) { messages := []any{ map[string]any{"role": "user", "content": "继续回答上一个问题"}, diff --git a/internal/promptcompat/tool_prompt.go b/internal/promptcompat/tool_prompt.go index 95d2f8b..4e5d03f 100644 --- a/internal/promptcompat/tool_prompt.go +++ b/internal/promptcompat/tool_prompt.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "strings" + "unicode" "ds2api/internal/toolcall" ) @@ -46,6 +47,9 @@ func injectToolPrompt(messages []map[string]any, tools []any, policy ToolChoiceP return messages, names } toolPrompt := "You have access to these tools:\n\n" + strings.Join(toolSchemas, "\n\n") + "\n\n" + toolcall.BuildToolCallInstructions(names) + if hasReadLikeTool(names) { + toolPrompt += "\n\nRead-tool cache guard: If a Read/read_file-style tool result says the file is unchanged, already available in history, should be referenced from previous context, or otherwise provides no file body, treat that result as missing content. Do not repeatedly call the same read request for that missing body. Request a full-content read if the tool supports it, or tell the user that the file contents need to be provided again." + } if policy.Mode == ToolChoiceRequired { toolPrompt += "\n7) For this response, you MUST call at least one tool from the allowed list." } @@ -64,3 +68,23 @@ func injectToolPrompt(messages []map[string]any, tools []any, policy ToolChoiceP messages = append([]map[string]any{{"role": "system", "content": toolPrompt}}, messages...) return messages, names } + +func hasReadLikeTool(names []string) bool { + for _, name := range names { + switch normalizeToolNameForGuard(name) { + case "read", "readfile": + return true + } + } + return false +} + +func normalizeToolNameForGuard(name string) string { + var b strings.Builder + for _, r := range strings.ToLower(strings.TrimSpace(name)) { + if unicode.IsLetter(r) || unicode.IsDigit(r) { + b.WriteRune(r) + } + } + return b.String() +} diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js index b7f75d2..53ddeaf 100644 --- a/tests/node/chat-stream.test.js +++ b/tests/node/chat-stream.test.js @@ -210,6 +210,37 @@ test('vercel stream retries empty output once and keeps one terminal frame', asy assert.match(completionBodies[1].prompt, /Previous reply had no visible output\. Please regenerate the visible final answer or tool call now\.$/); }); +test('vercel stream coalesces many small content deltas while keeping one choice', async () => { + const lines = Array.from({ length: 100 }, () => `data: ${JSON.stringify({ p: 'response/content', v: '字' })}\n\n`); + lines.push('data: [DONE]\n\n'); + const { frames } = await runMockVercelStream(lines); + const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame)); + const contentFrames = parsed.filter((item) => item.choices?.[0]?.delta?.content); + const content = contentFrames.map((item) => item.choices[0].delta.content).join(''); + assert.equal(content, '字'.repeat(100)); + assert.ok(contentFrames.length < 100, `expected fewer than 100 content frames, got ${contentFrames.length}`); + for (const item of parsed) { + assert.equal(item.choices.length, 1); + } +}); + +test('vercel stream flushes reasoning before content and before stop', async () => { + const { frames } = await runMockVercelStream([ + `data: ${JSON.stringify({ p: 'response/fragments', o: 'APPEND', v: [ + { type: 'THINK', content: '思考' }, + { type: 'THINK', content: '过程' }, + { type: 'RESPONSE', content: '回答' }, + ] })}\n\n`, + 'data: [DONE]\n\n', + ], { thinking_enabled: true }); + const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame)); + const reasoning = parsed.map((item) => item.choices?.[0]?.delta?.reasoning_content || '').join(''); + const content = parsed.map((item) => item.choices?.[0]?.delta?.content || '').join(''); + assert.equal(reasoning, '思考过程'); + assert.equal(content, '回答'); + assert.equal(parsed.at(-1).choices[0].finish_reason, 'stop'); +}); + test('vercel stream exhausts DeepSeek continue before synthetic retry', async () => { const { frames, fetchURLs, fetchBodies } = await runMockVercelStreamSequence([ [