diff --git a/.env.example b/.env.example
index 6812b39..a4f392e 100644
--- a/.env.example
+++ b/.env.example
@@ -66,6 +66,11 @@ DS2API_ADMIN_KEY=admin
 # Default: enabled on local/Docker, disabled on Vercel.
 # DS2API_AUTO_BUILD_WEBUI=true
 
+# Internal auth secret used by the Vercel hybrid streaming path
+# (Go prepare endpoint <-> Node stream function).
+# Optional: falls back to DS2API_ADMIN_KEY when unset.
+# DS2API_VERCEL_INTERNAL_SECRET=change-me
+
 # ---------------------------------------------------------------
 # Vercel sync integration (optional)
 # ---------------------------------------------------------------
diff --git a/DEPLOY.en.md b/DEPLOY.en.md
index 42f3b48..3d09833 100644
--- a/DEPLOY.en.md
+++ b/DEPLOY.en.md
@@ -67,6 +67,9 @@ Notes:
 - Rewrites and cache headers: `vercel.json`
 - Build stage runs `npm ci --prefix webui && npm run build --prefix webui` automatically
 - `vercel.json` routes `/admin/assets/*` and the `/admin` page to static output, while `/admin/*` APIs still go to `api/index`
+- To mitigate response buffering in the Vercel Go Runtime, `/v1/chat/completions` on Vercel is routed to `api/chat-stream.js` (Node Runtime)
+- `api/chat-stream.js` automatically falls back to the Go entry for non-stream requests or requests with `tools` (internal `__go=1`)
+- `api/chat-stream.js` is data-path only (stream relay + SSE conversion); auth, account selection, session creation, and PoW preparation are still handled by an internal Go prepare endpoint (enabled on Vercel only)
 
 Minimum environment variables:
 
@@ -82,6 +85,7 @@ Optional:
 - `DS2API_ACCOUNT_CONCURRENCY` (alias of the same setting)
 - `DS2API_ACCOUNT_MAX_QUEUE` (waiting queue limit, default=`recommended_concurrency`)
 - `DS2API_ACCOUNT_QUEUE_SIZE` (alias of the same setting)
+- `DS2API_VERCEL_INTERNAL_SECRET` (optional internal auth secret for the Vercel hybrid streaming path; falls back to `DS2API_ADMIN_KEY` when unset)
 
 Recommended concurrency is computed dynamically as `account_count * per_account_inflight_limit` (default is `account_count * 2`).
 When inflight slots are full, requests are queued first; with default queue size, 429 typically starts around `account_count * 4`.
@@ -139,14 +143,19 @@ If you see:
 ```
 No Output Directory named "public" found after the Build completed.
 ```
 
-Vercel is validating frontend output against `public`, while this repo emits WebUI assets to `static/admin`.
+Vercel is validating frontend output against `public`. This repo builds the WebUI into `static/admin` and uses the parent directory `static` as the Vercel output root.
 `vercel.json` now explicitly sets:
 
 ```json
-"outputDirectory": "static/admin"
+"outputDirectory": "static"
 ```
 
-If you manually changed Output Directory in Project Settings, set it to `static/admin` (or clear it and let repo config apply).
+If you manually changed Output Directory in Project Settings, set it to `static` (or clear it and let the repo config apply).
+
+Vercel streaming note (important):
+
+- The Vercel Go Runtime applies platform-level buffering, so this repo uses a hybrid path on Vercel (`Go prepare + Node stream`) to restore real-time SSE behavior.
+- This adaptation is Vercel-only; local and Docker remain pure Go.
 
 ## 4. Reverse Proxy (Nginx)
diff --git a/DEPLOY.md b/DEPLOY.md
index 78f92ea..e443cb5 100644
--- a/DEPLOY.md
+++ b/DEPLOY.md
@@ -67,6 +67,9 @@ docker-compose up -d --build
 - 路由与缓存头:`vercel.json`
 - 构建阶段会自动执行 `npm ci --prefix webui && npm run build --prefix webui`
 - `vercel.json` 已将 `/admin/assets/*` 与 `/admin` 页面走静态产物,`/admin/*` API 仍走 `api/index`
+- 为缓解 Go Runtime 的流式缓冲,`/v1/chat/completions` 在 Vercel 上会优先走 `api/chat-stream.js`(Node Runtime)
+- `api/chat-stream.js` 对非流式请求或带 `tools` 的请求会自动回退到 Go 入口(内部 `__go=1`)
+- `api/chat-stream.js` 仅负责流式数据转发与 SSE 转换;鉴权、账号选择、会话创建、PoW 计算仍由 Go 内部 prepare 接口完成(仅 Vercel 启用)
 
 至少配置环境变量:
 
@@ -82,6 +85,7 @@ docker-compose up -d --build
 - `DS2API_ACCOUNT_CONCURRENCY`(同上别名)
 - `DS2API_ACCOUNT_MAX_QUEUE`(等待队列上限,默认=`recommended_concurrency`)
 - `DS2API_ACCOUNT_QUEUE_SIZE`(同上别名)
+- `DS2API_VERCEL_INTERNAL_SECRET`(可选,Vercel 混合流式链路内部鉴权;未设置时回退使用 `DS2API_ADMIN_KEY`)
 
 并发建议值会动态按 `账号数量 × 每账号并发上限` 计算(默认即 `账号数量 × 2`)。
 当 in-flight 满时,请求先进入等待队列;默认队列上限等于建议并发值,因此默认 429 阈值约为 `账号数量 × 4`。
@@ -138,13 +142,18 @@ Error: Command failed: go build -ldflags -s -w -o .../bootstrap .../main__vc__go
 ```
 No Output Directory named "public" found after the Build completed.
 ```
 
-说明 Vercel 正在按 `public` 校验前端产物目录。当前仓库前端产物目录是 `static/admin`,已在 `vercel.json` 显式配置:
+说明 Vercel 正在按 `public` 校验前端产物目录。当前仓库会将 WebUI 构建到 `static/admin`,并在 `vercel.json` 使用上级目录 `static` 作为输出根目录:
 
 ```json
-"outputDirectory": "static/admin"
+"outputDirectory": "static"
 ```
 
-若你在项目设置里手动改过 Output Directory,请同步改为 `static/admin` 或清空让仓库配置生效。
+若你在项目设置里手动改过 Output Directory,请同步改为 `static` 或清空让仓库配置生效。
+
+Vercel 流式说明(重要):
+
+- Vercel 的 Go Runtime 存在平台层响应缓冲,因此本项目在 Vercel 上采用“Go prepare + Node stream”的混合链路来恢复实时 SSE。
+- 该适配只在 Vercel 生效;本地与 Docker 仍走纯 Go 链路。
 
 ## 4. 反向代理(Nginx)
diff --git a/README.MD b/README.MD
index 7d3190b..1b5fa2c 100644
--- a/README.MD
+++ b/README.MD
@@ -87,6 +87,8 @@ docker-compose logs -f
 - 入口:`api/index.go`
 - 路由重写:`vercel.json`
 - `vercel.json` 会在构建阶段自动执行 `npm ci --prefix webui && npm run build --prefix webui`
+- `/v1/chat/completions` 在 Vercel 上默认走 `api/chat-stream.js`(Node Runtime)以保证实时 SSE
+- `api/chat-stream.js` 仅负责流式数据转发;鉴权、账号选择、会话/PoW 准备仍由 Go 内部 prepare 接口处理
 - 至少配置:
   - `DS2API_ADMIN_KEY`
   - `DS2API_CONFIG_JSON`(JSON 字符串或 Base64)
@@ -166,6 +168,7 @@ cp config.example.json config.json
 | `DS2API_WASM_PATH` | PoW wasm 文件路径 |
 | `DS2API_STATIC_ADMIN_DIR` | 管理台静态文件目录 |
 | `DS2API_AUTO_BUILD_WEBUI` | 启动时缺失 WebUI 时是否自动执行 npm build(默认:本地开启,Vercel 关闭) |
+| `DS2API_VERCEL_INTERNAL_SECRET` | Vercel 混合流式链路内部鉴权密钥(可选;未设置时回退用 `DS2API_ADMIN_KEY`) |
 | `VERCEL_TOKEN` | Vercel 同步 token(可选) |
 | `VERCEL_PROJECT_ID` | Vercel 项目 ID(可选) |
 | `VERCEL_TEAM_ID` | Vercel 团队 ID(可选) |
diff --git a/README.en.md b/README.en.md
index 143b978..31ac889 100644
--- a/README.en.md
+++ b/README.en.md
@@ -87,6 +87,8 @@ docker-compose logs -f
 - Entrypoint: `api/index.go`
 - Rewrites: `vercel.json`
 - `vercel.json` runs `npm ci --prefix webui && npm run build --prefix webui` during build
+- `/v1/chat/completions` is routed to `api/chat-stream.js` (Node Runtime) on Vercel to preserve real-time SSE
+- `api/chat-stream.js` is data-path only; auth, account selection, and session/PoW preparation are still handled by an internal Go prepare endpoint
 - Minimum env vars:
   - `DS2API_ADMIN_KEY`
   - `DS2API_CONFIG_JSON` (raw JSON or Base64)
@@ -166,6 +168,7 @@ cp config.example.json config.json
 | `DS2API_WASM_PATH` | PoW wasm path |
 | `DS2API_STATIC_ADMIN_DIR` | Admin static assets dir |
 | `DS2API_AUTO_BUILD_WEBUI` | Auto run npm build on startup when WebUI assets are missing (default: enabled locally, disabled on Vercel) |
+| `DS2API_VERCEL_INTERNAL_SECRET` | Internal auth secret for the Vercel hybrid streaming path (optional; falls back to `DS2API_ADMIN_KEY` if unset) |
 | `VERCEL_TOKEN` | Vercel sync token (optional) |
 | `VERCEL_PROJECT_ID` | Vercel project ID (optional) |
 | `VERCEL_TEAM_ID` | Vercel team ID (optional) |
diff --git a/api/chat-stream.js b/api/chat-stream.js
new file mode 100644
index 0000000..4cf05da
--- /dev/null
+++ b/api/chat-stream.js
@@ -0,0 +1,549 @@
+'use strict';
+
+const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion';
+
+const BASE_HEADERS = {
+  Host: 'chat.deepseek.com',
+  'User-Agent': 'DeepSeek/1.6.11 Android/35',
+  Accept: 'application/json',
+  'Content-Type': 'application/json',
+  'x-client-platform': 'android',
+  'x-client-version': '1.6.11',
+  'x-client-locale': 'zh_CN',
+  'accept-charset': 'UTF-8',
+};
+
+const SKIP_PATTERNS = [
+  'quasi_status',
+  'elapsed_secs',
+  'token_usage',
+  'pending_fragment',
+  'conversation_mode',
+  'fragments/-1/status',
+  'fragments/-2/status',
+  'fragments/-3/status',
+];
+
+module.exports = async function handler(req, res) {
+  setCorsHeaders(res);
+  if (req.method === 'OPTIONS') {
+    res.statusCode = 204;
+    res.end();
+    return;
+  }
+  if (req.method !== 'POST') {
+    writeOpenAIError(res, 405, 'method not allowed');
+    return;
+  }
+
+  const rawBody = await readRawBody(req);
+  let payload;
+  try {
+    payload = JSON.parse(rawBody.toString('utf8') || '{}');
+  } catch (_err) {
+    writeOpenAIError(res, 400, 'invalid json');
+    return;
+  }
+
+  // Keep all non-stream behavior on the Go side to avoid compatibility regressions.
+  if (!toBool(payload.stream) || (Array.isArray(payload.tools) && payload.tools.length > 0)) {
+    await proxyToGo(req, res, rawBody);
+    return;
+  }
+
+  const prep = await fetchStreamPrepare(req, rawBody);
+  if (!prep.ok) {
+    relayPreparedFailure(res, prep);
+    return;
+  }
+
+  const model = asString(prep.body.model) || asString(payload.model);
+  const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`;
+  const deepseekToken = asString(prep.body.deepseek_token);
+  const powHeader = asString(prep.body.pow_header);
+  const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null;
+  const finalPrompt = asString(prep.body.final_prompt);
+  const thinkingEnabled = toBool(prep.body.thinking_enabled);
+  const searchEnabled = toBool(prep.body.search_enabled);
+
+  if (!model || !deepseekToken || !powHeader || !completionPayload) {
+    writeOpenAIError(res, 500, 'invalid vercel prepare response');
+    return;
+  }
+
+  const completionRes = await fetch(DEEPSEEK_COMPLETION_URL, {
+    method: 'POST',
+    headers: {
+      ...BASE_HEADERS,
+      authorization: `Bearer ${deepseekToken}`,
+      'x-ds-pow-response': powHeader,
+    },
+    body: JSON.stringify(completionPayload),
+  });
+
+  if (!completionRes.ok || !completionRes.body) {
+    const detail = await safeReadText(completionRes);
+    writeOpenAIError(res, 500, detail ? `Failed to get completion: ${detail}` : 'Failed to get completion.');
+    return;
+  }
+
+  res.statusCode = 200;
+  res.setHeader('Content-Type', 'text/event-stream');
+  res.setHeader('Cache-Control', 'no-cache, no-transform');
+  res.setHeader('Connection', 'keep-alive');
+  res.setHeader('X-Accel-Buffering', 'no');
+  if (typeof res.flushHeaders === 'function') {
+    res.flushHeaders();
+  }
+
+  const created = Math.floor(Date.now() / 1000);
+  let firstChunkSent = false;
+  let currentType = thinkingEnabled ? 'thinking' : 'text';
+  let thinkingText = '';
+  let outputText = '';
+  const decoder = new TextDecoder();
+  const reader = completionRes.body.getReader();
+  let buffered = '';
+
+  const sendFrame = (obj) => {
+    res.write(`data: ${JSON.stringify(obj)}\n\n`);
+    if (typeof res.flush === 'function') {
+      res.flush();
+    }
+  };
+
+  const finish = (reason) => {
+    sendFrame({
+      id: sessionID,
+      object: 'chat.completion.chunk',
+      created,
+      model,
+      choices: [{ delta: {}, index: 0, finish_reason: reason }],
+      usage: buildUsage(finalPrompt, thinkingText, outputText),
+    });
+    res.write('data: [DONE]\n\n');
+    res.end();
+  };
+
+  try {
+    // eslint-disable-next-line no-constant-condition
+    while (true) {
+      const { value, done } = await reader.read();
+      if (done) {
+        break;
+      }
+      buffered += decoder.decode(value, { stream: true });
+      const lines = buffered.split('\n');
+      buffered = lines.pop() || '';
+
+      for (const rawLine of lines) {
+        const line = rawLine.trim();
+        if (!line.startsWith('data:')) {
+          continue;
+        }
+        const dataStr = line.slice(5).trim();
+        if (!dataStr) {
+          continue;
+        }
+        if (dataStr === '[DONE]') {
+          finish('stop');
+          return;
+        }
+        let chunk;
+        try {
+          chunk = JSON.parse(dataStr);
+        } catch (_err) {
+          continue;
+        }
+        if (chunk.error || chunk.code === 'content_filter') {
+          finish('content_filter');
+          return;
+        }
+        const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType);
+        currentType = parsed.newType;
+        if (parsed.finished) {
+          finish('stop');
+          return;
+        }
+
+        for (const p of parsed.parts) {
+          if (!p.text) {
+            continue;
+          }
+          if (searchEnabled && isCitation(p.text)) {
+            continue;
+          }
+          const delta = {};
+          if (!firstChunkSent) {
+            delta.role = 'assistant';
+            firstChunkSent = true;
+          }
+          if (p.type === 'thinking') {
+            thinkingText += p.text;
+            delta.reasoning_content = p.text;
+          } else {
+            outputText += p.text;
+            delta.content = p.text;
+          }
+          sendFrame({
+            id: sessionID,
+            object: 'chat.completion.chunk',
+            created,
+            model,
+            choices: [{ delta, index: 0 }],
+          });
+        }
+      }
+    }
+    finish('stop');
+  } catch (_err) {
+    finish('stop');
+  }
+};
+
+function setCorsHeaders(res) {
+  res.setHeader('Access-Control-Allow-Origin', '*');
+  res.setHeader('Access-Control-Allow-Credentials', 'true');
+  res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS, PUT, DELETE');
+  res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-API-Key, X-Ds2-Target-Account');
+}
+
+function header(req, key) {
+  if (!req || !req.headers) {
+    return '';
+  }
+  return asString(req.headers[key.toLowerCase()]);
+}
+
+async function readRawBody(req) {
+  if (Buffer.isBuffer(req.body)) {
+    return req.body;
+  }
+  if (typeof req.body === 'string') {
+    return Buffer.from(req.body);
+  }
+  if (req.body && typeof req.body === 'object') {
+    return Buffer.from(JSON.stringify(req.body));
+  }
+  const chunks = [];
+  for await (const chunk of req) {
+    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
+  }
+  return Buffer.concat(chunks);
+}
+
+async function fetchStreamPrepare(req, rawBody) {
+  const proto = asString(header(req, 'x-forwarded-proto')) || 'https';
+  const host = asString(header(req, 'host'));
+  const url = new URL(`${proto}://${host}${req.url || '/v1/chat/completions'}`);
+  url.searchParams.set('__go', '1');
+  url.searchParams.set('__stream_prepare', '1');
+
+  const upstream = await fetch(url.toString(), {
+    method: 'POST',
+    headers: {
+      authorization: asString(header(req, 'authorization')),
+      'x-api-key': asString(header(req, 'x-api-key')),
+      'x-ds2-target-account': asString(header(req, 'x-ds2-target-account')),
+      'x-ds2-internal-token': internalSecret(),
+      'content-type': asString(header(req, 'content-type')) || 'application/json',
+    },
+    body: rawBody,
+  });
+
+  const text = await upstream.text();
+  let body = {};
+  try {
+    body = JSON.parse(text || '{}');
+  } catch (_err) {
+    body = {};
+  }
+
+  return {
+    ok: upstream.ok,
+    status: upstream.status,
+    contentType: upstream.headers.get('content-type') || 'application/json',
+    text,
+    body,
+  };
+}
+
+function relayPreparedFailure(res, prep) {
+  res.statusCode = prep.status || 500;
+  res.setHeader('Content-Type', prep.contentType || 'application/json');
+  if (prep.text) {
+    res.end(prep.text);
+    return;
+  }
+  writeOpenAIError(res, prep.status || 500, 'vercel prepare failed');
+}
+
+async function safeReadText(resp) {
+  if (!resp) {
+    return '';
+  }
+  try {
+    const text = await resp.text();
+    return text.trim();
+  } catch (_err) {
+    return '';
+  }
+}
+
+function internalSecret() {
+  return asString(process.env.DS2API_VERCEL_INTERNAL_SECRET) || asString(process.env.DS2API_ADMIN_KEY) || 'admin';
+}
+
+function parseChunkForContent(chunk, thinkingEnabled, currentType) {
+  if (!chunk || typeof chunk !== 'object' || !Object.prototype.hasOwnProperty.call(chunk, 'v')) {
+    return { parts: [], finished: false, newType: currentType };
+  }
+  const pathValue = asString(chunk.p);
+  if (shouldSkipPath(pathValue)) {
+    return { parts: [], finished: false, newType: currentType };
+  }
+  if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') {
+    return { parts: [], finished: true, newType: currentType };
+  }
+
+  let newType = currentType;
+  const parts = [];
+
+  if (pathValue === 'response/fragments' && asString(chunk.o).toUpperCase() === 'APPEND' && Array.isArray(chunk.v)) {
+    for (const frag of chunk.v) {
+      if (!frag || typeof frag !== 'object') {
+        continue;
+      }
+      const fragType = asString(frag.type).toUpperCase();
+      const content = asString(frag.content);
+      if (!content) {
+        continue;
+      }
+      if (fragType === 'THINK' || fragType === 'THINKING') {
+        newType = 'thinking';
+        parts.push({ text: content, type: 'thinking' });
+      } else if (fragType === 'RESPONSE') {
+        newType = 'text';
+        parts.push({ text: content, type: 'text' });
+      } else {
+        parts.push({ text: content, type: 'text' });
+      }
+    }
+    // Return early: without this, the generic array handler below would
+    // re-process chunk.v and emit every fragment a second time.
+    return { parts, finished: false, newType };
+  }
+
+  if (pathValue === 'response' && Array.isArray(chunk.v)) {
+    for (const item of chunk.v) {
+      if (!item || typeof item !== 'object') {
+        continue;
+      }
+      if (item.p === 'fragments' && item.o === 'APPEND' && Array.isArray(item.v)) {
+        for (const frag of item.v) {
+          const fragType = asString(frag && frag.type).toUpperCase();
+          if (fragType === 'THINK' || fragType === 'THINKING') {
+            newType = 'thinking';
+          } else if (fragType === 'RESPONSE') {
+            newType = 'text';
+          }
+        }
+      }
+    }
+  }
+
+  let partType = 'text';
+  if (pathValue === 'response/thinking_content') {
+    partType = 'thinking';
+  } else if (pathValue === 'response/content') {
+    partType = 'text';
+  } else if (pathValue.includes('response/fragments') && pathValue.includes('/content')) {
+    partType = newType;
+  } else if (!pathValue && thinkingEnabled) {
+    partType = newType;
+  }
+
+  const val = chunk.v;
+  if (typeof val === 'string') {
+    if (val === 'FINISHED' && (!pathValue || pathValue === 'status')) {
+      return { parts: [], finished: true, newType };
+    }
+    if (val) {
+      parts.push({ text: val, type: partType });
+    }
+    return { parts, finished: false, newType };
+  }
+
+  if (Array.isArray(val)) {
+    for (const entry of val) {
+      if (typeof entry === 'string') {
+        if (entry) {
+          parts.push({ text: entry, type: partType });
+        }
+        continue;
+      }
+      if (!entry || typeof entry !== 'object') {
+        continue;
+      }
+      if (asString(entry.p) === 'status' && asString(entry.v) === 'FINISHED') {
+        return { parts: [], finished: true, newType };
+      }
+      const content = asString(entry.content);
+      if (!content) {
+        continue;
+      }
+      const t = asString(entry.type).toUpperCase();
+      if (t === 'THINK' || t === 'THINKING') {
+        parts.push({ text: content, type: 'thinking' });
+      } else if (t === 'RESPONSE') {
+        parts.push({ text: content, type: 'text' });
+      } else {
+        parts.push({ text: content, type: partType });
+      }
+    }
+    return { parts, finished: false, newType };
+  }
+
+  if (val && typeof val === 'object') {
+    const resp = val.response && typeof val.response === 'object' ? val.response : val;
+    if (Array.isArray(resp.fragments)) {
+      for (const frag of resp.fragments) {
+        if (!frag || typeof frag !== 'object') {
+          continue;
+        }
+        const content = asString(frag.content);
+        if (!content) {
+          continue;
+        }
+        const t = asString(frag.type).toUpperCase();
+        if (t === 'THINK' || t === 'THINKING') {
+          newType = 'thinking';
+          parts.push({ text: content, type: 'thinking' });
+        } else if (t === 'RESPONSE') {
+          newType = 'text';
+          parts.push({ text: content, type: 'text' });
+        } else {
+          parts.push({ text: content, type: partType });
+        }
+      }
+    }
+  }
+  return { parts, finished: false, newType };
+}
+
+function shouldSkipPath(pathValue) {
+  if (pathValue === 'response/search_status') {
+    return true;
+  }
+  for (const p of SKIP_PATTERNS) {
+    if (pathValue.includes(p)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+function isCitation(text) {
+  return asString(text).trim().startsWith('[citation:');
+}
+
+function buildUsage(prompt, thinking, output) {
+  const promptTokens = estimateTokens(prompt);
+  const reasoningTokens = estimateTokens(thinking);
+  const completionTokens = estimateTokens(output);
+  return {
+    prompt_tokens: promptTokens,
+    completion_tokens: reasoningTokens + completionTokens,
+    total_tokens: promptTokens + reasoningTokens + completionTokens,
+    completion_tokens_details: {
+      reasoning_tokens: reasoningTokens,
+    },
+  };
+}
+
+function estimateTokens(text) {
+  const t = asString(text);
+  if (!t) {
+    return 0;
+  }
+  const n = Math.floor(Array.from(t).length / 4);
+  return n < 1 ? 1 : n;
+}
+
+async function proxyToGo(req, res, rawBody) {
+  const proto = asString(header(req, 'x-forwarded-proto')) || 'https';
+  const host = asString(header(req, 'host'));
+  const url = new URL(`${proto}://${host}${req.url || '/v1/chat/completions'}`);
+  url.searchParams.set('__go', '1');
+
+  const upstream = await fetch(url.toString(), {
+    method: 'POST',
+    headers: {
+      authorization: asString(header(req, 'authorization')),
+      'x-api-key': asString(header(req, 'x-api-key')),
+      'x-ds2-target-account': asString(header(req, 'x-ds2-target-account')),
+      'content-type': asString(header(req, 'content-type')) || 'application/json',
+    },
+    body: rawBody,
+  });
+
+  res.statusCode = upstream.status;
+  // fetch() has already decoded the body, so length, encoding, and
+  // hop-by-hop headers must not be copied verbatim.
+  const skipHeaders = new Set(['content-length', 'content-encoding', 'transfer-encoding', 'connection']);
+  upstream.headers.forEach((value, key) => {
+    if (skipHeaders.has(key.toLowerCase())) {
+      return;
+    }
+    res.setHeader(key, value);
+  });
+  const bytes = Buffer.from(await upstream.arrayBuffer());
+  res.end(bytes);
+}
+
+function writeOpenAIError(res, status, message) {
+  res.statusCode = status;
+  res.setHeader('Content-Type', 'application/json');
+  res.end(
+    JSON.stringify({
+      error: {
+        message,
+        type: openAIErrorType(status),
+      },
+    }),
+  );
+}
+
+function openAIErrorType(status) {
+  switch (status) {
+    case 400:
+      return 'invalid_request_error';
+    case 401:
+      return 'authentication_error';
+    case 403:
+      return 'permission_error';
+    case 429:
+      return 'rate_limit_error';
+    case 503:
+      return 'service_unavailable_error';
+    default:
+      return status >= 500 ? 'api_error' : 'invalid_request_error';
+  }
+}
+
+function toBool(v) {
+  return v === true;
+}
+
+function asString(v) {
+  if (typeof v === 'string') {
+    return v.trim();
+  }
+  if (Array.isArray(v)) {
+    return asString(v[0]);
+  }
+  if (v == null) {
+    return '';
+  }
+  return String(v).trim();
+}
diff --git a/internal/adapter/claude/handler.go b/internal/adapter/claude/handler.go
index 83a82ac..8abd829 100644
--- a/internal/adapter/claude/handler.go
+++ b/internal/adapter/claude/handler.go
@@ -230,19 +230,21 @@ func collectDeepSeek(resp *http.Response, thinkingEnabled bool) (string, string)
 
 func (h *Handler) writeClaudeStream(w http.ResponseWriter, r *http.Request, model string, messages []any, fullText string, detected []util.ParsedToolCall) {
 	w.Header().Set("Content-Type", "text/event-stream")
-	w.Header().Set("Cache-Control", "no-cache")
+	w.Header().Set("Cache-Control", "no-cache, no-transform")
 	w.Header().Set("Connection", "keep-alive")
-	flusher, hasFlusher := w.(http.Flusher)
-	if !hasFlusher {
-		config.Logger.Warn("[claude_stream] response writer does not support flush; falling back to buffered SSE")
+	w.Header().Set("X-Accel-Buffering", "no")
+	rc := http.NewResponseController(w)
+	canFlush := rc.Flush() == nil
+	if !canFlush {
+		config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered")
 	}
 	send := func(v any) {
 		b, _ := json.Marshal(v)
 		_, _ = w.Write([]byte("data: "))
 		_, _ = w.Write(b)
 		_, _ = w.Write([]byte("\n\n"))
-		if hasFlusher {
-			flusher.Flush()
+		if canFlush {
+			_ = rc.Flush()
 		}
 	}
 	messageID := fmt.Sprintf("msg_%d", time.Now().UnixNano())
diff --git a/internal/adapter/openai/handler.go b/internal/adapter/openai/handler.go
index 23d17d5..afac493 100644
--- a/internal/adapter/openai/handler.go
+++ b/internal/adapter/openai/handler.go
@@ -3,10 +3,12 @@ package openai
 import (
 	"bufio"
 	"context"
+	"crypto/subtle"
 	"encoding/json"
 	"fmt"
 	"io"
 	"net/http"
+	"os"
 	"strings"
 	"time"
 
@@ -35,6 +37,11 @@ func (h *Handler) ListModels(w http.ResponseWriter, _ *http.Request) {
 }
 
 func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
+	if isVercelStreamPrepareRequest(r) {
+		h.handleVercelStreamPrepare(w, r)
+		return
+	}
+
 	a, err := h.Auth.Determine(r)
 	if err != nil {
 		status := http.StatusUnauthorized
@@ -106,6 +113,98 @@
 	h.handleNonStream(w, r.Context(), resp, sessionID, model, finalPrompt, thinkingEnabled, searchEnabled, toolNames)
 }
 
+func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Request) {
+	if !config.IsVercel() {
+		http.NotFound(w, r)
+		return
+	}
+	internalSecret := vercelInternalSecret()
+	internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
+	if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 {
+		writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request")
+		return
+	}
+
+	a, err := h.Auth.Determine(r)
+	if err != nil {
+		status := http.StatusUnauthorized
+		if err == auth.ErrNoAccount {
+			status = http.StatusTooManyRequests
+		}
+		writeOpenAIError(w, status, err.Error())
+		return
+	}
+	defer h.Auth.Release(a)
+	r = r.WithContext(auth.WithAuth(r.Context(), a))
+
+	var req map[string]any
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		writeOpenAIError(w, http.StatusBadRequest, "invalid json")
+		return
+	}
+	if !toBool(req["stream"]) {
+		writeOpenAIError(w, http.StatusBadRequest, "stream must be true")
+		return
+	}
+	if tools, ok := req["tools"].([]any); ok && len(tools) > 0 {
+		writeOpenAIError(w, http.StatusBadRequest, "tools are not supported by vercel stream prepare")
+		return
+	}
+
+	model, _ := req["model"].(string)
+	messagesRaw, _ := req["messages"].([]any)
+	if model == "" || len(messagesRaw) == 0 {
+		writeOpenAIError(w, http.StatusBadRequest, "Request must include 'model' and 'messages'.")
+		return
+	}
+	thinkingEnabled, searchEnabled, ok := config.GetModelConfig(model)
+	if !ok {
+		writeOpenAIError(w, http.StatusServiceUnavailable, fmt.Sprintf("Model '%s' is not available.", model))
+		return
+	}
+
+	messages := normalizeMessages(messagesRaw)
+	finalPrompt := util.MessagesPrepare(messages)
+
+	sessionID, err := h.DS.CreateSession(r.Context(), a, 3)
+	if err != nil {
+		if a.UseConfigToken {
+			writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.")
+		} else {
+			writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.")
+		}
+		return
+	}
+	powHeader, err := h.DS.GetPow(r.Context(), a, 3)
+	if err != nil {
+		writeOpenAIError(w, http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).")
+		return
+	}
+	if strings.TrimSpace(a.DeepSeekToken) == "" {
+		writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.")
+		return
+	}
+
+	payload := map[string]any{
+		"chat_session_id":   sessionID,
+		"parent_message_id": nil,
+		"prompt":            finalPrompt,
+		"ref_file_ids":      []any{},
+		"thinking_enabled":  thinkingEnabled,
+		"search_enabled":    searchEnabled,
+	}
+	writeJSON(w, http.StatusOK, map[string]any{
+		"session_id":       sessionID,
+		"model":            model,
+		"final_prompt":     finalPrompt,
+		"thinking_enabled": thinkingEnabled,
+		"search_enabled":   searchEnabled,
+		"deepseek_token":   a.DeepSeekToken,
+		"pow_header":       powHeader,
+		"payload":          payload,
+	})
+}
+
 func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) {
 	defer resp.Body.Close()
 	if resp.StatusCode != http.StatusOK {
@@ -191,11 +290,13 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
 		return
 	}
 	w.Header().Set("Content-Type", "text/event-stream")
-	w.Header().Set("Cache-Control", "no-cache")
+	w.Header().Set("Cache-Control", "no-cache, no-transform")
 	w.Header().Set("Connection", "keep-alive")
-	flusher, hasFlusher := w.(http.Flusher)
-	if !hasFlusher {
-		config.Logger.Warn("[stream] response writer does not support flush; falling back to buffered SSE")
+	w.Header().Set("X-Accel-Buffering", "no")
+	rc := http.NewResponseController(w)
+	canFlush := rc.Flush() == nil
+	if !canFlush {
+		config.Logger.Warn("[stream] response writer does not support flush; streaming may be buffered")
 	}
 
 	lines := make(chan []byte, 128)
@@ -232,14 +333,14 @@
 		_, _ = w.Write([]byte("data: "))
 		_, _ = w.Write(b)
 		_, _ = w.Write([]byte("\n\n"))
-		if hasFlusher {
-			flusher.Flush()
+		if canFlush {
+			_ = rc.Flush()
 		}
 	}
 	sendDone := func() {
 		_, _ = w.Write([]byte("data: [DONE]\n\n"))
-		if hasFlusher {
-			flusher.Flush()
+		if canFlush {
+			_ = rc.Flush()
 		}
 	}
 
@@ -316,9 +417,9 @@
 			finalize("stop")
 			return
 		}
-		if hasFlusher {
+		if canFlush {
 			_, _ = w.Write([]byte(": keep-alive\n\n"))
-			flusher.Flush()
+			_ = rc.Flush()
 		}
 	case line, ok := <-lines:
 		if !ok {
@@ -483,3 +584,20 @@ func openAIErrorType(status int) string {
 		return "invalid_request_error"
 	}
 }
+
+func isVercelStreamPrepareRequest(r *http.Request) bool {
+	if r == nil {
+		return false
+	}
+	return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1"
+}
+
+func vercelInternalSecret() string {
+	if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" {
+		return v
+	}
+	if v := strings.TrimSpace(os.Getenv("DS2API_ADMIN_KEY")); v != "" {
+		return v
+	}
+	return "admin"
+}
diff --git a/internal/adapter/openai/vercel_prepare_test.go b/internal/adapter/openai/vercel_prepare_test.go
new file mode 100644
index 0000000..01b209c
--- /dev/null
+++ b/internal/adapter/openai/vercel_prepare_test.go
@@ -0,0 +1,44 @@
+package openai
+
+import (
+	"net/http/httptest"
+	"testing"
+)
+
+func TestIsVercelStreamPrepareRequest(t *testing.T) {
+	req := httptest.NewRequest("POST", "/v1/chat/completions?__stream_prepare=1", nil)
+	if !isVercelStreamPrepareRequest(req) {
+		t.Fatalf("expected prepare request to be detected")
+	}
+
+	req2 := httptest.NewRequest("POST", "/v1/chat/completions", nil)
+	if isVercelStreamPrepareRequest(req2) {
+		t.Fatalf("expected non-prepare request")
+	}
+}
+
+func TestVercelInternalSecret(t *testing.T) {
+	t.Run("prefer explicit secret", func(t *testing.T) {
+		t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret")
+		t.Setenv("DS2API_ADMIN_KEY", "admin-fallback")
+		if got := vercelInternalSecret(); got != "stream-secret" {
+			t.Fatalf("expected explicit secret, got %q", got)
+		}
+	})
+
+	t.Run("fallback to admin key", func(t *testing.T) {
+		t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "")
+		t.Setenv("DS2API_ADMIN_KEY", "admin-fallback")
+		if got := vercelInternalSecret(); got != "admin-fallback" {
+			t.Fatalf("expected admin key fallback, got %q", got)
+		}
+	})
+
+	t.Run("default admin when env missing", func(t *testing.T) {
+		t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "")
+		t.Setenv("DS2API_ADMIN_KEY", "")
+		if got := vercelInternalSecret(); got != "admin" {
+			t.Fatalf("expected default admin fallback, got %q", got)
+		}
+	})
+}
diff --git a/vercel.json b/vercel.json
index db2bc6a..0645290 100644
--- a/vercel.json
+++ b/vercel.json
@@ -1,8 +1,27 @@
 {
   "version": 2,
   "buildCommand": "npm ci --prefix webui && npm run build --prefix webui",
-  "outputDirectory": "static/admin",
+  "outputDirectory": "static",
+  "functions": {
+    "api/chat-stream.js": {
+      "includeFiles": "**/sha3_wasm_bg.7b9ca65ddd.wasm"
+    }
+  },
   "rewrites": [
+    {
+      "source": "/v1/chat/completions",
+      "has": [
+        {
+          "type": "query",
+          "key": "__go"
+        }
+      ],
+      "destination": "/api/index"
+    },
+    {
+      "source": "/v1/chat/completions",
+      "destination": "/api/chat-stream"
+    },
     {
       "source": "/admin/login",
       "destination": "/api/index"
@@ -44,16 +63,16 @@
       "destination": "/api/index"
     },
     {
-      "source": "/admin/assets/(.*)",
-      "destination": "/assets/$1"
+      "source": "/admin",
+      "destination": "/admin/index.html"
     },
     {
-      "source": "/admin",
-      "destination": "/index.html"
+      "source": "/admin/assets/(.*)",
+      "destination": "/admin/assets/$1"
     },
     {
       "source": "/admin/(.*)",
-      "destination": "/index.html"
+      "destination": "/admin/index.html"
     },
     {
       "source": "/(.*)",