mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-05 08:55:28 +08:00
- Stream: strip both [citation:N] and [reference:N] markers to prevent leaking partial link metadata during incremental output - Non-stream: convert citation/reference markers to Markdown links for Claude Messages, Gemini generateContent, and OpenAI Chat/Responses - Remove StripReferenceMarkers option from call sites; behavior is now determined automatically by stream vs non-stream context - Extend JS runtime stripReferenceMarkersText() to also match [citation:N] - Add tests for streaming marker stripping and non-stream link conversion Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
363 lines
11 KiB
Go
363 lines
11 KiB
Go
package claude
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"time"
|
|
|
|
"ds2api/internal/auth"
|
|
"ds2api/internal/completionruntime"
|
|
"ds2api/internal/config"
|
|
claudefmt "ds2api/internal/format/claude"
|
|
"ds2api/internal/httpapi/openai/history"
|
|
"ds2api/internal/httpapi/requestbody"
|
|
"ds2api/internal/promptcompat"
|
|
"ds2api/internal/responsehistory"
|
|
streamengine "ds2api/internal/stream"
|
|
"ds2api/internal/translatorcliproxy"
|
|
"ds2api/internal/util"
|
|
|
|
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
|
|
)
|
|
|
|
func (h *Handler) Messages(w http.ResponseWriter, r *http.Request) {
|
|
if strings.TrimSpace(r.Header.Get("anthropic-version")) == "" {
|
|
r.Header.Set("anthropic-version", "2023-06-01")
|
|
}
|
|
if isClaudeVercelProxyRequest(r) && h.proxyViaOpenAI(w, r, h.Store) {
|
|
return
|
|
}
|
|
if h.Auth == nil || h.DS == nil {
|
|
if h.OpenAI != nil && h.proxyViaOpenAI(w, r, h.Store) {
|
|
return
|
|
}
|
|
writeClaudeError(w, http.StatusInternalServerError, "Claude runtime backend unavailable.")
|
|
return
|
|
}
|
|
if h.handleClaudeDirect(w, r) {
|
|
return
|
|
}
|
|
writeClaudeError(w, http.StatusBadGateway, "Failed to handle Claude request.")
|
|
}
|
|
|
|
// isClaudeVercelProxyRequest reports whether the request URL carries either
// the "__stream_prepare" or the "__stream_release" query parameter with the
// value "1" (surrounding whitespace is ignored). A nil request or a request
// without a URL yields false.
func isClaudeVercelProxyRequest(r *http.Request) bool {
	if r == nil || r.URL == nil {
		return false
	}
	// Parse the query string once; URL.Query re-parses RawQuery on every call.
	query := r.URL.Query()
	return strings.TrimSpace(query.Get("__stream_prepare")) == "1" ||
		strings.TrimSpace(query.Get("__stream_release")) == "1"
}
|
|
|
|
func (h *Handler) handleClaudeDirect(w http.ResponseWriter, r *http.Request) bool {
|
|
raw, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
if errors.Is(err, requestbody.ErrInvalidUTF8Body) {
|
|
writeClaudeError(w, http.StatusBadRequest, "invalid json")
|
|
} else {
|
|
writeClaudeError(w, http.StatusBadRequest, "invalid body")
|
|
}
|
|
return true
|
|
}
|
|
var req map[string]any
|
|
if err := json.Unmarshal(raw, &req); err != nil {
|
|
writeClaudeError(w, http.StatusBadRequest, "invalid json")
|
|
return true
|
|
}
|
|
norm, err := normalizeClaudeRequest(h.Store, req)
|
|
if err != nil {
|
|
writeClaudeError(w, http.StatusBadRequest, err.Error())
|
|
return true
|
|
}
|
|
exposeThinking := norm.Standard.Thinking
|
|
a, err := h.Auth.Determine(r)
|
|
if err != nil {
|
|
writeClaudeError(w, http.StatusUnauthorized, err.Error())
|
|
return true
|
|
}
|
|
defer h.Auth.Release(a)
|
|
stdReq, err := h.applyCurrentInputFile(r.Context(), a, norm.Standard)
|
|
if err != nil {
|
|
status, message := mapCurrentInputFileError(err)
|
|
writeClaudeError(w, status, message)
|
|
return true
|
|
}
|
|
historySession := responsehistory.Start(responsehistory.StartParams{
|
|
Store: h.ChatHistory,
|
|
Request: r,
|
|
Auth: a,
|
|
Surface: "claude.messages",
|
|
Standard: stdReq,
|
|
})
|
|
if stdReq.Stream {
|
|
h.handleClaudeDirectStream(w, r, a, stdReq, historySession)
|
|
return true
|
|
}
|
|
result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{
|
|
RetryEnabled: true,
|
|
CurrentInputFile: h.Store,
|
|
})
|
|
if outErr != nil {
|
|
if historySession != nil {
|
|
historySession.ErrorTurn(outErr.Status, outErr.Message, outErr.Code, result.Turn)
|
|
}
|
|
writeClaudeError(w, outErr.Status, outErr.Message)
|
|
return true
|
|
}
|
|
if historySession != nil {
|
|
historySession.SuccessTurn(http.StatusOK, result.Turn, responsehistory.GenericUsage(result.Turn))
|
|
}
|
|
writeJSON(w, http.StatusOK, claudefmt.BuildMessageResponseFromTurn(
|
|
fmt.Sprintf("msg_%d", time.Now().UnixNano()),
|
|
stdReq.ResponseModel,
|
|
result.Turn,
|
|
exposeThinking,
|
|
))
|
|
return true
|
|
}
|
|
|
|
func (h *Handler) applyCurrentInputFile(ctx context.Context, a *auth.RequestAuth, stdReq promptcompat.StandardRequest) (promptcompat.StandardRequest, error) {
|
|
if h == nil {
|
|
return stdReq, nil
|
|
}
|
|
return (history.Service{Store: h.Store, DS: h.DS}).ApplyCurrentInputFile(ctx, a, stdReq)
|
|
}
|
|
|
|
func mapCurrentInputFileError(err error) (int, string) {
|
|
return history.MapError(err)
|
|
}
|
|
|
|
func (h *Handler) handleClaudeDirectStream(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, stdReq promptcompat.StandardRequest, historySession *responsehistory.Session) {
|
|
start, outErr := completionruntime.StartCompletion(r.Context(), h.DS, a, stdReq, completionruntime.Options{
|
|
CurrentInputFile: h.Store,
|
|
})
|
|
if outErr != nil {
|
|
if historySession != nil {
|
|
historySession.Error(outErr.Status, outErr.Message, outErr.Code, "", "")
|
|
}
|
|
writeClaudeError(w, outErr.Status, outErr.Message)
|
|
return
|
|
}
|
|
streamReq := start.Request
|
|
h.handleClaudeStreamRealtime(w, r, start.Response, streamReq.ResponseModel, streamReq.Messages, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, historySession)
|
|
}
|
|
|
|
func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store ConfigReader) bool {
|
|
raw, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
if errors.Is(err, requestbody.ErrInvalidUTF8Body) {
|
|
writeClaudeError(w, http.StatusBadRequest, "invalid json")
|
|
} else {
|
|
writeClaudeError(w, http.StatusBadRequest, "invalid body")
|
|
}
|
|
return true
|
|
}
|
|
var req map[string]any
|
|
if err := json.Unmarshal(raw, &req); err != nil {
|
|
writeClaudeError(w, http.StatusBadRequest, "invalid json")
|
|
return true
|
|
}
|
|
model, _ := req["model"].(string)
|
|
stream := util.ToBool(req["stream"])
|
|
|
|
// Use the shared global model resolver so Claude/OpenAI/Gemini stay consistent.
|
|
translateModel := model
|
|
if store != nil {
|
|
if norm, normErr := normalizeClaudeRequest(store, cloneMap(req)); normErr == nil && strings.TrimSpace(norm.Standard.ResolvedModel) != "" {
|
|
translateModel = strings.TrimSpace(norm.Standard.ResolvedModel)
|
|
}
|
|
}
|
|
translatedReq := translatorcliproxy.ToOpenAI(sdktranslator.FormatClaude, translateModel, raw, stream)
|
|
translatedReq, exposeThinking := applyClaudeThinkingPolicyToOpenAIRequest(translatedReq, req)
|
|
|
|
isVercelPrepare := strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1"
|
|
isVercelRelease := strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1"
|
|
|
|
if isVercelRelease {
|
|
proxyReq := r.Clone(r.Context())
|
|
proxyReq.URL.Path = "/v1/chat/completions"
|
|
proxyReq.Body = io.NopCloser(bytes.NewReader(raw))
|
|
proxyReq.ContentLength = int64(len(raw))
|
|
rec := httptest.NewRecorder()
|
|
h.OpenAI.ChatCompletions(rec, proxyReq)
|
|
res := rec.Result()
|
|
defer func() { _ = res.Body.Close() }()
|
|
body, _ := io.ReadAll(res.Body)
|
|
for k, vv := range res.Header {
|
|
for _, v := range vv {
|
|
w.Header().Add(k, v)
|
|
}
|
|
}
|
|
w.WriteHeader(res.StatusCode)
|
|
_, _ = w.Write(body)
|
|
return true
|
|
}
|
|
|
|
proxyReq := r.Clone(r.Context())
|
|
proxyReq.URL.Path = "/v1/chat/completions"
|
|
proxyReq.Body = io.NopCloser(bytes.NewReader(translatedReq))
|
|
proxyReq.ContentLength = int64(len(translatedReq))
|
|
|
|
if stream && !isVercelPrepare {
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache, no-transform")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("X-Accel-Buffering", "no")
|
|
streamWriter := translatorcliproxy.NewOpenAIStreamTranslatorWriter(w, sdktranslator.FormatClaude, model, raw, translatedReq)
|
|
h.OpenAI.ChatCompletions(streamWriter, proxyReq)
|
|
return true
|
|
}
|
|
|
|
rec := httptest.NewRecorder()
|
|
h.OpenAI.ChatCompletions(rec, proxyReq)
|
|
res := rec.Result()
|
|
defer func() { _ = res.Body.Close() }()
|
|
body, _ := io.ReadAll(res.Body)
|
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
|
for k, vv := range res.Header {
|
|
for _, v := range vv {
|
|
w.Header().Add(k, v)
|
|
}
|
|
}
|
|
w.WriteHeader(res.StatusCode)
|
|
_, _ = w.Write(body)
|
|
return true
|
|
}
|
|
if isVercelPrepare {
|
|
for k, vv := range res.Header {
|
|
for _, v := range vv {
|
|
w.Header().Add(k, v)
|
|
}
|
|
}
|
|
w.WriteHeader(res.StatusCode)
|
|
_, _ = w.Write(body)
|
|
return true
|
|
}
|
|
converted := translatorcliproxy.FromOpenAINonStream(sdktranslator.FormatClaude, model, raw, translatedReq, body)
|
|
if !exposeThinking {
|
|
converted = stripClaudeThinkingBlocks(converted)
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write(converted)
|
|
return true
|
|
}
|
|
|
|
func applyClaudeThinkingPolicyToOpenAIRequest(translated []byte, original map[string]any) ([]byte, bool) {
|
|
req := map[string]any{}
|
|
if err := json.Unmarshal(translated, &req); err != nil {
|
|
return translated, false
|
|
}
|
|
enabled, ok := util.ResolveThinkingOverride(original)
|
|
if !ok {
|
|
if _, translatedHasOverride := util.ResolveThinkingOverride(req); translatedHasOverride {
|
|
return translated, false
|
|
}
|
|
enabled = true
|
|
}
|
|
typ := "disabled"
|
|
if enabled {
|
|
typ = "enabled"
|
|
}
|
|
req["thinking"] = map[string]any{"type": typ}
|
|
out, err := json.Marshal(req)
|
|
if err != nil {
|
|
return translated, enabled
|
|
}
|
|
return out, enabled
|
|
}
|
|
|
|
// stripClaudeThinkingBlocks removes every entry whose "type" is "thinking"
// (ignoring surrounding whitespace) from the top-level "content" array of a
// Claude message JSON document.
//
// The input is returned byte-for-byte unchanged when it is not a JSON object,
// has no non-empty "content" array, contains no thinking block, or cannot be
// re-encoded. Only when at least one block is removed is the document
// re-marshalled; "content" is then always a JSON array, possibly empty.
func stripClaudeThinkingBlocks(raw []byte) []byte {
	var payload map[string]any
	if err := json.Unmarshal(raw, &payload); err != nil {
		return raw
	}
	content, _ := payload["content"].([]any)
	if len(content) == 0 {
		return raw
	}

	kept := make([]any, 0, len(content))
	for _, item := range content {
		// Non-object entries yield a nil map and an empty type, so they are kept.
		block, _ := item.(map[string]any)
		blockType, _ := block["type"].(string)
		if strings.TrimSpace(blockType) == "thinking" {
			continue
		}
		kept = append(kept, item)
	}

	// Nothing to strip: hand back the original bytes instead of a re-encoded
	// copy, which would reorder keys, HTML-escape strings and round-trip
	// numbers through float64 for no benefit.
	if len(kept) == len(content) {
		return raw
	}

	payload["content"] = kept
	out, err := json.Marshal(payload)
	if err != nil {
		return raw
	}
	return out
}
|
|
|
|
func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Request, resp *http.Response, model string, messages []any, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySessions ...*responsehistory.Session) {
|
|
var historySession *responsehistory.Session
|
|
if len(historySessions) > 0 {
|
|
historySession = historySessions[0]
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
if historySession != nil {
|
|
historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "")
|
|
}
|
|
writeClaudeError(w, http.StatusInternalServerError, string(body))
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache, no-transform")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("X-Accel-Buffering", "no")
|
|
rc := http.NewResponseController(w)
|
|
_, canFlush := w.(http.Flusher)
|
|
if !canFlush {
|
|
config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered")
|
|
}
|
|
|
|
streamRuntime := newClaudeStreamRuntime(
|
|
w,
|
|
rc,
|
|
canFlush,
|
|
model,
|
|
messages,
|
|
thinkingEnabled,
|
|
searchEnabled,
|
|
stripReferenceMarkersEnabled(),
|
|
toolNames,
|
|
toolsRaw,
|
|
buildClaudePromptTokenText(messages, thinkingEnabled),
|
|
historySession,
|
|
)
|
|
streamRuntime.sendMessageStart()
|
|
|
|
initialType := "text"
|
|
if thinkingEnabled {
|
|
initialType = "thinking"
|
|
}
|
|
streamengine.ConsumeSSE(streamengine.ConsumeConfig{
|
|
Context: r.Context(),
|
|
Body: resp.Body,
|
|
ThinkingEnabled: thinkingEnabled,
|
|
InitialType: initialType,
|
|
KeepAliveInterval: claudeStreamPingInterval,
|
|
IdleTimeout: claudeStreamIdleTimeout,
|
|
MaxKeepAliveNoInput: claudeStreamMaxKeepaliveCnt,
|
|
}, streamengine.ConsumeHooks{
|
|
OnKeepAlive: func() {
|
|
streamRuntime.sendPing()
|
|
},
|
|
OnParsed: streamRuntime.onParsed,
|
|
OnFinalize: streamRuntime.onFinalize,
|
|
})
|
|
}
|