From 6271a33d0898f1e3b43bac0495ff81103f437171 Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sun, 11 Jan 2026 18:20:15 +0800 Subject: [PATCH] =?UTF-8?q?fix(=E4=BB=AA=E8=A1=A8=E7=9B=98):=20=E5=85=BC?= =?UTF-8?q?=E5=AE=B9=E7=A6=81=E7=94=A8=E8=81=9A=E5=90=88=E4=B8=8E=E5=9B=9E?= =?UTF-8?q?=E5=A1=AB=E9=99=90=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/internal/config/config.go | 12 ++ backend/internal/config/config_test.go | 22 +++ .../handler/admin/dashboard_handler.go | 4 + backend/internal/repository/usage_log_repo.go | 169 +++++++++++++++--- .../usage_log_repo_integration_test.go | 74 ++++++++ .../service/dashboard_aggregation_service.go | 18 +- .../dashboard_aggregation_service_test.go | 21 ++- backend/internal/service/dashboard_service.go | 25 ++- .../service/dashboard_service_test.go | 94 ++++++++-- config.yaml | 3 + deploy/.env.example | 27 +++ deploy/config.example.yaml | 3 + 12 files changed, 434 insertions(+), 38 deletions(-) diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index b91a07c1..a2fbbd1d 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -398,6 +398,8 @@ type DashboardAggregationConfig struct { LookbackSeconds int `mapstructure:"lookback_seconds"` // BackfillEnabled: 是否允许全量回填 BackfillEnabled bool `mapstructure:"backfill_enabled"` + // BackfillMaxDays: 回填最大跨度(天) + BackfillMaxDays int `mapstructure:"backfill_max_days"` // Retention: 各表保留窗口(天) Retention DashboardAggregationRetentionConfig `mapstructure:"retention"` // RecomputeDays: 启动时重算最近 N 天 @@ -726,6 +728,7 @@ func setDefaults() { viper.SetDefault("dashboard_aggregation.interval_seconds", 60) viper.SetDefault("dashboard_aggregation.lookback_seconds", 120) viper.SetDefault("dashboard_aggregation.backfill_enabled", false) + viper.SetDefault("dashboard_aggregation.backfill_max_days", 31) viper.SetDefault("dashboard_aggregation.retention.usage_logs_days", 90) viper.SetDefault("dashboard_aggregation.retention.hourly_days", 180) viper.SetDefault("dashboard_aggregation.retention.daily_days", 730) @@ -920,6 +923,12 @@ func (c *Config) Validate() error { if c.DashboardAgg.LookbackSeconds < 0 { return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative") } + if c.DashboardAgg.BackfillMaxDays < 0 { + return fmt.Errorf("dashboard_aggregation.backfill_max_days must be non-negative") + } + if c.DashboardAgg.BackfillEnabled && c.DashboardAgg.BackfillMaxDays == 0 { + return fmt.Errorf("dashboard_aggregation.backfill_max_days must be positive") + } if c.DashboardAgg.Retention.UsageLogsDays <= 0 { return fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be positive") } @@ -939,6 +948,9 @@ func (c *Config) Validate() error { if c.DashboardAgg.LookbackSeconds < 0 { return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative") } + if c.DashboardAgg.BackfillMaxDays < 0 { + return fmt.Errorf("dashboard_aggregation.backfill_max_days must be non-negative") + } if c.DashboardAgg.Retention.UsageLogsDays < 0 { return fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be non-negative") } diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index 7fc34d64..1ba6d053 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -226,6 +226,9 @@ func TestLoadDefaultDashboardAggregationConfig(t *testing.T) { if cfg.DashboardAgg.BackfillEnabled { t.Fatalf("DashboardAgg.BackfillEnabled = true, want false") } + if cfg.DashboardAgg.BackfillMaxDays != 31 { + t.Fatalf("DashboardAgg.BackfillMaxDays = %d, want 31", cfg.DashboardAgg.BackfillMaxDays) + } if cfg.DashboardAgg.Retention.UsageLogsDays != 90 { t.Fatalf("DashboardAgg.Retention.UsageLogsDays = %d, want 90", cfg.DashboardAgg.Retention.UsageLogsDays) } @@ -258,3 +261,22 @@ func TestValidateDashboardAggregationConfigDisabled(t *testing.T) { t.Fatalf("Validate() expected interval_seconds error, got: %v", err) } } + +func TestValidateDashboardAggregationBackfillMaxDays(t *testing.T) { + viper.Reset() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + cfg.DashboardAgg.BackfillEnabled = true + cfg.DashboardAgg.BackfillMaxDays = 0 + err = cfg.Validate() + if err == nil { + t.Fatalf("Validate() expected error for dashboard_aggregation.backfill_max_days, got nil") + } + if !strings.Contains(err.Error(), "dashboard_aggregation.backfill_max_days") { + t.Fatalf("Validate() expected backfill_max_days error, got: %v", err) + } +} diff --git a/backend/internal/handler/admin/dashboard_handler.go b/backend/internal/handler/admin/dashboard_handler.go index 560d1075..9b675974 100644 --- a/backend/internal/handler/admin/dashboard_handler.go +++ b/backend/internal/handler/admin/dashboard_handler.go @@ -159,6 +159,10 @@ func (h *DashboardHandler) BackfillAggregation(c *gin.Context) { response.Forbidden(c, "Backfill is disabled") return } + if errors.Is(err, service.ErrDashboardBackfillTooLarge) { + response.BadRequest(c, "Backfill range too large") + return + } response.InternalError(c, "Failed to trigger backfill") return } diff --git a/backend/internal/repository/usage_log_repo.go b/backend/internal/repository/usage_log_repo.go index be2a6d18..e483f89f 100644 --- a/backend/internal/repository/usage_log_repo.go +++ b/backend/internal/repository/usage_log_repo.go @@ -269,11 +269,56 @@ func (r *usageLogRepository) GetUserStats(ctx context.Context, userID int64, sta type DashboardStats = usagestats.DashboardStats func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardStats, error) { - var stats DashboardStats - now := time.Now() + stats := &DashboardStats{} + now := time.Now().UTC() todayUTC := truncateToDayUTC(now) - // 合并用户统计查询 + if err := r.fillDashboardEntityStats(ctx, stats, todayUTC, now); err != nil { + return nil, err + } + if err := r.fillDashboardUsageStatsAggregated(ctx, stats, todayUTC, now); err != nil { + return nil, err + } + + rpm, tpm, err := r.getPerformanceStats(ctx, 0) + if err != nil { + return nil, err + } + stats.Rpm = rpm + stats.Tpm = tpm + + return stats, nil +} + +func (r *usageLogRepository) GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*DashboardStats, error) { + startUTC := start.UTC() + endUTC := end.UTC() + if !endUTC.After(startUTC) { + return nil, errors.New("统计时间范围无效") + } + + stats := &DashboardStats{} + now := time.Now().UTC() + todayUTC := truncateToDayUTC(now) + + if err := r.fillDashboardEntityStats(ctx, stats, todayUTC, now); err != nil { + return nil, err + } + if err := r.fillDashboardUsageStatsFromUsageLogs(ctx, stats, startUTC, endUTC, todayUTC, now); err != nil { + return nil, err + } + + rpm, tpm, err := r.getPerformanceStats(ctx, 0) + if err != nil { + return nil, err + } + stats.Rpm = rpm + stats.Tpm = tpm + + return stats, nil +} + +func (r *usageLogRepository) fillDashboardEntityStats(ctx context.Context, stats *DashboardStats, todayUTC, now time.Time) error { userStatsQuery := ` SELECT COUNT(*) as total_users, @@ -289,10 +334,9 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TotalUsers, &stats.TodayNewUsers, ); err != nil { - return nil, err + return err } - // 合并API Key统计查询 apiKeyStatsQuery := ` SELECT COUNT(*) as total_api_keys, @@ -308,10 +352,9 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TotalAPIKeys, &stats.ActiveAPIKeys, ); err != nil { - return nil, err + return err } - // 合并账户统计查询 accountStatsQuery := ` SELECT COUNT(*) as total_accounts, @@ -333,10 +376,13 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.RateLimitAccounts, &stats.OverloadAccounts, ); err != nil { - return nil, err + return err } - // 累计 Token 统计 + return nil +} + +func (r *usageLogRepository) fillDashboardUsageStatsAggregated(ctx context.Context, stats *DashboardStats, todayUTC, now time.Time) error { totalStatsQuery := ` SELECT COALESCE(SUM(total_requests), 0) as total_requests, @@ -364,14 +410,13 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TotalActualCost, &totalDurationMs, ); err != nil { - return nil, err + return err } stats.TotalTokens = stats.TotalInputTokens + stats.TotalOutputTokens + stats.TotalCacheCreationTokens + stats.TotalCacheReadTokens if stats.TotalRequests > 0 { stats.AverageDurationMs = float64(totalDurationMs) / float64(stats.TotalRequests) } - // 今日 Token 统计 todayStatsQuery := ` SELECT total_requests as today_requests, @@ -400,12 +445,11 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.ActiveUsers, ); err != nil { if err != sql.ErrNoRows { - return nil, err + return err } } stats.TodayTokens = stats.TodayInputTokens + stats.TodayOutputTokens + stats.TodayCacheCreationTokens + stats.TodayCacheReadTokens - // 当前小时活跃用户 hourlyActiveQuery := ` SELECT active_users FROM usage_dashboard_hourly @@ -414,19 +458,100 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS hourStart := now.UTC().Truncate(time.Hour) if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart}, &stats.HourlyActiveUsers); err != nil { if err != sql.ErrNoRows { - return nil, err + return err } } - // 性能指标:RPM 和 TPM(最近1分钟,全局) - rpm, tpm, err := r.getPerformanceStats(ctx, 0) - if err != nil { - return nil, err - } - stats.Rpm = rpm - stats.Tpm = tpm + return nil +} - return &stats, nil +func (r *usageLogRepository) fillDashboardUsageStatsFromUsageLogs(ctx context.Context, stats *DashboardStats, startUTC, endUTC, todayUTC, now time.Time) error { + totalStatsQuery := ` + SELECT + COUNT(*) as total_requests, + COALESCE(SUM(input_tokens), 0) as total_input_tokens, + COALESCE(SUM(output_tokens), 0) as total_output_tokens, + COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens, + COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens, + COALESCE(SUM(total_cost), 0) as total_cost, + COALESCE(SUM(actual_cost), 0) as total_actual_cost, + COALESCE(SUM(COALESCE(duration_ms, 0)), 0) as total_duration_ms + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ` + var totalDurationMs int64 + if err := scanSingleRow( + ctx, + r.sql, + totalStatsQuery, + []any{startUTC, endUTC}, + &stats.TotalRequests, + &stats.TotalInputTokens, + &stats.TotalOutputTokens, + &stats.TotalCacheCreationTokens, + &stats.TotalCacheReadTokens, + &stats.TotalCost, + &stats.TotalActualCost, + &totalDurationMs, + ); err != nil { + return err + } + stats.TotalTokens = stats.TotalInputTokens + stats.TotalOutputTokens + stats.TotalCacheCreationTokens + stats.TotalCacheReadTokens + if stats.TotalRequests > 0 { + stats.AverageDurationMs = float64(totalDurationMs) / float64(stats.TotalRequests) + } + + todayEnd := todayUTC.Add(24 * time.Hour) + todayStatsQuery := ` + SELECT + COUNT(*) as today_requests, + COALESCE(SUM(input_tokens), 0) as today_input_tokens, + COALESCE(SUM(output_tokens), 0) as today_output_tokens, + COALESCE(SUM(cache_creation_tokens), 0) as today_cache_creation_tokens, + COALESCE(SUM(cache_read_tokens), 0) as today_cache_read_tokens, + COALESCE(SUM(total_cost), 0) as today_cost, + COALESCE(SUM(actual_cost), 0) as today_actual_cost + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ` + if err := scanSingleRow( + ctx, + r.sql, + todayStatsQuery, + []any{todayUTC, todayEnd}, + &stats.TodayRequests, + &stats.TodayInputTokens, + &stats.TodayOutputTokens, + &stats.TodayCacheCreationTokens, + &stats.TodayCacheReadTokens, + &stats.TodayCost, + &stats.TodayActualCost, + ); err != nil { + return err + } + stats.TodayTokens = stats.TodayInputTokens + stats.TodayOutputTokens + stats.TodayCacheCreationTokens + stats.TodayCacheReadTokens + + activeUsersQuery := ` + SELECT COUNT(DISTINCT user_id) as active_users + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ` + if err := scanSingleRow(ctx, r.sql, activeUsersQuery, []any{todayUTC, todayEnd}, &stats.ActiveUsers); err != nil { + return err + } + + hourStart := now.UTC().Truncate(time.Hour) + hourEnd := hourStart.Add(time.Hour) + hourlyActiveQuery := ` + SELECT COUNT(DISTINCT user_id) as active_users + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ` + if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart, hourEnd}, &stats.HourlyActiveUsers); err != nil { + return err + } + + return nil } func (r *usageLogRepository) ListByAccount(ctx context.Context, accountID int64, params pagination.PaginationParams) ([]service.UsageLog, *pagination.PaginationResult, error) { diff --git a/backend/internal/repository/usage_log_repo_integration_test.go b/backend/internal/repository/usage_log_repo_integration_test.go index 09341db3..a944ed32 100644 --- a/backend/internal/repository/usage_log_repo_integration_test.go +++ b/backend/internal/repository/usage_log_repo_integration_test.go @@ -308,6 +308,80 @@ func (s *UsageLogRepoSuite) TestDashboardStats_TodayTotalsAndPerformance() { s.Require().Equal(wantTpm, stats.Tpm, "Tpm mismatch") } +func (s *UsageLogRepoSuite) TestDashboardStatsWithRange_Fallback() { + now := time.Now().UTC() + todayStart := truncateToDayUTC(now) + rangeStart := todayStart.Add(-24 * time.Hour) + rangeEnd := now + + user1 := mustCreateUser(s.T(), s.client, &service.User{Email: "range-u1@test.com"}) + user2 := mustCreateUser(s.T(), s.client, &service.User{Email: "range-u2@test.com"}) + apiKey1 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user1.ID, Key: "sk-range-1", Name: "k1"}) + apiKey2 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user2.ID, Key: "sk-range-2", Name: "k2"}) + account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-range"}) + + d1, d2, d3 := 100, 200, 300 + logOutside := &service.UsageLog{ + UserID: user1.ID, + APIKeyID: apiKey1.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 7, + OutputTokens: 8, + TotalCost: 0.8, + ActualCost: 0.7, + DurationMs: &d3, + CreatedAt: rangeStart.Add(-1 * time.Hour), + } + _, err := s.repo.Create(s.ctx, logOutside) + s.Require().NoError(err) + + logRange := &service.UsageLog{ + UserID: user1.ID, + APIKeyID: apiKey1.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 10, + OutputTokens: 20, + CacheCreationTokens: 1, + CacheReadTokens: 2, + TotalCost: 1.0, + ActualCost: 0.9, + DurationMs: &d1, + CreatedAt: rangeStart.Add(2 * time.Hour), + } + _, err = s.repo.Create(s.ctx, logRange) + s.Require().NoError(err) + + logToday := &service.UsageLog{ + UserID: user2.ID, + APIKeyID: apiKey2.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 5, + OutputTokens: 6, + CacheReadTokens: 1, + TotalCost: 0.5, + ActualCost: 0.5, + DurationMs: &d2, + CreatedAt: now, + } + _, err = s.repo.Create(s.ctx, logToday) + s.Require().NoError(err) + + stats, err := s.repo.GetDashboardStatsWithRange(s.ctx, rangeStart, rangeEnd) + s.Require().NoError(err) + s.Require().Equal(int64(2), stats.TotalRequests) + s.Require().Equal(int64(15), stats.TotalInputTokens) + s.Require().Equal(int64(26), stats.TotalOutputTokens) + s.Require().Equal(int64(1), stats.TotalCacheCreationTokens) + s.Require().Equal(int64(3), stats.TotalCacheReadTokens) + s.Require().Equal(int64(45), stats.TotalTokens) + s.Require().Equal(1.5, stats.TotalCost) + s.Require().Equal(1.4, stats.TotalActualCost) + s.Require().InEpsilon(150.0, stats.AverageDurationMs, 0.0001) +} + // --- GetUserDashboardStats --- func (s *UsageLogRepoSuite) TestGetUserDashboardStats() { diff --git a/backend/internal/service/dashboard_aggregation_service.go b/backend/internal/service/dashboard_aggregation_service.go index 133ab018..0d1cec57 100644 --- a/backend/internal/service/dashboard_aggregation_service.go +++ b/backend/internal/service/dashboard_aggregation_service.go @@ -19,6 +19,8 @@ const ( var ( // ErrDashboardBackfillDisabled 当配置禁用回填时返回。 ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用") + // ErrDashboardBackfillTooLarge 当回填跨度超过限制时返回。 + ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大") ) // DashboardAggregationRepository 定义仪表盘预聚合仓储接口。 @@ -76,6 +78,9 @@ func (s *DashboardAggregationService) Start() { s.runScheduledAggregation() }) log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds) + if !s.cfg.BackfillEnabled { + log.Printf("[DashboardAggregation] 回填已禁用,如需补齐保留窗口以外历史数据请手动回填") + } } // TriggerBackfill 触发回填(异步)。 @@ -90,6 +95,12 @@ func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) erro if !end.After(start) { return errors.New("回填时间范围无效") } + if s.cfg.BackfillMaxDays > 0 { + maxRange := time.Duration(s.cfg.BackfillMaxDays) * 24 * time.Hour + if end.Sub(start) > maxRange { + return ErrDashboardBackfillTooLarge + } + } go func() { ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout) @@ -137,8 +148,11 @@ func (s *DashboardAggregationService) runScheduledAggregation() { epoch := time.Unix(0, 0).UTC() start := last.Add(-lookback) if !last.After(epoch) { - // 首次聚合覆盖当天,避免只统计最后一个窗口。 - start = truncateToDayUTC(now) + retentionDays := s.cfg.Retention.UsageLogsDays + if retentionDays <= 0 { + retentionDays = 1 + } + start = truncateToDayUTC(now.AddDate(0, 0, -retentionDays)) } else if start.After(now) { start = now.Add(-lookback) } diff --git a/backend/internal/service/dashboard_aggregation_service_test.go b/backend/internal/service/dashboard_aggregation_service_test.go index 501b11d4..2fc22105 100644 --- a/backend/internal/service/dashboard_aggregation_service_test.go +++ b/backend/internal/service/dashboard_aggregation_service_test.go @@ -47,7 +47,7 @@ func (s *dashboardAggregationRepoTestStub) EnsureUsageLogsPartitions(ctx context return nil } -func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesDayStart(t *testing.T) { +func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesRetentionStart(t *testing.T) { repo := &dashboardAggregationRepoTestStub{watermark: time.Unix(0, 0).UTC()} svc := &DashboardAggregationService{ repo: repo, @@ -67,7 +67,7 @@ func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesDayStart(t require.Equal(t, 1, repo.aggregateCalls) require.False(t, repo.lastEnd.IsZero()) - require.Equal(t, truncateToDayUTC(repo.lastEnd), repo.lastStart) + require.Equal(t, truncateToDayUTC(repo.lastEnd.AddDate(0, 0, -1)), repo.lastStart) } func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *testing.T) { @@ -87,3 +87,20 @@ func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *te require.Nil(t, svc.lastRetentionCleanup.Load()) } + +func TestDashboardAggregationService_TriggerBackfill_TooLarge(t *testing.T) { + repo := &dashboardAggregationRepoTestStub{} + svc := &DashboardAggregationService{ + repo: repo, + cfg: config.DashboardAggregationConfig{ + BackfillEnabled: true, + BackfillMaxDays: 1, + }, + } + + start := time.Now().AddDate(0, 0, -3) + end := time.Now() + err := svc.TriggerBackfill(start, end) + require.ErrorIs(t, err, ErrDashboardBackfillTooLarge) + require.Equal(t, 0, repo.aggregateCalls) +} diff --git a/backend/internal/service/dashboard_service.go b/backend/internal/service/dashboard_service.go index d0e6e03c..69d251cb 100644 --- a/backend/internal/service/dashboard_service.go +++ b/backend/internal/service/dashboard_service.go @@ -29,6 +29,10 @@ type DashboardStatsCache interface { DeleteDashboardStats(ctx context.Context) error } +type dashboardStatsRangeFetcher interface { + GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*usagestats.DashboardStats, error) +} + type dashboardStatsCacheEntry struct { Stats *usagestats.DashboardStats `json:"stats"` UpdatedAt int64 `json:"updated_at"` @@ -46,6 +50,7 @@ type DashboardService struct { aggEnabled bool aggInterval time.Duration aggLookback time.Duration + aggUsageDays int } func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregationRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService { @@ -55,6 +60,7 @@ func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregat aggEnabled := true aggInterval := time.Minute aggLookback := 2 * time.Minute + aggUsageDays := 90 if cfg != nil { if !cfg.Dashboard.Enabled { cache = nil @@ -75,6 +81,9 @@ func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregat if cfg.DashboardAgg.LookbackSeconds > 0 { aggLookback = time.Duration(cfg.DashboardAgg.LookbackSeconds) * time.Second } + if cfg.DashboardAgg.Retention.UsageLogsDays > 0 { + aggUsageDays = cfg.DashboardAgg.Retention.UsageLogsDays + } } return &DashboardService{ usageRepo: usageRepo, @@ -86,6 +95,7 @@ func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregat aggEnabled: aggEnabled, aggInterval: aggInterval, aggLookback: aggLookback, + aggUsageDays: aggUsageDays, } } @@ -148,7 +158,7 @@ func (s *DashboardService) getCachedDashboardStats(ctx context.Context) (*usages } func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) { - stats, err := s.usageRepo.GetDashboardStats(ctx) + stats, err := s.fetchDashboardStats(ctx) if err != nil { return nil, err } @@ -173,7 +183,7 @@ func (s *DashboardService) refreshDashboardStatsAsync() { ctx, cancel := context.WithTimeout(context.Background(), s.refreshTimeout) defer cancel() - stats, err := s.usageRepo.GetDashboardStats(ctx) + stats, err := s.fetchDashboardStats(ctx) if err != nil { log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err) return @@ -185,6 +195,17 @@ func (s *DashboardService) refreshDashboardStatsAsync() { }() } +func (s *DashboardService) fetchDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) { + if !s.aggEnabled { + if fetcher, ok := s.usageRepo.(dashboardStatsRangeFetcher); ok { + now := time.Now().UTC() + start := truncateToDayUTC(now.AddDate(0, 0, -s.aggUsageDays)) + return fetcher.GetDashboardStatsWithRange(ctx, start, now) + } + } + return s.usageRepo.GetDashboardStats(ctx) +} + func (s *DashboardService) saveDashboardStatsCache(ctx context.Context, stats *usagestats.DashboardStats) { if s.cache == nil || stats == nil { return diff --git a/backend/internal/service/dashboard_service_test.go b/backend/internal/service/dashboard_service_test.go index c7b9c6af..db3c78c3 100644 --- a/backend/internal/service/dashboard_service_test.go +++ b/backend/internal/service/dashboard_service_test.go @@ -16,10 +16,15 @@ import ( type usageRepoStub struct { UsageLogRepository - stats *usagestats.DashboardStats - err error - calls int32 - onCall chan struct{} + stats *usagestats.DashboardStats + rangeStats *usagestats.DashboardStats + err error + rangeErr error + calls int32 + rangeCalls int32 + rangeStart time.Time + rangeEnd time.Time + onCall chan struct{} } func (s *usageRepoStub) GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) { @@ -36,6 +41,19 @@ func (s *usageRepoStub) GetDashboardStats(ctx context.Context) (*usagestats.Dash return s.stats, nil } +func (s *usageRepoStub) GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*usagestats.DashboardStats, error) { + atomic.AddInt32(&s.rangeCalls, 1) + s.rangeStart = start + s.rangeEnd = end + if s.rangeErr != nil { + return nil, s.rangeErr + } + if s.rangeStats != nil { + return s.rangeStats, nil + } + return s.stats, nil +} + type dashboardCacheStub struct { get func(ctx context.Context) (string, error) set func(ctx context.Context, data string, ttl time.Duration) error @@ -140,7 +158,12 @@ func TestDashboardService_CacheHitFresh(t *testing.T) { stats: &usagestats.DashboardStats{TotalUsers: 99}, } aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -164,7 +187,12 @@ func TestDashboardService_CacheMiss_StoresCache(t *testing.T) { } repo := &usageRepoStub{stats: stats} aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -191,7 +219,12 @@ func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) { } repo := &usageRepoStub{stats: stats} aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: false}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: false}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -226,7 +259,12 @@ func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) { onCall: refreshCh, } aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -252,7 +290,12 @@ func TestDashboardService_CacheParseError_EvictsAndRefetches(t *testing.T) { stats := &usagestats.DashboardStats{TotalUsers: 9} repo := &usageRepoStub{stats: stats} aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -270,7 +313,12 @@ func TestDashboardService_CacheParseError_RepoFailure(t *testing.T) { } repo := &usageRepoStub{err: errors.New("db down")} aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) _, err := svc.GetDashboardStats(context.Background()) @@ -311,3 +359,29 @@ func TestDashboardService_StatsStaleFalseWhenFresh(t *testing.T) { require.Equal(t, aggNow.Format(time.RFC3339), got.StatsUpdatedAt) require.False(t, got.StatsStale) } + +func TestDashboardService_AggDisabled_UsesUsageLogsFallback(t *testing.T) { + expected := &usagestats.DashboardStats{TotalUsers: 42} + repo := &usageRepoStub{ + rangeStats: expected, + err: errors.New("should not call aggregated stats"), + } + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: false}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: false, + Retention: config.DashboardAggregationRetentionConfig{ + UsageLogsDays: 7, + }, + }, + } + svc := NewDashboardService(repo, nil, nil, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(42), got.TotalUsers) + require.Equal(t, int32(0), atomic.LoadInt32(&repo.calls)) + require.Equal(t, int32(1), atomic.LoadInt32(&repo.rangeCalls)) + require.False(t, repo.rangeEnd.IsZero()) + require.Equal(t, truncateToDayUTC(repo.rangeEnd.AddDate(0, 0, -7)), repo.rangeStart) +} diff --git a/config.yaml b/config.yaml index 848421d6..b5272aac 100644 --- a/config.yaml +++ b/config.yaml @@ -232,6 +232,9 @@ dashboard_aggregation: # Allow manual backfill # 允许手动回填 backfill_enabled: false + # Backfill max range (days) + # 回填最大跨度(天) + backfill_max_days: 31 # Recompute recent N days on startup # 启动时重算最近 N 天 recompute_days: 2 diff --git a/deploy/.env.example b/deploy/.env.example index bd8abc5c..83e58a50 100644 --- a/deploy/.env.example +++ b/deploy/.env.example @@ -69,6 +69,33 @@ JWT_EXPIRE_HOUR=24 # Leave unset to use default ./config.yaml #CONFIG_FILE=./config.yaml +# ----------------------------------------------------------------------------- +# Dashboard Aggregation (Optional) +# ----------------------------------------------------------------------------- +# Enable aggregation job +# 启用仪表盘预聚合 +DASHBOARD_AGGREGATION_ENABLED=true +# Refresh interval (seconds) +# 刷新间隔(秒) +DASHBOARD_AGGREGATION_INTERVAL_SECONDS=60 +# Lookback window (seconds) +# 回看窗口(秒) +DASHBOARD_AGGREGATION_LOOKBACK_SECONDS=120 +# Allow manual backfill +# 允许手动回填 +DASHBOARD_AGGREGATION_BACKFILL_ENABLED=false +# Backfill max range (days) +# 回填最大跨度(天) +DASHBOARD_AGGREGATION_BACKFILL_MAX_DAYS=31 +# Recompute recent N days on startup +# 启动时重算最近 N 天 +DASHBOARD_AGGREGATION_RECOMPUTE_DAYS=2 +# Retention windows (days) +# 保留窗口(天) +DASHBOARD_AGGREGATION_RETENTION_USAGE_LOGS_DAYS=90 +DASHBOARD_AGGREGATION_RETENTION_HOURLY_DAYS=180 +DASHBOARD_AGGREGATION_RETENTION_DAILY_DAYS=730 + # ----------------------------------------------------------------------------- # Security Configuration # ----------------------------------------------------------------------------- diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 460606ab..57239f8e 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -232,6 +232,9 @@ dashboard_aggregation: # Allow manual backfill # 允许手动回填 backfill_enabled: false + # Backfill max range (days) + # 回填最大跨度(天) + backfill_max_days: 31 # Recompute recent N days on startup # 启动时重算最近 N 天 recompute_days: 2