package completionruntime import ( "context" "fmt" "io" "net/http" "strings" "ds2api/internal/assistantturn" "ds2api/internal/auth" "ds2api/internal/config" dsclient "ds2api/internal/deepseek/client" "ds2api/internal/httpapi/openai/history" "ds2api/internal/httpapi/openai/shared" "ds2api/internal/promptcompat" "ds2api/internal/sse" ) type DeepSeekCaller interface { CreateSession(ctx context.Context, a *auth.RequestAuth, maxAttempts int) (string, error) GetPow(ctx context.Context, a *auth.RequestAuth, maxAttempts int) (string, error) UploadFile(ctx context.Context, a *auth.RequestAuth, req dsclient.UploadFileRequest, maxAttempts int) (*dsclient.UploadFileResult, error) CallCompletion(ctx context.Context, a *auth.RequestAuth, payload map[string]any, powResp string, maxAttempts int) (*http.Response, error) } type Options struct { StripReferenceMarkers bool MaxAttempts int RetryEnabled bool RetryMaxAttempts int CurrentInputFile history.CurrentInputConfigReader } type NonStreamResult struct { SessionID string Payload map[string]any Turn assistantturn.Turn Attempts int } type StartResult struct { SessionID string Payload map[string]any Pow string Response *http.Response Request promptcompat.StandardRequest } func StartCompletion(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, opts Options) (StartResult, *assistantturn.OutputError) { maxAttempts := opts.MaxAttempts if maxAttempts <= 0 { maxAttempts = 3 } var prepErr *assistantturn.OutputError stdReq, prepErr = prepareCurrentInputFile(ctx, ds, a, stdReq, opts) if prepErr != nil { return StartResult{Request: stdReq}, prepErr } sessionID, err := ds.CreateSession(ctx, a, maxAttempts) if err != nil { return StartResult{Request: stdReq}, authOutputError(a) } pow, err := ds.GetPow(ctx, a, maxAttempts) if err != nil { return StartResult{SessionID: sessionID, Request: stdReq}, &assistantturn.OutputError{Status: http.StatusUnauthorized, Message: "Failed to get PoW (invalid token or unknown error).", Code: "error"} } payload := stdReq.CompletionPayload(sessionID) resp, err := ds.CallCompletion(ctx, a, payload, pow, maxAttempts) if err != nil { return StartResult{SessionID: sessionID, Payload: payload, Pow: pow, Request: stdReq}, &assistantturn.OutputError{Status: http.StatusInternalServerError, Message: "Failed to get completion.", Code: "error"} } return StartResult{SessionID: sessionID, Payload: payload, Pow: pow, Response: resp, Request: stdReq}, nil } func prepareCurrentInputFile(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, opts Options) (promptcompat.StandardRequest, *assistantturn.OutputError) { if opts.CurrentInputFile == nil || stdReq.CurrentInputFileApplied { return stdReq, nil } out, err := (history.Service{Store: opts.CurrentInputFile, DS: ds}).ApplyCurrentInputFile(ctx, a, stdReq) if err != nil { status, message := history.MapError(err) return out, &assistantturn.OutputError{Status: status, Message: message, Code: "error"} } return out, nil } func ExecuteNonStreamWithRetry(ctx context.Context, ds DeepSeekCaller, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, opts Options) (NonStreamResult, *assistantturn.OutputError) { start, startErr := StartCompletion(ctx, ds, a, stdReq, opts) if startErr != nil { return NonStreamResult{SessionID: start.SessionID, Payload: start.Payload}, startErr } stdReq = start.Request maxAttempts := opts.MaxAttempts if maxAttempts <= 0 { maxAttempts = 3 } sessionID := start.SessionID payload := start.Payload pow := start.Pow attempts := 0 currentResp := start.Response usagePrompt := stdReq.PromptTokenText accumulatedThinking := "" accumulatedRawThinking := "" accumulatedToolDetectionThinking := "" for { turn, outErr := collectAttempt(currentResp, stdReq, usagePrompt, opts) if outErr != nil { return NonStreamResult{SessionID: sessionID, Payload: payload, Attempts: attempts}, outErr } accumulatedThinking += sse.TrimContinuationOverlap(accumulatedThinking, turn.Thinking) accumulatedRawThinking += sse.TrimContinuationOverlap(accumulatedRawThinking, turn.RawThinking) accumulatedToolDetectionThinking += sse.TrimContinuationOverlap(accumulatedToolDetectionThinking, turn.DetectionThinking) turn.Thinking = accumulatedThinking turn.RawThinking = accumulatedRawThinking turn.DetectionThinking = accumulatedToolDetectionThinking turn = assistantturn.BuildTurnFromCollected(sse.CollectResult{ Text: turn.RawText, Thinking: turn.RawThinking, ToolDetectionThinking: turn.DetectionThinking, ContentFilter: turn.ContentFilter, CitationLinks: turn.CitationLinks, ResponseMessageID: turn.ResponseMessageID, }, buildOptions(stdReq, usagePrompt, opts)) retryMax := opts.RetryMaxAttempts if retryMax <= 0 { retryMax = shared.EmptyOutputRetryMaxAttempts() } if !opts.RetryEnabled || !assistantturn.ShouldRetryEmptyOutput(turn, attempts, retryMax) { return NonStreamResult{SessionID: sessionID, Payload: payload, Turn: turn, Attempts: attempts}, turn.Error } attempts++ config.Logger.Info("[completion_runtime_empty_retry] attempting synthetic retry", "surface", stdReq.Surface, "stream", false, "retry_attempt", attempts, "parent_message_id", turn.ResponseMessageID) retryPow, powErr := ds.GetPow(ctx, a, maxAttempts) if powErr != nil { config.Logger.Warn("[completion_runtime_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", stdReq.Surface, "retry_attempt", attempts, "error", powErr) retryPow = pow } retryPayload := shared.ClonePayloadForEmptyOutputRetry(payload, turn.ResponseMessageID) nextResp, err := ds.CallCompletion(ctx, a, retryPayload, retryPow, maxAttempts) if err != nil { return NonStreamResult{SessionID: sessionID, Payload: payload, Turn: turn, Attempts: attempts}, &assistantturn.OutputError{Status: http.StatusInternalServerError, Message: "Failed to get completion.", Code: "error"} } usagePrompt = shared.UsagePromptWithEmptyOutputRetry(usagePrompt, attempts) currentResp = nextResp } } func collectAttempt(resp *http.Response, stdReq promptcompat.StandardRequest, usagePrompt string, opts Options) (assistantturn.Turn, *assistantturn.OutputError) { defer func() { if err := resp.Body.Close(); err != nil { config.Logger.Warn("[completion_runtime] response body close failed", "surface", stdReq.Surface, "error", err) } }() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) message := strings.TrimSpace(string(body)) if message == "" { message = http.StatusText(resp.StatusCode) } return assistantturn.Turn{}, &assistantturn.OutputError{Status: resp.StatusCode, Message: message, Code: "error"} } result := sse.CollectStream(resp, stdReq.Thinking, false) return assistantturn.BuildTurnFromCollected(result, buildOptions(stdReq, usagePrompt, opts)), nil } func buildOptions(stdReq promptcompat.StandardRequest, prompt string, opts Options) assistantturn.BuildOptions { return assistantturn.BuildOptions{ Model: stdReq.ResponseModel, Prompt: prompt, RefFileTokens: stdReq.RefFileTokens, SearchEnabled: stdReq.Search, StripReferenceMarkers: opts.StripReferenceMarkers, ToolNames: stdReq.ToolNames, ToolsRaw: stdReq.ToolsRaw, ToolChoice: stdReq.ToolChoice, } } func authOutputError(a *auth.RequestAuth) *assistantturn.OutputError { if a != nil && a.UseConfigToken { return &assistantturn.OutputError{Status: http.StatusUnauthorized, Message: "Account token is invalid. Please re-login the account in admin.", Code: "error"} } return &assistantturn.OutputError{Status: http.StatusUnauthorized, Message: "Invalid token. If this should be a DS2API key, add it to config.keys first.", Code: "error"} } func Errorf(status int, format string, args ...any) *assistantturn.OutputError { return &assistantturn.OutputError{Status: status, Message: fmt.Sprintf(format, args...), Code: "error"} }