From 5baa8b5673e77a5ebb3453094c8b13bef83e71f6 Mon Sep 17 00:00:00 2001 From: IanShaw027 <131567472+IanShaw027@users.noreply.github.com> Date: Fri, 9 Jan 2026 20:53:44 +0800 Subject: [PATCH] =?UTF-8?q?feat(service):=20=E5=AE=9E=E7=8E=B0=E8=BF=90?= =?UTF-8?q?=E7=BB=B4=E7=9B=91=E6=8E=A7=E4=B8=9A=E5=8A=A1=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E5=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 ops 主服务(ops_service.go)和端口定义(ops_port.go) - 实现账号可用性检查服务(ops_account_availability.go) - 实现数据聚合服务(ops_aggregation_service.go) - 实现告警评估服务(ops_alert_evaluator_service.go) - 实现告警管理服务(ops_alerts.go) - 实现数据清理服务(ops_cleanup_service.go) - 实现并发控制服务(ops_concurrency.go) - 实现仪表板服务(ops_dashboard.go) - 实现错误处理服务(ops_errors.go) - 实现直方图服务(ops_histograms.go) - 实现指标采集服务(ops_metrics_collector.go) - 实现查询模式服务(ops_query_mode.go) - 实现实时监控服务(ops_realtime.go) - 实现请求详情服务(ops_request_details.go) - 实现重试机制服务(ops_retry.go) - 实现配置管理服务(ops_settings.go) - 实现趋势分析服务(ops_trends.go) - 实现窗口统计服务(ops_window_stats.go) - 添加 ops 相关领域常量 - 注册 service 依赖注入 --- backend/internal/service/domain_constants.go | 22 + .../service/ops_account_availability.go | 157 ++++ .../service/ops_aggregation_service.go | 434 +++++++++ .../service/ops_alert_evaluator_service.go | 839 +++++++++++++++++ backend/internal/service/ops_alerts.go | 162 ++++ .../internal/service/ops_cleanup_service.go | 361 ++++++++ backend/internal/service/ops_concurrency.go | 257 ++++++ backend/internal/service/ops_dashboard.go | 77 ++ backend/internal/service/ops_errors.go | 45 + backend/internal/service/ops_histograms.go | 26 + .../internal/service/ops_metrics_collector.go | 861 ++++++++++++++++++ backend/internal/service/ops_port.go | 226 +++++ backend/internal/service/ops_query_mode.go | 40 + backend/internal/service/ops_realtime.go | 36 + .../internal/service/ops_request_details.go | 152 ++++ backend/internal/service/ops_retry.go | 635 +++++++++++++ backend/internal/service/ops_service.go | 451 +++++++++ 
backend/internal/service/ops_settings.go | 354 +++++++ backend/internal/service/ops_trends.go | 27 + backend/internal/service/ops_window_stats.go | 24 + backend/internal/service/wire.go | 58 ++ 21 files changed, 5244 insertions(+) create mode 100644 backend/internal/service/ops_account_availability.go create mode 100644 backend/internal/service/ops_aggregation_service.go create mode 100644 backend/internal/service/ops_alert_evaluator_service.go create mode 100644 backend/internal/service/ops_alerts.go create mode 100644 backend/internal/service/ops_cleanup_service.go create mode 100644 backend/internal/service/ops_concurrency.go create mode 100644 backend/internal/service/ops_dashboard.go create mode 100644 backend/internal/service/ops_errors.go create mode 100644 backend/internal/service/ops_histograms.go create mode 100644 backend/internal/service/ops_metrics_collector.go create mode 100644 backend/internal/service/ops_port.go create mode 100644 backend/internal/service/ops_query_mode.go create mode 100644 backend/internal/service/ops_realtime.go create mode 100644 backend/internal/service/ops_request_details.go create mode 100644 backend/internal/service/ops_retry.go create mode 100644 backend/internal/service/ops_service.go create mode 100644 backend/internal/service/ops_settings.go create mode 100644 backend/internal/service/ops_trends.go create mode 100644 backend/internal/service/ops_window_stats.go diff --git a/backend/internal/service/domain_constants.go b/backend/internal/service/domain_constants.go index 9c61ea2e..04f80dbe 100644 --- a/backend/internal/service/domain_constants.go +++ b/backend/internal/service/domain_constants.go @@ -105,6 +105,28 @@ const ( // Request identity patch (Claude -> Gemini systemInstruction injection) SettingKeyEnableIdentityPatch = "enable_identity_patch" SettingKeyIdentityPatchPrompt = "identity_patch_prompt" + + // ========================= + // Ops Monitoring (vNext) + // ========================= + + // 
SettingKeyOpsMonitoringEnabled is a DB-backed soft switch to enable/disable ops module at runtime. + SettingKeyOpsMonitoringEnabled = "ops_monitoring_enabled" + + // SettingKeyOpsRealtimeMonitoringEnabled controls realtime features (e.g. WS/QPS push). + SettingKeyOpsRealtimeMonitoringEnabled = "ops_realtime_monitoring_enabled" + + // SettingKeyOpsQueryModeDefault controls the default query mode for ops dashboard (auto/raw/preagg). + SettingKeyOpsQueryModeDefault = "ops_query_mode_default" + + // SettingKeyOpsEmailNotificationConfig stores JSON config for ops email notifications. + SettingKeyOpsEmailNotificationConfig = "ops_email_notification_config" + + // SettingKeyOpsAlertRuntimeSettings stores JSON config for ops alert evaluator runtime settings. + SettingKeyOpsAlertRuntimeSettings = "ops_alert_runtime_settings" + + // SettingKeyOpsMetricsIntervalSeconds controls the ops metrics collector interval (>=60). + SettingKeyOpsMetricsIntervalSeconds = "ops_metrics_interval_seconds" ) // AdminAPIKeyPrefix is the prefix for admin API keys (distinct from user "sk-" keys). diff --git a/backend/internal/service/ops_account_availability.go b/backend/internal/service/ops_account_availability.go new file mode 100644 index 00000000..d0cbbe5c --- /dev/null +++ b/backend/internal/service/ops_account_availability.go @@ -0,0 +1,157 @@ +package service + +import ( + "context" + "time" +) + +// GetAccountAvailabilityStats returns current account availability stats. +// +// Query-level filtering is intentionally limited to platform/group to match the dashboard scope. 
+func (s *OpsService) GetAccountAvailabilityStats(ctx context.Context, platformFilter string, groupIDFilter *int64) ( + map[string]*PlatformAvailability, + map[int64]*GroupAvailability, + map[int64]*AccountAvailability, + *time.Time, + error, +) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, nil, nil, nil, err + } + + accounts, err := s.listAllAccountsForOps(ctx, platformFilter) + if err != nil { + return nil, nil, nil, nil, err + } + + if groupIDFilter != nil && *groupIDFilter > 0 { + filtered := make([]Account, 0, len(accounts)) + for _, acc := range accounts { + for _, grp := range acc.Groups { + if grp != nil && grp.ID == *groupIDFilter { + filtered = append(filtered, acc) + break + } + } + } + accounts = filtered + } + + now := time.Now() + collectedAt := now + + platform := make(map[string]*PlatformAvailability) + group := make(map[int64]*GroupAvailability) + account := make(map[int64]*AccountAvailability) + + for _, acc := range accounts { + if acc.ID <= 0 { + continue + } + + isTempUnsched := false + if acc.TempUnschedulableUntil != nil && now.Before(*acc.TempUnschedulableUntil) { + isTempUnsched = true + } + + isRateLimited := acc.RateLimitResetAt != nil && now.Before(*acc.RateLimitResetAt) + isOverloaded := acc.OverloadUntil != nil && now.Before(*acc.OverloadUntil) + hasError := acc.Status == StatusError + + // Normalize exclusive status flags so the UI doesn't show conflicting badges. 
+ if hasError { + isRateLimited = false + isOverloaded = false + } + + isAvailable := acc.Status == StatusActive && acc.Schedulable && !isRateLimited && !isOverloaded && !isTempUnsched + + if acc.Platform != "" { + if _, ok := platform[acc.Platform]; !ok { + platform[acc.Platform] = &PlatformAvailability{ + Platform: acc.Platform, + } + } + p := platform[acc.Platform] + p.TotalAccounts++ + if isAvailable { + p.AvailableCount++ + } + if isRateLimited { + p.RateLimitCount++ + } + if hasError { + p.ErrorCount++ + } + } + + for _, grp := range acc.Groups { + if grp == nil || grp.ID <= 0 { + continue + } + if _, ok := group[grp.ID]; !ok { + group[grp.ID] = &GroupAvailability{ + GroupID: grp.ID, + GroupName: grp.Name, + Platform: grp.Platform, + } + } + g := group[grp.ID] + g.TotalAccounts++ + if isAvailable { + g.AvailableCount++ + } + if isRateLimited { + g.RateLimitCount++ + } + if hasError { + g.ErrorCount++ + } + } + + displayGroupID := int64(0) + displayGroupName := "" + if len(acc.Groups) > 0 && acc.Groups[0] != nil { + displayGroupID = acc.Groups[0].ID + displayGroupName = acc.Groups[0].Name + } + + item := &AccountAvailability{ + AccountID: acc.ID, + AccountName: acc.Name, + Platform: acc.Platform, + GroupID: displayGroupID, + GroupName: displayGroupName, + Status: acc.Status, + + IsAvailable: isAvailable, + IsRateLimited: isRateLimited, + IsOverloaded: isOverloaded, + HasError: hasError, + + ErrorMessage: acc.ErrorMessage, + } + + if isRateLimited && acc.RateLimitResetAt != nil { + item.RateLimitResetAt = acc.RateLimitResetAt + remainingSec := int64(time.Until(*acc.RateLimitResetAt).Seconds()) + if remainingSec > 0 { + item.RateLimitRemainingSec = &remainingSec + } + } + if isOverloaded && acc.OverloadUntil != nil { + item.OverloadUntil = acc.OverloadUntil + remainingSec := int64(time.Until(*acc.OverloadUntil).Seconds()) + if remainingSec > 0 { + item.OverloadRemainingSec = &remainingSec + } + } + if isTempUnsched && acc.TempUnschedulableUntil != nil { + 
item.TempUnschedulableUntil = acc.TempUnschedulableUntil + } + + account[acc.ID] = item + } + + return platform, group, account, &collectedAt, nil +} diff --git a/backend/internal/service/ops_aggregation_service.go b/backend/internal/service/ops_aggregation_service.go new file mode 100644 index 00000000..04dbb11b --- /dev/null +++ b/backend/internal/service/ops_aggregation_service.go @@ -0,0 +1,434 @@ +package service + +import ( + "context" + "database/sql" + "errors" + "log" + "strings" + "sync" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/google/uuid" + "github.com/redis/go-redis/v9" +) + +const ( + opsAggHourlyJobName = "ops_preaggregation_hourly" + opsAggDailyJobName = "ops_preaggregation_daily" + + opsAggHourlyInterval = 10 * time.Minute + opsAggDailyInterval = 1 * time.Hour + + // Keep in sync with ops retention target (vNext default 30d). + opsAggBackfillWindow = 30 * 24 * time.Hour + + // Recompute overlap to absorb late-arriving rows near boundaries. + opsAggHourlyOverlap = 2 * time.Hour + opsAggDailyOverlap = 48 * time.Hour + + opsAggHourlyChunk = 24 * time.Hour + opsAggDailyChunk = 7 * 24 * time.Hour + + // Delay around boundaries (e.g. 10:00..10:05) to avoid aggregating buckets + // that may still receive late inserts. + opsAggSafeDelay = 5 * time.Minute + + opsAggMaxQueryTimeout = 3 * time.Second + opsAggHourlyTimeout = 5 * time.Minute + opsAggDailyTimeout = 2 * time.Minute + + opsAggHourlyLeaderLockKey = "ops:aggregation:hourly:leader" + opsAggDailyLeaderLockKey = "ops:aggregation:daily:leader" + + opsAggHourlyLeaderLockTTL = 15 * time.Minute + opsAggDailyLeaderLockTTL = 10 * time.Minute +) + +// OpsAggregationService periodically backfills ops_metrics_hourly / ops_metrics_daily +// for stable long-window dashboard queries. +// +// It is safe to run in multi-replica deployments when Redis is available (leader lock). 
+type OpsAggregationService struct { + opsRepo OpsRepository + settingRepo SettingRepository + cfg *config.Config + + db *sql.DB + redisClient *redis.Client + instanceID string + + stopCh chan struct{} + startOnce sync.Once + stopOnce sync.Once + + hourlyMu sync.Mutex + dailyMu sync.Mutex + + skipLogMu sync.Mutex + skipLogAt time.Time +} + +func NewOpsAggregationService( + opsRepo OpsRepository, + settingRepo SettingRepository, + db *sql.DB, + redisClient *redis.Client, + cfg *config.Config, +) *OpsAggregationService { + return &OpsAggregationService{ + opsRepo: opsRepo, + settingRepo: settingRepo, + cfg: cfg, + db: db, + redisClient: redisClient, + instanceID: uuid.NewString(), + } +} + +func (s *OpsAggregationService) Start() { + if s == nil { + return + } + s.startOnce.Do(func() { + if s.stopCh == nil { + s.stopCh = make(chan struct{}) + } + go s.hourlyLoop() + go s.dailyLoop() + }) +} + +func (s *OpsAggregationService) Stop() { + if s == nil { + return + } + s.stopOnce.Do(func() { + if s.stopCh != nil { + close(s.stopCh) + } + }) +} + +func (s *OpsAggregationService) hourlyLoop() { + // First run immediately. + s.aggregateHourly() + + ticker := time.NewTicker(opsAggHourlyInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.aggregateHourly() + case <-s.stopCh: + return + } + } +} + +func (s *OpsAggregationService) dailyLoop() { + // First run immediately. 
+ s.aggregateDaily() + + ticker := time.NewTicker(opsAggDailyInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.aggregateDaily() + case <-s.stopCh: + return + } + } +} + +func (s *OpsAggregationService) aggregateHourly() { + if s == nil || s.opsRepo == nil { + return + } + if s.cfg != nil { + if !s.cfg.Ops.Enabled { + return + } + if !s.cfg.Ops.Aggregation.Enabled { + return + } + } + + ctx, cancel := context.WithTimeout(context.Background(), opsAggHourlyTimeout) + defer cancel() + + if !s.isMonitoringEnabled(ctx) { + return + } + + release, ok := s.tryAcquireLeaderLock(ctx, opsAggHourlyLeaderLockKey, opsAggHourlyLeaderLockTTL, "[OpsAggregation][hourly]") + if !ok { + return + } + if release != nil { + defer release() + } + + s.hourlyMu.Lock() + defer s.hourlyMu.Unlock() + + startedAt := time.Now().UTC() + runAt := startedAt + + // Aggregate stable full hours only. + end := utcFloorToHour(time.Now().UTC().Add(-opsAggSafeDelay)) + start := end.Add(-opsAggBackfillWindow) + + // Resume from the latest bucket with overlap. 
+ { + ctxMax, cancelMax := context.WithTimeout(context.Background(), opsAggMaxQueryTimeout) + latest, ok, err := s.opsRepo.GetLatestHourlyBucketStart(ctxMax) + cancelMax() + if err != nil { + log.Printf("[OpsAggregation][hourly] failed to read latest bucket: %v", err) + } else if ok { + candidate := latest.Add(-opsAggHourlyOverlap) + if candidate.After(start) { + start = candidate + } + } + } + + start = utcFloorToHour(start) + if !start.Before(end) { + return + } + + var aggErr error + for cursor := start; cursor.Before(end); cursor = cursor.Add(opsAggHourlyChunk) { + chunkEnd := minTime(cursor.Add(opsAggHourlyChunk), end) + if err := s.opsRepo.UpsertHourlyMetrics(ctx, cursor, chunkEnd); err != nil { + aggErr = err + log.Printf("[OpsAggregation][hourly] upsert failed (%s..%s): %v", cursor.Format(time.RFC3339), chunkEnd.Format(time.RFC3339), err) + break + } + } + + finishedAt := time.Now().UTC() + durationMs := finishedAt.Sub(startedAt).Milliseconds() + dur := durationMs + + if aggErr != nil { + msg := truncateString(aggErr.Error(), 2048) + errAt := finishedAt + hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer hbCancel() + _ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{ + JobName: opsAggHourlyJobName, + LastRunAt: &runAt, + LastErrorAt: &errAt, + LastError: &msg, + LastDurationMs: &dur, + }) + return + } + + successAt := finishedAt + hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer hbCancel() + _ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{ + JobName: opsAggHourlyJobName, + LastRunAt: &runAt, + LastSuccessAt: &successAt, + LastDurationMs: &dur, + }) +} + +func (s *OpsAggregationService) aggregateDaily() { + if s == nil || s.opsRepo == nil { + return + } + if s.cfg != nil { + if !s.cfg.Ops.Enabled { + return + } + if !s.cfg.Ops.Aggregation.Enabled { + return + } + } + + ctx, cancel := context.WithTimeout(context.Background(), opsAggDailyTimeout) + defer 
cancel() + + if !s.isMonitoringEnabled(ctx) { + return + } + + release, ok := s.tryAcquireLeaderLock(ctx, opsAggDailyLeaderLockKey, opsAggDailyLeaderLockTTL, "[OpsAggregation][daily]") + if !ok { + return + } + if release != nil { + defer release() + } + + s.dailyMu.Lock() + defer s.dailyMu.Unlock() + + startedAt := time.Now().UTC() + runAt := startedAt + + end := utcFloorToDay(time.Now().UTC()) + start := end.Add(-opsAggBackfillWindow) + + { + ctxMax, cancelMax := context.WithTimeout(context.Background(), opsAggMaxQueryTimeout) + latest, ok, err := s.opsRepo.GetLatestDailyBucketDate(ctxMax) + cancelMax() + if err != nil { + log.Printf("[OpsAggregation][daily] failed to read latest bucket: %v", err) + } else if ok { + candidate := latest.Add(-opsAggDailyOverlap) + if candidate.After(start) { + start = candidate + } + } + } + + start = utcFloorToDay(start) + if !start.Before(end) { + return + } + + var aggErr error + for cursor := start; cursor.Before(end); cursor = cursor.Add(opsAggDailyChunk) { + chunkEnd := minTime(cursor.Add(opsAggDailyChunk), end) + if err := s.opsRepo.UpsertDailyMetrics(ctx, cursor, chunkEnd); err != nil { + aggErr = err + log.Printf("[OpsAggregation][daily] upsert failed (%s..%s): %v", cursor.Format("2006-01-02"), chunkEnd.Format("2006-01-02"), err) + break + } + } + + finishedAt := time.Now().UTC() + durationMs := finishedAt.Sub(startedAt).Milliseconds() + dur := durationMs + + if aggErr != nil { + msg := truncateString(aggErr.Error(), 2048) + errAt := finishedAt + hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer hbCancel() + _ = s.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{ + JobName: opsAggDailyJobName, + LastRunAt: &runAt, + LastErrorAt: &errAt, + LastError: &msg, + LastDurationMs: &dur, + }) + return + } + + successAt := finishedAt + hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer hbCancel() + _ = s.opsRepo.UpsertJobHeartbeat(hbCtx, 
&OpsUpsertJobHeartbeatInput{ + JobName: opsAggDailyJobName, + LastRunAt: &runAt, + LastSuccessAt: &successAt, + LastDurationMs: &dur, + }) +} + +func (s *OpsAggregationService) isMonitoringEnabled(ctx context.Context) bool { + if s == nil { + return false + } + if s.cfg != nil && !s.cfg.Ops.Enabled { + return false + } + if s.settingRepo == nil { + return true + } + if ctx == nil { + ctx = context.Background() + } + + value, err := s.settingRepo.GetValue(ctx, SettingKeyOpsMonitoringEnabled) + if err != nil { + if errors.Is(err, ErrSettingNotFound) { + return true + } + return true + } + switch strings.ToLower(strings.TrimSpace(value)) { + case "false", "0", "off", "disabled": + return false + default: + return true + } +} + +var opsAggReleaseScript = redis.NewScript(` +if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) +end +return 0 +`) + +func (s *OpsAggregationService) tryAcquireLeaderLock(ctx context.Context, key string, ttl time.Duration, logPrefix string) (func(), bool) { + if s == nil || s.redisClient == nil { + return nil, true + } + if ctx == nil { + ctx = context.Background() + } + + ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result() + if err != nil { + // Fail-open: do not block single-instance deployments. 
+ return nil, true + } + if !ok { + s.maybeLogSkip(logPrefix) + return nil, false + } + + release := func() { + ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _, _ = opsAggReleaseScript.Run(ctx2, s.redisClient, []string{key}, s.instanceID).Result() + } + return release, true +} + +func (s *OpsAggregationService) maybeLogSkip(prefix string) { + s.skipLogMu.Lock() + defer s.skipLogMu.Unlock() + + now := time.Now() + if !s.skipLogAt.IsZero() && now.Sub(s.skipLogAt) < time.Minute { + return + } + s.skipLogAt = now + if prefix == "" { + prefix = "[OpsAggregation]" + } + log.Printf("%s leader lock held by another instance; skipping", prefix) +} + +func utcFloorToHour(t time.Time) time.Time { + return t.UTC().Truncate(time.Hour) +} + +func utcFloorToDay(t time.Time) time.Time { + u := t.UTC() + y, m, d := u.Date() + return time.Date(y, m, d, 0, 0, 0, 0, time.UTC) +} + +func minTime(a, b time.Time) time.Time { + if a.Before(b) { + return a + } + return b +} diff --git a/backend/internal/service/ops_alert_evaluator_service.go b/backend/internal/service/ops_alert_evaluator_service.go new file mode 100644 index 00000000..b970c720 --- /dev/null +++ b/backend/internal/service/ops_alert_evaluator_service.go @@ -0,0 +1,839 @@ +package service + +import ( + "context" + "fmt" + "log" + "math" + "strconv" + "strings" + "sync" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/google/uuid" + "github.com/redis/go-redis/v9" +) + +const ( + opsAlertEvaluatorJobName = "ops_alert_evaluator" + + opsAlertEvaluatorTimeout = 45 * time.Second + opsAlertEvaluatorLeaderLockKey = "ops:alert:evaluator:leader" + opsAlertEvaluatorLeaderLockTTL = 90 * time.Second + opsAlertEvaluatorSkipLogInterval = 1 * time.Minute +) + +var opsAlertEvaluatorReleaseScript = redis.NewScript(` +if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) +end +return 0 +`) + +type OpsAlertEvaluatorService struct { + opsService 
*OpsService + opsRepo OpsRepository + emailService *EmailService + + redisClient *redis.Client + cfg *config.Config + instanceID string + + stopCh chan struct{} + startOnce sync.Once + stopOnce sync.Once + wg sync.WaitGroup + + mu sync.Mutex + ruleStates map[int64]*opsAlertRuleState + + emailLimiter *slidingWindowLimiter + + skipLogMu sync.Mutex + skipLogAt time.Time + + warnNoRedisOnce sync.Once +} + +type opsAlertRuleState struct { + LastEvaluatedAt time.Time + ConsecutiveBreaches int +} + +func NewOpsAlertEvaluatorService( + opsService *OpsService, + opsRepo OpsRepository, + emailService *EmailService, + redisClient *redis.Client, + cfg *config.Config, +) *OpsAlertEvaluatorService { + return &OpsAlertEvaluatorService{ + opsService: opsService, + opsRepo: opsRepo, + emailService: emailService, + redisClient: redisClient, + cfg: cfg, + instanceID: uuid.NewString(), + ruleStates: map[int64]*opsAlertRuleState{}, + emailLimiter: newSlidingWindowLimiter(0, time.Hour), + } +} + +func (s *OpsAlertEvaluatorService) Start() { + if s == nil { + return + } + s.startOnce.Do(func() { + if s.stopCh == nil { + s.stopCh = make(chan struct{}) + } + go s.run() + }) +} + +func (s *OpsAlertEvaluatorService) Stop() { + if s == nil { + return + } + s.stopOnce.Do(func() { + if s.stopCh != nil { + close(s.stopCh) + } + }) + s.wg.Wait() +} + +func (s *OpsAlertEvaluatorService) run() { + s.wg.Add(1) + defer s.wg.Done() + + // Start immediately to produce early feedback in ops dashboard. + timer := time.NewTimer(0) + defer timer.Stop() + + for { + select { + case <-timer.C: + interval := s.getInterval() + s.evaluateOnce(interval) + timer.Reset(interval) + case <-s.stopCh: + return + } + } +} + +func (s *OpsAlertEvaluatorService) getInterval() time.Duration { + // Default. 
+ interval := 60 * time.Second + + if s == nil || s.opsService == nil { + return interval + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + cfg, err := s.opsService.GetOpsAlertRuntimeSettings(ctx) + if err != nil || cfg == nil { + return interval + } + if cfg.EvaluationIntervalSeconds <= 0 { + return interval + } + if cfg.EvaluationIntervalSeconds < 1 { + return interval + } + if cfg.EvaluationIntervalSeconds > int((24 * time.Hour).Seconds()) { + return interval + } + return time.Duration(cfg.EvaluationIntervalSeconds) * time.Second +} + +func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) { + if s == nil || s.opsRepo == nil { + return + } + if s.cfg != nil && !s.cfg.Ops.Enabled { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), opsAlertEvaluatorTimeout) + defer cancel() + + if s.opsService != nil && !s.opsService.IsMonitoringEnabled(ctx) { + return + } + + runtimeCfg := defaultOpsAlertRuntimeSettings() + if s.opsService != nil { + if loaded, err := s.opsService.GetOpsAlertRuntimeSettings(ctx); err == nil && loaded != nil { + runtimeCfg = loaded + } + } + + release, ok := s.tryAcquireLeaderLock(ctx, runtimeCfg.DistributedLock) + if !ok { + return + } + if release != nil { + defer release() + } + + startedAt := time.Now().UTC() + runAt := startedAt + + rules, err := s.opsRepo.ListAlertRules(ctx) + if err != nil { + s.recordHeartbeatError(runAt, time.Since(startedAt), err) + log.Printf("[OpsAlertEvaluator] list rules failed: %v", err) + return + } + + now := time.Now().UTC() + safeEnd := now.Truncate(time.Minute) + if safeEnd.IsZero() { + safeEnd = now + } + + systemMetrics, _ := s.opsRepo.GetLatestSystemMetrics(ctx, 1) + + // Cleanup stale state for removed rules. 
+ s.pruneRuleStates(rules) + + for _, rule := range rules { + if rule == nil || !rule.Enabled || rule.ID <= 0 { + continue + } + + scopePlatform, scopeGroupID := parseOpsAlertRuleScope(rule.Filters) + + windowMinutes := rule.WindowMinutes + if windowMinutes <= 0 { + windowMinutes = 1 + } + windowStart := safeEnd.Add(-time.Duration(windowMinutes) * time.Minute) + windowEnd := safeEnd + + metricValue, ok := s.computeRuleMetric(ctx, rule, systemMetrics, windowStart, windowEnd, scopePlatform, scopeGroupID) + if !ok { + s.resetRuleState(rule.ID, now) + continue + } + + breachedNow := compareMetric(metricValue, rule.Operator, rule.Threshold) + required := requiredSustainedBreaches(rule.SustainedMinutes, interval) + consecutive := s.updateRuleBreaches(rule.ID, now, interval, breachedNow) + + activeEvent, err := s.opsRepo.GetActiveAlertEvent(ctx, rule.ID) + if err != nil { + log.Printf("[OpsAlertEvaluator] get active event failed (rule=%d): %v", rule.ID, err) + continue + } + + if breachedNow && consecutive >= required { + if activeEvent != nil { + continue + } + + latestEvent, err := s.opsRepo.GetLatestAlertEvent(ctx, rule.ID) + if err != nil { + log.Printf("[OpsAlertEvaluator] get latest event failed (rule=%d): %v", rule.ID, err) + continue + } + if latestEvent != nil && rule.CooldownMinutes > 0 { + cooldown := time.Duration(rule.CooldownMinutes) * time.Minute + if now.Sub(latestEvent.FiredAt) < cooldown { + continue + } + } + + firedEvent := &OpsAlertEvent{ + RuleID: rule.ID, + Severity: strings.TrimSpace(rule.Severity), + Status: OpsAlertStatusFiring, + Title: fmt.Sprintf("%s: %s", strings.TrimSpace(rule.Severity), strings.TrimSpace(rule.Name)), + Description: buildOpsAlertDescription(rule, metricValue, windowMinutes, scopePlatform, scopeGroupID), + MetricValue: float64Ptr(metricValue), + ThresholdValue: float64Ptr(rule.Threshold), + Dimensions: buildOpsAlertDimensions(scopePlatform, scopeGroupID), + FiredAt: now, + CreatedAt: now, + } + + created, err := 
s.opsRepo.CreateAlertEvent(ctx, firedEvent) + if err != nil { + log.Printf("[OpsAlertEvaluator] create event failed (rule=%d): %v", rule.ID, err) + continue + } + + if created != nil && created.ID > 0 { + s.maybeSendAlertEmail(ctx, runtimeCfg, rule, created) + } + continue + } + + // Not breached: resolve active event if present. + if activeEvent != nil { + resolvedAt := now + if err := s.opsRepo.UpdateAlertEventStatus(ctx, activeEvent.ID, OpsAlertStatusResolved, &resolvedAt); err != nil { + log.Printf("[OpsAlertEvaluator] resolve event failed (event=%d): %v", activeEvent.ID, err) + } + } + } + + s.recordHeartbeatSuccess(runAt, time.Since(startedAt)) +} + +func (s *OpsAlertEvaluatorService) pruneRuleStates(rules []*OpsAlertRule) { + s.mu.Lock() + defer s.mu.Unlock() + + live := map[int64]struct{}{} + for _, r := range rules { + if r != nil && r.ID > 0 { + live[r.ID] = struct{}{} + } + } + for id := range s.ruleStates { + if _, ok := live[id]; !ok { + delete(s.ruleStates, id) + } + } +} + +func (s *OpsAlertEvaluatorService) resetRuleState(ruleID int64, now time.Time) { + if ruleID <= 0 { + return + } + s.mu.Lock() + defer s.mu.Unlock() + state, ok := s.ruleStates[ruleID] + if !ok { + state = &opsAlertRuleState{} + s.ruleStates[ruleID] = state + } + state.LastEvaluatedAt = now + state.ConsecutiveBreaches = 0 +} + +func (s *OpsAlertEvaluatorService) updateRuleBreaches(ruleID int64, now time.Time, interval time.Duration, breached bool) int { + if ruleID <= 0 { + return 0 + } + s.mu.Lock() + defer s.mu.Unlock() + + state, ok := s.ruleStates[ruleID] + if !ok { + state = &opsAlertRuleState{} + s.ruleStates[ruleID] = state + } + + if !state.LastEvaluatedAt.IsZero() && interval > 0 { + if now.Sub(state.LastEvaluatedAt) > interval*2 { + state.ConsecutiveBreaches = 0 + } + } + + state.LastEvaluatedAt = now + if breached { + state.ConsecutiveBreaches++ + } else { + state.ConsecutiveBreaches = 0 + } + return state.ConsecutiveBreaches +} + +func 
requiredSustainedBreaches(sustainedMinutes int, interval time.Duration) int { + if sustainedMinutes <= 0 { + return 1 + } + if interval <= 0 { + return sustainedMinutes + } + required := int(math.Ceil(float64(sustainedMinutes*60) / interval.Seconds())) + if required < 1 { + return 1 + } + return required +} + +func parseOpsAlertRuleScope(filters map[string]any) (platform string, groupID *int64) { + if filters == nil { + return "", nil + } + if v, ok := filters["platform"]; ok { + if s, ok := v.(string); ok { + platform = strings.TrimSpace(s) + } + } + if v, ok := filters["group_id"]; ok { + switch t := v.(type) { + case float64: + if t > 0 { + id := int64(t) + groupID = &id + } + case int64: + if t > 0 { + id := t + groupID = &id + } + case int: + if t > 0 { + id := int64(t) + groupID = &id + } + case string: + n, err := strconv.ParseInt(strings.TrimSpace(t), 10, 64) + if err == nil && n > 0 { + groupID = &n + } + } + } + return platform, groupID +} + +func (s *OpsAlertEvaluatorService) computeRuleMetric( + ctx context.Context, + rule *OpsAlertRule, + systemMetrics *OpsSystemMetricsSnapshot, + start time.Time, + end time.Time, + platform string, + groupID *int64, +) (float64, bool) { + if rule == nil { + return 0, false + } + switch strings.TrimSpace(rule.MetricType) { + case "cpu_usage_percent": + if systemMetrics != nil && systemMetrics.CPUUsagePercent != nil { + return *systemMetrics.CPUUsagePercent, true + } + return 0, false + case "memory_usage_percent": + if systemMetrics != nil && systemMetrics.MemoryUsagePercent != nil { + return *systemMetrics.MemoryUsagePercent, true + } + return 0, false + case "concurrency_queue_depth": + if systemMetrics != nil && systemMetrics.ConcurrencyQueueDepth != nil { + return float64(*systemMetrics.ConcurrencyQueueDepth), true + } + return 0, false + } + + overview, err := s.opsRepo.GetDashboardOverview(ctx, &OpsDashboardFilter{ + StartTime: start, + EndTime: end, + Platform: platform, + GroupID: groupID, + QueryMode: 
OpsQueryModeRaw, + }) + if err != nil { + return 0, false + } + if overview == nil { + return 0, false + } + + switch strings.TrimSpace(rule.MetricType) { + case "success_rate": + if overview.RequestCountSLA <= 0 { + return 0, false + } + return overview.SLA * 100, true + case "error_rate": + if overview.RequestCountSLA <= 0 { + return 0, false + } + return overview.ErrorRate * 100, true + case "upstream_error_rate": + if overview.RequestCountSLA <= 0 { + return 0, false + } + return overview.UpstreamErrorRate * 100, true + case "p95_latency_ms": + if overview.Duration.P95 == nil { + return 0, false + } + return float64(*overview.Duration.P95), true + case "p99_latency_ms": + if overview.Duration.P99 == nil { + return 0, false + } + return float64(*overview.Duration.P99), true + default: + return 0, false + } +} + +func compareMetric(value float64, operator string, threshold float64) bool { + switch strings.TrimSpace(operator) { + case ">": + return value > threshold + case ">=": + return value >= threshold + case "<": + return value < threshold + case "<=": + return value <= threshold + case "==": + return value == threshold + case "!=": + return value != threshold + default: + return false + } +} + +func buildOpsAlertDimensions(platform string, groupID *int64) map[string]any { + dims := map[string]any{} + if strings.TrimSpace(platform) != "" { + dims["platform"] = strings.TrimSpace(platform) + } + if groupID != nil && *groupID > 0 { + dims["group_id"] = *groupID + } + if len(dims) == 0 { + return nil + } + return dims +} + +func buildOpsAlertDescription(rule *OpsAlertRule, value float64, windowMinutes int, platform string, groupID *int64) string { + if rule == nil { + return "" + } + scope := "overall" + if strings.TrimSpace(platform) != "" { + scope = fmt.Sprintf("platform=%s", strings.TrimSpace(platform)) + } + if groupID != nil && *groupID > 0 { + scope = fmt.Sprintf("%s group_id=%d", scope, *groupID) + } + if windowMinutes <= 0 { + windowMinutes = 1 + } + 
return fmt.Sprintf("%s %s %.2f (current %.2f) over last %dm (%s)", + strings.TrimSpace(rule.MetricType), + strings.TrimSpace(rule.Operator), + rule.Threshold, + value, + windowMinutes, + strings.TrimSpace(scope), + ) +} + +func (s *OpsAlertEvaluatorService) maybeSendAlertEmail(ctx context.Context, runtimeCfg *OpsAlertRuntimeSettings, rule *OpsAlertRule, event *OpsAlertEvent) { + if s == nil || s.emailService == nil || s.opsService == nil || event == nil || rule == nil { + return + } + if event.EmailSent { + return + } + if !rule.NotifyEmail { + return + } + + emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx) + if err != nil || emailCfg == nil || !emailCfg.Alert.Enabled { + return + } + + if len(emailCfg.Alert.Recipients) == 0 { + return + } + if !shouldSendOpsAlertEmailByMinSeverity(strings.TrimSpace(emailCfg.Alert.MinSeverity), strings.TrimSpace(rule.Severity)) { + return + } + + if runtimeCfg != nil && runtimeCfg.Silencing.Enabled { + if isOpsAlertSilenced(time.Now().UTC(), rule, event, runtimeCfg.Silencing) { + return + } + } + + // Apply/update rate limiter. + s.emailLimiter.SetLimit(emailCfg.Alert.RateLimitPerHour) + + subject := fmt.Sprintf("[Ops Alert][%s] %s", strings.TrimSpace(rule.Severity), strings.TrimSpace(rule.Name)) + body := buildOpsAlertEmailBody(rule, event) + + anySent := false + for _, to := range emailCfg.Alert.Recipients { + addr := strings.TrimSpace(to) + if addr == "" { + continue + } + if !s.emailLimiter.Allow(time.Now().UTC()) { + continue + } + if err := s.emailService.SendEmail(ctx, addr, subject, body); err != nil { + // Ignore per-recipient failures; continue best-effort. 
+ continue + } + anySent = true + } + + if anySent { + _ = s.opsRepo.UpdateAlertEventEmailSent(context.Background(), event.ID, true) + } +} + +func buildOpsAlertEmailBody(rule *OpsAlertRule, event *OpsAlertEvent) string { + if rule == nil || event == nil { + return "" + } + metric := strings.TrimSpace(rule.MetricType) + value := "-" + threshold := fmt.Sprintf("%.2f", rule.Threshold) + if event.MetricValue != nil { + value = fmt.Sprintf("%.2f", *event.MetricValue) + } + if event.ThresholdValue != nil { + threshold = fmt.Sprintf("%.2f", *event.ThresholdValue) + } + return fmt.Sprintf(` +
Rule: %s
+Severity: %s
+Status: %s
+Metric: %s %s %s
+Fired at: %s
+Description: %s
+`, + htmlEscape(rule.Name), + htmlEscape(rule.Severity), + htmlEscape(event.Status), + htmlEscape(metric), + htmlEscape(rule.Operator), + htmlEscape(fmt.Sprintf("%s (threshold %s)", value, threshold)), + event.FiredAt.Format(time.RFC3339), + htmlEscape(event.Description), + ) +} + +func shouldSendOpsAlertEmailByMinSeverity(minSeverity string, ruleSeverity string) bool { + minSeverity = strings.ToLower(strings.TrimSpace(minSeverity)) + if minSeverity == "" { + return true + } + + eventLevel := opsEmailSeverityForOps(ruleSeverity) + minLevel := strings.ToLower(minSeverity) + + rank := func(level string) int { + switch level { + case "critical": + return 3 + case "warning": + return 2 + case "info": + return 1 + default: + return 0 + } + } + return rank(eventLevel) >= rank(minLevel) +} + +func opsEmailSeverityForOps(severity string) string { + switch strings.ToUpper(strings.TrimSpace(severity)) { + case "P0": + return "critical" + case "P1": + return "warning" + default: + return "info" + } +} + +func isOpsAlertSilenced(now time.Time, rule *OpsAlertRule, event *OpsAlertEvent, silencing OpsAlertSilencingSettings) bool { + if !silencing.Enabled { + return false + } + if now.IsZero() { + now = time.Now().UTC() + } + if strings.TrimSpace(silencing.GlobalUntilRFC3339) != "" { + if t, err := time.Parse(time.RFC3339, strings.TrimSpace(silencing.GlobalUntilRFC3339)); err == nil { + if now.Before(t) { + return true + } + } + } + + for _, entry := range silencing.Entries { + untilRaw := strings.TrimSpace(entry.UntilRFC3339) + if untilRaw == "" { + continue + } + until, err := time.Parse(time.RFC3339, untilRaw) + if err != nil { + continue + } + if now.After(until) { + continue + } + if entry.RuleID != nil && rule != nil && rule.ID > 0 && *entry.RuleID != rule.ID { + continue + } + if len(entry.Severities) > 0 { + match := false + for _, s := range entry.Severities { + if strings.EqualFold(strings.TrimSpace(s), strings.TrimSpace(event.Severity)) || 
strings.EqualFold(strings.TrimSpace(s), strings.TrimSpace(rule.Severity)) { + match = true + break + } + } + if !match { + continue + } + } + return true + } + + return false +} + +func (s *OpsAlertEvaluatorService) tryAcquireLeaderLock(ctx context.Context, lock OpsDistributedLockSettings) (func(), bool) { + if !lock.Enabled { + return nil, true + } + if s.redisClient == nil { + s.warnNoRedisOnce.Do(func() { + log.Printf("[OpsAlertEvaluator] redis not configured; running without distributed lock") + }) + return nil, true + } + key := strings.TrimSpace(lock.Key) + if key == "" { + key = opsAlertEvaluatorLeaderLockKey + } + ttl := time.Duration(lock.TTLSeconds) * time.Second + if ttl <= 0 { + ttl = opsAlertEvaluatorLeaderLockTTL + } + + ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result() + if err != nil { + // Fail-open for single-node environments, but warn. + s.warnNoRedisOnce.Do(func() { + log.Printf("[OpsAlertEvaluator] leader lock SetNX failed; running without lock: %v", err) + }) + return nil, true + } + if !ok { + s.maybeLogSkip(key) + return nil, false + } + return func() { + _, _ = opsAlertEvaluatorReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result() + }, true +} + +func (s *OpsAlertEvaluatorService) maybeLogSkip(key string) { + s.skipLogMu.Lock() + defer s.skipLogMu.Unlock() + + now := time.Now() + if !s.skipLogAt.IsZero() && now.Sub(s.skipLogAt) < opsAlertEvaluatorSkipLogInterval { + return + } + s.skipLogAt = now + log.Printf("[OpsAlertEvaluator] leader lock held by another instance; skipping (key=%q)", key) +} + +func (s *OpsAlertEvaluatorService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) { + if s == nil || s.opsRepo == nil { + return + } + now := time.Now().UTC() + durMs := duration.Milliseconds() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ + JobName: opsAlertEvaluatorJobName, + 
LastRunAt:      &runAt,
+		LastSuccessAt:  &now,
+		LastDurationMs: &durMs,
+	})
+}
+
+// recordHeartbeatError persists a failed evaluator run (timestamp, duration,
+// truncated error text) into the ops job-heartbeat table, best-effort.
+func (s *OpsAlertEvaluatorService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
+	if s == nil || s.opsRepo == nil || err == nil {
+		return
+	}
+	now := time.Now().UTC()
+	durMs := duration.Milliseconds()
+	msg := truncateString(err.Error(), 2048)
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel()
+	_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
+		JobName:        opsAlertEvaluatorJobName,
+		LastRunAt:      &runAt,
+		LastErrorAt:    &now,
+		LastError:      &msg,
+		LastDurationMs: &durMs,
+	})
+}
+
+// htmlEscaper maps the five HTML-significant characters to their entity
+// references. Hoisted to a package-level var so the replacer is built once
+// rather than on every call.
+var htmlEscaper = strings.NewReplacer(
+	"&", "&amp;",
+	"<", "&lt;",
+	">", "&gt;",
+	`"`, "&quot;",
+	"'", "&#39;",
+)
+
+// htmlEscape escapes &, <, >, double quote and single quote so that
+// untrusted rule names/descriptions cannot inject markup into the alert
+// email body built by buildOpsAlertEmailBody.
+func htmlEscape(s string) string {
+	return htmlEscaper.Replace(s)
+}
+
+// slidingWindowLimiter is a mutex-guarded rate limiter that allows at most
+// `limit` events per sliding `window`. limit <= 0 disables limiting.
+type slidingWindowLimiter struct {
+	mu     sync.Mutex
+	limit  int
+	window time.Duration
+	sent   []time.Time
+}
+
+// newSlidingWindowLimiter builds a limiter; a non-positive window defaults
+// to one hour.
+func newSlidingWindowLimiter(limit int, window time.Duration) *slidingWindowLimiter {
+	if window <= 0 {
+		window = time.Hour
+	}
+	return &slidingWindowLimiter{
+		limit:  limit,
+		window: window,
+		sent:   []time.Time{},
+	}
+}
+
+// SetLimit updates the per-window limit (e.g. when admin settings change).
+func (l *slidingWindowLimiter) SetLimit(limit int) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.limit = limit
+}
+
+// Allow reports whether one more event may fire at `now`, pruning events
+// that fell out of the window and recording this one on success.
+func (l *slidingWindowLimiter) Allow(now time.Time) bool {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.limit <= 0 {
+		return true
+	}
+	cutoff := now.Add(-l.window)
+	keep := l.sent[:0]
+	for _, t := range l.sent {
+		if t.After(cutoff) {
+			keep = append(keep, t)
+		}
+	}
+	l.sent = keep
+	if len(l.sent) >= l.limit {
+		return false
+	}
+	l.sent = append(l.sent, now)
+	return true
+}
diff --git a/backend/internal/service/ops_alerts.go b/backend/internal/service/ops_alerts.go
new file mode 100644
index 00000000..b6c3d1c3
--- /dev/null
+++ b/backend/internal/service/ops_alerts.go
@@ -0,0 +1,162 @@
+package service
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"strings"
+	"time"
+
+	infraerrors 
"github.com/Wei-Shaw/sub2api/internal/pkg/errors" +) + +func (s *OpsService) ListAlertRules(ctx context.Context) ([]*OpsAlertRule, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return []*OpsAlertRule{}, nil + } + return s.opsRepo.ListAlertRules(ctx) +} + +func (s *OpsService) CreateAlertRule(ctx context.Context, rule *OpsAlertRule) (*OpsAlertRule, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if rule == nil { + return nil, infraerrors.BadRequest("INVALID_RULE", "invalid rule") + } + + created, err := s.opsRepo.CreateAlertRule(ctx, rule) + if err != nil { + return nil, err + } + return created, nil +} + +func (s *OpsService) UpdateAlertRule(ctx context.Context, rule *OpsAlertRule) (*OpsAlertRule, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if rule == nil || rule.ID <= 0 { + return nil, infraerrors.BadRequest("INVALID_RULE", "invalid rule") + } + + updated, err := s.opsRepo.UpdateAlertRule(ctx, rule) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, infraerrors.NotFound("OPS_ALERT_RULE_NOT_FOUND", "alert rule not found") + } + return nil, err + } + return updated, nil +} + +func (s *OpsService) DeleteAlertRule(ctx context.Context, id int64) error { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return err + } + if s.opsRepo == nil { + return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if id <= 0 { + return infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id") + } + if err := s.opsRepo.DeleteAlertRule(ctx, id); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 
infraerrors.NotFound("OPS_ALERT_RULE_NOT_FOUND", "alert rule not found") + } + return err + } + return nil +} + +func (s *OpsService) ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return []*OpsAlertEvent{}, nil + } + return s.opsRepo.ListAlertEvents(ctx, filter) +} + +func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if ruleID <= 0 { + return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id") + } + return s.opsRepo.GetActiveAlertEvent(ctx, ruleID) +} + +func (s *OpsService) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if ruleID <= 0 { + return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id") + } + return s.opsRepo.GetLatestAlertEvent(ctx, ruleID) +} + +func (s *OpsService) CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if event == nil { + return nil, infraerrors.BadRequest("INVALID_EVENT", "invalid event") + } + + created, err := s.opsRepo.CreateAlertEvent(ctx, event) + if err != nil { + return nil, err + } + return created, nil +} + +func (s *OpsService) UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, 
resolvedAt *time.Time) error { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return err + } + if s.opsRepo == nil { + return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if eventID <= 0 { + return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id") + } + if strings.TrimSpace(status) == "" { + return infraerrors.BadRequest("INVALID_STATUS", "invalid status") + } + return s.opsRepo.UpdateAlertEventStatus(ctx, eventID, status, resolvedAt) +} + +func (s *OpsService) UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return err + } + if s.opsRepo == nil { + return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if eventID <= 0 { + return infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id") + } + return s.opsRepo.UpdateAlertEventEmailSent(ctx, eventID, emailSent) +} diff --git a/backend/internal/service/ops_cleanup_service.go b/backend/internal/service/ops_cleanup_service.go new file mode 100644 index 00000000..ef825c04 --- /dev/null +++ b/backend/internal/service/ops_cleanup_service.go @@ -0,0 +1,361 @@ +package service + +import ( + "context" + "database/sql" + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" +) + +const ( + opsCleanupJobName = "ops_cleanup" + + opsCleanupLeaderLockKeyDefault = "ops:cleanup:leader" + opsCleanupLeaderLockTTLDefault = 30 * time.Minute +) + +var opsCleanupCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + +var opsCleanupReleaseScript = redis.NewScript(` +if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) +end +return 0 +`) + +// OpsCleanupService periodically deletes old ops data to prevent unbounded DB growth. 
+// +// - Scheduling: 5-field cron spec (minute hour dom month dow). +// - Multi-instance: best-effort Redis leader lock so only one node runs cleanup. +// - Safety: deletes in batches to avoid long transactions. +type OpsCleanupService struct { + opsRepo OpsRepository + db *sql.DB + redisClient *redis.Client + cfg *config.Config + + instanceID string + + cron *cron.Cron + entryID cron.EntryID + + startOnce sync.Once + stopOnce sync.Once + + warnNoRedisOnce sync.Once +} + +func NewOpsCleanupService( + opsRepo OpsRepository, + db *sql.DB, + redisClient *redis.Client, + cfg *config.Config, +) *OpsCleanupService { + return &OpsCleanupService{ + opsRepo: opsRepo, + db: db, + redisClient: redisClient, + cfg: cfg, + instanceID: uuid.NewString(), + } +} + +func (s *OpsCleanupService) Start() { + if s == nil { + return + } + if s.cfg != nil && !s.cfg.Ops.Enabled { + return + } + if s.cfg != nil && !s.cfg.Ops.Cleanup.Enabled { + log.Printf("[OpsCleanup] not started (disabled)") + return + } + if s.opsRepo == nil || s.db == nil { + log.Printf("[OpsCleanup] not started (missing deps)") + return + } + + s.startOnce.Do(func() { + schedule := "0 2 * * *" + if s.cfg != nil && strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule) != "" { + schedule = strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule) + } + + loc := time.Local + if s.cfg != nil && strings.TrimSpace(s.cfg.Timezone) != "" { + if parsed, err := time.LoadLocation(strings.TrimSpace(s.cfg.Timezone)); err == nil && parsed != nil { + loc = parsed + } + } + + c := cron.New(cron.WithParser(opsCleanupCronParser), cron.WithLocation(loc)) + id, err := c.AddFunc(schedule, func() { s.runScheduled() }) + if err != nil { + log.Printf("[OpsCleanup] not started (invalid schedule=%q): %v", schedule, err) + return + } + s.cron = c + s.entryID = id + s.cron.Start() + log.Printf("[OpsCleanup] started (schedule=%q tz=%s)", schedule, loc.String()) + }) +} + +func (s *OpsCleanupService) Stop() { + if s == nil { + return + } + s.stopOnce.Do(func() { + if 
s.cron != nil { + ctx := s.cron.Stop() + select { + case <-ctx.Done(): + case <-time.After(3 * time.Second): + log.Printf("[OpsCleanup] cron stop timed out") + } + } + }) +} + +func (s *OpsCleanupService) runScheduled() { + if s == nil || s.db == nil || s.opsRepo == nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + defer cancel() + + release, ok := s.tryAcquireLeaderLock(ctx) + if !ok { + return + } + if release != nil { + defer release() + } + + startedAt := time.Now().UTC() + runAt := startedAt + + counts, err := s.runCleanupOnce(ctx) + if err != nil { + s.recordHeartbeatError(runAt, time.Since(startedAt), err) + log.Printf("[OpsCleanup] cleanup failed: %v", err) + return + } + s.recordHeartbeatSuccess(runAt, time.Since(startedAt)) + log.Printf("[OpsCleanup] cleanup complete: %s", counts) +} + +type opsCleanupDeletedCounts struct { + errorLogs int64 + retryAttempts int64 + alertEvents int64 + systemMetrics int64 + hourlyPreagg int64 + dailyPreagg int64 +} + +func (c opsCleanupDeletedCounts) String() string { + return fmt.Sprintf( + "error_logs=%d retry_attempts=%d alert_events=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d", + c.errorLogs, + c.retryAttempts, + c.alertEvents, + c.systemMetrics, + c.hourlyPreagg, + c.dailyPreagg, + ) +} + +func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) { + out := opsCleanupDeletedCounts{} + if s == nil || s.db == nil || s.cfg == nil { + return out, nil + } + + batchSize := 5000 + + now := time.Now().UTC() + + // Error-like tables: error logs / retry attempts / alert events. 
+ if days := s.cfg.Ops.Cleanup.ErrorLogRetentionDays; days > 0 { + cutoff := now.AddDate(0, 0, -days) + n, err := deleteOldRowsByID(ctx, s.db, "ops_error_logs", "created_at", cutoff, batchSize, false) + if err != nil { + return out, err + } + out.errorLogs = n + + n, err = deleteOldRowsByID(ctx, s.db, "ops_retry_attempts", "created_at", cutoff, batchSize, false) + if err != nil { + return out, err + } + out.retryAttempts = n + + n, err = deleteOldRowsByID(ctx, s.db, "ops_alert_events", "created_at", cutoff, batchSize, false) + if err != nil { + return out, err + } + out.alertEvents = n + } + + // Minute-level metrics snapshots. + if days := s.cfg.Ops.Cleanup.MinuteMetricsRetentionDays; days > 0 { + cutoff := now.AddDate(0, 0, -days) + n, err := deleteOldRowsByID(ctx, s.db, "ops_system_metrics", "created_at", cutoff, batchSize, false) + if err != nil { + return out, err + } + out.systemMetrics = n + } + + // Pre-aggregation tables (hourly/daily). + if days := s.cfg.Ops.Cleanup.HourlyMetricsRetentionDays; days > 0 { + cutoff := now.AddDate(0, 0, -days) + n, err := deleteOldRowsByID(ctx, s.db, "ops_metrics_hourly", "bucket_start", cutoff, batchSize, false) + if err != nil { + return out, err + } + out.hourlyPreagg = n + + n, err = deleteOldRowsByID(ctx, s.db, "ops_metrics_daily", "bucket_date", cutoff, batchSize, true) + if err != nil { + return out, err + } + out.dailyPreagg = n + } + + return out, nil +} + +func deleteOldRowsByID( + ctx context.Context, + db *sql.DB, + table string, + timeColumn string, + cutoff time.Time, + batchSize int, + castCutoffToDate bool, +) (int64, error) { + if db == nil { + return 0, nil + } + if batchSize <= 0 { + batchSize = 5000 + } + + where := fmt.Sprintf("%s < $1", timeColumn) + if castCutoffToDate { + where = fmt.Sprintf("%s < $1::date", timeColumn) + } + + q := fmt.Sprintf(` +WITH batch AS ( + SELECT id FROM %s + WHERE %s + ORDER BY id + LIMIT $2 +) +DELETE FROM %s +WHERE id IN (SELECT id FROM batch) +`, table, where, table) + + 
var total int64 + for { + res, err := db.ExecContext(ctx, q, cutoff, batchSize) + if err != nil { + // If ops tables aren't present yet (partial deployments), treat as no-op. + if strings.Contains(strings.ToLower(err.Error()), "does not exist") && strings.Contains(strings.ToLower(err.Error()), "relation") { + return total, nil + } + return total, err + } + affected, err := res.RowsAffected() + if err != nil { + return total, err + } + total += affected + if affected == 0 { + break + } + } + return total, nil +} + +func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) { + if s == nil { + return nil, false + } + // In simple run mode, assume single instance. + if s.cfg != nil && s.cfg.RunMode == config.RunModeSimple { + return nil, true + } + + if s.redisClient == nil { + s.warnNoRedisOnce.Do(func() { + log.Printf("[OpsCleanup] redis not configured; running without distributed lock") + }) + return nil, true + } + + key := opsCleanupLeaderLockKeyDefault + ttl := opsCleanupLeaderLockTTLDefault + + ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result() + if err != nil { + s.warnNoRedisOnce.Do(func() { + log.Printf("[OpsCleanup] leader lock SetNX failed; running without lock: %v", err) + }) + return nil, true + } + if !ok { + return nil, false + } + + return func() { + _, _ = opsCleanupReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result() + }, true +} + +func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) { + if s == nil || s.opsRepo == nil { + return + } + now := time.Now().UTC() + durMs := duration.Milliseconds() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ + JobName: opsCleanupJobName, + LastRunAt: &runAt, + LastSuccessAt: &now, + LastDurationMs: &durMs, + }) +} + +func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration 
time.Duration, err error) { + if s == nil || s.opsRepo == nil || err == nil { + return + } + now := time.Now().UTC() + durMs := duration.Milliseconds() + msg := truncateString(err.Error(), 2048) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ + JobName: opsCleanupJobName, + LastRunAt: &runAt, + LastErrorAt: &now, + LastError: &msg, + LastDurationMs: &durMs, + }) +} diff --git a/backend/internal/service/ops_concurrency.go b/backend/internal/service/ops_concurrency.go new file mode 100644 index 00000000..c3b7b853 --- /dev/null +++ b/backend/internal/service/ops_concurrency.go @@ -0,0 +1,257 @@ +package service + +import ( + "context" + "log" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/pagination" +) + +const ( + opsAccountsPageSize = 100 + opsConcurrencyBatchChunkSize = 200 +) + +func (s *OpsService) listAllAccountsForOps(ctx context.Context, platformFilter string) ([]Account, error) { + if s == nil || s.accountRepo == nil { + return []Account{}, nil + } + + out := make([]Account, 0, 128) + page := 1 + for { + accounts, pageInfo, err := s.accountRepo.ListWithFilters(ctx, pagination.PaginationParams{ + Page: page, + PageSize: opsAccountsPageSize, + }, platformFilter, "", "", "") + if err != nil { + return nil, err + } + if len(accounts) == 0 { + break + } + + out = append(out, accounts...) 
+ if pageInfo != nil && int64(len(out)) >= pageInfo.Total { + break + } + if len(accounts) < opsAccountsPageSize { + break + } + + page++ + if page > 10_000 { + log.Printf("[Ops] listAllAccountsForOps: aborting after too many pages (platform=%q)", platformFilter) + break + } + } + + return out, nil +} + +func (s *OpsService) getAccountsLoadMapBestEffort(ctx context.Context, accounts []Account) map[int64]*AccountLoadInfo { + if s == nil || s.concurrencyService == nil { + return map[int64]*AccountLoadInfo{} + } + if len(accounts) == 0 { + return map[int64]*AccountLoadInfo{} + } + + // De-duplicate IDs (and keep the max concurrency to avoid under-reporting). + unique := make(map[int64]int, len(accounts)) + for _, acc := range accounts { + if acc.ID <= 0 { + continue + } + if prev, ok := unique[acc.ID]; !ok || acc.Concurrency > prev { + unique[acc.ID] = acc.Concurrency + } + } + + batch := make([]AccountWithConcurrency, 0, len(unique)) + for id, maxConc := range unique { + batch = append(batch, AccountWithConcurrency{ + ID: id, + MaxConcurrency: maxConc, + }) + } + + out := make(map[int64]*AccountLoadInfo, len(batch)) + for i := 0; i < len(batch); i += opsConcurrencyBatchChunkSize { + end := i + opsConcurrencyBatchChunkSize + if end > len(batch) { + end = len(batch) + } + part, err := s.concurrencyService.GetAccountsLoadBatch(ctx, batch[i:end]) + if err != nil { + // Best-effort: return zeros rather than failing the ops UI. + log.Printf("[Ops] GetAccountsLoadBatch failed: %v", err) + continue + } + for k, v := range part { + out[k] = v + } + } + + return out +} + +// GetConcurrencyStats returns real-time concurrency usage aggregated by platform/group/account. 
+// +// Optional filters: +// - platformFilter: only include accounts in that platform (best-effort reduces DB load) +// - groupIDFilter: only include accounts that belong to that group +func (s *OpsService) GetConcurrencyStats( + ctx context.Context, + platformFilter string, + groupIDFilter *int64, +) (map[string]*PlatformConcurrencyInfo, map[int64]*GroupConcurrencyInfo, map[int64]*AccountConcurrencyInfo, *time.Time, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, nil, nil, nil, err + } + + accounts, err := s.listAllAccountsForOps(ctx, platformFilter) + if err != nil { + return nil, nil, nil, nil, err + } + + collectedAt := time.Now() + loadMap := s.getAccountsLoadMapBestEffort(ctx, accounts) + + platform := make(map[string]*PlatformConcurrencyInfo) + group := make(map[int64]*GroupConcurrencyInfo) + account := make(map[int64]*AccountConcurrencyInfo) + + for _, acc := range accounts { + if acc.ID <= 0 { + continue + } + + var matchedGroup *Group + if groupIDFilter != nil && *groupIDFilter > 0 { + for _, grp := range acc.Groups { + if grp == nil || grp.ID <= 0 { + continue + } + if grp.ID == *groupIDFilter { + matchedGroup = grp + break + } + } + // Group filter provided: skip accounts not in that group. + if matchedGroup == nil { + continue + } + } + + load := loadMap[acc.ID] + currentInUse := int64(0) + waiting := int64(0) + if load != nil { + currentInUse = int64(load.CurrentConcurrency) + waiting = int64(load.WaitingCount) + } + + // Account-level view picks one display group (the first group). 
+ displayGroupID := int64(0) + displayGroupName := "" + if matchedGroup != nil { + displayGroupID = matchedGroup.ID + displayGroupName = matchedGroup.Name + } else if len(acc.Groups) > 0 && acc.Groups[0] != nil { + displayGroupID = acc.Groups[0].ID + displayGroupName = acc.Groups[0].Name + } + + if _, ok := account[acc.ID]; !ok { + info := &AccountConcurrencyInfo{ + AccountID: acc.ID, + AccountName: acc.Name, + Platform: acc.Platform, + GroupID: displayGroupID, + GroupName: displayGroupName, + CurrentInUse: currentInUse, + MaxCapacity: int64(acc.Concurrency), + WaitingInQueue: waiting, + } + if info.MaxCapacity > 0 { + info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100 + } + account[acc.ID] = info + } + + // Platform aggregation. + if acc.Platform != "" { + if _, ok := platform[acc.Platform]; !ok { + platform[acc.Platform] = &PlatformConcurrencyInfo{ + Platform: acc.Platform, + } + } + p := platform[acc.Platform] + p.MaxCapacity += int64(acc.Concurrency) + p.CurrentInUse += currentInUse + p.WaitingInQueue += waiting + } + + // Group aggregation (one account may contribute to multiple groups). + if matchedGroup != nil { + grp := matchedGroup + if _, ok := group[grp.ID]; !ok { + group[grp.ID] = &GroupConcurrencyInfo{ + GroupID: grp.ID, + GroupName: grp.Name, + Platform: grp.Platform, + } + } + g := group[grp.ID] + if g.GroupName == "" && grp.Name != "" { + g.GroupName = grp.Name + } + if g.Platform != "" && grp.Platform != "" && g.Platform != grp.Platform { + // Groups are expected to be platform-scoped. If mismatch is observed, avoid misleading labels. 
+ g.Platform = "" + } + g.MaxCapacity += int64(acc.Concurrency) + g.CurrentInUse += currentInUse + g.WaitingInQueue += waiting + } else { + for _, grp := range acc.Groups { + if grp == nil || grp.ID <= 0 { + continue + } + if _, ok := group[grp.ID]; !ok { + group[grp.ID] = &GroupConcurrencyInfo{ + GroupID: grp.ID, + GroupName: grp.Name, + Platform: grp.Platform, + } + } + g := group[grp.ID] + if g.GroupName == "" && grp.Name != "" { + g.GroupName = grp.Name + } + if g.Platform != "" && grp.Platform != "" && g.Platform != grp.Platform { + // Groups are expected to be platform-scoped. If mismatch is observed, avoid misleading labels. + g.Platform = "" + } + g.MaxCapacity += int64(acc.Concurrency) + g.CurrentInUse += currentInUse + g.WaitingInQueue += waiting + } + } + } + + for _, info := range platform { + if info.MaxCapacity > 0 { + info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100 + } + } + for _, info := range group { + if info.MaxCapacity > 0 { + info.LoadPercentage = float64(info.CurrentInUse) / float64(info.MaxCapacity) * 100 + } + } + + return platform, group, account, &collectedAt, nil +} diff --git a/backend/internal/service/ops_dashboard.go b/backend/internal/service/ops_dashboard.go new file mode 100644 index 00000000..23d6d82f --- /dev/null +++ b/backend/internal/service/ops_dashboard.go @@ -0,0 +1,77 @@ +package service + +import ( + "context" + "database/sql" + "errors" + "log" + + infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" +) + +func (s *OpsService) GetDashboardOverview(ctx context.Context, filter *OpsDashboardFilter) (*OpsDashboardOverview, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if filter == nil { + return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required") + } + if filter.StartTime.IsZero() || 
filter.EndTime.IsZero() { + return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required") + } + if filter.StartTime.After(filter.EndTime) { + return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time") + } + + // Resolve query mode (requested via query param, or DB default). + filter.QueryMode = s.resolveOpsQueryMode(ctx, filter.QueryMode) + + overview, err := s.opsRepo.GetDashboardOverview(ctx, filter) + if err != nil { + if errors.Is(err, ErrOpsPreaggregatedNotPopulated) { + return nil, infraerrors.Conflict("OPS_PREAGG_NOT_READY", "Pre-aggregated ops metrics are not populated yet") + } + return nil, err + } + + // Best-effort system health + jobs; dashboard metrics should still render if these are missing. + if metrics, err := s.opsRepo.GetLatestSystemMetrics(ctx, 1); err == nil { + overview.SystemMetrics = metrics + } else if err != nil && !errors.Is(err, sql.ErrNoRows) { + log.Printf("[Ops] GetLatestSystemMetrics failed: %v", err) + } + + if heartbeats, err := s.opsRepo.ListJobHeartbeats(ctx); err == nil { + overview.JobHeartbeats = heartbeats + } else { + log.Printf("[Ops] ListJobHeartbeats failed: %v", err) + } + + return overview, nil +} + +func (s *OpsService) resolveOpsQueryMode(ctx context.Context, requested OpsQueryMode) OpsQueryMode { + if requested.IsValid() { + // Allow "auto" to be disabled via config until preagg is proven stable in production. + // Forced `preagg` via query param still works. 
+ if requested == OpsQueryModeAuto && s != nil && s.cfg != nil && !s.cfg.Ops.UsePreaggregatedTables { + return OpsQueryModeRaw + } + return requested + } + + mode := OpsQueryModeAuto + if s != nil && s.settingRepo != nil { + if raw, err := s.settingRepo.GetValue(ctx, SettingKeyOpsQueryModeDefault); err == nil { + mode = ParseOpsQueryMode(raw) + } + } + + if mode == OpsQueryModeAuto && s != nil && s.cfg != nil && !s.cfg.Ops.UsePreaggregatedTables { + return OpsQueryModeRaw + } + return mode +} diff --git a/backend/internal/service/ops_errors.go b/backend/internal/service/ops_errors.go new file mode 100644 index 00000000..76b5ce8b --- /dev/null +++ b/backend/internal/service/ops_errors.go @@ -0,0 +1,45 @@ +package service + +import ( + "context" + + infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors" +) + +func (s *OpsService) GetErrorTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsErrorTrendResponse, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if filter == nil { + return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required") + } + if filter.StartTime.IsZero() || filter.EndTime.IsZero() { + return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required") + } + if filter.StartTime.After(filter.EndTime) { + return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time") + } + return s.opsRepo.GetErrorTrend(ctx, filter, bucketSeconds) +} + +func (s *OpsService) GetErrorDistribution(ctx context.Context, filter *OpsDashboardFilter) (*OpsErrorDistributionResponse, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + 
if filter == nil {
+		return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
+	}
+	if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
+		return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
+	}
+	if filter.StartTime.After(filter.EndTime) {
+		return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
+	}
+	return s.opsRepo.GetErrorDistribution(ctx, filter)
+}
diff --git a/backend/internal/service/ops_histograms.go b/backend/internal/service/ops_histograms.go
new file mode 100644
index 00000000..9f5b514f
--- /dev/null
+++ b/backend/internal/service/ops_histograms.go
@@ -0,0 +1,26 @@
+package service
+
+import (
+	"context"
+
+	infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
+)
+
+func (s *OpsService) GetLatencyHistogram(ctx context.Context, filter *OpsDashboardFilter) (*OpsLatencyHistogramResponse, error) { // same monitoring/repo/time-range guard chain as the other ops read endpoints
+	if err := s.RequireMonitoringEnabled(ctx); err != nil {
+		return nil, err
+	}
+	if s.opsRepo == nil {
+		return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
+	}
+	if filter == nil {
+		return nil, infraerrors.BadRequest("OPS_FILTER_REQUIRED", "filter is required")
+	}
+	if filter.StartTime.IsZero() || filter.EndTime.IsZero() {
+		return nil, infraerrors.BadRequest("OPS_TIME_RANGE_REQUIRED", "start_time/end_time are required")
+	}
+	if filter.StartTime.After(filter.EndTime) {
+		return nil, infraerrors.BadRequest("OPS_TIME_RANGE_INVALID", "start_time must be <= end_time")
+	}
+	return s.opsRepo.GetLatencyHistogram(ctx, filter)
+}
diff --git a/backend/internal/service/ops_metrics_collector.go b/backend/internal/service/ops_metrics_collector.go
new file mode 100644
index 00000000..cd90e1bd
--- /dev/null
+++ b/backend/internal/service/ops_metrics_collector.go
@@ -0,0 +1,861 @@
+package service
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"fmt"
+	"hash/fnv"
+	"log"
+	"math"
+	"os"
+	"runtime"
+ "strconv" + "strings" + "sync" + "time" + "unicode/utf8" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/mem" +) + +const ( + opsMetricsCollectorJobName = "ops_metrics_collector" + opsMetricsCollectorMinInterval = 60 * time.Second + opsMetricsCollectorMaxInterval = 1 * time.Hour + + opsMetricsCollectorTimeout = 10 * time.Second + + opsMetricsCollectorLeaderLockKey = "ops:metrics:collector:leader" + opsMetricsCollectorLeaderLockTTL = 90 * time.Second + + opsMetricsCollectorHeartbeatTimeout = 2 * time.Second + + bytesPerMB = 1024 * 1024 +) + +var opsMetricsCollectorAdvisoryLockID = hashAdvisoryLockID(opsMetricsCollectorLeaderLockKey) + +type OpsMetricsCollector struct { + opsRepo OpsRepository + settingRepo SettingRepository + cfg *config.Config + + db *sql.DB + redisClient *redis.Client + instanceID string + + lastCgroupCPUUsageNanos uint64 + lastCgroupCPUSampleAt time.Time + + stopCh chan struct{} + startOnce sync.Once + stopOnce sync.Once + + skipLogMu sync.Mutex + skipLogAt time.Time +} + +func NewOpsMetricsCollector( + opsRepo OpsRepository, + settingRepo SettingRepository, + db *sql.DB, + redisClient *redis.Client, + cfg *config.Config, +) *OpsMetricsCollector { + return &OpsMetricsCollector{ + opsRepo: opsRepo, + settingRepo: settingRepo, + cfg: cfg, + db: db, + redisClient: redisClient, + instanceID: uuid.NewString(), + } +} + +func (c *OpsMetricsCollector) Start() { + if c == nil { + return + } + c.startOnce.Do(func() { + if c.stopCh == nil { + c.stopCh = make(chan struct{}) + } + go c.run() + }) +} + +func (c *OpsMetricsCollector) Stop() { + if c == nil { + return + } + c.stopOnce.Do(func() { + if c.stopCh != nil { + close(c.stopCh) + } + }) +} + +func (c *OpsMetricsCollector) run() { + // First run immediately so the dashboard has data soon after startup. 
+	c.collectOnce()
+
+	for {
+		interval := c.getInterval() // re-read each cycle so setting changes apply without restart
+		timer := time.NewTimer(interval)
+		select {
+		case <-timer.C:
+			c.collectOnce()
+		case <-c.stopCh:
+			timer.Stop()
+			return
+		}
+	}
+}
+
+func (c *OpsMetricsCollector) getInterval() time.Duration {
+	interval := opsMetricsCollectorMinInterval // default when no valid setting exists
+
+	if c.settingRepo == nil {
+		return interval
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+	defer cancel()
+
+	raw, err := c.settingRepo.GetValue(ctx, SettingKeyOpsMetricsIntervalSeconds)
+	if err != nil {
+		return interval
+	}
+	raw = strings.TrimSpace(raw)
+	if raw == "" {
+		return interval
+	}
+
+	seconds, err := strconv.Atoi(raw)
+	if err != nil {
+		return interval
+	}
+	if seconds < int(opsMetricsCollectorMinInterval.Seconds()) { // clamp into [min, max]
+		seconds = int(opsMetricsCollectorMinInterval.Seconds())
+	}
+	if seconds > int(opsMetricsCollectorMaxInterval.Seconds()) {
+		seconds = int(opsMetricsCollectorMaxInterval.Seconds())
+	}
+	return time.Duration(seconds) * time.Second
+}
+
+func (c *OpsMetricsCollector) collectOnce() {
+	if c == nil {
+		return
+	}
+	if c.cfg != nil && !c.cfg.Ops.Enabled {
+		return
+	}
+	if c.opsRepo == nil {
+		return
+	}
+	if c.db == nil {
+		return
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), opsMetricsCollectorTimeout)
+	defer cancel()
+
+	if !c.isMonitoringEnabled(ctx) {
+		return
+	}
+
+	release, ok := c.tryAcquireLeaderLock(ctx) // only one instance collects per cycle
+	if !ok {
+		return
+	}
+	if release != nil {
+		defer release() // hold leadership for the whole collection attempt
+	}
+
+	startedAt := time.Now().UTC()
+	err := c.collectAndPersist(ctx)
+	finishedAt := time.Now().UTC()
+
+	durationMs := finishedAt.Sub(startedAt).Milliseconds()
+	dur := durationMs // NOTE(review): redundant alias of durationMs; &durationMs would serve
+	runAt := startedAt
+
+	if err != nil {
+		msg := truncateString(err.Error(), 2048) // bound error text for storage
+		errAt := finishedAt
+		hbCtx, hbCancel := context.WithTimeout(context.Background(), opsMetricsCollectorHeartbeatTimeout)
+		defer hbCancel()
+		_ = c.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{ // best-effort heartbeat; error deliberately ignored
+			JobName:
opsMetricsCollectorJobName,
+			LastRunAt:      &runAt,
+			LastErrorAt:    &errAt,
+			LastError:      &msg,
+			LastDurationMs: &dur,
+		})
+		log.Printf("[OpsMetricsCollector] collect failed: %v", err)
+		return
+	}
+
+	successAt := finishedAt
+	hbCtx, hbCancel := context.WithTimeout(context.Background(), opsMetricsCollectorHeartbeatTimeout)
+	defer hbCancel()
+	_ = c.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{ // best-effort heartbeat; error deliberately ignored
+		JobName:        opsMetricsCollectorJobName,
+		LastRunAt:      &runAt,
+		LastSuccessAt:  &successAt,
+		LastDurationMs: &dur,
+	})
+}
+
+func (c *OpsMetricsCollector) isMonitoringEnabled(ctx context.Context) bool {
+	if c == nil {
+		return false
+	}
+	if c.cfg != nil && !c.cfg.Ops.Enabled {
+		return false
+	}
+	if c.settingRepo == nil {
+		return true
+	}
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	value, err := c.settingRepo.GetValue(ctx, SettingKeyOpsMonitoringEnabled)
+	if err != nil {
+		if errors.Is(err, ErrSettingNotFound) { // unset setting means enabled
+			return true
+		}
+		// Fail-open: collector should not become a hard dependency.
+		return true
+	}
+	switch strings.ToLower(strings.TrimSpace(value)) {
+	case "false", "0", "off", "disabled":
+		return false
+	default:
+		return true
+	}
+}
+
+func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	// Align to stable minute boundaries to avoid partial buckets and to maximize cache hits.
+	now := time.Now().UTC()
+	windowEnd := now.Truncate(time.Minute)
+	windowStart := windowEnd.Add(-1 * time.Minute) // the last fully-closed minute
+
+	sys, err := c.collectSystemStats(ctx)
+	if err != nil {
+		// Continue; system stats are best-effort.
+ log.Printf("[OpsMetricsCollector] system stats error: %v", err) + } + + dbOK := c.checkDB(ctx) + redisOK := c.checkRedis(ctx) + active, idle := c.dbPoolStats() + + successCount, tokenConsumed, err := c.queryUsageCounts(ctx, windowStart, windowEnd) + if err != nil { + return fmt.Errorf("query usage counts: %w", err) + } + + duration, ttft, err := c.queryUsageLatency(ctx, windowStart, windowEnd) + if err != nil { + return fmt.Errorf("query usage latency: %w", err) + } + + errorTotal, businessLimited, errorSLA, upstreamExcl, upstream429, upstream529, err := c.queryErrorCounts(ctx, windowStart, windowEnd) + if err != nil { + return fmt.Errorf("query error counts: %w", err) + } + + windowSeconds := windowEnd.Sub(windowStart).Seconds() + if windowSeconds <= 0 { + windowSeconds = 60 + } + requestTotal := successCount + errorTotal + qps := float64(requestTotal) / windowSeconds + tps := float64(tokenConsumed) / windowSeconds + + goroutines := runtime.NumGoroutine() + + input := &OpsInsertSystemMetricsInput{ + CreatedAt: windowEnd, + WindowMinutes: 1, + + SuccessCount: successCount, + ErrorCountTotal: errorTotal, + BusinessLimitedCount: businessLimited, + ErrorCountSLA: errorSLA, + + UpstreamErrorCountExcl429529: upstreamExcl, + Upstream429Count: upstream429, + Upstream529Count: upstream529, + + TokenConsumed: tokenConsumed, + QPS: float64Ptr(roundTo1DP(qps)), + TPS: float64Ptr(roundTo1DP(tps)), + + DurationP50Ms: duration.p50, + DurationP90Ms: duration.p90, + DurationP95Ms: duration.p95, + DurationP99Ms: duration.p99, + DurationAvgMs: duration.avg, + DurationMaxMs: duration.max, + + TTFTP50Ms: ttft.p50, + TTFTP90Ms: ttft.p90, + TTFTP95Ms: ttft.p95, + TTFTP99Ms: ttft.p99, + TTFTAvgMs: ttft.avg, + TTFTMaxMs: ttft.max, + + CPUUsagePercent: sys.cpuUsagePercent, + MemoryUsedMB: sys.memoryUsedMB, + MemoryTotalMB: sys.memoryTotalMB, + MemoryUsagePercent: sys.memoryUsagePercent, + + DBOK: boolPtr(dbOK), + RedisOK: boolPtr(redisOK), + + DBConnActive: intPtr(active), + DBConnIdle: 
intPtr(idle),
+		GoroutineCount: intPtr(goroutines),
+	}
+
+	return c.opsRepo.InsertSystemMetrics(ctx, input)
+}
+
+type opsCollectedPercentiles struct { // nil field = metric not computable for the window (no rows)
+	p50 *int
+	p90 *int
+	p95 *int
+	p99 *int
+	avg *float64
+	max *int
+}
+
+func (c *OpsMetricsCollector) queryUsageCounts(ctx context.Context, start, end time.Time) (successCount int64, tokenConsumed int64, err error) { // NOTE(review): assumes every usage_logs row in [start, end) is a successful request — confirm schema
+	q := `
+SELECT
+  COALESCE(COUNT(*), 0) AS success_count,
+  COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS token_consumed
+FROM usage_logs
+WHERE created_at >= $1 AND created_at < $2`
+
+	var tokens sql.NullInt64
+	if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&successCount, &tokens); err != nil {
+		return 0, 0, err
+	}
+	if tokens.Valid { // SUM returns NULL for an empty window despite the COALESCE in SQL; defensive double-check
+		tokenConsumed = tokens.Int64
+	}
+	return successCount, tokenConsumed, nil
+}
+
+func (c *OpsMetricsCollector) queryUsageLatency(ctx context.Context, start, end time.Time) (duration opsCollectedPercentiles, ttft opsCollectedPercentiles, err error) {
+	{ // total-duration percentiles (duration_ms)
+		q := `
+SELECT
+  percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) AS p50,
+  percentile_cont(0.90) WITHIN GROUP (ORDER BY duration_ms) AS p90,
+  percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) AS p95,
+  percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) AS p99,
+  AVG(duration_ms) AS avg_ms,
+  MAX(duration_ms) AS max_ms
+FROM usage_logs
+WHERE created_at >= $1 AND created_at < $2
+  AND duration_ms IS NOT NULL`
+
+		var p50, p90, p95, p99 sql.NullFloat64
+		var avg sql.NullFloat64
+		var max sql.NullInt64
+		if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&p50, &p90, &p95, &p99, &avg, &max); err != nil {
+			return opsCollectedPercentiles{}, opsCollectedPercentiles{}, err
+		}
+		duration.p50 = floatToIntPtr(p50)
+		duration.p90 = floatToIntPtr(p90)
+		duration.p95 = floatToIntPtr(p95)
+		duration.p99 = floatToIntPtr(p99)
+		if avg.Valid {
+			v := roundTo1DP(avg.Float64)
+			duration.avg = &v
+		}
+		if max.Valid {
+			v := int(max.Int64)
+
duration.max = &v
+		}
+	}
+
+	{ // TTFT percentiles (first_token_ms) over the same window
+		q := `
+SELECT
+  percentile_cont(0.50) WITHIN GROUP (ORDER BY first_token_ms) AS p50,
+  percentile_cont(0.90) WITHIN GROUP (ORDER BY first_token_ms) AS p90,
+  percentile_cont(0.95) WITHIN GROUP (ORDER BY first_token_ms) AS p95,
+  percentile_cont(0.99) WITHIN GROUP (ORDER BY first_token_ms) AS p99,
+  AVG(first_token_ms) AS avg_ms,
+  MAX(first_token_ms) AS max_ms
+FROM usage_logs
+WHERE created_at >= $1 AND created_at < $2
+  AND first_token_ms IS NOT NULL`
+
+		var p50, p90, p95, p99 sql.NullFloat64
+		var avg sql.NullFloat64
+		var max sql.NullInt64
+		if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&p50, &p90, &p95, &p99, &avg, &max); err != nil {
+			return opsCollectedPercentiles{}, opsCollectedPercentiles{}, err
+		}
+		ttft.p50 = floatToIntPtr(p50)
+		ttft.p90 = floatToIntPtr(p90)
+		ttft.p95 = floatToIntPtr(p95)
+		ttft.p99 = floatToIntPtr(p99)
+		if avg.Valid {
+			v := roundTo1DP(avg.Float64)
+			ttft.avg = &v
+		}
+		if max.Valid {
+			v := int(max.Int64)
+			ttft.max = &v
+		}
+	}
+
+	return duration, ttft, nil
+}
+
+func (c *OpsMetricsCollector) queryErrorCounts(ctx context.Context, start, end time.Time) (
+	errorTotal int64,
+	businessLimited int64,
+	errorSLA int64,
+	upstreamExcl429529 int64,
+	upstream429 int64,
+	upstream529 int64,
+	err error,
+) { // single aggregated scan of ops_error_logs for the window
+	q := `
+SELECT
+  COALESCE(COUNT(*), 0) AS error_total,
+  COALESCE(COUNT(*) FILTER (WHERE is_business_limited), 0) AS business_limited,
+  COALESCE(COUNT(*) FILTER (WHERE NOT is_business_limited), 0) AS error_sla,
+  COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl,
+  COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 429), 0) AS upstream_429,
+  COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 529), 0) AS upstream_529
+FROM ops_error_logs
+WHERE
created_at >= $1 AND created_at < $2`
+
+	if err := c.db.QueryRowContext(ctx, q, start, end).Scan(
+		&errorTotal,
+		&businessLimited,
+		&errorSLA,
+		&upstreamExcl429529,
+		&upstream429,
+		&upstream529,
+	); err != nil {
+		return 0, 0, 0, 0, 0, 0, err
+	}
+	return errorTotal, businessLimited, errorSLA, upstreamExcl429529, upstream429, upstream529, nil
+}
+
+type opsCollectedSystemStats struct { // nil field = metric unavailable on this platform
+	cpuUsagePercent    *float64
+	memoryUsedMB       *int64
+	memoryTotalMB      *int64
+	memoryUsagePercent *float64
+}
+
+func (c *OpsMetricsCollector) collectSystemStats(ctx context.Context) (*opsCollectedSystemStats, error) {
+	out := &opsCollectedSystemStats{}
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	sampleAt := time.Now().UTC()
+
+	// Prefer cgroup (container) metrics when available.
+	if cpuPct := c.tryCgroupCPUPercent(sampleAt); cpuPct != nil {
+		out.cpuUsagePercent = cpuPct
+	}
+
+	cgroupUsed, cgroupTotal, cgroupOK := readCgroupMemoryBytes()
+	if cgroupOK {
+		usedMB := int64(cgroupUsed / bytesPerMB)
+		out.memoryUsedMB = &usedMB
+		if cgroupTotal > 0 { // total is 0 when memory.max is "max" (unlimited)
+			totalMB := int64(cgroupTotal / bytesPerMB)
+			out.memoryTotalMB = &totalMB
+			pct := roundTo1DP(float64(cgroupUsed) / float64(cgroupTotal) * 100)
+			out.memoryUsagePercent = &pct
+		}
+	}
+
+	// Fallback to host metrics if cgroup metrics are unavailable (or incomplete).
+	if out.cpuUsagePercent == nil {
+		if cpuPercents, err := cpu.PercentWithContext(ctx, 0, false); err == nil && len(cpuPercents) > 0 { // interval 0 = since last call; percpu=false gives one aggregate value
+			v := roundTo1DP(cpuPercents[0])
+			out.cpuUsagePercent = &v
+		}
+	}
+
+	// If total memory isn't available from cgroup (e.g. memory.max = "max"), fill total from host.
+	if out.memoryUsedMB == nil || out.memoryTotalMB == nil || out.memoryUsagePercent == nil {
+		if vm, err := mem.VirtualMemoryWithContext(ctx); err == nil && vm != nil {
+			if out.memoryUsedMB == nil {
+				usedMB := int64(vm.Used / bytesPerMB)
+				out.memoryUsedMB = &usedMB
+			}
+			if out.memoryTotalMB == nil {
+				totalMB := int64(vm.Total / bytesPerMB)
+				out.memoryTotalMB = &totalMB
+			}
+			if out.memoryUsagePercent == nil {
+				if out.memoryUsedMB != nil && out.memoryTotalMB != nil && *out.memoryTotalMB > 0 { // may mix cgroup used with host total; approximate by design
+					pct := roundTo1DP(float64(*out.memoryUsedMB) / float64(*out.memoryTotalMB) * 100)
+					out.memoryUsagePercent = &pct
+				} else {
+					pct := roundTo1DP(vm.UsedPercent)
+					out.memoryUsagePercent = &pct
+				}
+			}
+		}
+	}
+
+	return out, nil
+}
+
+func (c *OpsMetricsCollector) tryCgroupCPUPercent(now time.Time) *float64 { // returns nil until a baseline exists; last-sample fields are unsynchronized — single collector goroutine assumed
+	usageNanos, ok := readCgroupCPUUsageNanos()
+	if !ok {
+		return nil
+	}
+
+	// Initialize baseline sample.
+	if c.lastCgroupCPUSampleAt.IsZero() {
+		c.lastCgroupCPUUsageNanos = usageNanos
+		c.lastCgroupCPUSampleAt = now
+		return nil
+	}
+
+	elapsed := now.Sub(c.lastCgroupCPUSampleAt)
+	if elapsed <= 0 { // clock anomaly; re-baseline and skip this sample
+		c.lastCgroupCPUUsageNanos = usageNanos
+		c.lastCgroupCPUSampleAt = now
+		return nil
+	}
+
+	prev := c.lastCgroupCPUUsageNanos
+	c.lastCgroupCPUUsageNanos = usageNanos
+	c.lastCgroupCPUSampleAt = now
+
+	if usageNanos < prev {
+		// Counter reset (container restarted).
+		return nil
+	}
+
+	deltaUsageSec := float64(usageNanos-prev) / 1e9 // ns -> s
+	elapsedSec := elapsed.Seconds()
+	if elapsedSec <= 0 {
+		return nil
+	}
+
+	cores := readCgroupCPULimitCores()
+	if cores <= 0 {
+		// Can't reliably normalize; skip and fall back to gopsutil.
+		return nil
+	}
+
+	pct := (deltaUsageSec / (elapsedSec * cores)) * 100
+	if pct < 0 {
+		pct = 0
+	}
+	// Clamp to avoid noise/jitter showing impossible values.
+	if pct > 100 {
+		pct = 100
+	}
+	v := roundTo1DP(pct)
+	return &v
+}
+
+func readCgroupMemoryBytes() (usedBytes uint64, totalBytes uint64, ok bool) { // ok=true means used is known; total may still be 0 (unlimited)
+	// cgroup v2 (most common in modern containers)
+	if used, ok1 := readUintFile("/sys/fs/cgroup/memory.current"); ok1 {
+		usedBytes = used
+		rawMax, err := os.ReadFile("/sys/fs/cgroup/memory.max")
+		if err == nil {
+			s := strings.TrimSpace(string(rawMax))
+			if s != "" && s != "max" { // "max" = no memory limit configured
+				if v, err := strconv.ParseUint(s, 10, 64); err == nil {
+					totalBytes = v
+				}
+			}
+		}
+		return usedBytes, totalBytes, true
+	}
+
+	// cgroup v1 fallback
+	if used, ok1 := readUintFile("/sys/fs/cgroup/memory/memory.usage_in_bytes"); ok1 {
+		usedBytes = used
+		if limit, ok2 := readUintFile("/sys/fs/cgroup/memory/memory.limit_in_bytes"); ok2 {
+			// Some environments report a very large number when unlimited.
+			if limit > 0 && limit < (1<<60) {
+				totalBytes = limit
+			}
+		}
+		return usedBytes, totalBytes, true
+	}
+
+	return 0, 0, false
+}
+
+func readCgroupCPUUsageNanos() (usageNanos uint64, ok bool) {
+	// cgroup v2: cpu.stat has usage_usec
+	if raw, err := os.ReadFile("/sys/fs/cgroup/cpu.stat"); err == nil {
+		lines := strings.Split(string(raw), "\n")
+		for _, line := range lines {
+			fields := strings.Fields(line)
+			if len(fields) != 2 {
+				continue
+			}
+			if fields[0] != "usage_usec" {
+				continue
+			}
+			v, err := strconv.ParseUint(fields[1], 10, 64)
+			if err != nil {
+				continue
+			}
+			return v * 1000, true // usage_usec -> nanoseconds
+		}
+	}
+
+	// cgroup v1: cpuacct.usage is in nanoseconds
+	if v, ok := readUintFile("/sys/fs/cgroup/cpuacct/cpuacct.usage"); ok {
+		return v, true
+	}
+
+	return 0, false
+}
+
+func readCgroupCPULimitCores() float64 {
+	// cgroup v2: cpu.max => "