refactor(antigravity): 提取公共重试循环函数减少重复代码
- 新增 antigravityRetryLoop 函数统一处理 Forward 和 ForwardGemini 的重试逻辑 - 429 日志增加 base_url 字段便于调试 - 删除重复的 shouldRetryUpstreamError 方法
This commit is contained in:
@@ -28,6 +28,135 @@ const (
|
|||||||
antigravityRetryMaxDelay = 16 * time.Second
|
antigravityRetryMaxDelay = 16 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// antigravityRetryLoopParams 重试循环的参数
|
||||||
|
type antigravityRetryLoopParams struct {
|
||||||
|
ctx context.Context
|
||||||
|
prefix string
|
||||||
|
account *Account
|
||||||
|
proxyURL string
|
||||||
|
accessToken string
|
||||||
|
action string
|
||||||
|
body []byte
|
||||||
|
quotaScope AntigravityQuotaScope
|
||||||
|
httpUpstream HTTPUpstream
|
||||||
|
accountRepo AccountRepository
|
||||||
|
handleError func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, quotaScope AntigravityQuotaScope)
|
||||||
|
}
|
||||||
|
|
||||||
|
// antigravityRetryLoopResult 重试循环的结果
|
||||||
|
type antigravityRetryLoopResult struct {
|
||||||
|
resp *http.Response
|
||||||
|
usedBaseURL string
|
||||||
|
}
|
||||||
|
|
||||||
|
// antigravityRetryLoop 执行带 URL fallback 的重试循环
|
||||||
|
func antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) {
|
||||||
|
availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs()
|
||||||
|
if len(availableURLs) == 0 {
|
||||||
|
availableURLs = antigravity.BaseURLs
|
||||||
|
}
|
||||||
|
|
||||||
|
var resp *http.Response
|
||||||
|
var usedBaseURL string
|
||||||
|
|
||||||
|
urlFallbackLoop:
|
||||||
|
for urlIdx, baseURL := range availableURLs {
|
||||||
|
usedBaseURL = baseURL
|
||||||
|
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
|
||||||
|
select {
|
||||||
|
case <-p.ctx.Done():
|
||||||
|
log.Printf("%s status=context_canceled error=%v", p.prefix, p.ctx.Err())
|
||||||
|
return nil, p.ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
upstreamReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err = p.httpUpstream.Do(upstreamReq, p.proxyURL, p.account.ID, p.account.Concurrency)
|
||||||
|
if err != nil {
|
||||||
|
if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
|
||||||
|
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
|
||||||
|
log.Printf("%s URL fallback (connection error): %s -> %s", p.prefix, baseURL, availableURLs[urlIdx+1])
|
||||||
|
continue urlFallbackLoop
|
||||||
|
}
|
||||||
|
if attempt < antigravityMaxRetries {
|
||||||
|
log.Printf("%s status=request_failed retry=%d/%d error=%v", p.prefix, attempt, antigravityMaxRetries, err)
|
||||||
|
if !sleepAntigravityBackoffWithContext(p.ctx, attempt) {
|
||||||
|
log.Printf("%s status=context_canceled_during_backoff", p.prefix)
|
||||||
|
return nil, p.ctx.Err()
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Printf("%s status=request_failed retries_exhausted error=%v", p.prefix, err)
|
||||||
|
return nil, fmt.Errorf("upstream request failed after retries: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 429 限流:优先切换 URL,所有 URL 都 429 时才返回
|
||||||
|
if resp.StatusCode == http.StatusTooManyRequests {
|
||||||
|
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
|
||||||
|
if urlIdx < len(availableURLs)-1 {
|
||||||
|
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
|
||||||
|
log.Printf("%s URL fallback (429): %s -> %s", p.prefix, baseURL, availableURLs[urlIdx+1])
|
||||||
|
continue urlFallbackLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
p.handleError(p.ctx, p.prefix, p.account, resp.StatusCode, resp.Header, respBody, p.quotaScope)
|
||||||
|
log.Printf("%s status=429 rate_limited base_url=%s body=%s", p.prefix, baseURL, truncateForLog(respBody, 200))
|
||||||
|
resp = &http.Response{
|
||||||
|
StatusCode: resp.StatusCode,
|
||||||
|
Header: resp.Header.Clone(),
|
||||||
|
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||||
|
}
|
||||||
|
break urlFallbackLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
// 其他可重试错误
|
||||||
|
if resp.StatusCode >= 400 && shouldRetryAntigravityError(resp.StatusCode) {
|
||||||
|
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
|
||||||
|
if attempt < antigravityMaxRetries {
|
||||||
|
log.Printf("%s status=%d retry=%d/%d body=%s", p.prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500))
|
||||||
|
if !sleepAntigravityBackoffWithContext(p.ctx, attempt) {
|
||||||
|
log.Printf("%s status=context_canceled_during_backoff", p.prefix)
|
||||||
|
return nil, p.ctx.Err()
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resp = &http.Response{
|
||||||
|
StatusCode: resp.StatusCode,
|
||||||
|
Header: resp.Header.Clone(),
|
||||||
|
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||||
|
}
|
||||||
|
break urlFallbackLoop
|
||||||
|
}
|
||||||
|
|
||||||
|
break urlFallbackLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp != nil && resp.StatusCode < 400 && usedBaseURL != "" {
|
||||||
|
antigravity.DefaultURLAvailability.MarkSuccess(usedBaseURL)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &antigravityRetryLoopResult{resp: resp, usedBaseURL: usedBaseURL}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// shouldRetryAntigravityError 判断是否应该重试
|
||||||
|
func shouldRetryAntigravityError(statusCode int) bool {
|
||||||
|
switch statusCode {
|
||||||
|
case 429, 500, 502, 503, 504, 529:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// isAntigravityConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝)
|
// isAntigravityConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝)
|
||||||
func isAntigravityConnectionError(err error) bool {
|
func isAntigravityConnectionError(err error) bool {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@@ -545,106 +674,26 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
|||||||
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回
|
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回
|
||||||
action := "streamGenerateContent"
|
action := "streamGenerateContent"
|
||||||
|
|
||||||
// URL fallback 循环
|
// 执行带重试的请求
|
||||||
availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs()
|
result, err := antigravityRetryLoop(antigravityRetryLoopParams{
|
||||||
if len(availableURLs) == 0 {
|
ctx: ctx,
|
||||||
availableURLs = antigravity.BaseURLs // 所有 URL 都不可用时,重试所有
|
prefix: prefix,
|
||||||
}
|
account: account,
|
||||||
|
proxyURL: proxyURL,
|
||||||
// 重试循环
|
accessToken: accessToken,
|
||||||
var resp *http.Response
|
action: action,
|
||||||
var usedBaseURL string // 追踪成功使用的 URL
|
body: geminiBody,
|
||||||
urlFallbackLoop:
|
quotaScope: quotaScope,
|
||||||
for urlIdx, baseURL := range availableURLs {
|
httpUpstream: s.httpUpstream,
|
||||||
usedBaseURL = baseURL
|
accountRepo: s.accountRepo,
|
||||||
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
|
handleError: s.handleUpstreamError,
|
||||||
// 检查 context 是否已取消(客户端断开连接)
|
})
|
||||||
select {
|
if err != nil {
|
||||||
case <-ctx.Done():
|
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries")
|
||||||
log.Printf("%s status=context_canceled error=%v", prefix, ctx.Err())
|
|
||||||
return nil, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
upstreamReq, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, action, accessToken, geminiBody)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
|
||||||
if err != nil {
|
|
||||||
// 检查是否应触发 URL 降级
|
|
||||||
if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
|
|
||||||
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
|
|
||||||
log.Printf("%s URL fallback (connection error): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1])
|
|
||||||
continue urlFallbackLoop
|
|
||||||
}
|
|
||||||
if attempt < antigravityMaxRetries {
|
|
||||||
log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
|
|
||||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
|
||||||
log.Printf("%s status=context_canceled_during_backoff", prefix)
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
|
|
||||||
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries")
|
|
||||||
}
|
|
||||||
|
|
||||||
// 429 限流:优先切换 URL,所有 URL 都 429 时才返回
|
|
||||||
if resp.StatusCode == http.StatusTooManyRequests {
|
|
||||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
|
||||||
_ = resp.Body.Close()
|
|
||||||
|
|
||||||
// 还有其他 URL,切换重试
|
|
||||||
if urlIdx < len(availableURLs)-1 {
|
|
||||||
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
|
|
||||||
log.Printf("%s URL fallback (429): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1])
|
|
||||||
continue urlFallbackLoop
|
|
||||||
}
|
|
||||||
|
|
||||||
// 所有 URL 都 429,限流账户并返回
|
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
|
||||||
log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200))
|
|
||||||
resp = &http.Response{
|
|
||||||
StatusCode: resp.StatusCode,
|
|
||||||
Header: resp.Header.Clone(),
|
|
||||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
|
||||||
}
|
|
||||||
break urlFallbackLoop
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(resp.StatusCode) {
|
|
||||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
|
||||||
_ = resp.Body.Close()
|
|
||||||
|
|
||||||
if attempt < antigravityMaxRetries {
|
|
||||||
log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500))
|
|
||||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
|
||||||
log.Printf("%s status=context_canceled_during_backoff", prefix)
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// 最后一次尝试也失败
|
|
||||||
resp = &http.Response{
|
|
||||||
StatusCode: resp.StatusCode,
|
|
||||||
Header: resp.Header.Clone(),
|
|
||||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
|
||||||
}
|
|
||||||
break urlFallbackLoop
|
|
||||||
}
|
|
||||||
|
|
||||||
break urlFallbackLoop
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
resp := result.resp
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
// 请求成功,标记 URL 供后续优先使用
|
|
||||||
if resp.StatusCode < 400 && usedBaseURL != "" {
|
|
||||||
antigravity.DefaultURLAvailability.MarkSuccess(usedBaseURL)
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode >= 400 {
|
if resp.StatusCode >= 400 {
|
||||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||||
|
|
||||||
@@ -1106,109 +1155,30 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
|||||||
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后返回
|
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后返回
|
||||||
upstreamAction := "streamGenerateContent"
|
upstreamAction := "streamGenerateContent"
|
||||||
|
|
||||||
// URL fallback 循环
|
// 执行带重试的请求
|
||||||
availableURLs := antigravity.DefaultURLAvailability.GetAvailableURLs()
|
result, err := antigravityRetryLoop(antigravityRetryLoopParams{
|
||||||
if len(availableURLs) == 0 {
|
ctx: ctx,
|
||||||
availableURLs = antigravity.BaseURLs // 所有 URL 都不可用时,重试所有
|
prefix: prefix,
|
||||||
}
|
account: account,
|
||||||
|
proxyURL: proxyURL,
|
||||||
// 重试循环
|
accessToken: accessToken,
|
||||||
var resp *http.Response
|
action: upstreamAction,
|
||||||
var usedBaseURL string // 追踪成功使用的 URL
|
body: wrappedBody,
|
||||||
urlFallbackLoop:
|
quotaScope: quotaScope,
|
||||||
for urlIdx, baseURL := range availableURLs {
|
httpUpstream: s.httpUpstream,
|
||||||
usedBaseURL = baseURL
|
accountRepo: s.accountRepo,
|
||||||
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
|
handleError: s.handleUpstreamError,
|
||||||
// 检查 context 是否已取消(客户端断开连接)
|
})
|
||||||
select {
|
if err != nil {
|
||||||
case <-ctx.Done():
|
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
|
||||||
log.Printf("%s status=context_canceled error=%v", prefix, ctx.Err())
|
|
||||||
return nil, ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
upstreamReq, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, upstreamAction, accessToken, wrappedBody)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
|
||||||
if err != nil {
|
|
||||||
// 检查是否应触发 URL 降级
|
|
||||||
if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
|
|
||||||
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
|
|
||||||
log.Printf("%s URL fallback (connection error): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1])
|
|
||||||
continue urlFallbackLoop
|
|
||||||
}
|
|
||||||
if attempt < antigravityMaxRetries {
|
|
||||||
log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
|
|
||||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
|
||||||
log.Printf("%s status=context_canceled_during_backoff", prefix)
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
|
|
||||||
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
|
|
||||||
}
|
|
||||||
|
|
||||||
// 429 限流:优先切换 URL,所有 URL 都 429 时才返回
|
|
||||||
if resp.StatusCode == http.StatusTooManyRequests {
|
|
||||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
|
||||||
_ = resp.Body.Close()
|
|
||||||
|
|
||||||
// 还有其他 URL,切换重试
|
|
||||||
if urlIdx < len(availableURLs)-1 {
|
|
||||||
antigravity.DefaultURLAvailability.MarkUnavailable(baseURL)
|
|
||||||
log.Printf("%s URL fallback (429): %s -> %s", prefix, baseURL, availableURLs[urlIdx+1])
|
|
||||||
continue urlFallbackLoop
|
|
||||||
}
|
|
||||||
|
|
||||||
// 所有 URL 都 429,限流账户并返回
|
|
||||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope)
|
|
||||||
log.Printf("%s status=429 rate_limited body=%s", prefix, truncateForLog(respBody, 200))
|
|
||||||
resp = &http.Response{
|
|
||||||
StatusCode: resp.StatusCode,
|
|
||||||
Header: resp.Header.Clone(),
|
|
||||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
|
||||||
}
|
|
||||||
break urlFallbackLoop
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(resp.StatusCode) {
|
|
||||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
|
||||||
_ = resp.Body.Close()
|
|
||||||
|
|
||||||
if attempt < antigravityMaxRetries {
|
|
||||||
log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries)
|
|
||||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
|
||||||
log.Printf("%s status=context_canceled_during_backoff", prefix)
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
resp = &http.Response{
|
|
||||||
StatusCode: resp.StatusCode,
|
|
||||||
Header: resp.Header.Clone(),
|
|
||||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
|
||||||
}
|
|
||||||
break urlFallbackLoop
|
|
||||||
}
|
|
||||||
|
|
||||||
break urlFallbackLoop
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
resp := result.resp
|
||||||
defer func() {
|
defer func() {
|
||||||
if resp != nil && resp.Body != nil {
|
if resp != nil && resp.Body != nil {
|
||||||
_ = resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// 请求成功,标记 URL 供后续优先使用
|
|
||||||
if resp.StatusCode < 400 && usedBaseURL != "" {
|
|
||||||
antigravity.DefaultURLAvailability.MarkSuccess(usedBaseURL)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理错误响应
|
// 处理错误响应
|
||||||
if resp.StatusCode >= 400 {
|
if resp.StatusCode >= 400 {
|
||||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||||
@@ -1317,15 +1287,6 @@ handleSuccess:
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *AntigravityGatewayService) shouldRetryUpstreamError(statusCode int) bool {
|
|
||||||
switch statusCode {
|
|
||||||
case 429, 500, 502, 503, 504, 529:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int) bool {
|
func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int) bool {
|
||||||
switch statusCode {
|
switch statusCode {
|
||||||
case 401, 403, 429, 529:
|
case 401, 403, 429, 529:
|
||||||
|
|||||||
Reference in New Issue
Block a user