fix: remove bufio.Scanner 2MiB line limit for SSE; support quasi_status direct patch

Replace bufio.Scanner with bufio.NewReaderSize + ReadBytes('\n') across all
SSE read paths to preserve long single-line data (e.g. write_file content).
Add quasi_status and auto_continue handling as direct path-based patches in
both Go continue observer and Node vercel_stream_impl, mirroring existing
batch-patch logic. Add 2MiB+ line throughput tests at every SSE layer.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
CJACK
2026-05-01 15:45:17 +08:00
parent fd0ec29991
commit 0a6ef8e3f2
11 changed files with 301 additions and 76 deletions

View File

@@ -156,6 +156,7 @@ OpenAI Chat / Responses 在标准化后、current input file 之前,会默认
工具调用正例现在优先示范官方 DSML 风格:`<|DSML|tool_calls>``<|DSML|invoke name="...">``<|DSML|parameter name="...">`
兼容层仍接受旧式纯 `<tool_calls>` wrapper但提示词会优先要求模型输出官方 DSML 标签,并强调不能只输出 closing wrapper 而漏掉 opening tag。需要注意这是“兼容 DSML 外壳,内部仍以 XML 解析语义为准”,不是原生 DSML 全链路实现DSML 标签会在解析入口归一化回现有 XML 标签后继续走同一套 parser。
数组参数使用 `<item>...</item>` 子节点表示;当某个参数体只包含 item 子节点时Go / Node 解析器会把它还原成数组,避免 `questions` / `options` 这类 schema 中要求 array 的参数被误解析成 `{ "item": ... }` 对象。若模型把完整结构化 XML fragment 误包进 CDATA兼容层会在保护 `content` / `command` 等原文字段的前提下,尝试把非原文字段中的 CDATA XML fragment 还原成 object / array。不过如果 CDATA 只是单个平面的 XML/HTML 标签,例如 `<b>urgent</b>` 这种行内标记,兼容层会保留原始字符串,不会强行升成 object / array只有明显表示结构的 CDATA 片段,例如多兄弟节点、嵌套子节点或 `item` 列表,才会触发结构化恢复。
Go 侧读取 DeepSeek SSE 时不再依赖 `bufio.Scanner` 的固定 2MiB 单行上限;当写文件类工具把很长的 `content` 放在单个 `data:` 行里返回时,非流式收集、流式解析和 auto-continue 透传都会保留完整行,再进入同一套工具解析与序列化流程。
在 assistant 最终回包阶段,如果某个 tool 参数在声明 schema 中明确是 `string`,兼容层会在把解析后的 `tool_calls` / `function_call` 重新序列化成 OpenAI / Responses / Claude 可见参数前,递归把该路径上的 number / bool / object / array 统一转成字符串;其中 object / array 会压成紧凑 JSON 字符串。这个保护只对 schema 明确声明为 string 的路径生效,不会改写本来就是 `number` / `boolean` / `object` / `array` 的参数。这样可以兼容 DeepSeek 输出了结构化片段、但上游客户端工具 schema 又严格要求字符串参数的场景(例如 `content``prompt``path``taskId` 等)。
工具 schema 的权威来源始终是**当前请求实际携带的 schema**,而不是同名工具在其他 runtimeClaude Code / OpenCode / Codex 等)里的默认印象。兼容层现在会同时兼容 OpenAI 风格 `function.parameters`、直接工具对象上的 `parameters` / `input_schema`、以及 camelCase 的 `inputSchema` / `schema`,并在最终输出阶段按这份请求内 schema 决定是保留 array/object还是仅对明确声明为 `string` 的路径做字符串化。该规则同样适用于 Claude 的流式收尾和 Vercel Node 流式 tool-call formatter避免不同 runtime 因 schema shape 差异而出现同名工具参数类型漂移。
正例中的工具名只会来自当前请求实际声明的工具;如果当前请求没有足够的已知工具形态,就省略对应的单工具、多工具或嵌套示例,避免把不可用工具名写进 prompt。

View File

@@ -62,6 +62,7 @@
- 支持嵌套围栏(如 4 反引号嵌套 3 反引号)和 CDATA 内围栏保护
- 如果模型把 `<![CDATA[` 打开后却没有闭合,流式扫描阶段仍会保守地继续缓冲,不会误把 CDATA 里的示例 XML 当成真实工具调用;在最终 parse / flush 恢复阶段,会对这类 loose CDATA 做窄修复,尽量保住外层已完整包裹的真实工具调用
- 当文本中 mention 了某种标签名(如 `<dsml|tool_calls>` 或 Markdown inline code 里的 `<|DSML|tool_calls>`而后面紧跟真正工具调用时sieve 会跳过不可解析的 mention 候选并继续匹配后续真实工具块,不会因 mention 导致工具调用丢失,也不会截断 mention 后的正文
- Go 侧 SSE 读取不再使用 `bufio.Scanner` 的固定 token 上限;单个 `data:` 行中包含很长的写文件参数时,非流式收集、流式解析与 auto-continue 透传都应保留完整行,再交给 tool parser 处理
另外,`<parameter>` 的值如果本身是合法 JSON 字面量,也会按结构化值解析,而不是一律保留为字符串。例如 `123``true``null``[1,2]``{"a":1}` 都会还原成对应的 number / boolean / null / array / object。
结构化 XML 参数也会还原为 JSON 结构:如果参数体只包含一个或多个 `<item>...</item>` 子节点,会输出数组;嵌套对象里的 item-only 字段也同样按数组处理。例如 `<parameter name="questions"><item><question>...</question></item></parameter>` 会输出 `{"questions":[{"question":"..."}]}`,而不是 `{"questions":{"item":...}}`

View File

@@ -133,33 +133,51 @@ func pumpAutoContinue(ctx context.Context, pw *io.PipeWriter, initial io.ReadClo
// sentinels are consumed (not forwarded) so that the downstream only sees
// one final [DONE] at the very end.
func streamBodyWithContinueState(ctx context.Context, pw *io.PipeWriter, body io.Reader, state *continueState) (bool, error) {
scanner := bufio.NewScanner(body)
scanner.Buffer(make([]byte, 0, 64*1024), 2*1024*1024)
reader := bufio.NewReaderSize(body, 64*1024)
hadDone := false
for scanner.Scan() {
for {
select {
case <-ctx.Done():
return hadDone, ctx.Err()
default:
}
line := append([]byte{}, scanner.Bytes()...)
trimmed := strings.TrimSpace(string(line))
if trimmed == "" {
continue
}
if strings.HasPrefix(trimmed, "data:") {
data := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:"))
if data == "[DONE]" {
hadDone = true
continue
line, err := reader.ReadBytes('\n')
if len(line) == 0 && err != nil {
if err == io.EOF {
return hadDone, nil
}
state.observe(data)
return hadDone, err
}
if _, err := io.Copy(pw, bytes.NewReader(append(line, '\n'))); err != nil {
trimmed := strings.TrimSpace(string(line))
if trimmed != "" {
if strings.HasPrefix(trimmed, "data:") {
data := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:"))
if data == "[DONE]" {
hadDone = true
if err != nil && err != io.EOF {
return hadDone, err
}
if err == io.EOF {
return hadDone, nil
}
continue
}
state.observe(data)
}
if !strings.HasSuffix(string(line), "\n") {
line = append(line, '\n')
}
if _, copyErr := io.Copy(pw, bytes.NewReader(line)); copyErr != nil {
return hadDone, copyErr
}
}
if err != nil {
if err == io.EOF {
return hadDone, nil
}
return hadDone, err
}
}
return hadDone, scanner.Err()
}
// observe extracts continue-relevant signals from an SSE JSON chunk.
@@ -175,34 +193,48 @@ func (s *continueState) observe(data string) {
if id := intFrom(chunk["response_message_id"]); id > 0 {
s.responseMessageID = id
}
// Path-based status: {"p": "response/status", "v": "FINISHED"}
if p, _ := chunk["p"].(string); p == "response/status" {
s.setStatus(asString(chunk["v"]))
}
s.observeDirectPatch(asString(chunk["p"]), chunk["v"])
if p, _ := chunk["p"].(string); p == "response" {
s.observeBatchPatches("response", chunk["v"])
} else {
s.observeBatchPatches("", chunk["v"])
}
// Nested v.response
v, _ := chunk["v"].(map[string]any)
if response, _ := v["response"].(map[string]any); response != nil {
if id := intFrom(response["message_id"]); id > 0 {
s.responseMessageID = id
}
s.setStatus(asString(response["status"]))
if autoContinue, ok := response["auto_continue"].(bool); ok && autoContinue {
if v, _ := chunk["v"].(map[string]any); v != nil {
s.observeResponseObject(v["response"])
}
if message, _ := chunk["message"].(map[string]any); message != nil {
s.observeResponseObject(message["response"])
}
}
func (s *continueState) observeDirectPatch(path string, value any) {
if s == nil {
return
}
switch strings.Trim(strings.TrimSpace(path), "/") {
case "response/status", "status", "response/quasi_status", "quasi_status":
s.setStatus(asString(value))
case "response/auto_continue", "auto_continue":
if v, ok := value.(bool); ok && v {
s.lastStatus = "AUTO_CONTINUE"
}
}
// Nested message.response
if message, _ := chunk["message"].(map[string]any); message != nil {
if response, _ := message["response"].(map[string]any); response != nil {
if id := intFrom(response["message_id"]); id > 0 {
s.responseMessageID = id
}
s.setStatus(asString(response["status"]))
}
}
func (s *continueState) observeResponseObject(raw any) {
if s == nil {
return
}
response, _ := raw.(map[string]any)
if response == nil {
return
}
if id := intFrom(response["message_id"]); id > 0 {
s.responseMessageID = id
}
s.setStatus(asString(response["status"]))
if autoContinue, ok := response["auto_continue"].(bool); ok && autoContinue {
s.lastStatus = "AUTO_CONTINUE"
}
}
@@ -230,6 +262,10 @@ func (s *continueState) observeBatchPatches(parentPath string, raw any) {
switch strings.Trim(strings.TrimSpace(fullPath), "/") {
case "response/status", "status", "response/quasi_status", "quasi_status":
s.setStatus(asString(m["v"]))
case "response/auto_continue", "auto_continue":
if v, ok := m["v"].(bool); ok && v {
s.lastStatus = "AUTO_CONTINUE"
}
}
}
}

View File

@@ -150,6 +150,62 @@ func TestAutoContinueDoesNotTriggerOnPlainWIPWithoutExplicitContinuationSignal(t
}
}
func TestAutoContinuePassesThroughLongSingleSSELine(t *testing.T) {
payload := strings.Repeat("x", 2*1024*1024+4096)
initialBody := `data: {"p":"response/content","v":"` + payload + `"}` + "\n" +
`data: [DONE]` + "\n"
body := newAutoContinueBody(context.Background(), io.NopCloser(strings.NewReader(initialBody)), "session-123", 8, func(context.Context, string, int) (*http.Response, error) {
return nil, errors.New("continue should not have been called")
})
defer func() { _ = body.Close() }()
out, err := io.ReadAll(body)
if err != nil {
t.Fatalf("read body failed: %v", err)
}
if !bytes.Contains(out, []byte(payload)) {
t.Fatalf("expected long SSE payload to pass through, got len=%d want payload len=%d", len(out), len(payload))
}
if !bytes.Contains(out, []byte(`data: [DONE]`)) {
t.Fatalf("expected final DONE sentinel in body, got len=%d", len(out))
}
}
func TestAutoContinueTriggersOnDirectQuasiStatusIncomplete(t *testing.T) {
initialBody := strings.Join([]string{
`data: {"response_message_id":321,"p":"response/content","v":"<tool_calls><invoke name=\"write_file\"><parameter name=\"content\"><![CDATA[part-one"}`,
`data: {"p":"response/quasi_status","v":"INCOMPLETE"}`,
`data: [DONE]`,
}, "\n") + "\n"
var continueCalls atomic.Int32
body := newAutoContinueBody(context.Background(), io.NopCloser(strings.NewReader(initialBody)), "session-123", 8, func(context.Context, string, int) (*http.Response, error) {
continueCalls.Add(1)
return &http.Response{
StatusCode: http.StatusOK,
Header: make(http.Header),
Body: io.NopCloser(strings.NewReader(
`data: {"response_message_id":322,"p":"response/content","v":"-part-two]]></parameter></invoke></tool_calls>"}` + "\n" +
`data: {"p":"response/status","v":"FINISHED"}` + "\n" +
`data: [DONE]` + "\n",
)),
}, nil
})
defer func() { _ = body.Close() }()
out, err := io.ReadAll(body)
if err != nil {
t.Fatalf("read body failed: %v", err)
}
if continueCalls.Load() != 1 {
t.Fatalf("expected exactly one continue call, got %d", continueCalls.Load())
}
if !bytes.Contains(out, []byte("part-one")) || !bytes.Contains(out, []byte("-part-two")) {
t.Fatalf("expected continued tool content in body, got=%s", string(out))
}
}
func TestAutoContinueTriggersOnResponseBatchQuasiStatusIncomplete(t *testing.T) {
initialBody := strings.Join([]string{
`data: {"response_message_id":321,"v":{"response":{"message_id":321,"status":"WIP","auto_continue":false}}}`,

View File

@@ -2,20 +2,24 @@ package protocol
import (
"bufio"
"io"
"net/http"
)
func ScanSSELines(resp *http.Response, onLine func([]byte) bool) error {
scanner := bufio.NewScanner(resp.Body)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 2*1024*1024)
for scanner.Scan() {
if !onLine(scanner.Bytes()) {
break
reader := bufio.NewReaderSize(resp.Body, 64*1024)
for {
line, err := reader.ReadBytes('\n')
if len(line) > 0 {
if !onLine(line) {
return nil
}
}
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
if err := scanner.Err(); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,26 @@
package protocol
import (
"io"
"net/http"
"strings"
"testing"
)
func TestScanSSELinesHandlesLongSingleLine(t *testing.T) {
payload := strings.Repeat("x", 2*1024*1024+4096)
body := "data: {\"p\":\"response/content\",\"v\":\"" + payload + "\"}\n"
resp := &http.Response{Body: io.NopCloser(strings.NewReader(body))}
var got string
err := ScanSSELines(resp, func(line []byte) bool {
got = string(line)
return true
})
if err != nil {
t.Fatalf("ScanSSELines returned error: %v", err)
}
if !strings.Contains(got, payload) {
t.Fatalf("long SSE line was not preserved: got len=%d want payload len=%d", len(got), len(payload))
}
}

View File

@@ -516,32 +516,51 @@ function observeContinueState(state, chunk) {
if (topID > 0) {
state.responseMessageID = topID;
}
if (chunk.p === 'response/status') {
setContinueStatus(state, asString(chunk.v));
}
observeContinueDirectPatch(state, chunk.p, chunk.v);
if (chunk.p === 'response') {
observeContinueBatchPatches(state, 'response', chunk.v);
} else {
observeContinueBatchPatches(state, '', chunk.v);
}
const response = chunk.v && typeof chunk.v === 'object' ? chunk.v.response : null;
if (response && typeof response === 'object') {
const id = numberValue(response.message_id);
if (id > 0) {
state.responseMessageID = id;
}
setContinueStatus(state, asString(response.status));
if (response.auto_continue === true) {
state.lastStatus = 'AUTO_CONTINUE';
}
}
observeContinueResponseObject(state, response);
const messageResponse = chunk.message && typeof chunk.message === 'object' && chunk.message.response;
if (messageResponse && typeof messageResponse === 'object') {
const id = numberValue(messageResponse.message_id);
if (id > 0) {
state.responseMessageID = id;
}
setContinueStatus(state, asString(messageResponse.status));
observeContinueResponseObject(state, messageResponse);
}
function observeContinueDirectPatch(state, path, value) {
if (!state) {
return;
}
switch (asString(path).trim().replace(/^\/+|\/+$/g, '')) {
case 'response/status':
case 'status':
case 'response/quasi_status':
case 'quasi_status':
setContinueStatus(state, asString(value));
break;
case 'response/auto_continue':
case 'auto_continue':
if (value === true) {
state.lastStatus = 'AUTO_CONTINUE';
}
break;
default:
break;
}
}
function observeContinueResponseObject(state, response) {
if (!state || !response || typeof response !== 'object') {
return;
}
const id = numberValue(response.message_id);
if (id > 0) {
state.responseMessageID = id;
}
setContinueStatus(state, asString(response.status));
if (response.auto_continue === true) {
state.lastStatus = 'AUTO_CONTINUE';
}
}
@@ -569,6 +588,12 @@ function observeContinueBatchPatches(state, parentPath, raw) {
case 'quasi_status':
setContinueStatus(state, asString(patch.v));
break;
case 'response/auto_continue':
case 'auto_continue':
if (patch.v === true) {
state.lastStatus = 'AUTO_CONTINUE';
}
break;
default:
break;
}

View File

@@ -41,6 +41,15 @@ func TestCollectStreamTextOnly(t *testing.T) {
}
}
func TestCollectStreamHandlesLongSingleSSELine(t *testing.T) {
payload := strings.Repeat("x", 2*1024*1024+4096)
resp := makeHTTPResponse(makeLargeContentSSEBody(t, payload))
result := CollectStream(resp, false, true)
if result.Text != payload {
t.Fatalf("long SSE line payload mismatch: got len=%d want len=%d", len(result.Text), len(payload))
}
}
func TestCollectStreamThinkingAndText(t *testing.T) {
resp := makeHTTPResponse(
"data: {\"p\":\"response/thinking_content\",\"v\":\"Thinking...\"}\n" +

View File

@@ -9,8 +9,7 @@ import (
const (
parsedLineBufferSize = 128
scannerBufferSize = 64 * 1024
maxScannerLineSize = 2 * 1024 * 1024
lineReaderBufferSize = 64 * 1024
minFlushChars = 160
maxFlushWait = 80 * time.Millisecond
)
@@ -29,8 +28,8 @@ func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bo
eof bool
}
lineCh := make(chan scanItem, 1)
stopScanner := make(chan struct{})
defer close(stopScanner)
stopReader := make(chan struct{})
defer close(stopReader)
go func() {
sendScanItem := func(item scanItem) bool {
select {
@@ -38,20 +37,28 @@ func StartParsedLinePump(ctx context.Context, body io.Reader, thinkingEnabled bo
return true
case <-ctx.Done():
return false
case <-stopScanner:
case <-stopReader:
return false
}
}
defer close(lineCh)
scanner := bufio.NewScanner(body)
scanner.Buffer(make([]byte, 0, scannerBufferSize), maxScannerLineSize)
for scanner.Scan() {
line := append([]byte{}, scanner.Bytes()...)
if !sendScanItem(scanItem{line: line}) {
reader := bufio.NewReaderSize(body, lineReaderBufferSize)
for {
line, err := reader.ReadBytes('\n')
if len(line) > 0 {
line = append([]byte{}, line...)
if !sendScanItem(scanItem{line: line}) {
return
}
}
if err != nil {
if err == io.EOF {
err = nil
}
_ = sendScanItem(scanItem{err: err, eof: true})
return
}
}
_ = sendScanItem(scanItem{err: scanner.Err(), eof: true})
}()
ticker := time.NewTicker(maxFlushWait)

View File

@@ -2,10 +2,23 @@ package sse
import (
"context"
"encoding/json"
"strings"
"testing"
)
func makeLargeContentSSEBody(t *testing.T, payload string) string {
t.Helper()
line, err := json.Marshal(map[string]any{
"p": "response/content",
"v": payload,
})
if err != nil {
t.Fatalf("marshal SSE line failed: %v", err)
}
return "data: " + string(line) + "\n" + "data: [DONE]\n"
}
func TestStartParsedLinePumpParsesAndStops(t *testing.T) {
body := strings.NewReader("data: {\"p\":\"response/content\",\"v\":\"hi\"}\n\ndata: [DONE]\n")
results, done := StartParsedLinePump(context.Background(), body, false, "text")
@@ -28,3 +41,28 @@ func TestStartParsedLinePumpParsesAndStops(t *testing.T) {
t.Fatalf("expected last line to stop stream, got parsed=%v stop=%v", last.Parsed, last.Stop)
}
}
func TestStartParsedLinePumpHandlesLongSingleSSELine(t *testing.T) {
payload := strings.Repeat("x", 2*1024*1024+4096)
results, done := StartParsedLinePump(context.Background(), strings.NewReader(makeLargeContentSSEBody(t, payload)), false, "text")
var got strings.Builder
var sawDone bool
for r := range results {
for _, p := range r.Parts {
got.WriteString(p.Text)
}
if r.Stop {
sawDone = true
}
}
if err := <-done; err != nil {
t.Fatalf("unexpected long-line read error: %v", err)
}
if got.String() != payload {
t.Fatalf("long SSE line payload mismatch: got len=%d want len=%d", got.Len(), len(payload))
}
if !sawDone {
t.Fatal("expected DONE after long SSE line")
}
}

View File

@@ -258,6 +258,28 @@ test('vercel stream exhausts DeepSeek continue before synthetic retry', async ()
assert.equal(fetchBodies.some((body) => String(body.prompt || '').includes('Previous reply had no visible output')), false);
});
test('vercel stream continues direct quasi_status incomplete before final tool call', async () => {
const { frames, fetchURLs } = await runMockVercelStreamSequence([
[
'data: {"response_message_id":7,"p":"response/content","v":"<tool_calls><invoke name=\\"write_file\\"><parameter name=\\"content\\"><![CDATA[part-one"}\n\n',
'data: {"p":"response/quasi_status","v":"INCOMPLETE"}\n\n',
'data: [DONE]\n\n',
],
[
'data: {"response_message_id":8,"p":"response/content","v":"-part-two]]></parameter></invoke></tool_calls>"}\n\n',
'data: {"p":"response/status","v":"FINISHED"}\n\n',
'data: [DONE]\n\n',
],
], { tool_names: ['write_file'] });
const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
const toolDelta = parsed.find((item) => item.choices?.[0]?.delta?.tool_calls);
assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/continue').length, 1);
assert.ok(toolDelta);
const args = JSON.parse(toolDelta.choices[0].delta.tool_calls[0].function.arguments);
assert.equal(args.content, 'part-one-part-two');
assert.equal(parsed.at(-1).choices[0].finish_reason, 'tool_calls');
});
test('vercel stream usage completion_tokens does not double-count visible output', async () => {