From 7c66742a19599730ce4b2758d5a330c90d25702f Mon Sep 17 00:00:00 2001 From: CJACK Date: Sun, 10 May 2026 00:10:53 +0800 Subject: [PATCH] refactor: unify empty-output retry logic into shared completionruntime package and normalize protocol adapter boundary. --- AGENTS.md | 7 + docs/prompt-compatibility.md | 4 +- internal/assistantturn/turn.go | 2 +- internal/assistantturn/turn_test.go | 8 + internal/completionruntime/nonstream.go | 6 +- internal/completionruntime/nonstream_test.go | 2 +- internal/completionruntime/stream_retry.go | 118 +++++++++ .../completionruntime/stream_retry_test.go | 62 +++++ internal/httpapi/claude/handler_messages.go | 111 ++++++++- .../httpapi/claude/stream_runtime_core.go | 10 +- .../httpapi/claude/stream_runtime_emit.go | 15 +- .../httpapi/claude/stream_runtime_finalize.go | 27 ++- internal/httpapi/gemini/handler_generate.go | 2 +- .../httpapi/gemini/handler_stream_runtime.go | 124 +++++++++- .../openai/chat/empty_retry_runtime.go | 228 +++++------------- internal/httpapi/openai/chat/handler.go | 16 -- .../openai/chat/handler_toolcall_test.go | 15 +- .../httpapi/openai/chat/ref_file_tokens.go | 26 -- .../openai/responses/empty_retry_runtime.go | 60 +++-- internal/httpapi/openai/responses/handler.go | 8 - .../openai/responses/responses_stream_test.go | 14 +- .../httpapi/openai/shared/upstream_empty.go | 2 +- internal/httpapi/openai/stream_status_test.go | 23 +- internal/js/chat-stream/vercel_stream_impl.js | 6 +- .../stream-tool-sieve/parse_payload.js | 149 +++++++++--- internal/toolcall/toolcalls_dsml.go | 34 ++- internal/toolcall/toolcalls_markup.go | 2 +- internal/toolcall/toolcalls_parse_markup.go | 76 ++++-- internal/toolcall/toolcalls_scan.go | 85 +++++-- internal/toolcall/toolcalls_test.go | 21 ++ tests/node/chat-stream.test.js | 21 +- tests/node/stream-tool-sieve.test.js | 17 ++ 32 files changed, 930 insertions(+), 371 deletions(-) create mode 100644 internal/completionruntime/stream_retry.go create mode 100644 internal/completionruntime/stream_retry_test.go delete mode 100644 internal/httpapi/openai/chat/ref_file_tokens.go diff --git a/AGENTS.md b/AGENTS.md index 1c71307..664f3f0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -22,6 +22,13 @@ These rules apply to all agent-made changes in this repository. - Keep changes additive and tightly scoped to the requested feature or bugfix. - Do not mix unrelated refactors into feature PRs unless they are required to make the change pass gates. +## Protocol Adapter Boundary + +- Do not let OpenAI Chat, OpenAI Responses, Claude, Gemini, or other interface protocol formatting own shared business behavior. +- Normalize protocol-specific request shapes into the project standard request/turn model first, run shared business logic in one place, then render back to the target protocol at the boundary. +- Business logic that must stay globally consistent includes empty-output retry, thinking/reasoning handling, tool-call detection and policy, usage accounting, current-input-file injection, history persistence, file/reference handling, and completion payload assembly. +- If a behavior must differ by protocol, keep the difference as an explicit adapter/rendering concern and document why it cannot live in the shared normalized path. + ## Documentation Sync - When business logic or user-visible behavior changes, update the corresponding documentation in the same change. diff --git a/docs/prompt-compatibility.md b/docs/prompt-compatibility.md index f64d57b..d7bd479 100644 --- a/docs/prompt-compatibility.md +++ b/docs/prompt-compatibility.md @@ -112,7 +112,7 @@ DS2API 当前的核心思路,不是把客户端传来的 `messages`、`tools` - Vercel Node 流式路径本轮不迁移,仍使用现有 Node bridge / stream-tool-sieve 实现;后续若变更 Node 流式语义,需要按 `assistantturn` 的 Go canonical 输出语义同步对齐。 - 客户端传入的 thinking / reasoning 开关会被归一到下游 `thinking_enabled`。Gemini `generationConfig.thinkingConfig.thinkingBudget` 会翻译成同一套 thinking 开关;关闭时即使上游返回 `response/thinking_content`,兼容层也不会把它当作可见正文输出。若最终解析出的模型名带 `-nothinking` 后缀,则会无条件强制关闭 thinking,优先级高于请求体中的 `thinking` / `reasoning` / `reasoning_effort`。未显式关闭时,各 surface 会按解析后的 DeepSeek 模型默认能力开启 thinking,并用各自协议的原生形态暴露:OpenAI Chat 为 `reasoning_content`,OpenAI Responses 为 `response.reasoning.delta` / `reasoning` content,Claude 为 `thinking` block / `thinking_delta`,Gemini 为 `thought: true` part。 - 对 OpenAI Chat / Responses 的非流式收尾,如果最终可见正文为空,兼容层会优先尝试把思维链中的独立 DSML / XML 工具块当作真实工具调用解析出来。流式链路也会在收尾阶段做同样的 fallback 检测,但不会因为思维链内容去中途拦截或改写流式输出;真正的工具识别始终基于原始上游文本,而不是基于“已经做过可见输出清洗”的版本,因此即使最终可见层会剥离完整 leaked DSML / XML `tool_calls` wrapper、并抑制全空参数或无效 wrapper 块,也不会影响真实工具调用转成结构化 `tool_calls` / `function_call`。补发结果会作为本轮 assistant 的结构化 `tool_calls` / `function_call` 输出返回,而不是塞进 `content` 文本;如果客户端没有开启 thinking / reasoning,思维链只用于检测,不会作为 `reasoning_content` 或可见正文暴露。只有正文为空且思维链里也没有可执行工具调用时,才继续按空回复错误处理。 -- OpenAI Chat / Responses 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token 与工具策略,把原始 completion `prompt` 追加固定后缀 `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` 后重新提交一次。重试遵循 DeepSeek 多轮对话协议:从第一次上游 SSE 流中提取 `response_message_id`,并在重试 payload 中设置 `parent_message_id` 为该值,使重试成为同一会话的后续轮次而非断裂的根消息;同时重新获取一次 PoW(若 PoW 获取失败则回退到原始 PoW)。该重试不会重新标准化消息、不会新建 session、不会切换账号,也不会向流式客户端插入重试标记;第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若第二次仍为空,终端错误码仍保持现有 `upstream_empty_output`;若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。JS Vercel 运行时同样设置 `parent_message_id`,但因无法直接调用 PoW API 而复用原始 PoW。 +- OpenAI Chat / Responses、Claude Messages、Gemini generateContent 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token 与工具策略,把原始 completion `prompt` 追加固定后缀 `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` 后重新提交一次。Go 主路径的非流式重试由 `completionruntime.ExecuteNonStreamWithRetry` 统一处理;流式重试由 `completionruntime.ExecuteStreamWithRetry` 统一处理,各协议 runtime 只负责消费/渲染本协议 SSE framing。重试遵循 DeepSeek 多轮对话协议:从第一次上游 SSE 流中提取 `response_message_id`,并在重试 payload 中设置 `parent_message_id` 为该值,使重试成为同一会话的后续轮次而非断裂的根消息;同时重新获取一次 PoW(若 PoW 获取失败则回退到原始 PoW)。该重试不会重新标准化消息、不会新建 session、不会切换账号,也不会向流式客户端插入重试标记;第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若第二次仍没有任何输出,终端错误为 503 `upstream_unavailable`;若有 reasoning 但没有可见正文或工具调用,仍返回 429 `upstream_empty_output`;若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。JS Vercel 运行时同样设置 `parent_message_id`,但因无法直接调用 PoW API 而复用原始 PoW。 - 非流式 OpenAI Chat / Responses、Claude Messages、Gemini generateContent 在最终可见正文渲染阶段,会把 DeepSeek 搜索返回中的 `[citation:N]` / `[reference:N]` 标记替换成对应 Markdown 链接。`citation` 标记按一基序号解析;`reference` 标记只有在同一段正文中出现 `[reference:0]`(允许冒号后有空格)时才按零基序号映射,并且不会影响同段正文里的 `citation` 标记。 - 流式输出仍默认隐藏 `[citation:N]` / `[reference:N]` 这类上游内部标记,避免分片输出中泄漏尚未完成映射的引用占位符。 @@ -168,7 +168,7 @@ OpenAI Chat / Responses 在标准化后、current input file 之前,会默认 4. 把这整段内容并入 system prompt。 工具调用正例现在优先示范官方 DSML 风格:`<|DSML|tool_calls>` → `<|DSML|invoke name="...">` → `<|DSML|parameter name="...">`。 -兼容层仍接受旧式纯 `` wrapper,并会容错若干 DSML 标签变体,包括短横线形式 `` / `` / ``、下划线形式 `` / `` / ``,以及其他前缀分隔形态如 `` / `` / ``;但提示词会优先要求模型输出官方 DSML 标签,并强调不能只输出 closing wrapper 而漏掉 opening tag。需要注意:这是“兼容 DSML 外壳,内部仍以 XML 解析语义为准”,不是原生 DSML 全链路实现;这些别名会在解析入口归一化回现有 XML 标签后继续走同一套 parser。解析器会先截获非代码块中的疑似工具 wrapper,完整解析失败或工具语义无效时再按普通文本放行。 +兼容层仍接受旧式纯 `` wrapper,并会容错若干 DSML 标签变体,包括短横线形式 `` / `` / ``、下划线形式 `` / `` / ``,以及其他前缀分隔形态如 `` / `` / ``;标签壳扫描还会把全角 ASCII 漂移归一化,例如 `<dSML|tool_calls>` 与全角 `>` 结束符。但提示词会优先要求模型输出官方 DSML 标签,并强调不能只输出 closing wrapper 而漏掉 opening tag。需要注意:这是“兼容 DSML 外壳,内部仍以 XML 解析语义为准”,不是原生 DSML 全链路实现;这些别名会在解析入口归一化回现有 XML 标签后继续走同一套 parser。解析器会先截获非代码块中的疑似工具 wrapper,完整解析失败或工具语义无效时再按普通文本放行。 数组参数使用 `...` 子节点表示;当某个参数体只包含 item 子节点时,Go / Node 解析器会把它还原成数组,避免 `questions` / `options` 这类 schema 中要求 array 的参数被误解析成 `{ "item": ... }` 对象。除此之外,解析器还会回收一些更松散的列表写法,例如 JSON array 字面量或逗号分隔的 JSON 项序列,只要它们足够明确;但 `` 仍然是首选形态。若模型把完整结构化 XML fragment 误包进 CDATA,兼容层会在保护 `content` / `command` 等原文字段的前提下,尝试把非原文字段中的 CDATA XML fragment 还原成 object / array。不过,如果 CDATA 只是单个平面的 XML/HTML 标签,例如 `urgent` 这种行内标记,兼容层会保留原始字符串,不会强行升成 object / array;只有明显表示结构的 CDATA 片段,例如多兄弟节点、嵌套子节点或 `item` 列表,才会触发结构化恢复。对 `command` / `content` 等长文本参数,CDATA 内部的 Markdown fenced DSML / XML 示例会作为原文保护;示例里的 `]]>` 或 `` 不会截断外层工具调用,解析器会继续等待围栏外真正的参数 / wrapper 结束标签。 Go 侧读取 DeepSeek SSE 时不再依赖 `bufio.Scanner` 的固定 2MiB 单行上限;当写文件类工具把很长的 `content` 放在单个 `data:` 行里返回时,非流式收集、流式解析和 auto-continue 透传都会保留完整行,再进入同一套工具解析与序列化流程。 在 assistant 最终回包阶段,如果某个 tool 参数在声明 schema 中明确是 `string`,兼容层会在把解析后的 `tool_calls` / `function_call` 重新序列化成 OpenAI / Responses / Claude 可见参数前,递归把该路径上的 number / bool / object / array 统一转成字符串;其中 object / array 会压成紧凑 JSON 字符串。这个保护只对 schema 明确声明为 string 的路径生效,不会改写本来就是 `number` / `boolean` / `object` / `array` 的参数。这样可以兼容 DeepSeek 输出了结构化片段、但上游客户端工具 schema 又严格要求字符串参数的场景(例如 `content`、`prompt`、`path`、`taskId` 等)。 diff --git a/internal/assistantturn/turn.go b/internal/assistantturn/turn.go index bc8bd19..b329e65 100644 --- a/internal/assistantturn/turn.go +++ b/internal/assistantturn/turn.go @@ -218,7 +218,7 @@ func UpstreamEmptyOutputDetail(contentFilter bool, text, thinking string) (int, if strings.TrimSpace(thinking) != "" { return http.StatusTooManyRequests, "Upstream account hit a rate limit and returned reasoning without visible output.", "upstream_empty_output" } - return http.StatusTooManyRequests, "Upstream account hit a rate limit and returned empty output.", "upstream_empty_output" + return http.StatusServiceUnavailable, "Upstream service is unavailable and returned no output.", "upstream_unavailable" } // ShouldRetryEmptyOutput returns true when the turn produced no visible text diff --git a/internal/assistantturn/turn_test.go b/internal/assistantturn/turn_test.go index 4fa6c99..b2f9445 100644 --- a/internal/assistantturn/turn_test.go +++ b/internal/assistantturn/turn_test.go @@ -1,6 +1,7 @@ package assistantturn import ( + "net/http" "testing" "ds2api/internal/promptcompat" @@ -70,6 +71,13 @@ func TestBuildTurnFromCollectedThinkingOnlyIsEmptyOutput(t *testing.T) { } } +func TestBuildTurnFromCollectedPureEmptyOutputIsUpstreamUnavailable(t *testing.T) { + turn := BuildTurnFromCollected(sse.CollectResult{}, BuildOptions{}) + if turn.Error == nil || turn.Error.Status != http.StatusServiceUnavailable || turn.Error.Code != "upstream_unavailable" { + t.Fatalf("expected upstream unavailable error, got %#v", turn.Error) + } +} + func TestBuildTurnFromCollectedToolChoiceRequired(t *testing.T) { turn := BuildTurnFromCollected(sse.CollectResult{Text: "hello"}, BuildOptions{ ToolChoice: promptcompat.ToolChoicePolicy{Mode: promptcompat.ToolChoiceRequired}, diff --git a/internal/completionruntime/nonstream.go b/internal/completionruntime/nonstream.go index 83709ca..ee31c0b 100644 --- a/internal/completionruntime/nonstream.go +++ b/internal/completionruntime/nonstream.go @@ -90,7 +90,11 @@ func ExecuteNonStreamWithRetry(ctx context.Context, ds DeepSeekCaller, a *auth.R if startErr != nil { return NonStreamResult{SessionID: start.SessionID, Payload: start.Payload}, startErr } - stdReq = start.Request + return ExecuteNonStreamStartedWithRetry(ctx, ds, a, start, opts) +} + +func ExecuteNonStreamStartedWithRetry(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, start StartResult, opts Options) (NonStreamResult, *assistantturn.OutputError) { + stdReq := start.Request maxAttempts := opts.MaxAttempts if maxAttempts <= 0 { maxAttempts = 3 diff --git a/internal/completionruntime/nonstream_test.go b/internal/completionruntime/nonstream_test.go index e10b927..36461ad 100644 --- a/internal/completionruntime/nonstream_test.go +++ b/internal/completionruntime/nonstream_test.go @@ -91,7 +91,7 @@ func TestExecuteNonStreamWithRetryBuildsCanonicalTurn(t *testing.T) { func TestExecuteNonStreamWithRetryUsesParentMessageForEmptyRetry(t *testing.T) { ds := &fakeDeepSeekCaller{responses: []*http.Response{ - sseHTTPResponse(http.StatusOK, `data: {"response_message_id":77,"p":"response/status","v":"FINISHED"}`), + sseHTTPResponse(http.StatusOK, `data: {"response_message_id":77,"p":"response/thinking_content","v":"plan"}`), sseHTTPResponse(http.StatusOK, `data: {"response_message_id":78,"p":"response/content","v":"ok"}`), }} stdReq := promptcompat.StandardRequest{ diff --git a/internal/completionruntime/stream_retry.go b/internal/completionruntime/stream_retry.go new file mode 100644 index 0000000..27a7ebc --- /dev/null +++ b/internal/completionruntime/stream_retry.go @@ -0,0 +1,118 @@ +package completionruntime + +import ( + "context" + "io" + "net/http" + "strings" + + "ds2api/internal/auth" + "ds2api/internal/config" + "ds2api/internal/httpapi/openai/shared" +) + +type StreamRetryOptions struct { + Surface string + Stream bool + RetryEnabled bool + RetryMaxAttempts int + MaxAttempts int + UsagePrompt string +} + +type StreamRetryHooks struct { + ConsumeAttempt func(resp *http.Response, allowDeferEmpty bool) (terminalWritten bool, retryable bool) + Finalize func(attempts int) + ParentMessageID func() int + OnRetry func(attempts int) + OnRetryPrompt func(prompt string) + OnRetryFailure func(status int, message, code string) + OnTerminal func(attempts int) +} + +func ExecuteStreamWithRetry(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, initialResp *http.Response, payload map[string]any, pow string, opts StreamRetryOptions, hooks StreamRetryHooks) { + if hooks.ConsumeAttempt == nil { + return + } + surface := strings.TrimSpace(opts.Surface) + if surface == "" { + surface = "completion" + } + maxAttempts := opts.MaxAttempts + if maxAttempts <= 0 { + maxAttempts = 3 + } + retryMax := opts.RetryMaxAttempts + if retryMax <= 0 { + retryMax = shared.EmptyOutputRetryMaxAttempts() + } + + attempts := 0 + currentResp := initialResp + for { + terminalWritten, retryable := hooks.ConsumeAttempt(currentResp, opts.RetryEnabled && attempts < retryMax) + if terminalWritten { + if hooks.OnTerminal != nil { + hooks.OnTerminal(attempts) + } + return + } + if !retryable || !opts.RetryEnabled || attempts >= retryMax { + if hooks.Finalize != nil { + hooks.Finalize(attempts) + } + return + } + + attempts++ + parentMessageID := 0 + if hooks.ParentMessageID != nil { + parentMessageID = hooks.ParentMessageID() + } + config.Logger.Info("[completion_runtime_empty_retry] attempting synthetic retry", "surface", surface, "stream", opts.Stream, "retry_attempt", attempts, "parent_message_id", parentMessageID) + retryPow, powErr := ds.GetPow(ctx, a, maxAttempts) + if powErr != nil { + config.Logger.Warn("[completion_runtime_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", surface, "stream", opts.Stream, "retry_attempt", attempts, "error", powErr) + retryPow = pow + } + nextResp, err := ds.CallCompletion(ctx, a, shared.ClonePayloadForEmptyOutputRetry(payload, parentMessageID), retryPow, maxAttempts) + if err != nil { + if hooks.OnRetryFailure != nil { + hooks.OnRetryFailure(http.StatusInternalServerError, "Failed to get completion.", "error") + } + config.Logger.Warn("[completion_runtime_empty_retry] retry request failed", "surface", surface, "stream", opts.Stream, "retry_attempt", attempts, "error", err) + return + } + if nextResp.StatusCode != http.StatusOK { + body, readErr := io.ReadAll(nextResp.Body) + if readErr != nil { + config.Logger.Warn("[completion_runtime_empty_retry] retry error body read failed", "surface", surface, "stream", opts.Stream, "retry_attempt", attempts, "error", readErr) + } + closeRetryBody(surface, nextResp.Body) + msg := strings.TrimSpace(string(body)) + if msg == "" { + msg = http.StatusText(nextResp.StatusCode) + } + if hooks.OnRetryFailure != nil { + hooks.OnRetryFailure(nextResp.StatusCode, msg, "error") + } + return + } + if hooks.OnRetry != nil { + hooks.OnRetry(attempts) + } + if hooks.OnRetryPrompt != nil { + hooks.OnRetryPrompt(shared.UsagePromptWithEmptyOutputRetry(opts.UsagePrompt, attempts)) + } + currentResp = nextResp + } +} + +func closeRetryBody(surface string, body io.Closer) { + if body == nil { + return + } + if err := body.Close(); err != nil { + config.Logger.Warn("[completion_runtime_empty_retry] retry response body close failed", "surface", surface, "error", err) + } +} diff --git a/internal/completionruntime/stream_retry_test.go b/internal/completionruntime/stream_retry_test.go new file mode 100644 index 0000000..7340dca --- /dev/null +++ b/internal/completionruntime/stream_retry_test.go @@ -0,0 +1,62 @@ +package completionruntime + +import ( + "context" + "io" + "net/http" + "strings" + "testing" + + "ds2api/internal/auth" + "ds2api/internal/httpapi/openai/shared" +) + +func TestExecuteStreamWithRetryUsesSharedRetryPayloadAndUsagePrompt(t *testing.T) { + ds := &fakeDeepSeekCaller{responses: []*http.Response{ + sseHTTPResponse(http.StatusOK, `data: {"p":"response/content","v":"ok"}`), + }} + initial := sseHTTPResponse(http.StatusOK, `data: {"response_message_id":77,"p":"response/thinking_content","v":"plan"}`) + payload := map[string]any{"prompt": "original prompt"} + attemptsSeen := 0 + retryPrompt := "" + + ExecuteStreamWithRetry(context.Background(), ds, &auth.RequestAuth{}, initial, payload, "pow", StreamRetryOptions{ + Surface: "test.stream", + Stream: true, + RetryEnabled: true, + UsagePrompt: "original prompt", + }, StreamRetryHooks{ + ConsumeAttempt: func(resp *http.Response, allowDeferEmpty bool) (bool, bool) { + defer func() { + if err := resp.Body.Close(); err != nil { + t.Fatalf("close failed: %v", err) + } + }() + _, _ = io.ReadAll(resp.Body) + attemptsSeen++ + return attemptsSeen == 2, attemptsSeen == 1 && allowDeferEmpty + }, + ParentMessageID: func() int { + return 77 + }, + OnRetryPrompt: func(prompt string) { + retryPrompt = prompt + }, + }) + + if attemptsSeen != 2 { + t.Fatalf("expected two stream attempts, got %d", attemptsSeen) + } + if len(ds.payloads) != 1 { + t.Fatalf("expected one retry completion call, got %d", len(ds.payloads)) + } + if got := ds.payloads[0]["parent_message_id"]; got != 77 { + t.Fatalf("retry parent_message_id mismatch: %#v", got) + } + if prompt, _ := ds.payloads[0]["prompt"].(string); !strings.Contains(prompt, shared.EmptyOutputRetrySuffix) { + t.Fatalf("expected retry suffix in payload prompt, got %q", prompt) + } + if !strings.Contains(retryPrompt, shared.EmptyOutputRetrySuffix) { + t.Fatalf("expected retry suffix in usage prompt, got %q", retryPrompt) + } +} diff --git a/internal/httpapi/claude/handler_messages.go b/internal/httpapi/claude/handler_messages.go index 8478dc7..e22a1ed 100644 --- a/internal/httpapi/claude/handler_messages.go +++ b/internal/httpapi/claude/handler_messages.go @@ -145,7 +145,7 @@ func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Reques return } streamReq := start.Request - h.handleClaudeStreamRealtime(w, r, start.Response, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession) + h.handleClaudeStreamRealtimeWithRetry(w, r, a, start.Response, start.Payload, start.Pow, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.PromptTokenText, historySession) } func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store ConfigReader) bool { @@ -360,3 +360,112 @@ func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Requ OnFinalize: streamRuntime.onFinalize, }) } + +func (h *Handler) handleClaudeStreamRealtimeWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, promptTokenText string, historySession *responsehistory.Session) { + if resp.StatusCode != http.StatusOK { + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(resp.Body) + if historySession != nil { + historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "") + } + writeClaudeError(w, http.StatusInternalServerError, string(body)) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache, no-transform") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + rc := http.NewResponseController(w) + _, canFlush := w.(http.Flusher) + if !canFlush { + config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered") + } + + streamRuntime := newClaudeStreamRuntime( + w, + rc, + canFlush, + model, + messages, + thinkingEnabled, + searchEnabled, + stripReferenceMarkersEnabled(), + toolNames, + toolsRaw, + promptTokenText, + historySession, + ) + streamRuntime.sendMessageStart() + + completionruntime.ExecuteStreamWithRetry(r.Context(), h.DS, a, resp, payload, pow, completionruntime.StreamRetryOptions{ + Surface: "claude.messages", + Stream: true, + RetryEnabled: true, + MaxAttempts: 3, + UsagePrompt: promptTokenText, + }, completionruntime.StreamRetryHooks{ + ConsumeAttempt: func(currentResp *http.Response, allowDeferEmpty bool) (bool, bool) { + return h.consumeClaudeStreamAttempt(r, currentResp, streamRuntime, thinkingEnabled, allowDeferEmpty) + }, + Finalize: func(_ int) { + streamRuntime.finalize("end_turn", false) + }, + ParentMessageID: func() int { + return streamRuntime.responseMessageID + }, + OnRetryPrompt: func(prompt string) { + streamRuntime.promptTokenText = prompt + }, + OnRetryFailure: func(status int, message, code string) { + streamRuntime.sendErrorWithCode(status, strings.TrimSpace(message), code) + }, + }) +} + +func (h *Handler) consumeClaudeStreamAttempt(r *http.Request, resp *http.Response, streamRuntime *claudeStreamRuntime, thinkingEnabled bool, allowDeferEmpty bool) (bool, bool) { + defer func() { _ = resp.Body.Close() }() + initialType := "text" + if thinkingEnabled { + initialType = "thinking" + } + finalReason := streamengine.StopReason("") + var scannerErr error + streamengine.ConsumeSSE(streamengine.ConsumeConfig{ + Context: r.Context(), + Body: resp.Body, + ThinkingEnabled: thinkingEnabled, + InitialType: initialType, + KeepAliveInterval: claudeStreamPingInterval, + IdleTimeout: claudeStreamIdleTimeout, + MaxKeepAliveNoInput: claudeStreamMaxKeepaliveCnt, + }, streamengine.ConsumeHooks{ + OnKeepAlive: func() { + streamRuntime.sendPing() + }, + OnParsed: streamRuntime.onParsed, + OnFinalize: func(reason streamengine.StopReason, err error) { + finalReason = reason + scannerErr = err + }, + }) + if string(finalReason) == "upstream_error" { + if streamRuntime.history != nil { + streamRuntime.history.Error(500, streamRuntime.upstreamErr, "upstream_error", responsehistory.ThinkingForArchive(streamRuntime.rawThinking.String(), streamRuntime.toolDetectionThinking.String(), streamRuntime.thinking.String()), responsehistory.TextForArchive(streamRuntime.rawText.String(), streamRuntime.text.String())) + } + streamRuntime.sendError(streamRuntime.upstreamErr) + return true, false + } + if scannerErr != nil { + if streamRuntime.history != nil { + streamRuntime.history.Error(500, scannerErr.Error(), "error", responsehistory.ThinkingForArchive(streamRuntime.rawThinking.String(), streamRuntime.toolDetectionThinking.String(), streamRuntime.thinking.String()), responsehistory.TextForArchive(streamRuntime.rawText.String(), streamRuntime.text.String())) + } + streamRuntime.sendError(scannerErr.Error()) + return true, false + } + terminalWritten := streamRuntime.finalize("end_turn", allowDeferEmpty) + if terminalWritten { + return true, false + } + return false, true +} diff --git a/internal/httpapi/claude/stream_runtime_core.go b/internal/httpapi/claude/stream_runtime_core.go index 9c9e656..c558601 100644 --- a/internal/httpapi/claude/stream_runtime_core.go +++ b/internal/httpapi/claude/stream_runtime_core.go @@ -29,9 +29,10 @@ type claudeStreamRuntime struct { bufferToolContent bool stripReferenceMarkers bool - messageID string - thinking strings.Builder - text strings.Builder + messageID string + thinking strings.Builder + text strings.Builder + responseMessageID int sieve toolstream.State rawText strings.Builder @@ -92,6 +93,9 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse s.upstreamErr = parsed.ErrorMessage return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("upstream_error")} } + if parsed.ResponseMessageID > 0 { + s.responseMessageID = parsed.ResponseMessageID + } if parsed.Stop { return streamengine.ParsedDecision{Stop: true} } diff --git a/internal/httpapi/claude/stream_runtime_emit.go b/internal/httpapi/claude/stream_runtime_emit.go index e071cdc..7425a55 100644 --- a/internal/httpapi/claude/stream_runtime_emit.go +++ b/internal/httpapi/claude/stream_runtime_emit.go @@ -22,16 +22,27 @@ func (s *claudeStreamRuntime) send(event string, v any) { } func (s *claudeStreamRuntime) sendError(message string) { + s.sendErrorWithCode(500, message, "internal_error") +} + +func (s *claudeStreamRuntime) sendErrorWithCode(status int, message, code string) { msg := strings.TrimSpace(message) if msg == "" { msg = "upstream stream error" } + if code == "" { + code = "internal_error" + } + errType := "api_error" + if status == 429 { + errType = "rate_limit_error" + } s.send("error", map[string]any{ "type": "error", "error": map[string]any{ - "type": "api_error", + "type": errType, "message": msg, - "code": "internal_error", + "code": code, "param": nil, }, }) diff --git a/internal/httpapi/claude/stream_runtime_finalize.go b/internal/httpapi/claude/stream_runtime_finalize.go index f63b125..07be629 100644 --- a/internal/httpapi/claude/stream_runtime_finalize.go +++ b/internal/httpapi/claude/stream_runtime_finalize.go @@ -63,13 +63,10 @@ func (s *claudeStreamRuntime) sendToolUseBlock(idx int, tc toolcall.ParsedToolCa }) } -func (s *claudeStreamRuntime) finalize(stopReason string) { +func (s *claudeStreamRuntime) finalize(stopReason string, deferEmptyOutput bool) bool { if s.ended { - return + return true } - s.ended = true - - s.closeThinkingBlock() if s.bufferToolContent { for _, evt := range toolstream.Flush(&s.sieve, s.toolNames) { @@ -123,6 +120,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { RawThinking: s.rawThinking.String(), VisibleThinking: s.thinking.String(), DetectionThinking: s.toolDetectionThinking.String(), + ResponseMessageID: s.responseMessageID, AlreadyEmittedCalls: s.toolCallsDetected, AlreadyEmittedToolRaw: s.toolCallsDetected, }, assistantturn.BuildOptions{ @@ -137,6 +135,22 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { outcome := assistantturn.FinalizeTurn(turn, assistantturn.FinalizeOptions{ AlreadyEmittedToolCalls: s.toolCallsDetected, }) + if outcome.ShouldFail { + if deferEmptyOutput { + return false + } + s.ended = true + s.closeThinkingBlock() + s.closeTextBlock() + if s.history != nil { + s.history.Error(outcome.Error.Status, outcome.Error.Message, outcome.Error.Code, responsehistory.ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), responsehistory.TextForArchive(turn.RawText, turn.Text)) + } + s.sendErrorWithCode(outcome.Error.Status, outcome.Error.Message, outcome.Error.Code) + return true + } + + s.ended = true + s.closeThinkingBlock() if s.bufferToolContent && !s.toolCallsDetected { if len(turn.ToolCalls) > 0 { @@ -197,6 +211,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) { }, }) s.send("message_stop", map[string]any{"type": "message_stop"}) + return true } func (s *claudeStreamRuntime) onFinalize(reason streamengine.StopReason, scannerErr error) { @@ -214,5 +229,5 @@ func (s *claudeStreamRuntime) onFinalize(reason streamengine.StopReason, scanner s.sendError(scannerErr.Error()) return } - s.finalize("end_turn") + s.finalize("end_turn", false) } diff --git a/internal/httpapi/gemini/handler_generate.go b/internal/httpapi/gemini/handler_generate.go index b2a4114..784ff75 100644 --- a/internal/httpapi/gemini/handler_generate.go +++ b/internal/httpapi/gemini/handler_generate.go @@ -137,7 +137,7 @@ func (h *Handler) handleGeminiDirectStream(w http.ResponseWriter, r *http.Reques return } streamReq := start.Request - h.handleStreamGenerateContent(w, r, start.Response, streamReq.ResponseModel, streamReq.PromptTokenText, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession) + h.handleStreamGenerateContentWithRetry(w, r, a, start.Response, start.Payload, start.Pow, streamReq.ResponseModel, streamReq.PromptTokenText, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession) } func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, stream bool) bool { diff --git a/internal/httpapi/gemini/handler_stream_runtime.go b/internal/httpapi/gemini/handler_stream_runtime.go index de80fab..a1244ad 100644 --- a/internal/httpapi/gemini/handler_stream_runtime.go +++ b/internal/httpapi/gemini/handler_stream_runtime.go @@ -1,6 +1,7 @@ package gemini import ( + "context" "encoding/json" "io" "net/http" @@ -8,6 +9,8 @@ import ( "time" "ds2api/internal/assistantturn" + "ds2api/internal/auth" + "ds2api/internal/completionruntime" dsprotocol "ds2api/internal/deepseek/protocol" "ds2api/internal/responsehistory" "ds2api/internal/sse" @@ -54,7 +57,7 @@ func (h *Handler) handleStreamGenerateContent(w http.ResponseWriter, r *http.Req }, streamengine.ConsumeHooks{ OnParsed: runtime.onParsed, OnFinalize: func(_ streamengine.StopReason, _ error) { - runtime.finalize() + runtime.finalize(false) }, }) } @@ -78,9 +81,83 @@ type geminiStreamRuntime struct { accumulator *assistantturn.Accumulator contentFilter bool responseMessageID int + finalErrorStatus int + finalErrorMessage string + finalErrorCode string history *responsehistory.Session } +func (h *Handler) handleStreamGenerateContentWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySession *responsehistory.Session) { + if resp.StatusCode != http.StatusOK { + defer func() { _ = resp.Body.Close() }() + body, _ := io.ReadAll(resp.Body) + if historySession != nil { + historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "") + } + writeGeminiError(w, resp.StatusCode, strings.TrimSpace(string(body))) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache, no-transform") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + rc := http.NewResponseController(w) + _, canFlush := w.(http.Flusher) + runtime := newGeminiStreamRuntime(w, rc, canFlush, model, finalPrompt, thinkingEnabled, searchEnabled, stripReferenceMarkersEnabled(), toolNames, toolsRaw, historySession) + + completionruntime.ExecuteStreamWithRetry(r.Context(), h.DS, a, resp, payload, pow, completionruntime.StreamRetryOptions{ + Surface: "gemini.generate_content", + Stream: true, + RetryEnabled: true, + MaxAttempts: 3, + UsagePrompt: finalPrompt, + }, completionruntime.StreamRetryHooks{ + ConsumeAttempt: func(currentResp *http.Response, allowDeferEmpty bool) (bool, bool) { + return h.consumeGeminiStreamAttempt(r.Context(), currentResp, runtime, thinkingEnabled, allowDeferEmpty) + }, + Finalize: func(_ int) { + runtime.finalize(false) + }, + ParentMessageID: func() int { + return runtime.responseMessageID + }, + OnRetryPrompt: func(prompt string) { + runtime.finalPrompt = prompt + }, + OnRetryFailure: func(status int, message, _ string) { + runtime.sendErrorChunk(status, strings.TrimSpace(message)) + }, + }) +} + +func (h *Handler) consumeGeminiStreamAttempt(ctx context.Context, resp *http.Response, runtime *geminiStreamRuntime, thinkingEnabled bool, allowDeferEmpty bool) (bool, bool) { + defer func() { _ = resp.Body.Close() }() + initialType := "text" + if thinkingEnabled { + initialType = "thinking" + } + streamengine.ConsumeSSE(streamengine.ConsumeConfig{ + Context: ctx, + Body: resp.Body, + ThinkingEnabled: thinkingEnabled, + InitialType: initialType, + KeepAliveInterval: time.Duration(dsprotocol.KeepAliveTimeout) * time.Second, + IdleTimeout: time.Duration(dsprotocol.StreamIdleTimeout) * time.Second, + MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount, + }, streamengine.ConsumeHooks{ + OnParsed: runtime.onParsed, + OnFinalize: func(_ streamengine.StopReason, _ error) { + }, + }) + terminalWritten := runtime.finalize(allowDeferEmpty) + if terminalWritten { + return true, false + } + return false, true +} + //nolint:unused // retained for native Gemini stream handling path. func newGeminiStreamRuntime( w http.ResponseWriter, @@ -127,6 +204,35 @@ func (s *geminiStreamRuntime) sendChunk(payload map[string]any) { } } +func (s *geminiStreamRuntime) sendErrorChunk(status int, message string) { + msg := strings.TrimSpace(message) + if msg == "" { + msg = http.StatusText(status) + } + errorStatus := "INVALID_ARGUMENT" + switch status { + case http.StatusUnauthorized: + errorStatus = "UNAUTHENTICATED" + case http.StatusForbidden: + errorStatus = "PERMISSION_DENIED" + case http.StatusTooManyRequests: + errorStatus = "RESOURCE_EXHAUSTED" + case http.StatusNotFound: + errorStatus = "NOT_FOUND" + default: + if status >= 500 { + errorStatus = "INTERNAL" + } + } + s.sendChunk(map[string]any{ + "error": map[string]any{ + "code": status, + "message": msg, + "status": errorStatus, + }, + }) +} + //nolint:unused // retained for native Gemini stream handling path. func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedDecision { if !parsed.Parsed { @@ -192,7 +298,7 @@ func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse } //nolint:unused // retained for native Gemini stream handling path. -func (s *geminiStreamRuntime) finalize() { +func (s *geminiStreamRuntime) finalize(deferEmptyOutput bool) bool { rawText, text, rawThinking, thinking, detectionThinking := s.accumulator.Snapshot() turn := assistantturn.BuildTurnFromStreamSnapshot(assistantturn.StreamSnapshot{ RawText: rawText, @@ -211,6 +317,19 @@ func (s *geminiStreamRuntime) finalize() { ToolsRaw: s.toolsRaw, }) outcome := assistantturn.FinalizeTurn(turn, assistantturn.FinalizeOptions{}) + if outcome.ShouldFail { + if deferEmptyOutput { + s.finalErrorStatus = outcome.Error.Status + s.finalErrorMessage = outcome.Error.Message + s.finalErrorCode = outcome.Error.Code + return false + } + if s.history != nil { + s.history.Error(outcome.Error.Status, outcome.Error.Message, outcome.Error.Code, responsehistory.ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), responsehistory.TextForArchive(turn.RawText, turn.Text)) + } + s.sendErrorChunk(outcome.Error.Status, outcome.Error.Message) + return true + } if s.history != nil { s.history.Success( http.StatusOK, @@ -257,4 +376,5 @@ func (s *geminiStreamRuntime) finalize() { "totalTokenCount": outcome.Usage.TotalTokens, }, }) + return true } diff --git a/internal/httpapi/openai/chat/empty_retry_runtime.go b/internal/httpapi/openai/chat/empty_retry_runtime.go index 748a39b..40067ed 100644 --- a/internal/httpapi/openai/chat/empty_retry_runtime.go +++ b/internal/httpapi/openai/chat/empty_retry_runtime.go @@ -4,11 +4,11 @@ import ( "context" "io" "net/http" - "strings" "time" "ds2api/internal/assistantturn" "ds2api/internal/auth" + "ds2api/internal/completionruntime" "ds2api/internal/config" dsprotocol "ds2api/internal/deepseek/protocol" openaifmt "ds2api/internal/format/openai" @@ -17,148 +17,53 @@ import ( streamengine "ds2api/internal/stream" ) -type chatNonStreamResult struct { - rawThinking string - rawText string - thinking string - toolDetectionThinking string - text string - contentFilter bool - detectedCalls int - body map[string]any - finishReason string - responseMessageID int - outputError *assistantturn.OutputError -} - -func (r chatNonStreamResult) historyText() string { - return historyTextForArchive(r.rawText, r.text) -} - -func (r chatNonStreamResult) historyThinking() string { - return historyThinkingForArchive(r.rawThinking, r.toolDetectionThinking, r.thinking) -} - func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySession *chatHistorySession) { - attempts := 0 - currentResp := resp - usagePrompt := finalPrompt - accumulatedThinking := "" - accumulatedRawThinking := "" - accumulatedToolDetectionThinking := "" - for { - result, ok := h.collectChatNonStreamAttempt(w, currentResp, completionID, model, usagePrompt, thinkingEnabled, searchEnabled, toolNames, toolsRaw) - if !ok { - return - } - accumulatedThinking += sse.TrimContinuationOverlap(accumulatedThinking, result.thinking) - accumulatedRawThinking += sse.TrimContinuationOverlap(accumulatedRawThinking, result.rawThinking) - accumulatedToolDetectionThinking += sse.TrimContinuationOverlap(accumulatedToolDetectionThinking, result.toolDetectionThinking) - result.thinking = accumulatedThinking - result.rawThinking = accumulatedRawThinking - result.toolDetectionThinking = accumulatedToolDetectionThinking - detected := detectAssistantToolCalls(result.rawText, result.text, result.rawThinking, result.toolDetectionThinking, toolNames) - result.detectedCalls = len(detected.Calls) - result.body = openaifmt.BuildChatCompletionWithToolCalls(completionID, model, usagePrompt, result.thinking, result.text, detected.Calls, toolsRaw) - addRefFileTokensToUsage(result.body, refFileTokens) - result.finishReason = chatFinishReason(result.body) - if !shouldRetryChatNonStream(result, attempts) { - h.finishChatNonStreamResult(w, result, attempts, usagePrompt, refFileTokens, historySession) - return - } - - attempts++ - config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "parent_message_id", result.responseMessageID) - retryPow, powErr := h.DS.GetPow(ctx, a, 3) - if powErr != nil { - config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", powErr) - retryPow = pow - } - retryPayload := clonePayloadForEmptyOutputRetry(payload, result.responseMessageID) - nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, retryPow, 3) - if err != nil { - if historySession != nil { - historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.historyThinking(), result.historyText()) - } - writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.") - config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", err) - return - } - usagePrompt = usagePromptWithEmptyOutputRetry(usagePrompt, attempts) - currentResp = nextResp - } -} - -func (h *Handler) collectChatNonStreamAttempt(w http.ResponseWriter, resp *http.Response, completionID, model, usagePrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any) (chatNonStreamResult, bool) { if resp.StatusCode != http.StatusOK { defer func() { _ = resp.Body.Close() }() body, _ := io.ReadAll(resp.Body) - writeOpenAIError(w, resp.StatusCode, string(body)) - return chatNonStreamResult{}, false - } - result := sse.CollectStream(resp, thinkingEnabled, true) - turn := assistantturn.BuildTurnFromCollected(result, assistantturn.BuildOptions{ - Model: model, - Prompt: usagePrompt, - SearchEnabled: searchEnabled, - ToolNames: toolNames, - ToolsRaw: toolsRaw, - }) - respBody := openaifmt.BuildChatCompletionWithToolCalls(completionID, model, usagePrompt, turn.Thinking, turn.Text, turn.ToolCalls, toolsRaw) - return chatNonStreamResult{ - rawThinking: result.Thinking, - rawText: result.Text, - thinking: turn.Thinking, - toolDetectionThinking: result.ToolDetectionThinking, - text: turn.Text, - contentFilter: result.ContentFilter, - detectedCalls: len(turn.ToolCalls), - body: respBody, - finishReason: chatFinishReason(respBody), - responseMessageID: result.ResponseMessageID, - outputError: turn.Error, - }, true -} - -func (h *Handler) finishChatNonStreamResult(w http.ResponseWriter, result chatNonStreamResult, attempts int, usagePrompt string, refFileTokens int, historySession *chatHistorySession) { - if result.detectedCalls == 0 && strings.TrimSpace(result.text) == "" { - status, message, code := upstreamEmptyOutputDetail(result.contentFilter, result.text, result.thinking) - if result.outputError != nil { - status, message, code = result.outputError.Status, result.outputError.Message, result.outputError.Code - } if historySession != nil { - historySession.error(status, message, code, result.historyThinking(), result.historyText()) + historySession.error(resp.StatusCode, string(body), "error", "", "") } - writeOpenAIErrorWithCode(w, status, message, code) - config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", "none", "content_filter", result.contentFilter) + writeOpenAIError(w, resp.StatusCode, string(body)) return } - if historySession != nil { - historySession.success(http.StatusOK, result.historyThinking(), result.historyText(), result.finishReason, openaifmt.BuildChatUsageForModel("", usagePrompt, result.thinking, result.text, refFileTokens)) + stdReq := promptcompat.StandardRequest{ + Surface: "chat.completions", + ResponseModel: model, + PromptTokenText: finalPrompt, + FinalPrompt: finalPrompt, + RefFileTokens: refFileTokens, + Thinking: thinkingEnabled, + Search: searchEnabled, + ToolNames: toolNames, + ToolsRaw: toolsRaw, + ToolChoice: promptcompat.DefaultToolChoicePolicy(), } - writeJSON(w, http.StatusOK, result.body) - source := "first_attempt" - if attempts > 0 { - source = "synthetic_retry" - } - config.Logger.Info("[openai_empty_retry] completed", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", source) -} - -func chatFinishReason(respBody map[string]any) string { - if choices, ok := respBody["choices"].([]map[string]any); ok && len(choices) > 0 { - if fr, _ := choices[0]["finish_reason"].(string); strings.TrimSpace(fr) != "" { - return fr + retryEnabled := h != nil && h.DS != nil && emptyOutputRetryEnabled() + result, outErr := completionruntime.ExecuteNonStreamStartedWithRetry(ctx, h.DS, a, completionruntime.StartResult{ + SessionID: completionID, + Payload: payload, + Pow: pow, + Response: resp, + Request: stdReq, + }, completionruntime.Options{ + RetryEnabled: retryEnabled, + RetryMaxAttempts: emptyOutputRetryMaxAttempts(), + }) + if outErr != nil { + if historySession != nil { + historySession.error(outErr.Status, outErr.Message, outErr.Code, historyThinkingForArchive(result.Turn.RawThinking, result.Turn.DetectionThinking, result.Turn.Thinking), historyTextForArchive(result.Turn.RawText, result.Turn.Text)) } + writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code) + return } - return "stop" -} - -func shouldRetryChatNonStream(result chatNonStreamResult, attempts int) bool { - return emptyOutputRetryEnabled() && - attempts < emptyOutputRetryMaxAttempts() && - !result.contentFilter && - result.detectedCalls == 0 && - strings.TrimSpace(result.text) == "" + respBody := openaifmt.BuildChatCompletionWithToolCalls(result.SessionID, model, result.Turn.Prompt, result.Turn.Thinking, result.Turn.Text, result.Turn.ToolCalls, toolsRaw) + respBody["usage"] = assistantturn.OpenAIChatUsage(result.Turn) + outcome := assistantturn.FinalizeTurn(result.Turn, assistantturn.FinalizeOptions{}) + if historySession != nil { + historySession.success(http.StatusOK, historyThinkingForArchive(result.Turn.RawThinking, result.Turn.DetectionThinking, result.Turn.Thinking), historyTextForArchive(result.Turn.RawText, result.Turn.Text), outcome.FinishReason, assistantturn.OpenAIChatUsage(result.Turn)) + } + writeJSON(w, http.StatusOK, respBody) } func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, historySession *chatHistorySession) { @@ -166,42 +71,35 @@ func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, if !ok { return } - attempts := 0 - currentResp := resp - for { - terminalWritten, retryable := h.consumeChatStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, historySession, attempts < emptyOutputRetryMaxAttempts()) - if terminalWritten { - logChatStreamTerminal(streamRuntime, attempts) - return - } - if !retryable || !emptyOutputRetryEnabled() || attempts >= emptyOutputRetryMaxAttempts() { + completionruntime.ExecuteStreamWithRetry(r.Context(), h.DS, a, resp, payload, pow, completionruntime.StreamRetryOptions{ + Surface: "chat.completions", + Stream: true, + RetryEnabled: emptyOutputRetryEnabled(), + RetryMaxAttempts: emptyOutputRetryMaxAttempts(), + MaxAttempts: 3, + UsagePrompt: finalPrompt, + }, completionruntime.StreamRetryHooks{ + ConsumeAttempt: func(currentResp *http.Response, allowDeferEmpty bool) (bool, bool) { + return h.consumeChatStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, historySession, allowDeferEmpty) + }, + Finalize: func(attempts int) { streamRuntime.finalize("stop", false) recordChatStreamHistory(streamRuntime, historySession) config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", "none") - return - } - attempts++ - config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "parent_message_id", streamRuntime.responseMessageID) - retryPow, powErr := h.DS.GetPow(r.Context(), a, 3) - if powErr != nil { - config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", powErr) - retryPow = pow - } - nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadForEmptyOutputRetry(payload, streamRuntime.responseMessageID), retryPow, 3) - if err != nil { - failChatStreamRetry(streamRuntime, historySession, http.StatusInternalServerError, "Failed to get completion.", "error") - config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", err) - return - } - if nextResp.StatusCode != http.StatusOK { - defer func() { _ = nextResp.Body.Close() }() - body, _ := io.ReadAll(nextResp.Body) - failChatStreamRetry(streamRuntime, historySession, nextResp.StatusCode, string(body), "error") - return - } - streamRuntime.finalPrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts) - currentResp = nextResp - } + }, + ParentMessageID: func() int { + return streamRuntime.responseMessageID + }, + OnRetryPrompt: func(prompt string) { + streamRuntime.finalPrompt = prompt + }, + OnRetryFailure: func(status int, message, code string) { + failChatStreamRetry(streamRuntime, historySession, status, message, code) + }, + OnTerminal: func(attempts int) { + logChatStreamTerminal(streamRuntime, attempts) + }, + }) } func (h *Handler) prepareChatStreamRuntime(w http.ResponseWriter, resp *http.Response, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, historySession *chatHistorySession) (*chatStreamRuntime, string, bool) { diff --git a/internal/httpapi/openai/chat/handler.go b/internal/httpapi/openai/chat/handler.go index f3b4584..da0ad4a 100644 --- a/internal/httpapi/openai/chat/handler.go +++ b/internal/httpapi/openai/chat/handler.go @@ -106,10 +106,6 @@ func cleanVisibleOutput(text string, stripReferenceMarkers bool) string { return shared.CleanVisibleOutput(text, stripReferenceMarkers) } -func upstreamEmptyOutputDetail(contentFilter bool, text, thinking string) (int, string, string) { - return shared.UpstreamEmptyOutputDetail(contentFilter, text, thinking) -} - func emptyOutputRetryEnabled() bool { return shared.EmptyOutputRetryEnabled() } @@ -118,14 +114,6 @@ func emptyOutputRetryMaxAttempts() int { return shared.EmptyOutputRetryMaxAttempts() } -func clonePayloadForEmptyOutputRetry(payload map[string]any, parentMessageID int) map[string]any { - return shared.ClonePayloadForEmptyOutputRetry(payload, parentMessageID) -} - -func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string { - return shared.UsagePromptWithEmptyOutputRetry(originalPrompt, retryAttempts) -} - func formatIncrementalStreamToolCallDeltas(deltas []toolstream.ToolCallDelta, ids map[int]string) []map[string]any { return shared.FormatIncrementalStreamToolCallDeltas(deltas, ids) } @@ -137,7 +125,3 @@ func filterIncrementalToolCallDeltasByAllowed(deltas []toolstream.ToolCallDelta, func formatFinalStreamToolCallsWithStableIDs(calls []toolcall.ParsedToolCall, ids map[int]string, toolsRaw any) []map[string]any { return shared.FormatFinalStreamToolCallsWithStableIDs(calls, ids, toolsRaw) } - -func detectAssistantToolCalls(rawText, visibleText, exposedThinking, detectionThinking string, toolNames []string) toolcall.ToolCallParseResult { - return shared.DetectAssistantToolCalls(rawText, visibleText, exposedThinking, detectionThinking, toolNames) -} diff --git a/internal/httpapi/openai/chat/handler_toolcall_test.go b/internal/httpapi/openai/chat/handler_toolcall_test.go index 446b480..a42d7d4 100644 --- a/internal/httpapi/openai/chat/handler_toolcall_test.go +++ b/internal/httpapi/openai/chat/handler_toolcall_test.go @@ -85,8 +85,7 @@ func streamFinishReason(frames []map[string]any) string { return "" } -// Backward-compatible alias for historical test name used in CI logs. -func TestHandleNonStreamReturns429WhenUpstreamOutputEmpty(t *testing.T) { +func TestHandleNonStreamSingleAttemptReturns503WhenUpstreamOutputEmpty(t *testing.T) { h := &Handler{} resp := makeSSEHTTPResponse( `data: {"p":"response/content","v":""}`, @@ -95,17 +94,17 @@ func TestHandleNonStreamReturns429WhenUpstreamOutputEmpty(t *testing.T) { rec := httptest.NewRecorder() h.handleNonStream(rec, resp, "cid-empty", "deepseek-v4-flash", "prompt", 0, false, false, nil, nil, nil) - if rec.Code != http.StatusTooManyRequests { - t.Fatalf("expected status 429 for empty upstream output, got %d body=%s", rec.Code, rec.Body.String()) + if rec.Code != http.StatusServiceUnavailable { + t.Fatalf("expected status 503 for empty upstream output, got %d body=%s", rec.Code, rec.Body.String()) } out := decodeJSONBody(t, rec.Body.String()) errObj, _ := out["error"].(map[string]any) - if asString(errObj["code"]) != "upstream_empty_output" { - t.Fatalf("expected code=upstream_empty_output, got %#v", out) + if asString(errObj["code"]) != "upstream_unavailable" { + t.Fatalf("expected code=upstream_unavailable, got %#v", out) } } -func TestHandleNonStreamReturnsContentFilterErrorWhenUpstreamFilteredWithoutOutput(t *testing.T) { +func TestHandleNonStreamSingleAttemptReturnsContentFilterErrorWhenUpstreamFilteredWithoutOutput(t *testing.T) { h := &Handler{} resp := makeSSEHTTPResponse( `data: {"code":"content_filter"}`, @@ -124,7 +123,7 @@ func TestHandleNonStreamReturnsContentFilterErrorWhenUpstreamFilteredWithoutOutp } } -func TestHandleNonStreamReturns429WhenUpstreamHasOnlyThinking(t *testing.T) { +func TestHandleNonStreamSingleAttemptReturns429WhenUpstreamHasOnlyThinking(t *testing.T) { h := &Handler{} resp := makeSSEHTTPResponse( `data: {"p":"response/thinking_content","v":"Only thinking"}`, diff --git a/internal/httpapi/openai/chat/ref_file_tokens.go b/internal/httpapi/openai/chat/ref_file_tokens.go deleted file mode 100644 index e5da36a..0000000 --- a/internal/httpapi/openai/chat/ref_file_tokens.go +++ /dev/null @@ -1,26 +0,0 @@ -package chat - -// addRefFileTokensToUsage adds inline-uploaded file token estimates to an existing -// usage map inside a response object. This keeps the token accounting aware of file -// content that the upstream model processes but that is not part of the prompt text. -func addRefFileTokensToUsage(obj map[string]any, refFileTokens int) { - if refFileTokens <= 0 || obj == nil { - return - } - usage, ok := obj["usage"].(map[string]any) - if !ok || usage == nil { - return - } - for _, key := range []string{"input_tokens", "prompt_tokens"} { - if v, ok := usage[key]; ok { - if n, ok := v.(int); ok { - usage[key] = n + refFileTokens - } - } - } - if v, ok := usage["total_tokens"]; ok { - if n, ok := v.(int); ok { - usage["total_tokens"] = n + refFileTokens - } - } -} diff --git a/internal/httpapi/openai/responses/empty_retry_runtime.go b/internal/httpapi/openai/responses/empty_retry_runtime.go index b0cb205..80422f5 100644 --- a/internal/httpapi/openai/responses/empty_retry_runtime.go +++ b/internal/httpapi/openai/responses/empty_retry_runtime.go @@ -7,6 +7,7 @@ import ( "time" "ds2api/internal/auth" + "ds2api/internal/completionruntime" "ds2api/internal/config" dsprotocol "ds2api/internal/deepseek/protocol" "ds2api/internal/promptcompat" @@ -19,41 +20,34 @@ func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http. if !ok { return } - attempts := 0 - currentResp := resp - for { - terminalWritten, retryable := h.consumeResponsesStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, attempts < emptyOutputRetryMaxAttempts()) - if terminalWritten { - logResponsesStreamTerminal(streamRuntime, attempts) - return - } - if !retryable || !emptyOutputRetryEnabled() || attempts >= emptyOutputRetryMaxAttempts() { + completionruntime.ExecuteStreamWithRetry(r.Context(), h.DS, a, resp, payload, pow, completionruntime.StreamRetryOptions{ + Surface: "responses", + Stream: true, + RetryEnabled: emptyOutputRetryEnabled(), + RetryMaxAttempts: emptyOutputRetryMaxAttempts(), + MaxAttempts: 3, + UsagePrompt: finalPrompt, + }, completionruntime.StreamRetryHooks{ + ConsumeAttempt: func(currentResp *http.Response, allowDeferEmpty bool) (bool, bool) { + return h.consumeResponsesStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, allowDeferEmpty) + }, + Finalize: func(attempts int) { streamRuntime.finalize("stop", false) config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode) - return - } - attempts++ - config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", true, "retry_attempt", attempts, "parent_message_id", streamRuntime.responseMessageID) - retryPow, powErr := h.DS.GetPow(r.Context(), a, 3) - if powErr != nil { - config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", powErr) - retryPow = pow - } - nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadForEmptyOutputRetry(payload, streamRuntime.responseMessageID), retryPow, 3) - if err != nil { - streamRuntime.failResponse(http.StatusInternalServerError, "Failed to get completion.", "error") - config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", err) - return - } - if nextResp.StatusCode != http.StatusOK { - defer func() { _ = nextResp.Body.Close() }() - body, _ := io.ReadAll(nextResp.Body) - streamRuntime.failResponse(nextResp.StatusCode, strings.TrimSpace(string(body)), "error") - return - } - streamRuntime.finalPrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts) - currentResp = nextResp - } + }, + ParentMessageID: func() int { + return streamRuntime.responseMessageID + }, + OnRetryPrompt: func(prompt string) { + streamRuntime.finalPrompt = prompt + }, + OnRetryFailure: func(status int, message, code string) { + streamRuntime.failResponse(status, strings.TrimSpace(message), code) + }, + OnTerminal: func(attempts int) { + logResponsesStreamTerminal(streamRuntime, attempts) + }, + }) } func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) (*responsesStreamRuntime, string, bool) { diff --git a/internal/httpapi/openai/responses/handler.go b/internal/httpapi/openai/responses/handler.go index 445c6f5..da8e2e1 100644 --- a/internal/httpapi/openai/responses/handler.go +++ b/internal/httpapi/openai/responses/handler.go @@ -103,14 +103,6 @@ func emptyOutputRetryMaxAttempts() int { return shared.EmptyOutputRetryMaxAttempts() } -func clonePayloadForEmptyOutputRetry(payload map[string]any, parentMessageID int) map[string]any { - return shared.ClonePayloadForEmptyOutputRetry(payload, parentMessageID) -} - -func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string { - return shared.UsagePromptWithEmptyOutputRetry(originalPrompt, retryAttempts) -} - func filterIncrementalToolCallDeltasByAllowed(deltas []toolstream.ToolCallDelta, seenNames map[int]string) []toolstream.ToolCallDelta { return shared.FilterIncrementalToolCallDeltasByAllowed(deltas, seenNames) } diff --git a/internal/httpapi/openai/responses/responses_stream_test.go b/internal/httpapi/openai/responses/responses_stream_test.go index fa06bd5..dac0e54 100644 --- a/internal/httpapi/openai/responses/responses_stream_test.go +++ b/internal/httpapi/openai/responses/responses_stream_test.go @@ -397,7 +397,7 @@ func TestHandleResponsesNonStreamRequiredToolChoiceIgnoresThinkingToolPayloadWhe } } -func TestHandleResponsesNonStreamReturns429WhenUpstreamOutputEmpty(t *testing.T) { +func TestHandleResponsesNonStreamSingleAttemptReturns503WhenUpstreamOutputEmpty(t *testing.T) { h := &Handler{} rec := httptest.NewRecorder() resp := &http.Response{ @@ -409,17 +409,17 @@ func TestHandleResponsesNonStreamReturns429WhenUpstreamOutputEmpty(t *testing.T) } h.handleResponsesNonStream(rec, resp, "owner-a", "resp_test", "deepseek-v4-flash", "prompt", 0, false, false, nil, nil, promptcompat.DefaultToolChoicePolicy(), "") - if rec.Code != http.StatusTooManyRequests { - t.Fatalf("expected 429 for empty upstream output, got %d body=%s", rec.Code, rec.Body.String()) + if rec.Code != http.StatusServiceUnavailable { + t.Fatalf("expected 503 for empty upstream output, got %d body=%s", rec.Code, rec.Body.String()) } out := decodeJSONBody(t, rec.Body.String()) errObj, _ := out["error"].(map[string]any) - if asString(errObj["code"]) != "upstream_empty_output" { - t.Fatalf("expected code=upstream_empty_output, got %#v", out) + if asString(errObj["code"]) != "upstream_unavailable" { + t.Fatalf("expected code=upstream_unavailable, got %#v", out) } } -func TestHandleResponsesNonStreamReturnsContentFilterErrorWhenUpstreamFilteredWithoutOutput(t *testing.T) { +func TestHandleResponsesNonStreamSingleAttemptReturnsContentFilterErrorWhenUpstreamFilteredWithoutOutput(t *testing.T) { h := &Handler{} rec := httptest.NewRecorder() resp := &http.Response{ @@ -441,7 +441,7 @@ func TestHandleResponsesNonStreamReturnsContentFilterErrorWhenUpstreamFilteredWi } } -func TestHandleResponsesNonStreamReturns429WhenUpstreamHasOnlyThinking(t *testing.T) { +func TestHandleResponsesNonStreamSingleAttemptReturns429WhenUpstreamHasOnlyThinking(t *testing.T) { h := &Handler{} rec := httptest.NewRecorder() resp := &http.Response{ diff --git a/internal/httpapi/openai/shared/upstream_empty.go b/internal/httpapi/openai/shared/upstream_empty.go index d2e396c..3660f78 100644 --- a/internal/httpapi/openai/shared/upstream_empty.go +++ b/internal/httpapi/openai/shared/upstream_empty.go @@ -17,7 +17,7 @@ func UpstreamEmptyOutputDetail(contentFilter bool, text, thinking string) (int, if thinking != "" { return http.StatusTooManyRequests, "Upstream account hit a rate limit and returned reasoning without visible output.", "upstream_empty_output" } - return http.StatusTooManyRequests, "Upstream account hit a rate limit and returned empty output.", "upstream_empty_output" + return http.StatusServiceUnavailable, "Upstream service is unavailable and returned no output.", "upstream_unavailable" } func WriteUpstreamEmptyOutputError(w http.ResponseWriter, text, thinking string, contentFilter bool) bool { diff --git a/internal/httpapi/openai/stream_status_test.go b/internal/httpapi/openai/stream_status_test.go index 3c11d57..2e54f3d 100644 --- a/internal/httpapi/openai/stream_status_test.go +++ b/internal/httpapi/openai/stream_status_test.go @@ -274,12 +274,12 @@ func TestChatCompletionsStreamEmitsFailureFrameWhenUpstreamOutputEmpty(t *testin } last := frames[0] statusCode, ok := last["status_code"].(float64) - if !ok || int(statusCode) != http.StatusTooManyRequests { - t.Fatalf("expected status_code=429, got %#v body=%s", last["status_code"], rec.Body.String()) + if !ok || int(statusCode) != http.StatusServiceUnavailable { + t.Fatalf("expected status_code=503, got %#v body=%s", last["status_code"], rec.Body.String()) } errObj, _ := last["error"].(map[string]any) - if asString(errObj["code"]) != "upstream_empty_output" { - t.Fatalf("expected code=upstream_empty_output, got %#v", last) + if asString(errObj["code"]) != "upstream_unavailable" { + t.Fatalf("expected code=upstream_unavailable, got %#v", last) } } @@ -345,7 +345,7 @@ func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) { func TestChatCompletionsNonStreamRetriesThinkingOnlyOutput(t *testing.T) { ds := &streamStatusDSSeqStub{resps: []*http.Response{ - makeOpenAISSEHTTPResponse(`data: {"response_message_id":99}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"response_message_id":99,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), }} h := &openAITestSurface{ @@ -496,7 +496,7 @@ func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) { func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) { ds := &streamStatusDSSeqStub{resps: []*http.Response{ - makeOpenAISSEHTTPResponse(`data: {"response_message_id":88}`, "data: [DONE]"), + makeOpenAISSEHTTPResponse(`data: {"response_message_id":88,"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"), makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"), }} h := &openAITestSurface{ @@ -537,8 +537,15 @@ func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) { if len(content) == 0 { t.Fatalf("expected content entries, got %#v", item) } - textEntry, _ := content[0].(map[string]any) - if asString(textEntry["type"]) != "output_text" || asString(textEntry["text"]) != "visible" { + var textEntry map[string]any + for _, entry := range content { + obj, _ := entry.(map[string]any) + if asString(obj["type"]) == "output_text" { + textEntry = obj + break + } + } + if asString(textEntry["text"]) != "visible" { t.Fatalf("expected visible text entry, got %#v", content) } } diff --git a/internal/js/chat-stream/vercel_stream_impl.js b/internal/js/chat-stream/vercel_stream_impl.js index f28598e..9a9bb0b 100644 --- a/internal/js/chat-stream/vercel_stream_impl.js +++ b/internal/js/chat-stream/vercel_stream_impl.js @@ -641,9 +641,9 @@ function upstreamEmptyOutputDetail(contentFilter, _text, thinking) { }; } return { - status: 429, - message: 'Upstream account hit a rate limit and returned empty output.', - code: 'upstream_empty_output', + status: 503, + message: 'Upstream service is unavailable and returned no output.', + code: 'upstream_unavailable', }; } diff --git a/internal/js/helpers/stream-tool-sieve/parse_payload.js b/internal/js/helpers/stream-tool-sieve/parse_payload.js index 658db88..8c3714b 100644 --- a/internal/js/helpers/stream-tool-sieve/parse_payload.js +++ b/internal/js/helpers/stream-tool-sieve/parse_payload.js @@ -1,6 +1,6 @@ 'use strict'; -const CDATA_PATTERN = /^$/i; +const CDATA_PATTERN = /^|>)$/i; const XML_ATTR_PATTERN = /\b([a-z0-9_:-]+)\s*=\s*("([^"]*)"|'([^']*)')/gi; const TOOL_MARKUP_NAMES = [ { raw: 'tool_calls', canonical: 'tool_calls' }, @@ -102,9 +102,10 @@ function updateCDATAStateLine(inCDATA, line) { let state = inCDATA; while (pos < lower.length) { if (state) { - const end = lower.indexOf(']]>', pos); + const cdataEnd = findCDATAEnd(lower, pos); + const end = cdataEnd.index; if (end < 0) return true; - pos = end + ']]>'.length; + pos = end + cdataEnd.len; state = false; continue; } @@ -252,8 +253,9 @@ function replaceDSMLToolMarkupOutsideIgnored(text) { const tag = scanToolMarkupTagAt(raw, i); if (tag) { if (tag.dsmlLike) { - out += `<${tag.closing ? '/' : ''}${tag.name}${raw.slice(tag.nameEnd, tag.end + 1)}`; - if (raw[tag.end] !== '>') { + const tail = normalizeToolMarkupTagTailForXML(raw.slice(tag.nameEnd, tag.end + 1)); + out += `<${tag.closing ? '/' : ''}${tag.name}${tail}`; + if (!tail.endsWith('>')) { out += '>'; } } else { @@ -409,11 +411,12 @@ function findMatchingXmlEndTagOutsideCDATA(text, tag, from) { function skipXmlIgnoredSection(lower, i) { if (lower.startsWith('', i + ''.length }; + return { advanced: true, blocked: false, next: end + cdataEnd.len }; } if (lower.startsWith('', i + '") if end < 0 { @@ -227,15 +228,26 @@ func skipXMLIgnoredSection(text string, i int) (next int, advanced bool, blocked } func hasASCIIPrefixFoldAt(text string, start int, prefix string) bool { - if start < 0 || len(text)-start < len(prefix) { - return false + _, ok := matchASCIIPrefixFoldAt(text, start, prefix) + return ok +} + +func matchASCIIPrefixFoldAt(text string, start int, prefix string) (int, bool) { + if start < 0 || start >= len(text) && prefix != "" { + return 0, false } + idx := start for j := 0; j < len(prefix); j++ { - if asciiLower(text[start+j]) != asciiLower(prefix[j]) { - return false + if idx >= len(text) { + return 0, false } + ch, size := normalizedASCIIAt(text, idx) + if size <= 0 || asciiLower(ch) != asciiLower(prefix[j]) { + return 0, false + } + idx += size } - return true + return idx - start, true } func asciiLower(b byte) byte { @@ -266,15 +278,14 @@ func findToolCDATAEnd(text string, from int) int { if from < 0 || from >= len(text) { return -1 } - const closeMarker = "]]>" firstNonFenceEnd := -1 for searchFrom := from; searchFrom < len(text); { - rel := strings.Index(text[searchFrom:], closeMarker) - if rel < 0 { + end := indexToolCDATAClose(text, searchFrom) + if end < 0 { break } - end := searchFrom + rel - searchFrom = end + len(closeMarker) + closeLen := toolCDATACloseLenAt(text, end) + searchFrom = end + closeLen if cdataOffsetIsInsideMarkdownFence(text[from:end]) { continue } @@ -288,6 +299,31 @@ func findToolCDATAEnd(text string, from int) int { return firstNonFenceEnd } +func indexToolCDATAClose(text string, from int) int { + if from < 0 { + from = 0 + } + asciiIdx := strings.Index(text[from:], "]]>") + fullIdx := strings.Index(text[from:], "]]>") + if asciiIdx < 0 && fullIdx < 0 { + return -1 + } + if asciiIdx < 0 { + return from + fullIdx + } + if fullIdx < 0 || asciiIdx < fullIdx { + return from + asciiIdx + } + return from + fullIdx +} + +func toolCDATACloseLenAt(text string, idx int) int { + if strings.HasPrefix(text[idx:], "]]>") { + return len("]]>") + } + return len("]]>") +} + func cdataEndLooksStructural(text string, after int) bool { for after < len(text) { switch { @@ -327,22 +363,29 @@ func cdataOffsetIsInsideMarkdownFence(fragment string) bool { } func findXMLTagEnd(text string, from int) int { - quote := byte(0) - for i := maxInt(from, 0); i < len(text); i++ { - ch := text[i] + quote := rune(0) + for i := maxInt(from, 0); i < len(text); { + r, size := utf8.DecodeRuneInString(text[i:]) + if r == utf8.RuneError && size == 0 { + break + } + ch := normalizeFullwidthASCII(r) if quote != 0 { if ch == quote { quote = 0 } + i += size continue } if ch == '"' || ch == '\'' { quote = ch + i += size continue } if ch == '>' { - return i + return i + size - 1 } + i += size } return -1 } @@ -355,7 +398,8 @@ func hasXMLTagBoundary(text string, idx int) bool { case ' ', '\t', '\n', '\r', '>', '/': return true default: - return false + r, _ := utf8.DecodeRuneInString(text[idx:]) + return normalizeFullwidthASCII(r) == '>' } } diff --git a/internal/toolcall/toolcalls_scan.go b/internal/toolcall/toolcalls_scan.go index a28a188..8accb1a 100644 --- a/internal/toolcall/toolcalls_scan.go +++ b/internal/toolcall/toolcalls_scan.go @@ -1,6 +1,9 @@ package toolcall -import "strings" +import ( + "strings" + "unicode/utf8" +) type toolMarkupNameAlias struct { raw string @@ -184,7 +187,7 @@ func scanToolMarkupTagAt(text string, start int) (ToolMarkupTag, bool) { } func IsPartialToolMarkupTagPrefix(text string) bool { - if text == "" || text[0] != '<' || strings.Contains(text, ">") { + if text == "" || text[0] != '<' || strings.Contains(text, ">") || strings.Contains(text, ">") { return false } i := 1 @@ -236,9 +239,10 @@ func consumeToolMarkupNamePrefixOnce(text string, idx int) (int, bool) { return idx + 1, true } if hasASCIIPrefixFoldAt(text, idx, "dsml") { - next := idx + len("dsml") - if next < len(text) && (text[next] == '-' || text[next] == '_') { - next++ + dsmlLen, _ := matchASCIIPrefixFoldAt(text, idx, "dsml") + next := idx + dsmlLen + if sep, size := normalizedASCIIAt(text, next); sep == '-' || sep == '_' { + next += size } return next, true } @@ -249,12 +253,17 @@ func consumeToolMarkupNamePrefixOnce(text string, idx int) (int, bool) { } func consumeArbitraryToolMarkupNamePrefix(text string, idx int) (int, bool) { - if idx < 0 || idx >= len(text) || !isToolMarkupPrefixSegmentByte(text[idx]) { + nextSegment, ok := consumeToolMarkupPrefixSegment(text, idx) + if !ok { return idx, false } - j := idx + 1 - for j < len(text) && isToolMarkupPrefixSegmentByte(text[j]) { - j++ + j := nextSegment + for { + nextSegment, ok = consumeToolMarkupPrefixSegment(text, j) + if !ok { + break + } + j = nextSegment } k := j for k < len(text) && (text[k] == ' ' || text[k] == '\t' || text[k] == '\r' || text[k] == '\n') { @@ -262,8 +271,8 @@ func consumeArbitraryToolMarkupNamePrefix(text string, idx int) (int, bool) { } next, ok := consumeToolMarkupPipe(text, k) if !ok { - if k < len(text) && (text[k] == '_' || text[k] == '-') { - next = k + 1 + if sep, size := normalizedASCIIAt(text, k); sep == '_' || sep == '-' { + next = k + size ok = true } } @@ -279,21 +288,32 @@ func consumeArbitraryToolMarkupNamePrefix(text string, idx int) (int, bool) { return next, true } -func isToolMarkupPrefixSegmentByte(b byte) bool { - return (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') || (b >= '0' && b <= '9') +func consumeToolMarkupPrefixSegment(text string, idx int) (int, bool) { + ch, size := normalizedASCIIAt(text, idx) + if size <= 0 { + return idx, false + } + if (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') || (ch >= '0' && ch <= '9') { + return idx + size, true + } + return idx, false } func hasASCIIPartialPrefixFoldAt(text string, start int, prefix string) bool { - remain := len(text) - start - if remain <= 0 || remain > len(prefix) { + if start < 0 || start >= len(text) { return false } - for j := 0; j < remain; j++ { - if asciiLower(text[start+j]) != asciiLower(prefix[j]) { + idx := start + matched := 0 + for matched < len(prefix) && idx < len(text) { + ch, size := normalizedASCIIAt(text, idx) + if size <= 0 || asciiLower(ch) != asciiLower(prefix[matched]) { return false } + idx += size + matched++ } - return true + return matched > 0 && matched < len(prefix) && idx == len(text) } func hasToolMarkupNamePrefix(text string, start int) bool { @@ -313,8 +333,8 @@ func matchToolMarkupName(text string, start int, dsmlLike bool) (string, int) { if name.dsmlOnly && !dsmlLike { continue } - if hasASCIIPrefixFoldAt(text, start, name.raw) { - return name.canonical, len(name.raw) + if nameLen, ok := matchASCIIPrefixFoldAt(text, start, name.raw); ok { + return name.canonical, nameLen } } return "", 0 @@ -341,6 +361,29 @@ func hasToolMarkupBoundary(text string, idx int) bool { case ' ', '\t', '\n', '\r', '>', '/': return true default: - return false + r, _ := utf8.DecodeRuneInString(text[idx:]) + return normalizeFullwidthASCII(r) == '>' } } + +func normalizedASCIIAt(text string, idx int) (byte, int) { + if idx < 0 || idx >= len(text) { + return 0, 0 + } + r, size := utf8.DecodeRuneInString(text[idx:]) + if r == utf8.RuneError && size == 0 { + return 0, 0 + } + normalized := normalizeFullwidthASCII(r) + if normalized > 0x7f { + return 0, 0 + } + return byte(normalized), size +} + +func normalizeFullwidthASCII(r rune) rune { + if r >= '!' && r <= '~' { + return r - 0xFEE0 + } + return r +} diff --git a/internal/toolcall/toolcalls_test.go b/internal/toolcall/toolcalls_test.go index e19c318..42129c8 100644 --- a/internal/toolcall/toolcalls_test.go +++ b/internal/toolcall/toolcalls_test.go @@ -111,6 +111,27 @@ func TestParseToolCallsSupportsArbitraryPrefixedToolMarkup(t *testing.T) { } } +func TestParseToolCallsSupportsFullwidthDSMLShell(t *testing.T) { + text := `<dSML|tool_calls> + <dSML|invoke name="Read"> + <dSML|parameter name="file_path"> + + <dSML|invoke name="Read"> + <dSML|parameter name="file_path"> + +` + calls := ParseToolCalls(text, []string{"Read"}) + if len(calls) != 2 { + t.Fatalf("expected two fullwidth DSML calls, got %#v", calls) + } + if calls[0].Name != "Read" || calls[0].Input["file_path"] != "/Users/aq/Desktop/myproject/Personal_Blog/README.md" { + t.Fatalf("unexpected first fullwidth DSML call: %#v", calls[0]) + } + if calls[1].Name != "Read" || calls[1].Input["file_path"] != "/Users/aq/Desktop/myproject/Personal_Blog/index.html" { + t.Fatalf("unexpected second fullwidth DSML call: %#v", calls[1]) + } +} + func TestParseToolCallsIgnoresBareHyphenatedToolCallsLookalike(t *testing.T) { text := `pwd` calls := ParseToolCalls(text, []string{"Bash"}) diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js index cf49fa1..1146113 100644 --- a/tests/node/chat-stream.test.js +++ b/tests/node/chat-stream.test.js @@ -187,9 +187,9 @@ test('vercel stream emits Go-parity empty-output failure on DONE', async () => { const { frames } = await runMockVercelStream(['data: [DONE]\n\n']); assert.equal(frames.length, 2); const failed = JSON.parse(frames[0]); - assert.equal(failed.status_code, 429); - assert.equal(failed.error.type, 'rate_limit_error'); - assert.equal(failed.error.code, 'upstream_empty_output'); + assert.equal(failed.status_code, 503); + assert.equal(failed.error.type, 'service_unavailable_error'); + assert.equal(failed.error.code, 'upstream_unavailable'); assert.equal(frames[1], '[DONE]'); }); @@ -209,6 +209,21 @@ test('vercel stream retries empty output once and keeps one terminal frame', asy assert.match(completionBodies[1].prompt, /Previous reply had no visible output\. Please regenerate the visible final answer or tool call now\.$/); }); +test('vercel stream retries thinking-only output once', async () => { + const { frames, fetchURLs, fetchBodies } = await runMockVercelStreamSequence([ + ['data: {"response_message_id":42,"p":"response/thinking_content","v":"plan"}\n\n', 'data: [DONE]\n\n'], + ['data: {"p":"response/content","v":"visible"}\n\n', 'data: [DONE]\n\n'], + ], { thinking_enabled: true }); + const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame)); + const completionBodies = fetchBodies.filter((body) => Object.hasOwn(body, 'prompt')); + assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/completion').length, 2); + assert.equal(frames.filter((frame) => frame === '[DONE]').length, 1); + assert.equal(completionBodies[1].parent_message_id, 42); + assert.equal(parsed[0].choices[0].delta.reasoning_content, 'plan'); + assert.equal(parsed[1].choices[0].delta.content, 'visible'); + assert.equal(parsed[2].choices[0].finish_reason, 'stop'); +}); + test('vercel stream coalesces many small content deltas while keeping one choice', async () => { const lines = Array.from({ length: 100 }, () => `data: ${JSON.stringify({ p: 'response/content', v: '字' })}\n\n`); lines.push('data: [DONE]\n\n'); diff --git a/tests/node/stream-tool-sieve.test.js b/tests/node/stream-tool-sieve.test.js index ccc5e5d..ccfff1c 100644 --- a/tests/node/stream-tool-sieve.test.js +++ b/tests/node/stream-tool-sieve.test.js @@ -112,6 +112,23 @@ test('parseToolCalls parses arbitrary-prefixed tool markup shells', () => { } }); +test('parseToolCalls parses fullwidth DSML shell drift', () => { + const payload = `<dSML|tool_calls> + <dSML|invoke name="Read"> + <dSML|parameter name="file_path"> + + <dSML|invoke name="Read"> + <dSML|parameter name="file_path"> + +`; + const calls = parseToolCalls(payload, ['Read']); + assert.equal(calls.length, 2); + assert.equal(calls[0].name, 'Read'); + assert.deepEqual(calls[0].input, { file_path: '/Users/aq/Desktop/myproject/Personal_Blog/README.md' }); + assert.equal(calls[1].name, 'Read'); + assert.deepEqual(calls[1].input, { file_path: '/Users/aq/Desktop/myproject/Personal_Blog/index.html' }); +}); + test('parseToolCalls ignores bare hyphenated tool_calls lookalike', () => { const payload = 'pwd'; const calls = parseToolCalls(payload, ['Bash']);