mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-04 16:35:27 +08:00
Replace bufio.Scanner with bufio.NewReaderSize + ReadBytes('\n') across all
SSE read paths to preserve long single-line data (e.g. write_file content).
Add quasi_status and auto_continue handling as direct path-based patches in
both Go continue observer and Node vercel_stream_impl, mirroring existing
batch-patch logic. Add 2MiB+ line throughput tests at every SSE layer.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
299 lines
10 KiB
Go
299 lines
10 KiB
Go
package sse
|
|
|
|
import (
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// ─── CollectStream edge cases ────────────────────────────────────────
|
|
|
|
func makeHTTPResponse(body string) *http.Response {
|
|
return &http.Response{
|
|
StatusCode: http.StatusOK,
|
|
Header: make(http.Header),
|
|
Body: io.NopCloser(strings.NewReader(body)),
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamEmpty(t *testing.T) {
|
|
resp := makeHTTPResponse("")
|
|
result := CollectStream(resp, false, false)
|
|
if result.Text != "" || result.Thinking != "" {
|
|
t.Fatalf("expected empty result, got text=%q think=%q", result.Text, result.Thinking)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamTextOnly(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/content\",\"v\":\"Hello\"}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\" World\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, false, false)
|
|
if result.Text != "Hello World" {
|
|
t.Fatalf("expected 'Hello World', got %q", result.Text)
|
|
}
|
|
if result.Thinking != "" {
|
|
t.Fatalf("expected no thinking, got %q", result.Thinking)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamHandlesLongSingleSSELine(t *testing.T) {
|
|
payload := strings.Repeat("x", 2*1024*1024+4096)
|
|
resp := makeHTTPResponse(makeLargeContentSSEBody(t, payload))
|
|
result := CollectStream(resp, false, true)
|
|
if result.Text != payload {
|
|
t.Fatalf("long SSE line payload mismatch: got len=%d want len=%d", len(result.Text), len(payload))
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamThinkingAndText(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/thinking_content\",\"v\":\"Thinking...\"}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"Answer\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, true, true)
|
|
if result.Thinking != "Thinking..." {
|
|
t.Fatalf("expected 'Thinking...', got %q", result.Thinking)
|
|
}
|
|
if result.Text != "Answer" {
|
|
t.Fatalf("expected 'Answer', got %q", result.Text)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamDropsThinkingWhenDisabled(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/thinking_content\",\"v\":\"Thinking...\"}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"Answer\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, false, true)
|
|
if result.Thinking != "" {
|
|
t.Fatalf("expected disabled thinking to be dropped, got %q", result.Thinking)
|
|
}
|
|
if result.Text != "Answer" {
|
|
t.Fatalf("expected only visible answer, got %q", result.Text)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamOnlyThinking(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/thinking_content\",\"v\":\"Only thinking\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, true, true)
|
|
if result.Thinking != "Only thinking" {
|
|
t.Fatalf("expected 'Only thinking', got %q", result.Thinking)
|
|
}
|
|
if result.Text != "" {
|
|
t.Fatalf("expected empty text, got %q", result.Text)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamSkipsInvalidLines(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"event: comment\n" +
|
|
"data: invalid_json\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"valid\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, false, false)
|
|
if result.Text != "valid" {
|
|
t.Fatalf("expected 'valid', got %q", result.Text)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamWithFragments(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/fragments\",\"o\":\"APPEND\",\"v\":[{\"type\":\"THINK\",\"content\":\"Think\"}]}\n" +
|
|
"data: {\"p\":\"response/fragments\",\"o\":\"APPEND\",\"v\":[{\"type\":\"RESPONSE\",\"content\":\"Done\"}]}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, true, true)
|
|
if result.Thinking != "Think" {
|
|
t.Fatalf("expected 'Think' thinking, got %q", result.Thinking)
|
|
}
|
|
if result.Text != "Done" {
|
|
t.Fatalf("expected 'Done' text, got %q", result.Text)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamWithCitation(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/content\",\"v\":\"Hello\"}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"[citation:1] cited text\"}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\" more\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, false, false)
|
|
// CollectStream does NOT filter citations (that's done by the adapters)
|
|
// So citations are passed through as-is
|
|
if !strings.Contains(result.Text, "[citation:1]") {
|
|
t.Fatalf("expected citations to be passed through, got %q", result.Text)
|
|
}
|
|
if result.Text != "Hello[citation:1] cited text more" {
|
|
t.Fatalf("expected full text with citation, got %q", result.Text)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamExtractsCitationLinks(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/fragments/-1/results\",\"v\":[{\"url\":\"https://example.com/a\",\"cite_index\":0},{\"url\":\"https://example.com/b\",\"cite_index\":1}]}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"结论[citation:1][citation:2]\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, false, false)
|
|
|
|
if got := result.CitationLinks[1]; got != "https://example.com/a" {
|
|
t.Fatalf("expected citation 1 link, got %q", got)
|
|
}
|
|
if got := result.CitationLinks[2]; got != "https://example.com/b" {
|
|
t.Fatalf("expected citation 2 link, got %q", got)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamExtractsCitationLinksForSequentialZeroBasedIndices(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/fragments/-1/results\",\"v\":[{\"url\":\"https://example.com/a\",\"cite_index\":0},{\"url\":\"https://example.com/b\",\"cite_index\":1},{\"url\":\"https://example.com/c\",\"cite_index\":2}]}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"结论[citation:1][citation:2][citation:3]\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, false, false)
|
|
|
|
if got := result.CitationLinks[1]; got != "https://example.com/a" {
|
|
t.Fatalf("expected citation 1 link, got %q", got)
|
|
}
|
|
if got := result.CitationLinks[2]; got != "https://example.com/b" {
|
|
t.Fatalf("expected citation 2 link, got %q", got)
|
|
}
|
|
if got := result.CitationLinks[3]; got != "https://example.com/c" {
|
|
t.Fatalf("expected citation 3 link, got %q", got)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamExtractsCitationLinksForOneBasedIndices(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/fragments/-1/results\",\"v\":[{\"url\":\"https://example.com/a\",\"cite_index\":1},{\"url\":\"https://example.com/b\",\"cite_index\":2}]}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"结论[citation:1][citation:2]\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, false, false)
|
|
|
|
if got := result.CitationLinks[1]; got != "https://example.com/a" {
|
|
t.Fatalf("expected citation 1 link, got %q", got)
|
|
}
|
|
if got := result.CitationLinks[2]; got != "https://example.com/b" {
|
|
t.Fatalf("expected citation 2 link, got %q", got)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamExtractsCitationLinksWithRepeatedURLsAndNilIndices(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/fragments/-1/results\",\"v\":[{\"url\":\"https://example.com/a\",\"cite_index\":null},{\"url\":\"https://example.com/a\",\"cite_index\":null},{\"url\":\"https://example.com/b\",\"cite_index\":null}]}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"结论[citation:1][citation:2][citation:3]\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, false, false)
|
|
|
|
if got := result.CitationLinks[1]; got != "https://example.com/a" {
|
|
t.Fatalf("expected citation 1 link, got %q", got)
|
|
}
|
|
if got := result.CitationLinks[2]; got != "https://example.com/a" {
|
|
t.Fatalf("expected citation 2 link, got %q", got)
|
|
}
|
|
if got := result.CitationLinks[3]; got != "https://example.com/b" {
|
|
t.Fatalf("expected citation 3 link, got %q", got)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamCollectsCitationLinksAfterFinished(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/content\",\"v\":\"结论[citation:1]\"}\n" +
|
|
"data: {\"p\":\"response/status\",\"v\":\"FINISHED\"}\n" +
|
|
"data: {\"p\":\"response/fragments/-1/results\",\"v\":[{\"url\":\"https://example.com/a\",\"cite_index\":1}]}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"should-not-append\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
|
|
result := CollectStream(resp, false, false)
|
|
if result.Text != "结论[citation:1]" {
|
|
t.Fatalf("expected text to freeze after finished, got %q", result.Text)
|
|
}
|
|
if got := result.CitationLinks[1]; got != "https://example.com/a" {
|
|
t.Fatalf("expected citation 1 link, got %q", got)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamMultipleThinkingChunks(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/thinking_content\",\"v\":\"part1\"}\n" +
|
|
"data: {\"p\":\"response/thinking_content\",\"v\":\" part2\"}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"answer\"}\n" +
|
|
"data: [DONE]\n",
|
|
)
|
|
result := CollectStream(resp, true, true)
|
|
if result.Thinking != "part1 part2" {
|
|
t.Fatalf("expected 'part1 part2', got %q", result.Thinking)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamStatusFinished(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/content\",\"v\":\"Hello\"}\n" +
|
|
"data: {\"p\":\"response/status\",\"v\":\"FINISHED\"}\n",
|
|
)
|
|
result := CollectStream(resp, false, false)
|
|
if result.Text != "Hello" {
|
|
t.Fatalf("expected 'Hello', got %q", result.Text)
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamStopsOnDoneAfterFinished(t *testing.T) {
|
|
pr, pw := io.Pipe()
|
|
defer func() { _ = pw.Close() }()
|
|
|
|
resp := &http.Response{
|
|
StatusCode: http.StatusOK,
|
|
Header: make(http.Header),
|
|
Body: pr,
|
|
}
|
|
|
|
resultCh := make(chan CollectResult, 1)
|
|
go func() {
|
|
resultCh <- CollectStream(resp, false, false)
|
|
}()
|
|
|
|
_, _ = io.WriteString(pw, "data: {\"p\":\"response/content\",\"v\":\"Hello\"}\n")
|
|
_, _ = io.WriteString(pw, "data: {\"p\":\"response/status\",\"v\":\"FINISHED\"}\n")
|
|
_, _ = io.WriteString(pw, "data: {\"p\":\"response/fragments/-1/results\",\"v\":[{\"url\":\"https://example.com/a\",\"cite_index\":1}]}\n")
|
|
_, _ = io.WriteString(pw, "data: [DONE]\n")
|
|
|
|
select {
|
|
case result := <-resultCh:
|
|
if result.Text != "Hello" {
|
|
t.Fatalf("expected text to freeze at FINISHED, got %q", result.Text)
|
|
}
|
|
if got := result.CitationLinks[1]; got != "https://example.com/a" {
|
|
t.Fatalf("expected citation metadata after FINISHED, got %q", got)
|
|
}
|
|
case <-time.After(500 * time.Millisecond):
|
|
t.Fatal("CollectStream did not stop on [DONE] after FINISHED")
|
|
}
|
|
}
|
|
|
|
func TestCollectStreamStopsOnContentFilterStatus(t *testing.T) {
|
|
resp := makeHTTPResponse(
|
|
"data: {\"p\":\"response/content\",\"v\":\"safe\"}\n" +
|
|
"data: {\"p\":\"response/status\",\"v\":\"CONTENT_FILTER\"}\n" +
|
|
"data: {\"p\":\"response/content\",\"v\":\"blocked\"}\n",
|
|
)
|
|
result := CollectStream(resp, false, false)
|
|
if result.Text != "safe" {
|
|
t.Fatalf("expected stream to stop before blocked tail, got %q", result.Text)
|
|
}
|
|
}
|