Merge pull request #251 from IanShaw027/fix/ops-bugs
feat(ops): 运维看板功能增强 - 实时流量监控与指标阈值配置
This commit is contained in:
@@ -118,3 +118,96 @@ func (h *OpsHandler) GetAccountAvailability(c *gin.Context) {
|
||||
}
|
||||
response.Success(c, payload)
|
||||
}
|
||||
|
||||
func parseOpsRealtimeWindow(v string) (time.Duration, string, bool) {
|
||||
switch strings.ToLower(strings.TrimSpace(v)) {
|
||||
case "", "1min", "1m":
|
||||
return 1 * time.Minute, "1min", true
|
||||
case "5min", "5m":
|
||||
return 5 * time.Minute, "5min", true
|
||||
case "30min", "30m":
|
||||
return 30 * time.Minute, "30min", true
|
||||
case "1h", "60m", "60min":
|
||||
return 1 * time.Hour, "1h", true
|
||||
default:
|
||||
return 0, "", false
|
||||
}
|
||||
}
|
||||
|
||||
// GetRealtimeTrafficSummary returns QPS/TPS current/peak/avg for the selected window.
|
||||
// GET /api/v1/admin/ops/realtime-traffic
|
||||
//
|
||||
// Query params:
|
||||
// - window: 1min|5min|30min|1h (default: 1min)
|
||||
// - platform: optional
|
||||
// - group_id: optional
|
||||
func (h *OpsHandler) GetRealtimeTrafficSummary(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
windowDur, windowLabel, ok := parseOpsRealtimeWindow(c.Query("window"))
|
||||
if !ok {
|
||||
response.BadRequest(c, "Invalid window")
|
||||
return
|
||||
}
|
||||
|
||||
platform := strings.TrimSpace(c.Query("platform"))
|
||||
var groupID *int64
|
||||
if v := strings.TrimSpace(c.Query("group_id")); v != "" {
|
||||
id, err := strconv.ParseInt(v, 10, 64)
|
||||
if err != nil || id <= 0 {
|
||||
response.BadRequest(c, "Invalid group_id")
|
||||
return
|
||||
}
|
||||
groupID = &id
|
||||
}
|
||||
|
||||
endTime := time.Now().UTC()
|
||||
startTime := endTime.Add(-windowDur)
|
||||
|
||||
if !h.opsService.IsRealtimeMonitoringEnabled(c.Request.Context()) {
|
||||
disabledSummary := &service.OpsRealtimeTrafficSummary{
|
||||
Window: windowLabel,
|
||||
StartTime: startTime,
|
||||
EndTime: endTime,
|
||||
Platform: platform,
|
||||
GroupID: groupID,
|
||||
QPS: service.OpsRateSummary{},
|
||||
TPS: service.OpsRateSummary{},
|
||||
}
|
||||
response.Success(c, gin.H{
|
||||
"enabled": false,
|
||||
"summary": disabledSummary,
|
||||
"timestamp": endTime,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
filter := &service.OpsDashboardFilter{
|
||||
StartTime: startTime,
|
||||
EndTime: endTime,
|
||||
Platform: platform,
|
||||
GroupID: groupID,
|
||||
QueryMode: service.OpsQueryModeRaw,
|
||||
}
|
||||
|
||||
summary, err := h.opsService.GetRealtimeTrafficSummary(c.Request.Context(), filter)
|
||||
if err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
if summary != nil {
|
||||
summary.Window = windowLabel
|
||||
}
|
||||
response.Success(c, gin.H{
|
||||
"enabled": true,
|
||||
"summary": summary,
|
||||
"timestamp": endTime,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -146,3 +146,49 @@ func (h *OpsHandler) UpdateAdvancedSettings(c *gin.Context) {
|
||||
}
|
||||
response.Success(c, updated)
|
||||
}
|
||||
|
||||
// GetMetricThresholds returns Ops metric thresholds (DB-backed).
|
||||
// GET /api/v1/admin/ops/settings/metric-thresholds
|
||||
func (h *OpsHandler) GetMetricThresholds(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
cfg, err := h.opsService.GetMetricThresholds(c.Request.Context())
|
||||
if err != nil {
|
||||
response.Error(c, http.StatusInternalServerError, "Failed to get metric thresholds")
|
||||
return
|
||||
}
|
||||
response.Success(c, cfg)
|
||||
}
|
||||
|
||||
// UpdateMetricThresholds updates Ops metric thresholds (DB-backed).
|
||||
// PUT /api/v1/admin/ops/settings/metric-thresholds
|
||||
func (h *OpsHandler) UpdateMetricThresholds(c *gin.Context) {
|
||||
if h.opsService == nil {
|
||||
response.Error(c, http.StatusServiceUnavailable, "Ops service not available")
|
||||
return
|
||||
}
|
||||
if err := h.opsService.RequireMonitoringEnabled(c.Request.Context()); err != nil {
|
||||
response.ErrorFrom(c, err)
|
||||
return
|
||||
}
|
||||
|
||||
var req service.OpsMetricThresholds
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
response.BadRequest(c, "Invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
updated, err := h.opsService.UpdateMetricThresholds(c.Request.Context(), &req)
|
||||
if err != nil {
|
||||
response.Error(c, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
response.Success(c, updated)
|
||||
}
|
||||
|
||||
129
backend/internal/repository/ops_repo_realtime_traffic.go
Normal file
129
backend/internal/repository/ops_repo_realtime_traffic.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
)
|
||||
|
||||
func (r *opsRepository) GetRealtimeTrafficSummary(ctx context.Context, filter *service.OpsDashboardFilter) (*service.OpsRealtimeTrafficSummary, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return nil, fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if filter == nil {
|
||||
return nil, fmt.Errorf("nil filter")
|
||||
}
|
||||
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
|
||||
return nil, fmt.Errorf("start_time/end_time required")
|
||||
}
|
||||
|
||||
start := filter.StartTime.UTC()
|
||||
end := filter.EndTime.UTC()
|
||||
if start.After(end) {
|
||||
return nil, fmt.Errorf("start_time must be <= end_time")
|
||||
}
|
||||
|
||||
window := end.Sub(start)
|
||||
if window <= 0 {
|
||||
return nil, fmt.Errorf("invalid time window")
|
||||
}
|
||||
if window > time.Hour {
|
||||
return nil, fmt.Errorf("window too large")
|
||||
}
|
||||
|
||||
usageJoin, usageWhere, usageArgs, next := buildUsageWhere(filter, start, end, 1)
|
||||
errorWhere, errorArgs, _ := buildErrorWhere(filter, start, end, next)
|
||||
|
||||
q := `
|
||||
WITH usage_buckets AS (
|
||||
SELECT
|
||||
date_trunc('minute', ul.created_at) AS bucket,
|
||||
COALESCE(COUNT(*), 0) AS success_count,
|
||||
COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS token_sum
|
||||
FROM usage_logs ul
|
||||
` + usageJoin + `
|
||||
` + usageWhere + `
|
||||
GROUP BY 1
|
||||
),
|
||||
error_buckets AS (
|
||||
SELECT
|
||||
date_trunc('minute', created_at) AS bucket,
|
||||
COALESCE(COUNT(*), 0) AS error_count
|
||||
FROM ops_error_logs
|
||||
` + errorWhere + `
|
||||
AND COALESCE(status_code, 0) >= 400
|
||||
GROUP BY 1
|
||||
),
|
||||
combined AS (
|
||||
SELECT
|
||||
COALESCE(u.bucket, e.bucket) AS bucket,
|
||||
COALESCE(u.success_count, 0) AS success_count,
|
||||
COALESCE(u.token_sum, 0) AS token_sum,
|
||||
COALESCE(e.error_count, 0) AS error_count,
|
||||
COALESCE(u.success_count, 0) + COALESCE(e.error_count, 0) AS request_total
|
||||
FROM usage_buckets u
|
||||
FULL OUTER JOIN error_buckets e ON u.bucket = e.bucket
|
||||
)
|
||||
SELECT
|
||||
COALESCE(SUM(success_count), 0) AS success_total,
|
||||
COALESCE(SUM(error_count), 0) AS error_total,
|
||||
COALESCE(SUM(token_sum), 0) AS token_total,
|
||||
COALESCE(MAX(request_total), 0) AS peak_requests_per_min,
|
||||
COALESCE(MAX(token_sum), 0) AS peak_tokens_per_min
|
||||
FROM combined`
|
||||
|
||||
args := append(usageArgs, errorArgs...)
|
||||
var successCount int64
|
||||
var errorTotal int64
|
||||
var tokenConsumed int64
|
||||
var peakRequestsPerMin int64
|
||||
var peakTokensPerMin int64
|
||||
if err := r.db.QueryRowContext(ctx, q, args...).Scan(
|
||||
&successCount,
|
||||
&errorTotal,
|
||||
&tokenConsumed,
|
||||
&peakRequestsPerMin,
|
||||
&peakTokensPerMin,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
windowSeconds := window.Seconds()
|
||||
if windowSeconds <= 0 {
|
||||
windowSeconds = 1
|
||||
}
|
||||
|
||||
requestCountTotal := successCount + errorTotal
|
||||
qpsAvg := roundTo1DP(float64(requestCountTotal) / windowSeconds)
|
||||
tpsAvg := roundTo1DP(float64(tokenConsumed) / windowSeconds)
|
||||
|
||||
// Keep "current" consistent with the dashboard overview semantics: last 1 minute.
|
||||
// This remains "within the selected window" since end=start+window.
|
||||
qpsCurrent, tpsCurrent, err := r.queryCurrentRates(ctx, filter, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
qpsPeak := roundTo1DP(float64(peakRequestsPerMin) / 60.0)
|
||||
tpsPeak := roundTo1DP(float64(peakTokensPerMin) / 60.0)
|
||||
|
||||
return &service.OpsRealtimeTrafficSummary{
|
||||
StartTime: start,
|
||||
EndTime: end,
|
||||
Platform: strings.TrimSpace(filter.Platform),
|
||||
GroupID: filter.GroupID,
|
||||
QPS: service.OpsRateSummary{
|
||||
Current: qpsCurrent,
|
||||
Peak: qpsPeak,
|
||||
Avg: qpsAvg,
|
||||
},
|
||||
TPS: service.OpsRateSummary{
|
||||
Current: tpsCurrent,
|
||||
Peak: tpsPeak,
|
||||
Avg: tpsAvg,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
@@ -73,6 +73,7 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
|
||||
// Realtime ops signals
|
||||
ops.GET("/concurrency", h.Admin.Ops.GetConcurrencyStats)
|
||||
ops.GET("/account-availability", h.Admin.Ops.GetAccountAvailability)
|
||||
ops.GET("/realtime-traffic", h.Admin.Ops.GetRealtimeTrafficSummary)
|
||||
|
||||
// Alerts (rules + events)
|
||||
ops.GET("/alert-rules", h.Admin.Ops.ListAlertRules)
|
||||
@@ -96,6 +97,13 @@ func registerOpsRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
|
||||
ops.GET("/advanced-settings", h.Admin.Ops.GetAdvancedSettings)
|
||||
ops.PUT("/advanced-settings", h.Admin.Ops.UpdateAdvancedSettings)
|
||||
|
||||
// Settings group (DB-backed)
|
||||
settings := ops.Group("/settings")
|
||||
{
|
||||
settings.GET("/metric-thresholds", h.Admin.Ops.GetMetricThresholds)
|
||||
settings.PUT("/metric-thresholds", h.Admin.Ops.UpdateMetricThresholds)
|
||||
}
|
||||
|
||||
// WebSocket realtime (QPS/TPS)
|
||||
ws := ops.Group("/ws")
|
||||
{
|
||||
|
||||
@@ -17,6 +17,8 @@ type OpsRepository interface {
|
||||
|
||||
// Lightweight window stats (for realtime WS / quick sampling).
|
||||
GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error)
|
||||
// Lightweight realtime traffic summary (for the Ops dashboard header card).
|
||||
GetRealtimeTrafficSummary(ctx context.Context, filter *OpsDashboardFilter) (*OpsRealtimeTrafficSummary, error)
|
||||
|
||||
GetDashboardOverview(ctx context.Context, filter *OpsDashboardFilter) (*OpsDashboardOverview, error)
|
||||
GetThroughputTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsThroughputTrendResponse, error)
|
||||
|
||||
36
backend/internal/service/ops_realtime_traffic.go
Normal file
36
backend/internal/service/ops_realtime_traffic.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
|
||||
)
|
||||
|
||||
// GetRealtimeTrafficSummary returns QPS/TPS current/peak/avg for the provided window.
|
||||
// This is used by the Ops dashboard "Realtime Traffic" card and is intentionally lightweight.
|
||||
func (s *OpsService) GetRealtimeTrafficSummary(ctx context.Context, filter *OpsDashboardFilter) (*OpsRealtimeTrafficSummary, error) {
|
||||
if err := s.RequireMonitoringEnabled(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.opsRepo == nil {
|
||||
return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
|
||||
}
|
||||
if filter == nil {
|
||||
return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
|
||||
}
|
||||
if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
|
||||
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
|
||||
}
|
||||
if filter.StartTime.After(filter.EndTime) {
|
||||
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
|
||||
}
|
||||
if filter.EndTime.Sub(filter.StartTime) > time.Hour {
|
||||
return nil, infraerrors.BadRequest("OPS_TIME_RANGE_TOO_LARGE", "invalid time range: max window is 1 hour")
|
||||
}
|
||||
|
||||
// Realtime traffic summary always uses raw logs (minute granularity peaks).
|
||||
filter.QueryMode = OpsQueryModeRaw
|
||||
|
||||
return s.opsRepo.GetRealtimeTrafficSummary(ctx, filter)
|
||||
}
|
||||
19
backend/internal/service/ops_realtime_traffic_models.go
Normal file
19
backend/internal/service/ops_realtime_traffic_models.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package service
|
||||
|
||||
import "time"
|
||||
|
||||
// OpsRealtimeTrafficSummary is a lightweight summary used by the Ops dashboard "Realtime Traffic" card.
|
||||
// It reports QPS/TPS current/peak/avg for the requested time window.
|
||||
type OpsRealtimeTrafficSummary struct {
|
||||
// Window is a normalized label (e.g. "1min", "5min", "30min", "1h").
|
||||
Window string `json:"window"`
|
||||
|
||||
StartTime time.Time `json:"start_time"`
|
||||
EndTime time.Time `json:"end_time"`
|
||||
|
||||
Platform string `json:"platform"`
|
||||
GroupID *int64 `json:"group_id"`
|
||||
|
||||
QPS OpsRateSummary `json:"qps"`
|
||||
TPS OpsRateSummary `json:"tps"`
|
||||
}
|
||||
@@ -463,3 +463,93 @@ func (s *OpsService) UpdateOpsAdvancedSettings(ctx context.Context, cfg *OpsAdva
|
||||
_ = json.Unmarshal(raw, updated)
|
||||
return updated, nil
|
||||
}
|
||||
|
||||
// =========================
|
||||
// Metric thresholds
|
||||
// =========================
|
||||
|
||||
const SettingKeyOpsMetricThresholds = "ops_metric_thresholds"
|
||||
|
||||
func defaultOpsMetricThresholds() *OpsMetricThresholds {
|
||||
slaMin := 99.5
|
||||
latencyMax := 2000.0
|
||||
ttftMax := 500.0
|
||||
reqErrMax := 5.0
|
||||
upstreamErrMax := 5.0
|
||||
return &OpsMetricThresholds{
|
||||
SLAPercentMin: &slaMin,
|
||||
LatencyP99MsMax: &latencyMax,
|
||||
TTFTp99MsMax: &ttftMax,
|
||||
RequestErrorRatePercentMax: &reqErrMax,
|
||||
UpstreamErrorRatePercentMax: &upstreamErrMax,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *OpsService) GetMetricThresholds(ctx context.Context) (*OpsMetricThresholds, error) {
|
||||
defaultCfg := defaultOpsMetricThresholds()
|
||||
if s == nil || s.settingRepo == nil {
|
||||
return defaultCfg, nil
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
raw, err := s.settingRepo.GetValue(ctx, SettingKeyOpsMetricThresholds)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrSettingNotFound) {
|
||||
if b, mErr := json.Marshal(defaultCfg); mErr == nil {
|
||||
_ = s.settingRepo.Set(ctx, SettingKeyOpsMetricThresholds, string(b))
|
||||
}
|
||||
return defaultCfg, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := &OpsMetricThresholds{}
|
||||
if err := json.Unmarshal([]byte(raw), cfg); err != nil {
|
||||
return defaultCfg, nil
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (s *OpsService) UpdateMetricThresholds(ctx context.Context, cfg *OpsMetricThresholds) (*OpsMetricThresholds, error) {
|
||||
if s == nil || s.settingRepo == nil {
|
||||
return nil, errors.New("setting repository not initialized")
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
if cfg == nil {
|
||||
return nil, errors.New("invalid config")
|
||||
}
|
||||
|
||||
// Validate thresholds
|
||||
if cfg.SLAPercentMin != nil && (*cfg.SLAPercentMin < 0 || *cfg.SLAPercentMin > 100) {
|
||||
return nil, errors.New("sla_percent_min must be between 0 and 100")
|
||||
}
|
||||
if cfg.LatencyP99MsMax != nil && *cfg.LatencyP99MsMax < 0 {
|
||||
return nil, errors.New("latency_p99_ms_max must be >= 0")
|
||||
}
|
||||
if cfg.TTFTp99MsMax != nil && *cfg.TTFTp99MsMax < 0 {
|
||||
return nil, errors.New("ttft_p99_ms_max must be >= 0")
|
||||
}
|
||||
if cfg.RequestErrorRatePercentMax != nil && (*cfg.RequestErrorRatePercentMax < 0 || *cfg.RequestErrorRatePercentMax > 100) {
|
||||
return nil, errors.New("request_error_rate_percent_max must be between 0 and 100")
|
||||
}
|
||||
if cfg.UpstreamErrorRatePercentMax != nil && (*cfg.UpstreamErrorRatePercentMax < 0 || *cfg.UpstreamErrorRatePercentMax > 100) {
|
||||
return nil, errors.New("upstream_error_rate_percent_max must be between 0 and 100")
|
||||
}
|
||||
|
||||
raw, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.settingRepo.Set(ctx, SettingKeyOpsMetricThresholds, string(raw)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
updated := &OpsMetricThresholds{}
|
||||
_ = json.Unmarshal(raw, updated)
|
||||
return updated, nil
|
||||
}
|
||||
|
||||
@@ -61,11 +61,20 @@ type OpsAlertSilencingSettings struct {
|
||||
Entries []OpsAlertSilenceEntry `json:"entries,omitempty"`
|
||||
}
|
||||
|
||||
type OpsMetricThresholds struct {
|
||||
SLAPercentMin *float64 `json:"sla_percent_min,omitempty"` // SLA低于此值变红
|
||||
LatencyP99MsMax *float64 `json:"latency_p99_ms_max,omitempty"` // 延迟P99高于此值变红
|
||||
TTFTp99MsMax *float64 `json:"ttft_p99_ms_max,omitempty"` // TTFT P99高于此值变红
|
||||
RequestErrorRatePercentMax *float64 `json:"request_error_rate_percent_max,omitempty"` // 请求错误率高于此值变红
|
||||
UpstreamErrorRatePercentMax *float64 `json:"upstream_error_rate_percent_max,omitempty"` // 上游错误率高于此值变红
|
||||
}
|
||||
|
||||
type OpsAlertRuntimeSettings struct {
|
||||
EvaluationIntervalSeconds int `json:"evaluation_interval_seconds"`
|
||||
|
||||
DistributedLock OpsDistributedLockSettings `json:"distributed_lock"`
|
||||
Silencing OpsAlertSilencingSettings `json:"silencing"`
|
||||
Thresholds OpsMetricThresholds `json:"thresholds"` // 指标阈值配置
|
||||
}
|
||||
|
||||
// OpsAdvancedSettings stores advanced ops configuration (data retention, aggregation).
|
||||
|
||||
Reference in New Issue
Block a user