Files
ds2api/internal/completionruntime/stream_retry.go

180 lines
6.1 KiB
Go

package completionruntime
import (
"context"
"io"
"net/http"
"strings"
"ds2api/internal/assistantturn"
"ds2api/internal/auth"
"ds2api/internal/config"
"ds2api/internal/httpapi/openai/shared"
)
// StreamRetryOptions configures a single ExecuteStreamWithRetry run.
type StreamRetryOptions struct {
	// Surface is the value logged under the "surface" key. It is trimmed
	// before use, and a blank value is logged as "completion".
	Surface string

	// Stream is only logged (under the "stream" key) by
	// ExecuteStreamWithRetry; it does not alter the control flow there.
	Stream bool

	// RetryEnabled gates every follow-up request. When false, the first
	// attempt that does not write a terminal result is finalized.
	RetryEnabled bool

	// RetryMaxAttempts caps the number of synthetic retries. Values <= 0
	// fall back to shared.EmptyOutputRetryMaxAttempts().
	RetryMaxAttempts int

	// MaxAttempts is forwarded to the DeepSeekCaller calls as their attempt
	// argument. Values <= 0 fall back to 3.
	MaxAttempts int

	// UsagePrompt is the prompt text handed to OnRetryPrompt: unchanged
	// after an account switch, and passed through
	// shared.UsagePromptWithEmptyOutputRetry after a synthetic retry.
	UsagePrompt string
}
// StreamRetryHooks is the set of callbacks ExecuteStreamWithRetry invokes
// while it processes attempts. Every hook except ConsumeAttempt is optional
// and is skipped when nil.
type StreamRetryHooks struct {
	// ConsumeAttempt processes one upstream response. It reports whether a
	// terminal result was already written and whether the attempt may be
	// retried. allowDeferEmpty is true only while retrying is enabled and
	// either retries remain or an account switch may still be tried.
	// ExecuteStreamWithRetry returns immediately when this hook is nil.
	ConsumeAttempt func(resp *http.Response, allowDeferEmpty bool) (terminalWritten bool, retryable bool)

	// Finalize is called with the retry count when the run ends without a
	// terminal result and without a reported failure.
	Finalize func(attempts int)

	// ParentMessageID supplies the ID passed to
	// shared.ClonePayloadForEmptyOutputRetry for a retry request; 0 is used
	// when the hook is nil.
	ParentMessageID func() int

	// OnRetry is called with the retry count after a retry request returned
	// HTTP 200 and before its response is consumed.
	OnRetry func(attempts int)

	// OnRetryPrompt receives the usage prompt to use after a retry or an
	// account switch.
	OnRetryPrompt func(prompt string)

	// OnRetryFailure is called when a retry or account-switch request
	// fails; the run ends afterwards.
	OnRetryFailure func(status int, message, code string)

	// OnAccountSwitch receives the session ID created on the alternate
	// account.
	OnAccountSwitch func(sessionID string)

	// OnTerminal is called with the retry count when ConsumeAttempt reports
	// that a terminal result was written.
	OnTerminal func(attempts int)
}
// ExecuteStreamWithRetry hands initialResp to hooks.ConsumeAttempt and, for
// as long as the hook reports the attempt as retryable, issues follow-up
// completion requests through ds and feeds each new response back to the
// hook. It returns immediately when hooks.ConsumeAttempt is nil.
//
// Defaults: a blank opts.Surface is logged as "completion", opts.MaxAttempts
// <= 0 becomes 3, and opts.RetryMaxAttempts <= 0 becomes
// shared.EmptyOutputRetryMaxAttempts().
//
// After each ConsumeAttempt call the run ends in one of these ways:
//   - a terminal result was written: OnTerminal is called;
//   - the attempt is not retryable, or retrying is disabled: Finalize is
//     called;
//   - the retry limit is reached and no alternate-account request is made:
//     Finalize is called;
//   - a follow-up request fails or returns a non-200 status: OnRetryFailure
//     is called.
//
// Otherwise the loop continues with the response of either a synthetic
// retry (while retries remain) or a request started on an alternate account
// (once the retry limit is reached and canRetryOnAlternateAccount agrees).
// All hooks other than ConsumeAttempt are optional.
func ExecuteStreamWithRetry(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, initialResp *http.Response, payload map[string]any, pow string, opts StreamRetryOptions, hooks StreamRetryHooks) {
	consume := hooks.ConsumeAttempt
	if consume == nil {
		return
	}

	surface := strings.TrimSpace(opts.Surface)
	if surface == "" {
		surface = "completion"
	}
	callBudget := opts.MaxAttempts
	if callBudget <= 0 {
		callBudget = 3
	}
	retryLimit := opts.RetryMaxAttempts
	if retryLimit <= 0 {
		retryLimit = shared.EmptyOutputRetryMaxAttempts()
	}

	var (
		retries     int  // synthetic retries issued so far
		switchTried bool // updated by canRetryOnAlternateAccount through its pointer
	)
	resp := initialResp
	retryPayload := clonePayload(payload)

	// Nil-safe wrappers for the hooks that are invoked from several places.
	finalize := func() {
		if hooks.Finalize != nil {
			hooks.Finalize(retries)
		}
	}
	reportFailure := func(status int, message, code string) {
		if hooks.OnRetryFailure != nil {
			hooks.OnRetryFailure(status, message, code)
		}
	}

	for {
		exhausted := retries >= retryLimit

		// The hook may defer only while another request could still follow:
		// either retries remain, or an account switch is still possible.
		mayDefer := false
		if opts.RetryEnabled {
			switchPossible := exhausted && !switchTried && a != nil && a.UseConfigToken
			mayDefer = !exhausted || switchPossible
		}

		wroteTerminal, wantsRetry := consume(resp, mayDefer)
		switch {
		case wroteTerminal:
			if hooks.OnTerminal != nil {
				hooks.OnTerminal(retries)
			}
			return
		case !opts.RetryEnabled || !wantsRetry:
			finalize()
			return
		}

		if exhausted {
			// Retry limit reached: the only remaining option is one request
			// on an alternate account. The synthetic 429 is what
			// canRetryOnAlternateAccount is asked to judge.
			if !canRetryOnAlternateAccount(ctx, a, &assistantturn.OutputError{Status: http.StatusTooManyRequests}, opts.RetryEnabled, &switchTried) {
				finalize()
				return
			}
			alt, altErr := startPayloadCompletionOnAlternateAccount(ctx, ds, a, payload, callBudget)
			if altErr != nil {
				reportFailure(altErr.Status, altErr.Message, altErr.Code)
				return
			}
			if alt.Response == nil {
				finalize()
				return
			}
			config.Logger.Info("[completion_runtime_account_switch_retry] retrying after 429", "surface", surface, "stream", opts.Stream, "account", a.AccountID)
			resp = alt.Response
			retryPayload = alt.Payload
			pow = alt.Pow
			if hooks.OnAccountSwitch != nil {
				hooks.OnAccountSwitch(alt.SessionID)
			}
			if hooks.OnRetryPrompt != nil {
				hooks.OnRetryPrompt(opts.UsagePrompt)
			}
			continue
		}

		retries++
		var parentID int
		if hooks.ParentMessageID != nil {
			parentID = hooks.ParentMessageID()
		}
		config.Logger.Info("[completion_runtime_empty_retry] attempting synthetic retry", "surface", surface, "stream", opts.Stream, "retry_attempt", retries, "parent_message_id", parentID)

		// A fresh PoW is preferred; the last known one is reused if the
		// fetch fails.
		freshPow, powErr := ds.GetPow(ctx, a, callBudget)
		if powErr != nil {
			config.Logger.Warn("[completion_runtime_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", surface, "stream", opts.Stream, "retry_attempt", retries, "error", powErr)
			freshPow = pow
		}

		requestBody := shared.ClonePayloadForEmptyOutputRetry(retryPayload, parentID)
		retried, callErr := ds.CallCompletion(ctx, a, requestBody, freshPow, callBudget)
		if callErr != nil {
			reportFailure(http.StatusInternalServerError, "Failed to get completion.", "error")
			config.Logger.Warn("[completion_runtime_empty_retry] retry request failed", "surface", surface, "stream", opts.Stream, "retry_attempt", retries, "error", callErr)
			return
		}

		if retried.StatusCode != http.StatusOK {
			// Surface the upstream error body, or the status text when the
			// body is blank, to the caller.
			raw, readErr := io.ReadAll(retried.Body)
			if readErr != nil {
				config.Logger.Warn("[completion_runtime_empty_retry] retry error body read failed", "surface", surface, "stream", opts.Stream, "retry_attempt", retries, "error", readErr)
			}
			closeRetryBody(surface, retried.Body)
			detail := strings.TrimSpace(string(raw))
			if detail == "" {
				detail = http.StatusText(retried.StatusCode)
			}
			reportFailure(retried.StatusCode, detail, "error")
			return
		}

		if hooks.OnRetry != nil {
			hooks.OnRetry(retries)
		}
		if hooks.OnRetryPrompt != nil {
			hooks.OnRetryPrompt(shared.UsagePromptWithEmptyOutputRetry(opts.UsagePrompt, retries))
		}
		resp = retried
	}
}
// startPayloadCompletionOnAlternateAccount starts the completion again in a
// newly created session: it creates a session, fetches a PoW, and sends a
// copy of payload whose "chat_session_id" is the new session ID and whose
// "parent_message_id" has been removed.
//
// On failure the returned StartResult holds only the values obtained before
// the failing step, together with a non-nil error:
//   - session creation failed: empty result and authOutputError(a);
//   - PoW fetch failed: SessionID only, status 401;
//   - completion call failed: SessionID, Payload and Pow, status 500.
//
// On success all of SessionID, Payload, Pow and Response are set and the
// error is nil. The caller's payload map is never modified.
func startPayloadCompletionOnAlternateAccount(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, payload map[string]any, maxAttempts int) (StartResult, *assistantturn.OutputError) {
	var started StartResult

	session, sessionErr := ds.CreateSession(ctx, a, maxAttempts)
	if sessionErr != nil {
		return started, authOutputError(a)
	}
	started.SessionID = session

	powToken, powErr := ds.GetPow(ctx, a, maxAttempts)
	if powErr != nil {
		return started, &assistantturn.OutputError{
			Status:  http.StatusUnauthorized,
			Message: "Failed to get PoW (invalid token or unknown error).",
			Code:    "error",
		}
	}

	// Rebind the request to the new session; the old parent message ID is
	// dropped from the copy.
	body := clonePayload(payload)
	delete(body, "parent_message_id")
	body["chat_session_id"] = session
	started.Payload = body
	started.Pow = powToken

	resp, callErr := ds.CallCompletion(ctx, a, body, powToken, maxAttempts)
	if callErr != nil {
		return started, &assistantturn.OutputError{
			Status:  http.StatusInternalServerError,
			Message: "Failed to get completion.",
			Code:    "error",
		}
	}
	started.Response = resp
	return started, nil
}
// clonePayload returns a shallow copy of payload: the top-level keys are
// duplicated into a new map, while the values themselves are shared with the
// original. The result is always a non-nil, writable map, even when payload
// is nil.
func clonePayload(payload map[string]any) map[string]any {
	duplicate := make(map[string]any, len(payload))
	for key := range payload {
		duplicate[key] = payload[key]
	}
	return duplicate
}
// closeRetryBody closes body on a best-effort basis. A nil body is ignored,
// and a Close error is only logged as a warning tagged with surface; it is
// never returned to the caller.
func closeRetryBody(surface string, body io.Closer) {
	if body != nil {
		closeErr := body.Close()
		if closeErr == nil {
			return
		}
		config.Logger.Warn("[completion_runtime_empty_retry] retry response body close failed", "surface", surface, "error", closeErr)
	}
}