diff --git a/.env.example b/.env.example index 41052ad..6812b39 100644 --- a/.env.example +++ b/.env.example @@ -16,9 +16,17 @@ LOG_LEVEL=INFO # Recommended client concurrency is calculated dynamically as: # account_count * DS2API_ACCOUNT_MAX_INFLIGHT # So by default it is account_count * 2. +# Requests beyond inflight slots enter a waiting queue first. +# Default queue size equals recommended concurrency, so 429 starts after: +# account_count * DS2API_ACCOUNT_MAX_INFLIGHT * 2 # Alias: DS2API_ACCOUNT_CONCURRENCY # DS2API_ACCOUNT_MAX_INFLIGHT=2 +# Optional waiting queue size override for managed-key mode. +# Default: recommended_concurrency (same as account_count * inflight_limit) +# Alias: DS2API_ACCOUNT_QUEUE_SIZE +# DS2API_ACCOUNT_MAX_QUEUE=10 + # --------------------------------------------------------------- # Admin auth # --------------------------------------------------------------- diff --git a/DEPLOY.en.md b/DEPLOY.en.md index 0bd5838..10341e1 100644 --- a/DEPLOY.en.md +++ b/DEPLOY.en.md @@ -79,8 +79,11 @@ Optional: - `VERCEL_TEAM_ID` - `DS2API_ACCOUNT_MAX_INFLIGHT` (per-account inflight limit, default `2`) - `DS2API_ACCOUNT_CONCURRENCY` (alias of the same setting) +- `DS2API_ACCOUNT_MAX_QUEUE` (waiting queue limit, default=`recommended_concurrency`) +- `DS2API_ACCOUNT_QUEUE_SIZE` (alias of the same setting) Recommended concurrency is computed dynamically as `account_count * per_account_inflight_limit` (default is `account_count * 2`). +When inflight slots are full, requests are queued first; with default queue size, 429 typically starts around `account_count * 4`. 
Notes: - `static/admin` build output is not committed diff --git a/DEPLOY.md b/DEPLOY.md index 3c185b2..a063b2b 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -79,8 +79,11 @@ docker-compose up -d --build - `VERCEL_TEAM_ID` - `DS2API_ACCOUNT_MAX_INFLIGHT`(每账号并发上限,默认 `2`) - `DS2API_ACCOUNT_CONCURRENCY`(同上别名) +- `DS2API_ACCOUNT_MAX_QUEUE`(等待队列上限,默认=`recommended_concurrency`) +- `DS2API_ACCOUNT_QUEUE_SIZE`(同上别名) 并发建议值会动态按 `账号数量 × 每账号并发上限` 计算(默认即 `账号数量 × 2`)。 +当 in-flight 满时,请求先进入等待队列;默认队列上限等于建议并发值,因此默认 429 阈值约为 `账号数量 × 4`。 说明: - 仓库不提交 `static/admin` 构建产物 diff --git a/README.MD b/README.MD index 6831b98..7d3190b 100644 --- a/README.MD +++ b/README.MD @@ -156,6 +156,8 @@ cp config.example.json config.json | `LOG_LEVEL` | 日志级别:`DEBUG/INFO/WARN/ERROR` | | `DS2API_ACCOUNT_MAX_INFLIGHT` | 每个账号最大并发 in-flight 请求数,默认 `2` | | `DS2API_ACCOUNT_CONCURRENCY` | 同上别名(兼容旧写法) | +| `DS2API_ACCOUNT_MAX_QUEUE` | 等待队列上限,默认等于 `recommended_concurrency` | +| `DS2API_ACCOUNT_QUEUE_SIZE` | 同上别名(兼容旧写法) | | `DS2API_ADMIN_KEY` | Admin 登录密钥,默认 `admin` | | `DS2API_JWT_SECRET` | Admin JWT 签名密钥(可选) | | `DS2API_JWT_EXPIRE_HOURS` | Admin JWT 过期小时数,默认 `24` | @@ -181,8 +183,12 @@ cp config.example.json config.json - 系统建议并发值按账号池动态计算:`账号数量 × 每账号并发上限` - 默认每账号并发上限是 `2`,因此默认建议值是 `账号数量 × 2` +- 当 in-flight 槽位满时,请求会进入等待队列,不会立即 429 +- 默认等待队列上限 = `recommended_concurrency`,因此默认总承载上限是 `账号数量 × 4` +- 超过总承载上限(in-flight + waiting)才返回 `429` - 可通过 `DS2API_ACCOUNT_MAX_INFLIGHT`(或 `DS2API_ACCOUNT_CONCURRENCY`)手动覆盖每账号并发上限 -- `GET /admin/queue/status` 会返回 `max_inflight_per_account` 与 `recommended_concurrency` +- 可通过 `DS2API_ACCOUNT_MAX_QUEUE`(或 `DS2API_ACCOUNT_QUEUE_SIZE`)手动覆盖等待队列上限 +- `GET /admin/queue/status` 会返回 `max_inflight_per_account`、`recommended_concurrency`、`waiting`、`max_queue_size` ## Tool Call 适配说明 diff --git a/README.en.md b/README.en.md index c6c8655..143b978 100644 --- a/README.en.md +++ b/README.en.md @@ -156,6 +156,8 @@ cp config.example.json config.json | `LOG_LEVEL` | `DEBUG/INFO/WARN/ERROR` | | 
`DS2API_ACCOUNT_MAX_INFLIGHT` | Max in-flight requests per managed account, default `2` | | `DS2API_ACCOUNT_CONCURRENCY` | Alias of the same setting (legacy compatibility) | +| `DS2API_ACCOUNT_MAX_QUEUE` | Waiting queue limit (managed-key mode), default=`recommended_concurrency` | +| `DS2API_ACCOUNT_QUEUE_SIZE` | Alias of the same setting (legacy compatibility) | | `DS2API_ADMIN_KEY` | Admin login key, default `admin` | | `DS2API_JWT_SECRET` | Admin JWT signing secret (optional) | | `DS2API_JWT_EXPIRE_HOURS` | Admin JWT TTL in hours, default `24` | @@ -181,8 +183,12 @@ Optional header: `X-Ds2-Target-Account` to pin one managed account. - DS2API computes recommended concurrency dynamically as: `account_count * per_account_inflight_limit` - Default per-account inflight limit is `2`, so default recommendation is `account_count * 2` +- When inflight slots are full, requests enter a waiting queue instead of immediate 429 +- Default queue limit equals `recommended_concurrency`, so default 429 threshold is about `account_count * 4` +- 429 is returned only after total load exceeds `inflight + waiting` capacity - You can override per-account inflight via `DS2API_ACCOUNT_MAX_INFLIGHT` (or `DS2API_ACCOUNT_CONCURRENCY`) -- `GET /admin/queue/status` returns both `max_inflight_per_account` and `recommended_concurrency` +- You can override waiting queue size via `DS2API_ACCOUNT_MAX_QUEUE` (or `DS2API_ACCOUNT_QUEUE_SIZE`) +- `GET /admin/queue/status` returns `max_inflight_per_account`, `recommended_concurrency`, `waiting`, and `max_queue_size` ## Tool Call Adaptation diff --git a/internal/account/pool.go b/internal/account/pool.go index ded6893..665bcee 100644 --- a/internal/account/pool.go +++ b/internal/account/pool.go @@ -1,6 +1,7 @@ package account import ( + "context" "os" "sort" "strconv" @@ -11,12 +12,14 @@ import ( ) type Pool struct { - store *config.Store - mu sync.Mutex - queue []string - inUse map[string]int - maxInflightPerAccount int + store *config.Store + mu 
sync.Mutex + queue []string + inUse map[string]int + waiters []chan struct{} + maxInflightPerAccount int recommendedConcurrency int + maxQueueSize int } func NewPool(store *config.Store) *Pool { @@ -47,25 +50,64 @@ func (p *Pool) Reset() { } } recommended := defaultRecommendedConcurrency(len(ids), p.maxInflightPerAccount) + queueLimit := maxQueueFromEnv(recommended) p.mu.Lock() defer p.mu.Unlock() + p.drainWaitersLocked() p.queue = ids p.inUse = map[string]int{} p.recommendedConcurrency = recommended + p.maxQueueSize = queueLimit config.Logger.Info( "[init_account_queue] initialized", "total", len(ids), "max_inflight_per_account", p.maxInflightPerAccount, "recommended_concurrency", p.recommendedConcurrency, + "max_queue_size", p.maxQueueSize, ) } func (p *Pool) Acquire(target string, exclude map[string]bool) (config.Account, bool) { p.mu.Lock() defer p.mu.Unlock() - if exclude == nil { - exclude = map[string]bool{} + return p.acquireLocked(target, normalizeExclude(exclude)) +} + +func (p *Pool) AcquireWait(ctx context.Context, target string, exclude map[string]bool) (config.Account, bool) { + if ctx == nil { + ctx = context.Background() } + exclude = normalizeExclude(exclude) + for { + if ctx.Err() != nil { + return config.Account{}, false + } + + p.mu.Lock() + if acc, ok := p.acquireLocked(target, exclude); ok { + p.mu.Unlock() + return acc, true + } + if !p.canQueueLocked(target, exclude) { + p.mu.Unlock() + return config.Account{}, false + } + waiter := make(chan struct{}) + p.waiters = append(p.waiters, waiter) + p.mu.Unlock() + + select { + case <-ctx.Done(): + p.mu.Lock() + p.removeWaiterLocked(waiter) + p.mu.Unlock() + return config.Account{}, false + case <-waiter: + } + } +} + +func (p *Pool) acquireLocked(target string, exclude map[string]bool) (config.Account, bool) { if target != "" { if exclude[target] || p.inUse[target] >= p.maxInflightPerAccount { return config.Account{}, false @@ -131,9 +173,11 @@ func (p *Pool) Release(accountID string) { } if 
count == 1 { delete(p.inUse, accountID) + p.notifyWaiterLocked() return } p.inUse[accountID] = count - 1 + p.notifyWaiterLocked() } func (p *Pool) Status() map[string]any { @@ -162,6 +206,8 @@ func (p *Pool) Status() map[string]any { "in_use_accounts": inUseAccounts, "max_inflight_per_account": p.maxInflightPerAccount, "recommended_concurrency": p.recommendedConcurrency, + "waiting": len(p.waiters), + "max_queue_size": p.maxQueueSize, } } @@ -188,3 +234,69 @@ func defaultRecommendedConcurrency(accountCount, maxInflightPerAccount int) int } return accountCount * maxInflightPerAccount } + +func normalizeExclude(exclude map[string]bool) map[string]bool { + if exclude == nil { + return map[string]bool{} + } + return exclude +} + +func (p *Pool) canQueueLocked(target string, exclude map[string]bool) bool { + if target != "" { + if exclude[target] { + return false + } + if _, ok := p.store.FindAccount(target); !ok { + return false + } + } + if p.maxQueueSize <= 0 { + return false + } + return len(p.waiters) < p.maxQueueSize +} + +func (p *Pool) notifyWaiterLocked() { + if len(p.waiters) == 0 { + return + } + waiter := p.waiters[0] + p.waiters = p.waiters[1:] + close(waiter) +} + +func (p *Pool) removeWaiterLocked(waiter chan struct{}) bool { + for i, w := range p.waiters { + if w != waiter { + continue + } + p.waiters = append(p.waiters[:i], p.waiters[i+1:]...) 
+ return true + } + return false +} + +func (p *Pool) drainWaitersLocked() { + for _, waiter := range p.waiters { + close(waiter) + } + p.waiters = nil +} + +func maxQueueFromEnv(defaultSize int) int { + for _, key := range []string{"DS2API_ACCOUNT_MAX_QUEUE", "DS2API_ACCOUNT_QUEUE_SIZE"} { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + continue + } + n, err := strconv.Atoi(raw) + if err == nil && n >= 0 { + return n + } + } + if defaultSize < 0 { + return 0 + } + return defaultSize +} diff --git a/internal/account/pool_test.go b/internal/account/pool_test.go index 8d0348b..59ea32b 100644 --- a/internal/account/pool_test.go +++ b/internal/account/pool_test.go @@ -1,8 +1,10 @@ package account import ( + "context" "sync" "testing" + "time" "ds2api/internal/config" ) @@ -10,6 +12,9 @@ import ( func newPoolForTest(t *testing.T, maxInflight string) *Pool { t.Helper() t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", maxInflight) + t.Setenv("DS2API_ACCOUNT_CONCURRENCY", "") + t.Setenv("DS2API_ACCOUNT_MAX_QUEUE", "") + t.Setenv("DS2API_ACCOUNT_QUEUE_SIZE", "") t.Setenv("DS2API_CONFIG_JSON", `{ "keys":["k1"], "accounts":[ @@ -21,6 +26,33 @@ func newPoolForTest(t *testing.T, maxInflight string) *Pool { return NewPool(store) } +func newSingleAccountPoolForTest(t *testing.T, maxInflight string) *Pool { + t.Helper() + t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", maxInflight) + t.Setenv("DS2API_ACCOUNT_CONCURRENCY", "") + t.Setenv("DS2API_ACCOUNT_MAX_QUEUE", "") + t.Setenv("DS2API_ACCOUNT_QUEUE_SIZE", "") + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["k1"], + "accounts":[{"email":"acc1@example.com","token":"token1"}] + }`) + return NewPool(config.LoadStore()) +} + +func waitForWaitingCount(t *testing.T, pool *Pool, want int) { + t.Helper() + deadline := time.Now().Add(800 * time.Millisecond) + for time.Now().Before(deadline) { + status := pool.Status() + if got, ok := status["waiting"].(int); ok && got == want { + return + } + time.Sleep(10 * time.Millisecond) + } + status := 
pool.Status() + t.Fatalf("waiting count did not reach %d, current status=%v", want, status) +} + func TestPoolRoundRobinWithConcurrentSlots(t *testing.T) { pool := newPoolForTest(t, "2") @@ -118,6 +150,9 @@ func TestPoolStatusRecommendedConcurrencyDefault(t *testing.T) { if got, ok := status["recommended_concurrency"].(int); !ok || got != 4 { t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"]) } + if got, ok := status["max_queue_size"].(int); !ok || got != 4 { + t.Fatalf("unexpected max_queue_size: %#v", status["max_queue_size"]) + } } func TestPoolStatusRecommendedConcurrencyRespectsOverride(t *testing.T) { @@ -130,6 +165,9 @@ func TestPoolStatusRecommendedConcurrencyRespectsOverride(t *testing.T) { if got, ok := status["recommended_concurrency"].(int); !ok || got != 6 { t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"]) } + if got, ok := status["max_queue_size"].(int); !ok || got != 6 { + t.Fatalf("unexpected max_queue_size: %#v", status["max_queue_size"]) + } } func TestPoolAccountConcurrencyAliasEnv(t *testing.T) { @@ -151,6 +189,9 @@ func TestPoolAccountConcurrencyAliasEnv(t *testing.T) { if got, ok := status["recommended_concurrency"].(int); !ok || got != 8 { t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"]) } + if got, ok := status["max_queue_size"].(int); !ok || got != 8 { + t.Fatalf("unexpected max_queue_size: %#v", status["max_queue_size"]) + } } func TestPoolSupportsTokenOnlyAccount(t *testing.T) { @@ -177,3 +218,79 @@ func TestPoolSupportsTokenOnlyAccount(t *testing.T) { t.Fatalf("unexpected token on acquired account: %q", acc.Token) } } + +func TestPoolAcquireWaitQueuesAndSucceedsAfterRelease(t *testing.T) { + pool := newSingleAccountPoolForTest(t, "1") + first, ok := pool.Acquire("", nil) + if !ok { + t.Fatal("expected first acquire to succeed") + } + + type result struct { + id string + ok bool + } + resCh := make(chan result, 1) + ctx, cancel 
:= context.WithTimeout(context.Background(), time.Second) + defer cancel() + go func() { + acc, ok := pool.AcquireWait(ctx, "", nil) + resCh <- result{id: acc.Identifier(), ok: ok} + }() + + waitForWaitingCount(t, pool, 1) + pool.Release(first.Identifier()) + + select { + case res := <-resCh: + if !res.ok { + t.Fatal("expected queued acquire to succeed after release") + } + if res.id != "acc1@example.com" { + t.Fatalf("unexpected account id from queued acquire: %q", res.id) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for queued acquire result") + } +} + +func TestPoolAcquireWaitQueueLimitReturnsFalse(t *testing.T) { + pool := newSingleAccountPoolForTest(t, "1") + first, ok := pool.Acquire("", nil) + if !ok { + t.Fatal("expected first acquire to succeed") + } + + type result struct { + id string + ok bool + } + firstWaiter := make(chan result, 1) + ctx1, cancel1 := context.WithTimeout(context.Background(), 1200*time.Millisecond) + defer cancel1() + go func() { + acc, ok := pool.AcquireWait(ctx1, "", nil) + firstWaiter <- result{id: acc.Identifier(), ok: ok} + }() + waitForWaitingCount(t, pool, 1) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel2() + start := time.Now() + if _, ok := pool.AcquireWait(ctx2, "", nil); ok { + t.Fatal("expected second queued acquire to fail when queue is full") + } + if time.Since(start) > 120*time.Millisecond { + t.Fatalf("queue-full acquire should fail fast, took %s", time.Since(start)) + } + + pool.Release(first.Identifier()) + select { + case res := <-firstWaiter: + if !res.ok { + t.Fatal("expected first queued acquire to succeed after release") + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for first queued acquire") + } +} diff --git a/internal/auth/request.go b/internal/auth/request.go index d36f4b1..ea3d7f1 100644 --- a/internal/auth/request.go +++ b/internal/auth/request.go @@ -50,7 +50,7 @@ func (r *Resolver) Determine(req 
*http.Request) (*RequestAuth, error) { return &RequestAuth{UseConfigToken: false, DeepSeekToken: callerKey, resolver: r, TriedAccounts: map[string]bool{}}, nil } target := strings.TrimSpace(req.Header.Get("X-Ds2-Target-Account")) - acc, ok := r.Pool.Acquire(target, nil) + acc, ok := r.Pool.AcquireWait(req.Context(), target, nil) if !ok { return nil, ErrNoAccount }