diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 6900fa55..82181948 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -245,6 +245,19 @@ func (h *GatewayHandler) Messages(c *gin.Context) { h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted) return } + // Antigravity 单账号退避重试:分组内没有其他可用账号时, + // 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。 + // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 + if lastFailoverErr != nil && lastFailoverErr.StatusCode == http.StatusServiceUnavailable && switchCount <= maxAccountSwitches { + if sleepAntigravitySingleAccountBackoff(c.Request.Context(), switchCount) { + log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", switchCount, maxAccountSwitches) + failedAccountIDs = make(map[int64]struct{}) + // 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换 + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + continue + } + } if lastFailoverErr != nil { h.handleFailoverExhausted(c, lastFailoverErr, service.PlatformGemini, streamStarted) } else { @@ -412,6 +425,19 @@ func (h *GatewayHandler) Messages(c *gin.Context) { h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted) return } + // Antigravity 单账号退避重试:分组内没有其他可用账号时, + // 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。 + // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 + if lastFailoverErr != nil && lastFailoverErr.StatusCode == http.StatusServiceUnavailable && switchCount <= maxAccountSwitches { + if sleepAntigravitySingleAccountBackoff(c.Request.Context(), switchCount) { + log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", switchCount, maxAccountSwitches) + failedAccountIDs = make(map[int64]struct{}) + // 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换 + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + continue + } + } if lastFailoverErr != nil { h.handleFailoverExhausted(c, lastFailoverErr, platform, streamStarted) } else { @@ -838,6 +864,27 @@ func sleepFailoverDelay(ctx context.Context, switchCount int) bool { } } +// sleepAntigravitySingleAccountBackoff Antigravity 平台单账号分组的 503 退避重试延时。 +// 当分组内只有一个可用账号且上游返回 503(MODEL_CAPACITY_EXHAUSTED)时使用, +// 采用短固定延时策略。Service 层在 SingleAccountRetry 模式下已经做了充分的原地重试 +// (最多 3 次、总等待 30s),所以 Handler 层的退避只需短暂等待即可。 +// 返回 false 表示 context 已取消。 +func sleepAntigravitySingleAccountBackoff(ctx context.Context, retryCount int) bool { + // 固定短延时:2s + // Service 层已经在原地等待了足够长的时间(retryDelay × 重试次数), + // Handler 层只需短暂间隔后重新进入 Service 层即可。 + const delay = 2 * time.Second + + log.Printf("Antigravity single-account 503 backoff: waiting %v before retry (attempt %d)", delay, retryCount) + + select { + case <-ctx.Done(): + return false + case <-time.After(delay): + return true + } +} + func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, platform string, streamStarted bool) { statusCode := failoverErr.StatusCode responseBody := failoverErr.ResponseBody diff --git a/backend/internal/handler/gemini_v1beta_handler.go b/backend/internal/handler/gemini_v1beta_handler.go index d5149f22..2b67cb1f 100644 --- a/backend/internal/handler/gemini_v1beta_handler.go +++ b/backend/internal/handler/gemini_v1beta_handler.go @@ -334,6 +334,19 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error()) return } + // Antigravity 单账号退避重试:分组内没有其他可用账号时, + // 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。 + // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 + if lastFailoverErr != nil && lastFailoverErr.StatusCode == http.StatusServiceUnavailable && switchCount <= maxAccountSwitches { + if sleepAntigravitySingleAccountBackoff(c.Request.Context(), switchCount) { + log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", switchCount, maxAccountSwitches) + failedAccountIDs = make(map[int64]struct{}) + // 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换 + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + continue + } + } h.handleGeminiFailoverExhausted(c, lastFailoverErr) return } diff --git a/backend/internal/pkg/ctxkey/ctxkey.go b/backend/internal/pkg/ctxkey/ctxkey.go index 9bf563e7..0c4d82f7 100644 --- a/backend/internal/pkg/ctxkey/ctxkey.go +++ b/backend/internal/pkg/ctxkey/ctxkey.go @@ -28,4 +28,8 @@ const ( // IsMaxTokensOneHaikuRequest 标识当前请求是否为 max_tokens=1 + haiku 模型的探测请求 // 用于 ClaudeCodeOnly 验证绕过(绕过 system prompt 检查,但仍需验证 User-Agent) IsMaxTokensOneHaikuRequest Key = "ctx_is_max_tokens_one_haiku" + + // SingleAccountRetry 标识当前请求处于单账号 503 退避重试模式。 + // 在此模式下,Service 层的模型限流预检查将等待限流过期而非直接切换账号。 + SingleAccountRetry Key = "ctx_single_account_retry" ) diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 014b3c86..11f975fe 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -20,6 +20,7 @@ import ( "time" "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" + "github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey" "github.com/gin-gonic/gin" "github.com/google/uuid" ) @@ -46,6 +47,23 @@ const ( googleRPCTypeErrorInfo = "type.googleapis.com/google.rpc.ErrorInfo" googleRPCReasonModelCapacityExhausted = "MODEL_CAPACITY_EXHAUSTED" googleRPCReasonRateLimitExceeded = "RATE_LIMIT_EXCEEDED" + + // 单账号 503 退避重试:预检查中等待模型限流过期的最大时间 + // 超过此值的限流将直接切换账号(避免请求等待过久) + antigravitySingleAccountMaxWait = 30 * time.Second + + // 单账号 503 退避重试:Service 层原地重试的最大次数 + // 在 handleSmartRetry 中,对于 shouldRateLimitModel(长延迟 ≥ 7s)的情况, + // 多账号模式下会设限流+切换账号;但单账号模式下改为原地等待+重试。 + antigravitySingleAccountSmartRetryMaxAttempts = 3 + + // 单账号 503 退避重试:原地重试时单次最大等待时间 + // 防止上游返回过长的 retryDelay 导致请求卡住太久 + antigravitySingleAccountSmartRetryMaxWait = 15 * time.Second + + // 单账号 503 退避重试:原地重试的总累计等待时间上限 + // 超过此上限将不再重试,直接返回 503 + antigravitySingleAccountSmartRetryTotalMaxWait = 30 * time.Second ) // antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写) @@ -148,6 +166,13 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam // 情况1: retryDelay >= 阈值,限流模型并切换账号 if shouldRateLimitModel { + // 单账号 503 退避重试模式:不设限流、不切换账号,改为原地等待+重试 + // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 + // 多账号场景下切换账号是最优选择,但单账号场景下设限流毫无意义(只会导致双重等待)。 + if resp.StatusCode == http.StatusServiceUnavailable && isSingleAccountRetry(p.ctx) { + return s.handleSingleAccountRetryInPlace(p, resp, respBody, baseURL, waitDuration, modelName) + } + rateLimitDuration := waitDuration if rateLimitDuration <= 0 { rateLimitDuration = antigravityDefaultRateLimitDuration @@ -236,7 +261,7 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam } } - // 所有重试都失败,限流当前模型并切换账号 + // 所有重试都失败 rateLimitDuration := waitDuration if rateLimitDuration <= 0 { rateLimitDuration = antigravityDefaultRateLimitDuration @@ -245,6 +270,22 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam if retryBody == nil { retryBody = respBody } + + // 单账号 503 退避重试模式:智能重试耗尽后不设限流、不切换账号, + // 直接返回 503 让 Handler 层的单账号退避循环做最终处理。 + if resp.StatusCode == http.StatusServiceUnavailable && isSingleAccountRetry(p.ctx) { + log.Printf("%s status=%d smart_retry_exhausted_single_account attempts=%d model=%s account=%d body=%s (return 503 directly)", + p.prefix, resp.StatusCode, antigravitySmartRetryMaxAttempts, modelName, p.account.ID, truncateForLog(retryBody, 200)) + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + resp: &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: io.NopCloser(bytes.NewReader(retryBody)), + }, + } + } + log.Printf("%s status=%d smart_retry_exhausted attempts=%d model=%s account=%d upstream_retry_delay=%v body=%s (switch account)", p.prefix, resp.StatusCode, antigravitySmartRetryMaxAttempts, modelName, p.account.ID, rateLimitDuration, truncateForLog(retryBody, 200)) @@ -279,17 +320,152 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam return &smartRetryResult{action: smartRetryActionContinue} } +// handleSingleAccountRetryInPlace 单账号 503 退避重试的原地重试逻辑。 +// +// 在多账号场景下,收到 503 + 长 retryDelay(≥ 7s)时会设置模型限流 + 切换账号; +// 但在单账号场景下,设限流毫无意义(因为切换回来的还是同一个账号,还要等限流过期)。 +// 此方法改为在 Service 层原地等待 + 重试,避免双重等待问题: +// +// 旧流程:Service 设限流 → Handler 退避等待 → Service 等限流过期 → 再请求(总耗时 = 退避 + 限流) +// 新流程:Service 直接等 retryDelay → 重试 → 成功/再等 → 重试...(总耗时 ≈ 实际 retryDelay × 重试次数) +// +// 约束: +// - 单次等待不超过 antigravitySingleAccountSmartRetryMaxWait +// - 总累计等待不超过 antigravitySingleAccountSmartRetryTotalMaxWait +// - 最多重试 antigravitySingleAccountSmartRetryMaxAttempts 次 +func (s *AntigravityGatewayService) handleSingleAccountRetryInPlace( + p antigravityRetryLoopParams, + resp *http.Response, + respBody []byte, + baseURL string, + waitDuration time.Duration, + modelName string, +) *smartRetryResult { + // 限制单次等待时间 + if waitDuration > antigravitySingleAccountSmartRetryMaxWait { + waitDuration = antigravitySingleAccountSmartRetryMaxWait + } + if waitDuration < antigravitySmartRetryMinWait { + waitDuration = antigravitySmartRetryMinWait + } + + log.Printf("%s status=%d single_account_503_retry_in_place model=%s account=%d upstream_retry_delay=%v (retrying in-place instead of rate-limiting)", + p.prefix, resp.StatusCode, modelName, p.account.ID, waitDuration) + + var lastRetryResp *http.Response + var lastRetryBody []byte + totalWaited := time.Duration(0) + + for attempt := 1; attempt <= antigravitySingleAccountSmartRetryMaxAttempts; attempt++ { + // 检查累计等待是否超限 + if totalWaited+waitDuration > antigravitySingleAccountSmartRetryTotalMaxWait { + remaining := antigravitySingleAccountSmartRetryTotalMaxWait - totalWaited + if remaining <= 0 { + log.Printf("%s single_account_503_retry: total_wait_exceeded total=%v max=%v, giving up", + p.prefix, totalWaited, antigravitySingleAccountSmartRetryTotalMaxWait) + break + } + waitDuration = remaining + } + + log.Printf("%s status=%d single_account_503_retry attempt=%d/%d delay=%v total_waited=%v model=%s account=%d", + p.prefix, resp.StatusCode, attempt, antigravitySingleAccountSmartRetryMaxAttempts, waitDuration, totalWaited, modelName, p.account.ID) + + select { + case <-p.ctx.Done(): + log.Printf("%s status=context_canceled_during_single_account_retry", p.prefix) + return &smartRetryResult{action: smartRetryActionBreakWithResp, err: p.ctx.Err()} + case <-time.After(waitDuration): + } + totalWaited += waitDuration + + // 创建新请求 + retryReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body) + if err != nil { + log.Printf("%s single_account_503_retry: request_build_failed error=%v", p.prefix, err) + break + } + + retryResp, retryErr := p.httpUpstream.Do(retryReq, p.proxyURL, p.account.ID, p.account.Concurrency) + if retryErr == nil && retryResp != nil && retryResp.StatusCode != http.StatusTooManyRequests && retryResp.StatusCode != http.StatusServiceUnavailable { + log.Printf("%s status=%d single_account_503_retry_success attempt=%d/%d total_waited=%v", + p.prefix, retryResp.StatusCode, attempt, antigravitySingleAccountSmartRetryMaxAttempts, totalWaited) + // 关闭之前的响应 + if lastRetryResp != nil { + _ = lastRetryResp.Body.Close() + } + return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp} + } + + // 网络错误时继续重试 + if retryErr != nil || retryResp == nil { + log.Printf("%s single_account_503_retry: network_error attempt=%d/%d error=%v", + p.prefix, attempt, antigravitySingleAccountSmartRetryMaxAttempts, retryErr) + continue + } + + // 关闭之前的响应 + if lastRetryResp != nil { + _ = lastRetryResp.Body.Close() + } + lastRetryResp = retryResp + lastRetryBody, _ = io.ReadAll(io.LimitReader(retryResp.Body, 2<<20)) + _ = retryResp.Body.Close() + + // 解析新的重试信息,更新下次等待时间 + if attempt < antigravitySingleAccountSmartRetryMaxAttempts && lastRetryBody != nil { + _, _, newWaitDuration, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody) + if newWaitDuration > 0 { + waitDuration = newWaitDuration + if waitDuration > antigravitySingleAccountSmartRetryMaxWait { + waitDuration = antigravitySingleAccountSmartRetryMaxWait + } + if waitDuration < antigravitySmartRetryMinWait { + waitDuration = antigravitySmartRetryMinWait + } + } + } + } + + // 所有重试都失败,不设限流,直接返回 503 + // Handler 层的单账号退避循环会做最终处理 + retryBody := lastRetryBody + if retryBody == nil { + retryBody = respBody + } + log.Printf("%s status=%d single_account_503_retry_exhausted attempts=%d total_waited=%v model=%s account=%d body=%s (return 503 directly)", + p.prefix, resp.StatusCode, antigravitySingleAccountSmartRetryMaxAttempts, totalWaited, modelName, p.account.ID, truncateForLog(retryBody, 200)) + + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + resp: &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: io.NopCloser(bytes.NewReader(retryBody)), + }, + } +} + // antigravityRetryLoop 执行带 URL fallback 的重试循环 func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) { // 预检查:如果账号已限流,直接返回切换信号 if p.requestedModel != "" { if remaining := p.account.GetRateLimitRemainingTimeWithContext(p.ctx, p.requestedModel); remaining > 0 { - log.Printf("%s pre_check: rate_limit_switch remaining=%v model=%s account=%d", - p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) - return nil, &AntigravityAccountSwitchError{ - OriginalAccountID: p.account.ID, - RateLimitedModel: p.requestedModel, - IsStickySession: p.isStickySession, + // 单账号 503 退避重试模式:跳过限流预检查,直接发请求。 + // 首次请求设的限流是为了多账号调度器跳过该账号,在单账号模式下无意义。 + // 如果上游确实还不可用,handleSmartRetry → handleSingleAccountRetryInPlace + // 会在 Service 层原地等待+重试,不需要在预检查这里等。 + if isSingleAccountRetry(p.ctx) { + log.Printf("%s pre_check: single_account_retry skipping rate_limit remaining=%v model=%s account=%d (will retry in-place if 503)", + p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) + } else { + log.Printf("%s pre_check: rate_limit_switch remaining=%v model=%s account=%d", + p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) + return nil, &AntigravityAccountSwitchError{ + OriginalAccountID: p.account.ID, + RateLimitedModel: p.requestedModel, + IsStickySession: p.isStickySession, + } } } } @@ -1943,6 +2119,12 @@ func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool { } } +// isSingleAccountRetry 检查 context 中是否设置了单账号退避重试标记 +func isSingleAccountRetry(ctx context.Context) bool { + v, _ := ctx.Value(ctxkey.SingleAccountRetry).(bool) + return v +} + // setModelRateLimitByModelName 使用官方模型 ID 设置模型级限流 // 直接使用上游返回的模型 ID(如 claude-sonnet-4-5)作为限流 key // 返回是否已成功设置(若模型名为空或 repo 为 nil 将返回 false)