Files
ds2api/internal/sse/consumer.go
CJACK b82bc1311a fix: use parent_message_id and fresh PoW headers for empty-output retry and continue
Previously retry/continue requests reused the initial PoW header and
lacked parent_message_id, causing them to land as disconnected root
messages in the DeepSeek session instead of proper follow-up turns.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 21:31:51 +08:00

120 lines
3.5 KiB
Go

package sse
import (
"net/http"
"strings"
dsprotocol "ds2api/internal/deepseek/protocol"
"ds2api/internal/util"
)
// CollectResult holds the aggregated text and thinking content from a
// DeepSeek SSE stream, consumed to completion (non-streaming use case).
type CollectResult struct {
Text string
Thinking string
ToolDetectionThinking string
ContentFilter bool
CitationLinks map[int]string
ResponseMessageID int
}
// CollectStream fully consumes a DeepSeek SSE response and separates
// thinking content from text content. This replaces the duplicated
// stream-collection logic in openai.handleNonStream, claude.collectDeepSeek,
// and admin.testAccount.
//
// The caller is responsible for closing resp.Body unless closeBody is true.
func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) CollectResult {
if closeBody {
defer func() { _ = resp.Body.Close() }()
}
text := strings.Builder{}
thinking := strings.Builder{}
toolDetectionThinking := strings.Builder{}
contentFilter := false
stopped := false
collector := newCitationLinkCollector()
responseMessageID := 0
currentType := "text"
if thinkingEnabled {
currentType = "thinking"
}
_ = dsprotocol.ScanSSELines(resp, func(line []byte) bool {
chunk, done, parsed := ParseDeepSeekSSELine(line)
if parsed && !done {
collector.ingestChunk(chunk)
observeResponseMessageID(chunk, &responseMessageID)
}
if done {
return false
}
if stopped {
return true
}
result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType)
currentType = result.NextType
if !result.Parsed {
return true
}
if result.Stop {
if result.ContentFilter {
contentFilter = true
}
// Keep scanning to collect late-arriving citation metadata lines
// that can appear after response/status=FINISHED, but stop as soon
// as [DONE] arrives.
stopped = true
return true
}
for _, p := range result.Parts {
if p.Type == "thinking" {
trimmed := TrimContinuationOverlap(thinking.String(), p.Text)
thinking.WriteString(trimmed)
} else {
trimmed := TrimContinuationOverlap(text.String(), p.Text)
text.WriteString(trimmed)
}
}
for _, p := range result.ToolDetectionThinkingParts {
trimmed := TrimContinuationOverlap(toolDetectionThinking.String(), p.Text)
toolDetectionThinking.WriteString(trimmed)
}
return true
})
return CollectResult{
Text: text.String(),
Thinking: thinking.String(),
ToolDetectionThinking: toolDetectionThinking.String(),
ContentFilter: contentFilter,
CitationLinks: collector.build(),
ResponseMessageID: responseMessageID,
}
}
// observeResponseMessageID extracts the response_message_id from a parsed SSE
// chunk. It mirrors the extraction logic in client_continue.go's observe
// method, checking top-level response_message_id, v.response.message_id, and
// message.response.message_id.
func observeResponseMessageID(chunk map[string]any, out *int) {
if chunk == nil || out == nil {
return
}
if id := util.IntFrom(chunk["response_message_id"]); id > 0 {
*out = id
}
v, _ := chunk["v"].(map[string]any)
if response, _ := v["response"].(map[string]any); response != nil {
if id := util.IntFrom(response["message_id"]); id > 0 {
*out = id
}
}
if message, _ := chunk["message"].(map[string]any); message != nil {
if response, _ := message["response"].(map[string]any); response != nil {
if id := util.IntFrom(response["message_id"]); id > 0 {
*out = id
}
}
}
}