Merge pull request #320 from adnxx1wsx/main

fix: fallback claude non-stream tool calls from thinking
This commit is contained in:
CJACK.
2026-04-26 17:54:05 +08:00
committed by GitHub
13 changed files with 237 additions and 26 deletions

View File

@@ -52,7 +52,7 @@ func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store C
}
}
translatedReq := translatorcliproxy.ToOpenAI(sdktranslator.FormatClaude, translateModel, raw, stream)
translatedReq = applyClaudeThinkingPolicyToOpenAIRequest(translatedReq, req)
translatedReq, exposeThinking := applyClaudeThinkingPolicyToOpenAIRequest(translatedReq, req, stream)
isVercelPrepare := strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1"
isVercelRelease := strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1"
@@ -118,23 +118,26 @@ func (h *Handler) proxyViaOpenAI(w http.ResponseWriter, r *http.Request, store C
return true
}
converted := translatorcliproxy.FromOpenAINonStream(sdktranslator.FormatClaude, model, raw, translatedReq, body)
if !exposeThinking {
converted = stripClaudeThinkingBlocks(converted)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(converted)
return true
}
func applyClaudeThinkingPolicyToOpenAIRequest(translated []byte, original map[string]any) []byte {
func applyClaudeThinkingPolicyToOpenAIRequest(translated []byte, original map[string]any, stream bool) ([]byte, bool) {
req := map[string]any{}
if err := json.Unmarshal(translated, &req); err != nil {
return translated
return translated, false
}
enabled, ok := util.ResolveThinkingOverride(original)
if !ok {
if _, translatedHasOverride := util.ResolveThinkingOverride(req); translatedHasOverride {
return translated
return translated, false
}
enabled = false
enabled = !stream
}
typ := "disabled"
if enabled {
@@ -143,7 +146,33 @@ func applyClaudeThinkingPolicyToOpenAIRequest(translated []byte, original map[st
req["thinking"] = map[string]any{"type": typ}
out, err := json.Marshal(req)
if err != nil {
return translated
return translated, ok && enabled
}
return out, ok && enabled
}
func stripClaudeThinkingBlocks(raw []byte) []byte {
var payload map[string]any
if err := json.Unmarshal(raw, &payload); err != nil {
return raw
}
content, _ := payload["content"].([]any)
if len(content) == 0 {
return raw
}
filtered := make([]any, 0, len(content))
for _, item := range content {
block, _ := item.(map[string]any)
blockType, _ := block["type"].(string)
if strings.TrimSpace(blockType) == "thinking" {
continue
}
filtered = append(filtered, item)
}
payload["content"] = filtered
out, err := json.Marshal(payload)
if err != nil {
return raw
}
return out
}

View File

@@ -126,7 +126,7 @@ func TestClaudeProxyViaOpenAIPreservesThinkingOverride(t *testing.T) {
}
}
func TestClaudeProxyViaOpenAIDisablesThinkingByDefault(t *testing.T) {
func TestClaudeProxyViaOpenAIEnablesThinkingInternallyByDefaultForNonStream(t *testing.T) {
openAI := &openAIProxyCaptureStub{}
h := &Handler{
Store: claudeProxyStoreStub{aliases: map[string]string{"claude-sonnet-4-6": "deepseek-v4-flash"}},
@@ -141,8 +141,8 @@ func TestClaudeProxyViaOpenAIDisablesThinkingByDefault(t *testing.T) {
t.Fatalf("unexpected status: %d body=%s", rec.Code, rec.Body.String())
}
thinking, _ := openAI.seenReq["thinking"].(map[string]any)
if thinking["type"] != "disabled" {
t.Fatalf("expected Claude default to disable downstream thinking, got %#v", openAI.seenReq)
if thinking["type"] != "enabled" {
t.Fatalf("expected Claude non-stream default to enable downstream thinking internally, got %#v", openAI.seenReq)
}
}
@@ -166,6 +166,43 @@ func TestClaudeProxyViaOpenAIEnablesThinkingWhenRequested(t *testing.T) {
}
}
func TestClaudeProxyViaOpenAIKeepsStreamDefaultThinkingDisabled(t *testing.T) {
openAI := &openAIProxyCaptureStub{}
h := &Handler{
Store: claudeProxyStoreStub{aliases: map[string]string{"claude-sonnet-4-6": "deepseek-v4-flash"}},
OpenAI: openAI,
}
req := httptest.NewRequest(http.MethodPost, "/anthropic/v1/messages", strings.NewReader(`{"model":"claude-sonnet-4-6","messages":[{"role":"user","content":"hi"}],"stream":true}`))
rec := httptest.NewRecorder()
h.Messages(rec, req)
thinking, _ := openAI.seenReq["thinking"].(map[string]any)
if thinking["type"] != "disabled" {
t.Fatalf("expected Claude stream default to keep downstream thinking disabled, got %#v", openAI.seenReq)
}
}
func TestClaudeProxyViaOpenAIStripsThinkingBlocksFromNonStreamResponse(t *testing.T) {
body := `{"id":"chatcmpl_1","object":"chat.completion","created":1,"model":"claude-sonnet-4-5","choices":[{"index":0,"message":{"role":"assistant","content":null,"reasoning_content":"internal reasoning","tool_calls":[{"id":"call_1","type":"function","function":{"name":"search","arguments":"{\"q\":\"x\"}"}}]},"finish_reason":"tool_calls"}],"usage":{"prompt_tokens":1,"completion_tokens":1,"total_tokens":2}}`
h := &Handler{OpenAI: openAIProxyStub{status: 200, body: body}}
req := httptest.NewRequest(http.MethodPost, "/anthropic/v1/messages", strings.NewReader(`{"model":"claude-sonnet-4-5","messages":[{"role":"user","content":"hi"}],"stream":false}`))
rec := httptest.NewRecorder()
h.Messages(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("unexpected status: %d body=%s", rec.Code, rec.Body.String())
}
got := rec.Body.String()
if strings.Contains(got, `"type":"thinking"`) {
t.Fatalf("expected converted Claude response to strip thinking block, got %s", got)
}
if !strings.Contains(got, `"tool_use"`) {
t.Fatalf("expected converted Claude response to preserve tool_use, got %s", got)
}
}
func TestClaudeProxyTranslatesInlineImageToOpenAIDataURL(t *testing.T) {
openAI := &openAIProxyCaptureStub{}
h := &Handler{OpenAI: openAI}

View File

@@ -133,7 +133,7 @@ func (s *chatStreamRuntime) finalize(finishReason string) {
finalText := cleanVisibleOutput(s.text.String(), s.stripReferenceMarkers)
s.finalThinking = finalThinking
s.finalText = finalText
detected := toolcall.ParseStandaloneToolCallsDetailed(finalText, s.toolNames)
detected := toolcall.ParseAssistantToolCallsDetailed(finalText, finalThinking, s.toolNames)
if len(detected.Calls) > 0 && !s.toolCallsDoneEmitted {
finishReason = "tool_calls"
delta := map[string]any{

View File

@@ -15,6 +15,7 @@ import (
"ds2api/internal/promptcompat"
"ds2api/internal/sse"
streamengine "ds2api/internal/stream"
"ds2api/internal/toolcall"
)
func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
@@ -162,7 +163,8 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, resp *http.Response, co
if searchEnabled {
finalText = replaceCitationMarkersWithLinks(finalText, result.CitationLinks)
}
if shouldWriteUpstreamEmptyOutputError(finalText) {
detected := toolcall.ParseAssistantToolCallsDetailed(finalText, finalThinking, toolNames)
if shouldWriteUpstreamEmptyOutputError(finalText) && len(detected.Calls) == 0 {
status, message, code := upstreamEmptyOutputDetail(result.ContentFilter, finalText, finalThinking)
if historySession != nil {
historySession.error(status, message, code, finalThinking, finalText)

View File

@@ -142,6 +142,37 @@ func TestHandleNonStreamReturns429WhenUpstreamHasOnlyThinking(t *testing.T) {
}
}
func TestHandleNonStreamPromotesThinkingToolCallsWhenTextEmpty(t *testing.T) {
h := &Handler{}
resp := makeSSEHTTPResponse(
`data: {"p":"response/thinking_content","v":"<tool_calls><invoke name=\"search\"><parameter name=\"q\">from-thinking</parameter></invoke></tool_calls>"}`,
`data: [DONE]`,
)
rec := httptest.NewRecorder()
h.handleNonStream(rec, resp, "cid-thinking-tool", "deepseek-v4-pro", "prompt", true, false, []string{"search"}, nil)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200 for thinking tool calls, got %d body=%s", rec.Code, rec.Body.String())
}
out := decodeJSONBody(t, rec.Body.String())
choices, _ := out["choices"].([]any)
if len(choices) == 0 {
t.Fatalf("expected choices, got %#v", out)
}
choice, _ := choices[0].(map[string]any)
if got := asString(choice["finish_reason"]); got != "tool_calls" {
t.Fatalf("expected finish_reason=tool_calls, got %#v", choice["finish_reason"])
}
message, _ := choice["message"].(map[string]any)
toolCalls, _ := message["tool_calls"].([]any)
if len(toolCalls) != 1 {
t.Fatalf("expected one tool call, got %#v", message["tool_calls"])
}
if content, exists := message["content"]; !exists || content != nil {
t.Fatalf("expected content nil when tool call promoted, got %#v", message["content"])
}
}
func TestHandleStreamToolsPlainTextStreamsBeforeFinish(t *testing.T) {
h := &Handler{}
resp := makeSSEHTTPResponse(
@@ -214,6 +245,43 @@ func TestHandleStreamIncompleteCapturedToolJSONFlushesAsTextOnFinalize(t *testin
}
}
func TestHandleStreamPromotesThinkingToolCallsOnFinalizeWithoutMidstreamIntercept(t *testing.T) {
h := &Handler{}
resp := makeSSEHTTPResponse(
`data: {"p":"response/thinking_content","v":"<tool_calls><invoke name=\"search\"><parameter name=\"q\">from-thinking</parameter></invoke></tool_calls>"}`,
`data: [DONE]`,
)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", nil)
h.handleStream(rec, req, resp, "cid-thinking-stream", "deepseek-v4-pro", "prompt", true, false, []string{"search"}, nil)
frames, done := parseSSEDataFrames(t, rec.Body.String())
if !done {
t.Fatalf("expected [DONE], body=%s", rec.Body.String())
}
if !streamHasToolCallsDelta(frames) {
t.Fatalf("expected tool_calls delta from finalize fallback, body=%s", rec.Body.String())
}
reasoningSeen := false
for _, frame := range frames {
choices, _ := frame["choices"].([]any)
for _, item := range choices {
choice, _ := item.(map[string]any)
delta, _ := choice["delta"].(map[string]any)
if asString(delta["reasoning_content"]) != "" {
reasoningSeen = true
}
}
}
if !reasoningSeen {
t.Fatalf("expected reasoning_content to stream before finalize fallback, body=%s", rec.Body.String())
}
if streamFinishReason(frames) != "tool_calls" {
t.Fatalf("expected finish_reason=tool_calls, body=%s", rec.Body.String())
}
}
func TestHandleStreamEmitsDistinctToolCallIDsAcrossSeparateToolBlocks(t *testing.T) {
h := &Handler{}
resp := makeSSEHTTPResponse(

View File

@@ -135,10 +135,10 @@ func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Res
if searchEnabled {
sanitizedText = replaceCitationMarkersWithLinks(sanitizedText, result.CitationLinks)
}
if writeUpstreamEmptyOutputError(w, sanitizedText, sanitizedThinking, result.ContentFilter) {
textParsed := toolcall.ParseAssistantToolCallsDetailed(sanitizedText, sanitizedThinking, toolNames)
if len(textParsed.Calls) == 0 && writeUpstreamEmptyOutputError(w, sanitizedText, sanitizedThinking, result.ContentFilter) {
return
}
textParsed := toolcall.ParseStandaloneToolCallsDetailed(sanitizedText, toolNames)
logResponsesToolPolicyRejection(traceID, toolChoice, textParsed, "text")
callCount := len(textParsed.Calls)

View File

@@ -133,7 +133,7 @@ func (s *responsesStreamRuntime) finalize() {
s.processToolStreamEvents(toolstream.Flush(&s.sieve, s.toolNames), true, true)
}
textParsed := toolcall.ParseStandaloneToolCallsDetailed(finalText, s.toolNames)
textParsed := toolcall.ParseAssistantToolCallsDetailed(finalText, finalThinking, s.toolNames)
detected := textParsed.Calls
s.logToolPolicyRejections(textParsed)

View File

@@ -232,6 +232,39 @@ func TestHandleResponsesStreamFailsWhenUpstreamHasOnlyThinking(t *testing.T) {
}
}
func TestHandleResponsesStreamPromotesThinkingToolCallsOnFinalizeWithoutMidstreamIntercept(t *testing.T) {
h := &Handler{}
req := httptest.NewRequest(http.MethodPost, "/v1/responses", nil)
rec := httptest.NewRecorder()
sseLine := func(path, value string) string {
b, _ := json.Marshal(map[string]any{
"p": path,
"v": value,
})
return "data: " + string(b) + "\n"
}
streamBody := sseLine("response/thinking_content", `<tool_calls><invoke name="read_file"><parameter name="path">README.MD</parameter></invoke></tool_calls>`) + "data: [DONE]\n"
resp := &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader(streamBody)),
}
h.handleResponsesStream(rec, req, resp, "owner-a", "resp_test", "deepseek-v4-pro", "prompt", true, false, []string{"read_file"}, promptcompat.DefaultToolChoicePolicy(), "")
body := rec.Body.String()
if !strings.Contains(body, "event: response.reasoning.delta") {
t.Fatalf("expected reasoning delta in stream body, got %s", body)
}
if !strings.Contains(body, "event: response.function_call_arguments.done") {
t.Fatalf("expected finalize fallback function call event, got %s", body)
}
if strings.Contains(body, "event: response.failed") {
t.Fatalf("did not expect response.failed, body=%s", body)
}
}
func TestHandleResponsesNonStreamRequiredToolChoiceViolation(t *testing.T) {
h := &Handler{}
rec := httptest.NewRecorder()
@@ -258,7 +291,7 @@ func TestHandleResponsesNonStreamRequiredToolChoiceViolation(t *testing.T) {
}
}
func TestHandleResponsesNonStreamRequiredToolChoiceIgnoresThinkingToolPayload(t *testing.T) {
func TestHandleResponsesNonStreamRequiredToolChoiceIgnoresThinkingToolPayloadWhenTextExists(t *testing.T) {
h := &Handler{}
rec := httptest.NewRecorder()
resp := &http.Response{
@@ -351,6 +384,32 @@ func TestHandleResponsesNonStreamReturns429WhenUpstreamHasOnlyThinking(t *testin
}
}
func TestHandleResponsesNonStreamPromotesThinkingToolCallsWhenTextEmpty(t *testing.T) {
h := &Handler{}
rec := httptest.NewRecorder()
resp := &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader(
`data: {"p":"response/thinking_content","v":"<tool_calls><invoke name=\"read_file\"><parameter name=\"path\">README.MD</parameter></invoke></tool_calls>"}` + "\n" +
`data: [DONE]` + "\n",
)),
}
h.handleResponsesNonStream(rec, resp, "owner-a", "resp_test", "deepseek-v4-pro", "prompt", true, false, []string{"read_file"}, promptcompat.DefaultToolChoicePolicy(), "")
if rec.Code != http.StatusOK {
t.Fatalf("expected 200 for thinking tool calls, got %d body=%s", rec.Code, rec.Body.String())
}
out := decodeJSONBody(t, rec.Body.String())
output, _ := out["output"].([]any)
if len(output) != 1 {
t.Fatalf("expected one output item, got %#v", out["output"])
}
first, _ := output[0].(map[string]any)
if got := asString(first["type"]); got != "function_call" {
t.Fatalf("expected function_call output, got %#v", first["type"])
}
}
func extractSSEEventPayload(body, targetEvent string) (map[string]any, bool) {
scanner := bufio.NewScanner(strings.NewReader(body))
matched := false