From 92e321fe2c3364db5f6a5482117cf6eb10c3b5b9 Mon Sep 17 00:00:00 2001 From: CJACK Date: Fri, 1 May 2026 01:31:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=90=9E=E5=AD=97=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- VERSION | 2 +- go.mod | 6 +- go.sum | 2 + .../openai/chat/chat_stream_runtime.go | 52 ++++---- .../openai/chat/empty_retry_runtime.go | 1 + .../openai/chat/handler_toolcall_test.go | 112 ++++++++++++++++++ internal/js/chat-stream/sse_parse_impl.js | 11 +- internal/sse/parser.go | 23 +++- internal/sse/parser_test.go | 65 ++++++++++ internal/sse/stream_edge_test.go | 17 ++- tests/node/chat-stream.test.js | 13 +- 11 files changed, 257 insertions(+), 47 deletions(-) diff --git a/VERSION b/VERSION index 6aba2b2..fae6e3d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.2.0 +4.2.1 diff --git a/go.mod b/go.mod index 87cabfa..9ce4793 100644 --- a/go.mod +++ b/go.mod @@ -6,14 +6,12 @@ require ( github.com/andybalholm/brotli v1.2.1 github.com/go-chi/chi/v5 v5.2.5 github.com/google/uuid v1.6.0 + github.com/hupe1980/go-tiktoken v0.0.10 github.com/refraction-networking/utls v1.8.2 github.com/router-for-me/CLIProxyAPI/v6 v6.9.14 ) -require ( - github.com/dlclark/regexp2 v1.11.5 // indirect - github.com/hupe1980/go-tiktoken v0.0.10 // indirect -) +require github.com/dlclark/regexp2 v1.11.5 // indirect require ( github.com/klauspost/compress v1.18.5 // indirect diff --git a/go.sum b/go.sum index 811e782..2936ba8 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,8 @@ golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/httpapi/openai/chat/chat_stream_runtime.go b/internal/httpapi/openai/chat/chat_stream_runtime.go index 446d57e..5cc0548 100644 --- a/internal/httpapi/openai/chat/chat_stream_runtime.go +++ b/internal/httpapi/openai/chat/chat_stream_runtime.go @@ -107,6 +107,23 @@ func (s *chatStreamRuntime) sendChunk(v any) { } } +func (s *chatStreamRuntime) sendDelta(delta map[string]any) { + if len(delta) == 0 { + return + } + if !s.firstChunkSent { + delta["role"] = "assistant" + s.firstChunkSent = true + } + s.sendChunk(openaifmt.BuildChatStreamChunk( + s.completionID, + s.created, + s.model, + []map[string]any{openaifmt.BuildChatStreamDeltaChoice(0, delta)}, + nil, + )) +} + func (s *chatStreamRuntime) sendDone() { _, _ = s.w.Write([]byte("data: [DONE]\n\n")) if s.canFlush { @@ -257,7 +274,6 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReasonHandlerRequested} } - newChoices := make([]map[string]any, 0, len(parsed.Parts)) contentSeen := false for _, p := range parsed.ToolDetectionThinkingParts { trimmed := sse.TrimContinuationOverlap(s.toolDetectionThinking.String(), p.Text) @@ -266,11 +282,6 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD } } for _, p := range parsed.Parts { - delta := map[string]any{} - if !s.firstChunkSent { - delta["role"] = "assistant" - s.firstChunkSent = true - } if p.Type == "thinking" { rawTrimmed := sse.TrimContinuationOverlap(s.rawThinking.String(), p.Text) if rawTrimmed != "" { @@ -287,7 +298,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD continue } s.thinking.WriteString(trimmed) - delta["reasoning_content"] = trimmed + s.sendDelta(map[string]any{"reasoning_content": trimmed}) } } else { rawTrimmed := sse.TrimContinuationOverlap(s.rawText.String(), p.Text) @@ -308,7 +319,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD if trimmed == "" { continue } - delta["content"] = trimmed + s.sendDelta(map[string]any{"content": trimmed}) } else { events := toolstream.ProcessChunk(&s.toolSieve, rawTrimmed, s.toolNames) for _, evt := range events { @@ -328,11 +339,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD "tool_calls": formatted, } s.toolCallsEmitted = true - if !s.firstChunkSent { - tcDelta["role"] = "assistant" - s.firstChunkSent = true - } - newChoices = append(newChoices, openaifmt.BuildChatStreamDeltaChoice(0, tcDelta)) + s.sendDelta(tcDelta) continue } if len(evt.ToolCalls) > 0 { @@ -341,11 +348,7 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD tcDelta := map[string]any{ "tool_calls": formatFinalStreamToolCallsWithStableIDs(evt.ToolCalls, s.streamToolCallIDs, s.toolsRaw), } - if !s.firstChunkSent { - tcDelta["role"] = "assistant" - s.firstChunkSent = true - } - newChoices = append(newChoices, openaifmt.BuildChatStreamDeltaChoice(0, tcDelta)) + s.sendDelta(tcDelta) s.resetStreamToolCallState() continue } @@ -357,22 +360,11 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD contentDelta := map[string]any{ "content": cleaned, } - if !s.firstChunkSent { - contentDelta["role"] = "assistant" - s.firstChunkSent = true - } - newChoices = append(newChoices, openaifmt.BuildChatStreamDeltaChoice(0, contentDelta)) + s.sendDelta(contentDelta) } } } } - if len(delta) > 0 { - newChoices = append(newChoices, openaifmt.BuildChatStreamDeltaChoice(0, delta)) - } - } - - if len(newChoices) > 0 { - s.sendChunk(openaifmt.BuildChatStreamChunk(s.completionID, s.created, s.model, newChoices, nil)) } return streamengine.ParsedDecision{ContentSeen: contentSeen} } diff --git a/internal/httpapi/openai/chat/empty_retry_runtime.go b/internal/httpapi/openai/chat/empty_retry_runtime.go index 3c9ed7c..de2ff12 100644 --- a/internal/httpapi/openai/chat/empty_retry_runtime.go +++ b/internal/httpapi/openai/chat/empty_retry_runtime.go @@ -49,6 +49,7 @@ func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Co detected := detectAssistantToolCalls(result.rawText, result.text, result.rawThinking, result.toolDetectionThinking, toolNames) result.detectedCalls = len(detected.Calls) result.body = openaifmt.BuildChatCompletionWithToolCalls(completionID, model, usagePrompt, result.thinking, result.text, detected.Calls, toolsRaw) + addRefFileTokensToUsage(result.body, refFileTokens) result.finishReason = chatFinishReason(result.body) if !shouldRetryChatNonStream(result, attempts) { h.finishChatNonStreamResult(w, result, attempts, usagePrompt, refFileTokens, historySession) diff --git a/internal/httpapi/openai/chat/handler_toolcall_test.go b/internal/httpapi/openai/chat/handler_toolcall_test.go index 2efd4db..5cb9a54 100644 --- a/internal/httpapi/openai/chat/handler_toolcall_test.go +++ b/internal/httpapi/openai/chat/handler_toolcall_test.go @@ -1,6 +1,7 @@ package chat import ( + "context" "encoding/json" "io" "net/http" @@ -239,6 +240,75 @@ func TestHandleStreamToolsPlainTextStreamsBeforeFinish(t *testing.T) { } } +func TestHandleStreamThinkingDisabledDoesNotLeakHiddenFragmentContinuations(t *testing.T) { + h := &Handler{} + resp := makeSSEHTTPResponse( + `data: {"p":"response/fragments","o":"APPEND","v":[{"type":"THINK","content":"我们"}]}`, + `data: {"p":"response/fragments/-1/content","v":"被"}`, + `data: {"v":"要求"}`, + `data: {"p":"response/fragments","o":"APPEND","v":[{"type":"RESPONSE","content":"答"}]}`, + `data: {"p":"response/fragments/-1/content","v":"案"}`, + `data: [DONE]`, + ) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + + h.handleStream(rec, req, resp, "cid-hidden-fragment", "deepseek-v4-flash", "prompt", 0, false, false, nil, nil, nil) + + frames, done := parseSSEDataFrames(t, rec.Body.String()) + if !done { + t.Fatalf("expected [DONE], body=%s", rec.Body.String()) + } + content := strings.Builder{} + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + for _, item := range choices { + choice, _ := item.(map[string]any) + delta, _ := choice["delta"].(map[string]any) + if c, ok := delta["content"].(string); ok { + content.WriteString(c) + } + } + } + if got := content.String(); got != "答案" { + t.Fatalf("expected only visible response text, got %q body=%s", got, rec.Body.String()) + } +} + +func TestHandleStreamEmitsSingleChoiceFramesForMultipleParsedParts(t *testing.T) { + h := &Handler{} + resp := makeSSEHTTPResponse( + `data: {"p":"response/fragments","o":"APPEND","v":[{"type":"THINK","content":"我们"},{"type":"THINK","content":"被"},{"type":"THINK","content":"要求"},{"type":"RESPONSE","content":"答"},{"type":"RESPONSE","content":"案"}]}`, + `data: [DONE]`, + ) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil) + + h.handleStream(rec, req, resp, "cid-multi-parts", "deepseek-v4-pro", "prompt", 0, true, false, nil, nil, nil) + + frames, done := parseSSEDataFrames(t, rec.Body.String()) + if !done { + t.Fatalf("expected [DONE], body=%s", rec.Body.String()) + } + var reasoning, content strings.Builder + for _, frame := range frames { + choices, _ := frame["choices"].([]any) + if len(choices) != 1 { + t.Fatalf("expected exactly one choice per stream frame, got %d frame=%#v body=%s", len(choices), frame, rec.Body.String()) + } + choice, _ := choices[0].(map[string]any) + delta, _ := choice["delta"].(map[string]any) + reasoning.WriteString(asString(delta["reasoning_content"])) + content.WriteString(asString(delta["content"])) + } + if got := reasoning.String(); got != "我们被要求" { + t.Fatalf("first-choice-only client would miss reasoning tokens: got %q body=%s", got, rec.Body.String()) + } + if got := content.String(); got != "答案" { + t.Fatalf("first-choice-only client would miss content tokens: got %q body=%s", got, rec.Body.String()) + } +} + func TestHandleStreamIncompleteCapturedToolJSONFlushesAsTextOnFinalize(t *testing.T) { h := &Handler{} resp := makeSSEHTTPResponse( @@ -447,3 +517,45 @@ func TestHandleStreamCoercesSchemaDeclaredStringArgumentsOnFinalize(t *testing.T } t.Fatalf("expected at least one streamed tool call delta, body=%s", rec.Body.String()) } + +func TestHandleNonStreamWithRetryIncludesRefFileTokensInUsage(t *testing.T) { + h := &Handler{} + + run := func(refFileTokens int) map[string]any { + resp := makeSSEHTTPResponse( + `data: {"p":"response/content","v":"hello world"}`, + `data: [DONE]`, + ) + rec := httptest.NewRecorder() + h.handleNonStreamWithRetry(rec, context.Background(), nil, resp, nil, "", "cid-ref", "deepseek-v4-flash", "prompt", refFileTokens, false, false, nil, nil, nil) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + return decodeJSONBody(t, rec.Body.String()) + } + + base := run(0) + withRef := run(7) + + baseUsage, _ := base["usage"].(map[string]any) + refUsage, _ := withRef["usage"].(map[string]any) + if baseUsage == nil || refUsage == nil { + t.Fatalf("expected usage objects, base=%#v ref=%#v", base["usage"], withRef["usage"]) + } + + getInt := func(m map[string]any, key string) int { + t.Helper() + v, ok := m[key].(float64) + if !ok { + t.Fatalf("expected numeric %s, got %#v", key, m[key]) + } + return int(v) + } + + if got := getInt(refUsage, "prompt_tokens") - getInt(baseUsage, "prompt_tokens"); got != 7 { + t.Fatalf("expected prompt_tokens delta 7, got %d", got) + } + if got := getInt(refUsage, "total_tokens") - getInt(baseUsage, "total_tokens"); got != 7 { + t.Fatalf("expected total_tokens delta 7, got %d", got) + } +} diff --git a/internal/js/chat-stream/sse_parse_impl.js b/internal/js/chat-stream/sse_parse_impl.js index a665f8e..aff7104 100644 --- a/internal/js/chat-stream/sse_parse_impl.js +++ b/internal/js/chat-stream/sse_parse_impl.js @@ -70,7 +70,6 @@ function finalizeThinkingParts(parts, thinkingEnabled, newType) { } if (!thinkingEnabled) { finalParts = dropThinkingParts(finalParts); - finalType = 'text'; } return { parts: finalParts, newType: finalType }; } @@ -213,6 +212,12 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc } } + if (pathValue === 'response/content') { + newType = 'text'; + } else if (pathValue === 'response/thinking_content' && (!thinkingEnabled || newType !== 'text')) { + newType = 'thinking'; + } + let partType = 'text'; if (pathValue === 'response/thinking_content') { if (!thinkingEnabled) { @@ -226,8 +231,8 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc partType = 'text'; } else if (pathValue.includes('response/fragments') && pathValue.includes('/content')) { partType = newType; - } else if (!pathValue && thinkingEnabled) { - partType = newType; + } else if (!pathValue) { + partType = newType || 'text'; } const val = chunk.v; diff --git a/internal/sse/parser.go b/internal/sse/parser.go index 5d0f227..528bd2f 100644 --- a/internal/sse/parser.go +++ b/internal/sse/parser.go @@ -92,6 +92,7 @@ func ParseSSEChunkForContentDetailed(chunk map[string]any, thinkingEnabled bool, } newType := currentFragmentType parts := make([]ContentPart, 0, 8) + updateTypeFromExplicitPath(path, thinkingEnabled, &newType) collectDirectFragments(path, chunk, v, &newType, &parts) updateTypeFromNestedResponse(path, v, &newType) partType := resolvePartType(path, thinkingEnabled, newType) @@ -107,11 +108,24 @@ func ParseSSEChunkForContentDetailed(chunk map[string]any, thinkingEnabled bool, detectionThinkingParts := selectThinkingParts(parts) if !thinkingEnabled { parts = dropThinkingParts(parts) - newType = "text" } return parts, detectionThinkingParts, false, newType } +func updateTypeFromExplicitPath(path string, thinkingEnabled bool, newType *string) { + if newType == nil { + return + } + switch path { + case "response/content": + *newType = "text" + case "response/thinking_content": + if !thinkingEnabled || *newType != "text" { + *newType = "thinking" + } + } +} + func selectThinkingParts(parts []ContentPart) []ContentPart { if len(parts) == 0 { return nil @@ -206,8 +220,11 @@ func resolvePartType(path string, thinkingEnabled bool, newType string) string { return "text" case strings.Contains(path, "response/fragments") && strings.Contains(path, "/content"): return newType - case path == "" && thinkingEnabled: - return newType + case path == "": + if newType != "" { + return newType + } + return "text" default: return "text" } diff --git a/internal/sse/parser_test.go b/internal/sse/parser_test.go index 26c1b74..16c02b9 100644 --- a/internal/sse/parser_test.go +++ b/internal/sse/parser_test.go @@ -88,6 +88,71 @@ func TestParseSSEChunkForContentAfterAppendUsesUpdatedType(t *testing.T) { } } +func TestParseSSEChunkForContentThinkingDisabledKeepsHiddenFragmentState(t *testing.T) { + chunk1 := map[string]any{ + "p": "response/fragments", + "o": "APPEND", + "v": []any{ + map[string]any{"type": "THINK", "content": "我们"}, + }, + } + parts1, finished1, nextType1 := ParseSSEChunkForContent(chunk1, false, "text") + if finished1 { + t.Fatal("expected first chunk unfinished") + } + if nextType1 != "thinking" { + t.Fatalf("expected hidden THINK fragment to keep next type thinking, got %q", nextType1) + } + if len(parts1) != 0 { + t.Fatalf("expected hidden thinking to be dropped, got %#v", parts1) + } + + chunk2 := map[string]any{ + "p": "response/fragments/-1/content", + "v": "被", + } + parts2, finished2, nextType2 := ParseSSEChunkForContent(chunk2, false, nextType1) + if finished2 { + t.Fatal("expected second chunk unfinished") + } + if nextType2 != "thinking" { + t.Fatalf("expected hidden continuation to keep next type thinking, got %q", nextType2) + } + if len(parts2) != 0 { + t.Fatalf("expected hidden continuation to be dropped, got %#v", parts2) + } + + chunk3 := map[string]any{"v": "要求"} + parts3, finished3, nextType3 := ParseSSEChunkForContent(chunk3, false, nextType2) + if finished3 { + t.Fatal("expected third chunk unfinished") + } + if nextType3 != "thinking" { + t.Fatalf("expected pathless hidden continuation to keep next type thinking, got %q", nextType3) + } + if len(parts3) != 0 { + t.Fatalf("expected pathless hidden continuation to be dropped, got %#v", parts3) + } + + chunk4 := map[string]any{ + "p": "response/fragments", + "o": "APPEND", + "v": []any{ + map[string]any{"type": "RESPONSE", "content": "答"}, + }, + } + parts4, finished4, nextType4 := ParseSSEChunkForContent(chunk4, false, nextType3) + if finished4 { + t.Fatal("expected fourth chunk unfinished") + } + if nextType4 != "text" { + t.Fatalf("expected RESPONSE fragment to switch next type text, got %q", nextType4) + } + if len(parts4) != 1 || parts4[0].Type != "text" || parts4[0].Text != "答" { + t.Fatalf("expected visible response text, got %#v", parts4) + } +} + func TestParseSSEChunkForContentAutoTransitionsThinkClose(t *testing.T) { chunk := map[string]any{ "p": "response/thinking_content", diff --git a/internal/sse/stream_edge_test.go b/internal/sse/stream_edge_test.go index 7c5ea4c..40b4460 100644 --- a/internal/sse/stream_edge_test.go +++ b/internal/sse/stream_edge_test.go @@ -158,11 +158,13 @@ func TestStartParsedLinePumpNonSSELines(t *testing.T) { func TestStartParsedLinePumpThinkingDisabled(t *testing.T) { body := strings.NewReader( - "data: {\"p\":\"response/thinking_content\",\"v\":\"thought\"}\n" + + "data: {\"p\":\"response/fragments\",\"o\":\"APPEND\",\"v\":[{\"type\":\"THINK\",\"content\":\"思\"}]}\n" + + "data: {\"p\":\"response/fragments/-1/content\",\"v\":\"考\"}\n" + + "data: {\"v\":\"隐藏\"}\n" + + "data: {\"p\":\"response/fragments\",\"o\":\"APPEND\",\"v\":[{\"type\":\"RESPONSE\",\"content\":\"答\"}]}\n" + "data: {\"p\":\"response/content\",\"v\":\"response\"}\n" + "data: [DONE]\n", ) - // With thinking disabled, thinking content should still be emitted but marked differently results, done := StartParsedLinePump(context.Background(), body, false, "text") var parts []ContentPart @@ -171,8 +173,15 @@ func TestStartParsedLinePumpThinkingDisabled(t *testing.T) { } <-done - if len(parts) < 1 { - t.Fatalf("expected at least 1 part, got %d", len(parts)) + got := strings.Builder{} + for _, p := range parts { + if p.Type != "text" { + t.Fatalf("expected only text parts with thinking disabled, got %#v", parts) + } + got.WriteString(p.Text) + } + if got.String() != "答response" { + t.Fatalf("expected hidden thinking to be dropped, got %q from %#v", got.String(), parts) } } diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js index 48dc707..b7f75d2 100644 --- a/tests/node/chat-stream.test.js +++ b/tests/node/chat-stream.test.js @@ -511,14 +511,23 @@ test('parseChunkForContent drops thinking content when thinking is disabled', () 'text', ); assert.equal(thinking.finished, false); - assert.equal(thinking.newType, 'text'); + assert.equal(thinking.newType, 'thinking'); assert.deepEqual(thinking.parts, []); + const hiddenContinuation = parseChunkForContent( + { v: 'still hidden' }, + false, + thinking.newType, + ); + assert.equal(hiddenContinuation.newType, 'thinking'); + assert.deepEqual(hiddenContinuation.parts, []); + const answer = parseChunkForContent( { p: 'response/content', v: 'visible answer' }, false, - thinking.newType, + hiddenContinuation.newType, ); + assert.equal(answer.newType, 'text'); assert.deepEqual(answer.parts, [{ text: 'visible answer', type: 'text' }]); });