feat(ops): 添加QPS脉搏线图并优化指标布局

- 添加实时QPS/TPS历史数据追踪(最近60个数据点)
- 在平均QPS/TPS上方添加SVG脉搏线图(sparkline)
- 将延迟和TTFT卡片的指标布局从2列改为3列
- 恢复Max指标显示(P95/P90/P50/Avg/Max)
This commit is contained in:
IanShaw027
2026-01-11 11:49:34 +08:00
parent 645609d441
commit 89a725a433
6 changed files with 255 additions and 21 deletions

View File

@@ -392,6 +392,39 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
CreatedAt: time.Now(),
}
// Capture upstream error context set by gateway services (if present).
// This does NOT affect the client response; it enriches Ops troubleshooting data.
{
if v, ok := c.Get(service.OpsUpstreamStatusCodeKey); ok {
switch t := v.(type) {
case int:
if t > 0 {
code := t
entry.UpstreamStatusCode = &code
}
case int64:
if t > 0 {
code := int(t)
entry.UpstreamStatusCode = &code
}
}
}
if v, ok := c.Get(service.OpsUpstreamErrorMessageKey); ok {
if s, ok := v.(string); ok {
if msg := strings.TrimSpace(s); msg != "" {
entry.UpstreamErrorMessage = &msg
}
}
}
if v, ok := c.Get(service.OpsUpstreamErrorDetailKey); ok {
if s, ok := v.(string); ok {
if detail := strings.TrimSpace(s); detail != "" {
entry.UpstreamErrorDetail = &detail
}
}
}
}
if apiKey != nil {
entry.APIKeyID = &apiKey.ID
if apiKey.User != nil {

View File

@@ -1399,7 +1399,17 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
return nil, fmt.Errorf("upstream request failed: %w", err)
// Ensure the client receives an error response (handlers assume Forward writes on non-failover errors).
safeErr := sanitizeUpstreamErrorMessage(err.Error())
setOpsUpstreamError(c, 0, safeErr, "")
c.JSON(http.StatusBadGateway, gin.H{
"type": "error",
"error": gin.H{
"type": "upstream_error",
"message": "Upstream request failed",
},
})
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
}
// 优先检测thinking block签名错误400并重试一次
@@ -1859,7 +1869,21 @@ func extractUpstreamErrorMessage(body []byte) string {
}
func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*ForwardResult, error) {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
// Enrich Ops error logs with upstream status + message, and optionally a truncated body snippet.
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(body), maxBytes)
}
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
// 处理上游错误,标记账号状态
shouldDisable := false
@@ -1870,24 +1894,33 @@ func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Res
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
}
// 记录上游错误响应体摘要便于排障(可选:由配置控制;不回显到客户端)
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf(
"Upstream error %d (account=%d platform=%s type=%s): %s",
resp.StatusCode,
account.ID,
account.Platform,
account.Type,
truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes),
)
}
// 根据状态码返回适当的自定义错误响应(不透传上游详细信息)
var errType, errMsg string
var statusCode int
switch resp.StatusCode {
case 400:
// 仅记录上游错误摘要(避免输出请求内容);需要时可通过配置打开
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf(
"Upstream 400 error (account=%d platform=%s type=%s): %s",
account.ID,
account.Platform,
account.Type,
truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes),
)
}
c.Data(http.StatusBadRequest, "application/json", body)
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
summary := upstreamMsg
if summary == "" {
summary = truncateForLog(body, 512)
}
if summary == "" {
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
}
return nil, fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, summary)
case 401:
statusCode = http.StatusBadGateway
errType = "upstream_error"
@@ -1923,7 +1956,10 @@ func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Res
},
})
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
if upstreamMsg == "" {
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
}
return nil, fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, upstreamMsg)
}
func (s *GatewayService) handleRetryExhaustedSideEffects(ctx context.Context, resp *http.Response, account *Account) {
@@ -1949,8 +1985,36 @@ func (s *GatewayService) handleFailoverSideEffects(ctx context.Context, resp *ht
// OAuth 403标记账号异常
// API Key 未配置错误码:仅返回错误,不标记账号
func (s *GatewayService) handleRetryExhaustedError(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*ForwardResult, error) {
// Capture upstream error body before side-effects consume the stream.
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
resp.Body = io.NopCloser(bytes.NewReader(respBody))
s.handleRetryExhaustedSideEffects(ctx, resp, account)
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf(
"Upstream error %d retries_exhausted (account=%d platform=%s type=%s): %s",
resp.StatusCode,
account.ID,
account.Platform,
account.Type,
truncateForLog(respBody, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes),
)
}
// 返回统一的重试耗尽错误响应
c.JSON(http.StatusBadGateway, gin.H{
"type": "error",
@@ -1960,7 +2024,10 @@ func (s *GatewayService) handleRetryExhaustedError(ctx context.Context, resp *ht
},
})
return nil, fmt.Errorf("upstream error: %d (retries exhausted)", resp.StatusCode)
if upstreamMsg == "" {
return nil, fmt.Errorf("upstream error: %d (retries exhausted)", resp.StatusCode)
}
return nil, fmt.Errorf("upstream error: %d (retries exhausted) message=%s", resp.StatusCode, upstreamMsg)
}
// streamingResult 流式响应结果

View File

@@ -587,7 +587,16 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco
// Send request
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
return nil, fmt.Errorf("upstream request failed: %w", err)
// Ensure the client receives an error response (handlers assume Forward writes on non-failover errors).
safeErr := sanitizeUpstreamErrorMessage(err.Error())
setOpsUpstreamError(c, 0, safeErr, "")
c.JSON(http.StatusBadGateway, gin.H{
"error": gin.H{
"type": "upstream_error",
"message": "Upstream request failed",
},
})
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
}
defer func() { _ = resp.Body.Close() }()
@@ -707,7 +716,30 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin.
}
func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*OpenAIForwardResult, error) {
body, _ := io.ReadAll(resp.Body)
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(body), maxBytes)
}
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
log.Printf(
"OpenAI upstream error %d (account=%d platform=%s type=%s): %s",
resp.StatusCode,
account.ID,
account.Platform,
account.Type,
truncateForLog(body, s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes),
)
}
// Check custom error codes
if !account.ShouldHandleErrorCode(resp.StatusCode) {
@@ -717,7 +749,10 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht
"message": "Upstream gateway error",
},
})
return nil, fmt.Errorf("upstream error: %d (not in custom error codes)", resp.StatusCode)
if upstreamMsg == "" {
return nil, fmt.Errorf("upstream error: %d (not in custom error codes)", resp.StatusCode)
}
return nil, fmt.Errorf("upstream error: %d (not in custom error codes) message=%s", resp.StatusCode, upstreamMsg)
}
// Handle upstream error (mark account status)
@@ -763,7 +798,10 @@ func (s *OpenAIGatewayService) handleErrorResponse(ctx context.Context, resp *ht
},
})
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
if upstreamMsg == "" {
return nil, fmt.Errorf("upstream error: %d", resp.StatusCode)
}
return nil, fmt.Errorf("upstream error: %d message=%s", resp.StatusCode, upstreamMsg)
}
// openaiStreamingResult streaming response result

View File

@@ -135,6 +135,34 @@ func (s *OpsService) RecordError(ctx context.Context, entry *OpsInsertErrorLogIn
entry.ErrorBody = sanitized
}
// Sanitize upstream error context if provided by gateway services.
if entry.UpstreamStatusCode != nil && *entry.UpstreamStatusCode <= 0 {
entry.UpstreamStatusCode = nil
}
if entry.UpstreamErrorMessage != nil {
msg := strings.TrimSpace(*entry.UpstreamErrorMessage)
msg = sanitizeUpstreamErrorMessage(msg)
msg = truncateString(msg, 2048)
if strings.TrimSpace(msg) == "" {
entry.UpstreamErrorMessage = nil
} else {
entry.UpstreamErrorMessage = &msg
}
}
if entry.UpstreamErrorDetail != nil {
detail := strings.TrimSpace(*entry.UpstreamErrorDetail)
if detail == "" {
entry.UpstreamErrorDetail = nil
} else {
sanitized, _ := sanitizeErrorBodyForStorage(detail, opsMaxStoredErrorBodyBytes)
if strings.TrimSpace(sanitized) == "" {
entry.UpstreamErrorDetail = nil
} else {
entry.UpstreamErrorDetail = &sanitized
}
}
}
if _, err := s.opsRepo.InsertErrorLog(ctx, entry); err != nil {
// Never bubble up to gateway; best-effort logging.
log.Printf("[Ops] RecordError failed: %v", err)

View File

@@ -0,0 +1,31 @@
package service
import (
"strings"
"github.com/gin-gonic/gin"
)
// Gin context keys used by Ops error logger for capturing upstream error details.
// These keys are set by gateway services and consumed by handler/ops_error_logger.go.
const (
OpsUpstreamStatusCodeKey = "ops_upstream_status_code"
OpsUpstreamErrorMessageKey = "ops_upstream_error_message"
OpsUpstreamErrorDetailKey = "ops_upstream_error_detail"
)
func setOpsUpstreamError(c *gin.Context, upstreamStatusCode int, upstreamMessage, upstreamDetail string) {
if c == nil {
return
}
if upstreamStatusCode > 0 {
c.Set(OpsUpstreamStatusCodeKey, upstreamStatusCode)
}
if msg := strings.TrimSpace(upstreamMessage); msg != "" {
c.Set(OpsUpstreamErrorMessageKey, msg)
}
if detail := strings.TrimSpace(upstreamDetail); detail != "" {
c.Set(OpsUpstreamErrorDetailKey, detail)
}
}

View File

@@ -162,6 +162,25 @@ const displayRealTimeTps = computed(() => {
return typeof v === 'number' && Number.isFinite(v) ? v : 0
})
// Sparkline history (keep last 60 data points)
const qpsHistory = ref<number[]>([])
const tpsHistory = ref<number[]>([])
const MAX_HISTORY_POINTS = 60
watch([displayRealTimeQps, displayRealTimeTps], ([newQps, newTps]) => {
// Add new data points
qpsHistory.value.push(newQps)
tpsHistory.value.push(newTps)
// Keep only last N points
if (qpsHistory.value.length > MAX_HISTORY_POINTS) {
qpsHistory.value.shift()
}
if (tpsHistory.value.length > MAX_HISTORY_POINTS) {
tpsHistory.value.shift()
}
})
const qpsPeakLabel = computed(() => {
const v = overview.value?.qps?.peak
if (typeof v !== 'number') return '-'
@@ -866,6 +885,16 @@ function openJobsDetails() {
<!-- Average QPS/TPS -->
<div class="col-span-2">
<!-- Sparkline -->
<svg v-if="qpsHistory.length > 1" class="mb-2 h-8 w-full" viewBox="0 0 200 32" preserveAspectRatio="none">
<polyline
:points="qpsHistory.map((v, i) => `${(i / (qpsHistory.length - 1)) * 200},${32 - (v / Math.max(...qpsHistory, 1)) * 28}`).join(' ')"
fill="none"
stroke="currentColor"
stroke-width="2"
class="text-blue-500"
/>
</svg>
<div class="text-[10px] font-bold uppercase text-gray-400">{{ t('admin.ops.average') }}</div>
<div class="mt-1 flex items-center gap-4 text-sm font-medium text-gray-600 dark:text-gray-400">
<span>QPS: <span class="font-bold">{{ qpsAvgLabel }}</span></span>
@@ -974,7 +1003,7 @@ function openJobsDetails() {
</div>
<span class="text-xs font-bold text-gray-400">ms (P99)</span>
</div>
<div class="mt-3 grid grid-cols-2 gap-x-4 gap-y-1 text-xs">
<div class="mt-3 grid grid-cols-3 gap-x-3 gap-y-1 text-xs">
<div class="flex justify-between">
<span class="text-gray-500">P95:</span>
<span class="font-bold" :class="getLatencyColor(durationP95Ms)">{{ durationP95Ms ?? '-' }}ms</span>
@@ -991,6 +1020,10 @@ function openJobsDetails() {
<span class="text-gray-500">Avg:</span>
<span class="font-bold" :class="getLatencyColor(durationAvgMs)">{{ durationAvgMs ?? '-' }}ms</span>
</div>
<div class="flex justify-between">
<span class="text-gray-500">Max:</span>
<span class="font-bold" :class="getLatencyColor(durationMaxMs)">{{ durationMaxMs ?? '-' }}ms</span>
</div>
</div>
</div>
@@ -1015,7 +1048,7 @@ function openJobsDetails() {
</div>
<span class="text-xs font-bold text-gray-400">ms (P99)</span>
</div>
<div class="mt-3 grid grid-cols-2 gap-x-4 gap-y-1 text-xs">
<div class="mt-3 grid grid-cols-3 gap-x-3 gap-y-1 text-xs">
<div class="flex justify-between">
<span class="text-gray-500">P95:</span>
<span class="font-bold" :class="getLatencyColor(ttftP95Ms)">{{ ttftP95Ms ?? '-' }}ms</span>
@@ -1032,6 +1065,10 @@ function openJobsDetails() {
<span class="text-gray-500">Avg:</span>
<span class="font-bold" :class="getLatencyColor(ttftAvgMs)">{{ ttftAvgMs ?? '-' }}ms</span>
</div>
<div class="flex justify-between">
<span class="text-gray-500">Max:</span>
<span class="font-bold" :class="getLatencyColor(ttftMaxMs)">{{ ttftMaxMs ?? '-' }}ms</span>
</div>
</div>
</div>