diff --git a/.env.example b/.env.example index 3b0a6aa..b3db288 100644 --- a/.env.example +++ b/.env.example @@ -11,6 +11,14 @@ PORT=5001 # Log level: DEBUG | INFO | WARN | ERROR LOG_LEVEL=INFO +# Max concurrent inflight requests per account in managed-key mode. +# Default: 2 +# Recommended client concurrency is calculated dynamically as: +# account_count * DS2API_ACCOUNT_MAX_INFLIGHT +# So by default it is account_count * 2. +# Alias: DS2API_ACCOUNT_CONCURRENCY +# DS2API_ACCOUNT_MAX_INFLIGHT=2 + # --------------------------------------------------------------- # Admin auth # --------------------------------------------------------------- diff --git a/API.en.md b/API.en.md index 14fbd50..678e6b0 100644 --- a/API.en.md +++ b/API.en.md @@ -460,10 +460,17 @@ Response: "in_use": 1, "total": 4, "available_accounts": ["a@example.com"], - "in_use_accounts": ["b@example.com"] + "in_use_accounts": ["b@example.com"], + "max_inflight_per_account": 2, + "recommended_concurrency": 8 } ``` +Field notes: + +- `max_inflight_per_account`: per-account in-flight limit (default `2`, override via env) +- `recommended_concurrency`: suggested client concurrency, dynamically computed as `account_count * max_inflight_per_account` + ### `POST /admin/accounts/test` Request fields: diff --git a/API.md b/API.md index 9ee139f..55f34e5 100644 --- a/API.md +++ b/API.md @@ -478,10 +478,17 @@ data: {"type":"message_stop"} "in_use": 1, "total": 4, "available_accounts": ["a@example.com"], - "in_use_accounts": ["b@example.com"] + "in_use_accounts": ["b@example.com"], + "max_inflight_per_account": 2, + "recommended_concurrency": 8 } ``` +字段说明: + +- `max_inflight_per_account`:每个账号允许的并发 in-flight 请求上限(默认 `2`,可由环境变量覆盖) +- `recommended_concurrency`:建议客户端并发值,按 `账号数量 × max_inflight_per_account` 动态计算 + ### `POST /admin/accounts/test` 请求字段: diff --git a/DEPLOY.en.md b/DEPLOY.en.md index 17555fb..fb61884 100644 --- a/DEPLOY.en.md +++ b/DEPLOY.en.md @@ -73,6 +73,10 @@ Optional: - `VERCEL_TOKEN` - `VERCEL_PROJECT_ID` - `VERCEL_TEAM_ID` +- `DS2API_ACCOUNT_MAX_INFLIGHT` (per-account inflight limit, default `2`) +- `DS2API_ACCOUNT_CONCURRENCY` (alias of the same setting) + +Recommended concurrency is computed dynamically as `account_count * per_account_inflight_limit` (default is `account_count * 2`). After deploy, verify: diff --git a/DEPLOY.md b/DEPLOY.md index 1017e9e..6d23746 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -73,6 +73,10 @@ docker-compose up -d --build - `VERCEL_TOKEN` - `VERCEL_PROJECT_ID` - `VERCEL_TEAM_ID` +- `DS2API_ACCOUNT_MAX_INFLIGHT`(每账号并发上限,默认 `2`) +- `DS2API_ACCOUNT_CONCURRENCY`(同上别名) + +并发建议值会动态按 `账号数量 × 每账号并发上限` 计算(默认即 `账号数量 × 2`)。 部署后建议先访问: diff --git a/README.MD b/README.MD index 2f49d55..08ccf11 100644 --- a/README.MD +++ b/README.MD @@ -121,6 +121,8 @@ docker-compose logs -f | --- | --- | | `PORT` | 服务端口,默认 `5001` | | `LOG_LEVEL` | 日志级别:`DEBUG/INFO/WARN/ERROR` | +| `DS2API_ACCOUNT_MAX_INFLIGHT` | 每个账号最大并发 in-flight 请求数,默认 `2` | +| `DS2API_ACCOUNT_CONCURRENCY` | 同上别名(兼容旧写法) | | `DS2API_ADMIN_KEY` | Admin 登录密钥,默认 `admin` | | `DS2API_JWT_SECRET` | Admin JWT 签名密钥(可选) | | `DS2API_JWT_EXPIRE_HOURS` | Admin JWT 过期小时数,默认 `24` | @@ -141,6 +143,13 @@ docker-compose logs -f 可选请求头:`X-Ds2-Target-Account`,用于指定托管账号。 +## 并发建议值 + +- 系统建议并发值按账号池动态计算:`账号数量 × 每账号并发上限` +- 默认每账号并发上限是 `2`,因此默认建议值是 `账号数量 × 2` +- 可通过 `DS2API_ACCOUNT_MAX_INFLIGHT`(或 `DS2API_ACCOUNT_CONCURRENCY`)手动覆盖每账号并发上限 +- `GET /admin/queue/status` 会返回 `max_inflight_per_account` 与 `recommended_concurrency` + ## Tool Call 适配说明 当前实现对 toolcall 做了防泄漏处理: diff --git a/README.en.md b/README.en.md index 82ca039..5fa9e29 100644 --- a/README.en.md +++ b/README.en.md @@ -121,6 +121,8 @@ docker-compose logs -f | --- | --- | | `PORT` | Service port, default `5001` | | `LOG_LEVEL` | `DEBUG/INFO/WARN/ERROR` | +| `DS2API_ACCOUNT_MAX_INFLIGHT` | Max in-flight requests per managed account, default `2` | +| `DS2API_ACCOUNT_CONCURRENCY` | Alias of the same setting (legacy compatibility) | | `DS2API_ADMIN_KEY` | Admin login key, default `admin` | | `DS2API_JWT_SECRET` | Admin JWT signing secret (optional) | | `DS2API_JWT_EXPIRE_HOURS` | Admin JWT TTL in hours, default `24` | @@ -141,6 +143,13 @@ For business endpoints (`/v1/*`, `/anthropic/*`), DS2API supports two modes: Optional header: `X-Ds2-Target-Account` to pin one managed account. +## Recommended Concurrency + +- DS2API computes recommended concurrency dynamically as: `account_count * per_account_inflight_limit` +- Default per-account inflight limit is `2`, so default recommendation is `account_count * 2` +- You can override per-account inflight via `DS2API_ACCOUNT_MAX_INFLIGHT` (or `DS2API_ACCOUNT_CONCURRENCY`) +- `GET /admin/queue/status` returns both `max_inflight_per_account` and `recommended_concurrency` + ## Tool Call Adaptation Tool-call leakage is handled in the current implementation: diff --git a/internal/account/pool.go b/internal/account/pool.go index 285299b..ded6893 100644 --- a/internal/account/pool.go +++ b/internal/account/pool.go @@ -1,21 +1,30 @@ package account import ( + "os" "sort" + "strconv" + "strings" "sync" "ds2api/internal/config" ) type Pool struct { - store *config.Store - mu sync.Mutex - queue []string - inUse map[string]bool + store *config.Store + mu sync.Mutex + queue []string + inUse map[string]int + maxInflightPerAccount int + recommendedConcurrency int } func NewPool(store *config.Store) *Pool { - p := &Pool{store: store, inUse: map[string]bool{}} + p := &Pool{ + store: store, + inUse: map[string]int{}, + maxInflightPerAccount: maxInflightFromEnv(), + } p.Reset() return p } @@ -37,91 +46,145 @@ func (p *Pool) Reset() { ids = append(ids, id) } } + recommended := defaultRecommendedConcurrency(len(ids), p.maxInflightPerAccount) p.mu.Lock() defer p.mu.Unlock() p.queue = ids - p.inUse = map[string]bool{} - config.Logger.Info("[init_account_queue] initialized", "total", len(ids)) + p.inUse = map[string]int{} + p.recommendedConcurrency = recommended + config.Logger.Info( + "[init_account_queue] initialized", + "total", len(ids), + "max_inflight_per_account", p.maxInflightPerAccount, + "recommended_concurrency", p.recommendedConcurrency, + ) } func (p *Pool) Acquire(target string, exclude map[string]bool) (config.Account, bool) { p.mu.Lock() defer p.mu.Unlock() - if target != "" { - for i, id := range p.queue { - if id != target { - continue - } - acc, ok := p.store.FindAccount(id) - if !ok { - return config.Account{}, false - } - p.queue = append(p.queue[:i], p.queue[i+1:]...) - p.inUse[id] = true - return acc, true - } - return config.Account{}, false + if exclude == nil { + exclude = map[string]bool{} } - - for i := 0; i < len(p.queue); i++ { - id := p.queue[i] - if exclude[id] { - continue + if target != "" { + if exclude[target] || p.inUse[target] >= p.maxInflightPerAccount { + return config.Account{}, false } - acc, ok := p.store.FindAccount(id) + acc, ok := p.store.FindAccount(target) if !ok { - continue + return config.Account{}, false } - if acc.Token == "" { - continue - } - p.queue = append(p.queue[:i], p.queue[i+1:]...) - p.inUse[id] = true + p.inUse[target]++ + p.bumpQueue(target) return acc, true } - for i := 0; i < len(p.queue); i++ { - id := p.queue[i] - if exclude[id] { - continue - } - acc, ok := p.store.FindAccount(id) - if !ok { - continue - } - p.queue = append(p.queue[:i], p.queue[i+1:]...) - p.inUse[id] = true + if acc, ok := p.tryAcquire(exclude, true); ok { + return acc, true + } + if acc, ok := p.tryAcquire(exclude, false); ok { return acc, true } return config.Account{}, false } +func (p *Pool) tryAcquire(exclude map[string]bool, requireToken bool) (config.Account, bool) { + for i := 0; i < len(p.queue); i++ { + id := p.queue[i] + if exclude[id] || p.inUse[id] >= p.maxInflightPerAccount { + continue + } + acc, ok := p.store.FindAccount(id) + if !ok { + continue + } + if requireToken && acc.Token == "" { + continue + } + p.inUse[id]++ + p.bumpQueue(id) + return acc, true + } + return config.Account{}, false +} + +func (p *Pool) bumpQueue(accountID string) { + for i, id := range p.queue { + if id != accountID { + continue + } + p.queue = append(p.queue[:i], p.queue[i+1:]...) + p.queue = append(p.queue, accountID) + return + } +} + func (p *Pool) Release(accountID string) { if accountID == "" { return } p.mu.Lock() defer p.mu.Unlock() - if !p.inUse[accountID] { + count := p.inUse[accountID] + if count <= 0 { return } - delete(p.inUse, accountID) - p.queue = append(p.queue, accountID) + if count == 1 { + delete(p.inUse, accountID) + return + } + p.inUse[accountID] = count - 1 } func (p *Pool) Status() map[string]any { p.mu.Lock() defer p.mu.Unlock() - available := append([]string{}, p.queue...) - inUse := make([]string, 0, len(p.inUse)) - for id := range p.inUse { - inUse = append(inUse, id) + available := make([]string, 0, len(p.queue)) + inUseAccounts := make([]string, 0, len(p.inUse)) + inUseSlots := 0 + for _, id := range p.queue { + if p.inUse[id] < p.maxInflightPerAccount { + available = append(available, id) + } } + for id, count := range p.inUse { + if count > 0 { + inUseAccounts = append(inUseAccounts, id) + inUseSlots += count + } + } + sort.Strings(inUseAccounts) return map[string]any{ - "available": len(available), - "in_use": len(inUse), - "total": len(p.store.Accounts()), - "available_accounts": available, - "in_use_accounts": inUse, + "available": len(available), + "in_use": inUseSlots, + "total": len(p.store.Accounts()), + "available_accounts": available, + "in_use_accounts": inUseAccounts, + "max_inflight_per_account": p.maxInflightPerAccount, + "recommended_concurrency": p.recommendedConcurrency, } } + +func maxInflightFromEnv() int { + for _, key := range []string{"DS2API_ACCOUNT_MAX_INFLIGHT", "DS2API_ACCOUNT_CONCURRENCY"} { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + continue + } + n, err := strconv.Atoi(raw) + if err == nil && n > 0 { + return n + } + } + return 2 +} + +func defaultRecommendedConcurrency(accountCount, maxInflightPerAccount int) int { + if accountCount <= 0 { + return 0 + } + if maxInflightPerAccount <= 0 { + maxInflightPerAccount = 2 + } + return accountCount * maxInflightPerAccount +} diff --git a/internal/account/pool_test.go b/internal/account/pool_test.go new file mode 100644 index 0000000..82c9ced --- /dev/null +++ b/internal/account/pool_test.go @@ -0,0 +1,154 @@ +package account + +import ( + "sync" + "testing" + + "ds2api/internal/config" +) + +func newPoolForTest(t *testing.T, maxInflight string) *Pool { + t.Helper() + t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", maxInflight) + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["k1"], + "accounts":[ + {"email":"acc1@example.com","token":"token1"}, + {"email":"acc2@example.com","token":"token2"} + ] + }`) + store := config.LoadStore() + return NewPool(store) +} + +func TestPoolRoundRobinWithConcurrentSlots(t *testing.T) { + pool := newPoolForTest(t, "2") + + order := make([]string, 0, 4) + for i := 0; i < 4; i++ { + acc, ok := pool.Acquire("", nil) + if !ok { + t.Fatalf("expected acquire success at step %d", i+1) + } + order = append(order, acc.Identifier()) + } + want := []string{"acc1@example.com", "acc2@example.com", "acc1@example.com", "acc2@example.com"} + for i := range want { + if order[i] != want[i] { + t.Fatalf("unexpected order at %d: got %q want %q (full=%v)", i, order[i], want[i], order) + } + } + + if _, ok := pool.Acquire("", nil); ok { + t.Fatalf("expected acquire to fail when all inflight slots are occupied") + } + + pool.Release("acc1@example.com") + acc, ok := pool.Acquire("", nil) + if !ok || acc.Identifier() != "acc1@example.com" { + t.Fatalf("expected reacquire acc1 after releasing one slot, got ok=%v id=%q", ok, acc.Identifier()) + } +} + +func TestPoolTargetAccountInflightLimit(t *testing.T) { + pool := newPoolForTest(t, "2") + + for i := 0; i < 2; i++ { + if _, ok := pool.Acquire("acc1@example.com", nil); !ok { + t.Fatalf("expected target acquire success at step %d", i+1) + } + } + if _, ok := pool.Acquire("acc1@example.com", nil); ok { + t.Fatalf("expected third acquire on same target to fail due to inflight limit") + } +} + +func TestPoolConcurrentAcquireDistribution(t *testing.T) { + pool := newPoolForTest(t, "2") + + start := make(chan struct{}) + results := make(chan string, 6) + var wg sync.WaitGroup + for i := 0; i < 6; i++ { + wg.Add(1) + go func() { + defer wg.Done() + <-start + acc, ok := pool.Acquire("", nil) + if !ok { + results <- "FAIL" + return + } + results <- acc.Identifier() + }() + } + + close(start) + wg.Wait() + close(results) + + success := 0 + fail := 0 + perAccount := map[string]int{} + for id := range results { + if id == "FAIL" { + fail++ + continue + } + success++ + perAccount[id]++ + } + if success != 4 || fail != 2 { + t.Fatalf("unexpected concurrent acquire result: success=%d fail=%d perAccount=%v", success, fail, perAccount) + } + for id, n := range perAccount { + if n > 2 { + t.Fatalf("account %s exceeded inflight limit: %d", id, n) + } + } +} + +func TestPoolStatusRecommendedConcurrencyDefault(t *testing.T) { + pool := newPoolForTest(t, "") + status := pool.Status() + + if got, ok := status["max_inflight_per_account"].(int); !ok || got != 2 { + t.Fatalf("unexpected max_inflight_per_account: %#v", status["max_inflight_per_account"]) + } + if got, ok := status["recommended_concurrency"].(int); !ok || got != 4 { + t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"]) + } +} + +func TestPoolStatusRecommendedConcurrencyRespectsOverride(t *testing.T) { + pool := newPoolForTest(t, "3") + status := pool.Status() + + if got, ok := status["max_inflight_per_account"].(int); !ok || got != 3 { + t.Fatalf("unexpected max_inflight_per_account: %#v", status["max_inflight_per_account"]) + } + if got, ok := status["recommended_concurrency"].(int); !ok || got != 6 { + t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"]) + } +} + +func TestPoolAccountConcurrencyAliasEnv(t *testing.T) { + t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", "") + t.Setenv("DS2API_ACCOUNT_CONCURRENCY", "4") + t.Setenv("DS2API_CONFIG_JSON", `{ + "keys":["k1"], + "accounts":[ + {"email":"acc1@example.com","token":"token1"}, + {"email":"acc2@example.com","token":"token2"} + ] + }`) + + pool := NewPool(config.LoadStore()) + status := pool.Status() + if got, ok := status["max_inflight_per_account"].(int); !ok || got != 4 { + t.Fatalf("unexpected max_inflight_per_account: %#v", status["max_inflight_per_account"]) + } + if got, ok := status["recommended_concurrency"].(int); !ok || got != 8 { + t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"]) + } +}