feat(gateway): 添加上游错误重试机制
- OAuth/Setup Token 账号遇到 403 错误时,等待 2 秒后重试,最多 3 次
- Console 账号遇到未配置的错误码时,同样进行重试
- 重试耗尽后:OAuth 403 标记账号异常,Console 未配置错误码不标记账号
- 移除 handleErrorResponse 中已被重试逻辑覆盖的死代码
This commit is contained in:
@@ -358,6 +358,25 @@ func (s *GatewayService) getOAuthToken(ctx context.Context, account *model.Accou
|
|||||||
return accessToken, "oauth", nil
|
return accessToken, "oauth", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Retry tuning for upstream requests.

// maxRetries bounds the retry loop: the loop counter runs from 1 up to and
// including this value.
const maxRetries = 3

// retryDelay is the pause inserted between two consecutive attempts.
const retryDelay = 2 * time.Second
|
||||||
|
|
||||||
|
// shouldRetryUpstreamError 判断是否应该重试上游错误
|
||||||
|
// OAuth/Setup Token 账号:仅 403 重试
|
||||||
|
// API Key 账号:未配置的错误码重试
|
||||||
|
func (s *GatewayService) shouldRetryUpstreamError(account *model.Account, statusCode int) bool {
|
||||||
|
// OAuth/Setup Token 账号:仅 403 重试
|
||||||
|
if account.IsOAuth() {
|
||||||
|
return statusCode == 403
|
||||||
|
}
|
||||||
|
|
||||||
|
// API Key 账号:未配置的错误码重试
|
||||||
|
return !account.ShouldHandleErrorCode(statusCode)
|
||||||
|
}
|
||||||
|
|
||||||
// Forward 转发请求到Claude API
|
// Forward 转发请求到Claude API
|
||||||
func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *model.Account, body []byte) (*ForwardResult, error) {
|
func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *model.Account, body []byte) (*ForwardResult, error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
@@ -389,26 +408,51 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *m
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 构建上游请求
|
|
||||||
upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, tokenType)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 获取代理URL
|
// 获取代理URL
|
||||||
proxyURL := ""
|
proxyURL := ""
|
||||||
if account.ProxyID != nil && account.Proxy != nil {
|
if account.ProxyID != nil && account.Proxy != nil {
|
||||||
proxyURL = account.Proxy.URL()
|
proxyURL = account.Proxy.URL()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 发送请求
|
// 重试循环
|
||||||
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL)
|
var resp *http.Response
|
||||||
if err != nil {
|
for attempt := 1; attempt <= maxRetries; attempt++ {
|
||||||
return nil, fmt.Errorf("upstream request failed: %w", err)
|
// 构建上游请求(每次重试需要重新构建,因为请求体需要重新读取)
|
||||||
|
upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, tokenType)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送请求
|
||||||
|
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("upstream request failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 检查是否需要重试
|
||||||
|
if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(account, resp.StatusCode) {
|
||||||
|
if attempt < maxRetries {
|
||||||
|
log.Printf("Account %d: upstream error %d, retry %d/%d after %v",
|
||||||
|
account.ID, resp.StatusCode, attempt, maxRetries, retryDelay)
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// 最后一次尝试也失败,跳出循环处理重试耗尽
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// 不需要重试(成功或不可重试的错误),跳出循环
|
||||||
|
break
|
||||||
}
|
}
|
||||||
defer func() { _ = resp.Body.Close() }()
|
defer func() { _ = resp.Body.Close() }()
|
||||||
|
|
||||||
// 处理错误响应(包括401,由后台TokenRefreshService维护token有效性)
|
// 处理重试耗尽的情况
|
||||||
|
if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(account, resp.StatusCode) {
|
||||||
|
return s.handleRetryExhaustedError(ctx, resp, c, account)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理错误响应(不可重试的错误)
|
||||||
if resp.StatusCode >= 400 {
|
if resp.StatusCode >= 400 {
|
||||||
return s.handleErrorResponse(ctx, resp, c, account)
|
return s.handleErrorResponse(ctx, resp, c, account)
|
||||||
}
|
}
|
||||||
@@ -570,19 +614,6 @@ func (s *GatewayService) getBetaHeader(body []byte, clientBetaHeader string) str
|
|||||||
func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *model.Account) (*ForwardResult, error) {
|
func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *model.Account) (*ForwardResult, error) {
|
||||||
body, _ := io.ReadAll(resp.Body)
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
|
||||||
// apikey 类型账号:检查自定义错误码配置
|
|
||||||
// 如果启用且错误码不在列表中,返回通用 500 错误(不做任何账号状态处理)
|
|
||||||
if !account.ShouldHandleErrorCode(resp.StatusCode) {
|
|
||||||
c.JSON(http.StatusInternalServerError, gin.H{
|
|
||||||
"type": "error",
|
|
||||||
"error": gin.H{
|
|
||||||
"type": "upstream_error",
|
|
||||||
"message": "Upstream gateway error",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return nil, fmt.Errorf("upstream error: %d (not in custom error codes)", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 处理上游错误,标记账号状态
|
// 处理上游错误,标记账号状态
|
||||||
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, body)
|
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, body)
|
||||||
|
|
||||||
@@ -629,6 +660,34 @@ func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Res
|
|||||||
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
|
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleRetryExhaustedError 处理重试耗尽后的错误
|
||||||
|
// OAuth 403:标记账号异常
|
||||||
|
// API Key 未配置错误码:仅返回错误,不标记账号
|
||||||
|
func (s *GatewayService) handleRetryExhaustedError(ctx context.Context, resp *http.Response, c *gin.Context, account *model.Account) (*ForwardResult, error) {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
statusCode := resp.StatusCode
|
||||||
|
|
||||||
|
// OAuth/Setup Token 账号的 403:标记账号异常
|
||||||
|
if account.IsOAuth() && statusCode == 403 {
|
||||||
|
s.rateLimitService.HandleUpstreamError(ctx, account, statusCode, resp.Header, body)
|
||||||
|
log.Printf("Account %d: marked as error after %d retries for status %d", account.ID, maxRetries, statusCode)
|
||||||
|
} else {
|
||||||
|
// API Key 未配置错误码:不标记账号状态
|
||||||
|
log.Printf("Account %d: upstream error %d after %d retries (not marking account)", account.ID, statusCode, maxRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 返回统一的重试耗尽错误响应
|
||||||
|
c.JSON(http.StatusBadGateway, gin.H{
|
||||||
|
"type": "error",
|
||||||
|
"error": gin.H{
|
||||||
|
"type": "upstream_error",
|
||||||
|
"message": "Upstream request failed after retries",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("upstream error: %d (retries exhausted)", statusCode)
|
||||||
|
}
|
||||||
|
|
||||||
// streamingResult 流式响应结果
|
// streamingResult 流式响应结果
|
||||||
type streamingResult struct {
|
type streamingResult struct {
|
||||||
usage *ClaudeUsage
|
usage *ClaudeUsage
|
||||||
|
|||||||
Reference in New Issue
Block a user