From c2c865b0cbace797a295d4ded6b0806d328b815a Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sun, 11 Jan 2026 10:07:03 +0800 Subject: [PATCH 1/6] =?UTF-8?q?perf(=E4=BB=AA=E8=A1=A8=E7=9B=98):=20?= =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E7=BB=9F=E8=AE=A1=E7=BC=93=E5=AD=98=E4=B8=8E?= =?UTF-8?q?=E9=9A=94=E7=A6=BB=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增仪表盘缓存开关与 TTL 配置,支持 Redis key 前缀隔离,并补充单测与校验。 测试: make test-backend --- backend/cmd/server/wire_gen.go | 3 +- backend/internal/config/config.go | 47 +++++ backend/internal/config/config_test.go | 64 ++++++ .../internal/repository/dashboard_cache.go | 51 +++++ backend/internal/repository/wire.go | 1 + backend/internal/service/dashboard_service.go | 146 +++++++++++++- .../service/dashboard_service_test.go | 189 ++++++++++++++++++ config.yaml | 21 ++ deploy/config.example.yaml | 21 ++ 9 files changed, 536 insertions(+), 7 deletions(-) create mode 100644 backend/internal/repository/dashboard_cache.go create mode 100644 backend/internal/service/dashboard_service_test.go diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 95a7b30b..4fb8351e 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -75,7 +75,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { redeemService := service.NewRedeemService(redeemCodeRepository, userRepository, subscriptionService, redeemCache, billingCacheService, client, apiKeyAuthCacheInvalidator) redeemHandler := handler.NewRedeemHandler(redeemService) subscriptionHandler := handler.NewSubscriptionHandler(subscriptionService) - dashboardService := service.NewDashboardService(usageLogRepository) + dashboardStatsCache := repository.NewDashboardCache(redisClient, configConfig) + dashboardService := service.NewDashboardService(usageLogRepository, dashboardStatsCache, configConfig) dashboardHandler := admin.NewDashboardHandler(dashboardService) 
accountRepository := repository.NewAccountRepository(client, db) proxyRepository := repository.NewProxyRepository(client, db) diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 29eaa42e..677d0c6e 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -50,6 +50,7 @@ type Config struct { Pricing PricingConfig `mapstructure:"pricing"` Gateway GatewayConfig `mapstructure:"gateway"` APIKeyAuth APIKeyAuthCacheConfig `mapstructure:"api_key_auth_cache"` + Dashboard DashboardCacheConfig `mapstructure:"dashboard_cache"` Concurrency ConcurrencyConfig `mapstructure:"concurrency"` TokenRefresh TokenRefreshConfig `mapstructure:"token_refresh"` RunMode string `mapstructure:"run_mode" yaml:"run_mode"` @@ -372,6 +373,20 @@ type APIKeyAuthCacheConfig struct { Singleflight bool `mapstructure:"singleflight"` } +// DashboardCacheConfig 仪表盘统计缓存配置 +type DashboardCacheConfig struct { + // Enabled: 是否启用仪表盘缓存 + Enabled bool `mapstructure:"enabled"` + // KeyPrefix: Redis key 前缀,用于多环境隔离 + KeyPrefix string `mapstructure:"key_prefix"` + // StatsFreshTTLSeconds: 缓存命中认为“新鲜”的时间窗口(秒) + StatsFreshTTLSeconds int `mapstructure:"stats_fresh_ttl_seconds"` + // StatsTTLSeconds: Redis 缓存总 TTL(秒) + StatsTTLSeconds int `mapstructure:"stats_ttl_seconds"` + // StatsRefreshTimeoutSeconds: 异步刷新超时(秒) + StatsRefreshTimeoutSeconds int `mapstructure:"stats_refresh_timeout_seconds"` +} + func NormalizeRunMode(value string) string { normalized := strings.ToLower(strings.TrimSpace(value)) switch normalized { @@ -437,6 +452,7 @@ func Load() (*Config, error) { cfg.LinuxDo.UserInfoEmailPath = strings.TrimSpace(cfg.LinuxDo.UserInfoEmailPath) cfg.LinuxDo.UserInfoIDPath = strings.TrimSpace(cfg.LinuxDo.UserInfoIDPath) cfg.LinuxDo.UserInfoUsernamePath = strings.TrimSpace(cfg.LinuxDo.UserInfoUsernamePath) + cfg.Dashboard.KeyPrefix = strings.TrimSpace(cfg.Dashboard.KeyPrefix) cfg.CORS.AllowedOrigins = normalizeStringSlice(cfg.CORS.AllowedOrigins) 
cfg.Security.ResponseHeaders.AdditionalAllowed = normalizeStringSlice(cfg.Security.ResponseHeaders.AdditionalAllowed) cfg.Security.ResponseHeaders.ForceRemove = normalizeStringSlice(cfg.Security.ResponseHeaders.ForceRemove) @@ -674,6 +690,13 @@ func setDefaults() { viper.SetDefault("api_key_auth_cache.jitter_percent", 10) viper.SetDefault("api_key_auth_cache.singleflight", true) + // Dashboard cache + viper.SetDefault("dashboard_cache.enabled", true) + viper.SetDefault("dashboard_cache.key_prefix", "sub2api:") + viper.SetDefault("dashboard_cache.stats_fresh_ttl_seconds", 15) + viper.SetDefault("dashboard_cache.stats_ttl_seconds", 30) + viper.SetDefault("dashboard_cache.stats_refresh_timeout_seconds", 30) + // Gateway viper.SetDefault("gateway.response_header_timeout", 600) // 600秒(10分钟)等待上游响应头,LLM高负载时可能排队较久 viper.SetDefault("gateway.log_upstream_error_body", false) @@ -832,6 +855,30 @@ func (c *Config) Validate() error { if c.Redis.MinIdleConns > c.Redis.PoolSize { return fmt.Errorf("redis.min_idle_conns cannot exceed redis.pool_size") } + if c.Dashboard.Enabled { + if c.Dashboard.StatsFreshTTLSeconds <= 0 { + return fmt.Errorf("dashboard_cache.stats_fresh_ttl_seconds must be positive") + } + if c.Dashboard.StatsTTLSeconds <= 0 { + return fmt.Errorf("dashboard_cache.stats_ttl_seconds must be positive") + } + if c.Dashboard.StatsRefreshTimeoutSeconds <= 0 { + return fmt.Errorf("dashboard_cache.stats_refresh_timeout_seconds must be positive") + } + if c.Dashboard.StatsFreshTTLSeconds > c.Dashboard.StatsTTLSeconds { + return fmt.Errorf("dashboard_cache.stats_fresh_ttl_seconds must be <= dashboard_cache.stats_ttl_seconds") + } + } else { + if c.Dashboard.StatsFreshTTLSeconds < 0 { + return fmt.Errorf("dashboard_cache.stats_fresh_ttl_seconds must be non-negative") + } + if c.Dashboard.StatsTTLSeconds < 0 { + return fmt.Errorf("dashboard_cache.stats_ttl_seconds must be non-negative") + } + if c.Dashboard.StatsRefreshTimeoutSeconds < 0 { + return 
fmt.Errorf("dashboard_cache.stats_refresh_timeout_seconds must be non-negative") + } + } if c.Gateway.MaxBodySize <= 0 { return fmt.Errorf("gateway.max_body_size must be positive") } diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index a39d41f9..6cd95b1c 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -141,3 +141,67 @@ func TestValidateLinuxDoPKCERequiredForPublicClient(t *testing.T) { t.Fatalf("Validate() expected use_pkce error, got: %v", err) } } + +func TestLoadDefaultDashboardCacheConfig(t *testing.T) { + viper.Reset() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + if !cfg.Dashboard.Enabled { + t.Fatalf("Dashboard.Enabled = false, want true") + } + if cfg.Dashboard.KeyPrefix != "sub2api:" { + t.Fatalf("Dashboard.KeyPrefix = %q, want %q", cfg.Dashboard.KeyPrefix, "sub2api:") + } + if cfg.Dashboard.StatsFreshTTLSeconds != 15 { + t.Fatalf("Dashboard.StatsFreshTTLSeconds = %d, want 15", cfg.Dashboard.StatsFreshTTLSeconds) + } + if cfg.Dashboard.StatsTTLSeconds != 30 { + t.Fatalf("Dashboard.StatsTTLSeconds = %d, want 30", cfg.Dashboard.StatsTTLSeconds) + } + if cfg.Dashboard.StatsRefreshTimeoutSeconds != 30 { + t.Fatalf("Dashboard.StatsRefreshTimeoutSeconds = %d, want 30", cfg.Dashboard.StatsRefreshTimeoutSeconds) + } +} + +func TestValidateDashboardCacheConfigEnabled(t *testing.T) { + viper.Reset() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + cfg.Dashboard.Enabled = true + cfg.Dashboard.StatsFreshTTLSeconds = 10 + cfg.Dashboard.StatsTTLSeconds = 5 + err = cfg.Validate() + if err == nil { + t.Fatalf("Validate() expected error for stats_fresh_ttl_seconds > stats_ttl_seconds, got nil") + } + if !strings.Contains(err.Error(), "dashboard_cache.stats_fresh_ttl_seconds") { + t.Fatalf("Validate() expected stats_fresh_ttl_seconds error, got: %v", err) + } +} + +func 
TestValidateDashboardCacheConfigDisabled(t *testing.T) { + viper.Reset() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + cfg.Dashboard.Enabled = false + cfg.Dashboard.StatsTTLSeconds = -1 + err = cfg.Validate() + if err == nil { + t.Fatalf("Validate() expected error for negative stats_ttl_seconds, got nil") + } + if !strings.Contains(err.Error(), "dashboard_cache.stats_ttl_seconds") { + t.Fatalf("Validate() expected stats_ttl_seconds error, got: %v", err) + } +} diff --git a/backend/internal/repository/dashboard_cache.go b/backend/internal/repository/dashboard_cache.go new file mode 100644 index 00000000..ec6ef25c --- /dev/null +++ b/backend/internal/repository/dashboard_cache.go @@ -0,0 +1,51 @@ +package repository + +import ( + "context" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/redis/go-redis/v9" +) + +const dashboardStatsCacheKey = "dashboard:stats:v1" + +type dashboardCache struct { + rdb *redis.Client + keyPrefix string +} + +func NewDashboardCache(rdb *redis.Client, cfg *config.Config) service.DashboardStatsCache { + prefix := "sub2api:" + if cfg != nil { + prefix = strings.TrimSpace(cfg.Dashboard.KeyPrefix) + } + return &dashboardCache{ + rdb: rdb, + keyPrefix: prefix, + } +} + +func (c *dashboardCache) GetDashboardStats(ctx context.Context) (string, error) { + val, err := c.rdb.Get(ctx, c.buildKey()).Result() + if err != nil { + if err == redis.Nil { + return "", service.ErrDashboardStatsCacheMiss + } + return "", err + } + return val, nil +} + +func (c *dashboardCache) SetDashboardStats(ctx context.Context, data string, ttl time.Duration) error { + return c.rdb.Set(ctx, c.buildKey(), data, ttl).Err() +} + +func (c *dashboardCache) buildKey() string { + if c.keyPrefix == "" { + return dashboardStatsCacheKey + } + return c.keyPrefix + dashboardStatsCacheKey +} diff --git a/backend/internal/repository/wire.go 
b/backend/internal/repository/wire.go index 0a6118e2..1b6a7b91 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -58,6 +58,7 @@ var ProviderSet = wire.NewSet( NewAPIKeyCache, NewTempUnschedCache, ProvideConcurrencyCache, + NewDashboardCache, NewEmailCache, NewIdentityCache, NewRedeemCache, diff --git a/backend/internal/service/dashboard_service.go b/backend/internal/service/dashboard_service.go index f0b1f2a0..f56480d3 100644 --- a/backend/internal/service/dashboard_service.go +++ b/backend/internal/service/dashboard_service.go @@ -2,25 +2,89 @@ package service import ( "context" + "encoding/json" + "errors" "fmt" + "log" + "sync/atomic" "time" + "github.com/Wei-Shaw/sub2api/internal/config" "github.com/Wei-Shaw/sub2api/internal/pkg/usagestats" ) -// DashboardService provides aggregated statistics for admin dashboard. -type DashboardService struct { - usageRepo UsageLogRepository +const ( + defaultDashboardStatsFreshTTL = 15 * time.Second + defaultDashboardStatsCacheTTL = 30 * time.Second + defaultDashboardStatsRefreshTimeout = 30 * time.Second +) + +// ErrDashboardStatsCacheMiss 标记仪表盘缓存未命中。 +var ErrDashboardStatsCacheMiss = errors.New("仪表盘缓存未命中") + +// DashboardStatsCache 定义仪表盘统计缓存接口。 +type DashboardStatsCache interface { + GetDashboardStats(ctx context.Context) (string, error) + SetDashboardStats(ctx context.Context, data string, ttl time.Duration) error } -func NewDashboardService(usageRepo UsageLogRepository) *DashboardService { +type dashboardStatsCacheEntry struct { + Stats *usagestats.DashboardStats `json:"stats"` + UpdatedAt int64 `json:"updated_at"` +} + +// DashboardService provides aggregated statistics for admin dashboard. 
+type DashboardService struct { + usageRepo UsageLogRepository + cache DashboardStatsCache + cacheFreshTTL time.Duration + cacheTTL time.Duration + refreshTimeout time.Duration + refreshing int32 +} + +func NewDashboardService(usageRepo UsageLogRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService { + freshTTL := defaultDashboardStatsFreshTTL + cacheTTL := defaultDashboardStatsCacheTTL + refreshTimeout := defaultDashboardStatsRefreshTimeout + if cfg != nil { + if !cfg.Dashboard.Enabled { + cache = nil + } + if cfg.Dashboard.StatsFreshTTLSeconds > 0 { + freshTTL = time.Duration(cfg.Dashboard.StatsFreshTTLSeconds) * time.Second + } + if cfg.Dashboard.StatsTTLSeconds > 0 { + cacheTTL = time.Duration(cfg.Dashboard.StatsTTLSeconds) * time.Second + } + if cfg.Dashboard.StatsRefreshTimeoutSeconds > 0 { + refreshTimeout = time.Duration(cfg.Dashboard.StatsRefreshTimeoutSeconds) * time.Second + } + } return &DashboardService{ - usageRepo: usageRepo, + usageRepo: usageRepo, + cache: cache, + cacheFreshTTL: freshTTL, + cacheTTL: cacheTTL, + refreshTimeout: refreshTimeout, } } func (s *DashboardService) GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) { - stats, err := s.usageRepo.GetDashboardStats(ctx) + if s.cache != nil { + cached, fresh, err := s.getCachedDashboardStats(ctx) + if err == nil && cached != nil { + if !fresh { + s.refreshDashboardStatsAsync() + } + return cached, nil + } + if err != nil && !errors.Is(err, ErrDashboardStatsCacheMiss) { + log.Printf("[Dashboard] 仪表盘缓存读取失败: %v", err) + } + } + + stats, err := s.refreshDashboardStats(ctx) if err != nil { return nil, fmt.Errorf("get dashboard stats: %w", err) } @@ -43,6 +107,76 @@ func (s *DashboardService) GetModelStatsWithFilters(ctx context.Context, startTi return stats, nil } +func (s *DashboardService) getCachedDashboardStats(ctx context.Context) (*usagestats.DashboardStats, bool, error) { + data, err := s.cache.GetDashboardStats(ctx) + if err != nil { + return 
nil, false, err + } + + var entry dashboardStatsCacheEntry + if err := json.Unmarshal([]byte(data), &entry); err != nil { + return nil, false, err + } + if entry.Stats == nil { + return nil, false, errors.New("仪表盘缓存缺少统计数据") + } + + age := time.Since(time.Unix(entry.UpdatedAt, 0)) + return entry.Stats, age <= s.cacheFreshTTL, nil +} + +func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) { + stats, err := s.usageRepo.GetDashboardStats(ctx) + if err != nil { + return nil, err + } + s.saveDashboardStatsCache(ctx, stats) + return stats, nil +} + +func (s *DashboardService) refreshDashboardStatsAsync() { + if s.cache == nil { + return + } + if !atomic.CompareAndSwapInt32(&s.refreshing, 0, 1) { + return + } + + go func() { + defer atomic.StoreInt32(&s.refreshing, 0) + + ctx, cancel := context.WithTimeout(context.Background(), s.refreshTimeout) + defer cancel() + + stats, err := s.usageRepo.GetDashboardStats(ctx) + if err != nil { + log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err) + return + } + s.saveDashboardStatsCache(ctx, stats) + }() +} + +func (s *DashboardService) saveDashboardStatsCache(ctx context.Context, stats *usagestats.DashboardStats) { + if s.cache == nil || stats == nil { + return + } + + entry := dashboardStatsCacheEntry{ + Stats: stats, + UpdatedAt: time.Now().Unix(), + } + data, err := json.Marshal(entry) + if err != nil { + log.Printf("[Dashboard] 仪表盘缓存序列化失败: %v", err) + return + } + + if err := s.cache.SetDashboardStats(ctx, string(data), s.cacheTTL); err != nil { + log.Printf("[Dashboard] 仪表盘缓存写入失败: %v", err) + } +} + func (s *DashboardService) GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error) { trend, err := s.usageRepo.GetAPIKeyUsageTrend(ctx, startTime, endTime, granularity, limit) if err != nil { diff --git a/backend/internal/service/dashboard_service_test.go 
b/backend/internal/service/dashboard_service_test.go new file mode 100644 index 00000000..21d7b580 --- /dev/null +++ b/backend/internal/service/dashboard_service_test.go @@ -0,0 +1,189 @@ +package service + +import ( + "context" + "encoding/json" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/Wei-Shaw/sub2api/internal/pkg/usagestats" + "github.com/stretchr/testify/require" +) + +type usageRepoStub struct { + UsageLogRepository + stats *usagestats.DashboardStats + err error + calls int32 + onCall chan struct{} +} + +func (s *usageRepoStub) GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) { + atomic.AddInt32(&s.calls, 1) + if s.onCall != nil { + select { + case s.onCall <- struct{}{}: + default: + } + } + if s.err != nil { + return nil, s.err + } + return s.stats, nil +} + +type dashboardCacheStub struct { + get func(ctx context.Context) (string, error) + set func(ctx context.Context, data string, ttl time.Duration) error + getCalls int32 + setCalls int32 + lastSetMu sync.Mutex + lastSet string +} + +func (c *dashboardCacheStub) GetDashboardStats(ctx context.Context) (string, error) { + atomic.AddInt32(&c.getCalls, 1) + if c.get != nil { + return c.get(ctx) + } + return "", ErrDashboardStatsCacheMiss +} + +func (c *dashboardCacheStub) SetDashboardStats(ctx context.Context, data string, ttl time.Duration) error { + atomic.AddInt32(&c.setCalls, 1) + c.lastSetMu.Lock() + c.lastSet = data + c.lastSetMu.Unlock() + if c.set != nil { + return c.set(ctx, data, ttl) + } + return nil +} + +func (c *dashboardCacheStub) readLastEntry(t *testing.T) dashboardStatsCacheEntry { + t.Helper() + c.lastSetMu.Lock() + data := c.lastSet + c.lastSetMu.Unlock() + + var entry dashboardStatsCacheEntry + err := json.Unmarshal([]byte(data), &entry) + require.NoError(t, err) + return entry +} + +func TestDashboardService_CacheHitFresh(t *testing.T) { + stats := &usagestats.DashboardStats{ + TotalUsers: 10, + 
} + entry := dashboardStatsCacheEntry{ + Stats: stats, + UpdatedAt: time.Now().Unix(), + } + payload, err := json.Marshal(entry) + require.NoError(t, err) + + cache := &dashboardCacheStub{ + get: func(ctx context.Context) (string, error) { + return string(payload), nil + }, + } + repo := &usageRepoStub{ + stats: &usagestats.DashboardStats{TotalUsers: 99}, + } + cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + svc := NewDashboardService(repo, cache, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, stats, got) + require.Equal(t, int32(0), atomic.LoadInt32(&repo.calls)) + require.Equal(t, int32(1), atomic.LoadInt32(&cache.getCalls)) + require.Equal(t, int32(0), atomic.LoadInt32(&cache.setCalls)) +} + +func TestDashboardService_CacheMiss_StoresCache(t *testing.T) { + stats := &usagestats.DashboardStats{ + TotalUsers: 7, + } + cache := &dashboardCacheStub{ + get: func(ctx context.Context) (string, error) { + return "", ErrDashboardStatsCacheMiss + }, + } + repo := &usageRepoStub{stats: stats} + cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + svc := NewDashboardService(repo, cache, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, stats, got) + require.Equal(t, int32(1), atomic.LoadInt32(&repo.calls)) + require.Equal(t, int32(1), atomic.LoadInt32(&cache.getCalls)) + require.Equal(t, int32(1), atomic.LoadInt32(&cache.setCalls)) + entry := cache.readLastEntry(t) + require.Equal(t, stats, entry.Stats) + require.WithinDuration(t, time.Now(), time.Unix(entry.UpdatedAt, 0), time.Second) +} + +func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) { + stats := &usagestats.DashboardStats{ + TotalUsers: 3, + } + cache := &dashboardCacheStub{ + get: func(ctx context.Context) (string, error) { + return "", nil + }, + } + repo := &usageRepoStub{stats: stats} + cfg := &config.Config{Dashboard: 
config.DashboardCacheConfig{Enabled: false}} + svc := NewDashboardService(repo, cache, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, stats, got) + require.Equal(t, int32(1), atomic.LoadInt32(&repo.calls)) + require.Equal(t, int32(0), atomic.LoadInt32(&cache.getCalls)) + require.Equal(t, int32(0), atomic.LoadInt32(&cache.setCalls)) +} + +func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) { + staleStats := &usagestats.DashboardStats{ + TotalUsers: 11, + } + entry := dashboardStatsCacheEntry{ + Stats: staleStats, + UpdatedAt: time.Now().Add(-defaultDashboardStatsFreshTTL * 2).Unix(), + } + payload, err := json.Marshal(entry) + require.NoError(t, err) + + cache := &dashboardCacheStub{ + get: func(ctx context.Context) (string, error) { + return string(payload), nil + }, + } + refreshCh := make(chan struct{}, 1) + repo := &usageRepoStub{ + stats: &usagestats.DashboardStats{TotalUsers: 22}, + onCall: refreshCh, + } + cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + svc := NewDashboardService(repo, cache, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, staleStats, got) + + select { + case <-refreshCh: + case <-time.After(1 * time.Second): + t.Fatal("等待异步刷新超时") + } + require.Eventually(t, func() bool { + return atomic.LoadInt32(&cache.setCalls) >= 1 + }, 1*time.Second, 10*time.Millisecond) +} diff --git a/config.yaml b/config.yaml index ecd7dfc2..ffc070a0 100644 --- a/config.yaml +++ b/config.yaml @@ -194,6 +194,27 @@ api_key_auth_cache: # 缓存未命中时启用 singleflight 合并回源 singleflight: true +# ============================================================================= +# Dashboard Cache Configuration +# 仪表盘缓存配置 +# ============================================================================= +dashboard_cache: + # Enable dashboard cache + # 启用仪表盘缓存 + enabled: true + # Redis key prefix for 
multi-environment isolation + # Redis key 前缀,用于多环境隔离 + key_prefix: "sub2api:" + # Fresh TTL (seconds); within this window cached stats are considered fresh + # 新鲜阈值(秒);命中后处于该窗口视为新鲜数据 + stats_fresh_ttl_seconds: 15 + # Cache TTL (seconds) stored in Redis + # Redis 缓存 TTL(秒) + stats_ttl_seconds: 30 + # Async refresh timeout (seconds) + # 异步刷新超时(秒) + stats_refresh_timeout_seconds: 30 + # ============================================================================= # Concurrency Wait Configuration # 并发等待配置 diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 87abffa0..7083f9e9 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -194,6 +194,27 @@ api_key_auth_cache: # 缓存未命中时启用 singleflight 合并回源 singleflight: true +# ============================================================================= +# Dashboard Cache Configuration +# 仪表盘缓存配置 +# ============================================================================= +dashboard_cache: + # Enable dashboard cache + # 启用仪表盘缓存 + enabled: true + # Redis key prefix for multi-environment isolation + # Redis key 前缀,用于多环境隔离 + key_prefix: "sub2api:" + # Fresh TTL (seconds); within this window cached stats are considered fresh + # 新鲜阈值(秒);命中后处于该窗口视为新鲜数据 + stats_fresh_ttl_seconds: 15 + # Cache TTL (seconds) stored in Redis + # Redis 缓存 TTL(秒) + stats_ttl_seconds: 30 + # Async refresh timeout (seconds) + # 异步刷新超时(秒) + stats_refresh_timeout_seconds: 30 + # ============================================================================= # Concurrency Wait Configuration # 并发等待配置 From ab5839b461e82bb4784394a0d6be6a17243d0fab Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sun, 11 Jan 2026 15:00:16 +0800 Subject: [PATCH 2/6] =?UTF-8?q?fix(=E4=BB=AA=E8=A1=A8=E7=9B=98):=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=BC=93=E5=AD=98=E7=A8=B3=E5=AE=9A=E6=80=A7?= =?UTF-8?q?=E5=B9=B6=E8=A1=A5=E5=85=85=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 
.../internal/repository/dashboard_cache.go | 7 +++ .../repository/dashboard_cache_test.go | 28 ++++++++++++ backend/internal/service/dashboard_service.go | 34 ++++++++++++-- .../service/dashboard_service_test.go | 44 +++++++++++++++++++ 4 files changed, 109 insertions(+), 4 deletions(-) create mode 100644 backend/internal/repository/dashboard_cache_test.go diff --git a/backend/internal/repository/dashboard_cache.go b/backend/internal/repository/dashboard_cache.go index ec6ef25c..f996cd68 100644 --- a/backend/internal/repository/dashboard_cache.go +++ b/backend/internal/repository/dashboard_cache.go @@ -22,6 +22,9 @@ func NewDashboardCache(rdb *redis.Client, cfg *config.Config) service.DashboardS if cfg != nil { prefix = strings.TrimSpace(cfg.Dashboard.KeyPrefix) } + if prefix != "" && !strings.HasSuffix(prefix, ":") { + prefix += ":" + } return &dashboardCache{ rdb: rdb, keyPrefix: prefix, @@ -49,3 +52,7 @@ func (c *dashboardCache) buildKey() string { } return c.keyPrefix + dashboardStatsCacheKey } + +func (c *dashboardCache) DeleteDashboardStats(ctx context.Context) error { + return c.rdb.Del(ctx, c.buildKey()).Err() +} diff --git a/backend/internal/repository/dashboard_cache_test.go b/backend/internal/repository/dashboard_cache_test.go new file mode 100644 index 00000000..3bb0da4f --- /dev/null +++ b/backend/internal/repository/dashboard_cache_test.go @@ -0,0 +1,28 @@ +package repository + +import ( + "testing" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/stretchr/testify/require" +) + +func TestNewDashboardCacheKeyPrefix(t *testing.T) { + cache := NewDashboardCache(nil, &config.Config{ + Dashboard: config.DashboardCacheConfig{ + KeyPrefix: "prod", + }, + }) + impl, ok := cache.(*dashboardCache) + require.True(t, ok) + require.Equal(t, "prod:", impl.keyPrefix) + + cache = NewDashboardCache(nil, &config.Config{ + Dashboard: config.DashboardCacheConfig{ + KeyPrefix: "staging:", + }, + }) + impl, ok = cache.(*dashboardCache) + require.True(t, ok) 
+ require.Equal(t, "staging:", impl.keyPrefix) +} diff --git a/backend/internal/service/dashboard_service.go b/backend/internal/service/dashboard_service.go index f56480d3..468135e3 100644 --- a/backend/internal/service/dashboard_service.go +++ b/backend/internal/service/dashboard_service.go @@ -26,6 +26,7 @@ var ErrDashboardStatsCacheMiss = errors.New("仪表盘缓存未命中") type DashboardStatsCache interface { GetDashboardStats(ctx context.Context) (string, error) SetDashboardStats(ctx context.Context, data string, ttl time.Duration) error + DeleteDashboardStats(ctx context.Context) error } type dashboardStatsCacheEntry struct { @@ -115,10 +116,12 @@ func (s *DashboardService) getCachedDashboardStats(ctx context.Context) (*usages var entry dashboardStatsCacheEntry if err := json.Unmarshal([]byte(data), &entry); err != nil { - return nil, false, err + s.evictDashboardStatsCache(err) + return nil, false, ErrDashboardStatsCacheMiss } if entry.Stats == nil { - return nil, false, errors.New("仪表盘缓存缺少统计数据") + s.evictDashboardStatsCache(errors.New("仪表盘缓存缺少统计数据")) + return nil, false, ErrDashboardStatsCacheMiss } age := time.Since(time.Unix(entry.UpdatedAt, 0)) @@ -130,7 +133,9 @@ func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagesta if err != nil { return nil, err } - s.saveDashboardStatsCache(ctx, stats) + cacheCtx, cancel := s.cacheOperationContext() + defer cancel() + s.saveDashboardStatsCache(cacheCtx, stats) return stats, nil } @@ -153,7 +158,9 @@ func (s *DashboardService) refreshDashboardStatsAsync() { log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err) return } - s.saveDashboardStatsCache(ctx, stats) + cacheCtx, cancel := s.cacheOperationContext() + defer cancel() + s.saveDashboardStatsCache(cacheCtx, stats) }() } @@ -177,6 +184,25 @@ func (s *DashboardService) saveDashboardStatsCache(ctx context.Context, stats *u } } +func (s *DashboardService) evictDashboardStatsCache(reason error) { + if s.cache == nil { + return + } + cacheCtx, cancel := 
s.cacheOperationContext() + defer cancel() + + if err := s.cache.DeleteDashboardStats(cacheCtx); err != nil { + log.Printf("[Dashboard] 仪表盘缓存清理失败: %v", err) + } + if reason != nil { + log.Printf("[Dashboard] 仪表盘缓存异常,已清理: %v", reason) + } +} + +func (s *DashboardService) cacheOperationContext() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), s.refreshTimeout) +} + func (s *DashboardService) GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error) { trend, err := s.usageRepo.GetAPIKeyUsageTrend(ctx, startTime, endTime, granularity, limit) if err != nil { diff --git a/backend/internal/service/dashboard_service_test.go b/backend/internal/service/dashboard_service_test.go index 21d7b580..17f46ead 100644 --- a/backend/internal/service/dashboard_service_test.go +++ b/backend/internal/service/dashboard_service_test.go @@ -3,6 +3,7 @@ package service import ( "context" "encoding/json" + "errors" "sync" "sync/atomic" "testing" @@ -38,8 +39,10 @@ func (s *usageRepoStub) GetDashboardStats(ctx context.Context) (*usagestats.Dash type dashboardCacheStub struct { get func(ctx context.Context) (string, error) set func(ctx context.Context, data string, ttl time.Duration) error + del func(ctx context.Context) error getCalls int32 setCalls int32 + delCalls int32 lastSetMu sync.Mutex lastSet string } @@ -63,6 +66,14 @@ func (c *dashboardCacheStub) SetDashboardStats(ctx context.Context, data string, return nil } +func (c *dashboardCacheStub) DeleteDashboardStats(ctx context.Context) error { + atomic.AddInt32(&c.delCalls, 1) + if c.del != nil { + return c.del(ctx) + } + return nil +} + func (c *dashboardCacheStub) readLastEntry(t *testing.T) dashboardStatsCacheEntry { t.Helper() c.lastSetMu.Lock() @@ -187,3 +198,36 @@ func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) { return atomic.LoadInt32(&cache.setCalls) >= 1 }, 1*time.Second, 
10*time.Millisecond) } + +func TestDashboardService_CacheParseError_EvictsAndRefetches(t *testing.T) { + cache := &dashboardCacheStub{ + get: func(ctx context.Context) (string, error) { + return "not-json", nil + }, + } + stats := &usagestats.DashboardStats{TotalUsers: 9} + repo := &usageRepoStub{stats: stats} + cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + svc := NewDashboardService(repo, cache, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, stats, got) + require.Equal(t, int32(1), atomic.LoadInt32(&cache.delCalls)) + require.Equal(t, int32(1), atomic.LoadInt32(&repo.calls)) +} + +func TestDashboardService_CacheParseError_RepoFailure(t *testing.T) { + cache := &dashboardCacheStub{ + get: func(ctx context.Context) (string, error) { + return "not-json", nil + }, + } + repo := &usageRepoStub{err: errors.New("db down")} + cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + svc := NewDashboardService(repo, cache, cfg) + + _, err := svc.GetDashboardStats(context.Background()) + require.Error(t, err) + require.Equal(t, int32(1), atomic.LoadInt32(&cache.delCalls)) +} From 1a869547d717aeb7f7c2bbd99a55370680ac616f Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sun, 11 Jan 2026 16:01:35 +0800 Subject: [PATCH 3/6] =?UTF-8?q?feat(=E4=BB=AA=E8=A1=A8=E7=9B=98):=20?= =?UTF-8?q?=E5=BC=95=E5=85=A5=E9=A2=84=E8=81=9A=E5=90=88=E7=BB=9F=E8=AE=A1?= =?UTF-8?q?=E4=B8=8E=E8=81=9A=E5=90=88=E4=BD=9C=E4=B8=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/cmd/server/wire_gen.go | 8 +- backend/internal/config/config.go | 115 +++++- backend/internal/config/config_test.go | 53 +++ .../handler/admin/dashboard_handler.go | 61 ++- .../pkg/usagestats/usage_log_types.go | 6 + .../repository/dashboard_aggregation_repo.go | 360 ++++++++++++++++++ backend/internal/repository/usage_log_repo.go | 59 ++- 
.../usage_log_repo_integration_test.go | 154 +++++++- backend/internal/repository/wire.go | 1 + backend/internal/server/routes/admin.go | 1 + .../service/dashboard_aggregation_service.go | 224 +++++++++++ backend/internal/service/dashboard_service.go | 78 +++- .../service/dashboard_service_test.go | 100 ++++- backend/internal/service/wire.go | 8 + ...034_usage_dashboard_aggregation_tables.sql | 77 ++++ .../035_usage_logs_partitioning.sql | 54 +++ config.yaml | 33 ++ deploy/config.example.yaml | 33 ++ frontend/src/types/index.ts | 3 + 19 files changed, 1366 insertions(+), 62 deletions(-) create mode 100644 backend/internal/repository/dashboard_aggregation_repo.go create mode 100644 backend/internal/service/dashboard_aggregation_service.go create mode 100644 backend/migrations/034_usage_dashboard_aggregation_tables.sql create mode 100644 backend/migrations/035_usage_logs_partitioning.sql diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 4fb8351e..e321576e 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -67,6 +67,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { userHandler := handler.NewUserHandler(userService) apiKeyHandler := handler.NewAPIKeyHandler(apiKeyService) usageLogRepository := repository.NewUsageLogRepository(client, db) + dashboardAggregationRepository := repository.NewDashboardAggregationRepository(db) usageService := service.NewUsageService(usageLogRepository, userRepository, client, apiKeyAuthCacheInvalidator) usageHandler := handler.NewUsageHandler(usageService, apiKeyService) redeemCodeRepository := repository.NewRedeemCodeRepository(client) @@ -76,8 +77,10 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { redeemHandler := handler.NewRedeemHandler(redeemService) subscriptionHandler := handler.NewSubscriptionHandler(subscriptionService) dashboardStatsCache := repository.NewDashboardCache(redisClient, configConfig) - 
dashboardService := service.NewDashboardService(usageLogRepository, dashboardStatsCache, configConfig) - dashboardHandler := admin.NewDashboardHandler(dashboardService) + timingWheelService := service.ProvideTimingWheelService() + dashboardAggregationService := service.ProvideDashboardAggregationService(dashboardAggregationRepository, timingWheelService, configConfig) + dashboardService := service.NewDashboardService(usageLogRepository, dashboardAggregationRepository, dashboardStatsCache, configConfig) + dashboardHandler := admin.NewDashboardHandler(dashboardService, dashboardAggregationService) accountRepository := repository.NewAccountRepository(client, db) proxyRepository := repository.NewProxyRepository(client, db) proxyExitInfoProber := repository.NewProxyExitInfoProber(configConfig) @@ -138,7 +141,6 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { billingService := service.NewBillingService(configConfig, pricingService) identityCache := repository.NewIdentityCache(redisClient) identityService := service.NewIdentityService(identityCache) - timingWheelService := service.ProvideTimingWheelService() deferredService := service.ProvideDeferredService(accountRepository, timingWheelService) gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, userRepository, userSubscriptionRepository, gatewayCache, configConfig, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService) geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig) diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 677d0c6e..b91a07c1 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -36,27 +36,28 @@ const ( ) type Config struct { - 
Server ServerConfig `mapstructure:"server"` - CORS CORSConfig `mapstructure:"cors"` - Security SecurityConfig `mapstructure:"security"` - Billing BillingConfig `mapstructure:"billing"` - Turnstile TurnstileConfig `mapstructure:"turnstile"` - Database DatabaseConfig `mapstructure:"database"` - Redis RedisConfig `mapstructure:"redis"` - JWT JWTConfig `mapstructure:"jwt"` - LinuxDo LinuxDoConnectConfig `mapstructure:"linuxdo_connect"` - Default DefaultConfig `mapstructure:"default"` - RateLimit RateLimitConfig `mapstructure:"rate_limit"` - Pricing PricingConfig `mapstructure:"pricing"` - Gateway GatewayConfig `mapstructure:"gateway"` - APIKeyAuth APIKeyAuthCacheConfig `mapstructure:"api_key_auth_cache"` - Dashboard DashboardCacheConfig `mapstructure:"dashboard_cache"` - Concurrency ConcurrencyConfig `mapstructure:"concurrency"` - TokenRefresh TokenRefreshConfig `mapstructure:"token_refresh"` - RunMode string `mapstructure:"run_mode" yaml:"run_mode"` - Timezone string `mapstructure:"timezone"` // e.g. 
"Asia/Shanghai", "UTC" - Gemini GeminiConfig `mapstructure:"gemini"` - Update UpdateConfig `mapstructure:"update"` + Server ServerConfig `mapstructure:"server"` + CORS CORSConfig `mapstructure:"cors"` + Security SecurityConfig `mapstructure:"security"` + Billing BillingConfig `mapstructure:"billing"` + Turnstile TurnstileConfig `mapstructure:"turnstile"` + Database DatabaseConfig `mapstructure:"database"` + Redis RedisConfig `mapstructure:"redis"` + JWT JWTConfig `mapstructure:"jwt"` + LinuxDo LinuxDoConnectConfig `mapstructure:"linuxdo_connect"` + Default DefaultConfig `mapstructure:"default"` + RateLimit RateLimitConfig `mapstructure:"rate_limit"` + Pricing PricingConfig `mapstructure:"pricing"` + Gateway GatewayConfig `mapstructure:"gateway"` + APIKeyAuth APIKeyAuthCacheConfig `mapstructure:"api_key_auth_cache"` + Dashboard DashboardCacheConfig `mapstructure:"dashboard_cache"` + DashboardAgg DashboardAggregationConfig `mapstructure:"dashboard_aggregation"` + Concurrency ConcurrencyConfig `mapstructure:"concurrency"` + TokenRefresh TokenRefreshConfig `mapstructure:"token_refresh"` + RunMode string `mapstructure:"run_mode" yaml:"run_mode"` + Timezone string `mapstructure:"timezone"` // e.g. 
"Asia/Shanghai", "UTC" + Gemini GeminiConfig `mapstructure:"gemini"` + Update UpdateConfig `mapstructure:"update"` } // UpdateConfig 在线更新相关配置 @@ -387,6 +388,29 @@ type DashboardCacheConfig struct { StatsRefreshTimeoutSeconds int `mapstructure:"stats_refresh_timeout_seconds"` } +// DashboardAggregationConfig 仪表盘预聚合配置 +type DashboardAggregationConfig struct { + // Enabled: 是否启用预聚合作业 + Enabled bool `mapstructure:"enabled"` + // IntervalSeconds: 聚合刷新间隔(秒) + IntervalSeconds int `mapstructure:"interval_seconds"` + // LookbackSeconds: 回看窗口(秒) + LookbackSeconds int `mapstructure:"lookback_seconds"` + // BackfillEnabled: 是否允许全量回填 + BackfillEnabled bool `mapstructure:"backfill_enabled"` + // Retention: 各表保留窗口(天) + Retention DashboardAggregationRetentionConfig `mapstructure:"retention"` + // RecomputeDays: 启动时重算最近 N 天 + RecomputeDays int `mapstructure:"recompute_days"` +} + +// DashboardAggregationRetentionConfig 预聚合保留窗口 +type DashboardAggregationRetentionConfig struct { + UsageLogsDays int `mapstructure:"usage_logs_days"` + HourlyDays int `mapstructure:"hourly_days"` + DailyDays int `mapstructure:"daily_days"` +} + func NormalizeRunMode(value string) string { normalized := strings.ToLower(strings.TrimSpace(value)) switch normalized { @@ -697,6 +721,16 @@ func setDefaults() { viper.SetDefault("dashboard_cache.stats_ttl_seconds", 30) viper.SetDefault("dashboard_cache.stats_refresh_timeout_seconds", 30) + // Dashboard aggregation + viper.SetDefault("dashboard_aggregation.enabled", true) + viper.SetDefault("dashboard_aggregation.interval_seconds", 60) + viper.SetDefault("dashboard_aggregation.lookback_seconds", 120) + viper.SetDefault("dashboard_aggregation.backfill_enabled", false) + viper.SetDefault("dashboard_aggregation.retention.usage_logs_days", 90) + viper.SetDefault("dashboard_aggregation.retention.hourly_days", 180) + viper.SetDefault("dashboard_aggregation.retention.daily_days", 730) + viper.SetDefault("dashboard_aggregation.recompute_days", 2) + // Gateway 
viper.SetDefault("gateway.response_header_timeout", 600) // 600秒(10分钟)等待上游响应头,LLM高负载时可能排队较久 viper.SetDefault("gateway.log_upstream_error_body", false) @@ -879,6 +913,45 @@ func (c *Config) Validate() error { return fmt.Errorf("dashboard_cache.stats_refresh_timeout_seconds must be non-negative") } } + if c.DashboardAgg.Enabled { + if c.DashboardAgg.IntervalSeconds <= 0 { + return fmt.Errorf("dashboard_aggregation.interval_seconds must be positive") + } + if c.DashboardAgg.LookbackSeconds < 0 { + return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative") + } + if c.DashboardAgg.Retention.UsageLogsDays <= 0 { + return fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be positive") + } + if c.DashboardAgg.Retention.HourlyDays <= 0 { + return fmt.Errorf("dashboard_aggregation.retention.hourly_days must be positive") + } + if c.DashboardAgg.Retention.DailyDays <= 0 { + return fmt.Errorf("dashboard_aggregation.retention.daily_days must be positive") + } + if c.DashboardAgg.RecomputeDays < 0 { + return fmt.Errorf("dashboard_aggregation.recompute_days must be non-negative") + } + } else { + if c.DashboardAgg.IntervalSeconds < 0 { + return fmt.Errorf("dashboard_aggregation.interval_seconds must be non-negative") + } + if c.DashboardAgg.LookbackSeconds < 0 { + return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative") + } + if c.DashboardAgg.Retention.UsageLogsDays < 0 { + return fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be non-negative") + } + if c.DashboardAgg.Retention.HourlyDays < 0 { + return fmt.Errorf("dashboard_aggregation.retention.hourly_days must be non-negative") + } + if c.DashboardAgg.Retention.DailyDays < 0 { + return fmt.Errorf("dashboard_aggregation.retention.daily_days must be non-negative") + } + if c.DashboardAgg.RecomputeDays < 0 { + return fmt.Errorf("dashboard_aggregation.recompute_days must be non-negative") + } + } if c.Gateway.MaxBodySize <= 0 { return 
fmt.Errorf("gateway.max_body_size must be positive") } diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index 6cd95b1c..7fc34d64 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -205,3 +205,56 @@ func TestValidateDashboardCacheConfigDisabled(t *testing.T) { t.Fatalf("Validate() expected stats_ttl_seconds error, got: %v", err) } } + +func TestLoadDefaultDashboardAggregationConfig(t *testing.T) { + viper.Reset() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + if !cfg.DashboardAgg.Enabled { + t.Fatalf("DashboardAgg.Enabled = false, want true") + } + if cfg.DashboardAgg.IntervalSeconds != 60 { + t.Fatalf("DashboardAgg.IntervalSeconds = %d, want 60", cfg.DashboardAgg.IntervalSeconds) + } + if cfg.DashboardAgg.LookbackSeconds != 120 { + t.Fatalf("DashboardAgg.LookbackSeconds = %d, want 120", cfg.DashboardAgg.LookbackSeconds) + } + if cfg.DashboardAgg.BackfillEnabled { + t.Fatalf("DashboardAgg.BackfillEnabled = true, want false") + } + if cfg.DashboardAgg.Retention.UsageLogsDays != 90 { + t.Fatalf("DashboardAgg.Retention.UsageLogsDays = %d, want 90", cfg.DashboardAgg.Retention.UsageLogsDays) + } + if cfg.DashboardAgg.Retention.HourlyDays != 180 { + t.Fatalf("DashboardAgg.Retention.HourlyDays = %d, want 180", cfg.DashboardAgg.Retention.HourlyDays) + } + if cfg.DashboardAgg.Retention.DailyDays != 730 { + t.Fatalf("DashboardAgg.Retention.DailyDays = %d, want 730", cfg.DashboardAgg.Retention.DailyDays) + } + if cfg.DashboardAgg.RecomputeDays != 2 { + t.Fatalf("DashboardAgg.RecomputeDays = %d, want 2", cfg.DashboardAgg.RecomputeDays) + } +} + +func TestValidateDashboardAggregationConfigDisabled(t *testing.T) { + viper.Reset() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + cfg.DashboardAgg.Enabled = false + cfg.DashboardAgg.IntervalSeconds = -1 + err = cfg.Validate() + if err == nil { + t.Fatalf("Validate() 
expected error for negative dashboard_aggregation.interval_seconds, got nil") + } + if !strings.Contains(err.Error(), "dashboard_aggregation.interval_seconds") { + t.Fatalf("Validate() expected interval_seconds error, got: %v", err) + } +} diff --git a/backend/internal/handler/admin/dashboard_handler.go b/backend/internal/handler/admin/dashboard_handler.go index 30cdd914..560d1075 100644 --- a/backend/internal/handler/admin/dashboard_handler.go +++ b/backend/internal/handler/admin/dashboard_handler.go @@ -1,6 +1,7 @@ package admin import ( + "errors" "strconv" "time" @@ -13,15 +14,17 @@ import ( // DashboardHandler handles admin dashboard statistics type DashboardHandler struct { - dashboardService *service.DashboardService - startTime time.Time // Server start time for uptime calculation + dashboardService *service.DashboardService + aggregationService *service.DashboardAggregationService + startTime time.Time // Server start time for uptime calculation } // NewDashboardHandler creates a new admin dashboard handler -func NewDashboardHandler(dashboardService *service.DashboardService) *DashboardHandler { +func NewDashboardHandler(dashboardService *service.DashboardService, aggregationService *service.DashboardAggregationService) *DashboardHandler { return &DashboardHandler{ - dashboardService: dashboardService, - startTime: time.Now(), + dashboardService: dashboardService, + aggregationService: aggregationService, + startTime: time.Now(), } } @@ -114,6 +117,54 @@ func (h *DashboardHandler) GetStats(c *gin.Context) { // 性能指标 "rpm": stats.Rpm, "tpm": stats.Tpm, + + // 预聚合新鲜度 + "hourly_active_users": stats.HourlyActiveUsers, + "stats_updated_at": stats.StatsUpdatedAt, + "stats_stale": stats.StatsStale, + }) +} + +type DashboardAggregationBackfillRequest struct { + Start string `json:"start"` + End string `json:"end"` +} + +// BackfillAggregation handles triggering aggregation backfill +// POST /api/v1/admin/dashboard/aggregation/backfill +func (h *DashboardHandler) 
BackfillAggregation(c *gin.Context) { + if h.aggregationService == nil { + response.InternalError(c, "Aggregation service not available") + return + } + + var req DashboardAggregationBackfillRequest + if err := c.ShouldBindJSON(&req); err != nil { + response.BadRequest(c, "Invalid request body") + return + } + start, err := time.Parse(time.RFC3339, req.Start) + if err != nil { + response.BadRequest(c, "Invalid start time") + return + } + end, err := time.Parse(time.RFC3339, req.End) + if err != nil { + response.BadRequest(c, "Invalid end time") + return + } + + if err := h.aggregationService.TriggerBackfill(start, end); err != nil { + if errors.Is(err, service.ErrDashboardBackfillDisabled) { + response.Forbidden(c, "Backfill is disabled") + return + } + response.InternalError(c, "Failed to trigger backfill") + return + } + + response.Success(c, gin.H{ + "status": "accepted", }) } diff --git a/backend/internal/pkg/usagestats/usage_log_types.go b/backend/internal/pkg/usagestats/usage_log_types.go index 39314602..3952785b 100644 --- a/backend/internal/pkg/usagestats/usage_log_types.go +++ b/backend/internal/pkg/usagestats/usage_log_types.go @@ -9,6 +9,12 @@ type DashboardStats struct { TotalUsers int64 `json:"total_users"` TodayNewUsers int64 `json:"today_new_users"` // 今日新增用户数 ActiveUsers int64 `json:"active_users"` // 今日有请求的用户数 + // 小时活跃用户数(UTC 当前小时) + HourlyActiveUsers int64 `json:"hourly_active_users"` + + // 预聚合新鲜度 + StatsUpdatedAt string `json:"stats_updated_at"` + StatsStale bool `json:"stats_stale"` // API Key 统计 TotalAPIKeys int64 `json:"total_api_keys"` diff --git a/backend/internal/repository/dashboard_aggregation_repo.go b/backend/internal/repository/dashboard_aggregation_repo.go new file mode 100644 index 00000000..dbba5cdb --- /dev/null +++ b/backend/internal/repository/dashboard_aggregation_repo.go @@ -0,0 +1,360 @@ +package repository + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + 
"github.com/Wei-Shaw/sub2api/internal/service" + "github.com/lib/pq" +) + +type dashboardAggregationRepository struct { + sql sqlExecutor +} + +// NewDashboardAggregationRepository 创建仪表盘预聚合仓储。 +func NewDashboardAggregationRepository(sqlDB *sql.DB) service.DashboardAggregationRepository { + return newDashboardAggregationRepositoryWithSQL(sqlDB) +} + +func newDashboardAggregationRepositoryWithSQL(sqlq sqlExecutor) *dashboardAggregationRepository { + return &dashboardAggregationRepository{sql: sqlq} +} + +func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, start, end time.Time) error { + startUTC := start.UTC() + endUTC := end.UTC() + if !endUTC.After(startUTC) { + return nil + } + + hourStart := startUTC.Truncate(time.Hour) + hourEnd := endUTC.Truncate(time.Hour) + if endUTC.After(hourEnd) { + hourEnd = hourEnd.Add(time.Hour) + } + + dayStart := truncateToDayUTC(startUTC) + dayEnd := truncateToDayUTC(endUTC) + if endUTC.After(dayEnd) { + dayEnd = dayEnd.Add(24 * time.Hour) + } + + if err := r.insertHourlyActiveUsers(ctx, startUTC, endUTC); err != nil { + return err + } + if err := r.insertDailyActiveUsers(ctx, startUTC, endUTC); err != nil { + return err + } + if err := r.upsertHourlyAggregates(ctx, hourStart, hourEnd); err != nil { + return err + } + if err := r.upsertDailyAggregates(ctx, dayStart, dayEnd); err != nil { + return err + } + return nil +} + +func (r *dashboardAggregationRepository) GetAggregationWatermark(ctx context.Context) (time.Time, error) { + var ts time.Time + query := "SELECT last_aggregated_at FROM usage_dashboard_aggregation_watermark WHERE id = 1" + if err := scanSingleRow(ctx, r.sql, query, nil, &ts); err != nil { + if err == sql.ErrNoRows { + return time.Unix(0, 0).UTC(), nil + } + return time.Time{}, err + } + return ts.UTC(), nil +} + +func (r *dashboardAggregationRepository) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error { + query := ` + INSERT INTO 
usage_dashboard_aggregation_watermark (id, last_aggregated_at, updated_at) + VALUES (1, $1, NOW()) + ON CONFLICT (id) + DO UPDATE SET last_aggregated_at = EXCLUDED.last_aggregated_at, updated_at = EXCLUDED.updated_at + ` + _, err := r.sql.ExecContext(ctx, query, aggregatedAt.UTC()) + return err +} + +func (r *dashboardAggregationRepository) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error { + // lib/pq sends parameterized queries via PostgreSQL's extended query protocol, + // which allows only one command per statement — a single Exec with multiple + // semicolon-separated DELETEs and bind parameters fails at runtime. + // Run each retention DELETE as its own statement instead. + deletes := []struct { + query string + cutoff time.Time + }{ + {"DELETE FROM usage_dashboard_hourly WHERE bucket_start < $1", hourlyCutoff.UTC()}, + {"DELETE FROM usage_dashboard_hourly_users WHERE bucket_start < $1", hourlyCutoff.UTC()}, + {"DELETE FROM usage_dashboard_daily WHERE bucket_date < $1::date", dailyCutoff.UTC()}, + {"DELETE FROM usage_dashboard_daily_users WHERE bucket_date < $1::date", dailyCutoff.UTC()}, + } + for _, d := range deletes { + if _, err := r.sql.ExecContext(ctx, d.query, d.cutoff); err != nil { + return err + } + } + return nil +} + +func (r *dashboardAggregationRepository) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error { + isPartitioned, err := r.isUsageLogsPartitioned(ctx) + if err != nil { + return err + } + if isPartitioned { + return r.dropUsageLogsPartitions(ctx, cutoff) + } + _, err = r.sql.ExecContext(ctx, "DELETE FROM usage_logs WHERE created_at < $1", cutoff.UTC()) + return err +} + +func (r *dashboardAggregationRepository) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error { + isPartitioned, err := r.isUsageLogsPartitioned(ctx) + if err != nil || !isPartitioned { + return err + } + monthStart := truncateToMonthUTC(now) + prevMonth := monthStart.AddDate(0, -1, 0) + nextMonth := monthStart.AddDate(0, 1, 0) + + for _, m := range []time.Time{prevMonth, monthStart, nextMonth} { + if err := r.createUsageLogsPartition(ctx, m); err != nil { + return err + } + } + return nil +} + +func (r *dashboardAggregationRepository) insertHourlyActiveUsers(ctx context.Context, start, end time.Time) error { + query := ` + INSERT INTO usage_dashboard_hourly_users (bucket_start, user_id) + SELECT DISTINCT + date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start, +
user_id + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ON CONFLICT DO NOTHING + ` + _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC()) + return err +} + +func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Context, start, end time.Time) error { + query := ` + INSERT INTO usage_dashboard_daily_users (bucket_date, user_id) + SELECT DISTINCT + (created_at AT TIME ZONE 'UTC')::date AS bucket_date, + user_id + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ON CONFLICT DO NOTHING + ` + _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC()) + return err +} + +func (r *dashboardAggregationRepository) upsertHourlyAggregates(ctx context.Context, start, end time.Time) error { + query := ` + WITH hourly AS ( + SELECT + date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start, + COUNT(*) AS total_requests, + COALESCE(SUM(input_tokens), 0) AS input_tokens, + COALESCE(SUM(output_tokens), 0) AS output_tokens, + COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation_tokens, + COALESCE(SUM(cache_read_tokens), 0) AS cache_read_tokens, + COALESCE(SUM(total_cost), 0) AS total_cost, + COALESCE(SUM(actual_cost), 0) AS actual_cost, + COALESCE(SUM(COALESCE(duration_ms, 0)), 0) AS total_duration_ms + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + GROUP BY 1 + ), + user_counts AS ( + SELECT bucket_start, COUNT(*) AS active_users + FROM usage_dashboard_hourly_users + WHERE bucket_start >= $1 AND bucket_start < $2 + GROUP BY bucket_start + ) + INSERT INTO usage_dashboard_hourly ( + bucket_start, + total_requests, + input_tokens, + output_tokens, + cache_creation_tokens, + cache_read_tokens, + total_cost, + actual_cost, + total_duration_ms, + active_users, + computed_at + ) + SELECT + hourly.bucket_start, + hourly.total_requests, + hourly.input_tokens, + hourly.output_tokens, + hourly.cache_creation_tokens, + hourly.cache_read_tokens, + hourly.total_cost, + 
hourly.actual_cost, + hourly.total_duration_ms, + COALESCE(user_counts.active_users, 0) AS active_users, + NOW() + FROM hourly + LEFT JOIN user_counts ON user_counts.bucket_start = hourly.bucket_start + ON CONFLICT (bucket_start) + DO UPDATE SET + total_requests = EXCLUDED.total_requests, + input_tokens = EXCLUDED.input_tokens, + output_tokens = EXCLUDED.output_tokens, + cache_creation_tokens = EXCLUDED.cache_creation_tokens, + cache_read_tokens = EXCLUDED.cache_read_tokens, + total_cost = EXCLUDED.total_cost, + actual_cost = EXCLUDED.actual_cost, + total_duration_ms = EXCLUDED.total_duration_ms, + active_users = EXCLUDED.active_users, + computed_at = EXCLUDED.computed_at + ` + _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC()) + return err +} + +func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Context, start, end time.Time) error { + query := ` + WITH daily AS ( + SELECT + (bucket_start AT TIME ZONE 'UTC')::date AS bucket_date, + COALESCE(SUM(total_requests), 0) AS total_requests, + COALESCE(SUM(input_tokens), 0) AS input_tokens, + COALESCE(SUM(output_tokens), 0) AS output_tokens, + COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation_tokens, + COALESCE(SUM(cache_read_tokens), 0) AS cache_read_tokens, + COALESCE(SUM(total_cost), 0) AS total_cost, + COALESCE(SUM(actual_cost), 0) AS actual_cost, + COALESCE(SUM(total_duration_ms), 0) AS total_duration_ms + FROM usage_dashboard_hourly + WHERE bucket_start >= $1 AND bucket_start < $2 + GROUP BY (bucket_start AT TIME ZONE 'UTC')::date + ), + user_counts AS ( + SELECT bucket_date, COUNT(*) AS active_users + FROM usage_dashboard_daily_users + WHERE bucket_date >= $3::date AND bucket_date < $4::date + GROUP BY bucket_date + ) + INSERT INTO usage_dashboard_daily ( + bucket_date, + total_requests, + input_tokens, + output_tokens, + cache_creation_tokens, + cache_read_tokens, + total_cost, + actual_cost, + total_duration_ms, + active_users, + computed_at + ) + SELECT + 
daily.bucket_date, + daily.total_requests, + daily.input_tokens, + daily.output_tokens, + daily.cache_creation_tokens, + daily.cache_read_tokens, + daily.total_cost, + daily.actual_cost, + daily.total_duration_ms, + COALESCE(user_counts.active_users, 0) AS active_users, + NOW() + FROM daily + LEFT JOIN user_counts ON user_counts.bucket_date = daily.bucket_date + ON CONFLICT (bucket_date) + DO UPDATE SET + total_requests = EXCLUDED.total_requests, + input_tokens = EXCLUDED.input_tokens, + output_tokens = EXCLUDED.output_tokens, + cache_creation_tokens = EXCLUDED.cache_creation_tokens, + cache_read_tokens = EXCLUDED.cache_read_tokens, + total_cost = EXCLUDED.total_cost, + actual_cost = EXCLUDED.actual_cost, + total_duration_ms = EXCLUDED.total_duration_ms, + active_users = EXCLUDED.active_users, + computed_at = EXCLUDED.computed_at + ` + _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC(), start.UTC(), end.UTC()) + return err +} + +func (r *dashboardAggregationRepository) isUsageLogsPartitioned(ctx context.Context) (bool, error) { + query := ` + SELECT EXISTS( + SELECT 1 + FROM pg_partitioned_table pt + JOIN pg_class c ON c.oid = pt.partrelid + WHERE c.relname = 'usage_logs' + ) + ` + var partitioned bool + if err := scanSingleRow(ctx, r.sql, query, nil, &partitioned); err != nil { + return false, err + } + return partitioned, nil +} + +func (r *dashboardAggregationRepository) dropUsageLogsPartitions(ctx context.Context, cutoff time.Time) error { + rows, err := r.sql.QueryContext(ctx, ` + SELECT c.relname + FROM pg_inherits + JOIN pg_class c ON c.oid = pg_inherits.inhrelid + JOIN pg_class p ON p.oid = pg_inherits.inhparent + WHERE p.relname = 'usage_logs' + `) + if err != nil { + return err + } + defer rows.Close() + + cutoffMonth := truncateToMonthUTC(cutoff) + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + return err + } + if !strings.HasPrefix(name, "usage_logs_") { + continue + } + suffix := strings.TrimPrefix(name, 
"usage_logs_") + month, err := time.Parse("200601", suffix) + if err != nil { + continue + } + month = month.UTC() + if month.Before(cutoffMonth) { + if _, err := r.sql.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", pq.QuoteIdentifier(name))); err != nil { + return err + } + } + } + return rows.Err() +} + +func (r *dashboardAggregationRepository) createUsageLogsPartition(ctx context.Context, month time.Time) error { + monthStart := truncateToMonthUTC(month) + nextMonth := monthStart.AddDate(0, 1, 0) + name := fmt.Sprintf("usage_logs_%s", monthStart.Format("200601")) + query := fmt.Sprintf( + "CREATE TABLE IF NOT EXISTS %s PARTITION OF usage_logs FOR VALUES FROM (%s) TO (%s)", + pq.QuoteIdentifier(name), + pq.QuoteLiteral(monthStart.Format("2006-01-02")), + pq.QuoteLiteral(nextMonth.Format("2006-01-02")), + ) + _, err := r.sql.ExecContext(ctx, query) + return err +} + +func truncateToDayUTC(t time.Time) time.Time { + t = t.UTC() + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) +} + +func truncateToMonthUTC(t time.Time) time.Time { + t = t.UTC() + return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC) +} diff --git a/backend/internal/repository/usage_log_repo.go b/backend/internal/repository/usage_log_repo.go index 6ed8910e..be2a6d18 100644 --- a/backend/internal/repository/usage_log_repo.go +++ b/backend/internal/repository/usage_log_repo.go @@ -270,15 +270,14 @@ type DashboardStats = usagestats.DashboardStats func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardStats, error) { var stats DashboardStats - today := timezone.Today() now := time.Now() + todayUTC := truncateToDayUTC(now) // 合并用户统计查询 userStatsQuery := ` SELECT COUNT(*) as total_users, - COUNT(CASE WHEN created_at >= $1 THEN 1 END) as today_new_users, - (SELECT COUNT(DISTINCT user_id) FROM usage_logs WHERE created_at >= $2) as active_users + COUNT(CASE WHEN created_at >= $1 THEN 1 END) as today_new_users FROM users WHERE deleted_at IS NULL ` 
@@ -286,10 +285,9 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS ctx, r.sql, userStatsQuery, - []any{today, today}, + []any{todayUTC}, &stats.TotalUsers, &stats.TodayNewUsers, - &stats.ActiveUsers, ); err != nil { return nil, err } @@ -341,16 +339,17 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS // 累计 Token 统计 totalStatsQuery := ` SELECT - COUNT(*) as total_requests, + COALESCE(SUM(total_requests), 0) as total_requests, COALESCE(SUM(input_tokens), 0) as total_input_tokens, COALESCE(SUM(output_tokens), 0) as total_output_tokens, COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens, COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens, COALESCE(SUM(total_cost), 0) as total_cost, COALESCE(SUM(actual_cost), 0) as total_actual_cost, - COALESCE(AVG(duration_ms), 0) as avg_duration_ms - FROM usage_logs + COALESCE(SUM(total_duration_ms), 0) as total_duration_ms + FROM usage_dashboard_daily ` + var totalDurationMs int64 if err := scanSingleRow( ctx, r.sql, @@ -363,30 +362,34 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TotalCacheReadTokens, &stats.TotalCost, &stats.TotalActualCost, - &stats.AverageDurationMs, + &totalDurationMs, ); err != nil { return nil, err } stats.TotalTokens = stats.TotalInputTokens + stats.TotalOutputTokens + stats.TotalCacheCreationTokens + stats.TotalCacheReadTokens + if stats.TotalRequests > 0 { + stats.AverageDurationMs = float64(totalDurationMs) / float64(stats.TotalRequests) + } // 今日 Token 统计 todayStatsQuery := ` SELECT - COUNT(*) as today_requests, - COALESCE(SUM(input_tokens), 0) as today_input_tokens, - COALESCE(SUM(output_tokens), 0) as today_output_tokens, - COALESCE(SUM(cache_creation_tokens), 0) as today_cache_creation_tokens, - COALESCE(SUM(cache_read_tokens), 0) as today_cache_read_tokens, - COALESCE(SUM(total_cost), 0) as today_cost, - COALESCE(SUM(actual_cost), 0) as today_actual_cost - 
FROM usage_logs - WHERE created_at >= $1 + total_requests as today_requests, + input_tokens as today_input_tokens, + output_tokens as today_output_tokens, + cache_creation_tokens as today_cache_creation_tokens, + cache_read_tokens as today_cache_read_tokens, + total_cost as today_cost, + actual_cost as today_actual_cost, + active_users as active_users + FROM usage_dashboard_daily + WHERE bucket_date = $1::date ` if err := scanSingleRow( ctx, r.sql, todayStatsQuery, - []any{today}, + []any{todayUTC}, &stats.TodayRequests, &stats.TodayInputTokens, &stats.TodayOutputTokens, @@ -394,11 +397,27 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TodayCacheReadTokens, &stats.TodayCost, &stats.TodayActualCost, + &stats.ActiveUsers, ); err != nil { - return nil, err + if err != sql.ErrNoRows { + return nil, err + } } stats.TodayTokens = stats.TodayInputTokens + stats.TodayOutputTokens + stats.TodayCacheCreationTokens + stats.TodayCacheReadTokens + // 当前小时活跃用户 + hourlyActiveQuery := ` + SELECT active_users + FROM usage_dashboard_hourly + WHERE bucket_start = $1 + ` + hourStart := now.UTC().Truncate(time.Hour) + if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart}, &stats.HourlyActiveUsers); err != nil { + if err != sql.ErrNoRows { + return nil, err + } + } + // 性能指标:RPM 和 TPM(最近1分钟,全局) rpm, tpm, err := r.getPerformanceStats(ctx, 0) if err != nil { diff --git a/backend/internal/repository/usage_log_repo_integration_test.go b/backend/internal/repository/usage_log_repo_integration_test.go index 7193718f..09341db3 100644 --- a/backend/internal/repository/usage_log_repo_integration_test.go +++ b/backend/internal/repository/usage_log_repo_integration_test.go @@ -198,8 +198,8 @@ func (s *UsageLogRepoSuite) TestListWithFilters() { // --- GetDashboardStats --- func (s *UsageLogRepoSuite) TestDashboardStats_TodayTotalsAndPerformance() { - now := time.Now() - todayStart := timezone.Today() + now := time.Now().UTC() + todayStart 
:= truncateToDayUTC(now) baseStats, err := s.repo.GetDashboardStats(s.ctx) s.Require().NoError(err, "GetDashboardStats base") @@ -273,6 +273,11 @@ func (s *UsageLogRepoSuite) TestDashboardStats_TodayTotalsAndPerformance() { _, err = s.repo.Create(s.ctx, logPerf) s.Require().NoError(err, "Create logPerf") + aggRepo := newDashboardAggregationRepositoryWithSQL(s.tx) + aggStart := todayStart.Add(-2 * time.Hour) + aggEnd := now.Add(2 * time.Minute) + s.Require().NoError(aggRepo.AggregateRange(s.ctx, aggStart, aggEnd), "AggregateRange") + stats, err := s.repo.GetDashboardStats(s.ctx) s.Require().NoError(err, "GetDashboardStats") @@ -333,6 +338,151 @@ func (s *UsageLogRepoSuite) TestGetAccountTodayStats() { s.Require().Equal(int64(30), stats.Tokens) } +func (s *UsageLogRepoSuite) TestDashboardAggregationConsistency() { + now := time.Now().UTC().Truncate(time.Second) + hour1 := now.Add(-90 * time.Minute).Truncate(time.Hour) + hour2 := now.Add(-30 * time.Minute).Truncate(time.Hour) + dayStart := truncateToDayUTC(now) + + user1 := mustCreateUser(s.T(), s.client, &service.User{Email: "agg-u1@test.com"}) + user2 := mustCreateUser(s.T(), s.client, &service.User{Email: "agg-u2@test.com"}) + apiKey1 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user1.ID, Key: "sk-agg-1", Name: "k1"}) + apiKey2 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user2.ID, Key: "sk-agg-2", Name: "k2"}) + account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-agg"}) + + d1, d2, d3 := 100, 200, 150 + log1 := &service.UsageLog{ + UserID: user1.ID, + APIKeyID: apiKey1.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 10, + OutputTokens: 20, + CacheCreationTokens: 2, + CacheReadTokens: 1, + TotalCost: 1.0, + ActualCost: 0.9, + DurationMs: &d1, + CreatedAt: hour1.Add(5 * time.Minute), + } + _, err := s.repo.Create(s.ctx, log1) + s.Require().NoError(err) + + log2 := &service.UsageLog{ + UserID: user1.ID, + APIKeyID: apiKey1.ID, + AccountID: 
account.ID, + Model: "claude-3", + InputTokens: 5, + OutputTokens: 5, + TotalCost: 0.5, + ActualCost: 0.5, + DurationMs: &d2, + CreatedAt: hour1.Add(20 * time.Minute), + } + _, err = s.repo.Create(s.ctx, log2) + s.Require().NoError(err) + + log3 := &service.UsageLog{ + UserID: user2.ID, + APIKeyID: apiKey2.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 7, + OutputTokens: 8, + TotalCost: 0.7, + ActualCost: 0.7, + DurationMs: &d3, + CreatedAt: hour2.Add(10 * time.Minute), + } + _, err = s.repo.Create(s.ctx, log3) + s.Require().NoError(err) + + aggRepo := newDashboardAggregationRepositoryWithSQL(s.tx) + aggStart := hour1.Add(-5 * time.Minute) + aggEnd := now.Add(5 * time.Minute) + s.Require().NoError(aggRepo.AggregateRange(s.ctx, aggStart, aggEnd)) + + type hourlyRow struct { + totalRequests int64 + inputTokens int64 + outputTokens int64 + cacheCreationTokens int64 + cacheReadTokens int64 + totalCost float64 + actualCost float64 + totalDurationMs int64 + activeUsers int64 + } + fetchHourly := func(bucketStart time.Time) hourlyRow { + var row hourlyRow + err := scanSingleRow(s.ctx, s.tx, ` + SELECT total_requests, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, + total_cost, actual_cost, total_duration_ms, active_users + FROM usage_dashboard_hourly + WHERE bucket_start = $1 + `, []any{bucketStart}, &row.totalRequests, &row.inputTokens, &row.outputTokens, + &row.cacheCreationTokens, &row.cacheReadTokens, &row.totalCost, &row.actualCost, + &row.totalDurationMs, &row.activeUsers, + ) + s.Require().NoError(err) + return row + } + + hour1Row := fetchHourly(hour1) + s.Require().Equal(int64(2), hour1Row.totalRequests) + s.Require().Equal(int64(15), hour1Row.inputTokens) + s.Require().Equal(int64(25), hour1Row.outputTokens) + s.Require().Equal(int64(2), hour1Row.cacheCreationTokens) + s.Require().Equal(int64(1), hour1Row.cacheReadTokens) + s.Require().Equal(1.5, hour1Row.totalCost) + s.Require().Equal(1.4, hour1Row.actualCost) + 
s.Require().Equal(int64(300), hour1Row.totalDurationMs) + s.Require().Equal(int64(1), hour1Row.activeUsers) + + hour2Row := fetchHourly(hour2) + s.Require().Equal(int64(1), hour2Row.totalRequests) + s.Require().Equal(int64(7), hour2Row.inputTokens) + s.Require().Equal(int64(8), hour2Row.outputTokens) + s.Require().Equal(int64(0), hour2Row.cacheCreationTokens) + s.Require().Equal(int64(0), hour2Row.cacheReadTokens) + s.Require().Equal(0.7, hour2Row.totalCost) + s.Require().Equal(0.7, hour2Row.actualCost) + s.Require().Equal(int64(150), hour2Row.totalDurationMs) + s.Require().Equal(int64(1), hour2Row.activeUsers) + + var daily struct { + totalRequests int64 + inputTokens int64 + outputTokens int64 + cacheCreationTokens int64 + cacheReadTokens int64 + totalCost float64 + actualCost float64 + totalDurationMs int64 + activeUsers int64 + } + err = scanSingleRow(s.ctx, s.tx, ` + SELECT total_requests, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, + total_cost, actual_cost, total_duration_ms, active_users + FROM usage_dashboard_daily + WHERE bucket_date = $1::date + `, []any{dayStart}, &daily.totalRequests, &daily.inputTokens, &daily.outputTokens, + &daily.cacheCreationTokens, &daily.cacheReadTokens, &daily.totalCost, &daily.actualCost, + &daily.totalDurationMs, &daily.activeUsers, + ) + s.Require().NoError(err) + s.Require().Equal(int64(3), daily.totalRequests) + s.Require().Equal(int64(22), daily.inputTokens) + s.Require().Equal(int64(33), daily.outputTokens) + s.Require().Equal(int64(2), daily.cacheCreationTokens) + s.Require().Equal(int64(1), daily.cacheReadTokens) + s.Require().Equal(2.2, daily.totalCost) + s.Require().Equal(2.1, daily.actualCost) + s.Require().Equal(int64(450), daily.totalDurationMs) + s.Require().Equal(int64(2), daily.activeUsers) +} + // --- GetBatchUserUsageStats --- func (s *UsageLogRepoSuite) TestGetBatchUserUsageStats() { diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index 
1b6a7b91..8cc937bb 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -47,6 +47,7 @@ var ProviderSet = wire.NewSet( NewRedeemCodeRepository, NewPromoCodeRepository, NewUsageLogRepository, + NewDashboardAggregationRepository, NewSettingRepository, NewUserSubscriptionRepository, NewUserAttributeDefinitionRepository, diff --git a/backend/internal/server/routes/admin.go b/backend/internal/server/routes/admin.go index 6f40c491..c9c5352c 100644 --- a/backend/internal/server/routes/admin.go +++ b/backend/internal/server/routes/admin.go @@ -75,6 +75,7 @@ func registerDashboardRoutes(admin *gin.RouterGroup, h *handler.Handlers) { dashboard.GET("/users-trend", h.Admin.Dashboard.GetUserUsageTrend) dashboard.POST("/users-usage", h.Admin.Dashboard.GetBatchUsersUsage) dashboard.POST("/api-keys-usage", h.Admin.Dashboard.GetBatchAPIKeysUsage) + dashboard.POST("/aggregation/backfill", h.Admin.Dashboard.BackfillAggregation) } } diff --git a/backend/internal/service/dashboard_aggregation_service.go b/backend/internal/service/dashboard_aggregation_service.go new file mode 100644 index 00000000..343c3240 --- /dev/null +++ b/backend/internal/service/dashboard_aggregation_service.go @@ -0,0 +1,224 @@ +package service + +import ( + "context" + "errors" + "log" + "sync/atomic" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" +) + +const ( + defaultDashboardAggregationTimeout = 2 * time.Minute + defaultDashboardAggregationBackfillTimeout = 30 * time.Minute + dashboardAggregationRetentionInterval = 6 * time.Hour +) + +var ( + // ErrDashboardBackfillDisabled 当配置禁用回填时返回。 + ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用") +) + +// DashboardAggregationRepository 定义仪表盘预聚合仓储接口。 +type DashboardAggregationRepository interface { + AggregateRange(ctx context.Context, start, end time.Time) error + GetAggregationWatermark(ctx context.Context) (time.Time, error) + UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error + 
CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error + CleanupUsageLogs(ctx context.Context, cutoff time.Time) error + EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error +} + +// DashboardAggregationService 负责定时聚合与回填。 +type DashboardAggregationService struct { + repo DashboardAggregationRepository + timingWheel *TimingWheelService + cfg config.DashboardAggregationConfig + running int32 + lastRetentionCleanup atomic.Value // time.Time +} + +// NewDashboardAggregationService 创建聚合服务。 +func NewDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService { + var aggCfg config.DashboardAggregationConfig + if cfg != nil { + aggCfg = cfg.DashboardAgg + } + return &DashboardAggregationService{ + repo: repo, + timingWheel: timingWheel, + cfg: aggCfg, + } +} + +// Start 启动定时聚合作业(重启生效配置)。 +func (s *DashboardAggregationService) Start() { + if s == nil || s.repo == nil || s.timingWheel == nil { + return + } + if !s.cfg.Enabled { + log.Printf("[DashboardAggregation] 聚合作业已禁用") + return + } + + interval := time.Duration(s.cfg.IntervalSeconds) * time.Second + if interval <= 0 { + interval = time.Minute + } + + if s.cfg.RecomputeDays > 0 { + go s.recomputeRecentDays() + } + + s.timingWheel.ScheduleRecurring("dashboard:aggregation", interval, func() { + s.runScheduledAggregation() + }) + log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds) +} + +// TriggerBackfill 触发回填(异步)。 +func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) error { + if s == nil || s.repo == nil { + return errors.New("聚合服务未初始化") + } + if !s.cfg.BackfillEnabled { + log.Printf("[DashboardAggregation] 回填被拒绝: backfill_enabled=false") + return ErrDashboardBackfillDisabled + } + if !end.After(start) { + return errors.New("回填时间范围无效") + } + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 
defaultDashboardAggregationBackfillTimeout) + defer cancel() + if err := s.backfillRange(ctx, start, end); err != nil { + log.Printf("[DashboardAggregation] 回填失败: %v", err) + } + }() + return nil +} + +func (s *DashboardAggregationService) recomputeRecentDays() { + days := s.cfg.RecomputeDays + if days <= 0 { + return + } + now := time.Now().UTC() + start := now.AddDate(0, 0, -days) + + ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout) + defer cancel() + if err := s.backfillRange(ctx, start, now); err != nil { + log.Printf("[DashboardAggregation] 启动重算失败: %v", err) + return + } +} + +func (s *DashboardAggregationService) runScheduledAggregation() { + if !atomic.CompareAndSwapInt32(&s.running, 0, 1) { + return + } + defer atomic.StoreInt32(&s.running, 0) + + ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationTimeout) + defer cancel() + + now := time.Now().UTC() + last, err := s.repo.GetAggregationWatermark(ctx) + if err != nil { + log.Printf("[DashboardAggregation] 读取水位失败: %v", err) + last = time.Unix(0, 0).UTC() + } + + lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second + start := last.Add(-lookback) + epoch := time.Unix(0, 0).UTC() + if !last.After(epoch) { + start = now.Add(-lookback) + } + if start.After(now) { + start = now.Add(-lookback) + } + + if err := s.aggregateRange(ctx, start, now); err != nil { + log.Printf("[DashboardAggregation] 聚合失败: %v", err) + return + } + + if err := s.repo.UpdateAggregationWatermark(ctx, now); err != nil { + log.Printf("[DashboardAggregation] 更新水位失败: %v", err) + } + + s.maybeCleanupRetention(ctx, now) +} + +func (s *DashboardAggregationService) backfillRange(ctx context.Context, start, end time.Time) error { + if !atomic.CompareAndSwapInt32(&s.running, 0, 1) { + return errors.New("聚合作业正在运行") + } + defer atomic.StoreInt32(&s.running, 0) + + startUTC := start.UTC() + endUTC := end.UTC() + if !endUTC.After(startUTC) { + return 
errors.New("回填时间范围无效") + } + + cursor := truncateToDayUTC(startUTC) + for cursor.Before(endUTC) { + windowEnd := cursor.Add(24 * time.Hour) + if windowEnd.After(endUTC) { + windowEnd = endUTC + } + if err := s.aggregateRange(ctx, cursor, windowEnd); err != nil { + return err + } + cursor = windowEnd + } + + if err := s.repo.UpdateAggregationWatermark(ctx, endUTC); err != nil { + log.Printf("[DashboardAggregation] 更新水位失败: %v", err) + } + + s.maybeCleanupRetention(ctx, endUTC) + return nil +} + +func (s *DashboardAggregationService) aggregateRange(ctx context.Context, start, end time.Time) error { + if !end.After(start) { + return nil + } + if err := s.repo.EnsureUsageLogsPartitions(ctx, end); err != nil { + log.Printf("[DashboardAggregation] 分区检查失败: %v", err) + } + return s.repo.AggregateRange(ctx, start, end) +} + +func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context, now time.Time) { + lastAny := s.lastRetentionCleanup.Load() + if lastAny != nil { + if last, ok := lastAny.(time.Time); ok && now.Sub(last) < dashboardAggregationRetentionInterval { + return + } + } + s.lastRetentionCleanup.Store(now) + + hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays) + dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays) + usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays) + + if err := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff); err != nil { + log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", err) + } + if err := s.repo.CleanupUsageLogs(ctx, usageCutoff); err != nil { + log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", err) + } +} + +func truncateToDayUTC(t time.Time) time.Time { + t = t.UTC() + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC) +} diff --git a/backend/internal/service/dashboard_service.go b/backend/internal/service/dashboard_service.go index 468135e3..40ab877d 100644 --- a/backend/internal/service/dashboard_service.go +++ 
b/backend/internal/service/dashboard_service.go @@ -37,17 +37,24 @@ type dashboardStatsCacheEntry struct { // DashboardService provides aggregated statistics for admin dashboard. type DashboardService struct { usageRepo UsageLogRepository + aggRepo DashboardAggregationRepository cache DashboardStatsCache cacheFreshTTL time.Duration cacheTTL time.Duration refreshTimeout time.Duration refreshing int32 + aggEnabled bool + aggInterval time.Duration + aggLookback time.Duration } -func NewDashboardService(usageRepo UsageLogRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService { +func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregationRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService { freshTTL := defaultDashboardStatsFreshTTL cacheTTL := defaultDashboardStatsCacheTTL refreshTimeout := defaultDashboardStatsRefreshTimeout + aggEnabled := true + aggInterval := time.Minute + aggLookback := 2 * time.Minute if cfg != nil { if !cfg.Dashboard.Enabled { cache = nil @@ -61,13 +68,24 @@ func NewDashboardService(usageRepo UsageLogRepository, cache DashboardStatsCache if cfg.Dashboard.StatsRefreshTimeoutSeconds > 0 { refreshTimeout = time.Duration(cfg.Dashboard.StatsRefreshTimeoutSeconds) * time.Second } + aggEnabled = cfg.DashboardAgg.Enabled + if cfg.DashboardAgg.IntervalSeconds > 0 { + aggInterval = time.Duration(cfg.DashboardAgg.IntervalSeconds) * time.Second + } + if cfg.DashboardAgg.LookbackSeconds > 0 { + aggLookback = time.Duration(cfg.DashboardAgg.LookbackSeconds) * time.Second + } } return &DashboardService{ usageRepo: usageRepo, + aggRepo: aggRepo, cache: cache, cacheFreshTTL: freshTTL, cacheTTL: cacheTTL, refreshTimeout: refreshTimeout, + aggEnabled: aggEnabled, + aggInterval: aggInterval, + aggLookback: aggLookback, } } @@ -75,6 +93,7 @@ func (s *DashboardService) GetDashboardStats(ctx context.Context) (*usagestats.D if s.cache != nil { cached, fresh, err := s.getCachedDashboardStats(ctx) if err 
== nil && cached != nil { + s.refreshAggregationStaleness(cached) if !fresh { s.refreshDashboardStatsAsync() } @@ -133,6 +152,7 @@ func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagesta if err != nil { return nil, err } + s.applyAggregationStatus(ctx, stats) cacheCtx, cancel := s.cacheOperationContext() defer cancel() s.saveDashboardStatsCache(cacheCtx, stats) @@ -158,6 +178,7 @@ func (s *DashboardService) refreshDashboardStatsAsync() { log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err) return } + s.applyAggregationStatus(ctx, stats) cacheCtx, cancel := s.cacheOperationContext() defer cancel() s.saveDashboardStatsCache(cacheCtx, stats) @@ -203,6 +224,61 @@ func (s *DashboardService) cacheOperationContext() (context.Context, context.Can return context.WithTimeout(context.Background(), s.refreshTimeout) } +func (s *DashboardService) applyAggregationStatus(ctx context.Context, stats *usagestats.DashboardStats) { + if stats == nil { + return + } + updatedAt := s.fetchAggregationUpdatedAt(ctx) + stats.StatsUpdatedAt = updatedAt.UTC().Format(time.RFC3339) + stats.StatsStale = s.isAggregationStale(updatedAt, time.Now().UTC()) +} + +func (s *DashboardService) refreshAggregationStaleness(stats *usagestats.DashboardStats) { + if stats == nil { + return + } + updatedAt := parseStatsUpdatedAt(stats.StatsUpdatedAt) + stats.StatsStale = s.isAggregationStale(updatedAt, time.Now().UTC()) +} + +func (s *DashboardService) fetchAggregationUpdatedAt(ctx context.Context) time.Time { + if s.aggRepo == nil { + return time.Unix(0, 0).UTC() + } + updatedAt, err := s.aggRepo.GetAggregationWatermark(ctx) + if err != nil { + log.Printf("[Dashboard] 读取聚合水位失败: %v", err) + return time.Unix(0, 0).UTC() + } + if updatedAt.IsZero() { + return time.Unix(0, 0).UTC() + } + return updatedAt.UTC() +} + +func (s *DashboardService) isAggregationStale(updatedAt, now time.Time) bool { + if !s.aggEnabled { + return true + } + epoch := time.Unix(0, 0).UTC() + if !updatedAt.After(epoch) { 
+ return true + } + threshold := s.aggInterval + s.aggLookback + return now.Sub(updatedAt) > threshold +} + +func parseStatsUpdatedAt(raw string) time.Time { + if raw == "" { + return time.Unix(0, 0).UTC() + } + parsed, err := time.Parse(time.RFC3339, raw) + if err != nil { + return time.Unix(0, 0).UTC() + } + return parsed.UTC() +} + func (s *DashboardService) GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error) { trend, err := s.usageRepo.GetAPIKeyUsageTrend(ctx, startTime, endTime, granularity, limit) if err != nil { diff --git a/backend/internal/service/dashboard_service_test.go b/backend/internal/service/dashboard_service_test.go index 17f46ead..c7b9c6af 100644 --- a/backend/internal/service/dashboard_service_test.go +++ b/backend/internal/service/dashboard_service_test.go @@ -74,6 +74,38 @@ func (c *dashboardCacheStub) DeleteDashboardStats(ctx context.Context) error { return nil } +type dashboardAggregationRepoStub struct { + watermark time.Time + err error +} + +func (s *dashboardAggregationRepoStub) AggregateRange(ctx context.Context, start, end time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) { + if s.err != nil { + return time.Time{}, s.err + } + return s.watermark, nil +} + +func (s *dashboardAggregationRepoStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error { + return nil +} + func (c *dashboardCacheStub) readLastEntry(t *testing.T) 
dashboardStatsCacheEntry { t.Helper() c.lastSetMu.Lock() @@ -88,7 +120,9 @@ func (c *dashboardCacheStub) readLastEntry(t *testing.T) dashboardStatsCacheEntr func TestDashboardService_CacheHitFresh(t *testing.T) { stats := &usagestats.DashboardStats{ - TotalUsers: 10, + TotalUsers: 10, + StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339), + StatsStale: true, } entry := dashboardStatsCacheEntry{ Stats: stats, @@ -105,8 +139,9 @@ func TestDashboardService_CacheHitFresh(t *testing.T) { repo := &usageRepoStub{ stats: &usagestats.DashboardStats{TotalUsers: 99}, } + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) require.NoError(t, err) @@ -118,7 +153,9 @@ func TestDashboardService_CacheHitFresh(t *testing.T) { func TestDashboardService_CacheMiss_StoresCache(t *testing.T) { stats := &usagestats.DashboardStats{ - TotalUsers: 7, + TotalUsers: 7, + StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339), + StatsStale: true, } cache := &dashboardCacheStub{ get: func(ctx context.Context) (string, error) { @@ -126,8 +163,9 @@ func TestDashboardService_CacheMiss_StoresCache(t *testing.T) { }, } repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) require.NoError(t, err) @@ -142,7 +180,9 @@ func TestDashboardService_CacheMiss_StoresCache(t *testing.T) { func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) { stats := &usagestats.DashboardStats{ - TotalUsers: 3, + TotalUsers: 3, + StatsUpdatedAt: time.Unix(0, 
0).UTC().Format(time.RFC3339), + StatsStale: true, } cache := &dashboardCacheStub{ get: func(ctx context.Context) (string, error) { @@ -150,8 +190,9 @@ func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) { }, } repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: false}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) require.NoError(t, err) @@ -163,7 +204,9 @@ func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) { func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) { staleStats := &usagestats.DashboardStats{ - TotalUsers: 11, + TotalUsers: 11, + StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339), + StatsStale: true, } entry := dashboardStatsCacheEntry{ Stats: staleStats, @@ -182,8 +225,9 @@ func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) { stats: &usagestats.DashboardStats{TotalUsers: 22}, onCall: refreshCh, } + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) require.NoError(t, err) @@ -207,8 +251,9 @@ func TestDashboardService_CacheParseError_EvictsAndRefetches(t *testing.T) { } stats := &usagestats.DashboardStats{TotalUsers: 9} repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) 
require.NoError(t, err) @@ -224,10 +269,45 @@ func TestDashboardService_CacheParseError_RepoFailure(t *testing.T) { }, } repo := &usageRepoStub{err: errors.New("db down")} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} - svc := NewDashboardService(repo, cache, cfg) + svc := NewDashboardService(repo, aggRepo, cache, cfg) _, err := svc.GetDashboardStats(context.Background()) require.Error(t, err) require.Equal(t, int32(1), atomic.LoadInt32(&cache.delCalls)) } + +func TestDashboardService_StatsUpdatedAtEpochWhenMissing(t *testing.T) { + stats := &usagestats.DashboardStats{} + repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} + cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: false}} + svc := NewDashboardService(repo, aggRepo, nil, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, "1970-01-01T00:00:00Z", got.StatsUpdatedAt) + require.True(t, got.StatsStale) +} + +func TestDashboardService_StatsStaleFalseWhenFresh(t *testing.T) { + aggNow := time.Now().UTC().Truncate(time.Second) + stats := &usagestats.DashboardStats{} + repo := &usageRepoStub{stats: stats} + aggRepo := &dashboardAggregationRepoStub{watermark: aggNow} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: false}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + IntervalSeconds: 60, + LookbackSeconds: 120, + }, + } + svc := NewDashboardService(repo, aggRepo, nil, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, aggNow.Format(time.RFC3339), got.StatsUpdatedAt) + require.False(t, got.StatsStale) +} diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index 54c37b54..f1074e9d 100644 --- a/backend/internal/service/wire.go +++ 
b/backend/internal/service/wire.go @@ -47,6 +47,13 @@ func ProvideTokenRefreshService( return svc } +// ProvideDashboardAggregationService 创建并启动仪表盘聚合服务 +func ProvideDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService { + svc := NewDashboardAggregationService(repo, timingWheel, cfg) + svc.Start() + return svc +} + // ProvideAccountExpiryService creates and starts AccountExpiryService. func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpiryService { svc := NewAccountExpiryService(accountRepo, time.Minute) @@ -126,6 +133,7 @@ var ProviderSet = wire.NewSet( ProvideTokenRefreshService, ProvideAccountExpiryService, ProvideTimingWheelService, + ProvideDashboardAggregationService, ProvideDeferredService, NewAntigravityQuotaFetcher, NewUserAttributeService, diff --git a/backend/migrations/034_usage_dashboard_aggregation_tables.sql b/backend/migrations/034_usage_dashboard_aggregation_tables.sql new file mode 100644 index 00000000..64b383d4 --- /dev/null +++ b/backend/migrations/034_usage_dashboard_aggregation_tables.sql @@ -0,0 +1,77 @@ +-- Usage dashboard aggregation tables (hourly/daily) + active-user dedup + watermark. +-- These tables support Admin Dashboard statistics without full-table scans on usage_logs. + +-- Hourly aggregates (UTC buckets). 
+CREATE TABLE IF NOT EXISTS usage_dashboard_hourly ( + bucket_start TIMESTAMPTZ PRIMARY KEY, + total_requests BIGINT NOT NULL DEFAULT 0, + input_tokens BIGINT NOT NULL DEFAULT 0, + output_tokens BIGINT NOT NULL DEFAULT 0, + cache_creation_tokens BIGINT NOT NULL DEFAULT 0, + cache_read_tokens BIGINT NOT NULL DEFAULT 0, + total_cost DECIMAL(20, 10) NOT NULL DEFAULT 0, + actual_cost DECIMAL(20, 10) NOT NULL DEFAULT 0, + total_duration_ms BIGINT NOT NULL DEFAULT 0, + active_users BIGINT NOT NULL DEFAULT 0, + computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_usage_dashboard_hourly_bucket_start + ON usage_dashboard_hourly (bucket_start DESC); + +COMMENT ON TABLE usage_dashboard_hourly IS 'Pre-aggregated hourly usage metrics for admin dashboard (UTC buckets).'; +COMMENT ON COLUMN usage_dashboard_hourly.bucket_start IS 'UTC start timestamp of the hour bucket.'; +COMMENT ON COLUMN usage_dashboard_hourly.computed_at IS 'When the hourly row was last computed/refreshed.'; + +-- Daily aggregates (UTC dates). 
+CREATE TABLE IF NOT EXISTS usage_dashboard_daily ( + bucket_date DATE PRIMARY KEY, + total_requests BIGINT NOT NULL DEFAULT 0, + input_tokens BIGINT NOT NULL DEFAULT 0, + output_tokens BIGINT NOT NULL DEFAULT 0, + cache_creation_tokens BIGINT NOT NULL DEFAULT 0, + cache_read_tokens BIGINT NOT NULL DEFAULT 0, + total_cost DECIMAL(20, 10) NOT NULL DEFAULT 0, + actual_cost DECIMAL(20, 10) NOT NULL DEFAULT 0, + total_duration_ms BIGINT NOT NULL DEFAULT 0, + active_users BIGINT NOT NULL DEFAULT 0, + computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_usage_dashboard_daily_bucket_date + ON usage_dashboard_daily (bucket_date DESC); + +COMMENT ON TABLE usage_dashboard_daily IS 'Pre-aggregated daily usage metrics for admin dashboard (UTC dates).'; +COMMENT ON COLUMN usage_dashboard_daily.bucket_date IS 'UTC date of the day bucket.'; +COMMENT ON COLUMN usage_dashboard_daily.computed_at IS 'When the daily row was last computed/refreshed.'; + +-- Hourly active user dedup table. +CREATE TABLE IF NOT EXISTS usage_dashboard_hourly_users ( + bucket_start TIMESTAMPTZ NOT NULL, + user_id BIGINT NOT NULL, + PRIMARY KEY (bucket_start, user_id) +); + +CREATE INDEX IF NOT EXISTS idx_usage_dashboard_hourly_users_bucket_start + ON usage_dashboard_hourly_users (bucket_start); + +-- Daily active user dedup table. +CREATE TABLE IF NOT EXISTS usage_dashboard_daily_users ( + bucket_date DATE NOT NULL, + user_id BIGINT NOT NULL, + PRIMARY KEY (bucket_date, user_id) +); + +CREATE INDEX IF NOT EXISTS idx_usage_dashboard_daily_users_bucket_date + ON usage_dashboard_daily_users (bucket_date); + +-- Aggregation watermark table (single row). 
+CREATE TABLE IF NOT EXISTS usage_dashboard_aggregation_watermark ( + id INT PRIMARY KEY, + last_aggregated_at TIMESTAMPTZ NOT NULL DEFAULT TIMESTAMPTZ '1970-01-01 00:00:00+00', + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +INSERT INTO usage_dashboard_aggregation_watermark (id) +VALUES (1) +ON CONFLICT (id) DO NOTHING; diff --git a/backend/migrations/035_usage_logs_partitioning.sql b/backend/migrations/035_usage_logs_partitioning.sql new file mode 100644 index 00000000..5919b5c3 --- /dev/null +++ b/backend/migrations/035_usage_logs_partitioning.sql @@ -0,0 +1,54 @@ +-- usage_logs monthly partition bootstrap. +-- Only converts to partitioned table when usage_logs is empty. +-- Existing installations with data require a manual migration plan. + +DO $$ +DECLARE + is_partitioned BOOLEAN := FALSE; + has_data BOOLEAN := FALSE; + month_start DATE; + prev_month DATE; + next_month DATE; +BEGIN + SELECT EXISTS( + SELECT 1 + FROM pg_partitioned_table pt + JOIN pg_class c ON c.oid = pt.partrelid + WHERE c.relname = 'usage_logs' + ) INTO is_partitioned; + + IF NOT is_partitioned THEN + SELECT EXISTS(SELECT 1 FROM usage_logs LIMIT 1) INTO has_data; + IF NOT has_data THEN + EXECUTE 'ALTER TABLE usage_logs PARTITION BY RANGE (created_at)'; + is_partitioned := TRUE; + END IF; + END IF; + + IF is_partitioned THEN + month_start := date_trunc('month', now() AT TIME ZONE 'UTC')::date; + prev_month := (month_start - INTERVAL '1 month')::date; + next_month := (month_start + INTERVAL '1 month')::date; + + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM (%L) TO (%L)', + to_char(prev_month, 'YYYYMM'), + prev_month, + month_start + ); + + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM (%L) TO (%L)', + to_char(month_start, 'YYYYMM'), + month_start, + next_month + ); + + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM (%L) TO 
(%L)', + to_char(next_month, 'YYYYMM'), + next_month, + (next_month + INTERVAL '1 month')::date + ); + END IF; +END $$; diff --git a/config.yaml b/config.yaml index ffc070a0..848421d6 100644 --- a/config.yaml +++ b/config.yaml @@ -215,6 +215,39 @@ dashboard_cache: # 异步刷新超时(秒) stats_refresh_timeout_seconds: 30 +# ============================================================================= +# Dashboard Aggregation Configuration +# 仪表盘预聚合配置(重启生效) +# ============================================================================= +dashboard_aggregation: + # Enable aggregation job + # 启用聚合作业 + enabled: true + # Refresh interval (seconds) + # 刷新间隔(秒) + interval_seconds: 60 + # Lookback window (seconds) for late-arriving data + # 回看窗口(秒),处理迟到数据 + lookback_seconds: 120 + # Allow manual backfill + # 允许手动回填 + backfill_enabled: false + # Recompute recent N days on startup + # 启动时重算最近 N 天 + recompute_days: 2 + # Retention windows (days) + # 保留窗口(天) + retention: + # Raw usage_logs retention + # 原始 usage_logs 保留天数 + usage_logs_days: 90 + # Hourly aggregation retention + # 小时聚合保留天数 + hourly_days: 180 + # Daily aggregation retention + # 日聚合保留天数 + daily_days: 730 + # ============================================================================= # Concurrency Wait Configuration # 并发等待配置 diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 7083f9e9..460606ab 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -215,6 +215,39 @@ dashboard_cache: # 异步刷新超时(秒) stats_refresh_timeout_seconds: 30 +# ============================================================================= +# Dashboard Aggregation Configuration +# 仪表盘预聚合配置(重启生效) +# ============================================================================= +dashboard_aggregation: + # Enable aggregation job + # 启用聚合作业 + enabled: true + # Refresh interval (seconds) + # 刷新间隔(秒) + interval_seconds: 60 + # Lookback window (seconds) for late-arriving data + # 回看窗口(秒),处理迟到数据 + lookback_seconds: 120 + 
# Allow manual backfill + # 允许手动回填 + backfill_enabled: false + # Recompute recent N days on startup + # 启动时重算最近 N 天 + recompute_days: 2 + # Retention windows (days) + # 保留窗口(天) + retention: + # Raw usage_logs retention + # 原始 usage_logs 保留天数 + usage_logs_days: 90 + # Hourly aggregation retention + # 小时聚合保留天数 + hourly_days: 180 + # Daily aggregation retention + # 日聚合保留天数 + daily_days: 730 + # ============================================================================= # Concurrency Wait Configuration # 并发等待配置 diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 3d1b17f6..98718b19 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -651,6 +651,9 @@ export interface DashboardStats { total_users: number today_new_users: number // 今日新增用户数 active_users: number // 今日有请求的用户数 + hourly_active_users: number // 当前小时活跃用户数(UTC) + stats_updated_at: string // 统计更新时间(UTC RFC3339) + stats_stale: boolean // 统计是否过期 // API Key 统计 total_api_keys: number From d78f42d2fda854e479e94e3637db2468a7d0fe5e Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sun, 11 Jan 2026 16:02:28 +0800 Subject: [PATCH 4/6] =?UTF-8?q?chore(=E6=B3=A8=E9=87=8A):=20=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E4=BB=AA=E8=A1=A8=E7=9B=98=E6=B3=A8=E9=87=8A=E4=B8=BA?= =?UTF-8?q?=E4=B8=AD=E6=96=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/internal/service/dashboard_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/internal/service/dashboard_service.go b/backend/internal/service/dashboard_service.go index 40ab877d..d0e6e03c 100644 --- a/backend/internal/service/dashboard_service.go +++ b/backend/internal/service/dashboard_service.go @@ -34,7 +34,7 @@ type dashboardStatsCacheEntry struct { UpdatedAt int64 `json:"updated_at"` } -// DashboardService provides aggregated statistics for admin dashboard. 
+// DashboardService 提供管理员仪表盘统计服务。 type DashboardService struct { usageRepo UsageLogRepository aggRepo DashboardAggregationRepository From 5364011a5bacff1f1a00c98b2d8f473e33f5baca Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sun, 11 Jan 2026 17:21:17 +0800 Subject: [PATCH 5/6] =?UTF-8?q?fix(=E4=BB=AA=E8=A1=A8=E7=9B=98):=20?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E8=81=9A=E5=90=88=E6=97=B6=E9=97=B4=E6=A1=B6?= =?UTF-8?q?=E4=B8=8E=E6=B8=85=E7=90=86=E8=8A=82=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../repository/dashboard_aggregation_repo.go | 11 +-- .../service/dashboard_aggregation_service.go | 22 +++-- .../dashboard_aggregation_service_test.go | 89 +++++++++++++++++++ 3 files changed, 108 insertions(+), 14 deletions(-) create mode 100644 backend/internal/service/dashboard_aggregation_service_test.go diff --git a/backend/internal/repository/dashboard_aggregation_repo.go b/backend/internal/repository/dashboard_aggregation_repo.go index dbba5cdb..b02cde0d 100644 --- a/backend/internal/repository/dashboard_aggregation_repo.go +++ b/backend/internal/repository/dashboard_aggregation_repo.go @@ -43,10 +43,11 @@ func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, sta dayEnd = dayEnd.Add(24 * time.Hour) } - if err := r.insertHourlyActiveUsers(ctx, startUTC, endUTC); err != nil { + // 以桶边界聚合,允许覆盖 end 所在桶的剩余区间。 + if err := r.insertHourlyActiveUsers(ctx, hourStart, hourEnd); err != nil { return err } - if err := r.insertDailyActiveUsers(ctx, startUTC, endUTC); err != nil { + if err := r.insertDailyActiveUsers(ctx, hourStart, hourEnd); err != nil { return err } if err := r.upsertHourlyAggregates(ctx, hourStart, hourEnd); err != nil { @@ -138,10 +139,10 @@ func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Cont query := ` INSERT INTO usage_dashboard_daily_users (bucket_date, user_id) SELECT DISTINCT - (created_at AT TIME ZONE 'UTC')::date AS bucket_date, + (bucket_start AT 
TIME ZONE 'UTC')::date AS bucket_date, user_id - FROM usage_logs - WHERE created_at >= $1 AND created_at < $2 + FROM usage_dashboard_hourly_users + WHERE bucket_start >= $1 AND bucket_start < $2 ON CONFLICT DO NOTHING ` _, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC()) diff --git a/backend/internal/service/dashboard_aggregation_service.go b/backend/internal/service/dashboard_aggregation_service.go index 343c3240..133ab018 100644 --- a/backend/internal/service/dashboard_aggregation_service.go +++ b/backend/internal/service/dashboard_aggregation_service.go @@ -134,12 +134,12 @@ func (s *DashboardAggregationService) runScheduledAggregation() { } lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second - start := last.Add(-lookback) epoch := time.Unix(0, 0).UTC() + start := last.Add(-lookback) if !last.After(epoch) { - start = now.Add(-lookback) - } - if start.After(now) { + // 首次聚合覆盖当天,避免只统计最后一个窗口。 + start = truncateToDayUTC(now) + } else if start.After(now) { start = now.Add(-lookback) } @@ -204,17 +204,21 @@ func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context, return } } - s.lastRetentionCleanup.Store(now) hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays) dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays) usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays) - if err := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff); err != nil { - log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", err) + aggErr := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff) + if aggErr != nil { + log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", aggErr) } - if err := s.repo.CleanupUsageLogs(ctx, usageCutoff); err != nil { - log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", err) + usageErr := s.repo.CleanupUsageLogs(ctx, usageCutoff) + if usageErr != nil { + log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", usageErr) + } + if aggErr == nil && usageErr == nil { + 
s.lastRetentionCleanup.Store(now) } } diff --git a/backend/internal/service/dashboard_aggregation_service_test.go b/backend/internal/service/dashboard_aggregation_service_test.go new file mode 100644 index 00000000..501b11d4 --- /dev/null +++ b/backend/internal/service/dashboard_aggregation_service_test.go @@ -0,0 +1,89 @@ +package service + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/stretchr/testify/require" +) + +type dashboardAggregationRepoTestStub struct { + aggregateCalls int + lastStart time.Time + lastEnd time.Time + watermark time.Time + aggregateErr error + cleanupAggregatesErr error + cleanupUsageErr error +} + +func (s *dashboardAggregationRepoTestStub) AggregateRange(ctx context.Context, start, end time.Time) error { + s.aggregateCalls++ + s.lastStart = start + s.lastEnd = end + return s.aggregateErr +} + +func (s *dashboardAggregationRepoTestStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) { + return s.watermark, nil +} + +func (s *dashboardAggregationRepoTestStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error { + return nil +} + +func (s *dashboardAggregationRepoTestStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error { + return s.cleanupAggregatesErr +} + +func (s *dashboardAggregationRepoTestStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error { + return s.cleanupUsageErr +} + +func (s *dashboardAggregationRepoTestStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error { + return nil +} + +func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesDayStart(t *testing.T) { + repo := &dashboardAggregationRepoTestStub{watermark: time.Unix(0, 0).UTC()} + svc := &DashboardAggregationService{ + repo: repo, + cfg: config.DashboardAggregationConfig{ + Enabled: true, + IntervalSeconds: 60, + LookbackSeconds: 120, + Retention: 
config.DashboardAggregationRetentionConfig{ + UsageLogsDays: 1, + HourlyDays: 1, + DailyDays: 1, + }, + }, + } + + svc.runScheduledAggregation() + + require.Equal(t, 1, repo.aggregateCalls) + require.False(t, repo.lastEnd.IsZero()) + require.Equal(t, truncateToDayUTC(repo.lastEnd), repo.lastStart) +} + +func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *testing.T) { + repo := &dashboardAggregationRepoTestStub{cleanupAggregatesErr: errors.New("清理失败")} + svc := &DashboardAggregationService{ + repo: repo, + cfg: config.DashboardAggregationConfig{ + Retention: config.DashboardAggregationRetentionConfig{ + UsageLogsDays: 1, + HourlyDays: 1, + DailyDays: 1, + }, + }, + } + + svc.maybeCleanupRetention(context.Background(), time.Now().UTC()) + + require.Nil(t, svc.lastRetentionCleanup.Load()) +} From 6271a33d0898f1e3b43bac0495ff81103f437171 Mon Sep 17 00:00:00 2001 From: yangjianbo Date: Sun, 11 Jan 2026 18:20:15 +0800 Subject: [PATCH 6/6] =?UTF-8?q?fix(=E4=BB=AA=E8=A1=A8=E7=9B=98):=20?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=E7=A6=81=E7=94=A8=E8=81=9A=E5=90=88=E4=B8=8E?= =?UTF-8?q?=E5=9B=9E=E5=A1=AB=E9=99=90=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/internal/config/config.go | 12 ++ backend/internal/config/config_test.go | 22 +++ .../handler/admin/dashboard_handler.go | 4 + backend/internal/repository/usage_log_repo.go | 169 +++++++++++++++--- .../usage_log_repo_integration_test.go | 74 ++++++++ .../service/dashboard_aggregation_service.go | 18 +- .../dashboard_aggregation_service_test.go | 21 ++- backend/internal/service/dashboard_service.go | 25 ++- .../service/dashboard_service_test.go | 94 ++++++++-- config.yaml | 3 + deploy/.env.example | 27 +++ deploy/config.example.yaml | 3 + 12 files changed, 434 insertions(+), 38 deletions(-) diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index b91a07c1..a2fbbd1d 100644 --- 
a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -398,6 +398,8 @@ type DashboardAggregationConfig struct { LookbackSeconds int `mapstructure:"lookback_seconds"` // BackfillEnabled: 是否允许全量回填 BackfillEnabled bool `mapstructure:"backfill_enabled"` + // BackfillMaxDays: 回填最大跨度(天) + BackfillMaxDays int `mapstructure:"backfill_max_days"` // Retention: 各表保留窗口(天) Retention DashboardAggregationRetentionConfig `mapstructure:"retention"` // RecomputeDays: 启动时重算最近 N 天 @@ -726,6 +728,7 @@ func setDefaults() { viper.SetDefault("dashboard_aggregation.interval_seconds", 60) viper.SetDefault("dashboard_aggregation.lookback_seconds", 120) viper.SetDefault("dashboard_aggregation.backfill_enabled", false) + viper.SetDefault("dashboard_aggregation.backfill_max_days", 31) viper.SetDefault("dashboard_aggregation.retention.usage_logs_days", 90) viper.SetDefault("dashboard_aggregation.retention.hourly_days", 180) viper.SetDefault("dashboard_aggregation.retention.daily_days", 730) @@ -920,6 +923,12 @@ func (c *Config) Validate() error { if c.DashboardAgg.LookbackSeconds < 0 { return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative") } + if c.DashboardAgg.BackfillMaxDays < 0 { + return fmt.Errorf("dashboard_aggregation.backfill_max_days must be non-negative") + } + if c.DashboardAgg.BackfillEnabled && c.DashboardAgg.BackfillMaxDays == 0 { + return fmt.Errorf("dashboard_aggregation.backfill_max_days must be positive") + } if c.DashboardAgg.Retention.UsageLogsDays <= 0 { return fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be positive") } @@ -939,6 +948,9 @@ func (c *Config) Validate() error { if c.DashboardAgg.LookbackSeconds < 0 { return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative") } + if c.DashboardAgg.BackfillMaxDays < 0 { + return fmt.Errorf("dashboard_aggregation.backfill_max_days must be non-negative") + } if c.DashboardAgg.Retention.UsageLogsDays < 0 { return 
fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be non-negative") } diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index 7fc34d64..1ba6d053 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -226,6 +226,9 @@ func TestLoadDefaultDashboardAggregationConfig(t *testing.T) { if cfg.DashboardAgg.BackfillEnabled { t.Fatalf("DashboardAgg.BackfillEnabled = true, want false") } + if cfg.DashboardAgg.BackfillMaxDays != 31 { + t.Fatalf("DashboardAgg.BackfillMaxDays = %d, want 31", cfg.DashboardAgg.BackfillMaxDays) + } if cfg.DashboardAgg.Retention.UsageLogsDays != 90 { t.Fatalf("DashboardAgg.Retention.UsageLogsDays = %d, want 90", cfg.DashboardAgg.Retention.UsageLogsDays) } @@ -258,3 +261,22 @@ func TestValidateDashboardAggregationConfigDisabled(t *testing.T) { t.Fatalf("Validate() expected interval_seconds error, got: %v", err) } } + +func TestValidateDashboardAggregationBackfillMaxDays(t *testing.T) { + viper.Reset() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + cfg.DashboardAgg.BackfillEnabled = true + cfg.DashboardAgg.BackfillMaxDays = 0 + err = cfg.Validate() + if err == nil { + t.Fatalf("Validate() expected error for dashboard_aggregation.backfill_max_days, got nil") + } + if !strings.Contains(err.Error(), "dashboard_aggregation.backfill_max_days") { + t.Fatalf("Validate() expected backfill_max_days error, got: %v", err) + } +} diff --git a/backend/internal/handler/admin/dashboard_handler.go b/backend/internal/handler/admin/dashboard_handler.go index 560d1075..9b675974 100644 --- a/backend/internal/handler/admin/dashboard_handler.go +++ b/backend/internal/handler/admin/dashboard_handler.go @@ -159,6 +159,10 @@ func (h *DashboardHandler) BackfillAggregation(c *gin.Context) { response.Forbidden(c, "Backfill is disabled") return } + if errors.Is(err, service.ErrDashboardBackfillTooLarge) { + response.BadRequest(c, "Backfill 
range too large") + return + } response.InternalError(c, "Failed to trigger backfill") return } diff --git a/backend/internal/repository/usage_log_repo.go b/backend/internal/repository/usage_log_repo.go index be2a6d18..e483f89f 100644 --- a/backend/internal/repository/usage_log_repo.go +++ b/backend/internal/repository/usage_log_repo.go @@ -269,11 +269,56 @@ func (r *usageLogRepository) GetUserStats(ctx context.Context, userID int64, sta type DashboardStats = usagestats.DashboardStats func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardStats, error) { - var stats DashboardStats - now := time.Now() + stats := &DashboardStats{} + now := time.Now().UTC() todayUTC := truncateToDayUTC(now) - // 合并用户统计查询 + if err := r.fillDashboardEntityStats(ctx, stats, todayUTC, now); err != nil { + return nil, err + } + if err := r.fillDashboardUsageStatsAggregated(ctx, stats, todayUTC, now); err != nil { + return nil, err + } + + rpm, tpm, err := r.getPerformanceStats(ctx, 0) + if err != nil { + return nil, err + } + stats.Rpm = rpm + stats.Tpm = tpm + + return stats, nil +} + +func (r *usageLogRepository) GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*DashboardStats, error) { + startUTC := start.UTC() + endUTC := end.UTC() + if !endUTC.After(startUTC) { + return nil, errors.New("统计时间范围无效") + } + + stats := &DashboardStats{} + now := time.Now().UTC() + todayUTC := truncateToDayUTC(now) + + if err := r.fillDashboardEntityStats(ctx, stats, todayUTC, now); err != nil { + return nil, err + } + if err := r.fillDashboardUsageStatsFromUsageLogs(ctx, stats, startUTC, endUTC, todayUTC, now); err != nil { + return nil, err + } + + rpm, tpm, err := r.getPerformanceStats(ctx, 0) + if err != nil { + return nil, err + } + stats.Rpm = rpm + stats.Tpm = tpm + + return stats, nil +} + +func (r *usageLogRepository) fillDashboardEntityStats(ctx context.Context, stats *DashboardStats, todayUTC, now time.Time) error { userStatsQuery := ` SELECT COUNT(*) 
as total_users, @@ -289,10 +334,9 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TotalUsers, &stats.TodayNewUsers, ); err != nil { - return nil, err + return err } - // 合并API Key统计查询 apiKeyStatsQuery := ` SELECT COUNT(*) as total_api_keys, @@ -308,10 +352,9 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TotalAPIKeys, &stats.ActiveAPIKeys, ); err != nil { - return nil, err + return err } - // 合并账户统计查询 accountStatsQuery := ` SELECT COUNT(*) as total_accounts, @@ -333,10 +376,13 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.RateLimitAccounts, &stats.OverloadAccounts, ); err != nil { - return nil, err + return err } - // 累计 Token 统计 + return nil +} + +func (r *usageLogRepository) fillDashboardUsageStatsAggregated(ctx context.Context, stats *DashboardStats, todayUTC, now time.Time) error { totalStatsQuery := ` SELECT COALESCE(SUM(total_requests), 0) as total_requests, @@ -364,14 +410,13 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.TotalActualCost, &totalDurationMs, ); err != nil { - return nil, err + return err } stats.TotalTokens = stats.TotalInputTokens + stats.TotalOutputTokens + stats.TotalCacheCreationTokens + stats.TotalCacheReadTokens if stats.TotalRequests > 0 { stats.AverageDurationMs = float64(totalDurationMs) / float64(stats.TotalRequests) } - // 今日 Token 统计 todayStatsQuery := ` SELECT total_requests as today_requests, @@ -400,12 +445,11 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS &stats.ActiveUsers, ); err != nil { if err != sql.ErrNoRows { - return nil, err + return err } } stats.TodayTokens = stats.TodayInputTokens + stats.TodayOutputTokens + stats.TodayCacheCreationTokens + stats.TodayCacheReadTokens - // 当前小时活跃用户 hourlyActiveQuery := ` SELECT active_users FROM usage_dashboard_hourly @@ -414,19 +458,100 @@ func (r *usageLogRepository) 
GetDashboardStats(ctx context.Context) (*DashboardS hourStart := now.UTC().Truncate(time.Hour) if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart}, &stats.HourlyActiveUsers); err != nil { if err != sql.ErrNoRows { - return nil, err + return err } } - // 性能指标:RPM 和 TPM(最近1分钟,全局) - rpm, tpm, err := r.getPerformanceStats(ctx, 0) - if err != nil { - return nil, err - } - stats.Rpm = rpm - stats.Tpm = tpm + return nil +} - return &stats, nil +func (r *usageLogRepository) fillDashboardUsageStatsFromUsageLogs(ctx context.Context, stats *DashboardStats, startUTC, endUTC, todayUTC, now time.Time) error { + totalStatsQuery := ` + SELECT + COUNT(*) as total_requests, + COALESCE(SUM(input_tokens), 0) as total_input_tokens, + COALESCE(SUM(output_tokens), 0) as total_output_tokens, + COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens, + COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens, + COALESCE(SUM(total_cost), 0) as total_cost, + COALESCE(SUM(actual_cost), 0) as total_actual_cost, + COALESCE(SUM(COALESCE(duration_ms, 0)), 0) as total_duration_ms + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ` + var totalDurationMs int64 + if err := scanSingleRow( + ctx, + r.sql, + totalStatsQuery, + []any{startUTC, endUTC}, + &stats.TotalRequests, + &stats.TotalInputTokens, + &stats.TotalOutputTokens, + &stats.TotalCacheCreationTokens, + &stats.TotalCacheReadTokens, + &stats.TotalCost, + &stats.TotalActualCost, + &totalDurationMs, + ); err != nil { + return err + } + stats.TotalTokens = stats.TotalInputTokens + stats.TotalOutputTokens + stats.TotalCacheCreationTokens + stats.TotalCacheReadTokens + if stats.TotalRequests > 0 { + stats.AverageDurationMs = float64(totalDurationMs) / float64(stats.TotalRequests) + } + + todayEnd := todayUTC.Add(24 * time.Hour) + todayStatsQuery := ` + SELECT + COUNT(*) as today_requests, + COALESCE(SUM(input_tokens), 0) as today_input_tokens, + COALESCE(SUM(output_tokens), 0) as 
today_output_tokens, + COALESCE(SUM(cache_creation_tokens), 0) as today_cache_creation_tokens, + COALESCE(SUM(cache_read_tokens), 0) as today_cache_read_tokens, + COALESCE(SUM(total_cost), 0) as today_cost, + COALESCE(SUM(actual_cost), 0) as today_actual_cost + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ` + if err := scanSingleRow( + ctx, + r.sql, + todayStatsQuery, + []any{todayUTC, todayEnd}, + &stats.TodayRequests, + &stats.TodayInputTokens, + &stats.TodayOutputTokens, + &stats.TodayCacheCreationTokens, + &stats.TodayCacheReadTokens, + &stats.TodayCost, + &stats.TodayActualCost, + ); err != nil { + return err + } + stats.TodayTokens = stats.TodayInputTokens + stats.TodayOutputTokens + stats.TodayCacheCreationTokens + stats.TodayCacheReadTokens + + activeUsersQuery := ` + SELECT COUNT(DISTINCT user_id) as active_users + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ` + if err := scanSingleRow(ctx, r.sql, activeUsersQuery, []any{todayUTC, todayEnd}, &stats.ActiveUsers); err != nil { + return err + } + + hourStart := now.UTC().Truncate(time.Hour) + hourEnd := hourStart.Add(time.Hour) + hourlyActiveQuery := ` + SELECT COUNT(DISTINCT user_id) as active_users + FROM usage_logs + WHERE created_at >= $1 AND created_at < $2 + ` + if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart, hourEnd}, &stats.HourlyActiveUsers); err != nil { + return err + } + + return nil } func (r *usageLogRepository) ListByAccount(ctx context.Context, accountID int64, params pagination.PaginationParams) ([]service.UsageLog, *pagination.PaginationResult, error) { diff --git a/backend/internal/repository/usage_log_repo_integration_test.go b/backend/internal/repository/usage_log_repo_integration_test.go index 09341db3..a944ed32 100644 --- a/backend/internal/repository/usage_log_repo_integration_test.go +++ b/backend/internal/repository/usage_log_repo_integration_test.go @@ -308,6 +308,80 @@ func (s *UsageLogRepoSuite) 
TestDashboardStats_TodayTotalsAndPerformance() { s.Require().Equal(wantTpm, stats.Tpm, "Tpm mismatch") } +func (s *UsageLogRepoSuite) TestDashboardStatsWithRange_Fallback() { + now := time.Now().UTC() + todayStart := truncateToDayUTC(now) + rangeStart := todayStart.Add(-24 * time.Hour) + rangeEnd := now + + user1 := mustCreateUser(s.T(), s.client, &service.User{Email: "range-u1@test.com"}) + user2 := mustCreateUser(s.T(), s.client, &service.User{Email: "range-u2@test.com"}) + apiKey1 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user1.ID, Key: "sk-range-1", Name: "k1"}) + apiKey2 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user2.ID, Key: "sk-range-2", Name: "k2"}) + account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-range"}) + + d1, d2, d3 := 100, 200, 300 + logOutside := &service.UsageLog{ + UserID: user1.ID, + APIKeyID: apiKey1.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 7, + OutputTokens: 8, + TotalCost: 0.8, + ActualCost: 0.7, + DurationMs: &d3, + CreatedAt: rangeStart.Add(-1 * time.Hour), + } + _, err := s.repo.Create(s.ctx, logOutside) + s.Require().NoError(err) + + logRange := &service.UsageLog{ + UserID: user1.ID, + APIKeyID: apiKey1.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 10, + OutputTokens: 20, + CacheCreationTokens: 1, + CacheReadTokens: 2, + TotalCost: 1.0, + ActualCost: 0.9, + DurationMs: &d1, + CreatedAt: rangeStart.Add(2 * time.Hour), + } + _, err = s.repo.Create(s.ctx, logRange) + s.Require().NoError(err) + + logToday := &service.UsageLog{ + UserID: user2.ID, + APIKeyID: apiKey2.ID, + AccountID: account.ID, + Model: "claude-3", + InputTokens: 5, + OutputTokens: 6, + CacheReadTokens: 1, + TotalCost: 0.5, + ActualCost: 0.5, + DurationMs: &d2, + CreatedAt: now, + } + _, err = s.repo.Create(s.ctx, logToday) + s.Require().NoError(err) + + stats, err := s.repo.GetDashboardStatsWithRange(s.ctx, rangeStart, rangeEnd) + s.Require().NoError(err) + 
s.Require().Equal(int64(2), stats.TotalRequests) + s.Require().Equal(int64(15), stats.TotalInputTokens) + s.Require().Equal(int64(26), stats.TotalOutputTokens) + s.Require().Equal(int64(1), stats.TotalCacheCreationTokens) + s.Require().Equal(int64(3), stats.TotalCacheReadTokens) + s.Require().Equal(int64(45), stats.TotalTokens) + s.Require().Equal(1.5, stats.TotalCost) + s.Require().Equal(1.4, stats.TotalActualCost) + s.Require().InEpsilon(150.0, stats.AverageDurationMs, 0.0001) +} + // --- GetUserDashboardStats --- func (s *UsageLogRepoSuite) TestGetUserDashboardStats() { diff --git a/backend/internal/service/dashboard_aggregation_service.go b/backend/internal/service/dashboard_aggregation_service.go index 133ab018..0d1cec57 100644 --- a/backend/internal/service/dashboard_aggregation_service.go +++ b/backend/internal/service/dashboard_aggregation_service.go @@ -19,6 +19,8 @@ const ( var ( // ErrDashboardBackfillDisabled 当配置禁用回填时返回。 ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用") + // ErrDashboardBackfillTooLarge 当回填跨度超过限制时返回。 + ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大") ) // DashboardAggregationRepository 定义仪表盘预聚合仓储接口。 @@ -76,6 +78,9 @@ func (s *DashboardAggregationService) Start() { s.runScheduledAggregation() }) log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds) + if !s.cfg.BackfillEnabled { + log.Printf("[DashboardAggregation] 回填已禁用,如需补齐保留窗口以外历史数据请手动回填") + } } // TriggerBackfill 触发回填(异步)。 @@ -90,6 +95,12 @@ func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) erro if !end.After(start) { return errors.New("回填时间范围无效") } + if s.cfg.BackfillMaxDays > 0 { + maxRange := time.Duration(s.cfg.BackfillMaxDays) * 24 * time.Hour + if end.Sub(start) > maxRange { + return ErrDashboardBackfillTooLarge + } + } go func() { ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout) @@ -137,8 +148,11 @@ func (s *DashboardAggregationService) 
runScheduledAggregation() { epoch := time.Unix(0, 0).UTC() start := last.Add(-lookback) if !last.After(epoch) { - // 首次聚合覆盖当天,避免只统计最后一个窗口。 - start = truncateToDayUTC(now) + retentionDays := s.cfg.Retention.UsageLogsDays + if retentionDays <= 0 { + retentionDays = 1 + } + start = truncateToDayUTC(now.AddDate(0, 0, -retentionDays)) } else if start.After(now) { start = now.Add(-lookback) } diff --git a/backend/internal/service/dashboard_aggregation_service_test.go b/backend/internal/service/dashboard_aggregation_service_test.go index 501b11d4..2fc22105 100644 --- a/backend/internal/service/dashboard_aggregation_service_test.go +++ b/backend/internal/service/dashboard_aggregation_service_test.go @@ -47,7 +47,7 @@ func (s *dashboardAggregationRepoTestStub) EnsureUsageLogsPartitions(ctx context return nil } -func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesDayStart(t *testing.T) { +func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesRetentionStart(t *testing.T) { repo := &dashboardAggregationRepoTestStub{watermark: time.Unix(0, 0).UTC()} svc := &DashboardAggregationService{ repo: repo, @@ -67,7 +67,7 @@ func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesDayStart(t require.Equal(t, 1, repo.aggregateCalls) require.False(t, repo.lastEnd.IsZero()) - require.Equal(t, truncateToDayUTC(repo.lastEnd), repo.lastStart) + require.Equal(t, truncateToDayUTC(repo.lastEnd.AddDate(0, 0, -1)), repo.lastStart) } func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *testing.T) { @@ -87,3 +87,20 @@ func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *te require.Nil(t, svc.lastRetentionCleanup.Load()) } + +func TestDashboardAggregationService_TriggerBackfill_TooLarge(t *testing.T) { + repo := &dashboardAggregationRepoTestStub{} + svc := &DashboardAggregationService{ + repo: repo, + cfg: config.DashboardAggregationConfig{ + BackfillEnabled: true, + BackfillMaxDays: 1, + }, + } + + start 
:= time.Now().AddDate(0, 0, -3) + end := time.Now() + err := svc.TriggerBackfill(start, end) + require.ErrorIs(t, err, ErrDashboardBackfillTooLarge) + require.Equal(t, 0, repo.aggregateCalls) +} diff --git a/backend/internal/service/dashboard_service.go b/backend/internal/service/dashboard_service.go index d0e6e03c..69d251cb 100644 --- a/backend/internal/service/dashboard_service.go +++ b/backend/internal/service/dashboard_service.go @@ -29,6 +29,10 @@ type DashboardStatsCache interface { DeleteDashboardStats(ctx context.Context) error } +type dashboardStatsRangeFetcher interface { + GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*usagestats.DashboardStats, error) +} + type dashboardStatsCacheEntry struct { Stats *usagestats.DashboardStats `json:"stats"` UpdatedAt int64 `json:"updated_at"` @@ -46,6 +50,7 @@ type DashboardService struct { aggEnabled bool aggInterval time.Duration aggLookback time.Duration + aggUsageDays int } func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregationRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService { @@ -55,6 +60,7 @@ func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregat aggEnabled := true aggInterval := time.Minute aggLookback := 2 * time.Minute + aggUsageDays := 90 if cfg != nil { if !cfg.Dashboard.Enabled { cache = nil @@ -75,6 +81,9 @@ func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregat if cfg.DashboardAgg.LookbackSeconds > 0 { aggLookback = time.Duration(cfg.DashboardAgg.LookbackSeconds) * time.Second } + if cfg.DashboardAgg.Retention.UsageLogsDays > 0 { + aggUsageDays = cfg.DashboardAgg.Retention.UsageLogsDays + } } return &DashboardService{ usageRepo: usageRepo, @@ -86,6 +95,7 @@ func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregat aggEnabled: aggEnabled, aggInterval: aggInterval, aggLookback: aggLookback, + aggUsageDays: aggUsageDays, } } @@ -148,7 +158,7 @@ func (s 
*DashboardService) getCachedDashboardStats(ctx context.Context) (*usages } func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) { - stats, err := s.usageRepo.GetDashboardStats(ctx) + stats, err := s.fetchDashboardStats(ctx) if err != nil { return nil, err } @@ -173,7 +183,7 @@ func (s *DashboardService) refreshDashboardStatsAsync() { ctx, cancel := context.WithTimeout(context.Background(), s.refreshTimeout) defer cancel() - stats, err := s.usageRepo.GetDashboardStats(ctx) + stats, err := s.fetchDashboardStats(ctx) if err != nil { log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err) return @@ -185,6 +195,17 @@ func (s *DashboardService) refreshDashboardStatsAsync() { }() } +func (s *DashboardService) fetchDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) { + if !s.aggEnabled { + if fetcher, ok := s.usageRepo.(dashboardStatsRangeFetcher); ok { + now := time.Now().UTC() + start := truncateToDayUTC(now.AddDate(0, 0, -s.aggUsageDays)) + return fetcher.GetDashboardStatsWithRange(ctx, start, now) + } + } + return s.usageRepo.GetDashboardStats(ctx) +} + func (s *DashboardService) saveDashboardStatsCache(ctx context.Context, stats *usagestats.DashboardStats) { if s.cache == nil || stats == nil { return diff --git a/backend/internal/service/dashboard_service_test.go b/backend/internal/service/dashboard_service_test.go index c7b9c6af..db3c78c3 100644 --- a/backend/internal/service/dashboard_service_test.go +++ b/backend/internal/service/dashboard_service_test.go @@ -16,10 +16,15 @@ import ( type usageRepoStub struct { UsageLogRepository - stats *usagestats.DashboardStats - err error - calls int32 - onCall chan struct{} + stats *usagestats.DashboardStats + rangeStats *usagestats.DashboardStats + err error + rangeErr error + calls int32 + rangeCalls int32 + rangeStart time.Time + rangeEnd time.Time + onCall chan struct{} } func (s *usageRepoStub) GetDashboardStats(ctx context.Context) 
(*usagestats.DashboardStats, error) { @@ -36,6 +41,19 @@ func (s *usageRepoStub) GetDashboardStats(ctx context.Context) (*usagestats.Dash return s.stats, nil } +func (s *usageRepoStub) GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*usagestats.DashboardStats, error) { + atomic.AddInt32(&s.rangeCalls, 1) + s.rangeStart = start + s.rangeEnd = end + if s.rangeErr != nil { + return nil, s.rangeErr + } + if s.rangeStats != nil { + return s.rangeStats, nil + } + return s.stats, nil +} + type dashboardCacheStub struct { get func(ctx context.Context) (string, error) set func(ctx context.Context, data string, ttl time.Duration) error @@ -140,7 +158,12 @@ func TestDashboardService_CacheHitFresh(t *testing.T) { stats: &usagestats.DashboardStats{TotalUsers: 99}, } aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -164,7 +187,12 @@ func TestDashboardService_CacheMiss_StoresCache(t *testing.T) { } repo := &usageRepoStub{stats: stats} aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -191,7 +219,12 @@ func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) { } repo := &usageRepoStub{stats: stats} aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: 
config.DashboardCacheConfig{Enabled: false}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: false}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -226,7 +259,12 @@ func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) { onCall: refreshCh, } aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -252,7 +290,12 @@ func TestDashboardService_CacheParseError_EvictsAndRefetches(t *testing.T) { stats := &usagestats.DashboardStats{TotalUsers: 9} repo := &usageRepoStub{stats: stats} aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) got, err := svc.GetDashboardStats(context.Background()) @@ -270,7 +313,12 @@ func TestDashboardService_CacheParseError_RepoFailure(t *testing.T) { } repo := &usageRepoStub{err: errors.New("db down")} aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()} - cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: true}} + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: true}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: true, + }, + } svc := NewDashboardService(repo, aggRepo, cache, cfg) _, err := 
svc.GetDashboardStats(context.Background()) @@ -311,3 +359,29 @@ func TestDashboardService_StatsStaleFalseWhenFresh(t *testing.T) { require.Equal(t, aggNow.Format(time.RFC3339), got.StatsUpdatedAt) require.False(t, got.StatsStale) } + +func TestDashboardService_AggDisabled_UsesUsageLogsFallback(t *testing.T) { + expected := &usagestats.DashboardStats{TotalUsers: 42} + repo := &usageRepoStub{ + rangeStats: expected, + err: errors.New("should not call aggregated stats"), + } + cfg := &config.Config{ + Dashboard: config.DashboardCacheConfig{Enabled: false}, + DashboardAgg: config.DashboardAggregationConfig{ + Enabled: false, + Retention: config.DashboardAggregationRetentionConfig{ + UsageLogsDays: 7, + }, + }, + } + svc := NewDashboardService(repo, nil, nil, cfg) + + got, err := svc.GetDashboardStats(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(42), got.TotalUsers) + require.Equal(t, int32(0), atomic.LoadInt32(&repo.calls)) + require.Equal(t, int32(1), atomic.LoadInt32(&repo.rangeCalls)) + require.False(t, repo.rangeEnd.IsZero()) + require.Equal(t, truncateToDayUTC(repo.rangeEnd.AddDate(0, 0, -7)), repo.rangeStart) +} diff --git a/config.yaml b/config.yaml index 848421d6..b5272aac 100644 --- a/config.yaml +++ b/config.yaml @@ -232,6 +232,9 @@ dashboard_aggregation: # Allow manual backfill # 允许手动回填 backfill_enabled: false + # Backfill max range (days) + # 回填最大跨度(天) + backfill_max_days: 31 # Recompute recent N days on startup # 启动时重算最近 N 天 recompute_days: 2 diff --git a/deploy/.env.example b/deploy/.env.example index bd8abc5c..83e58a50 100644 --- a/deploy/.env.example +++ b/deploy/.env.example @@ -69,6 +69,33 @@ JWT_EXPIRE_HOUR=24 # Leave unset to use default ./config.yaml #CONFIG_FILE=./config.yaml +# ----------------------------------------------------------------------------- +# Dashboard Aggregation (Optional) +# ----------------------------------------------------------------------------- +# Enable aggregation job +# 启用仪表盘预聚合 
+DASHBOARD_AGGREGATION_ENABLED=true +# Refresh interval (seconds) +# 刷新间隔(秒) +DASHBOARD_AGGREGATION_INTERVAL_SECONDS=60 +# Lookback window (seconds) +# 回看窗口(秒) +DASHBOARD_AGGREGATION_LOOKBACK_SECONDS=120 +# Allow manual backfill +# 允许手动回填 +DASHBOARD_AGGREGATION_BACKFILL_ENABLED=false +# Backfill max range (days) +# 回填最大跨度(天) +DASHBOARD_AGGREGATION_BACKFILL_MAX_DAYS=31 +# Recompute recent N days on startup +# 启动时重算最近 N 天 +DASHBOARD_AGGREGATION_RECOMPUTE_DAYS=2 +# Retention windows (days) +# 保留窗口(天) +DASHBOARD_AGGREGATION_RETENTION_USAGE_LOGS_DAYS=90 +DASHBOARD_AGGREGATION_RETENTION_HOURLY_DAYS=180 +DASHBOARD_AGGREGATION_RETENTION_DAILY_DAYS=730 + # ----------------------------------------------------------------------------- # Security Configuration # ----------------------------------------------------------------------------- diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 460606ab..57239f8e 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -232,6 +232,9 @@ dashboard_aggregation: # Allow manual backfill # 允许手动回填 backfill_enabled: false + # Backfill max range (days) + # 回填最大跨度(天) + backfill_max_days: 31 # Recompute recent N days on startup # 启动时重算最近 N 天 recompute_days: 2