diff --git a/.env.example b/.env.example index b3db288..21a4d2a 100644 --- a/.env.example +++ b/.env.example @@ -16,9 +16,17 @@ LOG_LEVEL=INFO # Recommended client concurrency is calculated dynamically as: # account_count * DS2API_ACCOUNT_MAX_INFLIGHT # So by default it is account_count * 2. +# Requests beyond inflight slots enter a waiting queue first. +# Default queue size equals recommended concurrency, so 429 starts after: +# account_count * DS2API_ACCOUNT_MAX_INFLIGHT * 2 # Alias: DS2API_ACCOUNT_CONCURRENCY # DS2API_ACCOUNT_MAX_INFLIGHT=2 +# Optional waiting queue size override for managed-key mode. +# Default: recommended_concurrency (same as account_count * inflight_limit) +# Alias: DS2API_ACCOUNT_QUEUE_SIZE +# DS2API_ACCOUNT_MAX_QUEUE=10 + # --------------------------------------------------------------- # Admin auth # --------------------------------------------------------------- @@ -54,9 +62,29 @@ DS2API_ADMIN_KEY=admin # Built admin static assets directory # DS2API_STATIC_ADMIN_DIR=static/admin +# Auto-build WebUI on startup when static/admin is missing. +# Default: enabled on local/Docker, disabled on Vercel. +# DS2API_AUTO_BUILD_WEBUI=true + +# Internal auth secret used by the Vercel hybrid streaming path +# (Go prepare endpoint <-> Node stream function). +# Optional: falls back to DS2API_ADMIN_KEY when unset. +# DS2API_VERCEL_INTERNAL_SECRET=change-me + +# Stream lease TTL seconds for Vercel hybrid streaming. +# During this window, the managed account stays occupied until Node calls release. +# Default: 900 (15 minutes) +# DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS=900 + # --------------------------------------------------------------- # Vercel sync integration (optional) # --------------------------------------------------------------- # VERCEL_TOKEN=your-vercel-token # VERCEL_PROJECT_ID=prj_xxxxxxxxxxxx # VERCEL_TEAM_ID=team_xxxxxxxxxxxx + +# Optional: Vercel deployment protection bypass secret. +# If deployment protection is enabled, DS2API will use this value as +# x-vercel-protection-bypass for internal Node->Go calls on Vercel. +# You can also use VERCEL_AUTOMATION_BYPASS_SECRET directly. +# DS2API_VERCEL_PROTECTION_BYPASS=your-bypass-secret diff --git a/.gitignore b/.gitignore index 2342ad0..8f7ada8 100644 --- a/.gitignore +++ b/.gitignore @@ -64,8 +64,8 @@ pnpm-lock.yaml *.tsbuildinfo .cache/ .parcel-cache/ -static/admin/* -!static/admin/.gitkeep +static/admin/ +internal/webui/assets/admin/ # Environment .env.local diff --git a/API.en.md b/API.en.md index 678e6b0..122d9c9 100644 --- a/API.en.md +++ b/API.en.md @@ -167,6 +167,7 @@ When `tools` is present, DS2API injects a tool prompt and parses tool-call paylo - Non-stream: if detected, returns `message.tool_calls`, `finish_reason=tool_calls`, and `message.content=null` - Stream: to avoid leaking raw tool-call JSON, DS2API buffers text first; if tool call is detected, only structured `delta.tool_calls` is emitted +- Stream `delta.tool_calls` is strict-client compatible: each tool call object includes `index` (starting from `0`) Tool-call response example: diff --git a/API.md b/API.md index 55f34e5..e66ba30 100644 --- a/API.md +++ b/API.md @@ -171,6 +171,7 @@ data: [DONE] - 非流式:若识别到工具调用,返回 `message.tool_calls`,并设置 `finish_reason=tool_calls`,`message.content=null` - 流式:为防止原始 toolcall JSON 泄漏,正文会先缓冲;若识别到工具调用,仅输出结构化 `delta.tool_calls` +- 流式 `delta.tool_calls` 兼容严格客户端:每个 tool call 对象都带 `index`(从 `0` 开始) 工具调用响应示例: diff --git a/DEPLOY.en.md b/DEPLOY.en.md index 503efa7..2d88c27 100644 --- a/DEPLOY.en.md +++ b/DEPLOY.en.md @@ -35,6 +35,9 @@ Build WebUI if `/admin` reports missing assets: ```bash ./scripts/build-webui.sh + +# Or rely on startup auto-build (enabled locally by default) +# DS2API_AUTO_BUILD_WEBUI=true go run ./cmd/ds2api ``` ## 2. Docker Deployment @@ -62,7 +65,13 @@ Notes: - Serverless entry: `api/index.go` - Rewrites and cache headers: `vercel.json` -- Legacy `builds` has been removed to avoid the `unused-build-settings` warning +- Build stage runs `npm ci --prefix webui && npm run build --prefix webui` automatically +- `vercel.json` routes `/admin/assets/*` and the `/admin` page to static output, while `/admin/*` APIs still go to `api/index` +- To mitigate Go Runtime streaming buffering, `/v1/chat/completions` on Vercel is routed to `api/chat-stream.js` (Node Runtime) +- `api/chat-stream.js` automatically falls back to the Go entry for non-stream requests or requests with `tools` (internal `__go=1`) +- `api/chat-stream.js` is data-path only (stream relay + SSE conversion); auth/account/session/PoW preparation still comes from an internal Go prepare endpoint (enabled on Vercel only) +- Go prepare creates a stream lease and Node releases it when streaming ends, keeping account occupancy semantics aligned with native Go streaming +- `vercel.json` sets `maxDuration: 300` for both `api/chat-stream.js` and `api/index.go` (subject to your Vercel plan limits) Minimum environment variables: @@ -76,8 +85,17 @@ Optional: - `VERCEL_TEAM_ID` - `DS2API_ACCOUNT_MAX_INFLIGHT` (per-account inflight limit, default `2`) - `DS2API_ACCOUNT_CONCURRENCY` (alias of the same setting) +- `DS2API_ACCOUNT_MAX_QUEUE` (waiting queue limit, default=`recommended_concurrency`) +- `DS2API_ACCOUNT_QUEUE_SIZE` (alias of the same setting) +- `DS2API_VERCEL_INTERNAL_SECRET` (optional internal auth secret for Vercel hybrid streaming path; falls back to `DS2API_ADMIN_KEY` when unset) +- `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` (optional stream lease TTL in seconds, default `900`) Recommended concurrency is computed dynamically as `account_count * per_account_inflight_limit` (default is `account_count * 2`). +When inflight slots are full, requests are queued first; with default queue size, 429 typically starts around `account_count * 4`. + +Notes: +- `static/admin` build output is not committed +- Vercel/Docker generate WebUI assets during build After deploy, verify: @@ -94,6 +112,60 @@ This repo includes `.github/workflows/release-artifacts.yml`: - Builds Linux/macOS/Windows archives and uploads them to Release Assets - Generates `sha256sums.txt` for integrity checks +## 3.2 Vercel Build Troubleshooting + +If you see an error like: + +```text +Error: Command failed: go build -ldflags -s -w -o .../bootstrap .../main__vc__go__.go +``` + +it is usually caused by invalid Go build flag settings in Vercel +(`-ldflags` not passed as a single argument). + +How to fix: + +1. Open Vercel Project Settings -> Build and Development Settings +2. Clear custom Go Build Flags / Build Command (recommended) +3. If ldflags must be used, set `-ldflags=\"-s -w\"` so it is passed as one argument +4. Ensure `go.mod` uses a supported version (this repo uses `go 1.24`) +5. Redeploy (preferably with cache cleared) + +Another common root cause (Go monorepo + `internal/`): + +```text +... use of internal package ds2api/internal/server not allowed +``` + +This usually happens when the Vercel Go entrypoint imports `internal/...` directly. +This repo now avoids that by using a public bridge package: `api/index.go` -> `ds2api/app` -> `internal/server`. + +If you see: + +```text +No Output Directory named "public" found after the Build completed. +``` + +Vercel is validating frontend output against `public`. This repo builds WebUI into `static/admin`, and uses the parent directory `static` as Vercel output root. +`vercel.json` now explicitly sets: + +```json +"outputDirectory": "static" +``` + +If you manually changed Output Directory in Project Settings, set it to `static` (or clear it and let repo config apply). + +If API responses return Vercel HTML `Authentication Required` (instead of JSON), the request is blocked by Vercel Deployment Protection: + +- Disable protection for that deployment/environment (recommended for public API use) +- Or send `x-vercel-protection-bypass` in requests +- If only internal Node->Go calls are blocked, set `VERCEL_AUTOMATION_BYPASS_SECRET` (or `DS2API_VERCEL_PROTECTION_BYPASS`) + +Vercel streaming note (important): + +- Vercel Go Runtime applies platform-level buffering, so this repo uses a hybrid path on Vercel (`Go prepare + Node stream`) to restore real-time SSE behavior. +- This adaptation is Vercel-only; local and Docker remain pure Go. + ## 4. Reverse Proxy (Nginx) Disable buffering for SSE: diff --git a/DEPLOY.md b/DEPLOY.md index 1ccc080..cb2c7b5 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -35,6 +35,9 @@ go run ./cmd/ds2api ```bash ./scripts/build-webui.sh + +# 或依赖自动构建(默认本地开启) +# DS2API_AUTO_BUILD_WEBUI=true go run ./cmd/ds2api ``` ## 2. Docker 部署 @@ -62,7 +65,13 @@ docker-compose up -d --build - serverless 入口:`api/index.go` - 路由与缓存头:`vercel.json` -- 已移除 legacy `builds` 字段,避免 `unused-build-settings` 警告 +- 构建阶段会自动执行 `npm ci --prefix webui && npm run build --prefix webui` +- `vercel.json` 已将 `/admin/assets/*` 与 `/admin` 页面走静态产物,`/admin/*` API 仍走 `api/index` +- 为缓解 Go Runtime 的流式缓冲,`/v1/chat/completions` 在 Vercel 上会优先走 `api/chat-stream.js`(Node Runtime) +- `api/chat-stream.js` 对非流式请求或 `tools` 请求会自动回退到 Go 入口(内部 `__go=1`) +- `api/chat-stream.js` 仅负责流式数据转发与 SSE 转换;鉴权、账号选择、会话创建、PoW 计算仍由 Go 内部 prepare 接口完成(仅 Vercel 启用) +- Go prepare 会创建流式 lease,Node 在流结束后回调 release;账号占用语义与 Go 原生流式保持一致 +- `vercel.json` 已将 `api/chat-stream.js` 与 `api/index.go` 的 `maxDuration` 设为 `300`(受套餐上限约束) 至少配置环境变量: @@ -76,8 +85,17 @@ docker-compose up -d --build - `VERCEL_TEAM_ID` - `DS2API_ACCOUNT_MAX_INFLIGHT`(每账号并发上限,默认 `2`) - `DS2API_ACCOUNT_CONCURRENCY`(同上别名) +- `DS2API_ACCOUNT_MAX_QUEUE`(等待队列上限,默认=`recommended_concurrency`) +- `DS2API_ACCOUNT_QUEUE_SIZE`(同上别名) +- `DS2API_VERCEL_INTERNAL_SECRET`(可选,Vercel 混合流式链路内部鉴权;未设置时回退使用 `DS2API_ADMIN_KEY`) +- `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS`(可选,流式 lease 过期秒数,默认 `900`) 并发建议值会动态按 `账号数量 × 每账号并发上限` 计算(默认即 `账号数量 × 2`)。 +当 in-flight 满时,请求先进入等待队列;默认队列上限等于建议并发值,因此默认 429 阈值约为 `账号数量 × 4`。 + +说明: +- 仓库不提交 `static/admin` 构建产物 +- Vercel / Docker 构建阶段自动生成 WebUI 静态文件 部署后建议先访问: @@ -94,6 +112,58 @@ docker-compose up -d --build - 自动构建 Linux/macOS/Windows 二进制包并上传到 Release Assets - 生成 `sha256sums.txt` 供校验 +## 3.2 Vercel 常见报错排查 + +若看到类似报错: + +```text +Error: Command failed: go build -ldflags -s -w -o .../bootstrap .../main__vc__go__.go +``` + +通常是 Vercel 项目里的 Go 构建参数配置不正确(`-ldflags` 没有作为一个整体字符串传递)。 + +处理方式: + +1. 进入 Vercel Project Settings -> Build and Development Settings +2. 清空自定义 Go Build Flags / Build Command(推荐) +3. 若必须设置 ldflags,使用 `-ldflags=\"-s -w\"`(保证它是一个参数) +4. 确认仓库 `go.mod` 为受支持版本(当前为 `go 1.24`) +5. 重新部署(建议 `Redeploy` 并清缓存) + +另一个常见根因(Go 单仓 + `internal/`): + +```text +... use of internal package ds2api/internal/server not allowed +``` + +这通常发生在 Vercel Go 入口文件直接 `import internal/...`。 +当前仓库已通过公开桥接包 `app` 解决:`api/index.go` -> `ds2api/app` -> `internal/server`。 + +若看到类似报错: + +```text +No Output Directory named "public" found after the Build completed. +``` + +说明 Vercel 正在按 `public` 校验前端产物目录。当前仓库会将 WebUI 构建到 `static/admin`,并在 `vercel.json` 使用上级目录 `static` 作为输出根目录: + +```json +"outputDirectory": "static" +``` + +若你在项目设置里手动改过 Output Directory,请同步改为 `static` 或清空让仓库配置生效。 + +若接口返回 Vercel 的 HTML 页面 `Authentication Required`(而不是 JSON),说明被 Vercel Deployment Protection 拦截: + +- 关闭该部署/环境的 Protection(推荐用于公开 API) +- 或给请求加 `x-vercel-protection-bypass` +- 若仅是 Vercel 内部 Node->Go 调用被拦截,可设置 `VERCEL_AUTOMATION_BYPASS_SECRET`(或 `DS2API_VERCEL_PROTECTION_BYPASS`) + +Vercel 流式说明(重要): + +- Vercel 的 Go Runtime 存在平台层响应缓冲,因此本项目在 Vercel 上采用“Go prepare + Node stream”的混合链路来恢复实时 SSE。 +- 该适配只在 Vercel 生效;本地与 Docker 仍走纯 Go 链路。 + ## 4. 反向代理(Nginx) 如果在 Nginx 后挂载,建议关闭缓冲以保证 SSE: diff --git a/README.MD b/README.MD index 84adb97..d21854a 100644 --- a/README.MD +++ b/README.MD @@ -8,7 +8,7 @@ 语言 / Language: [中文](README.MD) | [English](README.en.md) -将 DeepSeek Web 对话能力转换为 OpenAI 与 Claude 兼容 API。当前仓库后端为 **Go 全量实现**,前端保留 React WebUI(构建产物托管于 `static/admin`)。 +将 DeepSeek Web 对话能力转换为 OpenAI 与 Claude 兼容 API。当前仓库后端为 **Go 全量实现**,前端保留 React WebUI(源码在 `webui/`,部署时自动构建到 `static/admin`)。 ## 当前实现边界 @@ -65,7 +65,8 @@ go run ./cmd/ds2api 默认地址:`http://localhost:5001` -如果访问 `/admin` 提示未构建 WebUI,请执行: +本地默认会在启动时自动尝试构建 WebUI(需要本机有 Node.js/npm)。 +若你想手动构建,也可执行: ```bash ./scripts/build-webui.sh @@ -85,12 +86,16 @@ docker-compose logs -f - 入口:`api/index.go` - 路由重写:`vercel.json` +- `vercel.json` 会在构建阶段自动执行 `npm ci --prefix webui && npm run build --prefix webui` +- `/v1/chat/completions` 在 Vercel 上默认走 `api/chat-stream.js`(Node Runtime)以保证实时 SSE +- `api/chat-stream.js` 仅负责流式数据转发;鉴权、账号选择、会话/PoW 准备仍由 Go 内部 prepare 接口处理 +- Go prepare 会下发 `lease_id`,Node 在流结束后调用 release,确保账号占用时长与 Go 原生流式一致 +- WebUI 的“非流式测试”会直接请求 `?__go=1`,避免 Vercel 上 Node 中转导致长请求更易超时 - 至少配置: - `DS2API_ADMIN_KEY` - `DS2API_CONFIG_JSON`(JSON 字符串或 Base64) -说明:`vercel.json` 已移除 legacy `builds` 配置,避免部署时出现 -`unused-build-settings` 警告,并使用当前推荐的函数路由模式。 +说明:仓库不提交 `static/admin` 构建产物,Vercel 构建时自动生成并打包。 ## Release 自动构建产物(GitHub Actions) @@ -155,6 +160,8 @@ cp config.example.json config.json | `LOG_LEVEL` | 日志级别:`DEBUG/INFO/WARN/ERROR` | | `DS2API_ACCOUNT_MAX_INFLIGHT` | 每个账号最大并发 in-flight 请求数,默认 `2` | | `DS2API_ACCOUNT_CONCURRENCY` | 同上别名(兼容旧写法) | +| `DS2API_ACCOUNT_MAX_QUEUE` | 等待队列上限,默认等于 `recommended_concurrency` | +| `DS2API_ACCOUNT_QUEUE_SIZE` | 同上别名(兼容旧写法) | | `DS2API_ADMIN_KEY` | Admin 登录密钥,默认 `admin` | | `DS2API_JWT_SECRET` | Admin JWT 签名密钥(可选) | | `DS2API_JWT_EXPIRE_HOURS` | Admin JWT 过期小时数,默认 `24` | @@ -162,6 +169,9 @@ cp config.example.json config.json | `DS2API_CONFIG_JSON` | 直接注入配置(JSON 或 Base64) | | `DS2API_WASM_PATH` | PoW wasm 文件路径 | | `DS2API_STATIC_ADMIN_DIR` | 管理台静态文件目录 | +| `DS2API_AUTO_BUILD_WEBUI` | 启动时缺失 WebUI 时是否自动执行 npm build(默认:本地开启,Vercel 关闭) | +| `DS2API_VERCEL_INTERNAL_SECRET` | Vercel 混合流式链路内部鉴权密钥(可选;未设置时回退用 `DS2API_ADMIN_KEY`) | +| `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` | Vercel 流式 lease 过期秒数(默认 `900`) | | `VERCEL_TOKEN` | Vercel 同步 token(可选) | | `VERCEL_PROJECT_ID` | Vercel 项目 ID(可选) | | `VERCEL_TEAM_ID` | Vercel 团队 ID(可选) | @@ -179,8 +189,12 @@ cp config.example.json config.json - 系统建议并发值按账号池动态计算:`账号数量 × 每账号并发上限` - 默认每账号并发上限是 `2`,因此默认建议值是 `账号数量 × 2` +- 当 in-flight 槽位满时,请求会进入等待队列,不会立即 429 +- 默认等待队列上限 = `recommended_concurrency`,因此默认总承载上限是 `账号数量 × 4` +- 超过总承载上限(in-flight + waiting)才返回 `429` - 可通过 `DS2API_ACCOUNT_MAX_INFLIGHT`(或 `DS2API_ACCOUNT_CONCURRENCY`)手动覆盖每账号并发上限 -- `GET /admin/queue/status` 会返回 `max_inflight_per_account` 与 `recommended_concurrency` +- 可通过 `DS2API_ACCOUNT_MAX_QUEUE`(或 `DS2API_ACCOUNT_QUEUE_SIZE`)手动覆盖等待队列上限 +- `GET /admin/queue/status` 会返回 `max_inflight_per_account`、`recommended_concurrency`、`waiting`、`max_queue_size` ## Tool Call 适配说明 diff --git a/README.en.md b/README.en.md index 3c3b69d..26e82fb 100644 --- a/README.en.md +++ b/README.en.md @@ -8,7 +8,7 @@ Language: [中文](README.MD) | [English](README.en.md) -DS2API converts DeepSeek Web chat capability into OpenAI-compatible and Claude-compatible APIs. The current repository is **Go backend only** with the existing React WebUI kept as static assets under `static/admin`. +DS2API converts DeepSeek Web chat capability into OpenAI-compatible and Claude-compatible APIs. The current repository is **Go backend only** with the existing React WebUI source in `webui/` and build output generated to `static/admin` during deployment. ## Implementation Boundary @@ -65,7 +65,8 @@ go run ./cmd/ds2api Default URL: `http://localhost:5001` -If `/admin` says WebUI not built: +By default, local startup will auto-build WebUI when `static/admin` is missing (Node.js/npm required). +If you prefer manual build: ```bash ./scripts/build-webui.sh @@ -85,12 +86,16 @@ docker-compose logs -f - Entrypoint: `api/index.go` - Rewrites: `vercel.json` +- `vercel.json` runs `npm ci --prefix webui && npm run build --prefix webui` during build +- `/v1/chat/completions` is routed to `api/chat-stream.js` (Node Runtime) on Vercel to preserve real-time SSE +- `api/chat-stream.js` is data-path only; auth/account/session/PoW preparation still comes from an internal Go prepare endpoint +- Go prepare returns a `lease_id`; Node releases it at stream end so account occupancy duration stays aligned with native Go streaming behavior +- WebUI non-stream test calls `?__go=1` directly to avoid extra Node hop timeout risk on long Vercel requests - Minimum env vars: - `DS2API_ADMIN_KEY` - `DS2API_CONFIG_JSON` (raw JSON or Base64) -Note: legacy `builds` has been removed from `vercel.json` to avoid -the `unused-build-settings` warning and to follow the current function routing model. +Note: build artifacts under `static/admin` are not committed; Vercel generates them during build. ## Release Artifact Automation (GitHub Actions) @@ -155,6 +160,8 @@ cp config.example.json config.json | `LOG_LEVEL` | `DEBUG/INFO/WARN/ERROR` | | `DS2API_ACCOUNT_MAX_INFLIGHT` | Max in-flight requests per managed account, default `2` | | `DS2API_ACCOUNT_CONCURRENCY` | Alias of the same setting (legacy compatibility) | +| `DS2API_ACCOUNT_MAX_QUEUE` | Waiting queue limit (managed-key mode), default=`recommended_concurrency` | +| `DS2API_ACCOUNT_QUEUE_SIZE` | Alias of the same setting (legacy compatibility) | | `DS2API_ADMIN_KEY` | Admin login key, default `admin` | | `DS2API_JWT_SECRET` | Admin JWT signing secret (optional) | | `DS2API_JWT_EXPIRE_HOURS` | Admin JWT TTL in hours, default `24` | @@ -162,6 +169,9 @@ cp config.example.json config.json | `DS2API_CONFIG_JSON` | Inline config (JSON or Base64) | | `DS2API_WASM_PATH` | PoW wasm path | | `DS2API_STATIC_ADMIN_DIR` | Admin static assets dir | +| `DS2API_AUTO_BUILD_WEBUI` | Auto run npm build on startup when WebUI assets are missing (default: enabled locally, disabled on Vercel) | +| `DS2API_VERCEL_INTERNAL_SECRET` | Internal auth secret for Vercel hybrid streaming path (optional; falls back to `DS2API_ADMIN_KEY` if unset) | +| `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` | Stream lease TTL seconds for Vercel hybrid streaming (default `900`) | | `VERCEL_TOKEN` | Vercel sync token (optional) | | `VERCEL_PROJECT_ID` | Vercel project ID (optional) | | `VERCEL_TEAM_ID` | Vercel team ID (optional) | @@ -179,8 +189,12 @@ Optional header: `X-Ds2-Target-Account` to pin one managed account. - DS2API computes recommended concurrency dynamically as: `account_count * per_account_inflight_limit` - Default per-account inflight limit is `2`, so default recommendation is `account_count * 2` +- When inflight slots are full, requests enter a waiting queue instead of immediate 429 +- Default queue limit equals `recommended_concurrency`, so default 429 threshold is about `account_count * 4` +- 429 is returned only after total load exceeds `inflight + waiting` capacity - You can override per-account inflight via `DS2API_ACCOUNT_MAX_INFLIGHT` (or `DS2API_ACCOUNT_CONCURRENCY`) -- `GET /admin/queue/status` returns both `max_inflight_per_account` and `recommended_concurrency` +- You can override waiting queue size via `DS2API_ACCOUNT_MAX_QUEUE` (or `DS2API_ACCOUNT_QUEUE_SIZE`) +- `GET /admin/queue/status` returns `max_inflight_per_account`, `recommended_concurrency`, `waiting`, and `max_queue_size` ## Tool Call Adaptation diff --git a/api/chat-stream.js b/api/chat-stream.js new file mode 100644 index 0000000..8879a47 --- /dev/null +++ b/api/chat-stream.js @@ -0,0 +1,1086 @@ +'use strict'; + +const crypto = require('crypto'); + +const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion'; + +const BASE_HEADERS = { + Host: 'chat.deepseek.com', + 'User-Agent': 'DeepSeek/1.6.11 Android/35', + Accept: 'application/json', + 'Content-Type': 'application/json', + 'x-client-platform': 'android', + 'x-client-version': '1.6.11', + 'x-client-locale': 'zh_CN', + 'accept-charset': 'UTF-8', +}; + +const SKIP_PATTERNS = [ + 'quasi_status', + 'elapsed_secs', + 'token_usage', + 'pending_fragment', + 'conversation_mode', + 'fragments/-1/status', + 'fragments/-2/status', + 'fragments/-3/status', +]; + +module.exports = async function handler(req, res) { + setCorsHeaders(res); + if (req.method === 'OPTIONS') { + res.statusCode = 204; + res.end(); + return; + } + if (req.method !== 'POST') { + writeOpenAIError(res, 405, 'method not allowed'); + return; + } + + const rawBody = await readRawBody(req); + + // Hard guard: only use Node data path for streaming on Vercel runtime. + // Any non-Vercel runtime always falls back to Go for full behavior parity. + if (!isVercelRuntime()) { + await proxyToGo(req, res, rawBody); + return; + } + + let payload; + try { + payload = JSON.parse(rawBody.toString('utf8') || '{}'); + } catch (_err) { + writeOpenAIError(res, 400, 'invalid json'); + return; + } + + // Keep all non-stream behavior on Go side to avoid compatibility regressions. + if (!toBool(payload.stream) || (Array.isArray(payload.tools) && payload.tools.length > 0)) { + await proxyToGo(req, res, rawBody); + return; + } + + const prep = await fetchStreamPrepare(req, rawBody); + if (!prep.ok) { + relayPreparedFailure(res, prep); + return; + } + + const model = asString(prep.body.model) || asString(payload.model); + const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`; + const leaseID = asString(prep.body.lease_id); + const deepseekToken = asString(prep.body.deepseek_token); + const powHeader = asString(prep.body.pow_header); + const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null; + const finalPrompt = asString(prep.body.final_prompt); + const thinkingEnabled = toBool(prep.body.thinking_enabled); + const searchEnabled = toBool(prep.body.search_enabled); + const toolNames = extractToolNames(payload.tools); + + if (!model || !leaseID || !deepseekToken || !powHeader || !completionPayload) { + writeOpenAIError(res, 500, 'invalid vercel prepare response'); + return; + } + const releaseLease = createLeaseReleaser(req, leaseID); + try { + const completionRes = await fetch(DEEPSEEK_COMPLETION_URL, { + method: 'POST', + headers: { + ...BASE_HEADERS, + authorization: `Bearer ${deepseekToken}`, + 'x-ds-pow-response': powHeader, + }, + body: JSON.stringify(completionPayload), + }); + + if (!completionRes.ok || !completionRes.body) { + const detail = await safeReadText(completionRes); + writeOpenAIError(res, 500, detail ? `Failed to get completion: ${detail}` : 'Failed to get completion.'); + return; + } + + res.statusCode = 200; + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache, no-transform'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); + if (typeof res.flushHeaders === 'function') { + res.flushHeaders(); + } + + const created = Math.floor(Date.now() / 1000); + let firstChunkSent = false; + let currentType = thinkingEnabled ? 'thinking' : 'text'; + let thinkingText = ''; + let outputText = ''; + const toolSieveEnabled = toolNames.length > 0; + const toolSieveState = createToolSieveState(); + let toolCallsEmitted = false; + const decoder = new TextDecoder(); + const reader = completionRes.body.getReader(); + let buffered = ''; + let ended = false; + + const sendFrame = (obj) => { + res.write(`data: ${JSON.stringify(obj)}\n\n`); + if (typeof res.flush === 'function') { + res.flush(); + } + }; + + const sendDeltaFrame = (delta) => { + const payloadDelta = { ...delta }; + if (!firstChunkSent) { + payloadDelta.role = 'assistant'; + firstChunkSent = true; + } + sendFrame({ + id: sessionID, + object: 'chat.completion.chunk', + created, + model, + choices: [{ delta: payloadDelta, index: 0 }], + }); + }; + + const finish = async (reason) => { + if (ended) { + return; + } + ended = true; + if (toolSieveEnabled) { + const tailEvents = flushToolSieve(toolSieveState, toolNames); + for (const evt of tailEvents) { + if (evt.type === 'tool_calls') { + toolCallsEmitted = true; + sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls) }); + continue; + } + if (evt.text) { + sendDeltaFrame({ content: evt.text }); + } + } + } + if (toolCallsEmitted) { + reason = 'tool_calls'; + } + sendFrame({ + id: sessionID, + object: 'chat.completion.chunk', + created, + model, + choices: [{ delta: {}, index: 0, finish_reason: reason }], + usage: buildUsage(finalPrompt, thinkingText, outputText), + }); + res.write('data: [DONE]\n\n'); + await releaseLease(); + res.end(); + }; + + try { + // eslint-disable-next-line no-constant-condition + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + buffered += decoder.decode(value, { stream: true }); + const lines = buffered.split('\n'); + buffered = lines.pop() || ''; + + for (const rawLine of lines) { + const line = rawLine.trim(); + if (!line.startsWith('data:')) { + continue; + } + const dataStr = line.slice(5).trim(); + if (!dataStr) { + continue; + } + if (dataStr === '[DONE]') { + await finish('stop'); + return; + } + let chunk; + try { + chunk = JSON.parse(dataStr); + } catch (_err) { + continue; + } + if (chunk.error || chunk.code === 'content_filter') { + await finish('content_filter'); + return; + } + const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType); + currentType = parsed.newType; + if (parsed.finished) { + await finish('stop'); + return; + } + + for (const p of parsed.parts) { + if (!p.text) { + continue; + } + if (searchEnabled && isCitation(p.text)) { + continue; + } + if (p.type === 'thinking') { + thinkingText += p.text; + sendDeltaFrame({ reasoning_content: p.text }); + } else { + outputText += p.text; + if (!toolSieveEnabled) { + sendDeltaFrame({ content: p.text }); + continue; + } + const events = processToolSieveChunk(toolSieveState, p.text, toolNames); + for (const evt of events) { + if (evt.type === 'tool_calls') { + toolCallsEmitted = true; + sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls) }); + continue; + } + if (evt.text) { + sendDeltaFrame({ content: evt.text }); + } + } + } + } + } + } + await finish('stop'); + } catch (_err) { + await finish('stop'); + } + } finally { + await releaseLease(); + } +}; + +function setCorsHeaders(res) { + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Credentials', 'true'); + res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS, PUT, DELETE'); + res.setHeader( + 'Access-Control-Allow-Headers', + 'Content-Type, Authorization, X-API-Key, X-Ds2-Target-Account, X-Vercel-Protection-Bypass', + ); +} + +function header(req, key) { + if (!req || !req.headers) { + return ''; + } + return asString(req.headers[key.toLowerCase()]); +} + +async function readRawBody(req) { + if (Buffer.isBuffer(req.body)) { + return req.body; + } + if (typeof req.body === 'string') { + return Buffer.from(req.body); + } + if (req.body && typeof req.body === 'object') { + return Buffer.from(JSON.stringify(req.body)); + } + const chunks = []; + for await (const chunk of req) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + } + return Buffer.concat(chunks); +} + +async function fetchStreamPrepare(req, rawBody) { + const url = buildInternalGoURL(req); + url.searchParams.set('__stream_prepare', '1'); + + const upstream = await fetch(url.toString(), { + method: 'POST', + headers: buildInternalGoHeaders(req, { withInternalToken: true, withContentType: true }), + body: rawBody, + }); + + const text = await upstream.text(); + let body = {}; + try { + body = JSON.parse(text || '{}'); + } catch (_err) { + body = {}; + } + + return { + ok: upstream.ok, + status: upstream.status, + contentType: upstream.headers.get('content-type') || 'application/json', + text, + body, + }; +} + +function relayPreparedFailure(res, prep) { + if (prep.status === 401 && looksLikeVercelAuthPage(prep.text)) { + writeOpenAIError( + res, + 401, + 'Vercel Deployment Protection blocked internal prepare request. Disable protection for this deployment or set VERCEL_AUTOMATION_BYPASS_SECRET.', + ); + return; + } + res.statusCode = prep.status || 500; + res.setHeader('Content-Type', prep.contentType || 'application/json'); + if (prep.text) { + res.end(prep.text); + return; + } + writeOpenAIError(res, prep.status || 500, 'vercel prepare failed'); +} + +async function safeReadText(resp) { + if (!resp) { + return ''; + } + try { + const text = await resp.text(); + return text.trim(); + } catch (_err) { + return ''; + } +} + +function internalSecret() { + return asString(process.env.DS2API_VERCEL_INTERNAL_SECRET) || asString(process.env.DS2API_ADMIN_KEY) || 'admin'; +} + +function buildInternalGoURL(req) { + const proto = asString(header(req, 'x-forwarded-proto')) || 'https'; + const host = asString(header(req, 'host')); + const url = new URL(`${proto}://${host}${req.url || '/v1/chat/completions'}`); + url.searchParams.set('__go', '1'); + const protectionBypass = resolveProtectionBypass(req); + if (protectionBypass) { + url.searchParams.set('x-vercel-protection-bypass', protectionBypass); + } + return url; +} + +function buildInternalGoHeaders(req, opts = {}) { + const headers = { + authorization: asString(header(req, 'authorization')), + 'x-api-key': asString(header(req, 'x-api-key')), + 'x-ds2-target-account': asString(header(req, 'x-ds2-target-account')), + 'x-vercel-protection-bypass': resolveProtectionBypass(req), + }; + if (opts.withInternalToken) { + headers['x-ds2-internal-token'] = internalSecret(); + } + if (opts.withContentType) { + headers['content-type'] = asString(header(req, 'content-type')) || 'application/json'; + } + return headers; +} + +function createLeaseReleaser(req, leaseID) { + let released = false; + return async () => { + if (released || !leaseID) { + return; + } + released = true; + try { + await releaseStreamLease(req, leaseID); + } catch (_err) { + // Ignore release errors. Lease TTL cleanup on Go side still prevents permanent leaks. + } + }; +} + +async function releaseStreamLease(req, leaseID) { + const url = buildInternalGoURL(req); + url.searchParams.set('__stream_release', '1'); + const body = Buffer.from(JSON.stringify({ lease_id: leaseID })); + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 1500); + try { + await fetch(url.toString(), { + method: 'POST', + headers: buildInternalGoHeaders(req, { withInternalToken: true, withContentType: true }), + body, + signal: controller.signal, + }); + } finally { + clearTimeout(timeout); + } +} + +function resolveProtectionBypass(req) { + const fromHeader = asString(header(req, 'x-vercel-protection-bypass')); + if (fromHeader) { + return fromHeader; + } + return asString(process.env.VERCEL_AUTOMATION_BYPASS_SECRET) || asString(process.env.DS2API_VERCEL_PROTECTION_BYPASS); +} + +function looksLikeVercelAuthPage(text) { + const body = asString(text).toLowerCase(); + if (!body) { + return false; + } + return body.includes('authentication required') && body.includes('vercel'); +} + +function parseChunkForContent(chunk, thinkingEnabled, currentType) { + if (!chunk || typeof chunk !== 'object' || !Object.prototype.hasOwnProperty.call(chunk, 'v')) { + return { parts: [], finished: false, newType: currentType }; + } + const pathValue = asString(chunk.p); + if (shouldSkipPath(pathValue)) { + return { parts: [], finished: false, newType: currentType }; + } + if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') { + return { parts: [], finished: true, newType: currentType }; + } + + let newType = currentType; + const parts = []; + + if (pathValue === 'response/fragments' && asString(chunk.o).toUpperCase() === 'APPEND' && Array.isArray(chunk.v)) { + for (const frag of chunk.v) { + if (!frag || typeof frag !== 'object') { + continue; + } + const fragType = asString(frag.type).toUpperCase(); + const content = asString(frag.content); + if (!content) { + continue; + } + if (fragType === 'THINK' || fragType === 'THINKING') { + newType = 'thinking'; + parts.push({ text: content, type: 'thinking' }); + } else if (fragType === 'RESPONSE') { + newType = 'text'; + parts.push({ text: content, type: 'text' }); + } else { + parts.push({ text: content, type: 'text' }); + } + } + } + + if (pathValue === 'response' && Array.isArray(chunk.v)) { + for (const item of chunk.v) { + if (!item || typeof item !== 'object') { + continue; + } + if (item.p === 'fragments' && item.o === 'APPEND' && Array.isArray(item.v)) { + for (const frag of item.v) { + const fragType = asString(frag && frag.type).toUpperCase(); + if (fragType === 'THINK' || fragType === 'THINKING') { + newType = 'thinking'; + } else if (fragType === 'RESPONSE') { + newType = 'text'; + } + } + } + } + } + + let partType = 'text'; + if (pathValue === 'response/thinking_content') { + partType = 'thinking'; + } else if (pathValue === 'response/content') { + partType = 'text'; + } else if (pathValue.includes('response/fragments') && pathValue.includes('/content')) { + partType = newType; + } else if (!pathValue && thinkingEnabled) { + partType = newType; + } + + const val = chunk.v; + if (typeof val === 'string') { + if (val === 'FINISHED' && (!pathValue || pathValue === 'status')) { + return { parts: [], finished: true, newType }; + } + if (val) { + parts.push({ text: val, type: partType }); + } + return { parts, finished: false, newType }; + } + + if (Array.isArray(val)) { + for (const entry of val) { + if (typeof entry === 'string') { + if (entry) { + parts.push({ text: entry, type: partType }); + } + continue; + } + if (!entry || typeof entry !== 'object') { + continue; + } + if (asString(entry.p) === 'status' && asString(entry.v) === 'FINISHED') { + return { parts: [], finished: true, newType }; + } + const content = asString(entry.content); + if (!content) { + continue; + } + const t = asString(entry.type).toUpperCase(); + if (t === 'THINK' || t === 'THINKING') { + parts.push({ text: content, type: 'thinking' }); + } else if (t === 'RESPONSE') { + parts.push({ text: content, type: 'text' }); + } else { + parts.push({ text: content, type: partType }); + } + } + return { parts, finished: false, newType }; + } + + if (val && typeof val === 'object') { + const resp = val.response && typeof val.response === 'object' ? val.response : val; + if (Array.isArray(resp.fragments)) { + for (const frag of resp.fragments) { + if (!frag || typeof frag !== 'object') { + continue; + } + const content = asString(frag.content); + if (!content) { + continue; + } + const t = asString(frag.type).toUpperCase(); + if (t === 'THINK' || t === 'THINKING') { + newType = 'thinking'; + parts.push({ text: content, type: 'thinking' }); + } else if (t === 'RESPONSE') { + newType = 'text'; + parts.push({ text: content, type: 'text' }); + } else { + parts.push({ text: content, type: partType }); + } + } + } + } + return { parts, finished: false, newType }; +} + +function shouldSkipPath(pathValue) { + if (pathValue === 'response/search_status') { + return true; + } + for (const p of SKIP_PATTERNS) { + if (pathValue.includes(p)) { + return true; + } + } + return false; +} + +function isCitation(text) { + return asString(text).trim().startsWith('[citation:'); +} + +function buildUsage(prompt, thinking, output) { + const promptTokens = estimateTokens(prompt); + const reasoningTokens = estimateTokens(thinking); + const completionTokens = estimateTokens(output); + return { + prompt_tokens: promptTokens, + completion_tokens: reasoningTokens + completionTokens, + total_tokens: promptTokens + reasoningTokens + completionTokens, + completion_tokens_details: { + reasoning_tokens: reasoningTokens, + }, + }; +} + +function estimateTokens(text) { + const t = asString(text); + if (!t) { + return 0; + } + const n = Math.floor(Array.from(t).length / 4); + return n < 1 ? 1 : n; +} + +async function proxyToGo(req, res, rawBody) { + const url = buildInternalGoURL(req); + + const upstream = await fetch(url.toString(), { + method: 'POST', + headers: buildInternalGoHeaders(req, { withContentType: true }), + body: rawBody, + }); + + res.statusCode = upstream.status; + upstream.headers.forEach((value, key) => { + if (key.toLowerCase() === 'content-length') { + return; + } + res.setHeader(key, value); + }); + const bytes = Buffer.from(await upstream.arrayBuffer()); + res.end(bytes); +} + +function writeOpenAIError(res, status, message) { + res.statusCode = status; + res.setHeader('Content-Type', 'application/json'); + res.end( + JSON.stringify({ + error: { + message, + type: openAIErrorType(status), + }, + }), + ); +} + +function openAIErrorType(status) { + switch (status) { + case 400: + return 'invalid_request_error'; + case 401: + return 'authentication_error'; + case 403: + return 'permission_error'; + case 429: + return 'rate_limit_error'; + case 503: + return 'service_unavailable_error'; + default: + return status >= 500 ? 'api_error' : 'invalid_request_error'; + } +} + +function toBool(v) { + return v === true; +} + +function isVercelRuntime() { + return asString(process.env.VERCEL) !== '' || asString(process.env.NOW_REGION) !== ''; +} + +function asString(v) { + if (typeof v === 'string') { + return v.trim(); + } + if (Array.isArray(v)) { + return asString(v[0]); + } + if (v == null) { + return ''; + } + return String(v).trim(); +} + +function extractToolNames(tools) { + if (!Array.isArray(tools) || tools.length === 0) { + return []; + } + const out = []; + for (const t of tools) { + if (!t || typeof t !== 'object') { + continue; + } + const fn = t.function && typeof t.function === 'object' ? t.function : t; + const name = asString(fn.name); + if (name) { + out.push(name); + } + } + return out; +} + +function createToolSieveState() { + return { + pending: '', + capture: '', + capturing: false, + }; +} + +function processToolSieveChunk(state, chunk, toolNames) { + if (!state) { + return []; + } + if (chunk) { + state.pending += chunk; + } + const events = []; + // eslint-disable-next-line no-constant-condition + while (true) { + if (state.capturing) { + if (state.pending) { + state.capture += state.pending; + state.pending = ''; + } + const consumed = consumeToolCapture(state.capture, toolNames); + if (!consumed.ready) { + break; + } + state.capture = ''; + state.capturing = false; + if (consumed.prefix) { + events.push({ type: 'text', text: consumed.prefix }); + } + if (Array.isArray(consumed.calls) && consumed.calls.length > 0) { + events.push({ type: 'tool_calls', calls: consumed.calls }); + } + if (consumed.suffix) { + state.pending += consumed.suffix; + } + continue; + } + + if (!state.pending) { + break; + } + + const start = findToolSegmentStart(state.pending); + if (start >= 0) { + const prefix = state.pending.slice(0, start); + if (prefix) { + events.push({ type: 'text', text: prefix }); + } + state.capture = state.pending.slice(start); + state.pending = ''; + state.capturing = true; + continue; + } + + const [safe, hold] = splitSafeContent(state.pending, 64); + if (!safe) { + break; + } + state.pending = hold; + events.push({ type: 'text', text: safe }); + } + return events; +} + +function flushToolSieve(state, toolNames) { + if (!state) { + return []; + } + const events = processToolSieveChunk(state, '', toolNames); + if (state.capturing) { + const consumed = consumeToolCapture(state.capture, toolNames); + if (consumed.ready) { + if (consumed.prefix) { + events.push({ type: 'text', text: consumed.prefix }); + } + if (Array.isArray(consumed.calls) && consumed.calls.length > 0) { + events.push({ type: 'tool_calls', calls: consumed.calls }); + } + if (consumed.suffix) { + events.push({ type: 'text', text: consumed.suffix }); + } + } else if (state.capture) { + events.push({ type: 'text', text: state.capture }); + } + state.capture = ''; + state.capturing = false; + } + if (state.pending) { + events.push({ type: 'text', text: state.pending }); + state.pending = ''; + } + return events; +} + +function splitSafeContent(s, holdChars) { + const chars = Array.from(s || ''); + if (chars.length <= holdChars) { + return ['', s]; + } + return [chars.slice(0, chars.length - holdChars).join(''), chars.slice(chars.length - holdChars).join('')]; +} + +function findToolSegmentStart(s) { + if (!s) { + return -1; + } + const lower = s.toLowerCase(); + const keyIdx = lower.indexOf('tool_calls'); + if (keyIdx < 0) { + return -1; + } + const start = s.slice(0, keyIdx).lastIndexOf('{'); + return start >= 0 ? start : keyIdx; +} + +function consumeToolCapture(captured, toolNames) { + if (!captured) { + return { ready: false, prefix: '', calls: [], suffix: '' }; + } + const lower = captured.toLowerCase(); + const keyIdx = lower.indexOf('tool_calls'); + if (keyIdx < 0) { + if (Array.from(captured).length >= 256) { + return { ready: true, prefix: captured, calls: [], suffix: '' }; + } + return { ready: false, prefix: '', calls: [], suffix: '' }; + } + const start = captured.slice(0, keyIdx).lastIndexOf('{'); + if (start < 0) { + if (Array.from(captured).length >= 512) { + return { ready: true, prefix: captured, calls: [], suffix: '' }; + } + return { ready: false, prefix: '', calls: [], suffix: '' }; + } + const obj = extractJSONObjectFrom(captured, start); + if (!obj.ok) { + if (Array.from(captured).length >= 4096) { + return { ready: true, prefix: captured, calls: [], suffix: '' }; + } + return { ready: false, prefix: '', calls: [], suffix: '' }; + } + const parsed = parseToolCalls(captured.slice(start, obj.end), toolNames); + if (parsed.length === 0) { + return { + ready: true, + prefix: captured.slice(0, obj.end), + calls: [], + suffix: captured.slice(obj.end), + }; + } + return { + ready: true, + prefix: captured.slice(0, start), + calls: parsed, + suffix: captured.slice(obj.end), + }; +} + +function extractJSONObjectFrom(text, start) { + if (!text || start < 0 || start >= text.length || text[start] !== '{') { + return { ok: false, end: 0 }; + } + let depth = 0; + let quote = ''; + let escaped = false; + for (let i = start; i < text.length; i += 1) { + const ch = text[i]; + if (quote) { + if (escaped) { + escaped = false; + continue; + } + if (ch === '\\') { + escaped = true; + continue; + } + if (ch === quote) { + quote = ''; + } + continue; + } + if (ch === '"' || ch === "'") { + quote = ch; + continue; + } + if (ch === '{') { + depth += 1; + continue; + } + if (ch === '}') { + depth -= 1; + if (depth === 0) { + return { ok: true, end: i + 1 }; + } + } + } + return { ok: false, end: 0 }; +} + +function parseToolCalls(text, toolNames) { + if (!asString(text)) { + return []; + } + const candidates = buildToolCallCandidates(text); + let parsed = []; + for (const c of candidates) { + parsed = parseToolCallsPayload(c); + if (parsed.length > 0) { + break; + } + } + if (parsed.length === 0) { + return []; + } + const allowed = new Set((toolNames || []).filter(Boolean)); + const out = []; + for (const tc of parsed) { + if (!tc || !tc.name) { + continue; + } + if (allowed.size > 0 && !allowed.has(tc.name)) { + continue; + } + out.push({ name: tc.name, input: tc.input || {} }); + } + if (out.length === 0 && parsed.length > 0) { + for (const tc of parsed) { + if (!tc || !tc.name) { + continue; + } + out.push({ name: tc.name, input: tc.input || {} }); + } + } + return out; +} + +function buildToolCallCandidates(text) { + const trimmed = asString(text); + const candidates = [trimmed]; + const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)\s*```/gi) || []; + for (const block of fenced) { + const m = block.match(/```(?:json)?\s*([\s\S]*?)\s*```/i); + if (m && m[1]) { + candidates.push(asString(m[1])); + } + } + const keyIdx = trimmed.toLowerCase().indexOf('tool_calls'); + if (keyIdx >= 0) { + const start = trimmed.slice(0, keyIdx).lastIndexOf('{'); + if (start >= 0) { + const obj = extractJSONObjectFrom(trimmed, start); + if (obj.ok) { + candidates.push(asString(trimmed.slice(start, obj.end))); + } + } + } + const first = trimmed.indexOf('{'); + const last = trimmed.lastIndexOf('}'); + if (first >= 0 && last > first) { + candidates.push(asString(trimmed.slice(first, last + 1))); + } + return [...new Set(candidates.filter(Boolean))]; +} + +function parseToolCallsPayload(payload) { + let decoded; + try { + decoded = JSON.parse(payload); + } catch (_err) { + return []; + } + if (Array.isArray(decoded)) { + return parseToolCallList(decoded); + } + if (!decoded || typeof decoded !== 'object') { + return []; + } + if (decoded.tool_calls) { + return parseToolCallList(decoded.tool_calls); + } + const one = parseToolCallItem(decoded); + return one ? [one] : []; +} + +function parseToolCallList(v) { + if (!Array.isArray(v)) { + return []; + } + const out = []; + for (const item of v) { + if (!item || typeof item !== 'object') { + continue; + } + const one = parseToolCallItem(item); + if (one) { + out.push(one); + } + } + return out; +} + +function parseToolCallItem(m) { + let name = asString(m.name); + let inputRaw = m.input; + let hasInput = Object.prototype.hasOwnProperty.call(m, 'input'); + const fn = m.function && typeof m.function === 'object' ? m.function : null; + if (fn) { + if (!name) { + name = asString(fn.name); + } + if (!hasInput && Object.prototype.hasOwnProperty.call(fn, 'arguments')) { + inputRaw = fn.arguments; + hasInput = true; + } + } + if (!hasInput) { + for (const k of ['arguments', 'args', 'parameters', 'params']) { + if (Object.prototype.hasOwnProperty.call(m, k)) { + inputRaw = m[k]; + hasInput = true; + break; + } + } + } + if (!name) { + return null; + } + return { + name, + input: parseToolCallInput(inputRaw), + }; +} + +function parseToolCallInput(v) { + if (v == null) { + return {}; + } + if (typeof v === 'string') { + const raw = asString(v); + if (!raw) { + return {}; + } + try { + const parsed = JSON.parse(raw); + if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) { + return parsed; + } + } catch (_err) { + return { _raw: raw }; + } + return {}; + } + if (typeof v === 'object' && !Array.isArray(v)) { + return v; + } + try { + const parsed = JSON.parse(JSON.stringify(v)); + if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) { + return parsed; + } + } catch (_err) { + return {}; + } + return {}; +} + +function formatOpenAIStreamToolCalls(calls) { + if (!Array.isArray(calls) || calls.length === 0) { + return []; + } + return calls.map((c, idx) => ({ + index: idx, + id: `call_${newCallID()}`, + type: 'function', + function: { + name: c.name, + arguments: JSON.stringify(c.input || {}), + }, + })); +} + +function newCallID() { + if (typeof crypto.randomUUID === 'function') { + return crypto.randomUUID().replace(/-/g, ''); + } + return `${Date.now()}${Math.floor(Math.random() * 1e9)}`; +} diff --git a/api/index.go b/api/index.go index 326e83f..147ce32 100644 --- a/api/index.go +++ b/api/index.go @@ -4,17 +4,17 @@ import ( "net/http" "sync" - "ds2api/internal/server" + "ds2api/app" ) var ( once sync.Once - app *server.App + h http.Handler ) func Handler(w http.ResponseWriter, r *http.Request) { once.Do(func() { - app = server.NewApp() + h = app.NewHandler() }) - app.Router.ServeHTTP(w, r) + h.ServeHTTP(w, r) } diff --git a/app/handler.go b/app/handler.go new file mode 100644 index 0000000..a8979fd --- /dev/null +++ b/app/handler.go @@ -0,0 +1,11 @@ +package app + +import ( + "net/http" + + "ds2api/internal/server" +) + +func NewHandler() http.Handler { + return server.NewApp().Router +} diff --git a/cmd/ds2api/main.go b/cmd/ds2api/main.go index 7c9ad68..5a3a0ca 100644 --- a/cmd/ds2api/main.go +++ b/cmd/ds2api/main.go @@ -7,9 +7,11 @@ import ( "ds2api/internal/config" "ds2api/internal/server" + "ds2api/internal/webui" ) func main() { + webui.EnsureBuiltOnStartup() app := server.NewApp() port := strings.TrimSpace(os.Getenv("PORT")) if port == "" { diff --git a/internal/account/pool.go b/internal/account/pool.go index ded6893..665bcee 100644 --- a/internal/account/pool.go +++ b/internal/account/pool.go @@ -1,6 +1,7 @@ package account import ( + "context" "os" "sort" "strconv" @@ -11,12 +12,14 @@ import ( ) type Pool struct { - store *config.Store - mu sync.Mutex - queue []string - inUse map[string]int - maxInflightPerAccount int + store *config.Store + mu sync.Mutex + queue []string + inUse map[string]int + waiters []chan struct{} + maxInflightPerAccount int recommendedConcurrency int + maxQueueSize int } func NewPool(store *config.Store) *Pool { @@ -47,25 +50,64 @@ func (p *Pool) Reset() { } } recommended := defaultRecommendedConcurrency(len(ids), p.maxInflightPerAccount) + queueLimit := maxQueueFromEnv(recommended) p.mu.Lock() defer p.mu.Unlock() + p.drainWaitersLocked() p.queue = ids p.inUse = map[string]int{} p.recommendedConcurrency = recommended + p.maxQueueSize = queueLimit config.Logger.Info( "[init_account_queue] initialized", "total", len(ids), "max_inflight_per_account", p.maxInflightPerAccount, "recommended_concurrency", p.recommendedConcurrency, + "max_queue_size", p.maxQueueSize, ) } func (p *Pool) Acquire(target string, exclude map[string]bool) (config.Account, bool) { p.mu.Lock() defer p.mu.Unlock() - if exclude == nil { - exclude = map[string]bool{} + return p.acquireLocked(target, normalizeExclude(exclude)) +} + +func (p *Pool) AcquireWait(ctx context.Context, target string, exclude map[string]bool) (config.Account, bool) { + if ctx == nil { + ctx = context.Background() } + exclude = normalizeExclude(exclude) + for { + if ctx.Err() != nil { + return config.Account{}, false + } + + p.mu.Lock() + if acc, ok := p.acquireLocked(target, exclude); ok { + p.mu.Unlock() + return acc, true + } + if !p.canQueueLocked(target, exclude) { + p.mu.Unlock() + return config.Account{}, false + } + waiter := make(chan struct{}) + p.waiters = append(p.waiters, waiter) + p.mu.Unlock() + + select { + case <-ctx.Done(): + p.mu.Lock() + p.removeWaiterLocked(waiter) + p.mu.Unlock() + return config.Account{}, false + case <-waiter: + } + } +} + +func (p *Pool) acquireLocked(target string, exclude map[string]bool) (config.Account, bool) { if target != "" { if exclude[target] || p.inUse[target] >= p.maxInflightPerAccount { return config.Account{}, false @@ -131,9 +173,11 @@ func (p *Pool) Release(accountID string) { } if count == 1 { delete(p.inUse, accountID) + p.notifyWaiterLocked() return } p.inUse[accountID] = count - 1 + p.notifyWaiterLocked() } func (p *Pool) Status() map[string]any { @@ -162,6 +206,8 @@ func (p *Pool) Status() map[string]any { "in_use_accounts": inUseAccounts, "max_inflight_per_account": p.maxInflightPerAccount, "recommended_concurrency": p.recommendedConcurrency, + "waiting": len(p.waiters), + "max_queue_size": p.maxQueueSize, } } @@ -188,3 +234,69 @@ func defaultRecommendedConcurrency(accountCount, maxInflightPerAccount int) int } return accountCount * maxInflightPerAccount } + +func normalizeExclude(exclude map[string]bool) map[string]bool { + if exclude == nil { + return map[string]bool{} + } + return exclude +} + +func (p *Pool) canQueueLocked(target string, exclude map[string]bool) bool { + if target != "" { + if exclude[target] { + return false + } + if _, ok := p.store.FindAccount(target); !ok { + return false + } + } + if p.maxQueueSize <= 0 { + return false + } + return len(p.waiters) < p.maxQueueSize +} + +func (p *Pool) notifyWaiterLocked() { + if len(p.waiters) == 0 { + return + } + waiter := p.waiters[0] + p.waiters = p.waiters[1:] + close(waiter) +} + +func (p *Pool) removeWaiterLocked(waiter chan struct{}) bool { + for i, w := range p.waiters { + if w != waiter { + continue + } + p.waiters = append(p.waiters[:i], p.waiters[i+1:]...) + return true + } + return false +} + +func (p *Pool) drainWaitersLocked() { + for _, waiter := range p.waiters { + close(waiter) + } + p.waiters = nil +} + +func maxQueueFromEnv(defaultSize int) int { + for _, key := range []string{"DS2API_ACCOUNT_MAX_QUEUE", "DS2API_ACCOUNT_QUEUE_SIZE"} { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + continue + } + n, err := strconv.Atoi(raw) + if err == nil && n >= 0 { + return n + } + } + if defaultSize < 0 { + return 0 + } + return defaultSize +} diff --git a/internal/account/pool_test.go b/internal/account/pool_test.go index 82c9ced..59ea32b 100644 --- a/internal/account/pool_test.go +++ b/internal/account/pool_test.go @@ -1,8 +1,10 @@ package account import ( + "context" "sync" "testing" + "time" "ds2api/internal/config" ) @@ -10,6 +12,9 @@ import ( func newPoolForTest(t *testing.T, maxInflight string) *Pool { t.Helper() t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", maxInflight) + t.Setenv("DS2API_ACCOUNT_CONCURRENCY", "") + t.Setenv("DS2API_ACCOUNT_MAX_QUEUE", "") + t.Setenv("DS2API_ACCOUNT_QUEUE_SIZE", "") t.Setenv("DS2API_CONFIG_JSON", `{ "keys":["k1"], "accounts":[ @@ -21,6 +26,33 @@ func newPoolForTest(t *testing.T, maxInflight string) *Pool { return NewPool(store) } +func newSingleAccountPoolForTest(t *testing.T, maxInflight string) *Pool { + t.Helper() + t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", maxInflight) + t.Setenv("DS2API_ACCOUNT_CONCURRENCY", "") + t.Setenv("DS2API_ACCOUNT_MAX_QUEUE", "") + t.Setenv("DS2API_ACCOUNT_QUEUE_SIZE", "") + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["k1"], + "accounts":[{"email":"acc1@example.com","token":"token1"}] + }`) + return NewPool(config.LoadStore()) +} + +func waitForWaitingCount(t *testing.T, pool *Pool, want int) { + t.Helper() + deadline := time.Now().Add(800 * time.Millisecond) + for time.Now().Before(deadline) { + status := pool.Status() + if got, ok := status["waiting"].(int); ok && got == want { + return + } + time.Sleep(10 * time.Millisecond) + } + status := pool.Status() + t.Fatalf("waiting count did not reach %d, current status=%v", want, status) +} + func TestPoolRoundRobinWithConcurrentSlots(t *testing.T) { pool := newPoolForTest(t, "2") @@ -118,6 +150,9 @@ func TestPoolStatusRecommendedConcurrencyDefault(t *testing.T) { if got, ok := status["recommended_concurrency"].(int); !ok || got != 4 { t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"]) } + if got, ok := status["max_queue_size"].(int); !ok || got != 4 { + t.Fatalf("unexpected max_queue_size: %#v", status["max_queue_size"]) + } } func TestPoolStatusRecommendedConcurrencyRespectsOverride(t *testing.T) { @@ -130,6 +165,9 @@ func TestPoolStatusRecommendedConcurrencyRespectsOverride(t *testing.T) { if got, ok := status["recommended_concurrency"].(int); !ok || got != 6 { t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"]) } + if got, ok := status["max_queue_size"].(int); !ok || got != 6 { + t.Fatalf("unexpected max_queue_size: %#v", status["max_queue_size"]) + } } func TestPoolAccountConcurrencyAliasEnv(t *testing.T) { @@ -151,4 +189,108 @@ func TestPoolAccountConcurrencyAliasEnv(t *testing.T) { if got, ok := status["recommended_concurrency"].(int); !ok || got != 8 { t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"]) } + if got, ok := status["max_queue_size"].(int); !ok || got != 8 { + t.Fatalf("unexpected max_queue_size: %#v", status["max_queue_size"]) + } +} + +func TestPoolSupportsTokenOnlyAccount(t *testing.T) { + t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", "1") + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["k1"], + "accounts":[{"token":"token-only-account"}] + }`) + + pool := NewPool(config.LoadStore()) + status := pool.Status() + if got, ok := status["total"].(int); !ok || got != 1 { + t.Fatalf("unexpected total in pool status: %#v", status["total"]) + } + if got, ok := status["available"].(int); !ok || got != 1 { + t.Fatalf("unexpected available in pool status: %#v", status["available"]) + } + + acc, ok := pool.Acquire("", nil) + if !ok { + t.Fatalf("expected acquire success for token-only account") + } + if acc.Token != "token-only-account" { + t.Fatalf("unexpected token on acquired account: %q", acc.Token) + } +} + +func TestPoolAcquireWaitQueuesAndSucceedsAfterRelease(t *testing.T) { + pool := newSingleAccountPoolForTest(t, "1") + first, ok := pool.Acquire("", nil) + if !ok { + t.Fatal("expected first acquire to succeed") + } + + type result struct { + id string + ok bool + } + resCh := make(chan result, 1) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + go func() { + acc, ok := pool.AcquireWait(ctx, "", nil) + resCh <- result{id: acc.Identifier(), ok: ok} + }() + + waitForWaitingCount(t, pool, 1) + pool.Release(first.Identifier()) + + select { + case res := <-resCh: + if !res.ok { + t.Fatal("expected queued acquire to succeed after release") + } + if res.id != "acc1@example.com" { + t.Fatalf("unexpected account id from queued acquire: %q", res.id) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for queued acquire result") + } +} + +func TestPoolAcquireWaitQueueLimitReturnsFalse(t *testing.T) { + pool := newSingleAccountPoolForTest(t, "1") + first, ok := pool.Acquire("", nil) + if !ok { + t.Fatal("expected first acquire to succeed") + } + + type result struct { + id string + ok bool + } + firstWaiter := make(chan result, 1) + ctx1, cancel1 := context.WithTimeout(context.Background(), 1200*time.Millisecond) + defer cancel1() + go func() { + acc, ok := pool.AcquireWait(ctx1, "", nil) + firstWaiter <- result{id: acc.Identifier(), ok: ok} + }() + waitForWaitingCount(t, pool, 1) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel2() + start := time.Now() + if _, ok := pool.AcquireWait(ctx2, "", nil); ok { + t.Fatal("expected second queued acquire to fail when queue is full") + } + if time.Since(start) > 120*time.Millisecond { + t.Fatalf("queue-full acquire should fail fast, took %s", time.Since(start)) + } + + pool.Release(first.Identifier()) + select { + case res := <-firstWaiter: + if !res.ok { + t.Fatal("expected first queued acquire to succeed after release") + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for first queued acquire") + } } diff --git a/internal/adapter/claude/handler.go b/internal/adapter/claude/handler.go index 391fd83..8abd829 100644 --- a/internal/adapter/claude/handler.go +++ b/internal/adapter/claude/handler.go @@ -230,19 +230,22 @@ func collectDeepSeek(resp *http.Response, thinkingEnabled bool) (string, string) func (h *Handler) writeClaudeStream(w http.ResponseWriter, r *http.Request, model string, messages []any, fullText string, detected []util.ParsedToolCall) { w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Cache-Control", "no-cache, no-transform") w.Header().Set("Connection", "keep-alive") - flusher, ok := w.(http.Flusher) - if !ok { - writeJSON(w, http.StatusInternalServerError, map[string]any{"error": map[string]any{"type": "api_error", "message": "streaming unsupported"}}) - return + w.Header().Set("X-Accel-Buffering", "no") + rc := http.NewResponseController(w) + canFlush := rc.Flush() == nil + if !canFlush { + config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered") } send := func(v any) { b, _ := json.Marshal(v) _, _ = w.Write([]byte("data: ")) _, _ = w.Write(b) _, _ = w.Write([]byte("\n\n")) - flusher.Flush() + if canFlush { + _ = rc.Flush() + } } messageID := fmt.Sprintf("msg_%d", time.Now().UnixNano()) inputTokens := util.EstimateTokens(fmt.Sprintf("%v", messages)) diff --git a/internal/adapter/openai/handler.go b/internal/adapter/openai/handler.go index d507849..78e9164 100644 --- a/internal/adapter/openai/handler.go +++ b/internal/adapter/openai/handler.go @@ -3,11 +3,17 @@ package openai import ( "bufio" "context" + "crypto/rand" + "crypto/subtle" + "encoding/hex" "encoding/json" "fmt" "io" "net/http" + "os" + "strconv" "strings" + "sync" "time" "github.com/go-chi/chi/v5" @@ -23,6 +29,25 @@ type Handler struct { Store *config.Store Auth *auth.Resolver DS *deepseek.Client + + leaseMu sync.Mutex + streamLeases map[string]streamLease +} + +type streamLease struct { + Auth *auth.RequestAuth + ExpiresAt time.Time +} + +type toolStreamSieveState struct { + pending strings.Builder + capture strings.Builder + capturing bool +} + +type toolStreamEvent struct { + Content string + ToolCalls []util.ParsedToolCall } func RegisterRoutes(r chi.Router, h *Handler) { @@ -35,6 +60,15 @@ func (h *Handler) ListModels(w http.ResponseWriter, _ *http.Request) { } func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { + if isVercelStreamReleaseRequest(r) { + h.handleVercelStreamRelease(w, r) + return + } + if isVercelStreamPrepareRequest(r) { + h.handleVercelStreamPrepare(w, r) + return + } + a, err := h.Auth.Determine(r) if err != nil { status := http.StatusUnauthorized @@ -106,6 +140,142 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { h.handleNonStream(w, r.Context(), resp, sessionID, model, finalPrompt, thinkingEnabled, searchEnabled, toolNames) } +func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Request) { + if !config.IsVercel() { + http.NotFound(w, r) + return + } + h.sweepExpiredStreamLeases() + internalSecret := vercelInternalSecret() + internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) + if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { + writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") + return + } + + a, err := h.Auth.Determine(r) + if err != nil { + status := http.StatusUnauthorized + if err == auth.ErrNoAccount { + status = http.StatusTooManyRequests + } + writeOpenAIError(w, status, err.Error()) + return + } + leased := false + defer func() { + if !leased { + h.Auth.Release(a) + } + }() + r = r.WithContext(auth.WithAuth(r.Context(), a)) + + var req map[string]any + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeOpenAIError(w, http.StatusBadRequest, "invalid json") + return + } + if !toBool(req["stream"]) { + writeOpenAIError(w, http.StatusBadRequest, "stream must be true") + return + } + if tools, ok := req["tools"].([]any); ok && len(tools) > 0 { + writeOpenAIError(w, http.StatusBadRequest, "tools are not supported by vercel stream prepare") + return + } + + model, _ := req["model"].(string) + messagesRaw, _ := req["messages"].([]any) + if model == "" || len(messagesRaw) == 0 { + writeOpenAIError(w, http.StatusBadRequest, "Request must include 'model' and 'messages'.") + return + } + thinkingEnabled, searchEnabled, ok := config.GetModelConfig(model) + if !ok { + writeOpenAIError(w, http.StatusServiceUnavailable, fmt.Sprintf("Model '%s' is not available.", model)) + return + } + + messages := normalizeMessages(messagesRaw) + finalPrompt := util.MessagesPrepare(messages) + + sessionID, err := h.DS.CreateSession(r.Context(), a, 3) + if err != nil { + if a.UseConfigToken { + writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.") + } else { + writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.") + } + return + } + powHeader, err := h.DS.GetPow(r.Context(), a, 3) + if err != nil { + writeOpenAIError(w, http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).") + return + } + if strings.TrimSpace(a.DeepSeekToken) == "" { + writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.") + return + } + + payload := map[string]any{ + "chat_session_id": sessionID, + "parent_message_id": nil, + "prompt": finalPrompt, + "ref_file_ids": []any{}, + "thinking_enabled": thinkingEnabled, + "search_enabled": searchEnabled, + } + leaseID := h.holdStreamLease(a) + if leaseID == "" { + writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease") + return + } + leased = true + writeJSON(w, http.StatusOK, map[string]any{ + "session_id": sessionID, + "lease_id": leaseID, + "model": model, + "final_prompt": finalPrompt, + "thinking_enabled": thinkingEnabled, + "search_enabled": searchEnabled, + "deepseek_token": a.DeepSeekToken, + "pow_header": powHeader, + "payload": payload, + }) +} + +func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Request) { + if !config.IsVercel() { + http.NotFound(w, r) + return + } + h.sweepExpiredStreamLeases() + internalSecret := vercelInternalSecret() + internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) + if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { + writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") + return + } + + var req map[string]any + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeOpenAIError(w, http.StatusBadRequest, "invalid json") + return + } + leaseID, _ := req["lease_id"].(string) + leaseID = strings.TrimSpace(leaseID) + if leaseID == "" { + writeOpenAIError(w, http.StatusBadRequest, "lease_id is required") + return + } + if !h.releaseStreamLease(leaseID) { + writeOpenAIError(w, http.StatusNotFound, "stream lease not found") + return + } + writeJSON(w, http.StatusOK, map[string]any{"success": true}) +} + func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { @@ -191,12 +361,13 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt return } w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Cache-Control", "no-cache, no-transform") w.Header().Set("Connection", "keep-alive") - flusher, ok := w.(http.Flusher) - if !ok { - writeOpenAIError(w, http.StatusInternalServerError, "streaming unsupported") - return + w.Header().Set("X-Accel-Buffering", "no") + rc := http.NewResponseController(w) + canFlush := rc.Flush() == nil + if !canFlush { + config.Logger.Warn("[stream] response writer does not support flush; streaming may be buffered") } lines := make(chan []byte, 128) @@ -216,6 +387,8 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt created := time.Now().Unix() firstChunkSent := false bufferToolContent := len(toolNames) > 0 + var toolSieve toolStreamSieveState + toolCallsEmitted := false currentType := "text" if thinkingEnabled { currentType = "thinking" @@ -226,27 +399,32 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt hasContent := false keepaliveTicker := time.NewTicker(time.Duration(deepseek.KeepAliveTimeout) * time.Second) defer keepaliveTicker.Stop() + keepaliveCountWithoutContent := 0 sendChunk := func(v any) { b, _ := json.Marshal(v) _, _ = w.Write([]byte("data: ")) _, _ = w.Write(b) _, _ = w.Write([]byte("\n\n")) - flusher.Flush() + if canFlush { + _ = rc.Flush() + } } sendDone := func() { _, _ = w.Write([]byte("data: [DONE]\n\n")) - flusher.Flush() + if canFlush { + _ = rc.Flush() + } } finalize := func(finishReason string) { finalThinking := thinking.String() finalText := text.String() detected := util.ParseToolCalls(finalText, toolNames) - if len(detected) > 0 { + if len(detected) > 0 && !toolCallsEmitted { finishReason = "tool_calls" delta := map[string]any{ - "tool_calls": util.FormatOpenAIToolCalls(detected), + "tool_calls": util.FormatOpenAIStreamToolCalls(detected), } if !firstChunkSent { delta["role"] = "assistant" @@ -259,21 +437,29 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt "model": model, "choices": []map[string]any{{"delta": delta, "index": 0}}, }) - } else if bufferToolContent && strings.TrimSpace(finalText) != "" { - delta := map[string]any{ - "content": finalText, + } else if bufferToolContent { + for _, evt := range flushToolSieve(&toolSieve, toolNames) { + if evt.Content == "" { + continue + } + delta := map[string]any{ + "content": evt.Content, + } + if !firstChunkSent { + delta["role"] = "assistant" + firstChunkSent = true + } + sendChunk(map[string]any{ + "id": completionID, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": []map[string]any{{"delta": delta, "index": 0}}, + }) } - if !firstChunkSent { - delta["role"] = "assistant" - firstChunkSent = true - } - sendChunk(map[string]any{ - "id": completionID, - "object": "chat.completion.chunk", - "created": created, - "model": model, - "choices": []map[string]any{{"delta": delta, "index": 0}}, - }) + } + if len(detected) > 0 || toolCallsEmitted { + finishReason = "tool_calls" } promptTokens := util.EstimateTokens(finalPrompt) reasoningTokens := util.EstimateTokens(finalThinking) @@ -301,12 +487,21 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt case <-r.Context().Done(): return case <-keepaliveTicker.C: + if !hasContent { + keepaliveCountWithoutContent++ + if keepaliveCountWithoutContent >= deepseek.MaxKeepaliveCount { + finalize("stop") + return + } + } if hasContent && time.Since(lastContent) > time.Duration(deepseek.StreamIdleTimeout)*time.Second { finalize("stop") return } - _, _ = w.Write([]byte(": keep-alive\n\n")) - flusher.Flush() + if canFlush { + _, _ = w.Write([]byte(": keep-alive\n\n")) + _ = rc.Flush() + } case line, ok := <-lines: if !ok { // Ensure scanner completion is observed only after all queued @@ -343,6 +538,7 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt } hasContent = true lastContent = time.Now() + keepaliveCountWithoutContent = 0 delta := map[string]any{} if !firstChunkSent { delta["role"] = "assistant" @@ -357,6 +553,41 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt text.WriteString(p.Text) if !bufferToolContent { delta["content"] = p.Text + } else { + events := processToolSieveChunk(&toolSieve, p.Text, toolNames) + if len(events) == 0 { + // Keep thinking delta only frame. + } + for _, evt := range events { + if len(evt.ToolCalls) > 0 { + toolCallsEmitted = true + tcDelta := map[string]any{ + "tool_calls": util.FormatOpenAIStreamToolCalls(evt.ToolCalls), + } + if !firstChunkSent { + tcDelta["role"] = "assistant" + firstChunkSent = true + } + newChoices = append(newChoices, map[string]any{ + "delta": tcDelta, + "index": 0, + }) + continue + } + if evt.Content != "" { + contentDelta := map[string]any{ + "content": evt.Content, + } + if !firstChunkSent { + contentDelta["role"] = "assistant" + firstChunkSent = true + } + newChoices = append(newChoices, map[string]any{ + "delta": contentDelta, + "index": 0, + }) + } + } } } if len(delta) > 0 { @@ -469,3 +700,344 @@ func openAIErrorType(status int) string { return "invalid_request_error" } } + +func isVercelStreamPrepareRequest(r *http.Request) bool { + if r == nil { + return false + } + return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1" +} + +func isVercelStreamReleaseRequest(r *http.Request) bool { + if r == nil { + return false + } + return strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1" +} + +func vercelInternalSecret() string { + if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" { + return v + } + if v := strings.TrimSpace(os.Getenv("DS2API_ADMIN_KEY")); v != "" { + return v + } + return "admin" +} + +func shouldEmitBufferedToolProbeContent(buffered string) bool { + trimmed := strings.TrimSpace(buffered) + if trimmed == "" { + return false + } + normalized := normalizeToolProbePrefix(trimmed) + if normalized == "" { + return false + } + first := normalized[0] + switch first { + case '{', '[', '`': + lower := strings.ToLower(normalized) + if strings.Contains(lower, "tool_calls") { + return false + } + // Keep a short hold window for JSON-ish starts to avoid leaking tool JSON. + if len([]rune(normalized)) < 20 { + return false + } + return true + default: + // Natural language starts can be streamed immediately. + return true + } +} + +func normalizeToolProbePrefix(s string) string { + t := strings.TrimSpace(s) + if strings.HasPrefix(t, "```") { + t = strings.TrimPrefix(t, "```") + t = strings.TrimSpace(t) + t = strings.TrimPrefix(strings.ToLower(t), "json") + t = strings.TrimSpace(t) + } + return t +} + +func processToolSieveChunk(state *toolStreamSieveState, chunk string, toolNames []string) []toolStreamEvent { + if state == nil || chunk == "" { + return nil + } + state.pending.WriteString(chunk) + events := make([]toolStreamEvent, 0, 2) + + for { + if state.capturing { + if state.pending.Len() > 0 { + state.capture.WriteString(state.pending.String()) + state.pending.Reset() + } + prefix, calls, suffix, ready := consumeToolCapture(state.capture.String(), toolNames) + if !ready { + break + } + state.capture.Reset() + state.capturing = false + if prefix != "" { + events = append(events, toolStreamEvent{Content: prefix}) + } + if len(calls) > 0 { + events = append(events, toolStreamEvent{ToolCalls: calls}) + } + if suffix != "" { + state.pending.WriteString(suffix) + } + continue + } + + pending := state.pending.String() + if pending == "" { + break + } + start := findToolSegmentStart(pending) + if start >= 0 { + prefix := pending[:start] + if prefix != "" { + events = append(events, toolStreamEvent{Content: prefix}) + } + state.pending.Reset() + state.capture.WriteString(pending[start:]) + state.capturing = true + continue + } + + safe, hold := splitSafeContent(pending, 64) + if safe == "" { + break + } + state.pending.Reset() + state.pending.WriteString(hold) + events = append(events, toolStreamEvent{Content: safe}) + } + + return events +} + +func flushToolSieve(state *toolStreamSieveState, toolNames []string) []toolStreamEvent { + if state == nil { + return nil + } + events := processToolSieveChunk(state, "", toolNames) + if state.capturing { + raw := state.capture.String() + state.capture.Reset() + state.capturing = false + if raw != "" { + events = append(events, toolStreamEvent{Content: raw}) + } + } + if state.pending.Len() > 0 { + events = append(events, toolStreamEvent{Content: state.pending.String()}) + state.pending.Reset() + } + return events +} + +func splitSafeContent(s string, holdRunes int) (safe, hold string) { + if s == "" || holdRunes <= 0 { + return s, "" + } + runes := []rune(s) + if len(runes) <= holdRunes { + return "", s + } + return string(runes[:len(runes)-holdRunes]), string(runes[len(runes)-holdRunes:]) +} + +func findToolSegmentStart(s string) int { + if s == "" { + return -1 + } + lower := strings.ToLower(s) + keyIdx := strings.Index(lower, "tool_calls") + if keyIdx < 0 { + return -1 + } + if start := strings.LastIndex(s[:keyIdx], "{"); start >= 0 { + return start + } + return keyIdx +} + +func consumeToolCapture(captured string, toolNames []string) (prefix string, calls []util.ParsedToolCall, suffix string, ready bool) { + if captured == "" { + return "", nil, "", false + } + lower := strings.ToLower(captured) + keyIdx := strings.Index(lower, "tool_calls") + if keyIdx < 0 { + if len([]rune(captured)) >= 256 { + return captured, nil, "", true + } + return "", nil, "", false + } + start := strings.LastIndex(captured[:keyIdx], "{") + if start < 0 { + if len([]rune(captured)) >= 512 { + return captured, nil, "", true + } + return "", nil, "", false + } + obj, end, ok := extractJSONObjectFrom(captured, start) + if !ok { + if len([]rune(captured)) >= 4096 { + return captured, nil, "", true + } + return "", nil, "", false + } + parsed := util.ParseToolCalls(obj, toolNames) + if len(parsed) == 0 { + return captured[:end], nil, captured[end:], true + } + return captured[:start], parsed, captured[end:], true +} + +func extractJSONObjectFrom(text string, start int) (string, int, bool) { + if start < 0 || start >= len(text) || text[start] != '{' { + return "", 0, false + } + depth := 0 + quote := byte(0) + escaped := false + for i := start; i < len(text); i++ { + ch := text[i] + if quote != 0 { + if escaped { + escaped = false + continue + } + if ch == '\\' { + escaped = true + continue + } + if ch == quote { + quote = 0 + } + continue + } + if ch == '"' || ch == '\'' { + quote = ch + continue + } + if ch == '{' { + depth++ + continue + } + if ch == '}' { + depth-- + if depth == 0 { + end := i + 1 + return text[start:end], end, true + } + } + } + return "", 0, false +} + +func (h *Handler) holdStreamLease(a *auth.RequestAuth) string { + if a == nil { + return "" + } + now := time.Now() + ttl := streamLeaseTTL() + if ttl <= 0 { + ttl = 15 * time.Minute + } + + h.leaseMu.Lock() + expired := h.popExpiredLeasesLocked(now) + if h.streamLeases == nil { + h.streamLeases = make(map[string]streamLease) + } + leaseID := newLeaseID() + h.streamLeases[leaseID] = streamLease{ + Auth: a, + ExpiresAt: now.Add(ttl), + } + h.leaseMu.Unlock() + h.releaseExpiredAuths(expired) + return leaseID +} + +func (h *Handler) releaseStreamLease(leaseID string) bool { + leaseID = strings.TrimSpace(leaseID) + if leaseID == "" { + return false + } + + h.leaseMu.Lock() + expired := h.popExpiredLeasesLocked(time.Now()) + lease, ok := h.streamLeases[leaseID] + if ok { + delete(h.streamLeases, leaseID) + } + h.leaseMu.Unlock() + h.releaseExpiredAuths(expired) + + if !ok { + return false + } + if h.Auth != nil { + h.Auth.Release(lease.Auth) + } + return true +} + +func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth { + if len(h.streamLeases) == 0 { + return nil + } + expired := make([]*auth.RequestAuth, 0) + for leaseID, lease := range h.streamLeases { + if now.After(lease.ExpiresAt) { + delete(h.streamLeases, leaseID) + expired = append(expired, lease.Auth) + } + } + return expired +} + +func (h *Handler) releaseExpiredAuths(expired []*auth.RequestAuth) { + if h.Auth == nil || len(expired) == 0 { + return + } + for _, a := range expired { + h.Auth.Release(a) + } +} + +func (h *Handler) sweepExpiredStreamLeases() { + h.leaseMu.Lock() + expired := h.popExpiredLeasesLocked(time.Now()) + h.leaseMu.Unlock() + h.releaseExpiredAuths(expired) +} + +func streamLeaseTTL() time.Duration { + raw := strings.TrimSpace(os.Getenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS")) + if raw == "" { + return 15 * time.Minute + } + seconds, err := strconv.Atoi(raw) + if err != nil || seconds <= 0 { + return 15 * time.Minute + } + return time.Duration(seconds) * time.Second +} + +func newLeaseID() string { + buf := make([]byte, 16) + if _, err := rand.Read(buf); err == nil { + return hex.EncodeToString(buf) + } + return fmt.Sprintf("lease-%d", time.Now().UnixNano()) +} diff --git a/internal/adapter/openai/handler_toolcall_test.go b/internal/adapter/openai/handler_toolcall_test.go index df39d51..e3cfc7d 100644 --- a/internal/adapter/openai/handler_toolcall_test.go +++ b/internal/adapter/openai/handler_toolcall_test.go @@ -209,6 +209,24 @@ func TestHandleStreamToolCallInterceptsWithoutRawContentLeak(t *testing.T) { if !streamHasToolCallsDelta(frames) { t.Fatalf("expected tool_calls delta, body=%s", rec.Body.String()) } + foundToolIndex := false + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + for _, item := range choices { + choice, _ := item.(map[string]any) + delta, _ := choice["delta"].(map[string]any) + toolCalls, _ := delta["tool_calls"].([]any) + for _, tc := range toolCalls { + tcm, _ := tc.(map[string]any) + if _, ok := tcm["index"].(float64); ok { + foundToolIndex = true + } + } + } + } + if !foundToolIndex { + t.Fatalf("expected stream tool_calls item with index, body=%s", rec.Body.String()) + } if streamHasRawToolJSONContent(frames) { t.Fatalf("raw tool_calls JSON leaked in content delta: %s", rec.Body.String()) } @@ -236,6 +254,24 @@ func TestHandleStreamReasonerToolCallInterceptsWithoutRawContentLeak(t *testing. if !streamHasToolCallsDelta(frames) { t.Fatalf("expected tool_calls delta, body=%s", rec.Body.String()) } + foundToolIndex := false + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + for _, item := range choices { + choice, _ := item.(map[string]any) + delta, _ := choice["delta"].(map[string]any) + toolCalls, _ := delta["tool_calls"].([]any) + for _, tc := range toolCalls { + tcm, _ := tc.(map[string]any) + if _, ok := tcm["index"].(float64); ok { + foundToolIndex = true + } + } + } + } + if !foundToolIndex { + t.Fatalf("expected stream tool_calls item with index, body=%s", rec.Body.String()) + } if streamHasRawToolJSONContent(frames) { t.Fatalf("raw tool_calls JSON leaked in content delta: %s", rec.Body.String()) } @@ -277,7 +313,106 @@ func TestHandleStreamUnknownToolStillIntercepted(t *testing.T) { if !streamHasToolCallsDelta(frames) { t.Fatalf("expected tool_calls delta, body=%s", rec.Body.String()) } + foundToolIndex := false + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + for _, item := range choices { + choice, _ := item.(map[string]any) + delta, _ := choice["delta"].(map[string]any) + toolCalls, _ := delta["tool_calls"].([]any) + for _, tc := range toolCalls { + tcm, _ := tc.(map[string]any) + if _, ok := tcm["index"].(float64); ok { + foundToolIndex = true + } + } + } + } + if !foundToolIndex { + t.Fatalf("expected stream tool_calls item with index, body=%s", rec.Body.String()) + } if streamHasRawToolJSONContent(frames) { t.Fatalf("raw tool_calls JSON leaked in content delta: %s", rec.Body.String()) } } + +func TestHandleStreamToolsPlainTextStreamsBeforeFinish(t *testing.T) { + h := &Handler{} + resp := makeSSEHTTPResponse( + `data: {"p":"response/content","v":"你好,"}`, + `data: {"p":"response/content","v":"这是普通文本回复。"}`, + `data: [DONE]`, + ) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + + h.handleStream(rec, req, resp, "cid6", "deepseek-chat", "prompt", false, false, []string{"search"}) + + frames, done := parseSSEDataFrames(t, rec.Body.String()) + if !done { + t.Fatalf("expected [DONE], body=%s", rec.Body.String()) + } + if streamHasToolCallsDelta(frames) { + t.Fatalf("did not expect tool_calls delta for plain text: %s", rec.Body.String()) + } + content := strings.Builder{} + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + for _, item := range choices { + choice, _ := item.(map[string]any) + delta, _ := choice["delta"].(map[string]any) + if c, ok := delta["content"].(string); ok { + content.WriteString(c) + } + } + } + if got := content.String(); got == "" { + t.Fatalf("expected streamed content in tool mode plain text, body=%s", rec.Body.String()) + } + if streamFinishReason(frames) != "stop" { + t.Fatalf("expected finish_reason=stop, body=%s", rec.Body.String()) + } +} + +func TestHandleStreamToolCallMixedWithPlainTextSegments(t *testing.T) { + h := &Handler{} + resp := makeSSEHTTPResponse( + `data: {"p":"response/content","v":"前置正文A。"}`, + `data: {"p":"response/content","v":"{\"tool_calls\":[{\"name\":\"search\",\"input\":{\"q\":\"go\"}}]}"}`, + `data: {"p":"response/content","v":"后置正文B。"}`, + `data: [DONE]`, + ) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + + h.handleStream(rec, req, resp, "cid7", "deepseek-chat", "prompt", false, false, []string{"search"}) + + frames, done := parseSSEDataFrames(t, rec.Body.String()) + if !done { + t.Fatalf("expected [DONE], body=%s", rec.Body.String()) + } + if !streamHasToolCallsDelta(frames) { + t.Fatalf("expected tool_calls delta in mixed stream, body=%s", rec.Body.String()) + } + if streamHasRawToolJSONContent(frames) { + t.Fatalf("raw tool_calls JSON leaked in mixed stream: %s", rec.Body.String()) + } + content := strings.Builder{} + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + for _, item := range choices { + choice, _ := item.(map[string]any) + delta, _ := choice["delta"].(map[string]any) + if c, ok := delta["content"].(string); ok { + content.WriteString(c) + } + } + } + got := content.String() + if !strings.Contains(got, "前置正文A。") || !strings.Contains(got, "后置正文B。") { + t.Fatalf("expected pre/post plain text to pass sieve, got=%q", got) + } + if streamFinishReason(frames) != "tool_calls" { + t.Fatalf("expected finish_reason=tool_calls, body=%s", rec.Body.String()) + } +} diff --git a/internal/adapter/openai/vercel_prepare_test.go b/internal/adapter/openai/vercel_prepare_test.go new file mode 100644 index 0000000..0dfaf28 --- /dev/null +++ b/internal/adapter/openai/vercel_prepare_test.go @@ -0,0 +1,83 @@ +package openai + +import ( + "ds2api/internal/auth" + "net/http/httptest" + "testing" + "time" +) + +func TestIsVercelStreamPrepareRequest(t *testing.T) { + req := httptest.NewRequest("POST", "/v1/chat/completions?__stream_prepare=1", nil) + if !isVercelStreamPrepareRequest(req) { + t.Fatalf("expected prepare request to be detected") + } + + req2 := httptest.NewRequest("POST", "/v1/chat/completions", nil) + if isVercelStreamPrepareRequest(req2) { + t.Fatalf("expected non-prepare request") + } +} + +func TestIsVercelStreamReleaseRequest(t *testing.T) { + req := httptest.NewRequest("POST", "/v1/chat/completions?__stream_release=1", nil) + if !isVercelStreamReleaseRequest(req) { + t.Fatalf("expected release request to be detected") + } + + req2 := httptest.NewRequest("POST", "/v1/chat/completions", nil) + if isVercelStreamReleaseRequest(req2) { + t.Fatalf("expected non-release request") + } +} + +func TestVercelInternalSecret(t *testing.T) { + t.Run("prefer explicit secret", func(t *testing.T) { + t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret") + t.Setenv("DS2API_ADMIN_KEY", "admin-fallback") + if got := vercelInternalSecret(); got != "stream-secret" { + t.Fatalf("expected explicit secret, got %q", got) + } + }) + + t.Run("fallback to admin key", func(t *testing.T) { + t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "") + t.Setenv("DS2API_ADMIN_KEY", "admin-fallback") + if got := vercelInternalSecret(); got != "admin-fallback" { + t.Fatalf("expected admin key fallback, got %q", got) + } + }) + + t.Run("default admin when env missing", func(t *testing.T) { + t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "") + t.Setenv("DS2API_ADMIN_KEY", "") + if got := vercelInternalSecret(); got != "admin" { + t.Fatalf("expected default admin fallback, got %q", got) + } + }) +} + +func TestStreamLeaseLifecycle(t *testing.T) { + h := &Handler{} + leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false}) + if leaseID == "" { + t.Fatalf("expected non-empty lease id") + } + if ok := h.releaseStreamLease(leaseID); !ok { + t.Fatalf("expected lease release success") + } + if ok := h.releaseStreamLease(leaseID); ok { + t.Fatalf("expected duplicate release to fail") + } +} + +func TestStreamLeaseTTL(t *testing.T) { + t.Setenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS", "120") + if got := streamLeaseTTL(); got != 120*time.Second { + t.Fatalf("expected ttl=120s, got %v", got) + } + t.Setenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS", "invalid") + if got := streamLeaseTTL(); got != 15*time.Minute { + t.Fatalf("expected default ttl on invalid value, got %v", got) + } +} diff --git a/internal/admin/handler.go b/internal/admin/handler.go index 6985fba..1ca4378 100644 --- a/internal/admin/handler.go +++ b/internal/admin/handler.go @@ -873,7 +873,20 @@ func toStringSlice(v any) ([]string, bool) { } func toAccount(m map[string]any) config.Account { - return config.Account{Email: strings.TrimSpace(fmt.Sprintf("%v", m["email"])), Mobile: strings.TrimSpace(fmt.Sprintf("%v", m["mobile"])), Password: strings.TrimSpace(fmt.Sprintf("%v", m["password"])), Token: strings.TrimSpace(fmt.Sprintf("%v", m["token"]))} + return config.Account{ + Email: fieldString(m, "email"), + Mobile: fieldString(m, "mobile"), + Password: fieldString(m, "password"), + Token: fieldString(m, "token"), + } +} + +func fieldString(m map[string]any, key string) string { + v, ok := m[key] + if !ok || v == nil { + return "" + } + return strings.TrimSpace(fmt.Sprintf("%v", v)) } func statusOr(v int, d int) int { diff --git a/internal/admin/handler_test.go b/internal/admin/handler_test.go new file mode 100644 index 0000000..b0df9e3 --- /dev/null +++ b/internal/admin/handler_test.go @@ -0,0 +1,28 @@ +package admin + +import "testing" + +func TestToAccountMissingFieldsRemainEmpty(t *testing.T) { + acc := toAccount(map[string]any{ + "email": "user@example.com", + "password": "secret", + }) + if acc.Email != "user@example.com" { + t.Fatalf("unexpected email: %q", acc.Email) + } + if acc.Mobile != "" { + t.Fatalf("expected empty mobile, got %q", acc.Mobile) + } + if acc.Token != "" { + t.Fatalf("expected empty token, got %q", acc.Token) + } +} + +func TestFieldStringNilToEmpty(t *testing.T) { + if got := fieldString(map[string]any{"token": nil}, "token"); got != "" { + t.Fatalf("expected empty string for nil field, got %q", got) + } + if got := fieldString(map[string]any{}, "token"); got != "" { + t.Fatalf("expected empty string for missing field, got %q", got) + } +} diff --git a/internal/auth/request.go b/internal/auth/request.go index d36f4b1..ea3d7f1 100644 --- a/internal/auth/request.go +++ b/internal/auth/request.go @@ -50,7 +50,7 @@ func (r *Resolver) Determine(req *http.Request) (*RequestAuth, error) { return &RequestAuth{UseConfigToken: false, DeepSeekToken: callerKey, resolver: r, TriedAccounts: map[string]bool{}}, nil } target := strings.TrimSpace(req.Header.Get("X-Ds2-Target-Account")) - acc, ok := r.Pool.Acquire(target, nil) + acc, ok := r.Pool.AcquireWait(ctx, target, nil) if !ok { return nil, ErrNoAccount } diff --git a/internal/config/config.go b/internal/config/config.go index e7b2d62..c1f25ad 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,7 +1,9 @@ package config import ( + "crypto/sha256" "encoding/base64" + "encoding/hex" "encoding/json" "errors" "log/slog" @@ -41,7 +43,17 @@ func (a Account) Identifier() string { if strings.TrimSpace(a.Email) != "" { return strings.TrimSpace(a.Email) } - return strings.TrimSpace(a.Mobile) + if strings.TrimSpace(a.Mobile) != "" { + return strings.TrimSpace(a.Mobile) + } + // Backward compatibility: old configs may contain token-only accounts. + // Use a stable non-sensitive synthetic id so they can still join the pool. + token := strings.TrimSpace(a.Token) + if token == "" { + return "" + } + sum := sha256.Sum256([]byte(token)) + return "token:" + hex.EncodeToString(sum[:8]) } type Config struct { diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..1f22cd4 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,41 @@ +package config + +import ( + "strings" + "testing" +) + +func TestAccountIdentifierFallsBackToTokenHash(t *testing.T) { + acc := Account{Token: "example-token-value"} + id := acc.Identifier() + if !strings.HasPrefix(id, "token:") { + t.Fatalf("expected token-prefixed identifier, got %q", id) + } + if len(id) != len("token:")+16 { + t.Fatalf("unexpected identifier length: %d (%q)", len(id), id) + } +} + +func TestStoreFindAccountWithTokenOnlyIdentifier(t *testing.T) { + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["k1"], + "accounts":[{"token":"token-only-account"}] + }`) + + store := LoadStore() + accounts := store.Accounts() + if len(accounts) != 1 { + t.Fatalf("expected 1 account, got %d", len(accounts)) + } + id := accounts[0].Identifier() + if id == "" { + t.Fatalf("expected synthetic identifier for token-only account") + } + found, ok := store.FindAccount(id) + if !ok { + t.Fatalf("expected FindAccount to locate token-only account by synthetic id") + } + if found.Token != "token-only-account" { + t.Fatalf("unexpected token value: %q", found.Token) + } +} diff --git a/internal/deepseek/assets/sha3_wasm_bg.7b9ca65ddd.wasm b/internal/deepseek/assets/sha3_wasm_bg.7b9ca65ddd.wasm new file mode 100644 index 0000000..ac92b1d Binary files /dev/null and b/internal/deepseek/assets/sha3_wasm_bg.7b9ca65ddd.wasm differ diff --git a/internal/deepseek/embedded_pow.go b/internal/deepseek/embedded_pow.go new file mode 100644 index 0000000..13dd05a --- /dev/null +++ b/internal/deepseek/embedded_pow.go @@ -0,0 +1,6 @@ +package deepseek + +import _ "embed" + +//go:embed assets/sha3_wasm_bg.7b9ca65ddd.wasm +var embeddedWASM []byte diff --git a/internal/deepseek/pow.go b/internal/deepseek/pow.go index f2d1982..5ad0089 100644 --- a/internal/deepseek/pow.go +++ b/internal/deepseek/pow.go @@ -33,8 +33,11 @@ func (p *PowSolver) init(ctx context.Context) error { p.once.Do(func() { wasmBytes, err := os.ReadFile(p.wasmPath) if err != nil { - p.err = err - return + if len(embeddedWASM) == 0 { + p.err = err + return + } + wasmBytes = embeddedWASM } p.runtime = wazero.NewRuntime(ctx) p.compiled, p.err = p.runtime.CompileModule(ctx, wasmBytes) diff --git a/internal/util/toolcalls.go b/internal/util/toolcalls.go index a594a6a..9b9d4e6 100644 --- a/internal/util/toolcalls.go +++ b/internal/util/toolcalls.go @@ -298,3 +298,20 @@ func FormatOpenAIToolCalls(calls []ParsedToolCall) []map[string]any { } return out } + +func FormatOpenAIStreamToolCalls(calls []ParsedToolCall) []map[string]any { + out := make([]map[string]any, 0, len(calls)) + for i, c := range calls { + args, _ := json.Marshal(c.Input) + out = append(out, map[string]any{ + "index": i, + "id": "call_" + strings.ReplaceAll(uuid.NewString(), "-", ""), + "type": "function", + "function": map[string]any{ + "name": c.Name, + "arguments": string(args), + }, + }) + } + return out +} diff --git a/internal/webui/build.go b/internal/webui/build.go new file mode 100644 index 0000000..e1b6030 --- /dev/null +++ b/internal/webui/build.go @@ -0,0 +1,103 @@ +package webui + +import ( + "context" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "ds2api/internal/config" +) + +const ( + defaultBuildTimeout = 5 * time.Minute +) + +func EnsureBuiltOnStartup() { + if !shouldAutoBuild() { + return + } + staticDir := resolveStaticAdminDir(config.StaticAdminDir()) + if hasBuiltUI(staticDir) { + return + } + if err := buildWebUI(staticDir); err != nil { + config.Logger.Warn("[webui] auto build failed", "error", err) + return + } + if hasBuiltUI(staticDir) { + config.Logger.Info("[webui] auto build completed", "dir", staticDir) + return + } + config.Logger.Warn("[webui] auto build finished but output missing", "dir", staticDir) +} + +func shouldAutoBuild() bool { + raw := strings.TrimSpace(os.Getenv("DS2API_AUTO_BUILD_WEBUI")) + if raw == "" { + return !config.IsVercel() + } + switch strings.ToLower(raw) { + case "1", "true", "yes", "on": + return true + case "0", "false", "no", "off": + return false + default: + return !config.IsVercel() + } +} + +func hasBuiltUI(staticDir string) bool { + if strings.TrimSpace(staticDir) == "" { + return false + } + indexPath := filepath.Join(staticDir, "index.html") + st, err := os.Stat(indexPath) + return err == nil && !st.IsDir() +} + +func buildWebUI(staticDir string) error { + if _, err := exec.LookPath("npm"); err != nil { + return fmt.Errorf("npm not found in PATH: %w", err) + } + if strings.TrimSpace(staticDir) == "" { + return errors.New("static admin dir is empty") + } + + config.Logger.Info("[webui] static files missing, running npm build") + ctx, cancel := context.WithTimeout(context.Background(), defaultBuildTimeout) + defer cancel() + + if _, err := os.Stat(filepath.Join("webui", "node_modules")); err != nil { + if !os.IsNotExist(err) { + return err + } + installCmd := exec.CommandContext(ctx, "npm", "ci", "--prefix", "webui") + installCmd.Stdout = os.Stdout + installCmd.Stderr = os.Stderr + if err := installCmd.Run(); err != nil { + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return fmt.Errorf("webui npm ci timed out after %s", defaultBuildTimeout) + } + return err + } + } + + if err := os.MkdirAll(staticDir, 0o755); err != nil { + return err + } + cmd := exec.CommandContext(ctx, "npm", "run", "build", "--prefix", "webui", "--", "--outDir", staticDir, "--emptyOutDir") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + return fmt.Errorf("webui build timed out after %s", defaultBuildTimeout) + } + return err + } + return nil +} diff --git a/internal/webui/handler.go b/internal/webui/handler.go index f8120a6..ade79e4 100644 --- a/internal/webui/handler.go +++ b/internal/webui/handler.go @@ -21,7 +21,7 @@ type Handler struct { } func NewHandler() *Handler { - return &Handler{StaticDir: config.StaticAdminDir()} + return &Handler{StaticDir: resolveStaticAdminDir(config.StaticAdminDir())} } func RegisterRoutes(r chi.Router, h *Handler) { @@ -47,15 +47,20 @@ func (h *Handler) index(w http.ResponseWriter, _ *http.Request) { } func (h *Handler) admin(w http.ResponseWriter, r *http.Request) { - if fi, err := os.Stat(h.StaticDir); err != nil || !fi.IsDir() { - http.Error(w, "WebUI not built. Run `cd webui && npm run build` first.", http.StatusNotFound) + staticDir := resolveStaticAdminDir(h.StaticDir) + if fi, err := os.Stat(staticDir); err == nil && fi.IsDir() { + h.serveFromDisk(w, r, staticDir) return } + http.Error(w, "WebUI not built. Run `cd webui && npm run build` first.", http.StatusNotFound) +} + +func (h *Handler) serveFromDisk(w http.ResponseWriter, r *http.Request, staticDir string) { path := strings.TrimPrefix(r.URL.Path, "/admin") path = strings.TrimPrefix(path, "/") if path != "" && strings.Contains(path, ".") { - full := filepath.Join(h.StaticDir, filepath.Clean(path)) - if !strings.HasPrefix(full, h.StaticDir) { + full := filepath.Join(staticDir, filepath.Clean(path)) + if !strings.HasPrefix(full, staticDir) { http.NotFound(w, r) return } @@ -71,7 +76,7 @@ func (h *Handler) admin(w http.ResponseWriter, r *http.Request) { http.NotFound(w, r) return } - index := filepath.Join(h.StaticDir, "index.html") + index := filepath.Join(staticDir, "index.html") if _, err := os.Stat(index); err != nil { http.Error(w, "index.html not found", http.StatusNotFound) return @@ -79,3 +84,38 @@ func (h *Handler) admin(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "no-store, must-revalidate") http.ServeFile(w, r, index) } + +func resolveStaticAdminDir(preferred string) string { + if strings.TrimSpace(os.Getenv("DS2API_STATIC_ADMIN_DIR")) != "" { + return filepath.Clean(preferred) + } + candidates := []string{preferred} + if wd, err := os.Getwd(); err == nil { + candidates = append(candidates, filepath.Join(wd, "static/admin")) + } + if exe, err := os.Executable(); err == nil { + exeDir := filepath.Dir(exe) + candidates = append(candidates, + filepath.Join(exeDir, "static/admin"), + filepath.Join(filepath.Dir(exeDir), "static/admin"), + ) + } + // Common serverless locations. + candidates = append(candidates, "/var/task/static/admin", "/var/task/user/static/admin") + + seen := map[string]struct{}{} + for _, c := range candidates { + c = filepath.Clean(strings.TrimSpace(c)) + if c == "" { + continue + } + if _, ok := seen[c]; ok { + continue + } + seen[c] = struct{}{} + if fi, err := os.Stat(c); err == nil && fi.IsDir() { + return c + } + } + return filepath.Clean(preferred) +} diff --git a/vercel.json b/vercel.json index 6558561..2e68a94 100644 --- a/vercel.json +++ b/vercel.json @@ -1,6 +1,83 @@ { "version": 2, + "buildCommand": "npm ci --prefix webui && npm run build --prefix webui", + "outputDirectory": "static", + "functions": { + "api/chat-stream.js": { + "includeFiles": "**/sha3_wasm_bg.7b9ca65ddd.wasm", + "maxDuration": 300 + }, + "api/index.go": { + "maxDuration": 300 + } + }, "rewrites": [ + { + "source": "/v1/chat/completions", + "has": [ + { + "type": "query", + "key": "__go" + } + ], + "destination": "/api/index" + }, + { + "source": "/v1/chat/completions", + "destination": "/api/chat-stream" + }, + { + "source": "/admin/login", + "destination": "/api/index" + }, + { + "source": "/admin/verify", + "destination": "/api/index" + }, + { + "source": "/admin/config", + "destination": "/api/index" + }, + { + "source": "/admin/keys(.*)", + "destination": "/api/index" + }, + { + "source": "/admin/accounts(.*)", + "destination": "/api/index" + }, + { + "source": "/admin/queue/status", + "destination": "/api/index" + }, + { + "source": "/admin/import", + "destination": "/api/index" + }, + { + "source": "/admin/test", + "destination": "/api/index" + }, + { + "source": "/admin/vercel/(.*)", + "destination": "/api/index" + }, + { + "source": "/admin/export", + "destination": "/api/index" + }, + { + "source": "/admin", + "destination": "/admin/index.html" + }, + { + "source": "/admin/assets/(.*)", + "destination": "/admin/assets/$1" + }, + { + "source": "/admin/(.*)", + "destination": "/admin/index.html" + }, { "source": "/(.*)", "destination": "/api/index" diff --git a/webui/src/components/ApiTester.jsx b/webui/src/components/ApiTester.jsx index d240f4b..7d49982 100644 --- a/webui/src/components/ApiTester.jsx +++ b/webui/src/components/ApiTester.jsx @@ -89,6 +89,8 @@ export default function ApiTester({ config, onMessage, authFetch }) { const runTest = async () => { if (loading) return + const startedAt = Date.now() + setLoading(true) setIsStreaming(true) setResponse(null) @@ -113,7 +115,8 @@ export default function ApiTester({ config, onMessage, authFetch }) { headers['X-Ds2-Target-Account'] = selectedAccount } - const res = await fetch('/v1/chat/completions', { + const endpoint = streamingMode ? '/v1/chat/completions' : '/v1/chat/completions?__go=1' + const res = await fetch(endpoint, { method: 'POST', headers, body: JSON.stringify({ @@ -175,7 +178,8 @@ export default function ApiTester({ config, onMessage, authFetch }) { } else { const data = await res.json() setResponse({ success: true, status_code: res.status, ...data }) - onMessage('success', t('apiTester.testSuccess', { account: selectedAccount || 'Auto', time: 'N/A' })) + const elapsed = Math.max(0, Date.now() - startedAt) + onMessage('success', t('apiTester.testSuccess', { account: selectedAccount || 'Auto', time: elapsed })) } } catch (e) { if (e.name === 'AbortError') {