diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 45381d37..7e89c97d 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -28,6 +28,135 @@ const ( antigravityRetryMaxDelay = 16 * time.Second ) +// antigravityRetryLoopParams 重试循环的参数 +type antigravityRetryLoopParams struct { + ctx context.Context + prefix string + account *Account + proxyURL string + accessToken string + action string + body []byte + quotaScope AntigravityQuotaScope + httpUpstream HTTPUpstream + accountRepo AccountRepository + handleError func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope) +} + +// antigravityRetryLoopResult 重试循环的结果 +type antigravityRetryLoopResult struct { + resp *http.Response + usedBaseURL string +} + +// antigravityRetryLoop 执行带 URL fallback 的重试循环 +func antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) { + availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs() + if len(availableURLs) == 0 { + availableURLs = antigravity.BaseURLs + } + + var resp *http.Response + var usedBaseURL string + +urlFallbackLoop: + for urlIdx, baseURL := range availableURLs { + usedBaseURL = baseURL + for attempt := 1; attempt <= antigravityMaxRetries; attempt++ { + select { + case <-p.ctx.Done(): + log.Printf("%s status=context_canceled error=%v", p.prefix, p.ctx.Err()) + return nil, p.ctx.Err() + default: + } + + upstreamReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body) + if err != nil { + return nil, err + } + + resp, err = p.httpUpstream.Do(upstreamReq, p.proxyURL, p.account.ID, p.account.Concurrency) + if err != nil { + if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 { + antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) + log.Printf("%s URL fallback (connection error): %s -> %s", p.prefix, baseURL, availableURLs[urlIdx+1]) + continue urlFallbackLoop + } + if attempt < antigravityMaxRetries { + log.Printf("%s status=request_failed retry=%d/%d error=%v", p.prefix, attempt, antigravityMaxRetries, err) + if !sleepAntigravityBackoffWithContext(p.ctx, attempt) { + log.Printf("%s status=context_canceled_during_backoff", p.prefix) + return nil, p.ctx.Err() + } + continue + } + log.Printf("%s status=request_failed retries_exhausted error=%v", p.prefix, err) + return nil, fmt.Errorf("upstream request failed after retries: %w", err) + } + + // 429 限流:优先切换 URL,所有 URL 都 429 时才返回 + if resp.StatusCode == http.StatusTooManyRequests { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + + if urlIdx < len(availableURLs)-1 { + antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) + log.Printf("%s URL fallback (429): %s -> %s", p.prefix, baseURL, availableURLs[urlIdx+1]) + continue urlFallbackLoop + } + + p.handleError(p.ctx, p.prefix, p.account, resp.StatusCode, resp.Header, respBody, p.quotaScope) + log.Printf("%s status=429 rate_limited base_url=%s body=%s", p.prefix, baseURL, truncateForLog(respBody, 200)) + resp = &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: io.NopCloser(bytes.NewReader(respBody)), + } + break urlFallbackLoop + } + + // 其他可重试错误 + if resp.StatusCode >= 400 && shouldRetryAntigravityError(resp.StatusCode) { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + + if attempt < antigravityMaxRetries { + log.Printf("%s status=%d retry=%d/%d body=%s", p.prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500)) + if !sleepAntigravityBackoffWithContext(p.ctx, attempt) { + log.Printf("%s status=context_canceled_during_backoff", p.prefix) + return nil, p.ctx.Err() + } + continue + } + resp = &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: io.NopCloser(bytes.NewReader(respBody)), + } + break urlFallbackLoop + } + + break urlFallbackLoop + } + } + + if resp != nil && resp.StatusCode < 400 && usedBaseURL != "" { + antigravity.DefaultURLAvailability.MarkSuccess(usedBaseURL) + } + + return &antigravityRetryLoopResult{resp: resp, usedBaseURL: usedBaseURL}, nil +} + +// shouldRetryAntigravityError 判断是否应该重试 +func shouldRetryAntigravityError(statusCode int) bool { + switch statusCode { + case 429, 500, 502, 503, 504, 529: + return true + default: + return false + } +} + // isAntigravityConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝) func isAntigravityConnectionError(err error) bool { if err == nil { @@ -545,106 +674,26 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, // 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回 action := "streamGenerateContent" - // URL fallback 循环 - availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs() - if len(availableURLs) == 0 { - availableURLs = antigravity.BaseURLs // 所有 URL 都不可用时,重试所有 - } - - // 重试循环 - var resp *http.Response - var usedBaseURL string // 追踪成功使用的 URL -urlFallbackLoop: - for urlIdx, baseURL := range availableURLs { - usedBaseURL = baseURL - for attempt := 1; attempt <= antigravityMaxRetries; attempt++ { - // 检查 context 是否已取消(客户端断开连接) - select { - case <-ctx.Done(): - log.Printf("%s status=context_canceled error=%v", prefix, ctx.Err()) - return nil, ctx.Err() - default: - } - - upstreamReq, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, action, accessToken, geminiBody) - if err != nil { - return nil, err - } - - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) - if err != nil { - // 检查是否应触发 URL 降级 - if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 { - antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) - log.Printf("%s URL fallback (connection error): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1]) - continue urlFallbackLoop - } - if attempt < antigravityMaxRetries { - log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err) - if !sleepAntigravityBackoffWithContext(ctx, attempt) { - log.Printf("%s status=context_canceled_during_backoff", prefix) - return nil, ctx.Err() - } - continue - } - log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err) - return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries") - } - - // 429 限流:优先切换 URL,所有 URL 都 429 时才返回 - if resp.StatusCode == http.StatusTooManyRequests { - respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) - _ = resp.Body.Close() - - // 还有其他 URL,切换重试 - if urlIdx < len(availableURLs)-1 { - antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) - log.Printf("%s URL fallback (429): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1]) - continue urlFallbackLoop - } - - // 所有 URL 都 429,限流账户并返回 - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) - log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200)) - resp = &http.Response{ - StatusCode: resp.StatusCode, - Header: resp.Header.Clone(), - Body: io.NopCloser(bytes.NewReader(respBody)), - } - break urlFallbackLoop - } - - if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(resp.StatusCode) { - respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) - _ = resp.Body.Close() - - if attempt < antigravityMaxRetries { - log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500)) - if !sleepAntigravityBackoffWithContext(ctx, attempt) { - log.Printf("%s status=context_canceled_during_backoff", prefix) - return nil, ctx.Err() - } - continue - } - // 最后一次尝试也失败 - resp = &http.Response{ - StatusCode: resp.StatusCode, - Header: resp.Header.Clone(), - Body: io.NopCloser(bytes.NewReader(respBody)), - } - break urlFallbackLoop - } - - break urlFallbackLoop - } + // 执行带重试的请求 + result, err := antigravityRetryLoop(antigravityRetryLoopParams{ + ctx: ctx, + prefix: prefix, + account: account, + proxyURL: proxyURL, + accessToken: accessToken, + action: action, + body: geminiBody, + quotaScope: quotaScope, + httpUpstream: s.httpUpstream, + accountRepo: s.accountRepo, + handleError: s.handleUpstreamError, + }) + if err != nil { + return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries") } + resp := result.resp defer func() { _ = resp.Body.Close() }() - // 请求成功,标记 URL 供后续优先使用 - if resp.StatusCode < 400 && usedBaseURL != "" { - antigravity.DefaultURLAvailability.MarkSuccess(usedBaseURL) - } - if resp.StatusCode >= 400 { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) @@ -1106,109 +1155,30 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co // 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后返回 upstreamAction := "streamGenerateContent" - // URL fallback 循环 - availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs() - if len(availableURLs) == 0 { - availableURLs = antigravity.BaseURLs // 所有 URL 都不可用时,重试所有 - } - - // 重试循环 - var resp *http.Response - var usedBaseURL string // 追踪成功使用的 URL -urlFallbackLoop: - for urlIdx, baseURL := range availableURLs { - usedBaseURL = baseURL - for attempt := 1; attempt <= antigravityMaxRetries; attempt++ { - // 检查 context 是否已取消(客户端断开连接) - select { - case <-ctx.Done(): - log.Printf("%s status=context_canceled error=%v", prefix, ctx.Err()) - return nil, ctx.Err() - default: - } - - upstreamReq, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, upstreamAction, accessToken, wrappedBody) - if err != nil { - return nil, err - } - - resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) - if err != nil { - // 检查是否应触发 URL 降级 - if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 { - antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) - log.Printf("%s URL fallback (connection error): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1]) - continue urlFallbackLoop - } - if attempt < antigravityMaxRetries { - log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err) - if !sleepAntigravityBackoffWithContext(ctx, attempt) { - log.Printf("%s status=context_canceled_during_backoff", prefix) - return nil, ctx.Err() - } - continue - } - log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err) - return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries") - } - - // 429 限流:优先切换 URL,所有 URL 都 429 时才返回 - if resp.StatusCode == http.StatusTooManyRequests { - respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) - _ = resp.Body.Close() - - // 还有其他 URL,切换重试 - if urlIdx < len(availableURLs)-1 { - antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) - log.Printf("%s URL fallback (429): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1]) - continue urlFallbackLoop - } - - // 所有 URL 都 429,限流账户并返回 - s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) - log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200)) - resp = &http.Response{ - StatusCode: resp.StatusCode, - Header: resp.Header.Clone(), - Body: io.NopCloser(bytes.NewReader(respBody)), - } - break urlFallbackLoop - } - - if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(resp.StatusCode) { - respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) - _ = resp.Body.Close() - - if attempt < antigravityMaxRetries { - log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries) - if !sleepAntigravityBackoffWithContext(ctx, attempt) { - log.Printf("%s status=context_canceled_during_backoff", prefix) - return nil, ctx.Err() - } - continue - } - resp = &http.Response{ - StatusCode: resp.StatusCode, - Header: resp.Header.Clone(), - Body: io.NopCloser(bytes.NewReader(respBody)), - } - break urlFallbackLoop - } - - break urlFallbackLoop - } + // 执行带重试的请求 + result, err := antigravityRetryLoop(antigravityRetryLoopParams{ + ctx: ctx, + prefix: prefix, + account: account, + proxyURL: proxyURL, + accessToken: accessToken, + action: upstreamAction, + body: wrappedBody, + quotaScope: quotaScope, + httpUpstream: s.httpUpstream, + accountRepo: s.accountRepo, + handleError: s.handleUpstreamError, + }) + if err != nil { + return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries") } + resp := result.resp defer func() { if resp != nil && resp.Body != nil { _ = resp.Body.Close() } }() - // 请求成功,标记 URL 供后续优先使用 - if resp.StatusCode < 400 && usedBaseURL != "" { - antigravity.DefaultURLAvailability.MarkSuccess(usedBaseURL) - } - // 处理错误响应 if resp.StatusCode >= 400 { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) @@ -1317,15 +1287,6 @@ handleSuccess: }, nil } -func (s *AntigravityGatewayService) shouldRetryUpstreamError(statusCode int) bool { - switch statusCode { - case 429, 500, 502, 503, 504, 529: - return true - default: - return false - } -} - func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int) bool { switch statusCode { case 401, 403, 429, 529: