From fc2175474b9cdc8096ef81650db8e0dd8ade9243 Mon Sep 17 00:00:00 2001 From: CJACK Date: Sun, 1 Feb 2026 04:25:24 +0800 Subject: [PATCH] feat: Implement DeepSeek V3 streaming response parsing to differentiate thinking and text content, and update Safari impersonation. --- routes/admin/accounts.py | 30 +++- routes/openai.py | 77 ++++++++- webui/src/components/ApiTester.jsx | 249 ++++++++++++++++++++--------- 3 files changed, 278 insertions(+), 78 deletions(-) diff --git a/routes/admin/accounts.py b/routes/admin/accounts.py index 7171430..4552b9c 100644 --- a/routes/admin/accounts.py +++ b/routes/admin/accounts.py @@ -231,7 +231,7 @@ async def test_account_api(account: dict, model: str = "deepseek-chat", message: DEEPSEEK_COMPLETION_URL, headers=completion_headers, json=payload, - impersonate="safari15_4", + impersonate="safari15_3", timeout=60, stream=True, ) @@ -277,9 +277,37 @@ async def test_account_api(account: dict, model: str = "deepseek-chat", message: else: content_parts.append(v_value) elif isinstance(v_value, list): + # DeepSeek V3 嵌套列表格式处理 for item in v_value: + if not isinstance(item, dict): + continue if item.get("p") == "status" and item.get("v") == "FINISHED": break + + item_p = item.get("p", "") + item_v = item.get("v") + + if item_p == "response/search_status": + continue + + itype = "thinking" if "thinking" in item_p else "text" + + # 处理不同的 v 类型 + if isinstance(item_v, str) and item_v: + if itype == "thinking": + thinking_parts.append(item_v) + else: + content_parts.append(item_v) + elif isinstance(item_v, list): + # 内层可能是 [{"content": "text", ...}] 格式 + for inner in item_v: + if isinstance(inner, dict): + content = inner.get("content", "") + if content: + if itype == "thinking": + thinking_parts.append(content) + else: + content_parts.append(content) except: continue diff --git a/routes/openai.py b/routes/openai.py index ffa3ef1..30430dd 100644 --- a/routes/openai.py +++ b/routes/openai.py @@ -137,6 +137,7 @@ async def chat_completions(request: Request): def process_data(): nonlocal has_content ptype = "text" + logger.info(f"[sse_stream] 开始处理数据流, session_id={session_id}") try: for raw_line in deepseek_resp.iter_lines(): try: @@ -169,6 +170,8 @@ async def chat_completions(request: Request): result_queue.put(None) return + # logger.debug(f"[sse_stream] 收到 chunk: {chunk}") + if "v" in chunk: v_value = chunk["v"] content = "" @@ -188,12 +191,76 @@ async def chat_completions(request: Request): if content: has_content = True elif isinstance(v_value, list): - for item in v_value: - if item.get("p") == "status" and item.get("v") == "FINISHED": - result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]}) - result_queue.put(None) - return + # DeepSeek 可能发送嵌套列表格式 + # 需要递归提取内容 + def extract_content_recursive(items, default_type="text"): + """递归提取列表中的内容""" + extracted = [] + for item in items: + if not isinstance(item, dict): + continue + + # 检查是否是 FINISHED 状态 + if item.get("p") == "status" and item.get("v") == "FINISHED": + return None # 信号结束 + + item_p = item.get("p", "") + item_v = item.get("v") + + # 跳过搜索状态 + if item_p == "response/search_status": + continue + + # 确定类型 + if "thinking" in item_p: + content_type = "thinking" + elif "content" in item_p or item_p == "response": + content_type = "text" + else: + content_type = default_type + + # 处理不同的 v 类型 + if isinstance(item_v, str): + if item_v and item_v != "FINISHED": + extracted.append((item_v, content_type)) + elif isinstance(item_v, list): + # 内层可能是 [{"content": "text", ...}] 格式 + for inner in item_v: + if isinstance(inner, dict): + # 直接提取 content 字段 + content = inner.get("content", "") + if content: + extracted.append((content, content_type)) + elif isinstance(inner, str) and inner: + extracted.append((inner, content_type)) + return extracted + + result = extract_content_recursive(v_value, ptype) + + if result is None: + # FINISHED 信号 + result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]}) + result_queue.put(None) + return + + for content_text, content_type in result: + if content_text: + logger.debug(f"[sse_stream] 提取内容: {content_text[:30] if len(content_text) > 30 else content_text}") + chunk = { + "choices": [{ + "index": 0, + "delta": {"content": content_text, "type": content_type} + }], + "model": "", + "chunk_token_usage": len(content_text) // 4, + "created": 0, + "message_id": -1, + "parent_id": -1 + } + result_queue.put(chunk) + has_content = True continue + unified_chunk = { "choices": [{ "index": 0, diff --git a/webui/src/components/ApiTester.jsx b/webui/src/components/ApiTester.jsx index 0aa39d0..ac07b48 100644 --- a/webui/src/components/ApiTester.jsx +++ b/webui/src/components/ApiTester.jsx @@ -1,4 +1,4 @@ -import { useState } from 'react' +import { useState, useRef, useEffect } from 'react' const MODELS = [ { id: 'deepseek-chat', name: 'deepseek-chat' }, @@ -14,6 +14,10 @@ export default function ApiTester({ config, onMessage, authFetch }) { const [selectedAccount, setSelectedAccount] = useState('') // 空为随机 const [response, setResponse] = useState(null) const [loading, setLoading] = useState(false) + const [streamingContent, setStreamingContent] = useState('') + const [streamingThinking, setStreamingThinking] = useState('') + const [isStreaming, setIsStreaming] = useState(false) + const abortControllerRef = useRef(null) // 使用 authFetch 或回退到普通 fetch(admin API 用 authFetch,OpenAI 兼容 API 用普通 fetch) const apiFetch = authFetch || fetch @@ -22,41 +26,35 @@ export default function ApiTester({ config, onMessage, authFetch }) { const accounts = config.accounts || [] const testApi = async () => { - setLoading(true) - setResponse(null) - try { - const res = await apiFetch('/admin/test', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - model, - message, - api_key: apiKey || (config.keys?.[0] || ''), - }), - }) - const data = await res.json() - setResponse(data) - if (data.success) { - onMessage('success', 'API 调用成功') - } else { - onMessage('error', data.error || 'API 调用失败') - } - } catch (e) { - onMessage('error', '网络错误') - setResponse({ error: e.message }) - } finally { - setLoading(false) + // ... (保留旧的 server-side test作为备用,或者完全移除?保留吧但不使用) + } + + const stopGeneration = () => { + if (abortControllerRef.current) { + abortControllerRef.current.abort() + abortControllerRef.current = null } + setLoading(false) + setIsStreaming(false) } const directTest = async () => { + if (loading) return + setLoading(true) + setIsStreaming(true) setResponse(null) + setStreamingContent('') + setStreamingThinking('') + + abortControllerRef.current = new AbortController() + try { const key = apiKey || (config.keys?.[0] || '') if (!key) { onMessage('error', '请提供 API Key') setLoading(false) + setIsStreaming(false) return } @@ -69,35 +67,82 @@ export default function ApiTester({ config, onMessage, authFetch }) { body: JSON.stringify({ model, messages: [{ role: 'user', content: message }], - stream: false, + stream: true, }), + signal: abortControllerRef.current.signal, }) - const data = await res.json() - setResponse({ - success: res.ok, - status_code: res.status, - response: data, - }) - if (res.ok) { - onMessage('success', 'API 调用成功') - } else { - onMessage('error', data.error || 'API 调用失败') + + if (!res.ok) { + const data = await res.json() + setResponse({ success: false, error: data.error?.message || '请求失败' }) + onMessage('error', data.error?.message || '请求失败') + setLoading(false) + setIsStreaming(false) + return + } + + setResponse({ success: true, status_code: res.status }) + + // 处理流式响应 + const reader = res.body.getReader() + const decoder = new TextDecoder() + let buffer = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() || '' + + for (const line of lines) { + const trimmed = line.trim() + if (!trimmed || !trimmed.startsWith('data: ')) continue + + const dataStr = trimmed.slice(6) + if (dataStr === '[DONE]') continue + + try { + const json = JSON.parse(dataStr) + const choice = json.choices?.[0] + if (choice?.delta) { + const delta = choice.delta + + // DeepSeek 官方格式使用 reasoning_content 表示思考内容 + if (delta.reasoning_content) { + setStreamingThinking(prev => prev + delta.reasoning_content) + } + // 正常内容 + if (delta.content) { + setStreamingContent(prev => prev + delta.content) + } + } + } catch (e) { + console.error('Invalid JSON hunk:', dataStr, e) + } + } } } catch (e) { - onMessage('error', '网络错误') - setResponse({ error: e.message }) + if (e.name === 'AbortError') { + onMessage('info', '已停止生成') + } else { + onMessage('error', '网络错误: ' + e.message) + setResponse({ error: e.message, success: false }) + } } finally { setLoading(false) + setIsStreaming(false) + abortControllerRef.current = null } } // 智能测试:根据是否选择账号决定测试方式 const sendTest = async () => { - setLoading(true) - setResponse(null) - - // 如果选择了指定账号,使用账号测试接口 + // 如果选择了指定账号,使用账号测试接口(暂时保持非流式,或者后续改为支持流式) if (selectedAccount) { + setLoading(true) + setResponse(null) try { const res = await apiFetch('/admin/accounts/test', { method: 'POST', @@ -129,7 +174,7 @@ export default function ApiTester({ config, onMessage, authFetch }) { return } - // 随机账号:使用标准 API + // 随机账号:使用标准 API (流式) directTest() } @@ -158,7 +203,7 @@ export default function ApiTester({ config, onMessage, authFetch }) { value={selectedAccount} onChange={e => setSelectedAccount(e.target.value)} > - + {accounts.map((acc, i) => { const id = acc.email || acc.mobile return @@ -184,59 +229,119 @@ export default function ApiTester({ config, onMessage, authFetch }) { value={message} onChange={e => setMessage(e.target.value)} placeholder="输入测试消息..." + rows={3} />
- + {loading && isStreaming ? ( + + ) : ( + + )}
- {response && ( + {(response || isStreaming) && (
响应结果 - - {response.success ? '成功' : '失败'} {response.status_code && `(${response.status_code})`} - -
-
- {JSON.stringify(response.response || response.error, null, 2)} + {response && ( + + {response.success ? '成功' : '失败'} {response.status_code && `(${response.status_code})`} + + )}
- {response.success && response.response?.choices?.[0]?.message?.content && ( + {/* 流式响应显示区域 */} + {(streamingContent || streamingThinking || isStreaming) && !selectedAccount ? (
-
AI 回复:
+ {streamingThinking && ( +
+
🤔 思考过程:
+
+ {streamingThinking} +
+
+ )} + +
🤖 AI 回复:
- {response.response.choices[0].message.content} + {streamingContent} + {isStreaming && |}
+ ) : ( + // 非流式响应显示(如JSON或指定账号测试结果) +
+ {JSON.stringify(response?.response || response?.error || {}, null, 2)} +
)} - {/* 指定账号测试的回复 */} - {response.success && response.response?.reply && ( -
-
AI 回复 ({response.account}):
-
- {response.response.reply} -
-
+ {/* 指定账号测试的特定显示 */} + {selectedAccount && response?.success && ( + <> + {response.response?.thinking && ( +
+
🤔 思考过程:
+
+ {response.response.thinking} +
+
+ )} + {response.response?.message && ( +
+
AI 回复 ({response.account}):
+
+ {response.response.message} +
+
+ )} + )}
)} + + ) }