diff --git a/core/sse_parser.py b/core/sse_parser.py new file mode 100644 index 0000000..276ca21 --- /dev/null +++ b/core/sse_parser.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +"""DeepSeek SSE 流解析模块 + +这个模块包含解析 DeepSeek SSE 响应的公共逻辑,供 openai.py 和 accounts.py 共用。 +""" + +from typing import List, Tuple, Optional, Dict, Any + +# 跳过的路径模式(状态相关,不是内容) +SKIP_PATTERNS = [ + "quasi_status", "elapsed_secs", "token_usage", + "pending_fragment", "conversation_mode", + "fragments/-1/status", "fragments/-2/status", "fragments/-3/status" +] + + +def should_skip_chunk(chunk_path: str) -> bool: + """判断是否应该跳过这个 chunk(状态相关,不是内容)""" + if chunk_path == "response/search_status": + return True + return any(kw in chunk_path for kw in SKIP_PATTERNS) + + +def is_response_finished(chunk_path: str, v_value: Any) -> bool: + """判断是否是响应结束信号""" + return chunk_path == "response/status" and isinstance(v_value, str) and v_value == "FINISHED" + + +def is_finished_signal(chunk_path: str, v_value: str) -> bool: + """判断字符串 v_value 是否是结束信号""" + return v_value == "FINISHED" and (not chunk_path or chunk_path == "status") + + +def is_search_result(item: dict) -> bool: + """判断是否是搜索结果项(url/title/snippet)""" + return "url" in item and "title" in item + + +def extract_content_from_item(item: dict, default_type: str = "text") -> Optional[Tuple[str, str]]: + """从包含 content 和 type 的项中提取内容 + + 返回 (content, content_type) 或 None + """ + if "content" in item and "type" in item: + inner_type = item.get("type", "").upper() + content = item.get("content", "") + if content: + if inner_type == "THINK" or inner_type == "THINKING": + return (content, "thinking") + elif inner_type == "RESPONSE": + return (content, "text") + else: + return (content, default_type) + return None + + +def extract_content_recursive(items: List[Dict], default_type: str = "text") -> Optional[List[Tuple[str, str]]]: + """递归提取列表中的内容 + + 返回 [(content, content_type), ...] 列表, + 如果遇到 FINISHED 信号返回 None + """ + extracted = [] + for item in items: + if not isinstance(item, dict): + continue + + item_p = item.get("p", "") + item_v = item.get("v") + + # 跳过搜索结果项 + if is_search_result(item): + continue + + # 只有当 p="status" (精确匹配) 且 v="FINISHED" 才认为是真正结束 + if item_p == "status" and item_v == "FINISHED": + return None # 信号结束 + + # 跳过状态相关 + if should_skip_chunk(item_p): + continue + + # 直接处理包含 content 和 type 的项 + result = extract_content_from_item(item, default_type) + if result: + extracted.append(result) + continue + + # 确定类型(基于 p 字段) + if "thinking" in item_p: + content_type = "thinking" + elif "content" in item_p or item_p == "response" or item_p == "fragments": + content_type = "text" + else: + content_type = default_type + + # 处理不同的 v 类型 + if isinstance(item_v, str): + if item_v and item_v != "FINISHED": + extracted.append((item_v, content_type)) + elif isinstance(item_v, list): + # 内层可能是 [{"content": "text", "type": "THINK/RESPONSE", ...}] 格式 + for inner in item_v: + if isinstance(inner, dict): + # 检查内层的 type 字段 + inner_type = inner.get("type", "").upper() + # DeepSeek 使用 THINK 而不是 THINKING + if inner_type == "THINK" or inner_type == "THINKING": + final_type = "thinking" + elif inner_type == "RESPONSE": + final_type = "text" + else: + final_type = content_type # 继承外层类型 + + content = inner.get("content", "") + if content: + extracted.append((content, final_type)) + elif isinstance(inner, str) and inner: + extracted.append((inner, content_type)) + return extracted + + +def parse_sse_chunk_for_content(chunk: dict, thinking_enabled: bool = False, + current_fragment_type: str = "thinking") -> Tuple[List[Tuple[str, str]], bool, str]: + """解析单个 SSE chunk 并提取内容 + + Args: + chunk: 解析后的 JSON chunk + thinking_enabled: 是否启用思考模式 + current_fragment_type: 当前活跃的 fragment 类型 ("thinking" 或 "text") + 用于处理没有明确路径的空 p 字段内容 + + Returns: + (contents, is_finished, new_fragment_type) + - contents: [(content, content_type), ...] 列表 + - is_finished: 是否是结束信号 + - new_fragment_type: 更新后的 fragment 类型,供下一个 chunk 使用 + """ + if "v" not in chunk: + return ([], False, current_fragment_type) + + v_value = chunk["v"] + chunk_path = chunk.get("p", "") + contents = [] + new_fragment_type = current_fragment_type + + # 跳过状态相关 chunk + if should_skip_chunk(chunk_path): + return ([], False, current_fragment_type) + + # 检查是否是真正的响应结束信号 + if is_response_finished(chunk_path, v_value): + return ([], True, current_fragment_type) + + # 检测 fragment 类型变化(来自 APPEND 操作) + # 格式: {'p': 'response', 'o': 'BATCH', 'v': [{'p': 'fragments', 'o': 'APPEND', 'v': [{'type': 'THINK/RESPONSE', ...}]}]} + if chunk_path == "response" and isinstance(v_value, list): + for batch_item in v_value: + if isinstance(batch_item, dict) and batch_item.get("p") == "fragments" and batch_item.get("o") == "APPEND": + fragments = batch_item.get("v", []) + for frag in fragments: + if isinstance(frag, dict): + frag_type = frag.get("type", "").upper() + if frag_type == "THINK" or frag_type == "THINKING": + new_fragment_type = "thinking" + elif frag_type == "RESPONSE": + new_fragment_type = "text" + + # 也检测直接的 fragments 路径 + if "response/fragments" in chunk_path and isinstance(v_value, list): + for frag in v_value: + if isinstance(frag, dict): + frag_type = frag.get("type", "").upper() + if frag_type == "THINK" or frag_type == "THINKING": + new_fragment_type = "thinking" + elif frag_type == "RESPONSE": + new_fragment_type = "text" + + # 确定当前内容类型 + if chunk_path == "response/thinking_content": + ptype = "thinking" + elif chunk_path == "response/content": + ptype = "text" + elif "response/fragments" in chunk_path and "/content" in chunk_path: + # 如 response/fragments/-1/content - 使用当前 fragment 类型 + ptype = new_fragment_type + elif not chunk_path: + # 空路径内容:使用当前活跃的 fragment 类型 + if thinking_enabled: + ptype = new_fragment_type + else: + ptype = "text" + else: + ptype = "text" + + # 处理字符串值 + if isinstance(v_value, str): + if is_finished_signal(chunk_path, v_value): + return ([], True, new_fragment_type) + if v_value: + contents.append((v_value, ptype)) + + # 处理列表值 + elif isinstance(v_value, list): + result = extract_content_recursive(v_value, ptype) + if result is None: + return ([], True, new_fragment_type) + contents.extend(result) + + return (contents, False, new_fragment_type) + diff --git a/routes/admin/accounts.py b/routes/admin/accounts.py index 4552b9c..f5b8b43 100644 --- a/routes/admin/accounts.py +++ b/routes/admin/accounts.py @@ -17,6 +17,7 @@ from core.deepseek import ( ) from core.pow import compute_pow_answer from core.models import get_model_config +from core.sse_parser import parse_sse_chunk_for_content from .auth import verify_admin @@ -242,6 +243,7 @@ async def test_account_api(account: dict, model: str = "deepseek-chat", message: thinking_parts = [] content_parts = [] + current_fragment_type = "thinking" if thinking_enabled else "text" for line in completion_resp.iter_lines(): if not line: @@ -260,54 +262,19 @@ async def test_account_api(account: dict, model: str = "deepseek-chat", message: try: chunk = json.loads(data_str) - if "v" in chunk: - v_value = chunk["v"] - path = chunk.get("p", "") - - if path == "response/search_status": - continue - - ptype = "thinking" if "thinking" in path else "text" - - if isinstance(v_value, str): - if v_value == "FINISHED": - break - if ptype == "thinking": - thinking_parts.append(v_value) - else: - content_parts.append(v_value) - elif isinstance(v_value, list): - # DeepSeek V3 嵌套列表格式处理 - for item in v_value: - if not isinstance(item, dict): - continue - if item.get("p") == "status" and item.get("v") == "FINISHED": - break - - item_p = item.get("p", "") - item_v = item.get("v") - - if item_p == "response/search_status": - continue - - itype = "thinking" if "thinking" in item_p else "text" - - # 处理不同的 v 类型 - if isinstance(item_v, str) and item_v: - if itype == "thinking": - thinking_parts.append(item_v) - else: - content_parts.append(item_v) - elif isinstance(item_v, list): - # 内层可能是 [{"content": "text", ...}] 格式 - for inner in item_v: - if isinstance(inner, dict): - content = inner.get("content", "") - if content: - if itype == "thinking": - thinking_parts.append(content) - else: - content_parts.append(content) + # 使用共享的解析函数 + contents, is_finished, current_fragment_type = parse_sse_chunk_for_content( + chunk, thinking_enabled, current_fragment_type + ) + + if is_finished: + break + + for content, ctype in contents: + if ctype == "thinking": + thinking_parts.append(content) + else: + content_parts.append(content) except: continue diff --git a/routes/openai.py b/routes/openai.py index 587ee92..66cbb4c 100644 --- a/routes/openai.py +++ b/routes/openai.py @@ -137,7 +137,7 @@ async def chat_completions(request: Request): def process_data(): nonlocal has_content ptype = "text" - response_started = False # 追踪是否已开始正式回复 + current_fragment_type = "thinking" if thinking_enabled else "text" # 追踪当前活跃的 fragment 类型 logger.info(f"[sse_stream] 开始处理数据流, session_id={session_id}") try: for raw_line in deepseek_resp.iter_lines(): @@ -203,33 +203,54 @@ async def chat_completions(request: Request): result_queue.put(None) return - # 检测是否开始正式回复 - # 只有当 fragments 包含 RESPONSE 类型时才认为开始正式回复 + # 检测 fragment 类型变化(来自直接的 fragments 路径或 BATCH 操作) + new_fragment_type = current_fragment_type + + # 检测 BATCH APPEND 格式: {'p': 'response', 'o': 'BATCH', 'v': [...]} + if chunk_path == "response" and isinstance(v_value, list): + for batch_item in v_value: + if isinstance(batch_item, dict) and batch_item.get("p") == "fragments" and batch_item.get("o") == "APPEND": + fragments = batch_item.get("v", []) + for frag in fragments: + if isinstance(frag, dict): + frag_type = frag.get("type", "").upper() + if frag_type == "THINK" or frag_type == "THINKING": + new_fragment_type = "thinking" + elif frag_type == "RESPONSE": + new_fragment_type = "text" + + # 也检测直接的 fragments 路径 if "response/fragments" in chunk_path and isinstance(v_value, list): for frag in v_value: - if isinstance(frag, dict) and frag.get("type", "").upper() == "RESPONSE": - response_started = True - break + if isinstance(frag, dict): + frag_type = frag.get("type", "").upper() + if frag_type == "THINK" or frag_type == "THINKING": + new_fragment_type = "thinking" + elif frag_type == "RESPONSE": + new_fragment_type = "text" - # 确定当前类型 + # 确定当前内容类型 if chunk_path == "response/thinking_content": ptype = "thinking" elif chunk_path == "response/content": ptype = "text" - response_started = True # 有 response/content 也意味着开始正式回复 + elif "response/fragments" in chunk_path and "/content" in chunk_path: + # 如 response/fragments/-1/content - 使用当前 fragment 类型 + ptype = current_fragment_type elif "response/fragments" in chunk_path: # fragments 的类型由内层 type 决定,默认用之前的 ptype pass elif not chunk_path: - # 没有 p 字段的内容: - # - reasoner 模式下,未开始正式回复前是 thinking - # - 开始正式回复后是 text - if thinking_enabled and not response_started: - ptype = "thinking" + # 空路径内容:使用当前活跃的 fragment 类型 + if thinking_enabled: + ptype = current_fragment_type else: ptype = "text" - logger.info(f"[sse_stream] ptype={ptype}, response_started={response_started}, chunk_path='{chunk_path}', v_type={type(v_value).__name__}, v={str(v_value)[:100]}") + # 更新 current_fragment_type 供后续处理使用 + current_fragment_type = new_fragment_type + + logger.info(f"[sse_stream] ptype={ptype}, current_fragment_type={current_fragment_type}, chunk_path='{chunk_path}', v_type={type(v_value).__name__}, v={str(v_value)[:100]}") if isinstance(v_value, str): # 检查是否是 FINISHED 状态 # 只有当 chunk_path 为空或为 "status" 时才认为是真正的结束 @@ -349,6 +370,7 @@ async def chat_completions(request: Request): } result_queue.put(chunk) has_content = True + continue unified_chunk = { @@ -363,6 +385,8 @@ async def chat_completions(request: Request): "parent_id": -1 } result_queue.put(unified_chunk) + + except Exception as e: logger.warning(f"[sse_stream] 无法解析: {data_str}, 错误: {e}") error_type = "thinking" if ptype == "thinking" else "text" @@ -567,6 +591,15 @@ async def chat_completions(request: Request): prompt_tokens = len(final_prompt) // 4 reasoning_tokens = len(final_reasoning) // 4 completion_tokens = len(final_content) // 4 + # 构建 message 对象 + message_obj = { + "role": "assistant", + "content": final_content, + } + # 只有启用思考模式时才包含 reasoning_content + if thinking_enabled and final_reasoning: + message_obj["reasoning_content"] = final_reasoning + result = { "id": completion_id, "object": "chat.completion", @@ -574,11 +607,7 @@ async def chat_completions(request: Request): "model": model, "choices": [{ "index": 0, - "message": { - "role": "assistant", - "content": final_content, - "reasoning_content": final_reasoning, - }, + "message": message_obj, "finish_reason": "stop", }], "usage": { @@ -613,6 +642,15 @@ async def chat_completions(request: Request): prompt_tokens = len(final_prompt) // 4 reasoning_tokens = len(final_reasoning) // 4 completion_tokens = len(final_content) // 4 + # 构建 message 对象 + message_obj = { + "role": "assistant", + "content": final_content, + } + # 只有启用思考模式时才包含 reasoning_content + if thinking_enabled and final_reasoning: + message_obj["reasoning_content"] = final_reasoning + result = { "id": completion_id, "object": "chat.completion", @@ -620,11 +658,7 @@ async def chat_completions(request: Request): "model": model, "choices": [{ "index": 0, - "message": { - "role": "assistant", - "content": final_content, - "reasoning_content": final_reasoning, - }, + "message": message_obj, "finish_reason": "stop", }], "usage": {