feat: Introduce hybrid streaming for Vercel deployments using a Go prepare endpoint and Node.js stream handler to mitigate buffering.

This commit is contained in:
CJACK
2026-02-16 21:56:01 +08:00
parent d668465734
commit d70a0acaa8
10 changed files with 783 additions and 28 deletions

View File

@@ -230,19 +230,21 @@ func collectDeepSeek(resp *http.Response, thinkingEnabled bool) (string, string)
func (h *Handler) writeClaudeStream(w http.ResponseWriter, r *http.Request, model string, messages []any, fullText string, detected []util.ParsedToolCall) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Cache-Control", "no-cache, no-transform")
w.Header().Set("Connection", "keep-alive")
flusher, hasFlusher := w.(http.Flusher)
if !hasFlusher {
config.Logger.Warn("[claude_stream] response writer does not support flush; falling back to buffered SSE")
w.Header().Set("X-Accel-Buffering", "no")
rc := http.NewResponseController(w)
canFlush := rc.Flush() == nil
if !canFlush {
config.Logger.Warn("[claude_stream] response writer does not support flush; streaming may be buffered")
}
send := func(v any) {
b, _ := json.Marshal(v)
_, _ = w.Write([]byte("data: "))
_, _ = w.Write(b)
_, _ = w.Write([]byte("\n\n"))
if hasFlusher {
flusher.Flush()
if canFlush {
_ = rc.Flush()
}
}
messageID := fmt.Sprintf("msg_%d", time.Now().UnixNano())

View File

@@ -3,10 +3,12 @@ package openai
import (
"bufio"
"context"
"crypto/subtle"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
@@ -35,6 +37,11 @@ func (h *Handler) ListModels(w http.ResponseWriter, _ *http.Request) {
}
func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
if isVercelStreamPrepareRequest(r) {
h.handleVercelStreamPrepare(w, r)
return
}
a, err := h.Auth.Determine(r)
if err != nil {
status := http.StatusUnauthorized
@@ -106,6 +113,98 @@ func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
h.handleNonStream(w, r.Context(), resp, sessionID, model, finalPrompt, thinkingEnabled, searchEnabled, toolNames)
}
func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Request) {
if !config.IsVercel() {
http.NotFound(w, r)
return
}
internalSecret := vercelInternalSecret()
internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 {
writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request")
return
}
a, err := h.Auth.Determine(r)
if err != nil {
status := http.StatusUnauthorized
if err == auth.ErrNoAccount {
status = http.StatusTooManyRequests
}
writeOpenAIError(w, status, err.Error())
return
}
defer h.Auth.Release(a)
r = r.WithContext(auth.WithAuth(r.Context(), a))
var req map[string]any
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeOpenAIError(w, http.StatusBadRequest, "invalid json")
return
}
if !toBool(req["stream"]) {
writeOpenAIError(w, http.StatusBadRequest, "stream must be true")
return
}
if tools, ok := req["tools"].([]any); ok && len(tools) > 0 {
writeOpenAIError(w, http.StatusBadRequest, "tools are not supported by vercel stream prepare")
return
}
model, _ := req["model"].(string)
messagesRaw, _ := req["messages"].([]any)
if model == "" || len(messagesRaw) == 0 {
writeOpenAIError(w, http.StatusBadRequest, "Request must include 'model' and 'messages'.")
return
}
thinkingEnabled, searchEnabled, ok := config.GetModelConfig(model)
if !ok {
writeOpenAIError(w, http.StatusServiceUnavailable, fmt.Sprintf("Model '%s' is not available.", model))
return
}
messages := normalizeMessages(messagesRaw)
finalPrompt := util.MessagesPrepare(messages)
sessionID, err := h.DS.CreateSession(r.Context(), a, 3)
if err != nil {
if a.UseConfigToken {
writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.")
} else {
writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.")
}
return
}
powHeader, err := h.DS.GetPow(r.Context(), a, 3)
if err != nil {
writeOpenAIError(w, http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).")
return
}
if strings.TrimSpace(a.DeepSeekToken) == "" {
writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.")
return
}
payload := map[string]any{
"chat_session_id": sessionID,
"parent_message_id": nil,
"prompt": finalPrompt,
"ref_file_ids": []any{},
"thinking_enabled": thinkingEnabled,
"search_enabled": searchEnabled,
}
writeJSON(w, http.StatusOK, map[string]any{
"session_id": sessionID,
"model": model,
"final_prompt": finalPrompt,
"thinking_enabled": thinkingEnabled,
"search_enabled": searchEnabled,
"deepseek_token": a.DeepSeekToken,
"pow_header": powHeader,
"payload": payload,
})
}
func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
@@ -191,11 +290,13 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Cache-Control", "no-cache, no-transform")
w.Header().Set("Connection", "keep-alive")
flusher, hasFlusher := w.(http.Flusher)
if !hasFlusher {
config.Logger.Warn("[stream] response writer does not support flush; falling back to buffered SSE")
w.Header().Set("X-Accel-Buffering", "no")
rc := http.NewResponseController(w)
canFlush := rc.Flush() == nil
if !canFlush {
config.Logger.Warn("[stream] response writer does not support flush; streaming may be buffered")
}
lines := make(chan []byte, 128)
@@ -232,14 +333,14 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
_, _ = w.Write([]byte("data: "))
_, _ = w.Write(b)
_, _ = w.Write([]byte("\n\n"))
if hasFlusher {
flusher.Flush()
if canFlush {
_ = rc.Flush()
}
}
sendDone := func() {
_, _ = w.Write([]byte("data: [DONE]\n\n"))
if hasFlusher {
flusher.Flush()
if canFlush {
_ = rc.Flush()
}
}
@@ -316,9 +417,9 @@ func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *htt
finalize("stop")
return
}
if hasFlusher {
if canFlush {
_, _ = w.Write([]byte(": keep-alive\n\n"))
flusher.Flush()
_ = rc.Flush()
}
case line, ok := <-lines:
if !ok {
@@ -483,3 +584,20 @@ func openAIErrorType(status int) string {
return "invalid_request_error"
}
}
func isVercelStreamPrepareRequest(r *http.Request) bool {
if r == nil {
return false
}
return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1"
}
func vercelInternalSecret() string {
if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" {
return v
}
if v := strings.TrimSpace(os.Getenv("DS2API_ADMIN_KEY")); v != "" {
return v
}
return "admin"
}

View File

@@ -0,0 +1,44 @@
package openai
import (
"net/http/httptest"
"testing"
)
func TestIsVercelStreamPrepareRequest(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/chat/completions?__stream_prepare=1", nil)
if !isVercelStreamPrepareRequest(req) {
t.Fatalf("expected prepare request to be detected")
}
req2 := httptest.NewRequest("POST", "/v1/chat/completions", nil)
if isVercelStreamPrepareRequest(req2) {
t.Fatalf("expected non-prepare request")
}
}
func TestVercelInternalSecret(t *testing.T) {
t.Run("prefer explicit secret", func(t *testing.T) {
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret")
t.Setenv("DS2API_ADMIN_KEY", "admin-fallback")
if got := vercelInternalSecret(); got != "stream-secret" {
t.Fatalf("expected explicit secret, got %q", got)
}
})
t.Run("fallback to admin key", func(t *testing.T) {
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "")
t.Setenv("DS2API_ADMIN_KEY", "admin-fallback")
if got := vercelInternalSecret(); got != "admin-fallback" {
t.Fatalf("expected admin key fallback, got %q", got)
}
})
t.Run("default admin when env missing", func(t *testing.T) {
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "")
t.Setenv("DS2API_ADMIN_KEY", "")
if got := vercelInternalSecret(); got != "admin" {
t.Fatalf("expected default admin fallback, got %q", got)
}
})
}