Files
ds2api/internal/httpapi/openai/chat/vercel_stream.go
ds2api-bot df6859bddc fix(vercel): enable auto-delete session on Vercel stream release
The "delete current conversation" feature was not working on Vercel
deployment because the stream flow uses a separate lease mechanism.
The session_id created during prepare phase was not preserved for
deletion when the stream ends.

Changes:
- Add SessionID field to streamLease struct to preserve session_id
- Pass session_id to holdStreamLease during prepare
- Modify releaseStreamLease to return auth and session_id
- Call autoDeleteRemoteSession in handleVercelStreamRelease when
  releasing a lease with auto-delete mode enabled

Closes #vercel-auto-delete
2026-05-10 02:05:05 +00:00

332 lines
8.6 KiB
Go

package chat
import (
"crypto/subtle"
"encoding/json"
"net/http"
"os"
"strconv"
"strings"
"time"
"ds2api/internal/auth"
"ds2api/internal/config"
"ds2api/internal/promptcompat"
"ds2api/internal/util"
"github.com/google/uuid"
)
// handleVercelStreamPrepare handles the internal "prepare" request of the
// Vercel streaming flow.
//
// It answers 404 when not running on Vercel and 401 when the
// X-Ds2-Internal-Token header does not match vercelInternalSecret. Otherwise it
// resolves the request auth, validates and normalizes the chat request (which
// must have stream enabled), creates an upstream session, fetches a PoW header
// and stores the auth together with the session ID in a stream lease. The JSON
// response is the map written at the end of the function; it includes the
// lease ID, the DeepSeek token and the completion payload.
//
// The auth is released on every early return. Once a lease has been created
// this handler no longer releases it.
func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Request) {
	if !config.IsVercel() {
		http.NotFound(w, r)
		return
	}
	h.sweepExpiredStreamLeases()

	// Constant-time comparison of the caller's token against the shared secret.
	secret := vercelInternalSecret()
	presented := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
	authorized := secret != "" && subtle.ConstantTimeCompare([]byte(presented), []byte(secret)) == 1
	if !authorized {
		writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request")
		return
	}

	a, err := h.Auth.Determine(r)
	if err != nil {
		var status int
		switch err {
		case auth.ErrNoAccount:
			status = http.StatusTooManyRequests
		default:
			status = http.StatusUnauthorized
		}
		writeOpenAIError(w, status, err.Error())
		return
	}

	// Until the auth is handed to a lease, every exit path must release it.
	handedOff := false
	defer func() {
		if handedOff {
			return
		}
		h.Auth.Release(a)
	}()

	r = r.WithContext(auth.WithAuth(r.Context(), a))

	var body map[string]any
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeOpenAIError(w, http.StatusBadRequest, "invalid json")
		return
	}
	if fileErr := h.preprocessInlineFileInputs(r.Context(), a, body); fileErr != nil {
		writeOpenAIInlineFileError(w, fileErr)
		return
	}
	// This endpoint only serves streaming requests; checked on the raw body
	// first and again on the normalized request below.
	if !util.ToBool(body["stream"]) {
		writeOpenAIError(w, http.StatusBadRequest, "stream must be true")
		return
	}

	stdReq, err := promptcompat.NormalizeOpenAIChatRequest(h.Store, body, requestTraceID(r))
	if err != nil {
		writeOpenAIError(w, http.StatusBadRequest, err.Error())
		return
	}
	if !stdReq.Stream {
		writeOpenAIError(w, http.StatusBadRequest, "stream must be true")
		return
	}

	stdReq, err = h.applyCurrentInputFile(r.Context(), a, stdReq)
	if err != nil {
		status, message := mapCurrentInputFileError(err)
		writeOpenAIError(w, status, message)
		return
	}

	const invalidTokenMessage = "Invalid token. If this should be a DS2API key, add it to config.keys first."

	sessionID, err := h.DS.CreateSession(r.Context(), a, 3)
	if err != nil {
		message := invalidTokenMessage
		if a.UseConfigToken {
			message = "Account token is invalid. Please re-login the account in admin."
		}
		writeOpenAIError(w, http.StatusUnauthorized, message)
		return
	}

	powHeader, err := h.DS.GetPow(r.Context(), a, 3)
	if err != nil {
		writeOpenAIError(w, http.StatusUnauthorized, "Failed to get PoW (invalid token or unknown error).")
		return
	}

	// NOTE(review): the token is only checked after the upstream calls;
	// presumably those calls may populate it — confirm before reordering.
	if strings.TrimSpace(a.DeepSeekToken) == "" {
		writeOpenAIError(w, http.StatusUnauthorized, invalidTokenMessage)
		return
	}

	payload := stdReq.CompletionPayload(sessionID)

	leaseID := h.holdStreamLease(a, sessionID)
	if leaseID == "" {
		writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease")
		return
	}
	handedOff = true

	response := map[string]any{
		"session_id":       sessionID,
		"lease_id":         leaseID,
		"model":            stdReq.ResponseModel,
		"final_prompt":     stdReq.FinalPrompt,
		"thinking_enabled": stdReq.Thinking,
		"search_enabled":   stdReq.Search,
		"tool_names":       stdReq.ToolNames,
		"deepseek_token":   a.DeepSeekToken,
		"pow_header":       powHeader,
		"payload":          payload,
	}
	writeJSON(w, http.StatusOK, response)
}
// handleVercelStreamRelease handles the internal "release" request of the
// Vercel streaming flow.
//
// It answers 404 when not running on Vercel, 401 when the
// X-Ds2-Internal-Token header does not match vercelInternalSecret, 400 for an
// undecodable body or a missing lease_id, and 404 when the lease is unknown.
// On success the lease is removed via releaseStreamLease and, when the lease
// carried both an auth and a session ID, autoDeleteRemoteSession is invoked
// for that session before {"success": true} is written.
func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Request) {
	if !config.IsVercel() {
		http.NotFound(w, r)
		return
	}
	h.sweepExpiredStreamLeases()

	secret := vercelInternalSecret()
	presented := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
	authorized := secret != "" && subtle.ConstantTimeCompare([]byte(presented), []byte(secret)) == 1
	if !authorized {
		writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request")
		return
	}

	var body map[string]any
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeOpenAIError(w, http.StatusBadRequest, "invalid json")
		return
	}

	// A missing or non-string lease_id yields "" and is rejected below.
	rawID, _ := body["lease_id"].(string)
	leaseID := strings.TrimSpace(rawID)
	if leaseID == "" {
		writeOpenAIError(w, http.StatusBadRequest, "lease_id is required")
		return
	}

	found, leaseAuth, sessionID := h.releaseStreamLease(leaseID)
	if !found {
		writeOpenAIError(w, http.StatusNotFound, "stream lease not found")
		return
	}

	// NOTE(review): releaseStreamLease has already handed the auth back via
	// h.Auth.Release at this point; confirm it is still safe to use here.
	if leaseAuth != nil && sessionID != "" {
		h.autoDeleteRemoteSession(r.Context(), leaseAuth, sessionID)
	}

	writeJSON(w, http.StatusOK, map[string]any{"success": true})
}
// handleVercelStreamPow handles the internal "pow" request of the Vercel
// streaming flow: it fetches a fresh PoW header using the auth stored in an
// existing, unexpired stream lease.
//
// It answers 404 when not running on Vercel, 401 when the
// X-Ds2-Internal-Token header does not match vercelInternalSecret, 400 for an
// undecodable body or a missing lease_id, 404 when the lease is unknown or
// expired, and 500 when the PoW request fails. The lease itself is only looked
// up here; it is neither removed nor modified.
func (h *Handler) handleVercelStreamPow(w http.ResponseWriter, r *http.Request) {
	if !config.IsVercel() {
		http.NotFound(w, r)
		return
	}

	secret := vercelInternalSecret()
	presented := strings.TrimSpace(r.Header.Get("X-Ds2-Internal-Token"))
	authorized := secret != "" && subtle.ConstantTimeCompare([]byte(presented), []byte(secret)) == 1
	if !authorized {
		writeOpenAIError(w, http.StatusUnauthorized, "unauthorized internal request")
		return
	}

	var body map[string]any
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeOpenAIError(w, http.StatusBadRequest, "invalid json")
		return
	}

	// A missing or non-string lease_id yields "" and is rejected below.
	rawID, _ := body["lease_id"].(string)
	leaseID := strings.TrimSpace(rawID)
	if leaseID == "" {
		writeOpenAIError(w, http.StatusBadRequest, "lease_id is required")
		return
	}

	leaseAuth := h.lookupStreamLeaseAuth(leaseID)
	if leaseAuth == nil {
		writeOpenAIError(w, http.StatusNotFound, "stream lease not found or expired")
		return
	}

	powHeader, powErr := h.DS.GetPow(r.Context(), leaseAuth, 3)
	if powErr != nil {
		writeOpenAIError(w, http.StatusInternalServerError, "Failed to get PoW.")
		return
	}

	response := map[string]any{"pow_header": powHeader}
	writeJSON(w, http.StatusOK, response)
}
// isVercelStreamPrepareRequest reports whether r carries the query parameter
// __stream_prepare with the value "1" (surrounding whitespace ignored).
// A nil request yields false.
func isVercelStreamPrepareRequest(r *http.Request) bool {
	return r != nil && strings.TrimSpace(r.URL.Query().Get("__stream_prepare")) == "1"
}
// isVercelStreamReleaseRequest reports whether r carries the query parameter
// __stream_release with the value "1" (surrounding whitespace ignored).
// A nil request yields false.
func isVercelStreamReleaseRequest(r *http.Request) bool {
	if r != nil {
		flag := strings.TrimSpace(r.URL.Query().Get("__stream_release"))
		return flag == "1"
	}
	return false
}
// isVercelStreamPowRequest reports whether r carries the query parameter
// __stream_pow with the value "1" (surrounding whitespace ignored).
// A nil request yields false.
func isVercelStreamPowRequest(r *http.Request) bool {
	if r == nil {
		return false
	}
	query := r.URL.Query()
	marker := strings.TrimSpace(query.Get("__stream_pow"))
	return marker == "1"
}
// vercelInternalSecret returns the shared secret expected in the
// X-Ds2-Internal-Token header. It uses the first non-blank value (after
// trimming whitespace) of DS2API_VERCEL_INTERNAL_SECRET, then
// DS2API_ADMIN_KEY, and falls back to the literal "admin".
//
// NOTE(review): the "admin" fallback means the result is never empty, so an
// unconfigured deployment accepts a well-known token — confirm this is
// intended.
func vercelInternalSecret() string {
	envNames := [...]string{
		"DS2API_VERCEL_INTERNAL_SECRET",
		"DS2API_ADMIN_KEY",
	}
	for _, name := range envNames {
		if secret := strings.TrimSpace(os.Getenv(name)); secret != "" {
			return secret
		}
	}
	return "admin"
}
// holdStreamLease stores a and sessionID in a new stream lease and returns
// the lease ID. A nil auth yields "" and no lease is created.
//
// The lease expires streamLeaseTTL after creation (15 minutes if that value
// is not positive). Leases that have already expired are removed while the
// lock is held, and their auths are released after it is dropped.
func (h *Handler) holdStreamLease(a *auth.RequestAuth, sessionID string) string {
	if a == nil {
		return ""
	}

	createdAt := time.Now()
	lifetime := streamLeaseTTL()
	if lifetime <= 0 {
		lifetime = 15 * time.Minute
	}
	entry := streamLease{
		Auth:      a,
		SessionID: sessionID,
		ExpiresAt: createdAt.Add(lifetime),
	}

	h.leaseMu.Lock()
	stale := h.popExpiredLeasesLocked(createdAt)
	if h.streamLeases == nil {
		// The map is created lazily on first use.
		h.streamLeases = map[string]streamLease{}
	}
	id := newLeaseID()
	h.streamLeases[id] = entry
	h.leaseMu.Unlock()

	// Release outside the critical section.
	h.releaseExpiredAuths(stale)
	return id
}
// lookupStreamLeaseAuth returns the auth stored under leaseID, or nil when the
// ID is blank, unknown, or the lease's expiry time has passed. The lease is
// left in place; this is a read-only lookup.
func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth {
	id := strings.TrimSpace(leaseID)
	if id == "" {
		return nil
	}

	h.leaseMu.Lock()
	entry, found := h.streamLeases[id]
	h.leaseMu.Unlock()

	if found && !time.Now().After(entry.ExpiresAt) {
		return entry.Auth
	}
	return nil
}
// releaseStreamLease removes the lease stored under leaseID and releases its
// auth through h.Auth (when h.Auth is set).
//
// It returns (true, auth, sessionID) for a lease that was present, and
// (false, nil, "") when the ID is blank or no such lease exists. Expired
// leases are purged first, so a lease whose expiry has passed is reported as
// not found; its auth is still released as part of that purge.
//
// NOTE(review): the returned auth has already been passed to h.Auth.Release;
// confirm callers may still use it afterwards.
func (h *Handler) releaseStreamLease(leaseID string) (bool, *auth.RequestAuth, string) {
	id := strings.TrimSpace(leaseID)
	if id == "" {
		return false, nil, ""
	}

	h.leaseMu.Lock()
	stale := h.popExpiredLeasesLocked(time.Now())
	entry, found := h.streamLeases[id]
	// delete is a no-op when the key is absent, so no guard is needed.
	delete(h.streamLeases, id)
	h.leaseMu.Unlock()

	h.releaseExpiredAuths(stale)

	if !found {
		return false, nil, ""
	}
	if h.Auth != nil {
		h.Auth.Release(entry.Auth)
	}
	return true, entry.Auth, entry.SessionID
}
// popExpiredLeasesLocked deletes every lease whose ExpiresAt lies before now
// and returns the auths those leases held. It returns nil when there are no
// leases at all, and an empty slice when none of them has expired.
//
// It does not take h.leaseMu itself; every caller in this file invokes it with
// the lock already held. Only the auths are returned, not the session IDs of
// the removed leases.
func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth {
	if len(h.streamLeases) == 0 {
		return nil
	}

	released := []*auth.RequestAuth{}
	for id, entry := range h.streamLeases {
		if !now.After(entry.ExpiresAt) {
			continue
		}
		// Deleting the current key during range iteration is permitted in Go.
		delete(h.streamLeases, id)
		released = append(released, entry.Auth)
	}
	return released
}
// releaseExpiredAuths passes each auth in expired to h.Auth.Release. It does
// nothing when h.Auth is nil or the slice is empty.
func (h *Handler) releaseExpiredAuths(expired []*auth.RequestAuth) {
	if h.Auth != nil {
		for i := range expired {
			h.Auth.Release(expired[i])
		}
	}
}
// sweepExpiredStreamLeases removes all expired leases under h.leaseMu and
// then, with the lock dropped, releases the auths they held.
func (h *Handler) sweepExpiredStreamLeases() {
	var stale []*auth.RequestAuth
	func() {
		h.leaseMu.Lock()
		defer h.leaseMu.Unlock()
		stale = h.popExpiredLeasesLocked(time.Now())
	}()
	h.releaseExpiredAuths(stale)
}
// streamLeaseTTL returns the lifetime of a stream lease.
//
// The value is read from DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS, interpreted
// as a whole number of seconds. The default of 15 minutes is returned when the
// variable is unset or blank, is not an integer, is zero or negative, or is so
// large that converting it to a time.Duration would overflow.
func streamLeaseTTL() time.Duration {
	const defaultTTL = 15 * time.Minute
	// Largest number of whole seconds a time.Duration (int64 nanoseconds) can
	// hold. Anything above it would wrap around when multiplied by
	// time.Second and produce a negative or otherwise meaningless TTL.
	const maxSeconds = int64(1<<63-1) / int64(time.Second)

	raw := strings.TrimSpace(os.Getenv("DS2API_VERCEL_STREAM_LEASE_TTL_SECONDS"))
	if raw == "" {
		return defaultTTL
	}
	seconds, err := strconv.Atoi(raw)
	if err != nil || seconds <= 0 || int64(seconds) > maxSeconds {
		return defaultTTL
	}
	return time.Duration(seconds) * time.Second
}
// newLeaseID returns a freshly generated UUID string with all hyphens removed.
func newLeaseID() string {
	id := uuid.NewString()
	return strings.Replace(id, "-", "", -1)
}