From 6697d0d227105236b68cdd4395227be20b8cd98f Mon Sep 17 00:00:00 2001 From: CJACK Date: Tue, 17 Feb 2026 13:18:52 +0800 Subject: [PATCH] feat: enhance tool call streaming and anti-leakage by suppressing invalid or incomplete tool JSON and refining detection in Node.js. --- DEPLOY.en.md | 7 +- DEPLOY.md | 7 +- README.MD | 2 +- README.en.md | 2 +- api/chat-stream.js | 20 ++--- api/helpers/stream-tool-sieve.js | 55 +++++++++++--- .../adapter/openai/handler_toolcall_test.go | 74 +++++++++++++++++++ internal/adapter/openai/tool_sieve.go | 9 +-- 8 files changed, 143 insertions(+), 33 deletions(-) diff --git a/DEPLOY.en.md b/DEPLOY.en.md index e69754a..ffdbf50 100644 --- a/DEPLOY.en.md +++ b/DEPLOY.en.md @@ -211,14 +211,15 @@ Vercel Go Runtime applies platform-level response buffering, so this project use 1. `api/chat-stream.js` receives `/v1/chat/completions` request 2. Node calls Go internal prepare endpoint (`?__stream_prepare=1`) for session ID, PoW, token 3. Go prepare creates a stream lease, locking the account -4. Node connects directly to DeepSeek upstream, relays SSE in real-time to client +4. Node connects directly to DeepSeek upstream, relays SSE in real-time to client (including OpenAI chunk framing and tools anti-leak sieve) 5. After stream ends, Node calls Go release endpoint (`?__stream_release=1`) to free the account > This adaptation is **Vercel-only**; local and Docker remain pure Go. -#### Non-Stream and Tool Call Fallback +#### Non-Stream Fallback and Tool Call Handling -- `api/chat-stream.js` automatically falls back to Go entry (`?__go=1`) for non-stream requests or requests with `tools` +- `api/chat-stream.js` falls back to Go entry (`?__go=1`) for non-stream requests only +- Streaming requests (including requests with `tools`) stay on the Node path and use Go-aligned tool-call anti-leak handling - WebUI non-stream test calls `?__go=1` directly to avoid Node hop timeout on long requests #### Function Duration diff --git a/DEPLOY.md b/DEPLOY.md index cf008cd..e3c6f80 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -211,14 +211,15 @@ api/index.go api/chat-stream.js 1. `api/chat-stream.js` 收到 `/v1/chat/completions` 请求 2. Node 调用 Go 内部 prepare 接口(`?__stream_prepare=1`),获取会话 ID、PoW、token 等 3. Go prepare 创建 stream lease,锁定账号 -4. Node 直连 DeepSeek 上游,实时流式转发 SSE 给客户端 +4. Node 直连 DeepSeek 上游,实时流式转发 SSE 给客户端(含 OpenAI chunk 封装与 tools 防泄漏筛分) 5. 流结束后 Node 调用 Go release 接口(`?__stream_release=1`),释放账号 > 该适配**仅在 Vercel 环境生效**;本地与 Docker 仍走纯 Go 链路。 -#### 非流式与 Tool Call 回退 +#### 非流式回退与 Tool Call 处理 -- `api/chat-stream.js` 对非流式请求或带 `tools` 的请求会自动回退到 Go 入口(`?__go=1`) +- `api/chat-stream.js` 仅对非流式请求回退到 Go 入口(`?__go=1`) +- 流式请求(包括带 `tools`)走 Node 路径,并执行与 Go 对齐的 tool-call 防泄漏处理 - WebUI 的"非流式测试"直接请求 `?__go=1`,避免 Node 中转造成长请求超时 #### 函数时长 diff --git a/README.MD b/README.MD index f94ee54..ebcf943 100644 --- a/README.MD +++ b/README.MD @@ -131,7 +131,7 @@ docker-compose logs -f 3. 配置环境变量(至少设置 `DS2API_ADMIN_KEY` 和 `DS2API_CONFIG_JSON`) 4. 部署 -> **流式说明**:`/v1/chat/completions` 在 Vercel 上默认走 `api/chat-stream.js`(Node Runtime)以保证实时 SSE。鉴权、账号选择、会话/PoW 准备仍由 Go 内部 prepare 接口完成,Node 端仅转发流数据。 +> **流式说明**:`/v1/chat/completions` 在 Vercel 上默认走 `api/chat-stream.js`(Node Runtime)以保证实时 SSE。鉴权、账号选择、会话/PoW 准备仍由 Go 内部 prepare 接口完成;流式响应(含 `tools`)在 Node 侧执行与 Go 对齐的输出组装与防泄漏处理。 详细部署说明请参阅 [部署指南](DEPLOY.md)。 diff --git a/README.en.md b/README.en.md index 304b117..b2c9373 100644 --- a/README.en.md +++ b/README.en.md @@ -131,7 +131,7 @@ Rebuild after updates: `docker-compose up -d --build` 3. Set environment variables (minimum: `DS2API_ADMIN_KEY` and `DS2API_CONFIG_JSON`) 4. Deploy -> **Streaming note**: `/v1/chat/completions` on Vercel is routed to `api/chat-stream.js` (Node Runtime) for real-time SSE. Auth, account selection, session/PoW preparation are still handled by the Go internal prepare endpoint; Node only relays stream data. +> **Streaming note**: `/v1/chat/completions` on Vercel is routed to `api/chat-stream.js` (Node Runtime) for real-time SSE. Auth, account selection, and session/PoW preparation are still handled by the Go internal prepare endpoint; streaming output (including `tools`) is assembled on Node with Go-aligned anti-leak handling. For detailed deployment instructions, see the [Deployment Guide](DEPLOY.en.md). diff --git a/api/chat-stream.js b/api/chat-stream.js index 852f58b..566fa57 100644 --- a/api/chat-stream.js +++ b/api/chat-stream.js @@ -5,6 +5,7 @@ const { createToolSieveState, processToolSieveChunk, flushToolSieve, + parseToolCalls, formatOpenAIStreamToolCalls, } = require('./helpers/stream-tool-sieve'); @@ -155,20 +156,19 @@ module.exports = async function handler(req, res) { return; } ended = true; - if (toolSieveEnabled) { + const detected = parseToolCalls(outputText, toolNames); + if (detected.length > 0 && !toolCallsEmitted) { + toolCallsEmitted = true; + sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected) }); + } else if (toolSieveEnabled) { const tailEvents = flushToolSieve(toolSieveState, toolNames); for (const evt of tailEvents) { - if (evt.type === 'tool_calls') { - toolCallsEmitted = true; - sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls) }); - continue; - } if (evt.text) { sendDeltaFrame({ content: evt.text }); } } } - if (toolCallsEmitted) { + if (detected.length > 0 || toolCallsEmitted) { reason = 'tool_calls'; } sendFrame({ @@ -233,8 +233,10 @@ module.exports = async function handler(req, res) { continue; } if (p.type === 'thinking') { - thinkingText += p.text; - sendDeltaFrame({ reasoning_content: p.text }); + if (thinkingEnabled) { + thinkingText += p.text; + sendDeltaFrame({ reasoning_content: p.text }); + } } else { outputText += p.text; if (!toolSieveEnabled) { diff --git a/api/helpers/stream-tool-sieve.js b/api/helpers/stream-tool-sieve.js index 0643ce5..83bb265 100644 --- a/api/helpers/stream-tool-sieve.js +++ b/api/helpers/stream-tool-sieve.js @@ -1,6 +1,7 @@ 'use strict'; const crypto = require('crypto'); +const TOOL_CALL_PATTERN = /\{\s*["']tool_calls["']\s*:\s*\[(.*?)\]\s*\}/s; function extractToolNames(tools) { if (!Array.isArray(tools) || tools.length === 0) { @@ -105,7 +106,7 @@ function flushToolSieve(state, toolNames) { events.push({ type: 'text', text: consumed.suffix }); } } else if (state.capture) { - events.push({ type: 'text', text: state.capture }); + // Incomplete captured tool JSON at stream end: suppress raw capture. } state.capture = ''; state.capturing = false; @@ -177,9 +178,11 @@ function consumeToolCapture(captured, toolNames) { } const parsed = parseToolCalls(captured.slice(start, obj.end), toolNames); if (parsed.length === 0) { + // `tool_calls` key exists but strict JSON parse failed. + // Drop the captured object body to avoid leaking raw tool JSON. return { ready: true, - prefix: captured.slice(0, obj.end), + prefix: captured.slice(0, start), calls: [], suffix: captured.slice(obj.end), }; @@ -280,24 +283,53 @@ function buildToolCallCandidates(text) { candidates.push(toStringSafe(m[1])); } } - const keyIdx = trimmed.toLowerCase().indexOf('tool_calls'); - if (keyIdx >= 0) { - const start = trimmed.slice(0, keyIdx).lastIndexOf('{'); - if (start >= 0) { - const obj = extractJSONObjectFrom(trimmed, start); - if (obj.ok) { - candidates.push(toStringSafe(trimmed.slice(start, obj.end))); - } - } + for (const candidate of extractToolCallObjects(trimmed)) { + candidates.push(toStringSafe(candidate)); } const first = trimmed.indexOf('{'); const last = trimmed.lastIndexOf('}'); if (first >= 0 && last > first) { candidates.push(toStringSafe(trimmed.slice(first, last + 1))); } + const m = trimmed.match(TOOL_CALL_PATTERN); + if (m && m[1]) { + candidates.push(`{"tool_calls":[${m[1]}]}`); + } return [...new Set(candidates.filter(Boolean))]; } +function extractToolCallObjects(text) { + const raw = toStringSafe(text); + if (!raw) { + return []; + } + const lower = raw.toLowerCase(); + const out = []; + let offset = 0; + // eslint-disable-next-line no-constant-condition + while (true) { + let idx = lower.indexOf('tool_calls', offset); + if (idx < 0) { + break; + } + let start = raw.slice(0, idx).lastIndexOf('{'); + while (start >= 0) { + const obj = extractJSONObjectFrom(raw, start); + if (obj.ok) { + out.push(raw.slice(start, obj.end).trim()); + offset = obj.end; + idx = -1; + break; + } + start = raw.slice(0, start).lastIndexOf('{'); + } + if (idx >= 0) { + offset = idx + 'tool_calls'.length; + } + } + return out; +} + function parseToolCallsPayload(payload) { let decoded; try { @@ -440,5 +472,6 @@ module.exports = { createToolSieveState, processToolSieveChunk, flushToolSieve, + parseToolCalls, formatOpenAIStreamToolCalls, }; diff --git a/internal/adapter/openai/handler_toolcall_test.go b/internal/adapter/openai/handler_toolcall_test.go index 9089f69..f9c44dd 100644 --- a/internal/adapter/openai/handler_toolcall_test.go +++ b/internal/adapter/openai/handler_toolcall_test.go @@ -463,3 +463,77 @@ func TestHandleStreamToolCallKeyAppearsLateStillNoPrefixLeak(t *testing.T) { t.Fatalf("expected finish_reason=tool_calls, body=%s", rec.Body.String()) } } + +func TestHandleStreamInvalidToolJSONDoesNotLeakRawObject(t *testing.T) { + h := &Handler{} + resp := makeSSEHTTPResponse( + `data: {"p":"response/content","v":"前置正文D。"}`, + `data: {"p":"response/content","v":"{'tool_calls':[{'name':'search','input':{'q':'go'}}]}"}`, + `data: {"p":"response/content","v":"后置正文E。"}`, + `data: [DONE]`, + ) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + + h.handleStream(rec, req, resp, "cid9", "deepseek-chat", "prompt", false, false, []string{"search"}) + + frames, done := parseSSEDataFrames(t, rec.Body.String()) + if !done { + t.Fatalf("expected [DONE], body=%s", rec.Body.String()) + } + if streamHasToolCallsDelta(frames) { + t.Fatalf("did not expect tool_calls delta for invalid json, body=%s", rec.Body.String()) + } + content := strings.Builder{} + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + for _, item := range choices { + choice, _ := item.(map[string]any) + delta, _ := choice["delta"].(map[string]any) + if c, ok := delta["content"].(string); ok { + content.WriteString(c) + } + } + } + got := strings.ToLower(content.String()) + if strings.Contains(got, "tool_calls") { + t.Fatalf("unexpected raw tool_calls leak in content: %q", content.String()) + } + if !strings.Contains(content.String(), "前置正文D。") || !strings.Contains(content.String(), "后置正文E。") { + t.Fatalf("expected pre/post plain text to remain, got=%q", content.String()) + } +} + +func TestHandleStreamIncompleteCapturedToolJSONDoesNotLeakOnFinalize(t *testing.T) { + h := &Handler{} + resp := makeSSEHTTPResponse( + `data: {"p":"response/content","v":"{\"tool_calls\":[{\"name\":\"search\""}`, + `data: [DONE]`, + ) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + + h.handleStream(rec, req, resp, "cid10", "deepseek-chat", "prompt", false, false, []string{"search"}) + + frames, done := parseSSEDataFrames(t, rec.Body.String()) + if !done { + t.Fatalf("expected [DONE], body=%s", rec.Body.String()) + } + if streamHasToolCallsDelta(frames) { + t.Fatalf("did not expect tool_calls delta for incomplete json, body=%s", rec.Body.String()) + } + content := strings.Builder{} + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + for _, item := range choices { + choice, _ := item.(map[string]any) + delta, _ := choice["delta"].(map[string]any) + if c, ok := delta["content"].(string); ok { + content.WriteString(c) + } + } + } + if strings.Contains(strings.ToLower(content.String()), "tool_calls") || strings.Contains(content.String(), "{") { + t.Fatalf("unexpected incomplete tool json leak in content: %q", content.String()) + } +} diff --git a/internal/adapter/openai/tool_sieve.go b/internal/adapter/openai/tool_sieve.go index 6790840..d1a9014 100644 --- a/internal/adapter/openai/tool_sieve.go +++ b/internal/adapter/openai/tool_sieve.go @@ -96,10 +96,7 @@ func flushToolSieve(state *toolStreamSieveState, toolNames []string) []toolStrea events = append(events, toolStreamEvent{Content: consumedSuffix}) } } else { - raw := state.capture.String() - if raw != "" { - events = append(events, toolStreamEvent{Content: raw}) - } + // Incomplete captured tool JSON at stream end: suppress raw capture. } state.capture.Reset() state.capturing = false @@ -176,7 +173,9 @@ func consumeToolCapture(captured string, toolNames []string) (prefix string, cal } parsed := util.ParseToolCalls(obj, toolNames) if len(parsed) == 0 { - return captured[:end], nil, captured[end:], true + // `tool_calls` key exists but strict JSON parse failed. + // Drop the captured object body to avoid leaking raw tool JSON. + return captured[:start], nil, captured[end:], true } return captured[:start], parsed, captured[end:], true }