Files
xinghuoapi/backend/internal/repository/ops_repo.go
IanShaw027 bb5303272b feat(repository): 实现运维监控数据访问层
- 新增 ops 主仓库(ops_repo.go)
- 实现告警数据访问(ops_repo_alerts.go)
- 实现仪表板数据访问(ops_repo_dashboard.go)
- 实现直方图数据访问(ops_repo_histograms.go)
- 实现延迟直方图桶逻辑(ops_repo_latency_histogram_buckets.go)
- 新增延迟直方图桶测试(ops_repo_latency_histogram_buckets_test.go)
- 实现指标数据访问(ops_repo_metrics.go)
- 实现预聚合数据访问(ops_repo_preagg.go)
- 实现请求详情数据访问(ops_repo_request_details.go)
- 实现趋势数据访问(ops_repo_trends.go)
- 实现窗口统计数据访问(ops_repo_window_stats.go)
- 更新并发缓存支持 ops 场景
- 注册 repository 依赖注入
2026-01-09 20:52:57 +08:00

677 lines
15 KiB
Go

package repository
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/lib/pq"
)
// opsRepository is the concrete type that NewOpsRepository hands out as a
// service.OpsRepository. Its methods run their SQL through db.
type opsRepository struct {
// db is the database handle supplied to NewOpsRepository; methods in this
// file check it for nil before use.
db *sql.DB
}
// NewOpsRepository builds an opsRepository around db and returns it as a
// service.OpsRepository. db is stored as given; a nil handle is not rejected
// here, the methods in this file report it when they are called.
func NewOpsRepository(db *sql.DB) service.OpsRepository {
	repo := new(opsRepository)
	repo.db = db
	return repo
}
// InsertErrorLog writes one row to ops_error_logs and returns the id the
// database generated for it (via RETURNING id).
//
// Optional fields go through opsNullString / opsNullInt64 / opsNullInt, which
// turn blank strings, zero numbers and nil pointers into SQL NULL. Stream,
// ErrorPhase, ErrorType, IsBusinessLimited, RequestBodyTruncated, IsRetryable,
// RetryCount and CreatedAt are bound exactly as supplied.
//
// It returns an error when the receiver, its db handle, or input is nil, and
// otherwise passes through whatever error the query produced.
func (r *opsRepository) InsertErrorLog(ctx context.Context, input *service.OpsInsertErrorLogInput) (int64, error) {
	switch {
	case r == nil || r.db == nil:
		return 0, fmt.Errorf("nil ops repository")
	case input == nil:
		return 0, fmt.Errorf("nil input")
	}

	const insertSQL = `
INSERT INTO ops_error_logs (
request_id,
client_request_id,
user_id,
api_key_id,
account_id,
group_id,
client_ip,
platform,
model,
request_path,
stream,
user_agent,
error_phase,
error_type,
severity,
status_code,
is_business_limited,
error_message,
error_body,
error_source,
error_owner,
upstream_status_code,
upstream_error_message,
upstream_error_detail,
duration_ms,
time_to_first_token_ms,
request_body,
request_body_truncated,
request_body_bytes,
request_headers,
is_retryable,
retry_count,
created_at
) VALUES (
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33
) RETURNING id`

	// One entry per placeholder, in column order ($1..$33).
	params := []any{
		opsNullString(input.RequestID),            // $1  request_id
		opsNullString(input.ClientRequestID),      // $2  client_request_id
		opsNullInt64(input.UserID),                // $3  user_id
		opsNullInt64(input.APIKeyID),              // $4  api_key_id
		opsNullInt64(input.AccountID),             // $5  account_id
		opsNullInt64(input.GroupID),               // $6  group_id
		opsNullString(input.ClientIP),             // $7  client_ip
		opsNullString(input.Platform),             // $8  platform
		opsNullString(input.Model),                // $9  model
		opsNullString(input.RequestPath),          // $10 request_path
		input.Stream,                              // $11 stream
		opsNullString(input.UserAgent),            // $12 user_agent
		input.ErrorPhase,                          // $13 error_phase
		input.ErrorType,                           // $14 error_type
		opsNullString(input.Severity),             // $15 severity
		opsNullInt(input.StatusCode),              // $16 status_code
		input.IsBusinessLimited,                   // $17 is_business_limited
		opsNullString(input.ErrorMessage),         // $18 error_message
		opsNullString(input.ErrorBody),            // $19 error_body
		opsNullString(input.ErrorSource),          // $20 error_source
		opsNullString(input.ErrorOwner),           // $21 error_owner
		opsNullInt(input.UpstreamStatusCode),      // $22 upstream_status_code
		opsNullString(input.UpstreamErrorMessage), // $23 upstream_error_message
		opsNullString(input.UpstreamErrorDetail),  // $24 upstream_error_detail
		opsNullInt(input.DurationMs),              // $25 duration_ms
		opsNullInt64(input.TimeToFirstTokenMs),    // $26 time_to_first_token_ms
		opsNullString(input.RequestBodyJSON),      // $27 request_body
		input.RequestBodyTruncated,                // $28 request_body_truncated
		opsNullInt(input.RequestBodyBytes),        // $29 request_body_bytes
		opsNullString(input.RequestHeadersJSON),   // $30 request_headers
		input.IsRetryable,                         // $31 is_retryable
		input.RetryCount,                          // $32 retry_count
		input.CreatedAt,                           // $33 created_at
	}

	var newID int64
	if err := r.db.QueryRowContext(ctx, insertSQL, params...).Scan(&newID); err != nil {
		return 0, err
	}
	return newID, nil
}
// ListErrorLogs returns one page of ops_error_logs rows matching filter,
// newest first, along with the total number of matching rows.
//
// A nil filter is treated as an empty one. Page falls back to 1 when it is
// <= 0; PageSize falls back to 20 when it is <= 0 and is capped at 500. The
// effective page and page size are echoed back in the result.
//
// Rows are ordered by created_at DESC with id DESC as a tie-breaker. Without
// the tie-breaker, rows sharing a timestamp have no defined relative order, so
// LIMIT/OFFSET paging could repeat or skip them between pages.
//
// The count and the page are read by two separate statements, so Total may
// not exactly match the page contents if rows are written in between.
//
// It returns an error when the receiver or its db handle is nil, and otherwise
// passes through any query or scan error unchanged.
func (r *opsRepository) ListErrorLogs(ctx context.Context, filter *service.OpsErrorLogFilter) (*service.OpsErrorLogList, error) {
	if r == nil || r.db == nil {
		return nil, fmt.Errorf("nil ops repository")
	}
	if filter == nil {
		filter = &service.OpsErrorLogFilter{}
	}

	page := filter.Page
	if page <= 0 {
		page = 1
	}
	pageSize := filter.PageSize
	if pageSize <= 0 {
		pageSize = 20
	}
	if pageSize > 500 {
		pageSize = 500
	}

	where, args := buildOpsErrorLogsWhere(filter)

	countSQL := "SELECT COUNT(*) FROM ops_error_logs " + where
	var total int
	if err := r.db.QueryRowContext(ctx, countSQL, args...).Scan(&total); err != nil {
		return nil, err
	}

	offset := (page - 1) * pageSize

	// Build the paged argument list in its own backing array rather than
	// appending onto args, so the filter arguments are never aliased.
	pageArgs := make([]any, 0, len(args)+2)
	pageArgs = append(pageArgs, args...)
	pageArgs = append(pageArgs, pageSize, offset)

	selectSQL := `
SELECT
id,
created_at,
error_phase,
error_type,
severity,
COALESCE(status_code, 0),
COALESCE(platform, ''),
COALESCE(model, ''),
duration_ms,
COALESCE(client_request_id, ''),
COALESCE(request_id, ''),
COALESCE(error_message, ''),
user_id,
api_key_id,
account_id,
group_id,
CASE WHEN client_ip IS NULL THEN NULL ELSE client_ip::text END,
COALESCE(request_path, ''),
stream
FROM ops_error_logs
` + where + `
ORDER BY created_at DESC, id DESC
LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)

	rows, err := r.db.QueryContext(ctx, selectSQL, pageArgs...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	out := make([]*service.OpsErrorLog, 0, pageSize)
	for rows.Next() {
		var item service.OpsErrorLog
		// Nullable columns are scanned into sql.Null* holders and copied to
		// the item's pointer fields only when a value is present.
		var latency sql.NullInt64
		var statusCode sql.NullInt64
		var clientIP sql.NullString
		var userID sql.NullInt64
		var apiKeyID sql.NullInt64
		var accountID sql.NullInt64
		var groupID sql.NullInt64
		if err := rows.Scan(
			&item.ID,
			&item.CreatedAt,
			&item.Phase,
			&item.Type,
			&item.Severity,
			&statusCode,
			&item.Platform,
			&item.Model,
			&latency,
			&item.ClientRequestID,
			&item.RequestID,
			&item.Message,
			&userID,
			&apiKeyID,
			&accountID,
			&groupID,
			&clientIP,
			&item.RequestPath,
			&item.Stream,
		); err != nil {
			return nil, err
		}
		if latency.Valid {
			v := int(latency.Int64)
			item.LatencyMs = &v
		}
		// status_code is COALESCEd to 0 in the query, so Int64 is always set.
		item.StatusCode = int(statusCode.Int64)
		if clientIP.Valid {
			s := clientIP.String
			item.ClientIP = &s
		}
		if userID.Valid {
			v := userID.Int64
			item.UserID = &v
		}
		if apiKeyID.Valid {
			v := apiKeyID.Int64
			item.APIKeyID = &v
		}
		if accountID.Valid {
			v := accountID.Int64
			item.AccountID = &v
		}
		if groupID.Valid {
			v := groupID.Int64
			item.GroupID = &v
		}
		out = append(out, &item)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return &service.OpsErrorLogList{
		Errors:   out,
		Total:    total,
		Page:     page,
		PageSize: pageSize,
	}, nil
}
// GetErrorLogByID loads the full detail record of a single ops_error_logs row.
//
// id must be positive. Nullable columns end up as nil pointers on the result
// when the database value is NULL. request_body and request_headers are
// returned as trimmed text, with a stored JSON null collapsed to "".
//
// It returns an error when the receiver or its db handle is nil or id <= 0,
// and otherwise passes the Scan error through unchanged (database/sql reports
// sql.ErrNoRows from Row.Scan when nothing matched).
func (r *opsRepository) GetErrorLogByID(ctx context.Context, id int64) (*service.OpsErrorLogDetail, error) {
	if r == nil || r.db == nil {
		return nil, fmt.Errorf("nil ops repository")
	}
	if id <= 0 {
		return nil, fmt.Errorf("invalid id")
	}

	const detailSQL = `
SELECT
id,
created_at,
error_phase,
error_type,
severity,
COALESCE(status_code, 0),
COALESCE(platform, ''),
COALESCE(model, ''),
duration_ms,
COALESCE(client_request_id, ''),
COALESCE(request_id, ''),
COALESCE(error_message, ''),
COALESCE(error_body, ''),
is_business_limited,
user_id,
api_key_id,
account_id,
group_id,
CASE WHEN client_ip IS NULL THEN NULL ELSE client_ip::text END,
COALESCE(request_path, ''),
stream,
COALESCE(user_agent, ''),
auth_latency_ms,
routing_latency_ms,
upstream_latency_ms,
response_latency_ms,
time_to_first_token_ms,
COALESCE(request_body::text, ''),
request_body_truncated,
request_body_bytes,
COALESCE(request_headers::text, '')
FROM ops_error_logs
WHERE id = $1
LIMIT 1`

	// Scan targets for the nullable columns.
	var (
		status, duration, bodyBytes                  sql.NullInt64
		user, apiKey, account, group                 sql.NullInt64
		authMs, routingMs, upstreamMs, responseMs    sql.NullInt64
		firstTokenMs                                 sql.NullInt64
		ip                                           sql.NullString
	)

	detail := new(service.OpsErrorLogDetail)
	row := r.db.QueryRowContext(ctx, detailSQL, id)
	if err := row.Scan(
		&detail.ID,
		&detail.CreatedAt,
		&detail.Phase,
		&detail.Type,
		&detail.Severity,
		&status,
		&detail.Platform,
		&detail.Model,
		&duration,
		&detail.ClientRequestID,
		&detail.RequestID,
		&detail.Message,
		&detail.ErrorBody,
		&detail.IsBusinessLimited,
		&user,
		&apiKey,
		&account,
		&group,
		&ip,
		&detail.RequestPath,
		&detail.Stream,
		&detail.UserAgent,
		&authMs,
		&routingMs,
		&upstreamMs,
		&responseMs,
		&firstTokenMs,
		&detail.RequestBody,
		&detail.RequestBodyTruncated,
		&bodyBytes,
		&detail.RequestHeaders,
	); err != nil {
		return nil, err
	}

	// Converters from sql.Null* holders to optional pointer fields.
	int64Ptr := func(n sql.NullInt64) *int64 {
		if !n.Valid {
			return nil
		}
		v := n.Int64
		return &v
	}
	intPtr := func(n sql.NullInt64) *int {
		if !n.Valid {
			return nil
		}
		v := int(n.Int64)
		return &v
	}
	// Trims the column text and maps a stored JSON null to the empty string.
	jsonText := func(raw string) string {
		trimmed := strings.TrimSpace(raw)
		if trimmed == "null" {
			return ""
		}
		return trimmed
	}

	// status_code is COALESCEd to 0 in the query, so Int64 is always set.
	detail.StatusCode = int(status.Int64)
	detail.LatencyMs = intPtr(duration)
	if ip.Valid {
		addr := ip.String
		detail.ClientIP = &addr
	}
	detail.UserID = int64Ptr(user)
	detail.APIKeyID = int64Ptr(apiKey)
	detail.AccountID = int64Ptr(account)
	detail.GroupID = int64Ptr(group)
	detail.AuthLatencyMs = int64Ptr(authMs)
	detail.RoutingLatencyMs = int64Ptr(routingMs)
	detail.UpstreamLatencyMs = int64Ptr(upstreamMs)
	detail.ResponseLatencyMs = int64Ptr(responseMs)
	detail.TimeToFirstTokenMs = int64Ptr(firstTokenMs)
	detail.RequestBodyBytes = intPtr(bodyBytes)
	detail.RequestBody = jsonText(detail.RequestBody)
	detail.RequestHeaders = jsonText(detail.RequestHeaders)

	return detail, nil
}
// InsertRetryAttempt records a new row in ops_retry_attempts and returns the
// id the database generated for it.
//
// SourceErrorID must be positive and Mode must be non-blank. Mode and Status
// are stored trimmed. RequestedByUserID and PinnedAccountID are written as
// NULL when zero (or, for PinnedAccountID, nil), per opsNullInt64.
//
// It returns an error for a nil receiver, db handle or input, for the
// validation failures above, and otherwise passes the query error through.
func (r *opsRepository) InsertRetryAttempt(ctx context.Context, input *service.OpsInsertRetryAttemptInput) (int64, error) {
	switch {
	case r == nil || r.db == nil:
		return 0, fmt.Errorf("nil ops repository")
	case input == nil:
		return 0, fmt.Errorf("nil input")
	case input.SourceErrorID <= 0:
		return 0, fmt.Errorf("invalid source_error_id")
	}
	mode := strings.TrimSpace(input.Mode)
	if mode == "" {
		return 0, fmt.Errorf("invalid mode")
	}

	const insertSQL = `
INSERT INTO ops_retry_attempts (
requested_by_user_id,
source_error_id,
mode,
pinned_account_id,
status,
started_at
) VALUES (
$1,$2,$3,$4,$5,$6
) RETURNING id`

	params := []any{
		opsNullInt64(&input.RequestedByUserID),
		input.SourceErrorID,
		mode,
		opsNullInt64(input.PinnedAccountID),
		strings.TrimSpace(input.Status),
		input.StartedAt,
	}

	var attemptID int64
	if err := r.db.QueryRowContext(ctx, insertSQL, params...).Scan(&attemptID); err != nil {
		return 0, err
	}
	return attemptID, nil
}
// UpdateRetryAttempt overwrites the outcome columns (status, finished_at,
// duration_ms, result_request_id, result_error_id, error_message) of the
// ops_retry_attempts row identified by input.ID.
//
// Status is stored trimmed, a zero FinishedAt is written as NULL, and the
// optional result fields go through opsNullString / opsNullInt64. The number
// of affected rows is not inspected, so updating a missing id is not an error.
//
// It returns an error for a nil receiver, db handle or input, for a
// non-positive ID, and otherwise passes the exec error through.
func (r *opsRepository) UpdateRetryAttempt(ctx context.Context, input *service.OpsUpdateRetryAttemptInput) error {
	switch {
	case r == nil || r.db == nil:
		return fmt.Errorf("nil ops repository")
	case input == nil:
		return fmt.Errorf("nil input")
	case input.ID <= 0:
		return fmt.Errorf("invalid id")
	}

	const updateSQL = `
UPDATE ops_retry_attempts
SET
status = $2,
finished_at = $3,
duration_ms = $4,
result_request_id = $5,
result_error_id = $6,
error_message = $7
WHERE id = $1`

	params := []any{
		input.ID,
		strings.TrimSpace(input.Status),
		nullTime(input.FinishedAt),
		input.DurationMs,
		opsNullString(input.ResultRequestID),
		opsNullInt64(input.ResultErrorID),
		opsNullString(input.ErrorMessage),
	}

	if _, err := r.db.ExecContext(ctx, updateSQL, params...); err != nil {
		return err
	}
	return nil
}
// GetLatestRetryAttemptForError returns the ops_retry_attempts row with the
// greatest created_at among those whose source_error_id equals sourceErrorID.
//
// sourceErrorID must be positive. Nullable columns become nil pointers on the
// result when NULL; requested_by_user_id, mode and status are COALESCEd to
// their zero values by the query.
//
// It returns an error when the receiver or its db handle is nil or the id is
// not positive, and otherwise passes the Scan error through unchanged
// (database/sql reports sql.ErrNoRows from Row.Scan when nothing matched).
func (r *opsRepository) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*service.OpsRetryAttempt, error) {
	if r == nil || r.db == nil {
		return nil, fmt.Errorf("nil ops repository")
	}
	if sourceErrorID <= 0 {
		return nil, fmt.Errorf("invalid source_error_id")
	}

	const latestSQL = `
SELECT
id,
created_at,
COALESCE(requested_by_user_id, 0),
source_error_id,
COALESCE(mode, ''),
pinned_account_id,
COALESCE(status, ''),
started_at,
finished_at,
duration_ms,
result_request_id,
result_error_id,
error_message
FROM ops_retry_attempts
WHERE source_error_id = $1
ORDER BY created_at DESC
LIMIT 1`

	// Scan targets for the nullable columns.
	var (
		requester, pinned, elapsed, resultErr sql.NullInt64
		started, finished                     sql.NullTime
		resultReq, failure                    sql.NullString
	)

	attempt := new(service.OpsRetryAttempt)
	row := r.db.QueryRowContext(ctx, latestSQL, sourceErrorID)
	if err := row.Scan(
		&attempt.ID,
		&attempt.CreatedAt,
		&requester,
		&attempt.SourceErrorID,
		&attempt.Mode,
		&pinned,
		&attempt.Status,
		&started,
		&finished,
		&elapsed,
		&resultReq,
		&resultErr,
		&failure,
	); err != nil {
		return nil, err
	}

	// Converters from sql.Null* holders to optional pointer fields.
	int64Ptr := func(n sql.NullInt64) *int64 {
		if !n.Valid {
			return nil
		}
		v := n.Int64
		return &v
	}
	timePtr := func(n sql.NullTime) *time.Time {
		if !n.Valid {
			return nil
		}
		t := n.Time
		return &t
	}
	stringPtr := func(n sql.NullString) *string {
		if !n.Valid {
			return nil
		}
		s := n.String
		return &s
	}

	// requested_by_user_id is COALESCEd to 0 in the query.
	attempt.RequestedByUserID = requester.Int64
	attempt.PinnedAccountID = int64Ptr(pinned)
	attempt.StartedAt = timePtr(started)
	attempt.FinishedAt = timePtr(finished)
	attempt.DurationMs = int64Ptr(elapsed)
	attempt.ResultRequestID = stringPtr(resultReq)
	attempt.ResultErrorID = int64Ptr(resultErr)
	attempt.ErrorMessage = stringPtr(failure)

	return attempt, nil
}
// nullTime converts t into a sql.NullTime for use as a query argument: the
// zero time.Time yields an invalid (SQL NULL) value, anything else is carried
// through as a valid timestamp.
func nullTime(t time.Time) sql.NullTime {
	var out sql.NullTime
	if !t.IsZero() {
		out.Time = t
		out.Valid = true
	}
	return out
}
// buildOpsErrorLogsWhere turns filter into a SQL WHERE clause for
// ops_error_logs plus the positional arguments it refers to.
//
// The clause always starts with "WHERE 1=1", so it is valid even when no
// filter field is set. Placeholders are numbered from $1 in the order the
// arguments are appended, so callers that add further parameters must number
// them from len(args)+1. filter must not be nil.
//
// Conditions added when the corresponding field is set:
//   - StartTime / EndTime: created_at in [start, end), both converted to UTC
//   - Platform, Phase: exact match on the trimmed value
//   - GroupID, AccountID: exact match, only when > 0
//   - StatusCodes: status_code = ANY(array)
//   - Query: case-insensitive substring match against request_id,
//     client_request_id and error_message
func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) {
	clauses := make([]string, 0, 8)
	args := make([]any, 0, 8)
	clauses = append(clauses, "1=1")

	if filter.StartTime != nil && !filter.StartTime.IsZero() {
		args = append(args, filter.StartTime.UTC())
		clauses = append(clauses, "created_at >= $"+itoa(len(args)))
	}
	if filter.EndTime != nil && !filter.EndTime.IsZero() {
		args = append(args, filter.EndTime.UTC())
		// Keep time-window semantics consistent with other ops queries: [start, end)
		clauses = append(clauses, "created_at < $"+itoa(len(args)))
	}
	if p := strings.TrimSpace(filter.Platform); p != "" {
		args = append(args, p)
		clauses = append(clauses, "platform = $"+itoa(len(args)))
	}
	if filter.GroupID != nil && *filter.GroupID > 0 {
		args = append(args, *filter.GroupID)
		clauses = append(clauses, "group_id = $"+itoa(len(args)))
	}
	if filter.AccountID != nil && *filter.AccountID > 0 {
		args = append(args, *filter.AccountID)
		clauses = append(clauses, "account_id = $"+itoa(len(args)))
	}
	if phase := strings.TrimSpace(filter.Phase); phase != "" {
		args = append(args, phase)
		clauses = append(clauses, "error_phase = $"+itoa(len(args)))
	}
	if len(filter.StatusCodes) > 0 {
		args = append(args, pq.Array(filter.StatusCodes))
		clauses = append(clauses, "status_code = ANY($"+itoa(len(args))+")")
	}
	if q := strings.TrimSpace(filter.Query); q != "" {
		// The search text is user input and must match literally. Escape the
		// LIKE metacharacters (backslash is PostgreSQL's default LIKE escape
		// character) so "req_1" does not also match "reqX1" and a lone "%"
		// does not match every row.
		escaped := strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`).Replace(q)
		args = append(args, "%"+escaped+"%")
		// The same placeholder is reused for all three columns.
		n := itoa(len(args))
		clauses = append(clauses, "(request_id ILIKE $"+n+" OR client_request_id ILIKE $"+n+" OR error_message ILIKE $"+n+")")
	}

	return "WHERE " + strings.Join(clauses, " AND "), args
}
// Helpers for nullable args.

// opsNullString converts v into a sql.NullString query argument.
//
// v may be a string or a *string. The text is trimmed of surrounding
// whitespace; a non-empty result is returned as a valid NullString holding the
// trimmed text. Everything else (untyped nil, nil pointer, blank text, or a
// value of any other type) is returned as an invalid NullString, i.e. SQL NULL.
func opsNullString(v any) any {
	var text string
	switch val := v.(type) {
	case string:
		text = val
	case *string:
		if val != nil {
			text = *val
		}
	}

	text = strings.TrimSpace(text)
	if text == "" {
		return sql.NullString{}
	}
	return sql.NullString{String: text, Valid: true}
}
// opsNullInt64 converts an optional int64 into a sql.NullInt64 query argument.
// A nil pointer and a pointed-to zero both produce an invalid value (SQL
// NULL); any other number is carried through as valid.
func opsNullInt64(v *int64) any {
	var out sql.NullInt64
	if v != nil && *v != 0 {
		out.Int64 = *v
		out.Valid = true
	}
	return out
}
// opsNullInt converts an integer-like value into a sql.NullInt64 query
// argument.
//
// Accepted inputs are int, int32, int64 and pointers to those types. A
// non-zero number is returned as a valid NullInt64. Untyped nil, a nil
// pointer, a zero value, or a value of any other type is returned as an
// invalid NullInt64, i.e. SQL NULL.
//
// Plain int64 and the int32 forms are handled explicitly so that a value of
// one of those types is stored rather than silently written as NULL.
func opsNullInt(v any) any {
	var n int64
	switch x := v.(type) {
	case int:
		n = int64(x)
	case int32:
		n = int64(x)
	case int64:
		n = x
	case *int:
		if x != nil {
			n = int64(*x)
		}
	case *int32:
		if x != nil {
			n = int64(*x)
		}
	case *int64:
		if x != nil {
			n = *x
		}
	}

	// Zero is treated as "not set" for every accepted type.
	if n == 0 {
		return sql.NullInt64{}
	}
	return sql.NullInt64{Int64: n, Valid: true}
}