refactor(channel-monitor): tighten runner lifecycle + add unit tests
- pool 改在 NewChannelMonitorRunner 构造时初始化,消除 Start 在 mu 内
赋值、fire/Stop 在 mu 外读取的竞态隐患
- Schedule 在 !started 时由静默 return 改为 slog.Warn,错过的调度可见
- Schedule 在 interval<=0 时升为 slog.Error:Create/Update validateInterval
已保证不可达,真触发即数据/校验链 bug
- 抽出 monitorRunnerSvc 内部接口(仅 ListEnabledMonitors+RunCheck),
生产 *ChannelMonitorService 自然满足;runner 单元测试可注入轻量 stub
- 新增 channel_monitor_runner_test.go(10 个用例,//go:build unit):
覆盖 Schedule/Unschedule/Start/Stop 生命周期、in-flight 槽对称释放、
Stop 等待正在执行的 RunCheck 退出(无游离 goroutine)
启动失败的恢复策略:保持现状(log+return)。CLAUDE.md 明确"配置应保证启动
成功(必填项校验+正确数据校验)",validate{Provider,Interval,Endpoint,
APIKey,PrimaryModel} 已在 Create/Update 全部覆盖;DB 不可用是基础设施问题,
不该靠应用层无限重试兜底。
This commit is contained in:
@@ -19,6 +19,17 @@ type MonitorScheduler interface {
|
||||
Unschedule(id int64)
|
||||
}
|
||||
|
||||
// monitorRunnerSvc extracts the two service methods the runner actually depends on:
//   - loading the enabled monitors at startup
//   - executing a check each time a ticker fires
//
// An interface is used instead of *ChannelMonitorService so that runner unit
// tests can inject a lightweight stub, avoiding the full repo + encryptor chain.
// The production *ChannelMonitorService satisfies it naturally.
type monitorRunnerSvc interface {
	// ListEnabledMonitors returns every monitor that should be scheduled.
	ListEnabledMonitors(ctx context.Context) ([]*ChannelMonitor, error)
	// RunCheck executes one check cycle for the monitor with the given id.
	RunCheck(ctx context.Context, id int64) ([]*CheckResult, error)
}
|
||||
|
||||
// ChannelMonitorRunner 渠道监控调度器。
|
||||
//
|
||||
// 设计:
|
||||
@@ -33,7 +44,7 @@ type MonitorScheduler interface {
|
||||
// ChannelMonitorService.RunDailyMaintenance(复用 leader lock + heartbeat),
|
||||
// 不在 runner 职责内。
|
||||
type ChannelMonitorRunner struct {
|
||||
svc *ChannelMonitorService
|
||||
svc monitorRunnerSvc
|
||||
settingService *SettingService
|
||||
|
||||
pool pond.Pool
|
||||
@@ -62,11 +73,20 @@ type scheduledMonitor struct {
|
||||
|
||||
// NewChannelMonitorRunner constructs the scheduler. Start is called once from wire.
// settingService is read before each fire to check the feature toggle; passing nil
// means "always enabled" (kept for test compatibility).
//
// The pool is built at construction time: this removes the race-prone pattern of
// Start assigning it inside mu while fire/Stop read it outside mu, and
// pond.NewPool itself is near zero-cost, so creating the pool early wastes nothing.
func NewChannelMonitorRunner(svc *ChannelMonitorService, settingService *SettingService) *ChannelMonitorRunner {
	return newChannelMonitorRunner(svc, settingService)
}
|
||||
|
||||
// newChannelMonitorRunner 内部构造,接受最小化接口,便于单元测试注入 stub。
|
||||
func newChannelMonitorRunner(svc monitorRunnerSvc, settingService *SettingService) *ChannelMonitorRunner {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &ChannelMonitorRunner{
|
||||
svc: svc,
|
||||
settingService: settingService,
|
||||
pool: pond.NewPool(monitorWorkerConcurrency),
|
||||
parentCtx: ctx,
|
||||
parentCancel: cancel,
|
||||
tasks: make(map[int64]*scheduledMonitor),
|
||||
@@ -86,7 +106,6 @@ func (r *ChannelMonitorRunner) Start() {
|
||||
return
|
||||
}
|
||||
r.started = true
|
||||
r.pool = pond.NewPool(monitorWorkerConcurrency)
|
||||
r.mu.Unlock()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), monitorStartupLoadTimeout)
|
||||
@@ -116,16 +135,28 @@ func (r *ChannelMonitorRunner) Schedule(m *ChannelMonitor) {
|
||||
}
|
||||
interval := time.Duration(m.IntervalSeconds) * time.Second
|
||||
if interval <= 0 {
|
||||
slog.Warn("channel_monitor: skip schedule for invalid interval",
|
||||
// Create/Update 已通过 validateInterval 校验区间,正常路径不可能到这里。
|
||||
// 真触发说明数据库中存在违反约束的数据或校验链路有 bug,记 Error 暴露问题。
|
||||
slog.Error("channel_monitor: skip schedule for invalid interval",
|
||||
"monitor_id", m.ID, "interval_seconds", m.IntervalSeconds)
|
||||
return
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
if r.stopped || !r.started {
|
||||
if r.stopped {
|
||||
r.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if !r.started {
|
||||
// Start 之前调用 Schedule 通常意味着 wire 顺序错乱:
|
||||
// 当前 wire 顺序是 SetScheduler → Start,CRUD 钩子最早也只能在请求到达时触发,
|
||||
// 此时 Start 早已完成。出现此分支时把 monitor 信息打出来便于排查,
|
||||
// 不入队、不缓存——交给运维通过重启或修复 wire 解决。
|
||||
r.mu.Unlock()
|
||||
slog.Warn("channel_monitor: schedule before runner started, skip",
|
||||
"monitor_id", m.ID, "name", m.Name)
|
||||
return
|
||||
}
|
||||
if existing, ok := r.tasks[m.ID]; ok {
|
||||
existing.cancel()
|
||||
}
|
||||
@@ -176,9 +207,7 @@ func (r *ChannelMonitorRunner) Stop() {
|
||||
r.mu.Unlock()
|
||||
|
||||
r.wg.Wait()
|
||||
if r.pool != nil {
|
||||
r.pool.StopAndWait()
|
||||
}
|
||||
r.pool.StopAndWait()
|
||||
}
|
||||
|
||||
// runScheduled 单个监控的循环:立即触发首次(满足"新建/启用即跑"),
|
||||
|
||||
277
backend/internal/service/channel_monitor_runner_test.go
Normal file
277
backend/internal/service/channel_monitor_runner_test.go
Normal file
@@ -0,0 +1,277 @@
|
||||
//go:build unit
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// stubMonitorSvc implements monitorRunnerSvc, isolating the runner from the real
// service/repo chain in unit tests.
type stubMonitorSvc struct {
	enabled    []*ChannelMonitor // returned by ListEnabledMonitors on success
	runCount   atomic.Int64      // total number of RunCheck invocations
	runCalled  chan int64        // pushed once per RunCheck fire (buffer sized to avoid blocking)
	runErr     error             // error returned by RunCheck
	listErr    error             // error returned by ListEnabledMonitors
	runHoldFor time.Duration     // extra time RunCheck blocks, used to test Stop's wait behavior
}
|
||||
|
||||
func (s *stubMonitorSvc) ListEnabledMonitors(_ context.Context) ([]*ChannelMonitor, error) {
|
||||
if s.listErr != nil {
|
||||
return nil, s.listErr
|
||||
}
|
||||
return s.enabled, nil
|
||||
}
|
||||
|
||||
func (s *stubMonitorSvc) RunCheck(ctx context.Context, id int64) ([]*CheckResult, error) {
|
||||
s.runCount.Add(1)
|
||||
if s.runCalled != nil {
|
||||
select {
|
||||
case s.runCalled <- id:
|
||||
default:
|
||||
}
|
||||
}
|
||||
if s.runHoldFor > 0 {
|
||||
select {
|
||||
case <-time.After(s.runHoldFor):
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
return nil, s.runErr
|
||||
}
|
||||
|
||||
// newRunnerForTest builds a runner around the given stub service with a nil
// SettingService (nil is treated as "feature always enabled").
func newRunnerForTest(svc monitorRunnerSvc) *ChannelMonitorRunner {
	return newChannelMonitorRunner(svc, nil)
}
|
||||
|
||||
// 等待 condition 在 timeout 内变 true,否则 t.Fatalf。轮询 5ms 一次。
|
||||
func waitFor(t *testing.T, timeout time.Duration, msg string, cond func() bool) {
|
||||
t.Helper()
|
||||
deadline := time.Now().Add(timeout)
|
||||
for time.Now().Before(deadline) {
|
||||
if cond() {
|
||||
return
|
||||
}
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
if !cond() {
|
||||
t.Fatalf("waitFor timed out: %s", msg)
|
||||
}
|
||||
}
|
||||
|
||||
func runnerTaskCount(r *ChannelMonitorRunner) int {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
return len(r.tasks)
|
||||
}
|
||||
|
||||
func runnerTaskPtr(r *ChannelMonitorRunner, id int64) *scheduledMonitor {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
return r.tasks[id]
|
||||
}
|
||||
|
||||
// TestSchedule_AddsTaskAndFiresOnce verifies that Schedule immediately triggers a
// first check and records the task in the tasks map.
func TestSchedule_AddsTaskAndFiresOnce(t *testing.T) {
	svc := &stubMonitorSvc{runCalled: make(chan int64, 4)}
	r := newRunnerForTest(svc)
	r.Start() // svc.enabled is empty, so Start completes immediately

	r.Schedule(&ChannelMonitor{ID: 1, Name: "m1", Enabled: true, IntervalSeconds: 60})

	if got := runnerTaskCount(r); got != 1 {
		t.Fatalf("expected 1 scheduled task, got %d", got)
	}

	// The first fire must arrive promptly ("create/enable runs immediately").
	select {
	case id := <-svc.runCalled:
		if id != 1 {
			t.Fatalf("expected first fire for id=1, got %d", id)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("expected immediate first fire within 2s")
	}

	r.Stop()
}
|
||||
|
||||
// TestSchedule_ReplaceCancelsOldTask verifies that scheduling the same id a second
// time replaces the old task instance.
// (The old goroutine exits via ctx cancellation; the evidence used here is that
// the task pointer changes and Stop does not time out afterwards.)
func TestSchedule_ReplaceCancelsOldTask(t *testing.T) {
	svc := &stubMonitorSvc{runCalled: make(chan int64, 8)}
	r := newRunnerForTest(svc)
	r.Start()

	m := &ChannelMonitor{ID: 7, Name: "m7", Enabled: true, IntervalSeconds: 60}
	r.Schedule(m)
	first := runnerTaskPtr(r, 7)
	if first == nil {
		t.Fatal("first schedule did not register task")
	}

	r.Schedule(m)
	second := runnerTaskPtr(r, 7)
	if second == nil {
		t.Fatal("second schedule did not register task")
	}
	if first == second {
		t.Fatal("re-Schedule should create a new scheduledMonitor instance")
	}

	stoppedWithin(t, r, 3*time.Second)
}
|
||||
|
||||
// TestUnschedule_RemovesTask 验证 Unschedule 删除 task 并使对应 goroutine 退出。
|
||||
func TestUnschedule_RemovesTask(t *testing.T) {
|
||||
svc := &stubMonitorSvc{runCalled: make(chan int64, 4)}
|
||||
r := newRunnerForTest(svc)
|
||||
r.Start()
|
||||
|
||||
r.Schedule(&ChannelMonitor{ID: 3, Enabled: true, IntervalSeconds: 60})
|
||||
waitFor(t, time.Second, "task registered", func() bool { return runnerTaskCount(r) == 1 })
|
||||
|
||||
r.Unschedule(3)
|
||||
if got := runnerTaskCount(r); got != 0 {
|
||||
t.Fatalf("expected tasks empty after Unschedule, got %d", got)
|
||||
}
|
||||
|
||||
stoppedWithin(t, r, 3*time.Second)
|
||||
}
|
||||
|
||||
// TestSchedule_DisabledRedirectsToUnschedule 验证 Enabled=false 等同于 Unschedule。
|
||||
func TestSchedule_DisabledRedirectsToUnschedule(t *testing.T) {
|
||||
svc := &stubMonitorSvc{runCalled: make(chan int64, 4)}
|
||||
r := newRunnerForTest(svc)
|
||||
r.Start()
|
||||
|
||||
r.Schedule(&ChannelMonitor{ID: 9, Enabled: true, IntervalSeconds: 60})
|
||||
waitFor(t, time.Second, "task registered", func() bool { return runnerTaskCount(r) == 1 })
|
||||
|
||||
r.Schedule(&ChannelMonitor{ID: 9, Enabled: false, IntervalSeconds: 60})
|
||||
if got := runnerTaskCount(r); got != 0 {
|
||||
t.Fatalf("expected tasks empty after disabled re-Schedule, got %d", got)
|
||||
}
|
||||
|
||||
stoppedWithin(t, r, 3*time.Second)
|
||||
}
|
||||
|
||||
// TestSchedule_InvalidIntervalSkipped 验证 IntervalSeconds<=0 不会注册任务(防御性检查)。
|
||||
func TestSchedule_InvalidIntervalSkipped(t *testing.T) {
|
||||
svc := &stubMonitorSvc{}
|
||||
r := newRunnerForTest(svc)
|
||||
r.Start()
|
||||
|
||||
r.Schedule(&ChannelMonitor{ID: 1, Enabled: true, IntervalSeconds: 0})
|
||||
if got := runnerTaskCount(r); got != 0 {
|
||||
t.Fatalf("expected no task for invalid interval, got %d", got)
|
||||
}
|
||||
r.Stop()
|
||||
}
|
||||
|
||||
// TestSchedule_BeforeStartIsNoOp 验证 Start 之前调用 Schedule 不会注册任务。
|
||||
func TestSchedule_BeforeStartIsNoOp(t *testing.T) {
|
||||
svc := &stubMonitorSvc{}
|
||||
r := newRunnerForTest(svc)
|
||||
// 故意不调用 Start
|
||||
|
||||
r.Schedule(&ChannelMonitor{ID: 1, Enabled: true, IntervalSeconds: 60})
|
||||
if got := runnerTaskCount(r); got != 0 {
|
||||
t.Fatalf("expected no task before Start, got %d", got)
|
||||
}
|
||||
r.Stop()
|
||||
}
|
||||
|
||||
// TestStart_LoadsAllEnabledMonitors verifies that Start creates a task for every
// record returned by ListEnabledMonitors.
func TestStart_LoadsAllEnabledMonitors(t *testing.T) {
	svc := &stubMonitorSvc{
		enabled: []*ChannelMonitor{
			{ID: 1, Enabled: true, IntervalSeconds: 60},
			{ID: 2, Enabled: true, IntervalSeconds: 60},
			{ID: 3, Enabled: true, IntervalSeconds: 60},
		},
	}
	r := newRunnerForTest(svc)
	r.Start()
	waitFor(t, 2*time.Second, "all 3 tasks scheduled", func() bool { return runnerTaskCount(r) == 3 })

	stoppedWithin(t, r, 3*time.Second)
}
|
||||
|
||||
// TestStop_DrainsAllGoroutines 验证 Stop 会等待所有调度 goroutine 退出(无游离)。
|
||||
func TestStop_DrainsAllGoroutines(t *testing.T) {
|
||||
svc := &stubMonitorSvc{}
|
||||
r := newRunnerForTest(svc)
|
||||
r.Start()
|
||||
|
||||
for id := int64(1); id <= 5; id++ {
|
||||
r.Schedule(&ChannelMonitor{ID: id, Enabled: true, IntervalSeconds: 60})
|
||||
}
|
||||
waitFor(t, 2*time.Second, "5 tasks scheduled", func() bool { return runnerTaskCount(r) == 5 })
|
||||
|
||||
stoppedWithin(t, r, 3*time.Second)
|
||||
}
|
||||
|
||||
// TestStop_WaitsForInFlightCheck verifies that Stop waits for an in-flight
// RunCheck to exit (pool.StopAndWait).
func TestStop_WaitsForInFlightCheck(t *testing.T) {
	svc := &stubMonitorSvc{
		runCalled:  make(chan int64, 1),
		runHoldFor: 200 * time.Millisecond, // keeps RunCheck busy so Stop overlaps it
	}
	r := newRunnerForTest(svc)
	r.Start()
	r.Schedule(&ChannelMonitor{ID: 1, Enabled: true, IntervalSeconds: 60})

	// Block until the check is actually running before we stop the runner.
	select {
	case <-svc.runCalled:
	case <-time.After(2 * time.Second):
		t.Fatal("first fire never happened")
	}

	start := time.Now()
	stoppedWithin(t, r, 3*time.Second)
	elapsed := time.Since(start)
	// Stop must wait out the in-flight check (runHoldFor=200ms); 100ms is a
	// conservative lower bound that tolerates scheduler jitter.
	if elapsed < 100*time.Millisecond {
		t.Fatalf("Stop returned too fast (%v); did not wait for in-flight check", elapsed)
	}
}
|
||||
|
||||
// TestInFlight_AcquireReleaseSymmetric drives the in-flight bookkeeping directly:
// an acquired slot must block a second acquire until released.
// (The original plan — stubbing r.pool to simulate a full pool — was dropped:
// pond.Pool is an interface but mocking it is cumbersome. Instead, acquire/release
// pairs are exercised directly to show the slot can never be left wedged.)
// NOTE(review): the comment previously named this test TestInFlight_PoolFullReleasesSlot;
// the actual function name below is authoritative.
func TestInFlight_AcquireReleaseSymmetric(t *testing.T) {
	svc := &stubMonitorSvc{}
	r := newRunnerForTest(svc)

	if !r.tryAcquireInFlight(42) {
		t.Fatal("first acquire should succeed")
	}
	if r.tryAcquireInFlight(42) {
		t.Fatal("second acquire (no release) must fail")
	}
	r.releaseInFlight(42)
	if !r.tryAcquireInFlight(42) {
		t.Fatal("acquire after release should succeed")
	}
	r.releaseInFlight(42)
}
|
||||
|
||||
// stoppedWithin calls Stop on a separate goroutine and fails the test if Stop has
// not returned within timeout — i.e. Stop must never block indefinitely.
// NOTE(review): the sync.Once looks redundant (the goroutine runs exactly once),
// but it is harmless; removing it would also orphan the file's "sync" import.
func stoppedWithin(t *testing.T, r *ChannelMonitorRunner, timeout time.Duration) {
	t.Helper()
	done := make(chan struct{})
	var once sync.Once
	go func() {
		r.Stop()
		once.Do(func() { close(done) })
	}()
	select {
	case <-done:
	case <-time.After(timeout):
		t.Fatalf("Stop did not return within %s — leaked goroutine?", timeout)
	}
}
|
||||
Reference in New Issue
Block a user