From fc095bf054dddc6e998d9e27fc4029b7449af068 Mon Sep 17 00:00:00 2001 From: erio Date: Mon, 9 Feb 2026 08:19:01 +0800 Subject: [PATCH] refactor: replace scope-level rate limiting with model-level rate limiting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Merge functional changes from develop branch: - Remove AntigravityQuotaScope system (claude/gemini_text/gemini_image) - Replace with per-model rate limiting using resolveAntigravityModelKey - Remove model load statistics (IncrModelCallCount/GetModelLoadBatch) - Simplify account selection to unified priority→load→LRU algorithm - Remove SetAntigravityQuotaScopeLimit from AccountRepository - Clean up scope-related UI indicators and API fields --- backend/internal/handler/dto/types.go | 8 - backend/internal/repository/account_repo.go | 47 ---- backend/internal/repository/gateway_cache.go | 97 ------- ...teway_cache_model_load_integration_test.go | 234 ---------------- backend/internal/service/account_service.go | 1 - .../service/account_service_delete_test.go | 4 - .../service/antigravity_gateway_service.go | 73 +++-- .../service/antigravity_quota_scope.go | 149 +--------- .../service/antigravity_rate_limit_test.go | 79 ++---- .../service/antigravity_smart_retry_test.go | 40 +-- .../service/gateway_multiplatform_test.go | 15 - backend/internal/service/gateway_service.go | 264 ++---------------- .../service/gemini_multiplatform_test.go | 11 - .../internal/service/model_rate_limit_test.go | 152 +--------- .../service/openai_gateway_service_test.go | 8 - .../service/ops_account_availability.go | 20 -- .../internal/service/ops_realtime_models.go | 33 +-- frontend/src/api/admin/ops.ts | 3 - .../account/AccountStatusIndicator.vue | 29 -- frontend/src/i18n/locales/en.ts | 2 - frontend/src/i18n/locales/zh.ts | 2 - frontend/src/types/index.ts | 3 - .../ops/components/OpsConcurrencyCard.vue | 25 +- 23 files changed, 137 insertions(+), 1162 deletions(-) delete mode 100644 backend/internal/repository/gateway_cache_model_load_integration_test.go diff --git a/backend/internal/handler/dto/types.go b/backend/internal/handler/dto/types.go index 2338eb78..2c1ae83c 100644 --- a/backend/internal/handler/dto/types.go +++ b/backend/internal/handler/dto/types.go @@ -2,11 +2,6 @@ package dto import "time" -type ScopeRateLimitInfo struct { - ResetAt time.Time `json:"reset_at"` - RemainingSec int64 `json:"remaining_sec"` -} - type User struct { ID int64 `json:"id"` Email string `json:"email"` @@ -129,9 +124,6 @@ type Account struct { RateLimitResetAt *time.Time `json:"rate_limit_reset_at"` OverloadUntil *time.Time `json:"overload_until"` - // Antigravity scope 级限流状态(从 extra 提取) - ScopeRateLimits map[string]ScopeRateLimitInfo `json:"scope_rate_limits,omitempty"` - TempUnschedulableUntil *time.Time `json:"temp_unschedulable_until"` TempUnschedulableReason string `json:"temp_unschedulable_reason"` diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go index 11c206d8..7fb7d4ed 100644 --- a/backend/internal/repository/account_repo.go +++ b/backend/internal/repository/account_repo.go @@ -798,53 +798,6 @@ func (r *accountRepository) SetRateLimited(ctx context.Context, id int64, resetA return nil } -func (r *accountRepository) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope service.AntigravityQuotaScope, resetAt time.Time) error { - now := time.Now().UTC() - payload := map[string]string{ - "rate_limited_at": now.Format(time.RFC3339), - "rate_limit_reset_at": resetAt.UTC().Format(time.RFC3339), - } - raw, err := json.Marshal(payload) - if err != nil { - return err - } - - scopeKey := string(scope) - client := clientFromContext(ctx, r.client) - result, err := client.ExecContext( - ctx, - `UPDATE accounts SET - extra = jsonb_set( - jsonb_set(COALESCE(extra, '{}'::jsonb), '{antigravity_quota_scopes}'::text[], COALESCE(extra->'antigravity_quota_scopes', '{}'::jsonb), true), - ARRAY['antigravity_quota_scopes', $1]::text[], - $2::jsonb, - true - ), - updated_at = NOW(), - last_used_at = NOW() - WHERE id = $3 AND deleted_at IS NULL`, - scopeKey, - raw, - id, - ) - if err != nil { - return err - } - - affected, err := result.RowsAffected() - if err != nil { - return err - } - if affected == 0 { - return service.ErrAccountNotFound - } - - if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil { - log.Printf("[SchedulerOutbox] enqueue quota scope failed: account=%d err=%v", id, err) - } - return nil -} - func (r *accountRepository) SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error { if scope == "" { return nil diff --git a/backend/internal/repository/gateway_cache.go b/backend/internal/repository/gateway_cache.go index 2c4f3b8e..58291b66 100644 --- a/backend/internal/repository/gateway_cache.go +++ b/backend/internal/repository/gateway_cache.go @@ -11,14 +11,6 @@ import ( const stickySessionPrefix = "sticky_session:" -// 模型负载统计相关常量 -const ( - modelLoadKeyPrefix = "ag:model_load:" // 模型调用次数 key 前缀 - modelLastUsedKeyPrefix = "ag:model_last_used:" // 模型最后调度时间 key 前缀 - modelLoadTTL = 24 * time.Hour // 调用次数 TTL(24 小时无调用后清零) - modelLastUsedTTL = 24 * time.Hour // 最后调度时间 TTL -) - type gatewayCache struct { rdb *redis.Client } @@ -59,92 +51,3 @@ func (c *gatewayCache) DeleteSessionAccountID(ctx context.Context, groupID int64 key := buildSessionKey(groupID, sessionHash) return c.rdb.Del(ctx, key).Err() } - -// ============ Antigravity 模型负载统计方法 ============ - -// modelLoadKey 构建模型调用次数 key -// 格式: ag:model_load:{accountID}:{model} -func modelLoadKey(accountID int64, model string) string { - return fmt.Sprintf("%s%d:%s", modelLoadKeyPrefix, accountID, model) -} - -// modelLastUsedKey 构建模型最后调度时间 key -// 格式: ag:model_last_used:{accountID}:{model} -func modelLastUsedKey(accountID int64, model string) string { - return fmt.Sprintf("%s%d:%s", modelLastUsedKeyPrefix, accountID, model) -} - -// IncrModelCallCount 增加模型调用次数并更新最后调度时间 -// 返回更新后的调用次数 -func (c *gatewayCache) IncrModelCallCount(ctx context.Context, accountID int64, model string) (int64, error) { - loadKey := modelLoadKey(accountID, model) - lastUsedKey := modelLastUsedKey(accountID, model) - - pipe := c.rdb.Pipeline() - incrCmd := pipe.Incr(ctx, loadKey) - pipe.Expire(ctx, loadKey, modelLoadTTL) // 每次调用刷新 TTL - pipe.Set(ctx, lastUsedKey, time.Now().Unix(), modelLastUsedTTL) - if _, err := pipe.Exec(ctx); err != nil { - return 0, err - } - return incrCmd.Val(), nil -} - -// GetModelLoadBatch 批量获取账号的模型负载信息 -func (c *gatewayCache) GetModelLoadBatch(ctx context.Context, accountIDs []int64, model string) (map[int64]*service.ModelLoadInfo, error) { - if len(accountIDs) == 0 { - return make(map[int64]*service.ModelLoadInfo), nil - } - - loadCmds, lastUsedCmds := c.pipelineModelLoadGet(ctx, accountIDs, model) - return c.parseModelLoadResults(accountIDs, loadCmds, lastUsedCmds), nil -} - -// pipelineModelLoadGet 批量获取模型负载的 Pipeline 操作 -func (c *gatewayCache) pipelineModelLoadGet( - ctx context.Context, - accountIDs []int64, - model string, -) (map[int64]*redis.StringCmd, map[int64]*redis.StringCmd) { - pipe := c.rdb.Pipeline() - loadCmds := make(map[int64]*redis.StringCmd, len(accountIDs)) - lastUsedCmds := make(map[int64]*redis.StringCmd, len(accountIDs)) - - for _, id := range accountIDs { - loadCmds[id] = pipe.Get(ctx, modelLoadKey(id, model)) - lastUsedCmds[id] = pipe.Get(ctx, modelLastUsedKey(id, model)) - } - _, _ = pipe.Exec(ctx) // 忽略错误,key 不存在是正常的 - return loadCmds, lastUsedCmds -} - -// parseModelLoadResults 解析 Pipeline 结果 -func (c *gatewayCache) parseModelLoadResults( - accountIDs []int64, - loadCmds map[int64]*redis.StringCmd, - lastUsedCmds map[int64]*redis.StringCmd, -) map[int64]*service.ModelLoadInfo { - result := make(map[int64]*service.ModelLoadInfo, len(accountIDs)) - for _, id := range accountIDs { - result[id] = &service.ModelLoadInfo{ - CallCount: getInt64OrZero(loadCmds[id]), - LastUsedAt: getTimeOrZero(lastUsedCmds[id]), - } - } - return result -} - -// getInt64OrZero 从 StringCmd 获取 int64 值,失败返回 0 -func getInt64OrZero(cmd *redis.StringCmd) int64 { - val, _ := cmd.Int64() - return val -} - -// getTimeOrZero 从 StringCmd 获取 time.Time,失败返回零值 -func getTimeOrZero(cmd *redis.StringCmd) time.Time { - val, err := cmd.Int64() - if err != nil { - return time.Time{} - } - return time.Unix(val, 0) -} diff --git a/backend/internal/repository/gateway_cache_model_load_integration_test.go b/backend/internal/repository/gateway_cache_model_load_integration_test.go deleted file mode 100644 index de6fa5ae..00000000 --- a/backend/internal/repository/gateway_cache_model_load_integration_test.go +++ /dev/null @@ -1,234 +0,0 @@ -//go:build integration - -package repository - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" -) - -// ============ Gateway Cache 模型负载统计集成测试 ============ - -type GatewayCacheModelLoadSuite struct { - suite.Suite -} - -func TestGatewayCacheModelLoadSuite(t *testing.T) { - suite.Run(t, new(GatewayCacheModelLoadSuite)) -} - -func (s *GatewayCacheModelLoadSuite) TestIncrModelCallCount_Basic() { - t := s.T() - rdb := testRedis(t) - cache := &gatewayCache{rdb: rdb} - ctx := context.Background() - - accountID := int64(123) - model := "claude-sonnet-4-20250514" - - // 首次调用应返回 1 - count1, err := cache.IncrModelCallCount(ctx, accountID, model) - require.NoError(t, err) - require.Equal(t, int64(1), count1) - - // 第二次调用应返回 2 - count2, err := cache.IncrModelCallCount(ctx, accountID, model) - require.NoError(t, err) - require.Equal(t, int64(2), count2) - - // 第三次调用应返回 3 - count3, err := cache.IncrModelCallCount(ctx, accountID, model) - require.NoError(t, err) - require.Equal(t, int64(3), count3) -} - -func (s *GatewayCacheModelLoadSuite) TestIncrModelCallCount_DifferentModels() { - t := s.T() - rdb := testRedis(t) - cache := &gatewayCache{rdb: rdb} - ctx := context.Background() - - accountID := int64(456) - model1 := "claude-sonnet-4-20250514" - model2 := "claude-opus-4-5-20251101" - - // 不同模型应该独立计数 - count1, err := cache.IncrModelCallCount(ctx, accountID, model1) - require.NoError(t, err) - require.Equal(t, int64(1), count1) - - count2, err := cache.IncrModelCallCount(ctx, accountID, model2) - require.NoError(t, err) - require.Equal(t, int64(1), count2) - - count1Again, err := cache.IncrModelCallCount(ctx, accountID, model1) - require.NoError(t, err) - require.Equal(t, int64(2), count1Again) -} - -func (s *GatewayCacheModelLoadSuite) TestIncrModelCallCount_DifferentAccounts() { - t := s.T() - rdb := testRedis(t) - cache := &gatewayCache{rdb: rdb} - ctx := context.Background() - - account1 := int64(111) - account2 := int64(222) - model := "gemini-2.5-pro" - - // 不同账号应该独立计数 - count1, err := cache.IncrModelCallCount(ctx, account1, model) - require.NoError(t, err) - require.Equal(t, int64(1), count1) - - count2, err := cache.IncrModelCallCount(ctx, account2, model) - require.NoError(t, err) - require.Equal(t, int64(1), count2) -} - -func (s *GatewayCacheModelLoadSuite) TestGetModelLoadBatch_Empty() { - t := s.T() - rdb := testRedis(t) - cache := &gatewayCache{rdb: rdb} - ctx := context.Background() - - result, err := cache.GetModelLoadBatch(ctx, []int64{}, "any-model") - require.NoError(t, err) - require.NotNil(t, result) - require.Empty(t, result) -} - -func (s *GatewayCacheModelLoadSuite) TestGetModelLoadBatch_NonExistent() { - t := s.T() - rdb := testRedis(t) - cache := &gatewayCache{rdb: rdb} - ctx := context.Background() - - // 查询不存在的账号应返回零值 - result, err := cache.GetModelLoadBatch(ctx, []int64{9999, 9998}, "claude-sonnet-4-20250514") - require.NoError(t, err) - require.Len(t, result, 2) - - require.Equal(t, int64(0), result[9999].CallCount) - require.True(t, result[9999].LastUsedAt.IsZero()) - require.Equal(t, int64(0), result[9998].CallCount) - require.True(t, result[9998].LastUsedAt.IsZero()) -} - -func (s *GatewayCacheModelLoadSuite) TestGetModelLoadBatch_AfterIncrement() { - t := s.T() - rdb := testRedis(t) - cache := &gatewayCache{rdb: rdb} - ctx := context.Background() - - accountID := int64(789) - model := "claude-sonnet-4-20250514" - - // 先增加调用次数 - beforeIncr := time.Now() - _, err := cache.IncrModelCallCount(ctx, accountID, model) - require.NoError(t, err) - _, err = cache.IncrModelCallCount(ctx, accountID, model) - require.NoError(t, err) - _, err = cache.IncrModelCallCount(ctx, accountID, model) - require.NoError(t, err) - afterIncr := time.Now() - - // 获取负载信息 - result, err := cache.GetModelLoadBatch(ctx, []int64{accountID}, model) - require.NoError(t, err) - require.Len(t, result, 1) - - loadInfo := result[accountID] - require.NotNil(t, loadInfo) - require.Equal(t, int64(3), loadInfo.CallCount) - require.False(t, loadInfo.LastUsedAt.IsZero()) - // LastUsedAt 应该在 beforeIncr 和 afterIncr 之间 - require.True(t, loadInfo.LastUsedAt.After(beforeIncr.Add(-time.Second)) || loadInfo.LastUsedAt.Equal(beforeIncr)) - require.True(t, loadInfo.LastUsedAt.Before(afterIncr.Add(time.Second)) || loadInfo.LastUsedAt.Equal(afterIncr)) -} - -func (s *GatewayCacheModelLoadSuite) TestGetModelLoadBatch_MultipleAccounts() { - t := s.T() - rdb := testRedis(t) - cache := &gatewayCache{rdb: rdb} - ctx := context.Background() - - model := "claude-opus-4-5-20251101" - account1 := int64(1001) - account2 := int64(1002) - account3 := int64(1003) // 不调用 - - // account1 调用 2 次 - _, err := cache.IncrModelCallCount(ctx, account1, model) - require.NoError(t, err) - _, err = cache.IncrModelCallCount(ctx, account1, model) - require.NoError(t, err) - - // account2 调用 5 次 - for i := 0; i < 5; i++ { - _, err = cache.IncrModelCallCount(ctx, account2, model) - require.NoError(t, err) - } - - // 批量获取 - result, err := cache.GetModelLoadBatch(ctx, []int64{account1, account2, account3}, model) - require.NoError(t, err) - require.Len(t, result, 3) - - require.Equal(t, int64(2), result[account1].CallCount) - require.False(t, result[account1].LastUsedAt.IsZero()) - - require.Equal(t, int64(5), result[account2].CallCount) - require.False(t, result[account2].LastUsedAt.IsZero()) - - require.Equal(t, int64(0), result[account3].CallCount) - require.True(t, result[account3].LastUsedAt.IsZero()) -} - -func (s *GatewayCacheModelLoadSuite) TestGetModelLoadBatch_ModelIsolation() { - t := s.T() - rdb := testRedis(t) - cache := &gatewayCache{rdb: rdb} - ctx := context.Background() - - accountID := int64(2001) - model1 := "claude-sonnet-4-20250514" - model2 := "gemini-2.5-pro" - - // 对 model1 调用 3 次 - for i := 0; i < 3; i++ { - _, err := cache.IncrModelCallCount(ctx, accountID, model1) - require.NoError(t, err) - } - - // 获取 model1 的负载 - result1, err := cache.GetModelLoadBatch(ctx, []int64{accountID}, model1) - require.NoError(t, err) - require.Equal(t, int64(3), result1[accountID].CallCount) - - // 获取 model2 的负载(应该为 0) - result2, err := cache.GetModelLoadBatch(ctx, []int64{accountID}, model2) - require.NoError(t, err) - require.Equal(t, int64(0), result2[accountID].CallCount) -} - -// ============ 辅助函数测试 ============ - -func (s *GatewayCacheModelLoadSuite) TestModelLoadKey_Format() { - t := s.T() - - key := modelLoadKey(123, "claude-sonnet-4") - require.Equal(t, "ag:model_load:123:claude-sonnet-4", key) -} - -func (s *GatewayCacheModelLoadSuite) TestModelLastUsedKey_Format() { - t := s.T() - - key := modelLastUsedKey(456, "gemini-2.5-pro") - require.Equal(t, "ag:model_last_used:456:gemini-2.5-pro", key) -} diff --git a/backend/internal/service/account_service.go b/backend/internal/service/account_service.go index 90365d2f..9bf58988 100644 --- a/backend/internal/service/account_service.go +++ b/backend/internal/service/account_service.go @@ -50,7 +50,6 @@ type AccountRepository interface { ListSchedulableByGroupIDAndPlatforms(ctx context.Context, groupID int64, platforms []string) ([]Account, error) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error - SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error SetOverloaded(ctx context.Context, id int64, until time.Time) error SetTempUnschedulable(ctx context.Context, id int64, until time.Time, reason string) error diff --git a/backend/internal/service/account_service_delete_test.go b/backend/internal/service/account_service_delete_test.go index e5eabfc6..af3a3784 100644 --- a/backend/internal/service/account_service_delete_test.go +++ b/backend/internal/service/account_service_delete_test.go @@ -143,10 +143,6 @@ func (s *accountRepoStub) SetRateLimited(ctx context.Context, id int64, resetAt panic("unexpected SetRateLimited call") } -func (s *accountRepoStub) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error { - panic("unexpected SetAntigravityQuotaScopeLimit call") -} - func (s *accountRepoStub) SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error { panic("unexpected SetModelRateLimit call") } diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index faac0a90..014b3c86 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -101,12 +101,11 @@ type antigravityRetryLoopParams struct { accessToken string action string body []byte - quotaScope AntigravityQuotaScope c *gin.Context httpUpstream HTTPUpstream settingService *SettingService accountRepo AccountRepository // 用于智能重试的模型级别限流 - handleError func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult + handleError func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult requestedModel string // 用于限流检查的原始请求模型 isStickySession bool // 是否为粘性会话(用于账号切换时的缓存计费判断) groupID int64 // 用于模型级限流时清除粘性会话 @@ -158,8 +157,8 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam resetAt := time.Now().Add(rateLimitDuration) if !setModelRateLimitByModelName(p.ctx, p.accountRepo, p.account.ID, modelName, p.prefix, resp.StatusCode, resetAt, false) { - p.handleError(p.ctx, p.prefix, p.account, resp.StatusCode, resp.Header, respBody, p.quotaScope, p.groupID, p.sessionHash, p.isStickySession) - log.Printf("%s status=%d rate_limited account=%d (no scope mapping)", p.prefix, resp.StatusCode, p.account.ID) + p.handleError(p.ctx, p.prefix, p.account, resp.StatusCode, resp.Header, respBody, p.requestedModel, p.groupID, p.sessionHash, p.isStickySession) + log.Printf("%s status=%d rate_limited account=%d (no model mapping)", p.prefix, resp.StatusCode, p.account.ID) } else { s.updateAccountModelRateLimitInCache(p.ctx, p.account, modelName, resetAt) } @@ -195,7 +194,7 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam retryReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body) if err != nil { log.Printf("%s status=smart_retry_request_build_failed error=%v", p.prefix, err) - p.handleError(p.ctx, p.prefix, p.account, resp.StatusCode, resp.Header, respBody, p.quotaScope, p.groupID, p.sessionHash, p.isStickySession) + p.handleError(p.ctx, p.prefix, p.account, resp.StatusCode, resp.Header, respBody, p.requestedModel, p.groupID, p.sessionHash, p.isStickySession) return &smartRetryResult{ action: smartRetryActionBreakWithResp, resp: &http.Response{ @@ -427,7 +426,7 @@ urlFallbackLoop: } // 重试用尽,标记账户限流 - p.handleError(p.ctx, p.prefix, p.account, resp.StatusCode, resp.Header, respBody, p.quotaScope, p.groupID, p.sessionHash, p.isStickySession) + p.handleError(p.ctx, p.prefix, p.account, resp.StatusCode, resp.Header, respBody, p.requestedModel, p.groupID, p.sessionHash, p.isStickySession) log.Printf("%s status=%d rate_limited base_url=%s body=%s", p.prefix, resp.StatusCode, baseURL, truncateForLog(respBody, 200)) resp = &http.Response{ StatusCode: resp.StatusCode, @@ -618,7 +617,7 @@ func (s *AntigravityGatewayService) applyErrorPolicy(p antigravityRetryLoopParam return true, nil case ErrorPolicyMatched: _ = p.handleError(p.ctx, p.prefix, p.account, statusCode, headers, respBody, - p.quotaScope, p.groupID, p.sessionHash, p.isStickySession) + p.requestedModel, p.groupID, p.sessionHash, p.isStickySession) return true, nil case ErrorPolicyTempUnscheduled: slog.Info("temp_unschedulable_matched", @@ -1023,6 +1022,7 @@ func isModelNotFoundError(statusCode int, body []byte) bool { // ├─ 成功 → 正常返回 // └─ 失败 → 设置模型限流 + 清除粘性绑定 → 切换账号 func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte, isStickySession bool) (*ForwardResult, error) { + // 上游透传账号直接转发,不走 OAuth token 刷新 if account.Type == AccountTypeUpstream { return s.ForwardUpstream(ctx, c, account, body) } @@ -1046,11 +1046,9 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, if mappedModel == "" { return nil, s.writeClaudeError(c, http.StatusForbidden, "permission_error", fmt.Sprintf("model %s not in whitelist", claudeReq.Model)) } - loadModel := mappedModel // 应用 thinking 模式自动后缀:如果 thinking 开启且目标是 claude-sonnet-4-5,自动改为 thinking 版本 thinkingEnabled := claudeReq.Thinking != nil && claudeReq.Thinking.Type == "enabled" mappedModel = applyThinkingModelSuffix(mappedModel, thinkingEnabled) - quotaScope, _ := resolveAntigravityQuotaScope(originalModel) // 获取 access_token if s.tokenProvider == nil { @@ -1085,11 +1083,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, // 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回 action := "streamGenerateContent" - // 统计模型调用次数(包括粘性会话,用于负载均衡调度) - if s.cache != nil { - _, _ = s.cache.IncrModelCallCount(ctx, account.ID, loadModel) - } - // 执行带重试的请求 result, err := s.antigravityRetryLoop(antigravityRetryLoopParams{ ctx: ctx, @@ -1099,7 +1092,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, accessToken: accessToken, action: action, body: geminiBody, - quotaScope: quotaScope, c: c, httpUpstream: s.httpUpstream, settingService: s.settingService, @@ -1180,7 +1172,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, accessToken: accessToken, action: action, body: retryGeminiBody, - quotaScope: quotaScope, c: c, httpUpstream: s.httpUpstream, settingService: s.settingService, @@ -1291,7 +1282,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, } } - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope, 0, "", isStickySession) + s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", isStickySession) if s.shouldFailoverUpstreamError(resp.StatusCode) { upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) @@ -1674,7 +1665,6 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co if len(body) == 0 { return nil, s.writeGoogleError(c, http.StatusBadRequest, "Request body is empty") } - quotaScope, _ := resolveAntigravityQuotaScope(originalModel) // 解析请求以获取 image_size(用于图片计费) imageSize := s.extractImageSize(body) @@ -1744,11 +1734,6 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co // 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后返回 upstreamAction := "streamGenerateContent" - // 统计模型调用次数(包括粘性会话,用于负载均衡调度) - if s.cache != nil { - _, _ = s.cache.IncrModelCallCount(ctx, account.ID, mappedModel) - } - // 执行带重试的请求 result, err := s.antigravityRetryLoop(antigravityRetryLoopParams{ ctx: ctx, @@ -1758,7 +1743,6 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co accessToken: accessToken, action: upstreamAction, body: wrappedBody, - quotaScope: quotaScope, c: c, httpUpstream: s.httpUpstream, settingService: s.settingService, @@ -1832,7 +1816,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co if unwrapErr != nil || len(unwrappedForOps) == 0 { unwrappedForOps = respBody } - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope, 0, "", isStickySession) + s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", isStickySession) upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(unwrappedForOps)) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) upstreamDetail := s.getUpstreamErrorDetail(unwrappedForOps) @@ -2255,10 +2239,10 @@ func (s *AntigravityGatewayService) updateAccountModelRateLimitInCache(ctx conte func (s *AntigravityGatewayService) handleUpstreamError( ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, - quotaScope AntigravityQuotaScope, + requestedModel string, groupID int64, sessionHash string, isStickySession bool, ) *handleModelRateLimitResult { - // ✨ 模型级限流处理(在原有逻辑之前) + // 模型级限流处理(优先) result := s.handleModelRateLimit(&handleModelRateLimitParams{ ctx: ctx, prefix: prefix, @@ -2280,27 +2264,35 @@ func (s *AntigravityGatewayService) handleUpstreamError( return nil } - // 429 使用 Gemini 格式解析(从 body 解析重置时间) + // 429:尝试解析模型级限流,解析失败时兜底为账号级限流 if statusCode == 429 { if logBody, maxBytes := s.getLogConfig(); logBody { log.Printf("[Antigravity-Debug] 429 response body: %s", truncateString(string(body), maxBytes)) } - useScopeLimit := quotaScope != "" resetAt := ParseGeminiRateLimitResetTime(body) defaultDur := s.getDefaultRateLimitDuration() - ra := s.resolveResetTime(resetAt, defaultDur) - if useScopeLimit { - log.Printf("%s status=429 rate_limited scope=%s reset_at=%v reset_in=%v", prefix, quotaScope, ra.Format("15:04:05"), time.Until(ra).Truncate(time.Second)) - if err := s.accountRepo.SetAntigravityQuotaScopeLimit(ctx, account.ID, quotaScope, ra); err != nil { - log.Printf("%s status=429 rate_limit_set_failed scope=%s error=%v", prefix, quotaScope, err) - } - } else { - log.Printf("%s status=429 rate_limited account=%d reset_at=%v reset_in=%v", prefix, account.ID, ra.Format("15:04:05"), time.Until(ra).Truncate(time.Second)) - if err := s.accountRepo.SetRateLimited(ctx, account.ID, ra); err != nil { - log.Printf("%s status=429 rate_limit_set_failed account=%d error=%v", prefix, account.ID, err) + // 尝试解析模型 key 并设置模型级限流 + modelKey := resolveAntigravityModelKey(requestedModel) + if modelKey != "" { + ra := s.resolveResetTime(resetAt, defaultDur) + if err := s.accountRepo.SetModelRateLimit(ctx, account.ID, modelKey, ra); err != nil { + log.Printf("%s status=429 model_rate_limit_set_failed model=%s error=%v", prefix, modelKey, err) + } else { + log.Printf("%s status=429 model_rate_limited model=%s account=%d reset_at=%v reset_in=%v", + prefix, modelKey, account.ID, ra.Format("15:04:05"), time.Until(ra).Truncate(time.Second)) + s.updateAccountModelRateLimitInCache(ctx, account, modelKey, ra) } + return nil + } + + // 无法解析模型 key,兜底为账号级限流 + ra := s.resolveResetTime(resetAt, defaultDur) + log.Printf("%s status=429 rate_limited account=%d reset_at=%v reset_in=%v (fallback)", + prefix, account.ID, ra.Format("15:04:05"), time.Until(ra).Truncate(time.Second)) + if err := s.accountRepo.SetRateLimited(ctx, account.ID, ra); err != nil { + log.Printf("%s status=429 rate_limit_set_failed account=%d error=%v", prefix, account.ID, err) } return nil } @@ -3533,8 +3525,7 @@ func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin. // 429 错误时标记账号限流 if resp.StatusCode == http.StatusTooManyRequests { - quotaScope, _ := resolveAntigravityQuotaScope(originalModel) - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope, 0, "", false) + s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", false) } // 透传上游错误 diff --git a/backend/internal/service/antigravity_quota_scope.go b/backend/internal/service/antigravity_quota_scope.go index 43ac6c2f..e181e7f8 100644 --- a/backend/internal/service/antigravity_quota_scope.go +++ b/backend/internal/service/antigravity_quota_scope.go @@ -2,63 +2,23 @@ package service import ( "context" - "slices" "strings" "time" ) -const antigravityQuotaScopesKey = "antigravity_quota_scopes" - -// AntigravityQuotaScope 表示 Antigravity 的配额域 -type AntigravityQuotaScope string - -const ( - AntigravityQuotaScopeClaude AntigravityQuotaScope = "claude" - AntigravityQuotaScopeGeminiText AntigravityQuotaScope = "gemini_text" - AntigravityQuotaScopeGeminiImage AntigravityQuotaScope = "gemini_image" -) - -// IsScopeSupported 检查给定的 scope 是否在分组支持的 scope 列表中 -func IsScopeSupported(supportedScopes []string, scope AntigravityQuotaScope) bool { - if len(supportedScopes) == 0 { - // 未配置时默认全部支持 - return true - } - supported := slices.Contains(supportedScopes, string(scope)) - return supported -} - -// ResolveAntigravityQuotaScope 根据模型名称解析配额域(导出版本) -func ResolveAntigravityQuotaScope(requestedModel string) (AntigravityQuotaScope, bool) { - return resolveAntigravityQuotaScope(requestedModel) -} - -// resolveAntigravityQuotaScope 根据模型名称解析配额域 -func resolveAntigravityQuotaScope(requestedModel string) (AntigravityQuotaScope, bool) { - model := normalizeAntigravityModelName(requestedModel) - if model == "" { - return "", false - } - switch { - case strings.HasPrefix(model, "claude-"): - return AntigravityQuotaScopeClaude, true - case strings.HasPrefix(model, "gemini-"): - if isImageGenerationModel(model) { - return AntigravityQuotaScopeGeminiImage, true - } - return AntigravityQuotaScopeGeminiText, true - default: - return "", false - } -} - func normalizeAntigravityModelName(model string) string { normalized := strings.ToLower(strings.TrimSpace(model)) normalized = strings.TrimPrefix(normalized, "models/") return normalized } -// IsSchedulableForModel 结合 Antigravity 配额域限流判断是否可调度。 +// resolveAntigravityModelKey 根据请求的模型名解析限流 key +// 返回空字符串表示无法解析 +func resolveAntigravityModelKey(requestedModel string) string { + return normalizeAntigravityModelName(requestedModel) +} + +// IsSchedulableForModel 结合模型级限流判断是否可调度。 // 保持旧签名以兼容既有调用方;默认使用 context.Background()。 func (a *Account) IsSchedulableForModel(requestedModel string) bool { return a.IsSchedulableForModelWithContext(context.Background(), requestedModel) @@ -74,107 +34,20 @@ func (a *Account) IsSchedulableForModelWithContext(ctx context.Context, requeste if a.isModelRateLimitedWithContext(ctx, requestedModel) { return false } - if a.Platform != PlatformAntigravity { - return true - } - scope, ok := resolveAntigravityQuotaScope(requestedModel) - if !ok { - return true - } - resetAt := a.antigravityQuotaScopeResetAt(scope) - if resetAt == nil { - return true - } - now := time.Now() - return !now.Before(*resetAt) + return true } -func (a *Account) antigravityQuotaScopeResetAt(scope AntigravityQuotaScope) *time.Time { - if a == nil || a.Extra == nil || scope == "" { - return nil - } - rawScopes, ok := a.Extra[antigravityQuotaScopesKey].(map[string]any) - if !ok { - return nil - } - rawScope, ok := rawScopes[string(scope)].(map[string]any) - if !ok { - return nil - } - resetAtRaw, ok := rawScope["rate_limit_reset_at"].(string) - if !ok || strings.TrimSpace(resetAtRaw) == "" { - return nil - } - resetAt, err := time.Parse(time.RFC3339, resetAtRaw) - if err != nil { - return nil - } - return &resetAt -} - -var antigravityAllScopes = []AntigravityQuotaScope{ - AntigravityQuotaScopeClaude, - AntigravityQuotaScopeGeminiText, - AntigravityQuotaScopeGeminiImage, -} - -func (a *Account) GetAntigravityScopeRateLimits() map[string]int64 { - if a == nil || a.Platform != PlatformAntigravity { - return nil - } - now := time.Now() - result := make(map[string]int64) - for _, scope := range antigravityAllScopes { - resetAt := a.antigravityQuotaScopeResetAt(scope) - if resetAt != nil && now.Before(*resetAt) { - remainingSec := int64(time.Until(*resetAt).Seconds()) - if remainingSec > 0 { - result[string(scope)] = remainingSec - } - } - } - if len(result) == 0 { - return nil - } - return result -} - -// GetQuotaScopeRateLimitRemainingTime 获取模型域限流剩余时间 -// 返回 0 表示未限流或已过期 -func (a *Account) GetQuotaScopeRateLimitRemainingTime(requestedModel string) time.Duration { - if a == nil || a.Platform != PlatformAntigravity { - return 0 - } - scope, ok := resolveAntigravityQuotaScope(requestedModel) - if !ok { - return 0 - } - resetAt := a.antigravityQuotaScopeResetAt(scope) - if resetAt == nil { - return 0 - } - if remaining := time.Until(*resetAt); remaining > 0 { - return remaining - } - return 0 -} - -// GetRateLimitRemainingTime 获取限流剩余时间(模型限流和模型域限流取最大值) +// GetRateLimitRemainingTime 获取限流剩余时间(模型级限流) // 返回 0 表示未限流或已过期 func (a *Account) GetRateLimitRemainingTime(requestedModel string) time.Duration { return a.GetRateLimitRemainingTimeWithContext(context.Background(), requestedModel) } -// GetRateLimitRemainingTimeWithContext 获取限流剩余时间(模型限流和模型域限流取最大值) +// GetRateLimitRemainingTimeWithContext 获取限流剩余时间(模型级限流) // 返回 0 表示未限流或已过期 func (a *Account) GetRateLimitRemainingTimeWithContext(ctx context.Context, requestedModel string) time.Duration { if a == nil { return 0 } - modelRemaining := a.GetModelRateLimitRemainingTimeWithContext(ctx, requestedModel) - scopeRemaining := a.GetQuotaScopeRateLimitRemainingTime(requestedModel) - if modelRemaining > scopeRemaining { - return modelRemaining - } - return scopeRemaining + return a.GetModelRateLimitRemainingTimeWithContext(ctx, requestedModel) } diff --git a/backend/internal/service/antigravity_rate_limit_test.go b/backend/internal/service/antigravity_rate_limit_test.go index cd2a7a4a..59cc9331 100644 --- a/backend/internal/service/antigravity_rate_limit_test.go +++ b/backend/internal/service/antigravity_rate_limit_test.go @@ -59,12 +59,6 @@ func (s *stubAntigravityUpstream) DoWithTLS(req *http.Request, proxyURL string, return s.Do(req, proxyURL, accountID, accountConcurrency) } -type scopeLimitCall struct { - accountID int64 - scope AntigravityQuotaScope - resetAt time.Time -} - type rateLimitCall struct { accountID int64 resetAt time.Time @@ -78,16 +72,10 @@ type modelRateLimitCall struct { type stubAntigravityAccountRepo struct { AccountRepository - scopeCalls []scopeLimitCall rateCalls []rateLimitCall modelRateLimitCalls []modelRateLimitCall } -func (s *stubAntigravityAccountRepo) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error { - s.scopeCalls = append(s.scopeCalls, scopeLimitCall{accountID: id, scope: scope, resetAt: resetAt}) - return nil -} - func (s *stubAntigravityAccountRepo) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error { s.rateCalls = append(s.rateCalls, rateLimitCall{accountID: id, resetAt: resetAt}) return nil @@ -131,10 +119,9 @@ func TestAntigravityRetryLoop_URLFallback_UsesLatestSuccess(t *testing.T) { accessToken: "token", action: "generateContent", body: []byte(`{"input":"test"}`), - quotaScope: AntigravityQuotaScopeClaude, httpUpstream: upstream, requestedModel: "claude-sonnet-4-5", - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { handleErrorCalled = true return nil }, @@ -155,23 +142,6 @@ func TestAntigravityRetryLoop_URLFallback_UsesLatestSuccess(t *testing.T) { require.Equal(t, base2, available[0]) } -func TestAntigravityHandleUpstreamError_UsesScopeLimit(t *testing.T) { - // 分区限流始终开启,不再支持通过环境变量关闭 - repo := &stubAntigravityAccountRepo{} - svc := &AntigravityGatewayService{accountRepo: repo} - account := &Account{ID: 9, Name: "acc-9", Platform: PlatformAntigravity} - - body := buildGeminiRateLimitBody("3s") - svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusTooManyRequests, http.Header{}, body, AntigravityQuotaScopeClaude, 0, "", false) - - require.Len(t, repo.scopeCalls, 1) - require.Empty(t, repo.rateCalls) - call := repo.scopeCalls[0] - require.Equal(t, account.ID, call.accountID) - require.Equal(t, AntigravityQuotaScopeClaude, call.scope) - require.WithinDuration(t, time.Now().Add(3*time.Second), call.resetAt, 2*time.Second) -} - // TestHandleUpstreamError_429_ModelRateLimit 测试 429 模型限流场景 func TestHandleUpstreamError_429_ModelRateLimit(t *testing.T) { repo := &stubAntigravityAccountRepo{} @@ -189,7 +159,7 @@ func TestHandleUpstreamError_429_ModelRateLimit(t *testing.T) { } }`) - result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusTooManyRequests, http.Header{}, body, AntigravityQuotaScopeClaude, 0, "", false) + result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusTooManyRequests, http.Header{}, body, "claude-sonnet-4-5", 0, "", false) // 应该触发模型限流 require.NotNil(t, result) @@ -200,22 +170,22 @@ func TestHandleUpstreamError_429_ModelRateLimit(t *testing.T) { require.Equal(t, "claude-sonnet-4-5", repo.modelRateLimitCalls[0].modelKey) } -// TestHandleUpstreamError_429_NonModelRateLimit 测试 429 非模型限流场景(走 scope 限流) +// TestHandleUpstreamError_429_NonModelRateLimit 测试 429 非模型限流场景(走模型级限流兜底) func TestHandleUpstreamError_429_NonModelRateLimit(t *testing.T) { repo := &stubAntigravityAccountRepo{} svc := &AntigravityGatewayService{accountRepo: repo} account := &Account{ID: 2, Name: "acc-2", Platform: PlatformAntigravity} - // 429 + 普通限流响应(无 RATE_LIMIT_EXCEEDED reason)→ scope 限流 + // 429 + 普通限流响应(无 RATE_LIMIT_EXCEEDED reason)→ 走模型级限流兜底 body := buildGeminiRateLimitBody("5s") - result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusTooManyRequests, http.Header{}, body, AntigravityQuotaScopeClaude, 0, "", false) + result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusTooManyRequests, http.Header{}, body, "claude-sonnet-4-5", 0, "", false) - // 不应该触发模型限流,应该走 scope 限流 + // handleModelRateLimit 不会处理(因为没有 RATE_LIMIT_EXCEEDED), + // 但 429 兜底逻辑会使用 requestedModel 设置模型级限流 require.Nil(t, result) - require.Empty(t, repo.modelRateLimitCalls) - require.Len(t, repo.scopeCalls, 1) - require.Equal(t, AntigravityQuotaScopeClaude, repo.scopeCalls[0].scope) + require.Len(t, repo.modelRateLimitCalls, 1) + require.Equal(t, "claude-sonnet-4-5", repo.modelRateLimitCalls[0].modelKey) } // TestHandleUpstreamError_503_ModelRateLimit 测试 503 模型限流场景 @@ -235,7 +205,7 @@ func TestHandleUpstreamError_503_ModelRateLimit(t *testing.T) { } }`) - result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, AntigravityQuotaScopeGeminiText, 0, "", false) + result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, "gemini-3-pro-high", 0, "", false) // 应该触发模型限流 require.NotNil(t, result) @@ -263,12 +233,11 @@ func TestHandleUpstreamError_503_NonModelRateLimit(t *testing.T) { } }`) - result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, AntigravityQuotaScopeGeminiText, 0, "", false) + result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, "gemini-3-pro-high", 0, "", false) // 503 非模型限流不应该做任何处理 require.Nil(t, result) require.Empty(t, repo.modelRateLimitCalls, "503 non-model rate limit should not trigger model rate limit") - require.Empty(t, repo.scopeCalls, "503 non-model rate limit should not trigger scope rate limit") require.Empty(t, repo.rateCalls, "503 non-model rate limit should not trigger account rate limit") } @@ -281,12 +250,11 @@ func TestHandleUpstreamError_503_EmptyBody(t *testing.T) { // 503 + 空响应体 → 不做任何处理 body := []byte(`{}`) - result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, AntigravityQuotaScopeGeminiText, 0, "", false) + result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, "gemini-3-pro-high", 0, "", false) // 503 空响应不应该做任何处理 require.Nil(t, result) require.Empty(t, repo.modelRateLimitCalls) - require.Empty(t, repo.scopeCalls) require.Empty(t, repo.rateCalls) } @@ -307,15 +275,7 @@ func TestAccountIsSchedulableForModel_AntigravityRateLimits(t *testing.T) { require.False(t, account.IsSchedulableForModel("gemini-3-flash")) account.RateLimitResetAt = nil - account.Extra = map[string]any{ - antigravityQuotaScopesKey: map[string]any{ - "claude": map[string]any{ - "rate_limit_reset_at": future.Format(time.RFC3339), - }, - }, - } - - require.False(t, account.IsSchedulableForModel("claude-sonnet-4-5")) + require.True(t, account.IsSchedulableForModel("claude-sonnet-4-5")) require.True(t, account.IsSchedulableForModel("gemini-3-flash")) } @@ -635,6 +595,7 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { }`, expectedShouldRetry: false, expectedShouldRateLimit: true, + minWait: 7 * time.Second, modelName: "gemini-pro", }, { @@ -652,6 +613,7 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { }`, expectedShouldRetry: false, expectedShouldRateLimit: true, + minWait: 39 * time.Second, modelName: "gemini-3-pro-high", }, { @@ -669,6 +631,7 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { }`, expectedShouldRetry: false, expectedShouldRateLimit: true, + minWait: 30 * time.Second, modelName: "gemini-2.5-flash", }, { @@ -686,6 +649,7 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { }`, expectedShouldRetry: false, expectedShouldRateLimit: true, + minWait: 30 * time.Second, modelName: "claude-sonnet-4-5", }, } @@ -704,6 +668,11 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { t.Errorf("wait = %v, want >= %v", wait, tt.minWait) } } + if shouldRateLimit && tt.minWait > 0 { + if wait < tt.minWait { + t.Errorf("rate limit wait = %v, want >= %v", wait, tt.minWait) + } + } if (shouldRetry || shouldRateLimit) && model != tt.modelName { t.Errorf("modelName = %q, want %q", model, tt.modelName) } @@ -832,7 +801,7 @@ func TestAntigravityRetryLoop_PreCheck_SwitchesWhenRateLimited(t *testing.T) { requestedModel: "claude-sonnet-4-5", httpUpstream: upstream, isStickySession: true, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, }) @@ -875,7 +844,7 @@ func TestAntigravityRetryLoop_PreCheck_SwitchesWhenRemainingLong(t *testing.T) { requestedModel: "claude-sonnet-4-5", httpUpstream: upstream, isStickySession: true, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, }) diff --git a/backend/internal/service/antigravity_smart_retry_test.go b/backend/internal/service/antigravity_smart_retry_test.go index 999b408f..a7e0d296 100644 --- a/backend/internal/service/antigravity_smart_retry_test.go +++ b/backend/internal/service/antigravity_smart_retry_test.go @@ -75,7 +75,7 @@ func TestHandleSmartRetry_URLLevelRateLimit(t *testing.T) { accessToken: "token", action: "generateContent", body: []byte(`{"input":"test"}`), - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -127,7 +127,7 @@ func TestHandleSmartRetry_LongDelay_ReturnsSwitchError(t *testing.T) { body: []byte(`{"input":"test"}`), accountRepo: repo, isStickySession: true, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -194,7 +194,7 @@ func TestHandleSmartRetry_ShortDelay_SmartRetrySuccess(t *testing.T) { action: "generateContent", body: []byte(`{"input":"test"}`), httpUpstream: upstream, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -269,7 +269,7 @@ func TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError(t *test httpUpstream: upstream, accountRepo: repo, isStickySession: false, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -331,7 +331,7 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi body: []byte(`{"input":"test"}`), accountRepo: repo, isStickySession: true, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -387,7 +387,7 @@ func TestHandleSmartRetry_NonAntigravityAccount_ContinuesDefaultLogic(t *testing accessToken: "token", action: "generateContent", body: []byte(`{"input":"test"}`), - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -436,7 +436,7 @@ func TestHandleSmartRetry_NonModelRateLimit_ContinuesDefaultLogic(t *testing.T) accessToken: "token", action: "generateContent", body: []byte(`{"input":"test"}`), - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -487,7 +487,7 @@ func TestHandleSmartRetry_ExactlyAtThreshold_ReturnsSwitchError(t *testing.T) { action: "generateContent", body: []byte(`{"input":"test"}`), accountRepo: repo, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -548,7 +548,7 @@ func TestAntigravityRetryLoop_HandleSmartRetry_SwitchError_Propagates(t *testing httpUpstream: upstream, accountRepo: repo, isStickySession: true, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, }) @@ -604,7 +604,7 @@ func TestHandleSmartRetry_NetworkError_ExhaustsRetry(t *testing.T) { body: []byte(`{"input":"test"}`), httpUpstream: upstream, accountRepo: repo, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -662,7 +662,7 @@ func TestHandleSmartRetry_NoRetryDelay_UsesDefaultRateLimit(t *testing.T) { body: []byte(`{"input":"test"}`), accountRepo: repo, isStickySession: true, - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -754,7 +754,7 @@ func TestHandleSmartRetry_ShortDelay_StickySession_FailedRetry_ClearsSession(t * isStickySession: true, groupID: 42, sessionHash: "sticky-hash-abc", - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -842,7 +842,7 @@ func TestHandleSmartRetry_ShortDelay_NonStickySession_FailedRetry_NoDeleteSessio isStickySession: false, groupID: 42, sessionHash: "", // 非粘性会话,sessionHash 为空 - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -918,7 +918,7 @@ func TestHandleSmartRetry_ShortDelay_StickySession_FailedRetry_NilCache_NoPanic( isStickySession: true, groupID: 42, sessionHash: "sticky-hash-nil-cache", - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -983,7 +983,7 @@ func TestHandleSmartRetry_ShortDelay_StickySession_SuccessRetry_NoDeleteSession( isStickySession: true, groupID: 42, sessionHash: "sticky-hash-success", - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -1043,7 +1043,7 @@ func TestHandleSmartRetry_LongDelay_StickySession_NoDeleteInHandleSmartRetry(t * isStickySession: true, groupID: 42, sessionHash: "sticky-hash-long-delay", - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -1108,7 +1108,7 @@ func TestHandleSmartRetry_ShortDelay_NetworkError_StickySession_ClearsSession(t isStickySession: true, groupID: 99, sessionHash: "sticky-net-error", - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -1188,7 +1188,7 @@ func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession isStickySession: true, groupID: 77, sessionHash: "sticky-503-short", - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, } @@ -1278,7 +1278,7 @@ func TestAntigravityRetryLoop_SmartRetryFailed_StickySession_SwitchErrorPropagat isStickySession: true, groupID: 55, sessionHash: "sticky-loop-test", - handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { return nil }, }) @@ -1296,4 +1296,4 @@ func TestAntigravityRetryLoop_SmartRetryFailed_StickySession_SwitchErrorPropagat require.Len(t, cache.deleteCalls, 1, "should clear sticky session in handleSmartRetry") require.Equal(t, int64(55), cache.deleteCalls[0].groupID) require.Equal(t, "sticky-loop-test", cache.deleteCalls[0].sessionHash) -} \ No newline at end of file +} diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index 069ea7d7..b3962e54 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -142,9 +142,6 @@ func (m *mockAccountRepoForPlatform) ListSchedulableByGroupIDAndPlatforms(ctx co func (m *mockAccountRepoForPlatform) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error { return nil } -func (m *mockAccountRepoForPlatform) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error { - return nil -} func (m *mockAccountRepoForPlatform) SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error { return nil } @@ -216,14 +213,6 @@ func (m *mockGatewayCacheForPlatform) DeleteSessionAccountID(ctx context.Context return nil } -func (m *mockGatewayCacheForPlatform) IncrModelCallCount(ctx context.Context, accountID int64, model string) (int64, error) { - return 0, nil -} - -func (m *mockGatewayCacheForPlatform) GetModelLoadBatch(ctx context.Context, accountIDs []int64, model string) (map[int64]*ModelLoadInfo, error) { - return nil, nil -} - type mockGroupRepoForGateway struct { groups map[int64]*Group getByIDCalls int @@ -282,10 +271,6 @@ func (m *mockGroupRepoForGateway) GetAccountIDsByGroupIDs(ctx context.Context, g return nil, nil } -func (m *mockGroupRepoForGateway) UpdateSortOrders(ctx context.Context, updates []GroupSortOrderUpdate) error { - return nil -} - func ptr[T any](v T) *T { return &v } diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 49591ecd..4e723232 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -246,9 +246,6 @@ var ( // ErrClaudeCodeOnly 表示分组仅允许 Claude Code 客户端访问 var ErrClaudeCodeOnly = errors.New("this group only allows Claude Code clients") -// ErrModelScopeNotSupported 表示请求的模型系列不在分组支持的范围内 -var ErrModelScopeNotSupported = errors.New("model scope not supported by this group") - // allowedHeaders 白名单headers(参考CRS项目) var allowedHeaders = map[string]bool{ "accept": true, @@ -274,13 +271,6 @@ var allowedHeaders = map[string]bool{ // GatewayCache 定义网关服务的缓存操作接口。 // 提供粘性会话(Sticky Session)的存储、查询、刷新和删除功能。 // -// ModelLoadInfo 模型负载信息(用于 Antigravity 调度) -// Model load info for Antigravity scheduling -type ModelLoadInfo struct { - CallCount int64 // 当前分钟调用次数 / Call count in current minute - LastUsedAt time.Time // 最后调度时间(零值表示未调度过)/ Last scheduling time (zero means never scheduled) -} - // GatewayCache defines cache operations for gateway service. // Provides sticky session storage, retrieval, refresh and deletion capabilities. type GatewayCache interface { @@ -296,15 +286,6 @@ type GatewayCache interface { // DeleteSessionAccountID 删除粘性会话绑定,用于账号不可用时主动清理 // Delete sticky session binding, used to proactively clean up when account becomes unavailable DeleteSessionAccountID(ctx context.Context, groupID int64, sessionHash string) error - - // IncrModelCallCount 增加模型调用次数并更新最后调度时间(Antigravity 专用) - // Increment model call count and update last scheduling time (Antigravity only) - // 返回更新后的调用次数 - IncrModelCallCount(ctx context.Context, accountID int64, model string) (int64, error) - - // GetModelLoadBatch 批量获取账号的模型负载信息(Antigravity 专用) - // Batch get model load info for accounts (Antigravity only) - GetModelLoadBatch(ctx context.Context, accountIDs []int64, model string) (map[int64]*ModelLoadInfo, error) } // derefGroupID safely dereferences *int64 to int64, returning 0 if nil @@ -1018,13 +999,6 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro log.Printf("[ModelRoutingDebug] load-aware enabled: group_id=%v model=%s session=%s platform=%s", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), platform) } - // Antigravity 模型系列检查(在账号选择前检查,确保所有代码路径都经过此检查) - if platform == PlatformAntigravity && groupID != nil && requestedModel != "" { - if err := s.checkAntigravityModelScope(ctx, *groupID, requestedModel); err != nil { - return nil, err - } - } - accounts, useMixed, err := s.listSchedulableAccounts(ctx, groupID, platform, hasForcePlatform) if err != nil { return nil, err @@ -1372,10 +1346,6 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro return result, nil } } else { - // Antigravity 平台:获取模型负载信息 - var modelLoadMap map[int64]*ModelLoadInfo - isAntigravity := platform == PlatformAntigravity - var available []accountWithLoad for _, acc := range candidates { loadInfo := loadMap[acc.ID] @@ -1390,109 +1360,44 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro } } - // Antigravity 平台:按账号实际映射后的模型名获取模型负载(与 Forward 的统计保持一致) - if isAntigravity && requestedModel != "" && s.cache != nil && len(available) > 0 { - modelLoadMap = make(map[int64]*ModelLoadInfo, len(available)) - modelToAccountIDs := make(map[string][]int64) - for _, item := range available { - mappedModel := mapAntigravityModel(item.account, requestedModel) - if mappedModel == "" { - continue - } - modelToAccountIDs[mappedModel] = append(modelToAccountIDs[mappedModel], item.account.ID) + // 分层过滤选择:优先级 → 负载率 → LRU + for len(available) > 0 { + // 1. 取优先级最小的集合 + candidates := filterByMinPriority(available) + // 2. 取负载率最低的集合 + candidates = filterByMinLoadRate(candidates) + // 3. LRU 选择最久未用的账号 + selected := selectByLRU(candidates, preferOAuth) + if selected == nil { + break } - for model, ids := range modelToAccountIDs { - batch, err := s.cache.GetModelLoadBatch(ctx, ids, model) - if err != nil { - continue - } - for id, info := range batch { - modelLoadMap[id] = info - } - } - if len(modelLoadMap) == 0 { - modelLoadMap = nil - } - } - // Antigravity 平台:优先级硬过滤 →(同优先级内)按调用次数选择(最少优先,新账号用平均值) - // 其他平台:分层过滤选择:优先级 → 负载率 → LRU - if isAntigravity { - for len(available) > 0 { - // 1. 取优先级最小的集合(硬过滤) - candidates := filterByMinPriority(available) - // 2. 同优先级内按调用次数选择(调用次数最少优先,新账号使用平均值) - selected := selectByCallCount(candidates, modelLoadMap, preferOAuth) - if selected == nil { - break - } - - result, err := s.tryAcquireAccountSlot(ctx, selected.account.ID, selected.account.Concurrency) - if err == nil && result.Acquired { - // 会话数量限制检查 - if !s.checkAndRegisterSession(ctx, selected.account, sessionHash) { - result.ReleaseFunc() // 释放槽位,继续尝试下一个账号 - } else { - if sessionHash != "" && s.cache != nil { - _ = s.cache.SetSessionAccountID(ctx, derefGroupID(groupID), sessionHash, selected.account.ID, stickySessionTTL) - } - return &AccountSelectionResult{ - Account: selected.account, - Acquired: true, - ReleaseFunc: result.ReleaseFunc, - }, nil + result, err := s.tryAcquireAccountSlot(ctx, selected.account.ID, selected.account.Concurrency) + if err == nil && result.Acquired { + // 会话数量限制检查 + if !s.checkAndRegisterSession(ctx, selected.account, sessionHash) { + result.ReleaseFunc() // 释放槽位,继续尝试下一个账号 + } else { + if sessionHash != "" && s.cache != nil { + _ = s.cache.SetSessionAccountID(ctx, derefGroupID(groupID), sessionHash, selected.account.ID, stickySessionTTL) } + return &AccountSelectionResult{ + Account: selected.account, + Acquired: true, + ReleaseFunc: result.ReleaseFunc, + }, nil } - - // 移除已尝试的账号,重新选择 - selectedID := selected.account.ID - newAvailable := make([]accountWithLoad, 0, len(available)-1) - for _, acc := range available { - if acc.account.ID != selectedID { - newAvailable = append(newAvailable, acc) - } - } - available = newAvailable } - } else { - for len(available) > 0 { - // 1. 取优先级最小的集合 - candidates := filterByMinPriority(available) - // 2. 取负载率最低的集合 - candidates = filterByMinLoadRate(candidates) - // 3. LRU 选择最久未用的账号 - selected := selectByLRU(candidates, preferOAuth) - if selected == nil { - break - } - result, err := s.tryAcquireAccountSlot(ctx, selected.account.ID, selected.account.Concurrency) - if err == nil && result.Acquired { - // 会话数量限制检查 - if !s.checkAndRegisterSession(ctx, selected.account, sessionHash) { - result.ReleaseFunc() // 释放槽位,继续尝试下一个账号 - } else { - if sessionHash != "" && s.cache != nil { - _ = s.cache.SetSessionAccountID(ctx, derefGroupID(groupID), sessionHash, selected.account.ID, stickySessionTTL) - } - return &AccountSelectionResult{ - Account: selected.account, - Acquired: true, - ReleaseFunc: result.ReleaseFunc, - }, nil - } + // 移除已尝试的账号,重新进行分层过滤 + selectedID := selected.account.ID + newAvailable := make([]accountWithLoad, 0, len(available)-1) + for _, acc := range available { + if acc.account.ID != selectedID { + newAvailable = append(newAvailable, acc) } - - // 移除已尝试的账号,重新进行分层过滤 - selectedID := selected.account.ID - newAvailable := make([]accountWithLoad, 0, len(available)-1) - for _, acc := range available { - if acc.account.ID != selectedID { - newAvailable = append(newAvailable, acc) - } - } - available = newAvailable } + available = newAvailable } } @@ -2103,87 +2008,6 @@ func sameLastUsedAt(a, b *time.Time) bool { } } -// selectByCallCount 从候选账号中选择调用次数最少的账号(Antigravity 专用) -// 新账号(CallCount=0)使用平均调用次数作为虚拟值,避免冷启动被猛调 -// 如果有多个账号具有相同的最小调用次数,则随机选择一个 -func selectByCallCount(accounts []accountWithLoad, modelLoadMap map[int64]*ModelLoadInfo, preferOAuth bool) *accountWithLoad { - if len(accounts) == 0 { - return nil - } - if len(accounts) == 1 { - return &accounts[0] - } - - // 如果没有负载信息,回退到 LRU - if modelLoadMap == nil { - return selectByLRU(accounts, preferOAuth) - } - - // 1. 计算平均调用次数(用于新账号冷启动) - var totalCallCount int64 - var countWithCalls int - for _, acc := range accounts { - if info := modelLoadMap[acc.account.ID]; info != nil && info.CallCount > 0 { - totalCallCount += info.CallCount - countWithCalls++ - } - } - - var avgCallCount int64 - if countWithCalls > 0 { - avgCallCount = totalCallCount / int64(countWithCalls) - } - - // 2. 获取每个账号的有效调用次数 - getEffectiveCallCount := func(acc accountWithLoad) int64 { - if acc.account == nil { - return 0 - } - info := modelLoadMap[acc.account.ID] - if info == nil || info.CallCount == 0 { - return avgCallCount // 新账号使用平均值 - } - return info.CallCount - } - - // 3. 找到最小调用次数 - minCount := getEffectiveCallCount(accounts[0]) - for _, acc := range accounts[1:] { - if c := getEffectiveCallCount(acc); c < minCount { - minCount = c - } - } - - // 4. 收集所有具有最小调用次数的账号 - var candidateIdxs []int - for i, acc := range accounts { - if getEffectiveCallCount(acc) == minCount { - candidateIdxs = append(candidateIdxs, i) - } - } - - // 5. 如果只有一个候选,直接返回 - if len(candidateIdxs) == 1 { - return &accounts[candidateIdxs[0]] - } - - // 6. preferOAuth 处理 - if preferOAuth { - var oauthIdxs []int - for _, idx := range candidateIdxs { - if accounts[idx].account.Type == AccountTypeOAuth { - oauthIdxs = append(oauthIdxs, idx) - } - } - if len(oauthIdxs) > 0 { - candidateIdxs = oauthIdxs - } - } - - // 7. 随机选择 - return &accounts[candidateIdxs[mathrand.Intn(len(candidateIdxs))]] -} - // sortCandidatesForFallback 根据配置选择排序策略 // mode: "last_used"(按最后使用时间) 或 "random"(随机) func (s *GatewayService) sortCandidatesForFallback(accounts []*Account, preferOAuth bool, mode string) { @@ -2236,13 +2060,6 @@ func shuffleWithinPriority(accounts []*Account) { // selectAccountForModelWithPlatform 选择单平台账户(完全隔离) func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, platform string) (*Account, error) { - // 对 Antigravity 平台,检查请求的模型系列是否在分组支持范围内 - if platform == PlatformAntigravity && groupID != nil && requestedModel != "" { - if err := s.checkAntigravityModelScope(ctx, *groupID, requestedModel); err != nil { - return nil, err - } - } - preferOAuth := platform == PlatformGemini routingAccountIDs := s.routingAccountIDsForRequest(ctx, groupID, requestedModel, platform) @@ -5254,27 +5071,6 @@ func (s *GatewayService) validateUpstreamBaseURL(raw string) (string, error) { return normalized, nil } -// checkAntigravityModelScope 检查 Antigravity 平台的模型系列是否在分组支持范围内 -func (s *GatewayService) checkAntigravityModelScope(ctx context.Context, groupID int64, requestedModel string) error { - scope, ok := ResolveAntigravityQuotaScope(requestedModel) - if !ok { - return nil // 无法解析 scope,跳过检查 - } - - group, err := s.resolveGroupByID(ctx, groupID) - if err != nil { - return nil // 查询失败时放行 - } - if group == nil { - return nil // 分组不存在时放行 - } - - if !IsScopeSupported(group.SupportedModelScopes, scope) { - return ErrModelScopeNotSupported - } - return nil -} - // GetAvailableModels returns the list of models available for a group // It aggregates model_mapping keys from all schedulable accounts in the group func (s *GatewayService) GetAvailableModels(ctx context.Context, groupID *int64, platform string) []string { diff --git a/backend/internal/service/gemini_multiplatform_test.go b/backend/internal/service/gemini_multiplatform_test.go index c738b79f..d03b75df 100644 --- a/backend/internal/service/gemini_multiplatform_test.go +++ b/backend/internal/service/gemini_multiplatform_test.go @@ -133,9 +133,6 @@ func (m *mockAccountRepoForGemini) ListSchedulableByGroupIDAndPlatforms(ctx cont func (m *mockAccountRepoForGemini) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error { return nil } -func (m *mockAccountRepoForGemini) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error { - return nil -} func (m *mockAccountRepoForGemini) SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error { return nil } @@ -269,14 +266,6 @@ func (m *mockGatewayCacheForGemini) DeleteSessionAccountID(ctx context.Context, return nil } -func (m *mockGatewayCacheForGemini) IncrModelCallCount(ctx context.Context, accountID int64, model string) (int64, error) { - return 0, nil -} - -func (m *mockGatewayCacheForGemini) GetModelLoadBatch(ctx context.Context, accountIDs []int64, model string) (map[int64]*ModelLoadInfo, error) { - return nil, nil -} - // TestGeminiMessagesCompatService_SelectAccountForModelWithExclusions_GeminiPlatform 测试 Gemini 单平台选择 func TestGeminiMessagesCompatService_SelectAccountForModelWithExclusions_GeminiPlatform(t *testing.T) { ctx := context.Background() diff --git a/backend/internal/service/model_rate_limit_test.go b/backend/internal/service/model_rate_limit_test.go index a51e6909..b79b9688 100644 --- a/backend/internal/service/model_rate_limit_test.go +++ b/backend/internal/service/model_rate_limit_test.go @@ -318,110 +318,6 @@ func TestGetModelRateLimitRemainingTime(t *testing.T) { } } -func TestGetQuotaScopeRateLimitRemainingTime(t *testing.T) { - now := time.Now() - future10m := now.Add(10 * time.Minute).Format(time.RFC3339) - past := now.Add(-10 * time.Minute).Format(time.RFC3339) - - tests := []struct { - name string - account *Account - requestedModel string - minExpected time.Duration - maxExpected time.Duration - }{ - { - name: "nil account", - account: nil, - requestedModel: "claude-sonnet-4-5", - minExpected: 0, - maxExpected: 0, - }, - { - name: "non-antigravity platform", - account: &Account{ - Platform: PlatformAnthropic, - Extra: map[string]any{ - antigravityQuotaScopesKey: map[string]any{ - "claude": map[string]any{ - "rate_limit_reset_at": future10m, - }, - }, - }, - }, - requestedModel: "claude-sonnet-4-5", - minExpected: 0, - maxExpected: 0, - }, - { - name: "claude scope rate limited", - account: &Account{ - Platform: PlatformAntigravity, - Extra: map[string]any{ - antigravityQuotaScopesKey: map[string]any{ - "claude": map[string]any{ - "rate_limit_reset_at": future10m, - }, - }, - }, - }, - requestedModel: "claude-sonnet-4-5", - minExpected: 9 * time.Minute, - maxExpected: 11 * time.Minute, - }, - { - name: "gemini_text scope rate limited", - account: &Account{ - Platform: PlatformAntigravity, - Extra: map[string]any{ - antigravityQuotaScopesKey: map[string]any{ - "gemini_text": map[string]any{ - "rate_limit_reset_at": future10m, - }, - }, - }, - }, - requestedModel: "gemini-3-flash", - minExpected: 9 * time.Minute, - maxExpected: 11 * time.Minute, - }, - { - name: "expired scope rate limit", - account: &Account{ - Platform: PlatformAntigravity, - Extra: map[string]any{ - antigravityQuotaScopesKey: map[string]any{ - "claude": map[string]any{ - "rate_limit_reset_at": past, - }, - }, - }, - }, - requestedModel: "claude-sonnet-4-5", - minExpected: 0, - maxExpected: 0, - }, - { - name: "unsupported model", - account: &Account{ - Platform: PlatformAntigravity, - }, - requestedModel: "gpt-4", - minExpected: 0, - maxExpected: 0, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := tt.account.GetQuotaScopeRateLimitRemainingTime(tt.requestedModel) - if result < tt.minExpected || result > tt.maxExpected { - t.Errorf("GetQuotaScopeRateLimitRemainingTime() = %v, want between %v and %v", result, tt.minExpected, tt.maxExpected) - } - }) - } -} - func TestGetRateLimitRemainingTime(t *testing.T) { now := time.Now() future15m := now.Add(15 * time.Minute).Format(time.RFC3339) @@ -442,45 +338,19 @@ func TestGetRateLimitRemainingTime(t *testing.T) { maxExpected: 0, }, { - name: "model remaining > scope remaining - returns model", + name: "model rate limited - 15 minutes", account: &Account{ Platform: PlatformAntigravity, Extra: map[string]any{ modelRateLimitsKey: map[string]any{ "claude-sonnet-4-5": map[string]any{ - "rate_limit_reset_at": future15m, // 15 分钟 - }, - }, - antigravityQuotaScopesKey: map[string]any{ - "claude": map[string]any{ - "rate_limit_reset_at": future5m, // 5 分钟 + "rate_limit_reset_at": future15m, }, }, }, }, requestedModel: "claude-sonnet-4-5", - minExpected: 14 * time.Minute, // 应返回较大的 15 分钟 - maxExpected: 16 * time.Minute, - }, - { - name: "scope remaining > model remaining - returns scope", - account: &Account{ - Platform: PlatformAntigravity, - Extra: map[string]any{ - modelRateLimitsKey: map[string]any{ - "claude-sonnet-4-5": map[string]any{ - "rate_limit_reset_at": future5m, // 5 分钟 - }, - }, - antigravityQuotaScopesKey: map[string]any{ - "claude": map[string]any{ - "rate_limit_reset_at": future15m, // 15 分钟 - }, - }, - }, - }, - requestedModel: "claude-sonnet-4-5", - minExpected: 14 * time.Minute, // 应返回较大的 15 分钟 + minExpected: 14 * time.Minute, maxExpected: 16 * time.Minute, }, { @@ -499,22 +369,6 @@ func TestGetRateLimitRemainingTime(t *testing.T) { minExpected: 4 * time.Minute, maxExpected: 6 * time.Minute, }, - { - name: "only scope rate limited", - account: &Account{ - Platform: PlatformAntigravity, - Extra: map[string]any{ - antigravityQuotaScopesKey: map[string]any{ - "claude": map[string]any{ - "rate_limit_reset_at": future5m, - }, - }, - }, - }, - requestedModel: "claude-sonnet-4-5", - minExpected: 4 * time.Minute, - maxExpected: 6 * time.Minute, - }, { name: "neither rate limited", account: &Account{ diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 22b4730d..ae69a986 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -204,14 +204,6 @@ func (c *stubGatewayCache) DeleteSessionAccountID(ctx context.Context, groupID i return nil } -func (c *stubGatewayCache) IncrModelCallCount(ctx context.Context, accountID int64, model string) (int64, error) { - return 0, nil -} - -func (c *stubGatewayCache) GetModelLoadBatch(ctx context.Context, accountIDs []int64, model string) (map[int64]*ModelLoadInfo, error) { - return nil, nil -} - func TestOpenAISelectAccountWithLoadAwareness_FiltersUnschedulable(t *testing.T) { now := time.Now() resetAt := now.Add(10 * time.Minute) diff --git a/backend/internal/service/ops_account_availability.go b/backend/internal/service/ops_account_availability.go index a649e7b5..da66ec4d 100644 --- a/backend/internal/service/ops_account_availability.go +++ b/backend/internal/service/ops_account_availability.go @@ -66,7 +66,6 @@ func (s *OpsService) GetAccountAvailabilityStats(ctx context.Context, platformFi } isAvailable := acc.Status == StatusActive && acc.Schedulable && !isRateLimited && !isOverloaded && !isTempUnsched - scopeRateLimits := acc.GetAntigravityScopeRateLimits() if acc.Platform != "" { if _, ok := platform[acc.Platform]; !ok { @@ -85,14 +84,6 @@ func (s *OpsService) GetAccountAvailabilityStats(ctx context.Context, platformFi if hasError { p.ErrorCount++ } - if len(scopeRateLimits) > 0 { - if p.ScopeRateLimitCount == nil { - p.ScopeRateLimitCount = make(map[string]int64) - } - for scope := range scopeRateLimits { - p.ScopeRateLimitCount[scope]++ - } - } } for _, grp := range acc.Groups { @@ -117,14 +108,6 @@ func (s *OpsService) GetAccountAvailabilityStats(ctx context.Context, platformFi if hasError { g.ErrorCount++ } - if len(scopeRateLimits) > 0 { - if g.ScopeRateLimitCount == nil { - g.ScopeRateLimitCount = make(map[string]int64) - } - for scope := range scopeRateLimits { - g.ScopeRateLimitCount[scope]++ - } - } } displayGroupID := int64(0) @@ -157,9 +140,6 @@ func (s *OpsService) GetAccountAvailabilityStats(ctx context.Context, platformFi item.RateLimitRemainingSec = &remainingSec } } - if len(scopeRateLimits) > 0 { - item.ScopeRateLimits = scopeRateLimits - } if isOverloaded && acc.OverloadUntil != nil { item.OverloadUntil = acc.OverloadUntil remainingSec := int64(time.Until(*acc.OverloadUntil).Seconds()) diff --git a/backend/internal/service/ops_realtime_models.go b/backend/internal/service/ops_realtime_models.go index 33029f59..0837b130 100644 --- a/backend/internal/service/ops_realtime_models.go +++ b/backend/internal/service/ops_realtime_models.go @@ -50,24 +50,22 @@ type UserConcurrencyInfo struct { // PlatformAvailability aggregates account availability by platform. type PlatformAvailability struct { - Platform string `json:"platform"` - TotalAccounts int64 `json:"total_accounts"` - AvailableCount int64 `json:"available_count"` - RateLimitCount int64 `json:"rate_limit_count"` - ScopeRateLimitCount map[string]int64 `json:"scope_rate_limit_count,omitempty"` - ErrorCount int64 `json:"error_count"` + Platform string `json:"platform"` + TotalAccounts int64 `json:"total_accounts"` + AvailableCount int64 `json:"available_count"` + RateLimitCount int64 `json:"rate_limit_count"` + ErrorCount int64 `json:"error_count"` } // GroupAvailability aggregates account availability by group. type GroupAvailability struct { - GroupID int64 `json:"group_id"` - GroupName string `json:"group_name"` - Platform string `json:"platform"` - TotalAccounts int64 `json:"total_accounts"` - AvailableCount int64 `json:"available_count"` - RateLimitCount int64 `json:"rate_limit_count"` - ScopeRateLimitCount map[string]int64 `json:"scope_rate_limit_count,omitempty"` - ErrorCount int64 `json:"error_count"` + GroupID int64 `json:"group_id"` + GroupName string `json:"group_name"` + Platform string `json:"platform"` + TotalAccounts int64 `json:"total_accounts"` + AvailableCount int64 `json:"available_count"` + RateLimitCount int64 `json:"rate_limit_count"` + ErrorCount int64 `json:"error_count"` } // AccountAvailability represents current availability for a single account. @@ -85,10 +83,9 @@ type AccountAvailability struct { IsOverloaded bool `json:"is_overloaded"` HasError bool `json:"has_error"` - RateLimitResetAt *time.Time `json:"rate_limit_reset_at"` - RateLimitRemainingSec *int64 `json:"rate_limit_remaining_sec"` - ScopeRateLimits map[string]int64 `json:"scope_rate_limits,omitempty"` - OverloadUntil *time.Time `json:"overload_until"` + RateLimitResetAt *time.Time `json:"rate_limit_reset_at"` + RateLimitRemainingSec *int64 `json:"rate_limit_remaining_sec"` + OverloadUntil *time.Time `json:"overload_until"` OverloadRemainingSec *int64 `json:"overload_remaining_sec"` ErrorMessage string `json:"error_message"` TempUnschedulableUntil *time.Time `json:"temp_unschedulable_until,omitempty"` diff --git a/frontend/src/api/admin/ops.ts b/frontend/src/api/admin/ops.ts index 5b96feda..9f980a12 100644 --- a/frontend/src/api/admin/ops.ts +++ b/frontend/src/api/admin/ops.ts @@ -376,7 +376,6 @@ export interface PlatformAvailability { total_accounts: number available_count: number rate_limit_count: number - scope_rate_limit_count?: Record error_count: number } @@ -387,7 +386,6 @@ export interface GroupAvailability { total_accounts: number available_count: number rate_limit_count: number - scope_rate_limit_count?: Record error_count: number } @@ -402,7 +400,6 @@ export interface AccountAvailability { is_rate_limited: boolean rate_limit_reset_at?: string rate_limit_remaining_sec?: number - scope_rate_limits?: Record is_overloaded: boolean overload_until?: string overload_remaining_sec?: number diff --git a/frontend/src/components/account/AccountStatusIndicator.vue b/frontend/src/components/account/AccountStatusIndicator.vue index 3474da44..5fe96a1d 100644 --- a/frontend/src/components/account/AccountStatusIndicator.vue +++ b/frontend/src/components/account/AccountStatusIndicator.vue @@ -76,26 +76,6 @@ - - -