diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 118c42fa..5674386b 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -128,7 +128,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 2. 【新增】Wait后二次检查余额/订阅 if err := h.billingCacheService.CheckBillingEligibility(c.Request.Context(), apiKey.User, apiKey, apiKey.Group, subscription); err != nil { log.Printf("Billing eligibility check failed after wait: %v", err) - h.handleStreamingAwareError(c, http.StatusForbidden, "billing_error", err.Error(), streamStarted) + h.handleStreamingAwareError(c, http.StatusForbidden, "permission_error", "Insufficient balance or active subscription required", streamStarted) return } @@ -156,8 +156,9 @@ func (h *GatewayHandler) Messages(c *gin.Context) { for { selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, failedAccountIDs) if err != nil { + log.Printf("Select account failed: %v", err) if len(failedAccountIDs) == 0 { - h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted) + h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts for requested model", streamStarted) return } h.handleFailoverExhausted(c, lastFailoverStatus, streamStarted) @@ -280,8 +281,9 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 选择支持该模型的账号 selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, failedAccountIDs) if err != nil { + log.Printf("Select account failed: %v", err) if len(failedAccountIDs) == 0 { - h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted) + h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts for requested model", streamStarted) return } h.handleFailoverExhausted(c, lastFailoverStatus, streamStarted) @@ -566,32 +568,68 @@ func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, statusCode int, func (h *GatewayHandler) mapUpstreamError(statusCode int) (int, string, string) { switch statusCode { case 401: - return http.StatusBadGateway, "upstream_error", "Upstream authentication failed, please contact administrator" + return http.StatusBadGateway, "api_error", "Upstream authentication failed, please contact administrator" case 403: - return http.StatusBadGateway, "upstream_error", "Upstream access forbidden, please contact administrator" + return http.StatusBadGateway, "api_error", "Upstream access forbidden, please contact administrator" case 429: return http.StatusTooManyRequests, "rate_limit_error", "Upstream rate limit exceeded, please retry later" case 529: return http.StatusServiceUnavailable, "overloaded_error", "Upstream service overloaded, please retry later" case 500, 502, 503, 504: - return http.StatusBadGateway, "upstream_error", "Upstream service temporarily unavailable" + return http.StatusBadGateway, "api_error", "Upstream service temporarily unavailable" default: - return http.StatusBadGateway, "upstream_error", "Upstream request failed" + return http.StatusBadGateway, "api_error", "Upstream request failed" } } +func normalizeAnthropicErrorType(errType string) string { + switch errType { + case "invalid_request_error", + "authentication_error", + "permission_error", + "not_found_error", + "rate_limit_error", + "api_error", + "overloaded_error": + return errType + case "billing_error": + // Not an Anthropic-standard error type; map to the closest equivalent. + return "permission_error" + case "upstream_error": + // Not an Anthropic-standard error type; keep clients compatible. + return "api_error" + default: + return "api_error" + } +} + +const maxPublicErrorMessageLen = 512 + +func sanitizePublicErrorMessage(message string) string { + cleaned := strings.TrimSpace(message) + cleaned = strings.ReplaceAll(cleaned, "\r", " ") + cleaned = strings.ReplaceAll(cleaned, "\n", " ") + if len(cleaned) > maxPublicErrorMessageLen { + cleaned = cleaned[:maxPublicErrorMessageLen] + "..." + } + return cleaned +} + // handleStreamingAwareError handles errors that may occur after streaming has started func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) { + normalizedType := normalizeAnthropicErrorType(errType) + publicMessage := sanitizePublicErrorMessage(message) + if streamStarted { // Stream already started, send error as SSE event then close flusher, ok := c.Writer.(http.Flusher) if ok { - // Send error event in SSE format with proper JSON marshaling + // Anthropic streaming spec: send `event: error` with JSON `data`. errorData := map[string]any{ "type": "error", "error": map[string]string{ - "type": errType, - "message": message, + "type": normalizedType, + "message": publicMessage, }, } jsonBytes, err := json.Marshal(errorData) @@ -599,8 +637,11 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e _ = c.Error(err) return } - errorEvent := fmt.Sprintf("data: %s\n\n", string(jsonBytes)) - if _, err := fmt.Fprint(c.Writer, errorEvent); err != nil { + if _, err := fmt.Fprintf(c.Writer, "event: error\n"); err != nil { + _ = c.Error(err) + return + } + if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", string(jsonBytes)); err != nil { _ = c.Error(err) } flusher.Flush() @@ -609,16 +650,19 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e } // Normal case: return JSON response with proper status code - h.errorResponse(c, status, errType, message) + h.errorResponse(c, status, normalizedType, publicMessage) } // errorResponse 返回Claude API格式的错误响应 func (h *GatewayHandler) errorResponse(c *gin.Context, status int, errType, message string) { + normalizedType := normalizeAnthropicErrorType(errType) + publicMessage := sanitizePublicErrorMessage(message) + c.JSON(status, gin.H{ "type": "error", "error": gin.H{ - "type": errType, - "message": message, + "type": normalizedType, + "message": publicMessage, }, }) } @@ -674,7 +718,8 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) { // 校验 billing eligibility(订阅/余额) // 【注意】不计算并发,但需要校验订阅/余额 if err := h.billingCacheService.CheckBillingEligibility(c.Request.Context(), apiKey.User, apiKey, apiKey.Group, subscription); err != nil { - h.errorResponse(c, http.StatusForbidden, "billing_error", err.Error()) + log.Printf("Billing eligibility check failed: %v", err) + h.errorResponse(c, http.StatusForbidden, "permission_error", "Insufficient balance or active subscription required") return } @@ -684,7 +729,8 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) { // 选择支持该模型的账号 account, err := h.gatewayService.SelectAccountForModel(c.Request.Context(), apiKey.GroupID, sessionHash, parsedReq.Model) if err != nil { - h.errorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error()) + log.Printf("Select account failed: %v", err) + h.errorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts for requested model") return } diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 97e4c2e8..cbd4abd7 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -929,8 +929,16 @@ func (s *GatewayService) getOAuthToken(ctx context.Context, account *Account) (s // 重试相关常量 const ( - maxRetries = 10 // 最大重试次数 - retryDelay = 3 * time.Second // 重试等待时间 + // 最大尝试次数(包含首次请求)。过多重试会导致请求堆积与资源耗尽。 + maxRetryAttempts = 5 + + // 指数退避:第 N 次失败后的等待 = retryBaseDelay * 2^(N-1),并且上限为 retryMaxDelay。 + retryBaseDelay = 300 * time.Millisecond + retryMaxDelay = 3 * time.Second + + // 最大重试耗时(包含请求本身耗时 + 退避等待时间)。 + // 用于防止极端情况下 goroutine 长时间堆积导致资源耗尽。 + maxRetryElapsed = 10 * time.Second ) func (s *GatewayService) shouldRetryUpstreamError(account *Account, statusCode int) bool { @@ -953,6 +961,40 @@ func (s *GatewayService) shouldFailoverUpstreamError(statusCode int) bool { } } +func retryBackoffDelay(attempt int) time.Duration { + // attempt 从 1 开始,表示第 attempt 次请求刚失败,需要等待后进行第 attempt+1 次请求。 + if attempt <= 0 { + return retryBaseDelay + } + delay := retryBaseDelay * time.Duration(1<<(attempt-1)) + if delay > retryMaxDelay { + return retryMaxDelay + } + return delay +} + +func sleepWithContext(ctx context.Context, d time.Duration) error { + if d <= 0 { + return nil + } + timer := time.NewTimer(d) + defer func() { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + // isClaudeCodeClient 判断请求是否来自 Claude Code 客户端 // 简化判断:User-Agent 匹配 + metadata.user_id 存在 func isClaudeCodeClient(userAgent string, metadataUserID string) bool { @@ -1069,7 +1111,8 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // 重试循环 var resp *http.Response - for attempt := 1; attempt <= maxRetries; attempt++ { + retryStart := time.Now() + for attempt := 1; attempt <= maxRetryAttempts; attempt++ { // 构建上游请求(每次重试需要重新构建,因为请求体需要重新读取) upstreamReq, err := s.buildUpstreamRequest(ctx, c, account, body, token, tokenType, reqModel) if err != nil { @@ -1079,6 +1122,9 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // 发送请求 resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } return nil, fmt.Errorf("upstream request failed: %w", err) } @@ -1089,6 +1135,11 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A _ = resp.Body.Close() if s.isThinkingBlockSignatureError(respBody) { + // 避免在重试预算已耗尽时再发起额外请求 + if time.Since(retryStart) >= maxRetryElapsed { + resp.Body = io.NopCloser(bytes.NewReader(respBody)) + break + } log.Printf("Account %d: detected thinking block signature error, retrying with filtered thinking blocks", account.ID) // 过滤thinking blocks并重试(使用更激进的过滤) @@ -1121,11 +1172,27 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // 检查是否需要通用重试(排除400,因为400已经在上面特殊处理过了) if resp.StatusCode >= 400 && resp.StatusCode != 400 && s.shouldRetryUpstreamError(account, resp.StatusCode) { - if attempt < maxRetries { - log.Printf("Account %d: upstream error %d, retry %d/%d after %v", - account.ID, resp.StatusCode, attempt, maxRetries, retryDelay) + if attempt < maxRetryAttempts { + elapsed := time.Since(retryStart) + if elapsed >= maxRetryElapsed { + break + } + + delay := retryBackoffDelay(attempt) + remaining := maxRetryElapsed - elapsed + if delay > remaining { + delay = remaining + } + if delay <= 0 { + break + } + + log.Printf("Account %d: upstream error %d, retry %d/%d after %v (elapsed=%v/%v)", + account.ID, resp.StatusCode, attempt, maxRetryAttempts, delay, elapsed, maxRetryElapsed) _ = resp.Body.Close() - time.Sleep(retryDelay) + if err := sleepWithContext(ctx, delay); err != nil { + return nil, err + } continue } // 最后一次尝试也失败,跳出循环处理重试耗尽 @@ -1142,6 +1209,9 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A } break } + if resp == nil || resp.Body == nil { + return nil, errors.New("upstream request failed: empty response") + } defer func() { _ = resp.Body.Close() }() // 处理重试耗尽的情况 diff --git a/frontend/src/components/common/GroupSelector.vue b/frontend/src/components/common/GroupSelector.vue index 5b78808b..c67d32fc 100644 --- a/frontend/src/components/common/GroupSelector.vue +++ b/frontend/src/components/common/GroupSelector.vue @@ -1,8 +1,8 @@