From 96b8587c5bdee686ea25b3b377a271ec784fa181 Mon Sep 17 00:00:00 2001
From: "CJACK."
Date: Tue, 7 Apr 2026 08:27:03 +0800
Subject: [PATCH] Fix token usage propagation and remove stale env docs

Propagate upstream prompt token counts alongside output token counts
into the usage object for both streaming and non-streaming
/v1/responses, recomputing total_tokens from the resulting input and
output values instead of relying on a pre-existing input_tokens entry.
Extend the Go and Node tests to cover token usage reported via BATCH
status snapshots, align the Vercel prepare-path prompt assertions with
the current prompt text, and drop the stale DS2API_POW_CONCURRENCY
rows from both READMEs.
---
 README.MD                                     |  1 -
 README.en.md                                  |  1 -
 internal/adapter/openai/prompt_build_test.go  |  9 +-
 internal/adapter/openai/responses_handler.go  | 13 ++-
 .../openai/responses_stream_runtime_core.go   | 17 +++-
 internal/adapter/openai/stream_status_test.go | 94 +++++++++++++++++++
 tests/node/chat-stream.test.js                | 22 ++++-
 7 files changed, 140 insertions(+), 17 deletions(-)

diff --git a/README.MD b/README.MD
index 6e5411d..5d2ee4e 100644
--- a/README.MD
+++ b/README.MD
@@ -344,7 +344,6 @@ cp opencode.json.example opencode.json
 | `DS2API_CONFIG_PATH` | 配置文件路径 | `config.json` |
 | `DS2API_CONFIG_JSON` | 直接注入配置(JSON 或 Base64) | — |
 | `DS2API_ENV_WRITEBACK` | 环境变量模式下自动写回配置文件并切换文件模式(`1/true/yes/on`) | 关闭 |
-| `DS2API_POW_CONCURRENCY` | PoW 并行计算协程数(可选) | 默认 CPU 核心数 |
 | `DS2API_STATIC_ADMIN_DIR` | 管理台静态文件目录 | `static/admin` |
 | `DS2API_AUTO_BUILD_WEBUI` | 启动时自动构建 WebUI | 本地开启,Vercel 关闭 |
 | `DS2API_DEV_PACKET_CAPTURE` | 本地开发抓包开关(记录最近会话请求/响应体) | 本地非 Vercel 默认开启 |
diff --git a/README.en.md b/README.en.md
index df764ab..6753bc0 100644
--- a/README.en.md
+++ b/README.en.md
@@ -344,7 +344,6 @@ cp opencode.json.example opencode.json
 | `DS2API_CONFIG_PATH` | Config file path | `config.json` |
 | `DS2API_CONFIG_JSON` | Inline config (JSON or Base64) | — |
 | `DS2API_ENV_WRITEBACK` | Auto-write env-backed config to file and transition to file mode (`1/true/yes/on`) | Disabled |
-| `DS2API_POW_CONCURRENCY` | PoW parallel solver goroutine count (optional) | Default CPU core count |
 | `DS2API_STATIC_ADMIN_DIR` | Admin static assets dir | `static/admin` |
 | `DS2API_AUTO_BUILD_WEBUI` | Auto-build WebUI on startup | Enabled locally, disabled on Vercel |
 | `DS2API_ACCOUNT_MAX_INFLIGHT` | Max in-flight requests per account | `2` |
diff --git a/internal/adapter/openai/prompt_build_test.go b/internal/adapter/openai/prompt_build_test.go
index 223689b..390cbd4 100644
--- a/internal/adapter/openai/prompt_build_test.go
+++ b/internal/adapter/openai/prompt_build_test.go
@@ -74,16 +74,13 @@ func TestBuildOpenAIFinalPrompt_VercelPreparePathKeepsFinalAnswerInstruction(t *
 	}
 	finalPrompt, _ := buildOpenAIFinalPrompt(messages, tools, "")
-	if !strings.Contains(finalPrompt, "After receiving a tool result, use it directly.") {
-		t.Fatalf("vercel prepare finalPrompt missing final-answer instruction: %q", finalPrompt)
-	}
-	if !strings.Contains(finalPrompt, "Only call another tool if the result is insufficient.") {
-		t.Fatalf("vercel prepare finalPrompt missing retry guard instruction: %q", finalPrompt)
+	if !strings.Contains(finalPrompt, "Remember: Output ONLY the ... XML block when calling tools.") {
+		t.Fatalf("vercel prepare finalPrompt missing final tool-call anchor instruction: %q", finalPrompt)
 	}
 	if !strings.Contains(finalPrompt, "TOOL CALL FORMAT") {
 		t.Fatalf("vercel prepare finalPrompt missing xml format instruction: %q", finalPrompt)
 	}
-	if !strings.Contains(finalPrompt, "Do NOT wrap the XML in markdown code fences") {
+	if !strings.Contains(finalPrompt, "Do NOT wrap XML in markdown fences") {
 		t.Fatalf("vercel prepare finalPrompt missing no-fence xml instruction: %q", finalPrompt)
 	}
 	if strings.Contains(finalPrompt, "```json") {
diff --git a/internal/adapter/openai/responses_handler.go b/internal/adapter/openai/responses_handler.go
index 7cb7ec3..1bd5b1c 100644
--- a/internal/adapter/openai/responses_handler.go
+++ b/internal/adapter/openai/responses_handler.go
@@ -130,12 +130,17 @@ func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Res
 	}
 	responseObj := openaifmt.BuildResponseObject(responseID, model, finalPrompt, sanitizedThinking, sanitizedText, toolNames)
-	if result.OutputTokens > 0 {
+	if result.PromptTokens > 0 || result.OutputTokens > 0 {
 		if usage, ok := responseObj["usage"].(map[string]any); ok {
-			usage["output_tokens"] = result.OutputTokens
-			if input, ok := usage["input_tokens"].(int); ok {
-				usage["total_tokens"] = input + result.OutputTokens
+			if result.PromptTokens > 0 {
+				usage["input_tokens"] = result.PromptTokens
 			}
+			if result.OutputTokens > 0 {
+				usage["output_tokens"] = result.OutputTokens
+			}
+			input, _ := usage["input_tokens"].(int)
+			output, _ := usage["output_tokens"].(int)
+			usage["total_tokens"] = input + output
 		}
 	}
 	h.getResponseStore().put(owner, responseID, responseObj)
diff --git a/internal/adapter/openai/responses_stream_runtime_core.go b/internal/adapter/openai/responses_stream_runtime_core.go
index 8072ccb..ff9ea26 100644
--- a/internal/adapter/openai/responses_stream_runtime_core.go
+++ b/internal/adapter/openai/responses_stream_runtime_core.go
@@ -51,6 +51,7 @@ type responsesStreamRuntime struct {
 	messagePartAdded bool
 	sequence         int
 	failed           bool
+	promptTokens     int
 	outputTokens     int
 
 	persistResponse func(obj map[string]any)
@@ -152,9 +153,14 @@ func (s *responsesStreamRuntime) finalize() {
-	if s.outputTokens > 0 {
+	if s.promptTokens > 0 || s.outputTokens > 0 {
 		if usage, ok := obj["usage"].(map[string]any); ok {
-			usage["output_tokens"] = s.outputTokens
-			if input, ok := usage["input_tokens"].(int); ok {
-				usage["total_tokens"] = input + s.outputTokens
+			if s.promptTokens > 0 {
+				usage["input_tokens"] = s.promptTokens
 			}
+			if s.outputTokens > 0 {
+				usage["output_tokens"] = s.outputTokens
+			}
+			input, _ := usage["input_tokens"].(int)
+			output, _ := usage["output_tokens"].(int)
+			usage["total_tokens"] = input + output
 		}
 	}
 	if s.persistResponse != nil {
@@ -185,6 +191,9 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
 	if !parsed.Parsed {
 		return streamengine.ParsedDecision{}
 	}
+	if parsed.PromptTokens > 0 {
+		s.promptTokens = parsed.PromptTokens
+	}
 	if parsed.OutputTokens > 0 {
 		s.outputTokens = parsed.OutputTokens
 	}
diff --git a/internal/adapter/openai/stream_status_test.go b/internal/adapter/openai/stream_status_test.go
index 6352141..1601a7c 100644
--- a/internal/adapter/openai/stream_status_test.go
+++ b/internal/adapter/openai/stream_status_test.go
@@ -238,3 +238,97 @@ func TestChatCompletionsStreamContentFilterStopsNormallyWithoutLeak(t *testing.T
 		t.Fatalf("expected finish_reason=stop for content-filter upstream stop, got %#v", choice["finish_reason"])
 	}
 }
+
+func TestResponsesStreamUsageOverridesFromBatchAccumulatedTokenUsage(t *testing.T) {
+	statuses := make([]int, 0, 1)
+	h := &Handler{
+		Store: mockOpenAIConfig{wideInput: true},
+		Auth:  streamStatusAuthStub{},
+		DS: streamStatusDSStub{resp: makeOpenAISSEHTTPResponse(
+			`data: {"p":"response/content","v":"hello"}`,
+			`data: {"p":"response","o":"BATCH","v":[{"p":"accumulated_token_usage","v":190},{"p":"quasi_status","v":"FINISHED"}]}`,
+		)},
+	}
+	r := chi.NewRouter()
+	r.Use(captureStatusMiddleware(&statuses))
+	RegisterRoutes(r, h)
+
+	reqBody := `{"model":"deepseek-chat","input":"hi","stream":true}`
+	req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(reqBody))
+	req.Header.Set("Authorization", "Bearer direct-token")
+	req.Header.Set("Content-Type", "application/json")
+	rec := httptest.NewRecorder()
+	r.ServeHTTP(rec, req)
+
+	if rec.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
+	}
+	if len(statuses) != 1 || statuses[0] != http.StatusOK {
+		t.Fatalf("expected captured status 200, got %#v", statuses)
+	}
+	frames, done := parseSSEDataFrames(t, rec.Body.String())
+	if !done {
+		t.Fatalf("expected [DONE], body=%s", rec.Body.String())
+	}
+	if len(frames) == 0 {
+		t.Fatalf("expected at least one json frame, body=%s", rec.Body.String())
+	}
+	last := frames[len(frames)-1]
+	resp, _ := last["response"].(map[string]any)
+	if resp == nil {
+		t.Fatalf("expected response payload in final frame, got %#v", last)
+	}
+	usage, _ := resp["usage"].(map[string]any)
+	if usage == nil {
+		t.Fatalf("expected usage in response payload, got %#v", resp)
+	}
+	if got, _ := usage["output_tokens"].(float64); int(got) != 190 {
+		t.Fatalf("expected output_tokens=190, got %#v", usage["output_tokens"])
+	}
+}
+
+func TestResponsesNonStreamUsageOverridesPromptAndOutputTokenUsage(t *testing.T) {
+	statuses := make([]int, 0, 1)
+	h := &Handler{
+		Store: mockOpenAIConfig{wideInput: true},
+		Auth:  streamStatusAuthStub{},
+		DS: streamStatusDSStub{resp: makeOpenAISSEHTTPResponse(
+			`data: {"p":"response/content","v":"ok"}`,
+			`data: {"p":"response","o":"BATCH","v":[{"p":"token_usage","v":{"prompt_tokens":11,"completion_tokens":29}},{"p":"quasi_status","v":"FINISHED"}]}`,
+		)},
+	}
+	r := chi.NewRouter()
+	r.Use(captureStatusMiddleware(&statuses))
+	RegisterRoutes(r, h)
+
+	reqBody := `{"model":"deepseek-chat","input":"hi","stream":false}`
+	req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(reqBody))
+	req.Header.Set("Authorization", "Bearer direct-token")
+	req.Header.Set("Content-Type", "application/json")
+	rec := httptest.NewRecorder()
+	r.ServeHTTP(rec, req)
+
+	if rec.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
+	}
+	if len(statuses) != 1 || statuses[0] != http.StatusOK {
+		t.Fatalf("expected captured status 200, got %#v", statuses)
+	}
+	var out map[string]any
+	if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil {
+		t.Fatalf("decode response failed: %v body=%s", err, rec.Body.String())
+	}
+	usage, _ := out["usage"].(map[string]any)
+	if usage == nil {
+		t.Fatalf("expected usage object, got %#v", out)
+	}
+	if got, _ := usage["input_tokens"].(float64); int(got) != 11 {
+		t.Fatalf("expected input_tokens=11, got %#v", usage["input_tokens"])
+	}
+	if got, _ := usage["output_tokens"].(float64); int(got) != 29 {
+		t.Fatalf("expected output_tokens=29, got %#v", usage["output_tokens"])
+	}
+	if got, _ := usage["total_tokens"].(float64); int(got) != 40 {
+		t.Fatalf("expected total_tokens=40, got %#v", usage["total_tokens"])
+	}
+}
diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js
index 9681843..d1cc859 100644
--- a/tests/node/chat-stream.test.js
+++ b/tests/node/chat-stream.test.js
@@ -275,7 +275,7 @@ test('parseChunkForContent keeps error branches distinct from content_filter sta
 	assert.equal(parsed.finished, true);
 	assert.equal(parsed.contentFilter, false);
 	assert.equal(parsed.errorMessage.length > 0, true);
-	assert.equal(parsed.outputTokens, 0);
+	assert.equal(parsed.outputTokens, 88);
 	assert.deepEqual(parsed.parts, []);
 });
 
@@ -292,6 +292,26 @@ test('parseChunkForContent preserves output tokens on FINISHED lines', () => {
 	assert.deepEqual(parsed.parts, []);
 });
 
+test('parseChunkForContent captures output tokens from response BATCH status snapshots', () => {
+	const parsed = parseChunkForContent(
+		{
+			p: 'response',
+			o: 'BATCH',
+			v: [
+				{ p: 'accumulated_token_usage', v: 190 },
+				{ p: 'quasi_status', v: 'FINISHED' },
+			],
+		},
+		false,
+		'text',
+	);
+	assert.equal(parsed.parsed, true);
+	assert.equal(parsed.finished, false);
+	assert.equal(parsed.contentFilter, false);
+	assert.equal(parsed.outputTokens, 190);
+	assert.deepEqual(parsed.parts, []);
+});
+
 test('parseChunkForContent matches FINISHED case-insensitively on status paths', () => {
 	const parsed = parseChunkForContent(
 		{ p: 'response/status', v: ' finished ', accumulated_token_usage: 190 },