diff --git a/backend/.dockerignore b/backend/.dockerignore new file mode 100644 index 00000000..c1c2a854 --- /dev/null +++ b/backend/.dockerignore @@ -0,0 +1,2 @@ +.cache/ +.DS_Store diff --git a/backend/cmd/server/wire.go b/backend/cmd/server/wire.go index b4a74a81..85a9b785 100644 --- a/backend/cmd/server/wire.go +++ b/backend/cmd/server/wire.go @@ -66,6 +66,7 @@ func provideCleanup( opsAggregation *service.OpsAggregationService, opsAlertEvaluator *service.OpsAlertEvaluatorService, opsCleanup *service.OpsCleanupService, + opsScheduledReport *service.OpsScheduledReportService, tokenRefresh *service.TokenRefreshService, accountExpiry *service.AccountExpiryService, pricing *service.PricingService, @@ -85,6 +86,12 @@ func provideCleanup( name string fn func() error }{ + {"OpsScheduledReportService", func() error { + if opsScheduledReport != nil { + opsScheduledReport.Stop() + } + return nil + }}, {"OpsCleanupService", func() error { if opsCleanup != nil { opsCleanup.Stop() diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 407a1e71..89c7175a 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -157,9 +157,10 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig) opsAlertEvaluatorService := service.ProvideOpsAlertEvaluatorService(opsService, opsRepository, emailService, redisClient, configConfig) opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig) + opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig) tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, configConfig) accountExpiryService := 
service.ProvideAccountExpiryService(accountRepository) - v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, tokenRefreshService, accountExpiryService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService) + v := provideCleanup(client, redisClient, opsMetricsCollector, opsAggregationService, opsAlertEvaluatorService, opsCleanupService, opsScheduledReportService, tokenRefreshService, accountExpiryService, pricingService, emailQueueService, billingCacheService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService) application := &Application{ Server: httpServer, Cleanup: v, @@ -188,6 +189,7 @@ func provideCleanup( opsAggregation *service.OpsAggregationService, opsAlertEvaluator *service.OpsAlertEvaluatorService, opsCleanup *service.OpsCleanupService, + opsScheduledReport *service.OpsScheduledReportService, tokenRefresh *service.TokenRefreshService, accountExpiry *service.AccountExpiryService, pricing *service.PricingService, @@ -206,6 +208,12 @@ func provideCleanup( name string fn func() error }{ + {"OpsScheduledReportService", func() error { + if opsScheduledReport != nil { + opsScheduledReport.Stop() + } + return nil + }}, {"OpsCleanupService", func() error { if opsCleanup != nil { opsCleanup.Stop() diff --git a/backend/internal/handler/ops_error_logger.go b/backend/internal/handler/ops_error_logger.go index f4ab00c4..7115059a 100644 --- a/backend/internal/handler/ops_error_logger.go +++ b/backend/internal/handler/ops_error_logger.go @@ -309,10 +309,6 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { c.Writer = w c.Next() - status := c.Writer.Status() - if status < 400 { - return - } if ops == nil { return } @@ -320,6 +316,229 @@ func OpsErrorLoggerMiddleware(ops *service.OpsService) gin.HandlerFunc { return } + status := c.Writer.Status() + if status < 400 { 
+ // Even when the client request succeeds, we still want to persist upstream error attempts + // (retries/failover) so ops can observe upstream instability that gets "covered" by retries. + var events []*service.OpsUpstreamErrorEvent + if v, ok := c.Get(service.OpsUpstreamErrorsKey); ok { + if arr, ok := v.([]*service.OpsUpstreamErrorEvent); ok && len(arr) > 0 { + events = arr + } + } + // Also accept single upstream fields set by gateway services (rare for successful requests). + hasUpstreamContext := len(events) > 0 + if !hasUpstreamContext { + if v, ok := c.Get(service.OpsUpstreamStatusCodeKey); ok { + switch t := v.(type) { + case int: + hasUpstreamContext = t > 0 + case int64: + hasUpstreamContext = t > 0 + } + } + } + if !hasUpstreamContext { + if v, ok := c.Get(service.OpsUpstreamErrorMessageKey); ok { + if s, ok := v.(string); ok && strings.TrimSpace(s) != "" { + hasUpstreamContext = true + } + } + } + if !hasUpstreamContext { + if v, ok := c.Get(service.OpsUpstreamErrorDetailKey); ok { + if s, ok := v.(string); ok && strings.TrimSpace(s) != "" { + hasUpstreamContext = true + } + } + } + if !hasUpstreamContext { + return + } + + apiKey, _ := middleware2.GetAPIKeyFromContext(c) + clientRequestID, _ := c.Request.Context().Value(ctxkey.ClientRequestID).(string) + + model, _ := c.Get(opsModelKey) + streamV, _ := c.Get(opsStreamKey) + accountIDV, _ := c.Get(opsAccountIDKey) + + var modelName string + if s, ok := model.(string); ok { + modelName = s + } + stream := false + if b, ok := streamV.(bool); ok { + stream = b + } + + // Prefer showing the account that experienced the upstream error (if we have events), + // otherwise fall back to the final selected account (best-effort). 
+ var accountID *int64 + if len(events) > 0 { + if last := events[len(events)-1]; last != nil && last.AccountID > 0 { + v := last.AccountID + accountID = &v + } + } + if accountID == nil { + if v, ok := accountIDV.(int64); ok && v > 0 { + accountID = &v + } + } + + fallbackPlatform := guessPlatformFromPath(c.Request.URL.Path) + platform := resolveOpsPlatform(apiKey, fallbackPlatform) + + requestID := c.Writer.Header().Get("X-Request-Id") + if requestID == "" { + requestID = c.Writer.Header().Get("x-request-id") + } + + // Best-effort backfill single upstream fields from the last event (if present). + var upstreamStatusCode *int + var upstreamErrorMessage *string + var upstreamErrorDetail *string + if len(events) > 0 { + last := events[len(events)-1] + if last != nil { + if last.UpstreamStatusCode > 0 { + code := last.UpstreamStatusCode + upstreamStatusCode = &code + } + if msg := strings.TrimSpace(last.Message); msg != "" { + upstreamErrorMessage = &msg + } + if detail := strings.TrimSpace(last.Detail); detail != "" { + upstreamErrorDetail = &detail + } + } + } + + if upstreamStatusCode == nil { + if v, ok := c.Get(service.OpsUpstreamStatusCodeKey); ok { + switch t := v.(type) { + case int: + if t > 0 { + code := t + upstreamStatusCode = &code + } + case int64: + if t > 0 { + code := int(t) + upstreamStatusCode = &code + } + } + } + } + if upstreamErrorMessage == nil { + if v, ok := c.Get(service.OpsUpstreamErrorMessageKey); ok { + if s, ok := v.(string); ok && strings.TrimSpace(s) != "" { + msg := strings.TrimSpace(s) + upstreamErrorMessage = &msg + } + } + } + if upstreamErrorDetail == nil { + if v, ok := c.Get(service.OpsUpstreamErrorDetailKey); ok { + if s, ok := v.(string); ok && strings.TrimSpace(s) != "" { + detail := strings.TrimSpace(s) + upstreamErrorDetail = &detail + } + } + } + + // If we still have nothing meaningful, skip. 
+ if upstreamStatusCode == nil && upstreamErrorMessage == nil && upstreamErrorDetail == nil && len(events) == 0 { + return + } + + effectiveUpstreamStatus := 0 + if upstreamStatusCode != nil { + effectiveUpstreamStatus = *upstreamStatusCode + } + + recoveredMsg := "Recovered upstream error" + if effectiveUpstreamStatus > 0 { + recoveredMsg += " " + strconvItoa(effectiveUpstreamStatus) + } + if upstreamErrorMessage != nil && strings.TrimSpace(*upstreamErrorMessage) != "" { + recoveredMsg += ": " + strings.TrimSpace(*upstreamErrorMessage) + } + recoveredMsg = truncateString(recoveredMsg, 2048) + + entry := &service.OpsInsertErrorLogInput{ + RequestID: requestID, + ClientRequestID: clientRequestID, + + AccountID: accountID, + Platform: platform, + Model: modelName, + RequestPath: func() string { + if c.Request != nil && c.Request.URL != nil { + return c.Request.URL.Path + } + return "" + }(), + Stream: stream, + UserAgent: c.GetHeader("User-Agent"), + + ErrorPhase: "upstream", + ErrorType: "upstream_error", + // Severity/retryability should reflect the upstream failure, not the final client status (200). + Severity: classifyOpsSeverity("upstream_error", effectiveUpstreamStatus), + StatusCode: status, + IsBusinessLimited: false, + + ErrorMessage: recoveredMsg, + ErrorBody: "", + + ErrorSource: "upstream_http", + ErrorOwner: "provider", + + UpstreamStatusCode: upstreamStatusCode, + UpstreamErrorMessage: upstreamErrorMessage, + UpstreamErrorDetail: upstreamErrorDetail, + UpstreamErrors: events, + + IsRetryable: classifyOpsIsRetryable("upstream_error", effectiveUpstreamStatus), + RetryCount: 0, + CreatedAt: time.Now(), + } + + if apiKey != nil { + entry.APIKeyID = &apiKey.ID + if apiKey.User != nil { + entry.UserID = &apiKey.User.ID + } + if apiKey.GroupID != nil { + entry.GroupID = apiKey.GroupID + } + // Prefer group platform if present (more stable than inferring from path). 
+ if apiKey.Group != nil && apiKey.Group.Platform != "" { + entry.Platform = apiKey.Group.Platform + } + } + + var clientIP string + if ip := strings.TrimSpace(c.ClientIP()); ip != "" { + clientIP = ip + entry.ClientIP = &clientIP + } + + var requestBody []byte + if v, ok := c.Get(opsRequestBodyKey); ok { + if b, ok := v.([]byte); ok && len(b) > 0 { + requestBody = b + } + } + // Store request headers/body only when an upstream error occurred to keep overhead minimal. + entry.RequestHeadersJSON = extractOpsRetryRequestHeaders(c) + + enqueueOpsErrorLog(ops, entry, requestBody) + return + } + body := w.buf.Bytes() parsed := parseOpsErrorResponse(body) diff --git a/backend/internal/repository/ops_repo.go b/backend/internal/repository/ops_repo.go index 86372166..e558c08a 100644 --- a/backend/internal/repository/ops_repo.go +++ b/backend/internal/repository/ops_repo.go @@ -149,7 +149,7 @@ SELECT error_phase, error_type, severity, - COALESCE(status_code, 0), + COALESCE(upstream_status_code, status_code, 0), COALESCE(platform, ''), COALESCE(model, ''), duration_ms, @@ -261,7 +261,7 @@ SELECT error_phase, error_type, severity, - COALESCE(status_code, 0), + COALESCE(upstream_status_code, status_code, 0), COALESCE(platform, ''), COALESCE(model, ''), duration_ms, @@ -605,6 +605,17 @@ func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) { args := make([]any, 0, 8) clauses = append(clauses, "1=1") + phaseFilter := "" + if filter != nil { + phaseFilter = strings.TrimSpace(strings.ToLower(filter.Phase)) + } + // ops_error_logs primarily stores client-visible error requests (status>=400), + // but we also persist "recovered" upstream errors (status<400) for upstream health visibility. + // By default, keep list endpoints scoped to client errors unless explicitly filtering upstream phase. 
+ if phaseFilter != "upstream" { + clauses = append(clauses, "COALESCE(status_code, 0) >= 400") + } + if filter.StartTime != nil && !filter.StartTime.IsZero() { args = append(args, filter.StartTime.UTC()) clauses = append(clauses, "created_at >= $"+itoa(len(args))) @@ -626,13 +637,13 @@ func buildOpsErrorLogsWhere(filter *service.OpsErrorLogFilter) (string, []any) { args = append(args, *filter.AccountID) clauses = append(clauses, "account_id = $"+itoa(len(args))) } - if phase := strings.TrimSpace(filter.Phase); phase != "" { + if phase := phaseFilter; phase != "" { args = append(args, phase) clauses = append(clauses, "error_phase = $"+itoa(len(args))) } if len(filter.StatusCodes) > 0 { args = append(args, pq.Array(filter.StatusCodes)) - clauses = append(clauses, "status_code = ANY($"+itoa(len(args))+")") + clauses = append(clauses, "COALESCE(upstream_status_code, status_code, 0) = ANY($"+itoa(len(args))+")") } if q := strings.TrimSpace(filter.Query); q != "" { like := "%" + q + "%" diff --git a/backend/internal/repository/ops_repo_dashboard.go b/backend/internal/repository/ops_repo_dashboard.go index 3f2c5b90..156d5caf 100644 --- a/backend/internal/repository/ops_repo_dashboard.go +++ b/backend/internal/repository/ops_repo_dashboard.go @@ -815,9 +815,9 @@ func (r *opsRepository) queryErrorCounts(ctx context.Context, filter *service.Op q := ` SELECT - COALESCE(COUNT(*), 0) AS error_total, - COALESCE(COUNT(*) FILTER (WHERE is_business_limited), 0) AS business_limited, - COALESCE(COUNT(*) FILTER (WHERE NOT is_business_limited), 0) AS error_sla, + COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400), 0) AS error_total, + COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND is_business_limited), 0) AS business_limited, + COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND NOT is_business_limited), 0) AS error_sla, COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND 
COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl, COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 429), 0) AS upstream_429, COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 529), 0) AS upstream_529 @@ -870,6 +870,7 @@ error_buckets AS ( SELECT date_trunc('minute', created_at) AS bucket, COUNT(*) AS cnt FROM ops_error_logs ` + errorWhere + ` + AND COALESCE(status_code, 0) >= 400 GROUP BY 1 ), combined AS ( diff --git a/backend/internal/repository/ops_repo_preagg.go b/backend/internal/repository/ops_repo_preagg.go index f31cc435..fc74e4f6 100644 --- a/backend/internal/repository/ops_repo_preagg.go +++ b/backend/internal/repository/ops_repo_preagg.go @@ -75,7 +75,8 @@ error_base AS ( group_id AS group_id, is_business_limited AS is_business_limited, error_owner AS error_owner, - COALESCE(upstream_status_code, status_code) AS status_code + status_code AS client_status_code, + COALESCE(upstream_status_code, status_code, 0) AS effective_status_code FROM ops_error_logs WHERE created_at >= $1 AND created_at < $2 ), @@ -84,12 +85,12 @@ error_agg AS ( bucket_start, CASE WHEN GROUPING(platform) = 1 THEN NULL ELSE platform END AS platform, CASE WHEN GROUPING(group_id) = 1 THEN NULL ELSE group_id END AS group_id, - COUNT(*) AS error_count_total, - COUNT(*) FILTER (WHERE is_business_limited) AS business_limited_count, - COUNT(*) FILTER (WHERE NOT is_business_limited) AS error_count_sla, - COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) NOT IN (429, 529)) AS upstream_error_count_excl_429_529, - COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 429) AS upstream_429_count, - COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND 
COALESCE(status_code, 0) = 529) AS upstream_529_count + COUNT(*) FILTER (WHERE COALESCE(client_status_code, 0) >= 400) AS error_count_total, + COUNT(*) FILTER (WHERE COALESCE(client_status_code, 0) >= 400 AND is_business_limited) AS business_limited_count, + COUNT(*) FILTER (WHERE COALESCE(client_status_code, 0) >= 400 AND NOT is_business_limited) AS error_count_sla, + COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(effective_status_code, 0) NOT IN (429, 529)) AS upstream_error_count_excl_429_529, + COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(effective_status_code, 0) = 429) AS upstream_429_count, + COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(effective_status_code, 0) = 529) AS upstream_529_count FROM error_base GROUP BY GROUPING SETS ( (bucket_start), diff --git a/backend/internal/repository/ops_repo_request_details.go b/backend/internal/repository/ops_repo_request_details.go index 57b93b21..678e1648 100644 --- a/backend/internal/repository/ops_repo_request_details.go +++ b/backend/internal/repository/ops_repo_request_details.go @@ -131,6 +131,7 @@ WITH combined AS ( LEFT JOIN groups g ON g.id = o.group_id LEFT JOIN accounts a ON a.id = o.account_id WHERE o.created_at >= $1 AND o.created_at < $2 + AND COALESCE(o.status_code, 0) >= 400 ) ` diff --git a/backend/internal/repository/ops_repo_trends.go b/backend/internal/repository/ops_repo_trends.go index d71f5080..f25e157a 100644 --- a/backend/internal/repository/ops_repo_trends.go +++ b/backend/internal/repository/ops_repo_trends.go @@ -53,6 +53,7 @@ error_buckets AS ( COUNT(*) AS error_count FROM ops_error_logs ` + errorWhere + ` + AND COALESCE(status_code, 0) >= 400 GROUP BY 1 ), combined AS ( @@ -168,6 +169,7 @@ error_totals AS ( COUNT(*) AS error_count FROM ops_error_logs WHERE created_at >= $1 AND created_at < $2 + AND COALESCE(status_code, 0) >= 400 GROUP BY 1 ), combined AS ( @@ 
-240,6 +242,7 @@ error_totals AS ( WHERE created_at >= $1 AND created_at < $2 AND platform = $3 AND group_id IS NOT NULL + AND COALESCE(status_code, 0) >= 400 GROUP BY 1 ), combined AS ( @@ -413,9 +416,9 @@ func (r *opsRepository) GetErrorTrend(ctx context.Context, filter *service.OpsDa q := ` SELECT ` + bucketExpr + ` AS bucket, - COUNT(*) AS error_total, - COUNT(*) FILTER (WHERE is_business_limited) AS business_limited, - COUNT(*) FILTER (WHERE NOT is_business_limited) AS error_sla, + COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400) AS error_total, + COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND is_business_limited) AS business_limited, + COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND NOT is_business_limited) AS error_sla, COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)) AS upstream_excl, COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 429) AS upstream_429, COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 529) AS upstream_529 @@ -524,12 +527,13 @@ func (r *opsRepository) GetErrorDistribution(ctx context.Context, filter *servic q := ` SELECT - COALESCE(status_code, 0) AS status_code, + COALESCE(upstream_status_code, status_code, 0) AS status_code, COUNT(*) AS total, COUNT(*) FILTER (WHERE NOT is_business_limited) AS sla, COUNT(*) FILTER (WHERE is_business_limited) AS business_limited FROM ops_error_logs ` + where + ` + AND COALESCE(status_code, 0) >= 400 GROUP BY 1 ORDER BY total DESC LIMIT 20` diff --git a/backend/internal/service/ops_metrics_collector.go b/backend/internal/service/ops_metrics_collector.go index 8a4dd2d0..edf32cf2 100644 --- a/backend/internal/service/ops_metrics_collector.go +++ b/backend/internal/service/ops_metrics_collector.go @@ -529,9 +529,9 @@ func (c 
*OpsMetricsCollector) queryErrorCounts(ctx context.Context, start, end t ) { q := ` SELECT - COALESCE(COUNT(*), 0) AS error_total, - COALESCE(COUNT(*) FILTER (WHERE is_business_limited), 0) AS business_limited, - COALESCE(COUNT(*) FILTER (WHERE NOT is_business_limited), 0) AS error_sla, + COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400), 0) AS error_total, + COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND is_business_limited), 0) AS business_limited, + COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND NOT is_business_limited), 0) AS error_sla, COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl, COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 429), 0) AS upstream_429, COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 529), 0) AS upstream_529 diff --git a/backend/internal/service/ops_scheduled_report_service.go b/backend/internal/service/ops_scheduled_report_service.go new file mode 100644 index 00000000..f0f3fb6a --- /dev/null +++ b/backend/internal/service/ops_scheduled_report_service.go @@ -0,0 +1,708 @@ +package service + +import ( + "context" + "fmt" + "log" + "strconv" + "strings" + "sync" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" +) + +const ( + opsScheduledReportJobName = "ops_scheduled_reports" + + opsScheduledReportLeaderLockKeyDefault = "ops:scheduled_reports:leader" + opsScheduledReportLeaderLockTTLDefault = 5 * time.Minute + + opsScheduledReportLastRunKeyPrefix = "ops:scheduled_reports:last_run:" + + opsScheduledReportTickInterval = 1 * time.Minute +) + +var opsScheduledReportCronParser = 
cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + +var opsScheduledReportReleaseScript = redis.NewScript(` +if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) +end +return 0 +`) + +type OpsScheduledReportService struct { + opsService *OpsService + userService *UserService + emailService *EmailService + redisClient *redis.Client + cfg *config.Config + + instanceID string + loc *time.Location + + distributedLockOn bool + warnNoRedisOnce sync.Once + + startOnce sync.Once + stopOnce sync.Once + stopCtx context.Context + stop context.CancelFunc + wg sync.WaitGroup +} + +func NewOpsScheduledReportService( + opsService *OpsService, + userService *UserService, + emailService *EmailService, + redisClient *redis.Client, + cfg *config.Config, +) *OpsScheduledReportService { + lockOn := true + if cfg != nil && strings.TrimSpace(cfg.RunMode) == config.RunModeSimple { + lockOn = false + } + + loc := time.Local + if cfg != nil && strings.TrimSpace(cfg.Timezone) != "" { + if parsed, err := time.LoadLocation(strings.TrimSpace(cfg.Timezone)); err == nil && parsed != nil { + loc = parsed + } + } + return &OpsScheduledReportService{ + opsService: opsService, + userService: userService, + emailService: emailService, + redisClient: redisClient, + cfg: cfg, + + instanceID: uuid.NewString(), + loc: loc, + distributedLockOn: lockOn, + warnNoRedisOnce: sync.Once{}, + startOnce: sync.Once{}, + stopOnce: sync.Once{}, + stopCtx: nil, + stop: nil, + wg: sync.WaitGroup{}, + } +} + +func (s *OpsScheduledReportService) Start() { + s.StartWithContext(context.Background()) +} + +func (s *OpsScheduledReportService) StartWithContext(ctx context.Context) { + if s == nil { + return + } + if ctx == nil { + ctx = context.Background() + } + if s.cfg != nil && !s.cfg.Ops.Enabled { + return + } + if s.opsService == nil || s.emailService == nil { + return + } + + s.startOnce.Do(func() { + s.stopCtx, s.stop = context.WithCancel(ctx) + s.wg.Add(1) + go 
s.run() + }) +} + +func (s *OpsScheduledReportService) Stop() { + if s == nil { + return + } + s.stopOnce.Do(func() { + if s.stop != nil { + s.stop() + } + }) + s.wg.Wait() +} + +func (s *OpsScheduledReportService) run() { + defer s.wg.Done() + + ticker := time.NewTicker(opsScheduledReportTickInterval) + defer ticker.Stop() + + s.runOnce() + for { + select { + case <-ticker.C: + s.runOnce() + case <-s.stopCtx.Done(): + return + } + } +} + +func (s *OpsScheduledReportService) runOnce() { + if s == nil || s.opsService == nil || s.emailService == nil { + return + } + + startedAt := time.Now().UTC() + runAt := startedAt + + ctx, cancel := context.WithTimeout(s.stopCtx, 60*time.Second) + defer cancel() + + // Respect ops monitoring enabled switch. + if !s.opsService.IsMonitoringEnabled(ctx) { + return + } + + release, ok := s.tryAcquireLeaderLock(ctx) + if !ok { + return + } + if release != nil { + defer release() + } + + now := time.Now() + if s.loc != nil { + now = now.In(s.loc) + } + + reports := s.listScheduledReports(ctx, now) + if len(reports) == 0 { + return + } + + for _, report := range reports { + if report == nil || !report.Enabled { + continue + } + if report.NextRunAt.After(now) { + continue + } + + if err := s.runReport(ctx, report, now); err != nil { + s.recordHeartbeatError(runAt, time.Since(startedAt), err) + return + } + } + + s.recordHeartbeatSuccess(runAt, time.Since(startedAt)) +} + +type opsScheduledReport struct { + Name string + ReportType string + Schedule string + Enabled bool + + TimeRange time.Duration + + Recipients []string + + ErrorDigestMinCount int + AccountHealthErrorRateThreshold float64 + + LastRunAt *time.Time + NextRunAt time.Time +} + +func (s *OpsScheduledReportService) listScheduledReports(ctx context.Context, now time.Time) []*opsScheduledReport { + if s == nil || s.opsService == nil { + return nil + } + if ctx == nil { + ctx = context.Background() + } + + emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx) + if err != 
nil || emailCfg == nil { + return nil + } + if !emailCfg.Report.Enabled { + return nil + } + + recipients := normalizeEmails(emailCfg.Report.Recipients) + + type reportDef struct { + enabled bool + name string + kind string + timeRange time.Duration + schedule string + } + + defs := []reportDef{ + {enabled: emailCfg.Report.DailySummaryEnabled, name: "日报", kind: "daily_summary", timeRange: 24 * time.Hour, schedule: emailCfg.Report.DailySummarySchedule}, + {enabled: emailCfg.Report.WeeklySummaryEnabled, name: "周报", kind: "weekly_summary", timeRange: 7 * 24 * time.Hour, schedule: emailCfg.Report.WeeklySummarySchedule}, + {enabled: emailCfg.Report.ErrorDigestEnabled, name: "错误摘要", kind: "error_digest", timeRange: 24 * time.Hour, schedule: emailCfg.Report.ErrorDigestSchedule}, + {enabled: emailCfg.Report.AccountHealthEnabled, name: "账号健康", kind: "account_health", timeRange: 24 * time.Hour, schedule: emailCfg.Report.AccountHealthSchedule}, + } + + out := make([]*opsScheduledReport, 0, len(defs)) + for _, d := range defs { + if !d.enabled { + continue + } + spec := strings.TrimSpace(d.schedule) + if spec == "" { + continue + } + sched, err := opsScheduledReportCronParser.Parse(spec) + if err != nil { + log.Printf("[OpsScheduledReport] invalid cron spec=%q for report=%s: %v", spec, d.kind, err) + continue + } + + lastRun := s.getLastRunAt(ctx, d.kind) + base := lastRun + if base.IsZero() { + // Allow a schedule matching the current minute to trigger right after startup. 
+ base = now.Add(-1 * time.Minute) + } + next := sched.Next(base) + if next.IsZero() { + continue + } + + var lastRunPtr *time.Time + if !lastRun.IsZero() { + lastCopy := lastRun + lastRunPtr = &lastCopy + } + + out = append(out, &opsScheduledReport{ + Name: d.name, + ReportType: d.kind, + Schedule: spec, + Enabled: true, + + TimeRange: d.timeRange, + + Recipients: recipients, + + ErrorDigestMinCount: emailCfg.Report.ErrorDigestMinCount, + AccountHealthErrorRateThreshold: emailCfg.Report.AccountHealthErrorRateThreshold, + + LastRunAt: lastRunPtr, + NextRunAt: next, + }) + } + + return out +} + +func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsScheduledReport, now time.Time) error { + if s == nil || s.opsService == nil || s.emailService == nil || report == nil { + return nil + } + if ctx == nil { + ctx = context.Background() + } + + // Mark as "run" up-front so a broken SMTP config doesn't spam retries every minute. + s.setLastRunAt(ctx, report.ReportType, now) + + content, err := s.generateReportHTML(ctx, report, now) + if err != nil { + return err + } + if strings.TrimSpace(content) == "" { + // Skip sending when the report decides not to emit content (e.g., digest below min count). + return nil + } + + recipients := report.Recipients + if len(recipients) == 0 && s.userService != nil { + admin, err := s.userService.GetFirstAdmin(ctx) + if err == nil && admin != nil && strings.TrimSpace(admin.Email) != "" { + recipients = []string{strings.TrimSpace(admin.Email)} + } + } + if len(recipients) == 0 { + return nil + } + + subject := fmt.Sprintf("[Ops Report] %s", strings.TrimSpace(report.Name)) + + for _, to := range recipients { + addr := strings.TrimSpace(to) + if addr == "" { + continue + } + if err := s.emailService.SendEmail(ctx, addr, subject, content); err != nil { + // Ignore per-recipient failures; continue best-effort. 
+ continue + } + } + return nil +} + +func (s *OpsScheduledReportService) generateReportHTML(ctx context.Context, report *opsScheduledReport, now time.Time) (string, error) { + if s == nil || s.opsService == nil || report == nil { + return "", fmt.Errorf("service not initialized") + } + if report.TimeRange <= 0 { + return "", fmt.Errorf("invalid time range") + } + + end := now.UTC() + start := end.Add(-report.TimeRange) + + switch strings.TrimSpace(report.ReportType) { + case "daily_summary", "weekly_summary": + overview, err := s.opsService.GetDashboardOverview(ctx, &OpsDashboardFilter{ + StartTime: start, + EndTime: end, + Platform: "", + GroupID: nil, + QueryMode: OpsQueryModeAuto, + }) + if err != nil { + // If pre-aggregation isn't ready but the report is requested, fall back to raw. + if strings.TrimSpace(report.ReportType) == "daily_summary" || strings.TrimSpace(report.ReportType) == "weekly_summary" { + overview, err = s.opsService.GetDashboardOverview(ctx, &OpsDashboardFilter{ + StartTime: start, + EndTime: end, + Platform: "", + GroupID: nil, + QueryMode: OpsQueryModeRaw, + }) + } + if err != nil { + return "", err + } + } + return buildOpsSummaryEmailHTML(report.Name, start, end, overview), nil + case "error_digest": + // Lightweight digest: list recent errors (status>=400) and breakdown by type. + startTime := start + endTime := end + filter := &OpsErrorLogFilter{ + StartTime: &startTime, + EndTime: &endTime, + Page: 1, + PageSize: 100, + } + out, err := s.opsService.GetErrorLogs(ctx, filter) + if err != nil { + return "", err + } + if report.ErrorDigestMinCount > 0 && out != nil && out.Total < report.ErrorDigestMinCount { + return "", nil + } + return buildOpsErrorDigestEmailHTML(report.Name, start, end, out), nil + case "account_health": + // Best-effort: use account availability (not error rate yet). 
+ avail, err := s.opsService.GetAccountAvailability(ctx, "", nil) + if err != nil { + return "", err + } + _ = report.AccountHealthErrorRateThreshold // reserved for future per-account error rate report + return buildOpsAccountHealthEmailHTML(report.Name, start, end, avail), nil + default: + return "", fmt.Errorf("unknown report type: %s", report.ReportType) + } +} + +func buildOpsSummaryEmailHTML(title string, start, end time.Time, overview *OpsDashboardOverview) string { + if overview == nil { + return fmt.Sprintf("
No data.
", htmlEscape(title)) + } + + latP50 := "-" + latP99 := "-" + if overview.Duration.P50 != nil { + latP50 = fmt.Sprintf("%dms", *overview.Duration.P50) + } + if overview.Duration.P99 != nil { + latP99 = fmt.Sprintf("%dms", *overview.Duration.P99) + } + + ttftP50 := "-" + ttftP99 := "-" + if overview.TTFT.P50 != nil { + ttftP50 = fmt.Sprintf("%dms", *overview.TTFT.P50) + } + if overview.TTFT.P99 != nil { + ttftP99 = fmt.Sprintf("%dms", *overview.TTFT.P99) + } + + return fmt.Sprintf(` +Period: %s ~ %s (UTC)
+Period: %s ~ %s (UTC)
+Total Errors: %d
+| Time | Platform | Status | Message |
|---|
Period: %s ~ %s (UTC)
+Note: This report currently reflects account availability status only.
`,
	htmlEscape(strings.TrimSpace(title)),
	htmlEscape(start.UTC().Format(time.RFC3339)),
	htmlEscape(end.UTC().Format(time.RFC3339)),
	total,
	available,
	rateLimited,
	hasError,
)
}

// tryAcquireLeaderLock attempts to make this instance the single sender of
// scheduled reports. It returns an optional release func (nil when no lock
// was taken) and whether the caller may proceed with this cycle.
//
// Behavior by configuration:
//   - distributed locking disabled: proceed without a lock;
//   - no redis client: warn once, then proceed without a lock;
//   - SetNX error: fail closed (skip this cycle) to avoid duplicate sends;
//   - lock already held elsewhere: do not proceed.
func (s *OpsScheduledReportService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
	if s == nil || !s.distributedLockOn {
		return nil, true
	}
	if s.redisClient == nil {
		s.warnNoRedisOnce.Do(func() {
			log.Printf("[OpsScheduledReport] redis not configured; running without distributed lock")
		})
		return nil, true
	}
	if ctx == nil {
		ctx = context.Background()
	}

	key := opsScheduledReportLeaderLockKeyDefault
	ttl := opsScheduledReportLeaderLockTTLDefault
	// Defensive fallbacks in case the package defaults are ever misconfigured.
	if strings.TrimSpace(key) == "" {
		key = "ops:scheduled_reports:leader"
	}
	if ttl <= 0 {
		ttl = 5 * time.Minute
	}

	ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
	if err != nil {
		// Prefer fail-closed to avoid duplicate report sends when Redis is flaky.
		log.Printf("[OpsScheduledReport] leader lock SetNX failed; skipping this cycle: %v", err)
		return nil, false
	}
	if !ok {
		return nil, false
	}
	return func() {
		// Release only our own lock (compare-and-delete Lua script keyed on
		// instanceID); best-effort — a missed release simply expires via TTL.
		_, _ = opsScheduledReportReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
	}, true
}

// getLastRunAt loads the persisted last-run timestamp for the given report
// type from Redis (stored as unix seconds). It returns the zero time when
// Redis is unavailable, the report type is blank, the key is missing, or the
// stored value cannot be parsed — all errors are deliberately swallowed so a
// missing marker just means "never ran".
func (s *OpsScheduledReportService) getLastRunAt(ctx context.Context, reportType string) time.Time {
	if s == nil || s.redisClient == nil {
		return time.Time{}
	}
	kind := strings.TrimSpace(reportType)
	if kind == "" {
		return time.Time{}
	}
	key := opsScheduledReportLastRunKeyPrefix + kind

	raw, err := s.redisClient.Get(ctx, key).Result()
	if err != nil || strings.TrimSpace(raw) == "" {
		return time.Time{}
	}
	sec, err := strconv.ParseInt(strings.TrimSpace(raw), 10, 64)
	if err != nil || sec <= 0 {
		return time.Time{}
	}
	last := time.Unix(sec, 0)
	// Cron schedules are interpreted in the configured timezone (s.loc). Ensure the base time
	// passed into cron.Next() uses the same location; otherwise the job will drift by timezone
	// offset (e.g. Asia/Shanghai default would run 8h later after the first execution).
	if s.loc != nil {
		return last.In(s.loc)
	}
	return last.UTC()
}

// setLastRunAt persists the last-run timestamp for the given report type as
// unix seconds (UTC). Best-effort: write errors are ignored, and the key
// expires after 14 days so stale markers clean themselves up. A zero t is
// replaced with the current time.
func (s *OpsScheduledReportService) setLastRunAt(ctx context.Context, reportType string, t time.Time) {
	if s == nil || s.redisClient == nil {
		return
	}
	kind := strings.TrimSpace(reportType)
	if kind == "" {
		return
	}
	if t.IsZero() {
		t = time.Now().UTC()
	}
	key := opsScheduledReportLastRunKeyPrefix + kind
	_ = s.redisClient.Set(ctx, key, strconv.FormatInt(t.UTC().Unix(), 10), 14*24*time.Hour).Err()
}

// recordHeartbeatSuccess upserts the job heartbeat row after a successful
// run, recording run time, success time, and duration. Uses a short timeout
// and ignores errors: heartbeat bookkeeping must never fail the report job.
func (s *OpsScheduledReportService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration) {
	if s == nil || s.opsService == nil || s.opsService.opsRepo == nil {
		return
	}
	now := time.Now().UTC()
	durMs := duration.Milliseconds()
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	_ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
		JobName:        opsScheduledReportJobName,
		LastRunAt:      &runAt,
		LastSuccessAt:  &now,
		LastDurationMs: &durMs,
	})
}

// recordHeartbeatError upserts the job heartbeat row after a failed run,
// truncating the error message to a bounded length (2048). No-op when err is
// nil; like the success path, errors from the upsert itself are ignored.
func (s *OpsScheduledReportService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
	if s == nil || s.opsService == nil || s.opsService.opsRepo == nil || err == nil {
		return
	}
	now := time.Now().UTC()
	durMs := duration.Milliseconds()
	msg := truncateString(err.Error(), 2048)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	_ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
		JobName:        opsScheduledReportJobName,
		LastRunAt:      &runAt,
		LastErrorAt:    &now,
		LastError:      &msg,
		LastDurationMs: &durMs,
	})
}

// normalizeEmails lowercases, trims, and de-duplicates recipient addresses,
// preserving first-seen order. Returns nil for empty input.
func normalizeEmails(in []string) []string {
	if len(in) == 0 {
		return nil
	}
	seen := make(map[string]struct{}, len(in))
	out :=
make([]string, 0, len(in))
	for _, raw := range in {
		addr := strings.ToLower(strings.TrimSpace(raw))
		if addr == "" {
			continue
		}
		// First occurrence wins so relative input order is preserved.
		if _, ok := seen[addr]; ok {
			continue
		}
		seen[addr] = struct{}{}
		out = append(out, addr)
	}
	return out
}
diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go
index 0480a90c..62f69295 100644
--- a/backend/internal/service/wire.go
+++ b/backend/internal/service/wire.go
@@ -132,6 +132,19 @@ func ProvideOpsCleanupService(
	return svc
}

// ProvideOpsScheduledReportService creates and starts OpsScheduledReportService.
// Wire provider: constructs the service and immediately kicks off its
// background work via Start(); shutdown is handled elsewhere by provideCleanup,
// which calls Stop() on the returned service.
func ProvideOpsScheduledReportService(
	opsService *OpsService,
	userService *UserService,
	emailService *EmailService,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsScheduledReportService {
	svc := NewOpsScheduledReportService(opsService, userService, emailService, redisClient, cfg)
	svc.Start()
	return svc
}

// ProviderSet is the Wire provider set for all services
var ProviderSet = wire.NewSet(
	// Core services
@@ -169,6 +182,7 @@ var ProviderSet = wire.NewSet(
	ProvideOpsAggregationService,
	ProvideOpsAlertEvaluatorService,
	ProvideOpsCleanupService,
	ProvideOpsScheduledReportService,
	NewEmailService,
	ProvideEmailQueueService,
	NewTurnstileService,
diff --git a/frontend/src/views/admin/ops/components/OpsErrorTrendChart.vue b/frontend/src/views/admin/ops/components/OpsErrorTrendChart.vue
index 032e1205..088dc317 100644
--- a/frontend/src/views/admin/ops/components/OpsErrorTrendChart.vue
+++ b/frontend/src/views/admin/ops/components/OpsErrorTrendChart.vue
@@ -45,10 +45,25 @@ const colors = computed(() => ({
  text: isDarkMode.value ? '#9ca3af' : '#6b7280'
}))

// Request-side failures: SLA-relevant errors plus business-limited requests.
// (Replaces the former totalErrors, which summed only error_count_sla.)
const totalRequestErrors = computed(() =>
  sumNumbers(props.points.map((p) => (p.error_count_sla ?? 0) + (p.business_limited_count ??
0)))
)

// Upstream-side failures: upstream errors other than 429/529, plus upstream
// 429s and 529s counted separately.
const totalUpstreamErrors = computed(() =>
  sumNumbers(
    props.points.map((p) => (p.upstream_error_count_excl_429_529 ?? 0) + (p.upstream_429_count ?? 0) + (p.upstream_529_count ?? 0))
  )
)

// Sum used to decide whether the chart has anything to draw (see chartData).
// NOTE(review): unlike totalUpstreamErrors, this omits upstream_429/529 counts —
// presumably those series are not plotted as datasets; confirm against the
// datasets definition below.
const totalDisplayed = computed(() =>
  sumNumbers(props.points.map((p) => (p.error_count_sla ?? 0) + (p.upstream_error_count_excl_429_529 ?? 0) + (p.business_limited_count ?? 0)))
)

// Presence flags for the request-side and upstream-side error groups.
const hasRequestErrors = computed(() => totalRequestErrors.value > 0)
const hasUpstreamErrors = computed(() => totalUpstreamErrors.value > 0)

const chartData = computed(() => {
  // Render nothing when there are no points or no displayed errors at all.
  // (Gate was changed from the former totalErrors to totalDisplayed.)
  if (!props.points.length || totalDisplayed.value <= 0) return null
  return {
    labels: props.points.map((p) => formatHistoryLabel(p.bucket_start, props.timeRange)),
    datasets: [
@@ -158,7 +173,7 @@ const options = computed(() => {