diff --git a/API.en.md b/API.en.md index 6e93202..e892df0 100644 --- a/API.en.md +++ b/API.en.md @@ -42,6 +42,7 @@ Docs: [Overview](README.en.md) / [Architecture](docs/ARCHITECTURE.en.md) / [Depl - Adapter responsibilities are streamlined to: **request normalization → DeepSeek invocation → protocol-shaped rendering**, reducing legacy split-logic paths. - Tool-calling semantics are aligned between Go and Node runtime: models should output the DSML shell `<|DSML|tool_calls>` → `<|DSML|invoke name="...">` → `<|DSML|parameter name="...">`; DS2API also accepts legacy canonical XML `` → `` → ``. DSML is normalized back to XML at the parser entry, so internal parsing remains XML-based, with stream-time anti-leak filtering. - `Admin API` separates static config from runtime policy: `/admin/config*` for configuration state, `/admin/settings*` for runtime behavior. +- When upstream returns a thinking-only response with no visible text, the Go main path for both streaming and non-streaming completions retries once in the same DeepSeek session: it appends the prompt suffix `"Previous reply had no visible output. Please regenerate the visible final answer or tool call now."` and sets `parent_message_id`. If that same-account retry would still end as `429 upstream_empty_output`, managed-account mode switches to the next available account, creates a fresh session, and retries the original payload once before returning 429. --- @@ -84,7 +85,7 @@ Two header formats accepted: - Token is in `config.keys` → **Managed account mode**: DS2API auto-selects an account via rotation - Token is not in `config.keys` → **Direct token mode**: treated as a DeepSeek token directly -**Optional header**: `X-Ds2-Target-Account: ` — Pin a specific managed account; if the target account does not exist or the managed-account queue is exhausted, the request returns `429`, and current responses do not include `Retry-After`. 
If the account exists but login/refresh fails, the request returns the underlying `401` or upstream error. +**Optional header**: `X-Ds2-Target-Account: ` — Pin a specific managed account; if the target account does not exist or the managed-account queue is exhausted, the request returns `429`, and current responses do not include `Retry-After`. If the account exists but login/refresh fails, the request returns the underlying `401` or upstream error. Without a pinned target, managed-account completion requests try one alternate-account fresh retry before returning an empty-output 429; pinned-target requests and requests with no other available account do not switch. Gemini-compatible clients can also send `x-goog-api-key`, `?key=`, or `?api_key=` as the caller credential source. ### Admin Endpoints (`/admin/*`) @@ -1258,7 +1259,7 @@ Clients should handle HTTP status code plus `error` / `detail` fields. | Code | Meaning | | --- | --- | | `401` | Authentication failed (invalid key/token, or expired admin JWT) | -| `429` | Too many requests (exceeded inflight + queue capacity; current responses do not include `Retry-After`) | +| `429` | Too many requests (exceeded inflight + queue capacity, or upstream thinking-only output with no visible answer; managed-account mode first tries one alternate-account fresh retry; current responses do not include `Retry-After`) | | `503` | Model unavailable or upstream error | --- diff --git a/API.md b/API.md index 86772f7..e50c0f1 100644 --- a/API.md +++ b/API.md @@ -42,7 +42,7 @@ - 适配器层职责收敛为:**请求归一化 → DeepSeek 调用 → 协议形态渲染**,减少历史版本中“同能力多处实现”的分叉。 - Tool Calling 的解析策略在 Go 与 Node Runtime 间保持一致:推荐模型输出 DSML 外壳 `<|DSML|tool_calls>` → `<|DSML|invoke name="...">` → `<|DSML|parameter name="...">`;兼容层也接受 DSML wrapper 别名 ``、`<|tool_calls>`、`<|tool_calls>`、常见 DSML 分隔符漏写形态(如 `<|DSML tool_calls>`)、`DSML` 与工具标签名黏连的常见 typo(如 ``)、控制分隔符漂移(如 `` / 原始 STX `\x02`)、任意协议前缀壳(如 ``),以及旧式 canonical XML `` → `` → ``。实现上采用结构扫描:只要固定本地标签名是 `tool_calls` / `invoke` / 
`parameter`,前缀壳会在解析入口归一化;只有 `tool_calls` wrapper 或可修复的缺失 opening wrapper 会进入工具路径,裸 `` 不计为已支持语法;流式场景继续执行防泄漏筛分。若参数体本身是合法 JSON 字面量(如 `123`、`true`、`null`、数组或对象),会按结构化值输出,不再一律当作字符串;若 CDATA 偶发漏闭合,则会在最终 parse / flush 恢复阶段做窄修复,尽量保住已完整包裹的外层工具调用。 - `Admin API` 将配置与运行时策略分开:`/admin/config*` 管静态配置,`/admin/settings*` 管运行时行为。 -- 当上游返回 thinking-only 响应(模型输出了推理链但无可见文本)时,非流式补全会自动重试一次:以多轮对话 follow-up 方式追加 prompt 后缀 `"Previous reply had no visible output. Please regenerate the visible final answer or tool call now."` 并设置 `parent_message_id` 在同一 DeepSeek session 内让模型重新输出;重试最大 1 次。 +- 当上游返回 thinking-only 响应(模型输出了推理链但无可见文本)时,Go 主路径的流式与非流式补全都会先自动重试一次:以多轮对话 follow-up 方式追加 prompt 后缀 `"Previous reply had no visible output. Please regenerate the visible final answer or tool call now."` 并设置 `parent_message_id` 在同一 DeepSeek session 内让模型重新输出;同账号重试最大 1 次。若同账号重试后仍即将返回 `429 upstream_empty_output`,托管账号模式会在返回 429 前自动切换到下一个可用账号,新建 session,用原始 payload 再 fresh retry 一次。 - 引用标记处理边界:流式输出默认隐藏 `[citation:N]` / `[reference:N]` 这类上游内部占位符;非流式输出默认把 DeepSeek 搜索引用标记转换为 Markdown 引用链接。 --- @@ -86,7 +86,7 @@ Vercel 一键部署可先只填 `DS2API_ADMIN_KEY`,部署后在 `/admin` 导 - token 在 `config.keys` 中 → **托管账号模式**,自动轮询选择账号 - token 不在 `config.keys` 中 → **直通 token 模式**,直接作为 DeepSeek token 使用 -**可选请求头**:`X-Ds2-Target-Account: ` — 指定使用某个托管账号;如果目标账号不存在,或管理账号队列已耗尽,相关业务请求会返回 `429`,当前不会附带 `Retry-After` 头。若账号存在但登录/刷新失败,则返回对应的 `401` 或上游错误。 +**可选请求头**:`X-Ds2-Target-Account: ` — 指定使用某个托管账号;如果目标账号不存在,或管理账号队列已耗尽,相关业务请求会返回 `429`,当前不会附带 `Retry-After` 头。若账号存在但登录/刷新失败,则返回对应的 `401` 或上游错误。未指定目标账号时,托管账号模式的 completion 空输出 429 会先尝试切到另一个可用账号 fresh retry 一次;指定目标账号或无其他可用账号时不会切号。 Gemini 兼容客户端还可以使用 `x-goog-api-key`、`?key=` 或 `?api_key=` 作为凭据来源。 ### Admin 接口(`/admin/*`) @@ -1271,7 +1271,7 @@ Gemini 路由使用 Google 风格错误结构: | 状态码 | 说明 | | --- | --- | | `401` | 鉴权失败(key/token 无效,或 Admin JWT 过期) | -| `429` | 请求过多(超出并发上限 + 等待队列;当前不附带 `Retry-After` 头) | +| `429` | 请求过多(超出并发上限 + 等待队列,或上游账号 thinking-only 后仍无可见输出;托管账号模式会先尝试一次切号 fresh retry;当前不附带 `Retry-After` 头) | | `503` | 
模型不可用或上游服务异常 | --- diff --git a/README.MD b/README.MD index 3edf3b8..eac8da2 100644 --- a/README.MD +++ b/README.MD @@ -350,6 +350,7 @@ go run ./cmd/ds2api 可选请求头 `X-Ds2-Target-Account`:指定使用某个托管账号(值为 email 或 mobile)。 如果指定账号不存在,或者当前管理账号队列已满,请求会返回 `429`;当前 `429` 不附带 `Retry-After` 头。若账号存在但登录/刷新失败,则返回对应的鉴权错误。 +未指定目标账号时,如果 completion 因上游 thinking-only 空输出在同账号补偿重试后仍将返回 `429 upstream_empty_output`,托管账号模式会自动切到下一个可用账号,新建 session,并用原始 payload 再 fresh retry 一次。 Gemini 路由还可以使用 `x-goog-api-key`,或在没有认证头时使用 `?key=` / `?api_key=` 作为调用方凭据。 ## 并发模型 @@ -363,6 +364,7 @@ Gemini 路由还可以使用 `x-goog-api-key`,或在没有认证头时使用 ` - 当 in-flight 槽位满时,请求进入等待队列,**不会立即 429** - 超出总承载上限后才返回 `429 Too Many Requests`,当前响应不附带 `Retry-After` +- completion 空输出类 429 会先做同账号补偿重试;托管账号模式还会在最终返回 429 前切到另一个可用账号 fresh retry 一次 - `GET /admin/queue/status` 返回实时并发状态 ## Tool Call 适配 diff --git a/README.en.md b/README.en.md index 62503b6..470ccc1 100644 --- a/README.en.md +++ b/README.en.md @@ -336,6 +336,7 @@ For business endpoints (`/v1/*`, `/anthropic/*`, Gemini routes), DS2API supports | **Direct token** | If the token is not in `config.keys`, DS2API treats it as a DeepSeek token directly | Optional header `X-Ds2-Target-Account`: Pin a specific managed account (value is email or mobile). +When no target account is pinned, if a completion would end as `429 upstream_empty_output` after the same-account empty-output retry, managed-account mode switches to the next available account, creates a fresh session, and retries the original payload once. Gemini routes also accept `x-goog-api-key`, or `?key=` / `?api_key=` when no auth header is present. 
## Concurrency Model @@ -349,6 +350,7 @@ Queue limit = DS2API_ACCOUNT_MAX_QUEUE (default = recommended concurrency) - When inflight slots are full, requests enter a waiting queue — **no immediate 429** - 429 is returned only when total load exceeds inflight + queue capacity +- Completion empty-output 429s first get the same-account compensation retry; managed-account mode also tries one alternate-account fresh retry before returning the final 429 - `GET /admin/queue/status` returns real-time concurrency state ## Tool Call Adaptation diff --git a/docs/ARCHITECTURE.en.md b/docs/ARCHITECTURE.en.md index aa93142..8a97f95 100644 --- a/docs/ARCHITECTURE.en.md +++ b/docs/ARCHITECTURE.en.md @@ -27,7 +27,7 @@ ds2api/ │ ├── claudeconv/ # Claude message conversion helpers │ ├── compat/ # Compatibility and regression helpers │ ├── assistantturn/ # Upstream output to canonical assistant turn / stream event semantics -│ ├── completionruntime/ # Shared Go DeepSeek completion startup, non-stream collection, and retry +│ ├── completionruntime/ # Shared Go DeepSeek completion startup, collection, empty-output/account-switch retry │ ├── config/ # Config loading/validation/hot reload │ ├── deepseek/ # DeepSeek upstream client/protocol/transport │ │ ├── client/ # Login/session/completion/upload/delete calls @@ -191,7 +191,7 @@ flowchart LR - `internal/httpapi/requestbody`: shared HTTP body reading, JSON pre-validation, and UTF-8 error helpers across protocol adapters. - `internal/promptcompat`: compatibility core for turning OpenAI/Claude/Gemini requests into DeepSeek web-chat plain-text context. - `internal/assistantturn`: Go output-side canonical semantics, converting DeepSeek SSE collection results and stream finalization state into assistant turns and centralizing thinking, tool call, citation, usage, stop/error behavior. 
-- `internal/completionruntime`: shared Go completion execution helpers for DeepSeek session/PoW/call startup, non-stream collection, and empty-output retry; streaming paths use it to start upstream requests, continue to use `internal/stream` for real-time consumption, and use `assistantturn` during finalization. +- `internal/completionruntime`: shared Go completion execution helpers for DeepSeek session/PoW/call startup, non-stream collection, empty-output retry, and one managed-account fresh retry before a final 429; streaming paths use it to start upstream requests, continue to use `internal/stream` for real-time consumption, and use `assistantturn` during finalization. - `internal/translatorcliproxy`: bridge compatibility layer for Claude/Gemini and OpenAI shape translation; it is not the main business protocol conversion center. - `internal/deepseek/{client,protocol,transport}`: upstream requests, sessions, PoW adaptation, protocol constants, and transport details. - `internal/js/chat-stream` + `api/chat-stream.js`: Vercel Node streaming bridge; Go prepare/release owns auth, account lease, and completion payload assembly, while Node relays real-time SSE with Go-aligned finalization and tool sieve semantics. 
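Reviewer note: the attempt order described by the documentation hunks above can be sketched as below. This is a minimal illustration and not code from this change. The names `attempt` and `planAttempts` are hypothetical, and the sketch assumes the default of one same-account retry and that every attempt comes back thinking-only with no visible text.

```go
package main

import "fmt"

// attempt describes one upstream completion call in the plan.
// Hypothetical type, used only for this illustration.
type attempt struct {
	Account      string
	FreshSession bool // a new chat session is created for this call
	RetrySuffix  bool // the empty-output prompt suffix is appended
}

// planAttempts lists the calls made when each attempt is thinking-only.
// accounts[0] is the account selected first; managed and pinned mirror
// managed-account mode and the X-Ds2-Target-Account header.
func planAttempts(accounts []string, managed, pinned bool) []attempt {
	if len(accounts) == 0 {
		return nil
	}
	plan := []attempt{
		// Initial call on the selected account.
		{Account: accounts[0], FreshSession: true},
		// Same-session retry: suffix appended, parent_message_id set.
		{Account: accounts[0], RetrySuffix: true},
	}
	// One fresh retry on another account, only for unpinned managed
	// requests that have an alternate account available.
	if managed && !pinned && len(accounts) > 1 {
		plan = append(plan, attempt{Account: accounts[1], FreshSession: true})
	}
	return plan
}

func main() {
	for i, at := range planAttempts([]string{"acc1", "acc2"}, true, false) {
		fmt.Printf("%d: %+v\n", i+1, at)
	}
}
```

The third entry carries neither the retry suffix nor a parent message reference, which matches the stated behavior that the switched-account call reuses the original payload in a new session. Direct token mode and pinned requests stop after the second entry.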
diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index a4da59e..1b889bf 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -27,7 +27,7 @@ ds2api/ │ ├── claudeconv/ # Claude 消息格式转换工具 │ ├── compat/ # 兼容性辅助与回归支持 │ ├── assistantturn/ # 上游输出到统一 assistant turn / stream event 的语义层 -│ ├── completionruntime/ # Go 主路径共享 DeepSeek completion 启动、非流式收集与 retry +│ ├── completionruntime/ # Go 主路径共享 DeepSeek completion 启动、收集、空输出/切号 retry │ ├── config/ # 配置加载、校验、热更新 │ ├── deepseek/ # DeepSeek 上游 client/protocol/transport │ │ ├── client/ # 登录、会话、completion、上传/删除等上游调用 @@ -191,7 +191,7 @@ flowchart LR - `internal/httpapi/requestbody`:跨协议复用的请求体读取、JSON 解码前置校验与 UTF-8 错误处理辅助。 - `internal/promptcompat`:OpenAI/Claude/Gemini 请求到 DeepSeek 网页纯文本上下文的兼容内核。 - `internal/assistantturn`:Go 输出侧统一语义层,把 DeepSeek SSE 收集结果和流式收尾状态归一成 assistant turn,集中处理 thinking、tool call、citation、usage、stop/error 语义。 -- `internal/completionruntime`:Go surface 共享的 completion 执行辅助,负责 DeepSeek session/PoW/call 启动、非流式 collect 和 empty-output retry;流式路径复用它启动上游请求,继续用 `internal/stream` 做实时消费,并在最终收尾阶段接入 `assistantturn`。 +- `internal/completionruntime`:Go surface 共享的 completion 执行辅助,负责 DeepSeek session/PoW/call 启动、非流式 collect、empty-output retry,以及托管账号在最终 429 前的一次切号 fresh retry;流式路径复用它启动上游请求,继续用 `internal/stream` 做实时消费,并在最终收尾阶段接入 `assistantturn`。 - `internal/translatorcliproxy`:Claude/Gemini 与 OpenAI 结构互转的桥接兼容层,不作为主业务协议转换中心。 - `internal/deepseek/{client,protocol,transport}`:上游请求、会话、PoW 适配、协议常量与传输层。 - `internal/js/chat-stream` + `api/chat-stream.js`:Vercel Node 流式桥;Go prepare/release 管理鉴权、账号租约和 completion payload,Node 侧负责实时 SSE 转发并保持 Go 对齐的终结态和 tool sieve 语义。 diff --git a/docs/prompt-compatibility.md b/docs/prompt-compatibility.md index 3dfaa45..2a5dc2f 100644 --- a/docs/prompt-compatibility.md +++ b/docs/prompt-compatibility.md @@ -112,7 +112,7 @@ DS2API 当前的核心思路,不是把客户端传来的 `messages`、`tools` - Vercel Node 流式路径本轮不迁移,仍使用现有 Node bridge / stream-tool-sieve 实现;后续若变更 Node 流式语义,需要按 `assistantturn` 的 Go canonical 
输出语义同步对齐。 - 客户端传入的 thinking / reasoning 开关会被归一到下游 `thinking_enabled`。Gemini `generationConfig.thinkingConfig.thinkingBudget` 会翻译成同一套 thinking 开关;关闭时即使上游返回 `response/thinking_content`,兼容层也不会把它当作可见正文输出。若最终解析出的模型名带 `-nothinking` 后缀,则会无条件强制关闭 thinking,优先级高于请求体中的 `thinking` / `reasoning` / `reasoning_effort`。未显式关闭时,各 surface 会按解析后的 DeepSeek 模型默认能力开启 thinking,并用各自协议的原生形态暴露:OpenAI Chat 为 `reasoning_content`,OpenAI Responses 为 `response.reasoning.delta` / `reasoning` content,Claude 为 `thinking` block / `thinking_delta`,Gemini 为 `thought: true` part。 - 对 OpenAI Chat / Responses 的非流式收尾,如果最终可见正文为空,兼容层会优先尝试把思维链中的独立 DSML / XML 工具块当作真实工具调用解析出来。流式链路也会在收尾阶段做同样的 fallback 检测,但不会因为思维链内容去中途拦截或改写流式输出;真正的工具识别始终基于原始上游文本,而不是基于“已经做过可见输出清洗”的版本,因此即使最终可见层会剥离完整 leaked DSML / XML `tool_calls` wrapper、并抑制全空参数或无效 wrapper 块,也不会影响真实工具调用转成结构化 `tool_calls` / `function_call`。补发结果会作为本轮 assistant 的结构化 `tool_calls` / `function_call` 输出返回,而不是塞进 `content` 文本;如果客户端没有开启 thinking / reasoning,思维链只用于检测,不会作为 `reasoning_content` 或可见正文暴露。只有正文为空且思维链里也没有可执行工具调用时,才继续按空回复错误处理。 -- OpenAI Chat / Responses、Claude Messages、Gemini generateContent 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token 与工具策略,把原始 completion `prompt` 追加固定后缀 `Previous reply had no visible output. 
Please regenerate the visible final answer or tool call now.` 后重新提交一次。Go 主路径的非流式重试由 `completionruntime.ExecuteNonStreamWithRetry` 统一处理;流式重试由 `completionruntime.ExecuteStreamWithRetry` 统一处理,各协议 runtime 只负责消费/渲染本协议 SSE framing。重试遵循 DeepSeek 多轮对话协议:从第一次上游 SSE 流中提取 `response_message_id`,并在重试 payload 中设置 `parent_message_id` 为该值,使重试成为同一会话的后续轮次而非断裂的根消息;同时重新获取一次 PoW(若 PoW 获取失败则回退到原始 PoW)。该重试不会重新标准化消息、不会新建 session、不会切换账号,也不会向流式客户端插入重试标记;第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若第二次仍没有任何输出,终端错误为 503 `upstream_unavailable`;若有 reasoning 但没有可见正文或工具调用,仍返回 429 `upstream_empty_output`;若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。JS Vercel 运行时同样设置 `parent_message_id`,但因无法直接调用 PoW API 而复用原始 PoW。 +- OpenAI Chat / Responses、Claude Messages、Gemini generateContent 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token 与工具策略,把原始 completion `prompt` 追加固定后缀 `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` 后重新提交一次。Go 主路径的非流式重试由 `completionruntime.ExecuteNonStreamWithRetry` 统一处理;流式重试由 `completionruntime.ExecuteStreamWithRetry` 统一处理,各协议 runtime 只负责消费/渲染本协议 SSE framing。重试遵循 DeepSeek 多轮对话协议:从第一次上游 SSE 流中提取 `response_message_id`,并在重试 payload 中设置 `parent_message_id` 为该值,使重试成为同一会话的后续轮次而非断裂的根消息;同时重新获取一次 PoW(若 PoW 获取失败则回退到原始 PoW)。该同账号重试不会重新标准化消息、不会新建 session,也不会向流式客户端插入重试标记;第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若同账号补偿重试后即将返回 429 `upstream_empty_output`,并且当前是托管账号模式,Go 主路径会在返回 429 前切换到下一个可用账号,新建 `chat_session_id`,使用原始 completion payload 再做一次 fresh retry;该切号重试不携带空回复 prompt 后缀,也不设置上一账号的 `parent_message_id`。如果没有可切换账号,或切号后的 fresh retry 仍没有可见正文或工具调用,则继续按原错误返回:无任何输出为 503 `upstream_unavailable`,有 reasoning 但没有可见正文或工具调用为 429 `upstream_empty_output`。若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。JS Vercel 运行时同样设置 `parent_message_id`,但因无法直接调用 PoW API 而复用原始 PoW;切号 fresh retry 目前由 Go 主路径提供。 - 非流式 OpenAI Chat / 
Responses、Claude Messages、Gemini generateContent 在最终可见正文渲染阶段,会把 DeepSeek 搜索返回中的 `[citation:N]` / `[reference:N]` 标记替换成对应 Markdown 链接。`citation` 标记按一基序号解析;`reference` 标记只有在同一段正文中出现 `[reference:0]`(允许冒号后有空格)时才按零基序号映射,并且不会影响同段正文里的 `citation` 标记。 - 流式输出仍默认隐藏 `[citation:N]` / `[reference:N]` 这类上游内部标记,避免分片输出中泄漏尚未完成映射的引用占位符。 diff --git a/internal/auth/auth_edge_test.go b/internal/auth/auth_edge_test.go index 0dad649..73e970d 100644 --- a/internal/auth/auth_edge_test.go +++ b/internal/auth/auth_edge_test.go @@ -241,6 +241,36 @@ func TestSwitchAccountSkipsLoginFailureAndContinues(t *testing.T) { } } +func TestSwitchAccountRespectsPinnedTargetAccount(t *testing.T) { + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["managed-key"], + "accounts":[ + {"email":"acc1@test.com","token":"t1"}, + {"email":"acc2@test.com","token":"t2"} + ] + }`) + store := config.LoadStore() + pool := account.NewPool(store) + r := NewResolver(store, pool, func(_ context.Context, _ config.Account) (string, error) { + return "new-token", nil + }) + + req, _ := http.NewRequest("POST", "/", nil) + req.Header.Set("Authorization", "Bearer managed-key") + req.Header.Set("X-Ds2-Target-Account", "acc1@test.com") + a, err := r.Determine(req) + if err != nil { + t.Fatalf("determine failed: %v", err) + } + defer r.Release(a) + if r.SwitchAccount(context.Background(), a) { + t.Fatal("expected switch to be disabled for pinned target account") + } + if a.AccountID != "acc1@test.com" { + t.Fatalf("expected pinned account to remain selected, got %q", a.AccountID) + } +} + // ─── Release edge cases ───────────────────────────────────────────── func TestReleaseNilAuth(t *testing.T) { diff --git a/internal/auth/request.go b/internal/auth/request.go index e6a0d88..fd84a12 100644 --- a/internal/auth/request.go +++ b/internal/auth/request.go @@ -28,6 +28,7 @@ type RequestAuth struct { DeepSeekToken string CallerID string AccountID string + TargetAccount string Account config.Account TriedAccounts map[string]bool resolver 
*Resolver @@ -99,6 +100,7 @@ func (r *Resolver) acquireManagedRequestAuth(ctx context.Context, callerID, targ UseConfigToken: true, CallerID: callerID, AccountID: acc.Identifier(), + TargetAccount: target, Account: acc, TriedAccounts: tried, resolver: r, @@ -185,6 +187,9 @@ func (r *Resolver) SwitchAccount(ctx context.Context, a *RequestAuth) bool { if !a.UseConfigToken { return false } + if strings.TrimSpace(a.TargetAccount) != "" { + return false + } if a.TriedAccounts == nil { a.TriedAccounts = map[string]bool{} } @@ -208,6 +213,13 @@ func (r *Resolver) SwitchAccount(ctx context.Context, a *RequestAuth) bool { } } +func (a *RequestAuth) SwitchAccount(ctx context.Context) bool { + if a == nil || a.resolver == nil { + return false + } + return a.resolver.SwitchAccount(ctx, a) +} + func (r *Resolver) Release(a *RequestAuth) { if a == nil || !a.UseConfigToken || a.AccountID == "" { return diff --git a/internal/completionruntime/nonstream.go b/internal/completionruntime/nonstream.go index ee31c0b..921d3b4 100644 --- a/internal/completionruntime/nonstream.go +++ b/internal/completionruntime/nonstream.go @@ -104,6 +104,7 @@ func ExecuteNonStreamStartedWithRetry(ctx context.Context, ds DeepSeekCaller, a pow := start.Pow attempts := 0 + accountSwitchAttempted := false currentResp := start.Response usagePrompt := stdReq.PromptTokenText accumulatedThinking := "" @@ -112,6 +113,24 @@ func ExecuteNonStreamStartedWithRetry(ctx context.Context, ds DeepSeekCaller, a for { turn, outErr := collectAttempt(currentResp, stdReq, usagePrompt, opts) if outErr != nil { + if canRetryOnAlternateAccount(ctx, a, outErr, opts.RetryEnabled, &accountSwitchAttempted) { + switched, switchErr := startStandardCompletionOnAlternateAccount(ctx, ds, a, stdReq, maxAttempts) + if switchErr != nil { + return NonStreamResult{SessionID: sessionID, Payload: payload, Attempts: attempts}, switchErr + } + if switched.Response != nil { + config.Logger.Info("[completion_runtime_account_switch_retry] retrying 
after 429", "surface", stdReq.Surface, "stream", false, "account", a.AccountID) + sessionID = switched.SessionID + payload = switched.Payload + pow = switched.Pow + currentResp = switched.Response + usagePrompt = stdReq.PromptTokenText + accumulatedThinking = "" + accumulatedRawThinking = "" + accumulatedToolDetectionThinking = "" + continue + } + } return NonStreamResult{SessionID: sessionID, Payload: payload, Attempts: attempts}, outErr } accumulatedThinking += sse.TrimContinuationOverlap(accumulatedThinking, turn.Thinking) @@ -134,6 +153,24 @@ func ExecuteNonStreamStartedWithRetry(ctx context.Context, ds DeepSeekCaller, a retryMax = shared.EmptyOutputRetryMaxAttempts() } if !opts.RetryEnabled || !assistantturn.ShouldRetryEmptyOutput(turn, attempts, retryMax) { + if canRetryOnAlternateAccount(ctx, a, turn.Error, opts.RetryEnabled, &accountSwitchAttempted) { + switched, switchErr := startStandardCompletionOnAlternateAccount(ctx, ds, a, stdReq, maxAttempts) + if switchErr != nil { + return NonStreamResult{SessionID: sessionID, Payload: payload, Turn: turn, Attempts: attempts}, switchErr + } + if switched.Response != nil { + config.Logger.Info("[completion_runtime_account_switch_retry] retrying after 429", "surface", stdReq.Surface, "stream", false, "account", a.AccountID) + sessionID = switched.SessionID + payload = switched.Payload + pow = switched.Pow + currentResp = switched.Response + usagePrompt = stdReq.PromptTokenText + accumulatedThinking = "" + accumulatedRawThinking = "" + accumulatedToolDetectionThinking = "" + continue + } + } return NonStreamResult{SessionID: sessionID, Payload: payload, Turn: turn, Attempts: attempts}, turn.Error } @@ -154,6 +191,37 @@ func ExecuteNonStreamStartedWithRetry(ctx context.Context, ds DeepSeekCaller, a } } +func canRetryOnAlternateAccount(ctx context.Context, a *auth.RequestAuth, outErr *assistantturn.OutputError, retryEnabled bool, attempted *bool) bool { + if outErr == nil || outErr.Status != http.StatusTooManyRequests { 
+ return false + } + if !retryEnabled || attempted == nil || *attempted { + return false + } + if a == nil || !a.UseConfigToken { + return false + } + *attempted = true + return a.SwitchAccount(ctx) +} + +func startStandardCompletionOnAlternateAccount(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, maxAttempts int) (StartResult, *assistantturn.OutputError) { + sessionID, err := ds.CreateSession(ctx, a, maxAttempts) + if err != nil { + return StartResult{}, authOutputError(a) + } + pow, err := ds.GetPow(ctx, a, maxAttempts) + if err != nil { + return StartResult{SessionID: sessionID}, &assistantturn.OutputError{Status: http.StatusUnauthorized, Message: "Failed to get PoW (invalid token or unknown error).", Code: "error"} + } + payload := stdReq.CompletionPayload(sessionID) + resp, err := ds.CallCompletion(ctx, a, payload, pow, maxAttempts) + if err != nil { + return StartResult{SessionID: sessionID, Payload: payload, Pow: pow}, &assistantturn.OutputError{Status: http.StatusInternalServerError, Message: "Failed to get completion.", Code: "error"} + } + return StartResult{SessionID: sessionID, Payload: payload, Pow: pow, Response: resp, Request: stdReq}, nil +} + func collectAttempt(resp *http.Response, stdReq promptcompat.StandardRequest, usagePrompt string, opts Options) (assistantturn.Turn, *assistantturn.OutputError) { defer func() { if err := resp.Body.Close(); err != nil { diff --git a/internal/completionruntime/nonstream_test.go b/internal/completionruntime/nonstream_test.go index 36461ad..7c5959a 100644 --- a/internal/completionruntime/nonstream_test.go +++ b/internal/completionruntime/nonstream_test.go @@ -7,15 +7,19 @@ import ( "strings" "testing" + "ds2api/internal/account" "ds2api/internal/auth" + "ds2api/internal/config" dsclient "ds2api/internal/deepseek/client" "ds2api/internal/promptcompat" ) type fakeDeepSeekCaller struct { - responses []*http.Response - payloads []map[string]any - uploads 
[]dsclient.UploadFileRequest + responses []*http.Response + payloads []map[string]any + uploads []dsclient.UploadFileRequest + completionAccounts []string + sessionByAccount bool } type currentInputRuntimeConfig struct{} @@ -23,7 +27,10 @@ type currentInputRuntimeConfig struct{} func (currentInputRuntimeConfig) CurrentInputFileEnabled() bool { return true } func (currentInputRuntimeConfig) CurrentInputFileMinChars() int { return 0 } -func (f *fakeDeepSeekCaller) CreateSession(context.Context, *auth.RequestAuth, int) (string, error) { +func (f *fakeDeepSeekCaller) CreateSession(_ context.Context, a *auth.RequestAuth, _ int) (string, error) { + if f.sessionByAccount && a != nil && a.AccountID != "" { + return "session-" + a.AccountID, nil + } return "session-1", nil } @@ -36,8 +43,11 @@ func (f *fakeDeepSeekCaller) UploadFile(_ context.Context, _ *auth.RequestAuth, return &dsclient.UploadFileResult{ID: "file-runtime-1"}, nil } -func (f *fakeDeepSeekCaller) CallCompletion(_ context.Context, _ *auth.RequestAuth, payload map[string]any, _ string, _ int) (*http.Response, error) { +func (f *fakeDeepSeekCaller) CallCompletion(_ context.Context, a *auth.RequestAuth, payload map[string]any, _ string, _ int) (*http.Response, error) { f.payloads = append(f.payloads, payload) + if a != nil { + f.completionAccounts = append(f.completionAccounts, a.AccountID) + } if len(f.responses) == 0 { return sseHTTPResponse(http.StatusOK, `data: {"p":"response/content","v":"fallback"}`), nil } @@ -89,6 +99,69 @@ func TestExecuteNonStreamWithRetryBuildsCanonicalTurn(t *testing.T) { } } +func TestExecuteNonStreamWithRetrySwitchesManagedAccountBeforeFinal429(t *testing.T) { + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["managed-key"], + "accounts":[ + {"email":"acc1@test.com","password":"pwd"}, + {"email":"acc2@test.com","password":"pwd"} + ] + }`) + store := config.LoadStore() + resolver := auth.NewResolver(store, account.NewPool(store), func(_ context.Context, acc config.Account) (string, 
error) { + return "token-" + acc.Identifier(), nil + }) + req, _ := http.NewRequest(http.MethodPost, "/", nil) + req.Header.Set("Authorization", "Bearer managed-key") + a, err := resolver.Determine(req) + if err != nil { + t.Fatalf("determine failed: %v", err) + } + defer resolver.Release(a) + + ds := &fakeDeepSeekCaller{ + sessionByAccount: true, + responses: []*http.Response{ + sseHTTPResponse(http.StatusOK, `data: {"response_message_id":11,"p":"response/thinking_content","v":"first empty"}`), + sseHTTPResponse(http.StatusOK, `data: {"response_message_id":12,"p":"response/thinking_content","v":"retry empty"}`), + sseHTTPResponse(http.StatusOK, `data: {"response_message_id":21,"p":"response/content","v":"ok from second account"}`), + }, + } + stdReq := promptcompat.StandardRequest{ + Surface: "test", + ResponseModel: "deepseek-v4-flash", + PromptTokenText: "prompt", + FinalPrompt: "final prompt", + Thinking: true, + } + + result, outErr := ExecuteNonStreamWithRetry(context.Background(), ds, a, stdReq, Options{RetryEnabled: true}) + if outErr != nil { + t.Fatalf("unexpected output error after account switch retry: %#v", outErr) + } + if result.Turn.Text != "ok from second account" { + t.Fatalf("text mismatch after switch retry: %q", result.Turn.Text) + } + if result.SessionID != "session-acc2@test.com" { + t.Fatalf("expected switched account session, got %q", result.SessionID) + } + wantAccounts := []string{"acc1@test.com", "acc1@test.com", "acc2@test.com"} + if len(ds.completionAccounts) != len(wantAccounts) { + t.Fatalf("completion account count mismatch: got %v want %v", ds.completionAccounts, wantAccounts) + } + for i, want := range wantAccounts { + if ds.completionAccounts[i] != want { + t.Fatalf("completion account %d = %q want %q (all=%v)", i, ds.completionAccounts[i], want, ds.completionAccounts) + } + } + if got := ds.payloads[2]["chat_session_id"]; got != "session-acc2@test.com" { + t.Fatalf("switched payload session mismatch: %#v", got) + } + if prompt, 
_ := ds.payloads[2]["prompt"].(string); strings.Contains(prompt, "Previous reply had no visible output") { + t.Fatalf("expected fresh switched-account prompt without empty-output suffix, got %q", prompt) + } +} + func TestExecuteNonStreamWithRetryUsesParentMessageForEmptyRetry(t *testing.T) { ds := &fakeDeepSeekCaller{responses: []*http.Response{ sseHTTPResponse(http.StatusOK, `data: {"response_message_id":77,"p":"response/thinking_content","v":"plan"}`), diff --git a/internal/completionruntime/stream_retry.go b/internal/completionruntime/stream_retry.go index 27a7ebc..03c9dc7 100644 --- a/internal/completionruntime/stream_retry.go +++ b/internal/completionruntime/stream_retry.go @@ -6,6 +6,7 @@ import ( "net/http" "strings" + "ds2api/internal/assistantturn" "ds2api/internal/auth" "ds2api/internal/config" "ds2api/internal/httpapi/openai/shared" @@ -27,6 +28,7 @@ type StreamRetryHooks struct { OnRetry func(attempts int) OnRetryPrompt func(prompt string) OnRetryFailure func(status int, message, code string) + OnAccountSwitch func(sessionID string) OnTerminal func(attempts int) } @@ -48,16 +50,48 @@ func ExecuteStreamWithRetry(ctx context.Context, ds DeepSeekCaller, a *auth.Requ } attempts := 0 + accountSwitchAttempted := false currentResp := initialResp + currentPayload := clonePayload(payload) for { - terminalWritten, retryable := hooks.ConsumeAttempt(currentResp, opts.RetryEnabled && attempts < retryMax) + allowAccountSwitch := opts.RetryEnabled && attempts >= retryMax && !accountSwitchAttempted && a != nil && a.UseConfigToken + terminalWritten, retryable := hooks.ConsumeAttempt(currentResp, opts.RetryEnabled && (attempts < retryMax || allowAccountSwitch)) if terminalWritten { if hooks.OnTerminal != nil { hooks.OnTerminal(attempts) } return } - if !retryable || !opts.RetryEnabled || attempts >= retryMax { + if !retryable || !opts.RetryEnabled { + if hooks.Finalize != nil { + hooks.Finalize(attempts) + } + return + } + + if attempts >= retryMax { + if 
canRetryOnAlternateAccount(ctx, a, &assistantturn.OutputError{Status: http.StatusTooManyRequests}, opts.RetryEnabled, &accountSwitchAttempted) { + switched, switchErr := startPayloadCompletionOnAlternateAccount(ctx, ds, a, payload, maxAttempts) + if switchErr != nil { + if hooks.OnRetryFailure != nil { + hooks.OnRetryFailure(switchErr.Status, switchErr.Message, switchErr.Code) + } + return + } + if switched.Response != nil { + config.Logger.Info("[completion_runtime_account_switch_retry] retrying after 429", "surface", surface, "stream", opts.Stream, "account", a.AccountID) + currentResp = switched.Response + currentPayload = switched.Payload + pow = switched.Pow + if hooks.OnAccountSwitch != nil { + hooks.OnAccountSwitch(switched.SessionID) + } + if hooks.OnRetryPrompt != nil { + hooks.OnRetryPrompt(opts.UsagePrompt) + } + continue + } + } if hooks.Finalize != nil { hooks.Finalize(attempts) } @@ -75,7 +109,7 @@ func ExecuteStreamWithRetry(ctx context.Context, ds DeepSeekCaller, a *auth.Requ config.Logger.Warn("[completion_runtime_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", surface, "stream", opts.Stream, "retry_attempt", attempts, "error", powErr) retryPow = pow } - nextResp, err := ds.CallCompletion(ctx, a, shared.ClonePayloadForEmptyOutputRetry(payload, parentMessageID), retryPow, maxAttempts) + nextResp, err := ds.CallCompletion(ctx, a, shared.ClonePayloadForEmptyOutputRetry(currentPayload, parentMessageID), retryPow, maxAttempts) if err != nil { if hooks.OnRetryFailure != nil { hooks.OnRetryFailure(http.StatusInternalServerError, "Failed to get completion.", "error") @@ -108,6 +142,33 @@ func ExecuteStreamWithRetry(ctx context.Context, ds DeepSeekCaller, a *auth.Requ } } +func startPayloadCompletionOnAlternateAccount(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, payload map[string]any, maxAttempts int) (StartResult, *assistantturn.OutputError) { + sessionID, err := ds.CreateSession(ctx, a, maxAttempts) + if err 
!= nil { + return StartResult{}, authOutputError(a) + } + pow, err := ds.GetPow(ctx, a, maxAttempts) + if err != nil { + return StartResult{SessionID: sessionID}, &assistantturn.OutputError{Status: http.StatusUnauthorized, Message: "Failed to get PoW (invalid token or unknown error).", Code: "error"} + } + nextPayload := clonePayload(payload) + nextPayload["chat_session_id"] = sessionID + delete(nextPayload, "parent_message_id") + resp, err := ds.CallCompletion(ctx, a, nextPayload, pow, maxAttempts) + if err != nil { + return StartResult{SessionID: sessionID, Payload: nextPayload, Pow: pow}, &assistantturn.OutputError{Status: http.StatusInternalServerError, Message: "Failed to get completion.", Code: "error"} + } + return StartResult{SessionID: sessionID, Payload: nextPayload, Pow: pow, Response: resp}, nil +} + +func clonePayload(payload map[string]any) map[string]any { + clone := make(map[string]any, len(payload)) + for k, v := range payload { + clone[k] = v + } + return clone +} + func closeRetryBody(surface string, body io.Closer) { if body == nil { return diff --git a/internal/completionruntime/stream_retry_test.go b/internal/completionruntime/stream_retry_test.go index 7340dca..655016c 100644 --- a/internal/completionruntime/stream_retry_test.go +++ b/internal/completionruntime/stream_retry_test.go @@ -7,7 +7,9 @@ import ( "strings" "testing" + "ds2api/internal/account" "ds2api/internal/auth" + "ds2api/internal/config" "ds2api/internal/httpapi/openai/shared" ) @@ -60,3 +62,89 @@ func TestExecuteStreamWithRetryUsesSharedRetryPayloadAndUsagePrompt(t *testing.T t.Fatalf("expected retry suffix in usage prompt, got %q", retryPrompt) } } + +func TestExecuteStreamWithRetrySwitchesManagedAccountBeforeFinal429(t *testing.T) { + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["managed-key"], + "accounts":[ + {"email":"acc1@test.com","password":"pwd"}, + {"email":"acc2@test.com","password":"pwd"} + ] + }`) + store := config.LoadStore() + resolver := auth.NewResolver(store, 
account.NewPool(store), func(_ context.Context, acc config.Account) (string, error) { + return "token-" + acc.Identifier(), nil + }) + req, _ := http.NewRequest(http.MethodPost, "/", nil) + req.Header.Set("Authorization", "Bearer managed-key") + a, err := resolver.Determine(req) + if err != nil { + t.Fatalf("determine failed: %v", err) + } + defer resolver.Release(a) + + ds := &fakeDeepSeekCaller{ + sessionByAccount: true, + responses: []*http.Response{ + sseHTTPResponse(http.StatusOK, `data: {"response_message_id":12,"p":"response/thinking_content","v":"retry empty"}`), + sseHTTPResponse(http.StatusOK, `data: {"response_message_id":21,"p":"response/content","v":"ok from second account"}`), + }, + } + initial := sseHTTPResponse(http.StatusOK, `data: {"response_message_id":11,"p":"response/thinking_content","v":"first empty"}`) + payload := map[string]any{"prompt": "original prompt", "chat_session_id": "session-acc1@test.com"} + attemptsSeen := 0 + switchedSession := "" + + ExecuteStreamWithRetry(context.Background(), ds, a, initial, payload, "pow", StreamRetryOptions{ + Surface: "test.stream", + Stream: true, + RetryEnabled: true, + RetryMaxAttempts: 1, + UsagePrompt: "original prompt", + }, StreamRetryHooks{ + ConsumeAttempt: func(resp *http.Response, allowDeferEmpty bool) (bool, bool) { + defer func() { + if err := resp.Body.Close(); err != nil { + t.Fatalf("close failed: %v", err) + } + }() + body, _ := io.ReadAll(resp.Body) + attemptsSeen++ + if strings.Contains(string(body), "ok from second account") { + return true, false + } + if !allowDeferEmpty { + t.Fatalf("expected empty attempt %d to be deferred before final 429", attemptsSeen) + } + return false, true + }, + ParentMessageID: func() int { + return 11 + attemptsSeen + }, + OnAccountSwitch: func(sessionID string) { + switchedSession = sessionID + }, + }) + + if attemptsSeen != 3 { + t.Fatalf("expected three stream attempts, got %d", attemptsSeen) + } + if switchedSession != "session-acc2@test.com" { + 
t.Fatalf("expected switched session id, got %q", switchedSession) + } + wantAccounts := []string{"acc1@test.com", "acc2@test.com"} + if len(ds.completionAccounts) != len(wantAccounts) { + t.Fatalf("completion accounts mismatch: got %v want %v", ds.completionAccounts, wantAccounts) + } + for i, want := range wantAccounts { + if ds.completionAccounts[i] != want { + t.Fatalf("completion account %d = %q want %q (all=%v)", i, ds.completionAccounts[i], want, ds.completionAccounts) + } + } + if got := ds.payloads[1]["chat_session_id"]; got != "session-acc2@test.com" { + t.Fatalf("switched payload session mismatch: %#v", got) + } + if prompt, _ := ds.payloads[1]["prompt"].(string); strings.Contains(prompt, shared.EmptyOutputRetrySuffix) { + t.Fatalf("expected switched-account prompt without empty-output suffix, got %q", prompt) + } +} diff --git a/internal/httpapi/openai/chat/empty_retry_runtime.go b/internal/httpapi/openai/chat/empty_retry_runtime.go index 40067ed..1dc8ca9 100644 --- a/internal/httpapi/openai/chat/empty_retry_runtime.go +++ b/internal/httpapi/openai/chat/empty_retry_runtime.go @@ -66,7 +66,7 @@ func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Co writeJSON(w, http.StatusOK, respBody) } -func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, historySession *chatHistorySession) { +func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID string, sessionIDRef *string, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, historySession *chatHistorySession) { 
streamRuntime, initialType, ok := h.prepareChatStreamRuntime(w, resp, completionID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, historySession) if !ok { return @@ -96,6 +96,11 @@ func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, OnRetryFailure: func(status int, message, code string) { failChatStreamRetry(streamRuntime, historySession, status, message, code) }, + OnAccountSwitch: func(sessionID string) { + if sessionIDRef != nil { + *sessionIDRef = sessionID + } + }, OnTerminal: func(attempts int) { logChatStreamTerminal(streamRuntime, attempts) }, diff --git a/internal/httpapi/openai/chat/handler_chat.go b/internal/httpapi/openai/chat/handler_chat.go index 61703a0..9d86cf7 100644 --- a/internal/httpapi/openai/chat/handler_chat.go +++ b/internal/httpapi/openai/chat/handler_chat.go @@ -114,7 +114,7 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { } streamReq := start.Request refFileTokens := streamReq.RefFileTokens - h.handleStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, sessionID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, historySession) + h.handleStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, sessionID, &sessionID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, historySession) } func (h *Handler) autoDeleteRemoteSession(ctx context.Context, a *auth.RequestAuth, sessionID string) {
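The `clonePayload` helper added in this change copies the payload before the switched-account retry overwrites `chat_session_id` and drops `parent_message_id`, so the caller's original map is left untouched. Below is a minimal standalone sketch of that pattern. The wrapper name `freshSessionPayload` and the sample values are hypothetical and exist only for illustration; the real code does this inline and is not reproduced here.

```go
package main

import "fmt"

// clonePayload mirrors the helper in the diff: a shallow copy of the
// top-level keys. Nested maps or slices would still be shared.
func clonePayload(payload map[string]any) map[string]any {
	clone := make(map[string]any, len(payload))
	for k, v := range payload {
		clone[k] = v
	}
	return clone
}

// freshSessionPayload is a hypothetical wrapper (not in the diff) that
// prepares a payload for a fresh session on another account: it points
// the copy at the new session and removes the parent message link.
func freshSessionPayload(payload map[string]any, sessionID string) map[string]any {
	next := clonePayload(payload)
	next["chat_session_id"] = sessionID
	delete(next, "parent_message_id")
	return next
}

func main() {
	original := map[string]any{
		"prompt":            "original prompt",
		"chat_session_id":   "session-acc1@test.com",
		"parent_message_id": 12,
	}
	next := freshSessionPayload(original, "session-acc2@test.com")

	_, nextHasParent := next["parent_message_id"]
	_, origHasParent := original["parent_message_id"]
	fmt.Println("retry session:", next["chat_session_id"], "has parent:", nextHasParent)
	fmt.Println("original session:", original["chat_session_id"], "has parent:", origHasParent)
}
```

Because the copy is shallow, this is only safe while the retry path rewrites top-level keys, which is all the diff does here; mutating a nested value through the clone would still be visible in the original.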