Merge pull request #244 from CJackHwang/codex/temporarily-switch-to-internal-usage-count

Temporarily ignore DeepSeek upstream usage fields and prefer internal token estimation
CJackHwang
2026-04-07 20:39:36 +08:00
committed by GitHub
23 changed files with 55 additions and 472 deletions

View File

@@ -237,6 +237,7 @@ go run ./cmd/ds2api-tests --no-preflight
Notes:
- By default, the tool replays the canonical samples declared in `tests/raw_stream_samples/manifest.json`, simulating the parse 1:1 in upstream SSE order.
- By default it asserts that no `FINISHED` text leaks into the output and that a finish signal is present.
+ - By default it does **not** strictly reconcile `raw accumulated_token_usage` against locally parsed token counts (the current implementation treats content-based estimation as authoritative); add `--fail-on-token-mismatch` explicitly to enforce the strict check.
- Each run writes the locally derived results to `artifacts/raw-stream-sim/<run-id>/<sample-id>/replay.output.txt` and emits a structured report.
- If you have a historical baseline directory, point `--baseline-root` at it to get a direct text comparison.
- A fuller description of the protocol-level behavior structure is in [DeepSeekSSE行为结构说明-2026-04-05.md](./DeepSeekSSE行为结构说明-2026-04-05.md).
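For orientation, the strict mode gated by `--fail-on-token-mismatch` boils down to a comparison of this shape (a minimal hypothetical sketch; `checkTokenMismatch` and its parameters are illustrative names, not the tool's actual internals):

```go
package sketch

import "fmt"

// checkTokenMismatch sketches the opt-in strict check: compare the
// upstream-reported raw accumulated usage against the local content-based
// estimate, and only fail when strict mode is explicitly enabled.
func checkTokenMismatch(rawAccumulated, localEstimate int, strict bool) error {
	if !strict {
		// Default behavior after this PR: content estimation is authoritative.
		return nil
	}
	if rawAccumulated > 0 && rawAccumulated != localEstimate {
		return fmt.Errorf("token mismatch: raw=%d local=%d", rawAccumulated, localEstimate)
	}
	return nil
}
```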

View File

@@ -24,10 +24,9 @@ type claudeStreamRuntime struct {
bufferToolContent bool
stripReferenceMarkers bool
- messageID string
- thinking strings.Builder
- text strings.Builder
- outputTokens int
+ messageID string
+ thinking strings.Builder
+ text strings.Builder
nextBlockIndex int
thinkingBlockOpen bool
@@ -70,9 +69,6 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
if !parsed.Parsed {
return streamengine.ParsedDecision{}
}
- if parsed.OutputTokens > 0 {
- s.outputTokens = parsed.OutputTokens
- }
if parsed.ErrorMessage != "" {
s.upstreamErr = parsed.ErrorMessage
return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("upstream_error")}

View File

@@ -109,9 +109,6 @@ func (s *claudeStreamRuntime) finalize(stopReason string) {
}
outputTokens := util.EstimateTokens(finalThinking) + util.EstimateTokens(finalText)
- if s.outputTokens > 0 {
- outputTokens = s.outputTokens
- }
s.send("message_delta", map[string]any{
"type": "message_delta",
"delta": map[string]any{

View File

@@ -149,14 +149,13 @@ func (h *Handler) handleNonStreamGenerateContent(w http.ResponseWriter, resp *ht
cleanVisibleOutput(result.Thinking, stripReferenceMarkers),
cleanVisibleOutput(result.Text, stripReferenceMarkers),
toolNames,
- result.OutputTokens,
))
}
//nolint:unused // retained for native Gemini non-stream handling path.
- func buildGeminiGenerateContentResponse(model, finalPrompt, finalThinking, finalText string, toolNames []string, outputTokens int) map[string]any {
+ func buildGeminiGenerateContentResponse(model, finalPrompt, finalThinking, finalText string, toolNames []string) map[string]any {
parts := buildGeminiPartsFromFinal(finalText, finalThinking, toolNames)
- usage := buildGeminiUsage(finalPrompt, finalThinking, finalText, outputTokens)
+ usage := buildGeminiUsage(finalPrompt, finalThinking, finalText)
return map[string]any{
"candidates": []map[string]any{
{
@@ -174,14 +173,10 @@ func buildGeminiGenerateContentResponse(model, finalPrompt, finalThinking, final
}
//nolint:unused // retained for native Gemini non-stream handling path.
- func buildGeminiUsage(finalPrompt, finalThinking, finalText string, outputTokens int) map[string]any {
+ func buildGeminiUsage(finalPrompt, finalThinking, finalText string) map[string]any {
promptTokens := util.EstimateTokens(finalPrompt)
reasoningTokens := util.EstimateTokens(finalThinking)
completionTokens := util.EstimateTokens(finalText)
- if outputTokens > 0 {
- completionTokens = outputTokens
- reasoningTokens = 0
- }
return map[string]any{
"promptTokenCount": promptTokens,
"candidatesTokenCount": reasoningTokens + completionTokens,

View File

@@ -65,9 +65,8 @@ type geminiStreamRuntime struct {
stripReferenceMarkers bool
toolNames []string
- thinking strings.Builder
- text strings.Builder
- outputTokens int
+ thinking strings.Builder
+ text strings.Builder
}
//nolint:unused // retained for native Gemini stream handling path.
@@ -112,9 +111,6 @@ func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
if !parsed.Parsed {
return streamengine.ParsedDecision{}
}
- if parsed.OutputTokens > 0 {
- s.outputTokens = parsed.OutputTokens
- }
if parsed.ContentFilter || parsed.ErrorMessage != "" || parsed.Stop {
return streamengine.ParsedDecision{Stop: true}
}
@@ -198,6 +194,6 @@ func (s *geminiStreamRuntime) finalize() {
},
},
"modelVersion": s.model,
"usageMetadata": buildGeminiUsage(s.finalPrompt, finalThinking, finalText, s.outputTokens),
"usageMetadata": buildGeminiUsage(s.finalPrompt, finalThinking, finalText),
})
}

View File

@@ -37,8 +37,6 @@ type chatStreamRuntime struct {
streamToolNames map[int]string
thinking strings.Builder
text strings.Builder
- promptTokens int
- outputTokens int
}
func newChatStreamRuntime(
@@ -171,17 +169,6 @@ func (s *chatStreamRuntime) finalize(finishReason string) {
finishReason = "tool_calls"
}
usage := openaifmt.BuildChatUsage(s.finalPrompt, finalThinking, finalText)
- if s.promptTokens > 0 {
- usage["prompt_tokens"] = s.promptTokens
- }
- if s.outputTokens > 0 {
- usage["completion_tokens"] = s.outputTokens
- }
- if s.promptTokens > 0 || s.outputTokens > 0 {
- p := usage["prompt_tokens"].(int)
- c := usage["completion_tokens"].(int)
- usage["total_tokens"] = p + c
- }
s.sendChunk(openaifmt.BuildChatStreamChunk(
s.completionID,
s.created,
@@ -196,12 +183,6 @@ func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedD
if !parsed.Parsed {
return streamengine.ParsedDecision{}
}
- if parsed.PromptTokens > 0 {
- s.promptTokens = parsed.PromptTokens
- }
- if parsed.OutputTokens > 0 {
- s.outputTokens = parsed.OutputTokens
- }
if parsed.ContentFilter {
return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReasonHandlerRequested}
}
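The chat stream now emits whatever `openaifmt.BuildChatUsage` estimates, with no override step afterwards. A plausible shape for that helper, inferred from the usage keys the removed code touched (hypothetical; the real implementation may differ):

```go
package openaifmt

// BuildChatUsage, sketched: usage is pure estimation, so total_tokens is
// prompt_tokens + completion_tokens by construction and no post-hoc
// reconciliation with upstream counters is needed.
func BuildChatUsage(finalPrompt, finalThinking, finalText string) map[string]any {
	p := estimateTokens(finalPrompt)
	c := estimateTokens(finalThinking) + estimateTokens(finalText)
	return map[string]any{
		"prompt_tokens":     p,
		"completion_tokens": c,
		"total_tokens":      p + c,
	}
}

// estimateTokens stands in for the repo's util.EstimateTokens helper.
func estimateTokens(s string) int { return (len(s) + 3) / 4 }
```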

View File

@@ -131,19 +131,6 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, re
return
}
respBody := openaifmt.BuildChatCompletion(completionID, model, finalPrompt, finalThinking, finalText, toolNames)
- if result.PromptTokens > 0 || result.OutputTokens > 0 {
- if usage, ok := respBody["usage"].(map[string]any); ok {
- if result.PromptTokens > 0 {
- usage["prompt_tokens"] = result.PromptTokens
- }
- if result.OutputTokens > 0 {
- usage["completion_tokens"] = result.OutputTokens
- }
- p, _ := usage["prompt_tokens"].(int)
- c, _ := usage["completion_tokens"].(int)
- usage["total_tokens"] = p + c
- }
- }
writeJSON(w, http.StatusOK, respBody)
}

View File

@@ -130,19 +130,6 @@ func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Res
}
responseObj := openaifmt.BuildResponseObject(responseID, model, finalPrompt, sanitizedThinking, sanitizedText, toolNames)
- if result.PromptTokens > 0 || result.OutputTokens > 0 {
- if usage, ok := responseObj["usage"].(map[string]any); ok {
- if result.PromptTokens > 0 {
- usage["input_tokens"] = result.PromptTokens
- }
- if result.OutputTokens > 0 {
- usage["output_tokens"] = result.OutputTokens
- }
- input, _ := usage["input_tokens"].(int)
- output, _ := usage["output_tokens"].(int)
- usage["total_tokens"] = input + output
- }
- }
h.getResponseStore().put(owner, responseID, responseObj)
writeJSON(w, http.StatusOK, responseObj)
}

View File

@@ -51,8 +51,6 @@ type responsesStreamRuntime struct {
messagePartAdded bool
sequence int
failed bool
- promptTokens int
- outputTokens int
persistResponse func(obj map[string]any)
}
@@ -150,24 +148,6 @@ func (s *responsesStreamRuntime) finalize() {
s.closeIncompleteFunctionItems()
obj := s.buildCompletedResponseObject(finalThinking, finalText, detected)
- if s.outputTokens > 0 {
- if usage, ok := obj["usage"].(map[string]any); ok {
- usage["output_tokens"] = s.outputTokens
- }
- }
- if s.promptTokens > 0 || s.outputTokens > 0 {
- if usage, ok := obj["usage"].(map[string]any); ok {
- if s.promptTokens > 0 {
- usage["input_tokens"] = s.promptTokens
- }
- if s.outputTokens > 0 {
- usage["output_tokens"] = s.outputTokens
- }
- input, _ := usage["input_tokens"].(int)
- output, _ := usage["output_tokens"].(int)
- usage["total_tokens"] = input + output
- }
- }
if s.persistResponse != nil {
s.persistResponse(obj)
}
@@ -196,12 +176,6 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
if !parsed.Parsed {
return streamengine.ParsedDecision{}
}
- if parsed.PromptTokens > 0 {
- s.promptTokens = parsed.PromptTokens
- }
- if parsed.OutputTokens > 0 {
- s.outputTokens = parsed.OutputTokens
- }
if parsed.ContentFilter || parsed.ErrorMessage != "" || parsed.Stop {
return streamengine.ParsedDecision{Stop: true}
}

View File

@@ -239,7 +239,7 @@ func TestChatCompletionsStreamContentFilterStopsNormallyWithoutLeak(t *testing.T
}
}
- func TestResponsesStreamUsageOverridesFromBatchAccumulatedTokenUsage(t *testing.T) {
+ func TestResponsesStreamUsageIgnoresBatchAccumulatedTokenUsage(t *testing.T) {
statuses := make([]int, 0, 1)
h := &Handler{
Store: mockOpenAIConfig{wideInput: true},
@@ -282,12 +282,12 @@ func TestResponsesStreamUsageOverridesFromBatchAccumulatedTokenUsage(t *testing.
if usage == nil {
t.Fatalf("expected usage in response payload, got %#v", resp)
}
if got, _ := usage["output_tokens"].(float64); int(got) != 190 {
t.Fatalf("expected output_tokens=190, got %#v", usage["output_tokens"])
if got, _ := usage["output_tokens"].(float64); int(got) == 190 {
t.Fatalf("expected upstream accumulated token usage to be ignored, got %#v", usage["output_tokens"])
}
}
- func TestResponsesNonStreamUsageOverridesPromptAndOutputTokenUsage(t *testing.T) {
+ func TestResponsesNonStreamUsageIgnoresPromptAndOutputTokenUsage(t *testing.T) {
statuses := make([]int, 0, 1)
h := &Handler{
Store: mockOpenAIConfig{wideInput: true},
@@ -322,13 +322,13 @@ func TestResponsesNonStreamUsageOverridesPromptAndOutputTokenUsage(t *testing.T)
if usage == nil {
t.Fatalf("expected usage object, got %#v", out)
}
if got, _ := usage["input_tokens"].(float64); int(got) != 11 {
t.Fatalf("expected input_tokens=11, got %#v", usage["input_tokens"])
input, _ := usage["input_tokens"].(float64)
output, _ := usage["output_tokens"].(float64)
total, _ := usage["total_tokens"].(float64)
if int(output) == 29 {
t.Fatalf("expected upstream completion token usage to be ignored, got %#v", usage["output_tokens"])
}
if got, _ := usage["output_tokens"].(float64); int(got) != 29 {
t.Fatalf("expected output_tokens=29, got %#v", usage["output_tokens"])
}
if got, _ := usage["total_tokens"].(float64); int(got) != 40 {
t.Fatalf("expected total_tokens=40, got %#v", usage["total_tokens"])
if int(total) != int(input)+int(output) {
t.Fatalf("expected total_tokens=input_tokens+output_tokens, usage=%#v", usage)
}
}

View File

@@ -37,7 +37,6 @@ func TestGoCompatSSEFixtures(t *testing.T) {
Finished bool `json:"finished"`
NewType string `json:"new_type"`
ContentFilter bool `json:"content_filter"`
- OutputTokens int `json:"output_tokens"`
ErrorMessage string `json:"error_message"`
}
mustLoadJSON(t, expectedPath, &expected)
@@ -58,11 +57,10 @@ func TestGoCompatSSEFixtures(t *testing.T) {
res.Stop != expected.Finished ||
res.NextType != expected.NewType ||
res.ContentFilter != expected.ContentFilter ||
- res.OutputTokens != expected.OutputTokens ||
res.ErrorMessage != expected.ErrorMessage {
t.Fatalf("fixture %s mismatch:\n got parts=%#v finished=%v newType=%q contentFilter=%v outputTokens=%d errorMessage=%q\nwant parts=%#v finished=%v newType=%q contentFilter=%v outputTokens=%d errorMessage=%q",
name, gotParts, res.Stop, res.NextType, res.ContentFilter, res.OutputTokens, res.ErrorMessage,
expected.Parts, expected.Finished, expected.NewType, expected.ContentFilter, expected.OutputTokens, expected.ErrorMessage)
t.Fatalf("fixture %s mismatch:\n got parts=%#v finished=%v newType=%q contentFilter=%v errorMessage=%q\nwant parts=%#v finished=%v newType=%q contentFilter=%v errorMessage=%q",
name, gotParts, res.Stop, res.NextType, res.ContentFilter, res.ErrorMessage,
expected.Parts, expected.Finished, expected.NewType, expected.ContentFilter, expected.ErrorMessage)
}
}
}

View File

@@ -443,70 +443,10 @@ function hasContentFilterStatusValue(v) {
}
function extractAccumulatedTokenUsage(chunk) {
- const usage = findAccumulatedTokenUsage(chunk);
- return usage || { prompt: 0, output: 0 };
- }
- function findAccumulatedTokenUsage(v) {
- if (Array.isArray(v)) {
- for (const item of v) {
- const u = findAccumulatedTokenUsage(item);
- if (u) return u;
- }
- return null;
- }
- if (!v || typeof v !== 'object') {
- return null;
- }
- const pathValue = asString(v.p);
- if (pathValue && pathValue.toLowerCase().includes('accumulated_token_usage')) {
- const n = toInt(v.v);
- if (n > 0) {
- return { prompt: 0, output: n };
- }
- }
- if (pathValue && pathValue.toLowerCase().includes('token_usage')) {
- const u = v.v;
- if (u && typeof u === 'object') {
- const p = toInt(u.prompt_tokens);
- const c = toInt(u.completion_tokens);
- if (p > 0 || c > 0) {
- return { prompt: p, output: c };
- }
- }
- }
- const direct = toInt(v.accumulated_token_usage);
- if (direct > 0) {
- return { prompt: 0, output: direct };
- }
- if (v.token_usage && typeof v.token_usage === 'object') {
- const p = toInt(v.token_usage.prompt_tokens);
- const c = toInt(v.token_usage.completion_tokens);
- if (p > 0 || c > 0) {
- return { prompt: p, output: c };
- }
- }
- for (const value of Object.values(v)) {
- const u = findAccumulatedTokenUsage(value);
- if (u) return u;
- }
- return null;
- }
- function toInt(v) {
- if (typeof v === 'number' && Number.isFinite(v)) {
- return Math.trunc(v);
- }
- if (typeof v === 'string' && v.trim() !== '') {
- const n = Number(v);
- if (Number.isFinite(n)) {
- return Math.trunc(n);
- }
- }
- if (typeof v !== 'number') {
- return 0;
- }
- return Number.isFinite(v) ? Math.trunc(v) : 0;
+ // Temporary policy: ignore the upstream usage fields (accumulated_token_usage / token_usage)
+ // and rely on internal token estimation only, to avoid drift from the upstream's cumulative accounting.
+ void chunk;
+ return { prompt: 0, output: 0 };
}
function formatErrorMessage(v) {

View File

@@ -125,8 +125,6 @@ async function handleVercelStream(req, res, rawBody, payload) {
let currentType = thinkingEnabled ? 'thinking' : 'text';
let thinkingText = '';
let outputText = '';
- let promptTokens = 0;
- let outputTokens = 0;
const toolSieveEnabled = toolPolicy.toolSieveEnabled;
const toolSieveState = createToolSieveState();
let toolCallsEmitted = false;
@@ -179,7 +177,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
created,
model,
choices: [{ delta: {}, index: 0, finish_reason: reason }],
- usage: buildUsage(finalPrompt, thinkingText, outputText, outputTokens, promptTokens),
+ usage: buildUsage(finalPrompt, thinkingText, outputText),
});
if (!res.writableEnded && !res.destroyed) {
res.write('data: [DONE]\n\n');
@@ -228,12 +226,6 @@ async function handleVercelStream(req, res, rawBody, payload) {
if (!parsed.parsed) {
continue;
}
- if (parsed.promptTokens > 0) {
- promptTokens = parsed.promptTokens;
- }
- if (parsed.outputTokens > 0) {
- outputTokens = parsed.outputTokens;
- }
currentType = parsed.newType;
if (parsed.errorMessage) {
await finish('content_filter');

View File

@@ -12,8 +12,6 @@ import (
type CollectResult struct {
Text string
Thinking string
- PromptTokens int
- OutputTokens int
ContentFilter bool
}
@@ -29,8 +27,6 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
}
text := strings.Builder{}
thinking := strings.Builder{}
- promptTokens := 0
- outputTokens := 0
contentFilter := false
currentType := "text"
if thinkingEnabled {
@@ -42,12 +38,6 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
if !result.Parsed {
return true
}
- if result.PromptTokens > 0 {
- promptTokens = result.PromptTokens
- }
- if result.OutputTokens > 0 {
- outputTokens = result.OutputTokens
- }
if result.Stop {
if result.ContentFilter {
contentFilter = true
@@ -68,8 +58,6 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
return CollectResult{
Text: text.String(),
Thinking: thinking.String(),
- PromptTokens: promptTokens,
- OutputTokens: outputTokens,
ContentFilter: contentFilter,
}
}
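Callers that previously read `PromptTokens`/`OutputTokens` off the result now estimate from the collected content instead. A sketch of the post-change calling pattern (import paths assumed from the module layout):

```go
package sketch

import (
	"net/http"

	"ds2api/internal/sse"  // assumed path
	"ds2api/internal/util" // assumed path
)

// collectAndEstimate shows the post-change pattern: CollectResult carries
// only content plus the content-filter flag, so output tokens come from
// the caller's own estimation over Thinking and Text.
func collectAndEstimate(resp *http.Response) int {
	result := sse.CollectStream(resp, true /* thinkingEnabled */, true /* closeBody */)
	return util.EstimateTokens(result.Thinking) + util.EstimateTokens(result.Text)
}
```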

View File

@@ -10,8 +10,6 @@ type LineResult struct {
ErrorMessage string
Parts []ContentPart
NextType string
- PromptTokens int
- OutputTokens int
}
// ParseDeepSeekContentLine centralizes one-line DeepSeek SSE parsing for both
@@ -21,9 +19,8 @@ func ParseDeepSeekContentLine(raw []byte, thinkingEnabled bool, currentType stri
if !parsed {
return LineResult{NextType: currentType}
}
- promptTokens, outputTokens := extractAccumulatedTokenUsage(chunk)
if done {
- return LineResult{Parsed: true, Stop: true, NextType: currentType, PromptTokens: promptTokens, OutputTokens: outputTokens}
+ return LineResult{Parsed: true, Stop: true, NextType: currentType}
}
if errObj, hasErr := chunk["error"]; hasErr {
return LineResult{
@@ -31,8 +28,6 @@ func ParseDeepSeekContentLine(raw []byte, thinkingEnabled bool, currentType stri
Stop: true,
ErrorMessage: fmt.Sprintf("%v", errObj),
NextType: currentType,
- PromptTokens: promptTokens,
- OutputTokens: outputTokens,
}
}
if code, _ := chunk["code"].(string); code == "content_filter" {
@@ -41,8 +36,6 @@ func ParseDeepSeekContentLine(raw []byte, thinkingEnabled bool, currentType stri
Stop: true,
ContentFilter: true,
NextType: currentType,
- PromptTokens: promptTokens,
- OutputTokens: outputTokens,
}
}
if hasContentFilterStatus(chunk) {
@@ -51,18 +44,14 @@ func ParseDeepSeekContentLine(raw []byte, thinkingEnabled bool, currentType stri
Stop: true,
ContentFilter: true,
NextType: currentType,
- PromptTokens: promptTokens,
- OutputTokens: outputTokens,
}
}
parts, finished, nextType := ParseSSEChunkForContent(chunk, thinkingEnabled, currentType)
parts = filterLeakedContentFilterParts(parts)
return LineResult{
- Parsed: true,
- Stop: finished,
- Parts: parts,
- NextType: nextType,
- PromptTokens: promptTokens,
- OutputTokens: outputTokens,
+ Parsed: true,
+ Stop: finished,
+ Parts: parts,
+ NextType: nextType,
}
}
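To make the slimmed-down contract concrete: a consumer loop now only cares about `Parsed`, `Stop`, `Parts`, and `NextType`; there is no usage field left to read. A sketch (import path assumed from the module layout):

```go
package sketch

import (
	"bufio"

	"ds2api/internal/sse" // assumed path
)

// drainStream sketches how a handler consumes the reduced LineResult:
// token accounting no longer happens here at all; it is derived later
// from the accumulated content.
func drainStream(scanner *bufio.Scanner) (parts []sse.ContentPart) {
	currentType := "thinking"
	for scanner.Scan() {
		res := sse.ParseDeepSeekContentLine(scanner.Bytes(), true, currentType)
		if !res.Parsed {
			continue
		}
		currentType = res.NextType
		parts = append(parts, res.Parts...)
		if res.Stop {
			break
		}
	}
	return parts
}
```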

View File

@@ -26,7 +26,7 @@ func TestParseDeepSeekContentLineContentFilter(t *testing.T) {
}
}
- func TestParseDeepSeekContentLineContentFilterCodeIncludesOutputTokens(t *testing.T) {
+ func TestParseDeepSeekContentLineContentFilterCodeStops(t *testing.T) {
res := ParseDeepSeekContentLine(
[]byte(`data: {"code":"content_filter","accumulated_token_usage":99}`),
false, "text",
@@ -34,9 +34,6 @@ func TestParseDeepSeekContentLineContentFilterCodeIncludesOutputTokens(t *testin
if !res.Parsed || !res.Stop || !res.ContentFilter {
t.Fatalf("expected content-filter stop result: %#v", res)
}
- if res.OutputTokens != 99 {
- t.Fatalf("expected output token usage 99, got %d", res.OutputTokens)
- }
}
func TestParseDeepSeekContentLineContentFilterStatus(t *testing.T) {
@@ -46,28 +43,25 @@ func TestParseDeepSeekContentLineContentFilterStatus(t *testing.T) {
}
}
- func TestParseDeepSeekContentLineCapturesAccumulatedTokenUsage(t *testing.T) {
+ func TestParseDeepSeekContentLineIgnoresAccumulatedTokenUsage(t *testing.T) {
res := ParseDeepSeekContentLine([]byte(`data: {"p":"response","o":"BATCH","v":[{"p":"accumulated_token_usage","v":1383},{"p":"quasi_status","v":"FINISHED"}]}`), false, "text")
- if res.OutputTokens != 1383 {
- t.Fatalf("expected output token usage 1383, got %d", res.OutputTokens)
+ if !res.Parsed {
+ t.Fatalf("expected parsed result")
}
}
- func TestParseDeepSeekContentLineCapturesAccumulatedTokenUsageString(t *testing.T) {
+ func TestParseDeepSeekContentLineIgnoresAccumulatedTokenUsageString(t *testing.T) {
res := ParseDeepSeekContentLine([]byte(`data: {"p":"response","o":"BATCH","v":[{"p":"accumulated_token_usage","v":"190"},{"p":"quasi_status","v":"FINISHED"}]}`), false, "text")
- if res.OutputTokens != 190 {
- t.Fatalf("expected output token usage 190, got %d", res.OutputTokens)
+ if !res.Parsed {
+ t.Fatalf("expected parsed result")
}
}
- func TestParseDeepSeekContentLineErrorIncludesOutputTokens(t *testing.T) {
+ func TestParseDeepSeekContentLineErrorStops(t *testing.T) {
res := ParseDeepSeekContentLine([]byte(`data: {"error":"boom","accumulated_token_usage":123}`), false, "text")
if !res.Parsed || !res.Stop {
t.Fatalf("expected stop on error: %#v", res)
}
- if res.OutputTokens != 123 {
- t.Fatalf("expected output token usage 123 on error, got %d", res.OutputTokens)
- }
}
func TestParseDeepSeekContentLineContent(t *testing.T) {

View File

@@ -3,8 +3,6 @@ package sse
import (
"bytes"
"encoding/json"
"math"
"strconv"
"strings"
"ds2api/internal/deepseek"
@@ -363,86 +361,3 @@ func hasContentFilterStatusValue(v any) bool {
}
return false
}
- func extractAccumulatedTokenUsage(chunk map[string]any) (int, int) {
- return findAccumulatedTokenUsage(chunk)
- }
- func findAccumulatedTokenUsage(v any) (int, int) {
- switch x := v.(type) {
- case map[string]any:
- if p, _ := x["p"].(string); strings.Contains(strings.ToLower(p), "accumulated_token_usage") {
- if n, ok := toInt(x["v"]); ok && n > 0 {
- return 0, n
- }
- }
- if p, _ := x["p"].(string); strings.Contains(strings.ToLower(p), "token_usage") {
- if m, ok := x["v"].(map[string]any); ok {
- p, _ := toInt(m["prompt_tokens"])
- c, _ := toInt(m["completion_tokens"])
- if p > 0 || c > 0 {
- return p, c
- }
- }
- }
- if n, ok := toInt(x["accumulated_token_usage"]); ok && n > 0 {
- return 0, n
- }
- if usage, ok := x["token_usage"].(map[string]any); ok {
- p, _ := toInt(usage["prompt_tokens"])
- c, _ := toInt(usage["completion_tokens"])
- if p > 0 || c > 0 {
- return p, c
- }
- }
- for _, vv := range x {
- if p, c := findAccumulatedTokenUsage(vv); p > 0 || c > 0 {
- return p, c
- }
- }
- case []any:
- for _, item := range x {
- if p, c := findAccumulatedTokenUsage(item); p > 0 || c > 0 {
- return p, c
- }
- }
- }
- return 0, 0
- }
- func toInt(v any) (int, bool) {
- switch x := v.(type) {
- case int:
- return x, true
- case int32:
- return int(x), true
- case int64:
- return int(x), true
- case float64:
- if math.IsNaN(x) || math.IsInf(x, 0) {
- return 0, false
- }
- return int(x), true
- case json.Number:
- i, err := x.Int64()
- if err != nil {
- return 0, false
- }
- return int(i), true
- case string:
- s := strings.TrimSpace(x)
- if s == "" {
- return 0, false
- }
- if i, err := strconv.Atoi(s); err == nil {
- return i, true
- }
- f, err := strconv.ParseFloat(s, 64)
- if err != nil || math.IsNaN(f) || math.IsInf(f, 0) {
- return 0, false
- }
- return int(f), true
- default:
- return 0, false
- }
- }

View File

@@ -19,20 +19,6 @@ func TestParseDeepSeekSSELineDone(t *testing.T) {
}
}
- func TestExtractTokenUsage(t *testing.T) {
- chunk := map[string]any{
- "p": "response/token_usage",
- "v": map[string]any{
- "prompt_tokens": 123,
- "completion_tokens": 456,
- },
- }
- p, c := extractAccumulatedTokenUsage(chunk)
- if p != 123 || c != 456 {
- t.Fatalf("expected 123/456, got %d/%d", p, c)
- }
- }
func TestParseSSEChunkForContentSimple(t *testing.T) {
parts, finished, _ := ParseSSEChunkForContent(map[string]any{"v": "hello"}, false, "text")
if finished {

View File

@@ -1,134 +0,0 @@
package sse
import (
"bufio"
"encoding/json"
"errors"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
)
func TestRawStreamSamplesTokenReplay(t *testing.T) {
root := filepath.Join("..", "..", "tests", "raw_stream_samples")
entries, err := os.ReadDir(root)
if err != nil {
t.Fatalf("read samples root: %v", err)
}
found := 0
for _, entry := range entries {
if !entry.IsDir() {
continue
}
ssePath := filepath.Join(root, entry.Name(), "upstream.stream.sse")
if _, err := os.Stat(ssePath); err != nil {
continue
}
found++
t.Run(entry.Name(), func(t *testing.T) {
raw, err := os.ReadFile(ssePath)
if err != nil {
t.Fatalf("read sample: %v", err)
}
parsedTokens, expectedTokens, err := replayAndCollectTokens(string(raw))
if err != nil {
t.Fatalf("replay token collection failed: %v", err)
}
if expectedTokens <= 0 {
t.Fatalf("expected positive token usage from raw stream, got %d", expectedTokens)
}
if parsedTokens != expectedTokens {
t.Fatalf("token mismatch parsed=%d expected=%d", parsedTokens, expectedTokens)
}
})
}
if found == 0 {
t.Fatalf("no upstream.stream.sse samples found under %s", root)
}
}
func replayAndCollectTokens(raw string) (parsedTokens int, expectedTokens int, err error) {
currentType := "thinking"
scanner := bufio.NewScanner(strings.NewReader(raw))
scanner.Buffer(make([]byte, 0, 64*1024), 2*1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if !strings.HasPrefix(line, "data:") {
continue
}
payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if payload == "" || payload == "[DONE]" || !strings.HasPrefix(payload, "{") {
continue
}
var chunk map[string]any
if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
continue
}
if n := rawAccumulatedTokenUsage(chunk); n > 0 {
expectedTokens = n
}
res := ParseDeepSeekContentLine([]byte(line), true, currentType)
currentType = res.NextType
if res.OutputTokens > 0 {
parsedTokens = res.OutputTokens
}
}
if scanErr := scanner.Err(); scanErr != nil {
if errors.Is(scanErr, bufio.ErrTooLong) {
return 0, 0, errors.New("raw stream line exceeds 2MiB scanner limit")
}
return 0, 0, scanErr
}
return parsedTokens, expectedTokens, nil
}
func rawAccumulatedTokenUsage(v any) int {
switch x := v.(type) {
case []any:
for _, item := range x {
if n := rawAccumulatedTokenUsage(item); n > 0 {
return n
}
}
case map[string]any:
if n := rawToInt(x["accumulated_token_usage"]); n > 0 {
return n
}
if p, _ := x["p"].(string); strings.Contains(strings.ToLower(strings.TrimSpace(p)), "accumulated_token_usage") {
if n := rawToInt(x["v"]); n > 0 {
return n
}
}
for _, vv := range x {
if n := rawAccumulatedTokenUsage(vv); n > 0 {
return n
}
}
}
return 0
}
func rawToInt(v any) int {
switch x := v.(type) {
case float64:
return int(x)
case int:
return x
case string:
s := strings.TrimSpace(x)
if s == "" {
return 0
}
if n, err := strconv.Atoi(s); err == nil {
return n
}
if f, err := strconv.ParseFloat(s, 64); err == nil {
return int(f)
}
}
return 0
}

View File

@@ -3,6 +3,6 @@
"finished": true,
"new_type": "text",
"content_filter": true,
"output_tokens": 77,
"output_tokens": 0,
"error_message": ""
}

View File

@@ -248,7 +248,7 @@ test('parseChunkForContent strips reference markers from fragment content', () =
assert.deepEqual(parsed.parts, [{ text: '广州天气 多云', type: 'text' }]);
});
- test('parseChunkForContent detects content_filter status and carries output tokens', () => {
+ test('parseChunkForContent detects content_filter status and ignores upstream output tokens', () => {
const chunk = {
p: 'response',
v: [
@@ -260,7 +260,7 @@ test('parseChunkForContent detects content_filter status and carries output toke
assert.equal(parsed.parsed, true);
assert.equal(parsed.finished, true);
assert.equal(parsed.contentFilter, true);
- assert.equal(parsed.outputTokens, 77);
+ assert.equal(parsed.outputTokens, 0);
assert.deepEqual(parsed.parts, []);
});
@@ -275,11 +275,11 @@ test('parseChunkForContent keeps error branches distinct from content_filter sta
assert.equal(parsed.finished, true);
assert.equal(parsed.contentFilter, false);
assert.equal(parsed.errorMessage.length > 0, true);
- assert.equal(parsed.outputTokens, 88);
+ assert.equal(parsed.outputTokens, 0);
assert.deepEqual(parsed.parts, []);
});
- test('parseChunkForContent preserves output tokens on FINISHED lines', () => {
+ test('parseChunkForContent ignores output tokens on FINISHED lines', () => {
const parsed = parseChunkForContent(
{ p: 'response/status', v: 'FINISHED', accumulated_token_usage: 190 },
false,
@@ -288,11 +288,11 @@ test('parseChunkForContent preserves output tokens on FINISHED lines', () => {
assert.equal(parsed.parsed, true);
assert.equal(parsed.finished, true);
assert.equal(parsed.contentFilter, false);
- assert.equal(parsed.outputTokens, 190);
+ assert.equal(parsed.outputTokens, 0);
assert.deepEqual(parsed.parts, []);
});
- test('parseChunkForContent captures output tokens from response BATCH status snapshots', () => {
+ test('parseChunkForContent ignores output tokens from response BATCH status snapshots', () => {
const parsed = parseChunkForContent(
{
p: 'response',
@@ -308,7 +308,7 @@ test('parseChunkForContent captures output tokens from response BATCH status sna
assert.equal(parsed.parsed, true);
assert.equal(parsed.finished, false);
assert.equal(parsed.contentFilter, false);
- assert.equal(parsed.outputTokens, 190);
+ assert.equal(parsed.outputTokens, 0);
assert.deepEqual(parsed.parts, []);
});
@@ -321,7 +321,7 @@ test('parseChunkForContent matches FINISHED case-insensitively on status paths',
assert.equal(parsed.parsed, true);
assert.equal(parsed.finished, true);
assert.equal(parsed.contentFilter, false);
- assert.equal(parsed.outputTokens, 190);
+ assert.equal(parsed.outputTokens, 0);
assert.deepEqual(parsed.parts, []);
});
@@ -334,7 +334,7 @@ test('parseChunkForContent filters INCOMPLETE status text without stopping strea
assert.equal(parsed.parsed, true);
assert.equal(parsed.finished, false);
assert.equal(parsed.contentFilter, false);
- assert.equal(parsed.outputTokens, 190);
+ assert.equal(parsed.outputTokens, 0);
assert.deepEqual(parsed.parts, []);
});

View File

@@ -31,7 +31,6 @@ test('js compat: sse fixtures', () => {
assert.equal(got.finished, expected.finished, `${name}: finished mismatch`);
assert.equal(got.newType, expected.new_type, `${name}: newType mismatch`);
assert.equal(Boolean(got.contentFilter), Boolean(expected.content_filter), `${name}: contentFilter mismatch`);
- assert.equal(Number(got.outputTokens || 0), Number(expected.output_tokens || 0), `${name}: outputTokens mismatch`);
assert.equal(got.errorMessage || '', expected.error_message || '', `${name}: errorMessage mismatch`);
}
});

View File

@@ -20,7 +20,7 @@ function parseArgs(argv) {
failOnReferenceLeak: true,
failOnMissingFinish: true,
failOnBaselineMismatch: true,
- failOnTokenMismatch: true,
+ failOnTokenMismatch: false,
showOutput: false,
writeReplayText: false,
};
@@ -44,6 +44,8 @@ function parseArgs(argv) {
out.failOnMissingFinish = false;
} else if (a === '--no-fail-on-baseline-mismatch' || a === '--no-fail-on-processed-mismatch') {
out.failOnBaselineMismatch = false;
+ } else if (a === '--fail-on-token-mismatch') {
+ out.failOnTokenMismatch = true;
} else if (a === '--no-fail-on-token-mismatch') {
out.failOnTokenMismatch = false;
} else if (a === '--show-output') {