diff --git a/internal/js/chat-stream/sse_parse_impl.js b/internal/js/chat-stream/sse_parse_impl.js index 577b1c4..09530bc 100644 --- a/internal/js/chat-stream/sse_parse_impl.js +++ b/internal/js/chat-stream/sse_parse_impl.js @@ -58,11 +58,22 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc newType: currentType, }; } - if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') { + if (isStatusPath(pathValue)) { + if (asString(chunk.v) === 'FINISHED') { + return { + parsed: true, + parts: [], + finished: true, + contentFilter: false, + errorMessage: '', + outputTokens, + newType: currentType, + }; + } return { parsed: true, parts: [], - finished: true, + finished: false, contentFilter: false, errorMessage: '', outputTokens, @@ -149,6 +160,17 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc newType, }; } + if (isStatusPath(pathValue)) { + return { + parsed: true, + parts: [], + finished: false, + contentFilter: false, + errorMessage: '', + outputTokens, + newType, + }; + } const content = asContentString(val, stripReferenceMarkers); if (content) { parts.push({ text: content, type: partType }); @@ -235,8 +257,11 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru } const itemPath = asString(it.p); const itemV = it.v; - if (itemPath === 'status' && asString(itemV) === 'FINISHED') { - return { parts: [], finished: true }; + if (isStatusPath(itemPath)) { + if (asString(itemV) === 'FINISHED') { + return { parts: [], finished: true }; + } + continue; } if (shouldSkipPath(itemPath)) { continue; @@ -262,6 +287,9 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru } if (typeof itemV === 'string') { + if (isStatusPath(itemPath)) { + continue; + } if (itemV && itemV !== 'FINISHED') { const content = asContentString(itemV, stripReferenceMarkers); if (content) { @@ -304,6 +332,10 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru return { parts, finished: false }; } +function isStatusPath(pathValue) { + return pathValue === 'response/status' || pathValue === 'status'; +} + function filterLeakedContentFilterParts(parts) { if (!Array.isArray(parts) || parts.length === 0) { return parts; diff --git a/internal/sse/line_test.go b/internal/sse/line_test.go index 7ca1e8c..09aa97c 100644 --- a/internal/sse/line_test.go +++ b/internal/sse/line_test.go @@ -63,6 +63,16 @@ func TestParseDeepSeekContentLineContent(t *testing.T) { } } +func TestParseDeepSeekContentLineFiltersIncompleteStatusText(t *testing.T) { + res := ParseDeepSeekContentLine([]byte(`data: {"p":"response/status","v":"INCOMPLETE"}`), false, "text") + if !res.Parsed || res.Stop { + t.Fatalf("expected parsed non-stop result: %#v", res) + } + if len(res.Parts) != 0 { + t.Fatalf("expected INCOMPLETE status to be filtered, got %#v", res.Parts) + } +} + func TestParseDeepSeekContentLinePreservesSpaceOnlyChunk(t *testing.T) { res := ParseDeepSeekContentLine([]byte(`data: {"v":" "}`), false, "text") if !res.Parsed || res.Stop { diff --git a/internal/sse/parser.go b/internal/sse/parser.go index 59d5678..9deb440 100644 --- a/internal/sse/parser.go +++ b/internal/sse/parser.go @@ -79,9 +79,12 @@ func ParseSSEChunkForContent(chunk map[string]any, thinkingEnabled bool, current if shouldSkipPath(path) { return nil, false, currentFragmentType } - if path == "response/status" { - if s, ok := v.(string); ok && s == "FINISHED" { - return nil, true, currentFragmentType + if isStatusPath(path) { + if s, ok := v.(string); ok { + if strings.EqualFold(strings.TrimSpace(s), "FINISHED") { + return nil, true, currentFragmentType + } + return nil, false, currentFragmentType } } newType := currentFragmentType @@ -184,6 +187,9 @@ func appendChunkValueContent(v any, partType string, newType *string, parts *[]C if val == "FINISHED" && (path == "" || path == "status") { return true } + if isStatusPath(path) { + return false + } appendContentPart(parts, val, partType) case []any: pp, finished := extractContentRecursive(val, partType) @@ -241,6 +247,10 @@ func appendContentPart(parts *[]ContentPart, content, kind string) { *parts = append(*parts, ContentPart{Text: content, Type: kind}) } +func isStatusPath(path string) bool { + return path == "response/status" || path == "status" +} + func extractContentRecursive(items []any, defaultType string) ([]ContentPart, bool) { parts := make([]ContentPart, 0, len(items)) for _, it := range items { @@ -253,10 +263,11 @@ func extractContentRecursive(items []any, defaultType string) ([]ContentPart, bo if !hasV { continue } - if itemPath == "status" { - if s, ok := itemV.(string); ok && s == "FINISHED" { + if isStatusPath(itemPath) { + if s, ok := itemV.(string); ok && strings.EqualFold(strings.TrimSpace(s), "FINISHED") { return nil, true } + continue } if shouldSkipPath(itemPath) { continue @@ -282,6 +293,9 @@ func extractContentRecursive(items []any, defaultType string) ([]ContentPart, bo } switch v := itemV.(type) { case string: + if isStatusPath(itemPath) { + continue + } if v != "" && v != "FINISHED" { parts = append(parts, ContentPart{Text: v, Type: partType}) } diff --git a/internal/sse/parser_edge_test.go b/internal/sse/parser_edge_test.go index 20a7342..ba1c723 100644 --- a/internal/sse/parser_edge_test.go +++ b/internal/sse/parser_edge_test.go @@ -159,8 +159,8 @@ func TestParseSSEChunkForContentStatusNotFinished(t *testing.T) { if finished { t.Fatal("expected not finished for non-FINISHED status") } - if len(parts) != 1 || parts[0].Text != "IN_PROGRESS" { - t.Fatalf("expected content for non-FINISHED status, got %#v", parts) + if len(parts) != 0 { + t.Fatalf("expected non-finished status to be filtered, got %#v", parts) } } diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js index e6a13f9..6086ba1 100644 --- a/tests/node/chat-stream.test.js +++ b/tests/node/chat-stream.test.js @@ -291,6 +291,19 @@ test('parseChunkForContent preserves output tokens on FINISHED lines', () => { assert.deepEqual(parsed.parts, []); }); +test('parseChunkForContent filters INCOMPLETE status text without stopping stream', () => { + const parsed = parseChunkForContent( + { p: 'response/status', v: 'INCOMPLETE', accumulated_token_usage: 190 }, + false, + 'text', + ); + assert.equal(parsed.parsed, true); + assert.equal(parsed.finished, false); + assert.equal(parsed.contentFilter, false); + assert.equal(parsed.outputTokens, 190); + assert.deepEqual(parsed.parts, []); +}); + test('parseChunkForContent strips leaked CONTENT_FILTER suffix and preserves line breaks', () => { const leaked = parseChunkForContent( { p: 'response/content', v: '正常输出CONTENT_FILTER你好,这个问题我暂时无法回答' },