Files
sub2api-ht/backend/internal/service/ops_cleanup_service.go
erio d218b6c2aa refactor(ops-cleanup): 拆分 executor + table-driven + 提取常量 + 补测试
代码审查反馈:

1. 文件行数超标:ops_cleanup_service.go 594→413 行。
   拆 opsCleanupPlan / deleteOldRowsByID / truncateOpsTable / isMissingRelationError
   + counts struct 到 ops_cleanup_executor.go (164 行)。

2. runCleanupOnce 89 行→30 行(table-driven):
   用 []opsCleanupTarget 循环替代三组重复的 opsCleanupPlan → runOne → assign。

3. 魔法值提取常量:
   opsCleanupDefaultSchedule / opsCleanupBatchSize / opsCleanupCronStopTimeout /
   opsCleanupRunTimeout / opsCleanupHeartbeatTimeout。
   ops_settings.go 中 "0 2 * * *" 也统一引用 opsCleanupDefaultSchedule。

4. 补 5 个缺失测试:
   - Reload 未 Start 时 no-op
   - Reload 已 Stop 后 no-op
   - cleanupReloader==nil 时 Update 不 panic
   - Start 重复调用幂等
   - refreshEffectiveBeforeRun 正确更新 snapshot
2026-05-04 13:35:26 +08:00

411 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
const (
opsCleanupJobName = "ops_cleanup"
opsCleanupLeaderLockKeyDefault = "ops:cleanup:leader"
opsCleanupLeaderLockTTLDefault = 30 * time.Minute
)
var opsCleanupCronParser = cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
var opsCleanupReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
// OpsCleanupService periodically deletes old ops data to prevent unbounded DB growth.
//
// - Scheduling: 5-field cron spec (minute hour dom month dow).
// - Multi-instance: best-effort Redis leader lock so only one node runs cleanup.
// - Safety: deletes in batches to avoid long transactions.
//
// 附带:在 runCleanupOnce 末尾调用 ChannelMonitorService.RunDailyMaintenance
// 统一共享 cron schedule + leader lock + heartbeat避免再引一套调度。
type OpsCleanupService struct {
opsRepo OpsRepository
db *sql.DB
redisClient *redis.Client
cfg *config.Config
channelMonitorSvc *ChannelMonitorService
settingRepo SettingRepository
instanceID string
// mu 守护 cron 实例切换 + effective 配置切换。
// 这里不再用 startOnce/stopOnce是因为 Reload 需要"停旧 cron 重启新 cron"
// 而 Once 一旦触发就无法再次执行;改为 started/stopped 布尔配合 mu。
mu sync.Mutex
cron *cron.Cron
started bool
stopped bool
effective config.OpsCleanupConfig
warnNoRedisOnce sync.Once
}
func NewOpsCleanupService(
opsRepo OpsRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
channelMonitorSvc *ChannelMonitorService,
settingRepo SettingRepository,
) *OpsCleanupService {
return &OpsCleanupService{
opsRepo: opsRepo,
db: db,
redisClient: redisClient,
cfg: cfg,
channelMonitorSvc: channelMonitorSvc,
settingRepo: settingRepo,
instanceID: uuid.NewString(),
}
}
// Start 首次启动 cron 调度。Enabled / Schedule 由 effective 配置决定settings 优先 cfg
// 重复调用幂等。
func (s *OpsCleanupService) Start() {
if s == nil {
return
}
if s.cfg != nil && !s.cfg.Ops.Enabled {
return
}
if s.opsRepo == nil || s.db == nil {
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started (missing deps)")
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.started || s.stopped {
return
}
s.started = true
if err := s.applyScheduleLocked(context.Background()); err != nil {
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] not started: %v", err)
}
}
// Stop 关闭 cron。幂等。
func (s *OpsCleanupService) Stop() {
if s == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.stopped {
return
}
s.stopped = true
s.stopCronLocked()
}
// stopCronLocked 停掉当前 cron 实例(带 3s 超时)。调用方持锁。
func (s *OpsCleanupService) stopCronLocked() {
if s.cron == nil {
return
}
ctx := s.cron.Stop()
select {
case <-ctx.Done():
case <-time.After(opsCleanupCronStopTimeout):
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron stop timed out")
}
s.cron = nil
}
// applyScheduleLocked 重新计算 effective 配置并按其 schedule 重建 cron。调用方持锁。
// 若 effective.Enabled=false用户在 UI 关闭清理),停旧 cron 后直接返回,不创建新 cron。
func (s *OpsCleanupService) applyScheduleLocked(ctx context.Context) error {
s.computeEffectiveLocked(ctx)
s.stopCronLocked()
if !s.effective.Enabled {
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cron disabled by settings")
return nil
}
schedule := strings.TrimSpace(s.effective.Schedule)
if schedule == "" {
schedule = opsCleanupDefaultSchedule
}
loc := time.Local
if s.cfg != nil && strings.TrimSpace(s.cfg.Timezone) != "" {
if parsed, err := time.LoadLocation(strings.TrimSpace(s.cfg.Timezone)); err == nil && parsed != nil {
loc = parsed
}
}
c := cron.New(cron.WithParser(opsCleanupCronParser), cron.WithLocation(loc))
if _, err := c.AddFunc(schedule, func() { s.runScheduled() }); err != nil {
return fmt.Errorf("invalid schedule %q: %w", schedule, err)
}
c.Start()
s.cron = c
logger.LegacyPrintf("service.ops_cleanup",
"[OpsCleanup] scheduled (schedule=%q tz=%s retention_days=err:%d/min:%d/hour:%d)",
schedule, loc.String(),
s.effective.ErrorLogRetentionDays,
s.effective.MinuteMetricsRetentionDays,
s.effective.HourlyMetricsRetentionDays,
)
return nil
}
// Reload 重新读取 ops_advanced_settings.data_retention 并按新配置重建 cron。
// 适用于 admin 在 UI 修改清理设置后立即生效schedule / enabled 改动需要 Reload
// retention 改动 runScheduled 顶部也会刷新,下一次触发即生效)。
// 若 service 还未 Start 或已 StopReload 不做任何事。
func (s *OpsCleanupService) Reload(ctx context.Context) error {
if s == nil {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
if !s.started || s.stopped {
return nil
}
return s.applyScheduleLocked(ctx)
}
// computeEffectiveLocked 计算"生效配置"并写入 s.effective。调用方持锁。
//
// 优先级UI 写入的 settings.ops_advanced_settings.data_retention权威覆盖 cfg.Ops.Cleanup 的副本。
// - Enabledsettings 直接覆盖
// - Schedulesettings 非空时覆盖,否则保留 cfg
// - *RetentionDayssettings >=0 时覆盖(包括 0=TRUNCATE<0 沿用 cfg
//
// 若 settings 表无该 keyErrSettingNotFound或解析失败整体 fallback 到 cfg.Ops.Cleanup。
func (s *OpsCleanupService) computeEffectiveLocked(ctx context.Context) {
base := config.OpsCleanupConfig{}
if s.cfg != nil {
base = s.cfg.Ops.Cleanup
}
defer func() { s.effective = base }()
if s.settingRepo == nil {
return
}
if ctx == nil {
ctx = context.Background()
}
raw, err := s.settingRepo.GetValue(ctx, SettingKeyOpsAdvancedSettings)
if err != nil {
if !errors.Is(err, ErrSettingNotFound) {
logger.LegacyPrintf("service.ops_cleanup",
"[OpsCleanup] read advanced settings failed, using cfg: %v", err)
}
return
}
var adv OpsAdvancedSettings
if err := json.Unmarshal([]byte(raw), &adv); err != nil {
logger.LegacyPrintf("service.ops_cleanup",
"[OpsCleanup] parse advanced settings failed, using cfg: %v", err)
return
}
dr := adv.DataRetention
base.Enabled = dr.CleanupEnabled
if sched := strings.TrimSpace(dr.CleanupSchedule); sched != "" {
base.Schedule = sched
}
if dr.ErrorLogRetentionDays >= 0 {
base.ErrorLogRetentionDays = dr.ErrorLogRetentionDays
}
if dr.MinuteMetricsRetentionDays >= 0 {
base.MinuteMetricsRetentionDays = dr.MinuteMetricsRetentionDays
}
if dr.HourlyMetricsRetentionDays >= 0 {
base.HourlyMetricsRetentionDays = dr.HourlyMetricsRetentionDays
}
}
// snapshotEffective 取一份 effective 副本runCleanupOnce 等读路径使用)。
func (s *OpsCleanupService) snapshotEffective() config.OpsCleanupConfig {
s.mu.Lock()
defer s.mu.Unlock()
return s.effective
}
// refreshEffectiveBeforeRun 在 cron 触发时刷新 effective让 retention 改动当次即生效。
// schedule 改动不影响当次cron 调度由库管理,需要 Reload 才换 schedule
func (s *OpsCleanupService) refreshEffectiveBeforeRun(ctx context.Context) {
s.mu.Lock()
defer s.mu.Unlock()
s.computeEffectiveLocked(ctx)
}
func (s *OpsCleanupService) runScheduled() {
if s == nil || s.db == nil || s.opsRepo == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), opsCleanupRunTimeout)
defer cancel()
// 让 retention 改动当次生效schedule/enabled 改动需要 Reload
s.refreshEffectiveBeforeRun(ctx)
release, ok := s.tryAcquireLeaderLock(ctx)
if !ok {
return
}
if release != nil {
defer release()
}
startedAt := time.Now().UTC()
runAt := startedAt
counts, err := s.runCleanupOnce(ctx)
if err != nil {
s.recordHeartbeatError(runAt, time.Since(startedAt), err)
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup failed: %v", err)
return
}
s.recordHeartbeatSuccess(runAt, time.Since(startedAt), counts)
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] cleanup complete: %s", counts)
}
func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDeletedCounts, error) {
out := opsCleanupDeletedCounts{}
if s == nil || s.db == nil || s.cfg == nil {
return out, nil
}
effective := s.snapshotEffective()
now := time.Now().UTC()
targets := []opsCleanupTarget{
{effective.ErrorLogRetentionDays, "ops_error_logs", "created_at", false, &out.errorLogs},
{effective.ErrorLogRetentionDays, "ops_retry_attempts", "created_at", false, &out.retryAttempts},
{effective.ErrorLogRetentionDays, "ops_alert_events", "created_at", false, &out.alertEvents},
{effective.ErrorLogRetentionDays, "ops_system_logs", "created_at", false, &out.systemLogs},
{effective.ErrorLogRetentionDays, "ops_system_log_cleanup_audits", "created_at", false, &out.logAudits},
{effective.MinuteMetricsRetentionDays, "ops_system_metrics", "created_at", false, &out.systemMetrics},
{effective.HourlyMetricsRetentionDays, "ops_metrics_hourly", "bucket_start", false, &out.hourlyPreagg},
{effective.HourlyMetricsRetentionDays, "ops_metrics_daily", "bucket_date", true, &out.dailyPreagg},
}
for _, t := range targets {
cutoff, truncate, ok := opsCleanupPlan(now, t.retentionDays)
if !ok {
continue
}
n, err := opsCleanupRunOne(ctx, s.db, truncate, cutoff, t.table, t.timeCol, t.castDate, opsCleanupBatchSize)
if err != nil {
return out, err
}
*t.counter = n
}
// Channel monitor 每日维护(聚合昨日明细 + 软删过期明细/聚合)。
// 失败只记日志,不影响 ops 清理的成功状态(与 ops 各步骤风格一致);
// 维护本身已经把每步错误打到 slogheartbeat result 不再分项记录。
if s.channelMonitorSvc != nil {
if err := s.channelMonitorSvc.RunDailyMaintenance(ctx); err != nil {
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] channel monitor maintenance failed: %v", err)
}
}
return out, nil
}
func (s *OpsCleanupService) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
if s == nil {
return nil, false
}
// In simple run mode, assume single instance.
if s.cfg != nil && s.cfg.RunMode == config.RunModeSimple {
return nil, true
}
key := opsCleanupLeaderLockKeyDefault
ttl := opsCleanupLeaderLockTTLDefault
// Prefer Redis leader lock when available, but avoid stampeding the DB when Redis is flaky by
// falling back to a DB advisory lock.
if s.redisClient != nil {
ok, err := s.redisClient.SetNX(ctx, key, s.instanceID, ttl).Result()
if err == nil {
if !ok {
return nil, false
}
return func() {
_, _ = opsCleanupReleaseScript.Run(ctx, s.redisClient, []string{key}, s.instanceID).Result()
}, true
}
// Redis error: fall back to DB advisory lock.
s.warnNoRedisOnce.Do(func() {
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] leader lock SetNX failed; falling back to DB advisory lock: %v", err)
})
} else {
s.warnNoRedisOnce.Do(func() {
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] redis not configured; using DB advisory lock")
})
}
release, ok := tryAcquireDBAdvisoryLock(ctx, s.db, hashAdvisoryLockID(key))
if !ok {
return nil, false
}
return release, true
}
func (s *OpsCleanupService) recordHeartbeatSuccess(runAt time.Time, duration time.Duration, counts opsCleanupDeletedCounts) {
if s == nil || s.opsRepo == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
result := truncateString(counts.String(), 2048)
ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsCleanupJobName,
LastRunAt: &runAt,
LastSuccessAt: &now,
LastDurationMs: &durMs,
LastResult: &result,
})
}
func (s *OpsCleanupService) recordHeartbeatError(runAt time.Time, duration time.Duration, err error) {
if s == nil || s.opsRepo == nil || err == nil {
return
}
now := time.Now().UTC()
durMs := duration.Milliseconds()
msg := truncateString(err.Error(), 2048)
ctx, cancel := context.WithTimeout(context.Background(), opsCleanupHeartbeatTimeout)
defer cancel()
_ = s.opsRepo.UpsertJobHeartbeat(ctx, &OpsUpsertJobHeartbeatInput{
JobName: opsCleanupJobName,
LastRunAt: &runAt,
LastErrorAt: &now,
LastError: &msg,
LastDurationMs: &durMs,
})
}