mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-03 16:05:26 +08:00
Compare commits
18 Commits
v3.0.0_bet
...
v3.1.0_bet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
068f4b0df6 | ||
|
|
5a51045ba4 | ||
|
|
3497d5d019 | ||
|
|
95a9d16843 | ||
|
|
0847091864 | ||
|
|
c6340354ec | ||
|
|
6bf08e00cd | ||
|
|
35221002d5 | ||
|
|
4b1f1ea550 | ||
|
|
0258f83d10 | ||
|
|
da912f87bf | ||
|
|
6b32d84222 | ||
|
|
e1df5c8636 | ||
|
|
f23382ff5f | ||
|
|
fabdba48c3 | ||
|
|
a28e833f33 | ||
|
|
ec1be468ca | ||
|
|
2d62c658f8 |
82
docs/DeepSeekSSE流格式字段分析-2026-04-03.md
Normal file
82
docs/DeepSeekSSE流格式字段分析-2026-04-03.md
Normal file
@@ -0,0 +1,82 @@
|
||||
# DeepSeek SSE 流格式字段分析(2026-04-03)
|
||||
|
||||
> 日期:2026-04-03(UTC)
|
||||
>
|
||||
> 样本:`tests/raw_stream_samples/guangzhou-weather-reasoner-search-20260403/upstream.stream.sse`
|
||||
>
|
||||
> 模型:`deepseek-reasoner-search`(搜索 + 思考)
|
||||
|
||||
## 1. SSE 事件层结构
|
||||
|
||||
原始流由标准 SSE 帧组成,常见形态:
|
||||
|
||||
```text
|
||||
event: <type>
|
||||
data: <json or text>
|
||||
|
||||
```
|
||||
|
||||
样本中主要 `event` 类型:
|
||||
|
||||
- `ready`:流建立后返回请求/响应消息 ID。
|
||||
- `update_session`:会话时间戳更新。
|
||||
- `finish`:流式阶段结束。
|
||||
- (无 `event` 时)默认为 message 事件,`data:` 中承载主要增量数据。
|
||||
|
||||
## 2. `data` JSON 常见字段
|
||||
|
||||
上游增量主体多为 JSON Patch 风格对象:
|
||||
|
||||
- `p`(path):字段路径,如 `response/fragments/-1/content`。
|
||||
- `o`(op,可选):操作类型,常见 `SET` / `APPEND` / `BATCH`。
|
||||
- `v`(value):值(字符串、布尔、对象、数组都可能)。
|
||||
|
||||
示例(语义):
|
||||
|
||||
- `{"p":"response/fragments/-1/content","o":"APPEND","v":"..."}`
|
||||
- `{"p":"response/fragments/-16/status","v":"FINISHED"}`
|
||||
- `{"p":"response/status","o":"SET","v":"FINISHED"}`
|
||||
|
||||
## 3. 搜索+思考场景关键路径
|
||||
|
||||
### 3.1 文本内容
|
||||
|
||||
- `response/fragments/<idx>/content`
|
||||
- `response/content`
|
||||
- `response/thinking_content`
|
||||
- `response/fragments`(`APPEND` + fragment 数组)
|
||||
|
||||
### 3.2 搜索相关
|
||||
|
||||
- `response/fragments/<idx>/results`(检索结果数组)
|
||||
- `response/search_status`(检索状态,建议跳过展示)
|
||||
|
||||
### 3.3 状态相关(重点)
|
||||
|
||||
- `response/status = FINISHED`:**最终结束信号**(需要保留用于结束判定)
|
||||
- `response/fragments/<idx>/status = FINISHED`:**分片级状态**(高频,建议跳过输出)
|
||||
- `response/quasi_status`:过程状态(建议跳过输出)
|
||||
|
||||
## 4. 泄露问题根因(FINISHED 重复)
|
||||
|
||||
在搜索 + 思考模型中,`response/fragments/<idx>/status` 会出现大量不同 `<idx>`(例如 `-1/-2/-3/-16...`)的 `FINISHED`。
|
||||
|
||||
若只过滤固定少量索引(例如仅 `-1/-2/-3`),其他索引的状态会当普通文本透传,导致前端出现:
|
||||
|
||||
- `FINISHEDFINISHEDFINISHED...`
|
||||
|
||||
## 5. 适配建议(已落地)
|
||||
|
||||
1. 跳过所有 `response/fragments/-?\d+/status`。
|
||||
2. 继续保留 `response/status=FINISHED` 作为真正结束判定。
|
||||
3. 通过独立仿真工具持续回放全部样本,作为回归门禁:
|
||||
|
||||
```bash
|
||||
./tests/scripts/run-raw-stream-sim.sh
|
||||
```
|
||||
|
||||
## 6. 后续扩展建议
|
||||
|
||||
- 增加不同模型(`deepseek-chat-search` / 非 search / 非 thinking)样本。
|
||||
- 增加异常样本(限流、中断、content_filter、空结果)。
|
||||
- 为仿真报告加入字段覆盖率统计(路径频次、事件频次、终止路径命中率)。
|
||||
@@ -226,6 +226,17 @@ node --test tests/node/stream-tool-sieve.test.js
|
||||
go run ./cmd/ds2api-tests --no-preflight
|
||||
```
|
||||
|
||||
### 运行原始流仿真(独立工具)
|
||||
|
||||
```bash
|
||||
./tests/scripts/run-raw-stream-sim.sh
|
||||
```
|
||||
|
||||
说明:
|
||||
- 该工具会重放 `tests/raw_stream_samples` 下全部样本,按上游 SSE 顺序做 1:1 仿真解析。
|
||||
- 默认校验不出现 `FINISHED` 文本泄露,并要求存在结束信号。
|
||||
- 结果会写入 `artifacts/raw-stream-sim/*.json`,可供其他测试脚本或排障流程复用。
|
||||
|
||||
### 指定输出目录和超时
|
||||
|
||||
```bash
|
||||
|
||||
@@ -106,6 +106,9 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, re
|
||||
|
||||
finalThinking := result.Thinking
|
||||
finalText := sanitizeLeakedOutput(result.Text)
|
||||
if writeUpstreamEmptyOutputError(w, result) {
|
||||
return
|
||||
}
|
||||
respBody := openaifmt.BuildChatCompletion(completionID, model, finalPrompt, finalThinking, finalText, toolNames)
|
||||
if result.OutputTokens > 0 {
|
||||
if usage, ok := respBody["usage"].(map[string]any); ok {
|
||||
|
||||
@@ -275,6 +275,44 @@ func TestHandleNonStreamFencedToolCallExamplePromotesToolCall(t *testing.T) {
|
||||
TestHandleNonStreamFencedToolCallExampleDoesNotPromoteToolCall(t)
|
||||
}
|
||||
|
||||
func TestHandleNonStreamReturns502WhenUpstreamOutputEmpty(t *testing.T) {
|
||||
h := &Handler{}
|
||||
resp := makeSSEHTTPResponse(
|
||||
`data: {"p":"response/content","v":""}`,
|
||||
`data: [DONE]`,
|
||||
)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
h.handleNonStream(rec, context.Background(), resp, "cid-empty", "deepseek-chat", "prompt", false, nil)
|
||||
if rec.Code != http.StatusBadGateway {
|
||||
t.Fatalf("expected status 502 for empty upstream output, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
out := decodeJSONBody(t, rec.Body.String())
|
||||
errObj, _ := out["error"].(map[string]any)
|
||||
if asString(errObj["code"]) != "upstream_empty_output" {
|
||||
t.Fatalf("expected code=upstream_empty_output, got %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleNonStreamReturnsContentFilterErrorWhenUpstreamFilteredWithoutOutput(t *testing.T) {
|
||||
h := &Handler{}
|
||||
resp := makeSSEHTTPResponse(
|
||||
`data: {"code":"content_filter"}`,
|
||||
`data: [DONE]`,
|
||||
)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
h.handleNonStream(rec, context.Background(), resp, "cid-empty-filtered", "deepseek-chat", "prompt", false, nil)
|
||||
if rec.Code != http.StatusBadRequest {
|
||||
t.Fatalf("expected status 400 for filtered upstream output, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
out := decodeJSONBody(t, rec.Body.String())
|
||||
errObj, _ := out["error"].(map[string]any)
|
||||
if asString(errObj["code"]) != "content_filter" {
|
||||
t.Fatalf("expected code=content_filter, got %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleStreamToolCallInterceptsWithoutRawContentLeak(t *testing.T) {
|
||||
h := &Handler{}
|
||||
resp := makeSSEHTTPResponse(
|
||||
|
||||
@@ -114,6 +114,9 @@ func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Res
|
||||
}
|
||||
result := sse.CollectStream(resp, thinkingEnabled, true)
|
||||
sanitizedText := sanitizeLeakedOutput(result.Text)
|
||||
if writeUpstreamEmptyOutputError(w, result) {
|
||||
return
|
||||
}
|
||||
textParsed := util.ParseStandaloneToolCallsDetailed(sanitizedText, toolNames)
|
||||
logResponsesToolPolicyRejection(traceID, toolChoice, textParsed, "text")
|
||||
|
||||
|
||||
@@ -627,6 +627,50 @@ func TestHandleResponsesNonStreamToolChoiceNoneStillAllowsFunctionCall(t *testin
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleResponsesNonStreamReturns502WhenUpstreamOutputEmpty(t *testing.T) {
|
||||
h := &Handler{}
|
||||
rec := httptest.NewRecorder()
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(strings.NewReader(
|
||||
`data: {"p":"response/content","v":""}` + "\n" +
|
||||
`data: [DONE]` + "\n",
|
||||
)),
|
||||
}
|
||||
|
||||
h.handleResponsesNonStream(rec, resp, "owner-a", "resp_test", "deepseek-chat", "prompt", false, nil, util.DefaultToolChoicePolicy(), "")
|
||||
if rec.Code != http.StatusBadGateway {
|
||||
t.Fatalf("expected 502 for empty upstream output, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
out := decodeJSONBody(t, rec.Body.String())
|
||||
errObj, _ := out["error"].(map[string]any)
|
||||
if asString(errObj["code"]) != "upstream_empty_output" {
|
||||
t.Fatalf("expected code=upstream_empty_output, got %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleResponsesNonStreamReturnsContentFilterErrorWhenUpstreamFilteredWithoutOutput(t *testing.T) {
|
||||
h := &Handler{}
|
||||
rec := httptest.NewRecorder()
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(strings.NewReader(
|
||||
`data: {"code":"content_filter"}` + "\n" +
|
||||
`data: [DONE]` + "\n",
|
||||
)),
|
||||
}
|
||||
|
||||
h.handleResponsesNonStream(rec, resp, "owner-a", "resp_test", "deepseek-chat", "prompt", false, nil, util.DefaultToolChoicePolicy(), "")
|
||||
if rec.Code != http.StatusBadRequest {
|
||||
t.Fatalf("expected 400 for filtered empty upstream output, got %d body=%s", rec.Code, rec.Body.String())
|
||||
}
|
||||
out := decodeJSONBody(t, rec.Body.String())
|
||||
errObj, _ := out["error"].(map[string]any)
|
||||
if asString(errObj["code"]) != "content_filter" {
|
||||
t.Fatalf("expected code=content_filter, got %#v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func extractSSEEventPayload(body, targetEvent string) (map[string]any, bool) {
|
||||
scanner := bufio.NewScanner(strings.NewReader(body))
|
||||
matched := false
|
||||
|
||||
@@ -71,12 +71,31 @@ func consumeXMLToolCapture(captured string, toolNames []string) (prefix string,
|
||||
prefixPart, suffixPart = trimWrappingJSONFence(prefixPart, suffixPart)
|
||||
return prefixPart, parsed, suffixPart, true
|
||||
}
|
||||
// If this block does not look like an executable tool-call payload,
|
||||
// pass it through as normal content (e.g. user-requested XML snippets).
|
||||
if !looksLikeExecutableXMLToolCallBlock(xmlBlock, pair.open) {
|
||||
return prefixPart + xmlBlock, nil, suffixPart, true
|
||||
}
|
||||
// Looks like XML tool syntax but failed to parse — consume it to avoid leak.
|
||||
return prefixPart, nil, suffixPart, true
|
||||
}
|
||||
return "", nil, "", false
|
||||
}
|
||||
|
||||
func looksLikeExecutableXMLToolCallBlock(xmlBlock, openTag string) bool {
|
||||
lower := strings.ToLower(xmlBlock)
|
||||
// Agent wrapper tags are always treated as internal tool-call wrappers.
|
||||
switch openTag {
|
||||
case "<attempt_completion", "<ask_followup_question", "<new_task":
|
||||
return true
|
||||
}
|
||||
return strings.Contains(lower, "<tool_name") ||
|
||||
strings.Contains(lower, "<parameters") ||
|
||||
strings.Contains(lower, `"tool"`) ||
|
||||
strings.Contains(lower, `"tool_name"`) ||
|
||||
strings.Contains(lower, `"name"`)
|
||||
}
|
||||
|
||||
// hasOpenXMLToolTag returns true if captured text contains an XML tool opening tag
|
||||
// whose SPECIFIC closing tag has not appeared yet.
|
||||
func hasOpenXMLToolTag(captured string) bool {
|
||||
|
||||
@@ -78,6 +78,49 @@ func TestProcessToolSieveXMLWithLeadingText(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessToolSievePassesThroughNonToolXMLBlock(t *testing.T) {
|
||||
var state toolStreamSieveState
|
||||
chunk := `<tool_call><title>示例 XML</title><body>plain text xml payload</body></tool_call>`
|
||||
events := processToolSieveChunk(&state, chunk, []string{"read_file"})
|
||||
events = append(events, flushToolSieve(&state, []string{"read_file"})...)
|
||||
|
||||
var textContent strings.Builder
|
||||
toolCalls := 0
|
||||
for _, evt := range events {
|
||||
textContent.WriteString(evt.Content)
|
||||
toolCalls += len(evt.ToolCalls)
|
||||
}
|
||||
if toolCalls != 0 {
|
||||
t.Fatalf("expected no tool calls for plain XML payload, got %d events=%#v", toolCalls, events)
|
||||
}
|
||||
if textContent.String() != chunk {
|
||||
t.Fatalf("expected XML payload to pass through unchanged, got %q", textContent.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessToolSieveNonToolXMLKeepsSuffixForToolParsing(t *testing.T) {
|
||||
var state toolStreamSieveState
|
||||
chunk := `<tool_call><title>plain xml</title></tool_call><invoke name="read_file"><parameters>{"path":"README.MD"}</parameters></invoke>`
|
||||
events := processToolSieveChunk(&state, chunk, []string{"read_file"})
|
||||
events = append(events, flushToolSieve(&state, []string{"read_file"})...)
|
||||
|
||||
var textContent strings.Builder
|
||||
toolCalls := 0
|
||||
for _, evt := range events {
|
||||
textContent.WriteString(evt.Content)
|
||||
toolCalls += len(evt.ToolCalls)
|
||||
}
|
||||
if !strings.Contains(textContent.String(), `<tool_call><title>plain xml</title></tool_call>`) {
|
||||
t.Fatalf("expected leading non-tool XML to be preserved, got %q", textContent.String())
|
||||
}
|
||||
if strings.Contains(textContent.String(), `<invoke name="read_file">`) {
|
||||
t.Fatalf("expected invoke tool XML to be intercepted, got %q", textContent.String())
|
||||
}
|
||||
if toolCalls != 1 {
|
||||
t.Fatalf("expected exactly one parsed tool call from suffix, got %d events=%#v", toolCalls, events)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessToolSievePartialXMLTagHeldBack(t *testing.T) {
|
||||
var state toolStreamSieveState
|
||||
// Chunk ends with a partial XML tool tag.
|
||||
@@ -364,7 +407,7 @@ func TestOpeningXMLTagNotLeakedAsContent(t *testing.T) {
|
||||
|
||||
func TestProcessToolSieveInterceptsAttemptCompletionLeak(t *testing.T) {
|
||||
var state toolStreamSieveState
|
||||
// Simulate an agent outputting attempt_completion XML tag
|
||||
// Simulate an agent outputting attempt_completion XML tag
|
||||
// which shouldn't leak to text output, even if it fails to parse as a valid tool.
|
||||
chunks := []string{
|
||||
"Done with task.\n",
|
||||
|
||||
20
internal/adapter/openai/upstream_empty.go
Normal file
20
internal/adapter/openai/upstream_empty.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"ds2api/internal/sse"
|
||||
)
|
||||
|
||||
func writeUpstreamEmptyOutputError(w http.ResponseWriter, result sse.CollectResult) bool {
|
||||
if strings.TrimSpace(result.Thinking) != "" || strings.TrimSpace(sanitizeLeakedOutput(result.Text)) != "" {
|
||||
return false
|
||||
}
|
||||
if result.ContentFilter {
|
||||
writeOpenAIErrorWithCode(w, http.StatusBadRequest, "Upstream content filtered the response and returned no output.", "content_filter")
|
||||
return true
|
||||
}
|
||||
writeOpenAIErrorWithCode(w, http.StatusBadGateway, "Upstream model returned empty output.", "upstream_empty_output")
|
||||
return true
|
||||
}
|
||||
@@ -59,8 +59,9 @@ async function handler(req, res) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Keep all non-stream behavior on Go side to avoid compatibility regressions.
|
||||
if (!toBool(payload.stream)) {
|
||||
// Keep all non-stream behavior and non-OpenAI-chat paths on Go side to avoid
|
||||
// protocol-shape regressions (e.g. Gemini/Claude clients expecting their own formats).
|
||||
if (!toBool(payload.stream) || !isNodeStreamSupportedPath(req.url || '')) {
|
||||
await proxyToGo(req, res, rawBody);
|
||||
return;
|
||||
}
|
||||
@@ -76,6 +77,23 @@ function isVercelRuntime() {
|
||||
return asString(process.env.VERCEL) !== '' || asString(process.env.NOW_REGION) !== '';
|
||||
}
|
||||
|
||||
function isNodeStreamSupportedPath(rawURL) {
|
||||
const path = extractPathname(rawURL);
|
||||
return path === '/v1/chat/completions';
|
||||
}
|
||||
|
||||
function extractPathname(rawURL) {
|
||||
const text = asString(rawURL);
|
||||
if (!text) {
|
||||
return '';
|
||||
}
|
||||
const q = text.indexOf('?');
|
||||
if (q >= 0) {
|
||||
return text.slice(0, q);
|
||||
}
|
||||
return text;
|
||||
}
|
||||
|
||||
module.exports = handler;
|
||||
|
||||
module.exports.__test = {
|
||||
@@ -89,4 +107,6 @@ module.exports.__test = {
|
||||
boolDefaultTrue,
|
||||
filterIncrementalToolCallDeltasByAllowed,
|
||||
estimateTokens,
|
||||
isNodeStreamSupportedPath,
|
||||
extractPathname,
|
||||
};
|
||||
|
||||
@@ -193,6 +193,9 @@ function extractContentRecursive(items, defaultType) {
|
||||
}
|
||||
|
||||
function shouldSkipPath(pathValue) {
|
||||
if (isFragmentStatusPath(pathValue)) {
|
||||
return true;
|
||||
}
|
||||
if (SKIP_EXACT_PATHS.has(pathValue)) {
|
||||
return true;
|
||||
}
|
||||
@@ -204,6 +207,13 @@ function shouldSkipPath(pathValue) {
|
||||
return false;
|
||||
}
|
||||
|
||||
function isFragmentStatusPath(pathValue) {
|
||||
if (!pathValue || pathValue === 'response/status') {
|
||||
return false;
|
||||
}
|
||||
return /^response\/fragments\/-?\d+\/status$/i.test(pathValue);
|
||||
}
|
||||
|
||||
function isCitation(text) {
|
||||
return asString(text).trim().startsWith('[citation:');
|
||||
}
|
||||
@@ -225,5 +235,6 @@ module.exports = {
|
||||
parseChunkForContent,
|
||||
extractContentRecursive,
|
||||
shouldSkipPath,
|
||||
isFragmentStatusPath,
|
||||
isCitation,
|
||||
};
|
||||
|
||||
@@ -10,9 +10,10 @@ import (
|
||||
// CollectResult holds the aggregated text and thinking content from a
|
||||
// DeepSeek SSE stream, consumed to completion (non-streaming use case).
|
||||
type CollectResult struct {
|
||||
Text string
|
||||
Thinking string
|
||||
OutputTokens int
|
||||
Text string
|
||||
Thinking string
|
||||
OutputTokens int
|
||||
ContentFilter bool
|
||||
}
|
||||
|
||||
// CollectStream fully consumes a DeepSeek SSE response and separates
|
||||
@@ -28,6 +29,7 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
|
||||
text := strings.Builder{}
|
||||
thinking := strings.Builder{}
|
||||
outputTokens := 0
|
||||
contentFilter := false
|
||||
currentType := "text"
|
||||
if thinkingEnabled {
|
||||
currentType = "thinking"
|
||||
@@ -39,6 +41,9 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
|
||||
return true
|
||||
}
|
||||
if result.Stop {
|
||||
if result.ContentFilter {
|
||||
contentFilter = true
|
||||
}
|
||||
if result.OutputTokens > 0 {
|
||||
outputTokens = result.OutputTokens
|
||||
}
|
||||
@@ -56,5 +61,10 @@ func CollectStream(resp *http.Response, thinkingEnabled bool, closeBody bool) Co
|
||||
}
|
||||
return true
|
||||
})
|
||||
return CollectResult{Text: text.String(), Thinking: thinking.String(), OutputTokens: outputTokens}
|
||||
return CollectResult{
|
||||
Text: text.String(),
|
||||
Thinking: thinking.String(),
|
||||
OutputTokens: outputTokens,
|
||||
ContentFilter: contentFilter,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ func filterLeakedContentFilterParts(parts []ContentPart) []ContentPart {
|
||||
out := make([]ContentPart, 0, len(parts))
|
||||
for _, p := range parts {
|
||||
cleaned := stripLeakedContentFilterSuffix(p.Text)
|
||||
if strings.TrimSpace(cleaned) == "" {
|
||||
if shouldDropCleanedLeakedChunk(cleaned) {
|
||||
continue
|
||||
}
|
||||
p.Text = cleaned
|
||||
@@ -27,5 +27,19 @@ func stripLeakedContentFilterSuffix(text string) string {
|
||||
if idx < 0 {
|
||||
return text
|
||||
}
|
||||
return strings.TrimRight(text[:idx], " \t\r\n")
|
||||
// Keep "\n" so we don't collapse line structure when the upstream model
|
||||
// appends leaked CONTENT_FILTER markers after a line break.
|
||||
return strings.TrimRight(text[:idx], " \t\r")
|
||||
}
|
||||
|
||||
func shouldDropCleanedLeakedChunk(cleaned string) bool {
|
||||
if cleaned == "" {
|
||||
return true
|
||||
}
|
||||
// Preserve newline-only chunks to avoid dropping legitimate line breaks
|
||||
// before a leaked CONTENT_FILTER suffix.
|
||||
if strings.Contains(cleaned, "\n") {
|
||||
return false
|
||||
}
|
||||
return strings.TrimSpace(cleaned) == ""
|
||||
}
|
||||
|
||||
@@ -102,3 +102,23 @@ func TestParseDeepSeekContentLineContentTextEqualContentFilterDoesNotStop(t *tes
|
||||
t.Fatalf("did not expect content-filter stop for content text: %#v", res)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseDeepSeekContentLinePreservesTrailingNewlineBeforeLeakedContentFilter(t *testing.T) {
|
||||
res := ParseDeepSeekContentLine([]byte("data: {\"p\":\"response/content\",\"v\":\"line1\\nCONTENT_FILTERblocked\"}"), false, "text")
|
||||
if !res.Parsed || res.Stop {
|
||||
t.Fatalf("expected parsed non-stop result: %#v", res)
|
||||
}
|
||||
if len(res.Parts) != 1 || res.Parts[0].Text != "line1\n" {
|
||||
t.Fatalf("expected trailing newline preserved, got %#v", res.Parts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseDeepSeekContentLineKeepsNewlineOnlyChunkBeforeLeakedContentFilter(t *testing.T) {
|
||||
res := ParseDeepSeekContentLine([]byte("data: {\"p\":\"response/content\",\"v\":\"\\nCONTENT_FILTERblocked\"}"), false, "text")
|
||||
if !res.Parsed || res.Stop {
|
||||
t.Fatalf("expected parsed non-stop result: %#v", res)
|
||||
}
|
||||
if len(res.Parts) != 1 || res.Parts[0].Text != "\n" {
|
||||
t.Fatalf("expected newline-only chunk preserved, got %#v", res.Parts)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,9 @@ func ParseDeepSeekSSELine(raw []byte) (map[string]any, bool, bool) {
|
||||
}
|
||||
|
||||
func shouldSkipPath(path string) bool {
|
||||
if isFragmentStatusPath(path) {
|
||||
return true
|
||||
}
|
||||
if _, ok := deepseek.SkipExactPathSet[path]; ok {
|
||||
return true
|
||||
}
|
||||
@@ -42,6 +45,31 @@ func shouldSkipPath(path string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func isFragmentStatusPath(path string) bool {
|
||||
if path == "" || path == "response/status" {
|
||||
return false
|
||||
}
|
||||
if !strings.HasPrefix(path, "response/fragments/") || !strings.HasSuffix(path, "/status") {
|
||||
return false
|
||||
}
|
||||
mid := strings.TrimSuffix(strings.TrimPrefix(path, "response/fragments/"), "/status")
|
||||
if mid == "" {
|
||||
return false
|
||||
}
|
||||
if strings.HasPrefix(mid, "-") {
|
||||
mid = mid[1:]
|
||||
}
|
||||
if mid == "" {
|
||||
return false
|
||||
}
|
||||
for _, r := range mid {
|
||||
if r < '0' || r > '9' {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func ParseSSEChunkForContent(chunk map[string]any, thinkingEnabled bool, currentFragmentType string) ([]ContentPart, bool, string) {
|
||||
v, ok := chunk["v"]
|
||||
if !ok {
|
||||
|
||||
@@ -90,6 +90,15 @@ func TestShouldSkipPathFragmentStatus(t *testing.T) {
|
||||
if !shouldSkipPath("response/fragments/-3/status") {
|
||||
t.Fatal("expected skip for fragment -3 status")
|
||||
}
|
||||
if !shouldSkipPath("response/fragments/-16/status") {
|
||||
t.Fatal("expected skip for fragment -16 status")
|
||||
}
|
||||
if !shouldSkipPath("response/fragments/7/status") {
|
||||
t.Fatal("expected skip for fragment 7 status")
|
||||
}
|
||||
if shouldSkipPath("response/status") {
|
||||
t.Fatal("expected response/status to be handled by finish logic, not skipped")
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldSkipPathRegularContent(t *testing.T) {
|
||||
|
||||
@@ -19,6 +19,10 @@ var toolUseFunctionPattern = regexp.MustCompile(`(?is)<tool_use>\s*<function\s+n
|
||||
var toolUseNameParametersPattern = regexp.MustCompile(`(?is)<tool_use>\s*<tool_name>\s*([^<]+?)\s*</tool_name>\s*<parameters>\s*(.*?)\s*</parameters>\s*</tool_use>`)
|
||||
var toolUseFunctionNameParametersPattern = regexp.MustCompile(`(?is)<tool_use>\s*<function_name>\s*([^<]+?)\s*</function_name>\s*<parameters>\s*(.*?)\s*</parameters>\s*</tool_use>`)
|
||||
var toolUseToolNameBodyPattern = regexp.MustCompile(`(?is)<tool_use>\s*<tool_name>\s*([^<]+?)\s*</tool_name>\s*(.*?)\s*</tool_use>`)
|
||||
var xmlToolNamePatterns = []*regexp.Regexp{
|
||||
regexp.MustCompile(`(?is)<(?:[a-z0-9_:-]+:)?tool_name\b[^>]*>(.*?)</(?:[a-z0-9_:-]+:)?tool_name>`),
|
||||
regexp.MustCompile(`(?is)<(?:[a-z0-9_:-]+:)?function_name\b[^>]*>(.*?)</(?:[a-z0-9_:-]+:)?function_name>`),
|
||||
}
|
||||
|
||||
func parseXMLToolCalls(text string) []ParsedToolCall {
|
||||
matches := xmlToolCallPattern.FindAllString(text, -1)
|
||||
@@ -81,9 +85,9 @@ func parseSingleXMLToolCall(block string) (ParsedToolCall, bool) {
|
||||
}
|
||||
}
|
||||
|
||||
dec := xml.NewDecoder(strings.NewReader(block))
|
||||
name := ""
|
||||
params := map[string]any{}
|
||||
params := extractXMLToolParamsByRegex(inner)
|
||||
dec := xml.NewDecoder(strings.NewReader(block))
|
||||
inParams := false
|
||||
inTool := false
|
||||
for {
|
||||
@@ -132,9 +136,13 @@ func parseSingleXMLToolCall(block string) (ParsedToolCall, bool) {
|
||||
}
|
||||
}
|
||||
inParams = false
|
||||
case "tool_name", "name":
|
||||
case "tool_name", "function_name", "name":
|
||||
var v string
|
||||
if err := dec.DecodeElement(&v, &t); err == nil && strings.TrimSpace(v) != "" {
|
||||
if inParams {
|
||||
params[t.Name.Local] = strings.TrimSpace(v)
|
||||
break
|
||||
}
|
||||
name = strings.TrimSpace(v)
|
||||
}
|
||||
case "input", "arguments", "argument", "args", "params":
|
||||
@@ -164,12 +172,60 @@ func parseSingleXMLToolCall(block string) (ParsedToolCall, bool) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if strings.TrimSpace(name) == "" {
|
||||
name = strings.TrimSpace(extractXMLToolNameByRegex(stripTopLevelXMLParameters(inner)))
|
||||
}
|
||||
if strings.TrimSpace(name) == "" {
|
||||
return ParsedToolCall{}, false
|
||||
}
|
||||
return ParsedToolCall{Name: strings.TrimSpace(name), Input: params}, true
|
||||
}
|
||||
|
||||
func stripTopLevelXMLParameters(inner string) string {
|
||||
out := strings.TrimSpace(inner)
|
||||
for {
|
||||
idx := strings.Index(strings.ToLower(out), "<parameters")
|
||||
if idx < 0 {
|
||||
return out
|
||||
}
|
||||
segment := out[idx:]
|
||||
segmentLower := strings.ToLower(segment)
|
||||
openEnd := strings.Index(segmentLower, ">")
|
||||
if openEnd < 0 {
|
||||
return out
|
||||
}
|
||||
closeIdx := strings.Index(segmentLower, "</parameters>")
|
||||
if closeIdx < 0 {
|
||||
return out[:idx]
|
||||
}
|
||||
end := idx + closeIdx + len("</parameters>")
|
||||
out = out[:idx] + out[end:]
|
||||
}
|
||||
}
|
||||
|
||||
func extractXMLToolNameByRegex(inner string) string {
|
||||
for _, pattern := range xmlToolNamePatterns {
|
||||
if m := pattern.FindStringSubmatch(inner); len(m) >= 2 {
|
||||
if v := strings.TrimSpace(stripTagText(m[1])); v != "" {
|
||||
return v
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func extractXMLToolParamsByRegex(inner string) map[string]any {
|
||||
raw := findMarkupTagValue(inner, toolCallMarkupArgsTagNames, toolCallMarkupArgsPatternByTag)
|
||||
if raw == "" {
|
||||
return map[string]any{}
|
||||
}
|
||||
parsed := parseMarkupInput(raw)
|
||||
if parsed == nil {
|
||||
return map[string]any{}
|
||||
}
|
||||
return parsed
|
||||
}
|
||||
|
||||
func parseFunctionCallTagStyle(text string) (ParsedToolCall, bool) {
|
||||
m := functionCallPattern.FindStringSubmatch(text)
|
||||
if len(m) < 2 {
|
||||
|
||||
@@ -176,6 +176,35 @@ func TestParseToolCallsSupportsCanonicalXMLParametersJSON(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseToolCallsSupportsXMLParametersJSONWithAmpersandCommand(t *testing.T) {
|
||||
text := `<tool_calls><tool_call><tool_name>execute_command</tool_name><parameters>{"command":"sshpass -p 'xxx' ssh -o StrictHostKeyChecking=no -p 1111 root@111.111.111.111 'cd /root && git clone https://github.com/ericc-ch/copilot-api.git'","cwd":null,"timeout":null}</parameters></tool_call></tool_calls>`
|
||||
calls := ParseToolCalls(text, []string{"execute_command"})
|
||||
if len(calls) != 1 {
|
||||
t.Fatalf("expected 1 call, got %#v", calls)
|
||||
}
|
||||
if calls[0].Name != "execute_command" {
|
||||
t.Fatalf("expected tool name execute_command, got %q", calls[0].Name)
|
||||
}
|
||||
cmd, _ := calls[0].Input["command"].(string)
|
||||
if !strings.Contains(cmd, "&& git clone") {
|
||||
t.Fatalf("expected command to keep && segment, got %#v", calls[0].Input)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseToolCallsDoesNotTreatParameterNameTagAsToolName(t *testing.T) {
|
||||
text := `<tool_call><tool name="execute_command"><parameters><name>file.txt</name><command>pwd</command></parameters></tool></tool_call>`
|
||||
calls := ParseToolCalls(text, []string{"execute_command"})
|
||||
if len(calls) != 1 {
|
||||
t.Fatalf("expected 1 call, got %#v", calls)
|
||||
}
|
||||
if calls[0].Name != "execute_command" {
|
||||
t.Fatalf("expected tool name execute_command, got %q", calls[0].Name)
|
||||
}
|
||||
if calls[0].Input["name"] != "file.txt" {
|
||||
t.Fatalf("expected parameter name preserved, got %#v", calls[0].Input)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseToolCallsPrefersJSONPayloadOverIncidentalXMLInString(t *testing.T) {
|
||||
text := `{"tool_calls":[{"name":"search","input":{"q":"latest <tool_call><tool_name>wrong</tool_name><parameters>{\"x\":1}</parameters></tool_call>"}}]}`
|
||||
calls := ParseToolCallsDetailed(text, []string{"search"}).Calls
|
||||
@@ -402,6 +431,14 @@ func TestParseToolCallsDoesNotAcceptMismatchedMarkupTags(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseToolCallsDoesNotTreatParametersFunctionNameAsToolName(t *testing.T) {
|
||||
text := `<tool_call><parameters><function_name>data_only</function_name><path>README.md</path></parameters></tool_call>`
|
||||
calls := ParseToolCalls(text, []string{"read_file"})
|
||||
if len(calls) != 0 {
|
||||
t.Fatalf("expected no tool call when function_name appears only under parameters, got %#v", calls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepairInvalidJSONBackslashes(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
|
||||
@@ -17,6 +17,9 @@ const {
|
||||
normalizePreparedToolNames,
|
||||
boolDefaultTrue,
|
||||
filterIncrementalToolCallDeltasByAllowed,
|
||||
shouldSkipPath,
|
||||
isNodeStreamSupportedPath,
|
||||
extractPathname,
|
||||
} = handler.__test;
|
||||
|
||||
test('chat-stream exposes parser test hooks', () => {
|
||||
@@ -218,3 +221,21 @@ test('parseChunkForContent supports wrapped response.fragments object shape', ()
|
||||
assert.equal(parsed.finished, false);
|
||||
assert.equal(parsed.parts.map((p) => p.text).join(''), 'AB');
|
||||
});
|
||||
|
||||
test('shouldSkipPath skips dynamic response/fragments/*/status paths only', () => {
|
||||
assert.equal(shouldSkipPath('response/fragments/-16/status'), true);
|
||||
assert.equal(shouldSkipPath('response/fragments/8/status'), true);
|
||||
assert.equal(shouldSkipPath('response/status'), false);
|
||||
});
|
||||
|
||||
test('node stream path guard only allows /v1/chat/completions', () => {
|
||||
assert.equal(isNodeStreamSupportedPath('/v1/chat/completions'), true);
|
||||
assert.equal(isNodeStreamSupportedPath('/v1/chat/completions?x=1'), true);
|
||||
assert.equal(isNodeStreamSupportedPath('/v1beta/models/gemini-2.5-flash:streamGenerateContent'), false);
|
||||
assert.equal(isNodeStreamSupportedPath('/anthropic/v1/messages'), false);
|
||||
});
|
||||
|
||||
test('extractPathname strips query only', () => {
|
||||
assert.equal(extractPathname('/v1/chat/completions?stream=true'), '/v1/chat/completions');
|
||||
assert.equal(extractPathname('/v1beta/models/gemini-2.5-flash:streamGenerateContent?key=1'), '/v1beta/models/gemini-2.5-flash:streamGenerateContent');
|
||||
});
|
||||
|
||||
28
tests/raw_stream_samples/README.md
Normal file
28
tests/raw_stream_samples/README.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# 原始流数据样本目录
|
||||
|
||||
该目录用于存放**上游真实 SSE 原始流**样本,供本地仿真测试和解析适配使用。
|
||||
|
||||
## 目录规范
|
||||
|
||||
每个样本一个子目录:
|
||||
|
||||
- `meta.json`:样本元信息(问题、模型、采集时间、备注)
|
||||
- `upstream.stream.sse`:完整原始 SSE 文本(`event:` / `data:` 行)
|
||||
|
||||
## 扩展方式
|
||||
|
||||
1. 抓取一次真实请求(建议开启 `DS2API_DEV_PACKET_CAPTURE=1`)。
|
||||
2. 新建 `<sample-id>/` 目录并放入 `meta.json` + `upstream.stream.sse`。
|
||||
3. 运行独立仿真工具(可被其他测试脚本调用):
|
||||
|
||||
```bash
|
||||
./tests/scripts/run-raw-stream-sim.sh
|
||||
```
|
||||
|
||||
该工具会自动遍历本目录全部样本,按真实流顺序重放并验证:
|
||||
|
||||
- 不会把上游 `status=FINISHED` 片段当正文输出(防泄露)。
|
||||
- 能正确检测 `response/status=FINISHED` 流结束信号。
|
||||
- 生成可归档 JSON 报告(`artifacts/raw-stream-sim/`)。
|
||||
|
||||
> 注意:样本可能包含搜索结果正文与引用信息,请勿放入敏感账号/密钥。
|
||||
@@ -0,0 +1,55 @@
|
||||
# 样本分析(广州天气 / deepseek-reasoner-search)
|
||||
|
||||
- 样本来源:`/admin/dev/captures` 上游原始 SSE 抓包
|
||||
- 采集时间(UTC):2026-04-03 01:28:50
|
||||
- 原始字节数:41043
|
||||
- `FINISHED` 字符串出现次数:24
|
||||
- JSON `data:` chunk 数:420
|
||||
|
||||
## 事件分布
|
||||
|
||||
- `ready`: 1
|
||||
- `update_session`: 2
|
||||
- `finish`: 1
|
||||
|
||||
## 高频路径(Top 12)
|
||||
|
||||
- `response/fragments/-1/content`: 13
|
||||
- `response/fragments/-1`: 9
|
||||
- `response`: 5
|
||||
- `response/has_pending_fragment`: 4
|
||||
- `response/fragments/-1/elapsed_secs`: 3
|
||||
- `response/fragments/-5/status`: 2
|
||||
- `response/fragments/-6/status`: 2
|
||||
- `response/fragments/-3/status`: 2
|
||||
- `response/fragments/-1/status`: 2
|
||||
- `response/fragments/-4/status`: 2
|
||||
- `response/fragments/-2/status`: 2
|
||||
- `response/fragments/-5/results`: 1
|
||||
|
||||
## 关键泄露来源
|
||||
|
||||
以下状态路径会高频出现 `v=FINISHED`,如果解析器按普通文本透传,就会出现 `FINISHEDFINISHED...` 泄露:
|
||||
|
||||
- `response/fragments/-5/status`: 2
|
||||
- `response/fragments/-6/status`: 2
|
||||
- `response/fragments/-3/status`: 2
|
||||
- `response/fragments/-1/status`: 2
|
||||
- `response/fragments/-4/status`: 2
|
||||
- `response/fragments/-2/status`: 2
|
||||
- `response/fragments/-14/status`: 1
|
||||
- `response/fragments/-12/status`: 1
|
||||
- `response/fragments/-10/status`: 1
|
||||
- `response/fragments/-9/status`: 1
|
||||
- `response/fragments/-8/status`: 1
|
||||
- `response/fragments/-7/status`: 1
|
||||
- `response/fragments/-11/status`: 1
|
||||
- `response/fragments/-16/status`: 1
|
||||
- `response/fragments/-13/status`: 1
|
||||
- `response/fragments/-15/status`: 1
|
||||
|
||||
## 适配建议
|
||||
|
||||
1. 跳过 `response/fragments/<index>/status`(所有 index,而非仅 `-1/-2/-3`)。
|
||||
2. 保留 `response/status=FINISHED` 用于结束流判定,不应当输出正文。
|
||||
3. 在样本仿真测试中对全部样本执行“不得输出 `FINISHED`”断言。
|
||||
@@ -0,0 +1,25 @@
|
||||
{
|
||||
"sample_id": "guangzhou-weather-reasoner-search-20260403",
|
||||
"captured_at_utc": "2026-04-03T01:28:50Z",
|
||||
"request": {
|
||||
"model": "deepseek-reasoner-search",
|
||||
"stream": true,
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "广州天气"
|
||||
}
|
||||
],
|
||||
"thinking_enabled": true,
|
||||
"search_enabled": true
|
||||
},
|
||||
"capture": {
|
||||
"label": "deepseek_completion",
|
||||
"url": "https://chat.deepseek.com/api/v0/chat/completion",
|
||||
"status_code": 200,
|
||||
"response_bytes": 41043,
|
||||
"contains_finished_token": true,
|
||||
"finished_token_count": 24
|
||||
},
|
||||
"notes": "Captured from upstream DeepSeek SSE via /admin/dev/captures with packet capture enabled. Account ID removed."
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
98
tests/scripts/capture-raw-stream-sample.sh
Executable file
98
tests/scripts/capture-raw-stream-sample.sh
Executable file
@@ -0,0 +1,98 @@
|
||||
#!/usr/bin/env bash
# Capture a real upstream DeepSeek SSE stream as a reusable test sample.
#
# Usage:
#   capture-raw-stream-sample.sh [config.json] [sample-id] [question] [model] [api-key]
#
# Starts the server with dev packet capture enabled, issues one streaming
# request, then archives the largest captured upstream body (plus metadata)
# under tests/raw_stream_samples/<sample-id>/.
# Requires: go, python3, curl.
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "$0")/../.." && pwd)"
cd "$ROOT_DIR"

CONFIG_PATH="${1:-config.json}"
SAMPLE_ID="${2:-sample-$(date -u +%Y%m%dT%H%M%SZ)}"
QUESTION="${3:-广州天气}"
MODEL="${4:-deepseek-reasoner-search}"
API_KEY="${5:-}"
ADMIN_KEY="${DS2API_ADMIN_KEY:-admin}"

# Fall back to the first configured key when no API key was passed.
if [[ -z "$API_KEY" ]]; then
  API_KEY="$(python3 - <<'PY' "$CONFIG_PATH"
import json,sys
cfg=json.load(open(sys.argv[1]))
keys=cfg.get('keys') or []
print(keys[0] if keys else '')
PY
)"
fi

if [[ -z "$API_KEY" ]]; then
  echo "[capture] missing API key (pass as arg5 or set config.keys[0])" >&2
  exit 1
fi

OUT_DIR="tests/raw_stream_samples/${SAMPLE_ID}"
mkdir -p "$OUT_DIR"

cleanup() {
  pkill -f "cmd/ds2api" >/dev/null 2>&1 || true
}
trap cleanup EXIT

# Start the server with packet capture on so the raw upstream SSE body can be
# read back from /admin/dev/captures afterwards.
DS2API_CONFIG_PATH="$CONFIG_PATH" \
DS2API_ADMIN_KEY="$ADMIN_KEY" \
DS2API_DEV_PACKET_CAPTURE=1 \
DS2API_DEV_PACKET_CAPTURE_LIMIT=20 \
go run ./cmd/ds2api >/tmp/ds2api_capture_server.log 2>&1 &

# Wait for the health endpoint and fail fast if the server never comes up.
# (Previously the loop fell through silently and the later curls produced
# confusing connection errors instead of a clear diagnostic.)
server_ready=0
for _ in $(seq 1 120); do
  if curl -sSf http://127.0.0.1:5001/healthz >/dev/null 2>&1; then
    server_ready=1
    break
  fi
  sleep 1
done
if [[ "$server_ready" -ne 1 ]]; then
  echo "[capture] server did not become healthy within 120s (see /tmp/ds2api_capture_server.log)" >&2
  exit 1
fi

REQUEST_BODY="$(python3 - <<'PY' "$MODEL" "$QUESTION"
import json,sys
model,question=sys.argv[1:3]
payload={
    'model':model,
    'stream':True,
    'messages':[{'role':'user','content':question}],
}
print(json.dumps(payload, ensure_ascii=False))
PY
)"

# Issue one streaming request; the proxied SSE output is kept for inspection.
curl -sS http://127.0.0.1:5001/v1/chat/completions \
  -H 'Content-Type: application/json' \
  -H "Authorization: Bearer ${API_KEY}" \
  --data-binary "${REQUEST_BODY}" \
  >"${OUT_DIR}/openai.stream.sse"

# Pull the captured upstream packets back out of the dev endpoint.
curl -sS http://127.0.0.1:5001/admin/dev/captures \
  -H "Authorization: Bearer ${ADMIN_KEY}" \
  >"${OUT_DIR}/captures.json"

# Pick the largest captured body (the completion stream), write the raw SSE
# text plus a meta.json describing the sample.
python3 - <<'PY' "$OUT_DIR" "$SAMPLE_ID" "$QUESTION" "$MODEL"
import json,sys,pathlib,datetime
out=pathlib.Path(sys.argv[1])
sample_id,question,model=sys.argv[2:5]
captures=json.loads((out/'captures.json').read_text())
items=captures.get('items') or []
if not items:
    raise SystemExit('no captured upstream stream found')
best=max(items,key=lambda x:len((x.get('response_body') or '')))
raw=best.get('response_body') or ''
(out/'upstream.stream.sse').write_text(raw)
# timezone-aware replacement for the deprecated datetime.utcnow()
now_utc=datetime.datetime.now(datetime.timezone.utc)
meta={
    'sample_id':sample_id,
    'captured_at_utc':now_utc.strftime('%Y-%m-%dT%H:%M:%SZ'),
    'request':{'model':model,'stream':True,'messages':[{'role':'user','content':question}]},
    'capture':{
        'label':best.get('label'),'url':best.get('url'),'status_code':best.get('status_code'),
        'response_bytes':len(raw),'contains_finished_token':('FINISHED' in raw),'finished_token_count':raw.count('FINISHED')
    }
}
(out/'meta.json').write_text(json.dumps(meta,ensure_ascii=False,indent=2))
print(f'[capture] wrote sample to {out}')
print(f'[capture] upstream bytes={len(raw)} finished_count={raw.count("FINISHED")}')
PY

rm -f "${OUT_DIR}/captures.json"
echo "[capture] done: ${OUT_DIR}"
|
||||
16
tests/scripts/run-raw-stream-sim.sh
Executable file
16
tests/scripts/run-raw-stream-sim.sh
Executable file
@@ -0,0 +1,16 @@
|
||||
#!/usr/bin/env bash
# Replay every raw upstream SSE sample through the parser simulator and write
# a timestamped JSON report under artifacts/raw-stream-sim/.
set -euo pipefail

# Resolve the repo root relative to this script and work from there.
script_dir="$(cd "$(dirname "$0")" && pwd)"
cd "${script_dir}/../.."

# One timestamped report file per run.
report_dir="artifacts/raw-stream-sim"
report_path="${report_dir}/report-$(date -u +%Y%m%dT%H%M%SZ).json"
mkdir -p "${report_dir}"

# Extra CLI flags are forwarded verbatim to the simulator.
node tests/tools/deepseek-sse-simulator.mjs \
  --samples-root tests/raw_stream_samples \
  --report "${report_path}" \
  "$@"

echo "[run-raw-stream-sim] report: ${report_path}"
|
||||
158
tests/tools/deepseek-sse-simulator.mjs
Executable file
158
tests/tools/deepseek-sse-simulator.mjs
Executable file
@@ -0,0 +1,158 @@
|
||||
#!/usr/bin/env node
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import process from 'node:process';
|
||||
import { createRequire } from 'node:module';
|
||||
|
||||
const require = createRequire(import.meta.url);
|
||||
const chatStream = require('../../api/chat-stream.js');
|
||||
const { parseChunkForContent } = chatStream.__test;
|
||||
|
||||
/**
 * Parse simulator CLI flags.
 *
 * Supported flags:
 *   --samples-root <dir>          root directory containing sample subdirs
 *   --report <file>               where to write the JSON report
 *   --no-fail-on-leak             don't fail when FINISHED leaks into text
 *   --no-fail-on-missing-finish   don't fail when no finish signal is seen
 *
 * @param {string[]} argv - full process.argv (first two entries skipped)
 * @returns {{samplesRoot: string, reportPath: string, failOnLeak: boolean, failOnMissingFinish: boolean}}
 */
function parseArgs(argv) {
  const opts = {
    samplesRoot: 'tests/raw_stream_samples',
    reportPath: '',
    failOnLeak: true,
    failOnMissingFinish: true,
  };
  let i = 2;
  while (i < argv.length) {
    switch (argv[i]) {
      case '--samples-root':
        // Consume the value only when one is actually present.
        if (argv[i + 1]) {
          i += 1;
          opts.samplesRoot = argv[i];
        }
        break;
      case '--report':
        if (argv[i + 1]) {
          i += 1;
          opts.reportPath = argv[i];
        }
        break;
      case '--no-fail-on-leak':
        opts.failOnLeak = false;
        break;
      case '--no-fail-on-missing-finish':
        opts.failOnMissingFinish = false;
        break;
      default:
        // Unknown flags are silently ignored, matching prior behavior.
        break;
    }
    i += 1;
  }
  return opts;
}
|
||||
|
||||
/**
 * List sample directories under `root` that contain an
 * `upstream.stream.sse` file.
 *
 * @param {string} root - samples root directory (may not exist)
 * @returns {string[]} lexicographically sorted sample dir paths; [] when root is missing
 */
function findSampleDirs(root) {
  if (!fs.existsSync(root)) {
    return [];
  }
  const sampleDirs = [];
  for (const name of fs.readdirSync(root)) {
    const candidate = path.join(root, name);
    if (!fs.statSync(candidate).isDirectory()) {
      continue;
    }
    // A directory only counts as a sample if it holds the raw stream file.
    if (fs.existsSync(path.join(candidate, 'upstream.stream.sse'))) {
      sampleDirs.push(candidate);
    }
  }
  sampleDirs.sort();
  return sampleDirs;
}
|
||||
|
||||
/**
 * Split a raw SSE text stream into events.
 *
 * Blocks are separated by blank lines. Each block may carry an `event:`
 * line (defaulting to 'message') and one or more `data:` lines, which are
 * joined with '\n'. Blocks without any data line are dropped.
 *
 * @param {string} raw - full SSE stream text
 * @returns {{event: string, payload: string}[]} parsed events in order
 */
function parseSSE(raw) {
  const events = [];
  const blocks = raw.split(/\r?\n\r?\n/);
  for (const block of blocks) {
    if (block.trim() === '') {
      continue;
    }
    let eventType = 'message';
    const data = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith('data:')) {
        data.push(line.slice(5).trimStart());
      } else if (line.startsWith('event:')) {
        // Empty event names fall back to the default 'message' type.
        eventType = line.slice(6).trim() || 'message';
      }
    }
    if (data.length > 0) {
      events.push({ event: eventType, payload: data.join('\n').trim() });
    }
  }
  return events;
}
|
||||
|
||||
/**
 * Replay one raw upstream SSE sample through the production chunk parser.
 *
 * Tracks whether a finish signal was observed (either an SSE `finish` event
 * or a parser-reported finish) and accumulates all emitted text so callers
 * can assert that no `FINISHED` status token leaks into the output.
 *
 * @param {string} raw - full upstream SSE text
 * @returns {{events: number, parsedChunks: number, sawFinish: boolean, leakedFinishedText: boolean, outputChars: number}}
 */
function replaySample(raw) {
  const events = parseSSE(raw);
  let fragmentType = 'thinking';
  let finishSeen = false;
  let emitted = '';
  let chunkCount = 0;

  for (const evt of events) {
    if (evt.event === 'finish') {
      finishSeen = true;
    }
    const payload = evt.payload;
    // Only JSON object payloads go to the parser; [DONE] and empty payloads
    // are control markers, not content.
    if (!payload || payload === '[DONE]' || payload[0] !== '{') {
      continue;
    }
    let chunk;
    try {
      chunk = JSON.parse(payload);
    } catch {
      continue;
    }
    chunkCount += 1;
    const parsed = parseChunkForContent(chunk, true, fragmentType);
    fragmentType = parsed.newType;
    if (parsed.finished) {
      finishSeen = true;
    }
    for (const part of parsed.parts) {
      emitted += part.text;
    }
  }

  return {
    events: events.length,
    parsedChunks: chunkCount,
    sawFinish: finishSeen,
    leakedFinishedText: emitted.includes('FINISHED'),
    outputChars: emitted.length,
  };
}
|
||||
|
||||
/**
 * Entry point: replay every sample, optionally write a JSON report, print a
 * per-sample summary, and exit non-zero on any failure.
 *
 * Exit codes: 1 = no samples found, 2 = one or more samples failed.
 */
function main() {
  const opts = parseArgs(process.argv);
  const sampleDirs = findSampleDirs(opts.samplesRoot);
  if (sampleDirs.length === 0) {
    console.error(`[sim] no samples found: ${opts.samplesRoot}`);
    process.exit(1);
  }

  const report = {
    generated_at: new Date().toISOString(),
    samples_root: opts.samplesRoot,
    total: sampleDirs.length,
    failed: 0,
    samples: [],
  };

  for (const dir of sampleDirs) {
    const raw = fs.readFileSync(path.join(dir, 'upstream.stream.sse'), 'utf8');
    const result = replaySample(raw);

    const errors = [];
    if (opts.failOnMissingFinish && !result.sawFinish) {
      errors.push('missing finish signal');
    }
    if (opts.failOnLeak && result.leakedFinishedText) {
      errors.push('FINISHED leaked into output text');
    }
    if (errors.length > 0) {
      report.failed += 1;
    }
    report.samples.push({
      sample_id: path.basename(dir),
      ...result,
      ok: errors.length === 0,
      errors,
    });
  }

  if (opts.reportPath) {
    fs.writeFileSync(opts.reportPath, JSON.stringify(report, null, 2));
  }

  for (const s of report.samples) {
    const status = s.ok ? 'OK' : 'FAIL';
    const suffix = s.errors.length > 0 ? ` errors=${s.errors.join(';')}` : '';
    console.log(`[sim] ${status} ${s.sample_id} events=${s.events} parsed=${s.parsedChunks} chars=${s.outputChars}${suffix}`);
  }

  if (report.failed > 0) {
    console.error(`[sim] ${report.failed}/${report.total} samples failed`);
    process.exit(2);
  }
  console.log(`[sim] all ${report.total} samples passed`);
}
|
||||
|
||||
main();
|
||||
Reference in New Issue
Block a user