feat: Refactor stream parsing to accurately distinguish thinking and response content based on DeepSeek stream formats and add comprehensive unit tests for various chunk types.

This commit is contained in:
CJACK
2026-02-01 04:57:14 +08:00
parent fc2175474b
commit 977aa22e14
2 changed files with 229 additions and 7 deletions

View File

@@ -238,6 +238,191 @@ class TestRegexPatterns(unittest.TestCase):
self.assertEqual(match.group(2), "http://example.com/image.png")
class TestStreamParsing(unittest.TestCase):
"""流式响应解析测试"""
def test_parse_simple_string_content(self):
"""测试简单字符串内容解析"""
# 模拟 DeepSeek V3 的简单字符串格式
chunk = {"v": "你好"}
v_value = chunk.get("v")
self.assertIsInstance(v_value, str)
self.assertEqual(v_value, "你好")
def test_parse_nested_list_content(self):
"""测试嵌套列表内容解析 (DeepSeek V3 格式)"""
# 模拟 DeepSeek V3 的嵌套列表格式
chunk = {
"p": "response/fragments",
"o": "APPEND",
"v": [
{"id": 1, "type": "RESPONSE", "content": "我是DeepSeek", "references": [], "stage_id": 1}
]
}
v_value = chunk.get("v")
self.assertIsInstance(v_value, list)
self.assertEqual(len(v_value), 1)
inner = v_value[0]
self.assertEqual(inner.get("type"), "RESPONSE")
self.assertEqual(inner.get("content"), "我是DeepSeek")
def test_parse_thinking_content(self):
"""测试 thinking 内容解析"""
# 模拟带有 THINK 类型的内容 (DeepSeek 使用 THINK 而不是 THINKING)
chunk = {
"p": "response/fragments",
"o": "APPEND",
"v": [
{"id": 1, "type": "THINK", "content": "让我思考一下...", "references": [], "stage_id": 1}
]
}
v_value = chunk.get("v")
inner = v_value[0]
inner_type = inner.get("type", "").upper()
self.assertEqual(inner_type, "THINK")
self.assertEqual(inner.get("content"), "让我思考一下...")
def test_parse_finished_status(self):
"""测试 FINISHED 状态解析"""
chunk = {"p": "response/status", "o": "SET", "v": "FINISHED"}
v_value = chunk.get("v")
self.assertEqual(v_value, "FINISHED")
def test_parse_batch_status(self):
"""测试批量状态解析"""
chunk = {
"p": "response",
"o": "BATCH",
"v": [
{"p": "accumulated_token_usage", "v": 54},
{"p": "quasi_status", "v": "FINISHED"}
]
}
v_value = chunk.get("v")
self.assertIsInstance(v_value, list)
# 检查是否包含 FINISHED 状态
has_finished = any(
item.get("p") == "quasi_status" and item.get("v") == "FINISHED"
for item in v_value if isinstance(item, dict)
)
self.assertTrue(has_finished)
def test_extract_content_from_nested_response(self):
"""测试从嵌套响应中提取内容"""
# 模拟完整的嵌套列表格式
items = [
{"p": "fragments", "o": "APPEND", "v": [
{"id": 1, "type": "RESPONSE", "content": "Hello", "references": []}
]},
{"p": "search_status", "v": "searching"}, # 应该被跳过
]
extracted = []
for item in items:
if not isinstance(item, dict):
continue
item_p = item.get("p", "")
item_v = item.get("v")
# 跳过搜索状态
if "search_status" in item_p:
continue
if isinstance(item_v, list):
for inner in item_v:
if isinstance(inner, dict):
content = inner.get("content", "")
if content:
inner_type = inner.get("type", "").upper()
extracted.append((content, inner_type))
self.assertEqual(len(extracted), 1)
self.assertEqual(extracted[0], ("Hello", "RESPONSE"))
def test_thinking_vs_text_classification(self):
"""测试 thinking 和 text 类型分类"""
# 测试不同路径的类型分类
test_cases = [
("response/thinking_content", "thinking"),
("response/content", "text"),
("response/fragments", "text"),
("", "text"), # 默认类型
]
for chunk_path, expected_type in test_cases:
if chunk_path == "response/thinking_content":
ptype = "thinking"
elif chunk_path == "response/content" or "response/fragments" in chunk_path:
ptype = "text"
else:
ptype = "text"
self.assertEqual(ptype, expected_type, f"Path '{chunk_path}' should be '{expected_type}'")
def test_handle_non_dict_items(self):
"""测试处理非字典类型的列表项"""
items = [
"plain string",
123,
None,
{"p": "content", "v": "valid"},
]
valid_items = [item for item in items if isinstance(item, dict)]
self.assertEqual(len(valid_items), 1)
self.assertEqual(valid_items[0].get("v"), "valid")
def test_empty_content_handling(self):
"""测试空内容处理"""
chunk = {"v": ""}
content = chunk.get("v", "")
# 空内容不应该被添加
self.assertFalse(bool(content))
def test_response_started_flag(self):
"""测试 response_started 标志逻辑"""
response_started = False
thinking_enabled = True
# 模拟处理流程
chunks = [
{"v": "思考中..."}, # thinking (before response)
{"p": "response/fragments", "v": [{"content": "回复"}]}, # response starts
{"v": "继续回复"}, # text (after response started)
]
results = []
for chunk in chunks:
chunk_path = chunk.get("p", "")
if "response/fragments" in chunk_path:
response_started = True
if not chunk_path:
if thinking_enabled and not response_started:
ptype = "thinking"
else:
ptype = "text"
else:
ptype = "text"
results.append((ptype, response_started))
self.assertEqual(results[0], ("thinking", False)) # 第一个是 thinking
self.assertEqual(results[1], ("text", True)) # response/fragments 后
self.assertEqual(results[2], ("text", True)) # 之后都是 text
if __name__ == "__main__":
# 设置环境变量避免配置警告
os.environ.setdefault("DS2API_CONFIG_PATH",