fix(仪表盘): 修正聚合时间桶与清理节流

This commit is contained in:
yangjianbo
2026-01-11 17:21:17 +08:00
parent d78f42d2fd
commit 5364011a5b
3 changed files with 108 additions and 14 deletions

View File

@@ -43,10 +43,11 @@ func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, sta
dayEnd = dayEnd.Add(24 * time.Hour)
}
if err := r.insertHourlyActiveUsers(ctx, startUTC, endUTC); err != nil {
// 以桶边界聚合,允许覆盖 end 所在桶的剩余区间。
if err := r.insertHourlyActiveUsers(ctx, hourStart, hourEnd); err != nil {
return err
}
if err := r.insertDailyActiveUsers(ctx, startUTC, endUTC); err != nil {
if err := r.insertDailyActiveUsers(ctx, hourStart, hourEnd); err != nil {
return err
}
if err := r.upsertHourlyAggregates(ctx, hourStart, hourEnd); err != nil {
@@ -138,10 +139,10 @@ func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Cont
query := `
INSERT INTO usage_dashboard_daily_users (bucket_date, user_id)
SELECT DISTINCT
(created_at AT TIME ZONE 'UTC')::date AS bucket_date,
(bucket_start AT TIME ZONE 'UTC')::date AS bucket_date,
user_id
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
FROM usage_dashboard_hourly_users
WHERE bucket_start >= $1 AND bucket_start < $2
ON CONFLICT DO NOTHING
`
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())

View File

@@ -134,12 +134,12 @@ func (s *DashboardAggregationService) runScheduledAggregation() {
}
lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second
start := last.Add(-lookback)
epoch := time.Unix(0, 0).UTC()
start := last.Add(-lookback)
if !last.After(epoch) {
start = now.Add(-lookback)
}
if start.After(now) {
// 首次聚合覆盖当天,避免只统计最后一个窗口。
start = truncateToDayUTC(now)
} else if start.After(now) {
start = now.Add(-lookback)
}
@@ -204,17 +204,21 @@ func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context,
return
}
}
s.lastRetentionCleanup.Store(now)
hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays)
dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays)
usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays)
if err := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff); err != nil {
log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", err)
aggErr := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff)
if aggErr != nil {
log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", aggErr)
}
if err := s.repo.CleanupUsageLogs(ctx, usageCutoff); err != nil {
log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", err)
usageErr := s.repo.CleanupUsageLogs(ctx, usageCutoff)
if usageErr != nil {
log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", usageErr)
}
if aggErr == nil && usageErr == nil {
s.lastRetentionCleanup.Store(now)
}
}

View File

@@ -0,0 +1,89 @@
package service
import (
"context"
"errors"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/stretchr/testify/require"
)
type dashboardAggregationRepoTestStub struct {
aggregateCalls int
lastStart time.Time
lastEnd time.Time
watermark time.Time
aggregateErr error
cleanupAggregatesErr error
cleanupUsageErr error
}
func (s *dashboardAggregationRepoTestStub) AggregateRange(ctx context.Context, start, end time.Time) error {
s.aggregateCalls++
s.lastStart = start
s.lastEnd = end
return s.aggregateErr
}
func (s *dashboardAggregationRepoTestStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
return s.watermark, nil
}
func (s *dashboardAggregationRepoTestStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error {
return nil
}
func (s *dashboardAggregationRepoTestStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error {
return s.cleanupAggregatesErr
}
func (s *dashboardAggregationRepoTestStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error {
return s.cleanupUsageErr
}
func (s *dashboardAggregationRepoTestStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error {
return nil
}
func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesDayStart(t *testing.T) {
repo := &dashboardAggregationRepoTestStub{watermark: time.Unix(0, 0).UTC()}
svc := &DashboardAggregationService{
repo: repo,
cfg: config.DashboardAggregationConfig{
Enabled: true,
IntervalSeconds: 60,
LookbackSeconds: 120,
Retention: config.DashboardAggregationRetentionConfig{
UsageLogsDays: 1,
HourlyDays: 1,
DailyDays: 1,
},
},
}
svc.runScheduledAggregation()
require.Equal(t, 1, repo.aggregateCalls)
require.False(t, repo.lastEnd.IsZero())
require.Equal(t, truncateToDayUTC(repo.lastEnd), repo.lastStart)
}
func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *testing.T) {
repo := &dashboardAggregationRepoTestStub{cleanupAggregatesErr: errors.New("清理失败")}
svc := &DashboardAggregationService{
repo: repo,
cfg: config.DashboardAggregationConfig{
Retention: config.DashboardAggregationRetentionConfig{
UsageLogsDays: 1,
HourlyDays: 1,
DailyDays: 1,
},
},
}
svc.maybeCleanupRetention(context.Background(), time.Now().UTC())
require.Nil(t, svc.lastRetentionCleanup.Load())
}