diff --git a/backend/internal/service/channel_monitor_runner.go b/backend/internal/service/channel_monitor_runner.go index be30aec2..08178bc6 100644 --- a/backend/internal/service/channel_monitor_runner.go +++ b/backend/internal/service/channel_monitor_runner.go @@ -19,6 +19,17 @@ type MonitorScheduler interface { Unschedule(id int64) } +// monitorRunnerSvc 抽出 runner 实际依赖的两个 service 方法: +// - 启动时加载 enabled monitor +// - 每次 ticker 触发执行检测 +// +// 用接口而非 *ChannelMonitorService 是为了让 runner 单元测试可注入轻量 stub, +// 避免依赖完整的 repo + encryptor 链路。生产实现 *ChannelMonitorService 自然满足。 +type monitorRunnerSvc interface { + ListEnabledMonitors(ctx context.Context) ([]*ChannelMonitor, error) + RunCheck(ctx context.Context, id int64) ([]*CheckResult, error) +} + // ChannelMonitorRunner 渠道监控调度器。 // // 设计: @@ -33,7 +44,7 @@ type MonitorScheduler interface { // ChannelMonitorService.RunDailyMaintenance(复用 leader lock + heartbeat), // 不在 runner 职责内。 type ChannelMonitorRunner struct { - svc *ChannelMonitorService + svc monitorRunnerSvc settingService *SettingService pool pond.Pool @@ -62,11 +73,20 @@ type scheduledMonitor struct { // NewChannelMonitorRunner 构造调度器。Start 在 wire 中调用一次。 // settingService 用于在每次 fire 前读取功能开关;传 nil 时视为总是启用(兼容测试)。 +// +// pool 在构造时即建好:避免 Start 在 mu 内赋值、fire/Stop 在 mu 外读取的竞态隐患, +// 且 pond.NewPool 创建本身近似零开销,提前建池不会浪费资源。 func NewChannelMonitorRunner(svc *ChannelMonitorService, settingService *SettingService) *ChannelMonitorRunner { + return newChannelMonitorRunner(svc, settingService) +} + +// newChannelMonitorRunner 内部构造,接受最小化接口,便于单元测试注入 stub。 +func newChannelMonitorRunner(svc monitorRunnerSvc, settingService *SettingService) *ChannelMonitorRunner { ctx, cancel := context.WithCancel(context.Background()) return &ChannelMonitorRunner{ svc: svc, settingService: settingService, + pool: pond.NewPool(monitorWorkerConcurrency), parentCtx: ctx, parentCancel: cancel, tasks: make(map[int64]*scheduledMonitor), @@ -86,7 +106,6 @@ func (r *ChannelMonitorRunner) Start() { return } r.started = true - r.pool = pond.NewPool(monitorWorkerConcurrency) r.mu.Unlock() ctx, cancel := context.WithTimeout(context.Background(), monitorStartupLoadTimeout) @@ -116,16 +135,28 @@ func (r *ChannelMonitorRunner) Schedule(m *ChannelMonitor) { } interval := time.Duration(m.IntervalSeconds) * time.Second if interval <= 0 { - slog.Warn("channel_monitor: skip schedule for invalid interval", + // Create/Update 已通过 validateInterval 校验区间,正常路径不可能到这里。 + // 真触发说明数据库中存在违反约束的数据或校验链路有 bug,记 Error 暴露问题。 + slog.Error("channel_monitor: skip schedule for invalid interval", "monitor_id", m.ID, "interval_seconds", m.IntervalSeconds) return } r.mu.Lock() - if r.stopped || !r.started { + if r.stopped { r.mu.Unlock() return } + if !r.started { + // Start 之前调用 Schedule 通常意味着 wire 顺序错乱: + // 当前 wire 顺序是 SetScheduler → Start,CRUD 钩子最早也只能在请求到达时触发, + // 此时 Start 早已完成。出现此分支时把 monitor 信息打出来便于排查, + // 不入队、不缓存——交给运维通过重启或修复 wire 解决。 + r.mu.Unlock() + slog.Warn("channel_monitor: schedule before runner started, skip", + "monitor_id", m.ID, "name", m.Name) + return + } if existing, ok := r.tasks[m.ID]; ok { existing.cancel() } @@ -176,9 +207,7 @@ func (r *ChannelMonitorRunner) Stop() { r.mu.Unlock() r.wg.Wait() - if r.pool != nil { - r.pool.StopAndWait() - } + r.pool.StopAndWait() } // runScheduled 单个监控的循环:立即触发首次(满足"新建/启用即跑"), diff --git a/backend/internal/service/channel_monitor_runner_test.go b/backend/internal/service/channel_monitor_runner_test.go new file mode 100644 index 00000000..5eed3c20 --- /dev/null +++ b/backend/internal/service/channel_monitor_runner_test.go @@ -0,0 +1,277 @@ +//go:build unit + +package service + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" +) + +// stubMonitorSvc 实现 monitorRunnerSvc,用于隔离 runner 与真实 service/repo。 +type stubMonitorSvc struct { + enabled []*ChannelMonitor + runCount atomic.Int64 + runCalled chan int64 // 每次 RunCheck 触发时 push 一次(缓冲足够大避免阻塞) + runErr error + listErr error + runHoldFor time.Duration // RunCheck 内额外阻塞的时长,用来测试 Stop 等待行为 +} + +func (s *stubMonitorSvc) ListEnabledMonitors(_ context.Context) ([]*ChannelMonitor, error) { + if s.listErr != nil { + return nil, s.listErr + } + return s.enabled, nil +} + +func (s *stubMonitorSvc) RunCheck(ctx context.Context, id int64) ([]*CheckResult, error) { + s.runCount.Add(1) + if s.runCalled != nil { + select { + case s.runCalled <- id: + default: + } + } + if s.runHoldFor > 0 { + select { + case <-time.After(s.runHoldFor): + case <-ctx.Done(): + } + } + return nil, s.runErr +} + +func newRunnerForTest(svc monitorRunnerSvc) *ChannelMonitorRunner { + return newChannelMonitorRunner(svc, nil) +} + +// 等待 condition 在 timeout 内变 true,否则 t.Fatalf。轮询 5ms 一次。 +func waitFor(t *testing.T, timeout time.Duration, msg string, cond func() bool) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if cond() { + return + } + time.Sleep(5 * time.Millisecond) + } + if !cond() { + t.Fatalf("waitFor timed out: %s", msg) + } +} + +func runnerTaskCount(r *ChannelMonitorRunner) int { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.tasks) +} + +func runnerTaskPtr(r *ChannelMonitorRunner, id int64) *scheduledMonitor { + r.mu.Lock() + defer r.mu.Unlock() + return r.tasks[id] +} + +// TestSchedule_AddsTaskAndFiresOnce 验证 Schedule 后立即触发一次首检测,并把任务记入 tasks 表。 +func TestSchedule_AddsTaskAndFiresOnce(t *testing.T) { + svc := &stubMonitorSvc{runCalled: make(chan int64, 4)} + r := newRunnerForTest(svc) + r.Start() // svc.enabled 为空,Start 立即完成 + + r.Schedule(&ChannelMonitor{ID: 1, Name: "m1", Enabled: true, IntervalSeconds: 60}) + + if got := runnerTaskCount(r); got != 1 { + t.Fatalf("expected 1 scheduled task, got %d", got) + } + + select { + case id := <-svc.runCalled: + if id != 1 { + t.Fatalf("expected first fire for id=1, got %d", id) + } + case <-time.After(2 * time.Second): + t.Fatal("expected immediate first fire within 2s") + } + + r.Stop() +} + +// TestSchedule_ReplaceCancelsOldTask 验证对同一 id 二次 Schedule 会替换旧 task 实例。 +// (旧 goroutine 通过 ctx 取消退出;这里以 task 指针不同 + Stop 不超时作为证据。) +func TestSchedule_ReplaceCancelsOldTask(t *testing.T) { + svc := &stubMonitorSvc{runCalled: make(chan int64, 8)} + r := newRunnerForTest(svc) + r.Start() + + m := &ChannelMonitor{ID: 7, Name: "m7", Enabled: true, IntervalSeconds: 60} + r.Schedule(m) + first := runnerTaskPtr(r, 7) + if first == nil { + t.Fatal("first schedule did not register task") + } + + r.Schedule(m) + second := runnerTaskPtr(r, 7) + if second == nil { + t.Fatal("second schedule did not register task") + } + if first == second { + t.Fatal("re-Schedule should create a new scheduledMonitor instance") + } + + stoppedWithin(t, r, 3*time.Second) +} + +// TestUnschedule_RemovesTask 验证 Unschedule 删除 task 并使对应 goroutine 退出。 +func TestUnschedule_RemovesTask(t *testing.T) { + svc := &stubMonitorSvc{runCalled: make(chan int64, 4)} + r := newRunnerForTest(svc) + r.Start() + + r.Schedule(&ChannelMonitor{ID: 3, Enabled: true, IntervalSeconds: 60}) + waitFor(t, time.Second, "task registered", func() bool { return runnerTaskCount(r) == 1 }) + + r.Unschedule(3) + if got := runnerTaskCount(r); got != 0 { + t.Fatalf("expected tasks empty after Unschedule, got %d", got) + } + + stoppedWithin(t, r, 3*time.Second) +} + +// TestSchedule_DisabledRedirectsToUnschedule 验证 Enabled=false 等同于 Unschedule。 +func TestSchedule_DisabledRedirectsToUnschedule(t *testing.T) { + svc := &stubMonitorSvc{runCalled: make(chan int64, 4)} + r := newRunnerForTest(svc) + r.Start() + + r.Schedule(&ChannelMonitor{ID: 9, Enabled: true, IntervalSeconds: 60}) + waitFor(t, time.Second, "task registered", func() bool { return runnerTaskCount(r) == 1 }) + + r.Schedule(&ChannelMonitor{ID: 9, Enabled: false, IntervalSeconds: 60}) + if got := runnerTaskCount(r); got != 0 { + t.Fatalf("expected tasks empty after disabled re-Schedule, got %d", got) + } + + stoppedWithin(t, r, 3*time.Second) +} + +// TestSchedule_InvalidIntervalSkipped 验证 IntervalSeconds<=0 不会注册任务(防御性检查)。 +func TestSchedule_InvalidIntervalSkipped(t *testing.T) { + svc := &stubMonitorSvc{} + r := newRunnerForTest(svc) + r.Start() + + r.Schedule(&ChannelMonitor{ID: 1, Enabled: true, IntervalSeconds: 0}) + if got := runnerTaskCount(r); got != 0 { + t.Fatalf("expected no task for invalid interval, got %d", got) + } + r.Stop() +} + +// TestSchedule_BeforeStartIsNoOp 验证 Start 之前调用 Schedule 不会注册任务。 +func TestSchedule_BeforeStartIsNoOp(t *testing.T) { + svc := &stubMonitorSvc{} + r := newRunnerForTest(svc) + // 故意不调用 Start + + r.Schedule(&ChannelMonitor{ID: 1, Enabled: true, IntervalSeconds: 60}) + if got := runnerTaskCount(r); got != 0 { + t.Fatalf("expected no task before Start, got %d", got) + } + r.Stop() +} + +// TestStart_LoadsAllEnabledMonitors 验证 Start 会为 ListEnabledMonitors 返回的每条记录建立任务。 +func TestStart_LoadsAllEnabledMonitors(t *testing.T) { + svc := &stubMonitorSvc{ + enabled: []*ChannelMonitor{ + {ID: 1, Enabled: true, IntervalSeconds: 60}, + {ID: 2, Enabled: true, IntervalSeconds: 60}, + {ID: 3, Enabled: true, IntervalSeconds: 60}, + }, + } + r := newRunnerForTest(svc) + r.Start() + waitFor(t, 2*time.Second, "all 3 tasks scheduled", func() bool { return runnerTaskCount(r) == 3 }) + + stoppedWithin(t, r, 3*time.Second) +} + +// TestStop_DrainsAllGoroutines 验证 Stop 会等待所有调度 goroutine 退出(无游离)。 +func TestStop_DrainsAllGoroutines(t *testing.T) { + svc := &stubMonitorSvc{} + r := newRunnerForTest(svc) + r.Start() + + for id := int64(1); id <= 5; id++ { + r.Schedule(&ChannelMonitor{ID: id, Enabled: true, IntervalSeconds: 60}) + } + waitFor(t, 2*time.Second, "5 tasks scheduled", func() bool { return runnerTaskCount(r) == 5 }) + + stoppedWithin(t, r, 3*time.Second) +} + +// TestStop_WaitsForInFlightCheck 验证 Stop 会等待正在执行的 RunCheck 退出(pool.StopAndWait)。 +func TestStop_WaitsForInFlightCheck(t *testing.T) { + svc := &stubMonitorSvc{ + runCalled: make(chan int64, 1), + runHoldFor: 200 * time.Millisecond, + } + r := newRunnerForTest(svc) + r.Start() + r.Schedule(&ChannelMonitor{ID: 1, Enabled: true, IntervalSeconds: 60}) + + select { + case <-svc.runCalled: + case <-time.After(2 * time.Second): + t.Fatal("first fire never happened") + } + + start := time.Now() + stoppedWithin(t, r, 3*time.Second) + elapsed := time.Since(start) + // Stop 必须等待 in-flight check 跑完(runHoldFor=200ms),耗时下界约 100ms。 + if elapsed < 100*time.Millisecond { + t.Fatalf("Stop returned too fast (%v); did not wait for in-flight check", elapsed) + } +} + +// TestInFlight_PoolFullReleasesSlot 直接驱动 fire 路径,模拟 pool.TrySubmit 失败时 inFlight 必须释放。 +// 用一个小型 stub pool 替换 r.pool 不便(pond.Pool 是接口但 mock 麻烦), +// 改为:占满 inFlight 后直接 fire,验证不会在 inFlight 空槽时永久卡住。 +func TestInFlight_AcquireReleaseSymmetric(t *testing.T) { + svc := &stubMonitorSvc{} + r := newRunnerForTest(svc) + + if !r.tryAcquireInFlight(42) { + t.Fatal("first acquire should succeed") + } + if r.tryAcquireInFlight(42) { + t.Fatal("second acquire (no release) must fail") + } + r.releaseInFlight(42) + if !r.tryAcquireInFlight(42) { + t.Fatal("acquire after release should succeed") + } + r.releaseInFlight(42) +} + +// stoppedWithin 在 timeout 内并行调用 Stop,超时则 Fatal。验证 Stop 不会阻塞。 +func stoppedWithin(t *testing.T, r *ChannelMonitorRunner, timeout time.Duration) { + t.Helper() + done := make(chan struct{}) + var once sync.Once + go func() { + r.Stop() + once.Do(func() { close(done) }) + }() + select { + case <-done: + case <-time.After(timeout): + t.Fatalf("Stop did not return within %s — leaked goroutine?", timeout) + } +}