diff --git a/backend/internal/repository/concurrency_cache.go b/backend/internal/repository/concurrency_cache.go index 0831f5eb..b34961e1 100644 --- a/backend/internal/repository/concurrency_cache.go +++ b/backend/internal/repository/concurrency_cache.go @@ -93,7 +93,7 @@ var ( return redis.call('ZCARD', key) `) - // incrementWaitScript - only sets TTL on first creation to avoid refreshing + // incrementWaitScript - refreshes TTL on each increment to keep queue depth accurate // KEYS[1] = wait queue key // ARGV[1] = maxWait // ARGV[2] = TTL in seconds @@ -111,15 +111,13 @@ var ( local newVal = redis.call('INCR', KEYS[1]) - -- Only set TTL on first creation to avoid refreshing zombie data - if newVal == 1 then - redis.call('EXPIRE', KEYS[1], ARGV[2]) - end + -- Refresh TTL so long-running traffic doesn't expire active queue counters. + redis.call('EXPIRE', KEYS[1], ARGV[2]) return 1 `) - // incrementAccountWaitScript - account-level wait queue count + // incrementAccountWaitScript - account-level wait queue count (refresh TTL on each increment) incrementAccountWaitScript = redis.NewScript(` local current = redis.call('GET', KEYS[1]) if current == false then @@ -134,10 +132,8 @@ var ( local newVal = redis.call('INCR', KEYS[1]) - -- Only set TTL on first creation to avoid refreshing zombie data - if newVal == 1 then - redis.call('EXPIRE', KEYS[1], ARGV[2]) - end + -- Refresh TTL so long-running traffic doesn't expire active queue counters. 
+ redis.call('EXPIRE', KEYS[1], ARGV[2]) return 1 `) diff --git a/backend/internal/repository/ops_repo.go b/backend/internal/repository/ops_repo.go new file mode 100644 index 00000000..b27a9ea0 --- /dev/null +++ b/backend/internal/repository/ops_repo.go @@ -0,0 +1,676 @@ +package repository + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/service" + "github.com/lib/pq" +) + +type opsRepository struct { + db *sql.DB +} + +func NewOpsRepository(db *sql.DB) service.OpsRepository { + return &opsRepository{db: db} +} + +func (r *opsRepository) InsertErrorLog(ctx context.Context, input *service.OpsInsertErrorLogInput) (int64, error) { + if r == nil || r.db == nil { + return 0, fmt.Errorf("nil ops repository") + } + if input == nil { + return 0, fmt.Errorf("nil input") + } + + q := ` +INSERT INTO ops_error_logs ( + request_id, + client_request_id, + user_id, + api_key_id, + account_id, + group_id, + client_ip, + platform, + model, + request_path, + stream, + user_agent, + error_phase, + error_type, + severity, + status_code, + is_business_limited, + error_message, + error_body, + error_source, + error_owner, + upstream_status_code, + upstream_error_message, + upstream_error_detail, + duration_ms, + time_to_first_token_ms, + request_body, + request_body_truncated, + request_body_bytes, + request_headers, + is_retryable, + retry_count, + created_at +) VALUES ( + $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33 +) RETURNING id` + + var id int64 + err := r.db.QueryRowContext( + ctx, + q, + opsNullString(input.RequestID), + opsNullString(input.ClientRequestID), + opsNullInt64(input.UserID), + opsNullInt64(input.APIKeyID), + opsNullInt64(input.AccountID), + opsNullInt64(input.GroupID), + opsNullString(input.ClientIP), + opsNullString(input.Platform), + opsNullString(input.Model), + opsNullString(input.RequestPath), + input.Stream, + 
opsNullString(input.UserAgent), + input.ErrorPhase, + input.ErrorType, + opsNullString(input.Severity), + opsNullInt(input.StatusCode), + input.IsBusinessLimited, + opsNullString(input.ErrorMessage), + opsNullString(input.ErrorBody), + opsNullString(input.ErrorSource), + opsNullString(input.ErrorOwner), + opsNullInt(input.UpstreamStatusCode), + opsNullString(input.UpstreamErrorMessage), + opsNullString(input.UpstreamErrorDetail), + opsNullInt(input.DurationMs), + opsNullInt64(input.TimeToFirstTokenMs), + opsNullString(input.RequestBodyJSON), + input.RequestBodyTruncated, + opsNullInt(input.RequestBodyBytes), + opsNullString(input.RequestHeadersJSON), + input.IsRetryable, + input.RetryCount, + input.CreatedAt, + ).Scan(&id) + if err != nil { + return 0, err + } + return id, nil +} + +func (r *opsRepository) ListErrorLogs(ctx context.Context, filter *service.OpsErrorLogFilter) (*service.OpsErrorLogList, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if filter == nil { + filter = &service.OpsErrorLogFilter{} + } + + page := filter.Page + if page <= 0 { + page = 1 + } + pageSize := filter.PageSize + if pageSize <= 0 { + pageSize = 20 + } + if pageSize > 500 { + pageSize = 500 + } + + where, args := buildOpsErrorLogsWhere(filter) + countSQL := "SELECT COUNT(*) FROM ops_error_logs " + where + + var total int + if err := r.db.QueryRowContext(ctx, countSQL, args...).Scan(&total); err != nil { + return nil, err + } + + offset := (page - 1) * pageSize + argsWithLimit := append(args, pageSize, offset) + selectSQL := ` +SELECT + id, + created_at, + error_phase, + error_type, + COALESCE(severity, ''), + COALESCE(status_code, 0), + COALESCE(platform, ''), + COALESCE(model, ''), + duration_ms, + COALESCE(client_request_id, ''), + COALESCE(request_id, ''), + COALESCE(error_message, ''), + user_id, + api_key_id, + account_id, + group_id, + CASE WHEN client_ip IS NULL THEN NULL ELSE client_ip::text END, + COALESCE(request_path, ''), + stream +FROM 
ops_error_logs +` + where + ` +ORDER BY created_at DESC +LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2) + + rows, err := r.db.QueryContext(ctx, selectSQL, argsWithLimit...) + if err != nil { + return nil, err + } + defer rows.Close() + + out := make([]*service.OpsErrorLog, 0, pageSize) + for rows.Next() { + var item service.OpsErrorLog + var latency sql.NullInt64 + var statusCode sql.NullInt64 + var clientIP sql.NullString + var userID sql.NullInt64 + var apiKeyID sql.NullInt64 + var accountID sql.NullInt64 + var groupID sql.NullInt64 + if err := rows.Scan( + &item.ID, + &item.CreatedAt, + &item.Phase, + &item.Type, + &item.Severity, + &statusCode, + &item.Platform, + &item.Model, + &latency, + &item.ClientRequestID, + &item.RequestID, + &item.Message, + &userID, + &apiKeyID, + &accountID, + &groupID, + &clientIP, + &item.RequestPath, + &item.Stream, + ); err != nil { + return nil, err + } + if latency.Valid { + v := int(latency.Int64) + item.LatencyMs = &v + } + item.StatusCode = int(statusCode.Int64) + if clientIP.Valid { + s := clientIP.String + item.ClientIP = &s + } + if userID.Valid { + v := userID.Int64 + item.UserID = &v + } + if apiKeyID.Valid { + v := apiKeyID.Int64 + item.APIKeyID = &v + } + if accountID.Valid { + v := accountID.Int64 + item.AccountID = &v + } + if groupID.Valid { + v := groupID.Int64 + item.GroupID = &v + } + out = append(out, &item) + } + if err := rows.Err(); err != nil { + return nil, err + } + + return &service.OpsErrorLogList{ + Errors: out, + Total: total, + Page: page, + PageSize: pageSize, + }, nil +} + +func (r *opsRepository) GetErrorLogByID(ctx context.Context, id int64) (*service.OpsErrorLogDetail, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if id <= 0 { + return nil, fmt.Errorf("invalid id") + } + + q := ` +SELECT + id, + created_at, + error_phase, + error_type, + COALESCE(severity, ''), + COALESCE(status_code, 0), + COALESCE(platform, ''), + COALESCE(model, ''), + 
duration_ms, + COALESCE(client_request_id, ''), + COALESCE(request_id, ''), + COALESCE(error_message, ''), + COALESCE(error_body, ''), + is_business_limited, + user_id, + api_key_id, + account_id, + group_id, + CASE WHEN client_ip IS NULL THEN NULL ELSE client_ip::text END, + COALESCE(request_path, ''), + stream, + COALESCE(user_agent, ''), + auth_latency_ms, + routing_latency_ms, + upstream_latency_ms, + response_latency_ms, + time_to_first_token_ms, + COALESCE(request_body::text, ''), + request_body_truncated, + request_body_bytes, + COALESCE(request_headers::text, '') +FROM ops_error_logs +WHERE id = $1 +LIMIT 1` + + var out service.OpsErrorLogDetail + var latency sql.NullInt64 + var statusCode sql.NullInt64 + var clientIP sql.NullString + var userID sql.NullInt64 + var apiKeyID sql.NullInt64 + var accountID sql.NullInt64 + var groupID sql.NullInt64 + var authLatency sql.NullInt64 + var routingLatency sql.NullInt64 + var upstreamLatency sql.NullInt64 + var responseLatency sql.NullInt64 + var ttft sql.NullInt64 + var requestBodyBytes sql.NullInt64 + + err := r.db.QueryRowContext(ctx, q, id).Scan( + &out.ID, + &out.CreatedAt, + &out.Phase, + &out.Type, + &out.Severity, + &statusCode, + &out.Platform, + &out.Model, + &latency, + &out.ClientRequestID, + &out.RequestID, + &out.Message, + &out.ErrorBody, + &out.IsBusinessLimited, + &userID, + &apiKeyID, + &accountID, + &groupID, + &clientIP, + &out.RequestPath, + &out.Stream, + &out.UserAgent, + &authLatency, + &routingLatency, + &upstreamLatency, + &responseLatency, + &ttft, + &out.RequestBody, + &out.RequestBodyTruncated, + &requestBodyBytes, + &out.RequestHeaders, + ) + if err != nil { + return nil, err + } + + out.StatusCode = int(statusCode.Int64) + if latency.Valid { + v := int(latency.Int64) + out.LatencyMs = &v + } + if clientIP.Valid { + s := clientIP.String + out.ClientIP = &s + } + if userID.Valid { + v := userID.Int64 + out.UserID = &v + } + if apiKeyID.Valid { + v := apiKeyID.Int64 + out.APIKeyID = &v + } 
+ if accountID.Valid { + v := accountID.Int64 + out.AccountID = &v + } + if groupID.Valid { + v := groupID.Int64 + out.GroupID = &v + } + if authLatency.Valid { + v := authLatency.Int64 + out.AuthLatencyMs = &v + } + if routingLatency.Valid { + v := routingLatency.Int64 + out.RoutingLatencyMs = &v + } + if upstreamLatency.Valid { + v := upstreamLatency.Int64 + out.UpstreamLatencyMs = &v + } + if responseLatency.Valid { + v := responseLatency.Int64 + out.ResponseLatencyMs = &v + } + if ttft.Valid { + v := ttft.Int64 + out.TimeToFirstTokenMs = &v + } + if requestBodyBytes.Valid { + v := int(requestBodyBytes.Int64) + out.RequestBodyBytes = &v + } + + // Normalize request_body to empty string when stored as JSON null. + out.RequestBody = strings.TrimSpace(out.RequestBody) + if out.RequestBody == "null" { + out.RequestBody = "" + } + // Normalize request_headers to empty string when stored as JSON null. + out.RequestHeaders = strings.TrimSpace(out.RequestHeaders) + if out.RequestHeaders == "null" { + out.RequestHeaders = "" + } + + return &out, nil +} + +func (r *opsRepository) InsertRetryAttempt(ctx context.Context, input *service.OpsInsertRetryAttemptInput) (int64, error) { + if r == nil || r.db == nil { + return 0, fmt.Errorf("nil ops repository") + } + if input == nil { + return 0, fmt.Errorf("nil input") + } + if input.SourceErrorID <= 0 { + return 0, fmt.Errorf("invalid source_error_id") + } + if strings.TrimSpace(input.Mode) == "" { + return 0, fmt.Errorf("invalid mode") + } + + q := ` +INSERT INTO ops_retry_attempts ( + requested_by_user_id, + source_error_id, + mode, + pinned_account_id, + status, + started_at +) VALUES ( + $1,$2,$3,$4,$5,$6 +) RETURNING id` + + var id int64 + err := r.db.QueryRowContext( + ctx, + q, + opsNullInt64(&input.RequestedByUserID), + input.SourceErrorID, + strings.TrimSpace(input.Mode), + opsNullInt64(input.PinnedAccountID), + strings.TrimSpace(input.Status), + input.StartedAt, + ).Scan(&id) + if err != nil { + return 0, err + } + 
return id, nil +} + +func (r *opsRepository) UpdateRetryAttempt(ctx context.Context, input *service.OpsUpdateRetryAttemptInput) error { + if r == nil || r.db == nil { + return fmt.Errorf("nil ops repository") + } + if input == nil { + return fmt.Errorf("nil input") + } + if input.ID <= 0 { + return fmt.Errorf("invalid id") + } + + q := ` +UPDATE ops_retry_attempts +SET + status = $2, + finished_at = $3, + duration_ms = $4, + result_request_id = $5, + result_error_id = $6, + error_message = $7 +WHERE id = $1` + + _, err := r.db.ExecContext( + ctx, + q, + input.ID, + strings.TrimSpace(input.Status), + nullTime(input.FinishedAt), + input.DurationMs, + opsNullString(input.ResultRequestID), + opsNullInt64(input.ResultErrorID), + opsNullString(input.ErrorMessage), + ) + return err +} + +func (r *opsRepository) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*service.OpsRetryAttempt, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if sourceErrorID <= 0 { + return nil, fmt.Errorf("invalid source_error_id") + } + + q := ` +SELECT + id, + created_at, + COALESCE(requested_by_user_id, 0), + source_error_id, + COALESCE(mode, ''), + pinned_account_id, + COALESCE(status, ''), + started_at, + finished_at, + duration_ms, + result_request_id, + result_error_id, + error_message +FROM ops_retry_attempts +WHERE source_error_id = $1 +ORDER BY created_at DESC +LIMIT 1` + + var out service.OpsRetryAttempt + var pinnedAccountID sql.NullInt64 + var requestedBy sql.NullInt64 + var startedAt sql.NullTime + var finishedAt sql.NullTime + var durationMs sql.NullInt64 + var resultRequestID sql.NullString + var resultErrorID sql.NullInt64 + var errorMessage sql.NullString + + err := r.db.QueryRowContext(ctx, q, sourceErrorID).Scan( + &out.ID, + &out.CreatedAt, + &requestedBy, + &out.SourceErrorID, + &out.Mode, + &pinnedAccountID, + &out.Status, + &startedAt, + &finishedAt, + &durationMs, + &resultRequestID, + &resultErrorID, + 
&errorMessage, + ) + if err != nil { + return nil, err + } + out.RequestedByUserID = requestedBy.Int64 + if pinnedAccountID.Valid { + v := pinnedAccountID.Int64 + out.PinnedAccountID = &v + } + if startedAt.Valid { + t := startedAt.Time + out.StartedAt = &t + } + if finishedAt.Valid { + t := finishedAt.Time + out.FinishedAt = &t + } + if durationMs.Valid { + v := durationMs.Int64 + out.DurationMs = &v + } + if resultRequestID.Valid { + s := resultRequestID.String + out.ResultRequestID = &s + } + if resultErrorID.Valid { + v := resultErrorID.Int64 + out.ResultErrorID = &v + } + if errorMessage.Valid { + s := errorMessage.String + out.ErrorMessage = &s + } + + return &out, nil +} + +func nullTime(t time.Time) sql.NullTime { + if t.IsZero() { + return sql.NullTime{} + } + return sql.NullTime{Time: t, Valid: true} +} + +func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) { + clauses := make([]string, 0, 8) + args := make([]any, 0, 8) + clauses = append(clauses, "1=1") + + if filter.StartTime != nil && !filter.StartTime.IsZero() { + args = append(args, filter.StartTime.UTC()) + clauses = append(clauses, "created_at >= $"+itoa(len(args))) + } + if filter.EndTime != nil && !filter.EndTime.IsZero() { + args = append(args, filter.EndTime.UTC()) + // Keep time-window semantics consistent with other ops queries: [start, end) + clauses = append(clauses, "created_at < $"+itoa(len(args))) + } + if p := strings.TrimSpace(filter.Platform); p != "" { + args = append(args, p) + clauses = append(clauses, "platform = $"+itoa(len(args))) + } + if filter.GroupID != nil && *filter.GroupID > 0 { + args = append(args, *filter.GroupID) + clauses = append(clauses, "group_id = $"+itoa(len(args))) + } + if filter.AccountID != nil && *filter.AccountID > 0 { + args = append(args, *filter.AccountID) + clauses = append(clauses, "account_id = $"+itoa(len(args))) + } + if phase := strings.TrimSpace(filter.Phase); phase != "" { + args = append(args, phase) + clauses = 
append(clauses, "error_phase = $"+itoa(len(args))) + } + if len(filter.StatusCodes) > 0 { + args = append(args, pq.Array(filter.StatusCodes)) + clauses = append(clauses, "status_code = ANY($"+itoa(len(args))+")") + } + if q := strings.TrimSpace(filter.Query); q != "" { + like := "%" + q + "%" + args = append(args, like) + n := itoa(len(args)) + clauses = append(clauses, "(request_id ILIKE $"+n+" OR client_request_id ILIKE $"+n+" OR error_message ILIKE $"+n+")") + } + + return "WHERE " + strings.Join(clauses, " AND "), args +} + +// Helpers for nullable args +func opsNullString(v any) any { + switch s := v.(type) { + case nil: + return sql.NullString{} + case *string: + if s == nil || strings.TrimSpace(*s) == "" { + return sql.NullString{} + } + return sql.NullString{String: strings.TrimSpace(*s), Valid: true} + case string: + if strings.TrimSpace(s) == "" { + return sql.NullString{} + } + return sql.NullString{String: strings.TrimSpace(s), Valid: true} + default: + return sql.NullString{} + } +} + +func opsNullInt64(v *int64) any { + if v == nil || *v == 0 { + return sql.NullInt64{} + } + return sql.NullInt64{Int64: *v, Valid: true} +} + +func opsNullInt(v any) any { + switch n := v.(type) { + case nil: + return sql.NullInt64{} + case *int: + if n == nil || *n == 0 { + return sql.NullInt64{} + } + return sql.NullInt64{Int64: int64(*n), Valid: true} + case *int64: + if n == nil || *n == 0 { + return sql.NullInt64{} + } + return sql.NullInt64{Int64: *n, Valid: true} + case int: + if n == 0 { + return sql.NullInt64{} + } + return sql.NullInt64{Int64: int64(n), Valid: true} + default: + return sql.NullInt64{} + } +} diff --git a/backend/internal/repository/ops_repo_alerts.go b/backend/internal/repository/ops_repo_alerts.go new file mode 100644 index 00000000..ce99e6f7 --- /dev/null +++ b/backend/internal/repository/ops_repo_alerts.go @@ -0,0 +1,689 @@ +package repository + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + "time" + + 
"github.com/Wei-Shaw/sub2api/internal/service" +) + +func (r *opsRepository) ListAlertRules(ctx context.Context) ([]*service.OpsAlertRule, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + + q := ` +SELECT + id, + name, + COALESCE(description, ''), + enabled, + COALESCE(severity, ''), + metric_type, + operator, + threshold, + window_minutes, + sustained_minutes, + cooldown_minutes, + COALESCE(notify_email, true), + filters, + last_triggered_at, + created_at, + updated_at +FROM ops_alert_rules +ORDER BY id DESC` + + rows, err := r.db.QueryContext(ctx, q) + if err != nil { + return nil, err + } + defer rows.Close() + + out := []*service.OpsAlertRule{} + for rows.Next() { + var rule service.OpsAlertRule + var filtersRaw []byte + var lastTriggeredAt sql.NullTime + if err := rows.Scan( + &rule.ID, + &rule.Name, + &rule.Description, + &rule.Enabled, + &rule.Severity, + &rule.MetricType, + &rule.Operator, + &rule.Threshold, + &rule.WindowMinutes, + &rule.SustainedMinutes, + &rule.CooldownMinutes, + &rule.NotifyEmail, + &filtersRaw, + &lastTriggeredAt, + &rule.CreatedAt, + &rule.UpdatedAt, + ); err != nil { + return nil, err + } + if lastTriggeredAt.Valid { + v := lastTriggeredAt.Time + rule.LastTriggeredAt = &v + } + if len(filtersRaw) > 0 && string(filtersRaw) != "null" { + var decoded map[string]any + if err := json.Unmarshal(filtersRaw, &decoded); err == nil { + rule.Filters = decoded + } + } + out = append(out, &rule) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + +func (r *opsRepository) CreateAlertRule(ctx context.Context, input *service.OpsAlertRule) (*service.OpsAlertRule, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if input == nil { + return nil, fmt.Errorf("nil input") + } + + filtersArg, err := opsNullJSONMap(input.Filters) + if err != nil { + return nil, err + } + + q := ` +INSERT INTO ops_alert_rules ( + name, + description, + 
enabled, + severity, + metric_type, + operator, + threshold, + window_minutes, + sustained_minutes, + cooldown_minutes, + notify_email, + filters, + created_at, + updated_at +) VALUES ( + $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,NOW(),NOW() +) +RETURNING + id, + name, + COALESCE(description, ''), + enabled, + COALESCE(severity, ''), + metric_type, + operator, + threshold, + window_minutes, + sustained_minutes, + cooldown_minutes, + COALESCE(notify_email, true), + filters, + last_triggered_at, + created_at, + updated_at` + + var out service.OpsAlertRule + var filtersRaw []byte + var lastTriggeredAt sql.NullTime + + if err := r.db.QueryRowContext( + ctx, + q, + strings.TrimSpace(input.Name), + strings.TrimSpace(input.Description), + input.Enabled, + strings.TrimSpace(input.Severity), + strings.TrimSpace(input.MetricType), + strings.TrimSpace(input.Operator), + input.Threshold, + input.WindowMinutes, + input.SustainedMinutes, + input.CooldownMinutes, + input.NotifyEmail, + filtersArg, + ).Scan( + &out.ID, + &out.Name, + &out.Description, + &out.Enabled, + &out.Severity, + &out.MetricType, + &out.Operator, + &out.Threshold, + &out.WindowMinutes, + &out.SustainedMinutes, + &out.CooldownMinutes, + &out.NotifyEmail, + &filtersRaw, + &lastTriggeredAt, + &out.CreatedAt, + &out.UpdatedAt, + ); err != nil { + return nil, err + } + if lastTriggeredAt.Valid { + v := lastTriggeredAt.Time + out.LastTriggeredAt = &v + } + if len(filtersRaw) > 0 && string(filtersRaw) != "null" { + var decoded map[string]any + if err := json.Unmarshal(filtersRaw, &decoded); err == nil { + out.Filters = decoded + } + } + + return &out, nil +} + +func (r *opsRepository) UpdateAlertRule(ctx context.Context, input *service.OpsAlertRule) (*service.OpsAlertRule, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if input == nil { + return nil, fmt.Errorf("nil input") + } + if input.ID <= 0 { + return nil, fmt.Errorf("invalid id") + } + + filtersArg, err := 
opsNullJSONMap(input.Filters) + if err != nil { + return nil, err + } + + q := ` +UPDATE ops_alert_rules +SET + name = $2, + description = $3, + enabled = $4, + severity = $5, + metric_type = $6, + operator = $7, + threshold = $8, + window_minutes = $9, + sustained_minutes = $10, + cooldown_minutes = $11, + notify_email = $12, + filters = $13, + updated_at = NOW() +WHERE id = $1 +RETURNING + id, + name, + COALESCE(description, ''), + enabled, + COALESCE(severity, ''), + metric_type, + operator, + threshold, + window_minutes, + sustained_minutes, + cooldown_minutes, + COALESCE(notify_email, true), + filters, + last_triggered_at, + created_at, + updated_at` + + var out service.OpsAlertRule + var filtersRaw []byte + var lastTriggeredAt sql.NullTime + + if err := r.db.QueryRowContext( + ctx, + q, + input.ID, + strings.TrimSpace(input.Name), + strings.TrimSpace(input.Description), + input.Enabled, + strings.TrimSpace(input.Severity), + strings.TrimSpace(input.MetricType), + strings.TrimSpace(input.Operator), + input.Threshold, + input.WindowMinutes, + input.SustainedMinutes, + input.CooldownMinutes, + input.NotifyEmail, + filtersArg, + ).Scan( + &out.ID, + &out.Name, + &out.Description, + &out.Enabled, + &out.Severity, + &out.MetricType, + &out.Operator, + &out.Threshold, + &out.WindowMinutes, + &out.SustainedMinutes, + &out.CooldownMinutes, + &out.NotifyEmail, + &filtersRaw, + &lastTriggeredAt, + &out.CreatedAt, + &out.UpdatedAt, + ); err != nil { + return nil, err + } + + if lastTriggeredAt.Valid { + v := lastTriggeredAt.Time + out.LastTriggeredAt = &v + } + if len(filtersRaw) > 0 && string(filtersRaw) != "null" { + var decoded map[string]any + if err := json.Unmarshal(filtersRaw, &decoded); err == nil { + out.Filters = decoded + } + } + + return &out, nil +} + +func (r *opsRepository) DeleteAlertRule(ctx context.Context, id int64) error { + if r == nil || r.db == nil { + return fmt.Errorf("nil ops repository") + } + if id <= 0 { + return fmt.Errorf("invalid id") + } 
+ + res, err := r.db.ExecContext(ctx, "DELETE FROM ops_alert_rules WHERE id = $1", id) + if err != nil { + return err + } + affected, err := res.RowsAffected() + if err != nil { + return err + } + if affected == 0 { + return sql.ErrNoRows + } + return nil +} + +func (r *opsRepository) ListAlertEvents(ctx context.Context, filter *service.OpsAlertEventFilter) ([]*service.OpsAlertEvent, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if filter == nil { + filter = &service.OpsAlertEventFilter{} + } + + limit := filter.Limit + if limit <= 0 { + limit = 100 + } + if limit > 500 { + limit = 500 + } + + where, args := buildOpsAlertEventsWhere(filter) + args = append(args, limit) + limitArg := "$" + itoa(len(args)) + + q := ` +SELECT + id, + COALESCE(rule_id, 0), + COALESCE(severity, ''), + COALESCE(status, ''), + COALESCE(title, ''), + COALESCE(description, ''), + metric_value, + threshold_value, + dimensions, + fired_at, + resolved_at, + email_sent, + created_at +FROM ops_alert_events +` + where + ` +ORDER BY fired_at DESC +LIMIT ` + limitArg + + rows, err := r.db.QueryContext(ctx, q, args...) 
+ if err != nil { + return nil, err + } + defer rows.Close() + + out := []*service.OpsAlertEvent{} + for rows.Next() { + var ev service.OpsAlertEvent + var metricValue sql.NullFloat64 + var thresholdValue sql.NullFloat64 + var dimensionsRaw []byte + var resolvedAt sql.NullTime + if err := rows.Scan( + &ev.ID, + &ev.RuleID, + &ev.Severity, + &ev.Status, + &ev.Title, + &ev.Description, + &metricValue, + &thresholdValue, + &dimensionsRaw, + &ev.FiredAt, + &resolvedAt, + &ev.EmailSent, + &ev.CreatedAt, + ); err != nil { + return nil, err + } + if metricValue.Valid { + v := metricValue.Float64 + ev.MetricValue = &v + } + if thresholdValue.Valid { + v := thresholdValue.Float64 + ev.ThresholdValue = &v + } + if resolvedAt.Valid { + v := resolvedAt.Time + ev.ResolvedAt = &v + } + if len(dimensionsRaw) > 0 && string(dimensionsRaw) != "null" { + var decoded map[string]any + if err := json.Unmarshal(dimensionsRaw, &decoded); err == nil { + ev.Dimensions = decoded + } + } + out = append(out, &ev) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + +func (r *opsRepository) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*service.OpsAlertEvent, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if ruleID <= 0 { + return nil, fmt.Errorf("invalid rule id") + } + + q := ` +SELECT + id, + COALESCE(rule_id, 0), + COALESCE(severity, ''), + COALESCE(status, ''), + COALESCE(title, ''), + COALESCE(description, ''), + metric_value, + threshold_value, + dimensions, + fired_at, + resolved_at, + email_sent, + created_at +FROM ops_alert_events +WHERE rule_id = $1 AND status = $2 +ORDER BY fired_at DESC +LIMIT 1` + + row := r.db.QueryRowContext(ctx, q, ruleID, service.OpsAlertStatusFiring) + ev, err := scanOpsAlertEvent(row) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + return ev, nil +} + +func (r *opsRepository) GetLatestAlertEvent(ctx context.Context, ruleID 
int64) (*service.OpsAlertEvent, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if ruleID <= 0 { + return nil, fmt.Errorf("invalid rule id") + } + + q := ` +SELECT + id, + COALESCE(rule_id, 0), + COALESCE(severity, ''), + COALESCE(status, ''), + COALESCE(title, ''), + COALESCE(description, ''), + metric_value, + threshold_value, + dimensions, + fired_at, + resolved_at, + email_sent, + created_at +FROM ops_alert_events +WHERE rule_id = $1 +ORDER BY fired_at DESC +LIMIT 1` + + row := r.db.QueryRowContext(ctx, q, ruleID) + ev, err := scanOpsAlertEvent(row) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + return ev, nil +} + +func (r *opsRepository) CreateAlertEvent(ctx context.Context, event *service.OpsAlertEvent) (*service.OpsAlertEvent, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if event == nil { + return nil, fmt.Errorf("nil event") + } + + dimensionsArg, err := opsNullJSONMap(event.Dimensions) + if err != nil { + return nil, err + } + + q := ` +INSERT INTO ops_alert_events ( + rule_id, + severity, + status, + title, + description, + metric_value, + threshold_value, + dimensions, + fired_at, + resolved_at, + email_sent, + created_at +) VALUES ( + $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,NOW() +) +RETURNING + id, + COALESCE(rule_id, 0), + COALESCE(severity, ''), + COALESCE(status, ''), + COALESCE(title, ''), + COALESCE(description, ''), + metric_value, + threshold_value, + dimensions, + fired_at, + resolved_at, + email_sent, + created_at` + + row := r.db.QueryRowContext( + ctx, + q, + opsNullInt64(&event.RuleID), + opsNullString(event.Severity), + opsNullString(event.Status), + opsNullString(event.Title), + opsNullString(event.Description), + opsNullFloat64(event.MetricValue), + opsNullFloat64(event.ThresholdValue), + dimensionsArg, + event.FiredAt, + opsNullTime(event.ResolvedAt), + event.EmailSent, + ) + return scanOpsAlertEvent(row) 
+} + +func (r *opsRepository) UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error { + if r == nil || r.db == nil { + return fmt.Errorf("nil ops repository") + } + if eventID <= 0 { + return fmt.Errorf("invalid event id") + } + if strings.TrimSpace(status) == "" { + return fmt.Errorf("invalid status") + } + + q := ` +UPDATE ops_alert_events +SET status = $2, + resolved_at = $3 +WHERE id = $1` + + _, err := r.db.ExecContext(ctx, q, eventID, strings.TrimSpace(status), opsNullTime(resolvedAt)) + return err +} + +func (r *opsRepository) UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error { + if r == nil || r.db == nil { + return fmt.Errorf("nil ops repository") + } + if eventID <= 0 { + return fmt.Errorf("invalid event id") + } + + _, err := r.db.ExecContext(ctx, "UPDATE ops_alert_events SET email_sent = $2 WHERE id = $1", eventID, emailSent) + return err +} + +type opsAlertEventRow interface { + Scan(dest ...any) error +} + +func scanOpsAlertEvent(row opsAlertEventRow) (*service.OpsAlertEvent, error) { + var ev service.OpsAlertEvent + var metricValue sql.NullFloat64 + var thresholdValue sql.NullFloat64 + var dimensionsRaw []byte + var resolvedAt sql.NullTime + + if err := row.Scan( + &ev.ID, + &ev.RuleID, + &ev.Severity, + &ev.Status, + &ev.Title, + &ev.Description, + &metricValue, + &thresholdValue, + &dimensionsRaw, + &ev.FiredAt, + &resolvedAt, + &ev.EmailSent, + &ev.CreatedAt, + ); err != nil { + return nil, err + } + if metricValue.Valid { + v := metricValue.Float64 + ev.MetricValue = &v + } + if thresholdValue.Valid { + v := thresholdValue.Float64 + ev.ThresholdValue = &v + } + if resolvedAt.Valid { + v := resolvedAt.Time + ev.ResolvedAt = &v + } + if len(dimensionsRaw) > 0 && string(dimensionsRaw) != "null" { + var decoded map[string]any + if err := json.Unmarshal(dimensionsRaw, &decoded); err == nil { + ev.Dimensions = decoded + } + } + return &ev, nil +} + +func 
buildOpsAlertEventsWhere(filter *service.OpsAlertEventFilter) (string, []any) { + clauses := []string{"1=1"} + args := []any{} + + if filter == nil { + return "WHERE " + strings.Join(clauses, " AND "), args + } + + if status := strings.TrimSpace(filter.Status); status != "" { + args = append(args, status) + clauses = append(clauses, "status = $"+itoa(len(args))) + } + if severity := strings.TrimSpace(filter.Severity); severity != "" { + args = append(args, severity) + clauses = append(clauses, "severity = $"+itoa(len(args))) + } + if filter.StartTime != nil && !filter.StartTime.IsZero() { + args = append(args, *filter.StartTime) + clauses = append(clauses, "fired_at >= $"+itoa(len(args))) + } + if filter.EndTime != nil && !filter.EndTime.IsZero() { + args = append(args, *filter.EndTime) + clauses = append(clauses, "fired_at < $"+itoa(len(args))) + } + + // Dimensions are stored in JSONB. We filter best-effort without requiring GIN indexes. + if platform := strings.TrimSpace(filter.Platform); platform != "" { + args = append(args, platform) + clauses = append(clauses, "(dimensions->>'platform') = $"+itoa(len(args))) + } + if filter.GroupID != nil && *filter.GroupID > 0 { + args = append(args, fmt.Sprintf("%d", *filter.GroupID)) + clauses = append(clauses, "(dimensions->>'group_id') = $"+itoa(len(args))) + } + + return "WHERE " + strings.Join(clauses, " AND "), args +} + +func opsNullJSONMap(v map[string]any) (any, error) { + if v == nil { + return sql.NullString{}, nil + } + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + if len(b) == 0 { + return sql.NullString{}, nil + } + return sql.NullString{String: string(b), Valid: true}, nil +} diff --git a/backend/internal/repository/ops_repo_dashboard.go b/backend/internal/repository/ops_repo_dashboard.go new file mode 100644 index 00000000..d96efd48 --- /dev/null +++ b/backend/internal/repository/ops_repo_dashboard.go @@ -0,0 +1,1012 @@ +package repository + +import ( + "context" + "database/sql" + 
"errors" + "fmt" + "math" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/service" +) + +func (r *opsRepository) GetDashboardOverview(ctx context.Context, filter *service.OpsDashboardFilter) (*service.OpsDashboardOverview, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if filter == nil { + return nil, fmt.Errorf("nil filter") + } + if filter.StartTime.IsZero() || filter.EndTime.IsZero() { + return nil, fmt.Errorf("start_time/end_time required") + } + + mode := filter.QueryMode + if !mode.IsValid() { + mode = service.OpsQueryModeRaw + } + + switch mode { + case service.OpsQueryModePreagg: + return r.getDashboardOverviewPreaggregated(ctx, filter) + case service.OpsQueryModeAuto: + out, err := r.getDashboardOverviewPreaggregated(ctx, filter) + if err != nil && errors.Is(err, service.ErrOpsPreaggregatedNotPopulated) { + return r.getDashboardOverviewRaw(ctx, filter) + } + return out, err + default: + return r.getDashboardOverviewRaw(ctx, filter) + } +} + +func (r *opsRepository) getDashboardOverviewRaw(ctx context.Context, filter *service.OpsDashboardFilter) (*service.OpsDashboardOverview, error) { + start := filter.StartTime.UTC() + end := filter.EndTime.UTC() + + successCount, tokenConsumed, err := r.queryUsageCounts(ctx, filter, start, end) + if err != nil { + return nil, err + } + + duration, ttft, err := r.queryUsageLatency(ctx, filter, start, end) + if err != nil { + return nil, err + } + + errorTotal, businessLimited, errorCountSLA, upstreamExcl, upstream429, upstream529, err := r.queryErrorCounts(ctx, filter, start, end) + if err != nil { + return nil, err + } + + windowSeconds := end.Sub(start).Seconds() + if windowSeconds <= 0 { + windowSeconds = 1 + } + + requestCountTotal := successCount + errorTotal + requestCountSLA := successCount + errorCountSLA + + sla := safeDivideFloat64(float64(successCount), float64(requestCountSLA)) + errorRate := safeDivideFloat64(float64(errorCountSLA), 
float64(requestCountSLA)) + upstreamErrorRate := safeDivideFloat64(float64(upstreamExcl), float64(requestCountSLA)) + + qpsCurrent, tpsCurrent, err := r.queryCurrentRates(ctx, filter, end) + if err != nil { + return nil, err + } + + qpsPeak, err := r.queryPeakQPS(ctx, filter, start, end) + if err != nil { + return nil, err + } + tpsPeak, err := r.queryPeakTPS(ctx, filter, start, end) + if err != nil { + return nil, err + } + + qpsAvg := roundTo1DP(float64(requestCountTotal) / windowSeconds) + tpsAvg := roundTo1DP(float64(tokenConsumed) / windowSeconds) + + return &service.OpsDashboardOverview{ + StartTime: start, + EndTime: end, + Platform: strings.TrimSpace(filter.Platform), + GroupID: filter.GroupID, + + SuccessCount: successCount, + ErrorCountTotal: errorTotal, + BusinessLimitedCount: businessLimited, + ErrorCountSLA: errorCountSLA, + RequestCountTotal: requestCountTotal, + RequestCountSLA: requestCountSLA, + TokenConsumed: tokenConsumed, + + SLA: roundTo4DP(sla), + ErrorRate: roundTo4DP(errorRate), + UpstreamErrorRate: roundTo4DP(upstreamErrorRate), + UpstreamErrorCountExcl429529: upstreamExcl, + Upstream429Count: upstream429, + Upstream529Count: upstream529, + + QPS: service.OpsRateSummary{ + Current: qpsCurrent, + Peak: qpsPeak, + Avg: qpsAvg, + }, + TPS: service.OpsRateSummary{ + Current: tpsCurrent, + Peak: tpsPeak, + Avg: tpsAvg, + }, + + Duration: duration, + TTFT: ttft, + }, nil +} + +type opsDashboardPartial struct { + successCount int64 + errorCountTotal int64 + businessLimitedCount int64 + errorCountSLA int64 + + upstreamErrorCountExcl429529 int64 + upstream429Count int64 + upstream529Count int64 + + tokenConsumed int64 + + duration service.OpsPercentiles + ttft service.OpsPercentiles +} + +func (r *opsRepository) getDashboardOverviewPreaggregated(ctx context.Context, filter *service.OpsDashboardFilter) (*service.OpsDashboardOverview, error) { + if filter == nil { + return nil, fmt.Errorf("nil filter") + } + + start := filter.StartTime.UTC() + end := 
filter.EndTime.UTC() + + // Stable full-hour range covered by pre-aggregation. + aggSafeEnd := preaggSafeEnd(end) + aggFullStart := utcCeilToHour(start) + aggFullEnd := utcFloorToHour(aggSafeEnd) + + // If there are no stable full-hour buckets, use raw directly (short windows). + if !aggFullStart.Before(aggFullEnd) { + return r.getDashboardOverviewRaw(ctx, filter) + } + + // 1) Pre-aggregated stable segment. + preaggRows, err := r.listHourlyMetricsRows(ctx, filter, aggFullStart, aggFullEnd) + if err != nil { + return nil, err + } + if len(preaggRows) == 0 { + // Distinguish "no data" vs "preagg not populated yet". + if exists, err := r.rawOpsDataExists(ctx, filter, aggFullStart, aggFullEnd); err == nil && exists { + return nil, service.ErrOpsPreaggregatedNotPopulated + } + } + preagg := aggregateHourlyRows(preaggRows) + + // 2) Raw head/tail fragments (at most ~1 hour each). + head := opsDashboardPartial{} + tail := opsDashboardPartial{} + + if start.Before(aggFullStart) { + part, err := r.queryRawPartial(ctx, filter, start, minTime(end, aggFullStart)) + if err != nil { + return nil, err + } + head = *part + } + if aggFullEnd.Before(end) { + part, err := r.queryRawPartial(ctx, filter, maxTime(start, aggFullEnd), end) + if err != nil { + return nil, err + } + tail = *part + } + + // Merge counts. 
+ successCount := preagg.successCount + head.successCount + tail.successCount + errorTotal := preagg.errorCountTotal + head.errorCountTotal + tail.errorCountTotal + businessLimited := preagg.businessLimitedCount + head.businessLimitedCount + tail.businessLimitedCount + errorCountSLA := preagg.errorCountSLA + head.errorCountSLA + tail.errorCountSLA + + upstreamExcl := preagg.upstreamErrorCountExcl429529 + head.upstreamErrorCountExcl429529 + tail.upstreamErrorCountExcl429529 + upstream429 := preagg.upstream429Count + head.upstream429Count + tail.upstream429Count + upstream529 := preagg.upstream529Count + head.upstream529Count + tail.upstream529Count + + tokenConsumed := preagg.tokenConsumed + head.tokenConsumed + tail.tokenConsumed + + // Approximate percentiles across segments: + // - p50/p90/avg: weighted average by success_count + // - p95/p99/max: max (conservative tail) + duration := combineApproxPercentiles([]opsPercentileSegment{ + {weight: preagg.successCount, p: preagg.duration}, + {weight: head.successCount, p: head.duration}, + {weight: tail.successCount, p: tail.duration}, + }) + ttft := combineApproxPercentiles([]opsPercentileSegment{ + {weight: preagg.successCount, p: preagg.ttft}, + {weight: head.successCount, p: head.ttft}, + {weight: tail.successCount, p: tail.ttft}, + }) + + windowSeconds := end.Sub(start).Seconds() + if windowSeconds <= 0 { + windowSeconds = 1 + } + + requestCountTotal := successCount + errorTotal + requestCountSLA := successCount + errorCountSLA + + sla := safeDivideFloat64(float64(successCount), float64(requestCountSLA)) + errorRate := safeDivideFloat64(float64(errorCountSLA), float64(requestCountSLA)) + upstreamErrorRate := safeDivideFloat64(float64(upstreamExcl), float64(requestCountSLA)) + + // Keep "current" rates as raw, to preserve realtime semantics. + qpsCurrent, tpsCurrent, err := r.queryCurrentRates(ctx, filter, end) + if err != nil { + return nil, err + } + + // NOTE: peak still uses raw logs (minute granularity). 
This is typically cheaper than percentile_cont + // and keeps semantics consistent across modes. + qpsPeak, err := r.queryPeakQPS(ctx, filter, start, end) + if err != nil { + return nil, err + } + tpsPeak, err := r.queryPeakTPS(ctx, filter, start, end) + if err != nil { + return nil, err + } + + qpsAvg := roundTo1DP(float64(requestCountTotal) / windowSeconds) + tpsAvg := roundTo1DP(float64(tokenConsumed) / windowSeconds) + + return &service.OpsDashboardOverview{ + StartTime: start, + EndTime: end, + Platform: strings.TrimSpace(filter.Platform), + GroupID: filter.GroupID, + + SuccessCount: successCount, + ErrorCountTotal: errorTotal, + BusinessLimitedCount: businessLimited, + ErrorCountSLA: errorCountSLA, + RequestCountTotal: requestCountTotal, + RequestCountSLA: requestCountSLA, + TokenConsumed: tokenConsumed, + + SLA: roundTo4DP(sla), + ErrorRate: roundTo4DP(errorRate), + UpstreamErrorRate: roundTo4DP(upstreamErrorRate), + UpstreamErrorCountExcl429529: upstreamExcl, + Upstream429Count: upstream429, + Upstream529Count: upstream529, + + QPS: service.OpsRateSummary{ + Current: qpsCurrent, + Peak: qpsPeak, + Avg: qpsAvg, + }, + TPS: service.OpsRateSummary{ + Current: tpsCurrent, + Peak: tpsPeak, + Avg: tpsAvg, + }, + + Duration: duration, + TTFT: ttft, + }, nil +} + +type opsHourlyMetricsRow struct { + bucketStart time.Time + + successCount int64 + errorCountTotal int64 + businessLimitedCount int64 + errorCountSLA int64 + + upstreamErrorCountExcl429529 int64 + upstream429Count int64 + upstream529Count int64 + + tokenConsumed int64 + + durationP50 sql.NullInt64 + durationP90 sql.NullInt64 + durationP95 sql.NullInt64 + durationP99 sql.NullInt64 + durationAvg sql.NullFloat64 + durationMax sql.NullInt64 + + ttftP50 sql.NullInt64 + ttftP90 sql.NullInt64 + ttftP95 sql.NullInt64 + ttftP99 sql.NullInt64 + ttftAvg sql.NullFloat64 + ttftMax sql.NullInt64 +} + +func (r *opsRepository) listHourlyMetricsRows(ctx context.Context, filter *service.OpsDashboardFilter, start, end 
time.Time) ([]opsHourlyMetricsRow, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if start.IsZero() || end.IsZero() || !start.Before(end) { + return []opsHourlyMetricsRow{}, nil + } + + where := "bucket_start >= $1 AND bucket_start < $2" + args := []any{start.UTC(), end.UTC()} + idx := 3 + + platform := "" + groupID := (*int64)(nil) + if filter != nil { + platform = strings.TrimSpace(strings.ToLower(filter.Platform)) + groupID = filter.GroupID + } + + switch { + case groupID != nil && *groupID > 0: + where += fmt.Sprintf(" AND group_id = $%d", idx) + args = append(args, *groupID) + idx++ + if platform != "" { + where += fmt.Sprintf(" AND platform = $%d", idx) + args = append(args, platform) + idx++ + } + case platform != "": + where += fmt.Sprintf(" AND platform = $%d AND group_id IS NULL", idx) + args = append(args, platform) + idx++ + default: + where += " AND platform IS NULL AND group_id IS NULL" + } + + q := ` +SELECT + bucket_start, + success_count, + error_count_total, + business_limited_count, + error_count_sla, + upstream_error_count_excl_429_529, + upstream_429_count, + upstream_529_count, + token_consumed, + duration_p50_ms, + duration_p90_ms, + duration_p95_ms, + duration_p99_ms, + duration_avg_ms, + duration_max_ms, + ttft_p50_ms, + ttft_p90_ms, + ttft_p95_ms, + ttft_p99_ms, + ttft_avg_ms, + ttft_max_ms +FROM ops_metrics_hourly +WHERE ` + where + ` +ORDER BY bucket_start ASC` + + rows, err := r.db.QueryContext(ctx, q, args...) 
+ if err != nil { + return nil, err + } + defer rows.Close() + + out := make([]opsHourlyMetricsRow, 0, 64) + for rows.Next() { + var row opsHourlyMetricsRow + if err := rows.Scan( + &row.bucketStart, + &row.successCount, + &row.errorCountTotal, + &row.businessLimitedCount, + &row.errorCountSLA, + &row.upstreamErrorCountExcl429529, + &row.upstream429Count, + &row.upstream529Count, + &row.tokenConsumed, + &row.durationP50, + &row.durationP90, + &row.durationP95, + &row.durationP99, + &row.durationAvg, + &row.durationMax, + &row.ttftP50, + &row.ttftP90, + &row.ttftP95, + &row.ttftP99, + &row.ttftAvg, + &row.ttftMax, + ); err != nil { + return nil, err + } + out = append(out, row) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + +func aggregateHourlyRows(rows []opsHourlyMetricsRow) opsDashboardPartial { + out := opsDashboardPartial{} + if len(rows) == 0 { + return out + } + + var ( + p50Sum float64 + p50W int64 + p90Sum float64 + p90W int64 + avgSum float64 + avgW int64 + ) + var ( + ttftP50Sum float64 + ttftP50W int64 + ttftP90Sum float64 + ttftP90W int64 + ttftAvgSum float64 + ttftAvgW int64 + ) + + var ( + p95Max *int + p99Max *int + maxMax *int + + ttftP95Max *int + ttftP99Max *int + ttftMaxMax *int + ) + + for _, row := range rows { + out.successCount += row.successCount + out.errorCountTotal += row.errorCountTotal + out.businessLimitedCount += row.businessLimitedCount + out.errorCountSLA += row.errorCountSLA + + out.upstreamErrorCountExcl429529 += row.upstreamErrorCountExcl429529 + out.upstream429Count += row.upstream429Count + out.upstream529Count += row.upstream529Count + + out.tokenConsumed += row.tokenConsumed + + if row.successCount > 0 { + if row.durationP50.Valid { + p50Sum += float64(row.durationP50.Int64) * float64(row.successCount) + p50W += row.successCount + } + if row.durationP90.Valid { + p90Sum += float64(row.durationP90.Int64) * float64(row.successCount) + p90W += row.successCount + } + if row.durationAvg.Valid 
{ + avgSum += row.durationAvg.Float64 * float64(row.successCount) + avgW += row.successCount + } + if row.ttftP50.Valid { + ttftP50Sum += float64(row.ttftP50.Int64) * float64(row.successCount) + ttftP50W += row.successCount + } + if row.ttftP90.Valid { + ttftP90Sum += float64(row.ttftP90.Int64) * float64(row.successCount) + ttftP90W += row.successCount + } + if row.ttftAvg.Valid { + ttftAvgSum += row.ttftAvg.Float64 * float64(row.successCount) + ttftAvgW += row.successCount + } + } + + if row.durationP95.Valid { + v := int(row.durationP95.Int64) + if p95Max == nil || v > *p95Max { + p95Max = &v + } + } + if row.durationP99.Valid { + v := int(row.durationP99.Int64) + if p99Max == nil || v > *p99Max { + p99Max = &v + } + } + if row.durationMax.Valid { + v := int(row.durationMax.Int64) + if maxMax == nil || v > *maxMax { + maxMax = &v + } + } + + if row.ttftP95.Valid { + v := int(row.ttftP95.Int64) + if ttftP95Max == nil || v > *ttftP95Max { + ttftP95Max = &v + } + } + if row.ttftP99.Valid { + v := int(row.ttftP99.Int64) + if ttftP99Max == nil || v > *ttftP99Max { + ttftP99Max = &v + } + } + if row.ttftMax.Valid { + v := int(row.ttftMax.Int64) + if ttftMaxMax == nil || v > *ttftMaxMax { + ttftMaxMax = &v + } + } + } + + // duration + if p50W > 0 { + v := int(math.Round(p50Sum / float64(p50W))) + out.duration.P50 = &v + } + if p90W > 0 { + v := int(math.Round(p90Sum / float64(p90W))) + out.duration.P90 = &v + } + out.duration.P95 = p95Max + out.duration.P99 = p99Max + if avgW > 0 { + v := int(math.Round(avgSum / float64(avgW))) + out.duration.Avg = &v + } + out.duration.Max = maxMax + + // ttft + if ttftP50W > 0 { + v := int(math.Round(ttftP50Sum / float64(ttftP50W))) + out.ttft.P50 = &v + } + if ttftP90W > 0 { + v := int(math.Round(ttftP90Sum / float64(ttftP90W))) + out.ttft.P90 = &v + } + out.ttft.P95 = ttftP95Max + out.ttft.P99 = ttftP99Max + if ttftAvgW > 0 { + v := int(math.Round(ttftAvgSum / float64(ttftAvgW))) + out.ttft.Avg = &v + } + out.ttft.Max = ttftMaxMax 
+ + return out +} + +func (r *opsRepository) queryRawPartial(ctx context.Context, filter *service.OpsDashboardFilter, start, end time.Time) (*opsDashboardPartial, error) { + successCount, tokenConsumed, err := r.queryUsageCounts(ctx, filter, start, end) + if err != nil { + return nil, err + } + + duration, ttft, err := r.queryUsageLatency(ctx, filter, start, end) + if err != nil { + return nil, err + } + + errorTotal, businessLimited, errorCountSLA, upstreamExcl, upstream429, upstream529, err := r.queryErrorCounts(ctx, filter, start, end) + if err != nil { + return nil, err + } + + return &opsDashboardPartial{ + successCount: successCount, + errorCountTotal: errorTotal, + businessLimitedCount: businessLimited, + errorCountSLA: errorCountSLA, + upstreamErrorCountExcl429529: upstreamExcl, + upstream429Count: upstream429, + upstream529Count: upstream529, + tokenConsumed: tokenConsumed, + duration: duration, + ttft: ttft, + }, nil +} + +func (r *opsRepository) rawOpsDataExists(ctx context.Context, filter *service.OpsDashboardFilter, start, end time.Time) (bool, error) { + { + join, where, args, _ := buildUsageWhere(filter, start, end, 1) + q := `SELECT EXISTS(SELECT 1 FROM usage_logs ul ` + join + ` ` + where + ` LIMIT 1)` + var exists bool + if err := r.db.QueryRowContext(ctx, q, args...).Scan(&exists); err != nil { + return false, err + } + if exists { + return true, nil + } + } + + { + where, args, _ := buildErrorWhere(filter, start, end, 1) + q := `SELECT EXISTS(SELECT 1 FROM ops_error_logs ` + where + ` LIMIT 1)` + var exists bool + if err := r.db.QueryRowContext(ctx, q, args...).Scan(&exists); err != nil { + return false, err + } + return exists, nil + } +} + +type opsPercentileSegment struct { + weight int64 + p service.OpsPercentiles +} + +func combineApproxPercentiles(segments []opsPercentileSegment) service.OpsPercentiles { + weightedInt := func(get func(service.OpsPercentiles) *int) *int { + var sum float64 + var w int64 + for _, seg := range segments { + if 
seg.weight <= 0 { + continue + } + v := get(seg.p) + if v == nil { + continue + } + sum += float64(*v) * float64(seg.weight) + w += seg.weight + } + if w <= 0 { + return nil + } + out := int(math.Round(sum / float64(w))) + return &out + } + + maxInt := func(get func(service.OpsPercentiles) *int) *int { + var max *int + for _, seg := range segments { + v := get(seg.p) + if v == nil { + continue + } + if max == nil || *v > *max { + c := *v + max = &c + } + } + return max + } + + return service.OpsPercentiles{ + P50: weightedInt(func(p service.OpsPercentiles) *int { return p.P50 }), + P90: weightedInt(func(p service.OpsPercentiles) *int { return p.P90 }), + P95: maxInt(func(p service.OpsPercentiles) *int { return p.P95 }), + P99: maxInt(func(p service.OpsPercentiles) *int { return p.P99 }), + Avg: weightedInt(func(p service.OpsPercentiles) *int { return p.Avg }), + Max: maxInt(func(p service.OpsPercentiles) *int { return p.Max }), + } +} + +func preaggSafeEnd(endTime time.Time) time.Time { + now := time.Now().UTC() + cutoff := now.Add(-5 * time.Minute) + if endTime.After(cutoff) { + return cutoff + } + return endTime +} + +func utcCeilToHour(t time.Time) time.Time { + u := t.UTC() + f := u.Truncate(time.Hour) + if f.Equal(u) { + return f + } + return f.Add(time.Hour) +} + +func utcFloorToHour(t time.Time) time.Time { + return t.UTC().Truncate(time.Hour) +} + +func minTime(a, b time.Time) time.Time { + if a.Before(b) { + return a + } + return b +} + +func maxTime(a, b time.Time) time.Time { + if a.After(b) { + return a + } + return b +} + +func (r *opsRepository) queryUsageCounts(ctx context.Context, filter *service.OpsDashboardFilter, start, end time.Time) (successCount int64, tokenConsumed int64, err error) { + join, where, args, _ := buildUsageWhere(filter, start, end, 1) + + q := ` +SELECT + COALESCE(COUNT(*), 0) AS success_count, + COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS token_consumed +FROM usage_logs ul +` + 
join + ` +` + where + + var tokens sql.NullInt64 + if err := r.db.QueryRowContext(ctx, q, args...).Scan(&successCount, &tokens); err != nil { + return 0, 0, err + } + if tokens.Valid { + tokenConsumed = tokens.Int64 + } + return successCount, tokenConsumed, nil +} + +func (r *opsRepository) queryUsageLatency(ctx context.Context, filter *service.OpsDashboardFilter, start, end time.Time) (duration service.OpsPercentiles, ttft service.OpsPercentiles, err error) { + { + join, where, args, _ := buildUsageWhere(filter, start, end, 1) + q := ` +SELECT + percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) AS p50, + percentile_cont(0.90) WITHIN GROUP (ORDER BY duration_ms) AS p90, + percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) AS p95, + percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) AS p99, + AVG(duration_ms) AS avg_ms, + MAX(duration_ms) AS max_ms +FROM usage_logs ul +` + join + ` +` + where + ` +AND duration_ms IS NOT NULL` + + var p50, p90, p95, p99 sql.NullFloat64 + var avg sql.NullFloat64 + var max sql.NullInt64 + if err := r.db.QueryRowContext(ctx, q, args...).Scan(&p50, &p90, &p95, &p99, &avg, &max); err != nil { + return service.OpsPercentiles{}, service.OpsPercentiles{}, err + } + duration.P50 = floatToIntPtr(p50) + duration.P90 = floatToIntPtr(p90) + duration.P95 = floatToIntPtr(p95) + duration.P99 = floatToIntPtr(p99) + duration.Avg = floatToIntPtr(avg) + if max.Valid { + v := int(max.Int64) + duration.Max = &v + } + } + + { + join, where, args, _ := buildUsageWhere(filter, start, end, 1) + q := ` +SELECT + percentile_cont(0.50) WITHIN GROUP (ORDER BY first_token_ms) AS p50, + percentile_cont(0.90) WITHIN GROUP (ORDER BY first_token_ms) AS p90, + percentile_cont(0.95) WITHIN GROUP (ORDER BY first_token_ms) AS p95, + percentile_cont(0.99) WITHIN GROUP (ORDER BY first_token_ms) AS p99, + AVG(first_token_ms) AS avg_ms, + MAX(first_token_ms) AS max_ms +FROM usage_logs ul +` + join + ` +` + where + ` +AND first_token_ms IS NOT NULL` + + 
var p50, p90, p95, p99 sql.NullFloat64 + var avg sql.NullFloat64 + var max sql.NullInt64 + if err := r.db.QueryRowContext(ctx, q, args...).Scan(&p50, &p90, &p95, &p99, &avg, &max); err != nil { + return service.OpsPercentiles{}, service.OpsPercentiles{}, err + } + ttft.P50 = floatToIntPtr(p50) + ttft.P90 = floatToIntPtr(p90) + ttft.P95 = floatToIntPtr(p95) + ttft.P99 = floatToIntPtr(p99) + ttft.Avg = floatToIntPtr(avg) + if max.Valid { + v := int(max.Int64) + ttft.Max = &v + } + } + + return duration, ttft, nil +} + +func (r *opsRepository) queryErrorCounts(ctx context.Context, filter *service.OpsDashboardFilter, start, end time.Time) ( + errorTotal int64, + businessLimited int64, + errorCountSLA int64, + upstreamExcl429529 int64, + upstream429 int64, + upstream529 int64, + err error, +) { + where, args, _ := buildErrorWhere(filter, start, end, 1) + + q := ` +SELECT + COALESCE(COUNT(*), 0) AS error_total, + COALESCE(COUNT(*) FILTER (WHERE is_business_limited), 0) AS business_limited, + COALESCE(COUNT(*) FILTER (WHERE NOT is_business_limited), 0) AS error_sla, + COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl, + COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 429), 0) AS upstream_429, + COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 529), 0) AS upstream_529 +FROM ops_error_logs +` + where + + if err := r.db.QueryRowContext(ctx, q, args...).Scan( + &errorTotal, + &businessLimited, + &errorCountSLA, + &upstreamExcl429529, + &upstream429, + &upstream529, + ); err != nil { + return 0, 0, 0, 0, 0, 0, err + } + return errorTotal, businessLimited, errorCountSLA, upstreamExcl429529, upstream429, upstream529, nil +} + +func (r *opsRepository) queryCurrentRates(ctx context.Context, filter *service.OpsDashboardFilter, end time.Time) 
(qpsCurrent float64, tpsCurrent float64, err error) { + windowStart := end.Add(-1 * time.Minute) + + successCount1m, token1m, err := r.queryUsageCounts(ctx, filter, windowStart, end) + if err != nil { + return 0, 0, err + } + errorCount1m, _, _, _, _, _, err := r.queryErrorCounts(ctx, filter, windowStart, end) + if err != nil { + return 0, 0, err + } + + qpsCurrent = roundTo1DP(float64(successCount1m+errorCount1m) / 60.0) + tpsCurrent = roundTo1DP(float64(token1m) / 60.0) + return qpsCurrent, tpsCurrent, nil +} + +func (r *opsRepository) queryPeakQPS(ctx context.Context, filter *service.OpsDashboardFilter, start, end time.Time) (float64, error) { + usageJoin, usageWhere, usageArgs, next := buildUsageWhere(filter, start, end, 1) + errorWhere, errorArgs, _ := buildErrorWhere(filter, start, end, next) + + q := ` +WITH usage_buckets AS ( + SELECT date_trunc('minute', ul.created_at) AS bucket, COUNT(*) AS cnt + FROM usage_logs ul + ` + usageJoin + ` + ` + usageWhere + ` + GROUP BY 1 +), +error_buckets AS ( + SELECT date_trunc('minute', created_at) AS bucket, COUNT(*) AS cnt + FROM ops_error_logs + ` + errorWhere + ` + GROUP BY 1 +), +combined AS ( + SELECT COALESCE(u.bucket, e.bucket) AS bucket, + COALESCE(u.cnt, 0) + COALESCE(e.cnt, 0) AS total + FROM usage_buckets u + FULL OUTER JOIN error_buckets e ON u.bucket = e.bucket +) +SELECT COALESCE(MAX(total), 0) FROM combined` + + args := append(usageArgs, errorArgs...) 
+ + var maxPerMinute sql.NullInt64 + if err := r.db.QueryRowContext(ctx, q, args...).Scan(&maxPerMinute); err != nil { + return 0, err + } + if !maxPerMinute.Valid || maxPerMinute.Int64 <= 0 { + return 0, nil + } + return roundTo1DP(float64(maxPerMinute.Int64) / 60.0), nil +} + +func (r *opsRepository) queryPeakTPS(ctx context.Context, filter *service.OpsDashboardFilter, start, end time.Time) (float64, error) { + join, where, args, _ := buildUsageWhere(filter, start, end, 1) + + q := ` +SELECT COALESCE(MAX(tokens_per_min), 0) +FROM ( + SELECT + date_trunc('minute', ul.created_at) AS bucket, + COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS tokens_per_min + FROM usage_logs ul + ` + join + ` + ` + where + ` + GROUP BY 1 +) t` + + var maxPerMinute sql.NullInt64 + if err := r.db.QueryRowContext(ctx, q, args...).Scan(&maxPerMinute); err != nil { + return 0, err + } + if !maxPerMinute.Valid || maxPerMinute.Int64 <= 0 { + return 0, nil + } + return roundTo1DP(float64(maxPerMinute.Int64) / 60.0), nil +} + +func buildUsageWhere(filter *service.OpsDashboardFilter, start, end time.Time, startIndex int) (join string, where string, args []any, nextIndex int) { + platform := "" + groupID := (*int64)(nil) + if filter != nil { + platform = strings.TrimSpace(strings.ToLower(filter.Platform)) + groupID = filter.GroupID + } + + idx := startIndex + clauses := make([]string, 0, 4) + args = make([]any, 0, 4) + + args = append(args, start) + clauses = append(clauses, fmt.Sprintf("ul.created_at >= $%d", idx)) + idx++ + args = append(args, end) + clauses = append(clauses, fmt.Sprintf("ul.created_at < $%d", idx)) + idx++ + + if groupID != nil && *groupID > 0 { + args = append(args, *groupID) + clauses = append(clauses, fmt.Sprintf("ul.group_id = $%d", idx)) + idx++ + } + if platform != "" { + // Prefer group.platform when available; fall back to account.platform so we don't + // drop rows where group_id is NULL. 
+ join = "LEFT JOIN groups g ON g.id = ul.group_id LEFT JOIN accounts a ON a.id = ul.account_id" + args = append(args, platform) + clauses = append(clauses, fmt.Sprintf("COALESCE(NULLIF(g.platform,''), a.platform) = $%d", idx)) + idx++ + } + + where = "WHERE " + strings.Join(clauses, " AND ") + return join, where, args, idx +} + +func buildErrorWhere(filter *service.OpsDashboardFilter, start, end time.Time, startIndex int) (where string, args []any, nextIndex int) { + platform := "" + groupID := (*int64)(nil) + if filter != nil { + platform = strings.TrimSpace(strings.ToLower(filter.Platform)) + groupID = filter.GroupID + } + + idx := startIndex + clauses := make([]string, 0, 4) + args = make([]any, 0, 4) + + args = append(args, start) + clauses = append(clauses, fmt.Sprintf("created_at >= $%d", idx)) + idx++ + args = append(args, end) + clauses = append(clauses, fmt.Sprintf("created_at < $%d", idx)) + idx++ + + if groupID != nil && *groupID > 0 { + args = append(args, *groupID) + clauses = append(clauses, fmt.Sprintf("group_id = $%d", idx)) + idx++ + } + if platform != "" { + args = append(args, platform) + clauses = append(clauses, fmt.Sprintf("platform = $%d", idx)) + idx++ + } + + where = "WHERE " + strings.Join(clauses, " AND ") + return where, args, idx +} + +func floatToIntPtr(v sql.NullFloat64) *int { + if !v.Valid { + return nil + } + n := int(math.Round(v.Float64)) + return &n +} + +func safeDivideFloat64(numerator float64, denominator float64) float64 { + if denominator == 0 { + return 0 + } + return numerator / denominator +} + +func roundTo1DP(v float64) float64 { + return math.Round(v*10) / 10 +} + +func roundTo4DP(v float64) float64 { + return math.Round(v*10000) / 10000 +} diff --git a/backend/internal/repository/ops_repo_histograms.go b/backend/internal/repository/ops_repo_histograms.go new file mode 100644 index 00000000..143c7e83 --- /dev/null +++ b/backend/internal/repository/ops_repo_histograms.go @@ -0,0 +1,79 @@ +package repository + +import 
( + "context" + "fmt" + "strings" + + "github.com/Wei-Shaw/sub2api/internal/service" +) + +func (r *opsRepository) GetLatencyHistogram(ctx context.Context, filter *service.OpsDashboardFilter) (*service.OpsLatencyHistogramResponse, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if filter == nil { + return nil, fmt.Errorf("nil filter") + } + if filter.StartTime.IsZero() || filter.EndTime.IsZero() { + return nil, fmt.Errorf("start_time/end_time required") + } + + start := filter.StartTime.UTC() + end := filter.EndTime.UTC() + + join, where, args, _ := buildUsageWhere(filter, start, end, 1) + rangeExpr := latencyHistogramRangeCaseExpr("ul.duration_ms") + orderExpr := latencyHistogramRangeOrderCaseExpr("ul.duration_ms") + + q := ` +SELECT + ` + rangeExpr + ` AS range, + COALESCE(COUNT(*), 0) AS count, + ` + orderExpr + ` AS ord +FROM usage_logs ul +` + join + ` +` + where + ` +AND ul.duration_ms IS NOT NULL +GROUP BY 1, 3 +ORDER BY 3 ASC` + + rows, err := r.db.QueryContext(ctx, q, args...) 
+ if err != nil { + return nil, err + } + defer rows.Close() + + counts := make(map[string]int64, len(latencyHistogramOrderedRanges)) + var total int64 + for rows.Next() { + var label string + var count int64 + var _ord int + if err := rows.Scan(&label, &count, &_ord); err != nil { + return nil, err + } + counts[label] = count + total += count + } + if err := rows.Err(); err != nil { + return nil, err + } + + buckets := make([]*service.OpsLatencyHistogramBucket, 0, len(latencyHistogramOrderedRanges)) + for _, label := range latencyHistogramOrderedRanges { + buckets = append(buckets, &service.OpsLatencyHistogramBucket{ + Range: label, + Count: counts[label], + }) + } + + return &service.OpsLatencyHistogramResponse{ + StartTime: start, + EndTime: end, + Platform: strings.TrimSpace(filter.Platform), + GroupID: filter.GroupID, + TotalRequests: total, + Buckets: buckets, + }, nil +} diff --git a/backend/internal/repository/ops_repo_latency_histogram_buckets.go b/backend/internal/repository/ops_repo_latency_histogram_buckets.go new file mode 100644 index 00000000..fc085fc6 --- /dev/null +++ b/backend/internal/repository/ops_repo_latency_histogram_buckets.go @@ -0,0 +1,64 @@ +package repository + +import ( + "fmt" + "strings" +) + +type latencyHistogramBucket struct { + upperMs int + label string +} + +var latencyHistogramBuckets = []latencyHistogramBucket{ + {upperMs: 100, label: "0-100ms"}, + {upperMs: 200, label: "100-200ms"}, + {upperMs: 500, label: "200-500ms"}, + {upperMs: 1000, label: "500-1000ms"}, + {upperMs: 2000, label: "1000-2000ms"}, + {upperMs: 0, label: "2000ms+"}, // default bucket +} + +var latencyHistogramOrderedRanges = func() []string { + out := make([]string, 0, len(latencyHistogramBuckets)) + for _, b := range latencyHistogramBuckets { + out = append(out, b.label) + } + return out +}() + +func latencyHistogramRangeCaseExpr(column string) string { + var sb strings.Builder + sb.WriteString("CASE\n") + + for _, b := range latencyHistogramBuckets { + if 
b.upperMs <= 0 { + continue + } + sb.WriteString(fmt.Sprintf("\tWHEN %s < %d THEN '%s'\n", column, b.upperMs, b.label)) + } + + // Default bucket. + last := latencyHistogramBuckets[len(latencyHistogramBuckets)-1] + sb.WriteString(fmt.Sprintf("\tELSE '%s'\n", last.label)) + sb.WriteString("END") + return sb.String() +} + +func latencyHistogramRangeOrderCaseExpr(column string) string { + var sb strings.Builder + sb.WriteString("CASE\n") + + order := 1 + for _, b := range latencyHistogramBuckets { + if b.upperMs <= 0 { + continue + } + sb.WriteString(fmt.Sprintf("\tWHEN %s < %d THEN %d\n", column, b.upperMs, order)) + order++ + } + + sb.WriteString(fmt.Sprintf("\tELSE %d\n", order)) + sb.WriteString("END") + return sb.String() +} diff --git a/backend/internal/repository/ops_repo_latency_histogram_buckets_test.go b/backend/internal/repository/ops_repo_latency_histogram_buckets_test.go new file mode 100644 index 00000000..dc79f6cc --- /dev/null +++ b/backend/internal/repository/ops_repo_latency_histogram_buckets_test.go @@ -0,0 +1,14 @@ +package repository + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLatencyHistogramBuckets_AreConsistent(t *testing.T) { + require.Equal(t, len(latencyHistogramBuckets), len(latencyHistogramOrderedRanges)) + for i, b := range latencyHistogramBuckets { + require.Equal(t, b.label, latencyHistogramOrderedRanges[i]) + } +} diff --git a/backend/internal/repository/ops_repo_metrics.go b/backend/internal/repository/ops_repo_metrics.go new file mode 100644 index 00000000..96bad88a --- /dev/null +++ b/backend/internal/repository/ops_repo_metrics.go @@ -0,0 +1,401 @@ +package repository + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/Wei-Shaw/sub2api/internal/service" +) + +func (r *opsRepository) InsertSystemMetrics(ctx context.Context, input *service.OpsInsertSystemMetricsInput) error { + if r == nil || r.db == nil { + return fmt.Errorf("nil ops repository") + } + if input == nil { + 
return fmt.Errorf("nil input") + } + + window := input.WindowMinutes + if window <= 0 { + window = 1 + } + createdAt := input.CreatedAt + if createdAt.IsZero() { + createdAt = time.Now().UTC() + } + + q := ` +INSERT INTO ops_system_metrics ( + created_at, + window_minutes, + platform, + group_id, + + success_count, + error_count_total, + business_limited_count, + error_count_sla, + + upstream_error_count_excl_429_529, + upstream_429_count, + upstream_529_count, + + token_consumed, + qps, + tps, + + duration_p50_ms, + duration_p90_ms, + duration_p95_ms, + duration_p99_ms, + duration_avg_ms, + duration_max_ms, + + ttft_p50_ms, + ttft_p90_ms, + ttft_p95_ms, + ttft_p99_ms, + ttft_avg_ms, + ttft_max_ms, + + cpu_usage_percent, + memory_used_mb, + memory_total_mb, + memory_usage_percent, + + db_ok, + redis_ok, + + db_conn_active, + db_conn_idle, + db_conn_waiting, + + goroutine_count, + concurrency_queue_depth +) VALUES ( + $1,$2,$3,$4, + $5,$6,$7,$8, + $9,$10,$11, + $12,$13,$14, + $15,$16,$17,$18,$19,$20, + $21,$22,$23,$24,$25,$26, + $27,$28,$29,$30, + $31,$32, + $33,$34,$35, + $36,$37 +)` + + _, err := r.db.ExecContext( + ctx, + q, + createdAt, + window, + opsNullString(input.Platform), + opsNullInt64(input.GroupID), + + input.SuccessCount, + input.ErrorCountTotal, + input.BusinessLimitedCount, + input.ErrorCountSLA, + + input.UpstreamErrorCountExcl429529, + input.Upstream429Count, + input.Upstream529Count, + + input.TokenConsumed, + opsNullFloat64(input.QPS), + opsNullFloat64(input.TPS), + + opsNullInt(input.DurationP50Ms), + opsNullInt(input.DurationP90Ms), + opsNullInt(input.DurationP95Ms), + opsNullInt(input.DurationP99Ms), + opsNullFloat64(input.DurationAvgMs), + opsNullInt(input.DurationMaxMs), + + opsNullInt(input.TTFTP50Ms), + opsNullInt(input.TTFTP90Ms), + opsNullInt(input.TTFTP95Ms), + opsNullInt(input.TTFTP99Ms), + opsNullFloat64(input.TTFTAvgMs), + opsNullInt(input.TTFTMaxMs), + + opsNullFloat64(input.CPUUsagePercent), + opsNullInt(input.MemoryUsedMB), + 
opsNullInt(input.MemoryTotalMB), + opsNullFloat64(input.MemoryUsagePercent), + + opsNullBool(input.DBOK), + opsNullBool(input.RedisOK), + + opsNullInt(input.DBConnActive), + opsNullInt(input.DBConnIdle), + opsNullInt(input.DBConnWaiting), + + opsNullInt(input.GoroutineCount), + opsNullInt(input.ConcurrencyQueueDepth), + ) + return err +} + +func (r *opsRepository) GetLatestSystemMetrics(ctx context.Context, windowMinutes int) (*service.OpsSystemMetricsSnapshot, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if windowMinutes <= 0 { + windowMinutes = 1 + } + + q := ` +SELECT + id, + created_at, + window_minutes, + + cpu_usage_percent, + memory_used_mb, + memory_total_mb, + memory_usage_percent, + + db_ok, + redis_ok, + + db_conn_active, + db_conn_idle, + db_conn_waiting, + + goroutine_count, + concurrency_queue_depth +FROM ops_system_metrics +WHERE window_minutes = $1 + AND platform IS NULL + AND group_id IS NULL +ORDER BY created_at DESC +LIMIT 1` + + var out service.OpsSystemMetricsSnapshot + var cpu sql.NullFloat64 + var memUsed sql.NullInt64 + var memTotal sql.NullInt64 + var memPct sql.NullFloat64 + var dbOK sql.NullBool + var redisOK sql.NullBool + var dbActive sql.NullInt64 + var dbIdle sql.NullInt64 + var dbWaiting sql.NullInt64 + var goroutines sql.NullInt64 + var queueDepth sql.NullInt64 + + if err := r.db.QueryRowContext(ctx, q, windowMinutes).Scan( + &out.ID, + &out.CreatedAt, + &out.WindowMinutes, + &cpu, + &memUsed, + &memTotal, + &memPct, + &dbOK, + &redisOK, + &dbActive, + &dbIdle, + &dbWaiting, + &goroutines, + &queueDepth, + ); err != nil { + return nil, err + } + + if cpu.Valid { + v := cpu.Float64 + out.CPUUsagePercent = &v + } + if memUsed.Valid { + v := memUsed.Int64 + out.MemoryUsedMB = &v + } + if memTotal.Valid { + v := memTotal.Int64 + out.MemoryTotalMB = &v + } + if memPct.Valid { + v := memPct.Float64 + out.MemoryUsagePercent = &v + } + if dbOK.Valid { + v := dbOK.Bool + out.DBOK = &v + } + if 
redisOK.Valid { + v := redisOK.Bool + out.RedisOK = &v + } + if dbActive.Valid { + v := int(dbActive.Int64) + out.DBConnActive = &v + } + if dbIdle.Valid { + v := int(dbIdle.Int64) + out.DBConnIdle = &v + } + if dbWaiting.Valid { + v := int(dbWaiting.Int64) + out.DBConnWaiting = &v + } + if goroutines.Valid { + v := int(goroutines.Int64) + out.GoroutineCount = &v + } + if queueDepth.Valid { + v := int(queueDepth.Int64) + out.ConcurrencyQueueDepth = &v + } + + return &out, nil +} + +func (r *opsRepository) UpsertJobHeartbeat(ctx context.Context, input *service.OpsUpsertJobHeartbeatInput) error { + if r == nil || r.db == nil { + return fmt.Errorf("nil ops repository") + } + if input == nil { + return fmt.Errorf("nil input") + } + if input.JobName == "" { + return fmt.Errorf("job_name required") + } + + q := ` +INSERT INTO ops_job_heartbeats ( + job_name, + last_run_at, + last_success_at, + last_error_at, + last_error, + last_duration_ms, + updated_at +) VALUES ( + $1,$2,$3,$4,$5,$6,NOW() +) +ON CONFLICT (job_name) DO UPDATE SET + last_run_at = COALESCE(EXCLUDED.last_run_at, ops_job_heartbeats.last_run_at), + last_success_at = COALESCE(EXCLUDED.last_success_at, ops_job_heartbeats.last_success_at), + last_error_at = CASE + WHEN EXCLUDED.last_success_at IS NOT NULL THEN NULL + ELSE COALESCE(EXCLUDED.last_error_at, ops_job_heartbeats.last_error_at) + END, + last_error = CASE + WHEN EXCLUDED.last_success_at IS NOT NULL THEN NULL + ELSE COALESCE(EXCLUDED.last_error, ops_job_heartbeats.last_error) + END, + last_duration_ms = COALESCE(EXCLUDED.last_duration_ms, ops_job_heartbeats.last_duration_ms), + updated_at = NOW()` + + _, err := r.db.ExecContext( + ctx, + q, + input.JobName, + opsNullTime(input.LastRunAt), + opsNullTime(input.LastSuccessAt), + opsNullTime(input.LastErrorAt), + opsNullString(input.LastError), + opsNullInt(input.LastDurationMs), + ) + return err +} + +func (r *opsRepository) ListJobHeartbeats(ctx context.Context) ([]*service.OpsJobHeartbeat, error) { + if 
r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + + q := ` +SELECT + job_name, + last_run_at, + last_success_at, + last_error_at, + last_error, + last_duration_ms, + updated_at +FROM ops_job_heartbeats +ORDER BY job_name ASC` + + rows, err := r.db.QueryContext(ctx, q) + if err != nil { + return nil, err + } + defer rows.Close() + + out := make([]*service.OpsJobHeartbeat, 0, 8) + for rows.Next() { + var item service.OpsJobHeartbeat + var lastRun sql.NullTime + var lastSuccess sql.NullTime + var lastErrorAt sql.NullTime + var lastError sql.NullString + var lastDuration sql.NullInt64 + + if err := rows.Scan( + &item.JobName, + &lastRun, + &lastSuccess, + &lastErrorAt, + &lastError, + &lastDuration, + &item.UpdatedAt, + ); err != nil { + return nil, err + } + + if lastRun.Valid { + v := lastRun.Time + item.LastRunAt = &v + } + if lastSuccess.Valid { + v := lastSuccess.Time + item.LastSuccessAt = &v + } + if lastErrorAt.Valid { + v := lastErrorAt.Time + item.LastErrorAt = &v + } + if lastError.Valid { + v := lastError.String + item.LastError = &v + } + if lastDuration.Valid { + v := lastDuration.Int64 + item.LastDurationMs = &v + } + + out = append(out, &item) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + +func opsNullBool(v *bool) any { + if v == nil { + return sql.NullBool{} + } + return sql.NullBool{Bool: *v, Valid: true} +} + +func opsNullFloat64(v *float64) any { + if v == nil { + return sql.NullFloat64{} + } + return sql.NullFloat64{Float64: *v, Valid: true} +} + +func opsNullTime(v *time.Time) any { + if v == nil || v.IsZero() { + return sql.NullTime{} + } + return sql.NullTime{Time: *v, Valid: true} +} + diff --git a/backend/internal/repository/ops_repo_preagg.go b/backend/internal/repository/ops_repo_preagg.go new file mode 100644 index 00000000..6a8b9184 --- /dev/null +++ b/backend/internal/repository/ops_repo_preagg.go @@ -0,0 +1,359 @@ +package repository + +import ( + "context" + 
"database/sql" + "fmt" + "time" +) + +func (r *opsRepository) UpsertHourlyMetrics(ctx context.Context, startTime, endTime time.Time) error { + if r == nil || r.db == nil { + return fmt.Errorf("nil ops repository") + } + if startTime.IsZero() || endTime.IsZero() || !endTime.After(startTime) { + return nil + } + + start := startTime.UTC() + end := endTime.UTC() + + // NOTE: + // - We aggregate usage_logs + ops_error_logs into ops_metrics_hourly. + // - We emit three dimension granularities via GROUPING SETS: + // 1) overall: (bucket_start) + // 2) platform: (bucket_start, platform) + // 3) group: (bucket_start, platform, group_id) + // + // IMPORTANT: Postgres UNIQUE treats NULLs as distinct, so the table uses a COALESCE-based + // unique index; our ON CONFLICT target must match that expression set. + q := ` +WITH usage_base AS ( + SELECT + date_trunc('hour', ul.created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start, + g.platform AS platform, + ul.group_id AS group_id, + ul.duration_ms AS duration_ms, + ul.first_token_ms AS first_token_ms, + (ul.input_tokens + ul.output_tokens + ul.cache_creation_tokens + ul.cache_read_tokens) AS tokens + FROM usage_logs ul + JOIN groups g ON g.id = ul.group_id + WHERE ul.created_at >= $1 AND ul.created_at < $2 +), +usage_agg AS ( + SELECT + bucket_start, + CASE WHEN GROUPING(platform) = 1 THEN NULL ELSE platform END AS platform, + CASE WHEN GROUPING(group_id) = 1 THEN NULL ELSE group_id END AS group_id, + COUNT(*) AS success_count, + COALESCE(SUM(tokens), 0) AS token_consumed, + + percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) FILTER (WHERE duration_ms IS NOT NULL) AS duration_p50_ms, + percentile_cont(0.90) WITHIN GROUP (ORDER BY duration_ms) FILTER (WHERE duration_ms IS NOT NULL) AS duration_p90_ms, + percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) FILTER (WHERE duration_ms IS NOT NULL) AS duration_p95_ms, + percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) FILTER (WHERE duration_ms IS NOT 
NULL) AS duration_p99_ms, + AVG(duration_ms) FILTER (WHERE duration_ms IS NOT NULL) AS duration_avg_ms, + MAX(duration_ms) AS duration_max_ms, + + percentile_cont(0.50) WITHIN GROUP (ORDER BY first_token_ms) FILTER (WHERE first_token_ms IS NOT NULL) AS ttft_p50_ms, + percentile_cont(0.90) WITHIN GROUP (ORDER BY first_token_ms) FILTER (WHERE first_token_ms IS NOT NULL) AS ttft_p90_ms, + percentile_cont(0.95) WITHIN GROUP (ORDER BY first_token_ms) FILTER (WHERE first_token_ms IS NOT NULL) AS ttft_p95_ms, + percentile_cont(0.99) WITHIN GROUP (ORDER BY first_token_ms) FILTER (WHERE first_token_ms IS NOT NULL) AS ttft_p99_ms, + AVG(first_token_ms) FILTER (WHERE first_token_ms IS NOT NULL) AS ttft_avg_ms, + MAX(first_token_ms) AS ttft_max_ms + FROM usage_base + GROUP BY GROUPING SETS ( + (bucket_start), + (bucket_start, platform), + (bucket_start, platform, group_id) + ) +), +error_base AS ( + SELECT + date_trunc('hour', created_at AT TIME ZONE 'UTC') AT TIME ZONE 'UTC' AS bucket_start, + platform AS platform, + group_id AS group_id, + is_business_limited AS is_business_limited, + error_owner AS error_owner, + status_code AS status_code + FROM ops_error_logs + WHERE created_at >= $1 AND created_at < $2 +), +error_agg AS ( + SELECT + bucket_start, + CASE WHEN GROUPING(platform) = 1 THEN NULL ELSE platform END AS platform, + CASE WHEN GROUPING(group_id) = 1 THEN NULL ELSE group_id END AS group_id, + COUNT(*) AS error_count_total, + COUNT(*) FILTER (WHERE is_business_limited) AS business_limited_count, + COUNT(*) FILTER (WHERE NOT is_business_limited) AS error_count_sla, + COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) NOT IN (429, 529)) AS upstream_error_count_excl_429_529, + COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 429) AS upstream_429_count, + COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 
529) AS upstream_529_count + FROM error_base + GROUP BY GROUPING SETS ( + (bucket_start), + (bucket_start, platform), + (bucket_start, platform, group_id) + ) + HAVING GROUPING(group_id) = 1 OR group_id IS NOT NULL +), +combined AS ( + SELECT + COALESCE(u.bucket_start, e.bucket_start) AS bucket_start, + COALESCE(u.platform, e.platform) AS platform, + COALESCE(u.group_id, e.group_id) AS group_id, + + COALESCE(u.success_count, 0) AS success_count, + COALESCE(e.error_count_total, 0) AS error_count_total, + COALESCE(e.business_limited_count, 0) AS business_limited_count, + COALESCE(e.error_count_sla, 0) AS error_count_sla, + COALESCE(e.upstream_error_count_excl_429_529, 0) AS upstream_error_count_excl_429_529, + COALESCE(e.upstream_429_count, 0) AS upstream_429_count, + COALESCE(e.upstream_529_count, 0) AS upstream_529_count, + + COALESCE(u.token_consumed, 0) AS token_consumed, + + u.duration_p50_ms, + u.duration_p90_ms, + u.duration_p95_ms, + u.duration_p99_ms, + u.duration_avg_ms, + u.duration_max_ms, + + u.ttft_p50_ms, + u.ttft_p90_ms, + u.ttft_p95_ms, + u.ttft_p99_ms, + u.ttft_avg_ms, + u.ttft_max_ms + FROM usage_agg u + FULL OUTER JOIN error_agg e + ON u.bucket_start = e.bucket_start + AND COALESCE(u.platform, '') = COALESCE(e.platform, '') + AND COALESCE(u.group_id, 0) = COALESCE(e.group_id, 0) +) +INSERT INTO ops_metrics_hourly ( + bucket_start, + platform, + group_id, + success_count, + error_count_total, + business_limited_count, + error_count_sla, + upstream_error_count_excl_429_529, + upstream_429_count, + upstream_529_count, + token_consumed, + duration_p50_ms, + duration_p90_ms, + duration_p95_ms, + duration_p99_ms, + duration_avg_ms, + duration_max_ms, + ttft_p50_ms, + ttft_p90_ms, + ttft_p95_ms, + ttft_p99_ms, + ttft_avg_ms, + ttft_max_ms, + computed_at +) +SELECT + bucket_start, + NULLIF(platform, '') AS platform, + group_id, + success_count, + error_count_total, + business_limited_count, + error_count_sla, + upstream_error_count_excl_429_529, + 
upstream_429_count, + upstream_529_count, + token_consumed, + duration_p50_ms::int, + duration_p90_ms::int, + duration_p95_ms::int, + duration_p99_ms::int, + duration_avg_ms, + duration_max_ms::int, + ttft_p50_ms::int, + ttft_p90_ms::int, + ttft_p95_ms::int, + ttft_p99_ms::int, + ttft_avg_ms, + ttft_max_ms::int, + NOW() +FROM combined +WHERE bucket_start IS NOT NULL + AND (platform IS NULL OR platform <> '') +ON CONFLICT (bucket_start, COALESCE(platform, ''), COALESCE(group_id, 0)) DO UPDATE SET + success_count = EXCLUDED.success_count, + error_count_total = EXCLUDED.error_count_total, + business_limited_count = EXCLUDED.business_limited_count, + error_count_sla = EXCLUDED.error_count_sla, + upstream_error_count_excl_429_529 = EXCLUDED.upstream_error_count_excl_429_529, + upstream_429_count = EXCLUDED.upstream_429_count, + upstream_529_count = EXCLUDED.upstream_529_count, + token_consumed = EXCLUDED.token_consumed, + + duration_p50_ms = EXCLUDED.duration_p50_ms, + duration_p90_ms = EXCLUDED.duration_p90_ms, + duration_p95_ms = EXCLUDED.duration_p95_ms, + duration_p99_ms = EXCLUDED.duration_p99_ms, + duration_avg_ms = EXCLUDED.duration_avg_ms, + duration_max_ms = EXCLUDED.duration_max_ms, + + ttft_p50_ms = EXCLUDED.ttft_p50_ms, + ttft_p90_ms = EXCLUDED.ttft_p90_ms, + ttft_p95_ms = EXCLUDED.ttft_p95_ms, + ttft_p99_ms = EXCLUDED.ttft_p99_ms, + ttft_avg_ms = EXCLUDED.ttft_avg_ms, + ttft_max_ms = EXCLUDED.ttft_max_ms, + + computed_at = NOW() +` + + _, err := r.db.ExecContext(ctx, q, start, end) + return err +} + +func (r *opsRepository) UpsertDailyMetrics(ctx context.Context, startTime, endTime time.Time) error { + if r == nil || r.db == nil { + return fmt.Errorf("nil ops repository") + } + if startTime.IsZero() || endTime.IsZero() || !endTime.After(startTime) { + return nil + } + + start := startTime.UTC() + end := endTime.UTC() + + q := ` +INSERT INTO ops_metrics_daily ( + bucket_date, + platform, + group_id, + success_count, + error_count_total, + 
business_limited_count, + error_count_sla, + upstream_error_count_excl_429_529, + upstream_429_count, + upstream_529_count, + token_consumed, + duration_p50_ms, + duration_p90_ms, + duration_p95_ms, + duration_p99_ms, + duration_avg_ms, + duration_max_ms, + ttft_p50_ms, + ttft_p90_ms, + ttft_p95_ms, + ttft_p99_ms, + ttft_avg_ms, + ttft_max_ms, + computed_at +) +SELECT + (bucket_start AT TIME ZONE 'UTC')::date AS bucket_date, + platform, + group_id, + + COALESCE(SUM(success_count), 0) AS success_count, + COALESCE(SUM(error_count_total), 0) AS error_count_total, + COALESCE(SUM(business_limited_count), 0) AS business_limited_count, + COALESCE(SUM(error_count_sla), 0) AS error_count_sla, + COALESCE(SUM(upstream_error_count_excl_429_529), 0) AS upstream_error_count_excl_429_529, + COALESCE(SUM(upstream_429_count), 0) AS upstream_429_count, + COALESCE(SUM(upstream_529_count), 0) AS upstream_529_count, + COALESCE(SUM(token_consumed), 0) AS token_consumed, + + -- Approximation: weighted average for p50/p90, max for p95/p99 (conservative tail). 
+ ROUND(SUM(duration_p50_ms::double precision * success_count) FILTER (WHERE duration_p50_ms IS NOT NULL) + / NULLIF(SUM(success_count) FILTER (WHERE duration_p50_ms IS NOT NULL), 0))::int AS duration_p50_ms, + ROUND(SUM(duration_p90_ms::double precision * success_count) FILTER (WHERE duration_p90_ms IS NOT NULL) + / NULLIF(SUM(success_count) FILTER (WHERE duration_p90_ms IS NOT NULL), 0))::int AS duration_p90_ms, + MAX(duration_p95_ms) AS duration_p95_ms, + MAX(duration_p99_ms) AS duration_p99_ms, + SUM(duration_avg_ms * success_count) FILTER (WHERE duration_avg_ms IS NOT NULL) + / NULLIF(SUM(success_count) FILTER (WHERE duration_avg_ms IS NOT NULL), 0) AS duration_avg_ms, + MAX(duration_max_ms) AS duration_max_ms, + + ROUND(SUM(ttft_p50_ms::double precision * success_count) FILTER (WHERE ttft_p50_ms IS NOT NULL) + / NULLIF(SUM(success_count) FILTER (WHERE ttft_p50_ms IS NOT NULL), 0))::int AS ttft_p50_ms, + ROUND(SUM(ttft_p90_ms::double precision * success_count) FILTER (WHERE ttft_p90_ms IS NOT NULL) + / NULLIF(SUM(success_count) FILTER (WHERE ttft_p90_ms IS NOT NULL), 0))::int AS ttft_p90_ms, + MAX(ttft_p95_ms) AS ttft_p95_ms, + MAX(ttft_p99_ms) AS ttft_p99_ms, + SUM(ttft_avg_ms * success_count) FILTER (WHERE ttft_avg_ms IS NOT NULL) + / NULLIF(SUM(success_count) FILTER (WHERE ttft_avg_ms IS NOT NULL), 0) AS ttft_avg_ms, + MAX(ttft_max_ms) AS ttft_max_ms, + + NOW() +FROM ops_metrics_hourly +WHERE bucket_start >= $1 AND bucket_start < $2 +GROUP BY 1, 2, 3 +ON CONFLICT (bucket_date, COALESCE(platform, ''), COALESCE(group_id, 0)) DO UPDATE SET + success_count = EXCLUDED.success_count, + error_count_total = EXCLUDED.error_count_total, + business_limited_count = EXCLUDED.business_limited_count, + error_count_sla = EXCLUDED.error_count_sla, + upstream_error_count_excl_429_529 = EXCLUDED.upstream_error_count_excl_429_529, + upstream_429_count = EXCLUDED.upstream_429_count, + upstream_529_count = EXCLUDED.upstream_529_count, + token_consumed = EXCLUDED.token_consumed, 
+ + duration_p50_ms = EXCLUDED.duration_p50_ms, + duration_p90_ms = EXCLUDED.duration_p90_ms, + duration_p95_ms = EXCLUDED.duration_p95_ms, + duration_p99_ms = EXCLUDED.duration_p99_ms, + duration_avg_ms = EXCLUDED.duration_avg_ms, + duration_max_ms = EXCLUDED.duration_max_ms, + + ttft_p50_ms = EXCLUDED.ttft_p50_ms, + ttft_p90_ms = EXCLUDED.ttft_p90_ms, + ttft_p95_ms = EXCLUDED.ttft_p95_ms, + ttft_p99_ms = EXCLUDED.ttft_p99_ms, + ttft_avg_ms = EXCLUDED.ttft_avg_ms, + ttft_max_ms = EXCLUDED.ttft_max_ms, + + computed_at = NOW() +` + + _, err := r.db.ExecContext(ctx, q, start, end) + return err +} + +func (r *opsRepository) GetLatestHourlyBucketStart(ctx context.Context) (time.Time, bool, error) { + if r == nil || r.db == nil { + return time.Time{}, false, fmt.Errorf("nil ops repository") + } + + var value sql.NullTime + if err := r.db.QueryRowContext(ctx, `SELECT MAX(bucket_start) FROM ops_metrics_hourly`).Scan(&value); err != nil { + return time.Time{}, false, err + } + if !value.Valid { + return time.Time{}, false, nil + } + return value.Time.UTC(), true, nil +} + +func (r *opsRepository) GetLatestDailyBucketDate(ctx context.Context) (time.Time, bool, error) { + if r == nil || r.db == nil { + return time.Time{}, false, fmt.Errorf("nil ops repository") + } + + var value sql.NullTime + if err := r.db.QueryRowContext(ctx, `SELECT MAX(bucket_date) FROM ops_metrics_daily`).Scan(&value); err != nil { + return time.Time{}, false, err + } + if !value.Valid { + return time.Time{}, false, nil + } + t := value.Time.UTC() + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC), true, nil +} + diff --git a/backend/internal/repository/ops_repo_request_details.go b/backend/internal/repository/ops_repo_request_details.go new file mode 100644 index 00000000..57b93b21 --- /dev/null +++ b/backend/internal/repository/ops_repo_request_details.go @@ -0,0 +1,285 @@ +package repository + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + 
"github.com/Wei-Shaw/sub2api/internal/service" +) + +func (r *opsRepository) ListRequestDetails(ctx context.Context, filter *service.OpsRequestDetailFilter) ([]*service.OpsRequestDetail, int64, error) { + if r == nil || r.db == nil { + return nil, 0, fmt.Errorf("nil ops repository") + } + + page, pageSize, startTime, endTime := filter.Normalize() + offset := (page - 1) * pageSize + + conditions := make([]string, 0, 16) + args := make([]any, 0, 24) + + // Placeholders $1/$2 reserved for time window inside the CTE. + args = append(args, startTime.UTC(), endTime.UTC()) + + addCondition := func(condition string, values ...any) { + conditions = append(conditions, condition) + args = append(args, values...) + } + + if filter != nil { + if kind := strings.TrimSpace(strings.ToLower(filter.Kind)); kind != "" && kind != "all" { + if kind != string(service.OpsRequestKindSuccess) && kind != string(service.OpsRequestKindError) { + return nil, 0, fmt.Errorf("invalid kind") + } + addCondition(fmt.Sprintf("kind = $%d", len(args)+1), kind) + } + + if platform := strings.TrimSpace(strings.ToLower(filter.Platform)); platform != "" { + addCondition(fmt.Sprintf("platform = $%d", len(args)+1), platform) + } + if filter.GroupID != nil && *filter.GroupID > 0 { + addCondition(fmt.Sprintf("group_id = $%d", len(args)+1), *filter.GroupID) + } + + if filter.UserID != nil && *filter.UserID > 0 { + addCondition(fmt.Sprintf("user_id = $%d", len(args)+1), *filter.UserID) + } + if filter.APIKeyID != nil && *filter.APIKeyID > 0 { + addCondition(fmt.Sprintf("api_key_id = $%d", len(args)+1), *filter.APIKeyID) + } + if filter.AccountID != nil && *filter.AccountID > 0 { + addCondition(fmt.Sprintf("account_id = $%d", len(args)+1), *filter.AccountID) + } + + if model := strings.TrimSpace(filter.Model); model != "" { + addCondition(fmt.Sprintf("model = $%d", len(args)+1), model) + } + if requestID := strings.TrimSpace(filter.RequestID); requestID != "" { + addCondition(fmt.Sprintf("request_id = $%d", 
len(args)+1), requestID) + } + if q := strings.TrimSpace(filter.Query); q != "" { + like := "%" + strings.ToLower(q) + "%" + startIdx := len(args) + 1 + addCondition( + fmt.Sprintf("(LOWER(COALESCE(request_id,'')) LIKE $%d OR LOWER(COALESCE(model,'')) LIKE $%d OR LOWER(COALESCE(message,'')) LIKE $%d)", + startIdx, startIdx+1, startIdx+2, + ), + like, like, like, + ) + } + + if filter.MinDurationMs != nil { + addCondition(fmt.Sprintf("duration_ms >= $%d", len(args)+1), *filter.MinDurationMs) + } + if filter.MaxDurationMs != nil { + addCondition(fmt.Sprintf("duration_ms <= $%d", len(args)+1), *filter.MaxDurationMs) + } + } + + where := "" + if len(conditions) > 0 { + where = "WHERE " + strings.Join(conditions, " AND ") + } + + cte := ` +WITH combined AS ( + SELECT + 'success'::TEXT AS kind, + ul.created_at AS created_at, + ul.request_id AS request_id, + COALESCE(NULLIF(g.platform, ''), NULLIF(a.platform, ''), '') AS platform, + ul.model AS model, + ul.duration_ms AS duration_ms, + NULL::INT AS status_code, + NULL::BIGINT AS error_id, + NULL::TEXT AS phase, + NULL::TEXT AS severity, + NULL::TEXT AS message, + ul.user_id AS user_id, + ul.api_key_id AS api_key_id, + ul.account_id AS account_id, + ul.group_id AS group_id, + ul.stream AS stream + FROM usage_logs ul + LEFT JOIN groups g ON g.id = ul.group_id + LEFT JOIN accounts a ON a.id = ul.account_id + WHERE ul.created_at >= $1 AND ul.created_at < $2 + + UNION ALL + + SELECT + 'error'::TEXT AS kind, + o.created_at AS created_at, + COALESCE(NULLIF(o.request_id,''), NULLIF(o.client_request_id,''), '') AS request_id, + COALESCE(NULLIF(o.platform, ''), NULLIF(g.platform, ''), NULLIF(a.platform, ''), '') AS platform, + o.model AS model, + o.duration_ms AS duration_ms, + o.status_code AS status_code, + o.id AS error_id, + o.error_phase AS phase, + o.severity AS severity, + o.error_message AS message, + o.user_id AS user_id, + o.api_key_id AS api_key_id, + o.account_id AS account_id, + o.group_id AS group_id, + o.stream AS 
stream + FROM ops_error_logs o + LEFT JOIN groups g ON g.id = o.group_id + LEFT JOIN accounts a ON a.id = o.account_id + WHERE o.created_at >= $1 AND o.created_at < $2 +) +` + + countQuery := fmt.Sprintf(`%s SELECT COUNT(1) FROM combined %s`, cte, where) + var total int64 + if err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil { + if err == sql.ErrNoRows { + total = 0 + } else { + return nil, 0, err + } + } + + sort := "ORDER BY created_at DESC" + if filter != nil { + switch strings.TrimSpace(strings.ToLower(filter.Sort)) { + case "", "created_at_desc": + // default + case "duration_desc": + sort = "ORDER BY duration_ms DESC NULLS LAST, created_at DESC" + default: + return nil, 0, fmt.Errorf("invalid sort") + } + } + + listQuery := fmt.Sprintf(` +%s +SELECT + kind, + created_at, + request_id, + platform, + model, + duration_ms, + status_code, + error_id, + phase, + severity, + message, + user_id, + api_key_id, + account_id, + group_id, + stream +FROM combined +%s +%s +LIMIT $%d OFFSET $%d +`, cte, where, sort, len(args)+1, len(args)+2) + + listArgs := append(append([]any{}, args...), pageSize, offset) + rows, err := r.db.QueryContext(ctx, listQuery, listArgs...) 
+ if err != nil { + return nil, 0, err + } + defer rows.Close() + + toIntPtr := func(v sql.NullInt64) *int { + if !v.Valid { + return nil + } + i := int(v.Int64) + return &i + } + toInt64Ptr := func(v sql.NullInt64) *int64 { + if !v.Valid { + return nil + } + i := v.Int64 + return &i + } + + out := make([]*service.OpsRequestDetail, 0, pageSize) + for rows.Next() { + var ( + kind string + createdAt time.Time + requestID sql.NullString + platform sql.NullString + model sql.NullString + + durationMs sql.NullInt64 + statusCode sql.NullInt64 + errorID sql.NullInt64 + + phase sql.NullString + severity sql.NullString + message sql.NullString + + userID sql.NullInt64 + apiKeyID sql.NullInt64 + accountID sql.NullInt64 + groupID sql.NullInt64 + + stream bool + ) + + if err := rows.Scan( + &kind, + &createdAt, + &requestID, + &platform, + &model, + &durationMs, + &statusCode, + &errorID, + &phase, + &severity, + &message, + &userID, + &apiKeyID, + &accountID, + &groupID, + &stream, + ); err != nil { + return nil, 0, err + } + + item := &service.OpsRequestDetail{ + Kind: service.OpsRequestKind(kind), + CreatedAt: createdAt, + RequestID: strings.TrimSpace(requestID.String), + Platform: strings.TrimSpace(platform.String), + Model: strings.TrimSpace(model.String), + + DurationMs: toIntPtr(durationMs), + StatusCode: toIntPtr(statusCode), + ErrorID: toInt64Ptr(errorID), + Phase: phase.String, + Severity: severity.String, + Message: message.String, + + UserID: toInt64Ptr(userID), + APIKeyID: toInt64Ptr(apiKeyID), + AccountID: toInt64Ptr(accountID), + GroupID: toInt64Ptr(groupID), + + Stream: stream, + } + + if item.Platform == "" { + item.Platform = "unknown" + } + + out = append(out, item) + } + if err := rows.Err(); err != nil { + return nil, 0, err + } + + return out, total, nil +} diff --git a/backend/internal/repository/ops_repo_trends.go b/backend/internal/repository/ops_repo_trends.go new file mode 100644 index 00000000..5f32c5d1 --- /dev/null +++ 
b/backend/internal/repository/ops_repo_trends.go @@ -0,0 +1,567 @@ +package repository + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/Wei-Shaw/sub2api/internal/service" +) + +func (r *opsRepository) GetThroughputTrend(ctx context.Context, filter *service.OpsDashboardFilter, bucketSeconds int) (*service.OpsThroughputTrendResponse, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if filter == nil { + return nil, fmt.Errorf("nil filter") + } + if filter.StartTime.IsZero() || filter.EndTime.IsZero() { + return nil, fmt.Errorf("start_time/end_time required") + } + + if bucketSeconds <= 0 { + bucketSeconds = 60 + } + if bucketSeconds != 60 && bucketSeconds != 300 && bucketSeconds != 3600 { + // Keep a small, predictable set of supported buckets for now. + bucketSeconds = 60 + } + + start := filter.StartTime.UTC() + end := filter.EndTime.UTC() + + usageJoin, usageWhere, usageArgs, next := buildUsageWhere(filter, start, end, 1) + errorWhere, errorArgs, _ := buildErrorWhere(filter, start, end, next) + + usageBucketExpr := opsBucketExprForUsage(bucketSeconds) + errorBucketExpr := opsBucketExprForError(bucketSeconds) + + q := ` +WITH usage_buckets AS ( + SELECT ` + usageBucketExpr + ` AS bucket, + COUNT(*) AS success_count, + COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS token_consumed + FROM usage_logs ul + ` + usageJoin + ` + ` + usageWhere + ` + GROUP BY 1 +), +error_buckets AS ( + SELECT ` + errorBucketExpr + ` AS bucket, + COUNT(*) AS error_count + FROM ops_error_logs + ` + errorWhere + ` + GROUP BY 1 +), +combined AS ( + SELECT COALESCE(u.bucket, e.bucket) AS bucket, + COALESCE(u.success_count, 0) AS success_count, + COALESCE(e.error_count, 0) AS error_count, + COALESCE(u.token_consumed, 0) AS token_consumed + FROM usage_buckets u + FULL OUTER JOIN error_buckets e ON u.bucket = e.bucket +) +SELECT + bucket, + (success_count + error_count) AS 
request_count, + token_consumed +FROM combined +ORDER BY bucket ASC` + + args := append(usageArgs, errorArgs...) + + rows, err := r.db.QueryContext(ctx, q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + points := make([]*service.OpsThroughputTrendPoint, 0, 256) + for rows.Next() { + var bucket time.Time + var requests int64 + var tokens sql.NullInt64 + if err := rows.Scan(&bucket, &requests, &tokens); err != nil { + return nil, err + } + tokenConsumed := int64(0) + if tokens.Valid { + tokenConsumed = tokens.Int64 + } + + denom := float64(bucketSeconds) + if denom <= 0 { + denom = 60 + } + qps := roundTo1DP(float64(requests) / denom) + tps := roundTo1DP(float64(tokenConsumed) / denom) + + points = append(points, &service.OpsThroughputTrendPoint{ + BucketStart: bucket.UTC(), + RequestCount: requests, + TokenConsumed: tokenConsumed, + QPS: qps, + TPS: tps, + }) + } + if err := rows.Err(); err != nil { + return nil, err + } + + // Fill missing buckets with zeros so charts render continuous timelines. 
+ points = fillOpsThroughputBuckets(start, end, bucketSeconds, points) + + var byPlatform []*service.OpsThroughputPlatformBreakdownItem + var topGroups []*service.OpsThroughputGroupBreakdownItem + + platform := "" + if filter != nil { + platform = strings.TrimSpace(strings.ToLower(filter.Platform)) + } + groupID := (*int64)(nil) + if filter != nil { + groupID = filter.GroupID + } + + // Drilldown helpers: + // - No platform/group: totals by platform + // - Platform selected but no group: top groups in that platform + if platform == "" && (groupID == nil || *groupID <= 0) { + items, err := r.getThroughputBreakdownByPlatform(ctx, start, end) + if err != nil { + return nil, err + } + byPlatform = items + } else if platform != "" && (groupID == nil || *groupID <= 0) { + items, err := r.getThroughputTopGroupsByPlatform(ctx, start, end, platform, 10) + if err != nil { + return nil, err + } + topGroups = items + } + + return &service.OpsThroughputTrendResponse{ + Bucket: opsBucketLabel(bucketSeconds), + Points: points, + + ByPlatform: byPlatform, + TopGroups: topGroups, + }, nil +} + +func (r *opsRepository) getThroughputBreakdownByPlatform(ctx context.Context, start, end time.Time) ([]*service.OpsThroughputPlatformBreakdownItem, error) { + q := ` +WITH usage_totals AS ( + SELECT COALESCE(NULLIF(g.platform,''), a.platform) AS platform, + COUNT(*) AS success_count, + COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS token_consumed + FROM usage_logs ul + LEFT JOIN groups g ON g.id = ul.group_id + LEFT JOIN accounts a ON a.id = ul.account_id + WHERE ul.created_at >= $1 AND ul.created_at < $2 + GROUP BY 1 +), +error_totals AS ( + SELECT platform, + COUNT(*) AS error_count + FROM ops_error_logs + WHERE created_at >= $1 AND created_at < $2 + GROUP BY 1 +), +combined AS ( + SELECT COALESCE(u.platform, e.platform) AS platform, + COALESCE(u.success_count, 0) AS success_count, + COALESCE(e.error_count, 0) AS error_count, + 
COALESCE(u.token_consumed, 0) AS token_consumed + FROM usage_totals u + FULL OUTER JOIN error_totals e ON u.platform = e.platform +) +SELECT platform, (success_count + error_count) AS request_count, token_consumed +FROM combined +WHERE platform IS NOT NULL AND platform <> '' +ORDER BY request_count DESC` + + rows, err := r.db.QueryContext(ctx, q, start, end) + if err != nil { + return nil, err + } + defer rows.Close() + + items := make([]*service.OpsThroughputPlatformBreakdownItem, 0, 8) + for rows.Next() { + var platform string + var requests int64 + var tokens sql.NullInt64 + if err := rows.Scan(&platform, &requests, &tokens); err != nil { + return nil, err + } + tokenConsumed := int64(0) + if tokens.Valid { + tokenConsumed = tokens.Int64 + } + items = append(items, &service.OpsThroughputPlatformBreakdownItem{ + Platform: platform, + RequestCount: requests, + TokenConsumed: tokenConsumed, + }) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +func (r *opsRepository) getThroughputTopGroupsByPlatform(ctx context.Context, start, end time.Time, platform string, limit int) ([]*service.OpsThroughputGroupBreakdownItem, error) { + if strings.TrimSpace(platform) == "" { + return nil, nil + } + if limit <= 0 || limit > 100 { + limit = 10 + } + + q := ` +WITH usage_totals AS ( + SELECT ul.group_id AS group_id, + g.name AS group_name, + COUNT(*) AS success_count, + COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS token_consumed + FROM usage_logs ul + JOIN groups g ON g.id = ul.group_id + WHERE ul.created_at >= $1 AND ul.created_at < $2 + AND g.platform = $3 + GROUP BY 1, 2 +), +error_totals AS ( + SELECT group_id, + COUNT(*) AS error_count + FROM ops_error_logs + WHERE created_at >= $1 AND created_at < $2 + AND platform = $3 + AND group_id IS NOT NULL + GROUP BY 1 +), +combined AS ( + SELECT COALESCE(u.group_id, e.group_id) AS group_id, + COALESCE(u.group_name, g2.name, '') AS group_name, + 
COALESCE(u.success_count, 0) AS success_count, + COALESCE(e.error_count, 0) AS error_count, + COALESCE(u.token_consumed, 0) AS token_consumed + FROM usage_totals u + FULL OUTER JOIN error_totals e ON u.group_id = e.group_id + LEFT JOIN groups g2 ON g2.id = COALESCE(u.group_id, e.group_id) +) +SELECT group_id, group_name, (success_count + error_count) AS request_count, token_consumed +FROM combined +WHERE group_id IS NOT NULL +ORDER BY request_count DESC +LIMIT $4` + + rows, err := r.db.QueryContext(ctx, q, start, end, platform, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + items := make([]*service.OpsThroughputGroupBreakdownItem, 0, limit) + for rows.Next() { + var groupID int64 + var groupName sql.NullString + var requests int64 + var tokens sql.NullInt64 + if err := rows.Scan(&groupID, &groupName, &requests, &tokens); err != nil { + return nil, err + } + tokenConsumed := int64(0) + if tokens.Valid { + tokenConsumed = tokens.Int64 + } + name := "" + if groupName.Valid { + name = groupName.String + } + items = append(items, &service.OpsThroughputGroupBreakdownItem{ + GroupID: groupID, + GroupName: name, + RequestCount: requests, + TokenConsumed: tokenConsumed, + }) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +func opsBucketExprForUsage(bucketSeconds int) string { + switch bucketSeconds { + case 3600: + return "date_trunc('hour', ul.created_at)" + case 300: + // 5-minute buckets in UTC. 
+ return "to_timestamp(floor(extract(epoch from ul.created_at) / 300) * 300)" + default: + return "date_trunc('minute', ul.created_at)" + } +} + +func opsBucketExprForError(bucketSeconds int) string { + switch bucketSeconds { + case 3600: + return "date_trunc('hour', created_at)" + case 300: + return "to_timestamp(floor(extract(epoch from created_at) / 300) * 300)" + default: + return "date_trunc('minute', created_at)" + } +} + +func opsBucketLabel(bucketSeconds int) string { + if bucketSeconds <= 0 { + return "1m" + } + if bucketSeconds%3600 == 0 { + h := bucketSeconds / 3600 + if h <= 0 { + h = 1 + } + return fmt.Sprintf("%dh", h) + } + m := bucketSeconds / 60 + if m <= 0 { + m = 1 + } + return fmt.Sprintf("%dm", m) +} + +func opsFloorToBucketStart(t time.Time, bucketSeconds int) time.Time { + t = t.UTC() + if bucketSeconds <= 0 { + bucketSeconds = 60 + } + secs := t.Unix() + floored := secs - (secs % int64(bucketSeconds)) + return time.Unix(floored, 0).UTC() +} + +func fillOpsThroughputBuckets(start, end time.Time, bucketSeconds int, points []*service.OpsThroughputTrendPoint) []*service.OpsThroughputTrendPoint { + if bucketSeconds <= 0 { + bucketSeconds = 60 + } + if !start.Before(end) { + return points + } + + endMinus := end.Add(-time.Nanosecond) + if endMinus.Before(start) { + return points + } + + first := opsFloorToBucketStart(start, bucketSeconds) + last := opsFloorToBucketStart(endMinus, bucketSeconds) + step := time.Duration(bucketSeconds) * time.Second + + existing := make(map[int64]*service.OpsThroughputTrendPoint, len(points)) + for _, p := range points { + if p == nil { + continue + } + existing[p.BucketStart.UTC().Unix()] = p + } + + out := make([]*service.OpsThroughputTrendPoint, 0, int(last.Sub(first)/step)+1) + for cursor := first; !cursor.After(last); cursor = cursor.Add(step) { + if p, ok := existing[cursor.Unix()]; ok && p != nil { + out = append(out, p) + continue + } + out = append(out, &service.OpsThroughputTrendPoint{ + BucketStart: 
cursor, + RequestCount: 0, + TokenConsumed: 0, + QPS: 0, + TPS: 0, + }) + } + return out +} + +func (r *opsRepository) GetErrorTrend(ctx context.Context, filter *service.OpsDashboardFilter, bucketSeconds int) (*service.OpsErrorTrendResponse, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if filter == nil { + return nil, fmt.Errorf("nil filter") + } + if filter.StartTime.IsZero() || filter.EndTime.IsZero() { + return nil, fmt.Errorf("start_time/end_time required") + } + + if bucketSeconds <= 0 { + bucketSeconds = 60 + } + if bucketSeconds != 60 && bucketSeconds != 300 && bucketSeconds != 3600 { + bucketSeconds = 60 + } + + start := filter.StartTime.UTC() + end := filter.EndTime.UTC() + where, args, _ := buildErrorWhere(filter, start, end, 1) + bucketExpr := opsBucketExprForError(bucketSeconds) + + q := ` +SELECT + ` + bucketExpr + ` AS bucket, + COUNT(*) AS error_total, + COUNT(*) FILTER (WHERE is_business_limited) AS business_limited, + COUNT(*) FILTER (WHERE NOT is_business_limited) AS error_sla, + COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) NOT IN (429, 529)) AS upstream_excl, + COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 429) AS upstream_429, + COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 529) AS upstream_529 +FROM ops_error_logs +` + where + ` +GROUP BY 1 +ORDER BY 1 ASC` + + rows, err := r.db.QueryContext(ctx, q, args...) 
+ if err != nil { + return nil, err + } + defer rows.Close() + + points := make([]*service.OpsErrorTrendPoint, 0, 256) + for rows.Next() { + var bucket time.Time + var total, businessLimited, sla, upstreamExcl, upstream429, upstream529 int64 + if err := rows.Scan(&bucket, &total, &businessLimited, &sla, &upstreamExcl, &upstream429, &upstream529); err != nil { + return nil, err + } + points = append(points, &service.OpsErrorTrendPoint{ + BucketStart: bucket.UTC(), + + ErrorCountTotal: total, + BusinessLimitedCount: businessLimited, + ErrorCountSLA: sla, + + UpstreamErrorCountExcl429529: upstreamExcl, + Upstream429Count: upstream429, + Upstream529Count: upstream529, + }) + } + if err := rows.Err(); err != nil { + return nil, err + } + + points = fillOpsErrorTrendBuckets(start, end, bucketSeconds, points) + + return &service.OpsErrorTrendResponse{ + Bucket: opsBucketLabel(bucketSeconds), + Points: points, + }, nil +} + +func fillOpsErrorTrendBuckets(start, end time.Time, bucketSeconds int, points []*service.OpsErrorTrendPoint) []*service.OpsErrorTrendPoint { + if bucketSeconds <= 0 { + bucketSeconds = 60 + } + if !start.Before(end) { + return points + } + + endMinus := end.Add(-time.Nanosecond) + if endMinus.Before(start) { + return points + } + + first := opsFloorToBucketStart(start, bucketSeconds) + last := opsFloorToBucketStart(endMinus, bucketSeconds) + step := time.Duration(bucketSeconds) * time.Second + + existing := make(map[int64]*service.OpsErrorTrendPoint, len(points)) + for _, p := range points { + if p == nil { + continue + } + existing[p.BucketStart.UTC().Unix()] = p + } + + out := make([]*service.OpsErrorTrendPoint, 0, int(last.Sub(first)/step)+1) + for cursor := first; !cursor.After(last); cursor = cursor.Add(step) { + if p, ok := existing[cursor.Unix()]; ok && p != nil { + out = append(out, p) + continue + } + out = append(out, &service.OpsErrorTrendPoint{ + BucketStart: cursor, + + ErrorCountTotal: 0, + BusinessLimitedCount: 0, + ErrorCountSLA: 0, + + 
UpstreamErrorCountExcl429529: 0, + Upstream429Count: 0, + Upstream529Count: 0, + }) + } + return out +} + +func (r *opsRepository) GetErrorDistribution(ctx context.Context, filter *service.OpsDashboardFilter) (*service.OpsErrorDistributionResponse, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if filter == nil { + return nil, fmt.Errorf("nil filter") + } + if filter.StartTime.IsZero() || filter.EndTime.IsZero() { + return nil, fmt.Errorf("start_time/end_time required") + } + + start := filter.StartTime.UTC() + end := filter.EndTime.UTC() + where, args, _ := buildErrorWhere(filter, start, end, 1) + + q := ` +SELECT + COALESCE(status_code, 0) AS status_code, + COUNT(*) AS total, + COUNT(*) FILTER (WHERE NOT is_business_limited) AS sla, + COUNT(*) FILTER (WHERE is_business_limited) AS business_limited +FROM ops_error_logs +` + where + ` +GROUP BY 1 +ORDER BY total DESC +LIMIT 20` + + rows, err := r.db.QueryContext(ctx, q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + items := make([]*service.OpsErrorDistributionItem, 0, 16) + var total int64 + for rows.Next() { + var statusCode int + var cntTotal, cntSLA, cntBiz int64 + if err := rows.Scan(&statusCode, &cntTotal, &cntSLA, &cntBiz); err != nil { + return nil, err + } + total += cntTotal + items = append(items, &service.OpsErrorDistributionItem{ + StatusCode: statusCode, + Total: cntTotal, + SLA: cntSLA, + BusinessLimited: cntBiz, + }) + } + if err := rows.Err(); err != nil { + return nil, err + } + + return &service.OpsErrorDistributionResponse{ + Total: total, + Items: items, + }, nil +} diff --git a/backend/internal/repository/ops_repo_window_stats.go b/backend/internal/repository/ops_repo_window_stats.go new file mode 100644 index 00000000..8221c473 --- /dev/null +++ b/backend/internal/repository/ops_repo_window_stats.go @@ -0,0 +1,50 @@ +package repository + +import ( + "context" + "fmt" + "time" + + 
"github.com/Wei-Shaw/sub2api/internal/service" +) + +func (r *opsRepository) GetWindowStats(ctx context.Context, filter *service.OpsDashboardFilter) (*service.OpsWindowStats, error) { + if r == nil || r.db == nil { + return nil, fmt.Errorf("nil ops repository") + } + if filter == nil { + return nil, fmt.Errorf("nil filter") + } + if filter.StartTime.IsZero() || filter.EndTime.IsZero() { + return nil, fmt.Errorf("start_time/end_time required") + } + + start := filter.StartTime.UTC() + end := filter.EndTime.UTC() + if start.After(end) { + return nil, fmt.Errorf("start_time must be <= end_time") + } + // Bound excessively large windows to prevent accidental heavy queries. + if end.Sub(start) > 24*time.Hour { + return nil, fmt.Errorf("window too large") + } + + successCount, tokenConsumed, err := r.queryUsageCounts(ctx, filter, start, end) + if err != nil { + return nil, err + } + + errorTotal, _, _, _, _, _, err := r.queryErrorCounts(ctx, filter, start, end) + if err != nil { + return nil, err + } + + return &service.OpsWindowStats{ + StartTime: start, + EndTime: end, + + SuccessCount: successCount, + ErrorCountTotal: errorTotal, + TokenConsumed: tokenConsumed, + }, nil +} diff --git a/backend/internal/repository/wire.go b/backend/internal/repository/wire.go index f7574563..315bc1b6 100644 --- a/backend/internal/repository/wire.go +++ b/backend/internal/repository/wire.go @@ -35,6 +35,7 @@ var ProviderSet = wire.NewSet( NewRedeemCodeRepository, NewUsageLogRepository, NewSettingRepository, + NewOpsRepository, NewUserSubscriptionRepository, NewUserAttributeDefinitionRepository, NewUserAttributeValueRepository,