diff --git a/backend/internal/repository/concurrency_cache_integration_test.go b/backend/internal/repository/concurrency_cache_integration_test.go index f3d70ef1..707cbdab 100644 --- a/backend/internal/repository/concurrency_cache_integration_test.go +++ b/backend/internal/repository/concurrency_cache_integration_test.go @@ -274,6 +274,138 @@ func (s *ConcurrencyCacheSuite) TestGetUserConcurrency_Missing() { require.Equal(s.T(), 0, cur) } +func (s *ConcurrencyCacheSuite) TestGetAccountsLoadBatch() { + // Setup: Create accounts with different load states + account1 := int64(100) + account2 := int64(101) + account3 := int64(102) + + // Account 1: 2/3 slots used, 1 waiting + ok, err := s.cache.AcquireAccountSlot(s.ctx, account1, 3, "req1") + require.NoError(s.T(), err) + require.True(s.T(), ok) + ok, err = s.cache.AcquireAccountSlot(s.ctx, account1, 3, "req2") + require.NoError(s.T(), err) + require.True(s.T(), ok) + ok, err = s.cache.IncrementAccountWaitCount(s.ctx, account1, 5) + require.NoError(s.T(), err) + require.True(s.T(), ok) + + // Account 2: 1/2 slots used, 0 waiting + ok, err = s.cache.AcquireAccountSlot(s.ctx, account2, 2, "req3") + require.NoError(s.T(), err) + require.True(s.T(), ok) + + // Account 3: 0/1 slots used, 0 waiting (idle) + + // Query batch load + accounts := []service.AccountWithConcurrency{ + {ID: account1, MaxConcurrency: 3}, + {ID: account2, MaxConcurrency: 2}, + {ID: account3, MaxConcurrency: 1}, + } + + loadMap, err := s.cache.GetAccountsLoadBatch(s.ctx, accounts) + require.NoError(s.T(), err) + require.Len(s.T(), loadMap, 3) + + // Verify account1: (2 + 1) / 3 = 100% + load1 := loadMap[account1] + require.NotNil(s.T(), load1) + require.Equal(s.T(), account1, load1.AccountID) + require.Equal(s.T(), 2, load1.CurrentConcurrency) + require.Equal(s.T(), 1, load1.WaitingCount) + require.Equal(s.T(), 100, load1.LoadRate) + + // Verify account2: (1 + 0) / 2 = 50% + load2 := loadMap[account2] + require.NotNil(s.T(), load2) + require.Equal(s.T(), account2, load2.AccountID) + require.Equal(s.T(), 1, load2.CurrentConcurrency) + require.Equal(s.T(), 0, load2.WaitingCount) + require.Equal(s.T(), 50, load2.LoadRate) + + // Verify account3: (0 + 0) / 1 = 0% + load3 := loadMap[account3] + require.NotNil(s.T(), load3) + require.Equal(s.T(), account3, load3.AccountID) + require.Equal(s.T(), 0, load3.CurrentConcurrency) + require.Equal(s.T(), 0, load3.WaitingCount) + require.Equal(s.T(), 0, load3.LoadRate) +} + +func (s *ConcurrencyCacheSuite) TestGetAccountsLoadBatch_Empty() { + // Test with empty account list + loadMap, err := s.cache.GetAccountsLoadBatch(s.ctx, []service.AccountWithConcurrency{}) + require.NoError(s.T(), err) + require.Empty(s.T(), loadMap) +} + +func (s *ConcurrencyCacheSuite) TestCleanupExpiredAccountSlots() { + accountID := int64(200) + slotKey := fmt.Sprintf("%s%d", accountSlotKeyPrefix, accountID) + + // Acquire 3 slots + ok, err := s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req1") + require.NoError(s.T(), err) + require.True(s.T(), ok) + ok, err = s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req2") + require.NoError(s.T(), err) + require.True(s.T(), ok) + ok, err = s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req3") + require.NoError(s.T(), err) + require.True(s.T(), ok) + + // Verify 3 slots exist + cur, err := s.cache.GetAccountConcurrency(s.ctx, accountID) + require.NoError(s.T(), err) + require.Equal(s.T(), 3, cur) + + // Manually set old timestamps for req1 and req2 (simulate expired slots) + now := time.Now().Unix() + expiredTime := now - int64(testSlotTTL.Seconds()) - 10 // 10 seconds past TTL + err = s.rdb.ZAdd(s.ctx, slotKey, redis.Z{Score: float64(expiredTime), Member: "req1"}).Err() + require.NoError(s.T(), err) + err = s.rdb.ZAdd(s.ctx, slotKey, redis.Z{Score: float64(expiredTime), Member: "req2"}).Err() + require.NoError(s.T(), err) + + // Run cleanup + err = s.cache.CleanupExpiredAccountSlots(s.ctx, accountID) + require.NoError(s.T(), err) + + // Verify only 1 slot remains (req3) + cur, err = s.cache.GetAccountConcurrency(s.ctx, accountID) + require.NoError(s.T(), err) + require.Equal(s.T(), 1, cur) + + // Verify req3 still exists + members, err := s.rdb.ZRange(s.ctx, slotKey, 0, -1).Result() + require.NoError(s.T(), err) + require.Len(s.T(), members, 1) + require.Equal(s.T(), "req3", members[0]) +} + +func (s *ConcurrencyCacheSuite) TestCleanupExpiredAccountSlots_NoExpired() { + accountID := int64(201) + + // Acquire 2 fresh slots + ok, err := s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req1") + require.NoError(s.T(), err) + require.True(s.T(), ok) + ok, err = s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req2") + require.NoError(s.T(), err) + require.True(s.T(), ok) + + // Run cleanup (should not remove anything) + err = s.cache.CleanupExpiredAccountSlots(s.ctx, accountID) + require.NoError(s.T(), err) + + // Verify both slots still exist + cur, err := s.cache.GetAccountConcurrency(s.ctx, accountID) + require.NoError(s.T(), err) + require.Equal(s.T(), 2, cur) +} + func TestConcurrencyCacheSuite(t *testing.T) { suite.Run(t, new(ConcurrencyCacheSuite)) } diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index e1b61632..560c7767 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -837,3 +837,160 @@ func TestAccount_IsMixedSchedulingEnabled(t *testing.T) { }) } } + +// mockConcurrencyService for testing +type mockConcurrencyService struct { + accountLoads map[int64]*AccountLoadInfo + accountWaitCounts map[int64]int + acquireResults map[int64]bool +} + +func (m *mockConcurrencyService) GetAccountsLoadBatch(ctx context.Context, accounts []AccountWithConcurrency) (map[int64]*AccountLoadInfo, error) { + if m.accountLoads == nil { + return map[int64]*AccountLoadInfo{}, nil + } + result := make(map[int64]*AccountLoadInfo) + for _, acc := range accounts { + if load, ok := m.accountLoads[acc.ID]; ok { + result[acc.ID] = load + } else { + result[acc.ID] = &AccountLoadInfo{ + AccountID: acc.ID, + CurrentConcurrency: 0, + WaitingCount: 0, + LoadRate: 0, + } + } + } + return result, nil +} + +func (m *mockConcurrencyService) GetAccountWaitingCount(ctx context.Context, accountID int64) (int, error) { + if m.accountWaitCounts == nil { + return 0, nil + } + return m.accountWaitCounts[accountID], nil +} + +// TestGatewayService_SelectAccountWithLoadAwareness tests load-aware account selection +func TestGatewayService_SelectAccountWithLoadAwareness(t *testing.T) { + ctx := context.Background() + + t.Run("禁用负载批量查询-降级到传统选择", func(t *testing.T) { + repo := &mockAccountRepoForPlatform{ + accounts: []Account{ + {ID: 1, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5}, + {ID: 2, Platform: PlatformAnthropic, Priority: 2, Status: StatusActive, Schedulable: true, Concurrency: 5}, + }, + accountsByID: map[int64]*Account{}, + } + for i := range repo.accounts { + repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i] + } + + cache := &mockGatewayCacheForPlatform{} + + cfg := testConfig() + cfg.Gateway.Scheduling.LoadBatchEnabled = false + + svc := &GatewayService{ + accountRepo: repo, + cache: cache, + cfg: cfg, + concurrencyService: nil, // No concurrency service + } + + result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", nil) + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.Account) + require.Equal(t, int64(1), result.Account.ID, "应选择优先级最高的账号") + }) + + t.Run("无ConcurrencyService-降级到传统选择", func(t *testing.T) { + repo := &mockAccountRepoForPlatform{ + accounts: []Account{ + {ID: 1, Platform: PlatformAnthropic, Priority: 2, Status: StatusActive, Schedulable: true, Concurrency: 5}, + {ID: 2, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5}, + }, + accountsByID: map[int64]*Account{}, + } + for i := range repo.accounts { + repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i] + } + + cache := &mockGatewayCacheForPlatform{} + + cfg := testConfig() + cfg.Gateway.Scheduling.LoadBatchEnabled = true + + svc := &GatewayService{ + accountRepo: repo, + cache: cache, + cfg: cfg, + concurrencyService: nil, + } + + result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", nil) + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.Account) + require.Equal(t, int64(2), result.Account.ID, "应选择优先级最高的账号") + }) + + t.Run("排除账号-不选择被排除的账号", func(t *testing.T) { + repo := &mockAccountRepoForPlatform{ + accounts: []Account{ + {ID: 1, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5}, + {ID: 2, Platform: PlatformAnthropic, Priority: 2, Status: StatusActive, Schedulable: true, Concurrency: 5}, + }, + accountsByID: map[int64]*Account{}, + } + for i := range repo.accounts { + repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i] + } + + cache := &mockGatewayCacheForPlatform{} + + cfg := testConfig() + cfg.Gateway.Scheduling.LoadBatchEnabled = false + + svc := &GatewayService{ + accountRepo: repo, + cache: cache, + cfg: cfg, + concurrencyService: nil, + } + + excludedIDs := map[int64]struct{}{1: {}} + result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", excludedIDs) + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.Account) + require.Equal(t, int64(2), result.Account.ID, "不应选择被排除的账号") + }) + + t.Run("无可用账号-返回错误", func(t *testing.T) { + repo := &mockAccountRepoForPlatform{ + accounts: []Account{}, + accountsByID: map[int64]*Account{}, + } + + cache := &mockGatewayCacheForPlatform{} + + cfg := testConfig() + cfg.Gateway.Scheduling.LoadBatchEnabled = false + + svc := &GatewayService{ + accountRepo: repo, + cache: cache, + cfg: cfg, + concurrencyService: nil, + } + + result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", nil) + require.Error(t, err) + require.Nil(t, result) + require.Contains(t, err.Error(), "no available accounts") + }) +}