Merge pull request #205 from CJackHwang/dev

Merge pull request #204 from CJackHwang/codex/capture-raw-stream-data-with-search-model

Skip dynamic DeepSeek fragment status paths to avoid FINISHED leakage; add simulator, samples and tests
This commit is contained in:
CJACK.
2026-04-03 13:08:19 +08:00
committed by GitHub
14 changed files with 1412 additions and 2 deletions

View File

@@ -0,0 +1,82 @@
# DeepSeek SSE 流格式字段分析(2026-04-03)
> 日期:2026-04-03(UTC)
>
> 样本:`tests/raw_stream_samples/guangzhou-weather-reasoner-search-20260403/upstream.stream.sse`
>
> 模型:`deepseek-reasoner-search`(搜索 + 思考)
## 1. SSE 事件层结构
原始流由标准 SSE 帧组成,常见形态:
```text
event: <type>
data: <json or text>
```
样本中主要 `event` 类型:
- `ready`:流建立后返回请求/响应消息 ID。
- `update_session`:会话时间戳更新。
- `finish`:流式阶段结束。
- (无 `event` 时)默认为 message 事件,`data:` 中承载主要增量数据。
## 2. `data` JSON 常见字段
上游增量主体多为 JSON Patch 风格对象:
- `p`(path):字段路径,如 `response/fragments/-1/content`
- `o`(op,可选):操作类型,常见 `SET` / `APPEND` / `BATCH`
- `v`(value):字符串、布尔、对象、数组都可能
示例(语义):
- `{"p":"response/fragments/-1/content","o":"APPEND","v":"..."}`
- `{"p":"response/fragments/-16/status","v":"FINISHED"}`
- `{"p":"response/status","o":"SET","v":"FINISHED"}`
## 3. 搜索+思考场景关键路径
### 3.1 文本内容
- `response/fragments/<idx>/content`
- `response/content`
- `response/thinking_content`
- `response/fragments`(`APPEND` + fragment 数组)
### 3.2 搜索相关
- `response/fragments/<idx>/results`(检索结果数组)
- `response/search_status`(检索状态,建议跳过展示)
### 3.3 状态相关(重点)
- `response/status = FINISHED`**最终结束信号**(需要保留用于结束判定)
- `response/fragments/<idx>/status = FINISHED`**分片级状态**(高频,建议跳过输出)
- `response/quasi_status`:过程状态(建议跳过输出)
## 4. 泄露问题根因FINISHED 重复)
在搜索 + 思考模型中,`response/fragments/<idx>/status` 会出现大量不同 `<idx>`(例如 `-1/-2/-3/-16...`)的 `FINISHED`
若只过滤固定少量索引(例如仅 `-1/-2/-3`),其他索引的状态会当普通文本透传,导致前端出现:
- `FINISHEDFINISHEDFINISHED...`
## 5. 适配建议(已落地)
1. 跳过所有 `response/fragments/-?\d+/status`
2. 继续保留 `response/status=FINISHED` 作为真正结束判定。
3. 通过独立仿真工具持续回放全部样本,作为回归门禁:
```bash
./tests/scripts/run-raw-stream-sim.sh
```
## 6. 后续扩展建议
- 增加不同模型(`deepseek-chat-search` / 非 search / 非 thinking)样本。
- 增加异常样本限流、中断、content_filter、空结果
- 为仿真报告加入字段覆盖率统计(路径频次、事件频次、终止路径命中率)。

View File

@@ -226,6 +226,17 @@ node --test tests/node/stream-tool-sieve.test.js
go run ./cmd/ds2api-tests --no-preflight
```
### 运行原始流仿真(独立工具)
```bash
./tests/scripts/run-raw-stream-sim.sh
```
说明:
- 该工具会重放 `tests/raw_stream_samples` 下全部样本,按上游 SSE 顺序做 1:1 仿真解析。
- 默认校验不出现 `FINISHED` 文本泄露,并要求存在结束信号。
- 结果会写入 `artifacts/raw-stream-sim/*.json`,可供其他测试脚本或排障流程复用。
### 指定输出目录和超时
```bash

View File

@@ -59,8 +59,9 @@ async function handler(req, res) {
return;
}
// Keep all non-stream behavior on Go side to avoid compatibility regressions.
if (!toBool(payload.stream)) {
// Keep all non-stream behavior and non-OpenAI-chat paths on Go side to avoid
// protocol-shape regressions (e.g. Gemini/Claude clients expecting their own formats).
if (!toBool(payload.stream) || !isNodeStreamSupportedPath(req.url || '')) {
await proxyToGo(req, res, rawBody);
return;
}
@@ -76,6 +77,23 @@ function isVercelRuntime() {
return asString(process.env.VERCEL) !== '' || asString(process.env.NOW_REGION) !== '';
}
// Only the OpenAI chat-completions endpoint is served by the Node streaming
// path; every other route (Gemini/Claude-shaped, non-stream, etc.) must fall
// through to the Go backend to keep protocol shapes intact.
function isNodeStreamSupportedPath(rawURL) {
  return extractPathname(rawURL) === '/v1/chat/completions';
}
// Return the path portion of a request URL, dropping any query string.
// Empty/non-string input yields '' (asString handles the coercion).
function extractPathname(rawURL) {
  const text = asString(rawURL);
  if (!text) {
    return '';
  }
  const queryStart = text.indexOf('?');
  return queryStart === -1 ? text : text.slice(0, queryStart);
}
module.exports = handler;
module.exports.__test = {
@@ -89,4 +107,6 @@ module.exports.__test = {
boolDefaultTrue,
filterIncrementalToolCallDeltasByAllowed,
estimateTokens,
isNodeStreamSupportedPath,
extractPathname,
};

View File

@@ -193,6 +193,9 @@ function extractContentRecursive(items, defaultType) {
}
function shouldSkipPath(pathValue) {
if (isFragmentStatusPath(pathValue)) {
return true;
}
if (SKIP_EXACT_PATHS.has(pathValue)) {
return true;
}
@@ -204,6 +207,13 @@ function shouldSkipPath(pathValue) {
return false;
}
// True for per-fragment status paths such as `response/fragments/-16/status`
// (any positive or negative index). The top-level `response/status` path is
// explicitly excluded so finish detection still observes it.
function isFragmentStatusPath(pathValue) {
  const fragmentStatusPattern = /^response\/fragments\/-?\d+\/status$/i;
  if (!pathValue) {
    return false;
  }
  return pathValue !== 'response/status' && fragmentStatusPattern.test(pathValue);
}
function isCitation(text) {
return asString(text).trim().startsWith('[citation:');
}
@@ -225,5 +235,6 @@ module.exports = {
parseChunkForContent,
extractContentRecursive,
shouldSkipPath,
isFragmentStatusPath,
isCitation,
};

View File

@@ -31,6 +31,9 @@ func ParseDeepSeekSSELine(raw []byte) (map[string]any, bool, bool) {
}
func shouldSkipPath(path string) bool {
if isFragmentStatusPath(path) {
return true
}
if _, ok := deepseek.SkipExactPathSet[path]; ok {
return true
}
@@ -42,6 +45,31 @@ func shouldSkipPath(path string) bool {
return false
}
// isFragmentStatusPath reports whether path is a per-fragment status path
// such as "response/fragments/-16/status" (any positive or negative index).
// The top-level "response/status" path is excluded so that the finish
// detection logic still sees it.
func isFragmentStatusPath(path string) bool {
	switch {
	case path == "", path == "response/status":
		return false
	case !strings.HasPrefix(path, "response/fragments/"):
		return false
	case !strings.HasSuffix(path, "/status"):
		return false
	}
	index := strings.TrimSuffix(strings.TrimPrefix(path, "response/fragments/"), "/status")
	index = strings.TrimPrefix(index, "-")
	if index == "" {
		return false
	}
	// Require the remaining segment to be pure ASCII digits.
	for i := 0; i < len(index); i++ {
		if index[i] < '0' || index[i] > '9' {
			return false
		}
	}
	return true
}
func ParseSSEChunkForContent(chunk map[string]any, thinkingEnabled bool, currentFragmentType string) ([]ContentPart, bool, string) {
v, ok := chunk["v"]
if !ok {

View File

@@ -90,6 +90,15 @@ func TestShouldSkipPathFragmentStatus(t *testing.T) {
if !shouldSkipPath("response/fragments/-3/status") {
t.Fatal("expected skip for fragment -3 status")
}
if !shouldSkipPath("response/fragments/-16/status") {
t.Fatal("expected skip for fragment -16 status")
}
if !shouldSkipPath("response/fragments/7/status") {
t.Fatal("expected skip for fragment 7 status")
}
if shouldSkipPath("response/status") {
t.Fatal("expected response/status to be handled by finish logic, not skipped")
}
}
func TestShouldSkipPathRegularContent(t *testing.T) {

View File

@@ -17,6 +17,9 @@ const {
normalizePreparedToolNames,
boolDefaultTrue,
filterIncrementalToolCallDeltasByAllowed,
shouldSkipPath,
isNodeStreamSupportedPath,
extractPathname,
} = handler.__test;
test('chat-stream exposes parser test hooks', () => {
@@ -218,3 +221,21 @@ test('parseChunkForContent supports wrapped response.fragments object shape', ()
assert.equal(parsed.finished, false);
assert.equal(parsed.parts.map((p) => p.text).join(''), 'AB');
});
// Regression guard for the FINISHED leakage bug: every fragment-index status
// path must be skipped, while the top-level response/status stays visible
// for finish detection.
test('shouldSkipPath skips dynamic response/fragments/*/status paths only', () => {
assert.equal(shouldSkipPath('response/fragments/-16/status'), true);
assert.equal(shouldSkipPath('response/fragments/8/status'), true);
assert.equal(shouldSkipPath('response/status'), false);
});
// Only the OpenAI chat-completions route may use the Node streaming path;
// Gemini/Claude-shaped routes must proxy to the Go backend.
test('node stream path guard only allows /v1/chat/completions', () => {
assert.equal(isNodeStreamSupportedPath('/v1/chat/completions'), true);
assert.equal(isNodeStreamSupportedPath('/v1/chat/completions?x=1'), true);
assert.equal(isNodeStreamSupportedPath('/v1beta/models/gemini-2.5-flash:streamGenerateContent'), false);
assert.equal(isNodeStreamSupportedPath('/anthropic/v1/messages'), false);
});
// extractPathname must drop only the query string; colons and other
// path characters (e.g. Gemini ":streamGenerateContent") are preserved.
test('extractPathname strips query only', () => {
assert.equal(extractPathname('/v1/chat/completions?stream=true'), '/v1/chat/completions');
assert.equal(extractPathname('/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=1'), '/v1beta/models/gemini-2.5-flash:streamGenerateContent');
});

View File

@@ -0,0 +1,28 @@
# 原始流数据样本目录
该目录用于存放**上游真实 SSE 原始流**样本,供本地仿真测试和解析适配使用。
## 目录规范
每个样本一个子目录:
- `meta.json`:样本元信息(问题、模型、采集时间、备注)
- `upstream.stream.sse`:完整原始 SSE 文本(`event:` / `data:` 行)
## 扩展方式
1. 抓取一次真实请求(建议开启 `DS2API_DEV_PACKET_CAPTURE=1`)。
2. 新建 `<sample-id>/` 目录并放入 `meta.json` + `upstream.stream.sse`
3. 运行独立仿真工具(可被其他测试脚本调用):
```bash
./tests/scripts/run-raw-stream-sim.sh
```
该工具会自动遍历本目录全部样本,按真实流顺序重放并验证:
- 不会把上游 `status=FINISHED` 片段当正文输出(防泄露)。
- 能正确检测 `response/status=FINISHED` 流结束信号。
- 生成可归档 JSON 报告(`artifacts/raw-stream-sim/`)。
> 注意:样本可能包含搜索结果正文与引用信息,请勿放入敏感账号/密钥。

View File

@@ -0,0 +1,55 @@
# 样本分析(广州天气 / deepseek-reasoner-search)
- 样本来源:`/admin/dev/captures` 上游原始 SSE 抓包
- 采集时间UTC2026-04-03 01:28:50
- 原始字节数41043
- `FINISHED` 字符串出现次数24
- JSON `data:` chunk 数420
## 事件分布
- `ready`: 1
- `update_session`: 2
- `finish`: 1
## 高频路径Top 12
- `response/fragments/-1/content`: 13
- `response/fragments/-1`: 9
- `response`: 5
- `response/has_pending_fragment`: 4
- `response/fragments/-1/elapsed_secs`: 3
- `response/fragments/-5/status`: 2
- `response/fragments/-6/status`: 2
- `response/fragments/-3/status`: 2
- `response/fragments/-1/status`: 2
- `response/fragments/-4/status`: 2
- `response/fragments/-2/status`: 2
- `response/fragments/-5/results`: 1
## 关键泄露来源
以下状态路径会高频出现 `v=FINISHED`,如果解析器按普通文本透传,就会出现 `FINISHEDFINISHED...` 泄露:
- `response/fragments/-5/status`: 2
- `response/fragments/-6/status`: 2
- `response/fragments/-3/status`: 2
- `response/fragments/-1/status`: 2
- `response/fragments/-4/status`: 2
- `response/fragments/-2/status`: 2
- `response/fragments/-14/status`: 1
- `response/fragments/-12/status`: 1
- `response/fragments/-10/status`: 1
- `response/fragments/-9/status`: 1
- `response/fragments/-8/status`: 1
- `response/fragments/-7/status`: 1
- `response/fragments/-11/status`: 1
- `response/fragments/-16/status`: 1
- `response/fragments/-13/status`: 1
- `response/fragments/-15/status`: 1
## 适配建议
1. 跳过 `response/fragments/<index>/status`(所有 index而非仅 `-1/-2/-3`)。
2. 保留 `response/status=FINISHED` 用于结束流判定,不应当输出正文。
3. 在样本仿真测试中对全部样本执行“不得输出 `FINISHED`”断言。

View File

@@ -0,0 +1,25 @@
{
"sample_id": "guangzhou-weather-reasoner-search-20260403",
"captured_at_utc": "2026-04-03T01:28:50Z",
"request": {
"model": "deepseek-reasoner-search",
"stream": true,
"messages": [
{
"role": "user",
"content": "广州天气"
}
],
"thinking_enabled": true,
"search_enabled": true
},
"capture": {
"label": "deepseek_completion",
"url": "https://chat.deepseek.com/api/v0/chat/completion",
"status_code": 200,
"response_bytes": 41043,
"contains_finished_token": true,
"finished_token_count": 24
},
"notes": "Captured from upstream DeepSeek SSE via /admin/dev/captures with packet capture enabled. Account ID removed."
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,98 @@
#!/usr/bin/env bash
# Capture one real upstream DeepSeek SSE stream and store it as a replayable
# sample under tests/raw_stream_samples/<sample-id>/.
#
# Usage: capture-raw-stream.sh [config.json] [sample-id] [question] [model] [api-key]
# Env:   DS2API_ADMIN_KEY — admin key for /admin/dev/captures (default: "admin")
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)"
cd "$ROOT_DIR"

CONFIG_PATH="${1:-config.json}"
SAMPLE_ID="${2:-sample-$(date -u +%Y%m%dT%H%M%SZ)}"
QUESTION="${3:-广州天气}"
MODEL="${4:-deepseek-reasoner-search}"
API_KEY="${5:-}"
ADMIN_KEY="${DS2API_ADMIN_KEY:-admin}"

# Fall back to the first key in the config file when arg5 is omitted.
if [[ -z "$API_KEY" ]]; then
API_KEY="$(python3 - <<'PY' "$CONFIG_PATH"
import json,sys
cfg=json.load(open(sys.argv[1]))
keys=cfg.get('keys') or []
print(keys[0] if keys else '')
PY
)"
fi
if [[ -z "$API_KEY" ]]; then
echo "[capture] missing API key (pass as arg5 or set config.keys[0])" >&2
exit 1
fi

OUT_DIR="tests/raw_stream_samples/${SAMPLE_ID}"
mkdir -p "$OUT_DIR"

# Always tear the background server down, even on failure.
cleanup() {
pkill -f "cmd/ds2api" >/dev/null 2>&1 || true
}
trap cleanup EXIT

# Start the server with upstream packet capture enabled so the raw SSE body
# is retrievable from /admin/dev/captures afterwards.
DS2API_CONFIG_PATH="$CONFIG_PATH" \
DS2API_ADMIN_KEY="$ADMIN_KEY" \
DS2API_DEV_PACKET_CAPTURE=1 \
DS2API_DEV_PACKET_CAPTURE_LIMIT=20 \
go run ./cmd/ds2api >/tmp/ds2api_capture_server.log 2>&1 &

# Wait up to 120s for the server, then fail hard instead of issuing requests
# against a dead endpoint (previously the script continued silently here).
SERVER_READY=0
for _ in $(seq 1 120); do
if curl -sSf http://127.0.0.1:5001/healthz >/dev/null 2>&1; then
SERVER_READY=1
break
fi
sleep 1
done
if [[ "$SERVER_READY" != "1" ]]; then
echo "[capture] server did not become healthy within 120s; see /tmp/ds2api_capture_server.log" >&2
exit 1
fi

REQUEST_BODY="$(python3 - <<'PY' "$MODEL" "$QUESTION"
import json,sys
model,question=sys.argv[1:3]
payload={
'model':model,
'stream':True,
'messages':[{'role':'user','content':question}],
}
print(json.dumps(payload, ensure_ascii=False))
PY
)"

# Issue one streaming request — this is what triggers the upstream capture.
curl -sS http://127.0.0.1:5001/v1/chat/completions \
-H 'Content-Type: application/json' \
-H "Authorization: Bearer ${API_KEY}" \
--data-binary "${REQUEST_BODY}" \
>"${OUT_DIR}/openai.stream.sse"

curl -sS http://127.0.0.1:5001/admin/dev/captures \
-H "Authorization: Bearer ${ADMIN_KEY}" \
>"${OUT_DIR}/captures.json"

# Extract the largest captured body (the completion stream) and write the
# sample files: upstream.stream.sse + meta.json.
python3 - <<'PY' "$OUT_DIR" "$SAMPLE_ID" "$QUESTION" "$MODEL"
import json,sys,pathlib,datetime
out=pathlib.Path(sys.argv[1])
sample_id,question,model=sys.argv[2:5]
captures=json.loads((out/'captures.json').read_text())
items=captures.get('items') or []
if not items:
    raise SystemExit('no captured upstream stream found')
best=max(items,key=lambda x:len((x.get('response_body') or '')))
raw=best.get('response_body') or ''
(out/'upstream.stream.sse').write_text(raw)
meta={
    'sample_id':sample_id,
    # timezone-aware now(); datetime.utcnow() is deprecated since Python 3.12
    'captured_at_utc':datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
    'request':{'model':model,'stream':True,'messages':[{'role':'user','content':question}]},
    'capture':{
        'label':best.get('label'),'url':best.get('url'),'status_code':best.get('status_code'),
        'response_bytes':len(raw),'contains_finished_token':('FINISHED' in raw),'finished_token_count':raw.count('FINISHED')
    }
}
(out/'meta.json').write_text(json.dumps(meta,ensure_ascii=False,indent=2))
print(f'[capture] wrote sample to {out}')
print(f'[capture] upstream bytes={len(raw)} finished_count={raw.count("FINISHED")}')
PY

# captures.json may contain other requests' bodies; remove it after extraction.
rm -f "${OUT_DIR}/captures.json"
echo "[capture] done: ${OUT_DIR}"

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Replay every captured raw SSE sample through the Node parser and archive a
# timestamped JSON report under artifacts/raw-stream-sim/. Extra arguments
# are forwarded to the simulator unchanged.
set -euo pipefail

repo_root="$(cd "$(dirname "$0")/../.." && pwd)"
cd "$repo_root"

report_dir="artifacts/raw-stream-sim"
report_path="${report_dir}/report-$(date -u +%Y%m%dT%H%M%SZ).json"
mkdir -p "$report_dir"

node tests/tools/deepseek-sse-simulator.mjs \
  --samples-root tests/raw_stream_samples \
  --report "$report_path" \
  "$@"

echo "[run-raw-stream-sim] report: $report_path"

View File

@@ -0,0 +1,158 @@
#!/usr/bin/env node
import fs from 'node:fs';
import path from 'node:path';
import process from 'node:process';
import { createRequire } from 'node:module';
const require = createRequire(import.meta.url);
const chatStream = require('../../api/chat-stream.js');
const { parseChunkForContent } = chatStream.__test;
/**
 * Parse simulator CLI flags.
 *
 * Supported flags:
 *   --samples-root <dir>          root directory containing sample folders
 *   --report <file>               write the JSON report to this path
 *   --no-fail-on-leak             do not fail when FINISHED text leaks
 *   --no-fail-on-missing-finish   do not fail when no finish signal is seen
 *
 * Unknown flags are ignored; value-taking flags missing their value are
 * ignored as well.
 */
function parseArgs(argv) {
  const options = {
    samplesRoot: 'tests/raw_stream_samples',
    reportPath: '',
    failOnLeak: true,
    failOnMissingFinish: true,
  };
  let i = 2;
  while (i < argv.length) {
    const flag = argv[i];
    const value = argv[i + 1];
    if (flag === '--samples-root' && value) {
      options.samplesRoot = value;
      i += 1;
    } else if (flag === '--report' && value) {
      options.reportPath = value;
      i += 1;
    } else if (flag === '--no-fail-on-leak') {
      options.failOnLeak = false;
    } else if (flag === '--no-fail-on-missing-finish') {
      options.failOnMissingFinish = false;
    }
    i += 1;
  }
  return options;
}
/**
 * List sample directories under `root`, sorted by path.
 * A sample directory is any immediate subdirectory that contains an
 * `upstream.stream.sse` file. Returns [] when the root does not exist.
 */
function findSampleDirs(root) {
  if (!fs.existsSync(root)) {
    return [];
  }
  const sampleDirs = [];
  for (const entry of fs.readdirSync(root)) {
    const fullPath = path.join(root, entry);
    if (!fs.statSync(fullPath).isDirectory()) {
      continue;
    }
    if (!fs.existsSync(path.join(fullPath, 'upstream.stream.sse'))) {
      continue;
    }
    sampleDirs.push(fullPath);
  }
  return sampleDirs.sort();
}
/**
 * Split a raw SSE stream into `{ event, payload }` records.
 *
 * Frames are separated by blank lines; `event:` sets the type (default
 * "message") and multiple `data:` lines are joined with '\n'. Fix over the
 * previous version: frames that carry an explicit event type but no data
 * line (e.g. a bare `event: finish`) are now emitted with an empty payload
 * instead of being dropped — otherwise a data-less finish frame was
 * invisible to replaySample's finish detection. Data-less default
 * ("message") frames, such as comment-only blocks, are still skipped.
 */
function parseSSE(raw) {
  const events = [];
  for (const block of raw.split(/\r?\n\r?\n/)) {
    if (!block.trim()) {
      continue;
    }
    let eventType = 'message';
    const dataLines = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith('event:')) {
        eventType = line.slice(6).trim() || 'message';
      } else if (line.startsWith('data:')) {
        dataLines.push(line.slice(5).trimStart());
      }
    }
    if (dataLines.length === 0) {
      if (eventType !== 'message') {
        events.push({ event: eventType, payload: '' });
      }
      continue;
    }
    events.push({ event: eventType, payload: dataLines.join('\n').trim() });
  }
  return events;
}
/**
 * Replay one raw upstream SSE sample through the production parser
 * (parseChunkForContent) in stream order and summarize the outcome.
 *
 * Returns counts of frames and parsed JSON chunks, whether a finish signal
 * was observed (either a `finish` event or a chunk the parser flags as
 * finished), and whether the literal string "FINISHED" leaked into the
 * emitted text — the regression this simulator exists to catch.
 */
function replaySample(raw) {
  const events = parseSSE(raw);
  let fragmentType = 'thinking';
  let finished = false;
  let emitted = '';
  let jsonChunks = 0;
  for (const { event, payload } of events) {
    if (event === 'finish') {
      finished = true;
    }
    // Only JSON object payloads go through the parser; [DONE] markers and
    // plain-text payloads are ignored, as in production.
    const looksLikeJSON = Boolean(payload) && payload !== '[DONE]' && payload[0] === '{';
    if (!looksLikeJSON) {
      continue;
    }
    let chunk;
    try {
      chunk = JSON.parse(payload);
    } catch {
      continue;
    }
    jsonChunks += 1;
    const parsed = parseChunkForContent(chunk, true, fragmentType);
    fragmentType = parsed.newType;
    finished = finished || parsed.finished;
    emitted += parsed.parts.map((part) => part.text).join('');
  }
  return {
    events: events.length,
    parsedChunks: jsonChunks,
    sawFinish: finished,
    leakedFinishedText: emitted.includes('FINISHED'),
    outputChars: emitted.length,
  };
}
/**
 * CLI entry point: replay every sample, optionally write a JSON report,
 * print a one-line verdict per sample, and exit non-zero on failure
 * (1 = no samples found, 2 = at least one sample failed its checks).
 */
function main() {
  const opts = parseArgs(process.argv);
  const sampleDirs = findSampleDirs(opts.samplesRoot);
  if (sampleDirs.length === 0) {
    console.error(`[sim] no samples found: ${opts.samplesRoot}`);
    process.exit(1);
  }
  const report = {
    generated_at: new Date().toISOString(),
    samples_root: opts.samplesRoot,
    total: sampleDirs.length,
    failed: 0,
    samples: [],
  };
  for (const sampleDir of sampleDirs) {
    const streamPath = path.join(sampleDir, 'upstream.stream.sse');
    const result = replaySample(fs.readFileSync(streamPath, 'utf8'));
    const errors = [];
    if (opts.failOnMissingFinish && !result.sawFinish) {
      errors.push('missing finish signal');
    }
    if (opts.failOnLeak && result.leakedFinishedText) {
      errors.push('FINISHED leaked into output text');
    }
    if (errors.length > 0) {
      report.failed += 1;
    }
    report.samples.push({
      sample_id: path.basename(sampleDir),
      ...result,
      ok: errors.length === 0,
      errors,
    });
  }
  if (opts.reportPath) {
    fs.writeFileSync(opts.reportPath, JSON.stringify(report, null, 2));
  }
  for (const sample of report.samples) {
    const label = sample.ok ? 'OK' : 'FAIL';
    const suffix = sample.errors.length > 0 ? ` errors=${sample.errors.join(';')}` : '';
    console.log(`[sim] ${label} ${sample.sample_id} events=${sample.events} parsed=${sample.parsedChunks} chars=${sample.outputChars}${suffix}`);
  }
  if (report.failed > 0) {
    console.error(`[sim] ${report.failed}/${report.total} samples failed`);
    process.exit(2);
  }
  console.log(`[sim] all ${report.total} samples passed`);
}
main();