Merge branch 'test' into dev

yangjianbo
2026-01-11 18:22:07 +08:00
23 changed files with 2494 additions and 66 deletions

View File

@@ -67,6 +67,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
userHandler := handler.NewUserHandler(userService)
apiKeyHandler := handler.NewAPIKeyHandler(apiKeyService)
usageLogRepository := repository.NewUsageLogRepository(client, db)
dashboardAggregationRepository := repository.NewDashboardAggregationRepository(db)
usageService := service.NewUsageService(usageLogRepository, userRepository, client, apiKeyAuthCacheInvalidator)
usageHandler := handler.NewUsageHandler(usageService, apiKeyService)
redeemCodeRepository := repository.NewRedeemCodeRepository(client)
@@ -75,8 +76,11 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
redeemService := service.NewRedeemService(redeemCodeRepository, userRepository, subscriptionService, redeemCache, billingCacheService, client, apiKeyAuthCacheInvalidator)
redeemHandler := handler.NewRedeemHandler(redeemService)
subscriptionHandler := handler.NewSubscriptionHandler(subscriptionService)
dashboardService := service.NewDashboardService(usageLogRepository)
dashboardHandler := admin.NewDashboardHandler(dashboardService)
dashboardStatsCache := repository.NewDashboardCache(redisClient, configConfig)
timingWheelService := service.ProvideTimingWheelService()
dashboardAggregationService := service.ProvideDashboardAggregationService(dashboardAggregationRepository, timingWheelService, configConfig)
dashboardService := service.NewDashboardService(usageLogRepository, dashboardAggregationRepository, dashboardStatsCache, configConfig)
dashboardHandler := admin.NewDashboardHandler(dashboardService, dashboardAggregationService)
accountRepository := repository.NewAccountRepository(client, db)
proxyRepository := repository.NewProxyRepository(client, db)
proxyExitInfoProber := repository.NewProxyExitInfoProber(configConfig)
@@ -137,7 +141,6 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
billingService := service.NewBillingService(configConfig, pricingService)
identityCache := repository.NewIdentityCache(redisClient)
identityService := service.NewIdentityService(identityCache)
timingWheelService := service.ProvideTimingWheelService()
deferredService := service.ProvideDeferredService(accountRepository, timingWheelService)
gatewayService := service.NewGatewayService(accountRepository, groupRepository, usageLogRepository, userRepository, userSubscriptionRepository, gatewayCache, configConfig, concurrencyService, billingService, rateLimitService, billingCacheService, identityService, httpUpstream, deferredService)
geminiMessagesCompatService := service.NewGeminiMessagesCompatService(accountRepository, groupRepository, gatewayCache, geminiTokenProvider, rateLimitService, httpUpstream, antigravityGatewayService, configConfig)

View File

@@ -36,26 +36,28 @@ const (
)
type Config struct {
Server ServerConfig `mapstructure:"server"`
CORS CORSConfig `mapstructure:"cors"`
Security SecurityConfig `mapstructure:"security"`
Billing BillingConfig `mapstructure:"billing"`
Turnstile TurnstileConfig `mapstructure:"turnstile"`
Database DatabaseConfig `mapstructure:"database"`
Redis RedisConfig `mapstructure:"redis"`
JWT JWTConfig `mapstructure:"jwt"`
LinuxDo LinuxDoConnectConfig `mapstructure:"linuxdo_connect"`
Default DefaultConfig `mapstructure:"default"`
RateLimit RateLimitConfig `mapstructure:"rate_limit"`
Pricing PricingConfig `mapstructure:"pricing"`
Gateway GatewayConfig `mapstructure:"gateway"`
APIKeyAuth APIKeyAuthCacheConfig `mapstructure:"api_key_auth_cache"`
Concurrency ConcurrencyConfig `mapstructure:"concurrency"`
TokenRefresh TokenRefreshConfig `mapstructure:"token_refresh"`
RunMode string `mapstructure:"run_mode" yaml:"run_mode"`
Timezone string `mapstructure:"timezone"` // e.g. "Asia/Shanghai", "UTC"
Gemini GeminiConfig `mapstructure:"gemini"`
Update UpdateConfig `mapstructure:"update"`
Server ServerConfig `mapstructure:"server"`
CORS CORSConfig `mapstructure:"cors"`
Security SecurityConfig `mapstructure:"security"`
Billing BillingConfig `mapstructure:"billing"`
Turnstile TurnstileConfig `mapstructure:"turnstile"`
Database DatabaseConfig `mapstructure:"database"`
Redis RedisConfig `mapstructure:"redis"`
JWT JWTConfig `mapstructure:"jwt"`
LinuxDo LinuxDoConnectConfig `mapstructure:"linuxdo_connect"`
Default DefaultConfig `mapstructure:"default"`
RateLimit RateLimitConfig `mapstructure:"rate_limit"`
Pricing PricingConfig `mapstructure:"pricing"`
Gateway GatewayConfig `mapstructure:"gateway"`
APIKeyAuth APIKeyAuthCacheConfig `mapstructure:"api_key_auth_cache"`
Dashboard DashboardCacheConfig `mapstructure:"dashboard_cache"`
DashboardAgg DashboardAggregationConfig `mapstructure:"dashboard_aggregation"`
Concurrency ConcurrencyConfig `mapstructure:"concurrency"`
TokenRefresh TokenRefreshConfig `mapstructure:"token_refresh"`
RunMode string `mapstructure:"run_mode" yaml:"run_mode"`
Timezone string `mapstructure:"timezone"` // e.g. "Asia/Shanghai", "UTC"
Gemini GeminiConfig `mapstructure:"gemini"`
Update UpdateConfig `mapstructure:"update"`
}
// UpdateConfig holds configuration for online updates
@@ -386,6 +388,45 @@ type APIKeyAuthCacheConfig struct {
Singleflight bool `mapstructure:"singleflight"`
}
// DashboardCacheConfig configures the dashboard statistics cache
type DashboardCacheConfig struct {
// Enabled: whether the dashboard cache is enabled
Enabled bool `mapstructure:"enabled"`
// KeyPrefix: Redis key prefix, used to isolate environments
KeyPrefix string `mapstructure:"key_prefix"`
// StatsFreshTTLSeconds: window (seconds) during which a cache hit is considered fresh
StatsFreshTTLSeconds int `mapstructure:"stats_fresh_ttl_seconds"`
// StatsTTLSeconds: total TTL (seconds) of the Redis cache entry
StatsTTLSeconds int `mapstructure:"stats_ttl_seconds"`
// StatsRefreshTimeoutSeconds: timeout (seconds) for the asynchronous refresh
StatsRefreshTimeoutSeconds int `mapstructure:"stats_refresh_timeout_seconds"`
}
// DashboardAggregationConfig configures dashboard pre-aggregation
type DashboardAggregationConfig struct {
// Enabled: whether the pre-aggregation job is enabled
Enabled bool `mapstructure:"enabled"`
// IntervalSeconds: aggregation refresh interval (seconds)
IntervalSeconds int `mapstructure:"interval_seconds"`
// LookbackSeconds: lookback window (seconds)
LookbackSeconds int `mapstructure:"lookback_seconds"`
// BackfillEnabled: whether full backfill is allowed
BackfillEnabled bool `mapstructure:"backfill_enabled"`
// BackfillMaxDays: maximum backfill span (days)
BackfillMaxDays int `mapstructure:"backfill_max_days"`
// Retention: per-table retention windows (days)
Retention DashboardAggregationRetentionConfig `mapstructure:"retention"`
// RecomputeDays: recompute the most recent N days on startup
RecomputeDays int `mapstructure:"recompute_days"`
}
// DashboardAggregationRetentionConfig defines the pre-aggregation retention windows
type DashboardAggregationRetentionConfig struct {
UsageLogsDays int `mapstructure:"usage_logs_days"`
HourlyDays int `mapstructure:"hourly_days"`
DailyDays int `mapstructure:"daily_days"`
}
func NormalizeRunMode(value string) string {
normalized := strings.ToLower(strings.TrimSpace(value))
switch normalized {
@@ -451,6 +492,7 @@ func Load() (*Config, error) {
cfg.LinuxDo.UserInfoEmailPath = strings.TrimSpace(cfg.LinuxDo.UserInfoEmailPath)
cfg.LinuxDo.UserInfoIDPath = strings.TrimSpace(cfg.LinuxDo.UserInfoIDPath)
cfg.LinuxDo.UserInfoUsernamePath = strings.TrimSpace(cfg.LinuxDo.UserInfoUsernamePath)
cfg.Dashboard.KeyPrefix = strings.TrimSpace(cfg.Dashboard.KeyPrefix)
cfg.CORS.AllowedOrigins = normalizeStringSlice(cfg.CORS.AllowedOrigins)
cfg.Security.ResponseHeaders.AdditionalAllowed = normalizeStringSlice(cfg.Security.ResponseHeaders.AdditionalAllowed)
cfg.Security.ResponseHeaders.ForceRemove = normalizeStringSlice(cfg.Security.ResponseHeaders.ForceRemove)
@@ -688,6 +730,24 @@ func setDefaults() {
viper.SetDefault("api_key_auth_cache.jitter_percent", 10)
viper.SetDefault("api_key_auth_cache.singleflight", true)
// Dashboard cache
viper.SetDefault("dashboard_cache.enabled", true)
viper.SetDefault("dashboard_cache.key_prefix", "sub2api:")
viper.SetDefault("dashboard_cache.stats_fresh_ttl_seconds", 15)
viper.SetDefault("dashboard_cache.stats_ttl_seconds", 30)
viper.SetDefault("dashboard_cache.stats_refresh_timeout_seconds", 30)
// Dashboard aggregation
viper.SetDefault("dashboard_aggregation.enabled", true)
viper.SetDefault("dashboard_aggregation.interval_seconds", 60)
viper.SetDefault("dashboard_aggregation.lookback_seconds", 120)
viper.SetDefault("dashboard_aggregation.backfill_enabled", false)
viper.SetDefault("dashboard_aggregation.backfill_max_days", 31)
viper.SetDefault("dashboard_aggregation.retention.usage_logs_days", 90)
viper.SetDefault("dashboard_aggregation.retention.hourly_days", 180)
viper.SetDefault("dashboard_aggregation.retention.daily_days", 730)
viper.SetDefault("dashboard_aggregation.recompute_days", 2)
// Gateway
viper.SetDefault("gateway.response_header_timeout", 600) // 600秒(10分钟)等待上游响应头LLM高负载时可能排队较久
viper.SetDefault("gateway.log_upstream_error_body", false)
@@ -846,6 +906,78 @@ func (c *Config) Validate() error {
if c.Redis.MinIdleConns > c.Redis.PoolSize {
return fmt.Errorf("redis.min_idle_conns cannot exceed redis.pool_size")
}
if c.Dashboard.Enabled {
if c.Dashboard.StatsFreshTTLSeconds <= 0 {
return fmt.Errorf("dashboard_cache.stats_fresh_ttl_seconds must be positive")
}
if c.Dashboard.StatsTTLSeconds <= 0 {
return fmt.Errorf("dashboard_cache.stats_ttl_seconds must be positive")
}
if c.Dashboard.StatsRefreshTimeoutSeconds <= 0 {
return fmt.Errorf("dashboard_cache.stats_refresh_timeout_seconds must be positive")
}
if c.Dashboard.StatsFreshTTLSeconds > c.Dashboard.StatsTTLSeconds {
return fmt.Errorf("dashboard_cache.stats_fresh_ttl_seconds must be <= dashboard_cache.stats_ttl_seconds")
}
} else {
if c.Dashboard.StatsFreshTTLSeconds < 0 {
return fmt.Errorf("dashboard_cache.stats_fresh_ttl_seconds must be non-negative")
}
if c.Dashboard.StatsTTLSeconds < 0 {
return fmt.Errorf("dashboard_cache.stats_ttl_seconds must be non-negative")
}
if c.Dashboard.StatsRefreshTimeoutSeconds < 0 {
return fmt.Errorf("dashboard_cache.stats_refresh_timeout_seconds must be non-negative")
}
}
if c.DashboardAgg.Enabled {
if c.DashboardAgg.IntervalSeconds <= 0 {
return fmt.Errorf("dashboard_aggregation.interval_seconds must be positive")
}
if c.DashboardAgg.LookbackSeconds < 0 {
return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative")
}
if c.DashboardAgg.BackfillMaxDays < 0 {
return fmt.Errorf("dashboard_aggregation.backfill_max_days must be non-negative")
}
if c.DashboardAgg.BackfillEnabled && c.DashboardAgg.BackfillMaxDays == 0 {
return fmt.Errorf("dashboard_aggregation.backfill_max_days must be positive when backfill is enabled")
}
if c.DashboardAgg.Retention.UsageLogsDays <= 0 {
return fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be positive")
}
if c.DashboardAgg.Retention.HourlyDays <= 0 {
return fmt.Errorf("dashboard_aggregation.retention.hourly_days must be positive")
}
if c.DashboardAgg.Retention.DailyDays <= 0 {
return fmt.Errorf("dashboard_aggregation.retention.daily_days must be positive")
}
if c.DashboardAgg.RecomputeDays < 0 {
return fmt.Errorf("dashboard_aggregation.recompute_days must be non-negative")
}
} else {
if c.DashboardAgg.IntervalSeconds < 0 {
return fmt.Errorf("dashboard_aggregation.interval_seconds must be non-negative")
}
if c.DashboardAgg.LookbackSeconds < 0 {
return fmt.Errorf("dashboard_aggregation.lookback_seconds must be non-negative")
}
if c.DashboardAgg.BackfillMaxDays < 0 {
return fmt.Errorf("dashboard_aggregation.backfill_max_days must be non-negative")
}
if c.DashboardAgg.Retention.UsageLogsDays < 0 {
return fmt.Errorf("dashboard_aggregation.retention.usage_logs_days must be non-negative")
}
if c.DashboardAgg.Retention.HourlyDays < 0 {
return fmt.Errorf("dashboard_aggregation.retention.hourly_days must be non-negative")
}
if c.DashboardAgg.Retention.DailyDays < 0 {
return fmt.Errorf("dashboard_aggregation.retention.daily_days must be non-negative")
}
if c.DashboardAgg.RecomputeDays < 0 {
return fmt.Errorf("dashboard_aggregation.recompute_days must be non-negative")
}
}
if c.Gateway.MaxBodySize <= 0 {
return fmt.Errorf("gateway.max_body_size must be positive")
}

View File

@@ -141,3 +141,142 @@ func TestValidateLinuxDoPKCERequiredForPublicClient(t *testing.T) {
t.Fatalf("Validate() expected use_pkce error, got: %v", err)
}
}
func TestLoadDefaultDashboardCacheConfig(t *testing.T) {
viper.Reset()
cfg, err := Load()
if err != nil {
t.Fatalf("Load() error: %v", err)
}
if !cfg.Dashboard.Enabled {
t.Fatalf("Dashboard.Enabled = false, want true")
}
if cfg.Dashboard.KeyPrefix != "sub2api:" {
t.Fatalf("Dashboard.KeyPrefix = %q, want %q", cfg.Dashboard.KeyPrefix, "sub2api:")
}
if cfg.Dashboard.StatsFreshTTLSeconds != 15 {
t.Fatalf("Dashboard.StatsFreshTTLSeconds = %d, want 15", cfg.Dashboard.StatsFreshTTLSeconds)
}
if cfg.Dashboard.StatsTTLSeconds != 30 {
t.Fatalf("Dashboard.StatsTTLSeconds = %d, want 30", cfg.Dashboard.StatsTTLSeconds)
}
if cfg.Dashboard.StatsRefreshTimeoutSeconds != 30 {
t.Fatalf("Dashboard.StatsRefreshTimeoutSeconds = %d, want 30", cfg.Dashboard.StatsRefreshTimeoutSeconds)
}
}
func TestValidateDashboardCacheConfigEnabled(t *testing.T) {
viper.Reset()
cfg, err := Load()
if err != nil {
t.Fatalf("Load() error: %v", err)
}
cfg.Dashboard.Enabled = true
cfg.Dashboard.StatsFreshTTLSeconds = 10
cfg.Dashboard.StatsTTLSeconds = 5
err = cfg.Validate()
if err == nil {
t.Fatalf("Validate() expected error for stats_fresh_ttl_seconds > stats_ttl_seconds, got nil")
}
if !strings.Contains(err.Error(), "dashboard_cache.stats_fresh_ttl_seconds") {
t.Fatalf("Validate() expected stats_fresh_ttl_seconds error, got: %v", err)
}
}
func TestValidateDashboardCacheConfigDisabled(t *testing.T) {
viper.Reset()
cfg, err := Load()
if err != nil {
t.Fatalf("Load() error: %v", err)
}
cfg.Dashboard.Enabled = false
cfg.Dashboard.StatsTTLSeconds = -1
err = cfg.Validate()
if err == nil {
t.Fatalf("Validate() expected error for negative stats_ttl_seconds, got nil")
}
if !strings.Contains(err.Error(), "dashboard_cache.stats_ttl_seconds") {
t.Fatalf("Validate() expected stats_ttl_seconds error, got: %v", err)
}
}
func TestLoadDefaultDashboardAggregationConfig(t *testing.T) {
viper.Reset()
cfg, err := Load()
if err != nil {
t.Fatalf("Load() error: %v", err)
}
if !cfg.DashboardAgg.Enabled {
t.Fatalf("DashboardAgg.Enabled = false, want true")
}
if cfg.DashboardAgg.IntervalSeconds != 60 {
t.Fatalf("DashboardAgg.IntervalSeconds = %d, want 60", cfg.DashboardAgg.IntervalSeconds)
}
if cfg.DashboardAgg.LookbackSeconds != 120 {
t.Fatalf("DashboardAgg.LookbackSeconds = %d, want 120", cfg.DashboardAgg.LookbackSeconds)
}
if cfg.DashboardAgg.BackfillEnabled {
t.Fatalf("DashboardAgg.BackfillEnabled = true, want false")
}
if cfg.DashboardAgg.BackfillMaxDays != 31 {
t.Fatalf("DashboardAgg.BackfillMaxDays = %d, want 31", cfg.DashboardAgg.BackfillMaxDays)
}
if cfg.DashboardAgg.Retention.UsageLogsDays != 90 {
t.Fatalf("DashboardAgg.Retention.UsageLogsDays = %d, want 90", cfg.DashboardAgg.Retention.UsageLogsDays)
}
if cfg.DashboardAgg.Retention.HourlyDays != 180 {
t.Fatalf("DashboardAgg.Retention.HourlyDays = %d, want 180", cfg.DashboardAgg.Retention.HourlyDays)
}
if cfg.DashboardAgg.Retention.DailyDays != 730 {
t.Fatalf("DashboardAgg.Retention.DailyDays = %d, want 730", cfg.DashboardAgg.Retention.DailyDays)
}
if cfg.DashboardAgg.RecomputeDays != 2 {
t.Fatalf("DashboardAgg.RecomputeDays = %d, want 2", cfg.DashboardAgg.RecomputeDays)
}
}
func TestValidateDashboardAggregationConfigDisabled(t *testing.T) {
viper.Reset()
cfg, err := Load()
if err != nil {
t.Fatalf("Load() error: %v", err)
}
cfg.DashboardAgg.Enabled = false
cfg.DashboardAgg.IntervalSeconds = -1
err = cfg.Validate()
if err == nil {
t.Fatalf("Validate() expected error for negative dashboard_aggregation.interval_seconds, got nil")
}
if !strings.Contains(err.Error(), "dashboard_aggregation.interval_seconds") {
t.Fatalf("Validate() expected interval_seconds error, got: %v", err)
}
}
func TestValidateDashboardAggregationBackfillMaxDays(t *testing.T) {
viper.Reset()
cfg, err := Load()
if err != nil {
t.Fatalf("Load() error: %v", err)
}
cfg.DashboardAgg.BackfillEnabled = true
cfg.DashboardAgg.BackfillMaxDays = 0
err = cfg.Validate()
if err == nil {
t.Fatalf("Validate() expected error for dashboard_aggregation.backfill_max_days, got nil")
}
if !strings.Contains(err.Error(), "dashboard_aggregation.backfill_max_days") {
t.Fatalf("Validate() expected backfill_max_days error, got: %v", err)
}
}

View File

@@ -1,6 +1,7 @@
package admin
import (
"errors"
"strconv"
"time"
@@ -13,15 +14,17 @@ import (
// DashboardHandler handles admin dashboard statistics
type DashboardHandler struct {
dashboardService *service.DashboardService
startTime time.Time // Server start time for uptime calculation
dashboardService *service.DashboardService
aggregationService *service.DashboardAggregationService
startTime time.Time // Server start time for uptime calculation
}
// NewDashboardHandler creates a new admin dashboard handler
func NewDashboardHandler(dashboardService *service.DashboardService) *DashboardHandler {
func NewDashboardHandler(dashboardService *service.DashboardService, aggregationService *service.DashboardAggregationService) *DashboardHandler {
return &DashboardHandler{
dashboardService: dashboardService,
startTime: time.Now(),
dashboardService: dashboardService,
aggregationService: aggregationService,
startTime: time.Now(),
}
}
@@ -114,6 +117,58 @@ func (h *DashboardHandler) GetStats(c *gin.Context) {
// Performance metrics
"rpm": stats.Rpm,
"tpm": stats.Tpm,
// Pre-aggregation freshness
"hourly_active_users": stats.HourlyActiveUsers,
"stats_updated_at": stats.StatsUpdatedAt,
"stats_stale": stats.StatsStale,
})
}
type DashboardAggregationBackfillRequest struct {
Start string `json:"start"`
End string `json:"end"`
}
// BackfillAggregation handles triggering aggregation backfill
// POST /api/v1/admin/dashboard/aggregation/backfill
func (h *DashboardHandler) BackfillAggregation(c *gin.Context) {
if h.aggregationService == nil {
response.InternalError(c, "Aggregation service not available")
return
}
var req DashboardAggregationBackfillRequest
if err := c.ShouldBindJSON(&req); err != nil {
response.BadRequest(c, "Invalid request body")
return
}
start, err := time.Parse(time.RFC3339, req.Start)
if err != nil {
response.BadRequest(c, "Invalid start time")
return
}
end, err := time.Parse(time.RFC3339, req.End)
if err != nil {
response.BadRequest(c, "Invalid end time")
return
}
if err := h.aggregationService.TriggerBackfill(start, end); err != nil {
if errors.Is(err, service.ErrDashboardBackfillDisabled) {
response.Forbidden(c, "Backfill is disabled")
return
}
if errors.Is(err, service.ErrDashboardBackfillTooLarge) {
response.BadRequest(c, "Backfill range too large")
return
}
response.InternalError(c, "Failed to trigger backfill")
return
}
response.Success(c, gin.H{
"status": "accepted",
})
}
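For reference, a minimal client-side sketch of triggering a backfill through this endpoint. The base URL and bearer token are placeholders (authentication details depend on the deployment); the handler expects RFC3339 timestamps, as the parsing above shows.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// Hypothetical base URL and credential; adjust for your deployment.
	const baseURL = "http://localhost:8080"
	payload, _ := json.Marshal(map[string]string{
		"start": time.Now().UTC().AddDate(0, 0, -7).Format(time.RFC3339),
		"end":   time.Now().UTC().Format(time.RFC3339),
	})
	req, err := http.NewRequest(http.MethodPost,
		baseURL+"/api/v1/admin/dashboard/aggregation/backfill", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer <admin-token>") // placeholder credential
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // success returns {"status":"accepted"}; backfill disabled yields Forbidden
}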

View File

@@ -9,6 +9,12 @@ type DashboardStats struct {
TotalUsers int64 `json:"total_users"`
TodayNewUsers int64 `json:"today_new_users"` // users created today
ActiveUsers int64 `json:"active_users"` // users with at least one request today
// Hourly active users (current UTC hour)
HourlyActiveUsers int64 `json:"hourly_active_users"`
// Pre-aggregation freshness
StatsUpdatedAt string `json:"stats_updated_at"`
StatsStale bool `json:"stats_stale"`
// API key statistics
TotalAPIKeys int64 `json:"total_api_keys"`

View File

@@ -0,0 +1,361 @@
package repository
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/lib/pq"
)
type dashboardAggregationRepository struct {
sql sqlExecutor
}
// NewDashboardAggregationRepository creates the dashboard pre-aggregation repository.
func NewDashboardAggregationRepository(sqlDB *sql.DB) service.DashboardAggregationRepository {
return newDashboardAggregationRepositoryWithSQL(sqlDB)
}
func newDashboardAggregationRepositoryWithSQL(sqlq sqlExecutor) *dashboardAggregationRepository {
return &dashboardAggregationRepository{sql: sqlq}
}
func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, start, end time.Time) error {
startUTC := start.UTC()
endUTC := end.UTC()
if !endUTC.After(startUTC) {
return nil
}
hourStart := startUTC.Truncate(time.Hour)
hourEnd := endUTC.Truncate(time.Hour)
if endUTC.After(hourEnd) {
hourEnd = hourEnd.Add(time.Hour)
}
dayStart := truncateToDayUTC(startUTC)
dayEnd := truncateToDayUTC(endUTC)
if endUTC.After(dayEnd) {
dayEnd = dayEnd.Add(24 * time.Hour)
}
// Aggregate on bucket boundaries so the remainder of the bucket containing end is covered.
if err := r.insertHourlyActiveUsers(ctx, hourStart, hourEnd); err != nil {
return err
}
if err := r.insertDailyActiveUsers(ctx, hourStart, hourEnd); err != nil {
return err
}
if err := r.upsertHourlyAggregates(ctx, hourStart, hourEnd); err != nil {
return err
}
if err := r.upsertDailyAggregates(ctx, dayStart, dayEnd); err != nil {
return err
}
return nil
}
func (r *dashboardAggregationRepository) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
var ts time.Time
query := "SELECT last_aggregated_at FROM usage_dashboard_aggregation_watermark WHERE id = 1"
if err := scanSingleRow(ctx, r.sql, query, nil, &ts); err != nil {
if err == sql.ErrNoRows {
return time.Unix(0, 0).UTC(), nil
}
return time.Time{}, err
}
return ts.UTC(), nil
}
func (r *dashboardAggregationRepository) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error {
query := `
INSERT INTO usage_dashboard_aggregation_watermark (id, last_aggregated_at, updated_at)
VALUES (1, $1, NOW())
ON CONFLICT (id)
DO UPDATE SET last_aggregated_at = EXCLUDED.last_aggregated_at, updated_at = EXCLUDED.updated_at
`
_, err := r.sql.ExecContext(ctx, query, aggregatedAt.UTC())
return err
}
func (r *dashboardAggregationRepository) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error {
// lib/pq uses the extended protocol for parameterized queries, which rejects
// multiple statements in one Exec, so each DELETE runs as its own statement.
hourly := hourlyCutoff.UTC()
daily := dailyCutoff.UTC()
if _, err := r.sql.ExecContext(ctx, "DELETE FROM usage_dashboard_hourly WHERE bucket_start < $1", hourly); err != nil {
return err
}
if _, err := r.sql.ExecContext(ctx, "DELETE FROM usage_dashboard_hourly_users WHERE bucket_start < $1", hourly); err != nil {
return err
}
if _, err := r.sql.ExecContext(ctx, "DELETE FROM usage_dashboard_daily WHERE bucket_date < $1::date", daily); err != nil {
return err
}
_, err := r.sql.ExecContext(ctx, "DELETE FROM usage_dashboard_daily_users WHERE bucket_date < $1::date", daily)
return err
}
func (r *dashboardAggregationRepository) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error {
isPartitioned, err := r.isUsageLogsPartitioned(ctx)
if err != nil {
return err
}
if isPartitioned {
return r.dropUsageLogsPartitions(ctx, cutoff)
}
_, err = r.sql.ExecContext(ctx, "DELETE FROM usage_logs WHERE created_at < $1", cutoff.UTC())
return err
}
func (r *dashboardAggregationRepository) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error {
isPartitioned, err := r.isUsageLogsPartitioned(ctx)
if err != nil || !isPartitioned {
return err
}
monthStart := truncateToMonthUTC(now)
prevMonth := monthStart.AddDate(0, -1, 0)
nextMonth := monthStart.AddDate(0, 1, 0)
for _, m := range []time.Time{prevMonth, monthStart, nextMonth} {
if err := r.createUsageLogsPartition(ctx, m); err != nil {
return err
}
}
return nil
}
func (r *dashboardAggregationRepository) insertHourlyActiveUsers(ctx context.Context, start, end time.Time) error {
query := `
INSERT INTO usage_dashboard_hourly_users (bucket_start, user_id)
SELECT DISTINCT
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
user_id
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
ON CONFLICT DO NOTHING
`
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
return err
}
func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Context, start, end time.Time) error {
query := `
INSERT INTO usage_dashboard_daily_users (bucket_date, user_id)
SELECT DISTINCT
(bucket_start AT TIME ZONE 'UTC')::date AS bucket_date,
user_id
FROM usage_dashboard_hourly_users
WHERE bucket_start >= $1 AND bucket_start < $2
ON CONFLICT DO NOTHING
`
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
return err
}
func (r *dashboardAggregationRepository) upsertHourlyAggregates(ctx context.Context, start, end time.Time) error {
query := `
WITH hourly AS (
SELECT
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
COUNT(*) AS total_requests,
COALESCE(SUM(input_tokens), 0) AS input_tokens,
COALESCE(SUM(output_tokens), 0) AS output_tokens,
COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation_tokens,
COALESCE(SUM(cache_read_tokens), 0) AS cache_read_tokens,
COALESCE(SUM(total_cost), 0) AS total_cost,
COALESCE(SUM(actual_cost), 0) AS actual_cost,
COALESCE(SUM(COALESCE(duration_ms, 0)), 0) AS total_duration_ms
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
GROUP BY 1
),
user_counts AS (
SELECT bucket_start, COUNT(*) AS active_users
FROM usage_dashboard_hourly_users
WHERE bucket_start >= $1 AND bucket_start < $2
GROUP BY bucket_start
)
INSERT INTO usage_dashboard_hourly (
bucket_start,
total_requests,
input_tokens,
output_tokens,
cache_creation_tokens,
cache_read_tokens,
total_cost,
actual_cost,
total_duration_ms,
active_users,
computed_at
)
SELECT
hourly.bucket_start,
hourly.total_requests,
hourly.input_tokens,
hourly.output_tokens,
hourly.cache_creation_tokens,
hourly.cache_read_tokens,
hourly.total_cost,
hourly.actual_cost,
hourly.total_duration_ms,
COALESCE(user_counts.active_users, 0) AS active_users,
NOW()
FROM hourly
LEFT JOIN user_counts ON user_counts.bucket_start = hourly.bucket_start
ON CONFLICT (bucket_start)
DO UPDATE SET
total_requests = EXCLUDED.total_requests,
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_creation_tokens = EXCLUDED.cache_creation_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
total_cost = EXCLUDED.total_cost,
actual_cost = EXCLUDED.actual_cost,
total_duration_ms = EXCLUDED.total_duration_ms,
active_users = EXCLUDED.active_users,
computed_at = EXCLUDED.computed_at
`
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
return err
}
func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Context, start, end time.Time) error {
query := `
WITH daily AS (
SELECT
(bucket_start AT TIME ZONE 'UTC')::date AS bucket_date,
COALESCE(SUM(total_requests), 0) AS total_requests,
COALESCE(SUM(input_tokens), 0) AS input_tokens,
COALESCE(SUM(output_tokens), 0) AS output_tokens,
COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation_tokens,
COALESCE(SUM(cache_read_tokens), 0) AS cache_read_tokens,
COALESCE(SUM(total_cost), 0) AS total_cost,
COALESCE(SUM(actual_cost), 0) AS actual_cost,
COALESCE(SUM(total_duration_ms), 0) AS total_duration_ms
FROM usage_dashboard_hourly
WHERE bucket_start >= $1 AND bucket_start < $2
GROUP BY (bucket_start AT TIME ZONE 'UTC')::date
),
user_counts AS (
SELECT bucket_date, COUNT(*) AS active_users
FROM usage_dashboard_daily_users
WHERE bucket_date >= $3::date AND bucket_date < $4::date
GROUP BY bucket_date
)
INSERT INTO usage_dashboard_daily (
bucket_date,
total_requests,
input_tokens,
output_tokens,
cache_creation_tokens,
cache_read_tokens,
total_cost,
actual_cost,
total_duration_ms,
active_users,
computed_at
)
SELECT
daily.bucket_date,
daily.total_requests,
daily.input_tokens,
daily.output_tokens,
daily.cache_creation_tokens,
daily.cache_read_tokens,
daily.total_cost,
daily.actual_cost,
daily.total_duration_ms,
COALESCE(user_counts.active_users, 0) AS active_users,
NOW()
FROM daily
LEFT JOIN user_counts ON user_counts.bucket_date = daily.bucket_date
ON CONFLICT (bucket_date)
DO UPDATE SET
total_requests = EXCLUDED.total_requests,
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_creation_tokens = EXCLUDED.cache_creation_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
total_cost = EXCLUDED.total_cost,
actual_cost = EXCLUDED.actual_cost,
total_duration_ms = EXCLUDED.total_duration_ms,
active_users = EXCLUDED.active_users,
computed_at = EXCLUDED.computed_at
`
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC(), start.UTC(), end.UTC())
return err
}
func (r *dashboardAggregationRepository) isUsageLogsPartitioned(ctx context.Context) (bool, error) {
query := `
SELECT EXISTS(
SELECT 1
FROM pg_partitioned_table pt
JOIN pg_class c ON c.oid = pt.partrelid
WHERE c.relname = 'usage_logs'
)
`
var partitioned bool
if err := scanSingleRow(ctx, r.sql, query, nil, &partitioned); err != nil {
return false, err
}
return partitioned, nil
}
func (r *dashboardAggregationRepository) dropUsageLogsPartitions(ctx context.Context, cutoff time.Time) error {
rows, err := r.sql.QueryContext(ctx, `
SELECT c.relname
FROM pg_inherits
JOIN pg_class c ON c.oid = pg_inherits.inhrelid
JOIN pg_class p ON p.oid = pg_inherits.inhparent
WHERE p.relname = 'usage_logs'
`)
if err != nil {
return err
}
defer rows.Close()
cutoffMonth := truncateToMonthUTC(cutoff)
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return err
}
if !strings.HasPrefix(name, "usage_logs_") {
continue
}
suffix := strings.TrimPrefix(name, "usage_logs_")
month, err := time.Parse("200601", suffix)
if err != nil {
continue
}
month = month.UTC()
if month.Before(cutoffMonth) {
if _, err := r.sql.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", pq.QuoteIdentifier(name))); err != nil {
return err
}
}
}
return rows.Err()
}
func (r *dashboardAggregationRepository) createUsageLogsPartition(ctx context.Context, month time.Time) error {
monthStart := truncateToMonthUTC(month)
nextMonth := monthStart.AddDate(0, 1, 0)
name := fmt.Sprintf("usage_logs_%s", monthStart.Format("200601"))
query := fmt.Sprintf(
"CREATE TABLE IF NOT EXISTS %s PARTITION OF usage_logs FOR VALUES FROM (%s) TO (%s)",
pq.QuoteIdentifier(name),
pq.QuoteLiteral(monthStart.Format("2006-01-02")),
pq.QuoteLiteral(nextMonth.Format("2006-01-02")),
)
_, err := r.sql.ExecContext(ctx, query)
return err
}
func truncateToDayUTC(t time.Time) time.Time {
t = t.UTC()
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}
func truncateToMonthUTC(t time.Time) time.Time {
t = t.UTC()
return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC)
}
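The boundary widening at the top of AggregateRange is easiest to see with concrete times. A small standalone sketch (duplicating truncateToDayUTC) of how a raw [start, end) window expands to whole hourly and daily buckets:

package main

import (
	"fmt"
	"time"
)

func truncateToDayUTC(t time.Time) time.Time {
	t = t.UTC()
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}

func main() {
	start := time.Date(2026, 1, 11, 9, 40, 0, 0, time.UTC)
	end := time.Date(2026, 1, 11, 10, 5, 0, 0, time.UTC)

	hourStart := start.Truncate(time.Hour) // 09:00
	hourEnd := end.Truncate(time.Hour)     // 10:00
	if end.After(hourEnd) {
		hourEnd = hourEnd.Add(time.Hour) // 11:00, covers the partial bucket
	}
	dayStart := truncateToDayUTC(start) // 2026-01-11 00:00
	dayEnd := truncateToDayUTC(end)
	if end.After(dayEnd) {
		dayEnd = dayEnd.Add(24 * time.Hour) // 2026-01-12 00:00
	}
	fmt.Println(hourStart, hourEnd) // hourly buckets [09:00, 11:00)
	fmt.Println(dayStart, dayEnd)   // daily bucket [2026-01-11, 2026-01-12)
}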

View File

@@ -0,0 +1,58 @@
package repository
import (
"context"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/redis/go-redis/v9"
)
const dashboardStatsCacheKey = "dashboard:stats:v1"
type dashboardCache struct {
rdb *redis.Client
keyPrefix string
}
func NewDashboardCache(rdb *redis.Client, cfg *config.Config) service.DashboardStatsCache {
prefix := "sub2api:"
if cfg != nil {
prefix = strings.TrimSpace(cfg.Dashboard.KeyPrefix)
}
if prefix != "" && !strings.HasSuffix(prefix, ":") {
prefix += ":"
}
return &dashboardCache{
rdb: rdb,
keyPrefix: prefix,
}
}
func (c *dashboardCache) GetDashboardStats(ctx context.Context) (string, error) {
val, err := c.rdb.Get(ctx, c.buildKey()).Result()
if err != nil {
if err == redis.Nil {
return "", service.ErrDashboardStatsCacheMiss
}
return "", err
}
return val, nil
}
func (c *dashboardCache) SetDashboardStats(ctx context.Context, data string, ttl time.Duration) error {
return c.rdb.Set(ctx, c.buildKey(), data, ttl).Err()
}
func (c *dashboardCache) buildKey() string {
if c.keyPrefix == "" {
return dashboardStatsCacheKey
}
return c.keyPrefix + dashboardStatsCacheKey
}
func (c *dashboardCache) DeleteDashboardStats(ctx context.Context) error {
return c.rdb.Del(ctx, c.buildKey()).Err()
}
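The cache stores an opaque JSON payload; the fresh-TTL/total-TTL split in DashboardCacheConfig suggests a stale-while-revalidate read path on top of this interface. A hedged sketch of that pattern follows; names and the exact refresh mechanics here are illustrative, not the actual DashboardService code, whose body is not shown in full in this diff.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"
)

// statsCache mirrors the read side of the DashboardStatsCache interface above.
type statsCache interface {
	GetDashboardStats(ctx context.Context) (string, error)
}

// getStatsCached serves the cached payload when present; if the entry is older
// than freshTTL it also triggers an asynchronous refresh. Illustrative only.
func getStatsCached(ctx context.Context, cache statsCache, freshTTL time.Duration, refresh func()) (string, bool) {
	raw, err := cache.GetDashboardStats(ctx)
	if err != nil {
		return "", false // miss or Redis error: caller recomputes synchronously
	}
	var entry struct {
		UpdatedAt int64 `json:"updated_at"`
	}
	if json.Unmarshal([]byte(raw), &entry) == nil && time.Since(time.Unix(entry.UpdatedAt, 0)) > freshTTL {
		go refresh() // stale but still within the total TTL: refresh in the background
	}
	return raw, true
}

type fakeCache string

func (f fakeCache) GetDashboardStats(context.Context) (string, error) { return string(f), nil }

func main() {
	stale := fmt.Sprintf(`{"stats":{},"updated_at":%d}`, time.Now().Add(-time.Minute).Unix())
	payload, ok := getStatsCached(context.Background(), fakeCache(stale), 15*time.Second,
		func() { fmt.Println("refreshing in background") })
	fmt.Println(ok, payload)
	time.Sleep(10 * time.Millisecond) // let the goroutine print (demo only)
}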

View File

@@ -0,0 +1,28 @@
package repository
import (
"testing"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/stretchr/testify/require"
)
func TestNewDashboardCacheKeyPrefix(t *testing.T) {
cache := NewDashboardCache(nil, &config.Config{
Dashboard: config.DashboardCacheConfig{
KeyPrefix: "prod",
},
})
impl, ok := cache.(*dashboardCache)
require.True(t, ok)
require.Equal(t, "prod:", impl.keyPrefix)
cache = NewDashboardCache(nil, &config.Config{
Dashboard: config.DashboardCacheConfig{
KeyPrefix: "staging:",
},
})
impl, ok = cache.(*dashboardCache)
require.True(t, ok)
require.Equal(t, "staging:", impl.keyPrefix)
}

View File

@@ -269,16 +269,60 @@ func (r *usageLogRepository) GetUserStats(ctx context.Context, userID int64, sta
type DashboardStats = usagestats.DashboardStats
func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardStats, error) {
var stats DashboardStats
today := timezone.Today()
now := time.Now()
stats := &DashboardStats{}
now := time.Now().UTC()
todayUTC := truncateToDayUTC(now)
// Combined user statistics query
if err := r.fillDashboardEntityStats(ctx, stats, todayUTC, now); err != nil {
return nil, err
}
if err := r.fillDashboardUsageStatsAggregated(ctx, stats, todayUTC, now); err != nil {
return nil, err
}
rpm, tpm, err := r.getPerformanceStats(ctx, 0)
if err != nil {
return nil, err
}
stats.Rpm = rpm
stats.Tpm = tpm
return stats, nil
}
func (r *usageLogRepository) GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*DashboardStats, error) {
startUTC := start.UTC()
endUTC := end.UTC()
if !endUTC.After(startUTC) {
return nil, errors.New("统计时间范围无效")
}
stats := &DashboardStats{}
now := time.Now().UTC()
todayUTC := truncateToDayUTC(now)
if err := r.fillDashboardEntityStats(ctx, stats, todayUTC, now); err != nil {
return nil, err
}
if err := r.fillDashboardUsageStatsFromUsageLogs(ctx, stats, startUTC, endUTC, todayUTC, now); err != nil {
return nil, err
}
rpm, tpm, err := r.getPerformanceStats(ctx, 0)
if err != nil {
return nil, err
}
stats.Rpm = rpm
stats.Tpm = tpm
return stats, nil
}
func (r *usageLogRepository) fillDashboardEntityStats(ctx context.Context, stats *DashboardStats, todayUTC, now time.Time) error {
userStatsQuery := `
SELECT
COUNT(*) as total_users,
COUNT(CASE WHEN created_at >= $1 THEN 1 END) as today_new_users,
(SELECT COUNT(DISTINCT user_id) FROM usage_logs WHERE created_at >= $2) as active_users
COUNT(CASE WHEN created_at >= $1 THEN 1 END) as today_new_users
FROM users
WHERE deleted_at IS NULL
`
@@ -286,15 +330,13 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS
ctx,
r.sql,
userStatsQuery,
[]any{today, today},
[]any{todayUTC},
&stats.TotalUsers,
&stats.TodayNewUsers,
&stats.ActiveUsers,
); err != nil {
return nil, err
return err
}
// Combined API key statistics query
apiKeyStatsQuery := `
SELECT
COUNT(*) as total_api_keys,
@@ -310,10 +352,9 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS
&stats.TotalAPIKeys,
&stats.ActiveAPIKeys,
); err != nil {
return nil, err
return err
}
// Combined account statistics query
accountStatsQuery := `
SELECT
COUNT(*) as total_accounts,
@@ -335,22 +376,26 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS
&stats.RateLimitAccounts,
&stats.OverloadAccounts,
); err != nil {
return nil, err
return err
}
// Cumulative token statistics
return nil
}
func (r *usageLogRepository) fillDashboardUsageStatsAggregated(ctx context.Context, stats *DashboardStats, todayUTC, now time.Time) error {
totalStatsQuery := `
SELECT
COUNT(*) as total_requests,
COALESCE(SUM(total_requests), 0) as total_requests,
COALESCE(SUM(input_tokens), 0) as total_input_tokens,
COALESCE(SUM(output_tokens), 0) as total_output_tokens,
COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens,
COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens,
COALESCE(SUM(total_cost), 0) as total_cost,
COALESCE(SUM(actual_cost), 0) as total_actual_cost,
COALESCE(AVG(duration_ms), 0) as avg_duration_ms
FROM usage_logs
COALESCE(SUM(total_duration_ms), 0) as total_duration_ms
FROM usage_dashboard_daily
`
var totalDurationMs int64
if err := scanSingleRow(
ctx,
r.sql,
@@ -363,13 +408,100 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS
&stats.TotalCacheReadTokens,
&stats.TotalCost,
&stats.TotalActualCost,
&stats.AverageDurationMs,
&totalDurationMs,
); err != nil {
return nil, err
return err
}
stats.TotalTokens = stats.TotalInputTokens + stats.TotalOutputTokens + stats.TotalCacheCreationTokens + stats.TotalCacheReadTokens
if stats.TotalRequests > 0 {
stats.AverageDurationMs = float64(totalDurationMs) / float64(stats.TotalRequests)
}
// Today's token statistics
todayStatsQuery := `
SELECT
total_requests as today_requests,
input_tokens as today_input_tokens,
output_tokens as today_output_tokens,
cache_creation_tokens as today_cache_creation_tokens,
cache_read_tokens as today_cache_read_tokens,
total_cost as today_cost,
actual_cost as today_actual_cost,
active_users as active_users
FROM usage_dashboard_daily
WHERE bucket_date = $1::date
`
if err := scanSingleRow(
ctx,
r.sql,
todayStatsQuery,
[]any{todayUTC},
&stats.TodayRequests,
&stats.TodayInputTokens,
&stats.TodayOutputTokens,
&stats.TodayCacheCreationTokens,
&stats.TodayCacheReadTokens,
&stats.TodayCost,
&stats.TodayActualCost,
&stats.ActiveUsers,
); err != nil {
if err != sql.ErrNoRows {
return err
}
}
stats.TodayTokens = stats.TodayInputTokens + stats.TodayOutputTokens + stats.TodayCacheCreationTokens + stats.TodayCacheReadTokens
hourlyActiveQuery := `
SELECT active_users
FROM usage_dashboard_hourly
WHERE bucket_start = $1
`
hourStart := now.UTC().Truncate(time.Hour)
if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart}, &stats.HourlyActiveUsers); err != nil {
if err != sql.ErrNoRows {
return err
}
}
return nil
}
func (r *usageLogRepository) fillDashboardUsageStatsFromUsageLogs(ctx context.Context, stats *DashboardStats, startUTC, endUTC, todayUTC, now time.Time) error {
totalStatsQuery := `
SELECT
COUNT(*) as total_requests,
COALESCE(SUM(input_tokens), 0) as total_input_tokens,
COALESCE(SUM(output_tokens), 0) as total_output_tokens,
COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens,
COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens,
COALESCE(SUM(total_cost), 0) as total_cost,
COALESCE(SUM(actual_cost), 0) as total_actual_cost,
COALESCE(SUM(COALESCE(duration_ms, 0)), 0) as total_duration_ms
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
`
var totalDurationMs int64
if err := scanSingleRow(
ctx,
r.sql,
totalStatsQuery,
[]any{startUTC, endUTC},
&stats.TotalRequests,
&stats.TotalInputTokens,
&stats.TotalOutputTokens,
&stats.TotalCacheCreationTokens,
&stats.TotalCacheReadTokens,
&stats.TotalCost,
&stats.TotalActualCost,
&totalDurationMs,
); err != nil {
return err
}
stats.TotalTokens = stats.TotalInputTokens + stats.TotalOutputTokens + stats.TotalCacheCreationTokens + stats.TotalCacheReadTokens
if stats.TotalRequests > 0 {
stats.AverageDurationMs = float64(totalDurationMs) / float64(stats.TotalRequests)
}
todayEnd := todayUTC.Add(24 * time.Hour)
todayStatsQuery := `
SELECT
COUNT(*) as today_requests,
@@ -380,13 +512,13 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS
COALESCE(SUM(total_cost), 0) as today_cost,
COALESCE(SUM(actual_cost), 0) as today_actual_cost
FROM usage_logs
WHERE created_at >= $1
WHERE created_at >= $1 AND created_at < $2
`
if err := scanSingleRow(
ctx,
r.sql,
todayStatsQuery,
[]any{today},
[]any{todayUTC, todayEnd},
&stats.TodayRequests,
&stats.TodayInputTokens,
&stats.TodayOutputTokens,
@@ -395,19 +527,31 @@ func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardS
&stats.TodayCost,
&stats.TodayActualCost,
); err != nil {
return nil, err
return err
}
stats.TodayTokens = stats.TodayInputTokens + stats.TodayOutputTokens + stats.TodayCacheCreationTokens + stats.TodayCacheReadTokens
// Performance metrics: RPM and TPM (last 1 minute, global)
rpm, tpm, err := r.getPerformanceStats(ctx, 0)
if err != nil {
return nil, err
activeUsersQuery := `
SELECT COUNT(DISTINCT user_id) as active_users
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
`
if err := scanSingleRow(ctx, r.sql, activeUsersQuery, []any{todayUTC, todayEnd}, &stats.ActiveUsers); err != nil {
return err
}
stats.Rpm = rpm
stats.Tpm = tpm
return &stats, nil
hourStart := now.UTC().Truncate(time.Hour)
hourEnd := hourStart.Add(time.Hour)
hourlyActiveQuery := `
SELECT COUNT(DISTINCT user_id) as active_users
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
`
if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart, hourEnd}, &stats.HourlyActiveUsers); err != nil {
return err
}
return nil
}
func (r *usageLogRepository) ListByAccount(ctx context.Context, accountID int64, params pagination.PaginationParams) ([]service.UsageLog, *pagination.PaginationResult, error) {

View File

@@ -198,8 +198,8 @@ func (s *UsageLogRepoSuite) TestListWithFilters() {
// --- GetDashboardStats ---
func (s *UsageLogRepoSuite) TestDashboardStats_TodayTotalsAndPerformance() {
now := time.Now()
todayStart := timezone.Today()
now := time.Now().UTC()
todayStart := truncateToDayUTC(now)
baseStats, err := s.repo.GetDashboardStats(s.ctx)
s.Require().NoError(err, "GetDashboardStats base")
@@ -273,6 +273,11 @@ func (s *UsageLogRepoSuite) TestDashboardStats_TodayTotalsAndPerformance() {
_, err = s.repo.Create(s.ctx, logPerf)
s.Require().NoError(err, "Create logPerf")
aggRepo := newDashboardAggregationRepositoryWithSQL(s.tx)
aggStart := todayStart.Add(-2 * time.Hour)
aggEnd := now.Add(2 * time.Minute)
s.Require().NoError(aggRepo.AggregateRange(s.ctx, aggStart, aggEnd), "AggregateRange")
stats, err := s.repo.GetDashboardStats(s.ctx)
s.Require().NoError(err, "GetDashboardStats")
@@ -303,6 +308,80 @@ func (s *UsageLogRepoSuite) TestDashboardStats_TodayTotalsAndPerformance() {
s.Require().Equal(wantTpm, stats.Tpm, "Tpm mismatch")
}
func (s *UsageLogRepoSuite) TestDashboardStatsWithRange_Fallback() {
now := time.Now().UTC()
todayStart := truncateToDayUTC(now)
rangeStart := todayStart.Add(-24 * time.Hour)
rangeEnd := now
user1 := mustCreateUser(s.T(), s.client, &service.User{Email: "range-u1@test.com"})
user2 := mustCreateUser(s.T(), s.client, &service.User{Email: "range-u2@test.com"})
apiKey1 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user1.ID, Key: "sk-range-1", Name: "k1"})
apiKey2 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user2.ID, Key: "sk-range-2", Name: "k2"})
account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-range"})
d1, d2, d3 := 100, 200, 300
logOutside := &service.UsageLog{
UserID: user1.ID,
APIKeyID: apiKey1.ID,
AccountID: account.ID,
Model: "claude-3",
InputTokens: 7,
OutputTokens: 8,
TotalCost: 0.8,
ActualCost: 0.7,
DurationMs: &d3,
CreatedAt: rangeStart.Add(-1 * time.Hour),
}
_, err := s.repo.Create(s.ctx, logOutside)
s.Require().NoError(err)
logRange := &service.UsageLog{
UserID: user1.ID,
APIKeyID: apiKey1.ID,
AccountID: account.ID,
Model: "claude-3",
InputTokens: 10,
OutputTokens: 20,
CacheCreationTokens: 1,
CacheReadTokens: 2,
TotalCost: 1.0,
ActualCost: 0.9,
DurationMs: &d1,
CreatedAt: rangeStart.Add(2 * time.Hour),
}
_, err = s.repo.Create(s.ctx, logRange)
s.Require().NoError(err)
logToday := &service.UsageLog{
UserID: user2.ID,
APIKeyID: apiKey2.ID,
AccountID: account.ID,
Model: "claude-3",
InputTokens: 5,
OutputTokens: 6,
CacheReadTokens: 1,
TotalCost: 0.5,
ActualCost: 0.5,
DurationMs: &d2,
CreatedAt: now,
}
_, err = s.repo.Create(s.ctx, logToday)
s.Require().NoError(err)
stats, err := s.repo.GetDashboardStatsWithRange(s.ctx, rangeStart, rangeEnd)
s.Require().NoError(err)
s.Require().Equal(int64(2), stats.TotalRequests)
s.Require().Equal(int64(15), stats.TotalInputTokens)
s.Require().Equal(int64(26), stats.TotalOutputTokens)
s.Require().Equal(int64(1), stats.TotalCacheCreationTokens)
s.Require().Equal(int64(3), stats.TotalCacheReadTokens)
s.Require().Equal(int64(45), stats.TotalTokens)
s.Require().Equal(1.5, stats.TotalCost)
s.Require().Equal(1.4, stats.TotalActualCost)
s.Require().InEpsilon(150.0, stats.AverageDurationMs, 0.0001)
}
// --- GetUserDashboardStats ---
func (s *UsageLogRepoSuite) TestGetUserDashboardStats() {
@@ -333,6 +412,151 @@ func (s *UsageLogRepoSuite) TestGetAccountTodayStats() {
s.Require().Equal(int64(30), stats.Tokens)
}
func (s *UsageLogRepoSuite) TestDashboardAggregationConsistency() {
now := time.Now().UTC().Truncate(time.Second)
hour1 := now.Add(-90 * time.Minute).Truncate(time.Hour)
hour2 := now.Add(-30 * time.Minute).Truncate(time.Hour)
dayStart := truncateToDayUTC(now)
user1 := mustCreateUser(s.T(), s.client, &service.User{Email: "agg-u1@test.com"})
user2 := mustCreateUser(s.T(), s.client, &service.User{Email: "agg-u2@test.com"})
apiKey1 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user1.ID, Key: "sk-agg-1", Name: "k1"})
apiKey2 := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user2.ID, Key: "sk-agg-2", Name: "k2"})
account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-agg"})
d1, d2, d3 := 100, 200, 150
log1 := &service.UsageLog{
UserID: user1.ID,
APIKeyID: apiKey1.ID,
AccountID: account.ID,
Model: "claude-3",
InputTokens: 10,
OutputTokens: 20,
CacheCreationTokens: 2,
CacheReadTokens: 1,
TotalCost: 1.0,
ActualCost: 0.9,
DurationMs: &d1,
CreatedAt: hour1.Add(5 * time.Minute),
}
_, err := s.repo.Create(s.ctx, log1)
s.Require().NoError(err)
log2 := &service.UsageLog{
UserID: user1.ID,
APIKeyID: apiKey1.ID,
AccountID: account.ID,
Model: "claude-3",
InputTokens: 5,
OutputTokens: 5,
TotalCost: 0.5,
ActualCost: 0.5,
DurationMs: &d2,
CreatedAt: hour1.Add(20 * time.Minute),
}
_, err = s.repo.Create(s.ctx, log2)
s.Require().NoError(err)
log3 := &service.UsageLog{
UserID: user2.ID,
APIKeyID: apiKey2.ID,
AccountID: account.ID,
Model: "claude-3",
InputTokens: 7,
OutputTokens: 8,
TotalCost: 0.7,
ActualCost: 0.7,
DurationMs: &d3,
CreatedAt: hour2.Add(10 * time.Minute),
}
_, err = s.repo.Create(s.ctx, log3)
s.Require().NoError(err)
aggRepo := newDashboardAggregationRepositoryWithSQL(s.tx)
aggStart := hour1.Add(-5 * time.Minute)
aggEnd := now.Add(5 * time.Minute)
s.Require().NoError(aggRepo.AggregateRange(s.ctx, aggStart, aggEnd))
type hourlyRow struct {
totalRequests int64
inputTokens int64
outputTokens int64
cacheCreationTokens int64
cacheReadTokens int64
totalCost float64
actualCost float64
totalDurationMs int64
activeUsers int64
}
fetchHourly := func(bucketStart time.Time) hourlyRow {
var row hourlyRow
err := scanSingleRow(s.ctx, s.tx, `
SELECT total_requests, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens,
total_cost, actual_cost, total_duration_ms, active_users
FROM usage_dashboard_hourly
WHERE bucket_start = $1
`, []any{bucketStart}, &row.totalRequests, &row.inputTokens, &row.outputTokens,
&row.cacheCreationTokens, &row.cacheReadTokens, &row.totalCost, &row.actualCost,
&row.totalDurationMs, &row.activeUsers,
)
s.Require().NoError(err)
return row
}
hour1Row := fetchHourly(hour1)
s.Require().Equal(int64(2), hour1Row.totalRequests)
s.Require().Equal(int64(15), hour1Row.inputTokens)
s.Require().Equal(int64(25), hour1Row.outputTokens)
s.Require().Equal(int64(2), hour1Row.cacheCreationTokens)
s.Require().Equal(int64(1), hour1Row.cacheReadTokens)
s.Require().Equal(1.5, hour1Row.totalCost)
s.Require().Equal(1.4, hour1Row.actualCost)
s.Require().Equal(int64(300), hour1Row.totalDurationMs)
s.Require().Equal(int64(1), hour1Row.activeUsers)
hour2Row := fetchHourly(hour2)
s.Require().Equal(int64(1), hour2Row.totalRequests)
s.Require().Equal(int64(7), hour2Row.inputTokens)
s.Require().Equal(int64(8), hour2Row.outputTokens)
s.Require().Equal(int64(0), hour2Row.cacheCreationTokens)
s.Require().Equal(int64(0), hour2Row.cacheReadTokens)
s.Require().Equal(0.7, hour2Row.totalCost)
s.Require().Equal(0.7, hour2Row.actualCost)
s.Require().Equal(int64(150), hour2Row.totalDurationMs)
s.Require().Equal(int64(1), hour2Row.activeUsers)
var daily struct {
totalRequests int64
inputTokens int64
outputTokens int64
cacheCreationTokens int64
cacheReadTokens int64
totalCost float64
actualCost float64
totalDurationMs int64
activeUsers int64
}
err = scanSingleRow(s.ctx, s.tx, `
SELECT total_requests, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens,
total_cost, actual_cost, total_duration_ms, active_users
FROM usage_dashboard_daily
WHERE bucket_date = $1::date
`, []any{dayStart}, &daily.totalRequests, &daily.inputTokens, &daily.outputTokens,
&daily.cacheCreationTokens, &daily.cacheReadTokens, &daily.totalCost, &daily.actualCost,
&daily.totalDurationMs, &daily.activeUsers,
)
s.Require().NoError(err)
s.Require().Equal(int64(3), daily.totalRequests)
s.Require().Equal(int64(22), daily.inputTokens)
s.Require().Equal(int64(33), daily.outputTokens)
s.Require().Equal(int64(2), daily.cacheCreationTokens)
s.Require().Equal(int64(1), daily.cacheReadTokens)
s.Require().Equal(2.2, daily.totalCost)
s.Require().Equal(2.1, daily.actualCost)
s.Require().Equal(int64(450), daily.totalDurationMs)
s.Require().Equal(int64(2), daily.activeUsers)
}
// --- GetBatchUserUsageStats ---
func (s *UsageLogRepoSuite) TestGetBatchUserUsageStats() {

View File

@@ -47,6 +47,7 @@ var ProviderSet = wire.NewSet(
NewRedeemCodeRepository,
NewPromoCodeRepository,
NewUsageLogRepository,
NewDashboardAggregationRepository,
NewSettingRepository,
NewUserSubscriptionRepository,
NewUserAttributeDefinitionRepository,
@@ -58,6 +59,7 @@ var ProviderSet = wire.NewSet(
NewAPIKeyCache,
NewTempUnschedCache,
ProvideConcurrencyCache,
NewDashboardCache,
NewEmailCache,
NewIdentityCache,
NewRedeemCache,

View File

@@ -75,6 +75,7 @@ func registerDashboardRoutes(admin *gin.RouterGroup, h *handler.Handlers) {
dashboard.GET("/users-trend", h.Admin.Dashboard.GetUserUsageTrend)
dashboard.POST("/users-usage", h.Admin.Dashboard.GetBatchUsersUsage)
dashboard.POST("/api-keys-usage", h.Admin.Dashboard.GetBatchAPIKeysUsage)
dashboard.POST("/aggregation/backfill", h.Admin.Dashboard.BackfillAggregation)
}
}

View File

@@ -0,0 +1,242 @@
package service
import (
"context"
"errors"
"log"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
)
const (
defaultDashboardAggregationTimeout = 2 * time.Minute
defaultDashboardAggregationBackfillTimeout = 30 * time.Minute
dashboardAggregationRetentionInterval = 6 * time.Hour
)
var (
// ErrDashboardBackfillDisabled is returned when backfill is disabled by configuration.
ErrDashboardBackfillDisabled = errors.New("dashboard aggregation backfill is disabled")
// ErrDashboardBackfillTooLarge is returned when the backfill span exceeds the configured limit.
ErrDashboardBackfillTooLarge = errors.New("backfill time span too large")
)
// DashboardAggregationRepository defines the dashboard pre-aggregation repository interface.
type DashboardAggregationRepository interface {
AggregateRange(ctx context.Context, start, end time.Time) error
GetAggregationWatermark(ctx context.Context) (time.Time, error)
UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error
CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error
CleanupUsageLogs(ctx context.Context, cutoff time.Time) error
EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error
}
// DashboardAggregationService runs the scheduled aggregation and backfill jobs.
type DashboardAggregationService struct {
repo DashboardAggregationRepository
timingWheel *TimingWheelService
cfg config.DashboardAggregationConfig
running int32
lastRetentionCleanup atomic.Value // time.Time
}
// NewDashboardAggregationService creates the aggregation service.
func NewDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
var aggCfg config.DashboardAggregationConfig
if cfg != nil {
aggCfg = cfg.DashboardAgg
}
return &DashboardAggregationService{
repo: repo,
timingWheel: timingWheel,
cfg: aggCfg,
}
}
// Start launches the recurring aggregation job (config changes take effect after restart).
func (s *DashboardAggregationService) Start() {
if s == nil || s.repo == nil || s.timingWheel == nil {
return
}
if !s.cfg.Enabled {
log.Printf("[DashboardAggregation] 聚合作业已禁用")
return
}
interval := time.Duration(s.cfg.IntervalSeconds) * time.Second
if interval <= 0 {
interval = time.Minute
}
if s.cfg.RecomputeDays > 0 {
go s.recomputeRecentDays()
}
s.timingWheel.ScheduleRecurring("dashboard:aggregation", interval, func() {
s.runScheduledAggregation()
})
log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds)
if !s.cfg.BackfillEnabled {
log.Printf("[DashboardAggregation] 回填已禁用,如需补齐保留窗口以外历史数据请手动回填")
}
}
// TriggerBackfill triggers a backfill (asynchronous).
func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) error {
if s == nil || s.repo == nil {
return errors.New("聚合服务未初始化")
}
if !s.cfg.BackfillEnabled {
log.Printf("[DashboardAggregation] 回填被拒绝: backfill_enabled=false")
return ErrDashboardBackfillDisabled
}
if !end.After(start) {
return errors.New("回填时间范围无效")
}
if s.cfg.BackfillMaxDays > 0 {
maxRange := time.Duration(s.cfg.BackfillMaxDays) * 24 * time.Hour
if end.Sub(start) > maxRange {
return ErrDashboardBackfillTooLarge
}
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
defer cancel()
if err := s.backfillRange(ctx, start, end); err != nil {
log.Printf("[DashboardAggregation] 回填失败: %v", err)
}
}()
return nil
}
func (s *DashboardAggregationService) recomputeRecentDays() {
days := s.cfg.RecomputeDays
if days <= 0 {
return
}
now := time.Now().UTC()
start := now.AddDate(0, 0, -days)
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
defer cancel()
if err := s.backfillRange(ctx, start, now); err != nil {
log.Printf("[DashboardAggregation] 启动重算失败: %v", err)
return
}
}
func (s *DashboardAggregationService) runScheduledAggregation() {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return
}
defer atomic.StoreInt32(&s.running, 0)
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationTimeout)
defer cancel()
now := time.Now().UTC()
last, err := s.repo.GetAggregationWatermark(ctx)
if err != nil {
log.Printf("[DashboardAggregation] 读取水位失败: %v", err)
last = time.Unix(0, 0).UTC()
}
lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second
epoch := time.Unix(0, 0).UTC()
start := last.Add(-lookback)
if !last.After(epoch) {
retentionDays := s.cfg.Retention.UsageLogsDays
if retentionDays <= 0 {
retentionDays = 1
}
start = truncateToDayUTC(now.AddDate(0, 0, -retentionDays))
} else if start.After(now) {
start = now.Add(-lookback)
}
if err := s.aggregateRange(ctx, start, now); err != nil {
log.Printf("[DashboardAggregation] 聚合失败: %v", err)
return
}
if err := s.repo.UpdateAggregationWatermark(ctx, now); err != nil {
log.Printf("[DashboardAggregation] 更新水位失败: %v", err)
}
s.maybeCleanupRetention(ctx, now)
}
func (s *DashboardAggregationService) backfillRange(ctx context.Context, start, end time.Time) error {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return errors.New("聚合作业正在运行")
}
defer atomic.StoreInt32(&s.running, 0)
startUTC := start.UTC()
endUTC := end.UTC()
if !endUTC.After(startUTC) {
return errors.New("回填时间范围无效")
}
cursor := truncateToDayUTC(startUTC)
for cursor.Before(endUTC) {
windowEnd := cursor.Add(24 * time.Hour)
if windowEnd.After(endUTC) {
windowEnd = endUTC
}
if err := s.aggregateRange(ctx, cursor, windowEnd); err != nil {
return err
}
cursor = windowEnd
}
if err := s.repo.UpdateAggregationWatermark(ctx, endUTC); err != nil {
log.Printf("[DashboardAggregation] 更新水位失败: %v", err)
}
s.maybeCleanupRetention(ctx, endUTC)
return nil
}
func (s *DashboardAggregationService) aggregateRange(ctx context.Context, start, end time.Time) error {
if !end.After(start) {
return nil
}
if err := s.repo.EnsureUsageLogsPartitions(ctx, end); err != nil {
log.Printf("[DashboardAggregation] 分区检查失败: %v", err)
}
return s.repo.AggregateRange(ctx, start, end)
}
func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context, now time.Time) {
lastAny := s.lastRetentionCleanup.Load()
if lastAny != nil {
if last, ok := lastAny.(time.Time); ok && now.Sub(last) < dashboardAggregationRetentionInterval {
return
}
}
hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays)
dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays)
usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays)
aggErr := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff)
if aggErr != nil {
log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", aggErr)
}
usageErr := s.repo.CleanupUsageLogs(ctx, usageCutoff)
if usageErr != nil {
log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", usageErr)
}
if aggErr == nil && usageErr == nil {
s.lastRetentionCleanup.Store(now)
}
}
func truncateToDayUTC(t time.Time) time.Time {
t = t.UTC()
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}
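A worked example of the window selection in runScheduledAggregation under its three watermark cases: first run (epoch watermark falls back to the usage_logs retention window), a normal run (small lookback overlap), and a watermark ahead of the clock. Standalone sketch; the 90-day constant stands in for retention.usage_logs_days.

package main

import (
	"fmt"
	"time"
)

func truncateToDayUTC(t time.Time) time.Time {
	t = t.UTC()
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}

func main() {
	now := time.Date(2026, 1, 11, 10, 0, 0, 0, time.UTC)
	lookback := 120 * time.Second
	epoch := time.Unix(0, 0).UTC()

	cases := map[string]time.Time{
		"first run (epoch watermark)": epoch,
		"normal run":                  now.Add(-1 * time.Minute),
		"watermark in the future":     now.Add(10 * time.Minute),
	}
	for name, last := range cases {
		start := last.Add(-lookback)
		if !last.After(epoch) {
			start = truncateToDayUTC(now.AddDate(0, 0, -90)) // retention fallback
		} else if start.After(now) {
			start = now.Add(-lookback) // clamp: never aggregate a future-only window
		}
		fmt.Printf("%s: aggregate [%s, %s)\n", name, start.Format(time.RFC3339), now.Format(time.RFC3339))
	}
}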

View File

@@ -0,0 +1,106 @@
package service
import (
"context"
"errors"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/stretchr/testify/require"
)
type dashboardAggregationRepoTestStub struct {
aggregateCalls int
lastStart time.Time
lastEnd time.Time
watermark time.Time
aggregateErr error
cleanupAggregatesErr error
cleanupUsageErr error
}
func (s *dashboardAggregationRepoTestStub) AggregateRange(ctx context.Context, start, end time.Time) error {
s.aggregateCalls++
s.lastStart = start
s.lastEnd = end
return s.aggregateErr
}
func (s *dashboardAggregationRepoTestStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
return s.watermark, nil
}
func (s *dashboardAggregationRepoTestStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error {
return nil
}
func (s *dashboardAggregationRepoTestStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error {
return s.cleanupAggregatesErr
}
func (s *dashboardAggregationRepoTestStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error {
return s.cleanupUsageErr
}
func (s *dashboardAggregationRepoTestStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error {
return nil
}
func TestDashboardAggregationService_RunScheduledAggregation_EpochUsesRetentionStart(t *testing.T) {
repo := &dashboardAggregationRepoTestStub{watermark: time.Unix(0, 0).UTC()}
svc := &DashboardAggregationService{
repo: repo,
cfg: config.DashboardAggregationConfig{
Enabled: true,
IntervalSeconds: 60,
LookbackSeconds: 120,
Retention: config.DashboardAggregationRetentionConfig{
UsageLogsDays: 1,
HourlyDays: 1,
DailyDays: 1,
},
},
}
svc.runScheduledAggregation()
require.Equal(t, 1, repo.aggregateCalls)
require.False(t, repo.lastEnd.IsZero())
require.Equal(t, truncateToDayUTC(repo.lastEnd.AddDate(0, 0, -1)), repo.lastStart)
}
func TestDashboardAggregationService_CleanupRetentionFailure_DoesNotRecord(t *testing.T) {
repo := &dashboardAggregationRepoTestStub{cleanupAggregatesErr: errors.New("清理失败")}
svc := &DashboardAggregationService{
repo: repo,
cfg: config.DashboardAggregationConfig{
Retention: config.DashboardAggregationRetentionConfig{
UsageLogsDays: 1,
HourlyDays: 1,
DailyDays: 1,
},
},
}
svc.maybeCleanupRetention(context.Background(), time.Now().UTC())
require.Nil(t, svc.lastRetentionCleanup.Load())
}
func TestDashboardAggregationService_TriggerBackfill_TooLarge(t *testing.T) {
repo := &dashboardAggregationRepoTestStub{}
svc := &DashboardAggregationService{
repo: repo,
cfg: config.DashboardAggregationConfig{
BackfillEnabled: true,
BackfillMaxDays: 1,
},
}
start := time.Now().AddDate(0, 0, -3)
end := time.Now()
err := svc.TriggerBackfill(start, end)
require.ErrorIs(t, err, ErrDashboardBackfillTooLarge)
require.Equal(t, 0, repo.aggregateCalls)
}

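TriggerBackfill itself is not part of this hunk; the last test only pins down its guard behavior. A hypothetical reconstruction of that guard (field and sentinel names taken from the test, everything else assumed):

// Sketch only; the real method may differ beyond the guard the test asserts.
func (s *DashboardAggregationService) triggerBackfillSketch(start, end time.Time) error {
	if !s.cfg.BackfillEnabled {
		return errors.New("backfill is disabled")
	}
	if end.Sub(start) > time.Duration(s.cfg.BackfillMaxDays)*24*time.Hour {
		// Rejected before any aggregation work starts, which is why the
		// test expects repo.aggregateCalls to stay at zero.
		return ErrDashboardBackfillTooLarge
	}
	return s.backfillRange(context.Background(), start, end)
}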
View File

@@ -2,25 +2,119 @@ package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
)
// DashboardService provides aggregated statistics for admin dashboard.
type DashboardService struct {
usageRepo UsageLogRepository
const (
defaultDashboardStatsFreshTTL = 15 * time.Second
defaultDashboardStatsCacheTTL = 30 * time.Second
defaultDashboardStatsRefreshTimeout = 30 * time.Second
)
// ErrDashboardStatsCacheMiss marks a dashboard stats cache miss.
var ErrDashboardStatsCacheMiss = errors.New("dashboard stats cache miss")
// DashboardStatsCache defines the cache interface for dashboard statistics.
type DashboardStatsCache interface {
GetDashboardStats(ctx context.Context) (string, error)
SetDashboardStats(ctx context.Context, data string, ttl time.Duration) error
DeleteDashboardStats(ctx context.Context) error
}
func NewDashboardService(usageRepo UsageLogRepository) *DashboardService {
type dashboardStatsRangeFetcher interface {
GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*usagestats.DashboardStats, error)
}
type dashboardStatsCacheEntry struct {
Stats *usagestats.DashboardStats `json:"stats"`
UpdatedAt int64 `json:"updated_at"`
}
// DashboardService provides aggregated statistics for the admin dashboard.
type DashboardService struct {
usageRepo UsageLogRepository
aggRepo DashboardAggregationRepository
cache DashboardStatsCache
cacheFreshTTL time.Duration
cacheTTL time.Duration
refreshTimeout time.Duration
refreshing int32
aggEnabled bool
aggInterval time.Duration
aggLookback time.Duration
aggUsageDays int
}
func NewDashboardService(usageRepo UsageLogRepository, aggRepo DashboardAggregationRepository, cache DashboardStatsCache, cfg *config.Config) *DashboardService {
freshTTL := defaultDashboardStatsFreshTTL
cacheTTL := defaultDashboardStatsCacheTTL
refreshTimeout := defaultDashboardStatsRefreshTimeout
aggEnabled := true
aggInterval := time.Minute
aggLookback := 2 * time.Minute
aggUsageDays := 90
if cfg != nil {
if !cfg.Dashboard.Enabled {
cache = nil
}
if cfg.Dashboard.StatsFreshTTLSeconds > 0 {
freshTTL = time.Duration(cfg.Dashboard.StatsFreshTTLSeconds) * time.Second
}
if cfg.Dashboard.StatsTTLSeconds > 0 {
cacheTTL = time.Duration(cfg.Dashboard.StatsTTLSeconds) * time.Second
}
if cfg.Dashboard.StatsRefreshTimeoutSeconds > 0 {
refreshTimeout = time.Duration(cfg.Dashboard.StatsRefreshTimeoutSeconds) * time.Second
}
aggEnabled = cfg.DashboardAgg.Enabled
if cfg.DashboardAgg.IntervalSeconds > 0 {
aggInterval = time.Duration(cfg.DashboardAgg.IntervalSeconds) * time.Second
}
if cfg.DashboardAgg.LookbackSeconds > 0 {
aggLookback = time.Duration(cfg.DashboardAgg.LookbackSeconds) * time.Second
}
if cfg.DashboardAgg.Retention.UsageLogsDays > 0 {
aggUsageDays = cfg.DashboardAgg.Retention.UsageLogsDays
}
}
return &DashboardService{
usageRepo: usageRepo,
usageRepo: usageRepo,
aggRepo: aggRepo,
cache: cache,
cacheFreshTTL: freshTTL,
cacheTTL: cacheTTL,
refreshTimeout: refreshTimeout,
aggEnabled: aggEnabled,
aggInterval: aggInterval,
aggLookback: aggLookback,
aggUsageDays: aggUsageDays,
}
}
func (s *DashboardService) GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) {
stats, err := s.usageRepo.GetDashboardStats(ctx)
if s.cache != nil {
cached, fresh, err := s.getCachedDashboardStats(ctx)
if err == nil && cached != nil {
s.refreshAggregationStaleness(cached)
if !fresh {
s.refreshDashboardStatsAsync()
}
return cached, nil
}
if err != nil && !errors.Is(err, ErrDashboardStatsCacheMiss) {
log.Printf("[Dashboard] 仪表盘缓存读取失败: %v", err)
}
}
stats, err := s.refreshDashboardStats(ctx)
if err != nil {
return nil, fmt.Errorf("get dashboard stats: %w", err)
}
@@ -43,6 +137,169 @@ func (s *DashboardService) GetModelStatsWithFilters(ctx context.Context, startTi
return stats, nil
}
func (s *DashboardService) getCachedDashboardStats(ctx context.Context) (*usagestats.DashboardStats, bool, error) {
data, err := s.cache.GetDashboardStats(ctx)
if err != nil {
return nil, false, err
}
var entry dashboardStatsCacheEntry
if err := json.Unmarshal([]byte(data), &entry); err != nil {
s.evictDashboardStatsCache(err)
return nil, false, ErrDashboardStatsCacheMiss
}
if entry.Stats == nil {
s.evictDashboardStatsCache(errors.New("仪表盘缓存缺少统计数据"))
return nil, false, ErrDashboardStatsCacheMiss
}
age := time.Since(time.Unix(entry.UpdatedAt, 0))
return entry.Stats, age <= s.cacheFreshTTL, nil
}
func (s *DashboardService) refreshDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) {
stats, err := s.fetchDashboardStats(ctx)
if err != nil {
return nil, err
}
s.applyAggregationStatus(ctx, stats)
cacheCtx, cancel := s.cacheOperationContext()
defer cancel()
s.saveDashboardStatsCache(cacheCtx, stats)
return stats, nil
}
func (s *DashboardService) refreshDashboardStatsAsync() {
if s.cache == nil {
return
}
if !atomic.CompareAndSwapInt32(&s.refreshing, 0, 1) {
return
}
go func() {
defer atomic.StoreInt32(&s.refreshing, 0)
ctx, cancel := context.WithTimeout(context.Background(), s.refreshTimeout)
defer cancel()
stats, err := s.fetchDashboardStats(ctx)
if err != nil {
log.Printf("[Dashboard] 仪表盘缓存异步刷新失败: %v", err)
return
}
s.applyAggregationStatus(ctx, stats)
cacheCtx, cancel := s.cacheOperationContext()
defer cancel()
s.saveDashboardStatsCache(cacheCtx, stats)
}()
}
func (s *DashboardService) fetchDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) {
if !s.aggEnabled {
if fetcher, ok := s.usageRepo.(dashboardStatsRangeFetcher); ok {
now := time.Now().UTC()
start := truncateToDayUTC(now.AddDate(0, 0, -s.aggUsageDays))
return fetcher.GetDashboardStatsWithRange(ctx, start, now)
}
}
return s.usageRepo.GetDashboardStats(ctx)
}
func (s *DashboardService) saveDashboardStatsCache(ctx context.Context, stats *usagestats.DashboardStats) {
if s.cache == nil || stats == nil {
return
}
entry := dashboardStatsCacheEntry{
Stats: stats,
UpdatedAt: time.Now().Unix(),
}
data, err := json.Marshal(entry)
if err != nil {
log.Printf("[Dashboard] 仪表盘缓存序列化失败: %v", err)
return
}
if err := s.cache.SetDashboardStats(ctx, string(data), s.cacheTTL); err != nil {
log.Printf("[Dashboard] 仪表盘缓存写入失败: %v", err)
}
}
func (s *DashboardService) evictDashboardStatsCache(reason error) {
if s.cache == nil {
return
}
cacheCtx, cancel := s.cacheOperationContext()
defer cancel()
if err := s.cache.DeleteDashboardStats(cacheCtx); err != nil {
log.Printf("[Dashboard] 仪表盘缓存清理失败: %v", err)
}
if reason != nil {
log.Printf("[Dashboard] 仪表盘缓存异常,已清理: %v", reason)
}
}
func (s *DashboardService) cacheOperationContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), s.refreshTimeout)
}
func (s *DashboardService) applyAggregationStatus(ctx context.Context, stats *usagestats.DashboardStats) {
if stats == nil {
return
}
updatedAt := s.fetchAggregationUpdatedAt(ctx)
stats.StatsUpdatedAt = updatedAt.UTC().Format(time.RFC3339)
stats.StatsStale = s.isAggregationStale(updatedAt, time.Now().UTC())
}
func (s *DashboardService) refreshAggregationStaleness(stats *usagestats.DashboardStats) {
if stats == nil {
return
}
updatedAt := parseStatsUpdatedAt(stats.StatsUpdatedAt)
stats.StatsStale = s.isAggregationStale(updatedAt, time.Now().UTC())
}
func (s *DashboardService) fetchAggregationUpdatedAt(ctx context.Context) time.Time {
if s.aggRepo == nil {
return time.Unix(0, 0).UTC()
}
updatedAt, err := s.aggRepo.GetAggregationWatermark(ctx)
if err != nil {
log.Printf("[Dashboard] 读取聚合水位失败: %v", err)
return time.Unix(0, 0).UTC()
}
if updatedAt.IsZero() {
return time.Unix(0, 0).UTC()
}
return updatedAt.UTC()
}
func (s *DashboardService) isAggregationStale(updatedAt, now time.Time) bool {
if !s.aggEnabled {
return true
}
epoch := time.Unix(0, 0).UTC()
if !updatedAt.After(epoch) {
return true
}
threshold := s.aggInterval + s.aggLookback
return now.Sub(updatedAt) > threshold
}
func parseStatsUpdatedAt(raw string) time.Time {
if raw == "" {
return time.Unix(0, 0).UTC()
}
parsed, err := time.Parse(time.RFC3339, raw)
if err != nil {
return time.Unix(0, 0).UTC()
}
return parsed.UTC()
}
func (s *DashboardService) GetAPIKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.APIKeyUsageTrendPoint, error) {
trend, err := s.usageRepo.GetAPIKeyUsageTrend(ctx, startTime, endTime, granularity, limit)
if err != nil {

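The cache path in GetDashboardStats above is a stale-while-revalidate scheme: serve whatever is cached, and once the entry ages past the fresh TTL, let exactly one goroutine refresh it in the background. A condensed standalone sketch of that control flow (simplified names and an in-memory value; the real service stores JSON entries in Redis):

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type swrCache struct {
	mu         sync.Mutex
	value      string
	updatedAt  time.Time
	freshTTL   time.Duration
	refreshing int32
}

// get returns the cached value immediately; when the entry is older than
// freshTTL, a single background refresh is started, guarded by CAS so
// concurrent stale hits do not stampede the backend.
func (c *swrCache) get(fetch func(context.Context) string) string {
	c.mu.Lock()
	v, stale := c.value, time.Since(c.updatedAt) > c.freshTTL
	c.mu.Unlock()
	if stale && atomic.CompareAndSwapInt32(&c.refreshing, 0, 1) {
		go func() {
			defer atomic.StoreInt32(&c.refreshing, 0)
			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
			defer cancel()
			fresh := fetch(ctx)
			c.mu.Lock()
			c.value, c.updatedAt = fresh, time.Now()
			c.mu.Unlock()
		}()
	}
	return v
}

func main() {
	c := &swrCache{value: "stale stats", updatedAt: time.Now().Add(-time.Minute), freshTTL: 15 * time.Second}
	fmt.Println(c.get(func(context.Context) string { return "fresh stats" })) // serves stale immediately
	time.Sleep(50 * time.Millisecond)                                         // toy wait for the async refresh
	fmt.Println(c.get(func(context.Context) string { return "unused" }))      // now fresh
}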
View File

@@ -0,0 +1,387 @@
package service
import (
"context"
"encoding/json"
"errors"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
"github.com/stretchr/testify/require"
)
type usageRepoStub struct {
UsageLogRepository
stats *usagestats.DashboardStats
rangeStats *usagestats.DashboardStats
err error
rangeErr error
calls int32
rangeCalls int32
rangeStart time.Time
rangeEnd time.Time
onCall chan struct{}
}
func (s *usageRepoStub) GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error) {
atomic.AddInt32(&s.calls, 1)
if s.onCall != nil {
select {
case s.onCall <- struct{}{}:
default:
}
}
if s.err != nil {
return nil, s.err
}
return s.stats, nil
}
func (s *usageRepoStub) GetDashboardStatsWithRange(ctx context.Context, start, end time.Time) (*usagestats.DashboardStats, error) {
atomic.AddInt32(&s.rangeCalls, 1)
s.rangeStart = start
s.rangeEnd = end
if s.rangeErr != nil {
return nil, s.rangeErr
}
if s.rangeStats != nil {
return s.rangeStats, nil
}
return s.stats, nil
}
type dashboardCacheStub struct {
get func(ctx context.Context) (string, error)
set func(ctx context.Context, data string, ttl time.Duration) error
del func(ctx context.Context) error
getCalls int32
setCalls int32
delCalls int32
lastSetMu sync.Mutex
lastSet string
}
func (c *dashboardCacheStub) GetDashboardStats(ctx context.Context) (string, error) {
atomic.AddInt32(&c.getCalls, 1)
if c.get != nil {
return c.get(ctx)
}
return "", ErrDashboardStatsCacheMiss
}
func (c *dashboardCacheStub) SetDashboardStats(ctx context.Context, data string, ttl time.Duration) error {
atomic.AddInt32(&c.setCalls, 1)
c.lastSetMu.Lock()
c.lastSet = data
c.lastSetMu.Unlock()
if c.set != nil {
return c.set(ctx, data, ttl)
}
return nil
}
func (c *dashboardCacheStub) DeleteDashboardStats(ctx context.Context) error {
atomic.AddInt32(&c.delCalls, 1)
if c.del != nil {
return c.del(ctx)
}
return nil
}
type dashboardAggregationRepoStub struct {
watermark time.Time
err error
}
func (s *dashboardAggregationRepoStub) AggregateRange(ctx context.Context, start, end time.Time) error {
return nil
}
func (s *dashboardAggregationRepoStub) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
if s.err != nil {
return time.Time{}, s.err
}
return s.watermark, nil
}
func (s *dashboardAggregationRepoStub) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error {
return nil
}
func (s *dashboardAggregationRepoStub) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error {
return nil
}
func (s *dashboardAggregationRepoStub) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error {
return nil
}
func (s *dashboardAggregationRepoStub) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error {
return nil
}
func (c *dashboardCacheStub) readLastEntry(t *testing.T) dashboardStatsCacheEntry {
t.Helper()
c.lastSetMu.Lock()
data := c.lastSet
c.lastSetMu.Unlock()
var entry dashboardStatsCacheEntry
err := json.Unmarshal([]byte(data), &entry)
require.NoError(t, err)
return entry
}
func TestDashboardService_CacheHitFresh(t *testing.T) {
stats := &usagestats.DashboardStats{
TotalUsers: 10,
StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339),
StatsStale: true,
}
entry := dashboardStatsCacheEntry{
Stats: stats,
UpdatedAt: time.Now().Unix(),
}
payload, err := json.Marshal(entry)
require.NoError(t, err)
cache := &dashboardCacheStub{
get: func(ctx context.Context) (string, error) {
return string(payload), nil
},
}
repo := &usageRepoStub{
stats: &usagestats.DashboardStats{TotalUsers: 99},
}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{
Dashboard: config.DashboardCacheConfig{Enabled: true},
DashboardAgg: config.DashboardAggregationConfig{
Enabled: true,
},
}
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, stats, got)
require.Equal(t, int32(0), atomic.LoadInt32(&repo.calls))
require.Equal(t, int32(1), atomic.LoadInt32(&cache.getCalls))
require.Equal(t, int32(0), atomic.LoadInt32(&cache.setCalls))
}
func TestDashboardService_CacheMiss_StoresCache(t *testing.T) {
stats := &usagestats.DashboardStats{
TotalUsers: 7,
StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339),
StatsStale: true,
}
cache := &dashboardCacheStub{
get: func(ctx context.Context) (string, error) {
return "", ErrDashboardStatsCacheMiss
},
}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{
Dashboard: config.DashboardCacheConfig{Enabled: true},
DashboardAgg: config.DashboardAggregationConfig{
Enabled: true,
},
}
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, stats, got)
require.Equal(t, int32(1), atomic.LoadInt32(&repo.calls))
require.Equal(t, int32(1), atomic.LoadInt32(&cache.getCalls))
require.Equal(t, int32(1), atomic.LoadInt32(&cache.setCalls))
entry := cache.readLastEntry(t)
require.Equal(t, stats, entry.Stats)
require.WithinDuration(t, time.Now(), time.Unix(entry.UpdatedAt, 0), time.Second)
}
func TestDashboardService_CacheDisabled_SkipsCache(t *testing.T) {
stats := &usagestats.DashboardStats{
TotalUsers: 3,
StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339),
StatsStale: true,
}
cache := &dashboardCacheStub{
get: func(ctx context.Context) (string, error) {
return "", nil
},
}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{
Dashboard: config.DashboardCacheConfig{Enabled: false},
DashboardAgg: config.DashboardAggregationConfig{
Enabled: true,
},
}
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, stats, got)
require.Equal(t, int32(1), atomic.LoadInt32(&repo.calls))
require.Equal(t, int32(0), atomic.LoadInt32(&cache.getCalls))
require.Equal(t, int32(0), atomic.LoadInt32(&cache.setCalls))
}
func TestDashboardService_CacheHitStale_TriggersAsyncRefresh(t *testing.T) {
staleStats := &usagestats.DashboardStats{
TotalUsers: 11,
StatsUpdatedAt: time.Unix(0, 0).UTC().Format(time.RFC3339),
StatsStale: true,
}
entry := dashboardStatsCacheEntry{
Stats: staleStats,
UpdatedAt: time.Now().Add(-defaultDashboardStatsFreshTTL * 2).Unix(),
}
payload, err := json.Marshal(entry)
require.NoError(t, err)
cache := &dashboardCacheStub{
get: func(ctx context.Context) (string, error) {
return string(payload), nil
},
}
refreshCh := make(chan struct{}, 1)
repo := &usageRepoStub{
stats: &usagestats.DashboardStats{TotalUsers: 22},
onCall: refreshCh,
}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{
Dashboard: config.DashboardCacheConfig{Enabled: true},
DashboardAgg: config.DashboardAggregationConfig{
Enabled: true,
},
}
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, staleStats, got)
select {
case <-refreshCh:
case <-time.After(1 * time.Second):
t.Fatal("等待异步刷新超时")
}
require.Eventually(t, func() bool {
return atomic.LoadInt32(&cache.setCalls) >= 1
}, 1*time.Second, 10*time.Millisecond)
}
func TestDashboardService_CacheParseError_EvictsAndRefetches(t *testing.T) {
cache := &dashboardCacheStub{
get: func(ctx context.Context) (string, error) {
return "not-json", nil
},
}
stats := &usagestats.DashboardStats{TotalUsers: 9}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{
Dashboard: config.DashboardCacheConfig{Enabled: true},
DashboardAgg: config.DashboardAggregationConfig{
Enabled: true,
},
}
svc := NewDashboardService(repo, aggRepo, cache, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, stats, got)
require.Equal(t, int32(1), atomic.LoadInt32(&cache.delCalls))
require.Equal(t, int32(1), atomic.LoadInt32(&repo.calls))
}
func TestDashboardService_CacheParseError_RepoFailure(t *testing.T) {
cache := &dashboardCacheStub{
get: func(ctx context.Context) (string, error) {
return "not-json", nil
},
}
repo := &usageRepoStub{err: errors.New("db down")}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{
Dashboard: config.DashboardCacheConfig{Enabled: true},
DashboardAgg: config.DashboardAggregationConfig{
Enabled: true,
},
}
svc := NewDashboardService(repo, aggRepo, cache, cfg)
_, err := svc.GetDashboardStats(context.Background())
require.Error(t, err)
require.Equal(t, int32(1), atomic.LoadInt32(&cache.delCalls))
}
func TestDashboardService_StatsUpdatedAtEpochWhenMissing(t *testing.T) {
stats := &usagestats.DashboardStats{}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: time.Unix(0, 0).UTC()}
cfg := &config.Config{Dashboard: config.DashboardCacheConfig{Enabled: false}}
svc := NewDashboardService(repo, aggRepo, nil, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, "1970-01-01T00:00:00Z", got.StatsUpdatedAt)
require.True(t, got.StatsStale)
}
func TestDashboardService_StatsStaleFalseWhenFresh(t *testing.T) {
aggNow := time.Now().UTC().Truncate(time.Second)
stats := &usagestats.DashboardStats{}
repo := &usageRepoStub{stats: stats}
aggRepo := &dashboardAggregationRepoStub{watermark: aggNow}
cfg := &config.Config{
Dashboard: config.DashboardCacheConfig{Enabled: false},
DashboardAgg: config.DashboardAggregationConfig{
Enabled: true,
IntervalSeconds: 60,
LookbackSeconds: 120,
},
}
svc := NewDashboardService(repo, aggRepo, nil, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, aggNow.Format(time.RFC3339), got.StatsUpdatedAt)
require.False(t, got.StatsStale)
}
func TestDashboardService_AggDisabled_UsesUsageLogsFallback(t *testing.T) {
expected := &usagestats.DashboardStats{TotalUsers: 42}
repo := &usageRepoStub{
rangeStats: expected,
err: errors.New("should not call aggregated stats"),
}
cfg := &config.Config{
Dashboard: config.DashboardCacheConfig{Enabled: false},
DashboardAgg: config.DashboardAggregationConfig{
Enabled: false,
Retention: config.DashboardAggregationRetentionConfig{
UsageLogsDays: 7,
},
},
}
svc := NewDashboardService(repo, nil, nil, cfg)
got, err := svc.GetDashboardStats(context.Background())
require.NoError(t, err)
require.Equal(t, int64(42), got.TotalUsers)
require.Equal(t, int32(0), atomic.LoadInt32(&repo.calls))
require.Equal(t, int32(1), atomic.LoadInt32(&repo.rangeCalls))
require.False(t, repo.rangeEnd.IsZero())
require.Equal(t, truncateToDayUTC(repo.rangeEnd.AddDate(0, 0, -7)), repo.rangeStart)
}

View File

@@ -47,6 +47,13 @@ func ProvideTokenRefreshService(
return svc
}
// ProvideDashboardAggregationService creates and starts DashboardAggregationService.
func ProvideDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
svc := NewDashboardAggregationService(repo, timingWheel, cfg)
svc.Start()
return svc
}
// ProvideAccountExpiryService creates and starts AccountExpiryService.
func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpiryService {
svc := NewAccountExpiryService(accountRepo, time.Minute)
@@ -126,6 +133,7 @@ var ProviderSet = wire.NewSet(
ProvideTokenRefreshService,
ProvideAccountExpiryService,
ProvideTimingWheelService,
ProvideDashboardAggregationService,
ProvideDeferredService,
NewAntigravityQuotaFetcher,
NewUserAttributeService,

View File

@@ -0,0 +1,77 @@
-- Usage dashboard aggregation tables (hourly/daily) + active-user dedup + watermark.
-- These tables support Admin Dashboard statistics without full-table scans on usage_logs.
-- Hourly aggregates (UTC buckets).
CREATE TABLE IF NOT EXISTS usage_dashboard_hourly (
bucket_start TIMESTAMPTZ PRIMARY KEY,
total_requests BIGINT NOT NULL DEFAULT 0,
input_tokens BIGINT NOT NULL DEFAULT 0,
output_tokens BIGINT NOT NULL DEFAULT 0,
cache_creation_tokens BIGINT NOT NULL DEFAULT 0,
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
total_cost DECIMAL(20, 10) NOT NULL DEFAULT 0,
actual_cost DECIMAL(20, 10) NOT NULL DEFAULT 0,
total_duration_ms BIGINT NOT NULL DEFAULT 0,
active_users BIGINT NOT NULL DEFAULT 0,
computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_usage_dashboard_hourly_bucket_start
ON usage_dashboard_hourly (bucket_start DESC);
COMMENT ON TABLE usage_dashboard_hourly IS 'Pre-aggregated hourly usage metrics for admin dashboard (UTC buckets).';
COMMENT ON COLUMN usage_dashboard_hourly.bucket_start IS 'UTC start timestamp of the hour bucket.';
COMMENT ON COLUMN usage_dashboard_hourly.computed_at IS 'When the hourly row was last computed/refreshed.';
-- Daily aggregates (UTC dates).
CREATE TABLE IF NOT EXISTS usage_dashboard_daily (
bucket_date DATE PRIMARY KEY,
total_requests BIGINT NOT NULL DEFAULT 0,
input_tokens BIGINT NOT NULL DEFAULT 0,
output_tokens BIGINT NOT NULL DEFAULT 0,
cache_creation_tokens BIGINT NOT NULL DEFAULT 0,
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
total_cost DECIMAL(20, 10) NOT NULL DEFAULT 0,
actual_cost DECIMAL(20, 10) NOT NULL DEFAULT 0,
total_duration_ms BIGINT NOT NULL DEFAULT 0,
active_users BIGINT NOT NULL DEFAULT 0,
computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_usage_dashboard_daily_bucket_date
ON usage_dashboard_daily (bucket_date DESC);
COMMENT ON TABLE usage_dashboard_daily IS 'Pre-aggregated daily usage metrics for admin dashboard (UTC dates).';
COMMENT ON COLUMN usage_dashboard_daily.bucket_date IS 'UTC date of the day bucket.';
COMMENT ON COLUMN usage_dashboard_daily.computed_at IS 'When the daily row was last computed/refreshed.';
-- Hourly active user dedup table.
CREATE TABLE IF NOT EXISTS usage_dashboard_hourly_users (
bucket_start TIMESTAMPTZ NOT NULL,
user_id BIGINT NOT NULL,
PRIMARY KEY (bucket_start, user_id)
);
CREATE INDEX IF NOT EXISTS idx_usage_dashboard_hourly_users_bucket_start
ON usage_dashboard_hourly_users (bucket_start);
-- Daily active user dedup table.
CREATE TABLE IF NOT EXISTS usage_dashboard_daily_users (
bucket_date DATE NOT NULL,
user_id BIGINT NOT NULL,
PRIMARY KEY (bucket_date, user_id)
);
CREATE INDEX IF NOT EXISTS idx_usage_dashboard_daily_users_bucket_date
ON usage_dashboard_daily_users (bucket_date);
-- Aggregation watermark table (single row).
CREATE TABLE IF NOT EXISTS usage_dashboard_aggregation_watermark (
id INT PRIMARY KEY,
last_aggregated_at TIMESTAMPTZ NOT NULL DEFAULT TIMESTAMPTZ '1970-01-01 00:00:00+00',
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
INSERT INTO usage_dashboard_aggregation_watermark (id)
VALUES (1)
ON CONFLICT (id) DO NOTHING;

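The watermark table holds a single row (id = 1), seeded by the INSERT above, so advancing the watermark is a plain UPDATE. An illustrative query as the repository might issue it (a sketch; the actual repository code is not in this diff):

package repo

import (
	"context"
	"database/sql"
	"time"
)

// updateWatermark advances last_aggregated_at on the single seed row.
func updateWatermark(ctx context.Context, db *sql.DB, aggregatedAt time.Time) error {
	_, err := db.ExecContext(ctx,
		`UPDATE usage_dashboard_aggregation_watermark
		    SET last_aggregated_at = $1, updated_at = NOW()
		  WHERE id = 1`,
		aggregatedAt.UTC())
	return err
}

Active-user figures would then come from counting the dedup tables, e.g. SELECT COUNT(*) FROM usage_dashboard_hourly_users WHERE bucket_start = $1.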
View File

@@ -0,0 +1,54 @@
-- usage_logs monthly partition bootstrap.
-- usage_logs is converted to a partitioned table only when it is empty.
-- Existing installations with data require a manual migration plan.
DO $$
DECLARE
is_partitioned BOOLEAN := FALSE;
has_data BOOLEAN := FALSE;
month_start DATE;
prev_month DATE;
next_month DATE;
BEGIN
SELECT EXISTS(
SELECT 1
FROM pg_partitioned_table pt
JOIN pg_class c ON c.oid = pt.partrelid
WHERE c.relname = 'usage_logs'
) INTO is_partitioned;
IF NOT is_partitioned THEN
SELECT EXISTS(SELECT 1 FROM usage_logs LIMIT 1) INTO has_data;
IF NOT has_data THEN
-- PostgreSQL cannot ALTER an existing regular table into a partitioned
-- one, so the empty table is recreated as partitioned instead. Secondary
-- indexes and a partition-key-compatible primary key must be re-created
-- by the schema layer afterwards.
EXECUTE 'ALTER TABLE usage_logs RENAME TO usage_logs_unpartitioned';
EXECUTE 'CREATE TABLE usage_logs (LIKE usage_logs_unpartitioned INCLUDING DEFAULTS INCLUDING CONSTRAINTS) PARTITION BY RANGE (created_at)';
EXECUTE 'DROP TABLE usage_logs_unpartitioned';
is_partitioned := TRUE;
END IF;
END IF;
IF is_partitioned THEN
month_start := date_trunc('month', now() AT TIME ZONE 'UTC')::date;
prev_month := (month_start - INTERVAL '1 month')::date;
next_month := (month_start + INTERVAL '1 month')::date;
EXECUTE format(
'CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM (%L) TO (%L)',
to_char(prev_month, 'YYYYMM'),
prev_month,
month_start
);
EXECUTE format(
'CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM (%L) TO (%L)',
to_char(month_start, 'YYYYMM'),
month_start,
next_month
);
EXECUTE format(
'CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM (%L) TO (%L)',
to_char(next_month, 'YYYYMM'),
next_month,
(next_month + INTERVAL '1 month')::date
);
END IF;
END $$;

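EnsureUsageLogsPartitions, called from aggregateRange in the service above, presumably issues the same CREATE TABLE ... PARTITION OF DDL at runtime for upcoming months. A hypothetical Go sketch of one such call (function name and placement assumed; the repository's real implementation is not shown in this diff):

package repo

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

// ensureMonthlyPartition creates the usage_logs partition covering t's
// UTC month if it does not already exist.
func ensureMonthlyPartition(ctx context.Context, db *sql.DB, t time.Time) error {
	from := time.Date(t.UTC().Year(), t.UTC().Month(), 1, 0, 0, 0, 0, time.UTC)
	to := from.AddDate(0, 1, 0)
	ddl := fmt.Sprintf(
		`CREATE TABLE IF NOT EXISTS usage_logs_%s PARTITION OF usage_logs FOR VALUES FROM ('%s') TO ('%s')`,
		from.Format("200601"), from.Format("2006-01-02"), to.Format("2006-01-02"),
	)
	_, err := db.ExecContext(ctx, ddl)
	return err
}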
View File

@@ -194,6 +194,63 @@ api_key_auth_cache:
# Enable singleflight to merge back-source fetches on cache miss
singleflight: true
# =============================================================================
# Dashboard Cache Configuration
# =============================================================================
dashboard_cache:
# Enable dashboard stats cache
enabled: true
# Redis key prefix for multi-environment isolation
key_prefix: "sub2api:"
# Fresh TTL (seconds); cached stats within this window are served as fresh
stats_fresh_ttl_seconds: 15
# Cache TTL (seconds) for the entry stored in Redis
stats_ttl_seconds: 30
# Async refresh timeout (seconds)
stats_refresh_timeout_seconds: 30
# =============================================================================
# Dashboard Aggregation Configuration (takes effect on restart)
# =============================================================================
dashboard_aggregation:
# Enable the aggregation job
enabled: true
# Refresh interval (seconds)
interval_seconds: 60
# Lookback window (seconds) for late-arriving data
lookback_seconds: 120
# Allow manual backfill
backfill_enabled: false
# Backfill max range (days)
backfill_max_days: 31
# Recompute the most recent N days on startup
recompute_days: 2
# Retention windows (days)
retention:
# Raw usage_logs retention
usage_logs_days: 90
# Hourly aggregate retention
hourly_days: 180
# Daily aggregate retention
daily_days: 730
# =============================================================================
# Concurrency Wait Configuration

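For orientation, the struct shapes these keys decode into can be reconstructed from the field names used in the tests earlier in this commit (a reconstruction following the repo's mapstructure convention, not the actual definitions):

package config

type DashboardCacheConfig struct {
	Enabled                    bool   `mapstructure:"enabled"`
	KeyPrefix                  string `mapstructure:"key_prefix"`
	StatsFreshTTLSeconds       int    `mapstructure:"stats_fresh_ttl_seconds"`
	StatsTTLSeconds            int    `mapstructure:"stats_ttl_seconds"`
	StatsRefreshTimeoutSeconds int    `mapstructure:"stats_refresh_timeout_seconds"`
}

type DashboardAggregationConfig struct {
	Enabled         bool `mapstructure:"enabled"`
	IntervalSeconds int  `mapstructure:"interval_seconds"`
	LookbackSeconds int  `mapstructure:"lookback_seconds"`
	BackfillEnabled bool `mapstructure:"backfill_enabled"`
	BackfillMaxDays int  `mapstructure:"backfill_max_days"`
	// RecomputeDays is not exercised by the tests; name assumed from the YAML key.
	RecomputeDays int                                 `mapstructure:"recompute_days"`
	Retention     DashboardAggregationRetentionConfig `mapstructure:"retention"`
}

type DashboardAggregationRetentionConfig struct {
	UsageLogsDays int `mapstructure:"usage_logs_days"`
	HourlyDays    int `mapstructure:"hourly_days"`
	DailyDays     int `mapstructure:"daily_days"`
}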
View File

@@ -69,6 +69,33 @@ JWT_EXPIRE_HOUR=24
# Leave unset to use default ./config.yaml
#CONFIG_FILE=./config.yaml
# -----------------------------------------------------------------------------
# Dashboard Aggregation (Optional)
# -----------------------------------------------------------------------------
# Enable the dashboard pre-aggregation job
DASHBOARD_AGGREGATION_ENABLED=true
# Refresh interval (seconds)
DASHBOARD_AGGREGATION_INTERVAL_SECONDS=60
# Lookback window (seconds) for late-arriving data
DASHBOARD_AGGREGATION_LOOKBACK_SECONDS=120
# Allow manual backfill
DASHBOARD_AGGREGATION_BACKFILL_ENABLED=false
# Backfill max range (days)
DASHBOARD_AGGREGATION_BACKFILL_MAX_DAYS=31
# Recompute the most recent N days on startup
DASHBOARD_AGGREGATION_RECOMPUTE_DAYS=2
# Retention windows (days)
DASHBOARD_AGGREGATION_RETENTION_USAGE_LOGS_DAYS=90
DASHBOARD_AGGREGATION_RETENTION_HOURLY_DAYS=180
DASHBOARD_AGGREGATION_RETENTION_DAILY_DAYS=730
# -----------------------------------------------------------------------------
# Security Configuration
# -----------------------------------------------------------------------------

View File

@@ -194,6 +194,63 @@ api_key_auth_cache:
# Enable singleflight to merge back-source fetches on cache miss
singleflight: true
# =============================================================================
# Dashboard Cache Configuration
# =============================================================================
dashboard_cache:
# Enable dashboard stats cache
enabled: true
# Redis key prefix for multi-environment isolation
key_prefix: "sub2api:"
# Fresh TTL (seconds); cached stats within this window are served as fresh
stats_fresh_ttl_seconds: 15
# Cache TTL (seconds) for the entry stored in Redis
stats_ttl_seconds: 30
# Async refresh timeout (seconds)
stats_refresh_timeout_seconds: 30
# =============================================================================
# Dashboard Aggregation Configuration (takes effect on restart)
# =============================================================================
dashboard_aggregation:
# Enable the aggregation job
enabled: true
# Refresh interval (seconds)
interval_seconds: 60
# Lookback window (seconds) for late-arriving data
lookback_seconds: 120
# Allow manual backfill
backfill_enabled: false
# Backfill max range (days)
backfill_max_days: 31
# Recompute the most recent N days on startup
recompute_days: 2
# Retention windows (days)
retention:
# Raw usage_logs retention
usage_logs_days: 90
# Hourly aggregate retention
hourly_days: 180
# Daily aggregate retention
daily_days: 730
# =============================================================================
# Concurrency Wait Configuration

View File

@@ -652,6 +652,9 @@ export interface DashboardStats {
total_users: number
today_new_users: number // new users today
active_users: number // users with at least one request today
hourly_active_users: number // active users in the current hour (UTC)
stats_updated_at: string // stats update time (UTC, RFC3339)
stats_stale: boolean // whether the stats are stale
// API key statistics
total_api_keys: number