feat(repository): 扩展ops数据访问层功能

- 新增告警静默相关数据库操作
- 增强错误日志查询和统计功能
- 优化重试结果和解决状态的存储
This commit is contained in:
IanShaw027
2026-01-14 09:03:01 +08:00
parent 340dc9cadb
commit 16ddc6a83b
2 changed files with 501 additions and 9 deletions

View File

@@ -134,6 +134,7 @@ func (r *opsRepository) ListErrorLogs(ctx context.Context, filter *service.OpsEr
pageSize = 500
}
// buildOpsErrorLogsWhere may mutate filter (default resolved filter).
where, args := buildOpsErrorLogsWhere(filter)
countSQL := "SELECT COUNT(*) FROM ops_error_logs " + where
@@ -150,11 +151,19 @@ SELECT
created_at,
error_phase,
error_type,
COALESCE(error_owner, ''),
COALESCE(error_source, ''),
severity,
COALESCE(upstream_status_code, status_code, 0),
COALESCE(platform, ''),
COALESCE(model, ''),
duration_ms,
COALESCE(is_retryable, false),
COALESCE(retry_count, 0),
COALESCE(resolved, false),
resolved_at,
resolved_by_user_id,
resolved_retry_id,
COALESCE(client_request_id, ''),
COALESCE(request_id, ''),
COALESCE(error_message, ''),
@@ -186,16 +195,27 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
var apiKeyID sql.NullInt64
var accountID sql.NullInt64
var groupID sql.NullInt64
var resolvedAt sql.NullTime
var resolvedBy sql.NullInt64
var resolvedRetryID sql.NullInt64
if err := rows.Scan(
&item.ID,
&item.CreatedAt,
&item.Phase,
&item.Type,
&item.Owner,
&item.Source,
&item.Severity,
&statusCode,
&item.Platform,
&item.Model,
&latency,
&item.IsRetryable,
&item.RetryCount,
&item.Resolved,
&resolvedAt,
&resolvedBy,
&resolvedRetryID,
&item.ClientRequestID,
&item.RequestID,
&item.Message,
@@ -209,6 +229,18 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
); err != nil {
return nil, err
}
if resolvedAt.Valid {
t := resolvedAt.Time
item.ResolvedAt = &t
}
if resolvedBy.Valid {
v := resolvedBy.Int64
item.ResolvedByUserID = &v
}
if resolvedRetryID.Valid {
v := resolvedRetryID.Int64
item.ResolvedRetryID = &v
}
if latency.Valid {
v := int(latency.Int64)
item.LatencyMs = &v
@@ -262,11 +294,19 @@ SELECT
created_at,
error_phase,
error_type,
COALESCE(error_owner, ''),
COALESCE(error_source, ''),
severity,
COALESCE(upstream_status_code, status_code, 0),
COALESCE(platform, ''),
COALESCE(model, ''),
duration_ms,
COALESCE(is_retryable, false),
COALESCE(retry_count, 0),
COALESCE(resolved, false),
resolved_at,
resolved_by_user_id,
resolved_retry_id,
COALESCE(client_request_id, ''),
COALESCE(request_id, ''),
COALESCE(error_message, ''),
@@ -301,6 +341,9 @@ LIMIT 1`
var latency sql.NullInt64
var statusCode sql.NullInt64
var upstreamStatusCode sql.NullInt64
var resolvedAt sql.NullTime
var resolvedBy sql.NullInt64
var resolvedRetryID sql.NullInt64
var clientIP sql.NullString
var userID sql.NullInt64
var apiKeyID sql.NullInt64
@@ -318,11 +361,19 @@ LIMIT 1`
&out.CreatedAt,
&out.Phase,
&out.Type,
&out.Owner,
&out.Source,
&out.Severity,
&statusCode,
&out.Platform,
&out.Model,
&latency,
&out.IsRetryable,
&out.RetryCount,
&out.Resolved,
&resolvedAt,
&resolvedBy,
&resolvedRetryID,
&out.ClientRequestID,
&out.RequestID,
&out.Message,
@@ -359,6 +410,18 @@ LIMIT 1`
v := int(latency.Int64)
out.LatencyMs = &v
}
if resolvedAt.Valid {
t := resolvedAt.Time
out.ResolvedAt = &t
}
if resolvedBy.Valid {
v := resolvedBy.Int64
out.ResolvedByUserID = &v
}
if resolvedRetryID.Valid {
v := resolvedRetryID.Int64
out.ResolvedRetryID = &v
}
if clientIP.Valid {
s := clientIP.String
out.ClientIP = &s
@@ -487,9 +550,15 @@ SET
status = $2,
finished_at = $3,
duration_ms = $4,
result_request_id = $5,
result_error_id = $6,
error_message = $7
success = $5,
http_status_code = $6,
upstream_request_id = $7,
used_account_id = $8,
response_preview = $9,
response_truncated = $10,
result_request_id = $11,
result_error_id = $12,
error_message = $13
WHERE id = $1`
_, err := r.db.ExecContext(
@@ -499,8 +568,14 @@ WHERE id = $1`
strings.TrimSpace(input.Status),
nullTime(input.FinishedAt),
input.DurationMs,
nullBool(input.Success),
nullInt(input.HTTPStatusCode),
opsNullString(input.UpstreamRequestID),
nullInt64(input.UsedAccountID),
opsNullString(input.ResponsePreview),
nullBool(input.ResponseTruncated),
opsNullString(input.ResultRequestID),
opsNullInt64(input.ResultErrorID),
nullInt64(input.ResultErrorID),
opsNullString(input.ErrorMessage),
)
return err
@@ -526,6 +601,12 @@ SELECT
started_at,
finished_at,
duration_ms,
success,
http_status_code,
upstream_request_id,
used_account_id,
response_preview,
response_truncated,
result_request_id,
result_error_id,
error_message
@@ -540,6 +621,12 @@ LIMIT 1`
var startedAt sql.NullTime
var finishedAt sql.NullTime
var durationMs sql.NullInt64
var success sql.NullBool
var httpStatusCode sql.NullInt64
var upstreamRequestID sql.NullString
var usedAccountID sql.NullInt64
var responsePreview sql.NullString
var responseTruncated sql.NullBool
var resultRequestID sql.NullString
var resultErrorID sql.NullInt64
var errorMessage sql.NullString
@@ -555,6 +642,12 @@ LIMIT 1`
&startedAt,
&finishedAt,
&durationMs,
&success,
&httpStatusCode,
&upstreamRequestID,
&usedAccountID,
&responsePreview,
&responseTruncated,
&resultRequestID,
&resultErrorID,
&errorMessage,
@@ -579,6 +672,30 @@ LIMIT 1`
v := durationMs.Int64
out.DurationMs = &v
}
if success.Valid {
v := success.Bool
out.Success = &v
}
if httpStatusCode.Valid {
v := int(httpStatusCode.Int64)
out.HTTPStatusCode = &v
}
if upstreamRequestID.Valid {
s := upstreamRequestID.String
out.UpstreamRequestID = &s
}
if usedAccountID.Valid {
v := usedAccountID.Int64
out.UsedAccountID = &v
}
if responsePreview.Valid {
s := responsePreview.String
out.ResponsePreview = &s
}
if responseTruncated.Valid {
v := responseTruncated.Bool
out.ResponseTruncated = &v
}
if resultRequestID.Valid {
s := resultRequestID.String
out.ResultRequestID = &s
@@ -602,18 +719,217 @@ func nullTime(t time.Time) sql.NullTime {
return sql.NullTime{Time: t, Valid: true}
}
func nullBool(v *bool) sql.NullBool {
if v == nil {
return sql.NullBool{}
}
return sql.NullBool{Bool: *v, Valid: true}
}
func (r *opsRepository) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*service.OpsRetryAttempt, error) {
if r == nil || r.db == nil {
return nil, fmt.Errorf("nil ops repository")
}
if sourceErrorID <= 0 {
return nil, fmt.Errorf("invalid source_error_id")
}
if limit <= 0 {
limit = 50
}
if limit > 200 {
limit = 200
}
q := `
SELECT
id,
created_at,
COALESCE(requested_by_user_id, 0),
source_error_id,
COALESCE(mode, ''),
pinned_account_id,
COALESCE(status, ''),
started_at,
finished_at,
duration_ms,
success,
http_status_code,
upstream_request_id,
used_account_id,
response_preview,
response_truncated,
result_request_id,
result_error_id,
error_message
FROM ops_retry_attempts
WHERE source_error_id = $1
ORDER BY created_at DESC
LIMIT $2`
rows, err := r.db.QueryContext(ctx, q, sourceErrorID, limit)
if err != nil {
return nil, err
}
defer func() { _ = rows.Close() }()
out := make([]*service.OpsRetryAttempt, 0, 16)
for rows.Next() {
var item service.OpsRetryAttempt
var pinnedAccountID sql.NullInt64
var requestedBy sql.NullInt64
var startedAt sql.NullTime
var finishedAt sql.NullTime
var durationMs sql.NullInt64
var success sql.NullBool
var httpStatusCode sql.NullInt64
var upstreamRequestID sql.NullString
var usedAccountID sql.NullInt64
var responsePreview sql.NullString
var responseTruncated sql.NullBool
var resultRequestID sql.NullString
var resultErrorID sql.NullInt64
var errorMessage sql.NullString
if err := rows.Scan(
&item.ID,
&item.CreatedAt,
&requestedBy,
&item.SourceErrorID,
&item.Mode,
&pinnedAccountID,
&item.Status,
&startedAt,
&finishedAt,
&durationMs,
&success,
&httpStatusCode,
&upstreamRequestID,
&usedAccountID,
&responsePreview,
&responseTruncated,
&resultRequestID,
&resultErrorID,
&errorMessage,
); err != nil {
return nil, err
}
item.RequestedByUserID = requestedBy.Int64
if pinnedAccountID.Valid {
v := pinnedAccountID.Int64
item.PinnedAccountID = &v
}
if startedAt.Valid {
t := startedAt.Time
item.StartedAt = &t
}
if finishedAt.Valid {
t := finishedAt.Time
item.FinishedAt = &t
}
if durationMs.Valid {
v := durationMs.Int64
item.DurationMs = &v
}
if success.Valid {
v := success.Bool
item.Success = &v
}
if httpStatusCode.Valid {
v := int(httpStatusCode.Int64)
item.HTTPStatusCode = &v
}
if upstreamRequestID.Valid {
s := upstreamRequestID.String
item.UpstreamRequestID = &s
}
if usedAccountID.Valid {
v := usedAccountID.Int64
item.UsedAccountID = &v
}
if responsePreview.Valid {
s := responsePreview.String
item.ResponsePreview = &s
}
if responseTruncated.Valid {
v := responseTruncated.Bool
item.ResponseTruncated = &v
}
if resultRequestID.Valid {
s := resultRequestID.String
item.ResultRequestID = &s
}
if resultErrorID.Valid {
v := resultErrorID.Int64
item.ResultErrorID = &v
}
if errorMessage.Valid {
s := errorMessage.String
item.ErrorMessage = &s
}
out = append(out, &item)
}
if err := rows.Err(); err != nil {
return nil, err
}
return out, nil
}
func (r *opsRepository) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error {
if r == nil || r.db == nil {
return fmt.Errorf("nil ops repository")
}
if errorID <= 0 {
return fmt.Errorf("invalid error id")
}
q := `
UPDATE ops_error_logs
SET
resolved = $2,
resolved_at = $3,
resolved_by_user_id = $4,
resolved_retry_id = $5
WHERE id = $1`
at := sql.NullTime{}
if resolvedAt != nil && !resolvedAt.IsZero() {
at = sql.NullTime{Time: resolvedAt.UTC(), Valid: true}
} else if resolved {
now := time.Now().UTC()
at = sql.NullTime{Time: now, Valid: true}
}
_, err := r.db.ExecContext(
ctx,
q,
errorID,
resolved,
at,
nullInt64(resolvedByUserID),
nullInt64(resolvedRetryID),
)
return err
}
func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) {
clauses := make([]string, 0, 8)
args := make([]any, 0, 8)
clauses := make([]string, 0, 12)
args := make([]any, 0, 12)
clauses = append(clauses, "1=1")
phaseFilter := ""
if filter != nil {
phaseFilter = strings.TrimSpace(strings.ToLower(filter.Phase))
}
// ops_error_logs primarily stores client-visible error requests (status>=400),
// ops_error_logs stores client-visible error requests (status>=400),
// but we also persist "recovered" upstream errors (status<400) for upstream health visibility.
// By default, keep list endpoints scoped to client errors unless explicitly filtering upstream phase.
// By default, keep list endpoints scoped to unresolved records if the caller didn't specify.
if filter != nil && filter.Resolved == nil {
f := false
filter.Resolved = &f
}
// Keep list endpoints scoped to client errors unless explicitly filtering upstream phase.
if phaseFilter != "upstream" {
clauses = append(clauses, "COALESCE(status_code, 0) >= 400")
}
@@ -643,6 +959,18 @@ func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) {
args = append(args, phase)
clauses = append(clauses, "error_phase = $"+itoa(len(args)))
}
if owner := strings.TrimSpace(strings.ToLower(filter.Owner)); owner != "" {
args = append(args, owner)
clauses = append(clauses, "LOWER(COALESCE(error_owner,'')) = $"+itoa(len(args)))
}
if source := strings.TrimSpace(strings.ToLower(filter.Source)); source != "" {
args = append(args, source)
clauses = append(clauses, "LOWER(COALESCE(error_source,'')) = $"+itoa(len(args)))
}
if filter.Resolved != nil {
args = append(args, *filter.Resolved)
clauses = append(clauses, "COALESCE(resolved,false) = $"+itoa(len(args)))
}
if len(filter.StatusCodes) > 0 {
args = append(args, pq.Array(filter.StatusCodes))
clauses = append(clauses, "COALESCE(upstream_status_code, status_code, 0) = ANY($"+itoa(len(args))+")")

View File

@@ -354,7 +354,7 @@ SELECT
created_at
FROM ops_alert_events
` + where + `
ORDER BY fired_at DESC
ORDER BY fired_at DESC, id DESC
LIMIT ` + limitArg
rows, err := r.db.QueryContext(ctx, q, args...)
@@ -413,6 +413,43 @@ LIMIT ` + limitArg
return out, nil
}
func (r *opsRepository) GetAlertEventByID(ctx context.Context, eventID int64) (*service.OpsAlertEvent, error) {
if r == nil || r.db == nil {
return nil, fmt.Errorf("nil ops repository")
}
if eventID <= 0 {
return nil, fmt.Errorf("invalid event id")
}
q := `
SELECT
id,
COALESCE(rule_id, 0),
COALESCE(severity, ''),
COALESCE(status, ''),
COALESCE(title, ''),
COALESCE(description, ''),
metric_value,
threshold_value,
dimensions,
fired_at,
resolved_at,
email_sent,
created_at
FROM ops_alert_events
WHERE id = $1`
row := r.db.QueryRowContext(ctx, q, eventID)
ev, err := scanOpsAlertEvent(row)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
return ev, nil
}
func (r *opsRepository) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*service.OpsAlertEvent, error) {
if r == nil || r.db == nil {
return nil, fmt.Errorf("nil ops repository")
@@ -591,6 +628,121 @@ type opsAlertEventRow interface {
Scan(dest ...any) error
}
func (r *opsRepository) CreateAlertSilence(ctx context.Context, input *service.OpsAlertSilence) (*service.OpsAlertSilence, error) {
if r == nil || r.db == nil {
return nil, fmt.Errorf("nil ops repository")
}
if input == nil {
return nil, fmt.Errorf("nil input")
}
if input.RuleID <= 0 {
return nil, fmt.Errorf("invalid rule_id")
}
platform := strings.TrimSpace(input.Platform)
if platform == "" {
return nil, fmt.Errorf("invalid platform")
}
if input.Until.IsZero() {
return nil, fmt.Errorf("invalid until")
}
q := `
INSERT INTO ops_alert_silences (
rule_id,
platform,
group_id,
region,
until,
reason,
created_by,
created_at
) VALUES (
$1,$2,$3,$4,$5,$6,$7,NOW()
)
RETURNING id, rule_id, platform, group_id, region, until, COALESCE(reason,''), created_by, created_at`
row := r.db.QueryRowContext(
ctx,
q,
input.RuleID,
platform,
opsNullInt64(input.GroupID),
opsNullString(input.Region),
input.Until,
opsNullString(input.Reason),
opsNullInt64(input.CreatedBy),
)
var out service.OpsAlertSilence
var groupID sql.NullInt64
var region sql.NullString
var createdBy sql.NullInt64
if err := row.Scan(
&out.ID,
&out.RuleID,
&out.Platform,
&groupID,
&region,
&out.Until,
&out.Reason,
&createdBy,
&out.CreatedAt,
); err != nil {
return nil, err
}
if groupID.Valid {
v := groupID.Int64
out.GroupID = &v
}
if region.Valid {
v := strings.TrimSpace(region.String)
if v != "" {
out.Region = &v
}
}
if createdBy.Valid {
v := createdBy.Int64
out.CreatedBy = &v
}
return &out, nil
}
func (r *opsRepository) IsAlertSilenced(ctx context.Context, ruleID int64, platform string, groupID *int64, region *string, now time.Time) (bool, error) {
if r == nil || r.db == nil {
return false, fmt.Errorf("nil ops repository")
}
if ruleID <= 0 {
return false, fmt.Errorf("invalid rule id")
}
platform = strings.TrimSpace(platform)
if platform == "" {
return false, nil
}
if now.IsZero() {
now = time.Now().UTC()
}
q := `
SELECT 1
FROM ops_alert_silences
WHERE rule_id = $1
AND platform = $2
AND (group_id IS NOT DISTINCT FROM $3)
AND (region IS NOT DISTINCT FROM $4)
AND until > $5
LIMIT 1`
var dummy int
err := r.db.QueryRowContext(ctx, q, ruleID, platform, opsNullInt64(groupID), opsNullString(region), now).Scan(&dummy)
if err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
return true, nil
}
func scanOpsAlertEvent(row opsAlertEventRow) (*service.OpsAlertEvent, error) {
var ev service.OpsAlertEvent
var metricValue sql.NullFloat64
@@ -652,6 +804,10 @@ func buildOpsAlertEventsWhere(filter *service.OpsAlertEventFilter) (string, []an
args = append(args, severity)
clauses = append(clauses, "severity = $"+itoa(len(args)))
}
if filter.EmailSent != nil {
args = append(args, *filter.EmailSent)
clauses = append(clauses, "email_sent = $"+itoa(len(args)))
}
if filter.StartTime != nil && !filter.StartTime.IsZero() {
args = append(args, *filter.StartTime)
clauses = append(clauses, "fired_at >= $"+itoa(len(args)))
@@ -661,6 +817,14 @@ func buildOpsAlertEventsWhere(filter *service.OpsAlertEventFilter) (string, []an
clauses = append(clauses, "fired_at < $"+itoa(len(args)))
}
// Cursor pagination (descending by fired_at, then id)
if filter.BeforeFiredAt != nil && !filter.BeforeFiredAt.IsZero() && filter.BeforeID != nil && *filter.BeforeID > 0 {
args = append(args, *filter.BeforeFiredAt)
tsArg := "$" + itoa(len(args))
args = append(args, *filter.BeforeID)
idArg := "$" + itoa(len(args))
clauses = append(clauses, fmt.Sprintf("(fired_at < %s OR (fired_at = %s AND id < %s))", tsArg, tsArg, idArg))
}
// Dimensions are stored in JSONB. We filter best-effort without requiring GIN indexes.
if platform := strings.TrimSpace(filter.Platform); platform != "" {
args = append(args, platform)