feat: enhance tool call streaming and anti-leakage by suppressing invalid or incomplete tool JSON and refining detection in Node.js.

This commit is contained in:
CJACK
2026-02-17 13:18:52 +08:00
parent d21fb74f29
commit 6697d0d227
8 changed files with 143 additions and 33 deletions

View File

@@ -211,14 +211,15 @@ Vercel Go Runtime applies platform-level response buffering, so this project use
1. `api/chat-stream.js` receives `/v1/chat/completions` request
2. Node calls Go internal prepare endpoint (`?__stream_prepare=1`) for session ID, PoW, token
3. Go prepare creates a stream lease, locking the account
4. Node connects directly to DeepSeek upstream, relays SSE in real-time to client
4. Node connects directly to DeepSeek upstream, relays SSE in real-time to client (including OpenAI chunk framing and tools anti-leak sieve)
5. After stream ends, Node calls Go release endpoint (`?__stream_release=1`) to free the account
> This adaptation is **Vercel-only**; local and Docker remain pure Go.
#### Non-Stream and Tool Call Fallback
#### Non-Stream Fallback and Tool Call Handling
- `api/chat-stream.js` automatically falls back to Go entry (`?__go=1`) for non-stream requests or requests with `tools`
- `api/chat-stream.js` falls back to Go entry (`?__go=1`) for non-stream requests only
- Streaming requests (including requests with `tools`) stay on the Node path and use Go-aligned tool-call anti-leak handling
- WebUI non-stream test calls `?__go=1` directly to avoid Node hop timeout on long requests
#### Function Duration

View File

@@ -211,14 +211,15 @@ api/index.go api/chat-stream.js
1. `api/chat-stream.js` 收到 `/v1/chat/completions` 请求
2. Node 调用 Go 内部 prepare 接口(`?__stream_prepare=1`),获取会话 ID、PoW、token 等
3. Go prepare 创建 stream lease锁定账号
4. Node 直连 DeepSeek 上游,实时流式转发 SSE 给客户端
4. Node 直连 DeepSeek 上游,实时流式转发 SSE 给客户端(含 OpenAI chunk 封装与 tools 防泄漏筛分)
5. 流结束后 Node 调用 Go release 接口(`?__stream_release=1`),释放账号
> 该适配**仅在 Vercel 环境生效**;本地与 Docker 仍走纯 Go 链路。
#### 非流式与 Tool Call 回退
#### 非流式回退与 Tool Call 处理
- `api/chat-stream.js` 对非流式请求或带 `tools` 的请求会自动回退到 Go 入口(`?__go=1`
- `api/chat-stream.js` 对非流式请求回退到 Go 入口(`?__go=1`
- 流式请求(包括带 `tools`)走 Node 路径,并执行与 Go 对齐的 tool-call 防泄漏处理
- WebUI 的"非流式测试"直接请求 `?__go=1`,避免 Node 中转造成长请求超时
#### 函数时长

View File

@@ -131,7 +131,7 @@ docker-compose logs -f
3. 配置环境变量(至少设置 `DS2API_ADMIN_KEY` 和 `DS2API_CONFIG_JSON`
4. 部署
> **流式说明**`/v1/chat/completions` 在 Vercel 上默认走 `api/chat-stream.js`Node Runtime以保证实时 SSE。鉴权、账号选择、会话/PoW 准备仍由 Go 内部 prepare 接口完成Node 端仅转发流数据
> **流式说明**`/v1/chat/completions` 在 Vercel 上默认走 `api/chat-stream.js`Node Runtime以保证实时 SSE。鉴权、账号选择、会话/PoW 准备仍由 Go 内部 prepare 接口完成;流式响应(含 `tools`)在 Node 侧执行与 Go 对齐的输出组装与防泄漏处理
详细部署说明请参阅 [部署指南](DEPLOY.md)。

View File

@@ -131,7 +131,7 @@ Rebuild after updates: `docker-compose up -d --build`
3. Set environment variables (minimum: `DS2API_ADMIN_KEY` and `DS2API_CONFIG_JSON`)
4. Deploy
> **Streaming note**: `/v1/chat/completions` on Vercel is routed to `api/chat-stream.js` (Node Runtime) for real-time SSE. Auth, account selection, session/PoW preparation are still handled by the Go internal prepare endpoint; Node only relays stream data.
> **Streaming note**: `/v1/chat/completions` on Vercel is routed to `api/chat-stream.js` (Node Runtime) for real-time SSE. Auth, account selection, and session/PoW preparation are still handled by the Go internal prepare endpoint; streaming output (including `tools`) is assembled on Node with Go-aligned anti-leak handling.
For detailed deployment instructions, see the [Deployment Guide](DEPLOY.en.md).

View File

@@ -5,6 +5,7 @@ const {
createToolSieveState,
processToolSieveChunk,
flushToolSieve,
parseToolCalls,
formatOpenAIStreamToolCalls,
} = require('./helpers/stream-tool-sieve');
@@ -155,20 +156,19 @@ module.exports = async function handler(req, res) {
return;
}
ended = true;
if (toolSieveEnabled) {
const detected = parseToolCalls(outputText, toolNames);
if (detected.length > 0 && !toolCallsEmitted) {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected) });
} else if (toolSieveEnabled) {
const tailEvents = flushToolSieve(toolSieveState, toolNames);
for (const evt of tailEvents) {
if (evt.type === 'tool_calls') {
toolCallsEmitted = true;
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls) });
continue;
}
if (evt.text) {
sendDeltaFrame({ content: evt.text });
}
}
}
if (toolCallsEmitted) {
if (detected.length > 0 || toolCallsEmitted) {
reason = 'tool_calls';
}
sendFrame({
@@ -233,8 +233,10 @@ module.exports = async function handler(req, res) {
continue;
}
if (p.type === 'thinking') {
thinkingText += p.text;
sendDeltaFrame({ reasoning_content: p.text });
if (thinkingEnabled) {
thinkingText += p.text;
sendDeltaFrame({ reasoning_content: p.text });
}
} else {
outputText += p.text;
if (!toolSieveEnabled) {

View File

@@ -1,6 +1,7 @@
'use strict';
const crypto = require('crypto');
const TOOL_CALL_PATTERN = /\{\s*["']tool_calls["']\s*:\s*\[(.*?)\]\s*\}/s;
function extractToolNames(tools) {
if (!Array.isArray(tools) || tools.length === 0) {
@@ -105,7 +106,7 @@ function flushToolSieve(state, toolNames) {
events.push({ type: 'text', text: consumed.suffix });
}
} else if (state.capture) {
events.push({ type: 'text', text: state.capture });
// Incomplete captured tool JSON at stream end: suppress raw capture.
}
state.capture = '';
state.capturing = false;
@@ -177,9 +178,11 @@ function consumeToolCapture(captured, toolNames) {
}
const parsed = parseToolCalls(captured.slice(start, obj.end), toolNames);
if (parsed.length === 0) {
// `tool_calls` key exists but strict JSON parse failed.
// Drop the captured object body to avoid leaking raw tool JSON.
return {
ready: true,
prefix: captured.slice(0, obj.end),
prefix: captured.slice(0, start),
calls: [],
suffix: captured.slice(obj.end),
};
@@ -280,24 +283,53 @@ function buildToolCallCandidates(text) {
candidates.push(toStringSafe(m[1]));
}
}
const keyIdx = trimmed.toLowerCase().indexOf('tool_calls');
if (keyIdx >= 0) {
const start = trimmed.slice(0, keyIdx).lastIndexOf('{');
if (start >= 0) {
const obj = extractJSONObjectFrom(trimmed, start);
if (obj.ok) {
candidates.push(toStringSafe(trimmed.slice(start, obj.end)));
}
}
for (const candidate of extractToolCallObjects(trimmed)) {
candidates.push(toStringSafe(candidate));
}
const first = trimmed.indexOf('{');
const last = trimmed.lastIndexOf('}');
if (first >= 0 && last > first) {
candidates.push(toStringSafe(trimmed.slice(first, last + 1)));
}
const m = trimmed.match(TOOL_CALL_PATTERN);
if (m && m[1]) {
candidates.push(`{"tool_calls":[${m[1]}]}`);
}
return [...new Set(candidates.filter(Boolean))];
}
function extractToolCallObjects(text) {
const raw = toStringSafe(text);
if (!raw) {
return [];
}
const lower = raw.toLowerCase();
const out = [];
let offset = 0;
// eslint-disable-next-line no-constant-condition
while (true) {
let idx = lower.indexOf('tool_calls', offset);
if (idx < 0) {
break;
}
let start = raw.slice(0, idx).lastIndexOf('{');
while (start >= 0) {
const obj = extractJSONObjectFrom(raw, start);
if (obj.ok) {
out.push(raw.slice(start, obj.end).trim());
offset = obj.end;
idx = -1;
break;
}
start = raw.slice(0, start).lastIndexOf('{');
}
if (idx >= 0) {
offset = idx + 'tool_calls'.length;
}
}
return out;
}
function parseToolCallsPayload(payload) {
let decoded;
try {
@@ -440,5 +472,6 @@ module.exports = {
createToolSieveState,
processToolSieveChunk,
flushToolSieve,
parseToolCalls,
formatOpenAIStreamToolCalls,
};

View File

@@ -463,3 +463,77 @@ func TestHandleStreamToolCallKeyAppearsLateStillNoPrefixLeak(t *testing.T) {
t.Fatalf("expected finish_reason=tool_calls, body=%s", rec.Body.String())
}
}
func TestHandleStreamInvalidToolJSONDoesNotLeakRawObject(t *testing.T) {
h := &Handler{}
resp := makeSSEHTTPResponse(
`data: {"p":"response/content","v":"前置正文D。"}`,
`data: {"p":"response/content","v":"{'tool_calls':[{'name':'search','input':{'q':'go'}}]}"}`,
`data: {"p":"response/content","v":"后置正文E。"}`,
`data: [DONE]`,
)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
h.handleStream(rec, req, resp, "cid9", "deepseek-chat", "prompt", false, false, []string{"search"})
frames, done := parseSSEDataFrames(t, rec.Body.String())
if !done {
t.Fatalf("expected [DONE], body=%s", rec.Body.String())
}
if streamHasToolCallsDelta(frames) {
t.Fatalf("did not expect tool_calls delta for invalid json, body=%s", rec.Body.String())
}
content := strings.Builder{}
for _, frame := range frames {
choices, _ := frame["choices"].([]any)
for _, item := range choices {
choice, _ := item.(map[string]any)
delta, _ := choice["delta"].(map[string]any)
if c, ok := delta["content"].(string); ok {
content.WriteString(c)
}
}
}
got := strings.ToLower(content.String())
if strings.Contains(got, "tool_calls") {
t.Fatalf("unexpected raw tool_calls leak in content: %q", content.String())
}
if !strings.Contains(content.String(), "前置正文D。") || !strings.Contains(content.String(), "后置正文E。") {
t.Fatalf("expected pre/post plain text to remain, got=%q", content.String())
}
}
func TestHandleStreamIncompleteCapturedToolJSONDoesNotLeakOnFinalize(t *testing.T) {
h := &Handler{}
resp := makeSSEHTTPResponse(
`data: {"p":"response/content","v":"{\"tool_calls\":[{\"name\":\"search\""}`,
`data: [DONE]`,
)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
h.handleStream(rec, req, resp, "cid10", "deepseek-chat", "prompt", false, false, []string{"search"})
frames, done := parseSSEDataFrames(t, rec.Body.String())
if !done {
t.Fatalf("expected [DONE], body=%s", rec.Body.String())
}
if streamHasToolCallsDelta(frames) {
t.Fatalf("did not expect tool_calls delta for incomplete json, body=%s", rec.Body.String())
}
content := strings.Builder{}
for _, frame := range frames {
choices, _ := frame["choices"].([]any)
for _, item := range choices {
choice, _ := item.(map[string]any)
delta, _ := choice["delta"].(map[string]any)
if c, ok := delta["content"].(string); ok {
content.WriteString(c)
}
}
}
if strings.Contains(strings.ToLower(content.String()), "tool_calls") || strings.Contains(content.String(), "{") {
t.Fatalf("unexpected incomplete tool json leak in content: %q", content.String())
}
}

View File

@@ -96,10 +96,7 @@ func flushToolSieve(state *toolStreamSieveState, toolNames []string) []toolStrea
events = append(events, toolStreamEvent{Content: consumedSuffix})
}
} else {
raw := state.capture.String()
if raw != "" {
events = append(events, toolStreamEvent{Content: raw})
}
// Incomplete captured tool JSON at stream end: suppress raw capture.
}
state.capture.Reset()
state.capturing = false
@@ -176,7 +173,9 @@ func consumeToolCapture(captured string, toolNames []string) (prefix string, cal
}
parsed := util.ParseToolCalls(obj, toolNames)
if len(parsed) == 0 {
return captured[:end], nil, captured[end:], true
// `tool_calls` key exists but strict JSON parse failed.
// Drop the captured object body to avoid leaking raw tool JSON.
return captured[:start], nil, captured[end:], true
}
return captured[:start], parsed, captured[end:], true
}