package service

import (
	"context"
	"fmt"
	"log"
	"math"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)

const (
	opsAlertEvaluatorJobName         = "ops_alert_evaluator"
	opsAlertEvaluatorTimeout         = 45 * time.Second
	opsAlertEvaluatorLeaderLockKey   = "ops:alert:evaluator:leader"
	opsAlertEvaluatorLeaderLockTTL   = 90 * time.Second
	opsAlertEvaluatorSkipLogInterval = 1 * time.Minute
)

var opsAlertEvaluatorReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0
`)

type OpsAlertEvaluatorService struct {
	opsService   *OpsService
	opsRepo      OpsRepository
	emailService *EmailService
	redisClient  *redis.Client
	cfg          *config.Config

	instanceID string

	stopCh    chan struct{}
	startOnce sync.Once
	stopOnce  sync.Once
	wg        sync.WaitGroup

	mu         sync.Mutex
	ruleStates map[int64]*opsAlertRuleState

	emailLimiter *slidingWindowLimiter

	skipLogMu sync.Mutex
	skipLogAt time.Time

	warnNoRedisOnce sync.Once
}

type opsAlertRuleState struct {
	LastEvaluatedAt     time.Time
	ConsecutiveBreaches int
}

func NewOpsAlertEvaluatorService(
	opsService *OpsService,
	opsRepo OpsRepository,
	emailService *EmailService,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsAlertEvaluatorService {
	return &OpsAlertEvaluatorService{
		opsService:   opsService,
		opsRepo:      opsRepo,
		emailService: emailService,
		redisClient:  redisClient,
		cfg:          cfg,
		instanceID:   uuid.NewString(),
		ruleStates:   map[int64]*opsAlertRuleState{},
		emailLimiter: newSlidingWindowLimiter(0, time.Hour),
	}
}

func (s *OpsAlertEvaluatorService) Start() {
	if s == nil {
		return
	}
	s.startOnce.Do(func() {
		if s.stopCh == nil {
			s.stopCh = make(chan struct{})
		}
		// Register with the wait group before launching the goroutine so a
		// concurrent Stop cannot observe an empty group and return early.
		s.wg.Add(1)
		go s.run()
	})
}

func (s *OpsAlertEvaluatorService) Stop() {
	if s == nil {
		return
	}
	s.stopOnce.Do(func() {
		if s.stopCh != nil {
			close(s.stopCh)
		}
	})
	s.wg.Wait()
}

func (s *OpsAlertEvaluatorService) run() {
	defer s.wg.Done()

	// Start immediately to produce early feedback in the ops dashboard.
	timer := time.NewTimer(0)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			interval := s.getInterval()
			s.evaluateOnce(interval)
			timer.Reset(interval)
		case <-s.stopCh:
			return
		}
	}
}

func (s *OpsAlertEvaluatorService) getInterval() time.Duration {
	// Default evaluation interval, used when runtime settings are unavailable or out of range.
	interval := 60 * time.Second
	if s == nil || s.opsService == nil {
		return interval
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	cfg, err := s.opsService.GetOpsAlertRuntimeSettings(ctx)
	if err != nil || cfg == nil {
		return interval
	}
	if cfg.EvaluationIntervalSeconds < 1 {
		return interval
	}
	if cfg.EvaluationIntervalSeconds > int((24 * time.Hour).Seconds()) {
		return interval
	}
	return time.Duration(cfg.EvaluationIntervalSeconds) * time.Second
}

func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
	if s == nil || s.opsRepo == nil {
		return
	}
	if s.cfg != nil && !s.cfg.Ops.Enabled {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), opsAlertEvaluatorTimeout)
	defer cancel()

	if s.opsService != nil && !s.opsService.IsMonitoringEnabled(ctx) {
		return
	}

	runtimeCfg := defaultOpsAlertRuntimeSettings()
	if s.opsService != nil {
		if loaded, err := s.opsService.GetOpsAlertRuntimeSettings(ctx); err == nil && loaded != nil {
			runtimeCfg = loaded
		}
	}

	release, ok := s.tryAcquireLeaderLock(ctx, runtimeCfg.DistributedLock)
	if !ok {
		return
	}
	if release != nil {
		defer release()
	}

	startedAt := time.Now().UTC()
	runAt := startedAt

	rules, err := s.opsRepo.ListAlertRules(ctx)
	if err != nil {
		s.recordHeartbeatError(runAt, time.Since(startedAt), err)
		log.Printf("[OpsAlertEvaluator] list rules failed: %v", err)
		return
	}

	rulesTotal := len(rules)
	rulesEnabled := 0
	rulesEvaluated := 0
	eventsCreated := 0
	eventsResolved := 0
	emailsSent := 0

	now := time.Now().UTC()
	safeEnd := now.Truncate(time.Minute)
	if safeEnd.IsZero() {
		safeEnd = now
	}

	systemMetrics, _ := s.opsRepo.GetLatestSystemMetrics(ctx, 1)

	// Clean up stale state for removed rules.
	s.pruneRuleStates(rules)

	for _, rule := range rules {
		if rule == nil || !rule.Enabled || rule.ID <= 0 {
			continue
		}
		rulesEnabled++

		scopePlatform, scopeGroupID, scopeRegion := parseOpsAlertRuleScope(rule.Filters)

		windowMinutes := rule.WindowMinutes
		if windowMinutes <= 0 {
			windowMinutes = 1
		}
		windowStart := safeEnd.Add(-time.Duration(windowMinutes) * time.Minute)
		windowEnd := safeEnd

		metricValue, ok := s.computeRuleMetric(ctx, rule, systemMetrics, windowStart, windowEnd, scopePlatform, scopeGroupID)
		if !ok {
			s.resetRuleState(rule.ID, now)
			continue
		}
		rulesEvaluated++

		breachedNow := compareMetric(metricValue, rule.Operator, rule.Threshold)
		required := requiredSustainedBreaches(rule.SustainedMinutes, interval)
		consecutive := s.updateRuleBreaches(rule.ID, now, interval, breachedNow)

		activeEvent, err := s.opsRepo.GetActiveAlertEvent(ctx, rule.ID)
		if err != nil {
			log.Printf("[OpsAlertEvaluator] get active event failed (rule=%d): %v", rule.ID, err)
			continue
		}

		if breachedNow && consecutive >= required {
			if activeEvent != nil {
				continue
			}

			// Scoped silencing: if a matching silence exists, skip creating a firing event.
			if s.opsService != nil {
				platform := strings.TrimSpace(scopePlatform)
				region := scopeRegion
				if platform != "" {
					if ok, err := s.opsService.IsAlertSilenced(ctx, rule.ID, platform, scopeGroupID, region, now); err == nil && ok {
						continue
					}
				}
			}

			latestEvent, err := s.opsRepo.GetLatestAlertEvent(ctx, rule.ID)
			if err != nil {
				log.Printf("[OpsAlertEvaluator] get latest event failed (rule=%d): %v", rule.ID, err)
				continue
			}
			if latestEvent != nil && rule.CooldownMinutes > 0 {
				cooldown := time.Duration(rule.CooldownMinutes) * time.Minute
				if now.Sub(latestEvent.FiredAt) < cooldown {
					continue
				}
			}

			firedEvent := &OpsAlertEvent{
				RuleID:         rule.ID,
				Severity:       strings.TrimSpace(rule.Severity),
				Status:         OpsAlertStatusFiring,
				Title:          fmt.Sprintf("%s: %s", strings.TrimSpace(rule.Severity), strings.TrimSpace(rule.Name)),
				Description:    buildOpsAlertDescription(rule, metricValue, windowMinutes, scopePlatform, scopeGroupID),
				MetricValue:    float64Ptr(metricValue),
				ThresholdValue: float64Ptr(rule.Threshold),
				Dimensions:     buildOpsAlertDimensions(scopePlatform, scopeGroupID),
				FiredAt:        now,
				CreatedAt:      now,
			}
			created, err := s.opsRepo.CreateAlertEvent(ctx, firedEvent)
			if err != nil {
				log.Printf("[OpsAlertEvaluator] create event failed (rule=%d): %v", rule.ID, err)
				continue
			}
			eventsCreated++
			if created != nil && created.ID > 0 {
				if s.maybeSendAlertEmail(ctx, runtimeCfg, rule, created) {
					emailsSent++
				}
			}
			continue
		}

		// Not breached: resolve the active event if present.
		if activeEvent != nil {
			resolvedAt := now
			if err := s.opsRepo.UpdateAlertEventStatus(ctx, activeEvent.ID, OpsAlertStatusResolved, &resolvedAt); err != nil {
				log.Printf("[OpsAlertEvaluator] resolve event failed (event=%d): %v", activeEvent.ID, err)
			} else {
				eventsResolved++
			}
		}
	}

	result := truncateString(fmt.Sprintf(
		"rules=%d enabled=%d evaluated=%d created=%d resolved=%d emails_sent=%d",
		rulesTotal, rulesEnabled, rulesEvaluated, eventsCreated, eventsResolved, emailsSent,
	), 2048)
	s.recordHeartbeatSuccess(runAt, time.Since(startedAt), result)
}

func (s *OpsAlertEvaluatorService) pruneRuleStates(rules []*OpsAlertRule) {
	s.mu.Lock()
	defer s.mu.Unlock()

	live := map[int64]struct{}{}
	for _, r := range rules {
		if r != nil && r.ID > 0 {
			live[r.ID] = struct{}{}
		}
	}
	for id := range s.ruleStates {
		if _, ok := live[id]; !ok {
			delete(s.ruleStates, id)
		}
	}
}

func (s *OpsAlertEvaluatorService) resetRuleState(ruleID int64, now time.Time) {
	if ruleID <= 0 {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	state, ok := s.ruleStates[ruleID]
	if !ok {
		state = &opsAlertRuleState{}
		s.ruleStates[ruleID] = state
	}
	state.LastEvaluatedAt = now
	state.ConsecutiveBreaches = 0
}

func (s *OpsAlertEvaluatorService) updateRuleBreaches(ruleID int64, now time.Time, interval time.Duration, breached bool) int {
	if ruleID <= 0 {
		return 0
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	state, ok := s.ruleStates[ruleID]
	if !ok {
		state = &opsAlertRuleState{}
		s.ruleStates[ruleID] = state
	}
	// A gap of more than two intervals means evaluations were skipped; the streak is stale.
	if !state.LastEvaluatedAt.IsZero() && interval > 0 {
		if now.Sub(state.LastEvaluatedAt) > interval*2 {
			state.ConsecutiveBreaches = 0
		}
	}
	state.LastEvaluatedAt = now
	if breached {
		state.ConsecutiveBreaches++
	} else {
		state.ConsecutiveBreaches = 0
	}
	return state.ConsecutiveBreaches
}

// requiredSustainedBreaches converts a rule's sustained-minutes setting into the
// number of consecutive breached evaluations needed at the given interval,
// e.g. 5 sustained minutes at a 60s interval requires 5 consecutive breaches.
func requiredSustainedBreaches(sustainedMinutes int, interval time.Duration) int {
	if sustainedMinutes <= 0 {
		return 1
	}
	if interval <= 0 {
		return sustainedMinutes
	}
	required := int(math.Ceil(float64(sustainedMinutes*60) / interval.Seconds()))
	if required < 1 {
		return 1
	}
	return required
}

// parseOpsAlertRuleScope extracts the optional scope from a rule's Filters map.
// Recognized keys are "platform" (string), "group_id" (number or numeric string),
// and "region" (string).
func parseOpsAlertRuleScope(filters map[string]any) (platform string, groupID *int64, region *string) {
	if filters == nil {
		return "", nil, nil
	}
	if v, ok := filters["platform"]; ok {
		if s, ok := v.(string); ok {
			platform = strings.TrimSpace(s)
		}
	}
	if v, ok := filters["group_id"]; ok {
		switch t := v.(type) {
		case float64:
			if t > 0 {
				id := int64(t)
				groupID = &id
			}
		case int64:
			if t > 0 {
				id := t
				groupID = &id
			}
		case int:
			if t > 0 {
				id := int64(t)
				groupID = &id
			}
		case string:
			n, err := strconv.ParseInt(strings.TrimSpace(t), 10, 64)
			if err == nil && n > 0 {
				groupID = &n
			}
		}
	}
	if v, ok := filters["region"]; ok {
		if s, ok := v.(string); ok {
			vv := strings.TrimSpace(s)
			if vv != "" {
				region = &vv
			}
		}
	}
	return platform, groupID, region
}

func (s *OpsAlertEvaluatorService) computeRuleMetric(
	ctx context.Context,
	rule *OpsAlertRule,
	systemMetrics *OpsSystemMetricsSnapshot,
	start time.Time,
	end time.Time,
	platform string,
	groupID *int64,
) (float64, bool) {
	if rule == nil {
		return 0, false
	}

	switch strings.TrimSpace(rule.MetricType) {
	case "cpu_usage_percent":
		if systemMetrics != nil && systemMetrics.CPUUsagePercent != nil {
			return *systemMetrics.CPUUsagePercent, true
		}
		return 0, false
	case "memory_usage_percent":
		if systemMetrics != nil && systemMetrics.MemoryUsagePercent != nil {
			return *systemMetrics.MemoryUsagePercent, true
		}
		return 0, false
	case "concurrency_queue_depth":
		if systemMetrics != nil && systemMetrics.ConcurrencyQueueDepth != nil {
			return float64(*systemMetrics.ConcurrencyQueueDepth), true
		}
		return 0, false
	case "group_available_accounts":
		if groupID == nil || *groupID <= 0 {
			return 0, false
		}
		if s == nil || s.opsService == nil {
			return 0, false
		}
		availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
		if err != nil || availability == nil {
			return 0, false
		}
		if availability.Group == nil {
			return 0, true
		}
		return float64(availability.Group.AvailableCount), true
	case "group_available_ratio":
		if groupID == nil || *groupID <= 0 {
			return 0, false
		}
		if s == nil || s.opsService == nil {
			return 0, false
		}
		availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
		if err != nil || availability == nil {
			return 0, false
		}
		return computeGroupAvailableRatio(availability.Group), true
	case "account_rate_limited_count":
		if s == nil || s.opsService == nil {
			return 0, false
		}
		availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
		if err != nil || availability == nil {
			return 0, false
		}
		return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
			return acc.IsRateLimited
		})), true
	case "account_error_count":
		if s == nil || s.opsService == nil {
			return 0, false
		}
		availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
		if err != nil || availability == nil {
			return 0, false
		}
		return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
			return acc.HasError && acc.TempUnschedulableUntil == nil
		})), true
	}

	overview, err := s.opsRepo.GetDashboardOverview(ctx, &OpsDashboardFilter{
		StartTime: start,
		EndTime:   end,
		Platform:  platform,
		GroupID:   groupID,
		QueryMode: OpsQueryModeRaw,
	})
	if err != nil {
		return 0, false
	}
	if overview == nil {
		return 0, false
	}

	switch strings.TrimSpace(rule.MetricType) {
	case "success_rate":
		if overview.RequestCountSLA <= 0 {
			return 0, false
		}
		return overview.SLA * 100, true
	case "error_rate":
		if overview.RequestCountSLA <= 0 {
			return 0, false
		}
		return overview.ErrorRate * 100, true
	case "upstream_error_rate":
		if overview.RequestCountSLA <= 0 {
			return 0, false
		}
		return overview.UpstreamErrorRate * 100, true
	default:
		return 0, false
	}
}

func compareMetric(value float64, operator string, threshold float64) bool {
	switch strings.TrimSpace(operator) {
	case ">":
		return value > threshold
	case ">=":
		return value >= threshold
	case "<":
		return value < threshold
	case "<=":
		return value <= threshold
	case "==":
		return value == threshold
	case "!=":
		return value != threshold
	default:
		return false
	}
}

func buildOpsAlertDimensions(platform string, groupID *int64) map[string]any {
	dims := map[string]any{}
	if strings.TrimSpace(platform) != "" {
		dims["platform"] = strings.TrimSpace(platform)
	}
	if groupID != nil && *groupID > 0 {
		dims["group_id"] = *groupID
	}
	if len(dims) == 0 {
		return nil
	}
	return dims
}

func buildOpsAlertDescription(rule *OpsAlertRule, value float64, windowMinutes int, platform string, groupID *int64) string {
	if rule == nil {
		return ""
	}
	scope := "overall"
	if strings.TrimSpace(platform) != "" {
		scope = fmt.Sprintf("platform=%s", strings.TrimSpace(platform))
	}
	if groupID != nil && *groupID > 0 {
		scope = fmt.Sprintf("%s group_id=%d", scope, *groupID)
	}
	if windowMinutes <= 0 {
		windowMinutes = 1
	}
	return fmt.Sprintf("%s %s %.2f (current %.2f) over last %dm (%s)",
		strings.TrimSpace(rule.MetricType),
		strings.TrimSpace(rule.Operator),
		rule.Threshold,
		value,
		windowMinutes,
		strings.TrimSpace(scope),
	)
}

func (s *OpsAlertEvaluatorService) maybeSendAlertEmail(ctx context.Context, runtimeCfg *OpsAlertRuntimeSettings, rule *OpsAlertRule, event *OpsAlertEvent) bool {
	if s == nil || s.emailService == nil || s.opsService == nil || event == nil || rule == nil {
		return false
	}
	if event.EmailSent {
		return false
	}
	if !rule.NotifyEmail {
		return false
	}

	emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx)
	if err != nil || emailCfg == nil || !emailCfg.Alert.Enabled {
		return false
	}
	if len(emailCfg.Alert.Recipients) == 0 {
		return false
	}
	if !shouldSendOpsAlertEmailByMinSeverity(strings.TrimSpace(emailCfg.Alert.MinSeverity), strings.TrimSpace(rule.Severity)) {
		return false
	}
	if runtimeCfg != nil && runtimeCfg.Silencing.Enabled {
		if isOpsAlertSilenced(time.Now().UTC(), rule, event, runtimeCfg.Silencing) {
			return false
		}
	}

	// Apply/update the rate limiter.
	s.emailLimiter.SetLimit(emailCfg.Alert.RateLimitPerHour)

	subject := fmt.Sprintf("[Ops Alert][%s] %s", strings.TrimSpace(rule.Severity), strings.TrimSpace(rule.Name))
	body := buildOpsAlertEmailBody(rule, event)

	anySent := false
	for _, to := range emailCfg.Alert.Recipients {
		addr := strings.TrimSpace(to)
		if addr == "" {
			continue
		}
		if !s.emailLimiter.Allow(time.Now().UTC()) {
			continue
		}
		if err := s.emailService.SendEmail(ctx, addr, subject, body); err != nil {
			// Ignore per-recipient failures; continue best-effort.
			continue
		}
		anySent = true
	}
	if anySent {
		_ = s.opsRepo.UpdateAlertEventEmailSent(context.Background(), event.ID, true)
	}
	return anySent
}

func buildOpsAlertEmailBody(rule *OpsAlertRule, event *OpsAlertEvent) string {
	if rule == nil || event == nil {
		return ""
	}
	metric := strings.TrimSpace(rule.MetricType)
	value := "-"
	threshold := fmt.Sprintf("%.2f", rule.Threshold)
	if event.MetricValue != nil {
		value = fmt.Sprintf("%.2f", *event.MetricValue)
	}
	if event.ThresholdValue != nil {
		threshold = fmt.Sprintf("%.2f", *event.ThresholdValue)
	}
	return fmt.Sprintf(`

Ops Alert

Rule: %s

Severity: %s

Status: %s

Metric: %s %s %s

Fired at: %s

Description: %s

`,
		htmlEscape(rule.Name),
		htmlEscape(rule.Severity),
		htmlEscape(event.Status),
		htmlEscape(metric),
		htmlEscape(rule.Operator),
		htmlEscape(fmt.Sprintf("%s (threshold %s)", value, threshold)),
		event.FiredAt.Format(time.RFC3339),
		htmlEscape(event.Description),
	)
}

func shouldSendOpsAlertEmailByMinSeverity(minSeverity string, ruleSeverity string) bool {
	minSeverity = strings.ToLower(strings.TrimSpace(minSeverity))
	if minSeverity == "" {
		return true
	}
	eventLevel := opsEmailSeverityForOps(ruleSeverity)
	minLevel := strings.ToLower(minSeverity)
	rank := func(level string) int {
		switch level {
		case "critical":
			return 3
		case "warning":
			return 2
		case "info":
			return 1
		default:
			return 0
		}
	}
	return rank(eventLevel) >= rank(minLevel)
}

func opsEmailSeverityForOps(severity string) string {
	switch strings.ToUpper(strings.TrimSpace(severity)) {
	case "P0":
		return "critical"
	case "P1":
		return "warning"
	default:
		return "info"
	}
}

func isOpsAlertSilenced(now time.Time, rule *OpsAlertRule, event *OpsAlertEvent, silencing OpsAlertSilencingSettings) bool {
	if !silencing.Enabled {
		return false
	}
	if now.IsZero() {
		now = time.Now().UTC()
	}
	if strings.TrimSpace(silencing.GlobalUntilRFC3339) != "" {
		if t, err := time.Parse(time.RFC3339, strings.TrimSpace(silencing.GlobalUntilRFC3339)); err == nil {
			if now.Before(t) {
				return true
			}
		}
	}
	for _, entry := range silencing.Entries {
		untilRaw := strings.TrimSpace(entry.UntilRFC3339)
		if untilRaw == "" {
			continue
		}
		until, err := time.Parse(time.RFC3339, untilRaw)
		if err != nil {
			continue
		}
		if now.After(until) {
			continue
		}
		if entry.RuleID != nil && rule != nil && rule.ID > 0 && *entry.RuleID != rule.ID {
			continue
		}
		if len(entry.Severities) > 0 {
			match := false
			for _, s := range entry.Severities {
				if strings.EqualFold(strings.TrimSpace(s), strings.TrimSpace(event.Severity)) ||
					strings.EqualFold(strings.TrimSpace(s), strings.TrimSpace(rule.Severity)) {
					match = true
					break
				}
			}
			if !match {
				continue
			}
		}
		return true
	}
	return false
}

func (s *OpsAlertEvaluatorService) tryAcquireLeaderLock(ctx context.Context, lock OpsDistributedLockSettings) (func(), bool) {
	if !lock.Enabled {
		return nil, true
	}
	if s.redisClient == nil {
		s.warnNoRedisOnce.Do(func() {
			log.Printf("[OpsAlertEvaluator] redis not configured; running without distributed lock")
		})
		return nil, true
	}
	key := strings.TrimSpace(lock.Key)
	if key == "" {
		key = opsAlertEvaluatorLeaderLockKey
	}
	ttl := time.Duration(lock.TTLSeconds) * time.Second
	if ttl <= 0 {
		ttl = opsAlertEvaluatorLeaderLockTTL
	}
	ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
	if err != nil {
		// Prefer fail-closed to avoid duplicate evaluators stampeding the DB when Redis is flaky.
		// Single-node deployments can disable the distributed lock via runtime settings.
		s.warnNoRedisOnce.Do(func() {
			log.Printf("[OpsAlertEvaluator] leader lock SetNX failed; skipping this cycle: %v", err)
		})
		return nil, false
	}
	if !ok {
		s.maybeLogSkip(key)
		return nil, false
	}
	return func() {
		_, _ = opsAlertEvaluatorReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
	}, true
}

func (s *OpsAlertEvaluatorService) maybeLogSkip(key string) {
	s.skipLogMu.Lock()
	defer s.skipLogMu.Unlock()

	now := time.Now()
	if !s.skipLogAt.IsZero() && now.Sub(s.skipLogAt) < opsAlertEvaluatorSkipLogInterval {
		return
	}
	s.skipLogAt = now
	log.Printf("[OpsAlertEvaluator] leader lock held by another instance; skipping (key=%q)", key)
}

func (s *OpsAlertEvaluatorService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, result string) {
	if s == nil || s.opsRepo == nil {
		return
	}
	now := time.Now().UTC()
	durMs := duration.Milliseconds()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	msg := strings.TrimSpace(result)
	if msg == "" {
		msg = "ok"
	}
	msg = truncateString(msg, 2048)

	_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
		JobName:        opsAlertEvaluatorJobName,
		LastRunAt:      &runAt,
		LastSuccessAt:  &now,
		LastDurationMs: &durMs,
		LastResult:     &msg,
	})
}

func (s *OpsAlertEvaluatorService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
	if s == nil || s.opsRepo == nil || err == nil {
		return
	}
	now := time.Now().UTC()
	durMs := duration.Milliseconds()
	msg := truncateString(err.Error(), 2048)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
		JobName:        opsAlertEvaluatorJobName,
		LastRunAt:      &runAt,
		LastErrorAt:    &now,
		LastError:      &msg,
		LastDurationMs: &durMs,
	})
}

// htmlEscape escapes HTML-special characters so rule and event fields cannot
// inject markup into the email body.
func htmlEscape(s string) string {
	replacer := strings.NewReplacer(
		"&", "&amp;",
		"<", "&lt;",
		">", "&gt;",
		`"`, "&quot;",
		"'", "&#39;",
	)
	return replacer.Replace(s)
}

type slidingWindowLimiter struct {
	mu     sync.Mutex
	limit  int
	window time.Duration
	sent   []time.Time
}

func newSlidingWindowLimiter(limit int, window time.Duration) *slidingWindowLimiter {
	if window <= 0 {
		window = time.Hour
	}
	return &slidingWindowLimiter{
		limit:  limit,
		window: window,
		sent:   []time.Time{},
	}
}

func (l *slidingWindowLimiter) SetLimit(limit int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.limit = limit
}

func (l *slidingWindowLimiter) Allow(now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.limit <= 0 {
		return true
	}
	cutoff := now.Add(-l.window)
	keep := l.sent[:0]
	for _, t := range l.sent {
		if t.After(cutoff) {
			keep = append(keep, t)
		}
	}
	l.sent = keep
	if len(l.sent) >= l.limit {
		return false
	}
	l.sent = append(l.sent, now)
	return true
}

// computeGroupAvailableRatio returns the available percentage for a group.
// Formula: (AvailableCount / TotalAccounts) * 100.
// Returns 0 when TotalAccounts is 0.
func computeGroupAvailableRatio(group *GroupAvailability) float64 {
	if group == nil || group.TotalAccounts <= 0 {
		return 0
	}
	return (float64(group.AvailableCount) / float64(group.TotalAccounts)) * 100
}

// countAccountsByCondition counts accounts that satisfy the given condition.
func countAccountsByCondition(accounts map[int64]*AccountAvailability, condition func(*AccountAvailability) bool) int64 {
	if len(accounts) == 0 || condition == nil {
		return 0
	}
	var count int64
	for _, account := range accounts {
		if account != nil && condition(account) {
			count++
		}
	}
	return count
}
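
// Example wiring (sketch): how this evaluator is expected to be created and run.
// The opsService, opsRepo, emailService, redisClient, and cfg values are assumed
// to come from the application's usual dependency setup; only the constructor and
// Start/Stop defined above are part of this file.
//
//	evaluator := NewOpsAlertEvaluatorService(opsService, opsRepo, emailService, redisClient, cfg)
//	evaluator.Start()
//	defer evaluator.Stop()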