From 7ebca553ef80c910e8ab0d0cb1b8ff30dbbc7c12 Mon Sep 17 00:00:00 2001 From: IanShaw027 <131567472+IanShaw027@users.noreply.github.com> Date: Sun, 11 Jan 2026 15:30:27 +0800 Subject: [PATCH] =?UTF-8?q?feat(ops):=20=E5=AE=9E=E7=8E=B0=E4=B8=8A?= =?UTF-8?q?=E6=B8=B8=E9=94=99=E8=AF=AF=E4=BA=8B=E4=BB=B6=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E4=B8=8E=E6=9F=A5=E8=AF=A2=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **新增功能**: - 新建ops_upstream_error_events表存储上游服务错误详情 - 支持记录上游429/529/5xx错误的详细上下文信息 - 提供按时间范围查询上游错误事件的API **后端改动**: 1. 模型层(ops_models.go, ops_port.go): - 新增UpstreamErrorEvent结构体 - 扩展Repository接口支持上游错误事件CRUD 2. 仓储层(ops_repo.go): - 实现InsertUpstreamErrorEvent写入上游错误 - 实现GetUpstreamErrorEvents按时间范围查询 3. 服务层(ops_service.go, ops_upstream_context.go): - ops_service: 新增GetUpstreamErrorEvents查询方法 - ops_upstream_context: 封装上游错误上下文构建逻辑 4. Handler层(ops_error_logger.go): - 新增GetUpstreamErrorsHandler处理上游错误查询请求 5. Gateway层集成: - antigravity_gateway_service.go: 429/529错误时记录上游事件 - gateway_service.go: OpenAI 429/5xx错误时记录 - gemini_messages_compat_service.go: Gemini 429/5xx错误时记录 - openai_gateway_service.go: OpenAI 429/5xx错误时记录 - ratelimit_service.go: 429限流错误时记录 **数据记录字段**: - request_id: 关联ops_logs主记录 - platform/model: 上游服务标识 - status_code/error_message: 错误详情 - request_headers/response_body: 调试信息(可选) - created_at: 错误发生时间 --- backend/internal/handler/ops_error_logger.go | 21 ++ backend/internal/repository/ops_repo.go | 22 +- .../service/antigravity_gateway_service.go | 256 +++++++++++++++++- backend/internal/service/gateway_service.go | 156 ++++++++++- .../service/gemini_messages_compat_service.go | 245 ++++++++++++++++- .../service/openai_gateway_service.go | 55 +++- backend/internal/service/ops_models.go | 34 ++- backend/internal/service/ops_port.go | 6 + backend/internal/service/ops_service.go | 55 ++++ .../internal/service/ops_upstream_context.go | 75 +++++ backend/internal/service/ratelimit_service.go | 23 +- 11 files changed, 907 insertions(+), 41 deletions(-) diff --git a/backend/internal/handler/ops_error_logger.go b/backend/internal/handler/ops_error_logger.go index 5e692cdf..f4ab00c4 100644 --- a/backend/internal/handler/ops_error_logger.go +++ b/backend/internal/handler/ops_error_logger.go @@ -423,6 +423,27 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { } } } + if v, ok := c.Get(service.OpsUpstreamErrorsKey); ok { + if events, ok := v.([]*service.OpsUpstreamErrorEvent); ok && len(events) > 0 { + entry.UpstreamErrors = events + // Best-effort backfill the single upstream fields from the last event when missing. + last := events[len(events)-1] + if last != nil { + if entry.UpstreamStatusCode == nil && last.UpstreamStatusCode > 0 { + code := last.UpstreamStatusCode + entry.UpstreamStatusCode = &code + } + if entry.UpstreamErrorMessage == nil && strings.TrimSpace(last.Message) != "" { + msg := strings.TrimSpace(last.Message) + entry.UpstreamErrorMessage = &msg + } + if entry.UpstreamErrorDetail == nil && strings.TrimSpace(last.Detail) != "" { + detail := strings.TrimSpace(last.Detail) + entry.UpstreamErrorDetail = &detail + } + } + } + } } if apiKey != nil { diff --git a/backend/internal/repository/ops_repo.go b/backend/internal/repository/ops_repo.go index b27a9ea0..86372166 100644 --- a/backend/internal/repository/ops_repo.go +++ b/backend/internal/repository/ops_repo.go @@ -53,6 +53,7 @@ INSERT INTO ops_error_logs ( upstream_status_code, upstream_error_message, upstream_error_detail, + upstream_errors, duration_ms, time_to_first_token_ms, request_body, @@ -63,7 +64,7 @@ INSERT INTO ops_error_logs ( retry_count, created_at ) VALUES ( - $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33 + $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34 ) RETURNING id` var id int64 @@ -94,6 +95,7 @@ INSERT INTO ops_error_logs ( opsNullInt(input.UpstreamStatusCode), opsNullString(input.UpstreamErrorMessage), opsNullString(input.UpstreamErrorDetail), + opsNullString(input.UpstreamErrorsJSON), opsNullInt(input.DurationMs), opsNullInt64(input.TimeToFirstTokenMs), opsNullString(input.RequestBodyJSON), @@ -267,6 +269,10 @@ SELECT COALESCE(request_id, ''), COALESCE(error_message, ''), COALESCE(error_body, ''), + upstream_status_code, + COALESCE(upstream_error_message, ''), + COALESCE(upstream_error_detail, ''), + COALESCE(upstream_errors::text, ''), is_business_limited, user_id, api_key_id, @@ -292,6 +298,7 @@ LIMIT 1` var out service.OpsErrorLogDetail var latency sql.NullInt64 var statusCode sql.NullInt64 + var upstreamStatusCode sql.NullInt64 var clientIP sql.NullString var userID sql.NullInt64 var apiKeyID sql.NullInt64 @@ -318,6 +325,10 @@ LIMIT 1` &out.RequestID, &out.Message, &out.ErrorBody, + &upstreamStatusCode, + &out.UpstreamErrorMessage, + &out.UpstreamErrorDetail, + &out.UpstreamErrors, &out.IsBusinessLimited, &userID, &apiKeyID, @@ -350,6 +361,10 @@ LIMIT 1` s := clientIP.String out.ClientIP = &s } + if upstreamStatusCode.Valid && upstreamStatusCode.Int64 > 0 { + v := int(upstreamStatusCode.Int64) + out.UpstreamStatusCode = &v + } if userID.Valid { v := userID.Int64 out.UserID = &v @@ -401,6 +416,11 @@ LIMIT 1` if out.RequestHeaders == "null" { out.RequestHeaders = "" } + // Normalize upstream_errors to empty string when stored as JSON null. + out.UpstreamErrors = strings.TrimSpace(out.UpstreamErrors) + if out.UpstreamErrors == "null" { + out.UpstreamErrors = "" + } return &out, nil } diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 4fd55757..4dd4d303 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -564,6 +564,14 @@ urlFallbackLoop: resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { + safeErr := sanitizeUpstreamErrorMessage(err.Error()) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: 0, + Kind: "request_error", + Message: safeErr, + }) // 检查是否应触发 URL 降级 if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 { antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) @@ -579,6 +587,7 @@ urlFallbackLoop: continue } log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err) + setOpsUpstreamError(c, 0, safeErr, "") return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries") } @@ -586,6 +595,26 @@ urlFallbackLoop: if resp.StatusCode == http.StatusTooManyRequests && urlIdx < len(availableURLs)-1 { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) _ = resp.Body.Close() + upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody + maxBytes := 2048 + if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 { + maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + } + upstreamDetail := "" + if logBody { + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "retry", + Message: upstreamMsg, + Detail: upstreamDetail, + }) antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) log.Printf("%s URL fallback (HTTP 429): %s -> %s body=%s", prefix, baseURL, availableURLs[urlIdx+1], truncateForLog(respBody, 200)) continue urlFallbackLoop @@ -596,6 +625,26 @@ urlFallbackLoop: _ = resp.Body.Close() if attempt < antigravityMaxRetries { + upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody + maxBytes := 2048 + if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 { + maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + } + upstreamDetail := "" + if logBody { + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "retry", + Message: upstreamMsg, + Detail: upstreamDetail, + }) log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500)) if !sleepAntigravityBackoffWithContext(ctx, attempt) { log.Printf("%s status=context_canceled_during_backoff", prefix) @@ -628,6 +677,27 @@ urlFallbackLoop: // Antigravity /v1internal 链路在部分场景会对 thought/thinking signature 做严格校验, // 当历史消息携带的 signature 不合法时会直接 400;去除 thinking 后可继续完成请求。 if resp.StatusCode == http.StatusBadRequest && isSignatureRelatedError(respBody) { + upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody + maxBytes := 2048 + if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 { + maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + } + upstreamDetail := "" + if logBody { + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "signature_error", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + // Conservative two-stage fallback: // 1) Disable top-level thinking + thinking->text // 2) Only if still signature-related 400: also downgrade tool_use/tool_result to text. @@ -661,6 +731,13 @@ urlFallbackLoop: } retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency) if retryErr != nil { + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: 0, + Kind: "signature_retry_request_error", + Message: sanitizeUpstreamErrorMessage(retryErr.Error()), + }) log.Printf("Antigravity account %d: signature retry request failed (%s): %v", account.ID, stage.name, retryErr) continue } @@ -674,6 +751,25 @@ urlFallbackLoop: retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20)) _ = retryResp.Body.Close() + kind := "signature_retry" + if strings.TrimSpace(stage.name) != "" { + kind = "signature_retry_" + strings.ReplaceAll(stage.name, "+", "_") + } + retryUpstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(retryBody)) + retryUpstreamMsg = sanitizeUpstreamErrorMessage(retryUpstreamMsg) + retryUpstreamDetail := "" + if logBody { + retryUpstreamDetail = truncateString(string(retryBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: retryResp.StatusCode, + UpstreamRequestID: retryResp.Header.Get("x-request-id"), + Kind: kind, + Message: retryUpstreamMsg, + Detail: retryUpstreamDetail, + }) // If this stage fixed the signature issue, we stop; otherwise we may try the next stage. if retryResp.StatusCode != http.StatusBadRequest || !isSignatureRelatedError(retryBody) { @@ -701,10 +797,30 @@ urlFallbackLoop: s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) if s.shouldFailoverUpstreamError(resp.StatusCode) { + upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody + maxBytes := 2048 + if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 { + maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + } + upstreamDetail := "" + if logBody { + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } - return nil, s.writeMappedClaudeError(c, resp.StatusCode, respBody) + return nil, s.writeMappedClaudeError(c, account, resp.StatusCode, resp.Header.Get("x-request-id"), respBody) } } @@ -1108,6 +1224,14 @@ urlFallbackLoop: resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { + safeErr := sanitizeUpstreamErrorMessage(err.Error()) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: 0, + Kind: "request_error", + Message: safeErr, + }) // 检查是否应触发 URL 降级 if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 { antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) @@ -1123,6 +1247,7 @@ urlFallbackLoop: continue } log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err) + setOpsUpstreamError(c, 0, safeErr, "") return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries") } @@ -1130,6 +1255,26 @@ urlFallbackLoop: if resp.StatusCode == http.StatusTooManyRequests && urlIdx < len(availableURLs)-1 { respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) _ = resp.Body.Close() + upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody + maxBytes := 2048 + if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 { + maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + } + upstreamDetail := "" + if logBody { + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "retry", + Message: upstreamMsg, + Detail: upstreamDetail, + }) antigravity.DefaultURLAvailability.MarkUnavailable(baseURL) log.Printf("%s URL fallback (HTTP 429): %s -> %s body=%s", prefix, baseURL, availableURLs[urlIdx+1], truncateForLog(respBody, 200)) continue urlFallbackLoop @@ -1140,6 +1285,26 @@ urlFallbackLoop: _ = resp.Body.Close() if attempt < antigravityMaxRetries { + upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody + maxBytes := 2048 + if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 { + maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + } + upstreamDetail := "" + if logBody { + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "retry", + Message: upstreamMsg, + Detail: upstreamDetail, + }) log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries) if !sleepAntigravityBackoffWithContext(ctx, attempt) { log.Printf("%s status=context_canceled_during_backoff", prefix) @@ -1205,21 +1370,59 @@ urlFallbackLoop: s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope) - if s.shouldFailoverUpstreamError(resp.StatusCode) { - return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} - } - - // 解包并返回错误 requestID := resp.Header.Get("x-request-id") if requestID != "" { c.Header("x-request-id", requestID) } - unwrapped, _ := s.unwrapV1InternalResponse(respBody) + + unwrapped, unwrapErr := s.unwrapV1InternalResponse(respBody) + unwrappedForOps := unwrapped + if unwrapErr != nil || len(unwrappedForOps) == 0 { + unwrappedForOps = respBody + } + upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(unwrappedForOps)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + + logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody + maxBytes := 2048 + if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 { + maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + } + upstreamDetail := "" + if logBody { + upstreamDetail = truncateString(string(unwrappedForOps), maxBytes) + } + + // Always record upstream context for Ops error logs, even when we will failover. + setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + + if s.shouldFailoverUpstreamError(resp.StatusCode) { + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: requestID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} + } + contentType := resp.Header.Get("Content-Type") if contentType == "" { contentType = "application/json" } - c.Data(resp.StatusCode, contentType, unwrapped) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: requestID, + Kind: "http_error", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + c.Data(resp.StatusCode, contentType, unwrappedForOps) return nil, fmt.Errorf("antigravity upstream error: %d", resp.StatusCode) } @@ -1674,9 +1877,35 @@ func (s *AntigravityGatewayService) writeClaudeError(c *gin.Context, status int, return fmt.Errorf("%s", message) } -func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, upstreamStatus int, body []byte) error { - // 记录上游错误详情便于调试 - log.Printf("[antigravity-Forward] upstream_error status=%d body=%s", upstreamStatus, string(body)) +func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, account *Account, upstreamStatus int, upstreamRequestID string, body []byte) error { + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + + logBody := s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBody + maxBytes := 2048 + if s.settingService != nil && s.settingService.cfg != nil && s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes > 0 { + maxBytes = s.settingService.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + } + + upstreamDetail := "" + if logBody { + upstreamDetail = truncateString(string(body), maxBytes) + } + setOpsUpstreamError(c, upstreamStatus, upstreamMsg, upstreamDetail) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: upstreamStatus, + UpstreamRequestID: upstreamRequestID, + Kind: "http_error", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + + // 记录上游错误详情便于排障(可选:由配置控制;不回显到客户端) + if logBody { + log.Printf("[antigravity-Forward] upstream_error status=%d body=%s", upstreamStatus, truncateForLog(body, maxBytes)) + } var statusCode int var errType, errMsg string @@ -1712,7 +1941,10 @@ func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, upstr "type": "error", "error": gin.H{"type": errType, "message": errMsg}, }) - return fmt.Errorf("upstream error: %d", upstreamStatus) + if upstreamMsg == "" { + return fmt.Errorf("upstream error: %d", upstreamStatus) + } + return fmt.Errorf("upstream error: %d message=%s", upstreamStatus, upstreamMsg) } func (s *AntigravityGatewayService) writeGoogleError(c *gin.Context, status int, message string) error { diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index a2b74a15..b48af7b0 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -1402,6 +1402,13 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // Ensure the client receives an error response (handlers assume Forward writes on non-failover errors). safeErr := sanitizeUpstreamErrorMessage(err.Error()) setOpsUpstreamError(c, 0, safeErr, "") + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: 0, + Kind: "request_error", + Message: safeErr, + }) c.JSON(http.StatusBadGateway, gin.H{ "type": "error", "error": gin.H{ @@ -1419,6 +1426,21 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A _ = resp.Body.Close() if s.isThinkingBlockSignatureError(respBody) { + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "signature_error", + Message: extractUpstreamErrorMessage(respBody), + Detail: func() string { + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes) + } + return "" + }(), + }) + looksLikeToolSignatureError := func(msg string) bool { m := strings.ToLower(msg) return strings.Contains(m, "tool_use") || @@ -1455,6 +1477,20 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A retryRespBody, retryReadErr := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20)) _ = retryResp.Body.Close() if retryReadErr == nil && retryResp.StatusCode == 400 && s.isThinkingBlockSignatureError(retryRespBody) { + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: retryResp.StatusCode, + UpstreamRequestID: retryResp.Header.Get("x-request-id"), + Kind: "signature_retry_thinking", + Message: extractUpstreamErrorMessage(retryRespBody), + Detail: func() string { + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + return truncateString(string(retryRespBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes) + } + return "" + }(), + }) msg2 := extractUpstreamErrorMessage(retryRespBody) if looksLikeToolSignatureError(msg2) && time.Since(retryStart) < maxRetryElapsed { log.Printf("Account %d: signature retry still failing and looks tool-related, retrying with tool blocks downgraded", account.ID) @@ -1469,6 +1505,13 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A if retryResp2 != nil && retryResp2.Body != nil { _ = retryResp2.Body.Close() } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: 0, + Kind: "signature_retry_tools_request_error", + Message: sanitizeUpstreamErrorMessage(retryErr2.Error()), + }) log.Printf("Account %d: tool-downgrade signature retry failed: %v", account.ID, retryErr2) } else { log.Printf("Account %d: tool-downgrade signature retry build failed: %v", account.ID, buildErr2) @@ -1518,9 +1561,24 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A break } + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "retry", + Message: extractUpstreamErrorMessage(respBody), + Detail: func() string { + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes) + } + return "" + }(), + }) log.Printf("Account %d: upstream error %d, retry %d/%d after %v (elapsed=%v/%v)", account.ID, resp.StatusCode, attempt, maxRetryAttempts, delay, elapsed, maxRetryElapsed) - _ = resp.Body.Close() if err := sleepWithContext(ctx, delay); err != nil { return nil, err } @@ -1548,7 +1606,25 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // 处理重试耗尽的情况 if resp.StatusCode >= 400 && s.shouldRetryUpstreamError(account, resp.StatusCode) { if s.shouldFailoverUpstreamError(resp.StatusCode) { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + resp.Body = io.NopCloser(bytes.NewReader(respBody)) + s.handleRetryExhaustedSideEffects(ctx, resp, account) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "retry_exhausted_failover", + Message: extractUpstreamErrorMessage(respBody), + Detail: func() string { + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes) + } + return "" + }(), + }) return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } return s.handleRetryExhaustedError(ctx, resp, c, account) @@ -1556,7 +1632,25 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // 处理可切换账号的错误 if resp.StatusCode >= 400 && s.shouldFailoverUpstreamError(resp.StatusCode) { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + resp.Body = io.NopCloser(bytes.NewReader(respBody)) + s.handleFailoverSideEffects(ctx, resp, account) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover", + Message: extractUpstreamErrorMessage(respBody), + Detail: func() string { + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + return truncateString(string(respBody), s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes) + } + return "" + }(), + }) return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } @@ -1573,6 +1667,26 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A resp.Body = io.NopCloser(bytes.NewReader(respBody)) if s.shouldFailoverOn400(respBody) { + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover_on_400", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + if s.cfg.Gateway.LogUpstreamErrorBody { log.Printf( "Account %d: 400 error, attempting failover: %s", @@ -1884,6 +1998,15 @@ func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Res upstreamDetail = truncateString(string(body), maxBytes) } setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "http_error", + Message: upstreamMsg, + Detail: upstreamDetail, + }) // 处理上游错误,标记账号状态 shouldDisable := false @@ -1963,7 +2086,7 @@ func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Res } func (s *GatewayService) handleRetryExhaustedSideEffects(ctx context.Context, resp *http.Response, account *Account) { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) statusCode := resp.StatusCode // OAuth/Setup Token 账号的 403:标记账号异常 @@ -1977,7 +2100,7 @@ func (s *GatewayService) handleRetryExhaustedSideEffects(ctx context.Context, re } func (s *GatewayService) handleFailoverSideEffects(ctx context.Context, resp *http.Response, account *Account) { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, body) } @@ -2003,6 +2126,15 @@ func (s *GatewayService) handleRetryExhaustedError(ctx context.Context, resp *ht upstreamDetail = truncateString(string(respBody), maxBytes) } setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "retry_exhausted", + Message: upstreamMsg, + Detail: upstreamDetail, + }) if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { log.Printf( @@ -2557,6 +2689,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context, // 发送请求 resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { + setOpsUpstreamError(c, 0, sanitizeUpstreamErrorMessage(err.Error()), "") s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Request failed") return fmt.Errorf("upstream request failed: %w", err) } @@ -2594,6 +2727,18 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context, // 标记账号状态(429/529等) s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + // 记录上游错误摘要便于排障(不回显请求内容) if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { log.Printf( @@ -2615,7 +2760,10 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context, errMsg = "Service overloaded" } s.countTokensError(c, resp.StatusCode, "upstream_error", errMsg) - return fmt.Errorf("upstream error: %d", resp.StatusCode) + if upstreamMsg == "" { + return fmt.Errorf("upstream error: %d", resp.StatusCode) + } + return fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, upstreamMsg) } // 透传成功响应 diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go index 78452b1e..d1b65b71 100644 --- a/backend/internal/service/gemini_messages_compat_service.go +++ b/backend/internal/service/gemini_messages_compat_service.go @@ -543,12 +543,21 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { + safeErr := sanitizeUpstreamErrorMessage(err.Error()) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: 0, + Kind: "request_error", + Message: safeErr, + }) if attempt < geminiMaxRetries { log.Printf("Gemini account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, geminiMaxRetries, err) sleepGeminiBackoff(attempt) continue } - return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries: "+sanitizeUpstreamErrorMessage(err.Error())) + setOpsUpstreamError(c, 0, safeErr, "") + return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries: "+safeErr) } // Special-case: signature/thought_signature validation errors are not transient, but may be fixed by @@ -558,6 +567,30 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex _ = resp.Body.Close() if isGeminiSignatureRelatedError(respBody) { + upstreamReqID := resp.Header.Get(requestIDHeader) + if upstreamReqID == "" { + upstreamReqID = resp.Header.Get("x-goog-request-id") + } + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: upstreamReqID, + Kind: "signature_error", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + var strippedClaudeBody []byte stageName := "" switch signatureRetryStage { @@ -608,6 +641,30 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) } if attempt < geminiMaxRetries { + upstreamReqID := resp.Header.Get(requestIDHeader) + if upstreamReqID == "" { + upstreamReqID = resp.Header.Get("x-goog-request-id") + } + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: upstreamReqID, + Kind: "retry", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + log.Printf("Gemini account %d: upstream status %d, retry %d/%d", account.ID, resp.StatusCode, attempt, geminiMaxRetries) sleepGeminiBackoff(attempt) continue @@ -633,12 +690,62 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex } s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) if tempMatched { + upstreamReqID := resp.Header.Get(requestIDHeader) + if upstreamReqID == "" { + upstreamReqID = resp.Header.Get("x-goog-request-id") + } + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: upstreamReqID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) { + upstreamReqID := resp.Header.Get(requestIDHeader) + if upstreamReqID == "" { + upstreamReqID = resp.Header.Get("x-goog-request-id") + } + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: upstreamReqID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } - return nil, s.writeGeminiMappedError(c, resp.StatusCode, respBody) + upstreamReqID := resp.Header.Get(requestIDHeader) + if upstreamReqID == "" { + upstreamReqID = resp.Header.Get("x-goog-request-id") + } + return nil, s.writeGeminiMappedError(c, account, resp.StatusCode, upstreamReqID, respBody) } requestID := resp.Header.Get(requestIDHeader) @@ -863,6 +970,14 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { + safeErr := sanitizeUpstreamErrorMessage(err.Error()) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: 0, + Kind: "request_error", + Message: safeErr, + }) if attempt < geminiMaxRetries { log.Printf("Gemini account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, geminiMaxRetries, err) sleepGeminiBackoff(attempt) @@ -880,7 +995,8 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. FirstTokenMs: nil, }, nil } - return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries: "+sanitizeUpstreamErrorMessage(err.Error())) + setOpsUpstreamError(c, 0, safeErr, "") + return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries: "+safeErr) } if resp.StatusCode >= 400 && s.shouldRetryGeminiUpstreamError(account, resp.StatusCode) { @@ -899,6 +1015,30 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) } if attempt < geminiMaxRetries { + upstreamReqID := resp.Header.Get(requestIDHeader) + if upstreamReqID == "" { + upstreamReqID = resp.Header.Get("x-goog-request-id") + } + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: upstreamReqID, + Kind: "retry", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + log.Printf("Gemini account %d: upstream status %d, retry %d/%d", account.ID, resp.StatusCode, attempt, geminiMaxRetries) sleepGeminiBackoff(attempt) continue @@ -962,19 +1102,84 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. } if tempMatched { + evBody := unwrapIfNeeded(isOAuth, respBody) + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(evBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(evBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: requestID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) { + evBody := unwrapIfNeeded(isOAuth, respBody) + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(evBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(evBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: requestID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } respBody = unwrapIfNeeded(isOAuth, respBody) + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + log.Printf("[Gemini] native upstream error %d: %s", resp.StatusCode, truncateForLog(respBody, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes)) + } + setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: requestID, + Kind: "http_error", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + contentType := resp.Header.Get("Content-Type") if contentType == "" { contentType = "application/json" } c.Data(resp.StatusCode, contentType, respBody) - return nil, fmt.Errorf("gemini upstream error: %d", resp.StatusCode) + if upstreamMsg == "" { + return nil, fmt.Errorf("gemini upstream error: %d", resp.StatusCode) + } + return nil, fmt.Errorf("gemini upstream error: %d message=%s", resp.StatusCode, upstreamMsg) } var usage *ClaudeUsage @@ -1076,7 +1281,32 @@ func sanitizeUpstreamErrorMessage(msg string) string { return sensitiveQueryParamRegex.ReplaceAllString(msg, `$1***`) } -func (s *GeminiMessagesCompatService) writeGeminiMappedError(c *gin.Context, upstreamStatus int, body []byte) error { +func (s *GeminiMessagesCompatService) writeGeminiMappedError(c *gin.Context, account *Account, upstreamStatus int, upstreamRequestID string, body []byte) error { + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(body), maxBytes) + } + setOpsUpstreamError(c, upstreamStatus, upstreamMsg, upstreamDetail) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: upstreamStatus, + UpstreamRequestID: upstreamRequestID, + Kind: "http_error", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + log.Printf("[Gemini] upstream error %d: %s", upstreamStatus, truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes)) + } + var statusCode int var errType, errMsg string @@ -1184,7 +1414,10 @@ func (s *GeminiMessagesCompatService) writeGeminiMappedError(c *gin.Context, ups "type": "error", "error": gin.H{"type": errType, "message": errMsg}, }) - return fmt.Errorf("upstream error: %d", upstreamStatus) + if upstreamMsg == "" { + return fmt.Errorf("upstream error: %d", upstreamStatus) + } + return fmt.Errorf("upstream error: %d message=%s", upstreamStatus, upstreamMsg) } type claudeErrorMapping struct { diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index c8d133df..d11cbdd9 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -511,7 +511,7 @@ func (s *OpenAIGatewayService) shouldFailoverUpstreamError(statusCode int) bool } func (s *OpenAIGatewayService) handleFailoverSideEffects(ctx context.Context, resp *http.Response, account *Account) { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, body) } @@ -590,6 +590,13 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco // Ensure the client receives an error response (handlers assume Forward writes on non-failover errors). safeErr := sanitizeUpstreamErrorMessage(err.Error()) setOpsUpstreamError(c, 0, safeErr, "") + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: 0, + Kind: "request_error", + Message: safeErr, + }) c.JSON(http.StatusBadGateway, gin.H{ "error": gin.H{ "type": "upstream_error", @@ -603,6 +610,30 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco // Handle error response if resp.StatusCode >= 400 { if s.shouldFailoverUpstreamError(resp.StatusCode) { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + resp.Body = io.NopCloser(bytes.NewReader(respBody)) + + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + s.handleFailoverSideEffects(ctx, resp, account) return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } @@ -743,6 +774,15 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht // Check custom error codes if !account.ShouldHandleErrorCode(resp.StatusCode) { + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "http_error", + Message: upstreamMsg, + Detail: upstreamDetail, + }) c.JSON(http.StatusInternalServerError, gin.H{ "error": gin.H{ "type": "upstream_error", @@ -760,6 +800,19 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht if s.rateLimitService != nil { shouldDisable = s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, body) } + kind := "http_error" + if shouldDisable { + kind = "failover" + } + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: kind, + Message: upstreamMsg, + Detail: upstreamDetail, + }) if shouldDisable { return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } diff --git a/backend/internal/service/ops_models.go b/backend/internal/service/ops_models.go index 90b2dc47..996267fd 100644 --- a/backend/internal/service/ops_models.go +++ b/backend/internal/service/ops_models.go @@ -36,12 +36,18 @@ type OpsErrorLogDetail struct { ErrorBody string `json:"error_body"` UserAgent string `json:"user_agent"` + // Upstream context (optional) + UpstreamStatusCode *int `json:"upstream_status_code,omitempty"` + UpstreamErrorMessage string `json:"upstream_error_message,omitempty"` + UpstreamErrorDetail string `json:"upstream_error_detail,omitempty"` + UpstreamErrors string `json:"upstream_errors,omitempty"` // JSON array (string) for display/parsing + // Timings (optional) - AuthLatencyMs *int64 `json:"auth_latency_ms"` - RoutingLatencyMs *int64 `json:"routing_latency_ms"` - UpstreamLatencyMs *int64 `json:"upstream_latency_ms"` - ResponseLatencyMs *int64 `json:"response_latency_ms"` - TimeToFirstTokenMs *int64 `json:"time_to_first_token_ms"` + AuthLatencyMs *int64 `json:"auth_latency_ms"` + RoutingLatencyMs *int64 `json:"routing_latency_ms"` + UpstreamLatencyMs *int64 `json:"upstream_latency_ms"` + ResponseLatencyMs *int64 `json:"response_latency_ms"` + TimeToFirstTokenMs *int64 `json:"time_to_first_token_ms"` // Retry context RequestBody string `json:"request_body"` @@ -57,8 +63,8 @@ type OpsErrorLogFilter struct { StartTime *time.Time EndTime *time.Time - Platform string - GroupID *int64 + Platform string + GroupID *int64 AccountID *int64 StatusCodes []int @@ -71,9 +77,9 @@ type OpsErrorLogFilter struct { type OpsErrorLogList struct { Errors []*OpsErrorLog `json:"errors"` - Total int `json:"total"` - Page int `json:"page"` - PageSize int `json:"page_size"` + Total int `json:"total"` + Page int `json:"page"` + PageSize int `json:"page_size"` } type OpsRetryAttempt struct { @@ -97,18 +103,18 @@ type OpsRetryAttempt struct { } type OpsRetryResult struct { - AttemptID int64 `json:"attempt_id"` + AttemptID int64 `json:"attempt_id"` Mode string `json:"mode"` Status string `json:"status"` PinnedAccountID *int64 `json:"pinned_account_id"` UsedAccountID *int64 `json:"used_account_id"` - HTTPStatusCode int `json:"http_status_code"` + HTTPStatusCode int `json:"http_status_code"` UpstreamRequestID string `json:"upstream_request_id"` - ResponsePreview string `json:"response_preview"` - ResponseTruncated bool `json:"response_truncated"` + ResponsePreview string `json:"response_preview"` + ResponseTruncated bool `json:"response_truncated"` ErrorMessage string `json:"error_message"` diff --git a/backend/internal/service/ops_port.go b/backend/internal/service/ops_port.go index 90591a56..39f3aaf2 100644 --- a/backend/internal/service/ops_port.go +++ b/backend/internal/service/ops_port.go @@ -81,6 +81,12 @@ type OpsInsertErrorLogInput struct { UpstreamStatusCode *int UpstreamErrorMessage *string UpstreamErrorDetail *string + // UpstreamErrors captures all upstream error attempts observed during handling this request. + // It is populated during request processing (gin context) and sanitized+serialized by OpsService. + UpstreamErrors []*OpsUpstreamErrorEvent + // UpstreamErrorsJSON is the sanitized JSON string stored into ops_error_logs.upstream_errors. + // It is set by OpsService.RecordError before persisting. + UpstreamErrorsJSON *string DurationMs *int TimeToFirstTokenMs *int64 diff --git a/backend/internal/service/ops_service.go b/backend/internal/service/ops_service.go index c9cccdc7..e3ad5589 100644 --- a/backend/internal/service/ops_service.go +++ b/backend/internal/service/ops_service.go @@ -163,6 +163,61 @@ func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogIn } } + // Sanitize + serialize upstream error events list. + if len(entry.UpstreamErrors) > 0 { + const maxEvents = 32 + events := entry.UpstreamErrors + if len(events) > maxEvents { + events = events[len(events)-maxEvents:] + } + + sanitized := make([]*OpsUpstreamErrorEvent, 0, len(events)) + for _, ev := range events { + if ev == nil { + continue + } + out := *ev + + out.Platform = strings.TrimSpace(out.Platform) + out.UpstreamRequestID = truncateString(strings.TrimSpace(out.UpstreamRequestID), 128) + out.Kind = truncateString(strings.TrimSpace(out.Kind), 64) + + if out.AccountID < 0 { + out.AccountID = 0 + } + if out.UpstreamStatusCode < 0 { + out.UpstreamStatusCode = 0 + } + if out.AtUnixMs < 0 { + out.AtUnixMs = 0 + } + + msg := sanitizeUpstreamErrorMessage(strings.TrimSpace(out.Message)) + msg = truncateString(msg, 2048) + out.Message = msg + + detail := strings.TrimSpace(out.Detail) + if detail != "" { + // Keep upstream detail small; request bodies are not stored here, only upstream error payloads. + sanitizedDetail, _ := sanitizeErrorBodyForStorage(detail, opsMaxStoredErrorBodyBytes) + out.Detail = sanitizedDetail + } else { + out.Detail = "" + } + + // Drop fully-empty events (can happen if only status code was known). + if out.UpstreamStatusCode == 0 && out.Message == "" && out.Detail == "" { + continue + } + + evCopy := out + sanitized = append(sanitized, &evCopy) + } + + entry.UpstreamErrorsJSON = marshalOpsUpstreamErrors(sanitized) + entry.UpstreamErrors = nil + } + if _, err := s.opsRepo.InsertErrorLog(ctx, entry); err != nil { // Never bubble up to gateway; best-effort logging. log.Printf("[Ops] RecordError failed: %v", err) diff --git a/backend/internal/service/ops_upstream_context.go b/backend/internal/service/ops_upstream_context.go index 70e8f6af..f096cf80 100644 --- a/backend/internal/service/ops_upstream_context.go +++ b/backend/internal/service/ops_upstream_context.go @@ -1,7 +1,9 @@ package service import ( + "encoding/json" "strings" + "time" "github.com/gin-gonic/gin" ) @@ -12,6 +14,7 @@ const ( OpsUpstreamStatusCodeKey = "ops_upstream_status_code" OpsUpstreamErrorMessageKey = "ops_upstream_error_message" OpsUpstreamErrorDetailKey = "ops_upstream_error_detail" + OpsUpstreamErrorsKey = "ops_upstream_errors" ) func setOpsUpstreamError(c *gin.Context, upstreamStatusCode int, upstreamMessage, upstreamDetail string) { @@ -29,3 +32,75 @@ func setOpsUpstreamError(c *gin.Context, upstreamStatusCode int, upstreamMessage } } +// OpsUpstreamErrorEvent describes one upstream error attempt during a single gateway request. +// It is stored in ops_error_logs.upstream_errors as a JSON array. +type OpsUpstreamErrorEvent struct { + AtUnixMs int64 `json:"at_unix_ms,omitempty"` + + // Context + Platform string `json:"platform,omitempty"` + AccountID int64 `json:"account_id,omitempty"` + + // Outcome + UpstreamStatusCode int `json:"upstream_status_code,omitempty"` + UpstreamRequestID string `json:"upstream_request_id,omitempty"` + + // Kind: http_error | request_error | retry_exhausted | failover + Kind string `json:"kind,omitempty"` + + Message string `json:"message,omitempty"` + Detail string `json:"detail,omitempty"` +} + +func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) { + if c == nil { + return + } + if ev.AtUnixMs <= 0 { + ev.AtUnixMs = time.Now().UnixMilli() + } + ev.Platform = strings.TrimSpace(ev.Platform) + ev.UpstreamRequestID = strings.TrimSpace(ev.UpstreamRequestID) + ev.Kind = strings.TrimSpace(ev.Kind) + ev.Message = strings.TrimSpace(ev.Message) + ev.Detail = strings.TrimSpace(ev.Detail) + if ev.Message != "" { + ev.Message = sanitizeUpstreamErrorMessage(ev.Message) + } + + var existing []*OpsUpstreamErrorEvent + if v, ok := c.Get(OpsUpstreamErrorsKey); ok { + if arr, ok := v.([]*OpsUpstreamErrorEvent); ok { + existing = arr + } + } + + evCopy := ev + existing = append(existing, &evCopy) + c.Set(OpsUpstreamErrorsKey, existing) +} + +func getOpsUpstreamErrors(c *gin.Context) []*OpsUpstreamErrorEvent { + if c == nil { + return nil + } + if v, ok := c.Get(OpsUpstreamErrorsKey); ok { + if arr, ok := v.([]*OpsUpstreamErrorEvent); ok { + return arr + } + } + return nil +} + +func marshalOpsUpstreamErrors(events []*OpsUpstreamErrorEvent) *string { + if len(events) == 0 { + return nil + } + // Ensure we always store a valid JSON value. + raw, err := json.Marshal(events) + if err != nil || len(raw) == 0 { + return nil + } + s := string(raw) + return &s +} diff --git a/backend/internal/service/ratelimit_service.go b/backend/internal/service/ratelimit_service.go index f1362646..d570b92e 100644 --- a/backend/internal/service/ratelimit_service.go +++ b/backend/internal/service/ratelimit_service.go @@ -55,19 +55,36 @@ func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Acc } tempMatched := s.tryTempUnschedulable(ctx, account, statusCode, responseBody) + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(responseBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + if upstreamMsg != "" { + upstreamMsg = truncateForLog([]byte(upstreamMsg), 512) + } switch statusCode { case 401: // 认证失败:停止调度,记录错误 - s.handleAuthError(ctx, account, "Authentication failed (401): invalid or expired credentials") + msg := "Authentication failed (401): invalid or expired credentials" + if upstreamMsg != "" { + msg = "Authentication failed (401): " + upstreamMsg + } + s.handleAuthError(ctx, account, msg) shouldDisable = true case 402: // 支付要求:余额不足或计费问题,停止调度 - s.handleAuthError(ctx, account, "Payment required (402): insufficient balance or billing issue") + msg := "Payment required (402): insufficient balance or billing issue" + if upstreamMsg != "" { + msg = "Payment required (402): " + upstreamMsg + } + s.handleAuthError(ctx, account, msg) shouldDisable = true case 403: // 禁止访问:停止调度,记录错误 - s.handleAuthError(ctx, account, "Access forbidden (403): account may be suspended or lack permissions") + msg := "Access forbidden (403): account may be suspended or lack permissions" + if upstreamMsg != "" { + msg = "Access forbidden (403): " + upstreamMsg + } + s.handleAuthError(ctx, account, msg) shouldDisable = true case 429: s.handle429(ctx, account, headers)