Merge branch 'pr-474' into dev

# Conflicts:
#	internal/httpapi/openai/chat/handler.go
#	internal/httpapi/openai/chat/vercel_prepare_test.go
#	internal/httpapi/openai/chat/vercel_stream.go
This commit is contained in:
CJACK
2026-05-10 16:28:36 +08:00
3 changed files with 127 additions and 20 deletions

View File

@@ -34,6 +34,7 @@ type Handler struct {
type streamLease struct {
Auth *auth.RequestAuth
Standard promptcompat.StandardRequest
SessionID string
ExpiresAt time.Time
}

View File

@@ -68,14 +68,16 @@ func TestVercelInternalSecret(t *testing.T) {
func TestStreamLeaseLifecycle(t *testing.T) {
h := &Handler{}
leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false})
leaseID := h.holdStreamLease(&auth.RequestAuth{UseConfigToken: false}, promptcompat.StandardRequest{}, "test-session-id")
if leaseID == "" {
t.Fatalf("expected non-empty lease id")
}
if ok := h.releaseStreamLease(leaseID); !ok {
if lease, ok := h.releaseStreamLease(leaseID); !ok {
t.Fatalf("expected lease release success")
} else if lease.SessionID != "test-session-id" {
t.Fatalf("expected released session id, got %q", lease.SessionID)
}
if ok := h.releaseStreamLease(leaseID); ok {
if _, ok := h.releaseStreamLease(leaseID); ok {
t.Fatalf("expected duplicate release to fail")
}
}
@@ -210,6 +212,108 @@ func TestHandleVercelStreamPrepareUsesHalfwidthDSMLToolPrompt(t *testing.T) {
}
}
type vercelReleaseAutoDeleteDSStub struct {
resp *http.Response
deleteCallCount int
deletedSessionID string
deletedToken string
deleteErr error
events *[]string
}
func (m *vercelReleaseAutoDeleteDSStub) CreateSession(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) {
return "session-id", nil
}
func (m *vercelReleaseAutoDeleteDSStub) GetPow(_ context.Context, _ *auth.RequestAuth, _ int) (string, error) {
return "pow", nil
}
func (m *vercelReleaseAutoDeleteDSStub) UploadFile(_ context.Context, _ *auth.RequestAuth, _ dsclient.UploadFileRequest, _ int) (*dsclient.UploadFileResult, error) {
return &dsclient.UploadFileResult{ID: "file-id", Filename: "file.txt", Bytes: 1, Status: "uploaded"}, nil
}
func (m *vercelReleaseAutoDeleteDSStub) CallCompletion(_ context.Context, _ *auth.RequestAuth, _ map[string]any, _ string, _ int) (*http.Response, error) {
return m.resp, nil
}
func (m *vercelReleaseAutoDeleteDSStub) DeleteSessionForToken(_ context.Context, token string, sessionID string) (*dsclient.DeleteSessionResult, error) {
if m.events != nil {
*m.events = append(*m.events, "delete")
}
m.deleteCallCount++
m.deletedSessionID = sessionID
m.deletedToken = token
if m.deleteErr != nil {
return nil, m.deleteErr
}
return &dsclient.DeleteSessionResult{SessionID: sessionID, Success: true}, nil
}
func (m *vercelReleaseAutoDeleteDSStub) DeleteAllSessionsForToken(_ context.Context, _ string) error {
return nil
}
type vercelReleaseAuthStub struct {
events *[]string
}
func (a *vercelReleaseAuthStub) Determine(_ *http.Request) (*auth.RequestAuth, error) {
return &auth.RequestAuth{DeepSeekToken: "test-token", AccountID: "test-account"}, nil
}
func (a *vercelReleaseAuthStub) DetermineCaller(_ *http.Request) (*auth.RequestAuth, error) {
return &auth.RequestAuth{DeepSeekToken: "test-token", AccountID: "test-account"}, nil
}
func (a *vercelReleaseAuthStub) Release(_ *auth.RequestAuth) {
if a.events != nil {
*a.events = append(*a.events, "release")
}
}
func TestHandleVercelStreamReleaseTriggersAutoDelete(t *testing.T) {
t.Setenv("VERCEL", "1")
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret")
events := []string{}
ds := &vercelReleaseAutoDeleteDSStub{events: &events}
h := &Handler{
Store: mockOpenAIConfig{
autoDeleteMode: "single",
},
Auth: &vercelReleaseAuthStub{events: &events},
DS: ds,
}
leaseID := h.holdStreamLease(&auth.RequestAuth{DeepSeekToken: "test-token", AccountID: "test-account"}, promptcompat.StandardRequest{}, "session-to-delete")
if leaseID == "" {
t.Fatalf("expected non-empty lease id")
}
reqBody := map[string]any{"lease_id": leaseID}
reqJSON, _ := json.Marshal(reqBody)
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions?__stream_release=1", strings.NewReader(string(reqJSON)))
req.Header.Set("X-Ds2-Internal-Token", "stream-secret")
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
h.handleVercelStreamRelease(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("expected 200, got %d body=%s", rec.Code, rec.Body.String())
}
if ds.deleteCallCount != 1 {
t.Fatalf("expected auto delete call count=1, got %d", ds.deleteCallCount)
}
if ds.deletedSessionID != "session-to-delete" {
t.Fatalf("expected deleted session id=session-to-delete, got %q", ds.deletedSessionID)
}
if got, want := strings.Join(events, ","), "delete,release"; got != want {
t.Fatalf("expected auto-delete before auth release, got %s", got)
}
}
func TestHandleVercelStreamPrepareUploadsToolsSeparately(t *testing.T) {
t.Setenv("VERCEL", "1")
t.Setenv("DS2API_VERCEL_INTERNAL_SECRET", "stream-secret")
@@ -367,7 +471,7 @@ func TestHandleVercelStreamSwitchReuploadsCurrentInputFile(t *testing.T) {
RefFileIDs: []string{"file-old", "file-old-tools", "client-file"},
Thinking: true,
}
leaseID := h.holdStreamLease(a, stdReq)
leaseID := h.holdStreamLease(a, stdReq, "")
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions?__stream_switch=1", strings.NewReader(`{"lease_id":"`+leaseID+`"}`))
req.Header.Set("X-Ds2-Internal-Token", "stream-secret")
rec := httptest.NewRecorder()

View File

@@ -97,7 +97,7 @@ func (h *Handler) handleVercelStreamPrepare(w http.ResponseWriter, r *http.Reque
}
payload := stdReq.CompletionPayload(sessionID)
leaseID := h.holdStreamLease(a, stdReq)
leaseID := h.holdStreamLease(a, stdReq, sessionID)
if leaseID == "" {
writeOpenAIError(w, http.StatusInternalServerError, "failed to create stream lease")
return
@@ -141,10 +141,17 @@ func (h *Handler) handleVercelStreamRelease(w http.ResponseWriter, r *http.Reque
writeOpenAIError(w, http.StatusBadRequest, "lease_id is required")
return
}
if !h.releaseStreamLease(leaseID) {
lease, ok := h.releaseStreamLease(leaseID)
if !ok {
writeOpenAIError(w, http.StatusNotFound, "stream lease not found")
return
}
if h.Auth != nil && lease.Auth != nil {
defer h.Auth.Release(lease.Auth)
}
if lease.Auth != nil {
h.autoDeleteRemoteSession(r.Context(), lease.Auth, lease.SessionID)
}
writeJSON(w, http.StatusOK, map[string]any{"success": true})
}
@@ -245,7 +252,7 @@ func (h *Handler) handleVercelStreamSwitch(w http.ResponseWriter, r *http.Reques
writeOpenAIError(w, http.StatusUnauthorized, "Account token is invalid. Please re-login the account in admin.")
return
}
h.updateStreamLeaseStandard(leaseID, stdReq)
h.updateStreamLeaseState(leaseID, stdReq, sessionID)
writeJSON(w, http.StatusOK, map[string]any{
"session_id": sessionID,
"lease_id": leaseID,
@@ -298,14 +305,10 @@ func vercelInternalSecret() string {
return "admin"
}
func (h *Handler) holdStreamLease(a *auth.RequestAuth, standards ...promptcompat.StandardRequest) string {
func (h *Handler) holdStreamLease(a *auth.RequestAuth, stdReq promptcompat.StandardRequest, sessionID string) string {
if a == nil {
return ""
}
var stdReq promptcompat.StandardRequest
if len(standards) > 0 {
stdReq = standards[0]
}
now := time.Now()
ttl := streamLeaseTTL()
if ttl <= 0 {
@@ -321,6 +324,7 @@ func (h *Handler) holdStreamLease(a *auth.RequestAuth, standards ...promptcompat
h.streamLeases[leaseID] = streamLease{
Auth: a,
Standard: stdReq,
SessionID: sessionID,
ExpiresAt: now.Add(ttl),
}
h.leaseMu.Unlock()
@@ -350,7 +354,7 @@ func (h *Handler) lookupStreamLeaseAuth(leaseID string) *auth.RequestAuth {
return lease.Auth
}
func (h *Handler) updateStreamLeaseStandard(leaseID string, stdReq promptcompat.StandardRequest) {
func (h *Handler) updateStreamLeaseState(leaseID string, stdReq promptcompat.StandardRequest, sessionID string) {
leaseID = strings.TrimSpace(leaseID)
if leaseID == "" {
return
@@ -362,13 +366,14 @@ func (h *Handler) updateStreamLeaseStandard(leaseID string, stdReq promptcompat.
return
}
lease.Standard = stdReq
lease.SessionID = sessionID
h.streamLeases[leaseID] = lease
}
func (h *Handler) releaseStreamLease(leaseID string) bool {
func (h *Handler) releaseStreamLease(leaseID string) (streamLease, bool) {
leaseID = strings.TrimSpace(leaseID)
if leaseID == "" {
return false
return streamLease{}, false
}
h.leaseMu.Lock()
@@ -381,12 +386,9 @@ func (h *Handler) releaseStreamLease(leaseID string) bool {
h.releaseExpiredAuths(expired)
if !ok {
return false
return streamLease{}, false
}
if h.Auth != nil {
h.Auth.Release(lease.Auth)
}
return true
return lease, true
}
func (h *Handler) popExpiredLeasesLocked(now time.Time) []*auth.RequestAuth {