diff --git a/internal/httpapi/openai/chat/handler.go b/internal/httpapi/openai/chat/handler.go index 964147e..d91091d 100644 --- a/internal/httpapi/openai/chat/handler.go +++ b/internal/httpapi/openai/chat/handler.go @@ -34,6 +34,7 @@ type Handler struct { type streamLease struct { Auth *auth.RequestAuth Standard promptcompat.StandardRequest + SessionID string ExpiresAt time.Time } diff --git a/internal/httpapi/openai/chat/vercel_prepare_test.go b/internal/httpapi/openai/chat/vercel_prepare_test.go index 07861c2..b881180 100644 --- a/internal/httpapi/openai/chat/vercel_prepare_test.go +++ b/internal/httpapi/openai/chat/vercel_prepare_test.go @@ -68,14 +68,16 @@ func TestVercelInternalSecret(t *testing.T) { func TestStreamLeaseLifecycle(t *testing.T) { h := &Handler{} - leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false}) + leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false}, promptcompat.StandardRequest{}, "test-session-id") if leaseID == "" { t.Fatalf("expected non-empty lease id") } - if ok := h.releaseStreamLease(leaseID); !ok { + if lease, ok := h.releaseStreamLease(leaseID); !ok { t.Fatalf("expected lease release success") + } else if lease.SessionID != "test-session-id" { + t.Fatalf("expected released session id, got %q", lease.SessionID) } - if ok := h.releaseStreamLease(leaseID); ok { + if _, ok := h.releaseStreamLease(leaseID); ok { t.Fatalf("expected duplicate release to fail") } } @@ -210,6 +212,108 @@ func TestHandleVercelStreamPrepareUsesHalfwidthDSMLToolPrompt(t *testing.T) { } } +type vercelReleaseAutoDeleteDSStub struct { + resp *http.Response + deleteCallCount int + deletedSessionID string + deletedToken string + deleteErr error + events *[]string +} + +func (m *vercelReleaseAutoDeleteDSStub) CreateSession(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) { + return "session-id", nil +} + +func (m *vercelReleaseAutoDeleteDSStub) GetPow(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) { + return "pow", nil +} + +func (m *vercelReleaseAutoDeleteDSStub) UploadFile(_ context.Context, _ *auth.RequestAuth, _ dsclient.UploadFileRequest, _ int) (*dsclient.UploadFileResult, error) { + return &dsclient.UploadFileResult{ID: "file-id", Filename: "file.txt", Bytes: 1, Status: "uploaded"}, nil +} + +func (m *vercelReleaseAutoDeleteDSStub) CallCompletion(_ context.Context, _ *auth.RequestAuth, _ map[string]any, _ string, _ int) (*http.Response, error) { + return m.resp, nil +} + +func (m *vercelReleaseAutoDeleteDSStub) DeleteSessionForToken(_ context.Context, token string, sessionID string) (*dsclient.DeleteSessionResult, error) { + if m.events != nil { + *m.events = append(*m.events, "delete") + } + m.deleteCallCount++ + m.deletedSessionID = sessionID + m.deletedToken = token + if m.deleteErr != nil { + return nil, m.deleteErr + } + return &dsclient.DeleteSessionResult{SessionID: sessionID, Success: true}, nil +} + +func (m *vercelReleaseAutoDeleteDSStub) DeleteAllSessionsForToken(_ context.Context, _ string) error { + return nil +} + +type vercelReleaseAuthStub struct { + events *[]string +} + +func (a *vercelReleaseAuthStub) Determine(_ *http.Request) (*auth.RequestAuth, error) { + return &auth.RequestAuth{DeepSeekToken: "test-token", AccountID: "test-account"}, nil +} + +func (a *vercelReleaseAuthStub) DetermineCaller(_ *http.Request) (*auth.RequestAuth, error) { + return &auth.RequestAuth{DeepSeekToken: "test-token", AccountID: "test-account"}, nil +} + +func (a *vercelReleaseAuthStub) Release(_ *auth.RequestAuth) { + if a.events != nil { + *a.events = append(*a.events, "release") + } +} + +func TestHandleVercelStreamReleaseTriggersAutoDelete(t *testing.T) { + t.Setenv("VERCEL", "1") + t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret") + + events := []string{} + ds := &vercelReleaseAutoDeleteDSStub{events: &events} + h := &Handler{ + Store: mockOpenAIConfig{ + autoDeleteMode: "single", + }, + Auth: &vercelReleaseAuthStub{events: &events}, + DS: ds, + } + + leaseID := h.holdStreamLease(&auth.RequestAuth{DeepSeekToken: "test-token", AccountID: "test-account"}, promptcompat.StandardRequest{}, "session-to-delete") + if leaseID == "" { + t.Fatalf("expected non-empty lease id") + } + + reqBody := map[string]any{"lease_id": leaseID} + reqJSON, _ := json.Marshal(reqBody) + req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions?__stream_release=1", strings.NewReader(string(reqJSON))) + req.Header.Set("X-Ds2-Internal-Token", "stream-secret") + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + h.handleVercelStreamRelease(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String()) + } + if ds.deleteCallCount != 1 { + t.Fatalf("expected auto delete call count=1, got %d", ds.deleteCallCount) + } + if ds.deletedSessionID != "session-to-delete" { + t.Fatalf("expected deleted session id=session-to-delete, got %q", ds.deletedSessionID) + } + if got, want := strings.Join(events, ","), "delete,release"; got != want { + t.Fatalf("expected auto-delete before auth release, got %s", got) + } +} + func TestHandleVercelStreamPrepareUploadsToolsSeparately(t *testing.T) { t.Setenv("VERCEL", "1") t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret") @@ -367,7 +471,7 @@ func TestHandleVercelStreamSwitchReuploadsCurrentInputFile(t *testing.T) { RefFileIDs: []string{"file-old", "file-old-tools", "client-file"}, Thinking: true, } - leaseID := h.holdStreamLease(a, stdReq) + leaseID := h.holdStreamLease(a, stdReq, "") req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions?__stream_switch=1", strings.NewReader(`{"lease_id":"`+leaseID+`"}`)) req.Header.Set("X-Ds2-Internal-Token", "stream-secret") rec := httptest.NewRecorder() diff --git a/internal/httpapi/openai/chat/vercel_stream.go b/internal/httpapi/openai/chat/vercel_stream.go index 6e00a37..77b216a 100644 --- a/internal/httpapi/openai/chat/vercel_stream.go +++ b/internal/httpapi/openai/chat/vercel_stream.go @@ -97,7 +97,7 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque } payload := stdReq.CompletionPayload(sessionID) - leaseID := h.holdStreamLease(a, stdReq) + leaseID := h.holdStreamLease(a, stdReq, sessionID) if leaseID == "" { writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease") return @@ -141,10 +141,17 @@ func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Reque writeOpenAIError(w, http.StatusBadRequest, "lease_id is required") return } - if !h.releaseStreamLease(leaseID) { + lease, ok := h.releaseStreamLease(leaseID) + if !ok { writeOpenAIError(w, http.StatusNotFound, "stream lease not found") return } + if h.Auth != nil && lease.Auth != nil { + defer h.Auth.Release(lease.Auth) + } + if lease.Auth != nil { + h.autoDeleteRemoteSession(r.Context(), lease.Auth, lease.SessionID) + } writeJSON(w, http.StatusOK, map[string]any{"success": true}) } @@ -245,7 +252,7 @@ func (h *Handler) handleVercelStreamSwitch(w http.ResponseWriter, r *http.Reques writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.") return } - h.updateStreamLeaseStandard(leaseID, stdReq) + h.updateStreamLeaseState(leaseID, stdReq, sessionID) writeJSON(w, http.StatusOK, map[string]any{ "session_id": sessionID, "lease_id": leaseID, @@ -298,14 +305,10 @@ func vercelInternalSecret() string { return "admin" } -func (h *Handler) holdStreamLease(a *auth.RequestAuth, standards ...promptcompat.StandardRequest) string { +func (h *Handler) holdStreamLease(a *auth.RequestAuth, stdReq promptcompat.StandardRequest, sessionID string) string { if a == nil { return "" } - var stdReq promptcompat.StandardRequest - if len(standards) > 0 { - stdReq = standards[0] - } now := time.Now() ttl := streamLeaseTTL() if ttl <= 0 { @@ -321,6 +324,7 @@ func (h *Handler) holdStreamLease(a *auth.RequestAuth, standards ...promptcompat h.streamLeases[leaseID] = streamLease{ Auth: a, Standard: stdReq, + SessionID: sessionID, ExpiresAt: now.Add(ttl), } h.leaseMu.Unlock() @@ -350,7 +354,7 @@ func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth { return lease.Auth } -func (h *Handler) updateStreamLeaseStandard(leaseID string, stdReq promptcompat.StandardRequest) { +func (h *Handler) updateStreamLeaseState(leaseID string, stdReq promptcompat.StandardRequest, sessionID string) { leaseID = strings.TrimSpace(leaseID) if leaseID == "" { return @@ -362,13 +366,14 @@ func (h *Handler) updateStreamLeaseStandard(leaseID string, stdReq promptcompat. return } lease.Standard = stdReq + lease.SessionID = sessionID h.streamLeases[leaseID] = lease } -func (h *Handler) releaseStreamLease(leaseID string) bool { +func (h *Handler) releaseStreamLease(leaseID string) (streamLease, bool) { leaseID = strings.TrimSpace(leaseID) if leaseID == "" { - return false + return streamLease{}, false } h.leaseMu.Lock() @@ -381,12 +386,9 @@ func (h *Handler) releaseStreamLease(leaseID string) bool { h.releaseExpiredAuths(expired) if !ok { - return false + return streamLease{}, false } - if h.Auth != nil { - h.Auth.Release(lease.Auth) - } - return true + return lease, true } func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth {