test(gateway): 补充账号调度优化的单元测试
- 添加 GetAccountsLoadBatch 批量负载查询测试 - 添加 CleanupExpiredAccountSlots 过期槽位清理测试 - 添加 SelectAccountWithLoadAwareness 负载感知选择测试 - 测试覆盖降级行为、账号排除、错误处理等场景
This commit is contained in:
@@ -274,6 +274,138 @@ func (s *ConcurrencyCacheSuite) TestGetUserConcurrency_Missing() {
|
||||
require.Equal(s.T(), 0, cur)
|
||||
}
|
||||
|
||||
func (s *ConcurrencyCacheSuite) TestGetAccountsLoadBatch() {
|
||||
// Setup: Create accounts with different load states
|
||||
account1 := int64(100)
|
||||
account2 := int64(101)
|
||||
account3 := int64(102)
|
||||
|
||||
// Account 1: 2/3 slots used, 1 waiting
|
||||
ok, err := s.cache.AcquireAccountSlot(s.ctx, account1, 3, "req1")
|
||||
require.NoError(s.T(), err)
|
||||
require.True(s.T(), ok)
|
||||
ok, err = s.cache.AcquireAccountSlot(s.ctx, account1, 3, "req2")
|
||||
require.NoError(s.T(), err)
|
||||
require.True(s.T(), ok)
|
||||
ok, err = s.cache.IncrementAccountWaitCount(s.ctx, account1, 5)
|
||||
require.NoError(s.T(), err)
|
||||
require.True(s.T(), ok)
|
||||
|
||||
// Account 2: 1/2 slots used, 0 waiting
|
||||
ok, err = s.cache.AcquireAccountSlot(s.ctx, account2, 2, "req3")
|
||||
require.NoError(s.T(), err)
|
||||
require.True(s.T(), ok)
|
||||
|
||||
// Account 3: 0/1 slots used, 0 waiting (idle)
|
||||
|
||||
// Query batch load
|
||||
accounts := []service.AccountWithConcurrency{
|
||||
{ID: account1, MaxConcurrency: 3},
|
||||
{ID: account2, MaxConcurrency: 2},
|
||||
{ID: account3, MaxConcurrency: 1},
|
||||
}
|
||||
|
||||
loadMap, err := s.cache.GetAccountsLoadBatch(s.ctx, accounts)
|
||||
require.NoError(s.T(), err)
|
||||
require.Len(s.T(), loadMap, 3)
|
||||
|
||||
// Verify account1: (2 + 1) / 3 = 100%
|
||||
load1 := loadMap[account1]
|
||||
require.NotNil(s.T(), load1)
|
||||
require.Equal(s.T(), account1, load1.AccountID)
|
||||
require.Equal(s.T(), 2, load1.CurrentConcurrency)
|
||||
require.Equal(s.T(), 1, load1.WaitingCount)
|
||||
require.Equal(s.T(), 100, load1.LoadRate)
|
||||
|
||||
// Verify account2: (1 + 0) / 2 = 50%
|
||||
load2 := loadMap[account2]
|
||||
require.NotNil(s.T(), load2)
|
||||
require.Equal(s.T(), account2, load2.AccountID)
|
||||
require.Equal(s.T(), 1, load2.CurrentConcurrency)
|
||||
require.Equal(s.T(), 0, load2.WaitingCount)
|
||||
require.Equal(s.T(), 50, load2.LoadRate)
|
||||
|
||||
// Verify account3: (0 + 0) / 1 = 0%
|
||||
load3 := loadMap[account3]
|
||||
require.NotNil(s.T(), load3)
|
||||
require.Equal(s.T(), account3, load3.AccountID)
|
||||
require.Equal(s.T(), 0, load3.CurrentConcurrency)
|
||||
require.Equal(s.T(), 0, load3.WaitingCount)
|
||||
require.Equal(s.T(), 0, load3.LoadRate)
|
||||
}
|
||||
|
||||
func (s *ConcurrencyCacheSuite) TestGetAccountsLoadBatch_Empty() {
|
||||
// Test with empty account list
|
||||
loadMap, err := s.cache.GetAccountsLoadBatch(s.ctx, []service.AccountWithConcurrency{})
|
||||
require.NoError(s.T(), err)
|
||||
require.Empty(s.T(), loadMap)
|
||||
}
|
||||
|
||||
func (s *ConcurrencyCacheSuite) TestCleanupExpiredAccountSlots() {
|
||||
accountID := int64(200)
|
||||
slotKey := fmt.Sprintf("%s%d", accountSlotKeyPrefix, accountID)
|
||||
|
||||
// Acquire 3 slots
|
||||
ok, err := s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req1")
|
||||
require.NoError(s.T(), err)
|
||||
require.True(s.T(), ok)
|
||||
ok, err = s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req2")
|
||||
require.NoError(s.T(), err)
|
||||
require.True(s.T(), ok)
|
||||
ok, err = s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req3")
|
||||
require.NoError(s.T(), err)
|
||||
require.True(s.T(), ok)
|
||||
|
||||
// Verify 3 slots exist
|
||||
cur, err := s.cache.GetAccountConcurrency(s.ctx, accountID)
|
||||
require.NoError(s.T(), err)
|
||||
require.Equal(s.T(), 3, cur)
|
||||
|
||||
// Manually set old timestamps for req1 and req2 (simulate expired slots)
|
||||
now := time.Now().Unix()
|
||||
expiredTime := now - int64(testSlotTTL.Seconds()) - 10 // 10 seconds past TTL
|
||||
err = s.rdb.ZAdd(s.ctx, slotKey, redis.Z{Score: float64(expiredTime), Member: "req1"}).Err()
|
||||
require.NoError(s.T(), err)
|
||||
err = s.rdb.ZAdd(s.ctx, slotKey, redis.Z{Score: float64(expiredTime), Member: "req2"}).Err()
|
||||
require.NoError(s.T(), err)
|
||||
|
||||
// Run cleanup
|
||||
err = s.cache.CleanupExpiredAccountSlots(s.ctx, accountID)
|
||||
require.NoError(s.T(), err)
|
||||
|
||||
// Verify only 1 slot remains (req3)
|
||||
cur, err = s.cache.GetAccountConcurrency(s.ctx, accountID)
|
||||
require.NoError(s.T(), err)
|
||||
require.Equal(s.T(), 1, cur)
|
||||
|
||||
// Verify req3 still exists
|
||||
members, err := s.rdb.ZRange(s.ctx, slotKey, 0, -1).Result()
|
||||
require.NoError(s.T(), err)
|
||||
require.Len(s.T(), members, 1)
|
||||
require.Equal(s.T(), "req3", members[0])
|
||||
}
|
||||
|
||||
func (s *ConcurrencyCacheSuite) TestCleanupExpiredAccountSlots_NoExpired() {
|
||||
accountID := int64(201)
|
||||
|
||||
// Acquire 2 fresh slots
|
||||
ok, err := s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req1")
|
||||
require.NoError(s.T(), err)
|
||||
require.True(s.T(), ok)
|
||||
ok, err = s.cache.AcquireAccountSlot(s.ctx, accountID, 5, "req2")
|
||||
require.NoError(s.T(), err)
|
||||
require.True(s.T(), ok)
|
||||
|
||||
// Run cleanup (should not remove anything)
|
||||
err = s.cache.CleanupExpiredAccountSlots(s.ctx, accountID)
|
||||
require.NoError(s.T(), err)
|
||||
|
||||
// Verify both slots still exist
|
||||
cur, err := s.cache.GetAccountConcurrency(s.ctx, accountID)
|
||||
require.NoError(s.T(), err)
|
||||
require.Equal(s.T(), 2, cur)
|
||||
}
|
||||
|
||||
func TestConcurrencyCacheSuite(t *testing.T) {
|
||||
suite.Run(t, new(ConcurrencyCacheSuite))
|
||||
}
|
||||
|
||||
@@ -837,3 +837,160 @@ func TestAccount_IsMixedSchedulingEnabled(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// mockConcurrencyService for testing
|
||||
type mockConcurrencyService struct {
|
||||
accountLoads map[int64]*AccountLoadInfo
|
||||
accountWaitCounts map[int64]int
|
||||
acquireResults map[int64]bool
|
||||
}
|
||||
|
||||
func (m *mockConcurrencyService) GetAccountsLoadBatch(ctx context.Context, accounts []AccountWithConcurrency) (map[int64]*AccountLoadInfo, error) {
|
||||
if m.accountLoads == nil {
|
||||
return map[int64]*AccountLoadInfo{}, nil
|
||||
}
|
||||
result := make(map[int64]*AccountLoadInfo)
|
||||
for _, acc := range accounts {
|
||||
if load, ok := m.accountLoads[acc.ID]; ok {
|
||||
result[acc.ID] = load
|
||||
} else {
|
||||
result[acc.ID] = &AccountLoadInfo{
|
||||
AccountID: acc.ID,
|
||||
CurrentConcurrency: 0,
|
||||
WaitingCount: 0,
|
||||
LoadRate: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (m *mockConcurrencyService) GetAccountWaitingCount(ctx context.Context, accountID int64) (int, error) {
|
||||
if m.accountWaitCounts == nil {
|
||||
return 0, nil
|
||||
}
|
||||
return m.accountWaitCounts[accountID], nil
|
||||
}
|
||||
|
||||
// TestGatewayService_SelectAccountWithLoadAwareness tests load-aware account selection
|
||||
func TestGatewayService_SelectAccountWithLoadAwareness(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("禁用负载批量查询-降级到传统选择", func(t *testing.T) {
|
||||
repo := &mockAccountRepoForPlatform{
|
||||
accounts: []Account{
|
||||
{ID: 1, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5},
|
||||
{ID: 2, Platform: PlatformAnthropic, Priority: 2, Status: StatusActive, Schedulable: true, Concurrency: 5},
|
||||
},
|
||||
accountsByID: map[int64]*Account{},
|
||||
}
|
||||
for i := range repo.accounts {
|
||||
repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i]
|
||||
}
|
||||
|
||||
cache := &mockGatewayCacheForPlatform{}
|
||||
|
||||
cfg := testConfig()
|
||||
cfg.Gateway.Scheduling.LoadBatchEnabled = false
|
||||
|
||||
svc := &GatewayService{
|
||||
accountRepo: repo,
|
||||
cache: cache,
|
||||
cfg: cfg,
|
||||
concurrencyService: nil, // No concurrency service
|
||||
}
|
||||
|
||||
result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", nil)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, result)
|
||||
require.NotNil(t, result.Account)
|
||||
require.Equal(t, int64(1), result.Account.ID, "应选择优先级最高的账号")
|
||||
})
|
||||
|
||||
t.Run("无ConcurrencyService-降级到传统选择", func(t *testing.T) {
|
||||
repo := &mockAccountRepoForPlatform{
|
||||
accounts: []Account{
|
||||
{ID: 1, Platform: PlatformAnthropic, Priority: 2, Status: StatusActive, Schedulable: true, Concurrency: 5},
|
||||
{ID: 2, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5},
|
||||
},
|
||||
accountsByID: map[int64]*Account{},
|
||||
}
|
||||
for i := range repo.accounts {
|
||||
repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i]
|
||||
}
|
||||
|
||||
cache := &mockGatewayCacheForPlatform{}
|
||||
|
||||
cfg := testConfig()
|
||||
cfg.Gateway.Scheduling.LoadBatchEnabled = true
|
||||
|
||||
svc := &GatewayService{
|
||||
accountRepo: repo,
|
||||
cache: cache,
|
||||
cfg: cfg,
|
||||
concurrencyService: nil,
|
||||
}
|
||||
|
||||
result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", nil)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, result)
|
||||
require.NotNil(t, result.Account)
|
||||
require.Equal(t, int64(2), result.Account.ID, "应选择优先级最高的账号")
|
||||
})
|
||||
|
||||
t.Run("排除账号-不选择被排除的账号", func(t *testing.T) {
|
||||
repo := &mockAccountRepoForPlatform{
|
||||
accounts: []Account{
|
||||
{ID: 1, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5},
|
||||
{ID: 2, Platform: PlatformAnthropic, Priority: 2, Status: StatusActive, Schedulable: true, Concurrency: 5},
|
||||
},
|
||||
accountsByID: map[int64]*Account{},
|
||||
}
|
||||
for i := range repo.accounts {
|
||||
repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i]
|
||||
}
|
||||
|
||||
cache := &mockGatewayCacheForPlatform{}
|
||||
|
||||
cfg := testConfig()
|
||||
cfg.Gateway.Scheduling.LoadBatchEnabled = false
|
||||
|
||||
svc := &GatewayService{
|
||||
accountRepo: repo,
|
||||
cache: cache,
|
||||
cfg: cfg,
|
||||
concurrencyService: nil,
|
||||
}
|
||||
|
||||
excludedIDs := map[int64]struct{}{1: {}}
|
||||
result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", excludedIDs)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, result)
|
||||
require.NotNil(t, result.Account)
|
||||
require.Equal(t, int64(2), result.Account.ID, "不应选择被排除的账号")
|
||||
})
|
||||
|
||||
t.Run("无可用账号-返回错误", func(t *testing.T) {
|
||||
repo := &mockAccountRepoForPlatform{
|
||||
accounts: []Account{},
|
||||
accountsByID: map[int64]*Account{},
|
||||
}
|
||||
|
||||
cache := &mockGatewayCacheForPlatform{}
|
||||
|
||||
cfg := testConfig()
|
||||
cfg.Gateway.Scheduling.LoadBatchEnabled = false
|
||||
|
||||
svc := &GatewayService{
|
||||
accountRepo: repo,
|
||||
cache: cache,
|
||||
cfg: cfg,
|
||||
concurrencyService: nil,
|
||||
}
|
||||
|
||||
result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", nil)
|
||||
require.Error(t, err)
|
||||
require.Nil(t, result)
|
||||
require.Contains(t, err.Error(), "no available accounts")
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user