Files
sub2api/backend/internal/repository/dashboard_aggregation_repo.go

388 lines
12 KiB
Go

package repository
import (
"context"
"database/sql"
"fmt"
"log"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/lib/pq"
)
type dashboardAggregationRepository struct {
sql sqlExecutor
}
// NewDashboardAggregationRepository 创建仪表盘预聚合仓储。
func NewDashboardAggregationRepository(sqlDB *sql.DB) service.DashboardAggregationRepository {
if sqlDB == nil {
return nil
}
if !isPostgresDriver(sqlDB) {
log.Printf("[DashboardAggregation] 检测到非 PostgreSQL 驱动,已自动禁用预聚合")
return nil
}
return newDashboardAggregationRepositoryWithSQL(sqlDB)
}
func newDashboardAggregationRepositoryWithSQL(sqlq sqlExecutor) *dashboardAggregationRepository {
return &dashboardAggregationRepository{sql: sqlq}
}
func isPostgresDriver(db *sql.DB) bool {
if db == nil {
return false
}
_, ok := db.Driver().(*pq.Driver)
return ok
}
func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, start, end time.Time) error {
startUTC := start.UTC()
endUTC := end.UTC()
if !endUTC.After(startUTC) {
return nil
}
hourStart := startUTC.Truncate(time.Hour)
hourEnd := endUTC.Truncate(time.Hour)
if endUTC.After(hourEnd) {
hourEnd = hourEnd.Add(time.Hour)
}
dayStart := truncateToDayUTC(startUTC)
dayEnd := truncateToDayUTC(endUTC)
if endUTC.After(dayEnd) {
dayEnd = dayEnd.Add(24 * time.Hour)
}
// 以桶边界聚合,允许覆盖 end 所在桶的剩余区间。
if err := r.insertHourlyActiveUsers(ctx, hourStart, hourEnd); err != nil {
return err
}
if err := r.insertDailyActiveUsers(ctx, hourStart, hourEnd); err != nil {
return err
}
if err := r.upsertHourlyAggregates(ctx, hourStart, hourEnd); err != nil {
return err
}
if err := r.upsertDailyAggregates(ctx, dayStart, dayEnd); err != nil {
return err
}
return nil
}
func (r *dashboardAggregationRepository) GetAggregationWatermark(ctx context.Context) (time.Time, error) {
var ts time.Time
query := "SELECT last_aggregated_at FROM usage_dashboard_aggregation_watermark WHERE id = 1"
if err := scanSingleRow(ctx, r.sql, query, nil, &ts); err != nil {
if err == sql.ErrNoRows {
return time.Unix(0, 0).UTC(), nil
}
return time.Time{}, err
}
return ts.UTC(), nil
}
func (r *dashboardAggregationRepository) UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error {
query := `
INSERT INTO usage_dashboard_aggregation_watermark (id, last_aggregated_at, updated_at)
VALUES (1, $1, NOW())
ON CONFLICT (id)
DO UPDATE SET last_aggregated_at = EXCLUDED.last_aggregated_at, updated_at = EXCLUDED.updated_at
`
_, err := r.sql.ExecContext(ctx, query, aggregatedAt.UTC())
return err
}
func (r *dashboardAggregationRepository) CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error {
hourlyCutoffUTC := hourlyCutoff.UTC()
dailyCutoffUTC := dailyCutoff.UTC()
if _, err := r.sql.ExecContext(ctx, "DELETE FROM usage_dashboard_hourly WHERE bucket_start < $1", hourlyCutoffUTC); err != nil {
return err
}
if _, err := r.sql.ExecContext(ctx, "DELETE FROM usage_dashboard_hourly_users WHERE bucket_start < $1", hourlyCutoffUTC); err != nil {
return err
}
if _, err := r.sql.ExecContext(ctx, "DELETE FROM usage_dashboard_daily WHERE bucket_date < $1::date", dailyCutoffUTC); err != nil {
return err
}
if _, err := r.sql.ExecContext(ctx, "DELETE FROM usage_dashboard_daily_users WHERE bucket_date < $1::date", dailyCutoffUTC); err != nil {
return err
}
return nil
}
func (r *dashboardAggregationRepository) CleanupUsageLogs(ctx context.Context, cutoff time.Time) error {
isPartitioned, err := r.isUsageLogsPartitioned(ctx)
if err != nil {
return err
}
if isPartitioned {
return r.dropUsageLogsPartitions(ctx, cutoff)
}
_, err = r.sql.ExecContext(ctx, "DELETE FROM usage_logs WHERE created_at < $1", cutoff.UTC())
return err
}
func (r *dashboardAggregationRepository) EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error {
isPartitioned, err := r.isUsageLogsPartitioned(ctx)
if err != nil || !isPartitioned {
return err
}
monthStart := truncateToMonthUTC(now)
prevMonth := monthStart.AddDate(0, -1, 0)
nextMonth := monthStart.AddDate(0, 1, 0)
for _, m := range []time.Time{prevMonth, monthStart, nextMonth} {
if err := r.createUsageLogsPartition(ctx, m); err != nil {
return err
}
}
return nil
}
func (r *dashboardAggregationRepository) insertHourlyActiveUsers(ctx context.Context, start, end time.Time) error {
query := `
INSERT INTO usage_dashboard_hourly_users (bucket_start, user_id)
SELECT DISTINCT
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
user_id
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
ON CONFLICT DO NOTHING
`
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
return err
}
func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Context, start, end time.Time) error {
query := `
INSERT INTO usage_dashboard_daily_users (bucket_date, user_id)
SELECT DISTINCT
(bucket_start AT TIME ZONE 'UTC')::date AS bucket_date,
user_id
FROM usage_dashboard_hourly_users
WHERE bucket_start >= $1 AND bucket_start < $2
ON CONFLICT DO NOTHING
`
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
return err
}
func (r *dashboardAggregationRepository) upsertHourlyAggregates(ctx context.Context, start, end time.Time) error {
query := `
WITH hourly AS (
SELECT
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
COUNT(*) AS total_requests,
COALESCE(SUM(input_tokens), 0) AS input_tokens,
COALESCE(SUM(output_tokens), 0) AS output_tokens,
COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation_tokens,
COALESCE(SUM(cache_read_tokens), 0) AS cache_read_tokens,
COALESCE(SUM(total_cost), 0) AS total_cost,
COALESCE(SUM(actual_cost), 0) AS actual_cost,
COALESCE(SUM(COALESCE(duration_ms, 0)), 0) AS total_duration_ms
FROM usage_logs
WHERE created_at >= $1 AND created_at < $2
GROUP BY 1
),
user_counts AS (
SELECT bucket_start, COUNT(*) AS active_users
FROM usage_dashboard_hourly_users
WHERE bucket_start >= $1 AND bucket_start < $2
GROUP BY bucket_start
)
INSERT INTO usage_dashboard_hourly (
bucket_start,
total_requests,
input_tokens,
output_tokens,
cache_creation_tokens,
cache_read_tokens,
total_cost,
actual_cost,
total_duration_ms,
active_users,
computed_at
)
SELECT
hourly.bucket_start,
hourly.total_requests,
hourly.input_tokens,
hourly.output_tokens,
hourly.cache_creation_tokens,
hourly.cache_read_tokens,
hourly.total_cost,
hourly.actual_cost,
hourly.total_duration_ms,
COALESCE(user_counts.active_users, 0) AS active_users,
NOW()
FROM hourly
LEFT JOIN user_counts ON user_counts.bucket_start = hourly.bucket_start
ON CONFLICT (bucket_start)
DO UPDATE SET
total_requests = EXCLUDED.total_requests,
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_creation_tokens = EXCLUDED.cache_creation_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
total_cost = EXCLUDED.total_cost,
actual_cost = EXCLUDED.actual_cost,
total_duration_ms = EXCLUDED.total_duration_ms,
active_users = EXCLUDED.active_users,
computed_at = EXCLUDED.computed_at
`
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
return err
}
func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Context, start, end time.Time) error {
query := `
WITH daily AS (
SELECT
(bucket_start AT TIME ZONE 'UTC')::date AS bucket_date,
COALESCE(SUM(total_requests), 0) AS total_requests,
COALESCE(SUM(input_tokens), 0) AS input_tokens,
COALESCE(SUM(output_tokens), 0) AS output_tokens,
COALESCE(SUM(cache_creation_tokens), 0) AS cache_creation_tokens,
COALESCE(SUM(cache_read_tokens), 0) AS cache_read_tokens,
COALESCE(SUM(total_cost), 0) AS total_cost,
COALESCE(SUM(actual_cost), 0) AS actual_cost,
COALESCE(SUM(total_duration_ms), 0) AS total_duration_ms
FROM usage_dashboard_hourly
WHERE bucket_start >= $1 AND bucket_start < $2
GROUP BY (bucket_start AT TIME ZONE 'UTC')::date
),
user_counts AS (
SELECT bucket_date, COUNT(*) AS active_users
FROM usage_dashboard_daily_users
WHERE bucket_date >= $3::date AND bucket_date < $4::date
GROUP BY bucket_date
)
INSERT INTO usage_dashboard_daily (
bucket_date,
total_requests,
input_tokens,
output_tokens,
cache_creation_tokens,
cache_read_tokens,
total_cost,
actual_cost,
total_duration_ms,
active_users,
computed_at
)
SELECT
daily.bucket_date,
daily.total_requests,
daily.input_tokens,
daily.output_tokens,
daily.cache_creation_tokens,
daily.cache_read_tokens,
daily.total_cost,
daily.actual_cost,
daily.total_duration_ms,
COALESCE(user_counts.active_users, 0) AS active_users,
NOW()
FROM daily
LEFT JOIN user_counts ON user_counts.bucket_date = daily.bucket_date
ON CONFLICT (bucket_date)
DO UPDATE SET
total_requests = EXCLUDED.total_requests,
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_creation_tokens = EXCLUDED.cache_creation_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
total_cost = EXCLUDED.total_cost,
actual_cost = EXCLUDED.actual_cost,
total_duration_ms = EXCLUDED.total_duration_ms,
active_users = EXCLUDED.active_users,
computed_at = EXCLUDED.computed_at
`
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC(), start.UTC(), end.UTC())
return err
}
func (r *dashboardAggregationRepository) isUsageLogsPartitioned(ctx context.Context) (bool, error) {
query := `
SELECT EXISTS(
SELECT 1
FROM pg_partitioned_table pt
JOIN pg_class c ON c.oid = pt.partrelid
WHERE c.relname = 'usage_logs'
)
`
var partitioned bool
if err := scanSingleRow(ctx, r.sql, query, nil, &partitioned); err != nil {
return false, err
}
return partitioned, nil
}
func (r *dashboardAggregationRepository) dropUsageLogsPartitions(ctx context.Context, cutoff time.Time) error {
rows, err := r.sql.QueryContext(ctx, `
SELECT c.relname
FROM pg_inherits
JOIN pg_class c ON c.oid = pg_inherits.inhrelid
JOIN pg_class p ON p.oid = pg_inherits.inhparent
WHERE p.relname = 'usage_logs'
`)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
cutoffMonth := truncateToMonthUTC(cutoff)
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return err
}
if !strings.HasPrefix(name, "usage_logs_") {
continue
}
suffix := strings.TrimPrefix(name, "usage_logs_")
month, err := time.Parse("200601", suffix)
if err != nil {
continue
}
month = month.UTC()
if month.Before(cutoffMonth) {
if _, err := r.sql.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", pq.QuoteIdentifier(name))); err != nil {
return err
}
}
}
return rows.Err()
}
func (r *dashboardAggregationRepository) createUsageLogsPartition(ctx context.Context, month time.Time) error {
monthStart := truncateToMonthUTC(month)
nextMonth := monthStart.AddDate(0, 1, 0)
name := fmt.Sprintf("usage_logs_%s", monthStart.Format("200601"))
query := fmt.Sprintf(
"CREATE TABLE IF NOT EXISTS %s PARTITION OF usage_logs FOR VALUES FROM (%s) TO (%s)",
pq.QuoteIdentifier(name),
pq.QuoteLiteral(monthStart.Format("2006-01-02")),
pq.QuoteLiteral(nextMonth.Format("2006-01-02")),
)
_, err := r.sql.ExecContext(ctx, query)
return err
}
func truncateToDayUTC(t time.Time) time.Time {
t = t.UTC()
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
}
func truncateToMonthUTC(t time.Time) time.Time {
t = t.UTC()
return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, time.UTC)
}