Merge pull request #25 from CJackHwang/dev

Continue fixing issues
This commit is contained in:
CJACK.
2026-02-17 00:47:07 +08:00
committed by GitHub
31 changed files with 2759 additions and 69 deletions


@@ -16,9 +16,17 @@ LOG_LEVEL=INFO
# Recommended client concurrency is calculated dynamically as:
# account_count * DS2API_ACCOUNT_MAX_INFLIGHT
# So by default it is account_count * 2.
# Requests beyond inflight slots enter a waiting queue first.
# Default queue size equals recommended concurrency, so 429 starts after:
# account_count * DS2API_ACCOUNT_MAX_INFLIGHT * 2
# Alias: DS2API_ACCOUNT_CONCURRENCY
# DS2API_ACCOUNT_MAX_INFLIGHT=2
# Optional waiting queue size override for managed-key mode.
# Default: recommended_concurrency (same as account_count * inflight_limit)
# Alias: DS2API_ACCOUNT_QUEUE_SIZE
# DS2API_ACCOUNT_MAX_QUEUE=10
# ---------------------------------------------------------------
# Admin auth
# ---------------------------------------------------------------
@@ -54,9 +62,29 @@ DS2API_ADMIN_KEY=admin
# Built admin static assets directory
# DS2API_STATIC_ADMIN_DIR=static/admin
# Auto-build WebUI on startup when static/admin is missing.
# Default: enabled on local/Docker, disabled on Vercel.
# DS2API_AUTO_BUILD_WEBUI=true
# Internal auth secret used by the Vercel hybrid streaming path
# (Go prepare endpoint <-> Node stream function).
# Optional: falls back to DS2API_ADMIN_KEY when unset.
# DS2API_VERCEL_INTERNAL_SECRET=change-me
# Stream lease TTL seconds for Vercel hybrid streaming.
# During this window, the managed account stays occupied until Node calls release.
# Default: 900 (15 minutes)
# DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS=900
# ---------------------------------------------------------------
# Vercel sync integration (optional)
# ---------------------------------------------------------------
# VERCEL_TOKEN=your-vercel-token
# VERCEL_PROJECT_ID=prj_xxxxxxxxxxxx
# VERCEL_TEAM_ID=team_xxxxxxxxxxxx
# Optional: Vercel deployment protection bypass secret.
# If deployment protection is enabled, DS2API will use this value as
# x-vercel-protection-bypass for internal Node->Go calls on Vercel.
# You can also use VERCEL_AUTOMATION_BYPASS_SECRET directly.
# DS2API_VERCEL_PROTECTION_BYPASS=your-bypass-secret

.gitignore

@@ -64,8 +64,8 @@ pnpm-lock.yaml
*.tsbuildinfo
.cache/
.parcel-cache/
static/admin/*
!static/admin/.gitkeep
static/admin/
internal/webui/assets/admin/
# Environment
.env.local


@@ -167,6 +167,7 @@ When `tools` is present, DS2API injects a tool prompt and parses tool-call paylo
- Non-stream: if detected, returns `message.tool_calls`, `finish_reason=tool_calls`, and `message.content=null`
- Stream: to avoid leaking raw tool-call JSON, DS2API buffers text first; if tool call is detected, only structured `delta.tool_calls` is emitted
- Stream `delta.tool_calls` is strict-client compatible: each tool call object includes `index` (starting from `0`)
Tool-call response example:
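A minimal illustration of the non-stream shape described above; the tool name, id, and arguments are placeholder values, not output from DS2API itself:

```json
{
  "choices": [
    {
      "index": 0,
      "finish_reason": "tool_calls",
      "message": {
        "role": "assistant",
        "content": null,
        "tool_calls": [
          {
            "id": "call_0",
            "type": "function",
            "function": { "name": "get_weather", "arguments": "{\"city\":\"Paris\"}" }
          }
        ]
      }
    }
  ]
}
```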

API.md

@@ -171,6 +171,7 @@ data: [DONE]
- Non-stream: if a tool call is detected, returns `message.tool_calls` and sets `finish_reason=tool_calls` and `message.content=null`
- Stream: to prevent raw tool-call JSON from leaking, the body is buffered first; if a tool call is detected, only structured `delta.tool_calls` is emitted
- Stream `delta.tool_calls` is strict-client compatible: each tool call object carries `index` (starting from `0`)
Tool-call response example:


@@ -35,6 +35,9 @@ Build WebUI if `/admin` reports missing assets:
```bash
./scripts/build-webui.sh
# Or rely on startup auto-build (enabled locally by default)
# DS2API_AUTO_BUILD_WEBUI=true go run ./cmd/ds2api
```
## 2. Docker Deployment
@@ -62,7 +65,13 @@ Notes:
- Serverless entry: `api/index.go`
- Rewrites and cache headers: `vercel.json`
- Legacy `builds` has been removed to avoid the `unused-build-settings` warning
- Build stage runs `npm ci --prefix webui && npm run build --prefix webui` automatically
- `vercel.json` routes `/admin/assets/*` and the `/admin` page to static output, while `/admin/*` APIs still go to `api/index`
- To mitigate Go Runtime streaming buffering, `/v1/chat/completions` on Vercel is routed to `api/chat-stream.js` (Node Runtime)
- `api/chat-stream.js` automatically falls back to the Go entry for non-stream requests or requests with `tools` (internal `__go=1`)
- `api/chat-stream.js` is data-path only (stream relay + SSE conversion); auth/account/session/PoW preparation still comes from an internal Go prepare endpoint (enabled on Vercel only)
- Go prepare creates a stream lease and Node releases it when streaming ends, keeping account occupancy semantics aligned with native Go streaming
- `vercel.json` sets `maxDuration: 300` for both `api/chat-stream.js` and `api/index.go` (subject to your Vercel plan limits)
Minimum environment variables:
@@ -76,8 +85,17 @@ Optional:
- `VERCEL_TEAM_ID`
- `DS2API_ACCOUNT_MAX_INFLIGHT` (per-account inflight limit, default `2`)
- `DS2API_ACCOUNT_CONCURRENCY` (alias of the same setting)
- `DS2API_ACCOUNT_MAX_QUEUE` (waiting queue limit, default=`recommended_concurrency`)
- `DS2API_ACCOUNT_QUEUE_SIZE` (alias of the same setting)
- `DS2API_VERCEL_INTERNAL_SECRET` (optional internal auth secret for Vercel hybrid streaming path; falls back to `DS2API_ADMIN_KEY` when unset)
- `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` (optional stream lease TTL in seconds, default `900`)
Recommended concurrency is computed dynamically as `account_count * per_account_inflight_limit` (default is `account_count * 2`).
When inflight slots are full, requests are queued first; with default queue size, 429 typically starts around `account_count * 4`.
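As a rough sketch of the sizing rules above (the authoritative logic lives in `internal/account`; this is illustrative only):

```go
package main

import "fmt"

// recommendedConcurrency mirrors the documented default:
// account_count * per_account_inflight_limit.
func recommendedConcurrency(accounts, maxInflight int) int {
	return accounts * maxInflight
}

// rejectThreshold is the default point where 429 starts:
// inflight capacity plus a waiting queue of equal size.
func rejectThreshold(accounts, maxInflight int) int {
	capacity := recommendedConcurrency(accounts, maxInflight)
	return capacity * 2 // default queue size == recommended concurrency
}

func main() {
	// With 3 accounts and the default DS2API_ACCOUNT_MAX_INFLIGHT=2:
	fmt.Println(recommendedConcurrency(3, 2)) // 6
	fmt.Println(rejectThreshold(3, 2))        // 12
}
```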
Notes:
- `static/admin` build output is not committed
- Vercel/Docker generate WebUI assets during build
After deploy, verify:
@@ -94,6 +112,60 @@ This repo includes `.github/workflows/release-artifacts.yml`:
- Builds Linux/macOS/Windows archives and uploads them to Release Assets
- Generates `sha256sums.txt` for integrity checks
## 3.2 Vercel Build Troubleshooting
If you see an error like:
```text
Error: Command failed: go build -ldflags -s -w -o .../bootstrap .../main__vc__go__.go
```
it is usually caused by invalid Go build flag settings in Vercel
(`-ldflags` not passed as a single argument).
How to fix:
1. Open Vercel Project Settings -> Build and Development Settings
2. Clear custom Go Build Flags / Build Command (recommended)
3. If ldflags must be used, set `-ldflags=\"-s -w\"` so it is passed as one argument
4. Ensure `go.mod` uses a supported version (this repo uses `go 1.24`)
5. Redeploy (preferably with cache cleared)
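The quoting issue in step 3 can be seen without the Go toolchain; this sketch only shows how the shell splits the flag into arguments:

```shell
# Unquoted, "-ldflags -s -w" becomes three separate arguments,
# which is how the failing `go build -ldflags -s -w ...` command arises.
set -- -ldflags -s -w
echo "$#"   # prints 3

# Quoted as a single token, it reaches go build as one argument.
set -- "-ldflags=-s -w"
echo "$#"   # prints 1
```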
Another common root cause (Go monorepo + `internal/`):
```text
... use of internal package ds2api/internal/server not allowed
```
This usually happens when the Vercel Go entrypoint imports `internal/...` directly.
This repo now avoids that by using a public bridge package: `api/index.go` -> `ds2api/app` -> `internal/server`.
If you see:
```text
No Output Directory named "public" found after the Build completed.
```
Vercel is validating frontend output against `public`. This repo builds WebUI into `static/admin`, and uses the parent directory `static` as Vercel output root.
`vercel.json` now explicitly sets:
```json
"outputDirectory": "static"
```
If you manually changed Output Directory in Project Settings, set it to `static` (or clear it and let repo config apply).
If API responses return Vercel HTML `Authentication Required` (instead of JSON), the request is blocked by Vercel Deployment Protection:
- Disable protection for that deployment/environment (recommended for public API use)
- Or send `x-vercel-protection-bypass` in requests
- If only internal Node->Go calls are blocked, set `VERCEL_AUTOMATION_BYPASS_SECRET` (or `DS2API_VERCEL_PROTECTION_BYPASS`)
Vercel streaming note (important):
- Vercel Go Runtime applies platform-level buffering, so this repo uses a hybrid path on Vercel (`Go prepare + Node stream`) to restore real-time SSE behavior.
- This adaptation is Vercel-only; local and Docker remain pure Go.
## 4. Reverse Proxy (Nginx)
Disable buffering for SSE:


@@ -35,6 +35,9 @@ go run ./cmd/ds2api
```bash
./scripts/build-webui.sh
# Or rely on startup auto-build (enabled locally by default)
# DS2API_AUTO_BUILD_WEBUI=true go run ./cmd/ds2api
```
## 2. Docker Deployment
@@ -62,7 +65,13 @@ docker-compose up -d --build
- Serverless entry: `api/index.go`
- Route rewrites and cache headers: `vercel.json`
- The legacy `builds` field has been removed to avoid the `unused-build-settings` warning
- The build stage automatically runs `npm ci --prefix webui && npm run build --prefix webui`
- `vercel.json` routes `/admin/assets/*` and the `/admin` page to static output, while `/admin/*` APIs still go to `api/index`
- To mitigate Go Runtime streaming buffering, `/v1/chat/completions` on Vercel is routed first to `api/chat-stream.js` (Node Runtime)
- `api/chat-stream.js` automatically falls back to the Go entry for non-stream requests or requests with `tools` (internal `__go=1`)
- `api/chat-stream.js` handles only stream relay and SSE conversion; auth, account selection, session creation, and PoW computation are still done by the internal Go prepare endpoint (Vercel only)
- Go prepare creates a stream lease, and Node calls release when the stream ends, keeping account occupancy semantics consistent with native Go streaming
- `vercel.json` sets `maxDuration: 300` for both `api/chat-stream.js` and `api/index.go` (subject to plan limits)
Minimum environment variables:
@@ -76,8 +85,17 @@ docker-compose up -d --build
- `VERCEL_TEAM_ID`
- `DS2API_ACCOUNT_MAX_INFLIGHT` (per-account concurrency limit, default `2`)
- `DS2API_ACCOUNT_CONCURRENCY` (alias of the same setting)
- `DS2API_ACCOUNT_MAX_QUEUE` (waiting queue limit, default=`recommended_concurrency`)
- `DS2API_ACCOUNT_QUEUE_SIZE` (alias of the same setting)
- `DS2API_VERCEL_INTERNAL_SECRET` (optional; internal auth for the Vercel hybrid streaming path, falls back to `DS2API_ADMIN_KEY` when unset)
- `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` (optional; stream lease TTL in seconds, default `900`)
The recommended concurrency is computed dynamically as `account_count * per_account_inflight_limit` (default is `account_count * 2`).
When in-flight slots are full, requests enter a waiting queue first; the default queue limit equals the recommended concurrency, so the default 429 threshold is about `account_count * 4`.
Notes:
- The repo does not commit the `static/admin` build output
- Vercel / Docker generate the WebUI static files automatically during build
After deploy, verify first:
@@ -94,6 +112,58 @@ docker-compose up -d --build
- Builds Linux/macOS/Windows binary packages automatically and uploads them to Release Assets
- Generates `sha256sums.txt` for verification
## 3.2 Vercel Build Troubleshooting
If you see an error like:
```text
Error: Command failed: go build -ldflags -s -w -o .../bootstrap .../main__vc__go__.go
```
it is usually caused by incorrect Go build flag configuration in the Vercel project (`-ldflags` not passed as a single argument).
How to fix:
1. Open Vercel Project Settings -> Build and Development Settings
2. Clear custom Go Build Flags / Build Command (recommended)
3. If ldflags must be set, use `-ldflags=\"-s -w\"` so it is passed as one argument
4. Confirm the repo's `go.mod` uses a supported version (currently `go 1.24`)
5. Redeploy (preferably with cache cleared)
Another common root cause (Go monorepo + `internal/`):
```text
... use of internal package ds2api/internal/server not allowed
```
This usually happens when the Vercel Go entrypoint imports `internal/...` directly.
This repo now solves it with the public bridge package `app`: `api/index.go` -> `ds2api/app` -> `internal/server`.
If you see an error like:
```text
No Output Directory named "public" found after the Build completed.
```
Vercel is validating the frontend output directory against `public`. This repo builds the WebUI into `static/admin` and uses the parent directory `static` as the output root in `vercel.json`:
```json
"outputDirectory": "static"
```
If you manually changed Output Directory in Project Settings, set it to `static` as well (or clear it and let the repo config apply).
If API responses return Vercel's HTML `Authentication Required` page (instead of JSON), the request is blocked by Vercel Deployment Protection:
- Disable Protection for that deployment/environment (recommended for public API use)
- Or send `x-vercel-protection-bypass` in requests
- If only internal Vercel Node->Go calls are blocked, set `VERCEL_AUTOMATION_BYPASS_SECRET` (or `DS2API_VERCEL_PROTECTION_BYPASS`)
Vercel streaming note (important):
- Vercel's Go Runtime applies platform-level response buffering, so this project uses a hybrid "Go prepare + Node stream" path on Vercel to restore real-time SSE.
- This adaptation is Vercel-only; local and Docker remain pure Go.
## 4. Reverse Proxy (Nginx)
When running behind Nginx, disable buffering to keep SSE working:


@@ -8,7 +8,7 @@
语言 / Language: [中文](README.MD) | [English](README.en.md)
Converts DeepSeek Web chat capability into OpenAI- and Claude-compatible APIs. The backend of this repo is a **full Go implementation**, with the React WebUI kept as the frontend (build output hosted in `static/admin`).
Converts DeepSeek Web chat capability into OpenAI- and Claude-compatible APIs. The backend of this repo is a **full Go implementation**, with the React WebUI kept as the frontend (source in `webui/`, automatically built to `static/admin` during deployment).
## Current Implementation Boundary
@@ -65,7 +65,8 @@ go run ./cmd/ds2api
Default URL: `http://localhost:5001`
If `/admin` reports the WebUI is not built, run:
By default, local startup automatically tries to build the WebUI (requires Node.js/npm on the machine).
To build manually, you can also run:
```bash
./scripts/build-webui.sh
@@ -85,12 +86,16 @@ docker-compose logs -f
- Entrypoint: `api/index.go`
- Route rewrites: `vercel.json`
- `vercel.json` automatically runs `npm ci --prefix webui && npm run build --prefix webui` during the build stage
- `/v1/chat/completions` defaults to `api/chat-stream.js` (Node Runtime) on Vercel to guarantee real-time SSE
- `api/chat-stream.js` handles stream relay only; auth, account selection, and session/PoW preparation are still done by the internal Go prepare endpoint
- Go prepare issues a `lease_id`; Node calls release after the stream ends, so account occupancy duration stays consistent with native Go streaming
- The WebUI "non-stream test" requests `?__go=1` directly, avoiding the extra Node hop that makes long requests more likely to time out on Vercel
- Minimum configuration:
- `DS2API_ADMIN_KEY`
- `DS2API_CONFIG_JSON` (JSON string or Base64)
Note: `vercel.json` has removed the legacy `builds` configuration to avoid the `unused-build-settings` warning during deployment and now uses the recommended function routing model.
Note: the repo does not commit the `static/admin` build output; Vercel generates and bundles it during the build.
## Release Artifact Automation (GitHub Actions)
@@ -155,6 +160,8 @@ cp config.example.json config.json
| `LOG_LEVEL` | Log level: `DEBUG/INFO/WARN/ERROR` |
| `DS2API_ACCOUNT_MAX_INFLIGHT` | Max in-flight requests per account, default `2` |
| `DS2API_ACCOUNT_CONCURRENCY` | Alias of the same setting (legacy compatibility) |
| `DS2API_ACCOUNT_MAX_QUEUE` | Waiting queue limit, default equals `recommended_concurrency` |
| `DS2API_ACCOUNT_QUEUE_SIZE` | Alias of the same setting (legacy compatibility) |
| `DS2API_ADMIN_KEY` | Admin login key, default `admin` |
| `DS2API_JWT_SECRET` | Admin JWT signing secret (optional) |
| `DS2API_JWT_EXPIRE_HOURS` | Admin JWT expiry in hours, default `24` |
@@ -162,6 +169,9 @@ cp config.example.json config.json
| `DS2API_CONFIG_JSON` | Inline config injection (JSON or Base64) |
| `DS2API_WASM_PATH` | PoW wasm file path |
| `DS2API_STATIC_ADMIN_DIR` | Admin console static assets directory |
| `DS2API_AUTO_BUILD_WEBUI` | Whether to auto-run npm build at startup when WebUI assets are missing (default: enabled locally, disabled on Vercel) |
| `DS2API_VERCEL_INTERNAL_SECRET` | Internal auth secret for the Vercel hybrid streaming path (optional; falls back to `DS2API_ADMIN_KEY` when unset) |
| `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` | Vercel stream lease TTL in seconds (default `900`) |
| `VERCEL_TOKEN` | Vercel sync token (optional) |
| `VERCEL_PROJECT_ID` | Vercel project ID (optional) |
| `VERCEL_TEAM_ID` | Vercel team ID (optional) |
@@ -179,8 +189,12 @@ cp config.example.json config.json
- The recommended concurrency is computed dynamically from the account pool: `account_count * per_account_inflight_limit`
- The default per-account inflight limit is `2`, so the default recommendation is `account_count * 2`
- When inflight slots are full, requests enter a waiting queue instead of returning 429 immediately
- The default waiting queue limit equals `recommended_concurrency`, so the default total capacity is `account_count * 4`
- `429` is returned only when total load exceeds the capacity (inflight + waiting)
- The per-account inflight limit can be overridden via `DS2API_ACCOUNT_MAX_INFLIGHT` (or `DS2API_ACCOUNT_CONCURRENCY`)
- `GET /admin/queue/status` returns `max_inflight_per_account` and `recommended_concurrency`
- The waiting queue limit can be overridden via `DS2API_ACCOUNT_MAX_QUEUE` (or `DS2API_ACCOUNT_QUEUE_SIZE`)
- `GET /admin/queue/status` returns `max_inflight_per_account`, `recommended_concurrency`, `waiting`, and `max_queue_size`
## Tool Call Adaptation


@@ -8,7 +8,7 @@
Language: [中文](README.MD) | [English](README.en.md)
DS2API converts DeepSeek Web chat capability into OpenAI-compatible and Claude-compatible APIs. The current repository is **Go backend only** with the existing React WebUI kept as static assets under `static/admin`.
DS2API converts DeepSeek Web chat capability into OpenAI-compatible and Claude-compatible APIs. The current repository is **Go backend only** with the existing React WebUI source in `webui/` and build output generated to `static/admin` during deployment.
## Implementation Boundary
@@ -65,7 +65,8 @@ go run ./cmd/ds2api
Default URL: `http://localhost:5001`
If `/admin` says WebUI not built:
By default, local startup will auto-build WebUI when `static/admin` is missing (Node.js/npm required).
If you prefer manual build:
```bash
./scripts/build-webui.sh
@@ -85,12 +86,16 @@ docker-compose logs -f
- Entrypoint: `api/index.go`
- Rewrites: `vercel.json`
- `vercel.json` runs `npm ci --prefix webui && npm run build --prefix webui` during build
- `/v1/chat/completions` is routed to `api/chat-stream.js` (Node Runtime) on Vercel to preserve real-time SSE
- `api/chat-stream.js` is data-path only; auth/account/session/PoW preparation still comes from an internal Go prepare endpoint
- Go prepare returns a `lease_id`; Node releases it at stream end so account occupancy duration stays aligned with native Go streaming behavior
- WebUI non-stream test calls `?__go=1` directly to avoid extra Node hop timeout risk on long Vercel requests
- Minimum env vars:
- `DS2API_ADMIN_KEY`
- `DS2API_CONFIG_JSON` (raw JSON or Base64)
Note: legacy `builds` has been removed from `vercel.json` to avoid
the `unused-build-settings` warning and to follow the current function routing model.
Note: build artifacts under `static/admin` are not committed; Vercel generates them during build.
## Release Artifact Automation (GitHub Actions)
@@ -155,6 +160,8 @@ cp config.example.json config.json
| `LOG_LEVEL` | `DEBUG/INFO/WARN/ERROR` |
| `DS2API_ACCOUNT_MAX_INFLIGHT` | Max in-flight requests per managed account, default `2` |
| `DS2API_ACCOUNT_CONCURRENCY` | Alias of the same setting (legacy compatibility) |
| `DS2API_ACCOUNT_MAX_QUEUE` | Waiting queue limit (managed-key mode), default=`recommended_concurrency` |
| `DS2API_ACCOUNT_QUEUE_SIZE` | Alias of the same setting (legacy compatibility) |
| `DS2API_ADMIN_KEY` | Admin login key, default `admin` |
| `DS2API_JWT_SECRET` | Admin JWT signing secret (optional) |
| `DS2API_JWT_EXPIRE_HOURS` | Admin JWT TTL in hours, default `24` |
@@ -162,6 +169,9 @@ cp config.example.json config.json
| `DS2API_CONFIG_JSON` | Inline config (JSON or Base64) |
| `DS2API_WASM_PATH` | PoW wasm path |
| `DS2API_STATIC_ADMIN_DIR` | Admin static assets dir |
| `DS2API_AUTO_BUILD_WEBUI` | Auto run npm build on startup when WebUI assets are missing (default: enabled locally, disabled on Vercel) |
| `DS2API_VERCEL_INTERNAL_SECRET` | Internal auth secret for Vercel hybrid streaming path (optional; falls back to `DS2API_ADMIN_KEY` if unset) |
| `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` | Stream lease TTL seconds for Vercel hybrid streaming (default `900`) |
| `VERCEL_TOKEN` | Vercel sync token (optional) |
| `VERCEL_PROJECT_ID` | Vercel project ID (optional) |
| `VERCEL_TEAM_ID` | Vercel team ID (optional) |
@@ -179,8 +189,12 @@ Optional header: `X-Ds2-Target-Account` to pin one managed account.
- DS2API computes recommended concurrency dynamically as: `account_count * per_account_inflight_limit`
- Default per-account inflight limit is `2`, so default recommendation is `account_count * 2`
- When inflight slots are full, requests enter a waiting queue instead of immediate 429
- Default queue limit equals `recommended_concurrency`, so default 429 threshold is about `account_count * 4`
- 429 is returned only after total load exceeds `inflight + waiting` capacity
- You can override per-account inflight via `DS2API_ACCOUNT_MAX_INFLIGHT` (or `DS2API_ACCOUNT_CONCURRENCY`)
- `GET /admin/queue/status` returns both `max_inflight_per_account` and `recommended_concurrency`
- You can override waiting queue size via `DS2API_ACCOUNT_MAX_QUEUE` (or `DS2API_ACCOUNT_QUEUE_SIZE`)
- `GET /admin/queue/status` returns `max_inflight_per_account`, `recommended_concurrency`, `waiting`, and `max_queue_size`
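An illustrative response shape for that endpoint; the values below are placeholders for a two-account pool with defaults, not guaranteed output:

```json
{
  "total": 2,
  "available": 2,
  "in_use_accounts": [],
  "max_inflight_per_account": 2,
  "recommended_concurrency": 4,
  "waiting": 0,
  "max_queue_size": 4
}
```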
## Tool Call Adaptation

api/chat-stream.js (new file)

File diff suppressed because it is too large.


@@ -4,17 +4,17 @@ import (
"net/http"
"sync"
"ds2api/internal/server"
"ds2api/app"
)
var (
once sync.Once
app *server.App
h http.Handler
)
func Handler(w http.ResponseWriter, r *http.Request) {
once.Do(func() {
app = server.NewApp()
h = app.NewHandler()
})
app.Router.ServeHTTP(w, r)
h.ServeHTTP(w, r)
}

app/handler.go (new file)

@@ -0,0 +1,11 @@
package app
import (
"net/http"
"ds2api/internal/server"
)
func NewHandler() http.Handler {
return server.NewApp().Router
}


@@ -7,9 +7,11 @@ import (
"ds2api/internal/config"
"ds2api/internal/server"
"ds2api/internal/webui"
)
func main() {
webui.EnsureBuiltOnStartup()
app := server.NewApp()
port := strings.TrimSpace(os.Getenv("PORT"))
if port == "" {


@@ -1,6 +1,7 @@
package account
import (
"context"
"os"
"sort"
"strconv"
@@ -11,12 +12,14 @@ import (
)
type Pool struct {
store *config.Store
mu sync.Mutex
queue []string
inUse map[string]int
maxInflightPerAccount int
store *config.Store
mu sync.Mutex
queue []string
inUse map[string]int
waiters []chan struct{}
maxInflightPerAccount int
recommendedConcurrency int
maxQueueSize int
}
func NewPool(store *config.Store) *Pool {
@@ -47,25 +50,64 @@ func (p *Pool) Reset() {
}
}
recommended := defaultRecommendedConcurrency(len(ids), p.maxInflightPerAccount)
queueLimit := maxQueueFromEnv(recommended)
p.mu.Lock()
defer p.mu.Unlock()
p.drainWaitersLocked()
p.queue = ids
p.inUse = map[string]int{}
p.recommendedConcurrency = recommended
p.maxQueueSize = queueLimit
config.Logger.Info(
"[init_account_queue] initialized",
"total", len(ids),
"max_inflight_per_account", p.maxInflightPerAccount,
"recommended_concurrency", p.recommendedConcurrency,
"max_queue_size", p.maxQueueSize,
)
}
func (p *Pool) Acquire(target string, exclude map[string]bool) (config.Account, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if exclude == nil {
exclude = map[string]bool{}
return p.acquireLocked(target, normalizeExclude(exclude))
}
func (p *Pool) AcquireWait(ctx context.Context, target string, exclude map[string]bool) (config.Account, bool) {
if ctx == nil {
ctx = context.Background()
}
exclude = normalizeExclude(exclude)
for {
if ctx.Err() != nil {
return config.Account{}, false
}
p.mu.Lock()
if acc, ok := p.acquireLocked(target, exclude); ok {
p.mu.Unlock()
return acc, true
}
if !p.canQueueLocked(target, exclude) {
p.mu.Unlock()
return config.Account{}, false
}
waiter := make(chan struct{})
p.waiters = append(p.waiters, waiter)
p.mu.Unlock()
select {
case <-ctx.Done():
p.mu.Lock()
p.removeWaiterLocked(waiter)
p.mu.Unlock()
return config.Account{}, false
case <-waiter:
}
}
}
func (p *Pool) acquireLocked(target string, exclude map[string]bool) (config.Account, bool) {
if target != "" {
if exclude[target] || p.inUse[target] >= p.maxInflightPerAccount {
return config.Account{}, false
@@ -131,9 +173,11 @@ func (p *Pool) Release(accountID string) {
}
if count == 1 {
delete(p.inUse, accountID)
p.notifyWaiterLocked()
return
}
p.inUse[accountID] = count - 1
p.notifyWaiterLocked()
}
func (p *Pool) Status() map[string]any {
@@ -162,6 +206,8 @@ func (p *Pool) Status() map[string]any {
"in_use_accounts": inUseAccounts,
"max_inflight_per_account": p.maxInflightPerAccount,
"recommended_concurrency": p.recommendedConcurrency,
"waiting": len(p.waiters),
"max_queue_size": p.maxQueueSize,
}
}
@@ -188,3 +234,69 @@ func defaultRecommendedConcurrency(accountCount, maxInflightPerAccount int) int
}
return accountCount * maxInflightPerAccount
}
func normalizeExclude(exclude map[string]bool) map[string]bool {
if exclude == nil {
return map[string]bool{}
}
return exclude
}
func (p *Pool) canQueueLocked(target string, exclude map[string]bool) bool {
if target != "" {
if exclude[target] {
return false
}
if _, ok := p.store.FindAccount(target); !ok {
return false
}
}
if p.maxQueueSize <= 0 {
return false
}
return len(p.waiters) < p.maxQueueSize
}
func (p *Pool) notifyWaiterLocked() {
if len(p.waiters) == 0 {
return
}
waiter := p.waiters[0]
p.waiters = p.waiters[1:]
close(waiter)
}
func (p *Pool) removeWaiterLocked(waiter chan struct{}) bool {
for i, w := range p.waiters {
if w != waiter {
continue
}
p.waiters = append(p.waiters[:i], p.waiters[i+1:]...)
return true
}
return false
}
func (p *Pool) drainWaitersLocked() {
for _, waiter := range p.waiters {
close(waiter)
}
p.waiters = nil
}
func maxQueueFromEnv(defaultSize int) int {
for _, key := range []string{"DS2API_ACCOUNT_MAX_QUEUE", "DS2API_ACCOUNT_QUEUE_SIZE"} {
raw := strings.TrimSpace(os.Getenv(key))
if raw == "" {
continue
}
n, err := strconv.Atoi(raw)
if err == nil && n >= 0 {
return n
}
}
if defaultSize < 0 {
return 0
}
return defaultSize
}


@@ -1,8 +1,10 @@
package account
import (
"context"
"sync"
"testing"
"time"
"ds2api/internal/config"
)
@@ -10,6 +12,9 @@ import (
func newPoolForTest(t *testing.T, maxInflight string) *Pool {
t.Helper()
t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", maxInflight)
t.Setenv("DS2API_ACCOUNT_CONCURRENCY", "")
t.Setenv("DS2API_ACCOUNT_MAX_QUEUE", "")
t.Setenv("DS2API_ACCOUNT_QUEUE_SIZE", "")
t.Setenv("DS2API_CONFIG_JSON", `{
"keys":["k1"],
"accounts":[
@@ -21,6 +26,33 @@ func newPoolForTest(t *testing.T, maxInflight string) *Pool {
return NewPool(store)
}
func newSingleAccountPoolForTest(t *testing.T, maxInflight string) *Pool {
t.Helper()
t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", maxInflight)
t.Setenv("DS2API_ACCOUNT_CONCURRENCY", "")
t.Setenv("DS2API_ACCOUNT_MAX_QUEUE", "")
t.Setenv("DS2API_ACCOUNT_QUEUE_SIZE", "")
t.Setenv("DS2API_CONFIG_JSON", `{
"keys":["k1"],
"accounts":[{"email":"acc1@example.com","token":"token1"}]
}`)
return NewPool(config.LoadStore())
}
func waitForWaitingCount(t *testing.T, pool *Pool, want int) {
t.Helper()
deadline := time.Now().Add(800 * time.Millisecond)
for time.Now().Before(deadline) {
status := pool.Status()
if got, ok := status["waiting"].(int); ok && got == want {
return
}
time.Sleep(10 * time.Millisecond)
}
status := pool.Status()
t.Fatalf("waiting count did not reach %d, current status=%v", want, status)
}
func TestPoolRoundRobinWithConcurrentSlots(t *testing.T) {
pool := newPoolForTest(t, "2")
@@ -118,6 +150,9 @@ func TestPoolStatusRecommendedConcurrencyDefault(t *testing.T) {
if got, ok := status["recommended_concurrency"].(int); !ok || got != 4 {
t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"])
}
if got, ok := status["max_queue_size"].(int); !ok || got != 4 {
t.Fatalf("unexpected max_queue_size: %#v", status["max_queue_size"])
}
}
func TestPoolStatusRecommendedConcurrencyRespectsOverride(t *testing.T) {
@@ -130,6 +165,9 @@ func TestPoolStatusRecommendedConcurrencyRespectsOverride(t *testing.T) {
if got, ok := status["recommended_concurrency"].(int); !ok || got != 6 {
t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"])
}
if got, ok := status["max_queue_size"].(int); !ok || got != 6 {
t.Fatalf("unexpected max_queue_size: %#v", status["max_queue_size"])
}
}
func TestPoolAccountConcurrencyAliasEnv(t *testing.T) {
@@ -151,4 +189,108 @@ func TestPoolAccountConcurrencyAliasEnv(t *testing.T) {
if got, ok := status["recommended_concurrency"].(int); !ok || got != 8 {
t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"])
}
if got, ok := status["max_queue_size"].(int); !ok || got != 8 {
t.Fatalf("unexpected max_queue_size: %#v", status["max_queue_size"])
}
}
func TestPoolSupportsTokenOnlyAccount(t *testing.T) {
t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", "1")
t.Setenv("DS2API_CONFIG_JSON", `{
"keys":["k1"],
"accounts":[{"token":"token-only-account"}]
}`)
pool := NewPool(config.LoadStore())
status := pool.Status()
if got, ok := status["total"].(int); !ok || got != 1 {
t.Fatalf("unexpected total in pool status: %#v", status["total"])
}
if got, ok := status["available"].(int); !ok || got != 1 {
t.Fatalf("unexpected available in pool status: %#v", status["available"])
}
acc, ok := pool.Acquire("", nil)
if !ok {
t.Fatalf("expected acquire success for token-only account")
}
if acc.Token != "token-only-account" {
t.Fatalf("unexpected token on acquired account: %q", acc.Token)
}
}
func TestPoolAcquireWaitQueuesAndSucceedsAfterRelease(t *testing.T) {
pool := newSingleAccountPoolForTest(t, "1")
first, ok := pool.Acquire("", nil)
if !ok {
t.Fatal("expected first acquire to succeed")
}
type result struct {
id string
ok bool
}
resCh := make(chan result, 1)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
go func() {
acc, ok := pool.AcquireWait(ctx, "", nil)
resCh <- result{id: acc.Identifier(), ok: ok}
}()
waitForWaitingCount(t, pool, 1)
pool.Release(first.Identifier())
select {
case res := <-resCh:
if !res.ok {
t.Fatal("expected queued acquire to succeed after release")
}
if res.id != "acc1@example.com" {
t.Fatalf("unexpected account id from queued acquire: %q", res.id)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for queued acquire result")
}
}
func TestPoolAcquireWaitQueueLimitReturnsFalse(t *testing.T) {
pool := newSingleAccountPoolForTest(t, "1")
first, ok := pool.Acquire("", nil)
if !ok {
t.Fatal("expected first acquire to succeed")
}
type result struct {
id string
ok bool
}
firstWaiter := make(chan result, 1)
ctx1, cancel1 := context.WithTimeout(context.Background(), 1200*time.Millisecond)
defer cancel1()
go func() {
acc, ok := pool.AcquireWait(ctx1, "", nil)
firstWaiter <- result{id: acc.Identifier(), ok: ok}
}()
waitForWaitingCount(t, pool, 1)
ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel2()
start := time.Now()
if _, ok := pool.AcquireWait(ctx2, "", nil); ok {
t.Fatal("expected second queued acquire to fail when queue is full")
}
if time.Since(start) > 120*time.Millisecond {
t.Fatalf("queue-full acquire should fail fast, took %s", time.Since(start))
}
pool.Release(first.Identifier())
select {
case res := <-firstWaiter:
if !res.ok {
t.Fatal("expected first queued acquire to succeed after release")
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for first queued acquire")
}
}


@@ -230,19 +230,22 @@ func collectDeepSeek(resp *http.Response, thinkingEnabled bool) (string, string)
func (h *Handler) writeClaudeStream(w http.ResponseWriter, r *http.Request, model string, messages []any, fullText string, detected []util.ParsedToolCall) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Cache-Control", "no-cache, no-transform")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
writeJSON(w, http.StatusInternalServerError, map[string]any{"error": map[string]any{"type": "api_error", "message": "streaming unsupported"}})
return
w.Header().Set("X-Accel-Buffering", "no")
rc := http.NewResponseController(w)
canFlush := rc.Flush() == nil
if !canFlush {
config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered")
}
send := func(v any) {
b, _ := json.Marshal(v)
_, _ = w.Write([]byte("data: "))
_, _ = w.Write(b)
_, _ = w.Write([]byte("\n\n"))
flusher.Flush()
if canFlush {
_ = rc.Flush()
}
}
messageID := fmt.Sprintf("msg_%d", time.Now().UnixNano())
inputTokens := util.EstimateTokens(fmt.Sprintf("%v", messages))


@@ -3,11 +3,17 @@ package openai
import (
"bufio"
"context"
"crypto/rand"
"crypto/subtle"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/go-chi/chi/v5"
@@ -23,6 +29,25 @@ type Handler struct {
Store *config.Store
Auth *auth.Resolver
DS *deepseek.Client
leaseMu sync.Mutex
streamLeases map[string]streamLease
}
type streamLease struct {
Auth *auth.RequestAuth
ExpiresAt time.Time
}
type toolStreamSieveState struct {
pending strings.Builder
capture strings.Builder
capturing bool
}
type toolStreamEvent struct {
Content string
ToolCalls []util.ParsedToolCall
}
func RegisterRoutes(r chi.Router, h *Handler) {
@@ -35,6 +60,15 @@ func (h *Handler) ListModels(w http.ResponseWriter, _ *http.Request) {
}
func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
if isVercelStreamReleaseRequest(r) {
h.handleVercelStreamRelease(w, r)
return
}
if isVercelStreamPrepareRequest(r) {
h.handleVercelStreamPrepare(w, r)
return
}
a, err := h.Auth.Determine(r)
if err != nil {
status := http.StatusUnauthorized
@@ -106,6 +140,142 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
h.handleNonStream(w, r.Context(), resp, sessionID, model, finalPrompt, thinkingEnabled, searchEnabled, toolNames)
}
func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Request) {
if !config.IsVercel() {
http.NotFound(w, r)
return
}
h.sweepExpiredStreamLeases()
internalSecret := vercelInternalSecret()
internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 {
writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request")
return
}
a, err := h.Auth.Determine(r)
if err != nil {
status := http.StatusUnauthorized
if err == auth.ErrNoAccount {
status = http.StatusTooManyRequests
}
writeOpenAIError(w, status, err.Error())
return
}
leased := false
defer func() {
if !leased {
h.Auth.Release(a)
}
}()
r = r.WithContext(auth.WithAuth(r.Context(), a))
var req map[string]any
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeOpenAIError(w, http.StatusBadRequest, "invalid json")
return
}
if !toBool(req["stream"]) {
writeOpenAIError(w, http.StatusBadRequest, "stream must be true")
return
}
if tools, ok := req["tools"].([]any); ok && len(tools) > 0 {
writeOpenAIError(w, http.StatusBadRequest, "tools are not supported by vercel stream prepare")
return
}
model, _ := req["model"].(string)
messagesRaw, _ := req["messages"].([]any)
if model == "" || len(messagesRaw) == 0 {
writeOpenAIError(w, http.StatusBadRequest, "Request must include 'model' and 'messages'.")
return
}
thinkingEnabled, searchEnabled, ok := config.GetModelConfig(model)
if !ok {
writeOpenAIError(w, http.StatusServiceUnavailable, fmt.Sprintf("Model '%s' is not available.", model))
return
}
messages := normalizeMessages(messagesRaw)
finalPrompt := util.MessagesPrepare(messages)
sessionID, err := h.DS.CreateSession(r.Context(), a, 3)
if err != nil {
if a.UseConfigToken {
writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.")
} else {
writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.")
}
return
}
powHeader, err := h.DS.GetPow(r.Context(), a, 3)
if err != nil {
writeOpenAIError(w, http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).")
return
}
if strings.TrimSpace(a.DeepSeekToken) == "" {
writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.")
return
}
payload := map[string]any{
"chat_session_id": sessionID,
"parent_message_id": nil,
"prompt": finalPrompt,
"ref_file_ids": []any{},
"thinking_enabled": thinkingEnabled,
"search_enabled": searchEnabled,
}
leaseID := h.holdStreamLease(a)
if leaseID == "" {
writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease")
return
}
leased = true
writeJSON(w, http.StatusOK, map[string]any{
"session_id": sessionID,
"lease_id": leaseID,
"model": model,
"final_prompt": finalPrompt,
"thinking_enabled": thinkingEnabled,
"search_enabled": searchEnabled,
"deepseek_token": a.DeepSeekToken,
"pow_header": powHeader,
"payload": payload,
})
}
func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Request) {
if !config.IsVercel() {
http.NotFound(w, r)
return
}
h.sweepExpiredStreamLeases()
internalSecret := vercelInternalSecret()
internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 {
writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request")
return
}
var req map[string]any
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeOpenAIError(w, http.StatusBadRequest, "invalid json")
return
}
leaseID, _ := req["lease_id"].(string)
leaseID = strings.TrimSpace(leaseID)
if leaseID == "" {
writeOpenAIError(w, http.StatusBadRequest, "lease_id is required")
return
}
if !h.releaseStreamLease(leaseID) {
writeOpenAIError(w, http.StatusNotFound, "stream lease not found")
return
}
writeJSON(w, http.StatusOK, map[string]any{"success": true})
}
func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
@@ -191,12 +361,13 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Cache-Control", "no-cache, no-transform")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
writeOpenAIError(w, http.StatusInternalServerError, "streaming unsupported")
return
w.Header().Set("X-Accel-Buffering", "no")
rc := http.NewResponseController(w)
canFlush := rc.Flush() == nil
if !canFlush {
config.Logger.Warn("[stream] response writer does not support flush; streaming may be buffered")
}
lines := make(chan []byte, 128)
@@ -216,6 +387,8 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
created := time.Now().Unix()
firstChunkSent := false
bufferToolContent := len(toolNames) > 0
var toolSieve toolStreamSieveState
toolCallsEmitted := false
currentType := "text"
if thinkingEnabled {
currentType = "thinking"
@@ -226,27 +399,32 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
hasContent := false
keepaliveTicker := time.NewTicker(time.Duration(deepseek.KeepAliveTimeout) * time.Second)
defer keepaliveTicker.Stop()
keepaliveCountWithoutContent := 0
sendChunk := func(v any) {
b, _ := json.Marshal(v)
_, _ = w.Write([]byte("data: "))
_, _ = w.Write(b)
_, _ = w.Write([]byte("\n\n"))
flusher.Flush()
if canFlush {
_ = rc.Flush()
}
}
sendDone := func() {
_, _ = w.Write([]byte("data: [DONE]\n\n"))
flusher.Flush()
if canFlush {
_ = rc.Flush()
}
}
finalize := func(finishReason string) {
finalThinking := thinking.String()
finalText := text.String()
detected := util.ParseToolCalls(finalText, toolNames)
if len(detected) > 0 {
if len(detected) > 0 && !toolCallsEmitted {
finishReason = "tool_calls"
delta := map[string]any{
"tool_calls": util.FormatOpenAIToolCalls(detected),
"tool_calls": util.FormatOpenAIStreamToolCalls(detected),
}
if !firstChunkSent {
delta["role"] = "assistant"
@@ -259,21 +437,29 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
"model": model,
"choices": []map[string]any{{"delta": delta, "index": 0}},
})
} else if bufferToolContent && strings.TrimSpace(finalText) != "" {
delta := map[string]any{
"content": finalText,
} else if bufferToolContent {
for _, evt := range flushToolSieve(&toolSieve, toolNames) {
if evt.Content == "" {
continue
}
delta := map[string]any{
"content": evt.Content,
}
if !firstChunkSent {
delta["role"] = "assistant"
firstChunkSent = true
}
sendChunk(map[string]any{
"id": completionID,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": []map[string]any{{"delta": delta, "index": 0}},
})
}
if !firstChunkSent {
delta["role"] = "assistant"
firstChunkSent = true
}
sendChunk(map[string]any{
"id": completionID,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": []map[string]any{{"delta": delta, "index": 0}},
})
}
if len(detected) > 0 || toolCallsEmitted {
finishReason = "tool_calls"
}
promptTokens := util.EstimateTokens(finalPrompt)
reasoningTokens := util.EstimateTokens(finalThinking)
@@ -301,12 +487,21 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
case <-r.Context().Done():
return
case <-keepaliveTicker.C:
if !hasContent {
keepaliveCountWithoutContent++
if keepaliveCountWithoutContent >= deepseek.MaxKeepaliveCount {
finalize("stop")
return
}
}
if hasContent && time.Since(lastContent) > time.Duration(deepseek.StreamIdleTimeout)*time.Second {
finalize("stop")
return
}
_, _ = w.Write([]byte(": keep-alive\n\n"))
flusher.Flush()
if canFlush {
_, _ = w.Write([]byte(": keep-alive\n\n"))
_ = rc.Flush()
}
case line, ok := <-lines:
if !ok {
// Ensure scanner completion is observed only after all queued
@@ -343,6 +538,7 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
}
hasContent = true
lastContent = time.Now()
keepaliveCountWithoutContent = 0
delta := map[string]any{}
if !firstChunkSent {
delta["role"] = "assistant"
@@ -357,6 +553,41 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
text.WriteString(p.Text)
if !bufferToolContent {
delta["content"] = p.Text
} else {
events := processToolSieveChunk(&toolSieve, p.Text, toolNames)
if len(events) == 0 {
// No content events yet; fall through and keep the thinking-only delta frame.
}
for _, evt := range events {
if len(evt.ToolCalls) > 0 {
toolCallsEmitted = true
tcDelta := map[string]any{
"tool_calls": util.FormatOpenAIStreamToolCalls(evt.ToolCalls),
}
if !firstChunkSent {
tcDelta["role"] = "assistant"
firstChunkSent = true
}
newChoices = append(newChoices, map[string]any{
"delta": tcDelta,
"index": 0,
})
continue
}
if evt.Content != "" {
contentDelta := map[string]any{
"content": evt.Content,
}
if !firstChunkSent {
contentDelta["role"] = "assistant"
firstChunkSent = true
}
newChoices = append(newChoices, map[string]any{
"delta": contentDelta,
"index": 0,
})
}
}
}
}
if len(delta) > 0 {
@@ -469,3 +700,344 @@ func openAIErrorType(status int) string {
return "invalid_request_error"
}
}
func isVercelStreamPrepareRequest(r *http.Request) bool {
if r == nil {
return false
}
return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1"
}
func isVercelStreamReleaseRequest(r *http.Request) bool {
if r == nil {
return false
}
return strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1"
}
func vercelInternalSecret() string {
if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" {
return v
}
if v := strings.TrimSpace(os.Getenv("DS2API_ADMIN_KEY")); v != "" {
return v
}
return "admin"
}
func shouldEmitBufferedToolProbeContent(buffered string) bool {
trimmed := strings.TrimSpace(buffered)
if trimmed == "" {
return false
}
normalized := normalizeToolProbePrefix(trimmed)
if normalized == "" {
return false
}
first := normalized[0]
switch first {
case '{', '[', '`':
lower := strings.ToLower(normalized)
if strings.Contains(lower, "tool_calls") {
return false
}
// Keep a short hold window for JSON-ish starts to avoid leaking tool JSON.
if len([]rune(normalized)) < 20 {
return false
}
return true
default:
// Natural language starts can be streamed immediately.
return true
}
}
func normalizeToolProbePrefix(s string) string {
t := strings.TrimSpace(s)
if strings.HasPrefix(t, "```") {
t = strings.TrimPrefix(t, "```")
t = strings.TrimSpace(t)
t = strings.TrimPrefix(strings.ToLower(t), "json")
t = strings.TrimSpace(t)
}
return t
}
func processToolSieveChunk(state *toolStreamSieveState, chunk string, toolNames []string) []toolStreamEvent {
if state == nil || chunk == "" {
return nil
}
state.pending.WriteString(chunk)
events := make([]toolStreamEvent, 0, 2)
for {
if state.capturing {
if state.pending.Len() > 0 {
state.capture.WriteString(state.pending.String())
state.pending.Reset()
}
prefix, calls, suffix, ready := consumeToolCapture(state.capture.String(), toolNames)
if !ready {
break
}
state.capture.Reset()
state.capturing = false
if prefix != "" {
events = append(events, toolStreamEvent{Content: prefix})
}
if len(calls) > 0 {
events = append(events, toolStreamEvent{ToolCalls: calls})
}
if suffix != "" {
state.pending.WriteString(suffix)
}
continue
}
pending := state.pending.String()
if pending == "" {
break
}
start := findToolSegmentStart(pending)
if start >= 0 {
prefix := pending[:start]
if prefix != "" {
events = append(events, toolStreamEvent{Content: prefix})
}
state.pending.Reset()
state.capture.WriteString(pending[start:])
state.capturing = true
continue
}
safe, hold := splitSafeContent(pending, 64)
if safe == "" {
break
}
state.pending.Reset()
state.pending.WriteString(hold)
events = append(events, toolStreamEvent{Content: safe})
}
return events
}
func flushToolSieve(state *toolStreamSieveState, toolNames []string) []toolStreamEvent {
if state == nil {
return nil
}
events := processToolSieveChunk(state, "", toolNames)
if state.capturing {
raw := state.capture.String()
state.capture.Reset()
state.capturing = false
if raw != "" {
events = append(events, toolStreamEvent{Content: raw})
}
}
if state.pending.Len() > 0 {
events = append(events, toolStreamEvent{Content: state.pending.String()})
state.pending.Reset()
}
return events
}
func splitSafeContent(s string, holdRunes int) (safe, hold string) {
if s == "" || holdRunes <= 0 {
return s, ""
}
runes := []rune(s)
if len(runes) <= holdRunes {
return "", s
}
return string(runes[:len(runes)-holdRunes]), string(runes[len(runes)-holdRunes:])
}
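The hold-window split above can be exercised standalone. This sketch (a reimplementation for illustration, not the package's exported API) shows that everything except the trailing `holdRunes` runes is released, while the tail stays buffered in case a tool-call marker is split across chunk boundaries:

```go
package main

import "fmt"

// splitSafe mirrors splitSafeContent above: release all but the trailing
// holdRunes runes, which remain held back so a tool_calls marker that
// straddles two SSE chunks is never emitted as plain content.
func splitSafe(s string, holdRunes int) (safe, hold string) {
	if s == "" || holdRunes <= 0 {
		return s, ""
	}
	runes := []rune(s)
	if len(runes) <= holdRunes {
		// Too short to release anything safely; hold it all.
		return "", s
	}
	return string(runes[:len(runes)-holdRunes]), string(runes[len(runes)-holdRunes:])
}

func main() {
	safe, hold := splitSafe("abcdefgh", 3)
	fmt.Println(safe, hold) // abcde fgh
}
```

Working in runes rather than bytes keeps the boundary from landing mid-character in multibyte (e.g. CJK) content.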
func findToolSegmentStart(s string) int {
if s == "" {
return -1
}
lower := strings.ToLower(s)
keyIdx := strings.Index(lower, "tool_calls")
if keyIdx < 0 {
return -1
}
if start := strings.LastIndex(s[:keyIdx], "{"); start >= 0 {
return start
}
return keyIdx
}
func consumeToolCapture(captured string, toolNames []string) (prefix string, calls []util.ParsedToolCall, suffix string, ready bool) {
if captured == "" {
return "", nil, "", false
}
lower := strings.ToLower(captured)
keyIdx := strings.Index(lower, "tool_calls")
if keyIdx < 0 {
if len([]rune(captured)) >= 256 {
return captured, nil, "", true
}
return "", nil, "", false
}
start := strings.LastIndex(captured[:keyIdx], "{")
if start < 0 {
if len([]rune(captured)) >= 512 {
return captured, nil, "", true
}
return "", nil, "", false
}
obj, end, ok := extractJSONObjectFrom(captured, start)
if !ok {
if len([]rune(captured)) >= 4096 {
return captured, nil, "", true
}
return "", nil, "", false
}
parsed := util.ParseToolCalls(obj, toolNames)
if len(parsed) == 0 {
return captured[:end], nil, captured[end:], true
}
return captured[:start], parsed, captured[end:], true
}
func extractJSONObjectFrom(text string, start int) (string, int, bool) {
if start < 0 || start >= len(text) || text[start] != '{' {
return "", 0, false
}
depth := 0
quote := byte(0)
escaped := false
for i := start; i < len(text); i++ {
ch := text[i]
if quote != 0 {
if escaped {
escaped = false
continue
}
if ch == '\\' {
escaped = true
continue
}
if ch == quote {
quote = 0
}
continue
}
if ch == '"' || ch == '\'' {
quote = ch
continue
}
if ch == '{' {
depth++
continue
}
if ch == '}' {
depth--
if depth == 0 {
end := i + 1
return text[start:end], end, true
}
}
}
return "", 0, false
}
func (h *Handler) holdStreamLease(a *auth.RequestAuth) string {
if a == nil {
return ""
}
now := time.Now()
ttl := streamLeaseTTL()
if ttl <= 0 {
ttl = 15 * time.Minute
}
h.leaseMu.Lock()
expired := h.popExpiredLeasesLocked(now)
if h.streamLeases == nil {
h.streamLeases = make(map[string]streamLease)
}
leaseID := newLeaseID()
h.streamLeases[leaseID] = streamLease{
Auth: a,
ExpiresAt: now.Add(ttl),
}
h.leaseMu.Unlock()
h.releaseExpiredAuths(expired)
return leaseID
}
func (h *Handler) releaseStreamLease(leaseID string) bool {
leaseID = strings.TrimSpace(leaseID)
if leaseID == "" {
return false
}
h.leaseMu.Lock()
expired := h.popExpiredLeasesLocked(time.Now())
lease, ok := h.streamLeases[leaseID]
if ok {
delete(h.streamLeases, leaseID)
}
h.leaseMu.Unlock()
h.releaseExpiredAuths(expired)
if !ok {
return false
}
if h.Auth != nil {
h.Auth.Release(lease.Auth)
}
return true
}
func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth {
if len(h.streamLeases) == 0 {
return nil
}
expired := make([]*auth.RequestAuth, 0)
for leaseID, lease := range h.streamLeases {
if now.After(lease.ExpiresAt) {
delete(h.streamLeases, leaseID)
expired = append(expired, lease.Auth)
}
}
return expired
}
func (h *Handler) releaseExpiredAuths(expired []*auth.RequestAuth) {
if h.Auth == nil || len(expired) == 0 {
return
}
for _, a := range expired {
h.Auth.Release(a)
}
}
func (h *Handler) sweepExpiredStreamLeases() {
h.leaseMu.Lock()
expired := h.popExpiredLeasesLocked(time.Now())
h.leaseMu.Unlock()
h.releaseExpiredAuths(expired)
}
func streamLeaseTTL() time.Duration {
raw := strings.TrimSpace(os.Getenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS"))
if raw == "" {
return 15 * time.Minute
}
seconds, err := strconv.Atoi(raw)
if err != nil || seconds <= 0 {
return 15 * time.Minute
}
return time.Duration(seconds) * time.Second
}
func newLeaseID() string {
buf := make([]byte, 16)
if _, err := rand.Read(buf); err == nil {
return hex.EncodeToString(buf)
}
return fmt.Sprintf("lease-%d", time.Now().UnixNano())
}
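The balanced-brace scanner used by `extractJSONObjectFrom` can be demonstrated in isolation. This standalone sketch repeats the same logic: quote state and escape tracking ensure that a `}` inside a string literal does not close the object early:

```go
package main

import "fmt"

// extractJSONObject mirrors extractJSONObjectFrom above: scan from `start`,
// counting brace depth while skipping braces that appear inside quoted
// strings (with backslash-escape handling), and return the first complete
// top-level object plus the index just past it.
func extractJSONObject(text string, start int) (string, int, bool) {
	if start < 0 || start >= len(text) || text[start] != '{' {
		return "", 0, false
	}
	depth := 0
	quote := byte(0)
	escaped := false
	for i := start; i < len(text); i++ {
		ch := text[i]
		if quote != 0 {
			if escaped {
				escaped = false
				continue
			}
			if ch == '\\' {
				escaped = true
				continue
			}
			if ch == quote {
				quote = 0
			}
			continue
		}
		if ch == '"' || ch == '\'' {
			quote = ch
			continue
		}
		if ch == '{' {
			depth++
			continue
		}
		if ch == '}' {
			depth--
			if depth == 0 {
				return text[start : i+1], i + 1, true
			}
		}
	}
	// Unbalanced so far: caller should keep buffering.
	return "", 0, false
}

func main() {
	s := `prefix {"input":{"q":"a}b"}} suffix`
	obj, end, ok := extractJSONObject(s, 7)
	fmt.Println(ok, obj, s[end:])
}
```

The incomplete-object case returns `ok=false`, which is what lets `consumeToolCapture` keep accumulating until the stream delivers the closing brace (or the size caps trip).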

View File

@@ -209,6 +209,24 @@ func TestHandleStreamToolCallInterceptsWithoutRawContentLeak(t *testing.T) {
if !streamHasToolCallsDelta(frames) {
t.Fatalf("expected tool_calls delta, body=%s", rec.Body.String())
}
foundToolIndex := false
for _, frame := range frames {
choices, _ := frame["choices"].([]any)
for _, item := range choices {
choice, _ := item.(map[string]any)
delta, _ := choice["delta"].(map[string]any)
toolCalls, _ := delta["tool_calls"].([]any)
for _, tc := range toolCalls {
tcm, _ := tc.(map[string]any)
if _, ok := tcm["index"].(float64); ok {
foundToolIndex = true
}
}
}
}
if !foundToolIndex {
t.Fatalf("expected stream tool_calls item with index, body=%s", rec.Body.String())
}
if streamHasRawToolJSONContent(frames) {
t.Fatalf("raw tool_calls JSON leaked in content delta: %s", rec.Body.String())
}
@@ -236,6 +254,24 @@ func TestHandleStreamReasonerToolCallInterceptsWithoutRawContentLeak(t *testing.
if !streamHasToolCallsDelta(frames) {
t.Fatalf("expected tool_calls delta, body=%s", rec.Body.String())
}
foundToolIndex := false
for _, frame := range frames {
choices, _ := frame["choices"].([]any)
for _, item := range choices {
choice, _ := item.(map[string]any)
delta, _ := choice["delta"].(map[string]any)
toolCalls, _ := delta["tool_calls"].([]any)
for _, tc := range toolCalls {
tcm, _ := tc.(map[string]any)
if _, ok := tcm["index"].(float64); ok {
foundToolIndex = true
}
}
}
}
if !foundToolIndex {
t.Fatalf("expected stream tool_calls item with index, body=%s", rec.Body.String())
}
if streamHasRawToolJSONContent(frames) {
t.Fatalf("raw tool_calls JSON leaked in content delta: %s", rec.Body.String())
}
@@ -277,7 +313,106 @@ func TestHandleStreamUnknownToolStillIntercepted(t *testing.T) {
if !streamHasToolCallsDelta(frames) {
t.Fatalf("expected tool_calls delta, body=%s", rec.Body.String())
}
foundToolIndex := false
for _, frame := range frames {
choices, _ := frame["choices"].([]any)
for _, item := range choices {
choice, _ := item.(map[string]any)
delta, _ := choice["delta"].(map[string]any)
toolCalls, _ := delta["tool_calls"].([]any)
for _, tc := range toolCalls {
tcm, _ := tc.(map[string]any)
if _, ok := tcm["index"].(float64); ok {
foundToolIndex = true
}
}
}
}
if !foundToolIndex {
t.Fatalf("expected stream tool_calls item with index, body=%s", rec.Body.String())
}
if streamHasRawToolJSONContent(frames) {
t.Fatalf("raw tool_calls JSON leaked in content delta: %s", rec.Body.String())
}
}
func TestHandleStreamToolsPlainTextStreamsBeforeFinish(t *testing.T) {
h := &Handler{}
resp := makeSSEHTTPResponse(
`data: {"p":"response/content","v":"你好,"}`,
`data: {"p":"response/content","v":"这是普通文本回复。"}`,
`data: [DONE]`,
)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
h.handleStream(rec, req, resp, "cid6", "deepseek-chat", "prompt", false, false, []string{"search"})
frames, done := parseSSEDataFrames(t, rec.Body.String())
if !done {
t.Fatalf("expected [DONE], body=%s", rec.Body.String())
}
if streamHasToolCallsDelta(frames) {
t.Fatalf("did not expect tool_calls delta for plain text: %s", rec.Body.String())
}
content := strings.Builder{}
for _, frame := range frames {
choices, _ := frame["choices"].([]any)
for _, item := range choices {
choice, _ := item.(map[string]any)
delta, _ := choice["delta"].(map[string]any)
if c, ok := delta["content"].(string); ok {
content.WriteString(c)
}
}
}
if got := content.String(); got == "" {
t.Fatalf("expected streamed content in tool mode plain text, body=%s", rec.Body.String())
}
if streamFinishReason(frames) != "stop" {
t.Fatalf("expected finish_reason=stop, body=%s", rec.Body.String())
}
}
func TestHandleStreamToolCallMixedWithPlainTextSegments(t *testing.T) {
h := &Handler{}
resp := makeSSEHTTPResponse(
`data: {"p":"response/content","v":"前置正文A。"}`,
`data: {"p":"response/content","v":"{\"tool_calls\":[{\"name\":\"search\",\"input\":{\"q\":\"go\"}}]}"}`,
`data: {"p":"response/content","v":"后置正文B。"}`,
`data: [DONE]`,
)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
h.handleStream(rec, req, resp, "cid7", "deepseek-chat", "prompt", false, false, []string{"search"})
frames, done := parseSSEDataFrames(t, rec.Body.String())
if !done {
t.Fatalf("expected [DONE], body=%s", rec.Body.String())
}
if !streamHasToolCallsDelta(frames) {
t.Fatalf("expected tool_calls delta in mixed stream, body=%s", rec.Body.String())
}
if streamHasRawToolJSONContent(frames) {
t.Fatalf("raw tool_calls JSON leaked in mixed stream: %s", rec.Body.String())
}
content := strings.Builder{}
for _, frame := range frames {
choices, _ := frame["choices"].([]any)
for _, item := range choices {
choice, _ := item.(map[string]any)
delta, _ := choice["delta"].(map[string]any)
if c, ok := delta["content"].(string); ok {
content.WriteString(c)
}
}
}
got := content.String()
if !strings.Contains(got, "前置正文A。") || !strings.Contains(got, "后置正文B。") {
t.Fatalf("expected pre/post plain text to pass sieve, got=%q", got)
}
if streamFinishReason(frames) != "tool_calls" {
t.Fatalf("expected finish_reason=tool_calls, body=%s", rec.Body.String())
}
}

View File

@@ -0,0 +1,83 @@
package openai
import (
"ds2api/internal/auth"
"net/http/httptest"
"testing"
"time"
)
func TestIsVercelStreamPrepareRequest(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/chat/completions?__stream_prepare=1", nil)
if !isVercelStreamPrepareRequest(req) {
t.Fatalf("expected prepare request to be detected")
}
req2 := httptest.NewRequest("POST", "/v1/chat/completions", nil)
if isVercelStreamPrepareRequest(req2) {
t.Fatalf("expected non-prepare request")
}
}
func TestIsVercelStreamReleaseRequest(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/chat/completions?__stream_release=1", nil)
if !isVercelStreamReleaseRequest(req) {
t.Fatalf("expected release request to be detected")
}
req2 := httptest.NewRequest("POST", "/v1/chat/completions", nil)
if isVercelStreamReleaseRequest(req2) {
t.Fatalf("expected non-release request")
}
}
func TestVercelInternalSecret(t *testing.T) {
t.Run("prefer explicit secret", func(t *testing.T) {
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret")
t.Setenv("DS2API_ADMIN_KEY", "admin-fallback")
if got := vercelInternalSecret(); got != "stream-secret" {
t.Fatalf("expected explicit secret, got %q", got)
}
})
t.Run("fallback to admin key", func(t *testing.T) {
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "")
t.Setenv("DS2API_ADMIN_KEY", "admin-fallback")
if got := vercelInternalSecret(); got != "admin-fallback" {
t.Fatalf("expected admin key fallback, got %q", got)
}
})
t.Run("default admin when env missing", func(t *testing.T) {
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "")
t.Setenv("DS2API_ADMIN_KEY", "")
if got := vercelInternalSecret(); got != "admin" {
t.Fatalf("expected default admin fallback, got %q", got)
}
})
}
func TestStreamLeaseLifecycle(t *testing.T) {
h := &Handler{}
leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false})
if leaseID == "" {
t.Fatalf("expected non-empty lease id")
}
if ok := h.releaseStreamLease(leaseID); !ok {
t.Fatalf("expected lease release success")
}
if ok := h.releaseStreamLease(leaseID); ok {
t.Fatalf("expected duplicate release to fail")
}
}
func TestStreamLeaseTTL(t *testing.T) {
t.Setenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS", "120")
if got := streamLeaseTTL(); got != 120*time.Second {
t.Fatalf("expected ttl=120s, got %v", got)
}
t.Setenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS", "invalid")
if got := streamLeaseTTL(); got != 15*time.Minute {
t.Fatalf("expected default ttl on invalid value, got %v", got)
}
}

View File

@@ -873,7 +873,20 @@ func toStringSlice(v any) ([]string, bool) {
}
func toAccount(m map[string]any) config.Account {
return config.Account{Email: strings.TrimSpace(fmt.Sprintf("%v", m["email"])), Mobile: strings.TrimSpace(fmt.Sprintf("%v", m["mobile"])), Password: strings.TrimSpace(fmt.Sprintf("%v", m["password"])), Token: strings.TrimSpace(fmt.Sprintf("%v", m["token"]))}
return config.Account{
Email: fieldString(m, "email"),
Mobile: fieldString(m, "mobile"),
Password: fieldString(m, "password"),
Token: fieldString(m, "token"),
}
}
func fieldString(m map[string]any, key string) string {
v, ok := m[key]
if !ok || v == nil {
return ""
}
return strings.TrimSpace(fmt.Sprintf("%v", v))
}
func statusOr(v int, d int) int {

View File

@@ -0,0 +1,28 @@
package admin
import "testing"
func TestToAccountMissingFieldsRemainEmpty(t *testing.T) {
acc := toAccount(map[string]any{
"email": "user@example.com",
"password": "secret",
})
if acc.Email != "user@example.com" {
t.Fatalf("unexpected email: %q", acc.Email)
}
if acc.Mobile != "" {
t.Fatalf("expected empty mobile, got %q", acc.Mobile)
}
if acc.Token != "" {
t.Fatalf("expected empty token, got %q", acc.Token)
}
}
func TestFieldStringNilToEmpty(t *testing.T) {
if got := fieldString(map[string]any{"token": nil}, "token"); got != "" {
t.Fatalf("expected empty string for nil field, got %q", got)
}
if got := fieldString(map[string]any{}, "token"); got != "" {
t.Fatalf("expected empty string for missing field, got %q", got)
}
}

View File

@@ -50,7 +50,7 @@ func (r *Resolver) Determine(req *http.Request) (*RequestAuth, error) {
return &RequestAuth{UseConfigToken: false, DeepSeekToken: callerKey, resolver: r, TriedAccounts: map[string]bool{}}, nil
}
target := strings.TrimSpace(req.Header.Get("X-Ds2-Target-Account"))
acc, ok := r.Pool.Acquire(target, nil)
acc, ok := r.Pool.AcquireWait(ctx, target, nil)
if !ok {
return nil, ErrNoAccount
}

View File

@@ -1,7 +1,9 @@
package config
import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"log/slog"
@@ -41,7 +43,17 @@ func (a Account) Identifier() string {
if strings.TrimSpace(a.Email) != "" {
return strings.TrimSpace(a.Email)
}
return strings.TrimSpace(a.Mobile)
if strings.TrimSpace(a.Mobile) != "" {
return strings.TrimSpace(a.Mobile)
}
// Backward compatibility: old configs may contain token-only accounts.
// Use a stable non-sensitive synthetic id so they can still join the pool.
token := strings.TrimSpace(a.Token)
if token == "" {
return ""
}
sum := sha256.Sum256([]byte(token))
return "token:" + hex.EncodeToString(sum[:8])
}
type Config struct {

View File

@@ -0,0 +1,41 @@
package config
import (
"strings"
"testing"
)
func TestAccountIdentifierFallsBackToTokenHash(t *testing.T) {
acc := Account{Token: "example-token-value"}
id := acc.Identifier()
if !strings.HasPrefix(id, "token:") {
t.Fatalf("expected token-prefixed identifier, got %q", id)
}
if len(id) != len("token:")+16 {
t.Fatalf("unexpected identifier length: %d (%q)", len(id), id)
}
}
func TestStoreFindAccountWithTokenOnlyIdentifier(t *testing.T) {
t.Setenv("DS2API_CONFIG_JSON", `{
"keys":["k1"],
"accounts":[{"token":"token-only-account"}]
}`)
store := LoadStore()
accounts := store.Accounts()
if len(accounts) != 1 {
t.Fatalf("expected 1 account, got %d", len(accounts))
}
id := accounts[0].Identifier()
if id == "" {
t.Fatalf("expected synthetic identifier for token-only account")
}
found, ok := store.FindAccount(id)
if !ok {
t.Fatalf("expected FindAccount to locate token-only account by synthetic id")
}
if found.Token != "token-only-account" {
t.Fatalf("unexpected token value: %q", found.Token)
}
}

Binary file not shown.

View File

@@ -0,0 +1,6 @@
package deepseek
import _ "embed"
//go:embed assets/sha3_wasm_bg.7b9ca65ddd.wasm
var embeddedWASM []byte

View File

@@ -33,8 +33,11 @@ func (p *PowSolver) init(ctx context.Context) error {
p.once.Do(func() {
wasmBytes, err := os.ReadFile(p.wasmPath)
if err != nil {
p.err = err
return
if len(embeddedWASM) == 0 {
p.err = err
return
}
wasmBytes = embeddedWASM
}
p.runtime = wazero.NewRuntime(ctx)
p.compiled, p.err = p.runtime.CompileModule(ctx, wasmBytes)

View File

@@ -298,3 +298,20 @@ func FormatOpenAIToolCalls(calls []ParsedToolCall) []map[string]any {
}
return out
}
func FormatOpenAIStreamToolCalls(calls []ParsedToolCall) []map[string]any {
out := make([]map[string]any, 0, len(calls))
for i, c := range calls {
args, _ := json.Marshal(c.Input)
out = append(out, map[string]any{
"index": i,
"id": "call_" + strings.ReplaceAll(uuid.NewString(), "-", ""),
"type": "function",
"function": map[string]any{
"name": c.Name,
"arguments": string(args),
},
})
}
return out
}
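The difference between the stream and non-stream formatters is the per-item `index`, which OpenAI-style clients use to merge incremental `tool_calls` deltas. This sketch reproduces the shape with a placeholder id scheme (the real helper generates uuid-based ids, and `ParsedToolCall`'s field layout is assumed here for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ParsedToolCall approximates the util type used above (assumed shape).
type ParsedToolCall struct {
	Name  string
	Input map[string]any
}

// formatStreamToolCalls sketches FormatOpenAIStreamToolCalls: each entry
// carries an "index" so streaming clients can correlate partial deltas,
// which the non-stream formatter omits. IDs are illustrative placeholders.
func formatStreamToolCalls(calls []ParsedToolCall) []map[string]any {
	out := make([]map[string]any, 0, len(calls))
	for i, c := range calls {
		args, _ := json.Marshal(c.Input)
		out = append(out, map[string]any{
			"index": i,
			"id":    fmt.Sprintf("call_%d", i),
			"type":  "function",
			"function": map[string]any{
				"name":      c.Name,
				"arguments": string(args),
			},
		})
	}
	return out
}

func main() {
	b, _ := json.Marshal(formatStreamToolCalls([]ParsedToolCall{
		{Name: "search", Input: map[string]any{"q": "go"}},
	}))
	fmt.Println(string(b))
}
```

This is also what the new test assertions check for: every streamed `tool_calls` item must expose an `index` field.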

103
internal/webui/build.go Normal file
View File

@@ -0,0 +1,103 @@
package webui
import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"ds2api/internal/config"
)
const (
defaultBuildTimeout = 5 * time.Minute
)
func EnsureBuiltOnStartup() {
if !shouldAutoBuild() {
return
}
staticDir := resolveStaticAdminDir(config.StaticAdminDir())
if hasBuiltUI(staticDir) {
return
}
if err := buildWebUI(staticDir); err != nil {
config.Logger.Warn("[webui] auto build failed", "error", err)
return
}
if hasBuiltUI(staticDir) {
config.Logger.Info("[webui] auto build completed", "dir", staticDir)
return
}
config.Logger.Warn("[webui] auto build finished but output missing", "dir", staticDir)
}
func shouldAutoBuild() bool {
raw := strings.TrimSpace(os.Getenv("DS2API_AUTO_BUILD_WEBUI"))
if raw == "" {
return !config.IsVercel()
}
switch strings.ToLower(raw) {
case "1", "true", "yes", "on":
return true
case "0", "false", "no", "off":
return false
default:
return !config.IsVercel()
}
}
func hasBuiltUI(staticDir string) bool {
if strings.TrimSpace(staticDir) == "" {
return false
}
indexPath := filepath.Join(staticDir, "index.html")
st, err := os.Stat(indexPath)
return err == nil && !st.IsDir()
}
func buildWebUI(staticDir string) error {
if _, err := exec.LookPath("npm"); err != nil {
return fmt.Errorf("npm not found in PATH: %w", err)
}
if strings.TrimSpace(staticDir) == "" {
return errors.New("static admin dir is empty")
}
config.Logger.Info("[webui] static files missing, running npm build")
ctx, cancel := context.WithTimeout(context.Background(), defaultBuildTimeout)
defer cancel()
if _, err := os.Stat(filepath.Join("webui", "node_modules")); err != nil {
if !os.IsNotExist(err) {
return err
}
installCmd := exec.CommandContext(ctx, "npm", "ci", "--prefix", "webui")
installCmd.Stdout = os.Stdout
installCmd.Stderr = os.Stderr
if err := installCmd.Run(); err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return fmt.Errorf("webui npm ci timed out after %s", defaultBuildTimeout)
}
return err
}
}
if err := os.MkdirAll(staticDir, 0o755); err != nil {
return err
}
cmd := exec.CommandContext(ctx, "npm", "run", "build", "--prefix", "webui", "--", "--outDir", staticDir, "--emptyOutDir")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return fmt.Errorf("webui build timed out after %s", defaultBuildTimeout)
}
return err
}
return nil
}

View File

@@ -21,7 +21,7 @@ type Handler struct {
}
func NewHandler() *Handler {
return &Handler{StaticDir: config.StaticAdminDir()}
return &Handler{StaticDir: resolveStaticAdminDir(config.StaticAdminDir())}
}
func RegisterRoutes(r chi.Router, h *Handler) {
@@ -47,15 +47,20 @@ func (h *Handler) index(w http.ResponseWriter, _ *http.Request) {
}
func (h *Handler) admin(w http.ResponseWriter, r *http.Request) {
if fi, err := os.Stat(h.StaticDir); err != nil || !fi.IsDir() {
http.Error(w, "WebUI not built. Run `cd webui && npm run build` first.", http.StatusNotFound)
staticDir := resolveStaticAdminDir(h.StaticDir)
if fi, err := os.Stat(staticDir); err == nil && fi.IsDir() {
h.serveFromDisk(w, r, staticDir)
return
}
http.Error(w, "WebUI not built. Run `cd webui && npm run build` first.", http.StatusNotFound)
}
func (h *Handler) serveFromDisk(w http.ResponseWriter, r *http.Request, staticDir string) {
path := strings.TrimPrefix(r.URL.Path, "/admin")
path = strings.TrimPrefix(path, "/")
if path != "" && strings.Contains(path, ".") {
full := filepath.Join(h.StaticDir, filepath.Clean(path))
if !strings.HasPrefix(full, h.StaticDir) {
full := filepath.Join(staticDir, filepath.Clean(path))
if !strings.HasPrefix(full, staticDir) {
http.NotFound(w, r)
return
}
@@ -71,7 +76,7 @@ func (h *Handler) admin(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
return
}
index := filepath.Join(h.StaticDir, "index.html")
index := filepath.Join(staticDir, "index.html")
if _, err := os.Stat(index); err != nil {
http.Error(w, "index.html not found", http.StatusNotFound)
return
@@ -79,3 +84,38 @@ func (h *Handler) admin(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Cache-Control", "no-store, must-revalidate")
 	http.ServeFile(w, r, index)
 }
+
+func resolveStaticAdminDir(preferred string) string {
+	if strings.TrimSpace(os.Getenv("DS2API_STATIC_ADMIN_DIR")) != "" {
+		return filepath.Clean(preferred)
+	}
+	candidates := []string{preferred}
+	if wd, err := os.Getwd(); err == nil {
+		candidates = append(candidates, filepath.Join(wd, "static/admin"))
+	}
+	if exe, err := os.Executable(); err == nil {
+		exeDir := filepath.Dir(exe)
+		candidates = append(candidates,
+			filepath.Join(exeDir, "static/admin"),
+			filepath.Join(filepath.Dir(exeDir), "static/admin"),
+		)
+	}
+	// Common serverless locations.
+	candidates = append(candidates, "/var/task/static/admin", "/var/task/user/static/admin")
+	seen := map[string]struct{}{}
+	for _, c := range candidates {
+		c = filepath.Clean(strings.TrimSpace(c))
+		if c == "" {
+			continue
+		}
+		if _, ok := seen[c]; ok {
+			continue
+		}
+		seen[c] = struct{}{}
+		if fi, err := os.Stat(c); err == nil && fi.IsDir() {
+			return c
+		}
+	}
+	return filepath.Clean(preferred)
+}

View File

@@ -1,6 +1,83 @@
 {
   "version": 2,
   "buildCommand": "npm ci --prefix webui && npm run build --prefix webui",
   "outputDirectory": "static",
   "functions": {
     "api/chat-stream.js": {
       "includeFiles": "**/sha3_wasm_bg.7b9ca65ddd.wasm",
       "maxDuration": 300
     },
     "api/index.go": {
       "maxDuration": 300
     }
   },
   "rewrites": [
     {
       "source": "/v1/chat/completions",
       "has": [
         {
           "type": "query",
           "key": "__go"
         }
       ],
       "destination": "/api/index"
     },
     {
       "source": "/v1/chat/completions",
       "destination": "/api/chat-stream"
     },
     {
       "source": "/admin/login",
       "destination": "/api/index"
     },
     {
       "source": "/admin/verify",
       "destination": "/api/index"
     },
     {
       "source": "/admin/config",
       "destination": "/api/index"
     },
     {
       "source": "/admin/keys(.*)",
       "destination": "/api/index"
     },
     {
       "source": "/admin/accounts(.*)",
       "destination": "/api/index"
     },
     {
       "source": "/admin/queue/status",
       "destination": "/api/index"
     },
     {
       "source": "/admin/import",
       "destination": "/api/index"
     },
     {
       "source": "/admin/test",
       "destination": "/api/index"
     },
     {
       "source": "/admin/vercel/(.*)",
       "destination": "/api/index"
     },
     {
       "source": "/admin/export",
       "destination": "/api/index"
     },
     {
       "source": "/admin",
       "destination": "/admin/index.html"
     },
     {
       "source": "/admin/assets/(.*)",
       "destination": "/admin/assets/$1"
     },
     {
       "source": "/admin/(.*)",
       "destination": "/admin/index.html"
     },
     {
       "source": "/(.*)",
       "destination": "/api/index"
     }
   ]
 }

View File

@@ -89,6 +89,8 @@ export default function ApiTester({ config, onMessage, authFetch }) {
   const runTest = async () => {
     if (loading) return
+    const startedAt = Date.now()
+
     setLoading(true)
     setIsStreaming(true)
     setResponse(null)
@@ -113,7 +115,8 @@ export default function ApiTester({ config, onMessage, authFetch }) {
       headers['X-Ds2-Target-Account'] = selectedAccount
     }
-    const res = await fetch('/v1/chat/completions', {
+    const endpoint = streamingMode ? '/v1/chat/completions' : '/v1/chat/completions?__go=1'
+    const res = await fetch(endpoint, {
       method: 'POST',
       headers,
       body: JSON.stringify({
@@ -175,7 +178,8 @@ export default function ApiTester({ config, onMessage, authFetch }) {
       } else {
         const data = await res.json()
         setResponse({ success: true, status_code: res.status, ...data })
-        onMessage('success', t('apiTester.testSuccess', { account: selectedAccount || 'Auto', time: 'N/A' }))
+        const elapsed = Math.max(0, Date.now() - startedAt)
+        onMessage('success', t('apiTester.testSuccess', { account: selectedAccount || 'Auto', time: elapsed }))
       }
     } catch (e) {
       if (e.name === 'AbortError') {