diff --git a/routes/openai.py b/routes/openai.py index 5063ec9..34301db 100644 --- a/routes/openai.py +++ b/routes/openai.py @@ -453,107 +453,84 @@ IMPORTANT: If calling tools, output ONLY the JSON. The response must start with def collect_data(): nonlocal result - ptype = "text" + current_fragment_type = "thinking" if thinking_enabled else "text" try: for raw_line in deepseek_resp.iter_lines(): - try: - line = raw_line.decode("utf-8") - except Exception as e: - logger.warning(f"[chat_completions] 解码失败: {e}") - if ptype == "thinking": - think_list.append("解码失败,请稍候再试") - else: - text_list.append("解码失败,请稍候再试") + chunk = parse_deepseek_sse_line(raw_line) + if not chunk: + continue + if chunk.get("type") == "done": data_queue.put(None) break - if not line: - continue - if line.startswith("data:"): - data_str = line[5:].strip() - if data_str == "[DONE]": - data_queue.put(None) - break - try: - chunk = json.loads(data_str) - if "v" in chunk: - v_value = chunk["v"] - if "p" in chunk and chunk.get("p") == "response/search_status": - continue - if "p" in chunk and chunk.get("p") == "response/thinking_content": - ptype = "thinking" - elif "p" in chunk and chunk.get("p") == "response/content": - ptype = "text" - if isinstance(v_value, str): - if search_enabled and v_value.startswith("[citation:"): - continue - if ptype == "thinking": - think_list.append(v_value) - else: - text_list.append(v_value) - elif isinstance(v_value, list): - for item in v_value: - if item.get("p") == "status" and item.get("v") == "FINISHED": - final_reasoning = "".join(think_list) - final_content = "".join(text_list) - prompt_tokens = len(final_prompt) // 4 - reasoning_tokens = len(final_reasoning) // 4 - completion_tokens = len(final_content) // 4 - - # 检测工具调用 - detected_tools = [] - finish_reason = "stop" - if has_tools: - detected_tools = parse_tool_calls(final_content, [{"name": t.get("function", t).get("name")} for t in tools_requested]) - if detected_tools: - finish_reason = "tool_calls" - - # 构建 message 对象 - message_obj = { - "role": "assistant", - "content": final_content if not detected_tools else None, - } - # 只有启用思考模式时才包含 reasoning_content - if thinking_enabled and final_reasoning: - message_obj["reasoning_content"] = final_reasoning - # 添加工具调用 - if detected_tools: - tool_calls_data = format_openai_tool_calls(detected_tools) - message_obj["tool_calls"] = tool_calls_data - message_obj["content"] = None - - result = { - "id": completion_id, - "object": "chat.completion", - "created": created_time, - "model": model, - "choices": [{ - "index": 0, - "message": message_obj, - "finish_reason": finish_reason, - }], - "usage": { - "prompt_tokens": prompt_tokens, - "completion_tokens": reasoning_tokens + completion_tokens, - "total_tokens": prompt_tokens + reasoning_tokens + completion_tokens, - "completion_tokens_details": {"reasoning_tokens": reasoning_tokens}, - }, - } - data_queue.put("DONE") - return - except Exception as e: - logger.warning(f"[collect_data] 无法解析: {data_str}, 错误: {e}") - if ptype == "thinking": - think_list.append("解析失败,请稍候再试") + try: + contents, is_finished, new_fragment_type = parse_sse_chunk_for_content( + chunk, thinking_enabled, current_fragment_type + ) + current_fragment_type = new_fragment_type + if is_finished: + final_reasoning = "".join(think_list) + final_content = "".join(text_list) + prompt_tokens = len(final_prompt) // 4 + reasoning_tokens = len(final_reasoning) // 4 + completion_tokens = len(final_content) // 4 + + # 检测工具调用 + detected_tools = [] + finish_reason = "stop" + if has_tools: + detected_tools = parse_tool_calls(final_content, [{"name": t.get("function", t).get("name")} for t in tools_requested]) + if detected_tools: + finish_reason = "tool_calls" + + # 构建 message 对象 + message_obj = { + "role": "assistant", + "content": final_content if not detected_tools else None, + } + # 只有启用思考模式时才包含 reasoning_content + if thinking_enabled and final_reasoning: + message_obj["reasoning_content"] = final_reasoning + # 添加工具调用 + if detected_tools: + tool_calls_data = format_openai_tool_calls(detected_tools) + message_obj["tool_calls"] = tool_calls_data + message_obj["content"] = None + + result = { + "id": completion_id, + "object": "chat.completion", + "created": created_time, + "model": model, + "choices": [{ + "index": 0, + "message": message_obj, + "finish_reason": finish_reason, + }], + "usage": { + "prompt_tokens": prompt_tokens, + "completion_tokens": reasoning_tokens + completion_tokens, + "total_tokens": prompt_tokens + reasoning_tokens + completion_tokens, + "completion_tokens_details": {"reasoning_tokens": reasoning_tokens}, + }, + } + data_queue.put("DONE") + return + + for content_text, content_type in contents: + if should_filter_citation(content_text, search_enabled): + continue + if content_type == "thinking": + think_list.append(content_text) else: - text_list.append("解析失败,请稍候再试") + text_list.append(content_text) + except Exception as e: + logger.warning(f"[collect_data] 无法解析: {chunk}, 错误: {e}") + text_list.append("解析失败,请稍候再试") data_queue.put(None) break except Exception as e: logger.warning(f"[collect_data] 错误: {e}") - if ptype == "thinking": - think_list.append("处理失败,请稍候再试") - else: - text_list.append("处理失败,请稍候再试") + text_list.append("处理失败,请稍候再试") data_queue.put(None) finally: deepseek_resp.close()