feat(仪表盘): 引入预聚合统计与聚合作业

This commit is contained in:
yangjianbo
2026-01-11 16:01:35 +08:00
parent ab5839b461
commit 1a869547d7
19 changed files with 1366 additions and 62 deletions

View File

@@ -0,0 +1,224 @@
package service
import (
"context"
"errors"
"log"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
)
const (
defaultDashboardAggregationTimeout = 2 * time.Minute
defaultDashboardAggregationBackfillTimeout = 30 * time.Minute
dashboardAggregationRetentionInterval = 6 * time.Hour
)
var (
// ErrDashboardBackfillDisabled 当配置禁用回填时返回。
ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用")
)
// DashboardAggregationRepository 定义仪表盘预聚合仓储接口。
type DashboardAggregationRepository interface {
AggregateRange(ctx context.Context, start, end time.Time) error
GetAggregationWatermark(ctx context.Context) (time.Time, error)
UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error
CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error
CleanupUsageLogs(ctx context.Context, cutoff time.Time) error
EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error
}
// DashboardAggregationService 负责定时聚合与回填。
type DashboardAggregationService struct {
repo DashboardAggregationRepository
timingWheel *TimingWheelService
cfg config.DashboardAggregationConfig
running int32
lastRetentionCleanup atomic.Value // time.Time
}
// NewDashboardAggregationService 创建聚合服务。
func NewDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
var aggCfg config.DashboardAggregationConfig
if cfg != nil {
aggCfg = cfg.DashboardAgg
}
return &DashboardAggregationService{
repo: repo,
timingWheel: timingWheel,
cfg: aggCfg,
}
}
// Start 启动定时聚合作业(重启生效配置)。
func (s *DashboardAggregationService) Start() {
if s == nil || s.repo == nil || s.timingWheel == nil {
return
}
if !s.cfg.Enabled {
log.Printf("[DashboardAggregation] 聚合作业已禁用")
return
}
interval := time.Duration(s.cfg.IntervalSeconds) * time.Second
if interval <= 0 {
interval = time.Minute
}
if s.cfg.RecomputeDays > 0 {
go s.recomputeRecentDays()
}
s.timingWheel.ScheduleRecurring("dashboard:aggregation", interval, func() {
s.runScheduledAggregation()
})
log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds)
}
// TriggerBackfill 触发回填(异步)。
func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) error {
if s == nil || s.repo == nil {
return errors.New("聚合服务未初始化")
}
if !s.cfg.BackfillEnabled {
log.Printf("[DashboardAggregation] 回填被拒绝: backfill_enabled=false")
return ErrDashboardBackfillDisabled
}
if !end.After(start) {
return errors.New("回填时间范围无效")
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
defer cancel()
if err := s.backfillRange(ctx, start, end); err != nil {
log.Printf("[DashboardAggregation] 回填失败: %v", err)
}
}()
return nil
}
func (s *DashboardAggregationService) recomputeRecentDays() {
days := s.cfg.RecomputeDays
if days <= 0 {
return
}
now := time.Now().UTC()
start := now.AddDate(0, 0, -days)
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
defer cancel()
if err := s.backfillRange(ctx, start, now); err != nil {
log.Printf("[DashboardAggregation] 启动重算失败: %v", err)
return
}
}
func (s *DashboardAggregationService) runScheduledAggregation() {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return
}
defer atomic.StoreInt32(&s.running, 0)
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationTimeout)
defer cancel()
now := time.Now().UTC()
last, err := s.repo.GetAggregationWatermark(ctx)
if err != nil {
log.Printf("[DashboardAggregation] 读取水位失败: %v", err)
last = time.Unix(0, 0).UTC()
}
lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second
start := last.Add(-lookback)
epoch := time.Unix(0, 0).UTC()
if !last.After(epoch) {
start = now.Add(-lookback)
}
if start.After(now) {
start = now.Add(-lookback)
}
if err := s.aggregateRange(ctx, start, now); err != nil {
log.Printf("[DashboardAggregation] 聚合失败: %v", err)
return
}
if err := s.repo.UpdateAggregationWatermark(ctx, now); err != nil {
log.Printf("[DashboardAggregation] 更新水位失败: %v", err)
}
s.maybeCleanupRetention(ctx, now)
}
func (s *DashboardAggregationService) backfillRange(ctx context.Context, start, end time.Time) error {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return errors.New("聚合作业正在运行")
}
defer atomic.StoreInt32(&s.running, 0)
startUTC := start.UTC()
endUTC := end.UTC()
if !endUTC.After(startUTC) {
return errors.New("回填时间范围无效")
}
cursor := truncateToDayUTC(startUTC)
for cursor.Before(endUTC) {
windowEnd := cursor.Add(24 * time.Hour)
if windowEnd.After(endUTC) {
windowEnd = endUTC
}
if err := s.aggregateRange(ctx, cursor, windowEnd); err != nil {
return err
}
cursor = windowEnd
}
if err := s.repo.UpdateAggregationWatermark(ctx, endUTC); err != nil {
log.Printf("[DashboardAggregation] 更新水位失败: %v", err)
}
s.maybeCleanupRetention(ctx, endUTC)
return nil
}
func (s *DashboardAggregationService) aggregateRange(ctx context.Context, start, end time.Time) error {
if !end.After(start) {
return nil
}
if err := s.repo.EnsureUsageLogsPartitions(ctx, end); err != nil {
log.Printf("[DashboardAggregation] 分区检查失败: %v", err)
}
return s.repo.AggregateRange(ctx, start, end)
}
func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context, now time.Time) {
lastAny := s.lastRetentionCleanup.Load()
if lastAny != nil {
if last, ok := lastAny.(time.Time); ok && now.Sub(last) < dashboardAggregationRetentionInterval {
return
}
}
s.lastRetentionCleanup.Store(now)
hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays)
dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays)
usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays)
if err := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff); err != nil {
log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", err)
}
if err := s.repo.CleanupUsageLogs(ctx, usageCutoff); err != nil {
log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", err)
}
}
func truncateToDayUTC(t time.Time) time.Time {
t = t.UTC()
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}

View File

@@ -37,17 +37,24 @@ type dashboardStatsCacheEntry struct {
// DashboardService provides aggregated statistics for admin dashboard.
type DashboardService struct {
usageRepo UsageLogRepository
aggRepo DashboardAggregationRepository
cache DashboardStatsCache
cacheFreshTTL time.Duration
cacheTTL time.Duration
refreshTimeout time.Duration
refreshing int32
aggEnabled bool
aggInterval time.Duration
aggLookback time.Duration
}
func NewDashboardService(usageRepo UsageLogRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService {
func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregationRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService {
freshTTL := defaultDashboardStatsFreshTTL
cacheTTL := defaultDashboardStatsCacheTTL
refreshTimeout := defaultDashboardStatsRefreshTimeout
aggEnabled := true
aggInterval := time.Minute
aggLookback := 2 * time.Minute
if cfg != nil {
if !cfg.Dashboard.Enabled {
cache = nil
@@ -61,13 +68,24 @@ func NewDashboardService(usageRepo UsageLogRepository, cache DashboardStatsCache
if cfg.Dashboard.StatsRefreshTimeoutSeconds > 0 {
refreshTimeout = time.Duration(cfg.Dashboard.StatsRefreshTimeoutSeconds) * time.Second
}
aggEnabled = cfg.DashboardAgg.Enabled
if cfg.DashboardAgg.IntervalSeconds > 0 {
aggInterval = time.Duration(cfg.DashboardAgg.IntervalSeconds) * time.Second
}
if cfg.DashboardAgg.LookbackSeconds > 0 {
aggLookback = time.Duration(cfg.DashboardAgg.LookbackSeconds) * time.Second
}
}
return &DashboardService{
usageRepo: usageRepo,
aggRepo: aggRepo,
cache: cache,
cacheFreshTTL: freshTTL,
cacheTTL: cacheTTL,
refreshTimeout: refreshTimeout,
aggEnabled: aggEnabled,
aggInterval: aggInterval,
aggLookback: aggLookback,
}
}
@@ -75,6 +93,7 @@ func (s *DashboardService) GetDashboardStats(ctx context.Context) (*usagestats.D
if s.cache != nil {
cached, fresh, err := s.getCachedDashboardStats(ctx)
if err == nil && cached != nil {
s.refreshAggregationStaleness(cached)
if !fresh {
s.refreshDashboardStatsAsync()
}
@@ -133,6 +152,7 @@ func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagesta
if err != nil {
return nil, err
}
s.applyAggregationStatus(ctx, stats)
cacheCtx, cancel := s.cacheOperationContext()
defer cancel()
s.saveDashboardStatsCache(cacheCtx, stats)
@@ -158,6 +178,7 @@ func (s *DashboardService) refreshDashboardStatsAsync() {
log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err)
return
}
s.applyAggregationStatus(ctx, stats)
cacheCtx, cancel := s.cacheOperationContext()
defer cancel()
s.saveDashboardStatsCache(cacheCtx, stats)
@@ -203,6 +224,61 @@ func (s *DashboardService) cacheOperationContext() (context.Context, context.Can
return context.WithTimeout(context.Background(), s.refreshTimeout)
}
func (s *DashboardService) applyAggregationStatus(ctx context.Context, stats *usagestats.DashboardStats) {
if stats == nil {
return
}
updatedAt := s.fetchAggregationUpdatedAt(ctx)
stats.StatsUpdatedAt = updatedAt.UTC().Format(time.RFC3339)
stats.StatsStale = s.isAggregationStale(updatedAt, time.Now().UTC())
}
func (s *DashboardService) refreshAggregationStaleness(stats *usagestats.DashboardStats) {
if stats == nil {
return
}
updatedAt := parseStatsUpdatedAt(stats.StatsUpdatedAt)
stats.StatsStale = s.isAggregationStale(updatedAt, time.Now().UTC())
}
func (s *DashboardService) fetchAggregationUpdatedAt(ctx context.Context) time.Time {
if s.aggRepo == nil {
return time.Unix(0, 0).UTC()
}
updatedAt, err := s.aggRepo.GetAggregationWatermark(ctx)
if err != nil {
log.Printf("[Dashboard] 读取聚合水位失败: %v", err)
return time.Unix(0, 0).UTC()
}
if updatedAt.IsZero() {
return time.Unix(0, 0).UTC()
}
return updatedAt.UTC()
}
func (s *DashboardService) isAggregationStale(updatedAt, now time.Time) bool {
if !s.aggEnabled {
return true
}
epoch := time.Unix(0, 0).UTC()
if !updatedAt.After(epoch) {
return true
}
threshold := s.aggInterval + s.aggLookback
return now.Sub(updatedAt) > threshold
}
func parseStatsUpdatedAt(raw string) time.Time {
if raw == "" {
return time.Unix(0, 0).UTC()
}
parsed, err := time.Parse(time.RFC3339, raw)
if err != nil {
return time.Unix(0, 0).UTC()
}
return parsed.UTC()
}
func (s *DashboardService) GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error) {
trend, err := s.usageRepo.GetAPIKeyUsageTrend(ctx, startTime, endTime, granularity, limit)
if err != nil {

View File

@@ -74,6 +74,38 @@ func (c *dashboardCacheStub) DeleteDashboardStats(ctx context.Context) error {
return nil
}
type dashboardAggregationRepoStub struct {
watermark time.Time
err error
}
func (s *dashboardAggregationRepoStub) AggregateRange(ctx context.Context, start, end time.Time) error {
return nil
}
func (s *dashboardAggregationRepoStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
if s.err != nil {
return time.Time{}, s.err
}
return s.watermark, nil
}
func (s *dashboardAggregationRepoStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error {
return nil
}
func (s *dashboardAggregationRepoStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error {
return nil
}
func (s *dashboardAggregationRepoStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error {
return nil
}
func (s *dashboardAggregationRepoStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error {
return nil
}
func (c *dashboardCacheStub) readLastEntry(t *testing.T) dashboardStatsCacheEntry {
t.Helper()
c.lastSetMu.Lock()
@@ -88,7 +120,9 @@ func (c *dashboardCacheStub) readLastEntry(t *testing.T) dashboardStatsCacheEntr
func TestDashboardService_CacheHitFresh(t *testing.T) {
stats := &usagestats.DashboardStats{
TotalUsers: 10,
TotalUsers: 10,
StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339),
StatsStale: true,
}
entry := dashboardStatsCacheEntry{
Stats: stats,
@@ -105,8 +139,9 @@ func TestDashboardService_CacheHitFresh(t *testing.T) {
repo := &usageRepoStub{
stats: &usagestats.DashboardStats{TotalUsers: 99},
}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
svc := NewDashboardService(repo, cache, cfg)
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
@@ -118,7 +153,9 @@ func TestDashboardService_CacheHitFresh(t *testing.T) {
func TestDashboardService_CacheMiss_StoresCache(t *testing.T) {
stats := &usagestats.DashboardStats{
TotalUsers: 7,
TotalUsers: 7,
StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339),
StatsStale: true,
}
cache := &dashboardCacheStub{
get: func(ctx context.Context) (string, error) {
@@ -126,8 +163,9 @@ func TestDashboardService_CacheMiss_StoresCache(t *testing.T) {
},
}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
svc := NewDashboardService(repo, cache, cfg)
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
@@ -142,7 +180,9 @@ func TestDashboardService_CacheMiss_StoresCache(t *testing.T) {
func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) {
stats := &usagestats.DashboardStats{
TotalUsers: 3,
TotalUsers: 3,
StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339),
StatsStale: true,
}
cache := &dashboardCacheStub{
get: func(ctx context.Context) (string, error) {
@@ -150,8 +190,9 @@ func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) {
},
}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: false}}
svc := NewDashboardService(repo, cache, cfg)
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
@@ -163,7 +204,9 @@ func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) {
func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) {
staleStats := &usagestats.DashboardStats{
TotalUsers: 11,
TotalUsers: 11,
StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339),
StatsStale: true,
}
entry := dashboardStatsCacheEntry{
Stats: staleStats,
@@ -182,8 +225,9 @@ func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) {
stats: &usagestats.DashboardStats{TotalUsers: 22},
onCall: refreshCh,
}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
svc := NewDashboardService(repo, cache, cfg)
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
@@ -207,8 +251,9 @@ func TestDashboardService_CacheParseError_EvictsAndRefetches(t *testing.T) {
}
stats := &usagestats.DashboardStats{TotalUsers: 9}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
svc := NewDashboardService(repo, cache, cfg)
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
@@ -224,10 +269,45 @@ func TestDashboardService_CacheParseError_RepoFailure(t *testing.T) {
},
}
repo := &usageRepoStub{err: errors.New("db down")}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
svc := NewDashboardService(repo, cache, cfg)
svc := NewDashboardService(repo, aggRepo, cache, cfg)
_, err := svc.GetDashboardStats(context.Background())
require.Error(t, err)
require.Equal(t, int32(1), atomic.LoadInt32(&cache.delCalls))
}
func TestDashboardService_StatsUpdatedAtEpochWhenMissing(t *testing.T) {
stats := &usagestats.DashboardStats{}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: false}}
svc := NewDashboardService(repo, aggRepo, nil, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, "1970-01-01T00:00:00Z", got.StatsUpdatedAt)
require.True(t, got.StatsStale)
}
func TestDashboardService_StatsStaleFalseWhenFresh(t *testing.T) {
aggNow := time.Now().UTC().Truncate(time.Second)
stats := &usagestats.DashboardStats{}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: aggNow}
cfg := &config.Config{
Dashboard: config.DashboardCacheConfig{Enabled: false},
DashboardAgg: config.DashboardAggregationConfig{
Enabled: true,
IntervalSeconds: 60,
LookbackSeconds: 120,
},
}
svc := NewDashboardService(repo, aggRepo, nil, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, aggNow.Format(time.RFC3339), got.StatsUpdatedAt)
require.False(t, got.StatsStale)
}

View File

@@ -47,6 +47,13 @@ func ProvideTokenRefreshService(
return svc
}
// ProvideDashboardAggregationService 创建并启动仪表盘聚合服务
func ProvideDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
svc := NewDashboardAggregationService(repo, timingWheel, cfg)
svc.Start()
return svc
}
// ProvideAccountExpiryService creates and starts AccountExpiryService.
func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpiryService {
svc := NewAccountExpiryService(accountRepo, time.Minute)
@@ -126,6 +133,7 @@ var ProviderSet = wire.NewSet(
ProvideTokenRefreshService,
ProvideAccountExpiryService,
ProvideTimingWheelService,
ProvideDashboardAggregationService,
ProvideDeferredService,
NewAntigravityQuotaFetcher,
NewUserAttributeService,