mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-11 03:37:40 +08:00
Promote raw stream replay into standalone simulator tool and add SSE field doc
This commit is contained in:
@@ -17,6 +17,7 @@ const {
|
||||
normalizePreparedToolNames,
|
||||
boolDefaultTrue,
|
||||
filterIncrementalToolCallDeltasByAllowed,
|
||||
shouldSkipPath,
|
||||
} = handler.__test;
|
||||
|
||||
test('chat-stream exposes parser test hooks', () => {
|
||||
@@ -218,3 +219,9 @@ test('parseChunkForContent supports wrapped response.fragments object shape', ()
|
||||
assert.equal(parsed.finished, false);
|
||||
assert.equal(parsed.parts.map((p) => p.text).join(''), 'AB');
|
||||
});
|
||||
|
||||
test('shouldSkipPath skips dynamic response/fragments/*/status paths only', () => {
  // Fragment-status paths (any index, negative or positive) must be skipped;
  // the top-level response status path must pass through.
  const cases = [
    ['response/fragments/-16/status', true],
    ['response/fragments/8/status', true],
    ['response/status', false],
  ];
  for (const [p, expected] of cases) {
    assert.equal(shouldSkipPath(p), expected);
  }
});
|
||||
|
||||
28
tests/raw_stream_samples/README.md
Normal file
28
tests/raw_stream_samples/README.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# 原始流数据样本目录
|
||||
|
||||
该目录用于存放**上游真实 SSE 原始流**样本,供本地仿真测试和解析适配使用。
|
||||
|
||||
## 目录规范
|
||||
|
||||
每个样本一个子目录:
|
||||
|
||||
- `meta.json`:样本元信息(问题、模型、采集时间、备注)
|
||||
- `upstream.stream.sse`:完整原始 SSE 文本(`event:` / `data:` 行)
|
||||
|
||||
## 扩展方式
|
||||
|
||||
1. 抓取一次真实请求(建议开启 `DS2API_DEV_PACKET_CAPTURE=1`)。
|
||||
2. 新建 `<sample-id>/` 目录并放入 `meta.json` + `upstream.stream.sse`。
|
||||
3. 运行独立仿真工具(可被其他测试脚本调用):
|
||||
|
||||
```bash
|
||||
./tests/scripts/run-raw-stream-sim.sh
|
||||
```
|
||||
|
||||
该工具会自动遍历本目录全部样本,按真实流顺序重放并验证:
|
||||
|
||||
- 不会把上游 `status=FINISHED` 片段当正文输出(防泄露)。
|
||||
- 能正确检测 `response/status=FINISHED` 流结束信号。
|
||||
- 生成可归档 JSON 报告(`artifacts/raw-stream-sim/`)。
|
||||
|
||||
> 注意:样本可能包含搜索结果正文与引用信息,请勿放入敏感账号/密钥。
|
||||
@@ -0,0 +1,55 @@
|
||||
# 样本分析(广州天气 / deepseek-reasoner-search)
|
||||
|
||||
- 样本来源:`/admin/dev/captures` 上游原始 SSE 抓包
|
||||
- 采集时间(UTC):2026-04-03 01:28:50
|
||||
- 原始字节数:41043
|
||||
- `FINISHED` 字符串出现次数:24
|
||||
- JSON `data:` chunk 数:420
|
||||
|
||||
## 事件分布
|
||||
|
||||
- `ready`: 1
|
||||
- `update_session`: 2
|
||||
- `finish`: 1
|
||||
|
||||
## 高频路径(Top 12)
|
||||
|
||||
- `response/fragments/-1/content`: 13
|
||||
- `response/fragments/-1`: 9
|
||||
- `response`: 5
|
||||
- `response/has_pending_fragment`: 4
|
||||
- `response/fragments/-1/elapsed_secs`: 3
|
||||
- `response/fragments/-5/status`: 2
|
||||
- `response/fragments/-6/status`: 2
|
||||
- `response/fragments/-3/status`: 2
|
||||
- `response/fragments/-1/status`: 2
|
||||
- `response/fragments/-4/status`: 2
|
||||
- `response/fragments/-2/status`: 2
|
||||
- `response/fragments/-5/results`: 1
|
||||
|
||||
## 关键泄露来源
|
||||
|
||||
以下状态路径会高频出现 `v=FINISHED`,如果解析器按普通文本透传,就会出现 `FINISHEDFINISHED...` 泄露:
|
||||
|
||||
- `response/fragments/-5/status`: 2
|
||||
- `response/fragments/-6/status`: 2
|
||||
- `response/fragments/-3/status`: 2
|
||||
- `response/fragments/-1/status`: 2
|
||||
- `response/fragments/-4/status`: 2
|
||||
- `response/fragments/-2/status`: 2
|
||||
- `response/fragments/-14/status`: 1
|
||||
- `response/fragments/-12/status`: 1
|
||||
- `response/fragments/-10/status`: 1
|
||||
- `response/fragments/-9/status`: 1
|
||||
- `response/fragments/-8/status`: 1
|
||||
- `response/fragments/-7/status`: 1
|
||||
- `response/fragments/-11/status`: 1
|
||||
- `response/fragments/-16/status`: 1
|
||||
- `response/fragments/-13/status`: 1
|
||||
- `response/fragments/-15/status`: 1
|
||||
|
||||
## 适配建议
|
||||
|
||||
1. 跳过 `response/fragments/<index>/status`(所有 index,而非仅 `-1/-2/-3`)。
|
||||
2. 保留 `response/status=FINISHED` 用于结束流判定,不应当输出正文。
|
||||
3. 在样本仿真测试中对全部样本执行“不得输出 `FINISHED`”断言。
|
||||
@@ -0,0 +1,25 @@
|
||||
{
|
||||
"sample_id": "guangzhou-weather-reasoner-search-20260403",
|
||||
"captured_at_utc": "2026-04-03T01:28:50Z",
|
||||
"request": {
|
||||
"model": "deepseek-reasoner-search",
|
||||
"stream": true,
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "广州天气"
|
||||
}
|
||||
],
|
||||
"thinking_enabled": true,
|
||||
"search_enabled": true
|
||||
},
|
||||
"capture": {
|
||||
"label": "deepseek_completion",
|
||||
"url": "https://chat.deepseek.com/api/v0/chat/completion",
|
||||
"status_code": 200,
|
||||
"response_bytes": 41043,
|
||||
"contains_finished_token": true,
|
||||
"finished_token_count": 24
|
||||
},
|
||||
"notes": "Captured from upstream DeepSeek SSE via /admin/dev/captures with packet capture enabled. Account ID removed."
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
86
tests/scripts/capture-raw-stream-sample.sh
Executable file
86
tests/scripts/capture-raw-stream-sample.sh
Executable file
@@ -0,0 +1,86 @@
|
||||
#!/usr/bin/env bash
# Capture a real upstream DeepSeek SSE stream as a reusable test sample.
#
# Usage:
#   capture-raw-stream-sample.sh [config.json] [sample-id] [question] [model] [api-key]
#
# Environment:
#   DS2API_ADMIN_KEY  admin key for /admin/dev/captures (default: admin)
#
# Starts a local ds2api server with packet capture enabled, performs one
# streaming chat request, downloads the captured upstream stream, and writes
# <sample-dir>/{upstream.stream.sse,meta.json,openai.stream.sse}.
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)"
cd "$ROOT_DIR"

CONFIG_PATH="${1:-config.json}"
SAMPLE_ID="${2:-sample-$(date -u +%Y%m%dT%H%M%SZ)}"
QUESTION="${3:-广州天气}"
MODEL="${4:-deepseek-reasoner-search}"
API_KEY="${5:-}"
ADMIN_KEY="${DS2API_ADMIN_KEY:-admin}"

# Fall back to the first configured key when no API key was passed.
if [[ -z "$API_KEY" ]]; then
  API_KEY="$(python3 - <<'PY' "$CONFIG_PATH"
import json,sys
cfg=json.load(open(sys.argv[1]))
keys=cfg.get('keys') or []
print(keys[0] if keys else '')
PY
)"
fi

if [[ -z "$API_KEY" ]]; then
  echo "[capture] missing API key (pass as arg5 or set config.keys[0])" >&2
  exit 1
fi

OUT_DIR="tests/raw_stream_samples/${SAMPLE_ID}"
mkdir -p "$OUT_DIR"

cleanup() {
  pkill -f "cmd/ds2api" >/dev/null 2>&1 || true
}
trap cleanup EXIT

DS2API_CONFIG_PATH="$CONFIG_PATH" \
DS2API_ADMIN_KEY="$ADMIN_KEY" \
DS2API_DEV_PACKET_CAPTURE=1 \
DS2API_DEV_PACKET_CAPTURE_LIMIT=20 \
go run ./cmd/ds2api >/tmp/ds2api_capture_server.log 2>&1 &

# Wait for the server to come up; fail fast instead of curling a dead server.
SERVER_UP=0
for _ in $(seq 1 120); do
  if curl -sSf http://127.0.0.1:5001/healthz >/dev/null 2>&1; then
    SERVER_UP=1
    break
  fi
  sleep 1
done
if [[ "$SERVER_UP" != "1" ]]; then
  echo "[capture] server did not become healthy; see /tmp/ds2api_capture_server.log" >&2
  exit 1
fi

# Build the request body with python3 so QUESTION/MODEL are JSON-escaped
# (raw shell interpolation breaks on quotes/backslashes in the question).
REQUEST_BODY="$(python3 - <<'PY' "$MODEL" "$QUESTION"
import json,sys
model,question=sys.argv[1:3]
print(json.dumps({'model':model,'stream':True,'messages':[{'role':'user','content':question}]},ensure_ascii=False))
PY
)"

# Best-effort: the downstream copy is informational; the upstream capture
# below is the artifact we actually need.
curl -sS http://127.0.0.1:5001/v1/chat/completions \
  -H 'Content-Type: application/json' \
  -H "Authorization: Bearer ${API_KEY}" \
  -d "$REQUEST_BODY" \
  >"${OUT_DIR}/openai.stream.sse" || true

curl -sS http://127.0.0.1:5001/admin/dev/captures \
  -H "Authorization: Bearer ${ADMIN_KEY}" \
  >"${OUT_DIR}/captures.json"

# Extract the largest captured upstream body and write sample metadata.
python3 - <<'PY' "$OUT_DIR" "$SAMPLE_ID" "$QUESTION" "$MODEL"
import json,sys,pathlib,datetime
out=pathlib.Path(sys.argv[1])
sample_id,question,model=sys.argv[2:5]
captures=json.loads((out/'captures.json').read_text())
items=captures.get('items') or []
if not items:
    raise SystemExit('no captured upstream stream found')
best=max(items,key=lambda x:len((x.get('response_body') or '')))
raw=best.get('response_body') or ''
(out/'upstream.stream.sse').write_text(raw)
meta={
  'sample_id':sample_id,
  'captured_at_utc':datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
  'request':{'model':model,'stream':True,'messages':[{'role':'user','content':question}]},
  'capture':{
    'label':best.get('label'),'url':best.get('url'),'status_code':best.get('status_code'),
    'response_bytes':len(raw),'contains_finished_token':('FINISHED' in raw),'finished_token_count':raw.count('FINISHED')
  }
}
(out/'meta.json').write_text(json.dumps(meta,ensure_ascii=False,indent=2))
print(f'[capture] wrote sample to {out}')
print(f'[capture] upstream bytes={len(raw)} finished_count={raw.count("FINISHED")}')
PY

rm -f "${OUT_DIR}/captures.json"
echo "[capture] done: ${OUT_DIR}"
|
||||
16
tests/scripts/run-raw-stream-sim.sh
Executable file
16
tests/scripts/run-raw-stream-sim.sh
Executable file
@@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env bash
# Replay all captured raw upstream SSE samples through the simulator and
# archive a timestamped JSON report under artifacts/raw-stream-sim/.
# Any extra arguments are forwarded to the simulator unchanged.
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)"
cd "$ROOT_DIR"

REPORT_DIR="artifacts/raw-stream-sim"
REPORT_PATH="${REPORT_DIR}/report-$(date -u +%Y%m%dT%H%M%SZ).json"
mkdir -p "$REPORT_DIR"

node tests/tools/deepseek-sse-simulator.mjs \
  --samples-root tests/raw_stream_samples \
  --report "$REPORT_PATH" \
  "$@"

echo "[run-raw-stream-sim] report: $REPORT_PATH"
|
||||
158
tests/tools/deepseek-sse-simulator.mjs
Executable file
158
tests/tools/deepseek-sse-simulator.mjs
Executable file
@@ -0,0 +1,158 @@
|
||||
#!/usr/bin/env node
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import process from 'node:process';
|
||||
import { createRequire } from 'node:module';
|
||||
|
||||
const require = createRequire(import.meta.url);
|
||||
const chatStream = require('../../api/chat-stream.js');
|
||||
const { parseChunkForContent } = chatStream.__test;
|
||||
|
||||
/**
 * Parse CLI flags for the simulator.
 *
 * Supported flags:
 *   --samples-root <dir>          root directory containing sample subdirs
 *   --report <path>               where to write the JSON report (optional)
 *   --no-fail-on-leak             do not fail when FINISHED leaks into output
 *   --no-fail-on-missing-finish   do not fail when no finish signal is seen
 *
 * A value-taking flag with no following value is ignored, matching the
 * defaults below. Unknown flags are ignored.
 *
 * @param {string[]} argv - full process.argv (flags start at index 2)
 * @returns {{samplesRoot: string, reportPath: string,
 *            failOnLeak: boolean, failOnMissingFinish: boolean}}
 */
function parseArgs(argv) {
  const opts = {
    samplesRoot: 'tests/raw_stream_samples',
    reportPath: '',
    failOnLeak: true,
    failOnMissingFinish: true,
  };
  let idx = 2;
  while (idx < argv.length) {
    const flag = argv[idx];
    switch (flag) {
      case '--samples-root':
        if (argv[idx + 1]) {
          idx += 1;
          opts.samplesRoot = argv[idx];
        }
        break;
      case '--report':
        if (argv[idx + 1]) {
          idx += 1;
          opts.reportPath = argv[idx];
        }
        break;
      case '--no-fail-on-leak':
        opts.failOnLeak = false;
        break;
      case '--no-fail-on-missing-finish':
        opts.failOnMissingFinish = false;
        break;
      default:
        break;
    }
    idx += 1;
  }
  return opts;
}
|
||||
|
||||
/**
 * Discover sample directories under `root`.
 *
 * A valid sample is any direct subdirectory containing an
 * `upstream.stream.sse` file. Results are sorted for stable replay order.
 *
 * @param {string} root - samples root directory
 * @returns {string[]} sorted list of sample directory paths
 */
function findSampleDirs(root) {
  if (!fs.existsSync(root)) {
    return [];
  }
  const found = [];
  for (const name of fs.readdirSync(root)) {
    const full = path.join(root, name);
    if (!fs.statSync(full).isDirectory()) {
      continue;
    }
    if (fs.existsSync(path.join(full, 'upstream.stream.sse'))) {
      found.push(full);
    }
  }
  return found.sort();
}
|
||||
|
||||
/**
 * Split a raw SSE stream into { event, payload } records.
 *
 * Blocks are separated by blank lines; each block may carry an `event:`
 * line and any number of `data:` lines. Multiple `data:` lines are joined
 * with '\n' per the SSE format.
 *
 * Fix: a block with an `event:` line but no `data:` lines (e.g. a bare
 * `event: finish`) was previously dropped, which could make downstream
 * finish detection miss the stream-end signal. Such blocks are now emitted
 * with an empty payload. Blocks with neither line (comments/garbage) are
 * still skipped.
 *
 * @param {string} raw - complete SSE text
 * @returns {Array<{event: string, payload: string}>}
 */
function parseSSE(raw) {
  const events = [];
  for (const block of raw.split(/\r?\n\r?\n/)) {
    if (!block.trim()) {
      continue;
    }
    let eventType = 'message';
    let sawEventLine = false;
    const dataLines = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith('event:')) {
        sawEventLine = true;
        eventType = line.slice(6).trim() || 'message';
      } else if (line.startsWith('data:')) {
        dataLines.push(line.slice(5).trimStart());
      }
    }
    // Keep data-less named events; skip blocks with no event and no data.
    if (dataLines.length === 0 && !sawEventLine) {
      continue;
    }
    const payload = dataLines.join('\n').trim();
    events.push({ event: eventType, payload });
  }
  return events;
}
|
||||
|
||||
/**
 * Replay one captured upstream SSE stream through the production parser.
 *
 * Feeds every JSON `data:` chunk to parseChunkForContent in stream order,
 * carrying the rolling content type between chunks, and records:
 *  - whether a finish signal was seen (a `finish` SSE event, or a chunk
 *    the parser flags as finished),
 *  - whether the literal string 'FINISHED' leaked into the output text.
 *
 * @param {string} raw - complete upstream SSE text
 * @returns {{events: number, parsedChunks: number, sawFinish: boolean,
 *            leakedFinishedText: boolean, outputChars: number}}
 */
function replaySample(raw) {
  const events = parseSSE(raw);
  let contentType = 'thinking';
  let finishSeen = false;
  let collected = '';
  let chunkCount = 0;

  for (const evt of events) {
    if (evt.event === 'finish') {
      finishSeen = true;
    }
    const payload = evt.payload;
    // Only JSON object payloads go through the parser.
    if (!payload || payload === '[DONE]' || !payload.startsWith('{')) {
      continue;
    }
    let chunk;
    try {
      chunk = JSON.parse(payload);
    } catch {
      continue;
    }
    chunkCount += 1;
    const parsed = parseChunkForContent(chunk, true, contentType);
    contentType = parsed.newType;
    if (parsed.finished) {
      finishSeen = true;
    }
    collected += parsed.parts.map((part) => part.text).join('');
  }

  return {
    events: events.length,
    parsedChunks: chunkCount,
    sawFinish: finishSeen,
    leakedFinishedText: collected.includes('FINISHED'),
    outputChars: collected.length,
  };
}
|
||||
|
||||
/**
 * Entry point: replay every sample, build a report, optionally write it to
 * disk, print a per-sample summary, and exit non-zero on failure.
 *
 * Exit codes: 1 = no samples found, 2 = at least one sample failed checks.
 */
function main() {
  const opts = parseArgs(process.argv);
  const sampleDirs = findSampleDirs(opts.samplesRoot);
  if (sampleDirs.length === 0) {
    console.error(`[sim] no samples found: ${opts.samplesRoot}`);
    process.exit(1);
  }

  const report = {
    generated_at: new Date().toISOString(),
    samples_root: opts.samplesRoot,
    total: sampleDirs.length,
    failed: 0,
    samples: [],
  };

  for (const dir of sampleDirs) {
    const raw = fs.readFileSync(path.join(dir, 'upstream.stream.sse'), 'utf8');
    const result = replaySample(raw);

    const errors = [];
    if (opts.failOnMissingFinish && !result.sawFinish) {
      errors.push('missing finish signal');
    }
    if (opts.failOnLeak && result.leakedFinishedText) {
      errors.push('FINISHED leaked into output text');
    }
    if (errors.length > 0) {
      report.failed += 1;
    }

    report.samples.push({
      sample_id: path.basename(dir),
      ...result,
      ok: errors.length === 0,
      errors,
    });
  }

  if (opts.reportPath) {
    fs.writeFileSync(opts.reportPath, JSON.stringify(report, null, 2));
  }

  for (const sample of report.samples) {
    const label = sample.ok ? 'OK' : 'FAIL';
    const suffix = sample.errors.length > 0 ? ` errors=${sample.errors.join(';')}` : '';
    console.log(
      `[sim] ${label} ${sample.sample_id} events=${sample.events} parsed=${sample.parsedChunks} chars=${sample.outputChars}${suffix}`,
    );
  }

  if (report.failed > 0) {
    console.error(`[sim] ${report.failed}/${report.total} samples failed`);
    process.exit(2);
  }
  console.log(`[sim] all ${report.total} samples passed`);
}

main();
|
||||
Reference in New Issue
Block a user