fix: ignore INCOMPLETE status messages in SSE stream parsing to prevent stream interruption

This commit is contained in:
CJACK
2026-04-05 23:38:47 +08:00
parent 84813eca80
commit 49430123d8
5 changed files with 80 additions and 11 deletions

View File

@@ -58,11 +58,22 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc
newType: currentType,
};
}
if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') {
if (isStatusPath(pathValue)) {
if (asString(chunk.v) === 'FINISHED') {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
return {
parsed: true,
parts: [],
finished: true,
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
@@ -149,6 +160,17 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc
newType,
};
}
if (isStatusPath(pathValue)) {
return {
parsed: true,
parts: [],
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
const content = asContentString(val, stripReferenceMarkers);
if (content) {
parts.push({ text: content, type: partType });
@@ -235,8 +257,11 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru
}
const itemPath = asString(it.p);
const itemV = it.v;
if (itemPath === 'status' && asString(itemV) === 'FINISHED') {
return { parts: [], finished: true };
if (isStatusPath(itemPath)) {
if (asString(itemV) === 'FINISHED') {
return { parts: [], finished: true };
}
continue;
}
if (shouldSkipPath(itemPath)) {
continue;
@@ -262,6 +287,9 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru
}
if (typeof itemV === 'string') {
if (isStatusPath(itemPath)) {
continue;
}
if (itemV && itemV !== 'FINISHED') {
const content = asContentString(itemV, stripReferenceMarkers);
if (content) {
@@ -304,6 +332,10 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru
return { parts, finished: false };
}
function isStatusPath(pathValue) {
return pathValue === 'response/status' || pathValue === 'status';
}
function filterLeakedContentFilterParts(parts) {
if (!Array.isArray(parts) || parts.length === 0) {
return parts;

View File

@@ -63,6 +63,16 @@ func TestParseDeepSeekContentLineContent(t *testing.T) {
}
}
func TestParseDeepSeekContentLineFiltersIncompleteStatusText(t *testing.T) {
res := ParseDeepSeekContentLine([]byte(`data: {"p":"response/status","v":"INCOMPLETE"}`), false, "text")
if !res.Parsed || res.Stop {
t.Fatalf("expected parsed non-stop result: %#v", res)
}
if len(res.Parts) != 0 {
t.Fatalf("expected INCOMPLETE status to be filtered, got %#v", res.Parts)
}
}
func TestParseDeepSeekContentLinePreservesSpaceOnlyChunk(t *testing.T) {
res := ParseDeepSeekContentLine([]byte(`data: {"v":" "}`), false, "text")
if !res.Parsed || res.Stop {

View File

@@ -79,9 +79,12 @@ func ParseSSEChunkForContent(chunk map[string]any, thinkingEnabled bool, current
if shouldSkipPath(path) {
return nil, false, currentFragmentType
}
if path == "response/status" {
if s, ok := v.(string); ok && s == "FINISHED" {
return nil, true, currentFragmentType
if isStatusPath(path) {
if s, ok := v.(string); ok {
if strings.EqualFold(strings.TrimSpace(s), "FINISHED") {
return nil, true, currentFragmentType
}
return nil, false, currentFragmentType
}
}
newType := currentFragmentType
@@ -184,6 +187,9 @@ func appendChunkValueContent(v any, partType string, newType *string, parts *[]C
if val == "FINISHED" && (path == "" || path == "status") {
return true
}
if isStatusPath(path) {
return false
}
appendContentPart(parts, val, partType)
case []any:
pp, finished := extractContentRecursive(val, partType)
@@ -241,6 +247,10 @@ func appendContentPart(parts *[]ContentPart, content, kind string) {
*parts = append(*parts, ContentPart{Text: content, Type: kind})
}
func isStatusPath(path string) bool {
return path == "response/status" || path == "status"
}
func extractContentRecursive(items []any, defaultType string) ([]ContentPart, bool) {
parts := make([]ContentPart, 0, len(items))
for _, it := range items {
@@ -253,10 +263,11 @@ func extractContentRecursive(items []any, defaultType string) ([]ContentPart, bo
if !hasV {
continue
}
if itemPath == "status" {
if s, ok := itemV.(string); ok && s == "FINISHED" {
if isStatusPath(itemPath) {
if s, ok := itemV.(string); ok && strings.EqualFold(strings.TrimSpace(s), "FINISHED") {
return nil, true
}
continue
}
if shouldSkipPath(itemPath) {
continue
@@ -282,6 +293,9 @@ func extractContentRecursive(items []any, defaultType string) ([]ContentPart, bo
}
switch v := itemV.(type) {
case string:
if isStatusPath(itemPath) {
continue
}
if v != "" && v != "FINISHED" {
parts = append(parts, ContentPart{Text: v, Type: partType})
}

View File

@@ -159,8 +159,8 @@ func TestParseSSEChunkForContentStatusNotFinished(t *testing.T) {
if finished {
t.Fatal("expected not finished for non-FINISHED status")
}
if len(parts) != 1 || parts[0].Text != "IN_PROGRESS" {
t.Fatalf("expected content for non-FINISHED status, got %#v", parts)
if len(parts) != 0 {
t.Fatalf("expected non-finished status to be filtered, got %#v", parts)
}
}

View File

@@ -291,6 +291,19 @@ test('parseChunkForContent preserves output tokens on FINISHED lines', () => {
assert.deepEqual(parsed.parts, []);
});
test('parseChunkForContent filters INCOMPLETE status text without stopping stream', () => {
const parsed = parseChunkForContent(
{ p: 'response/status', v: 'INCOMPLETE', accumulated_token_usage: 190 },
false,
'text',
);
assert.equal(parsed.parsed, true);
assert.equal(parsed.finished, false);
assert.equal(parsed.contentFilter, false);
assert.equal(parsed.outputTokens, 190);
assert.deepEqual(parsed.parts, []);
});
test('parseChunkForContent strips leaked CONTENT_FILTER suffix and preserves line breaks', () => {
const leaked = parseChunkForContent(
{ p: 'response/content', v: '正常输出CONTENT_FILTER你好这个问题我暂时无法回答' },