diff --git a/docs/prompt-compatibility.md b/docs/prompt-compatibility.md index 43f2089..d92cea3 100644 --- a/docs/prompt-compatibility.md +++ b/docs/prompt-compatibility.md @@ -156,6 +156,7 @@ OpenAI Chat / Responses 在标准化后、current input file 之前,会默认 工具调用正例现在优先示范官方 DSML 风格:`<|DSML|tool_calls>` → `<|DSML|invoke name="...">` → `<|DSML|parameter name="...">`。 兼容层仍接受旧式纯 `` wrapper,但提示词会优先要求模型输出官方 DSML 标签,并强调不能只输出 closing wrapper 而漏掉 opening tag。需要注意:这是“兼容 DSML 外壳,内部仍以 XML 解析语义为准”,不是原生 DSML 全链路实现;DSML 标签会在解析入口归一化回现有 XML 标签后继续走同一套 parser。 数组参数使用 `...` 子节点表示;当某个参数体只包含 item 子节点时,Go / Node 解析器会把它还原成数组,避免 `questions` / `options` 这类 schema 中要求 array 的参数被误解析成 `{ "item": ... }` 对象。若模型把完整结构化 XML fragment 误包进 CDATA,兼容层会在保护 `content` / `command` 等原文字段的前提下,尝试把非原文字段中的 CDATA XML fragment 还原成 object / array。不过,如果 CDATA 只是单个平面的 XML/HTML 标签,例如 `urgent` 这种行内标记,兼容层会保留原始字符串,不会强行升成 object / array;只有明显表示结构的 CDATA 片段,例如多兄弟节点、嵌套子节点或 `item` 列表,才会触发结构化恢复。 +Go 侧读取 DeepSeek SSE 时不再依赖 `bufio.Scanner` 的固定 2MiB 单行上限;当写文件类工具把很长的 `content` 放在单个 `data:` 行里返回时,非流式收集、流式解析和 auto-continue 透传都会保留完整行,再进入同一套工具解析与序列化流程。 在 assistant 最终回包阶段,如果某个 tool 参数在声明 schema 中明确是 `string`,兼容层会在把解析后的 `tool_calls` / `function_call` 重新序列化成 OpenAI / Responses / Claude 可见参数前,递归把该路径上的 number / bool / object / array 统一转成字符串;其中 object / array 会压成紧凑 JSON 字符串。这个保护只对 schema 明确声明为 string 的路径生效,不会改写本来就是 `number` / `boolean` / `object` / `array` 的参数。这样可以兼容 DeepSeek 输出了结构化片段、但上游客户端工具 schema 又严格要求字符串参数的场景(例如 `content`、`prompt`、`path`、`taskId` 等)。 工具 schema 的权威来源始终是**当前请求实际携带的 schema**,而不是同名工具在其他 runtime(Claude Code / OpenCode / Codex 等)里的默认印象。兼容层现在会同时兼容 OpenAI 风格 `function.parameters`、直接工具对象上的 `parameters` / `input_schema`、以及 camelCase 的 `inputSchema` / `schema`,并在最终输出阶段按这份请求内 schema 决定是保留 array/object,还是仅对明确声明为 `string` 的路径做字符串化。该规则同样适用于 Claude 的流式收尾和 Vercel Node 流式 tool-call formatter,避免不同 runtime 因 schema shape 差异而出现同名工具参数类型漂移。 正例中的工具名只会来自当前请求实际声明的工具;如果当前请求没有足够的已知工具形态,就省略对应的单工具、多工具或嵌套示例,避免把不可用工具名写进 prompt。 diff --git a/docs/toolcall-semantics.md b/docs/toolcall-semantics.md index ddef29b..0f24d6f 100644 --- a/docs/toolcall-semantics.md +++ b/docs/toolcall-semantics.md @@ -62,6 +62,7 @@ - 支持嵌套围栏(如 4 反引号嵌套 3 反引号)和 CDATA 内围栏保护 - 如果模型把 `` 或 Markdown inline code 里的 `<|DSML|tool_calls>`)而后面紧跟真正工具调用时,sieve 会跳过不可解析的 mention 候选并继续匹配后续真实工具块,不会因 mention 导致工具调用丢失,也不会截断 mention 后的正文 +- Go 侧 SSE 读取不再使用 `bufio.Scanner` 的固定 token 上限;单个 `data:` 行中包含很长的写文件参数时,非流式收集、流式解析与 auto-continue 透传都应保留完整行,再交给 tool parser 处理 另外,`` 的值如果本身是合法 JSON 字面量,也会按结构化值解析,而不是一律保留为字符串。例如 `123`、`true`、`null`、`[1,2]`、`{"a":1}` 都会还原成对应的 number / boolean / null / array / object。 结构化 XML 参数也会还原为 JSON 结构:如果参数体只包含一个或多个 `...` 子节点,会输出数组;嵌套对象里的 item-only 字段也同样按数组处理。例如 `...` 会输出 `{"questions":[{"question":"..."}]}`,而不是 `{"questions":{"item":...}}`。 diff --git a/internal/deepseek/client/client_continue.go b/internal/deepseek/client/client_continue.go index b76d921..009c027 100644 --- a/internal/deepseek/client/client_continue.go +++ b/internal/deepseek/client/client_continue.go @@ -133,33 +133,51 @@ func pumpAutoContinue(ctx context.Context, pw *io.PipeWriter, initial io.ReadClo // sentinels are consumed (not forwarded) so that the downstream only sees // one final [DONE] at the very end. func streamBodyWithContinueState(ctx context.Context, pw *io.PipeWriter, body io.Reader, state *continueState) (bool, error) { - scanner := bufio.NewScanner(body) - scanner.Buffer(make([]byte, 0, 64*1024), 2*1024*1024) + reader := bufio.NewReaderSize(body, 64*1024) hadDone := false - for scanner.Scan() { + for { select { case <-ctx.Done(): return hadDone, ctx.Err() default: } - line := append([]byte{}, scanner.Bytes()...) - trimmed := strings.TrimSpace(string(line)) - if trimmed == "" { - continue - } - if strings.HasPrefix(trimmed, "data:") { - data := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:")) - if data == "[DONE]" { - hadDone = true - continue + line, err := reader.ReadBytes('\n') + if len(line) == 0 && err != nil { + if err == io.EOF { + return hadDone, nil } - state.observe(data) + return hadDone, err } - if _, err := io.Copy(pw, bytes.NewReader(append(line, '\n'))); err != nil { + trimmed := strings.TrimSpace(string(line)) + if trimmed != "" { + if strings.HasPrefix(trimmed, "data:") { + data := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:")) + if data == "[DONE]" { + hadDone = true + if err != nil && err != io.EOF { + return hadDone, err + } + if err == io.EOF { + return hadDone, nil + } + continue + } + state.observe(data) + } + if !strings.HasSuffix(string(line), "\n") { + line = append(line, '\n') + } + if _, copyErr := io.Copy(pw, bytes.NewReader(line)); copyErr != nil { + return hadDone, copyErr + } + } + if err != nil { + if err == io.EOF { + return hadDone, nil + } return hadDone, err } } - return hadDone, scanner.Err() } // observe extracts continue-relevant signals from an SSE JSON chunk. @@ -175,34 +193,48 @@ func (s *continueState) observe(data string) { if id := intFrom(chunk["response_message_id"]); id > 0 { s.responseMessageID = id } - // Path-based status: {"p": "response/status", "v": "FINISHED"} - if p, _ := chunk["p"].(string); p == "response/status" { - s.setStatus(asString(chunk["v"])) - } + s.observeDirectPatch(asString(chunk["p"]), chunk["v"]) if p, _ := chunk["p"].(string); p == "response" { s.observeBatchPatches("response", chunk["v"]) } else { s.observeBatchPatches("", chunk["v"]) } - // Nested v.response - v, _ := chunk["v"].(map[string]any) - if response, _ := v["response"].(map[string]any); response != nil { - if id := intFrom(response["message_id"]); id > 0 { - s.responseMessageID = id - } - s.setStatus(asString(response["status"])) - if autoContinue, ok := response["auto_continue"].(bool); ok && autoContinue { + if v, _ := chunk["v"].(map[string]any); v != nil { + s.observeResponseObject(v["response"]) + } + if message, _ := chunk["message"].(map[string]any); message != nil { + s.observeResponseObject(message["response"]) + } +} + +func (s *continueState) observeDirectPatch(path string, value any) { + if s == nil { + return + } + switch strings.Trim(strings.TrimSpace(path), "/") { + case "response/status", "status", "response/quasi_status", "quasi_status": + s.setStatus(asString(value)) + case "response/auto_continue", "auto_continue": + if v, ok := value.(bool); ok && v { s.lastStatus = "AUTO_CONTINUE" } } - // Nested message.response - if message, _ := chunk["message"].(map[string]any); message != nil { - if response, _ := message["response"].(map[string]any); response != nil { - if id := intFrom(response["message_id"]); id > 0 { - s.responseMessageID = id - } - s.setStatus(asString(response["status"])) - } +} + +func (s *continueState) observeResponseObject(raw any) { + if s == nil { + return + } + response, _ := raw.(map[string]any) + if response == nil { + return + } + if id := intFrom(response["message_id"]); id > 0 { + s.responseMessageID = id + } + s.setStatus(asString(response["status"])) + if autoContinue, ok := response["auto_continue"].(bool); ok && autoContinue { + s.lastStatus = "AUTO_CONTINUE" } } @@ -230,6 +262,10 @@ func (s *continueState) observeBatchPatches(parentPath string, raw any) { switch strings.Trim(strings.TrimSpace(fullPath), "/") { case "response/status", "status", "response/quasi_status", "quasi_status": s.setStatus(asString(m["v"])) + case "response/auto_continue", "auto_continue": + if v, ok := m["v"].(bool); ok && v { + s.lastStatus = "AUTO_CONTINUE" + } } } } diff --git a/internal/deepseek/client/client_continue_test.go b/internal/deepseek/client/client_continue_test.go index bb1f58f..b79dbca 100644 --- a/internal/deepseek/client/client_continue_test.go +++ b/internal/deepseek/client/client_continue_test.go @@ -150,6 +150,62 @@ func TestAutoContinueDoesNotTriggerOnPlainWIPWithoutExplicitContinuationSignal(t } } +func TestAutoContinuePassesThroughLongSingleSSELine(t *testing.T) { + payload := strings.Repeat("x", 2*1024*1024+4096) + initialBody := `data: {"p":"response/content","v":"` + payload + `"}` + "\n" + + `data: [DONE]` + "\n" + + body := newAutoContinueBody(context.Background(), io.NopCloser(strings.NewReader(initialBody)), "session-123", 8, func(context.Context, string, int) (*http.Response, error) { + return nil, errors.New("continue should not have been called") + }) + defer func() { _ = body.Close() }() + + out, err := io.ReadAll(body) + if err != nil { + t.Fatalf("read body failed: %v", err) + } + if !bytes.Contains(out, []byte(payload)) { + t.Fatalf("expected long SSE payload to pass through, got len=%d want payload len=%d", len(out), len(payload)) + } + if !bytes.Contains(out, []byte(`data: [DONE]`)) { + t.Fatalf("expected final DONE sentinel in body, got len=%d", len(out)) + } +} + +func TestAutoContinueTriggersOnDirectQuasiStatusIncomplete(t *testing.T) { + initialBody := strings.Join([]string{ + `data: {"response_message_id":321,"p":"response/content","v":""}` + "\n" + + `data: {"p":"response/status","v":"FINISHED"}` + "\n" + + `data: [DONE]` + "\n", + )), + }, nil + }) + defer func() { _ = body.Close() }() + + out, err := io.ReadAll(body) + if err != nil { + t.Fatalf("read body failed: %v", err) + } + if continueCalls.Load() != 1 { + t.Fatalf("expected exactly one continue call, got %d", continueCalls.Load()) + } + if !bytes.Contains(out, []byte("part-one")) || !bytes.Contains(out, []byte("-part-two")) { + t.Fatalf("expected continued tool content in body, got=%s", string(out)) + } +} + func TestAutoContinueTriggersOnResponseBatchQuasiStatusIncomplete(t *testing.T) { initialBody := strings.Join([]string{ `data: {"response_message_id":321,"v":{"response":{"message_id":321,"status":"WIP","auto_continue":false}}}`, diff --git a/internal/deepseek/protocol/sse.go b/internal/deepseek/protocol/sse.go index c11b72b..af942aa 100644 --- a/internal/deepseek/protocol/sse.go +++ b/internal/deepseek/protocol/sse.go @@ -2,20 +2,24 @@ package protocol import ( "bufio" + "io" "net/http" ) func ScanSSELines(resp *http.Response, onLine func([]byte) bool) error { - scanner := bufio.NewScanner(resp.Body) - buf := make([]byte, 0, 64*1024) - scanner.Buffer(buf, 2*1024*1024) - for scanner.Scan() { - if !onLine(scanner.Bytes()) { - break + reader := bufio.NewReaderSize(resp.Body, 64*1024) + for { + line, err := reader.ReadBytes('\n') + if len(line) > 0 { + if !onLine(line) { + return nil + } + } + if err != nil { + if err == io.EOF { + return nil + } + return err } } - if err := scanner.Err(); err != nil { - return err - } - return nil } diff --git a/internal/deepseek/protocol/sse_test.go b/internal/deepseek/protocol/sse_test.go new file mode 100644 index 0000000..17589c7 --- /dev/null +++ b/internal/deepseek/protocol/sse_test.go @@ -0,0 +1,26 @@ +package protocol + +import ( + "io" + "net/http" + "strings" + "testing" +) + +func TestScanSSELinesHandlesLongSingleLine(t *testing.T) { + payload := strings.Repeat("x", 2*1024*1024+4096) + body := "data: {\"p\":\"response/content\",\"v\":\"" + payload + "\"}\n" + resp := &http.Response{Body: io.NopCloser(strings.NewReader(body))} + + var got string + err := ScanSSELines(resp, func(line []byte) bool { + got = string(line) + return true + }) + if err != nil { + t.Fatalf("ScanSSELines returned error: %v", err) + } + if !strings.Contains(got, payload) { + t.Fatalf("long SSE line was not preserved: got len=%d want payload len=%d", len(got), len(payload)) + } +} diff --git a/internal/js/chat-stream/vercel_stream_impl.js b/internal/js/chat-stream/vercel_stream_impl.js index f6c642d..02af872 100644 --- a/internal/js/chat-stream/vercel_stream_impl.js +++ b/internal/js/chat-stream/vercel_stream_impl.js @@ -516,32 +516,51 @@ function observeContinueState(state, chunk) { if (topID > 0) { state.responseMessageID = topID; } - if (chunk.p === 'response/status') { - setContinueStatus(state, asString(chunk.v)); - } + observeContinueDirectPatch(state, chunk.p, chunk.v); if (chunk.p === 'response') { observeContinueBatchPatches(state, 'response', chunk.v); } else { observeContinueBatchPatches(state, '', chunk.v); } const response = chunk.v && typeof chunk.v === 'object' ? chunk.v.response : null; - if (response && typeof response === 'object') { - const id = numberValue(response.message_id); - if (id > 0) { - state.responseMessageID = id; - } - setContinueStatus(state, asString(response.status)); - if (response.auto_continue === true) { - state.lastStatus = 'AUTO_CONTINUE'; - } - } + observeContinueResponseObject(state, response); const messageResponse = chunk.message && typeof chunk.message === 'object' && chunk.message.response; - if (messageResponse && typeof messageResponse === 'object') { - const id = numberValue(messageResponse.message_id); - if (id > 0) { - state.responseMessageID = id; - } - setContinueStatus(state, asString(messageResponse.status)); + observeContinueResponseObject(state, messageResponse); +} + +function observeContinueDirectPatch(state, path, value) { + if (!state) { + return; + } + switch (asString(path).trim().replace(/^\/+|\/+$/g, '')) { + case 'response/status': + case 'status': + case 'response/quasi_status': + case 'quasi_status': + setContinueStatus(state, asString(value)); + break; + case 'response/auto_continue': + case 'auto_continue': + if (value === true) { + state.lastStatus = 'AUTO_CONTINUE'; + } + break; + default: + break; + } +} + +function observeContinueResponseObject(state, response) { + if (!state || !response || typeof response !== 'object') { + return; + } + const id = numberValue(response.message_id); + if (id > 0) { + state.responseMessageID = id; + } + setContinueStatus(state, asString(response.status)); + if (response.auto_continue === true) { + state.lastStatus = 'AUTO_CONTINUE'; } } @@ -569,6 +588,12 @@ function observeContinueBatchPatches(state, parentPath, raw) { case 'quasi_status': setContinueStatus(state, asString(patch.v)); break; + case 'response/auto_continue': + case 'auto_continue': + if (patch.v === true) { + state.lastStatus = 'AUTO_CONTINUE'; + } + break; default: break; } diff --git a/internal/sse/consumer_edge_test.go b/internal/sse/consumer_edge_test.go index 4654ef8..8d39a3f 100644 --- a/internal/sse/consumer_edge_test.go +++ b/internal/sse/consumer_edge_test.go @@ -41,6 +41,15 @@ func TestCollectStreamTextOnly(t *testing.T) { } } +func TestCollectStreamHandlesLongSingleSSELine(t *testing.T) { + payload := strings.Repeat("x", 2*1024*1024+4096) + resp := makeHTTPResponse(makeLargeContentSSEBody(t, payload)) + result := CollectStream(resp, false, true) + if result.Text != payload { + t.Fatalf("long SSE line payload mismatch: got len=%d want len=%d", len(result.Text), len(payload)) + } +} + func TestCollectStreamThinkingAndText(t *testing.T) { resp := makeHTTPResponse( "data: {\"p\":\"response/thinking_content\",\"v\":\"Thinking...\"}\n" + diff --git a/internal/sse/stream.go b/internal/sse/stream.go index 8b8aa9b..44f75cb 100644 --- a/internal/sse/stream.go +++ b/internal/sse/stream.go @@ -9,8 +9,7 @@ import ( const ( parsedLineBufferSize = 128 - scannerBufferSize = 64 * 1024 - maxScannerLineSize = 2 * 1024 * 1024 + lineReaderBufferSize = 64 * 1024 minFlushChars = 160 maxFlushWait = 80 * time.Millisecond ) @@ -29,8 +28,8 @@ func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bo eof bool } lineCh := make(chan scanItem, 1) - stopScanner := make(chan struct{}) - defer close(stopScanner) + stopReader := make(chan struct{}) + defer close(stopReader) go func() { sendScanItem := func(item scanItem) bool { select { @@ -38,20 +37,28 @@ func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bo return true case <-ctx.Done(): return false - case <-stopScanner: + case <-stopReader: return false } } defer close(lineCh) - scanner := bufio.NewScanner(body) - scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize) - for scanner.Scan() { - line := append([]byte{}, scanner.Bytes()...) - if !sendScanItem(scanItem{line: line}) { + reader := bufio.NewReaderSize(body, lineReaderBufferSize) + for { + line, err := reader.ReadBytes('\n') + if len(line) > 0 { + line = append([]byte{}, line...) + if !sendScanItem(scanItem{line: line}) { + return + } + } + if err != nil { + if err == io.EOF { + err = nil + } + _ = sendScanItem(scanItem{err: err, eof: true}) return } } - _ = sendScanItem(scanItem{err: scanner.Err(), eof: true}) }() ticker := time.NewTicker(maxFlushWait) diff --git a/internal/sse/stream_test.go b/internal/sse/stream_test.go index a4fd2bb..d6addb7 100644 --- a/internal/sse/stream_test.go +++ b/internal/sse/stream_test.go @@ -2,10 +2,23 @@ package sse import ( "context" + "encoding/json" "strings" "testing" ) +func makeLargeContentSSEBody(t *testing.T, payload string) string { + t.Helper() + line, err := json.Marshal(map[string]any{ + "p": "response/content", + "v": payload, + }) + if err != nil { + t.Fatalf("marshal SSE line failed: %v", err) + } + return "data: " + string(line) + "\n" + "data: [DONE]\n" +} + func TestStartParsedLinePumpParsesAndStops(t *testing.T) { body := strings.NewReader("data: {\"p\":\"response/content\",\"v\":\"hi\"}\n\ndata: [DONE]\n") results, done := StartParsedLinePump(context.Background(), body, false, "text") @@ -28,3 +41,28 @@ func TestStartParsedLinePumpParsesAndStops(t *testing.T) { t.Fatalf("expected last line to stop stream, got parsed=%v stop=%v", last.Parsed, last.Stop) } } + +func TestStartParsedLinePumpHandlesLongSingleSSELine(t *testing.T) { + payload := strings.Repeat("x", 2*1024*1024+4096) + results, done := StartParsedLinePump(context.Background(), strings.NewReader(makeLargeContentSSEBody(t, payload)), false, "text") + + var got strings.Builder + var sawDone bool + for r := range results { + for _, p := range r.Parts { + got.WriteString(p.Text) + } + if r.Stop { + sawDone = true + } + } + if err := <-done; err != nil { + t.Fatalf("unexpected long-line read error: %v", err) + } + if got.String() != payload { + t.Fatalf("long SSE line payload mismatch: got len=%d want len=%d", got.Len(), len(payload)) + } + if !sawDone { + t.Fatal("expected DONE after long SSE line") + } +} diff --git a/tests/node/chat-stream.test.js b/tests/node/chat-stream.test.js index 53ddeaf..ba5ef7d 100644 --- a/tests/node/chat-stream.test.js +++ b/tests/node/chat-stream.test.js @@ -258,6 +258,28 @@ test('vercel stream exhausts DeepSeek continue before synthetic retry', async () assert.equal(fetchBodies.some((body) => String(body.prompt || '').includes('Previous reply had no visible output')), false); }); +test('vercel stream continues direct quasi_status incomplete before final tool call', async () => { + const { frames, fetchURLs } = await runMockVercelStreamSequence([ + [ + 'data: {"response_message_id":7,"p":"response/content","v":""}\n\n', + 'data: {"p":"response/status","v":"FINISHED"}\n\n', + 'data: [DONE]\n\n', + ], + ], { tool_names: ['write_file'] }); + const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame)); + const toolDelta = parsed.find((item) => item.choices?.[0]?.delta?.tool_calls); + assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/continue').length, 1); + assert.ok(toolDelta); + const args = JSON.parse(toolDelta.choices[0].delta.tool_calls[0].function.arguments); + assert.equal(args.content, 'part-one-part-two'); + assert.equal(parsed.at(-1).choices[0].finish_reason, 'tool_calls'); +}); + test('vercel stream usage completion_tokens does not double-count visible output', async () => {