refactor(channel-monitor): event-driven scheduler + sidebar cleanup

后端 - ChannelMonitorRunner 重写为事件驱动调度
- 删除 5 秒轮询架构(每次 ListEnabled + listDueForCheck 全表扫描),
  改为每个 enabled monitor 一个独立 goroutine + ticker(按各自 IntervalSeconds)
- 新增 MonitorScheduler 接口,service 通过 setter 注入避免依赖环
- ChannelMonitorService.Create/Update/Delete 直接回调 scheduler.Schedule/Unschedule
- runner.Start 一次性加载所有 enabled monitor 建立任务表
- 新建/启用立即触发首次检测,禁用/删除即时撤销 ticker
- 保留 inFlight 去重 + pond 池并发上限 + 全局开关每次 fire 实时校验
- 删除 listDueForCheck / monitorTickerInterval / monitorListDueTimeout

前端 - 可用渠道改为用户级菜单
- 从 adminNavItems 移除 /available-channels(admin 主菜单不再重复出现)
- buildSelfNavItems 始终包含可用渠道入口,普通用户主菜单和
  管理员"我的账户"区都能看到
This commit is contained in:
erio
2026-04-22 19:17:08 +08:00
parent e1193212b5
commit c2f9ad7a21
4 changed files with 184 additions and 86 deletions

View File

@@ -9,121 +9,215 @@ import (
"github.com/alitto/pond/v2"
)
// MonitorScheduler 调度器接口,供 ChannelMonitorService 在 CRUD 时回调,
// 用 setter 注入避免 service ↔ runner 的 wire 依赖环。
type MonitorScheduler interface {
// Schedule 为指定监控创建(或重置)独立定时任务。
// 当 m.Enabled=false 时等同于 Unschedule(m.ID)。
Schedule(m *ChannelMonitor)
// Unschedule 取消指定监控的定时任务(若存在)。
Unschedule(id int64)
}
// ChannelMonitorRunner 渠道监控调度器。
//
// 职责
// - 每 monitorTickerInterval 扫描一次"到期需要检测"的监控
// - 通过 pond 池(容量 monitorWorkerConcurrency异步执行检测
// - Stop 时优雅关闭:池 drain + ticker.Stop + wg.Wait
// 设计
// - 每个 enabled monitor 对应一个独立 goroutine + ticker按各自 IntervalSeconds
// - Start 时一次性加载所有 enabled monitor 并为每个建立任务
// - Service 在 Create/Update/Delete 后通过 MonitorScheduler 接口回调,
// 即时重建/取消对应任务(无需轮询 DB
// - 实际 HTTP 检测交给 pond 池(容量 monitorWorkerConcurrency
// 防止突发并发拖垮上游
//
// 历史清理与日聚合维护不再由 runner 负责,由 OpsCleanupService 的统一 cron
// 在凌晨触发 ChannelMonitorService.RunDailyMaintenance复用 leader lock + heartbeat
//
// 定时任务维护:删除/创建/编辑 monitor 无需显式 reload每个 tick 都会重新查 DB
// ListEnabled + listDueForCheck新 monitor 的 LastCheckedAt 为 nil 天然立即到期,
// 被删除的 monitor 自然不再返回interval 变化下次 tick 自动按新值判定。
// 历史清理与日聚合维护由 OpsCleanupService 的 cron 触发
// ChannelMonitorService.RunDailyMaintenance复用 leader lock + heartbeat
// 不在 runner 职责内。
type ChannelMonitorRunner struct {
svc *ChannelMonitorService
settingService *SettingService
pool pond.Pool
stopCh chan struct{}
once sync.Once
wg sync.WaitGroup
pool pond.Pool
parentCtx context.Context
parentCancel context.CancelFunc
// inFlight 跟踪正在执行的 monitor.ID。tickDueChecks 调度前会检查避免重复提交,
mu sync.Mutex
tasks map[int64]*scheduledMonitor
wg sync.WaitGroup
started bool
stopped bool
// inFlight 跟踪正在执行的 monitor.ID。fire 调度前会检查避免重复提交,
// 防止单次检测耗时 > interval 时同一 monitor 被并发执行。
inFlight map[int64]struct{}
inFlightMu sync.Mutex
}
// NewChannelMonitorRunner 构造调度器。Start 在 wire 中调用
// settingService 用于在每次 tick 前读取功能开关;传 nil 时视为总是启用(兼容测试)。
// scheduledMonitor 单个监控的运行时上下文
type scheduledMonitor struct {
id int64
name string
interval time.Duration
cancel context.CancelFunc
}
// NewChannelMonitorRunner 构造调度器。Start 在 wire 中调用一次。
// settingService 用于在每次 fire 前读取功能开关;传 nil 时视为总是启用(兼容测试)。
func NewChannelMonitorRunner(svc *ChannelMonitorService, settingService *SettingService) *ChannelMonitorRunner {
ctx, cancel := context.WithCancel(context.Background())
return &ChannelMonitorRunner{
svc: svc,
settingService: settingService,
stopCh: make(chan struct{}),
parentCtx: ctx,
parentCancel: cancel,
tasks: make(map[int64]*scheduledMonitor),
inFlight: make(map[int64]struct{}),
}
}
// Start 启动 ticker + worker pool
// Start 加载所有 enabled monitor 并为每个建立独立定时任务
// 调用方需保证只调一次wire ProvideChannelMonitorRunner 内只调一次)。
func (r *ChannelMonitorRunner) Start() {
if r == nil || r.svc == nil {
return
}
// 容量 5 的 pond 池:超出时调用方等待,避免调度堆积无限增长。
r.mu.Lock()
if r.started || r.stopped {
r.mu.Unlock()
return
}
r.started = true
r.pool = pond.NewPool(monitorWorkerConcurrency)
r.mu.Unlock()
r.wg.Add(1)
go r.dueCheckLoop()
ctx, cancel := context.WithTimeout(context.Background(), monitorStartupLoadTimeout)
defer cancel()
enabled, err := r.svc.ListEnabledMonitors(ctx)
if err != nil {
slog.Error("channel_monitor: load enabled monitors failed at startup", "error", err)
return
}
for _, m := range enabled {
r.Schedule(m)
}
slog.Info("channel_monitor: runner started", "scheduled_tasks", len(enabled))
}
// Stop 优雅停止close stopCh -> 等待 loop 退出 -> 池 drain
// Schedule 为指定监控创建(或重置)独立定时任务
// - m.Enabled=false → 等同于 Unschedule(m.ID)
// - 已存在的任务会先被取消再重建(适用于 IntervalSeconds 变更场景)
// - 新任务立即触发首次检测,之后按 IntervalSeconds 周期触发
func (r *ChannelMonitorRunner) Schedule(m *ChannelMonitor) {
if r == nil || m == nil {
return
}
if !m.Enabled {
r.Unschedule(m.ID)
return
}
interval := time.Duration(m.IntervalSeconds) * time.Second
if interval <= 0 {
slog.Warn("channel_monitor: skip schedule for invalid interval",
"monitor_id", m.ID, "interval_seconds", m.IntervalSeconds)
return
}
r.mu.Lock()
if r.stopped || !r.started {
r.mu.Unlock()
return
}
if existing, ok := r.tasks[m.ID]; ok {
existing.cancel()
}
ctx, cancel := context.WithCancel(r.parentCtx)
task := &scheduledMonitor{
id: m.ID,
name: m.Name,
interval: interval,
cancel: cancel,
}
r.tasks[m.ID] = task
r.wg.Add(1)
r.mu.Unlock()
go r.runScheduled(ctx, task)
}
// Unschedule 取消指定监控的定时任务(若存在)。
// 已经在执行中的检测会通过 ctx 取消信号传递。
func (r *ChannelMonitorRunner) Unschedule(id int64) {
if r == nil {
return
}
r.mu.Lock()
task, ok := r.tasks[id]
if ok {
delete(r.tasks, id)
}
r.mu.Unlock()
if ok {
task.cancel()
}
}
// Stop 优雅停止:取消所有任务、关闭池。
func (r *ChannelMonitorRunner) Stop() {
if r == nil {
return
}
r.once.Do(func() {
close(r.stopCh)
})
r.mu.Lock()
if r.stopped {
r.mu.Unlock()
return
}
r.stopped = true
r.parentCancel()
r.tasks = nil
r.mu.Unlock()
r.wg.Wait()
if r.pool != nil {
r.pool.StopAndWait()
}
}
// dueCheckLoop 每 monitorTickerInterval 扫描一次"到期监控",提交到池。
func (r *ChannelMonitorRunner) dueCheckLoop() {
// runScheduled 单个监控的循环:立即触发首次(满足"新建/启用即跑"
// 之后按 interval 周期触发ctx 取消即退出。
func (r *ChannelMonitorRunner) runScheduled(ctx context.Context, task *scheduledMonitor) {
defer r.wg.Done()
ticker := time.NewTicker(monitorTickerInterval)
defer ticker.Stop()
r.fire(ctx, task)
ticker := time.NewTicker(task.interval)
defer ticker.Stop()
for {
select {
case <-r.stopCh:
case <-ctx.Done():
return
case <-ticker.C:
r.tickDueChecks()
r.fire(ctx, task)
}
}
}
// tickDueChecks 一次扫描:查询到期监控并逐个提交到池。
// 已在执行的 monitor 会被跳过(防止单次检测耗时 > interval 时重复调度)
// 池满时使用 TrySubmit 跳过(不能阻塞 ticker同时立即释放已占用的 inFlight 槽。
// 当功能开关关闭时直接返回——管理员可以动态禁用模块runner 不会拉取 DB。
func (r *ChannelMonitorRunner) tickDueChecks() {
ctx, cancel := context.WithTimeout(context.Background(), monitorListDueTimeout)
defer cancel()
// fire 提交一次检测到 worker 池。功能开关关闭时跳过本次(不取消任务,
// 重新启用时立即恢复);池满或重复在飞时也跳过
func (r *ChannelMonitorRunner) fire(ctx context.Context, task *scheduledMonitor) {
if r.settingService != nil && !r.settingService.GetChannelMonitorRuntime(ctx).Enabled {
return
}
due, err := r.svc.listDueForCheck(ctx)
if err != nil {
slog.Warn("channel_monitor: list due failed", "error", err)
if !r.tryAcquireInFlight(task.id) {
slog.Debug("channel_monitor: skip already in-flight",
"monitor_id", task.id, "name", task.name)
return
}
for _, m := range due {
monitor := m
if !r.tryAcquireInFlight(monitor.ID) {
slog.Debug("channel_monitor: skip already in-flight",
"monitor_id", monitor.ID, "name", monitor.Name)
continue
}
if _, ok := r.pool.TrySubmit(func() {
r.runOne(monitor.ID, monitor.Name)
}); !ok {
// 池满:丢弃本次检测,但必须释放已占用的 inFlight 槽,否则该 monitor 会被永久卡住。
r.releaseInFlight(monitor.ID)
slog.Warn("channel_monitor: worker pool full, skip submission",
"monitor_id", monitor.ID, "name", monitor.Name)
}
if _, ok := r.pool.TrySubmit(func() {
r.runOne(task.id, task.name)
}); !ok {
// 池满:丢弃本次检测,但必须释放已占用的 inFlight 槽,否则该 monitor 会被永久卡住。
r.releaseInFlight(task.id)
slog.Warn("channel_monitor: worker pool full, skip submission",
"monitor_id", task.id, "name", task.name)
}
}
@@ -148,11 +242,7 @@ func (r *ChannelMonitorRunner) releaseInFlight(id int64) {
// runOne 执行单个监控的检测。所有错误只记日志,不熔断。
// 任务结束时(含 panic recover必须释放 in-flight 槽。
//
// 单次解密路径:调 RunCheckByID内部统一 Get + APIKeyDecryptFailed 判定 + 跑检测,
// 避免 runner 自己再 Get 一次造成密文二次解密。
func (r *ChannelMonitorRunner) runOne(id int64, name string) {
// 单次任务上限 = 请求超时 + ping + 一些缓冲。
ctx, cancel := context.WithTimeout(context.Background(), monitorRequestTimeout+monitorPingTimeout+monitorRunOneBuffer)
defer cancel()
@@ -166,7 +256,6 @@ func (r *ChannelMonitorRunner) runOne(id int64, name string) {
}()
if _, err := r.svc.RunCheck(ctx, id); err != nil {
// ErrChannelMonitorAPIKeyDecryptFailed 是预期可恢复错误,降为 Warn 即可。
slog.Warn("channel_monitor: run check failed",
"monitor_id", id, "name", name, "error", err)
}