From da1f3d61becc3cc35f244c691b38576bb0728afe Mon Sep 17 00:00:00 2001 From: song Date: Fri, 9 Jan 2026 17:35:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20antigravity=20=E9=85=8D=E9=A2=9D?= =?UTF-8?q?=E5=9F=9F=E9=99=90=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/internal/repository/account_repo.go | 55 ++++++++++++ backend/internal/service/account_service.go | 2 + .../service/account_service_delete_test.go | 8 ++ .../service/antigravity_gateway_service.go | 30 +++++-- .../service/antigravity_quota_scope.go | 88 +++++++++++++++++++ .../service/gateway_multiplatform_test.go | 6 ++ backend/internal/service/gateway_service.go | 15 +++- .../service/gemini_messages_compat_service.go | 5 +- .../service/gemini_multiplatform_test.go | 6 ++ backend/internal/service/ratelimit_service.go | 7 +- 10 files changed, 207 insertions(+), 15 deletions(-) create mode 100644 backend/internal/service/antigravity_quota_scope.go diff --git a/backend/internal/repository/account_repo.go b/backend/internal/repository/account_repo.go index 83f02608..30a783bc 100644 --- a/backend/internal/repository/account_repo.go +++ b/backend/internal/repository/account_repo.go @@ -675,6 +675,40 @@ func (r *accountRepository) SetRateLimited(ctx context.Context, id int64, resetA return err } +func (r *accountRepository) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope service.AntigravityQuotaScope, resetAt time.Time) error { + now := time.Now().UTC() + payload := map[string]string{ + "rate_limited_at": now.Format(time.RFC3339), + "rate_limit_reset_at": resetAt.UTC().Format(time.RFC3339), + } + raw, err := json.Marshal(payload) + if err != nil { + return err + } + + path := "{antigravity_quota_scopes," + string(scope) + "}" + client := clientFromContext(ctx, r.client) + result, err := client.ExecContext( + ctx, + "UPDATE accounts SET extra = jsonb_set(COALESCE(extra, '{}'::jsonb), $1::text[], $2::jsonb, true), updated_at = NOW() WHERE id = $3 AND deleted_at IS NULL", + path, + raw, + id, + ) + if err != nil { + return err + } + + affected, err := result.RowsAffected() + if err != nil { + return err + } + if affected == 0 { + return service.ErrAccountNotFound + } + return nil +} + func (r *accountRepository) SetOverloaded(ctx context.Context, id int64, until time.Time) error { _, err := r.client.Account.Update(). Where(dbaccount.IDEQ(id)). @@ -718,6 +752,27 @@ func (r *accountRepository) ClearRateLimit(ctx context.Context, id int64) error return err } +func (r *accountRepository) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error { + client := clientFromContext(ctx, r.client) + result, err := client.ExecContext( + ctx, + "UPDATE accounts SET extra = COALESCE(extra, '{}'::jsonb) - 'antigravity_quota_scopes', updated_at = NOW() WHERE id = $1 AND deleted_at IS NULL", + id, + ) + if err != nil { + return err + } + + affected, err := result.RowsAffected() + if err != nil { + return err + } + if affected == 0 { + return service.ErrAccountNotFound + } + return nil +} + func (r *accountRepository) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error { builder := r.client.Account.Update(). Where(dbaccount.IDEQ(id)). diff --git a/backend/internal/service/account_service.go b/backend/internal/service/account_service.go index e1b93fcb..de32cfeb 100644 --- a/backend/internal/service/account_service.go +++ b/backend/internal/service/account_service.go @@ -49,10 +49,12 @@ type AccountRepository interface { ListSchedulableByGroupIDAndPlatforms(ctx context.Context, groupID int64, platforms []string) ([]Account, error) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error + SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error SetOverloaded(ctx context.Context, id int64, until time.Time) error SetTempUnschedulable(ctx context.Context, id int64, until time.Time, reason string) error ClearTempUnschedulable(ctx context.Context, id int64) error ClearRateLimit(ctx context.Context, id int64) error + ClearAntigravityQuotaScopes(ctx context.Context, id int64) error UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error UpdateExtra(ctx context.Context, id int64, updates map[string]any) error BulkUpdate(ctx context.Context, ids []int64, updates AccountBulkUpdate) (int64, error) diff --git a/backend/internal/service/account_service_delete_test.go b/backend/internal/service/account_service_delete_test.go index edad8672..6923067d 100644 --- a/backend/internal/service/account_service_delete_test.go +++ b/backend/internal/service/account_service_delete_test.go @@ -139,6 +139,10 @@ func (s *accountRepoStub) SetRateLimited(ctx context.Context, id int64, resetAt panic("unexpected SetRateLimited call") } +func (s *accountRepoStub) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error { + panic("unexpected SetAntigravityQuotaScopeLimit call") +} + func (s *accountRepoStub) SetOverloaded(ctx context.Context, id int64, until time.Time) error { panic("unexpected SetOverloaded call") } @@ -155,6 +159,10 @@ func (s *accountRepoStub) ClearRateLimit(ctx context.Context, id int64) error { panic("unexpected ClearRateLimit call") } +func (s *accountRepoStub) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error { + panic("unexpected ClearAntigravityQuotaScopes call") +} + func (s *accountRepoStub) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error { panic("unexpected UpdateSessionWindow call") } diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index aabeea16..fe4eb621 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -451,6 +451,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, originalModel := claudeReq.Model mappedModel := s.getMappedModel(account, claudeReq.Model) + quotaScope, _ := resolveAntigravityQuotaScope(originalModel) // 获取 access_token if s.tokenProvider == nil { @@ -529,7 +530,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, } // 所有重试都失败,标记限流状态 if resp.StatusCode == 429 { - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody) + s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) } // 最后一次尝试也失败 resp = &http.Response{ @@ -621,7 +622,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, // 处理错误响应(重试后仍失败或不触发重试) if resp.StatusCode >= 400 { - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody) + s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) if s.shouldFailoverUpstreamError(resp.StatusCode) { return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} @@ -946,6 +947,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co if len(body) == 0 { return nil, s.writeGoogleError(c, http.StatusBadRequest, "Request body is empty") } + quotaScope, _ := resolveAntigravityQuotaScope(originalModel) // 解析请求以获取 image_size(用于图片计费) imageSize := s.extractImageSize(body) @@ -1048,7 +1050,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co } // 所有重试都失败,标记限流状态 if resp.StatusCode == 429 { - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody) + s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) } resp = &http.Response{ StatusCode: resp.StatusCode, @@ -1101,7 +1103,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co goto handleSuccess } - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody) + s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) if s.shouldFailoverUpstreamError(resp.StatusCode) { return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} @@ -1215,7 +1217,7 @@ func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool { } } -func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte) { +func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope) { // 429 使用 Gemini 格式解析(从 body 解析重置时间) if statusCode == 429 { resetAt := ParseGeminiRateLimitResetTime(body) @@ -1226,13 +1228,23 @@ func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, pre defaultDur = 5 * time.Minute } ra := time.Now().Add(defaultDur) - log.Printf("%s status=429 rate_limited reset_in=%v (fallback)", prefix, defaultDur) - _ = s.accountRepo.SetRateLimited(ctx, account.ID, ra) + log.Printf("%s status=429 rate_limited scope=%s reset_in=%v (fallback)", prefix, quotaScope, defaultDur) + if quotaScope == "" { + return + } + if err := s.accountRepo.SetAntigravityQuotaScopeLimit(ctx, account.ID, quotaScope, ra); err != nil { + log.Printf("%s status=429 rate_limit_set_failed scope=%s error=%v", prefix, quotaScope, err) + } return } resetTime := time.Unix(*resetAt, 0) - log.Printf("%s status=429 rate_limited reset_at=%v reset_in=%v", prefix, resetTime.Format("15:04:05"), time.Until(resetTime).Truncate(time.Second)) - _ = s.accountRepo.SetRateLimited(ctx, account.ID, resetTime) + log.Printf("%s status=429 rate_limited scope=%s reset_at=%v reset_in=%v", prefix, quotaScope, resetTime.Format("15:04:05"), time.Until(resetTime).Truncate(time.Second)) + if quotaScope == "" { + return + } + if err := s.accountRepo.SetAntigravityQuotaScopeLimit(ctx, account.ID, quotaScope, resetTime); err != nil { + log.Printf("%s status=429 rate_limit_set_failed scope=%s error=%v", prefix, quotaScope, err) + } return } // 其他错误码继续使用 rateLimitService diff --git a/backend/internal/service/antigravity_quota_scope.go b/backend/internal/service/antigravity_quota_scope.go new file mode 100644 index 00000000..e9f7184b --- /dev/null +++ b/backend/internal/service/antigravity_quota_scope.go @@ -0,0 +1,88 @@ +package service + +import ( + "strings" + "time" +) + +const antigravityQuotaScopesKey = "antigravity_quota_scopes" + +// AntigravityQuotaScope 表示 Antigravity 的配额域 +type AntigravityQuotaScope string + +const ( + AntigravityQuotaScopeClaude AntigravityQuotaScope = "claude" + AntigravityQuotaScopeGeminiText AntigravityQuotaScope = "gemini_text" + AntigravityQuotaScopeGeminiImage AntigravityQuotaScope = "gemini_image" +) + +// resolveAntigravityQuotaScope 根据模型名称解析配额域 +func resolveAntigravityQuotaScope(requestedModel string) (AntigravityQuotaScope, bool) { + model := normalizeAntigravityModelName(requestedModel) + if model == "" { + return "", false + } + switch { + case strings.HasPrefix(model, "claude-"): + return AntigravityQuotaScopeClaude, true + case strings.HasPrefix(model, "gemini-"): + if isImageGenerationModel(model) { + return AntigravityQuotaScopeGeminiImage, true + } + return AntigravityQuotaScopeGeminiText, true + default: + return "", false + } +} + +func normalizeAntigravityModelName(model string) string { + normalized := strings.ToLower(strings.TrimSpace(model)) + normalized = strings.TrimPrefix(normalized, "models/") + return normalized +} + +// IsSchedulableForModel 结合 Antigravity 配额域限流判断是否可调度 +func (a *Account) IsSchedulableForModel(requestedModel string) bool { + if a == nil { + return false + } + if !a.IsSchedulable() { + return false + } + if a.Platform != PlatformAntigravity { + return true + } + scope, ok := resolveAntigravityQuotaScope(requestedModel) + if !ok { + return true + } + resetAt := a.antigravityQuotaScopeResetAt(scope) + if resetAt == nil { + return true + } + now := time.Now() + return !now.Before(*resetAt) +} + +func (a *Account) antigravityQuotaScopeResetAt(scope AntigravityQuotaScope) *time.Time { + if a == nil || a.Extra == nil || scope == "" { + return nil + } + rawScopes, ok := a.Extra[antigravityQuotaScopesKey].(map[string]any) + if !ok { + return nil + } + rawScope, ok := rawScopes[string(scope)].(map[string]any) + if !ok { + return nil + } + resetAtRaw, ok := rawScope["rate_limit_reset_at"].(string) + if !ok || strings.TrimSpace(resetAtRaw) == "" { + return nil + } + resetAt, err := time.Parse(time.RFC3339, resetAtRaw) + if err != nil { + return nil + } + return &resetAt +} diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index 47279581..8f29e07c 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -136,6 +136,9 @@ func (m *mockAccountRepoForPlatform) ListSchedulableByGroupIDAndPlatforms(ctx co func (m *mockAccountRepoForPlatform) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error { return nil } +func (m *mockAccountRepoForPlatform) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error { + return nil +} func (m *mockAccountRepoForPlatform) SetOverloaded(ctx context.Context, id int64, until time.Time) error { return nil } @@ -148,6 +151,9 @@ func (m *mockAccountRepoForPlatform) ClearTempUnschedulable(ctx context.Context, func (m *mockAccountRepoForPlatform) ClearRateLimit(ctx context.Context, id int64) error { return nil } +func (m *mockAccountRepoForPlatform) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error { + return nil +} func (m *mockAccountRepoForPlatform) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error { return nil } diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 98c061d4..209e4dee 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -448,7 +448,7 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro account, err := s.accountRepo.GetByID(ctx, accountID) if err == nil && s.isAccountInGroup(account, groupID) && s.isAccountAllowedForPlatform(account, platform, useMixed) && - account.IsSchedulable() && + account.IsSchedulableForModel(requestedModel) && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) { result, err := s.tryAcquireAccountSlot(ctx, accountID, account.Concurrency) if err == nil && result.Acquired { @@ -486,6 +486,9 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro if !s.isAccountAllowedForPlatform(acc, platform, useMixed) { continue } + if !acc.IsSchedulableForModel(requestedModel) { + continue + } if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) { continue } @@ -743,7 +746,7 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if _, excluded := excludedIDs[accountID]; !excluded { account, err := s.accountRepo.GetByID(ctx, accountID) // 检查账号分组归属和平台匹配(确保粘性会话不会跨分组或跨平台) - if err == nil && s.isAccountInGroup(account, groupID) && account.Platform == platform && account.IsSchedulable() && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) { + if err == nil && s.isAccountInGroup(account, groupID) && account.Platform == platform && account.IsSchedulableForModel(requestedModel) && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) { if err := s.cache.RefreshSessionTTL(ctx, sessionHash, stickySessionTTL); err != nil { log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err) } @@ -775,6 +778,9 @@ func (s *GatewayService) selectAccountForModelWithPlatform(ctx context.Context, if _, excluded := excludedIDs[acc.ID]; excluded { continue } + if !acc.IsSchedulableForModel(requestedModel) { + continue + } if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) { continue } @@ -832,7 +838,7 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if _, excluded := excludedIDs[accountID]; !excluded { account, err := s.accountRepo.GetByID(ctx, accountID) // 检查账号分组归属和有效性:原生平台直接匹配,antigravity 需要启用混合调度 - if err == nil && s.isAccountInGroup(account, groupID) && account.IsSchedulable() && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) { + if err == nil && s.isAccountInGroup(account, groupID) && account.IsSchedulableForModel(requestedModel) && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) { if account.Platform == nativePlatform || (account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled()) { if err := s.cache.RefreshSessionTTL(ctx, sessionHash, stickySessionTTL); err != nil { log.Printf("refresh session ttl failed: session=%s err=%v", sessionHash, err) @@ -867,6 +873,9 @@ func (s *GatewayService) selectAccountWithMixedScheduling(ctx context.Context, g if acc.Platform == PlatformAntigravity && !acc.IsMixedSchedulingEnabled() { continue } + if !acc.IsSchedulableForModel(requestedModel) { + continue + } if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) { continue } diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go index fdf912d0..13f644c8 100644 --- a/backend/internal/service/gemini_messages_compat_service.go +++ b/backend/internal/service/gemini_messages_compat_service.go @@ -114,7 +114,7 @@ func (s *GeminiMessagesCompatService) SelectAccountForModelWithExclusions(ctx co if _, excluded := excludedIDs[accountID]; !excluded { account, err := s.accountRepo.GetByID(ctx, accountID) // 检查账号是否有效:原生平台直接匹配,antigravity 需要启用混合调度 - if err == nil && account.IsSchedulable() && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) { + if err == nil && account.IsSchedulableForModel(requestedModel) && (requestedModel == "" || s.isModelSupportedByAccount(account, requestedModel)) { valid := false if account.Platform == platform { valid = true @@ -172,6 +172,9 @@ func (s *GeminiMessagesCompatService) SelectAccountForModelWithExclusions(ctx co if useMixedScheduling && acc.Platform == PlatformAntigravity && !acc.IsMixedSchedulingEnabled() { continue } + if !acc.IsSchedulableForModel(requestedModel) { + continue + } if requestedModel != "" && !s.isModelSupportedByAccount(acc, requestedModel) { continue } diff --git a/backend/internal/service/gemini_multiplatform_test.go b/backend/internal/service/gemini_multiplatform_test.go index 5070b510..794e56a7 100644 --- a/backend/internal/service/gemini_multiplatform_test.go +++ b/backend/internal/service/gemini_multiplatform_test.go @@ -121,6 +121,9 @@ func (m *mockAccountRepoForGemini) ListSchedulableByGroupIDAndPlatforms(ctx cont func (m *mockAccountRepoForGemini) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error { return nil } +func (m *mockAccountRepoForGemini) SetAntigravityQuotaScopeLimit(ctx context.Context, id int64, scope AntigravityQuotaScope, resetAt time.Time) error { + return nil +} func (m *mockAccountRepoForGemini) SetOverloaded(ctx context.Context, id int64, until time.Time) error { return nil } @@ -131,6 +134,9 @@ func (m *mockAccountRepoForGemini) ClearTempUnschedulable(ctx context.Context, i return nil } func (m *mockAccountRepoForGemini) ClearRateLimit(ctx context.Context, id int64) error { return nil } +func (m *mockAccountRepoForGemini) ClearAntigravityQuotaScopes(ctx context.Context, id int64) error { + return nil +} func (m *mockAccountRepoForGemini) UpdateSessionWindow(ctx context.Context, id int64, start, end *time.Time, status string) error { return nil } diff --git a/backend/internal/service/ratelimit_service.go b/backend/internal/service/ratelimit_service.go index 196f1643..f1362646 100644 --- a/backend/internal/service/ratelimit_service.go +++ b/backend/internal/service/ratelimit_service.go @@ -345,7 +345,7 @@ func (s *RateLimitService) UpdateSessionWindow(ctx context.Context, account *Acc // 如果状态为allowed且之前有限流,说明窗口已重置,清除限流状态 if status == "allowed" && account.IsRateLimited() { - if err := s.accountRepo.ClearRateLimit(ctx, account.ID); err != nil { + if err := s.ClearRateLimit(ctx, account.ID); err != nil { log.Printf("ClearRateLimit failed for account %d: %v", account.ID, err) } } @@ -353,7 +353,10 @@ func (s *RateLimitService) UpdateSessionWindow(ctx context.Context, account *Acc // ClearRateLimit 清除账号的限流状态 func (s *RateLimitService) ClearRateLimit(ctx context.Context, accountID int64) error { - return s.accountRepo.ClearRateLimit(ctx, accountID) + if err := s.accountRepo.ClearRateLimit(ctx, accountID); err != nil { + return err + } + return s.accountRepo.ClearAntigravityQuotaScopes(ctx, accountID) } func (s *RateLimitService) ClearTempUnschedulable(ctx context.Context, accountID int64) error {