mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-02 07:25:26 +08:00
Previously retry/continue requests reused the initial PoW header and lacked parent_message_id, causing them to land as disconnected root messages in the DeepSeek session instead of proper follow-up turns. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
120 lines
3.5 KiB
Go
120 lines
3.5 KiB
Go
package sse
|
|
|
|
import (
|
|
"net/http"
|
|
"strings"
|
|
|
|
dsprotocol "ds2api/internal/deepseek/protocol"
|
|
"ds2api/internal/util"
|
|
)
|
|
|
|
// CollectResult holds the aggregated text and thinking content from a
|
|
// DeepSeek SSE stream, consumed to completion (non-streaming use case).
|
|
type CollectResult struct {
|
|
Text string
|
|
Thinking string
|
|
ToolDetectionThinking string
|
|
ContentFilter bool
|
|
CitationLinks map[int]string
|
|
ResponseMessageID int
|
|
}
|
|
|
|
// CollectStream fully consumes a DeepSeek SSE response and separates
|
|
// thinking content from text content. This replaces the duplicated
|
|
// stream-collection logic in openai.handleNonStream, claude.collectDeepSeek,
|
|
// and admin.testAccount.
|
|
//
|
|
// The caller is responsible for closing resp.Body unless closeBody is true.
|
|
func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) CollectResult {
|
|
if closeBody {
|
|
defer func() { _ = resp.Body.Close() }()
|
|
}
|
|
text := strings.Builder{}
|
|
thinking := strings.Builder{}
|
|
toolDetectionThinking := strings.Builder{}
|
|
contentFilter := false
|
|
stopped := false
|
|
collector := newCitationLinkCollector()
|
|
responseMessageID := 0
|
|
currentType := "text"
|
|
if thinkingEnabled {
|
|
currentType = "thinking"
|
|
}
|
|
_ = dsprotocol.ScanSSELines(resp, func(line []byte) bool {
|
|
chunk, done, parsed := ParseDeepSeekSSELine(line)
|
|
if parsed && !done {
|
|
collector.ingestChunk(chunk)
|
|
observeResponseMessageID(chunk, &responseMessageID)
|
|
}
|
|
if done {
|
|
return false
|
|
}
|
|
if stopped {
|
|
return true
|
|
}
|
|
result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType)
|
|
currentType = result.NextType
|
|
if !result.Parsed {
|
|
return true
|
|
}
|
|
if result.Stop {
|
|
if result.ContentFilter {
|
|
contentFilter = true
|
|
}
|
|
// Keep scanning to collect late-arriving citation metadata lines
|
|
// that can appear after response/status=FINISHED, but stop as soon
|
|
// as [DONE] arrives.
|
|
stopped = true
|
|
return true
|
|
}
|
|
for _, p := range result.Parts {
|
|
if p.Type == "thinking" {
|
|
trimmed := TrimContinuationOverlap(thinking.String(), p.Text)
|
|
thinking.WriteString(trimmed)
|
|
} else {
|
|
trimmed := TrimContinuationOverlap(text.String(), p.Text)
|
|
text.WriteString(trimmed)
|
|
}
|
|
}
|
|
for _, p := range result.ToolDetectionThinkingParts {
|
|
trimmed := TrimContinuationOverlap(toolDetectionThinking.String(), p.Text)
|
|
toolDetectionThinking.WriteString(trimmed)
|
|
}
|
|
return true
|
|
})
|
|
return CollectResult{
|
|
Text: text.String(),
|
|
Thinking: thinking.String(),
|
|
ToolDetectionThinking: toolDetectionThinking.String(),
|
|
ContentFilter: contentFilter,
|
|
CitationLinks: collector.build(),
|
|
ResponseMessageID: responseMessageID,
|
|
}
|
|
}
|
|
|
|
// observeResponseMessageID extracts the response_message_id from a parsed SSE
|
|
// chunk. It mirrors the extraction logic in client_continue.go's observe
|
|
// method, checking top-level response_message_id, v.response.message_id, and
|
|
// message.response.message_id.
|
|
func observeResponseMessageID(chunk map[string]any, out *int) {
|
|
if chunk == nil || out == nil {
|
|
return
|
|
}
|
|
if id := util.IntFrom(chunk["response_message_id"]); id > 0 {
|
|
*out = id
|
|
}
|
|
v, _ := chunk["v"].(map[string]any)
|
|
if response, _ := v["response"].(map[string]any); response != nil {
|
|
if id := util.IntFrom(response["message_id"]); id > 0 {
|
|
*out = id
|
|
}
|
|
}
|
|
if message, _ := chunk["message"].(map[string]any); message != nil {
|
|
if response, _ := message["response"].(map[string]any); response != nil {
|
|
if id := util.IntFrom(response["message_id"]); id > 0 {
|
|
*out = id
|
|
}
|
|
}
|
|
}
|
|
}
|