243 lines
7.3 KiB
Go
243 lines
7.3 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/Wei-Shaw/sub2api/internal/config"
|
|
)
|
|
|
|
const (
|
|
defaultDashboardAggregationTimeout = 2 * time.Minute
|
|
defaultDashboardAggregationBackfillTimeout = 30 * time.Minute
|
|
dashboardAggregationRetentionInterval = 6 * time.Hour
|
|
)
|
|
|
|
var (
|
|
// ErrDashboardBackfillDisabled 当配置禁用回填时返回。
|
|
ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用")
|
|
// ErrDashboardBackfillTooLarge 当回填跨度超过限制时返回。
|
|
ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大")
|
|
)
|
|
|
|
// DashboardAggregationRepository 定义仪表盘预聚合仓储接口。
|
|
type DashboardAggregationRepository interface {
|
|
AggregateRange(ctx context.Context, start, end time.Time) error
|
|
GetAggregationWatermark(ctx context.Context) (time.Time, error)
|
|
UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error
|
|
CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error
|
|
CleanupUsageLogs(ctx context.Context, cutoff time.Time) error
|
|
EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error
|
|
}
|
|
|
|
// DashboardAggregationService 负责定时聚合与回填。
|
|
type DashboardAggregationService struct {
|
|
repo DashboardAggregationRepository
|
|
timingWheel *TimingWheelService
|
|
cfg config.DashboardAggregationConfig
|
|
running int32
|
|
lastRetentionCleanup atomic.Value // time.Time
|
|
}
|
|
|
|
// NewDashboardAggregationService 创建聚合服务。
|
|
func NewDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
|
|
var aggCfg config.DashboardAggregationConfig
|
|
if cfg != nil {
|
|
aggCfg = cfg.DashboardAgg
|
|
}
|
|
return &DashboardAggregationService{
|
|
repo: repo,
|
|
timingWheel: timingWheel,
|
|
cfg: aggCfg,
|
|
}
|
|
}
|
|
|
|
// Start 启动定时聚合作业(重启生效配置)。
|
|
func (s *DashboardAggregationService) Start() {
|
|
if s == nil || s.repo == nil || s.timingWheel == nil {
|
|
return
|
|
}
|
|
if !s.cfg.Enabled {
|
|
log.Printf("[DashboardAggregation] 聚合作业已禁用")
|
|
return
|
|
}
|
|
|
|
interval := time.Duration(s.cfg.IntervalSeconds) * time.Second
|
|
if interval <= 0 {
|
|
interval = time.Minute
|
|
}
|
|
|
|
if s.cfg.RecomputeDays > 0 {
|
|
go s.recomputeRecentDays()
|
|
}
|
|
|
|
s.timingWheel.ScheduleRecurring("dashboard:aggregation", interval, func() {
|
|
s.runScheduledAggregation()
|
|
})
|
|
log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds)
|
|
if !s.cfg.BackfillEnabled {
|
|
log.Printf("[DashboardAggregation] 回填已禁用,如需补齐保留窗口以外历史数据请手动回填")
|
|
}
|
|
}
|
|
|
|
// TriggerBackfill 触发回填(异步)。
|
|
func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) error {
|
|
if s == nil || s.repo == nil {
|
|
return errors.New("聚合服务未初始化")
|
|
}
|
|
if !s.cfg.BackfillEnabled {
|
|
log.Printf("[DashboardAggregation] 回填被拒绝: backfill_enabled=false")
|
|
return ErrDashboardBackfillDisabled
|
|
}
|
|
if !end.After(start) {
|
|
return errors.New("回填时间范围无效")
|
|
}
|
|
if s.cfg.BackfillMaxDays > 0 {
|
|
maxRange := time.Duration(s.cfg.BackfillMaxDays) * 24 * time.Hour
|
|
if end.Sub(start) > maxRange {
|
|
return ErrDashboardBackfillTooLarge
|
|
}
|
|
}
|
|
|
|
go func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
|
|
defer cancel()
|
|
if err := s.backfillRange(ctx, start, end); err != nil {
|
|
log.Printf("[DashboardAggregation] 回填失败: %v", err)
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (s *DashboardAggregationService) recomputeRecentDays() {
|
|
days := s.cfg.RecomputeDays
|
|
if days <= 0 {
|
|
return
|
|
}
|
|
now := time.Now().UTC()
|
|
start := now.AddDate(0, 0, -days)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
|
|
defer cancel()
|
|
if err := s.backfillRange(ctx, start, now); err != nil {
|
|
log.Printf("[DashboardAggregation] 启动重算失败: %v", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
func (s *DashboardAggregationService) runScheduledAggregation() {
|
|
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
|
|
return
|
|
}
|
|
defer atomic.StoreInt32(&s.running, 0)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationTimeout)
|
|
defer cancel()
|
|
|
|
now := time.Now().UTC()
|
|
last, err := s.repo.GetAggregationWatermark(ctx)
|
|
if err != nil {
|
|
log.Printf("[DashboardAggregation] 读取水位失败: %v", err)
|
|
last = time.Unix(0, 0).UTC()
|
|
}
|
|
|
|
lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second
|
|
epoch := time.Unix(0, 0).UTC()
|
|
start := last.Add(-lookback)
|
|
if !last.After(epoch) {
|
|
retentionDays := s.cfg.Retention.UsageLogsDays
|
|
if retentionDays <= 0 {
|
|
retentionDays = 1
|
|
}
|
|
start = truncateToDayUTC(now.AddDate(0, 0, -retentionDays))
|
|
} else if start.After(now) {
|
|
start = now.Add(-lookback)
|
|
}
|
|
|
|
if err := s.aggregateRange(ctx, start, now); err != nil {
|
|
log.Printf("[DashboardAggregation] 聚合失败: %v", err)
|
|
return
|
|
}
|
|
|
|
if err := s.repo.UpdateAggregationWatermark(ctx, now); err != nil {
|
|
log.Printf("[DashboardAggregation] 更新水位失败: %v", err)
|
|
}
|
|
|
|
s.maybeCleanupRetention(ctx, now)
|
|
}
|
|
|
|
func (s *DashboardAggregationService) backfillRange(ctx context.Context, start, end time.Time) error {
|
|
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
|
|
return errors.New("聚合作业正在运行")
|
|
}
|
|
defer atomic.StoreInt32(&s.running, 0)
|
|
|
|
startUTC := start.UTC()
|
|
endUTC := end.UTC()
|
|
if !endUTC.After(startUTC) {
|
|
return errors.New("回填时间范围无效")
|
|
}
|
|
|
|
cursor := truncateToDayUTC(startUTC)
|
|
for cursor.Before(endUTC) {
|
|
windowEnd := cursor.Add(24 * time.Hour)
|
|
if windowEnd.After(endUTC) {
|
|
windowEnd = endUTC
|
|
}
|
|
if err := s.aggregateRange(ctx, cursor, windowEnd); err != nil {
|
|
return err
|
|
}
|
|
cursor = windowEnd
|
|
}
|
|
|
|
if err := s.repo.UpdateAggregationWatermark(ctx, endUTC); err != nil {
|
|
log.Printf("[DashboardAggregation] 更新水位失败: %v", err)
|
|
}
|
|
|
|
s.maybeCleanupRetention(ctx, endUTC)
|
|
return nil
|
|
}
|
|
|
|
func (s *DashboardAggregationService) aggregateRange(ctx context.Context, start, end time.Time) error {
|
|
if !end.After(start) {
|
|
return nil
|
|
}
|
|
if err := s.repo.EnsureUsageLogsPartitions(ctx, end); err != nil {
|
|
log.Printf("[DashboardAggregation] 分区检查失败: %v", err)
|
|
}
|
|
return s.repo.AggregateRange(ctx, start, end)
|
|
}
|
|
|
|
func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context, now time.Time) {
|
|
lastAny := s.lastRetentionCleanup.Load()
|
|
if lastAny != nil {
|
|
if last, ok := lastAny.(time.Time); ok && now.Sub(last) < dashboardAggregationRetentionInterval {
|
|
return
|
|
}
|
|
}
|
|
|
|
hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays)
|
|
dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays)
|
|
usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays)
|
|
|
|
aggErr := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff)
|
|
if aggErr != nil {
|
|
log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", aggErr)
|
|
}
|
|
usageErr := s.repo.CleanupUsageLogs(ctx, usageCutoff)
|
|
if usageErr != nil {
|
|
log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", usageErr)
|
|
}
|
|
if aggErr == nil && usageErr == nil {
|
|
s.lastRetentionCleanup.Store(now)
|
|
}
|
|
}
|
|
|
|
func truncateToDayUTC(t time.Time) time.Time {
|
|
t = t.UTC()
|
|
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
|
|
}
|