diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 40f0191c..81225ca6 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -254,7 +254,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { opsMetricsCollector := service.ProvideOpsMetricsCollector(opsRepository, settingRepository, accountRepository, concurrencyService, db, redisClient, configConfig) opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig) opsAlertEvaluatorService := service.ProvideOpsAlertEvaluatorService(opsService, opsRepository, emailService, redisClient, configConfig) - opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig, channelMonitorService) + opsCleanupService := service.ProvideOpsCleanupService(opsRepository, db, redisClient, configConfig, channelMonitorService, settingRepository, opsService) opsScheduledReportService := service.ProvideOpsScheduledReportService(opsService, userService, emailService, redisClient, configConfig) tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, compositeTokenCacheInvalidator, schedulerCache, configConfig, tempUnschedCache, privacyClientFactory, proxyRepository, oAuthRefreshAPI) accountExpiryService := service.ProvideAccountExpiryService(accountRepository) diff --git a/backend/internal/service/ops_cleanup_executor.go b/backend/internal/service/ops_cleanup_executor.go new file mode 100644 index 00000000..63a7367f --- /dev/null +++ b/backend/internal/service/ops_cleanup_executor.go @@ -0,0 +1,164 @@ +package service + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" +) + +const ( + opsCleanupDefaultSchedule = "0 2 * * *" + opsCleanupBatchSize = 5000 + opsCleanupCronStopTimeout = 3 * time.Second + opsCleanupRunTimeout = 30 * time.Minute + opsCleanupHeartbeatTimeout = 2 * time.Second +) + +type opsCleanupTarget struct { + retentionDays int + table string + timeCol string + castDate bool + counter *int64 +} + +type opsCleanupDeletedCounts struct { + errorLogs int64 + retryAttempts int64 + alertEvents int64 + systemLogs int64 + logAudits int64 + systemMetrics int64 + hourlyPreagg int64 + dailyPreagg int64 +} + +func (c opsCleanupDeletedCounts) String() string { + return fmt.Sprintf( + "error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d", + c.errorLogs, + c.retryAttempts, + c.alertEvents, + c.systemLogs, + c.logAudits, + c.systemMetrics, + c.hourlyPreagg, + c.dailyPreagg, + ) +} + +// opsCleanupPlan 把"保留天数"翻译成具体的清理动作。 +// - days < 0 → 跳过该项清理(ok=false),保留兼容老数据 +// - days == 0 → TRUNCATE TABLE(O(1) 全清),truncate=true +// - days > 0 → 批量 DELETE 早于 now-N天 的行,cutoff = now - N 天 +func opsCleanupPlan(now time.Time, days int) (cutoff time.Time, truncate, ok bool) { + if days < 0 { + return time.Time{}, false, false + } + if days == 0 { + return time.Time{}, true, true + } + return now.AddDate(0, 0, -days), false, true +} + +func opsCleanupRunOne( + ctx context.Context, + db *sql.DB, + truncate bool, + cutoff time.Time, + table, timeCol string, + castDate bool, + batchSize int, +) (int64, error) { + if truncate { + return truncateOpsTable(ctx, db, table) + } + return deleteOldRowsByID(ctx, db, table, timeCol, cutoff, batchSize, castDate) +} + +func deleteOldRowsByID( + ctx context.Context, + db *sql.DB, + table string, + timeColumn string, + cutoff time.Time, + batchSize int, + castCutoffToDate bool, +) (int64, error) { + if db == nil { + return 0, nil + } + if batchSize <= 0 { + batchSize = opsCleanupBatchSize + } + + where := fmt.Sprintf("%s < $1", timeColumn) + if castCutoffToDate { + where = fmt.Sprintf("%s < $1::date", timeColumn) + } + + q := fmt.Sprintf(` +WITH batch AS ( + SELECT id FROM %s + WHERE %s + ORDER BY id + LIMIT $2 +) +DELETE FROM %s +WHERE id IN (SELECT id FROM batch) +`, table, where, table) + + var total int64 + for { + res, err := db.ExecContext(ctx, q, cutoff, batchSize) + if err != nil { + if isMissingRelationError(err) { + return total, nil + } + return total, err + } + affected, err := res.RowsAffected() + if err != nil { + return total, err + } + total += affected + if affected == 0 { + break + } + } + return total, nil +} + +// truncateOpsTable 用 TRUNCATE TABLE 清空指定表,先 SELECT COUNT(*) 取得清空前行数用于 heartbeat。 +func truncateOpsTable(ctx context.Context, db *sql.DB, table string) (int64, error) { + if db == nil { + return 0, nil + } + var count int64 + if err := db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)).Scan(&count); err != nil { + if isMissingRelationError(err) { + return 0, nil + } + return 0, fmt.Errorf("count %s: %w", table, err) + } + if count == 0 { + return 0, nil + } + if _, err := db.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s", table)); err != nil { + if isMissingRelationError(err) { + return 0, nil + } + return 0, fmt.Errorf("truncate %s: %w", table, err) + } + return count, nil +} + +func isMissingRelationError(err error) bool { + if err == nil { + return false + } + s := strings.ToLower(err.Error()) + return strings.Contains(s, "does not exist") && strings.Contains(s, "relation") +} diff --git a/backend/internal/service/ops_cleanup_overlay_test.go b/backend/internal/service/ops_cleanup_overlay_test.go new file mode 100644 index 00000000..f751a426 --- /dev/null +++ b/backend/internal/service/ops_cleanup_overlay_test.go @@ -0,0 +1,257 @@ +//go:build unit + +package service + +import ( + "context" + "encoding/json" + "testing" + + "github.com/Wei-Shaw/sub2api/internal/config" +) + +// makeOverlayService 构造一个没有 cron / db 的 cleanup service,仅用来测试 effective overlay。 +func makeOverlayService(repo SettingRepository, base config.OpsCleanupConfig) *OpsCleanupService { + cfg := &config.Config{} + cfg.Ops.Cleanup = base + return &OpsCleanupService{ + cfg: cfg, + settingRepo: repo, + } +} + +func writeAdvancedSettings(t *testing.T, repo *runtimeSettingRepoStub, dr OpsDataRetentionSettings) { + t.Helper() + adv := OpsAdvancedSettings{DataRetention: dr} + raw, err := json.Marshal(adv) + if err != nil { + t.Fatalf("marshal: %v", err) + } + if err := repo.Set(context.Background(), SettingKeyOpsAdvancedSettings, string(raw)); err != nil { + t.Fatalf("set: %v", err) + } +} + +func TestComputeEffective_FallbackToCfgWhenSettingsAbsent(t *testing.T) { + repo := newRuntimeSettingRepoStub() + base := config.OpsCleanupConfig{ + Enabled: false, + Schedule: "0 2 * * *", + ErrorLogRetentionDays: 30, + MinuteMetricsRetentionDays: 30, + HourlyMetricsRetentionDays: 30, + } + svc := makeOverlayService(repo, base) + + svc.computeEffectiveLocked(context.Background()) + + if svc.effective != base { + t.Fatalf("expected effective == cfg base, got %#v", svc.effective) + } +} + +func TestComputeEffective_SettingsOverridesAll(t *testing.T) { + repo := newRuntimeSettingRepoStub() + writeAdvancedSettings(t, repo, OpsDataRetentionSettings{ + CleanupEnabled: true, + CleanupSchedule: "0 * * * *", + ErrorLogRetentionDays: 0, + MinuteMetricsRetentionDays: 7, + HourlyMetricsRetentionDays: 14, + }) + base := config.OpsCleanupConfig{ + Enabled: false, + Schedule: "0 2 * * *", + ErrorLogRetentionDays: 30, + MinuteMetricsRetentionDays: 30, + HourlyMetricsRetentionDays: 30, + } + svc := makeOverlayService(repo, base) + + svc.computeEffectiveLocked(context.Background()) + + want := config.OpsCleanupConfig{ + Enabled: true, + Schedule: "0 * * * *", + ErrorLogRetentionDays: 0, + MinuteMetricsRetentionDays: 7, + HourlyMetricsRetentionDays: 14, + } + if svc.effective != want { + t.Fatalf("effective mismatch:\nwant %#v\n got %#v", want, svc.effective) + } +} + +func TestComputeEffective_EmptyScheduleFallbackToCfg(t *testing.T) { + repo := newRuntimeSettingRepoStub() + writeAdvancedSettings(t, repo, OpsDataRetentionSettings{ + CleanupEnabled: true, + CleanupSchedule: " ", // 空白被 trim 后视为空 + ErrorLogRetentionDays: 5, + MinuteMetricsRetentionDays: 5, + HourlyMetricsRetentionDays: 5, + }) + base := config.OpsCleanupConfig{ + Enabled: false, + Schedule: "0 2 * * *", + ErrorLogRetentionDays: 30, + MinuteMetricsRetentionDays: 30, + HourlyMetricsRetentionDays: 30, + } + svc := makeOverlayService(repo, base) + + svc.computeEffectiveLocked(context.Background()) + + if svc.effective.Schedule != "0 2 * * *" { + t.Fatalf("expected schedule fallback to cfg, got %q", svc.effective.Schedule) + } + if !svc.effective.Enabled { + t.Fatalf("expected enabled=true from settings") + } + if svc.effective.ErrorLogRetentionDays != 5 { + t.Fatalf("expected retention=5 from settings, got %d", svc.effective.ErrorLogRetentionDays) + } +} + +func TestComputeEffective_NegativeRetentionFallsBackToCfg(t *testing.T) { + repo := newRuntimeSettingRepoStub() + writeAdvancedSettings(t, repo, OpsDataRetentionSettings{ + CleanupEnabled: true, + CleanupSchedule: "0 * * * *", + ErrorLogRetentionDays: -1, + MinuteMetricsRetentionDays: -1, + HourlyMetricsRetentionDays: -1, + }) + base := config.OpsCleanupConfig{ + Enabled: false, + Schedule: "0 2 * * *", + ErrorLogRetentionDays: 30, + MinuteMetricsRetentionDays: 60, + HourlyMetricsRetentionDays: 90, + } + svc := makeOverlayService(repo, base) + + svc.computeEffectiveLocked(context.Background()) + + if svc.effective.ErrorLogRetentionDays != 30 || + svc.effective.MinuteMetricsRetentionDays != 60 || + svc.effective.HourlyMetricsRetentionDays != 90 { + t.Fatalf("expected retention fallback to cfg, got %#v", svc.effective) + } +} + +func TestComputeEffective_BadJSONFallsBackToCfg(t *testing.T) { + repo := newRuntimeSettingRepoStub() + if err := repo.Set(context.Background(), SettingKeyOpsAdvancedSettings, "{not json"); err != nil { + t.Fatalf("set: %v", err) + } + base := config.OpsCleanupConfig{ + Enabled: true, + Schedule: "0 3 * * *", + ErrorLogRetentionDays: 30, + MinuteMetricsRetentionDays: 30, + HourlyMetricsRetentionDays: 30, + } + svc := makeOverlayService(repo, base) + + svc.computeEffectiveLocked(context.Background()) + + if svc.effective != base { + t.Fatalf("expected fallback to cfg on bad JSON, got %#v", svc.effective) + } +} + +// 验证 OpsService.UpdateOpsAdvancedSettings 写入后会调用 cleanupReloader.Reload。 +type fakeCleanupReloader struct { + calls int + last context.Context + err error +} + +func (f *fakeCleanupReloader) Reload(ctx context.Context) error { + f.calls++ + f.last = ctx + return f.err +} + +func TestUpdateOpsAdvancedSettings_TriggersReload(t *testing.T) { + repo := newRuntimeSettingRepoStub() + reloader := &fakeCleanupReloader{} + svc := &OpsService{settingRepo: repo} + svc.SetCleanupReloader(reloader) + + cfg := defaultOpsAdvancedSettings() + cfg.DataRetention.CleanupEnabled = true + cfg.DataRetention.CleanupSchedule = "0 * * * *" + cfg.DataRetention.ErrorLogRetentionDays = 3 + cfg.DataRetention.MinuteMetricsRetentionDays = 3 + cfg.DataRetention.HourlyMetricsRetentionDays = 3 + + if _, err := svc.UpdateOpsAdvancedSettings(context.Background(), cfg); err != nil { + t.Fatalf("update: %v", err) + } + if reloader.calls != 1 { + t.Fatalf("expected reloader.Reload called once, got %d", reloader.calls) + } +} + +func TestReload_BeforeStart_IsNoop(t *testing.T) { + svc := &OpsCleanupService{} + if err := svc.Reload(context.Background()); err != nil { + t.Fatalf("Reload before Start should return nil, got %v", err) + } +} + +func TestReload_AfterStop_IsNoop(t *testing.T) { + svc := &OpsCleanupService{started: true, stopped: true} + if err := svc.Reload(context.Background()); err != nil { + t.Fatalf("Reload after Stop should return nil, got %v", err) + } +} + +func TestUpdateOpsAdvancedSettings_NilReloader_NoPanic(t *testing.T) { + repo := newRuntimeSettingRepoStub() + svc := &OpsService{settingRepo: repo} + // cleanupReloader intentionally nil + + cfg := defaultOpsAdvancedSettings() + cfg.DataRetention.ErrorLogRetentionDays = 7 + + // should not panic + if _, err := svc.UpdateOpsAdvancedSettings(context.Background(), cfg); err != nil { + t.Fatalf("update with nil reloader: %v", err) + } +} + +func TestStart_IdempotentSecondCall(t *testing.T) { + svc := &OpsCleanupService{started: true} + svc.Start() // second call should be noop, not panic +} + +func TestRefreshEffectiveBeforeRun_UpdatesSnapshot(t *testing.T) { + repo := newRuntimeSettingRepoStub() + base := config.OpsCleanupConfig{ + Enabled: true, + Schedule: "0 2 * * *", + ErrorLogRetentionDays: 30, + } + svc := makeOverlayService(repo, base) + svc.computeEffectiveLocked(context.Background()) + + if svc.effective.ErrorLogRetentionDays != 30 { + t.Fatalf("initial retention should be 30, got %d", svc.effective.ErrorLogRetentionDays) + } + + // simulate UI change + writeAdvancedSettings(t, repo, OpsDataRetentionSettings{ + CleanupEnabled: true, + CleanupSchedule: "0 * * * *", + ErrorLogRetentionDays: 7, + }) + + svc.refreshEffectiveBeforeRun(context.Background()) + snap := svc.snapshotEffective() + if snap.ErrorLogRetentionDays != 7 { + t.Fatalf("after refresh, retention should be 7, got %d", snap.ErrorLogRetentionDays) + } +} diff --git a/backend/internal/service/ops_cleanup_service.go b/backend/internal/service/ops_cleanup_service.go index 44ec1ad1..60a690f3 100644 --- a/backend/internal/service/ops_cleanup_service.go +++ b/backend/internal/service/ops_cleanup_service.go @@ -3,6 +3,8 @@ package service import ( "context" "database/sql" + "encoding/json" + "errors" "fmt" "strings" "sync" @@ -45,13 +47,18 @@ type OpsCleanupService struct { redisClient *redis.Client cfg *config.Config channelMonitorSvc *ChannelMonitorService + settingRepo SettingRepository instanceID string - cron *cron.Cron - - startOnce sync.Once - stopOnce sync.Once + // mu 守护 cron 实例切换 + effective 配置切换。 + // 这里不再用 startOnce/stopOnce,是因为 Reload 需要"停旧 cron 重启新 cron", + // 而 Once 一旦触发就无法再次执行;改为 started/stopped 布尔配合 mu。 + mu sync.Mutex + cron *cron.Cron + started bool + stopped bool + effective config.OpsCleanupConfig warnNoRedisOnce sync.Once } @@ -62,6 +69,7 @@ func NewOpsCleanupService( redisClient *redis.Client, cfg *config.Config, channelMonitorSvc *ChannelMonitorService, + settingRepo SettingRepository, ) *OpsCleanupService { return &OpsCleanupService{ opsRepo: opsRepo, @@ -69,10 +77,13 @@ func NewOpsCleanupService( redisClient: redisClient, cfg: cfg, channelMonitorSvc: channelMonitorSvc, + settingRepo: settingRepo, instanceID: uuid.NewString(), } } +// Start 首次启动 cron 调度。Enabled / Schedule 由 effective 配置决定(settings 优先 cfg)。 +// 重复调用幂等。 func (s *OpsCleanupService) Start() { if s == nil { return @@ -80,54 +91,169 @@ func (s *OpsCleanupService) Start() { if s.cfg != nil && !s.cfg.Ops.Enabled { return } - if s.cfg != nil && !s.cfg.Ops.Cleanup.Enabled { - logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started (disabled)") - return - } if s.opsRepo == nil || s.db == nil { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started (missing deps)") return } - s.startOnce.Do(func() { - schedule := "0 2 * * *" - if s.cfg != nil && strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule) != "" { - schedule = strings.TrimSpace(s.cfg.Ops.Cleanup.Schedule) - } - - loc := time.Local - if s.cfg != nil && strings.TrimSpace(s.cfg.Timezone) != "" { - if parsed, err := time.LoadLocation(strings.TrimSpace(s.cfg.Timezone)); err == nil && parsed != nil { - loc = parsed - } - } - - c := cron.New(cron.WithParser(opsCleanupCronParser), cron.WithLocation(loc)) - _, err := c.AddFunc(schedule, func() { s.runScheduled() }) - if err != nil { - logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started (invalid schedule=%q): %v", schedule, err) - return - } - s.cron = c - s.cron.Start() - logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] started (schedule=%q tz=%s)", schedule, loc.String()) - }) + s.mu.Lock() + defer s.mu.Unlock() + if s.started || s.stopped { + return + } + s.started = true + if err := s.applyScheduleLocked(context.Background()); err != nil { + logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started: %v", err) + } } +// Stop 关闭 cron。幂等。 func (s *OpsCleanupService) Stop() { if s == nil { return } - s.stopOnce.Do(func() { - if s.cron != nil { - ctx := s.cron.Stop() - select { - case <-ctx.Done(): - case <-time.After(3 * time.Second): - logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron stop timed out") - } + s.mu.Lock() + defer s.mu.Unlock() + if s.stopped { + return + } + s.stopped = true + s.stopCronLocked() +} + +// stopCronLocked 停掉当前 cron 实例(带 3s 超时)。调用方持锁。 +func (s *OpsCleanupService) stopCronLocked() { + if s.cron == nil { + return + } + ctx := s.cron.Stop() + select { + case <-ctx.Done(): + case <-time.After(opsCleanupCronStopTimeout): + logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron stop timed out") + } + s.cron = nil +} + +// applyScheduleLocked 重新计算 effective 配置并按其 schedule 重建 cron。调用方持锁。 +// 若 effective.Enabled=false(用户在 UI 关闭清理),停旧 cron 后直接返回,不创建新 cron。 +func (s *OpsCleanupService) applyScheduleLocked(ctx context.Context) error { + s.computeEffectiveLocked(ctx) + s.stopCronLocked() + + if !s.effective.Enabled { + logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron disabled by settings") + return nil + } + + schedule := strings.TrimSpace(s.effective.Schedule) + if schedule == "" { + schedule = opsCleanupDefaultSchedule + } + + loc := time.Local + if s.cfg != nil && strings.TrimSpace(s.cfg.Timezone) != "" { + if parsed, err := time.LoadLocation(strings.TrimSpace(s.cfg.Timezone)); err == nil && parsed != nil { + loc = parsed } - }) + } + + c := cron.New(cron.WithParser(opsCleanupCronParser), cron.WithLocation(loc)) + if _, err := c.AddFunc(schedule, func() { s.runScheduled() }); err != nil { + return fmt.Errorf("invalid schedule %q: %w", schedule, err) + } + c.Start() + s.cron = c + logger.LegacyPrintf("service.ops_cleanup", + "[OpsCleanup] scheduled (schedule=%q tz=%s retention_days=err:%d/min:%d/hour:%d)", + schedule, loc.String(), + s.effective.ErrorLogRetentionDays, + s.effective.MinuteMetricsRetentionDays, + s.effective.HourlyMetricsRetentionDays, + ) + return nil +} + +// Reload 重新读取 ops_advanced_settings.data_retention 并按新配置重建 cron。 +// 适用于 admin 在 UI 修改清理设置后立即生效(schedule / enabled 改动需要 Reload; +// retention 改动 runScheduled 顶部也会刷新,下一次触发即生效)。 +// 若 service 还未 Start 或已 Stop,Reload 不做任何事。 +func (s *OpsCleanupService) Reload(ctx context.Context) error { + if s == nil { + return nil + } + s.mu.Lock() + defer s.mu.Unlock() + if !s.started || s.stopped { + return nil + } + return s.applyScheduleLocked(ctx) +} + +// computeEffectiveLocked 计算"生效配置"并写入 s.effective。调用方持锁。 +// +// 优先级:UI 写入的 settings.ops_advanced_settings.data_retention(权威)覆盖 cfg.Ops.Cleanup 的副本。 +// - Enabled:settings 直接覆盖 +// - Schedule:settings 非空时覆盖,否则保留 cfg +// - *RetentionDays:settings >=0 时覆盖(包括 0=TRUNCATE),<0 沿用 cfg +// +// 若 settings 表无该 key(ErrSettingNotFound)或解析失败,整体 fallback 到 cfg.Ops.Cleanup。 +func (s *OpsCleanupService) computeEffectiveLocked(ctx context.Context) { + base := config.OpsCleanupConfig{} + if s.cfg != nil { + base = s.cfg.Ops.Cleanup + } + defer func() { s.effective = base }() + + if s.settingRepo == nil { + return + } + if ctx == nil { + ctx = context.Background() + } + raw, err := s.settingRepo.GetValue(ctx, SettingKeyOpsAdvancedSettings) + if err != nil { + if !errors.Is(err, ErrSettingNotFound) { + logger.LegacyPrintf("service.ops_cleanup", + "[OpsCleanup] read advanced settings failed, using cfg: %v", err) + } + return + } + var adv OpsAdvancedSettings + if err := json.Unmarshal([]byte(raw), &adv); err != nil { + logger.LegacyPrintf("service.ops_cleanup", + "[OpsCleanup] parse advanced settings failed, using cfg: %v", err) + return + } + dr := adv.DataRetention + base.Enabled = dr.CleanupEnabled + if sched := strings.TrimSpace(dr.CleanupSchedule); sched != "" { + base.Schedule = sched + } + if dr.ErrorLogRetentionDays >= 0 { + base.ErrorLogRetentionDays = dr.ErrorLogRetentionDays + } + if dr.MinuteMetricsRetentionDays >= 0 { + base.MinuteMetricsRetentionDays = dr.MinuteMetricsRetentionDays + } + if dr.HourlyMetricsRetentionDays >= 0 { + base.HourlyMetricsRetentionDays = dr.HourlyMetricsRetentionDays + } +} + +// snapshotEffective 取一份 effective 副本(runCleanupOnce 等读路径使用)。 +func (s *OpsCleanupService) snapshotEffective() config.OpsCleanupConfig { + s.mu.Lock() + defer s.mu.Unlock() + return s.effective +} + +// refreshEffectiveBeforeRun 在 cron 触发时刷新 effective,让 retention 改动当次即生效。 +// schedule 改动不影响当次(cron 调度由库管理,需要 Reload 才换 schedule)。 +func (s *OpsCleanupService) refreshEffectiveBeforeRun(ctx context.Context) { + s.mu.Lock() + defer s.mu.Unlock() + s.computeEffectiveLocked(ctx) } func (s *OpsCleanupService) runScheduled() { @@ -135,9 +261,12 @@ func (s *OpsCleanupService) runScheduled() { return } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), opsCleanupRunTimeout) defer cancel() + // 让 retention 改动当次生效(schedule/enabled 改动需要 Reload)。 + s.refreshEffectiveBeforeRun(ctx) + release, ok := s.tryAcquireLeaderLock(ctx) if !ok { return @@ -159,124 +288,36 @@ func (s *OpsCleanupService) runScheduled() { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup complete: %s", counts) } -type opsCleanupDeletedCounts struct { - errorLogs int64 - retryAttempts int64 - alertEvents int64 - systemLogs int64 - logAudits int64 - systemMetrics int64 - hourlyPreagg int64 - dailyPreagg int64 -} - -func (c opsCleanupDeletedCounts) String() string { - return fmt.Sprintf( - "error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d", - c.errorLogs, - c.retryAttempts, - c.alertEvents, - c.systemLogs, - c.logAudits, - c.systemMetrics, - c.hourlyPreagg, - c.dailyPreagg, - ) -} - -// opsCleanupPlan 把"保留天数"翻译成具体的清理动作。 -// - days < 0 → 跳过该项清理(ok=false),保留兼容老数据 -// - days == 0 → TRUNCATE TABLE(O(1) 全清),truncate=true -// - days > 0 → 批量 DELETE 早于 now-N天 的行,cutoff = now - N 天 -// -// 之所以 days==0 走 TRUNCATE 而非"now+24h cutoff + DELETE": -// - 速度从 O(N) 降到 O(1),对百万行级表毫秒完成 -// - 无 WAL 写入、无后续 VACUUM 压力 -// - 这些 ops 表只有 cleanup 任务自己写,TRUNCATE 的 ACCESS EXCLUSIVE 锁影响可忽略 -func opsCleanupPlan(now time.Time, days int) (cutoff time.Time, truncate, ok bool) { - if days < 0 { - return time.Time{}, false, false - } - if days == 0 { - return time.Time{}, true, true - } - return now.AddDate(0, 0, -days), false, true -} - func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) { out := opsCleanupDeletedCounts{} if s == nil || s.db == nil || s.cfg == nil { return out, nil } - batchSize := 5000 - + effective := s.snapshotEffective() now := time.Now().UTC() - // runOne 把"truncate? cutoff? batched delete?"封装到一处, - // 让三组清理(错误日志类 / 分钟指标 / 小时+日预聚合)调用方只关心表名和列名。 - runOne := func(truncate bool, cutoff time.Time, table, timeCol string, castDate bool) (int64, error) { - if truncate { - return truncateOpsTable(ctx, s.db, table) - } - return deleteOldRowsByID(ctx, s.db, table, timeCol, cutoff, batchSize, castDate) + targets := []opsCleanupTarget{ + {effective.ErrorLogRetentionDays, "ops_error_logs", "created_at", false, &out.errorLogs}, + {effective.ErrorLogRetentionDays, "ops_retry_attempts", "created_at", false, &out.retryAttempts}, + {effective.ErrorLogRetentionDays, "ops_alert_events", "created_at", false, &out.alertEvents}, + {effective.ErrorLogRetentionDays, "ops_system_logs", "created_at", false, &out.systemLogs}, + {effective.ErrorLogRetentionDays, "ops_system_log_cleanup_audits", "created_at", false, &out.logAudits}, + {effective.MinuteMetricsRetentionDays, "ops_system_metrics", "created_at", false, &out.systemMetrics}, + {effective.HourlyMetricsRetentionDays, "ops_metrics_hourly", "bucket_start", false, &out.hourlyPreagg}, + {effective.HourlyMetricsRetentionDays, "ops_metrics_daily", "bucket_date", true, &out.dailyPreagg}, } - // Error-like tables: error logs / retry attempts / alert events / system logs / cleanup audits. - if cutoff, truncate, ok := opsCleanupPlan(now, s.cfg.Ops.Cleanup.ErrorLogRetentionDays); ok { - n, err := runOne(truncate, cutoff, "ops_error_logs", "created_at", false) + for _, t := range targets { + cutoff, truncate, ok := opsCleanupPlan(now, t.retentionDays) + if !ok { + continue + } + n, err := opsCleanupRunOne(ctx, s.db, truncate, cutoff, t.table, t.timeCol, t.castDate, opsCleanupBatchSize) if err != nil { return out, err } - out.errorLogs = n - - n, err = runOne(truncate, cutoff, "ops_retry_attempts", "created_at", false) - if err != nil { - return out, err - } - out.retryAttempts = n - - n, err = runOne(truncate, cutoff, "ops_alert_events", "created_at", false) - if err != nil { - return out, err - } - out.alertEvents = n - - n, err = runOne(truncate, cutoff, "ops_system_logs", "created_at", false) - if err != nil { - return out, err - } - out.systemLogs = n - - n, err = runOne(truncate, cutoff, "ops_system_log_cleanup_audits", "created_at", false) - if err != nil { - return out, err - } - out.logAudits = n - } - - // Minute-level metrics snapshots. - if cutoff, truncate, ok := opsCleanupPlan(now, s.cfg.Ops.Cleanup.MinuteMetricsRetentionDays); ok { - n, err := runOne(truncate, cutoff, "ops_system_metrics", "created_at", false) - if err != nil { - return out, err - } - out.systemMetrics = n - } - - // Pre-aggregation tables (hourly/daily). - if cutoff, truncate, ok := opsCleanupPlan(now, s.cfg.Ops.Cleanup.HourlyMetricsRetentionDays); ok { - n, err := runOne(truncate, cutoff, "ops_metrics_hourly", "bucket_start", false) - if err != nil { - return out, err - } - out.hourlyPreagg = n - - n, err = runOne(truncate, cutoff, "ops_metrics_daily", "bucket_date", true) - if err != nil { - return out, err - } - out.dailyPreagg = n + *t.counter = n } // Channel monitor 每日维护(聚合昨日明细 + 软删过期明细/聚合)。 @@ -291,100 +332,6 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet return out, nil } -func deleteOldRowsByID( - ctx context.Context, - db *sql.DB, - table string, - timeColumn string, - cutoff time.Time, - batchSize int, - castCutoffToDate bool, -) (int64, error) { - if db == nil { - return 0, nil - } - if batchSize <= 0 { - batchSize = 5000 - } - - where := fmt.Sprintf("%s < $1", timeColumn) - if castCutoffToDate { - where = fmt.Sprintf("%s < $1::date", timeColumn) - } - - q := fmt.Sprintf(` -WITH batch AS ( - SELECT id FROM %s - WHERE %s - ORDER BY id - LIMIT $2 -) -DELETE FROM %s -WHERE id IN (SELECT id FROM batch) -`, table, where, table) - - var total int64 - for { - res, err := db.ExecContext(ctx, q, cutoff, batchSize) - if err != nil { - // If ops tables aren't present yet (partial deployments), treat as no-op. - if isMissingRelationError(err) { - return total, nil - } - return total, err - } - affected, err := res.RowsAffected() - if err != nil { - return total, err - } - total += affected - if affected == 0 { - break - } - } - return total, nil -} - -// truncateOpsTable 用 TRUNCATE TABLE 清空指定表,先 SELECT COUNT(*) 取得清空前行数用于 heartbeat。 -// -// 与 deleteOldRowsByID 的差异: -// - 不可指定 WHERE 条件,仅用于 days==0 的"清空全部"语义 -// - O(1) 释放表的物理存储页,毫秒级完成,无 WAL 写入、无 VACUUM 压力 -// - 需要 ACCESS EXCLUSIVE 锁,但 ops 表只有清理任务自己写入,瞬间锁影响可忽略 -// -// 表不存在(部分部署)静默返回 0,与 deleteOldRowsByID 保持一致。 -func truncateOpsTable(ctx context.Context, db *sql.DB, table string) (int64, error) { - if db == nil { - return 0, nil - } - var count int64 - if err := db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)).Scan(&count); err != nil { - if isMissingRelationError(err) { - return 0, nil - } - return 0, fmt.Errorf("count %s: %w", table, err) - } - if count == 0 { - return 0, nil - } - if _, err := db.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s", table)); err != nil { - if isMissingRelationError(err) { - return 0, nil - } - return 0, fmt.Errorf("truncate %s: %w", table, err) - } - return count, nil -} - -// isMissingRelationError 判断 PG 报错是否为"表不存在",用于让清理任务在部分部署场景静默跳过。 -func isMissingRelationError(err error) bool { - if err == nil { - return false - } - s := strings.ToLower(err.Error()) - return strings.Contains(s, "does not exist") && strings.Contains(s, "relation") -} - func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) { if s == nil { return nil, false @@ -433,7 +380,7 @@ func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration tim now := time.Now().UTC() durMs := duration.Milliseconds() result := truncateString(counts.String(), 2048) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout) defer cancel() _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ JobName: opsCleanupJobName, @@ -451,7 +398,7 @@ func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration time. now := time.Now().UTC() durMs := duration.Milliseconds() msg := truncateString(err.Error(), 2048) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout) defer cancel() _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ JobName: opsCleanupJobName, diff --git a/backend/internal/service/ops_service.go b/backend/internal/service/ops_service.go index cd3974a0..11afc6f9 100644 --- a/backend/internal/service/ops_service.go +++ b/backend/internal/service/ops_service.go @@ -54,6 +54,24 @@ type OpsService struct { geminiCompatService *GeminiMessagesCompatService antigravityGatewayService *AntigravityGatewayService systemLogSink *OpsSystemLogSink + + // cleanupReloader 由 wire 在 OpsCleanupService 构造完成后通过 SetCleanupReloader 注入。 + // 解耦避免 OpsService -> OpsCleanupService 的硬依赖(cleanup 也读 settings,会循环)。 + cleanupReloader CleanupReloader +} + +// CleanupReloader 由 OpsCleanupService 实现。 +// UpdateOpsAdvancedSettings 写入新配置后调用 Reload,让 schedule/enabled 改动立刻生效。 +type CleanupReloader interface { + Reload(ctx context.Context) error +} + +// SetCleanupReloader 由 wire 注入 cleanup hook(构造期循环依赖的解耦点)。 +func (s *OpsService) SetCleanupReloader(r CleanupReloader) { + if s == nil { + return + } + s.cleanupReloader = r } func NewOpsService( diff --git a/backend/internal/service/ops_settings.go b/backend/internal/service/ops_settings.go index ecc3a94b..68c1d9dd 100644 --- a/backend/internal/service/ops_settings.go +++ b/backend/internal/service/ops_settings.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "github.com/Wei-Shaw/sub2api/internal/pkg/logger" "strings" "time" ) @@ -360,7 +361,7 @@ func defaultOpsAdvancedSettings() *OpsAdvancedSettings { return &OpsAdvancedSettings{ DataRetention: OpsDataRetentionSettings{ CleanupEnabled: false, - CleanupSchedule: "0 2 * * *", + CleanupSchedule: opsCleanupDefaultSchedule, ErrorLogRetentionDays: 30, MinuteMetricsRetentionDays: 30, HourlyMetricsRetentionDays: 30, @@ -385,7 +386,7 @@ func normalizeOpsAdvancedSettings(cfg *OpsAdvancedSettings) { } cfg.DataRetention.CleanupSchedule = strings.TrimSpace(cfg.DataRetention.CleanupSchedule) if cfg.DataRetention.CleanupSchedule == "" { - cfg.DataRetention.CleanupSchedule = "0 2 * * *" + cfg.DataRetention.CleanupSchedule = opsCleanupDefaultSchedule } // 保留天数:0 表示每次定时清理全部(清空所有),> 0 表示按天数保留; // 仅在拿到非法的负数时回填默认值,避免覆盖用户主动设的 0。 @@ -477,6 +478,14 @@ func (s *OpsService) UpdateOpsAdvancedSettings(ctx context.Context, cfg *OpsAdva return nil, err } + // notify cleanup service to reload schedule/enabled. + if s.cleanupReloader != nil { + if rerr := s.cleanupReloader.Reload(ctx); rerr != nil { + logger.LegacyPrintf("service.ops_settings", + "[OpsSettings] cleanup reload after advanced-settings update failed: %v", rerr) + } + } + updated := &OpsAdvancedSettings{} _ = json.Unmarshal(raw, updated) return updated, nil diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index 8b50e478..0f36412b 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -271,15 +271,22 @@ func ProvideOpsAlertEvaluatorService( // ProvideOpsCleanupService creates and starts OpsCleanupService (cron scheduled). // channelMonitorSvc 让维护任务(聚合 + 历史/聚合软删)跟随 ops 清理 cron 一起跑, // 共享 leader lock + heartbeat。 +// settingRepo 让 cleanup service 自己读 ops_advanced_settings.data_retention 覆盖 cfg; +// opsService 用来反向注入 cleanup hook,以便 UI 改清理设置时能 Reload cron。 func ProvideOpsCleanupService( opsRepo OpsRepository, db *sql.DB, redisClient *redis.Client, cfg *config.Config, channelMonitorSvc *ChannelMonitorService, + settingRepo SettingRepository, + opsService *OpsService, ) *OpsCleanupService { - svc := NewOpsCleanupService(opsRepo, db, redisClient, cfg, channelMonitorSvc) + svc := NewOpsCleanupService(opsRepo, db, redisClient, cfg, channelMonitorSvc, settingRepo) svc.Start() + if opsService != nil { + opsService.SetCleanupReloader(svc) + } return svc }