fix(仪表盘): 兼容禁用聚合与回填限制
This commit is contained in:
@@ -19,6 +19,8 @@ const (
|
||||
var (
|
||||
// ErrDashboardBackfillDisabled 当配置禁用回填时返回。
|
||||
ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用")
|
||||
// ErrDashboardBackfillTooLarge 当回填跨度超过限制时返回。
|
||||
ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大")
|
||||
)
|
||||
|
||||
// DashboardAggregationRepository 定义仪表盘预聚合仓储接口。
|
||||
@@ -76,6 +78,9 @@ func (s *DashboardAggregationService) Start() {
|
||||
s.runScheduledAggregation()
|
||||
})
|
||||
log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds)
|
||||
if !s.cfg.BackfillEnabled {
|
||||
log.Printf("[DashboardAggregation] 回填已禁用,如需补齐保留窗口以外历史数据请手动回填")
|
||||
}
|
||||
}
|
||||
|
||||
// TriggerBackfill 触发回填(异步)。
|
||||
@@ -90,6 +95,12 @@ func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) erro
|
||||
if !end.After(start) {
|
||||
return errors.New("回填时间范围无效")
|
||||
}
|
||||
if s.cfg.BackfillMaxDays > 0 {
|
||||
maxRange := time.Duration(s.cfg.BackfillMaxDays) * 24 * time.Hour
|
||||
if end.Sub(start) > maxRange {
|
||||
return ErrDashboardBackfillTooLarge
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
|
||||
@@ -137,8 +148,11 @@ func (s *DashboardAggregationService) runScheduledAggregation() {
|
||||
epoch := time.Unix(0, 0).UTC()
|
||||
start := last.Add(-lookback)
|
||||
if !last.After(epoch) {
|
||||
// 首次聚合覆盖当天,避免只统计最后一个窗口。
|
||||
start = truncateToDayUTC(now)
|
||||
retentionDays := s.cfg.Retention.UsageLogsDays
|
||||
if retentionDays <= 0 {
|
||||
retentionDays = 1
|
||||
}
|
||||
start = truncateToDayUTC(now.AddDate(0, 0, -retentionDays))
|
||||
} else if start.After(now) {
|
||||
start = now.Add(-lookback)
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ func (s *dashboardAggregationRepoTestStub) EnsureUsageLogsPartitions(ctx context
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesDayStart(t *testing.T) {
|
||||
func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesRetentionStart(t *testing.T) {
|
||||
repo := &dashboardAggregationRepoTestStub{watermark: time.Unix(0, 0).UTC()}
|
||||
svc := &DashboardAggregationService{
|
||||
repo: repo,
|
||||
@@ -67,7 +67,7 @@ func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesDayStart(t
|
||||
|
||||
require.Equal(t, 1, repo.aggregateCalls)
|
||||
require.False(t, repo.lastEnd.IsZero())
|
||||
require.Equal(t, truncateToDayUTC(repo.lastEnd), repo.lastStart)
|
||||
require.Equal(t, truncateToDayUTC(repo.lastEnd.AddDate(0, 0, -1)), repo.lastStart)
|
||||
}
|
||||
|
||||
func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *testing.T) {
|
||||
@@ -87,3 +87,20 @@ func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *te
|
||||
|
||||
require.Nil(t, svc.lastRetentionCleanup.Load())
|
||||
}
|
||||
|
||||
func TestDashboardAggregationService_TriggerBackfill_TooLarge(t *testing.T) {
|
||||
repo := &dashboardAggregationRepoTestStub{}
|
||||
svc := &DashboardAggregationService{
|
||||
repo: repo,
|
||||
cfg: config.DashboardAggregationConfig{
|
||||
BackfillEnabled: true,
|
||||
BackfillMaxDays: 1,
|
||||
},
|
||||
}
|
||||
|
||||
start := time.Now().AddDate(0, 0, -3)
|
||||
end := time.Now()
|
||||
err := svc.TriggerBackfill(start, end)
|
||||
require.ErrorIs(t, err, ErrDashboardBackfillTooLarge)
|
||||
require.Equal(t, 0, repo.aggregateCalls)
|
||||
}
|
||||
|
||||
@@ -29,6 +29,10 @@ type DashboardStatsCache interface {
|
||||
DeleteDashboardStats(ctx context.Context) error
|
||||
}
|
||||
|
||||
type dashboardStatsRangeFetcher interface {
|
||||
GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*usagestats.DashboardStats, error)
|
||||
}
|
||||
|
||||
type dashboardStatsCacheEntry struct {
|
||||
Stats *usagestats.DashboardStats `json:"stats"`
|
||||
UpdatedAt int64 `json:"updated_at"`
|
||||
@@ -46,6 +50,7 @@ type DashboardService struct {
|
||||
aggEnabled bool
|
||||
aggInterval time.Duration
|
||||
aggLookback time.Duration
|
||||
aggUsageDays int
|
||||
}
|
||||
|
||||
func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregationRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService {
|
||||
@@ -55,6 +60,7 @@ func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregat
|
||||
aggEnabled := true
|
||||
aggInterval := time.Minute
|
||||
aggLookback := 2 * time.Minute
|
||||
aggUsageDays := 90
|
||||
if cfg != nil {
|
||||
if !cfg.Dashboard.Enabled {
|
||||
cache = nil
|
||||
@@ -75,6 +81,9 @@ func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregat
|
||||
if cfg.DashboardAgg.LookbackSeconds > 0 {
|
||||
aggLookback = time.Duration(cfg.DashboardAgg.LookbackSeconds) * time.Second
|
||||
}
|
||||
if cfg.DashboardAgg.Retention.UsageLogsDays > 0 {
|
||||
aggUsageDays = cfg.DashboardAgg.Retention.UsageLogsDays
|
||||
}
|
||||
}
|
||||
return &DashboardService{
|
||||
usageRepo: usageRepo,
|
||||
@@ -86,6 +95,7 @@ func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregat
|
||||
aggEnabled: aggEnabled,
|
||||
aggInterval: aggInterval,
|
||||
aggLookback: aggLookback,
|
||||
aggUsageDays: aggUsageDays,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,7 +158,7 @@ func (s *DashboardService) getCachedDashboardStats(ctx context.Context) (*usages
|
||||
}
|
||||
|
||||
func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) {
|
||||
stats, err := s.usageRepo.GetDashboardStats(ctx)
|
||||
stats, err := s.fetchDashboardStats(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -173,7 +183,7 @@ func (s *DashboardService) refreshDashboardStatsAsync() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.refreshTimeout)
|
||||
defer cancel()
|
||||
|
||||
stats, err := s.usageRepo.GetDashboardStats(ctx)
|
||||
stats, err := s.fetchDashboardStats(ctx)
|
||||
if err != nil {
|
||||
log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err)
|
||||
return
|
||||
@@ -185,6 +195,17 @@ func (s *DashboardService) refreshDashboardStatsAsync() {
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *DashboardService) fetchDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) {
|
||||
if !s.aggEnabled {
|
||||
if fetcher, ok := s.usageRepo.(dashboardStatsRangeFetcher); ok {
|
||||
now := time.Now().UTC()
|
||||
start := truncateToDayUTC(now.AddDate(0, 0, -s.aggUsageDays))
|
||||
return fetcher.GetDashboardStatsWithRange(ctx, start, now)
|
||||
}
|
||||
}
|
||||
return s.usageRepo.GetDashboardStats(ctx)
|
||||
}
|
||||
|
||||
func (s *DashboardService) saveDashboardStatsCache(ctx context.Context, stats *usagestats.DashboardStats) {
|
||||
if s.cache == nil || stats == nil {
|
||||
return
|
||||
|
||||
@@ -16,10 +16,15 @@ import (
|
||||
|
||||
type usageRepoStub struct {
|
||||
UsageLogRepository
|
||||
stats *usagestats.DashboardStats
|
||||
err error
|
||||
calls int32
|
||||
onCall chan struct{}
|
||||
stats *usagestats.DashboardStats
|
||||
rangeStats *usagestats.DashboardStats
|
||||
err error
|
||||
rangeErr error
|
||||
calls int32
|
||||
rangeCalls int32
|
||||
rangeStart time.Time
|
||||
rangeEnd time.Time
|
||||
onCall chan struct{}
|
||||
}
|
||||
|
||||
func (s *usageRepoStub) GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) {
|
||||
@@ -36,6 +41,19 @@ func (s *usageRepoStub) GetDashboardStats(ctx context.Context) (*usagestats.Dash
|
||||
return s.stats, nil
|
||||
}
|
||||
|
||||
func (s *usageRepoStub) GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*usagestats.DashboardStats, error) {
|
||||
atomic.AddInt32(&s.rangeCalls, 1)
|
||||
s.rangeStart = start
|
||||
s.rangeEnd = end
|
||||
if s.rangeErr != nil {
|
||||
return nil, s.rangeErr
|
||||
}
|
||||
if s.rangeStats != nil {
|
||||
return s.rangeStats, nil
|
||||
}
|
||||
return s.stats, nil
|
||||
}
|
||||
|
||||
type dashboardCacheStub struct {
|
||||
get func(ctx context.Context) (string, error)
|
||||
set func(ctx context.Context, data string, ttl time.Duration) error
|
||||
@@ -140,7 +158,12 @@ func TestDashboardService_CacheHitFresh(t *testing.T) {
|
||||
stats: &usagestats.DashboardStats{TotalUsers: 99},
|
||||
}
|
||||
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
|
||||
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
|
||||
cfg := &config.Config{
|
||||
Dashboard: config.DashboardCacheConfig{Enabled: true},
|
||||
DashboardAgg: config.DashboardAggregationConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
}
|
||||
svc := NewDashboardService(repo, aggRepo, cache, cfg)
|
||||
|
||||
got, err := svc.GetDashboardStats(context.Background())
|
||||
@@ -164,7 +187,12 @@ func TestDashboardService_CacheMiss_StoresCache(t *testing.T) {
|
||||
}
|
||||
repo := &usageRepoStub{stats: stats}
|
||||
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
|
||||
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
|
||||
cfg := &config.Config{
|
||||
Dashboard: config.DashboardCacheConfig{Enabled: true},
|
||||
DashboardAgg: config.DashboardAggregationConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
}
|
||||
svc := NewDashboardService(repo, aggRepo, cache, cfg)
|
||||
|
||||
got, err := svc.GetDashboardStats(context.Background())
|
||||
@@ -191,7 +219,12 @@ func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) {
|
||||
}
|
||||
repo := &usageRepoStub{stats: stats}
|
||||
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
|
||||
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: false}}
|
||||
cfg := &config.Config{
|
||||
Dashboard: config.DashboardCacheConfig{Enabled: false},
|
||||
DashboardAgg: config.DashboardAggregationConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
}
|
||||
svc := NewDashboardService(repo, aggRepo, cache, cfg)
|
||||
|
||||
got, err := svc.GetDashboardStats(context.Background())
|
||||
@@ -226,7 +259,12 @@ func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) {
|
||||
onCall: refreshCh,
|
||||
}
|
||||
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
|
||||
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
|
||||
cfg := &config.Config{
|
||||
Dashboard: config.DashboardCacheConfig{Enabled: true},
|
||||
DashboardAgg: config.DashboardAggregationConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
}
|
||||
svc := NewDashboardService(repo, aggRepo, cache, cfg)
|
||||
|
||||
got, err := svc.GetDashboardStats(context.Background())
|
||||
@@ -252,7 +290,12 @@ func TestDashboardService_CacheParseError_EvictsAndRefetches(t *testing.T) {
|
||||
stats := &usagestats.DashboardStats{TotalUsers: 9}
|
||||
repo := &usageRepoStub{stats: stats}
|
||||
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
|
||||
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
|
||||
cfg := &config.Config{
|
||||
Dashboard: config.DashboardCacheConfig{Enabled: true},
|
||||
DashboardAgg: config.DashboardAggregationConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
}
|
||||
svc := NewDashboardService(repo, aggRepo, cache, cfg)
|
||||
|
||||
got, err := svc.GetDashboardStats(context.Background())
|
||||
@@ -270,7 +313,12 @@ func TestDashboardService_CacheParseError_RepoFailure(t *testing.T) {
|
||||
}
|
||||
repo := &usageRepoStub{err: errors.New("db down")}
|
||||
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
|
||||
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}}
|
||||
cfg := &config.Config{
|
||||
Dashboard: config.DashboardCacheConfig{Enabled: true},
|
||||
DashboardAgg: config.DashboardAggregationConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
}
|
||||
svc := NewDashboardService(repo, aggRepo, cache, cfg)
|
||||
|
||||
_, err := svc.GetDashboardStats(context.Background())
|
||||
@@ -311,3 +359,29 @@ func TestDashboardService_StatsStaleFalseWhenFresh(t *testing.T) {
|
||||
require.Equal(t, aggNow.Format(time.RFC3339), got.StatsUpdatedAt)
|
||||
require.False(t, got.StatsStale)
|
||||
}
|
||||
|
||||
func TestDashboardService_AggDisabled_UsesUsageLogsFallback(t *testing.T) {
|
||||
expected := &usagestats.DashboardStats{TotalUsers: 42}
|
||||
repo := &usageRepoStub{
|
||||
rangeStats: expected,
|
||||
err: errors.New("should not call aggregated stats"),
|
||||
}
|
||||
cfg := &config.Config{
|
||||
Dashboard: config.DashboardCacheConfig{Enabled: false},
|
||||
DashboardAgg: config.DashboardAggregationConfig{
|
||||
Enabled: false,
|
||||
Retention: config.DashboardAggregationRetentionConfig{
|
||||
UsageLogsDays: 7,
|
||||
},
|
||||
},
|
||||
}
|
||||
svc := NewDashboardService(repo, nil, nil, cfg)
|
||||
|
||||
got, err := svc.GetDashboardStats(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(42), got.TotalUsers)
|
||||
require.Equal(t, int32(0), atomic.LoadInt32(&repo.calls))
|
||||
require.Equal(t, int32(1), atomic.LoadInt32(&repo.rangeCalls))
|
||||
require.False(t, repo.rangeEnd.IsZero())
|
||||
require.Equal(t, truncateToDayUTC(repo.rangeEnd.AddDate(0, 0, -7)), repo.rangeStart)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user