Files
xinghuoapi/backend/internal/service/ops_aggregation_service.go
IanShaw027 23aa69f56f refactor(ops): 优化任务心跳和组件刷新机制
后端改动:
- 添加 ops_job_heartbeats.last_result 字段记录任务执行结果
- 优化告警评估器统计信息(规则数/事件数/邮件数)
- 统一各定时任务的心跳记录格式

前端改动:
- 重构 OpsConcurrencyCard 使用父组件统一控制刷新节奏
- 移除独立的 5 秒刷新定时器,改用 refreshToken 机制
- 修复 TypeScript 类型错误
2026-01-15 21:31:55 +08:00

449 lines
10 KiB
Go

package service
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
// Tunables for the hourly and daily ops pre-aggregation jobs.
const (
// Job names written to the job heartbeat records.
opsAggHourlyJobName = "ops_preaggregation_hourly"
opsAggDailyJobName = "ops_preaggregation_daily"
// Ticker periods of the hourly and daily loops (each loop also runs once
// immediately on start).
opsAggHourlyInterval = 10 * time.Minute
opsAggDailyInterval = 1 * time.Hour
// Keep in sync with ops retention target (vNext default 30d).
opsAggBackfillWindow = 30 * 24 * time.Hour
// Recompute overlap to absorb late-arriving rows near boundaries.
opsAggHourlyOverlap = 2 * time.Hour
opsAggDailyOverlap = 48 * time.Hour
// Maximum time span handed to a single upsert call; longer windows are
// processed chunk by chunk.
opsAggHourlyChunk = 24 * time.Hour
opsAggDailyChunk = 7 * 24 * time.Hour
// Delay around boundaries (e.g. 10:00..10:05) to avoid aggregating buckets
// that may still receive late inserts.
opsAggSafeDelay = 5 * time.Minute
// Timeout for the "latest existing bucket" lookup that decides where to resume.
opsAggMaxQueryTimeout = 3 * time.Second
// Overall timeout of one aggregation run.
opsAggHourlyTimeout = 5 * time.Minute
opsAggDailyTimeout = 2 * time.Minute
// Leader lock keys and TTLs; the TTL is passed to Redis SETNX so a lock left
// behind by a crashed holder eventually expires.
opsAggHourlyLeaderLockKey = "ops:aggregation:hourly:leader"
opsAggDailyLeaderLockKey = "ops:aggregation:daily:leader"
opsAggHourlyLeaderLockTTL = 15 * time.Minute
opsAggDailyLeaderLockTTL = 10 * time.Minute
)
// OpsAggregationService periodically backfills ops_metrics_hourly / ops_metrics_daily
// for stable long-window dashboard queries.
//
// It is safe to run in multi-replica deployments when Redis is available (leader lock).
type OpsAggregationService struct {
// opsRepo performs the latest-bucket lookups, the metric upserts and the
// job heartbeat writes. Aggregation is skipped entirely when it is nil.
opsRepo OpsRepository
// settingRepo is read for the runtime monitoring switch; when nil,
// monitoring is treated as enabled.
settingRepo SettingRepository
// cfg gates the jobs via Ops.Enabled and Ops.Aggregation.Enabled; a nil
// cfg skips that gating.
cfg *config.Config
// db is handed to the DB advisory lock fallback used when Redis is absent
// or returns an error.
db *sql.DB
// redisClient backs the preferred leader lock; may be nil.
redisClient *redis.Client
// instanceID is the value stored under the Redis lock key so that only
// the owning instance releases it.
instanceID string
// stopCh is closed by Stop to terminate both loops.
stopCh chan struct{}
// startOnce / stopOnce make Start and Stop take effect at most once.
startOnce sync.Once
stopOnce sync.Once
// hourlyMu / dailyMu are held for the duration of one hourly / daily run.
hourlyMu sync.Mutex
dailyMu sync.Mutex
// skipLogMu guards skipLogAt, the time of the last "lock held elsewhere"
// log line (used to emit it at most once per minute).
skipLogMu sync.Mutex
skipLogAt time.Time
}
// NewOpsAggregationService builds an aggregation service from its
// dependencies and assigns it a fresh random instance ID, which is used as
// the owner value of the Redis leader lock.
//
// Any dependency may be nil: a nil opsRepo disables aggregation, a nil
// settingRepo or cfg skips the corresponding enable checks, and a nil
// redisClient makes leader election use the DB advisory lock path.
//
// The returned service is idle until Start is called.
func NewOpsAggregationService(
	opsRepo OpsRepository,
	settingRepo SettingRepository,
	db *sql.DB,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsAggregationService {
	return &OpsAggregationService{
		opsRepo:     opsRepo,
		settingRepo: settingRepo,
		cfg:         cfg,
		db:          db,
		redisClient: redisClient,
		instanceID:  uuid.NewString(),
		// Allocate the stop channel eagerly. If it were created only in Start,
		// a Stop that runs first would find a nil channel, consume stopOnce
		// without closing anything, and loops started afterwards could never
		// be stopped.
		stopCh: make(chan struct{}),
	}
}
// Start launches the hourly and daily aggregation loops in background
// goroutines. Only the first call has any effect; calling it on a nil
// receiver is a no-op.
func (s *OpsAggregationService) Start() {
	if s == nil {
		return
	}

	launch := func() {
		// Cover instances that were built without a stop channel.
		if s.stopCh == nil {
			s.stopCh = make(chan struct{})
		}
		go s.hourlyLoop()
		go s.dailyLoop()
	}
	s.startOnce.Do(launch)
}
// Stop signals both aggregation loops to exit by closing the stop channel.
// Only the first call has any effect; calling it on a nil receiver is a
// no-op. It does not wait for an in-flight aggregation run to finish.
func (s *OpsAggregationService) Stop() {
	if s == nil {
		return
	}

	shutdown := func() {
		if s.stopCh == nil {
			return
		}
		close(s.stopCh)
	}
	s.stopOnce.Do(shutdown)
}
// hourlyLoop performs one hourly aggregation pass right away and then one
// per opsAggHourlyInterval tick, until the stop channel is closed.
func (s *OpsAggregationService) hourlyLoop() {
	// Do not wait a full interval before the first pass.
	s.aggregateHourly()

	tick := time.NewTicker(opsAggHourlyInterval)
	defer tick.Stop()

	for {
		select {
		case <-s.stopCh:
			return
		case <-tick.C:
			s.aggregateHourly()
		}
	}
}
// dailyLoop performs one daily aggregation pass right away and then one per
// opsAggDailyInterval tick, until the stop channel is closed.
func (s *OpsAggregationService) dailyLoop() {
	// Do not wait a full interval before the first pass.
	s.aggregateDaily()

	tick := time.NewTicker(opsAggDailyInterval)
	defer tick.Stop()

	for {
		select {
		case <-s.stopCh:
			return
		case <-tick.C:
			s.aggregateDaily()
		}
	}
}
// aggregateHourly runs one hourly pre-aggregation pass.
//
// The pass is skipped silently when the service or its repository is nil,
// when ops or aggregation is disabled in config, when monitoring is switched
// off at runtime, or when the leader lock cannot be acquired.
//
// Otherwise it recomputes hourly buckets in [start, end), where end is the
// last full hour that is at least opsAggSafeDelay old and start is the later
// of (end - opsAggBackfillWindow) and (latest existing bucket -
// opsAggHourlyOverlap). The window is processed in opsAggHourlyChunk slices
// and stops at the first failing slice. The outcome is recorded as a job
// heartbeat; an empty window returns without writing a heartbeat.
func (s *OpsAggregationService) aggregateHourly() {
	if s == nil || s.opsRepo == nil {
		return
	}
	if s.cfg != nil && (!s.cfg.Ops.Enabled || !s.cfg.Ops.Aggregation.Enabled) {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), opsAggHourlyTimeout)
	defer cancel()

	if !s.isMonitoringEnabled(ctx) {
		return
	}

	release, ok := s.tryAcquireLeaderLock(ctx, opsAggHourlyLeaderLockKey, opsAggHourlyLeaderLockTTL, "[OpsAggregation][hourly]")
	if !ok {
		return
	}
	if release != nil {
		defer release()
	}

	s.hourlyMu.Lock()
	defer s.hourlyMu.Unlock()

	startedAt := time.Now().UTC()

	// Aggregate stable full hours only.
	end := utcFloorToHour(startedAt.Add(-opsAggSafeDelay))
	start := end.Add(-opsAggBackfillWindow)

	// Resume from the latest bucket with overlap. The lookup is bounded by
	// both its own short timeout and the overall job deadline.
	{
		lookupCtx, lookupCancel := context.WithTimeout(ctx, opsAggMaxQueryTimeout)
		latest, found, err := s.opsRepo.GetLatestHourlyBucketStart(lookupCtx)
		lookupCancel()
		if err != nil {
			// Not fatal: fall back to the full backfill window.
			log.Printf("[OpsAggregation][hourly] failed to read latest bucket: %v", err)
		} else if found {
			if candidate := latest.Add(-opsAggHourlyOverlap); candidate.After(start) {
				start = candidate
			}
		}
	}

	start = utcFloorToHour(start)
	if !start.Before(end) {
		return
	}

	var aggErr error
	for cursor := start; cursor.Before(end); cursor = cursor.Add(opsAggHourlyChunk) {
		chunkEnd := minTime(cursor.Add(opsAggHourlyChunk), end)
		if err := s.opsRepo.UpsertHourlyMetrics(ctx, cursor, chunkEnd); err != nil {
			aggErr = err
			log.Printf("[OpsAggregation][hourly] upsert failed (%s..%s): %v", cursor.Format(time.RFC3339), chunkEnd.Format(time.RFC3339), err)
			break
		}
	}

	finishedAt := time.Now().UTC()
	durationMs := finishedAt.Sub(startedAt).Milliseconds()

	heartbeat := &OpsUpsertJobHeartbeatInput{
		JobName:        opsAggHourlyJobName,
		LastRunAt:      &startedAt,
		LastDurationMs: &durationMs,
	}
	if aggErr != nil {
		msg := truncateString(aggErr.Error(), 2048)
		heartbeat.LastErrorAt = &finishedAt
		heartbeat.LastError = &msg
	} else {
		result := truncateString(fmt.Sprintf("window=%s..%s", start.Format(time.RFC3339), end.Format(time.RFC3339)), 2048)
		heartbeat.LastSuccessAt = &finishedAt
		heartbeat.LastResult = &result
	}

	// Use a fresh context so the heartbeat can still be written when the job
	// context has already expired.
	hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer hbCancel()
	if err := s.opsRepo.UpsertJobHeartbeat(hbCtx, heartbeat); err != nil {
		// Best effort: a lost heartbeat must not fail the run, but it should
		// be visible in the logs.
		log.Printf("[OpsAggregation][hourly] failed to record job heartbeat: %v", err)
	}
}
// aggregateDaily runs one daily pre-aggregation pass.
//
// The pass is skipped silently when the service or its repository is nil,
// when ops or aggregation is disabled in config, when monitoring is switched
// off at runtime, or when the leader lock cannot be acquired.
//
// Otherwise it recomputes daily buckets in [start, end), where end is the
// start of the current UTC day and start is the later of
// (end - opsAggBackfillWindow) and (latest existing bucket -
// opsAggDailyOverlap). The window is processed in opsAggDailyChunk slices
// and stops at the first failing slice. The outcome is recorded as a job
// heartbeat; an empty window returns without writing a heartbeat.
func (s *OpsAggregationService) aggregateDaily() {
	if s == nil || s.opsRepo == nil {
		return
	}
	if s.cfg != nil && (!s.cfg.Ops.Enabled || !s.cfg.Ops.Aggregation.Enabled) {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), opsAggDailyTimeout)
	defer cancel()

	if !s.isMonitoringEnabled(ctx) {
		return
	}

	release, ok := s.tryAcquireLeaderLock(ctx, opsAggDailyLeaderLockKey, opsAggDailyLeaderLockTTL, "[OpsAggregation][daily]")
	if !ok {
		return
	}
	if release != nil {
		defer release()
	}

	s.dailyMu.Lock()
	defer s.dailyMu.Unlock()

	startedAt := time.Now().UTC()

	// Aggregate completed UTC days only.
	end := utcFloorToDay(startedAt)
	start := end.Add(-opsAggBackfillWindow)

	// Resume from the latest bucket with overlap. The lookup is bounded by
	// both its own short timeout and the overall job deadline.
	{
		lookupCtx, lookupCancel := context.WithTimeout(ctx, opsAggMaxQueryTimeout)
		latest, found, err := s.opsRepo.GetLatestDailyBucketDate(lookupCtx)
		lookupCancel()
		if err != nil {
			// Not fatal: fall back to the full backfill window.
			log.Printf("[OpsAggregation][daily] failed to read latest bucket: %v", err)
		} else if found {
			if candidate := latest.Add(-opsAggDailyOverlap); candidate.After(start) {
				start = candidate
			}
		}
	}

	start = utcFloorToDay(start)
	if !start.Before(end) {
		return
	}

	var aggErr error
	for cursor := start; cursor.Before(end); cursor = cursor.Add(opsAggDailyChunk) {
		chunkEnd := minTime(cursor.Add(opsAggDailyChunk), end)
		if err := s.opsRepo.UpsertDailyMetrics(ctx, cursor, chunkEnd); err != nil {
			aggErr = err
			log.Printf("[OpsAggregation][daily] upsert failed (%s..%s): %v", cursor.Format("2006-01-02"), chunkEnd.Format("2006-01-02"), err)
			break
		}
	}

	finishedAt := time.Now().UTC()
	durationMs := finishedAt.Sub(startedAt).Milliseconds()

	heartbeat := &OpsUpsertJobHeartbeatInput{
		JobName:        opsAggDailyJobName,
		LastRunAt:      &startedAt,
		LastDurationMs: &durationMs,
	}
	if aggErr != nil {
		msg := truncateString(aggErr.Error(), 2048)
		heartbeat.LastErrorAt = &finishedAt
		heartbeat.LastError = &msg
	} else {
		result := truncateString(fmt.Sprintf("window=%s..%s", start.Format(time.RFC3339), end.Format(time.RFC3339)), 2048)
		heartbeat.LastSuccessAt = &finishedAt
		heartbeat.LastResult = &result
	}

	// Use a fresh context so the heartbeat can still be written when the job
	// context has already expired.
	hbCtx, hbCancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer hbCancel()
	if err := s.opsRepo.UpsertJobHeartbeat(hbCtx, heartbeat); err != nil {
		// Best effort: a lost heartbeat must not fail the run, but it should
		// be visible in the logs.
		log.Printf("[OpsAggregation][daily] failed to record job heartbeat: %v", err)
	}
}
// isMonitoringEnabled reports whether ops monitoring is currently switched on.
//
// It returns false for a nil receiver, when config disables ops, or when the
// stored setting is one of "false", "0", "off" or "disabled" (compared
// case-insensitively after trimming whitespace). It returns true in every
// other case, including a nil settingRepo, a missing setting, and a failed
// lookup: the check deliberately fails open.
func (s *OpsAggregationService) isMonitoringEnabled(ctx context.Context) bool {
	if s == nil {
		return false
	}
	if s.cfg != nil && !s.cfg.Ops.Enabled {
		return false
	}
	if s.settingRepo == nil {
		return true
	}
	if ctx == nil {
		ctx = context.Background()
	}

	value, err := s.settingRepo.GetValue(ctx, SettingKeyOpsMonitoringEnabled)
	if err != nil {
		// A missing key simply means "use the default" (enabled). Any other
		// error also fails open, but is logged so that a broken settings
		// store does not go unnoticed.
		if !errors.Is(err, ErrSettingNotFound) {
			log.Printf("[OpsAggregation] failed to read monitoring setting, assuming enabled: %v", err)
		}
		return true
	}

	switch strings.ToLower(strings.TrimSpace(value)) {
	case "false", "0", "off", "disabled":
		return false
	default:
		return true
	}
}
// opsAggReleaseScript releases a Redis leader lock only if it is still owned
// by the caller: it deletes KEYS[1] when its current value equals ARGV[1]
// (the instance ID) and otherwise returns 0 without touching the key.
var opsAggReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
// tryAcquireLeaderLock attempts to become the leader for the job identified
// by key.
//
// When a Redis client is configured it tries SETNX with the given ttl and
// this instance's ID as value. If Redis answers, that answer is final: the
// lock is either acquired (the returned func releases it via the
// compare-and-delete script, with a 2-second timeout) or held elsewhere. If
// Redis is absent or returns an error, it falls back to the DB advisory lock
// so that a flaky Redis does not cause every instance to hit the DB at once.
//
// It returns (release, true) on success and (nil, false) otherwise; a failed
// attempt is reported through maybeLogSkip with logPrefix.
func (s *OpsAggregationService) tryAcquireLeaderLock(ctx context.Context, key string, ttl time.Duration, logPrefix string) (func(), bool) {
	if s == nil {
		return nil, false
	}
	if ctx == nil {
		ctx = context.Background()
	}

	if s.redisClient != nil {
		acquired, redisErr := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
		switch {
		case redisErr != nil:
			// Redis is unavailable: continue with the DB advisory lock below.
		case !acquired:
			s.maybeLogSkip(logPrefix)
			return nil, false
		default:
			unlock := func() {
				relCtx, relCancel := context.WithTimeout(context.Background(), 2*time.Second)
				defer relCancel()
				_, _ = opsAggReleaseScript.Run(relCtx, s.redisClient, []string{key}, s.instanceID).Result()
			}
			return unlock, true
		}
	}

	dbRelease, held := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key))
	if held {
		return dbRelease, true
	}
	s.maybeLogSkip(logPrefix)
	return nil, false
}
// maybeLogSkip logs that the leader lock is held elsewhere, at most once per
// minute per service instance (the limit is shared by all prefixes). An
// empty prefix is replaced by "[OpsAggregation]".
func (s *OpsAggregationService) maybeLogSkip(prefix string) {
	s.skipLogMu.Lock()
	defer s.skipLogMu.Unlock()

	current := time.Now()
	loggedRecently := !s.skipLogAt.IsZero() && current.Sub(s.skipLogAt) < time.Minute
	if loggedRecently {
		return
	}
	s.skipLogAt = current

	tag := prefix
	if tag == "" {
		tag = "[OpsAggregation]"
	}
	log.Printf("%s leader lock held by another instance; skipping", tag)
}
// utcFloorToHour returns the start of the hour containing t, expressed in UTC.
func utcFloorToHour(t time.Time) time.Time {
	// Truncate works on absolute time, so it can be applied before the
	// conversion to UTC.
	floored := t.Truncate(time.Hour)
	return floored.UTC()
}
// utcFloorToDay returns midnight (00:00:00 UTC) of the UTC calendar day
// containing t.
func utcFloorToDay(t time.Time) time.Time {
	inUTC := t.UTC()
	return time.Date(inUTC.Year(), inUTC.Month(), inUTC.Day(), 0, 0, 0, 0, time.UTC)
}
// minTime returns the earlier of the two instants; when neither is earlier
// than the other, b is returned.
func minTime(a, b time.Time) time.Time {
	if !a.Before(b) {
		return b
	}
	return a
}