diff --git a/backend/internal/service/ops_cleanup_executor.go b/backend/internal/service/ops_cleanup_executor.go new file mode 100644 index 00000000..63a7367f --- /dev/null +++ b/backend/internal/service/ops_cleanup_executor.go @@ -0,0 +1,164 @@ +package service + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" +) + +const ( + opsCleanupDefaultSchedule = "0 2 * * *" + opsCleanupBatchSize = 5000 + opsCleanupCronStopTimeout = 3 * time.Second + opsCleanupRunTimeout = 30 * time.Minute + opsCleanupHeartbeatTimeout = 2 * time.Second +) + +type opsCleanupTarget struct { + retentionDays int + table string + timeCol string + castDate bool + counter *int64 +} + +type opsCleanupDeletedCounts struct { + errorLogs int64 + retryAttempts int64 + alertEvents int64 + systemLogs int64 + logAudits int64 + systemMetrics int64 + hourlyPreagg int64 + dailyPreagg int64 +} + +func (c opsCleanupDeletedCounts) String() string { + return fmt.Sprintf( + "error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d", + c.errorLogs, + c.retryAttempts, + c.alertEvents, + c.systemLogs, + c.logAudits, + c.systemMetrics, + c.hourlyPreagg, + c.dailyPreagg, + ) +} + +// opsCleanupPlan 把"保留天数"翻译成具体的清理动作。 +// - days < 0 → 跳过该项清理(ok=false),保留兼容老数据 +// - days == 0 → TRUNCATE TABLE(O(1) 全清),truncate=true +// - days > 0 → 批量 DELETE 早于 now-N天 的行,cutoff = now - N 天 +func opsCleanupPlan(now time.Time, days int) (cutoff time.Time, truncate, ok bool) { + if days < 0 { + return time.Time{}, false, false + } + if days == 0 { + return time.Time{}, true, true + } + return now.AddDate(0, 0, -days), false, true +} + +func opsCleanupRunOne( + ctx context.Context, + db *sql.DB, + truncate bool, + cutoff time.Time, + table, timeCol string, + castDate bool, + batchSize int, +) (int64, error) { + if truncate { + return truncateOpsTable(ctx, db, table) + } + return deleteOldRowsByID(ctx, db, table, timeCol, cutoff, batchSize, castDate) +} + +func deleteOldRowsByID( + ctx context.Context, + db *sql.DB, + table string, + timeColumn string, + cutoff time.Time, + batchSize int, + castCutoffToDate bool, +) (int64, error) { + if db == nil { + return 0, nil + } + if batchSize <= 0 { + batchSize = opsCleanupBatchSize + } + + where := fmt.Sprintf("%s < $1", timeColumn) + if castCutoffToDate { + where = fmt.Sprintf("%s < $1::date", timeColumn) + } + + q := fmt.Sprintf(` +WITH batch AS ( + SELECT id FROM %s + WHERE %s + ORDER BY id + LIMIT $2 +) +DELETE FROM %s +WHERE id IN (SELECT id FROM batch) +`, table, where, table) + + var total int64 + for { + res, err := db.ExecContext(ctx, q, cutoff, batchSize) + if err != nil { + if isMissingRelationError(err) { + return total, nil + } + return total, err + } + affected, err := res.RowsAffected() + if err != nil { + return total, err + } + total += affected + if affected == 0 { + break + } + } + return total, nil +} + +// truncateOpsTable 用 TRUNCATE TABLE 清空指定表,先 SELECT COUNT(*) 取得清空前行数用于 heartbeat。 +func truncateOpsTable(ctx context.Context, db *sql.DB, table string) (int64, error) { + if db == nil { + return 0, nil + } + var count int64 + if err := db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)).Scan(&count); err != nil { + if isMissingRelationError(err) { + return 0, nil + } + return 0, fmt.Errorf("count %s: %w", table, err) + } + if count == 0 { + return 0, nil + } + if _, err := db.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s", table)); err != nil { + if isMissingRelationError(err) { + return 0, nil + } + return 0, fmt.Errorf("truncate %s: %w", table, err) + } + return count, nil +} + +func isMissingRelationError(err error) bool { + if err == nil { + return false + } + s := strings.ToLower(err.Error()) + return strings.Contains(s, "does not exist") && strings.Contains(s, "relation") +} diff --git a/backend/internal/service/ops_cleanup_overlay_test.go b/backend/internal/service/ops_cleanup_overlay_test.go index 7ac19018..f751a426 100644 --- a/backend/internal/service/ops_cleanup_overlay_test.go +++ b/backend/internal/service/ops_cleanup_overlay_test.go @@ -194,3 +194,64 @@ func TestUpdateOpsAdvancedSettings_TriggersReload(t *testing.T) { t.Fatalf("expected reloader.Reload called once, got %d", reloader.calls) } } + +func TestReload_BeforeStart_IsNoop(t *testing.T) { + svc := &OpsCleanupService{} + if err := svc.Reload(context.Background()); err != nil { + t.Fatalf("Reload before Start should return nil, got %v", err) + } +} + +func TestReload_AfterStop_IsNoop(t *testing.T) { + svc := &OpsCleanupService{started: true, stopped: true} + if err := svc.Reload(context.Background()); err != nil { + t.Fatalf("Reload after Stop should return nil, got %v", err) + } +} + +func TestUpdateOpsAdvancedSettings_NilReloader_NoPanic(t *testing.T) { + repo := newRuntimeSettingRepoStub() + svc := &OpsService{settingRepo: repo} + // cleanupReloader intentionally nil + + cfg := defaultOpsAdvancedSettings() + cfg.DataRetention.ErrorLogRetentionDays = 7 + + // should not panic + if _, err := svc.UpdateOpsAdvancedSettings(context.Background(), cfg); err != nil { + t.Fatalf("update with nil reloader: %v", err) + } +} + +func TestStart_IdempotentSecondCall(t *testing.T) { + svc := &OpsCleanupService{started: true} + svc.Start() // second call should be noop, not panic +} + +func TestRefreshEffectiveBeforeRun_UpdatesSnapshot(t *testing.T) { + repo := newRuntimeSettingRepoStub() + base := config.OpsCleanupConfig{ + Enabled: true, + Schedule: "0 2 * * *", + ErrorLogRetentionDays: 30, + } + svc := makeOverlayService(repo, base) + svc.computeEffectiveLocked(context.Background()) + + if svc.effective.ErrorLogRetentionDays != 30 { + t.Fatalf("initial retention should be 30, got %d", svc.effective.ErrorLogRetentionDays) + } + + // simulate UI change + writeAdvancedSettings(t, repo, OpsDataRetentionSettings{ + CleanupEnabled: true, + CleanupSchedule: "0 * * * *", + ErrorLogRetentionDays: 7, + }) + + svc.refreshEffectiveBeforeRun(context.Background()) + snap := svc.snapshotEffective() + if snap.ErrorLogRetentionDays != 7 { + t.Fatalf("after refresh, retention should be 7, got %d", snap.ErrorLogRetentionDays) + } +} diff --git a/backend/internal/service/ops_cleanup_service.go b/backend/internal/service/ops_cleanup_service.go index 2ff2352b..60a690f3 100644 --- a/backend/internal/service/ops_cleanup_service.go +++ b/backend/internal/service/ops_cleanup_service.go @@ -129,7 +129,7 @@ func (s *OpsCleanupService) stopCronLocked() { ctx := s.cron.Stop() select { case <-ctx.Done(): - case <-time.After(3 * time.Second): + case <-time.After(opsCleanupCronStopTimeout): logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron stop timed out") } s.cron = nil @@ -148,7 +148,7 @@ func (s *OpsCleanupService) applyScheduleLocked(ctx context.Context) error { schedule := strings.TrimSpace(s.effective.Schedule) if schedule == "" { - schedule = "0 2 * * *" + schedule = opsCleanupDefaultSchedule } loc := time.Local @@ -261,7 +261,7 @@ func (s *OpsCleanupService) runScheduled() { return } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), opsCleanupRunTimeout) defer cancel() // 让 retention 改动当次生效(schedule/enabled 改动需要 Reload)。 @@ -288,50 +288,6 @@ func (s *OpsCleanupService) runScheduled() { logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup complete: %s", counts) } -type opsCleanupDeletedCounts struct { - errorLogs int64 - retryAttempts int64 - alertEvents int64 - systemLogs int64 - logAudits int64 - systemMetrics int64 - hourlyPreagg int64 - dailyPreagg int64 -} - -func (c opsCleanupDeletedCounts) String() string { - return fmt.Sprintf( - "error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d", - c.errorLogs, - c.retryAttempts, - c.alertEvents, - c.systemLogs, - c.logAudits, - c.systemMetrics, - c.hourlyPreagg, - c.dailyPreagg, - ) -} - -// opsCleanupPlan 把"保留天数"翻译成具体的清理动作。 -// - days < 0 → 跳过该项清理(ok=false),保留兼容老数据 -// - days == 0 → TRUNCATE TABLE(O(1) 全清),truncate=true -// - days > 0 → 批量 DELETE 早于 now-N天 的行,cutoff = now - N 天 -// -// 之所以 days==0 走 TRUNCATE 而非"now+24h cutoff + DELETE": -// - 速度从 O(N) 降到 O(1),对百万行级表毫秒完成 -// - 无 WAL 写入、无后续 VACUUM 压力 -// - 这些 ops 表只有 cleanup 任务自己写,TRUNCATE 的 ACCESS EXCLUSIVE 锁影响可忽略 -func opsCleanupPlan(now time.Time, days int) (cutoff time.Time, truncate, ok bool) { - if days < 0 { - return time.Time{}, false, false - } - if days == 0 { - return time.Time{}, true, true - } - return now.AddDate(0, 0, -days), false, true -} - func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) { out := opsCleanupDeletedCounts{} if s == nil || s.db == nil || s.cfg == nil { @@ -339,75 +295,29 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet } effective := s.snapshotEffective() - - batchSize := 5000 - now := time.Now().UTC() - // runOne 把"truncate? cutoff? batched delete?"封装到一处, - // 让三组清理(错误日志类 / 分钟指标 / 小时+日预聚合)调用方只关心表名和列名。 - runOne := func(truncate bool, cutoff time.Time, table, timeCol string, castDate bool) (int64, error) { - if truncate { - return truncateOpsTable(ctx, s.db, table) - } - return deleteOldRowsByID(ctx, s.db, table, timeCol, cutoff, batchSize, castDate) + targets := []opsCleanupTarget{ + {effective.ErrorLogRetentionDays, "ops_error_logs", "created_at", false, &out.errorLogs}, + {effective.ErrorLogRetentionDays, "ops_retry_attempts", "created_at", false, &out.retryAttempts}, + {effective.ErrorLogRetentionDays, "ops_alert_events", "created_at", false, &out.alertEvents}, + {effective.ErrorLogRetentionDays, "ops_system_logs", "created_at", false, &out.systemLogs}, + {effective.ErrorLogRetentionDays, "ops_system_log_cleanup_audits", "created_at", false, &out.logAudits}, + {effective.MinuteMetricsRetentionDays, "ops_system_metrics", "created_at", false, &out.systemMetrics}, + {effective.HourlyMetricsRetentionDays, "ops_metrics_hourly", "bucket_start", false, &out.hourlyPreagg}, + {effective.HourlyMetricsRetentionDays, "ops_metrics_daily", "bucket_date", true, &out.dailyPreagg}, } - // Error-like tables: error logs / retry attempts / alert events / system logs / cleanup audits. - if cutoff, truncate, ok := opsCleanupPlan(now, effective.ErrorLogRetentionDays); ok { - n, err := runOne(truncate, cutoff, "ops_error_logs", "created_at", false) + for _, t := range targets { + cutoff, truncate, ok := opsCleanupPlan(now, t.retentionDays) + if !ok { + continue + } + n, err := opsCleanupRunOne(ctx, s.db, truncate, cutoff, t.table, t.timeCol, t.castDate, opsCleanupBatchSize) if err != nil { return out, err } - out.errorLogs = n - - n, err = runOne(truncate, cutoff, "ops_retry_attempts", "created_at", false) - if err != nil { - return out, err - } - out.retryAttempts = n - - n, err = runOne(truncate, cutoff, "ops_alert_events", "created_at", false) - if err != nil { - return out, err - } - out.alertEvents = n - - n, err = runOne(truncate, cutoff, "ops_system_logs", "created_at", false) - if err != nil { - return out, err - } - out.systemLogs = n - - n, err = runOne(truncate, cutoff, "ops_system_log_cleanup_audits", "created_at", false) - if err != nil { - return out, err - } - out.logAudits = n - } - - // Minute-level metrics snapshots. - if cutoff, truncate, ok := opsCleanupPlan(now, effective.MinuteMetricsRetentionDays); ok { - n, err := runOne(truncate, cutoff, "ops_system_metrics", "created_at", false) - if err != nil { - return out, err - } - out.systemMetrics = n - } - - // Pre-aggregation tables (hourly/daily). - if cutoff, truncate, ok := opsCleanupPlan(now, effective.HourlyMetricsRetentionDays); ok { - n, err := runOne(truncate, cutoff, "ops_metrics_hourly", "bucket_start", false) - if err != nil { - return out, err - } - out.hourlyPreagg = n - - n, err = runOne(truncate, cutoff, "ops_metrics_daily", "bucket_date", true) - if err != nil { - return out, err - } - out.dailyPreagg = n + *t.counter = n } // Channel monitor 每日维护(聚合昨日明细 + 软删过期明细/聚合)。 @@ -422,100 +332,6 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet return out, nil } -func deleteOldRowsByID( - ctx context.Context, - db *sql.DB, - table string, - timeColumn string, - cutoff time.Time, - batchSize int, - castCutoffToDate bool, -) (int64, error) { - if db == nil { - return 0, nil - } - if batchSize <= 0 { - batchSize = 5000 - } - - where := fmt.Sprintf("%s < $1", timeColumn) - if castCutoffToDate { - where = fmt.Sprintf("%s < $1::date", timeColumn) - } - - q := fmt.Sprintf(` -WITH batch AS ( - SELECT id FROM %s - WHERE %s - ORDER BY id - LIMIT $2 -) -DELETE FROM %s -WHERE id IN (SELECT id FROM batch) -`, table, where, table) - - var total int64 - for { - res, err := db.ExecContext(ctx, q, cutoff, batchSize) - if err != nil { - // If ops tables aren't present yet (partial deployments), treat as no-op. - if isMissingRelationError(err) { - return total, nil - } - return total, err - } - affected, err := res.RowsAffected() - if err != nil { - return total, err - } - total += affected - if affected == 0 { - break - } - } - return total, nil -} - -// truncateOpsTable 用 TRUNCATE TABLE 清空指定表,先 SELECT COUNT(*) 取得清空前行数用于 heartbeat。 -// -// 与 deleteOldRowsByID 的差异: -// - 不可指定 WHERE 条件,仅用于 days==0 的"清空全部"语义 -// - O(1) 释放表的物理存储页,毫秒级完成,无 WAL 写入、无 VACUUM 压力 -// - 需要 ACCESS EXCLUSIVE 锁,但 ops 表只有清理任务自己写入,瞬间锁影响可忽略 -// -// 表不存在(部分部署)静默返回 0,与 deleteOldRowsByID 保持一致。 -func truncateOpsTable(ctx context.Context, db *sql.DB, table string) (int64, error) { - if db == nil { - return 0, nil - } - var count int64 - if err := db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", table)).Scan(&count); err != nil { - if isMissingRelationError(err) { - return 0, nil - } - return 0, fmt.Errorf("count %s: %w", table, err) - } - if count == 0 { - return 0, nil - } - if _, err := db.ExecContext(ctx, fmt.Sprintf("TRUNCATE TABLE %s", table)); err != nil { - if isMissingRelationError(err) { - return 0, nil - } - return 0, fmt.Errorf("truncate %s: %w", table, err) - } - return count, nil -} - -// isMissingRelationError 判断 PG 报错是否为"表不存在",用于让清理任务在部分部署场景静默跳过。 -func isMissingRelationError(err error) bool { - if err == nil { - return false - } - s := strings.ToLower(err.Error()) - return strings.Contains(s, "does not exist") && strings.Contains(s, "relation") -} - func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) { if s == nil { return nil, false @@ -564,7 +380,7 @@ func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration tim now := time.Now().UTC() durMs := duration.Milliseconds() result := truncateString(counts.String(), 2048) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout) defer cancel() _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ JobName: opsCleanupJobName, @@ -582,7 +398,7 @@ func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration time. now := time.Now().UTC() durMs := duration.Milliseconds() msg := truncateString(err.Error(), 2048) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout) defer cancel() _ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{ JobName: opsCleanupJobName, diff --git a/backend/internal/service/ops_settings.go b/backend/internal/service/ops_settings.go index ddb682c2..68c1d9dd 100644 --- a/backend/internal/service/ops_settings.go +++ b/backend/internal/service/ops_settings.go @@ -361,7 +361,7 @@ func defaultOpsAdvancedSettings() *OpsAdvancedSettings { return &OpsAdvancedSettings{ DataRetention: OpsDataRetentionSettings{ CleanupEnabled: false, - CleanupSchedule: "0 2 * * *", + CleanupSchedule: opsCleanupDefaultSchedule, ErrorLogRetentionDays: 30, MinuteMetricsRetentionDays: 30, HourlyMetricsRetentionDays: 30, @@ -386,7 +386,7 @@ func normalizeOpsAdvancedSettings(cfg *OpsAdvancedSettings) { } cfg.DataRetention.CleanupSchedule = strings.TrimSpace(cfg.DataRetention.CleanupSchedule) if cfg.DataRetention.CleanupSchedule == "" { - cfg.DataRetention.CleanupSchedule = "0 2 * * *" + cfg.DataRetention.CleanupSchedule = opsCleanupDefaultSchedule } // 保留天数:0 表示每次定时清理全部(清空所有),> 0 表示按天数保留; // 仅在拿到非法的负数时回填默认值,避免覆盖用户主动设的 0。