refactor: unify empty-output retry logic into shared completionruntime package and normalize protocol adapter boundary.

This commit is contained in:
CJACK
2026-05-10 00:10:53 +08:00
parent 067cf465bb
commit 7c66742a19
32 changed files with 930 additions and 371 deletions

View File

@@ -145,7 +145,7 @@ func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Reques
return
}
streamReq := start.Request
h.handleClaudeStreamRealtime(w, r, start.Response, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession)
h.handleClaudeStreamRealtimeWithRetry(w, r, a, start.Response, start.Payload, start.Pow, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.PromptTokenText, historySession)
}
func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store ConfigReader) bool {
@@ -360,3 +360,112 @@ func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Requ
OnFinalize: streamRuntime.onFinalize,
})
}
func (h *Handler) handleClaudeStreamRealtimeWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, promptTokenText string, historySession *responsehistory.Session) {
if resp.StatusCode != http.StatusOK {
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
if historySession != nil {
historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "")
}
writeClaudeError(w, http.StatusInternalServerError, string(body))
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache, no-transform")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
rc := http.NewResponseController(w)
_, canFlush := w.(http.Flusher)
if !canFlush {
config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered")
}
streamRuntime := newClaudeStreamRuntime(
w,
rc,
canFlush,
model,
messages,
thinkingEnabled,
searchEnabled,
stripReferenceMarkersEnabled(),
toolNames,
toolsRaw,
promptTokenText,
historySession,
)
streamRuntime.sendMessageStart()
completionruntime.ExecuteStreamWithRetry(r.Context(), h.DS, a, resp, payload, pow, completionruntime.StreamRetryOptions{
Surface: "claude.messages",
Stream: true,
RetryEnabled: true,
MaxAttempts: 3,
UsagePrompt: promptTokenText,
}, completionruntime.StreamRetryHooks{
ConsumeAttempt: func(currentResp *http.Response, allowDeferEmpty bool) (bool, bool) {
return h.consumeClaudeStreamAttempt(r, currentResp, streamRuntime, thinkingEnabled, allowDeferEmpty)
},
Finalize: func(_ int) {
streamRuntime.finalize("end_turn", false)
},
ParentMessageID: func() int {
return streamRuntime.responseMessageID
},
OnRetryPrompt: func(prompt string) {
streamRuntime.promptTokenText = prompt
},
OnRetryFailure: func(status int, message, code string) {
streamRuntime.sendErrorWithCode(status, strings.TrimSpace(message), code)
},
})
}
func (h *Handler) consumeClaudeStreamAttempt(r *http.Request, resp *http.Response, streamRuntime *claudeStreamRuntime, thinkingEnabled bool, allowDeferEmpty bool) (bool, bool) {
defer func() { _ = resp.Body.Close() }()
initialType := "text"
if thinkingEnabled {
initialType = "thinking"
}
finalReason := streamengine.StopReason("")
var scannerErr error
streamengine.ConsumeSSE(streamengine.ConsumeConfig{
Context: r.Context(),
Body: resp.Body,
ThinkingEnabled: thinkingEnabled,
InitialType: initialType,
KeepAliveInterval: claudeStreamPingInterval,
IdleTimeout: claudeStreamIdleTimeout,
MaxKeepAliveNoInput: claudeStreamMaxKeepaliveCnt,
}, streamengine.ConsumeHooks{
OnKeepAlive: func() {
streamRuntime.sendPing()
},
OnParsed: streamRuntime.onParsed,
OnFinalize: func(reason streamengine.StopReason, err error) {
finalReason = reason
scannerErr = err
},
})
if string(finalReason) == "upstream_error" {
if streamRuntime.history != nil {
streamRuntime.history.Error(500, streamRuntime.upstreamErr, "upstream_error", responsehistory.ThinkingForArchive(streamRuntime.rawThinking.String(), streamRuntime.toolDetectionThinking.String(), streamRuntime.thinking.String()), responsehistory.TextForArchive(streamRuntime.rawText.String(), streamRuntime.text.String()))
}
streamRuntime.sendError(streamRuntime.upstreamErr)
return true, false
}
if scannerErr != nil {
if streamRuntime.history != nil {
streamRuntime.history.Error(500, scannerErr.Error(), "error", responsehistory.ThinkingForArchive(streamRuntime.rawThinking.String(), streamRuntime.toolDetectionThinking.String(), streamRuntime.thinking.String()), responsehistory.TextForArchive(streamRuntime.rawText.String(), streamRuntime.text.String()))
}
streamRuntime.sendError(scannerErr.Error())
return true, false
}
terminalWritten := streamRuntime.finalize("end_turn", allowDeferEmpty)
if terminalWritten {
return true, false
}
return false, true
}

View File

@@ -29,9 +29,10 @@ type claudeStreamRuntime struct {
bufferToolContent bool
stripReferenceMarkers bool
messageID string
thinking strings.Builder
text strings.Builder
messageID string
thinking strings.Builder
text strings.Builder
responseMessageID int
sieve toolstream.State
rawText strings.Builder
@@ -92,6 +93,9 @@ func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.Parse
s.upstreamErr = parsed.ErrorMessage
return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("upstream_error")}
}
if parsed.ResponseMessageID > 0 {
s.responseMessageID = parsed.ResponseMessageID
}
if parsed.Stop {
return streamengine.ParsedDecision{Stop: true}
}

View File

@@ -22,16 +22,27 @@ func (s *claudeStreamRuntime) send(event string, v any) {
}
func (s *claudeStreamRuntime) sendError(message string) {
s.sendErrorWithCode(500, message, "internal_error")
}
func (s *claudeStreamRuntime) sendErrorWithCode(status int, message, code string) {
msg := strings.TrimSpace(message)
if msg == "" {
msg = "upstream stream error"
}
if code == "" {
code = "internal_error"
}
errType := "api_error"
if status == 429 {
errType = "rate_limit_error"
}
s.send("error", map[string]any{
"type": "error",
"error": map[string]any{
"type": "api_error",
"type": errType,
"message": msg,
"code": "internal_error",
"code": code,
"param": nil,
},
})

View File

@@ -63,13 +63,10 @@ func (s *claudeStreamRuntime) sendToolUseBlock(idx int, tc toolcall.ParsedToolCa
})
}
func (s *claudeStreamRuntime) finalize(stopReason string) {
func (s *claudeStreamRuntime) finalize(stopReason string, deferEmptyOutput bool) bool {
if s.ended {
return
return true
}
s.ended = true
s.closeThinkingBlock()
if s.bufferToolContent {
for _, evt := range toolstream.Flush(&s.sieve, s.toolNames) {
@@ -123,6 +120,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) {
RawThinking: s.rawThinking.String(),
VisibleThinking: s.thinking.String(),
DetectionThinking: s.toolDetectionThinking.String(),
ResponseMessageID: s.responseMessageID,
AlreadyEmittedCalls: s.toolCallsDetected,
AlreadyEmittedToolRaw: s.toolCallsDetected,
}, assistantturn.BuildOptions{
@@ -137,6 +135,22 @@ func (s *claudeStreamRuntime) finalize(stopReason string) {
outcome := assistantturn.FinalizeTurn(turn, assistantturn.FinalizeOptions{
AlreadyEmittedToolCalls: s.toolCallsDetected,
})
if outcome.ShouldFail {
if deferEmptyOutput {
return false
}
s.ended = true
s.closeThinkingBlock()
s.closeTextBlock()
if s.history != nil {
s.history.Error(outcome.Error.Status, outcome.Error.Message, outcome.Error.Code, responsehistory.ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), responsehistory.TextForArchive(turn.RawText, turn.Text))
}
s.sendErrorWithCode(outcome.Error.Status, outcome.Error.Message, outcome.Error.Code)
return true
}
s.ended = true
s.closeThinkingBlock()
if s.bufferToolContent && !s.toolCallsDetected {
if len(turn.ToolCalls) > 0 {
@@ -197,6 +211,7 @@ func (s *claudeStreamRuntime) finalize(stopReason string) {
},
})
s.send("message_stop", map[string]any{"type": "message_stop"})
return true
}
func (s *claudeStreamRuntime) onFinalize(reason streamengine.StopReason, scannerErr error) {
@@ -214,5 +229,5 @@ func (s *claudeStreamRuntime) onFinalize(reason streamengine.StopReason, scanner
s.sendError(scannerErr.Error())
return
}
s.finalize("end_turn")
s.finalize("end_turn", false)
}