From 7a0ca05233e318a177e3eb700bf799bd776be5b5 Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sat, 10 Jan 2026 14:39:33 +0800 Subject: [PATCH 1/2] =?UTF-8?q?perf(=E7=BD=91=E5=85=B3):=20=E7=B2=98?= =?UTF-8?q?=E6=80=A7=E4=BC=9A=E8=AF=9D=E5=91=BD=E4=B8=AD=E5=A4=8D=E7=94=A8?= =?UTF-8?q?=E5=80=99=E9=80=89=E8=B4=A6=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 使用候选账号映射避免粘性命中时额外的 GetByID 查询 新增单测确保粘性命中不触发 GetByID 且提前返回 --- .../service/gateway_multiplatform_test.go | 106 ++++++++++++++++++ backend/internal/service/gateway_service.go | 9 +- 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index 887f26e8..acc6952e 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -24,9 +24,11 @@ type mockAccountRepoForPlatform struct { accounts []Account accountsByID map[int64]*Account listPlatformFunc func(ctx context.Context, platform string) ([]Account, error) + getByIDCalls int } func (m *mockAccountRepoForPlatform) GetByID(ctx context.Context, id int64) (*Account, error) { + m.getByIDCalls++ if acc, ok := m.accountsByID[id]; ok { return acc, nil } @@ -948,6 +950,74 @@ func (m *mockConcurrencyService) GetAccountWaitingCount(ctx context.Context, acc return m.accountWaitCounts[accountID], nil } +type mockConcurrencyCache struct { + acquireAccountCalls int + loadBatchCalls int +} + +func (m *mockConcurrencyCache) AcquireAccountSlot(ctx context.Context, accountID int64, maxConcurrency int, requestID string) (bool, error) { + m.acquireAccountCalls++ + return true, nil +} + +func (m *mockConcurrencyCache) ReleaseAccountSlot(ctx context.Context, accountID int64, requestID string) error { + return nil +} + +func (m *mockConcurrencyCache) GetAccountConcurrency(ctx context.Context, accountID int64) (int, error) { + return 0, nil +} + +func (m *mockConcurrencyCache) IncrementAccountWaitCount(ctx context.Context, accountID int64, maxWait int) (bool, error) { + return true, nil +} + +func (m *mockConcurrencyCache) DecrementAccountWaitCount(ctx context.Context, accountID int64) error { + return nil +} + +func (m *mockConcurrencyCache) GetAccountWaitingCount(ctx context.Context, accountID int64) (int, error) { + return 0, nil +} + +func (m *mockConcurrencyCache) AcquireUserSlot(ctx context.Context, userID int64, maxConcurrency int, requestID string) (bool, error) { + return true, nil +} + +func (m *mockConcurrencyCache) ReleaseUserSlot(ctx context.Context, userID int64, requestID string) error { + return nil +} + +func (m *mockConcurrencyCache) GetUserConcurrency(ctx context.Context, userID int64) (int, error) { + return 0, nil +} + +func (m *mockConcurrencyCache) IncrementWaitCount(ctx context.Context, userID int64, maxWait int) (bool, error) { + return true, nil +} + +func (m *mockConcurrencyCache) DecrementWaitCount(ctx context.Context, userID int64) error { + return nil +} + +func (m *mockConcurrencyCache) GetAccountsLoadBatch(ctx context.Context, accounts []AccountWithConcurrency) (map[int64]*AccountLoadInfo, error) { + m.loadBatchCalls++ + result := make(map[int64]*AccountLoadInfo, len(accounts)) + for _, acc := range accounts { + result[acc.ID] = &AccountLoadInfo{ + AccountID: acc.ID, + CurrentConcurrency: 0, + WaitingCount: 0, + LoadRate: 0, + } + } + return result, nil +} + +func (m *mockConcurrencyCache) CleanupExpiredAccountSlots(ctx context.Context, accountID int64) error { + return nil +} + // TestGatewayService_SelectAccountWithLoadAwareness tests load-aware account selection func TestGatewayService_SelectAccountWithLoadAwareness(t *testing.T) { ctx := context.Background() @@ -1046,6 +1116,42 @@ func TestGatewayService_SelectAccountWithLoadAwareness(t *testing.T) { require.Equal(t, int64(2), result.Account.ID, "不应选择被排除的账号") }) + t.Run("粘性命中-不调用GetByID", func(t *testing.T) { + repo := &mockAccountRepoForPlatform{ + accounts: []Account{ + {ID: 1, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5}, + }, + accountsByID: map[int64]*Account{}, + } + for i := range repo.accounts { + repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i] + } + + cache := &mockGatewayCacheForPlatform{ + sessionBindings: map[string]int64{"sticky": 1}, + } + + cfg := testConfig() + cfg.Gateway.Scheduling.LoadBatchEnabled = true + + concurrencyCache := &mockConcurrencyCache{} + + svc := &GatewayService{ + accountRepo: repo, + cache: cache, + cfg: cfg, + concurrencyService: NewConcurrencyService(concurrencyCache), + } + + result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "sticky", "claude-3-5-sonnet-20241022", nil) + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.Account) + require.Equal(t, int64(1), result.Account.ID) + require.Equal(t, 0, repo.getByIDCalls, "粘性命中不应调用GetByID") + require.Equal(t, 0, concurrencyCache.loadBatchCalls, "粘性命中应在负载批量查询前返回") + }) + t.Run("无可用账号-返回错误", func(t *testing.T) { repo := &mockAccountRepoForPlatform{ accounts: []Account{}, diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 90a22f5a..1ca24d4c 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -453,6 +453,11 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro return nil, errors.New("no available accounts") } + accountByID := make(map[int64]*Account, len(accounts)) + for i := range accounts { + accountByID[accounts[i].ID] = &accounts[i] + } + isExcluded := func(accountID int64) bool { if excludedIDs == nil { return false @@ -465,8 +470,8 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro if sessionHash != "" && s.cache != nil { accountID, err := s.cache.GetSessionAccountID(ctx, derefGroupID(groupID), sessionHash) if err == nil && accountID > 0 && !isExcluded(accountID) { - account, err := s.accountRepo.GetByID(ctx, accountID) - if err == nil && s.isAccountInGroup(account, groupID) && + account, ok := accountByID[accountID] + if ok && s.isAccountInGroup(account, groupID) && s.isAccountAllowedForPlatform(account, platform, useMixed) && account.IsSchedulableForModel(requestedModel) && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) { From 6a9cc13e3e4a5ebb6dfd5b157876a0283f0e655e Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sat, 10 Jan 2026 14:51:16 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix(=E7=BD=91=E5=85=B3):=20=E6=98=8E?= =?UTF-8?q?=E7=A1=AE=E7=B2=98=E6=80=A7=E5=91=BD=E4=B8=AD=E8=8C=83=E5=9B=B4?= =?UTF-8?q?=E5=B9=B6=E4=BC=98=E5=8C=96=E6=98=A0=E5=B0=84=E6=9E=84=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 仅在粘性命中时构建候选账号映射以减少开销 新增用例验证粘性账号缺失时回退负载感知选择 --- .../service/gateway_multiplatform_test.go | 36 +++++++++++++++++++ backend/internal/service/gateway_service.go | 10 +++--- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index acc6952e..d863291a 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -1152,6 +1152,42 @@ func TestGatewayService_SelectAccountWithLoadAwareness(t *testing.T) { require.Equal(t, 0, concurrencyCache.loadBatchCalls, "粘性命中应在负载批量查询前返回") }) + t.Run("粘性账号不在候选集-回退负载感知选择", func(t *testing.T) { + repo := &mockAccountRepoForPlatform{ + accounts: []Account{ + {ID: 2, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5}, + }, + accountsByID: map[int64]*Account{}, + } + for i := range repo.accounts { + repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i] + } + + cache := &mockGatewayCacheForPlatform{ + sessionBindings: map[string]int64{"sticky": 1}, + } + + cfg := testConfig() + cfg.Gateway.Scheduling.LoadBatchEnabled = true + + concurrencyCache := &mockConcurrencyCache{} + + svc := &GatewayService{ + accountRepo: repo, + cache: cache, + cfg: cfg, + concurrencyService: NewConcurrencyService(concurrencyCache), + } + + result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "sticky", "claude-3-5-sonnet-20241022", nil) + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.Account) + require.Equal(t, int64(2), result.Account.ID, "粘性账号不在候选集时应回退到可用账号") + require.Equal(t, 0, repo.getByIDCalls, "粘性账号缺失不应回退到GetByID") + require.Equal(t, 1, concurrencyCache.loadBatchCalls, "应继续进行负载批量查询") + }) + t.Run("无可用账号-返回错误", func(t *testing.T) { repo := &mockAccountRepoForPlatform{ accounts: []Account{}, diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 1ca24d4c..31148b17 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -453,11 +453,6 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro return nil, errors.New("no available accounts") } - accountByID := make(map[int64]*Account, len(accounts)) - for i := range accounts { - accountByID[accounts[i].ID] = &accounts[i] - } - isExcluded := func(accountID int64) bool { if excludedIDs == nil { return false @@ -470,6 +465,11 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro if sessionHash != "" && s.cache != nil { accountID, err := s.cache.GetSessionAccountID(ctx, derefGroupID(groupID), sessionHash) if err == nil && accountID > 0 && !isExcluded(accountID) { + // 粘性命中仅在当前可调度候选集中生效。 + accountByID := make(map[int64]*Account, len(accounts)) + for i := range accounts { + accountByID[accounts[i].ID] = &accounts[i] + } account, ok := accountByID[accountID] if ok && s.isAccountInGroup(account, groupID) && s.isAccountAllowedForPlatform(account, platform, useMixed) &&