diff --git a/API.en.md b/API.en.md index bf8c42d..c5b8f9b 100644 --- a/API.en.md +++ b/API.en.md @@ -42,7 +42,7 @@ Docs: [Overview](README.en.md) / [Architecture](docs/ARCHITECTURE.en.md) / [Depl - Adapter responsibilities are streamlined to: **request normalization → DeepSeek invocation → protocol-shaped rendering**, reducing legacy split-logic paths. - Tool-calling semantics are aligned between Go and Node runtime: models should output the halfwidth-pipe DSML shell `<|DSML|tool_calls>` → `<|DSML|invoke name="...">` → `<|DSML|parameter name="...">`; DS2API also accepts DSML wrapper aliases such as `` and `<|tool_calls>`, common DSML separator drift such as `<|DSML tool_calls>`, collapsed DSML local names such as ``, control-separator drift such as `` / raw STX `\x02`, CJK angle bracket, fullwidth-bang / ideographic-comma separator drift, PascalCase local-name drift, and trailing attribute separator drift such as `...〈/DSM|parameter〉`, `<!DSML!invoke name=“Bash”>`, `<、DSML、tool_calls>`, ``, or ``, arbitrary protocol prefixes such as ``, and legacy canonical XML `` → `` → ``. The scanner normalizes fixed local names (`tool_calls` / `invoke` / `parameter`) with non-structural separators before or after them back to XML before parsing, and also tolerates CDATA opener drift such as `<![CDATA[` / `<、[CDATA[`; only wrapped tool blocks or the narrow missing-opening-wrapper repair path enter the tool path, while bare `` does not count as supported syntax. JSON literal parameter bodies are preserved as structured values, explicit empty or whitespace-only parameters are preserved as empty strings, malformed complete wrappers are released as plain text, and loose CDATA is narrowly repaired at final parse/flush when it can preserve a complete outer tool call. - `Admin API` separates static config from runtime policy: `/admin/config*` for configuration state, `/admin/settings*` for runtime behavior. 
-- When upstream returns a thinking-only response with no visible text, the Go main path for both streaming and non-streaming completions retries once in the same DeepSeek session: it appends the prompt suffix `"Previous reply had no visible output. Please regenerate the visible final answer or tool call now."` and sets `parent_message_id`. If that same-account retry would still end as `429 upstream_empty_output`, managed-account mode switches to the next available account, creates a fresh session, and retries the original payload once before returning 429. +- When upstream returns a thinking-only response with no visible text, the Go main path and the Vercel Node streaming path retry once in the same DeepSeek session: each appends the prompt suffix `"Previous reply had no visible output. Please regenerate the visible final answer or tool call now."` and sets `parent_message_id`. If that same-account retry would still end as `429 upstream_empty_output`, managed-account mode switches to the next available account, creates a fresh session, and retries the original payload once before returning 429. - Citation/reference marker boundary: streaming output hides upstream `[citation:N]` / `[reference:N]` placeholders by default; non-stream output converts DeepSeek search reference markers into Markdown links. 
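The empty-output retry flow documented in the API.en.md hunk above can be sketched as follows. This is a minimal illustration, not the actual ds2api implementation: `Completer`, `completeWithEmptyRetry`, and `scriptedCompleter` are hypothetical names invented here; only the fixed prompt suffix and the `parent_message_id` / `chat_session_id` payload keys come from the document.

```go
package main

import (
	"errors"
	"fmt"
)

// retrySuffix is the fixed prompt suffix quoted in the docs above.
const retrySuffix = "Previous reply had no visible output. Please regenerate the visible final answer or tool call now."

// Completer abstracts one upstream completion call. This interface is
// illustrative only; the real code paths live in completionruntime.
type Completer interface {
	// Complete returns the visible text and the upstream response_message_id.
	Complete(payload map[string]any) (visible string, msgID string, err error)
	// NextAccountSession switches to the next managed account and opens a
	// fresh session, reporting ok=false when no account is available.
	NextAccountSession() (sessionID string, ok bool)
}

var errEmptyOutput = errors.New("429 upstream_empty_output")

// completeWithEmptyRetry sketches the documented flow: one same-session
// follow-up retry with the suffix and parent_message_id, then one
// fresh-account retry with the original payload before surfacing the 429.
func completeWithEmptyRetry(c Completer, payload map[string]any) (string, error) {
	visible, msgID, err := c.Complete(payload)
	if err != nil || visible != "" {
		return visible, err
	}
	// Same-account retry: a follow-up turn in the same DeepSeek session.
	retry := clone(payload)
	retry["prompt"] = retry["prompt"].(string) + "\n" + retrySuffix
	retry["parent_message_id"] = msgID
	if visible, _, err = c.Complete(retry); err != nil || visible != "" {
		return visible, err
	}
	// Managed-account switch: fresh session, original payload, no suffix,
	// and no parent_message_id carried over from the previous account.
	if sessionID, ok := c.NextAccountSession(); ok {
		fresh := clone(payload)
		fresh["chat_session_id"] = sessionID
		delete(fresh, "parent_message_id")
		if visible, _, err = c.Complete(fresh); err != nil || visible != "" {
			return visible, err
		}
	}
	return "", errEmptyOutput
}

func clone(m map[string]any) map[string]any {
	out := make(map[string]any, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

// scriptedCompleter returns a fixed sequence of visible outputs, so the
// three-attempt flow can be observed without a live upstream.
type scriptedCompleter struct {
	visibles []string
	calls    int
}

func (s *scriptedCompleter) Complete(payload map[string]any) (string, string, error) {
	v := s.visibles[s.calls]
	s.calls++
	return v, fmt.Sprintf("msg-%d", s.calls), nil
}

func (s *scriptedCompleter) NextAccountSession() (string, bool) { return "session-2", true }

func main() {
	stub := &scriptedCompleter{visibles: []string{"", "", "ok"}}
	out, err := completeWithEmptyRetry(stub, map[string]any{"prompt": "hi"})
	fmt.Println(out, err, stub.calls) // ok <nil> 3
}
```

Two thinking-only attempts on the first account are followed by one fresh-account attempt, matching the "retry once, then switch, then return 429" ordering described above.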
--- diff --git a/API.md b/API.md index 63b9a6d..63b4539 100644 --- a/API.md +++ b/API.md @@ -42,7 +42,7 @@ - 适配器层职责收敛为:**请求归一化 → DeepSeek 调用 → 协议形态渲染**,减少历史版本中“同能力多处实现”的分叉。 - Tool Calling 的解析策略在 Go 与 Node Runtime 间保持一致:推荐模型输出半角管道符 DSML 外壳 `<|DSML|tool_calls>` → `<|DSML|invoke name="...">` → `<|DSML|parameter name="...">`;兼容层也接受 DSML wrapper 别名 ``、`<|tool_calls>`、常见 DSML 分隔符漏写形态(如 `<|DSML tool_calls>`)、`DSML` 与工具标签名黏连的常见 typo(如 ``)、控制分隔符漂移(如 `` / 原始 STX `\x02`)、CJK 尖括号、全角感叹号、顿号、PascalCase 本地名、弯引号属性值与属性尾部分隔符漂移(如 `...〈/DSM|parameter〉` / `<!DSML!invoke name=“Bash”>` / `<、DSML、tool_calls>` / `` / ``)、任意协议前缀壳(如 ``),以及旧式 canonical XML `` → `` → ``。实现上采用结构扫描:只要固定本地标签名是 `tool_calls` / `invoke` / `parameter`,标签名前或标签名后的非结构性分隔符会在解析入口归一化;CDATA 开头也会容错 `<![CDATA[` / `<、[CDATA[` 这类分隔符漂移;只有 `tool_calls` wrapper 或可修复的缺失 opening wrapper 会进入工具路径,裸 `` 不计为已支持语法;流式场景继续执行防泄漏筛分。若参数体本身是合法 JSON 字面量(如 `123`、`true`、`null`、数组或对象),会按结构化值输出,不再一律当作字符串;显式空字符串和纯空白参数会结构化保留为空字符串,是否拒绝缺参由工具执行侧决定;完整但 malformed 的 wrapper 会作为普通文本释放,不会吞掉或伪造成工具调用;若 CDATA 偶发漏闭合,则会在最终 parse / flush 恢复阶段做窄修复,尽量保住已完整包裹的外层工具调用。 - `Admin API` 将配置与运行时策略分开:`/admin/config*` 管静态配置,`/admin/settings*` 管运行时行为。 -- 当上游返回 thinking-only 响应(模型输出了推理链但无可见文本)时,Go 主路径的流式与非流式补全都会先自动重试一次:以多轮对话 follow-up 方式追加 prompt 后缀 `"Previous reply had no visible output. Please regenerate the visible final answer or tool call now."` 并设置 `parent_message_id` 在同一 DeepSeek session 内让模型重新输出;同账号重试最大 1 次。若同账号重试后仍即将返回 `429 upstream_empty_output`,托管账号模式会在返回 429 前自动切换到下一个可用账号,新建 session,用原始 payload 再 fresh retry 一次。 +- 当上游返回 thinking-only 响应(模型输出了推理链但无可见文本)时,Go 主路径与 Vercel Node 流式路径都会先自动重试一次:以多轮对话 follow-up 方式追加 prompt 后缀 `"Previous reply had no visible output. 
Please regenerate the visible final answer or tool call now."` 并设置 `parent_message_id` 在同一 DeepSeek session 内让模型重新输出;同账号重试最大 1 次。若同账号重试后仍即将返回 `429 upstream_empty_output`,托管账号模式会在返回 429 前自动切换到下一个可用账号,新建 session,用原始 payload 再 fresh retry 一次。 - 引用标记处理边界:流式输出默认隐藏 `[citation:N]` / `[reference:N]` 这类上游内部占位符;非流式输出默认把 DeepSeek 搜索引用标记转换为 Markdown 引用链接。 --- diff --git a/docs/prompt-compatibility.md b/docs/prompt-compatibility.md index 1555b62..7b49865 100644 --- a/docs/prompt-compatibility.md +++ b/docs/prompt-compatibility.md @@ -112,7 +112,7 @@ DS2API 当前的核心思路,不是把客户端传来的 `messages`、`tools` - Vercel Node 流式路径本轮不迁移,仍使用现有 Node bridge / stream-tool-sieve 实现;后续若变更 Node 流式语义,需要按 `assistantturn` 的 Go canonical 输出语义同步对齐。 - 客户端传入的 thinking / reasoning 开关会被归一到下游 `thinking_enabled`。Gemini `generationConfig.thinkingConfig.thinkingBudget` 会翻译成同一套 thinking 开关;关闭时即使上游返回 `response/thinking_content`,兼容层也不会把它当作可见正文输出。若最终解析出的模型名带 `-nothinking` 后缀,则会无条件强制关闭 thinking,优先级高于请求体中的 `thinking` / `reasoning` / `reasoning_effort`。未显式关闭时,各 surface 会按解析后的 DeepSeek 模型默认能力开启 thinking,并用各自协议的原生形态暴露:OpenAI Chat 为 `reasoning_content`,OpenAI Responses 为 `response.reasoning.delta` / `reasoning` content,Claude 为 `thinking` block / `thinking_delta`,Gemini 为 `thought: true` part。 - 对 OpenAI Chat / Responses 的非流式收尾,如果最终可见正文为空,兼容层会优先尝试把思维链中的独立 DSML / XML 工具块当作真实工具调用解析出来。流式链路也会在收尾阶段做同样的 fallback 检测,但不会因为思维链内容去中途拦截或改写流式输出;真正的工具识别始终基于原始上游文本,而不是基于“已经做过可见输出清洗”的版本。最终可见层会剥离已经成功解析成工具调用的完整 leaked DSML / XML `tool_calls` wrapper;如果遇到完整 wrapper 但内部形态不符合可执行工具调用语义(例如 `` 这类 malformed XML 工具壳),流式 sieve 会把该块作为普通文本释放,而不是吞掉或伪造成工具调用。补发结果会作为本轮 assistant 的结构化 `tool_calls` / `function_call` 输出返回,而不是塞进 `content` 文本;如果客户端没有开启 thinking / reasoning,思维链只用于检测,不会作为 `reasoning_content` 或可见正文暴露。只有正文为空且思维链里也没有可执行工具调用时,才继续按空回复错误处理。 -- OpenAI Chat / Responses、Claude Messages、Gemini generateContent 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token 与工具策略,把原始 
completion `prompt` 追加固定后缀 `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` 后重新提交一次。Go 主路径的非流式重试由 `completionruntime.ExecuteNonStreamWithRetry` 统一处理;流式重试由 `completionruntime.ExecuteStreamWithRetry` 统一处理,各协议 runtime 只负责消费/渲染本协议 SSE framing。重试遵循 DeepSeek 多轮对话协议:从第一次上游 SSE 流中提取 `response_message_id`,并在重试 payload 中设置 `parent_message_id` 为该值,使重试成为同一会话的后续轮次而非断裂的根消息;同时重新获取一次 PoW(若 PoW 获取失败则回退到原始 PoW)。该同账号重试不会重新标准化消息、不会新建 session,也不会向流式客户端插入重试标记;第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若同账号补偿重试后即将返回 429 `upstream_empty_output`,并且当前是托管账号模式,Go 主路径会在返回 429 前切换到下一个可用账号,新建 `chat_session_id`,使用原始 completion payload 再做一次 fresh retry;该切号重试不携带空回复 prompt 后缀,也不设置上一账号的 `parent_message_id`。如果没有可切换账号,或切号后的 fresh retry 仍没有可见正文或工具调用,则继续按原错误返回:无任何输出为 503 `upstream_unavailable`,有 reasoning 但没有可见正文或工具调用为 429 `upstream_empty_output`。若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。JS Vercel 运行时同样设置 `parent_message_id`,但因无法直接调用 PoW API 而复用原始 PoW;切号 fresh retry 目前由 Go 主路径提供。 +- OpenAI Chat / Responses、Claude Messages、Gemini generateContent 的空回复错误处理之前会默认做一次内部补偿重试:第一次上游完整结束后,如果最终可见正文为空、没有解析到工具调用、也没有已经向客户端流式发出工具调用,并且终止原因不是 `content_filter`,兼容层会复用同一个 `chat_session_id`、账号、token 与工具策略,把原始 completion `prompt` 追加固定后缀 `Previous reply had no visible output. 
Please regenerate the visible final answer or tool call now.` 后重新提交一次。Go 主路径的非流式重试由 `completionruntime.ExecuteNonStreamWithRetry` 统一处理;流式重试由 `completionruntime.ExecuteStreamWithRetry` 统一处理,各协议 runtime 只负责消费/渲染本协议 SSE framing。重试遵循 DeepSeek 多轮对话协议:从第一次上游 SSE 流中提取 `response_message_id`,并在重试 payload 中设置 `parent_message_id` 为该值,使重试成为同一会话的后续轮次而非断裂的根消息;同时重新获取一次 PoW(若 PoW 获取失败则回退到原始 PoW)。该同账号重试不会重新标准化消息、不会新建 session,也不会向流式客户端插入重试标记;第二次 thinking / reasoning 会按正常增量直接接到第一次之后,并继续使用 overlap trim 去重。若同账号补偿重试后即将返回 429 `upstream_empty_output`,并且当前是托管账号模式,runtime 会在返回 429 前切换到下一个可用账号,新建 `chat_session_id`,使用原始 completion payload 再做一次 fresh retry;该切号重试不携带空回复 prompt 后缀,也不设置上一账号的 `parent_message_id`。如果 current input file 已触发,切号前会在新账号上重新上传同一份 `DS2API_HISTORY.txt`(以及需要时的 `DS2API_TOOLS.txt`),并用新账号可见的 file_id 替换自动生成的旧 file_id;客户端原本传入的其他文件引用保持不变。如果没有可切换账号,或切号后的 fresh retry 仍没有可见正文或工具调用,则继续按原错误返回:无任何输出为 503 `upstream_unavailable`,有 reasoning 但没有可见正文或工具调用为 429 `upstream_empty_output`。若任一尝试触发空 `content_filter`,不做补偿重试并保持 `content_filter` 错误。Vercel Node 流式路径通过 Go 内部 prepare / pow / switch 端点获取初始 payload、重试 PoW 和切号 fresh retry payload,因此同样会重新上传 current-input 自动文件并替换为新账号 file_id。 - 非流式 OpenAI Chat / Responses、Claude Messages、Gemini generateContent 在最终可见正文渲染阶段,会把 DeepSeek 搜索返回中的 `[citation:N]` / `[reference:N]` 标记替换成对应 Markdown 链接。`citation` 标记按一基序号解析;`reference` 标记只有在同一段正文中出现 `[reference:0]`(允许冒号后有空格)时才按零基序号映射,并且不会影响同段正文里的 `citation` 标记。 - 流式输出仍默认隐藏 `[citation:N]` / `[reference:N]` 这类上游内部标记,避免分片输出中泄漏尚未完成映射的引用占位符。 @@ -165,7 +165,7 @@ OpenAI Chat / Responses 在标准化后、current input file 之前,会默认 1. 把每个 tool 的名称、描述、参数 schema 序列化成文本。 2. 拼成 `You have access to these tools:` 大段说明。 3. 再附上统一的 DSML tool call 外壳格式约束。 -4. 普通直传请求会把“工具描述 + 格式约束”一起并入 system prompt;如果 `current_input_file` 触发,则工具描述/schema 会单独上传成 `DS2API_TOOLS.txt`,live prompt 只保留工具格式约束、工具选择策略和对 `DS2API_TOOLS.txt` 的引用。 +4. 
普通直传请求会把“工具描述 + 格式约束”一起并入 system prompt;如果 `current_input_file` 触发,则工具描述/schema 会单独上传成 `DS2API_TOOLS.txt`,live prompt 和 system tool 格式提示都会明确要求模型把 `DS2API_TOOLS.txt` 当作可调用工具和参数 schema 的权威来源。 工具调用正例现在优先示范半角管道符 DSML 风格:`<|DSML|tool_calls>` → `<|DSML|invoke name="...">` → `<|DSML|parameter name="...">`。 兼容层仍接受旧式纯 `` wrapper,并会容错若干 DSML 标签变体,包括短横线形式 `` / `` / ``、下划线形式 `` / `` / ``,以及其他前缀分隔形态如 `` / `` / ``;标签壳扫描还会把全角 ASCII 漂移归一化,例如 `<dSML|tool_calls>` 与全角 `>` 结束符,也会容错 CJK 尖括号、全角感叹号或顿号分隔符、弯引号属性值、PascalCase 本地名和属性尾部分隔符漂移,例如 `...〈/DSM|parameter〉`、`<!DSML!invoke name=“Bash”>`、`<、DSML、tool_calls>`、``、``。更一般地,Go / Node tag 扫描以固定本地标签名 `tool_calls` / `invoke` / `parameter` 为准,标签名前或标签名后的非结构性协议分隔符都会在解析入口剥离,例如 ``、`` 这类控制符或非 ASCII 分隔符漂移也会归一化回现有 XML 标签后继续走同一套 parser;结构性字符如 `<` / `>` / `/` / `=` / 引号、空白和 ASCII 字母数字不会被当作这类分隔符。进入现有 DSML rewrite / XML parse 之前,Go / Node 还会先对“已经识别成工具标签壳的 candidate span”做一次窄 canonicalization:只折叠 wrapper / `invoke` / `parameter` / `name` / `CDATA` / `DSML` 及其壳层分隔符里的 confusable 字符,清理零宽 / BOM / 控制类干扰,并把引号、空白、dash / underscore 变体等统一回可解析的工具语法。这个阶段不会广义改写普通正文、参数内容、CDATA 里的示例文本或其他非工具 XML。CDATA 开头也使用同一类扫描式容错,`= retryMax { if canRetryOnAlternateAccount(ctx, a, &assistantturn.OutputError{Status: http.StatusTooManyRequests}, opts.RetryEnabled, &accountSwitchAttempted) { - switched, switchErr := startPayloadCompletionOnAlternateAccount(ctx, ds, a, payload, maxAttempts) + switched, switchErr := startPayloadCompletionOnAlternateAccount(ctx, ds, a, payload, opts, maxAttempts) if switchErr != nil { if hooks.OnRetryFailure != nil { hooks.OnRetryFailure(switchErr.Status, switchErr.Message, switchErr.Code) @@ -142,7 +146,7 @@ func ExecuteStreamWithRetry(ctx context.Context, ds DeepSeekCaller, a *auth.Requ } } -func startPayloadCompletionOnAlternateAccount(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, payload map[string]any, maxAttempts int) (StartResult, *assistantturn.OutputError) { +func startPayloadCompletionOnAlternateAccount(ctx context.Context, ds 
DeepSeekCaller, a *auth.RequestAuth, payload map[string]any, opts StreamRetryOptions, maxAttempts int) (StartResult, *assistantturn.OutputError) { sessionID, err := ds.CreateSession(ctx, a, maxAttempts) if err != nil { return StartResult{}, authOutputError(a) @@ -152,6 +156,13 @@ func startPayloadCompletionOnAlternateAccount(ctx context.Context, ds DeepSeekCa return StartResult{SessionID: sessionID}, &assistantturn.OutputError{Status: http.StatusUnauthorized, Message: "Failed to get PoW (invalid token or unknown error).", Code: "error"} } nextPayload := clonePayload(payload) + if opts.CurrentInputFile != nil && opts.Request.CurrentInputFileApplied { + stdReq, prepErr := reuploadCurrentInputFileForAccount(ctx, ds, a, opts.Request, Options{CurrentInputFile: opts.CurrentInputFile}) + if prepErr != nil { + return StartResult{SessionID: sessionID}, prepErr + } + nextPayload = stdReq.CompletionPayload(sessionID) + } nextPayload["chat_session_id"] = sessionID delete(nextPayload, "parent_message_id") resp, err := ds.CallCompletion(ctx, a, nextPayload, pow, maxAttempts) diff --git a/internal/deepseek/client/client_completion.go b/internal/deepseek/client/client_completion.go index 1b91ce2..0563d33 100644 --- a/internal/deepseek/client/client_completion.go +++ b/internal/deepseek/client/client_completion.go @@ -5,9 +5,7 @@ import ( "context" dsprotocol "ds2api/internal/deepseek/protocol" "encoding/json" - "errors" "net/http" - "time" "ds2api/internal/auth" "ds2api/internal/config" @@ -15,39 +13,33 @@ import ( ) func (c *Client) CallCompletion(ctx context.Context, a *auth.RequestAuth, payload map[string]any, powResp string, maxAttempts int) (*http.Response, error) { - if maxAttempts <= 0 { - maxAttempts = c.maxRetries - } + _ = maxAttempts clients := c.requestClientsForAuth(ctx, a) headers := c.authHeaders(a.DeepSeekToken) headers["x-ds-pow-response"] = powResp captureSession := c.capture.Start("deepseek_completion", dsprotocol.DeepSeekCompletionURL, a.AccountID, payload) - 
attempts := 0 - for attempts < maxAttempts { - resp, err := c.streamPost(ctx, clients.stream, dsprotocol.DeepSeekCompletionURL, headers, payload) - if err != nil { - attempts++ - time.Sleep(time.Second) - continue - } - if resp.StatusCode == http.StatusOK { - if captureSession != nil { - resp.Body = captureSession.WrapBody(resp.Body, resp.StatusCode) - } - resp = c.wrapCompletionWithAutoContinue(ctx, a, payload, powResp, resp) - return resp, nil - } - if captureSession != nil { - resp.Body = captureSession.WrapBody(resp.Body, resp.StatusCode) - } - _ = resp.Body.Close() - attempts++ - time.Sleep(time.Second) + resp, err := c.streamPostOnce(ctx, clients.stream, dsprotocol.DeepSeekCompletionURL, headers, payload) + if err != nil { + return nil, err } - return nil, errors.New("completion failed") + if captureSession != nil { + resp.Body = captureSession.WrapBody(resp.Body, resp.StatusCode) + } + if resp.StatusCode == http.StatusOK { + resp = c.wrapCompletionWithAutoContinue(ctx, a, payload, powResp, resp) + } + return resp, nil } func (c *Client) streamPost(ctx context.Context, doer trans.Doer, url string, headers map[string]string, payload any) (*http.Response, error) { + return c.streamPostWithFallback(ctx, doer, url, headers, payload, true) +} + +func (c *Client) streamPostOnce(ctx context.Context, doer trans.Doer, url string, headers map[string]string, payload any) (*http.Response, error) { + return c.streamPostWithFallback(ctx, doer, url, headers, payload, false) +} + +func (c *Client) streamPostWithFallback(ctx context.Context, doer trans.Doer, url string, headers map[string]string, payload any, allowFallback bool) (*http.Response, error) { b, err := json.Marshal(payload) if err != nil { return nil, err @@ -63,15 +55,18 @@ func (c *Client) streamPost(ctx context.Context, doer trans.Doer, url string, he } resp, err := doer.Do(req) if err != nil { - config.Logger.Warn("[deepseek] fingerprint stream request failed, fallback to std transport", "url", url, "error", 
err) - req2, reqErr := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(b)) - if reqErr != nil { - return nil, reqErr + if allowFallback { + config.Logger.Warn("[deepseek] fingerprint stream request failed, fallback to std transport", "url", url, "error", err) + req2, reqErr := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(b)) + if reqErr != nil { + return nil, reqErr + } + for k, v := range headers { + req2.Header.Set(k, v) + } + return clients.fallbackS.Do(req2) } - for k, v := range headers { - req2.Header.Set(k, v) - } - return clients.fallbackS.Do(req2) + return nil, err } return resp, nil } diff --git a/internal/deepseek/client/client_completion_test.go b/internal/deepseek/client/client_completion_test.go new file mode 100644 index 0000000..5244c80 --- /dev/null +++ b/internal/deepseek/client/client_completion_test.go @@ -0,0 +1,36 @@ +package client + +import ( + "context" + "errors" + "net/http" + "testing" + + "ds2api/internal/auth" +) + +func TestCallCompletionDoesNotFallbackForNonIdempotentCompletion(t *testing.T) { + var fallbackCalled bool + client := &Client{ + stream: doerFunc(func(*http.Request) (*http.Response, error) { + return nil, errors.New("ambiguous completion write failure") + }), + fallbackS: &http.Client{Transport: roundTripperFunc(func(*http.Request) (*http.Response, error) { + fallbackCalled = true + return &http.Response{StatusCode: http.StatusOK}, nil + })}, + } + _, err := client.CallCompletion( + context.Background(), + &auth.RequestAuth{DeepSeekToken: "token"}, + map[string]any{"prompt": "hello"}, + "pow", + 3, + ) + if err == nil { + t.Fatal("expected completion error") + } + if fallbackCalled { + t.Fatal("completion fallback should not be called for a non-idempotent request") + } +} diff --git a/internal/deepseek/client/client_upload.go b/internal/deepseek/client/client_upload.go index c3334c3..3dc778d 100644 --- a/internal/deepseek/client/client_upload.go +++ 
b/internal/deepseek/client/client_upload.go @@ -95,11 +95,7 @@ func (c *Client) UploadFile(ctx context.Context, a *auth.RequestAuth, req Upload resp, err := c.doUpload(ctx, clients.regular, clients.fallback, dsprotocol.DeepSeekUploadFileURL, headers, body) if err != nil { config.Logger.Warn("[upload_file] request error", "error", err, "account", a.AccountID, "filename", filename) - powHeader = "" - lastFailureKind = FailureUnknown - lastFailureMessage = err.Error() - attempts++ - continue + return nil, err } if captureSession != nil { resp.Body = captureSession.WrapBody(resp.Body, resp.StatusCode) @@ -201,7 +197,7 @@ func escapeMultipartFilename(filename string) string { return filename } -func (c *Client) doUpload(ctx context.Context, doer trans.Doer, fallback trans.Doer, url string, headers map[string]string, body []byte) (*http.Response, error) { +func (c *Client) doUpload(ctx context.Context, doer trans.Doer, _ trans.Doer, url string, headers map[string]string, body []byte) (*http.Response, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) if err != nil { return nil, err @@ -213,15 +209,7 @@ func (c *Client) doUpload(ctx context.Context, doer trans.Doer, fallback trans.D if err == nil { return resp, nil } - config.Logger.Warn("[deepseek] fingerprint upload request failed, fallback to std transport", "url", url, "error", err) - req2, reqErr := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) - if reqErr != nil { - return nil, reqErr - } - for k, v := range headers { - req2.Header.Set(k, v) - } - return fallback.Do(req2) + return nil, err } func extractUploadFileResult(resp map[string]any) *UploadFileResult { diff --git a/internal/deepseek/client/client_upload_test.go b/internal/deepseek/client/client_upload_test.go index e7d1cc0..ff547da 100644 --- a/internal/deepseek/client/client_upload_test.go +++ b/internal/deepseek/client/client_upload_test.go @@ -6,6 +6,7 @@ import ( 
"encoding/base64" "encoding/hex" "encoding/json" + "errors" "io" "net/http" "strings" @@ -39,6 +40,31 @@ func TestBuildUploadMultipartBodyOmitsPurposeAndIncludesFilePart(t *testing.T) { } } +func TestDoUploadDoesNotFallbackForNonIdempotentUpload(t *testing.T) { + var fallbackCalled bool + client := &Client{} + _, err := client.doUpload( + context.Background(), + doerFunc(func(req *http.Request) (*http.Response, error) { + _, _ = io.ReadAll(req.Body) + return nil, errors.New("ambiguous upload write failure") + }), + doerFunc(func(*http.Request) (*http.Response, error) { + fallbackCalled = true + return &http.Response{StatusCode: http.StatusOK, Header: make(http.Header), Body: io.NopCloser(strings.NewReader("{}"))}, nil + }), + dsprotocol.DeepSeekUploadFileURL, + map[string]string{"Content-Type": "multipart/form-data"}, + []byte("body"), + ) + if err == nil { + t.Fatal("expected upload error") + } + if fallbackCalled { + t.Fatal("upload fallback should not be called for a non-idempotent request") + } +} + func TestExtractUploadFileResultSupportsNestedShapes(t *testing.T) { got := extractUploadFileResult(map[string]any{ "data": map[string]any{ diff --git a/internal/httpapi/claude/handler_messages.go b/internal/httpapi/claude/handler_messages.go index e22a1ed..a89ed8d 100644 --- a/internal/httpapi/claude/handler_messages.go +++ b/internal/httpapi/claude/handler_messages.go @@ -145,7 +145,7 @@ func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Reques return } streamReq := start.Request - h.handleClaudeStreamRealtimeWithRetry(w, r, a, start.Response, start.Payload, start.Pow, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.PromptTokenText, historySession) + h.handleClaudeStreamRealtimeWithRetry(w, r, a, start.Response, start.Payload, start.Pow, streamReq, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, 
streamReq.ToolsRaw, streamReq.PromptTokenText, historySession) } func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store ConfigReader) bool { @@ -361,7 +361,7 @@ func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Requ }) } -func (h *Handler) handleClaudeStreamRealtimeWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, promptTokenText string, historySession *responsehistory.Session) { +func (h *Handler) handleClaudeStreamRealtimeWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow string, stdReq promptcompat.StandardRequest, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, promptTokenText string, historySession *responsehistory.Session) { if resp.StatusCode != http.StatusOK { defer func() { _ = resp.Body.Close() }() body, _ := io.ReadAll(resp.Body) @@ -399,11 +399,13 @@ func (h *Handler) handleClaudeStreamRealtimeWithRetry(w http.ResponseWriter, r * streamRuntime.sendMessageStart() completionruntime.ExecuteStreamWithRetry(r.Context(), h.DS, a, resp, payload, pow, completionruntime.StreamRetryOptions{ - Surface: "claude.messages", - Stream: true, - RetryEnabled: true, - MaxAttempts: 3, - UsagePrompt: promptTokenText, + Surface: "claude.messages", + Stream: true, + RetryEnabled: true, + MaxAttempts: 3, + UsagePrompt: promptTokenText, + Request: stdReq, + CurrentInputFile: h.Store, }, completionruntime.StreamRetryHooks{ ConsumeAttempt: func(currentResp *http.Response, allowDeferEmpty bool) (bool, bool) { return h.consumeClaudeStreamAttempt(r, currentResp, streamRuntime, thinkingEnabled, allowDeferEmpty) diff --git a/internal/httpapi/gemini/handler_generate.go b/internal/httpapi/gemini/handler_generate.go index 784ff75..b9a648d 100644 --- 
a/internal/httpapi/gemini/handler_generate.go +++ b/internal/httpapi/gemini/handler_generate.go @@ -137,7 +137,7 @@ func (h *Handler) handleGeminiDirectStream(w http.ResponseWriter, r *http.Reques return } streamReq := start.Request - h.handleStreamGenerateContentWithRetry(w, r, a, start.Response, start.Payload, start.Pow, streamReq.ResponseModel, streamReq.PromptTokenText, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession) + h.handleStreamGenerateContentWithRetry(w, r, a, start.Response, start.Payload, start.Pow, streamReq, streamReq.ResponseModel, streamReq.PromptTokenText, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession) } func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, stream bool) bool { diff --git a/internal/httpapi/gemini/handler_stream_runtime.go b/internal/httpapi/gemini/handler_stream_runtime.go index a1244ad..6a98a4e 100644 --- a/internal/httpapi/gemini/handler_stream_runtime.go +++ b/internal/httpapi/gemini/handler_stream_runtime.go @@ -12,6 +12,7 @@ import ( "ds2api/internal/auth" "ds2api/internal/completionruntime" dsprotocol "ds2api/internal/deepseek/protocol" + "ds2api/internal/promptcompat" "ds2api/internal/responsehistory" "ds2api/internal/sse" streamengine "ds2api/internal/stream" @@ -87,7 +88,7 @@ type geminiStreamRuntime struct { history *responsehistory.Session } -func (h *Handler) handleStreamGenerateContentWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySession *responsehistory.Session) { +func (h *Handler) handleStreamGenerateContentWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow string, stdReq promptcompat.StandardRequest, model, finalPrompt string, thinkingEnabled, searchEnabled 
bool, toolNames []string, toolsRaw any, historySession *responsehistory.Session) { if resp.StatusCode != http.StatusOK { defer func() { _ = resp.Body.Close() }() body, _ := io.ReadAll(resp.Body) @@ -108,11 +109,13 @@ func (h *Handler) handleStreamGenerateContentWithRetry(w http.ResponseWriter, r runtime := newGeminiStreamRuntime(w, rc, canFlush, model, finalPrompt, thinkingEnabled, searchEnabled, stripReferenceMarkersEnabled(), toolNames, toolsRaw, historySession) completionruntime.ExecuteStreamWithRetry(r.Context(), h.DS, a, resp, payload, pow, completionruntime.StreamRetryOptions{ - Surface: "gemini.generate_content", - Stream: true, - RetryEnabled: true, - MaxAttempts: 3, - UsagePrompt: finalPrompt, + Surface: "gemini.generate_content", + Stream: true, + RetryEnabled: true, + MaxAttempts: 3, + UsagePrompt: finalPrompt, + Request: stdReq, + CurrentInputFile: h.Store, }, completionruntime.StreamRetryHooks{ ConsumeAttempt: func(currentResp *http.Response, allowDeferEmpty bool) (bool, bool) { return h.consumeGeminiStreamAttempt(r.Context(), currentResp, runtime, thinkingEnabled, allowDeferEmpty) diff --git a/internal/httpapi/gemini/handler_test.go b/internal/httpapi/gemini/handler_test.go index d420dcb..9409b72 100644 --- a/internal/httpapi/gemini/handler_test.go +++ b/internal/httpapi/gemini/handler_test.go @@ -205,6 +205,57 @@ func TestGeminiDirectAppliesCurrentInputFile(t *testing.T) { } } +func TestGeminiCurrentInputFileUploadsToolsSeparately(t *testing.T) { + ds := &testGeminiDS{ + resp: makeGeminiUpstreamResponse(`data: {"p":"response/content","v":"ok"}`), + } + h := &Handler{ + Store: testGeminiConfig{}, + Auth: testGeminiAuth{}, + DS: ds, + } + reqBody := `{ + "contents":[{"role":"user","parts":[{"text":"run code"}]}], + "tools":[{"functionDeclarations":[{"name":"eval_javascript","description":"eval","parameters":{"type":"object","properties":{"code":{"type":"string"}}}}]}] + }` + req := httptest.NewRequest(http.MethodPost, 
"/v1beta/models/gemini-2.5-pro:generateContent", strings.NewReader(reqBody)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + r := chi.NewRouter() + RegisterRoutes(r, h) + + r.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + if len(ds.uploadCalls) != 2 { + t.Fatalf("expected history and tools uploads, got %d", len(ds.uploadCalls)) + } + if ds.uploadCalls[0].Filename != "DS2API_HISTORY.txt" || ds.uploadCalls[1].Filename != "DS2API_TOOLS.txt" { + t.Fatalf("unexpected upload filenames: %#v", ds.uploadCalls) + } + historyText := string(ds.uploadCalls[0].Data) + if strings.Contains(historyText, "Description: eval") { + t.Fatalf("history transcript should not embed tool descriptions, got %q", historyText) + } + toolsText := string(ds.uploadCalls[1].Data) + if !strings.Contains(toolsText, "# DS2API_TOOLS.txt") || !strings.Contains(toolsText, "Tool: eval_javascript") || !strings.Contains(toolsText, "Description: eval") { + t.Fatalf("expected tools transcript to include Gemini tool schema, got %q", toolsText) + } + refIDs, _ := ds.payloads[0]["ref_file_ids"].([]any) + if len(refIDs) < 2 || refIDs[0] != "file-gemini-history" || refIDs[1] != "file-gemini-tools" { + t.Fatalf("expected history and tools ref ids first, got %#v", ds.payloads[0]["ref_file_ids"]) + } + prompt, _ := ds.payloads[0]["prompt"].(string) + if !strings.Contains(prompt, "DS2API_TOOLS.txt") || !strings.Contains(prompt, "TOOL CALL FORMAT") { + t.Fatalf("expected live prompt to reference tools file and retain format instructions, got %q", prompt) + } + if strings.Contains(prompt, "Description: eval") { + t.Fatalf("live prompt should not inline tool descriptions, got %q", prompt) + } +} + func TestGeminiRoutesRegistered(t *testing.T) { h := &Handler{ Store: testGeminiConfig{}, diff --git a/internal/httpapi/openai/chat/empty_retry_runtime.go 
b/internal/httpapi/openai/chat/empty_retry_runtime.go index 1dc8ca9..3494b6d 100644 --- a/internal/httpapi/openai/chat/empty_retry_runtime.go +++ b/internal/httpapi/openai/chat/empty_retry_runtime.go @@ -66,7 +66,7 @@ func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Co writeJSON(w, http.StatusOK, respBody) } -func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID string, sessionIDRef *string, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, historySession *chatHistorySession) { +func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID string, sessionIDRef *string, stdReq promptcompat.StandardRequest, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, historySession *chatHistorySession) { streamRuntime, initialType, ok := h.prepareChatStreamRuntime(w, resp, completionID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, historySession) if !ok { return @@ -78,6 +78,8 @@ func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, RetryMaxAttempts: emptyOutputRetryMaxAttempts(), MaxAttempts: 3, UsagePrompt: finalPrompt, + Request: stdReq, + CurrentInputFile: h.Store, }, completionruntime.StreamRetryHooks{ ConsumeAttempt: func(currentResp *http.Response, allowDeferEmpty bool) (bool, bool) { return h.consumeChatStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, historySession, allowDeferEmpty) diff --git a/internal/httpapi/openai/chat/handler.go b/internal/httpapi/openai/chat/handler.go index da0ad4a..964147e 100644 --- 
a/internal/httpapi/openai/chat/handler.go +++ b/internal/httpapi/openai/chat/handler.go @@ -33,6 +33,7 @@ type Handler struct { type streamLease struct { Auth *auth.RequestAuth + Standard promptcompat.StandardRequest ExpiresAt time.Time } diff --git a/internal/httpapi/openai/chat/handler_chat.go b/internal/httpapi/openai/chat/handler_chat.go index 9d86cf7..c46278b 100644 --- a/internal/httpapi/openai/chat/handler_chat.go +++ b/internal/httpapi/openai/chat/handler_chat.go @@ -28,6 +28,10 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { h.handleVercelStreamPow(w, r) return } + if isVercelStreamSwitchRequest(r) { + h.handleVercelStreamSwitch(w, r) + return + } if isVercelStreamPrepareRequest(r) { h.handleVercelStreamPrepare(w, r) return @@ -114,7 +118,7 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { } streamReq := start.Request refFileTokens := streamReq.RefFileTokens - h.handleStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, sessionID, &sessionID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, historySession) + h.handleStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, sessionID, &sessionID, streamReq, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, historySession) } func (h *Handler) autoDeleteRemoteSession(ctx context.Context, a *auth.RequestAuth, sessionID string) { diff --git a/internal/httpapi/openai/chat/vercel_prepare_test.go b/internal/httpapi/openai/chat/vercel_prepare_test.go index e7f3892..07861c2 100644 --- a/internal/httpapi/openai/chat/vercel_prepare_test.go +++ b/internal/httpapi/openai/chat/vercel_prepare_test.go @@ -1,6 +1,7 @@ package chat import ( + "context" "encoding/json" "net/http" "net/http/httptest" @@ -8,8 
+9,11 @@ import ( "testing" "time" + "ds2api/internal/account" "ds2api/internal/auth" + "ds2api/internal/config" dsclient "ds2api/internal/deepseek/client" + "ds2api/internal/promptcompat" ) func TestIsVercelStreamPrepareRequest(t *testing.T) { @@ -206,6 +210,76 @@ func TestHandleVercelStreamPrepareUsesHalfwidthDSMLToolPrompt(t *testing.T) { } } +func TestHandleVercelStreamPrepareUploadsToolsSeparately(t *testing.T) { + t.Setenv("VERCEL", "1") + t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret") + + ds := &inlineUploadDSStub{} + h := &Handler{ + Store: mockOpenAIConfig{currentInputEnabled: true}, + Auth: streamStatusAuthStub{}, + DS: ds, + } + + reqBody, _ := json.Marshal(map[string]any{ + "model": "deepseek-v4-flash", + "messages": []any{ + map[string]any{"role": "user", "content": "search docs"}, + }, + "tools": []any{ + map[string]any{ + "type": "function", + "function": map[string]any{ + "name": "search", + "description": "search docs", + "parameters": map[string]any{"type": "object"}, + }, + }, + }, + "stream": true, + }) + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions?__stream_prepare=1", strings.NewReader(string(reqBody))) + req.Header.Set("Authorization", "Bearer direct-token") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Ds2-Internal-Token", "stream-secret") + rec := httptest.NewRecorder() + + h.handleVercelStreamPrepare(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + if len(ds.uploadCalls) != 2 { + t.Fatalf("expected history and tools uploads, got %d", len(ds.uploadCalls)) + } + if ds.uploadCalls[0].Filename != "DS2API_HISTORY.txt" || ds.uploadCalls[1].Filename != "DS2API_TOOLS.txt" { + t.Fatalf("unexpected upload filenames: %#v", ds.uploadCalls) + } + if strings.Contains(string(ds.uploadCalls[0].Data), "Description: search docs") { + t.Fatalf("history transcript should not embed tool descriptions, got %q", 
string(ds.uploadCalls[0].Data)) + } + + var body map[string]any + if err := json.NewDecoder(rec.Body).Decode(&body); err != nil { + t.Fatalf("decode failed: %v", err) + } + finalPrompt, _ := body["final_prompt"].(string) + payload, _ := body["payload"].(map[string]any) + payloadPrompt, _ := payload["prompt"].(string) + for label, promptText := range map[string]string{"final_prompt": finalPrompt, "payload.prompt": payloadPrompt} { + if !strings.Contains(promptText, "DS2API_TOOLS.txt") || !strings.Contains(promptText, "TOOL CALL FORMAT") { + t.Fatalf("expected %s to reference tools file and retain tool instructions, got %q", label, promptText) + } + if strings.Contains(promptText, "Description: search docs") { + t.Fatalf("expected %s not to inline tool descriptions, got %q", label, promptText) + } + } + refIDs, _ := payload["ref_file_ids"].([]any) + if len(refIDs) < 2 || refIDs[0] != "file-inline-1" || refIDs[1] != "file-inline-2" { + t.Fatalf("expected history and tools ref ids first, got %#v", payload["ref_file_ids"]) + } +} + func TestHandleVercelStreamPrepareMapsCurrentInputFileManagedAuthFailureTo401(t *testing.T) { t.Setenv("VERCEL", "1") t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret") @@ -241,3 +315,88 @@ func TestHandleVercelStreamPrepareMapsCurrentInputFileManagedAuthFailureTo401(t t.Fatalf("expected managed auth error message, got %s", rec.Body.String()) } } + +func TestHandleVercelStreamSwitchReuploadsCurrentInputFile(t *testing.T) { + t.Setenv("VERCEL", "1") + t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret") + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["managed-key"], + "accounts":[ + {"email":"acc1@test.com","password":"pwd"}, + {"email":"acc2@test.com","password":"pwd"} + ] + }`) + store := config.LoadStore() + resolver := auth.NewResolver(store, account.NewPool(store), func(_ context.Context, acc config.Account) (string, error) { + return "token-" + acc.Identifier(), nil + }) + authReq := httptest.NewRequest(http.MethodPost, "/", 
nil) + authReq.Header.Set("Authorization", "Bearer managed-key") + a, err := resolver.Determine(authReq) + if err != nil { + t.Fatalf("determine failed: %v", err) + } + defer resolver.Release(a) + + ds := &inlineUploadDSStub{} + h := &Handler{ + Store: mockOpenAIConfig{currentInputEnabled: true}, + Auth: resolver, + DS: ds, + } + stdReq := promptcompat.StandardRequest{ + RequestedModel: "deepseek-v4-flash", + ResolvedModel: "deepseek-v4-flash", + ResponseModel: "deepseek-v4-flash", + FinalPrompt: "Continue from the latest state in the attached DS2API_HISTORY.txt context. Available tool descriptions and parameter schemas are attached in DS2API_TOOLS.txt; use only those tools and follow the tool-call format rules in this prompt.", + PromptTokenText: "# DS2API_HISTORY.txt\n\n=== 1. USER ===\nhello\n\n# DS2API_TOOLS.txt\nAvailable tool descriptions and parameter schemas for this request.\n\nYou have access to these tools:\n\nTool: search\nDescription: search docs\nParameters: {\"type\":\"object\"}\n", + HistoryText: "# DS2API_HISTORY.txt\n\n=== 1. 
USER ===\nhello\n", + CurrentInputFileApplied: true, + CurrentInputFileID: "file-old", + CurrentToolsFileID: "file-old-tools", + ToolsRaw: []any{ + map[string]any{ + "type": "function", + "function": map[string]any{ + "name": "search", + "description": "search docs", + "parameters": map[string]any{"type": "object"}, + }, + }, + }, + RefFileIDs: []string{"file-old", "file-old-tools", "client-file"}, + Thinking: true, + } + leaseID := h.holdStreamLease(a, stdReq) + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions?__stream_switch=1", strings.NewReader(`{"lease_id":"`+leaseID+`"}`)) + req.Header.Set("X-Ds2-Internal-Token", "stream-secret") + rec := httptest.NewRecorder() + + h.handleVercelStreamSwitch(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + if len(ds.uploadCalls) != 2 { + t.Fatalf("expected current input and tools reupload on switched account, got %d", len(ds.uploadCalls)) + } + if ds.uploadCalls[0].Filename != "DS2API_HISTORY.txt" || ds.uploadCalls[1].Filename != "DS2API_TOOLS.txt" { + t.Fatalf("unexpected reupload filenames: %#v", ds.uploadCalls) + } + var body map[string]any + if err := json.NewDecoder(rec.Body).Decode(&body); err != nil { + t.Fatalf("decode failed: %v", err) + } + if body["deepseek_token"] != "token-acc2@test.com" { + t.Fatalf("expected switched account token, got %#v", body["deepseek_token"]) + } + payload, _ := body["payload"].(map[string]any) + refIDs, _ := payload["ref_file_ids"].([]any) + if len(refIDs) != 3 || refIDs[0] != "file-inline-1" || refIDs[1] != "file-inline-2" || refIDs[2] != "client-file" { + t.Fatalf("expected reuploaded current input ref plus client ref, got %#v", payload["ref_file_ids"]) + } + promptText, _ := payload["prompt"].(string) + if !strings.Contains(promptText, "DS2API_TOOLS.txt") { + t.Fatalf("expected switched payload prompt to retain tools file reference, got %q", promptText) + } +} diff --git 
a/internal/httpapi/openai/chat/vercel_stream.go b/internal/httpapi/openai/chat/vercel_stream.go index b52cd9c..6e00a37 100644 --- a/internal/httpapi/openai/chat/vercel_stream.go +++ b/internal/httpapi/openai/chat/vercel_stream.go @@ -11,6 +11,7 @@ import ( "ds2api/internal/auth" "ds2api/internal/config" + "ds2api/internal/httpapi/openai/history" "ds2api/internal/promptcompat" "ds2api/internal/util" @@ -96,7 +97,7 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque } payload := stdReq.CompletionPayload(sessionID) - leaseID := h.holdStreamLease(a) + leaseID := h.holdStreamLease(a, stdReq) if leaseID == "" { writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease") return @@ -185,6 +186,80 @@ func (h *Handler) handleVercelStreamPow(w http.ResponseWriter, r *http.Request) }) } +func (h *Handler) handleVercelStreamSwitch(w http.ResponseWriter, r *http.Request) { + if !config.IsVercel() { + http.NotFound(w, r) + return + } + h.sweepExpiredStreamLeases() + internalSecret := vercelInternalSecret() + internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) + if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { + writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") + return + } + + var req map[string]any + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeOpenAIError(w, http.StatusBadRequest, "invalid json") + return + } + leaseID, _ := req["lease_id"].(string) + leaseID = strings.TrimSpace(leaseID) + if leaseID == "" { + writeOpenAIError(w, http.StatusBadRequest, "lease_id is required") + return + } + lease, ok := h.lookupStreamLease(leaseID) + if !ok || lease.Auth == nil { + writeOpenAIError(w, http.StatusNotFound, "stream lease not found or expired") + return + } + a := lease.Auth + if !a.UseConfigToken || !a.SwitchAccount(r.Context()) { + writeOpenAIErrorWithCode(w, http.StatusTooManyRequests, 
"Upstream account hit a rate limit and returned reasoning without visible output.", "upstream_empty_output") + return + } + + stdReq := lease.Standard + var err error + if stdReq.CurrentInputFileApplied { + stdReq, err = (history.Service{Store: h.Store, DS: h.DS}).ReuploadAppliedCurrentInputFile(r.Context(), a, stdReq) + if err != nil { + status, message := mapCurrentInputFileError(err) + writeOpenAIError(w, status, message) + return + } + } + sessionID, err := h.DS.CreateSession(r.Context(), a, 3) + if err != nil { + writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.") + return + } + powHeader, err := h.DS.GetPow(r.Context(), a, 3) + if err != nil { + writeOpenAIError(w, http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).") + return + } + if strings.TrimSpace(a.DeepSeekToken) == "" { + writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.") + return + } + h.updateStreamLeaseStandard(leaseID, stdReq) + writeJSON(w, http.StatusOK, map[string]any{ + "session_id": sessionID, + "lease_id": leaseID, + "model": stdReq.ResponseModel, + "final_prompt": stdReq.FinalPrompt, + "thinking_enabled": stdReq.Thinking, + "search_enabled": stdReq.Search, + "tool_names": stdReq.ToolNames, + "deepseek_token": a.DeepSeekToken, + "pow_header": powHeader, + "payload": stdReq.CompletionPayload(sessionID), + }) +} + func isVercelStreamPrepareRequest(r *http.Request) bool { if r == nil { return false @@ -206,6 +281,13 @@ func isVercelStreamPowRequest(r *http.Request) bool { return strings.TrimSpace(r.URL.Query().Get("__stream_pow")) == "1" } +func isVercelStreamSwitchRequest(r *http.Request) bool { + if r == nil { + return false + } + return strings.TrimSpace(r.URL.Query().Get("__stream_switch")) == "1" +} + func vercelInternalSecret() string { if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" { return v @@ -216,10 +298,14 
@@ func vercelInternalSecret() string { return "admin" } -func (h *Handler) holdStreamLease(a *auth.RequestAuth) string { +func (h *Handler) holdStreamLease(a *auth.RequestAuth, standards ...promptcompat.StandardRequest) string { if a == nil { return "" } + var stdReq promptcompat.StandardRequest + if len(standards) > 0 { + stdReq = standards[0] + } now := time.Now() ttl := streamLeaseTTL() if ttl <= 0 { @@ -234,6 +320,7 @@ func (h *Handler) holdStreamLease(a *auth.RequestAuth) string { leaseID := newLeaseID() h.streamLeases[leaseID] = streamLease{ Auth: a, + Standard: stdReq, ExpiresAt: now.Add(ttl), } h.leaseMu.Unlock() @@ -241,20 +328,43 @@ func (h *Handler) holdStreamLease(a *auth.RequestAuth) string { return leaseID } -func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth { +func (h *Handler) lookupStreamLease(leaseID string) (streamLease, bool) { leaseID = strings.TrimSpace(leaseID) if leaseID == "" { - return nil + return streamLease{}, false } h.leaseMu.Lock() lease, ok := h.streamLeases[leaseID] h.leaseMu.Unlock() if !ok || time.Now().After(lease.ExpiresAt) { + return streamLease{}, false + } + return lease, true +} + +func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth { + lease, ok := h.lookupStreamLease(leaseID) + if !ok { return nil } return lease.Auth } +func (h *Handler) updateStreamLeaseStandard(leaseID string, stdReq promptcompat.StandardRequest) { + leaseID = strings.TrimSpace(leaseID) + if leaseID == "" { + return + } + h.leaseMu.Lock() + defer h.leaseMu.Unlock() + lease, ok := h.streamLeases[leaseID] + if !ok { + return + } + lease.Standard = stdReq + h.streamLeases[leaseID] = lease +} + func (h *Handler) releaseStreamLease(leaseID string) bool { leaseID = strings.TrimSpace(leaseID) if leaseID == "" { diff --git a/internal/httpapi/openai/history/current_input_file.go b/internal/httpapi/openai/history/current_input_file.go index b69cb82..032927d 100644 --- 
a/internal/httpapi/openai/history/current_input_file.go +++ b/internal/httpapi/openai/history/current_input_file.go @@ -99,6 +99,8 @@ func (s Service) ApplyCurrentInputFile(ctx context.Context, a *auth.RequestAuth, stdReq.Messages = messages stdReq.HistoryText = fileText stdReq.CurrentInputFileApplied = true + stdReq.CurrentInputFileID = fileID + stdReq.CurrentToolsFileID = toolFileID stdReq.RefFileIDs = prependUniqueRefFileIDs(stdReq.RefFileIDs, fileID, toolFileID) stdReq.FinalPrompt, stdReq.ToolNames = promptcompat.BuildOpenAIPromptWithToolInstructionsOnly(messages, stdReq.ToolsRaw, "", stdReq.ToolChoice, stdReq.Thinking) // Token accounting must reflect the actual downstream context: @@ -112,6 +114,58 @@ func (s Service) ApplyCurrentInputFile(ctx context.Context, a *auth.RequestAuth, return stdReq, nil } +func (s Service) ReuploadAppliedCurrentInputFile(ctx context.Context, a *auth.RequestAuth, stdReq promptcompat.StandardRequest) (promptcompat.StandardRequest, error) { + if !stdReq.CurrentInputFileApplied || s.DS == nil || a == nil { + return stdReq, nil + } + fileText := strings.TrimSpace(stdReq.HistoryText) + if fileText == "" { + return stdReq, nil + } + modelType := "default" + if resolvedType, ok := config.GetModelType(stdReq.ResolvedModel); ok { + modelType = resolvedType + } + result, err := s.DS.UploadFile(ctx, a, dsclient.UploadFileRequest{ + Filename: currentInputFilename, + ContentType: currentInputContentType, + Purpose: currentInputPurpose, + ModelType: modelType, + Data: []byte(stdReq.HistoryText), + }, 3) + if err != nil { + return stdReq, fmt.Errorf("upload current user input file: %w", err) + } + fileID := strings.TrimSpace(result.ID) + if fileID == "" { + return stdReq, errors.New("upload current user input file returned empty file id") + } + + toolsText, _ := promptcompat.BuildOpenAIToolsContextTranscript(stdReq.ToolsRaw, stdReq.ToolChoice) + toolFileID := "" + if strings.TrimSpace(toolsText) != "" { + result, err := s.DS.UploadFile(ctx, a, 
dsclient.UploadFileRequest{ + Filename: currentToolsFilename, + ContentType: currentInputContentType, + Purpose: currentInputPurpose, + ModelType: modelType, + Data: []byte(toolsText), + }, 3) + if err != nil { + return stdReq, fmt.Errorf("upload current tools file: %w", err) + } + toolFileID = strings.TrimSpace(result.ID) + if toolFileID == "" { + return stdReq, errors.New("upload current tools file returned empty file id") + } + } + + stdReq.RefFileIDs = replaceGeneratedCurrentInputRefs(stdReq.RefFileIDs, stdReq.CurrentInputFileID, stdReq.CurrentToolsFileID, fileID, toolFileID) + stdReq.CurrentInputFileID = fileID + stdReq.CurrentToolsFileID = toolFileID + return stdReq, nil +} + func latestUserInputForFile(messages []any) (int, string) { for i := len(messages) - 1; i >= 0; i-- { msg, ok := messages[i].(map[string]any) @@ -168,3 +222,25 @@ func prependUniqueRefFileIDs(existing []string, fileIDs ...string) []string { } return out } + +func replaceGeneratedCurrentInputRefs(existing []string, oldHistoryID, oldToolsID, newHistoryID, newToolsID string) []string { + filtered := make([]string, 0, len(existing)) + old := map[string]struct{}{} + for _, id := range []string{oldHistoryID, oldToolsID} { + trimmed := strings.ToLower(strings.TrimSpace(id)) + if trimmed != "" { + old[trimmed] = struct{}{} + } + } + for _, id := range existing { + trimmed := strings.TrimSpace(id) + if trimmed == "" { + continue + } + if _, ok := old[strings.ToLower(trimmed)]; ok { + continue + } + filtered = append(filtered, trimmed) + } + return prependUniqueRefFileIDs(filtered, newHistoryID, newToolsID) +} diff --git a/internal/httpapi/openai/history_split_test.go b/internal/httpapi/openai/history_split_test.go index 90e8d53..14b8658 100644 --- a/internal/httpapi/openai/history_split_test.go +++ b/internal/httpapi/openai/history_split_test.go @@ -610,6 +610,69 @@ func TestResponsesCurrentInputFileUploadsContextAndKeepsNeutralPrompt(t *testing } } +func 
TestResponsesCurrentInputFileUploadsToolsSeparately(t *testing.T) { + ds := &inlineUploadDSStub{} + h := &openAITestSurface{ + Store: mockOpenAIConfig{ + currentInputEnabled: true, + }, + Auth: streamStatusAuthStub{}, + DS: ds, + } + r := chi.NewRouter() + registerOpenAITestRoutes(r, h) + reqBody, _ := json.Marshal(map[string]any{ + "model": "deepseek-v4-flash", + "messages": historySplitTestMessages(), + "tools": []any{ + map[string]any{ + "type": "function", + "function": map[string]any{ + "name": "search", + "description": "search docs", + "parameters": map[string]any{"type": "object"}, + }, + }, + }, + "stream": false, + }) + req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(string(reqBody))) + req.Header.Set("Authorization", "Bearer direct-token") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + r.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + if len(ds.uploadCalls) != 2 { + t.Fatalf("expected history and tools uploads, got %d", len(ds.uploadCalls)) + } + if ds.uploadCalls[0].Filename != "DS2API_HISTORY.txt" || ds.uploadCalls[1].Filename != "DS2API_TOOLS.txt" { + t.Fatalf("unexpected upload filenames: %#v", ds.uploadCalls) + } + historyText := string(ds.uploadCalls[0].Data) + if strings.Contains(historyText, "Description: search docs") { + t.Fatalf("history transcript should not embed tool descriptions, got %q", historyText) + } + toolsText := string(ds.uploadCalls[1].Data) + if !strings.Contains(toolsText, "# DS2API_TOOLS.txt") || !strings.Contains(toolsText, "Tool: search") || !strings.Contains(toolsText, "Description: search docs") { + t.Fatalf("expected tools transcript to include schema, got %q", toolsText) + } + promptText, _ := ds.completionReq["prompt"].(string) + if !strings.Contains(promptText, "DS2API_TOOLS.txt") || !strings.Contains(promptText, "TOOL CALL FORMAT") { + t.Fatalf("expected live 
prompt to reference tools file and retain format instructions, got %q", promptText) + } + if strings.Contains(promptText, "Description: search docs") { + t.Fatalf("live prompt should not inline tool descriptions, got %q", promptText) + } + refIDs, _ := ds.completionReq["ref_file_ids"].([]any) + if len(refIDs) < 2 || refIDs[0] != "file-inline-1" || refIDs[1] != "file-inline-2" { + t.Fatalf("expected history and tools ref ids first, got %#v", ds.completionReq["ref_file_ids"]) + } +} + func TestChatCompletionsCurrentInputFileMapsManagedAuthFailureTo401(t *testing.T) { ds := &inlineUploadDSStub{ uploadErr: &dsclient.RequestFailure{Op: "upload file", Kind: dsclient.FailureManagedUnauthorized, Message: "expired token"}, diff --git a/internal/httpapi/openai/leaked_output_sanitize_test.go b/internal/httpapi/openai/leaked_output_sanitize_test.go index 4076b14..939f73f 100644 --- a/internal/httpapi/openai/leaked_output_sanitize_test.go +++ b/internal/httpapi/openai/leaked_output_sanitize_test.go @@ -26,6 +26,15 @@ func TestSanitizeLeakedOutputRemovesStandaloneMetaMarkers(t *testing.T) { } } +func TestSanitizeLeakedOutputRemovesFullwidthDelimitedMetaMarkers(t *testing.T) { + fw := "\uff5c" + raw := "A<" + fw + "end▁of▁sentence" + fw + ">B<" + fw + " Assistant " + fw + ">C<" + fw + "end_of_toolresults" + fw + ">D" + got := sanitizeLeakedOutput(raw) + if got != "ABCD" { + t.Fatalf("unexpected sanitize result for fullwidth-delimited meta markers: %q", got) + } +} + func TestSanitizeLeakedOutputRemovesThinkAndBosMarkers(t *testing.T) { raw := "ABC<|begin▁of▁sentence|>D<| begin_of_sentence |>E<|begin_of_sentence|>F" got := sanitizeLeakedOutput(raw) @@ -42,6 +51,15 @@ func TestSanitizeLeakedOutputRemovesThoughtMarkers(t *testing.T) { } } +func TestSanitizeLeakedOutputRemovesFullwidthDelimitedBosAndThoughtMarkers(t *testing.T) { + fw := "\uff5c" + raw := "A<" + fw + "begin▁of▁sentence" + fw + ">B<" + fw + "▁of▁thought" + fw + ">C<" + fw + " begin_of_thought " + fw + ">D" + got := 
sanitizeLeakedOutput(raw) + if got != "ABCD" { + t.Fatalf("unexpected sanitize result for fullwidth-delimited BOS/thought markers: %q", got) + } +} + func TestSanitizeLeakedOutputRemovesDanglingThinkBlock(t *testing.T) { raw := "Answer prefix<think>internal reasoning that never closes" got := sanitizeLeakedOutput(raw) diff --git a/internal/httpapi/openai/responses/empty_retry_runtime.go b/internal/httpapi/openai/responses/empty_retry_runtime.go index 80422f5..5166f9c 100644 --- a/internal/httpapi/openai/responses/empty_retry_runtime.go +++ b/internal/httpapi/openai/responses/empty_retry_runtime.go @@ -15,7 +15,7 @@ import ( streamengine "ds2api/internal/stream" ) -func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) { +func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID string, stdReq promptcompat.StandardRequest, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) { streamRuntime, initialType, ok := h.prepareResponsesStreamRuntime(w, resp, owner, responseID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, traceID, historySession) if !ok { return @@ -27,6 +27,8 @@ func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.
RetryMaxAttempts: emptyOutputRetryMaxAttempts(), MaxAttempts: 3, UsagePrompt: finalPrompt, + Request: stdReq, + CurrentInputFile: h.Store, }, completionruntime.StreamRetryHooks{ ConsumeAttempt: func(currentResp *http.Response, allowDeferEmpty bool) (bool, bool) { return h.consumeResponsesStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, allowDeferEmpty) diff --git a/internal/httpapi/openai/responses/responses_handler.go b/internal/httpapi/openai/responses/responses_handler.go index 3a6680d..f34daed 100644 --- a/internal/httpapi/openai/responses/responses_handler.go +++ b/internal/httpapi/openai/responses/responses_handler.go @@ -138,7 +138,7 @@ func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) { streamReq := start.Request refFileTokens := streamReq.RefFileTokens - h.handleResponsesStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, owner, responseID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, traceID, historySession) + h.handleResponsesStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, owner, responseID, streamReq, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, traceID, historySession) } func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string) { diff --git a/internal/httpapi/openai/shared/leaked_output_sanitize.go b/internal/httpapi/openai/shared/leaked_output_sanitize.go index ae317f6..9293e78 100644 --- a/internal/httpapi/openai/shared/leaked_output_sanitize.go +++ b/internal/httpapi/openai/shared/leaked_output_sanitize.go @@ 
-13,21 +13,23 @@ var leakedToolResultBlobPattern = regexp.MustCompile(`(?is)<\s*\|\s*tool\s*\|\s* var leakedThinkTagPattern = regexp.MustCompile(`(?is)<think>.*?</think>`) -// leakedBOSMarkerPattern matches DeepSeek BOS markers in BOTH forms: +// leakedBOSMarkerPattern matches DeepSeek BOS markers with halfwidth or +// legacy U+FF5C fullwidth delimiters: // - ASCII underscore: <|begin_of_sentence|> // - U+2581 variant: <|begin▁of▁sentence|> -var leakedBOSMarkerPattern = regexp.MustCompile(`(?i)<[|\|]\s*begin[_▁]of[_▁]sentence\s*[|\|]>`) +var leakedBOSMarkerPattern = regexp.MustCompile(`(?i)<[\|\x{ff5c}]\s*begin[_▁]of[_▁]sentence\s*[\|\x{ff5c}]>`) // leakedThoughtMarkerPattern matches leaked thought control markers in both // explicit and compact forms: // - ASCII underscore: <| of_thought |>, <| begin_of_thought |> // - U+2581 variant: <|▁of▁thought|>, <|begin▁of▁thought|> -var leakedThoughtMarkerPattern = regexp.MustCompile(`(?i)<[|\|]\s*(?:begin[_▁])?[_▁]*of[_▁]thought\s*[|\|]>`) +var leakedThoughtMarkerPattern = regexp.MustCompile(`(?i)<[\|\x{ff5c}]\s*(?:begin[_▁])?[_▁]*of[_▁]thought\s*[\|\x{ff5c}]>`) -// leakedMetaMarkerPattern matches the remaining DeepSeek special tokens in BOTH forms: +// leakedMetaMarkerPattern matches the remaining DeepSeek special tokens with +// halfwidth or legacy U+FF5C fullwidth delimiters: // - ASCII underscore: <|end_of_sentence|>, <|end_of_toolresults|>, <|end_of_instructions|> // - U+2581 variant: <|end▁of▁sentence|>, <|end▁of▁toolresults|>, <|end▁of▁instructions|> -var leakedMetaMarkerPattern = regexp.MustCompile(`(?i)<[|\|]\s*(?:assistant|tool|end[_▁]of[_▁]sentence|end[_▁]of[_▁]thinking|end[_▁]of[_▁]thought|end[_▁]of[_▁]toolresults|end[_▁]of[_▁]instructions)\s*[|\|]>`) +var leakedMetaMarkerPattern = regexp.MustCompile(`(?i)<[\|\x{ff5c}]\s*(?:assistant|tool|end[_▁]of[_▁]sentence|end[_▁]of[_▁]thinking|end[_▁]of[_▁]thought|end[_▁]of[_▁]toolresults|end[_▁]of[_▁]instructions)\s*[\|\x{ff5c}]>`) // leakedAgentXMLBlockPatterns catch agent-style XML
blocks that leak through // when the sieve fails to capture them. These are applied only to complete diff --git a/internal/js/chat-stream/http_internal.js b/internal/js/chat-stream/http_internal.js index 247e38c..1c94ced 100644 --- a/internal/js/chat-stream/http_internal.js +++ b/internal/js/chat-stream/http_internal.js @@ -85,6 +85,33 @@ async function fetchStreamPow(req, leaseID) { }; } +async function fetchStreamSwitch(req, leaseID) { + const url = buildInternalGoURL(req); + url.searchParams.set('__stream_switch', '1'); + + const upstream = await fetch(url.toString(), { + method: 'POST', + headers: buildInternalGoHeaders(req, { withInternalToken: true, withContentType: true }), + body: Buffer.from(JSON.stringify({ lease_id: leaseID })), + }); + + const text = await upstream.text(); + let body = {}; + try { + body = JSON.parse(text || '{}'); + } catch (_err) { + body = {}; + } + + return { + ok: upstream.ok, + status: upstream.status, + contentType: upstream.headers.get('content-type') || 'application/json', + text, + body, + }; +} + function relayPreparedFailure(res, prep) { if (prep.status === 401 && looksLikeVercelAuthPage(prep.text)) { writeOpenAIError( @@ -223,6 +250,7 @@ module.exports = { readRawBody, fetchStreamPrepare, fetchStreamPow, + fetchStreamSwitch, relayPreparedFailure, safeReadText, buildInternalGoURL, diff --git a/internal/js/chat-stream/sse_parse_impl.js b/internal/js/chat-stream/sse_parse_impl.js index 735c615..9107471 100644 --- a/internal/js/chat-stream/sse_parse_impl.js +++ b/internal/js/chat-stream/sse_parse_impl.js @@ -7,9 +7,9 @@ const { SKIP_EXACT_PATHS, } = require('../shared/deepseek-constants'); -const LEAKED_BOS_MARKER_PATTERN = /<[||]\s*begin[_▁]of[_▁]sentence\s*[||]>/gi; -const LEAKED_THOUGHT_MARKER_PATTERN = /<[||]\s*(?:begin[_▁])?[_▁]*of[_▁]thought\s*[||]>/gi; -const LEAKED_META_MARKER_PATTERN = 
/<[||]\s*(?:assistant|tool|end[_▁]of[_▁]sentence|end[_▁]of[_▁]thinking|end[_▁]of[_▁]thought|end[_▁]of[_▁]toolresults|end[_▁]of[_▁]instructions)\s*[||]>/gi;
+const LEAKED_BOS_MARKER_PATTERN = /<[\|\uFF5C]\s*begin[_▁]of[_▁]sentence\s*[\|\uFF5C]>/gi;
+const LEAKED_THOUGHT_MARKER_PATTERN = /<[\|\uFF5C]\s*(?:begin[_▁])?[_▁]*of[_▁]thought\s*[\|\uFF5C]>/gi;
+const LEAKED_META_MARKER_PATTERN = /<[\|\uFF5C]\s*(?:assistant|tool|end[_▁]of[_▁]sentence|end[_▁]of[_▁]thinking|end[_▁]of[_▁]thought|end[_▁]of[_▁]toolresults|end[_▁]of[_▁]instructions)\s*[\|\uFF5C]>/gi;
diff --git a/internal/js/chat-stream/vercel_stream_impl.js b/internal/js/chat-stream/vercel_stream_impl.js
index 9a9bb0b..6e1d4a8 100644
--- a/internal/js/chat-stream/vercel_stream_impl.js
+++ b/internal/js/chat-stream/vercel_stream_impl.js
@@ -25,6 +25,7 @@ const {
   isAbortError,
   fetchStreamPrepare,
   fetchStreamPow,
+  fetchStreamSwitch,
   relayPreparedFailure,
   createLeaseReleaser,
 } = require('./http_internal');
@@ -46,11 +47,11 @@ async function handleVercelStream(req, res, rawBody, payload) {
   }
 
   const model = asString(prep.body.model) || asString(payload.model);
-  const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`;
+  const responseID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`;
   const leaseID = asString(prep.body.lease_id);
-  const deepseekToken = asString(prep.body.deepseek_token);
+  let deepseekToken = asString(prep.body.deepseek_token);
   const initialPowHeader = asString(prep.body.pow_header);
-  const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null;
+  let completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null;
   const finalPrompt = asString(prep.body.final_prompt);
   const thinkingEnabled = toBool(prep.body.thinking_enabled);
   const searchEnabled = toBool(prep.body.search_enabled);
@@ -133,13 +134,14 @@ async function handleVercelStream(req, res, rawBody, payload) {
     }
   };
   const fetchCompletion = (bodyPayload) => fetchDeepSeekStream(DEEPSEEK_COMPLETION_URL, bodyPayload, currentPowHeader);
+  let activeDeepSeekSessionID = responseID;
   const fetchContinue = async (messageID) => {
     const powHeader = await refreshPowHeader('continue');
     if (!powHeader) {
       return null;
     }
     return fetchDeepSeekStream(DEEPSEEK_CONTINUE_URL, {
-      chat_session_id: sessionID,
+      chat_session_id: activeDeepSeekSessionID,
       message_id: messageID,
       fallback_to_resume: true,
     }, powHeader);
@@ -185,7 +187,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
   let ended = false;
   const { sendFrame, sendDeltaFrame } = createChatCompletionEmitter({
     res,
-    sessionID,
+    sessionID: responseID,
     created,
     model,
     isClosed: () => clientClosed,
@@ -242,7 +244,7 @@
     }
     ended = true;
     sendFrame({
-      id: sessionID,
+      id: responseID,
       object: 'chat.completion.chunk',
       created,
       model,
@@ -261,7 +263,7 @@
 
   const processStream = async (initialResponse, allowDeferEmpty) => {
     let currentResponse = initialResponse;
-    let continueState = createContinueState(sessionID);
+    let continueState = createContinueState(activeDeepSeekSessionID);
     let continueRounds = 0;
     // eslint-disable-next-line no-constant-condition
     while (true) {
@@ -412,13 +414,39 @@
   };
 
   let retryAttempts = 0;
+  let accountSwitchAttempted = false;
   // eslint-disable-next-line no-constant-condition
   while (true) {
-    const processed = await processStream(completionRes, retryAttempts < EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS);
+    const allowDeferEmpty = retryAttempts < EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS || !accountSwitchAttempted;
+    const processed = await processStream(completionRes, allowDeferEmpty);
     if (processed.terminal) {
       return;
     }
-    if (!processed.retryable || retryAttempts >= EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS) {
+    if (!processed.retryable) {
+      await finish('stop');
+      return;
+    }
+    if (retryAttempts >= EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS) {
+      if (!accountSwitchAttempted) {
+        accountSwitchAttempted = true;
+        const switched = await fetchStreamSwitch(req, leaseID);
+        if (switched.ok && switched.body && switched.body.payload && typeof switched.body.payload === 'object') {
+          completionPayload = switched.body.payload;
+          deepseekToken = asString(switched.body.deepseek_token) || deepseekToken;
+          currentPowHeader = asString(switched.body.pow_header) || currentPowHeader;
+          activeDeepSeekSessionID = asString(switched.body.session_id) || activeDeepSeekSessionID;
+          usagePrompt = finalPrompt;
+          completionRes = await fetchCompletion(completionPayload);
+          if (completionRes === null) {
+            return;
+          }
+          if (!completionRes.ok || !completionRes.body) {
+            await finish('stop');
+            return;
+          }
+          continue;
+        }
+      }
       await finish('stop');
       return;
     }
diff --git a/internal/promptcompat/prompt_build_test.go b/internal/promptcompat/prompt_build_test.go
index 61c5ede..043e1a8 100644
--- a/internal/promptcompat/prompt_build_test.go
+++ b/internal/promptcompat/prompt_build_test.go
@@ -113,6 +113,9 @@ func TestBuildOpenAIPromptWithToolInstructionsOnlyOmitsSchemas(t *testing.T) {
 	if strings.Contains(finalPrompt, "You have access to these tools") || strings.Contains(finalPrompt, "Description: search docs") || strings.Contains(finalPrompt, "Parameters:") {
 		t.Fatalf("tool descriptions should be externalized, got: %q", finalPrompt)
 	}
+	if !strings.Contains(finalPrompt, "Treat DS2API_TOOLS.txt as the authoritative list of callable tools and schemas") {
+		t.Fatalf("expected instructions-only prompt to point model at tools file, got: %q", finalPrompt)
+	}
 	if !strings.Contains(finalPrompt, "TOOL CALL FORMAT") || !strings.Contains(finalPrompt, "Remember: The ONLY valid way to use tools") {
 		t.Fatalf("expected tool format instructions to remain in live prompt, got: %q", finalPrompt)
 	}
diff --git a/internal/promptcompat/standard_request.go b/internal/promptcompat/standard_request.go
index 76b812d..b609800 100644
--- a/internal/promptcompat/standard_request.go
+++ b/internal/promptcompat/standard_request.go
@@ -11,6 +11,8 @@ type StandardRequest struct {
 	HistoryText             string
 	PromptTokenText         string
 	CurrentInputFileApplied bool
+	CurrentInputFileID      string
+	CurrentToolsFileID      string
 	ToolsRaw                any
 	FinalPrompt             string
 	ToolNames               []string
diff --git a/internal/promptcompat/tool_prompt.go b/internal/promptcompat/tool_prompt.go
index d6b6144..4d84076 100644
--- a/internal/promptcompat/tool_prompt.go
+++ b/internal/promptcompat/tool_prompt.go
@@ -39,6 +39,8 @@ func injectToolPromptWithDescriptions(messages []map[string]any, tools []any, po
 	toolPrompt := parts.Instructions
 	if includeDescriptions && parts.Descriptions != "" {
 		toolPrompt = parts.Descriptions + "\n\n" + toolPrompt
+	} else if !includeDescriptions && parts.Descriptions != "" {
+		toolPrompt = "Available tool descriptions and parameter schemas are attached in DS2API_TOOLS.txt. Treat DS2API_TOOLS.txt as the authoritative list of callable tools and schemas; use only tools and parameters listed there.\n\n" + toolPrompt
 	}
 
 	for i := range messages {
diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js
index 97226d9..5c15d32 100644
--- a/tests/node/chat-stream.test.js
+++ b/tests/node/chat-stream.test.js
@@ -224,6 +224,80 @@ test('vercel stream retries thinking-only output once', async () => {
   assert.equal(parsed[2].choices[0].finish_reason, 'stop');
 });
 
+test('vercel stream switches managed account after empty retry exhaustion', async () => {
+  const originalFetch = global.fetch;
+  const fetchURLs = [];
+  const completionBodies = [];
+  const completionAuth = [];
+  let completionCalls = 0;
+  global.fetch = async (url, init = {}) => {
+    const textURL = String(url);
+    fetchURLs.push(textURL);
+    if (textURL.includes('__stream_prepare=1')) {
+      return jsonResponse({
+        session_id: 'chatcmpl-test',
+        lease_id: 'lease-test',
+        model: 'gpt-test',
+        final_prompt: 'hello',
+        thinking_enabled: true,
+        search_enabled: false,
+        tool_names: [],
+        deepseek_token: 'token-1',
+        pow_header: 'pow-1',
+        payload: { chat_session_id: 'session-1', prompt: 'hello', ref_file_ids: ['file-1'] },
+      });
+    }
+    if (textURL.includes('__stream_pow=1')) {
+      return jsonResponse({ pow_header: 'pow-retry' });
+    }
+    if (textURL.includes('__stream_switch=1')) {
+      return jsonResponse({
+        session_id: 'session-2',
+        lease_id: 'lease-test',
+        model: 'gpt-test',
+        final_prompt: 'hello',
+        thinking_enabled: true,
+        search_enabled: false,
+        tool_names: [],
+        deepseek_token: 'token-2',
+        pow_header: 'pow-2',
+        payload: { chat_session_id: 'session-2', prompt: 'hello', ref_file_ids: ['file-2'] },
+      });
+    }
+    if (textURL.includes('__stream_release=1')) {
+      return jsonResponse({ success: true });
+    }
+    if (textURL === 'https://chat.deepseek.com/api/v0/chat/completion') {
+      completionBodies.push(JSON.parse(String(init.body)));
+      completionAuth.push(init.headers.authorization);
+      completionCalls += 1;
+      if (completionCalls <= 2) {
+        return sseResponse([`data: {"response_message_id":${40 + completionCalls},"p":"response/thinking_content","v":"plan"}\n\n`, 'data: [DONE]\n\n']);
+      }
+      return sseResponse(['data: {"p":"response/content","v":"visible"}\n\n', 'data: [DONE]\n\n']);
+    }
+    throw new Error(`unexpected fetch url: ${textURL}`);
+  };
+  try {
+    const req = new MockStreamRequest();
+    const res = new MockStreamResponse();
+    const payload = { model: 'gpt-test', stream: true };
+    await handleVercelStream(req, res, Buffer.from(JSON.stringify(payload)), payload);
+    const frames = parseSSEDataFrames(res.bodyText());
+    const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
+    assert.equal(fetchURLs.filter((url) => url.includes('__stream_switch=1')).length, 1);
+    assert.equal(completionBodies.length, 3);
+    assert.match(completionBodies[1].prompt, /Previous reply had no visible output/);
+    assert.equal(completionBodies[1].parent_message_id, 41);
+    assert.equal(completionBodies[2].prompt, 'hello');
+    assert.deepEqual(completionBodies[2].ref_file_ids, ['file-2']);
+    assert.deepEqual(completionAuth, ['Bearer token-1', 'Bearer token-1', 'Bearer token-2']);
+    assert.equal(parsed.at(-1).choices[0].finish_reason, 'stop');
+  } finally {
+    global.fetch = originalFetch;
+  }
+});
+
 test('vercel stream coalesces many small content deltas while keeping one choice', async () => {
   const lines = Array.from({ length: 100 }, () => `data: ${JSON.stringify({ p: 'response/content', v: '字' })}\n\n`);
   lines.push('data: [DONE]\n\n');
@@ -653,6 +727,17 @@ test('parseChunkForContent strips leaked thought control markers from content', () => {
   assert.deepEqual(parsed.parts, [{ text: 'ABC', type: 'text' }]);
 });
 
+test('parseChunkForContent strips fullwidth-delimited leaked control markers from content', () => {
+  const fw = '\uff5c';
+  const chunk = {
+    p: 'response/content',
+    v: `<${fw}begin▁of▁sentence${fw}>A<${fw}▁of▁thought${fw}>B<${fw} end_of_sentence ${fw}>C`,
+  };
+  const parsed = parseChunkForContent(chunk, false, 'text');
+  assert.equal(parsed.finished, false);
+  assert.deepEqual(parsed.parts, [{ text: 'ABC', type: 'text' }]);
+});
+
 test('parseChunkForContent detects content_filter status and ignores upstream output tokens', () => {
   const chunk = { p: 'response',
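As a standalone sanity check of the fullwidth-pipe marker stripping this diff adds, here is a minimal sketch: the three regex constants are copied verbatim from the `LEAKED_*_MARKER_PATTERN` additions above, while `stripLeakedMarkers` is a hypothetical helper written for illustration only (it is not the DS2API implementation, which applies these patterns inside its chunk parser).

```javascript
// Pattern constants copied from the LEAKED_*_MARKER_PATTERN diff hunk above.
// Each accepts either the halfwidth pipe | or the fullwidth pipe \uFF5C as the
// marker delimiter, and either _ or the sentencepiece block ▁ as the separator.
const LEAKED_BOS_MARKER_PATTERN = /<[\|\uFF5C]\s*begin[_▁]of[_▁]sentence\s*[\|\uFF5C]>/gi;
const LEAKED_THOUGHT_MARKER_PATTERN = /<[\|\uFF5C]\s*(?:begin[_▁])?[_▁]*of[_▁]thought\s*[\|\uFF5C]>/gi;
const LEAKED_META_MARKER_PATTERN = /<[\|\uFF5C]\s*(?:assistant|tool|end[_▁]of[_▁]sentence|end[_▁]of[_▁]thinking|end[_▁]of[_▁]thought|end[_▁]of[_▁]toolresults|end[_▁]of[_▁]instructions)\s*[\|\uFF5C]>/gi;

// Hypothetical helper: remove every leaked control marker from a content delta.
function stripLeakedMarkers(text) {
  return text
    .replace(LEAKED_BOS_MARKER_PATTERN, '')
    .replace(LEAKED_THOUGHT_MARKER_PATTERN, '')
    .replace(LEAKED_META_MARKER_PATTERN, '');
}

// Same input shape as the fullwidth-delimiter test above.
const fw = '\uff5c';
console.log(stripLeakedMarkers(`<${fw}begin▁of▁sentence${fw}>A<${fw}▁of▁thought${fw}>B<${fw} end_of_sentence ${fw}>C`)); // → ABC
```

Note the internal `\s*` in each pattern: it is what lets `<｜ end_of_sentence ｜>` (spaces inside the delimiters) match alongside the tight form, which is exactly what the new test case exercises.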