package chat import ( "crypto/subtle" "encoding/json" "net/http" "os" "strconv" "strings" "time" "ds2api/internal/auth" "ds2api/internal/config" "ds2api/internal/promptcompat" "ds2api/internal/util" "github.com/google/uuid" ) func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Request) { if !config.IsVercel() { http.NotFound(w, r) return } h.sweepExpiredStreamLeases() internalSecret := vercelInternalSecret() internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") return } a, err := h.Auth.Determine(r) if err != nil { status := http.StatusUnauthorized if err == auth.ErrNoAccount { status = http.StatusTooManyRequests } writeOpenAIError(w, status, err.Error()) return } leased := false defer func() { if !leased { h.Auth.Release(a) } }() r = r.WithContext(auth.WithAuth(r.Context(), a)) var req map[string]any if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeOpenAIError(w, http.StatusBadRequest, "invalid json") return } if err := h.preprocessInlineFileInputs(r.Context(), a, req); err != nil { writeOpenAIInlineFileError(w, err) return } if !util.ToBool(req["stream"]) { writeOpenAIError(w, http.StatusBadRequest, "stream must be true") return } stdReq, err := promptcompat.NormalizeOpenAIChatRequest(h.Store, req, requestTraceID(r)) if err != nil { writeOpenAIError(w, http.StatusBadRequest, err.Error()) return } if !stdReq.Stream { writeOpenAIError(w, http.StatusBadRequest, "stream must be true") return } stdReq, err = h.applyCurrentInputFile(r.Context(), a, stdReq) if err != nil { status, message := mapCurrentInputFileError(err) writeOpenAIError(w, status, message) return } sessionID, err := h.DS.CreateSession(r.Context(), a, 3) if err != nil { if a.UseConfigToken { writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.") } else { writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.") } return } powHeader, err := h.DS.GetPow(r.Context(), a, 3) if err != nil { writeOpenAIError(w, http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).") return } if strings.TrimSpace(a.DeepSeekToken) == "" { writeOpenAIError(w, http.StatusUnauthorized, "Invalid token. If this should be a DS2API key, add it to config.keys first.") return } payload := stdReq.CompletionPayload(sessionID) leaseID := h.holdStreamLease(a) if leaseID == "" { writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease") return } leased = true writeJSON(w, http.StatusOK, map[string]any{ "session_id": sessionID, "lease_id": leaseID, "model": stdReq.ResponseModel, "final_prompt": stdReq.FinalPrompt, "thinking_enabled": stdReq.Thinking, "search_enabled": stdReq.Search, "tool_names": stdReq.ToolNames, "deepseek_token": a.DeepSeekToken, "pow_header": powHeader, "payload": payload, }) } func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Request) { if !config.IsVercel() { http.NotFound(w, r) return } h.sweepExpiredStreamLeases() internalSecret := vercelInternalSecret() internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") return } var req map[string]any if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeOpenAIError(w, http.StatusBadRequest, "invalid json") return } leaseID, _ := req["lease_id"].(string) leaseID = strings.TrimSpace(leaseID) if leaseID == "" { writeOpenAIError(w, http.StatusBadRequest, "lease_id is required") return } if !h.releaseStreamLease(leaseID) { writeOpenAIError(w, http.StatusNotFound, "stream lease not found") return } writeJSON(w, http.StatusOK, map[string]any{"success": true}) } func (h *Handler) handleVercelStreamPow(w http.ResponseWriter, r *http.Request) { if !config.IsVercel() { http.NotFound(w, r) return } internalSecret := vercelInternalSecret() internalToken := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token")) if internalSecret == "" || subtle.ConstantTimeCompare([]byte(internalToken), []byte(internalSecret)) != 1 { writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request") return } var req map[string]any if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeOpenAIError(w, http.StatusBadRequest, "invalid json") return } leaseID, _ := req["lease_id"].(string) leaseID = strings.TrimSpace(leaseID) if leaseID == "" { writeOpenAIError(w, http.StatusBadRequest, "lease_id is required") return } leaseAuth := h.lookupStreamLeaseAuth(leaseID) if leaseAuth == nil { writeOpenAIError(w, http.StatusNotFound, "stream lease not found or expired") return } powHeader, err := h.DS.GetPow(r.Context(), leaseAuth, 3) if err != nil { writeOpenAIError(w, http.StatusInternalServerError, "Failed to get PoW.") return } writeJSON(w, http.StatusOK, map[string]any{ "pow_header": powHeader, }) } func isVercelStreamPrepareRequest(r *http.Request) bool { if r == nil { return false } return strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1" } func isVercelStreamReleaseRequest(r *http.Request) bool { if r == nil { return false } return strings.TrimSpace(r.URL.Query().Get("__stream_release")) == "1" } func isVercelStreamPowRequest(r *http.Request) bool { if r == nil { return false } return strings.TrimSpace(r.URL.Query().Get("__stream_pow")) == "1" } func vercelInternalSecret() string { if v := strings.TrimSpace(os.Getenv("DS2API_VERCEL_INTERNAL_SECRET")); v != "" { return v } if v := strings.TrimSpace(os.Getenv("DS2API_ADMIN_KEY")); v != "" { return v } return "admin" } func (h *Handler) holdStreamLease(a *auth.RequestAuth) string { if a == nil { return "" } now := time.Now() ttl := streamLeaseTTL() if ttl <= 0 { ttl = 15 * time.Minute } h.leaseMu.Lock() expired := h.popExpiredLeasesLocked(now) if h.streamLeases == nil { h.streamLeases = make(map[string]streamLease) } leaseID := newLeaseID() h.streamLeases[leaseID] = streamLease{ Auth: a, ExpiresAt: now.Add(ttl), } h.leaseMu.Unlock() h.releaseExpiredAuths(expired) return leaseID } func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth { leaseID = strings.TrimSpace(leaseID) if leaseID == "" { return nil } h.leaseMu.Lock() lease, ok := h.streamLeases[leaseID] h.leaseMu.Unlock() if !ok || time.Now().After(lease.ExpiresAt) { return nil } return lease.Auth } func (h *Handler) releaseStreamLease(leaseID string) bool { leaseID = strings.TrimSpace(leaseID) if leaseID == "" { return false } h.leaseMu.Lock() expired := h.popExpiredLeasesLocked(time.Now()) lease, ok := h.streamLeases[leaseID] if ok { delete(h.streamLeases, leaseID) } h.leaseMu.Unlock() h.releaseExpiredAuths(expired) if !ok { return false } if h.Auth != nil { h.Auth.Release(lease.Auth) } return true } func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth { if len(h.streamLeases) == 0 { return nil } expired := make([]*auth.RequestAuth, 0) for leaseID, lease := range h.streamLeases { if now.After(lease.ExpiresAt) { delete(h.streamLeases, leaseID) expired = append(expired, lease.Auth) } } return expired } func (h *Handler) releaseExpiredAuths(expired []*auth.RequestAuth) { if h.Auth == nil || len(expired) == 0 { return } for _, a := range expired { h.Auth.Release(a) } } func (h *Handler) sweepExpiredStreamLeases() { h.leaseMu.Lock() expired := h.popExpiredLeasesLocked(time.Now()) h.leaseMu.Unlock() h.releaseExpiredAuths(expired) } func streamLeaseTTL() time.Duration { raw := strings.TrimSpace(os.Getenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS")) if raw == "" { return 15 * time.Minute } seconds, err := strconv.Atoi(raw) if err != nil || seconds <= 0 { return 15 * time.Minute } return time.Duration(seconds) * time.Second } func newLeaseID() string { return strings.ReplaceAll(uuid.NewString(), "-", "") }