diff --git a/backend/internal/service/scheduler_snapshot_service.go b/backend/internal/service/scheduler_snapshot_service.go index d1330abb..5ead45bc 100644 --- a/backend/internal/service/scheduler_snapshot_service.go +++ b/backend/internal/service/scheduler_snapshot_service.go @@ -20,6 +20,14 @@ var ( const outboxEventTimeout = 2 * time.Minute +// batchSeenKey identifies a (groupID, platform) bucket set that has already been +// rebuilt within a single pollOutbox call, to avoid redundant work when multiple +// outbox events (account, bulk-account, or group changes) touch the same groups. +type batchSeenKey struct { + groupID int64 + platform string +} + type SchedulerSnapshotService struct { cache SchedulerCache outboxRepo SchedulerOutboxRepository @@ -244,9 +252,10 @@ func (s *SchedulerSnapshotService) pollOutbox() { } watermarkForCheck := watermark + seen := make(map[batchSeenKey]struct{}) for _, event := range events { eventCtx, cancel := context.WithTimeout(context.Background(), outboxEventTimeout) - err := s.handleOutboxEvent(eventCtx, event) + err := s.handleOutboxEvent(eventCtx, event, seen) cancel() if err != nil { logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox handle failed: id=%d type=%s err=%v", event.ID, event.EventType, err) @@ -255,8 +264,20 @@ func (s *SchedulerSnapshotService) pollOutbox() { } lastID := events[len(events)-1].ID - if err := s.cache.SetOutboxWatermark(ctx, lastID); err != nil { - logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", err) + wmCtx, wmCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer wmCancel() + var wmErr error + for i := range 3 { + wmErr = s.cache.SetOutboxWatermark(wmCtx, lastID) + if wmErr == nil { + break + } + if i < 2 { + time.Sleep(200 * time.Millisecond) + } + } + if wmErr != nil { + logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", wmErr) } else { watermarkForCheck = lastID } @@ -264,18 +285,18 @@ func (s
*SchedulerSnapshotService) pollOutbox() { s.checkOutboxLag(ctx, events[0], watermarkForCheck) } -func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent) error { +func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent, seen map[batchSeenKey]struct{}) error { switch event.EventType { case SchedulerOutboxEventAccountLastUsed: return s.handleLastUsedEvent(ctx, event.Payload) case SchedulerOutboxEventAccountBulkChanged: - return s.handleBulkAccountEvent(ctx, event.Payload) + return s.handleBulkAccountEvent(ctx, event.Payload, seen) case SchedulerOutboxEventAccountGroupsChanged: - return s.handleAccountEvent(ctx, event.AccountID, event.Payload) + return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen) case SchedulerOutboxEventAccountChanged: - return s.handleAccountEvent(ctx, event.AccountID, event.Payload) + return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen) case SchedulerOutboxEventGroupChanged: - return s.handleGroupEvent(ctx, event.GroupID) + return s.handleGroupEvent(ctx, event.GroupID, seen) case SchedulerOutboxEventFullRebuild: return s.triggerFullRebuild("outbox") default: @@ -309,7 +330,7 @@ func (s *SchedulerSnapshotService) handleLastUsedEvent(ctx context.Context, payl return s.cache.UpdateLastUsed(ctx, updates) } -func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any) error { +func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any, seen map[batchSeenKey]struct{}) error { if payload == nil { return nil } @@ -323,15 +344,15 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p } ids := make([]int64, 0, len(rawIDs)) - seen := make(map[int64]struct{}, len(rawIDs)) + seenIDs := make(map[int64]struct{}, len(rawIDs)) for _, id := range rawIDs { if id <= 0 { continue } - if _, exists := seen[id]; exists { + if _, exists 
:= seenIDs[id]; exists { continue } - seen[id] = struct{}{} + seenIDs[id] = struct{}{} ids = append(ids, id) } if len(ids) == 0 { @@ -384,10 +405,10 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p for gid := range rebuildGroupSet { rebuildGroupIDs = append(rebuildGroupIDs, gid) } - return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change") + return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change", seen) } -func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any) error { +func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any, seen map[batchSeenKey]struct{}) error { if accountID == nil || *accountID <= 0 { return nil } @@ -408,7 +429,7 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou return err } } - return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss") + return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss", seen) } return err } @@ -420,18 +441,18 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou if len(groupIDs) == 0 { groupIDs = account.GroupIDs } - return s.rebuildByAccount(ctx, account, groupIDs, "account_change") + return s.rebuildByAccount(ctx, account, groupIDs, "account_change", seen) } -func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64) error { +func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64, seen map[batchSeenKey]struct{}) error { if groupID == nil || *groupID <= 0 { return nil } groupIDs := []int64{*groupID} - return s.rebuildByGroupIDs(ctx, groupIDs, "group_change") + return s.rebuildByGroupIDs(ctx, groupIDs, "group_change", seen) } -func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string) error { +func (s *SchedulerSnapshotService) rebuildByAccount(ctx 
context.Context, account *Account, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error { if account == nil { return nil } @@ -441,21 +462,21 @@ func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account } var firstErr error - if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason); err != nil && firstErr == nil { + if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason, seen); err != nil && firstErr == nil { firstErr = err } if account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled() { - if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason); err != nil && firstErr == nil { + if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason, seen); err != nil && firstErr == nil { firstErr = err } - if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason); err != nil && firstErr == nil { + if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason, seen); err != nil && firstErr == nil { firstErr = err } } return firstErr } -func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string) error { +func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error { groupIDs = s.normalizeGroupIDs(groupIDs) if len(groupIDs) == 0 { return nil @@ -463,19 +484,30 @@ func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupI platforms := []string{PlatformAnthropic, PlatformGemini, PlatformOpenAI, PlatformAntigravity} var firstErr error for _, platform := range platforms { - if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason); err != nil && firstErr == nil { + if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason, seen); err != nil && firstErr == nil { firstErr = err } } return firstErr } -func (s 
*SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string) error { +func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error { if platform == "" { return nil } var firstErr error for _, gid := range groupIDs { + // Within a single poll batch, skip (groupID, platform) pairs that were + // already rebuilt. The first rebuild loads fresh DB data for all accounts + // in the group, so subsequent rebuilds for the same group+platform within + // the same batch are redundant. NOTE(review): the pair is marked before rebuildBucket runs, so a failed rebuild is not retried within the batch. + if seen != nil { + key := batchSeenKey{gid, platform} + if _, exists := seen[key]; exists { + continue + } + seen[key] = struct{}{} + } if err := s.rebuildBucket(ctx, SchedulerBucket{GroupID: gid, Platform: platform, Mode: SchedulerModeSingle}, reason); err != nil && firstErr == nil { firstErr = err }