mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-06 17:35:30 +08:00
refactor: Extract DeepSeek SSE parsing logic to sse_parser module and centralize stream constants.
This commit is contained in:
43
core/constants.py
Normal file
43
core/constants.py
Normal file
@@ -0,0 +1,43 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""常量定义模块 - 统一管理项目中的所有常量"""
|
||||
|
||||
# ----------------------------------------------------------------------
# Network and timeout configuration
# ----------------------------------------------------------------------
KEEP_ALIVE_TIMEOUT = 5  # keep-alive timeout, in seconds
STREAM_IDLE_TIMEOUT = 30  # timeout when the stream yields no new content, in seconds
MAX_KEEPALIVE_COUNT = 10  # maximum number of consecutive keepalives
|
||||
|
||||
# ----------------------------------------------------------------------
# DeepSeek API configuration
# ----------------------------------------------------------------------
DEEPSEEK_HOST = "chat.deepseek.com"

# Every endpoint below shares this prefix; building it once keeps the URLs
# in sync if the host or API version ever changes.
_DEEPSEEK_API_BASE = f"https://{DEEPSEEK_HOST}/api/v0"

DEEPSEEK_LOGIN_URL = f"{_DEEPSEEK_API_BASE}/users/login"
DEEPSEEK_CREATE_SESSION_URL = f"{_DEEPSEEK_API_BASE}/chat_session/create"
DEEPSEEK_CREATE_POW_URL = f"{_DEEPSEEK_API_BASE}/chat/create_pow_challenge"
DEEPSEEK_COMPLETION_URL = f"{_DEEPSEEK_API_BASE}/chat/completion"

# ----------------------------------------------------------------------
# Request header configuration
# ----------------------------------------------------------------------
# Single source for the client version so that "User-Agent" and
# "x-client-version" cannot drift apart.
_CLIENT_VERSION = "1.6.11"

# Default headers for requests to the DeepSeek endpoints above.
BASE_HEADERS = {
    # Reuse DEEPSEEK_HOST instead of repeating the literal host name.
    "Host": DEEPSEEK_HOST,
    "User-Agent": f"DeepSeek/{_CLIENT_VERSION} Android/35",
    "Accept": "application/json",
    "Accept-Encoding": "gzip",
    "Content-Type": "application/json",
    "x-client-platform": "android",
    "x-client-version": _CLIENT_VERSION,
    "x-client-locale": "zh_CN",
    "accept-charset": "UTF-8",
}
|
||||
|
||||
# ----------------------------------------------------------------------
# SSE parsing configuration
# ----------------------------------------------------------------------
# Path patterns to skip (status-related, not content).
SKIP_PATTERNS = [
    "quasi_status",
    "elapsed_secs",
    "token_usage",
    "pending_fragment",
    "conversation_mode",
    "fragments/-1/status",
    "fragments/-2/status",
    "fragments/-3/status",
]
|
||||
@@ -6,32 +6,21 @@ from fastapi import HTTPException
|
||||
|
||||
from .config import CONFIG, save_config, logger
|
||||
from .utils import get_account_identifier
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# DeepSeek 相关常量
|
||||
# ----------------------------------------------------------------------
|
||||
DEEPSEEK_HOST = "chat.deepseek.com"
|
||||
DEEPSEEK_LOGIN_URL = f"https://{DEEPSEEK_HOST}/api/v0/users/login"
|
||||
DEEPSEEK_CREATE_SESSION_URL = f"https://{DEEPSEEK_HOST}/api/v0/chat_session/create"
|
||||
DEEPSEEK_CREATE_POW_URL = f"https://{DEEPSEEK_HOST}/api/v0/chat/create_pow_challenge"
|
||||
DEEPSEEK_COMPLETION_URL = f"https://{DEEPSEEK_HOST}/api/v0/chat/completion"
|
||||
|
||||
BASE_HEADERS = {
|
||||
"Host": "chat.deepseek.com",
|
||||
"User-Agent": "DeepSeek/1.6.11 Android/35",
|
||||
"Accept": "application/json",
|
||||
"Accept-Encoding": "gzip",
|
||||
"Content-Type": "application/json",
|
||||
"x-client-platform": "android",
|
||||
"x-client-version": "1.6.11",
|
||||
"x-client-locale": "zh_CN",
|
||||
"accept-charset": "UTF-8",
|
||||
}
|
||||
from .constants import (
|
||||
DEEPSEEK_HOST,
|
||||
DEEPSEEK_LOGIN_URL,
|
||||
DEEPSEEK_CREATE_SESSION_URL,
|
||||
DEEPSEEK_CREATE_POW_URL,
|
||||
DEEPSEEK_COMPLETION_URL,
|
||||
BASE_HEADERS,
|
||||
)
|
||||
|
||||
|
||||
# get_account_identifier 已移至 core.utils
|
||||
|
||||
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# 登录函数:支持使用 email 或 mobile 登录
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
@@ -6,12 +6,9 @@
|
||||
|
||||
from typing import List, Tuple, Optional, Dict, Any
|
||||
|
||||
# 跳过的路径模式(状态相关,不是内容)
|
||||
SKIP_PATTERNS = [
|
||||
"quasi_status", "elapsed_secs", "token_usage",
|
||||
"pending_fragment", "conversation_mode",
|
||||
"fragments/-1/status", "fragments/-2/status", "fragments/-3/status"
|
||||
]
|
||||
from .constants import SKIP_PATTERNS
|
||||
|
||||
|
||||
|
||||
|
||||
def should_skip_chunk(chunk_path: str) -> bool:
|
||||
|
||||
312
routes/openai.py
312
routes/openai.py
@@ -28,17 +28,25 @@ from core.stream_parser import (
|
||||
extract_content_from_chunk,
|
||||
should_filter_citation,
|
||||
)
|
||||
from core.sse_parser import (
|
||||
parse_sse_chunk_for_content,
|
||||
extract_content_recursive,
|
||||
)
|
||||
from core.constants import (
|
||||
KEEP_ALIVE_TIMEOUT,
|
||||
STREAM_IDLE_TIMEOUT,
|
||||
MAX_KEEPALIVE_COUNT,
|
||||
)
|
||||
from core.messages import messages_prepare
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
# 添加保活超时配置(5秒)
|
||||
KEEP_ALIVE_TIMEOUT = 5
|
||||
|
||||
# 预编译正则表达式(性能优化)
|
||||
_CITATION_PATTERN = re.compile(r"^\[citation:")
|
||||
|
||||
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# 路由:/v1/models
|
||||
# ----------------------------------------------------------------------
|
||||
@@ -120,10 +128,7 @@ async def chat_completions(request: Request):
|
||||
)
|
||||
|
||||
def sse_stream():
|
||||
# 智能超时配置
|
||||
STREAM_IDLE_TIMEOUT = 30 # 无新内容超时(秒)
|
||||
MAX_KEEPALIVE_COUNT = 10 # 最大连续 keepalive 次数
|
||||
|
||||
# 使用导入的常量(不再本地定义)
|
||||
try:
|
||||
final_text = ""
|
||||
final_thinking = ""
|
||||
@@ -135,280 +140,85 @@ async def chat_completions(request: Request):
|
||||
has_content = False # 是否收到过内容
|
||||
|
||||
def process_data():
|
||||
"""处理 DeepSeek SSE 数据流 - 使用 sse_parser 模块"""
|
||||
nonlocal has_content
|
||||
ptype = "text"
|
||||
current_fragment_type = "thinking" if thinking_enabled else "text" # 追踪当前活跃的 fragment 类型
|
||||
current_fragment_type = "thinking" if thinking_enabled else "text"
|
||||
logger.info(f"[sse_stream] 开始处理数据流, session_id={session_id}")
|
||||
|
||||
try:
|
||||
for raw_line in deepseek_resp.iter_lines():
|
||||
# 解码行
|
||||
try:
|
||||
line = raw_line.decode("utf-8")
|
||||
except Exception as e:
|
||||
logger.warning(f"[sse_stream] 解码失败: {e}")
|
||||
error_type = "thinking" if ptype == "thinking" else "text"
|
||||
busy_content_str = f'{{"choices":[{{"index":0,"delta":{{"content":"解码失败,请稍候再试","type":"{error_type}"}}}}],"model":"","chunk_token_usage":1,"created":0,"message_id":-1,"parent_id":-1}}'
|
||||
try:
|
||||
busy_content = json.loads(busy_content_str)
|
||||
result_queue.put(busy_content)
|
||||
except json.JSONDecodeError:
|
||||
result_queue.put({"choices": [{"index": 0, "delta": {"content": "解码失败", "type": "text"}}]})
|
||||
result_queue.put({"choices": [{"index": 0, "delta": {"content": "解码失败,请稍候再试", "type": "text"}}]})
|
||||
result_queue.put(None)
|
||||
break
|
||||
|
||||
if not line:
|
||||
continue
|
||||
if line.startswith("data:"):
|
||||
data_str = line[5:].strip()
|
||||
if data_str == "[DONE]":
|
||||
|
||||
if not line.startswith("data:"):
|
||||
continue
|
||||
|
||||
data_str = line[5:].strip()
|
||||
if data_str == "[DONE]":
|
||||
result_queue.put(None)
|
||||
break
|
||||
|
||||
try:
|
||||
chunk = json.loads(data_str)
|
||||
|
||||
# 检测内容审核/敏感词阻止
|
||||
if "error" in chunk or chunk.get("code") == "content_filter":
|
||||
logger.warning(f"[sse_stream] 检测到内容过滤: {chunk}")
|
||||
result_queue.put({"choices": [{"index": 0, "finish_reason": "content_filter"}]})
|
||||
result_queue.put(None)
|
||||
break
|
||||
try:
|
||||
chunk = json.loads(data_str)
|
||||
|
||||
# 检测内容审核/敏感词阻止
|
||||
if "error" in chunk or chunk.get("code") == "content_filter":
|
||||
logger.warning(f"[sse_stream] 检测到内容过滤: {chunk}")
|
||||
result_queue.put({"choices": [{"index": 0, "finish_reason": "content_filter"}]})
|
||||
result_queue.put(None)
|
||||
return
|
||||
|
||||
logger.info(f"[sse_stream] RAW 原始chunk: {data_str[:300]}")
|
||||
print(f"[DEBUG] RAW: {data_str[:300]}", flush=True)
|
||||
|
||||
# 写入原始 chunk 到日志文件
|
||||
with open("/tmp/ds2api_debug.log", "a") as f:
|
||||
f.write(f"[MAIN] chunk_path={chunk.get('p', '')}, v_type={type(chunk.get('v')).__name__}, chunk={str(chunk)[:300]}\n")
|
||||
|
||||
if "v" in chunk:
|
||||
v_value = chunk["v"]
|
||||
content = ""
|
||||
chunk_path = chunk.get("p", "")
|
||||
|
||||
if chunk_path == "response/search_status":
|
||||
continue
|
||||
|
||||
# 跳过所有状态相关的 chunk(不是内容)
|
||||
# 注意:response/status 是真正的结束信号,需要特殊处理(后面的代码会处理)
|
||||
# 但 response/fragments/-1/status 等需要跳过
|
||||
skip_patterns = [
|
||||
"quasi_status", "elapsed_secs", "token_usage",
|
||||
"pending_fragment", "conversation_mode",
|
||||
"fragments/-1/status", "fragments/-2/status" # 搜索片段状态
|
||||
]
|
||||
if any(kw in chunk_path for kw in skip_patterns):
|
||||
continue
|
||||
|
||||
# 检查是否是真正的响应结束信号
|
||||
if chunk_path == "response/status" and isinstance(v_value, str) and v_value == "FINISHED":
|
||||
result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]})
|
||||
result_queue.put(None)
|
||||
return
|
||||
|
||||
# 检测 fragment 类型变化(来自直接的 fragments 路径或 BATCH 操作)
|
||||
new_fragment_type = current_fragment_type
|
||||
|
||||
# 检测 BATCH APPEND 格式: {'p': 'response', 'o': 'BATCH', 'v': [...]}
|
||||
if chunk_path == "response" and isinstance(v_value, list):
|
||||
for batch_item in v_value:
|
||||
if isinstance(batch_item, dict) and batch_item.get("p") == "fragments" and batch_item.get("o") == "APPEND":
|
||||
fragments = batch_item.get("v", [])
|
||||
for frag in fragments:
|
||||
if isinstance(frag, dict):
|
||||
frag_type = frag.get("type", "").upper()
|
||||
if frag_type == "THINK" or frag_type == "THINKING":
|
||||
new_fragment_type = "thinking"
|
||||
elif frag_type == "RESPONSE":
|
||||
new_fragment_type = "text"
|
||||
|
||||
# 也检测直接的 fragments 路径
|
||||
if "response/fragments" in chunk_path and isinstance(v_value, list):
|
||||
for frag in v_value:
|
||||
if isinstance(frag, dict):
|
||||
frag_type = frag.get("type", "").upper()
|
||||
if frag_type == "THINK" or frag_type == "THINKING":
|
||||
new_fragment_type = "thinking"
|
||||
elif frag_type == "RESPONSE":
|
||||
new_fragment_type = "text"
|
||||
|
||||
# 确定当前内容类型
|
||||
if chunk_path == "response/thinking_content":
|
||||
ptype = "thinking"
|
||||
elif chunk_path == "response/content":
|
||||
ptype = "text"
|
||||
elif "response/fragments" in chunk_path and "/content" in chunk_path:
|
||||
# 如 response/fragments/-1/content - 使用当前 fragment 类型
|
||||
ptype = current_fragment_type
|
||||
elif "response/fragments" in chunk_path:
|
||||
# fragments 的类型由内层 type 决定,默认用之前的 ptype
|
||||
pass
|
||||
elif not chunk_path:
|
||||
# 空路径内容:使用当前活跃的 fragment 类型
|
||||
if thinking_enabled:
|
||||
ptype = current_fragment_type
|
||||
else:
|
||||
ptype = "text"
|
||||
|
||||
# 更新 current_fragment_type 供后续处理使用
|
||||
current_fragment_type = new_fragment_type
|
||||
|
||||
logger.info(f"[sse_stream] ptype={ptype}, current_fragment_type={current_fragment_type}, chunk_path='{chunk_path}', v_type={type(v_value).__name__}, v={str(v_value)[:100]}")
|
||||
if isinstance(v_value, str):
|
||||
# 检查是否是 FINISHED 状态
|
||||
# 只有当 chunk_path 为空或为 "status" 时才认为是真正的结束
|
||||
# 搜索模型会发送 "response/fragments/-1/status": "FINISHED" 表示搜索片段完成,不是响应结束
|
||||
if v_value == "FINISHED" and (not chunk_path or chunk_path == "status"):
|
||||
result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]})
|
||||
result_queue.put(None)
|
||||
return
|
||||
content = v_value
|
||||
if content:
|
||||
has_content = True
|
||||
elif isinstance(v_value, list):
|
||||
# DeepSeek 可能发送嵌套列表格式
|
||||
# 需要递归提取内容
|
||||
def extract_content_recursive(items, default_type="text"):
|
||||
"""递归提取列表中的内容"""
|
||||
extracted = []
|
||||
for item in items:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
|
||||
item_p = item.get("p", "")
|
||||
item_v = item.get("v")
|
||||
|
||||
# 写入调试日志 - 显示完整的 item
|
||||
with open("/tmp/ds2api_debug.log", "a") as f:
|
||||
f.write(f"[extract] full_item={str(item)[:200]}\n")
|
||||
|
||||
# 跳过搜索结果项(包含 url/title/snippet 的项目)
|
||||
if "url" in item and "title" in item:
|
||||
continue
|
||||
|
||||
# 跳过 quasi_status(搜索完成信号,不是响应完成)
|
||||
if item_p == "quasi_status":
|
||||
continue
|
||||
|
||||
# 跳过 accumulated_token_usage 和 has_pending_fragment
|
||||
if item_p in ("accumulated_token_usage", "has_pending_fragment"):
|
||||
continue
|
||||
|
||||
# 只有当 p="status" (精确匹配) 且 v="FINISHED" 才认为是真正结束
|
||||
if item_p == "status" and item_v == "FINISHED":
|
||||
return None # 信号结束
|
||||
|
||||
# 跳过搜索状态
|
||||
if item_p == "response/search_status":
|
||||
continue
|
||||
|
||||
# 直接处理包含 content 和 type 的项 (例如 {'id': 2, 'type': 'RESPONSE', 'content': '...'})
|
||||
if "content" in item and "type" in item:
|
||||
inner_type = item.get("type", "").upper()
|
||||
if inner_type == "THINK" or inner_type == "THINKING":
|
||||
final_type = "thinking"
|
||||
elif inner_type == "RESPONSE":
|
||||
final_type = "text"
|
||||
else:
|
||||
final_type = default_type
|
||||
content = item.get("content", "")
|
||||
if content:
|
||||
extracted.append((content, final_type))
|
||||
continue
|
||||
|
||||
# 确定类型(基于 p 字段)
|
||||
if "thinking" in item_p:
|
||||
content_type = "thinking"
|
||||
elif "content" in item_p or item_p == "response" or item_p == "fragments":
|
||||
content_type = "text"
|
||||
else:
|
||||
content_type = default_type
|
||||
|
||||
# 处理不同的 v 类型
|
||||
if isinstance(item_v, str):
|
||||
if item_v and item_v != "FINISHED":
|
||||
extracted.append((item_v, content_type))
|
||||
elif isinstance(item_v, list):
|
||||
# 内层可能是 [{"content": "text", "type": "THINK/RESPONSE", ...}] 格式
|
||||
for inner in item_v:
|
||||
if isinstance(inner, dict):
|
||||
# 检查内层的 type 字段
|
||||
inner_type = inner.get("type", "").upper()
|
||||
# DeepSeek 使用 THINK 而不是 THINKING
|
||||
if inner_type == "THINK" or inner_type == "THINKING":
|
||||
final_type = "thinking"
|
||||
elif inner_type == "RESPONSE":
|
||||
final_type = "text"
|
||||
else:
|
||||
final_type = content_type # 继承外层类型
|
||||
|
||||
content = inner.get("content", "")
|
||||
if content:
|
||||
extracted.append((content, final_type))
|
||||
elif isinstance(inner, str) and inner:
|
||||
extracted.append((inner, content_type))
|
||||
return extracted
|
||||
|
||||
result = extract_content_recursive(v_value, ptype)
|
||||
|
||||
if result is None:
|
||||
# FINISHED 信号
|
||||
result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]})
|
||||
result_queue.put(None)
|
||||
return
|
||||
|
||||
for content_text, content_type in result:
|
||||
if content_text:
|
||||
logger.debug(f"[sse_stream] 提取内容: {content_text[:30] if len(content_text) > 30 else content_text}")
|
||||
chunk = {
|
||||
"choices": [{
|
||||
"index": 0,
|
||||
"delta": {"content": content_text, "type": content_type}
|
||||
}],
|
||||
"model": "",
|
||||
"chunk_token_usage": len(content_text) // 4,
|
||||
"created": 0,
|
||||
"message_id": -1,
|
||||
"parent_id": -1
|
||||
}
|
||||
result_queue.put(chunk)
|
||||
has_content = True
|
||||
|
||||
continue
|
||||
|
||||
return
|
||||
|
||||
# 使用 sse_parser 模块解析内容
|
||||
contents, is_finished, new_fragment_type = parse_sse_chunk_for_content(
|
||||
chunk, thinking_enabled, current_fragment_type
|
||||
)
|
||||
current_fragment_type = new_fragment_type
|
||||
|
||||
if is_finished:
|
||||
result_queue.put({"choices": [{"index": 0, "finish_reason": "stop"}]})
|
||||
result_queue.put(None)
|
||||
return
|
||||
|
||||
# 处理提取的内容
|
||||
for content_text, content_type in contents:
|
||||
if content_text:
|
||||
has_content = True
|
||||
unified_chunk = {
|
||||
"choices": [{
|
||||
"index": 0,
|
||||
"delta": {"content": content, "type": ptype}
|
||||
"delta": {"content": content_text, "type": content_type}
|
||||
}],
|
||||
"model": "",
|
||||
"chunk_token_usage": len(content) // 4,
|
||||
"chunk_token_usage": len(content_text) // 4,
|
||||
"created": 0,
|
||||
"message_id": -1,
|
||||
"parent_id": -1
|
||||
}
|
||||
result_queue.put(unified_chunk)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"[sse_stream] 无法解析: {data_str}, 错误: {e}")
|
||||
error_type = "thinking" if ptype == "thinking" else "text"
|
||||
busy_content_str = f'{{"choices":[{{"index":0,"delta":{{"content":"解析失败,请稍候再试","type":"{error_type}"}}}}],"model":"","chunk_token_usage":1,"created":0,"message_id":-1,"parent_id":-1}}'
|
||||
try:
|
||||
busy_content = json.loads(busy_content_str)
|
||||
result_queue.put(busy_content)
|
||||
except json.JSONDecodeError:
|
||||
result_queue.put({"choices": [{"index": 0, "delta": {"content": "解析失败", "type": "text"}}]})
|
||||
result_queue.put(None)
|
||||
break
|
||||
except Exception as e:
|
||||
logger.warning(f"[sse_stream] 无法解析: {data_str[:100]}, 错误: {e}")
|
||||
result_queue.put({"choices": [{"index": 0, "delta": {"content": "解析失败,请稍候再试", "type": "text"}}]})
|
||||
result_queue.put(None)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"[sse_stream] 错误: {e}")
|
||||
try:
|
||||
error_response = {"choices": [{"index": 0, "delta": {"content": "服务器错误,请稍候再试", "type": "text"}}]}
|
||||
result_queue.put(error_response)
|
||||
except Exception:
|
||||
pass
|
||||
result_queue.put({"choices": [{"index": 0, "delta": {"content": "服务器错误,请稍候再试", "type": "text"}}]})
|
||||
result_queue.put(None)
|
||||
finally:
|
||||
deepseek_resp.close()
|
||||
|
||||
|
||||
process_thread = threading.Thread(target=process_data)
|
||||
process_thread.start()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user