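// Ops alert evaluator: a background job that periodically loads the enabled
// alert rules, computes each rule's metric over a sliding window, tracks
// sustained threshold breaches, and creates (or resolves) alert events,
// optionally notifying recipients by email.
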
package service

import (
	"context"
	"fmt"
	"log"
	"math"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)

const (
	opsAlertEvaluatorJobName = "ops_alert_evaluator"

	opsAlertEvaluatorTimeout         = 45 * time.Second
	opsAlertEvaluatorLeaderLockKey   = "ops:alert:evaluator:leader"
	opsAlertEvaluatorLeaderLockTTL   = 90 * time.Second
	opsAlertEvaluatorSkipLogInterval = 1 * time.Minute
)

var opsAlertEvaluatorReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0
`)

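// OpsAlertEvaluatorService runs the periodic alert-rule evaluation loop.
// When the distributed lock is enabled and a Redis client is configured,
// a SETNX-based leader lock ensures only one instance evaluates at a time.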
type OpsAlertEvaluatorService struct {
	opsService   *OpsService
	opsRepo      OpsRepository
	emailService *EmailService

	redisClient *redis.Client
	cfg         *config.Config
	instanceID  string

	stopCh    chan struct{}
	startOnce sync.Once
	stopOnce  sync.Once
	wg        sync.WaitGroup

	mu         sync.Mutex
	ruleStates map[int64]*opsAlertRuleState

	emailLimiter *slidingWindowLimiter

	skipLogMu sync.Mutex
	skipLogAt time.Time

	warnNoRedisOnce sync.Once
}

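// opsAlertRuleState tracks per-rule evaluation state between cycles: when
// the rule was last evaluated and how many consecutive breaches were seen.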
type opsAlertRuleState struct {
	LastEvaluatedAt     time.Time
	ConsecutiveBreaches int
}

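// NewOpsAlertEvaluatorService wires the evaluator with its dependencies.
// The email limiter starts unlimited (0); the effective per-hour limit is
// applied from the email notification config before sending.
//
// Typical wiring (a sketch; the actual call site lives elsewhere):
//
//	evaluator := NewOpsAlertEvaluatorService(opsService, opsRepo, emailService, redisClient, cfg)
//	evaluator.Start()
//	defer evaluator.Stop()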
func NewOpsAlertEvaluatorService(
	opsService *OpsService,
	opsRepo OpsRepository,
	emailService *EmailService,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsAlertEvaluatorService {
	return &OpsAlertEvaluatorService{
		opsService:   opsService,
		opsRepo:      opsRepo,
		emailService: emailService,
		redisClient:  redisClient,
		cfg:          cfg,
		instanceID:   uuid.NewString(),
		ruleStates:   map[int64]*opsAlertRuleState{},
		emailLimiter: newSlidingWindowLimiter(0, time.Hour),
	}
}

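// Start launches the evaluation loop in a background goroutine. It is safe
// to call multiple times; only the first call has an effect.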
func (s *OpsAlertEvaluatorService) Start() {
	if s == nil {
		return
	}
	s.startOnce.Do(func() {
		if s.stopCh == nil {
			s.stopCh = make(chan struct{})
		}
		// Register the worker before launching it so Stop() always waits
		// for run() to exit, even when called immediately after Start().
		s.wg.Add(1)
		go s.run()
	})
}

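// Stop signals the evaluation loop to exit and blocks until it has
// finished. Like Start, it is idempotent.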
func (s *OpsAlertEvaluatorService) Stop() {
	if s == nil {
		return
	}
	s.stopOnce.Do(func() {
		if s.stopCh != nil {
			close(s.stopCh)
		}
	})
	s.wg.Wait()
}

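// run is the main loop: it evaluates once immediately, then re-arms the
// timer with the currently configured interval after every cycle.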
func (s *OpsAlertEvaluatorService) run() {
	defer s.wg.Done()

	// Start immediately to produce early feedback in ops dashboard.
	timer := time.NewTimer(0)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			interval := s.getInterval()
			s.evaluateOnce(interval)
			timer.Reset(interval)
		case <-s.stopCh:
			return
		}
	}
}

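// getInterval returns the evaluation interval from runtime settings,
// falling back to 60s when settings are unavailable or out of range
// (non-positive or longer than 24 hours).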
func (s *OpsAlertEvaluatorService) getInterval() time.Duration {
	// Default.
	interval := 60 * time.Second

	if s == nil || s.opsService == nil {
		return interval
	}
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	cfg, err := s.opsService.GetOpsAlertRuntimeSettings(ctx)
	if err != nil || cfg == nil {
		return interval
	}
	if cfg.EvaluationIntervalSeconds <= 0 {
		return interval
	}
	if cfg.EvaluationIntervalSeconds > int((24 * time.Hour).Seconds()) {
		return interval
	}
	return time.Duration(cfg.EvaluationIntervalSeconds) * time.Second
}

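// evaluateOnce performs a single evaluation cycle: acquire the leader lock
// (if enabled), load the alert rules, compute each rule's metric over its
// window ending at the last full minute, and either fire a new alert event
// (subject to sustained-breach, silencing and cooldown checks) or resolve
// the active one. A job heartbeat is recorded at the end of the cycle.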
func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) {
	if s == nil || s.opsRepo == nil {
		return
	}
	if s.cfg != nil && !s.cfg.Ops.Enabled {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), opsAlertEvaluatorTimeout)
	defer cancel()

	if s.opsService != nil && !s.opsService.IsMonitoringEnabled(ctx) {
		return
	}

	runtimeCfg := defaultOpsAlertRuntimeSettings()
	if s.opsService != nil {
		if loaded, err := s.opsService.GetOpsAlertRuntimeSettings(ctx); err == nil && loaded != nil {
			runtimeCfg = loaded
		}
	}

	release, ok := s.tryAcquireLeaderLock(ctx, runtimeCfg.DistributedLock)
	if !ok {
		return
	}
	if release != nil {
		defer release()
	}

	startedAt := time.Now().UTC()
	runAt := startedAt

	rules, err := s.opsRepo.ListAlertRules(ctx)
	if err != nil {
		s.recordHeartbeatError(runAt, time.Since(startedAt), err)
		log.Printf("[OpsAlertEvaluator] list rules failed: %v", err)
		return
	}

	now := time.Now().UTC()
	safeEnd := now.Truncate(time.Minute)
	if safeEnd.IsZero() {
		safeEnd = now
	}

	systemMetrics, _ := s.opsRepo.GetLatestSystemMetrics(ctx, 1)

	// Cleanup stale state for removed rules.
	s.pruneRuleStates(rules)

	for _, rule := range rules {
		if rule == nil || !rule.Enabled || rule.ID <= 0 {
			continue
		}

		scopePlatform, scopeGroupID := parseOpsAlertRuleScope(rule.Filters)

		windowMinutes := rule.WindowMinutes
		if windowMinutes <= 0 {
			windowMinutes = 1
		}
		windowStart := safeEnd.Add(-time.Duration(windowMinutes) * time.Minute)
		windowEnd := safeEnd

		metricValue, ok := s.computeRuleMetric(ctx, rule, systemMetrics, windowStart, windowEnd, scopePlatform, scopeGroupID)
		if !ok {
			s.resetRuleState(rule.ID, now)
			continue
		}

		breachedNow := compareMetric(metricValue, rule.Operator, rule.Threshold)
		required := requiredSustainedBreaches(rule.SustainedMinutes, interval)
		consecutive := s.updateRuleBreaches(rule.ID, now, interval, breachedNow)

		activeEvent, err := s.opsRepo.GetActiveAlertEvent(ctx, rule.ID)
		if err != nil {
			log.Printf("[OpsAlertEvaluator] get active event failed (rule=%d): %v", rule.ID, err)
			continue
		}

		if breachedNow && consecutive >= required {
			if activeEvent != nil {
				continue
			}

			// Scoped silencing: if a matching silence exists, skip creating a firing event.
			if s.opsService != nil {
				platform := strings.TrimSpace(scopePlatform)
				region := (*string)(nil)
				if platform != "" {
					if ok, err := s.opsService.IsAlertSilenced(ctx, rule.ID, platform, scopeGroupID, region, now); err == nil && ok {
						continue
					}
				}
			}

			latestEvent, err := s.opsRepo.GetLatestAlertEvent(ctx, rule.ID)
			if err != nil {
				log.Printf("[OpsAlertEvaluator] get latest event failed (rule=%d): %v", rule.ID, err)
				continue
			}
			if latestEvent != nil && rule.CooldownMinutes > 0 {
				cooldown := time.Duration(rule.CooldownMinutes) * time.Minute
				if now.Sub(latestEvent.FiredAt) < cooldown {
					continue
				}
			}

			firedEvent := &OpsAlertEvent{
				RuleID:         rule.ID,
				Severity:       strings.TrimSpace(rule.Severity),
				Status:         OpsAlertStatusFiring,
				Title:          fmt.Sprintf("%s: %s", strings.TrimSpace(rule.Severity), strings.TrimSpace(rule.Name)),
				Description:    buildOpsAlertDescription(rule, metricValue, windowMinutes, scopePlatform, scopeGroupID),
				MetricValue:    float64Ptr(metricValue),
				ThresholdValue: float64Ptr(rule.Threshold),
				Dimensions:     buildOpsAlertDimensions(scopePlatform, scopeGroupID),
				FiredAt:        now,
				CreatedAt:      now,
			}

			created, err := s.opsRepo.CreateAlertEvent(ctx, firedEvent)
			if err != nil {
				log.Printf("[OpsAlertEvaluator] create event failed (rule=%d): %v", rule.ID, err)
				continue
			}

			if created != nil && created.ID > 0 {
				s.maybeSendAlertEmail(ctx, runtimeCfg, rule, created)
			}
			continue
		}

		// Not breached: resolve active event if present.
		if activeEvent != nil {
			resolvedAt := now
			if err := s.opsRepo.UpdateAlertEventStatus(ctx, activeEvent.ID, OpsAlertStatusResolved, &resolvedAt); err != nil {
				log.Printf("[OpsAlertEvaluator] resolve event failed (event=%d): %v", activeEvent.ID, err)
			}
		}
	}

	s.recordHeartbeatSuccess(runAt, time.Since(startedAt))
}

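// pruneRuleStates drops in-memory state for rules that no longer exist.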
func (s *OpsAlertEvaluatorService) pruneRuleStates(rules []*OpsAlertRule) {
	s.mu.Lock()
	defer s.mu.Unlock()

	live := map[int64]struct{}{}
	for _, r := range rules {
		if r != nil && r.ID > 0 {
			live[r.ID] = struct{}{}
		}
	}
	for id := range s.ruleStates {
		if _, ok := live[id]; !ok {
			delete(s.ruleStates, id)
		}
	}
}

func (s *OpsAlertEvaluatorService) resetRuleState(ruleID int64, now time.Time) {
	if ruleID <= 0 {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	state, ok := s.ruleStates[ruleID]
	if !ok {
		state = &opsAlertRuleState{}
		s.ruleStates[ruleID] = state
	}
	state.LastEvaluatedAt = now
	state.ConsecutiveBreaches = 0
}

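// updateRuleBreaches records the outcome of the current evaluation and
// returns the number of consecutive breaches. If more than two intervals
// have elapsed since the previous evaluation, the streak is reset before
// counting, so stale state cannot trigger an alert on its own.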
func (s *OpsAlertEvaluatorService) updateRuleBreaches(ruleID int64, now time.Time, interval time.Duration, breached bool) int {
	if ruleID <= 0 {
		return 0
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	state, ok := s.ruleStates[ruleID]
	if !ok {
		state = &opsAlertRuleState{}
		s.ruleStates[ruleID] = state
	}

	if !state.LastEvaluatedAt.IsZero() && interval > 0 {
		if now.Sub(state.LastEvaluatedAt) > interval*2 {
			state.ConsecutiveBreaches = 0
		}
	}

	state.LastEvaluatedAt = now
	if breached {
		state.ConsecutiveBreaches++
	} else {
		state.ConsecutiveBreaches = 0
	}
	return state.ConsecutiveBreaches
}

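// requiredSustainedBreaches converts a rule's sustained-minutes setting
// into the number of consecutive breached evaluations required at the
// given interval, e.g. 5 sustained minutes at a 60s interval requires 5
// consecutive breaches, and 10 at a 30s interval.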
func requiredSustainedBreaches(sustainedMinutes int, interval time.Duration) int {
	if sustainedMinutes <= 0 {
		return 1
	}
	if interval <= 0 {
		return sustainedMinutes
	}
	required := int(math.Ceil(float64(sustainedMinutes*60) / interval.Seconds()))
	if required < 1 {
		return 1
	}
	return required
}

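// parseOpsAlertRuleScope extracts the optional platform and group_id scope
// from a rule's filters map, tolerating JSON numbers, ints and numeric
// strings for group_id.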
func parseOpsAlertRuleScope(filters map[string]any) (platform string, groupID *int64) {
	if filters == nil {
		return "", nil
	}
	if v, ok := filters["platform"]; ok {
		if s, ok := v.(string); ok {
			platform = strings.TrimSpace(s)
		}
	}
	if v, ok := filters["group_id"]; ok {
		switch t := v.(type) {
		case float64:
			if t > 0 {
				id := int64(t)
				groupID = &id
			}
		case int64:
			if t > 0 {
				id := t
				groupID = &id
			}
		case int:
			if t > 0 {
				id := int64(t)
				groupID = &id
			}
		case string:
			n, err := strconv.ParseInt(strings.TrimSpace(t), 10, 64)
			if err == nil && n > 0 {
				groupID = &n
			}
		}
	}
	return platform, groupID
}

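// computeRuleMetric resolves the current value for a rule's metric type.
// System metrics (CPU, memory, queue depth) come from the latest system
// snapshot, account metrics from the account-availability service, and
// traffic metrics (success/error rates, p95/p99 latency) from the
// dashboard overview for the evaluation window. The boolean result is
// false when the metric cannot be computed; the caller then skips the
// rule for this cycle.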
func (s *OpsAlertEvaluatorService) computeRuleMetric(
	ctx context.Context,
	rule *OpsAlertRule,
	systemMetrics *OpsSystemMetricsSnapshot,
	start time.Time,
	end time.Time,
	platform string,
	groupID *int64,
) (float64, bool) {
	if rule == nil {
		return 0, false
	}
	switch strings.TrimSpace(rule.MetricType) {
	case "cpu_usage_percent":
		if systemMetrics != nil && systemMetrics.CPUUsagePercent != nil {
			return *systemMetrics.CPUUsagePercent, true
		}
		return 0, false
	case "memory_usage_percent":
		if systemMetrics != nil && systemMetrics.MemoryUsagePercent != nil {
			return *systemMetrics.MemoryUsagePercent, true
		}
		return 0, false
	case "concurrency_queue_depth":
		if systemMetrics != nil && systemMetrics.ConcurrencyQueueDepth != nil {
			return float64(*systemMetrics.ConcurrencyQueueDepth), true
		}
		return 0, false
	case "group_available_accounts":
		if groupID == nil || *groupID <= 0 {
			return 0, false
		}
		if s == nil || s.opsService == nil {
			return 0, false
		}
		availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
		if err != nil || availability == nil {
			return 0, false
		}
		if availability.Group == nil {
			return 0, true
		}
		return float64(availability.Group.AvailableCount), true
	case "group_available_ratio":
		if groupID == nil || *groupID <= 0 {
			return 0, false
		}
		if s == nil || s.opsService == nil {
			return 0, false
		}
		availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
		if err != nil || availability == nil {
			return 0, false
		}
		return computeGroupAvailableRatio(availability.Group), true
	case "account_rate_limited_count":
		if s == nil || s.opsService == nil {
			return 0, false
		}
		availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
		if err != nil || availability == nil {
			return 0, false
		}
		return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
			return acc.IsRateLimited
		})), true
	case "account_error_count":
		if s == nil || s.opsService == nil {
			return 0, false
		}
		availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
		if err != nil || availability == nil {
			return 0, false
		}
		return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
			return acc.HasError && acc.TempUnschedulableUntil == nil
		})), true
	}

	overview, err := s.opsRepo.GetDashboardOverview(ctx, &OpsDashboardFilter{
		StartTime: start,
		EndTime:   end,
		Platform:  platform,
		GroupID:   groupID,
		QueryMode: OpsQueryModeRaw,
	})
	if err != nil {
		return 0, false
	}
	if overview == nil {
		return 0, false
	}

	switch strings.TrimSpace(rule.MetricType) {
	case "success_rate":
		if overview.RequestCountSLA <= 0 {
			return 0, false
		}
		return overview.SLA * 100, true
	case "error_rate":
		if overview.RequestCountSLA <= 0 {
			return 0, false
		}
		return overview.ErrorRate * 100, true
	case "upstream_error_rate":
		if overview.RequestCountSLA <= 0 {
			return 0, false
		}
		return overview.UpstreamErrorRate * 100, true
	case "p95_latency_ms":
		if overview.Duration.P95 == nil {
			return 0, false
		}
		return float64(*overview.Duration.P95), true
	case "p99_latency_ms":
		if overview.Duration.P99 == nil {
			return 0, false
		}
		return float64(*overview.Duration.P99), true
	default:
		return 0, false
	}
}

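// compareMetric applies the rule's comparison operator to the metric value
// and threshold; unknown operators never match.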
func compareMetric(value float64, operator string, threshold float64) bool {
	switch strings.TrimSpace(operator) {
	case ">":
		return value > threshold
	case ">=":
		return value >= threshold
	case "<":
		return value < threshold
	case "<=":
		return value <= threshold
	case "==":
		return value == threshold
	case "!=":
		return value != threshold
	default:
		return false
	}
}

func buildOpsAlertDimensions(platform string, groupID *int64) map[string]any {
	dims := map[string]any{}
	if strings.TrimSpace(platform) != "" {
		dims["platform"] = strings.TrimSpace(platform)
	}
	if groupID != nil && *groupID > 0 {
		dims["group_id"] = *groupID
	}
	if len(dims) == 0 {
		return nil
	}
	return dims
}

func buildOpsAlertDescription(rule *OpsAlertRule, value float64, windowMinutes int, platform string, groupID *int64) string {
	if rule == nil {
		return ""
	}
	scope := "overall"
	if strings.TrimSpace(platform) != "" {
		scope = fmt.Sprintf("platform=%s", strings.TrimSpace(platform))
	}
	if groupID != nil && *groupID > 0 {
		scope = fmt.Sprintf("%s group_id=%d", scope, *groupID)
	}
	if windowMinutes <= 0 {
		windowMinutes = 1
	}
	return fmt.Sprintf("%s %s %.2f (current %.2f) over last %dm (%s)",
		strings.TrimSpace(rule.MetricType),
		strings.TrimSpace(rule.Operator),
		rule.Threshold,
		value,
		windowMinutes,
		strings.TrimSpace(scope),
	)
}

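// maybeSendAlertEmail sends a notification email for a newly fired event
// when the rule opts in, the email alert config is enabled, the rule's
// severity meets the configured minimum, the event is not silenced, and
// the per-hour rate limit allows it. Per-recipient failures are ignored;
// the event is marked as emailed if at least one send succeeds.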
func (s *OpsAlertEvaluatorService) maybeSendAlertEmail(ctx context.Context, runtimeCfg *OpsAlertRuntimeSettings, rule *OpsAlertRule, event *OpsAlertEvent) {
	if s == nil || s.emailService == nil || s.opsService == nil || event == nil || rule == nil {
		return
	}
	if event.EmailSent {
		return
	}
	if !rule.NotifyEmail {
		return
	}

	emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx)
	if err != nil || emailCfg == nil || !emailCfg.Alert.Enabled {
		return
	}

	if len(emailCfg.Alert.Recipients) == 0 {
		return
	}
	if !shouldSendOpsAlertEmailByMinSeverity(strings.TrimSpace(emailCfg.Alert.MinSeverity), strings.TrimSpace(rule.Severity)) {
		return
	}

	if runtimeCfg != nil && runtimeCfg.Silencing.Enabled {
		if isOpsAlertSilenced(time.Now().UTC(), rule, event, runtimeCfg.Silencing) {
			return
		}
	}

	// Apply/update rate limiter.
	s.emailLimiter.SetLimit(emailCfg.Alert.RateLimitPerHour)

	subject := fmt.Sprintf("[Ops Alert][%s] %s", strings.TrimSpace(rule.Severity), strings.TrimSpace(rule.Name))
	body := buildOpsAlertEmailBody(rule, event)

	anySent := false
	for _, to := range emailCfg.Alert.Recipients {
		addr := strings.TrimSpace(to)
		if addr == "" {
			continue
		}
		if !s.emailLimiter.Allow(time.Now().UTC()) {
			continue
		}
		if err := s.emailService.SendEmail(ctx, addr, subject, body); err != nil {
			// Ignore per-recipient failures; continue best-effort.
			continue
		}
		anySent = true
	}

	if anySent {
		_ = s.opsRepo.UpdateAlertEventEmailSent(context.Background(), event.ID, true)
	}
}

func buildOpsAlertEmailBody(rule *OpsAlertRule, event *OpsAlertEvent) string {
	if rule == nil || event == nil {
		return ""
	}
	metric := strings.TrimSpace(rule.MetricType)
	value := "-"
	threshold := fmt.Sprintf("%.2f", rule.Threshold)
	if event.MetricValue != nil {
		value = fmt.Sprintf("%.2f", *event.MetricValue)
	}
	if event.ThresholdValue != nil {
		threshold = fmt.Sprintf("%.2f", *event.ThresholdValue)
	}
	return fmt.Sprintf(`
<h2>Ops Alert</h2>
<p><b>Rule</b>: %s</p>
<p><b>Severity</b>: %s</p>
<p><b>Status</b>: %s</p>
<p><b>Metric</b>: %s %s %s</p>
<p><b>Fired at</b>: %s</p>
<p><b>Description</b>: %s</p>
`,
		htmlEscape(rule.Name),
		htmlEscape(rule.Severity),
		htmlEscape(event.Status),
		htmlEscape(metric),
		htmlEscape(rule.Operator),
		htmlEscape(fmt.Sprintf("%s (threshold %s)", value, threshold)),
		event.FiredAt.Format(time.RFC3339),
		htmlEscape(event.Description),
	)
}

func shouldSendOpsAlertEmailByMinSeverity(minSeverity string, ruleSeverity string) bool {
	minSeverity = strings.ToLower(strings.TrimSpace(minSeverity))
	if minSeverity == "" {
		return true
	}

	eventLevel := opsEmailSeverityForOps(ruleSeverity)
	minLevel := minSeverity

	rank := func(level string) int {
		switch level {
		case "critical":
			return 3
		case "warning":
			return 2
		case "info":
			return 1
		default:
			return 0
		}
	}
	return rank(eventLevel) >= rank(minLevel)
}

func opsEmailSeverityForOps(severity string) string {
	switch strings.ToUpper(strings.TrimSpace(severity)) {
	case "P0":
		return "critical"
	case "P1":
		return "warning"
	default:
		return "info"
	}
}

func isOpsAlertSilenced(now time.Time, rule *OpsAlertRule, event *OpsAlertEvent, silencing OpsAlertSilencingSettings) bool {
	if !silencing.Enabled {
		return false
	}
	if now.IsZero() {
		now = time.Now().UTC()
	}
	if strings.TrimSpace(silencing.GlobalUntilRFC3339) != "" {
		if t, err := time.Parse(time.RFC3339, strings.TrimSpace(silencing.GlobalUntilRFC3339)); err == nil {
			if now.Before(t) {
				return true
			}
		}
	}

	for _, entry := range silencing.Entries {
		untilRaw := strings.TrimSpace(entry.UntilRFC3339)
		if untilRaw == "" {
			continue
		}
		until, err := time.Parse(time.RFC3339, untilRaw)
		if err != nil {
			continue
		}
		if now.After(until) {
			continue
		}
		if entry.RuleID != nil && rule != nil && rule.ID > 0 && *entry.RuleID != rule.ID {
			continue
		}
		if len(entry.Severities) > 0 {
			match := false
			for _, s := range entry.Severities {
				if strings.EqualFold(strings.TrimSpace(s), strings.TrimSpace(event.Severity)) || strings.EqualFold(strings.TrimSpace(s), strings.TrimSpace(rule.Severity)) {
					match = true
					break
				}
			}
			if !match {
				continue
			}
		}
		return true
	}

	return false
}

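// tryAcquireLeaderLock attempts to become the evaluation leader via a Redis
// SETNX with a TTL. It returns a release func plus true when the caller may
// proceed, and false when another instance holds the lock or Redis errors
// (fail-closed). With locking disabled or no Redis client configured, the
// caller proceeds without a lock.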
func (s *OpsAlertEvaluatorService) tryAcquireLeaderLock(ctx context.Context, lock OpsDistributedLockSettings) (func(), bool) {
	if !lock.Enabled {
		return nil, true
	}
	if s.redisClient == nil {
		s.warnNoRedisOnce.Do(func() {
			log.Printf("[OpsAlertEvaluator] redis not configured; running without distributed lock")
		})
		return nil, true
	}
	key := strings.TrimSpace(lock.Key)
	if key == "" {
		key = opsAlertEvaluatorLeaderLockKey
	}
	ttl := time.Duration(lock.TTLSeconds) * time.Second
	if ttl <= 0 {
		ttl = opsAlertEvaluatorLeaderLockTTL
	}

	ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
	if err != nil {
		// Prefer fail-closed to avoid duplicate evaluators stampeding the DB when Redis is flaky.
		// Single-node deployments can disable the distributed lock via runtime settings.
		s.warnNoRedisOnce.Do(func() {
			log.Printf("[OpsAlertEvaluator] leader lock SetNX failed; skipping this cycle: %v", err)
		})
		return nil, false
	}
	if !ok {
		s.maybeLogSkip(key)
		return nil, false
	}
	return func() {
		_, _ = opsAlertEvaluatorReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
	}, true
}

func (s *OpsAlertEvaluatorService) maybeLogSkip(key string) {
	s.skipLogMu.Lock()
	defer s.skipLogMu.Unlock()

	now := time.Now()
	if !s.skipLogAt.IsZero() && now.Sub(s.skipLogAt) < opsAlertEvaluatorSkipLogInterval {
		return
	}
	s.skipLogAt = now
	log.Printf("[OpsAlertEvaluator] leader lock held by another instance; skipping (key=%q)", key)
}

func (s *OpsAlertEvaluatorService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) {
	if s == nil || s.opsRepo == nil {
		return
	}
	now := time.Now().UTC()
	durMs := duration.Milliseconds()
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
		JobName:        opsAlertEvaluatorJobName,
		LastRunAt:      &runAt,
		LastSuccessAt:  &now,
		LastDurationMs: &durMs,
	})
}

func (s *OpsAlertEvaluatorService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
	if s == nil || s.opsRepo == nil || err == nil {
		return
	}
	now := time.Now().UTC()
	durMs := duration.Milliseconds()
	msg := truncateString(err.Error(), 2048)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
		JobName:        opsAlertEvaluatorJobName,
		LastRunAt:      &runAt,
		LastErrorAt:    &now,
		LastError:      &msg,
		LastDurationMs: &durMs,
	})
}

// htmlEscape escapes the characters significant in HTML so rule and event
// text can be embedded safely in the email body.
func htmlEscape(s string) string {
	replacer := strings.NewReplacer(
		"&", "&amp;",
		"<", "&lt;",
		">", "&gt;",
		`"`, "&quot;",
		"'", "&#39;",
	)
	return replacer.Replace(s)
}

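// slidingWindowLimiter caps how many emails may be sent within a rolling
// window (one hour by default); a limit of zero or less disables the cap.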
type slidingWindowLimiter struct {
	mu     sync.Mutex
	limit  int
	window time.Duration
	sent   []time.Time
}

func newSlidingWindowLimiter(limit int, window time.Duration) *slidingWindowLimiter {
	if window <= 0 {
		window = time.Hour
	}
	return &slidingWindowLimiter{
		limit:  limit,
		window: window,
		sent:   []time.Time{},
	}
}

func (l *slidingWindowLimiter) SetLimit(limit int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.limit = limit
}

func (l *slidingWindowLimiter) Allow(now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.limit <= 0 {
		return true
	}
	cutoff := now.Add(-l.window)
	keep := l.sent[:0]
	for _, t := range l.sent {
		if t.After(cutoff) {
			keep = append(keep, t)
		}
	}
	l.sent = keep
	if len(l.sent) >= l.limit {
		return false
	}
	l.sent = append(l.sent, now)
	return true
}

// computeGroupAvailableRatio returns the available percentage for a group.
// Formula: (AvailableCount / TotalAccounts) * 100.
// Returns 0 when TotalAccounts is 0.
func computeGroupAvailableRatio(group *GroupAvailability) float64 {
	if group == nil || group.TotalAccounts <= 0 {
		return 0
	}
	return (float64(group.AvailableCount) / float64(group.TotalAccounts)) * 100
}

// countAccountsByCondition counts accounts that satisfy the given condition.
func countAccountsByCondition(accounts map[int64]*AccountAvailability, condition func(*AccountAvailability) bool) int64 {
	if len(accounts) == 0 || condition == nil {
		return 0
	}
	var count int64
	for _, account := range accounts {
		if account != nil && condition(account) {
			count++
		}
	}
	return count
}