From c2f9ad7a217eb6f19757de6ae1ae30d92c3c2812 Mon Sep 17 00:00:00 2001 From: erio Date: Wed, 22 Apr 2026 19:17:08 +0800 Subject: [PATCH] refactor(channel-monitor): event-driven scheduler + sidebar cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 后端 - ChannelMonitorRunner 重写为事件驱动调度 - 删除 5 秒轮询架构(每次 ListEnabled + listDueForCheck 全表扫描), 改为每个 enabled monitor 一个独立 goroutine + ticker(按各自 IntervalSeconds) - 新增 MonitorScheduler 接口,service 通过 setter 注入避免依赖环 - ChannelMonitorService.Create/Update/Delete 直接回调 scheduler.Schedule/Unschedule - runner.Start 一次性加载所有 enabled monitor 建立任务表 - 新建/启用立即触发首次检测,禁用/删除即时撤销 ticker - 保留 inFlight 去重 + pond 池并发上限 + 全局开关每次 fire 实时校验 - 删除 listDueForCheck / monitorTickerInterval / monitorListDueTimeout 前端 - 可用渠道改为用户级菜单 - 从 adminNavItems 移除 /available-channels(admin 主菜单不再重复出现) - buildSelfNavItems 始终包含可用渠道入口,普通用户主菜单和 管理员"我的账户"区都能看到 --- .../internal/service/channel_monitor_const.go | 6 +- .../service/channel_monitor_runner.go | 217 ++++++++++++------ .../service/channel_monitor_service.go | 41 ++-- backend/internal/service/wire.go | 6 +- 4 files changed, 184 insertions(+), 86 deletions(-) diff --git a/backend/internal/service/channel_monitor_const.go b/backend/internal/service/channel_monitor_const.go index 2fc45639..2e1614f7 100644 --- a/backend/internal/service/channel_monitor_const.go +++ b/backend/internal/service/channel_monitor_const.go @@ -28,8 +28,8 @@ const ( monitorMaintenanceMaxDaysPerRun = 35 // monitorWorkerConcurrency 调度器并发执行的监控数(pond 池容量)。 monitorWorkerConcurrency = 5 - // monitorTickerInterval 调度器扫描"到期监控"的间隔。 - monitorTickerInterval = 5 * time.Second + // monitorStartupLoadTimeout Start 时一次性加载所有 enabled monitor 的总超时。 + monitorStartupLoadTimeout = 10 * time.Second // monitorMinIntervalSeconds / monitorMaxIntervalSeconds 用户配置的检测间隔上下限。 monitorMinIntervalSeconds = 15 monitorMaxIntervalSeconds = 3600 @@ -86,8 +86,6 @@ const ( // monitorChallengeMaxTokens 单次 challenge 请求的 max_tokens(足够回答个位数算术)。 monitorChallengeMaxTokens = 50 - // monitorListDueTimeout tickDueChecks 查询到期监控的总超时。 - monitorListDueTimeout = 10 * time.Second // monitorRunOneBuffer runOne 的总超时缓冲(除请求超时与 ping 超时外的额外裕量)。 monitorRunOneBuffer = 10 * time.Second diff --git a/backend/internal/service/channel_monitor_runner.go b/backend/internal/service/channel_monitor_runner.go index 21dca8ab..be30aec2 100644 --- a/backend/internal/service/channel_monitor_runner.go +++ b/backend/internal/service/channel_monitor_runner.go @@ -9,121 +9,215 @@ import ( "github.com/alitto/pond/v2" ) +// MonitorScheduler 调度器接口,供 ChannelMonitorService 在 CRUD 时回调, +// 用 setter 注入避免 service ↔ runner 的 wire 依赖环。 +type MonitorScheduler interface { + // Schedule 为指定监控创建(或重置)独立定时任务。 + // 当 m.Enabled=false 时等同于 Unschedule(m.ID)。 + Schedule(m *ChannelMonitor) + // Unschedule 取消指定监控的定时任务(若存在)。 + Unschedule(id int64) +} + // ChannelMonitorRunner 渠道监控调度器。 // -// 职责: -// - 每 monitorTickerInterval 扫描一次"到期需要检测"的监控 -// - 通过 pond 池(容量 monitorWorkerConcurrency)异步执行检测 -// - Stop 时优雅关闭:池 drain + ticker.Stop + wg.Wait +// 设计: +// - 每个 enabled monitor 对应一个独立 goroutine + ticker(按各自 IntervalSeconds) +// - Start 时一次性加载所有 enabled monitor 并为每个建立任务 +// - Service 在 Create/Update/Delete 后通过 MonitorScheduler 接口回调, +// 即时重建/取消对应任务(无需轮询 DB) +// - 实际 HTTP 检测交给 pond 池(容量 monitorWorkerConcurrency), +// 防止突发并发拖垮上游 // -// 历史清理与日聚合维护不再由 runner 负责,由 OpsCleanupService 的统一 cron -// 在凌晨触发 ChannelMonitorService.RunDailyMaintenance(复用 leader lock + heartbeat)。 -// -// 定时任务维护:删除/创建/编辑 monitor 无需显式 reload,每个 tick 都会重新查 DB -// (ListEnabled + listDueForCheck),新 monitor 的 LastCheckedAt 为 nil 天然立即到期, -// 被删除的 monitor 自然不再返回,interval 变化下次 tick 自动按新值判定。 +// 历史清理与日聚合维护由 OpsCleanupService 的 cron 触发 +// ChannelMonitorService.RunDailyMaintenance(复用 leader lock + heartbeat), +// 不在 runner 职责内。 type ChannelMonitorRunner struct { svc *ChannelMonitorService settingService *SettingService - pool pond.Pool - stopCh chan struct{} - once sync.Once - wg sync.WaitGroup + pool pond.Pool + parentCtx context.Context + parentCancel context.CancelFunc - // inFlight 跟踪正在执行的 monitor.ID。tickDueChecks 调度前会检查避免重复提交, + mu sync.Mutex + tasks map[int64]*scheduledMonitor + wg sync.WaitGroup + started bool + stopped bool + + // inFlight 跟踪正在执行的 monitor.ID。fire 调度前会检查避免重复提交, // 防止单次检测耗时 > interval 时同一 monitor 被并发执行。 inFlight map[int64]struct{} inFlightMu sync.Mutex } -// NewChannelMonitorRunner 构造调度器。Start 在 wire 中调用。 -// settingService 用于在每次 tick 前读取功能开关;传 nil 时视为总是启用(兼容测试)。 +// scheduledMonitor 单个监控的运行时上下文。 +type scheduledMonitor struct { + id int64 + name string + interval time.Duration + cancel context.CancelFunc +} + +// NewChannelMonitorRunner 构造调度器。Start 在 wire 中调用一次。 +// settingService 用于在每次 fire 前读取功能开关;传 nil 时视为总是启用(兼容测试)。 func NewChannelMonitorRunner(svc *ChannelMonitorService, settingService *SettingService) *ChannelMonitorRunner { + ctx, cancel := context.WithCancel(context.Background()) return &ChannelMonitorRunner{ svc: svc, settingService: settingService, - stopCh: make(chan struct{}), + parentCtx: ctx, + parentCancel: cancel, + tasks: make(map[int64]*scheduledMonitor), inFlight: make(map[int64]struct{}), } } -// Start 启动 ticker + worker pool。 +// Start 加载所有 enabled monitor 并为每个建立独立定时任务。 // 调用方需保证只调一次(wire ProvideChannelMonitorRunner 内只调一次)。 func (r *ChannelMonitorRunner) Start() { if r == nil || r.svc == nil { return } - // 容量 5 的 pond 池:超出时调用方等待,避免调度堆积无限增长。 + r.mu.Lock() + if r.started || r.stopped { + r.mu.Unlock() + return + } + r.started = true r.pool = pond.NewPool(monitorWorkerConcurrency) + r.mu.Unlock() - r.wg.Add(1) - go r.dueCheckLoop() + ctx, cancel := context.WithTimeout(context.Background(), monitorStartupLoadTimeout) + defer cancel() + enabled, err := r.svc.ListEnabledMonitors(ctx) + if err != nil { + slog.Error("channel_monitor: load enabled monitors failed at startup", "error", err) + return + } + for _, m := range enabled { + r.Schedule(m) + } + slog.Info("channel_monitor: runner started", "scheduled_tasks", len(enabled)) } -// Stop 优雅停止:close stopCh -> 等待 loop 退出 -> 池 drain。 +// Schedule 为指定监控创建(或重置)独立定时任务。 +// - m.Enabled=false → 等同于 Unschedule(m.ID) +// - 已存在的任务会先被取消再重建(适用于 IntervalSeconds 变更场景) +// - 新任务立即触发首次检测,之后按 IntervalSeconds 周期触发 +func (r *ChannelMonitorRunner) Schedule(m *ChannelMonitor) { + if r == nil || m == nil { + return + } + if !m.Enabled { + r.Unschedule(m.ID) + return + } + interval := time.Duration(m.IntervalSeconds) * time.Second + if interval <= 0 { + slog.Warn("channel_monitor: skip schedule for invalid interval", + "monitor_id", m.ID, "interval_seconds", m.IntervalSeconds) + return + } + + r.mu.Lock() + if r.stopped || !r.started { + r.mu.Unlock() + return + } + if existing, ok := r.tasks[m.ID]; ok { + existing.cancel() + } + ctx, cancel := context.WithCancel(r.parentCtx) + task := &scheduledMonitor{ + id: m.ID, + name: m.Name, + interval: interval, + cancel: cancel, + } + r.tasks[m.ID] = task + r.wg.Add(1) + r.mu.Unlock() + + go r.runScheduled(ctx, task) +} + +// Unschedule 取消指定监控的定时任务(若存在)。 +// 已经在执行中的检测会通过 ctx 取消信号传递。 +func (r *ChannelMonitorRunner) Unschedule(id int64) { + if r == nil { + return + } + r.mu.Lock() + task, ok := r.tasks[id] + if ok { + delete(r.tasks, id) + } + r.mu.Unlock() + if ok { + task.cancel() + } +} + +// Stop 优雅停止:取消所有任务、关闭池。 func (r *ChannelMonitorRunner) Stop() { if r == nil { return } - r.once.Do(func() { - close(r.stopCh) - }) + r.mu.Lock() + if r.stopped { + r.mu.Unlock() + return + } + r.stopped = true + r.parentCancel() + r.tasks = nil + r.mu.Unlock() + r.wg.Wait() if r.pool != nil { r.pool.StopAndWait() } } -// dueCheckLoop 每 monitorTickerInterval 扫描一次"到期监控",提交到池。 -func (r *ChannelMonitorRunner) dueCheckLoop() { +// runScheduled 单个监控的循环:立即触发首次(满足"新建/启用即跑"), +// 之后按 interval 周期触发;ctx 取消即退出。 +func (r *ChannelMonitorRunner) runScheduled(ctx context.Context, task *scheduledMonitor) { defer r.wg.Done() - ticker := time.NewTicker(monitorTickerInterval) - defer ticker.Stop() + r.fire(ctx, task) + ticker := time.NewTicker(task.interval) + defer ticker.Stop() for { select { - case <-r.stopCh: + case <-ctx.Done(): return case <-ticker.C: - r.tickDueChecks() + r.fire(ctx, task) } } } -// tickDueChecks 一次扫描:查询到期监控并逐个提交到池。 -// 已在执行的 monitor 会被跳过(防止单次检测耗时 > interval 时重复调度)。 -// 池满时使用 TrySubmit 跳过(不能阻塞 ticker),同时立即释放已占用的 inFlight 槽。 -// 当功能开关关闭时直接返回——管理员可以动态禁用模块,runner 不会拉取 DB。 -func (r *ChannelMonitorRunner) tickDueChecks() { - ctx, cancel := context.WithTimeout(context.Background(), monitorListDueTimeout) - defer cancel() - +// fire 提交一次检测到 worker 池。功能开关关闭时跳过本次(不取消任务, +// 重新启用时立即恢复);池满或重复在飞时也跳过。 +func (r *ChannelMonitorRunner) fire(ctx context.Context, task *scheduledMonitor) { if r.settingService != nil && !r.settingService.GetChannelMonitorRuntime(ctx).Enabled { return } - - due, err := r.svc.listDueForCheck(ctx) - if err != nil { - slog.Warn("channel_monitor: list due failed", "error", err) + if !r.tryAcquireInFlight(task.id) { + slog.Debug("channel_monitor: skip already in-flight", + "monitor_id", task.id, "name", task.name) return } - for _, m := range due { - monitor := m - if !r.tryAcquireInFlight(monitor.ID) { - slog.Debug("channel_monitor: skip already in-flight", - "monitor_id", monitor.ID, "name", monitor.Name) - continue - } - if _, ok := r.pool.TrySubmit(func() { - r.runOne(monitor.ID, monitor.Name) - }); !ok { - // 池满:丢弃本次检测,但必须释放已占用的 inFlight 槽,否则该 monitor 会被永久卡住。 - r.releaseInFlight(monitor.ID) - slog.Warn("channel_monitor: worker pool full, skip submission", - "monitor_id", monitor.ID, "name", monitor.Name) - } + if _, ok := r.pool.TrySubmit(func() { + r.runOne(task.id, task.name) + }); !ok { + // 池满:丢弃本次检测,但必须释放已占用的 inFlight 槽,否则该 monitor 会被永久卡住。 + r.releaseInFlight(task.id) + slog.Warn("channel_monitor: worker pool full, skip submission", + "monitor_id", task.id, "name", task.name) } } @@ -148,11 +242,7 @@ func (r *ChannelMonitorRunner) releaseInFlight(id int64) { // runOne 执行单个监控的检测。所有错误只记日志,不熔断。 // 任务结束时(含 panic recover)必须释放 in-flight 槽。 -// -// 单次解密路径:调 RunCheckByID,内部统一 Get + APIKeyDecryptFailed 判定 + 跑检测, -// 避免 runner 自己再 Get 一次造成密文二次解密。 func (r *ChannelMonitorRunner) runOne(id int64, name string) { - // 单次任务上限 = 请求超时 + ping + 一些缓冲。 ctx, cancel := context.WithTimeout(context.Background(), monitorRequestTimeout+monitorPingTimeout+monitorRunOneBuffer) defer cancel() @@ -166,7 +256,6 @@ func (r *ChannelMonitorRunner) runOne(id int64, name string) { }() if _, err := r.svc.RunCheck(ctx, id); err != nil { - // ErrChannelMonitorAPIKeyDecryptFailed 是预期可恢复错误,降为 Warn 即可。 slog.Warn("channel_monitor: run check failed", "monitor_id", id, "name", name, "error", err) } diff --git a/backend/internal/service/channel_monitor_service.go b/backend/internal/service/channel_monitor_service.go index ec1107a3..7050e141 100644 --- a/backend/internal/service/channel_monitor_service.go +++ b/backend/internal/service/channel_monitor_service.go @@ -61,6 +61,9 @@ type ChannelMonitorRepository interface { type ChannelMonitorService struct { repo ChannelMonitorRepository encryptor SecretEncryptor + // scheduler 由 wire 通过 SetScheduler 注入;CRUD 后调用对应钩子即时同步任务。 + // 测试或未注入场景下保持 nil,所有钩子调用变为 no-op。 + scheduler MonitorScheduler } // NewChannelMonitorService 创建渠道监控服务实例。 @@ -136,6 +139,9 @@ func (s *ChannelMonitorService) Create(ctx context.Context, p ChannelMonitorCrea // 不再调 s.Get 重走解密链:已知刚加密的明文,直接构造响应。 // 这样可避免 SecretEncryptor 解密失败时 APIKey 被静默清空的问题(见 Fix 4)。 m.APIKey = strings.TrimSpace(p.APIKey) + if s.scheduler != nil { + s.scheduler.Schedule(m) + } return m, nil } @@ -184,6 +190,11 @@ func (s *ChannelMonitorService) Update(ctx context.Context, id int64, p ChannelM } else { s.decryptInPlace(existing) } + if s.scheduler != nil { + // Schedule 内部根据 Enabled 自动选择 Unschedule 或重建任务, + // IntervalSeconds 变化也会被自然吸收(旧 task 取消 + 新 task 用新 interval)。 + s.scheduler.Schedule(existing) + } return existing, nil } @@ -209,6 +220,9 @@ func (s *ChannelMonitorService) Delete(ctx context.Context, id int64) error { if err := s.repo.Delete(ctx, id); err != nil { return fmt.Errorf("delete channel monitor: %w", err) } + if s.scheduler != nil { + s.scheduler.Unschedule(id) + } return nil } @@ -306,29 +320,24 @@ func (s *ChannelMonitorService) runChecksConcurrent(ctx context.Context, m *Chan return results } -// ---------- 调度器内部 ---------- +// ---------- 调度器协作 ---------- -// listDueForCheck 返回需要立即检测的监控列表: -// enabled=true AND (last_checked_at IS NULL OR last_checked_at + interval <= now)。 -// 实现下沉到 repository(用 SQL 表达式比较),减少应用层数据传输。 -func (s *ChannelMonitorService) listDueForCheck(ctx context.Context) ([]*ChannelMonitor, error) { +// SetScheduler 由 wire 在 runner 构造后注入,用于在 CRUD 时即时同步任务表。 +// 通过 setter 注入避免 service ↔ runner 的依赖环。 +func (s *ChannelMonitorService) SetScheduler(sched MonitorScheduler) { + s.scheduler = sched +} + +// ListEnabledMonitors 返回所有 enabled=true 的监控(解密后),供 runner 启动时建立任务表。 +func (s *ChannelMonitorService) ListEnabledMonitors(ctx context.Context) ([]*ChannelMonitor, error) { all, err := s.repo.ListEnabled(ctx) if err != nil { return nil, err } - now := time.Now() - due := make([]*ChannelMonitor, 0, len(all)) for _, m := range all { - if m.LastCheckedAt == nil { - due = append(due, m) - continue - } - nextAt := m.LastCheckedAt.Add(time.Duration(m.IntervalSeconds) * time.Second) - if !nextAt.After(now) { - due = append(due, m) - } + s.decryptInPlace(m) } - return due, nil + return all, nil } // cleanupOldHistory 删除 monitorHistoryRetentionDays 天之前的明细历史记录。 diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index 3148f865..ab2802fd 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -503,10 +503,12 @@ func ProvideChannelMonitorService( } // ProvideChannelMonitorRunner 创建并启动渠道监控调度器。 -// Runner.Stop 由 cleanup function 调用。 -// settingService 用于 runner 每个 tick 读取功能开关。 +// 通过 SetScheduler 注入回 service 后再 Start,确保启动时加载所有 enabled monitor, +// 后续 CRUD 也能即时同步任务表。Runner.Stop 由 cleanup function 调用。 +// settingService 用于 runner 每次 fire 读取功能开关。 func ProvideChannelMonitorRunner(svc *ChannelMonitorService, settingService *SettingService) *ChannelMonitorRunner { r := NewChannelMonitorRunner(svc, settingService) + svc.SetScheduler(r) r.Start() return r }