feat(channel-monitor): aggregate history to daily rollups + soft delete
Detail rows are kept for only 1 day; anything older is aggregated into the new table channel_monitor_daily_rollups (keyed by monitor_id/model/bucket_date), and rollups are kept for 30 days. Both tables use SoftDeleteMixin for soft deletes (a DELETE is automatically rewritten to UPDATE deleted_at = NOW()).

Aggregation and cleanup are scheduled by the OpsCleanupService cron, sharing the ops-monitoring cleanup's schedule (default 0 2 * * *) and leader lock. ChannelMonitorRunner's cleanupLoop is removed; only dueCheckLoop remains.

The read path (ComputeAvailability*) now UNIONs the detail table (today, deleted_at IS NULL) with the rollup table (the past windowDays days, deleted_at IS NULL). Availability is SUM(ok)/SUM(total), so days are naturally weighted by probe count; average latency is SUM(sum_latency_ms)/SUM(count_latency).

The watermark table channel_monitor_aggregation_watermark holds a single row (id=1) recording last_aggregated_date. After a restart, aggregation resumes from that date + 1 day; if it is nil (first run), backfill starts from today - 30d, capped at 35 days per run to avoid long transactions.

The raw-SQL queries ListLatestPerModel / ListLatestForMonitorIDs / ListRecentHistoryForMonitors now also filter on deleted_at IS NULL (the SoftDeleteMixin interceptor only applies to ent queries).

Bump version to 0.1.114.28. GroupBadge in MonitorKeyPickerDialog now reuses the platform theme color and shows the rate / dedicated-rate multipliers (drive-by improvement).
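The weighted merge in the read path is easiest to see with concrete numbers. The sketch below is a standalone illustration, not code from this commit: the bucket struct and the sample counts are invented, but the arithmetic mirrors the SUM(ok)/SUM(total) and SUM(sum_latency_ms)/SUM(count_latency) expressions in the UNION query.

// Standalone sketch (not part of this commit): how the UNION read path weights
// availability and latency when merging today's raw detail with rollup rows.
// The bucket type and the sample numbers below are invented for illustration.
package main

import "fmt"

// bucket mirrors the aggregate columns shared by the raw CTE and the rollup
// table: plain counts and latency sums, never pre-computed percentages.
type bucket struct {
	totalChecks  int64
	okCount      int64
	sumLatencyMs int64
	countLatency int64
}

func main() {
	buckets := []bucket{
		{totalChecks: 12, okCount: 12, sumLatencyMs: 9600, countLatency: 12},      // today, raw detail
		{totalChecks: 288, okCount: 240, sumLatencyMs: 230400, countLatency: 288}, // rollup day 1
		{totalChecks: 288, okCount: 288, sumLatencyMs: 201600, countLatency: 288}, // rollup day 2
	}

	var total, ok, sumLat, cntLat int64
	for _, b := range buckets {
		total += b.totalChecks
		ok += b.okCount
		sumLat += b.sumLatencyMs
		cntLat += b.countLatency
	}

	// SUM(ok)/SUM(total): days with more probes weigh more, unlike an average
	// of per-day percentages, which would give a 12-probe day the same weight
	// as a 288-probe day.
	availability := float64(ok) / float64(total)

	// AVG latency = SUM(sum_latency_ms) / SUM(count_latency).
	avgLatencyMs := float64(sumLat) / float64(cntLat)

	fmt.Printf("availability=%.4f avg_latency_ms=%.1f\n", availability, avgLatencyMs)
}

With these made-up numbers the result is roughly 91.8% availability at about 751 ms, dominated by the two full rollup days rather than today's dozen probes.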
@@ -9,6 +9,7 @@ import (

dbent "github.com/Wei-Shaw/sub2api/ent"
"github.com/Wei-Shaw/sub2api/ent/channelmonitor"
"github.com/Wei-Shaw/sub2api/ent/channelmonitordailyrollup"
"github.com/Wei-Shaw/sub2api/ent/channelmonitorhistory"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/lib/pq"
@@ -246,6 +247,7 @@ func (r *channelMonitorRepository) ListLatestPerModel(ctx context.Context, monit
model, status, latency_ms, ping_latency_ms, checked_at
FROM channel_monitor_histories
WHERE monitor_id = $1
AND deleted_at IS NULL
ORDER BY model, checked_at DESC
`
rows, err := r.db.QueryContext(ctx, q, monitorID)
@@ -280,23 +282,48 @@ func assignNullInt(dst **int, n sql.NullInt64) {

// ComputeAvailability computes each model's availability and average latency within the given window.
// "Available" = status IN (operational, degraded).
//
// Data sources: the detail table keeps only 1 day; the remaining days of the window come from the rollup table.
// - raw = today's (from CURRENT_DATE) non-soft-deleted detail rows, accumulated per model
// - rollup = rollup rows in the [CURRENT_DATE - windowDays, CURRENT_DATE) range
//
// The total window is "today + the past windowDays days", one day wider than the literal windowDays value.
// Because rollups are cut on whole UTC days this precision loss is unavoidable, and it errs wide rather than narrow (more data, not less).
func (r *channelMonitorRepository) ComputeAvailability(ctx context.Context, monitorID int64, windowDays int) ([]*service.ChannelMonitorAvailability, error) {
if windowDays <= 0 {
windowDays = 7
}
const q = `
SELECT
model,
COUNT(*) AS total_checks,
COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_checks,
AVG(latency_ms) FILTER (WHERE latency_ms IS NOT NULL) AS avg_latency_ms
FROM channel_monitor_histories
WHERE monitor_id = $1
AND checked_at >= $2
WITH raw AS (
SELECT model,
COUNT(*) AS total_checks,
COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_count,
COALESCE(SUM(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0) AS sum_latency_ms,
COUNT(latency_ms) AS count_latency
FROM channel_monitor_histories
WHERE monitor_id = $1
AND deleted_at IS NULL
AND checked_at >= CURRENT_DATE
GROUP BY model
),
rollup AS (
SELECT model, total_checks, ok_count, sum_latency_ms, count_latency
FROM channel_monitor_daily_rollups
WHERE monitor_id = $1
AND deleted_at IS NULL
AND bucket_date >= (CURRENT_DATE - $2::int)
AND bucket_date < CURRENT_DATE
)
SELECT model,
SUM(total_checks) AS total,
SUM(ok_count) AS ok,
CASE WHEN SUM(count_latency) > 0
THEN SUM(sum_latency_ms)::float8 / SUM(count_latency)
ELSE NULL END AS avg_latency_ms
FROM (SELECT * FROM raw UNION ALL SELECT * FROM rollup) combined
GROUP BY model
`
from := time.Now().AddDate(0, 0, -windowDays)
rows, err := r.db.QueryContext(ctx, q, monitorID, from)
rows, err := r.db.QueryContext(ctx, q, monitorID, windowDays)
if err != nil {
return nil, fmt.Errorf("query availability: %w", err)
}
@@ -349,6 +376,7 @@ func (r *channelMonitorRepository) ListLatestForMonitorIDs(ctx context.Context,
monitor_id, model, status, latency_ms, ping_latency_ms, checked_at
FROM channel_monitor_histories
WHERE monitor_id = ANY($1)
AND deleted_at IS NULL
ORDER BY monitor_id, model, checked_at DESC
`
rows, err := r.db.QueryContext(ctx, q, pq.Array(ids))
@@ -409,6 +437,7 @@ func (r *channelMonitorRepository) ListRecentHistoryForMonitors(
FROM channel_monitor_histories h
JOIN targets t
ON t.monitor_id = h.monitor_id AND t.model = h.model
WHERE h.deleted_at IS NULL
)
SELECT monitor_id, status, latency_ms, ping_latency_ms, checked_at
FROM ranked
@@ -476,6 +505,7 @@ func clampTimelineLimit(n int) int {
}

// ComputeAvailabilityForMonitors computes per-model availability and average latency for multiple monitors in one pass within the given window.
// Structurally the same as the single-monitor version: the detail table covers only today; earlier days are merged in via UNION with the rollup table.
func (r *channelMonitorRepository) ComputeAvailabilityForMonitors(ctx context.Context, ids []int64, windowDays int) (map[int64][]*service.ChannelMonitorAvailability, error) {
out := make(map[int64][]*service.ChannelMonitorAvailability, len(ids))
if len(ids) == 0 {
@@ -485,19 +515,38 @@ func (r *channelMonitorRepository) ComputeAvailabilityForMonitors(ctx context.Co
windowDays = 7
}
const q = `
SELECT
monitor_id,
model,
COUNT(*) AS total_checks,
COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_checks,
AVG(latency_ms) FILTER (WHERE latency_ms IS NOT NULL) AS avg_latency_ms
FROM channel_monitor_histories
WHERE monitor_id = ANY($1)
AND checked_at >= $2
WITH raw AS (
SELECT monitor_id,
model,
COUNT(*) AS total_checks,
COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_count,
COALESCE(SUM(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0) AS sum_latency_ms,
COUNT(latency_ms) AS count_latency
FROM channel_monitor_histories
WHERE monitor_id = ANY($1)
AND deleted_at IS NULL
AND checked_at >= CURRENT_DATE
GROUP BY monitor_id, model
),
rollup AS (
SELECT monitor_id, model, total_checks, ok_count, sum_latency_ms, count_latency
FROM channel_monitor_daily_rollups
WHERE monitor_id = ANY($1)
AND deleted_at IS NULL
AND bucket_date >= (CURRENT_DATE - $2::int)
AND bucket_date < CURRENT_DATE
)
SELECT monitor_id,
model,
SUM(total_checks) AS total,
SUM(ok_count) AS ok,
CASE WHEN SUM(count_latency) > 0
THEN SUM(sum_latency_ms)::float8 / SUM(count_latency)
ELSE NULL END AS avg_latency_ms
FROM (SELECT * FROM raw UNION ALL SELECT * FROM rollup) combined
GROUP BY monitor_id, model
`
from := time.Now().AddDate(0, 0, -windowDays)
rows, err := r.db.QueryContext(ctx, q, pq.Array(ids), from)
rows, err := r.db.QueryContext(ctx, q, pq.Array(ids), windowDays)
if err != nil {
return nil, fmt.Errorf("query availability batch: %w", err)
}
@@ -521,6 +570,116 @@ func (r *channelMonitorRepository) ComputeAvailabilityForMonitors(ctx context.Co
return out, nil
}

// ---------- Rollup maintenance ----------

// UpsertDailyRollupsFor aggregates the non-soft-deleted detail rows of targetDate's day
// ([targetDate, targetDate+1d)) into channel_monitor_daily_rollups, keyed by (monitor_id, model, bucket_date).
// - ON CONFLICT (monitor_id, model, bucket_date) DO UPDATE makes the backfill idempotent;
// re-running simply overwrites the row with the latest statistics;
// - it also resets deleted_at back to NULL, so an accidentally soft-deleted rollup row does not stay filtered out forever;
// - $1::date lets PG truncate the argument to a UTC date, so callers do not need to preprocess targetDate.
func (r *channelMonitorRepository) UpsertDailyRollupsFor(ctx context.Context, targetDate time.Time) (int64, error) {
const q = `
INSERT INTO channel_monitor_daily_rollups (
monitor_id, model, bucket_date,
total_checks, ok_count,
operational_count, degraded_count, failed_count, error_count,
sum_latency_ms, count_latency,
sum_ping_latency_ms, count_ping_latency,
computed_at
)
SELECT
monitor_id,
model,
$1::date AS bucket_date,
COUNT(*) AS total_checks,
COUNT(*) FILTER (WHERE status IN ('operational','degraded')) AS ok_count,
COUNT(*) FILTER (WHERE status = 'operational') AS operational_count,
COUNT(*) FILTER (WHERE status = 'degraded') AS degraded_count,
COUNT(*) FILTER (WHERE status = 'failed') AS failed_count,
COUNT(*) FILTER (WHERE status = 'error') AS error_count,
COALESCE(SUM(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0) AS sum_latency_ms,
COUNT(latency_ms) AS count_latency,
COALESCE(SUM(ping_latency_ms) FILTER (WHERE ping_latency_ms IS NOT NULL), 0) AS sum_ping_latency_ms,
COUNT(ping_latency_ms) AS count_ping_latency,
NOW()
FROM channel_monitor_histories
WHERE deleted_at IS NULL
AND checked_at >= $1::date
AND checked_at < ($1::date + INTERVAL '1 day')
GROUP BY monitor_id, model
ON CONFLICT (monitor_id, model, bucket_date) DO UPDATE SET
total_checks = EXCLUDED.total_checks,
ok_count = EXCLUDED.ok_count,
operational_count = EXCLUDED.operational_count,
degraded_count = EXCLUDED.degraded_count,
failed_count = EXCLUDED.failed_count,
error_count = EXCLUDED.error_count,
sum_latency_ms = EXCLUDED.sum_latency_ms,
count_latency = EXCLUDED.count_latency,
sum_ping_latency_ms = EXCLUDED.sum_ping_latency_ms,
count_ping_latency = EXCLUDED.count_ping_latency,
computed_at = NOW(),
deleted_at = NULL
`
res, err := r.db.ExecContext(ctx, q, targetDate)
if err != nil {
return 0, fmt.Errorf("upsert daily rollups for %s: %w", targetDate.Format("2006-01-02"), err)
}
n, err := res.RowsAffected()
if err != nil {
return 0, fmt.Errorf("rows affected (upsert rollups): %w", err)
}
return n, nil
}

// DeleteRollupsBefore soft-deletes rollup rows with bucket_date < beforeDate.
// It goes through the ent client so SoftDeleteMixin rewrites the DELETE into UPDATE deleted_at = NOW().
func (r *channelMonitorRepository) DeleteRollupsBefore(ctx context.Context, beforeDate time.Time) (int64, error) {
client := clientFromContext(ctx, r.client)
n, err := client.ChannelMonitorDailyRollup.Delete().
Where(channelmonitordailyrollup.BucketDateLT(beforeDate)).
Exec(ctx)
if err != nil {
return 0, fmt.Errorf("delete rollups before: %w", err)
}
return int64(n), nil
}

// LoadAggregationWatermark reads the watermark table (id=1).
// The watermark table is not an ent schema (it only has one row), so this uses raw SQL.
// - Row missing or last_aggregated_date IS NULL: returns (nil, nil); the caller decides the first-run backfill strategy.
func (r *channelMonitorRepository) LoadAggregationWatermark(ctx context.Context) (*time.Time, error) {
const q = `SELECT last_aggregated_date FROM channel_monitor_aggregation_watermark WHERE id = 1`
var t sql.NullTime
if err := r.db.QueryRowContext(ctx, q).Scan(&t); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("load aggregation watermark: %w", err)
}
if !t.Valid {
return nil, nil
}
return &t.Time, nil
}

// UpdateAggregationWatermark updates the watermark (UPSERT into id=1).
// $1::date lets PG truncate the argument to a UTC date, matching the DATE type of the last_aggregated_date column.
func (r *channelMonitorRepository) UpdateAggregationWatermark(ctx context.Context, date time.Time) error {
const q = `
INSERT INTO channel_monitor_aggregation_watermark (id, last_aggregated_date, updated_at)
VALUES (1, $1::date, NOW())
ON CONFLICT (id) DO UPDATE SET
last_aggregated_date = EXCLUDED.last_aggregated_date,
updated_at = NOW()
`
if _, err := r.db.ExecContext(ctx, q, date); err != nil {
return fmt.Errorf("update aggregation watermark: %w", err)
}
return nil
}

// ---------- helpers ----------

func entToServiceMonitor(row *dbent.ChannelMonitor) *service.ChannelMonitor {

@@ -15,8 +15,16 @@ const (
monitorPingTimeout = 8 * time.Second
// monitorDegradedThreshold: if the main request succeeds but takes longer than this threshold, the check counts as degraded.
monitorDegradedThreshold = 6 * time.Second
// monitorHistoryRetentionDays is the history retention in days (cleaned up once a day).
monitorHistoryRetentionDays = 30
// monitorHistoryRetentionDays is the detail-history retention in days.
// Detail rows are kept for only 1 day; anything older is soft-deleted via SoftDeleteMixin;
// the maintenance job runs nightly (scheduled centrally by OpsCleanupService).
monitorHistoryRetentionDays = 1
// monitorRollupRetentionDays is the daily-rollup retention in days.
// Rollup rows older than this window are soft-deleted by RunDailyMaintenance.
monitorRollupRetentionDays = 30
// monitorMaintenanceMaxDaysPerRun caps how many days a single maintenance run may aggregate.
// It bounds the first-deployment backfill (30 days) plus a small margin, avoiding long transactions.
monitorMaintenanceMaxDaysPerRun = 35
// monitorWorkerConcurrency is the number of monitors the scheduler checks concurrently (pond pool capacity).
monitorWorkerConcurrency = 5
// monitorTickerInterval is how often the scheduler scans for monitors that are due for a check.
@@ -55,11 +63,6 @@ const (
monitorAvailability15Days = 15
monitorAvailability30Days = 30

// monitorCleanupCheckInterval is how often the history-cleanup scheduler checks the clock (hourly: "is it 03:00 yet").
monitorCleanupCheckInterval = time.Hour
// monitorCleanupHour: history cleanup runs at 3 AM.
monitorCleanupHour = 3

// MonitorHistoryDefaultLimit is the default number of rows returned by history queries (shared with the handler layer).
MonitorHistoryDefaultLimit = 100
// MonitorHistoryMaxLimit is the maximum number of rows returned by history queries (shared with the handler layer).
@@ -82,10 +85,6 @@ const (
monitorListDueTimeout = 10 * time.Second
// monitorRunOneBuffer is the extra headroom in runOne's overall timeout beyond the request and ping timeouts.
monitorRunOneBuffer = 10 * time.Second
// monitorCleanupTimeout is the overall timeout of the history-cleanup job.
monitorCleanupTimeout = 30 * time.Second
// monitorCleanupDayLayout is the date layout history cleanup uses to decide "already ran today".
monitorCleanupDayLayout = "2006-01-02"

// monitorIdleConnTimeout is the idle-connection close timeout of the HTTP transport.
monitorIdleConnTimeout = 30 * time.Second

@@ -14,10 +14,10 @@ import (
// Responsibilities:
// - every monitorTickerInterval, scan for monitors that are due for a check
// - run checks asynchronously through a pond pool (capacity monitorWorkerConcurrency)
// - check the clock hourly and run history cleanup at monitorCleanupHour
// - graceful shutdown on Stop: pool drain + ticker.Stop + wg.Wait
//
// No cron library is introduced; cleanup scheduling works by "check the time every hour", which is enough for the MVP.
// History cleanup and daily rollup maintenance are no longer the runner's job; the unified OpsCleanupService cron
// triggers ChannelMonitorService.RunDailyMaintenance in the early morning (reusing the leader lock + heartbeat).
//
// Schedule maintenance: deleting/creating/editing a monitor needs no explicit reload; every tick re-queries the DB
// (ListEnabled + listDueForCheck), and a new monitor's LastCheckedAt is nil, so it is naturally due immediately,
@@ -35,10 +35,6 @@ type ChannelMonitorRunner struct {
// Prevents the same monitor from running concurrently when a single check takes longer than the interval.
inFlight map[int64]struct{}
inFlightMu sync.Mutex

// Cleanup state: lastCleanupDay records the "year-month-day" of the last cleanup to avoid re-running on the same day.
lastCleanupDay string
cleanupMu sync.Mutex
}

// NewChannelMonitorRunner constructs the scheduler. Start is called from wire.
@@ -52,7 +48,7 @@ func NewChannelMonitorRunner(svc *ChannelMonitorService, settingService *Setting
}
}

// Start starts the ticker + worker pool + cleanup loop.
// Start starts the ticker + worker pool.
// Callers must ensure it is called only once (wire's ProvideChannelMonitorRunner calls it exactly once).
func (r *ChannelMonitorRunner) Start() {
if r == nil || r.svc == nil {
@@ -61,12 +57,11 @@ func (r *ChannelMonitorRunner) Start() {
// A pond pool of capacity 5: callers wait when it is full, so the scheduling backlog cannot grow without bound.
r.pool = pond.NewPool(monitorWorkerConcurrency)

r.wg.Add(2)
r.wg.Add(1)
go r.dueCheckLoop()
go r.cleanupLoop()
}

// Stop shuts down gracefully: close stopCh -> wait for both loops to exit -> drain the pool.
// Stop shuts down gracefully: close stopCh -> wait for the loop to exit -> drain the pool.
func (r *ChannelMonitorRunner) Stop() {
if r == nil {
return
@@ -176,45 +171,3 @@ func (r *ChannelMonitorRunner) runOne(id int64, name string) {
"monitor_id", id, "name", name, "error", err)
}
}

// cleanupLoop checks the current time hourly and runs one cleanup at monitorCleanupHour (if it has not run yet that day).
// It also checks once immediately at startup so the first cleanup does not have to wait for a long uptime.
func (r *ChannelMonitorRunner) cleanupLoop() {
defer r.wg.Done()

ticker := time.NewTicker(monitorCleanupCheckInterval)
defer ticker.Stop()

r.maybeRunCleanup()
for {
select {
case <-r.stopCh:
return
case <-ticker.C:
r.maybeRunCleanup()
}
}
}

// maybeRunCleanup runs the cleanup if the current hour is monitorCleanupHour and it has not run yet today.
func (r *ChannelMonitorRunner) maybeRunCleanup() {
now := time.Now()
if now.Hour() != monitorCleanupHour {
return
}
day := now.Format(monitorCleanupDayLayout)

r.cleanupMu.Lock()
if r.lastCleanupDay == day {
r.cleanupMu.Unlock()
return
}
r.lastCleanupDay = day
r.cleanupMu.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), monitorCleanupTimeout)
defer cancel()
if err := r.svc.cleanupOldHistory(ctx); err != nil {
slog.Warn("channel_monitor: cleanup history failed", "error", err)
}
}

@@ -41,6 +41,20 @@ type ChannelMonitorRepository interface {
// ListRecentHistoryForMonitors fetches, for each monitor, the latest perMonitorLimit history rows of its primary model (primaryModels[monitorID]).
// The returned entries are sorted by checked_at DESC (newest first) and omit the message field.
ListRecentHistoryForMonitors(ctx context.Context, ids []int64, primaryModels map[int64]string, perMonitorLimit int) (map[int64][]*ChannelMonitorHistoryEntry, error)

// ---------- Rollup maintenance (called by OpsCleanupService) ----------

// UpsertDailyRollupsFor aggregates targetDate's detail rows by (monitor_id, model, bucket_date)
// into channel_monitor_daily_rollups. targetDate is truncated to a date;
// ON CONFLICT DO UPDATE makes the backfill idempotent. Returns the number of rows affected by the upsert.
UpsertDailyRollupsFor(ctx context.Context, targetDate time.Time) (int64, error)
// DeleteRollupsBefore soft-deletes rollup rows with bucket_date < beforeDate and returns the number of deleted rows.
DeleteRollupsBefore(ctx context.Context, beforeDate time.Time) (int64, error)
// LoadAggregationWatermark reads the watermark (id=1).
// A nil result means aggregation has never run; the watermark table itself is expected to already contain its single row (written by migration 110).
LoadAggregationWatermark(ctx context.Context) (*time.Time, error)
// UpdateAggregationWatermark writes the watermark (UPSERT into id=1).
UpdateAggregationWatermark(ctx context.Context, date time.Time) error
}

// ChannelMonitorService manages channel monitors.
@@ -300,9 +314,10 @@ func (s *ChannelMonitorService) listDueForCheck(ctx context.Context) ([]*Channel
return due, nil
}

// cleanupOldHistory deletes history records older than monitorHistoryRetentionDays days.
// cleanupOldHistory deletes detail history records older than monitorHistoryRetentionDays days.
// Called by RunDailyMaintenance; SoftDeleteMixin automatically rewrites the DELETE into UPDATE deleted_at.
func (s *ChannelMonitorService) cleanupOldHistory(ctx context.Context) error {
before := time.Now().AddDate(0, 0, -monitorHistoryRetentionDays)
before := time.Now().UTC().AddDate(0, 0, -monitorHistoryRetentionDays)
deleted, err := s.repo.DeleteHistoryBefore(ctx, before)
if err != nil {
return fmt.Errorf("delete history before %s: %w", before.Format(time.RFC3339), err)
@@ -314,6 +329,94 @@ func (s *ChannelMonitorService) cleanupOldHistory(ctx context.Context) error {
return nil
}

// RunDailyMaintenance is the daily maintenance job: aggregate any not-yet-aggregated detail up to yesterday, then soft-delete expired detail and rollups.
// Triggered by the OpsCleanupService cron (sharing its schedule and leader lock).
//
// Idempotency:
// - the watermark guarantees already-aggregated dates are not processed again;
// - UpsertDailyRollupsFor uses ON CONFLICT DO UPDATE internally, so re-running the same day yields the same result.
//
// Each failing step only logs slog.Warn; the function itself always returns nil so the remaining steps keep running
// (consistent with the style of OpsCleanupService.runCleanupOnce).
func (s *ChannelMonitorService) RunDailyMaintenance(ctx context.Context) error {
now := time.Now().UTC()
today := now.Truncate(24 * time.Hour)

if err := s.runDailyAggregation(ctx, today); err != nil {
slog.Warn("channel_monitor: maintenance step failed",
"step", "aggregate", "error", err)
}
if err := s.cleanupOldHistory(ctx); err != nil {
slog.Warn("channel_monitor: maintenance step failed",
"step", "prune_history", "error", err)
}
if err := s.cleanupOldRollups(ctx, today); err != nil {
slog.Warn("channel_monitor: maintenance step failed",
"step", "prune_rollups", "error", err)
}
return nil
}

// runDailyAggregation aggregates from watermark+1 up to yesterday (UTC).
// First run (watermark nil): backfill starts from today-monitorRollupRetentionDays.
// Each run aggregates at most monitorMaintenanceMaxDaysPerRun days to avoid long transactions.
func (s *ChannelMonitorService) runDailyAggregation(ctx context.Context, today time.Time) error {
watermark, err := s.repo.LoadAggregationWatermark(ctx)
if err != nil {
return fmt.Errorf("load watermark: %w", err)
}

start := s.resolveAggregationStart(watermark, today)
if !start.Before(today) {
return nil // no dates left to aggregate
}

iterations := 0
for d := start; d.Before(today); d = d.Add(24 * time.Hour) {
if iterations >= monitorMaintenanceMaxDaysPerRun {
slog.Info("channel_monitor: maintenance aggregation capped",
"max_days", monitorMaintenanceMaxDaysPerRun,
"next_resume", d.Format("2006-01-02"))
break
}
affected, upErr := s.repo.UpsertDailyRollupsFor(ctx, d)
if upErr != nil {
return fmt.Errorf("upsert rollups for %s: %w", d.Format("2006-01-02"), upErr)
}
if err := s.repo.UpdateAggregationWatermark(ctx, d); err != nil {
return fmt.Errorf("update watermark to %s: %w", d.Format("2006-01-02"), err)
}
slog.Info("channel_monitor: rollups upserted",
"date", d.Format("2006-01-02"), "affected_rows", affected)
iterations++
}
return nil
}

// resolveAggregationStart computes where this aggregation run should start:
// - watermark == nil: today - monitorRollupRetentionDays (the first backfill covers at most 30 days)
// - watermark != nil: *watermark + 1 day
func (s *ChannelMonitorService) resolveAggregationStart(watermark *time.Time, today time.Time) time.Time {
if watermark == nil {
return today.AddDate(0, 0, -monitorRollupRetentionDays)
}
return watermark.UTC().Truncate(24 * time.Hour).Add(24 * time.Hour)
}

// cleanupOldRollups soft-deletes daily rollup rows with bucket_date < today - monitorRollupRetentionDays.
func (s *ChannelMonitorService) cleanupOldRollups(ctx context.Context, today time.Time) error {
cutoff := today.AddDate(0, 0, -monitorRollupRetentionDays)
deleted, err := s.repo.DeleteRollupsBefore(ctx, cutoff)
if err != nil {
return fmt.Errorf("delete rollups before %s: %w", cutoff.Format("2006-01-02"), err)
}
if deleted > 0 {
slog.Info("channel_monitor: rollups cleanup",
"deleted_rows", deleted, "before", cutoff.Format("2006-01-02"))
}
return nil
}

// ---------- helpers ----------

// decryptInPlace decrypts ChannelMonitor.APIKey from ciphertext to plaintext.

@@ -36,11 +36,15 @@ return 0
// - Scheduling: 5-field cron spec (minute hour dom month dow).
// - Multi-instance: best-effort Redis leader lock so only one node runs cleanup.
// - Safety: deletes in batches to avoid long transactions.
//
// Additionally: at the end of runCleanupOnce it calls ChannelMonitorService.RunDailyMaintenance,
// sharing the cron schedule + leader lock + heartbeat so no second scheduler is needed.
type OpsCleanupService struct {
opsRepo OpsRepository
db *sql.DB
redisClient *redis.Client
cfg *config.Config
opsRepo OpsRepository
db *sql.DB
redisClient *redis.Client
cfg *config.Config
channelMonitorSvc *ChannelMonitorService

instanceID string

@@ -57,13 +61,15 @@ func NewOpsCleanupService(
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
channelMonitorSvc *ChannelMonitorService,
) *OpsCleanupService {
return &OpsCleanupService{
opsRepo: opsRepo,
db: db,
redisClient: redisClient,
cfg: cfg,
instanceID: uuid.NewString(),
opsRepo: opsRepo,
db: db,
redisClient: redisClient,
cfg: cfg,
channelMonitorSvc: channelMonitorSvc,
instanceID: uuid.NewString(),
}
}

@@ -248,6 +254,15 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet
out.dailyPreagg = n
}

// Channel monitor daily maintenance (aggregate yesterday's detail + soft-delete expired detail/rollups).
// Failures are only logged and do not affect the ops cleanup's success status (consistent with the other ops steps);
// the maintenance itself already logs each step's error to slog, so the heartbeat result does not record them per item.
if s.channelMonitorSvc != nil {
if err := s.channelMonitorSvc.RunDailyMaintenance(ctx); err != nil {
logger.LegacyPrintf("service.ops_cleanup", "[OpsCleanup] channel monitor maintenance failed: %v", err)
}
}

return out, nil
}

@@ -262,13 +262,16 @@ func ProvideOpsAlertEvaluatorService(
}

// ProvideOpsCleanupService creates and starts OpsCleanupService (cron scheduled).
// channelMonitorSvc lets the maintenance job (aggregation + soft-deleting history/rollups) run together with the ops cleanup cron,
// sharing the leader lock + heartbeat.
func ProvideOpsCleanupService(
opsRepo OpsRepository,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
channelMonitorSvc *ChannelMonitorService,
) *OpsCleanupService {
svc := NewOpsCleanupService(opsRepo, db, redisClient, cfg)
svc := NewOpsCleanupService(opsRepo, db, redisClient, cfg, channelMonitorSvc)
svc.Start()
return svc
}
