package claude import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" "net/http/httptest" "strings" "time" "ds2api/internal/auth" "ds2api/internal/completionruntime" "ds2api/internal/config" claudefmt "ds2api/internal/format/claude" "ds2api/internal/httpapi/openai/history" "ds2api/internal/httpapi/requestbody" "ds2api/internal/promptcompat" "ds2api/internal/responsehistory" streamengine "ds2api/internal/stream" "ds2api/internal/translatorcliproxy" "ds2api/internal/util" sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" ) func (h *Handler) Messages(w http.ResponseWriter, r *http.Request) { if strings.TrimSpace(r.Header.Get("anthropic-version")) == "" { r.Header.Set("anthropic-version", "2023-06-01") } if isClaudeVercelProxyRequest(r) && h.proxyViaOpenAI(w, r, h.Store) { return } if h.Auth == nil || h.DS == nil { if h.OpenAI != nil && h.proxyViaOpenAI(w, r, h.Store) { return } writeClaudeError(w, http.StatusInternalServerError, "Claude runtime backend unavailable.") return } if h.handleClaudeDirect(w, r) { return } writeClaudeError(w, http.StatusBadGateway, "Failed to handle Claude request.") } func isClaudeVercelProxyRequest(r *http.Request) bool { if r == nil || r.URL == nil { return false } return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1" || strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1" } func (h *Handler) handleClaudeDirect(w http.ResponseWriter, r *http.Request) bool { raw, err := io.ReadAll(r.Body) if err != nil { if errors.Is(err, requestbody.ErrInvalidUTF8Body) { writeClaudeError(w, http.StatusBadRequest, "invalid json") } else { writeClaudeError(w, http.StatusBadRequest, "invalid body") } return true } var req map[string]any if err := json.Unmarshal(raw, &req); err != nil { writeClaudeError(w, http.StatusBadRequest, "invalid json") return true } norm, err := normalizeClaudeRequest(h.Store, req) if err != nil { writeClaudeError(w, http.StatusBadRequest, err.Error()) return true } exposeThinking := norm.Standard.Thinking a, err := h.Auth.Determine(r) if err != nil { writeClaudeError(w, http.StatusUnauthorized, err.Error()) return true } defer h.Auth.Release(a) stdReq, err := h.applyCurrentInputFile(r.Context(), a, norm.Standard) if err != nil { status, message := mapCurrentInputFileError(err) writeClaudeError(w, status, message) return true } historySession := responsehistory.Start(responsehistory.StartParams{ Store: h.ChatHistory, Request: r, Auth: a, Surface: "claude.messages", Standard: stdReq, }) if stdReq.Stream { h.handleClaudeDirectStream(w, r, a, stdReq, historySession) return true } result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{ RetryEnabled: true, CurrentInputFile: h.Store, }) if outErr != nil { if historySession != nil { historySession.ErrorTurn(outErr.Status, outErr.Message, outErr.Code, result.Turn) } writeClaudeError(w, outErr.Status, outErr.Message) return true } if historySession != nil { historySession.SuccessTurn(http.StatusOK, result.Turn, responsehistory.GenericUsage(result.Turn)) } writeJSON(w, http.StatusOK, claudefmt.BuildMessageResponseFromTurn( fmt.Sprintf("msg_%d", time.Now().UnixNano()), stdReq.ResponseModel, result.Turn, exposeThinking, )) return true } func (h *Handler) applyCurrentInputFile(ctx context.Context, a *auth.RequestAuth, stdReq promptcompat.StandardRequest) (promptcompat.StandardRequest, error) { if h == nil { return stdReq, nil } return (history.Service{Store: h.Store, DS: h.DS}).ApplyCurrentInputFile(ctx, a, stdReq) } func mapCurrentInputFileError(err error) (int, string) { return history.MapError(err) } func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, historySession *responsehistory.Session) { start, outErr := completionruntime.StartCompletion(r.Context(), h.DS, a, stdReq, completionruntime.Options{ CurrentInputFile: h.Store, }) if outErr != nil { if historySession != nil { historySession.Error(outErr.Status, outErr.Message, outErr.Code, "", "") } writeClaudeError(w, outErr.Status, outErr.Message) return } streamReq := start.Request h.handleClaudeStreamRealtime(w, r, start.Response, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession) } func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store ConfigReader) bool { raw, err := io.ReadAll(r.Body) if err != nil { if errors.Is(err, requestbody.ErrInvalidUTF8Body) { writeClaudeError(w, http.StatusBadRequest, "invalid json") } else { writeClaudeError(w, http.StatusBadRequest, "invalid body") } return true } var req map[string]any if err := json.Unmarshal(raw, &req); err != nil { writeClaudeError(w, http.StatusBadRequest, "invalid json") return true } model, _ := req["model"].(string) stream := util.ToBool(req["stream"]) // Use the shared global model resolver so Claude/OpenAI/Gemini stay consistent. translateModel := model if store != nil { if norm, normErr := normalizeClaudeRequest(store, cloneMap(req)); normErr == nil && strings.TrimSpace(norm.Standard.ResolvedModel) != "" { translateModel = strings.TrimSpace(norm.Standard.ResolvedModel) } } translatedReq := translatorcliproxy.ToOpenAI(sdktranslator.FormatClaude, translateModel, raw, stream) translatedReq, exposeThinking := applyClaudeThinkingPolicyToOpenAIRequest(translatedReq, req) isVercelPrepare := strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1" isVercelRelease := strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1" if isVercelRelease { proxyReq := r.Clone(r.Context()) proxyReq.URL.Path = "/v1/chat/completions" proxyReq.Body = io.NopCloser(bytes.NewReader(raw)) proxyReq.ContentLength = int64(len(raw)) rec := httptest.NewRecorder() h.OpenAI.ChatCompletions(rec, proxyReq) res := rec.Result() defer func() { _ = res.Body.Close() }() body, _ := io.ReadAll(res.Body) for k, vv := range res.Header { for _, v := range vv { w.Header().Add(k, v) } } w.WriteHeader(res.StatusCode) _, _ = w.Write(body) return true } proxyReq := r.Clone(r.Context()) proxyReq.URL.Path = "/v1/chat/completions" proxyReq.Body = io.NopCloser(bytes.NewReader(translatedReq)) proxyReq.ContentLength = int64(len(translatedReq)) if stream && !isVercelPrepare { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache, no-transform") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") streamWriter := translatorcliproxy.NewOpenAIStreamTranslatorWriter(w, sdktranslator.FormatClaude, model, raw, translatedReq) h.OpenAI.ChatCompletions(streamWriter, proxyReq) return true } rec := httptest.NewRecorder() h.OpenAI.ChatCompletions(rec, proxyReq) res := rec.Result() defer func() { _ = res.Body.Close() }() body, _ := io.ReadAll(res.Body) if res.StatusCode < 200 || res.StatusCode >= 300 { for k, vv := range res.Header { for _, v := range vv { w.Header().Add(k, v) } } w.WriteHeader(res.StatusCode) _, _ = w.Write(body) return true } if isVercelPrepare { for k, vv := range res.Header { for _, v := range vv { w.Header().Add(k, v) } } w.WriteHeader(res.StatusCode) _, _ = w.Write(body) return true } converted := translatorcliproxy.FromOpenAINonStream(sdktranslator.FormatClaude, model, raw, translatedReq, body) if !exposeThinking { converted = stripClaudeThinkingBlocks(converted) } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _, _ = w.Write(converted) return true } func applyClaudeThinkingPolicyToOpenAIRequest(translated []byte, original map[string]any) ([]byte, bool) { req := map[string]any{} if err := json.Unmarshal(translated, &req); err != nil { return translated, false } enabled, ok := util.ResolveThinkingOverride(original) if !ok { if _, translatedHasOverride := util.ResolveThinkingOverride(req); translatedHasOverride { return translated, false } enabled = true } typ := "disabled" if enabled { typ = "enabled" } req["thinking"] = map[string]any{"type": typ} out, err := json.Marshal(req) if err != nil { return translated, enabled } return out, enabled } func stripClaudeThinkingBlocks(raw []byte) []byte { var payload map[string]any if err := json.Unmarshal(raw, &payload); err != nil { return raw } content, _ := payload["content"].([]any) if len(content) == 0 { return raw } filtered := make([]any, 0, len(content)) for _, item := range content { block, _ := item.(map[string]any) blockType, _ := block["type"].(string) if strings.TrimSpace(blockType) == "thinking" { continue } filtered = append(filtered, item) } payload["content"] = filtered out, err := json.Marshal(payload) if err != nil { return raw } return out } func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Request, resp *http.Response, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySessions ...*responsehistory.Session) { var historySession *responsehistory.Session if len(historySessions) > 0 { historySession = historySessions[0] } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) if historySession != nil { historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "") } writeClaudeError(w, http.StatusInternalServerError, string(body)) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache, no-transform") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") rc := http.NewResponseController(w) _, canFlush := w.(http.Flusher) if !canFlush { config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered") } streamRuntime := newClaudeStreamRuntime( w, rc, canFlush, model, messages, thinkingEnabled, searchEnabled, stripReferenceMarkersEnabled(), toolNames, toolsRaw, buildClaudePromptTokenText(messages, thinkingEnabled), historySession, ) streamRuntime.sendMessageStart() initialType := "text" if thinkingEnabled { initialType = "thinking" } streamengine.ConsumeSSE(streamengine.ConsumeConfig{ Context: r.Context(), Body: resp.Body, ThinkingEnabled: thinkingEnabled, InitialType: initialType, KeepAliveInterval: claudeStreamPingInterval, IdleTimeout: claudeStreamIdleTimeout, MaxKeepAliveNoInput: claudeStreamMaxKeepaliveCnt, }, streamengine.ConsumeHooks{ OnKeepAlive: func() { streamRuntime.sendPing() }, OnParsed: streamRuntime.onParsed, OnFinalize: streamRuntime.onFinalize, }) }