feat: implement per-account inflight request limits and dynamic recommended concurrency calculation.

This commit is contained in:
CJACK
2026-02-16 16:11:12 +08:00
parent c7ffcd76e6
commit 6a6f380987
9 changed files with 323 additions and 58 deletions

View File

@@ -1,21 +1,30 @@
package account
import (
"os"
"sort"
"strconv"
"strings"
"sync"
"ds2api/internal/config"
)
type Pool struct {
store *config.Store
mu sync.Mutex
queue []string
inUse map[string]bool
store *config.Store
mu sync.Mutex
queue []string
inUse map[string]int
maxInflightPerAccount int
recommendedConcurrency int
}
func NewPool(store *config.Store) *Pool {
p := &Pool{store: store, inUse: map[string]bool{}}
p := &Pool{
store: store,
inUse: map[string]int{},
maxInflightPerAccount: maxInflightFromEnv(),
}
p.Reset()
return p
}
@@ -37,91 +46,145 @@ func (p *Pool) Reset() {
ids = append(ids, id)
}
}
recommended := defaultRecommendedConcurrency(len(ids), p.maxInflightPerAccount)
p.mu.Lock()
defer p.mu.Unlock()
p.queue = ids
p.inUse = map[string]bool{}
config.Logger.Info("[init_account_queue] initialized", "total", len(ids))
p.inUse = map[string]int{}
p.recommendedConcurrency = recommended
config.Logger.Info(
"[init_account_queue] initialized",
"total", len(ids),
"max_inflight_per_account", p.maxInflightPerAccount,
"recommended_concurrency", p.recommendedConcurrency,
)
}
func (p *Pool) Acquire(target string, exclude map[string]bool) (config.Account, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if target != "" {
for i, id := range p.queue {
if id != target {
continue
}
acc, ok := p.store.FindAccount(id)
if !ok {
return config.Account{}, false
}
p.queue = append(p.queue[:i], p.queue[i+1:]...)
p.inUse[id] = true
return acc, true
}
return config.Account{}, false
if exclude == nil {
exclude = map[string]bool{}
}
for i := 0; i < len(p.queue); i++ {
id := p.queue[i]
if exclude[id] {
continue
if target != "" {
if exclude[target] || p.inUse[target] >= p.maxInflightPerAccount {
return config.Account{}, false
}
acc, ok := p.store.FindAccount(id)
acc, ok := p.store.FindAccount(target)
if !ok {
continue
return config.Account{}, false
}
if acc.Token == "" {
continue
}
p.queue = append(p.queue[:i], p.queue[i+1:]...)
p.inUse[id] = true
p.inUse[target]++
p.bumpQueue(target)
return acc, true
}
for i := 0; i < len(p.queue); i++ {
id := p.queue[i]
if exclude[id] {
continue
}
acc, ok := p.store.FindAccount(id)
if !ok {
continue
}
p.queue = append(p.queue[:i], p.queue[i+1:]...)
p.inUse[id] = true
if acc, ok := p.tryAcquire(exclude, true); ok {
return acc, true
}
if acc, ok := p.tryAcquire(exclude, false); ok {
return acc, true
}
return config.Account{}, false
}
func (p *Pool) tryAcquire(exclude map[string]bool, requireToken bool) (config.Account, bool) {
for i := 0; i < len(p.queue); i++ {
id := p.queue[i]
if exclude[id] || p.inUse[id] >= p.maxInflightPerAccount {
continue
}
acc, ok := p.store.FindAccount(id)
if !ok {
continue
}
if requireToken && acc.Token == "" {
continue
}
p.inUse[id]++
p.bumpQueue(id)
return acc, true
}
return config.Account{}, false
}
func (p *Pool) bumpQueue(accountID string) {
for i, id := range p.queue {
if id != accountID {
continue
}
p.queue = append(p.queue[:i], p.queue[i+1:]...)
p.queue = append(p.queue, accountID)
return
}
}
func (p *Pool) Release(accountID string) {
if accountID == "" {
return
}
p.mu.Lock()
defer p.mu.Unlock()
if !p.inUse[accountID] {
count := p.inUse[accountID]
if count <= 0 {
return
}
delete(p.inUse, accountID)
p.queue = append(p.queue, accountID)
if count == 1 {
delete(p.inUse, accountID)
return
}
p.inUse[accountID] = count - 1
}
func (p *Pool) Status() map[string]any {
p.mu.Lock()
defer p.mu.Unlock()
available := append([]string{}, p.queue...)
inUse := make([]string, 0, len(p.inUse))
for id := range p.inUse {
inUse = append(inUse, id)
available := make([]string, 0, len(p.queue))
inUseAccounts := make([]string, 0, len(p.inUse))
inUseSlots := 0
for _, id := range p.queue {
if p.inUse[id] < p.maxInflightPerAccount {
available = append(available, id)
}
}
for id, count := range p.inUse {
if count > 0 {
inUseAccounts = append(inUseAccounts, id)
inUseSlots += count
}
}
sort.Strings(inUseAccounts)
return map[string]any{
"available": len(available),
"in_use": len(inUse),
"total": len(p.store.Accounts()),
"available_accounts": available,
"in_use_accounts": inUse,
"available": len(available),
"in_use": inUseSlots,
"total": len(p.store.Accounts()),
"available_accounts": available,
"in_use_accounts": inUseAccounts,
"max_inflight_per_account": p.maxInflightPerAccount,
"recommended_concurrency": p.recommendedConcurrency,
}
}
func maxInflightFromEnv() int {
for _, key := range []string{"DS2API_ACCOUNT_MAX_INFLIGHT", "DS2API_ACCOUNT_CONCURRENCY"} {
raw := strings.TrimSpace(os.Getenv(key))
if raw == "" {
continue
}
n, err := strconv.Atoi(raw)
if err == nil && n > 0 {
return n
}
}
return 2
}
func defaultRecommendedConcurrency(accountCount, maxInflightPerAccount int) int {
if accountCount <= 0 {
return 0
}
if maxInflightPerAccount <= 0 {
maxInflightPerAccount = 2
}
return accountCount * maxInflightPerAccount
}

View File

@@ -0,0 +1,154 @@
package account
import (
"sync"
"testing"
"ds2api/internal/config"
)
func newPoolForTest(t *testing.T, maxInflight string) *Pool {
t.Helper()
t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", maxInflight)
t.Setenv("DS2API_CONFIG_JSON", `{
"keys":["k1"],
"accounts":[
{"email":"acc1@example.com","token":"token1"},
{"email":"acc2@example.com","token":"token2"}
]
}`)
store := config.LoadStore()
return NewPool(store)
}
func TestPoolRoundRobinWithConcurrentSlots(t *testing.T) {
pool := newPoolForTest(t, "2")
order := make([]string, 0, 4)
for i := 0; i < 4; i++ {
acc, ok := pool.Acquire("", nil)
if !ok {
t.Fatalf("expected acquire success at step %d", i+1)
}
order = append(order, acc.Identifier())
}
want := []string{"acc1@example.com", "acc2@example.com", "acc1@example.com", "acc2@example.com"}
for i := range want {
if order[i] != want[i] {
t.Fatalf("unexpected order at %d: got %q want %q (full=%v)", i, order[i], want[i], order)
}
}
if _, ok := pool.Acquire("", nil); ok {
t.Fatalf("expected acquire to fail when all inflight slots are occupied")
}
pool.Release("acc1@example.com")
acc, ok := pool.Acquire("", nil)
if !ok || acc.Identifier() != "acc1@example.com" {
t.Fatalf("expected reacquire acc1 after releasing one slot, got ok=%v id=%q", ok, acc.Identifier())
}
}
func TestPoolTargetAccountInflightLimit(t *testing.T) {
pool := newPoolForTest(t, "2")
for i := 0; i < 2; i++ {
if _, ok := pool.Acquire("acc1@example.com", nil); !ok {
t.Fatalf("expected target acquire success at step %d", i+1)
}
}
if _, ok := pool.Acquire("acc1@example.com", nil); ok {
t.Fatalf("expected third acquire on same target to fail due to inflight limit")
}
}
func TestPoolConcurrentAcquireDistribution(t *testing.T) {
pool := newPoolForTest(t, "2")
start := make(chan struct{})
results := make(chan string, 6)
var wg sync.WaitGroup
for i := 0; i < 6; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-start
acc, ok := pool.Acquire("", nil)
if !ok {
results <- "FAIL"
return
}
results <- acc.Identifier()
}()
}
close(start)
wg.Wait()
close(results)
success := 0
fail := 0
perAccount := map[string]int{}
for id := range results {
if id == "FAIL" {
fail++
continue
}
success++
perAccount[id]++
}
if success != 4 || fail != 2 {
t.Fatalf("unexpected concurrent acquire result: success=%d fail=%d perAccount=%v", success, fail, perAccount)
}
for id, n := range perAccount {
if n > 2 {
t.Fatalf("account %s exceeded inflight limit: %d", id, n)
}
}
}
func TestPoolStatusRecommendedConcurrencyDefault(t *testing.T) {
pool := newPoolForTest(t, "")
status := pool.Status()
if got, ok := status["max_inflight_per_account"].(int); !ok || got != 2 {
t.Fatalf("unexpected max_inflight_per_account: %#v", status["max_inflight_per_account"])
}
if got, ok := status["recommended_concurrency"].(int); !ok || got != 4 {
t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"])
}
}
func TestPoolStatusRecommendedConcurrencyRespectsOverride(t *testing.T) {
pool := newPoolForTest(t, "3")
status := pool.Status()
if got, ok := status["max_inflight_per_account"].(int); !ok || got != 3 {
t.Fatalf("unexpected max_inflight_per_account: %#v", status["max_inflight_per_account"])
}
if got, ok := status["recommended_concurrency"].(int); !ok || got != 6 {
t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"])
}
}
func TestPoolAccountConcurrencyAliasEnv(t *testing.T) {
t.Setenv("DS2API_ACCOUNT_MAX_INFLIGHT", "")
t.Setenv("DS2API_ACCOUNT_CONCURRENCY", "4")
t.Setenv("DS2API_CONFIG_JSON", `{
"keys":["k1"],
"accounts":[
{"email":"acc1@example.com","token":"token1"},
{"email":"acc2@example.com","token":"token2"}
]
}`)
pool := NewPool(config.LoadStore())
status := pool.Status()
if got, ok := status["max_inflight_per_account"].(int); !ok || got != 4 {
t.Fatalf("unexpected max_inflight_per_account: %#v", status["max_inflight_per_account"])
}
if got, ok := status["recommended_concurrency"].(int); !ok || got != 8 {
t.Fatalf("unexpected recommended_concurrency: %#v", status["recommended_concurrency"])
}
}