diff --git a/core/auth.py b/core/auth.py index 20c5762..f665890 100644 --- a/core/auth.py +++ b/core/auth.py @@ -58,17 +58,35 @@ def get_queue_status() -> dict: # ---------------------------------------------------------------------- # 账号选择与释放 - 轮询(Round-Robin)策略 # ---------------------------------------------------------------------- -def choose_new_account(exclude_ids=None): +def choose_new_account(exclude_ids=None, target_id=None): """轮询选择策略: 1. 使用线程锁保证并发安全 - 2. 优先选择队首的有 token 账号 - 3. 从队列头部取出账号(FIFO) - 4. 请求完成后调用 release_account 将账号放回队尾 + 2. 如果指定了 target_id,优先尝试获取该账号 + 3. 优先选择队首的有 token 账号 + 4. 从队列头部取出账号(FIFO) + 5. 请求完成后调用 release_account 将账号放回队尾 """ if exclude_ids is None: exclude_ids = [] with _queue_lock: + # 0. 如果指定了目标账号,优先尝试获取 + if target_id: + for i in range(len(account_queue)): + acc = account_queue[i] + acc_id = get_account_identifier(acc) + if acc_id == target_id: + selected = account_queue.pop(i) + in_use_accounts[acc_id] = selected + logger.info(f"[choose_new_account] 指定选择: {acc_id} | 队列剩余: {len(account_queue)}") + return selected + # 如果队列中没找到,且不在 in_use 中,说明账号不存在 + if target_id not in in_use_accounts: + logger.warning(f"[choose_new_account] 指定账号不存在: {target_id}") + else: + logger.warning(f"[choose_new_account] 指定账号正忙: {target_id}") + return None + # 第一轮:优先选择已有 token 的账号 for i in range(len(account_queue)): acc = account_queue[i] @@ -145,11 +163,17 @@ def determine_mode_and_token(request: Request): if caller_key in config_keys: request.state.use_config_token = True request.state.tried_accounts = [] # 初始化已尝试账号 - selected_account = choose_new_account() + + target_account = request.headers.get("X-Ds2-Target-Account") + selected_account = choose_new_account(target_id=target_account) + if not selected_account: + detail_msg = "No accounts configured or all accounts are busy." + if target_account: + detail_msg = f"Target account {target_account} is busy or not found." raise HTTPException( status_code=429, - detail="No accounts configured or all accounts are busy.", + detail=detail_msg, ) if not selected_account.get("token", "").strip(): try: diff --git a/core/config.py b/core/config.py index 83e02ca..f215691 100644 --- a/core/config.py +++ b/core/config.py @@ -32,9 +32,14 @@ logger = logging.getLogger("ds2api") # -------------------------- 初始化 tokenizer -------------------------- chat_tokenizer_dir = resolve_path("DS2API_TOKENIZER_DIR", "") +# 抑制 Mistral tokenizer regex 警告(不影响 DeepSeek tokenization) +_tf_logger = logging.getLogger("transformers") +_tf_log_level = _tf_logger.level +_tf_logger.setLevel(logging.ERROR) tokenizer = transformers.AutoTokenizer.from_pretrained( chat_tokenizer_dir, trust_remote_code=True ) +_tf_logger.setLevel(_tf_log_level) # ---------------------------------------------------------------------- # 配置文件的读写函数 diff --git a/core/sse_parser.py b/core/sse_parser.py index 481c872..e1c234e 100644 --- a/core/sse_parser.py +++ b/core/sse_parser.py @@ -255,6 +255,26 @@ def parse_sse_chunk_for_content( return ([], True, new_fragment_type) contents.extend(result) + # 处理字典值(初始响应 chunk,包含 response.fragments) + elif isinstance(v_value, dict): + response_obj = v_value.get("response", v_value) + fragments = response_obj.get("fragments", []) + if isinstance(fragments, list): + for frag in fragments: + if isinstance(frag, dict): + frag_type = frag.get("type", "").upper() + frag_content = frag.get("content", "") + if frag_type == "THINK" or frag_type == "THINKING": + new_fragment_type = "thinking" + if frag_content: + contents.append((frag_content, "thinking")) + elif frag_type == "RESPONSE": + new_fragment_type = "text" + if frag_content: + contents.append((frag_content, "text")) + elif frag_content: + contents.append((frag_content, ptype)) + return (contents, False, new_fragment_type) diff --git a/routes/admin/vercel.py b/routes/admin/vercel.py index 91b03d3..cb365b5 100644 --- a/routes/admin/vercel.py +++ b/routes/admin/vercel.py @@ -2,8 +2,10 @@ """Admin Vercel 模块 - Vercel 同步和部署""" import asyncio import base64 +import hashlib import json import os +import time as _time import httpx from fastapi import APIRouter, HTTPException, Request, Depends @@ -23,6 +25,19 @@ VERCEL_PROJECT_ID = os.getenv("VERCEL_PROJECT_ID", "") VERCEL_TEAM_ID = os.getenv("VERCEL_TEAM_ID", "") +def _compute_config_hash() -> str: + """计算可同步配置的指纹哈希(仅包含 keys 和 accounts)""" + syncable = { + "keys": CONFIG.get("keys", []), + "accounts": [ + {k: v for k, v in acc.items() if k != "token"} + for acc in CONFIG.get("accounts", []) + ], + } + raw = json.dumps(syncable, sort_keys=True, ensure_ascii=False, separators=(",", ":")) + return hashlib.md5(raw.encode("utf-8")).hexdigest() + + # ---------------------------------------------------------------------- # API 测试(通过本地 API) # ---------------------------------------------------------------------- @@ -228,6 +243,10 @@ async def sync_to_vercel(request: Request, _: bool = Depends(verify_admin)): if deploy_resp.status_code in [200, 201]: deploy_data = deploy_resp.json() + # 记录同步哈希和时间 + CONFIG["_vercel_sync_hash"] = _compute_config_hash() + CONFIG["_vercel_sync_time"] = int(_time.time()) + save_config(CONFIG) result = { "success": True, "message": "配置已同步,正在重新部署...", @@ -240,6 +259,10 @@ async def sync_to_vercel(request: Request, _: bool = Depends(verify_admin)): result["saved_credentials"] = saved_credentials return JSONResponse(content=result) + # 环境变量已更新,但无法自动触发重新部署 + CONFIG["_vercel_sync_hash"] = _compute_config_hash() + CONFIG["_vercel_sync_time"] = int(_time.time()) + save_config(CONFIG) result = { "success": True, "message": "配置已同步到 Vercel,请手动触发重新部署", @@ -259,6 +282,25 @@ async def sync_to_vercel(request: Request, _: bool = Depends(verify_admin)): raise HTTPException(status_code=500, detail=str(e)) +# ---------------------------------------------------------------------- +# 同步状态查询 +# ---------------------------------------------------------------------- +@router.get("/vercel/status") +async def get_vercel_sync_status(_: bool = Depends(verify_admin)): + """检查当前配置与上次同步到 Vercel 的配置是否一致""" + last_hash = CONFIG.get("_vercel_sync_hash", "") + last_time = CONFIG.get("_vercel_sync_time", 0) + current_hash = _compute_config_hash() + + synced = bool(last_hash and last_hash == current_hash) + + return JSONResponse(content={ + "synced": synced, + "last_sync_time": last_time if last_time else None, + "has_synced_before": bool(last_hash), + }) + + # ---------------------------------------------------------------------- # 导出配置 # ---------------------------------------------------------------------- diff --git a/routes/claude.py b/routes/claude.py index 706c022..e9c7b25 100644 --- a/routes/claude.py +++ b/routes/claude.py @@ -314,7 +314,7 @@ Remember: Output ONLY the JSON, no other text. The response must start with {{ a deepseek_resp.close() except Exception: pass - cleanup_account(request) + # 注意:不在此处调用 cleanup_account,由外层 finally 统一处理 return StreamingResponse( claude_sse_stream(), diff --git a/routes/openai.py b/routes/openai.py index 5ba60f2..2225581 100644 --- a/routes/openai.py +++ b/routes/openai.py @@ -194,6 +194,7 @@ IMPORTANT: If calling tools, output ONLY the JSON. The response must start with last_content_time = time.time() # 最后收到有效内容的时间 keepalive_count = 0 # 连续 keepalive 计数 has_content = False # 是否收到过内容 + stream_finished = False # 是否已发送过结束标记 def process_data(): """处理 DeepSeek SSE 数据流 - 使用 sse_parser 模块""" @@ -343,6 +344,7 @@ IMPORTANT: If calling tools, output ONLY the JSON. The response must start with yield f"data: {json.dumps(finish_chunk, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" last_send_time = current_time + stream_finished = True break new_choices = [] @@ -391,8 +393,8 @@ IMPORTANT: If calling tools, output ONLY the JSON. The response must start with except queue.Empty: continue - # 如果是超时退出,也发送结束标记 - if has_content: + # 如果是超时退出且尚未发送结束标记,补发结束标记 + if has_content and not stream_finished: prompt_tokens = len(final_prompt) // 4 thinking_tokens = len(final_thinking) // 4 completion_tokens = len(final_text) // 4 @@ -435,8 +437,7 @@ IMPORTANT: If calling tools, output ONLY the JSON. The response must start with except Exception as e: logger.error(f"[sse_stream] 异常: {e}") - finally: - cleanup_account(request) + # 注意:不在此处调用 cleanup_account,由外层 finally 统一处理 return StreamingResponse( sse_stream(), diff --git a/static/admin/.gitkeep b/static/admin/.gitkeep deleted file mode 100644 index 8b13789..0000000 --- a/static/admin/.gitkeep +++ /dev/null @@ -1 +0,0 @@ - diff --git a/webui/src/components/ApiTester.jsx b/webui/src/components/ApiTester.jsx index 87fc7d9..b464aec 100644 --- a/webui/src/components/ApiTester.jsx +++ b/webui/src/components/ApiTester.jsx @@ -14,7 +14,9 @@ import { ChevronDown, ShieldCheck, Terminal, - Zap + Zap, + ToggleLeft, + ToggleRight } from 'lucide-react' import clsx from 'clsx' import { useI18n } from '../i18n' @@ -31,6 +33,7 @@ export default function ApiTester({ config, onMessage, authFetch }) { const [streamingContent, setStreamingContent] = useState('') const [streamingThinking, setStreamingThinking] = useState('') const [isStreaming, setIsStreaming] = useState(false) + const [streamingMode, setStreamingMode] = useState(true) const abortControllerRef = useRef(null) const defaultMessageRef = useRef(defaultMessage) @@ -55,7 +58,7 @@ export default function ApiTester({ config, onMessage, authFetch }) { setIsStreaming(false) } - const directTest = async () => { + const runTest = async () => { if (loading) return setLoading(true) @@ -75,69 +78,78 @@ export default function ApiTester({ config, onMessage, authFetch }) { return } + const headers = { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${key}`, + } + if (selectedAccount) { + headers['X-Ds2-Target-Account'] = selectedAccount + } + const res = await fetch('/v1/chat/completions', { method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${key}`, - }, + headers, body: JSON.stringify({ model, messages: [{ role: 'user', content: message }], - stream: true, + stream: streamingMode, }), signal: abortControllerRef.current.signal, }) if (!res.ok) { - const data = await res.json() - setResponse({ success: false, error: data.error?.message || t('apiTester.requestFailed') }) - onMessage('error', data.error?.message || t('apiTester.requestFailed')) + const data = await res.json().catch(() => ({})) + const errorMsg = data.error?.message || t('apiTester.requestFailed') + setResponse({ success: false, error: errorMsg }) + onMessage('error', errorMsg) setLoading(false) setIsStreaming(false) return } - setResponse({ success: true, status_code: res.status }) + if (streamingMode) { + setResponse({ success: true, status_code: res.status }) - const reader = res.body.getReader() - const decoder = new TextDecoder() - let buffer = '' + const reader = res.body.getReader() + const decoder = new TextDecoder() + let buffer = '' - while (true) { - const { done, value } = await reader.read() - if (done) break + while (true) { + const { done, value } = await reader.read() + if (done) break - buffer += decoder.decode(value, { stream: true }) - const lines = buffer.split('\n') - buffer = lines.pop() || '' + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() || '' - for (const line of lines) { - const trimmed = line.trim() - if (!trimmed || !trimmed.startsWith('data: ')) continue + for (const line of lines) { + const trimmed = line.trim() + if (!trimmed || !trimmed.startsWith('data: ')) continue - const dataStr = trimmed.slice(6) - if (dataStr === '[DONE]') continue + const dataStr = trimmed.slice(6) + if (dataStr === '[DONE]') continue - try { - const json = JSON.parse(dataStr) - console.log('[ApiTester] Parsed JSON:', json) - const choice = json.choices?.[0] - if (choice?.delta) { - const delta = choice.delta - console.log('[ApiTester] Delta:', delta) - if (delta.reasoning_content) { - setStreamingThinking(prev => prev + delta.reasoning_content) - } - if (delta.content) { - console.log('[ApiTester] Content:', delta.content) - setStreamingContent(prev => prev + delta.content) + try { + const json = JSON.parse(dataStr) + const choice = json.choices?.[0] + if (choice?.delta) { + const delta = choice.delta + if (delta.reasoning_content) { + setStreamingThinking(prev => prev + delta.reasoning_content) + } + if (delta.content) { + setStreamingContent(prev => prev + delta.content) + } } + } catch (e) { + console.error('Invalid JSON hunk:', dataStr, e) } - } catch (e) { - console.error('Invalid JSON hunk:', dataStr, e) } } + } else { + const data = await res.json() + setResponse({ success: true, status_code: res.status, ...data }) + onMessage('success', t('apiTester.testSuccess', { account: selectedAccount || 'Auto', time: 'N/A' })) } } catch (e) { if (e.name === 'AbortError') { @@ -153,256 +165,235 @@ export default function ApiTester({ config, onMessage, authFetch }) { } } - const sendTest = async () => { - if (selectedAccount) { - setLoading(true) - setResponse(null) - try { - const res = await apiFetch('/admin/accounts/test', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - identifier: selectedAccount, - model, - message, - }), - }) - const data = await res.json() - setResponse({ - success: data.success, - status_code: res.status, - response: data, - account: selectedAccount, - }) - if (data.success) { - onMessage('success', t('apiTester.testSuccess', { account: selectedAccount, time: data.response_time })) - } else { - onMessage('error', `${selectedAccount}: ${data.message}`) - } - } catch (e) { - onMessage('error', t('apiTester.networkError', { error: e.message })) - setResponse({ error: e.message }) - } finally { - setLoading(false) - } - return - } +useEffect(() => { + setMessage((prev) => (prev === defaultMessageRef.current ? defaultMessage : prev)) + defaultMessageRef.current = defaultMessage +}, [defaultMessage]) - directTest() - } - - useEffect(() => { - setMessage((prev) => (prev === defaultMessageRef.current ? defaultMessage : prev)) - defaultMessageRef.current = defaultMessage - }, [defaultMessage]) - - return ( -
- {/* Configuration Panel */} -
-
- {/* Mobile Toggle Header */} - - -
-
- -
- {models.map(m => { - const Icon = m.icon - return ( - - ) - })} -
-
- -
- -
- - -
-
- -
- - setApiKey(e.target.value)} - /> +return ( +
+ {/* Configuration Panel */} +
+
+ {/* Mobile Toggle Header */} +
-
- - {/* Chat Interface */} -
- - {/* Messages Area */} -
- {/* User Message */} -
-
- -
-
-
- {message} -
-
+
+
+ - {/* AI Response */} - {(response || isStreaming) && ( -
-
- -
-
-
- - DeepSeek - - {response && ( - +
+ +
+ {models.map(m => { + const Icon = m.icon + return ( +
- )} -
- - {/* Input Area */} -
-
-