package service

import (
	"context"
	"sync"
	"time"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
	"github.com/robfig/cron/v3"
)

// scheduledTestDefaultMaxWorkers caps how many due plans are executed
// concurrently within a single scheduler tick.
const scheduledTestDefaultMaxWorkers = 10

// ScheduledTestRunnerService periodically scans due test plans and executes them.
type ScheduledTestRunnerService struct {
	planRepo       ScheduledTestPlanRepository
	scheduledSvc   *ScheduledTestService
	accountTestSvc *AccountTestService
	cfg            *config.Config

	// cron is created by Start and stays nil until then (or if Start fails).
	cron      *cron.Cron
	startOnce sync.Once
	stopOnce  sync.Once
}

// NewScheduledTestRunnerService creates a new runner.
// The returned service is idle until Start is called.
func NewScheduledTestRunnerService(
	planRepo ScheduledTestPlanRepository,
	scheduledSvc *ScheduledTestService,
	accountTestSvc *AccountTestService,
	cfg *config.Config,
) *ScheduledTestRunnerService {
	return &ScheduledTestRunnerService{
		planRepo:       planRepo,
		scheduledSvc:   scheduledSvc,
		accountTestSvc: accountTestSvc,
		cfg:            cfg,
	}
}

// Start begins the cron ticker (every minute).
//
// Calling Start on a nil receiver is a no-op, and only the first call on a
// given service has any effect. The scheduler uses the configured timezone
// when one is set and valid, and the process-local timezone otherwise.
// A tick that fires while the previous scan is still running is skipped.
func (s *ScheduledTestRunnerService) Start() {
	if s == nil {
		return
	}
	s.startOnce.Do(func() {
		loc := time.Local
		// time.LoadLocation("") returns UTC with a nil error, so an unset
		// timezone must be filtered out here to keep the local-time default.
		if s.cfg != nil && s.cfg.Timezone != "" {
			parsed, err := time.LoadLocation(s.cfg.Timezone)
			switch {
			case err != nil:
				logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] invalid timezone %q, falling back to local time: %v", s.cfg.Timezone, err)
			case parsed != nil:
				loc = parsed
			}
		}

		// cron launches every invocation in its own goroutine, and a scan can
		// outlive the one-minute interval. Plans are only rescheduled after
		// their test finishes, so overlapping scans could pick up the same
		// plan twice. inFlight is a one-slot token: a tick that cannot take
		// it is skipped instead of running concurrently.
		inFlight := make(chan struct{}, 1)
		tick := func() {
			select {
			case inFlight <- struct{}{}:
			default:
				logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] previous scan still running; tick skipped")
				return
			}
			defer func() { <-inFlight }()
			s.runScheduled()
		}

		c := cron.New(cron.WithParser(scheduledTestCronParser), cron.WithLocation(loc))
		if _, err := c.AddFunc("* * * * *", tick); err != nil {
			logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] not started (invalid schedule): %v", err)
			return
		}
		s.cron = c
		s.cron.Start()
		logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] started (tick=every minute)")
	})
}

// Stop gracefully shuts down the cron scheduler.
func (s *ScheduledTestRunnerService) Stop() { if s == nil { return } s.stopOnce.Do(func() { if s.cron != nil { ctx := s.cron.Stop() select { case <-ctx.Done(): case <-time.After(3 * time.Second): logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] cron stop timed out") } } }) } func (s *ScheduledTestRunnerService) runScheduled() { // Delay 10s so execution lands at ~:10 of each minute instead of :00. time.Sleep(10 * time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() now := time.Now() plans, err := s.planRepo.ListDue(ctx, now) if err != nil { logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] ListDue error: %v", err) return } if len(plans) == 0 { return } logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] found %d due plans", len(plans)) sem := make(chan struct{}, scheduledTestDefaultMaxWorkers) var wg sync.WaitGroup for _, plan := range plans { sem <- struct{}{} wg.Add(1) go func(p *ScheduledTestPlan) { defer wg.Done() defer func() { <-sem }() s.runOnePlan(ctx, p) }(plan) } wg.Wait() } func (s *ScheduledTestRunnerService) runOnePlan(ctx context.Context, plan *ScheduledTestPlan) { result, err := s.accountTestSvc.RunTestBackground(ctx, plan.AccountID, plan.ModelID) if err != nil { logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d RunTestBackground error: %v", plan.ID, err) return } if err := s.scheduledSvc.SaveResult(ctx, plan.ID, plan.MaxResults, result); err != nil { logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d SaveResult error: %v", plan.ID, err) } nextRun, err := computeNextRun(plan.CronExpression, time.Now()) if err != nil { logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d computeNextRun error: %v", plan.ID, err) return } if err := s.planRepo.UpdateAfterRun(ctx, plan.ID, time.Now(), nextRun); err != nil { 
logger.LegacyPrintf("service.scheduled_test_runner", "[ScheduledTestRunner] plan=%d UpdateAfterRun error: %v", plan.ID, err) } }