From 6052a8d1e20b8fa6e60a49d82aff3c4b0094dc9b Mon Sep 17 00:00:00 2001 From: songguoliang <957057673@qq.com> Date: Wed, 22 Apr 2026 19:03:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=90=9C=E7=B4=A2=E5=9C=BA?= =?UTF-8?q?=E6=99=AF=20citation=20=E5=81=B6=E5=8F=91=E6=9C=AA=E6=9B=BF?= =?UTF-8?q?=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/sse/consumer.go | 9 ++++++++- internal/sse/consumer_edge_test.go | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/internal/sse/consumer.go b/internal/sse/consumer.go index 83d66e9..11dc291 100644 --- a/internal/sse/consumer.go +++ b/internal/sse/consumer.go @@ -29,6 +29,7 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co text := strings.Builder{} thinking := strings.Builder{} contentFilter := false + stopped := false collector := newCitationLinkCollector() currentType := "text" if thinkingEnabled { @@ -38,6 +39,9 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co if chunk, done, parsed := ParseDeepSeekSSELine(line); parsed && !done { collector.ingestChunk(chunk) } + if stopped { + return true + } result := ParseDeepSeekContentLine(line, thinkingEnabled, currentType) currentType = result.NextType if !result.Parsed { @@ -47,7 +51,10 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co if result.ContentFilter { contentFilter = true } - return false + // Keep scanning to collect late-arriving citation metadata lines + // that can appear after response/status=FINISHED. + stopped = true + return true } for _, p := range result.Parts { if p.Type == "thinking" { diff --git a/internal/sse/consumer_edge_test.go b/internal/sse/consumer_edge_test.go index d720414..9e751c7 100644 --- a/internal/sse/consumer_edge_test.go +++ b/internal/sse/consumer_edge_test.go @@ -185,6 +185,24 @@ func TestCollectStreamExtractsCitationLinksWithRepeatedURLsAndNilIndices(t *test } } +func TestCollectStreamCollectsCitationLinksAfterFinished(t *testing.T) { + resp := makeHTTPResponse( + "data: {\"p\":\"response/content\",\"v\":\"结论[citation:1]\"}\n" + + "data: {\"p\":\"response/status\",\"v\":\"FINISHED\"}\n" + + "data: {\"p\":\"response/fragments/-1/results\",\"v\":[{\"url\":\"https://example.com/a\",\"cite_index\":1}]}\n" + + "data: {\"p\":\"response/content\",\"v\":\"should-not-append\"}\n" + + "data: [DONE]\n", + ) + + result := CollectStream(resp, false, false) + if result.Text != "结论[citation:1]" { + t.Fatalf("expected text to freeze after finished, got %q", result.Text) + } + if got := result.CitationLinks[1]; got != "https://example.com/a" { + t.Fatalf("expected citation 1 link, got %q", got) + } +} + func TestCollectStreamMultipleThinkingChunks(t *testing.T) { resp := makeHTTPResponse( "data: {\"p\":\"response/thinking_content\",\"v\":\"part1\"}\n" +