feat: Implement streaming incremental tool call deltas with a new tool sieve and standalone parser.

This commit is contained in:
CJACK
2026-02-18 16:10:35 +08:00
parent 19289c9008
commit 7beeea5779
9 changed files with 1324 additions and 114 deletions

View File

@@ -2,6 +2,7 @@
const crypto = require('crypto');
const TOOL_CALL_PATTERN = /\{\s*["']tool_calls["']\s*:\s*\[(.*?)\]\s*\}/s;
const TOOL_SIEVE_CAPTURE_LIMIT = 8 * 1024;
function extractToolNames(tools) {
if (!Array.isArray(tools) || tools.length === 0) {
@@ -26,9 +27,25 @@ function createToolSieveState() {
pending: '',
capture: '',
capturing: false,
hasMeaningfulText: false,
toolNameSent: false,
toolName: '',
toolArgsStart: -1,
toolArgsSent: -1,
toolArgsString: false,
toolArgsDone: false,
};
}
function resetIncrementalToolState(state) {
state.toolNameSent = false;
state.toolName = '';
state.toolArgsStart = -1;
state.toolArgsSent = -1;
state.toolArgsString = false;
state.toolArgsDone = false;
}
function processToolSieveChunk(state, chunk, toolNames) {
if (!state) {
return [];
@@ -44,13 +61,31 @@ function processToolSieveChunk(state, chunk, toolNames) {
state.capture += state.pending;
state.pending = '';
}
const consumed = consumeToolCapture(state.capture, toolNames);
const deltas = buildIncrementalToolDeltas(state);
if (deltas.length > 0) {
events.push({ type: 'tool_call_deltas', deltas });
}
const consumed = consumeToolCapture(state, toolNames);
if (!consumed.ready) {
if (state.capture.length > TOOL_SIEVE_CAPTURE_LIMIT) {
if (hasMeaningfulText(state.capture)) {
state.hasMeaningfulText = true;
}
events.push({ type: 'text', text: state.capture });
state.capture = '';
state.capturing = false;
resetIncrementalToolState(state);
continue;
}
break;
}
state.capture = '';
state.capturing = false;
resetIncrementalToolState(state);
if (consumed.prefix) {
if (hasMeaningfulText(consumed.prefix)) {
state.hasMeaningfulText = true;
}
events.push({ type: 'text', text: consumed.prefix });
}
if (Array.isArray(consumed.calls) && consumed.calls.length > 0) {
@@ -70,11 +105,15 @@ function processToolSieveChunk(state, chunk, toolNames) {
if (start >= 0) {
const prefix = state.pending.slice(0, start);
if (prefix) {
if (hasMeaningfulText(prefix)) {
state.hasMeaningfulText = true;
}
events.push({ type: 'text', text: prefix });
}
state.capture = state.pending.slice(start);
state.pending = '';
state.capturing = true;
resetIncrementalToolState(state);
continue;
}
@@ -83,6 +122,9 @@ function processToolSieveChunk(state, chunk, toolNames) {
break;
}
state.pending = hold;
if (hasMeaningfulText(safe)) {
state.hasMeaningfulText = true;
}
events.push({ type: 'text', text: safe });
}
return events;
@@ -94,24 +136,37 @@ function flushToolSieve(state, toolNames) {
}
const events = processToolSieveChunk(state, '', toolNames);
if (state.capturing) {
const consumed = consumeToolCapture(state.capture, toolNames);
const consumed = consumeToolCapture(state, toolNames);
if (consumed.ready) {
if (consumed.prefix) {
if (hasMeaningfulText(consumed.prefix)) {
state.hasMeaningfulText = true;
}
events.push({ type: 'text', text: consumed.prefix });
}
if (Array.isArray(consumed.calls) && consumed.calls.length > 0) {
events.push({ type: 'tool_calls', calls: consumed.calls });
}
if (consumed.suffix) {
if (hasMeaningfulText(consumed.suffix)) {
state.hasMeaningfulText = true;
}
events.push({ type: 'text', text: consumed.suffix });
}
} else if (state.capture) {
// Incomplete captured tool JSON at stream end: suppress raw capture.
if (hasMeaningfulText(state.capture)) {
state.hasMeaningfulText = true;
}
events.push({ type: 'text', text: state.capture });
}
state.capture = '';
state.capturing = false;
resetIncrementalToolState(state);
}
if (state.pending) {
if (hasMeaningfulText(state.pending)) {
state.hasMeaningfulText = true;
}
events.push({ type: 'text', text: state.pending });
state.pending = '';
}
@@ -159,7 +214,8 @@ function findToolSegmentStart(s) {
return start >= 0 ? start : keyIdx;
}
function consumeToolCapture(captured, toolNames) {
function consumeToolCapture(state, toolNames) {
const captured = state.capture;
if (!captured) {
return { ready: false, prefix: '', calls: [], suffix: '' };
}
@@ -176,25 +232,361 @@ function consumeToolCapture(captured, toolNames) {
if (!obj.ok) {
return { ready: false, prefix: '', calls: [], suffix: '' };
}
const parsed = parseToolCalls(captured.slice(start, obj.end), toolNames);
if (parsed.length === 0) {
// `tool_calls` key exists but strict JSON parse failed.
// Drop the captured object body to avoid leaking raw tool JSON.
const prefixPart = captured.slice(0, start);
const suffixPart = captured.slice(obj.end);
if (!state.toolNameSent && (state.hasMeaningfulText || hasMeaningfulText(prefixPart) || hasMeaningfulText(suffixPart))) {
return {
ready: true,
prefix: captured.slice(0, start),
prefix: captured,
calls: [],
suffix: captured.slice(obj.end),
suffix: '',
};
}
const parsed = parseStandaloneToolCalls(captured.slice(start, obj.end), toolNames);
if (parsed.length === 0) {
if (state.toolNameSent) {
return {
ready: true,
prefix: prefixPart,
calls: [],
suffix: suffixPart,
};
}
return {
ready: true,
prefix: captured,
calls: [],
suffix: '',
};
}
if (state.toolNameSent) {
if (parsed.length > 1) {
return {
ready: true,
prefix: prefixPart,
calls: parsed.slice(1),
suffix: suffixPart,
};
}
return {
ready: true,
prefix: prefixPart,
calls: [],
suffix: suffixPart,
};
}
return {
ready: true,
prefix: captured.slice(0, start),
prefix: prefixPart,
calls: parsed,
suffix: captured.slice(obj.end),
suffix: suffixPart,
};
}
function buildIncrementalToolDeltas(state) {
const captured = state.capture || '';
if (!captured || state.hasMeaningfulText) {
return [];
}
const lower = captured.toLowerCase();
const keyIdx = lower.indexOf('tool_calls');
if (keyIdx < 0) {
return [];
}
const start = captured.slice(0, keyIdx).lastIndexOf('{');
if (start < 0 || hasMeaningfulText(captured.slice(0, start))) {
return [];
}
const callStart = findFirstToolCallObjectStart(captured, keyIdx);
if (callStart < 0) {
return [];
}
const deltas = [];
if (!state.toolName) {
const name = extractToolCallName(captured, callStart);
if (!name) {
return [];
}
state.toolName = name;
}
if (state.toolArgsStart < 0) {
const args = findToolCallArgsStart(captured, callStart);
if (args) {
state.toolArgsString = Boolean(args.stringMode);
state.toolArgsStart = state.toolArgsString ? args.start + 1 : args.start;
state.toolArgsSent = state.toolArgsStart;
}
}
if (!state.toolNameSent) {
if (state.toolArgsStart < 0) {
return [];
}
state.toolNameSent = true;
deltas.push({ index: 0, name: state.toolName });
}
if (state.toolArgsStart < 0 || state.toolArgsDone) {
return deltas;
}
const progress = scanToolCallArgsProgress(captured, state.toolArgsStart, state.toolArgsString);
if (!progress) {
return deltas;
}
if (progress.end > state.toolArgsSent) {
deltas.push({
index: 0,
arguments: captured.slice(state.toolArgsSent, progress.end),
});
state.toolArgsSent = progress.end;
}
if (progress.complete) {
state.toolArgsDone = true;
}
return deltas;
}
function findFirstToolCallObjectStart(text, keyIdx) {
const arrStart = findToolCallsArrayStart(text, keyIdx);
if (arrStart < 0) {
return -1;
}
const i = skipSpaces(text, arrStart + 1);
if (i >= text.length || text[i] !== '{') {
return -1;
}
return i;
}
function findToolCallsArrayStart(text, keyIdx) {
let i = keyIdx + 'tool_calls'.length;
while (i < text.length && text[i] !== ':') {
i += 1;
}
if (i >= text.length) {
return -1;
}
i = skipSpaces(text, i + 1);
if (i >= text.length || text[i] !== '[') {
return -1;
}
return i;
}
function extractToolCallName(text, callStart) {
let valueStart = findObjectFieldValueStart(text, callStart, ['name']);
if (valueStart < 0 || text[valueStart] !== '"') {
const fnStart = findFunctionObjectStart(text, callStart);
if (fnStart < 0) {
return '';
}
valueStart = findObjectFieldValueStart(text, fnStart, ['name']);
if (valueStart < 0 || text[valueStart] !== '"') {
return '';
}
}
const parsed = parseJSONStringLiteral(text, valueStart);
if (!parsed) {
return '';
}
return parsed.value;
}
function findToolCallArgsStart(text, callStart) {
const keys = ['input', 'arguments', 'args', 'parameters', 'params'];
let valueStart = findObjectFieldValueStart(text, callStart, keys);
if (valueStart < 0) {
const fnStart = findFunctionObjectStart(text, callStart);
if (fnStart < 0) {
return null;
}
valueStart = findObjectFieldValueStart(text, fnStart, keys);
if (valueStart < 0) {
return null;
}
}
if (valueStart >= text.length) {
return null;
}
const ch = text[valueStart];
if (ch === '{' || ch === '[') {
return { start: valueStart, stringMode: false };
}
if (ch === '"') {
return { start: valueStart, stringMode: true };
}
return null;
}
function scanToolCallArgsProgress(text, start, stringMode) {
if (start < 0 || start > text.length) {
return null;
}
if (stringMode) {
let escaped = false;
for (let i = start; i < text.length; i += 1) {
const ch = text[i];
if (escaped) {
escaped = false;
continue;
}
if (ch === '\\') {
escaped = true;
continue;
}
if (ch === '"') {
return { end: i, complete: true };
}
}
return { end: text.length, complete: false };
}
if (start >= text.length || (text[start] !== '{' && text[start] !== '[')) {
return null;
}
let depth = 0;
let quote = '';
let escaped = false;
for (let i = start; i < text.length; i += 1) {
const ch = text[i];
if (quote) {
if (escaped) {
escaped = false;
continue;
}
if (ch === '\\') {
escaped = true;
continue;
}
if (ch === quote) {
quote = '';
}
continue;
}
if (ch === '"' || ch === "'") {
quote = ch;
continue;
}
if (ch === '{' || ch === '[') {
depth += 1;
continue;
}
if (ch === '}' || ch === ']') {
depth -= 1;
if (depth === 0) {
return { end: i + 1, complete: true };
}
}
}
return { end: text.length, complete: false };
}
function findObjectFieldValueStart(text, objStart, keys) {
if (!text || objStart < 0 || objStart >= text.length || text[objStart] !== '{') {
return -1;
}
let depth = 0;
let quote = '';
let escaped = false;
for (let i = objStart; i < text.length; i += 1) {
const ch = text[i];
if (quote) {
if (escaped) {
escaped = false;
continue;
}
if (ch === '\\') {
escaped = true;
continue;
}
if (ch === quote) {
quote = '';
}
continue;
}
if (ch === '"' || ch === "'") {
if (depth === 1) {
const parsed = parseJSONStringLiteral(text, i);
if (!parsed) {
return -1;
}
let j = skipSpaces(text, parsed.end);
if (j >= text.length || text[j] !== ':') {
i = parsed.end - 1;
continue;
}
j = skipSpaces(text, j + 1);
if (j >= text.length) {
return -1;
}
if (keys.includes(parsed.value)) {
return j;
}
i = j - 1;
continue;
}
quote = ch;
continue;
}
if (ch === '{') {
depth += 1;
continue;
}
if (ch === '}') {
depth -= 1;
if (depth === 0) {
break;
}
}
}
return -1;
}
function findFunctionObjectStart(text, callStart) {
const valueStart = findObjectFieldValueStart(text, callStart, ['function']);
if (valueStart < 0 || valueStart >= text.length || text[valueStart] !== '{') {
return -1;
}
return valueStart;
}
function parseJSONStringLiteral(text, start) {
if (!text || start < 0 || start >= text.length || text[start] !== '"') {
return null;
}
let out = '';
let escaped = false;
for (let i = start + 1; i < text.length; i += 1) {
const ch = text[i];
if (escaped) {
out += ch;
escaped = false;
continue;
}
if (ch === '\\') {
escaped = true;
continue;
}
if (ch === '"') {
return { value: out, end: i + 1 };
}
out += ch;
}
return null;
}
function skipSpaces(text, i) {
let idx = i;
while (idx < text.length) {
const ch = text[idx];
if (ch === ' ' || ch === '\t' || ch === '\n' || ch === '\r') {
idx += 1;
continue;
}
break;
}
return idx;
}
function extractJSONObjectFrom(text, start) {
if (!text || start < 0 || start >= text.length || text[start] !== '{') {
return { ok: false, end: 0 };
@@ -251,26 +643,35 @@ function parseToolCalls(text, toolNames) {
if (parsed.length === 0) {
return [];
}
const allowed = new Set((toolNames || []).filter(Boolean));
const out = [];
for (const tc of parsed) {
if (!tc || !tc.name) {
continue;
}
if (allowed.size > 0 && !allowed.has(tc.name)) {
continue;
}
out.push({ name: tc.name, input: tc.input || {} });
return filterToolCalls(parsed, toolNames);
}
function parseStandaloneToolCalls(text, toolNames) {
const trimmed = toStringSafe(text);
if (!trimmed) {
return [];
}
if (out.length === 0 && parsed.length > 0) {
for (const tc of parsed) {
if (!tc || !tc.name) {
continue;
}
out.push({ name: tc.name, input: tc.input || {} });
const candidates = [trimmed];
if (trimmed.startsWith('```') && trimmed.endsWith('```')) {
const m = trimmed.match(/```(?:json)?\s*([\s\S]*?)\s*```/i);
if (m && m[1]) {
candidates.push(toStringSafe(m[1]));
}
}
return out;
for (const candidate of candidates) {
const c = toStringSafe(candidate);
if (!c) {
continue;
}
if (!c.startsWith('{') && !c.startsWith('[')) {
continue;
}
const parsed = parseToolCallsPayload(c);
if (parsed.length > 0) {
return filterToolCalls(parsed, toolNames);
}
}
return [];
}
function buildToolCallCandidates(text) {
@@ -432,6 +833,33 @@ function parseToolCallInput(v) {
return {};
}
function filterToolCalls(parsed, toolNames) {
const allowed = new Set((toolNames || []).filter(Boolean));
const out = [];
for (const tc of parsed) {
if (!tc || !tc.name) {
continue;
}
if (allowed.size > 0 && !allowed.has(tc.name)) {
continue;
}
out.push({ name: tc.name, input: tc.input || {} });
}
if (out.length === 0 && parsed.length > 0) {
for (const tc of parsed) {
if (!tc || !tc.name) {
continue;
}
out.push({ name: tc.name, input: tc.input || {} });
}
}
return out;
}
function hasMeaningfulText(text) {
return toStringSafe(text) !== '';
}
function formatOpenAIStreamToolCalls(calls) {
if (!Array.isArray(calls) || calls.length === 0) {
return [];
@@ -473,5 +901,6 @@ module.exports = {
processToolSieveChunk,
flushToolSieve,
parseToolCalls,
parseStandaloneToolCalls,
formatOpenAIStreamToolCalls,
};

View File

@@ -9,6 +9,7 @@ const {
processToolSieveChunk,
flushToolSieve,
parseToolCalls,
parseStandaloneToolCalls,
} = require('./stream-tool-sieve');
function runSieve(chunks, toolNames) {
@@ -73,6 +74,15 @@ test('parseToolCalls supports fenced json and function.arguments string payload'
assert.deepEqual(calls[0].input, { path: 'README.md' });
});
test('parseStandaloneToolCalls only matches standalone payload and ignores mixed prose', () => {
const mixed = '这里是示例:{"tool_calls":[{"name":"read_file","input":{"path":"README.MD"}}]},请勿执行。';
const standalone = '{"tool_calls":[{"name":"read_file","input":{"path":"README.MD"}}]}';
const mixedCalls = parseStandaloneToolCalls(mixed, ['read_file']);
const standaloneCalls = parseStandaloneToolCalls(standalone, ['read_file']);
assert.equal(mixedCalls.length, 0);
assert.equal(standaloneCalls.length, 1);
});
test('sieve emits tool_calls and does not leak suspicious prefix on late key convergence', () => {
const events = runSieve(
[
@@ -84,13 +94,14 @@ test('sieve emits tool_calls and does not leak suspicious prefix on late key con
);
const leakedText = collectText(events);
const hasToolCall = events.some((evt) => evt.type === 'tool_calls' && Array.isArray(evt.calls) && evt.calls.length > 0);
assert.equal(hasToolCall, true);
const hasToolDelta = events.some((evt) => evt.type === 'tool_call_deltas' && Array.isArray(evt.deltas) && evt.deltas.length > 0);
assert.equal(hasToolCall || hasToolDelta, true);
assert.equal(leakedText.includes('{'), false);
assert.equal(leakedText.toLowerCase().includes('tool_calls'), false);
assert.equal(leakedText.includes('后置正文C。'), true);
});
test('sieve drops invalid tool json body while preserving surrounding text', () => {
test('sieve keeps embedded invalid tool-like json as normal text to avoid stream stalls', () => {
const events = runSieve(
[
'前置正文D。',
@@ -104,18 +115,18 @@ test('sieve drops invalid tool json body while preserving surrounding text', ()
assert.equal(hasToolCall, false);
assert.equal(leakedText.includes('前置正文D。'), true);
assert.equal(leakedText.includes('后置正文E。'), true);
assert.equal(leakedText.toLowerCase().includes('tool_calls'), false);
assert.equal(leakedText.toLowerCase().includes('tool_calls'), true);
});
test('sieve suppresses incomplete captured tool json on stream finalize', () => {
test('sieve flushes incomplete captured tool json as text on stream finalize', () => {
const events = runSieve(
['前置正文F。', '{"tool_calls":[{"name":"read_file"'],
['read_file'],
);
const leakedText = collectText(events);
assert.equal(leakedText.includes('前置正文F。'), true);
assert.equal(leakedText.toLowerCase().includes('tool_calls'), false);
assert.equal(leakedText.includes('{'), false);
assert.equal(leakedText.toLowerCase().includes('tool_calls'), true);
assert.equal(leakedText.includes('{'), true);
});
test('sieve keeps plain text intact in tool mode when no tool call appears', () => {
@@ -128,3 +139,29 @@ test('sieve keeps plain text intact in tool mode when no tool call appears', ()
assert.equal(hasToolCall, false);
assert.equal(leakedText, '你好,这是普通文本回复。请继续。');
});
test('sieve emits incremental tool_call_deltas for split arguments payload', () => {
const state = createToolSieveState();
const first = processToolSieveChunk(
state,
'{"tool_calls":[{"name":"read_file","input":{"path":"READ',
['read_file'],
);
const second = processToolSieveChunk(
state,
'ME.MD","mode":"head"}}]}',
['read_file'],
);
const tail = flushToolSieve(state, ['read_file']);
const events = [...first, ...second, ...tail];
const deltaEvents = events.filter((evt) => evt.type === 'tool_call_deltas');
assert.equal(deltaEvents.length > 0, true);
const merged = deltaEvents.flatMap((evt) => evt.deltas || []);
const hasName = merged.some((d) => d.name === 'read_file');
const argsJoined = merged
.map((d) => d.arguments || '')
.join('');
assert.equal(hasName, true);
assert.equal(argsJoined.includes('"path":"README.MD"'), true);
assert.equal(argsJoined.includes('"mode":"head"'), true);
});