diff --git a/routes/openai.py b/routes/openai.py index 30430dd..960129f 100644 --- a/routes/openai.py +++ b/routes/openai.py @@ -137,6 +137,7 @@ async def chat_completions(request: Request): def process_data(): nonlocal has_content ptype = "text" + response_started = False # 追踪是否已开始正式回复 logger.info(f"[sse_stream] 开始处理数据流, session_id={session_id}") try: for raw_line in deepseek_resp.iter_lines(): @@ -170,17 +171,43 @@ async def chat_completions(request: Request): result_queue.put(None) return - # logger.debug(f"[sse_stream] 收到 chunk: {chunk}") + # logger.info(f"[sse_stream] RAW 原始chunk: {data_str[:300]}") if "v" in chunk: v_value = chunk["v"] content = "" - if "p" in chunk and chunk.get("p") == "response/search_status": + chunk_path = chunk.get("p", "") + + if chunk_path == "response/search_status": continue - if "p" in chunk and chunk.get("p") == "response/thinking_content": + + # 检测是否开始正式回复 + # 只有当 fragments 包含 RESPONSE 类型时才认为开始正式回复 + if "response/fragments" in chunk_path and isinstance(v_value, list): + for frag in v_value: + if isinstance(frag, dict) and frag.get("type", "").upper() == "RESPONSE": + response_started = True + break + + # 确定当前类型 + if chunk_path == "response/thinking_content": ptype = "thinking" - elif "p" in chunk and chunk.get("p") == "response/content": + elif chunk_path == "response/content": ptype = "text" + response_started = True # 有 response/content 也意味着开始正式回复 + elif "response/fragments" in chunk_path: + # fragments 的类型由内层 type 决定,默认用之前的 ptype + pass + elif not chunk_path: + # 没有 p 字段的内容: + # - reasoner 模式下,未开始正式回复前是 thinking + # - 开始正式回复后是 text + if thinking_enabled and not response_started: + ptype = "thinking" + else: + ptype = "text" + + # logger.info(f"[sse_stream] ptype={ptype}, response_started={response_started}, chunk_path='{chunk_path}', v_type={type(v_value).__name__}, v={str(v_value)[:50]}") if isinstance(v_value, str): # 检查是否是 FINISHED 状态 if v_value == "FINISHED": @@ -224,13 +251,23 @@ async def chat_completions(request: Request): if item_v and item_v != "FINISHED": extracted.append((item_v, content_type)) elif isinstance(item_v, list): - # 内层可能是 [{"content": "text", ...}] 格式 + # 内层可能是 [{"content": "text", "type": "THINK/RESPONSE", ...}] 格式 for inner in item_v: if isinstance(inner, dict): - # 直接提取 content 字段 + # 检查内层的 type 字段 + inner_type = inner.get("type", "").upper() + # logger.info(f"[sse_stream] 内层 type={inner_type}, content={str(inner.get('content', ''))[:50]}") + # DeepSeek 使用 THINK 而不是 THINKING + if inner_type == "THINK" or inner_type == "THINKING": + final_type = "thinking" + elif inner_type == "RESPONSE": + final_type = "text" + else: + final_type = content_type # 继承外层类型 + content = inner.get("content", "") if content: - extracted.append((content, content_type)) + extracted.append((content, final_type)) elif isinstance(inner, str) and inner: extracted.append((inner, content_type)) return extracted diff --git a/tests/test_unit.py b/tests/test_unit.py index 6bdf6b4..43e9068 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -238,6 +238,191 @@ class TestRegexPatterns(unittest.TestCase): self.assertEqual(match.group(2), "http://example.com/image.png") +class TestStreamParsing(unittest.TestCase): + """流式响应解析测试""" + + def test_parse_simple_string_content(self): + """测试简单字符串内容解析""" + # 模拟 DeepSeek V3 的简单字符串格式 + chunk = {"v": "你好"} + + v_value = chunk.get("v") + self.assertIsInstance(v_value, str) + self.assertEqual(v_value, "你好") + + def test_parse_nested_list_content(self): + """测试嵌套列表内容解析 (DeepSeek V3 格式)""" + # 模拟 DeepSeek V3 的嵌套列表格式 + chunk = { + "p": "response/fragments", + "o": "APPEND", + "v": [ + {"id": 1, "type": "RESPONSE", "content": "我是DeepSeek", "references": [], "stage_id": 1} + ] + } + + v_value = chunk.get("v") + self.assertIsInstance(v_value, list) + self.assertEqual(len(v_value), 1) + + inner = v_value[0] + self.assertEqual(inner.get("type"), "RESPONSE") + self.assertEqual(inner.get("content"), "我是DeepSeek") + + def test_parse_thinking_content(self): + """测试 thinking 内容解析""" + # 模拟带有 THINK 类型的内容 (DeepSeek 使用 THINK 而不是 THINKING) + chunk = { + "p": "response/fragments", + "o": "APPEND", + "v": [ + {"id": 1, "type": "THINK", "content": "让我思考一下...", "references": [], "stage_id": 1} + ] + } + + v_value = chunk.get("v") + inner = v_value[0] + + inner_type = inner.get("type", "").upper() + self.assertEqual(inner_type, "THINK") + self.assertEqual(inner.get("content"), "让我思考一下...") + + def test_parse_finished_status(self): + """测试 FINISHED 状态解析""" + chunk = {"p": "response/status", "o": "SET", "v": "FINISHED"} + + v_value = chunk.get("v") + self.assertEqual(v_value, "FINISHED") + + def test_parse_batch_status(self): + """测试批量状态解析""" + chunk = { + "p": "response", + "o": "BATCH", + "v": [ + {"p": "accumulated_token_usage", "v": 54}, + {"p": "quasi_status", "v": "FINISHED"} + ] + } + + v_value = chunk.get("v") + self.assertIsInstance(v_value, list) + + # 检查是否包含 FINISHED 状态 + has_finished = any( + item.get("p") == "quasi_status" and item.get("v") == "FINISHED" + for item in v_value if isinstance(item, dict) + ) + self.assertTrue(has_finished) + + def test_extract_content_from_nested_response(self): + """测试从嵌套响应中提取内容""" + # 模拟完整的嵌套列表格式 + items = [ + {"p": "fragments", "o": "APPEND", "v": [ + {"id": 1, "type": "RESPONSE", "content": "Hello", "references": []} + ]}, + {"p": "search_status", "v": "searching"}, # 应该被跳过 + ] + + extracted = [] + for item in items: + if not isinstance(item, dict): + continue + + item_p = item.get("p", "") + item_v = item.get("v") + + # 跳过搜索状态 + if "search_status" in item_p: + continue + + if isinstance(item_v, list): + for inner in item_v: + if isinstance(inner, dict): + content = inner.get("content", "") + if content: + inner_type = inner.get("type", "").upper() + extracted.append((content, inner_type)) + + self.assertEqual(len(extracted), 1) + self.assertEqual(extracted[0], ("Hello", "RESPONSE")) + + def test_thinking_vs_text_classification(self): + """测试 thinking 和 text 类型分类""" + # 测试不同路径的类型分类 + test_cases = [ + ("response/thinking_content", "thinking"), + ("response/content", "text"), + ("response/fragments", "text"), + ("", "text"), # 默认类型 + ] + + for chunk_path, expected_type in test_cases: + if chunk_path == "response/thinking_content": + ptype = "thinking" + elif chunk_path == "response/content" or "response/fragments" in chunk_path: + ptype = "text" + else: + ptype = "text" + + self.assertEqual(ptype, expected_type, f"Path '{chunk_path}' should be '{expected_type}'") + + def test_handle_non_dict_items(self): + """测试处理非字典类型的列表项""" + items = [ + "plain string", + 123, + None, + {"p": "content", "v": "valid"}, + ] + + valid_items = [item for item in items if isinstance(item, dict)] + self.assertEqual(len(valid_items), 1) + self.assertEqual(valid_items[0].get("v"), "valid") + + def test_empty_content_handling(self): + """测试空内容处理""" + chunk = {"v": ""} + + content = chunk.get("v", "") + # 空内容不应该被添加 + self.assertFalse(bool(content)) + + def test_response_started_flag(self): + """测试 response_started 标志逻辑""" + response_started = False + thinking_enabled = True + + # 模拟处理流程 + chunks = [ + {"v": "思考中..."}, # thinking (before response) + {"p": "response/fragments", "v": [{"content": "回复"}]}, # response starts + {"v": "继续回复"}, # text (after response started) + ] + + results = [] + for chunk in chunks: + chunk_path = chunk.get("p", "") + + if "response/fragments" in chunk_path: + response_started = True + + if not chunk_path: + if thinking_enabled and not response_started: + ptype = "thinking" + else: + ptype = "text" + else: + ptype = "text" + + results.append((ptype, response_started)) + + self.assertEqual(results[0], ("thinking", False)) # 第一个是 thinking + self.assertEqual(results[1], ("text", True)) # response/fragments 后 + self.assertEqual(results[2], ("text", True)) # 之后都是 text + + if __name__ == "__main__": # 设置环境变量避免配置警告 os.environ.setdefault("DS2API_CONFIG_PATH",