From d60176801635a188ebcba04701d40dd5085a5b90 Mon Sep 17 00:00:00 2001 From: IanShaw027 <131567472+IanShaw027@users.noreply.github.com> Date: Wed, 14 Jan 2026 09:03:16 +0800 Subject: [PATCH] =?UTF-8?q?feat(service):=20=E5=A2=9E=E5=BC=BAops=E4=B8=9A?= =?UTF-8?q?=E5=8A=A1=E9=80=BB=E8=BE=91=E5=92=8C=E5=91=8A=E8=AD=A6=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现告警静默功能的业务逻辑 - 优化错误分类和重试机制 - 扩展告警评估和通知功能 - 完善错误解决和重试结果处理 --- .../service/ops_alert_evaluator_service.go | 11 +++ backend/internal/service/ops_alert_models.go | 29 +++++++- backend/internal/service/ops_alerts.go | 66 +++++++++++++++++ backend/internal/service/ops_models.go | 33 ++++++++- backend/internal/service/ops_port.go | 17 ++++- backend/internal/service/ops_retry.go | 32 ++++++-- backend/internal/service/ops_service.go | 74 +++++++++++++++++-- 7 files changed, 242 insertions(+), 20 deletions(-) diff --git a/backend/internal/service/ops_alert_evaluator_service.go b/backend/internal/service/ops_alert_evaluator_service.go index f376c246..3efa11d2 100644 --- a/backend/internal/service/ops_alert_evaluator_service.go +++ b/backend/internal/service/ops_alert_evaluator_service.go @@ -236,6 +236,17 @@ func (s *OpsAlertEvaluatorService) evaluateOnce(interval time.Duration) { continue } + // Scoped silencing: if a matching silence exists, skip creating a firing event. + if s.opsService != nil { + platform := strings.TrimSpace(scopePlatform) + region := (*string)(nil) + if platform != "" { + if ok, err := s.opsService.IsAlertSilenced(ctx, rule.ID, platform, scopeGroupID, region, now); err == nil && ok { + continue + } + } + } + latestEvent, err := s.opsRepo.GetLatestAlertEvent(ctx, rule.ID) if err != nil { log.Printf("[OpsAlertEvaluator] get latest event failed (rule=%d): %v", rule.ID, err) diff --git a/backend/internal/service/ops_alert_models.go b/backend/internal/service/ops_alert_models.go index 0acf13ab..a0caa990 100644 --- a/backend/internal/service/ops_alert_models.go +++ b/backend/internal/service/ops_alert_models.go @@ -8,8 +8,9 @@ import "time" // with the existing ops dashboard frontend (backup style). const ( - OpsAlertStatusFiring = "firing" - OpsAlertStatusResolved = "resolved" + OpsAlertStatusFiring = "firing" + OpsAlertStatusResolved = "resolved" + OpsAlertStatusManualResolved = "manual_resolved" ) type OpsAlertRule struct { @@ -58,12 +59,32 @@ type OpsAlertEvent struct { CreatedAt time.Time `json:"created_at"` } +type OpsAlertSilence struct { + ID int64 `json:"id"` + + RuleID int64 `json:"rule_id"` + Platform string `json:"platform"` + GroupID *int64 `json:"group_id,omitempty"` + Region *string `json:"region,omitempty"` + + Until time.Time `json:"until"` + Reason string `json:"reason"` + + CreatedBy *int64 `json:"created_by,omitempty"` + CreatedAt time.Time `json:"created_at"` +} + type OpsAlertEventFilter struct { Limit int + // Cursor pagination (descending by fired_at, then id). + BeforeFiredAt *time.Time + BeforeID *int64 + // Optional filters. - Status string - Severity string + Status string + Severity string + EmailSent *bool StartTime *time.Time EndTime *time.Time diff --git a/backend/internal/service/ops_alerts.go b/backend/internal/service/ops_alerts.go index b6c3d1c3..c2bb4e7b 100644 --- a/backend/internal/service/ops_alerts.go +++ b/backend/internal/service/ops_alerts.go @@ -88,6 +88,29 @@ func (s *OpsService) ListAlertEvents(ctx context.Context, filter *OpsAlertEventF return s.opsRepo.ListAlertEvents(ctx, filter) } +func (s *OpsService) GetAlertEventByID(ctx context.Context, eventID int64) (*OpsAlertEvent, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if eventID <= 0 { + return nil, infraerrors.BadRequest("INVALID_EVENT_ID", "invalid event id") + } + ev, err := s.opsRepo.GetAlertEventByID(ctx, eventID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, infraerrors.NotFound("OPS_ALERT_EVENT_NOT_FOUND", "alert event not found") + } + return nil, err + } + if ev == nil { + return nil, infraerrors.NotFound("OPS_ALERT_EVENT_NOT_FOUND", "alert event not found") + } + return ev, nil +} + func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) { if err := s.RequireMonitoringEnabled(ctx); err != nil { return nil, err @@ -101,6 +124,49 @@ func (s *OpsService) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*Op return s.opsRepo.GetActiveAlertEvent(ctx, ruleID) } +func (s *OpsService) CreateAlertSilence(ctx context.Context, input *OpsAlertSilence) (*OpsAlertSilence, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if input == nil { + return nil, infraerrors.BadRequest("INVALID_SILENCE", "invalid silence") + } + if input.RuleID <= 0 { + return nil, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id") + } + if strings.TrimSpace(input.Platform) == "" { + return nil, infraerrors.BadRequest("INVALID_PLATFORM", "invalid platform") + } + if input.Until.IsZero() { + return nil, infraerrors.BadRequest("INVALID_UNTIL", "invalid until") + } + + created, err := s.opsRepo.CreateAlertSilence(ctx, input) + if err != nil { + return nil, err + } + return created, nil +} + +func (s *OpsService) IsAlertSilenced(ctx context.Context, ruleID int64, platform string, groupID *int64, region *string, now time.Time) (bool, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return false, err + } + if s.opsRepo == nil { + return false, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if ruleID <= 0 { + return false, infraerrors.BadRequest("INVALID_RULE_ID", "invalid rule id") + } + if strings.TrimSpace(platform) == "" { + return false, nil + } + return s.opsRepo.IsAlertSilenced(ctx, ruleID, platform, groupID, region, now) +} + func (s *OpsService) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) { if err := s.RequireMonitoringEnabled(ctx); err != nil { return nil, err diff --git a/backend/internal/service/ops_models.go b/backend/internal/service/ops_models.go index 996267fd..78f7cdd0 100644 --- a/backend/internal/service/ops_models.go +++ b/backend/internal/service/ops_models.go @@ -6,8 +6,16 @@ type OpsErrorLog struct { ID int64 `json:"id"` CreatedAt time.Time `json:"created_at"` - Phase string `json:"phase"` - Type string `json:"type"` + // Standardized classification + // - phase: request|auth|routing|upstream|network|internal + // - owner: client|provider|platform + // - source: client_request|upstream_http|gateway + Phase string `json:"phase"` + Type string `json:"type"` + + Owner string `json:"error_owner"` + Source string `json:"error_source"` + Severity string `json:"severity"` StatusCode int `json:"status_code"` @@ -16,6 +24,15 @@ type OpsErrorLog struct { LatencyMs *int `json:"latency_ms"` + IsRetryable bool `json:"is_retryable"` + RetryCount int `json:"retry_count"` + + Resolved bool `json:"resolved"` + ResolvedAt *time.Time `json:"resolved_at"` + ResolvedByUserID *int64 `json:"resolved_by_user_id"` + ResolvedRetryID *int64 `json:"resolved_retry_id"` + ResolvedStatusRaw string `json:"-"` + ClientRequestID string `json:"client_request_id"` RequestID string `json:"request_id"` Message string `json:"message"` @@ -69,6 +86,9 @@ type OpsErrorLogFilter struct { StatusCodes []int Phase string + Owner string + Source string + Resolved *bool Query string Page int @@ -96,6 +116,15 @@ type OpsRetryAttempt struct { FinishedAt *time.Time `json:"finished_at"` DurationMs *int64 `json:"duration_ms"` + // Persisted execution results (best-effort) + Success *bool `json:"success"` + HTTPStatusCode *int `json:"http_status_code"` + UpstreamRequestID *string `json:"upstream_request_id"` + UsedAccountID *int64 `json:"used_account_id"` + ResponsePreview *string `json:"response_preview"` + ResponseTruncated *bool `json:"response_truncated"` + + // Optional correlation ResultRequestID *string `json:"result_request_id"` ResultErrorID *int64 `json:"result_error_id"` diff --git a/backend/internal/service/ops_port.go b/backend/internal/service/ops_port.go index 4df21c37..37a8107c 100644 --- a/backend/internal/service/ops_port.go +++ b/backend/internal/service/ops_port.go @@ -14,6 +14,8 @@ type OpsRepository interface { InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error) UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error) + ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error) + UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error // Lightweight window stats (for realtime WS / quick sampling). GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error) @@ -39,12 +41,17 @@ type OpsRepository interface { DeleteAlertRule(ctx context.Context, id int64) error ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error) + GetAlertEventByID(ctx context.Context, eventID int64) (*OpsAlertEvent, error) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error) UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error + // Alert silences + CreateAlertSilence(ctx context.Context, input *OpsAlertSilence) (*OpsAlertSilence, error) + IsAlertSilenced(ctx context.Context, ruleID int64, platform string, groupID *int64, region *string, now time.Time) (bool, error) + // Pre-aggregation (hourly/daily) used for long-window dashboard performance. UpsertHourlyMetrics(ctx context.Context, startTime, endTime time.Time) error UpsertDailyMetrics(ctx context.Context, startTime, endTime time.Time) error @@ -124,7 +131,15 @@ type OpsUpdateRetryAttemptInput struct { FinishedAt time.Time DurationMs int64 - // Optional correlation + // Persisted execution results (best-effort) + Success *bool + HTTPStatusCode *int + UpstreamRequestID *string + UsedAccountID *int64 + ResponsePreview *string + ResponseTruncated *bool + + // Optional correlation (legacy fields kept) ResultRequestID *string ResultErrorID *int64 diff --git a/backend/internal/service/ops_retry.go b/backend/internal/service/ops_retry.go index 747aa3b8..2cbb8ced 100644 --- a/backend/internal/service/ops_retry.go +++ b/backend/internal/service/ops_retry.go @@ -231,16 +231,36 @@ func (s *OpsService) RetryError(ctx context.Context, requestedByUserID int64, er finalStatus = opsRetryStatusFailed } + success := strings.EqualFold(finalStatus, opsRetryStatusSucceeded) + httpStatus := result.HTTPStatusCode + upstreamReqID := result.UpstreamRequestID + usedAccountID := result.UsedAccountID + preview := result.ResponsePreview + truncated := result.ResponseTruncated + if err := s.opsRepo.UpdateRetryAttempt(updateCtx, &OpsUpdateRetryAttemptInput{ - ID: attemptID, - Status: finalStatus, - FinishedAt: finishedAt, - DurationMs: result.DurationMs, - ResultRequestID: resultRequestID, - ErrorMessage: updateErrMsg, + ID: attemptID, + Status: finalStatus, + FinishedAt: finishedAt, + DurationMs: result.DurationMs, + Success: &success, + HTTPStatusCode: &httpStatus, + UpstreamRequestID: &upstreamReqID, + UsedAccountID: usedAccountID, + ResponsePreview: &preview, + ResponseTruncated: &truncated, + ResultRequestID: resultRequestID, + ErrorMessage: updateErrMsg, }); err != nil { // Best-effort: retry itself already executed; do not fail the API response. log.Printf("[Ops] UpdateRetryAttempt failed: %v", err) + } else { + // Auto-resolve the source error when a manual retry succeeds. + if success { + if err := s.opsRepo.UpdateErrorResolution(updateCtx, errorID, true, &requestedByUserID, &attemptID, &finishedAt); err != nil { + log.Printf("[Ops] UpdateErrorResolution failed: %v", err) + } + } } return result, nil diff --git a/backend/internal/service/ops_service.go b/backend/internal/service/ops_service.go index 426d46f1..d9984659 100644 --- a/backend/internal/service/ops_service.go +++ b/backend/internal/service/ops_service.go @@ -256,6 +256,46 @@ func (s *OpsService) GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLo return detail, nil } +func (s *OpsService) ListRetryAttemptsByErrorID(ctx context.Context, errorID int64, limit int) ([]*OpsRetryAttempt, error) { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return nil, err + } + if s.opsRepo == nil { + return nil, infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found") + } + if errorID <= 0 { + return nil, infraerrors.BadRequest("OPS_ERROR_INVALID_ID", "invalid error id") + } + items, err := s.opsRepo.ListRetryAttemptsByErrorID(ctx, errorID, limit) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return []*OpsRetryAttempt{}, nil + } + return nil, infraerrors.InternalServer("OPS_RETRY_LIST_FAILED", "Failed to list retry attempts").WithCause(err) + } + return items, nil +} + +func (s *OpsService) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64) error { + if err := s.RequireMonitoringEnabled(ctx); err != nil { + return err + } + if s.opsRepo == nil { + return infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available") + } + if errorID <= 0 { + return infraerrors.BadRequest("OPS_ERROR_INVALID_ID", "invalid error id") + } + // Best-effort ensure the error exists + if _, err := s.opsRepo.GetErrorLogByID(ctx, errorID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return infraerrors.NotFound("OPS_ERROR_NOT_FOUND", "ops error log not found") + } + return infraerrors.InternalServer("OPS_ERROR_LOAD_FAILED", "Failed to load ops error log").WithCause(err) + } + return s.opsRepo.UpdateErrorResolution(ctx, errorID, resolved, resolvedByUserID, resolvedRetryID, nil) +} + func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, truncated bool, bytesLen int) { bytesLen = len(raw) if len(raw) == 0 { @@ -296,14 +336,34 @@ func sanitizeAndTrimRequestBody(raw []byte, maxBytes int) (jsonString string, tr } } - // Last resort: store a minimal placeholder (still valid JSON). - placeholder := map[string]any{ - "request_body_truncated": true, + // Last resort: keep JSON shape but drop big fields. + // This avoids downstream code that expects certain top-level keys from crashing. + if root, ok := decoded.(map[string]any); ok { + placeholder := shallowCopyMap(root) + placeholder["request_body_truncated"] = true + + // Replace potentially huge arrays/strings, but keep the keys present. + for _, k := range []string{"messages", "contents", "input", "prompt"} { + if _, exists := placeholder[k]; exists { + placeholder[k] = []any{} + } + } + for _, k := range []string{"text"} { + if _, exists := placeholder[k]; exists { + placeholder[k] = "" + } + } + + encoded4, err4 := json.Marshal(placeholder) + if err4 == nil { + if len(encoded4) <= maxBytes { + return string(encoded4), true, bytesLen + } + } } - if model := extractString(decoded, "model"); model != "" { - placeholder["model"] = model - } - encoded4, err4 := json.Marshal(placeholder) + + // Final fallback: minimal valid JSON. + encoded4, err4 := json.Marshal(map[string]any{"request_body_truncated": true}) if err4 != nil { return "", true, bytesLen }