mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-20 16:07:47 +08:00
feat: Implement DeepSeek integration, refactor model adapters for streaming and tool calls, enhance admin and account management, and introduce new UI features for settings, API testing, and Vercel sync.
This commit is contained in:
297
api/chat-stream/vercel_stream.js
Normal file
297
api/chat-stream/vercel_stream.js
Normal file
@@ -0,0 +1,297 @@
|
||||
'use strict';
|
||||
|
||||
const {
|
||||
extractToolNames,
|
||||
createToolSieveState,
|
||||
processToolSieveChunk,
|
||||
flushToolSieve,
|
||||
parseToolCalls,
|
||||
formatOpenAIStreamToolCalls,
|
||||
} = require('../helpers/stream-tool-sieve');
|
||||
const {
|
||||
BASE_HEADERS,
|
||||
} = require('../shared/deepseek-constants');
|
||||
|
||||
const {
|
||||
writeOpenAIError,
|
||||
} = require('./error_shape');
|
||||
const {
|
||||
parseChunkForContent,
|
||||
isCitation,
|
||||
} = require('./sse_parse');
|
||||
const {
|
||||
buildUsage,
|
||||
} = require('./token_usage');
|
||||
const {
|
||||
resolveToolcallPolicy,
|
||||
formatIncrementalToolCallDeltas,
|
||||
} = require('./toolcall_policy');
|
||||
const {
|
||||
createChatCompletionEmitter,
|
||||
} = require('./stream_emitter');
|
||||
const {
|
||||
asString,
|
||||
isAbortError,
|
||||
fetchStreamPrepare,
|
||||
relayPreparedFailure,
|
||||
safeReadText,
|
||||
createLeaseReleaser,
|
||||
} = require('./http_internal');
|
||||
|
||||
const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion';
|
||||
|
||||
async function handleVercelStream(req, res, rawBody, payload) {
|
||||
const prep = await fetchStreamPrepare(req, rawBody);
|
||||
if (!prep.ok) {
|
||||
relayPreparedFailure(res, prep);
|
||||
return;
|
||||
}
|
||||
|
||||
const model = asString(prep.body.model) || asString(payload.model);
|
||||
const sessionID = asString(prep.body.session_id) || `chatcmpl-${Date.now()}`;
|
||||
const leaseID = asString(prep.body.lease_id);
|
||||
const deepseekToken = asString(prep.body.deepseek_token);
|
||||
const powHeader = asString(prep.body.pow_header);
|
||||
const completionPayload = prep.body.payload && typeof prep.body.payload === 'object' ? prep.body.payload : null;
|
||||
const finalPrompt = asString(prep.body.final_prompt);
|
||||
const thinkingEnabled = toBool(prep.body.thinking_enabled);
|
||||
const searchEnabled = toBool(prep.body.search_enabled);
|
||||
const toolPolicy = resolveToolcallPolicy(prep.body, payload.tools);
|
||||
const toolNames = toolPolicy.toolNames;
|
||||
|
||||
if (!model || !leaseID || !deepseekToken || !powHeader || !completionPayload) {
|
||||
writeOpenAIError(res, 500, 'invalid vercel prepare response');
|
||||
return;
|
||||
}
|
||||
|
||||
const releaseLease = createLeaseReleaser(req, leaseID);
|
||||
const upstreamController = new AbortController();
|
||||
let clientClosed = false;
|
||||
let reader = null;
|
||||
const markClientClosed = () => {
|
||||
if (clientClosed) {
|
||||
return;
|
||||
}
|
||||
clientClosed = true;
|
||||
upstreamController.abort();
|
||||
if (reader && typeof reader.cancel === 'function') {
|
||||
Promise.resolve(reader.cancel()).catch(() => {});
|
||||
}
|
||||
};
|
||||
const onReqAborted = () => markClientClosed();
|
||||
const onResClose = () => {
|
||||
if (!res.writableEnded) {
|
||||
markClientClosed();
|
||||
}
|
||||
};
|
||||
req.on('aborted', onReqAborted);
|
||||
res.on('close', onResClose);
|
||||
|
||||
try {
|
||||
let completionRes;
|
||||
try {
|
||||
completionRes = await fetch(DEEPSEEK_COMPLETION_URL, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
...BASE_HEADERS,
|
||||
authorization: `Bearer ${deepseekToken}`,
|
||||
'x-ds-pow-response': powHeader,
|
||||
},
|
||||
body: JSON.stringify(completionPayload),
|
||||
signal: upstreamController.signal,
|
||||
});
|
||||
} catch (err) {
|
||||
if (clientClosed || isAbortError(err)) {
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
if (clientClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!completionRes.ok || !completionRes.body) {
|
||||
const detail = await safeReadText(completionRes);
|
||||
writeOpenAIError(res, 500, detail ? `Failed to get completion: ${detail}` : 'Failed to get completion.');
|
||||
return;
|
||||
}
|
||||
|
||||
res.statusCode = 200;
|
||||
res.setHeader('Content-Type', 'text/event-stream');
|
||||
res.setHeader('Cache-Control', 'no-cache, no-transform');
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
res.setHeader('X-Accel-Buffering', 'no');
|
||||
if (typeof res.flushHeaders === 'function') {
|
||||
res.flushHeaders();
|
||||
}
|
||||
|
||||
const created = Math.floor(Date.now() / 1000);
|
||||
let currentType = thinkingEnabled ? 'thinking' : 'text';
|
||||
let thinkingText = '';
|
||||
let outputText = '';
|
||||
const toolSieveEnabled = toolPolicy.toolSieveEnabled;
|
||||
const emitEarlyToolDeltas = toolPolicy.emitEarlyToolDeltas;
|
||||
const toolSieveState = createToolSieveState();
|
||||
let toolCallsEmitted = false;
|
||||
const streamToolCallIDs = new Map();
|
||||
const decoder = new TextDecoder();
|
||||
reader = completionRes.body.getReader();
|
||||
let buffered = '';
|
||||
let ended = false;
|
||||
const { sendFrame, sendDeltaFrame } = createChatCompletionEmitter({
|
||||
res,
|
||||
sessionID,
|
||||
created,
|
||||
model,
|
||||
isClosed: () => clientClosed,
|
||||
});
|
||||
|
||||
const finish = async (reason) => {
|
||||
if (ended) {
|
||||
return;
|
||||
}
|
||||
ended = true;
|
||||
if (clientClosed || res.writableEnded || res.destroyed) {
|
||||
await releaseLease();
|
||||
return;
|
||||
}
|
||||
const detected = parseToolCalls(outputText, toolNames);
|
||||
if (detected.length > 0 && !toolCallsEmitted) {
|
||||
toolCallsEmitted = true;
|
||||
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(detected) });
|
||||
} else if (toolSieveEnabled) {
|
||||
const tailEvents = flushToolSieve(toolSieveState, toolNames);
|
||||
for (const evt of tailEvents) {
|
||||
if (evt.text) {
|
||||
sendDeltaFrame({ content: evt.text });
|
||||
}
|
||||
}
|
||||
}
|
||||
if (detected.length > 0 || toolCallsEmitted) {
|
||||
reason = 'tool_calls';
|
||||
}
|
||||
sendFrame({
|
||||
id: sessionID,
|
||||
object: 'chat.completion.chunk',
|
||||
created,
|
||||
model,
|
||||
choices: [{ delta: {}, index: 0, finish_reason: reason }],
|
||||
usage: buildUsage(finalPrompt, thinkingText, outputText),
|
||||
});
|
||||
if (!res.writableEnded && !res.destroyed) {
|
||||
res.write('data: [DONE]\n\n');
|
||||
}
|
||||
await releaseLease();
|
||||
if (!res.writableEnded && !res.destroyed) {
|
||||
res.end();
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
if (clientClosed) {
|
||||
await finish('stop');
|
||||
return;
|
||||
}
|
||||
const { value, done } = await reader.read();
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
buffered += decoder.decode(value, { stream: true });
|
||||
const lines = buffered.split('\n');
|
||||
buffered = lines.pop() || '';
|
||||
|
||||
for (const rawLine of lines) {
|
||||
const line = rawLine.trim();
|
||||
if (!line.startsWith('data:')) {
|
||||
continue;
|
||||
}
|
||||
const dataStr = line.slice(5).trim();
|
||||
if (!dataStr) {
|
||||
continue;
|
||||
}
|
||||
if (dataStr === '[DONE]') {
|
||||
await finish('stop');
|
||||
return;
|
||||
}
|
||||
let chunk;
|
||||
try {
|
||||
chunk = JSON.parse(dataStr);
|
||||
} catch (_err) {
|
||||
continue;
|
||||
}
|
||||
if (chunk.error || chunk.code === 'content_filter') {
|
||||
await finish('content_filter');
|
||||
return;
|
||||
}
|
||||
const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType);
|
||||
currentType = parsed.newType;
|
||||
if (parsed.finished) {
|
||||
await finish('stop');
|
||||
return;
|
||||
}
|
||||
|
||||
for (const p of parsed.parts) {
|
||||
if (!p.text) {
|
||||
continue;
|
||||
}
|
||||
if (searchEnabled && isCitation(p.text)) {
|
||||
continue;
|
||||
}
|
||||
if (p.type === 'thinking') {
|
||||
if (thinkingEnabled) {
|
||||
thinkingText += p.text;
|
||||
sendDeltaFrame({ reasoning_content: p.text });
|
||||
}
|
||||
} else {
|
||||
outputText += p.text;
|
||||
if (!toolSieveEnabled) {
|
||||
sendDeltaFrame({ content: p.text });
|
||||
continue;
|
||||
}
|
||||
const events = processToolSieveChunk(toolSieveState, p.text, toolNames);
|
||||
for (const evt of events) {
|
||||
if (evt.type === 'tool_call_deltas' && Array.isArray(evt.deltas) && evt.deltas.length > 0) {
|
||||
if (!emitEarlyToolDeltas) {
|
||||
continue;
|
||||
}
|
||||
toolCallsEmitted = true;
|
||||
sendDeltaFrame({ tool_calls: formatIncrementalToolCallDeltas(evt.deltas, streamToolCallIDs) });
|
||||
continue;
|
||||
}
|
||||
if (evt.type === 'tool_calls') {
|
||||
toolCallsEmitted = true;
|
||||
sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls) });
|
||||
continue;
|
||||
}
|
||||
if (evt.text) {
|
||||
sendDeltaFrame({ content: evt.text });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
await finish('stop');
|
||||
} catch (err) {
|
||||
if (clientClosed || isAbortError(err)) {
|
||||
await finish('stop');
|
||||
return;
|
||||
}
|
||||
await finish('stop');
|
||||
}
|
||||
} finally {
|
||||
req.removeListener('aborted', onReqAborted);
|
||||
res.removeListener('close', onResClose);
|
||||
await releaseLease();
|
||||
}
|
||||
}
|
||||
|
||||
function toBool(v) {
|
||||
return v === true;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
handleVercelStream,
|
||||
};
|
||||
Reference in New Issue
Block a user