fix: outbox watermark context expiry; add in-batch group rebuild dedup

Fixes #1691

- pollOutbox() reused its 10s poll context for SetOutboxWatermark, but the
  event processing that runs first can take much longer, so the context was
  often already expired by the time of the write, causing "outbox watermark
  write failed: context deadline exceeded". The watermark therefore never
  advanced, and the same 200 events were reprocessed every poll cycle,
  spiking CPU. The write now uses an independent 5s context with up to 3
  retries (200ms apart); see the first sketch below.

- When multiple Codex accounts sharing the same 21-22 groups get rate-limited
  in quick succession, each account_changed event triggered a redundant
  bucket rebuild for the same groups. Introduce batchSeenKey{groupID,
  platform} and thread a seen map through the handler chain;
  rebuildBucketsForPlatform now skips (group, platform) pairs already rebuilt
  within the same poll batch (~80% fewer rebuild calls in the
  5-accounts-same-groups scenario); see the second sketch below.
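
For the watermark fix, a minimal sketch of the new write path, pulled out of
pollOutbox into a hypothetical writeWatermark helper for readability (package
name and helper are illustrative; SchedulerCache is the interface from the
diff below):

```go
package service // illustrative package name

import (
	"context"
	"time"
)

// writeWatermark is a hypothetical helper isolating the pattern: a fresh 5s
// context for the write, because the 10s poll context may already be
// exhausted by event processing, plus up to 3 attempts 200ms apart.
func writeWatermark(cache SchedulerCache, lastID int64) error {
	wmCtx, wmCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer wmCancel()
	var wmErr error
	for i := range 3 {
		if wmErr = cache.SetOutboxWatermark(wmCtx, lastID); wmErr == nil {
			return nil
		}
		if i < 2 {
			time.Sleep(200 * time.Millisecond)
		}
	}
	return wmErr
}
```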
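The dedup in sketch form: the seen map is created once per pollOutbox call and
threaded down to rebuildBucketsForPlatform, which skips pairs it has already
handled (rebuildDeduped and rebuildOne are hypothetical stand-ins for the real
method and its s.rebuildBucket call):

```go
// rebuildDeduped is a hypothetical condensation of rebuildBucketsForPlatform;
// batchSeenKey is the type added in the diff below.
func rebuildDeduped(platform string, groupIDs []int64, seen map[batchSeenKey]struct{}, rebuildOne func(gid int64) error) error {
	var firstErr error
	for _, gid := range groupIDs {
		key := batchSeenKey{groupID: gid, platform: platform}
		if _, dup := seen[key]; dup {
			continue // already rebuilt in this poll batch
		}
		seen[key] = struct{}{}
		if err := rebuildOne(gid); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
```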

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>

@@ -20,6 +20,14 @@ var (
 const outboxEventTimeout = 2 * time.Minute
 
+// batchSeenKey tracks which (groupID, platform) bucket sets have already been
+// rebuilt within a single pollOutbox call, to avoid redundant work when multiple
+// account_changed events share the same groups.
+type batchSeenKey struct {
+	groupID  int64
+	platform string
+}
+
 type SchedulerSnapshotService struct {
 	cache      SchedulerCache
 	outboxRepo SchedulerOutboxRepository
@@ -244,9 +252,10 @@ func (s *SchedulerSnapshotService) pollOutbox() {
 	}
 	watermarkForCheck := watermark
+	seen := make(map[batchSeenKey]struct{})
 	for _, event := range events {
 		eventCtx, cancel := context.WithTimeout(context.Background(), outboxEventTimeout)
-		err := s.handleOutboxEvent(eventCtx, event)
+		err := s.handleOutboxEvent(eventCtx, event, seen)
 		cancel()
 		if err != nil {
 			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox handle failed: id=%d type=%s err=%v", event.ID, event.EventType, err)
@@ -255,8 +264,20 @@ func (s *SchedulerSnapshotService) pollOutbox() {
 	}
 	lastID := events[len(events)-1].ID
-	if err := s.cache.SetOutboxWatermark(ctx, lastID); err != nil {
-		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", err)
+	wmCtx, wmCancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer wmCancel()
+	var wmErr error
+	for i := range 3 {
+		wmErr = s.cache.SetOutboxWatermark(wmCtx, lastID)
+		if wmErr == nil {
+			break
+		}
+		if i < 2 {
+			time.Sleep(200 * time.Millisecond)
+		}
+	}
+	if wmErr != nil {
+		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", wmErr)
 	} else {
 		watermarkForCheck = lastID
 	}
@@ -264,18 +285,18 @@ func (s *SchedulerSnapshotService) pollOutbox() {
 	s.checkOutboxLag(ctx, events[0], watermarkForCheck)
 }
 
-func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent) error {
+func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent, seen map[batchSeenKey]struct{}) error {
 	switch event.EventType {
 	case SchedulerOutboxEventAccountLastUsed:
 		return s.handleLastUsedEvent(ctx, event.Payload)
 	case SchedulerOutboxEventAccountBulkChanged:
-		return s.handleBulkAccountEvent(ctx, event.Payload)
+		return s.handleBulkAccountEvent(ctx, event.Payload, seen)
 	case SchedulerOutboxEventAccountGroupsChanged:
-		return s.handleAccountEvent(ctx, event.AccountID, event.Payload)
+		return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen)
 	case SchedulerOutboxEventAccountChanged:
-		return s.handleAccountEvent(ctx, event.AccountID, event.Payload)
+		return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen)
 	case SchedulerOutboxEventGroupChanged:
-		return s.handleGroupEvent(ctx, event.GroupID)
+		return s.handleGroupEvent(ctx, event.GroupID, seen)
 	case SchedulerOutboxEventFullRebuild:
 		return s.triggerFullRebuild("outbox")
 	default:
@@ -309,7 +330,7 @@ func (s *SchedulerSnapshotService) handleLastUsedEvent(ctx context.Context, payl
 	return s.cache.UpdateLastUsed(ctx, updates)
 }
 
-func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any) error {
+func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any, seen map[batchSeenKey]struct{}) error {
 	if payload == nil {
 		return nil
 	}
@@ -323,15 +344,15 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p
 	}
 	ids := make([]int64, 0, len(rawIDs))
-	seen := make(map[int64]struct{}, len(rawIDs))
+	seenIDs := make(map[int64]struct{}, len(rawIDs))
 	for _, id := range rawIDs {
 		if id <= 0 {
 			continue
 		}
-		if _, exists := seen[id]; exists {
+		if _, exists := seenIDs[id]; exists {
 			continue
 		}
-		seen[id] = struct{}{}
+		seenIDs[id] = struct{}{}
 		ids = append(ids, id)
 	}
 	if len(ids) == 0 {
@@ -384,10 +405,10 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p
 	for gid := range rebuildGroupSet {
 		rebuildGroupIDs = append(rebuildGroupIDs, gid)
 	}
-	return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change")
+	return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change", seen)
 }
 
-func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any) error {
+func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any, seen map[batchSeenKey]struct{}) error {
	if accountID == nil || *accountID <= 0 {
 		return nil
 	}
@@ -408,7 +429,7 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou
 					return err
 				}
 			}
-			return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss")
+			return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss", seen)
 		}
 		return err
 	}
@@ -420,18 +441,18 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou
 	if len(groupIDs) == 0 {
 		groupIDs = account.GroupIDs
 	}
-	return s.rebuildByAccount(ctx, account, groupIDs, "account_change")
+	return s.rebuildByAccount(ctx, account, groupIDs, "account_change", seen)
 }
 
-func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64) error {
+func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64, seen map[batchSeenKey]struct{}) error {
 	if groupID == nil || *groupID <= 0 {
 		return nil
 	}
 	groupIDs := []int64{*groupID}
-	return s.rebuildByGroupIDs(ctx, groupIDs, "group_change")
+	return s.rebuildByGroupIDs(ctx, groupIDs, "group_change", seen)
 }
 
-func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string) error {
+func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
 	if account == nil {
 		return nil
 	}
@@ -441,21 +462,21 @@ func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account
 	}
 	var firstErr error
-	if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason); err != nil && firstErr == nil {
+	if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason, seen); err != nil && firstErr == nil {
 		firstErr = err
 	}
 	if account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled() {
-		if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason); err != nil && firstErr == nil {
+		if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason, seen); err != nil && firstErr == nil {
 			firstErr = err
 		}
-		if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason); err != nil && firstErr == nil {
+		if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason, seen); err != nil && firstErr == nil {
 			firstErr = err
 		}
 	}
 	return firstErr
 }
 
-func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string) error {
+func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
 	groupIDs = s.normalizeGroupIDs(groupIDs)
 	if len(groupIDs) == 0 {
 		return nil
@@ -463,19 +484,30 @@ func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, groupI
 	platforms := []string{PlatformAnthropic, PlatformGemini, PlatformOpenAI, PlatformAntigravity}
 	var firstErr error
 	for _, platform := range platforms {
-		if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason); err != nil && firstErr == nil {
+		if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason, seen); err != nil && firstErr == nil {
 			firstErr = err
 		}
 	}
 	return firstErr
 }
 
-func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string) error {
+func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
 	if platform == "" {
 		return nil
 	}
 	var firstErr error
 	for _, gid := range groupIDs {
+		// Within a single poll batch, skip (groupID, platform) pairs that were
+		// already rebuilt. The first rebuild loads fresh DB data for all accounts
+		// in the group, so subsequent rebuilds for the same group+platform within
+		// the same batch are redundant.
+		if seen != nil {
+			key := batchSeenKey{gid, platform}
+			if _, exists := seen[key]; exists {
+				continue
+			}
+			seen[key] = struct{}{}
+		}
 		if err := s.rebuildBucket(ctx, SchedulerBucket{GroupID: gid, Platform: platform, Mode: SchedulerModeSingle}, reason); err != nil && firstErr == nil {
 			firstErr = err
 		}