Files
sub2api/backend/internal/service/ops_scheduled_report_service.go
IanShaw027 23aa69f56f refactor(ops): 优化任务心跳和组件刷新机制
后端改动:
- 添加 ops_job_heartbeats.last_result 字段记录任务执行结果
- 优化告警评估器统计信息(规则数/事件数/邮件数)
- 统一各定时任务的心跳记录格式

前端改动:
- 重构 OpsConcurrencyCard 使用父组件统一控制刷新节奏
- 移除独立的 5 秒刷新定时器,改用 refreshToken 机制
- 修复 TypeScript 类型错误
2026-01-15 21:31:55 +08:00

722 lines
18 KiB
Go

package service
import (
"context"
"fmt"
"log"
"strconv"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
const (
opsScheduledReportJobName = "ops_scheduled_reports"
opsScheduledReportLeaderLockKeyDefault = "ops:scheduled_reports:leader"
opsScheduledReportLeaderLockTTLDefault = 5 * time.Minute
opsScheduledReportLastRunKeyPrefix = "ops:scheduled_reports:last_run:"
opsScheduledReportTickInterval = 1 * time.Minute
)
var opsScheduledReportCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
var opsScheduledReportReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
type OpsScheduledReportService struct {
opsService *OpsService
userService *UserService
emailService *EmailService
redisClient *redis.Client
cfg *config.Config
instanceID string
loc *time.Location
distributedLockOn bool
warnNoRedisOnce sync.Once
startOnce sync.Once
stopOnce sync.Once
stopCtx context.Context
stop context.CancelFunc
wg sync.WaitGroup
}
func NewOpsScheduledReportService(
opsService *OpsService,
userService *UserService,
emailService *EmailService,
redisClient *redis.Client,
cfg *config.Config,
) *OpsScheduledReportService {
lockOn := cfg == nil || strings.TrimSpace(cfg.RunMode) != config.RunModeSimple
loc := time.Local
if cfg != nil && strings.TrimSpace(cfg.Timezone) != "" {
if parsed, err := time.LoadLocation(strings.TrimSpace(cfg.Timezone)); err == nil && parsed != nil {
loc = parsed
}
}
return &OpsScheduledReportService{
opsService: opsService,
userService: userService,
emailService: emailService,
redisClient: redisClient,
cfg: cfg,
instanceID: uuid.NewString(),
loc: loc,
distributedLockOn: lockOn,
warnNoRedisOnce: sync.Once{},
startOnce: sync.Once{},
stopOnce: sync.Once{},
stopCtx: nil,
stop: nil,
wg: sync.WaitGroup{},
}
}
func (s *OpsScheduledReportService) Start() {
s.StartWithContext(context.Background())
}
func (s *OpsScheduledReportService) StartWithContext(ctx context.Context) {
if s == nil {
return
}
if ctx == nil {
ctx = context.Background()
}
if s.cfg != nil && !s.cfg.Ops.Enabled {
return
}
if s.opsService == nil || s.emailService == nil {
return
}
s.startOnce.Do(func() {
s.stopCtx, s.stop = context.WithCancel(ctx)
s.wg.Add(1)
go s.run()
})
}
func (s *OpsScheduledReportService) Stop() {
if s == nil {
return
}
s.stopOnce.Do(func() {
if s.stop != nil {
s.stop()
}
})
s.wg.Wait()
}
func (s *OpsScheduledReportService) run() {
defer s.wg.Done()
ticker := time.NewTicker(opsScheduledReportTickInterval)
defer ticker.Stop()
s.runOnce()
for {
select {
case <-ticker.C:
s.runOnce()
case <-s.stopCtx.Done():
return
}
}
}
func (s *OpsScheduledReportService) runOnce() {
if s == nil || s.opsService == nil || s.emailService == nil {
return
}
startedAt := time.Now().UTC()
runAt := startedAt
ctx, cancel := context.WithTimeout(s.stopCtx, 60*time.Second)
defer cancel()
// Respect ops monitoring enabled switch.
if !s.opsService.IsMonitoringEnabled(ctx) {
return
}
release, ok := s.tryAcquireLeaderLock(ctx)
if !ok {
return
}
if release != nil {
defer release()
}
now := time.Now()
if s.loc != nil {
now = now.In(s.loc)
}
reports := s.listScheduledReports(ctx, now)
if len(reports) == 0 {
return
}
reportsTotal := len(reports)
reportsDue := 0
sentAttempts := 0
for _, report := range reports {
if report == nil || !report.Enabled {
continue
}
if report.NextRunAt.After(now) {
continue
}
reportsDue++
attempts, err := s.runReport(ctx, report, now)
if err != nil {
s.recordHeartbeatError(runAt, time.Since(startedAt), err)
return
}
sentAttempts += attempts
}
result := truncateString(fmt.Sprintf("reports=%d due=%d send_attempts=%d", reportsTotal, reportsDue, sentAttempts), 2048)
s.recordHeartbeatSuccess(runAt, time.Since(startedAt), result)
}
type opsScheduledReport struct {
Name string
ReportType string
Schedule string
Enabled bool
TimeRange time.Duration
Recipients []string
ErrorDigestMinCount int
AccountHealthErrorRateThreshold float64
LastRunAt *time.Time
NextRunAt time.Time
}
func (s *OpsScheduledReportService) listScheduledReports(ctx context.Context, now time.Time) []*opsScheduledReport {
if s == nil || s.opsService == nil {
return nil
}
if ctx == nil {
ctx = context.Background()
}
emailCfg, err := s.opsService.GetEmailNotificationConfig(ctx)
if err != nil || emailCfg == nil {
return nil
}
if !emailCfg.Report.Enabled {
return nil
}
recipients := normalizeEmails(emailCfg.Report.Recipients)
type reportDef struct {
enabled bool
name string
kind string
timeRange time.Duration
schedule string
}
defs := []reportDef{
{enabled: emailCfg.Report.DailySummaryEnabled, name: "日报", kind: "daily_summary", timeRange: 24 * time.Hour, schedule: emailCfg.Report.DailySummarySchedule},
{enabled: emailCfg.Report.WeeklySummaryEnabled, name: "周报", kind: "weekly_summary", timeRange: 7 * 24 * time.Hour, schedule: emailCfg.Report.WeeklySummarySchedule},
{enabled: emailCfg.Report.ErrorDigestEnabled, name: "错误摘要", kind: "error_digest", timeRange: 24 * time.Hour, schedule: emailCfg.Report.ErrorDigestSchedule},
{enabled: emailCfg.Report.AccountHealthEnabled, name: "账号健康", kind: "account_health", timeRange: 24 * time.Hour, schedule: emailCfg.Report.AccountHealthSchedule},
}
out := make([]*opsScheduledReport, 0, len(defs))
for _, d := range defs {
if !d.enabled {
continue
}
spec := strings.TrimSpace(d.schedule)
if spec == "" {
continue
}
sched, err := opsScheduledReportCronParser.Parse(spec)
if err != nil {
log.Printf("[OpsScheduledReport] invalid cron spec=%q for report=%s: %v", spec, d.kind, err)
continue
}
lastRun := s.getLastRunAt(ctx, d.kind)
base := lastRun
if base.IsZero() {
// Allow a schedule matching the current minute to trigger right after startup.
base = now.Add(-1 * time.Minute)
}
next := sched.Next(base)
if next.IsZero() {
continue
}
var lastRunPtr *time.Time
if !lastRun.IsZero() {
lastCopy := lastRun
lastRunPtr = &lastCopy
}
out = append(out, &opsScheduledReport{
Name: d.name,
ReportType: d.kind,
Schedule: spec,
Enabled: true,
TimeRange: d.timeRange,
Recipients: recipients,
ErrorDigestMinCount: emailCfg.Report.ErrorDigestMinCount,
AccountHealthErrorRateThreshold: emailCfg.Report.AccountHealthErrorRateThreshold,
LastRunAt: lastRunPtr,
NextRunAt: next,
})
}
return out
}
func (s *OpsScheduledReportService) runReport(ctx context.Context, report *opsScheduledReport, now time.Time) (int, error) {
if s == nil || s.opsService == nil || s.emailService == nil || report == nil {
return 0, nil
}
if ctx == nil {
ctx = context.Background()
}
// Mark as "run" up-front so a broken SMTP config doesn't spam retries every minute.
s.setLastRunAt(ctx, report.ReportType, now)
content, err := s.generateReportHTML(ctx, report, now)
if err != nil {
return 0, err
}
if strings.TrimSpace(content) == "" {
// Skip sending when the report decides not to emit content (e.g., digest below min count).
return 0, nil
}
recipients := report.Recipients
if len(recipients) == 0 && s.userService != nil {
admin, err := s.userService.GetFirstAdmin(ctx)
if err == nil && admin != nil && strings.TrimSpace(admin.Email) != "" {
recipients = []string{strings.TrimSpace(admin.Email)}
}
}
if len(recipients) == 0 {
return 0, nil
}
subject := fmt.Sprintf("[Ops Report] %s", strings.TrimSpace(report.Name))
attempts := 0
for _, to := range recipients {
addr := strings.TrimSpace(to)
if addr == "" {
continue
}
attempts++
if err := s.emailService.SendEmail(ctx, addr, subject, content); err != nil {
// Ignore per-recipient failures; continue best-effort.
continue
}
}
return attempts, nil
}
func (s *OpsScheduledReportService) generateReportHTML(ctx context.Context, report *opsScheduledReport, now time.Time) (string, error) {
if s == nil || s.opsService == nil || report == nil {
return "", fmt.Errorf("service not initialized")
}
if report.TimeRange <= 0 {
return "", fmt.Errorf("invalid time range")
}
end := now.UTC()
start := end.Add(-report.TimeRange)
switch strings.TrimSpace(report.ReportType) {
case "daily_summary", "weekly_summary":
overview, err := s.opsService.GetDashboardOverview(ctx, &OpsDashboardFilter{
StartTime: start,
EndTime: end,
Platform: "",
GroupID: nil,
QueryMode: OpsQueryModeAuto,
})
if err != nil {
// If pre-aggregation isn't ready but the report is requested, fall back to raw.
if strings.TrimSpace(report.ReportType) == "daily_summary" || strings.TrimSpace(report.ReportType) == "weekly_summary" {
overview, err = s.opsService.GetDashboardOverview(ctx, &OpsDashboardFilter{
StartTime: start,
EndTime: end,
Platform: "",
GroupID: nil,
QueryMode: OpsQueryModeRaw,
})
}
if err != nil {
return "", err
}
}
return buildOpsSummaryEmailHTML(report.Name, start, end, overview), nil
case "error_digest":
// Lightweight digest: list recent errors (status>=400) and breakdown by type.
startTime := start
endTime := end
filter := &OpsErrorLogFilter{
StartTime: &startTime,
EndTime: &endTime,
Page: 1,
PageSize: 100,
}
out, err := s.opsService.GetErrorLogs(ctx, filter)
if err != nil {
return "", err
}
if report.ErrorDigestMinCount > 0 && out != nil && out.Total < report.ErrorDigestMinCount {
return "", nil
}
return buildOpsErrorDigestEmailHTML(report.Name, start, end, out), nil
case "account_health":
// Best-effort: use account availability (not error rate yet).
avail, err := s.opsService.GetAccountAvailability(ctx, "", nil)
if err != nil {
return "", err
}
_ = report.AccountHealthErrorRateThreshold // reserved for future per-account error rate report
return buildOpsAccountHealthEmailHTML(report.Name, start, end, avail), nil
default:
return "", fmt.Errorf("unknown report type: %s", report.ReportType)
}
}
func buildOpsSummaryEmailHTML(title string, start, end time.Time, overview *OpsDashboardOverview) string {
if overview == nil {
return fmt.Sprintf("<h2>%s</h2><p>No data.</p>", htmlEscape(title))
}
latP50 := "-"
latP99 := "-"
if overview.Duration.P50 != nil {
latP50 = fmt.Sprintf("%dms", *overview.Duration.P50)
}
if overview.Duration.P99 != nil {
latP99 = fmt.Sprintf("%dms", *overview.Duration.P99)
}
ttftP50 := "-"
ttftP99 := "-"
if overview.TTFT.P50 != nil {
ttftP50 = fmt.Sprintf("%dms", *overview.TTFT.P50)
}
if overview.TTFT.P99 != nil {
ttftP99 = fmt.Sprintf("%dms", *overview.TTFT.P99)
}
return fmt.Sprintf(`
<h2>%s</h2>
<p><b>Period</b>: %s ~ %s (UTC)</p>
<ul>
<li><b>Total Requests</b>: %d</li>
<li><b>Success</b>: %d</li>
<li><b>Errors (SLA)</b>: %d</li>
<li><b>Business Limited</b>: %d</li>
<li><b>SLA</b>: %.2f%%</li>
<li><b>Error Rate</b>: %.2f%%</li>
<li><b>Upstream Error Rate (excl 429/529)</b>: %.2f%%</li>
<li><b>Upstream Errors</b>: excl429/529=%d, 429=%d, 529=%d</li>
<li><b>Latency</b>: p50=%s, p99=%s</li>
<li><b>TTFT</b>: p50=%s, p99=%s</li>
<li><b>Tokens</b>: %d</li>
<li><b>QPS</b>: current=%.1f, peak=%.1f, avg=%.1f</li>
<li><b>TPS</b>: current=%.1f, peak=%.1f, avg=%.1f</li>
</ul>
`,
htmlEscape(strings.TrimSpace(title)),
htmlEscape(start.UTC().Format(time.RFC3339)),
htmlEscape(end.UTC().Format(time.RFC3339)),
overview.RequestCountTotal,
overview.SuccessCount,
overview.ErrorCountSLA,
overview.BusinessLimitedCount,
overview.SLA*100,
overview.ErrorRate*100,
overview.UpstreamErrorRate*100,
overview.UpstreamErrorCountExcl429529,
overview.Upstream429Count,
overview.Upstream529Count,
htmlEscape(latP50),
htmlEscape(latP99),
htmlEscape(ttftP50),
htmlEscape(ttftP99),
overview.TokenConsumed,
overview.QPS.Current,
overview.QPS.Peak,
overview.QPS.Avg,
overview.TPS.Current,
overview.TPS.Peak,
overview.TPS.Avg,
)
}
func buildOpsErrorDigestEmailHTML(title string, start, end time.Time, list *OpsErrorLogList) string {
total := 0
recent := []*OpsErrorLog{}
if list != nil {
total = list.Total
recent = list.Errors
}
if len(recent) > 10 {
recent = recent[:10]
}
rows := ""
for _, item := range recent {
if item == nil {
continue
}
rows += fmt.Sprintf(
"<tr><td>%s</td><td>%s</td><td>%d</td><td>%s</td></tr>",
htmlEscape(item.CreatedAt.UTC().Format(time.RFC3339)),
htmlEscape(item.Platform),
item.StatusCode,
htmlEscape(truncateString(item.Message, 180)),
)
}
if rows == "" {
rows = "<tr><td colspan=\"4\">No recent errors.</td></tr>"
}
return fmt.Sprintf(`
<h2>%s</h2>
<p><b>Period</b>: %s ~ %s (UTC)</p>
<p><b>Total Errors</b>: %d</p>
<h3>Recent</h3>
<table border="1" cellpadding="6" cellspacing="0" style="border-collapse:collapse;">
<thead><tr><th>Time</th><th>Platform</th><th>Status</th><th>Message</th></tr></thead>
<tbody>%s</tbody>
</table>
`,
htmlEscape(strings.TrimSpace(title)),
htmlEscape(start.UTC().Format(time.RFC3339)),
htmlEscape(end.UTC().Format(time.RFC3339)),
total,
rows,
)
}
func buildOpsAccountHealthEmailHTML(title string, start, end time.Time, avail *OpsAccountAvailability) string {
total := 0
available := 0
rateLimited := 0
hasError := 0
if avail != nil && avail.Accounts != nil {
for _, a := range avail.Accounts {
if a == nil {
continue
}
total++
if a.IsAvailable {
available++
}
if a.IsRateLimited {
rateLimited++
}
if a.HasError {
hasError++
}
}
}
return fmt.Sprintf(`
<h2>%s</h2>
<p><b>Period</b>: %s ~ %s (UTC)</p>
<ul>
<li><b>Total Accounts</b>: %d</li>
<li><b>Available</b>: %d</li>
<li><b>Rate Limited</b>: %d</li>
<li><b>Error</b>: %d</li>
</ul>
<p>Note: This report currently reflects account availability status only.</p>
`,
htmlEscape(strings.TrimSpace(title)),
htmlEscape(start.UTC().Format(time.RFC3339)),
htmlEscape(end.UTC().Format(time.RFC3339)),
total,
available,
rateLimited,
hasError,
)
}
func (s *OpsScheduledReportService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
if s == nil || !s.distributedLockOn {
return nil, true
}
if s.redisClient == nil {
s.warnNoRedisOnce.Do(func() {
log.Printf("[OpsScheduledReport] redis not configured; running without distributed lock")
})
return nil, true
}
if ctx == nil {
ctx = context.Background()
}
key := opsScheduledReportLeaderLockKeyDefault
ttl := opsScheduledReportLeaderLockTTLDefault
if strings.TrimSpace(key) == "" {
key = "ops:scheduled_reports:leader"
}
if ttl <= 0 {
ttl = 5 * time.Minute
}
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err != nil {
// Prefer fail-closed to avoid duplicate report sends when Redis is flaky.
log.Printf("[OpsScheduledReport] leader lock SetNX failed; skipping this cycle: %v", err)
return nil, false
}
if !ok {
return nil, false
}
return func() {
_, _ = opsScheduledReportReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
}, true
}
func (s *OpsScheduledReportService) getLastRunAt(ctx context.Context, reportType string) time.Time {
if s == nil || s.redisClient == nil {
return time.Time{}
}
kind := strings.TrimSpace(reportType)
if kind == "" {
return time.Time{}
}
key := opsScheduledReportLastRunKeyPrefix + kind
raw, err := s.redisClient.Get(ctx, key).Result()
if err != nil || strings.TrimSpace(raw) == "" {
return time.Time{}
}
sec, err := strconv.ParseInt(strings.TrimSpace(raw), 10, 64)
if err != nil || sec <= 0 {
return time.Time{}
}
last := time.Unix(sec, 0)
// Cron schedules are interpreted in the configured timezone (s.loc). Ensure the base time
// passed into cron.Next() uses the same location; otherwise the job will drift by timezone
// offset (e.g. Asia/Shanghai default would run 8h later after the first execution).
if s.loc != nil {
return last.In(s.loc)
}
return last.UTC()
}
func (s *OpsScheduledReportService) setLastRunAt(ctx context.Context, reportType string, t time.Time) {
if s == nil || s.redisClient == nil {
return
}
kind := strings.TrimSpace(reportType)
if kind == "" {
return
}
if t.IsZero() {
t = time.Now().UTC()
}
key := opsScheduledReportLastRunKeyPrefix + kind
_ = s.redisClient.Set(ctx, key, strconv.FormatInt(t.UTC().Unix(), 10), 14*24*time.Hour).Err()
}
func (s *OpsScheduledReportService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, result string) {
if s == nil || s.opsService == nil || s.opsService.opsRepo == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
msg := strings.TrimSpace(result)
if msg == "" {
msg = "ok"
}
msg = truncateString(msg, 2048)
_ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsScheduledReportJobName,
LastRunAt: &runAt,
LastSuccessAt: &now,
LastDurationMs: &durMs,
LastResult: &msg,
})
}
func (s *OpsScheduledReportService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
if s == nil || s.opsService == nil || s.opsService.opsRepo == nil || err == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
msg := truncateString(err.Error(), 2048)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.opsService.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsScheduledReportJobName,
LastRunAt: &runAt,
LastErrorAt: &now,
LastError: &msg,
LastDurationMs: &durMs,
})
}
func normalizeEmails(in []string) []string {
if len(in) == 0 {
return nil
}
seen := make(map[string]struct{}, len(in))
out := make([]string, 0, len(in))
for _, raw := range in {
addr := strings.ToLower(strings.TrimSpace(raw))
if addr == "" {
continue
}
if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}
out = append(out, addr)
}
return out
}