Mirror of https://github.com/CJackHwang/ds2api.git (synced 2026-05-04 00:15:28 +08:00)
feat: add empty-output retry and Vercel auto-continue support
- Auto-retry Chat/Responses streams once when upstream output is empty but not content-filtered, reusing session/token/PoW and appending a regeneration suffix to the prompt
- Wire DeepSeek continue API into Vercel streams for multi-round thinking output exhaustion
- Defer empty-output errors in stream finalizers to enable synthetic retry; only surface failure when the retry budget is exhausted
- Track content_filter stops to avoid retry on filtered outputs
- Add comprehensive tests for stream/non-stream retry, Responses retry, and content_filter no-retry
- Update prompt-compatibility.md documentation

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@@ -101,6 +101,7 @@ DS2API's current core approach is not to take the client-supplied `messages`, `tools`

- OpenAI Chat / Responses go natively through the unified OpenAI normalization and DeepSeek payload assembly; Claude / Gemini reuse the OpenAI prompt/tool semantics where possible: Gemini directly reuses `promptcompat.BuildOpenAIPromptForAdapter`, and the Claude messages surface is converted to the OpenAI chat shape before execution in proxyable scenarios.

- Client-supplied thinking / reasoning switches are normalized to the downstream `thinking_enabled` flag. Gemini's `generationConfig.thinkingConfig.thinkingBudget` is translated into the same thinking switch; when thinking is off, the compatibility layer will not treat upstream `response/thinking_content` as visible body text even if it arrives. If the resolved model name carries a `-nothinking` suffix, thinking is unconditionally forced off, taking precedence over `thinking` / `reasoning` / `reasoning_effort` in the request body. The Claude surface still defaults thinking to off for streaming requests that do not declare `thinking` explicitly, per Anthropic semantics; in non-streaming proxy scenarios, however, the compatibility layer internally enables downstream thinking once to catch the "body is empty, tool call landed inside thinking" case, then strips the user-invisible thinking block before replying.

- For non-streaming finalization of OpenAI Chat / Responses, if the final visible body is empty, the compatibility layer first tries to parse standalone DSML / XML tool blocks inside the chain of thought as real tool calls. The streaming path performs the same fallback detection at finalize time, but never intercepts or rewrites the stream mid-flight because of thinking content; thinking / reasoning deltas are forwarded as-is, and only at finalize time may a recovered tool call be emitted. A recovered call is returned as this turn's structured assistant `tool_calls` / `function_call` output rather than stuffed into the `content` text; if the client did not enable thinking / reasoning, the chain of thought is used only for detection and is never exposed as `reasoning_content` or visible body text. Only when the body is empty and the chain of thought contains no executable tool call does handling continue down the empty-reply error path (see the first sketch after this list).

- Before surfacing an empty-reply error, OpenAI Chat / Responses now perform one internal compensating retry by default: after the first upstream run completes, if the final visible body is empty, no tool calls were parsed, no tool calls were already streamed to the client, and the stop reason is not `content_filter`, the compatibility layer reuses the same `chat_session_id`, account, token, PoW, and tool policy, appends the fixed suffix `Previous reply had no visible output. Please regenerate the visible final answer or tool call now.` to the original completion `prompt`, and resubmits once. The retry does not re-normalize messages, create a new session, switch accounts, or inject any retry marker into the client stream; the second attempt's thinking / reasoning deltas are appended directly after the first attempt's as normal increments, deduplicated with overlap trimming (see the second sketch after this list). If the second attempt is still empty, the terminal error code remains the existing `upstream_empty_output`; if any attempt ends in an empty `content_filter`, no compensating retry is made and the `content_filter` error is kept.
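
Two self-contained Go sketches of the mechanics referenced above. First, the finalize-time fallback order; the helper names here are illustrative stand-ins, while `detectAssistantToolCalls` is the real entry point that appears later in this diff:

```go
package main

import (
	"fmt"
	"strings"
)

// finalizeOutcome models the finalize-time decision: visible text wins;
// otherwise tool calls recovered from the chain of thought; otherwise
// the empty-output error path. detect stands in for the repo's
// detectAssistantToolCalls. Illustrative only.
func finalizeOutcome(finalText string, detect func() int) string {
	switch {
	case strings.TrimSpace(finalText) != "":
		return "emit_text"
	case detect() > 0:
		return "emit_tool_calls"
	default:
		return "upstream_empty_output"
	}
}

func main() {
	fmt.Println(finalizeOutcome("", func() int { return 1 })) // emit_tool_calls
	fmt.Println(finalizeOutcome("", func() int { return 0 })) // upstream_empty_output
}
```

Second, a minimal reimplementation of the overlap-trim dedup idea; `sse.TrimContinuationOverlap` is the real helper used in this diff, and this standalone version only illustrates the assumed semantics:

```go
package main

import (
	"fmt"
	"strings"
)

// trimContinuationOverlap returns the suffix of next that does not
// already appear at the tail of acc, so a retried stream can be
// appended without duplicating text. Illustrative reimplementation.
func trimContinuationOverlap(acc, next string) string {
	limit := len(next)
	if len(acc) < limit {
		limit = len(acc)
	}
	for n := limit; n > 0; n-- {
		if strings.HasSuffix(acc, next[:n]) {
			return next[n:]
		}
	}
	return next
}

func main() {
	acc := "plan: step 1"
	next := "step 1, then step 2" // a retry re-sends an overlapping prefix
	fmt.Println(acc + trimContinuationOverlap(acc, next))
	// Output: plan: step 1, then step 2
}
```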

## 5. How the prompt is assembled
@@ -128,7 +128,10 @@ func (s *chatStreamRuntime) resetStreamToolCallState() {
 	s.streamToolNames = map[int]string{}
 }

-func (s *chatStreamRuntime) finalize(finishReason string) {
+func (s *chatStreamRuntime) finalize(finishReason string, deferEmptyOutput bool) bool {
+	s.finalErrorStatus = 0
+	s.finalErrorMessage = ""
+	s.finalErrorCode = ""
 	finalThinking := s.thinking.String()
 	finalToolDetectionThinking := s.toolDetectionThinking.String()
 	finalText := cleanVisibleOutput(s.text.String(), s.stripReferenceMarkers)
@@ -204,8 +207,14 @@ func (s *chatStreamRuntime) finalize(finishReason string) {
 	}
 	if len(detected.Calls) == 0 && !s.toolCallsEmitted && strings.TrimSpace(finalText) == "" {
 		status, message, code := upstreamEmptyOutputDetail(finishReason == "content_filter", finalText, finalThinking)
+		if deferEmptyOutput {
+			s.finalErrorStatus = status
+			s.finalErrorMessage = message
+			s.finalErrorCode = code
+			return false
+		}
 		s.sendFailedChunk(status, message, code)
-		return
+		return true
 	}
 	usage := openaifmt.BuildChatUsage(s.finalPrompt, finalThinking, finalText)
 	s.finalFinishReason = finishReason
@@ -218,6 +227,7 @@ func (s *chatStreamRuntime) finalize(finishReason string) {
 		usage,
 	))
 	s.sendDone()
+	return true
 }

 func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedDecision {
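To make the new contract concrete before the retry runtimes below: `finalize` now returns true once a terminal frame has been written, and false when the empty-output error was deferred so the caller may retry. A toy, self-contained model of that contract (illustrative names, not the repo's types):

```go
package main

import "fmt"

// runtimeState is a toy stand-in for the stream runtime. finalize
// returns true when a terminal frame was written, false when the
// empty-output error was deferred for a synthetic retry.
type runtimeState struct{ deferredErr string }

func (s *runtimeState) finalize(empty, deferEmpty bool) bool {
	if empty {
		if deferEmpty {
			s.deferredErr = "upstream_empty_output"
			return false // caller may retry on the same session
		}
		fmt.Println("send failure frame: upstream_empty_output")
		return true
	}
	fmt.Println("send final chunk + [DONE]")
	return true
}

func main() {
	s := &runtimeState{}
	const maxAttempts = 1
	empty := true
	for attempts := 0; ; attempts++ {
		if s.finalize(empty, attempts < maxAttempts) {
			return // terminal frame (success or failure) already sent
		}
		// Synthetic retry: resubmit with the regeneration suffix.
		empty = false // assume the retry produced visible output
	}
}
```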
internal/httpapi/openai/chat/empty_retry_runtime.go (new file, 271 lines)
@@ -0,0 +1,271 @@
package chat

import (
	"context"
	"io"
	"net/http"
	"strings"
	"time"

	"ds2api/internal/auth"
	"ds2api/internal/config"
	dsprotocol "ds2api/internal/deepseek/protocol"
	openaifmt "ds2api/internal/format/openai"
	"ds2api/internal/sse"
	streamengine "ds2api/internal/stream"
)

type chatNonStreamResult struct {
	thinking              string
	toolDetectionThinking string
	text                  string
	contentFilter         bool
	detectedCalls         int
	body                  map[string]any
	finishReason          string
}

func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) {
	attempts := 0
	currentResp := resp
	usagePrompt := finalPrompt
	accumulatedThinking := ""
	accumulatedToolDetectionThinking := ""
	for {
		result, ok := h.collectChatNonStreamAttempt(w, currentResp, completionID, model, usagePrompt, thinkingEnabled, searchEnabled, toolNames)
		if !ok {
			return
		}
		accumulatedThinking += sse.TrimContinuationOverlap(accumulatedThinking, result.thinking)
		accumulatedToolDetectionThinking += sse.TrimContinuationOverlap(accumulatedToolDetectionThinking, result.toolDetectionThinking)
		result.thinking = accumulatedThinking
		result.toolDetectionThinking = accumulatedToolDetectionThinking
		detected := detectAssistantToolCalls(result.text, result.thinking, result.toolDetectionThinking, toolNames)
		result.detectedCalls = len(detected.Calls)
		result.body = openaifmt.BuildChatCompletionWithToolCalls(completionID, model, usagePrompt, result.thinking, result.text, detected.Calls)
		result.finishReason = chatFinishReason(result.body)
		if !shouldRetryChatNonStream(result, attempts) {
			h.finishChatNonStreamResult(w, result, attempts, usagePrompt, historySession)
			return
		}

		attempts++
		config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", false, "retry_attempt", attempts)
		retryPayload := clonePayloadWithEmptyOutputRetryPrompt(payload)
		nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, pow, 3)
		if err != nil {
			if historySession != nil {
				historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.thinking, result.text)
			}
			writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.")
			config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", err)
			return
		}
		usagePrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts)
		currentResp = nextResp
	}
}

func (h *Handler) collectChatNonStreamAttempt(w http.ResponseWriter, resp *http.Response, completionID, model, usagePrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) (chatNonStreamResult, bool) {
	if resp.StatusCode != http.StatusOK {
		defer func() { _ = resp.Body.Close() }()
		body, _ := io.ReadAll(resp.Body)
		writeOpenAIError(w, resp.StatusCode, string(body))
		return chatNonStreamResult{}, false
	}
	result := sse.CollectStream(resp, thinkingEnabled, true)
	stripReferenceMarkers := h.compatStripReferenceMarkers()
	finalThinking := cleanVisibleOutput(result.Thinking, stripReferenceMarkers)
	finalToolDetectionThinking := cleanVisibleOutput(result.ToolDetectionThinking, stripReferenceMarkers)
	finalText := cleanVisibleOutput(result.Text, stripReferenceMarkers)
	if searchEnabled {
		finalText = replaceCitationMarkersWithLinks(finalText, result.CitationLinks)
	}
	detected := detectAssistantToolCalls(finalText, finalThinking, finalToolDetectionThinking, toolNames)
	respBody := openaifmt.BuildChatCompletionWithToolCalls(completionID, model, usagePrompt, finalThinking, finalText, detected.Calls)
	return chatNonStreamResult{
		thinking:              finalThinking,
		toolDetectionThinking: finalToolDetectionThinking,
		text:                  finalText,
		contentFilter:         result.ContentFilter,
		detectedCalls:         len(detected.Calls),
		body:                  respBody,
		finishReason:          chatFinishReason(respBody),
	}, true
}

func (h *Handler) finishChatNonStreamResult(w http.ResponseWriter, result chatNonStreamResult, attempts int, usagePrompt string, historySession *chatHistorySession) {
	if result.detectedCalls == 0 && shouldWriteUpstreamEmptyOutputError(result.text) {
		status, message, code := upstreamEmptyOutputDetail(result.contentFilter, result.text, result.thinking)
		if historySession != nil {
			historySession.error(status, message, code, result.thinking, result.text)
		}
		writeUpstreamEmptyOutputError(w, result.text, result.thinking, result.contentFilter)
		config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", "none", "content_filter", result.contentFilter)
		return
	}
	if historySession != nil {
		historySession.success(http.StatusOK, result.thinking, result.text, result.finishReason, openaifmt.BuildChatUsage(usagePrompt, result.thinking, result.text))
	}
	writeJSON(w, http.StatusOK, result.body)
	source := "first_attempt"
	if attempts > 0 {
		source = "synthetic_retry"
	}
	config.Logger.Info("[openai_empty_retry] completed", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", source)
}

func chatFinishReason(respBody map[string]any) string {
	if choices, ok := respBody["choices"].([]map[string]any); ok && len(choices) > 0 {
		if fr, _ := choices[0]["finish_reason"].(string); strings.TrimSpace(fr) != "" {
			return fr
		}
	}
	return "stop"
}

func shouldRetryChatNonStream(result chatNonStreamResult, attempts int) bool {
	return emptyOutputRetryEnabled() &&
		attempts < emptyOutputRetryMaxAttempts() &&
		!result.contentFilter &&
		result.detectedCalls == 0 &&
		strings.TrimSpace(result.text) == ""
}

func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) {
	streamRuntime, initialType, ok := h.prepareChatStreamRuntime(w, resp, completionID, model, finalPrompt, thinkingEnabled, searchEnabled, toolNames, historySession)
	if !ok {
		return
	}
	attempts := 0
	currentResp := resp
	for {
		terminalWritten, retryable := h.consumeChatStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, historySession, attempts < emptyOutputRetryMaxAttempts())
		if terminalWritten {
			logChatStreamTerminal(streamRuntime, attempts)
			return
		}
		if !retryable || !emptyOutputRetryEnabled() || attempts >= emptyOutputRetryMaxAttempts() {
			streamRuntime.finalize("stop", false)
			recordChatStreamHistory(streamRuntime, historySession)
			config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", "none")
			return
		}
		attempts++
		config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", true, "retry_attempt", attempts)
		nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3)
		if err != nil {
			failChatStreamRetry(streamRuntime, historySession, http.StatusInternalServerError, "Failed to get completion.", "error")
			config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", err)
			return
		}
		if nextResp.StatusCode != http.StatusOK {
			defer func() { _ = nextResp.Body.Close() }()
			body, _ := io.ReadAll(nextResp.Body)
			failChatStreamRetry(streamRuntime, historySession, nextResp.StatusCode, string(body), "error")
			return
		}
		streamRuntime.finalPrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts)
		currentResp = nextResp
	}
}

func (h *Handler) prepareChatStreamRuntime(w http.ResponseWriter, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, historySession *chatHistorySession) (*chatStreamRuntime, string, bool) {
	if resp.StatusCode != http.StatusOK {
		defer func() { _ = resp.Body.Close() }()
		body, _ := io.ReadAll(resp.Body)
		if historySession != nil {
			historySession.error(resp.StatusCode, string(body), "error", "", "")
		}
		writeOpenAIError(w, resp.StatusCode, string(body))
		return nil, "", false
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache, no-transform")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	rc := http.NewResponseController(w)
	_, canFlush := w.(http.Flusher)
	if !canFlush {
		config.Logger.Warn("[stream] response writer does not support flush; streaming may be buffered")
	}
	initialType := "text"
	if thinkingEnabled {
		initialType = "thinking"
	}
	streamRuntime := newChatStreamRuntime(
		w, rc, canFlush, completionID, time.Now().Unix(), model, finalPrompt,
		thinkingEnabled, searchEnabled, h.compatStripReferenceMarkers(), toolNames,
		len(toolNames) > 0, h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence(),
	)
	return streamRuntime, initialType, true
}

func (h *Handler) consumeChatStreamAttempt(r *http.Request, resp *http.Response, streamRuntime *chatStreamRuntime, initialType string, thinkingEnabled bool, historySession *chatHistorySession, allowDeferEmpty bool) (bool, bool) {
	defer func() { _ = resp.Body.Close() }()
	finalReason := "stop"
	streamengine.ConsumeSSE(streamengine.ConsumeConfig{
		Context:             r.Context(),
		Body:                resp.Body,
		ThinkingEnabled:     thinkingEnabled,
		InitialType:         initialType,
		KeepAliveInterval:   time.Duration(dsprotocol.KeepAliveTimeout) * time.Second,
		IdleTimeout:         time.Duration(dsprotocol.StreamIdleTimeout) * time.Second,
		MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount,
	}, streamengine.ConsumeHooks{
		OnKeepAlive: streamRuntime.sendKeepAlive,
		OnParsed: func(parsed sse.LineResult) streamengine.ParsedDecision {
			decision := streamRuntime.onParsed(parsed)
			if historySession != nil {
				historySession.progress(streamRuntime.thinking.String(), streamRuntime.text.String())
			}
			return decision
		},
		OnFinalize: func(reason streamengine.StopReason, _ error) {
			if string(reason) == "content_filter" {
				finalReason = "content_filter"
			}
		},
		OnContextDone: func() {
			if historySession != nil {
				historySession.stopped(streamRuntime.thinking.String(), streamRuntime.text.String(), string(streamengine.StopReasonContextCancelled))
			}
		},
	})
	terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter")
	if terminalWritten {
		recordChatStreamHistory(streamRuntime, historySession)
		return true, false
	}
	return false, true
}

func recordChatStreamHistory(streamRuntime *chatStreamRuntime, historySession *chatHistorySession) {
	if historySession == nil {
		return
	}
	if streamRuntime.finalErrorMessage != "" {
		historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.thinking.String(), streamRuntime.text.String())
		return
	}
	historySession.success(http.StatusOK, streamRuntime.finalThinking, streamRuntime.finalText, streamRuntime.finalFinishReason, streamRuntime.finalUsage)
}

func failChatStreamRetry(streamRuntime *chatStreamRuntime, historySession *chatHistorySession, status int, message, code string) {
	streamRuntime.sendFailedChunk(status, message, code)
	if historySession != nil {
		historySession.error(status, message, code, streamRuntime.thinking.String(), streamRuntime.text.String())
	}
}

func logChatStreamTerminal(streamRuntime *chatStreamRuntime, attempts int) {
	source := "first_attempt"
	if attempts > 0 {
		source = "synthetic_retry"
	}
	if streamRuntime.finalErrorMessage != "" {
		config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode)
		return
	}
	config.Logger.Info("[openai_empty_retry] completed", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", source)
}
@@ -123,6 +123,22 @@ func writeUpstreamEmptyOutputError(w http.ResponseWriter, text, thinking string,
 	return shared.WriteUpstreamEmptyOutputError(w, text, thinking, contentFilter)
 }

+func emptyOutputRetryEnabled() bool {
+	return shared.EmptyOutputRetryEnabled()
+}
+
+func emptyOutputRetryMaxAttempts() int {
+	return shared.EmptyOutputRetryMaxAttempts()
+}
+
+func clonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any {
+	return shared.ClonePayloadWithEmptyOutputRetryPrompt(payload)
+}
+
+func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string {
+	return shared.UsagePromptWithEmptyOutputRetry(originalPrompt, retryAttempts)
+}
+
 func formatIncrementalStreamToolCallDeltas(deltas []toolstream.ToolCallDelta, ids map[int]string) []map[string]any {
 	return shared.FormatIncrementalStreamToolCallDeltas(deltas, ids)
 }
@@ -105,10 +105,10 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	if stdReq.Stream {
-		h.handleStream(w, r, resp, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession)
+		h.handleStreamWithRetry(w, r, a, resp, payload, pow, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession)
 		return
 	}
-	h.handleNonStream(w, resp, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession)
+	h.handleNonStreamWithRetry(w, r.Context(), a, resp, payload, pow, sessionID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, historySession)
 }

 func (h *Handler) autoDeleteRemoteSession(ctx context.Context, a *auth.RequestAuth, sessionID string) {
@@ -251,9 +251,9 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
 		},
 		OnFinalize: func(reason streamengine.StopReason, _ error) {
 			if string(reason) == "content_filter" {
-				streamRuntime.finalize("content_filter")
+				streamRuntime.finalize("content_filter", false)
 			} else {
-				streamRuntime.finalize("stop")
+				streamRuntime.finalize("stop", false)
 			}
 			if historySession == nil {
 				return
internal/httpapi/openai/responses/empty_retry_runtime.go (new file, 221 lines)
@@ -0,0 +1,221 @@
package responses

import (
	"context"
	"io"
	"net/http"
	"strings"
	"time"

	"ds2api/internal/auth"
	"ds2api/internal/config"
	dsprotocol "ds2api/internal/deepseek/protocol"
	openaifmt "ds2api/internal/format/openai"
	"ds2api/internal/promptcompat"
	"ds2api/internal/sse"
	streamengine "ds2api/internal/stream"
	"ds2api/internal/toolcall"
)

type responsesNonStreamResult struct {
	thinking              string
	toolDetectionThinking string
	text                  string
	contentFilter         bool
	parsed                toolcall.ToolCallParseResult
	body                  map[string]any
}

func (h *Handler) handleResponsesNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) {
	attempts := 0
	currentResp := resp
	usagePrompt := finalPrompt
	accumulatedThinking := ""
	accumulatedToolDetectionThinking := ""
	for {
		result, ok := h.collectResponsesNonStreamAttempt(w, currentResp, responseID, model, usagePrompt, thinkingEnabled, searchEnabled, toolNames)
		if !ok {
			return
		}
		accumulatedThinking += sse.TrimContinuationOverlap(accumulatedThinking, result.thinking)
		accumulatedToolDetectionThinking += sse.TrimContinuationOverlap(accumulatedToolDetectionThinking, result.toolDetectionThinking)
		result.thinking = accumulatedThinking
		result.toolDetectionThinking = accumulatedToolDetectionThinking
		result.parsed = detectAssistantToolCalls(result.text, result.thinking, result.toolDetectionThinking, toolNames)
		result.body = openaifmt.BuildResponseObjectWithToolCalls(responseID, model, usagePrompt, result.thinking, result.text, result.parsed.Calls)

		if !shouldRetryResponsesNonStream(result, attempts) {
			h.finishResponsesNonStreamResult(w, result, attempts, owner, responseID, toolChoice, traceID)
			return
		}

		attempts++
		config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", false, "retry_attempt", attempts)
		nextResp, err := h.DS.CallCompletion(ctx, a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3)
		if err != nil {
			writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.")
			config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", false, "retry_attempt", attempts, "error", err)
			return
		}
		usagePrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts)
		currentResp = nextResp
	}
}

func (h *Handler) collectResponsesNonStreamAttempt(w http.ResponseWriter, resp *http.Response, responseID, model, usagePrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) (responsesNonStreamResult, bool) {
	defer func() { _ = resp.Body.Close() }()
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body)))
		return responsesNonStreamResult{}, false
	}
	result := sse.CollectStream(resp, thinkingEnabled, false)
	stripReferenceMarkers := h.compatStripReferenceMarkers()
	sanitizedThinking := cleanVisibleOutput(result.Thinking, stripReferenceMarkers)
	toolDetectionThinking := cleanVisibleOutput(result.ToolDetectionThinking, stripReferenceMarkers)
	sanitizedText := cleanVisibleOutput(result.Text, stripReferenceMarkers)
	if searchEnabled {
		sanitizedText = replaceCitationMarkersWithLinks(sanitizedText, result.CitationLinks)
	}
	textParsed := detectAssistantToolCalls(sanitizedText, sanitizedThinking, toolDetectionThinking, toolNames)
	responseObj := openaifmt.BuildResponseObjectWithToolCalls(responseID, model, usagePrompt, sanitizedThinking, sanitizedText, textParsed.Calls)
	return responsesNonStreamResult{
		thinking:              sanitizedThinking,
		toolDetectionThinking: toolDetectionThinking,
		text:                  sanitizedText,
		contentFilter:         result.ContentFilter,
		parsed:                textParsed,
		body:                  responseObj,
	}, true
}

func (h *Handler) finishResponsesNonStreamResult(w http.ResponseWriter, result responsesNonStreamResult, attempts int, owner, responseID string, toolChoice promptcompat.ToolChoicePolicy, traceID string) {
	if len(result.parsed.Calls) == 0 && writeUpstreamEmptyOutputError(w, result.text, result.thinking, result.contentFilter) {
		config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", false, "retry_attempts", attempts, "success_source", "none", "content_filter", result.contentFilter)
		return
	}
	logResponsesToolPolicyRejection(traceID, toolChoice, result.parsed, "text")
	if toolChoice.IsRequired() && len(result.parsed.Calls) == 0 {
		writeOpenAIErrorWithCode(w, http.StatusUnprocessableEntity, "tool_choice requires at least one valid tool call.", "tool_choice_violation")
		return
	}
	h.getResponseStore().put(owner, responseID, result.body)
	writeJSON(w, http.StatusOK, result.body)
	source := "first_attempt"
	if attempts > 0 {
		source = "synthetic_retry"
	}
	config.Logger.Info("[openai_empty_retry] completed", "surface", "responses", "stream", false, "retry_attempts", attempts, "success_source", source)
}

func shouldRetryResponsesNonStream(result responsesNonStreamResult, attempts int) bool {
	return emptyOutputRetryEnabled() &&
		attempts < emptyOutputRetryMaxAttempts() &&
		!result.contentFilter &&
		len(result.parsed.Calls) == 0 &&
		strings.TrimSpace(result.text) == ""
}

func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) {
	streamRuntime, initialType, ok := h.prepareResponsesStreamRuntime(w, resp, owner, responseID, model, finalPrompt, thinkingEnabled, searchEnabled, toolNames, toolChoice, traceID)
	if !ok {
		return
	}
	attempts := 0
	currentResp := resp
	for {
		terminalWritten, retryable := h.consumeResponsesStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, attempts < emptyOutputRetryMaxAttempts())
		if terminalWritten {
			logResponsesStreamTerminal(streamRuntime, attempts)
			return
		}
		if !retryable || !emptyOutputRetryEnabled() || attempts >= emptyOutputRetryMaxAttempts() {
			streamRuntime.finalize("stop", false)
			config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode)
			return
		}
		attempts++
		config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", true, "retry_attempt", attempts)
		nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadWithEmptyOutputRetryPrompt(payload), pow, 3)
		if err != nil {
			streamRuntime.failResponse(http.StatusInternalServerError, "Failed to get completion.", "error")
			config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", err)
			return
		}
		if nextResp.StatusCode != http.StatusOK {
			defer func() { _ = nextResp.Body.Close() }()
			body, _ := io.ReadAll(nextResp.Body)
			streamRuntime.failResponse(nextResp.StatusCode, strings.TrimSpace(string(body)), "error")
			return
		}
		streamRuntime.finalPrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts)
		currentResp = nextResp
	}
}

func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) (*responsesStreamRuntime, string, bool) {
	if resp.StatusCode != http.StatusOK {
		defer func() { _ = resp.Body.Close() }()
		body, _ := io.ReadAll(resp.Body)
		writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body)))
		return nil, "", false
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache, no-transform")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	rc := http.NewResponseController(w)
	_, canFlush := w.(http.Flusher)
	initialType := "text"
	if thinkingEnabled {
		initialType = "thinking"
	}
	streamRuntime := newResponsesStreamRuntime(
		w, rc, canFlush, responseID, model, finalPrompt, thinkingEnabled, searchEnabled,
		h.compatStripReferenceMarkers(), toolNames, len(toolNames) > 0,
		h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence(),
		toolChoice, traceID, func(obj map[string]any) {
			h.getResponseStore().put(owner, responseID, obj)
		},
	)
	streamRuntime.sendCreated()
	return streamRuntime, initialType, true
}

func (h *Handler) consumeResponsesStreamAttempt(r *http.Request, resp *http.Response, streamRuntime *responsesStreamRuntime, initialType string, thinkingEnabled bool, allowDeferEmpty bool) (bool, bool) {
	defer func() { _ = resp.Body.Close() }()
	finalReason := "stop"
	streamengine.ConsumeSSE(streamengine.ConsumeConfig{
		Context:             r.Context(),
		Body:                resp.Body,
		ThinkingEnabled:     thinkingEnabled,
		InitialType:         initialType,
		KeepAliveInterval:   time.Duration(dsprotocol.KeepAliveTimeout) * time.Second,
		IdleTimeout:         time.Duration(dsprotocol.StreamIdleTimeout) * time.Second,
		MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount,
	}, streamengine.ConsumeHooks{
		OnParsed: streamRuntime.onParsed,
		OnFinalize: func(reason streamengine.StopReason, _ error) {
			if string(reason) == "content_filter" {
				finalReason = "content_filter"
			}
		},
	})
	terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter")
	if terminalWritten {
		return true, false
	}
	return false, true
}

func logResponsesStreamTerminal(streamRuntime *responsesStreamRuntime, attempts int) {
	source := "first_attempt"
	if attempts > 0 {
		source = "synthetic_retry"
	}
	if streamRuntime.failed {
		config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode)
		return
	}
	config.Logger.Info("[openai_empty_retry] completed", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", source)
}
@@ -113,6 +113,22 @@ func writeUpstreamEmptyOutputError(w http.ResponseWriter, text, thinking string,
 	return shared.WriteUpstreamEmptyOutputError(w, text, thinking, contentFilter)
 }

+func emptyOutputRetryEnabled() bool {
+	return shared.EmptyOutputRetryEnabled()
+}
+
+func emptyOutputRetryMaxAttempts() int {
+	return shared.EmptyOutputRetryMaxAttempts()
+}
+
+func clonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any {
+	return shared.ClonePayloadWithEmptyOutputRetryPrompt(payload)
+}
+
+func usagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string {
+	return shared.UsagePromptWithEmptyOutputRetry(originalPrompt, retryAttempts)
+}
+
 func filterIncrementalToolCallDeltasByAllowed(deltas []toolstream.ToolCallDelta, seenNames map[int]string) []toolstream.ToolCallDelta {
 	return shared.FilterIncrementalToolCallDeltasByAllowed(deltas, seenNames)
 }
@@ -115,10 +115,10 @@ func (h *Handler) Responses(w http.ResponseWriter, r *http.Request) {

 	responseID := "resp_" + strings.ReplaceAll(uuid.NewString(), "-", "")
 	if stdReq.Stream {
-		h.handleResponsesStream(w, r, resp, owner, responseID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, stdReq.ToolChoice, traceID)
+		h.handleResponsesStreamWithRetry(w, r, a, resp, payload, pow, owner, responseID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, stdReq.ToolChoice, traceID)
 		return
 	}
-	h.handleResponsesNonStream(w, resp, owner, responseID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, stdReq.ToolChoice, traceID)
+	h.handleResponsesNonStreamWithRetry(w, r.Context(), a, resp, payload, pow, owner, responseID, stdReq.ResponseModel, stdReq.FinalPrompt, stdReq.Thinking, stdReq.Search, stdReq.ToolNames, stdReq.ToolChoice, traceID)
 }

 func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolChoice promptcompat.ToolChoicePolicy, traceID string) {
@@ -206,8 +206,12 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request,
 		MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount,
 	}, streamengine.ConsumeHooks{
 		OnParsed: streamRuntime.onParsed,
-		OnFinalize: func(_ streamengine.StopReason, _ error) {
-			streamRuntime.finalize()
+		OnFinalize: func(reason streamengine.StopReason, _ error) {
+			if string(reason) == "content_filter" {
+				streamRuntime.finalize("content_filter", false)
+				return
+			}
+			streamRuntime.finalize("stop", false)
 		},
 	})
 }
@@ -53,6 +53,9 @@ type responsesStreamRuntime struct {
 	messagePartAdded bool
 	sequence         int
 	failed           bool
+	finalErrorStatus  int
+	finalErrorMessage string
+	finalErrorCode    string

 	persistResponse func(obj map[string]any)
 }
@@ -103,6 +106,9 @@ func newResponsesStreamRuntime(

 func (s *responsesStreamRuntime) failResponse(status int, message, code string) {
 	s.failed = true
+	s.finalErrorStatus = status
+	s.finalErrorMessage = message
+	s.finalErrorCode = code
 	failedResp := map[string]any{
 		"id":   s.responseID,
 		"type": "response",
@@ -126,7 +132,11 @@ func (s *responsesStreamRuntime) failResponse(status int, message, code string)
 	s.sendDone()
 }

-func (s *responsesStreamRuntime) finalize() {
+func (s *responsesStreamRuntime) finalize(finishReason string, deferEmptyOutput bool) bool {
+	s.failed = false
+	s.finalErrorStatus = 0
+	s.finalErrorMessage = ""
+	s.finalErrorCode = ""
 	finalThinking := s.thinking.String()
 	finalToolDetectionThinking := s.toolDetectionThinking.String()
 	finalText := cleanVisibleOutput(s.text.String(), s.stripReferenceMarkers)
@@ -150,12 +160,18 @@ func (s *responsesStreamRuntime) finalize() {

 	if s.toolChoice.IsRequired() && len(detected) == 0 {
 		s.failResponse(http.StatusUnprocessableEntity, "tool_choice requires at least one valid tool call.", "tool_choice_violation")
-		return
+		return true
 	}
 	if len(detected) == 0 && strings.TrimSpace(finalText) == "" {
-		status, message, code := upstreamEmptyOutputDetail(false, finalText, finalThinking)
+		status, message, code := upstreamEmptyOutputDetail(finishReason == "content_filter", finalText, finalThinking)
+		if deferEmptyOutput {
+			s.finalErrorStatus = status
+			s.finalErrorMessage = message
+			s.finalErrorCode = code
+			return false
+		}
 		s.failResponse(status, message, code)
-		return
+		return true
 	}
 	s.closeIncompleteFunctionItems()

@@ -165,6 +181,7 @@ func (s *responsesStreamRuntime) finalize() {
 	}
 	s.sendEvent("response.completed", openaifmt.BuildResponsesCompletedPayload(obj))
 	s.sendDone()
+	return true
 }

 func (s *responsesStreamRuntime) logToolPolicyRejections(textParsed toolcall.ToolCallParseResult) {
@@ -188,7 +205,10 @@ func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Pa
 	if !parsed.Parsed {
 		return streamengine.ParsedDecision{}
 	}
-	if parsed.ContentFilter || parsed.ErrorMessage != "" || parsed.Stop {
+	if parsed.ContentFilter || parsed.ErrorMessage != "" {
 		return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("content_filter")}
 	}
+	if parsed.Stop {
+		return streamengine.ParsedDecision{Stop: true}
+	}
internal/httpapi/openai/shared/empty_retry.go (new file, 45 lines)
@@ -0,0 +1,45 @@
package shared

import "strings"

const EmptyOutputRetrySuffix = "Previous reply had no visible output. Please regenerate the visible final answer or tool call now."

func EmptyOutputRetryEnabled() bool {
	return true
}

func EmptyOutputRetryMaxAttempts() int {
	return 1
}

func ClonePayloadWithEmptyOutputRetryPrompt(payload map[string]any) map[string]any {
	clone := make(map[string]any, len(payload))
	for k, v := range payload {
		clone[k] = v
	}
	original, _ := payload["prompt"].(string)
	clone["prompt"] = AppendEmptyOutputRetrySuffix(original)
	return clone
}

func AppendEmptyOutputRetrySuffix(prompt string) string {
	prompt = strings.TrimRight(prompt, "\r\n\t ")
	if prompt == "" {
		return EmptyOutputRetrySuffix
	}
	return prompt + "\n\n" + EmptyOutputRetrySuffix
}

func UsagePromptWithEmptyOutputRetry(originalPrompt string, retryAttempts int) string {
	if retryAttempts <= 0 {
		return originalPrompt
	}
	parts := make([]string, 0, retryAttempts+1)
	parts = append(parts, originalPrompt)
	next := originalPrompt
	for i := 0; i < retryAttempts; i++ {
		next = AppendEmptyOutputRetrySuffix(next)
		parts = append(parts, next)
	}
	return strings.Join(parts, "\n")
}
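A quick worked example of `UsagePromptWithEmptyOutputRetry` above (self-contained; the suffix constant is inlined rather than imported): with one retry, the usage prompt joins the original prompt and its suffixed resubmission, so token accounting reflects both upstream attempts.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	const suffix = "Previous reply had no visible output. Please regenerate the visible final answer or tool call now."
	original := "hi"
	// Equivalent to UsagePromptWithEmptyOutputRetry("hi", 1):
	// the original prompt, then the suffixed variant that was resent.
	parts := []string{original, original + "\n\n" + suffix}
	fmt.Println(strings.Join(parts, "\n"))
}
```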
@@ -66,6 +66,44 @@ func (m streamStatusDSStub) DeleteAllSessionsForToken(_ context.Context, _ strin
 	return nil
 }

+type streamStatusDSSeqStub struct {
+	resps    []*http.Response
+	payloads []map[string]any
+}
+
+func (m *streamStatusDSSeqStub) CreateSession(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) {
+	return "session-id", nil
+}
+
+func (m *streamStatusDSSeqStub) GetPow(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) {
+	return "pow", nil
+}
+
+func (m *streamStatusDSSeqStub) UploadFile(_ context.Context, _ *auth.RequestAuth, _ dsclient.UploadFileRequest, _ int) (*dsclient.UploadFileResult, error) {
+	return &dsclient.UploadFileResult{ID: "file-id", Filename: "file.txt", Bytes: 1, Status: "uploaded"}, nil
+}
+
+func (m *streamStatusDSSeqStub) CallCompletion(_ context.Context, _ *auth.RequestAuth, payload map[string]any, _ string, _ int) (*http.Response, error) {
+	clone := make(map[string]any, len(payload))
+	for k, v := range payload {
+		clone[k] = v
+	}
+	m.payloads = append(m.payloads, clone)
+	idx := len(m.payloads) - 1
+	if idx >= len(m.resps) {
+		idx = len(m.resps) - 1
+	}
+	return m.resps[idx], nil
+}
+
+func (m *streamStatusDSSeqStub) DeleteSessionForToken(_ context.Context, _ string, _ string) (*dsclient.DeleteSessionResult, error) {
+	return &dsclient.DeleteSessionResult{Success: true}, nil
+}
+
+func (m *streamStatusDSSeqStub) DeleteAllSessionsForToken(_ context.Context, _ string) error {
+	return nil
+}
+
 func makeOpenAISSEHTTPResponse(lines ...string) *http.Response {
 	body := strings.Join(lines, "\n")
 	if !strings.HasSuffix(body, "\n") {
@@ -78,6 +116,12 @@ func makeOpenAISSEHTTPResponse(lines ...string) *http.Response {
 	}
 }

+func newOpenAITestRouter(h *openAITestSurface) http.Handler {
+	r := chi.NewRouter()
+	registerOpenAITestRoutes(r, h)
+	return r
+}
+
 func captureStatusMiddleware(statuses *[]int) func(http.Handler) http.Handler {
 	return func(next http.Handler) http.Handler {
 		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -239,6 +283,125 @@ func TestChatCompletionsStreamEmitsFailureFrameWhenUpstreamOutputEmpty(t *testin
 	}
 }

+func TestChatCompletionsStreamRetriesEmptyOutputOnSameSession(t *testing.T) {
+	ds := &streamStatusDSSeqStub{resps: []*http.Response{
+		makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
+		makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"),
+	}}
+	h := &openAITestSurface{
+		Store: mockOpenAIConfig{wideInput: true},
+		Auth:  streamStatusAuthStub{},
+		DS:    ds,
+	}
+	reqBody := `{"model":"deepseek-v4-pro","messages":[{"role":"user","content":"hi"}],"stream":true}`
+	req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody))
+	req.Header.Set("Authorization", "Bearer direct-token")
+	req.Header.Set("Content-Type", "application/json")
+	rec := httptest.NewRecorder()
+	newOpenAITestRouter(h).ServeHTTP(rec, req)
+
+	if rec.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
+	}
+	if len(ds.payloads) != 2 {
+		t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads))
+	}
+	if ds.payloads[0]["chat_session_id"] != ds.payloads[1]["chat_session_id"] {
+		t.Fatalf("expected retry to reuse session, payloads=%#v", ds.payloads)
+	}
+	retryPrompt := asString(ds.payloads[1]["prompt"])
+	if !strings.Contains(retryPrompt, "Previous reply had no visible output. Please regenerate the visible final answer or tool call now.") {
+		t.Fatalf("expected retry suffix in prompt, got %q", retryPrompt)
+	}
+
+	frames, done := parseSSEDataFrames(t, rec.Body.String())
+	if !done {
+		t.Fatalf("expected [DONE], body=%s", rec.Body.String())
+	}
+	doneCount := strings.Count(rec.Body.String(), "data: [DONE]")
+	if doneCount != 1 {
+		t.Fatalf("expected one [DONE], got %d body=%s", doneCount, rec.Body.String())
+	}
+	if len(frames) != 3 {
+		t.Fatalf("expected reasoning, content, finish frames, got %#v body=%s", frames, rec.Body.String())
+	}
+	id := asString(frames[0]["id"])
+	for _, frame := range frames[1:] {
+		if asString(frame["id"]) != id {
+			t.Fatalf("expected same completion id across retry stream, frames=%#v", frames)
+		}
+	}
+	choices, _ := frames[1]["choices"].([]any)
+	choice, _ := choices[0].(map[string]any)
+	delta, _ := choice["delta"].(map[string]any)
+	if asString(delta["content"]) != "visible" {
+		t.Fatalf("expected retry content delta, got %#v body=%s", delta, rec.Body.String())
+	}
+}
+
+func TestChatCompletionsNonStreamRetriesThinkingOnlyOutput(t *testing.T) {
+	ds := &streamStatusDSSeqStub{resps: []*http.Response{
+		makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
+		makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"),
+	}}
+	h := &openAITestSurface{
+		Store: mockOpenAIConfig{wideInput: true},
+		Auth:  streamStatusAuthStub{},
+		DS:    ds,
+	}
+	reqBody := `{"model":"deepseek-v4-pro","messages":[{"role":"user","content":"hi"}],"stream":false}`
+	req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody))
+	req.Header.Set("Authorization", "Bearer direct-token")
+	req.Header.Set("Content-Type", "application/json")
+	rec := httptest.NewRecorder()
+	newOpenAITestRouter(h).ServeHTTP(rec, req)
+
+	if rec.Code != http.StatusOK {
+		t.Fatalf("expected 200 after retry, got %d body=%s", rec.Code, rec.Body.String())
+	}
+	if len(ds.payloads) != 2 {
+		t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads))
+	}
+	var out map[string]any
+	if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil {
+		t.Fatalf("decode response failed: %v body=%s", err, rec.Body.String())
+	}
+	choices, _ := out["choices"].([]any)
+	choice, _ := choices[0].(map[string]any)
+	message, _ := choice["message"].(map[string]any)
+	if asString(message["content"]) != "visible" {
+		t.Fatalf("expected retry visible content, got %#v", message)
+	}
+	if !strings.Contains(asString(message["reasoning_content"]), "plan") {
+		t.Fatalf("expected first-attempt reasoning to be preserved, got %#v", message)
+	}
+}
+
+func TestChatCompletionsContentFilterDoesNotRetry(t *testing.T) {
+	ds := &streamStatusDSSeqStub{resps: []*http.Response{
+		makeOpenAISSEHTTPResponse(`data: {"code":"content_filter"}`),
+		makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"),
+	}}
+	h := &openAITestSurface{
+		Store: mockOpenAIConfig{wideInput: true},
+		Auth:  streamStatusAuthStub{},
+		DS:    ds,
+	}
+	reqBody := `{"model":"deepseek-v4-flash","messages":[{"role":"user","content":"hi"}],"stream":false}`
+	req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody))
+	req.Header.Set("Authorization", "Bearer direct-token")
+	req.Header.Set("Content-Type", "application/json")
+	rec := httptest.NewRecorder()
+	newOpenAITestRouter(h).ServeHTTP(rec, req)
+
+	if rec.Code != http.StatusBadRequest {
+		t.Fatalf("expected content_filter 400, got %d body=%s", rec.Code, rec.Body.String())
+	}
+	if len(ds.payloads) != 1 {
+		t.Fatalf("expected no retry on content_filter, got %d calls", len(ds.payloads))
+	}
+}
+
 func TestResponsesStreamUsageIgnoresBatchAccumulatedTokenUsage(t *testing.T) {
 	statuses := make([]int, 0, 1)
 	h := &openAITestSurface{
@@ -287,6 +450,86 @@ func TestResponsesStreamUsageIgnoresBatchAccumulatedTokenUsage(t *testing.T) {
 	}
 }

+func TestResponsesStreamRetriesThinkingOnlyOutput(t *testing.T) {
+	ds := &streamStatusDSSeqStub{resps: []*http.Response{
+		makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
+		makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"),
+	}}
+	h := &openAITestSurface{
+		Store: mockOpenAIConfig{wideInput: true},
+		Auth:  streamStatusAuthStub{},
+		DS:    ds,
+	}
+	reqBody := `{"model":"deepseek-v4-pro","input":"hi","stream":true}`
+	req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(reqBody))
+	req.Header.Set("Authorization", "Bearer direct-token")
+	req.Header.Set("Content-Type", "application/json")
+	rec := httptest.NewRecorder()
+	newOpenAITestRouter(h).ServeHTTP(rec, req)
+
+	if rec.Code != http.StatusOK {
+		t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
+	}
+	if len(ds.payloads) != 2 {
+		t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads))
+	}
+	body := rec.Body.String()
+	if strings.Contains(body, "response.failed") {
+		t.Fatalf("did not expect premature response.failed, body=%s", body)
+	}
+	if !strings.Contains(body, "response.reasoning.delta") || !strings.Contains(body, "response.output_text.delta") || !strings.Contains(body, "response.completed") {
+		t.Fatalf("expected reasoning, text delta, and completed events, body=%s", body)
+	}
+	if strings.Count(body, "data: [DONE]") != 1 {
+		t.Fatalf("expected one [DONE], body=%s", body)
+	}
+}
+
+func TestResponsesNonStreamRetriesThinkingOnlyOutput(t *testing.T) {
+	ds := &streamStatusDSSeqStub{resps: []*http.Response{
+		makeOpenAISSEHTTPResponse(`data: {"p":"response/thinking_content","v":"plan"}`, "data: [DONE]"),
+		makeOpenAISSEHTTPResponse(`data: {"p":"response/content","v":"visible"}`, "data: [DONE]"),
+	}}
+	h := &openAITestSurface{
+		Store: mockOpenAIConfig{wideInput: true},
+		Auth:  streamStatusAuthStub{},
+		DS:    ds,
+	}
+	reqBody := `{"model":"deepseek-v4-pro","input":"hi","stream":false}`
+	req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(reqBody))
+	req.Header.Set("Authorization", "Bearer direct-token")
+	req.Header.Set("Content-Type", "application/json")
+	rec := httptest.NewRecorder()
+	newOpenAITestRouter(h).ServeHTTP(rec, req)
+
+	if rec.Code != http.StatusOK {
+		t.Fatalf("expected 200 after retry, got %d body=%s", rec.Code, rec.Body.String())
+	}
+	if len(ds.payloads) != 2 {
+		t.Fatalf("expected one synthetic retry call, got %d", len(ds.payloads))
+	}
+	var out map[string]any
+	if err := json.Unmarshal(rec.Body.Bytes(), &out); err != nil {
+		t.Fatalf("decode response failed: %v body=%s", err, rec.Body.String())
+	}
+	if asString(out["output_text"]) != "visible" {
+		t.Fatalf("expected retry visible output_text, got %#v", out["output_text"])
+	}
+	output, _ := out["output"].([]any)
+	if len(output) == 0 {
+		t.Fatalf("expected output items, got %#v", out)
+	}
+	item, _ := output[0].(map[string]any)
+	content, _ := item["content"].([]any)
+	if len(content) == 0 {
+		t.Fatalf("expected content entries, got %#v", item)
+	}
+	reasoning, _ := content[0].(map[string]any)
+	if asString(reasoning["type"]) != "reasoning" || !strings.Contains(asString(reasoning["text"]), "plan") {
+		t.Fatalf("expected preserved reasoning entry, got %#v", content)
+	}
+}
+
 func TestResponsesNonStreamUsageIgnoresPromptAndOutputTokenUsage(t *testing.T) {
 	statuses := make([]int, 0, 1)
 	h := &openAITestSurface{
@@ -33,6 +33,10 @@ const {
 } = require('./dedupe');

 const DEEPSEEK_COMPLETION_URL = 'https://chat.deepseek.com/api/v0/chat/completion';
+const DEEPSEEK_CONTINUE_URL = 'https://chat.deepseek.com/api/v0/chat/continue';
+const EMPTY_OUTPUT_RETRY_SUFFIX = 'Previous reply had no visible output. Please regenerate the visible final answer or tool call now.';
+const EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS = 1;
+const AUTO_CONTINUE_MAX_ROUNDS = 8;

 async function handleVercelStream(req, res, rawBody, payload) {
   const prep = await fetchStreamPrepare(req, rawBody);
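For context, the continue endpoint wired in below takes the session id and the last message id and asks DeepSeek to keep generating. A hedged Go sketch of the request shape (the JSON fields mirror the `fetchContinue` payload in this diff; headers and auth are elided, and the endpoint's exact semantics are assumed):

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// buildContinueRequest sketches the continue request used below. The
// JSON fields match the fetchContinue payload in this diff; PoW and
// bearer-token headers are omitted for brevity.
func buildContinueRequest(sessionID, messageID string) (*http.Request, error) {
	body, err := json.Marshal(map[string]any{
		"chat_session_id":    sessionID,
		"message_id":         messageID,
		"fallback_to_resume": true,
	})
	if err != nil {
		return nil, err
	}
	return http.NewRequest(http.MethodPost, "https://chat.deepseek.com/api/v0/chat/continue", bytes.NewReader(body))
}

func main() {
	req, _ := buildContinueRequest("session-id", "message-id")
	fmt.Println(req.Method, req.URL)
}
```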
@@ -84,23 +88,35 @@ async function handleVercelStream(req, res, rawBody, payload) {
   res.on('close', onResClose);

   try {
-    let completionRes;
-    try {
-      completionRes = await fetch(DEEPSEEK_COMPLETION_URL, {
-        method: 'POST',
-        headers: {
-          ...BASE_HEADERS,
-          authorization: `Bearer ${deepseekToken}`,
-          'x-ds-pow-response': powHeader,
-        },
-        body: JSON.stringify(completionPayload),
-        signal: upstreamController.signal,
-      });
-    } catch (err) {
-      if (clientClosed || isAbortError(err)) {
-        return;
-      }
-      throw err;
-    }
+    const fetchDeepSeekStream = async (url, bodyPayload) => {
+      try {
+        return await fetch(url, {
+          method: 'POST',
+          headers: {
+            ...BASE_HEADERS,
+            authorization: `Bearer ${deepseekToken}`,
+            'x-ds-pow-response': powHeader,
+          },
+          body: JSON.stringify(bodyPayload),
+          signal: upstreamController.signal,
+        });
+      } catch (err) {
+        if (clientClosed || isAbortError(err)) {
+          return null;
+        }
+        throw err;
+      }
+    };
+    const fetchCompletion = (bodyPayload) => fetchDeepSeekStream(DEEPSEEK_COMPLETION_URL, bodyPayload);
+    const fetchContinue = (messageID) => fetchDeepSeekStream(DEEPSEEK_CONTINUE_URL, {
+      chat_session_id: sessionID,
+      message_id: messageID,
+      fallback_to_resume: true,
+    });
+
+    let completionRes = await fetchCompletion(completionPayload);
+    if (completionRes === null) {
+      return;
+    }
     if (clientClosed) {
       return;
@@ -126,6 +142,7 @@ async function handleVercelStream(req, res, rawBody, payload) {
     let currentType = thinkingEnabled ? 'thinking' : 'text';
     let thinkingText = '';
     let outputText = '';
+    let usagePrompt = finalPrompt;
     const toolSieveEnabled = toolPolicy.toolSieveEnabled;
     const toolSieveState = createToolSieveState();
     let toolCallsEmitted = false;
@@ -133,7 +150,6 @@ async function handleVercelStream(req, res, rawBody, payload) {
     const streamToolCallIDs = new Map();
     const streamToolNames = new Map();
     const decoder = new TextDecoder();
-    reader = completionRes.body.getReader();
     let buffered = '';
     let ended = false;
     const { sendFrame, sendDeltaFrame } = createChatCompletionEmitter({
@@ -144,14 +160,14 @@ async function handleVercelStream(req, res, rawBody, payload) {
       isClosed: () => clientClosed,
     });

-    const finish = async (reason) => {
+    const finish = async (reason, options = {}) => {
       if (ended) {
-        return;
+        return true;
       }
-      ended = true;
       if (clientClosed || res.writableEnded || res.destroyed) {
+        ended = true;
        await releaseLease();
-        return;
+        return true;
       }
       const detected = parseStandaloneToolCalls(outputText, toolNames);
       if (detected.length > 0 && !toolCallsDoneEmitted) {
@@ -177,21 +193,26 @@ async function handleVercelStream(req, res, rawBody, payload) {
        reason = 'tool_calls';
      }
      if (detected.length === 0 && !toolCallsEmitted && outputText.trim() === '') {
        if (options.deferEmpty && reason !== 'content_filter') {
          return false;
        }
        ended = true;
        const detail = upstreamEmptyOutputDetail(reason === 'content_filter', outputText, thinkingText);
        sendFailedChunk(res, detail.status, detail.message, detail.code);
        await releaseLease();
        if (!res.writableEnded && !res.destroyed) {
          res.end();
        }
        return;
        return true;
      }
      ended = true;
      sendFrame({
        id: sessionID,
        object: 'chat.completion.chunk',
        created,
        model,
        choices: [{ delta: {}, index: 0, finish_reason: reason }],
        usage: buildUsage(finalPrompt, thinkingText, outputText),
        usage: buildUsage(usagePrompt, thinkingText, outputText),
      });
      if (!res.writableEnded && !res.destroyed) {
        res.write('data: [DONE]\n\n');
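The behavioral pivot is the new `options.deferEmpty` branch: an empty final output no longer emits the `upstream_empty_output` failure immediately; instead `finish` returns `false` and leaves `ended` unset, so the caller can run a synthetic retry on the same SSE connection. A minimal sketch of the resulting contract, where `retryBudgetLeft` is a stand-in for the `retryAttempts < EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS` check in the driver loop further down:

```js
// Sketch of the finish(reason, { deferEmpty }) contract, not part of the commit:
// true  -> a terminal frame or failure was written; the stream is finished.
// false -> output was empty and the error was deferred; the caller may retry.
const terminal = await finish('stop', { deferEmpty: retryBudgetLeft });
if (!terminal) {
  // re-submit the completion with the regeneration suffix appended
}
```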
@@ -200,122 +221,185 @@ async function handleVercelStream(req, res, rawBody, payload) {
      if (!res.writableEnded && !res.destroyed) {
        res.end();
      }
      return true;
    };

    try {
    const processStream = async (initialResponse, allowDeferEmpty) => {
      let currentResponse = initialResponse;
      let continueState = createContinueState(sessionID);
      let continueRounds = 0;
      // eslint-disable-next-line no-constant-condition
      while (true) {
        if (clientClosed) {
          await finish('stop');
          return;
        }
        const { value, done } = await reader.read();
        if (done) {
          break;
        }
        buffered += decoder.decode(value, { stream: true });
        const lines = buffered.split('\n');
        buffered = lines.pop() || '';

        for (const rawLine of lines) {
          const line = rawLine.trim();
          if (!line.startsWith('data:')) {
            continue;
          }
          const dataStr = line.slice(5).trim();
          if (!dataStr) {
            continue;
          }
          if (dataStr === '[DONE]') {
            await finish('stop');
            return;
          }
          let chunk;
          try {
            chunk = JSON.parse(dataStr);
          } catch (_err) {
            continue;
          }
          const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers);
          if (!parsed.parsed) {
            continue;
          }
          currentType = parsed.newType;
          if (parsed.errorMessage) {
            await finish('content_filter');
            return;
          }
          if (parsed.contentFilter) {
            await finish(outputText.trim() === '' ? 'content_filter' : 'stop');
            return;
          }
          if (parsed.finished) {
            await finish('stop');
            return;
          }
          for (const p of parsed.parts) {
            if (!p.text) {
              continue;
      reader = currentResponse.body.getReader();
      buffered = '';
      let streamEnded = false;
      try {
        // eslint-disable-next-line no-constant-condition
        while (true) {
          if (clientClosed) {
            await finish('stop');
            return { terminal: true, retryable: false };
          }
            if (p.type === 'thinking') {
              if (thinkingEnabled) {
                const trimmed = trimContinuationOverlap(thinkingText, p.text);
                if (!trimmed) {
          const { value, done } = await reader.read();
          if (done) {
            break;
          }
          buffered += decoder.decode(value, { stream: true });
          const lines = buffered.split('\n');
          buffered = lines.pop() || '';

          for (const rawLine of lines) {
            const line = rawLine.trim();
            if (!line.startsWith('data:')) {
              continue;
            }
            const dataStr = line.slice(5).trim();
            if (!dataStr) {
              continue;
            }
            if (dataStr === '[DONE]') {
              streamEnded = true;
              break;
            }
            let chunk;
            try {
              chunk = JSON.parse(dataStr);
            } catch (_err) {
              continue;
            }
            observeContinueState(continueState, chunk);
            const parsed = parseChunkForContent(chunk, thinkingEnabled, currentType, stripReferenceMarkers);
            if (!parsed.parsed) {
              continue;
            }
            currentType = parsed.newType;
            if (parsed.errorMessage) {
              return { terminal: await finish('content_filter'), retryable: false };
            }
            if (parsed.contentFilter) {
              return { terminal: await finish(outputText.trim() === '' ? 'content_filter' : 'stop'), retryable: false };
            }
            if (parsed.finished) {
              streamEnded = true;
              break;
            }

            for (const p of parsed.parts) {
              if (!p.text) {
                continue;
              }
                thinkingText += trimmed;
                sendDeltaFrame({ reasoning_content: trimmed });
              }
            } else {
              const trimmed = trimContinuationOverlap(outputText, p.text);
              if (!trimmed) {
                continue;
              }
              if (searchEnabled && isCitation(trimmed)) {
                continue;
              }
              outputText += trimmed;
              if (!toolSieveEnabled) {
                sendDeltaFrame({ content: trimmed });
                continue;
              }
              const events = processToolSieveChunk(toolSieveState, trimmed, toolNames);
              for (const evt of events) {
                if (evt.type === 'tool_call_deltas') {
                  if (!emitEarlyToolDeltas) {
              if (p.type === 'thinking') {
                if (thinkingEnabled) {
                  const trimmed = trimContinuationOverlap(thinkingText, p.text);
                  if (!trimmed) {
                    continue;
                  }
                  thinkingText += trimmed;
                  sendDeltaFrame({ reasoning_content: trimmed });
                }
              } else {
                const trimmed = trimContinuationOverlap(outputText, p.text);
                if (!trimmed) {
                  continue;
                }
                  const filtered = filterIncrementalToolCallDeltasByAllowed(evt.deltas, toolNames, streamToolNames);
                  const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs);
                  if (formatted.length > 0) {
                    toolCallsEmitted = true;
                    sendDeltaFrame({ tool_calls: formatted });
                if (searchEnabled && isCitation(trimmed)) {
                  continue;
                }
                outputText += trimmed;
                if (!toolSieveEnabled) {
                  sendDeltaFrame({ content: trimmed });
                  continue;
                }
                const events = processToolSieveChunk(toolSieveState, trimmed, toolNames);
                for (const evt of events) {
                  if (evt.type === 'tool_call_deltas') {
                    if (!emitEarlyToolDeltas) {
                      continue;
                    }
                    const filtered = filterIncrementalToolCallDeltasByAllowed(evt.deltas, toolNames, streamToolNames);
                    const formatted = formatIncrementalToolCallDeltas(filtered, streamToolCallIDs);
                    if (formatted.length > 0) {
                      toolCallsEmitted = true;
                      sendDeltaFrame({ tool_calls: formatted });
                    }
                    continue;
                  }
                  if (evt.type === 'tool_calls') {
                    toolCallsEmitted = true;
                    toolCallsDoneEmitted = true;
                    sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
                    resetStreamToolCallState(streamToolCallIDs, streamToolNames);
                    continue;
                  }
                  if (evt.text) {
                    sendDeltaFrame({ content: evt.text });
                  }
                }
                continue;
              }
              if (evt.type === 'tool_calls') {
                toolCallsEmitted = true;
                toolCallsDoneEmitted = true;
                sendDeltaFrame({ tool_calls: formatOpenAIStreamToolCalls(evt.calls, streamToolCallIDs) });
                resetStreamToolCallState(streamToolCallIDs, streamToolNames);
                continue;
              }
              if (evt.text) {
                sendDeltaFrame({ content: evt.text });
              }
            }
            if (streamEnded) {
              break;
            }
          }
          if (streamEnded) {
            break;
          }
        }
      } catch (err) {
        if (clientClosed || isAbortError(err)) {
          await finish('stop');
          return { terminal: true, retryable: false };
        }
        await finish('stop');
        return { terminal: true, retryable: false };
      }

      if (shouldAutoContinue(continueState) && continueRounds < AUTO_CONTINUE_MAX_ROUNDS) {
        continueRounds += 1;
        const nextRes = await fetchContinue(continueState.responseMessageID);
        if (nextRes === null) {
          return { terminal: true, retryable: false };
        }
        if (!nextRes.ok || !nextRes.body) {
          return { terminal: await finish('stop'), retryable: false };
        }
        continueState = prepareContinueStateForNextRound(continueState);
        currentResponse = nextRes;
        continue;
      }
      break;
    }
    await finish('stop');
  } catch (err) {
    if (clientClosed || isAbortError(err)) {

    const terminal = await finish('stop', { deferEmpty: allowDeferEmpty });
    return { terminal, retryable: !terminal && allowDeferEmpty };
  };

  let retryAttempts = 0;
  // eslint-disable-next-line no-constant-condition
  while (true) {
    const processed = await processStream(completionRes, retryAttempts < EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS);
    if (processed.terminal) {
      return;
    }
    if (!processed.retryable || retryAttempts >= EMPTY_OUTPUT_RETRY_MAX_ATTEMPTS) {
      await finish('stop');
      return;
    }
    retryAttempts += 1;
    console.info('[openai_empty_retry] attempting synthetic retry', {
      surface: 'chat.completions',
      stream: true,
      retry_attempt: retryAttempts,
    });
    usagePrompt = usagePromptWithEmptyOutputRetry(finalPrompt, retryAttempts);
    completionRes = await fetchCompletion(clonePayloadWithEmptyOutputRetryPrompt(completionPayload));
    if (completionRes === null) {
      return;
    }
    if (!completionRes.ok || !completionRes.body) {
      await finish('stop');
      return;
    }
    await finish('stop');
  }
} finally {
  req.removeListener('aborted', onReqAborted);
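Every thinking / text delta passes through `trimContinuationOverlap` before being appended, so a continue round or synthetic retry that re-sends overlapping text does not duplicate output. The implementation itself is outside this diff; a plausible minimal version, purely for illustration:

```js
// Assumed shape of trimContinuationOverlap, not the repo's actual code:
// drop the longest prefix of `incoming` that already ends `accumulated`.
function trimContinuationOverlap(accumulated, incoming) {
  const max = Math.min(accumulated.length, incoming.length);
  for (let len = max; len > 0; len -= 1) {
    if (accumulated.endsWith(incoming.slice(0, len))) {
      return incoming.slice(len);
    }
  }
  return incoming;
}
```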
@@ -328,6 +412,109 @@ function toBool(v) {
  return v === true;
}

function clonePayloadWithEmptyOutputRetryPrompt(payload) {
  return {
    ...(payload || {}),
    prompt: appendEmptyOutputRetrySuffix(asString(payload && payload.prompt)),
  };
}

function appendEmptyOutputRetrySuffix(prompt) {
  const base = asString(prompt).trimEnd();
  if (!base) {
    return EMPTY_OUTPUT_RETRY_SUFFIX;
  }
  return `${base}\n\n${EMPTY_OUTPUT_RETRY_SUFFIX}`;
}

function usagePromptWithEmptyOutputRetry(originalPrompt, attempts) {
  if (!attempts || attempts <= 0) {
    return originalPrompt;
  }
  const parts = [originalPrompt];
  let next = originalPrompt;
  for (let i = 0; i < attempts; i += 1) {
    next = appendEmptyOutputRetrySuffix(next);
    parts.push(next);
  }
  return parts.join('\n');
}
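Because every retry re-submits the full prompt with the suffix appended, usage accounting concatenates each submitted prompt. A worked example for one retry; the suffix text matches the regex asserted in the retry test below:

```js
// usagePromptWithEmptyOutputRetry('hello', 1) evaluates to the two submitted
// prompts joined with '\n', so both count toward prompt-side usage:
const SUFFIX = 'Previous reply had no visible output. Please regenerate the visible final answer or tool call now.';
const expected = ['hello', `hello\n\n${SUFFIX}`].join('\n');
```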

function createContinueState(sessionID) {
  return {
    sessionID: asString(sessionID),
    responseMessageID: 0,
    lastStatus: '',
    finished: false,
  };
}

function prepareContinueStateForNextRound(state) {
  return {
    ...state,
    lastStatus: '',
    finished: false,
  };
}

function observeContinueState(state, chunk) {
  if (!state || !chunk || typeof chunk !== 'object') {
    return;
  }
  const topID = numberValue(chunk.response_message_id);
  if (topID > 0) {
    state.responseMessageID = topID;
  }
  if (chunk.p === 'response/status') {
    setContinueStatus(state, asString(chunk.v));
  }
  const response = chunk.v && typeof chunk.v === 'object' ? chunk.v.response : null;
  if (response && typeof response === 'object') {
    const id = numberValue(response.message_id);
    if (id > 0) {
      state.responseMessageID = id;
    }
    setContinueStatus(state, asString(response.status));
    if (response.auto_continue === true) {
      state.lastStatus = 'AUTO_CONTINUE';
    }
  }
  const messageResponse = chunk.message && typeof chunk.message === 'object' && chunk.message.response;
  if (messageResponse && typeof messageResponse === 'object') {
    const id = numberValue(messageResponse.message_id);
    if (id > 0) {
      state.responseMessageID = id;
    }
    setContinueStatus(state, asString(messageResponse.status));
  }
}

function setContinueStatus(state, status) {
  const normalized = asString(status).trim();
  if (!normalized) {
    return;
  }
  state.lastStatus = normalized;
  if (normalized.toUpperCase() === 'FINISHED') {
    state.finished = true;
  }
}

function shouldAutoContinue(state) {
  if (!state || state.finished || !state.sessionID || state.responseMessageID <= 0) {
    return false;
  }
  return ['WIP', 'INCOMPLETE', 'AUTO_CONTINUE'].includes(asString(state.lastStatus).trim().toUpperCase());
}
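Together these helpers form the auto-continue state machine: every chunk is observed while streaming, and at end-of-stream `shouldAutoContinue` decides whether the continue API gets called. A quick trace over the fixture chunk used in the tests; it relies on the helpers above plus the repo's `asString` / `numberValue`:

```js
// Tracing the continue-state machine on the test fixture chunk:
const state = createContinueState('chatcmpl-test');
observeContinueState(state, {
  response_message_id: 7,
  v: { response: { message_id: 7, status: 'WIP', auto_continue: true } },
});
// state.responseMessageID === 7; auto_continue bumps lastStatus to 'AUTO_CONTINUE'
shouldAutoContinue(state); // true -> fetchContinue(7) runs before any synthetic retry
```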

function numberValue(v) {
  if (typeof v === 'number' && Number.isFinite(v)) {
    return Math.trunc(v);
  }
  const parsed = Number.parseInt(asString(v), 10);
  return Number.isFinite(parsed) ? parsed : 0;
}

function upstreamEmptyOutputDetail(contentFilter, _text, thinking) {
  if (contentFilter) {
    return {

@@ -121,8 +121,15 @@ function parseSSEDataFrames(body) {
}

async function runMockVercelStream(upstreamLines, prepareOverrides = {}) {
  return runMockVercelStreamSequence([upstreamLines], prepareOverrides);
}

async function runMockVercelStreamSequence(upstreamSequences, prepareOverrides = {}) {
  const originalFetch = global.fetch;
  const fetchURLs = [];
  const fetchBodies = [];
  let completionCalls = 0;
  let continueCalls = 0;
  const prepareBody = {
    session_id: 'chatcmpl-test',
    lease_id: 'lease-test',
@@ -137,23 +144,33 @@ async function runMockVercelStream(upstreamLines, prepareOverrides = {}) {
    payload: { prompt: 'hello' },
    ...prepareOverrides,
  };
  global.fetch = async (url) => {
  global.fetch = async (url, init = {}) => {
    const textURL = String(url);
    fetchURLs.push(textURL);
    if (init && init.body) {
      fetchBodies.push(JSON.parse(String(init.body)));
    }
    if (textURL.includes('__stream_prepare=1')) {
      return jsonResponse(prepareBody);
    }
    if (textURL.includes('__stream_release=1')) {
      return jsonResponse({ success: true });
    }
    return sseResponse(upstreamLines);
    if (textURL.includes('/continue')) {
      const idx = Math.min(continueCalls + 1, upstreamSequences.length - 1);
      continueCalls += 1;
      return sseResponse(upstreamSequences[idx]);
    }
    const idx = Math.min(completionCalls, upstreamSequences.length - 1);
    completionCalls += 1;
    return sseResponse(upstreamSequences[idx]);
  };
  try {
    const req = new MockStreamRequest();
    const res = new MockStreamResponse();
    const payload = { model: 'gpt-test', stream: true };
    await handleVercelStream(req, res, Buffer.from(JSON.stringify(payload)), payload);
    return { res, frames: parseSSEDataFrames(res.bodyText()), fetchURLs };
    return { res, frames: parseSSEDataFrames(res.bodyText()), fetchURLs, fetchBodies };
  } finally {
    global.fetch = originalFetch;
  }
@@ -174,6 +191,37 @@ test('vercel stream emits Go-parity empty-output failure on DONE', async () => {
  assert.equal(frames[1], '[DONE]');
});

test('vercel stream retries empty output once and keeps one terminal frame', async () => {
  const { frames, fetchURLs, fetchBodies } = await runMockVercelStreamSequence([
    ['data: [DONE]\n\n'],
    ['data: {"p":"response/content","v":"visible"}\n\n', 'data: [DONE]\n\n'],
  ]);
  const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
  const completionBodies = fetchBodies.filter((body) => Object.hasOwn(body, 'prompt'));
  assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/completion').length, 2);
  assert.equal(frames.filter((frame) => frame === '[DONE]').length, 1);
  assert.equal(parsed[0].choices[0].delta.content, 'visible');
  assert.equal(parsed[1].choices[0].finish_reason, 'stop');
  assert.equal(parsed[0].id, parsed[1].id);
  assert.match(completionBodies[1].prompt, /Previous reply had no visible output\. Please regenerate the visible final answer or tool call now\.$/);
});

test('vercel stream exhausts DeepSeek continue before synthetic retry', async () => {
  const { frames, fetchURLs, fetchBodies } = await runMockVercelStreamSequence([
    [
      'data: {"response_message_id":7,"v":{"response":{"message_id":7,"status":"WIP","auto_continue":true}}}\n\n',
      'data: [DONE]\n\n',
    ],
    ['data: {"p":"response/content","v":"continued"}\n\n', 'data: [DONE]\n\n'],
  ]);
  const parsed = frames.filter((frame) => frame !== '[DONE]').map((frame) => JSON.parse(frame));
  assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/completion').length, 1);
  assert.equal(fetchURLs.filter((url) => url === 'https://chat.deepseek.com/api/v0/chat/continue').length, 1);
  assert.equal(parsed[0].choices[0].delta.content, 'continued');
  assert.equal(parsed[1].choices[0].finish_reason, 'stop');
  assert.equal(fetchBodies.some((body) => String(body.prompt || '').includes('Previous reply had no visible output')), false);
});

test('vercel stream emits content_filter failure when upstream filters empty output', async () => {
  const { frames } = await runMockVercelStream(['data: {"code":"content_filter"}\n\n']);
  assert.equal(frames.length, 2);