package account import ( "context" "os" "sort" "strconv" "strings" "sync" "ds2api/internal/config" ) type Pool struct { store *config.Store mu sync.Mutex queue []string inUse map[string]int waiters []chan struct{} maxInflightPerAccount int recommendedConcurrency int maxQueueSize int } func NewPool(store *config.Store) *Pool { p := &Pool{ store: store, inUse: map[string]int{}, maxInflightPerAccount: maxInflightFromEnv(), } p.Reset() return p } func (p *Pool) Reset() { accounts := p.store.Accounts() sort.SliceStable(accounts, func(i, j int) bool { iHas := accounts[i].Token != "" jHas := accounts[j].Token != "" if iHas == jHas { return i < j } return iHas }) ids := make([]string, 0, len(accounts)) for _, a := range accounts { id := a.Identifier() if id != "" { ids = append(ids, id) } } recommended := defaultRecommendedConcurrency(len(ids), p.maxInflightPerAccount) queueLimit := maxQueueFromEnv(recommended) p.mu.Lock() defer p.mu.Unlock() p.drainWaitersLocked() p.queue = ids p.inUse = map[string]int{} p.recommendedConcurrency = recommended p.maxQueueSize = queueLimit config.Logger.Info( "[init_account_queue] initialized", "total", len(ids), "max_inflight_per_account", p.maxInflightPerAccount, "recommended_concurrency", p.recommendedConcurrency, "max_queue_size", p.maxQueueSize, ) } func (p *Pool) Acquire(target string, exclude map[string]bool) (config.Account, bool) { p.mu.Lock() defer p.mu.Unlock() return p.acquireLocked(target, normalizeExclude(exclude)) } func (p *Pool) AcquireWait(ctx context.Context, target string, exclude map[string]bool) (config.Account, bool) { if ctx == nil { ctx = context.Background() } exclude = normalizeExclude(exclude) for { if ctx.Err() != nil { return config.Account{}, false } p.mu.Lock() if acc, ok := p.acquireLocked(target, exclude); ok { p.mu.Unlock() return acc, true } if !p.canQueueLocked(target, exclude) { p.mu.Unlock() return config.Account{}, false } waiter := make(chan struct{}) p.waiters = append(p.waiters, waiter) p.mu.Unlock() select { case <-ctx.Done(): p.mu.Lock() p.removeWaiterLocked(waiter) p.mu.Unlock() return config.Account{}, false case <-waiter: } } } func (p *Pool) acquireLocked(target string, exclude map[string]bool) (config.Account, bool) { if target != "" { if exclude[target] || p.inUse[target] >= p.maxInflightPerAccount { return config.Account{}, false } acc, ok := p.store.FindAccount(target) if !ok { return config.Account{}, false } p.inUse[target]++ p.bumpQueue(target) return acc, true } if acc, ok := p.tryAcquire(exclude, true); ok { return acc, true } if acc, ok := p.tryAcquire(exclude, false); ok { return acc, true } return config.Account{}, false } func (p *Pool) tryAcquire(exclude map[string]bool, requireToken bool) (config.Account, bool) { for i := 0; i < len(p.queue); i++ { id := p.queue[i] if exclude[id] || p.inUse[id] >= p.maxInflightPerAccount { continue } acc, ok := p.store.FindAccount(id) if !ok { continue } if requireToken && acc.Token == "" { continue } p.inUse[id]++ p.bumpQueue(id) return acc, true } return config.Account{}, false } func (p *Pool) bumpQueue(accountID string) { for i, id := range p.queue { if id != accountID { continue } p.queue = append(p.queue[:i], p.queue[i+1:]...) p.queue = append(p.queue, accountID) return } } func (p *Pool) Release(accountID string) { if accountID == "" { return } p.mu.Lock() defer p.mu.Unlock() count := p.inUse[accountID] if count <= 0 { return } if count == 1 { delete(p.inUse, accountID) p.notifyWaiterLocked() return } p.inUse[accountID] = count - 1 p.notifyWaiterLocked() } func (p *Pool) Status() map[string]any { p.mu.Lock() defer p.mu.Unlock() available := make([]string, 0, len(p.queue)) inUseAccounts := make([]string, 0, len(p.inUse)) inUseSlots := 0 for _, id := range p.queue { if p.inUse[id] < p.maxInflightPerAccount { available = append(available, id) } } for id, count := range p.inUse { if count > 0 { inUseAccounts = append(inUseAccounts, id) inUseSlots += count } } sort.Strings(inUseAccounts) return map[string]any{ "available": len(available), "in_use": inUseSlots, "total": len(p.store.Accounts()), "available_accounts": available, "in_use_accounts": inUseAccounts, "max_inflight_per_account": p.maxInflightPerAccount, "recommended_concurrency": p.recommendedConcurrency, "waiting": len(p.waiters), "max_queue_size": p.maxQueueSize, } } func maxInflightFromEnv() int { for _, key := range []string{"DS2API_ACCOUNT_MAX_INFLIGHT", "DS2API_ACCOUNT_CONCURRENCY"} { raw := strings.TrimSpace(os.Getenv(key)) if raw == "" { continue } n, err := strconv.Atoi(raw) if err == nil && n > 0 { return n } } return 2 } func defaultRecommendedConcurrency(accountCount, maxInflightPerAccount int) int { if accountCount <= 0 { return 0 } if maxInflightPerAccount <= 0 { maxInflightPerAccount = 2 } return accountCount * maxInflightPerAccount } func normalizeExclude(exclude map[string]bool) map[string]bool { if exclude == nil { return map[string]bool{} } return exclude } func (p *Pool) canQueueLocked(target string, exclude map[string]bool) bool { if target != "" { if exclude[target] { return false } if _, ok := p.store.FindAccount(target); !ok { return false } } if p.maxQueueSize <= 0 { return false } return len(p.waiters) < p.maxQueueSize } func (p *Pool) notifyWaiterLocked() { if len(p.waiters) == 0 { return } waiter := p.waiters[0] p.waiters = p.waiters[1:] close(waiter) } func (p *Pool) removeWaiterLocked(waiter chan struct{}) bool { for i, w := range p.waiters { if w != waiter { continue } p.waiters = append(p.waiters[:i], p.waiters[i+1:]...) return true } return false } func (p *Pool) drainWaitersLocked() { for _, waiter := range p.waiters { close(waiter) } p.waiters = nil } func maxQueueFromEnv(defaultSize int) int { for _, key := range []string{"DS2API_ACCOUNT_MAX_QUEUE", "DS2API_ACCOUNT_QUEUE_SIZE"} { raw := strings.TrimSpace(os.Getenv(key)) if raw == "" { continue } n, err := strconv.Atoi(raw) if err == nil && n >= 0 { return n } } if defaultSize < 0 { return 0 } return defaultSize }