mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-22 08:57:42 +08:00
feat: Implement admin settings UI, enhance admin authentication with password hashing, and add new streaming runtime logic for Claude and OpenAI adapters with extensive compatibility tests.
This commit is contained in:
11
internal/adapter/claude/convert.go
Normal file
11
internal/adapter/claude/convert.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"ds2api/internal/claudeconv"
|
||||
)
|
||||
|
||||
const defaultClaudeModel = "claude-sonnet-4-5"
|
||||
|
||||
func convertClaudeToDeepSeek(claudeReq map[string]any, store ConfigReader) map[string]any {
|
||||
return claudeconv.ConvertClaudeToDeepSeek(claudeReq, store, defaultClaudeModel)
|
||||
}
|
||||
29
internal/adapter/claude/deps.go
Normal file
29
internal/adapter/claude/deps.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/config"
|
||||
"ds2api/internal/deepseek"
|
||||
)
|
||||
|
||||
type AuthResolver interface {
|
||||
Determine(req *http.Request) (*auth.RequestAuth, error)
|
||||
Release(a *auth.RequestAuth)
|
||||
}
|
||||
|
||||
type DeepSeekCaller interface {
|
||||
CreateSession(ctx context.Context, a *auth.RequestAuth, maxAttempts int) (string, error)
|
||||
GetPow(ctx context.Context, a *auth.RequestAuth, maxAttempts int) (string, error)
|
||||
CallCompletion(ctx context.Context, a *auth.RequestAuth, payload map[string]any, powResp string, maxAttempts int) (*http.Response, error)
|
||||
}
|
||||
|
||||
type ConfigReader interface {
|
||||
ClaudeMapping() map[string]string
|
||||
}
|
||||
|
||||
var _ AuthResolver = (*auth.Resolver)(nil)
|
||||
var _ DeepSeekCaller = (*deepseek.Client)(nil)
|
||||
var _ ConfigReader = (*config.Store)(nil)
|
||||
33
internal/adapter/claude/deps_injection_test.go
Normal file
33
internal/adapter/claude/deps_injection_test.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package claude
|
||||
|
||||
import "testing"
|
||||
|
||||
type mockClaudeConfig struct {
|
||||
m map[string]string
|
||||
}
|
||||
|
||||
func (m mockClaudeConfig) ClaudeMapping() map[string]string { return m.m }
|
||||
|
||||
func TestNormalizeClaudeRequestUsesConfigInterfaceMapping(t *testing.T) {
|
||||
req := map[string]any{
|
||||
"model": "claude-opus-4-6",
|
||||
"messages": []any{
|
||||
map[string]any{"role": "user", "content": "hello"},
|
||||
},
|
||||
}
|
||||
out, err := normalizeClaudeRequest(mockClaudeConfig{
|
||||
m: map[string]string{
|
||||
"fast": "deepseek-chat",
|
||||
"slow": "deepseek-reasoner-search",
|
||||
},
|
||||
}, req)
|
||||
if err != nil {
|
||||
t.Fatalf("normalizeClaudeRequest error: %v", err)
|
||||
}
|
||||
if out.Standard.ResolvedModel != "deepseek-reasoner-search" {
|
||||
t.Fatalf("resolved model mismatch: got=%q", out.Standard.ResolvedModel)
|
||||
}
|
||||
if !out.Standard.Thinking || !out.Standard.Search {
|
||||
t.Fatalf("unexpected flags: thinking=%v search=%v", out.Standard.Thinking, out.Standard.Search)
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,9 @@ import (
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/config"
|
||||
"ds2api/internal/deepseek"
|
||||
claudefmt "ds2api/internal/format/claude"
|
||||
"ds2api/internal/sse"
|
||||
streamengine "ds2api/internal/stream"
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
|
||||
@@ -21,9 +23,9 @@ import (
|
||||
var writeJSON = util.WriteJSON
|
||||
|
||||
type Handler struct {
|
||||
Store *config.Store
|
||||
Auth *auth.Resolver
|
||||
DS *deepseek.Client
|
||||
Store ConfigReader
|
||||
Auth AuthResolver
|
||||
DS DeepSeekCaller
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -98,7 +100,7 @@ func (h *Handler) Messages(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
result := sse.CollectStream(resp, stdReq.Thinking, true)
|
||||
respBody := util.BuildClaudeMessageResponse(
|
||||
respBody := claudefmt.BuildMessageResponse(
|
||||
fmt.Sprintf("msg_%d", time.Now().UnixNano()),
|
||||
stdReq.ResponseModel,
|
||||
norm.NormalizedMessages,
|
||||
@@ -169,279 +171,38 @@ func (h *Handler) handleClaudeStreamRealtime(w http.ResponseWriter, r *http.Requ
|
||||
if !canFlush {
|
||||
config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered")
|
||||
}
|
||||
send := func(event string, v any) {
|
||||
b, _ := json.Marshal(v)
|
||||
_, _ = w.Write([]byte("event: "))
|
||||
_, _ = w.Write([]byte(event))
|
||||
_, _ = w.Write([]byte("\n"))
|
||||
_, _ = w.Write([]byte("data: "))
|
||||
_, _ = w.Write(b)
|
||||
_, _ = w.Write([]byte("\n\n"))
|
||||
if canFlush {
|
||||
_ = rc.Flush()
|
||||
}
|
||||
}
|
||||
sendError := func(message string) {
|
||||
msg := strings.TrimSpace(message)
|
||||
if msg == "" {
|
||||
msg = "upstream stream error"
|
||||
}
|
||||
send("error", map[string]any{
|
||||
"type": "error",
|
||||
"error": map[string]any{
|
||||
"type": "api_error",
|
||||
"message": msg,
|
||||
"code": "internal_error",
|
||||
"param": nil,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
messageID := fmt.Sprintf("msg_%d", time.Now().UnixNano())
|
||||
inputTokens := util.EstimateTokens(fmt.Sprintf("%v", messages))
|
||||
send("message_start", map[string]any{
|
||||
"type": "message_start",
|
||||
"message": map[string]any{
|
||||
"id": messageID,
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"model": model,
|
||||
"content": []any{},
|
||||
"stop_reason": nil,
|
||||
"stop_sequence": nil,
|
||||
"usage": map[string]any{"input_tokens": inputTokens, "output_tokens": 0},
|
||||
},
|
||||
})
|
||||
streamRuntime := newClaudeStreamRuntime(
|
||||
w,
|
||||
rc,
|
||||
canFlush,
|
||||
model,
|
||||
messages,
|
||||
thinkingEnabled,
|
||||
searchEnabled,
|
||||
toolNames,
|
||||
)
|
||||
streamRuntime.sendMessageStart()
|
||||
|
||||
initialType := "text"
|
||||
if thinkingEnabled {
|
||||
initialType = "thinking"
|
||||
}
|
||||
parsedLines, done := sse.StartParsedLinePump(r.Context(), resp.Body, thinkingEnabled, initialType)
|
||||
bufferToolContent := len(toolNames) > 0
|
||||
hasContent := false
|
||||
lastContent := time.Now()
|
||||
keepaliveCount := 0
|
||||
|
||||
thinking := strings.Builder{}
|
||||
text := strings.Builder{}
|
||||
|
||||
nextBlockIndex := 0
|
||||
thinkingBlockOpen := false
|
||||
thinkingBlockIndex := -1
|
||||
textBlockOpen := false
|
||||
textBlockIndex := -1
|
||||
ended := false
|
||||
|
||||
closeThinkingBlock := func() {
|
||||
if !thinkingBlockOpen {
|
||||
return
|
||||
}
|
||||
send("content_block_stop", map[string]any{
|
||||
"type": "content_block_stop",
|
||||
"index": thinkingBlockIndex,
|
||||
})
|
||||
thinkingBlockOpen = false
|
||||
thinkingBlockIndex = -1
|
||||
}
|
||||
closeTextBlock := func() {
|
||||
if !textBlockOpen {
|
||||
return
|
||||
}
|
||||
send("content_block_stop", map[string]any{
|
||||
"type": "content_block_stop",
|
||||
"index": textBlockIndex,
|
||||
})
|
||||
textBlockOpen = false
|
||||
textBlockIndex = -1
|
||||
}
|
||||
|
||||
finalize := func(stopReason string) {
|
||||
if ended {
|
||||
return
|
||||
}
|
||||
ended = true
|
||||
|
||||
closeThinkingBlock()
|
||||
closeTextBlock()
|
||||
|
||||
finalThinking := thinking.String()
|
||||
finalText := text.String()
|
||||
|
||||
if bufferToolContent {
|
||||
detected := util.ParseToolCalls(finalText, toolNames)
|
||||
if len(detected) > 0 {
|
||||
stopReason = "tool_use"
|
||||
for i, tc := range detected {
|
||||
idx := nextBlockIndex + i
|
||||
send("content_block_start", map[string]any{
|
||||
"type": "content_block_start",
|
||||
"index": idx,
|
||||
"content_block": map[string]any{
|
||||
"type": "tool_use",
|
||||
"id": fmt.Sprintf("toolu_%d_%d", time.Now().Unix(), idx),
|
||||
"name": tc.Name,
|
||||
"input": tc.Input,
|
||||
},
|
||||
})
|
||||
send("content_block_stop", map[string]any{
|
||||
"type": "content_block_stop",
|
||||
"index": idx,
|
||||
})
|
||||
}
|
||||
nextBlockIndex += len(detected)
|
||||
} else if finalText != "" {
|
||||
idx := nextBlockIndex
|
||||
nextBlockIndex++
|
||||
send("content_block_start", map[string]any{
|
||||
"type": "content_block_start",
|
||||
"index": idx,
|
||||
"content_block": map[string]any{
|
||||
"type": "text",
|
||||
"text": "",
|
||||
},
|
||||
})
|
||||
send("content_block_delta", map[string]any{
|
||||
"type": "content_block_delta",
|
||||
"index": idx,
|
||||
"delta": map[string]any{
|
||||
"type": "text_delta",
|
||||
"text": finalText,
|
||||
},
|
||||
})
|
||||
send("content_block_stop", map[string]any{
|
||||
"type": "content_block_stop",
|
||||
"index": idx,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
outputTokens := util.EstimateTokens(finalThinking) + util.EstimateTokens(finalText)
|
||||
send("message_delta", map[string]any{
|
||||
"type": "message_delta",
|
||||
"delta": map[string]any{
|
||||
"stop_reason": stopReason,
|
||||
"stop_sequence": nil,
|
||||
},
|
||||
"usage": map[string]any{
|
||||
"output_tokens": outputTokens,
|
||||
},
|
||||
})
|
||||
send("message_stop", map[string]any{"type": "message_stop"})
|
||||
}
|
||||
|
||||
pingTicker := time.NewTicker(claudeStreamPingInterval)
|
||||
defer pingTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case <-pingTicker.C:
|
||||
if !hasContent {
|
||||
keepaliveCount++
|
||||
if keepaliveCount >= claudeStreamMaxKeepaliveCnt {
|
||||
finalize("end_turn")
|
||||
return
|
||||
}
|
||||
}
|
||||
if hasContent && time.Since(lastContent) > claudeStreamIdleTimeout {
|
||||
finalize("end_turn")
|
||||
return
|
||||
}
|
||||
send("ping", map[string]any{"type": "ping"})
|
||||
case parsed, ok := <-parsedLines:
|
||||
if !ok {
|
||||
if err := <-done; err != nil {
|
||||
sendError(err.Error())
|
||||
return
|
||||
}
|
||||
finalize("end_turn")
|
||||
return
|
||||
}
|
||||
if !parsed.Parsed {
|
||||
continue
|
||||
}
|
||||
if parsed.ErrorMessage != "" {
|
||||
sendError(parsed.ErrorMessage)
|
||||
return
|
||||
}
|
||||
if parsed.Stop {
|
||||
finalize("end_turn")
|
||||
return
|
||||
}
|
||||
|
||||
for _, p := range parsed.Parts {
|
||||
if p.Text == "" {
|
||||
continue
|
||||
}
|
||||
if p.Type != "thinking" && searchEnabled && sse.IsCitation(p.Text) {
|
||||
continue
|
||||
}
|
||||
|
||||
hasContent = true
|
||||
lastContent = time.Now()
|
||||
keepaliveCount = 0
|
||||
|
||||
if p.Type == "thinking" {
|
||||
if !thinkingEnabled {
|
||||
continue
|
||||
}
|
||||
thinking.WriteString(p.Text)
|
||||
closeTextBlock()
|
||||
if !thinkingBlockOpen {
|
||||
thinkingBlockIndex = nextBlockIndex
|
||||
nextBlockIndex++
|
||||
send("content_block_start", map[string]any{
|
||||
"type": "content_block_start",
|
||||
"index": thinkingBlockIndex,
|
||||
"content_block": map[string]any{
|
||||
"type": "thinking",
|
||||
"thinking": "",
|
||||
},
|
||||
})
|
||||
thinkingBlockOpen = true
|
||||
}
|
||||
send("content_block_delta", map[string]any{
|
||||
"type": "content_block_delta",
|
||||
"index": thinkingBlockIndex,
|
||||
"delta": map[string]any{
|
||||
"type": "thinking_delta",
|
||||
"thinking": p.Text,
|
||||
},
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
text.WriteString(p.Text)
|
||||
if bufferToolContent {
|
||||
continue
|
||||
}
|
||||
closeThinkingBlock()
|
||||
if !textBlockOpen {
|
||||
textBlockIndex = nextBlockIndex
|
||||
nextBlockIndex++
|
||||
send("content_block_start", map[string]any{
|
||||
"type": "content_block_start",
|
||||
"index": textBlockIndex,
|
||||
"content_block": map[string]any{
|
||||
"type": "text",
|
||||
"text": "",
|
||||
},
|
||||
})
|
||||
textBlockOpen = true
|
||||
}
|
||||
send("content_block_delta", map[string]any{
|
||||
"type": "content_block_delta",
|
||||
"index": textBlockIndex,
|
||||
"delta": map[string]any{
|
||||
"type": "text_delta",
|
||||
"text": p.Text,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
streamengine.ConsumeSSE(streamengine.ConsumeConfig{
|
||||
Context: r.Context(),
|
||||
Body: resp.Body,
|
||||
ThinkingEnabled: thinkingEnabled,
|
||||
InitialType: initialType,
|
||||
KeepAliveInterval: claudeStreamPingInterval,
|
||||
IdleTimeout: claudeStreamIdleTimeout,
|
||||
MaxKeepAliveNoInput: claudeStreamMaxKeepaliveCnt,
|
||||
}, streamengine.ConsumeHooks{
|
||||
OnKeepAlive: func() {
|
||||
streamRuntime.sendPing()
|
||||
},
|
||||
OnParsed: streamRuntime.onParsed,
|
||||
OnFinalize: streamRuntime.onFinalize,
|
||||
})
|
||||
}
|
||||
|
||||
func writeClaudeError(w http.ResponseWriter, status int, message string) {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"ds2api/internal/config"
|
||||
"ds2api/internal/deepseek"
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
|
||||
@@ -13,7 +14,7 @@ type claudeNormalizedRequest struct {
|
||||
NormalizedMessages []any
|
||||
}
|
||||
|
||||
func normalizeClaudeRequest(store *config.Store, req map[string]any) (claudeNormalizedRequest, error) {
|
||||
func normalizeClaudeRequest(store ConfigReader, req map[string]any) (claudeNormalizedRequest, error) {
|
||||
model, _ := req["model"].(string)
|
||||
messagesRaw, _ := req["messages"].([]any)
|
||||
if strings.TrimSpace(model) == "" || len(messagesRaw) == 0 {
|
||||
@@ -30,14 +31,14 @@ func normalizeClaudeRequest(store *config.Store, req map[string]any) (claudeNorm
|
||||
payload["messages"] = append([]any{map[string]any{"role": "system", "content": buildClaudeToolPrompt(toolsRequested)}}, normalizedMessages...)
|
||||
}
|
||||
|
||||
dsPayload := util.ConvertClaudeToDeepSeek(payload, store)
|
||||
dsPayload := convertClaudeToDeepSeek(payload, store)
|
||||
dsModel, _ := dsPayload["model"].(string)
|
||||
thinkingEnabled, searchEnabled, ok := config.GetModelConfig(dsModel)
|
||||
if !ok {
|
||||
thinkingEnabled = false
|
||||
searchEnabled = false
|
||||
}
|
||||
finalPrompt := util.MessagesPrepare(toMessageMaps(dsPayload["messages"]))
|
||||
finalPrompt := deepseek.MessagesPrepare(toMessageMaps(dsPayload["messages"]))
|
||||
toolNames := extractClaudeToolNames(toolsRequested)
|
||||
|
||||
return claudeNormalizedRequest{
|
||||
|
||||
308
internal/adapter/claude/stream_runtime.go
Normal file
308
internal/adapter/claude/stream_runtime.go
Normal file
@@ -0,0 +1,308 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"ds2api/internal/sse"
|
||||
streamengine "ds2api/internal/stream"
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
|
||||
type claudeStreamRuntime struct {
|
||||
w http.ResponseWriter
|
||||
rc *http.ResponseController
|
||||
canFlush bool
|
||||
|
||||
model string
|
||||
toolNames []string
|
||||
messages []any
|
||||
|
||||
thinkingEnabled bool
|
||||
searchEnabled bool
|
||||
bufferToolContent bool
|
||||
|
||||
messageID string
|
||||
thinking strings.Builder
|
||||
text strings.Builder
|
||||
|
||||
nextBlockIndex int
|
||||
thinkingBlockOpen bool
|
||||
thinkingBlockIndex int
|
||||
textBlockOpen bool
|
||||
textBlockIndex int
|
||||
ended bool
|
||||
upstreamErr string
|
||||
}
|
||||
|
||||
func newClaudeStreamRuntime(
|
||||
w http.ResponseWriter,
|
||||
rc *http.ResponseController,
|
||||
canFlush bool,
|
||||
model string,
|
||||
messages []any,
|
||||
thinkingEnabled bool,
|
||||
searchEnabled bool,
|
||||
toolNames []string,
|
||||
) *claudeStreamRuntime {
|
||||
return &claudeStreamRuntime{
|
||||
w: w,
|
||||
rc: rc,
|
||||
canFlush: canFlush,
|
||||
model: model,
|
||||
messages: messages,
|
||||
thinkingEnabled: thinkingEnabled,
|
||||
searchEnabled: searchEnabled,
|
||||
bufferToolContent: len(toolNames) > 0,
|
||||
toolNames: toolNames,
|
||||
messageID: fmt.Sprintf("msg_%d", time.Now().UnixNano()),
|
||||
thinkingBlockIndex: -1,
|
||||
textBlockIndex: -1,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *claudeStreamRuntime) send(event string, v any) {
|
||||
b, _ := json.Marshal(v)
|
||||
_, _ = s.w.Write([]byte("event: "))
|
||||
_, _ = s.w.Write([]byte(event))
|
||||
_, _ = s.w.Write([]byte("\n"))
|
||||
_, _ = s.w.Write([]byte("data: "))
|
||||
_, _ = s.w.Write(b)
|
||||
_, _ = s.w.Write([]byte("\n\n"))
|
||||
if s.canFlush {
|
||||
_ = s.rc.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *claudeStreamRuntime) sendError(message string) {
|
||||
msg := strings.TrimSpace(message)
|
||||
if msg == "" {
|
||||
msg = "upstream stream error"
|
||||
}
|
||||
s.send("error", map[string]any{
|
||||
"type": "error",
|
||||
"error": map[string]any{
|
||||
"type": "api_error",
|
||||
"message": msg,
|
||||
"code": "internal_error",
|
||||
"param": nil,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (s *claudeStreamRuntime) sendPing() {
|
||||
s.send("ping", map[string]any{"type": "ping"})
|
||||
}
|
||||
|
||||
func (s *claudeStreamRuntime) sendMessageStart() {
|
||||
inputTokens := util.EstimateTokens(fmt.Sprintf("%v", s.messages))
|
||||
s.send("message_start", map[string]any{
|
||||
"type": "message_start",
|
||||
"message": map[string]any{
|
||||
"id": s.messageID,
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"model": s.model,
|
||||
"content": []any{},
|
||||
"stop_reason": nil,
|
||||
"stop_sequence": nil,
|
||||
"usage": map[string]any{"input_tokens": inputTokens, "output_tokens": 0},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (s *claudeStreamRuntime) closeThinkingBlock() {
|
||||
if !s.thinkingBlockOpen {
|
||||
return
|
||||
}
|
||||
s.send("content_block_stop", map[string]any{
|
||||
"type": "content_block_stop",
|
||||
"index": s.thinkingBlockIndex,
|
||||
})
|
||||
s.thinkingBlockOpen = false
|
||||
s.thinkingBlockIndex = -1
|
||||
}
|
||||
|
||||
func (s *claudeStreamRuntime) closeTextBlock() {
|
||||
if !s.textBlockOpen {
|
||||
return
|
||||
}
|
||||
s.send("content_block_stop", map[string]any{
|
||||
"type": "content_block_stop",
|
||||
"index": s.textBlockIndex,
|
||||
})
|
||||
s.textBlockOpen = false
|
||||
s.textBlockIndex = -1
|
||||
}
|
||||
|
||||
func (s *claudeStreamRuntime) finalize(stopReason string) {
|
||||
if s.ended {
|
||||
return
|
||||
}
|
||||
s.ended = true
|
||||
|
||||
s.closeThinkingBlock()
|
||||
s.closeTextBlock()
|
||||
|
||||
finalThinking := s.thinking.String()
|
||||
finalText := s.text.String()
|
||||
|
||||
if s.bufferToolContent {
|
||||
detected := util.ParseToolCalls(finalText, s.toolNames)
|
||||
if len(detected) > 0 {
|
||||
stopReason = "tool_use"
|
||||
for i, tc := range detected {
|
||||
idx := s.nextBlockIndex + i
|
||||
s.send("content_block_start", map[string]any{
|
||||
"type": "content_block_start",
|
||||
"index": idx,
|
||||
"content_block": map[string]any{
|
||||
"type": "tool_use",
|
||||
"id": fmt.Sprintf("toolu_%d_%d", time.Now().Unix(), idx),
|
||||
"name": tc.Name,
|
||||
"input": tc.Input,
|
||||
},
|
||||
})
|
||||
s.send("content_block_stop", map[string]any{
|
||||
"type": "content_block_stop",
|
||||
"index": idx,
|
||||
})
|
||||
}
|
||||
s.nextBlockIndex += len(detected)
|
||||
} else if finalText != "" {
|
||||
idx := s.nextBlockIndex
|
||||
s.nextBlockIndex++
|
||||
s.send("content_block_start", map[string]any{
|
||||
"type": "content_block_start",
|
||||
"index": idx,
|
||||
"content_block": map[string]any{
|
||||
"type": "text",
|
||||
"text": "",
|
||||
},
|
||||
})
|
||||
s.send("content_block_delta", map[string]any{
|
||||
"type": "content_block_delta",
|
||||
"index": idx,
|
||||
"delta": map[string]any{
|
||||
"type": "text_delta",
|
||||
"text": finalText,
|
||||
},
|
||||
})
|
||||
s.send("content_block_stop", map[string]any{
|
||||
"type": "content_block_stop",
|
||||
"index": idx,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
outputTokens := util.EstimateTokens(finalThinking) + util.EstimateTokens(finalText)
|
||||
s.send("message_delta", map[string]any{
|
||||
"type": "message_delta",
|
||||
"delta": map[string]any{
|
||||
"stop_reason": stopReason,
|
||||
"stop_sequence": nil,
|
||||
},
|
||||
"usage": map[string]any{
|
||||
"output_tokens": outputTokens,
|
||||
},
|
||||
})
|
||||
s.send("message_stop", map[string]any{"type": "message_stop"})
|
||||
}
|
||||
|
||||
func (s *claudeStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedDecision {
|
||||
if !parsed.Parsed {
|
||||
return streamengine.ParsedDecision{}
|
||||
}
|
||||
if parsed.ErrorMessage != "" {
|
||||
s.upstreamErr = parsed.ErrorMessage
|
||||
return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("upstream_error")}
|
||||
}
|
||||
if parsed.Stop {
|
||||
return streamengine.ParsedDecision{Stop: true}
|
||||
}
|
||||
|
||||
contentSeen := false
|
||||
for _, p := range parsed.Parts {
|
||||
if p.Text == "" {
|
||||
continue
|
||||
}
|
||||
if p.Type != "thinking" && s.searchEnabled && sse.IsCitation(p.Text) {
|
||||
continue
|
||||
}
|
||||
contentSeen = true
|
||||
|
||||
if p.Type == "thinking" {
|
||||
if !s.thinkingEnabled {
|
||||
continue
|
||||
}
|
||||
s.thinking.WriteString(p.Text)
|
||||
s.closeTextBlock()
|
||||
if !s.thinkingBlockOpen {
|
||||
s.thinkingBlockIndex = s.nextBlockIndex
|
||||
s.nextBlockIndex++
|
||||
s.send("content_block_start", map[string]any{
|
||||
"type": "content_block_start",
|
||||
"index": s.thinkingBlockIndex,
|
||||
"content_block": map[string]any{
|
||||
"type": "thinking",
|
||||
"thinking": "",
|
||||
},
|
||||
})
|
||||
s.thinkingBlockOpen = true
|
||||
}
|
||||
s.send("content_block_delta", map[string]any{
|
||||
"type": "content_block_delta",
|
||||
"index": s.thinkingBlockIndex,
|
||||
"delta": map[string]any{
|
||||
"type": "thinking_delta",
|
||||
"thinking": p.Text,
|
||||
},
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
s.text.WriteString(p.Text)
|
||||
if s.bufferToolContent {
|
||||
continue
|
||||
}
|
||||
s.closeThinkingBlock()
|
||||
if !s.textBlockOpen {
|
||||
s.textBlockIndex = s.nextBlockIndex
|
||||
s.nextBlockIndex++
|
||||
s.send("content_block_start", map[string]any{
|
||||
"type": "content_block_start",
|
||||
"index": s.textBlockIndex,
|
||||
"content_block": map[string]any{
|
||||
"type": "text",
|
||||
"text": "",
|
||||
},
|
||||
})
|
||||
s.textBlockOpen = true
|
||||
}
|
||||
s.send("content_block_delta", map[string]any{
|
||||
"type": "content_block_delta",
|
||||
"index": s.textBlockIndex,
|
||||
"delta": map[string]any{
|
||||
"type": "text_delta",
|
||||
"text": p.Text,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return streamengine.ParsedDecision{ContentSeen: contentSeen}
|
||||
}
|
||||
|
||||
func (s *claudeStreamRuntime) onFinalize(reason streamengine.StopReason, scannerErr error) {
|
||||
if string(reason) == "upstream_error" {
|
||||
s.sendError(s.upstreamErr)
|
||||
return
|
||||
}
|
||||
if scannerErr != nil {
|
||||
s.sendError(scannerErr.Error())
|
||||
return
|
||||
}
|
||||
s.finalize("end_turn")
|
||||
}
|
||||
237
internal/adapter/openai/chat_stream_runtime.go
Normal file
237
internal/adapter/openai/chat_stream_runtime.go
Normal file
@@ -0,0 +1,237 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
openaifmt "ds2api/internal/format/openai"
|
||||
"ds2api/internal/sse"
|
||||
streamengine "ds2api/internal/stream"
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
|
||||
type chatStreamRuntime struct {
|
||||
w http.ResponseWriter
|
||||
rc *http.ResponseController
|
||||
canFlush bool
|
||||
|
||||
completionID string
|
||||
created int64
|
||||
model string
|
||||
finalPrompt string
|
||||
toolNames []string
|
||||
|
||||
thinkingEnabled bool
|
||||
searchEnabled bool
|
||||
|
||||
firstChunkSent bool
|
||||
bufferToolContent bool
|
||||
emitEarlyToolDeltas bool
|
||||
toolCallsEmitted bool
|
||||
|
||||
toolSieve toolStreamSieveState
|
||||
streamToolCallIDs map[int]string
|
||||
thinking strings.Builder
|
||||
text strings.Builder
|
||||
}
|
||||
|
||||
func newChatStreamRuntime(
|
||||
w http.ResponseWriter,
|
||||
rc *http.ResponseController,
|
||||
canFlush bool,
|
||||
completionID string,
|
||||
created int64,
|
||||
model string,
|
||||
finalPrompt string,
|
||||
thinkingEnabled bool,
|
||||
searchEnabled bool,
|
||||
toolNames []string,
|
||||
bufferToolContent bool,
|
||||
emitEarlyToolDeltas bool,
|
||||
) *chatStreamRuntime {
|
||||
return &chatStreamRuntime{
|
||||
w: w,
|
||||
rc: rc,
|
||||
canFlush: canFlush,
|
||||
completionID: completionID,
|
||||
created: created,
|
||||
model: model,
|
||||
finalPrompt: finalPrompt,
|
||||
toolNames: toolNames,
|
||||
thinkingEnabled: thinkingEnabled,
|
||||
searchEnabled: searchEnabled,
|
||||
bufferToolContent: bufferToolContent,
|
||||
emitEarlyToolDeltas: emitEarlyToolDeltas,
|
||||
streamToolCallIDs: map[int]string{},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *chatStreamRuntime) sendKeepAlive() {
|
||||
if !s.canFlush {
|
||||
return
|
||||
}
|
||||
_, _ = s.w.Write([]byte(": keep-alive\n\n"))
|
||||
_ = s.rc.Flush()
|
||||
}
|
||||
|
||||
func (s *chatStreamRuntime) sendChunk(v any) {
|
||||
b, _ := json.Marshal(v)
|
||||
_, _ = s.w.Write([]byte("data: "))
|
||||
_, _ = s.w.Write(b)
|
||||
_, _ = s.w.Write([]byte("\n\n"))
|
||||
if s.canFlush {
|
||||
_ = s.rc.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *chatStreamRuntime) sendDone() {
|
||||
_, _ = s.w.Write([]byte("data: [DONE]\n\n"))
|
||||
if s.canFlush {
|
||||
_ = s.rc.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *chatStreamRuntime) finalize(finishReason string) {
|
||||
finalThinking := s.thinking.String()
|
||||
finalText := s.text.String()
|
||||
detected := util.ParseToolCalls(finalText, s.toolNames)
|
||||
if len(detected) > 0 && !s.toolCallsEmitted {
|
||||
finishReason = "tool_calls"
|
||||
delta := map[string]any{
|
||||
"tool_calls": util.FormatOpenAIStreamToolCalls(detected),
|
||||
}
|
||||
if !s.firstChunkSent {
|
||||
delta["role"] = "assistant"
|
||||
s.firstChunkSent = true
|
||||
}
|
||||
s.sendChunk(openaifmt.BuildChatStreamChunk(
|
||||
s.completionID,
|
||||
s.created,
|
||||
s.model,
|
||||
[]map[string]any{openaifmt.BuildChatStreamDeltaChoice(0, delta)},
|
||||
nil,
|
||||
))
|
||||
} else if s.bufferToolContent {
|
||||
for _, evt := range flushToolSieve(&s.toolSieve, s.toolNames) {
|
||||
if evt.Content == "" {
|
||||
continue
|
||||
}
|
||||
delta := map[string]any{
|
||||
"content": evt.Content,
|
||||
}
|
||||
if !s.firstChunkSent {
|
||||
delta["role"] = "assistant"
|
||||
s.firstChunkSent = true
|
||||
}
|
||||
s.sendChunk(openaifmt.BuildChatStreamChunk(
|
||||
s.completionID,
|
||||
s.created,
|
||||
s.model,
|
||||
[]map[string]any{openaifmt.BuildChatStreamDeltaChoice(0, delta)},
|
||||
nil,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
if len(detected) > 0 || s.toolCallsEmitted {
|
||||
finishReason = "tool_calls"
|
||||
}
|
||||
s.sendChunk(openaifmt.BuildChatStreamChunk(
|
||||
s.completionID,
|
||||
s.created,
|
||||
s.model,
|
||||
[]map[string]any{openaifmt.BuildChatStreamFinishChoice(0, finishReason)},
|
||||
openaifmt.BuildChatUsage(s.finalPrompt, finalThinking, finalText),
|
||||
))
|
||||
s.sendDone()
|
||||
}
|
||||
|
||||
func (s *chatStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedDecision {
|
||||
if !parsed.Parsed {
|
||||
return streamengine.ParsedDecision{}
|
||||
}
|
||||
if parsed.ContentFilter || parsed.ErrorMessage != "" {
|
||||
return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReason("content_filter")}
|
||||
}
|
||||
if parsed.Stop {
|
||||
return streamengine.ParsedDecision{Stop: true, StopReason: streamengine.StopReasonHandlerRequested}
|
||||
}
|
||||
|
||||
newChoices := make([]map[string]any, 0, len(parsed.Parts))
|
||||
contentSeen := false
|
||||
for _, p := range parsed.Parts {
|
||||
if s.searchEnabled && sse.IsCitation(p.Text) {
|
||||
continue
|
||||
}
|
||||
if p.Text == "" {
|
||||
continue
|
||||
}
|
||||
contentSeen = true
|
||||
delta := map[string]any{}
|
||||
if !s.firstChunkSent {
|
||||
delta["role"] = "assistant"
|
||||
s.firstChunkSent = true
|
||||
}
|
||||
if p.Type == "thinking" {
|
||||
if s.thinkingEnabled {
|
||||
s.thinking.WriteString(p.Text)
|
||||
delta["reasoning_content"] = p.Text
|
||||
}
|
||||
} else {
|
||||
s.text.WriteString(p.Text)
|
||||
if !s.bufferToolContent {
|
||||
delta["content"] = p.Text
|
||||
} else {
|
||||
events := processToolSieveChunk(&s.toolSieve, p.Text, s.toolNames)
|
||||
for _, evt := range events {
|
||||
if len(evt.ToolCallDeltas) > 0 {
|
||||
if !s.emitEarlyToolDeltas {
|
||||
continue
|
||||
}
|
||||
s.toolCallsEmitted = true
|
||||
tcDelta := map[string]any{
|
||||
"tool_calls": formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, s.streamToolCallIDs),
|
||||
}
|
||||
if !s.firstChunkSent {
|
||||
tcDelta["role"] = "assistant"
|
||||
s.firstChunkSent = true
|
||||
}
|
||||
newChoices = append(newChoices, openaifmt.BuildChatStreamDeltaChoice(0, tcDelta))
|
||||
continue
|
||||
}
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
s.toolCallsEmitted = true
|
||||
tcDelta := map[string]any{
|
||||
"tool_calls": util.FormatOpenAIStreamToolCalls(evt.ToolCalls),
|
||||
}
|
||||
if !s.firstChunkSent {
|
||||
tcDelta["role"] = "assistant"
|
||||
s.firstChunkSent = true
|
||||
}
|
||||
newChoices = append(newChoices, openaifmt.BuildChatStreamDeltaChoice(0, tcDelta))
|
||||
continue
|
||||
}
|
||||
if evt.Content != "" {
|
||||
contentDelta := map[string]any{
|
||||
"content": evt.Content,
|
||||
}
|
||||
if !s.firstChunkSent {
|
||||
contentDelta["role"] = "assistant"
|
||||
s.firstChunkSent = true
|
||||
}
|
||||
newChoices = append(newChoices, openaifmt.BuildChatStreamDeltaChoice(0, contentDelta))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(delta) > 0 {
|
||||
newChoices = append(newChoices, openaifmt.BuildChatStreamDeltaChoice(0, delta))
|
||||
}
|
||||
}
|
||||
|
||||
if len(newChoices) > 0 {
|
||||
s.sendChunk(openaifmt.BuildChatStreamChunk(s.completionID, s.created, s.model, newChoices, nil))
|
||||
}
|
||||
return streamengine.ParsedDecision{ContentSeen: contentSeen}
|
||||
}
|
||||
35
internal/adapter/openai/deps.go
Normal file
35
internal/adapter/openai/deps.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/config"
|
||||
"ds2api/internal/deepseek"
|
||||
)
|
||||
|
||||
type AuthResolver interface {
|
||||
Determine(req *http.Request) (*auth.RequestAuth, error)
|
||||
DetermineCaller(req *http.Request) (*auth.RequestAuth, error)
|
||||
Release(a *auth.RequestAuth)
|
||||
}
|
||||
|
||||
type DeepSeekCaller interface {
|
||||
CreateSession(ctx context.Context, a *auth.RequestAuth, maxAttempts int) (string, error)
|
||||
GetPow(ctx context.Context, a *auth.RequestAuth, maxAttempts int) (string, error)
|
||||
CallCompletion(ctx context.Context, a *auth.RequestAuth, payload map[string]any, powResp string, maxAttempts int) (*http.Response, error)
|
||||
}
|
||||
|
||||
type ConfigReader interface {
|
||||
ModelAliases() map[string]string
|
||||
CompatWideInputStrictOutput() bool
|
||||
ToolcallMode() string
|
||||
ToolcallEarlyEmitConfidence() string
|
||||
ResponsesStoreTTLSeconds() int
|
||||
EmbeddingsProvider() string
|
||||
}
|
||||
|
||||
var _ AuthResolver = (*auth.Resolver)(nil)
|
||||
var _ DeepSeekCaller = (*deepseek.Client)(nil)
|
||||
var _ ConfigReader = (*config.Store)(nil)
|
||||
70
internal/adapter/openai/deps_injection_test.go
Normal file
70
internal/adapter/openai/deps_injection_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package openai
|
||||
|
||||
import "testing"
|
||||
|
||||
type mockOpenAIConfig struct {
|
||||
aliases map[string]string
|
||||
wideInput bool
|
||||
toolMode string
|
||||
earlyEmit string
|
||||
responsesTTL int
|
||||
embedProv string
|
||||
}
|
||||
|
||||
func (m mockOpenAIConfig) ModelAliases() map[string]string { return m.aliases }
|
||||
func (m mockOpenAIConfig) CompatWideInputStrictOutput() bool {
|
||||
return m.wideInput
|
||||
}
|
||||
func (m mockOpenAIConfig) ToolcallMode() string { return m.toolMode }
|
||||
func (m mockOpenAIConfig) ToolcallEarlyEmitConfidence() string { return m.earlyEmit }
|
||||
func (m mockOpenAIConfig) ResponsesStoreTTLSeconds() int { return m.responsesTTL }
|
||||
func (m mockOpenAIConfig) EmbeddingsProvider() string { return m.embedProv }
|
||||
|
||||
func TestNormalizeOpenAIChatRequestWithConfigInterface(t *testing.T) {
|
||||
cfg := mockOpenAIConfig{
|
||||
aliases: map[string]string{
|
||||
"my-model": "deepseek-chat-search",
|
||||
},
|
||||
wideInput: true,
|
||||
}
|
||||
req := map[string]any{
|
||||
"model": "my-model",
|
||||
"messages": []any{map[string]any{"role": "user", "content": "hello"}},
|
||||
}
|
||||
out, err := normalizeOpenAIChatRequest(cfg, req)
|
||||
if err != nil {
|
||||
t.Fatalf("normalizeOpenAIChatRequest error: %v", err)
|
||||
}
|
||||
if out.ResolvedModel != "deepseek-chat-search" {
|
||||
t.Fatalf("resolved model mismatch: got=%q", out.ResolvedModel)
|
||||
}
|
||||
if !out.Search || out.Thinking {
|
||||
t.Fatalf("unexpected model flags: thinking=%v search=%v", out.Thinking, out.Search)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeOpenAIResponsesRequestWideInputPolicyFromInterface(t *testing.T) {
|
||||
req := map[string]any{
|
||||
"model": "deepseek-chat",
|
||||
"input": "hi",
|
||||
}
|
||||
|
||||
_, err := normalizeOpenAIResponsesRequest(mockOpenAIConfig{
|
||||
aliases: map[string]string{},
|
||||
wideInput: false,
|
||||
}, req)
|
||||
if err == nil {
|
||||
t.Fatal("expected error when wide input is disabled and only input is provided")
|
||||
}
|
||||
|
||||
out, err := normalizeOpenAIResponsesRequest(mockOpenAIConfig{
|
||||
aliases: map[string]string{},
|
||||
wideInput: true,
|
||||
}, req)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error when wide input is enabled: %v", err)
|
||||
}
|
||||
if out.Surface != "openai_responses" {
|
||||
t.Fatalf("unexpected surface: %q", out.Surface)
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,9 @@ import (
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/config"
|
||||
"ds2api/internal/deepseek"
|
||||
openaifmt "ds2api/internal/format/openai"
|
||||
"ds2api/internal/sse"
|
||||
streamengine "ds2api/internal/stream"
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
|
||||
@@ -25,9 +27,9 @@ import (
|
||||
var writeJSON = util.WriteJSON
|
||||
|
||||
type Handler struct {
|
||||
Store *config.Store
|
||||
Auth *auth.Resolver
|
||||
DS *deepseek.Client
|
||||
Store ConfigReader
|
||||
Auth AuthResolver
|
||||
DS DeepSeekCaller
|
||||
|
||||
leaseMu sync.Mutex
|
||||
streamLeases map[string]streamLease
|
||||
@@ -136,7 +138,7 @@ func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, re
|
||||
|
||||
finalThinking := result.Thinking
|
||||
finalText := result.Text
|
||||
respBody := util.BuildOpenAIChatCompletion(completionID, model, finalPrompt, finalThinking, finalText, toolNames)
|
||||
respBody := openaifmt.BuildChatCompletion(completionID, model, finalPrompt, finalThinking, finalText, toolNames)
|
||||
writeJSON(w, http.StatusOK, respBody)
|
||||
}
|
||||
|
||||
@@ -158,214 +160,49 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
|
||||
}
|
||||
|
||||
created := time.Now().Unix()
|
||||
firstChunkSent := false
|
||||
bufferToolContent := len(toolNames) > 0 && h.toolcallFeatureMatchEnabled()
|
||||
emitEarlyToolDeltas := h.toolcallEarlyEmitHighConfidence()
|
||||
var toolSieve toolStreamSieveState
|
||||
toolCallsEmitted := false
|
||||
streamToolCallIDs := map[int]string{}
|
||||
initialType := "text"
|
||||
if thinkingEnabled {
|
||||
initialType = "thinking"
|
||||
}
|
||||
parsedLines, done := sse.StartParsedLinePump(r.Context(), resp.Body, thinkingEnabled, initialType)
|
||||
thinking := strings.Builder{}
|
||||
text := strings.Builder{}
|
||||
lastContent := time.Now()
|
||||
hasContent := false
|
||||
keepaliveTicker := time.NewTicker(time.Duration(deepseek.KeepAliveTimeout) * time.Second)
|
||||
defer keepaliveTicker.Stop()
|
||||
keepaliveCountWithoutContent := 0
|
||||
|
||||
sendChunk := func(v any) {
|
||||
b, _ := json.Marshal(v)
|
||||
_, _ = w.Write([]byte("data: "))
|
||||
_, _ = w.Write(b)
|
||||
_, _ = w.Write([]byte("\n\n"))
|
||||
if canFlush {
|
||||
_ = rc.Flush()
|
||||
}
|
||||
}
|
||||
sendDone := func() {
|
||||
_, _ = w.Write([]byte("data: [DONE]\n\n"))
|
||||
if canFlush {
|
||||
_ = rc.Flush()
|
||||
}
|
||||
}
|
||||
streamRuntime := newChatStreamRuntime(
|
||||
w,
|
||||
rc,
|
||||
canFlush,
|
||||
completionID,
|
||||
created,
|
||||
model,
|
||||
finalPrompt,
|
||||
thinkingEnabled,
|
||||
searchEnabled,
|
||||
toolNames,
|
||||
bufferToolContent,
|
||||
emitEarlyToolDeltas,
|
||||
)
|
||||
|
||||
finalize := func(finishReason string) {
|
||||
finalThinking := thinking.String()
|
||||
finalText := text.String()
|
||||
detected := util.ParseToolCalls(finalText, toolNames)
|
||||
if len(detected) > 0 && !toolCallsEmitted {
|
||||
finishReason = "tool_calls"
|
||||
delta := map[string]any{
|
||||
"tool_calls": util.FormatOpenAIStreamToolCalls(detected),
|
||||
}
|
||||
if !firstChunkSent {
|
||||
delta["role"] = "assistant"
|
||||
firstChunkSent = true
|
||||
}
|
||||
sendChunk(util.BuildOpenAIChatStreamChunk(
|
||||
completionID,
|
||||
created,
|
||||
model,
|
||||
[]map[string]any{util.BuildOpenAIChatStreamDeltaChoice(0, delta)},
|
||||
nil,
|
||||
))
|
||||
} else if bufferToolContent {
|
||||
for _, evt := range flushToolSieve(&toolSieve, toolNames) {
|
||||
if evt.Content == "" {
|
||||
continue
|
||||
}
|
||||
delta := map[string]any{
|
||||
"content": evt.Content,
|
||||
}
|
||||
if !firstChunkSent {
|
||||
delta["role"] = "assistant"
|
||||
firstChunkSent = true
|
||||
}
|
||||
sendChunk(util.BuildOpenAIChatStreamChunk(
|
||||
completionID,
|
||||
created,
|
||||
model,
|
||||
[]map[string]any{util.BuildOpenAIChatStreamDeltaChoice(0, delta)},
|
||||
nil,
|
||||
))
|
||||
}
|
||||
}
|
||||
if len(detected) > 0 || toolCallsEmitted {
|
||||
finishReason = "tool_calls"
|
||||
}
|
||||
sendChunk(util.BuildOpenAIChatStreamChunk(
|
||||
completionID,
|
||||
created,
|
||||
model,
|
||||
[]map[string]any{util.BuildOpenAIChatStreamFinishChoice(0, finishReason)},
|
||||
util.BuildOpenAIChatUsage(finalPrompt, finalThinking, finalText),
|
||||
))
|
||||
sendDone()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case <-keepaliveTicker.C:
|
||||
if !hasContent {
|
||||
keepaliveCountWithoutContent++
|
||||
if keepaliveCountWithoutContent >= deepseek.MaxKeepaliveCount {
|
||||
finalize("stop")
|
||||
return
|
||||
}
|
||||
}
|
||||
if hasContent && time.Since(lastContent) > time.Duration(deepseek.StreamIdleTimeout)*time.Second {
|
||||
finalize("stop")
|
||||
streamengine.ConsumeSSE(streamengine.ConsumeConfig{
|
||||
Context: r.Context(),
|
||||
Body: resp.Body,
|
||||
ThinkingEnabled: thinkingEnabled,
|
||||
InitialType: initialType,
|
||||
KeepAliveInterval: time.Duration(deepseek.KeepAliveTimeout) * time.Second,
|
||||
IdleTimeout: time.Duration(deepseek.StreamIdleTimeout) * time.Second,
|
||||
MaxKeepAliveNoInput: deepseek.MaxKeepaliveCount,
|
||||
}, streamengine.ConsumeHooks{
|
||||
OnKeepAlive: func() {
|
||||
streamRuntime.sendKeepAlive()
|
||||
},
|
||||
OnParsed: streamRuntime.onParsed,
|
||||
OnFinalize: func(reason streamengine.StopReason, _ error) {
|
||||
if string(reason) == "content_filter" {
|
||||
streamRuntime.finalize("content_filter")
|
||||
return
|
||||
}
|
||||
if canFlush {
|
||||
_, _ = w.Write([]byte(": keep-alive\n\n"))
|
||||
_ = rc.Flush()
|
||||
}
|
||||
case parsed, ok := <-parsedLines:
|
||||
if !ok {
|
||||
// Ensure scanner completion is observed only after all queued
|
||||
// SSE lines are drained, avoiding early finalize races.
|
||||
_ = <-done
|
||||
finalize("stop")
|
||||
return
|
||||
}
|
||||
if !parsed.Parsed {
|
||||
continue
|
||||
}
|
||||
if parsed.ContentFilter || parsed.ErrorMessage != "" {
|
||||
finalize("content_filter")
|
||||
return
|
||||
}
|
||||
if parsed.Stop {
|
||||
finalize("stop")
|
||||
return
|
||||
}
|
||||
newChoices := make([]map[string]any, 0, len(parsed.Parts))
|
||||
for _, p := range parsed.Parts {
|
||||
if searchEnabled && sse.IsCitation(p.Text) {
|
||||
continue
|
||||
}
|
||||
if p.Text == "" {
|
||||
continue
|
||||
}
|
||||
hasContent = true
|
||||
lastContent = time.Now()
|
||||
keepaliveCountWithoutContent = 0
|
||||
delta := map[string]any{}
|
||||
if !firstChunkSent {
|
||||
delta["role"] = "assistant"
|
||||
firstChunkSent = true
|
||||
}
|
||||
if p.Type == "thinking" {
|
||||
if thinkingEnabled {
|
||||
thinking.WriteString(p.Text)
|
||||
delta["reasoning_content"] = p.Text
|
||||
}
|
||||
} else {
|
||||
text.WriteString(p.Text)
|
||||
if !bufferToolContent {
|
||||
delta["content"] = p.Text
|
||||
} else {
|
||||
events := processToolSieveChunk(&toolSieve, p.Text, toolNames)
|
||||
if len(events) == 0 {
|
||||
// Keep thinking delta only frame.
|
||||
}
|
||||
for _, evt := range events {
|
||||
if len(evt.ToolCallDeltas) > 0 {
|
||||
if !emitEarlyToolDeltas {
|
||||
continue
|
||||
}
|
||||
toolCallsEmitted = true
|
||||
tcDelta := map[string]any{
|
||||
"tool_calls": formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, streamToolCallIDs),
|
||||
}
|
||||
if !firstChunkSent {
|
||||
tcDelta["role"] = "assistant"
|
||||
firstChunkSent = true
|
||||
}
|
||||
newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, tcDelta))
|
||||
continue
|
||||
}
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
toolCallsEmitted = true
|
||||
tcDelta := map[string]any{
|
||||
"tool_calls": util.FormatOpenAIStreamToolCalls(evt.ToolCalls),
|
||||
}
|
||||
if !firstChunkSent {
|
||||
tcDelta["role"] = "assistant"
|
||||
firstChunkSent = true
|
||||
}
|
||||
newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, tcDelta))
|
||||
continue
|
||||
}
|
||||
if evt.Content != "" {
|
||||
contentDelta := map[string]any{
|
||||
"content": evt.Content,
|
||||
}
|
||||
if !firstChunkSent {
|
||||
contentDelta["role"] = "assistant"
|
||||
firstChunkSent = true
|
||||
}
|
||||
newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, contentDelta))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(delta) > 0 {
|
||||
newChoices = append(newChoices, util.BuildOpenAIChatStreamDeltaChoice(0, delta))
|
||||
}
|
||||
}
|
||||
if len(newChoices) > 0 {
|
||||
sendChunk(util.BuildOpenAIChatStreamChunk(completionID, created, model, newChoices, nil))
|
||||
}
|
||||
}
|
||||
}
|
||||
streamRuntime.finalize("stop")
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func injectToolPrompt(messages []map[string]any, tools []any) ([]map[string]any, []string) {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package openai
|
||||
|
||||
import "ds2api/internal/util"
|
||||
import (
|
||||
"ds2api/internal/deepseek"
|
||||
)
|
||||
|
||||
func buildOpenAIFinalPrompt(messagesRaw []any, toolsRaw any) (string, []string) {
|
||||
messages := normalizeOpenAIMessagesForPrompt(messagesRaw)
|
||||
@@ -8,5 +10,5 @@ func buildOpenAIFinalPrompt(messagesRaw []any, toolsRaw any) (string, []string)
|
||||
if tools, ok := toolsRaw.([]any); ok && len(tools) > 0 {
|
||||
messages, toolNames = injectToolPrompt(messages, tools)
|
||||
}
|
||||
return util.MessagesPrepare(messages), toolNames
|
||||
return deepseek.MessagesPrepare(messages), toolNames
|
||||
}
|
||||
|
||||
@@ -6,13 +6,16 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"ds2api/internal/auth"
|
||||
"ds2api/internal/deepseek"
|
||||
openaifmt "ds2api/internal/format/openai"
|
||||
"ds2api/internal/sse"
|
||||
"ds2api/internal/util"
|
||||
streamengine "ds2api/internal/stream"
|
||||
)
|
||||
|
||||
func (h *Handler) GetResponseByID(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -108,7 +111,7 @@ func (h *Handler) handleResponsesNonStream(w http.ResponseWriter, resp *http.Res
|
||||
return
|
||||
}
|
||||
result := sse.CollectStream(resp, thinkingEnabled, true)
|
||||
responseObj := util.BuildOpenAIResponseObject(responseID, model, finalPrompt, result.Thinking, result.Text, toolNames)
|
||||
responseObj := openaifmt.BuildResponseObject(responseID, model, finalPrompt, result.Thinking, result.Text, toolNames)
|
||||
h.getResponseStore().put(owner, responseID, responseObj)
|
||||
writeJSON(w, http.StatusOK, responseObj)
|
||||
}
|
||||
@@ -127,114 +130,45 @@ func (h *Handler) handleResponsesStream(w http.ResponseWriter, r *http.Request,
|
||||
rc := http.NewResponseController(w)
|
||||
canFlush := rc.Flush() == nil
|
||||
|
||||
sendEvent := func(event string, payload map[string]any) {
|
||||
b, _ := json.Marshal(payload)
|
||||
_, _ = w.Write([]byte("event: " + event + "\n"))
|
||||
_, _ = w.Write([]byte("data: "))
|
||||
_, _ = w.Write(b)
|
||||
_, _ = w.Write([]byte("\n\n"))
|
||||
if canFlush {
|
||||
_ = rc.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
sendEvent("response.created", util.BuildOpenAIResponsesCreatedPayload(responseID, model))
|
||||
|
||||
initialType := "text"
|
||||
if thinkingEnabled {
|
||||
initialType = "thinking"
|
||||
}
|
||||
parsedLines, done := sse.StartParsedLinePump(r.Context(), resp.Body, thinkingEnabled, initialType)
|
||||
bufferToolContent := len(toolNames) > 0 && h.toolcallFeatureMatchEnabled()
|
||||
emitEarlyToolDeltas := h.toolcallEarlyEmitHighConfidence()
|
||||
var sieve toolStreamSieveState
|
||||
thinking := strings.Builder{}
|
||||
text := strings.Builder{}
|
||||
toolCallsEmitted := false
|
||||
streamToolCallIDs := map[int]string{}
|
||||
|
||||
finalize := func() {
|
||||
finalThinking := thinking.String()
|
||||
finalText := text.String()
|
||||
if bufferToolContent {
|
||||
for _, evt := range flushToolSieve(&sieve, toolNames) {
|
||||
if evt.Content != "" {
|
||||
sendEvent("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, evt.Content))
|
||||
}
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
toolCallsEmitted = true
|
||||
sendEvent("response.output_tool_call.done", util.BuildOpenAIResponsesToolCallDonePayload(responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls)))
|
||||
}
|
||||
}
|
||||
}
|
||||
obj := util.BuildOpenAIResponseObject(responseID, model, finalPrompt, finalThinking, finalText, toolNames)
|
||||
if toolCallsEmitted {
|
||||
obj["status"] = "completed"
|
||||
}
|
||||
h.getResponseStore().put(owner, responseID, obj)
|
||||
sendEvent("response.completed", util.BuildOpenAIResponsesCompletedPayload(obj))
|
||||
_, _ = w.Write([]byte("data: [DONE]\n\n"))
|
||||
if canFlush {
|
||||
_ = rc.Flush()
|
||||
}
|
||||
}
|
||||
streamRuntime := newResponsesStreamRuntime(
|
||||
w,
|
||||
rc,
|
||||
canFlush,
|
||||
responseID,
|
||||
model,
|
||||
finalPrompt,
|
||||
thinkingEnabled,
|
||||
searchEnabled,
|
||||
toolNames,
|
||||
bufferToolContent,
|
||||
emitEarlyToolDeltas,
|
||||
func(obj map[string]any) {
|
||||
h.getResponseStore().put(owner, responseID, obj)
|
||||
},
|
||||
)
|
||||
streamRuntime.sendCreated()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case parsed, ok := <-parsedLines:
|
||||
if !ok {
|
||||
_ = <-done
|
||||
finalize()
|
||||
return
|
||||
}
|
||||
if !parsed.Parsed {
|
||||
continue
|
||||
}
|
||||
if parsed.ContentFilter || parsed.ErrorMessage != "" || parsed.Stop {
|
||||
finalize()
|
||||
return
|
||||
}
|
||||
for _, p := range parsed.Parts {
|
||||
if p.Text == "" {
|
||||
continue
|
||||
}
|
||||
if p.Type != "thinking" && searchEnabled && sse.IsCitation(p.Text) {
|
||||
continue
|
||||
}
|
||||
if p.Type == "thinking" {
|
||||
if !thinkingEnabled {
|
||||
continue
|
||||
}
|
||||
thinking.WriteString(p.Text)
|
||||
sendEvent("response.reasoning.delta", util.BuildOpenAIResponsesReasoningDeltaPayload(responseID, p.Text))
|
||||
continue
|
||||
}
|
||||
text.WriteString(p.Text)
|
||||
if !bufferToolContent {
|
||||
sendEvent("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, p.Text))
|
||||
continue
|
||||
}
|
||||
for _, evt := range processToolSieveChunk(&sieve, p.Text, toolNames) {
|
||||
if evt.Content != "" {
|
||||
sendEvent("response.output_text.delta", util.BuildOpenAIResponsesTextDeltaPayload(responseID, evt.Content))
|
||||
}
|
||||
if len(evt.ToolCallDeltas) > 0 {
|
||||
if !emitEarlyToolDeltas {
|
||||
continue
|
||||
}
|
||||
toolCallsEmitted = true
|
||||
sendEvent("response.output_tool_call.delta", util.BuildOpenAIResponsesToolCallDeltaPayload(responseID, formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, streamToolCallIDs)))
|
||||
}
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
toolCallsEmitted = true
|
||||
sendEvent("response.output_tool_call.done", util.BuildOpenAIResponsesToolCallDonePayload(responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
streamengine.ConsumeSSE(streamengine.ConsumeConfig{
|
||||
Context: r.Context(),
|
||||
Body: resp.Body,
|
||||
ThinkingEnabled: thinkingEnabled,
|
||||
InitialType: initialType,
|
||||
KeepAliveInterval: time.Duration(deepseek.KeepAliveTimeout) * time.Second,
|
||||
IdleTimeout: time.Duration(deepseek.StreamIdleTimeout) * time.Second,
|
||||
MaxKeepAliveNoInput: deepseek.MaxKeepaliveCount,
|
||||
}, streamengine.ConsumeHooks{
|
||||
OnParsed: streamRuntime.onParsed,
|
||||
OnFinalize: func(_ streamengine.StopReason, _ error) {
|
||||
streamRuntime.finalize()
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func responsesMessagesFromRequest(req map[string]any) []any {
|
||||
|
||||
168
internal/adapter/openai/responses_stream_runtime.go
Normal file
168
internal/adapter/openai/responses_stream_runtime.go
Normal file
@@ -0,0 +1,168 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
openaifmt "ds2api/internal/format/openai"
|
||||
"ds2api/internal/sse"
|
||||
streamengine "ds2api/internal/stream"
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
|
||||
type responsesStreamRuntime struct {
|
||||
w http.ResponseWriter
|
||||
rc *http.ResponseController
|
||||
canFlush bool
|
||||
|
||||
responseID string
|
||||
model string
|
||||
finalPrompt string
|
||||
toolNames []string
|
||||
|
||||
thinkingEnabled bool
|
||||
searchEnabled bool
|
||||
|
||||
bufferToolContent bool
|
||||
emitEarlyToolDeltas bool
|
||||
toolCallsEmitted bool
|
||||
|
||||
sieve toolStreamSieveState
|
||||
thinking strings.Builder
|
||||
text strings.Builder
|
||||
streamToolCallIDs map[int]string
|
||||
|
||||
persistResponse func(obj map[string]any)
|
||||
}
|
||||
|
||||
func newResponsesStreamRuntime(
|
||||
w http.ResponseWriter,
|
||||
rc *http.ResponseController,
|
||||
canFlush bool,
|
||||
responseID string,
|
||||
model string,
|
||||
finalPrompt string,
|
||||
thinkingEnabled bool,
|
||||
searchEnabled bool,
|
||||
toolNames []string,
|
||||
bufferToolContent bool,
|
||||
emitEarlyToolDeltas bool,
|
||||
persistResponse func(obj map[string]any),
|
||||
) *responsesStreamRuntime {
|
||||
return &responsesStreamRuntime{
|
||||
w: w,
|
||||
rc: rc,
|
||||
canFlush: canFlush,
|
||||
responseID: responseID,
|
||||
model: model,
|
||||
finalPrompt: finalPrompt,
|
||||
thinkingEnabled: thinkingEnabled,
|
||||
searchEnabled: searchEnabled,
|
||||
toolNames: toolNames,
|
||||
bufferToolContent: bufferToolContent,
|
||||
emitEarlyToolDeltas: emitEarlyToolDeltas,
|
||||
streamToolCallIDs: map[int]string{},
|
||||
persistResponse: persistResponse,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) sendEvent(event string, payload map[string]any) {
|
||||
b, _ := json.Marshal(payload)
|
||||
_, _ = s.w.Write([]byte("event: " + event + "\n"))
|
||||
_, _ = s.w.Write([]byte("data: "))
|
||||
_, _ = s.w.Write(b)
|
||||
_, _ = s.w.Write([]byte("\n\n"))
|
||||
if s.canFlush {
|
||||
_ = s.rc.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) sendCreated() {
|
||||
s.sendEvent("response.created", openaifmt.BuildResponsesCreatedPayload(s.responseID, s.model))
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) sendDone() {
|
||||
_, _ = s.w.Write([]byte("data: [DONE]\n\n"))
|
||||
if s.canFlush {
|
||||
_ = s.rc.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) finalize() {
|
||||
finalThinking := s.thinking.String()
|
||||
finalText := s.text.String()
|
||||
if s.bufferToolContent {
|
||||
for _, evt := range flushToolSieve(&s.sieve, s.toolNames) {
|
||||
if evt.Content != "" {
|
||||
s.sendEvent("response.output_text.delta", openaifmt.BuildResponsesTextDeltaPayload(s.responseID, evt.Content))
|
||||
}
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
s.toolCallsEmitted = true
|
||||
s.sendEvent("response.output_tool_call.done", openaifmt.BuildResponsesToolCallDonePayload(s.responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
obj := openaifmt.BuildResponseObject(s.responseID, s.model, s.finalPrompt, finalThinking, finalText, s.toolNames)
|
||||
if s.toolCallsEmitted {
|
||||
obj["status"] = "completed"
|
||||
}
|
||||
if s.persistResponse != nil {
|
||||
s.persistResponse(obj)
|
||||
}
|
||||
s.sendEvent("response.completed", openaifmt.BuildResponsesCompletedPayload(obj))
|
||||
s.sendDone()
|
||||
}
|
||||
|
||||
func (s *responsesStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedDecision {
|
||||
if !parsed.Parsed {
|
||||
return streamengine.ParsedDecision{}
|
||||
}
|
||||
if parsed.ContentFilter || parsed.ErrorMessage != "" || parsed.Stop {
|
||||
return streamengine.ParsedDecision{Stop: true}
|
||||
}
|
||||
|
||||
contentSeen := false
|
||||
for _, p := range parsed.Parts {
|
||||
if p.Text == "" {
|
||||
continue
|
||||
}
|
||||
if p.Type != "thinking" && s.searchEnabled && sse.IsCitation(p.Text) {
|
||||
continue
|
||||
}
|
||||
contentSeen = true
|
||||
if p.Type == "thinking" {
|
||||
if !s.thinkingEnabled {
|
||||
continue
|
||||
}
|
||||
s.thinking.WriteString(p.Text)
|
||||
s.sendEvent("response.reasoning.delta", openaifmt.BuildResponsesReasoningDeltaPayload(s.responseID, p.Text))
|
||||
continue
|
||||
}
|
||||
|
||||
s.text.WriteString(p.Text)
|
||||
if !s.bufferToolContent {
|
||||
s.sendEvent("response.output_text.delta", openaifmt.BuildResponsesTextDeltaPayload(s.responseID, p.Text))
|
||||
continue
|
||||
}
|
||||
for _, evt := range processToolSieveChunk(&s.sieve, p.Text, s.toolNames) {
|
||||
if evt.Content != "" {
|
||||
s.sendEvent("response.output_text.delta", openaifmt.BuildResponsesTextDeltaPayload(s.responseID, evt.Content))
|
||||
}
|
||||
if len(evt.ToolCallDeltas) > 0 {
|
||||
if !s.emitEarlyToolDeltas {
|
||||
continue
|
||||
}
|
||||
s.toolCallsEmitted = true
|
||||
s.sendEvent("response.output_tool_call.delta", openaifmt.BuildResponsesToolCallDeltaPayload(s.responseID, formatIncrementalStreamToolCallDeltas(evt.ToolCallDeltas, s.streamToolCallIDs)))
|
||||
}
|
||||
if len(evt.ToolCalls) > 0 {
|
||||
s.toolCallsEmitted = true
|
||||
s.sendEvent("response.output_tool_call.done", openaifmt.BuildResponsesToolCallDonePayload(s.responseID, util.FormatOpenAIStreamToolCalls(evt.ToolCalls)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return streamengine.ParsedDecision{ContentSeen: contentSeen}
|
||||
}
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"ds2api/internal/util"
|
||||
)
|
||||
|
||||
func normalizeOpenAIChatRequest(store *config.Store, req map[string]any) (util.StandardRequest, error) {
|
||||
func normalizeOpenAIChatRequest(store ConfigReader, req map[string]any) (util.StandardRequest, error) {
|
||||
model, _ := req["model"].(string)
|
||||
messagesRaw, _ := req["messages"].([]any)
|
||||
if strings.TrimSpace(model) == "" || len(messagesRaw) == 0 {
|
||||
@@ -41,7 +41,7 @@ func normalizeOpenAIChatRequest(store *config.Store, req map[string]any) (util.S
|
||||
}, nil
|
||||
}
|
||||
|
||||
func normalizeOpenAIResponsesRequest(store *config.Store, req map[string]any) (util.StandardRequest, error) {
|
||||
func normalizeOpenAIResponsesRequest(store ConfigReader, req map[string]any) (util.StandardRequest, error) {
|
||||
model, _ := req["model"].(string)
|
||||
model = strings.TrimSpace(model)
|
||||
if model == "" {
|
||||
|
||||
Reference in New Issue
Block a user