def parse_deepseek_sse_line(raw_line: bytes) -> Optional[Dict[str, Any]]:
    """Parse one line of a DeepSeek SSE stream into a chunk dict.

    Args:
        raw_line: One raw line read from the response stream. ``bytes`` is
            the expected type; an already-decoded ``str`` is accepted too.

    Returns:
        ``{"type": "done"}`` for the ``[DONE]`` sentinel, the decoded JSON
        object for any other ``data:`` line, or ``None`` when the line is
        empty, is not a ``data:`` field, cannot be decoded, or its payload
        is not a JSON object.
    """
    if isinstance(raw_line, str):
        line = raw_line
    else:
        try:
            line = raw_line.decode("utf-8")
        except (UnicodeDecodeError, AttributeError) as e:
            logger.warning(f"[parse_deepseek_sse_line] 解码失败: {e}")
            return None

    # An empty line also fails this check, so no separate emptiness test.
    if not line.startswith("data:"):
        return None

    data_str = line[5:].strip()
    if data_str == "[DONE]":
        return {"type": "done"}

    try:
        chunk = json.loads(data_str)
    except json.JSONDecodeError as e:
        logger.warning(f"[parse_deepseek_sse_line] JSON解析失败: {e}")
        return None

    # Valid JSON is not necessarily an object. Consumers call dict methods
    # on the result, so scalars and arrays are skipped instead of being
    # handed back to blow up later.
    if not isinstance(chunk, dict):
        return None
    return chunk
def extract_content_from_chunk(chunk: Dict[str, Any]) -> Tuple[str, str, bool]:
    """Extract the text carried by one parsed DeepSeek chunk (simplified API).

    Args:
        chunk: A chunk dict as produced by the SSE line parser.

    Returns:
        ``(content, content_type, is_finished)`` where ``content_type`` is
        ``"thinking"`` or ``"text"`` and ``is_finished`` is True when the
        chunk signals the end of the response.
    """
    if chunk.get("type") == "done":
        return "", "text", True

    # An "error" key or a content_filter code ends the response early.
    if "error" in chunk or chunk.get("code") == "content_filter":
        logger.warning(f"[extract_content_from_chunk] 检测到内容过滤: {chunk}")
        return "", "text", True

    if "v" not in chunk:
        return "", "text", False

    path = chunk.get("p", "")
    if path == "response/search_status":
        # Search status updates are not content.
        return "", "text", False
    content_type = "thinking" if path == "response/thinking_content" else "text"

    value = chunk["v"]
    if isinstance(value, str):
        if value == "FINISHED":
            return "", content_type, True
        return value, content_type, False

    if isinstance(value, list):
        # A list only matters here when it carries the FINISHED status item;
        # its content is deliberately not extracted by this simplified API.
        finished = any(
            isinstance(item, dict)
            and item.get("p") == "status"
            and item.get("v") == "FINISHED"
            for item in value
        )
        return "", content_type, finished

    return "", content_type, False


def collect_deepseek_response(response: Any) -> Tuple[str, str]:
    """Drain a DeepSeek streaming response and return its full content.

    Args:
        response: Streaming response object exposing ``iter_lines()`` and
            ``close()``.

    Returns:
        ``(reasoning_content, text_content)``. If reading fails part-way the
        error is logged and whatever was collected so far is returned.
    """
    thinking_parts: List[str] = []
    text_parts: List[str] = []

    try:
        for raw_line in response.iter_lines():
            chunk = parse_deepseek_sse_line(raw_line)
            if not chunk:
                continue

            content, content_type, is_finished = extract_content_from_chunk(chunk)
            if is_finished:
                break
            if not content:
                continue

            if content_type == "thinking":
                thinking_parts.append(content)
            else:
                text_parts.append(content)
    except Exception as e:
        logger.error(f"[collect_deepseek_response] 收集响应失败: {e}")
    finally:
        # Best-effort close: a failure to close must not mask the result.
        try:
            response.close()
        except Exception:
            pass

    return "".join(thinking_parts), "".join(text_parts)


def _iter_tool_call_payloads(text: str) -> Generator[Any, None, None]:
    """Yield the ``tool_calls`` value of every JSON object embedded in *text*."""
    decoder = json.JSONDecoder()
    pos = text.find("{")
    while pos != -1:
        try:
            obj, end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            obj, end = None, pos
        if isinstance(obj, dict) and "tool_calls" in obj:
            yield obj["tool_calls"]
            pos = text.find("{", end)
        else:
            # Not a tool-call object: step inside it so nested objects are
            # still examined.
            pos = text.find("{", pos + 1)


def _select_requested_tool_calls(payload: Any, requested_names: List[Any]) -> List[Dict[str, Any]]:
    """Keep the well-formed calls in *payload* whose name was requested."""
    if not isinstance(payload, list):
        return []
    selected: List[Dict[str, Any]] = []
    for call in payload:
        if not isinstance(call, dict):
            continue
        name = call.get("name")
        if isinstance(name, str) and name in requested_names:
            selected.append({"name": name, "input": call.get("input", {})})
    return selected


def _scan_tool_calls(text: str, requested_names: List[Any]) -> List[Dict[str, Any]]:
    """Collect every requested tool call found in any JSON object in *text*."""
    detected: List[Dict[str, Any]] = []
    for payload in _iter_tool_call_payloads(text):
        detected.extend(_select_requested_tool_calls(payload, requested_names))
    return detected


def parse_tool_calls(text: str, tools_requested: List[Dict]) -> List[Dict[str, Any]]:
    """Parse tool calls out of model response text.

    The text is scanned with a real JSON decoder rather than a regular
    expression, so tool inputs that contain arrays or nested objects are
    parsed correctly even when the JSON is surrounded by other text.

    Args:
        text: The response text.
        tools_requested: Tool definitions from the request; only calls whose
            ``name`` matches one of them are returned.

    Returns:
        The detected tool calls, each a dict with ``name`` and ``input``.
        Malformed entries (non-dict items, missing or non-string names) are
        skipped.
    """
    cleaned_text = text.strip()
    if "tool_calls" not in cleaned_text:
        return []

    requested_names = [
        tool.get("name") for tool in (tools_requested or []) if isinstance(tool, dict)
    ]

    detected_tools = _scan_tool_calls(cleaned_text, requested_names)
    if detected_tools:
        return detected_tools

    # Tolerate a single-quoted 'tool_calls' key. Only tried as a fallback so
    # that valid JSON is never rewritten.
    normalized = re.sub(r"""["']tool_calls["']""", '"tool_calls"', cleaned_text)
    if normalized != cleaned_text:
        return _scan_tool_calls(normalized, requested_names)
    return detected_tools


def should_filter_citation(text: str, search_enabled: bool) -> bool:
    """Tell whether *text* is a citation marker that should be filtered out.

    Args:
        text: The content text.
        search_enabled: Whether search is enabled for the request.

    Returns:
        True only when search is enabled and *text* starts with
        ``[citation:``.
    """
    if not search_enabled:
        return False
    return text.startswith("[citation:")
字典,如果解析失败或应跳过则返回 None - """ - try: - line = raw_line.decode("utf-8") - except Exception as e: - logger.warning(f"[parse_deepseek_sse_line] 解码失败: {e}") - return None - - if not line or not line.startswith("data:"): - return None - - data_str = line[5:].strip() - - if data_str == "[DONE]": - return {"type": "done"} - - try: - chunk = json.loads(data_str) - return chunk - except json.JSONDecodeError as e: - logger.warning(f"[parse_deepseek_sse_line] JSON解析失败: {e}") - return None - - -def extract_content_from_chunk(chunk: dict) -> tuple[str, str, bool]: - """从 DeepSeek chunk 中提取内容 - - Args: - chunk: 解析后的 chunk 字典 - - Returns: - (content, content_type, is_finished) 元组 - content_type 为 "thinking" 或 "text" - is_finished 为 True 表示响应结束 - """ - if chunk.get("type") == "done": - return "", "text", True - - # 检测内容审核/敏感词阻止 - if "error" in chunk or chunk.get("code") == "content_filter": - logger.warning(f"[extract_content_from_chunk] 检测到内容过滤: {chunk}") - return "", "text", True - - if "v" not in chunk: - return "", "text", False - - v_value = chunk["v"] - ptype = "text" - - # 检查路径确定类型 - path = chunk.get("p", "") - if path == "response/search_status": - return "", "text", False # 跳过搜索状态 - elif path == "response/thinking_content": - ptype = "thinking" - elif path == "response/content": - ptype = "text" - - if isinstance(v_value, str): - if v_value == "FINISHED": - return "", ptype, True - return v_value, ptype, False - elif isinstance(v_value, list): - for item in v_value: - if item.get("p") == "status" and item.get("v") == "FINISHED": - return "", ptype, True - return "", ptype, False - - return "", ptype, False - - -def collect_deepseek_response(response) -> tuple[str, str]: - """收集 DeepSeek 流响应的完整内容 - - Args: - response: DeepSeek 流响应对象 - - Returns: - (reasoning_content, text_content) 元组 - """ - thinking_parts = [] - text_parts = [] - - try: - for raw_line in response.iter_lines(): - chunk = parse_deepseek_sse_line(raw_line) - if not chunk: - continue - - content, content_type, 
is_finished = extract_content_from_chunk(chunk) - - if is_finished: - break - - if content: - if content_type == "thinking": - thinking_parts.append(content) - else: - text_parts.append(content) - except Exception as e: - logger.error(f"[collect_deepseek_response] 收集响应失败: {e}") - finally: - try: - response.close() - except Exception: - pass - - return "".join(thinking_parts), "".join(text_parts) - - -def parse_tool_calls(text: str, tools_requested: list) -> list[dict]: - """从响应文本中解析工具调用 - - Args: - text: 响应文本 - tools_requested: 请求中定义的工具列表 - - Returns: - 检测到的工具调用列表,每项包含 name 和 input - """ - detected_tools = [] - cleaned_text = text.strip() - - # 尝试直接解析完整 JSON - if cleaned_text.startswith('{"tool_calls":') and cleaned_text.endswith("]}"): - try: - tool_data = json.loads(cleaned_text) - for tool_call in tool_data.get("tool_calls", []): - tool_name = tool_call.get("name") - tool_input = tool_call.get("input", {}) - if any(tool.get("name") == tool_name for tool in tools_requested): - detected_tools.append({"name": tool_name, "input": tool_input}) - if detected_tools: - return detected_tools - except json.JSONDecodeError: - pass - - # 使用正则匹配 - matches = _TOOL_CALL_PATTERN.findall(cleaned_text) - for match in matches: - try: - tool_calls_json = f'{{"tool_calls": [{match}]}}' - tool_data = json.loads(tool_calls_json) - for tool_call in tool_data.get("tool_calls", []): - tool_name = tool_call.get("name") - tool_input = tool_call.get("input", {}) - if any(tool.get("name") == tool_name for tool in tools_requested): - detected_tools.append({"name": tool_name, "input": tool_input}) - except json.JSONDecodeError: - continue - - return detected_tools - - -def should_filter_citation(text: str, search_enabled: bool) -> bool: - """检查是否应该过滤引用内容 - - Args: - text: 内容文本 - search_enabled: 是否启用搜索 - - Returns: - 是否应该过滤 - """ - if not search_enabled: - return False - return _CITATION_PATTERN.match(text) is not None diff --git a/routes/claude.py b/routes/claude.py index cdb331f..706c022 
100644 --- a/routes/claude.py +++ b/routes/claude.py @@ -20,12 +20,14 @@ from core.session_manager import ( cleanup_account, ) from core.models import get_model_config, get_claude_models_response -from core.stream_parser import ( +from core.sse_parser import ( parse_deepseek_sse_line, + parse_sse_chunk_for_content, extract_content_from_chunk, collect_deepseek_response, parse_tool_calls, ) +from core.constants import STREAM_IDLE_TIMEOUT from core.utils import estimate_tokens from core.messages import ( messages_prepare, @@ -36,6 +38,7 @@ from core.messages import ( router = APIRouter() + # ---------------------------------------------------------------------- # 通过 OpenAI 接口调用 Claude # ---------------------------------------------------------------------- @@ -201,9 +204,7 @@ Remember: Output ONLY the JSON, no other text. The response must start with {{ a if bool(req_data.get("stream", False)): def claude_sse_stream(): - # 智能超时配置 - STREAM_IDLE_TIMEOUT = 30 # 无新内容超时(秒) - + # 使用导入的常量(不再本地定义) try: message_id = f"msg_{int(time.time())}_{random.randint(1000, 9999)}" input_tokens = sum(len(str(m.get("content", ""))) for m in messages) // 4 @@ -212,6 +213,7 @@ Remember: Output ONLY the JSON, no other text. 
The response must start with {{ a last_content_time = time.time() has_content = False + for line in deepseek_resp.iter_lines(): current_time = time.time() diff --git a/routes/openai.py b/routes/openai.py index 771df5b..e4fe73b 100644 --- a/routes/openai.py +++ b/routes/openai.py @@ -23,14 +23,12 @@ from core.session_manager import ( cleanup_account, ) from core.models import get_model_config, get_openai_models_response -from core.stream_parser import ( - parse_deepseek_sse_line, - extract_content_from_chunk, - should_filter_citation, -) from core.sse_parser import ( + parse_deepseek_sse_line, parse_sse_chunk_for_content, + extract_content_from_chunk, extract_content_recursive, + should_filter_citation, ) from core.constants import ( KEEP_ALIVE_TIMEOUT, @@ -47,6 +45,8 @@ _CITATION_PATTERN = re.compile(r"^\[citation:") + + # ---------------------------------------------------------------------- # 路由:/v1/models # ---------------------------------------------------------------------- diff --git a/webui/src/App.jsx b/webui/src/App.jsx index 456b8a2..57e15a9 100644 --- a/webui/src/App.jsx +++ b/webui/src/App.jsx @@ -184,7 +184,7 @@ export default function App() { DS2API -
V1.0.0 管理面板
+在线管理面板