From d6c2921f2ba02c4886650620e2b469311779576d Mon Sep 17 00:00:00 2001 From: Edric Li Date: Tue, 10 Feb 2026 00:53:54 +0800 Subject: [PATCH] feat: same-account retry before failover for transient errors For retryable transient errors (Google 400 "invalid project resource name" and empty stream responses), retry on the same account up to 2 times (with 500ms delay) before switching to another account. - Add RetryableOnSameAccount field to UpstreamFailoverError - Add same-account retry loop in both Gemini and Claude/OpenAI handler paths - Move temp-unschedule from service layer to handler layer (only after all same-account retries exhausted) - Reduce temp-unschedule cooldown from 30 minutes to 1 minute --- backend/internal/handler/gateway_handler.go | 57 ++++++++++++++++++- .../service/antigravity_gateway_service.go | 40 ++++++------- backend/internal/service/gateway_service.go | 21 ++++++- .../service/gemini_messages_compat_service.go | 6 +- 4 files changed, 91 insertions(+), 33 deletions(-) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 6900fa55..b5fb379e 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -235,6 +235,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { maxAccountSwitches := h.maxAccountSwitchesGemini switchCount := 0 failedAccountIDs := make(map[int64]struct{}) + sameAccountRetryCount := make(map[int64]int) // 同账号重试计数 var lastFailoverErr *service.UpstreamFailoverError var forceCacheBilling bool // 粘性会话切换时的缓存计费标记 @@ -339,11 +340,28 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if err != nil { var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { - failedAccountIDs[account.ID] = struct{}{} lastFailoverErr = failoverErr if needForceCacheBilling(hasBoundSession, failoverErr) { forceCacheBilling = true } + + // 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试 + if failoverErr.RetryableOnSameAccount && sameAccountRetryCount[account.ID] < maxSameAccountRetries { + sameAccountRetryCount[account.ID]++ + log.Printf("Account %d: retryable error %d, same-account retry %d/%d", + account.ID, failoverErr.StatusCode, sameAccountRetryCount[account.ID], maxSameAccountRetries) + if !sleepSameAccountRetryDelay(c.Request.Context()) { + return + } + continue + } + + // 同账号重试用尽,执行临时封禁并切换账号 + if failoverErr.RetryableOnSameAccount { + h.gatewayService.TempUnscheduleRetryableError(c.Request.Context(), account.ID, failoverErr) + } + + failedAccountIDs[account.ID] = struct{}{} if switchCount >= maxAccountSwitches { h.handleFailoverExhausted(c, failoverErr, service.PlatformGemini, streamStarted) return @@ -400,6 +418,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { maxAccountSwitches := h.maxAccountSwitches switchCount := 0 failedAccountIDs := make(map[int64]struct{}) + sameAccountRetryCount := make(map[int64]int) // 同账号重试计数 var lastFailoverErr *service.UpstreamFailoverError retryWithFallback := false var forceCacheBilling bool // 粘性会话切换时的缓存计费标记 @@ -539,11 +558,28 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { - failedAccountIDs[account.ID] = struct{}{} lastFailoverErr = failoverErr if needForceCacheBilling(hasBoundSession, failoverErr) { forceCacheBilling = true } + + // 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试 + if failoverErr.RetryableOnSameAccount && sameAccountRetryCount[account.ID] < maxSameAccountRetries { + sameAccountRetryCount[account.ID]++ + log.Printf("Account %d: retryable error %d, same-account retry %d/%d", + account.ID, failoverErr.StatusCode, sameAccountRetryCount[account.ID], maxSameAccountRetries) + if !sleepSameAccountRetryDelay(c.Request.Context()) { + return + } + continue + } + + // 同账号重试用尽,执行临时封禁并切换账号 + if failoverErr.RetryableOnSameAccount { + h.gatewayService.TempUnscheduleRetryableError(c.Request.Context(), account.ID, failoverErr) + } + + failedAccountIDs[account.ID] = struct{}{} if switchCount >= maxAccountSwitches { h.handleFailoverExhausted(c, failoverErr, account.Platform, streamStarted) return @@ -823,6 +859,23 @@ func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFa return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling) } +const ( + // maxSameAccountRetries 同账号重试次数上限(针对 RetryableOnSameAccount 错误) + maxSameAccountRetries = 2 + // sameAccountRetryDelay 同账号重试间隔 + sameAccountRetryDelay = 500 * time.Millisecond +) + +// sleepSameAccountRetryDelay 同账号重试固定延时,返回 false 表示 context 已取消。 +func sleepSameAccountRetryDelay(ctx context.Context) bool { + select { + case <-ctx.Done(): + return false + case <-time.After(sameAccountRetryDelay): + return true + } +} + // sleepFailoverDelay 账号切换线性递增延时:第1次0s、第2次1s、第3次2s… // 返回 false 表示 context 已取消。 func sleepFailoverDelay(ctx context.Context, switchCount int) bool { diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index a5fd1535..9c2b9027 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -1285,7 +1285,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", isStickySession) - // 精确匹配服务端配置类 400 错误,触发 failover + 临时封禁 + // 精确匹配服务端配置类 400 错误,触发同账号重试 + failover if resp.StatusCode == http.StatusBadRequest { msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody))) if isGoogleProjectConfigError(msg) { @@ -1302,8 +1302,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, Message: upstreamMsg, Detail: upstreamDetail, }) - tempUnscheduleGoogleConfigError(ctx, s.accountRepo, account.ID, prefix) - return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody, RetryableOnSameAccount: true} } } @@ -1351,10 +1350,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, streamRes, err := s.handleClaudeStreamToNonStreaming(c, resp, startTime, originalModel) if err != nil { log.Printf("%s status=stream_collect_error error=%v", prefix, err) - var failoverErr *UpstreamFailoverError - if errors.As(err, &failoverErr) && failoverErr.StatusCode == http.StatusBadGateway { - tempUnscheduleEmptyResponse(ctx, s.accountRepo, account.ID, prefix) - } return nil, err } usage = streamRes.usage @@ -1851,7 +1846,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co // Always record upstream context for Ops error logs, even when we will failover. setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) - // 精确匹配服务端配置类 400 错误,触发 failover + 临时封禁 + // 精确匹配服务端配置类 400 错误,触发同账号重试 + failover if resp.StatusCode == http.StatusBadRequest && isGoogleProjectConfigError(strings.ToLower(upstreamMsg)) { log.Printf("%s status=400 google_config_error failover=true upstream_message=%q account=%d", prefix, upstreamMsg, account.ID) appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ @@ -1864,8 +1859,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co Message: upstreamMsg, Detail: upstreamDetail, }) - tempUnscheduleGoogleConfigError(ctx, s.accountRepo, account.ID, prefix) - return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: unwrappedForOps} + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: unwrappedForOps, RetryableOnSameAccount: true} } if s.shouldFailoverUpstreamError(resp.StatusCode) { @@ -1924,10 +1918,6 @@ handleSuccess: streamRes, err := s.handleGeminiStreamToNonStreaming(c, resp, startTime) if err != nil { log.Printf("%s status=stream_collect_error error=%v", prefix, err) - var failoverErr *UpstreamFailoverError - if errors.As(err, &failoverErr) && failoverErr.StatusCode == http.StatusBadGateway { - tempUnscheduleEmptyResponse(ctx, s.accountRepo, account.ID, prefix) - } return nil, err } usage = streamRes.usage @@ -1976,13 +1966,13 @@ func isGoogleProjectConfigError(lowerMsg string) bool { } // googleConfigErrorCooldown 服务端配置类 400 错误的临时封禁时长 -const googleConfigErrorCooldown = 30 * time.Minute +const googleConfigErrorCooldown = 1 * time.Minute // tempUnscheduleGoogleConfigError 对服务端配置类 400 错误触发临时封禁, // 避免短时间内反复调度到同一个有问题的账号。 func tempUnscheduleGoogleConfigError(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) { until := time.Now().Add(googleConfigErrorCooldown) - reason := "400: invalid project resource name (auto temp-unschedule 30m)" + reason := "400: invalid project resource name (auto temp-unschedule 1m)" if err := repo.SetTempUnschedulable(ctx, accountID, until, reason); err != nil { log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err) } else { @@ -1991,13 +1981,13 @@ func tempUnscheduleGoogleConfigError(ctx context.Context, repo AccountRepository } // emptyResponseCooldown 空流式响应的临时封禁时长 -const emptyResponseCooldown = 30 * time.Minute +const emptyResponseCooldown = 1 * time.Minute // tempUnscheduleEmptyResponse 对空流式响应触发临时封禁, // 避免短时间内反复调度到同一个返回空响应的账号。 func tempUnscheduleEmptyResponse(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) { until := time.Now().Add(emptyResponseCooldown) - reason := "empty stream response (auto temp-unschedule 30m)" + reason := "empty stream response (auto temp-unschedule 1m)" if err := repo.SetTempUnschedulable(ctx, accountID, until, reason); err != nil { log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err) } else { @@ -2809,12 +2799,13 @@ returnResponse: // 选择最后一个有效响应 finalResponse := pickGeminiCollectResult(last, lastWithParts) - // 处理空响应情况 — 触发 failover 切换账号重试 + // 处理空响应情况 — 触发同账号重试 + failover 切换账号 if last == nil && lastWithParts == nil { log.Printf("[antigravity-Forward] warning: empty stream response (gemini non-stream), triggering failover") return nil, &UpstreamFailoverError{ - StatusCode: http.StatusBadGateway, - ResponseBody: []byte(`{"error":"empty stream response from upstream"}`), + StatusCode: http.StatusBadGateway, + ResponseBody: []byte(`{"error":"empty stream response from upstream"}`), + RetryableOnSameAccount: true, } } @@ -3228,12 +3219,13 @@ returnResponse: // 选择最后一个有效响应 finalResponse := pickGeminiCollectResult(last, lastWithParts) - // 处理空响应情况 — 触发 failover 切换账号重试 + // 处理空响应情况 — 触发同账号重试 + failover 切换账号 if last == nil && lastWithParts == nil { log.Printf("[antigravity-Forward] warning: empty stream response (claude non-stream), triggering failover") return nil, &UpstreamFailoverError{ - StatusCode: http.StatusBadGateway, - ResponseBody: []byte(`{"error":"empty stream response from upstream"}`), + StatusCode: http.StatusBadGateway, + ResponseBody: []byte(`{"error":"empty stream response from upstream"}`), + RetryableOnSameAccount: true, } } diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 4e723232..01e1acb4 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -362,15 +362,30 @@ type ForwardResult struct { // UpstreamFailoverError indicates an upstream error that should trigger account failover. type UpstreamFailoverError struct { - StatusCode int - ResponseBody []byte // 上游响应体,用于错误透传规则匹配 - ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true + StatusCode int + ResponseBody []byte // 上游响应体,用于错误透传规则匹配 + ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true + RetryableOnSameAccount bool // 临时性错误(如 Google 间歇性 400、空响应),应在同一账号上重试 N 次再切换 } func (e *UpstreamFailoverError) Error() string { return fmt.Sprintf("upstream error: %d (failover)", e.StatusCode) } +// TempUnscheduleRetryableError 对 RetryableOnSameAccount 类型的 failover 错误触发临时封禁。 +// 由 handler 层在同账号重试全部用尽、切换账号时调用。 +func (s *GatewayService) TempUnscheduleRetryableError(ctx context.Context, accountID int64, failoverErr *UpstreamFailoverError) { + if failoverErr == nil || !failoverErr.RetryableOnSameAccount { + return + } + // 根据状态码选择封禁策略 + if failoverErr.StatusCode == http.StatusBadRequest { + tempUnscheduleGoogleConfigError(ctx, s.accountRepo, accountID, "[handler]") + } else if failoverErr.StatusCode == http.StatusBadGateway { + tempUnscheduleEmptyResponse(ctx, s.accountRepo, accountID, "[handler]") + } +} + // GatewayService handles API gateway operations type GatewayService struct { accountRepo AccountRepository diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go index 1e59c5fd..7fa375ca 100644 --- a/backend/internal/service/gemini_messages_compat_service.go +++ b/backend/internal/service/gemini_messages_compat_service.go @@ -908,8 +908,7 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex Message: upstreamMsg, Detail: upstreamDetail, }) - tempUnscheduleGoogleConfigError(ctx, s.accountRepo, account.ID, "[Gemini]") - return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody} + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody, RetryableOnSameAccount: true} } } if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) { @@ -1387,8 +1386,7 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. Message: upstreamMsg, Detail: upstreamDetail, }) - tempUnscheduleGoogleConfigError(ctx, s.accountRepo, account.ID, "[Gemini]") - return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: evBody} + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: evBody, RetryableOnSameAccount: true} } } if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) {