diff --git a/backend/internal/repository/dashboard_aggregation_repo.go b/backend/internal/repository/dashboard_aggregation_repo.go index dbba5cdb..b02cde0d 100644 --- a/backend/internal/repository/dashboard_aggregation_repo.go +++ b/backend/internal/repository/dashboard_aggregation_repo.go @@ -43,10 +43,11 @@ func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, sta dayEnd = dayEnd.Add(24 * time.Hour) } - if err := r.insertHourlyActiveUsers(ctx, startUTC, endUTC); err != nil { + // 以桶边界聚合,允许覆盖 end 所在桶的剩余区间。 + if err := r.insertHourlyActiveUsers(ctx, hourStart, hourEnd); err != nil { return err } - if err := r.insertDailyActiveUsers(ctx, startUTC, endUTC); err != nil { + if err := r.insertDailyActiveUsers(ctx, hourStart, hourEnd); err != nil { return err } if err := r.upsertHourlyAggregates(ctx, hourStart, hourEnd); err != nil { @@ -138,10 +139,10 @@ func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Cont query := ` INSERT INTO usage_dashboard_daily_users (bucket_date, user_id) SELECT DISTINCT - (created_at AT TIME ZONE 'UTC')::date AS bucket_date, + (bucket_start AT TIME ZONE 'UTC')::date AS bucket_date, user_id - FROM usage_logs - WHERE created_at >= $1 AND created_at < $2 + FROM usage_dashboard_hourly_users + WHERE bucket_start >= $1 AND bucket_start < $2 ON CONFLICT DO NOTHING ` _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC()) diff --git a/backend/internal/service/dashboard_aggregation_service.go b/backend/internal/service/dashboard_aggregation_service.go index 343c3240..133ab018 100644 --- a/backend/internal/service/dashboard_aggregation_service.go +++ b/backend/internal/service/dashboard_aggregation_service.go @@ -134,12 +134,12 @@ func (s *DashboardAggregationService) runScheduledAggregation() { } lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second - start := last.Add(-lookback) epoch := time.Unix(0, 0).UTC() + start := last.Add(-lookback) if !last.After(epoch) { - start = now.Add(-lookback) - } - if start.After(now) { + // 首次聚合覆盖当天,避免只统计最后一个窗口。 + start = truncateToDayUTC(now) + } else if start.After(now) { start = now.Add(-lookback) } @@ -204,17 +204,21 @@ func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context, return } } - s.lastRetentionCleanup.Store(now) hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays) dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays) usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays) - if err := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff); err != nil { - log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", err) + aggErr := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff) + if aggErr != nil { + log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", aggErr) } - if err := s.repo.CleanupUsageLogs(ctx, usageCutoff); err != nil { - log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", err) + usageErr := s.repo.CleanupUsageLogs(ctx, usageCutoff) + if usageErr != nil { + log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", usageErr) + } + if aggErr == nil && usageErr == nil { + s.lastRetentionCleanup.Store(now) } } diff --git a/backend/internal/service/dashboard_aggregation_service_test.go b/backend/internal/service/dashboard_aggregation_service_test.go new file mode 100644 index 00000000..501b11d4 --- /dev/null +++ b/backend/internal/service/dashboard_aggregation_service_test.go @@ -0,0 +1,89 @@ +package service + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/stretchr/testify/require" +) + +type dashboardAggregationRepoTestStub struct { + aggregateCalls int + lastStart time.Time + lastEnd time.Time + watermark time.Time + aggregateErr error + cleanupAggregatesErr error + cleanupUsageErr error +} + +func (s *dashboardAggregationRepoTestStub) AggregateRange(ctx context.Context, start, end time.Time) error { + s.aggregateCalls++ + s.lastStart = start + s.lastEnd = end + return s.aggregateErr +} + +func (s *dashboardAggregationRepoTestStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) { + return s.watermark, nil +} + +func (s *dashboardAggregationRepoTestStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoTestStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error { + return s.cleanupAggregatesErr +} + +func (s *dashboardAggregationRepoTestStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error { + return s.cleanupUsageErr +} + +func (s *dashboardAggregationRepoTestStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error { + return nil +} + +func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesDayStart(t *testing.T) { + repo := &dashboardAggregationRepoTestStub{watermark: time.Unix(0, 0).UTC()} + svc := &DashboardAggregationService{ + repo: repo, + cfg: config.DashboardAggregationConfig{ + Enabled: true, + IntervalSeconds: 60, + LookbackSeconds: 120, + Retention: config.DashboardAggregationRetentionConfig{ + UsageLogsDays: 1, + HourlyDays: 1, + DailyDays: 1, + }, + }, + } + + svc.runScheduledAggregation() + + require.Equal(t, 1, repo.aggregateCalls) + require.False(t, repo.lastEnd.IsZero()) + require.Equal(t, truncateToDayUTC(repo.lastEnd), repo.lastStart) +} + +func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *testing.T) { + repo := &dashboardAggregationRepoTestStub{cleanupAggregatesErr: errors.New("清理失败")} + svc := &DashboardAggregationService{ + repo: repo, + cfg: config.DashboardAggregationConfig{ + Retention: config.DashboardAggregationRetentionConfig{ + UsageLogsDays: 1, + HourlyDays: 1, + DailyDays: 1, + }, + }, + } + + svc.maybeCleanupRetention(context.Background(), time.Now().UTC()) + + require.Nil(t, svc.lastRetentionCleanup.Load()) +}