refactor(ops): 重构ops handler和repository层

This commit is contained in:
IanShaw027
2026-01-14 12:40:34 +08:00
parent 967e25878f
commit 2ca6c631ac
2 changed files with 133 additions and 79 deletions

View File

@@ -544,6 +544,11 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc {
body := w.buf.Bytes() body := w.buf.Bytes()
parsed := parseOpsErrorResponse(body) parsed := parseOpsErrorResponse(body)
// Skip logging if the error should be filtered based on settings
if shouldSkipOpsErrorLog(c.Request.Context(), ops, parsed.Message, string(body), c.Request.URL.Path) {
return
}
apiKey, _ := middleware2.GetAPIKeyFromContext(c) apiKey, _ := middleware2.GetAPIKeyFromContext(c)
clientRequestID, _ := c.Request.Context().Value(ctxkey.ClientRequestID).(string) clientRequestID, _ := c.Request.Context().Value(ctxkey.ClientRequestID).(string)
@@ -969,3 +974,43 @@ func truncateString(s string, max int) string {
func strconvItoa(v int) string { func strconvItoa(v int) string {
return strconv.Itoa(v) return strconv.Itoa(v)
} }
// shouldSkipOpsErrorLog determines if an error should be skipped from logging based on settings.
// Returns true for errors that should be filtered according to OpsAdvancedSettings.
func shouldSkipOpsErrorLog(ctx context.Context, ops *service.OpsService, message, body, requestPath string) bool {
if ops == nil {
return false
}
// Get advanced settings to check filter configuration
settings, err := ops.GetOpsAdvancedSettings(ctx)
if err != nil || settings == nil {
// If we can't get settings, don't skip (fail open)
return false
}
msgLower := strings.ToLower(message)
bodyLower := strings.ToLower(body)
// Check if count_tokens errors should be ignored
if settings.IgnoreCountTokensErrors && strings.Contains(requestPath, "/count_tokens") {
return true
}
// Check if context canceled errors should be ignored (client disconnects)
if settings.IgnoreContextCanceled {
if strings.Contains(msgLower, "context canceled") || strings.Contains(bodyLower, "context canceled") {
return true
}
}
// Check if "no available accounts" errors should be ignored
if settings.IgnoreNoAvailableAccounts {
if strings.Contains(msgLower, "no available accounts") || strings.Contains(bodyLower, "no available accounts") {
return true
}
}
return false
}

View File

@@ -55,7 +55,6 @@ INSERT INTO ops_error_logs (
upstream_error_message, upstream_error_message,
upstream_error_detail, upstream_error_detail,
upstream_errors, upstream_errors,
duration_ms,
time_to_first_token_ms, time_to_first_token_ms,
request_body, request_body,
request_body_truncated, request_body_truncated,
@@ -65,7 +64,7 @@ INSERT INTO ops_error_logs (
retry_count, retry_count,
created_at created_at
) VALUES ( ) VALUES (
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35 $1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34
) RETURNING id` ) RETURNING id`
var id int64 var id int64
@@ -98,7 +97,6 @@ INSERT INTO ops_error_logs (
opsNullString(input.UpstreamErrorMessage), opsNullString(input.UpstreamErrorMessage),
opsNullString(input.UpstreamErrorDetail), opsNullString(input.UpstreamErrorDetail),
opsNullString(input.UpstreamErrorsJSON), opsNullString(input.UpstreamErrorsJSON),
opsNullInt(input.DurationMs),
opsNullInt64(input.TimeToFirstTokenMs), opsNullInt64(input.TimeToFirstTokenMs),
opsNullString(input.RequestBodyJSON), opsNullString(input.RequestBodyJSON),
input.RequestBodyTruncated, input.RequestBodyTruncated,
@@ -136,7 +134,7 @@ func (r *opsRepository) ListErrorLogs(ctx context.Context, filter *service.OpsEr
// buildOpsErrorLogsWhere may mutate filter (default resolved filter). // buildOpsErrorLogsWhere may mutate filter (default resolved filter).
where, args := buildOpsErrorLogsWhere(filter) where, args := buildOpsErrorLogsWhere(filter)
countSQL := "SELECT COUNT(*) FROM ops_error_logs " + where countSQL := "SELECT COUNT(*) FROM ops_error_logs e " + where
var total int var total int
if err := r.db.QueryRowContext(ctx, countSQL, args...).Scan(&total); err != nil { if err := r.db.QueryRowContext(ctx, countSQL, args...).Scan(&total); err != nil {
@@ -147,36 +145,43 @@ func (r *opsRepository) ListErrorLogs(ctx context.Context, filter *service.OpsEr
argsWithLimit := append(args, pageSize, offset) argsWithLimit := append(args, pageSize, offset)
selectSQL := ` selectSQL := `
SELECT SELECT
id, e.id,
created_at, e.created_at,
error_phase, e.error_phase,
error_type, e.error_type,
COALESCE(error_owner, ''), COALESCE(e.error_owner, ''),
COALESCE(error_source, ''), COALESCE(e.error_source, ''),
severity, e.severity,
COALESCE(upstream_status_code, status_code, 0), COALESCE(e.upstream_status_code, e.status_code, 0),
COALESCE(platform, ''), COALESCE(e.platform, ''),
COALESCE(model, ''), COALESCE(e.model, ''),
duration_ms, COALESCE(e.is_retryable, false),
COALESCE(is_retryable, false), COALESCE(e.retry_count, 0),
COALESCE(retry_count, 0), COALESCE(e.resolved, false),
COALESCE(resolved, false), e.resolved_at,
resolved_at, e.resolved_by_user_id,
resolved_by_user_id, COALESCE(u2.email, ''),
resolved_retry_id, e.resolved_retry_id,
COALESCE(client_request_id, ''), COALESCE(e.client_request_id, ''),
COALESCE(request_id, ''), COALESCE(e.request_id, ''),
COALESCE(error_message, ''), COALESCE(e.error_message, ''),
user_id, e.user_id,
api_key_id, COALESCE(u.email, ''),
account_id, e.api_key_id,
group_id, e.account_id,
CASE WHEN client_ip IS NULL THEN NULL ELSE client_ip::text END, COALESCE(a.name, ''),
COALESCE(request_path, ''), e.group_id,
stream COALESCE(g.name, ''),
FROM ops_error_logs CASE WHEN e.client_ip IS NULL THEN NULL ELSE e.client_ip::text END,
COALESCE(e.request_path, ''),
e.stream
FROM ops_error_logs e
LEFT JOIN accounts a ON e.account_id = a.id
LEFT JOIN groups g ON e.group_id = g.id
LEFT JOIN users u ON e.user_id = u.id
LEFT JOIN users u2 ON e.resolved_by_user_id = u2.id
` + where + ` ` + where + `
ORDER BY created_at DESC ORDER BY e.created_at DESC
LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2) LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
rows, err := r.db.QueryContext(ctx, selectSQL, argsWithLimit...) rows, err := r.db.QueryContext(ctx, selectSQL, argsWithLimit...)
@@ -188,15 +193,18 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
out := make([]*service.OpsErrorLog, 0, pageSize) out := make([]*service.OpsErrorLog, 0, pageSize)
for rows.Next() { for rows.Next() {
var item service.OpsErrorLog var item service.OpsErrorLog
var latency sql.NullInt64
var statusCode sql.NullInt64 var statusCode sql.NullInt64
var clientIP sql.NullString var clientIP sql.NullString
var userID sql.NullInt64 var userID sql.NullInt64
var apiKeyID sql.NullInt64 var apiKeyID sql.NullInt64
var accountID sql.NullInt64 var accountID sql.NullInt64
var accountName string
var groupID sql.NullInt64 var groupID sql.NullInt64
var groupName string
var userEmail string
var resolvedAt sql.NullTime var resolvedAt sql.NullTime
var resolvedBy sql.NullInt64 var resolvedBy sql.NullInt64
var resolvedByName string
var resolvedRetryID sql.NullInt64 var resolvedRetryID sql.NullInt64
if err := rows.Scan( if err := rows.Scan(
&item.ID, &item.ID,
@@ -209,20 +217,23 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
&statusCode, &statusCode,
&item.Platform, &item.Platform,
&item.Model, &item.Model,
&latency,
&item.IsRetryable, &item.IsRetryable,
&item.RetryCount, &item.RetryCount,
&item.Resolved, &item.Resolved,
&resolvedAt, &resolvedAt,
&resolvedBy, &resolvedBy,
&resolvedByName,
&resolvedRetryID, &resolvedRetryID,
&item.ClientRequestID, &item.ClientRequestID,
&item.RequestID, &item.RequestID,
&item.Message, &item.Message,
&userID, &userID,
&userEmail,
&apiKeyID, &apiKeyID,
&accountID, &accountID,
&accountName,
&groupID, &groupID,
&groupName,
&clientIP, &clientIP,
&item.RequestPath, &item.RequestPath,
&item.Stream, &item.Stream,
@@ -237,14 +248,11 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
v := resolvedBy.Int64 v := resolvedBy.Int64
item.ResolvedByUserID = &v item.ResolvedByUserID = &v
} }
item.ResolvedByUserName = resolvedByName
if resolvedRetryID.Valid { if resolvedRetryID.Valid {
v := resolvedRetryID.Int64 v := resolvedRetryID.Int64
item.ResolvedRetryID = &v item.ResolvedRetryID = &v
} }
if latency.Valid {
v := int(latency.Int64)
item.LatencyMs = &v
}
item.StatusCode = int(statusCode.Int64) item.StatusCode = int(statusCode.Int64)
if clientIP.Valid { if clientIP.Valid {
s := clientIP.String s := clientIP.String
@@ -254,6 +262,7 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
v := userID.Int64 v := userID.Int64
item.UserID = &v item.UserID = &v
} }
item.UserEmail = userEmail
if apiKeyID.Valid { if apiKeyID.Valid {
v := apiKeyID.Int64 v := apiKeyID.Int64
item.APIKeyID = &v item.APIKeyID = &v
@@ -262,10 +271,12 @@ LIMIT $` + itoa(len(args)+1) + ` OFFSET $` + itoa(len(args)+2)
v := accountID.Int64 v := accountID.Int64
item.AccountID = &v item.AccountID = &v
} }
item.AccountName = accountName
if groupID.Valid { if groupID.Valid {
v := groupID.Int64 v := groupID.Int64
item.GroupID = &v item.GroupID = &v
} }
item.GroupName = groupName
out = append(out, &item) out = append(out, &item)
} }
if err := rows.Err(); err != nil { if err := rows.Err(); err != nil {
@@ -300,7 +311,6 @@ SELECT
COALESCE(upstream_status_code, status_code, 0), COALESCE(upstream_status_code, status_code, 0),
COALESCE(platform, ''), COALESCE(platform, ''),
COALESCE(model, ''), COALESCE(model, ''),
duration_ms,
COALESCE(is_retryable, false), COALESCE(is_retryable, false),
COALESCE(retry_count, 0), COALESCE(retry_count, 0),
COALESCE(resolved, false), COALESCE(resolved, false),
@@ -338,7 +348,6 @@ WHERE id = $1
LIMIT 1` LIMIT 1`
var out service.OpsErrorLogDetail var out service.OpsErrorLogDetail
var latency sql.NullInt64
var statusCode sql.NullInt64 var statusCode sql.NullInt64
var upstreamStatusCode sql.NullInt64 var upstreamStatusCode sql.NullInt64
var resolvedAt sql.NullTime var resolvedAt sql.NullTime
@@ -367,7 +376,6 @@ LIMIT 1`
&statusCode, &statusCode,
&out.Platform, &out.Platform,
&out.Model, &out.Model,
&latency,
&out.IsRetryable, &out.IsRetryable,
&out.RetryCount, &out.RetryCount,
&out.Resolved, &out.Resolved,
@@ -406,10 +414,6 @@ LIMIT 1`
} }
out.StatusCode = int(statusCode.Int64) out.StatusCode = int(statusCode.Int64)
if latency.Valid {
v := int(latency.Int64)
out.LatencyMs = &v
}
if resolvedAt.Valid { if resolvedAt.Valid {
t := resolvedAt.Time t := resolvedAt.Time
out.ResolvedAt = &t out.ResolvedAt = &t
@@ -742,28 +746,32 @@ func (r *opsRepository) ListRetryAttemptsByErrorID(ctx context.Context, sourceEr
q := ` q := `
SELECT SELECT
id, r.id,
created_at, r.created_at,
COALESCE(requested_by_user_id, 0), COALESCE(r.requested_by_user_id, 0),
source_error_id, r.source_error_id,
COALESCE(mode, ''), COALESCE(r.mode, ''),
pinned_account_id, r.pinned_account_id,
COALESCE(status, ''), COALESCE(pa.name, ''),
started_at, COALESCE(r.status, ''),
finished_at, r.started_at,
duration_ms, r.finished_at,
success, r.duration_ms,
http_status_code, r.success,
upstream_request_id, r.http_status_code,
used_account_id, r.upstream_request_id,
response_preview, r.used_account_id,
response_truncated, COALESCE(ua.name, ''),
result_request_id, r.response_preview,
result_error_id, r.response_truncated,
error_message r.result_request_id,
FROM ops_retry_attempts r.result_error_id,
WHERE source_error_id = $1 r.error_message
ORDER BY created_at DESC FROM ops_retry_attempts r
LEFT JOIN accounts pa ON r.pinned_account_id = pa.id
LEFT JOIN accounts ua ON r.used_account_id = ua.id
WHERE r.source_error_id = $1
ORDER BY r.created_at DESC
LIMIT $2` LIMIT $2`
rows, err := r.db.QueryContext(ctx, q, sourceErrorID, limit) rows, err := r.db.QueryContext(ctx, q, sourceErrorID, limit)
@@ -776,6 +784,7 @@ LIMIT $2`
for rows.Next() { for rows.Next() {
var item service.OpsRetryAttempt var item service.OpsRetryAttempt
var pinnedAccountID sql.NullInt64 var pinnedAccountID sql.NullInt64
var pinnedAccountName string
var requestedBy sql.NullInt64 var requestedBy sql.NullInt64
var startedAt sql.NullTime var startedAt sql.NullTime
var finishedAt sql.NullTime var finishedAt sql.NullTime
@@ -784,6 +793,7 @@ LIMIT $2`
var httpStatusCode sql.NullInt64 var httpStatusCode sql.NullInt64
var upstreamRequestID sql.NullString var upstreamRequestID sql.NullString
var usedAccountID sql.NullInt64 var usedAccountID sql.NullInt64
var usedAccountName string
var responsePreview sql.NullString var responsePreview sql.NullString
var responseTruncated sql.NullBool var responseTruncated sql.NullBool
var resultRequestID sql.NullString var resultRequestID sql.NullString
@@ -797,6 +807,7 @@ LIMIT $2`
&item.SourceErrorID, &item.SourceErrorID,
&item.Mode, &item.Mode,
&pinnedAccountID, &pinnedAccountID,
&pinnedAccountName,
&item.Status, &item.Status,
&startedAt, &startedAt,
&finishedAt, &finishedAt,
@@ -805,6 +816,7 @@ LIMIT $2`
&httpStatusCode, &httpStatusCode,
&upstreamRequestID, &upstreamRequestID,
&usedAccountID, &usedAccountID,
&usedAccountName,
&responsePreview, &responsePreview,
&responseTruncated, &responseTruncated,
&resultRequestID, &resultRequestID,
@@ -819,6 +831,7 @@ LIMIT $2`
v := pinnedAccountID.Int64 v := pinnedAccountID.Int64
item.PinnedAccountID = &v item.PinnedAccountID = &v
} }
item.PinnedAccountName = pinnedAccountName
if startedAt.Valid { if startedAt.Valid {
t := startedAt.Time t := startedAt.Time
item.StartedAt = &t item.StartedAt = &t
@@ -840,34 +853,30 @@ LIMIT $2`
item.HTTPStatusCode = &v item.HTTPStatusCode = &v
} }
if upstreamRequestID.Valid { if upstreamRequestID.Valid {
s := upstreamRequestID.String item.UpstreamRequestID = &upstreamRequestID.String
item.UpstreamRequestID = &s
} }
if usedAccountID.Valid { if usedAccountID.Valid {
v := usedAccountID.Int64 v := usedAccountID.Int64
item.UsedAccountID = &v item.UsedAccountID = &v
} }
item.UsedAccountName = usedAccountName
if responsePreview.Valid { if responsePreview.Valid {
s := responsePreview.String item.ResponsePreview = &responsePreview.String
item.ResponsePreview = &s
} }
if responseTruncated.Valid { if responseTruncated.Valid {
v := responseTruncated.Bool v := responseTruncated.Bool
item.ResponseTruncated = &v item.ResponseTruncated = &v
} }
if resultRequestID.Valid { if resultRequestID.Valid {
s := resultRequestID.String item.ResultRequestID = &resultRequestID.String
item.ResultRequestID = &s
} }
if resultErrorID.Valid { if resultErrorID.Valid {
v := resultErrorID.Int64 v := resultErrorID.Int64
item.ResultErrorID = &v item.ResultErrorID = &v
} }
if errorMessage.Valid { if errorMessage.Valid {
s := errorMessage.String item.ErrorMessage = &errorMessage.String
item.ErrorMessage = &s
} }
out = append(out, &item) out = append(out, &item)
} }
if err := rows.Err(); err != nil { if err := rows.Err(); err != nil {
@@ -940,12 +949,12 @@ func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) {
if filter.StartTime != nil && !filter.StartTime.IsZero() { if filter.StartTime != nil && !filter.StartTime.IsZero() {
args = append(args, filter.StartTime.UTC()) args = append(args, filter.StartTime.UTC())
clauses = append(clauses, "created_at >= $"+itoa(len(args))) clauses = append(clauses, "e.created_at >= $"+itoa(len(args)))
} }
if filter.EndTime != nil && !filter.EndTime.IsZero() { if filter.EndTime != nil && !filter.EndTime.IsZero() {
args = append(args, filter.EndTime.UTC()) args = append(args, filter.EndTime.UTC())
// Keep time-window semantics consistent with other ops queries: [start, end) // Keep time-window semantics consistent with other ops queries: [start, end)
clauses = append(clauses, "created_at < $"+itoa(len(args))) clauses = append(clauses, "e.created_at < $"+itoa(len(args)))
} }
if p := strings.TrimSpace(filter.Platform); p != "" { if p := strings.TrimSpace(filter.Platform); p != "" {
args = append(args, p) args = append(args, p)