fix(dashboard): 修复预聚合表使用UTC时区导致今日统计不准确的问题
将 dashboard_aggregation_repo.go 和 usage_log_repo.go 中的时区处理 从 UTC 改为使用服务器配置时区(默认 Asia/Shanghai),确保"今日" 统计数据与用户预期一致。 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/Wei-Shaw/sub2api/internal/pkg/timezone"
|
||||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
)
|
)
|
||||||
@@ -41,21 +42,22 @@ func isPostgresDriver(db *sql.DB) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, start, end time.Time) error {
|
func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, start, end time.Time) error {
|
||||||
startUTC := start.UTC()
|
loc := timezone.Location()
|
||||||
endUTC := end.UTC()
|
startLocal := start.In(loc)
|
||||||
if !endUTC.After(startUTC) {
|
endLocal := end.In(loc)
|
||||||
|
if !endLocal.After(startLocal) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
hourStart := startUTC.Truncate(time.Hour)
|
hourStart := startLocal.Truncate(time.Hour)
|
||||||
hourEnd := endUTC.Truncate(time.Hour)
|
hourEnd := endLocal.Truncate(time.Hour)
|
||||||
if endUTC.After(hourEnd) {
|
if endLocal.After(hourEnd) {
|
||||||
hourEnd = hourEnd.Add(time.Hour)
|
hourEnd = hourEnd.Add(time.Hour)
|
||||||
}
|
}
|
||||||
|
|
||||||
dayStart := truncateToDayUTC(startUTC)
|
dayStart := truncateToDay(startLocal)
|
||||||
dayEnd := truncateToDayUTC(endUTC)
|
dayEnd := truncateToDay(endLocal)
|
||||||
if endUTC.After(dayEnd) {
|
if endLocal.After(dayEnd) {
|
||||||
dayEnd = dayEnd.Add(24 * time.Hour)
|
dayEnd = dayEnd.Add(24 * time.Hour)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -146,38 +148,41 @@ func (r *dashboardAggregationRepository) EnsureUsageLogsPartitions(ctx context.C
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *dashboardAggregationRepository) insertHourlyActiveUsers(ctx context.Context, start, end time.Time) error {
|
func (r *dashboardAggregationRepository) insertHourlyActiveUsers(ctx context.Context, start, end time.Time) error {
|
||||||
|
tzName := timezone.Name()
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO usage_dashboard_hourly_users (bucket_start, user_id)
|
INSERT INTO usage_dashboard_hourly_users (bucket_start, user_id)
|
||||||
SELECT DISTINCT
|
SELECT DISTINCT
|
||||||
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
|
date_trunc('hour', created_at AT TIME ZONE $3) AT TIME ZONE $3 AS bucket_start,
|
||||||
user_id
|
user_id
|
||||||
FROM usage_logs
|
FROM usage_logs
|
||||||
WHERE created_at >= $1 AND created_at < $2
|
WHERE created_at >= $1 AND created_at < $2
|
||||||
ON CONFLICT DO NOTHING
|
ON CONFLICT DO NOTHING
|
||||||
`
|
`
|
||||||
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
|
_, err := r.sql.ExecContext(ctx, query, start, end, tzName)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Context, start, end time.Time) error {
|
func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Context, start, end time.Time) error {
|
||||||
|
tzName := timezone.Name()
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO usage_dashboard_daily_users (bucket_date, user_id)
|
INSERT INTO usage_dashboard_daily_users (bucket_date, user_id)
|
||||||
SELECT DISTINCT
|
SELECT DISTINCT
|
||||||
(bucket_start AT TIME ZONE 'UTC')::date AS bucket_date,
|
(bucket_start AT TIME ZONE $3)::date AS bucket_date,
|
||||||
user_id
|
user_id
|
||||||
FROM usage_dashboard_hourly_users
|
FROM usage_dashboard_hourly_users
|
||||||
WHERE bucket_start >= $1 AND bucket_start < $2
|
WHERE bucket_start >= $1 AND bucket_start < $2
|
||||||
ON CONFLICT DO NOTHING
|
ON CONFLICT DO NOTHING
|
||||||
`
|
`
|
||||||
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
|
_, err := r.sql.ExecContext(ctx, query, start, end, tzName)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *dashboardAggregationRepository) upsertHourlyAggregates(ctx context.Context, start, end time.Time) error {
|
func (r *dashboardAggregationRepository) upsertHourlyAggregates(ctx context.Context, start, end time.Time) error {
|
||||||
|
tzName := timezone.Name()
|
||||||
query := `
|
query := `
|
||||||
WITH hourly AS (
|
WITH hourly AS (
|
||||||
SELECT
|
SELECT
|
||||||
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
|
date_trunc('hour', created_at AT TIME ZONE $3) AT TIME ZONE $3 AS bucket_start,
|
||||||
COUNT(*) AS total_requests,
|
COUNT(*) AS total_requests,
|
||||||
COALESCE(SUM(input_tokens), 0) AS input_tokens,
|
COALESCE(SUM(input_tokens), 0) AS input_tokens,
|
||||||
COALESCE(SUM(output_tokens), 0) AS output_tokens,
|
COALESCE(SUM(output_tokens), 0) AS output_tokens,
|
||||||
@@ -236,15 +241,16 @@ func (r *dashboardAggregationRepository) upsertHourlyAggregates(ctx context.Cont
|
|||||||
active_users = EXCLUDED.active_users,
|
active_users = EXCLUDED.active_users,
|
||||||
computed_at = EXCLUDED.computed_at
|
computed_at = EXCLUDED.computed_at
|
||||||
`
|
`
|
||||||
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
|
_, err := r.sql.ExecContext(ctx, query, start, end, tzName)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Context, start, end time.Time) error {
|
func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Context, start, end time.Time) error {
|
||||||
|
tzName := timezone.Name()
|
||||||
query := `
|
query := `
|
||||||
WITH daily AS (
|
WITH daily AS (
|
||||||
SELECT
|
SELECT
|
||||||
(bucket_start AT TIME ZONE 'UTC')::date AS bucket_date,
|
(bucket_start AT TIME ZONE $5)::date AS bucket_date,
|
||||||
COALESCE(SUM(total_requests), 0) AS total_requests,
|
COALESCE(SUM(total_requests), 0) AS total_requests,
|
||||||
COALESCE(SUM(input_tokens), 0) AS input_tokens,
|
COALESCE(SUM(input_tokens), 0) AS input_tokens,
|
||||||
COALESCE(SUM(output_tokens), 0) AS output_tokens,
|
COALESCE(SUM(output_tokens), 0) AS output_tokens,
|
||||||
@@ -255,7 +261,7 @@ func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Conte
|
|||||||
COALESCE(SUM(total_duration_ms), 0) AS total_duration_ms
|
COALESCE(SUM(total_duration_ms), 0) AS total_duration_ms
|
||||||
FROM usage_dashboard_hourly
|
FROM usage_dashboard_hourly
|
||||||
WHERE bucket_start >= $1 AND bucket_start < $2
|
WHERE bucket_start >= $1 AND bucket_start < $2
|
||||||
GROUP BY (bucket_start AT TIME ZONE 'UTC')::date
|
GROUP BY (bucket_start AT TIME ZONE $5)::date
|
||||||
),
|
),
|
||||||
user_counts AS (
|
user_counts AS (
|
||||||
SELECT bucket_date, COUNT(*) AS active_users
|
SELECT bucket_date, COUNT(*) AS active_users
|
||||||
@@ -303,7 +309,7 @@ func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Conte
|
|||||||
active_users = EXCLUDED.active_users,
|
active_users = EXCLUDED.active_users,
|
||||||
computed_at = EXCLUDED.computed_at
|
computed_at = EXCLUDED.computed_at
|
||||||
`
|
`
|
||||||
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC(), start.UTC(), end.UTC())
|
_, err := r.sql.ExecContext(ctx, query, start, end, start, end, tzName)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -376,9 +382,8 @@ func (r *dashboardAggregationRepository) createUsageLogsPartition(ctx context.Co
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func truncateToDayUTC(t time.Time) time.Time {
|
func truncateToDay(t time.Time) time.Time {
|
||||||
t = t.UTC()
|
return timezone.StartOfDay(t)
|
||||||
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func truncateToMonthUTC(t time.Time) time.Time {
|
func truncateToMonthUTC(t time.Time) time.Time {
|
||||||
|
|||||||
@@ -455,7 +455,7 @@ func (r *usageLogRepository) fillDashboardUsageStatsAggregated(ctx context.Conte
|
|||||||
FROM usage_dashboard_hourly
|
FROM usage_dashboard_hourly
|
||||||
WHERE bucket_start = $1
|
WHERE bucket_start = $1
|
||||||
`
|
`
|
||||||
hourStart := now.UTC().Truncate(time.Hour)
|
hourStart := now.In(timezone.Location()).Truncate(time.Hour)
|
||||||
if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart}, &stats.HourlyActiveUsers); err != nil {
|
if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart}, &stats.HourlyActiveUsers); err != nil {
|
||||||
if err != sql.ErrNoRows {
|
if err != sql.ErrNoRows {
|
||||||
return err
|
return err
|
||||||
|
|||||||
Reference in New Issue
Block a user