feat(运维监控): 增强监控功能和健康评分系统

后端改进:
- 新增健康评分计算服务(ops_health_score.go)
- 添加分布式锁支持(ops_advisory_lock.go)
- 优化指标采集和聚合逻辑
- 新增运维指标采集间隔配置(60-3600秒)
- 移除未使用的WebSocket查询token认证中间件
- 改进清理服务和告警评估逻辑

前端改进:
- 简化OpsDashboard组件结构
- 完善国际化文本(中英文)
- 新增运维监控相关API类型定义
- 添加运维指标采集间隔设置界面
- 优化错误详情模态框

测试:
- 添加健康评分单元测试
- 更新API契约测试
This commit is contained in:
IanShaw027
2026-01-10 01:38:47 +08:00
parent 8ae75e7f6e
commit 585257d340
25 changed files with 570 additions and 385 deletions

View File

@@ -68,6 +68,7 @@ func (h *SettingHandler) GetSettings(c *gin.Context) {
OpsMonitoringEnabled: settings.OpsMonitoringEnabled,
OpsRealtimeMonitoringEnabled: settings.OpsRealtimeMonitoringEnabled,
OpsQueryModeDefault: settings.OpsQueryModeDefault,
OpsMetricsIntervalSeconds: settings.OpsMetricsIntervalSeconds,
})
}
@@ -115,9 +116,10 @@ type UpdateSettingsRequest struct {
IdentityPatchPrompt string `json:"identity_patch_prompt"`
// Ops monitoring (vNext)
OpsMonitoringEnabled *bool `json:"ops_monitoring_enabled"`
OpsRealtimeMonitoringEnabled *bool `json:"ops_realtime_monitoring_enabled"`
OpsMonitoringEnabled *bool `json:"ops_monitoring_enabled"`
OpsRealtimeMonitoringEnabled *bool `json:"ops_realtime_monitoring_enabled"`
OpsQueryModeDefault *string `json:"ops_query_mode_default"`
OpsMetricsIntervalSeconds *int `json:"ops_metrics_interval_seconds"`
}
// UpdateSettings 更新系统设置
@@ -173,6 +175,18 @@ func (h *SettingHandler) UpdateSettings(c *gin.Context) {
}
}
// Ops metrics collector interval validation (seconds).
if req.OpsMetricsIntervalSeconds != nil {
v := *req.OpsMetricsIntervalSeconds
if v < 60 {
v = 60
}
if v > 3600 {
v = 3600
}
req.OpsMetricsIntervalSeconds = &v
}
settings := &service.SystemSettings{
RegistrationEnabled: req.RegistrationEnabled,
EmailVerifyEnabled: req.EmailVerifyEnabled,
@@ -219,6 +233,12 @@ func (h *SettingHandler) UpdateSettings(c *gin.Context) {
}
return previousSettings.OpsQueryModeDefault
}(),
OpsMetricsIntervalSeconds: func() int {
if req.OpsMetricsIntervalSeconds != nil {
return *req.OpsMetricsIntervalSeconds
}
return previousSettings.OpsMetricsIntervalSeconds
}(),
}
if err := h.settingService.UpdateSettings(c.Request.Context(), settings); err != nil {
@@ -266,6 +286,7 @@ func (h *SettingHandler) UpdateSettings(c *gin.Context) {
OpsMonitoringEnabled: updatedSettings.OpsMonitoringEnabled,
OpsRealtimeMonitoringEnabled: updatedSettings.OpsRealtimeMonitoringEnabled,
OpsQueryModeDefault: updatedSettings.OpsQueryModeDefault,
OpsMetricsIntervalSeconds: updatedSettings.OpsMetricsIntervalSeconds,
})
}
@@ -375,6 +396,9 @@ func diffSettings(before *service.SystemSettings, after *service.SystemSettings,
if before.OpsQueryModeDefault != after.OpsQueryModeDefault {
changed = append(changed, "ops_query_mode_default")
}
if before.OpsMetricsIntervalSeconds != after.OpsMetricsIntervalSeconds {
changed = append(changed, "ops_metrics_interval_seconds")
}
return changed
}

View File

@@ -39,9 +39,10 @@ type SystemSettings struct {
IdentityPatchPrompt string `json:"identity_patch_prompt"`
// Ops monitoring (vNext)
OpsMonitoringEnabled bool `json:"ops_monitoring_enabled"`
OpsRealtimeMonitoringEnabled bool `json:"ops_realtime_monitoring_enabled"`
OpsMonitoringEnabled bool `json:"ops_monitoring_enabled"`
OpsRealtimeMonitoringEnabled bool `json:"ops_realtime_monitoring_enabled"`
OpsQueryModeDefault string `json:"ops_query_mode_default"`
OpsMetricsIntervalSeconds int `json:"ops_metrics_interval_seconds"`
}
type PublicSettings struct {

View File

@@ -68,6 +68,9 @@ INSERT INTO ops_system_metrics (
db_ok,
redis_ok,
redis_conn_total,
redis_conn_idle,
db_conn_active,
db_conn_idle,
db_conn_waiting,
@@ -83,8 +86,9 @@ INSERT INTO ops_system_metrics (
$21,$22,$23,$24,$25,$26,
$27,$28,$29,$30,
$31,$32,
$33,$34,$35,
$36,$37
$33,$34,
$35,$36,$37,
$38,$39
)`
_, err := r.db.ExecContext(
@@ -130,6 +134,9 @@ INSERT INTO ops_system_metrics (
opsNullBool(input.DBOK),
opsNullBool(input.RedisOK),
opsNullInt(input.RedisConnTotal),
opsNullInt(input.RedisConnIdle),
opsNullInt(input.DBConnActive),
opsNullInt(input.DBConnIdle),
opsNullInt(input.DBConnWaiting),
@@ -162,6 +169,9 @@ SELECT
db_ok,
redis_ok,
redis_conn_total,
redis_conn_idle,
db_conn_active,
db_conn_idle,
db_conn_waiting,
@@ -182,6 +192,8 @@ LIMIT 1`
var memPct sql.NullFloat64
var dbOK sql.NullBool
var redisOK sql.NullBool
var redisTotal sql.NullInt64
var redisIdle sql.NullInt64
var dbActive sql.NullInt64
var dbIdle sql.NullInt64
var dbWaiting sql.NullInt64
@@ -198,6 +210,8 @@ LIMIT 1`
&memPct,
&dbOK,
&redisOK,
&redisTotal,
&redisIdle,
&dbActive,
&dbIdle,
&dbWaiting,
@@ -231,6 +245,14 @@ LIMIT 1`
v := redisOK.Bool
out.RedisOK = &v
}
if redisTotal.Valid {
v := int(redisTotal.Int64)
out.RedisConnTotal = &v
}
if redisIdle.Valid {
v := int(redisIdle.Int64)
out.RedisConnIdle = &v
}
if dbActive.Valid {
v := int(dbActive.Int64)
out.DBConnActive = &v
@@ -398,4 +420,3 @@ func opsNullTime(v *time.Time) any {
}
return sql.NullTime{Time: *v, Valid: true}
}

View File

@@ -319,7 +319,9 @@ func TestAPIContracts(t *testing.T) {
"enable_identity_patch": true,
"identity_patch_prompt": "",
"ops_monitoring_enabled": true,
"ops_realtime_monitoring_enabled": true
"ops_realtime_monitoring_enabled": true,
"ops_query_mode_default": "auto",
"ops_metrics_interval_seconds": 60
}
}`,
},

View File

@@ -1,54 +0,0 @@
package middleware
import (
"net/http"
"strings"
"github.com/gin-gonic/gin"
)
// InjectBearerTokenFromQueryForWebSocket copies `?token=` into the Authorization header
// for WebSocket handshake requests on a small allow-list of endpoints.
//
// Why: browsers can't set custom headers on WebSocket handshake, but our admin routes
// are protected by header-based auth. This keeps the token support scoped to WS only.
func InjectBearerTokenFromQueryForWebSocket() gin.HandlerFunc {
return func(c *gin.Context) {
if c == nil || c.Request == nil {
if c != nil {
c.Next()
}
return
}
// Only GET websocket upgrades.
if c.Request.Method != http.MethodGet {
c.Next()
return
}
if !strings.EqualFold(strings.TrimSpace(c.GetHeader("Upgrade")), "websocket") {
c.Next()
return
}
// If caller already supplied auth headers, don't override.
if strings.TrimSpace(c.GetHeader("Authorization")) != "" || strings.TrimSpace(c.GetHeader("x-api-key")) != "" {
c.Next()
return
}
// Allow-list ops websocket endpoints.
path := strings.TrimSpace(c.Request.URL.Path)
if !strings.HasPrefix(path, "/api/v1/admin/ops/ws/") {
c.Next()
return
}
token := strings.TrimSpace(c.Query("token"))
if token != "" {
c.Request.Header.Set("Authorization", "Bearer "+token)
}
c.Next()
}
}

View File

@@ -25,8 +25,6 @@ func SetupRouter(
) *gin.Engine {
// 应用中间件
r.Use(middleware2.Logger())
// WebSocket handshake auth helper (token via query param, WS endpoints only).
r.Use(middleware2.InjectBearerTokenFromQueryForWebSocket())
r.Use(middleware2.CORS(cfg.CORS))
r.Use(middleware2.SecurityHeaders(cfg.Security.CSP))

View File

@@ -0,0 +1,46 @@
package service
import (
"context"
"database/sql"
"hash/fnv"
"time"
)
func hashAdvisoryLockID(key string) int64 {
h := fnv.New64a()
_, _ = h.Write([]byte(key))
return int64(h.Sum64())
}
func tryAcquireDBAdvisoryLock(ctx context.Context, db *sql.DB, lockID int64) (func(), bool) {
if db == nil {
return nil, false
}
if ctx == nil {
ctx = context.Background()
}
conn, err := db.Conn(ctx)
if err != nil {
return nil, false
}
acquired := false
if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", lockID).Scan(&acquired); err != nil {
_ = conn.Close()
return nil, false
}
if !acquired {
_ = conn.Close()
return nil, false
}
release := func() {
unlockCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, _ = conn.ExecContext(unlockCtx, "SELECT pg_advisory_unlock($1)", lockID)
_ = conn.Close()
}
return release, true
}

View File

@@ -376,28 +376,37 @@ return 0
`)
func (s *OpsAggregationService) tryAcquireLeaderLock(ctx context.Context, key string, ttl time.Duration, logPrefix string) (func(), bool) {
if s == nil || s.redisClient == nil {
return nil, true
if s == nil {
return nil, false
}
if ctx == nil {
ctx = context.Background()
}
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err != nil {
// Fail-open: do not block single-instance deployments.
return nil, true
// Prefer Redis leader lock when available (multi-instance), but avoid stampeding
// the DB when Redis is flaky by falling back to a DB advisory lock.
if s.redisClient != nil {
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err == nil {
if !ok {
s.maybeLogSkip(logPrefix)
return nil, false
}
release := func() {
ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, _ = opsAggReleaseScript.Run(ctx2, s.redisClient, []string{key}, s.instanceID).Result()
}
return release, true
}
// Redis error: fall through to DB advisory lock.
}
release, ok := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key))
if !ok {
s.maybeLogSkip(logPrefix)
return nil, false
}
release := func() {
ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, _ = opsAggReleaseScript.Run(ctx2, s.redisClient, []string{key}, s.instanceID).Result()
}
return release, true
}

View File

@@ -720,11 +720,12 @@ func (s *OpsAlertEvaluatorService) tryAcquireLeaderLock(ctx context.Context, loc
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err != nil {
// Fail-open for single-node environments, but warn.
// Prefer fail-closed to avoid duplicate evaluators stampeding the DB when Redis is flaky.
// Single-node deployments can disable the distributed lock via runtime settings.
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsAlertEvaluator] leader lock SetNX failed; running without lock: %v", err)
log.Printf("[OpsAlertEvaluator] leader lock SetNX failed; skipping this cycle: %v", err)
})
return nil, true
return nil, false
}
if !ok {
s.maybeLogSkip(key)

View File

@@ -300,30 +300,36 @@ func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), b
return nil, true
}
if s.redisClient == nil {
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsCleanup] redis not configured; running without distributed lock")
})
return nil, true
}
key := opsCleanupLeaderLockKeyDefault
ttl := opsCleanupLeaderLockTTLDefault
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err != nil {
// Prefer Redis leader lock when available, but avoid stampeding the DB when Redis is flaky by
// falling back to a DB advisory lock.
if s.redisClient != nil {
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err == nil {
if !ok {
return nil, false
}
return func() {
_, _ = opsCleanupReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
}, true
}
// Redis error: fall back to DB advisory lock.
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsCleanup] leader lock SetNX failed; running without lock: %v", err)
log.Printf("[OpsCleanup] leader lock SetNX failed; falling back to DB advisory lock: %v", err)
})
} else {
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsCleanup] redis not configured; using DB advisory lock")
})
return nil, true
}
release, ok := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key))
if !ok {
return nil, false
}
return func() {
_, _ = opsCleanupReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
}, true
return release, true
}
func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) {

View File

@@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"log"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
@@ -39,6 +40,16 @@ func (s *OpsService) GetDashboardOverview(ctx context.Context, filter *OpsDashbo
// Best-effort system health + jobs; dashboard metrics should still render if these are missing.
if metrics, err := s.opsRepo.GetLatestSystemMetrics(ctx, 1); err == nil {
// Attach config-derived limits so the UI can show "current / max" for connection pools.
// These are best-effort and should never block the dashboard rendering.
if s != nil && s.cfg != nil {
if s.cfg.Database.MaxOpenConns > 0 {
metrics.DBMaxOpenConns = intPtr(s.cfg.Database.MaxOpenConns)
}
if s.cfg.Redis.PoolSize > 0 {
metrics.RedisPoolSize = intPtr(s.cfg.Redis.PoolSize)
}
}
overview.SystemMetrics = metrics
} else if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Printf("[Ops] GetLatestSystemMetrics failed: %v", err)
@@ -50,6 +61,8 @@ func (s *OpsService) GetDashboardOverview(ctx context.Context, filter *OpsDashbo
log.Printf("[Ops] ListJobHeartbeats failed: %v", err)
}
overview.HealthScore = computeDashboardHealthScore(time.Now().UTC(), overview)
return overview, nil
}

View File

@@ -35,6 +35,10 @@ type OpsDashboardOverview struct {
Platform string `json:"platform"`
GroupID *int64 `json:"group_id"`
// HealthScore is a backend-computed overall health score (0-100).
// It is derived from the monitored metrics in this overview, plus best-effort system metrics/job heartbeats.
HealthScore int `json:"health_score"`
// Latest system-level snapshot (window=1m, global).
SystemMetrics *OpsSystemMetricsSnapshot `json:"system_metrics"`

View File

@@ -0,0 +1,126 @@
package service
import (
"math"
"time"
)
// computeDashboardHealthScore computes a 0-100 health score from the metrics returned by the dashboard overview.
//
// Design goals:
// - Backend-owned scoring (UI only displays).
// - Uses "overall" business indicators (SLA/error/latency) plus infra indicators (db/redis/cpu/mem/jobs).
// - Conservative + stable: penalize clear degradations; avoid overreacting to missing/idle data.
func computeDashboardHealthScore(now time.Time, overview *OpsDashboardOverview) int {
if overview == nil {
return 0
}
// Idle/no-data: avoid showing a "bad" score when there is no traffic.
// UI can still render a gray/idle state based on QPS + error rate.
if overview.RequestCountSLA <= 0 && overview.RequestCountTotal <= 0 && overview.ErrorCountTotal <= 0 {
return 100
}
score := 100.0
// --- SLA (primary signal) ---
// SLA is a ratio (0..1). Target is intentionally modest for LLM gateways; it can be tuned later.
slaPct := clampFloat64(overview.SLA*100, 0, 100)
if slaPct < 99.5 {
// Up to -45 points as SLA drops.
score -= math.Min(45, (99.5-slaPct)*12)
}
// --- Error rates (secondary signal) ---
errorPct := clampFloat64(overview.ErrorRate*100, 0, 100)
if errorPct > 1 {
// Cap at -20 points by 6% error rate.
score -= math.Min(20, (errorPct-1)*4)
}
upstreamPct := clampFloat64(overview.UpstreamErrorRate*100, 0, 100)
if upstreamPct > 1 {
// Upstream instability deserves extra weight, but keep it smaller than SLA/error.
score -= math.Min(15, (upstreamPct-1)*3)
}
// --- Latency (tail-focused) ---
// Use p99 of duration + TTFT. Penalize only when clearly elevated.
if overview.Duration.P99 != nil {
p99 := float64(*overview.Duration.P99)
if p99 > 2000 {
// From 2s upward, gradually penalize up to -20.
score -= math.Min(20, (p99-2000)/900) // ~20s => ~-20
}
}
if overview.TTFT.P99 != nil {
p99 := float64(*overview.TTFT.P99)
if p99 > 500 {
// TTFT > 500ms starts hurting; cap at -10.
score -= math.Min(10, (p99-500)/200) // 2.5s => -10
}
}
// --- System metrics snapshot (best-effort) ---
if overview.SystemMetrics != nil {
if overview.SystemMetrics.DBOK != nil && !*overview.SystemMetrics.DBOK {
score -= 20
}
if overview.SystemMetrics.RedisOK != nil && !*overview.SystemMetrics.RedisOK {
score -= 15
}
if overview.SystemMetrics.CPUUsagePercent != nil {
cpuPct := clampFloat64(*overview.SystemMetrics.CPUUsagePercent, 0, 100)
if cpuPct > 85 {
score -= math.Min(10, (cpuPct-85)*1.5)
}
}
if overview.SystemMetrics.MemoryUsagePercent != nil {
memPct := clampFloat64(*overview.SystemMetrics.MemoryUsagePercent, 0, 100)
if memPct > 90 {
score -= math.Min(10, (memPct-90)*1.0)
}
}
if overview.SystemMetrics.DBConnWaiting != nil && *overview.SystemMetrics.DBConnWaiting > 0 {
waiting := float64(*overview.SystemMetrics.DBConnWaiting)
score -= math.Min(10, waiting*2)
}
if overview.SystemMetrics.ConcurrencyQueueDepth != nil && *overview.SystemMetrics.ConcurrencyQueueDepth > 0 {
depth := float64(*overview.SystemMetrics.ConcurrencyQueueDepth)
score -= math.Min(10, depth*0.5)
}
}
// --- Job heartbeats (best-effort) ---
// Penalize only clear "error after last success" signals, and cap the impact.
jobPenalty := 0.0
for _, hb := range overview.JobHeartbeats {
if hb == nil {
continue
}
if hb.LastErrorAt != nil && (hb.LastSuccessAt == nil || hb.LastErrorAt.After(*hb.LastSuccessAt)) {
jobPenalty += 5
continue
}
if hb.LastSuccessAt != nil && now.Sub(*hb.LastSuccessAt) > 15*time.Minute {
jobPenalty += 2
}
}
score -= math.Min(15, jobPenalty)
score = clampFloat64(score, 0, 100)
return int(math.Round(score))
}
func clampFloat64(v float64, min float64, max float64) float64 {
if v < min {
return min
}
if v > max {
return max
}
return v
}

View File

@@ -0,0 +1,60 @@
//go:build unit
package service
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestComputeDashboardHealthScore_IdleReturns100(t *testing.T) {
t.Parallel()
score := computeDashboardHealthScore(time.Now().UTC(), &OpsDashboardOverview{})
require.Equal(t, 100, score)
}
func TestComputeDashboardHealthScore_DegradesOnBadSignals(t *testing.T) {
t.Parallel()
ov := &OpsDashboardOverview{
RequestCountTotal: 100,
RequestCountSLA: 100,
SuccessCount: 90,
ErrorCountTotal: 10,
ErrorCountSLA: 10,
SLA: 0.90,
ErrorRate: 0.10,
UpstreamErrorRate: 0.08,
Duration: OpsPercentiles{P99: intPtr(20_000)},
TTFT: OpsPercentiles{P99: intPtr(2_000)},
SystemMetrics: &OpsSystemMetricsSnapshot{
DBOK: boolPtr(false),
RedisOK: boolPtr(false),
CPUUsagePercent: float64Ptr(98.0),
MemoryUsagePercent: float64Ptr(97.0),
DBConnWaiting: intPtr(3),
ConcurrencyQueueDepth: intPtr(10),
},
JobHeartbeats: []*OpsJobHeartbeat{
{
JobName: "job-a",
LastErrorAt: timePtr(time.Now().UTC().Add(-1 * time.Minute)),
LastError: stringPtr("boom"),
},
},
}
score := computeDashboardHealthScore(time.Now().UTC(), ov)
require.Less(t, score, 80)
require.GreaterOrEqual(t, score, 0)
}
func timePtr(v time.Time) *time.Time { return &v }
func stringPtr(v string) *string { return &v }

View File

@@ -5,7 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"hash/fnv"
"log"
"math"
"os"
@@ -262,6 +261,7 @@ func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {
dbOK := c.checkDB(ctx)
redisOK := c.checkRedis(ctx)
active, idle := c.dbPoolStats()
redisTotal, redisIdle, redisStatsOK := c.redisPoolStats()
successCount, tokenConsumed, err := c.queryUsageCounts(ctx, windowStart, windowEnd)
if err != nil {
@@ -327,6 +327,19 @@ func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {
DBOK: boolPtr(dbOK),
RedisOK: boolPtr(redisOK),
RedisConnTotal: func() *int {
if !redisStatsOK {
return nil
}
return intPtr(redisTotal)
}(),
RedisConnIdle: func() *int {
if !redisStatsOK {
return nil
}
return intPtr(redisIdle)
}(),
DBConnActive: intPtr(active),
DBConnIdle: intPtr(idle),
GoroutineCount: intPtr(goroutines),
@@ -722,6 +735,17 @@ func (c *OpsMetricsCollector) checkRedis(ctx context.Context) bool {
return c.redisClient.Ping(ctx).Err() == nil
}
func (c *OpsMetricsCollector) redisPoolStats() (total int, idle int, ok bool) {
if c == nil || c.redisClient == nil {
return 0, 0, false
}
stats := c.redisClient.PoolStats()
if stats == nil {
return 0, 0, false
}
return int(stats.TotalConns), int(stats.IdleConns), true
}
func (c *OpsMetricsCollector) dbPoolStats() (active int, idle int) {
if c == nil || c.db == nil {
return 0, 0
@@ -749,7 +773,7 @@ func (c *OpsMetricsCollector) tryAcquireLeaderLock(ctx context.Context) (func(),
if err != nil {
// Prefer fail-closed to avoid stampeding the database when Redis is flaky.
// Fallback to a DB advisory lock when Redis is present but unavailable.
release, ok := c.tryAcquireDBAdvisoryLock(ctx)
release, ok := tryAcquireDBAdvisoryLock(ctx, c.db, opsMetricsCollectorAdvisoryLockID)
if !ok {
c.maybeLogSkip()
return nil, false
@@ -769,38 +793,6 @@ func (c *OpsMetricsCollector) tryAcquireLeaderLock(ctx context.Context) (func(),
return release, true
}
func (c *OpsMetricsCollector) tryAcquireDBAdvisoryLock(ctx context.Context) (func(), bool) {
if c == nil || c.db == nil {
return nil, false
}
if ctx == nil {
ctx = context.Background()
}
conn, err := c.db.Conn(ctx)
if err != nil {
return nil, false
}
acquired := false
if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", opsMetricsCollectorAdvisoryLockID).Scan(&acquired); err != nil {
_ = conn.Close()
return nil, false
}
if !acquired {
_ = conn.Close()
return nil, false
}
release := func() {
unlockCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, _ = conn.ExecContext(unlockCtx, "SELECT pg_advisory_unlock($1)", opsMetricsCollectorAdvisoryLockID)
_ = conn.Close()
}
return release, true
}
func (c *OpsMetricsCollector) maybeLogSkip() {
c.skipLogMu.Lock()
defer c.skipLogMu.Unlock()
@@ -853,9 +845,3 @@ func float64Ptr(v float64) *float64 {
out := v
return &out
}
func hashAdvisoryLockID(s string) int64 {
h := fnv.New64a()
_, _ = h.Write([]byte(s))
return int64(h.Sum64())
}

View File

@@ -165,6 +165,9 @@ type OpsInsertSystemMetricsInput struct {
DBOK *bool
RedisOK *bool
RedisConnTotal *int
RedisConnIdle *int
DBConnActive *int
DBConnIdle *int
DBConnWaiting *int
@@ -186,6 +189,13 @@ type OpsSystemMetricsSnapshot struct {
DBOK *bool `json:"db_ok"`
RedisOK *bool `json:"redis_ok"`
// Config-derived limits (best-effort). These are not historical metrics; they help UI render "current vs max".
DBMaxOpenConns *int `json:"db_max_open_conns"`
RedisPoolSize *int `json:"redis_pool_size"`
RedisConnTotal *int `json:"redis_conn_total"`
RedisConnIdle *int `json:"redis_conn_idle"`
DBConnActive *int `json:"db_conn_active"`
DBConnIdle *int `json:"db_conn_idle"`
DBConnWaiting *int `json:"db_conn_waiting"`

View File

@@ -139,6 +139,9 @@ func (s *SettingService) UpdateSettings(ctx context.Context, settings *SystemSet
updates[SettingKeyOpsMonitoringEnabled] = strconv.FormatBool(settings.OpsMonitoringEnabled)
updates[SettingKeyOpsRealtimeMonitoringEnabled] = strconv.FormatBool(settings.OpsRealtimeMonitoringEnabled)
updates[SettingKeyOpsQueryModeDefault] = string(ParseOpsQueryMode(settings.OpsQueryModeDefault))
if settings.OpsMetricsIntervalSeconds > 0 {
updates[SettingKeyOpsMetricsIntervalSeconds] = strconv.Itoa(settings.OpsMetricsIntervalSeconds)
}
return s.settingRepo.SetMultiple(ctx, updates)
}
@@ -231,6 +234,7 @@ func (s *SettingService) InitializeDefaultSettings(ctx context.Context) error {
SettingKeyOpsMonitoringEnabled: "true",
SettingKeyOpsRealtimeMonitoringEnabled: "true",
SettingKeyOpsQueryModeDefault: "auto",
SettingKeyOpsMetricsIntervalSeconds: "60",
}
return s.settingRepo.SetMultiple(ctx, defaults)
@@ -301,6 +305,18 @@ func (s *SettingService) parseSettings(settings map[string]string) *SystemSettin
result.OpsMonitoringEnabled = !isFalseSettingValue(settings[SettingKeyOpsMonitoringEnabled])
result.OpsRealtimeMonitoringEnabled = !isFalseSettingValue(settings[SettingKeyOpsRealtimeMonitoringEnabled])
result.OpsQueryModeDefault = string(ParseOpsQueryMode(settings[SettingKeyOpsQueryModeDefault]))
result.OpsMetricsIntervalSeconds = 60
if raw := strings.TrimSpace(settings[SettingKeyOpsMetricsIntervalSeconds]); raw != "" {
if v, err := strconv.Atoi(raw); err == nil {
if v < 60 {
v = 60
}
if v > 3600 {
v = 3600
}
result.OpsMetricsIntervalSeconds = v
}
}
return result
}

View File

@@ -43,6 +43,7 @@ type SystemSettings struct {
OpsMonitoringEnabled bool
OpsRealtimeMonitoringEnabled bool
OpsQueryModeDefault string
OpsMetricsIntervalSeconds int
}
type PublicSettings struct {