From 89a725a433988d4dcf7184019a76a2b89f701687 Mon Sep 17 00:00:00 2001 From: IanShaw027 <131567472+IanShaw027@users.noreply.github.com> Date: Sun, 11 Jan 2026 11:49:34 +0800 Subject: [PATCH] =?UTF-8?q?feat(ops):=20=E6=B7=BB=E5=8A=A0QPS=E8=84=89?= =?UTF-8?q?=E6=90=8F=E7=BA=BF=E5=9B=BE=E5=B9=B6=E4=BC=98=E5=8C=96=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E5=B8=83=E5=B1=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加实时QPS/TPS历史数据追踪(最近60个数据点) - 在平均QPS/TPS上方添加SVG脉搏线图(sparkline) - 将延迟和TTFT卡片的指标布局从2列改为3列 - 恢复Max指标显示(P95/P90/P50/Avg/Max) --- backend/internal/handler/ops_error_logger.go | 33 +++++++ backend/internal/service/gateway_service.go | 97 ++++++++++++++++--- .../service/openai_gateway_service.go | 46 ++++++++- backend/internal/service/ops_service.go | 28 ++++++ .../internal/service/ops_upstream_context.go | 31 ++++++ .../ops/components/OpsDashboardHeader.vue | 41 +++++++- 6 files changed, 255 insertions(+), 21 deletions(-) create mode 100644 backend/internal/service/ops_upstream_context.go diff --git a/backend/internal/handler/ops_error_logger.go b/backend/internal/handler/ops_error_logger.go index b3a90c2f..5e692cdf 100644 --- a/backend/internal/handler/ops_error_logger.go +++ b/backend/internal/handler/ops_error_logger.go @@ -392,6 +392,39 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { CreatedAt: time.Now(), } + // Capture upstream error context set by gateway services (if present). + // This does NOT affect the client response; it enriches Ops troubleshooting data. + { + if v, ok := c.Get(service.OpsUpstreamStatusCodeKey); ok { + switch t := v.(type) { + case int: + if t > 0 { + code := t + entry.UpstreamStatusCode = &code + } + case int64: + if t > 0 { + code := int(t) + entry.UpstreamStatusCode = &code + } + } + } + if v, ok := c.Get(service.OpsUpstreamErrorMessageKey); ok { + if s, ok := v.(string); ok { + if msg := strings.TrimSpace(s); msg != "" { + entry.UpstreamErrorMessage = &msg + } + } + } + if v, ok := c.Get(service.OpsUpstreamErrorDetailKey); ok { + if s, ok := v.(string); ok { + if detail := strings.TrimSpace(s); detail != "" { + entry.UpstreamErrorDetail = &detail + } + } + } + } + if apiKey != nil { entry.APIKeyID = &apiKey.ID if apiKey.User != nil { diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 31148b17..a2b74a15 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -1399,7 +1399,17 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A if resp != nil && resp.Body != nil { _ = resp.Body.Close() } - return nil, fmt.Errorf("upstream request failed: %w", err) + // Ensure the client receives an error response (handlers assume Forward writes on non-failover errors). + safeErr := sanitizeUpstreamErrorMessage(err.Error()) + setOpsUpstreamError(c, 0, safeErr, "") + c.JSON(http.StatusBadGateway, gin.H{ + "type": "error", + "error": gin.H{ + "type": "upstream_error", + "message": "Upstream request failed", + }, + }) + return nil, fmt.Errorf("upstream request failed: %s", safeErr) } // 优先检测thinking block签名错误(400)并重试一次 @@ -1859,7 +1869,21 @@ func extractUpstreamErrorMessage(body []byte) string { } func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*ForwardResult, error) { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + + // Enrich Ops error logs with upstream status + message, and optionally a truncated body snippet. + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(body), maxBytes) + } + setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) // 处理上游错误,标记账号状态 shouldDisable := false @@ -1870,24 +1894,33 @@ func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Res return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} } + // 记录上游错误响应体摘要便于排障(可选:由配置控制;不回显到客户端) + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + log.Printf( + "Upstream error %d (account=%d platform=%s type=%s): %s", + resp.StatusCode, + account.ID, + account.Platform, + account.Type, + truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes), + ) + } + // 根据状态码返回适当的自定义错误响应(不透传上游详细信息) var errType, errMsg string var statusCode int switch resp.StatusCode { case 400: - // 仅记录上游错误摘要(避免输出请求内容);需要时可通过配置打开 - if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { - log.Printf( - "Upstream 400 error (account=%d platform=%s type=%s): %s", - account.ID, - account.Platform, - account.Type, - truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes), - ) - } c.Data(http.StatusBadRequest, "application/json", body) - return nil, fmt.Errorf("upstream error: %d", resp.StatusCode) + summary := upstreamMsg + if summary == "" { + summary = truncateForLog(body, 512) + } + if summary == "" { + return nil, fmt.Errorf("upstream error: %d", resp.StatusCode) + } + return nil, fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, summary) case 401: statusCode = http.StatusBadGateway errType = "upstream_error" @@ -1923,7 +1956,10 @@ func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Res }, }) - return nil, fmt.Errorf("upstream error: %d", resp.StatusCode) + if upstreamMsg == "" { + return nil, fmt.Errorf("upstream error: %d", resp.StatusCode) + } + return nil, fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, upstreamMsg) } func (s *GatewayService) handleRetryExhaustedSideEffects(ctx context.Context, resp *http.Response, account *Account) { @@ -1949,8 +1985,36 @@ func (s *GatewayService) handleFailoverSideEffects(ctx context.Context, resp *ht // OAuth 403:标记账号异常 // API Key 未配置错误码:仅返回错误,不标记账号 func (s *GatewayService) handleRetryExhaustedError(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*ForwardResult, error) { + // Capture upstream error body before side-effects consume the stream. + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + _ = resp.Body.Close() + resp.Body = io.NopCloser(bytes.NewReader(respBody)) + s.handleRetryExhaustedSideEffects(ctx, resp, account) + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + log.Printf( + "Upstream error %d retries_exhausted (account=%d platform=%s type=%s): %s", + resp.StatusCode, + account.ID, + account.Platform, + account.Type, + truncateForLog(respBody, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes), + ) + } + // 返回统一的重试耗尽错误响应 c.JSON(http.StatusBadGateway, gin.H{ "type": "error", @@ -1960,7 +2024,10 @@ func (s *GatewayService) handleRetryExhaustedError(ctx context.Context, resp *ht }, }) - return nil, fmt.Errorf("upstream error: %d (retries exhausted)", resp.StatusCode) + if upstreamMsg == "" { + return nil, fmt.Errorf("upstream error: %d (retries exhausted)", resp.StatusCode) + } + return nil, fmt.Errorf("upstream error: %d (retries exhausted) message=%s", resp.StatusCode, upstreamMsg) } // streamingResult 流式响应结果 diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 9d365ad6..c8d133df 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -587,7 +587,16 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco // Send request resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) if err != nil { - return nil, fmt.Errorf("upstream request failed: %w", err) + // Ensure the client receives an error response (handlers assume Forward writes on non-failover errors). + safeErr := sanitizeUpstreamErrorMessage(err.Error()) + setOpsUpstreamError(c, 0, safeErr, "") + c.JSON(http.StatusBadGateway, gin.H{ + "error": gin.H{ + "type": "upstream_error", + "message": "Upstream request failed", + }, + }) + return nil, fmt.Errorf("upstream request failed: %s", safeErr) } defer func() { _ = resp.Body.Close() }() @@ -707,7 +716,30 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin. } func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*OpenAIForwardResult, error) { - body, _ := io.ReadAll(resp.Body) + body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) + + upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body)) + upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(body), maxBytes) + } + setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + log.Printf( + "OpenAI upstream error %d (account=%d platform=%s type=%s): %s", + resp.StatusCode, + account.ID, + account.Platform, + account.Type, + truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes), + ) + } // Check custom error codes if !account.ShouldHandleErrorCode(resp.StatusCode) { @@ -717,7 +749,10 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht "message": "Upstream gateway error", }, }) - return nil, fmt.Errorf("upstream error: %d (not in custom error codes)", resp.StatusCode) + if upstreamMsg == "" { + return nil, fmt.Errorf("upstream error: %d (not in custom error codes)", resp.StatusCode) + } + return nil, fmt.Errorf("upstream error: %d (not in custom error codes) message=%s", resp.StatusCode, upstreamMsg) } // Handle upstream error (mark account status) @@ -763,7 +798,10 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht }, }) - return nil, fmt.Errorf("upstream error: %d", resp.StatusCode) + if upstreamMsg == "" { + return nil, fmt.Errorf("upstream error: %d", resp.StatusCode) + } + return nil, fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, upstreamMsg) } // openaiStreamingResult streaming response result diff --git a/backend/internal/service/ops_service.go b/backend/internal/service/ops_service.go index 169c523a..c9cccdc7 100644 --- a/backend/internal/service/ops_service.go +++ b/backend/internal/service/ops_service.go @@ -135,6 +135,34 @@ func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogIn entry.ErrorBody = sanitized } + // Sanitize upstream error context if provided by gateway services. + if entry.UpstreamStatusCode != nil && *entry.UpstreamStatusCode <= 0 { + entry.UpstreamStatusCode = nil + } + if entry.UpstreamErrorMessage != nil { + msg := strings.TrimSpace(*entry.UpstreamErrorMessage) + msg = sanitizeUpstreamErrorMessage(msg) + msg = truncateString(msg, 2048) + if strings.TrimSpace(msg) == "" { + entry.UpstreamErrorMessage = nil + } else { + entry.UpstreamErrorMessage = &msg + } + } + if entry.UpstreamErrorDetail != nil { + detail := strings.TrimSpace(*entry.UpstreamErrorDetail) + if detail == "" { + entry.UpstreamErrorDetail = nil + } else { + sanitized, _ := sanitizeErrorBodyForStorage(detail, opsMaxStoredErrorBodyBytes) + if strings.TrimSpace(sanitized) == "" { + entry.UpstreamErrorDetail = nil + } else { + entry.UpstreamErrorDetail = &sanitized + } + } + } + if _, err := s.opsRepo.InsertErrorLog(ctx, entry); err != nil { // Never bubble up to gateway; best-effort logging. log.Printf("[Ops] RecordError failed: %v", err) diff --git a/backend/internal/service/ops_upstream_context.go b/backend/internal/service/ops_upstream_context.go new file mode 100644 index 00000000..70e8f6af --- /dev/null +++ b/backend/internal/service/ops_upstream_context.go @@ -0,0 +1,31 @@ +package service + +import ( + "strings" + + "github.com/gin-gonic/gin" +) + +// Gin context keys used by Ops error logger for capturing upstream error details. +// These keys are set by gateway services and consumed by handler/ops_error_logger.go. +const ( + OpsUpstreamStatusCodeKey = "ops_upstream_status_code" + OpsUpstreamErrorMessageKey = "ops_upstream_error_message" + OpsUpstreamErrorDetailKey = "ops_upstream_error_detail" +) + +func setOpsUpstreamError(c *gin.Context, upstreamStatusCode int, upstreamMessage, upstreamDetail string) { + if c == nil { + return + } + if upstreamStatusCode > 0 { + c.Set(OpsUpstreamStatusCodeKey, upstreamStatusCode) + } + if msg := strings.TrimSpace(upstreamMessage); msg != "" { + c.Set(OpsUpstreamErrorMessageKey, msg) + } + if detail := strings.TrimSpace(upstreamDetail); detail != "" { + c.Set(OpsUpstreamErrorDetailKey, detail) + } +} + diff --git a/frontend/src/views/admin/ops/components/OpsDashboardHeader.vue b/frontend/src/views/admin/ops/components/OpsDashboardHeader.vue index 20e6dcd3..35eeb59c 100644 --- a/frontend/src/views/admin/ops/components/OpsDashboardHeader.vue +++ b/frontend/src/views/admin/ops/components/OpsDashboardHeader.vue @@ -162,6 +162,25 @@ const displayRealTimeTps = computed(() => { return typeof v === 'number' && Number.isFinite(v) ? v : 0 }) +// Sparkline history (keep last 60 data points) +const qpsHistory = ref([]) +const tpsHistory = ref([]) +const MAX_HISTORY_POINTS = 60 + +watch([displayRealTimeQps, displayRealTimeTps], ([newQps, newTps]) => { + // Add new data points + qpsHistory.value.push(newQps) + tpsHistory.value.push(newTps) + + // Keep only last N points + if (qpsHistory.value.length > MAX_HISTORY_POINTS) { + qpsHistory.value.shift() + } + if (tpsHistory.value.length > MAX_HISTORY_POINTS) { + tpsHistory.value.shift() + } +}) + const qpsPeakLabel = computed(() => { const v = overview.value?.qps?.peak if (typeof v !== 'number') return '-' @@ -866,6 +885,16 @@ function openJobsDetails() {
+ + + +
{{ t('admin.ops.average') }}
QPS: {{ qpsAvgLabel }} @@ -974,7 +1003,7 @@ function openJobsDetails() {
ms (P99)
-
+
P95: {{ durationP95Ms ?? '-' }}ms @@ -991,6 +1020,10 @@ function openJobsDetails() { Avg: {{ durationAvgMs ?? '-' }}ms
+
+ Max: + {{ durationMaxMs ?? '-' }}ms +
@@ -1015,7 +1048,7 @@ function openJobsDetails() { ms (P99) -
+
P95: {{ ttftP95Ms ?? '-' }}ms @@ -1032,6 +1065,10 @@ function openJobsDetails() { Avg: {{ ttftAvgMs ?? '-' }}ms
+
+ Max: + {{ ttftMaxMs ?? '-' }}ms +