Merge pull request #215 from CJackHwang/dev

fix: ignore INCOMPLETE status messages in SSE stream parsing to prevent stream interruption
This commit is contained in:
CJACK.
2026-04-06 00:31:18 +08:00
committed by GitHub
15 changed files with 709 additions and 1173 deletions

View File

@@ -163,7 +163,7 @@ cp config.example.json config.json
### 方式一:本地运行
**前置要求**Go 1.26+Node.js 20+(仅在需要构建 WebUI 时)
**前置要求**Go 1.26+Node.js `20.19+` 或 `22.12+`(仅在需要构建 WebUI 时)
```bash
# 1. 克隆仓库
@@ -178,7 +178,9 @@ cp config.example.json config.json
go run ./cmd/ds2api
```
默认监听地址:`http://localhost:5001`
默认本地访问地址:`http://127.0.0.1:5001`
服务实际绑定:`0.0.0.0:5001`,因此同一局域网设备通常也可以通过你的内网 IP 访问。
> **WebUI 自动构建**:本地首次启动时,若 `static/admin` 不存在,会自动尝试执行 `npm ci`(仅在缺少依赖时)和 `npm run build -- --outDir static/admin --emptyOutDir`(需要本机有 Node.js。你也可以手动构建`./scripts/build-webui.sh`

View File

@@ -163,7 +163,7 @@ Recommended per deployment mode:
### Option 1: Local Run
**Prerequisites**: Go 1.26+, Node.js 20+ (only if building WebUI locally)
**Prerequisites**: Go 1.26+, Node.js `20.19+` or `22.12+` (only if building WebUI locally)
```bash
# 1. Clone
@@ -178,7 +178,9 @@ cp config.example.json config.json
go run ./cmd/ds2api
```
Default URL: `http://localhost:5001`
Default local URL: `http://127.0.0.1:5001`
The server actually binds to `0.0.0.0:5001`, so devices on the same LAN can usually reach it through your private IP as well.
> **WebUI auto-build**: On first local startup, if `static/admin` is missing, DS2API will auto-run `npm ci` (only when dependencies are missing) and `npm run build -- --outDir static/admin --emptyOutDir` (requires Node.js). You can also build manually: `./scripts/build-webui.sh`

View File

@@ -9,7 +9,7 @@ Thanks for your interest in contributing to DS2API!
### Prerequisites
- Go 1.26+
- Node.js 20+ (for WebUI development)
- Node.js `20.19+` or `22.12+` (for WebUI development)
- npm (bundled with Node.js)
### Backend Development
@@ -25,7 +25,8 @@ cp config.example.json config.json
# 3. Run backend
go run ./cmd/ds2api
# Default: http://localhost:5001
# Local access: http://127.0.0.1:5001
# Actual bind: 0.0.0.0:5001, so LAN access is available via your private IP
```
### Frontend Development (WebUI)

View File

@@ -9,7 +9,7 @@
### 前置要求
- Go 1.26+
- Node.js 20+WebUI 开发时)
- Node.js `20.19+``22.12+`WebUI 开发时)
- npm随 Node.js 提供)
### 后端开发
@@ -25,7 +25,8 @@ cp config.example.json config.json
# 3. 启动后端
go run ./cmd/ds2api
# 默认监听 http://localhost:5001
# 本地访问 http://127.0.0.1:5001
# 实际绑定 0.0.0.0:5001可通过局域网 IP 访问
```
### 前端开发WebUI

View File

@@ -25,7 +25,7 @@ This guide covers all deployment methods for the current Go-based codebase.
| Dependency | Minimum Version | Notes |
| --- | --- | --- |
| Go | 1.26+ | Build backend |
| Node.js | 20+ | Only needed to build WebUI locally |
| Node.js | `20.19+` or `22.12+` | Only needed to build WebUI locally |
| npm | Bundled with Node.js | Install WebUI dependencies |
Config source (choose one):

View File

@@ -25,7 +25,7 @@
| 依赖 | 最低版本 | 说明 |
| --- | --- | --- |
| Go | 1.26+ | 编译后端 |
| Node.js | 20+ | 仅在需要本地构建 WebUI 时 |
| Node.js | `20.19+``22.12+` | 仅在需要本地构建 WebUI 时 |
| npm | 随 Node.js 提供 | 安装 WebUI 依赖 |
配置来源(任选其一):

View File

@@ -2,6 +2,7 @@
> 说明:本文基于当前仓库 `tests/raw_stream_samples/` 下全部 `upstream.stream.sse` 原始流样本整理而成,属于第三方逆向观察文档,不是官方协议。
> 当前 corpus 由 4 份原始流组成,覆盖搜索+引用、风控终态、Markdown 输出和空格敏感输出等行为。
> 补充:文末还会注明少量“当前实现已确认、但 corpus 尚未完整覆盖”的行为,例如长思考场景下的自动续写状态。
## 1. 样本覆盖
@@ -143,6 +144,12 @@ close
这类路径决定流是否结束、是否被风控、是否还有待处理片段。它们不应作为正文输出。
尤其是 `response/status` / `status` 这类路径上的字符串值,应被视为控制信号而不是文本 token。当前已确认需要特殊对待的值包括
- `FINISHED`:正常完成终态,应触发收口。
- `CONTENT_FILTER`:风控终态,应走拒答/模板分支。
- `WIP` / `INCOMPLETE` / `AUTO_CONTINUE`:未完成但可继续生成的中间状态,不应直接输出给客户端。
### 6.4 统计与进度路径
- `accumulated_token_usage`
@@ -221,6 +228,21 @@ close
{"event":"finish"}
```
### 8.3 自动续写中间态(实现补充)
这部分不是当前 corpus 的直接覆盖项,而是 2026-04-05 在长思考实测中观察到、且已在当前实现中兼容的行为:
1. 上游可能先把 `response/status` 或 envelope 内的 `response.status` 置为 `WIP` / `INCOMPLETE`
2. 有时还会伴随 `auto_continue: true`
3. 这表示当前轮输出尚未真正结束,客户端或代理层可以继续调用 continue 接口续写同一条回答。
4. 续写后的内容会承接之前的思考与正文,不应把前一轮状态值泄露成可见文本。
对第三方实现,建议把这一类状态统一当作“可继续的控制信号”:
- 可以据此决定是否继续拉取后续流。
- 不能把 `INCOMPLETE``WIP``AUTO_CONTINUE` 直接拼接到最终文本。
- `finish` 事件本身也不能单独说明回答已完全结束,仍要结合状态字段判断。
## 9. 文本重建规则
如果你的目标是把流重建成最终可见文本,必须遵守下面这些规则:
@@ -231,6 +253,7 @@ close
- 不要合并连续空格、换行或 Markdown 符号附近的空白。
- 不要把 `[reference:N]` 视为协议元数据,它在当前 corpus 里就是正文的一部分。
- 如果你要屏蔽引用标记,应当把它做成可配置的后处理,而不是在解析阶段硬删。
- `response/status` / `status` 路径上的状态字符串不应进入正文,即使它们不是终态。
这点对 Markdown、代码块、引用、表格都很关键。样本里已经证明`#``-``>``|` 这类符号后面的空格必须原样保留,否则渲染结果会变形。
@@ -249,8 +272,10 @@ parse SSE block
-> 识别 event
-> 解析 JSON payload
-> 更新状态树
-> 识别 status / quasi_status / auto_continue 等控制信号
-> 判定是否有可见文本
-> 追加到输出缓冲
-> 遇到 WIP / INCOMPLETE / AUTO_CONTINUE 时决定是否续写
-> 遇到 FINISHED / CONTENT_FILTER / finish 时收口
```
@@ -271,6 +296,10 @@ parse SSE block
- `message` 是主体承载层,`ready` / `update_session` / `finish` / `title` / `close` 是控制层。
- `fragment.type` 是可视化和工具链分层的关键,不应只靠 `p` 路径判断。
结合 2026-04-05 的长思考实测,还可以补充一条当前实现层面的结论:
- 长思考场景下,上游可能先给出 `INCOMPLETE` / `WIP` / `AUTO_CONTINUE` 状态,再通过 continue 链路续写;这些状态值本身不应作为正文输出。
## 12. 适用边界
本文是基于当前 corpus 的逆向说明,不是恒定协议。
@@ -278,6 +307,7 @@ parse SSE block
- 新模型可能增加新的 `p` 路径。
- 新版本可能增加新的 fragment.type。
- `CONTENT_FILTER` 的终态模板内容可能变化。
- 自动续写相关状态(如 `INCOMPLETE` / `AUTO_CONTINUE`)当前主要来自实测与实现兼容逻辑,后续字段形态仍可能变化。
- 解析器应当对未知字段、未知路径、未知事件保持容忍。
如果你要把这份说明用于实际开发,建议同时保留原始流样本、回放脚本和回归测试,不要只依赖本文。

View File

@@ -58,11 +58,22 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc
newType: currentType,
};
}
if (pathValue === 'response/status' && asString(chunk.v) === 'FINISHED') {
if (isStatusPath(pathValue)) {
if (isFinishedStatus(chunk.v)) {
return {
parsed: true,
parts: [],
finished: true,
contentFilter: false,
errorMessage: '',
outputTokens,
newType: currentType,
};
}
return {
parsed: true,
parts: [],
finished: true,
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
@@ -138,7 +149,7 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc
const val = chunk.v;
if (typeof val === 'string') {
if (val === 'FINISHED' && (!pathValue || pathValue === 'status')) {
if (isFinishedStatus(val) && (!pathValue || pathValue === 'status')) {
return {
parsed: true,
parts: [],
@@ -149,6 +160,17 @@ function parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenc
newType,
};
}
if (isStatusPath(pathValue)) {
return {
parsed: true,
parts: [],
finished: false,
contentFilter: false,
errorMessage: '',
outputTokens,
newType,
};
}
const content = asContentString(val, stripReferenceMarkers);
if (content) {
parts.push({ text: content, type: partType });
@@ -235,8 +257,11 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru
}
const itemPath = asString(it.p);
const itemV = it.v;
if (itemPath === 'status' && asString(itemV) === 'FINISHED') {
return { parts: [], finished: true };
if (isStatusPath(itemPath)) {
if (isFinishedStatus(itemV)) {
return { parts: [], finished: true };
}
continue;
}
if (shouldSkipPath(itemPath)) {
continue;
@@ -262,6 +287,9 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru
}
if (typeof itemV === 'string') {
if (isStatusPath(itemPath)) {
continue;
}
if (itemV && itemV !== 'FINISHED') {
const content = asContentString(itemV, stripReferenceMarkers);
if (content) {
@@ -304,6 +332,14 @@ function extractContentRecursive(items, defaultType, stripReferenceMarkers = tru
return { parts, finished: false };
}
function isStatusPath(pathValue) {
return pathValue === 'response/status' || pathValue === 'status';
}
function isFinishedStatus(value) {
return asString(value).toUpperCase() === 'FINISHED';
}
function filterLeakedContentFilterParts(parts) {
if (!Array.isArray(parts) || parts.length === 0) {
return parts;

View File

@@ -63,6 +63,16 @@ func TestParseDeepSeekContentLineContent(t *testing.T) {
}
}
func TestParseDeepSeekContentLineFiltersIncompleteStatusText(t *testing.T) {
res := ParseDeepSeekContentLine([]byte(`data: {"p":"response/status","v":"INCOMPLETE"}`), false, "text")
if !res.Parsed || res.Stop {
t.Fatalf("expected parsed non-stop result: %#v", res)
}
if len(res.Parts) != 0 {
t.Fatalf("expected INCOMPLETE status to be filtered, got %#v", res.Parts)
}
}
func TestParseDeepSeekContentLinePreservesSpaceOnlyChunk(t *testing.T) {
res := ParseDeepSeekContentLine([]byte(`data: {"v":" "}`), false, "text")
if !res.Parsed || res.Stop {

View File

@@ -79,9 +79,12 @@ func ParseSSEChunkForContent(chunk map[string]any, thinkingEnabled bool, current
if shouldSkipPath(path) {
return nil, false, currentFragmentType
}
if path == "response/status" {
if s, ok := v.(string); ok && s == "FINISHED" {
return nil, true, currentFragmentType
if isStatusPath(path) {
if s, ok := v.(string); ok {
if strings.EqualFold(strings.TrimSpace(s), "FINISHED") {
return nil, true, currentFragmentType
}
return nil, false, currentFragmentType
}
}
newType := currentFragmentType
@@ -184,6 +187,9 @@ func appendChunkValueContent(v any, partType string, newType *string, parts *[]C
if val == "FINISHED" && (path == "" || path == "status") {
return true
}
if isStatusPath(path) {
return false
}
appendContentPart(parts, val, partType)
case []any:
pp, finished := extractContentRecursive(val, partType)
@@ -241,6 +247,10 @@ func appendContentPart(parts *[]ContentPart, content, kind string) {
*parts = append(*parts, ContentPart{Text: content, Type: kind})
}
func isStatusPath(path string) bool {
return path == "response/status" || path == "status"
}
func extractContentRecursive(items []any, defaultType string) ([]ContentPart, bool) {
parts := make([]ContentPart, 0, len(items))
for _, it := range items {
@@ -253,10 +263,11 @@ func extractContentRecursive(items []any, defaultType string) ([]ContentPart, bo
if !hasV {
continue
}
if itemPath == "status" {
if s, ok := itemV.(string); ok && s == "FINISHED" {
if isStatusPath(itemPath) {
if s, ok := itemV.(string); ok && strings.EqualFold(strings.TrimSpace(s), "FINISHED") {
return nil, true
}
continue
}
if shouldSkipPath(itemPath) {
continue
@@ -282,6 +293,9 @@ func extractContentRecursive(items []any, defaultType string) ([]ContentPart, bo
}
switch v := itemV.(type) {
case string:
if isStatusPath(itemPath) {
continue
}
if v != "" && v != "FINISHED" {
parts = append(parts, ContentPart{Text: v, Type: partType})
}

View File

@@ -159,8 +159,8 @@ func TestParseSSEChunkForContentStatusNotFinished(t *testing.T) {
if finished {
t.Fatal("expected not finished for non-FINISHED status")
}
if len(parts) != 1 || parts[0].Text != "IN_PROGRESS" {
t.Fatalf("expected content for non-FINISHED status, got %#v", parts)
if len(parts) != 0 {
t.Fatalf("expected non-finished status to be filtered, got %#v", parts)
}
}

View File

@@ -264,10 +264,10 @@ async function buildWebui() {
}
// 启动后端开发模式go run无需预编译
async function startBackendDev() {
if (!checkGo()) throw new Error('未找到 Go请先安装 Go (https://go.dev/dl/)');
log.info(`启动后端go run... http://localhost:${CONFIG.port}`);
const proc = spawn('go', ['run', './cmd/ds2api'], {
async function startBackendDev() {
if (!checkGo()) throw new Error('未找到 Go请先安装 Go (https://go.dev/dl/)');
log.info(`启动后端go run... 本地 http://127.0.0.1:${CONFIG.port} 绑定 0.0.0.0:${CONFIG.port}`);
const proc = spawn('go', ['run', './cmd/ds2api'], {
cwd: __dirname,
stdio: 'inherit',
shell: true,
@@ -284,13 +284,13 @@ async function startBackendDev() {
}
// 启动后端(生产模式:运行编译好的二进制)
async function startBackendProd() {
if (!binaryExists()) {
log.warn('未找到编译产物,正在编译...');
await buildBackend();
}
log.info(`启动后端(二进制)... http://localhost:${CONFIG.port}`);
const proc = spawn(BINARY, [], {
async function startBackendProd() {
if (!binaryExists()) {
log.warn('未找到编译产物,正在编译...');
await buildBackend();
}
log.info(`启动后端(二进制)... 本地 http://127.0.0.1:${CONFIG.port} 绑定 0.0.0.0:${CONFIG.port}`);
const proc = spawn(BINARY, [], {
cwd: __dirname,
stdio: 'inherit',
shell: false,
@@ -323,13 +323,14 @@ async function startFrontend() {
}
// 显示状态信息
function showStatus() {
console.log('\n' + '─'.repeat(50));
log.success(`后端 API: http://localhost:${CONFIG.port}`);
log.success(`管理界面: http://localhost:${CONFIG.port}/admin`);
if (existsSync(CONFIG.webuiDir)) {
log.success(`前端 Dev: http://localhost:${CONFIG.frontendPort}`);
}
function showStatus() {
console.log('\n' + '─'.repeat(50));
log.success(`后端 API: http://127.0.0.1:${CONFIG.port}`);
log.success(`管理界面: http://127.0.0.1:${CONFIG.port}/admin`);
log.info(`后端绑定: 0.0.0.0:${CONFIG.port} (可通过局域网 IP 访问)`);
if (existsSync(CONFIG.webuiDir)) {
log.success(`前端 Dev: http://localhost:${CONFIG.frontendPort}`);
}
console.log('─'.repeat(50));
log.info('按 Ctrl+C 停止所有服务\n');
}

View File

@@ -291,6 +291,32 @@ test('parseChunkForContent preserves output tokens on FINISHED lines', () => {
assert.deepEqual(parsed.parts, []);
});
test('parseChunkForContent matches FINISHED case-insensitively on status paths', () => {
const parsed = parseChunkForContent(
{ p: 'response/status', v: ' finished ', accumulated_token_usage: 190 },
false,
'text',
);
assert.equal(parsed.parsed, true);
assert.equal(parsed.finished, true);
assert.equal(parsed.contentFilter, false);
assert.equal(parsed.outputTokens, 190);
assert.deepEqual(parsed.parts, []);
});
test('parseChunkForContent filters INCOMPLETE status text without stopping stream', () => {
const parsed = parseChunkForContent(
{ p: 'response/status', v: 'INCOMPLETE', accumulated_token_usage: 190 },
false,
'text',
);
assert.equal(parsed.parsed, true);
assert.equal(parsed.finished, false);
assert.equal(parsed.contentFilter, false);
assert.equal(parsed.outputTokens, 190);
assert.deepEqual(parsed.parts, []);
});
test('parseChunkForContent strips leaked CONTENT_FILTER suffix and preserves line breaks', () => {
const leaked = parseChunkForContent(
{ p: 'response/content', v: '正常输出CONTENT_FILTER你好这个问题我暂时无法回答' },

1675
webui/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -17,10 +17,10 @@
"tailwind-merge": "^3.4.0"
},
"devDependencies": {
"@vitejs/plugin-react": "^4.2.1",
"@vitejs/plugin-react": "^6.0.1",
"autoprefixer": "^10.4.24",
"postcss": "^8.5.6",
"tailwindcss": "^3.4.19",
"vite": "^5.0.0"
"vite": "^8.0.3"
}
}