Files
ds2api/internal/httpapi/claude/handler_messages.go

320 lines
9.4 KiB
Go

package claude
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"time"
"ds2api/internal/auth"
"ds2api/internal/completionruntime"
"ds2api/internal/config"
claudefmt "ds2api/internal/format/claude"
"ds2api/internal/httpapi/requestbody"
"ds2api/internal/promptcompat"
streamengine "ds2api/internal/stream"
"ds2api/internal/translatorcliproxy"
"ds2api/internal/util"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
)
func (h *Handler) Messages(w http.ResponseWriter, r *http.Request) {
if strings.TrimSpace(r.Header.Get("anthropic-version")) == "" {
r.Header.Set("anthropic-version", "2023-06-01")
}
if isClaudeVercelProxyRequest(r) && h.proxyViaOpenAI(w, r, h.Store) {
return
}
if h.Auth == nil || h.DS == nil {
if h.OpenAI != nil && h.proxyViaOpenAI(w, r, h.Store) {
return
}
writeClaudeError(w, http.StatusInternalServerError, "Claude runtime backend unavailable.")
return
}
if h.handleClaudeDirect(w, r) {
return
}
writeClaudeError(w, http.StatusBadGateway, "Failed to handle Claude request.")
}
func isClaudeVercelProxyRequest(r *http.Request) bool {
if r == nil || r.URL == nil {
return false
}
return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1" ||
strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1"
}
func (h *Handler) handleClaudeDirect(w http.ResponseWriter, r *http.Request) bool {
raw, err := io.ReadAll(r.Body)
if err != nil {
if errors.Is(err, requestbody.ErrInvalidUTF8Body) {
writeClaudeError(w, http.StatusBadRequest, "invalid json")
} else {
writeClaudeError(w, http.StatusBadRequest, "invalid body")
}
return true
}
var req map[string]any
if err := json.Unmarshal(raw, &req); err != nil {
writeClaudeError(w, http.StatusBadRequest, "invalid json")
return true
}
norm, err := normalizeClaudeRequest(h.Store, req)
if err != nil {
writeClaudeError(w, http.StatusBadRequest, err.Error())
return true
}
exposeThinking := norm.Standard.Thinking
a, err := h.Auth.Determine(r)
if err != nil {
writeClaudeError(w, http.StatusUnauthorized, err.Error())
return true
}
defer h.Auth.Release(a)
if norm.Standard.Stream {
h.handleClaudeDirectStream(w, r, a, norm.Standard)
return true
}
result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, norm.Standard, completionruntime.Options{
StripReferenceMarkers: h.compatStripReferenceMarkers(),
RetryEnabled: true,
CurrentInputFile: h.Store,
})
if outErr != nil {
writeClaudeError(w, outErr.Status, outErr.Message)
return true
}
writeJSON(w, http.StatusOK, claudefmt.BuildMessageResponseFromTurn(
fmt.Sprintf("msg_%d", time.Now().UnixNano()),
norm.Standard.ResponseModel,
result.Turn,
exposeThinking,
))
return true
}
func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, stdReq promptcompat.StandardRequest) {
start, outErr := completionruntime.StartCompletion(r.Context(), h.DS, a, stdReq, completionruntime.Options{
CurrentInputFile: h.Store,
})
if outErr != nil {
writeClaudeError(w, outErr.Status, outErr.Message)
return
}
streamReq := start.Request
h.handleClaudeStreamRealtime(w, r, start.Response, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw)
}
func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store ConfigReader) bool {
raw, err := io.ReadAll(r.Body)
if err != nil {
if errors.Is(err, requestbody.ErrInvalidUTF8Body) {
writeClaudeError(w, http.StatusBadRequest, "invalid json")
} else {
writeClaudeError(w, http.StatusBadRequest, "invalid body")
}
return true
}
var req map[string]any
if err := json.Unmarshal(raw, &req); err != nil {
writeClaudeError(w, http.StatusBadRequest, "invalid json")
return true
}
model, _ := req["model"].(string)
stream := util.ToBool(req["stream"])
// Use the shared global model resolver so Claude/OpenAI/Gemini stay consistent.
translateModel := model
if store != nil {
if norm, normErr := normalizeClaudeRequest(store, cloneMap(req)); normErr == nil && strings.TrimSpace(norm.Standard.ResolvedModel) != "" {
translateModel = strings.TrimSpace(norm.Standard.ResolvedModel)
}
}
translatedReq := translatorcliproxy.ToOpenAI(sdktranslator.FormatClaude, translateModel, raw, stream)
translatedReq, exposeThinking := applyClaudeThinkingPolicyToOpenAIRequest(translatedReq, req)
isVercelPrepare := strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1"
isVercelRelease := strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1"
if isVercelRelease {
proxyReq := r.Clone(r.Context())
proxyReq.URL.Path = "/v1/chat/completions"
proxyReq.Body = io.NopCloser(bytes.NewReader(raw))
proxyReq.ContentLength = int64(len(raw))
rec := httptest.NewRecorder()
h.OpenAI.ChatCompletions(rec, proxyReq)
res := rec.Result()
defer func() { _ = res.Body.Close() }()
body, _ := io.ReadAll(res.Body)
for k, vv := range res.Header {
for _, v := range vv {
w.Header().Add(k, v)
}
}
w.WriteHeader(res.StatusCode)
_, _ = w.Write(body)
return true
}
proxyReq := r.Clone(r.Context())
proxyReq.URL.Path = "/v1/chat/completions"
proxyReq.Body = io.NopCloser(bytes.NewReader(translatedReq))
proxyReq.ContentLength = int64(len(translatedReq))
if stream && !isVercelPrepare {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache, no-transform")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
streamWriter := translatorcliproxy.NewOpenAIStreamTranslatorWriter(w, sdktranslator.FormatClaude, model, raw, translatedReq)
h.OpenAI.ChatCompletions(streamWriter, proxyReq)
return true
}
rec := httptest.NewRecorder()
h.OpenAI.ChatCompletions(rec, proxyReq)
res := rec.Result()
defer func() { _ = res.Body.Close() }()
body, _ := io.ReadAll(res.Body)
if res.StatusCode < 200 || res.StatusCode >= 300 {
for k, vv := range res.Header {
for _, v := range vv {
w.Header().Add(k, v)
}
}
w.WriteHeader(res.StatusCode)
_, _ = w.Write(body)
return true
}
if isVercelPrepare {
for k, vv := range res.Header {
for _, v := range vv {
w.Header().Add(k, v)
}
}
w.WriteHeader(res.StatusCode)
_, _ = w.Write(body)
return true
}
converted := translatorcliproxy.FromOpenAINonStream(sdktranslator.FormatClaude, model, raw, translatedReq, body)
if !exposeThinking {
converted = stripClaudeThinkingBlocks(converted)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(converted)
return true
}
func applyClaudeThinkingPolicyToOpenAIRequest(translated []byte, original map[string]any) ([]byte, bool) {
req := map[string]any{}
if err := json.Unmarshal(translated, &req); err != nil {
return translated, false
}
enabled, ok := util.ResolveThinkingOverride(original)
if !ok {
if _, translatedHasOverride := util.ResolveThinkingOverride(req); translatedHasOverride {
return translated, false
}
enabled = true
}
typ := "disabled"
if enabled {
typ = "enabled"
}
req["thinking"] = map[string]any{"type": typ}
out, err := json.Marshal(req)
if err != nil {
return translated, enabled
}
return out, enabled
}
func stripClaudeThinkingBlocks(raw []byte) []byte {
var payload map[string]any
if err := json.Unmarshal(raw, &payload); err != nil {
return raw
}
content, _ := payload["content"].([]any)
if len(content) == 0 {
return raw
}
filtered := make([]any, 0, len(content))
for _, item := range content {
block, _ := item.(map[string]any)
blockType, _ := block["type"].(string)
if strings.TrimSpace(blockType) == "thinking" {
continue
}
filtered = append(filtered, item)
}
payload["content"] = filtered
out, err := json.Marshal(payload)
if err != nil {
return raw
}
return out
}
func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Request, resp *http.Response, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any) {
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
writeClaudeError(w, http.StatusInternalServerError, string(body))
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache, no-transform")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
rc := http.NewResponseController(w)
_, canFlush := w.(http.Flusher)
if !canFlush {
config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered")
}
streamRuntime := newClaudeStreamRuntime(
w,
rc,
canFlush,
model,
messages,
thinkingEnabled,
searchEnabled,
h.compatStripReferenceMarkers(),
toolNames,
toolsRaw,
buildClaudePromptTokenText(messages, thinkingEnabled),
)
streamRuntime.sendMessageStart()
initialType := "text"
if thinkingEnabled {
initialType = "thinking"
}
streamengine.ConsumeSSE(streamengine.ConsumeConfig{
Context: r.Context(),
Body: resp.Body,
ThinkingEnabled: thinkingEnabled,
InitialType: initialType,
KeepAliveInterval: claudeStreamPingInterval,
IdleTimeout: claudeStreamIdleTimeout,
MaxKeepAliveNoInput: claudeStreamMaxKeepaliveCnt,
}, streamengine.ConsumeHooks{
OnKeepAlive: func() {
streamRuntime.sendPing()
},
OnParsed: streamRuntime.onParsed,
OnFinalize: streamRuntime.onFinalize,
})
}