mirror of
https://github.com/CJackHwang/ds2api.git
synced 2026-05-13 04:38:00 +08:00
feat: implement stream lease management for Vercel hybrid streaming path to align occupancy duration with native Go streaming behavior.
This commit is contained in:
@@ -3,13 +3,17 @@ package openai
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/subtle"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -25,6 +29,14 @@ type Handler struct {
|
||||
Store *config.Store
|
||||
Auth *auth.Resolver
|
||||
DS *deepseek.Client
|
||||
|
||||
leaseMu sync.Mutex
|
||||
streamLeases map[string]streamLease
|
||||
}
|
||||
|
||||
type streamLease struct {
|
||||
Auth *auth.RequestAuth
|
||||
ExpiresAt time.Time
|
||||
}
|
||||
|
||||
func RegisterRoutes(r chi.Router, h *Handler) {
|
||||
@@ -37,6 +49,10 @@ func (h *Handler) ListModels(w http.ResponseWriter, _ *http.Request) {
|
||||
}
|
||||
|
||||
func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
|
||||
if isVercelStreamReleaseRequest(r) {
|
||||
h.handleVercelStreamRelease(w, r)
|
||||
return
|
||||
}
|
||||
if isVercelStreamPrepareRequest(r) {
|
||||
h.handleVercelStreamPrepare(w, r)
|
||||
return
|
||||
@@ -118,6 +134,7 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
h.sweepExpiredStreamLeases()
|
||||
internalSecret := vercelInternalSecret()
|
||||
internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
|
||||
if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 {
|
||||
@@ -134,7 +151,12 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque
|
||||
writeOpenAIError(w, status, err.Error())
|
||||
return
|
||||
}
|
||||
defer h.Auth.Release(a)
|
||||
leased := false
|
||||
defer func() {
|
||||
if !leased {
|
||||
h.Auth.Release(a)
|
||||
}
|
||||
}()
|
||||
r = r.WithContext(auth.WithAuth(r.Context(), a))
|
||||
|
||||
var req map[string]any
|
||||
@@ -193,8 +215,15 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque
|
||||
"thinking_enabled": thinkingEnabled,
|
||||
"search_enabled": searchEnabled,
|
||||
}
|
||||
leaseID := h.holdStreamLease(a)
|
||||
if leaseID == "" {
|
||||
writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease")
|
||||
return
|
||||
}
|
||||
leased = true
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"session_id": sessionID,
|
||||
"lease_id": leaseID,
|
||||
"model": model,
|
||||
"final_prompt": finalPrompt,
|
||||
"thinking_enabled": thinkingEnabled,
|
||||
@@ -205,6 +234,37 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Request) {
|
||||
if !config.IsVercel() {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
h.sweepExpiredStreamLeases()
|
||||
internalSecret := vercelInternalSecret()
|
||||
internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
|
||||
if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 {
|
||||
writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request")
|
||||
return
|
||||
}
|
||||
|
||||
var req map[string]any
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeOpenAIError(w, http.StatusBadRequest, "invalid json")
|
||||
return
|
||||
}
|
||||
leaseID, _ := req["lease_id"].(string)
|
||||
leaseID = strings.TrimSpace(leaseID)
|
||||
if leaseID == "" {
|
||||
writeOpenAIError(w, http.StatusBadRequest, "lease_id is required")
|
||||
return
|
||||
}
|
||||
if !h.releaseStreamLease(leaseID) {
|
||||
writeOpenAIError(w, http.StatusNotFound, "stream lease not found")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"success": true})
|
||||
}
|
||||
|
||||
func (h *Handler) handleNonStream(w http.ResponseWriter, ctx context.Context, resp *http.Response, completionID, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string) {
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
@@ -592,6 +652,13 @@ func isVercelStreamPrepareRequest(r *http.Request) bool {
|
||||
return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1"
|
||||
}
|
||||
|
||||
// isVercelStreamReleaseRequest reports whether r carries the query flag
// "__stream_release" with the value "1" (surrounding whitespace ignored).
// A nil request is never a release request.
func isVercelStreamReleaseRequest(r *http.Request) bool {
	if r != nil {
		flag := r.URL.Query().Get("__stream_release")
		return strings.TrimSpace(flag) == "1"
	}
	return false
}
|
||||
|
||||
func vercelInternalSecret() string {
|
||||
if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" {
|
||||
return v
|
||||
@@ -601,3 +668,102 @@ func vercelInternalSecret() string {
|
||||
}
|
||||
return "admin"
|
||||
}
|
||||
|
||||
func (h *Handler) holdStreamLease(a *auth.RequestAuth) string {
|
||||
if a == nil {
|
||||
return ""
|
||||
}
|
||||
now := time.Now()
|
||||
ttl := streamLeaseTTL()
|
||||
if ttl <= 0 {
|
||||
ttl = 15 * time.Minute
|
||||
}
|
||||
|
||||
h.leaseMu.Lock()
|
||||
expired := h.popExpiredLeasesLocked(now)
|
||||
if h.streamLeases == nil {
|
||||
h.streamLeases = make(map[string]streamLease)
|
||||
}
|
||||
leaseID := newLeaseID()
|
||||
h.streamLeases[leaseID] = streamLease{
|
||||
Auth: a,
|
||||
ExpiresAt: now.Add(ttl),
|
||||
}
|
||||
h.leaseMu.Unlock()
|
||||
h.releaseExpiredAuths(expired)
|
||||
return leaseID
|
||||
}
|
||||
|
||||
func (h *Handler) releaseStreamLease(leaseID string) bool {
|
||||
leaseID = strings.TrimSpace(leaseID)
|
||||
if leaseID == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
h.leaseMu.Lock()
|
||||
expired := h.popExpiredLeasesLocked(time.Now())
|
||||
lease, ok := h.streamLeases[leaseID]
|
||||
if ok {
|
||||
delete(h.streamLeases, leaseID)
|
||||
}
|
||||
h.leaseMu.Unlock()
|
||||
h.releaseExpiredAuths(expired)
|
||||
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if h.Auth != nil {
|
||||
h.Auth.Release(lease.Auth)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth {
|
||||
if len(h.streamLeases) == 0 {
|
||||
return nil
|
||||
}
|
||||
expired := make([]*auth.RequestAuth, 0)
|
||||
for leaseID, lease := range h.streamLeases {
|
||||
if now.After(lease.ExpiresAt) {
|
||||
delete(h.streamLeases, leaseID)
|
||||
expired = append(expired, lease.Auth)
|
||||
}
|
||||
}
|
||||
return expired
|
||||
}
|
||||
|
||||
func (h *Handler) releaseExpiredAuths(expired []*auth.RequestAuth) {
|
||||
if h.Auth == nil || len(expired) == 0 {
|
||||
return
|
||||
}
|
||||
for _, a := range expired {
|
||||
h.Auth.Release(a)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) sweepExpiredStreamLeases() {
|
||||
h.leaseMu.Lock()
|
||||
expired := h.popExpiredLeasesLocked(time.Now())
|
||||
h.leaseMu.Unlock()
|
||||
h.releaseExpiredAuths(expired)
|
||||
}
|
||||
|
||||
// streamLeaseTTL returns how long a stream lease may be held before it is
// treated as expired.
//
// The value is read from DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS as a whole
// number of seconds. The 15 minute default is returned when the variable is
// unset or blank, is not an integer, is zero or negative, or is so large that
// it cannot be represented as a time.Duration.
func streamLeaseTTL() time.Duration {
	const (
		defaultTTL = 15 * time.Minute
		// maxSeconds is the largest second count whose nanosecond value still
		// fits in a time.Duration (an int64). Anything above it would wrap
		// around to a negative or arbitrarily small TTL when multiplied.
		maxSeconds = (1<<63 - 1) / int64(time.Second)
	)

	raw := strings.TrimSpace(os.Getenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS"))
	if raw == "" {
		return defaultTTL
	}
	seconds, err := strconv.Atoi(raw)
	if err != nil || seconds <= 0 || int64(seconds) > maxSeconds {
		return defaultTTL
	}
	return time.Duration(seconds) * time.Second
}
|
||||
|
||||
// newLeaseID returns a fresh lease identifier: 16 bytes from crypto/rand,
// hex-encoded to 32 lowercase characters. If the random source reports an
// error, it falls back to "lease-<unix-nanoseconds>".
func newLeaseID() string {
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		return fmt.Sprintf("lease-%d", time.Now().UnixNano())
	}
	return hex.EncodeToString(raw[:])
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"ds2api/internal/auth"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestIsVercelStreamPrepareRequest(t *testing.T) {
|
||||
@@ -17,6 +19,18 @@ func TestIsVercelStreamPrepareRequest(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsVercelStreamReleaseRequest(t *testing.T) {
|
||||
req := httptest.NewRequest("POST", "/v1/chat/completions?__stream_release=1", nil)
|
||||
if !isVercelStreamReleaseRequest(req) {
|
||||
t.Fatalf("expected release request to be detected")
|
||||
}
|
||||
|
||||
req2 := httptest.NewRequest("POST", "/v1/chat/completions", nil)
|
||||
if isVercelStreamReleaseRequest(req2) {
|
||||
t.Fatalf("expected non-release request")
|
||||
}
|
||||
}
|
||||
|
||||
func TestVercelInternalSecret(t *testing.T) {
|
||||
t.Run("prefer explicit secret", func(t *testing.T) {
|
||||
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret")
|
||||
@@ -42,3 +56,28 @@ func TestVercelInternalSecret(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamLeaseLifecycle(t *testing.T) {
|
||||
h := &Handler{}
|
||||
leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false})
|
||||
if leaseID == "" {
|
||||
t.Fatalf("expected non-empty lease id")
|
||||
}
|
||||
if ok := h.releaseStreamLease(leaseID); !ok {
|
||||
t.Fatalf("expected lease release success")
|
||||
}
|
||||
if ok := h.releaseStreamLease(leaseID); ok {
|
||||
t.Fatalf("expected duplicate release to fail")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStreamLeaseTTL(t *testing.T) {
|
||||
t.Setenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS", "120")
|
||||
if got := streamLeaseTTL(); got != 120*time.Second {
|
||||
t.Fatalf("expected ttl=120s, got %v", got)
|
||||
}
|
||||
t.Setenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS", "invalid")
|
||||
if got := streamLeaseTTL(); got != 15*time.Minute {
|
||||
t.Fatalf("expected default ttl on invalid value, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user