diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go
index ba8751df..f7725820 100644
--- a/backend/internal/repository/account_repo.go
+++ b/backend/internal/repository/account_repo.go
@@ -794,6 +794,55 @@ func (r *accountRepository) SetAntigravityQuotaScopeLimit(ctx context.Context, i
 	return nil
 }
 
+// SetModelRateLimit records a model-scoped rate limit for account id under
+// extra.model_rate_limits.<scope>. The entry stores the current time as
+// "rate_limited_at" and resetAt as "rate_limit_reset_at", both in UTC RFC 3339
+// form. An empty scope is a no-op. It returns service.ErrAccountNotFound when
+// no non-deleted account matches id.
+func (r *accountRepository) SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error {
+	if scope == "" {
+		return nil
+	}
+	now := time.Now().UTC()
+	payload := map[string]string{
+		"rate_limited_at":     now.Format(time.RFC3339),
+		"rate_limit_reset_at": resetAt.UTC().Format(time.RFC3339),
+	}
+	raw, err := json.Marshal(payload)
+	if err != nil {
+		return err
+	}
+
+	// jsonb_set only creates the last step of the path; when the parent
+	// "model_rate_limits" object is missing it returns the target unchanged.
+	// The inner jsonb_set therefore materializes the parent first, and the
+	// scope is bound as a parameter instead of spliced into a text[] literal.
+	client := clientFromContext(ctx, r.client)
+	result, err := client.ExecContext(
+		ctx,
+		"UPDATE accounts SET extra = jsonb_set(jsonb_set(COALESCE(extra, '{}'::jsonb), '{model_rate_limits}'::text[], COALESCE(extra->'model_rate_limits', '{}'::jsonb), true), ARRAY['model_rate_limits', $1::text], $2::jsonb, true), updated_at = NOW() WHERE id = $3 AND deleted_at IS NULL",
+		scope,
+		raw,
+		id,
+	)
+	if err != nil {
+		return err
+	}
+
+	affected, err := result.RowsAffected()
+	if err != nil {
+		return err
+	}
+	if affected == 0 {
+		return service.ErrAccountNotFound
+	}
+	// The outbox notification is best-effort: a failure is logged, not returned.
+	if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil {
+		log.Printf("[SchedulerOutbox] enqueue model rate limit failed: account=%d err=%v", id, err)
+	}
+	return nil
+}
+
 func (r *accountRepository) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
 	_, err := r.client.Account.Update().
 		Where(dbaccount.IDEQ(id)).
@@ -885,6 +925,30 @@ func (r *accountRepository) ClearAntigravityQuotaScopes(ctx context.Context, id
 	return nil
 }
 
+// ClearModelRateLimits drops the "model_rate_limits" key from the extra JSONB
+// column of the non-deleted account id and refreshes updated_at. It returns
+// service.ErrAccountNotFound when no row was updated.
+func (r *accountRepository) ClearModelRateLimits(ctx context.Context, id int64) error {
+	const query = "UPDATE accounts SET extra = COALESCE(extra, '{}'::jsonb) - 'model_rate_limits', updated_at = NOW() WHERE id = $1 AND deleted_at IS NULL"
+
+	res, err := clientFromContext(ctx, r.client).ExecContext(ctx, query, id)
+	if err != nil {
+		return err
+	}
+	switch n, err := res.RowsAffected(); {
+	case err != nil:
+		return err
+	case n == 0:
+		return service.ErrAccountNotFound
+	}
+
+	// The outbox notification is best-effort: a failure is logged, not returned.
+	if err := enqueueSchedulerOutbox(ctx, r.sql, service.SchedulerOutboxEventAccountChanged, &id, nil, nil); err != nil {
+		log.Printf("[SchedulerOutbox] enqueue clear model rate limit failed: account=%d err=%v", id, err)
+	}
+	return nil
+}
+
 func (r *accountRepository) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
 	builder := r.client.Account.Update().
 		Where(dbaccount.IDEQ(id)).
diff --git a/backend/internal/server/api_contract_test.go b/backend/internal/server/api_contract_test.go
index 7d2b789f..7971c65f 100644
--- a/backend/internal/server/api_contract_test.go
+++ b/backend/internal/server/api_contract_test.go
@@ -780,6 +780,10 @@ func (s *stubAccountRepo) SetAntigravityQuotaScopeLimit(ctx context.Context, id
 	return errors.New("not implemented")
 }
 
+func (s *stubAccountRepo) SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error {
+	return errors.New("not implemented")
+}
+
 func (s *stubAccountRepo) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
 	return errors.New("not implemented")
 }
@@ -800,6 +804,10 @@ func (s *stubAccountRepo) ClearAntigravityQuotaScopes(ctx context.Context, id in
 	return errors.New("not implemented")
 }
 
+func (s *stubAccountRepo) ClearModelRateLimits(ctx context.Context, id int64) error {
+	return errors.New("not implemented")
+}
+
 func (s *stubAccountRepo) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
 	return errors.New("not implemented")
 }
diff --git a/backend/internal/service/account_service.go b/backend/internal/service/account_service.go
index 2badc760..ede5b12f 100644
--- a/backend/internal/service/account_service.go
+++ b/backend/internal/service/account_service.go
@@ -50,11 +50,13 @@ type AccountRepository interface {
 
 	SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error
 	SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error
+	SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error
 	SetOverloaded(ctx context.Context, id int64, until time.Time) error
 	SetTempUnschedulable(ctx context.Context, id int64, until time.Time, reason string) error
 	ClearTempUnschedulable(ctx context.Context, id int64) error
 	ClearRateLimit(ctx context.Context, id int64) error
 	ClearAntigravityQuotaScopes(ctx context.Context, id int64) error
+	ClearModelRateLimits(ctx context.Context, id int64) error
 	UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error
 	UpdateExtra(ctx context.Context, id int64, updates map[string]any) error
 	BulkUpdate(ctx context.Context, ids []int64, updates AccountBulkUpdate) (int64, error)
diff --git a/backend/internal/service/account_service_delete_test.go b/backend/internal/service/account_service_delete_test.go
index 6923067d..36af719c 100644
--- a/backend/internal/service/account_service_delete_test.go
+++ b/backend/internal/service/account_service_delete_test.go
@@ -143,6 +143,10 @@ func (s *accountRepoStub) SetAntigravityQuotaScopeLimit(ctx context.Context, id
 	panic("unexpected SetAntigravityQuotaScopeLimit call")
 }
 
+func (s *accountRepoStub) SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error {
+	panic("unexpected SetModelRateLimit call")
+}
+
 func (s *accountRepoStub) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
 	panic("unexpected SetOverloaded call")
 }
@@ -163,6 +167,10 @@ func (s *accountRepoStub) ClearAntigravityQuotaScopes(ctx context.Context, id in
 	panic("unexpected ClearAntigravityQuotaScopes call")
 }
 
+func (s *accountRepoStub) ClearModelRateLimits(ctx context.Context, id int64) error {
+	panic("unexpected ClearModelRateLimits call")
+}
+
 func (s *accountRepoStub) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
 	panic("unexpected UpdateSessionWindow call")
 }
diff --git a/backend/internal/service/antigravity_quota_scope.go b/backend/internal/service/antigravity_quota_scope.go
index e9f7184b..a3b2ec66 100644
--- a/backend/internal/service/antigravity_quota_scope.go
+++ b/backend/internal/service/antigravity_quota_scope.go
@@ -49,6 +49,9 @@ func (a *Account) IsSchedulableForModel(requestedModel string) bool {
 	if !a.IsSchedulable() {
 		return false
 	}
+	if a.isModelRateLimited(requestedModel) {
+		return false
+	}
 	if a.Platform != PlatformAntigravity {
 		return true
 	}
diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go
index 7673f5ef..76d73286 100644
--- a/backend/internal/service/gateway_multiplatform_test.go
+++ b/backend/internal/service/gateway_multiplatform_test.go
@@ -142,6 +142,9 @@ func (m *mockAccountRepoForPlatform) SetRateLimited(ctx context.Context, id int6
 func (m *mockAccountRepoForPlatform) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error {
 	return nil
 }
+func (m *mockAccountRepoForPlatform) SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error {
+	return nil
+}
 func (m *mockAccountRepoForPlatform) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
 	return nil
 }
@@ -157,6 +160,9 @@ func (m *mockAccountRepoForPlatform) ClearRateLimit(ctx context.Context, id int6
 func (m *mockAccountRepoForPlatform) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error {
 	return nil
 }
+func (m *mockAccountRepoForPlatform) ClearModelRateLimits(ctx context.Context, id int64) error {
+	return nil
+}
 func (m *mockAccountRepoForPlatform) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
 	return nil
 }
diff --git a/backend/internal/service/gemini_multiplatform_test.go b/backend/internal/service/gemini_multiplatform_test.go
index c99cb87d..03f5d757 100644
--- a/backend/internal/service/gemini_multiplatform_test.go
+++ b/backend/internal/service/gemini_multiplatform_test.go
@@ -125,6 +125,9 @@ func (m *mockAccountRepoForGemini) SetRateLimited(ctx context.Context, id int64,
 func (m *mockAccountRepoForGemini) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error {
 	return nil
 }
+func (m *mockAccountRepoForGemini) SetModelRateLimit(ctx context.Context, id int64, scope string, resetAt time.Time) error {
+	return nil
+}
 func (m *mockAccountRepoForGemini) SetOverloaded(ctx context.Context, id int64, until time.Time) error {
 	return nil
 }
@@ -138,6 +141,9 @@ func (m *mockAccountRepoForGemini) ClearRateLimit(ctx context.Context, id int64)
 func (m *mockAccountRepoForGemini) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error {
 	return nil
 }
+func (m *mockAccountRepoForGemini) ClearModelRateLimits(ctx context.Context, id int64) error {
+	return nil
+}
 func (m *mockAccountRepoForGemini) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error {
 	return nil
 }
diff --git a/backend/internal/service/model_rate_limit.go b/backend/internal/service/model_rate_limit.go
new file mode 100644
index 00000000..49354a7f
--- /dev/null
+++ b/backend/internal/service/model_rate_limit.go
@@ -0,0 +1,68 @@
+package service
+
+import (
+	"strings"
+	"time"
+)
+
+// modelRateLimitsKey is the Account.Extra key under which per-scope rate limits are stored.
+const modelRateLimitsKey = "model_rate_limits"
+
+// modelRateLimitScopeClaudeSonnet is the rate-limit scope shared by all Sonnet models.
+const modelRateLimitScopeClaudeSonnet = "claude_sonnet"
+
+// resolveModelRateLimitScope maps a requested model name to its rate-limit
+// scope. The name is trimmed, lower-cased and stripped of a "models/" prefix;
+// any name containing "sonnet" resolves to the Claude Sonnet scope. The boolean
+// is false when the model has no scope.
+func resolveModelRateLimitScope(requestedModel string) (string, bool) {
+	model := strings.ToLower(strings.TrimSpace(requestedModel))
+	if model == "" {
+		return "", false
+	}
+	model = strings.TrimPrefix(model, "models/")
+	if strings.Contains(model, "sonnet") {
+		return modelRateLimitScopeClaudeSonnet, true
+	}
+	return "", false
+}
+
+// isModelRateLimited reports whether the scope of requestedModel has a stored
+// reset time that is still in the future.
+func (a *Account) isModelRateLimited(requestedModel string) bool {
+	scope, ok := resolveModelRateLimitScope(requestedModel)
+	if !ok {
+		return false
+	}
+	resetAt := a.modelRateLimitResetAt(scope)
+	if resetAt == nil {
+		return false
+	}
+	return time.Now().Before(*resetAt)
+}
+
+// modelRateLimitResetAt returns the "rate_limit_reset_at" time stored for scope
+// in Extra, or nil when the account, Extra, scope, or any level of the nested
+// map is missing, or the value is not a valid RFC 3339 timestamp.
+func (a *Account) modelRateLimitResetAt(scope string) *time.Time {
+	if a == nil || a.Extra == nil || scope == "" {
+		return nil
+	}
+	rawLimits, ok := a.Extra[modelRateLimitsKey].(map[string]any)
+	if !ok {
+		return nil
+	}
+	rawLimit, ok := rawLimits[scope].(map[string]any)
+	if !ok {
+		return nil
+	}
+	resetAtRaw, ok := rawLimit["rate_limit_reset_at"].(string)
+	if !ok || strings.TrimSpace(resetAtRaw) == "" {
+		return nil
+	}
+	resetAt, err := time.Parse(time.RFC3339, resetAtRaw)
+	if err != nil {
+		return nil
+	}
+	return &resetAt
+}
diff --git a/backend/internal/service/ratelimit_service.go b/backend/internal/service/ratelimit_service.go
index 20e08c52..47a04cf5 100644
--- a/backend/internal/service/ratelimit_service.go
+++ b/backend/internal/service/ratelimit_service.go
@@ -127,7 +127,7 @@ func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Acc
 		s.handleAuthError(ctx, account, msg)
 		shouldDisable = true
 	case 429:
-		s.handle429(ctx, account, headers)
+		s.handle429(ctx, account, headers, responseBody)
 		shouldDisable = false
 	case 529:
 		s.handle529(ctx, account)
@@ -333,12 +333,15 @@ func (s *RateLimitService) handleCustomErrorCode(ctx context.Context, account *A
 
 // handle429 处理429限流错误
 // 解析响应头获取重置时间,标记账号为限流状态
-func (s *RateLimitService) handle429(ctx context.Context, account *Account, headers http.Header) {
+func (s *RateLimitService) handle429(ctx context.Context, account *Account, headers http.Header, responseBody []byte) {
 	// 解析重置时间戳
 	resetTimestamp := headers.Get("anthropic-ratelimit-unified-reset")
 	if resetTimestamp == "" {
 		// 没有重置时间,使用默认5分钟
 		resetAt := time.Now().Add(5 * time.Minute)
+		if s.applyClaudeSonnetRateLimit(ctx, account, responseBody, resetAt) {
+			return
+		}
 		if err := s.accountRepo.SetRateLimited(ctx, account.ID, resetAt); err != nil {
 			slog.Warn("rate_limit_set_failed", "account_id", account.ID, "error", err)
 		}
@@ -350,6 +353,9 @@ func (s *RateLimitService) handle429(ctx context.Context, account *Account, head
 	if err != nil {
 		slog.Warn("rate_limit_reset_parse_failed", "reset_timestamp", resetTimestamp, "error", err)
 		resetAt := time.Now().Add(5 * time.Minute)
+		if s.applyClaudeSonnetRateLimit(ctx, account, responseBody, resetAt) {
+			return
+		}
 		if err := s.accountRepo.SetRateLimited(ctx, account.ID, resetAt); err != nil {
 			slog.Warn("rate_limit_set_failed", "account_id", account.ID, "error", err)
 		}
@@ -358,6 +364,10 @@ func (s *RateLimitService) handle429(ctx context.Context, account *Account, head
 
 	resetAt := time.Unix(ts, 0)
 
+	if s.applyClaudeSonnetRateLimit(ctx, account, responseBody, resetAt) {
+		return
+	}
+
 	// 标记限流状态
 	if err := s.accountRepo.SetRateLimited(ctx, account.ID, resetAt); err != nil {
 		slog.Warn("rate_limit_set_failed", "account_id", account.ID, "error", err)
@@ -374,6 +384,38 @@ func (s *RateLimitService) handle429(ctx context.Context, account *Account, head
 	slog.Info("account_rate_limited", "account_id", account.ID, "reset_at", resetAt)
 }
 
+// shouldScopeClaudeSonnetRateLimit reports whether a 429 should be recorded
+// against the Claude Sonnet scope instead of the whole account: the account
+// must be on the Anthropic platform and the extracted upstream error message
+// must contain "sonnet" (case-insensitive).
+func (s *RateLimitService) shouldScopeClaudeSonnetRateLimit(account *Account, responseBody []byte) bool {
+	if account == nil || account.Platform != PlatformAnthropic {
+		return false
+	}
+	msg := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(responseBody)))
+	if msg == "" {
+		return false
+	}
+	return strings.Contains(msg, "sonnet")
+}
+
+// applyClaudeSonnetRateLimit stores a Claude Sonnet scoped rate limit ending at
+// resetAt when shouldScopeClaudeSonnetRateLimit matches. It reports whether the
+// 429 was treated as model-scoped; this is true even if persisting the limit
+// failed, in which case the failure is only logged and the caller must not
+// fall back to an account-wide limit.
+func (s *RateLimitService) applyClaudeSonnetRateLimit(ctx context.Context, account *Account, responseBody []byte, resetAt time.Time) bool {
+	if !s.shouldScopeClaudeSonnetRateLimit(account, responseBody) {
+		return false
+	}
+	if err := s.accountRepo.SetModelRateLimit(ctx, account.ID, modelRateLimitScopeClaudeSonnet, resetAt); err != nil {
+		slog.Warn("model_rate_limit_set_failed", "account_id", account.ID, "scope", modelRateLimitScopeClaudeSonnet, "error", err)
+		return true
+	}
+	slog.Info("account_model_rate_limited", "account_id", account.ID, "scope", modelRateLimitScopeClaudeSonnet, "reset_at", resetAt)
+	return true
+}
+
 // handle529 处理529过载错误
 // 根据配置设置过载冷却时间
 func (s *RateLimitService) handle529(ctx context.Context, account *Account) {
@@ -431,7 +473,10 @@ func (s *RateLimitService) ClearRateLimit(ctx context.Context, accountID int64)
 	if err := s.accountRepo.ClearRateLimit(ctx, accountID); err != nil {
 		return err
 	}
-	return s.accountRepo.ClearAntigravityQuotaScopes(ctx, accountID)
+	if err := s.accountRepo.ClearAntigravityQuotaScopes(ctx, accountID); err != nil {
+		return err
+	}
+	return s.accountRepo.ClearModelRateLimits(ctx, accountID)
 }
 
 func (s *RateLimitService) ClearTempUnschedulable(ctx context.Context, accountID int64) error {