Files
yinghuoapi/backend/internal/service/dashboard_aggregation_service.go
2026-02-02 22:13:50 +08:00

316 lines
9.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"errors"
"log"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
)
const (
defaultDashboardAggregationTimeout = 2 * time.Minute
defaultDashboardAggregationBackfillTimeout = 30 * time.Minute
dashboardAggregationRetentionInterval = 6 * time.Hour
)
var (
// ErrDashboardBackfillDisabled 当配置禁用回填时返回。
ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用")
// ErrDashboardBackfillTooLarge 当回填跨度超过限制时返回。
ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大")
errDashboardAggregationRunning = errors.New("聚合作业正在运行")
)
// DashboardAggregationRepository 定义仪表盘预聚合仓储接口。
type DashboardAggregationRepository interface {
AggregateRange(ctx context.Context, start, end time.Time) error
// RecomputeRange 重新计算指定时间范围内的聚合数据(包含活跃用户等派生表)。
// 设计目的:当 usage_logs 被批量删除/回滚后,确保聚合表可恢复一致性。
RecomputeRange(ctx context.Context, start, end time.Time) error
GetAggregationWatermark(ctx context.Context) (time.Time, error)
UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error
CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error
CleanupUsageLogs(ctx context.Context, cutoff time.Time) error
EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error
}
// DashboardAggregationService 负责定时聚合与回填。
type DashboardAggregationService struct {
repo DashboardAggregationRepository
timingWheel *TimingWheelService
cfg config.DashboardAggregationConfig
running int32
lastRetentionCleanup atomic.Value // time.Time
}
// NewDashboardAggregationService 创建聚合服务。
func NewDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
var aggCfg config.DashboardAggregationConfig
if cfg != nil {
aggCfg = cfg.DashboardAgg
}
return &DashboardAggregationService{
repo: repo,
timingWheel: timingWheel,
cfg: aggCfg,
}
}
// Start 启动定时聚合作业(重启生效配置)。
func (s *DashboardAggregationService) Start() {
if s == nil || s.repo == nil || s.timingWheel == nil {
return
}
if !s.cfg.Enabled {
log.Printf("[DashboardAggregation] 聚合作业已禁用")
return
}
interval := time.Duration(s.cfg.IntervalSeconds) * time.Second
if interval <= 0 {
interval = time.Minute
}
if s.cfg.RecomputeDays > 0 {
go s.recomputeRecentDays()
}
s.timingWheel.ScheduleRecurring("dashboard:aggregation", interval, func() {
s.runScheduledAggregation()
})
log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds)
if !s.cfg.BackfillEnabled {
log.Printf("[DashboardAggregation] 回填已禁用,如需补齐保留窗口以外历史数据请手动回填")
}
}
// TriggerBackfill 触发回填(异步)。
func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) error {
if s == nil || s.repo == nil {
return errors.New("聚合服务未初始化")
}
if !s.cfg.BackfillEnabled {
log.Printf("[DashboardAggregation] 回填被拒绝: backfill_enabled=false")
return ErrDashboardBackfillDisabled
}
if !end.After(start) {
return errors.New("回填时间范围无效")
}
if s.cfg.BackfillMaxDays > 0 {
maxRange := time.Duration(s.cfg.BackfillMaxDays) * 24 * time.Hour
if end.Sub(start) > maxRange {
return ErrDashboardBackfillTooLarge
}
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
defer cancel()
if err := s.backfillRange(ctx, start, end); err != nil {
log.Printf("[DashboardAggregation] 回填失败: %v", err)
}
}()
return nil
}
// TriggerRecomputeRange 触发指定范围的重新计算(异步)。
// 与 TriggerBackfill 不同:
// - 不依赖 backfill_enabled这是内部一致性修复
// - 不更新 watermark避免影响正常增量聚合游标
func (s *DashboardAggregationService) TriggerRecomputeRange(start, end time.Time) error {
if s == nil || s.repo == nil {
return errors.New("聚合服务未初始化")
}
if !s.cfg.Enabled {
return errors.New("聚合服务已禁用")
}
if !end.After(start) {
return errors.New("重新计算时间范围无效")
}
go func() {
const maxRetries = 3
for i := 0; i < maxRetries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
err := s.recomputeRange(ctx, start, end)
cancel()
if err == nil {
return
}
if !errors.Is(err, errDashboardAggregationRunning) {
log.Printf("[DashboardAggregation] 重新计算失败: %v", err)
return
}
time.Sleep(5 * time.Second)
}
log.Printf("[DashboardAggregation] 重新计算放弃: 聚合作业持续占用")
}()
return nil
}
func (s *DashboardAggregationService) recomputeRecentDays() {
days := s.cfg.RecomputeDays
if days <= 0 {
return
}
now := time.Now().UTC()
start := now.AddDate(0, 0, -days)
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
defer cancel()
if err := s.backfillRange(ctx, start, now); err != nil {
log.Printf("[DashboardAggregation] 启动重算失败: %v", err)
return
}
}
func (s *DashboardAggregationService) recomputeRange(ctx context.Context, start, end time.Time) error {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return errDashboardAggregationRunning
}
defer atomic.StoreInt32(&s.running, 0)
jobStart := time.Now().UTC()
if err := s.repo.RecomputeRange(ctx, start, end); err != nil {
return err
}
log.Printf("[DashboardAggregation] 重新计算完成 (start=%s end=%s duration=%s)",
start.UTC().Format(time.RFC3339),
end.UTC().Format(time.RFC3339),
time.Since(jobStart).String(),
)
return nil
}
func (s *DashboardAggregationService) runScheduledAggregation() {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return
}
defer atomic.StoreInt32(&s.running, 0)
jobStart := time.Now().UTC()
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationTimeout)
defer cancel()
now := time.Now().UTC()
last, err := s.repo.GetAggregationWatermark(ctx)
if err != nil {
log.Printf("[DashboardAggregation] 读取水位失败: %v", err)
last = time.Unix(0, 0).UTC()
}
lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second
epoch := time.Unix(0, 0).UTC()
start := last.Add(-lookback)
if !last.After(epoch) {
retentionDays := s.cfg.Retention.UsageLogsDays
if retentionDays <= 0 {
retentionDays = 1
}
start = truncateToDayUTC(now.AddDate(0, 0, -retentionDays))
} else if start.After(now) {
start = now.Add(-lookback)
}
if err := s.aggregateRange(ctx, start, now); err != nil {
log.Printf("[DashboardAggregation] 聚合失败: %v", err)
return
}
updateErr := s.repo.UpdateAggregationWatermark(ctx, now)
if updateErr != nil {
log.Printf("[DashboardAggregation] 更新水位失败: %v", updateErr)
}
log.Printf("[DashboardAggregation] 聚合完成 (start=%s end=%s duration=%s watermark_updated=%t)",
start.Format(time.RFC3339),
now.Format(time.RFC3339),
time.Since(jobStart).String(),
updateErr == nil,
)
s.maybeCleanupRetention(ctx, now)
}
func (s *DashboardAggregationService) backfillRange(ctx context.Context, start, end time.Time) error {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return errDashboardAggregationRunning
}
defer atomic.StoreInt32(&s.running, 0)
jobStart := time.Now().UTC()
startUTC := start.UTC()
endUTC := end.UTC()
if !endUTC.After(startUTC) {
return errors.New("回填时间范围无效")
}
cursor := truncateToDayUTC(startUTC)
for cursor.Before(endUTC) {
windowEnd := cursor.Add(24 * time.Hour)
if windowEnd.After(endUTC) {
windowEnd = endUTC
}
if err := s.aggregateRange(ctx, cursor, windowEnd); err != nil {
return err
}
cursor = windowEnd
}
updateErr := s.repo.UpdateAggregationWatermark(ctx, endUTC)
if updateErr != nil {
log.Printf("[DashboardAggregation] 更新水位失败: %v", updateErr)
}
log.Printf("[DashboardAggregation] 回填聚合完成 (start=%s end=%s duration=%s watermark_updated=%t)",
startUTC.Format(time.RFC3339),
endUTC.Format(time.RFC3339),
time.Since(jobStart).String(),
updateErr == nil,
)
s.maybeCleanupRetention(ctx, endUTC)
return nil
}
func (s *DashboardAggregationService) aggregateRange(ctx context.Context, start, end time.Time) error {
if !end.After(start) {
return nil
}
if err := s.repo.EnsureUsageLogsPartitions(ctx, end); err != nil {
log.Printf("[DashboardAggregation] 分区检查失败: %v", err)
}
return s.repo.AggregateRange(ctx, start, end)
}
func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context, now time.Time) {
lastAny := s.lastRetentionCleanup.Load()
if lastAny != nil {
if last, ok := lastAny.(time.Time); ok && now.Sub(last) < dashboardAggregationRetentionInterval {
return
}
}
hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays)
dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays)
usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays)
aggErr := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff)
if aggErr != nil {
log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", aggErr)
}
usageErr := s.repo.CleanupUsageLogs(ctx, usageCutoff)
if usageErr != nil {
log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", usageErr)
}
if aggErr == nil && usageErr == nil {
s.lastRetentionCleanup.Store(now)
}
}
func truncateToDayUTC(t time.Time) time.Time {
t = t.UTC()
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}