package service

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
	"github.com/robfig/cron/v3"
)

const (
	// opsScheduledReportJobName is the job name written to the ops job heartbeat.
	opsScheduledReportJobName = "ops_scheduled_reports"

	// Redis key and TTL of the leader lock taken for each scheduling cycle.
	opsScheduledReportLeaderLockKeyDefault = "ops:scheduled_reports:leader"
	opsScheduledReportLeaderLockTTLDefault = 5 * time.Minute

	// opsScheduledReportLastRunKeyPrefix is prepended to the report type to form
	// the Redis key holding that report's last-run Unix timestamp.
	opsScheduledReportLastRunKeyPrefix = "ops:scheduled_reports:last_run:"

	// opsScheduledReportTickInterval is how often the scheduler checks for due reports.
	opsScheduledReportTickInterval = 1 * time.Minute
)

// opsScheduledReportCronParser parses 5-field cron specs
// (minute, hour, day-of-month, month, day-of-week).
var opsScheduledReportCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)

// opsScheduledReportReleaseScript deletes the lock key only when its value
// still equals the caller-supplied token (ARGV[1]); otherwise it returns 0.
var opsScheduledReportReleaseScript = redis.NewScript(` if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) end return 0 `)

// OpsScheduledReportService periodically checks the configured ops email
// reports and sends those whose cron schedule is due.
type OpsScheduledReportService struct {
	opsService   *OpsService
	userService  *UserService
	emailService *EmailService
	redisClient  *redis.Client
	cfg          *config.Config

	// instanceID is the token stored in the leader lock so that only the
	// holder can release it.
	instanceID string
	// loc is the location in which "now" and the cron base time are expressed.
	loc *time.Location

	distributedLockOn bool
	warnNoRedisOnce   sync.Once

	startOnce sync.Once
	stopOnce  sync.Once
	stopCtx   context.Context
	stop      context.CancelFunc
	wg        sync.WaitGroup
}

// NewOpsScheduledReportService builds the service. The distributed leader lock
// is enabled unless cfg.RunMode is config.RunModeSimple. The schedule location
// is cfg.Timezone when it is set and loadable, and time.Local otherwise.
func NewOpsScheduledReportService(
	opsService *OpsService,
	userService *UserService,
	emailService *EmailService,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsScheduledReportService {
	lockOn := cfg == nil || strings.TrimSpace(cfg.RunMode) != config.RunModeSimple

	loc := time.Local
	if cfg != nil {
		if tz := strings.TrimSpace(cfg.Timezone); tz != "" {
			parsed, err := time.LoadLocation(tz)
			switch {
			case err != nil:
				// Surface the misconfiguration: a silent fallback makes reports
				// fire at unexpected hours with no hint as to why.
				log.Printf("[OpsScheduledReport] invalid timezone %q, falling back to local time: %v", tz, err)
			case parsed != nil:
				loc = parsed
			}
		}
	}

	return &OpsScheduledReportService{
		opsService:   opsService,
		userService:  userService,
		emailService: emailService,
		redisClient:  redisClient,
		cfg:          cfg,

		instanceID: uuid.NewString(),
		loc:        loc,

		distributedLockOn: lockOn,
		warnNoRedisOnce:   sync.Once{},

		startOnce: sync.Once{},
		stopOnce:  sync.Once{},
		stopCtx:   nil,
		stop:      nil,
		wg:        sync.WaitGroup{},
	}
}

// Start launches the scheduler loop with a background context.
func (s *OpsScheduledReportService) Start() {
	s.StartWithContext(context.Background())
}

// StartWithContext launches the scheduler loop at most once. It is a no-op
// when ops is disabled in the config or a required dependency is missing.
// Cancelling ctx stops the loop, as does Stop.
func (s *OpsScheduledReportService) StartWithContext(ctx context.Context) {
	if s == nil {
		return
	}
	if ctx == nil {
		ctx = context.Background()
	}
	if s.cfg != nil && !s.cfg.Ops.Enabled {
		return
	}
	if s.opsService == nil || s.emailService == nil {
		return
	}

	s.startOnce.Do(func() {
		s.stopCtx, s.stop = context.WithCancel(ctx)
		s.wg.Add(1)
		go s.run()
	})
}

// Stop cancels the scheduler loop (once) and waits for it to exit.
func (s *OpsScheduledReportService) Stop() {
	if s == nil {
		return
	}
	s.stopOnce.Do(func() {
		if s.stop != nil {
			s.stop()
		}
	})
	s.wg.Wait()
}

// run executes one cycle immediately and then one per tick until the stop
// context is cancelled.
func (s *OpsScheduledReportService) run() {
	defer s.wg.Done()

	ticker := time.NewTicker(opsScheduledReportTickInterval)
	defer ticker.Stop()

	s.runOnce()
	for {
		select {
		case <-ticker.C:
			s.runOnce()
		case <-s.stopCtx.Done():
			return
		}
	}
}

// runOnce performs a single scheduling cycle: it checks the monitoring switch,
// takes the leader lock, lists the configured reports and runs every due one.
//
// A failing report no longer aborts the cycle. The remaining due reports are
// still attempted, and the heartbeat records an error carrying the first
// failure. Aborting early could drop reports scheduled for the same minute,
// because without a stored last-run the next tick's base time has already
// moved past that slot.
func (s *OpsScheduledReportService) runOnce() {
	if s == nil || s.opsService == nil || s.emailService == nil {
		return
	}

	startedAt := time.Now().UTC()
	runAt := startedAt

	// context.WithTimeout panics on a nil parent; guard for callers that
	// invoke runOnce before StartWithContext has set stopCtx.
	parent := s.stopCtx
	if parent == nil {
		parent = context.Background()
	}
	ctx, cancel := context.WithTimeout(parent, 60*time.Second)
	defer cancel()

	// Respect ops monitoring enabled switch.
	if !s.opsService.IsMonitoringEnabled(ctx) {
		return
	}

	release, ok := s.tryAcquireLeaderLock(ctx)
	if !ok {
		return
	}
	if release != nil {
		// Deferred after cancel, so it runs before ctx is cancelled by this function.
		defer release()
	}

	now := time.Now()
	if s.loc != nil {
		now = now.In(s.loc)
	}

	reports := s.listScheduledReports(ctx, now)
	if len(reports) == 0 {
		return
	}

	reportsTotal := len(reports)
	reportsDue := 0
	sentAttempts := 0
	failed := 0
	var firstErr error

	for _, report := range reports {
		if report == nil || !report.Enabled {
			continue
		}
		if report.NextRunAt.After(now) {
			continue
		}
		reportsDue++

		// Once the cycle context is done every further call would fail
		// immediately; stop without marking the report as run.
		if ctxErr := ctx.Err(); ctxErr != nil {
			failed++
			if firstErr == nil {
				firstErr = fmt.Errorf("report %s not run: %w", report.ReportType, ctxErr)
			}
			break
		}

		attempts, err := s.runReport(ctx, report, now)
		if err != nil {
			failed++
			log.Printf("[OpsScheduledReport] report=%s failed: %v", report.ReportType, err)
			if firstErr == nil {
				firstErr = fmt.Errorf("report %s: %w", report.ReportType, err)
			}
			continue
		}
		sentAttempts += attempts
	}

	if firstErr != nil {
		s.recordHeartbeatError(
			runAt,
			time.Since(startedAt),
			fmt.Errorf("%d/%d due reports failed, first: %w", failed, reportsDue, firstErr),
		)
		return
	}

	result := truncateString(fmt.Sprintf("reports=%d due=%d send_attempts=%d", reportsTotal, reportsDue, sentAttempts), 2048)
	s.recordHeartbeatSuccess(runAt, time.Since(startedAt), result)
}

// opsScheduledReport is one configured report resolved for the current cycle.
type opsScheduledReport struct {
	Name       string
	ReportType string
	Schedule   string
	Enabled    bool

	// TimeRange is the look-back window covered by the report.
	TimeRange time.Duration

	Recipients []string

	ErrorDigestMinCount             int
	AccountHealthErrorRateThreshold float64

	// LastRunAt is nil when no last-run timestamp is stored.
	LastRunAt *time.Time
	// NextRunAt is the first schedule time after the last run (or after
	// now minus one minute when there is no stored last run).
	NextRunAt time.Time
}

// listScheduledReports reads the email notification config and returns one
// entry per enabled report type with a valid, non-empty cron spec. It returns
// nil when the config cannot be loaded or reports are disabled.
func (s *OpsScheduledReportService) listScheduledReports(ctx context.Context, now time.Time) []*opsScheduledReport {
	if s == nil || s.opsService == nil {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}

	emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx)
	if err != nil || emailCfg == nil {
		return nil
	}
	if !emailCfg.Report.Enabled {
		return nil
	}

	recipients := normalizeEmails(emailCfg.Report.Recipients)

	type reportDef struct {
		enabled   bool
		name      string
		kind      string
		timeRange time.Duration
		schedule  string
	}

	defs := []reportDef{
		{enabled: emailCfg.Report.DailySummaryEnabled, name: "日报", kind: "daily_summary", timeRange: 24 * time.Hour, schedule: emailCfg.Report.DailySummarySchedule},
		{enabled: emailCfg.Report.WeeklySummaryEnabled, name: "周报", kind: "weekly_summary", timeRange: 7 * 24 * time.Hour, schedule: emailCfg.Report.WeeklySummarySchedule},
		{enabled: emailCfg.Report.ErrorDigestEnabled, name: "错误摘要", kind: "error_digest", timeRange: 24 * time.Hour, schedule: emailCfg.Report.ErrorDigestSchedule},
		{enabled: emailCfg.Report.AccountHealthEnabled, name: "账号健康", kind: "account_health", timeRange: 24 * time.Hour, schedule: emailCfg.Report.AccountHealthSchedule},
	}

	out := make([]*opsScheduledReport, 0, len(defs))
	for _, d := range defs {
		if !d.enabled {
			continue
		}
		spec := strings.TrimSpace(d.schedule)
		if spec == "" {
			continue
		}
		sched, err := opsScheduledReportCronParser.Parse(spec)
		if err != nil {
			log.Printf("[OpsScheduledReport] invalid cron spec=%q for report=%s: %v", spec, d.kind, err)
			continue
		}

		lastRun := s.getLastRunAt(ctx, d.kind)
		base := lastRun
		if base.IsZero() {
			// Allow a schedule matching the current minute to trigger right after startup.
			base = now.Add(-1 * time.Minute)
		}
		next := sched.Next(base)
		if next.IsZero() {
			continue
		}

		var lastRunPtr *time.Time
		if !lastRun.IsZero() {
			lastCopy := lastRun
			lastRunPtr = &lastCopy
		}

		out = append(out, &opsScheduledReport{
			Name:       d.name,
			ReportType: d.kind,
			Schedule:   spec,
			Enabled:    true,

			TimeRange: d.timeRange,

			Recipients: recipients,

			ErrorDigestMinCount:             emailCfg.Report.ErrorDigestMinCount,
			AccountHealthErrorRateThreshold: emailCfg.Report.AccountHealthErrorRateThreshold,

			LastRunAt: lastRunPtr,
			NextRunAt: next,
		})
	}

	return out
}

// runReport generates one report and emails it to every recipient. When the
// report has no recipients it falls back to the first admin's email. It
// returns the number of send attempts; an error is returned only when report
// generation fails. Individual send failures are logged and do not stop
// delivery to the remaining recipients.
func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsScheduledReport, now time.Time) (int, error) {
	if s == nil || s.opsService == nil || s.emailService == nil || report == nil {
		return 0, nil
	}
	if ctx == nil {
		ctx = context.Background()
	}

	// Mark as "run" up-front so a broken SMTP config doesn't spam retries every minute.
	s.setLastRunAt(ctx, report.ReportType, now)

	content, err := s.generateReportHTML(ctx, report, now)
	if err != nil {
		return 0, err
	}
	if strings.TrimSpace(content) == "" {
		// Skip sending when the report decides not to emit content (e.g., digest below min count).
		return 0, nil
	}

	recipients := report.Recipients
	if len(recipients) == 0 && s.userService != nil {
		admin, err := s.userService.GetFirstAdmin(ctx)
		if err == nil && admin != nil && strings.TrimSpace(admin.Email) != "" {
			recipients = []string{strings.TrimSpace(admin.Email)}
		}
	}
	if len(recipients) == 0 {
		return 0, nil
	}

	subject := fmt.Sprintf("[Ops Report] %s", strings.TrimSpace(report.Name))

	attempts := 0
	for _, to := range recipients {
		addr := strings.TrimSpace(to)
		if addr == "" {
			continue
		}
		attempts++
		if err := s.emailService.SendEmail(ctx, addr, subject, content); err != nil {
			// Best-effort per recipient: keep going, but leave a trace so a
			// broken mail setup is diagnosable.
			log.Printf("[OpsScheduledReport] send report=%s to=%q failed: %v", report.ReportType, addr, err)
		}
	}
	return attempts, nil
}

// generateReportHTML renders the HTML body for the given report over the
// window [now-TimeRange, now] in UTC. An empty string with a nil error means
// the report chose not to emit anything (error digest below its minimum count).
func (s *OpsScheduledReportService) generateReportHTML(ctx context.Context, report *opsScheduledReport, now time.Time) (string, error) {
	if s == nil || s.opsService == nil || report == nil {
		return "", fmt.Errorf("service not initialized")
	}
	if report.TimeRange <= 0 {
		return "", fmt.Errorf("invalid time range")
	}

	end := now.UTC()
	start := end.Add(-report.TimeRange)

	switch strings.TrimSpace(report.ReportType) {
	case "daily_summary", "weekly_summary":
		overview, err := s.opsService.GetDashboardOverview(ctx, &OpsDashboardFilter{
			StartTime: start,
			EndTime:   end,
			Platform:  "",
			GroupID:   nil,
			QueryMode: OpsQueryModeAuto,
		})
		if err != nil {
			// If pre-aggregation isn't ready but the report is requested, fall back to raw.
			overview, err = s.opsService.GetDashboardOverview(ctx, &OpsDashboardFilter{
				StartTime: start,
				EndTime:   end,
				Platform:  "",
				GroupID:   nil,
				QueryMode: OpsQueryModeRaw,
			})
			if err != nil {
				return "", err
			}
		}
		return buildOpsSummaryEmailHTML(report.Name, start, end, overview), nil

	case "error_digest":
		// Lightweight digest: list recent errors (status>=400) and breakdown by type.
		startTime := start
		endTime := end
		filter := &OpsErrorLogFilter{
			StartTime: &startTime,
			EndTime:   &endTime,
			Page:      1,
			PageSize:  100,
		}
		out, err := s.opsService.GetErrorLogs(ctx, filter)
		if err != nil {
			return "", err
		}
		if report.ErrorDigestMinCount > 0 && out != nil && out.Total < report.ErrorDigestMinCount {
			return "", nil
		}
		return buildOpsErrorDigestEmailHTML(report.Name, start, end, out), nil

	case "account_health":
		// Best-effort: use account availability (not error rate yet).
		avail, err := s.opsService.GetAccountAvailability(ctx, "", nil)
		if err != nil {
			return "", err
		}
		_ = report.AccountHealthErrorRateThreshold // reserved for future per-account error rate report
		return buildOpsAccountHealthEmailHTML(report.Name, start, end, avail), nil

	default:
		return "", fmt.Errorf("unknown report type: %s", report.ReportType)
	}
}

// NOTE(review): the template string literals of the email builder functions
// from here on appear truncated in this copy of the file; they are left as found.
func buildOpsSummaryEmailHTML(title string, start, end time.Time, overview *OpsDashboardOverview) string {
	if overview == nil {
		return fmt.Sprintf("
No data.
", htmlEscape(title)) } latP50 := "-" latP99 := "-" if overview.Duration.P50 != nil { latP50 = fmt.Sprintf("%dms", *overview.Duration.P50) } if overview.Duration.P99 != nil { latP99 = fmt.Sprintf("%dms", *overview.Duration.P99) } ttftP50 := "-" ttftP99 := "-" if overview.TTFT.P50 != nil { ttftP50 = fmt.Sprintf("%dms", *overview.TTFT.P50) } if overview.TTFT.P99 != nil { ttftP99 = fmt.Sprintf("%dms", *overview.TTFT.P99) } return fmt.Sprintf(`Period: %s ~ %s (UTC)
Period: %s ~ %s (UTC)
Total Errors: %d
| Time | Platform | Status | Message |
|---|
Period: %s ~ %s (UTC)
Note: This report currently reflects account availability status only.
`, htmlEscape(strings.TrimSpace(title)), htmlEscape(start.UTC().Format(time.RFC3339)), htmlEscape(end.UTC().Format(time.RFC3339)), total, available, rateLimited, hasError, ) } func (s *OpsScheduledReportService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) { if s == nil || !s.distributedLockOn { return nil, true } if s.redisClient == nil { s.warnNoRedisOnce.Do(func() { log.Printf("[OpsScheduledReport] redis not configured; running without distributed lock") }) return nil, true } if ctx == nil { ctx = context.Background() } key := opsScheduledReportLeaderLockKeyDefault ttl := opsScheduledReportLeaderLockTTLDefault if strings.TrimSpace(key) == "" { key = "ops:scheduled_reports:leader" } if ttl <= 0 { ttl = 5 * time.Minute } ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result() if err != nil { // Prefer fail-closed to avoid duplicate report sends when Redis is flaky. log.Printf("[OpsScheduledReport] leader lock SetNX failed; skipping this cycle: %v", err) return nil, false } if !ok { return nil, false } return func() { _, _ = opsScheduledReportReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result() }, true } func (s *OpsScheduledReportService) getLastRunAt(ctx context.Context, reportType string) time.Time { if s == nil || s.redisClient == nil { return time.Time{} } kind := strings.TrimSpace(reportType) if kind == "" { return time.Time{} } key := opsScheduledReportLastRunKeyPrefix + kind raw, err := s.redisClient.Get(ctx, key).Result() if err != nil || strings.TrimSpace(raw) == "" { return time.Time{} } sec, err := strconv.ParseInt(strings.TrimSpace(raw), 10, 64) if err != nil || sec <= 0 { return time.Time{} } last := time.Unix(sec, 0) // Cron schedules are interpreted in the configured timezone (s.loc). Ensure the base time // passed into cron.Next() uses the same location; otherwise the job will drift by timezone // offset (e.g. Asia/Shanghai default would run 8h later after the first execution). 
if s.loc != nil { return last.In(s.loc) } return last.UTC() } func (s *OpsScheduledReportService) setLastRunAt(ctx context.Context, reportType string, t time.Time) { if s == nil || s.redisClient == nil { return } kind := strings.TrimSpace(reportType) if kind == "" { return } if t.IsZero() { t = time.Now().UTC() } key := opsScheduledReportLastRunKeyPrefix + kind _ = s.redisClient.Set(ctx, key, strconv.FormatInt(t.UTC().Unix(), 10), 14*24*time.Hour).Err() } func (s *OpsScheduledReportService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, result string) { if s == nil || s.opsService == nil || s.opsService.opsRepo == nil { return } now := time.Now().UTC() durMs := duration.Milliseconds() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() msg := strings.TrimSpace(result) if msg == "" { msg = "ok" } msg = truncateString(msg, 2048) _ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ JobName: opsScheduledReportJobName, LastRunAt: &runAt, LastSuccessAt: &now, LastDurationMs: &durMs, LastResult: &msg, }) } func (s *OpsScheduledReportService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) { if s == nil || s.opsService == nil || s.opsService.opsRepo == nil || err == nil { return } now := time.Now().UTC() durMs := duration.Milliseconds() msg := truncateString(err.Error(), 2048) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() _ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ JobName: opsScheduledReportJobName, LastRunAt: &runAt, LastErrorAt: &now, LastError: &msg, LastDurationMs: &durMs, }) } func normalizeEmails(in []string) []string { if len(in) == 0 { return nil } seen := make(map[string]struct{}, len(in)) out := make([]string, 0, len(in)) for _, raw := range in { addr := strings.ToLower(strings.TrimSpace(raw)) if addr == "" { continue } if _, ok := seen[addr]; ok { continue } seen[addr] = struct{}{} out = 
append(out, addr) } return out }