Merge pull request #1702 from StarryKira/fix/outbox-watermark-context-dedup-1691
fix: fix outbox watermark context expiry and add in-batch group rebuild dedup
This commit is contained in:
@@ -20,6 +20,14 @@ var (
|
||||
|
||||
const outboxEventTimeout = 2 * time.Minute
|
||||
|
||||
// batchSeenKey tracks which (groupID, platform) bucket sets have already been
|
||||
// rebuilt within a single pollOutbox call, to avoid redundant work when multiple
|
||||
// account_changed events share the same groups.
|
||||
type batchSeenKey struct {
|
||||
groupID int64
|
||||
platform string
|
||||
}
|
||||
|
||||
type SchedulerSnapshotService struct {
|
||||
cache SchedulerCache
|
||||
outboxRepo SchedulerOutboxRepository
|
||||
@@ -244,9 +252,10 @@ func (s *SchedulerSnapshotService) pollOutbox() {
|
||||
}
|
||||
|
||||
watermarkForCheck := watermark
|
||||
seen := make(map[batchSeenKey]struct{})
|
||||
for _, event := range events {
|
||||
eventCtx, cancel := context.WithTimeout(context.Background(), outboxEventTimeout)
|
||||
err := s.handleOutboxEvent(eventCtx, event)
|
||||
err := s.handleOutboxEvent(eventCtx, event, seen)
|
||||
cancel()
|
||||
if err != nil {
|
||||
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox handle failed: id=%d type=%s err=%v", event.ID, event.EventType, err)
|
||||
@@ -255,8 +264,20 @@ func (s *SchedulerSnapshotService) pollOutbox() {
|
||||
}
|
||||
|
||||
lastID := events[len(events)-1].ID
|
||||
if err := s.cache.SetOutboxWatermark(ctx, lastID); err != nil {
|
||||
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", err)
|
||||
var wmErr error
|
||||
for i := range 3 {
|
||||
wmCtx, wmCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
wmErr = s.cache.SetOutboxWatermark(wmCtx, lastID)
|
||||
wmCancel()
|
||||
if wmErr == nil {
|
||||
break
|
||||
}
|
||||
if i < 2 {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
if wmErr != nil {
|
||||
logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", wmErr)
|
||||
} else {
|
||||
watermarkForCheck = lastID
|
||||
}
|
||||
@@ -264,18 +285,18 @@ func (s *SchedulerSnapshotService) pollOutbox() {
|
||||
s.checkOutboxLag(ctx, events[0], watermarkForCheck)
|
||||
}
|
||||
|
||||
func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent) error {
|
||||
func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent, seen map[batchSeenKey]struct{}) error {
|
||||
switch event.EventType {
|
||||
case SchedulerOutboxEventAccountLastUsed:
|
||||
return s.handleLastUsedEvent(ctx, event.Payload)
|
||||
case SchedulerOutboxEventAccountBulkChanged:
|
||||
return s.handleBulkAccountEvent(ctx, event.Payload)
|
||||
return s.handleBulkAccountEvent(ctx, event.Payload, seen)
|
||||
case SchedulerOutboxEventAccountGroupsChanged:
|
||||
return s.handleAccountEvent(ctx, event.AccountID, event.Payload)
|
||||
return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen)
|
||||
case SchedulerOutboxEventAccountChanged:
|
||||
return s.handleAccountEvent(ctx, event.AccountID, event.Payload)
|
||||
return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen)
|
||||
case SchedulerOutboxEventGroupChanged:
|
||||
return s.handleGroupEvent(ctx, event.GroupID)
|
||||
return s.handleGroupEvent(ctx, event.GroupID, seen)
|
||||
case SchedulerOutboxEventFullRebuild:
|
||||
return s.triggerFullRebuild("outbox")
|
||||
default:
|
||||
@@ -309,7 +330,7 @@ func (s *SchedulerSnapshotService) handleLastUsedEvent(ctx context.Context, payl
|
||||
return s.cache.UpdateLastUsed(ctx, updates)
|
||||
}
|
||||
|
||||
func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any) error {
|
||||
func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any, seen map[batchSeenKey]struct{}) error {
|
||||
if payload == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -323,15 +344,15 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p
|
||||
}
|
||||
|
||||
ids := make([]int64, 0, len(rawIDs))
|
||||
seen := make(map[int64]struct{}, len(rawIDs))
|
||||
seenIDs := make(map[int64]struct{}, len(rawIDs))
|
||||
for _, id := range rawIDs {
|
||||
if id <= 0 {
|
||||
continue
|
||||
}
|
||||
if _, exists := seen[id]; exists {
|
||||
if _, exists := seenIDs[id]; exists {
|
||||
continue
|
||||
}
|
||||
seen[id] = struct{}{}
|
||||
seenIDs[id] = struct{}{}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
@@ -384,10 +405,10 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p
|
||||
for gid := range rebuildGroupSet {
|
||||
rebuildGroupIDs = append(rebuildGroupIDs, gid)
|
||||
}
|
||||
return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change")
|
||||
return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change", seen)
|
||||
}
|
||||
|
||||
func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any) error {
|
||||
func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any, seen map[batchSeenKey]struct{}) error {
|
||||
if accountID == nil || *accountID <= 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -408,7 +429,7 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou
|
||||
return err
|
||||
}
|
||||
}
|
||||
return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss")
|
||||
return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss", seen)
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -420,18 +441,18 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou
|
||||
if len(groupIDs) == 0 {
|
||||
groupIDs = account.GroupIDs
|
||||
}
|
||||
return s.rebuildByAccount(ctx, account, groupIDs, "account_change")
|
||||
return s.rebuildByAccount(ctx, account, groupIDs, "account_change", seen)
|
||||
}
|
||||
|
||||
func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64) error {
|
||||
func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64, seen map[batchSeenKey]struct{}) error {
|
||||
if groupID == nil || *groupID <= 0 {
|
||||
return nil
|
||||
}
|
||||
groupIDs := []int64{*groupID}
|
||||
return s.rebuildByGroupIDs(ctx, groupIDs, "group_change")
|
||||
return s.rebuildByGroupIDs(ctx, groupIDs, "group_change", seen)
|
||||
}
|
||||
|
||||
func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string) error {
|
||||
func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
|
||||
if account == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -441,21 +462,21 @@ func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account
|
||||
}
|
||||
|
||||
var firstErr error
|
||||
if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason); err != nil && firstErr == nil {
|
||||
if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason, seen); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
if account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled() {
|
||||
if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason); err != nil && firstErr == nil {
|
||||
if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason, seen); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason); err != nil && firstErr == nil {
|
||||
if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason, seen); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string) error {
|
||||
func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
|
||||
groupIDs = s.normalizeGroupIDs(groupIDs)
|
||||
if len(groupIDs) == 0 {
|
||||
return nil
|
||||
@@ -463,19 +484,30 @@ func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupI
|
||||
platforms := []string{PlatformAnthropic, PlatformGemini, PlatformOpenAI, PlatformAntigravity}
|
||||
var firstErr error
|
||||
for _, platform := range platforms {
|
||||
if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason); err != nil && firstErr == nil {
|
||||
if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason, seen); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string) error {
|
||||
func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
|
||||
if platform == "" {
|
||||
return nil
|
||||
}
|
||||
var firstErr error
|
||||
for _, gid := range groupIDs {
|
||||
// Within a single poll batch, skip (groupID, platform) pairs that were
|
||||
// already rebuilt. The first rebuild loads fresh DB data for all accounts
|
||||
// in the group, so subsequent rebuilds for the same group+platform within
|
||||
// the same batch are redundant.
|
||||
if seen != nil {
|
||||
key := batchSeenKey{gid, platform}
|
||||
if _, exists := seen[key]; exists {
|
||||
continue
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
}
|
||||
if err := s.rebuildBucket(ctx, SchedulerBucket{GroupID: gid, Platform: platform, Mode: SchedulerModeSingle}, reason); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user