From e9905179707336e80ac003f5110f89eadfd887c9 Mon Sep 17 00:00:00 2001 From: CJACK Date: Sun, 1 Feb 2026 15:24:36 +0800 Subject: [PATCH] refactor: Extract DeepSeek SSE parsing logic to `sse_parser` module and centralize stream constants. --- core/constants.py | 43 +++++++ core/deepseek.py | 31 ++--- core/sse_parser.py | 9 +- routes/openai.py | 312 +++++++++------------------------------------ 4 files changed, 117 insertions(+), 278 deletions(-) create mode 100644 core/constants.py diff --git a/core/constants.py b/core/constants.py new file mode 100644 index 0000000..48c2577 --- /dev/null +++ b/core/constants.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +"""常量定义模块 - 统一管理项目中的所有常量""" + +# ---------------------------------------------------------------------- +# 网络和超时配置 +# ---------------------------------------------------------------------- +KEEP_ALIVE_TIMEOUT = 5 # 保活超时(秒) +STREAM_IDLE_TIMEOUT = 30 # 流无新内容超时(秒) +MAX_KEEPALIVE_COUNT = 10 # 最大连续 keepalive 次数 + +# ---------------------------------------------------------------------- +# DeepSeek API 配置 +# ---------------------------------------------------------------------- +DEEPSEEK_HOST = "chat.deepseek.com" +DEEPSEEK_LOGIN_URL = f"https://{DEEPSEEK_HOST}/api/v0/users/login" +DEEPSEEK_CREATE_SESSION_URL = f"https://{DEEPSEEK_HOST}/api/v0/chat_session/create" +DEEPSEEK_CREATE_POW_URL = f"https://{DEEPSEEK_HOST}/api/v0/chat/create_pow_challenge" +DEEPSEEK_COMPLETION_URL = f"https://{DEEPSEEK_HOST}/api/v0/chat/completion" + +# ---------------------------------------------------------------------- +# 请求头配置 +# ---------------------------------------------------------------------- +BASE_HEADERS = { + "Host": "chat.deepseek.com", + "User-Agent": "DeepSeek/1.6.11 Android/35", + "Accept": "application/json", + "Accept-Encoding": "gzip", + "Content-Type": "application/json", + "x-client-platform": "android", + "x-client-version": "1.6.11", + "x-client-locale": "zh_CN", + "accept-charset": "UTF-8", +} + +# ---------------------------------------------------------------------- +# SSE 解析配置 +# ---------------------------------------------------------------------- +# 跳过的路径模式(状态相关,不是内容) +SKIP_PATTERNS = [ + "quasi_status", "elapsed_secs", "token_usage", + "pending_fragment", "conversation_mode", + "fragments/-1/status", "fragments/-2/status", "fragments/-3/status" +] diff --git a/core/deepseek.py b/core/deepseek.py index 041442d..76c75b1 100644 --- a/core/deepseek.py +++ b/core/deepseek.py @@ -6,32 +6,21 @@ from fastapi import HTTPException from .config import CONFIG, save_config, logger from .utils import get_account_identifier - -# ---------------------------------------------------------------------- -# DeepSeek 相关常量 -# ---------------------------------------------------------------------- -DEEPSEEK_HOST = "chat.deepseek.com" -DEEPSEEK_LOGIN_URL = f"https://{DEEPSEEK_HOST}/api/v0/users/login" -DEEPSEEK_CREATE_SESSION_URL = f"https://{DEEPSEEK_HOST}/api/v0/chat_session/create" -DEEPSEEK_CREATE_POW_URL = f"https://{DEEPSEEK_HOST}/api/v0/chat/create_pow_challenge" -DEEPSEEK_COMPLETION_URL = f"https://{DEEPSEEK_HOST}/api/v0/chat/completion" - -BASE_HEADERS = { - "Host": "chat.deepseek.com", - "User-Agent": "DeepSeek/1.6.11 Android/35", - "Accept": "application/json", - "Accept-Encoding": "gzip", - "Content-Type": "application/json", - "x-client-platform": "android", - "x-client-version": "1.6.11", - "x-client-locale": "zh_CN", - "accept-charset": "UTF-8", -} +from .constants import ( + DEEPSEEK_HOST, + DEEPSEEK_LOGIN_URL, + DEEPSEEK_CREATE_SESSION_URL, + DEEPSEEK_CREATE_POW_URL, + DEEPSEEK_COMPLETION_URL, + BASE_HEADERS, +) # get_account_identifier 已移至 core.utils + + # ---------------------------------------------------------------------- # 登录函数:支持使用 email 或 mobile 登录 # ---------------------------------------------------------------------- diff --git a/core/sse_parser.py b/core/sse_parser.py index 276ca21..3bf422a 100644 --- a/core/sse_parser.py +++ b/core/sse_parser.py @@ -6,12 +6,9 @@ from typing import List, Tuple, Optional, Dict, Any -# 跳过的路径模式(状态相关,不是内容) -SKIP_PATTERNS = [ - "quasi_status", "elapsed_secs", "token_usage", - "pending_fragment", "conversation_mode", - "fragments/-1/status", "fragments/-2/status", "fragments/-3/status" -] +from .constants import SKIP_PATTERNS + + def should_skip_chunk(chunk_path: str) -> bool: diff --git a/routes/openai.py b/routes/openai.py index 66cbb4c..771df5b 100644 --- a/routes/openai.py +++ b/routes/openai.py @@ -28,17 +28,25 @@ from core.stream_parser import ( extract_content_from_chunk, should_filter_citation, ) +from core.sse_parser import ( + parse_sse_chunk_for_content, + extract_content_recursive, +) +from core.constants import ( + KEEP_ALIVE_TIMEOUT, + STREAM_IDLE_TIMEOUT, + MAX_KEEPALIVE_COUNT, +) from core.messages import messages_prepare router = APIRouter() -# 添加保活超时配置(5秒) -KEEP_ALIVE_TIMEOUT = 5 - # 预编译正则表达式(性能优化) _CITATION_PATTERN = re.compile(r"^\[citation:") + + # ---------------------------------------------------------------------- # 路由:/v1/models # ---------------------------------------------------------------------- @@ -120,10 +128,7 @@ async def chat_completions(request: Request): ) def sse_stream(): - # 智能超时配置 - STREAM_IDLE_TIMEOUT = 30 # 无新内容超时(秒) - MAX_KEEPALIVE_COUNT = 10 # 最大连续 keepalive 次数 - + # 使用导入的常量(不再本地定义) try: final_text = "" final_thinking = "" @@ -135,280 +140,85 @@ async def chat_completions(request: Request): has_content = False # 是否收到过内容 def process_data(): + """处理 DeepSeek SSE 数据流 - 使用 sse_parser 模块""" nonlocal has_content - ptype = "text" - current_fragment_type = "thinking" if thinking_enabled else "text" # 追踪当前活跃的 fragment 类型 + current_fragment_type = "thinking" if thinking_enabled else "text" logger.info(f"[sse_stream] 开始处理数据流, session_id={session_id}") + try: for raw_line in deepseek_resp.iter_lines(): + # 解码行 try: line = raw_line.decode("utf-8") except Exception as e: logger.warning(f"[sse_stream] 解码失败: {e}") - error_type = "thinking" if ptype == "thinking" else "text" - busy_content_str = f'{{"choices":[{{"index":0,"delta":{{"content":"解码失败,请稍候再试","type":"{error_type}"}}}}],"model":"","chunk_token_usage":1,"created":0,"message_id":-1,"parent_id":-1}}' - try: - busy_content = json.loads(busy_content_str) - result_queue.put(busy_content) - except json.JSONDecodeError: - result_queue.put({"choices": [{"index": 0, "delta": {"content": "解码失败", "type": "text"}}]}) + result_queue.put({"choices": [{"index": 0, "delta": {"content": "解码失败,请稍候再试", "type": "text"}}]}) result_queue.put(None) break + if not line: continue - if line.startswith("data:"): - data_str = line[5:].strip() - if data_str == "[DONE]": + + if not line.startswith("data:"): + continue + + data_str = line[5:].strip() + if data_str == "[DONE]": + result_queue.put(None) + break + + try: + chunk = json.loads(data_str) + + # 检测内容审核/敏感词阻止 + if "error" in chunk or chunk.get("code") == "content_filter": + logger.warning(f"[sse_stream] 检测到内容过滤: {chunk}") + result_queue.put({"choices": [{"index": 0, "finish_reason": "content_filter"}]}) result_queue.put(None) - break - try: - chunk = json.loads(data_str) - - # 检测内容审核/敏感词阻止 - if "error" in chunk or chunk.get("code") == "content_filter": - logger.warning(f"[sse_stream] 检测到内容过滤: {chunk}") - result_queue.put({"choices": [{"index": 0, "finish_reason": "content_filter"}]}) - result_queue.put(None) - return - - logger.info(f"[sse_stream] RAW 原始chunk: {data_str[:300]}") - print(f"[DEBUG] RAW: {data_str[:300]}", flush=True) - - # 写入原始 chunk 到日志文件 - with open("/tmp/ds2api_debug.log", "a") as f: - f.write(f"[MAIN] chunk_path={chunk.get('p', '')}, v_type={type(chunk.get('v')).__name__}, chunk={str(chunk)[:300]}\n") - - if "v" in chunk: - v_value = chunk["v"] - content = "" - chunk_path = chunk.get("p", "") - - if chunk_path == "response/search_status": - continue - - # 跳过所有状态相关的 chunk(不是内容) - # 注意:response/status 是真正的结束信号,需要特殊处理(后面的代码会处理) - # 但 response/fragments/-1/status 等需要跳过 - skip_patterns = [ - "quasi_status", "elapsed_secs", "token_usage", - "pending_fragment", "conversation_mode", - "fragments/-1/status", "fragments/-2/status" # 搜索片段状态 - ] - if any(kw in chunk_path for kw in skip_patterns): - continue - - # 检查是否是真正的响应结束信号 - if chunk_path == "response/status" and isinstance(v_value, str) and v_value == "FINISHED": - result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]}) - result_queue.put(None) - return - - # 检测 fragment 类型变化(来自直接的 fragments 路径或 BATCH 操作) - new_fragment_type = current_fragment_type - - # 检测 BATCH APPEND 格式: {'p': 'response', 'o': 'BATCH', 'v': [...]} - if chunk_path == "response" and isinstance(v_value, list): - for batch_item in v_value: - if isinstance(batch_item, dict) and batch_item.get("p") == "fragments" and batch_item.get("o") == "APPEND": - fragments = batch_item.get("v", []) - for frag in fragments: - if isinstance(frag, dict): - frag_type = frag.get("type", "").upper() - if frag_type == "THINK" or frag_type == "THINKING": - new_fragment_type = "thinking" - elif frag_type == "RESPONSE": - new_fragment_type = "text" - - # 也检测直接的 fragments 路径 - if "response/fragments" in chunk_path and isinstance(v_value, list): - for frag in v_value: - if isinstance(frag, dict): - frag_type = frag.get("type", "").upper() - if frag_type == "THINK" or frag_type == "THINKING": - new_fragment_type = "thinking" - elif frag_type == "RESPONSE": - new_fragment_type = "text" - - # 确定当前内容类型 - if chunk_path == "response/thinking_content": - ptype = "thinking" - elif chunk_path == "response/content": - ptype = "text" - elif "response/fragments" in chunk_path and "/content" in chunk_path: - # 如 response/fragments/-1/content - 使用当前 fragment 类型 - ptype = current_fragment_type - elif "response/fragments" in chunk_path: - # fragments 的类型由内层 type 决定,默认用之前的 ptype - pass - elif not chunk_path: - # 空路径内容:使用当前活跃的 fragment 类型 - if thinking_enabled: - ptype = current_fragment_type - else: - ptype = "text" - - # 更新 current_fragment_type 供后续处理使用 - current_fragment_type = new_fragment_type - - logger.info(f"[sse_stream] ptype={ptype}, current_fragment_type={current_fragment_type}, chunk_path='{chunk_path}', v_type={type(v_value).__name__}, v={str(v_value)[:100]}") - if isinstance(v_value, str): - # 检查是否是 FINISHED 状态 - # 只有当 chunk_path 为空或为 "status" 时才认为是真正的结束 - # 搜索模型会发送 "response/fragments/-1/status": "FINISHED" 表示搜索片段完成,不是响应结束 - if v_value == "FINISHED" and (not chunk_path or chunk_path == "status"): - result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]}) - result_queue.put(None) - return - content = v_value - if content: - has_content = True - elif isinstance(v_value, list): - # DeepSeek 可能发送嵌套列表格式 - # 需要递归提取内容 - def extract_content_recursive(items, default_type="text"): - """递归提取列表中的内容""" - extracted = [] - for item in items: - if not isinstance(item, dict): - continue - - item_p = item.get("p", "") - item_v = item.get("v") - - # 写入调试日志 - 显示完整的 item - with open("/tmp/ds2api_debug.log", "a") as f: - f.write(f"[extract] full_item={str(item)[:200]}\n") - - # 跳过搜索结果项(包含 url/title/snippet 的项目) - if "url" in item and "title" in item: - continue - - # 跳过 quasi_status(搜索完成信号,不是响应完成) - if item_p == "quasi_status": - continue - - # 跳过 accumulated_token_usage 和 has_pending_fragment - if item_p in ("accumulated_token_usage", "has_pending_fragment"): - continue - - # 只有当 p="status" (精确匹配) 且 v="FINISHED" 才认为是真正结束 - if item_p == "status" and item_v == "FINISHED": - return None # 信号结束 - - # 跳过搜索状态 - if item_p == "response/search_status": - continue - - # 直接处理包含 content 和 type 的项 (例如 {'id': 2, 'type': 'RESPONSE', 'content': '...'}) - if "content" in item and "type" in item: - inner_type = item.get("type", "").upper() - if inner_type == "THINK" or inner_type == "THINKING": - final_type = "thinking" - elif inner_type == "RESPONSE": - final_type = "text" - else: - final_type = default_type - content = item.get("content", "") - if content: - extracted.append((content, final_type)) - continue - - # 确定类型(基于 p 字段) - if "thinking" in item_p: - content_type = "thinking" - elif "content" in item_p or item_p == "response" or item_p == "fragments": - content_type = "text" - else: - content_type = default_type - - # 处理不同的 v 类型 - if isinstance(item_v, str): - if item_v and item_v != "FINISHED": - extracted.append((item_v, content_type)) - elif isinstance(item_v, list): - # 内层可能是 [{"content": "text", "type": "THINK/RESPONSE", ...}] 格式 - for inner in item_v: - if isinstance(inner, dict): - # 检查内层的 type 字段 - inner_type = inner.get("type", "").upper() - # DeepSeek 使用 THINK 而不是 THINKING - if inner_type == "THINK" or inner_type == "THINKING": - final_type = "thinking" - elif inner_type == "RESPONSE": - final_type = "text" - else: - final_type = content_type # 继承外层类型 - - content = inner.get("content", "") - if content: - extracted.append((content, final_type)) - elif isinstance(inner, str) and inner: - extracted.append((inner, content_type)) - return extracted - - result = extract_content_recursive(v_value, ptype) - - if result is None: - # FINISHED 信号 - result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]}) - result_queue.put(None) - return - - for content_text, content_type in result: - if content_text: - logger.debug(f"[sse_stream] 提取内容: {content_text[:30] if len(content_text) > 30 else content_text}") - chunk = { - "choices": [{ - "index": 0, - "delta": {"content": content_text, "type": content_type} - }], - "model": "", - "chunk_token_usage": len(content_text) // 4, - "created": 0, - "message_id": -1, - "parent_id": -1 - } - result_queue.put(chunk) - has_content = True - - continue - + return + + # 使用 sse_parser 模块解析内容 + contents, is_finished, new_fragment_type = parse_sse_chunk_for_content( + chunk, thinking_enabled, current_fragment_type + ) + current_fragment_type = new_fragment_type + + if is_finished: + result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]}) + result_queue.put(None) + return + + # 处理提取的内容 + for content_text, content_type in contents: + if content_text: + has_content = True unified_chunk = { "choices": [{ "index": 0, - "delta": {"content": content, "type": ptype} + "delta": {"content": content_text, "type": content_type} }], "model": "", - "chunk_token_usage": len(content) // 4, + "chunk_token_usage": len(content_text) // 4, "created": 0, "message_id": -1, "parent_id": -1 } result_queue.put(unified_chunk) - - except Exception as e: - logger.warning(f"[sse_stream] 无法解析: {data_str}, 错误: {e}") - error_type = "thinking" if ptype == "thinking" else "text" - busy_content_str = f'{{"choices":[{{"index":0,"delta":{{"content":"解析失败,请稍候再试","type":"{error_type}"}}}}],"model":"","chunk_token_usage":1,"created":0,"message_id":-1,"parent_id":-1}}' - try: - busy_content = json.loads(busy_content_str) - result_queue.put(busy_content) - except json.JSONDecodeError: - result_queue.put({"choices": [{"index": 0, "delta": {"content": "解析失败", "type": "text"}}]}) - result_queue.put(None) - break + except Exception as e: + logger.warning(f"[sse_stream] 无法解析: {data_str[:100]}, 错误: {e}") + result_queue.put({"choices": [{"index": 0, "delta": {"content": "解析失败,请稍候再试", "type": "text"}}]}) + result_queue.put(None) + break + except Exception as e: logger.warning(f"[sse_stream] 错误: {e}") - try: - error_response = {"choices": [{"index": 0, "delta": {"content": "服务器错误,请稍候再试", "type": "text"}}]} - result_queue.put(error_response) - except Exception: - pass + result_queue.put({"choices": [{"index": 0, "delta": {"content": "服务器错误,请稍候再试", "type": "text"}}]}) result_queue.put(None) finally: deepseek_resp.close() + process_thread = threading.Thread(target=process_data) process_thread.start()