From 3ee6f085db25ae699f1c81e0ca0906eeea6d9fd2 Mon Sep 17 00:00:00 2001 From: erio Date: Tue, 24 Mar 2026 20:33:11 +0800 Subject: [PATCH] refactor: extract internal500 penalty logic to dedicated file Move constants, detection, and penalty functions from antigravity_gateway_service.go to antigravity_internal500_penalty.go. Fix gofmt alignment and replace hardcoded duration strings with constant references. --- .../repository/internal500_counter_cache.go | 2 +- .../service/antigravity_gateway_service.go | 74 +------------- .../antigravity_internal500_penalty.go | 97 +++++++++++++++++++ 3 files changed, 102 insertions(+), 71 deletions(-) create mode 100644 backend/internal/service/antigravity_internal500_penalty.go diff --git a/backend/internal/repository/internal500_counter_cache.go b/backend/internal/repository/internal500_counter_cache.go index 5b9071a8..13b0faa8 100644 --- a/backend/internal/repository/internal500_counter_cache.go +++ b/backend/internal/repository/internal500_counter_cache.go @@ -9,7 +9,7 @@ import ( ) const ( - internal500CounterPrefix = "internal500_count:account:" + internal500CounterPrefix = "internal500_count:account:" internal500CounterTTLSeconds = 86400 // 24 小时兜底 ) diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 6ba94ee2..a76e59fb 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -71,11 +71,6 @@ const ( // MODEL_CAPACITY_EXHAUSTED 全局去重:重试全部失败后的 cooldown 时间 antigravityModelCapacityCooldown = 10 * time.Second - - // INTERNAL 500 渐进惩罚:连续多轮全部返回特定 500 错误时的惩罚时长 - internal500PenaltyTier1Duration = 10 * time.Minute - internal500PenaltyTier2Duration = 10 * time.Hour - internal500PenaltyTier3Threshold = 3 // 第 3+ 轮:永久禁用 ) // antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写) @@ -781,16 +776,8 @@ urlFallbackLoop: } // INTERNAL 500 渐进惩罚:3 次重试全部命中特定 500 时递增计数器并惩罚 - if allAttemptsInternal500 && - isAntigravityInternalServerError(resp.StatusCode, respBody) && - s.internal500Cache != nil { - count, incrErr := s.internal500Cache.IncrementInternal500Count(p.ctx, p.account.ID) - if incrErr != nil { - slog.Error("internal500_counter_increment_failed", - "prefix", p.prefix, "account_id", p.account.ID, "error", incrErr) - } else { - s.applyInternal500Penalty(p.ctx, p.prefix, p.account, count) - } + if allAttemptsInternal500 && isAntigravityInternalServerError(resp.StatusCode, respBody) { + s.handleInternal500RetryExhausted(p.ctx, p.prefix, p.account) } // 其他 4xx 错误或重试用尽,直接返回 @@ -812,11 +799,8 @@ urlFallbackLoop: } // 成功响应时清零 INTERNAL 500 连续失败计数器(覆盖所有成功路径,含 smart retry) - if resp != nil && resp.StatusCode < 400 && s.internal500Cache != nil { - if err := s.internal500Cache.ResetInternal500Count(p.ctx, p.account.ID); err != nil { - slog.Error("internal500_counter_reset_failed", - "prefix", p.prefix, "account_id", p.account.ID, "error", err) - } + if resp != nil && resp.StatusCode < 400 { + s.resetInternal500Counter(p.ctx, p.prefix, p.account.ID) } return &antigravityRetryLoopResult{resp: resp}, nil @@ -832,56 +816,6 @@ func shouldRetryAntigravityError(statusCode int) bool { } } -// isAntigravityInternalServerError 检测特定的 INTERNAL 500 错误 -// 必须同时匹配 error.code==500, error.message=="Internal error encountered.", error.status=="INTERNAL" -func isAntigravityInternalServerError(statusCode int, body []byte) bool { - if statusCode != http.StatusInternalServerError { - return false - } - return gjson.GetBytes(body, "error.code").Int() == 500 && - gjson.GetBytes(body, "error.message").String() == "Internal error encountered." && - gjson.GetBytes(body, "error.status").String() == "INTERNAL" -} - -// applyInternal500Penalty 根据连续 INTERNAL 500 轮次数应用渐进惩罚 -// count=1: temp_unschedulable 10 分钟 -// count=2: temp_unschedulable 10 小时 -// count>=3: SetError 永久禁用 -func (s *AntigravityGatewayService) applyInternal500Penalty( - ctx context.Context, prefix string, account *Account, count int64, -) { - switch { - case count >= int64(internal500PenaltyTier3Threshold): - reason := fmt.Sprintf("INTERNAL 500 consecutive failures: %d rounds", count) - if err := s.accountRepo.SetError(ctx, account.ID, reason); err != nil { - slog.Error("internal500_set_error_failed", "account_id", account.ID, "error", err) - return - } - slog.Warn("internal500_account_disabled", - "account_id", account.ID, "account_name", account.Name, "consecutive_count", count) - case count == 2: - until := time.Now().Add(internal500PenaltyTier2Duration) - reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched 10h)", count) - if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil { - slog.Error("internal500_temp_unsched_failed", "account_id", account.ID, "error", err) - return - } - slog.Warn("internal500_temp_unschedulable", - "account_id", account.ID, "account_name", account.Name, - "duration", internal500PenaltyTier2Duration, "consecutive_count", count) - case count == 1: - until := time.Now().Add(internal500PenaltyTier1Duration) - reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched 10m)", count) - if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil { - slog.Error("internal500_temp_unsched_failed", "account_id", account.ID, "error", err) - return - } - slog.Info("internal500_temp_unschedulable", - "account_id", account.ID, "account_name", account.Name, - "duration", internal500PenaltyTier1Duration, "consecutive_count", count) - } -} - // isURLLevelRateLimit 判断是否为 URL 级别的限流(应切换 URL 重试) // "Resource has been exhausted" 是 URL/节点级别限流,切换 URL 可能成功 // "exhausted your capacity on this model" 是账户/模型配额限流,切换 URL 无效 diff --git a/backend/internal/service/antigravity_internal500_penalty.go b/backend/internal/service/antigravity_internal500_penalty.go new file mode 100644 index 00000000..c6985754 --- /dev/null +++ b/backend/internal/service/antigravity_internal500_penalty.go @@ -0,0 +1,97 @@ +package service + +import ( + "context" + "fmt" + "log/slog" + "net/http" + "time" + + "github.com/tidwall/gjson" +) + +// INTERNAL 500 渐进惩罚:连续多轮全部返回特定 500 错误时的惩罚时长 +const ( + internal500PenaltyTier1Duration = 10 * time.Minute // 第 1 轮:临时不可调度 10 分钟 + internal500PenaltyTier2Duration = 10 * time.Hour // 第 2 轮:临时不可调度 10 小时 + internal500PenaltyTier3Threshold = 3 // 第 3+ 轮:永久禁用 +) + +// isAntigravityInternalServerError 检测特定的 INTERNAL 500 错误 +// 必须同时匹配 error.code==500, error.message=="Internal error encountered.", error.status=="INTERNAL" +func isAntigravityInternalServerError(statusCode int, body []byte) bool { + if statusCode != http.StatusInternalServerError { + return false + } + return gjson.GetBytes(body, "error.code").Int() == 500 && + gjson.GetBytes(body, "error.message").String() == "Internal error encountered." && + gjson.GetBytes(body, "error.status").String() == "INTERNAL" +} + +// applyInternal500Penalty 根据连续 INTERNAL 500 轮次数应用渐进惩罚 +// count=1: temp_unschedulable 10 分钟 +// count=2: temp_unschedulable 10 小时 +// count>=3: SetError 永久禁用 +func (s *AntigravityGatewayService) applyInternal500Penalty( + ctx context.Context, prefix string, account *Account, count int64, +) { + switch { + case count >= int64(internal500PenaltyTier3Threshold): + reason := fmt.Sprintf("INTERNAL 500 consecutive failures: %d rounds", count) + if err := s.accountRepo.SetError(ctx, account.ID, reason); err != nil { + slog.Error("internal500_set_error_failed", "account_id", account.ID, "error", err) + return + } + slog.Warn("internal500_account_disabled", + "account_id", account.ID, "account_name", account.Name, "consecutive_count", count) + case count == 2: + until := time.Now().Add(internal500PenaltyTier2Duration) + reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched %v)", count, internal500PenaltyTier2Duration) + if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil { + slog.Error("internal500_temp_unsched_failed", "account_id", account.ID, "error", err) + return + } + slog.Warn("internal500_temp_unschedulable", + "account_id", account.ID, "account_name", account.Name, + "duration", internal500PenaltyTier2Duration, "consecutive_count", count) + case count == 1: + until := time.Now().Add(internal500PenaltyTier1Duration) + reason := fmt.Sprintf("INTERNAL 500 x%d (temp unsched %v)", count, internal500PenaltyTier1Duration) + if err := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); err != nil { + slog.Error("internal500_temp_unsched_failed", "account_id", account.ID, "error", err) + return + } + slog.Info("internal500_temp_unschedulable", + "account_id", account.ID, "account_name", account.Name, + "duration", internal500PenaltyTier1Duration, "consecutive_count", count) + } +} + +// handleInternal500RetryExhausted 处理 INTERNAL 500 重试耗尽:递增计数器并应用惩罚 +func (s *AntigravityGatewayService) handleInternal500RetryExhausted( + ctx context.Context, prefix string, account *Account, +) { + if s.internal500Cache == nil { + return + } + count, err := s.internal500Cache.IncrementInternal500Count(ctx, account.ID) + if err != nil { + slog.Error("internal500_counter_increment_failed", + "prefix", prefix, "account_id", account.ID, "error", err) + return + } + s.applyInternal500Penalty(ctx, prefix, account, count) +} + +// resetInternal500Counter 成功响应时清零 INTERNAL 500 计数器 +func (s *AntigravityGatewayService) resetInternal500Counter( + ctx context.Context, prefix string, accountID int64, +) { + if s.internal500Cache == nil { + return + } + if err := s.internal500Cache.ResetInternal500Count(ctx, accountID); err != nil { + slog.Error("internal500_counter_reset_failed", + "prefix", prefix, "account_id", accountID, "error", err) + } +}