Merge branch 'main' into test-dev
This commit is contained in:
@@ -933,8 +933,16 @@ func (s *GatewayService) getOAuthToken(ctx context.Context, account *Account) (s
|
||||
|
||||
// 重试相关常量
|
||||
const (
|
||||
maxRetries = 10 // 最大重试次数
|
||||
retryDelay = 3 * time.Second // 重试等待时间
|
||||
// 最大尝试次数(包含首次请求)。过多重试会导致请求堆积与资源耗尽。
|
||||
maxRetryAttempts = 5
|
||||
|
||||
// 指数退避:第 N 次失败后的等待 = retryBaseDelay * 2^(N-1),并且上限为 retryMaxDelay。
|
||||
retryBaseDelay = 300 * time.Millisecond
|
||||
retryMaxDelay = 3 * time.Second
|
||||
|
||||
// 最大重试耗时(包含请求本身耗时 + 退避等待时间)。
|
||||
// 用于防止极端情况下 goroutine 长时间堆积导致资源耗尽。
|
||||
maxRetryElapsed = 10 * time.Second
|
||||
)
|
||||
|
||||
func (s *GatewayService) shouldRetryUpstreamError(account *Account, statusCode int) bool {
|
||||
@@ -957,6 +965,40 @@ func (s *GatewayService) shouldFailoverUpstreamError(statusCode int) bool {
|
||||
}
|
||||
}
|
||||
|
||||
func retryBackoffDelay(attempt int) time.Duration {
|
||||
// attempt 从 1 开始,表示第 attempt 次请求刚失败,需要等待后进行第 attempt+1 次请求。
|
||||
if attempt <= 0 {
|
||||
return retryBaseDelay
|
||||
}
|
||||
delay := retryBaseDelay * time.Duration(1<<(attempt-1))
|
||||
if delay > retryMaxDelay {
|
||||
return retryMaxDelay
|
||||
}
|
||||
return delay
|
||||
}
|
||||
|
||||
func sleepWithContext(ctx context.Context, d time.Duration) error {
|
||||
if d <= 0 {
|
||||
return nil
|
||||
}
|
||||
timer := time.NewTimer(d)
|
||||
defer func() {
|
||||
if !timer.Stop() {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-timer.C:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// isClaudeCodeClient 判断请求是否来自 Claude Code 客户端
|
||||
// 简化判断:User-Agent 匹配 + metadata.user_id 存在
|
||||
func isClaudeCodeClient(userAgent string, metadataUserID string) bool {
|
||||
@@ -1073,7 +1115,8 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
|
||||
// 重试循环
|
||||
var resp *http.Response
|
||||
for attempt := 1; attempt <= maxRetries; attempt++ {
|
||||
retryStart := time.Now()
|
||||
for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
|
||||
// 构建上游请求(每次重试需要重新构建,因为请求体需要重新读取)
|
||||
upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, tokenType, reqModel)
|
||||
if err != nil {
|
||||
@@ -1083,6 +1126,9 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
// 发送请求
|
||||
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
if resp != nil && resp.Body != nil {
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
return nil, fmt.Errorf("upstream request failed: %w", err)
|
||||
}
|
||||
|
||||
@@ -1093,28 +1139,80 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
_ = resp.Body.Close()
|
||||
|
||||
if s.isThinkingBlockSignatureError(respBody) {
|
||||
looksLikeToolSignatureError := func(msg string) bool {
|
||||
m := strings.ToLower(msg)
|
||||
return strings.Contains(m, "tool_use") ||
|
||||
strings.Contains(m, "tool_result") ||
|
||||
strings.Contains(m, "functioncall") ||
|
||||
strings.Contains(m, "function_call") ||
|
||||
strings.Contains(m, "functionresponse") ||
|
||||
strings.Contains(m, "function_response")
|
||||
}
|
||||
|
||||
// 避免在重试预算已耗尽时再发起额外请求
|
||||
if time.Since(retryStart) >= maxRetryElapsed {
|
||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||
break
|
||||
}
|
||||
log.Printf("Account %d: detected thinking block signature error, retrying with filtered thinking blocks", account.ID)
|
||||
|
||||
// 过滤thinking blocks并重试(使用更激进的过滤)
|
||||
// Conservative two-stage fallback:
|
||||
// 1) Disable thinking + thinking->text (preserve content)
|
||||
// 2) Only if upstream still errors AND error message points to tool/function signature issues:
|
||||
// also downgrade tool_use/tool_result blocks to text.
|
||||
|
||||
filteredBody := FilterThinkingBlocksForRetry(body)
|
||||
retryReq, buildErr := s.buildUpstreamRequest(ctx, c, account, filteredBody, token, tokenType, reqModel)
|
||||
if buildErr == nil {
|
||||
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
||||
if retryErr == nil {
|
||||
// 使用重试后的响应,继续后续处理
|
||||
if retryResp.StatusCode < 400 {
|
||||
log.Printf("Account %d: signature error retry succeeded", account.ID)
|
||||
} else {
|
||||
log.Printf("Account %d: signature error retry returned status %d", account.ID, retryResp.StatusCode)
|
||||
log.Printf("Account %d: signature error retry succeeded (thinking downgraded)", account.ID)
|
||||
resp = retryResp
|
||||
break
|
||||
}
|
||||
|
||||
retryRespBody, retryReadErr := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
|
||||
_ = retryResp.Body.Close()
|
||||
if retryReadErr == nil && retryResp.StatusCode == 400 && s.isThinkingBlockSignatureError(retryRespBody) {
|
||||
msg2 := extractUpstreamErrorMessage(retryRespBody)
|
||||
if looksLikeToolSignatureError(msg2) && time.Since(retryStart) < maxRetryElapsed {
|
||||
log.Printf("Account %d: signature retry still failing and looks tool-related, retrying with tool blocks downgraded", account.ID)
|
||||
filteredBody2 := FilterSignatureSensitiveBlocksForRetry(body)
|
||||
retryReq2, buildErr2 := s.buildUpstreamRequest(ctx, c, account, filteredBody2, token, tokenType, reqModel)
|
||||
if buildErr2 == nil {
|
||||
retryResp2, retryErr2 := s.httpUpstream.Do(retryReq2, proxyURL, account.ID, account.Concurrency)
|
||||
if retryErr2 == nil {
|
||||
resp = retryResp2
|
||||
break
|
||||
}
|
||||
if retryResp2 != nil && retryResp2.Body != nil {
|
||||
_ = retryResp2.Body.Close()
|
||||
}
|
||||
log.Printf("Account %d: tool-downgrade signature retry failed: %v", account.ID, retryErr2)
|
||||
} else {
|
||||
log.Printf("Account %d: tool-downgrade signature retry build failed: %v", account.ID, buildErr2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to the original retry response context.
|
||||
resp = &http.Response{
|
||||
StatusCode: retryResp.StatusCode,
|
||||
Header: retryResp.Header.Clone(),
|
||||
Body: io.NopCloser(bytes.NewReader(retryRespBody)),
|
||||
}
|
||||
resp = retryResp
|
||||
break
|
||||
}
|
||||
if retryResp != nil && retryResp.Body != nil {
|
||||
_ = retryResp.Body.Close()
|
||||
}
|
||||
log.Printf("Account %d: signature error retry failed: %v", account.ID, retryErr)
|
||||
} else {
|
||||
log.Printf("Account %d: signature error retry build request failed: %v", account.ID, buildErr)
|
||||
}
|
||||
// 重试失败,恢复原始响应体继续处理
|
||||
|
||||
// Retry failed: restore original response body and continue handling.
|
||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||
break
|
||||
}
|
||||
@@ -1125,11 +1223,27 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
|
||||
// 检查是否需要通用重试(排除400,因为400已经在上面特殊处理过了)
|
||||
if resp.StatusCode >= 400 && resp.StatusCode != 400 && s.shouldRetryUpstreamError(account, resp.StatusCode) {
|
||||
if attempt < maxRetries {
|
||||
log.Printf("Account %d: upstream error %d, retry %d/%d after %v",
|
||||
account.ID, resp.StatusCode, attempt, maxRetries, retryDelay)
|
||||
if attempt < maxRetryAttempts {
|
||||
elapsed := time.Since(retryStart)
|
||||
if elapsed >= maxRetryElapsed {
|
||||
break
|
||||
}
|
||||
|
||||
delay := retryBackoffDelay(attempt)
|
||||
remaining := maxRetryElapsed - elapsed
|
||||
if delay > remaining {
|
||||
delay = remaining
|
||||
}
|
||||
if delay <= 0 {
|
||||
break
|
||||
}
|
||||
|
||||
log.Printf("Account %d: upstream error %d, retry %d/%d after %v (elapsed=%v/%v)",
|
||||
account.ID, resp.StatusCode, attempt, maxRetryAttempts, delay, elapsed, maxRetryElapsed)
|
||||
_ = resp.Body.Close()
|
||||
time.Sleep(retryDelay)
|
||||
if err := sleepWithContext(ctx, delay); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
// 最后一次尝试也失败,跳出循环处理重试耗尽
|
||||
@@ -1146,6 +1260,9 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
}
|
||||
break
|
||||
}
|
||||
if resp == nil || resp.Body == nil {
|
||||
return nil, errors.New("upstream request failed: empty response")
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// 处理重试耗尽的情况
|
||||
@@ -1543,10 +1660,10 @@ func (s *GatewayService) handleRetryExhaustedSideEffects(ctx context.Context, re
|
||||
// OAuth/Setup Token 账号的 403:标记账号异常
|
||||
if account.IsOAuth() && statusCode == 403 {
|
||||
s.rateLimitService.HandleUpstreamError(ctx, account, statusCode, resp.Header, body)
|
||||
log.Printf("Account %d: marked as error after %d retries for status %d", account.ID, maxRetries, statusCode)
|
||||
log.Printf("Account %d: marked as error after %d retries for status %d", account.ID, maxRetryAttempts, statusCode)
|
||||
} else {
|
||||
// API Key 未配置错误码:不标记账号状态
|
||||
log.Printf("Account %d: upstream error %d after %d retries (not marking account)", account.ID, statusCode, maxRetries)
|
||||
log.Printf("Account %d: upstream error %d after %d retries (not marking account)", account.ID, statusCode, maxRetryAttempts)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2062,7 +2179,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
|
||||
if resp.StatusCode == 400 && s.isThinkingBlockSignatureError(respBody) {
|
||||
log.Printf("Account %d: detected thinking block signature error on count_tokens, retrying with filtered thinking blocks", account.ID)
|
||||
|
||||
filteredBody := FilterThinkingBlocks(body)
|
||||
filteredBody := FilterThinkingBlocksForRetry(body)
|
||||
retryReq, buildErr := s.buildCountTokensRequest(ctx, c, account, filteredBody, token, tokenType, reqModel)
|
||||
if buildErr == nil {
|
||||
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
||||
|
||||
Reference in New Issue
Block a user