为共享 req 客户端增加 HTTP/2 选项与缓存隔离 OpenAI OAuth 超时提升到 120s,并按协议控制强制 新增客户端池与 OAuth 客户端单测覆盖 修复 usage cleanup 相关 errcheck/ineffassign/staticcheck 并统一格式 测试: make test
316 lines
9.8 KiB
Go
316 lines
9.8 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"log"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/Wei-Shaw/sub2api/internal/config"
|
||
)
|
||
|
||
const (
|
||
defaultDashboardAggregationTimeout = 2 * time.Minute
|
||
defaultDashboardAggregationBackfillTimeout = 30 * time.Minute
|
||
dashboardAggregationRetentionInterval = 6 * time.Hour
|
||
)
|
||
|
||
var (
|
||
// ErrDashboardBackfillDisabled 当配置禁用回填时返回。
|
||
ErrDashboardBackfillDisabled = errors.New("仪表盘聚合回填已禁用")
|
||
// ErrDashboardBackfillTooLarge 当回填跨度超过限制时返回。
|
||
ErrDashboardBackfillTooLarge = errors.New("回填时间跨度过大")
|
||
errDashboardAggregationRunning = errors.New("聚合作业正在运行")
|
||
)
|
||
|
||
// DashboardAggregationRepository 定义仪表盘预聚合仓储接口。
|
||
type DashboardAggregationRepository interface {
|
||
AggregateRange(ctx context.Context, start, end time.Time) error
|
||
// RecomputeRange 重新计算指定时间范围内的聚合数据(包含活跃用户等派生表)。
|
||
// 设计目的:当 usage_logs 被批量删除/回滚后,确保聚合表可恢复一致性。
|
||
RecomputeRange(ctx context.Context, start, end time.Time) error
|
||
GetAggregationWatermark(ctx context.Context) (time.Time, error)
|
||
UpdateAggregationWatermark(ctx context.Context, aggregatedAt time.Time) error
|
||
CleanupAggregates(ctx context.Context, hourlyCutoff, dailyCutoff time.Time) error
|
||
CleanupUsageLogs(ctx context.Context, cutoff time.Time) error
|
||
EnsureUsageLogsPartitions(ctx context.Context, now time.Time) error
|
||
}
|
||
|
||
// DashboardAggregationService 负责定时聚合与回填。
|
||
type DashboardAggregationService struct {
|
||
repo DashboardAggregationRepository
|
||
timingWheel *TimingWheelService
|
||
cfg config.DashboardAggregationConfig
|
||
running int32
|
||
lastRetentionCleanup atomic.Value // time.Time
|
||
}
|
||
|
||
// NewDashboardAggregationService 创建聚合服务。
|
||
func NewDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
|
||
var aggCfg config.DashboardAggregationConfig
|
||
if cfg != nil {
|
||
aggCfg = cfg.DashboardAgg
|
||
}
|
||
return &DashboardAggregationService{
|
||
repo: repo,
|
||
timingWheel: timingWheel,
|
||
cfg: aggCfg,
|
||
}
|
||
}
|
||
|
||
// Start 启动定时聚合作业(重启生效配置)。
|
||
func (s *DashboardAggregationService) Start() {
|
||
if s == nil || s.repo == nil || s.timingWheel == nil {
|
||
return
|
||
}
|
||
if !s.cfg.Enabled {
|
||
log.Printf("[DashboardAggregation] 聚合作业已禁用")
|
||
return
|
||
}
|
||
|
||
interval := time.Duration(s.cfg.IntervalSeconds) * time.Second
|
||
if interval <= 0 {
|
||
interval = time.Minute
|
||
}
|
||
|
||
if s.cfg.RecomputeDays > 0 {
|
||
go s.recomputeRecentDays()
|
||
}
|
||
|
||
s.timingWheel.ScheduleRecurring("dashboard:aggregation", interval, func() {
|
||
s.runScheduledAggregation()
|
||
})
|
||
log.Printf("[DashboardAggregation] 聚合作业启动 (interval=%v, lookback=%ds)", interval, s.cfg.LookbackSeconds)
|
||
if !s.cfg.BackfillEnabled {
|
||
log.Printf("[DashboardAggregation] 回填已禁用,如需补齐保留窗口以外历史数据请手动回填")
|
||
}
|
||
}
|
||
|
||
// TriggerBackfill 触发回填(异步)。
|
||
func (s *DashboardAggregationService) TriggerBackfill(start, end time.Time) error {
|
||
if s == nil || s.repo == nil {
|
||
return errors.New("聚合服务未初始化")
|
||
}
|
||
if !s.cfg.BackfillEnabled {
|
||
log.Printf("[DashboardAggregation] 回填被拒绝: backfill_enabled=false")
|
||
return ErrDashboardBackfillDisabled
|
||
}
|
||
if !end.After(start) {
|
||
return errors.New("回填时间范围无效")
|
||
}
|
||
if s.cfg.BackfillMaxDays > 0 {
|
||
maxRange := time.Duration(s.cfg.BackfillMaxDays) * 24 * time.Hour
|
||
if end.Sub(start) > maxRange {
|
||
return ErrDashboardBackfillTooLarge
|
||
}
|
||
}
|
||
|
||
go func() {
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
|
||
defer cancel()
|
||
if err := s.backfillRange(ctx, start, end); err != nil {
|
||
log.Printf("[DashboardAggregation] 回填失败: %v", err)
|
||
}
|
||
}()
|
||
return nil
|
||
}
|
||
|
||
// TriggerRecomputeRange 触发指定范围的重新计算(异步)。
|
||
// 与 TriggerBackfill 不同:
|
||
// - 不依赖 backfill_enabled(这是内部一致性修复)
|
||
// - 不更新 watermark(避免影响正常增量聚合游标)
|
||
func (s *DashboardAggregationService) TriggerRecomputeRange(start, end time.Time) error {
|
||
if s == nil || s.repo == nil {
|
||
return errors.New("聚合服务未初始化")
|
||
}
|
||
if !s.cfg.Enabled {
|
||
return errors.New("聚合服务已禁用")
|
||
}
|
||
if !end.After(start) {
|
||
return errors.New("重新计算时间范围无效")
|
||
}
|
||
|
||
go func() {
|
||
const maxRetries = 3
|
||
for i := 0; i < maxRetries; i++ {
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
|
||
err := s.recomputeRange(ctx, start, end)
|
||
cancel()
|
||
if err == nil {
|
||
return
|
||
}
|
||
if !errors.Is(err, errDashboardAggregationRunning) {
|
||
log.Printf("[DashboardAggregation] 重新计算失败: %v", err)
|
||
return
|
||
}
|
||
time.Sleep(5 * time.Second)
|
||
}
|
||
log.Printf("[DashboardAggregation] 重新计算放弃: 聚合作业持续占用")
|
||
}()
|
||
return nil
|
||
}
|
||
|
||
func (s *DashboardAggregationService) recomputeRecentDays() {
|
||
days := s.cfg.RecomputeDays
|
||
if days <= 0 {
|
||
return
|
||
}
|
||
now := time.Now().UTC()
|
||
start := now.AddDate(0, 0, -days)
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationBackfillTimeout)
|
||
defer cancel()
|
||
if err := s.backfillRange(ctx, start, now); err != nil {
|
||
log.Printf("[DashboardAggregation] 启动重算失败: %v", err)
|
||
return
|
||
}
|
||
}
|
||
|
||
func (s *DashboardAggregationService) recomputeRange(ctx context.Context, start, end time.Time) error {
|
||
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
|
||
return errDashboardAggregationRunning
|
||
}
|
||
defer atomic.StoreInt32(&s.running, 0)
|
||
|
||
jobStart := time.Now().UTC()
|
||
if err := s.repo.RecomputeRange(ctx, start, end); err != nil {
|
||
return err
|
||
}
|
||
log.Printf("[DashboardAggregation] 重新计算完成 (start=%s end=%s duration=%s)",
|
||
start.UTC().Format(time.RFC3339),
|
||
end.UTC().Format(time.RFC3339),
|
||
time.Since(jobStart).String(),
|
||
)
|
||
return nil
|
||
}
|
||
|
||
func (s *DashboardAggregationService) runScheduledAggregation() {
|
||
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
|
||
return
|
||
}
|
||
defer atomic.StoreInt32(&s.running, 0)
|
||
|
||
jobStart := time.Now().UTC()
|
||
ctx, cancel := context.WithTimeout(context.Background(), defaultDashboardAggregationTimeout)
|
||
defer cancel()
|
||
|
||
now := time.Now().UTC()
|
||
last, err := s.repo.GetAggregationWatermark(ctx)
|
||
if err != nil {
|
||
log.Printf("[DashboardAggregation] 读取水位失败: %v", err)
|
||
last = time.Unix(0, 0).UTC()
|
||
}
|
||
|
||
lookback := time.Duration(s.cfg.LookbackSeconds) * time.Second
|
||
epoch := time.Unix(0, 0).UTC()
|
||
start := last.Add(-lookback)
|
||
if !last.After(epoch) {
|
||
retentionDays := s.cfg.Retention.UsageLogsDays
|
||
if retentionDays <= 0 {
|
||
retentionDays = 1
|
||
}
|
||
start = truncateToDayUTC(now.AddDate(0, 0, -retentionDays))
|
||
} else if start.After(now) {
|
||
start = now.Add(-lookback)
|
||
}
|
||
|
||
if err := s.aggregateRange(ctx, start, now); err != nil {
|
||
log.Printf("[DashboardAggregation] 聚合失败: %v", err)
|
||
return
|
||
}
|
||
|
||
updateErr := s.repo.UpdateAggregationWatermark(ctx, now)
|
||
if updateErr != nil {
|
||
log.Printf("[DashboardAggregation] 更新水位失败: %v", updateErr)
|
||
}
|
||
log.Printf("[DashboardAggregation] 聚合完成 (start=%s end=%s duration=%s watermark_updated=%t)",
|
||
start.Format(time.RFC3339),
|
||
now.Format(time.RFC3339),
|
||
time.Since(jobStart).String(),
|
||
updateErr == nil,
|
||
)
|
||
|
||
s.maybeCleanupRetention(ctx, now)
|
||
}
|
||
|
||
func (s *DashboardAggregationService) backfillRange(ctx context.Context, start, end time.Time) error {
|
||
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
|
||
return errDashboardAggregationRunning
|
||
}
|
||
defer atomic.StoreInt32(&s.running, 0)
|
||
|
||
jobStart := time.Now().UTC()
|
||
startUTC := start.UTC()
|
||
endUTC := end.UTC()
|
||
if !endUTC.After(startUTC) {
|
||
return errors.New("回填时间范围无效")
|
||
}
|
||
|
||
cursor := truncateToDayUTC(startUTC)
|
||
for cursor.Before(endUTC) {
|
||
windowEnd := cursor.Add(24 * time.Hour)
|
||
if windowEnd.After(endUTC) {
|
||
windowEnd = endUTC
|
||
}
|
||
if err := s.aggregateRange(ctx, cursor, windowEnd); err != nil {
|
||
return err
|
||
}
|
||
cursor = windowEnd
|
||
}
|
||
|
||
updateErr := s.repo.UpdateAggregationWatermark(ctx, endUTC)
|
||
if updateErr != nil {
|
||
log.Printf("[DashboardAggregation] 更新水位失败: %v", updateErr)
|
||
}
|
||
log.Printf("[DashboardAggregation] 回填聚合完成 (start=%s end=%s duration=%s watermark_updated=%t)",
|
||
startUTC.Format(time.RFC3339),
|
||
endUTC.Format(time.RFC3339),
|
||
time.Since(jobStart).String(),
|
||
updateErr == nil,
|
||
)
|
||
|
||
s.maybeCleanupRetention(ctx, endUTC)
|
||
return nil
|
||
}
|
||
|
||
func (s *DashboardAggregationService) aggregateRange(ctx context.Context, start, end time.Time) error {
|
||
if !end.After(start) {
|
||
return nil
|
||
}
|
||
if err := s.repo.EnsureUsageLogsPartitions(ctx, end); err != nil {
|
||
log.Printf("[DashboardAggregation] 分区检查失败: %v", err)
|
||
}
|
||
return s.repo.AggregateRange(ctx, start, end)
|
||
}
|
||
|
||
func (s *DashboardAggregationService) maybeCleanupRetention(ctx context.Context, now time.Time) {
|
||
lastAny := s.lastRetentionCleanup.Load()
|
||
if lastAny != nil {
|
||
if last, ok := lastAny.(time.Time); ok && now.Sub(last) < dashboardAggregationRetentionInterval {
|
||
return
|
||
}
|
||
}
|
||
|
||
hourlyCutoff := now.AddDate(0, 0, -s.cfg.Retention.HourlyDays)
|
||
dailyCutoff := now.AddDate(0, 0, -s.cfg.Retention.DailyDays)
|
||
usageCutoff := now.AddDate(0, 0, -s.cfg.Retention.UsageLogsDays)
|
||
|
||
aggErr := s.repo.CleanupAggregates(ctx, hourlyCutoff, dailyCutoff)
|
||
if aggErr != nil {
|
||
log.Printf("[DashboardAggregation] 聚合保留清理失败: %v", aggErr)
|
||
}
|
||
usageErr := s.repo.CleanupUsageLogs(ctx, usageCutoff)
|
||
if usageErr != nil {
|
||
log.Printf("[DashboardAggregation] usage_logs 保留清理失败: %v", usageErr)
|
||
}
|
||
if aggErr == nil && usageErr == nil {
|
||
s.lastRetentionCleanup.Store(now)
|
||
}
|
||
}
|
||
|
||
func truncateToDayUTC(t time.Time) time.Time {
|
||
t = t.UTC()
|
||
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
|
||
}
|