diff --git a/backend/internal/repository/ops_repo_preagg.go b/backend/internal/repository/ops_repo_preagg.go index 60f6da0f..ad94e13f 100644 --- a/backend/internal/repository/ops_repo_preagg.go +++ b/backend/internal/repository/ops_repo_preagg.go @@ -71,7 +71,9 @@ usage_agg AS ( error_base AS ( SELECT date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start, - platform AS platform, + -- platform is NULL for some early-phase errors (e.g. before routing); map to a sentinel + -- value so platform-level GROUPING SETS don't collide with the overall (platform=NULL) row. + COALESCE(platform, 'unknown') AS platform, group_id AS group_id, is_business_limited AS is_business_limited, error_owner AS error_owner, diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index d863291a..c2dbf7c9 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -1211,6 +1211,72 @@ func TestGatewayService_SelectAccountWithLoadAwareness(t *testing.T) { require.Nil(t, result) require.Contains(t, err.Error(), "no available accounts") }) + + t.Run("过滤不可调度账号-限流账号被跳过", func(t *testing.T) { + now := time.Now() + resetAt := now.Add(10 * time.Minute) + + repo := &mockAccountRepoForPlatform{ + accounts: []Account{ + {ID: 1, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5, RateLimitResetAt: &resetAt}, + {ID: 2, Platform: PlatformAnthropic, Priority: 2, Status: StatusActive, Schedulable: true, Concurrency: 5}, + }, + accountsByID: map[int64]*Account{}, + } + for i := range repo.accounts { + repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i] + } + + cache := &mockGatewayCacheForPlatform{} + cfg := testConfig() + cfg.Gateway.Scheduling.LoadBatchEnabled = false + + svc := &GatewayService{ + accountRepo: repo, + cache: cache, + cfg: cfg, + concurrencyService: nil, + } + + result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", nil) + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.Account) + require.Equal(t, int64(2), result.Account.ID, "应跳过限流账号,选择可用账号") + }) + + t.Run("过滤不可调度账号-过载账号被跳过", func(t *testing.T) { + now := time.Now() + overloadUntil := now.Add(10 * time.Minute) + + repo := &mockAccountRepoForPlatform{ + accounts: []Account{ + {ID: 1, Platform: PlatformAnthropic, Priority: 1, Status: StatusActive, Schedulable: true, Concurrency: 5, OverloadUntil: &overloadUntil}, + {ID: 2, Platform: PlatformAnthropic, Priority: 2, Status: StatusActive, Schedulable: true, Concurrency: 5}, + }, + accountsByID: map[int64]*Account{}, + } + for i := range repo.accounts { + repo.accountsByID[repo.accounts[i].ID] = &repo.accounts[i] + } + + cache := &mockGatewayCacheForPlatform{} + cfg := testConfig() + cfg.Gateway.Scheduling.LoadBatchEnabled = false + + svc := &GatewayService{ + accountRepo: repo, + cache: cache, + cfg: cfg, + concurrencyService: nil, + } + + result, err := svc.SelectAccountWithLoadAwareness(ctx, nil, "", "claude-3-5-sonnet-20241022", nil) + require.NoError(t, err) + require.NotNil(t, result) + require.NotNil(t, result.Account) + require.Equal(t, int64(2), result.Account.ID, "应跳过过载账号,选择可用账号") + }) } func TestGatewayService_GroupResolution_ReusesContextGroup(t *testing.T) { diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 2a5c44c6..d5eb0e52 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -511,6 +511,12 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro if isExcluded(acc.ID) { continue } + // Scheduler snapshots can be temporarily stale (bucket rebuild is throttled); + // re-check schedulability here so recently rate-limited/overloaded accounts + // are not selected again before the bucket is rebuilt. + if !acc.IsSchedulable() { + continue + } if !s.isAccountAllowedForPlatform(acc, platform, useMixed) { continue } @@ -893,6 +899,11 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if _, excluded := excludedIDs[acc.ID]; excluded { continue } + // Scheduler snapshots can be temporarily stale; re-check schedulability here to + // avoid selecting accounts that were recently rate-limited/overloaded. + if !acc.IsSchedulable() { + continue + } if !acc.IsSchedulableForModel(requestedModel) { continue } @@ -977,6 +988,11 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if _, excluded := excludedIDs[acc.ID]; excluded { continue } + // Scheduler snapshots can be temporarily stale; re-check schedulability here to + // avoid selecting accounts that were recently rate-limited/overloaded. + if !acc.IsSchedulable() { + continue + } // 过滤:原生平台直接通过,antigravity 需要启用混合调度 if acc.Platform == PlatformAntigravity && !acc.IsMixedSchedulingEnabled() { continue diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index bac117b8..04a90fdd 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -186,6 +186,11 @@ func (s *OpenAIGatewayService) SelectAccountForModelWithExclusions(ctx context.C if _, excluded := excludedIDs[acc.ID]; excluded { continue } + // Scheduler snapshots can be temporarily stale; re-check schedulability here to + // avoid selecting accounts that were recently rate-limited/overloaded. + if !acc.IsSchedulable() { + continue + } // Check model support if requestedModel != "" && !acc.IsModelSupported(requestedModel) { continue @@ -332,6 +337,12 @@ func (s *OpenAIGatewayService) SelectAccountWithLoadAwareness(ctx context.Contex if isExcluded(acc.ID) { continue } + // Scheduler snapshots can be temporarily stale (bucket rebuild is throttled); + // re-check schedulability here so recently rate-limited/overloaded accounts + // are not selected again before the bucket is rebuilt. + if !acc.IsSchedulable() { + continue + } if requestedModel != "" && !acc.IsModelSupported(requestedModel) { continue } diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 55e11b30..42b88b7d 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -3,6 +3,7 @@ package service import ( "bufio" "bytes" + "context" "errors" "io" "net/http" @@ -15,6 +16,129 @@ import ( "github.com/gin-gonic/gin" ) +type stubOpenAIAccountRepo struct { + AccountRepository + accounts []Account +} + +func (r stubOpenAIAccountRepo) ListSchedulableByGroupIDAndPlatform(ctx context.Context, groupID int64, platform string) ([]Account, error) { + return append([]Account(nil), r.accounts...), nil +} + +func (r stubOpenAIAccountRepo) ListSchedulableByPlatform(ctx context.Context, platform string) ([]Account, error) { + return append([]Account(nil), r.accounts...), nil +} + +type stubConcurrencyCache struct { + ConcurrencyCache +} + +func (c stubConcurrencyCache) AcquireAccountSlot(ctx context.Context, accountID int64, maxConcurrency int, requestID string) (bool, error) { + return true, nil +} + +func (c stubConcurrencyCache) ReleaseAccountSlot(ctx context.Context, accountID int64, requestID string) error { + return nil +} + +func (c stubConcurrencyCache) GetAccountsLoadBatch(ctx context.Context, accounts []AccountWithConcurrency) (map[int64]*AccountLoadInfo, error) { + out := make(map[int64]*AccountLoadInfo, len(accounts)) + for _, acc := range accounts { + out[acc.ID] = &AccountLoadInfo{AccountID: acc.ID, LoadRate: 0} + } + return out, nil +} + +func TestOpenAISelectAccountWithLoadAwareness_FiltersUnschedulable(t *testing.T) { + now := time.Now() + resetAt := now.Add(10 * time.Minute) + groupID := int64(1) + + rateLimited := Account{ + ID: 1, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + RateLimitResetAt: &resetAt, + } + available := Account{ + ID: 2, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 1, + } + + svc := &OpenAIGatewayService{ + accountRepo: stubOpenAIAccountRepo{accounts: []Account{rateLimited, available}}, + concurrencyService: NewConcurrencyService(stubConcurrencyCache{}), + } + + selection, err := svc.SelectAccountWithLoadAwareness(context.Background(), &groupID, "", "gpt-5.2", nil) + if err != nil { + t.Fatalf("SelectAccountWithLoadAwareness error: %v", err) + } + if selection == nil || selection.Account == nil { + t.Fatalf("expected selection with account") + } + if selection.Account.ID != available.ID { + t.Fatalf("expected account %d, got %d", available.ID, selection.Account.ID) + } + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } +} + +func TestOpenAISelectAccountWithLoadAwareness_FiltersUnschedulableWhenNoConcurrencyService(t *testing.T) { + now := time.Now() + resetAt := now.Add(10 * time.Minute) + groupID := int64(1) + + rateLimited := Account{ + ID: 1, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 0, + RateLimitResetAt: &resetAt, + } + available := Account{ + ID: 2, + Platform: PlatformOpenAI, + Type: AccountTypeAPIKey, + Status: StatusActive, + Schedulable: true, + Concurrency: 1, + Priority: 1, + } + + svc := &OpenAIGatewayService{ + accountRepo: stubOpenAIAccountRepo{accounts: []Account{rateLimited, available}}, + // concurrencyService is nil, forcing the non-load-batch selection path. + } + + selection, err := svc.SelectAccountWithLoadAwareness(context.Background(), &groupID, "", "gpt-5.2", nil) + if err != nil { + t.Fatalf("SelectAccountWithLoadAwareness error: %v", err) + } + if selection == nil || selection.Account == nil { + t.Fatalf("expected selection with account") + } + if selection.Account.ID != available.ID { + t.Fatalf("expected account %d, got %d", available.ID, selection.Account.ID) + } + if selection.ReleaseFunc != nil { + selection.ReleaseFunc() + } +} + func TestOpenAIStreamingTimeout(t *testing.T) { gin.SetMode(gin.TestMode) cfg := &config.Config{