perf(admin-dashboard): accelerate trend load with pre-aggregation and async user trend
This commit is contained in:
@@ -1655,6 +1655,13 @@ func (r *usageLogRepository) GetBatchAPIKeyUsageStats(ctx context.Context, apiKe
|
||||
|
||||
// GetUsageTrendWithFilters returns usage trend data with optional filters
|
||||
func (r *usageLogRepository) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, requestType *int16, stream *bool, billingType *int8) (results []TrendDataPoint, err error) {
|
||||
if shouldUsePreaggregatedTrend(granularity, userID, apiKeyID, accountID, groupID, model, requestType, stream, billingType) {
|
||||
aggregated, aggregatedErr := r.getUsageTrendFromAggregates(ctx, startTime, endTime, granularity)
|
||||
if aggregatedErr == nil && len(aggregated) > 0 {
|
||||
return aggregated, nil
|
||||
}
|
||||
}
|
||||
|
||||
dateFormat := safeDateFormat(granularity)
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
@@ -1719,6 +1726,78 @@ func (r *usageLogRepository) GetUsageTrendWithFilters(ctx context.Context, start
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func shouldUsePreaggregatedTrend(granularity string, userID, apiKeyID, accountID, groupID int64, model string, requestType *int16, stream *bool, billingType *int8) bool {
|
||||
if granularity != "day" && granularity != "hour" {
|
||||
return false
|
||||
}
|
||||
return userID == 0 &&
|
||||
apiKeyID == 0 &&
|
||||
accountID == 0 &&
|
||||
groupID == 0 &&
|
||||
model == "" &&
|
||||
requestType == nil &&
|
||||
stream == nil &&
|
||||
billingType == nil
|
||||
}
|
||||
|
||||
func (r *usageLogRepository) getUsageTrendFromAggregates(ctx context.Context, startTime, endTime time.Time, granularity string) (results []TrendDataPoint, err error) {
|
||||
dateFormat := safeDateFormat(granularity)
|
||||
query := ""
|
||||
args := []any{startTime, endTime}
|
||||
|
||||
switch granularity {
|
||||
case "hour":
|
||||
query = fmt.Sprintf(`
|
||||
SELECT
|
||||
TO_CHAR(bucket_start, '%s') as date,
|
||||
total_requests as requests,
|
||||
input_tokens,
|
||||
output_tokens,
|
||||
(cache_creation_tokens + cache_read_tokens) as cache_tokens,
|
||||
(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens) as total_tokens,
|
||||
total_cost as cost,
|
||||
actual_cost
|
||||
FROM usage_dashboard_hourly
|
||||
WHERE bucket_start >= $1 AND bucket_start < $2
|
||||
ORDER BY bucket_start ASC
|
||||
`, dateFormat)
|
||||
case "day":
|
||||
query = fmt.Sprintf(`
|
||||
SELECT
|
||||
TO_CHAR(bucket_date::timestamp, '%s') as date,
|
||||
total_requests as requests,
|
||||
input_tokens,
|
||||
output_tokens,
|
||||
(cache_creation_tokens + cache_read_tokens) as cache_tokens,
|
||||
(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens) as total_tokens,
|
||||
total_cost as cost,
|
||||
actual_cost
|
||||
FROM usage_dashboard_daily
|
||||
WHERE bucket_date >= $1::date AND bucket_date < $2::date
|
||||
ORDER BY bucket_date ASC
|
||||
`, dateFormat)
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
rows, err := r.sql.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if closeErr := rows.Close(); closeErr != nil && err == nil {
|
||||
err = closeErr
|
||||
results = nil
|
||||
}
|
||||
}()
|
||||
|
||||
results, err = scanTrendRows(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// GetModelStatsWithFilters returns model statistics with optional filters
|
||||
func (r *usageLogRepository) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, requestType *int16, stream *bool, billingType *int8) (results []ModelStat, err error) {
|
||||
actualCostExpr := "COALESCE(SUM(actual_cost), 0) as actual_cost"
|
||||
|
||||
Reference in New Issue
Block a user