From 770f5719d80efffd8e51b5e6396e4f3f658ec65f Mon Sep 17 00:00:00 2001 From: CJACK Date: Mon, 16 Feb 2026 23:22:04 +0800 Subject: [PATCH] feat: implement stream lease management for Vercel hybrid streaming path to align occupancy duration with native Go streaming behavior. --- .env.example | 5 + DEPLOY.en.md | 2 + DEPLOY.md | 2 + README.MD | 2 + README.en.md | 2 + api/chat-stream.js | 360 ++++++++++-------- internal/adapter/openai/handler.go | 168 +++++++- .../adapter/openai/vercel_prepare_test.go | 39 ++ webui/src/components/ApiTester.jsx | 5 +- 9 files changed, 426 insertions(+), 159 deletions(-) diff --git a/.env.example b/.env.example index ddb7fef..21a4d2a 100644 --- a/.env.example +++ b/.env.example @@ -71,6 +71,11 @@ DS2API_ADMIN_KEY=admin # Optional: falls back to DS2API_ADMIN_KEY when unset. # DS2API_VERCEL_INTERNAL_SECRET=change-me +# Stream lease TTL seconds for Vercel hybrid streaming. +# During this window, the managed account stays occupied until Node calls release. +# Default: 900 (15 minutes) +# DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS=900 + # --------------------------------------------------------------- # Vercel sync integration (optional) # --------------------------------------------------------------- diff --git a/DEPLOY.en.md b/DEPLOY.en.md index 9c23c0f..523c098 100644 --- a/DEPLOY.en.md +++ b/DEPLOY.en.md @@ -70,6 +70,7 @@ Notes: - To mitigate Go Runtime streaming buffering, `/v1/chat/completions` on Vercel is routed to `api/chat-stream.js` (Node Runtime) - `api/chat-stream.js` automatically falls back to the Go entry for non-stream requests or requests with `tools` (internal `__go=1`) - `api/chat-stream.js` is data-path only (stream relay + SSE conversion); auth/account/session/PoW preparation still comes from an internal Go prepare endpoint (enabled on Vercel only) +- Go prepare creates a stream lease and Node releases it when streaming ends, keeping account occupancy semantics aligned with native Go streaming Minimum environment variables: @@ -86,6 +87,7 @@ Optional: - `DS2API_ACCOUNT_MAX_QUEUE` (waiting queue limit, default=`recommended_concurrency`) - `DS2API_ACCOUNT_QUEUE_SIZE` (alias of the same setting) - `DS2API_VERCEL_INTERNAL_SECRET` (optional internal auth secret for Vercel hybrid streaming path; falls back to `DS2API_ADMIN_KEY` when unset) +- `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` (optional stream lease TTL in seconds, default `900`) Recommended concurrency is computed dynamically as `account_count * per_account_inflight_limit` (default is `account_count * 2`). When inflight slots are full, requests are queued first; with default queue size, 429 typically starts around `account_count * 4`. diff --git a/DEPLOY.md b/DEPLOY.md index 109f7fd..48e44b0 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -70,6 +70,7 @@ docker-compose up -d --build - 为缓解 Go Runtime 的流式缓冲,`/v1/chat/completions` 在 Vercel 上会优先走 `api/chat-stream.js`(Node Runtime) - `api/chat-stream.js` 对非流式请求或 `tools` 请求会自动回退到 Go 入口(内部 `__go=1`) - `api/chat-stream.js` 仅负责流式数据转发与 SSE 转换;鉴权、账号选择、会话创建、PoW 计算仍由 Go 内部 prepare 接口完成(仅 Vercel 启用) +- Go prepare 会创建流式 lease,Node 在流结束后回调 release;账号占用语义与 Go 原生流式保持一致 至少配置环境变量: @@ -86,6 +87,7 @@ docker-compose up -d --build - `DS2API_ACCOUNT_MAX_QUEUE`(等待队列上限,默认=`recommended_concurrency`) - `DS2API_ACCOUNT_QUEUE_SIZE`(同上别名) - `DS2API_VERCEL_INTERNAL_SECRET`(可选,Vercel 混合流式链路内部鉴权;未设置时回退使用 `DS2API_ADMIN_KEY`) +- `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS`(可选,流式 lease 过期秒数,默认 `900`) 并发建议值会动态按 `账号数量 × 每账号并发上限` 计算(默认即 `账号数量 × 2`)。 当 in-flight 满时,请求先进入等待队列;默认队列上限等于建议并发值,因此默认 429 阈值约为 `账号数量 × 4`。 diff --git a/README.MD b/README.MD index 1b5fa2c..cc6e0b7 100644 --- a/README.MD +++ b/README.MD @@ -89,6 +89,7 @@ docker-compose logs -f - `vercel.json` 会在构建阶段自动执行 `npm ci --prefix webui && npm run build --prefix webui` - `/v1/chat/completions` 在 Vercel 上默认走 `api/chat-stream.js`(Node Runtime)以保证实时 SSE - `api/chat-stream.js` 仅负责流式数据转发;鉴权、账号选择、会话/PoW 准备仍由 Go 内部 prepare 接口处理 +- Go prepare 会下发 `lease_id`,Node 在流结束后调用 release,确保账号占用时长与 Go 原生流式一致 - 至少配置: - `DS2API_ADMIN_KEY` - `DS2API_CONFIG_JSON`(JSON 字符串或 Base64) @@ -169,6 +170,7 @@ cp config.example.json config.json | `DS2API_STATIC_ADMIN_DIR` | 管理台静态文件目录 | | `DS2API_AUTO_BUILD_WEBUI` | 启动时缺失 WebUI 时是否自动执行 npm build(默认:本地开启,Vercel 关闭) | | `DS2API_VERCEL_INTERNAL_SECRET` | Vercel 混合流式链路内部鉴权密钥(可选;未设置时回退用 `DS2API_ADMIN_KEY`) | +| `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` | Vercel 流式 lease 过期秒数(默认 `900`) | | `VERCEL_TOKEN` | Vercel 同步 token(可选) | | `VERCEL_PROJECT_ID` | Vercel 项目 ID(可选) | | `VERCEL_TEAM_ID` | Vercel 团队 ID(可选) | diff --git a/README.en.md b/README.en.md index 31ac889..b84cbfc 100644 --- a/README.en.md +++ b/README.en.md @@ -89,6 +89,7 @@ docker-compose logs -f - `vercel.json` runs `npm ci --prefix webui && npm run build --prefix webui` during build - `/v1/chat/completions` is routed to `api/chat-stream.js` (Node Runtime) on Vercel to preserve real-time SSE - `api/chat-stream.js` is data-path only; auth/account/session/PoW preparation still comes from an internal Go prepare endpoint +- Go prepare returns a `lease_id`; Node releases it at stream end so account occupancy duration stays aligned with native Go streaming behavior - Minimum env vars: - `DS2API_ADMIN_KEY` - `DS2API_CONFIG_JSON` (raw JSON or Base64) @@ -169,6 +170,7 @@ cp config.example.json config.json | `DS2API_STATIC_ADMIN_DIR` | Admin static assets dir | | `DS2API_AUTO_BUILD_WEBUI` | Auto run npm build on startup when WebUI assets are missing (default: enabled locally, disabled on Vercel) | | `DS2API_VERCEL_INTERNAL_SECRET` | Internal auth secret for Vercel hybrid streaming path (optional; falls back to `DS2API_ADMIN_KEY` if unset) | +| `DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS` | Stream lease TTL seconds for Vercel hybrid streaming (default `900`) | | `VERCEL_TOKEN` | Vercel sync token (optional) | | `VERCEL_PROJECT_ID` | Vercel project ID (optional) | | `VERCEL_TEAM_ID` | Vercel team ID (optional) | diff --git a/api/chat-stream.js b/api/chat-stream.js index 2742d2f..8ea21fe 100644 --- a/api/chat-stream.js +++ b/api/chat-stream.js @@ -59,6 +59,7 @@ module.exports = async function handler(req, res) { const model = asString(prep.body.model) || asString(payload.model); const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`; + const leaseID = asString(prep.body.lease_id); const deepseekToken = asString(prep.body.deepseek_token); const powHeader = asString(prep.body.pow_header); const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null; @@ -66,138 +67,148 @@ module.exports = async function handler(req, res) { const thinkingEnabled = toBool(prep.body.thinking_enabled); const searchEnabled = toBool(prep.body.search_enabled); - if (!model || !deepseekToken || !powHeader || !completionPayload) { + if (!model || !leaseID || !deepseekToken || !powHeader || !completionPayload) { writeOpenAIError(res, 500, 'invalid vercel prepare response'); return; } - - const completionRes = await fetch(DEEPSEEK_COMPLETION_URL, { - method: 'POST', - headers: { - ...BASE_HEADERS, - authorization: `Bearer ${deepseekToken}`, - 'x-ds-pow-response': powHeader, - }, - body: JSON.stringify(completionPayload), - }); - - if (!completionRes.ok || !completionRes.body) { - const detail = await safeReadText(completionRes); - writeOpenAIError(res, 500, detail ? `Failed to get completion: ${detail}` : 'Failed to get completion.'); - return; - } - - res.statusCode = 200; - res.setHeader('Content-Type', 'text/event-stream'); - res.setHeader('Cache-Control', 'no-cache, no-transform'); - res.setHeader('Connection', 'keep-alive'); - res.setHeader('X-Accel-Buffering', 'no'); - if (typeof res.flushHeaders === 'function') { - res.flushHeaders(); - } - - const created = Math.floor(Date.now() / 1000); - let firstChunkSent = false; - let currentType = thinkingEnabled ? 'thinking' : 'text'; - let thinkingText = ''; - let outputText = ''; - const decoder = new TextDecoder(); - const reader = completionRes.body.getReader(); - let buffered = ''; - - const sendFrame = (obj) => { - res.write(`data: ${JSON.stringify(obj)}\n\n`); - if (typeof res.flush === 'function') { - res.flush(); - } - }; - - const finish = (reason) => { - sendFrame({ - id: sessionID, - object: 'chat.completion.chunk', - created, - model, - choices: [{ delta: {}, index: 0, finish_reason: reason }], - usage: buildUsage(finalPrompt, thinkingText, outputText), - }); - res.write('data: [DONE]\n\n'); - res.end(); - }; - + const releaseLease = createLeaseReleaser(req, leaseID); try { - // eslint-disable-next-line no-constant-condition - while (true) { - const { value, done } = await reader.read(); - if (done) { - break; - } - buffered += decoder.decode(value, { stream: true }); - const lines = buffered.split('\n'); - buffered = lines.pop() || ''; + const completionRes = await fetch(DEEPSEEK_COMPLETION_URL, { + method: 'POST', + headers: { + ...BASE_HEADERS, + authorization: `Bearer ${deepseekToken}`, + 'x-ds-pow-response': powHeader, + }, + body: JSON.stringify(completionPayload), + }); - for (const rawLine of lines) { - const line = rawLine.trim(); - if (!line.startsWith('data:')) { - continue; - } - const dataStr = line.slice(5).trim(); - if (!dataStr) { - continue; - } - if (dataStr === '[DONE]') { - finish('stop'); - return; - } - let chunk; - try { - chunk = JSON.parse(dataStr); - } catch (_err) { - continue; - } - if (chunk.error || chunk.code === 'content_filter') { - finish('content_filter'); - return; - } - const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType); - currentType = parsed.newType; - if (parsed.finished) { - finish('stop'); - return; - } - - for (const p of parsed.parts) { - if (!p.text) { - continue; - } - if (searchEnabled && isCitation(p.text)) { - continue; - } - const delta = {}; - if (!firstChunkSent) { - delta.role = 'assistant'; - firstChunkSent = true; - } - if (p.type === 'thinking') { - thinkingText += p.text; - delta.reasoning_content = p.text; - } else { - outputText += p.text; - delta.content = p.text; - } - sendFrame({ - id: sessionID, - object: 'chat.completion.chunk', - created, - model, - choices: [{ delta, index: 0 }], - }); - } - } + if (!completionRes.ok || !completionRes.body) { + const detail = await safeReadText(completionRes); + writeOpenAIError(res, 500, detail ? `Failed to get completion: ${detail}` : 'Failed to get completion.'); + return; } - finish('stop'); - } catch (_err) { - finish('stop'); + + res.statusCode = 200; + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache, no-transform'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); + if (typeof res.flushHeaders === 'function') { + res.flushHeaders(); + } + + const created = Math.floor(Date.now() / 1000); + let firstChunkSent = false; + let currentType = thinkingEnabled ? 'thinking' : 'text'; + let thinkingText = ''; + let outputText = ''; + const decoder = new TextDecoder(); + const reader = completionRes.body.getReader(); + let buffered = ''; + let ended = false; + + const sendFrame = (obj) => { + res.write(`data: ${JSON.stringify(obj)}\n\n`); + if (typeof res.flush === 'function') { + res.flush(); + } + }; + + const finish = async (reason) => { + if (ended) { + return; + } + ended = true; + sendFrame({ + id: sessionID, + object: 'chat.completion.chunk', + created, + model, + choices: [{ delta: {}, index: 0, finish_reason: reason }], + usage: buildUsage(finalPrompt, thinkingText, outputText), + }); + res.write('data: [DONE]\n\n'); + await releaseLease(); + res.end(); + }; + + try { + // eslint-disable-next-line no-constant-condition + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + buffered += decoder.decode(value, { stream: true }); + const lines = buffered.split('\n'); + buffered = lines.pop() || ''; + + for (const rawLine of lines) { + const line = rawLine.trim(); + if (!line.startsWith('data:')) { + continue; + } + const dataStr = line.slice(5).trim(); + if (!dataStr) { + continue; + } + if (dataStr === '[DONE]') { + await finish('stop'); + return; + } + let chunk; + try { + chunk = JSON.parse(dataStr); + } catch (_err) { + continue; + } + if (chunk.error || chunk.code === 'content_filter') { + await finish('content_filter'); + return; + } + const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType); + currentType = parsed.newType; + if (parsed.finished) { + await finish('stop'); + return; + } + + for (const p of parsed.parts) { + if (!p.text) { + continue; + } + if (searchEnabled && isCitation(p.text)) { + continue; + } + const delta = {}; + if (!firstChunkSent) { + delta.role = 'assistant'; + firstChunkSent = true; + } + if (p.type === 'thinking') { + thinkingText += p.text; + delta.reasoning_content = p.text; + } else { + outputText += p.text; + delta.content = p.text; + } + sendFrame({ + id: sessionID, + object: 'chat.completion.chunk', + created, + model, + choices: [{ delta, index: 0 }], + }); + } + } + } + await finish('stop'); + } catch (_err) { + await finish('stop'); + } + } finally { + await releaseLease(); } }; @@ -236,26 +247,12 @@ async function readRawBody(req) { } async function fetchStreamPrepare(req, rawBody) { - const proto = asString(header(req, 'x-forwarded-proto')) || 'https'; - const host = asString(header(req, 'host')); - const url = new URL(`${proto}://${host}${req.url || '/v1/chat/completions'}`); - url.searchParams.set('__go', '1'); + const url = buildInternalGoURL(req); url.searchParams.set('__stream_prepare', '1'); - const protectionBypass = resolveProtectionBypass(req); - if (protectionBypass) { - url.searchParams.set('x-vercel-protection-bypass', protectionBypass); - } const upstream = await fetch(url.toString(), { method: 'POST', - headers: { - authorization: asString(header(req, 'authorization')), - 'x-api-key': asString(header(req, 'x-api-key')), - 'x-ds2-target-account': asString(header(req, 'x-ds2-target-account')), - 'x-ds2-internal-token': internalSecret(), - 'x-vercel-protection-bypass': protectionBypass, - 'content-type': asString(header(req, 'content-type')) || 'application/json', - }, + headers: buildInternalGoHeaders(req, { withInternalToken: true, withContentType: true }), body: rawBody, }); @@ -310,6 +307,68 @@ function internalSecret() { return asString(process.env.DS2API_VERCEL_INTERNAL_SECRET) || asString(process.env.DS2API_ADMIN_KEY) || 'admin'; } +function buildInternalGoURL(req) { + const proto = asString(header(req, 'x-forwarded-proto')) || 'https'; + const host = asString(header(req, 'host')); + const url = new URL(`${proto}://${host}${req.url || '/v1/chat/completions'}`); + url.searchParams.set('__go', '1'); + const protectionBypass = resolveProtectionBypass(req); + if (protectionBypass) { + url.searchParams.set('x-vercel-protection-bypass', protectionBypass); + } + return url; +} + +function buildInternalGoHeaders(req, opts = {}) { + const headers = { + authorization: asString(header(req, 'authorization')), + 'x-api-key': asString(header(req, 'x-api-key')), + 'x-ds2-target-account': asString(header(req, 'x-ds2-target-account')), + 'x-vercel-protection-bypass': resolveProtectionBypass(req), + }; + if (opts.withInternalToken) { + headers['x-ds2-internal-token'] = internalSecret(); + } + if (opts.withContentType) { + headers['content-type'] = asString(header(req, 'content-type')) || 'application/json'; + } + return headers; +} + +function createLeaseReleaser(req, leaseID) { + let released = false; + return async () => { + if (released || !leaseID) { + return; + } + released = true; + try { + await releaseStreamLease(req, leaseID); + } catch (_err) { + // Ignore release errors. Lease TTL cleanup on Go side still prevents permanent leaks. + } + }; +} + +async function releaseStreamLease(req, leaseID) { + const url = buildInternalGoURL(req); + url.searchParams.set('__stream_release', '1'); + const body = Buffer.from(JSON.stringify({ lease_id: leaseID })); + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 1500); + try { + await fetch(url.toString(), { + method: 'POST', + headers: buildInternalGoHeaders(req, { withInternalToken: true, withContentType: true }), + body, + signal: controller.signal, + }); + } finally { + clearTimeout(timeout); + } +} + function resolveProtectionBypass(req) { const fromHeader = asString(header(req, 'x-vercel-protection-bypass')); if (fromHeader) { @@ -500,24 +559,11 @@ function estimateTokens(text) { } async function proxyToGo(req, res, rawBody) { - const proto = asString(header(req, 'x-forwarded-proto')) || 'https'; - const host = asString(header(req, 'host')); - const url = new URL(`${proto}://${host}${req.url || '/v1/chat/completions'}`); - url.searchParams.set('__go', '1'); - const protectionBypass = resolveProtectionBypass(req); - if (protectionBypass) { - url.searchParams.set('x-vercel-protection-bypass', protectionBypass); - } + const url = buildInternalGoURL(req); const upstream = await fetch(url.toString(), { method: 'POST', - headers: { - authorization: asString(header(req, 'authorization')), - 'x-api-key': asString(header(req, 'x-api-key')), - 'x-ds2-target-account': asString(header(req, 'x-ds2-target-account')), - 'x-vercel-protection-bypass': protectionBypass, - 'content-type': asString(header(req, 'content-type')) || 'application/json', - }, + headers: buildInternalGoHeaders(req, { withContentType: true }), body: rawBody, }); diff --git a/internal/adapter/openai/handler.go b/internal/adapter/openai/handler.go index afac493..99793b7 100644 --- a/internal/adapter/openai/handler.go +++ b/internal/adapter/openai/handler.go @@ -3,13 +3,17 @@ package openai import ( "bufio" "context" + "crypto/rand" "crypto/subtle" + "encoding/hex" "encoding/json" "fmt" "io" "net/http" "os" + "strconv" "strings" + "sync" "time" "github.com/go-chi/chi/v5" @@ -25,6 +29,14 @@ type Handler struct { Store *config.Store Auth *auth.Resolver DS *deepseek.Client + + leaseMu sync.Mutex + streamLeases map[string]streamLease +} + +type streamLease struct { + Auth *auth.RequestAuth + ExpiresAt time.Time } func RegisterRoutes(r chi.Router, h *Handler) { @@ -37,6 +49,10 @@ func (h *Handler) ListModels(w http.ResponseWriter, _ *http.Request) { } func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { + if isVercelStreamReleaseRequest(r) { + h.handleVercelStreamRelease(w, r) + return + } if isVercelStreamPrepareRequest(r) { h.handleVercelStreamPrepare(w, r) return @@ -118,6 +134,7 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque http.NotFound(w, r) return } + h.sweepExpiredStreamLeases() internalSecret := vercelInternalSecret() internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { @@ -134,7 +151,12 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque writeOpenAIError(w, status, err.Error()) return } - defer h.Auth.Release(a) + leased := false + defer func() { + if !leased { + h.Auth.Release(a) + } + }() r = r.WithContext(auth.WithAuth(r.Context(), a)) var req map[string]any @@ -193,8 +215,15 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque "thinking_enabled": thinkingEnabled, "search_enabled": searchEnabled, } + leaseID := h.holdStreamLease(a) + if leaseID == "" { + writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease") + return + } + leased = true writeJSON(w, http.StatusOK, map[string]any{ "session_id": sessionID, + "lease_id": leaseID, "model": model, "final_prompt": finalPrompt, "thinking_enabled": thinkingEnabled, @@ -205,6 +234,37 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque }) } +func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Request) { + if !config.IsVercel() { + http.NotFound(w, r) + return + } + h.sweepExpiredStreamLeases() + internalSecret := vercelInternalSecret() + internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) + if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { + writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") + return + } + + var req map[string]any + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeOpenAIError(w, http.StatusBadRequest, "invalid json") + return + } + leaseID, _ := req["lease_id"].(string) + leaseID = strings.TrimSpace(leaseID) + if leaseID == "" { + writeOpenAIError(w, http.StatusBadRequest, "lease_id is required") + return + } + if !h.releaseStreamLease(leaseID) { + writeOpenAIError(w, http.StatusNotFound, "stream lease not found") + return + } + writeJSON(w, http.StatusOK, map[string]any{"success": true}) +} + func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { @@ -592,6 +652,13 @@ func isVercelStreamPrepareRequest(r *http.Request) bool { return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1" } +func isVercelStreamReleaseRequest(r *http.Request) bool { + if r == nil { + return false + } + return strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1" +} + func vercelInternalSecret() string { if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" { return v @@ -601,3 +668,102 @@ func vercelInternalSecret() string { } return "admin" } + +func (h *Handler) holdStreamLease(a *auth.RequestAuth) string { + if a == nil { + return "" + } + now := time.Now() + ttl := streamLeaseTTL() + if ttl <= 0 { + ttl = 15 * time.Minute + } + + h.leaseMu.Lock() + expired := h.popExpiredLeasesLocked(now) + if h.streamLeases == nil { + h.streamLeases = make(map[string]streamLease) + } + leaseID := newLeaseID() + h.streamLeases[leaseID] = streamLease{ + Auth: a, + ExpiresAt: now.Add(ttl), + } + h.leaseMu.Unlock() + h.releaseExpiredAuths(expired) + return leaseID +} + +func (h *Handler) releaseStreamLease(leaseID string) bool { + leaseID = strings.TrimSpace(leaseID) + if leaseID == "" { + return false + } + + h.leaseMu.Lock() + expired := h.popExpiredLeasesLocked(time.Now()) + lease, ok := h.streamLeases[leaseID] + if ok { + delete(h.streamLeases, leaseID) + } + h.leaseMu.Unlock() + h.releaseExpiredAuths(expired) + + if !ok { + return false + } + if h.Auth != nil { + h.Auth.Release(lease.Auth) + } + return true +} + +func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth { + if len(h.streamLeases) == 0 { + return nil + } + expired := make([]*auth.RequestAuth, 0) + for leaseID, lease := range h.streamLeases { + if now.After(lease.ExpiresAt) { + delete(h.streamLeases, leaseID) + expired = append(expired, lease.Auth) + } + } + return expired +} + +func (h *Handler) releaseExpiredAuths(expired []*auth.RequestAuth) { + if h.Auth == nil || len(expired) == 0 { + return + } + for _, a := range expired { + h.Auth.Release(a) + } +} + +func (h *Handler) sweepExpiredStreamLeases() { + h.leaseMu.Lock() + expired := h.popExpiredLeasesLocked(time.Now()) + h.leaseMu.Unlock() + h.releaseExpiredAuths(expired) +} + +func streamLeaseTTL() time.Duration { + raw := strings.TrimSpace(os.Getenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS")) + if raw == "" { + return 15 * time.Minute + } + seconds, err := strconv.Atoi(raw) + if err != nil || seconds <= 0 { + return 15 * time.Minute + } + return time.Duration(seconds) * time.Second +} + +func newLeaseID() string { + buf := make([]byte, 16) + if _, err := rand.Read(buf); err == nil { + return hex.EncodeToString(buf) + } + return fmt.Sprintf("lease-%d", time.Now().UnixNano()) +} diff --git a/internal/adapter/openai/vercel_prepare_test.go b/internal/adapter/openai/vercel_prepare_test.go index 01b209c..0dfaf28 100644 --- a/internal/adapter/openai/vercel_prepare_test.go +++ b/internal/adapter/openai/vercel_prepare_test.go @@ -1,8 +1,10 @@ package openai import ( + "ds2api/internal/auth" "net/http/httptest" "testing" + "time" ) func TestIsVercelStreamPrepareRequest(t *testing.T) { @@ -17,6 +19,18 @@ func TestIsVercelStreamPrepareRequest(t *testing.T) { } } +func TestIsVercelStreamReleaseRequest(t *testing.T) { + req := httptest.NewRequest("POST", "/v1/chat/completions?__stream_release=1", nil) + if !isVercelStreamReleaseRequest(req) { + t.Fatalf("expected release request to be detected") + } + + req2 := httptest.NewRequest("POST", "/v1/chat/completions", nil) + if isVercelStreamReleaseRequest(req2) { + t.Fatalf("expected non-release request") + } +} + func TestVercelInternalSecret(t *testing.T) { t.Run("prefer explicit secret", func(t *testing.T) { t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret") @@ -42,3 +56,28 @@ func TestVercelInternalSecret(t *testing.T) { } }) } + +func TestStreamLeaseLifecycle(t *testing.T) { + h := &Handler{} + leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false}) + if leaseID == "" { + t.Fatalf("expected non-empty lease id") + } + if ok := h.releaseStreamLease(leaseID); !ok { + t.Fatalf("expected lease release success") + } + if ok := h.releaseStreamLease(leaseID); ok { + t.Fatalf("expected duplicate release to fail") + } +} + +func TestStreamLeaseTTL(t *testing.T) { + t.Setenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS", "120") + if got := streamLeaseTTL(); got != 120*time.Second { + t.Fatalf("expected ttl=120s, got %v", got) + } + t.Setenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS", "invalid") + if got := streamLeaseTTL(); got != 15*time.Minute { + t.Fatalf("expected default ttl on invalid value, got %v", got) + } +} diff --git a/webui/src/components/ApiTester.jsx b/webui/src/components/ApiTester.jsx index d240f4b..c2c228a 100644 --- a/webui/src/components/ApiTester.jsx +++ b/webui/src/components/ApiTester.jsx @@ -89,6 +89,8 @@ export default function ApiTester({ config, onMessage, authFetch }) { const runTest = async () => { if (loading) return + const startedAt = Date.now() + setLoading(true) setIsStreaming(true) setResponse(null) @@ -175,7 +177,8 @@ export default function ApiTester({ config, onMessage, authFetch }) { } else { const data = await res.json() setResponse({ success: true, status_code: res.status, ...data }) - onMessage('success', t('apiTester.testSuccess', { account: selectedAccount || 'Auto', time: 'N/A' })) + const elapsed = Math.max(0, Date.now() - startedAt) + onMessage('success', t('apiTester.testSuccess', { account: selectedAccount || 'Auto', time: elapsed })) } } catch (e) { if (e.name === 'AbortError') {