Merge branch 'Wei-Shaw:main' into main
This commit is contained in:
@@ -80,6 +80,10 @@ func (r *accountRepository) Create(ctx context.Context, account *service.Account
|
||||
SetSchedulable(account.Schedulable).
|
||||
SetAutoPauseOnExpired(account.AutoPauseOnExpired)
|
||||
|
||||
if account.RateMultiplier != nil {
|
||||
builder.SetRateMultiplier(*account.RateMultiplier)
|
||||
}
|
||||
|
||||
if account.ProxyID != nil {
|
||||
builder.SetProxyID(*account.ProxyID)
|
||||
}
|
||||
@@ -291,6 +295,10 @@ func (r *accountRepository) Update(ctx context.Context, account *service.Account
|
||||
SetSchedulable(account.Schedulable).
|
||||
SetAutoPauseOnExpired(account.AutoPauseOnExpired)
|
||||
|
||||
if account.RateMultiplier != nil {
|
||||
builder.SetRateMultiplier(*account.RateMultiplier)
|
||||
}
|
||||
|
||||
if account.ProxyID != nil {
|
||||
builder.SetProxyID(*account.ProxyID)
|
||||
} else {
|
||||
@@ -999,6 +1007,11 @@ func (r *accountRepository) BulkUpdate(ctx context.Context, ids []int64, updates
|
||||
args = append(args, *updates.Priority)
|
||||
idx++
|
||||
}
|
||||
if updates.RateMultiplier != nil {
|
||||
setClauses = append(setClauses, "rate_multiplier = $"+itoa(idx))
|
||||
args = append(args, *updates.RateMultiplier)
|
||||
idx++
|
||||
}
|
||||
if updates.Status != nil {
|
||||
setClauses = append(setClauses, "status = $"+itoa(idx))
|
||||
args = append(args, *updates.Status)
|
||||
@@ -1347,6 +1360,8 @@ func accountEntityToService(m *dbent.Account) *service.Account {
|
||||
return nil
|
||||
}
|
||||
|
||||
rateMultiplier := m.RateMultiplier
|
||||
|
||||
return &service.Account{
|
||||
ID: m.ID,
|
||||
Name: m.Name,
|
||||
@@ -1358,6 +1373,7 @@ func accountEntityToService(m *dbent.Account) *service.Account {
|
||||
ProxyID: m.ProxyID,
|
||||
Concurrency: m.Concurrency,
|
||||
Priority: m.Priority,
|
||||
RateMultiplier: &rateMultiplier,
|
||||
Status: m.Status,
|
||||
ErrorMessage: derefString(m.ErrorMessage),
|
||||
LastUsedAt: m.LastUsedAt,
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/timezone"
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
"github.com/lib/pq"
|
||||
)
|
||||
@@ -41,21 +42,22 @@ func isPostgresDriver(db *sql.DB) bool {
|
||||
}
|
||||
|
||||
func (r *dashboardAggregationRepository) AggregateRange(ctx context.Context, start, end time.Time) error {
|
||||
startUTC := start.UTC()
|
||||
endUTC := end.UTC()
|
||||
if !endUTC.After(startUTC) {
|
||||
loc := timezone.Location()
|
||||
startLocal := start.In(loc)
|
||||
endLocal := end.In(loc)
|
||||
if !endLocal.After(startLocal) {
|
||||
return nil
|
||||
}
|
||||
|
||||
hourStart := startUTC.Truncate(time.Hour)
|
||||
hourEnd := endUTC.Truncate(time.Hour)
|
||||
if endUTC.After(hourEnd) {
|
||||
hourStart := startLocal.Truncate(time.Hour)
|
||||
hourEnd := endLocal.Truncate(time.Hour)
|
||||
if endLocal.After(hourEnd) {
|
||||
hourEnd = hourEnd.Add(time.Hour)
|
||||
}
|
||||
|
||||
dayStart := truncateToDayUTC(startUTC)
|
||||
dayEnd := truncateToDayUTC(endUTC)
|
||||
if endUTC.After(dayEnd) {
|
||||
dayStart := truncateToDay(startLocal)
|
||||
dayEnd := truncateToDay(endLocal)
|
||||
if endLocal.After(dayEnd) {
|
||||
dayEnd = dayEnd.Add(24 * time.Hour)
|
||||
}
|
||||
|
||||
@@ -146,38 +148,41 @@ func (r *dashboardAggregationRepository) EnsureUsageLogsPartitions(ctx context.C
|
||||
}
|
||||
|
||||
func (r *dashboardAggregationRepository) insertHourlyActiveUsers(ctx context.Context, start, end time.Time) error {
|
||||
tzName := timezone.Name()
|
||||
query := `
|
||||
INSERT INTO usage_dashboard_hourly_users (bucket_start, user_id)
|
||||
SELECT DISTINCT
|
||||
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
|
||||
date_trunc('hour', created_at AT TIME ZONE $3) AT TIME ZONE $3 AS bucket_start,
|
||||
user_id
|
||||
FROM usage_logs
|
||||
WHERE created_at >= $1 AND created_at < $2
|
||||
ON CONFLICT DO NOTHING
|
||||
`
|
||||
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
|
||||
_, err := r.sql.ExecContext(ctx, query, start, end, tzName)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *dashboardAggregationRepository) insertDailyActiveUsers(ctx context.Context, start, end time.Time) error {
|
||||
tzName := timezone.Name()
|
||||
query := `
|
||||
INSERT INTO usage_dashboard_daily_users (bucket_date, user_id)
|
||||
SELECT DISTINCT
|
||||
(bucket_start AT TIME ZONE 'UTC')::date AS bucket_date,
|
||||
(bucket_start AT TIME ZONE $3)::date AS bucket_date,
|
||||
user_id
|
||||
FROM usage_dashboard_hourly_users
|
||||
WHERE bucket_start >= $1 AND bucket_start < $2
|
||||
ON CONFLICT DO NOTHING
|
||||
`
|
||||
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
|
||||
_, err := r.sql.ExecContext(ctx, query, start, end, tzName)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *dashboardAggregationRepository) upsertHourlyAggregates(ctx context.Context, start, end time.Time) error {
|
||||
tzName := timezone.Name()
|
||||
query := `
|
||||
WITH hourly AS (
|
||||
SELECT
|
||||
date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start,
|
||||
date_trunc('hour', created_at AT TIME ZONE $3) AT TIME ZONE $3 AS bucket_start,
|
||||
COUNT(*) AS total_requests,
|
||||
COALESCE(SUM(input_tokens), 0) AS input_tokens,
|
||||
COALESCE(SUM(output_tokens), 0) AS output_tokens,
|
||||
@@ -236,15 +241,16 @@ func (r *dashboardAggregationRepository) upsertHourlyAggregates(ctx context.Cont
|
||||
active_users = EXCLUDED.active_users,
|
||||
computed_at = EXCLUDED.computed_at
|
||||
`
|
||||
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC())
|
||||
_, err := r.sql.ExecContext(ctx, query, start, end, tzName)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Context, start, end time.Time) error {
|
||||
tzName := timezone.Name()
|
||||
query := `
|
||||
WITH daily AS (
|
||||
SELECT
|
||||
(bucket_start AT TIME ZONE 'UTC')::date AS bucket_date,
|
||||
(bucket_start AT TIME ZONE $5)::date AS bucket_date,
|
||||
COALESCE(SUM(total_requests), 0) AS total_requests,
|
||||
COALESCE(SUM(input_tokens), 0) AS input_tokens,
|
||||
COALESCE(SUM(output_tokens), 0) AS output_tokens,
|
||||
@@ -255,7 +261,7 @@ func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Conte
|
||||
COALESCE(SUM(total_duration_ms), 0) AS total_duration_ms
|
||||
FROM usage_dashboard_hourly
|
||||
WHERE bucket_start >= $1 AND bucket_start < $2
|
||||
GROUP BY (bucket_start AT TIME ZONE 'UTC')::date
|
||||
GROUP BY (bucket_start AT TIME ZONE $5)::date
|
||||
),
|
||||
user_counts AS (
|
||||
SELECT bucket_date, COUNT(*) AS active_users
|
||||
@@ -303,7 +309,7 @@ func (r *dashboardAggregationRepository) upsertDailyAggregates(ctx context.Conte
|
||||
active_users = EXCLUDED.active_users,
|
||||
computed_at = EXCLUDED.computed_at
|
||||
`
|
||||
_, err := r.sql.ExecContext(ctx, query, start.UTC(), end.UTC(), start.UTC(), end.UTC())
|
||||
_, err := r.sql.ExecContext(ctx, query, start, end, start, end, tzName)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -376,9 +382,8 @@ func (r *dashboardAggregationRepository) createUsageLogsPartition(ctx context.Co
|
||||
return err
|
||||
}
|
||||
|
||||
func truncateToDayUTC(t time.Time) time.Time {
|
||||
t = t.UTC()
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
|
||||
func truncateToDay(t time.Time) time.Time {
|
||||
return timezone.StartOfDay(t)
|
||||
}
|
||||
|
||||
func truncateToMonthUTC(t time.Time) time.Time {
|
||||
|
||||
@@ -55,7 +55,6 @@ INSERT INTO ops_error_logs (
|
||||
upstream_error_message,
|
||||
upstream_error_detail,
|
||||
upstream_errors,
|
||||
duration_ms,
|
||||
time_to_first_token_ms,
|
||||
request_body,
|
||||
request_body_truncated,
|
||||
@@ -65,7 +64,7 @@ INSERT INTO ops_error_logs (
|
||||
retry_count,
|
||||
created_at
|
||||
) VALUES (
|
||||
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35
|
||||
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34
|
||||
) RETURNING id`
|
||||
|
||||
var id int64
|
||||
@@ -98,7 +97,6 @@ INSERT INTO ops_error_logs (
|
||||
opsNullString(input.UpstreamErrorMessage),
|
||||
opsNullString(input.UpstreamErrorDetail),
|
||||
opsNullString(input.UpstreamErrorsJSON),
|
||||
opsNullInt(input.DurationMs),
|
||||
opsNullInt64(input.TimeToFirstTokenMs),
|
||||
opsNullString(input.RequestBodyJSON),
|
||||
input.RequestBodyTruncated,
|
||||
@@ -135,7 +133,7 @@ func (r *opsRepository) ListErrorLogs(ctx context.Context, filter *service.OpsEr
|
||||
}
|
||||
|
||||
where, args := buildOpsErrorLogsWhere(filter)
|
||||
countSQL := "SELECT COUNT(*) FROM ops_error_logs " + where
|
||||
countSQL := "SELECT COUNT(*) FROM ops_error_logs e " + where
|
||||
|
||||
var total int
|
||||
if err := r.db.QueryRowContext(ctx, countSQL, args...).Scan(&total); err != nil {
|
||||
@@ -146,28 +144,43 @@ func (r *opsRepository) ListErrorLogs(ctx context.Context, filter *service.OpsEr
|
||||
argsWithLimit := append(args, pageSize, offset)
|
||||
selectSQL := `
|
||||
SELECT
|
||||
id,
|
||||
created_at,
|
||||
error_phase,
|
||||
error_type,
|
||||
severity,
|
||||
COALESCE(upstream_status_code, status_code, 0),
|
||||
COALESCE(platform, ''),
|
||||
COALESCE(model, ''),
|
||||
duration_ms,
|
||||
COALESCE(client_request_id, ''),
|
||||
COALESCE(request_id, ''),
|
||||
COALESCE(error_message, ''),
|
||||
user_id,
|
||||
api_key_id,
|
||||
account_id,
|
||||
group_id,
|
||||
CASE WHEN client_ip IS NULL THEN NULL ELSE client_ip::text END,
|
||||
COALESCE(request_path, ''),
|
||||
stream
|
||||
FROM ops_error_logs
|
||||
e.id,
|
||||
e.created_at,
|
||||
e.error_phase,
|
||||
e.error_type,
|
||||
COALESCE(e.error_owner, ''),
|
||||
COALESCE(e.error_source, ''),
|
||||
e.severity,
|
||||
COALESCE(e.upstream_status_code, e.status_code, 0),
|
||||
COALESCE(e.platform, ''),
|
||||
COALESCE(e.model, ''),
|
||||
COALESCE(e.is_retryable, false),
|
||||
COALESCE(e.retry_count, 0),
|
||||
COALESCE(e.resolved, false),
|
||||
e.resolved_at,
|
||||
e.resolved_by_user_id,
|
||||
COALESCE(u2.email, ''),
|
||||
e.resolved_retry_id,
|
||||
COALESCE(e.client_request_id, ''),
|
||||
COALESCE(e.request_id, ''),
|
||||
COALESCE(e.error_message, ''),
|
||||
e.user_id,
|
||||
COALESCE(u.email, ''),
|
||||
e.api_key_id,
|
||||
e.account_id,
|
||||
COALESCE(a.name, ''),
|
||||
e.group_id,
|
||||
COALESCE(g.name, ''),
|
||||
CASE WHEN e.client_ip IS NULL THEN NULL ELSE e.client_ip::text END,
|
||||
COALESCE(e.request_path, ''),
|
||||
e.stream
|
||||
FROM ops_error_logs e
|
||||
LEFT JOIN accounts a ON e.account_id = a.id
|
||||
LEFT JOIN groups g ON e.group_id = g.id
|
||||
LEFT JOIN users u ON e.user_id = u.id
|
||||
LEFT JOIN users u2 ON e.resolved_by_user_id = u2.id
|
||||
` + where + `
|
||||
ORDER BY created_at DESC
|
||||
ORDER BY e.created_at DESC
|
||||
LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, selectSQL, argsWithLimit...)
|
||||
@@ -179,39 +192,65 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
out := make([]*service.OpsErrorLog, 0, pageSize)
|
||||
for rows.Next() {
|
||||
var item service.OpsErrorLog
|
||||
var latency sql.NullInt64
|
||||
var statusCode sql.NullInt64
|
||||
var clientIP sql.NullString
|
||||
var userID sql.NullInt64
|
||||
var apiKeyID sql.NullInt64
|
||||
var accountID sql.NullInt64
|
||||
var accountName string
|
||||
var groupID sql.NullInt64
|
||||
var groupName string
|
||||
var userEmail string
|
||||
var resolvedAt sql.NullTime
|
||||
var resolvedBy sql.NullInt64
|
||||
var resolvedByName string
|
||||
var resolvedRetryID sql.NullInt64
|
||||
if err := rows.Scan(
|
||||
&item.ID,
|
||||
&item.CreatedAt,
|
||||
&item.Phase,
|
||||
&item.Type,
|
||||
&item.Owner,
|
||||
&item.Source,
|
||||
&item.Severity,
|
||||
&statusCode,
|
||||
&item.Platform,
|
||||
&item.Model,
|
||||
&latency,
|
||||
&item.IsRetryable,
|
||||
&item.RetryCount,
|
||||
&item.Resolved,
|
||||
&resolvedAt,
|
||||
&resolvedBy,
|
||||
&resolvedByName,
|
||||
&resolvedRetryID,
|
||||
&item.ClientRequestID,
|
||||
&item.RequestID,
|
||||
&item.Message,
|
||||
&userID,
|
||||
&userEmail,
|
||||
&apiKeyID,
|
||||
&accountID,
|
||||
&accountName,
|
||||
&groupID,
|
||||
&groupName,
|
||||
&clientIP,
|
||||
&item.RequestPath,
|
||||
&item.Stream,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if latency.Valid {
|
||||
v := int(latency.Int64)
|
||||
item.LatencyMs = &v
|
||||
if resolvedAt.Valid {
|
||||
t := resolvedAt.Time
|
||||
item.ResolvedAt = &t
|
||||
}
|
||||
if resolvedBy.Valid {
|
||||
v := resolvedBy.Int64
|
||||
item.ResolvedByUserID = &v
|
||||
}
|
||||
item.ResolvedByUserName = resolvedByName
|
||||
if resolvedRetryID.Valid {
|
||||
v := resolvedRetryID.Int64
|
||||
item.ResolvedRetryID = &v
|
||||
}
|
||||
item.StatusCode = int(statusCode.Int64)
|
||||
if clientIP.Valid {
|
||||
@@ -222,6 +261,7 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
v := userID.Int64
|
||||
item.UserID = &v
|
||||
}
|
||||
item.UserEmail = userEmail
|
||||
if apiKeyID.Valid {
|
||||
v := apiKeyID.Int64
|
||||
item.APIKeyID = &v
|
||||
@@ -230,10 +270,12 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
|
||||
v := accountID.Int64
|
||||
item.AccountID = &v
|
||||
}
|
||||
item.AccountName = accountName
|
||||
if groupID.Valid {
|
||||
v := groupID.Int64
|
||||
item.GroupID = &v
|
||||
}
|
||||
item.GroupName = groupName
|
||||
out = append(out, &item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
@@ -258,49 +300,64 @@ func (r *opsRepository) GetErrorLogByID(ctx context.Context, id int64) (*service
|
||||
|
||||
q := `
|
||||
SELECT
|
||||
id,
|
||||
created_at,
|
||||
error_phase,
|
||||
error_type,
|
||||
severity,
|
||||
COALESCE(upstream_status_code, status_code, 0),
|
||||
COALESCE(platform, ''),
|
||||
COALESCE(model, ''),
|
||||
duration_ms,
|
||||
COALESCE(client_request_id, ''),
|
||||
COALESCE(request_id, ''),
|
||||
COALESCE(error_message, ''),
|
||||
COALESCE(error_body, ''),
|
||||
upstream_status_code,
|
||||
COALESCE(upstream_error_message, ''),
|
||||
COALESCE(upstream_error_detail, ''),
|
||||
COALESCE(upstream_errors::text, ''),
|
||||
is_business_limited,
|
||||
user_id,
|
||||
api_key_id,
|
||||
account_id,
|
||||
group_id,
|
||||
CASE WHEN client_ip IS NULL THEN NULL ELSE client_ip::text END,
|
||||
COALESCE(request_path, ''),
|
||||
stream,
|
||||
COALESCE(user_agent, ''),
|
||||
auth_latency_ms,
|
||||
routing_latency_ms,
|
||||
upstream_latency_ms,
|
||||
response_latency_ms,
|
||||
time_to_first_token_ms,
|
||||
COALESCE(request_body::text, ''),
|
||||
request_body_truncated,
|
||||
request_body_bytes,
|
||||
COALESCE(request_headers::text, '')
|
||||
FROM ops_error_logs
|
||||
WHERE id = $1
|
||||
e.id,
|
||||
e.created_at,
|
||||
e.error_phase,
|
||||
e.error_type,
|
||||
COALESCE(e.error_owner, ''),
|
||||
COALESCE(e.error_source, ''),
|
||||
e.severity,
|
||||
COALESCE(e.upstream_status_code, e.status_code, 0),
|
||||
COALESCE(e.platform, ''),
|
||||
COALESCE(e.model, ''),
|
||||
COALESCE(e.is_retryable, false),
|
||||
COALESCE(e.retry_count, 0),
|
||||
COALESCE(e.resolved, false),
|
||||
e.resolved_at,
|
||||
e.resolved_by_user_id,
|
||||
e.resolved_retry_id,
|
||||
COALESCE(e.client_request_id, ''),
|
||||
COALESCE(e.request_id, ''),
|
||||
COALESCE(e.error_message, ''),
|
||||
COALESCE(e.error_body, ''),
|
||||
e.upstream_status_code,
|
||||
COALESCE(e.upstream_error_message, ''),
|
||||
COALESCE(e.upstream_error_detail, ''),
|
||||
COALESCE(e.upstream_errors::text, ''),
|
||||
e.is_business_limited,
|
||||
e.user_id,
|
||||
COALESCE(u.email, ''),
|
||||
e.api_key_id,
|
||||
e.account_id,
|
||||
COALESCE(a.name, ''),
|
||||
e.group_id,
|
||||
COALESCE(g.name, ''),
|
||||
CASE WHEN e.client_ip IS NULL THEN NULL ELSE e.client_ip::text END,
|
||||
COALESCE(e.request_path, ''),
|
||||
e.stream,
|
||||
COALESCE(e.user_agent, ''),
|
||||
e.auth_latency_ms,
|
||||
e.routing_latency_ms,
|
||||
e.upstream_latency_ms,
|
||||
e.response_latency_ms,
|
||||
e.time_to_first_token_ms,
|
||||
COALESCE(e.request_body::text, ''),
|
||||
e.request_body_truncated,
|
||||
e.request_body_bytes,
|
||||
COALESCE(e.request_headers::text, '')
|
||||
FROM ops_error_logs e
|
||||
LEFT JOIN users u ON e.user_id = u.id
|
||||
LEFT JOIN accounts a ON e.account_id = a.id
|
||||
LEFT JOIN groups g ON e.group_id = g.id
|
||||
WHERE e.id = $1
|
||||
LIMIT 1`
|
||||
|
||||
var out service.OpsErrorLogDetail
|
||||
var latency sql.NullInt64
|
||||
var statusCode sql.NullInt64
|
||||
var upstreamStatusCode sql.NullInt64
|
||||
var resolvedAt sql.NullTime
|
||||
var resolvedBy sql.NullInt64
|
||||
var resolvedRetryID sql.NullInt64
|
||||
var clientIP sql.NullString
|
||||
var userID sql.NullInt64
|
||||
var apiKeyID sql.NullInt64
|
||||
@@ -318,11 +375,18 @@ LIMIT 1`
|
||||
&out.CreatedAt,
|
||||
&out.Phase,
|
||||
&out.Type,
|
||||
&out.Owner,
|
||||
&out.Source,
|
||||
&out.Severity,
|
||||
&statusCode,
|
||||
&out.Platform,
|
||||
&out.Model,
|
||||
&latency,
|
||||
&out.IsRetryable,
|
||||
&out.RetryCount,
|
||||
&out.Resolved,
|
||||
&resolvedAt,
|
||||
&resolvedBy,
|
||||
&resolvedRetryID,
|
||||
&out.ClientRequestID,
|
||||
&out.RequestID,
|
||||
&out.Message,
|
||||
@@ -333,9 +397,12 @@ LIMIT 1`
|
||||
&out.UpstreamErrors,
|
||||
&out.IsBusinessLimited,
|
||||
&userID,
|
||||
&out.UserEmail,
|
||||
&apiKeyID,
|
||||
&accountID,
|
||||
&out.AccountName,
|
||||
&groupID,
|
||||
&out.GroupName,
|
||||
&clientIP,
|
||||
&out.RequestPath,
|
||||
&out.Stream,
|
||||
@@ -355,9 +422,17 @@ LIMIT 1`
|
||||
}
|
||||
|
||||
out.StatusCode = int(statusCode.Int64)
|
||||
if latency.Valid {
|
||||
v := int(latency.Int64)
|
||||
out.LatencyMs = &v
|
||||
if resolvedAt.Valid {
|
||||
t := resolvedAt.Time
|
||||
out.ResolvedAt = &t
|
||||
}
|
||||
if resolvedBy.Valid {
|
||||
v := resolvedBy.Int64
|
||||
out.ResolvedByUserID = &v
|
||||
}
|
||||
if resolvedRetryID.Valid {
|
||||
v := resolvedRetryID.Int64
|
||||
out.ResolvedRetryID = &v
|
||||
}
|
||||
if clientIP.Valid {
|
||||
s := clientIP.String
|
||||
@@ -487,9 +562,15 @@ SET
|
||||
status = $2,
|
||||
finished_at = $3,
|
||||
duration_ms = $4,
|
||||
result_request_id = $5,
|
||||
result_error_id = $6,
|
||||
error_message = $7
|
||||
success = $5,
|
||||
http_status_code = $6,
|
||||
upstream_request_id = $7,
|
||||
used_account_id = $8,
|
||||
response_preview = $9,
|
||||
response_truncated = $10,
|
||||
result_request_id = $11,
|
||||
result_error_id = $12,
|
||||
error_message = $13
|
||||
WHERE id = $1`
|
||||
|
||||
_, err := r.db.ExecContext(
|
||||
@@ -499,8 +580,14 @@ WHERE id = $1`
|
||||
strings.TrimSpace(input.Status),
|
||||
nullTime(input.FinishedAt),
|
||||
input.DurationMs,
|
||||
nullBool(input.Success),
|
||||
nullInt(input.HTTPStatusCode),
|
||||
opsNullString(input.UpstreamRequestID),
|
||||
nullInt64(input.UsedAccountID),
|
||||
opsNullString(input.ResponsePreview),
|
||||
nullBool(input.ResponseTruncated),
|
||||
opsNullString(input.ResultRequestID),
|
||||
opsNullInt64(input.ResultErrorID),
|
||||
nullInt64(input.ResultErrorID),
|
||||
opsNullString(input.ErrorMessage),
|
||||
)
|
||||
return err
|
||||
@@ -526,6 +613,12 @@ SELECT
|
||||
started_at,
|
||||
finished_at,
|
||||
duration_ms,
|
||||
success,
|
||||
http_status_code,
|
||||
upstream_request_id,
|
||||
used_account_id,
|
||||
response_preview,
|
||||
response_truncated,
|
||||
result_request_id,
|
||||
result_error_id,
|
||||
error_message
|
||||
@@ -540,6 +633,12 @@ LIMIT 1`
|
||||
var startedAt sql.NullTime
|
||||
var finishedAt sql.NullTime
|
||||
var durationMs sql.NullInt64
|
||||
var success sql.NullBool
|
||||
var httpStatusCode sql.NullInt64
|
||||
var upstreamRequestID sql.NullString
|
||||
var usedAccountID sql.NullInt64
|
||||
var responsePreview sql.NullString
|
||||
var responseTruncated sql.NullBool
|
||||
var resultRequestID sql.NullString
|
||||
var resultErrorID sql.NullInt64
|
||||
var errorMessage sql.NullString
|
||||
@@ -555,6 +654,12 @@ LIMIT 1`
|
||||
&startedAt,
|
||||
&finishedAt,
|
||||
&durationMs,
|
||||
&success,
|
||||
&httpStatusCode,
|
||||
&upstreamRequestID,
|
||||
&usedAccountID,
|
||||
&responsePreview,
|
||||
&responseTruncated,
|
||||
&resultRequestID,
|
||||
&resultErrorID,
|
||||
&errorMessage,
|
||||
@@ -579,6 +684,30 @@ LIMIT 1`
|
||||
v := durationMs.Int64
|
||||
out.DurationMs = &v
|
||||
}
|
||||
if success.Valid {
|
||||
v := success.Bool
|
||||
out.Success = &v
|
||||
}
|
||||
if httpStatusCode.Valid {
|
||||
v := int(httpStatusCode.Int64)
|
||||
out.HTTPStatusCode = &v
|
||||
}
|
||||
if upstreamRequestID.Valid {
|
||||
s := upstreamRequestID.String
|
||||
out.UpstreamRequestID = &s
|
||||
}
|
||||
if usedAccountID.Valid {
|
||||
v := usedAccountID.Int64
|
||||
out.UsedAccountID = &v
|
||||
}
|
||||
if responsePreview.Valid {
|
||||
s := responsePreview.String
|
||||
out.ResponsePreview = &s
|
||||
}
|
||||
if responseTruncated.Valid {
|
||||
v := responseTruncated.Bool
|
||||
out.ResponseTruncated = &v
|
||||
}
|
||||
if resultRequestID.Valid {
|
||||
s := resultRequestID.String
|
||||
out.ResultRequestID = &s
|
||||
@@ -602,30 +731,234 @@ func nullTime(t time.Time) sql.NullTime {
|
||||
return sql.NullTime{Time: t, Valid: true}
|
||||
}
|
||||
|
||||
func nullBool(v *bool) sql.NullBool {
|
||||
if v == nil {
|
||||
return sql.NullBool{}
|
||||
}
|
||||
return sql.NullBool{Bool: *v, Valid: true}
|
||||
}
|
||||
|
||||
func (r *opsRepository) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*service.OpsRetryAttempt, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return nil, fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if sourceErrorID <= 0 {
|
||||
return nil, fmt.Errorf("invalid source_error_id")
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = 50
|
||||
}
|
||||
if limit > 200 {
|
||||
limit = 200
|
||||
}
|
||||
|
||||
q := `
|
||||
SELECT
|
||||
r.id,
|
||||
r.created_at,
|
||||
COALESCE(r.requested_by_user_id, 0),
|
||||
r.source_error_id,
|
||||
COALESCE(r.mode, ''),
|
||||
r.pinned_account_id,
|
||||
COALESCE(pa.name, ''),
|
||||
COALESCE(r.status, ''),
|
||||
r.started_at,
|
||||
r.finished_at,
|
||||
r.duration_ms,
|
||||
r.success,
|
||||
r.http_status_code,
|
||||
r.upstream_request_id,
|
||||
r.used_account_id,
|
||||
COALESCE(ua.name, ''),
|
||||
r.response_preview,
|
||||
r.response_truncated,
|
||||
r.result_request_id,
|
||||
r.result_error_id,
|
||||
r.error_message
|
||||
FROM ops_retry_attempts r
|
||||
LEFT JOIN accounts pa ON r.pinned_account_id = pa.id
|
||||
LEFT JOIN accounts ua ON r.used_account_id = ua.id
|
||||
WHERE r.source_error_id = $1
|
||||
ORDER BY r.created_at DESC
|
||||
LIMIT $2`
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, q, sourceErrorID, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
out := make([]*service.OpsRetryAttempt, 0, 16)
|
||||
for rows.Next() {
|
||||
var item service.OpsRetryAttempt
|
||||
var pinnedAccountID sql.NullInt64
|
||||
var pinnedAccountName string
|
||||
var requestedBy sql.NullInt64
|
||||
var startedAt sql.NullTime
|
||||
var finishedAt sql.NullTime
|
||||
var durationMs sql.NullInt64
|
||||
var success sql.NullBool
|
||||
var httpStatusCode sql.NullInt64
|
||||
var upstreamRequestID sql.NullString
|
||||
var usedAccountID sql.NullInt64
|
||||
var usedAccountName string
|
||||
var responsePreview sql.NullString
|
||||
var responseTruncated sql.NullBool
|
||||
var resultRequestID sql.NullString
|
||||
var resultErrorID sql.NullInt64
|
||||
var errorMessage sql.NullString
|
||||
|
||||
if err := rows.Scan(
|
||||
&item.ID,
|
||||
&item.CreatedAt,
|
||||
&requestedBy,
|
||||
&item.SourceErrorID,
|
||||
&item.Mode,
|
||||
&pinnedAccountID,
|
||||
&pinnedAccountName,
|
||||
&item.Status,
|
||||
&startedAt,
|
||||
&finishedAt,
|
||||
&durationMs,
|
||||
&success,
|
||||
&httpStatusCode,
|
||||
&upstreamRequestID,
|
||||
&usedAccountID,
|
||||
&usedAccountName,
|
||||
&responsePreview,
|
||||
&responseTruncated,
|
||||
&resultRequestID,
|
||||
&resultErrorID,
|
||||
&errorMessage,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
item.RequestedByUserID = requestedBy.Int64
|
||||
if pinnedAccountID.Valid {
|
||||
v := pinnedAccountID.Int64
|
||||
item.PinnedAccountID = &v
|
||||
}
|
||||
item.PinnedAccountName = pinnedAccountName
|
||||
if startedAt.Valid {
|
||||
t := startedAt.Time
|
||||
item.StartedAt = &t
|
||||
}
|
||||
if finishedAt.Valid {
|
||||
t := finishedAt.Time
|
||||
item.FinishedAt = &t
|
||||
}
|
||||
if durationMs.Valid {
|
||||
v := durationMs.Int64
|
||||
item.DurationMs = &v
|
||||
}
|
||||
if success.Valid {
|
||||
v := success.Bool
|
||||
item.Success = &v
|
||||
}
|
||||
if httpStatusCode.Valid {
|
||||
v := int(httpStatusCode.Int64)
|
||||
item.HTTPStatusCode = &v
|
||||
}
|
||||
if upstreamRequestID.Valid {
|
||||
item.UpstreamRequestID = &upstreamRequestID.String
|
||||
}
|
||||
if usedAccountID.Valid {
|
||||
v := usedAccountID.Int64
|
||||
item.UsedAccountID = &v
|
||||
}
|
||||
item.UsedAccountName = usedAccountName
|
||||
if responsePreview.Valid {
|
||||
item.ResponsePreview = &responsePreview.String
|
||||
}
|
||||
if responseTruncated.Valid {
|
||||
v := responseTruncated.Bool
|
||||
item.ResponseTruncated = &v
|
||||
}
|
||||
if resultRequestID.Valid {
|
||||
item.ResultRequestID = &resultRequestID.String
|
||||
}
|
||||
if resultErrorID.Valid {
|
||||
v := resultErrorID.Int64
|
||||
item.ResultErrorID = &v
|
||||
}
|
||||
if errorMessage.Valid {
|
||||
item.ErrorMessage = &errorMessage.String
|
||||
}
|
||||
out = append(out, &item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (r *opsRepository) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error {
|
||||
if r == nil || r.db == nil {
|
||||
return fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if errorID <= 0 {
|
||||
return fmt.Errorf("invalid error id")
|
||||
}
|
||||
|
||||
q := `
|
||||
UPDATE ops_error_logs
|
||||
SET
|
||||
resolved = $2,
|
||||
resolved_at = $3,
|
||||
resolved_by_user_id = $4,
|
||||
resolved_retry_id = $5
|
||||
WHERE id = $1`
|
||||
|
||||
at := sql.NullTime{}
|
||||
if resolvedAt != nil && !resolvedAt.IsZero() {
|
||||
at = sql.NullTime{Time: resolvedAt.UTC(), Valid: true}
|
||||
} else if resolved {
|
||||
now := time.Now().UTC()
|
||||
at = sql.NullTime{Time: now, Valid: true}
|
||||
}
|
||||
|
||||
_, err := r.db.ExecContext(
|
||||
ctx,
|
||||
q,
|
||||
errorID,
|
||||
resolved,
|
||||
at,
|
||||
nullInt64(resolvedByUserID),
|
||||
nullInt64(resolvedRetryID),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) {
|
||||
clauses := make([]string, 0, 8)
|
||||
args := make([]any, 0, 8)
|
||||
clauses := make([]string, 0, 12)
|
||||
args := make([]any, 0, 12)
|
||||
clauses = append(clauses, "1=1")
|
||||
|
||||
phaseFilter := ""
|
||||
if filter != nil {
|
||||
phaseFilter = strings.TrimSpace(strings.ToLower(filter.Phase))
|
||||
}
|
||||
// ops_error_logs primarily stores client-visible error requests (status>=400),
|
||||
// ops_error_logs stores client-visible error requests (status>=400),
|
||||
// but we also persist "recovered" upstream errors (status<400) for upstream health visibility.
|
||||
// By default, keep list endpoints scoped to client errors unless explicitly filtering upstream phase.
|
||||
// If Resolved is not specified, do not filter by resolved state (backward-compatible).
|
||||
resolvedFilter := (*bool)(nil)
|
||||
if filter != nil {
|
||||
resolvedFilter = filter.Resolved
|
||||
}
|
||||
// Keep list endpoints scoped to client errors unless explicitly filtering upstream phase.
|
||||
if phaseFilter != "upstream" {
|
||||
clauses = append(clauses, "COALESCE(status_code, 0) >= 400")
|
||||
}
|
||||
|
||||
if filter.StartTime != nil && !filter.StartTime.IsZero() {
|
||||
args = append(args, filter.StartTime.UTC())
|
||||
clauses = append(clauses, "created_at >= $"+itoa(len(args)))
|
||||
clauses = append(clauses, "e.created_at >= $"+itoa(len(args)))
|
||||
}
|
||||
if filter.EndTime != nil && !filter.EndTime.IsZero() {
|
||||
args = append(args, filter.EndTime.UTC())
|
||||
// Keep time-window semantics consistent with other ops queries: [start, end)
|
||||
clauses = append(clauses, "created_at < $"+itoa(len(args)))
|
||||
clauses = append(clauses, "e.created_at < $"+itoa(len(args)))
|
||||
}
|
||||
if p := strings.TrimSpace(filter.Platform); p != "" {
|
||||
args = append(args, p)
|
||||
@@ -643,10 +976,59 @@ func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) {
|
||||
args = append(args, phase)
|
||||
clauses = append(clauses, "error_phase = $"+itoa(len(args)))
|
||||
}
|
||||
if filter != nil {
|
||||
if owner := strings.TrimSpace(strings.ToLower(filter.Owner)); owner != "" {
|
||||
args = append(args, owner)
|
||||
clauses = append(clauses, "LOWER(COALESCE(error_owner,'')) = $"+itoa(len(args)))
|
||||
}
|
||||
if source := strings.TrimSpace(strings.ToLower(filter.Source)); source != "" {
|
||||
args = append(args, source)
|
||||
clauses = append(clauses, "LOWER(COALESCE(error_source,'')) = $"+itoa(len(args)))
|
||||
}
|
||||
}
|
||||
if resolvedFilter != nil {
|
||||
args = append(args, *resolvedFilter)
|
||||
clauses = append(clauses, "COALESCE(resolved,false) = $"+itoa(len(args)))
|
||||
}
|
||||
|
||||
// View filter: errors vs excluded vs all.
|
||||
// Excluded = upstream 429/529 and business-limited (quota/concurrency/billing) errors.
|
||||
view := ""
|
||||
if filter != nil {
|
||||
view = strings.ToLower(strings.TrimSpace(filter.View))
|
||||
}
|
||||
switch view {
|
||||
case "", "errors":
|
||||
clauses = append(clauses, "COALESCE(is_business_limited,false) = false")
|
||||
clauses = append(clauses, "COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)")
|
||||
case "excluded":
|
||||
clauses = append(clauses, "(COALESCE(is_business_limited,false) = true OR COALESCE(upstream_status_code, status_code, 0) IN (429, 529))")
|
||||
case "all":
|
||||
// no-op
|
||||
default:
|
||||
// treat unknown as default 'errors'
|
||||
clauses = append(clauses, "COALESCE(is_business_limited,false) = false")
|
||||
clauses = append(clauses, "COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)")
|
||||
}
|
||||
if len(filter.StatusCodes) > 0 {
|
||||
args = append(args, pq.Array(filter.StatusCodes))
|
||||
clauses = append(clauses, "COALESCE(upstream_status_code, status_code, 0) = ANY($"+itoa(len(args))+")")
|
||||
} else if filter.StatusCodesOther {
|
||||
// "Other" means: status codes not in the common list.
|
||||
known := []int{400, 401, 403, 404, 409, 422, 429, 500, 502, 503, 504, 529}
|
||||
args = append(args, pq.Array(known))
|
||||
clauses = append(clauses, "NOT (COALESCE(upstream_status_code, status_code, 0) = ANY($"+itoa(len(args))+"))")
|
||||
}
|
||||
// Exact correlation keys (preferred for request↔upstream linkage).
|
||||
if rid := strings.TrimSpace(filter.RequestID); rid != "" {
|
||||
args = append(args, rid)
|
||||
clauses = append(clauses, "COALESCE(request_id,'') = $"+itoa(len(args)))
|
||||
}
|
||||
if crid := strings.TrimSpace(filter.ClientRequestID); crid != "" {
|
||||
args = append(args, crid)
|
||||
clauses = append(clauses, "COALESCE(client_request_id,'') = $"+itoa(len(args)))
|
||||
}
|
||||
|
||||
if q := strings.TrimSpace(filter.Query); q != "" {
|
||||
like := "%" + q + "%"
|
||||
args = append(args, like)
|
||||
@@ -654,6 +1036,13 @@ func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) {
|
||||
clauses = append(clauses, "(request_id ILIKE $"+n+" OR client_request_id ILIKE $"+n+" OR error_message ILIKE $"+n+")")
|
||||
}
|
||||
|
||||
if userQuery := strings.TrimSpace(filter.UserQuery); userQuery != "" {
|
||||
like := "%" + userQuery + "%"
|
||||
args = append(args, like)
|
||||
n := itoa(len(args))
|
||||
clauses = append(clauses, "u.email ILIKE $"+n)
|
||||
}
|
||||
|
||||
return "WHERE " + strings.Join(clauses, " AND "), args
|
||||
}
|
||||
|
||||
|
||||
@@ -354,7 +354,7 @@ SELECT
|
||||
created_at
|
||||
FROM ops_alert_events
|
||||
` + where + `
|
||||
ORDER BY fired_at DESC
|
||||
ORDER BY fired_at DESC, id DESC
|
||||
LIMIT ` + limitArg
|
||||
|
||||
rows, err := r.db.QueryContext(ctx, q, args...)
|
||||
@@ -413,6 +413,43 @@ LIMIT ` + limitArg
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (r *opsRepository) GetAlertEventByID(ctx context.Context, eventID int64) (*service.OpsAlertEvent, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return nil, fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if eventID <= 0 {
|
||||
return nil, fmt.Errorf("invalid event id")
|
||||
}
|
||||
|
||||
q := `
|
||||
SELECT
|
||||
id,
|
||||
COALESCE(rule_id, 0),
|
||||
COALESCE(severity, ''),
|
||||
COALESCE(status, ''),
|
||||
COALESCE(title, ''),
|
||||
COALESCE(description, ''),
|
||||
metric_value,
|
||||
threshold_value,
|
||||
dimensions,
|
||||
fired_at,
|
||||
resolved_at,
|
||||
email_sent,
|
||||
created_at
|
||||
FROM ops_alert_events
|
||||
WHERE id = $1`
|
||||
|
||||
row := r.db.QueryRowContext(ctx, q, eventID)
|
||||
ev, err := scanOpsAlertEvent(row)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return ev, nil
|
||||
}
|
||||
|
||||
func (r *opsRepository) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*service.OpsAlertEvent, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return nil, fmt.Errorf("nil ops repository")
|
||||
@@ -591,6 +628,121 @@ type opsAlertEventRow interface {
|
||||
Scan(dest ...any) error
|
||||
}
|
||||
|
||||
func (r *opsRepository) CreateAlertSilence(ctx context.Context, input *service.OpsAlertSilence) (*service.OpsAlertSilence, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return nil, fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if input == nil {
|
||||
return nil, fmt.Errorf("nil input")
|
||||
}
|
||||
if input.RuleID <= 0 {
|
||||
return nil, fmt.Errorf("invalid rule_id")
|
||||
}
|
||||
platform := strings.TrimSpace(input.Platform)
|
||||
if platform == "" {
|
||||
return nil, fmt.Errorf("invalid platform")
|
||||
}
|
||||
if input.Until.IsZero() {
|
||||
return nil, fmt.Errorf("invalid until")
|
||||
}
|
||||
|
||||
q := `
|
||||
INSERT INTO ops_alert_silences (
|
||||
rule_id,
|
||||
platform,
|
||||
group_id,
|
||||
region,
|
||||
until,
|
||||
reason,
|
||||
created_by,
|
||||
created_at
|
||||
) VALUES (
|
||||
$1,$2,$3,$4,$5,$6,$7,NOW()
|
||||
)
|
||||
RETURNING id, rule_id, platform, group_id, region, until, COALESCE(reason,''), created_by, created_at`
|
||||
|
||||
row := r.db.QueryRowContext(
|
||||
ctx,
|
||||
q,
|
||||
input.RuleID,
|
||||
platform,
|
||||
opsNullInt64(input.GroupID),
|
||||
opsNullString(input.Region),
|
||||
input.Until,
|
||||
opsNullString(input.Reason),
|
||||
opsNullInt64(input.CreatedBy),
|
||||
)
|
||||
|
||||
var out service.OpsAlertSilence
|
||||
var groupID sql.NullInt64
|
||||
var region sql.NullString
|
||||
var createdBy sql.NullInt64
|
||||
if err := row.Scan(
|
||||
&out.ID,
|
||||
&out.RuleID,
|
||||
&out.Platform,
|
||||
&groupID,
|
||||
®ion,
|
||||
&out.Until,
|
||||
&out.Reason,
|
||||
&createdBy,
|
||||
&out.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if groupID.Valid {
|
||||
v := groupID.Int64
|
||||
out.GroupID = &v
|
||||
}
|
||||
if region.Valid {
|
||||
v := strings.TrimSpace(region.String)
|
||||
if v != "" {
|
||||
out.Region = &v
|
||||
}
|
||||
}
|
||||
if createdBy.Valid {
|
||||
v := createdBy.Int64
|
||||
out.CreatedBy = &v
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
func (r *opsRepository) IsAlertSilenced(ctx context.Context, ruleID int64, platform string, groupID *int64, region *string, now time.Time) (bool, error) {
|
||||
if r == nil || r.db == nil {
|
||||
return false, fmt.Errorf("nil ops repository")
|
||||
}
|
||||
if ruleID <= 0 {
|
||||
return false, fmt.Errorf("invalid rule id")
|
||||
}
|
||||
platform = strings.TrimSpace(platform)
|
||||
if platform == "" {
|
||||
return false, nil
|
||||
}
|
||||
if now.IsZero() {
|
||||
now = time.Now().UTC()
|
||||
}
|
||||
|
||||
q := `
|
||||
SELECT 1
|
||||
FROM ops_alert_silences
|
||||
WHERE rule_id = $1
|
||||
AND platform = $2
|
||||
AND (group_id IS NOT DISTINCT FROM $3)
|
||||
AND (region IS NOT DISTINCT FROM $4)
|
||||
AND until > $5
|
||||
LIMIT 1`
|
||||
|
||||
var dummy int
|
||||
err := r.db.QueryRowContext(ctx, q, ruleID, platform, opsNullInt64(groupID), opsNullString(region), now).Scan(&dummy)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func scanOpsAlertEvent(row opsAlertEventRow) (*service.OpsAlertEvent, error) {
|
||||
var ev service.OpsAlertEvent
|
||||
var metricValue sql.NullFloat64
|
||||
@@ -652,6 +804,10 @@ func buildOpsAlertEventsWhere(filter *service.OpsAlertEventFilter) (string, []an
|
||||
args = append(args, severity)
|
||||
clauses = append(clauses, "severity = $"+itoa(len(args)))
|
||||
}
|
||||
if filter.EmailSent != nil {
|
||||
args = append(args, *filter.EmailSent)
|
||||
clauses = append(clauses, "email_sent = $"+itoa(len(args)))
|
||||
}
|
||||
if filter.StartTime != nil && !filter.StartTime.IsZero() {
|
||||
args = append(args, *filter.StartTime)
|
||||
clauses = append(clauses, "fired_at >= $"+itoa(len(args)))
|
||||
@@ -661,6 +817,14 @@ func buildOpsAlertEventsWhere(filter *service.OpsAlertEventFilter) (string, []an
|
||||
clauses = append(clauses, "fired_at < $"+itoa(len(args)))
|
||||
}
|
||||
|
||||
// Cursor pagination (descending by fired_at, then id)
|
||||
if filter.BeforeFiredAt != nil && !filter.BeforeFiredAt.IsZero() && filter.BeforeID != nil && *filter.BeforeID > 0 {
|
||||
args = append(args, *filter.BeforeFiredAt)
|
||||
tsArg := "$" + itoa(len(args))
|
||||
args = append(args, *filter.BeforeID)
|
||||
idArg := "$" + itoa(len(args))
|
||||
clauses = append(clauses, fmt.Sprintf("(fired_at < %s OR (fired_at = %s AND id < %s))", tsArg, tsArg, idArg))
|
||||
}
|
||||
// Dimensions are stored in JSONB. We filter best-effort without requiring GIN indexes.
|
||||
if platform := strings.TrimSpace(filter.Platform); platform != "" {
|
||||
args = append(args, platform)
|
||||
|
||||
74
backend/internal/repository/proxy_latency_cache.go
Normal file
74
backend/internal/repository/proxy_latency_cache.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
const proxyLatencyKeyPrefix = "proxy:latency:"
|
||||
|
||||
func proxyLatencyKey(proxyID int64) string {
|
||||
return fmt.Sprintf("%s%d", proxyLatencyKeyPrefix, proxyID)
|
||||
}
|
||||
|
||||
type proxyLatencyCache struct {
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
func NewProxyLatencyCache(rdb *redis.Client) service.ProxyLatencyCache {
|
||||
return &proxyLatencyCache{rdb: rdb}
|
||||
}
|
||||
|
||||
func (c *proxyLatencyCache) GetProxyLatencies(ctx context.Context, proxyIDs []int64) (map[int64]*service.ProxyLatencyInfo, error) {
|
||||
results := make(map[int64]*service.ProxyLatencyInfo)
|
||||
if len(proxyIDs) == 0 {
|
||||
return results, nil
|
||||
}
|
||||
|
||||
keys := make([]string, 0, len(proxyIDs))
|
||||
for _, id := range proxyIDs {
|
||||
keys = append(keys, proxyLatencyKey(id))
|
||||
}
|
||||
|
||||
values, err := c.rdb.MGet(ctx, keys...).Result()
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
for i, raw := range values {
|
||||
if raw == nil {
|
||||
continue
|
||||
}
|
||||
var payload []byte
|
||||
switch v := raw.(type) {
|
||||
case string:
|
||||
payload = []byte(v)
|
||||
case []byte:
|
||||
payload = v
|
||||
default:
|
||||
continue
|
||||
}
|
||||
var info service.ProxyLatencyInfo
|
||||
if err := json.Unmarshal(payload, &info); err != nil {
|
||||
continue
|
||||
}
|
||||
results[proxyIDs[i]] = &info
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (c *proxyLatencyCache) SetProxyLatency(ctx context.Context, proxyID int64, info *service.ProxyLatencyInfo) error {
|
||||
if info == nil {
|
||||
return nil
|
||||
}
|
||||
payload, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.rdb.Set(ctx, proxyLatencyKey(proxyID), payload, 0).Err()
|
||||
}
|
||||
@@ -34,7 +34,10 @@ func NewProxyExitInfoProber(cfg *config.Config) service.ProxyExitInfoProber {
|
||||
}
|
||||
}
|
||||
|
||||
const defaultIPInfoURL = "https://ipinfo.io/json"
|
||||
const (
|
||||
defaultIPInfoURL = "https://ipinfo.io/json"
|
||||
defaultProxyProbeTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
type proxyProbeService struct {
|
||||
ipInfoURL string
|
||||
@@ -46,7 +49,7 @@ type proxyProbeService struct {
|
||||
func (s *proxyProbeService) ProbeProxy(ctx context.Context, proxyURL string) (*service.ProxyExitInfo, int64, error) {
|
||||
client, err := httpclient.GetClient(httpclient.Options{
|
||||
ProxyURL: proxyURL,
|
||||
Timeout: 15 * time.Second,
|
||||
Timeout: defaultProxyProbeTimeout,
|
||||
InsecureSkipVerify: s.insecureSkipVerify,
|
||||
ProxyStrict: true,
|
||||
ValidateResolvedIP: s.validateResolvedIP,
|
||||
|
||||
@@ -219,12 +219,54 @@ func (r *proxyRepository) ExistsByHostPortAuth(ctx context.Context, host string,
|
||||
// CountAccountsByProxyID returns the number of accounts using a specific proxy
|
||||
func (r *proxyRepository) CountAccountsByProxyID(ctx context.Context, proxyID int64) (int64, error) {
|
||||
var count int64
|
||||
if err := scanSingleRow(ctx, r.sql, "SELECT COUNT(*) FROM accounts WHERE proxy_id = $1", []any{proxyID}, &count); err != nil {
|
||||
if err := scanSingleRow(ctx, r.sql, "SELECT COUNT(*) FROM accounts WHERE proxy_id = $1 AND deleted_at IS NULL", []any{proxyID}, &count); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func (r *proxyRepository) ListAccountSummariesByProxyID(ctx context.Context, proxyID int64) ([]service.ProxyAccountSummary, error) {
|
||||
rows, err := r.sql.QueryContext(ctx, `
|
||||
SELECT id, name, platform, type, notes
|
||||
FROM accounts
|
||||
WHERE proxy_id = $1 AND deleted_at IS NULL
|
||||
ORDER BY id DESC
|
||||
`, proxyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
out := make([]service.ProxyAccountSummary, 0)
|
||||
for rows.Next() {
|
||||
var (
|
||||
id int64
|
||||
name string
|
||||
platform string
|
||||
accType string
|
||||
notes sql.NullString
|
||||
)
|
||||
if err := rows.Scan(&id, &name, &platform, &accType, ¬es); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var notesPtr *string
|
||||
if notes.Valid {
|
||||
notesPtr = ¬es.String
|
||||
}
|
||||
out = append(out, service.ProxyAccountSummary{
|
||||
ID: id,
|
||||
Name: name,
|
||||
Platform: platform,
|
||||
Type: accType,
|
||||
Notes: notesPtr,
|
||||
})
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// GetAccountCountsForProxies returns a map of proxy ID to account count for all proxies
|
||||
func (r *proxyRepository) GetAccountCountsForProxies(ctx context.Context) (counts map[int64]int64, err error) {
|
||||
rows, err := r.sql.QueryContext(ctx, "SELECT proxy_id, COUNT(*) AS count FROM accounts WHERE proxy_id IS NOT NULL AND deleted_at IS NULL GROUP BY proxy_id")
|
||||
|
||||
@@ -27,7 +27,7 @@ func TestSchedulerSnapshotOutboxReplay(t *testing.T) {
|
||||
RunMode: config.RunModeStandard,
|
||||
Gateway: config.GatewayConfig{
|
||||
Scheduling: config.GatewaySchedulingConfig{
|
||||
OutboxPollIntervalSeconds: 1,
|
||||
OutboxPollIntervalSeconds: 1,
|
||||
FullRebuildIntervalSeconds: 0,
|
||||
DbFallbackEnabled: true,
|
||||
},
|
||||
|
||||
@@ -22,7 +22,7 @@ import (
|
||||
"github.com/lib/pq"
|
||||
)
|
||||
|
||||
const usageLogSelectColumns = "id, user_id, api_key_id, account_id, request_id, model, group_id, subscription_id, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, cache_creation_5m_tokens, cache_creation_1h_tokens, input_cost, output_cost, cache_creation_cost, cache_read_cost, total_cost, actual_cost, rate_multiplier, billing_type, stream, duration_ms, first_token_ms, user_agent, ip_address, image_count, image_size, created_at"
|
||||
const usageLogSelectColumns = "id, user_id, api_key_id, account_id, request_id, model, group_id, subscription_id, input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens, cache_creation_5m_tokens, cache_creation_1h_tokens, input_cost, output_cost, cache_creation_cost, cache_read_cost, total_cost, actual_cost, rate_multiplier, account_rate_multiplier, billing_type, stream, duration_ms, first_token_ms, user_agent, ip_address, image_count, image_size, created_at"
|
||||
|
||||
type usageLogRepository struct {
|
||||
client *dbent.Client
|
||||
@@ -105,6 +105,7 @@ func (r *usageLogRepository) Create(ctx context.Context, log *service.UsageLog)
|
||||
total_cost,
|
||||
actual_cost,
|
||||
rate_multiplier,
|
||||
account_rate_multiplier,
|
||||
billing_type,
|
||||
stream,
|
||||
duration_ms,
|
||||
@@ -120,7 +121,7 @@ func (r *usageLogRepository) Create(ctx context.Context, log *service.UsageLog)
|
||||
$8, $9, $10, $11,
|
||||
$12, $13,
|
||||
$14, $15, $16, $17, $18, $19,
|
||||
$20, $21, $22, $23, $24, $25, $26, $27, $28, $29
|
||||
$20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30
|
||||
)
|
||||
ON CONFLICT (request_id, api_key_id) DO NOTHING
|
||||
RETURNING id, created_at
|
||||
@@ -160,6 +161,7 @@ func (r *usageLogRepository) Create(ctx context.Context, log *service.UsageLog)
|
||||
log.TotalCost,
|
||||
log.ActualCost,
|
||||
rateMultiplier,
|
||||
log.AccountRateMultiplier,
|
||||
log.BillingType,
|
||||
log.Stream,
|
||||
duration,
|
||||
@@ -270,13 +272,13 @@ type DashboardStats = usagestats.DashboardStats
|
||||
|
||||
func (r *usageLogRepository) GetDashboardStats(ctx context.Context) (*DashboardStats, error) {
|
||||
stats := &DashboardStats{}
|
||||
now := time.Now().UTC()
|
||||
todayUTC := truncateToDayUTC(now)
|
||||
now := timezone.Now()
|
||||
todayStart := timezone.Today()
|
||||
|
||||
if err := r.fillDashboardEntityStats(ctx, stats, todayUTC, now); err != nil {
|
||||
if err := r.fillDashboardEntityStats(ctx, stats, todayStart, now); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := r.fillDashboardUsageStatsAggregated(ctx, stats, todayUTC, now); err != nil {
|
||||
if err := r.fillDashboardUsageStatsAggregated(ctx, stats, todayStart, now); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -298,13 +300,13 @@ func (r *usageLogRepository) GetDashboardStatsWithRange(ctx context.Context, sta
|
||||
}
|
||||
|
||||
stats := &DashboardStats{}
|
||||
now := time.Now().UTC()
|
||||
todayUTC := truncateToDayUTC(now)
|
||||
now := timezone.Now()
|
||||
todayStart := timezone.Today()
|
||||
|
||||
if err := r.fillDashboardEntityStats(ctx, stats, todayUTC, now); err != nil {
|
||||
if err := r.fillDashboardEntityStats(ctx, stats, todayStart, now); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := r.fillDashboardUsageStatsFromUsageLogs(ctx, stats, startUTC, endUTC, todayUTC, now); err != nil {
|
||||
if err := r.fillDashboardUsageStatsFromUsageLogs(ctx, stats, startUTC, endUTC, todayStart, now); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -455,7 +457,7 @@ func (r *usageLogRepository) fillDashboardUsageStatsAggregated(ctx context.Conte
|
||||
FROM usage_dashboard_hourly
|
||||
WHERE bucket_start = $1
|
||||
`
|
||||
hourStart := now.UTC().Truncate(time.Hour)
|
||||
hourStart := now.In(timezone.Location()).Truncate(time.Hour)
|
||||
if err := scanSingleRow(ctx, r.sql, hourlyActiveQuery, []any{hourStart}, &stats.HourlyActiveUsers); err != nil {
|
||||
if err != sql.ErrNoRows {
|
||||
return err
|
||||
@@ -835,7 +837,9 @@ func (r *usageLogRepository) GetAccountTodayStats(ctx context.Context, accountID
|
||||
SELECT
|
||||
COUNT(*) as requests,
|
||||
COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) as tokens,
|
||||
COALESCE(SUM(actual_cost), 0) as cost
|
||||
COALESCE(SUM(total_cost * COALESCE(account_rate_multiplier, 1)), 0) as cost,
|
||||
COALESCE(SUM(total_cost), 0) as standard_cost,
|
||||
COALESCE(SUM(actual_cost), 0) as user_cost
|
||||
FROM usage_logs
|
||||
WHERE account_id = $1 AND created_at >= $2
|
||||
`
|
||||
@@ -849,6 +853,8 @@ func (r *usageLogRepository) GetAccountTodayStats(ctx context.Context, accountID
|
||||
&stats.Requests,
|
||||
&stats.Tokens,
|
||||
&stats.Cost,
|
||||
&stats.StandardCost,
|
||||
&stats.UserCost,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -861,7 +867,9 @@ func (r *usageLogRepository) GetAccountWindowStats(ctx context.Context, accountI
|
||||
SELECT
|
||||
COUNT(*) as requests,
|
||||
COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) as tokens,
|
||||
COALESCE(SUM(actual_cost), 0) as cost
|
||||
COALESCE(SUM(total_cost * COALESCE(account_rate_multiplier, 1)), 0) as cost,
|
||||
COALESCE(SUM(total_cost), 0) as standard_cost,
|
||||
COALESCE(SUM(actual_cost), 0) as user_cost
|
||||
FROM usage_logs
|
||||
WHERE account_id = $1 AND created_at >= $2
|
||||
`
|
||||
@@ -875,6 +883,8 @@ func (r *usageLogRepository) GetAccountWindowStats(ctx context.Context, accountI
|
||||
&stats.Requests,
|
||||
&stats.Tokens,
|
||||
&stats.Cost,
|
||||
&stats.StandardCost,
|
||||
&stats.UserCost,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1400,8 +1410,8 @@ func (r *usageLogRepository) GetBatchAPIKeyUsageStats(ctx context.Context, apiKe
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetUsageTrendWithFilters returns usage trend data with optional user/api_key filters
|
||||
func (r *usageLogRepository) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID int64) (results []TrendDataPoint, err error) {
|
||||
// GetUsageTrendWithFilters returns usage trend data with optional filters
|
||||
func (r *usageLogRepository) GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID, accountID, groupID int64, model string, stream *bool) (results []TrendDataPoint, err error) {
|
||||
dateFormat := "YYYY-MM-DD"
|
||||
if granularity == "hour" {
|
||||
dateFormat = "YYYY-MM-DD HH24:00"
|
||||
@@ -1430,6 +1440,22 @@ func (r *usageLogRepository) GetUsageTrendWithFilters(ctx context.Context, start
|
||||
query += fmt.Sprintf(" AND api_key_id = $%d", len(args)+1)
|
||||
args = append(args, apiKeyID)
|
||||
}
|
||||
if accountID > 0 {
|
||||
query += fmt.Sprintf(" AND account_id = $%d", len(args)+1)
|
||||
args = append(args, accountID)
|
||||
}
|
||||
if groupID > 0 {
|
||||
query += fmt.Sprintf(" AND group_id = $%d", len(args)+1)
|
||||
args = append(args, groupID)
|
||||
}
|
||||
if model != "" {
|
||||
query += fmt.Sprintf(" AND model = $%d", len(args)+1)
|
||||
args = append(args, model)
|
||||
}
|
||||
if stream != nil {
|
||||
query += fmt.Sprintf(" AND stream = $%d", len(args)+1)
|
||||
args = append(args, *stream)
|
||||
}
|
||||
query += " GROUP BY date ORDER BY date ASC"
|
||||
|
||||
rows, err := r.sql.QueryContext(ctx, query, args...)
|
||||
@@ -1452,9 +1478,15 @@ func (r *usageLogRepository) GetUsageTrendWithFilters(ctx context.Context, start
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// GetModelStatsWithFilters returns model statistics with optional user/api_key filters
|
||||
func (r *usageLogRepository) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID int64) (results []ModelStat, err error) {
|
||||
query := `
|
||||
// GetModelStatsWithFilters returns model statistics with optional filters
|
||||
func (r *usageLogRepository) GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID, groupID int64, stream *bool) (results []ModelStat, err error) {
|
||||
actualCostExpr := "COALESCE(SUM(actual_cost), 0) as actual_cost"
|
||||
// 当仅按 account_id 聚合时,实际费用使用账号倍率(total_cost * account_rate_multiplier)。
|
||||
if accountID > 0 && userID == 0 && apiKeyID == 0 {
|
||||
actualCostExpr = "COALESCE(SUM(total_cost * COALESCE(account_rate_multiplier, 1)), 0) as actual_cost"
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
model,
|
||||
COUNT(*) as requests,
|
||||
@@ -1462,10 +1494,10 @@ func (r *usageLogRepository) GetModelStatsWithFilters(ctx context.Context, start
|
||||
COALESCE(SUM(output_tokens), 0) as output_tokens,
|
||||
COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) as total_tokens,
|
||||
COALESCE(SUM(total_cost), 0) as cost,
|
||||
COALESCE(SUM(actual_cost), 0) as actual_cost
|
||||
%s
|
||||
FROM usage_logs
|
||||
WHERE created_at >= $1 AND created_at < $2
|
||||
`
|
||||
`, actualCostExpr)
|
||||
|
||||
args := []any{startTime, endTime}
|
||||
if userID > 0 {
|
||||
@@ -1480,6 +1512,14 @@ func (r *usageLogRepository) GetModelStatsWithFilters(ctx context.Context, start
|
||||
query += fmt.Sprintf(" AND account_id = $%d", len(args)+1)
|
||||
args = append(args, accountID)
|
||||
}
|
||||
if groupID > 0 {
|
||||
query += fmt.Sprintf(" AND group_id = $%d", len(args)+1)
|
||||
args = append(args, groupID)
|
||||
}
|
||||
if stream != nil {
|
||||
query += fmt.Sprintf(" AND stream = $%d", len(args)+1)
|
||||
args = append(args, *stream)
|
||||
}
|
||||
query += " GROUP BY model ORDER BY total_tokens DESC"
|
||||
|
||||
rows, err := r.sql.QueryContext(ctx, query, args...)
|
||||
@@ -1587,12 +1627,14 @@ func (r *usageLogRepository) GetStatsWithFilters(ctx context.Context, filters Us
|
||||
COALESCE(SUM(cache_creation_tokens + cache_read_tokens), 0) as total_cache_tokens,
|
||||
COALESCE(SUM(total_cost), 0) as total_cost,
|
||||
COALESCE(SUM(actual_cost), 0) as total_actual_cost,
|
||||
COALESCE(SUM(total_cost * COALESCE(account_rate_multiplier, 1)), 0) as total_account_cost,
|
||||
COALESCE(AVG(duration_ms), 0) as avg_duration_ms
|
||||
FROM usage_logs
|
||||
%s
|
||||
`, buildWhere(conditions))
|
||||
|
||||
stats := &UsageStats{}
|
||||
var totalAccountCost float64
|
||||
if err := scanSingleRow(
|
||||
ctx,
|
||||
r.sql,
|
||||
@@ -1604,10 +1646,14 @@ func (r *usageLogRepository) GetStatsWithFilters(ctx context.Context, filters Us
|
||||
&stats.TotalCacheTokens,
|
||||
&stats.TotalCost,
|
||||
&stats.TotalActualCost,
|
||||
&totalAccountCost,
|
||||
&stats.AverageDurationMs,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if filters.AccountID > 0 {
|
||||
stats.TotalAccountCost = &totalAccountCost
|
||||
}
|
||||
stats.TotalTokens = stats.TotalInputTokens + stats.TotalOutputTokens + stats.TotalCacheTokens
|
||||
return stats, nil
|
||||
}
|
||||
@@ -1634,7 +1680,8 @@ func (r *usageLogRepository) GetAccountUsageStats(ctx context.Context, accountID
|
||||
COUNT(*) as requests,
|
||||
COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) as tokens,
|
||||
COALESCE(SUM(total_cost), 0) as cost,
|
||||
COALESCE(SUM(actual_cost), 0) as actual_cost
|
||||
COALESCE(SUM(total_cost * COALESCE(account_rate_multiplier, 1)), 0) as actual_cost,
|
||||
COALESCE(SUM(actual_cost), 0) as user_cost
|
||||
FROM usage_logs
|
||||
WHERE account_id = $1 AND created_at >= $2 AND created_at < $3
|
||||
GROUP BY date
|
||||
@@ -1661,7 +1708,8 @@ func (r *usageLogRepository) GetAccountUsageStats(ctx context.Context, accountID
|
||||
var tokens int64
|
||||
var cost float64
|
||||
var actualCost float64
|
||||
if err = rows.Scan(&date, &requests, &tokens, &cost, &actualCost); err != nil {
|
||||
var userCost float64
|
||||
if err = rows.Scan(&date, &requests, &tokens, &cost, &actualCost, &userCost); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t, _ := time.Parse("2006-01-02", date)
|
||||
@@ -1672,19 +1720,21 @@ func (r *usageLogRepository) GetAccountUsageStats(ctx context.Context, accountID
|
||||
Tokens: tokens,
|
||||
Cost: cost,
|
||||
ActualCost: actualCost,
|
||||
UserCost: userCost,
|
||||
})
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var totalActualCost, totalStandardCost float64
|
||||
var totalAccountCost, totalUserCost, totalStandardCost float64
|
||||
var totalRequests, totalTokens int64
|
||||
var highestCostDay, highestRequestDay *AccountUsageHistory
|
||||
|
||||
for i := range history {
|
||||
h := &history[i]
|
||||
totalActualCost += h.ActualCost
|
||||
totalAccountCost += h.ActualCost
|
||||
totalUserCost += h.UserCost
|
||||
totalStandardCost += h.Cost
|
||||
totalRequests += h.Requests
|
||||
totalTokens += h.Tokens
|
||||
@@ -1711,11 +1761,13 @@ func (r *usageLogRepository) GetAccountUsageStats(ctx context.Context, accountID
|
||||
summary := AccountUsageSummary{
|
||||
Days: daysCount,
|
||||
ActualDaysUsed: actualDaysUsed,
|
||||
TotalCost: totalActualCost,
|
||||
TotalCost: totalAccountCost,
|
||||
TotalUserCost: totalUserCost,
|
||||
TotalStandardCost: totalStandardCost,
|
||||
TotalRequests: totalRequests,
|
||||
TotalTokens: totalTokens,
|
||||
AvgDailyCost: totalActualCost / float64(actualDaysUsed),
|
||||
AvgDailyCost: totalAccountCost / float64(actualDaysUsed),
|
||||
AvgDailyUserCost: totalUserCost / float64(actualDaysUsed),
|
||||
AvgDailyRequests: float64(totalRequests) / float64(actualDaysUsed),
|
||||
AvgDailyTokens: float64(totalTokens) / float64(actualDaysUsed),
|
||||
AvgDurationMs: avgDuration,
|
||||
@@ -1727,11 +1779,13 @@ func (r *usageLogRepository) GetAccountUsageStats(ctx context.Context, accountID
|
||||
summary.Today = &struct {
|
||||
Date string `json:"date"`
|
||||
Cost float64 `json:"cost"`
|
||||
UserCost float64 `json:"user_cost"`
|
||||
Requests int64 `json:"requests"`
|
||||
Tokens int64 `json:"tokens"`
|
||||
}{
|
||||
Date: history[i].Date,
|
||||
Cost: history[i].ActualCost,
|
||||
UserCost: history[i].UserCost,
|
||||
Requests: history[i].Requests,
|
||||
Tokens: history[i].Tokens,
|
||||
}
|
||||
@@ -1744,11 +1798,13 @@ func (r *usageLogRepository) GetAccountUsageStats(ctx context.Context, accountID
|
||||
Date string `json:"date"`
|
||||
Label string `json:"label"`
|
||||
Cost float64 `json:"cost"`
|
||||
UserCost float64 `json:"user_cost"`
|
||||
Requests int64 `json:"requests"`
|
||||
}{
|
||||
Date: highestCostDay.Date,
|
||||
Label: highestCostDay.Label,
|
||||
Cost: highestCostDay.ActualCost,
|
||||
UserCost: highestCostDay.UserCost,
|
||||
Requests: highestCostDay.Requests,
|
||||
}
|
||||
}
|
||||
@@ -1759,15 +1815,17 @@ func (r *usageLogRepository) GetAccountUsageStats(ctx context.Context, accountID
|
||||
Label string `json:"label"`
|
||||
Requests int64 `json:"requests"`
|
||||
Cost float64 `json:"cost"`
|
||||
UserCost float64 `json:"user_cost"`
|
||||
}{
|
||||
Date: highestRequestDay.Date,
|
||||
Label: highestRequestDay.Label,
|
||||
Requests: highestRequestDay.Requests,
|
||||
Cost: highestRequestDay.ActualCost,
|
||||
UserCost: highestRequestDay.UserCost,
|
||||
}
|
||||
}
|
||||
|
||||
models, err := r.GetModelStatsWithFilters(ctx, startTime, endTime, 0, 0, accountID)
|
||||
models, err := r.GetModelStatsWithFilters(ctx, startTime, endTime, 0, 0, accountID, 0, nil)
|
||||
if err != nil {
|
||||
models = []ModelStat{}
|
||||
}
|
||||
@@ -1994,36 +2052,37 @@ func (r *usageLogRepository) loadSubscriptions(ctx context.Context, ids []int64)
|
||||
|
||||
func scanUsageLog(scanner interface{ Scan(...any) error }) (*service.UsageLog, error) {
|
||||
var (
|
||||
id int64
|
||||
userID int64
|
||||
apiKeyID int64
|
||||
accountID int64
|
||||
requestID sql.NullString
|
||||
model string
|
||||
groupID sql.NullInt64
|
||||
subscriptionID sql.NullInt64
|
||||
inputTokens int
|
||||
outputTokens int
|
||||
cacheCreationTokens int
|
||||
cacheReadTokens int
|
||||
cacheCreation5m int
|
||||
cacheCreation1h int
|
||||
inputCost float64
|
||||
outputCost float64
|
||||
cacheCreationCost float64
|
||||
cacheReadCost float64
|
||||
totalCost float64
|
||||
actualCost float64
|
||||
rateMultiplier float64
|
||||
billingType int16
|
||||
stream bool
|
||||
durationMs sql.NullInt64
|
||||
firstTokenMs sql.NullInt64
|
||||
userAgent sql.NullString
|
||||
ipAddress sql.NullString
|
||||
imageCount int
|
||||
imageSize sql.NullString
|
||||
createdAt time.Time
|
||||
id int64
|
||||
userID int64
|
||||
apiKeyID int64
|
||||
accountID int64
|
||||
requestID sql.NullString
|
||||
model string
|
||||
groupID sql.NullInt64
|
||||
subscriptionID sql.NullInt64
|
||||
inputTokens int
|
||||
outputTokens int
|
||||
cacheCreationTokens int
|
||||
cacheReadTokens int
|
||||
cacheCreation5m int
|
||||
cacheCreation1h int
|
||||
inputCost float64
|
||||
outputCost float64
|
||||
cacheCreationCost float64
|
||||
cacheReadCost float64
|
||||
totalCost float64
|
||||
actualCost float64
|
||||
rateMultiplier float64
|
||||
accountRateMultiplier sql.NullFloat64
|
||||
billingType int16
|
||||
stream bool
|
||||
durationMs sql.NullInt64
|
||||
firstTokenMs sql.NullInt64
|
||||
userAgent sql.NullString
|
||||
ipAddress sql.NullString
|
||||
imageCount int
|
||||
imageSize sql.NullString
|
||||
createdAt time.Time
|
||||
)
|
||||
|
||||
if err := scanner.Scan(
|
||||
@@ -2048,6 +2107,7 @@ func scanUsageLog(scanner interface{ Scan(...any) error }) (*service.UsageLog, e
|
||||
&totalCost,
|
||||
&actualCost,
|
||||
&rateMultiplier,
|
||||
&accountRateMultiplier,
|
||||
&billingType,
|
||||
&stream,
|
||||
&durationMs,
|
||||
@@ -2080,6 +2140,7 @@ func scanUsageLog(scanner interface{ Scan(...any) error }) (*service.UsageLog, e
|
||||
TotalCost: totalCost,
|
||||
ActualCost: actualCost,
|
||||
RateMultiplier: rateMultiplier,
|
||||
AccountRateMultiplier: nullFloat64Ptr(accountRateMultiplier),
|
||||
BillingType: int8(billingType),
|
||||
Stream: stream,
|
||||
ImageCount: imageCount,
|
||||
@@ -2186,6 +2247,14 @@ func nullInt(v *int) sql.NullInt64 {
|
||||
return sql.NullInt64{Int64: int64(*v), Valid: true}
|
||||
}
|
||||
|
||||
func nullFloat64Ptr(v sql.NullFloat64) *float64 {
|
||||
if !v.Valid {
|
||||
return nil
|
||||
}
|
||||
out := v.Float64
|
||||
return &out
|
||||
}
|
||||
|
||||
func nullString(v *string) sql.NullString {
|
||||
if v == nil || *v == "" {
|
||||
return sql.NullString{}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
dbent "github.com/Wei-Shaw/sub2api/ent"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/timezone"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
"github.com/stretchr/testify/suite"
|
||||
@@ -36,6 +37,12 @@ func TestUsageLogRepoSuite(t *testing.T) {
|
||||
suite.Run(t, new(UsageLogRepoSuite))
|
||||
}
|
||||
|
||||
// truncateToDayUTC 截断到 UTC 日期边界(测试辅助函数)
|
||||
func truncateToDayUTC(t time.Time) time.Time {
|
||||
t = t.UTC()
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
|
||||
}
|
||||
|
||||
func (s *UsageLogRepoSuite) createUsageLog(user *service.User, apiKey *service.APIKey, account *service.Account, inputTokens, outputTokens int, cost float64, createdAt time.Time) *service.UsageLog {
|
||||
log := &service.UsageLog{
|
||||
UserID: user.ID,
|
||||
@@ -95,6 +102,34 @@ func (s *UsageLogRepoSuite) TestGetByID_NotFound() {
|
||||
s.Require().Error(err, "expected error for non-existent ID")
|
||||
}
|
||||
|
||||
func (s *UsageLogRepoSuite) TestGetByID_ReturnsAccountRateMultiplier() {
|
||||
user := mustCreateUser(s.T(), s.client, &service.User{Email: "getbyid-mult@test.com"})
|
||||
apiKey := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user.ID, Key: "sk-getbyid-mult", Name: "k"})
|
||||
account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-getbyid-mult"})
|
||||
|
||||
m := 0.5
|
||||
log := &service.UsageLog{
|
||||
UserID: user.ID,
|
||||
APIKeyID: apiKey.ID,
|
||||
AccountID: account.ID,
|
||||
RequestID: uuid.New().String(),
|
||||
Model: "claude-3",
|
||||
InputTokens: 10,
|
||||
OutputTokens: 20,
|
||||
TotalCost: 1.0,
|
||||
ActualCost: 2.0,
|
||||
AccountRateMultiplier: &m,
|
||||
CreatedAt: timezone.Today().Add(2 * time.Hour),
|
||||
}
|
||||
_, err := s.repo.Create(s.ctx, log)
|
||||
s.Require().NoError(err)
|
||||
|
||||
got, err := s.repo.GetByID(s.ctx, log.ID)
|
||||
s.Require().NoError(err)
|
||||
s.Require().NotNil(got.AccountRateMultiplier)
|
||||
s.Require().InEpsilon(0.5, *got.AccountRateMultiplier, 0.0001)
|
||||
}
|
||||
|
||||
// --- Delete ---
|
||||
|
||||
func (s *UsageLogRepoSuite) TestDelete() {
|
||||
@@ -403,12 +438,49 @@ func (s *UsageLogRepoSuite) TestGetAccountTodayStats() {
|
||||
apiKey := mustCreateApiKey(s.T(), s.client, &service.APIKey{UserID: user.ID, Key: "sk-acctoday", Name: "k"})
|
||||
account := mustCreateAccount(s.T(), s.client, &service.Account{Name: "acc-today"})
|
||||
|
||||
s.createUsageLog(user, apiKey, account, 10, 20, 0.5, time.Now())
|
||||
createdAt := timezone.Today().Add(1 * time.Hour)
|
||||
|
||||
m1 := 1.5
|
||||
m2 := 0.0
|
||||
_, err := s.repo.Create(s.ctx, &service.UsageLog{
|
||||
UserID: user.ID,
|
||||
APIKeyID: apiKey.ID,
|
||||
AccountID: account.ID,
|
||||
RequestID: uuid.New().String(),
|
||||
Model: "claude-3",
|
||||
InputTokens: 10,
|
||||
OutputTokens: 20,
|
||||
TotalCost: 1.0,
|
||||
ActualCost: 2.0,
|
||||
AccountRateMultiplier: &m1,
|
||||
CreatedAt: createdAt,
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
_, err = s.repo.Create(s.ctx, &service.UsageLog{
|
||||
UserID: user.ID,
|
||||
APIKeyID: apiKey.ID,
|
||||
AccountID: account.ID,
|
||||
RequestID: uuid.New().String(),
|
||||
Model: "claude-3",
|
||||
InputTokens: 5,
|
||||
OutputTokens: 5,
|
||||
TotalCost: 0.5,
|
||||
ActualCost: 1.0,
|
||||
AccountRateMultiplier: &m2,
|
||||
CreatedAt: createdAt,
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
|
||||
stats, err := s.repo.GetAccountTodayStats(s.ctx, account.ID)
|
||||
s.Require().NoError(err, "GetAccountTodayStats")
|
||||
s.Require().Equal(int64(1), stats.Requests)
|
||||
s.Require().Equal(int64(30), stats.Tokens)
|
||||
s.Require().Equal(int64(2), stats.Requests)
|
||||
s.Require().Equal(int64(40), stats.Tokens)
|
||||
// account cost = SUM(total_cost * account_rate_multiplier)
|
||||
s.Require().InEpsilon(1.5, stats.Cost, 0.0001)
|
||||
// standard cost = SUM(total_cost)
|
||||
s.Require().InEpsilon(1.5, stats.StandardCost, 0.0001)
|
||||
// user cost = SUM(actual_cost)
|
||||
s.Require().InEpsilon(3.0, stats.UserCost, 0.0001)
|
||||
}
|
||||
|
||||
func (s *UsageLogRepoSuite) TestDashboardAggregationConsistency() {
|
||||
@@ -416,8 +488,8 @@ func (s *UsageLogRepoSuite) TestDashboardAggregationConsistency() {
|
||||
// 使用固定的时间偏移确保 hour1 和 hour2 在同一天且都在过去
|
||||
// 选择当天 02:00 和 03:00 作为测试时间点(基于 now 的日期)
|
||||
dayStart := truncateToDayUTC(now)
|
||||
hour1 := dayStart.Add(2 * time.Hour) // 当天 02:00
|
||||
hour2 := dayStart.Add(3 * time.Hour) // 当天 03:00
|
||||
hour1 := dayStart.Add(2 * time.Hour) // 当天 02:00
|
||||
hour2 := dayStart.Add(3 * time.Hour) // 当天 03:00
|
||||
// 如果当前时间早于 hour2,则使用昨天的时间
|
||||
if now.Before(hour2.Add(time.Hour)) {
|
||||
dayStart = dayStart.Add(-24 * time.Hour)
|
||||
@@ -872,17 +944,17 @@ func (s *UsageLogRepoSuite) TestGetUsageTrendWithFilters() {
|
||||
endTime := base.Add(48 * time.Hour)
|
||||
|
||||
// Test with user filter
|
||||
trend, err := s.repo.GetUsageTrendWithFilters(s.ctx, startTime, endTime, "day", user.ID, 0)
|
||||
trend, err := s.repo.GetUsageTrendWithFilters(s.ctx, startTime, endTime, "day", user.ID, 0, 0, 0, "", nil)
|
||||
s.Require().NoError(err, "GetUsageTrendWithFilters user filter")
|
||||
s.Require().Len(trend, 2)
|
||||
|
||||
// Test with apiKey filter
|
||||
trend, err = s.repo.GetUsageTrendWithFilters(s.ctx, startTime, endTime, "day", 0, apiKey.ID)
|
||||
trend, err = s.repo.GetUsageTrendWithFilters(s.ctx, startTime, endTime, "day", 0, apiKey.ID, 0, 0, "", nil)
|
||||
s.Require().NoError(err, "GetUsageTrendWithFilters apiKey filter")
|
||||
s.Require().Len(trend, 2)
|
||||
|
||||
// Test with both filters
|
||||
trend, err = s.repo.GetUsageTrendWithFilters(s.ctx, startTime, endTime, "day", user.ID, apiKey.ID)
|
||||
trend, err = s.repo.GetUsageTrendWithFilters(s.ctx, startTime, endTime, "day", user.ID, apiKey.ID, 0, 0, "", nil)
|
||||
s.Require().NoError(err, "GetUsageTrendWithFilters both filters")
|
||||
s.Require().Len(trend, 2)
|
||||
}
|
||||
@@ -899,7 +971,7 @@ func (s *UsageLogRepoSuite) TestGetUsageTrendWithFilters_HourlyGranularity() {
|
||||
startTime := base.Add(-1 * time.Hour)
|
||||
endTime := base.Add(3 * time.Hour)
|
||||
|
||||
trend, err := s.repo.GetUsageTrendWithFilters(s.ctx, startTime, endTime, "hour", user.ID, 0)
|
||||
trend, err := s.repo.GetUsageTrendWithFilters(s.ctx, startTime, endTime, "hour", user.ID, 0, 0, 0, "", nil)
|
||||
s.Require().NoError(err, "GetUsageTrendWithFilters hourly")
|
||||
s.Require().Len(trend, 2)
|
||||
}
|
||||
@@ -945,17 +1017,17 @@ func (s *UsageLogRepoSuite) TestGetModelStatsWithFilters() {
|
||||
endTime := base.Add(2 * time.Hour)
|
||||
|
||||
// Test with user filter
|
||||
stats, err := s.repo.GetModelStatsWithFilters(s.ctx, startTime, endTime, user.ID, 0, 0)
|
||||
stats, err := s.repo.GetModelStatsWithFilters(s.ctx, startTime, endTime, user.ID, 0, 0, 0, nil)
|
||||
s.Require().NoError(err, "GetModelStatsWithFilters user filter")
|
||||
s.Require().Len(stats, 2)
|
||||
|
||||
// Test with apiKey filter
|
||||
stats, err = s.repo.GetModelStatsWithFilters(s.ctx, startTime, endTime, 0, apiKey.ID, 0)
|
||||
stats, err = s.repo.GetModelStatsWithFilters(s.ctx, startTime, endTime, 0, apiKey.ID, 0, 0, nil)
|
||||
s.Require().NoError(err, "GetModelStatsWithFilters apiKey filter")
|
||||
s.Require().Len(stats, 2)
|
||||
|
||||
// Test with account filter
|
||||
stats, err = s.repo.GetModelStatsWithFilters(s.ctx, startTime, endTime, 0, 0, account.ID)
|
||||
stats, err = s.repo.GetModelStatsWithFilters(s.ctx, startTime, endTime, 0, 0, account.ID, 0, nil)
|
||||
s.Require().NoError(err, "GetModelStatsWithFilters account filter")
|
||||
s.Require().Len(stats, 2)
|
||||
}
|
||||
|
||||
@@ -69,6 +69,7 @@ var ProviderSet = wire.NewSet(
|
||||
NewGeminiTokenCache,
|
||||
NewSchedulerCache,
|
||||
NewSchedulerOutboxRepository,
|
||||
NewProxyLatencyCache,
|
||||
|
||||
// HTTP service ports (DI Strategy A: return interface directly)
|
||||
NewTurnstileVerifier,
|
||||
|
||||
Reference in New Issue
Block a user