Files
ds2api/internal/js/chat-stream/stream_emitter.js
d407ccb773 perf(streaming): optimize TTFT and reduce buffering latency
Core changes:
- stream.go: New accumulation buffer architecture with scanner goroutine
  + select loop, MinChars=16, MaxWait=10ms, first-flush-immediate
- dedupe.go: Add TrimContinuationOverlapFromBuilder to avoid string copies
- claude/stream_runtime_core.go: Integrate toolstream for incremental text
- claude/stream_runtime_finalize.go: toolstream flush support
- stream_emitter.js: Reduce DeltaCoalescer thresholds (160->16 chars, 80->20ms)
- empty_retry: Add thinking-aware empty output detection
- Fix reasoning_content leak and finish_reason=null in edge cases
- Fix tail content truncation when max_tokens exceeded

Tests: sync test expectations with upstream for thinking content
2026-05-02 20:28:30 +08:00

99 lines
2.0 KiB
JavaScript

'use strict';
// Default size threshold for createDeltaCoalescer: a buffered delta is sent as
// soon as it holds at least this many Unicode code points.
const MIN_DELTA_FLUSH_CHARS = 16;
// Default time threshold for createDeltaCoalescer: sub-threshold text is sent
// by a timer after at most this many milliseconds of buffering.
const MAX_DELTA_FLUSH_WAIT_MS = 20;
/**
 * Creates the SSE writers used to stream an OpenAI-style
 * `chat.completion.chunk` response.
 *
 * Every frame is written as `data: <json>\n\n`, followed by `res.flush()`
 * when the response exposes such a function. Frames are silently dropped
 * once `isClosed()` is truthy or the response has ended or been destroyed.
 *
 * @param {object} options
 * @param {object} options.res - Response object; its `write`, `writableEnded`,
 *   `destroyed` and optional `flush` members are used.
 * @param {*} options.sessionID - Emitted as each chunk's `id`.
 * @param {*} options.created - Emitted as each chunk's `created`.
 * @param {*} options.model - Emitted as each chunk's `model`.
 * @param {() => boolean} options.isClosed - Reports whether the client stream
 *   should no longer be written to.
 * @returns {{ sendFrame: (obj: object) => void, sendDeltaFrame: (delta: object) => void }}
 */
function createChatCompletionEmitter({ res, sessionID, created, model, isClosed }) {
  let roleAnnounced = false;

  // Serialises one object as a single SSE `data:` event.
  function sendFrame(obj) {
    const canWrite = !isClosed() && !res.writableEnded && !res.destroyed;
    if (!canWrite) {
      return;
    }
    const payload = JSON.stringify(obj);
    res.write(`data: ${payload}\n\n`);
    if (typeof res.flush === 'function') {
      res.flush();
    }
  }

  // Wraps a delta in a chunk envelope; only the very first delta carries
  // `role: 'assistant'`. The caller's delta object is copied, never mutated.
  function sendDeltaFrame(delta) {
    const deltaPayload = { ...delta };
    if (!roleAnnounced) {
      roleAnnounced = true;
      deltaPayload.role = 'assistant';
    }
    const chunk = {
      id: sessionID,
      object: 'chat.completion.chunk',
      created,
      model,
      choices: [{ delta: deltaPayload, index: 0 }],
    };
    sendFrame(chunk);
  }

  return { sendFrame, sendDeltaFrame };
}
/**
 * Buffers consecutive text fragments for the same delta field and forwards
 * them to `sendDeltaFrame` as a single `{ [field]: text }` delta.
 *
 * Buffered text is forwarded when any of the following happens:
 * - a fragment for a different field arrives (the old batch is sent first);
 * - the buffer reaches `minFlushChars` Unicode code points;
 * - the wait timer expires. It is armed when a fragment is buffered without
 *   reaching the size threshold and no timer is already pending;
 * - the caller invokes `flush()`.
 *
 * @param {object} options
 * @param {(delta: object) => void} options.sendDeltaFrame - Receives each
 *   coalesced delta.
 * @param {number} [options.minFlushChars=MIN_DELTA_FLUSH_CHARS] - Size
 *   threshold, counted in code points.
 * @param {number} [options.maxFlushWaitMS=MAX_DELTA_FLUSH_WAIT_MS] - Timer
 *   delay. Values <= 0 disable the timer, leaving sub-threshold text buffered
 *   until the next append or an explicit `flush()`.
 * @returns {{ append: (field: string, text: string) => void, flush: () => void }}
 */
function createDeltaCoalescer({ sendDeltaFrame, minFlushChars = MIN_DELTA_FLUSH_CHARS, maxFlushWaitMS = MAX_DELTA_FLUSH_WAIT_MS }) {
  let bufferedField = '';
  let bufferedText = '';
  let timer = null;

  function cancelTimer() {
    if (!timer) {
      return;
    }
    clearTimeout(timer);
    timer = null;
  }

  // Sends whatever is buffered (if anything). State is reset before calling
  // sendDeltaFrame so a re-entrant append starts from an empty buffer.
  function flush() {
    cancelTimer();
    const hasPending = Boolean(bufferedField) && Boolean(bufferedText);
    if (!hasPending) {
      return;
    }
    const delta = { [bufferedField]: bufferedText };
    bufferedField = '';
    bufferedText = '';
    sendDeltaFrame(delta);
  }

  function armTimer() {
    if (timer) {
      return;
    }
    if (maxFlushWaitMS <= 0) {
      return;
    }
    timer = setTimeout(flush, maxFlushWaitMS);
    // Where supported, unref() keeps this timer alone from holding the
    // process open.
    if (typeof timer.unref === 'function') {
      timer.unref();
    }
  }

  // Adds a fragment; empty fields or empty text are ignored.
  function append(field, text) {
    if (!field || !text) {
      return;
    }
    if (bufferedField && bufferedField !== field) {
      flush();
    }
    bufferedField = field;
    bufferedText += text;
    // Count code points (not UTF-16 units) so surrogate pairs count once.
    if (Array.from(bufferedText).length >= minFlushChars) {
      flush();
    } else {
      armTimer();
    }
  }

  return { append, flush };
}
// Public API of this module.
module.exports = { createChatCompletionEmitter, createDeltaCoalescer };