From 1a869547d717aeb7f7c2bbd99a55370680ac616f Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sun, 11 Jan 2026 16:01:35 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E4=BB=AA=E8=A1=A8=E7=9B=98):=20=E5=BC=95?= =?UTF-8?q?=E5=85=A5=E9=A2=84=E8=81=9A=E5=90=88=E7=BB=9F=E8=AE=A1=E4=B8=8E?= =?UTF-8?q?=E8=81=9A=E5=90=88=E4=BD=9C=E4=B8=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/cmd/server/wire_gen.go | 8 +- backend/internal/config/config.go | 115 +++++- backend/internal/config/config_test.go | 53 +++ .../handler/admin/dashboard_handler.go | 61 ++- .../pkg/usagestats/usage_log_types.go | 6 + .../repository/dashboard_aggregation_repo.go | 360 ++++++++++++++++++ backend/internal/repository/usage_log_repo.go | 59 ++- .../usage_log_repo_integration_test.go | 154 +++++++- backend/internal/repository/wire.go | 1 + backend/internal/server/routes/admin.go | 1 + .../service/dashboard_aggregation_service.go | 224 +++++++++++ backend/internal/service/dashboard_service.go | 78 +++- .../service/dashboard_service_test.go | 100 ++++- backend/internal/service/wire.go | 8 + ...034_usage_dashboard_aggregation_tables.sql | 77 ++++ .../035_usage_logs_partitioning.sql | 54 +++ config.yaml | 33 ++ deploy/config.example.yaml | 33 ++ frontend/src/types/index.ts | 3 + 19 files changed, 1366 insertions(+), 62 deletions(-) create mode 100644 backend/internal/repository/dashboard_aggregation_repo.go create mode 100644 backend/internal/service/dashboard_aggregation_service.go create mode 100644 backend/migrations/034_usage_dashboard_aggregation_tables.sql create mode 100644 backend/migrations/035_usage_logs_partitioning.sql diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 4fb8351e..e321576e 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -67,6 +67,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { userHandler := 
handler.NewUserHandler(userService) apiKeyHandler := handler.NewAPIKeyHandler(apiKeyService) usageLogRepository := repository.NewUsageLogRepository(client, db) + dashboardAggregationRepository := repository.NewDashboardAggregationRepository(db) usageService := service.NewUsageService(usageLogRepository, userRepository, client, apiKeyAuthCacheInvalidator) usageHandler := handler.NewUsageHandler(usageService, apiKeyService) redeemCodeRepository := repository.NewRedeemCodeRepository(client) @@ -76,8 +77,10 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { redeemHandler := handler.NewRedeemHandler(redeemService) subscriptionHandler := handler.NewSubscriptionHandler(subscriptionService) dashboardStatsCache := repository.NewDashboardCache(redisClient, configConfig) - dashboardService := service.NewDashboardService(usageLogRepository, dashboardStatsCache, configConfig) - dashboardHandler := admin.NewDashboardHandler(dashboardService) + timingWheelService := service.ProvideTimingWheelService() + dashboardAggregationService := service.ProvideDashboardAggregationService(dashboardAggregationRepository, timingWheelService, configConfig) + dashboardService := service.NewDashboardService(usageLogRepository, dashboardAggregationRepository, dashboardStatsCache, configConfig) + dashboardHandler := admin.NewDashboardHandler(dashboardService, dashboardAggregationService) accountRepository := repository.NewAccountRepository(client, db) proxyRepository := repository.NewProxyRepository(client, db) proxyExitInfoProber := repository.NewProxyExitInfoProber(configConfig) @@ -138,7 +141,6 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { billingService := service.NewBillingService(configConfig, pricingService) identityCache := repository.NewIdentityCache(redisClient) identityService := service.NewIdentityService(identityCache) - timingWheelService := service.ProvideTimingWheelService() deferredService := 
service.ProvideDeferredService(accountRepository, timingWheelService) gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, userRepository, userSubscriptionRepository, gatewayCache, configConfig, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService) geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 677d0c6e..b91a07c1 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -36,27 +36,28 @@ const ( ) type Config struct { - Server ServerConfig `mapstructure:"server"` - CORS CORSConfig `mapstructure:"cors"` - Security SecurityConfig `mapstructure:"security"` - Billing BillingConfig `mapstructure:"billing"` - Turnstile TurnstileConfig `mapstructure:"turnstile"` - Database DatabaseConfig `mapstructure:"database"` - Redis RedisConfig `mapstructure:"redis"` - JWT JWTConfig `mapstructure:"jwt"` - LinuxDo LinuxDoConnectConfig `mapstructure:"linuxdo_connect"` - Default DefaultConfig `mapstructure:"default"` - RateLimit RateLimitConfig `mapstructure:"rate_limit"` - Pricing PricingConfig `mapstructure:"pricing"` - Gateway GatewayConfig `mapstructure:"gateway"` - APIKeyAuth APIKeyAuthCacheConfig `mapstructure:"api_key_auth_cache"` - Dashboard DashboardCacheConfig `mapstructure:"dashboard_cache"` - Concurrency ConcurrencyConfig `mapstructure:"concurrency"` - TokenRefresh TokenRefreshConfig `mapstructure:"token_refresh"` - RunMode string `mapstructure:"run_mode" yaml:"run_mode"` - Timezone string `mapstructure:"timezone"` // e.g. 
"Asia/Shanghai", "UTC" - Gemini GeminiConfig `mapstructure:"gemini"` - Update UpdateConfig `mapstructure:"update"` + Server ServerConfig `mapstructure:"server"` + CORS CORSConfig `mapstructure:"cors"` + Security SecurityConfig `mapstructure:"security"` + Billing BillingConfig `mapstructure:"billing"` + Turnstile TurnstileConfig `mapstructure:"turnstile"` + Database DatabaseConfig `mapstructure:"database"` + Redis RedisConfig `mapstructure:"redis"` + JWT JWTConfig `mapstructure:"jwt"` + LinuxDo LinuxDoConnectConfig `mapstructure:"linuxdo_connect"` + Default DefaultConfig `mapstructure:"default"` + RateLimit RateLimitConfig `mapstructure:"rate_limit"` + Pricing PricingConfig `mapstructure:"pricing"` + Gateway GatewayConfig `mapstructure:"gateway"` + APIKeyAuth APIKeyAuthCacheConfig `mapstructure:"api_key_auth_cache"` + Dashboard DashboardCacheConfig `mapstructure:"dashboard_cache"` + DashboardAgg DashboardAggregationConfig `mapstructure:"dashboard_aggregation"` + Concurrency ConcurrencyConfig `mapstructure:"concurrency"` + TokenRefresh TokenRefreshConfig `mapstructure:"token_refresh"` + RunMode string `mapstructure:"run_mode" yaml:"run_mode"` + Timezone string `mapstructure:"timezone"` // e.g. 
"Asia/Shanghai", "UTC" + Gemini GeminiConfig `mapstructure:"gemini"` + Update UpdateConfig `mapstructure:"update"` } // UpdateConfig 在线更新相关配置 @@ -387,6 +388,29 @@ type DashboardCacheConfig struct { StatsRefreshTimeoutSeconds int `mapstructure:"stats_refresh_timeout_seconds"` } +// DashboardAggregationConfig 仪表盘预聚合配置 +type DashboardAggregationConfig struct { + // Enabled: 是否启用预聚合作业 + Enabled bool `mapstructure:"enabled"` + // IntervalSeconds: 聚合刷新间隔(秒) + IntervalSeconds int `mapstructure:"interval_seconds"` + // LookbackSeconds: 回看窗口(秒) + LookbackSeconds int `mapstructure:"lookback_seconds"` + // BackfillEnabled: 是否允许全量回填 + BackfillEnabled bool `mapstructure:"backfill_enabled"` + // Retention: 各表保留窗口(天) + Retention DashboardAggregationRetentionConfig `mapstructure:"retention"` + // RecomputeDays: 启动时重算最近 N 天 + RecomputeDays int `mapstructure:"recompute_days"` +} + +// DashboardAggregationRetentionConfig 预聚合保留窗口 +type DashboardAggregationRetentionConfig struct { + UsageLogsDays int `mapstructure:"usage_logs_days"` + HourlyDays int `mapstructure:"hourly_days"` + DailyDays int `mapstructure:"daily_days"` +} + func NormalizeRunMode(value string) string { normalized := strings.ToLower(strings.TrimSpace(value)) switch normalized { @@ -697,6 +721,16 @@ func setDefaults() { viper.SetDefault("dashboard_cache.stats_ttl_seconds", 30) viper.SetDefault("dashboard_cache.stats_refresh_timeout_seconds", 30) + // Dashboard aggregation + viper.SetDefault("dashboard_aggregation.enabled", true) + viper.SetDefault("dashboard_aggregation.interval_seconds", 60) + viper.SetDefault("dashboard_aggregation.lookback_seconds", 120) + viper.SetDefault("dashboard_aggregation.backfill_enabled", false) + viper.SetDefault("dashboard_aggregation.retention.usage_logs_days", 90) + viper.SetDefault("dashboard_aggregation.retention.hourly_days", 180) + viper.SetDefault("dashboard_aggregation.retention.daily_days", 730) + viper.SetDefault("dashboard_aggregation.recompute_days", 2) + // Gateway 
viper.SetDefault("gateway.response_header_timeout", 600) // 600秒(10分钟)等待上游响应头,LLM高负载时可能排队较久 viper.SetDefault("gateway.log_upstream_error_body", false) @@ -879,6 +913,45 @@ func (c *Config) Validate() error { return fmt.Errorf("dashboard_cache.stats_refresh_timeout_seconds must be non-negative") } } + if c.DashboardAgg.Enabled { + if c.DashboardAgg.IntervalSeconds <= 0 { + return fmt.Errorf("dashboard_aggregation.interval_seconds must be positive") + } + if c.DashboardAgg.LookbackSeconds < 0 { + return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative") + } + if c.DashboardAgg.Retention.UsageLogsDays <= 0 { + return fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be positive") + } + if c.DashboardAgg.Retention.HourlyDays <= 0 { + return fmt.Errorf("dashboard_aggregation.retention.hourly_days must be positive") + } + if c.DashboardAgg.Retention.DailyDays <= 0 { + return fmt.Errorf("dashboard_aggregation.retention.daily_days must be positive") + } + if c.DashboardAgg.RecomputeDays < 0 { + return fmt.Errorf("dashboard_aggregation.recompute_days must be non-negative") + } + } else { + if c.DashboardAgg.IntervalSeconds < 0 { + return fmt.Errorf("dashboard_aggregation.interval_seconds must be non-negative") + } + if c.DashboardAgg.LookbackSeconds < 0 { + return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative") + } + if c.DashboardAgg.Retention.UsageLogsDays < 0 { + return fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be non-negative") + } + if c.DashboardAgg.Retention.HourlyDays < 0 { + return fmt.Errorf("dashboard_aggregation.retention.hourly_days must be non-negative") + } + if c.DashboardAgg.Retention.DailyDays < 0 { + return fmt.Errorf("dashboard_aggregation.retention.daily_days must be non-negative") + } + if c.DashboardAgg.RecomputeDays < 0 { + return fmt.Errorf("dashboard_aggregation.recompute_days must be non-negative") + } + } if c.Gateway.MaxBodySize <= 0 { return 
fmt.Errorf("gateway.max_body_size must be positive") } diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index 6cd95b1c..7fc34d64 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -205,3 +205,56 @@ func TestValidateDashboardCacheConfigDisabled(t *testing.T) { t.Fatalf("Validate() expected stats_ttl_seconds error, got: %v", err) } } + +func TestLoadDefaultDashboardAggregationConfig(t *testing.T) { + viper.Reset() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + if !cfg.DashboardAgg.Enabled { + t.Fatalf("DashboardAgg.Enabled = false, want true") + } + if cfg.DashboardAgg.IntervalSeconds != 60 { + t.Fatalf("DashboardAgg.IntervalSeconds = %d, want 60", cfg.DashboardAgg.IntervalSeconds) + } + if cfg.DashboardAgg.LookbackSeconds != 120 { + t.Fatalf("DashboardAgg.LookbackSeconds = %d, want 120", cfg.DashboardAgg.LookbackSeconds) + } + if cfg.DashboardAgg.BackfillEnabled { + t.Fatalf("DashboardAgg.BackfillEnabled = true, want false") + } + if cfg.DashboardAgg.Retention.UsageLogsDays != 90 { + t.Fatalf("DashboardAgg.Retention.UsageLogsDays = %d, want 90", cfg.DashboardAgg.Retention.UsageLogsDays) + } + if cfg.DashboardAgg.Retention.HourlyDays != 180 { + t.Fatalf("DashboardAgg.Retention.HourlyDays = %d, want 180", cfg.DashboardAgg.Retention.HourlyDays) + } + if cfg.DashboardAgg.Retention.DailyDays != 730 { + t.Fatalf("DashboardAgg.Retention.DailyDays = %d, want 730", cfg.DashboardAgg.Retention.DailyDays) + } + if cfg.DashboardAgg.RecomputeDays != 2 { + t.Fatalf("DashboardAgg.RecomputeDays = %d, want 2", cfg.DashboardAgg.RecomputeDays) + } +} + +func TestValidateDashboardAggregationConfigDisabled(t *testing.T) { + viper.Reset() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + cfg.DashboardAgg.Enabled = false + cfg.DashboardAgg.IntervalSeconds = -1 + err = cfg.Validate() + if err == nil { + t.Fatalf("Validate() 
expected error for negative dashboard_aggregation.interval_seconds, got nil") + } + if !strings.Contains(err.Error(), "dashboard_aggregation.interval_seconds") { + t.Fatalf("Validate() expected interval_seconds error, got: %v", err) + } +} diff --git a/backend/internal/handler/admin/dashboard_handler.go b/backend/internal/handler/admin/dashboard_handler.go index 30cdd914..560d1075 100644 --- a/backend/internal/handler/admin/dashboard_handler.go +++ b/backend/internal/handler/admin/dashboard_handler.go @@ -1,6 +1,7 @@ package admin import ( + "errors" "strconv" "time" @@ -13,15 +14,17 @@ import ( // DashboardHandler handles admin dashboard statistics type DashboardHandler struct { - dashboardService *service.DashboardService - startTime time.Time // Server start time for uptime calculation + dashboardService *service.DashboardService + aggregationService *service.DashboardAggregationService + startTime time.Time // Server start time for uptime calculation } // NewDashboardHandler creates a new admin dashboard handler -func NewDashboardHandler(dashboardService *service.DashboardService) *DashboardHandler { +func NewDashboardHandler(dashboardService *service.DashboardService, aggregationService *service.DashboardAggregationService) *DashboardHandler { return &DashboardHandler{ - dashboardService: dashboardService, - startTime: time.Now(), + dashboardService: dashboardService, + aggregationService: aggregationService, + startTime: time.Now(), } } @@ -114,6 +117,54 @@ func (h *DashboardHandler) GetStats(c *gin.Context) { // 性能指标 "rpm": stats.Rpm, "tpm": stats.Tpm, + + // 预聚合新鲜度 + "hourly_active_users": stats.HourlyActiveUsers, + "stats_updated_at": stats.StatsUpdatedAt, + "stats_stale": stats.StatsStale, + }) +} + +type DashboardAggregationBackfillRequest struct { + Start string `json:"start"` + End string `json:"end"` +} + +// BackfillAggregation handles triggering aggregation backfill +// POST /api/v1/admin/dashboard/aggregation/backfill +func (h *DashboardHandler) 
BackfillAggregation(c *gin.Context) { + if h.aggregationService == nil { + response.InternalError(c, "Aggregation service not available") + return + } + + var req DashboardAggregationBackfillRequest + if err := c.ShouldBindJSON(&req); err != nil { + response.BadRequest(c, "Invalid request body") + return + } + start, err := time.Parse(time.RFC3339, req.Start) + if err != nil { + response.BadRequest(c, "Invalid start time") + return + } + end, err := time.Parse(time.RFC3339, req.End) + if err != nil { + response.BadRequest(c, "Invalid end time") + return + } + + if err := h.aggregationService.TriggerBackfill(start, end); err != nil { + if errors.Is(err, service.ErrDashboardBackfillDisabled) { + response.Forbidden(c, "Backfill is disabled") + return + } + response.InternalError(c, "Failed to trigger backfill") + return + } + + response.Success(c, gin.H{ + "status": "accepted", }) } diff --git a/backend/internal/pkg/usagestats/usage_log_types.go b/backend/internal/pkg/usagestats/usage_log_types.go index 39314602..3952785b 100644 --- a/backend/internal/pkg/usagestats/usage_log_types.go +++ b/backend/internal/pkg/usagestats/usage_log_types.go @@ -9,6 +9,12 @@ type DashboardStats struct { TotalUsers int64 `json:"total_users"` TodayNewUsers int64 `json:"today_new_users"` // 今日新增用户数 ActiveUsers int64 `json:"active_users"` // 今日有请求的用户数 + // 小时活跃用户数(UTC 当前小时) + HourlyActiveUsers int64 `json:"hourly_active_users"` + + // 预聚合新鲜度 + StatsUpdatedAt string `json:"stats_updated_at"` + StatsStale bool `json:"stats_stale"` // API Key 统计 TotalAPIKeys int64 `json:"total_api_keys"` diff --git a/backend/internal/repository/dashboard_aggregation_repo.go b/backend/internal/repository/dashboard_aggregation_repo.go new file mode 100644 index 00000000..dbba5cdb --- /dev/null +++ b/backend/internal/repository/dashboard_aggregation_repo.go @@ -0,0 +1,360 @@ +package repository + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + 
"github.com/Wei-Shaw/sub2api/internal/service" + "github.com/lib/pq" +) + +type dashboardAggregationRepository struct { + sql sqlExecutor +} + +// NewDashboardAggregationRepository 创建仪表盘预聚合仓储。 +func NewDashboardAggregationRepository(sqlDB *sql.DB) service.DashboardAggregationRepository { + return newDashboardAggregationRepositoryWithSQL(sqlDB) +} + +func newDashboardAggregationRepositoryWithSQL(sqlq sqlExecutor) *dashboardAggregationRepository { + return &dashboardAggregationRepository{sql: sqlq} +} + +func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, start, end time.Time) error { + startUTC := start.UTC() + endUTC := end.UTC() + if !endUTC.After(startUTC) { + return nil + } + + hourStart := startUTC.Truncate(time.Hour) + hourEnd := endUTC.Truncate(time.Hour) + if endUTC.After(hourEnd) { + hourEnd = hourEnd.Add(time.Hour) + } + + dayStart := truncateToDayUTC(startUTC) + dayEnd := truncateToDayUTC(endUTC) + if endUTC.After(dayEnd) { + dayEnd = dayEnd.Add(24 * time.Hour) + } + + if err := r.insertHourlyActiveUsers(ctx, startUTC, endUTC); err != nil { + return err + } + if err := r.insertDailyActiveUsers(ctx, startUTC, endUTC); err != nil { + return err + } + if err := r.upsertHourlyAggregates(ctx, hourStart, hourEnd); err != nil { + return err + } + if err := r.upsertDailyAggregates(ctx, dayStart, dayEnd); err != nil { + return err + } + return nil +} + +func (r *dashboardAggregationRepository) GetAggregationWatermark(ctx context.Context) (time.Time, error) { + var ts time.Time + query := "SELECT last_aggregated_at FROM usage_dashboard_aggregation_watermark WHERE id = 1" + if err := scanSingleRow(ctx, r.sql, query, nil, &ts); err != nil { + if err == sql.ErrNoRows { + return time.Unix(0, 0).UTC(), nil + } + return time.Time{}, err + } + return ts.UTC(), nil +} + +func (r *dashboardAggregationRepository) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error { + query := ` + INSERT INTO 
usage_dashboard_aggregation_watermark (id, last_aggregated_at, updated_at) + VALUES (1, $1, NOW()) + ON CONFLICT (id) + DO UPDATE SET last_aggregated_at = EXCLUDED.last_aggregated_at, updated_at = EXCLUDED.updated_at + ` + _, err := r.sql.ExecContext(ctx, query, aggregatedAt.UTC()) + return err +} + +func (r *dashboardAggregationRepository) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error { + _, err := r.sql.ExecContext(ctx, ` + DELETE FROM usage_dashboard_hourly WHERE bucket_start < $1; + DELETE FROM usage_dashboard_hourly_users WHERE bucket_start < $1; + DELETE FROM usage_dashboard_daily WHERE bucket_date < $2::date; + DELETE FROM usage_dashboard_daily_users WHERE bucket_date < $2::date; + `, hourlyCutoff.UTC(), dailyCutoff.UTC()) + return err +} + +func (r *dashboardAggregationRepository) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error { + isPartitioned, err := r.isUsageLogsPartitioned(ctx) + if err != nil { + return err + } + if isPartitioned { + return r.dropUsageLogsPartitions(ctx, cutoff) + } + _, err = r.sql.ExecContext(ctx, "DELETE FROM usage_logs WHERE created_at < $1", cutoff.UTC()) + return err +} + +func (r *dashboardAggregationRepository) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error { + isPartitioned, err := r.isUsageLogsPartitioned(ctx) + if err != nil || !isPartitioned { + return err + } + monthStart := truncateToMonthUTC(now) + prevMonth := monthStart.AddDate(0, -1, 0) + nextMonth := monthStart.AddDate(0, 1, 0) + + for _, m := range []time.Time{prevMonth, monthStart, nextMonth} { + if err := r.createUsageLogsPartition(ctx, m); err != nil { + return err + } + } + return nil +} + +func (r *dashboardAggregationRepository) insertHourlyActiveUsers(ctx context.Context, start, end time.Time) error { + query := ` + INSERT INTO usage_dashboard_hourly_users (bucket_start, user_id) + SELECT DISTINCT + date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start, + 
user_id + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ON CONFLICT DO NOTHING + ` + _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC()) + return err +} + +func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Context, start, end time.Time) error { + query := ` + INSERT INTO usage_dashboard_daily_users (bucket_date, user_id) + SELECT DISTINCT + (created_at AT TIME ZONE 'UTC')::date AS bucket_date, + user_id + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ON CONFLICT DO NOTHING + ` + _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC()) + return err +} + +func (r *dashboardAggregationRepository) upsertHourlyAggregates(ctx context.Context, start, end time.Time) error { + query := ` + WITH hourly AS ( + SELECT + date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start, + COUNT(*) AS total_requests, + COALESCE(SUM(input_tokens), 0) AS input_tokens, + COALESCE(SUM(output_tokens), 0) AS output_tokens, + COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation_tokens, + COALESCE(SUM(cache_read_tokens), 0) AS cache_read_tokens, + COALESCE(SUM(total_cost), 0) AS total_cost, + COALESCE(SUM(actual_cost), 0) AS actual_cost, + COALESCE(SUM(COALESCE(duration_ms, 0)), 0) AS total_duration_ms + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + GROUP BY 1 + ), + user_counts AS ( + SELECT bucket_start, COUNT(*) AS active_users + FROM usage_dashboard_hourly_users + WHERE bucket_start >= $1 AND bucket_start < $2 + GROUP BY bucket_start + ) + INSERT INTO usage_dashboard_hourly ( + bucket_start, + total_requests, + input_tokens, + output_tokens, + cache_creation_tokens, + cache_read_tokens, + total_cost, + actual_cost, + total_duration_ms, + active_users, + computed_at + ) + SELECT + hourly.bucket_start, + hourly.total_requests, + hourly.input_tokens, + hourly.output_tokens, + hourly.cache_creation_tokens, + hourly.cache_read_tokens, + hourly.total_cost, + 
hourly.actual_cost, + hourly.total_duration_ms, + COALESCE(user_counts.active_users, 0) AS active_users, + NOW() + FROM hourly + LEFT JOIN user_counts ON user_counts.bucket_start = hourly.bucket_start + ON CONFLICT (bucket_start) + DO UPDATE SET + total_requests = EXCLUDED.total_requests, + input_tokens = EXCLUDED.input_tokens, + output_tokens = EXCLUDED.output_tokens, + cache_creation_tokens = EXCLUDED.cache_creation_tokens, + cache_read_tokens = EXCLUDED.cache_read_tokens, + total_cost = EXCLUDED.total_cost, + actual_cost = EXCLUDED.actual_cost, + total_duration_ms = EXCLUDED.total_duration_ms, + active_users = EXCLUDED.active_users, + computed_at = EXCLUDED.computed_at + ` + _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC()) + return err +} + +func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Context, start, end time.Time) error { + query := ` + WITH daily AS ( + SELECT + (bucket_start AT TIME ZONE 'UTC')::date AS bucket_date, + COALESCE(SUM(total_requests), 0) AS total_requests, + COALESCE(SUM(input_tokens), 0) AS input_tokens, + COALESCE(SUM(output_tokens), 0) AS output_tokens, + COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation_tokens, + COALESCE(SUM(cache_read_tokens), 0) AS cache_read_tokens, + COALESCE(SUM(total_cost), 0) AS total_cost, + COALESCE(SUM(actual_cost), 0) AS actual_cost, + COALESCE(SUM(total_duration_ms), 0) AS total_duration_ms + FROM usage_dashboard_hourly + WHERE bucket_start >= $1 AND bucket_start < $2 + GROUP BY (bucket_start AT TIME ZONE 'UTC')::date + ), + user_counts AS ( + SELECT bucket_date, COUNT(*) AS active_users + FROM usage_dashboard_daily_users + WHERE bucket_date >= $3::date AND bucket_date < $4::date + GROUP BY bucket_date + ) + INSERT INTO usage_dashboard_daily ( + bucket_date, + total_requests, + input_tokens, + output_tokens, + cache_creation_tokens, + cache_read_tokens, + total_cost, + actual_cost, + total_duration_ms, + active_users, + computed_at + ) + SELECT + 
daily.bucket_date, + daily.total_requests, + daily.input_tokens, + daily.output_tokens, + daily.cache_creation_tokens, + daily.cache_read_tokens, + daily.total_cost, + daily.actual_cost, + daily.total_duration_ms, + COALESCE(user_counts.active_users, 0) AS active_users, + NOW() + FROM daily + LEFT JOIN user_counts ON user_counts.bucket_date = daily.bucket_date + ON CONFLICT (bucket_date) + DO UPDATE SET + total_requests = EXCLUDED.total_requests, + input_tokens = EXCLUDED.input_tokens, + output_tokens = EXCLUDED.output_tokens, + cache_creation_tokens = EXCLUDED.cache_creation_tokens, + cache_read_tokens = EXCLUDED.cache_read_tokens, + total_cost = EXCLUDED.total_cost, + actual_cost = EXCLUDED.actual_cost, + total_duration_ms = EXCLUDED.total_duration_ms, + active_users = EXCLUDED.active_users, + computed_at = EXCLUDED.computed_at + ` + _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC(), start.UTC(), end.UTC()) + return err +} + +func (r *dashboardAggregationRepository) isUsageLogsPartitioned(ctx context.Context) (bool, error) { + query := ` + SELECT EXISTS( + SELECT 1 + FROM pg_partitioned_table pt + JOIN pg_class c ON c.oid = pt.partrelid + WHERE c.relname = 'usage_logs' + ) + ` + var partitioned bool + if err := scanSingleRow(ctx, r.sql, query, nil, &partitioned); err != nil { + return false, err + } + return partitioned, nil +} + +func (r *dashboardAggregationRepository) dropUsageLogsPartitions(ctx context.Context, cutoff time.Time) error { + rows, err := r.sql.QueryContext(ctx, ` + SELECT c.relname + FROM pg_inherits + JOIN pg_class c ON c.oid = pg_inherits.inhrelid + JOIN pg_class p ON p.oid = pg_inherits.inhparent + WHERE p.relname = 'usage_logs' + `) + if err != nil { + return err + } + defer rows.Close() + + cutoffMonth := truncateToMonthUTC(cutoff) + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return err + } + if !strings.HasPrefix(name, "usage_logs_") { + continue + } + suffix := strings.TrimPrefix(name, 
"usage_logs_") + month, err := time.Parse("200601", suffix) + if err != nil { + continue + } + month = month.UTC() + if month.Before(cutoffMonth) { + if _, err := r.sql.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", pq.QuoteIdentifier(name))); err != nil { + return err + } + } + } + return rows.Err() +} + +func (r *dashboardAggregationRepository) createUsageLogsPartition(ctx context.Context, month time.Time) error { + monthStart := truncateToMonthUTC(month) + nextMonth := monthStart.AddDate(0, 1, 0) + name := fmt.Sprintf("usage_logs_%s", monthStart.Format("200601")) + query := fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF usage_logs FOR VALUES FROM (%s) TO (%s)", + pq.QuoteIdentifier(name), + pq.QuoteLiteral(monthStart.Format("2006-01-02")), + pq.QuoteLiteral(nextMonth.Format("2006-01-02")), + ) + _, err := r.sql.ExecContext(ctx, query) + return err +} + +func truncateToDayUTC(t time.Time) time.Time { + t = t.UTC() + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) +} + +func truncateToMonthUTC(t time.Time) time.Time { + t = t.UTC() + return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC) +} diff --git a/backend/internal/repository/usage_log_repo.go b/backend/internal/repository/usage_log_repo.go index 6ed8910e..be2a6d18 100644 --- a/backend/internal/repository/usage_log_repo.go +++ b/backend/internal/repository/usage_log_repo.go @@ -270,15 +270,14 @@ type DashboardStats = usagestats.DashboardStats func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardStats, error) { var stats DashboardStats - today := timezone.Today() now := time.Now() + todayUTC := truncateToDayUTC(now) // 合并用户统计查询 userStatsQuery := ` SELECT COUNT(*) as total_users, - COUNT(CASE WHEN created_at >= $1 THEN 1 END) as today_new_users, - (SELECT COUNT(DISTINCT user_id) FROM usage_logs WHERE created_at >= $2) as active_users + COUNT(CASE WHEN created_at >= $1 THEN 1 END) as today_new_users FROM users WHERE deleted_at IS NULL ` 
@@ -286,10 +285,9 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS ctx, r.sql, userStatsQuery, - []any{today, today}, + []any{todayUTC}, &stats.TotalUsers, &stats.TodayNewUsers, - &stats.ActiveUsers, ); err != nil { return nil, err } @@ -341,16 +339,17 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS // 累计 Token 统计 totalStatsQuery := ` SELECT - COUNT(*) as total_requests, + COALESCE(SUM(total_requests), 0) as total_requests, COALESCE(SUM(input_tokens), 0) as total_input_tokens, COALESCE(SUM(output_tokens), 0) as total_output_tokens, COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens, COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens, COALESCE(SUM(total_cost), 0) as total_cost, COALESCE(SUM(actual_cost), 0) as total_actual_cost, - COALESCE(AVG(duration_ms), 0) as avg_duration_ms - FROM usage_logs + COALESCE(SUM(total_duration_ms), 0) as total_duration_ms + FROM usage_dashboard_daily ` + var totalDurationMs int64 if err := scanSingleRow( ctx, r.sql, @@ -363,30 +362,34 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TotalCacheReadTokens, &stats.TotalCost, &stats.TotalActualCost, - &stats.AverageDurationMs, + &totalDurationMs, ); err != nil { return nil, err } stats.TotalTokens = stats.TotalInputTokens + stats.TotalOutputTokens + stats.TotalCacheCreationTokens + stats.TotalCacheReadTokens + if stats.TotalRequests > 0 { + stats.AverageDurationMs = float64(totalDurationMs) / float64(stats.TotalRequests) + } // 今日 Token 统计 todayStatsQuery := ` SELECT - COUNT(*) as today_requests, - COALESCE(SUM(input_tokens), 0) as today_input_tokens, - COALESCE(SUM(output_tokens), 0) as today_output_tokens, - COALESCE(SUM(cache_creation_tokens), 0) as today_cache_creation_tokens, - COALESCE(SUM(cache_read_tokens), 0) as today_cache_read_tokens, - COALESCE(SUM(total_cost), 0) as today_cost, - COALESCE(SUM(actual_cost), 0) as today_actual_cost - 
FROM usage_logs - WHERE created_at >= $1 + total_requests as today_requests, + input_tokens as today_input_tokens, + output_tokens as today_output_tokens, + cache_creation_tokens as today_cache_creation_tokens, + cache_read_tokens as today_cache_read_tokens, + total_cost as today_cost, + actual_cost as today_actual_cost, + active_users as active_users + FROM usage_dashboard_daily + WHERE bucket_date = $1::date ` if err := scanSingleRow( ctx, r.sql, todayStatsQuery, - []any{today}, + []any{todayUTC}, &stats.TodayRequests, &stats.TodayInputTokens, &stats.TodayOutputTokens, @@ -394,11 +397,27 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TodayCacheReadTokens, &stats.TodayCost, &stats.TodayActualCost, + &stats.ActiveUsers, ); err != nil { - return nil, err + if err != sql.ErrNoRows { + return nil, err + } } stats.TodayTokens = stats.TodayInputTokens + stats.TodayOutputTokens + stats.TodayCacheCreationTokens + stats.TodayCacheReadTokens + // 当前小时活跃用户 + hourlyActiveQuery := ` + SELECT active_users + FROM usage_dashboard_hourly + WHERE bucket_start = $1 + ` + hourStart := now.UTC().Truncate(time.Hour) + if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart}, &stats.HourlyActiveUsers); err != nil { + if err != sql.ErrNoRows { + return nil, err + } + } + // 性能指标:RPM 和 TPM(最近1分钟,全局) rpm, tpm, err := r.getPerformanceStats(ctx, 0) if err != nil { diff --git a/backend/internal/repository/usage_log_repo_integration_test.go b/backend/internal/repository/usage_log_repo_integration_test.go index 7193718f..09341db3 100644 --- a/backend/internal/repository/usage_log_repo_integration_test.go +++ b/backend/internal/repository/usage_log_repo_integration_test.go @@ -198,8 +198,8 @@ func (s *UsageLogRepoSuite) TestListWithFilters() { // --- GetDashboardStats --- func (s *UsageLogRepoSuite) TestDashboardStats_TodayTotalsAndPerformance() { - now := time.Now() - todayStart := timezone.Today() + now := time.Now().UTC() + todayStart 
:= truncateToDayUTC(now) baseStats, err := s.repo.GetDashboardStats(s.ctx) s.Require().NoError(err, "GetDashboardStats base") @@ -273,6 +273,11 @@ func (s *UsageLogRepoSuite) TestDashboardStats_TodayTotalsAndPerformance() { _, err = s.repo.Create(s.ctx, logPerf) s.Require().NoError(err, "Create logPerf") + aggRepo := newDashboardAggregationRepositoryWithSQL(s.tx) + aggStart := todayStart.Add(-2 * time.Hour) + aggEnd := now.Add(2 * time.Minute) + s.Require().NoError(aggRepo.AggregateRange(s.ctx, aggStart, aggEnd), "AggregateRange") + stats, err := s.repo.GetDashboardStats(s.ctx) s.Require().NoError(err, "GetDashboardStats") @@ -333,6 +338,151 @@ func (s *UsageLogRepoSuite) TestGetAccountTodayStats() { s.Require().Equal(int64(30), stats.Tokens) } +func (s *UsageLogRepoSuite) TestDashboardAggregationConsistency() { + now := time.Now().UTC().Truncate(time.Second) + hour1 := now.Add(-90 * time.Minute).Truncate(time.Hour) + hour2 := now.Add(-30 * time.Minute).Truncate(time.Hour) + dayStart := truncateToDayUTC(now) + + user1 := mustCreateUser(s.T(), s.client, &service.User{Email: "agg-u1@test.com"}) + user2 := mustCreateUser(s.T(), s.client, &service.User{Email: "agg-u2@test.com"}) + apiKey1 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user1.ID, Key: "sk-agg-1", Name: "k1"}) + apiKey2 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user2.ID, Key: "sk-agg-2", Name: "k2"}) + account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-agg"}) + + d1, d2, d3 := 100, 200, 150 + log1 := &service.UsageLog{ + UserID: user1.ID, + APIKeyID: apiKey1.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 10, + OutputTokens: 20, + CacheCreationTokens: 2, + CacheReadTokens: 1, + TotalCost: 1.0, + ActualCost: 0.9, + DurationMs: &d1, + CreatedAt: hour1.Add(5 * time.Minute), + } + _, err := s.repo.Create(s.ctx, log1) + s.Require().NoError(err) + + log2 := &service.UsageLog{ + UserID: user1.ID, + APIKeyID: apiKey1.ID, + AccountID: 
account.ID, + Model: "claude-3", + InputTokens: 5, + OutputTokens: 5, + TotalCost: 0.5, + ActualCost: 0.5, + DurationMs: &d2, + CreatedAt: hour1.Add(20 * time.Minute), + } + _, err = s.repo.Create(s.ctx, log2) + s.Require().NoError(err) + + log3 := &service.UsageLog{ + UserID: user2.ID, + APIKeyID: apiKey2.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 7, + OutputTokens: 8, + TotalCost: 0.7, + ActualCost: 0.7, + DurationMs: &d3, + CreatedAt: hour2.Add(10 * time.Minute), + } + _, err = s.repo.Create(s.ctx, log3) + s.Require().NoError(err) + + aggRepo := newDashboardAggregationRepositoryWithSQL(s.tx) + aggStart := hour1.Add(-5 * time.Minute) + aggEnd := now.Add(5 * time.Minute) + s.Require().NoError(aggRepo.AggregateRange(s.ctx, aggStart, aggEnd)) + + type hourlyRow struct { + totalRequests int64 + inputTokens int64 + outputTokens int64 + cacheCreationTokens int64 + cacheReadTokens int64 + totalCost float64 + actualCost float64 + totalDurationMs int64 + activeUsers int64 + } + fetchHourly := func(bucketStart time.Time) hourlyRow { + var row hourlyRow + err := scanSingleRow(s.ctx, s.tx, ` + SELECT total_requests, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, + total_cost, actual_cost, total_duration_ms, active_users + FROM usage_dashboard_hourly + WHERE bucket_start = $1 + `, []any{bucketStart}, &row.totalRequests, &row.inputTokens, &row.outputTokens, + &row.cacheCreationTokens, &row.cacheReadTokens, &row.totalCost, &row.actualCost, + &row.totalDurationMs, &row.activeUsers, + ) + s.Require().NoError(err) + return row + } + + hour1Row := fetchHourly(hour1) + s.Require().Equal(int64(2), hour1Row.totalRequests) + s.Require().Equal(int64(15), hour1Row.inputTokens) + s.Require().Equal(int64(25), hour1Row.outputTokens) + s.Require().Equal(int64(2), hour1Row.cacheCreationTokens) + s.Require().Equal(int64(1), hour1Row.cacheReadTokens) + s.Require().Equal(1.5, hour1Row.totalCost) + s.Require().Equal(1.4, hour1Row.actualCost) + 
s.Require().Equal(int64(300), hour1Row.totalDurationMs) + s.Require().Equal(int64(1), hour1Row.activeUsers) + + hour2Row := fetchHourly(hour2) + s.Require().Equal(int64(1), hour2Row.totalRequests) + s.Require().Equal(int64(7), hour2Row.inputTokens) + s.Require().Equal(int64(8), hour2Row.outputTokens) + s.Require().Equal(int64(0), hour2Row.cacheCreationTokens) + s.Require().Equal(int64(0), hour2Row.cacheReadTokens) + s.Require().Equal(0.7, hour2Row.totalCost) + s.Require().Equal(0.7, hour2Row.actualCost) + s.Require().Equal(int64(150), hour2Row.totalDurationMs) + s.Require().Equal(int64(1), hour2Row.activeUsers) + + var daily struct { + totalRequests int64 + inputTokens int64 + outputTokens int64 + cacheCreationTokens int64 + cacheReadTokens int64 + totalCost float64 + actualCost float64 + totalDurationMs int64 + activeUsers int64 + } + err = scanSingleRow(s.ctx, s.tx, ` + SELECT total_requests, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, + total_cost, actual_cost, total_duration_ms, active_users + FROM usage_dashboard_daily + WHERE bucket_date = $1::date + `, []any{dayStart}, &daily.totalRequests, &daily.inputTokens, &daily.outputTokens, + &daily.cacheCreationTokens, &daily.cacheReadTokens, &daily.totalCost, &daily.actualCost, + &daily.totalDurationMs, &daily.activeUsers, + ) + s.Require().NoError(err) + s.Require().Equal(int64(3), daily.totalRequests) + s.Require().Equal(int64(22), daily.inputTokens) + s.Require().Equal(int64(33), daily.outputTokens) + s.Require().Equal(int64(2), daily.cacheCreationTokens) + s.Require().Equal(int64(1), daily.cacheReadTokens) + s.Require().Equal(2.2, daily.totalCost) + s.Require().Equal(2.1, daily.actualCost) + s.Require().Equal(int64(450), daily.totalDurationMs) + s.Require().Equal(int64(2), daily.activeUsers) +} + // --- GetBatchUserUsageStats --- func (s *UsageLogRepoSuite) TestGetBatchUserUsageStats() { diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index 
1b6a7b91..8cc937bb 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -47,6 +47,7 @@ var ProviderSet = wire.NewSet( NewRedeemCodeRepository, NewPromoCodeRepository, NewUsageLogRepository, + NewDashboardAggregationRepository, NewSettingRepository, NewUserSubscriptionRepository, NewUserAttributeDefinitionRepository, diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index 6f40c491..c9c5352c 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -75,6 +75,7 @@ func registerDashboardRoutes(admin *gin.RouterGroup, h *handler.Handlers) { dashboard.GET("/users-trend", h.Admin.Dashboard.GetUserUsageTrend) dashboard.POST("/users-usage", h.Admin.Dashboard.GetBatchUsersUsage) dashboard.POST("/api-keys-usage", h.Admin.Dashboard.GetBatchAPIKeysUsage) + dashboard.POST("/aggregation/backfill", h.Admin.Dashboard.BackfillAggregation) } } diff --git a/backend/internal/service/dashboard_aggregation_service.go b/backend/internal/service/dashboard_aggregation_service.go new file mode 100644 index 00000000..343c3240 --- /dev/null +++ b/backend/internal/service/dashboard_aggregation_service.go @@ -0,0 +1,224 @@ +package service + +import ( + "context" + "errors" + "log" + "sync/atomic" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" +) + +const ( + defaultDashboardAggregationTimeout = 2 * time.Minute + defaultDashboardAggregationBackfillTimeout = 30 * time.Minute + dashboardAggregationRetentionInterval = 6 * time.Hour +) + +var ( + // ErrDashboardBackfillDisabled 当配置禁用回填时返回。 + ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用") +) + +// DashboardAggregationRepository 定义仪表盘预聚合仓储接口。 +type DashboardAggregationRepository interface { + AggregateRange(ctx context.Context, start, end time.Time) error + GetAggregationWatermark(ctx context.Context) (time.Time, error) + UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error + 
CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error + CleanupUsageLogs(ctx context.Context, cutoff time.Time) error + EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error +} + +// DashboardAggregationService 负责定时聚合与回填。 +type DashboardAggregationService struct { + repo DashboardAggregationRepository + timingWheel *TimingWheelService + cfg config.DashboardAggregationConfig + running int32 + lastRetentionCleanup atomic.Value // time.Time +} + +// NewDashboardAggregationService 创建聚合服务。 +func NewDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService { + var aggCfg config.DashboardAggregationConfig + if cfg != nil { + aggCfg = cfg.DashboardAgg + } + return &DashboardAggregationService{ + repo: repo, + timingWheel: timingWheel, + cfg: aggCfg, + } +} + +// Start 启动定时聚合作业(重启生效配置)。 +func (s *DashboardAggregationService) Start() { + if s == nil || s.repo == nil || s.timingWheel == nil { + return + } + if !s.cfg.Enabled { + log.Printf("[DashboardAggregation] 聚合作业已禁用") + return + } + + interval := time.Duration(s.cfg.IntervalSeconds) * time.Second + if interval <= 0 { + interval = time.Minute + } + + if s.cfg.RecomputeDays > 0 { + go s.recomputeRecentDays() + } + + s.timingWheel.ScheduleRecurring("dashboard:aggregation", interval, func() { + s.runScheduledAggregation() + }) + log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds) +} + +// TriggerBackfill 触发回填(异步)。 +func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) error { + if s == nil || s.repo == nil { + return errors.New("聚合服务未初始化") + } + if !s.cfg.BackfillEnabled { + log.Printf("[DashboardAggregation] 回填被拒绝: backfill_enabled=false") + return ErrDashboardBackfillDisabled + } + if !end.After(start) { + return errors.New("回填时间范围无效") + } + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 
defaultDashboardAggregationBackfillTimeout) + defer cancel() + if err := s.backfillRange(ctx, start, end); err != nil { + log.Printf("[DashboardAggregation] 回填失败: %v", err) + } + }() + return nil +} + +func (s *DashboardAggregationService) recomputeRecentDays() { + days := s.cfg.RecomputeDays + if days <= 0 { + return + } + now := time.Now().UTC() + start := now.AddDate(0, 0, -days) + + ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout) + defer cancel() + if err := s.backfillRange(ctx, start, now); err != nil { + log.Printf("[DashboardAggregation] 启动重算失败: %v", err) + return + } +} + +func (s *DashboardAggregationService) runScheduledAggregation() { + if !atomic.CompareAndSwapInt32(&s.running, 0, 1) { + return + } + defer atomic.StoreInt32(&s.running, 0) + + ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationTimeout) + defer cancel() + + now := time.Now().UTC() + last, err := s.repo.GetAggregationWatermark(ctx) + if err != nil { + log.Printf("[DashboardAggregation] 读取水位失败: %v", err) + last = time.Unix(0, 0).UTC() + } + + lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second + start := last.Add(-lookback) + epoch := time.Unix(0, 0).UTC() + if !last.After(epoch) { + start = now.Add(-lookback) + } + if start.After(now) { + start = now.Add(-lookback) + } + + if err := s.aggregateRange(ctx, start, now); err != nil { + log.Printf("[DashboardAggregation] 聚合失败: %v", err) + return + } + + if err := s.repo.UpdateAggregationWatermark(ctx, now); err != nil { + log.Printf("[DashboardAggregation] 更新水位失败: %v", err) + } + + s.maybeCleanupRetention(ctx, now) +} + +func (s *DashboardAggregationService) backfillRange(ctx context.Context, start, end time.Time) error { + if !atomic.CompareAndSwapInt32(&s.running, 0, 1) { + return errors.New("聚合作业正在运行") + } + defer atomic.StoreInt32(&s.running, 0) + + startUTC := start.UTC() + endUTC := end.UTC() + if !endUTC.After(startUTC) { + return 
errors.New("回填时间范围无效") + } + + cursor := truncateToDayUTC(startUTC) + for cursor.Before(endUTC) { + windowEnd := cursor.Add(24 * time.Hour) + if windowEnd.After(endUTC) { + windowEnd = endUTC + } + if err := s.aggregateRange(ctx, cursor, windowEnd); err != nil { + return err + } + cursor = windowEnd + } + + if err := s.repo.UpdateAggregationWatermark(ctx, endUTC); err != nil { + log.Printf("[DashboardAggregation] 更新水位失败: %v", err) + } + + s.maybeCleanupRetention(ctx, endUTC) + return nil +} + +func (s *DashboardAggregationService) aggregateRange(ctx context.Context, start, end time.Time) error { + if !end.After(start) { + return nil + } + if err := s.repo.EnsureUsageLogsPartitions(ctx, end); err != nil { + log.Printf("[DashboardAggregation] 分区检查失败: %v", err) + } + return s.repo.AggregateRange(ctx, start, end) +} + +func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context, now time.Time) { + lastAny := s.lastRetentionCleanup.Load() + if lastAny != nil { + if last, ok := lastAny.(time.Time); ok && now.Sub(last) < dashboardAggregationRetentionInterval { + return + } + } + s.lastRetentionCleanup.Store(now) + + hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays) + dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays) + usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays) + + if err := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff); err != nil { + log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", err) + } + if err := s.repo.CleanupUsageLogs(ctx, usageCutoff); err != nil { + log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", err) + } +} + +func truncateToDayUTC(t time.Time) time.Time { + t = t.UTC() + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) +} diff --git a/backend/internal/service/dashboard_service.go b/backend/internal/service/dashboard_service.go index 468135e3..40ab877d 100644 --- a/backend/internal/service/dashboard_service.go +++ 
b/backend/internal/service/dashboard_service.go @@ -37,17 +37,24 @@ type dashboardStatsCacheEntry struct { // DashboardService provides aggregated statistics for admin dashboard. type DashboardService struct { usageRepo UsageLogRepository + aggRepo DashboardAggregationRepository cache DashboardStatsCache cacheFreshTTL time.Duration cacheTTL time.Duration refreshTimeout time.Duration refreshing int32 + aggEnabled bool + aggInterval time.Duration + aggLookback time.Duration } -func NewDashboardService(usageRepo UsageLogRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService { +func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregationRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService { freshTTL := defaultDashboardStatsFreshTTL cacheTTL := defaultDashboardStatsCacheTTL refreshTimeout := defaultDashboardStatsRefreshTimeout + aggEnabled := true + aggInterval := time.Minute + aggLookback := 2 * time.Minute if cfg != nil { if !cfg.Dashboard.Enabled { cache = nil @@ -61,13 +68,24 @@ func NewDashboardService(usageRepo UsageLogRepository, cache DashboardStatsCache if cfg.Dashboard.StatsRefreshTimeoutSeconds > 0 { refreshTimeout = time.Duration(cfg.Dashboard.StatsRefreshTimeoutSeconds) * time.Second } + aggEnabled = cfg.DashboardAgg.Enabled + if cfg.DashboardAgg.IntervalSeconds > 0 { + aggInterval = time.Duration(cfg.DashboardAgg.IntervalSeconds) * time.Second + } + if cfg.DashboardAgg.LookbackSeconds > 0 { + aggLookback = time.Duration(cfg.DashboardAgg.LookbackSeconds) * time.Second + } } return &DashboardService{ usageRepo: usageRepo, + aggRepo: aggRepo, cache: cache, cacheFreshTTL: freshTTL, cacheTTL: cacheTTL, refreshTimeout: refreshTimeout, + aggEnabled: aggEnabled, + aggInterval: aggInterval, + aggLookback: aggLookback, } } @@ -75,6 +93,7 @@ func (s *DashboardService) GetDashboardStats(ctx context.Context) (*usagestats.D if s.cache != nil { cached, fresh, err := s.getCachedDashboardStats(ctx) if err 
== nil && cached != nil { + s.refreshAggregationStaleness(cached) if !fresh { s.refreshDashboardStatsAsync() } @@ -133,6 +152,7 @@ func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagesta if err != nil { return nil, err } + s.applyAggregationStatus(ctx, stats) cacheCtx, cancel := s.cacheOperationContext() defer cancel() s.saveDashboardStatsCache(cacheCtx, stats) @@ -158,6 +178,7 @@ func (s *DashboardService) refreshDashboardStatsAsync() { log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err) return } + s.applyAggregationStatus(ctx, stats) cacheCtx, cancel := s.cacheOperationContext() defer cancel() s.saveDashboardStatsCache(cacheCtx, stats) @@ -203,6 +224,61 @@ func (s *DashboardService) cacheOperationContext() (context.Context, context.Can return context.WithTimeout(context.Background(), s.refreshTimeout) } +func (s *DashboardService) applyAggregationStatus(ctx context.Context, stats *usagestats.DashboardStats) { + if stats == nil { + return + } + updatedAt := s.fetchAggregationUpdatedAt(ctx) + stats.StatsUpdatedAt = updatedAt.UTC().Format(time.RFC3339) + stats.StatsStale = s.isAggregationStale(updatedAt, time.Now().UTC()) +} + +func (s *DashboardService) refreshAggregationStaleness(stats *usagestats.DashboardStats) { + if stats == nil { + return + } + updatedAt := parseStatsUpdatedAt(stats.StatsUpdatedAt) + stats.StatsStale = s.isAggregationStale(updatedAt, time.Now().UTC()) +} + +func (s *DashboardService) fetchAggregationUpdatedAt(ctx context.Context) time.Time { + if s.aggRepo == nil { + return time.Unix(0, 0).UTC() + } + updatedAt, err := s.aggRepo.GetAggregationWatermark(ctx) + if err != nil { + log.Printf("[Dashboard] 读取聚合水位失败: %v", err) + return time.Unix(0, 0).UTC() + } + if updatedAt.IsZero() { + return time.Unix(0, 0).UTC() + } + return updatedAt.UTC() +} + +func (s *DashboardService) isAggregationStale(updatedAt, now time.Time) bool { + if !s.aggEnabled { + return true + } + epoch := time.Unix(0, 0).UTC() + if !updatedAt.After(epoch) { 
+ return true + } + threshold := s.aggInterval + s.aggLookback + return now.Sub(updatedAt) > threshold +} + +func parseStatsUpdatedAt(raw string) time.Time { + if raw == "" { + return time.Unix(0, 0).UTC() + } + parsed, err := time.Parse(time.RFC3339, raw) + if err != nil { + return time.Unix(0, 0).UTC() + } + return parsed.UTC() +} + func (s *DashboardService) GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error) { trend, err := s.usageRepo.GetAPIKeyUsageTrend(ctx, startTime, endTime, granularity, limit) if err != nil { diff --git a/backend/internal/service/dashboard_service_test.go b/backend/internal/service/dashboard_service_test.go index 17f46ead..c7b9c6af 100644 --- a/backend/internal/service/dashboard_service_test.go +++ b/backend/internal/service/dashboard_service_test.go @@ -74,6 +74,38 @@ func (c *dashboardCacheStub) DeleteDashboardStats(ctx context.Context) error { return nil } +type dashboardAggregationRepoStub struct { + watermark time.Time + err error +} + +func (s *dashboardAggregationRepoStub) AggregateRange(ctx context.Context, start, end time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) { + if s.err != nil { + return time.Time{}, s.err + } + return s.watermark, nil +} + +func (s *dashboardAggregationRepoStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error { + return nil +} + func (c *dashboardCacheStub) readLastEntry(t *testing.T) 
dashboardStatsCacheEntry { t.Helper() c.lastSetMu.Lock() @@ -88,7 +120,9 @@ func (c *dashboardCacheStub) readLastEntry(t *testing.T) dashboardStatsCacheEntr func TestDashboardService_CacheHitFresh(t *testing.T) { stats := &usagestats.DashboardStats{ - TotalUsers: 10, + TotalUsers: 10, + StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339), + StatsStale: true, } entry := dashboardStatsCacheEntry{ Stats: stats, @@ -105,8 +139,9 @@ func TestDashboardService_CacheHitFresh(t *testing.T) { repo := &usageRepoStub{ stats: &usagestats.DashboardStats{TotalUsers: 99}, } + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) require.NoError(t, err) @@ -118,7 +153,9 @@ func TestDashboardService_CacheHitFresh(t *testing.T) { func TestDashboardService_CacheMiss_StoresCache(t *testing.T) { stats := &usagestats.DashboardStats{ - TotalUsers: 7, + TotalUsers: 7, + StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339), + StatsStale: true, } cache := &dashboardCacheStub{ get: func(ctx context.Context) (string, error) { @@ -126,8 +163,9 @@ func TestDashboardService_CacheMiss_StoresCache(t *testing.T) { }, } repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) require.NoError(t, err) @@ -142,7 +180,9 @@ func TestDashboardService_CacheMiss_StoresCache(t *testing.T) { func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) { stats := &usagestats.DashboardStats{ - TotalUsers: 3, + TotalUsers: 3, + StatsUpdatedAt: time.Unix(0, 
0).UTC().Format(time.RFC3339), + StatsStale: true, } cache := &dashboardCacheStub{ get: func(ctx context.Context) (string, error) { @@ -150,8 +190,9 @@ func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) { }, } repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: false}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) require.NoError(t, err) @@ -163,7 +204,9 @@ func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) { func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) { staleStats := &usagestats.DashboardStats{ - TotalUsers: 11, + TotalUsers: 11, + StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339), + StatsStale: true, } entry := dashboardStatsCacheEntry{ Stats: staleStats, @@ -182,8 +225,9 @@ func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) { stats: &usagestats.DashboardStats{TotalUsers: 22}, onCall: refreshCh, } + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) require.NoError(t, err) @@ -207,8 +251,9 @@ func TestDashboardService_CacheParseError_EvictsAndRefetches(t *testing.T) { } stats := &usagestats.DashboardStats{TotalUsers: 9} repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) 
require.NoError(t, err) @@ -224,10 +269,45 @@ func TestDashboardService_CacheParseError_RepoFailure(t *testing.T) { }, } repo := &usageRepoStub{err: errors.New("db down")} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) _, err := svc.GetDashboardStats(context.Background()) require.Error(t, err) require.Equal(t, int32(1), atomic.LoadInt32(&cache.delCalls)) } + +func TestDashboardService_StatsUpdatedAtEpochWhenMissing(t *testing.T) { + stats := &usagestats.DashboardStats{} + repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} + cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: false}} + svc := NewDashboardService(repo, aggRepo, nil, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, "1970-01-01T00:00:00Z", got.StatsUpdatedAt) + require.True(t, got.StatsStale) +} + +func TestDashboardService_StatsStaleFalseWhenFresh(t *testing.T) { + aggNow := time.Now().UTC().Truncate(time.Second) + stats := &usagestats.DashboardStats{} + repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: aggNow} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: false}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + IntervalSeconds: 60, + LookbackSeconds: 120, + }, + } + svc := NewDashboardService(repo, aggRepo, nil, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, aggNow.Format(time.RFC3339), got.StatsUpdatedAt) + require.False(t, got.StatsStale) +} diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index 54c37b54..f1074e9d 100644 --- a/backend/internal/service/wire.go +++ 
b/backend/internal/service/wire.go @@ -47,6 +47,13 @@ func ProvideTokenRefreshService( return svc } +// ProvideDashboardAggregationService 创建并启动仪表盘聚合服务 +func ProvideDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService { + svc := NewDashboardAggregationService(repo, timingWheel, cfg) + svc.Start() + return svc +} + // ProvideAccountExpiryService creates and starts AccountExpiryService. func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpiryService { svc := NewAccountExpiryService(accountRepo, time.Minute) @@ -126,6 +133,7 @@ var ProviderSet = wire.NewSet( ProvideTokenRefreshService, ProvideAccountExpiryService, ProvideTimingWheelService, + ProvideDashboardAggregationService, ProvideDeferredService, NewAntigravityQuotaFetcher, NewUserAttributeService, diff --git a/backend/migrations/034_usage_dashboard_aggregation_tables.sql b/backend/migrations/034_usage_dashboard_aggregation_tables.sql new file mode 100644 index 00000000..64b383d4 --- /dev/null +++ b/backend/migrations/034_usage_dashboard_aggregation_tables.sql @@ -0,0 +1,77 @@ +-- Usage dashboard aggregation tables (hourly/daily) + active-user dedup + watermark. +-- These tables support Admin Dashboard statistics without full-table scans on usage_logs. + +-- Hourly aggregates (UTC buckets). 
+CREATE TABLE IF NOT EXISTS usage_dashboard_hourly ( + bucket_start TIMESTAMPTZ PRIMARY KEY, + total_requests BIGINT NOT NULL DEFAULT 0, + input_tokens BIGINT NOT NULL DEFAULT 0, + output_tokens BIGINT NOT NULL DEFAULT 0, + cache_creation_tokens BIGINT NOT NULL DEFAULT 0, + cache_read_tokens BIGINT NOT NULL DEFAULT 0, + total_cost DECIMAL(20, 10) NOT NULL DEFAULT 0, + actual_cost DECIMAL(20, 10) NOT NULL DEFAULT 0, + total_duration_ms BIGINT NOT NULL DEFAULT 0, + active_users BIGINT NOT NULL DEFAULT 0, + computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_usage_dashboard_hourly_bucket_start + ON usage_dashboard_hourly (bucket_start DESC); + +COMMENT ON TABLE usage_dashboard_hourly IS 'Pre-aggregated hourly usage metrics for admin dashboard (UTC buckets).'; +COMMENT ON COLUMN usage_dashboard_hourly.bucket_start IS 'UTC start timestamp of the hour bucket.'; +COMMENT ON COLUMN usage_dashboard_hourly.computed_at IS 'When the hourly row was last computed/refreshed.'; + +-- Daily aggregates (UTC dates). 
+CREATE TABLE IF NOT EXISTS usage_dashboard_daily ( + bucket_date DATE PRIMARY KEY, + total_requests BIGINT NOT NULL DEFAULT 0, + input_tokens BIGINT NOT NULL DEFAULT 0, + output_tokens BIGINT NOT NULL DEFAULT 0, + cache_creation_tokens BIGINT NOT NULL DEFAULT 0, + cache_read_tokens BIGINT NOT NULL DEFAULT 0, + total_cost DECIMAL(20, 10) NOT NULL DEFAULT 0, + actual_cost DECIMAL(20, 10) NOT NULL DEFAULT 0, + total_duration_ms BIGINT NOT NULL DEFAULT 0, + active_users BIGINT NOT NULL DEFAULT 0, + computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_usage_dashboard_daily_bucket_date + ON usage_dashboard_daily (bucket_date DESC); + +COMMENT ON TABLE usage_dashboard_daily IS 'Pre-aggregated daily usage metrics for admin dashboard (UTC dates).'; +COMMENT ON COLUMN usage_dashboard_daily.bucket_date IS 'UTC date of the day bucket.'; +COMMENT ON COLUMN usage_dashboard_daily.computed_at IS 'When the daily row was last computed/refreshed.'; + +-- Hourly active user dedup table. +CREATE TABLE IF NOT EXISTS usage_dashboard_hourly_users ( + bucket_start TIMESTAMPTZ NOT NULL, + user_id BIGINT NOT NULL, + PRIMARY KEY (bucket_start, user_id) +); + +CREATE INDEX IF NOT EXISTS idx_usage_dashboard_hourly_users_bucket_start + ON usage_dashboard_hourly_users (bucket_start); + +-- Daily active user dedup table. +CREATE TABLE IF NOT EXISTS usage_dashboard_daily_users ( + bucket_date DATE NOT NULL, + user_id BIGINT NOT NULL, + PRIMARY KEY (bucket_date, user_id) +); + +CREATE INDEX IF NOT EXISTS idx_usage_dashboard_daily_users_bucket_date + ON usage_dashboard_daily_users (bucket_date); + +-- Aggregation watermark table (single row). 
+CREATE TABLE IF NOT EXISTS usage_dashboard_aggregation_watermark ( + id INT PRIMARY KEY, + last_aggregated_at TIMESTAMPTZ NOT NULL DEFAULT TIMESTAMPTZ '1970-01-01 00:00:00+00', + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +INSERT INTO usage_dashboard_aggregation_watermark (id) +VALUES (1) +ON CONFLICT (id) DO NOTHING; diff --git a/backend/migrations/035_usage_logs_partitioning.sql b/backend/migrations/035_usage_logs_partitioning.sql new file mode 100644 index 00000000..5919b5c3 --- /dev/null +++ b/backend/migrations/035_usage_logs_partitioning.sql @@ -0,0 +1,58 @@ +-- usage_logs monthly partition bootstrap. +-- Only converts to partitioned table when usage_logs is empty. +-- Existing installations with data require a manual migration plan. + +DO $$ +DECLARE + is_partitioned BOOLEAN := FALSE; + has_data BOOLEAN := FALSE; + month_start DATE; + prev_month DATE; + next_month DATE; +BEGIN + SELECT EXISTS( + SELECT 1 + FROM pg_partitioned_table pt + JOIN pg_class c ON c.oid = pt.partrelid + WHERE c.relname = 'usage_logs' + ) INTO is_partitioned; + + IF NOT is_partitioned THEN + SELECT EXISTS(SELECT 1 FROM usage_logs LIMIT 1) INTO has_data; + IF NOT has_data THEN + -- PostgreSQL cannot convert an existing table to a partitioned one in place + -- (no ALTER TABLE ... PARTITION BY); recreate from the empty table instead. + -- NOTE: indexes/PK are not copied (a unique index on the partitioned parent + -- must include the partition key); recreate needed indexes after conversion. + EXECUTE 'CREATE TABLE usage_logs_partitioned (LIKE usage_logs INCLUDING DEFAULTS INCLUDING CONSTRAINTS) PARTITION BY RANGE (created_at)'; + EXECUTE 'DROP TABLE usage_logs'; + EXECUTE 'ALTER TABLE usage_logs_partitioned RENAME TO usage_logs'; + is_partitioned := TRUE; + END IF; + END IF; + + IF is_partitioned THEN + month_start := date_trunc('month', now() AT TIME ZONE 'UTC')::date; + prev_month := (month_start - INTERVAL '1 month')::date; + next_month := (month_start + INTERVAL '1 month')::date; + + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM (%L) TO (%L)', + to_char(prev_month, 'YYYYMM'), + prev_month, + month_start + ); + + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM (%L) TO (%L)', + to_char(month_start, 'YYYYMM'), + month_start, + next_month + ); + + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM (%L) TO 
(%L)', + to_char(next_month, 'YYYYMM'), + next_month, + (next_month + INTERVAL '1 month')::date + ); + END IF; +END $$; diff --git a/config.yaml b/config.yaml index ffc070a0..848421d6 100644 --- a/config.yaml +++ b/config.yaml @@ -215,6 +215,39 @@ dashboard_cache: # 异步刷新超时(秒) stats_refresh_timeout_seconds: 30 +# ============================================================================= +# Dashboard Aggregation Configuration +# 仪表盘预聚合配置(重启生效) +# ============================================================================= +dashboard_aggregation: + # Enable aggregation job + # 启用聚合作业 + enabled: true + # Refresh interval (seconds) + # 刷新间隔(秒) + interval_seconds: 60 + # Lookback window (seconds) for late-arriving data + # 回看窗口(秒),处理迟到数据 + lookback_seconds: 120 + # Allow manual backfill + # 允许手动回填 + backfill_enabled: false + # Recompute recent N days on startup + # 启动时重算最近 N 天 + recompute_days: 2 + # Retention windows (days) + # 保留窗口(天) + retention: + # Raw usage_logs retention + # 原始 usage_logs 保留天数 + usage_logs_days: 90 + # Hourly aggregation retention + # 小时聚合保留天数 + hourly_days: 180 + # Daily aggregation retention + # 日聚合保留天数 + daily_days: 730 + # ============================================================================= # Concurrency Wait Configuration # 并发等待配置 diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 7083f9e9..460606ab 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -215,6 +215,39 @@ dashboard_cache: # 异步刷新超时(秒) stats_refresh_timeout_seconds: 30 +# ============================================================================= +# Dashboard Aggregation Configuration +# 仪表盘预聚合配置(重启生效) +# ============================================================================= +dashboard_aggregation: + # Enable aggregation job + # 启用聚合作业 + enabled: true + # Refresh interval (seconds) + # 刷新间隔(秒) + interval_seconds: 60 + # Lookback window (seconds) for late-arriving data + # 回看窗口(秒),处理迟到数据 + lookback_seconds: 120 + 
# Allow manual backfill + # 允许手动回填 + backfill_enabled: false + # Recompute recent N days on startup + # 启动时重算最近 N 天 + recompute_days: 2 + # Retention windows (days) + # 保留窗口(天) + retention: + # Raw usage_logs retention + # 原始 usage_logs 保留天数 + usage_logs_days: 90 + # Hourly aggregation retention + # 小时聚合保留天数 + hourly_days: 180 + # Daily aggregation retention + # 日聚合保留天数 + daily_days: 730 + # ============================================================================= # Concurrency Wait Configuration # 并发等待配置 diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 3d1b17f6..98718b19 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -651,6 +651,9 @@ export interface DashboardStats { total_users: number today_new_users: number // 今日新增用户数 active_users: number // 今日有请求的用户数 + hourly_active_users: number // 当前小时活跃用户数(UTC) + stats_updated_at: string // 统计更新时间(UTC RFC3339) + stats_stale: boolean // 统计是否过期 // API Key 统计 total_api_keys: number